Skip to content

Commit

Permalink
Set URIs on sync output for state and catalog (#12088)
Browse files Browse the repository at this point in the history
  • Loading branch information
tryangul committed Apr 12, 2024
1 parent e994f43 commit a9e464c
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import io.airbyte.metrics.lib.MetricTags
import io.airbyte.metrics.lib.OssMetricsRegistry
import io.github.oshai.kotlinlogging.KotlinLogging
import java.util.UUID
import io.airbyte.config.ActivityPayloadURI as OpenApiURI

private val logger = KotlinLogging.logger {}

Expand All @@ -31,8 +32,8 @@ class OutputStorageClient<T : Any>
jobId: Long,
attemptNumber: Int,
metricAttributes: Array<MetricAttribute>,
) {
if (obj == null) return
): OpenApiURI? {
if (obj == null) return null

val uri = ActivityPayloadURI.v1(connectionId, jobId, attemptNumber, payloadName)

Expand All @@ -54,25 +55,24 @@ class OutputStorageClient<T : Any>

metricClient.count(OssMetricsRegistry.PAYLOAD_FAILURE_WRITE, 1, *attrs.toTypedArray())
}

return uri.toOpenApi()
}

/**
* Queries object storage based on the provided connection, job and attempt ids and compares to
* the expected. Emits a metric whether it's a match.
* Queries object storage based on the provided uri. Emits a metric whether it's a match.
*/
fun validate(
expected: T?,
connectionId: UUID,
jobId: Long,
attemptNumber: Int,
uri: OpenApiURI,
attrs: List<MetricAttribute>,
) {
if (expected == null) return

val uri = ActivityPayloadURI.v1(connectionId, jobId, attemptNumber, payloadName)
val domainUri = ActivityPayloadURI.fromOpenApi(uri) ?: return

storageClient.validateOutput(
uri,
domainUri,
target,
expected,
comparator,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ type: object
additionalProperties: true
required:
- standardSyncSummary
- state
- output_catalog
properties:
standardSyncSummary:
"$ref": StandardSyncSummary.yaml
Expand All @@ -26,3 +24,7 @@ properties:
"$ref": FailureReason.yaml
uri:
"$ref": ActivityPayloadURI.yaml
catalogUri:
"$ref": ActivityPayloadURI.yaml
stateUri:
"$ref": ActivityPayloadURI.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,7 @@
import io.airbyte.commons.temporal.config.WorkerMode;
import io.airbyte.commons.temporal.exception.RetryableException;
import io.airbyte.config.State;
import io.airbyte.featureflag.Connection;
import io.airbyte.featureflag.FeatureFlagClient;
import io.airbyte.featureflag.NullOutputCatalogOnSyncOutput;
import io.airbyte.featureflag.NullOutputStateOnSyncOutput;
import io.airbyte.featureflag.WriteOutputCatalogToObjectStorage;
import io.airbyte.featureflag.WriteOutputStateToObjectStorage;
import io.airbyte.metrics.lib.ApmTraceUtils;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.workers.context.AttemptContext;
Expand Down Expand Up @@ -111,23 +106,17 @@ public void jobSuccessWithAttemptNumber(final JobSuccessInputWithAttemptNumber i
input.getAttemptNumber());

if (output != null) {
if (featureFlagClient.boolVariation(WriteOutputStateToObjectStorage.INSTANCE, new Connection(input.getConnectionId()))
&& !featureFlagClient.boolVariation(NullOutputStateOnSyncOutput.INSTANCE, new Connection(input.getConnectionId()))) {
if (output.getState() != null && output.getStateUri() != null) {
stateClient.validate(
output.getState(),
input.getConnectionId(),
input.getJobId(),
input.getAttemptNumber(),
output.getStateUri(),
new ArrayList<>());
}

if (featureFlagClient.boolVariation(WriteOutputCatalogToObjectStorage.INSTANCE, new Connection(input.getConnectionId()))
&& !featureFlagClient.boolVariation(NullOutputCatalogOnSyncOutput.INSTANCE, new Connection(input.getConnectionId()))) {
if (output.getOutputCatalog() != null && output.getCatalogUri() != null) {
catalogClient.validate(
output.getOutputCatalog(),
input.getConnectionId(),
input.getJobId(),
input.getAttemptNumber(),
output.getCatalogUri(),
new ArrayList<>());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,21 +226,25 @@ public StandardSyncOutput replicateV2(final ReplicationActivityInput replication
LOGGER.info("sync summary after backfill: {}", standardSyncOutput);

if (featureFlagClient.boolVariation(WriteOutputStateToObjectStorage.INSTANCE, new Connection(connectionId))) {
stateStorageClient.persist(
final var uri = stateStorageClient.persist(
standardSyncOutput.getState(),
connectionId,
Long.parseLong(jobId),
attemptNumber.intValue(),
metricAttributes);

standardSyncOutput.setStateUri(uri);
}

if (featureFlagClient.boolVariation(WriteOutputCatalogToObjectStorage.INSTANCE, new Connection(connectionId))) {
catalogStorageClient.persist(
final var uri = catalogStorageClient.persist(
standardSyncOutput.getOutputCatalog(),
connectionId,
Long.parseLong(jobId),
attemptNumber.intValue(),
metricAttributes);

standardSyncOutput.setCatalogUri(uri);
}

final StandardSyncOutput output = payloadChecker.validatePayloadSize(standardSyncOutput, metricAttributes);
Expand Down

0 comments on commit a9e464c

Please sign in to comment.