Skip to content

Commit

Permalink
Delete deprecated state migration behavior from SyncPersistence. (#12…
Browse files Browse the repository at this point in the history
…083)
  • Loading branch information
gosusnp committed Apr 11, 2024
1 parent 1d6cc97 commit 1ef2eaf
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 147 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,12 @@ import datadog.trace.api.Trace
import io.airbyte.api.client.AirbyteApiClient
import io.airbyte.api.client.generated.AttemptApi
import io.airbyte.api.client.generated.StateApi
import io.airbyte.api.client.invoker.generated.ApiException
import io.airbyte.api.client.model.generated.AttemptStats
import io.airbyte.api.client.model.generated.AttemptStreamStats
import io.airbyte.api.client.model.generated.ConnectionIdRequestBody
import io.airbyte.api.client.model.generated.ConnectionState
import io.airbyte.api.client.model.generated.ConnectionStateCreateOrUpdate
import io.airbyte.api.client.model.generated.ConnectionStateType
import io.airbyte.api.client.model.generated.SaveStatsRequestBody
import io.airbyte.commons.converters.StateConverter
import io.airbyte.config.StateType
import io.airbyte.config.StateWrapper
import io.airbyte.config.SyncStats
import io.airbyte.config.helpers.StateMessageHelper
import io.airbyte.metrics.lib.MetricAttribute
Expand All @@ -25,7 +20,6 @@ import io.airbyte.metrics.lib.OssMetricsRegistry
import io.airbyte.protocol.models.AirbyteEstimateTraceMessage
import io.airbyte.protocol.models.AirbyteRecordMessage
import io.airbyte.protocol.models.AirbyteStateMessage
import io.airbyte.protocol.models.CatalogHelpers
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog
import io.airbyte.workers.internal.bookkeeping.SyncStatsTracker
import io.airbyte.workers.internal.bookkeeping.getPerStreamStats
Expand Down Expand Up @@ -83,7 +77,6 @@ class SyncPersistenceImpl
) : SyncPersistence, SyncStatsTracker by syncStatsTracker {
private var stateBuffer = stateAggregatorFactory.create()
private var stateFlushFuture: ScheduledFuture<*>? = null
private var onlyFlushAtTheEnd = false
private var isReceivingStats = false
private var stateToFlush: StateAggregator? = null
private var statsToPersist: SaveStatsRequestBody? = null
Expand Down Expand Up @@ -130,30 +123,11 @@ class SyncPersistenceImpl

metricClient.count(OssMetricsRegistry.STATE_BUFFERING, 1)
stateBuffer.ingest(stateMessage)
startBackgroundFlushStateTask(connectionId, stateMessage)
startBackgroundFlushStateTask(connectionId)
}

