Skip to content

Commit

Permalink
Faux Major Version Bump (airbytehq#7876)
Browse files Browse the repository at this point in the history
  • Loading branch information
benmoriceau authored Nov 11, 2021
1 parent b05c068 commit 9d05b1c
Show file tree
Hide file tree
Showing 22 changed files with 949 additions and 288 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public <T> List<T> listConfigs(final AirbyteConfig configType, final Class<T> cl
}

@Override
public <T> List<ConfigWithMetadata<T>> listConfigsWithMetadata(AirbyteConfig configType, Class<T> clazz)
public <T> List<ConfigWithMetadata<T>> listConfigsWithMetadata(final AirbyteConfig configType, final Class<T> clazz)
throws JsonValidationException, IOException {
throw new UnsupportedOperationException("Yaml Seed Config doesn't support metadata");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ public interface Configs {

boolean getPublishMetrics();

boolean getVersion31ForceUpgrade();

SecretPersistenceType getSecretPersistenceType();

enum TrackingStrategy {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public class EnvConfigs implements Configs {
private static final String SECRET_PERSISTENCE = "SECRET_PERSISTENCE";
private static final String JOBS_IMAGE_PULL_SECRET = "JOBS_IMAGE_PULL_SECRET";
private static final String PUBLISH_METRICS = "PUBLISH_METRICS";
private static final String VERSION_0_31_0_FORCE_UPGRADE = "VERSION_0_31_0_FORCE_UPGRADE";

// defaults
private static final String DEFAULT_SPEC_CACHE_BUCKET = "io-airbyte-cloud-spec-cache";
Expand All @@ -95,7 +96,7 @@ public class EnvConfigs implements Configs {
public static final String DEFAULT_NETWORK = "host";

private final Function<String, String> getEnv;
private LogConfiguration logConfiguration;
private final LogConfiguration logConfiguration;

public EnvConfigs() {
this(System::getenv);
Expand Down Expand Up @@ -281,12 +282,14 @@ private WorkerPodToleration workerPodToleration(final String tolerationStr) {
.collect(Collectors.toMap(s -> s[0], s -> s[1]));

if (tolerationMap.containsKey("key") && tolerationMap.containsKey("effect") && tolerationMap.containsKey("operator")) {
return new WorkerPodToleration(tolerationMap.get("key"),
return new WorkerPodToleration(
tolerationMap.get("key"),
tolerationMap.get("effect"),
tolerationMap.get("value"),
tolerationMap.get("operator"));
} else {
LOGGER.warn("Ignoring toleration {}, missing one of key,effect or operator",
LOGGER.warn(
"Ignoring toleration {}, missing one of key,effect or operator",
tolerationStr);
return null;
}
Expand Down Expand Up @@ -454,6 +457,11 @@ public boolean getPublishMetrics() {
return getEnvOrDefault(PUBLISH_METRICS, false);
}

@Override
public boolean getVersion31ForceUpgrade() {
return getEnvOrDefault(VERSION_0_31_0_FORCE_UPGRADE, false);
}

@Override
public SecretPersistenceType getSecretPersistenceType() {
final var secretPersistenceStr = getEnvOrDefault(SECRET_PERSISTENCE, SecretPersistenceType.NONE.name());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -150,6 +151,17 @@ public void deleteStandardSourceDefinition(final UUID sourceDefId) throws IOExce
}
}

public void deleteSourceDefinitionAndAssociations(final UUID sourceDefinitionId)
throws JsonValidationException, ConfigNotFoundException, IOException {
deleteConnectorDefinitionAndAssociations(
ConfigSchema.STANDARD_SOURCE_DEFINITION,
ConfigSchema.SOURCE_CONNECTION,
SourceConnection.class,
SourceConnection::getSourceId,
SourceConnection::getSourceDefinitionId,
sourceDefinitionId);
}

public StandardDestinationDefinition getStandardDestinationDefinition(final UUID destinationDefinitionId)
throws JsonValidationException, IOException, ConfigNotFoundException {
return persistence.getConfig(ConfigSchema.STANDARD_DESTINATION_DEFINITION, destinationDefinitionId.toString(),
Expand Down Expand Up @@ -194,6 +206,44 @@ public void deleteStandardDestinationDefinition(final UUID destDefId) throws IOE
}
}

public void deleteDestinationDefinitionAndAssociations(final UUID destinationDefinitionId)
throws JsonValidationException, ConfigNotFoundException, IOException {
deleteConnectorDefinitionAndAssociations(
ConfigSchema.STANDARD_DESTINATION_DEFINITION,
ConfigSchema.DESTINATION_CONNECTION,
DestinationConnection.class,
DestinationConnection::getDestinationId,
DestinationConnection::getDestinationDefinitionId,
destinationDefinitionId);
}

private <T> void deleteConnectorDefinitionAndAssociations(
final ConfigSchema definitionType,
final ConfigSchema connectorType,
final Class<T> connectorClass,
final Function<T, UUID> connectorIdGetter,
final Function<T, UUID> connectorDefinitionIdGetter,
final UUID definitionId)
throws JsonValidationException, IOException, ConfigNotFoundException {
final Set<T> connectors = persistence.listConfigs(connectorType, connectorClass)
.stream()
.filter(connector -> connectorDefinitionIdGetter.apply(connector).equals(definitionId))
.collect(Collectors.toSet());
for (final T connector : connectors) {
final Set<StandardSync> syncs = persistence.listConfigs(ConfigSchema.STANDARD_SYNC, StandardSync.class)
.stream()
.filter(sync -> sync.getSourceId().equals(connectorIdGetter.apply(connector))
|| sync.getDestinationId().equals(connectorIdGetter.apply(connector)))
.collect(Collectors.toSet());

for (final StandardSync sync : syncs) {
persistence.deleteConfig(ConfigSchema.STANDARD_SYNC, sync.getConnectionId().toString());
}
persistence.deleteConfig(connectorType, connectorIdGetter.apply(connector).toString());
}
persistence.deleteConfig(definitionType, definitionId.toString());
}

public SourceConnection getSourceConnection(final UUID sourceId) throws JsonValidationException, ConfigNotFoundException, IOException {
return persistence.getConfig(ConfigSchema.SOURCE_CONNECTION, sourceId.toString(), SourceConnection.class);
}
Expand Down Expand Up @@ -229,7 +279,6 @@ public void writeSourceConnection(final SourceConnection source, final Connector
}

/**
*
* @param workspaceId workspace id for the config
* @param fullConfig full config
* @param spec connector specification
Expand All @@ -240,7 +289,6 @@ public JsonNode statefulSplitSecrets(final UUID workspaceId, final JsonNode full
}

/**
*
* @param workspaceId workspace id for the config
* @param oldConfig old full config
* @param fullConfig new full config
Expand Down Expand Up @@ -279,7 +327,6 @@ public JsonNode statefulUpdateSecrets(final UUID workspaceId,
}

/**
*
* @param fullConfig full config
* @param spec connector specification
* @return partial config
Expand Down Expand Up @@ -469,7 +516,8 @@ public void updateConnectionState(final UUID connectionId, final State state) th
public static Map<AirbyteConfig, Stream<?>> deserialize(final Map<String, Stream<JsonNode>> configurations) {
final Map<AirbyteConfig, Stream<?>> deserialized = new LinkedHashMap<AirbyteConfig, Stream<?>>();
for (final String configSchemaName : configurations.keySet()) {
deserialized.put(ConfigSchema.valueOf(configSchemaName),
deserialized.put(
ConfigSchema.valueOf(configSchemaName),
configurations.get(configSchemaName).map(jsonNode -> Jsons.object(jsonNode, ConfigSchema.valueOf(configSchemaName).getClassName())));
}
return deserialized;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ public void loadData(final ConfigPersistence seedConfigPersistence) throws IOExc
});
}

public Set<String> getInUseConnectorDockerImageNames() throws IOException {
return database.transaction(this::getConnectorRepositoriesInUse);
}

public ValidatingConfigPersistence withValidation() {
return new ValidatingConfigPersistence(this);
}
Expand Down Expand Up @@ -131,7 +135,8 @@ public <T> List<ConfigWithMetadata<T>> listConfigsWithMetadata(final AirbyteConf
.orderBy(AIRBYTE_CONFIGS.CONFIG_TYPE, AIRBYTE_CONFIGS.CONFIG_ID)
.fetch());
return results.stream()
.map(record -> new ConfigWithMetadata<>(record.get(AIRBYTE_CONFIGS.CONFIG_ID),
.map(record -> new ConfigWithMetadata<>(
record.get(AIRBYTE_CONFIGS.CONFIG_ID),
record.get(AIRBYTE_CONFIGS.CONFIG_TYPE),
record.get(AIRBYTE_CONFIGS.CREATED_AT).toInstant(),
record.get(AIRBYTE_CONFIGS.UPDATED_AT).toInstant(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public <T> List<T> listConfigs(final AirbyteConfig configType, final Class<T> cl
}

@Override
public <T> List<ConfigWithMetadata<T>> listConfigsWithMetadata(AirbyteConfig configType, Class<T> clazz)
public <T> List<ConfigWithMetadata<T>> listConfigsWithMetadata(final AirbyteConfig configType, final Class<T> clazz)
throws JsonValidationException, IOException {
throw new UnsupportedOperationException("File Persistence doesn't support metadata");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public <T> List<T> listConfigs(final AirbyteConfig configType, final Class<T> cl
}

@Override
public <T> List<ConfigWithMetadata<T>> listConfigsWithMetadata(AirbyteConfig configType, Class<T> clazz)
public <T> List<ConfigWithMetadata<T>> listConfigsWithMetadata(final AirbyteConfig configType, final Class<T> clazz)
throws JsonValidationException, IOException {
final List<ConfigWithMetadata<T>> configs = decoratedPersistence.listConfigsWithMetadata(configType, clazz);
for (final ConfigWithMetadata<T> config : configs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,27 @@

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import io.airbyte.commons.json.Jsons;
import io.airbyte.config.ConfigSchema;
import io.airbyte.config.DestinationConnection;
import io.airbyte.config.SourceConnection;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSyncState;
import io.airbyte.config.StandardWorkspace;
import io.airbyte.config.State;
import io.airbyte.config.persistence.split_secrets.MemorySecretPersistence;
import io.airbyte.config.persistence.split_secrets.NoOpSecretsHydrator;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import org.junit.jupiter.api.AfterEach;
Expand Down Expand Up @@ -97,4 +104,77 @@ void testUpdateConnectionState() throws Exception {
verify(configPersistence, times(1)).writeConfig(ConfigSchema.STANDARD_SYNC_STATE, connectionId.toString(), connectionState2);
}

@Test
void testDeleteSourceDefinitionAndAssociations() throws JsonValidationException, IOException, ConfigNotFoundException {
final StandardSourceDefinition sourceDefToDelete = new StandardSourceDefinition().withSourceDefinitionId(UUID.randomUUID());
final StandardSourceDefinition sourceDefToStay = new StandardSourceDefinition().withSourceDefinitionId(UUID.randomUUID());

final SourceConnection sourceConnectionToDelete = new SourceConnection().withSourceId(UUID.randomUUID())
.withSourceDefinitionId(sourceDefToDelete.getSourceDefinitionId());
final SourceConnection sourceConnectionToStay = new SourceConnection().withSourceId(UUID.randomUUID())
.withSourceDefinitionId(sourceDefToStay.getSourceDefinitionId());
when(configPersistence.listConfigs(ConfigSchema.SOURCE_CONNECTION, SourceConnection.class)).thenReturn(List.of(
sourceConnectionToDelete,
sourceConnectionToStay));
when(configPersistence.listConfigs(ConfigSchema.DESTINATION_CONNECTION, DestinationConnection.class)).thenReturn(List.of());

final StandardSync syncToDelete = new StandardSync().withConnectionId(UUID.randomUUID()).withSourceId(sourceConnectionToDelete.getSourceId())
.withDestinationId(UUID.randomUUID());
final StandardSync syncToStay = new StandardSync().withConnectionId(UUID.randomUUID()).withSourceId(sourceConnectionToStay.getSourceId())
.withDestinationId(UUID.randomUUID());

when(configPersistence.listConfigs(ConfigSchema.STANDARD_SYNC, StandardSync.class)).thenReturn(List.of(syncToDelete, syncToStay));

configRepository.deleteSourceDefinitionAndAssociations(sourceDefToDelete.getSourceDefinitionId());

// verify that all records associated with sourceDefToDelete were deleted
verify(configPersistence, times(1)).deleteConfig(ConfigSchema.STANDARD_SYNC, syncToDelete.getConnectionId().toString());
verify(configPersistence, times(1)).deleteConfig(ConfigSchema.SOURCE_CONNECTION, sourceConnectionToDelete.getSourceId().toString());
verify(configPersistence, times(1)).deleteConfig(ConfigSchema.STANDARD_SOURCE_DEFINITION, sourceDefToDelete.getSourceDefinitionId().toString());

// verify that none of the records associated with sourceDefToStay were deleted
verify(configPersistence, never()).deleteConfig(ConfigSchema.STANDARD_SYNC, syncToStay.getConnectionId().toString());
verify(configPersistence, never()).deleteConfig(ConfigSchema.SOURCE_CONNECTION, sourceConnectionToStay.getSourceId().toString());
verify(configPersistence, never()).deleteConfig(ConfigSchema.STANDARD_SOURCE_DEFINITION, sourceDefToStay.getSourceDefinitionId().toString());
}

@Test
void testDeleteDestinationDefinitionAndAssociations() throws JsonValidationException, IOException, ConfigNotFoundException {
final StandardDestinationDefinition destDefToDelete = new StandardDestinationDefinition().withDestinationDefinitionId(UUID.randomUUID());
final StandardDestinationDefinition destDefToStay = new StandardDestinationDefinition().withDestinationDefinitionId(UUID.randomUUID());

final DestinationConnection destConnectionToDelete = new DestinationConnection().withDestinationId(UUID.randomUUID())
.withDestinationDefinitionId(destDefToDelete.getDestinationDefinitionId());
final DestinationConnection destConnectionToStay = new DestinationConnection().withDestinationId(UUID.randomUUID())
.withDestinationDefinitionId(destDefToStay.getDestinationDefinitionId());
when(configPersistence.listConfigs(ConfigSchema.DESTINATION_CONNECTION, DestinationConnection.class)).thenReturn(List.of(
destConnectionToDelete,
destConnectionToStay));
when(configPersistence.listConfigs(ConfigSchema.SOURCE_CONNECTION, SourceConnection.class)).thenReturn(List.of());

final StandardSync syncToDelete = new StandardSync().withConnectionId(UUID.randomUUID())
.withDestinationId(destConnectionToDelete.getDestinationId())
.withSourceId(UUID.randomUUID());
final StandardSync syncToStay = new StandardSync().withConnectionId(UUID.randomUUID()).withDestinationId(destConnectionToStay.getDestinationId())
.withSourceId(UUID.randomUUID());

when(configPersistence.listConfigs(ConfigSchema.STANDARD_SYNC, StandardSync.class)).thenReturn(List.of(syncToDelete, syncToStay));

configRepository.deleteDestinationDefinitionAndAssociations(destDefToDelete.getDestinationDefinitionId());

// verify that all records associated with destDefToDelete were deleted
verify(configPersistence, times(1)).deleteConfig(ConfigSchema.STANDARD_SYNC, syncToDelete.getConnectionId().toString());
verify(configPersistence, times(1)).deleteConfig(ConfigSchema.DESTINATION_CONNECTION, destConnectionToDelete.getDestinationId().toString());
verify(configPersistence, times(1)).deleteConfig(
ConfigSchema.STANDARD_DESTINATION_DEFINITION,
destDefToDelete.getDestinationDefinitionId().toString());

// verify that none of the records associated with destDefToStay were deleted
verify(configPersistence, never()).deleteConfig(ConfigSchema.STANDARD_SYNC, syncToStay.getConnectionId().toString());
verify(configPersistence, never()).deleteConfig(ConfigSchema.DESTINATION_CONNECTION, destConnectionToStay.getDestinationId().toString());
verify(configPersistence, never()).deleteConfig(
ConfigSchema.STANDARD_DESTINATION_DEFINITION,
destDefToStay.getDestinationDefinitionId().toString());
}

}
Loading

0 comments on commit 9d05b1c

Please sign in to comment.