diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/DestinationTimeoutMonitor.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/DestinationTimeoutMonitor.java index fb115d25364..1d064aa07d4 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/DestinationTimeoutMonitor.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/DestinationTimeoutMonitor.java @@ -280,12 +280,12 @@ public static class TimeoutException extends RuntimeException { public final String humanReadableThreshold; public final String humanReadableTimeSinceLastAction; - public TimeoutException(final long threshold, final long timeSinceLastAction) { + public TimeoutException(final long thresholdMs, final long timeSinceLastActionMs) { super(String.format("Last action %s ago, exceeding the threshold of %s.", - DurationFormatUtils.formatDurationWords(timeSinceLastAction, true, true), - DurationFormatUtils.formatDurationWords(threshold, true, true))); - this.humanReadableThreshold = DurationFormatUtils.formatDurationWords(threshold, true, true); - this.humanReadableTimeSinceLastAction = DurationFormatUtils.formatDurationWords(threshold, true, true); + DurationFormatUtils.formatDurationWords(timeSinceLastActionMs, true, true), + DurationFormatUtils.formatDurationWords(thresholdMs, true, true))); + this.humanReadableThreshold = DurationFormatUtils.formatDurationWords(thresholdMs, true, true); + this.humanReadableTimeSinceLastAction = DurationFormatUtils.formatDurationWords(timeSinceLastActionMs, true, true); } } diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/HeartbeatTimeoutChaperone.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/HeartbeatTimeoutChaperone.java index 1bb36342c9f..02ce9558ba3 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/HeartbeatTimeoutChaperone.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/HeartbeatTimeoutChaperone.java @@ -100,7 +100,8 @@ public HeartbeatTimeoutChaperone(final HeartbeatMonitor heartbeatMonitor, * @throws ExecutionException - throw is the runnable throw an exception */ public void runWithHeartbeatThread(final CompletableFuture runnableFuture) throws ExecutionException { - LOGGER.info("Starting source heartbeat check. Will check every {} minutes.", timeoutCheckDuration.toMinutes()); + LOGGER.info("Starting source heartbeat check. Will check threshold of {} seconds, every {} minutes.", + heartbeatMonitor.getHeartbeatFreshnessThreshold().toSeconds(), timeoutCheckDuration.toMinutes()); final CompletableFuture heartbeatFuture = CompletableFuture.runAsync(customMonitor.orElse(this::monitor), getLazyExecutorService()); try { @@ -127,9 +128,9 @@ public void runWithHeartbeatThread(final CompletableFuture runnableFuture) new MetricAttribute(MetricTags.CONNECTION_ID, connectionId.toString()), new MetricAttribute(MetricTags.KILLED, "true"), new MetricAttribute(MetricTags.SOURCE_IMAGE, sourceDockerImage)); - final var threshold = heartbeatMonitor.getHeartbeatFreshnessThreshold().getSeconds(); - final var timeBetweenLastRecord = heartbeatMonitor.getTimeSinceLastBeat().orElse(Duration.ZERO).getSeconds(); - throw new HeartbeatTimeoutException(threshold, timeBetweenLastRecord); + final var thresholdMs = heartbeatMonitor.getHeartbeatFreshnessThreshold().toMillis(); + final var timeBetweenLastRecordMs = heartbeatMonitor.getTimeSinceLastBeat().orElse(Duration.ZERO).toMillis(); + throw new HeartbeatTimeoutException(thresholdMs, timeBetweenLastRecordMs); } else { LOGGER.info("Do not terminate as feature flag is disable"); metricClient.count(OssMetricsRegistry.SOURCE_HEARTBEAT_FAILURE, 1, @@ -188,12 +189,12 @@ public static class HeartbeatTimeoutException extends RuntimeException { public final String humanReadableThreshold; public final String humanReadableTimeSinceLastRec; - public HeartbeatTimeoutException(final long threshold, final long timeBetweenLastRecord) { + public HeartbeatTimeoutException(final long thresholdMs, final long timeBetweenLastRecordMs) { super(String.format("Last record saw %s ago, exceeding the threshold of %s.", - DurationFormatUtils.formatDurationWords(timeBetweenLastRecord, true, true), - DurationFormatUtils.formatDurationWords(threshold, true, true))); - this.humanReadableThreshold = DurationFormatUtils.formatDurationWords(threshold, true, true); - this.humanReadableTimeSinceLastRec = DurationFormatUtils.formatDurationWords(threshold, true, true); + DurationFormatUtils.formatDurationWords(timeBetweenLastRecordMs, true, true), + DurationFormatUtils.formatDurationWords(thresholdMs, true, true))); + this.humanReadableThreshold = DurationFormatUtils.formatDurationWords(thresholdMs, true, true); + this.humanReadableTimeSinceLastRec = DurationFormatUtils.formatDurationWords(timeBetweenLastRecordMs, true, true); } } diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/ReplicationWorkerTest.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/ReplicationWorkerTest.java index 5d8b8a392af..2ca47b7bb06 100644 --- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/ReplicationWorkerTest.java +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/ReplicationWorkerTest.java @@ -1137,12 +1137,21 @@ void testGetFailureReason() { assertEquals(failureReason.getFailureOrigin(), FailureOrigin.SOURCE); failureReason = ReplicationWorkerHelper.getFailureReason(new DestinationException(""), jobId, attempt); assertEquals(failureReason.getFailureOrigin(), FailureOrigin.DESTINATION); - failureReason = ReplicationWorkerHelper.getFailureReason(new HeartbeatTimeoutChaperone.HeartbeatTimeoutException(10, 15), jobId, attempt); + failureReason = ReplicationWorkerHelper.getFailureReason(new HeartbeatTimeoutChaperone.HeartbeatTimeoutException(10000, 15000), jobId, attempt); assertEquals(failureReason.getFailureOrigin(), FailureOrigin.SOURCE); assertEquals(failureReason.getFailureType(), FailureReason.FailureType.HEARTBEAT_TIMEOUT); + assertEquals( + "Airbyte detected that the Source didn't send any records in the last 15 seconds, exceeding the configured 10 seconds threshold. Airbyte will try reading again on the next sync. Please see https://docs.airbyte.com/understanding-airbyte/heartbeats for more info.", + failureReason.getExternalMessage()); + assertEquals("Last record saw 15 seconds ago, exceeding the threshold of 10 seconds.", failureReason.getInternalMessage()); failureReason = ReplicationWorkerHelper.getFailureReason(new RuntimeException(), jobId, attempt); assertEquals(failureReason.getFailureOrigin(), FailureOrigin.REPLICATION); - failureReason = ReplicationWorkerHelper.getFailureReason(new TimeoutException(10, 15), jobId, attempt); + failureReason = ReplicationWorkerHelper.getFailureReason(new TimeoutException(10000, 15000), jobId, attempt); + assertEquals( + "Airbyte detected that the Destination didn't make progress in the last 15 seconds, exceeding the configured 10 seconds threshold. Airbyte will try reading again on the next sync. Please see https://docs.airbyte.com/understanding-airbyte/heartbeats for more info.", + failureReason.getExternalMessage()); + assertEquals("Last action 15 seconds ago, exceeding the threshold of 10 seconds.", failureReason.getInternalMessage()); + System.out.println(failureReason.getInternalMessage()); assertEquals(failureReason.getFailureOrigin(), FailureOrigin.DESTINATION); assertEquals(failureReason.getFailureType(), FailureType.DESTINATION_TIMEOUT); } diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/HeartBeatTimeoutChaperoneTest.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/HeartBeatTimeoutChaperoneTest.java index 275cc023fa0..66e3c4072a9 100644 --- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/HeartBeatTimeoutChaperoneTest.java +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/HeartBeatTimeoutChaperoneTest.java @@ -5,6 +5,8 @@ package io.airbyte.workers.internal; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; @@ -24,7 +26,6 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; class HeartBeatTimeoutChaperoneTest { @@ -40,6 +41,8 @@ class HeartBeatTimeoutChaperoneTest { @Test void testFailHeartbeat() { when(featureFlagClient.boolVariation(eq(ShouldFailSyncIfHeartbeatFailure.INSTANCE), any())).thenReturn(true); + when(heartbeatMonitor.getHeartbeatFreshnessThreshold()).thenReturn(Duration.ofSeconds(1)); + final HeartbeatTimeoutChaperone heartbeatTimeoutChaperone = new HeartbeatTimeoutChaperone( heartbeatMonitor, timeoutCheckDuration, @@ -49,14 +52,17 @@ void testFailHeartbeat() { connectionId, metricClient); - Assertions.assertThatThrownBy(() -> heartbeatTimeoutChaperone.runWithHeartbeatThread(CompletableFuture.runAsync(() -> { - try { - Thread.sleep(Long.MAX_VALUE); - } catch (final InterruptedException e) { - throw new RuntimeException(e); - } - }))) - .isInstanceOf(HeartbeatTimeoutChaperone.HeartbeatTimeoutException.class); + final var thrown = assertThrows(HeartbeatTimeoutChaperone.HeartbeatTimeoutException.class, + () -> heartbeatTimeoutChaperone.runWithHeartbeatThread(CompletableFuture.runAsync(() -> { + try { + Thread.sleep(Long.MAX_VALUE); + } catch (final InterruptedException e) { + throw new RuntimeException(e); + } + }))); + + assertEquals("Last record saw 0 seconds ago, exceeding the threshold of 1 second.", thrown.getMessage()); + verify(metricClient, times(1)).count(OssMetricsRegistry.SOURCE_HEARTBEAT_FAILURE, 1, new MetricAttribute(MetricTags.CONNECTION_ID, connectionId.toString()), new MetricAttribute(MetricTags.KILLED, "true"),