Skip to content

Commit

Permalink
add test case
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaochen-zhou committed May 1, 2024
1 parent 2d1cb44 commit 0e91978
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@
public class PaimonCatalog implements Catalog, PaimonTable {
private static final String DEFAULT_DATABASE = "default";

private String catalogName;
private PaimonSinkConfig paimonSinkConfig;
private PaimonCatalogLoader paimonCatalogLoader;
private final String catalogName;
private final PaimonSinkConfig paimonSinkConfig;
private final PaimonCatalogLoader paimonCatalogLoader;
private org.apache.paimon.catalog.Catalog catalog;

public PaimonCatalog(String catalogName, PaimonSinkConfig paimonSinkConfig) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;

Expand Down Expand Up @@ -320,7 +321,7 @@ public static SeaTunnelRow convert(InternalRow rowData, SeaTunnelRowType seaTunn
break;
default:
throw new PaimonConnectorException(
CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
CommonErrorCode.UNSUPPORTED_DATA_TYPE,
"SeaTunnel does not support this type");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ public static Column convert(BasicTypeDefine<DataType> typeDefine) {
.nullable(typeDefine.isNullable())
.defaultValue(typeDefine.getDefaultValue())
.comment(typeDefine.getComment());

DataType dataType = typeDefine.getNativeType();
SeaTunnelDataType<?> seaTunnelDataType;
PaimonToSeaTunnelTypeVisitor paimonToSeaTunnelTypeVisitor =
Expand Down Expand Up @@ -258,16 +259,37 @@ public BasicTypeDefine<DataType> visit(Column column) {
Objects.isNull(scale) ? TimestampType.DEFAULT_PRECISION : scale;
TimestampType timestampType = DataTypes.TIMESTAMP(timestampScale);
builder.nativeType(timestampType);
builder.dataType(timestampType.getTypeRoot().name());
builder.columnType(timestampType.toString());
builder.scale(timestampScale);
builder.length(column.getColumnLength());
return builder.build();
case TIME:
int timeScale = Objects.isNull(scale) ? TimeType.DEFAULT_PRECISION : scale;
TimeType timeType = DataTypes.TIME(timeScale);
builder.nativeType(timeType);
builder.columnType(timeType.toString());
builder.dataType(timeType.getTypeRoot().name());
builder.scale(timeScale);
builder.length(column.getColumnLength());
return builder.build();
case DECIMAL:
int precision =
((org.apache.seatunnel.api.table.type.DecimalType) dataType)
.getPrecision();
DecimalType decimalType = DataTypes.DECIMAL(precision, scale);
builder.nativeType(decimalType);
builder.columnType(decimalType.toString());
builder.dataType(decimalType.getTypeRoot().name());
builder.scale(scale);
builder.precision((long) precision);
builder.length(column.getColumnLength());
return builder.build();
default:
builder.nativeType(visit(dataType));
builder.columnType(dataType.toString());
builder.length(column.getColumnLength());
builder.dataType(dataType.getSqlType().name());
return builder.build();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package org.apache.seatunnel.connectors.seatunnel.paimon.utils;

import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
Expand Down Expand Up @@ -46,6 +49,10 @@ public class RowTypeConverterTest {

private RowType rowType;

private BasicTypeDefine<DataType> typeDefine;

private Column column;

private TableSchema tableSchema;

public static final RowType DEFAULT_ROW_TYPE =
Expand Down Expand Up @@ -148,10 +155,39 @@ public void before() {
KEY_NAME_LIST,
Collections.EMPTY_MAP,
"");

typeDefine =
BasicTypeDefine.<DataType>builder()
.name("c_decimal")
.comment("c_decimal_type_define")
.columnType("DECIMAL(30, 8)")
.nativeType(DataTypes.DECIMAL(30, 8))
.dataType(DataTypes.DECIMAL(30, 8).toString())
.length(30L)
.precision(30L)
.scale(8)
.defaultValue(3.0)
.nullable(false)
.build();

org.apache.seatunnel.api.table.type.DecimalType dataType =
new org.apache.seatunnel.api.table.type.DecimalType(30, 8);

column =
PhysicalColumn.builder()
.name("c_decimal")
.sourceType(DataTypes.DECIMAL(30, 8).toString())
.nullable(false)
.dataType(dataType)
.columnLength(30L)
.defaultValue(3.0)
.scale(8)
.comment("c_decimal_type_define")
.build();
}

@Test
public void paimonToSeaTunnel() {
public void paimonRowTypeToSeaTunnel() {
SeaTunnelRowType convert = RowTypeConverter.convert(rowType);
Assertions.assertEquals(convert, seaTunnelRowType);
}
Expand All @@ -161,4 +197,27 @@ public void seaTunnelToPaimon() {
RowType convert = RowTypeConverter.reconvert(seaTunnelRowType, tableSchema);
Assertions.assertEquals(convert, rowType);
}

@Test
public void paimonDataTypeToSeaTunnelColumn() {
Column column = RowTypeConverter.convert(typeDefine);
isEquals(column, typeDefine);
}

@Test
public void seaTunnelColumnToPaimonDataType() {
BasicTypeDefine<DataType> dataTypeDefine = RowTypeConverter.reconvert(column);
isEquals(column, dataTypeDefine);
}

private void isEquals(Column column, BasicTypeDefine<DataType> dataTypeDefine) {
Assertions.assertEquals(column.getComment(), dataTypeDefine.getComment());
Assertions.assertEquals(column.getColumnLength(), dataTypeDefine.getLength());
Assertions.assertEquals(column.getName(), dataTypeDefine.getName());
Assertions.assertEquals(column.isNullable(), dataTypeDefine.isNullable());
Assertions.assertEquals(column.getDefaultValue(), dataTypeDefine.getDefaultValue());
Assertions.assertEquals(column.getScale(), dataTypeDefine.getScale());
Assertions.assertTrue(
column.getDataType().toString().equalsIgnoreCase(dataTypeDefine.getColumnType()));
}
}

0 comments on commit 0e91978

Please sign in to comment.