From c63a6e1ae7828e80d0f12fd931f2516bdcb6df38 Mon Sep 17 00:00:00 2001 From: Benoit Moriceau Date: Thu, 22 Feb 2024 13:06:06 -0800 Subject: [PATCH] workload discover acceptance test (#11369) --- .../test/utils/AcceptanceTestHarness.java | 44 +++++++++++++++++++ .../test/acceptance/BasicAcceptanceTests.java | 40 +---------------- .../WorkloadBasicAcceptanceTests.java | 23 +++++++++- flags.yml | 8 ++++ 4 files changed, 76 insertions(+), 39 deletions(-) diff --git a/airbyte-test-utils/src/main/java/io/airbyte/test/utils/AcceptanceTestHarness.java b/airbyte-test-utils/src/main/java/io/airbyte/test/utils/AcceptanceTestHarness.java index d868b992578..924f85100e6 100644 --- a/airbyte-test-utils/src/main/java/io/airbyte/test/utils/AcceptanceTestHarness.java +++ b/airbyte-test-utils/src/main/java/io/airbyte/test/utils/AcceptanceTestHarness.java @@ -21,6 +21,9 @@ import io.airbyte.api.client.model.generated.ActorDefinitionRequestBody; import io.airbyte.api.client.model.generated.ActorType; import io.airbyte.api.client.model.generated.AirbyteCatalog; +import io.airbyte.api.client.model.generated.AirbyteStream; +import io.airbyte.api.client.model.generated.AirbyteStreamAndConfiguration; +import io.airbyte.api.client.model.generated.AirbyteStreamConfiguration; import io.airbyte.api.client.model.generated.AttemptInfoRead; import io.airbyte.api.client.model.generated.CheckConnectionRead; import io.airbyte.api.client.model.generated.ConnectionCreate; @@ -117,6 +120,7 @@ import java.time.Duration; import java.time.Instant; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -181,6 +185,7 @@ public class AcceptanceTestHarness { public static final String STAGING_SCHEMA_NAME = "staging"; public static final String COOL_EMPLOYEES_TABLE_NAME = "cool_employees"; public static final String AWESOME_PEOPLE_TABLE_NAME = "awesome_people"; + public static final String PUBLIC = "public"; private static final String DEFAULT_POSTGRES_INIT_SQL_FILE = "postgres_init.sql"; @@ -1260,4 +1265,43 @@ public WebBackendConnectionUpdate getUpdateInput(final ConnectionRead connection .skipReset(false); } + public void compareCatalog(AirbyteCatalog actual) { + final JsonNode expectedSchema = Jsons.deserialize(""" + { + "type": "object", + "properties": { + "%s": { + "type": "number", + "airbyte_type": "integer" + }, + "%s": { + "type": "string" + } + } + } + """.formatted(COLUMN_ID, COLUMN_NAME)); + final AirbyteStream expectedStream = new AirbyteStream() + .name(STREAM_NAME) + .namespace(PUBLIC) + .jsonSchema(expectedSchema) + .sourceDefinedCursor(null) + .defaultCursorField(Collections.emptyList()) + .sourceDefinedPrimaryKey(Collections.emptyList()) + .supportedSyncModes(List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)); + final AirbyteStreamConfiguration expectedStreamConfig = new AirbyteStreamConfiguration() + .syncMode(SyncMode.FULL_REFRESH) + .cursorField(Collections.emptyList()) + .destinationSyncMode(DestinationSyncMode.OVERWRITE) + .primaryKey(Collections.emptyList()) + .aliasName(STREAM_NAME.replace(".", "_")) + .selected(true) + .suggested(true); + final AirbyteCatalog expected = new AirbyteCatalog() + .streams(Lists.newArrayList(new AirbyteStreamAndConfiguration() + .stream(expectedStream) + .config(expectedStreamConfig))); + + assertEquals(expected, actual); + } + } diff --git a/airbyte-tests/src/test-acceptance/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java b/airbyte-tests/src/test-acceptance/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java index 935bc55eae7..52a45876505 100644 --- a/airbyte-tests/src/test-acceptance/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java +++ b/airbyte-tests/src/test-acceptance/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java @@ -19,6 +19,7 @@ import static io.airbyte.test.utils.AcceptanceTestHarness.COLUMN_ID; import static io.airbyte.test.utils.AcceptanceTestHarness.COLUMN_NAME; import static io.airbyte.test.utils.AcceptanceTestHarness.COOL_EMPLOYEES_TABLE_NAME; +import static io.airbyte.test.utils.AcceptanceTestHarness.PUBLIC; import static io.airbyte.test.utils.AcceptanceTestHarness.PUBLIC_SCHEMA_NAME; import static io.airbyte.test.utils.AcceptanceTestHarness.STAGING_SCHEMA_NAME; import static io.airbyte.test.utils.AcceptanceTestHarness.STREAM_NAME; @@ -39,7 +40,6 @@ import io.airbyte.api.client.model.generated.AirbyteCatalog; import io.airbyte.api.client.model.generated.AirbyteStream; import io.airbyte.api.client.model.generated.AirbyteStreamAndConfiguration; -import io.airbyte.api.client.model.generated.AirbyteStreamConfiguration; import io.airbyte.api.client.model.generated.AttemptInfoRead; import io.airbyte.api.client.model.generated.AttemptStatus; import io.airbyte.api.client.model.generated.CheckConnectionRead; @@ -143,7 +143,6 @@ class BasicAcceptanceTests { static final String DUPLICATE_TEST_IN_GKE = "TODO(https://github.com/airbytehq/airbyte-platform-internal/issues/5182): eliminate test duplication"; static final String TYPE = "type"; - static final String PUBLIC = "public"; static final String E2E_TEST_SOURCE = "E2E Test Source -"; static final String INFINITE_FEED = "INFINITE_FEED"; static final String MESSAGE_INTERVAL = "message_interval"; @@ -277,42 +276,7 @@ void testDiscoverSourceSchema() throws ApiException { final AirbyteCatalog actual = testHarness.discoverSourceSchema(sourceId); - final JsonNode expectedSchema = Jsons.deserialize(""" - { - "type": "object", - "properties": { - "%s": { - "type": "number", - "airbyte_type": "integer" - }, - "%s": { - "type": "string" - } - } - } - """.formatted(COLUMN_ID, COLUMN_NAME)); - final AirbyteStream expectedStream = new AirbyteStream() - .name(STREAM_NAME) - .namespace(PUBLIC) - .jsonSchema(expectedSchema) - .sourceDefinedCursor(null) - .defaultCursorField(Collections.emptyList()) - .sourceDefinedPrimaryKey(Collections.emptyList()) - .supportedSyncModes(List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)); - final AirbyteStreamConfiguration expectedStreamConfig = new AirbyteStreamConfiguration() - .syncMode(SyncMode.FULL_REFRESH) - .cursorField(Collections.emptyList()) - .destinationSyncMode(DestinationSyncMode.OVERWRITE) - .primaryKey(Collections.emptyList()) - .aliasName(STREAM_NAME.replace(".", "_")) - .selected(true) - .suggested(true); - final AirbyteCatalog expected = new AirbyteCatalog() - .streams(Lists.newArrayList(new AirbyteStreamAndConfiguration() - .stream(expectedStream) - .config(expectedStreamConfig))); - - assertEquals(expected, actual); + testHarness.compareCatalog(actual); } @Test diff --git a/airbyte-tests/src/test-acceptance/java/io/airbyte/test/acceptance/WorkloadBasicAcceptanceTests.java b/airbyte-tests/src/test-acceptance/java/io/airbyte/test/acceptance/WorkloadBasicAcceptanceTests.java index a296246d867..9ea1ca4283e 100644 --- a/airbyte-tests/src/test-acceptance/java/io/airbyte/test/acceptance/WorkloadBasicAcceptanceTests.java +++ b/airbyte-tests/src/test-acceptance/java/io/airbyte/test/acceptance/WorkloadBasicAcceptanceTests.java @@ -8,8 +8,10 @@ import static io.airbyte.test.acceptance.BasicAcceptanceTestsResources.IS_GKE; import static io.airbyte.test.acceptance.BasicAcceptanceTestsResources.KUBE; import static io.airbyte.test.acceptance.BasicAcceptanceTestsResources.TRUE; +import static org.junit.jupiter.api.Assertions.assertEquals; import io.airbyte.api.client.invoker.generated.ApiException; +import io.airbyte.api.client.model.generated.AirbyteCatalog; import io.airbyte.api.client.model.generated.CheckConnectionRead; import io.airbyte.api.client.model.generated.CheckConnectionRead.StatusEnum; import java.io.IOException; @@ -36,6 +38,7 @@ public class WorkloadBasicAcceptanceTests { static final UUID RUN_WITH_WORKLOAD_WITHOUT_DOC_STORE_WORKSPACE_ID = UUID.fromString("3d2985a0-a412-45f4-9124-e15800b739be"); static final UUID RUN_CHECK_WITH_WORKLOAD_WORKSPACE_ID = UUID.fromString("1bdcfb61-219b-4290-be4f-12f9ac5461be"); + static final UUID RUN_DISCOVER_WITH_WORKLOAD_WORKSPACE_ID = UUID.fromString("3851861d-ac0b-440c-bd60-408cf9e7fc0e"); @BeforeAll static void init() throws URISyntaxException, IOException, InterruptedException, ApiException { @@ -85,7 +88,25 @@ void testDestinationCheckConnectionWithWorkload() throws Exception { final CheckConnectionRead.StatusEnum checkOperationStatus = testResources.getTestHarness().checkDestination(destinationId); Assertions.assertNotNull(checkOperationStatus); - Assertions.assertEquals(StatusEnum.SUCCEEDED, checkOperationStatus); + assertEquals(StatusEnum.SUCCEEDED, checkOperationStatus); + } + + @Test + @EnabledIfEnvironmentVariable(named = KUBE, + matches = TRUE) + @DisabledIfEnvironmentVariable(named = IS_GKE, + matches = TRUE, + disabledReason = DISABLE_TEMPORAL_TESTS_IN_GKE) + void testDiscoverSourceSchema() throws Exception { + // Create workspace with static ID for test which is used in the flags.yaml to perform an override + // in order to exercise the workload path. + testResources.getTestHarness().createWorkspaceWithId(RUN_DISCOVER_WITH_WORKLOAD_WORKSPACE_ID); + + final UUID sourceId = testResources.getTestHarness().createPostgresSource(RUN_DISCOVER_WITH_WORKLOAD_WORKSPACE_ID).getSourceId(); + + final AirbyteCatalog actual = testResources.getTestHarness().discoverSourceSchema(sourceId); + + testResources.getTestHarness().compareCatalog(actual); } } diff --git a/flags.yml b/flags.yml index b8db8c36249..52dc87afe6f 100644 --- a/flags.yml +++ b/flags.yml @@ -57,3 +57,11 @@ flags: include: - "1bdcfb61-219b-4290-be4f-12f9ac5461be" serve: true + - name: platform.use-workload-api-for-discover + serve: false + # Override used in order to run an acceptance test + context: + - type: "workspace" + include: + - "3851861d-ac0b-440c-bd60-408cf9e7fc0e" + serve: true