From 022707c379ec8659afbf5c7208f507a885e92a9c Mon Sep 17 00:00:00 2001 From: Jose Pefaur Date: Wed, 3 Jan 2024 08:49:22 -0600 Subject: [PATCH] delete feature flag used for tracking committed records for global state (#10560) --- .../general/ReplicationWorkerFactory.java | 10 +- .../bookkeeping/AirbyteMessageTracker.kt | 5 +- .../bookkeeping/ParallelStreamStatsTracker.kt | 52 +++----- .../internal/bookkeeping/SyncStatsTracker.kt | 10 +- .../syncpersistence/SyncPersistence.kt | 14 +- .../AirbyteMessageTrackerTest.java | 6 +- .../ParallelStreamStatsTrackerTest.java | 120 +++++++++--------- .../SyncPersistenceImplTest.java | 8 +- .../src/main/kotlin/FlagDefinitions.kt | 2 - 9 files changed, 96 insertions(+), 131 deletions(-) diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/ReplicationWorkerFactory.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/ReplicationWorkerFactory.java index 40d8a3c7348..a5d95f44051 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/ReplicationWorkerFactory.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/ReplicationWorkerFactory.java @@ -28,7 +28,6 @@ import io.airbyte.featureflag.Source; import io.airbyte.featureflag.SourceDefinition; import io.airbyte.featureflag.SourceType; -import io.airbyte.featureflag.TrackCommittedStatsWhenUsingGlobalState; import io.airbyte.featureflag.Workspace; import io.airbyte.metrics.lib.MetricAttribute; import io.airbyte.metrics.lib.MetricClient; @@ -168,7 +167,7 @@ public ReplicationWorker create(final ReplicationInput replicationInput, log.info("Setting up replication worker..."); final SyncPersistence syncPersistence = createSyncPersistence(syncPersistenceFactory, replicationInput, sourceLauncherConfig); - final AirbyteMessageTracker messageTracker = createMessageTracker(featureFlagClient, syncPersistence, featureFlags, replicationInput); + final AirbyteMessageTracker messageTracker = createMessageTracker(syncPersistence, featureFlags, replicationInput); return createReplicationWorker(airbyteSource, airbyteDestination, messageTracker, syncPersistence, recordSchemaValidator, fieldSelector, heartbeatTimeoutChaperone, @@ -260,14 +259,11 @@ private static DestinationTimeoutMonitor createDestinationTimeout(final FeatureF /** * Create MessageTracker. */ - private static AirbyteMessageTracker createMessageTracker(final FeatureFlagClient featureFlagClient, - final SyncPersistence syncPersistence, + private static AirbyteMessageTracker createMessageTracker(final SyncPersistence syncPersistence, final FeatureFlags featureFlags, final ReplicationInput replicationInput) { - Context context = new Multi(List.of(new Workspace(replicationInput.getWorkspaceId()), new Connection(replicationInput.getConnectionId()))); - boolean trackCommittedStatsWhenUsingGlobalState = featureFlagClient.boolVariation(TrackCommittedStatsWhenUsingGlobalState.INSTANCE, context); return new AirbyteMessageTracker(syncPersistence, featureFlags, replicationInput.getSourceLauncherConfig().getDockerImage(), - replicationInput.getDestinationLauncherConfig().getDockerImage(), trackCommittedStatsWhenUsingGlobalState); + replicationInput.getDestinationLauncherConfig().getDockerImage()); } /** diff --git a/airbyte-commons-worker/src/main/kotlin/io/airbyte/workers/internal/bookkeeping/AirbyteMessageTracker.kt b/airbyte-commons-worker/src/main/kotlin/io/airbyte/workers/internal/bookkeeping/AirbyteMessageTracker.kt index a4e93a13ef4..de60459c143 100644 --- a/airbyte-commons-worker/src/main/kotlin/io/airbyte/workers/internal/bookkeeping/AirbyteMessageTracker.kt +++ b/airbyte-commons-worker/src/main/kotlin/io/airbyte/workers/internal/bookkeeping/AirbyteMessageTracker.kt @@ -24,7 +24,6 @@ class AirbyteMessageTracker( featureFlags: FeatureFlags, private val sourceDockerImage: String, private val destinationDockerImage: String, - private val trackCommittedStatsWhenUsingGlobalState: Boolean, ) { private val dstErrorTraceMsgs = ArrayList() private val srcErrorTraceMsgs = ArrayList() @@ -43,7 +42,7 @@ class AirbyteMessageTracker( when (msg.type) { AirbyteMessage.Type.TRACE -> handleEmittedTrace(msg.trace, AirbyteMessageOrigin.SOURCE) AirbyteMessage.Type.RECORD -> syncStatsTracker.updateStats(msg.record) - AirbyteMessage.Type.STATE -> syncStatsTracker.updateSourceStatesStats(msg.state, trackCommittedStatsWhenUsingGlobalState) + AirbyteMessage.Type.STATE -> syncStatsTracker.updateSourceStatesStats(msg.state) AirbyteMessage.Type.CONTROL -> logger.debug { "Control message not currently tracked." } else -> logger.warn { "Invalid message type for message: $msg" } } @@ -63,7 +62,7 @@ class AirbyteMessageTracker( AirbyteMessage.Type.STATE -> msg.state?.let { stateAggregator.ingest(it) - syncStatsTracker.updateDestinationStateStats(it, trackCommittedStatsWhenUsingGlobalState) + syncStatsTracker.updateDestinationStateStats(it) } AirbyteMessage.Type.CONTROL -> logger.debug { "Control message not currently tracked." } else -> logger.warn { " Invalid message type for message: $msg" } diff --git a/airbyte-commons-worker/src/main/kotlin/io/airbyte/workers/internal/bookkeeping/ParallelStreamStatsTracker.kt b/airbyte-commons-worker/src/main/kotlin/io/airbyte/workers/internal/bookkeeping/ParallelStreamStatsTracker.kt index 3e0b56d9819..fc77590de4c 100644 --- a/airbyte-commons-worker/src/main/kotlin/io/airbyte/workers/internal/bookkeeping/ParallelStreamStatsTracker.kt +++ b/airbyte-commons-worker/src/main/kotlin/io/airbyte/workers/internal/bookkeeping/ParallelStreamStatsTracker.kt @@ -61,49 +61,33 @@ class ParallelStreamStatsTracker(private val metricClient: MetricClient) : SyncS } } - override fun updateSourceStatesStats( - stateMessage: AirbyteStateMessage, - trackCommittedStatsWhenUsingGlobalState: Boolean, - ) { - if (trackCommittedStatsWhenUsingGlobalState) { - when (stateMessage.type) { - AirbyteStateMessage.AirbyteStateType.GLOBAL -> { - stateMessage.global.streamStates.forEach { - getOrCreateStreamStatsTracker(getNameNamespacePair(it.streamDescriptor)) - .trackStateFromSource(stateMessage) - } - } - else -> { - getOrCreateStreamStatsTracker(getNameNamespacePair(stateMessage)) + override fun updateSourceStatesStats(stateMessage: AirbyteStateMessage) { + when (stateMessage.type) { + AirbyteStateMessage.AirbyteStateType.GLOBAL -> { + stateMessage.global.streamStates.forEach { + getOrCreateStreamStatsTracker(getNameNamespacePair(it.streamDescriptor)) .trackStateFromSource(stateMessage) } } - } else { - getOrCreateStreamStatsTracker(getNameNamespacePair(stateMessage)) - .trackStateFromSource(stateMessage) + else -> { + getOrCreateStreamStatsTracker(getNameNamespacePair(stateMessage)) + .trackStateFromSource(stateMessage) + } } } - override fun updateDestinationStateStats( - stateMessage: AirbyteStateMessage, - trackCommittedStatsWhenUsingGlobalState: Boolean, - ) { - if (trackCommittedStatsWhenUsingGlobalState) { - when (stateMessage.type) { - AirbyteStateMessage.AirbyteStateType.GLOBAL -> { - stateMessage.global.streamStates.forEach { - getOrCreateStreamStatsTracker(getNameNamespacePair(it.streamDescriptor)) - .trackStateFromDestination(stateMessage) - } - } - else -> { - getOrCreateStreamStatsTracker(getNameNamespacePair(stateMessage)) + override fun updateDestinationStateStats(stateMessage: AirbyteStateMessage) { + when (stateMessage.type) { + AirbyteStateMessage.AirbyteStateType.GLOBAL -> { + stateMessage.global.streamStates.forEach { + getOrCreateStreamStatsTracker(getNameNamespacePair(it.streamDescriptor)) .trackStateFromDestination(stateMessage) } } - } else { - getOrCreateStreamStatsTracker(getNameNamespacePair(stateMessage)) - .trackStateFromDestination(stateMessage) + else -> { + getOrCreateStreamStatsTracker(getNameNamespacePair(stateMessage)) + .trackStateFromDestination(stateMessage) + } } } diff --git a/airbyte-commons-worker/src/main/kotlin/io/airbyte/workers/internal/bookkeeping/SyncStatsTracker.kt b/airbyte-commons-worker/src/main/kotlin/io/airbyte/workers/internal/bookkeeping/SyncStatsTracker.kt index e9d4a44b7a0..0e6dc9abb00 100644 --- a/airbyte-commons-worker/src/main/kotlin/io/airbyte/workers/internal/bookkeeping/SyncStatsTracker.kt +++ b/airbyte-commons-worker/src/main/kotlin/io/airbyte/workers/internal/bookkeeping/SyncStatsTracker.kt @@ -32,18 +32,12 @@ interface SyncStatsTracker { /** * Update the stats count from the source state message. */ - fun updateSourceStatesStats( - stateMessage: AirbyteStateMessage, - trackCommittedStatsWhenUsingGlobalState: Boolean, - ) + fun updateSourceStatesStats(stateMessage: AirbyteStateMessage) /** * Update the stats count from the source state message. */ - fun updateDestinationStateStats( - stateMessage: AirbyteStateMessage, - trackCommittedStatsWhenUsingGlobalState: Boolean, - ) + fun updateDestinationStateStats(stateMessage: AirbyteStateMessage) /** * Get the per-stream committed bytes count. diff --git a/airbyte-commons-worker/src/main/kotlin/io/airbyte/workers/internal/syncpersistence/SyncPersistence.kt b/airbyte-commons-worker/src/main/kotlin/io/airbyte/workers/internal/syncpersistence/SyncPersistence.kt index 312d849b395..78296fa792c 100644 --- a/airbyte-commons-worker/src/main/kotlin/io/airbyte/workers/internal/syncpersistence/SyncPersistence.kt +++ b/airbyte-commons-worker/src/main/kotlin/io/airbyte/workers/internal/syncpersistence/SyncPersistence.kt @@ -398,20 +398,14 @@ class SyncPersistenceImpl syncStatsTracker.updateEstimates(estimate) } - override fun updateSourceStatesStats( - stateMessage: AirbyteStateMessage, - trackCommittedStatsWhenUsingGlobalState: Boolean, - ) { + override fun updateSourceStatesStats(stateMessage: AirbyteStateMessage) { isReceivingStats = true - syncStatsTracker.updateSourceStatesStats(stateMessage, trackCommittedStatsWhenUsingGlobalState) + syncStatsTracker.updateSourceStatesStats(stateMessage) } - override fun updateDestinationStateStats( - stateMessage: AirbyteStateMessage, - trackCommittedStatsWhenUsingGlobalState: Boolean, - ) { + override fun updateDestinationStateStats(stateMessage: AirbyteStateMessage) { isReceivingStats = true - syncStatsTracker.updateDestinationStateStats(stateMessage, trackCommittedStatsWhenUsingGlobalState) + syncStatsTracker.updateDestinationStateStats(stateMessage) } } diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/bookkeeping/AirbyteMessageTrackerTest.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/bookkeeping/AirbyteMessageTrackerTest.java index 2ba1f06d653..2dc83a393db 100644 --- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/bookkeeping/AirbyteMessageTrackerTest.java +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/bookkeeping/AirbyteMessageTrackerTest.java @@ -34,7 +34,7 @@ class AirbyteMessageTrackerTest { @BeforeEach void setup() { this.messageTracker = - new AirbyteMessageTracker(syncStatsTracker, new EnvVariableFeatureFlags(), "airbyte/source-image", "airbyte/destination-image", false); + new AirbyteMessageTracker(syncStatsTracker, new EnvVariableFeatureFlags(), "airbyte/source-image", "airbyte/destination-image"); } @Test @@ -88,7 +88,7 @@ void testAcceptFromSourceState() { messageTracker.acceptFromSource(state); - verify(syncStatsTracker).updateSourceStatesStats(state.getState(), false); + verify(syncStatsTracker).updateSourceStatesStats(state.getState()); } @Test @@ -169,7 +169,7 @@ void testAcceptFromDestinationState() { messageTracker.acceptFromDestination(state); - verify(syncStatsTracker).updateDestinationStateStats(state.getState(), false); + verify(syncStatsTracker).updateDestinationStateStats(state.getState()); } @Test diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/bookkeeping/ParallelStreamStatsTrackerTest.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/bookkeeping/ParallelStreamStatsTrackerTest.java index 313e4874d02..37966ba7e20 100644 --- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/bookkeeping/ParallelStreamStatsTrackerTest.java +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/bookkeeping/ParallelStreamStatsTrackerTest.java @@ -71,8 +71,8 @@ void testSerialStreamStatsTracking() { statsTracker.updateStats(S1_MESSAGE1); statsTracker.updateStats(S1_MESSAGE2); final var s1State1 = createStreamState(STREAM1_NAME, 2); - statsTracker.updateSourceStatesStats(s1State1, false); - statsTracker.updateDestinationStateStats(s1State1, false); + statsTracker.updateSourceStatesStats(s1State1); + statsTracker.updateDestinationStateStats(s1State1); statsTracker.updateStats(S1_MESSAGE3); statsTracker.updateStats(S2_MESSAGE1); @@ -102,21 +102,21 @@ void testSerialStreamStatsTrackingOnSingleStream() { final var s1State3 = createStreamState(STREAM1_NAME, 3); statsTracker.updateStats(S1_MESSAGE1); - statsTracker.updateSourceStatesStats(s1State1, false); + statsTracker.updateSourceStatesStats(s1State1); statsTracker.updateStats(S1_MESSAGE1); - statsTracker.updateSourceStatesStats(s1State2, false); + statsTracker.updateSourceStatesStats(s1State2); statsTracker.updateStats(S1_MESSAGE1); - statsTracker.updateSourceStatesStats(s1State3, false); + statsTracker.updateSourceStatesStats(s1State3); - statsTracker.updateDestinationStateStats(s1State1, false); + statsTracker.updateDestinationStateStats(s1State1); final SyncStats actualSyncStatsAfter1 = statsTracker.getTotalStats(false); assertSyncStatsCoreStatsEquals(buildSyncStats(3L, 1L), actualSyncStatsAfter1); - statsTracker.updateDestinationStateStats(s1State2, false); + statsTracker.updateDestinationStateStats(s1State2); final SyncStats actualSyncStatsAfter2 = statsTracker.getTotalStats(false); assertSyncStatsCoreStatsEquals(buildSyncStats(3L, 2L), actualSyncStatsAfter2); - statsTracker.updateDestinationStateStats(s1State3, false); + statsTracker.updateDestinationStateStats(s1State3); final SyncStats actualSyncStatsAfter3 = statsTracker.getTotalStats(false); assertSyncStatsCoreStatsEquals(buildSyncStats(3L, 3L), actualSyncStatsAfter3); } @@ -129,25 +129,25 @@ void testSerialStreamStatsTrackingOnSingleStreamWhileSkippingStates() { final var s1State4 = createStreamState(STREAM1_NAME, 4); statsTracker.updateStats(S1_MESSAGE1); - statsTracker.updateSourceStatesStats(s1State1, false); + statsTracker.updateSourceStatesStats(s1State1); statsTracker.updateStats(S1_MESSAGE1); statsTracker.updateStats(S1_MESSAGE1); - statsTracker.updateSourceStatesStats(s1State2, false); + statsTracker.updateSourceStatesStats(s1State2); statsTracker.updateStats(S1_MESSAGE1); - statsTracker.updateSourceStatesStats(s1State3, false); + statsTracker.updateSourceStatesStats(s1State3); statsTracker.updateStats(S1_MESSAGE1); - statsTracker.updateDestinationStateStats(s1State2, false); + statsTracker.updateDestinationStateStats(s1State2); final SyncStats actualSyncStatsAfter1 = statsTracker.getTotalStats(false); assertSyncStatsCoreStatsEquals(buildSyncStats(5L, 3L), actualSyncStatsAfter1); // Adding more messages around the state to also test the emitted tracking logic statsTracker.updateStats(S1_MESSAGE1); - statsTracker.updateSourceStatesStats(s1State4, false); + statsTracker.updateSourceStatesStats(s1State4); statsTracker.updateStats(S1_MESSAGE1); statsTracker.updateStats(S1_MESSAGE1); - statsTracker.updateDestinationStateStats(s1State4, false); + statsTracker.updateDestinationStateStats(s1State4); final SyncStats actualSyncStatsAfter2 = statsTracker.getTotalStats(false); assertSyncStatsCoreStatsEquals(buildSyncStats(8L, 6L), actualSyncStatsAfter2); } @@ -156,15 +156,15 @@ void testSerialStreamStatsTrackingOnSingleStreamWhileSkippingStates() { void testSerialStreamStatsTrackingCompletedSync() { statsTracker.updateStats(S1_MESSAGE1); final var s1State1 = createStreamState(STREAM1_NAME, 1); - statsTracker.updateSourceStatesStats(s1State1, false); - statsTracker.updateDestinationStateStats(s1State1, false); + statsTracker.updateSourceStatesStats(s1State1); + statsTracker.updateDestinationStateStats(s1State1); statsTracker.updateStats(S2_MESSAGE1); statsTracker.updateStats(S2_MESSAGE2); statsTracker.updateStats(S2_MESSAGE3); final var s2State1 = createStreamState(STREAM2_NAME, 3); - statsTracker.updateSourceStatesStats(s2State1, false); - statsTracker.updateDestinationStateStats(s2State1, false); + statsTracker.updateSourceStatesStats(s2State1); + statsTracker.updateDestinationStateStats(s2State1); // Worth noting, in the current implementation, if replication has completed, we assume all records // to be committed, even though there is no state messages after. @@ -192,12 +192,12 @@ void testParallelStreamStatsTracking() { statsTracker.updateStats(S2_MESSAGE1); statsTracker.updateStats(S1_MESSAGE2); final var s1State1 = createStreamState(STREAM1_NAME, 2); - statsTracker.updateSourceStatesStats(s1State1, false); + statsTracker.updateSourceStatesStats(s1State1); statsTracker.updateStats(S2_MESSAGE2); statsTracker.updateStats(S1_MESSAGE3); final var s1State2 = createStreamState(STREAM1_NAME, 3); - statsTracker.updateSourceStatesStats(s1State2, false); - statsTracker.updateDestinationStateStats(s1State1, false); + statsTracker.updateSourceStatesStats(s1State2); + statsTracker.updateDestinationStateStats(s1State1); // At this point, only s1state1 has been committed. final SyncStats midSyncCheckpoint1Stats = statsTracker.getTotalStats(false); @@ -206,8 +206,8 @@ void testParallelStreamStatsTracking() { // Sending more state for stream 2 final var s2State1 = createStreamState(STREAM2_NAME, 2); - statsTracker.updateSourceStatesStats(s2State1, false); - statsTracker.updateDestinationStateStats(s2State1, false); + statsTracker.updateSourceStatesStats(s2State1); + statsTracker.updateDestinationStateStats(s2State1); // We should now have data for stream two as well final SyncStats midSyncCheckpoint2Stats = statsTracker.getTotalStats(false); @@ -215,7 +215,7 @@ void testParallelStreamStatsTracking() { assertSyncStatsCoreStatsEquals(expectedMidSyncCheckpoint2Stats, midSyncCheckpoint2Stats); // Closing up states - statsTracker.updateDestinationStateStats(s1State2, false); + statsTracker.updateDestinationStateStats(s1State2); final SyncStats midSyncCheckpoint3Stats = statsTracker.getTotalStats(false); final SyncStats expectedMidSyncCheckpoint3Stats = buildSyncStats(5L, 5L); assertSyncStatsCoreStatsEquals(expectedMidSyncCheckpoint3Stats, midSyncCheckpoint3Stats); @@ -230,7 +230,7 @@ void testCommittedStatsTrackingWithGlobalStates() { statsTracker.updateStats(S1_MESSAGE2); AirbyteStateMessage globalState1 = createGlobalState(1, STREAM1_NAME, STREAM2_NAME); // emitted records so far paired with globalState1 - statsTracker.updateSourceStatesStats(globalState1, true); + statsTracker.updateSourceStatesStats(globalState1); // emitted records that will never be committed statsTracker.updateStats(S2_MESSAGE2); @@ -238,10 +238,10 @@ void testCommittedStatsTrackingWithGlobalStates() { AirbyteStateMessage globalState2 = createGlobalState(2, STREAM1_NAME, STREAM2_NAME); // the last 2 emitted records paired with globalState2 - statsTracker.updateSourceStatesStats(globalState2, true); + statsTracker.updateSourceStatesStats(globalState2); // records paired with globalState1 are now considered committed - statsTracker.updateDestinationStateStats(globalState1, true); + statsTracker.updateDestinationStateStats(globalState1); Map streamToCommittedRecords = statsTracker.getStreamToCommittedRecords(); @@ -257,15 +257,15 @@ void testDuplicatedSourceStates() { final var s1State2 = createStreamState(STREAM1_NAME, 2); final var s2State1 = createStreamState(STREAM2_NAME, 1); statsTracker.updateStats(S1_MESSAGE1); - statsTracker.updateSourceStatesStats(s1State1, false); - statsTracker.updateDestinationStateStats(s1State1, false); + statsTracker.updateSourceStatesStats(s1State1); + statsTracker.updateDestinationStateStats(s1State1); statsTracker.updateStats(S1_MESSAGE2); - statsTracker.updateSourceStatesStats(s1State2, false); - statsTracker.updateSourceStatesStats(s1State2, false); // We will drop mid-sync committed stats for the stream because of this - statsTracker.updateDestinationStateStats(s1State2, false); + statsTracker.updateSourceStatesStats(s1State2); + statsTracker.updateSourceStatesStats(s1State2); // We will drop mid-sync committed stats for the stream because of this + statsTracker.updateDestinationStateStats(s1State2); statsTracker.updateStats(S2_MESSAGE1); - statsTracker.updateSourceStatesStats(s2State1, false); - statsTracker.updateDestinationStateStats(s2State1, false); + statsTracker.updateSourceStatesStats(s2State1); + statsTracker.updateDestinationStateStats(s2State1); final SyncStats actualMidSyncSyncStats = statsTracker.getTotalStats(false); assertSyncStatsCoreStatsEquals(buildSyncStats(3L, 2L), actualMidSyncSyncStats); @@ -288,12 +288,12 @@ void testUnexpectedStateFromDestination() { final var s1State2 = createStreamState(STREAM1_NAME, 2); statsTracker.updateStats(S1_MESSAGE1); - statsTracker.updateSourceStatesStats(s1State1, false); - statsTracker.updateDestinationStateStats(createStreamState(STREAM1_NAME, 5), false); // This is unexpected since it never came from the source. - statsTracker.updateDestinationStateStats(s1State1, false); + statsTracker.updateSourceStatesStats(s1State1); + statsTracker.updateDestinationStateStats(createStreamState(STREAM1_NAME, 5)); // This is unexpected since it never came from the source. + statsTracker.updateDestinationStateStats(s1State1); statsTracker.updateStats(S1_MESSAGE2); - statsTracker.updateSourceStatesStats(s1State2, false); - statsTracker.updateDestinationStateStats(s1State2, false); + statsTracker.updateSourceStatesStats(s1State2); + statsTracker.updateDestinationStateStats(s1State2); final SyncStats actualMidSyncSyncStats = statsTracker.getTotalStats(false); assertSyncStatsCoreStatsEquals(buildSyncStats(2L, 2L), actualMidSyncSyncStats); @@ -308,26 +308,26 @@ void testReceivingTheSameStateFromDestinationDoesntFlushUnexpectedStates() { final var s1State3 = createStreamState(STREAM1_NAME, 3); statsTracker.updateStats(S1_MESSAGE1); - statsTracker.updateSourceStatesStats(s1State1, false); + statsTracker.updateSourceStatesStats(s1State1); statsTracker.updateStats(S1_MESSAGE1); - statsTracker.updateSourceStatesStats(s1State2, false); + statsTracker.updateSourceStatesStats(s1State2); statsTracker.updateStats(S1_MESSAGE1); - statsTracker.updateSourceStatesStats(s1State3, false); + statsTracker.updateSourceStatesStats(s1State3); // Sending state 2 should clear state1 and state2 - statsTracker.updateDestinationStateStats(s1State2, false); + statsTracker.updateDestinationStateStats(s1State2); final SyncStats statsAfterState2 = statsTracker.getTotalStats(false); assertSyncStatsCoreStatsEquals(buildSyncStats(3L, 2L), statsAfterState2); // Sending state 1 out of order - statsTracker.updateDestinationStateStats(s1State1, false); + statsTracker.updateDestinationStateStats(s1State1); final SyncStats statsAfterState1OutOfOrder = statsTracker.getTotalStats(false); // Stats count should remain stable because state1 has already been handled assertSyncStatsCoreStatsEquals(buildSyncStats(3L, 2L), statsAfterState1OutOfOrder); verify(metricClient, times(1)).count(OssMetricsRegistry.STATE_ERROR_UNKNOWN_FROM_DESTINATION, 1); // Sending state 2 again - statsTracker.updateDestinationStateStats(s1State2, false); + statsTracker.updateDestinationStateStats(s1State2); final SyncStats statsAfterState2Again = statsTracker.getTotalStats(false); // Stats count should remain stable because state1 has already been handled assertSyncStatsCoreStatsEquals(buildSyncStats(3L, 2L), statsAfterState2Again); @@ -335,14 +335,14 @@ void testReceivingTheSameStateFromDestinationDoesntFlushUnexpectedStates() { // Sending state 3 reset(metricClient); - statsTracker.updateDestinationStateStats(s1State3, false); + statsTracker.updateDestinationStateStats(s1State3); final SyncStats statsAfterState3 = statsTracker.getTotalStats(false); // Stats count should remain stable because state1 has already been handled assertSyncStatsCoreStatsEquals(buildSyncStats(3L, 3L), statsAfterState3); verify(metricClient, never()).count(OssMetricsRegistry.STATE_ERROR_UNKNOWN_FROM_DESTINATION, 1); // Sending state 3 again - statsTracker.updateDestinationStateStats(s1State3, false); + statsTracker.updateDestinationStateStats(s1State3); final SyncStats statsAfterState3Again = statsTracker.getTotalStats(false); // Stats count should remain stable because state1 has already been handled assertSyncStatsCoreStatsEquals(buildSyncStats(3L, 3L), statsAfterState3Again); @@ -356,9 +356,9 @@ void testAccessors() { statsTracker.updateStats(S2_MESSAGE1); final var s1State1 = createStreamState(STREAM1_NAME, 1); final var s2State1 = createStreamState(STREAM2_NAME, 2); - statsTracker.updateSourceStatesStats(s1State1, false); - statsTracker.updateSourceStatesStats(s2State1, false); - statsTracker.updateDestinationStateStats(s2State1, false); + statsTracker.updateSourceStatesStats(s1State1); + statsTracker.updateSourceStatesStats(s2State1); + statsTracker.updateDestinationStateStats(s2State1); assertEquals(Map.of(STREAM1, 0L, STREAM2, 2L), statsTracker.getStreamToCommittedRecords()); assertEquals(Map.of(STREAM1, 0L, STREAM2, 2L * MESSAGE_SIZE), statsTracker.getStreamToCommittedBytes()); @@ -453,14 +453,14 @@ void testCheckpointingMetrics() throws InterruptedException { final var s2State1 = createStreamState(STREAM2_NAME, 1); final var s2State2 = createStreamState(STREAM2_NAME, 3); - statsTracker.updateSourceStatesStats(s1State1, false); - statsTracker.updateSourceStatesStats(s2State1, false); + statsTracker.updateSourceStatesStats(s1State1); + statsTracker.updateSourceStatesStats(s2State1); Thread.sleep(1000); - statsTracker.updateSourceStatesStats(s1State2, false); + statsTracker.updateSourceStatesStats(s1State2); Thread.sleep(1000); - statsTracker.updateSourceStatesStats(s2State2, false); - statsTracker.updateDestinationStateStats(s1State1, false); - statsTracker.updateDestinationStateStats(s2State1, false); + statsTracker.updateSourceStatesStats(s2State2); + statsTracker.updateDestinationStateStats(s1State1); + statsTracker.updateDestinationStateStats(s2State1); assertEquals(4, statsTracker.getTotalSourceStateMessagesEmitted()); assertEquals(2, statsTracker.getTotalDestinationStateMessagesEmitted()); @@ -492,8 +492,8 @@ void testNoStatsForNullStreamAreReturned() { // Checking for LegacyStates final var legacyState = AirbyteMessageUtils.createStateMessage(1337).getState(); - statsTracker.updateSourceStatesStats(legacyState, false); - statsTracker.updateDestinationStateStats(legacyState, false); + statsTracker.updateSourceStatesStats(legacyState); + statsTracker.updateDestinationStateStats(legacyState); final List actualLegacyStreamStats = statsTracker.getAllStreamSyncStats(false); assertStreamSyncStatsCoreStatsEquals(List.of(), actualLegacyStreamStats); @@ -508,8 +508,8 @@ void testNoStatsForNullStreamAreReturned() { // Checking for GlobalStates final var globalState = AirbyteMessageUtils.createGlobalStateMessage(1337).getState(); - statsTracker.updateSourceStatesStats(globalState, false); - statsTracker.updateDestinationStateStats(globalState, false); + statsTracker.updateSourceStatesStats(globalState); + statsTracker.updateDestinationStateStats(globalState); final List actualGlobalStreamStats = statsTracker.getAllStreamSyncStats(false); assertStreamSyncStatsCoreStatsEquals(List.of(), actualGlobalStreamStats); diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/syncpersistence/SyncPersistenceImplTest.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/syncpersistence/SyncPersistenceImplTest.java index eae3bdbf5a1..75f18b409ec 100644 --- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/syncpersistence/SyncPersistenceImplTest.java +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/syncpersistence/SyncPersistenceImplTest.java @@ -466,12 +466,12 @@ void testSyncStatsTrackerWrapping() { verify(syncStatsTracker).updateEstimates(new AirbyteEstimateTraceMessage()); clearInvocations(); - syncPersistence.updateDestinationStateStats(new AirbyteStateMessage(), false); - verify(syncStatsTracker).updateDestinationStateStats(new AirbyteStateMessage(), false); + syncPersistence.updateDestinationStateStats(new AirbyteStateMessage()); + verify(syncStatsTracker).updateDestinationStateStats(new AirbyteStateMessage()); clearInvocations(); - syncPersistence.updateSourceStatesStats(new AirbyteStateMessage(), false); - verify(syncStatsTracker).updateSourceStatesStats(new AirbyteStateMessage(), false); + syncPersistence.updateSourceStatesStats(new AirbyteStateMessage()); + verify(syncStatsTracker).updateSourceStatesStats(new AirbyteStateMessage()); clearInvocations(); syncPersistence.getStreamToCommittedBytes(); diff --git a/airbyte-featureflag/src/main/kotlin/FlagDefinitions.kt b/airbyte-featureflag/src/main/kotlin/FlagDefinitions.kt index eae0e8b7074..17629b1fb11 100644 --- a/airbyte-featureflag/src/main/kotlin/FlagDefinitions.kt +++ b/airbyte-featureflag/src/main/kotlin/FlagDefinitions.kt @@ -177,8 +177,6 @@ object FailMissingPks : Temporary(key = "platform.fail-missing-pks", de object PrintLongRecordPks : Temporary(key = "platform.print-long-record-pks", default = false) -object TrackCommittedStatsWhenUsingGlobalState : Temporary(key = "global-state-committed-stats-tracking-enabled", default = false) - object InjectAwsSecretsToConnectorPods : Temporary(key = "platform.inject-aws-secrets-to-connector-pods", default = false) object UseWorkloadOutputDocStore : Temporary(key = "platform.use-workload-output-doc-store", default = false)