From e994f43aed0170166af6fc1d8034f1e0d91c5c95 Mon Sep 17 00:00:00 2001 From: Ryan Br Date: Thu, 11 Apr 2024 16:21:08 -0700 Subject: [PATCH] Properly set catalog and state fields. Handle null state in validate. (#12086) --- .../airbyte/workers/storage/activities/OutputStorageClient.kt | 4 +++- .../workers/temporal/sync/ReplicationActivityImpl.java | 4 ++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/airbyte-commons-worker/src/main/kotlin/io/airbyte/workers/storage/activities/OutputStorageClient.kt b/airbyte-commons-worker/src/main/kotlin/io/airbyte/workers/storage/activities/OutputStorageClient.kt index c304fa17684..418a2ed15d2 100644 --- a/airbyte-commons-worker/src/main/kotlin/io/airbyte/workers/storage/activities/OutputStorageClient.kt +++ b/airbyte-commons-worker/src/main/kotlin/io/airbyte/workers/storage/activities/OutputStorageClient.kt @@ -61,12 +61,14 @@ class OutputStorageClient * the expected. Emits a metric whether it's a match. */ fun validate( - expected: T, + expected: T?, connectionId: UUID, jobId: Long, attemptNumber: Int, attrs: List, ) { + if (expected == null) return + val uri = ActivityPayloadURI.v1(connectionId, jobId, attemptNumber, payloadName) storageClient.validateOutput( diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java index fadd5b9fffc..14c352b402e 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java @@ -270,11 +270,11 @@ private StandardSyncOutput reduceReplicationOutput(final ReplicationOutput outpu syncSummary.setStreamCount((long) output.getOutputCatalog().getStreams().size()); if (!featureFlagClient.boolVariation(NullOutputStateOnSyncOutput.INSTANCE, new Connection(connectionId))) { - standardSyncOutput.setState(standardSyncOutput.getState()); + standardSyncOutput.setState(output.getState()); } if (!featureFlagClient.boolVariation(NullOutputCatalogOnSyncOutput.INSTANCE, new Connection(connectionId))) { - standardSyncOutput.setOutputCatalog(standardSyncOutput.getOutputCatalog()); + standardSyncOutput.setOutputCatalog(output.getOutputCatalog()); } standardSyncOutput.setStandardSyncSummary(syncSummary);