Skip to content

Commit

Permalink
fix: avoid downgrading manually upgraded connectors on OSS when resta…
Browse files Browse the repository at this point in the history
…rting Airbyte (#14567)
  • Loading branch information
pedroslopez committed Nov 15, 2024
1 parent 8fe2f15 commit 2b7326f
Show file tree
Hide file tree
Showing 5 changed files with 152 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.airbyte.commons.version.AirbyteVersion;
import io.airbyte.commons.version.Version;
import io.airbyte.config.Configs.DeploymentMode;
import io.airbyte.config.Configs.SeedDefinitionsProviderType;
import io.airbyte.config.init.AirbyteCompatibleConnectorsValidator;
import io.airbyte.config.init.ApplyDefinitionsHelper;
import io.airbyte.config.init.BreakingChangeNotificationHelper;
Expand Down Expand Up @@ -85,6 +86,7 @@ class BootloaderTest {
private FeatureFlagClient featureFlagClient;
private static final String DEFAULT_REALM = "airbyte";
private static final String DOCKER = "docker";
private static final SeedDefinitionsProviderType SEED_PROVIDER_TYPE = SeedDefinitionsProviderType.LOCAL;
private static final String PROTOCOL_VERSION_001 = "0.0.1";
private static final String PROTOCOL_VERSION_124 = "1.2.4";
private static final String VERSION_0330_ALPHA = "0.33.0-alpha";
Expand Down Expand Up @@ -200,7 +202,8 @@ void testBootloaderAppBlankDb() throws Exception {
when(airbyteCompatibleConnectorsValidator.validateDeclarativeManifest(anyString()))
.thenReturn(new ConnectorPlatformCompatibilityValidationResult(true, ""));
val applyDefinitionsHelper =
new ApplyDefinitionsHelper(definitionsProvider, jobsPersistence, actorDefinitionService, sourceService, destinationService,
new ApplyDefinitionsHelper(definitionsProvider, SEED_PROVIDER_TYPE, jobsPersistence, actorDefinitionService, sourceService,
destinationService,
metricClient, supportStateUpdater, actorDefinitionVersionResolver, airbyteCompatibleConnectorsValidator, connectorRolloutService);
final DeclarativeManifestImageVersionsProvider declarativeManifestImageVersionsProvider = new LocalDeclarativeManifestImageVersionsProvider();
val declarativeSourceUpdater =
Expand Down Expand Up @@ -298,7 +301,8 @@ void testRequiredVersionUpgradePredicate() throws Exception {
val airbyteCompatibleConnectorsValidator = mock(AirbyteCompatibleConnectorsValidator.class);
val connectorRolloutService = mock(ConnectorRolloutService.class);
val applyDefinitionsHelper =
new ApplyDefinitionsHelper(definitionsProvider, jobsPersistence, actorDefinitionService, sourceService, destinationService,
new ApplyDefinitionsHelper(definitionsProvider, SEED_PROVIDER_TYPE, jobsPersistence, actorDefinitionService, sourceService,
destinationService,
metricClient, supportStateUpdater, actorDefinitionVersionResolver, airbyteCompatibleConnectorsValidator, connectorRolloutService);
final DeclarativeManifestImageVersionsProvider declarativeManifestImageVersionsProvider = new LocalDeclarativeManifestImageVersionsProvider();
val declarativeSourceUpdater =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,14 @@ enum SecretPersistenceType {
AWS_SECRET_MANAGER
}

/**
* Seed definitions provider type.
*/
enum SeedDefinitionsProviderType {
LOCAL,
REMOTE
}

/**
* The configured Airbyte edition for the instance. By default, an Airbyte instance is configured as
* Community edition. If configured as Pro edition, the instance will perform a license check and
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import io.airbyte.commons.version.AirbyteProtocolVersionRange
import io.airbyte.config.ActorDefinitionBreakingChange
import io.airbyte.config.ActorDefinitionVersion
import io.airbyte.config.ActorType
import io.airbyte.config.Configs.SeedDefinitionsProviderType
import io.airbyte.config.ConnectorEnumRolloutState
import io.airbyte.config.ConnectorRegistryDestinationDefinition
import io.airbyte.config.ConnectorRegistrySourceDefinition
Expand Down Expand Up @@ -49,6 +50,7 @@ import kotlin.jvm.optionals.getOrNull
@Requires(bean = MetricClient::class)
class ApplyDefinitionsHelper(
@param:Named("seedDefinitionsProvider") private val definitionsProvider: DefinitionsProvider,
private val seedProviderType: SeedDefinitionsProviderType,
private val jobPersistence: JobPersistence,
private val actorDefinitionService: ActorDefinitionService,
private val sourceService: SourceService,
Expand Down Expand Up @@ -201,7 +203,7 @@ class ApplyDefinitionsHelper(
}

@VisibleForTesting
fun <T> applyReleaseCandidates(rcDefinitions: List<T>) {
internal fun <T> applyReleaseCandidates(rcDefinitions: List<T>) {
for (rcDef in rcDefinitions) {
val rcAdv =
when (rcDef) {
Expand Down Expand Up @@ -349,13 +351,24 @@ class ApplyDefinitionsHelper(
return reImportVersionInUse && definitionIsInUse
}

private fun getShouldUpdateActorDefinitionDefaultVersion(
@VisibleForTesting
internal fun getShouldUpdateActorDefinitionDefaultVersion(
currentDefaultADV: ActorDefinitionVersion,
newADV: ActorDefinitionVersion,
actorDefinitionIdsInUse: Set<UUID>,
updateAll: Boolean,
): Boolean {
val newVersionIsAvailable = newADV.dockerImageTag != currentDefaultADV.dockerImageTag
val newVersionIsAvailable =
when (seedProviderType) {
SeedDefinitionsProviderType.REMOTE -> newADV.dockerImageTag != currentDefaultADV.dockerImageTag
SeedDefinitionsProviderType.LOCAL -> {
// (oss) if we're using the registry shipped with the platform, connector versions may be stale.
// We should only update if the new version is greater than the current version, in case the user has manually
// upgraded the connector via the UI. See https://github.com/airbytehq/airbyte-internal-issues/issues/8691.
newADV.dockerImageTag > currentDefaultADV.dockerImageTag
}
}

val definitionIsInUse = actorDefinitionIdsInUse.contains(currentDefaultADV.actorDefinitionId)
val shouldApplyNewVersion = updateAll || !definitionIsInUse

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
*/
package io.airbyte.config.init.config

import io.airbyte.config.Configs.SeedDefinitionsProviderType
import io.airbyte.config.init.AirbyteCompatibleConnectorsValidator
import io.airbyte.config.init.DeclarativeManifestImageVersionsProvider
import io.airbyte.config.init.DeclarativeSourceUpdater
Expand Down Expand Up @@ -32,15 +33,29 @@ class SeedBeanFactory {
@Singleton
@Named("seedDefinitionsProvider")
fun seedDefinitionsProvider(
@Value("\${airbyte.connector-registry.seed-provider}") seedProvider: String,
seedProvider: SeedDefinitionsProviderType,
remoteDefinitionsProvider: RemoteDefinitionsProvider,
): DefinitionsProvider {
return when (seedProvider) {
SeedDefinitionsProviderType.LOCAL -> {
LOGGER.info("Using local definitions provider for seeding")
LocalDefinitionsProvider()
}
SeedDefinitionsProviderType.REMOTE -> {
LOGGER.info("Using remote definitions provider for seeding")
remoteDefinitionsProvider
}
}
}

@Singleton
fun seedDefinitionsProviderType(
@Value("\${airbyte.connector-registry.seed-provider}") seedProvider: String,
): SeedDefinitionsProviderType {
if (StringUtils.isEmpty(seedProvider) || LOCAL_SEED_PROVIDER.equals(seedProvider, ignoreCase = true)) {
LOGGER.info("Using local definitions provider for seeding")
return LocalDefinitionsProvider()
return SeedDefinitionsProviderType.LOCAL
} else if (REMOTE_SEED_PROVIDER.equals(seedProvider, ignoreCase = true)) {
LOGGER.info("Using remote definitions provider for seeding")
return remoteDefinitionsProvider
return SeedDefinitionsProviderType.REMOTE
}

throw IllegalArgumentException("Invalid seed provider: $seedProvider")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import io.airbyte.commons.version.AirbyteProtocolVersionRange
import io.airbyte.commons.version.Version
import io.airbyte.config.ActorDefinitionVersion
import io.airbyte.config.BreakingChanges
import io.airbyte.config.Configs.SeedDefinitionsProviderType
import io.airbyte.config.ConnectorEnumRolloutState
import io.airbyte.config.ConnectorRegistryDestinationDefinition
import io.airbyte.config.ConnectorRegistrySourceDefinition
Expand Down Expand Up @@ -40,6 +41,8 @@ import io.mockk.justRun
import io.mockk.mockk
import io.mockk.verify
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertFalse
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
Expand All @@ -65,13 +68,15 @@ internal class ApplyDefinitionsHelperTest {
private val actorDefinitionVersionResolver: ActorDefinitionVersionResolver = mockk()
private val airbyteCompatibleConnectorsValidator: AirbyteCompatibleConnectorsValidator = mockk()
private val connectorRolloutService: ConnectorRolloutService = mockk()
private val seedDefinitionsProviderType: SeedDefinitionsProviderType = mockk()
private lateinit var applyDefinitionsHelper: ApplyDefinitionsHelper

@BeforeEach
fun setup() {
applyDefinitionsHelper =
ApplyDefinitionsHelper(
definitionsProvider,
seedDefinitionsProviderType,
jobPersistence,
actorDefinitionService,
sourceService,
Expand All @@ -90,6 +95,7 @@ internal class ApplyDefinitionsHelperTest {
every { airbyteCompatibleConnectorsValidator.validate(any(), any()) } returns ConnectorPlatformCompatibilityValidationResult(true, null)
every { jobPersistence.currentProtocolVersionRange } returns Optional.of(AirbyteProtocolVersionRange(Version("2.0.0"), Version("3.0.0")))
every { actorDefinitionVersionResolver.fetchRemoteActorDefinitionVersion(any(), any(), any()) } returns Optional.empty()
every { seedDefinitionsProviderType.ordinal } returns SeedDefinitionsProviderType.REMOTE.ordinal
mockVoidReturningFunctions()
}

Expand Down Expand Up @@ -856,6 +862,89 @@ internal class ApplyDefinitionsHelperTest {
confirmVerified(actorDefinitionService, sourceService, destinationService, supportStateUpdater, metricClient)
}

@ParameterizedTest
@MethodSource("updateScenarioWithSeedType")
fun `should only perform version rollbacks when using remote definitions provider`(
updateAll: Boolean,
isInUse: Boolean,
seedType: SeedDefinitionsProviderType,
) {
every { seedDefinitionsProviderType.ordinal } returns seedType.ordinal

val currentVersion = ConnectorRegistryConverters.toActorDefinitionVersion(SOURCE_POSTGRES_2)
val newVersion = ConnectorRegistryConverters.toActorDefinitionVersion(SOURCE_POSTGRES)

val definitionsInUse = if (isInUse) setOf(currentVersion.actorDefinitionId) else setOf()

val shouldUpdateVersion =
applyDefinitionsHelper.getShouldUpdateActorDefinitionDefaultVersion(
currentVersion,
newVersion,
definitionsInUse,
updateAll,
)

if (seedType == SeedDefinitionsProviderType.REMOTE && (!isInUse || updateAll)) {
assertTrue(shouldUpdateVersion)
} else {
assertFalse(shouldUpdateVersion)
}
}

@ParameterizedTest
@MethodSource("updateScenarioWithSeedType")
fun `should perform version upgrades regardless of definitions provider`(
updateAll: Boolean,
isInUse: Boolean,
seedType: SeedDefinitionsProviderType,
) {
every { seedDefinitionsProviderType.ordinal } returns seedType.ordinal

val currentVersion = ConnectorRegistryConverters.toActorDefinitionVersion(SOURCE_POSTGRES)
val newVersion = ConnectorRegistryConverters.toActorDefinitionVersion(SOURCE_POSTGRES_2)

val definitionsInUse = if (isInUse) setOf(currentVersion.actorDefinitionId) else setOf()

val shouldUpdateVersion =
applyDefinitionsHelper.getShouldUpdateActorDefinitionDefaultVersion(
currentVersion,
newVersion,
definitionsInUse,
updateAll,
)

if (!isInUse || updateAll) {
assertTrue(shouldUpdateVersion)
} else {
assertFalse(shouldUpdateVersion)
}
}

@ParameterizedTest
@MethodSource("updateScenarioWithSeedType")
fun `should not try to update the connector version if it is already matching`(
updateAll: Boolean,
isInUse: Boolean,
seedType: SeedDefinitionsProviderType,
) {
every { seedDefinitionsProviderType.ordinal } returns seedType.ordinal

val currentVersion = ConnectorRegistryConverters.toActorDefinitionVersion(SOURCE_POSTGRES)
val newVersion = ConnectorRegistryConverters.toActorDefinitionVersion(SOURCE_POSTGRES)

val definitionsInUse = if (isInUse) setOf(currentVersion.actorDefinitionId) else setOf()

val shouldUpdateVersion =
applyDefinitionsHelper.getShouldUpdateActorDefinitionDefaultVersion(
currentVersion,
newVersion,
definitionsInUse,
updateAll,
)

assertFalse(shouldUpdateVersion)
}

companion object {
private const val INITIAL_CONNECTOR_VERSION = "0.1.0"
private const val UPDATED_CONNECTOR_VERSION = "0.2.0"
Expand Down Expand Up @@ -998,6 +1087,19 @@ internal class ApplyDefinitionsHelperTest {
Arguments.of(false, true),
)

@JvmStatic
fun updateScenarioWithSeedType(): Stream<Arguments> =
Stream.of(
Arguments.of(true, true, SeedDefinitionsProviderType.REMOTE),
Arguments.of(true, false, SeedDefinitionsProviderType.REMOTE),
Arguments.of(false, false, SeedDefinitionsProviderType.REMOTE),
Arguments.of(false, true, SeedDefinitionsProviderType.REMOTE),
Arguments.of(true, true, SeedDefinitionsProviderType.LOCAL),
Arguments.of(true, false, SeedDefinitionsProviderType.LOCAL),
Arguments.of(false, false, SeedDefinitionsProviderType.LOCAL),
Arguments.of(false, true, SeedDefinitionsProviderType.LOCAL),
)

@JvmStatic
fun validInsertStates() = listOf(ConnectorEnumRolloutState.CANCELED)

Expand Down

0 comments on commit 2b7326f

Please sign in to comment.