private fun startBackgroundFlushStateTask(
connectionId: UUID,
stateMessage: AirbyteStateMessage,
) {
if (stateFlushFuture != null || onlyFlushAtTheEnd) {
return
}

// Fetch the current persisted state to see if it is a state migration.
// In case of a state migration, we only flush at the end of the sync to avoid dropping states in
// case of a sync failure
val currentPersistedState: ConnectionState? =
try {
stateApi.getState(ConnectionIdRequestBody().connectionId(connectionId))
} catch (e: ApiException) {
logger.warn(e) { "Failed to check current state for connectionId $connectionId, it will be retried next time we see a state" }
return
}
if (isMigration(currentPersistedState, stateMessage) && stateMessage.type == AirbyteStateMessage.AirbyteStateType.STREAM) {
logger.info { "State type migration from LEGACY to STREAM detected, all states will be persisted at the end of the sync" }
onlyFlushAtTheEnd = true
private fun startBackgroundFlushStateTask(connectionId: UUID) {
if (stateFlushFuture != null) {
return
}

Expand Down Expand Up @@ -220,9 +194,6 @@ class SyncPersistenceImpl
if (hasStatesToFlush()) {
// we still have data to flush
prepareDataForFlush()
if (onlyFlushAtTheEnd) {
validateStreamMigration()
}
try {
retryWithJitterThrows("Flush States from SyncPersistenceImpl") {
doFlushState()
Expand Down Expand Up @@ -333,16 +304,6 @@ class SyncPersistenceImpl
metricClient.count(OssMetricsRegistry.STATE_COMMIT_ATTEMPT_SUCCESSFUL, 1)
}

private fun isMigration(
currentPersistedState: ConnectionState?,
stateMessage: AirbyteStateMessage,
): Boolean {
return (
!isStateEmpty(currentPersistedState) && currentPersistedState?.stateType == ConnectionStateType.LEGACY &&
stateMessage.type != AirbyteStateMessage.AirbyteStateType.LEGACY
)
}

private fun doFlushStats() {
if (!hasStatsToFlush()) {
return
Expand All @@ -364,17 +325,6 @@ class SyncPersistenceImpl

private fun hasStatsToFlush(): Boolean = isReceivingStats && statsToPersist != null

private fun validateStreamMigration() {
val state = stateToFlush?.getAggregated() ?: return

StateMessageHelper.getTypedState(state.state)
.getOrNull()
?.takeIf { it.stateType == StateType.STREAM }
?.let {
validateStreamStates(it, catalog)
}
}

/**
* Wraps RetryWithJitterThrows for testing.
*
Expand Down Expand Up @@ -454,29 +404,3 @@ private fun MetricClient.emitFailedStatsCloseMetrics(connectionId: UUID?) {
val attribute: MetricAttribute? = connectionId?.let { MetricAttribute(MetricTags.CONNECTION_ID, it.toString()) }
count(OssMetricsRegistry.STATS_COMMIT_NOT_ATTEMPTED, 1, attribute)
}

/**
* Validate that the LEGACY -> STREAM migration is correct
*
* During the migration, we will lose any previous stream state that isn't in the new state. To
* avoid a potential loss of state, we ensure that all the incremental streams are present in the
* new state.
*
* @param state the new state we want to persist
* @param configuredCatalog the configured catalog of the connection of state
*/
fun validateStreamStates(
state: StateWrapper,
configuredCatalog: ConfiguredAirbyteCatalog,
) {
val stateStreamDescriptors = state.stateMessages.map { it.stream.streamDescriptor }.toList()

CatalogHelpers.extractIncrementalStreamDescriptors(configuredCatalog)
.find { !stateStreamDescriptors.contains(it) }
?.let {
throw IllegalStateException(
"Job ran during migration from Legacy State to Per Stream State. One of the streams that did not have state is: " +
"(namespace: ${it.namespace}, name: ${it.name}). Job must be retried in order to properly store state.",
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import io.airbyte.api.client.generated.AttemptApi;
import io.airbyte.api.client.generated.StateApi;
import io.airbyte.api.client.invoker.generated.ApiException;
import io.airbyte.api.client.model.generated.ConnectionIdRequestBody;
import io.airbyte.api.client.model.generated.ConnectionState;
import io.airbyte.api.client.model.generated.ConnectionStateCreateOrUpdate;
import io.airbyte.api.client.model.generated.ConnectionStateType;
Expand All @@ -34,12 +33,9 @@
import io.airbyte.protocol.models.AirbyteGlobalState;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStateMessage;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.AirbyteStreamState;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.StreamDescriptor;
import io.airbyte.protocol.models.SyncMode;
import io.airbyte.workers.internal.bookkeeping.SyncStatsTracker;
import io.airbyte.workers.internal.stateaggregator.StateAggregatorFactory;
import java.util.List;
Expand Down Expand Up @@ -107,7 +103,6 @@ void afterEach() throws Exception {
void testPersistHappyPath() throws ApiException {
final AirbyteStateMessage stateA1 = getStreamState("A", 1);
syncPersistence.persist(connectionId, stateA1);
verify(stateApi).getState(any());
verify(executorService).scheduleAtFixedRate(any(Runnable.class), eq(0L), eq(flushPeriod), eq(TimeUnit.SECONDS));
clearInvocations(executorService, stateApi);

Expand Down Expand Up @@ -395,69 +390,6 @@ void testLegacyStatesAreGettingIntoTheScheduledFlushLogic() throws Exception {
assertTrue(Jsons.serialize(captor.getValue()).contains("myOtherState2"));
}

@Test
void testLegacyStateMigrationToStreamAreOnlyFlushedAtTheEnd() throws Exception {
// Migration is defined by current state returned from the API is LEGACY, and we are trying to
// persist a non LEGACY state
when(stateApi.getState(new ConnectionIdRequestBody().connectionId(connectionId)))
.thenReturn(new ConnectionState().state(Jsons.deserialize("{\"state\":\"some_state\"}")).stateType(ConnectionStateType.LEGACY));

final AirbyteStateMessage message = getStreamState("migration1", 12);
syncPersistence.persist(connectionId, message);
verify(stateApi).getState(new ConnectionIdRequestBody().connectionId(connectionId));
verify(executorService, never()).scheduleAtFixedRate(any(), anyLong(), anyLong(), any());

reset(stateApi);

// Since we're delaying the flush, executorService should not have been called
// We also want to make sure we are not calling getState every time
final AirbyteStateMessage otherMessage = getStreamState("migration2", 10);
syncPersistence.persist(connectionId, otherMessage);
verify(stateApi, never()).getState(new ConnectionIdRequestBody().connectionId(connectionId));
verify(executorService, never()).scheduleAtFixedRate(any(), anyLong(), anyLong(), any());

when(executorService.awaitTermination(anyLong(), any())).thenReturn(true);
when(catalog.getStreams()).thenReturn(List.of(
new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName("migration1")).withSyncMode(SyncMode.INCREMENTAL),
new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName("migration2")).withSyncMode(SyncMode.INCREMENTAL)));
syncPersistence.close();
verifyStateUpdateApiCall(List.of(message, otherMessage));
}

@Test
void testLegacyStateMigrationToGlobalGettingIntoTheScheduledFlushLogic() throws ApiException, InterruptedException {
// Migration is defined by current state returned from the API is LEGACY, and we are trying to
// persist a non LEGACY state
when(stateApi.getState(new ConnectionIdRequestBody().connectionId(connectionId)))
.thenReturn(new ConnectionState().state(Jsons.deserialize("{\"state\":\"some_state\"}")).stateType(ConnectionStateType.LEGACY));

final AirbyteStateMessage message = getGlobalState(14);
syncPersistence.persist(connectionId, message);
verify(stateApi).getState(new ConnectionIdRequestBody().connectionId(connectionId));
verify(executorService).scheduleAtFixedRate(any(), anyLong(), anyLong(), any());
}

@Test
void testDoNotStartThreadUntilStateCheckSucceeds() throws ApiException {
when(stateApi.getState(any()))
.thenThrow(new ApiException())
.thenReturn(null);

final AirbyteStateMessage s1 = getStreamState("stream 1", 9);
syncPersistence.persist(connectionId, s1);
// First getState failed, we should not have started the thread or persisted states
verify(executorService, never()).scheduleAtFixedRate(any(), anyLong(), anyLong(), any());
verify(stateApi, never()).createOrUpdateState(any());

final AirbyteStateMessage s2 = getStreamState("stream 2", 19);
syncPersistence.persist(connectionId, s2);
verify(executorService).scheduleAtFixedRate(any(), anyLong(), anyLong(), any());

// Since the first state check failed, we should be flushing both states on the first flush
actualFlushMethod.getValue().run();
verifyStateUpdateApiCall(List.of(s1, s2));
}

@Test
void testSyncStatsTrackerWrapping() {
syncPersistence.updateStats(new AirbyteRecordMessage());
Expand Down

0 comments on commit 1ef2eaf

Please sign in to comment.