diff --git a/airbyte-commons-server/src/main/java/io/airbyte/commons/server/handlers/JobInputHandler.java b/airbyte-commons-server/src/main/java/io/airbyte/commons/server/handlers/JobInputHandler.java index cde865497c9..a718ae4a962 100644 --- a/airbyte-commons-server/src/main/java/io/airbyte/commons/server/handlers/JobInputHandler.java +++ b/airbyte-commons-server/src/main/java/io/airbyte/commons/server/handlers/JobInputHandler.java @@ -226,8 +226,6 @@ public Object getJobInput(final SyncInput input) { .withDestinationConfiguration(attemptSyncConfig.getDestinationConfiguration()) .withOperationSequence(config.getOperationSequence()) .withWebhookOperationConfigs(config.getWebhookOperationConfigs()) - .withCatalog(config.getConfiguredAirbyteCatalog()) - .withState(attemptSyncConfig.getState()) .withSyncResourceRequirements(config.getSyncResourceRequirements()) .withConnectionId(connectionId) .withWorkspaceId(config.getWorkspaceId()) diff --git a/airbyte-commons-server/src/test/java/io/airbyte/commons/server/handlers/JobInputHandlerTest.java b/airbyte-commons-server/src/test/java/io/airbyte/commons/server/handlers/JobInputHandlerTest.java index e9096cd61af..9ecdcda9a96 100644 --- a/airbyte-commons-server/src/test/java/io/airbyte/commons/server/handlers/JobInputHandlerTest.java +++ b/airbyte-commons-server/src/test/java/io/airbyte/commons/server/handlers/JobInputHandlerTest.java @@ -191,8 +191,6 @@ void testGetSyncWorkflowInput() throws JsonValidationException, ConfigNotFoundEx .withDestinationId(DESTINATION_ID) .withSourceConfiguration(SOURCE_CONFIG_WITH_OAUTH_AND_INJECTED_CONFIG) .withDestinationConfiguration(DESTINATION_CONFIG_WITH_OAUTH) - .withState(STATE) - .withCatalog(jobSyncConfig.getConfiguredAirbyteCatalog()) .withIsReset(false); final JobRunConfig expectedJobRunConfig = new JobRunConfig() @@ -266,8 +264,6 @@ void testGetResetSyncWorkflowInput() throws IOException, ApiException, JsonValid .withDestinationId(DESTINATION_ID) .withSourceConfiguration(Jsons.emptyObject()) .withDestinationConfiguration(DESTINATION_CONFIG_WITH_OAUTH) - .withState(STATE) - .withCatalog(jobResetConfig.getConfiguredAirbyteCatalog()) .withWebhookOperationConfigs(jobResetConfig.getWebhookOperationConfigs()) .withIsReset(true); diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/test_utils/TestConfigHelpers.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/test_utils/TestConfigHelpers.java index e3a8bbce704..9f1d8ccde55 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/test_utils/TestConfigHelpers.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/test_utils/TestConfigHelpers.java @@ -62,9 +62,7 @@ public static ImmutablePair createSyncConfig(fi .withSourceId(replicationInput.getSourceId()) .withDestinationId(replicationInput.getDestinationId()) .withDestinationConfiguration(replicationInput.getDestinationConfiguration()) - .withCatalog(replicationInput.getCatalog()) .withSourceConfiguration(replicationInput.getSourceConfiguration()) - .withState(replicationInput.getState()) .withOperationSequence(replicationInput.getOperationSequence()) .withWorkspaceId(replicationInput.getWorkspaceId()) .withConnectionContext(new ConnectionContext().withOrganizationId(organizationId))); diff --git a/airbyte-config/config-models/src/main/resources/types/StandardSyncInput.yaml b/airbyte-config/config-models/src/main/resources/types/StandardSyncInput.yaml index 86f65a3f95a..33527acf726 100644 --- a/airbyte-config/config-models/src/main/resources/types/StandardSyncInput.yaml +++ b/airbyte-config/config-models/src/main/resources/types/StandardSyncInput.yaml @@ -45,16 +45,6 @@ properties: description: The webhook operation configs belonging to this workspace. See webhookOperationConfigs in StandardWorkspace.yaml. type: object existingJavaType: com.fasterxml.jackson.databind.JsonNode - catalog: - description: DEPRECATED - this is now retrieved separately via API. The configured airbyte catalog. - type: object - # necessary because the configuration declaration is in a separate package. - existingJavaType: io.airbyte.protocol.models.ConfiguredAirbyteCatalog - deprecated: true - state: - description: DEPRECATED - this is now retrieved separately via API. Optional state of the previous run. this object is defined per integration. - "$ref": State.yaml - deprecated: true syncResourceRequirements: description: Resource requirements to use for the sync $ref: SyncResourceRequirements.yaml diff --git a/airbyte-config/config-models/src/main/resources/types/StandardSyncOutput.yaml b/airbyte-config/config-models/src/main/resources/types/StandardSyncOutput.yaml index 037f5bbebc1..16516c32756 100644 --- a/airbyte-config/config-models/src/main/resources/types/StandardSyncOutput.yaml +++ b/airbyte-config/config-models/src/main/resources/types/StandardSyncOutput.yaml @@ -14,10 +14,6 @@ properties: "$ref": NormalizationSummary.yaml webhookOperationSummary: "$ref": WebhookOperationSummary.yaml - state: - "$ref": State.yaml - output_catalog: - existingJavaType: io.airbyte.protocol.models.ConfiguredAirbyteCatalog failures: type: array items: diff --git a/airbyte-db/db-lib/src/test/java/io/airbyte/db/instance/configs/migrations/V0_30_22_001__Store_last_sync_state_test.java b/airbyte-db/db-lib/src/test/java/io/airbyte/db/instance/configs/migrations/V0_30_22_001__Store_last_sync_state_test.java index 3bce25bf50f..a4e0079099f 100644 --- a/airbyte-db/db-lib/src/test/java/io/airbyte/db/instance/configs/migrations/V0_30_22_001__Store_last_sync_state_test.java +++ b/airbyte-db/db-lib/src/test/java/io/airbyte/db/instance/configs/migrations/V0_30_22_001__Store_last_sync_state_test.java @@ -11,10 +11,7 @@ import static io.airbyte.db.instance.configs.migrations.V0_30_22_001__Store_last_sync_state.COLUMN_UPDATED_AT; import static io.airbyte.db.instance.configs.migrations.V0_30_22_001__Store_last_sync_state.TABLE_AIRBYTE_CONFIGS; import static io.airbyte.db.instance.configs.migrations.V0_30_22_001__Store_last_sync_state.getStandardSyncState; -import static org.jooq.impl.DSL.field; -import static org.jooq.impl.DSL.table; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -22,9 +19,6 @@ import io.airbyte.commons.json.Jsons; import io.airbyte.config.ConfigSchema; import io.airbyte.config.Configs; -import io.airbyte.config.JobOutput; -import io.airbyte.config.JobOutput.OutputType; -import io.airbyte.config.StandardSyncOutput; import io.airbyte.config.StandardSyncState; import io.airbyte.config.State; import io.airbyte.db.Database; @@ -33,20 +27,13 @@ import io.airbyte.db.instance.jobs.JobsDatabaseTestProvider; import jakarta.annotation.Nullable; import java.io.IOException; -import java.sql.Connection; import java.sql.SQLException; import java.time.OffsetDateTime; import java.util.Collections; import java.util.Set; import java.util.UUID; import java.util.concurrent.TimeUnit; -import org.flywaydb.core.api.configuration.Configuration; -import org.flywaydb.core.api.migration.Context; import org.jooq.DSLContext; -import org.jooq.Field; -import org.jooq.JSONB; -import org.jooq.Table; -import org.jooq.impl.SQLDataType; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.MethodOrderer; import org.junit.jupiter.api.Order; @@ -58,27 +45,11 @@ @TestMethodOrder(MethodOrderer.OrderAnnotation.class) class V0_30_22_001__Store_last_sync_state_test extends AbstractConfigsDatabaseTest { - private static final OffsetDateTime TIMESTAMP = OffsetDateTime.now(); - - private static final Table JOBS_TABLE = table("jobs"); - private static final Field JOB_ID_FIELD = field("id", SQLDataType.BIGINT); - private static final Field JOB_SCOPE_FIELD = field("scope", SQLDataType.VARCHAR); - private static final Field JOB_CREATED_AT_FIELD = field("created_at", SQLDataType.TIMESTAMPWITHTIMEZONE); - - private static final Table ATTEMPTS_TABLE = table("attempts"); - private static final Field ATTEMPT_ID_FIELD = field("id", SQLDataType.BIGINT); - private static final Field ATTEMPT_JOB_ID_FIELD = field("job_id", SQLDataType.BIGINT); - private static final Field ATTEMPT_NUMBER_FIELD = field("attempt_number", SQLDataType.INTEGER); - private static final Field ATTEMPT_OUTPUT_FIELD = field("output", SQLDataType.JSONB); - private static final Field ATTEMPT_CREATED_AT_FIELD = field("created_at", SQLDataType.TIMESTAMPWITHTIMEZONE); - - private static final UUID CONNECTION_1_ID = UUID.randomUUID(); private static final UUID CONNECTION_2_ID = UUID.randomUUID(); private static final UUID CONNECTION_3_ID = UUID.randomUUID(); private static final State CONNECTION_2_STATE = Jsons.deserialize("{ \"state\": { \"cursor\": 2222 } }", State.class); private static final State CONNECTION_3_STATE = Jsons.deserialize("{ \"state\": { \"cursor\": 3333 } }", State.class); - private static final State CONNECTION_OLD_STATE = Jsons.deserialize("{ \"state\": { \"cursor\": -1 } }", State.class); private static final StandardSyncState STD_CONNECTION_STATE_2 = getStandardSyncState(CONNECTION_2_ID, CONNECTION_2_STATE); private static final StandardSyncState STD_CONNECTION_STATE_3 = getStandardSyncState(CONNECTION_3_ID, CONNECTION_3_STATE); @@ -108,43 +79,6 @@ void testGetJobsDatabase() { .getJobsDatabase(configs.getDatabaseUser(), configs.getDatabasePassword(), configs.getDatabaseUrl()).isPresent()); } - @Test - @Order(20) - void testGetStandardSyncStates() throws Exception { - jobDatabase.query(ctx -> { - // Connection 1 has 1 job, no attempt. - // This is to test that connection without no state is not returned. - createJob(ctx, CONNECTION_1_ID, 30); - - // Connection 2 has two jobs, each has one attempt. - // This is to test that only the state from the latest job is returned. - final long job21 = createJob(ctx, CONNECTION_2_ID, 10); - final long job22 = createJob(ctx, CONNECTION_2_ID, 20); - assertNotEquals(job21, job22); - createAttempt(ctx, job21, 1, createAttemptOutput(CONNECTION_OLD_STATE), 11); - createAttempt(ctx, job22, 1, createAttemptOutput(CONNECTION_2_STATE), 21); - - // Connection 3 has two jobs. - // The first job has multiple attempts. Its third attempt has the latest state. - // The second job has two attempts with no state. - // This is to test that only the state from the latest attempt is returned. - final long job31 = createJob(ctx, CONNECTION_3_ID, 5); - final long job32 = createJob(ctx, CONNECTION_3_ID, 15); - assertNotEquals(job31, job32); - createAttempt(ctx, job31, 1, createAttemptOutput(CONNECTION_OLD_STATE), 6); - createAttempt(ctx, job31, 2, null, 7); - createAttempt(ctx, job31, 3, createAttemptOutput(CONNECTION_3_STATE), 8); - createAttempt(ctx, job31, 4, null, 9); - createAttempt(ctx, job31, 5, null, 10); - createAttempt(ctx, job32, 1, null, 20); - createAttempt(ctx, job32, 2, null, 25); - - assertEquals(STD_CONNECTION_STATES, V0_30_22_001__Store_last_sync_state.getStandardSyncStates(jobDatabase)); - - return null; - }); - } - @Test @Order(30) void testCopyData() throws SQLException { @@ -175,100 +109,6 @@ void testCopyData() throws SQLException { }); } - /** - * Clear the table and test the migration end-to-end. - */ - @Test - @Order(40) - void testMigration() throws Exception { - jobDatabase.query(ctx -> ctx.deleteFrom(TABLE_AIRBYTE_CONFIGS) - .where(COLUMN_CONFIG_TYPE.eq(ConfigSchema.STANDARD_SYNC_STATE.name())) - .execute()); - - final var migration = new V0_30_22_001__Store_last_sync_state(); - // this context is a flyway class - final Context context = new Context() { - - @Override - public Configuration getConfiguration() { - final Configuration configuration = mock(Configuration.class); - when(configuration.getUser()).thenReturn(container.getUsername()); - when(configuration.getPassword()).thenReturn(container.getPassword()); - when(configuration.getUrl()).thenReturn(container.getJdbcUrl()); - return configuration; - } - - @Override - public Connection getConnection() { - try { - return dataSource.getConnection(); - } catch (final SQLException e) { - throw new RuntimeException(e); - } - } - - }; - migration.migrate(context); - jobDatabase.query(ctx -> { - checkSyncStates(ctx, STD_CONNECTION_STATES, null); - return null; - }); - } - - /** - * Create a job record whose scope equals to the passed in connection id, and return the job id. - * - * @param creationOffset Set the creation timestamp to {@code TIMESTAMP} + this passed in offset. - */ - private static long createJob(final DSLContext ctx, final UUID connectionId, final long creationOffset) { - final int insertCount = ctx.insertInto(JOBS_TABLE) - .set(JOB_SCOPE_FIELD, connectionId.toString()) - .set(JOB_CREATED_AT_FIELD, TIMESTAMP.plusDays(creationOffset)) - .execute(); - assertEquals(1, insertCount); - - return ctx.select(JOB_ID_FIELD) - .from(JOBS_TABLE) - .where(JOB_SCOPE_FIELD.eq(connectionId.toString())) - .orderBy(JOB_CREATED_AT_FIELD.desc()) - .limit(1) - .fetchOne() - .get(JOB_ID_FIELD); - } - - /* - * @param creationOffset Set the creation timestamp to {@code TIMESTAMP} + this passed in offset. - */ - private static void createAttempt(final DSLContext ctx, - final long jobId, - final int attemptNumber, - final JobOutput attemptOutput, - final long creationOffset) { - final int insertCount = ctx.insertInto(ATTEMPTS_TABLE) - .set(ATTEMPT_JOB_ID_FIELD, jobId) - .set(ATTEMPT_NUMBER_FIELD, attemptNumber) - .set(ATTEMPT_OUTPUT_FIELD, JSONB.valueOf(Jsons.serialize(attemptOutput))) - .set(ATTEMPT_CREATED_AT_FIELD, TIMESTAMP.plusDays(creationOffset)) - .execute(); - assertEquals(1, insertCount); - - ctx.select(ATTEMPT_ID_FIELD) - .from(ATTEMPTS_TABLE) - .where(ATTEMPT_JOB_ID_FIELD.eq(jobId), ATTEMPT_NUMBER_FIELD.eq(attemptNumber)) - .fetchOne() - .get(ATTEMPT_ID_FIELD); - } - - /** - * Create an JobOutput object whose output type is StandardSyncOutput. - * - * @param state The state object within a StandardSyncOutput. - */ - private static JobOutput createAttemptOutput(final State state) { - final StandardSyncOutput standardSyncOutput = new StandardSyncOutput().withState(state); - return new JobOutput().withOutputType(OutputType.SYNC).withSync(standardSyncOutput); - } - private static void checkSyncStates(final DSLContext ctx, final Set standardSyncStates, @Nullable final OffsetDateTime expectedTimestamp) { diff --git a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java index 5e0f758cf7e..f8dc2b833c7 100644 --- a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java +++ b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java @@ -434,10 +434,6 @@ private static JobOutput parseJobOutputFromString(final String jobOutputString) // TODO feature flag this for data types rollout // CatalogMigrationV1Helper.upgradeSchemaIfNeeded(jobOutput.getDiscoverCatalog().getCatalog()); CatalogMigrationV1Helper.downgradeSchemaIfNeeded(jobOutput.getDiscoverCatalog().getCatalog()); - } else if (jobOutput.getOutputType() == OutputType.SYNC && jobOutput.getSync() != null && jobOutput.getSync().getOutputCatalog() != null) { - // TODO feature flag this for data types rollout - // CatalogMigrationV1Helper.upgradeSchemaIfNeeded(jobOutput.getSync().getOutputCatalog()); - CatalogMigrationV1Helper.downgradeSchemaIfNeeded(jobOutput.getSync().getOutputCatalog()); } return jobOutput; } diff --git a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/models/Job.java b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/models/Job.java index ebef2cfbeb7..378e01aa07b 100644 --- a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/models/Job.java +++ b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/models/Job.java @@ -187,19 +187,6 @@ public Optional getLastFailedAttempt() { .findFirst(); } - /** - * Get the last attempt by created_at for the job that had an output. - * - * @return the last attempt. empty optional, if there have been no attempts with outputs. - */ - public Optional getLastAttemptWithOutput() { - return getAttempts() - .stream() - .sorted(Comparator.comparing(Attempt::getCreatedAtInSecond).reversed()) - .filter(a -> a.getOutput().isPresent() && a.getOutput().get().getSync() != null && a.getOutput().get().getSync().getState() != null) - .findFirst(); - } - /** * Get the last attempt by created_at for the job. * diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java index 3e3c373fbeb..0bdea5868b8 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java @@ -59,12 +59,6 @@ public JobInput getSyncWorkflowInput(final SyncInput input) { .attemptNumber(input.getAttemptId())), "Create job input."), JobInput.class); - // We now fetch the state and catalog until right before we launch the sync pods. - // This is done via the ReplicationInputHydrator#getHydratedInput method. - // Null-ing these out helps avoid Temporal payload size limitations. - jobInput.getSyncInput().setCatalog(null); - jobInput.getSyncInput().setState(null); - MetricAttribute[] attrs = new MetricAttribute[0]; try { attrs = new MetricAttribute[] { diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java index c1d073ff205..4dcadf555a2 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java @@ -31,7 +31,6 @@ import io.micronaut.context.annotation.Requires; import jakarta.inject.Named; import jakarta.inject.Singleton; -import java.util.ArrayList; import lombok.extern.slf4j.Slf4j; /** @@ -97,13 +96,6 @@ public void jobSuccessWithAttemptNumber(final JobSuccessInputWithAttemptNumber i final var output = input.getStandardSyncOutput(); - if (output != null && output.getOutputCatalog() != null && output.getCatalogUri() != null) { - catalogClient.validate( - output.getOutputCatalog(), - output.getCatalogUri(), - new ArrayList<>()); - } - try { final var request = new JobSuccessWithAttemptNumberRequest() .jobId(input.getJobId()) @@ -143,13 +135,6 @@ public void attemptFailureWithAttemptNumber(final AttemptNumberFailureInput inpu final var output = input.getStandardSyncOutput(); - if (output != null && output.getOutputCatalog() != null && output.getCatalogUri() != null) { - catalogClient.validate( - output.getOutputCatalog(), - output.getCatalogUri(), - new ArrayList<>()); - } - try { final var req = new FailAttemptRequest() .attemptNumber(input.getAttemptNumber()) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivity.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivity.java index 070bb4ccc13..530137a35ba 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivity.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivity.java @@ -7,8 +7,6 @@ import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.config.NormalizationInput; import io.airbyte.config.NormalizationSummary; -import io.airbyte.config.StandardSyncInput; -import io.airbyte.config.StandardSyncOutput; import io.airbyte.persistence.job.models.IntegrationLauncherConfig; import io.airbyte.persistence.job.models.JobRunConfig; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; @@ -28,18 +26,9 @@ NormalizationSummary normalize(JobRunConfig jobRunConfig, IntegrationLauncherConfig destinationLauncherConfig, NormalizationInput input); - @ActivityMethod - NormalizationInput generateNormalizationInput(final StandardSyncInput syncInput, final StandardSyncOutput syncOutput); - - @ActivityMethod - @Deprecated(forRemoval = true) - NormalizationInput generateNormalizationInputWithMinimumPayload(final JsonNode destinationConfiguration, - final ConfiguredAirbyteCatalog airbyteCatalog, - final UUID workspaceId); - @ActivityMethod NormalizationInput generateNormalizationInputWithMinimumPayloadWithConnectionId(final JsonNode destinationConfiguration, - @Nullable final ConfiguredAirbyteCatalog airbyteCatalog, + @Deprecated @Nullable final ConfiguredAirbyteCatalog airbyteCatalog, final UUID workspaceId, final UUID connectionId, final UUID organizationId); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java index b3857abcfc3..9978ad60055 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java @@ -37,8 +37,6 @@ import io.airbyte.config.NormalizationInput; import io.airbyte.config.NormalizationSummary; import io.airbyte.config.ResourceRequirements; -import io.airbyte.config.StandardSyncInput; -import io.airbyte.config.StandardSyncOutput; import io.airbyte.config.helpers.LogConfigs; import io.airbyte.config.secrets.SecretsRepositoryReader; import io.airbyte.config.secrets.persistence.RuntimeSecretPersistence; @@ -220,51 +218,17 @@ private NormalizationInput hydrateNormalizationInput(final NormalizationInput in return input.withDestinationConfiguration(fullDestinationConfig).withCatalog(catalog); } - /** - * This activity is deprecated. It is using a big payload which is not needed, it has been replace - * by generateNormalizationInputWithMinimumPayload - * - * @param syncInput sync input - * @param syncOutput sync output - * @return normalization output - */ - @SuppressWarnings("InvalidJavadocPosition") - @Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME) - @Override - @Deprecated(forRemoval = true) - public NormalizationInput generateNormalizationInput(final StandardSyncInput syncInput, final StandardSyncOutput syncOutput) { - return new NormalizationInput() - .withConnectionId(syncInput.getConnectionId()) - .withDestinationConfiguration(syncInput.getDestinationConfiguration()) - .withCatalog(syncOutput.getOutputCatalog()) - .withResourceRequirements(getNormalizationResourceRequirements()) - .withWorkspaceId(syncInput.getWorkspaceId()) - .withConnectionContext(syncInput.getConnectionContext()); - } - - @Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME) - @Override - public NormalizationInput generateNormalizationInputWithMinimumPayload(final JsonNode destinationConfiguration, - final ConfiguredAirbyteCatalog airbyteCatalog, - final UUID workspaceId) { - return new NormalizationInput() - .withDestinationConfiguration(destinationConfiguration) - .withCatalog(airbyteCatalog) - .withResourceRequirements(getNormalizationResourceRequirements()) - .withWorkspaceId(workspaceId); - } - @Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME) @Override public NormalizationInput generateNormalizationInputWithMinimumPayloadWithConnectionId(final JsonNode destinationConfiguration, - @Nullable final ConfiguredAirbyteCatalog airbyteCatalog, + @Deprecated @Nullable final ConfiguredAirbyteCatalog unused, final UUID workspaceId, final UUID connectionId, final UUID organizationId) { return new NormalizationInput() .withConnectionId(connectionId) .withDestinationConfiguration(destinationConfiguration) - .withCatalog(airbyteCatalog) // this may be null as we will hydrate downstream in the NormalizationActivity + .withCatalog(null) // this is null as we will hydrate downstream in the NormalizationActivity .withResourceRequirements(getNormalizationResourceRequirements()) .withWorkspaceId(workspaceId) // As much info as we can give. diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java index 893a897f527..1baa0b5b3af 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java @@ -31,7 +31,6 @@ import io.airbyte.config.secrets.SecretsRepositoryReader; import io.airbyte.featureflag.Connection; import io.airbyte.featureflag.FeatureFlagClient; -import io.airbyte.featureflag.NullOutputCatalogOnSyncOutput; import io.airbyte.featureflag.WriteOutputCatalogToObjectStorage; import io.airbyte.metrics.lib.ApmTraceUtils; import io.airbyte.metrics.lib.MetricAttribute; @@ -256,10 +255,6 @@ private StandardSyncOutput reduceReplicationOutput(final ReplicationOutput outpu syncSummary.setPerformanceMetrics(output.getReplicationAttemptSummary().getPerformanceMetrics()); syncSummary.setStreamCount((long) output.getOutputCatalog().getStreams().size()); - if (!featureFlagClient.boolVariation(NullOutputCatalogOnSyncOutput.INSTANCE, new Connection(connectionId))) { - standardSyncOutput.setOutputCatalog(output.getOutputCatalog()); - } - standardSyncOutput.setStandardSyncSummary(syncSummary); standardSyncOutput.setFailures(output.getFailures()); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java index d3833598fbd..ee63c0cff24 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java @@ -136,7 +136,7 @@ public StandardSyncOutput run(final JobRunConfig jobRunConfig, LOGGER.info("Not Running Normalization Container for connection {}, attempt id {}, because it ran in destination", connectionId, jobRunConfig.getAttemptId()); } else { - final NormalizationInput normalizationInput = generateNormalizationInput(syncInput, syncOutput); + final NormalizationInput normalizationInput = generateNormalizationInput(syncInput); final NormalizationSummary normalizationSummary = normalizationActivity.normalize(jobRunConfig, destinationLauncherConfig, normalizationInput); syncOutput = syncOutput.withNormalizationSummary(normalizationSummary); @@ -177,10 +177,9 @@ public StandardSyncOutput run(final JobRunConfig jobRunConfig, return syncOutput; } - private NormalizationInput generateNormalizationInput(final StandardSyncInput syncInput, - final StandardSyncOutput syncOutput) { + private NormalizationInput generateNormalizationInput(final StandardSyncInput syncInput) { return normalizationActivity.generateNormalizationInputWithMinimumPayloadWithConnectionId(syncInput.getDestinationConfiguration(), - syncOutput.getOutputCatalog(), + null, syncInput.getWorkspaceId(), syncInput.getConnectionId(), syncInput.getConnectionContext().getOrganizationId()); diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImplTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImplTest.java index 20c101cca65..aa6fc2d0fcc 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImplTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImplTest.java @@ -4,18 +4,9 @@ package io.airbyte.workers.temporal.scheduling.activities; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.when; - import io.airbyte.api.client.generated.JobsApi; import io.airbyte.commons.temporal.utils.PayloadChecker; -import io.airbyte.config.StandardSyncInput; -import io.airbyte.config.State; -import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; -import io.airbyte.workers.models.JobInput; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; @@ -36,25 +27,4 @@ void setUp() { activity = new GenerateInputActivityImpl(mJobsApi, mPayloadChecker); } - @Test - void nullsOutStateAndCatalog() throws Exception { - final var syncInput = new StandardSyncInput() - .withCatalog(new ConfiguredAirbyteCatalog()) - .withState(new State()); - final var jobInput = new JobInput(); - jobInput.setSyncInput(syncInput); - - when(mJobsApi.getJobInput(any())) - .thenReturn(jobInput); - - // mock this to just return the input back to us - when(mPayloadChecker.validatePayloadSize(any(), any())) - .thenAnswer(args -> args.getArguments()[0]); - - final var result = activity.getSyncWorkflowInput(new GenerateInputActivity.SyncInput()); - - Assertions.assertNull(result.getSyncInput().getCatalog()); - Assertions.assertNull(result.getSyncInput().getState()); - } - } diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java index 014799b3fd5..f91a6d71d3a 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java @@ -142,14 +142,13 @@ void setUp() { syncStats = new SyncStats().withRecordsCommitted(10L); standardSyncSummary = new StandardSyncSummary().withTotalStats(syncStats); failedSyncSummary = new StandardSyncSummary().withStatus(ReplicationStatus.FAILED).withTotalStats(new SyncStats().withRecordsEmitted(0L)); - replicationSuccessOutput = new StandardSyncOutput().withOutputCatalog(syncInput.getCatalog()).withStandardSyncSummary(standardSyncSummary); - replicationFailOutput = new StandardSyncOutput().withOutputCatalog(syncInput.getCatalog()).withStandardSyncSummary(failedSyncSummary); + replicationSuccessOutput = new StandardSyncOutput().withStandardSyncSummary(standardSyncSummary); + replicationFailOutput = new StandardSyncOutput().withStandardSyncSummary(failedSyncSummary); normalizationSummary = new NormalizationSummary(); normalizationInput = new NormalizationInput() .withDestinationConfiguration(syncInput.getDestinationConfiguration()) - .withCatalog(syncInput.getCatalog()) .withResourceRequirements(new ResourceRequirements()) .withConnectionId(syncInput.getConnectionId()) .withWorkspaceId(syncInput.getWorkspaceId())