Skip to content

Commit

Permalink
[GOBBLIN-1942] Create MySQL util class for re-usable methods and setu…
Browse files Browse the repository at this point in the history
…p MysqlDagActio… (#3812)

* Create MySQL util class for re-usable methods and setup MysqlDagActionStore retention

* Add a java doc

* Address review comments

* Close scheduled executors on shutdown & clarify naming and comments

* Remove extra period making config key invalid

* implement Closeable

* Use try with resources

---------

Co-authored-by: Urmi Mustafi <[email protected]>
  • Loading branch information
umustafi and Urmi Mustafi authored Nov 1, 2023
1 parent 6f5199d commit c865b6a
Show file tree
Hide file tree
Showing 5 changed files with 188 additions and 119 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ public class ConfigurationKeys {
public static final int DEFAULT_LOAD_SPEC_BATCH_SIZE = 500;
public static final String SKIP_SCHEDULING_FLOWS_AFTER_NUM_DAYS = "skip.scheduling.flows.after.num.days";
public static final int DEFAULT_NUM_DAYS_TO_SKIP_AFTER = 365;
// Mysql Dag Action Store configuration
public static final String MYSQL_DAG_ACTION_STORE_PREFIX = "MysqlDagActionStore.";
public static final String MYSQL_DAG_ACTION_STORE_TABLE_RETENTION_PERIOD_SECONDS_KEY = MYSQL_DAG_ACTION_STORE_PREFIX + "retentionPeriodSeconds";
public static final long DEFAULT_MYSQL_DAG_ACTION_STORE_TABLE_RETENTION_PERIOD_SEC_KEY = 3 * 24 * 60 * 60; // (3 days in seconds)
// Scheduler lease determination store configuration
public static final String MYSQL_LEASE_ARBITER_PREFIX = "MysqlMultiActiveLeaseArbiter";
public static final String MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE_KEY = MYSQL_LEASE_ARBITER_PREFIX + ".constantsTable";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import com.google.inject.Inject;
import com.typesafe.config.Config;
import com.zaxxer.hikari.HikariDataSource;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
Expand All @@ -30,7 +29,6 @@
import java.util.Calendar;
import java.util.Optional;
import java.util.TimeZone;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.sql.DataSource;
import lombok.Data;
Expand All @@ -40,8 +38,7 @@
import org.apache.gobblin.metastore.MysqlDataSourceFactory;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.util.ConfigUtils;

import static org.apache.gobblin.runtime.api.DagActionStore.DagAction.*;
import org.apache.gobblin.util.DBStatementExecutor;


/**
Expand Down Expand Up @@ -80,13 +77,9 @@
*/
@Slf4j
public class MysqlMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
/** `j.u.Function` variant for an operation that may @throw IOException or SQLException: preserves method signature checked exceptions */
@FunctionalInterface
protected interface CheckedFunction<T, R> {
R apply(T t) throws IOException, SQLException;
}

protected final DataSource dataSource;
private final DBStatementExecutor dbStatementExecutor;
private final String leaseArbiterTableName;
private final String constantsTableName;
private final int epsilonMillis;
Expand Down Expand Up @@ -121,7 +114,7 @@ protected interface CheckedFunction<T, R> {
// Deletes rows older than retention time period regardless of lease status as they should all be invalid or completed
// since retention >> linger
private static final String LEASE_ARBITER_TABLE_RETENTION_STATEMENT = "DELETE FROM %s WHERE event_timestamp < "
+ "DATE_SUB(CURRENT_TIMESTAMP(3), INTERVAL ? * 1000 MICROSECOND)";
+ "DATE_SUB(CURRENT_TIMESTAMP(3), INTERVAL %s * 1000 MICROSECOND)";
private static final String CREATE_CONSTANTS_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %s "
+ "(primary_key INT, epsilon INT, linger INT, PRIMARY KEY (primary_key))";
// Only insert epsilon and linger values from config if this table does not contain a pre-existing values already.
Expand Down Expand Up @@ -196,7 +189,8 @@ public MysqlMultiActiveLeaseArbiter(Config config) throws IOException {
ConfigurationKeys.DEFAULT_SCHEDULER_EVENT_LINGER_MILLIS);
this.retentionPeriodMillis = ConfigUtils.getLong(config, ConfigurationKeys.SCHEDULER_LEASE_DETERMINATION_TABLE_RETENTION_PERIOD_MILLIS_KEY,
ConfigurationKeys.DEFAULT_SCHEDULER_LEASE_DETERMINATION_TABLE_RETENTION_PERIOD_MILLIS);
this.thisTableRetentionStatement = String.format(LEASE_ARBITER_TABLE_RETENTION_STATEMENT, this.leaseArbiterTableName);
this.thisTableRetentionStatement = String.format(LEASE_ARBITER_TABLE_RETENTION_STATEMENT, this.leaseArbiterTableName,
retentionPeriodMillis);
this.thisTableGetInfoStatement = String.format(GET_EVENT_INFO_STATEMENT, this.leaseArbiterTableName,
this.constantsTableName);
this.thisTableGetInfoStatementForReminder = String.format(GET_EVENT_INFO_STATEMENT_FOR_REMINDER,
Expand All @@ -208,6 +202,7 @@ public MysqlMultiActiveLeaseArbiter(Config config) throws IOException {
this.thisTableAcquireLeaseIfFinishedStatement =
String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_FINISHED_LEASING_STATEMENT, this.leaseArbiterTableName);
this.dataSource = MysqlDataSourceFactory.get(config, SharedResourcesBrokerFactory.getImplicitBroker());
this.dbStatementExecutor = new DBStatementExecutor(this.dataSource, log);
String createArbiterStatement = String.format(
CREATE_LEASE_ARBITER_TABLE_STATEMENT, leaseArbiterTableName);
try (Connection connection = dataSource.getConnection();
Expand All @@ -218,52 +213,28 @@ public MysqlMultiActiveLeaseArbiter(Config config) throws IOException {
throw new IOException("Table creation failure for " + leaseArbiterTableName, e);
}
initializeConstantsTable();
runRetentionOnArbitrationTable();

// Periodically deletes all rows in the table with event_timestamp older than the retention period defined by config.
dbStatementExecutor.repeatSqlCommandExecutionAtInterval(thisTableRetentionStatement, 4, TimeUnit.HOURS);

log.info("MysqlMultiActiveLeaseArbiter initialized");
}

// Initialize Constants table if needed and insert row into it if one does not exist
private void initializeConstantsTable() throws IOException {
String createConstantsStatement = String.format(CREATE_CONSTANTS_TABLE_STATEMENT, this.constantsTableName);
withPreparedStatement(createConstantsStatement, createStatement -> createStatement.executeUpdate(), true);
dbStatementExecutor.withPreparedStatement(createConstantsStatement, createStatement -> createStatement.executeUpdate(),
true);

String insertConstantsStatement = String.format(UPSERT_CONSTANTS_TABLE_STATEMENT, this.constantsTableName);
withPreparedStatement(insertConstantsStatement, insertStatement -> {
dbStatementExecutor.withPreparedStatement(insertConstantsStatement, insertStatement -> {
int i = 0;
insertStatement.setInt(++i, epsilonMillis);
insertStatement.setInt(++i, lingerMillis);
return insertStatement.executeUpdate();
}, true);
}

/**
* Periodically deletes all rows in the table with event_timestamp older than the retention period defined by config.
* // TODO: create a utility to run a SQL commend in a STPE using interval T
*/
private void runRetentionOnArbitrationTable() {
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
Runnable retentionTask = () -> {
try {
withPreparedStatement(thisTableRetentionStatement,
retentionStatement -> {
retentionStatement.setLong(1, retentionPeriodMillis);
int numRowsDeleted = retentionStatement.executeUpdate();
if (numRowsDeleted != 0) {
log.info("Multi-active lease arbiter retention thread deleted {} rows from the lease arbiter table",
numRowsDeleted);
}
return numRowsDeleted;
}, true);
} catch (IOException e) {
log.error("Failing to run retention on lease arbiter table. Unbounded growth can lead to database slowness and "
+ "affect our system performance. Examine exception: ", e);
}
};

// Run retention thread every 4 hours (6 times a day)
executor.scheduleAtFixedRate(retentionTask, 0, 4, TimeUnit.HOURS);
}

@Override
public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, long eventTimeMillis,
boolean isReminderEvent) throws IOException {
Expand Down Expand Up @@ -370,7 +341,7 @@ else if (leaseValidityStatus == 2) {
*/
protected Optional<GetEventInfoResult> getExistingEventInfo(DagActionStore.DagAction flowAction,
boolean isReminderEvent, long eventTimeMillis) throws IOException {
return withPreparedStatement(isReminderEvent ? thisTableGetInfoStatementForReminder : thisTableGetInfoStatement,
return dbStatementExecutor.withPreparedStatement(isReminderEvent ? thisTableGetInfoStatementForReminder : thisTableGetInfoStatement,
getInfoStatement -> {
int i = 0;
if (isReminderEvent) {
Expand Down Expand Up @@ -425,7 +396,7 @@ protected GetEventInfoResult createGetInfoResult(ResultSet resultSet) throws IOE
protected int attemptLeaseIfNewRow(DagActionStore.DagAction flowAction) throws IOException {
String formattedAcquireLeaseNewRowStatement =
String.format(ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT, this.leaseArbiterTableName);
return withPreparedStatement(formattedAcquireLeaseNewRowStatement,
return dbStatementExecutor.withPreparedStatement(formattedAcquireLeaseNewRowStatement,
insertStatement -> {
completeInsertPreparedStatement(insertStatement, flowAction);
try {
Expand All @@ -447,7 +418,7 @@ protected int attemptLeaseIfNewRow(DagActionStore.DagAction flowAction) throws I
protected int attemptLeaseIfExistingRow(String acquireLeaseStatement, DagActionStore.DagAction flowAction,
boolean needEventTimeCheck, boolean needLeaseAcquisition, Timestamp dbEventTimestamp,
Timestamp dbLeaseAcquisitionTimestamp) throws IOException {
return withPreparedStatement(acquireLeaseStatement,
return dbStatementExecutor.withPreparedStatement(acquireLeaseStatement,
insertStatement -> {
completeUpdatePreparedStatement(insertStatement, flowAction, needEventTimeCheck, needLeaseAcquisition,
dbEventTimestamp, dbLeaseAcquisitionTimestamp);
Expand All @@ -460,7 +431,7 @@ protected int attemptLeaseIfExistingRow(String acquireLeaseStatement, DagActionS
* was successful or not.
*/
protected SelectInfoResult getRowInfo(DagActionStore.DagAction flowAction) throws IOException {
return withPreparedStatement(thisTableSelectAfterInsertStatement,
return dbStatementExecutor.withPreparedStatement(thisTableSelectAfterInsertStatement,
selectStatement -> {
completeWhereClauseMatchingKeyPreparedStatement(selectStatement, flowAction);
ResultSet resultSet = selectStatement.executeQuery();
Expand Down Expand Up @@ -596,7 +567,7 @@ public boolean recordLeaseSuccess(LeaseObtainedStatus status)
String flowGroup = flowAction.getFlowGroup();
String flowName = flowAction.getFlowName();
DagActionStore.FlowActionType flowActionType = flowAction.getFlowActionType();
return withPreparedStatement(String.format(CONDITIONALLY_COMPLETE_LEASE_STATEMENT, leaseArbiterTableName),
return dbStatementExecutor.withPreparedStatement(String.format(CONDITIONALLY_COMPLETE_LEASE_STATEMENT, leaseArbiterTableName),
updateStatement -> {
int i = 0;
updateStatement.setString(++i, flowGroup);
Expand All @@ -621,25 +592,6 @@ public boolean recordLeaseSuccess(LeaseObtainedStatus status)
}, true);
}

/** Abstracts recurring pattern around resource management and exception re-mapping. */
protected <T> T withPreparedStatement(String sql, CheckedFunction<PreparedStatement, T> f, boolean shouldCommit)
throws IOException {
try (Connection connection = this.dataSource.getConnection();
PreparedStatement statement = connection.prepareStatement(sql)) {
T result = f.apply(statement);
if (shouldCommit) {
connection.commit();
}
statement.close();
return result;
} catch (SQLException e) {
log.warn("Received SQL exception that can result from invalid connection. Checking if validation query is set {} "
+ "Exception is {}", ((HikariDataSource) this.dataSource).getConnectionTestQuery(), e);
throw new IOException(e);
}
}


/**
* DTO for arbiter's current lease state for a FlowActionEvent
*/
Expand Down
Loading

0 comments on commit c865b6a

Please sign in to comment.