From e488a639328b0631fee8a432b74aea3927de2620 Mon Sep 17 00:00:00 2001 From: hunterliu Date: Mon, 25 Nov 2024 14:20:28 +0800 Subject: [PATCH] feat(flink-cdc): improve Starrocks unsupported field error return value --- .../starrocks/sink/EventRecordSerializationSchema.java | 2 +- .../flink/cdc/connectors/starrocks/sink/StarRocksUtils.java | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/EventRecordSerializationSchema.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/EventRecordSerializationSchema.java index 76b55aafc98..2bc9d5658e8 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/EventRecordSerializationSchema.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/EventRecordSerializationSchema.java @@ -98,7 +98,7 @@ private void applySchemaChangeEvent(SchemaChangeEvent event) { for (int i = 0; i < newSchema.getColumnCount(); i++) { tableInfo.fieldGetters[i] = StarRocksUtils.createFieldGetter( - newSchema.getColumns().get(i).getType(), i, zoneId); + newSchema.getColumns().get(i).getType(), i, zoneId, tableId); } tableInfoMap.put(tableId, tableInfo); } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java index ccab99c7a36..6f8b3c08704 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java @@ -132,7 +132,7 @@ public static void toStarRocksDataType( * */ public static RecordData.FieldGetter createFieldGetter( - DataType fieldType, int fieldPos, ZoneId zoneId) { + DataType fieldType, int fieldPos, ZoneId zoneId, TableId tableId) { final RecordData.FieldGetter fieldGetter; // ordered by type root definition switch (fieldType.getTypeRoot()) { @@ -195,7 +195,9 @@ fieldPos, getPrecision(fieldType)) break; default: throw new UnsupportedOperationException( - "Don't support data type " + fieldType.getTypeRoot()); + String.format( + "Don't support data type %s in table: %s, field pos: %d", + fieldType.getTypeRoot(), tableId, fieldPos)); } if (!fieldType.isNullable()) { return fieldGetter;