Skip to content

Commit

Permalink
optimize code
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaochen-zhou committed May 12, 2024
1 parent 760fb29 commit 3915e36
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@
import static org.apache.seatunnel.common.exception.CommonErrorCode.UNSUPPORTED_DATA_TYPE;
import static org.apache.seatunnel.common.exception.CommonErrorCode.UNSUPPORTED_DATA_TYPE_SIMPLE;
import static org.apache.seatunnel.common.exception.CommonErrorCode.UNSUPPORTED_ENCODING;
import static org.apache.seatunnel.common.exception.CommonErrorCode.VERSION_NOT_SUPPORTED;
import static org.apache.seatunnel.common.exception.CommonErrorCode.UNSUPPORTED_ROW_KIND;
import static org.apache.seatunnel.common.exception.CommonErrorCode.VERSION_NOT_SUPPORTED;
import static org.apache.seatunnel.common.exception.CommonErrorCode.WRITE_SEATUNNEL_ROW_ERROR;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,19 +49,17 @@ public enum CommonErrorCode implements SeaTunnelErrorCode {
WRITE_SEATUNNEL_ROW_ERROR(
"COMMON-23",
"<connector> write SeaTunnelRow failed, the SeaTunnelRow value is '<seaTunnelRow>'."),

SQL_TEMPLATE_HANDLED_ERROR(
"COMMON-24",
"The table of <tableName> has no <keyName>, but the template \n <template> \n which has the place holder named <placeholder>. Please use the option named <optionName> to specify sql template"),
UNSUPPORTED_DATA_TYPE_SIMPLE("COMMON-25", "'<identifier>' unsupported data type '<dataType>'"),
UNSUPPORTED_ARRAY_GENERIC_TYPE(
"COMMON-26", "'<identifier>' array type not support genericType '<genericType>' of '<fieldName>'"),
UNSUPPORTED_ROW_KIND("COMMON-27", "'<identifier>' table '<tableId>' not support rowKind '<rowKind>'");
"The table of <tableName> has no <keyName>, but the template \n <template> \n which has the place holder named <placeholder>. Please use the option named <optionName> to specify sql template"),

VERSION_NOT_SUPPORTED("COMMON-25", "<identifier> <version> is unsupported."),

OPERATION_NOT_SUPPORTED("COMMON-26", "<identifier> <operation> is unsupported.");
OPERATION_NOT_SUPPORTED("COMMON-26", "<identifier> <operation> is unsupported."),
UNSUPPORTED_DATA_TYPE_SIMPLE("COMMON-27", "'<identifier>' unsupported data type '<dataType>'"),
UNSUPPORTED_ARRAY_GENERIC_TYPE(
"COMMON-28",
"'<identifier>' array type not support genericType '<genericType>' of '<fieldName>'"),
UNSUPPORTED_ROW_KIND(
"COMMON-29", "'<identifier>' table '<tableId>' not support rowKind '<rowKind>'");

private final String code;
private final String description;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonError;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig;
import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;

