Skip to content

Commit

Permalink
reuse unsupportedDataType
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaochen-zhou committed May 12, 2024
1 parent 8036acb commit 548e68a
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -219,9 +219,10 @@ public static SeaTunnelRuntimeException unsupportedArrayGenericType(
return new SeaTunnelRuntimeException(UNSUPPORTED_ARRAY_GENERIC_TYPE, params);
}

public static SeaTunnelRuntimeException unsupportedRowKind(String identifier, String rowKind) {
public static SeaTunnelRuntimeException unsupportedRowKind(String identifier, String tableId, String rowKind) {
Map<String, String> params = new HashMap<>();
params.put("identifier", identifier);
params.put("table", tableId);
params.put("rowKind", rowKind);
return new SeaTunnelRuntimeException(UNSUPPORTED_ROW_KIND, params);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public enum CommonErrorCode implements SeaTunnelErrorCode {
UNSUPPORTED_DATA_TYPE_SIMPLE("COMMON-25", "'<identifier>' unsupported data type '<dataType>'"),
UNSUPPORTED_ARRAY_GENERIC_TYPE(
"COMMON-26", "'<identifier>' array type not support genericType '<genericType>'"),
UNSUPPORTED_ROW_KIND("COMMON-27", "'<identifier>' unsupported rowKind type '<rowKind>'");
UNSUPPORTED_ROW_KIND("COMMON-27", "'<identifier>' table '<table>' not support rowKind '<rowKind>'");

private final String code;
private final String description;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,8 @@ public static InternalRow reconvert(
BinaryWriter binaryWriter = new BinaryRowWriter(binaryRow);
// Convert SeaTunnel RowKind to Paimon RowKind
org.apache.paimon.types.RowKind rowKind =
RowKindConverter.convertSeaTunnelRowKind2PaimonRowKind(seaTunnelRow.getRowKind());
RowKindConverter.convertSeaTunnelRowKind2PaimonRowKind(seaTunnelRow.getTableId(),
seaTunnelRow.getRowKind());
binaryRow.setRowKind(rowKind);
SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();
for (int i = 0; i < fieldTypes.length; i++) {
Expand All @@ -349,6 +350,7 @@ public static InternalRow reconvert(
binaryWriter.setNullAt(i);
continue;
}
String fieldName = seaTunnelRowType.getFieldName(i);
switch (fieldTypes[i].getSqlType()) {
case TINYINT:
binaryWriter.writeByte(i, (Byte) seaTunnelRow.getField(i));
Expand Down Expand Up @@ -394,7 +396,6 @@ public static InternalRow reconvert(
.setValue(binaryWriter, i, DateTimeUtils.toInternal(date));
break;
case TIMESTAMP:
String fieldName = seaTunnelRowType.getFieldName(i);
DataField dataField = SchemaUtil.getDataField(fields, fieldName);
int precision = ((TimestampType) dataField.type()).getPrecision();
LocalDateTime datetime = (LocalDateTime) seaTunnelRow.getField(i);
Expand All @@ -405,8 +406,8 @@ public static InternalRow reconvert(
MapType<?, ?> mapType = (MapType<?, ?>) seaTunnelRowType.getFieldType(i);
SeaTunnelDataType<?> keyType = mapType.getKeyType();
SeaTunnelDataType<?> valueType = mapType.getValueType();
DataType paimonKeyType = RowTypeConverter.reconvert(keyType);
DataType paimonValueType = RowTypeConverter.reconvert(valueType);
DataType paimonKeyType = RowTypeConverter.reconvert(fieldName, keyType);
DataType paimonValueType = RowTypeConverter.reconvert(fieldName, valueType);
Map<?, ?> field = (Map<?, ?>) seaTunnelRow.getField(i);
Object[] keys = field.keySet().toArray(new Object[0]);
Object[] values = field.values().toArray(new Object[0]);
Expand All @@ -424,7 +425,7 @@ public static InternalRow reconvert(
i,
paimonArray,
new InternalArraySerializer(
RowTypeConverter.reconvert(arrayType.getElementType())));
RowTypeConverter.reconvert(fieldName, arrayType.getElementType())));
break;
case ROW:
SeaTunnelDataType<?> rowType = seaTunnelRowType.getFieldType(i);
Expand All @@ -438,7 +439,7 @@ public static InternalRow reconvert(
default:
throw CommonError.unsupportedDataType(
PaimonConfig.CONNECTOR_IDENTITY,
seaTunnelRowType.getFieldType(i).getSqlType().toString());
seaTunnelRowType.getFieldType(i).getSqlType().toString(), fieldName);
}
}
return binaryRow;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,14 @@ public class RowKindConverter {

/**
* Convert SeaTunnel RowKind {@link RowKind} to Paimon RowKind {@link InternalRow}
*
* @param seaTunnelRowInd
* @param tableId Table identifier
* @param seaTunnelRowKind The kind of change that a row describes in a changelog.
* @return
*/
public static org.apache.paimon.types.RowKind convertSeaTunnelRowKind2PaimonRowKind(
RowKind seaTunnelRowInd) {
switch (seaTunnelRowInd) {
String tableId,
RowKind seaTunnelRowKind) {
switch (seaTunnelRowKind) {
case DELETE:
return org.apache.paimon.types.RowKind.DELETE;
case UPDATE_AFTER:
Expand All @@ -44,7 +45,7 @@ public static org.apache.paimon.types.RowKind convertSeaTunnelRowKind2PaimonRowK
return org.apache.paimon.types.RowKind.INSERT;
default:
throw CommonError.unsupportedRowKind(
PaimonConfig.CONNECTOR_IDENTITY, seaTunnelRowInd.shortString());
PaimonConfig.CONNECTOR_IDENTITY, seaTunnelRowKind.shortString(), tableId);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ public static Column convert(BasicTypeDefine<DataType> typeDefine) {
break;
default:
throw CommonError.unsupportedDataType(
PaimonConfig.CONNECTOR_IDENTITY, dataType.asSQLString());
PaimonConfig.CONNECTOR_IDENTITY, dataType.asSQLString(), typeDefine.getName());
}
return physicalColumnBuilder.dataType(seaTunnelDataType).build();
}
Expand All @@ -188,11 +188,18 @@ public static Column convert(BasicTypeDefine<DataType> typeDefine) {
*/
public static RowType reconvert(SeaTunnelRowType seaTunnelRowType, TableSchema tableSchema) {
SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();
String[] fieldNames = seaTunnelRowType.getFieldNames();
int totalFields = seaTunnelRowType.getTotalFields();
DataType[] dataTypes = new DataType[totalFields];
for (int i = 0; i < totalFields; i++) {
DataType dataType = SeaTunnelTypeToPaimonVisitor.INSTANCE.visit(fieldNames[i], fieldTypes[i]);
dataTypes[i] = dataType;
}
List<DataField> fields = tableSchema.fields();
DataType[] dataTypes =
Arrays.stream(fieldTypes)
.map(SeaTunnelTypeToPaimonVisitor.INSTANCE::visit)
.toArray(DataType[]::new);
// DataType[] dataTypes =
// Arrays.stream(fieldTypes)
// .map(SeaTunnelTypeToPaimonVisitor.INSTANCE::visit)
// .toArray(DataType[]::new);
DataField[] dataFields = new DataField[dataTypes.length];
for (int i = 0; i < dataTypes.length; i++) {
DataType dataType = dataTypes[i];
Expand Down Expand Up @@ -221,12 +228,12 @@ public static BasicTypeDefine<DataType> reconvert(Column column) {

/**
* Mapping SeaTunnel data type {@link SeaTunnelDataType} to Paimon data type {@link DataType}
*
* @param fieldName SeaTunnel field name
* @param dataType SeaTunnel data type {@link SeaTunnelDataType}
* @return Paimon data type {@link DataType}
*/
public static DataType reconvert(SeaTunnelDataType<?> dataType) {
return SeaTunnelTypeToPaimonVisitor.INSTANCE.visit(dataType);
public static DataType reconvert(String fieldName, SeaTunnelDataType<?> dataType) {
return SeaTunnelTypeToPaimonVisitor.INSTANCE.visit(fieldName, dataType);
}

/**
Expand Down Expand Up @@ -282,15 +289,15 @@ public BasicTypeDefine<DataType> visit(Column column) {
builder.length(column.getColumnLength());
return builder.build();
default:
builder.nativeType(visit(dataType));
builder.nativeType(visit(column.getName(), dataType));
builder.columnType(dataType.toString());
builder.length(column.getColumnLength());
builder.dataType(dataType.getSqlType().name());
return builder.build();
}
}

public DataType visit(SeaTunnelDataType<?> dataType) {
public DataType visit(String fieldName, SeaTunnelDataType<?> dataType) {
switch (dataType.getSqlType()) {
case TINYINT:
return DataTypes.TINYINT();
Expand Down Expand Up @@ -329,21 +336,25 @@ public DataType visit(SeaTunnelDataType<?> dataType) {
SeaTunnelDataType<?> valueType =
((org.apache.seatunnel.api.table.type.MapType<?, ?>) dataType)
.getValueType();
return DataTypes.MAP(visit(keyType), visit(valueType));
return DataTypes.MAP(visit(fieldName, keyType), visit(fieldName, valueType));
case ARRAY:
BasicType<?> elementType =
((org.apache.seatunnel.api.table.type.ArrayType<?, ?>) dataType)
.getElementType();
return DataTypes.ARRAY(visit(elementType));
return DataTypes.ARRAY(visit(fieldName, elementType));
case ROW:
SeaTunnelDataType<?>[] fieldTypes =
((SeaTunnelRowType) dataType).getFieldTypes();
DataType[] dataTypes =
Arrays.stream(fieldTypes).map(this::visit).toArray(DataType[]::new);
SeaTunnelRowType row = (SeaTunnelRowType) dataType;
SeaTunnelDataType<?>[] fieldTypes = row.getFieldTypes();
String[] fieldNames = row.getFieldNames();
int totalFields = row.getTotalFields();
DataType[] dataTypes = new DataType[totalFields];
for (int i = 0; i < totalFields; i++) {
dataTypes[i] = SeaTunnelTypeToPaimonVisitor.INSTANCE.visit(fieldNames[i], fieldTypes[i]);
}
return DataTypes.ROW(dataTypes);
default:
throw CommonError.unsupportedDataType(
PaimonConfig.CONNECTOR_IDENTITY, dataType.getSqlType().toString());
PaimonConfig.CONNECTOR_IDENTITY, dataType.getSqlType().toString(), fieldName);
}
}
}
Expand Down

0 comments on commit 548e68a

Please sign in to comment.