Skip to content

Commit

Permalink
unsupportedArrayGenericType add fieldname
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaochen-zhou committed May 12, 2024
1 parent 548e68a commit 6e3f204
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -212,10 +212,11 @@ public static SeaTunnelRuntimeException sqlTemplateHandledError(
}

public static SeaTunnelRuntimeException unsupportedArrayGenericType(
String identifier, String dataType) {
String identifier, String dataType, String fieldName) {
Map<String, String> params = new HashMap<>();
params.put("identifier", identifier);
params.put("dataType", dataType);
params.put("fieldName", fieldName);
return new SeaTunnelRuntimeException(UNSUPPORTED_ARRAY_GENERIC_TYPE, params);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public enum CommonErrorCode implements SeaTunnelErrorCode {
"The table of <tableName> has no <keyName>, but the template \n <template> \n which has the place holder named <placeholder>. Please use the option named <optionName> to specify sql template"),
UNSUPPORTED_DATA_TYPE_SIMPLE("COMMON-25", "'<identifier>' unsupported data type '<dataType>'"),
UNSUPPORTED_ARRAY_GENERIC_TYPE(
"COMMON-26", "'<identifier>' array type not support genericType '<genericType>'"),
"COMMON-26", "'<identifier>' array type not support genericType '<genericType>' of '<fieldName>'"),
UNSUPPORTED_ROW_KIND("COMMON-27", "'<identifier>' table '<table>' not support rowKind '<rowKind>'");

private final String code;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ private RowConverter() {}
* @param dataType Data type of the array
* @return SeaTunnel array object
*/
public static Object convert(InternalArray array, SeaTunnelDataType<?> dataType) {
public static Object convert(String fieldName, InternalArray array, SeaTunnelDataType<?> dataType) {
switch (dataType.getSqlType()) {
case STRING:
String[] strings = new String[array.size()];
Expand Down Expand Up @@ -122,7 +122,7 @@ public static Object convert(InternalArray array, SeaTunnelDataType<?> dataType)
return doubles;
default:
throw CommonError.unsupportedArrayGenericType(
PaimonConfig.CONNECTOR_IDENTITY, dataType.getSqlType().toString());
PaimonConfig.CONNECTOR_IDENTITY, dataType.getSqlType().toString(), fieldName);
}
}

Expand All @@ -133,7 +133,7 @@ public static Object convert(InternalArray array, SeaTunnelDataType<?> dataType)
* @param dataType SeaTunnel array data type
* @return Paimon array object {@link BinaryArray}
*/
public static BinaryArray reconvert(Object array, SeaTunnelDataType<?> dataType) {
public static BinaryArray reconvert(String fieldName, Object array, SeaTunnelDataType<?> dataType) {
int length = ((Object[]) array).length;
BinaryArray binaryArray = new BinaryArray();
BinaryArrayWriter binaryArrayWriter;
Expand Down Expand Up @@ -221,7 +221,7 @@ public static BinaryArray reconvert(Object array, SeaTunnelDataType<?> dataType)
break;
default:
throw CommonError.unsupportedArrayGenericType(
PaimonConfig.CONNECTOR_IDENTITY, dataType.getSqlType().toString());
PaimonConfig.CONNECTOR_IDENTITY, dataType.getSqlType().toString(), fieldName);
}
binaryArrayWriter.complete();
return binaryArray;
Expand Down Expand Up @@ -293,17 +293,18 @@ public static SeaTunnelRow convert(InternalRow rowData, SeaTunnelRowType seaTunn
case ARRAY:
SeaTunnelDataType<?> arrayType = seaTunnelRowType.getFieldType(i);
InternalArray array = rowData.getArray(i);
objects[i] = convert(array, ((ArrayType<?, ?>) arrayType).getElementType());
objects[i] = convert(seaTunnelRowType.getFieldName(i), array, ((ArrayType<?, ?>) arrayType).getElementType());
break;
case MAP:
String fieldName = seaTunnelRowType.getFieldName(i);
SeaTunnelDataType<?> mapType = seaTunnelRowType.getFieldType(i);
InternalMap map = rowData.getMap(i);
InternalArray keyArray = map.keyArray();
InternalArray valueArray = map.valueArray();
SeaTunnelDataType<?> keyType = ((MapType<?, ?>) mapType).getKeyType();
SeaTunnelDataType<?> valueType = ((MapType<?, ?>) mapType).getValueType();
Object[] key = (Object[]) convert(keyArray, keyType);
Object[] value = (Object[]) convert(valueArray, valueType);
Object[] key = (Object[]) convert(fieldName, keyArray, keyType);
Object[] value = (Object[]) convert(fieldName, valueArray, valueType);
Map<Object, Object> mapData = new HashMap<>();
for (int j = 0; j < key.length; j++) {
mapData.put(key[j], value[j]);
Expand Down Expand Up @@ -414,13 +415,13 @@ public static InternalRow reconvert(
binaryWriter.writeMap(
i,
BinaryMap.valueOf(
reconvert(keys, keyType), reconvert(values, valueType)),
reconvert(fieldName, keys, keyType), reconvert(fieldName, values, valueType)),
new InternalMapSerializer(paimonKeyType, paimonValueType));
break;
case ARRAY:
ArrayType<?, ?> arrayType = (ArrayType<?, ?>) seaTunnelRowType.getFieldType(i);
BinaryArray paimonArray =
reconvert(seaTunnelRow.getField(i), arrayType.getElementType());
reconvert(fieldName, seaTunnelRow.getField(i), arrayType.getElementType());
binaryWriter.writeArray(
i,
paimonArray,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,11 @@ public static Column convert(BasicTypeDefine<DataType> typeDefine) {
break;
case ARRAY:
seaTunnelDataType = paimonToSeaTunnelTypeVisitor.visit((ArrayType) dataType);
if (seaTunnelDataType == null) {
throw CommonError.unsupportedArrayGenericType(
PaimonConfig.CONNECTOR_IDENTITY,
dataType.getTypeRoot().toString(), typeDefine.getName());
}
break;
case MAP:
seaTunnelDataType = paimonToSeaTunnelTypeVisitor.visit((MapType) dataType);
Expand Down Expand Up @@ -468,9 +473,7 @@ public SeaTunnelDataType<?> visit(ArrayType arrayType) {
case DOUBLE:
return org.apache.seatunnel.api.table.type.ArrayType.DOUBLE_ARRAY_TYPE;
default:
throw CommonError.unsupportedArrayGenericType(
PaimonConfig.CONNECTOR_IDENTITY,
seaTunnelArrayType.getSqlType().toString());
return null;
}
}

Expand Down

0 comments on commit 6e3f204

Please sign in to comment.