import org.apache.paimon.data.BinaryArray;
import org.apache.paimon.data.BinaryArrayWriter;
Expand Down Expand Up @@ -70,7 +68,8 @@ private RowConverter() {}
* @param dataType Data type of the array
* @return SeaTunnel array object
*/
public static Object convert(String fieldName, InternalArray array, SeaTunnelDataType<?> dataType) {
public static Object convertArrayType(
String fieldName, InternalArray array, SeaTunnelDataType<?> dataType) {
switch (dataType.getSqlType()) {
case STRING:
String[] strings = new String[array.size()];
Expand Down Expand Up @@ -122,7 +121,9 @@ public static Object convert(String fieldName, InternalArray array, SeaTunnelDat
return doubles;
default:
throw CommonError.unsupportedArrayGenericType(
PaimonConfig.CONNECTOR_IDENTITY, dataType.getSqlType().toString(), fieldName);
PaimonConfig.CONNECTOR_IDENTITY,
dataType.getSqlType().toString(),
fieldName);
}
}

Expand All @@ -133,7 +134,8 @@ public static Object convert(String fieldName, InternalArray array, SeaTunnelDat
* @param dataType SeaTunnel array data type
* @return Paimon array object {@link BinaryArray}
*/
public static BinaryArray reconvert(String fieldName, Object array, SeaTunnelDataType<?> dataType) {
public static BinaryArray reconvert(
String fieldName, Object array, SeaTunnelDataType<?> dataType) {
int length = ((Object[]) array).length;
BinaryArray binaryArray = new BinaryArray();
BinaryArrayWriter binaryArrayWriter;
Expand Down Expand Up @@ -221,7 +223,9 @@ public static BinaryArray reconvert(String fieldName, Object array, SeaTunnelDat
break;
default:
throw CommonError.unsupportedArrayGenericType(
PaimonConfig.CONNECTOR_IDENTITY, dataType.getSqlType().toString(), fieldName);
PaimonConfig.CONNECTOR_IDENTITY,
dataType.getSqlType().toString(),
fieldName);
}
binaryArrayWriter.complete();
return binaryArray;
Expand All @@ -243,6 +247,7 @@ public static SeaTunnelRow convert(InternalRow rowData, SeaTunnelRowType seaTunn
continue;
}
SeaTunnelDataType<?> fieldType = seaTunnelRowType.getFieldType(i);
String fieldName = seaTunnelRowType.getFieldName(i);
switch (fieldType.getSqlType()) {
case TINYINT:
objects[i] = rowData.getByte(i);
Expand All @@ -263,12 +268,11 @@ public static SeaTunnelRow convert(InternalRow rowData, SeaTunnelRowType seaTunn
objects[i] = rowData.getDouble(i);
break;
case DECIMAL:
SeaTunnelDataType<?> decimalType = seaTunnelRowType.getFieldType(i);
Decimal decimal =
rowData.getDecimal(
i,
((DecimalType) decimalType).getPrecision(),
((DecimalType) decimalType).getScale());
((DecimalType) fieldType).getPrecision(),
((DecimalType) fieldType).getScale());
objects[i] = decimal.toBigDecimal();
break;
case STRING:
Expand All @@ -291,20 +295,21 @@ public static SeaTunnelRow convert(InternalRow rowData, SeaTunnelRowType seaTunn
objects[i] = timestamp.toLocalDateTime();
break;
case ARRAY:
SeaTunnelDataType<?> arrayType = seaTunnelRowType.getFieldType(i);
InternalArray array = rowData.getArray(i);
objects[i] = convert(seaTunnelRowType.getFieldName(i), array, ((ArrayType<?, ?>) arrayType).getElementType());
InternalArray paimonArray = rowData.getArray(i);
ArrayType<?, ?> seatunnelArray = (ArrayType<?, ?>) fieldType;
objects[i] =
convertArrayType(
fieldName, paimonArray, seatunnelArray.getElementType());
break;
case MAP:
String fieldName = seaTunnelRowType.getFieldName(i);
SeaTunnelDataType<?> mapType = seaTunnelRowType.getFieldType(i);
MapType<?, ?> mapType = (MapType<?, ?>) fieldType;
InternalMap map = rowData.getMap(i);
InternalArray keyArray = map.keyArray();
InternalArray valueArray = map.valueArray();
SeaTunnelDataType<?> keyType = ((MapType<?, ?>) mapType).getKeyType();
SeaTunnelDataType<?> valueType = ((MapType<?, ?>) mapType).getValueType();
Object[] key = (Object[]) convert(fieldName, keyArray, keyType);
Object[] value = (Object[]) convert(fieldName, valueArray, valueType);
SeaTunnelDataType<?> keyType = mapType.getKeyType();
SeaTunnelDataType<?> valueType = mapType.getValueType();
Object[] key = (Object[]) convertArrayType(fieldName, keyArray, keyType);
Object[] value = (Object[]) convertArrayType(fieldName, valueArray, valueType);
Map<Object, Object> mapData = new HashMap<>();
for (int j = 0; j < key.length; j++) {
mapData.put(key[j], value[j]);
Expand All @@ -318,9 +323,10 @@ public static SeaTunnelRow convert(InternalRow rowData, SeaTunnelRowType seaTunn
objects[i] = convert(row, (SeaTunnelRowType) rowType);
break;
default:
throw new PaimonConnectorException(
CommonErrorCode.UNSUPPORTED_DATA_TYPE,
"SeaTunnel does not support this type");
throw CommonError.unsupportedDataType(
PaimonConfig.CONNECTOR_IDENTITY,
fieldType.getSqlType().toString(),
fieldName);
}
}
return new SeaTunnelRow(objects);
Expand All @@ -341,8 +347,13 @@ public static InternalRow reconvert(
BinaryWriter binaryWriter = new BinaryRowWriter(binaryRow);
// Convert SeaTunnel RowKind to Paimon RowKind
org.apache.paimon.types.RowKind rowKind =
RowKindConverter.convertSeaTunnelRowKind2PaimonRowKind(seaTunnelRow.getTableId(),
seaTunnelRow.getRowKind());
RowKindConverter.convertSeaTunnelRowKind2PaimonRowKind(seaTunnelRow.getRowKind());
if (rowKind == null) {
throw CommonError.unsupportedRowKind(
PaimonConfig.CONNECTOR_IDENTITY,
seaTunnelRow.getRowKind().shortString(),
seaTunnelRow.getTableId());
}
binaryRow.setRowKind(rowKind);
SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();
for (int i = 0; i < fieldTypes.length; i++) {
Expand Down Expand Up @@ -415,18 +426,23 @@ public static InternalRow reconvert(
binaryWriter.writeMap(
i,
BinaryMap.valueOf(
reconvert(fieldName, keys, keyType), reconvert(fieldName, values, valueType)),
reconvert(fieldName, keys, keyType),
reconvert(fieldName, values, valueType)),
new InternalMapSerializer(paimonKeyType, paimonValueType));
break;
case ARRAY:
ArrayType<?, ?> arrayType = (ArrayType<?, ?>) seaTunnelRowType.getFieldType(i);
BinaryArray paimonArray =
reconvert(fieldName, seaTunnelRow.getField(i), arrayType.getElementType());
reconvert(
fieldName,
seaTunnelRow.getField(i),
arrayType.getElementType());
binaryWriter.writeArray(
i,
paimonArray,
new InternalArraySerializer(
RowTypeConverter.reconvert(fieldName, arrayType.getElementType())));
RowTypeConverter.reconvert(
fieldName, arrayType.getElementType())));
break;
case ROW:
SeaTunnelDataType<?> rowType = seaTunnelRowType.getFieldType(i);
Expand All @@ -440,7 +456,8 @@ public static InternalRow reconvert(
default:
throw CommonError.unsupportedDataType(
PaimonConfig.CONNECTOR_IDENTITY,
seaTunnelRowType.getFieldType(i).getSqlType().toString(), fieldName);
seaTunnelRowType.getFieldType(i).getSqlType().toString(),
fieldName);
}
}
return binaryRow;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,18 @@
package org.apache.seatunnel.connectors.seatunnel.paimon.utils;

import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.common.exception.CommonError;
import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig;

import org.apache.paimon.data.InternalRow;

public class RowKindConverter {

/**
* Convert SeaTunnel RowKind {@link RowKind} to Paimon RowKind {@link InternalRow}
* @param tableId Table identifier
*
* @param seaTunnelRowKind The kind of change that a row describes in a changelog.
* @return
*/
public static org.apache.paimon.types.RowKind convertSeaTunnelRowKind2PaimonRowKind(
String tableId,
RowKind seaTunnelRowKind) {
switch (seaTunnelRowKind) {
case DELETE:
Expand All @@ -44,8 +41,7 @@ public static org.apache.paimon.types.RowKind convertSeaTunnelRowKind2PaimonRowK
case INSERT:
return org.apache.paimon.types.RowKind.INSERT;
default:
throw CommonError.unsupportedRowKind(
PaimonConfig.CONNECTOR_IDENTITY, seaTunnelRowKind.shortString(), tableId);
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import org.apache.paimon.types.VarBinaryType;
import org.apache.paimon.types.VarCharType;

import java.util.Arrays;
import java.util.List;
import java.util.Objects;

Expand Down Expand Up @@ -169,7 +168,8 @@ public static Column convert(BasicTypeDefine<DataType> typeDefine) {
if (seaTunnelDataType == null) {
throw CommonError.unsupportedArrayGenericType(
PaimonConfig.CONNECTOR_IDENTITY,
dataType.getTypeRoot().toString(), typeDefine.getName());
dataType.getTypeRoot().toString(),
typeDefine.getName());
}
break;
case MAP:
Expand All @@ -180,7 +180,9 @@ public static Column convert(BasicTypeDefine<DataType> typeDefine) {
break;
default:
throw CommonError.unsupportedDataType(
PaimonConfig.CONNECTOR_IDENTITY, dataType.asSQLString(), typeDefine.getName());
PaimonConfig.CONNECTOR_IDENTITY,
dataType.asSQLString(),
typeDefine.getName());
}
return physicalColumnBuilder.dataType(seaTunnelDataType).build();
}
Expand All @@ -195,21 +197,13 @@ public static RowType reconvert(SeaTunnelRowType seaTunnelRowType, TableSchema t
SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();
String[] fieldNames = seaTunnelRowType.getFieldNames();
int totalFields = seaTunnelRowType.getTotalFields();
DataType[] dataTypes = new DataType[totalFields];
for (int i = 0; i < totalFields; i++) {
DataType dataType = SeaTunnelTypeToPaimonVisitor.INSTANCE.visit(fieldNames[i], fieldTypes[i]);
dataTypes[i] = dataType;
}
List<DataField> fields = tableSchema.fields();
// DataType[] dataTypes =
// Arrays.stream(fieldTypes)
// .map(SeaTunnelTypeToPaimonVisitor.INSTANCE::visit)
// .toArray(DataType[]::new);
DataField[] dataFields = new DataField[dataTypes.length];
for (int i = 0; i < dataTypes.length; i++) {
DataType dataType = dataTypes[i];
DataField[] dataFields = new DataField[totalFields];
for (int i = 0; i < totalFields; i++) {
String fieldName = fieldNames[i];
DataType dataType =
SeaTunnelTypeToPaimonVisitor.INSTANCE.visit(fieldName, fieldTypes[i]);
DataTypeRoot typeRoot = dataType.getTypeRoot();
String fieldName = seaTunnelRowType.getFieldName(i);
if (typeRoot.equals(DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE)
|| typeRoot.equals(DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE)) {
DataField dataField = SchemaUtil.getDataField(fields, fieldName);
Expand All @@ -232,7 +226,9 @@ public static BasicTypeDefine<DataType> reconvert(Column column) {
}

/**
* Mapping SeaTunnel data type {@link SeaTunnelDataType} to Paimon data type {@link DataType}
* Mapping SeaTunnel data type {@link SeaTunnelDataType} of fieldName to Paimon data type {@link
* DataType}
*
* @param fieldName SeaTunnel field name
* @param dataType SeaTunnel data type {@link SeaTunnelDataType}
* @return Paimon data type {@link DataType}
Expand Down Expand Up @@ -354,12 +350,16 @@ public DataType visit(String fieldName, SeaTunnelDataType<?> dataType) {
int totalFields = row.getTotalFields();
DataType[] dataTypes = new DataType[totalFields];
for (int i = 0; i < totalFields; i++) {
dataTypes[i] = SeaTunnelTypeToPaimonVisitor.INSTANCE.visit(fieldNames[i], fieldTypes[i]);
dataTypes[i] =
SeaTunnelTypeToPaimonVisitor.INSTANCE.visit(
fieldNames[i], fieldTypes[i]);
}
return DataTypes.ROW(dataTypes);
default:
throw CommonError.unsupportedDataType(
PaimonConfig.CONNECTOR_IDENTITY, dataType.getSqlType().toString(), fieldName);
PaimonConfig.CONNECTOR_IDENTITY,
dataType.getSqlType().toString(),
fieldName);
}
}
}
Expand Down

0 comments on commit 3915e36

Please sign in to comment.