Skip to content

Commit

Permalink
delete feature flag used for tracking committed records for global st…
Browse files Browse the repository at this point in the history
…ate (#10560)
  • Loading branch information
jpefaur committed Jan 3, 2024
1 parent 55ea983 commit 022707c
Show file tree
Hide file tree
Showing 9 changed files with 96 additions and 131 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import io.airbyte.featureflag.Source;
import io.airbyte.featureflag.SourceDefinition;
import io.airbyte.featureflag.SourceType;
import io.airbyte.featureflag.TrackCommittedStatsWhenUsingGlobalState;
import io.airbyte.featureflag.Workspace;
import io.airbyte.metrics.lib.MetricAttribute;
import io.airbyte.metrics.lib.MetricClient;
Expand Down Expand Up @@ -168,7 +167,7 @@ public ReplicationWorker create(final ReplicationInput replicationInput,

log.info("Setting up replication worker...");
final SyncPersistence syncPersistence = createSyncPersistence(syncPersistenceFactory, replicationInput, sourceLauncherConfig);
final AirbyteMessageTracker messageTracker = createMessageTracker(featureFlagClient, syncPersistence, featureFlags, replicationInput);
final AirbyteMessageTracker messageTracker = createMessageTracker(syncPersistence, featureFlags, replicationInput);

return createReplicationWorker(airbyteSource, airbyteDestination, messageTracker,
syncPersistence, recordSchemaValidator, fieldSelector, heartbeatTimeoutChaperone,
Expand Down Expand Up @@ -260,14 +259,11 @@ private static DestinationTimeoutMonitor createDestinationTimeout(final FeatureF
/**
* Create MessageTracker.
*/
private static AirbyteMessageTracker createMessageTracker(final FeatureFlagClient featureFlagClient,
final SyncPersistence syncPersistence,
private static AirbyteMessageTracker createMessageTracker(final SyncPersistence syncPersistence,
final FeatureFlags featureFlags,
final ReplicationInput replicationInput) {
Context context = new Multi(List.of(new Workspace(replicationInput.getWorkspaceId()), new Connection(replicationInput.getConnectionId())));
boolean trackCommittedStatsWhenUsingGlobalState = featureFlagClient.boolVariation(TrackCommittedStatsWhenUsingGlobalState.INSTANCE, context);
return new AirbyteMessageTracker(syncPersistence, featureFlags, replicationInput.getSourceLauncherConfig().getDockerImage(),
replicationInput.getDestinationLauncherConfig().getDockerImage(), trackCommittedStatsWhenUsingGlobalState);
replicationInput.getDestinationLauncherConfig().getDockerImage());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ class AirbyteMessageTracker(
featureFlags: FeatureFlags,
private val sourceDockerImage: String,
private val destinationDockerImage: String,
private val trackCommittedStatsWhenUsingGlobalState: Boolean,
) {
private val dstErrorTraceMsgs = ArrayList<AirbyteTraceMessage>()
private val srcErrorTraceMsgs = ArrayList<AirbyteTraceMessage>()
Expand All @@ -43,7 +42,7 @@ class AirbyteMessageTracker(
when (msg.type) {
AirbyteMessage.Type.TRACE -> handleEmittedTrace(msg.trace, AirbyteMessageOrigin.SOURCE)
AirbyteMessage.Type.RECORD -> syncStatsTracker.updateStats(msg.record)
AirbyteMessage.Type.STATE -> syncStatsTracker.updateSourceStatesStats(msg.state, trackCommittedStatsWhenUsingGlobalState)
AirbyteMessage.Type.STATE -> syncStatsTracker.updateSourceStatesStats(msg.state)
AirbyteMessage.Type.CONTROL -> logger.debug { "Control message not currently tracked." }
else -> logger.warn { "Invalid message type for message: $msg" }
}
Expand All @@ -63,7 +62,7 @@ class AirbyteMessageTracker(
AirbyteMessage.Type.STATE ->
msg.state?.let {
stateAggregator.ingest(it)
syncStatsTracker.updateDestinationStateStats(it, trackCommittedStatsWhenUsingGlobalState)
syncStatsTracker.updateDestinationStateStats(it)
}
AirbyteMessage.Type.CONTROL -> logger.debug { "Control message not currently tracked." }
else -> logger.warn { " Invalid message type for message: $msg" }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,49 +61,33 @@ class ParallelStreamStatsTracker(private val metricClient: MetricClient) : SyncS
}
}

override fun updateSourceStatesStats(
stateMessage: AirbyteStateMessage,
trackCommittedStatsWhenUsingGlobalState: Boolean,
) {
if (trackCommittedStatsWhenUsingGlobalState) {
when (stateMessage.type) {
AirbyteStateMessage.AirbyteStateType.GLOBAL -> {
stateMessage.global.streamStates.forEach {
getOrCreateStreamStatsTracker(getNameNamespacePair(it.streamDescriptor))
.trackStateFromSource(stateMessage)
}
}
else -> {
getOrCreateStreamStatsTracker(getNameNamespacePair(stateMessage))
override fun updateSourceStatesStats(stateMessage: AirbyteStateMessage) {
when (stateMessage.type) {
AirbyteStateMessage.AirbyteStateType.GLOBAL -> {
stateMessage.global.streamStates.forEach {
getOrCreateStreamStatsTracker(getNameNamespacePair(it.streamDescriptor))
.trackStateFromSource(stateMessage)
}
}
} else {
getOrCreateStreamStatsTracker(getNameNamespacePair(stateMessage))
.trackStateFromSource(stateMessage)
else -> {
getOrCreateStreamStatsTracker(getNameNamespacePair(stateMessage))
.trackStateFromSource(stateMessage)
}
}
}

override fun updateDestinationStateStats(
stateMessage: AirbyteStateMessage,
trackCommittedStatsWhenUsingGlobalState: Boolean,
) {
if (trackCommittedStatsWhenUsingGlobalState) {
when (stateMessage.type) {
AirbyteStateMessage.AirbyteStateType.GLOBAL -> {
stateMessage.global.streamStates.forEach {
getOrCreateStreamStatsTracker(getNameNamespacePair(it.streamDescriptor))
.trackStateFromDestination(stateMessage)
}
}
else -> {
getOrCreateStreamStatsTracker(getNameNamespacePair(stateMessage))
override fun updateDestinationStateStats(stateMessage: AirbyteStateMessage) {
when (stateMessage.type) {
AirbyteStateMessage.AirbyteStateType.GLOBAL -> {
stateMessage.global.streamStates.forEach {
getOrCreateStreamStatsTracker(getNameNamespacePair(it.streamDescriptor))
.trackStateFromDestination(stateMessage)
}
}
} else {
getOrCreateStreamStatsTracker(getNameNamespacePair(stateMessage))
.trackStateFromDestination(stateMessage)
else -> {
getOrCreateStreamStatsTracker(getNameNamespacePair(stateMessage))
.trackStateFromDestination(stateMessage)
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,12 @@ interface SyncStatsTracker {
/**
* Update the stats count from the source state message.
*/
fun updateSourceStatesStats(
stateMessage: AirbyteStateMessage,
trackCommittedStatsWhenUsingGlobalState: Boolean,
)
fun updateSourceStatesStats(stateMessage: AirbyteStateMessage)

/**
* Update the stats count from the source state message.
*/
fun updateDestinationStateStats(
stateMessage: AirbyteStateMessage,
trackCommittedStatsWhenUsingGlobalState: Boolean,
)
fun updateDestinationStateStats(stateMessage: AirbyteStateMessage)

/**
* Get the per-stream committed bytes count.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -398,20 +398,14 @@ class SyncPersistenceImpl
syncStatsTracker.updateEstimates(estimate)
}

override fun updateSourceStatesStats(
stateMessage: AirbyteStateMessage,
trackCommittedStatsWhenUsingGlobalState: Boolean,
) {
override fun updateSourceStatesStats(stateMessage: AirbyteStateMessage) {
isReceivingStats = true
syncStatsTracker.updateSourceStatesStats(stateMessage, trackCommittedStatsWhenUsingGlobalState)
syncStatsTracker.updateSourceStatesStats(stateMessage)
}

override fun updateDestinationStateStats(
stateMessage: AirbyteStateMessage,
trackCommittedStatsWhenUsingGlobalState: Boolean,
) {
override fun updateDestinationStateStats(stateMessage: AirbyteStateMessage) {
isReceivingStats = true
syncStatsTracker.updateDestinationStateStats(stateMessage, trackCommittedStatsWhenUsingGlobalState)
syncStatsTracker.updateDestinationStateStats(stateMessage)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class AirbyteMessageTrackerTest {
@BeforeEach
void setup() {
this.messageTracker =
new AirbyteMessageTracker(syncStatsTracker, new EnvVariableFeatureFlags(), "airbyte/source-image", "airbyte/destination-image", false);
new AirbyteMessageTracker(syncStatsTracker, new EnvVariableFeatureFlags(), "airbyte/source-image", "airbyte/destination-image");
}

@Test
Expand Down Expand Up @@ -88,7 +88,7 @@ void testAcceptFromSourceState() {

messageTracker.acceptFromSource(state);

verify(syncStatsTracker).updateSourceStatesStats(state.getState(), false);
verify(syncStatsTracker).updateSourceStatesStats(state.getState());
}

@Test
Expand Down Expand Up @@ -169,7 +169,7 @@ void testAcceptFromDestinationState() {

messageTracker.acceptFromDestination(state);

verify(syncStatsTracker).updateDestinationStateStats(state.getState(), false);
verify(syncStatsTracker).updateDestinationStateStats(state.getState());
}

@Test
Expand Down
Loading

0 comments on commit 022707c

Please sign in to comment.