From c865b6a8e4628f9602671151a1a33c8a27dacdef Mon Sep 17 00:00:00 2001 From: umustafi Date: Wed, 1 Nov 2023 12:15:24 -0700 Subject: [PATCH] =?UTF-8?q?[GOBBLIN-1942]=20Create=20MySQL=20util=20class?= =?UTF-8?q?=20for=20re-usable=20methods=20and=20setup=20MysqlDagActio?= =?UTF-8?q?=E2=80=A6=20(#3812)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Create MySQL util class for re-usable methods and setup MysqlDagActionStore retention * Add a java doc * Address review comments * Close scheduled executors on shutdown & clarify naming and comments * Remove extra period making config key invalid * implement Closeable * Use try with resources --------- Co-authored-by: Urmi Mustafi --- .../configuration/ConfigurationKeys.java | 4 + .../api/MysqlMultiActiveLeaseArbiter.java | 84 +++---------- .../dag_action_store/MysqlDagActionStore.java | 107 ++++++++--------- .../spec_store/MysqlBaseSpecStore.java | 1 + .../gobblin/util/DBStatementExecutor.java | 111 ++++++++++++++++++ 5 files changed, 188 insertions(+), 119 deletions(-) create mode 100644 gobblin-runtime/src/main/java/org/apache/gobblin/util/DBStatementExecutor.java diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java index a50ba8c75eb..5fe8f001af1 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java @@ -95,6 +95,10 @@ public class ConfigurationKeys { public static final int DEFAULT_LOAD_SPEC_BATCH_SIZE = 500; public static final String SKIP_SCHEDULING_FLOWS_AFTER_NUM_DAYS = "skip.scheduling.flows.after.num.days"; public static final int DEFAULT_NUM_DAYS_TO_SKIP_AFTER = 365; + // Mysql Dag Action Store configuration + public static final String MYSQL_DAG_ACTION_STORE_PREFIX = "MysqlDagActionStore."; + public static final 
String MYSQL_DAG_ACTION_STORE_TABLE_RETENTION_PERIOD_SECONDS_KEY = MYSQL_DAG_ACTION_STORE_PREFIX + "retentionPeriodSeconds"; + public static final long DEFAULT_MYSQL_DAG_ACTION_STORE_TABLE_RETENTION_PERIOD_SEC_KEY = 3 * 24 * 60 * 60; // (3 days in seconds) // Scheduler lease determination store configuration public static final String MYSQL_LEASE_ARBITER_PREFIX = "MysqlMultiActiveLeaseArbiter"; public static final String MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE_KEY = MYSQL_LEASE_ARBITER_PREFIX + ".constantsTable"; diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java index 338e908a2e0..05449767cf1 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java @@ -19,7 +19,6 @@ import com.google.inject.Inject; import com.typesafe.config.Config; -import com.zaxxer.hikari.HikariDataSource; import java.io.IOException; import java.sql.Connection; import java.sql.PreparedStatement; @@ -30,7 +29,6 @@ import java.util.Calendar; import java.util.Optional; import java.util.TimeZone; -import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import javax.sql.DataSource; import lombok.Data; @@ -40,8 +38,7 @@ import org.apache.gobblin.metastore.MysqlDataSourceFactory; import org.apache.gobblin.service.ServiceConfigKeys; import org.apache.gobblin.util.ConfigUtils; - -import static org.apache.gobblin.runtime.api.DagActionStore.DagAction.*; +import org.apache.gobblin.util.DBStatementExecutor; /** @@ -80,13 +77,9 @@ */ @Slf4j public class MysqlMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter { - /** `j.u.Function` variant for an operation that may @throw IOException or SQLException: preserves method signature checked exceptions */ - 
@FunctionalInterface - protected interface CheckedFunction { - R apply(T t) throws IOException, SQLException; - } protected final DataSource dataSource; + private final DBStatementExecutor dbStatementExecutor; private final String leaseArbiterTableName; private final String constantsTableName; private final int epsilonMillis; @@ -121,7 +114,7 @@ protected interface CheckedFunction { // Deletes rows older than retention time period regardless of lease status as they should all be invalid or completed // since retention >> linger private static final String LEASE_ARBITER_TABLE_RETENTION_STATEMENT = "DELETE FROM %s WHERE event_timestamp < " - + "DATE_SUB(CURRENT_TIMESTAMP(3), INTERVAL ? * 1000 MICROSECOND)"; + + "DATE_SUB(CURRENT_TIMESTAMP(3), INTERVAL %s * 1000 MICROSECOND)"; private static final String CREATE_CONSTANTS_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %s " + "(primary_key INT, epsilon INT, linger INT, PRIMARY KEY (primary_key))"; // Only insert epsilon and linger values from config if this table does not contain a pre-existing values already. 
@@ -196,7 +189,8 @@ public MysqlMultiActiveLeaseArbiter(Config config) throws IOException { ConfigurationKeys.DEFAULT_SCHEDULER_EVENT_LINGER_MILLIS); this.retentionPeriodMillis = ConfigUtils.getLong(config, ConfigurationKeys.SCHEDULER_LEASE_DETERMINATION_TABLE_RETENTION_PERIOD_MILLIS_KEY, ConfigurationKeys.DEFAULT_SCHEDULER_LEASE_DETERMINATION_TABLE_RETENTION_PERIOD_MILLIS); - this.thisTableRetentionStatement = String.format(LEASE_ARBITER_TABLE_RETENTION_STATEMENT, this.leaseArbiterTableName); + this.thisTableRetentionStatement = String.format(LEASE_ARBITER_TABLE_RETENTION_STATEMENT, this.leaseArbiterTableName, + retentionPeriodMillis); this.thisTableGetInfoStatement = String.format(GET_EVENT_INFO_STATEMENT, this.leaseArbiterTableName, this.constantsTableName); this.thisTableGetInfoStatementForReminder = String.format(GET_EVENT_INFO_STATEMENT_FOR_REMINDER, @@ -208,6 +202,7 @@ public MysqlMultiActiveLeaseArbiter(Config config) throws IOException { this.thisTableAcquireLeaseIfFinishedStatement = String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_FINISHED_LEASING_STATEMENT, this.leaseArbiterTableName); this.dataSource = MysqlDataSourceFactory.get(config, SharedResourcesBrokerFactory.getImplicitBroker()); + this.dbStatementExecutor = new DBStatementExecutor(this.dataSource, log); String createArbiterStatement = String.format( CREATE_LEASE_ARBITER_TABLE_STATEMENT, leaseArbiterTableName); try (Connection connection = dataSource.getConnection(); @@ -218,17 +213,21 @@ public MysqlMultiActiveLeaseArbiter(Config config) throws IOException { throw new IOException("Table creation failure for " + leaseArbiterTableName, e); } initializeConstantsTable(); - runRetentionOnArbitrationTable(); + + // Periodically deletes all rows in the table with event_timestamp older than the retention period defined by config. 
+ dbStatementExecutor.repeatSqlCommandExecutionAtInterval(thisTableRetentionStatement, 4, TimeUnit.HOURS); + log.info("MysqlMultiActiveLeaseArbiter initialized"); } // Initialize Constants table if needed and insert row into it if one does not exist private void initializeConstantsTable() throws IOException { String createConstantsStatement = String.format(CREATE_CONSTANTS_TABLE_STATEMENT, this.constantsTableName); - withPreparedStatement(createConstantsStatement, createStatement -> createStatement.executeUpdate(), true); + dbStatementExecutor.withPreparedStatement(createConstantsStatement, createStatement -> createStatement.executeUpdate(), + true); String insertConstantsStatement = String.format(UPSERT_CONSTANTS_TABLE_STATEMENT, this.constantsTableName); - withPreparedStatement(insertConstantsStatement, insertStatement -> { + dbStatementExecutor.withPreparedStatement(insertConstantsStatement, insertStatement -> { int i = 0; insertStatement.setInt(++i, epsilonMillis); insertStatement.setInt(++i, lingerMillis); @@ -236,34 +235,6 @@ private void initializeConstantsTable() throws IOException { }, true); } - /** - * Periodically deletes all rows in the table with event_timestamp older than the retention period defined by config. - * // TODO: create a utility to run a SQL commend in a STPE using interval T - */ - private void runRetentionOnArbitrationTable() { - ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1); - Runnable retentionTask = () -> { - try { - withPreparedStatement(thisTableRetentionStatement, - retentionStatement -> { - retentionStatement.setLong(1, retentionPeriodMillis); - int numRowsDeleted = retentionStatement.executeUpdate(); - if (numRowsDeleted != 0) { - log.info("Multi-active lease arbiter retention thread deleted {} rows from the lease arbiter table", - numRowsDeleted); - } - return numRowsDeleted; - }, true); - } catch (IOException e) { - log.error("Failing to run retention on lease arbiter table. 
Unbounded growth can lead to database slowness and " - + "affect our system performance. Examine exception: ", e); - } - }; - - // Run retention thread every 4 hours (6 times a day) - executor.scheduleAtFixedRate(retentionTask, 0, 4, TimeUnit.HOURS); - } - @Override public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, long eventTimeMillis, boolean isReminderEvent) throws IOException { @@ -370,7 +341,7 @@ else if (leaseValidityStatus == 2) { */ protected Optional getExistingEventInfo(DagActionStore.DagAction flowAction, boolean isReminderEvent, long eventTimeMillis) throws IOException { - return withPreparedStatement(isReminderEvent ? thisTableGetInfoStatementForReminder : thisTableGetInfoStatement, + return dbStatementExecutor.withPreparedStatement(isReminderEvent ? thisTableGetInfoStatementForReminder : thisTableGetInfoStatement, getInfoStatement -> { int i = 0; if (isReminderEvent) { @@ -425,7 +396,7 @@ protected GetEventInfoResult createGetInfoResult(ResultSet resultSet) throws IOE protected int attemptLeaseIfNewRow(DagActionStore.DagAction flowAction) throws IOException { String formattedAcquireLeaseNewRowStatement = String.format(ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT, this.leaseArbiterTableName); - return withPreparedStatement(formattedAcquireLeaseNewRowStatement, + return dbStatementExecutor.withPreparedStatement(formattedAcquireLeaseNewRowStatement, insertStatement -> { completeInsertPreparedStatement(insertStatement, flowAction); try { @@ -447,7 +418,7 @@ protected int attemptLeaseIfNewRow(DagActionStore.DagAction flowAction) throws I protected int attemptLeaseIfExistingRow(String acquireLeaseStatement, DagActionStore.DagAction flowAction, boolean needEventTimeCheck, boolean needLeaseAcquisition, Timestamp dbEventTimestamp, Timestamp dbLeaseAcquisitionTimestamp) throws IOException { - return withPreparedStatement(acquireLeaseStatement, + return dbStatementExecutor.withPreparedStatement(acquireLeaseStatement, insertStatement -> { 
completeUpdatePreparedStatement(insertStatement, flowAction, needEventTimeCheck, needLeaseAcquisition, dbEventTimestamp, dbLeaseAcquisitionTimestamp); @@ -460,7 +431,7 @@ protected int attemptLeaseIfExistingRow(String acquireLeaseStatement, DagActionS * was successful or not. */ protected SelectInfoResult getRowInfo(DagActionStore.DagAction flowAction) throws IOException { - return withPreparedStatement(thisTableSelectAfterInsertStatement, + return dbStatementExecutor.withPreparedStatement(thisTableSelectAfterInsertStatement, selectStatement -> { completeWhereClauseMatchingKeyPreparedStatement(selectStatement, flowAction); ResultSet resultSet = selectStatement.executeQuery(); @@ -596,7 +567,7 @@ public boolean recordLeaseSuccess(LeaseObtainedStatus status) String flowGroup = flowAction.getFlowGroup(); String flowName = flowAction.getFlowName(); DagActionStore.FlowActionType flowActionType = flowAction.getFlowActionType(); - return withPreparedStatement(String.format(CONDITIONALLY_COMPLETE_LEASE_STATEMENT, leaseArbiterTableName), + return dbStatementExecutor.withPreparedStatement(String.format(CONDITIONALLY_COMPLETE_LEASE_STATEMENT, leaseArbiterTableName), updateStatement -> { int i = 0; updateStatement.setString(++i, flowGroup); @@ -621,25 +592,6 @@ public boolean recordLeaseSuccess(LeaseObtainedStatus status) }, true); } - /** Abstracts recurring pattern around resource management and exception re-mapping. */ - protected T withPreparedStatement(String sql, CheckedFunction f, boolean shouldCommit) - throws IOException { - try (Connection connection = this.dataSource.getConnection(); - PreparedStatement statement = connection.prepareStatement(sql)) { - T result = f.apply(statement); - if (shouldCommit) { - connection.commit(); - } - statement.close(); - return result; - } catch (SQLException e) { - log.warn("Received SQL exception that can result from invalid connection. 
Checking if validation query is set {} " - + "Exception is {}", ((HikariDataSource) this.dataSource).getConnectionTestQuery(), e); - throw new IOException(e); - } - } - - /** * DTO for arbiter's current lease state for a FlowActionEvent */ diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java index 4f639e04a4f..894d0a3004c 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java @@ -28,6 +28,7 @@ import com.google.inject.Inject; import com.typesafe.config.Config; +import java.util.concurrent.TimeUnit; import javax.sql.DataSource; import lombok.extern.slf4j.Slf4j; @@ -38,6 +39,8 @@ import org.apache.gobblin.service.ServiceConfigKeys; import org.apache.gobblin.util.ConfigUtils; import org.apache.gobblin.util.ExponentialBackoff; +import org.apache.gobblin.util.DBStatementExecutor; + @Slf4j public class MysqlDagActionStore implements DagActionStore { @@ -45,7 +48,10 @@ public class MysqlDagActionStore implements DagActionStore { public static final String CONFIG_PREFIX = "MysqlDagActionStore"; protected final DataSource dataSource; + private final DBStatementExecutor dbStatementExecutor; private final String tableName; + private final long retentionPeriodSeconds; + private String thisTableRetentionStatement; private static final String EXISTS_STATEMENT = "SELECT EXISTS(SELECT * FROM %s WHERE flow_group = ? AND flow_name =? AND flow_execution_id = ? 
AND dag_action = ?)"; protected static final String INSERT_STATEMENT = "INSERT INTO %s (flow_group, flow_name, flow_execution_id, dag_action) " @@ -58,6 +64,8 @@ public class MysqlDagActionStore implements DagActionStore { + "flow_execution_id varchar(" + ServiceConfigKeys.MAX_FLOW_EXECUTION_ID_LENGTH + ") NOT NULL, " + "dag_action varchar(100) NOT NULL, modified_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP on update CURRENT_TIMESTAMP NOT NULL, " + "PRIMARY KEY (flow_group,flow_name,flow_execution_id, dag_action))"; + // Deletes rows older than retention time period (in seconds) to prevent this table from growing unbounded. + private static final String RETENTION_STATEMENT = "DELETE FROM %s WHERE modified_time < DATE_SUB(CURRENT_TIMESTAMP, INTERVAL %s SECOND)"; private final int getDagActionMaxRetries; @@ -71,7 +79,8 @@ public MysqlDagActionStore(Config config) throws IOException { this.tableName = ConfigUtils.getString(config, ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, ConfigurationKeys.DEFAULT_STATE_STORE_DB_TABLE); this.getDagActionMaxRetries = ConfigUtils.getInt(config, ConfigurationKeys.MYSQL_GET_MAX_RETRIES, ConfigurationKeys.DEFAULT_MYSQL_GET_MAX_RETRIES); - + this.retentionPeriodSeconds = ConfigUtils.getLong(config, ConfigurationKeys.MYSQL_DAG_ACTION_STORE_TABLE_RETENTION_PERIOD_SECONDS_KEY, + ConfigurationKeys.DEFAULT_MYSQL_DAG_ACTION_STORE_TABLE_RETENTION_PERIOD_SEC_KEY); this.dataSource = MysqlDataSourceFactory.get(config, SharedResourcesBrokerFactory.getImplicitBroker()); try (Connection connection = dataSource.getConnection(); @@ -81,116 +90,108 @@ public MysqlDagActionStore(Config config) throws IOException { } catch (SQLException e) { throw new IOException("Failure creation table " + tableName, e); } + this.dbStatementExecutor = new DBStatementExecutor(this.dataSource, log); + this.thisTableRetentionStatement = String.format(RETENTION_STATEMENT, this.tableName, retentionPeriodSeconds); + // Periodically deletes all rows in the table last_modified before 
the retention period defined by config. + dbStatementExecutor.repeatSqlCommandExecutionAtInterval(thisTableRetentionStatement, 6, TimeUnit.HOURS); } @Override public boolean exists(String flowGroup, String flowName, String flowExecutionId, FlowActionType flowActionType) throws IOException, SQLException { - ResultSet rs = null; - try (Connection connection = this.dataSource.getConnection(); - PreparedStatement existStatement = connection.prepareStatement(String.format(EXISTS_STATEMENT, tableName))) { + return dbStatementExecutor.withPreparedStatement(String.format(EXISTS_STATEMENT, tableName), existStatement -> { int i = 0; existStatement.setString(++i, flowGroup); existStatement.setString(++i, flowName); existStatement.setString(++i, flowExecutionId); existStatement.setString(++i, flowActionType.toString()); - rs = existStatement.executeQuery(); - rs.next(); - return rs.getBoolean(1); - } catch (SQLException e) { - throw new IOException(String.format("Failure checking existence of DagAction: %s in table %s", - new DagAction(flowGroup, flowName, flowExecutionId, flowActionType), tableName), e); - } finally { - if (rs != null) { - rs.close(); + ResultSet rs = null; + try { + rs = existStatement.executeQuery(); + rs.next(); + return rs.getBoolean(1); + } catch (SQLException e) { + throw new IOException(String.format("Failure checking existence of DagAction: %s in table %s", + new DagAction(flowGroup, flowName, flowExecutionId, flowActionType), tableName), e); + } finally { + if (rs != null) { + rs.close(); + } } - } + }, true); } @Override public void addDagAction(String flowGroup, String flowName, String flowExecutionId, FlowActionType flowActionType) throws IOException { - try (Connection connection = this.dataSource.getConnection(); - PreparedStatement insertStatement = connection.prepareStatement(String.format(INSERT_STATEMENT, tableName))) { + dbStatementExecutor.withPreparedStatement(String.format(INSERT_STATEMENT, tableName), insertStatement -> { + try { int i 
= 0; insertStatement.setString(++i, flowGroup); insertStatement.setString(++i, flowName); insertStatement.setString(++i, flowExecutionId); insertStatement.setString(++i, flowActionType.toString()); - insertStatement.executeUpdate(); - connection.commit(); + return insertStatement.executeUpdate(); } catch (SQLException e) { throw new IOException(String.format("Failure adding action for DagAction: %s in table %s", new DagAction(flowGroup, flowName, flowExecutionId, flowActionType), tableName), e); - } + }}, true); } @Override public boolean deleteDagAction(DagAction dagAction) throws IOException { - try (Connection connection = this.dataSource.getConnection(); - PreparedStatement deleteStatement = connection.prepareStatement(String.format(DELETE_STATEMENT, tableName))) { + return dbStatementExecutor.withPreparedStatement(String.format(DELETE_STATEMENT, tableName), deleteStatement -> { + try { int i = 0; deleteStatement.setString(++i, dagAction.getFlowGroup()); deleteStatement.setString(++i, dagAction.getFlowName()); deleteStatement.setString(++i, dagAction.getFlowExecutionId()); deleteStatement.setString(++i, dagAction.getFlowActionType().toString()); int result = deleteStatement.executeUpdate(); - connection.commit(); return result != 0; } catch (SQLException e) { throw new IOException(String.format("Failure deleting action for DagAction: %s in table %s", dagAction, tableName), e); - } + }}, true); } // TODO: later change this to getDagActions relating to a particular flow execution if it makes sense private DagAction getDagActionWithRetry(String flowGroup, String flowName, String flowExecutionId, FlowActionType flowActionType, ExponentialBackoff exponentialBackoff) throws IOException, SQLException { - ResultSet rs = null; - try (Connection connection = this.dataSource.getConnection(); - PreparedStatement getStatement = connection.prepareStatement(String.format(GET_STATEMENT, tableName))) { + return 
dbStatementExecutor.withPreparedStatement(String.format(GET_STATEMENT, tableName), getStatement -> { int i = 0; getStatement.setString(++i, flowGroup); getStatement.setString(++i, flowName); getStatement.setString(++i, flowExecutionId); getStatement.setString(++i, flowActionType.toString()); - rs = getStatement.executeQuery(); - if (rs.next()) { - return new DagAction(rs.getString(1), rs.getString(2), rs.getString(3), FlowActionType.valueOf(rs.getString(4))); - } else { - if (exponentialBackoff.awaitNextRetryIfAvailable()) { + try (ResultSet rs = getStatement.executeQuery()) { + if (rs.next()) { + return new DagAction(rs.getString(1), rs.getString(2), rs.getString(3), FlowActionType.valueOf(rs.getString(4))); + } else if (exponentialBackoff.awaitNextRetryIfAvailable()) { return getDagActionWithRetry(flowGroup, flowName, flowExecutionId, flowActionType, exponentialBackoff); } else { log.warn(String.format("Can not find dag action: %s with flowGroup: %s, flowName: %s, flowExecutionId: %s", flowActionType, flowGroup, flowName, flowExecutionId)); return null; } + } catch (SQLException | InterruptedException e) { + throw new IOException(String.format("Failure get %s from table %s", + new DagAction(flowGroup, flowName, flowExecutionId, flowActionType), tableName), e); } - } catch (SQLException | InterruptedException e) { - throw new IOException(String.format("Failure get %s from table %s", new DagAction(flowGroup, flowName, flowExecutionId, - flowActionType), tableName), e); - } finally { - if (rs != null) { - rs.close(); - } - } + }, true); } @Override public Collection getDagActions() throws IOException { - HashSet result = new HashSet<>(); - try (Connection connection = this.dataSource.getConnection(); - PreparedStatement getAllStatement = connection.prepareStatement(String.format(GET_ALL_STATEMENT, tableName)); - ResultSet rs = getAllStatement.executeQuery()) { - while (rs.next()) { - result.add( - new DagAction(rs.getString(1), rs.getString(2), rs.getString(3), 
FlowActionType.valueOf(rs.getString(4)))); - } - if (rs != null) { - rs.close(); + return dbStatementExecutor.withPreparedStatement(String.format(GET_ALL_STATEMENT, tableName), getAllStatement -> { + HashSet result = new HashSet<>(); + try (ResultSet rs = getAllStatement.executeQuery()) { + while (rs.next()) { + result.add(new DagAction(rs.getString(1), rs.getString(2), rs.getString(3), FlowActionType.valueOf(rs.getString(4)))); + } + return result; + } catch (SQLException e) { + throw new IOException(String.format("Failure get dag actions from table %s ", tableName), e); } - return result; - } catch (SQLException e) { - throw new IOException(String.format("Failure get dag actions from table %s ", tableName), e); - } + }, true); } } diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlBaseSpecStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlBaseSpecStore.java index 713c2b9d8a9..595ba89d79f 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlBaseSpecStore.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlBaseSpecStore.java @@ -352,6 +352,7 @@ public Optional getSpecStoreURI() { return Optional.of(this.specStoreURI); } + // TODO: migrate this class to use common util {@link DBStatementExecutor} /** Abstracts recurring pattern around resource management and exception re-mapping. 
*/ protected T withPreparedStatement(String sql, CheckedFunction f, boolean shouldCommit) throws IOException { try (Connection connection = this.dataSource.getConnection(); diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/util/DBStatementExecutor.java b/gobblin-runtime/src/main/java/org/apache/gobblin/util/DBStatementExecutor.java new file mode 100644 index 00000000000..1f554779c2c --- /dev/null +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/util/DBStatementExecutor.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.util; + +import com.zaxxer.hikari.HikariDataSource; +import java.io.Closeable; +import java.io.IOException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import javax.sql.DataSource; +import org.slf4j.Logger; + + +/** + * Many database stores require common functionality that can be stored in a utility class. The functionality + * includes executing prepared statements on a data source object and SQL queries at fixed intervals. 
+ * The caller of the class MUST maintain ownership of the {@link DataSource} and close this instance when the + * {@link DataSource} is about to be closed as well. Both are to be done only once this instance will no longer be used. + */ +public class DBStatementExecutor implements Closeable { + private final DataSource dataSource; + private final Logger log; + private final ArrayList scheduledExecutors; + + public DBStatementExecutor(DataSource dataSource, Logger log) { + this.dataSource = dataSource; + this.log = log; + this.scheduledExecutors = new ArrayList<>(); + } + + /** `j.u.Function` variant for an operation that may @throw IOException or SQLException: preserves method signature checked exceptions */ + @FunctionalInterface + public interface CheckedFunction { + R apply(T t) throws IOException, SQLException; + } + + /** Abstracts recurring pattern around resource management and exception re-mapping. */ + public T withPreparedStatement(String sql, CheckedFunction f, boolean shouldCommit) + throws IOException { + try (Connection connection = dataSource.getConnection(); + PreparedStatement statement = connection.prepareStatement(sql)) { + T result = f.apply(statement); + if (shouldCommit) { + connection.commit(); + } + statement.close(); + return result; + } catch (SQLException e) { + log.warn("Received SQL exception that can result from invalid connection. Checking if validation query is set {} " + + "Exception is {}", ((HikariDataSource) dataSource).getConnectionTestQuery(), e); + throw new IOException(e); + } + } + + /** + * Repeats execution of a SQL command at a fixed interval while the service is running. The first execution of the + * command is immediate. 
+ * @param sqlCommand SQL string + * @param interval frequency with which command will run + * @param timeUnit unit of time for interval + */ + public void repeatSqlCommandExecutionAtInterval(String sqlCommand, long interval, TimeUnit timeUnit) { + ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1); + Runnable task = () -> { + try { + withPreparedStatement(sqlCommand, + preparedStatement -> { + int numRowsAffected = preparedStatement.executeUpdate(); + if (numRowsAffected != 0) { + log.info("{} rows affected by SQL command: {}", numRowsAffected, sqlCommand); + } + return numRowsAffected; + }, true); + } catch (IOException e) { + log.error("Failed to execute SQL command: {}", sqlCommand, e); + } + }; + executor.scheduleAtFixedRate(task, 0, interval, timeUnit); + this.scheduledExecutors.add(executor); + } + + /** + * Call before closing the data source object associated with this instance to also shut down any executors expecting + * to be run on the data source. + */ + @Override + public void close() { + for (ScheduledThreadPoolExecutor executor : this.scheduledExecutors) { + executor.shutdownNow(); + } + } +}