From 1024205b2535460838b47d2751a47f17f4747cc0 Mon Sep 17 00:00:00 2001 From: Malik Diarra Date: Wed, 18 Dec 2024 10:34:06 -0800 Subject: [PATCH] feat: add function to retrieve connections per destinationId (#14850) --- .../data/services/ConnectionService.java | 2 ++ .../impls/jooq/ConnectionServiceJooqImpl.java | 28 +++++++++++++++++++ 2 files changed, 30 insertions(+) diff --git a/airbyte-data/src/main/java/io/airbyte/data/services/ConnectionService.java b/airbyte-data/src/main/java/io/airbyte/data/services/ConnectionService.java index 727ab04351..dcd007e5c3 100644 --- a/airbyte-data/src/main/java/io/airbyte/data/services/ConnectionService.java +++ b/airbyte-data/src/main/java/io/airbyte/data/services/ConnectionService.java @@ -44,6 +44,8 @@ Map> listWorkspaceStandardSyncsPaginated(List wor List listConnectionsBySource(UUID sourceId, boolean includeDeleted) throws IOException; + List listConnectionsByDestination(UUID destinationId, boolean includeDeleted) throws IOException; + List listConnectionsByActorDefinitionIdAndType(UUID actorDefinitionId, String actorTypeValue, boolean includeDeleted) throws IOException; diff --git a/airbyte-data/src/main/java/io/airbyte/data/services/impls/jooq/ConnectionServiceJooqImpl.java b/airbyte-data/src/main/java/io/airbyte/data/services/impls/jooq/ConnectionServiceJooqImpl.java index 1965468eab..04ff2c2d1f 100644 --- a/airbyte-data/src/main/java/io/airbyte/data/services/impls/jooq/ConnectionServiceJooqImpl.java +++ b/airbyte-data/src/main/java/io/airbyte/data/services/impls/jooq/ConnectionServiceJooqImpl.java @@ -331,6 +331,34 @@ public List listConnectionsBySource(final UUID sourceId, final boo return getStandardSyncsFromResult(connectionAndOperationIdsResult, getNotificationConfigurationByConnectionIds(connectionIds)); } + /** + * List connections that use a source. + * + * @param destinationId destination id + * @param includeDeleted include deleted + * @return connections that use the provided destination + * @throws IOException if there is an issue while interacting with db. + */ + @Override + public List listConnectionsByDestination(final UUID destinationId, final boolean includeDeleted) + throws IOException { + final Result connectionAndOperationIdsResult = database.query(ctx -> ctx + .select( + CONNECTION.asterisk(), + groupConcat(CONNECTION_OPERATION.OPERATION_ID).separator(OPERATION_IDS_AGG_DELIMITER).as(OPERATION_IDS_AGG_FIELD), + SCHEMA_MANAGEMENT.AUTO_PROPAGATION_STATUS, SCHEMA_MANAGEMENT.BACKFILL_PREFERENCE) + .from(CONNECTION) + .leftJoin(CONNECTION_OPERATION).on(CONNECTION_OPERATION.CONNECTION_ID.eq(CONNECTION.ID)) + .leftJoin(SCHEMA_MANAGEMENT).on(SCHEMA_MANAGEMENT.CONNECTION_ID.eq(CONNECTION.ID)) + .where(CONNECTION.DESTINATION_ID.eq(destinationId) + .and(includeDeleted ? noCondition() : CONNECTION.STATUS.notEqual(StatusType.deprecated))) + .groupBy(CONNECTION.ID, SCHEMA_MANAGEMENT.AUTO_PROPAGATION_STATUS, SCHEMA_MANAGEMENT.BACKFILL_PREFERENCE)).fetch(); + + final List connectionIds = connectionAndOperationIdsResult.map(record -> record.get(CONNECTION.ID)); + + return getStandardSyncsFromResult(connectionAndOperationIdsResult, getNotificationConfigurationByConnectionIds(connectionIds)); + } + /** * List connections that use a particular actor definition. *