Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle response exception from Elasticsearch client #26424

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -243,11 +243,6 @@ public void testReadWithMetadata() throws Exception {
elasticsearchIOTestCommon.testReadWithMetadata();
}

/**
 * Runs the shared default-retry-predicate check by delegating to {@code
 * elasticsearchIOTestCommon.testDefaultRetryPredicate}, passing this class's {@code client}.
 */
@Test
public void testDefaultRetryPredicate() throws IOException {
elasticsearchIOTestCommon.testDefaultRetryPredicate(client);
}

@Test
public void testWriteRetry() throws Throwable {
elasticsearchIOTestCommon.setExpectedException(expectedException);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,11 +236,6 @@ public void testReadWithMetadata() throws Exception {
elasticsearchIOTestCommon.testReadWithMetadata();
}

/**
 * Runs the shared default-retry-predicate check by delegating to {@code
 * elasticsearchIOTestCommon.testDefaultRetryPredicate}, passing this class's {@code client}.
 */
@Test
public void testDefaultRetryPredicate() throws IOException {
elasticsearchIOTestCommon.testDefaultRetryPredicate(client);
}

@Test
public void testWriteRetry() throws Throwable {
elasticsearchIOTestCommon.setExpectedException(expectedException);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,11 +237,6 @@ public void testReadWithMetadata() throws Exception {
elasticsearchIOTestCommon.testReadWithMetadata();
}

/**
 * Runs the shared default-retry-predicate check by delegating to {@code
 * elasticsearchIOTestCommon.testDefaultRetryPredicate}, passing this class's {@code client}.
 */
@Test
public void testDefaultRetryPredicate() throws IOException {
elasticsearchIOTestCommon.testDefaultRetryPredicate(client);
}

@Test
public void testWriteRetry() throws Throwable {
elasticsearchIOTestCommon.setExpectedException(expectedException);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,11 +237,6 @@ public void testReadWithMetadata() throws Exception {
elasticsearchIOTestCommon.testReadWithMetadata();
}

/**
 * Runs the shared default-retry-predicate check by delegating to {@code
 * elasticsearchIOTestCommon.testDefaultRetryPredicate}, passing this class's {@code client}.
 */
@Test
public void testDefaultRetryPredicate() throws IOException {
elasticsearchIOTestCommon.testDefaultRetryPredicate(client);
}

@Test
public void testWriteRetry() throws Throwable {
elasticsearchIOTestCommon.setExpectedException(expectedException);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,9 @@
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.ConnectionConfiguration;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.DocToBulk;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.Read;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.RetryConfiguration.DEFAULT_RETRY_PREDICATE;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.Write;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.getBackendVersion;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestUtils.FAMOUS_SCIENTISTS;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestUtils.INVALID_DOCS_IDS;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestUtils.NUM_SCIENTISTS;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestUtils.SCRIPT_SOURCE;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestUtils.countByMatch;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestUtils.countByScientistName;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestUtils.flushAndRefreshAllIndices;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestUtils.insertTestDocuments;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestUtils.mapToInputId;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestUtils.refreshIndexAndGetCurrentNumDocs;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestUtils.*;
import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource;
import static org.apache.beam.sdk.values.TypeDescriptors.integers;
import static org.hamcrest.MatcherAssert.assertThat;
Expand All @@ -43,7 +33,6 @@
import static org.hamcrest.core.Is.is;
import static org.hamcrest.core.Is.isA;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

Expand All @@ -57,7 +46,6 @@
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -95,11 +83,6 @@
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.http.HttpEntity;
import org.apache.http.entity.ContentType;
import org.apache.http.nio.entity.NStringEntity;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.hamcrest.CustomMatcher;
import org.hamcrest.Description;
Expand All @@ -118,21 +101,10 @@ class ElasticsearchIOTestCommon implements Serializable {

private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchIOTestCommon.class);

private static final RetryPredicate CUSTOM_RETRY_PREDICATE = new DefaultRetryPredicate(400);
private static final RetryPredicate CUSTOM_RETRY_PREDICATE = new DefaultRetryPredicate(405);

private static final int EXPECTED_RETRIES = 2;
private static final int MAX_ATTEMPTS = 3;
private static final String[] BAD_FORMATTED_DOC = {"{ \"x\" :a,\"y\":\"ab\" }"};
private static final String OK_REQUEST =
"{ \"index\" : { \"_index\" : \"test\", \"_type\" : \"doc\", \"_id\" : \"1\" } }\n"
+ "{ \"field1\" : 1 }\n";
private static final String OK_REQUEST_NO_TYPE =
"{ \"index\" : { \"_index\" : \"test\", \"_id\" : \"1\" } }\n" + "{ \"field1\" : 1 }\n";
private static final String BAD_REQUEST =
"{ \"index\" : { \"_index\" : \"test\", \"_type\" : \"doc\", \"_id\" : \"1\" } }\n"
+ "{ \"field1\" : @ }\n";
private static final String BAD_REQUEST_NO_TYPE =
"{ \"index\" : { \"_index\" : \"test\", \"_id\" : \"1\" } }\n" + "{ \"field1\" : @ }\n";

static String getEsIndex() {
return "beam" + Thread.currentThread().getId();
Expand Down Expand Up @@ -1066,32 +1038,9 @@ public Void apply(Iterable<String> input) {
}
}

/**
 * Checks the retry predicates against live {@code _bulk} responses.
 *
 * <p>A malformed bulk payload is posted first and {@code CUSTOM_RETRY_PREDICATE} must accept the
 * response entity; a well-formed payload is posted next and {@code DEFAULT_RETRY_PREDICATE} must
 * reject it. The typeless payload variants are used when the backend version is greater than 7.
 *
 * @param restClient client used to look up the backend version and to send both bulk requests
 * @throws IOException propagated from the client calls
 */
void testDefaultRetryPredicate(RestClient restClient) throws IOException {
  final boolean typeless = getBackendVersion(restClient) > 7;
  final HttpEntity malformedBody =
      new NStringEntity(
          typeless ? BAD_REQUEST_NO_TYPE : BAD_REQUEST, ContentType.APPLICATION_JSON);
  final HttpEntity wellFormedBody =
      new NStringEntity(typeless ? OK_REQUEST_NO_TYPE : OK_REQUEST, ContentType.APPLICATION_JSON);

  // The same Request object is reused for both calls; only its entity is swapped.
  final Request bulkRequest = new Request("POST", "/_bulk");
  bulkRequest.addParameters(Collections.emptyMap());

  bulkRequest.setEntity(malformedBody);
  final Response malformedResponse = restClient.performRequest(bulkRequest);
  assertTrue(CUSTOM_RETRY_PREDICATE.test(malformedResponse.getEntity()));

  bulkRequest.setEntity(wellFormedBody);
  final Response wellFormedResponse = restClient.performRequest(bulkRequest);
  assertFalse(DEFAULT_RETRY_PREDICATE.test(wellFormedResponse.getEntity()));
}

/**
* Test that retries are invoked when Elasticsearch returns a specific error code. We invoke this
* by issuing corrupt data and retrying on the `400` error code. Normal behaviour is to retry on
* by using an empty endpoint and retrying on the `405` error code. Normal behaviour is to retry on
* `429` only but that is difficult to simulate reliably. The logger is used to verify expected
* behavior.
*/
Expand All @@ -1104,11 +1053,14 @@ void testWriteRetry() throws Throwable {

ElasticsearchIO.Write write =
ElasticsearchIO.write()
.withConnectionConfiguration(connectionConfiguration)
.withConnectionConfiguration(connectionConfiguration.withInvalidBulkEndpoint())
.withRetryConfiguration(
ElasticsearchIO.RetryConfiguration.create(MAX_ATTEMPTS, Duration.millis(35000))
.withRetryPredicate(CUSTOM_RETRY_PREDICATE));
pipeline.apply(Create.of(Arrays.asList(BAD_FORMATTED_DOC))).apply(write);
List<String> data =
ElasticsearchIOTestUtils.createDocuments(
numDocs, ElasticsearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS);
pipeline.apply(Create.of(data)).apply(write);

pipeline.run();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@
import org.checkerframework.checker.nullness.qual.Nullable;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.joda.time.Duration;
Expand Down Expand Up @@ -343,6 +344,8 @@ public abstract static class ConnectionConfiguration implements Serializable {

public abstract boolean isTrustSelfSignedCerts();

/**
 * Whether the bulk endpoint is deliberately invalid. When {@code true}, {@code getBulkEndPoint()}
 * returns an empty string instead of the prefixed {@code _bulk} endpoint.
 */
abstract boolean isInvalidBulkEndpoint();

abstract Builder builder();

@AutoValue.Builder
Expand Down Expand Up @@ -373,6 +376,8 @@ abstract static class Builder {

abstract Builder setTrustSelfSignedCerts(boolean trustSelfSignedCerts);

/**
 * Sets the invalid-bulk-endpoint flag on this builder.
 *
 * <p>NOTE(review): presumably this is the value later reported by {@code
 * isInvalidBulkEndpoint()} — confirm against the generated builder.
 */
abstract Builder setInvalidBulkEndpoint(boolean invalidBulkEndpoint);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't particularly like the idea of having this builder method only for the sake of testing. I feel it will be very confusing to users to see this as an option when building configurations. Could you look for an alternative approach?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this method and this are not visible to users. Same approach as this.


abstract ConnectionConfiguration build();
}

Expand All @@ -394,6 +399,7 @@ public static ConnectionConfiguration create(String[] addresses, String index, S
.setIndex(index)
.setType(type)
.setTrustSelfSignedCerts(false)
.setInvalidBulkEndpoint(false)
.build();
}

Expand All @@ -413,6 +419,7 @@ public static ConnectionConfiguration create(String[] addresses, String index) {
.setIndex(index)
.setType("")
.setTrustSelfSignedCerts(false)
.setInvalidBulkEndpoint(false)
.build();
}

Expand All @@ -430,6 +437,7 @@ public static ConnectionConfiguration create(String[] addresses) {
.setIndex("")
.setType("")
.setTrustSelfSignedCerts(false)
.setInvalidBulkEndpoint(false)
.build();
}

Expand Down Expand Up @@ -468,6 +476,9 @@ public String getPrefixedEndpoint(String endpoint) {
}

/**
 * Returns the endpoint that bulk requests are sent to.
 *
 * @return the empty string when {@code isInvalidBulkEndpoint()} is {@code true}, otherwise the
 *     result of {@code getPrefixedEndpoint("_bulk")}
 */
public String getBulkEndPoint() {
  return isInvalidBulkEndpoint() ? "" : getPrefixedEndpoint("_bulk");
}

Expand Down Expand Up @@ -642,6 +653,12 @@ public ConnectionConfiguration withConnectTimeout(Integer connectTimeout) {
return builder().setConnectTimeout(connectTimeout).build();
}

/**
 * Test-only hook for simulating server errors: yields a configuration built with the
 * invalid-bulk-endpoint flag set to {@code true}.
 */
@VisibleForTesting
ConnectionConfiguration withInvalidBulkEndpoint() {
  final Builder flagged = builder().setInvalidBulkEndpoint(true);
  return flagged.build();
}

private void populateDisplayData(DisplayData.Builder builder) {
builder.add(DisplayData.item("address", getAddresses().toString()));
builder.add(DisplayData.item("index", getIndex()));
Expand Down Expand Up @@ -1204,7 +1221,7 @@ RetryConfiguration withRetryPredicate(RetryPredicate predicate) {
* the requests to the Elasticsearch server if the {@link RetryConfiguration} permits it.
*/
@FunctionalInterface
interface RetryPredicate extends Predicate<HttpEntity>, Serializable {}
interface RetryPredicate extends Predicate<Integer>, Serializable {}

/**
* This is the default predicate used to test if a failed ES operation should be retried. A
Expand All @@ -1224,26 +1241,9 @@ static class DefaultRetryPredicate implements RetryPredicate {
this(429);
}

/** Returns true if the response has the error code for any mutation. */
private static boolean errorCodePresent(HttpEntity responseEntity, int errorCode) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @mrkm4ntr for your patience, I have kept you waiting too many times.

This change is definitely interesting. I can see that in some cases, such as successfully creating a document via _bulk, the http status code is 200 while the status field under items in the response body is 201 (updating the doc results in the status field being 200):

{
    "took": 383,
    "errors": false,
    "items": [
        {
            "update": {
                "_index": "foo",
                "_type": "_doc",
                "_id": "2",
                "_version": 1,
                "result": "created",
                "_shards": {
                    "total": 2,
                    "successful": 1,
                    "failed": 0
                },
                "_seq_no": 0,
                "_primary_term": 1,
                "status": 201
            }
        }
    ]
}

More importantly, when one of the operations in the bulk payload fails but another succeeds, the http status code can be 200 while one of the items has a status field with a non-200 code:

{
    "took": 19,
    "errors": true,
    "items": [
        {
            "update": {
                "_index": "foo",
                "_type": "_doc",
                "_id": "2",
                "_version": 7,
                "result": "updated",
                "_shards": {
                    "total": 2,
                    "successful": 2,
                    "failed": 0
                },
                "_seq_no": 7,
                "_primary_term": 1,
                "status": 200
            }
        },
        {
            "update": {
                "_index": "foo",
                "_type": "_doc",
                "_id": "2",
                "status": 400,
                "error": {
                    "type": "illegal_argument_exception",
                    "reason": "failed to execute script",
                    "caused_by": {
                        "type": "script_exception",
                        "reason": "compile error",
                        "script_stack": [
                            "if(ctx._source.group !=== null) { ctx._source.gro ...",
                            "                        ^---- HERE"
                        ],
                        "script": "if(ctx._source.group !=== null) { ctx._source.group = params.id % 2 } else { ctx._source.group = 0 }",
                        "lang": "painless",
                        "position": {
                            "offset": 24,
                            "start": 0,
                            "end": 49
                        },
                        "caused_by": {
                            "type": "illegal_argument_exception",
                            "reason": "invalid sequence of tokens near ['='].",
                            "caused_by": {
                                "type": "no_viable_alt_exception",
                                "reason": "no_viable_alt_exception: null"
                            }
                        }
                    }
                }
            }
        }
    ]
}

So the http status code alone cannot be "trusted" to report all failures, since there could be false negatives. However, it seems that the http status code can be used as a first check: if the http status code is >=400, we know a failure has occurred and we don't necessarily need to iterate through the items.

Checking the http status code first might be a viable path forward for properly handling the 429 case.

Thoughts?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks.
So, you believe there's a scenario where the HTTP status code is 200, but the status in the body is 429. I'm not certain this is accurate, but we should pay attention to this if we're not confident.

OK, I will change my PR like this.

Checking the http status code first

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, you believe there's a scenario where the HTTP status code is 200, but the status in the body is 429.

It's more so that I feel that we cannot remove the errorCodePresent method and rely on http status code alone. I have seen first hand (examples above) scenarios where 1 out of n bulk entities succeeds and therefore the http status code is 200, even if n-1 bulk entities failed and had their associated status field in items set to something like 400.

I believe the failure mode for not properly handling http 429 is that the response body does not contain the key items which errorCodePresent currently depends on to look further for status. Instead, in the case of 429, status is a top-level key in the response body.

So I feel that we need to keep all existing errorCodePresent logic, and add additional handling to check the http status code.

try {
JsonNode json = parseResponse(responseEntity);
if (json.path("errors").asBoolean()) {
for (JsonNode item : json.path("items")) {
if (item.findValue("status").asInt() == errorCode) {
return true;
}
}
}
} catch (IOException e) {
LOG.warn("Could not extract error codes from responseEntity {}", responseEntity);
}
return false;
}

@Override
public boolean test(HttpEntity responseEntity) {
return errorCodePresent(responseEntity, errorCode);
public boolean test(Integer statusCode) {
return this.errorCode == statusCode;
}
}
}
Expand Down Expand Up @@ -2535,8 +2535,7 @@ private List<Document> flushBatch() throws IOException, InterruptedException {
bulkRequest.append(doc.getBulkDirective());
}

Response response = null;
HttpEntity responseEntity = null;
HttpEntity responseEntity;

// Elasticsearch will default to the index/type provided the {@link
// ConnectionConfiguration} if none are set in the document meta (i.e.
Expand All @@ -2546,28 +2545,15 @@ private List<Document> flushBatch() throws IOException, InterruptedException {

HttpEntity requestBody =
new NStringEntity(bulkRequest.toString(), ContentType.APPLICATION_JSON);
try {
if (spec.getRetryConfiguration() != null) {
responseEntity =
performRequestWithRetry("POST", endPoint, Collections.emptyMap(), requestBody);
} else {
Request request = new Request("POST", endPoint);
request.addParameters(Collections.emptyMap());
request.setEntity(requestBody);
response = restClient.performRequest(request);
Response response = restClient.performRequest(request);
responseEntity = new BufferedHttpEntity(response.getEntity());
} catch (java.io.IOException ex) {
if (spec.getRetryConfiguration() == null || !isRetryableClientException(ex)) {
throw ex;
}
LOG.error("Caught ES timeout, retrying", ex);
}

if (spec.getRetryConfiguration() != null
&& (response == null
|| responseEntity == null
|| spec.getRetryConfiguration().getRetryPredicate().test(responseEntity))) {
if (responseEntity != null
&& spec.getRetryConfiguration().getRetryPredicate().test(responseEntity)) {
LOG.warn("ES Cluster is responding with HTP 429 - TOO_MANY_REQUESTS.");
}
responseEntity = handleRetry("POST", endPoint, Collections.emptyMap(), requestBody);
}

List<Document> responses =
Expand All @@ -2584,39 +2570,40 @@ private List<Document> flushBatch() throws IOException, InterruptedException {
.collect(Collectors.toList());
}

/** retry request based on retry configuration policy. */
private HttpEntity handleRetry(
/** Perform and retry the request based on the retry configuration policy. */
private HttpEntity performRequestWithRetry(
String method, String endpoint, Map<String, String> params, HttpEntity requestBody)
throws IOException, InterruptedException {
Response response;
HttpEntity responseEntity = null;
RetryConfiguration.RetryPredicate predicate =
Objects.requireNonNull(spec.getRetryConfiguration()).getRetryPredicate();
Sleeper sleeper = Sleeper.DEFAULT;
BackOff backoff = retryBackoff.backoff();
int attempt = 0;
int attempt = -1;
// while retry policy exists
while (BackOffUtils.next(sleeper, backoff)) {
LOG.warn(RETRY_ATTEMPT_LOG, ++attempt);
do {
if (++attempt > 0) {
LOG.warn(RETRY_ATTEMPT_LOG, attempt);
}
try {
Request request = new Request(method, endpoint);
request.addParameters(params);
request.setEntity(requestBody);
response = restClient.performRequest(request);
responseEntity = new BufferedHttpEntity(response.getEntity());
Response response = restClient.performRequest(request);
return new BufferedHttpEntity(response.getEntity());
} catch (ResponseException ex) {
if (predicate.test(ex.getResponse().getStatusLine().getStatusCode())) {
LOG.warn("ES Cluster is responding with HTTP 429 - TOO_MANY_REQUESTS.", ex);
} else {
throw ex;
}
} catch (java.io.IOException ex) {
if (isRetryableClientException(ex)) {
LOG.error("Caught ES timeout, retrying", ex);
continue;
LOG.warn("Caught ES timeout, retrying", ex);
} else {
throw ex;
}
}
// if response has no 429 errors
if (!Objects.requireNonNull(spec.getRetryConfiguration())
.getRetryPredicate()
.test(responseEntity)) {
return responseEntity;
} else {
LOG.warn("ES Cluster is responding with HTP 429 - TOO_MANY_REQUESTS.");
}
}
} while (BackOffUtils.next(sleeper, backoff));
throw new IOException(String.format(RETRY_FAILED_LOG, attempt));
}

Expand Down