Skip to content

Commit

Permalink
Properly set catalog and state fields. Handle null state in validate.…
Browse files Browse the repository at this point in the history
… (#12086)
  • Loading branch information
tryangul committed Apr 11, 2024
1 parent 4e5508b commit e994f43
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,14 @@ class OutputStorageClient<T : Any>
* the expected. Emits a metric whether it's a match.
*/
fun validate(
expected: T,
expected: T?,
connectionId: UUID,
jobId: Long,
attemptNumber: Int,
attrs: List<MetricAttribute>,
) {
if (expected == null) return

val uri = ActivityPayloadURI.v1(connectionId, jobId, attemptNumber, payloadName)

storageClient.validateOutput(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,11 +270,11 @@ private StandardSyncOutput reduceReplicationOutput(final ReplicationOutput outpu
syncSummary.setStreamCount((long) output.getOutputCatalog().getStreams().size());

if (!featureFlagClient.boolVariation(NullOutputStateOnSyncOutput.INSTANCE, new Connection(connectionId))) {
standardSyncOutput.setState(standardSyncOutput.getState());
standardSyncOutput.setState(output.getState());
}

if (!featureFlagClient.boolVariation(NullOutputCatalogOnSyncOutput.INSTANCE, new Connection(connectionId))) {
standardSyncOutput.setOutputCatalog(standardSyncOutput.getOutputCatalog());
standardSyncOutput.setOutputCatalog(output.getOutputCatalog());
}

standardSyncOutput.setStandardSyncSummary(syncSummary);
Expand Down

0 comments on commit e994f43

Please sign in to comment.