Skip to content

Commit

Permalink
make configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
StrikeW committed Sep 6, 2024
1 parent db5d8c2 commit 1151b72
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,13 @@ public JDBCSink(JDBCSinkConfig config, TableSchema tableSchema) {
.collect(Collectors.toList());

LOG.info(
"schema = {}, table = {}, tableSchema = {}, columnSqlTypes = {}, pkIndices = {}",
"schema = {}, table = {}, tableSchema = {}, columnSqlTypes = {}, pkIndices = {}, queryTimeout = {}",
config.getSchemaName(),
config.getTableName(),
tableSchema,
columnSqlTypes,
pkIndices);
pkIndices,
config.getQueryTimeout());

if (factory.isPresent()) {
this.jdbcDialect = factory.get().create(columnSqlTypes, pkIndices);
Expand All @@ -92,7 +93,7 @@ public JDBCSink(JDBCSinkConfig config, TableSchema tableSchema) {
// Commit the `getTransactionIsolation`
conn.commit();

jdbcStatements = new JdbcStatements(conn);
jdbcStatements = new JdbcStatements(conn, config.getQueryTimeout());
} catch (SQLException e) {
throw Status.INTERNAL
.withDescription(
Expand Down Expand Up @@ -173,7 +174,7 @@ public boolean write(Iterable<SinkRow> rows) {
conn = JdbcUtils.getConnection(config.getJdbcUrl());
// reset the flag since we will retry to prepare the batch again
updateFlag = false;
jdbcStatements = new JdbcStatements(conn);
jdbcStatements = new JdbcStatements(conn, config.getQueryTimeout());
} else {
throw io.grpc.Status.INTERNAL
.withDescription(
Expand Down Expand Up @@ -206,14 +207,15 @@ public boolean write(Iterable<SinkRow> rows) {
* across multiple batches if only the JDBC connection is valid.
*/
class JdbcStatements implements AutoCloseable {
static final int QUERY_TIMEOUT_SEC = 600; // 10 minutes
private final int queryTimeoutSecs;
private PreparedStatement deleteStatement;
private PreparedStatement upsertStatement;
private PreparedStatement insertStatement;

private final Connection conn;

public JdbcStatements(Connection conn) throws SQLException {
public JdbcStatements(Connection conn, int queryTimeoutSecs) throws SQLException {
this.queryTimeoutSecs = queryTimeoutSecs;
this.conn = conn;
var schemaTableName =
jdbcDialect.createSchemaTableName(
Expand Down Expand Up @@ -342,7 +344,7 @@ private void executeStatement(PreparedStatement stmt) throws SQLException {
}
// if timeout occurs, a SQLTimeoutException will be thrown
// and we will retry to write the stream chunk in `JDBCSink.write`
stmt.setQueryTimeout(QUERY_TIMEOUT_SEC);
stmt.setQueryTimeout(queryTimeoutSecs);
LOG.debug("Executing statement: {}", stmt);
stmt.executeBatch();
stmt.clearParameters();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ public class JDBCSinkConfig extends CommonSinkConfig {
@JsonProperty(value = "schema.name")
private String schemaName;

@JsonProperty(value = "jdbc.query.timeout")
private int queryTimeoutSeconds = 600;

@JsonCreator
public JDBCSinkConfig(
@JsonProperty(value = "jdbc.url") String jdbcUrl,
Expand Down Expand Up @@ -62,4 +65,8 @@ public String getSinkType() {
public boolean isUpsertSink() {
return this.isUpsertSink;
}

public int getQueryTimeout() {
return queryTimeoutSeconds;
}
}

0 comments on commit 1151b72

Please sign in to comment.