From bc0d4e0c9a23df4e675bee630ea29cb3170644d6 Mon Sep 17 00:00:00 2001 From: "Pedro S. Lopez" Date: Thu, 25 Apr 2024 15:34:12 -0400 Subject: [PATCH] Use new breaking change pins on support state crons (#12195) --- .../io/airbyte/bootloader/BootloaderTest.java | 84 +++---- .../ActorDefinitionVersionHelper.java | 91 ------- .../persistence/BreakingChangesHelper.java | 107 +++++++++ .../ActorDefinitionVersionHelperTest.java | 222 ------------------ .../BreakingChangesHelperTest.java | 186 +++++++++++++++ .../config/init/SupportStateUpdater.java | 24 +- .../config/init/SupportStateUpdaterTest.java | 33 +-- .../data/services/DestinationService.java | 2 + .../airbyte/data/services/SourceService.java | 2 + .../jooq/DestinationServiceJooqImpl.java | 12 + .../impls/jooq/SourceServiceJooqImpl.java | 10 + .../ScopedConfigurationRepository.kt | 9 + .../services/ScopedConfigurationService.kt | 12 + .../ScopedConfigurationServiceDataImpl.kt | 18 ++ .../ScopedConfigurationRepositoryTest.kt | 112 +++++++++ .../ScopedConfigurationServiceDataImplTest.kt | 66 ++++++ 16 files changed, 600 insertions(+), 390 deletions(-) diff --git a/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderTest.java b/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderTest.java index 5af2e7eaf38..7bd103e9e8c 100644 --- a/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderTest.java +++ b/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderTest.java @@ -10,7 +10,6 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import io.airbyte.bootloader.helpers.NoOpDefinitionVersionOverrideProvider; import io.airbyte.commons.features.FeatureFlags; import io.airbyte.commons.resources.MoreResources; import io.airbyte.commons.version.AirbyteProtocolVersionRange; @@ -23,7 +22,7 @@ import io.airbyte.config.init.DeclarativeSourceUpdater; import io.airbyte.config.init.PostLoadExecutor; import io.airbyte.config.init.SupportStateUpdater; -import io.airbyte.config.persistence.ActorDefinitionVersionHelper; +import io.airbyte.config.persistence.BreakingChangesHelper; import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.config.persistence.OrganizationPersistence; import io.airbyte.config.secrets.SecretsRepositoryReader; @@ -168,6 +167,11 @@ void testBootloaderAppBlankDb() throws Exception { secretPersistenceConfigService, connectionService, actorDefinitionVersionUpdater); + val workspaceService = new WorkspaceServiceJooqImpl(configDatabase, + featureFlagClient, + secretsRepositoryReader, + secretsRepositoryWriter, + secretPersistenceConfigService); val configRepository = new ConfigRepository( new ActorDefinitionServiceJooqImpl(configDatabase), new CatalogServiceJooqImpl(configDatabase), @@ -180,11 +184,7 @@ void testBootloaderAppBlankDb() throws Exception { secretPersistenceConfigService), new OperationServiceJooqImpl(configDatabase), sourceService, - new WorkspaceServiceJooqImpl(configDatabase, - featureFlagClient, - secretsRepositoryReader, - secretsRepositoryWriter, - secretPersistenceConfigService)); + workspaceService); val configsDatabaseInitializationTimeoutMs = TimeUnit.SECONDS.toMillis(60L); val configDatabaseInitializer = DatabaseCheckFactory.createConfigsDatabaseInitializer(configsDslContext, configsDatabaseInitializationTimeoutMs, MoreResources.readResource(DatabaseConstants.CONFIGS_INITIAL_SCHEMA_PATH)); @@ -198,13 +198,10 @@ void testBootloaderAppBlankDb() throws Exception { val organizationPersistence = new OrganizationPersistence(jobDatabase); val protocolVersionChecker = new ProtocolVersionChecker(jobsPersistence, airbyteProtocolRange, configRepository, definitionsProvider); val breakingChangeNotificationHelper = new BreakingChangeNotificationHelper(configRepository, featureFlagClient); - val actorDefinitionVersionHelper = - new ActorDefinitionVersionHelper(configRepository, new NoOpDefinitionVersionOverrideProvider(), new NoOpDefinitionVersionOverrideProvider(), - featureFlagClient); + val breakingChangeHelper = new BreakingChangesHelper(scopedConfigurationService, workspaceService, destinationService, sourceService); val supportStateUpdater = - new SupportStateUpdater(actorDefinitionService, sourceService, destinationService, DeploymentMode.OSS, - actorDefinitionVersionHelper, breakingChangeNotificationHelper, - featureFlagClient); + new SupportStateUpdater(actorDefinitionService, sourceService, destinationService, DeploymentMode.OSS, breakingChangeHelper, + breakingChangeNotificationHelper, featureFlagClient); val metricClient = new NotImplementedMetricClient(); val applyDefinitionsHelper = new ApplyDefinitionsHelper(definitionsProvider, jobsPersistence, actorDefinitionService, sourceService, destinationService, @@ -261,35 +258,38 @@ void testRequiredVersionUpgradePredicate() throws Exception { connectionService, actorDefinitionService, scopedConfigurationService); + val sourceService = new SourceServiceJooqImpl(configDatabase, + featureFlagClient, + mock(SecretsRepositoryReader.class), + mock(SecretsRepositoryWriter.class), + mock(SecretPersistenceConfigService.class), + connectionService, + actorDefinitionVersionUpdater); + val destinationService = new DestinationServiceJooqImpl(configDatabase, + featureFlagClient, + mock(SecretsRepositoryReader.class), + mock(SecretsRepositoryWriter.class), + mock(SecretPersistenceConfigService.class), + connectionService, + actorDefinitionVersionUpdater); + val workspaceService = new WorkspaceServiceJooqImpl(configDatabase, + featureFlagClient, + mock(SecretsRepositoryReader.class), + mock(SecretsRepositoryWriter.class), + mock(SecretPersistenceConfigService.class)); val configRepository = new ConfigRepository( new ActorDefinitionServiceJooqImpl(configDatabase), new CatalogServiceJooqImpl(configDatabase), connectionService, new ConnectorBuilderServiceJooqImpl(configDatabase), - new DestinationServiceJooqImpl(configDatabase, - featureFlagClient, - mock(SecretsRepositoryReader.class), - mock(SecretsRepositoryWriter.class), - mock(SecretPersistenceConfigService.class), - connectionService, - actorDefinitionVersionUpdater), + destinationService, new OAuthServiceJooqImpl(configDatabase, featureFlagClient, mock(SecretsRepositoryReader.class), mock(SecretPersistenceConfigService.class)), new OperationServiceJooqImpl(configDatabase), - new SourceServiceJooqImpl(configDatabase, - featureFlagClient, - mock(SecretsRepositoryReader.class), - mock(SecretsRepositoryWriter.class), - mock(SecretPersistenceConfigService.class), - connectionService, - actorDefinitionVersionUpdater), - new WorkspaceServiceJooqImpl(configDatabase, - featureFlagClient, - mock(SecretsRepositoryReader.class), - mock(SecretsRepositoryWriter.class), - mock(SecretPersistenceConfigService.class))); + sourceService, + workspaceService); val configsDatabaseInitializationTimeoutMs = TimeUnit.SECONDS.toMillis(60L); val configDatabaseInitializer = DatabaseCheckFactory.createConfigsDatabaseInitializer(configsDslContext, configsDatabaseInitializationTimeoutMs, MoreResources.readResource(DatabaseConstants.CONFIGS_INITIAL_SCHEMA_PATH)); @@ -302,25 +302,9 @@ void testRequiredVersionUpgradePredicate() throws Exception { val jobsPersistence = new DefaultJobPersistence(jobDatabase); val organizationPersistence = new OrganizationPersistence(jobDatabase); val breakingChangeNotificationHelper = new BreakingChangeNotificationHelper(configRepository, featureFlagClient); - val actorDefinitionVersionHelper = - new ActorDefinitionVersionHelper(configRepository, new NoOpDefinitionVersionOverrideProvider(), new NoOpDefinitionVersionOverrideProvider(), - featureFlagClient); - val sourceService = new SourceServiceJooqImpl(configDatabase, - featureFlagClient, - mock(SecretsRepositoryReader.class), - mock(SecretsRepositoryWriter.class), - mock(SecretPersistenceConfigService.class), - connectionService, - actorDefinitionVersionUpdater); - val destinationService = new DestinationServiceJooqImpl(configDatabase, - featureFlagClient, - mock(SecretsRepositoryReader.class), - mock(SecretsRepositoryWriter.class), - mock(SecretPersistenceConfigService.class), - connectionService, - actorDefinitionVersionUpdater); + val breakingChangesHelper = new BreakingChangesHelper(scopedConfigurationService, workspaceService, destinationService, sourceService); val supportStateUpdater = - new SupportStateUpdater(actorDefinitionService, sourceService, destinationService, DeploymentMode.OSS, actorDefinitionVersionHelper, + new SupportStateUpdater(actorDefinitionService, sourceService, destinationService, DeploymentMode.OSS, breakingChangesHelper, breakingChangeNotificationHelper, featureFlagClient); val protocolVersionChecker = new ProtocolVersionChecker(jobsPersistence, airbyteProtocolRange, configRepository, definitionsProvider); val metricClient = new NotImplementedMetricClient(); diff --git a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ActorDefinitionVersionHelper.java b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ActorDefinitionVersionHelper.java index 632e9ef4b9c..cf16f893935 100644 --- a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ActorDefinitionVersionHelper.java +++ b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ActorDefinitionVersionHelper.java @@ -11,7 +11,6 @@ import io.airbyte.config.SourceConnection; import io.airbyte.config.StandardDestinationDefinition; import io.airbyte.config.StandardSourceDefinition; -import io.airbyte.config.persistence.ConfigRepository.StandardSyncQuery; import io.airbyte.config.persistence.version_overrides.DefinitionVersionOverrideProvider; import io.airbyte.featureflag.EnableConfigurationOverrideProvider; import io.airbyte.featureflag.FeatureFlagClient; @@ -22,13 +21,9 @@ import jakarta.inject.Named; import jakarta.inject.Singleton; import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.UUID; -import kotlin.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -263,92 +258,6 @@ public boolean getSourceOrDestinationIsAlphaOrBeta(final UUID workspaceId, return hasAlphaOrBetaVersion(List.of(sourceVersion, destinationVersion)); } - /** - * Helper method to retrieve active syncs using a given source version, excluding syncs that have a - * version override applied. - * - * @param sourceDefinition - source definition - * @param sourceVersionIds - source version ids - * @return list of pairs, where the first element is the workspace id and the second element is a - * list of affected sync ids in the workspace - */ - public List>> getActiveWorkspaceSyncsWithSourceVersionIds(final StandardSourceDefinition sourceDefinition, - final List sourceVersionIds) - throws IOException, ConfigNotFoundException, JsonValidationException { - final List sourceConnections = configRepository.listSourcesWithVersionIds(sourceVersionIds); - final Map> sourceConnectionsByWorkspace = new HashMap<>(); - - // verify that a version override has not been applied to the source, and collect by workspace - for (final SourceConnection source : sourceConnections) { - final ActorDefinitionVersionWithOverrideStatus versionWithOverrideStatus = - getSourceVersionWithOverrideStatus(sourceDefinition, source.getWorkspaceId(), source.getSourceId()); - if (!versionWithOverrideStatus.isOverrideApplied()) { - sourceConnectionsByWorkspace - .computeIfAbsent(source.getWorkspaceId(), k -> new ArrayList<>()) - .add(source); - } - } - - // get affected syncs for each workspace and add them to the list - final List>> workspaceSyncIds = new ArrayList<>(); - for (final Map.Entry> entry : sourceConnectionsByWorkspace.entrySet()) { - final UUID workspaceId = entry.getKey(); - final List sourcesForWorkspace = entry.getValue(); - final List sourceIds = sourcesForWorkspace.stream().map(SourceConnection::getSourceId).toList(); - final StandardSyncQuery syncQuery = new StandardSyncQuery(workspaceId, sourceIds, null, false); - final List activeSyncIds = configRepository.listWorkspaceActiveSyncIds(syncQuery); - if (!activeSyncIds.isEmpty()) { - workspaceSyncIds.add(new Pair<>(workspaceId, activeSyncIds)); - } - } - - return workspaceSyncIds; - } - - /** - * Helper method to retrieve active syncs using a given destination version, excluding syncs that - * have a version override applied. - * - * @param destinationDefinition - destination definition - * @param destinationVersionIds - destination version ids - * @return list of pairs, where the first element is the workspace id and the second element is a - * list of affected sync ids in the workspace - */ - public List>> getActiveWorkspaceSyncsWithDestinationVersionIds(final StandardDestinationDefinition destinationDefinition, - final List destinationVersionIds) - throws IOException, JsonValidationException, ConfigNotFoundException { - final List destinationConnections = configRepository.listDestinationsWithVersionIds(destinationVersionIds); - final Map> destinationConnectionsByWorkspace = new HashMap<>(); - - // verify that a version override has not been applied to the destination, and collect by workspace - for (final DestinationConnection destination : destinationConnections) { - final ActorDefinitionVersionWithOverrideStatus versionWithOverrideStatus = getDestinationVersionWithOverrideStatus( - destinationDefinition, - destination.getWorkspaceId(), - destination.getDestinationId()); - if (!versionWithOverrideStatus.isOverrideApplied()) { - destinationConnectionsByWorkspace - .computeIfAbsent(destination.getWorkspaceId(), k -> new ArrayList<>()) - .add(destination); - } - } - - // get affected syncs for each workspace and add them to the list - final List>> workspaceSyncIds = new ArrayList<>(); - for (final Map.Entry> entry : destinationConnectionsByWorkspace.entrySet()) { - final UUID workspaceId = entry.getKey(); - final List destinationsForWorkspace = entry.getValue(); - final List destinationIds = destinationsForWorkspace.stream().map(DestinationConnection::getDestinationId).toList(); - final StandardSyncQuery syncQuery = new StandardSyncQuery(workspaceId, null, destinationIds, false); - final List activeSyncIds = configRepository.listWorkspaceActiveSyncIds(syncQuery); - if (!activeSyncIds.isEmpty()) { - workspaceSyncIds.add(new Pair<>(workspaceId, activeSyncIds)); - } - } - - return workspaceSyncIds; - } - /** * Get the docker image name (docker_repository:docker_image_tag) for a given actor definition * version. diff --git a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/BreakingChangesHelper.java b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/BreakingChangesHelper.java index ed0d2213948..4efa8089b9c 100644 --- a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/BreakingChangesHelper.java +++ b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/BreakingChangesHelper.java @@ -7,12 +7,30 @@ import io.airbyte.commons.version.Version; import io.airbyte.config.ActorDefinitionBreakingChange; import io.airbyte.config.ActorDefinitionVersion; +import io.airbyte.config.ActorType; +import io.airbyte.config.ConfigOriginType; +import io.airbyte.config.ConfigResourceType; +import io.airbyte.config.ConfigScopeType; +import io.airbyte.config.DestinationConnection; +import io.airbyte.config.ScopedConfiguration; +import io.airbyte.config.SourceConnection; import io.airbyte.data.exceptions.ConfigNotFoundException; import io.airbyte.data.services.ActorDefinitionService; +import io.airbyte.data.services.DestinationService; +import io.airbyte.data.services.ScopedConfigurationService; +import io.airbyte.data.services.SourceService; +import io.airbyte.data.services.WorkspaceService; +import io.airbyte.data.services.shared.ConnectorVersionKey; +import io.airbyte.data.services.shared.StandardSyncQuery; import jakarta.inject.Singleton; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.UUID; +import java.util.stream.Collectors; +import kotlin.Pair; /** * Helper class containing logic related to breaking changes. @@ -20,6 +38,95 @@ @Singleton public class BreakingChangesHelper { + private final ScopedConfigurationService scopedConfigurationService; + private final WorkspaceService workspaceService; + private final DestinationService destinationService; + private final SourceService sourceService; + + public BreakingChangesHelper(final ScopedConfigurationService scopedConfigurationService, + final WorkspaceService workspaceService, + final DestinationService destinationService, + final SourceService sourceService) { + this.scopedConfigurationService = scopedConfigurationService; + this.workspaceService = workspaceService; + this.destinationService = destinationService; + this.sourceService = sourceService; + } + + /** + * Finds all active syncs on versions that are unsupported due to a breaking change. Results are + * given per workspace. + * + * @param actorType - type of actor + * @param actorDefinitionId - actor definition id + * @param unsupportedVersionIds - unsupported version ids + * @return list of workspace ids with active syncs on unsupported versions, along with the sync ids + */ + public List>> getBreakingActiveSyncsPerWorkspace(final ActorType actorType, + final UUID actorDefinitionId, + final List unsupportedVersionIds) + throws IOException { + // get actors pinned to unsupported versions (due to a breaking change) + final List pinnedValues = unsupportedVersionIds.stream().map(UUID::toString).toList(); + final List breakingChangePins = + scopedConfigurationService.listScopedConfigurationsWithValues( + ConnectorVersionKey.INSTANCE.getKey(), + ConfigResourceType.ACTOR_DEFINITION, + actorDefinitionId, + ConfigScopeType.ACTOR, + ConfigOriginType.BREAKING_CHANGE, + pinnedValues); + + // fetch actors and group by workspace + final Map> actorIdsByWorkspace = getActorIdsByWorkspace(actorType, breakingChangePins); + + // get affected syncs for each workspace + final List>> workspaceSyncIds = new ArrayList<>(); + for (final Map.Entry> entry : actorIdsByWorkspace.entrySet()) { + final UUID workspaceId = entry.getKey(); + final List actorIdsForWorkspace = entry.getValue(); + final StandardSyncQuery syncQuery = buildStandardSyncQuery(actorType, workspaceId, actorIdsForWorkspace); + final List activeSyncIds = workspaceService.listWorkspaceActiveSyncIds(syncQuery); + if (!activeSyncIds.isEmpty()) { + workspaceSyncIds.add(new Pair<>(workspaceId, activeSyncIds)); + } + } + + return workspaceSyncIds; + } + + private Map> getActorIdsByWorkspace(final ActorType actorType, final Collection scopedConfigs) + throws IOException { + final List actorIds = scopedConfigs.stream().map(ScopedConfiguration::getScopeId).toList(); + switch (actorType) { + case SOURCE -> { + return sourceService.listSourcesWithIds(actorIds).stream() + .collect(Collectors.groupingBy(SourceConnection::getWorkspaceId, + Collectors.mapping(SourceConnection::getSourceId, + Collectors.toList()))); + } + case DESTINATION -> { + return destinationService.listDestinationsWithIds(actorIds).stream() + .collect(Collectors.groupingBy(DestinationConnection::getWorkspaceId, + Collectors.mapping(DestinationConnection::getDestinationId, + Collectors.toList()))); + } + default -> throw new IllegalArgumentException("Actor type not supported: " + actorType); + } + } + + private StandardSyncQuery buildStandardSyncQuery(final ActorType actorType, final UUID workspaceId, final List actorIds) { + switch (actorType) { + case SOURCE -> { + return new StandardSyncQuery(workspaceId, actorIds, null, false); + } + case DESTINATION -> { + return new StandardSyncQuery(workspaceId, null, actorIds, false); + } + default -> throw new IllegalArgumentException("Actor type not supported: " + actorType); + } + } + /** * Given a list of breaking changes and the current default version, filter for breaking changes * that would affect the current default version. This is used to avoid considering breaking changes diff --git a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ActorDefinitionVersionHelperTest.java b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ActorDefinitionVersionHelperTest.java index 6ef5dd547a6..780741ef936 100644 --- a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ActorDefinitionVersionHelperTest.java +++ b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ActorDefinitionVersionHelperTest.java @@ -11,10 +11,8 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; -import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import io.airbyte.commons.json.Jsons; @@ -26,7 +24,6 @@ import io.airbyte.config.StandardDestinationDefinition; import io.airbyte.config.StandardSourceDefinition; import io.airbyte.config.persistence.ActorDefinitionVersionHelper.ActorDefinitionVersionWithOverrideStatus; -import io.airbyte.config.persistence.ConfigRepository.StandardSyncQuery; import io.airbyte.config.persistence.version_overrides.ConfigurationDefinitionVersionOverrideProvider; import io.airbyte.config.persistence.version_overrides.DefinitionVersionOverrideProvider; import io.airbyte.featureflag.EnableConfigurationOverrideProvider; @@ -41,7 +38,6 @@ import java.util.Map; import java.util.Optional; import java.util.UUID; -import kotlin.Pair; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -384,224 +380,6 @@ void testGetDefaultDestinationVersionWithNoDefaultThrows() { assertTrue(exception.getMessage().contains("Default version for destination is not set")); } - @Test - void testGetActiveWorkspaceSyncsWithSourceVersionIds() throws JsonValidationException, ConfigNotFoundException, IOException { - when(mFeatureFlagClient.boolVariation(eq(UseActorScopedDefaultVersions.INSTANCE), any())).thenReturn(true); - - final StandardSourceDefinition sourceDefinition = new StandardSourceDefinition() - .withSourceDefinitionId(ACTOR_DEFINITION_ID); - - final ActorDefinitionVersion ADV_1_0_0 = new ActorDefinitionVersion().withVersionId(UUID.randomUUID()).withDockerImageTag("1.0.0"); - final ActorDefinitionVersion ADV_2_0_0 = new ActorDefinitionVersion().withVersionId(UUID.randomUUID()).withDockerImageTag("2.0.0"); - - when(mConfigRepository.getActorDefinitionVersion(ADV_1_0_0.getVersionId())) - .thenReturn(ADV_1_0_0); - when(mConfigRepository.getActorDefinitionVersion(ADV_2_0_0.getVersionId())) - .thenReturn(ADV_2_0_0); - - final SourceConnection sourceConnection = new SourceConnection() - .withWorkspaceId(WORKSPACE_ID) - .withSourceId(UUID.randomUUID()) - .withDefaultVersionId(ADV_1_0_0.getVersionId()); - final SourceConnection sourceConnection2 = new SourceConnection() - .withWorkspaceId(WORKSPACE_ID) - .withSourceId(UUID.randomUUID()) - .withDefaultVersionId(ADV_2_0_0.getVersionId()); - final SourceConnection sourceConnection3 = new SourceConnection() - .withWorkspaceId(WORKSPACE_ID_2) - .withSourceId(UUID.randomUUID()) - .withDefaultVersionId(ADV_2_0_0.getVersionId()); - - when(mConfigRepository.getSourceConnection(sourceConnection.getSourceId())) - .thenReturn(sourceConnection); - when(mConfigRepository.getSourceConnection(sourceConnection2.getSourceId())) - .thenReturn(sourceConnection2); - when(mConfigRepository.getSourceConnection(sourceConnection3.getSourceId())) - .thenReturn(sourceConnection3); - - when(mFFOverrideProvider.getOverride(ActorType.SOURCE, sourceDefinition.getSourceDefinitionId(), WORKSPACE_ID, sourceConnection.getSourceId(), - ADV_1_0_0)) - .thenReturn(Optional.empty()); - when(mFFOverrideProvider.getOverride(ActorType.SOURCE, sourceDefinition.getSourceDefinitionId(), WORKSPACE_ID, sourceConnection2.getSourceId(), - ADV_2_0_0)) - .thenReturn(Optional.empty()); - when(mFFOverrideProvider.getOverride(ActorType.SOURCE, sourceDefinition.getSourceDefinitionId(), WORKSPACE_ID_2, sourceConnection3.getSourceId(), - ADV_2_0_0)) - .thenReturn(Optional.empty()); - - final SourceConnection sourceWithOverride = new SourceConnection() - .withWorkspaceId(WORKSPACE_ID) - .withSourceId(UUID.randomUUID()) - .withDefaultVersionId(ADV_2_0_0.getVersionId()); - when(mConfigRepository.getSourceConnection(sourceWithOverride.getSourceId())) - .thenReturn(sourceWithOverride); - when(mFFOverrideProvider.getOverride(ActorType.SOURCE, sourceDefinition.getSourceDefinitionId(), WORKSPACE_ID, sourceWithOverride.getSourceId(), - ADV_2_0_0)) - .thenReturn(Optional.of(new ActorDefinitionVersionWithOverrideStatus(ADV_1_0_0, true))); - - final List unsupportedVersionIds = List.of(ADV_1_0_0.getVersionId(), ADV_2_0_0.getVersionId()); - when(mConfigRepository.listSourcesWithVersionIds(unsupportedVersionIds)) - .thenReturn(List.of(sourceConnection, sourceConnection2, sourceConnection3, sourceWithOverride)); - - final StandardSyncQuery workspaceQuery1 = new StandardSyncQuery( - WORKSPACE_ID, - List.of(sourceConnection.getSourceId(), sourceConnection2.getSourceId()), - null, false); - final List workspaceSyncs1 = List.of(UUID.randomUUID(), UUID.randomUUID(), UUID.randomUUID(), UUID.randomUUID(), UUID.randomUUID()); - when(mConfigRepository.listWorkspaceActiveSyncIds(workspaceQuery1)).thenReturn(workspaceSyncs1); - - final StandardSyncQuery workspaceQuery2 = new StandardSyncQuery( - WORKSPACE_ID_2, - List.of(sourceConnection3.getSourceId()), - null, false); - final List workspaceSyncs2 = List.of(UUID.randomUUID()); - when(mConfigRepository.listWorkspaceActiveSyncIds(workspaceQuery2)).thenReturn(workspaceSyncs2); - - final List>> expectedWorkspaceSyncIds = List.of( - new Pair<>(WORKSPACE_ID, workspaceSyncs1), - new Pair<>(WORKSPACE_ID_2, workspaceSyncs2)); - - final List>> actualWorkspaceSyncIds = - actorDefinitionVersionHelper.getActiveWorkspaceSyncsWithSourceVersionIds(sourceDefinition, unsupportedVersionIds); - assertTrue(expectedWorkspaceSyncIds.containsAll(actualWorkspaceSyncIds)); - - verify(mConfigRepository).listSourcesWithVersionIds(unsupportedVersionIds); - verify(mConfigRepository).listWorkspaceActiveSyncIds(workspaceQuery1); - verify(mConfigRepository).listWorkspaceActiveSyncIds(workspaceQuery2); - verify(mConfigRepository).getSourceConnection(sourceConnection.getSourceId()); - verify(mConfigRepository).getSourceConnection(sourceConnection2.getSourceId()); - verify(mConfigRepository).getSourceConnection(sourceConnection3.getSourceId()); - verify(mConfigRepository).getSourceConnection(sourceWithOverride.getSourceId()); - verify(mConfigRepository).getActorDefinitionVersion(ADV_1_0_0.getVersionId()); - verify(mConfigRepository, times(3)).getActorDefinitionVersion(ADV_2_0_0.getVersionId()); - verify(mFFOverrideProvider).getOverride(ActorType.SOURCE, sourceDefinition.getSourceDefinitionId(), WORKSPACE_ID, sourceConnection.getSourceId(), - ADV_1_0_0); - verify(mFFOverrideProvider).getOverride(ActorType.SOURCE, sourceDefinition.getSourceDefinitionId(), WORKSPACE_ID, sourceConnection2.getSourceId(), - ADV_2_0_0); - verify(mFFOverrideProvider).getOverride(ActorType.SOURCE, sourceDefinition.getSourceDefinitionId(), WORKSPACE_ID_2, - sourceConnection3.getSourceId(), - ADV_2_0_0); - verify(mFFOverrideProvider).getOverride(ActorType.SOURCE, sourceDefinition.getSourceDefinitionId(), WORKSPACE_ID, - sourceWithOverride.getSourceId(), - ADV_2_0_0); - - verifyNoMoreInteractions(mConfigRepository); - verifyNoMoreInteractions(mFFOverrideProvider); - } - - @Test - void testGetActiveWorkspaceSyncsWithDestinationVersionIds() throws JsonValidationException, ConfigNotFoundException, IOException { - when(mFeatureFlagClient.boolVariation(eq(UseActorScopedDefaultVersions.INSTANCE), any())).thenReturn(true); - - final StandardDestinationDefinition destinationDefinition = new StandardDestinationDefinition() - .withDestinationDefinitionId(ACTOR_DEFINITION_ID); - - final ActorDefinitionVersion ADV_1_0_0 = new ActorDefinitionVersion().withVersionId(UUID.randomUUID()).withDockerImageTag("1.0.0"); - final ActorDefinitionVersion ADV_2_0_0 = new ActorDefinitionVersion().withVersionId(UUID.randomUUID()).withDockerImageTag("2.0.0"); - - when(mConfigRepository.getActorDefinitionVersion(ADV_1_0_0.getVersionId())) - .thenReturn(ADV_1_0_0); - when(mConfigRepository.getActorDefinitionVersion(ADV_2_0_0.getVersionId())) - .thenReturn(ADV_2_0_0); - - final DestinationConnection destinationConnection = new DestinationConnection() - .withWorkspaceId(WORKSPACE_ID) - .withDestinationId(UUID.randomUUID()) - .withDefaultVersionId(ADV_1_0_0.getVersionId()); - final DestinationConnection destinationConnection2 = new DestinationConnection() - .withWorkspaceId(WORKSPACE_ID) - .withDestinationId(UUID.randomUUID()) - .withDefaultVersionId(ADV_2_0_0.getVersionId()); - final DestinationConnection destinationConnection3 = new DestinationConnection() - .withWorkspaceId(WORKSPACE_ID_2) - .withDestinationId(UUID.randomUUID()) - .withDefaultVersionId(ADV_2_0_0.getVersionId()); - - when(mConfigRepository.getDestinationConnection(destinationConnection.getDestinationId())) - .thenReturn(destinationConnection); - when(mConfigRepository.getDestinationConnection(destinationConnection2.getDestinationId())) - .thenReturn(destinationConnection2); - when(mConfigRepository.getDestinationConnection(destinationConnection3.getDestinationId())) - .thenReturn(destinationConnection3); - - when(mFFOverrideProvider.getOverride(ActorType.DESTINATION, destinationDefinition.getDestinationDefinitionId(), WORKSPACE_ID, - destinationConnection.getDestinationId(), - ADV_1_0_0)) - .thenReturn(Optional.empty()); - when(mFFOverrideProvider.getOverride(ActorType.DESTINATION, destinationDefinition.getDestinationDefinitionId(), WORKSPACE_ID, - destinationConnection2.getDestinationId(), - ADV_2_0_0)) - .thenReturn(Optional.empty()); - when(mFFOverrideProvider.getOverride(ActorType.DESTINATION, destinationDefinition.getDestinationDefinitionId(), WORKSPACE_ID_2, - destinationConnection3.getDestinationId(), - ADV_2_0_0)) - .thenReturn(Optional.empty()); - - final DestinationConnection destinationWithOverride = new DestinationConnection() - .withWorkspaceId(WORKSPACE_ID) - .withDestinationId(UUID.randomUUID()) - .withDefaultVersionId(ADV_2_0_0.getVersionId()); - when(mConfigRepository.getDestinationConnection(destinationWithOverride.getDestinationId())) - .thenReturn(destinationWithOverride); - when(mFFOverrideProvider.getOverride(ActorType.DESTINATION, destinationDefinition.getDestinationDefinitionId(), WORKSPACE_ID, - destinationWithOverride.getDestinationId(), - ADV_2_0_0)) - .thenReturn(Optional.of(new ActorDefinitionVersionWithOverrideStatus(ADV_1_0_0, true))); - - final List unsupportedVersionIds = List.of(ADV_1_0_0.getVersionId(), ADV_2_0_0.getVersionId()); - when(mConfigRepository.listDestinationsWithVersionIds(unsupportedVersionIds)) - .thenReturn(List.of(destinationConnection, destinationConnection2, destinationConnection3, destinationWithOverride)); - - final StandardSyncQuery workspaceQuery1 = new StandardSyncQuery( - WORKSPACE_ID, - null, - List.of(destinationConnection.getDestinationId(), destinationConnection2.getDestinationId()), - false); - final List workspaceSyncs1 = List.of(UUID.randomUUID(), UUID.randomUUID(), UUID.randomUUID(), UUID.randomUUID(), UUID.randomUUID()); - when(mConfigRepository.listWorkspaceActiveSyncIds(workspaceQuery1)).thenReturn(workspaceSyncs1); - - final StandardSyncQuery workspaceQuery2 = new StandardSyncQuery( - WORKSPACE_ID_2, - null, - List.of(destinationConnection3.getDestinationId()), - false); - final List workspaceSyncs2 = List.of(UUID.randomUUID()); - when(mConfigRepository.listWorkspaceActiveSyncIds(workspaceQuery2)).thenReturn(workspaceSyncs2); - - final List>> expectedWorkspaceSyncIds = List.of( - new Pair<>(WORKSPACE_ID, workspaceSyncs1), - new Pair<>(WORKSPACE_ID_2, workspaceSyncs2)); - - final List>> actualWorkspaceSyncIds = - actorDefinitionVersionHelper.getActiveWorkspaceSyncsWithDestinationVersionIds(destinationDefinition, unsupportedVersionIds); - assertTrue(expectedWorkspaceSyncIds.containsAll(actualWorkspaceSyncIds)); - - verify(mConfigRepository).listDestinationsWithVersionIds(unsupportedVersionIds); - verify(mConfigRepository).listWorkspaceActiveSyncIds(workspaceQuery1); - verify(mConfigRepository).listWorkspaceActiveSyncIds(workspaceQuery2); - verify(mConfigRepository).getDestinationConnection(destinationConnection.getDestinationId()); - verify(mConfigRepository).getDestinationConnection(destinationConnection2.getDestinationId()); - verify(mConfigRepository).getDestinationConnection(destinationConnection3.getDestinationId()); - verify(mConfigRepository).getDestinationConnection(destinationWithOverride.getDestinationId()); - verify(mConfigRepository).getActorDefinitionVersion(ADV_1_0_0.getVersionId()); - verify(mConfigRepository, times(3)).getActorDefinitionVersion(ADV_2_0_0.getVersionId()); - verify(mFFOverrideProvider).getOverride(ActorType.DESTINATION, destinationDefinition.getDestinationDefinitionId(), WORKSPACE_ID, - destinationConnection.getDestinationId(), - ADV_1_0_0); - verify(mFFOverrideProvider).getOverride(ActorType.DESTINATION, destinationDefinition.getDestinationDefinitionId(), WORKSPACE_ID, - destinationConnection2.getDestinationId(), - ADV_2_0_0); - verify(mFFOverrideProvider).getOverride(ActorType.DESTINATION, destinationDefinition.getDestinationDefinitionId(), WORKSPACE_ID_2, - destinationConnection3.getDestinationId(), - ADV_2_0_0); - verify(mFFOverrideProvider).getOverride(ActorType.DESTINATION, destinationDefinition.getDestinationDefinitionId(), WORKSPACE_ID, - destinationWithOverride.getDestinationId(), - ADV_2_0_0); - - verifyNoMoreInteractions(mConfigRepository); - verifyNoMoreInteractions(mFFOverrideProvider); - } - @ParameterizedTest @CsvSource({"alpha,generally_available,true", "beta,generally_available,true", "generally_available,generally_available,false", "alpha,beta,true"}) void testHasAlphaOrBeta(final String sourceReleaseStageStr, final String destinationReleaseStageStr, final boolean expected) { diff --git a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/BreakingChangesHelperTest.java b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/BreakingChangesHelperTest.java index 9d15f1b157a..cf7f6b79fa5 100644 --- a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/BreakingChangesHelperTest.java +++ b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/BreakingChangesHelperTest.java @@ -13,14 +13,200 @@ import io.airbyte.commons.version.Version; import io.airbyte.config.ActorDefinitionBreakingChange; import io.airbyte.config.ActorDefinitionVersion; +import io.airbyte.config.ActorType; +import io.airbyte.config.ConfigOriginType; +import io.airbyte.config.ConfigResourceType; +import io.airbyte.config.ConfigScopeType; +import io.airbyte.config.DestinationConnection; +import io.airbyte.config.ScopedConfiguration; +import io.airbyte.config.SourceConnection; import io.airbyte.data.exceptions.ConfigNotFoundException; import io.airbyte.data.services.ActorDefinitionService; +import io.airbyte.data.services.DestinationService; +import io.airbyte.data.services.ScopedConfigurationService; +import io.airbyte.data.services.SourceService; +import io.airbyte.data.services.WorkspaceService; +import io.airbyte.data.services.shared.ConnectorVersionKey; +import io.airbyte.data.services.shared.StandardSyncQuery; import java.io.IOException; +import java.util.Comparator; import java.util.List; +import java.util.UUID; +import java.util.stream.Stream; +import kotlin.Pair; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; class BreakingChangesHelperTest { + private static final UUID ACTOR_DEFINITION_ID = UUID.randomUUID(); + + private WorkspaceService mWorkspaceService; + private ScopedConfigurationService mScopedConfigurationService; + private SourceService mSourceService; + private DestinationService mDestinationService; + + private BreakingChangesHelper breakingChangesHelper; + + @BeforeEach + void setup() { + mWorkspaceService = mock(WorkspaceService.class); + mScopedConfigurationService = mock(ScopedConfigurationService.class); + mSourceService = mock(SourceService.class); + mDestinationService = mock(DestinationService.class); + + breakingChangesHelper = new BreakingChangesHelper(mScopedConfigurationService, mWorkspaceService, mDestinationService, mSourceService); + } + + @Test + void testGetBreakingActiveSyncsPerWorkspaceWithSource() throws IOException { + final UUID unsupportedVersionId1 = UUID.randomUUID(); + final UUID unsupportedVersionId2 = UUID.randomUUID(); + + final List unsupportedVersionIds = List.of(unsupportedVersionId1, unsupportedVersionId2); + + final UUID workspaceId1 = UUID.randomUUID(); + final SourceConnection source1 = new SourceConnection() + .withSourceId(UUID.randomUUID()) + .withWorkspaceId(workspaceId1); + final SourceConnection source2 = new SourceConnection() + .withSourceId(UUID.randomUUID()) + .withWorkspaceId(workspaceId1); + + final UUID workspaceId2 = UUID.randomUUID(); + final SourceConnection source3 = new SourceConnection() + .withSourceId(UUID.randomUUID()) + .withWorkspaceId(workspaceId2); + + final UUID workspaceId3 = UUID.randomUUID(); + final SourceConnection source4 = new SourceConnection() + .withSourceId(UUID.randomUUID()) + .withWorkspaceId(workspaceId3); + + final List sourcesOnUnsupportedVersions = List.of(source1, source2, source3, source4); + final List sourceIds = sourcesOnUnsupportedVersions.stream().map(SourceConnection::getSourceId).toList(); + + when(mSourceService.listSourcesWithIds(sourceIds)).thenReturn(sourcesOnUnsupportedVersions); + + final List existingPins = sourceIds.stream().map(sourceId -> new ScopedConfiguration() + .withScopeType(ConfigScopeType.ACTOR) + .withScopeId(sourceId)).toList(); + + when(mScopedConfigurationService.listScopedConfigurationsWithValues(ConnectorVersionKey.INSTANCE.getKey(), ConfigResourceType.ACTOR_DEFINITION, + ACTOR_DEFINITION_ID, ConfigScopeType.ACTOR, ConfigOriginType.BREAKING_CHANGE, unsupportedVersionIds.stream().map(UUID::toString).toList())) + .thenReturn(existingPins); + + final StandardSyncQuery workspace1SyncQuery = + new StandardSyncQuery(workspaceId1, List.of(source1.getSourceId(), source2.getSourceId()), null, false); + final List workspace1ActiveSyncIds = List.of(UUID.randomUUID(), UUID.randomUUID()); + when(mWorkspaceService.listWorkspaceActiveSyncIds(workspace1SyncQuery)).thenReturn(workspace1ActiveSyncIds); + + final StandardSyncQuery workspace2SyncQuery = + new StandardSyncQuery(workspaceId2, List.of(source3.getSourceId()), null, false); + final List workspace2ActiveSyncIds = List.of(UUID.randomUUID()); + when(mWorkspaceService.listWorkspaceActiveSyncIds(workspace2SyncQuery)).thenReturn(workspace2ActiveSyncIds); + + final StandardSyncQuery workspace3SyncQuery = + new StandardSyncQuery(workspaceId3, List.of(source4.getSourceId()), null, false); + final List workspace3ActiveSyncIds = List.of(); + when(mWorkspaceService.listWorkspaceActiveSyncIds(workspace3SyncQuery)).thenReturn(workspace3ActiveSyncIds); + + final List>> expectedResult = Stream.of( + new Pair<>(workspaceId1, workspace1ActiveSyncIds), + new Pair<>(workspaceId2, workspace2ActiveSyncIds)).sorted(Comparator.comparing(Pair::getFirst)).toList(); + + final List>> result = + breakingChangesHelper.getBreakingActiveSyncsPerWorkspace(ActorType.SOURCE, ACTOR_DEFINITION_ID, unsupportedVersionIds); + + assertEquals(2, result.size()); + + final List>> sortedResult = result.stream().sorted(Comparator.comparing(Pair::getFirst)).toList(); + assertEquals(expectedResult, sortedResult); + + verify(mSourceService).listSourcesWithIds(sourceIds); + verify(mScopedConfigurationService).listScopedConfigurationsWithValues(ConnectorVersionKey.INSTANCE.getKey(), ConfigResourceType.ACTOR_DEFINITION, + ACTOR_DEFINITION_ID, ConfigScopeType.ACTOR, ConfigOriginType.BREAKING_CHANGE, unsupportedVersionIds.stream().map(UUID::toString).toList()); + verify(mWorkspaceService).listWorkspaceActiveSyncIds(workspace1SyncQuery); + verify(mWorkspaceService).listWorkspaceActiveSyncIds(workspace2SyncQuery); + verify(mWorkspaceService).listWorkspaceActiveSyncIds(workspace3SyncQuery); + verifyNoMoreInteractions(mSourceService, mScopedConfigurationService, mWorkspaceService); + } + + @Test + void testGetBreakingActiveSyncsPerWorkspaceWithDestination() throws IOException { + final UUID unsupportedVersionId1 = UUID.randomUUID(); + final UUID unsupportedVersionId2 = UUID.randomUUID(); + + final List unsupportedVersionIds = List.of(unsupportedVersionId1, unsupportedVersionId2); + + final UUID workspaceId1 = UUID.randomUUID(); + final DestinationConnection destination1 = new DestinationConnection() + .withDestinationId(UUID.randomUUID()) + .withWorkspaceId(workspaceId1); + final DestinationConnection destination2 = new DestinationConnection() + .withDestinationId(UUID.randomUUID()) + .withWorkspaceId(workspaceId1); + + final UUID workspaceId2 = UUID.randomUUID(); + final DestinationConnection destination3 = new DestinationConnection() + .withDestinationId(UUID.randomUUID()) + .withWorkspaceId(workspaceId2); + + final UUID workspaceId3 = UUID.randomUUID(); + final DestinationConnection destination4 = new DestinationConnection() + .withDestinationId(UUID.randomUUID()) + .withWorkspaceId(workspaceId3); + + final List destinationsOnUnsupportedVersions = List.of(destination1, destination2, destination3, destination4); + final List destinationIds = destinationsOnUnsupportedVersions.stream().map(DestinationConnection::getDestinationId).toList(); + + when(mDestinationService.listDestinationsWithIds(destinationIds)).thenReturn(destinationsOnUnsupportedVersions); + + final List existingPins = destinationIds.stream().map(destinationId -> new ScopedConfiguration() + .withScopeType(ConfigScopeType.ACTOR) + .withScopeId(destinationId)).toList(); + + when(mScopedConfigurationService.listScopedConfigurationsWithValues(ConnectorVersionKey.INSTANCE.getKey(), ConfigResourceType.ACTOR_DEFINITION, + ACTOR_DEFINITION_ID, ConfigScopeType.ACTOR, ConfigOriginType.BREAKING_CHANGE, unsupportedVersionIds.stream().map(UUID::toString).toList())) + .thenReturn(existingPins); + + final StandardSyncQuery workspace1SyncQuery = + new StandardSyncQuery(workspaceId1, null, + List.of(destination1.getDestinationId(), destination2.getDestinationId()), false); + final List workspace1ActiveSyncIds = List.of(UUID.randomUUID(), UUID.randomUUID()); + when(mWorkspaceService.listWorkspaceActiveSyncIds(workspace1SyncQuery)).thenReturn(workspace1ActiveSyncIds); + + final StandardSyncQuery workspace2SyncQuery = new StandardSyncQuery(workspaceId2, null, + List.of(destination3.getDestinationId()), false); + final List workspace2ActiveSyncIds = List.of(UUID.randomUUID()); + when(mWorkspaceService.listWorkspaceActiveSyncIds(workspace2SyncQuery)).thenReturn(workspace2ActiveSyncIds); + + final StandardSyncQuery workspace3SyncQuery = new StandardSyncQuery(workspaceId3, null, + List.of(destination4.getDestinationId()), false); + final List workspace3ActiveSyncIds = List.of(); + when(mWorkspaceService.listWorkspaceActiveSyncIds(workspace3SyncQuery)).thenReturn(workspace3ActiveSyncIds); + + final List>> expectedResult = Stream.of( + new Pair<>(workspaceId1, workspace1ActiveSyncIds), + new Pair<>(workspaceId2, workspace2ActiveSyncIds)).sorted(Comparator.comparing(Pair::getFirst)).toList(); + + final List>> result = + breakingChangesHelper.getBreakingActiveSyncsPerWorkspace(ActorType.DESTINATION, ACTOR_DEFINITION_ID, unsupportedVersionIds); + + assertEquals(2, result.size()); + + final List>> sortedResult = result.stream().sorted(Comparator.comparing(Pair::getFirst)).toList(); + assertEquals(expectedResult, sortedResult); + + verify(mDestinationService).listDestinationsWithIds(destinationIds); + verify(mScopedConfigurationService).listScopedConfigurationsWithValues(ConnectorVersionKey.INSTANCE.getKey(), ConfigResourceType.ACTOR_DEFINITION, + ACTOR_DEFINITION_ID, ConfigScopeType.ACTOR, ConfigOriginType.BREAKING_CHANGE, unsupportedVersionIds.stream().map(UUID::toString).toList()); + verify(mWorkspaceService).listWorkspaceActiveSyncIds(workspace1SyncQuery); + verify(mWorkspaceService).listWorkspaceActiveSyncIds(workspace2SyncQuery); + verify(mWorkspaceService).listWorkspaceActiveSyncIds(workspace3SyncQuery); + verifyNoMoreInteractions(mSourceService, mScopedConfigurationService, mWorkspaceService); + } + @Test void testGetLastApplicableBreakingChange() throws ConfigNotFoundException, IOException { final ActorDefinitionVersion defaultVersion = new ActorDefinitionVersion() diff --git a/airbyte-config/init/src/main/java/io/airbyte/config/init/SupportStateUpdater.java b/airbyte-config/init/src/main/java/io/airbyte/config/init/SupportStateUpdater.java index e379930330f..0dc29f20be6 100644 --- a/airbyte-config/init/src/main/java/io/airbyte/config/init/SupportStateUpdater.java +++ b/airbyte-config/init/src/main/java/io/airbyte/config/init/SupportStateUpdater.java @@ -16,7 +16,6 @@ import io.airbyte.config.StandardDestinationDefinition; import io.airbyte.config.StandardSourceDefinition; import io.airbyte.config.init.BreakingChangeNotificationHelper.BreakingChangeNotificationData; -import io.airbyte.config.persistence.ActorDefinitionVersionHelper; import io.airbyte.config.persistence.BreakingChangesHelper; import io.airbyte.data.exceptions.ConfigNotFoundException; import io.airbyte.data.services.ActorDefinitionService; @@ -25,7 +24,6 @@ import io.airbyte.featureflag.FeatureFlagClient; import io.airbyte.featureflag.NotifyBreakingChangesOnSupportStateUpdate; import io.airbyte.featureflag.Workspace; -import io.airbyte.validation.json.JsonValidationException; import jakarta.inject.Singleton; import java.io.IOException; import java.time.LocalDate; @@ -70,14 +68,14 @@ public static SupportStateUpdate merge(final SupportStateUpdate a, final Support private final SourceService sourceService; private final DestinationService destinationService; private final FeatureFlagClient featureFlagClient; - private final ActorDefinitionVersionHelper actorDefinitionVersionHelper; + private final BreakingChangesHelper breakingChangesHelper; private final BreakingChangeNotificationHelper breakingChangeNotificationHelper; public SupportStateUpdater(final ActorDefinitionService actorDefinitionService, final SourceService sourceService, final DestinationService destinationService, final DeploymentMode deploymentMode, - final ActorDefinitionVersionHelper actorDefinitionVersionHelper, + final BreakingChangesHelper breakingChangesHelper, final BreakingChangeNotificationHelper breakingChangeNotificationHelper, final FeatureFlagClient featureFlagClient) { this.deploymentMode = deploymentMode; @@ -85,7 +83,7 @@ public SupportStateUpdater(final ActorDefinitionService actorDefinitionService, this.sourceService = sourceService; this.destinationService = destinationService; this.featureFlagClient = featureFlagClient; - this.actorDefinitionVersionHelper = actorDefinitionVersionHelper; + this.breakingChangesHelper = breakingChangesHelper; this.breakingChangeNotificationHelper = breakingChangeNotificationHelper; } @@ -149,8 +147,7 @@ SupportStateUpdate getSupportStateUpdate(final Version currentDefaultVersion, /** * Updates the version support states for all source and destination definitions. */ - public void updateSupportStates() - throws IOException, JsonValidationException, ConfigNotFoundException, io.airbyte.config.persistence.ConfigNotFoundException { + public void updateSupportStates() throws IOException, ConfigNotFoundException { updateSupportStates(LocalDate.now()); } @@ -159,8 +156,7 @@ public void updateSupportStates() * reference date, and disables syncs with unsupported versions. */ @VisibleForTesting - void updateSupportStates(final LocalDate referenceDate) - throws IOException, JsonValidationException, ConfigNotFoundException, io.airbyte.config.persistence.ConfigNotFoundException { + void updateSupportStates(final LocalDate referenceDate) throws IOException, ConfigNotFoundException { log.info("Updating support states for all definitions"); final List sourceDefinitions = sourceService.listPublicSourceDefinitions(false); final List destinationDefinitions = destinationService.listPublicDestinationDefinitions(false); @@ -226,10 +222,11 @@ BreakingChangeNotificationData buildSourceNotificationData(final StandardSourceD final ActorDefinitionBreakingChange breakingChange, final List versionsBeforeUpdate, final SupportStateUpdate supportStateUpdate) - throws JsonValidationException, ConfigNotFoundException, IOException, io.airbyte.config.persistence.ConfigNotFoundException { + throws IOException { final List newlyDeprecatedVersionIds = getNewlyDeprecatedVersionIds(versionsBeforeUpdate, supportStateUpdate); final List>> workspaceSyncIds = - actorDefinitionVersionHelper.getActiveWorkspaceSyncsWithSourceVersionIds(sourceDefinition, newlyDeprecatedVersionIds); + breakingChangesHelper.getBreakingActiveSyncsPerWorkspace(ActorType.SOURCE, sourceDefinition.getSourceDefinitionId(), + newlyDeprecatedVersionIds); final List workspaceIds = workspaceSyncIds.stream().map(Pair::getFirst).toList(); return new BreakingChangeNotificationData( ActorType.SOURCE, @@ -243,10 +240,11 @@ BreakingChangeNotificationData buildDestinationNotificationData(final StandardDe final ActorDefinitionBreakingChange breakingChange, final List versionsBeforeUpdate, final SupportStateUpdate supportStateUpdate) - throws JsonValidationException, ConfigNotFoundException, IOException, io.airbyte.config.persistence.ConfigNotFoundException { + throws IOException { final List newlyDeprecatedVersionIds = getNewlyDeprecatedVersionIds(versionsBeforeUpdate, supportStateUpdate); final List>> workspaceSyncIds = - actorDefinitionVersionHelper.getActiveWorkspaceSyncsWithDestinationVersionIds(destinationDefinition, newlyDeprecatedVersionIds); + breakingChangesHelper.getBreakingActiveSyncsPerWorkspace(ActorType.DESTINATION, destinationDefinition.getDestinationDefinitionId(), + newlyDeprecatedVersionIds); final List workspaceIds = workspaceSyncIds.stream().map(Pair::getFirst).toList(); return new BreakingChangeNotificationData( ActorType.DESTINATION, diff --git a/airbyte-config/init/src/test/java/io/airbyte/config/init/SupportStateUpdaterTest.java b/airbyte-config/init/src/test/java/io/airbyte/config/init/SupportStateUpdaterTest.java index 00ad2278076..d41fb711d94 100644 --- a/airbyte-config/init/src/test/java/io/airbyte/config/init/SupportStateUpdaterTest.java +++ b/airbyte-config/init/src/test/java/io/airbyte/config/init/SupportStateUpdaterTest.java @@ -22,7 +22,7 @@ import io.airbyte.config.StandardSourceDefinition; import io.airbyte.config.init.BreakingChangeNotificationHelper.BreakingChangeNotificationData; import io.airbyte.config.init.SupportStateUpdater.SupportStateUpdate; -import io.airbyte.config.persistence.ActorDefinitionVersionHelper; +import io.airbyte.config.persistence.BreakingChangesHelper; import io.airbyte.data.exceptions.ConfigNotFoundException; import io.airbyte.data.services.ActorDefinitionService; import io.airbyte.data.services.DestinationService; @@ -52,7 +52,7 @@ class SupportStateUpdaterTest { private ActorDefinitionService mActorDefinitionService; private SourceService mSourceService; private DestinationService mDestinationService; - private ActorDefinitionVersionHelper mActorDefinitionVersionHelper; + private BreakingChangesHelper mBreakingChangesHelper; private BreakingChangeNotificationHelper mBreakingChangeNotificationHelper; private SupportStateUpdater supportStateUpdater; @@ -63,13 +63,13 @@ void setup() { mSourceService = mock(SourceService.class); mDestinationService = mock(DestinationService.class); mBreakingChangeNotificationHelper = mock(BreakingChangeNotificationHelper.class); - mActorDefinitionVersionHelper = mock(ActorDefinitionVersionHelper.class); + mBreakingChangesHelper = mock(BreakingChangesHelper.class); final FeatureFlagClient featureFlagClient = mock(TestClient.class); when(featureFlagClient.boolVariation(NotifyBreakingChangesOnSupportStateUpdate.INSTANCE, new Workspace(ANONYMOUS))) .thenReturn(true); supportStateUpdater = new SupportStateUpdater(mActorDefinitionService, mSourceService, mDestinationService, - DeploymentMode.CLOUD, mActorDefinitionVersionHelper, + DeploymentMode.CLOUD, mBreakingChangesHelper, mBreakingChangeNotificationHelper, featureFlagClient); } @@ -193,8 +193,9 @@ void testUpdateSupportStates() when(mActorDefinitionService.getActorDefinitionVersion(DEST_V1_0_0.getVersionId())).thenReturn(DEST_V1_0_0); final List workspaceIdsToNotify = List.of(UUID.randomUUID(), UUID.randomUUID()); - when(mActorDefinitionVersionHelper.getActiveWorkspaceSyncsWithDestinationVersionIds(destinationDefinition, List.of(DEST_V0_1_0.getVersionId()))) - .thenReturn(workspaceIdsToNotify.stream().map(id -> new Pair<>(id, List.of(UUID.randomUUID()))).toList()); + when(mBreakingChangesHelper.getBreakingActiveSyncsPerWorkspace(ActorType.DESTINATION, destinationDefinition.getDestinationDefinitionId(), + List.of(DEST_V0_1_0.getVersionId()))) + .thenReturn(workspaceIdsToNotify.stream().map(id -> new Pair<>(id, List.of(UUID.randomUUID()))).toList()); supportStateUpdater.updateSupportStates(LocalDate.parse("2020-01-15")); @@ -212,10 +213,10 @@ void testUpdateSupportStates() verify(mActorDefinitionService).listActorDefinitionVersionsForDefinition(ACTOR_DEFINITION_ID); verify(mActorDefinitionService).listActorDefinitionVersionsForDefinition(destinationDefinitionId); verify(mActorDefinitionService).getActorDefinitionVersion(DEST_V1_0_0.getVersionId()); - verify(mActorDefinitionVersionHelper).getActiveWorkspaceSyncsWithDestinationVersionIds(destinationDefinition, + verify(mBreakingChangesHelper).getBreakingActiveSyncsPerWorkspace(ActorType.DESTINATION, destinationDefinition.getDestinationDefinitionId(), List.of(DEST_V0_1_0.getVersionId())); verifyNoMoreInteractions(mActorDefinitionService, mSourceService, mDestinationService); - verifyNoMoreInteractions(mActorDefinitionVersionHelper); + verifyNoMoreInteractions(mBreakingChangesHelper); verifyNoMoreInteractions(mBreakingChangeNotificationHelper); } @@ -319,8 +320,9 @@ void testBuildSourceNotificationData() new SupportStateUpdate(List.of(), List.of(ADV_1_0_0.getVersionId(), ADV_3_0_0.getVersionId()), List.of()); final List workspaceIds = List.of(UUID.randomUUID(), UUID.randomUUID()); - when(mActorDefinitionVersionHelper.getActiveWorkspaceSyncsWithSourceVersionIds(sourceDefinition, List.of(ADV_3_0_0.getVersionId()))) - .thenReturn(workspaceIds.stream().map(id -> new Pair<>(id, List.of(UUID.randomUUID(), UUID.randomUUID()))).toList()); + when(mBreakingChangesHelper.getBreakingActiveSyncsPerWorkspace(ActorType.SOURCE, sourceDefinition.getSourceDefinitionId(), + List.of(ADV_3_0_0.getVersionId()))) + .thenReturn(workspaceIds.stream().map(id -> new Pair<>(id, List.of(UUID.randomUUID(), UUID.randomUUID()))).toList()); final ActorDefinitionBreakingChange latestBreakingChange = new ActorDefinitionBreakingChange() .withMessage("Test Breaking Change"); @@ -331,7 +333,8 @@ void testBuildSourceNotificationData() supportStateUpdater.buildSourceNotificationData(sourceDefinition, latestBreakingChange, versionsBeforeUpdate, supportStateUpdate); assertEquals(expectedNotificationData, notificationData); - verify(mActorDefinitionVersionHelper).getActiveWorkspaceSyncsWithSourceVersionIds(sourceDefinition, List.of(ADV_3_0_0.getVersionId())); + verify(mBreakingChangesHelper).getBreakingActiveSyncsPerWorkspace(ActorType.SOURCE, sourceDefinition.getSourceDefinitionId(), + List.of(ADV_3_0_0.getVersionId())); } @Test @@ -350,8 +353,9 @@ void testBuildDestinationNotificationData() new SupportStateUpdate(List.of(), List.of(ADV_1_0_0.getVersionId(), ADV_3_0_0.getVersionId()), List.of()); final List workspaceIds = List.of(UUID.randomUUID(), UUID.randomUUID()); - when(mActorDefinitionVersionHelper.getActiveWorkspaceSyncsWithDestinationVersionIds(destinationDefinition, List.of(ADV_3_0_0.getVersionId()))) - .thenReturn(workspaceIds.stream().map(id -> new Pair<>(id, List.of(UUID.randomUUID(), UUID.randomUUID()))).toList()); + when(mBreakingChangesHelper.getBreakingActiveSyncsPerWorkspace(ActorType.DESTINATION, destinationDefinition.getDestinationDefinitionId(), + List.of(ADV_3_0_0.getVersionId()))) + .thenReturn(workspaceIds.stream().map(id -> new Pair<>(id, List.of(UUID.randomUUID(), UUID.randomUUID()))).toList()); final ActorDefinitionBreakingChange latestBreakingChange = new ActorDefinitionBreakingChange() .withMessage("Test Breaking Change 2"); @@ -362,7 +366,8 @@ void testBuildDestinationNotificationData() supportStateUpdater.buildDestinationNotificationData(destinationDefinition, latestBreakingChange, versionsBeforeUpdate, supportStateUpdate); assertEquals(expectedNotificationData, notificationData); - verify(mActorDefinitionVersionHelper).getActiveWorkspaceSyncsWithDestinationVersionIds(destinationDefinition, List.of(ADV_3_0_0.getVersionId())); + verify(mBreakingChangesHelper).getBreakingActiveSyncsPerWorkspace(ActorType.DESTINATION, destinationDefinition.getDestinationDefinitionId(), + List.of(ADV_3_0_0.getVersionId())); } } diff --git a/airbyte-data/src/main/java/io/airbyte/data/services/DestinationService.java b/airbyte-data/src/main/java/io/airbyte/data/services/DestinationService.java index 45d878c8bd9..62bb38e5f01 100644 --- a/airbyte-data/src/main/java/io/airbyte/data/services/DestinationService.java +++ b/airbyte-data/src/main/java/io/airbyte/data/services/DestinationService.java @@ -69,6 +69,8 @@ void writeConnectorMetadata(final StandardDestinationDefinition destinationDefin List listDestinationsWithVersionIds(final List actorDefinitionVersionIds) throws IOException; + List listDestinationsWithIds(final List destinationIds) throws IOException; + DestinationConnection getDestinationConnectionWithSecrets(UUID destinationId) throws JsonValidationException, ConfigNotFoundException, IOException; void writeDestinationConnectionWithSecrets( diff --git a/airbyte-data/src/main/java/io/airbyte/data/services/SourceService.java b/airbyte-data/src/main/java/io/airbyte/data/services/SourceService.java index f7d56b4a712..0fa6918e5d7 100644 --- a/airbyte-data/src/main/java/io/airbyte/data/services/SourceService.java +++ b/airbyte-data/src/main/java/io/airbyte/data/services/SourceService.java @@ -53,6 +53,8 @@ public interface SourceService { List getSourceAndDefinitionsFromSourceIds(List sourceIds) throws IOException; + List listSourcesWithIds(final List sourceIds) throws IOException; + void writeConnectorMetadata(final StandardSourceDefinition sourceDefinition, final ActorDefinitionVersion actorDefinitionVersion, final List breakingChangesForDefinition) diff --git a/airbyte-data/src/main/java/io/airbyte/data/services/impls/jooq/DestinationServiceJooqImpl.java b/airbyte-data/src/main/java/io/airbyte/data/services/impls/jooq/DestinationServiceJooqImpl.java index a7846fc1dc0..f217955d516 100644 --- a/airbyte-data/src/main/java/io/airbyte/data/services/impls/jooq/DestinationServiceJooqImpl.java +++ b/airbyte-data/src/main/java/io/airbyte/data/services/impls/jooq/DestinationServiceJooqImpl.java @@ -453,6 +453,18 @@ public List listDestinationsWithVersionIds( return result.stream().map(DbConverter::buildDestinationConnection).toList(); } + @Override + public List listDestinationsWithIds( + final List destinationIds) + throws IOException { + final Result result = database.query(ctx -> ctx.select(asterisk()) + .from(ACTOR) + .where(ACTOR.ACTOR_TYPE.eq(ActorType.destination)) + .and(ACTOR.ID.in(destinationIds)) + .andNot(ACTOR.TOMBSTONE).fetch()); + return result.stream().map(DbConverter::buildDestinationConnection).toList(); + } + private int writeActorDefinitionWorkspaceGrant(final UUID actorDefinitionId, final UUID scopeId, final io.airbyte.db.instance.configs.jooq.generated.enums.ScopeType scopeType, diff --git a/airbyte-data/src/main/java/io/airbyte/data/services/impls/jooq/SourceServiceJooqImpl.java b/airbyte-data/src/main/java/io/airbyte/data/services/impls/jooq/SourceServiceJooqImpl.java index 63fca04a09e..6963cc97439 100644 --- a/airbyte-data/src/main/java/io/airbyte/data/services/impls/jooq/SourceServiceJooqImpl.java +++ b/airbyte-data/src/main/java/io/airbyte/data/services/impls/jooq/SourceServiceJooqImpl.java @@ -442,6 +442,16 @@ public List listSourcesWithVersionIds( return result.stream().map(DbConverter::buildSourceConnection).toList(); } + @Override + public List listSourcesWithIds(final List sourceIds) throws IOException { + final Result result = database.query(ctx -> ctx.select(asterisk()) + .from(ACTOR) + .where(ACTOR.ACTOR_TYPE.eq(ActorType.source)) + .and(ACTOR.ID.in(sourceIds)) + .andNot(ACTOR.TOMBSTONE).fetch()); + return result.stream().map(DbConverter::buildSourceConnection).toList(); + } + /** * Retrieve from Launch Darkly the default max seconds between messages for a given source. This * allows us to dynamically change the default max seconds between messages for a source. diff --git a/airbyte-data/src/main/kotlin/io/airbyte/data/repositories/ScopedConfigurationRepository.kt b/airbyte-data/src/main/kotlin/io/airbyte/data/repositories/ScopedConfigurationRepository.kt index 7cde7d6daa3..a5e3dd8ac07 100644 --- a/airbyte-data/src/main/kotlin/io/airbyte/data/repositories/ScopedConfigurationRepository.kt +++ b/airbyte-data/src/main/kotlin/io/airbyte/data/repositories/ScopedConfigurationRepository.kt @@ -35,6 +35,15 @@ interface ScopedConfigurationRepository : PageableRepository, ): List + fun findByKeyAndResourceTypeAndResourceIdAndScopeTypeAndOriginTypeAndValueInList( + key: String, + resourceType: ConfigResourceType, + resourceId: UUID, + scopeType: ConfigScopeType, + originType: ConfigOriginType, + values: List, + ): List + fun findByKey(key: String): List fun deleteByIdInList(ids: List) diff --git a/airbyte-data/src/main/kotlin/io/airbyte/data/services/ScopedConfigurationService.kt b/airbyte-data/src/main/kotlin/io/airbyte/data/services/ScopedConfigurationService.kt index 229f4e68a5e..7b3181d45ce 100644 --- a/airbyte-data/src/main/kotlin/io/airbyte/data/services/ScopedConfigurationService.kt +++ b/airbyte-data/src/main/kotlin/io/airbyte/data/services/ScopedConfigurationService.kt @@ -114,6 +114,18 @@ interface ScopedConfigurationService { origins: List, ): List + /** + * List scoped configurations with a given origin type and values. + */ + fun listScopedConfigurationsWithValues( + key: String, + resourceType: ConfigResourceType, + resourceId: UUID, + scopeType: ConfigScopeType, + originType: ConfigOriginType, + values: List, + ): List + /** * Delete a scoped configuration by id. */ diff --git a/airbyte-data/src/main/kotlin/io/airbyte/data/services/impls/data/ScopedConfigurationServiceDataImpl.kt b/airbyte-data/src/main/kotlin/io/airbyte/data/services/impls/data/ScopedConfigurationServiceDataImpl.kt index 46b8c0ef0d4..726af7b5b98 100644 --- a/airbyte-data/src/main/kotlin/io/airbyte/data/services/impls/data/ScopedConfigurationServiceDataImpl.kt +++ b/airbyte-data/src/main/kotlin/io/airbyte/data/services/impls/data/ScopedConfigurationServiceDataImpl.kt @@ -174,6 +174,24 @@ class ScopedConfigurationServiceDataImpl(private val repository: ScopedConfigura ).map { it.toConfigModel() }.toList() } + override fun listScopedConfigurationsWithValues( + key: String, + resourceType: ConfigResourceType, + resourceId: UUID, + scopeType: ConfigScopeType, + originType: ConfigOriginType, + values: List, + ): List { + return repository.findByKeyAndResourceTypeAndResourceIdAndScopeTypeAndOriginTypeAndValueInList( + key, + resourceType.toEntity(), + resourceId, + scopeType.toEntity(), + originType.toEntity(), + values, + ).map { it.toConfigModel() }.toList() + } + override fun deleteScopedConfiguration(configId: UUID) { repository.deleteById(configId) } diff --git a/airbyte-data/src/test/kotlin/io/airbyte/data/repositories/ScopedConfigurationRepositoryTest.kt b/airbyte-data/src/test/kotlin/io/airbyte/data/repositories/ScopedConfigurationRepositoryTest.kt index 7dc15ae8c89..584f6a3e7d8 100644 --- a/airbyte-data/src/test/kotlin/io/airbyte/data/repositories/ScopedConfigurationRepositoryTest.kt +++ b/airbyte-data/src/test/kotlin/io/airbyte/data/repositories/ScopedConfigurationRepositoryTest.kt @@ -480,4 +480,116 @@ internal class ScopedConfigurationRepositoryTest : AbstractConfigRepositoryTest< val persistedIds = findConfigsResult.map { it.id } assert(persistedIds.containsAll(listOf(config.id, config2.id, config3.id))) } + + @Test + fun `test db find by value in list`() { + val resourceId = UUID.randomUUID() + val valueA = "version-1" + val bcConfig1 = + ScopedConfiguration( + id = UUID.randomUUID(), + key = CONFIG_KEY, + value = valueA, + scopeType = ConfigScopeType.actor, + scopeId = UUID.randomUUID(), + resourceType = ConfigResourceType.actor_definition, + resourceId = resourceId, + originType = ConfigOriginType.breaking_change, + origin = "origin", + ) + + repository.save(bcConfig1) + + val bcConfig2 = + ScopedConfiguration( + id = UUID.randomUUID(), + key = CONFIG_KEY, + value = valueA, + scopeType = ConfigScopeType.actor, + scopeId = UUID.randomUUID(), + resourceType = ConfigResourceType.actor_definition, + resourceId = resourceId, + originType = ConfigOriginType.breaking_change, + origin = "origin", + ) + + repository.save(bcConfig2) + + val bcConfigOnOtherScopeType = + ScopedConfiguration( + id = UUID.randomUUID(), + key = CONFIG_KEY, + value = valueA, + scopeType = ConfigScopeType.workspace, + scopeId = UUID.randomUUID(), + resourceType = ConfigResourceType.actor_definition, + resourceId = resourceId, + originType = ConfigOriginType.breaking_change, + origin = "origin", + ) + + repository.save(bcConfigOnOtherScopeType) + + val userConfig = + ScopedConfiguration( + id = UUID.randomUUID(), + key = CONFIG_KEY, + value = valueA, + scopeType = ConfigScopeType.actor, + scopeId = UUID.randomUUID(), + resourceType = ConfigResourceType.actor_definition, + resourceId = resourceId, + originType = ConfigOriginType.user, + origin = "origin", + ) + + repository.save(userConfig) + + val valueB = "version-2" + val bcConfig3 = + ScopedConfiguration( + id = UUID.randomUUID(), + key = CONFIG_KEY, + value = valueB, + scopeType = ConfigScopeType.actor, + scopeId = UUID.randomUUID(), + resourceType = ConfigResourceType.actor_definition, + resourceId = resourceId, + originType = ConfigOriginType.breaking_change, + origin = "origin2", + ) + + repository.save(bcConfig3) + + val valueC = "version-3" + val bcConfig4 = + ScopedConfiguration( + id = UUID.randomUUID(), + key = CONFIG_KEY, + value = valueC, + scopeType = ConfigScopeType.organization, + scopeId = bcConfig1.scopeId, + resourceType = ConfigResourceType.actor_definition, + resourceId = resourceId, + originType = ConfigOriginType.breaking_change, + origin = "origin3", + ) + + repository.save(bcConfig4) + assert(repository.count() == 6L) + + val findConfigsResult = + repository.findByKeyAndResourceTypeAndResourceIdAndScopeTypeAndOriginTypeAndValueInList( + CONFIG_KEY, + ConfigResourceType.actor_definition, + resourceId, + ConfigScopeType.actor, + ConfigOriginType.breaking_change, + listOf(valueA, valueB), + ) + assert(findConfigsResult.size == 3) + + val persistedIds = findConfigsResult.map { it.id } + assert(persistedIds.containsAll(listOf(bcConfig1.id, bcConfig2.id, bcConfig3.id))) + } } diff --git a/airbyte-data/src/test/kotlin/io/airbyte/data/services/impls/data/ScopedConfigurationServiceDataImplTest.kt b/airbyte-data/src/test/kotlin/io/airbyte/data/services/impls/data/ScopedConfigurationServiceDataImplTest.kt index 40d82d923bd..aa7b4f6c1e5 100644 --- a/airbyte-data/src/test/kotlin/io/airbyte/data/services/impls/data/ScopedConfigurationServiceDataImplTest.kt +++ b/airbyte-data/src/test/kotlin/io/airbyte/data/services/impls/data/ScopedConfigurationServiceDataImplTest.kt @@ -3,6 +3,8 @@ package io.airbyte.data.services.impls.data import io.airbyte.data.exceptions.ConfigNotFoundException import io.airbyte.data.repositories.ScopedConfigurationRepository import io.airbyte.data.repositories.entities.ScopedConfiguration +import io.airbyte.data.services.impls.data.mappers.EntityConfigOriginType +import io.airbyte.data.services.impls.data.mappers.ModelConfigOriginType import io.airbyte.data.services.impls.data.mappers.ModelConfigScopeType import io.airbyte.data.services.impls.data.mappers.toConfigModel import io.airbyte.data.services.shared.ConfigScopeMapWithId @@ -777,6 +779,70 @@ internal class ScopedConfigurationServiceDataImplTest { } } + @Test + fun `test list configurations with values`() { + val resourceId = UUID.randomUUID() + + val config = + ScopedConfiguration( + id = UUID.randomUUID(), + key = "key", + value = "value", + scopeType = EntityConfigScopeType.workspace, + scopeId = UUID.randomUUID(), + resourceType = EntityConfigResourceType.actor_definition, + resourceId = resourceId, + originType = ConfigOriginType.user, + origin = "my_user_id", + ) + + val config2 = + ScopedConfiguration( + id = UUID.randomUUID(), + key = "key", + value = "value2", + scopeType = EntityConfigScopeType.workspace, + scopeId = UUID.randomUUID(), + resourceType = EntityConfigResourceType.actor_definition, + resourceId = resourceId, + originType = ConfigOriginType.user, + origin = "my_user_id2", + ) + + every { + scopedConfigurationRepository.findByKeyAndResourceTypeAndResourceIdAndScopeTypeAndOriginTypeAndValueInList( + "key", + EntityConfigResourceType.actor_definition, + resourceId, + EntityConfigScopeType.workspace, + EntityConfigOriginType.user, + listOf(config.value, config2.value), + ) + } returns listOf(config, config2) + + val res = + scopedConfigurationService.listScopedConfigurationsWithValues( + "key", + ModelConfigResourceType.ACTOR_DEFINITION, + resourceId, + ModelConfigScopeType.WORKSPACE, + ModelConfigOriginType.USER, + listOf(config.value, config2.value), + ) + assert(res == listOf(config.toConfigModel(), config2.toConfigModel())) + + verify { + scopedConfigurationRepository.findByKeyAndResourceTypeAndResourceIdAndScopeTypeAndOriginTypeAndValueInList( + "key", + EntityConfigResourceType.actor_definition, + resourceId, + EntityConfigScopeType.workspace, + EntityConfigOriginType.user, + listOf(config.value, config2.value), + ) + } + } + @Test fun `test delete scoped configuration`() { val configId = UUID.randomUUID()