diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/JniSinkWriterResponseObserver.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/JniSinkWriterResponseObserver.java index 296592fbd784c..1f1c68f54551e 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/JniSinkWriterResponseObserver.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/JniSinkWriterResponseObserver.java @@ -41,11 +41,12 @@ public void onNext(ConnectorServiceProto.SinkWriterStreamResponse response) { @Override public void onError(Throwable throwable) { - if (!Binding.sendSinkWriterErrorToChannel(this.responseTxPtr, throwable.getMessage())) { + LOG.error("JniSinkWriterHandler onError: ", throwable); + var errMsg = throwable.getMessage() == null ? "unknown error" : throwable.getMessage(); + if (!Binding.sendSinkWriterErrorToChannel(this.responseTxPtr, errMsg)) { LOG.warn("unable to send error: {}", throwable.getMessage()); } this.success = false; - LOG.error("JniSinkWriterHandler onError: ", throwable); } @Override diff --git a/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSink.java b/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSink.java index 52213b224c151..dfdc01f4a36ed 100644 --- a/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSink.java +++ b/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSink.java @@ -64,7 +64,11 @@ public JDBCSink(JDBCSinkConfig config, TableSchema tableSchema) { var columnName = tableSchema.getColumnNames()[columnIdx]; columnSqlTypes[columnIdx] = columnTypeMapping.get(columnName); } - LOG.info("columnSqlTypes: {}", Arrays.toString(columnSqlTypes)); + LOG.info( + "schema = {}, table = {}: columnSqlTypes = {}", + config.getSchemaName(), + config.getTableName(), + Arrays.toString(columnSqlTypes)); if (factory.isPresent()) { this.jdbcDialect = factory.get().create(columnSqlTypes); @@ -105,7 +109,11 @@ private static Map getColumnTypeMapping( String.format(ERROR_REPORT_TEMPLATE, e.getSQLState(), e.getMessage())) .asRuntimeException(); } - LOG.info("detected column type mapping {}", columnTypeMap); + LOG.info( + "schema = {}, table = {}: detected column type mapping = {}", + schemaName, + tableName, + columnTypeMap); return columnTypeMap; } @@ -123,7 +131,11 @@ private static List getPkColumnNames( String.format(ERROR_REPORT_TEMPLATE, e.getSQLState(), e.getMessage())) .asRuntimeException(); } - LOG.info("detected pk column {}", pkColumnNames); + LOG.info( + "schema = {}, table = {}: detected pk column = {}", + schemaName, + tableName, + pkColumnNames); return pkColumnNames; }