Skip to content

Commit

Permalink
Support for a maximum retry duration
Browse files Browse the repository at this point in the history
  • Loading branch information
pricelessjunk committed Mar 2, 2024
1 parent f0c057f commit f96de85
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,8 @@ private <T> T callWithRetries(String description, Callable<T> function) {
description,
function,
config.maxRetries() + 1,
config.retryBackoffMs()
config.retryBackoffMs(),
config.maxRetryDurationMs()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import org.apache.kafka.common.config.ConfigDef.Width;
import org.apache.kafka.common.config.types.Password;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.kafka.common.config.ConfigDef.Range.between;
import static org.apache.kafka.common.config.SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG;
Expand All @@ -46,6 +48,13 @@
import static org.apache.kafka.common.config.SslConfigs.addClientSslSupport;

public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
private static final Logger log = LoggerFactory.getLogger(ElasticsearchSinkConnectorConfig.class);

/**
* Default value for maximum retry time
*/
static final long MAX_RETRY_TIME_MS = TimeUnit.HOURS.toMillis(24);

// Connector group
public static final String CONNECTION_URL_CONFIG = "connection.url";
private static final String CONNECTION_URL_DOC =
Expand Down Expand Up @@ -137,6 +146,13 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
private static final String MAX_RETRIES_DISPLAY = "Max Retries";
private static final int MAX_RETRIES_DEFAULT = 5;

public static final String MAX_RETRY_DURATION_MS_CONFIG = "max.retry.duration.ms";
private static final String MAX_RETRY_DURATION_MS_DOC =
"The maximum duration of retry time in ms. Default value is 24 hours. " +
"Once this value is reached, the provided duration will be used for all consequent retries.";
private static final String MAX_RETRY_DURATION_MS_DISPLAY = "Retry Backoff Max (ms)";
private static final long MAX_RETRY_DURATION_MS_DEFAULT = MAX_RETRY_TIME_MS;

public static final String RETRY_BACKOFF_MS_CONFIG = "retry.backoff.ms";
private static final String RETRY_BACKOFF_MS_DOC =
"How long to wait in milliseconds before attempting the first retry of a failed indexing "
Expand Down Expand Up @@ -554,6 +570,17 @@ private static void addConnectorConfigs(ConfigDef configDef) {
++order,
Width.SHORT,
RETRY_BACKOFF_MS_DISPLAY
).define(
MAX_RETRY_DURATION_MS_CONFIG,
Type.LONG,
MAX_RETRY_DURATION_MS_DEFAULT,
between(0, TimeUnit.DAYS.toMillis(1)),
Importance.LOW,
MAX_RETRY_DURATION_MS_DOC,
CONNECTOR_GROUP,
++order,
Width.SHORT,
MAX_RETRY_DURATION_MS_DISPLAY
).define(
CONNECTION_COMPRESSION_CONFIG,
Type.BOOLEAN,
Expand Down Expand Up @@ -1034,6 +1061,18 @@ public long retryBackoffMs() {
return getLong(RETRY_BACKOFF_MS_CONFIG);
}

/**
 * Returns the effective maximum retry duration in milliseconds.
 *
 * <p>The configured {@code max.retry.duration.ms} must be strictly greater than
 * {@code retry.backoff.ms}, since the retry backoff grows from the initial backoff up to
 * this cap. When the configured value is less than or equal to the backoff, a warning is
 * logged and the default maximum retry duration is used instead.
 *
 * @return the maximum retry duration in ms to use for computing retry backoff
 */
public long maxRetryDurationMs() {
  long maxRetryDurationMs = getLong(MAX_RETRY_DURATION_MS_CONFIG);

  if (maxRetryDurationMs <= retryBackoffMs()) {
    // Use SLF4J parameterized logging (no eager string concatenation) and reference the
    // config keys via their constants so the message can never drift from the key names.
    log.warn("Value at \"{}\" should be greater than value at \"{}\". "
            + "Using default value of {} ms for \"{}\"",
        MAX_RETRY_DURATION_MS_CONFIG, RETRY_BACKOFF_MS_CONFIG,
        MAX_RETRY_DURATION_MS_DEFAULT, MAX_RETRY_DURATION_MS_CONFIG);
    maxRetryDurationMs = MAX_RETRY_DURATION_MS_DEFAULT;
  }

  return maxRetryDurationMs;
}

/**
 * Resolves the configured security protocol.
 *
 * <p>The raw config value is upper-cased before the enum lookup so that lower- or
 * mixed-case values map to the corresponding {@link SecurityProtocol} constant.
 */
private SecurityProtocol securityProtocol() {
  String protocolName = getString(SECURITY_PROTOCOL_CONFIG).toUpperCase();
  return SecurityProtocol.valueOf(protocolName);
}
Expand Down
30 changes: 15 additions & 15 deletions src/main/java/io/confluent/connect/elasticsearch/RetryUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import java.util.concurrent.Callable;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.errors.ConnectException;
Expand All @@ -39,11 +38,6 @@ public class RetryUtil {

private static final Logger log = LoggerFactory.getLogger(RetryUtil.class);

/**
* An arbitrary absolute maximum practical retry time.
*/
public static final long MAX_RETRY_TIME_MS = TimeUnit.HOURS.toMillis(24);

/**
* Compute the time to sleep using exponential backoff with jitter. This method computes the
* normal exponential backoff as {@code initialRetryBackoffMs << retryAttempt}, and then
Expand All @@ -52,34 +46,38 @@ public class RetryUtil {
* @param retryAttempts the number of previous retry attempts; must be non-negative
* @param initialRetryBackoffMs the initial time to wait before retrying; assumed to
* be 0 if value is negative
* @param maxRetryDurationMs The maximum value of the retry duration.
* @return the non-negative time in milliseconds to wait before the next retry attempt,
* or 0 if {@code initialRetryBackoffMs} is negative
*/
public static long computeRandomRetryWaitTimeInMillis(int retryAttempts,
long initialRetryBackoffMs) {
long initialRetryBackoffMs,
long maxRetryDurationMs) {
if (initialRetryBackoffMs < 0) {
return 0;
}
if (retryAttempts < 0) {
return initialRetryBackoffMs;
}
long maxRetryTime = computeRetryWaitTimeInMillis(retryAttempts, initialRetryBackoffMs);
long maxRetryTime = computeRetryWaitTimeInMillis(retryAttempts, initialRetryBackoffMs, maxRetryDurationMs);
return ThreadLocalRandom.current().nextLong(0, maxRetryTime);
}

/**
* Compute the time to sleep using exponential backoff. This method computes the normal
* exponential backoff as {@code initialRetryBackoffMs << retryAttempt}, bounded to always
* be less than {@link #MAX_RETRY_TIME_MS}.
* be less than value provided in maxRetryDurationMs.
*
* @param retryAttempts the number of previous retry attempts; must be non-negative
* @param initialRetryBackoffMs the initial time to wait before retrying; assumed to be 0
* if value is negative
* @param maxRetryDurationMs The maximum value of the retry duration.
* @return the non-negative time in milliseconds to wait before the next retry attempt,
* or 0 if {@code initialRetryBackoffMs} is negative
*/
public static long computeRetryWaitTimeInMillis(int retryAttempts,
long initialRetryBackoffMs) {
long initialRetryBackoffMs,
long maxRetryDurationMs) {
if (initialRetryBackoffMs < 0) {
return 0;
}
Expand All @@ -88,10 +86,10 @@ public static long computeRetryWaitTimeInMillis(int retryAttempts,
}
if (retryAttempts > 32) {
// This would overflow the exponential algorithm ...
return MAX_RETRY_TIME_MS;
return maxRetryDurationMs;
}
long result = initialRetryBackoffMs << retryAttempts;
return result < 0L ? MAX_RETRY_TIME_MS : Math.min(MAX_RETRY_TIME_MS, result);
return result < 0L ? maxRetryDurationMs : Math.min(maxRetryDurationMs, result);
}

/**
Expand All @@ -114,9 +112,10 @@ public static <T> T callWithRetries(
String description,
Callable<T> function,
int maxTotalAttempts,
long initialBackoff
long initialBackoff,
long backoffMaxMs
) {
return callWithRetries(description, function, maxTotalAttempts, initialBackoff, Time.SYSTEM);
return callWithRetries(description, function, maxTotalAttempts, initialBackoff, backoffMaxMs, Time.SYSTEM);
}

/**
Expand All @@ -141,6 +140,7 @@ protected static <T> T callWithRetries(
Callable<T> function,
int maxTotalAttempts,
long initialBackoff,
long backoffMaxMs,
Time clock
) {
assert description != null;
Expand All @@ -166,7 +166,7 @@ protected static <T> T callWithRetries(
}

// Otherwise it is retriable and we should retry
long backoff = computeRandomRetryWaitTimeInMillis(attempt, initialBackoff);
long backoff = computeRandomRetryWaitTimeInMillis(attempt, initialBackoff, backoffMaxMs);

log.warn("Failed to {} due to {}. Retrying attempt ({}/{}) after backoff of {} ms",
description, e.getCause(), attempt, maxTotalAttempts, backoff);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,13 +226,35 @@ public void shouldAllowValidKeytab() throws IOException {
keytab.toFile().delete();
}

@Test
public void maxRetryDurationValueCheck() {
  // Case 1: max retry duration strictly greater than the backoff -> value accepted as-is.
  props.put(RETRY_BACKOFF_MS_CONFIG, "10");
  props.put(MAX_RETRY_DURATION_MS_CONFIG, "100");
  assertEquals(100, new ElasticsearchSinkConnectorConfig(props).maxRetryDurationMs());

  // Case 2: max retry duration equal to the backoff -> rejected, default substituted.
  props.put(RETRY_BACKOFF_MS_CONFIG, "100");
  props.put(MAX_RETRY_DURATION_MS_CONFIG, "100");
  assertEquals(MAX_RETRY_TIME_MS, new ElasticsearchSinkConnectorConfig(props).maxRetryDurationMs());

  // Case 3: max retry duration below the backoff -> rejected, default substituted.
  props.put(RETRY_BACKOFF_MS_CONFIG, "100");
  props.put(MAX_RETRY_DURATION_MS_CONFIG, "10");
  assertEquals(MAX_RETRY_TIME_MS, new ElasticsearchSinkConnectorConfig(props).maxRetryDurationMs());
}

/**
 * Ensures the given property map contains the minimal settings required to construct a
 * valid connector config. Mutates and returns {@code props} when non-null; otherwise
 * allocates and returns a fresh map.
 *
 * @param props the properties to augment, or null
 * @return the augmented property map (same instance when {@code props} was non-null)
 */
public static Map<String, String> addNecessaryProps(Map<String, String> props) {
  Map<String, String> result = (props == null) ? new HashMap<>() : props;
  result.put(ElasticsearchSinkConnectorConfig.CONNECTION_URL_CONFIG, "http://localhost:8080");
  return result;
}

@Test
public void testLogSensitiveData(){
ElasticsearchSinkConnectorConfig config = new ElasticsearchSinkConnectorConfig(props);
Expand Down
58 changes: 36 additions & 22 deletions src/test/java/io/confluent/connect/elasticsearch/RetryUtilTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package io.confluent.connect.elasticsearch;

import java.io.IOException;

import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.connect.errors.ConnectException;
import org.junit.Before;
Expand All @@ -38,57 +39,70 @@ public void setup() {

@Test
public void computeRetryBackoffForNegativeAttempts() {
assertComputeRetryInRange(0, 10L);
assertEquals(10L, RetryUtil.computeRandomRetryWaitTimeInMillis(-1, 10L));
assertComputeRetryInRange(0, 10L, 10000L);
assertEquals(10L, RetryUtil.computeRandomRetryWaitTimeInMillis(-1, 10L, 10000L));
}

@Test
public void computeRetryBackoffForValidRanges() {
assertComputeRetryInRange(10, 10L);
assertComputeRetryInRange(10, 100L);
assertComputeRetryInRange(10, 1000L);
assertComputeRetryInRange(100, 1000L);
assertComputeRetryInRange(10, 10L, 10000L);
assertComputeRetryInRange(10, 100L, 10000L);
assertComputeRetryInRange(10, 1000L, 10000L);
assertComputeRetryInRange(100, 1000L, 10000L);
}

@Test
public void computeRetryBackoffForNegativeRetryTimes() {
assertComputeRetryInRange(1, -100L);
assertComputeRetryInRange(10, -100L);
assertComputeRetryInRange(100, -100L);
assertComputeRetryInRange(1, -100L, 10000L);
assertComputeRetryInRange(10, -100L, 10000L);
assertComputeRetryInRange(100, -100L, 10000L);
}

@Test
public void computeNonRandomRetryTimes() {
assertEquals(100L, RetryUtil.computeRetryWaitTimeInMillis(0, 100L));
assertEquals(200L, RetryUtil.computeRetryWaitTimeInMillis(1, 100L));
assertEquals(400L, RetryUtil.computeRetryWaitTimeInMillis(2, 100L));
assertEquals(800L, RetryUtil.computeRetryWaitTimeInMillis(3, 100L));
assertEquals(1600L, RetryUtil.computeRetryWaitTimeInMillis(4, 100L));
assertEquals(3200L, RetryUtil.computeRetryWaitTimeInMillis(5, 100L));
assertEquals(100L, RetryUtil.computeRetryWaitTimeInMillis(0, 100L, 10000L));
assertEquals(200L, RetryUtil.computeRetryWaitTimeInMillis(1, 100L, 10000L));
assertEquals(400L, RetryUtil.computeRetryWaitTimeInMillis(2, 100L, 10000L));
assertEquals(800L, RetryUtil.computeRetryWaitTimeInMillis(3, 100L, 10000L));
assertEquals(1600L, RetryUtil.computeRetryWaitTimeInMillis(4, 100L, 10000L));
assertEquals(3200L, RetryUtil.computeRetryWaitTimeInMillis(5, 100L, 10000L));
}

@Test
public void computeNonRandomRetryTimes_ExhaustingMaxRetryTimesOf32() {
  // Beyond 32 attempts the exponential shift would overflow, so the cap is returned directly.
  long cap = ElasticsearchSinkConnectorConfig.MAX_RETRY_TIME_MS;
  assertEquals(cap, RetryUtil.computeRetryWaitTimeInMillis(33, 1L, cap));
}

@Test
public void computeNonRandomRetryTimes_ExhaustingMaxBackoffTime() {
  // Exponential growth from a 100 ms initial backoff is clamped at the 300 ms ceiling.
  long[][] attemptToExpected = {{0, 100L}, {1, 200L}, {2, 300L}, {4, 300L}};
  for (long[] entry : attemptToExpected) {
    assertEquals(entry[1], RetryUtil.computeRetryWaitTimeInMillis((int) entry[0], 100L, 300L));
  }
}

@Test
public void testCallWithRetriesNoRetries() throws Exception {
MockTime mockClock = new MockTime();
long expectedTime = mockClock.milliseconds();

assertTrue(RetryUtil.callWithRetries("test", () -> testFunction(0), 3, 100, mockClock));
assertTrue(RetryUtil.callWithRetries("test", () -> testFunction(0), 3, 100, 10000L, mockClock));
assertEquals(expectedTime, mockClock.milliseconds());
}

@Test
public void testCallWithRetriesSomeRetries() throws Exception {
MockTime mockClock = spy(new MockTime());

assertTrue(RetryUtil.callWithRetries("test", () -> testFunction(2), 3, 100, mockClock));
assertTrue(RetryUtil.callWithRetries("test", () -> testFunction(2), 3, 100, 10000L, mockClock));
verify(mockClock, times(2)).sleep(anyLong());
}

@Test(expected = ConnectException.class)
public void testCallWithRetriesExhaustedRetries() throws Exception {
MockTime mockClock = new MockTime();

assertTrue(RetryUtil.callWithRetries("test", () -> testFunction(4), 3, 100, mockClock));
assertTrue(RetryUtil.callWithRetries("test", () -> testFunction(4), 3, 100,10000L, mockClock));
verify(mockClock, times(3)).sleep(anyLong());
}

Expand All @@ -101,11 +115,11 @@ private boolean testFunction(int timesToThrow) throws IOException {
return true;
}

protected void assertComputeRetryInRange(int retryAttempts, long retryBackoffMs) {
for (int i = 0; i != 20; ++i) {
protected void assertComputeRetryInRange(int retryAttempts, long retryBackoffMs, long maxRetryDurationMs) {
for (int i = 0; i < 20; ++i) {
for (int retries = 0; retries <= retryAttempts; ++retries) {
long maxResult = RetryUtil.computeRetryWaitTimeInMillis(retries, retryBackoffMs);
long result = RetryUtil.computeRandomRetryWaitTimeInMillis(retries, retryBackoffMs);
long maxResult = RetryUtil.computeRetryWaitTimeInMillis(retries, retryBackoffMs, maxRetryDurationMs);
long result = RetryUtil.computeRandomRetryWaitTimeInMillis(retries, retryBackoffMs, maxRetryDurationMs);
if (retryBackoffMs < 0) {
assertEquals(0, result);
} else {
Expand Down

0 comments on commit f96de85

Please sign in to comment.