diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java index aab4a0fb814..6fecdae69f9 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java @@ -48,9 +48,9 @@ public class PaimonCatalog implements Catalog, PaimonTable { private static final String DEFAULT_DATABASE = "default"; - private String catalogName; - private PaimonSinkConfig paimonSinkConfig; - private PaimonCatalogLoader paimonCatalogLoader; + private final String catalogName; + private final PaimonSinkConfig paimonSinkConfig; + private final PaimonCatalogLoader paimonCatalogLoader; private org.apache.paimon.catalog.Catalog catalog; public PaimonCatalog(String catalogName, PaimonSinkConfig paimonSinkConfig) { diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java index cb45edac739..9368649f313 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java @@ -23,6 +23,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.exception.CommonErrorCode; import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated; import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException; @@ -320,7 +321,7 @@ public static SeaTunnelRow convert(InternalRow rowData, SeaTunnelRowType seaTunn break; default: throw new PaimonConnectorException( - CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE, + CommonErrorCode.UNSUPPORTED_DATA_TYPE, "SeaTunnel does not support this type"); } } diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverter.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverter.java index fb845440c3f..bd5c2233c73 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverter.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverter.java @@ -94,6 +94,7 @@ public static Column convert(BasicTypeDefine typeDefine) { .nullable(typeDefine.isNullable()) .defaultValue(typeDefine.getDefaultValue()) .comment(typeDefine.getComment()); + DataType dataType = typeDefine.getNativeType(); SeaTunnelDataType seaTunnelDataType; PaimonToSeaTunnelTypeVisitor paimonToSeaTunnelTypeVisitor = @@ -258,16 +259,37 @@ public BasicTypeDefine visit(Column column) { Objects.isNull(scale) ? TimestampType.DEFAULT_PRECISION : scale; TimestampType timestampType = DataTypes.TIMESTAMP(timestampScale); builder.nativeType(timestampType); + builder.dataType(timestampType.getTypeRoot().name()); + builder.columnType(timestampType.toString()); builder.scale(timestampScale); + builder.length(column.getColumnLength()); return builder.build(); case TIME: int timeScale = Objects.isNull(scale) ? TimeType.DEFAULT_PRECISION : scale; TimeType timeType = DataTypes.TIME(timeScale); builder.nativeType(timeType); + builder.columnType(timeType.toString()); + builder.dataType(timeType.getTypeRoot().name()); builder.scale(timeScale); + builder.length(column.getColumnLength()); + return builder.build(); + case DECIMAL: + int precision = + ((org.apache.seatunnel.api.table.type.DecimalType) dataType) + .getPrecision(); + DecimalType decimalType = DataTypes.DECIMAL(precision, scale); + builder.nativeType(decimalType); + builder.columnType(decimalType.toString()); + builder.dataType(decimalType.getTypeRoot().name()); + builder.scale(scale); + builder.precision((long) precision); + builder.length(column.getColumnLength()); return builder.build(); default: builder.nativeType(visit(dataType)); + builder.columnType(dataType.toString()); + builder.length(column.getColumnLength()); + builder.dataType(dataType.getSqlType().name()); return builder.build(); } } diff --git a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverterTest.java b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverterTest.java index 5e614aeda53..e075efde159 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverterTest.java +++ b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverterTest.java @@ -17,6 +17,9 @@ package org.apache.seatunnel.connectors.seatunnel.paimon.utils; +import org.apache.seatunnel.api.table.catalog.Column; +import org.apache.seatunnel.api.table.catalog.PhysicalColumn; +import org.apache.seatunnel.api.table.converter.BasicTypeDefine; import org.apache.seatunnel.api.table.type.ArrayType; import org.apache.seatunnel.api.table.type.BasicType; import org.apache.seatunnel.api.table.type.DecimalType; @@ -46,6 +49,10 @@ public class RowTypeConverterTest { private RowType rowType; + private BasicTypeDefine typeDefine; + + private Column column; + private TableSchema tableSchema; public static final RowType DEFAULT_ROW_TYPE = @@ -148,10 +155,39 @@ public void before() { KEY_NAME_LIST, Collections.EMPTY_MAP, ""); + + typeDefine = + BasicTypeDefine.builder() + .name("c_decimal") + .comment("c_decimal_type_define") + .columnType("DECIMAL(30, 8)") + .nativeType(DataTypes.DECIMAL(30, 8)) + .dataType(DataTypes.DECIMAL(30, 8).toString()) + .length(30L) + .precision(30L) + .scale(8) + .defaultValue(3.0) + .nullable(false) + .build(); + + org.apache.seatunnel.api.table.type.DecimalType dataType = + new org.apache.seatunnel.api.table.type.DecimalType(30, 8); + + column = + PhysicalColumn.builder() + .name("c_decimal") + .sourceType(DataTypes.DECIMAL(30, 8).toString()) + .nullable(false) + .dataType(dataType) + .columnLength(30L) + .defaultValue(3.0) + .scale(8) + .comment("c_decimal_type_define") + .build(); } @Test - public void paimonToSeaTunnel() { + public void paimonRowTypeToSeaTunnel() { SeaTunnelRowType convert = RowTypeConverter.convert(rowType); Assertions.assertEquals(convert, seaTunnelRowType); } @@ -161,4 +197,27 @@ public void seaTunnelToPaimon() { RowType convert = RowTypeConverter.reconvert(seaTunnelRowType, tableSchema); Assertions.assertEquals(convert, rowType); } + + @Test + public void paimonDataTypeToSeaTunnelColumn() { + Column column = RowTypeConverter.convert(typeDefine); + isEquals(column, typeDefine); + } + + @Test + public void seaTunnelColumnToPaimonDataType() { + BasicTypeDefine dataTypeDefine = RowTypeConverter.reconvert(column); + isEquals(column, dataTypeDefine); + } + + private void isEquals(Column column, BasicTypeDefine dataTypeDefine) { + Assertions.assertEquals(column.getComment(), dataTypeDefine.getComment()); + Assertions.assertEquals(column.getColumnLength(), dataTypeDefine.getLength()); + Assertions.assertEquals(column.getName(), dataTypeDefine.getName()); + Assertions.assertEquals(column.isNullable(), dataTypeDefine.isNullable()); + Assertions.assertEquals(column.getDefaultValue(), dataTypeDefine.getDefaultValue()); + Assertions.assertEquals(column.getScale(), dataTypeDefine.getScale()); + Assertions.assertTrue( + column.getDataType().toString().equalsIgnoreCase(dataTypeDefine.getColumnType())); + } }