Skip to content

Commit

Permalink
Update Error handling of the WorkloadApiWorker (#10492)
Browse files Browse the repository at this point in the history
  • Loading branch information
gosusnp committed Dec 20, 2023
1 parent a2d2088 commit f8e8d76
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public String toString() {

private static final String WORKFLOW_TYPE_SYNC = "SyncWorkflow";
private static final String ACTIVITY_TYPE_REPLICATE = "Replicate";
private static final String ACTIVITY_TYPE_REPLICATEV2 = "ReplicateV2";
private static final String ACTIVITY_TYPE_PERSIST = "Persist";
private static final String ACTIVITY_TYPE_NORMALIZE = "Normalize";
private static final String ACTIVITY_TYPE_DBT_RUN = "Run";
Expand Down Expand Up @@ -381,7 +382,7 @@ public static FailureReason failureReasonFromWorkflowAndActivity(final String wo
final Throwable t,
final Long jobId,
final Integer attemptNumber) {
if (WORKFLOW_TYPE_SYNC.equals(workflowType) && ACTIVITY_TYPE_REPLICATE.equals(activityType)) {
if (WORKFLOW_TYPE_SYNC.equals(workflowType) && (ACTIVITY_TYPE_REPLICATE.equals(activityType) || ACTIVITY_TYPE_REPLICATEV2.equals(activityType))) {
return replicationFailure(t, jobId, attemptNumber);
} else if (WORKFLOW_TYPE_SYNC.equals(workflowType) && ACTIVITY_TYPE_PERSIST.equals(activityType)) {
return persistenceFailure(t, jobId, attemptNumber);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.function.Function;
import org.openapitools.client.infrastructure.ClientException;
import org.openapitools.client.infrastructure.ServerException;
Expand Down Expand Up @@ -159,30 +160,38 @@ public ReplicationOutput run(final ReplicationInput replicationInput, final Path
sleep(sleepInterval.toMillis());
}

if (workload.getStatus() == WorkloadStatus.FAILURE) {
if (SOURCE.equals(workload.getTerminationSource())) {
throw new SourceException(workload.getTerminationReason());
} else if (DESTINATION.equals(workload.getTerminationSource())) {
throw new DestinationException(workload.getTerminationReason());
} else {
throw new WorkerException(workload.getTerminationReason());
}
} else if (workload.getStatus() == WorkloadStatus.CANCELLED) {
throw new WorkerException("Replication cancelled by " + workload.getTerminationSource());
if (workload.getStatus() == WorkloadStatus.CANCELLED) {
throw new CancellationException("Replication cancelled by " + workload.getTerminationSource());
}

final ReplicationOutput output;
try {
output = getReplicationOutput(workloadId);
} catch (final DocStoreAccessException e) {
} catch (final Exception e) {
throwFallbackError(workload, e);
throw new WorkerException("Failed to read replication output", e);
}
if (output == null) {
throw new WorkerException("Failed to read replication output");
// If we fail to read the output, fallback to throwing an exception based on the status of the
// workload
throwFallbackError(workload, null);
throw new WorkerException("Replication output is empty");
}
return output;
}

private void throwFallbackError(final Workload workload, final Exception e) throws WorkerException {
if (workload.getStatus() == WorkloadStatus.FAILURE) {
if (SOURCE.equals(workload.getTerminationSource())) {
throw new SourceException(workload.getTerminationReason(), e);
} else if (DESTINATION.equals(workload.getTerminationSource())) {
throw new DestinationException(workload.getTerminationReason(), e);
} else {
throw new WorkerException(workload.getTerminationReason(), e);
}
}
}

@Override
public void cancel() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import org.openapitools.client.infrastructure.ServerException
import java.nio.file.Path
import java.util.Optional
import java.util.UUID
import java.util.concurrent.CancellationException

internal class WorkloadApiWorkerTest {
private var workloadIdGenerator: WorkloadIdGenerator = mockk()
Expand Down Expand Up @@ -92,6 +93,30 @@ internal class WorkloadApiWorkerTest {
assertEquals(expectedOutput, output)
}

@Test
fun testFailedReplicationWithOutput() {
val jobId = 13L
val attemptNumber = 37
val workloadId = "my-workload"
val expectedDocPrefix = "testNs/orchestrator-repl-job-$jobId-attempt-$attemptNumber"
val expectedOutput =
ReplicationOutput()
.withReplicationAttemptSummary(ReplicationAttemptSummary().withStatus(StandardSyncSummary.ReplicationStatus.COMPLETED))
initializeReplicationInput(jobId, attemptNumber)

every { workloadIdGenerator.generateSyncWorkloadId(replicationInput.connectionId, jobId, attemptNumber) } returns workloadId

every { connectionApi.getConnection(any()) } returns ConnectionRead().geography(Geography.US)
every { workloadApi.workloadCreate(any()) } returns Unit
every { workloadApi.workloadGet(workloadId) } returns mockWorkload(WorkloadStatus.FAILURE)

every { documentStoreClient.read("$expectedDocPrefix/SUCCEEDED") } returns Optional.of(Jsons.serialize(expectedOutput))

val output = workloadApiWorker.run(replicationInput, jobRoot)
// We expect the output to be returned if it exists, even on a failure
assertEquals(expectedOutput, output)
}

@Test
fun testSuccessfulReplicationWithDocOutput() {
workloadApiWorker =
Expand Down Expand Up @@ -185,7 +210,7 @@ internal class WorkloadApiWorkerTest {
terminationReason = "Oops... user cancelled",
)

assertThrows<WorkerException> { workloadApiWorker.run(replicationInput, jobRoot) }
assertThrows<CancellationException> { workloadApiWorker.run(replicationInput, jobRoot) }
}

@Test
Expand Down

0 comments on commit f8e8d76

Please sign in to comment.