Skip to content

Commit

Permalink
Switch to use Bootloader. (airbytehq#8584)
Browse files Browse the repository at this point in the history
- Add the CONFIGS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION and JOBS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION. These are env vars that will determine if the database is ready for an application to start.
- Add the CONFIGS_DATABASE_INITIALIZATION_TIMEOUT_MS and the JOBS_DATABASE_INITIALIZATION_TIMEOUT_MS env vars to determine how long an application should wait for the DB before giving up.
- Create the MinimumFlywayMigrationVersionCheck class. This class contains all the assertions to check that 1) a database is initialised and 2) a database meets the minimum migration version.
- Remove all set up operations from the ServerApp. Use MinimumFlywayMigrationVersionCheck operations instead.
- I also had to modify the Databases and BaseDatabaseInstance classes to support connecting to a database with timeouts. We would previously try forever.
- Add Bootloader to the relevant docker files and Kube files.
- Clean up the migration acceptance tests so it's clear what is happening.
  • Loading branch information
davinchia authored Dec 14, 2021
1 parent 57c4590 commit 5cf3967
Show file tree
Hide file tree
Showing 33 changed files with 530 additions and 206 deletions.
2 changes: 2 additions & 0 deletions .env
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ DATABASE_PORT=5432
DATABASE_DB=airbyte
# translate manually DATABASE_URL=jdbc:postgresql://${DATABASE_HOST}:${DATABASE_PORT}/${DATABASE_DB} (do not include the username or password here)
DATABASE_URL=jdbc:postgresql://db:5432/airbyte
JOBS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION=0.29.15.001

# Airbyte Internal Config Database, default to reuse the Job Database when they are empty
# Usually you do not need to set them; they are explicitly left empty to mute docker compose warnings
CONFIG_DATABASE_USER=
CONFIG_DATABASE_PASSWORD=
CONFIG_DATABASE_URL=
CONFIGS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION=0.30.22.001

RUN_DATABASE_MIGRATION_ON_STARTUP=true

Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/gradle.yml
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ jobs:
run: ./tools/bin/acceptance_test.sh

- name: Automatic Migration Acceptance Test
run: MIGRATION_TEST_VERSION=$(grep VERSION .env | tr -d "VERSION=") SUB_BUILD=PLATFORM ./gradlew :airbyte-tests:automaticMigrationAcceptanceTest --scan -i
run: SUB_BUILD=PLATFORM ./gradlew :airbyte-tests:automaticMigrationAcceptanceTest --scan -i

- name: Slack Notification - Failure
if: failure() && github.ref == 'refs/heads/master'
Expand Down
14 changes: 14 additions & 0 deletions airbyte-bootloader/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,17 @@ application {
mainClass = 'io.airbyte.bootloader.BootloaderApp'
applicationDefaultJvmArgs = ['-XX:MaxRAMPercentage=75.0']
}

// Copies the bootloader distribution tarball from build/distributions into build/docker/bin.
// Runs only after the copyDocker and distTar tasks (both defined outside this snippet).
task copyGeneratedTar(type: Copy) {
    dependsOn copyDocker, distTar

    from('build/distributions') {
        include 'airbyte-bootloader-*.tar'
    }
    into 'build/docker/bin'
}

// Obtain the task returned by the getDockerBuildTask helper for the "bootloader" module
// (helper is defined outside this file; presumably it builds the module's Docker image - confirm).
Task dockerBuildTask = getDockerBuildTask("bootloader", "$project.projectDir")
// The distribution tar must be copied into build/docker/bin before that task runs.
dockerBuildTask.dependsOn(copyGeneratedTar)
// Make `assemble` trigger the docker build task as well.
assemble.dependsOn(dockerBuildTask)
Original file line number Diff line number Diff line change
Expand Up @@ -57,37 +57,39 @@ public BootloaderApp() {
}

