Skip to content

Commit

Permalink
Use new breaking change pins on support state crons (#12195)
Browse files Browse the repository at this point in the history
  • Loading branch information
pedroslopez committed Apr 25, 2024
1 parent 55b5d24 commit bc0d4e0
Show file tree
Hide file tree
Showing 16 changed files with 600 additions and 390 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import io.airbyte.bootloader.helpers.NoOpDefinitionVersionOverrideProvider;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.commons.version.AirbyteProtocolVersionRange;
Expand All @@ -23,7 +22,7 @@
import io.airbyte.config.init.DeclarativeSourceUpdater;
import io.airbyte.config.init.PostLoadExecutor;
import io.airbyte.config.init.SupportStateUpdater;
import io.airbyte.config.persistence.ActorDefinitionVersionHelper;
import io.airbyte.config.persistence.BreakingChangesHelper;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.config.persistence.OrganizationPersistence;
import io.airbyte.config.secrets.SecretsRepositoryReader;
Expand Down Expand Up @@ -168,6 +167,11 @@ void testBootloaderAppBlankDb() throws Exception {
secretPersistenceConfigService,
connectionService,
actorDefinitionVersionUpdater);
val workspaceService = new WorkspaceServiceJooqImpl(configDatabase,
featureFlagClient,
secretsRepositoryReader,
secretsRepositoryWriter,
secretPersistenceConfigService);
val configRepository = new ConfigRepository(
new ActorDefinitionServiceJooqImpl(configDatabase),
new CatalogServiceJooqImpl(configDatabase),
Expand All @@ -180,11 +184,7 @@ void testBootloaderAppBlankDb() throws Exception {
secretPersistenceConfigService),
new OperationServiceJooqImpl(configDatabase),
sourceService,
new WorkspaceServiceJooqImpl(configDatabase,
featureFlagClient,
secretsRepositoryReader,
secretsRepositoryWriter,
secretPersistenceConfigService));
workspaceService);
val configsDatabaseInitializationTimeoutMs = TimeUnit.SECONDS.toMillis(60L);
val configDatabaseInitializer = DatabaseCheckFactory.createConfigsDatabaseInitializer(configsDslContext,
configsDatabaseInitializationTimeoutMs, MoreResources.readResource(DatabaseConstants.CONFIGS_INITIAL_SCHEMA_PATH));
Expand All @@ -198,13 +198,10 @@ void testBootloaderAppBlankDb() throws Exception {
val organizationPersistence = new OrganizationPersistence(jobDatabase);
val protocolVersionChecker = new ProtocolVersionChecker(jobsPersistence, airbyteProtocolRange, configRepository, definitionsProvider);
val breakingChangeNotificationHelper = new BreakingChangeNotificationHelper(configRepository, featureFlagClient);
val actorDefinitionVersionHelper =
new ActorDefinitionVersionHelper(configRepository, new NoOpDefinitionVersionOverrideProvider(), new NoOpDefinitionVersionOverrideProvider(),
featureFlagClient);
val breakingChangeHelper = new BreakingChangesHelper(scopedConfigurationService, workspaceService, destinationService, sourceService);
val supportStateUpdater =
new SupportStateUpdater(actorDefinitionService, sourceService, destinationService, DeploymentMode.OSS,
actorDefinitionVersionHelper, breakingChangeNotificationHelper,
featureFlagClient);
new SupportStateUpdater(actorDefinitionService, sourceService, destinationService, DeploymentMode.OSS, breakingChangeHelper,
breakingChangeNotificationHelper, featureFlagClient);
val metricClient = new NotImplementedMetricClient();
val applyDefinitionsHelper =
new ApplyDefinitionsHelper(definitionsProvider, jobsPersistence, actorDefinitionService, sourceService, destinationService,
Expand Down Expand Up @@ -261,35 +258,38 @@ void testRequiredVersionUpgradePredicate() throws Exception {
connectionService,
actorDefinitionService,
scopedConfigurationService);
val sourceService = new SourceServiceJooqImpl(configDatabase,
featureFlagClient,
mock(SecretsRepositoryReader.class),
mock(SecretsRepositoryWriter.class),
mock(SecretPersistenceConfigService.class),
connectionService,
actorDefinitionVersionUpdater);
val destinationService = new DestinationServiceJooqImpl(configDatabase,
featureFlagClient,
mock(SecretsRepositoryReader.class),
mock(SecretsRepositoryWriter.class),
mock(SecretPersistenceConfigService.class),
connectionService,
actorDefinitionVersionUpdater);
val workspaceService = new WorkspaceServiceJooqImpl(configDatabase,
featureFlagClient,
mock(SecretsRepositoryReader.class),
mock(SecretsRepositoryWriter.class),
mock(SecretPersistenceConfigService.class));
val configRepository = new ConfigRepository(
new ActorDefinitionServiceJooqImpl(configDatabase),
new CatalogServiceJooqImpl(configDatabase),
connectionService,
new ConnectorBuilderServiceJooqImpl(configDatabase),
new DestinationServiceJooqImpl(configDatabase,
featureFlagClient,
mock(SecretsRepositoryReader.class),
mock(SecretsRepositoryWriter.class),
mock(SecretPersistenceConfigService.class),
connectionService,
actorDefinitionVersionUpdater),
destinationService,
new OAuthServiceJooqImpl(configDatabase,
featureFlagClient,
mock(SecretsRepositoryReader.class),
mock(SecretPersistenceConfigService.class)),
new OperationServiceJooqImpl(configDatabase),
new SourceServiceJooqImpl(configDatabase,
featureFlagClient,
mock(SecretsRepositoryReader.class),
mock(SecretsRepositoryWriter.class),
mock(SecretPersistenceConfigService.class),
connectionService,
actorDefinitionVersionUpdater),
new WorkspaceServiceJooqImpl(configDatabase,
featureFlagClient,
mock(SecretsRepositoryReader.class),
mock(SecretsRepositoryWriter.class),
mock(SecretPersistenceConfigService.class)));
sourceService,
workspaceService);
val configsDatabaseInitializationTimeoutMs = TimeUnit.SECONDS.toMillis(60L);
val configDatabaseInitializer = DatabaseCheckFactory.createConfigsDatabaseInitializer(configsDslContext,
configsDatabaseInitializationTimeoutMs, MoreResources.readResource(DatabaseConstants.CONFIGS_INITIAL_SCHEMA_PATH));
Expand All @@ -302,25 +302,9 @@ void testRequiredVersionUpgradePredicate() throws Exception {
val jobsPersistence = new DefaultJobPersistence(jobDatabase);
val organizationPersistence = new OrganizationPersistence(jobDatabase);
val breakingChangeNotificationHelper = new BreakingChangeNotificationHelper(configRepository, featureFlagClient);
val actorDefinitionVersionHelper =
new ActorDefinitionVersionHelper(configRepository, new NoOpDefinitionVersionOverrideProvider(), new NoOpDefinitionVersionOverrideProvider(),
featureFlagClient);
val sourceService = new SourceServiceJooqImpl(configDatabase,
featureFlagClient,
mock(SecretsRepositoryReader.class),
mock(SecretsRepositoryWriter.class),
mock(SecretPersistenceConfigService.class),
connectionService,
actorDefinitionVersionUpdater);
val destinationService = new DestinationServiceJooqImpl(configDatabase,
featureFlagClient,
mock(SecretsRepositoryReader.class),
mock(SecretsRepositoryWriter.class),
mock(SecretPersistenceConfigService.class),
connectionService,
actorDefinitionVersionUpdater);
val breakingChangesHelper = new BreakingChangesHelper(scopedConfigurationService, workspaceService, destinationService, sourceService);
val supportStateUpdater =
new SupportStateUpdater(actorDefinitionService, sourceService, destinationService, DeploymentMode.OSS, actorDefinitionVersionHelper,
new SupportStateUpdater(actorDefinitionService, sourceService, destinationService, DeploymentMode.OSS, breakingChangesHelper,
breakingChangeNotificationHelper, featureFlagClient);
val protocolVersionChecker = new ProtocolVersionChecker(jobsPersistence, airbyteProtocolRange, configRepository, definitionsProvider);
val metricClient = new NotImplementedMetricClient();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import io.airbyte.config.SourceConnection;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.persistence.ConfigRepository.StandardSyncQuery;
import io.airbyte.config.persistence.version_overrides.DefinitionVersionOverrideProvider;
import io.airbyte.featureflag.EnableConfigurationOverrideProvider;
import io.airbyte.featureflag.FeatureFlagClient;
Expand All @@ -22,13 +21,9 @@
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import kotlin.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -263,92 +258,6 @@ public boolean getSourceOrDestinationIsAlphaOrBeta(final UUID workspaceId,
return hasAlphaOrBetaVersion(List.of(sourceVersion, destinationVersion));
}

/**
* Helper method to retrieve active syncs using a given source version, excluding syncs that have a
* version override applied.
*
* @param sourceDefinition - source definition
* @param sourceVersionIds - source version ids
* @return list of pairs, where the first element is the workspace id and the second element is a
* list of affected sync ids in the workspace
*/
public List<Pair<UUID, List<UUID>>> getActiveWorkspaceSyncsWithSourceVersionIds(final StandardSourceDefinition sourceDefinition,
final List<UUID> sourceVersionIds)
throws IOException, ConfigNotFoundException, JsonValidationException {
final List<SourceConnection> sourceConnections = configRepository.listSourcesWithVersionIds(sourceVersionIds);
final Map<UUID, List<SourceConnection>> sourceConnectionsByWorkspace = new HashMap<>();

// verify that a version override has not been applied to the source, and collect by workspace
for (final SourceConnection source : sourceConnections) {
final ActorDefinitionVersionWithOverrideStatus versionWithOverrideStatus =
getSourceVersionWithOverrideStatus(sourceDefinition, source.getWorkspaceId(), source.getSourceId());
if (!versionWithOverrideStatus.isOverrideApplied()) {
sourceConnectionsByWorkspace
.computeIfAbsent(source.getWorkspaceId(), k -> new ArrayList<>())
.add(source);
}
}

// get affected syncs for each workspace and add them to the list
final List<Pair<UUID, List<UUID>>> workspaceSyncIds = new ArrayList<>();
for (final Map.Entry<UUID, List<SourceConnection>> entry : sourceConnectionsByWorkspace.entrySet()) {
final UUID workspaceId = entry.getKey();
final List<SourceConnection> sourcesForWorkspace = entry.getValue();
final List<UUID> sourceIds = sourcesForWorkspace.stream().map(SourceConnection::getSourceId).toList();
final StandardSyncQuery syncQuery = new StandardSyncQuery(workspaceId, sourceIds, null, false);
final List<UUID> activeSyncIds = configRepository.listWorkspaceActiveSyncIds(syncQuery);
if (!activeSyncIds.isEmpty()) {
workspaceSyncIds.add(new Pair<>(workspaceId, activeSyncIds));
}
}

return workspaceSyncIds;
}

/**
* Helper method to retrieve active syncs using a given destination version, excluding syncs that
* have a version override applied.
*
* @param destinationDefinition - destination definition
* @param destinationVersionIds - destination version ids
* @return list of pairs, where the first element is the workspace id and the second element is a
* list of affected sync ids in the workspace
*/
public List<Pair<UUID, List<UUID>>> getActiveWorkspaceSyncsWithDestinationVersionIds(final StandardDestinationDefinition destinationDefinition,
final List<UUID> destinationVersionIds)
throws IOException, JsonValidationException, ConfigNotFoundException {
final List<DestinationConnection> destinationConnections = configRepository.listDestinationsWithVersionIds(destinationVersionIds);
final Map<UUID, List<DestinationConnection>> destinationConnectionsByWorkspace = new HashMap<>();

// verify that a version override has not been applied to the destination, and collect by workspace
for (final DestinationConnection destination : destinationConnections) {
final ActorDefinitionVersionWithOverrideStatus versionWithOverrideStatus = getDestinationVersionWithOverrideStatus(
destinationDefinition,
destination.getWorkspaceId(),
destination.getDestinationId());
if (!versionWithOverrideStatus.isOverrideApplied()) {
destinationConnectionsByWorkspace
.computeIfAbsent(destination.getWorkspaceId(), k -> new ArrayList<>())
.add(destination);
}
}

// get affected syncs for each workspace and add them to the list
final List<Pair<UUID, List<UUID>>> workspaceSyncIds = new ArrayList<>();
for (final Map.Entry<UUID, List<DestinationConnection>> entry : destinationConnectionsByWorkspace.entrySet()) {
final UUID workspaceId = entry.getKey();
final List<DestinationConnection> destinationsForWorkspace = entry.getValue();
final List<UUID> destinationIds = destinationsForWorkspace.stream().map(DestinationConnection::getDestinationId).toList();
final StandardSyncQuery syncQuery = new StandardSyncQuery(workspaceId, null, destinationIds, false);
final List<UUID> activeSyncIds = configRepository.listWorkspaceActiveSyncIds(syncQuery);
if (!activeSyncIds.isEmpty()) {
workspaceSyncIds.add(new Pair<>(workspaceId, activeSyncIds));
}
}

return workspaceSyncIds;
}

/**
* Get the docker image name (docker_repository:docker_image_tag) for a given actor definition
* version.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,126 @@
import io.airbyte.commons.version.Version;
import io.airbyte.config.ActorDefinitionBreakingChange;
import io.airbyte.config.ActorDefinitionVersion;
import io.airbyte.config.ActorType;
import io.airbyte.config.ConfigOriginType;
import io.airbyte.config.ConfigResourceType;
import io.airbyte.config.ConfigScopeType;
import io.airbyte.config.DestinationConnection;
import io.airbyte.config.ScopedConfiguration;
import io.airbyte.config.SourceConnection;
import io.airbyte.data.exceptions.ConfigNotFoundException;
import io.airbyte.data.services.ActorDefinitionService;
import io.airbyte.data.services.DestinationService;
import io.airbyte.data.services.ScopedConfigurationService;
import io.airbyte.data.services.SourceService;
import io.airbyte.data.services.WorkspaceService;
import io.airbyte.data.services.shared.ConnectorVersionKey;
import io.airbyte.data.services.shared.StandardSyncQuery;
import jakarta.inject.Singleton;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import kotlin.Pair;

/**
* Helper class containing logic related to breaking changes.
*/
@Singleton
public class BreakingChangesHelper {

private final ScopedConfigurationService scopedConfigurationService;
private final WorkspaceService workspaceService;
private final DestinationService destinationService;
private final SourceService sourceService;

public BreakingChangesHelper(final ScopedConfigurationService scopedConfigurationService,
final WorkspaceService workspaceService,
final DestinationService destinationService,
final SourceService sourceService) {
this.scopedConfigurationService = scopedConfigurationService;
this.workspaceService = workspaceService;
this.destinationService = destinationService;
this.sourceService = sourceService;
}

/**
* Finds all active syncs on versions that are unsupported due to a breaking change. Results are
* given per workspace.
*
* @param actorType - type of actor
* @param actorDefinitionId - actor definition id
* @param unsupportedVersionIds - unsupported version ids
* @return list of workspace ids with active syncs on unsupported versions, along with the sync ids
*/
public List<Pair<UUID, List<UUID>>> getBreakingActiveSyncsPerWorkspace(final ActorType actorType,
final UUID actorDefinitionId,
final List<UUID> unsupportedVersionIds)
throws IOException {
// get actors pinned to unsupported versions (due to a breaking change)
final List<String> pinnedValues = unsupportedVersionIds.stream().map(UUID::toString).toList();
final List<ScopedConfiguration> breakingChangePins =
scopedConfigurationService.listScopedConfigurationsWithValues(
ConnectorVersionKey.INSTANCE.getKey(),
ConfigResourceType.ACTOR_DEFINITION,
actorDefinitionId,
ConfigScopeType.ACTOR,
ConfigOriginType.BREAKING_CHANGE,
pinnedValues);

// fetch actors and group by workspace
final Map<UUID, List<UUID>> actorIdsByWorkspace = getActorIdsByWorkspace(actorType, breakingChangePins);

// get affected syncs for each workspace
final List<Pair<UUID, List<UUID>>> workspaceSyncIds = new ArrayList<>();
for (final Map.Entry<UUID, List<UUID>> entry : actorIdsByWorkspace.entrySet()) {
final UUID workspaceId = entry.getKey();
final List<UUID> actorIdsForWorkspace = entry.getValue();
final StandardSyncQuery syncQuery = buildStandardSyncQuery(actorType, workspaceId, actorIdsForWorkspace);
final List<UUID> activeSyncIds = workspaceService.listWorkspaceActiveSyncIds(syncQuery);
if (!activeSyncIds.isEmpty()) {
workspaceSyncIds.add(new Pair<>(workspaceId, activeSyncIds));
}
}

return workspaceSyncIds;
}

private Map<UUID, List<UUID>> getActorIdsByWorkspace(final ActorType actorType, final Collection<ScopedConfiguration> scopedConfigs)
throws IOException {
final List<UUID> actorIds = scopedConfigs.stream().map(ScopedConfiguration::getScopeId).toList();
switch (actorType) {
case SOURCE -> {
return sourceService.listSourcesWithIds(actorIds).stream()
.collect(Collectors.groupingBy(SourceConnection::getWorkspaceId,
Collectors.mapping(SourceConnection::getSourceId,
Collectors.toList())));
}
case DESTINATION -> {
return destinationService.listDestinationsWithIds(actorIds).stream()
.collect(Collectors.groupingBy(DestinationConnection::getWorkspaceId,
Collectors.mapping(DestinationConnection::getDestinationId,
Collectors.toList())));
}
default -> throw new IllegalArgumentException("Actor type not supported: " + actorType);
}
}

private StandardSyncQuery buildStandardSyncQuery(final ActorType actorType, final UUID workspaceId, final List<UUID> actorIds) {
switch (actorType) {
case SOURCE -> {
return new StandardSyncQuery(workspaceId, actorIds, null, false);
}
case DESTINATION -> {
return new StandardSyncQuery(workspaceId, null, actorIds, false);
}
default -> throw new IllegalArgumentException("Actor type not supported: " + actorType);
}
}

/**
* Given a list of breaking changes and the current default version, filter for breaking changes
* that would affect the current default version. This is used to avoid considering breaking changes
Expand Down
Loading

0 comments on commit bc0d4e0

Please sign in to comment.