Skip to content

Commit

Permalink
Remove output state peristence. Add catalog validation to failure cas…
Browse files Browse the repository at this point in the history
…e. (#12102)
  • Loading branch information
tryangul committed Apr 12, 2024
1 parent c322b18 commit b702635
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,3 @@ properties:
"$ref": ActivityPayloadURI.yaml
catalogUri:
"$ref": ActivityPayloadURI.yaml
stateUri:
"$ref": ActivityPayloadURI.yaml
4 changes: 0 additions & 4 deletions airbyte-featureflag/src/main/kotlin/FlagDefinitions.kt
Original file line number Diff line number Diff line change
Expand Up @@ -202,12 +202,8 @@ object WriteReplicationOutputToObjectStorage : Temporary<Boolean>(key = "platfor

object ReadReplicationOutputFromObjectStorage : Temporary<Boolean>(key = "platform.read-replication-output-from-object-storage", default = false)

object WriteOutputStateToObjectStorage : Temporary<Boolean>(key = "platform.write-output-state-to-object-storage", default = false)

object WriteOutputCatalogToObjectStorage : Temporary<Boolean>(key = "platform.write-output-catalog-to-object-storage", default = false)

object NullOutputStateOnSyncOutput : Temporary<Boolean>(key = "platform.null-output-state-on-sync-output", default = false)

object NullOutputCatalogOnSyncOutput : Temporary<Boolean>(key = "platform.null-output-catalog-on-sync-output", default = false)

object UseCustomK8sInitCheck : Temporary<Boolean>(key = "platform.use-custom-k8s-init-check", default = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,20 +105,11 @@ public void jobSuccessWithAttemptNumber(final JobSuccessInputWithAttemptNumber i
input.getJobId(),
input.getAttemptNumber());

if (output != null) {
if (output.getState() != null && output.getStateUri() != null) {
stateClient.validate(
output.getState(),
output.getStateUri(),
new ArrayList<>());
}

if (output.getOutputCatalog() != null && output.getCatalogUri() != null) {
catalogClient.validate(
output.getOutputCatalog(),
output.getCatalogUri(),
new ArrayList<>());
}
if (output != null && output.getOutputCatalog() != null && output.getCatalogUri() != null) {
catalogClient.validate(
output.getOutputCatalog(),
output.getCatalogUri(),
new ArrayList<>());
}

try {
Expand Down Expand Up @@ -164,6 +155,13 @@ public void attemptFailureWithAttemptNumber(final AttemptNumberFailureInput inpu
input.getJobId(),
input.getAttemptNumber());

if (output != null && output.getOutputCatalog() != null && output.getCatalogUri() != null) {
catalogClient.validate(
output.getOutputCatalog(),
output.getCatalogUri(),
new ArrayList<>());
}

try {
final var req = new FailAttemptRequest()
.attemptNumber(input.getAttemptNumber())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,7 @@
import io.airbyte.featureflag.Connection;
import io.airbyte.featureflag.FeatureFlagClient;
import io.airbyte.featureflag.NullOutputCatalogOnSyncOutput;
import io.airbyte.featureflag.NullOutputStateOnSyncOutput;
import io.airbyte.featureflag.WriteOutputCatalogToObjectStorage;
import io.airbyte.featureflag.WriteOutputStateToObjectStorage;
import io.airbyte.metrics.lib.ApmTraceUtils;
import io.airbyte.metrics.lib.MetricAttribute;
import io.airbyte.metrics.lib.MetricClient;
Expand Down Expand Up @@ -225,17 +223,6 @@ public StandardSyncOutput replicateV2(final ReplicationActivityInput replication
BackfillHelper.markBackfilledStreams(streamsToBackfill, standardSyncOutput);
LOGGER.info("sync summary after backfill: {}", standardSyncOutput);

if (featureFlagClient.boolVariation(WriteOutputStateToObjectStorage.INSTANCE, new Connection(connectionId))) {
final var uri = stateStorageClient.persist(
attemptOutput.getState(),
connectionId,
Long.parseLong(jobId),
attemptNumber.intValue(),
metricAttributes);

standardSyncOutput.setStateUri(uri);
}

if (featureFlagClient.boolVariation(WriteOutputCatalogToObjectStorage.INSTANCE, new Connection(connectionId))) {
final var uri = catalogStorageClient.persist(
attemptOutput.getOutputCatalog(),
Expand Down Expand Up @@ -273,10 +260,6 @@ private StandardSyncOutput reduceReplicationOutput(final ReplicationOutput outpu
syncSummary.setPerformanceMetrics(output.getReplicationAttemptSummary().getPerformanceMetrics());
syncSummary.setStreamCount((long) output.getOutputCatalog().getStreams().size());

if (!featureFlagClient.boolVariation(NullOutputStateOnSyncOutput.INSTANCE, new Connection(connectionId))) {
standardSyncOutput.setState(output.getState());
}

if (!featureFlagClient.boolVariation(NullOutputCatalogOnSyncOutput.INSTANCE, new Connection(connectionId))) {
standardSyncOutput.setOutputCatalog(output.getOutputCatalog());
}
Expand Down

0 comments on commit b702635

Please sign in to comment.