Skip to content

Commit

Permalink
feat: progressive rollouts table migration (#13819)
Browse files Browse the repository at this point in the history
  • Loading branch information
clnoll committed Sep 5, 2024
1 parent 910f012 commit 6db66c0
Show file tree
Hide file tree
Showing 4 changed files with 270 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ class BootloaderTest {

// ⚠️ This line should change with every new migration to show that you meant to make a new
// migration to the prod database
private static final String CURRENT_CONFIGS_MIGRATION_VERSION = "0.57.4.017";
private static final String CURRENT_CONFIGS_MIGRATION_VERSION = "0.57.5.001";
private static final String CURRENT_JOBS_MIGRATION_VERSION = "0.57.2.005";

@BeforeEach
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
/*
* Copyright (c) 2020-2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.db.instance.configs.migrations;

import java.util.EnumSet;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import org.flywaydb.core.api.migration.BaseJavaMigration;
import org.flywaydb.core.api.migration.Context;
import org.jetbrains.annotations.NotNull;
import org.jooq.Catalog;
import org.jooq.DSLContext;
import org.jooq.EnumType;
import org.jooq.Field;
import org.jooq.Record;
import org.jooq.Schema;
import org.jooq.Table;
import org.jooq.impl.DSL;
import org.jooq.impl.SQLDataType;
import org.jooq.impl.SchemaImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class V0_57_5_001__AddConnectorRolloutTable extends BaseJavaMigration {

private static final Logger LOGGER = LoggerFactory.getLogger(V0_57_5_001__AddConnectorRolloutTable.class);
private static final Table<Record> CONNECTOR_ROLLOUT_TABLE = DSL.table("connector_rollout");
private static final Field<UUID> ACTOR_DEFINITION_ID = DSL.field("actor_definition_id", UUID.class);
private static final Field<String> STATE = DSL.field("state", String.class);
private static final String CONNECTOR_ROLLOUT_STATE_CATEGORY = "connector_rollout_state_category";
private static final String CONNECTOR_ROLLOUT_STATE_TYPE = "connector_rollout_state_type";
private static final String CONNECTOR_ROLLOUT_STRATEGY_TYPE = "connector_rollout_strategy_type";

@Override
public void migrate(final Context context) throws Exception {
LOGGER.info("Running migration: {}", this.getClass().getSimpleName());

// Warning: please do not use any jOOQ generated code to write a migration.
// As database schema changes, the generated jOOQ code can be deprecated. So
// old migration may not compile if there is any generated code.
final DSLContext ctx = DSL.using(context.getConnection());
createStateCategoryEnum(ctx);
createStateTypeEnum(ctx);
createStrategyTypeEnum(ctx);
createRolloutTable(ctx);
createPartialUniqueIndex(ctx);
LOGGER.info("connector_rollout table created");
}

static void createStateTypeEnum(final DSLContext ctx) {
ctx.createType(CONNECTOR_ROLLOUT_STATE_TYPE).asEnum(
ConnectorRolloutStateType.INITIALIZED.literal,
ConnectorRolloutStateType.IN_PROGRESS.literal,
ConnectorRolloutStateType.PAUSED.literal,
ConnectorRolloutStateType.FINALIZING.literal,
ConnectorRolloutStateType.SUCCEEDED.literal,
ConnectorRolloutStateType.ERRORED.literal,
ConnectorRolloutStateType.FAILED_ROLLED_BACK.literal,
ConnectorRolloutStateType.CANCELED_ROLLED_BACK.literal).execute();
}

static void createStateCategoryEnum(final DSLContext ctx) {
ctx.createType(CONNECTOR_ROLLOUT_STATE_CATEGORY).asEnum(
ConnectorRolloutStateCategory.ACTIVE.literal,
ConnectorRolloutStateCategory.TERMINAL.literal).execute();
}

static void createStrategyTypeEnum(final DSLContext ctx) {
ctx.createType(CONNECTOR_ROLLOUT_STRATEGY_TYPE)
.asEnum(ConnectorRolloutStrategyType.MANUAL.literal, ConnectorRolloutStrategyType.AUTOMATED.literal,
ConnectorRolloutStrategyType.OVERRIDDEN.literal)
.execute();
}

static void createRolloutTable(final DSLContext ctx) {
ctx.createTableIfNotExists(CONNECTOR_ROLLOUT_TABLE)
.column("id", SQLDataType.UUID.nullable(false))
.column("actor_definition_id", SQLDataType.UUID.nullable(false))
.column("release_candidate_version_id", SQLDataType.UUID.nullable(false))
.column("initial_version_id", SQLDataType.UUID.nullable(true))
.column("state", SQLDataType.VARCHAR(32).nullable(false))
.column("initial_rollout_pct", SQLDataType.INTEGER.nullable(false))
.column("current_target_rollout_pct", SQLDataType.INTEGER.nullable(true))
.column("final_target_rollout_pct", SQLDataType.INTEGER.nullable(false))
.column("has_breaking_changes", SQLDataType.BOOLEAN.nullable(false))
.column("max_step_wait_time_mins", SQLDataType.INTEGER.nullable(false))
.column("updated_by", SQLDataType.UUID.nullable(true))
.column("created_at", SQLDataType.TIMESTAMP.nullable(false).defaultValue(DSL.currentTimestamp()))
.column("updated_at", SQLDataType.TIMESTAMP.nullable(false).defaultValue(DSL.currentTimestamp()))
.column("completed_at", SQLDataType.TIMESTAMP.nullable(true))
.column("expires_at", SQLDataType.TIMESTAMP.nullable(false))
.column("error_msg", SQLDataType.VARCHAR(1024).nullable(true))
.column("failed_reason", SQLDataType.VARCHAR(1024).nullable(true))
.column("rollout_strategy", SQLDataType.VARCHAR(256).nullable(false))
.constraints(
DSL.constraint("pk_connector_rollout").primaryKey("id"),
DSL.constraint("fk_actor_definition_id").foreignKey("actor_definition_id").references("actor_definition", "id"),
DSL.constraint("fk_initial_version_id").foreignKey("initial_version_id").references("actor_definition_version", "id"),
DSL.constraint("fk_release_candidate_version_id").foreignKey("release_candidate_version_id").references("actor_definition_version", "id"),
DSL.constraint("fk_updated_by").foreignKey("updated_by").references("user", "id"))
.execute();
}

static void createPartialUniqueIndex(final DSLContext ctx) {
// Create a partial unique index to guarantee that only one active rollout exists for a given
// connector.
ctx.createUniqueIndex("actor_definition_id_state_unique_idx")
.on(CONNECTOR_ROLLOUT_TABLE, ACTOR_DEFINITION_ID)
.where(STATE.in(ConnectorRolloutStateType.getActiveStates())).execute();
}

public enum ConnectorRolloutStateCategory implements EnumType {

ACTIVE("active"),
TERMINAL("terminal");

private final String literal;

ConnectorRolloutStateCategory(final String literal) {
this.literal = literal;
}

@Override
public @NotNull String getLiteral() {
return literal;
}

@Override
public Catalog getCatalog() {
return getSchema() == null ? null : getSchema().getCatalog();
}

@Override
public Schema getSchema() {
return new SchemaImpl(DSL.name("public"), null);
}

@Override
public String getName() {
return CONNECTOR_ROLLOUT_STATE_CATEGORY;
}

}

public enum ConnectorRolloutStateType implements EnumType {

INITIALIZED("initialized", ConnectorRolloutStateCategory.ACTIVE),
IN_PROGRESS("in_progress", ConnectorRolloutStateCategory.ACTIVE),
PAUSED("paused", ConnectorRolloutStateCategory.ACTIVE),
FINALIZING("finalizing", ConnectorRolloutStateCategory.ACTIVE),
ERRORED("errored", ConnectorRolloutStateCategory.ACTIVE),
SUCCEEDED("succeeded", ConnectorRolloutStateCategory.TERMINAL),
FAILED_ROLLED_BACK("failed_rolled_back", ConnectorRolloutStateCategory.TERMINAL),
CANCELED_ROLLED_BACK("canceled_rolled_back", ConnectorRolloutStateCategory.TERMINAL);

private final String literal;
private final ConnectorRolloutStateCategory category;

ConnectorRolloutStateType(final String literal, final ConnectorRolloutStateCategory category) {
this.literal = literal;
this.category = category;
}

@Override
public @NotNull String getLiteral() {
return literal;
}

public ConnectorRolloutStateCategory getCategory() {
return category;
}

public static Set<ConnectorRolloutStateType> getActiveStates() {
return EnumSet.allOf(ConnectorRolloutStateType.class).stream()
.filter(state -> state.getCategory() == ConnectorRolloutStateCategory.ACTIVE)
.sorted((state1, state2) -> state1.getLiteral().compareTo(state2.getLiteral()))
.collect(Collectors.toCollection(LinkedHashSet::new));
}

public static Set<ConnectorRolloutStateType> getTerminalStates() {
return EnumSet.allOf(ConnectorRolloutStateType.class).stream()
.filter(state -> state.getCategory() == ConnectorRolloutStateCategory.TERMINAL)
.collect(Collectors.toSet());
}

@Override
public Catalog getCatalog() {
return getSchema() == null ? null : getSchema().getCatalog();
}

@Override
public Schema getSchema() {
return new SchemaImpl(DSL.name("public"), null);
}

@Override
public String getName() {
return CONNECTOR_ROLLOUT_STATE_TYPE;
}

}

enum ConnectorRolloutStrategyType implements EnumType {

MANUAL("manual"),
AUTOMATED("automated"),
OVERRIDDEN("overridden");

private final String literal;

ConnectorRolloutStrategyType(final String literal) {
this.literal = literal;
}

@Override
public Catalog getCatalog() {
return getSchema() == null ? null : getSchema().getCatalog();
}

@Override
public Schema getSchema() {
return new SchemaImpl(DSL.name("public"), null);
}

@Override
public String getName() {
return CONNECTOR_ROLLOUT_STRATEGY_TYPE;
}

@Override
public @NotNull String getLiteral() {
return literal;
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,27 @@ create table "public"."connector_builder_project" (
"contribution_actor_definition_id" uuid,
constraint "connector_builder_project_pkey" primary key ("id")
);
create table "public"."connector_rollout" (
"id" uuid not null,
"actor_definition_id" uuid not null,
"release_candidate_version_id" uuid not null,
"initial_version_id" uuid,
"state" varchar(32) not null,
"initial_rollout_pct" int not null,
"current_target_rollout_pct" int,
"final_target_rollout_pct" int not null,
"has_breaking_changes" boolean not null,
"max_step_wait_time_mins" int not null,
"updated_by" uuid,
"created_at" timestamp(6) not null default current_timestamp,
"updated_at" timestamp(6) not null default current_timestamp,
"completed_at" timestamp(6),
"expires_at" timestamp(6) not null,
"error_msg" varchar(1024),
"failed_reason" varchar(1024),
"rollout_strategy" varchar(256) not null,
constraint "pk_connector_rollout" primary key ("id")
);
create table "public"."declarative_manifest" (
"actor_definition_id" uuid not null,
"description" varchar(256) not null,
Expand Down Expand Up @@ -487,6 +508,8 @@ create index "connection_status_idx" on "public"."connection"("status" asc);
create index "connection_operation_connection_id_idx" on "public"."connection_operation"("connection_id" asc);
create index "idx_connection_timeline_connection_id" on "public"."connection_timeline_event"("connection_id" asc, "created_at" desc, "event_type" asc);
create index "connector_builder_project_workspace_idx" on "public"."connector_builder_project"("workspace_id" asc);
create unique index "actor_definition_id_state_unique_idx" on "public"."connector_rollout"("actor_definition_id" asc)
where (((state)::text = ANY ((ARRAY['errored'::character varying, 'finalizing'::character varying, 'in_progress'::character varying, 'initialized'::character varying, 'paused'::character varying])::text[])));
create index "organization_email_domain_email_domain_idx" on "public"."organization_email_domain"("email_domain" asc);
create index "organization_email_domain_organization_id_idx" on "public"."organization_email_domain"("organization_id" asc);
create index "organization_payment_config_grace_period_end_at_idx" on "public"."organization_payment_config"("grace_period_end_at" asc);
Expand Down Expand Up @@ -538,6 +561,10 @@ alter table "public"."connection_operation" add constraint "connection_operation
alter table "public"."connection_timeline_event" add constraint "connection_timeline_event_connection_id_fkey" foreign key ("connection_id") references "public"."connection" ("id");
alter table "public"."connection_timeline_event" add constraint "connection_timeline_event_user_id_fkey" foreign key ("user_id") references "public"."user" ("id");
alter table "public"."connector_builder_project" add constraint "connector_builder_project_base_adv_id_fkey" foreign key ("base_actor_definition_version_id") references "public"."actor_definition_version" ("id");
alter table "public"."connector_rollout" add constraint "fk_actor_definition_id" foreign key ("actor_definition_id") references "public"."actor_definition" ("id");
alter table "public"."connector_rollout" add constraint "fk_initial_version_id" foreign key ("initial_version_id") references "public"."actor_definition_version" ("id");
alter table "public"."connector_rollout" add constraint "fk_release_candidate_version_id" foreign key ("release_candidate_version_id") references "public"."actor_definition_version" ("id");
alter table "public"."connector_rollout" add constraint "fk_updated_by" foreign key ("updated_by") references "public"."user" ("id");
alter table "public"."notification_configuration" add constraint "notification_configuration_connection_id_fkey" foreign key ("connection_id") references "public"."connection" ("id");
alter table "public"."operation" add constraint "operation_workspace_id_fkey" foreign key ("workspace_id") references "public"."workspace" ("id");
alter table "public"."organization_email_domain" add constraint "organization_email_domain_organization_id_fkey" foreign key ("organization_id") references "public"."organization" ("id");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ protected static void truncateAllTables() throws SQLException {
connection_operation,
connection_timeline_event,
connector_builder_project,
connector_rollout,
declarative_manifest,
notification_configuration,
operation,
Expand Down

0 comments on commit 6db66c0

Please sign in to comment.