Skip to content

Commit

Permalink
fix(jdbc-sink): fill exception error message if it is null (#15261)
Browse files Browse the repository at this point in the history
  • Loading branch information
StrikeW authored Feb 26, 2024
1 parent f4da2e8 commit 461f454
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,12 @@ public void onNext(ConnectorServiceProto.SinkWriterStreamResponse response) {

@Override
public void onError(Throwable throwable) {
if (!Binding.sendSinkWriterErrorToChannel(this.responseTxPtr, throwable.getMessage())) {
LOG.error("JniSinkWriterHandler onError: ", throwable);
var errMsg = throwable.getMessage() == null ? "unknown error" : throwable.getMessage();
if (!Binding.sendSinkWriterErrorToChannel(this.responseTxPtr, errMsg)) {
LOG.warn("unable to send error: {}", throwable.getMessage());
}
this.success = false;
LOG.error("JniSinkWriterHandler onError: ", throwable);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,11 @@ public JDBCSink(JDBCSinkConfig config, TableSchema tableSchema) {
var columnName = tableSchema.getColumnNames()[columnIdx];
columnSqlTypes[columnIdx] = columnTypeMapping.get(columnName);
}
LOG.info("columnSqlTypes: {}", Arrays.toString(columnSqlTypes));
LOG.info(
"schema = {}, table = {}: columnSqlTypes = {}",
config.getSchemaName(),
config.getTableName(),
Arrays.toString(columnSqlTypes));

if (factory.isPresent()) {
this.jdbcDialect = factory.get().create(columnSqlTypes);
Expand Down Expand Up @@ -105,7 +109,11 @@ private static Map<String, Integer> getColumnTypeMapping(
String.format(ERROR_REPORT_TEMPLATE, e.getSQLState(), e.getMessage()))
.asRuntimeException();
}
LOG.info("detected column type mapping {}", columnTypeMap);
LOG.info(
"schema = {}, table = {}: detected column type mapping = {}",
schemaName,
tableName,
columnTypeMap);
return columnTypeMap;
}

Expand All @@ -123,7 +131,11 @@ private static List<String> getPkColumnNames(
String.format(ERROR_REPORT_TEMPLATE, e.getSQLState(), e.getMessage()))
.asRuntimeException();
}
LOG.info("detected pk column {}", pkColumnNames);
LOG.info(
"schema = {}, table = {}: detected pk column = {}",
schemaName,
tableName,
pkColumnNames);
return pkColumnNames;
}

Expand Down

0 comments on commit 461f454

Please sign in to comment.