Skip to content

Commit

Permalink
fix: fix connector rollout percentage calculation and persistence (#1…
Browse files Browse the repository at this point in the history
…4471)
  • Loading branch information
clnoll committed Oct 28, 2024
1 parent 09ed30a commit f9ef998
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,7 @@ open class ConnectorRolloutHandler
connectorRolloutUpdate.dockerImageTag,
connectorRolloutUpdate.actorDefinitionId,
connectorRolloutUpdate.id,
connectorRolloutUpdate.updatedBy,
),
)
} catch (e: WorkflowUpdateException) {
Expand All @@ -481,6 +482,7 @@ open class ConnectorRolloutHandler
connectorRolloutUpdate.id,
connectorRolloutUpdate.actorIds,
connectorRolloutUpdate.targetPercentage,
connectorRolloutUpdate.updatedBy,
),
)
} catch (e: WorkflowUpdateException) {
Expand All @@ -502,6 +504,7 @@ open class ConnectorRolloutHandler
connectorRolloutFinalize.dockerImageTag,
connectorRolloutFinalize.actorDefinitionId,
connectorRolloutFinalize.id,
connectorRolloutFinalize.updatedBy,
),
)
} catch (e: WorkflowUpdateException) {
Expand All @@ -522,6 +525,9 @@ open class ConnectorRolloutHandler
connectorRolloutFinalize.actorDefinitionId,
connectorRolloutFinalize.id,
ConnectorRolloutFinalState.fromValue(connectorRolloutFinalize.state.toString()),
connectorRolloutFinalize.errorMsg,
connectorRolloutFinalize.failedReason,
connectorRolloutFinalize.updatedBy,
),
)
} catch (e: WorkflowUpdateException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,13 @@ class RolloutActorFinder(
}

val nPreviouslyPinned = getNPinnedToReleaseCandidate(connectorRollout)
val nEligibleOrAlreadyPinned = candidates.size + nPreviouslyPinned

logger.info { "Rollout ${connectorRollout.id}: $nEligibleOrAlreadyPinned including eligible & already pinned to the release candidate" }
logger.info { "Rollout ${connectorRollout.id}: $nPreviouslyPinned already pinned to the release candidate" }

candidates = filterByAlreadyPinned(connectorRollout.actorDefinitionId, candidates)
val nEligibleOrAlreadyPinned = candidates.size + nPreviouslyPinned

logger.info { "Rollout ${connectorRollout.id}: ${candidates.size - nPreviouslyPinned} pinned to a non-RC" }
logger.info { "Rollout ${connectorRollout.id}: $nEligibleOrAlreadyPinned including eligible & already pinned to the release candidate" }
logger.info { "Rollout ${connectorRollout.id}: ${nEligibleOrAlreadyPinned - candidates.size - nPreviouslyPinned} pinned to a non-RC" }

// Calculate the number to pin based on the input percentage
val targetTotalToPin = ceil(nEligibleOrAlreadyPinned * targetPercent / 100.0).toInt()
Expand All @@ -95,12 +94,17 @@ class RolloutActorFinder(
return ActorSelectionInfo(
actorIdsToPin = actorIdsToPin,
nActors = initialNCandidates,
nActorsEligibleOrAlreadyPinned = candidates.size,
nActorsEligibleOrAlreadyPinned = nEligibleOrAlreadyPinned,
nNewPinned = actorIdsToPin.size,
nPreviouslyPinned = nPreviouslyPinned,
// Total percentage pinned out of all eligible, including new and previously pinned
// This could end up being >100% if the number of eligible actors changes between rollout increments.
percentagePinned = if (actorIdsToPin.isEmpty()) 0 else ((nPreviouslyPinned + actorIdsToPin.size) / nEligibleOrAlreadyPinned) * 100,
percentagePinned =
if (nEligibleOrAlreadyPinned == 0) {
0
} else {
ceil((nPreviouslyPinned + actorIdsToPin.size) * 100.0 / nEligibleOrAlreadyPinned).toInt()
},
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,11 @@ class RolloutActorFinderTest {
} else {
assertEquals(DESTINATION_ACTOR_IDS.toSet().size * TARGET_PERCENTAGE / 100, actorSelectionInfo.actorIdsToPin.size)
}
assertEquals(8, actorSelectionInfo.nActors)
assertEquals(4, actorSelectionInfo.nActorsEligibleOrAlreadyPinned)
assertEquals(2, actorSelectionInfo.nNewPinned)
assertEquals(0, actorSelectionInfo.nPreviouslyPinned)
assertEquals(50, actorSelectionInfo.percentagePinned)
}

@ParameterizedTest
Expand Down Expand Up @@ -297,11 +302,83 @@ class RolloutActorFinderTest {
jobPersistence.getLastSyncJobForConnections(any())
}

assertEquals(1, actorSelectionInfo.actorIdsToPin.size)
assertEquals(8, actorSelectionInfo.nActors)
assertEquals(4, actorSelectionInfo.nActorsEligibleOrAlreadyPinned)
assertEquals(1, actorSelectionInfo.nNewPinned)
assertEquals(0, actorSelectionInfo.nPreviouslyPinned)
assertEquals(25, actorSelectionInfo.percentagePinned)
}

@ParameterizedTest
@MethodSource("actorDefinitionIds")
fun `test getActorIdsToPin with previously pinned`(actorDefinitionId: UUID) {
if (actorDefinitionId == SOURCE_ACTOR_DEFINITION_ID) {
assertEquals(1, actorSelectionInfo.actorIdsToPin.size)
every { sourceService.getStandardSourceDefinition(any()) } returns StandardSourceDefinition()
every { scopedConfigurationService.listScopedConfigurationsWithValues(any(), any(), any(), any(), any(), any()) } returns
listOf(
ScopedConfiguration().apply {
id = UUID.randomUUID()
key = "key1"
value = RELEASE_CANDIDATE_VERSION_ID.toString()
resourceId = ORGANIZATION_1_WORKSPACE_1_ACTOR_ID_SOURCE
resourceType = ConfigResourceType.SOURCE
scopeId = UUID.randomUUID()
scopeType = ConfigScopeType.ACTOR
originType = ConfigOriginType.RELEASE_CANDIDATE
},
)
} else {
assertEquals(1, actorSelectionInfo.actorIdsToPin.size)
every { sourceService.getStandardSourceDefinition(any()) } throws ConfigNotFoundException("", "Not found")
every { destinationService.getStandardDestinationDefinition(any()) } returns StandardDestinationDefinition()
every { scopedConfigurationService.listScopedConfigurationsWithValues(any(), any(), any(), any(), any(), any()) } returns
listOf(
ScopedConfiguration().apply {
id = UUID.randomUUID()
key = "key1"
value = RELEASE_CANDIDATE_VERSION_ID.toString()
resourceId = ORGANIZATION_1_WORKSPACE_1_ACTOR_ID_DESTINATION
resourceType = ConfigResourceType.DESTINATION
scopeId = UUID.randomUUID()
scopeType = ConfigScopeType.ACTOR
originType = ConfigOriginType.RELEASE_CANDIDATE
},
)
}
every { actorDefinitionVersionUpdater.getConfigScopeMaps(any()) } returns CONFIG_SCOPE_MAP.values
every {
actorDefinitionVersionUpdater.getUpgradeCandidates(any(), any())
} returns CONFIG_SCOPE_MAP.map { it.key }.toSet() -
setOf(
ORGANIZATION_1_WORKSPACE_1_ACTOR_ID_SOURCE,
ORGANIZATION_1_WORKSPACE_1_ACTOR_ID_DESTINATION,
)
every { scopedConfigurationService.getScopedConfigurations(any(), any(), any(), any()) } returns mapOf()
every { connectionService.listConnectionsByActorDefinitionIdAndType(any(), any(), any()) } returns MOCK_CONNECTION_SYNCS
every { jobPersistence.getLastSyncJobForConnections(any()) } returns JOB_STATUS_SUMMARIES

val actorSelectionInfo = rolloutActorFinder.getActorSelectionInfo(createMockConnectorRollout(actorDefinitionId), 1)

verify {
if (actorDefinitionId == SOURCE_ACTOR_DEFINITION_ID) {
sourceService.getStandardSourceDefinition(any())
} else {
destinationService.getStandardDestinationDefinition(any())
}
actorDefinitionVersionUpdater.getConfigScopeMaps(any())
actorDefinitionVersionUpdater.getUpgradeCandidates(any(), any())
scopedConfigurationService.listScopedConfigurationsWithValues(any(), any(), any(), any(), any(), any())
connectionService.listConnectionsByActorDefinitionIdAndType(any(), any(), any())
jobPersistence.getLastSyncJobForConnections(any())
}

// We already exceed the target percentage so shouldn't pin something new
assertEquals(0, actorSelectionInfo.actorIdsToPin.size)
assertEquals(8, actorSelectionInfo.nActors)
assertEquals(4, actorSelectionInfo.nActorsEligibleOrAlreadyPinned)
assertEquals(0, actorSelectionInfo.nNewPinned)
assertEquals(1, actorSelectionInfo.nPreviouslyPinned)
assertEquals(25, actorSelectionInfo.percentagePinned)
}

@ParameterizedTest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ class FinalizeRolloutActivityImpl(private val airbyteApiClient: AirbyteApiClient
val (state, errorMsg, failureReason) =
when (input.result) {
ConnectorRolloutFinalState.SUCCEEDED -> Triple(ConnectorRolloutStateTerminal.SUCCEEDED, null, null)
ConnectorRolloutFinalState.FAILED_ROLLED_BACK -> Triple(ConnectorRolloutStateTerminal.FAILED_ROLLED_BACK, null, null)
ConnectorRolloutFinalState.CANCELED_ROLLED_BACK -> Triple(ConnectorRolloutStateTerminal.CANCELED_ROLLED_BACK, null, null)
ConnectorRolloutFinalState.FAILED_ROLLED_BACK -> Triple(ConnectorRolloutStateTerminal.FAILED_ROLLED_BACK, null, input.failedReason)
ConnectorRolloutFinalState.CANCELED_ROLLED_BACK -> Triple(ConnectorRolloutStateTerminal.CANCELED_ROLLED_BACK, input.errorMsg, null)
else -> throw RuntimeException("Unexpected termination state: ${input.result}")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@ import io.airbyte.data.services.ConnectorRolloutService
import io.airbyte.data.services.impls.data.mappers.toConfigModel
import io.airbyte.data.services.impls.data.mappers.toEntity
import io.airbyte.db.instance.configs.jooq.generated.enums.ConnectorRolloutStateType
import io.github.oshai.kotlinlogging.KotlinLogging
import jakarta.inject.Singleton
import java.util.UUID

private val logger = KotlinLogging.logger {}

@Singleton
open class ConnectorRolloutServiceDataImpl(private val repository: ConnectorRolloutRepository) : ConnectorRolloutService {
override fun getConnectorRollout(id: UUID): ConnectorRollout {
Expand Down Expand Up @@ -55,10 +58,14 @@ open class ConnectorRolloutServiceDataImpl(private val repository: ConnectorRoll
}

override fun writeConnectorRollout(connectorRollout: ConnectorRollout): ConnectorRollout {
val entity = connectorRollout.toEntity()

if (repository.existsById(connectorRollout.id)) {
return repository.update(connectorRollout.toEntity()).toConfigModel()
logger.info { "Updating existing connector rollout: connectorRollout=$connectorRollout entity=$entity" }
return repository.update(entity).toConfigModel()
}
return repository.save(connectorRollout.toEntity()).toConfigModel()
logger.info { "Creating new connector rollout: connectorRollout=$connectorRollout entity=$entity" }
return repository.save(entity).toConfigModel()
}

override fun listConnectorRollouts(): List<ConnectorRollout> {
Expand Down

0 comments on commit f9ef998

Please sign in to comment.