diff --git a/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchClient.java b/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchClient.java index 7303a67cc..7024a66dc 100644 --- a/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchClient.java +++ b/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchClient.java @@ -491,7 +491,8 @@ private T callWithRetries(String description, Callable function) { description, function, config.maxRetries() + 1, - config.retryBackoffMs() + config.retryBackoffMs(), + config.maxRetryDurationMs() ); } diff --git a/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java b/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java index 51c7d4c85..5cb64d6d6 100644 --- a/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java +++ b/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java @@ -33,6 +33,8 @@ import org.apache.kafka.common.config.ConfigDef.Width; import org.apache.kafka.common.config.types.Password; import org.elasticsearch.common.unit.ByteSizeValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static org.apache.kafka.common.config.ConfigDef.Range.between; import static org.apache.kafka.common.config.SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG; @@ -46,6 +48,13 @@ import static org.apache.kafka.common.config.SslConfigs.addClientSslSupport; public class ElasticsearchSinkConnectorConfig extends AbstractConfig { + private static final Logger log = LoggerFactory.getLogger(ElasticsearchSinkConnectorConfig.class); + + /** + * Default value for maximum retry time + */ + static final long MAX_RETRY_TIME_MS = TimeUnit.HOURS.toMillis(24); + // Connector group public static final String CONNECTION_URL_CONFIG = "connection.url"; private static final String CONNECTION_URL_DOC = @@ -137,6 +146,13 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig { private static final String MAX_RETRIES_DISPLAY = "Max Retries"; private static final int MAX_RETRIES_DEFAULT = 5; + public static final String MAX_RETRY_DURATION_MS_CONFIG = "max.retry.duration.ms"; + private static final String MAX_RETRY_DURATION_MS_DOC = + "The maximum duration of retry time in ms. Default value is 24 hours. " + + "Once this value is reached, the provided duration will be used for all consequent retries."; + private static final String MAX_RETRY_DURATION_MS_DISPLAY = "Retry Backoff Max (ms)"; + private static final long MAX_RETRY_DURATION_MS_DEFAULT = MAX_RETRY_TIME_MS; + public static final String RETRY_BACKOFF_MS_CONFIG = "retry.backoff.ms"; private static final String RETRY_BACKOFF_MS_DOC = "How long to wait in milliseconds before attempting the first retry of a failed indexing " @@ -554,6 +570,17 @@ private static void addConnectorConfigs(ConfigDef configDef) { ++order, Width.SHORT, RETRY_BACKOFF_MS_DISPLAY + ).define( + MAX_RETRY_DURATION_MS_CONFIG, + Type.LONG, + MAX_RETRY_DURATION_MS_DEFAULT, + between(0, TimeUnit.DAYS.toMillis(1)), + Importance.LOW, + MAX_RETRY_DURATION_MS_DOC, + CONNECTOR_GROUP, + ++order, + Width.SHORT, + MAX_RETRY_DURATION_MS_DISPLAY ).define( CONNECTION_COMPRESSION_CONFIG, Type.BOOLEAN, @@ -1034,6 +1061,18 @@ public long retryBackoffMs() { return getLong(RETRY_BACKOFF_MS_CONFIG); } + public long maxRetryDurationMs() { + long maxRetryDurationMs = getLong(MAX_RETRY_DURATION_MS_CONFIG); + + if (maxRetryDurationMs <= retryBackoffMs()) { + log.warn("Value at \"max.retry.duration.ms\" should be greater than value at \"retry.backoff.ms\". " + + "Using default value of " + MAX_RETRY_DURATION_MS_DEFAULT + " ms for \"max.retry.duration.ms\""); + maxRetryDurationMs = MAX_RETRY_DURATION_MS_DEFAULT; + } + + return maxRetryDurationMs; + } + private SecurityProtocol securityProtocol() { return SecurityProtocol.valueOf(getString(SECURITY_PROTOCOL_CONFIG).toUpperCase()); } diff --git a/src/main/java/io/confluent/connect/elasticsearch/RetryUtil.java b/src/main/java/io/confluent/connect/elasticsearch/RetryUtil.java index 7234da95b..db21e58cd 100644 --- a/src/main/java/io/confluent/connect/elasticsearch/RetryUtil.java +++ b/src/main/java/io/confluent/connect/elasticsearch/RetryUtil.java @@ -17,7 +17,6 @@ import java.util.concurrent.Callable; import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.TimeUnit; import org.apache.kafka.common.utils.Time; import org.apache.kafka.connect.errors.ConnectException; @@ -39,11 +38,6 @@ public class RetryUtil { private static final Logger log = LoggerFactory.getLogger(RetryUtil.class); - /** - * An arbitrary absolute maximum practical retry time. - */ - public static final long MAX_RETRY_TIME_MS = TimeUnit.HOURS.toMillis(24); - /** * Compute the time to sleep using exponential backoff with jitter. This method computes the * normal exponential backoff as {@code initialRetryBackoffMs << retryAttempt}, and then @@ -52,34 +46,38 @@ public class RetryUtil { * @param retryAttempts the number of previous retry attempts; must be non-negative * @param initialRetryBackoffMs the initial time to wait before retrying; assumed to * be 0 if value is negative + * @param maxRetryDurationMs The maximum value of the retry duration. * @return the non-negative time in milliseconds to wait before the next retry attempt, * or 0 if {@code initialRetryBackoffMs} is negative */ public static long computeRandomRetryWaitTimeInMillis(int retryAttempts, - long initialRetryBackoffMs) { + long initialRetryBackoffMs, + long maxRetryDurationMs) { if (initialRetryBackoffMs < 0) { return 0; } if (retryAttempts < 0) { return initialRetryBackoffMs; } - long maxRetryTime = computeRetryWaitTimeInMillis(retryAttempts, initialRetryBackoffMs); + long maxRetryTime = computeRetryWaitTimeInMillis(retryAttempts, initialRetryBackoffMs, maxRetryDurationMs); return ThreadLocalRandom.current().nextLong(0, maxRetryTime); } /** * Compute the time to sleep using exponential backoff. This method computes the normal * exponential backoff as {@code initialRetryBackoffMs << retryAttempt}, bounded to always - * be less than {@link #MAX_RETRY_TIME_MS}. + * be less than value provided in maxRetryDurationMs. * * @param retryAttempts the number of previous retry attempts; must be non-negative * @param initialRetryBackoffMs the initial time to wait before retrying; assumed to be 0 * if value is negative + * @param maxRetryDurationMs The maximum value of the retry duration. * @return the non-negative time in milliseconds to wait before the next retry attempt, * or 0 if {@code initialRetryBackoffMs} is negative */ public static long computeRetryWaitTimeInMillis(int retryAttempts, - long initialRetryBackoffMs) { + long initialRetryBackoffMs, + long maxRetryDurationMs) { if (initialRetryBackoffMs < 0) { return 0; } @@ -88,10 +86,10 @@ public static long computeRetryWaitTimeInMillis(int retryAttempts, } if (retryAttempts > 32) { // This would overflow the exponential algorithm ... - return MAX_RETRY_TIME_MS; + return maxRetryDurationMs; } long result = initialRetryBackoffMs << retryAttempts; - return result < 0L ? MAX_RETRY_TIME_MS : Math.min(MAX_RETRY_TIME_MS, result); + return result < 0L ? maxRetryDurationMs : Math.min(maxRetryDurationMs, result); } /** @@ -106,6 +104,7 @@ public static long computeRetryWaitTimeInMillis(int retryAttempts, * @param function the function to call; may not be null * @param maxTotalAttempts maximum number of total attempts, including the first call * @param initialBackoff the initial backoff in ms before retrying + * @param maxRetryDurationMs The maximum duration for retrying a request * @param the return type of the function to retry * @return the function's return value * @throws ConnectException if the function failed after retries @@ -114,9 +113,10 @@ public static T callWithRetries( String description, Callable function, int maxTotalAttempts, - long initialBackoff + long initialBackoff, + long maxRetryDurationMs ) { - return callWithRetries(description, function, maxTotalAttempts, initialBackoff, Time.SYSTEM); + return callWithRetries(description, function, maxTotalAttempts, initialBackoff, maxRetryDurationMs, Time.SYSTEM); } /** @@ -131,6 +131,7 @@ public static T callWithRetries( * @param function the function to call; may not be null * @param maxTotalAttempts maximum number of attempts * @param initialBackoff the initial backoff in ms before retrying + * @param maxRetryDurationMs The maximum duration for retrying a request * @param clock the clock to use for waiting * @param the return type of the function to retry * @return the function's return value @@ -141,6 +142,7 @@ protected static T callWithRetries( Callable function, int maxTotalAttempts, long initialBackoff, + long maxRetryDurationMs, Time clock ) { assert description != null; @@ -166,7 +168,7 @@ protected static T callWithRetries( } // Otherwise it is retriable and we should retry - long backoff = computeRandomRetryWaitTimeInMillis(attempt, initialBackoff); + long backoff = computeRandomRetryWaitTimeInMillis(attempt, initialBackoff, maxRetryDurationMs); log.warn("Failed to {} due to {}. Retrying attempt ({}/{}) after backoff of {} ms", description, e.getCause(), attempt, maxTotalAttempts, backoff); diff --git a/src/test/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfigTest.java b/src/test/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfigTest.java index 9e1123b5f..d03629240 100644 --- a/src/test/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfigTest.java +++ b/src/test/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfigTest.java @@ -226,6 +226,27 @@ public void shouldAllowValidKeytab() throws IOException { keytab.toFile().delete(); } + @Test + public void maxRetryDurationValueCheck() { + // Allowed + props.put(RETRY_BACKOFF_MS_CONFIG, "10"); + props.put(MAX_RETRY_DURATION_MS_CONFIG, "100"); + ElasticsearchSinkConnectorConfig config = new ElasticsearchSinkConnectorConfig(props); + assertEquals(100, config.maxRetryDurationMs()); + + // maxRetryDuration = initialBackoff. Not allowed. Takes default + props.put(RETRY_BACKOFF_MS_CONFIG, "100"); + props.put(MAX_RETRY_DURATION_MS_CONFIG, "100"); + config = new ElasticsearchSinkConnectorConfig(props); + assertEquals(MAX_RETRY_TIME_MS, config.maxRetryDurationMs()); + + // maxRetryDuration < initialBackoff. Not allowed. Takes default + props.put(RETRY_BACKOFF_MS_CONFIG, "100"); + props.put(MAX_RETRY_DURATION_MS_CONFIG, "10"); + config = new ElasticsearchSinkConnectorConfig(props); + assertEquals(MAX_RETRY_TIME_MS, config.maxRetryDurationMs()); + } + public static Map addNecessaryProps(Map props) { if (props == null) { props = new HashMap<>(); @@ -233,6 +254,7 @@ public static Map addNecessaryProps(Map props) { props.put(ElasticsearchSinkConnectorConfig.CONNECTION_URL_CONFIG, "http://localhost:8080"); return props; } + @Test public void testLogSensitiveData(){ ElasticsearchSinkConnectorConfig config = new ElasticsearchSinkConnectorConfig(props); diff --git a/src/test/java/io/confluent/connect/elasticsearch/RetryUtilTest.java b/src/test/java/io/confluent/connect/elasticsearch/RetryUtilTest.java index 45438840b..4a2b24198 100644 --- a/src/test/java/io/confluent/connect/elasticsearch/RetryUtilTest.java +++ b/src/test/java/io/confluent/connect/elasticsearch/RetryUtilTest.java @@ -15,6 +15,7 @@ package io.confluent.connect.elasticsearch; import java.io.IOException; + import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.connect.errors.ConnectException; import org.junit.Before; @@ -38,33 +39,46 @@ public void setup() { @Test public void computeRetryBackoffForNegativeAttempts() { - assertComputeRetryInRange(0, 10L); - assertEquals(10L, RetryUtil.computeRandomRetryWaitTimeInMillis(-1, 10L)); + assertComputeRetryInRange(0, 10L, 10000L); + assertEquals(10L, RetryUtil.computeRandomRetryWaitTimeInMillis(-1, 10L, 10000L)); } @Test public void computeRetryBackoffForValidRanges() { - assertComputeRetryInRange(10, 10L); - assertComputeRetryInRange(10, 100L); - assertComputeRetryInRange(10, 1000L); - assertComputeRetryInRange(100, 1000L); + assertComputeRetryInRange(10, 10L, 10000L); + assertComputeRetryInRange(10, 100L, 10000L); + assertComputeRetryInRange(10, 1000L, 10000L); + assertComputeRetryInRange(100, 1000L, 10000L); } @Test public void computeRetryBackoffForNegativeRetryTimes() { - assertComputeRetryInRange(1, -100L); - assertComputeRetryInRange(10, -100L); - assertComputeRetryInRange(100, -100L); + assertComputeRetryInRange(1, -100L, 10000L); + assertComputeRetryInRange(10, -100L, 10000L); + assertComputeRetryInRange(100, -100L, 10000L); } @Test public void computeNonRandomRetryTimes() { - assertEquals(100L, RetryUtil.computeRetryWaitTimeInMillis(0, 100L)); - assertEquals(200L, RetryUtil.computeRetryWaitTimeInMillis(1, 100L)); - assertEquals(400L, RetryUtil.computeRetryWaitTimeInMillis(2, 100L)); - assertEquals(800L, RetryUtil.computeRetryWaitTimeInMillis(3, 100L)); - assertEquals(1600L, RetryUtil.computeRetryWaitTimeInMillis(4, 100L)); - assertEquals(3200L, RetryUtil.computeRetryWaitTimeInMillis(5, 100L)); + assertEquals(100L, RetryUtil.computeRetryWaitTimeInMillis(0, 100L, 10000L)); + assertEquals(200L, RetryUtil.computeRetryWaitTimeInMillis(1, 100L, 10000L)); + assertEquals(400L, RetryUtil.computeRetryWaitTimeInMillis(2, 100L, 10000L)); + assertEquals(800L, RetryUtil.computeRetryWaitTimeInMillis(3, 100L, 10000L)); + assertEquals(1600L, RetryUtil.computeRetryWaitTimeInMillis(4, 100L, 10000L)); + assertEquals(3200L, RetryUtil.computeRetryWaitTimeInMillis(5, 100L, 10000L)); + } + + @Test + public void computeNonRandomRetryTimes_ExhaustingMaxRetryTimesOf32() { + assertEquals(ElasticsearchSinkConnectorConfig.MAX_RETRY_TIME_MS, RetryUtil.computeRetryWaitTimeInMillis(33, 1L, ElasticsearchSinkConnectorConfig.MAX_RETRY_TIME_MS)); + } + + @Test + public void computeNonRandomRetryTimes_ExhaustingMaxBackoffTime() { + assertEquals(100L, RetryUtil.computeRetryWaitTimeInMillis(0, 100L, 300L)); + assertEquals(200L, RetryUtil.computeRetryWaitTimeInMillis(1, 100L, 300L)); + assertEquals(300L, RetryUtil.computeRetryWaitTimeInMillis(2, 100L, 300L)); + assertEquals(300L, RetryUtil.computeRetryWaitTimeInMillis(4, 100L, 300L)); } @Test @@ -72,7 +86,7 @@ public void testCallWithRetriesNoRetries() throws Exception { MockTime mockClock = new MockTime(); long expectedTime = mockClock.milliseconds(); - assertTrue(RetryUtil.callWithRetries("test", () -> testFunction(0), 3, 100, mockClock)); + assertTrue(RetryUtil.callWithRetries("test", () -> testFunction(0), 3, 100, 10000L, mockClock)); assertEquals(expectedTime, mockClock.milliseconds()); } @@ -80,7 +94,7 @@ public void testCallWithRetriesNoRetries() throws Exception { public void testCallWithRetriesSomeRetries() throws Exception { MockTime mockClock = spy(new MockTime()); - assertTrue(RetryUtil.callWithRetries("test", () -> testFunction(2), 3, 100, mockClock)); + assertTrue(RetryUtil.callWithRetries("test", () -> testFunction(2), 3, 100, 10000L, mockClock)); verify(mockClock, times(2)).sleep(anyLong()); } @@ -88,7 +102,7 @@ public void testCallWithRetriesSomeRetries() throws Exception { public void testCallWithRetriesExhaustedRetries() throws Exception { MockTime mockClock = new MockTime(); - assertTrue(RetryUtil.callWithRetries("test", () -> testFunction(4), 3, 100, mockClock)); + assertTrue(RetryUtil.callWithRetries("test", () -> testFunction(4), 3, 100,10000L, mockClock)); verify(mockClock, times(3)).sleep(anyLong()); } @@ -101,11 +115,11 @@ private boolean testFunction(int timesToThrow) throws IOException { return true; } - protected void assertComputeRetryInRange(int retryAttempts, long retryBackoffMs) { - for (int i = 0; i != 20; ++i) { + protected void assertComputeRetryInRange(int retryAttempts, long retryBackoffMs, long maxRetryDurationMs) { + for (int i = 0; i < 20; ++i) { for (int retries = 0; retries <= retryAttempts; ++retries) { - long maxResult = RetryUtil.computeRetryWaitTimeInMillis(retries, retryBackoffMs); - long result = RetryUtil.computeRandomRetryWaitTimeInMillis(retries, retryBackoffMs); + long maxResult = RetryUtil.computeRetryWaitTimeInMillis(retries, retryBackoffMs, maxRetryDurationMs); + long result = RetryUtil.computeRandomRetryWaitTimeInMillis(retries, retryBackoffMs, maxRetryDurationMs); if (retryBackoffMs < 0) { assertEquals(0, result); } else {