public void load() throws Exception {
final Database configDatabase = new ConfigsDatabaseInstance(
configs.getConfigDatabaseUser(),
configs.getConfigDatabasePassword(),
configs.getConfigDatabaseUrl())
.getAndInitialize();
final DatabaseConfigPersistence configPersistence = new DatabaseConfigPersistence(configDatabase);
final ConfigRepository configRepository =
new ConfigRepository(configPersistence.withValidation(), null, Optional.empty(), Optional.empty());
createWorkspaceIfNoneExists(configRepository);
LOGGER.info("Set up config database and default workspace..");

final Database jobDatabase = new JobsDatabaseInstance(
configs.getDatabaseUser(),
configs.getDatabasePassword(),
configs.getDatabaseUrl())
.getAndInitialize();
final JobPersistence jobPersistence = new DefaultJobPersistence(jobDatabase);
createDeploymentIfNoneExists(jobPersistence);
LOGGER.info("Set up job database and default deployment..");

final AirbyteVersion currAirbyteVersion = configs.getAirbyteVersion();
assertNonBreakingMigration(jobPersistence, currAirbyteVersion);

runFlywayMigration(configs, configDatabase, jobDatabase);
LOGGER.info("Ran Flyway migrations...");

jobPersistence.setVersion(currAirbyteVersion.serialize());
LOGGER.info("Set version to {}", currAirbyteVersion);

configPersistence.loadData(YamlSeedConfigPersistence.getDefault());
LOGGER.info("Loaded seed data...");
LOGGER.info("Setting up config database and default workspace..");

try (
final Database configDatabase =
new ConfigsDatabaseInstance(configs.getConfigDatabaseUser(), configs.getConfigDatabasePassword(), configs.getConfigDatabaseUrl())
.getAndInitialize();
final Database jobDatabase =
new JobsDatabaseInstance(configs.getDatabaseUser(), configs.getDatabasePassword(), configs.getDatabaseUrl()).getAndInitialize()) {
LOGGER.info("Created initial jobs and configs database...");

final JobPersistence jobPersistence = new DefaultJobPersistence(jobDatabase);
final AirbyteVersion currAirbyteVersion = configs.getAirbyteVersion();
assertNonBreakingMigration(jobPersistence, currAirbyteVersion);

runFlywayMigration(configs, configDatabase, jobDatabase);
LOGGER.info("Ran Flyway migrations...");

final DatabaseConfigPersistence configPersistence = new DatabaseConfigPersistence(configDatabase);
final ConfigRepository configRepository =
new ConfigRepository(configPersistence.withValidation(), null, Optional.empty(), Optional.empty());

createWorkspaceIfNoneExists(configRepository);
LOGGER.info("Default workspace created..");

createDeploymentIfNoneExists(jobPersistence);
LOGGER.info("Default deployment created..");

jobPersistence.setVersion(currAirbyteVersion.serialize());
LOGGER.info("Set version to {}", currAirbyteVersion);

configPersistence.loadData(YamlSeedConfigPersistence.getDefault());
LOGGER.info("Loaded seed data...");
}

LOGGER.info("Finished bootstrapping Airbyte environment..");
}
Expand Down Expand Up @@ -129,6 +131,7 @@ private static void createWorkspaceIfNoneExists(final ConfigRepository configRep
private static void assertNonBreakingMigration(JobPersistence jobPersistence, AirbyteVersion airbyteVersion) throws IOException {
// version in the database when the server main method is called. may be empty if this is the first
// time the server is started.
LOGGER.info("Checking illegal upgrade..");
final Optional<AirbyteVersion> initialAirbyteDatabaseVersion = jobPersistence.getVersion().map(AirbyteVersion::new);
if (!isLegalUpgrade(initialAirbyteDatabaseVersion.orElse(null), airbyteVersion)) {
final String attentionBanner = MoreResources.readResource("banner/attention-banner.txt");
Expand All @@ -148,10 +151,14 @@ private static void assertNonBreakingMigration(JobPersistence jobPersistence, Ai
static boolean isLegalUpgrade(final AirbyteVersion airbyteDatabaseVersion, final AirbyteVersion airbyteVersion) {
// means there was no previous version, so no upgrade even needs to happen. always legal.
if (airbyteDatabaseVersion == null) {
LOGGER.info("No previous Airbyte Version set..");
return true;
}

final var isUpgradingThroughVersionBreak = airbyteDatabaseVersion.lessThan(VERSION_BREAK) && airbyteVersion.greaterThan(VERSION_BREAK);
LOGGER.info("Current Airbyte version: {}", airbyteDatabaseVersion);
LOGGER.info("Future Airbyte version: {}", airbyteVersion);
final var futureVersionIsAfterVersionBreak = airbyteVersion.greaterThan(VERSION_BREAK) || airbyteVersion.isDev();
final var isUpgradingThroughVersionBreak = airbyteDatabaseVersion.lessThan(VERSION_BREAK) && futureVersionIsAfterVersionBreak;
return !isUpgradingThroughVersionBreak;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,20 @@ public interface Configs {

String getDatabaseUrl();

String getJobsDatabaseMinimumFlywayMigrationVersion();

long getJobsDatabaseInitializationTimeoutMs();

String getConfigDatabaseUser();

String getConfigDatabasePassword();

String getConfigDatabaseUrl();

String getConfigsDatabaseMinimumFlywayMigrationVersion();

long getConfigsDatabaseInitializationTimeoutMs();

boolean runDatabaseMigrationOnStartup();

// Airbyte Services
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ public class EnvConfigs implements Configs {
private static final String SECRET_PERSISTENCE = "SECRET_PERSISTENCE";
private static final String JOB_POD_MAIN_CONTAINER_IMAGE_PULL_SECRET = "JOB_POD_MAIN_CONTAINER_IMAGE_PULL_SECRET";
private static final String PUBLISH_METRICS = "PUBLISH_METRICS";
private static final String CONFIGS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION = "CONFIGS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION";
private static final String CONFIGS_DATABASE_INITIALIZATION_TIMEOUT_MS = "CONFIGS_DATABASE_INITIALIZATION_TIMEOUT_MS";
private static final String JOBS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION = "JOBS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION";
private static final String JOBS_DATABASE_INITIALIZATION_TIMEOUT_MS = "JOBS_DATABASE_INITIALIZATION_TIMEOUT_MS";

private static final String STATE_STORAGE_S3_BUCKET_NAME = "STATE_STORAGE_S3_BUCKET_NAME";
private static final String STATE_STORAGE_S3_REGION = "STATE_STORAGE_S3_REGION";
Expand All @@ -106,6 +110,7 @@ public class EnvConfigs implements Configs {
private static final long DEFAULT_MINIMUM_WORKSPACE_RETENTION_DAYS = 1;
private static final long DEFAULT_MAXIMUM_WORKSPACE_RETENTION_DAYS = 60;
private static final long DEFAULT_MAXIMUM_WORKSPACE_SIZE_MB = 5000;
private static final int DEFAULT_DATABASE_INTILIZATION_TIMEOUT_MS = 60 * 1000;

public static final long DEFAULT_MAX_SPEC_WORKERS = 5;
public static final long DEFAULT_MAX_CHECK_WORKERS = 5;
Expand Down Expand Up @@ -275,6 +280,16 @@ public String getDatabaseUrl() {
return getEnsureEnv(DATABASE_URL);
}

/**
 * Reads the minimum Flyway migration version required of the jobs database from the
 * {@code JOBS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION} environment variable.
 * Lookup is delegated to {@code getEnsureEnv}, so a missing variable is handled however
 * that helper handles it.
 */
@Override
public String getJobsDatabaseMinimumFlywayMigrationVersion() {
  final String minimumMigrationVersion = getEnsureEnv(JOBS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION);
  return minimumMigrationVersion;
}

/**
 * Reads the jobs database initialization timeout, in milliseconds, from the
 * {@code JOBS_DATABASE_INITIALIZATION_TIMEOUT_MS} environment variable, falling back to
 * {@code DEFAULT_DATABASE_INTILIZATION_TIMEOUT_MS} via {@code getEnvOrDefault}.
 */
@Override
public long getJobsDatabaseInitializationTimeoutMs() {
  final long initializationTimeoutMs =
      getEnvOrDefault(JOBS_DATABASE_INITIALIZATION_TIMEOUT_MS, DEFAULT_DATABASE_INTILIZATION_TIMEOUT_MS);
  return initializationTimeoutMs;
}

@Override
public String getConfigDatabaseUser() {
// Default to reuse the job database
Expand All @@ -293,6 +308,16 @@ public String getConfigDatabaseUrl() {
return getEnvOrDefault(CONFIG_DATABASE_URL, getDatabaseUrl());
}

/**
 * Reads the minimum Flyway migration version required of the configs database from the
 * {@code CONFIGS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION} environment variable.
 * Lookup is delegated to {@code getEnsureEnv}, so a missing variable is handled however
 * that helper handles it.
 */
@Override
public String getConfigsDatabaseMinimumFlywayMigrationVersion() {
  final String minimumMigrationVersion = getEnsureEnv(CONFIGS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION);
  return minimumMigrationVersion;
}

/**
 * Reads the configs database initialization timeout, in milliseconds.
 * <p>
 * The value comes from the {@code CONFIGS_DATABASE_INITIALIZATION_TIMEOUT_MS} environment
 * variable, falling back to {@code DEFAULT_DATABASE_INTILIZATION_TIMEOUT_MS} via
 * {@code getEnvOrDefault}.
 */
@Override
public long getConfigsDatabaseInitializationTimeoutMs() {
  // Must read the CONFIGS_* variable: reading JOBS_DATABASE_INITIALIZATION_TIMEOUT_MS here
  // would make the configs timeout silently follow the jobs database setting.
  return getEnvOrDefault(CONFIGS_DATABASE_INITIALIZATION_TIMEOUT_MS, DEFAULT_DATABASE_INTILIZATION_TIMEOUT_MS);
}

@Override
public boolean runDatabaseMigrationOnStartup() {
return getEnvOrDefault(RUN_DATABASE_MIGRATION_ON_STARTUP, true);
Expand Down
38 changes: 36 additions & 2 deletions airbyte-db/lib/src/main/java/io/airbyte/db/Databases.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.db.jdbc.StreamingJdbcDatabase;
import io.airbyte.db.mongodb.MongoDatabase;
import java.io.IOException;
import java.util.Optional;
import java.util.function.Function;
import lombok.val;
import org.apache.commons.dbcp2.BasicDataSource;
import org.jooq.SQLDialect;
import org.slf4j.Logger;
Expand All @@ -23,6 +25,7 @@
public class Databases {

private static final Logger LOGGER = LoggerFactory.getLogger(Databases.class);
private static final long DEFAULT_WAIT_MS = 5 * 1000;

public static Database createPostgresDatabase(final String username, final String password, final String jdbcConnectionString) {
return createDatabase(username, password, jdbcConnectionString, "org.postgresql.Driver", SQLDialect.POSTGRES);
Expand All @@ -33,25 +36,54 @@ public static Database createPostgresDatabaseWithRetry(final String username,
final String jdbcConnectionString,
final Function<Database, Boolean> isDbReady) {
Database database = null;
while (database == null) {
try {
val infinity = Integer.MAX_VALUE;
database = createPostgresDatabaseWithRetryTimeout(username, password, jdbcConnectionString, isDbReady, infinity);
} catch (IOException e) {
// This should theoretically never happen since we set the timeout to be a very high number.
}
}

LOGGER.info("Database available!");
return database;
}

public static Database createPostgresDatabaseWithRetryTimeout(final String username,
final String password,
final String jdbcConnectionString,
final Function<Database, Boolean> isDbReady,
final long timeoutMs)
throws IOException {
Database database = null;
if (jdbcConnectionString == null || jdbcConnectionString.trim().equals("")) {
throw new IllegalArgumentException("Using a null or empty jdbc url will hang database creation; aborting.");
}

var totalTime = 0;
while (database == null) {
LOGGER.warn("Waiting for database to become available...");
if (totalTime >= timeoutMs) {
final var error = String.format("Unable to connection to database at %s..", jdbcConnectionString);
throw new IOException(error);
}

try {
database = createPostgresDatabase(username, password, jdbcConnectionString);
if (!isDbReady.apply(database)) {
LOGGER.info("Database is not ready yet. Please wait a moment, it might still be initializing...");
database.close();

database = null;
Exceptions.toRuntime(() -> Thread.sleep(5000));
Exceptions.toRuntime(() -> Thread.sleep(DEFAULT_WAIT_MS));
totalTime += DEFAULT_WAIT_MS;
}
} catch (final Exception e) {
// Ignore the exception because this likely means that the database server is still initializing.
LOGGER.warn("Ignoring exception while trying to request database:", e);
database = null;
Exceptions.toRuntime(() -> Thread.sleep(5000));
Exceptions.toRuntime(() -> Thread.sleep(DEFAULT_WAIT_MS));
totalTime += DEFAULT_WAIT_MS;
}
}

Expand Down Expand Up @@ -175,6 +207,8 @@ private static BasicDataSource createBasicDataSource(final String username,
connectionPool.setDriverClassName(driverClassName);
connectionPool.setUsername(username);
connectionPool.setPassword(password);
connectionPool.setInitialSize(0);
connectionPool.setMaxTotal(5);
connectionPool.setUrl(jdbcConnectionString);
connectionProperties.ifPresent(connectionPool::setConnectionProperties);
return connectionPool;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@

public abstract class BaseDatabaseInstance implements DatabaseInstance {

// Public so classes consuming the getInitialized method have a sense of the time taken.
public static final long DEFAULT_CONNECTION_TIMEOUT_MS = 30 * 1000;

private static final Logger LOGGER = LoggerFactory.getLogger(BaseDatabaseInstance.class);

protected final String username;
Expand Down Expand Up @@ -56,11 +59,12 @@ protected BaseDatabaseInstance(final String username,

@Override
public boolean isInitialized() throws IOException {
final Database database = Databases.createPostgresDatabaseWithRetry(
final Database database = Databases.createPostgresDatabaseWithRetryTimeout(
username,
password,
connectionString,
isDatabaseConnected(databaseName));
isDatabaseConnected(databaseName),
DEFAULT_CONNECTION_TIMEOUT_MS);
return new ExceptionWrappingDatabase(database).transaction(ctx -> tableNames.stream().allMatch(tableName -> hasTable(ctx, tableName)));
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.db.instance;

import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Contains reusable methods asserting if a database is ready.
 * <p>
 * This is intended to be used by applications in combination with the bootloader, and the minimum
 * migration env vars from {@link io.airbyte.config.Configs}, so applications know it is safe to
 * start interacting with the database.
 * <p>
 * Both methods here wait {@link #DEFAULT_POLL_PERIOD_MS} between checks and have configurable
 * timeouts. Elapsed time is tracked by accumulating the known wait periods, not by reading a clock.
 */
public class MinimumFlywayMigrationVersionCheck {

  public static final long DEFAULT_ASSERT_DATABASE_TIMEOUT_MS = 2 * BaseDatabaseInstance.DEFAULT_CONNECTION_TIMEOUT_MS;

  private static final Logger LOGGER = LoggerFactory.getLogger(MinimumFlywayMigrationVersionCheck.class);
  private static final long DEFAULT_POLL_PERIOD_MS = 2000;

  /**
   * Assert the given database can be connected to and reports itself as initialized.
   * <p>
   * A failed attempt ({@link IOException}) is accounted as
   * {@link BaseDatabaseInstance#DEFAULT_CONNECTION_TIMEOUT_MS} of waiting. An attempt that succeeds
   * but reports "not initialized" is followed by a sleep of {@link #DEFAULT_POLL_PERIOD_MS}, so the
   * loop always makes progress towards the timeout instead of spinning.
   *
   * @param db database instance to check
   * @param timeoutMs maximum accumulated waiting time, in milliseconds, before giving up
   * @throws RuntimeException if the timeout is reached (the last connection failure, if any, is
   *         attached as the cause) or if the waiting thread is interrupted
   */
  public static void assertDatabase(DatabaseInstance db, long timeoutMs) {
    // long, not int: the increments below are long and must not be silently narrowed.
    long currWaitingTime = 0;
    IOException lastFailure = null;

    while (true) {
      if (currWaitingTime >= timeoutMs) {
        throw new RuntimeException("Timeout while connecting to the database..", lastFailure);
      }

      try {
        if (db.isInitialized()) {
          return;
        }
      } catch (IOException e) {
        // NOTE(review): assumes a failed isInitialized() call already blocked for roughly the
        // connection timeout, so no extra sleep is added here - confirm against implementations.
        LOGGER.warn("Unable to reach the database, retrying..", e);
        lastFailure = e;
        currWaitingTime += BaseDatabaseInstance.DEFAULT_CONNECTION_TIMEOUT_MS;
        continue;
      }

      // Reachable but not initialized yet: back off before the next check so this loop neither
      // hammers the database nor runs forever without advancing the waiting time.
      LOGGER.info("Database is not initialized yet, waiting..");
      try {
        Thread.sleep(DEFAULT_POLL_PERIOD_MS);
      } catch (InterruptedException e) {
        // This method declares no checked exceptions; keep the interrupt flag for callers.
        Thread.currentThread().interrupt();
        throw new RuntimeException("Interrupted while waiting for the database to be initialized..", e);
      }
      currWaitingTime += DEFAULT_POLL_PERIOD_MS;
    }
  }

  /**
   * Assert the given database contains the minimum flyway migrations needed to run the application.
   * <p>
   * Versions are compared numerically, segment by segment, so that e.g. {@code 0.9.0} is older than
   * {@code 0.29.0}. A migrator that reports no applied migration yet is treated as "minimum not
   * reached" and polled again.
   *
   * @param migrator migrator used to look up the latest applied migration
   * @param minimumFlywayVersion dot-separated minimum migration version, e.g. {@code 0.29.15.001}
   * @param timeoutMs maximum accumulated waiting time, in milliseconds, before giving up
   * @throws InterruptedException if the thread is interrupted while sleeping between polls
   * @throws RuntimeException if the minimum version is not reached before the timeout
   */
  public static void assertMigrations(DatabaseMigrator migrator, String minimumFlywayVersion, long timeoutMs) throws InterruptedException {
    long currWaitingTime = 0;
    String currDatabaseMigrationVersion = latestMigrationVersion(migrator);

    while (currDatabaseMigrationVersion == null || compareVersions(currDatabaseMigrationVersion, minimumFlywayVersion) < 0) {
      if (currWaitingTime >= timeoutMs) {
        throw new RuntimeException("Timeout while waiting for database to fulfill minimum flyway migration version..");
      }

      Thread.sleep(DEFAULT_POLL_PERIOD_MS);
      currWaitingTime += DEFAULT_POLL_PERIOD_MS;
      currDatabaseMigrationVersion = latestMigrationVersion(migrator);
    }
  }

  /**
   * Returns the version string of the latest migration, or {@code null} when the migrator reports
   * no migration or a migration without a version.
   */
  private static String latestMigrationVersion(DatabaseMigrator migrator) {
    final var latestMigration = migrator.getLatestMigration();
    if (latestMigration == null) {
      return null;
    }
    final var version = latestMigration.getVersion();
    return version == null ? null : version.getVersion();
  }

  /**
   * Compares two dot-separated versions segment by segment; missing trailing segments count as 0.
   *
   * @return a negative number, zero, or a positive number if {@code left} is older than, equal to,
   *         or newer than {@code right}
   */
  private static int compareVersions(String left, String right) {
    final String[] leftParts = left.split("\\.");
    final String[] rightParts = right.split("\\.");
    final int length = Math.max(leftParts.length, rightParts.length);

    for (int i = 0; i < length; i++) {
      final String leftPart = i < leftParts.length ? leftParts[i] : "0";
      final String rightPart = i < rightParts.length ? rightParts[i] : "0";
      final int result = compareSegments(leftPart, rightPart);
      if (result != 0) {
        return result;
      }
    }
    return 0;
  }

  /**
   * Compares two version segments numerically when both are plain digits, otherwise falls back to
   * plain string ordering so malformed input does not throw.
   */
  private static int compareSegments(String left, String right) {
    if (isNumeric(left) && isNumeric(right)) {
      // BigInteger ignores leading zeros ("001" == "1") and cannot overflow on long segments.
      return new java.math.BigInteger(left).compareTo(new java.math.BigInteger(right));
    }
    return left.compareTo(right);
  }

  /** Returns true when the segment is non-empty and consists only of ASCII digits. */
  private static boolean isNumeric(String segment) {
    return !segment.isEmpty() && segment.chars().allMatch(c -> c >= '0' && c <= '9');
  }

}
Loading

0 comments on commit 5cf3967

Please sign in to comment.