From 6db66c05d4608bbd0f7604d0e2058e6a00a1c7f3 Mon Sep 17 00:00:00 2001 From: Catherine Noll Date: Thu, 5 Sep 2024 14:20:36 -0400 Subject: [PATCH] feat: progressive rollouts table migration (#13819) --- .../io/airbyte/bootloader/BootloaderTest.java | 2 +- ...V0_57_5_001__AddConnectorRolloutTable.java | 241 ++++++++++++++++++ .../configs_database/schema_dump.txt | 27 ++ .../test/utils/BaseConfigDatabaseTest.java | 1 + 4 files changed, 270 insertions(+), 1 deletion(-) create mode 100644 airbyte-db/db-lib/src/main/java/io/airbyte/db/instance/configs/migrations/V0_57_5_001__AddConnectorRolloutTable.java diff --git a/airbyte-bootloader/src/test-integration/java/io/airbyte/bootloader/BootloaderTest.java b/airbyte-bootloader/src/test-integration/java/io/airbyte/bootloader/BootloaderTest.java index 8243ac5d857..7fc5bf8ba7a 100644 --- a/airbyte-bootloader/src/test-integration/java/io/airbyte/bootloader/BootloaderTest.java +++ b/airbyte-bootloader/src/test-integration/java/io/airbyte/bootloader/BootloaderTest.java @@ -101,7 +101,7 @@ class BootloaderTest { // ⚠️ This line should change with every new migration to show that you meant to make a new // migration to the prod database - private static final String CURRENT_CONFIGS_MIGRATION_VERSION = "0.57.4.017"; + private static final String CURRENT_CONFIGS_MIGRATION_VERSION = "0.57.5.001"; private static final String CURRENT_JOBS_MIGRATION_VERSION = "0.57.2.005"; @BeforeEach diff --git a/airbyte-db/db-lib/src/main/java/io/airbyte/db/instance/configs/migrations/V0_57_5_001__AddConnectorRolloutTable.java b/airbyte-db/db-lib/src/main/java/io/airbyte/db/instance/configs/migrations/V0_57_5_001__AddConnectorRolloutTable.java new file mode 100644 index 00000000000..4046f5a3d69 --- /dev/null +++ b/airbyte-db/db-lib/src/main/java/io/airbyte/db/instance/configs/migrations/V0_57_5_001__AddConnectorRolloutTable.java @@ -0,0 +1,241 @@ +/* + * Copyright (c) 2020-2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.db.instance.configs.migrations; + +import java.util.EnumSet; +import java.util.LinkedHashSet; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; +import org.flywaydb.core.api.migration.BaseJavaMigration; +import org.flywaydb.core.api.migration.Context; +import org.jetbrains.annotations.NotNull; +import org.jooq.Catalog; +import org.jooq.DSLContext; +import org.jooq.EnumType; +import org.jooq.Field; +import org.jooq.Record; +import org.jooq.Schema; +import org.jooq.Table; +import org.jooq.impl.DSL; +import org.jooq.impl.SQLDataType; +import org.jooq.impl.SchemaImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class V0_57_5_001__AddConnectorRolloutTable extends BaseJavaMigration { + + private static final Logger LOGGER = LoggerFactory.getLogger(V0_57_5_001__AddConnectorRolloutTable.class); + private static final Table CONNECTOR_ROLLOUT_TABLE = DSL.table("connector_rollout"); + private static final Field ACTOR_DEFINITION_ID = DSL.field("actor_definition_id", UUID.class); + private static final Field STATE = DSL.field("state", String.class); + private static final String CONNECTOR_ROLLOUT_STATE_CATEGORY = "connector_rollout_state_category"; + private static final String CONNECTOR_ROLLOUT_STATE_TYPE = "connector_rollout_state_type"; + private static final String CONNECTOR_ROLLOUT_STRATEGY_TYPE = "connector_rollout_strategy_type"; + + @Override + public void migrate(final Context context) throws Exception { + LOGGER.info("Running migration: {}", this.getClass().getSimpleName()); + + // Warning: please do not use any jOOQ generated code to write a migration. + // As database schema changes, the generated jOOQ code can be deprecated. So + // old migration may not compile if there is any generated code. + final DSLContext ctx = DSL.using(context.getConnection()); + createStateCategoryEnum(ctx); + createStateTypeEnum(ctx); + createStrategyTypeEnum(ctx); + createRolloutTable(ctx); + createPartialUniqueIndex(ctx); + LOGGER.info("connector_rollout table created"); + } + + static void createStateTypeEnum(final DSLContext ctx) { + ctx.createType(CONNECTOR_ROLLOUT_STATE_TYPE).asEnum( + ConnectorRolloutStateType.INITIALIZED.literal, + ConnectorRolloutStateType.IN_PROGRESS.literal, + ConnectorRolloutStateType.PAUSED.literal, + ConnectorRolloutStateType.FINALIZING.literal, + ConnectorRolloutStateType.SUCCEEDED.literal, + ConnectorRolloutStateType.ERRORED.literal, + ConnectorRolloutStateType.FAILED_ROLLED_BACK.literal, + ConnectorRolloutStateType.CANCELED_ROLLED_BACK.literal).execute(); + } + + static void createStateCategoryEnum(final DSLContext ctx) { + ctx.createType(CONNECTOR_ROLLOUT_STATE_CATEGORY).asEnum( + ConnectorRolloutStateCategory.ACTIVE.literal, + ConnectorRolloutStateCategory.TERMINAL.literal).execute(); + } + + static void createStrategyTypeEnum(final DSLContext ctx) { + ctx.createType(CONNECTOR_ROLLOUT_STRATEGY_TYPE) + .asEnum(ConnectorRolloutStrategyType.MANUAL.literal, ConnectorRolloutStrategyType.AUTOMATED.literal, + ConnectorRolloutStrategyType.OVERRIDDEN.literal) + .execute(); + } + + static void createRolloutTable(final DSLContext ctx) { + ctx.createTableIfNotExists(CONNECTOR_ROLLOUT_TABLE) + .column("id", SQLDataType.UUID.nullable(false)) + .column("actor_definition_id", SQLDataType.UUID.nullable(false)) + .column("release_candidate_version_id", SQLDataType.UUID.nullable(false)) + .column("initial_version_id", SQLDataType.UUID.nullable(true)) + .column("state", SQLDataType.VARCHAR(32).nullable(false)) + .column("initial_rollout_pct", SQLDataType.INTEGER.nullable(false)) + .column("current_target_rollout_pct", SQLDataType.INTEGER.nullable(true)) + .column("final_target_rollout_pct", SQLDataType.INTEGER.nullable(false)) + .column("has_breaking_changes", SQLDataType.BOOLEAN.nullable(false)) + .column("max_step_wait_time_mins", SQLDataType.INTEGER.nullable(false)) + .column("updated_by", SQLDataType.UUID.nullable(true)) + .column("created_at", SQLDataType.TIMESTAMP.nullable(false).defaultValue(DSL.currentTimestamp())) + .column("updated_at", SQLDataType.TIMESTAMP.nullable(false).defaultValue(DSL.currentTimestamp())) + .column("completed_at", SQLDataType.TIMESTAMP.nullable(true)) + .column("expires_at", SQLDataType.TIMESTAMP.nullable(false)) + .column("error_msg", SQLDataType.VARCHAR(1024).nullable(true)) + .column("failed_reason", SQLDataType.VARCHAR(1024).nullable(true)) + .column("rollout_strategy", SQLDataType.VARCHAR(256).nullable(false)) + .constraints( + DSL.constraint("pk_connector_rollout").primaryKey("id"), + DSL.constraint("fk_actor_definition_id").foreignKey("actor_definition_id").references("actor_definition", "id"), + DSL.constraint("fk_initial_version_id").foreignKey("initial_version_id").references("actor_definition_version", "id"), + DSL.constraint("fk_release_candidate_version_id").foreignKey("release_candidate_version_id").references("actor_definition_version", "id"), + DSL.constraint("fk_updated_by").foreignKey("updated_by").references("user", "id")) + .execute(); + } + + static void createPartialUniqueIndex(final DSLContext ctx) { + // Create a partial unique index to guarantee that only one active rollout exists for a given + // connector. + ctx.createUniqueIndex("actor_definition_id_state_unique_idx") + .on(CONNECTOR_ROLLOUT_TABLE, ACTOR_DEFINITION_ID) + .where(STATE.in(ConnectorRolloutStateType.getActiveStates())).execute(); + } + + public enum ConnectorRolloutStateCategory implements EnumType { + + ACTIVE("active"), + TERMINAL("terminal"); + + private final String literal; + + ConnectorRolloutStateCategory(final String literal) { + this.literal = literal; + } + + @Override + public @NotNull String getLiteral() { + return literal; + } + + @Override + public Catalog getCatalog() { + return getSchema() == null ? null : getSchema().getCatalog(); + } + + @Override + public Schema getSchema() { + return new SchemaImpl(DSL.name("public"), null); + } + + @Override + public String getName() { + return CONNECTOR_ROLLOUT_STATE_CATEGORY; + } + + } + + public enum ConnectorRolloutStateType implements EnumType { + + INITIALIZED("initialized", ConnectorRolloutStateCategory.ACTIVE), + IN_PROGRESS("in_progress", ConnectorRolloutStateCategory.ACTIVE), + PAUSED("paused", ConnectorRolloutStateCategory.ACTIVE), + FINALIZING("finalizing", ConnectorRolloutStateCategory.ACTIVE), + ERRORED("errored", ConnectorRolloutStateCategory.ACTIVE), + SUCCEEDED("succeeded", ConnectorRolloutStateCategory.TERMINAL), + FAILED_ROLLED_BACK("failed_rolled_back", ConnectorRolloutStateCategory.TERMINAL), + CANCELED_ROLLED_BACK("canceled_rolled_back", ConnectorRolloutStateCategory.TERMINAL); + + private final String literal; + private final ConnectorRolloutStateCategory category; + + ConnectorRolloutStateType(final String literal, final ConnectorRolloutStateCategory category) { + this.literal = literal; + this.category = category; + } + + @Override + public @NotNull String getLiteral() { + return literal; + } + + public ConnectorRolloutStateCategory getCategory() { + return category; + } + + public static Set getActiveStates() { + return EnumSet.allOf(ConnectorRolloutStateType.class).stream() + .filter(state -> state.getCategory() == ConnectorRolloutStateCategory.ACTIVE) + .sorted((state1, state2) -> state1.getLiteral().compareTo(state2.getLiteral())) + .collect(Collectors.toCollection(LinkedHashSet::new)); + } + + public static Set getTerminalStates() { + return EnumSet.allOf(ConnectorRolloutStateType.class).stream() + .filter(state -> state.getCategory() == ConnectorRolloutStateCategory.TERMINAL) + .collect(Collectors.toSet()); + } + + @Override + public Catalog getCatalog() { + return getSchema() == null ? null : getSchema().getCatalog(); + } + + @Override + public Schema getSchema() { + return new SchemaImpl(DSL.name("public"), null); + } + + @Override + public String getName() { + return CONNECTOR_ROLLOUT_STATE_TYPE; + } + + } + + enum ConnectorRolloutStrategyType implements EnumType { + + MANUAL("manual"), + AUTOMATED("automated"), + OVERRIDDEN("overridden"); + + private final String literal; + + ConnectorRolloutStrategyType(final String literal) { + this.literal = literal; + } + + @Override + public Catalog getCatalog() { + return getSchema() == null ? null : getSchema().getCatalog(); + } + + @Override + public Schema getSchema() { + return new SchemaImpl(DSL.name("public"), null); + } + + @Override + public String getName() { + return CONNECTOR_ROLLOUT_STRATEGY_TYPE; + } + + @Override + public @NotNull String getLiteral() { + return literal; + } + + } + +} diff --git a/airbyte-db/db-lib/src/main/resources/configs_database/schema_dump.txt b/airbyte-db/db-lib/src/main/resources/configs_database/schema_dump.txt index 75bf36575ba..34129cb639c 100644 --- a/airbyte-db/db-lib/src/main/resources/configs_database/schema_dump.txt +++ b/airbyte-db/db-lib/src/main/resources/configs_database/schema_dump.txt @@ -209,6 +209,27 @@ create table "public"."connector_builder_project" ( "contribution_actor_definition_id" uuid, constraint "connector_builder_project_pkey" primary key ("id") ); +create table "public"."connector_rollout" ( + "id" uuid not null, + "actor_definition_id" uuid not null, + "release_candidate_version_id" uuid not null, + "initial_version_id" uuid, + "state" varchar(32) not null, + "initial_rollout_pct" int not null, + "current_target_rollout_pct" int, + "final_target_rollout_pct" int not null, + "has_breaking_changes" boolean not null, + "max_step_wait_time_mins" int not null, + "updated_by" uuid, + "created_at" timestamp(6) not null default current_timestamp, + "updated_at" timestamp(6) not null default current_timestamp, + "completed_at" timestamp(6), + "expires_at" timestamp(6) not null, + "error_msg" varchar(1024), + "failed_reason" varchar(1024), + "rollout_strategy" varchar(256) not null, + constraint "pk_connector_rollout" primary key ("id") +); create table "public"."declarative_manifest" ( "actor_definition_id" uuid not null, "description" varchar(256) not null, @@ -487,6 +508,8 @@ create index "connection_status_idx" on "public"."connection"("status" asc); create index "connection_operation_connection_id_idx" on "public"."connection_operation"("connection_id" asc); create index "idx_connection_timeline_connection_id" on "public"."connection_timeline_event"("connection_id" asc, "created_at" desc, "event_type" asc); create index "connector_builder_project_workspace_idx" on "public"."connector_builder_project"("workspace_id" asc); +create unique index "actor_definition_id_state_unique_idx" on "public"."connector_rollout"("actor_definition_id" asc) +where (((state)::text = ANY ((ARRAY['errored'::character varying, 'finalizing'::character varying, 'in_progress'::character varying, 'initialized'::character varying, 'paused'::character varying])::text[]))); create index "organization_email_domain_email_domain_idx" on "public"."organization_email_domain"("email_domain" asc); create index "organization_email_domain_organization_id_idx" on "public"."organization_email_domain"("organization_id" asc); create index "organization_payment_config_grace_period_end_at_idx" on "public"."organization_payment_config"("grace_period_end_at" asc); @@ -538,6 +561,10 @@ alter table "public"."connection_operation" add constraint "connection_operation alter table "public"."connection_timeline_event" add constraint "connection_timeline_event_connection_id_fkey" foreign key ("connection_id") references "public"."connection" ("id"); alter table "public"."connection_timeline_event" add constraint "connection_timeline_event_user_id_fkey" foreign key ("user_id") references "public"."user" ("id"); alter table "public"."connector_builder_project" add constraint "connector_builder_project_base_adv_id_fkey" foreign key ("base_actor_definition_version_id") references "public"."actor_definition_version" ("id"); +alter table "public"."connector_rollout" add constraint "fk_actor_definition_id" foreign key ("actor_definition_id") references "public"."actor_definition" ("id"); +alter table "public"."connector_rollout" add constraint "fk_initial_version_id" foreign key ("initial_version_id") references "public"."actor_definition_version" ("id"); +alter table "public"."connector_rollout" add constraint "fk_release_candidate_version_id" foreign key ("release_candidate_version_id") references "public"."actor_definition_version" ("id"); +alter table "public"."connector_rollout" add constraint "fk_updated_by" foreign key ("updated_by") references "public"."user" ("id"); alter table "public"."notification_configuration" add constraint "notification_configuration_connection_id_fkey" foreign key ("connection_id") references "public"."connection" ("id"); alter table "public"."operation" add constraint "operation_workspace_id_fkey" foreign key ("workspace_id") references "public"."workspace" ("id"); alter table "public"."organization_email_domain" add constraint "organization_email_domain_organization_id_fkey" foreign key ("organization_id") references "public"."organization" ("id"); diff --git a/airbyte-test-utils/src/main/java/io/airbyte/test/utils/BaseConfigDatabaseTest.java b/airbyte-test-utils/src/main/java/io/airbyte/test/utils/BaseConfigDatabaseTest.java index f23f19f7f90..ccb2a0f68ff 100644 --- a/airbyte-test-utils/src/main/java/io/airbyte/test/utils/BaseConfigDatabaseTest.java +++ b/airbyte-test-utils/src/main/java/io/airbyte/test/utils/BaseConfigDatabaseTest.java @@ -134,6 +134,7 @@ protected static void truncateAllTables() throws SQLException { connection_operation, connection_timeline_event, connector_builder_project, + connector_rollout, declarative_manifest, notification_configuration, operation,