From 548e68a88e2416f488cc93b58fe76c188aa6c9d2 Mon Sep 17 00:00:00 2001 From: ClownXC Date: Sun, 12 May 2024 09:21:38 +0800 Subject: [PATCH] reuse unsupportedDataType --- .../common/exception/CommonError.java | 3 +- .../common/exception/CommonErrorCode.java | 2 +- .../seatunnel/paimon/utils/RowConverter.java | 13 +++--- .../paimon/utils/RowKindConverter.java | 11 ++--- .../paimon/utils/RowTypeConverter.java | 45 ++++++++++++------- 5 files changed, 44 insertions(+), 30 deletions(-) diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java index 70b0bc95e61..73a0aedf2de 100644 --- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java +++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java @@ -219,9 +219,10 @@ public static SeaTunnelRuntimeException unsupportedArrayGenericType( return new SeaTunnelRuntimeException(UNSUPPORTED_ARRAY_GENERIC_TYPE, params); } - public static SeaTunnelRuntimeException unsupportedRowKind(String identifier, String rowKind) { + public static SeaTunnelRuntimeException unsupportedRowKind(String identifier, String tableId, String rowKind) { Map params = new HashMap<>(); params.put("identifier", identifier); + params.put("table", tableId); params.put("rowKind", rowKind); return new SeaTunnelRuntimeException(UNSUPPORTED_ROW_KIND, params); } diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java index 015939eef5b..c91188aa43a 100644 --- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java +++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java @@ -55,7 +55,7 @@ public enum CommonErrorCode implements SeaTunnelErrorCode { UNSUPPORTED_DATA_TYPE_SIMPLE("COMMON-25", "'' unsupported data type ''"), UNSUPPORTED_ARRAY_GENERIC_TYPE( "COMMON-26", "'' array type not support genericType ''"), - UNSUPPORTED_ROW_KIND("COMMON-27", "'' unsupported rowKind type ''"); + UNSUPPORTED_ROW_KIND("COMMON-27", "'' table '' not support rowKind ''"); private final String code; private final String description; diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java index 3efe5de9e23..7ccb17aa3d7 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java @@ -340,7 +340,8 @@ public static InternalRow reconvert( BinaryWriter binaryWriter = new BinaryRowWriter(binaryRow); // Convert SeaTunnel RowKind to Paimon RowKind org.apache.paimon.types.RowKind rowKind = - RowKindConverter.convertSeaTunnelRowKind2PaimonRowKind(seaTunnelRow.getRowKind()); + RowKindConverter.convertSeaTunnelRowKind2PaimonRowKind(seaTunnelRow.getTableId(), + seaTunnelRow.getRowKind()); binaryRow.setRowKind(rowKind); SeaTunnelDataType[] fieldTypes = seaTunnelRowType.getFieldTypes(); for (int i = 0; i < fieldTypes.length; i++) { @@ -349,6 +350,7 @@ public static InternalRow reconvert( binaryWriter.setNullAt(i); continue; } + String fieldName = seaTunnelRowType.getFieldName(i); switch (fieldTypes[i].getSqlType()) { case TINYINT: binaryWriter.writeByte(i, (Byte) seaTunnelRow.getField(i)); @@ -394,7 +396,6 @@ public static InternalRow reconvert( .setValue(binaryWriter, i, DateTimeUtils.toInternal(date)); break; case TIMESTAMP: - String fieldName = seaTunnelRowType.getFieldName(i); DataField dataField = SchemaUtil.getDataField(fields, fieldName); int precision = ((TimestampType) dataField.type()).getPrecision(); LocalDateTime datetime = (LocalDateTime) seaTunnelRow.getField(i); @@ -405,8 +406,8 @@ public static InternalRow reconvert( MapType mapType = (MapType) seaTunnelRowType.getFieldType(i); SeaTunnelDataType keyType = mapType.getKeyType(); SeaTunnelDataType valueType = mapType.getValueType(); - DataType paimonKeyType = RowTypeConverter.reconvert(keyType); - DataType paimonValueType = RowTypeConverter.reconvert(valueType); + DataType paimonKeyType = RowTypeConverter.reconvert(fieldName, keyType); + DataType paimonValueType = RowTypeConverter.reconvert(fieldName, valueType); Map field = (Map) seaTunnelRow.getField(i); Object[] keys = field.keySet().toArray(new Object[0]); Object[] values = field.values().toArray(new Object[0]); @@ -424,7 +425,7 @@ public static InternalRow reconvert( i, paimonArray, new InternalArraySerializer( - RowTypeConverter.reconvert(arrayType.getElementType()))); + RowTypeConverter.reconvert(fieldName, arrayType.getElementType()))); break; case ROW: SeaTunnelDataType rowType = seaTunnelRowType.getFieldType(i); @@ -438,7 +439,7 @@ public static InternalRow reconvert( default: throw CommonError.unsupportedDataType( PaimonConfig.CONNECTOR_IDENTITY, - seaTunnelRowType.getFieldType(i).getSqlType().toString()); + seaTunnelRowType.getFieldType(i).getSqlType().toString(), fieldName); } } return binaryRow; diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowKindConverter.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowKindConverter.java index b219699bde6..a3fc356d6ff 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowKindConverter.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowKindConverter.java @@ -27,13 +27,14 @@ public class RowKindConverter { /** * Convert SeaTunnel RowKind {@link RowKind} to Paimon RowKind {@link InternalRow} - * - * @param seaTunnelRowInd + * @param tableId Table identifier + * @param seaTunnelRowKind The kind of change that a row describes in a changelog. * @return */ public static org.apache.paimon.types.RowKind convertSeaTunnelRowKind2PaimonRowKind( - RowKind seaTunnelRowInd) { - switch (seaTunnelRowInd) { + String tableId, + RowKind seaTunnelRowKind) { + switch (seaTunnelRowKind) { case DELETE: return org.apache.paimon.types.RowKind.DELETE; case UPDATE_AFTER: @@ -44,7 +45,7 @@ public static org.apache.paimon.types.RowKind convertSeaTunnelRowKind2PaimonRowK return org.apache.paimon.types.RowKind.INSERT; default: throw CommonError.unsupportedRowKind( - PaimonConfig.CONNECTOR_IDENTITY, seaTunnelRowInd.shortString()); + PaimonConfig.CONNECTOR_IDENTITY, seaTunnelRowKind.shortString(), tableId); } } } diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverter.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverter.java index de391f9d338..003127f8101 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverter.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverter.java @@ -175,7 +175,7 @@ public static Column convert(BasicTypeDefine typeDefine) { break; default: throw CommonError.unsupportedDataType( - PaimonConfig.CONNECTOR_IDENTITY, dataType.asSQLString()); + PaimonConfig.CONNECTOR_IDENTITY, dataType.asSQLString(), typeDefine.getName()); } return physicalColumnBuilder.dataType(seaTunnelDataType).build(); } @@ -188,11 +188,18 @@ public static Column convert(BasicTypeDefine typeDefine) { */ public static RowType reconvert(SeaTunnelRowType seaTunnelRowType, TableSchema tableSchema) { SeaTunnelDataType[] fieldTypes = seaTunnelRowType.getFieldTypes(); + String[] fieldNames = seaTunnelRowType.getFieldNames(); + int totalFields = seaTunnelRowType.getTotalFields(); + DataType[] dataTypes = new DataType[totalFields]; + for (int i = 0; i < totalFields; i++) { + DataType dataType = SeaTunnelTypeToPaimonVisitor.INSTANCE.visit(fieldNames[i], fieldTypes[i]); + dataTypes[i] = dataType; + } List fields = tableSchema.fields(); - DataType[] dataTypes = - Arrays.stream(fieldTypes) - .map(SeaTunnelTypeToPaimonVisitor.INSTANCE::visit) - .toArray(DataType[]::new); +// DataType[] dataTypes = +// Arrays.stream(fieldTypes) +// .map(SeaTunnelTypeToPaimonVisitor.INSTANCE::visit) +// .toArray(DataType[]::new); DataField[] dataFields = new DataField[dataTypes.length]; for (int i = 0; i < dataTypes.length; i++) { DataType dataType = dataTypes[i]; @@ -221,12 +228,12 @@ public static BasicTypeDefine reconvert(Column column) { /** * Mapping SeaTunnel data type {@link SeaTunnelDataType} to Paimon data type {@link DataType} - * + * @param fieldName SeaTunnel field name * @param dataType SeaTunnel data type {@link SeaTunnelDataType} * @return Paimon data type {@link DataType} */ - public static DataType reconvert(SeaTunnelDataType dataType) { - return SeaTunnelTypeToPaimonVisitor.INSTANCE.visit(dataType); + public static DataType reconvert(String fieldName, SeaTunnelDataType dataType) { + return SeaTunnelTypeToPaimonVisitor.INSTANCE.visit(fieldName, dataType); } /** @@ -282,7 +289,7 @@ public BasicTypeDefine visit(Column column) { builder.length(column.getColumnLength()); return builder.build(); default: - builder.nativeType(visit(dataType)); + builder.nativeType(visit(column.getName(), dataType)); builder.columnType(dataType.toString()); builder.length(column.getColumnLength()); builder.dataType(dataType.getSqlType().name()); @@ -290,7 +297,7 @@ public BasicTypeDefine visit(Column column) { } } - public DataType visit(SeaTunnelDataType dataType) { + public DataType visit(String fieldName, SeaTunnelDataType dataType) { switch (dataType.getSqlType()) { case TINYINT: return DataTypes.TINYINT(); @@ -329,21 +336,25 @@ public DataType visit(SeaTunnelDataType dataType) { SeaTunnelDataType valueType = ((org.apache.seatunnel.api.table.type.MapType) dataType) .getValueType(); - return DataTypes.MAP(visit(keyType), visit(valueType)); + return DataTypes.MAP(visit(fieldName, keyType), visit(fieldName, valueType)); case ARRAY: BasicType elementType = ((org.apache.seatunnel.api.table.type.ArrayType) dataType) .getElementType(); - return DataTypes.ARRAY(visit(elementType)); + return DataTypes.ARRAY(visit(fieldName, elementType)); case ROW: - SeaTunnelDataType[] fieldTypes = - ((SeaTunnelRowType) dataType).getFieldTypes(); - DataType[] dataTypes = - Arrays.stream(fieldTypes).map(this::visit).toArray(DataType[]::new); + SeaTunnelRowType row = (SeaTunnelRowType) dataType; + SeaTunnelDataType[] fieldTypes = row.getFieldTypes(); + String[] fieldNames = row.getFieldNames(); + int totalFields = row.getTotalFields(); + DataType[] dataTypes = new DataType[totalFields]; + for (int i = 0; i < totalFields; i++) { + dataTypes[i] = SeaTunnelTypeToPaimonVisitor.INSTANCE.visit(fieldNames[i], fieldTypes[i]); + } return DataTypes.ROW(dataTypes); default: throw CommonError.unsupportedDataType( - PaimonConfig.CONNECTOR_IDENTITY, dataType.getSqlType().toString()); + PaimonConfig.CONNECTOR_IDENTITY, dataType.getSqlType().toString(), fieldName); } } }