diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java index a68d08c38044..efe2d61dcb65 100644 --- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java +++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java @@ -243,11 +243,6 @@ public void testReadWithMetadata() throws Exception { elasticsearchIOTestCommon.testReadWithMetadata(); } - @Test - public void testDefaultRetryPredicate() throws IOException { - elasticsearchIOTestCommon.testDefaultRetryPredicate(client); - } - @Test public void testWriteRetry() throws Throwable { elasticsearchIOTestCommon.setExpectedException(expectedException); diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java index be98bfe16e81..0354e6f8a4a6 100644 --- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java +++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java @@ -236,11 +236,6 @@ public void testReadWithMetadata() throws Exception { elasticsearchIOTestCommon.testReadWithMetadata(); } - @Test - public void testDefaultRetryPredicate() throws IOException { - elasticsearchIOTestCommon.testDefaultRetryPredicate(client); - } - @Test public void testWriteRetry() throws Throwable { elasticsearchIOTestCommon.setExpectedException(expectedException); diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-7/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-7/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java index aab09f1b962a..d1b5dfe7fdca 100644 --- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-7/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java +++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-7/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java @@ -237,11 +237,6 @@ public void testReadWithMetadata() throws Exception { elasticsearchIOTestCommon.testReadWithMetadata(); } - @Test - public void testDefaultRetryPredicate() throws IOException { - elasticsearchIOTestCommon.testDefaultRetryPredicate(client); - } - @Test public void testWriteRetry() throws Throwable { elasticsearchIOTestCommon.setExpectedException(expectedException); diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-8/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-8/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java index 6bf96360d533..9b813ece95b8 100644 --- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-8/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java +++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-8/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java @@ -237,11 +237,6 @@ public void testReadWithMetadata() throws Exception { elasticsearchIOTestCommon.testReadWithMetadata(); } - @Test - public void testDefaultRetryPredicate() throws IOException { - elasticsearchIOTestCommon.testDefaultRetryPredicate(client); - } - @Test public void testWriteRetry() throws Throwable { elasticsearchIOTestCommon.setExpectedException(expectedException); diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java index 117e004526a7..c51aec63ad85 100644 --- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java +++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java @@ -22,19 +22,9 @@ import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.ConnectionConfiguration; import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.DocToBulk; import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.Read; -import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.RetryConfiguration.DEFAULT_RETRY_PREDICATE; import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.Write; import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.getBackendVersion; -import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestUtils.FAMOUS_SCIENTISTS; -import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestUtils.INVALID_DOCS_IDS; -import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestUtils.NUM_SCIENTISTS; -import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestUtils.SCRIPT_SOURCE; -import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestUtils.countByMatch; -import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestUtils.countByScientistName; -import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestUtils.flushAndRefreshAllIndices; -import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestUtils.insertTestDocuments; -import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestUtils.mapToInputId; -import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestUtils.refreshIndexAndGetCurrentNumDocs; +import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestUtils.*; import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource; import static org.apache.beam.sdk.values.TypeDescriptors.integers; import static org.hamcrest.MatcherAssert.assertThat; @@ -43,7 +33,6 @@ import static org.hamcrest.core.Is.is; import static org.hamcrest.core.Is.isA; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -57,7 +46,6 @@ import java.io.Serializable; import java.nio.charset.StandardCharsets; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; @@ -95,11 +83,6 @@ import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.TimestampedValue; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; -import org.apache.http.HttpEntity; -import org.apache.http.entity.ContentType; -import org.apache.http.nio.entity.NStringEntity; -import org.elasticsearch.client.Request; -import org.elasticsearch.client.Response; import org.elasticsearch.client.RestClient; import org.hamcrest.CustomMatcher; import org.hamcrest.Description; @@ -118,21 +101,10 @@ class ElasticsearchIOTestCommon implements Serializable { private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchIOTestCommon.class); - private static final RetryPredicate CUSTOM_RETRY_PREDICATE = new DefaultRetryPredicate(400); + private static final RetryPredicate CUSTOM_RETRY_PREDICATE = new DefaultRetryPredicate(405); private static final int EXPECTED_RETRIES = 2; private static final int MAX_ATTEMPTS = 3; - private static final String[] BAD_FORMATTED_DOC = {"{ \"x\" :a,\"y\":\"ab\" }"}; - private static final String OK_REQUEST = - "{ \"index\" : { \"_index\" : \"test\", \"_type\" : \"doc\", \"_id\" : \"1\" } }\n" - + "{ \"field1\" : 1 }\n"; - private static final String OK_REQUEST_NO_TYPE = - "{ \"index\" : { \"_index\" : \"test\", \"_id\" : \"1\" } }\n" + "{ \"field1\" : 1 }\n"; - private static final String BAD_REQUEST = - "{ \"index\" : { \"_index\" : \"test\", \"_type\" : \"doc\", \"_id\" : \"1\" } }\n" - + "{ \"field1\" : @ }\n"; - private static final String BAD_REQUEST_NO_TYPE = - "{ \"index\" : { \"_index\" : \"test\", \"_id\" : \"1\" } }\n" + "{ \"field1\" : @ }\n"; static String getEsIndex() { return "beam" + Thread.currentThread().getId(); @@ -1066,32 +1038,9 @@ public Void apply(Iterable input) { } } - /** Test that the default predicate correctly parses chosen error code. */ - void testDefaultRetryPredicate(RestClient restClient) throws IOException { - HttpEntity entity1, entity2; - - if (getBackendVersion(restClient) > 7) { - entity1 = new NStringEntity(BAD_REQUEST_NO_TYPE, ContentType.APPLICATION_JSON); - entity2 = new NStringEntity(OK_REQUEST_NO_TYPE, ContentType.APPLICATION_JSON); - } else { - entity1 = new NStringEntity(BAD_REQUEST, ContentType.APPLICATION_JSON); - entity2 = new NStringEntity(OK_REQUEST, ContentType.APPLICATION_JSON); - } - - Request request = new Request("POST", "/_bulk"); - request.addParameters(Collections.emptyMap()); - request.setEntity(entity1); - Response response1 = restClient.performRequest(request); - assertTrue(CUSTOM_RETRY_PREDICATE.test(response1.getEntity())); - - request.setEntity(entity2); - Response response2 = restClient.performRequest(request); - assertFalse(DEFAULT_RETRY_PREDICATE.test(response2.getEntity())); - } - /** * Test that retries are invoked when Elasticsearch returns a specific error code. We invoke this - * by issuing corrupt data and retrying on the `400` error code. Normal behaviour is to retry on + * by using empty endpoint and retrying on the `405` error code. Normal behaviour is to retry on * `429` only but that is difficult to simulate reliably. The logger is used to verify expected * behavior. */ @@ -1104,11 +1053,14 @@ void testWriteRetry() throws Throwable { ElasticsearchIO.Write write = ElasticsearchIO.write() - .withConnectionConfiguration(connectionConfiguration) + .withConnectionConfiguration(connectionConfiguration.withInvalidBulkEndpoint()) .withRetryConfiguration( ElasticsearchIO.RetryConfiguration.create(MAX_ATTEMPTS, Duration.millis(35000)) .withRetryPredicate(CUSTOM_RETRY_PREDICATE)); - pipeline.apply(Create.of(Arrays.asList(BAD_FORMATTED_DOC))).apply(write); + List data = + ElasticsearchIOTestUtils.createDocuments( + numDocs, ElasticsearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS); + pipeline.apply(Create.of(data)).apply(write); pipeline.run(); } diff --git a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java index f4b3447936b4..fde5e5f6d769 100644 --- a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java +++ b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java @@ -110,6 +110,7 @@ import org.checkerframework.checker.nullness.qual.Nullable; import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; +import org.elasticsearch.client.ResponseException; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestClientBuilder; import org.joda.time.Duration; @@ -343,6 +344,8 @@ public abstract static class ConnectionConfiguration implements Serializable { public abstract boolean isTrustSelfSignedCerts(); + abstract boolean isInvalidBulkEndpoint(); + abstract Builder builder(); @AutoValue.Builder @@ -373,6 +376,8 @@ abstract static class Builder { abstract Builder setTrustSelfSignedCerts(boolean trustSelfSignedCerts); + abstract Builder setInvalidBulkEndpoint(boolean invalidBulkEndpoint); + abstract ConnectionConfiguration build(); } @@ -394,6 +399,7 @@ public static ConnectionConfiguration create(String[] addresses, String index, S .setIndex(index) .setType(type) .setTrustSelfSignedCerts(false) + .setInvalidBulkEndpoint(false) .build(); } @@ -413,6 +419,7 @@ public static ConnectionConfiguration create(String[] addresses, String index) { .setIndex(index) .setType("") .setTrustSelfSignedCerts(false) + .setInvalidBulkEndpoint(false) .build(); } @@ -430,6 +437,7 @@ public static ConnectionConfiguration create(String[] addresses) { .setIndex("") .setType("") .setTrustSelfSignedCerts(false) + .setInvalidBulkEndpoint(false) .build(); } @@ -468,6 +476,9 @@ public String getPrefixedEndpoint(String endpoint) { } public String getBulkEndPoint() { + if (isInvalidBulkEndpoint()) { + return ""; + } return getPrefixedEndpoint("_bulk"); } @@ -642,6 +653,12 @@ public ConnectionConfiguration withConnectTimeout(Integer connectTimeout) { return builder().setConnectTimeout(connectTimeout).build(); } + // Exposed only to allow tests to easily simulate server errors + @VisibleForTesting + ConnectionConfiguration withInvalidBulkEndpoint() { + return builder().setInvalidBulkEndpoint(true).build(); + } + private void populateDisplayData(DisplayData.Builder builder) { builder.add(DisplayData.item("address", getAddresses().toString())); builder.add(DisplayData.item("index", getIndex())); @@ -1204,7 +1221,7 @@ RetryConfiguration withRetryPredicate(RetryPredicate predicate) { * the requests to the Elasticsearch server if the {@link RetryConfiguration} permits it. */ @FunctionalInterface - interface RetryPredicate extends Predicate, Serializable {} + interface RetryPredicate extends Predicate, Serializable {} /** * This is the default predicate used to test if a failed ES operation should be retried. A @@ -1224,26 +1241,9 @@ static class DefaultRetryPredicate implements RetryPredicate { this(429); } - /** Returns true if the response has the error code for any mutation. */ - private static boolean errorCodePresent(HttpEntity responseEntity, int errorCode) { - try { - JsonNode json = parseResponse(responseEntity); - if (json.path("errors").asBoolean()) { - for (JsonNode item : json.path("items")) { - if (item.findValue("status").asInt() == errorCode) { - return true; - } - } - } - } catch (IOException e) { - LOG.warn("Could not extract error codes from responseEntity {}", responseEntity); - } - return false; - } - @Override - public boolean test(HttpEntity responseEntity) { - return errorCodePresent(responseEntity, errorCode); + public boolean test(Integer statusCode) { + return this.errorCode == statusCode; } } } @@ -2535,8 +2535,7 @@ private List flushBatch() throws IOException, InterruptedException { bulkRequest.append(doc.getBulkDirective()); } - Response response = null; - HttpEntity responseEntity = null; + HttpEntity responseEntity; // Elasticsearch will default to the index/type provided the {@link // ConnectionConfiguration} if none are set in the document meta (i.e. @@ -2546,28 +2545,15 @@ private List flushBatch() throws IOException, InterruptedException { HttpEntity requestBody = new NStringEntity(bulkRequest.toString(), ContentType.APPLICATION_JSON); - try { + if (spec.getRetryConfiguration() != null) { + responseEntity = + performRequestWithRetry("POST", endPoint, Collections.emptyMap(), requestBody); + } else { Request request = new Request("POST", endPoint); request.addParameters(Collections.emptyMap()); request.setEntity(requestBody); - response = restClient.performRequest(request); + Response response = restClient.performRequest(request); responseEntity = new BufferedHttpEntity(response.getEntity()); - } catch (java.io.IOException ex) { - if (spec.getRetryConfiguration() == null || !isRetryableClientException(ex)) { - throw ex; - } - LOG.error("Caught ES timeout, retrying", ex); - } - - if (spec.getRetryConfiguration() != null - && (response == null - || responseEntity == null - || spec.getRetryConfiguration().getRetryPredicate().test(responseEntity))) { - if (responseEntity != null - && spec.getRetryConfiguration().getRetryPredicate().test(responseEntity)) { - LOG.warn("ES Cluster is responding with HTP 429 - TOO_MANY_REQUESTS."); - } - responseEntity = handleRetry("POST", endPoint, Collections.emptyMap(), requestBody); } List responses = @@ -2584,39 +2570,40 @@ private List flushBatch() throws IOException, InterruptedException { .collect(Collectors.toList()); } - /** retry request based on retry configuration policy. */ - private HttpEntity handleRetry( + /** performe and retry request based on retry configuration policy. */ + private HttpEntity performRequestWithRetry( String method, String endpoint, Map params, HttpEntity requestBody) throws IOException, InterruptedException { - Response response; - HttpEntity responseEntity = null; + RetryConfiguration.RetryPredicate predicate = + Objects.requireNonNull(spec.getRetryConfiguration()).getRetryPredicate(); Sleeper sleeper = Sleeper.DEFAULT; BackOff backoff = retryBackoff.backoff(); - int attempt = 0; + int attempt = -1; // while retry policy exists - while (BackOffUtils.next(sleeper, backoff)) { - LOG.warn(RETRY_ATTEMPT_LOG, ++attempt); + do { + if (++attempt > 0) { + LOG.warn(RETRY_ATTEMPT_LOG, attempt); + } try { Request request = new Request(method, endpoint); request.addParameters(params); request.setEntity(requestBody); - response = restClient.performRequest(request); - responseEntity = new BufferedHttpEntity(response.getEntity()); + Response response = restClient.performRequest(request); + return new BufferedHttpEntity(response.getEntity()); + } catch (ResponseException ex) { + if (predicate.test(ex.getResponse().getStatusLine().getStatusCode())) { + LOG.warn("ES Cluster is responding with HTTP 429 - TOO_MANY_REQUESTS.", ex); + } else { + throw ex; + } } catch (java.io.IOException ex) { if (isRetryableClientException(ex)) { - LOG.error("Caught ES timeout, retrying", ex); - continue; + LOG.warn("Caught ES timeout, retrying", ex); + } else { + throw ex; } } - // if response has no 429 errors - if (!Objects.requireNonNull(spec.getRetryConfiguration()) - .getRetryPredicate() - .test(responseEntity)) { - return responseEntity; - } else { - LOG.warn("ES Cluster is responding with HTP 429 - TOO_MANY_REQUESTS."); - } - } + } while (BackOffUtils.next(sleeper, backoff)); throw new IOException(String.format(RETRY_FAILED_LOG, attempt)); }