diff --git a/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSink.java b/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSink.java index 10aa371c50aec..02297a4ea57dd 100644 --- a/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSink.java +++ b/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSink.java @@ -71,12 +71,13 @@ public JDBCSink(JDBCSinkConfig config, TableSchema tableSchema) { .collect(Collectors.toList()); LOG.info( - "schema = {}, table = {}, tableSchema = {}, columnSqlTypes = {}, pkIndices = {}", + "schema = {}, table = {}, tableSchema = {}, columnSqlTypes = {}, pkIndices = {}, queryTimeout = {}", config.getSchemaName(), config.getTableName(), tableSchema, columnSqlTypes, - pkIndices); + pkIndices, + config.getQueryTimeout()); if (factory.isPresent()) { this.jdbcDialect = factory.get().create(columnSqlTypes, pkIndices); @@ -92,7 +93,7 @@ public JDBCSink(JDBCSinkConfig config, TableSchema tableSchema) { // Commit the `getTransactionIsolation` conn.commit(); - jdbcStatements = new JdbcStatements(conn); + jdbcStatements = new JdbcStatements(conn, config.getQueryTimeout()); } catch (SQLException e) { throw Status.INTERNAL .withDescription( @@ -173,7 +174,7 @@ public boolean write(Iterable rows) { conn = JdbcUtils.getConnection(config.getJdbcUrl()); // reset the flag since we will retry to prepare the batch again updateFlag = false; - jdbcStatements = new JdbcStatements(conn); + jdbcStatements = new JdbcStatements(conn, config.getQueryTimeout()); } else { throw io.grpc.Status.INTERNAL .withDescription( @@ -206,13 +207,15 @@ public boolean write(Iterable rows) { * across multiple batches if only the JDBC connection is valid. */ class JdbcStatements implements AutoCloseable { + private final int queryTimeoutSecs; private PreparedStatement deleteStatement; private PreparedStatement upsertStatement; private PreparedStatement insertStatement; private final Connection conn; - public JdbcStatements(Connection conn) throws SQLException { + public JdbcStatements(Connection conn, int queryTimeoutSecs) throws SQLException { + this.queryTimeoutSecs = queryTimeoutSecs; this.conn = conn; var schemaTableName = jdbcDialect.createSchemaTableName( @@ -339,6 +342,9 @@ private void executeStatement(PreparedStatement stmt) throws SQLException { if (stmt == null) { return; } + // if timeout occurs, a SQLTimeoutException will be thrown + // and we will retry to write the stream chunk in `JDBCSink.write` + stmt.setQueryTimeout(queryTimeoutSecs); LOG.debug("Executing statement: {}", stmt); stmt.executeBatch(); stmt.clearParameters(); diff --git a/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSinkConfig.java b/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSinkConfig.java index ca74ac6a8eb74..94eb5cdc7e0ff 100644 --- a/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSinkConfig.java +++ b/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSinkConfig.java @@ -32,6 +32,9 @@ public class JDBCSinkConfig extends CommonSinkConfig { @JsonProperty(value = "schema.name") private String schemaName; + @JsonProperty(value = "jdbc.query.timeout") + private int queryTimeoutSeconds = 600; + @JsonCreator public JDBCSinkConfig( @JsonProperty(value = "jdbc.url") String jdbcUrl, @@ -62,4 +65,8 @@ public String getSinkType() { public boolean isUpsertSink() { return this.isUpsertSink; } + + public int getQueryTimeout() { + return queryTimeoutSeconds; + } }