Skip to content

Commit

Permalink
🐛 Fix error message bug (#11743)
Browse files Browse the repository at this point in the history
This Slack thread made me realise we were incorrectly converting some units.

DurationFormatUtils.formatDurationWords takes in milliseconds, but we were passing in seconds.

This PR corrects that for both Source and Destination.

- Pass in milliseconds instead of seconds.
- Add tests at the Heartbeat Chaperone layer and the final reporting layer.
  • Loading branch information
davinchia committed Mar 20, 2024
1 parent f6c91d0 commit 404ac8f
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -280,12 +280,12 @@ public static class TimeoutException extends RuntimeException {
public final String humanReadableThreshold;
public final String humanReadableTimeSinceLastAction;

public TimeoutException(final long threshold, final long timeSinceLastAction) {
public TimeoutException(final long thresholdMs, final long timeSinceLastActionMs) {
super(String.format("Last action %s ago, exceeding the threshold of %s.",
DurationFormatUtils.formatDurationWords(timeSinceLastAction, true, true),
DurationFormatUtils.formatDurationWords(threshold, true, true)));
this.humanReadableThreshold = DurationFormatUtils.formatDurationWords(threshold, true, true);
this.humanReadableTimeSinceLastAction = DurationFormatUtils.formatDurationWords(threshold, true, true);
DurationFormatUtils.formatDurationWords(timeSinceLastActionMs, true, true),
DurationFormatUtils.formatDurationWords(thresholdMs, true, true)));
this.humanReadableThreshold = DurationFormatUtils.formatDurationWords(thresholdMs, true, true);
this.humanReadableTimeSinceLastAction = DurationFormatUtils.formatDurationWords(timeSinceLastActionMs, true, true);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ public HeartbeatTimeoutChaperone(final HeartbeatMonitor heartbeatMonitor,
* @throws ExecutionException - thrown if the runnable throws an exception
*/
public void runWithHeartbeatThread(final CompletableFuture<Void> runnableFuture) throws ExecutionException {
LOGGER.info("Starting source heartbeat check. Will check every {} minutes.", timeoutCheckDuration.toMinutes());
LOGGER.info("Starting source heartbeat check. Will check threshold of {} seconds, every {} minutes.",
heartbeatMonitor.getHeartbeatFreshnessThreshold().toSeconds(), timeoutCheckDuration.toMinutes());
final CompletableFuture<Void> heartbeatFuture = CompletableFuture.runAsync(customMonitor.orElse(this::monitor), getLazyExecutorService());

try {
Expand All @@ -127,9 +128,9 @@ public void runWithHeartbeatThread(final CompletableFuture<Void> runnableFuture)
new MetricAttribute(MetricTags.CONNECTION_ID, connectionId.toString()),
new MetricAttribute(MetricTags.KILLED, "true"),
new MetricAttribute(MetricTags.SOURCE_IMAGE, sourceDockerImage));
final var threshold = heartbeatMonitor.getHeartbeatFreshnessThreshold().getSeconds();
final var timeBetweenLastRecord = heartbeatMonitor.getTimeSinceLastBeat().orElse(Duration.ZERO).getSeconds();
throw new HeartbeatTimeoutException(threshold, timeBetweenLastRecord);
final var thresholdMs = heartbeatMonitor.getHeartbeatFreshnessThreshold().toMillis();
final var timeBetweenLastRecordMs = heartbeatMonitor.getTimeSinceLastBeat().orElse(Duration.ZERO).toMillis();
throw new HeartbeatTimeoutException(thresholdMs, timeBetweenLastRecordMs);
} else {
LOGGER.info("Do not terminate as feature flag is disable");
metricClient.count(OssMetricsRegistry.SOURCE_HEARTBEAT_FAILURE, 1,
Expand Down Expand Up @@ -188,12 +189,12 @@ public static class HeartbeatTimeoutException extends RuntimeException {
public final String humanReadableThreshold;
public final String humanReadableTimeSinceLastRec;

public HeartbeatTimeoutException(final long threshold, final long timeBetweenLastRecord) {
public HeartbeatTimeoutException(final long thresholdMs, final long timeBetweenLastRecordMs) {
super(String.format("Last record saw %s ago, exceeding the threshold of %s.",
DurationFormatUtils.formatDurationWords(timeBetweenLastRecord, true, true),
DurationFormatUtils.formatDurationWords(threshold, true, true)));
this.humanReadableThreshold = DurationFormatUtils.formatDurationWords(threshold, true, true);
this.humanReadableTimeSinceLastRec = DurationFormatUtils.formatDurationWords(threshold, true, true);
DurationFormatUtils.formatDurationWords(timeBetweenLastRecordMs, true, true),
DurationFormatUtils.formatDurationWords(thresholdMs, true, true)));
this.humanReadableThreshold = DurationFormatUtils.formatDurationWords(thresholdMs, true, true);
this.humanReadableTimeSinceLastRec = DurationFormatUtils.formatDurationWords(timeBetweenLastRecordMs, true, true);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1137,12 +1137,21 @@ void testGetFailureReason() {
assertEquals(failureReason.getFailureOrigin(), FailureOrigin.SOURCE);
failureReason = ReplicationWorkerHelper.getFailureReason(new DestinationException(""), jobId, attempt);
assertEquals(failureReason.getFailureOrigin(), FailureOrigin.DESTINATION);
failureReason = ReplicationWorkerHelper.getFailureReason(new HeartbeatTimeoutChaperone.HeartbeatTimeoutException(10, 15), jobId, attempt);
failureReason = ReplicationWorkerHelper.getFailureReason(new HeartbeatTimeoutChaperone.HeartbeatTimeoutException(10000, 15000), jobId, attempt);
assertEquals(failureReason.getFailureOrigin(), FailureOrigin.SOURCE);
assertEquals(failureReason.getFailureType(), FailureReason.FailureType.HEARTBEAT_TIMEOUT);
assertEquals(
"Airbyte detected that the Source didn't send any records in the last 15 seconds, exceeding the configured 10 seconds threshold. Airbyte will try reading again on the next sync. Please see https://docs.airbyte.com/understanding-airbyte/heartbeats for more info.",
failureReason.getExternalMessage());
assertEquals("Last record saw 15 seconds ago, exceeding the threshold of 10 seconds.", failureReason.getInternalMessage());
failureReason = ReplicationWorkerHelper.getFailureReason(new RuntimeException(), jobId, attempt);
assertEquals(failureReason.getFailureOrigin(), FailureOrigin.REPLICATION);
failureReason = ReplicationWorkerHelper.getFailureReason(new TimeoutException(10, 15), jobId, attempt);
failureReason = ReplicationWorkerHelper.getFailureReason(new TimeoutException(10000, 15000), jobId, attempt);
assertEquals(
"Airbyte detected that the Destination didn't make progress in the last 15 seconds, exceeding the configured 10 seconds threshold. Airbyte will try reading again on the next sync. Please see https://docs.airbyte.com/understanding-airbyte/heartbeats for more info.",
failureReason.getExternalMessage());
assertEquals("Last action 15 seconds ago, exceeding the threshold of 10 seconds.", failureReason.getInternalMessage());
System.out.println(failureReason.getInternalMessage());
assertEquals(failureReason.getFailureOrigin(), FailureOrigin.DESTINATION);
assertEquals(failureReason.getFailureType(), FailureType.DESTINATION_TIMEOUT);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
package io.airbyte.workers.internal;

import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
Expand All @@ -24,7 +26,6 @@
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

class HeartBeatTimeoutChaperoneTest {
Expand All @@ -40,6 +41,8 @@ class HeartBeatTimeoutChaperoneTest {
@Test
void testFailHeartbeat() {
when(featureFlagClient.boolVariation(eq(ShouldFailSyncIfHeartbeatFailure.INSTANCE), any())).thenReturn(true);
when(heartbeatMonitor.getHeartbeatFreshnessThreshold()).thenReturn(Duration.ofSeconds(1));

final HeartbeatTimeoutChaperone heartbeatTimeoutChaperone = new HeartbeatTimeoutChaperone(
heartbeatMonitor,
timeoutCheckDuration,
Expand All @@ -49,14 +52,17 @@ void testFailHeartbeat() {
connectionId,
metricClient);

Assertions.assertThatThrownBy(() -> heartbeatTimeoutChaperone.runWithHeartbeatThread(CompletableFuture.runAsync(() -> {
try {
Thread.sleep(Long.MAX_VALUE);
} catch (final InterruptedException e) {
throw new RuntimeException(e);
}
})))
.isInstanceOf(HeartbeatTimeoutChaperone.HeartbeatTimeoutException.class);
final var thrown = assertThrows(HeartbeatTimeoutChaperone.HeartbeatTimeoutException.class,
() -> heartbeatTimeoutChaperone.runWithHeartbeatThread(CompletableFuture.runAsync(() -> {
try {
Thread.sleep(Long.MAX_VALUE);
} catch (final InterruptedException e) {
throw new RuntimeException(e);
}
})));

assertEquals("Last record saw 0 seconds ago, exceeding the threshold of 1 second.", thrown.getMessage());

verify(metricClient, times(1)).count(OssMetricsRegistry.SOURCE_HEARTBEAT_FAILURE, 1,
new MetricAttribute(MetricTags.CONNECTION_ID, connectionId.toString()),
new MetricAttribute(MetricTags.KILLED, "true"),
Expand Down

0 comments on commit 404ac8f

Please sign in to comment.