Skip to content

Commit

Permalink
[refactor](hudi, iceberg) optimize some code (apache#42636)
Browse files Browse the repository at this point in the history
make code more readable
  • Loading branch information
morningman authored Oct 28, 2024
1 parent 3845c36 commit 5b46c84
Show file tree
Hide file tree
Showing 3 changed files with 153 additions and 118 deletions.
242 changes: 133 additions & 109 deletions fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.doris.catalog.StructType;
import org.apache.doris.catalog.Type;

import com.google.common.base.Preconditions;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
Expand All @@ -41,30 +40,58 @@
public class HudiUtils {
private static final SimpleDateFormat defaultDateFormat = new SimpleDateFormat("yyyy-MM-dd");

public static String fromAvroHudiTypeToHiveTypeString(Schema avroSchema) {
Schema.Type columnType = avroSchema.getType();
LogicalType logicalType = avroSchema.getLogicalType();
switch (columnType) {
/**
* Convert different query instant time format to the commit time format.
* Currently we support three kinds of instant time format for time travel query:
* 1、yyyy-MM-dd HH:mm:ss
* 2、yyyy-MM-dd
* This will convert to 'yyyyMMdd000000'.
* 3、yyyyMMddHHmmss
*/
public static String formatQueryInstant(String queryInstant) throws ParseException {
int instantLength = queryInstant.length();
if (instantLength == 19 || instantLength == 23) { // for yyyy-MM-dd HH:mm:ss[.SSS]
if (instantLength == 19) {
queryInstant += ".000";
}
return HoodieInstantTimeGenerator.getInstantForDateString(queryInstant);
} else if (instantLength == HoodieInstantTimeGenerator.SECS_INSTANT_ID_LENGTH
|| instantLength == HoodieInstantTimeGenerator.MILLIS_INSTANT_ID_LENGTH) { // for yyyyMMddHHmmss[SSS]
HoodieActiveTimeline.parseDateFromInstantTime(queryInstant); // validate the format
return queryInstant;
} else if (instantLength == 10) { // for yyyy-MM-dd
return HoodieActiveTimeline.formatDate(defaultDateFormat.parse(queryInstant));
} else {
throw new IllegalArgumentException("Unsupported query instant time format: " + queryInstant
+ ", Supported time format are: 'yyyy-MM-dd HH:mm:ss[.SSS]' "
+ "or 'yyyy-MM-dd' or 'yyyyMMddHHmmss[SSS]'");
}
}

public static String convertAvroToHiveType(Schema schema) {
Schema.Type type = schema.getType();
LogicalType logicalType = schema.getLogicalType();

switch (type) {
case BOOLEAN:
return "boolean";
case INT:
if (logicalType instanceof LogicalTypes.Date) {
return "date";
} else if (logicalType instanceof LogicalTypes.TimeMillis) {
break;
} else {
return "int";
}
if (logicalType instanceof LogicalTypes.TimeMillis) {
return handleUnsupportedType(schema);
}
return "int";
case LONG:
if (logicalType instanceof LogicalTypes.TimestampMillis
|| logicalType instanceof LogicalTypes.TimestampMicros) {
return logicalType.getName();
}
if (logicalType instanceof LogicalTypes.TimeMicros) {
break;
} else if (logicalType instanceof LogicalTypes.TimestampMillis) {
return "timestamp(3)";
} else if (logicalType instanceof LogicalTypes.TimestampMicros) {
return "timestamp(6)";
} else {
return "bigint";
return handleUnsupportedType(schema);
}
return "bigint";
case FLOAT:
return "float";
case DOUBLE:
Expand All @@ -74,71 +101,57 @@ public static String fromAvroHudiTypeToHiveTypeString(Schema avroSchema) {
case FIXED:
case BYTES:
if (logicalType instanceof LogicalTypes.Decimal) {
int precision = ((LogicalTypes.Decimal) logicalType).getPrecision();
int scale = ((LogicalTypes.Decimal) logicalType).getScale();
return String.format("decimal(%s,%s)", precision, scale);
} else {
if (columnType == Schema.Type.BYTES) {
return "binary";
}
return "string";
LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) logicalType;
return String.format("decimal(%d,%d)", decimalType.getPrecision(), decimalType.getScale());
}
return "string";
case ARRAY:
String elementType = fromAvroHudiTypeToHiveTypeString(avroSchema.getElementType());
return String.format("array<%s>", elementType);
String arrayElementType = convertAvroToHiveType(schema.getElementType());
return String.format("array<%s>", arrayElementType);
case RECORD:
List<Field> fields = avroSchema.getFields();
Preconditions.checkArgument(fields.size() > 0);
String nameToType = fields.stream()
.map(f -> String.format("%s:%s", f.name(),
fromAvroHudiTypeToHiveTypeString(f.schema())))
List<Field> recordFields = schema.getFields();
if (recordFields.isEmpty()) {
throw new IllegalArgumentException("Record must have fields");
}
String structFields = recordFields.stream()
.map(field -> String.format("%s:%s", field.name(), convertAvroToHiveType(field.schema())))
.collect(Collectors.joining(","));
return String.format("struct<%s>", nameToType);
return String.format("struct<%s>", structFields);
case MAP:
Schema value = avroSchema.getValueType();
String valueType = fromAvroHudiTypeToHiveTypeString(value);
return String.format("map<%s,%s>", "string", valueType);
Schema mapValueType = schema.getValueType();
String mapValueTypeString = convertAvroToHiveType(mapValueType);
return String.format("map<string,%s>", mapValueTypeString);
case UNION:
List<Schema> nonNullMembers = avroSchema.getTypes().stream()
.filter(schema -> !Schema.Type.NULL.equals(schema.getType()))
List<Schema> unionTypes = schema.getTypes().stream()
.filter(s -> s.getType() != Schema.Type.NULL)
.collect(Collectors.toList());
// The nullable column in hudi is the union type with schemas [null, real column type]
if (nonNullMembers.size() == 1) {
return fromAvroHudiTypeToHiveTypeString(nonNullMembers.get(0));
if (unionTypes.size() == 1) {
return convertAvroToHiveType(unionTypes.get(0));
}
break;
default:
break;
}
String errorMsg = String.format("Unsupported hudi %s type of column %s", avroSchema.getType().getName(),
avroSchema.getName());
throw new IllegalArgumentException(errorMsg);

throw new IllegalArgumentException(
String.format("Unsupported type: %s for column: %s", type.getName(), schema.getName()));
}

private static String handleUnsupportedType(Schema schema) {
throw new IllegalArgumentException(String.format("Unsupported logical type: %s", schema.getLogicalType()));
}

public static Type fromAvroHudiTypeToDorisType(Schema avroSchema) {
Schema.Type columnType = avroSchema.getType();
LogicalType logicalType = avroSchema.getLogicalType();

switch (columnType) {
case BOOLEAN:
return Type.BOOLEAN;
case INT:
if (logicalType instanceof LogicalTypes.Date) {
return ScalarType.createDateV2Type();
} else if (logicalType instanceof LogicalTypes.TimeMillis) {
return ScalarType.createTimeV2Type(3);
} else {
return Type.INT;
}
return handleIntType(logicalType);
case LONG:
if (logicalType instanceof LogicalTypes.TimeMicros) {
return ScalarType.createTimeV2Type(6);
} else if (logicalType instanceof LogicalTypes.TimestampMillis) {
return ScalarType.createDatetimeV2Type(3);
} else if (logicalType instanceof LogicalTypes.TimestampMicros) {
return ScalarType.createDatetimeV2Type(6);
} else {
return Type.BIGINT;
}
return handleLongType(logicalType);
case FLOAT:
return Type.FLOAT;
case DOUBLE:
Expand All @@ -147,64 +160,75 @@ public static Type fromAvroHudiTypeToDorisType(Schema avroSchema) {
return Type.STRING;
case FIXED:
case BYTES:
if (logicalType instanceof LogicalTypes.Decimal) {
int precision = ((LogicalTypes.Decimal) logicalType).getPrecision();
int scale = ((LogicalTypes.Decimal) logicalType).getScale();
return ScalarType.createDecimalV3Type(precision, scale);
} else {
return Type.STRING;
}
return handleFixedOrBytesType(logicalType);
case ARRAY:
Type innerType = fromAvroHudiTypeToDorisType(avroSchema.getElementType());
return ArrayType.create(innerType, true);
return handleArrayType(avroSchema);
case RECORD:
ArrayList<StructField> fields = new ArrayList<>();
avroSchema.getFields().forEach(
f -> fields.add(new StructField(f.name(), fromAvroHudiTypeToDorisType(f.schema()))));
return new StructType(fields);
return handleRecordType(avroSchema);
case MAP:
// Hudi map's key must be string
return new MapType(Type.STRING, fromAvroHudiTypeToDorisType(avroSchema.getValueType()));
return handleMapType(avroSchema);
case UNION:
List<Schema> nonNullMembers = avroSchema.getTypes().stream()
.filter(schema -> !Schema.Type.NULL.equals(schema.getType()))
.collect(Collectors.toList());
// The nullable column in hudi is the union type with schemas [null, real column type]
if (nonNullMembers.size() == 1) {
return fromAvroHudiTypeToDorisType(nonNullMembers.get(0));
}
break;
return handleUnionType(avroSchema);
default:
break;
return Type.UNSUPPORTED;
}
return Type.UNSUPPORTED;
}

/**
* Convert different query instant time format to the commit time format.
* Currently we support three kinds of instant time format for time travel query:
* 1、yyyy-MM-dd HH:mm:ss
* 2、yyyy-MM-dd
* This will convert to 'yyyyMMdd000000'.
* 3、yyyyMMddHHmmss
*/
public static String formatQueryInstant(String queryInstant) throws ParseException {
int instantLength = queryInstant.length();
if (instantLength == 19 || instantLength == 23) { // for yyyy-MM-dd HH:mm:ss[.SSS]
if (instantLength == 19) {
queryInstant += ".000";
}
return HoodieInstantTimeGenerator.getInstantForDateString(queryInstant);
} else if (instantLength == HoodieInstantTimeGenerator.SECS_INSTANT_ID_LENGTH
|| instantLength == HoodieInstantTimeGenerator.MILLIS_INSTANT_ID_LENGTH) { // for yyyyMMddHHmmss[SSS]
HoodieActiveTimeline.parseDateFromInstantTime(queryInstant); // validate the format
return queryInstant;
} else if (instantLength == 10) { // for yyyy-MM-dd
return HoodieActiveTimeline.formatDate(defaultDateFormat.parse(queryInstant));
} else {
throw new IllegalArgumentException("Unsupported query instant time format: " + queryInstant
+ ", Supported time format are: 'yyyy-MM-dd HH:mm:ss[.SSS]' "
+ "or 'yyyy-MM-dd' or 'yyyyMMddHHmmss[SSS]'");
private static Type handleIntType(LogicalType logicalType) {
if (logicalType instanceof LogicalTypes.Date) {
return ScalarType.createDateV2Type();
}
if (logicalType instanceof LogicalTypes.TimeMillis) {
return ScalarType.createTimeV2Type(3);
}
return Type.INT;
}

private static Type handleLongType(LogicalType logicalType) {
if (logicalType instanceof LogicalTypes.TimeMicros) {
return ScalarType.createTimeV2Type(6);
}
if (logicalType instanceof LogicalTypes.TimestampMillis) {
return ScalarType.createDatetimeV2Type(3);
}
if (logicalType instanceof LogicalTypes.TimestampMicros) {
return ScalarType.createDatetimeV2Type(6);
}
return Type.BIGINT;
}

private static Type handleFixedOrBytesType(LogicalType logicalType) {
if (logicalType instanceof LogicalTypes.Decimal) {
int precision = ((LogicalTypes.Decimal) logicalType).getPrecision();
int scale = ((LogicalTypes.Decimal) logicalType).getScale();
return ScalarType.createDecimalV3Type(precision, scale);
}
return Type.STRING;
}

private static Type handleArrayType(Schema avroSchema) {
Type innerType = fromAvroHudiTypeToDorisType(avroSchema.getElementType());
return ArrayType.create(innerType, true);
}

private static Type handleRecordType(Schema avroSchema) {
ArrayList<StructField> fields = new ArrayList<>();
avroSchema.getFields().forEach(
f -> fields.add(new StructField(f.name(), fromAvroHudiTypeToDorisType(f.schema()))));
return new StructType(fields);
}

private static Type handleMapType(Schema avroSchema) {
return new MapType(Type.STRING, fromAvroHudiTypeToDorisType(avroSchema.getValueType()));
}

private static Type handleUnionType(Schema avroSchema) {
List<Schema> nonNullMembers = avroSchema.getTypes().stream()
.filter(schema -> !Schema.Type.NULL.equals(schema.getType()))
.collect(Collectors.toList());
if (nonNullMembers.size() == 1) {
return fromAvroHudiTypeToDorisType(nonNullMembers.get(0));
}
return Type.UNSUPPORTED;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ protected void doInitialize() throws UserException {
}
for (Schema.Field hudiField : hudiSchema.getFields()) {
columnNames.add(hudiField.name().toLowerCase(Locale.ROOT));
String columnType = HudiUtils.fromAvroHudiTypeToHiveTypeString(hudiField.schema());
String columnType = HudiUtils.convertAvroToHiveType(hudiField.schema());
columnTypes.add(columnType);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,22 +41,32 @@ public class DLFCachedClientPool implements ClientPool<IMetaStoreClient, TExcept
public DLFCachedClientPool(Configuration conf, Map<String, String> properties) {
this.conf = conf;
this.endpoint = conf.get("", "");
this.clientPoolSize =
PropertyUtil.propertyAsInt(
this.clientPoolSize = getClientPoolSize(properties);
this.evictionInterval = getEvictionInterval(properties);
initializeClientPoolCache();
}

private int getClientPoolSize(Map<String, String> properties) {
return PropertyUtil.propertyAsInt(
properties,
CatalogProperties.CLIENT_POOL_SIZE,
CatalogProperties.CLIENT_POOL_SIZE_DEFAULT);
this.evictionInterval =
PropertyUtil.propertyAsLong(
CatalogProperties.CLIENT_POOL_SIZE_DEFAULT
);
}

private long getEvictionInterval(Map<String, String> properties) {
return PropertyUtil.propertyAsLong(
properties,
CatalogProperties.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS,
CatalogProperties.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS_DEFAULT);
CatalogProperties.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS_DEFAULT
);
}

private void initializeClientPoolCache() {
if (clientPoolCache == null) {
synchronized (clientPoolCacheLock) {
if (clientPoolCache == null) {
clientPoolCache =
Caffeine.newBuilder()
clientPoolCache = Caffeine.newBuilder()
.expireAfterAccess(evictionInterval, TimeUnit.MILLISECONDS)
.removalListener((key, value, cause) -> ((DLFClientPool) value).close())
.build();
Expand All @@ -80,3 +90,4 @@ public <R> R run(Action<R, IMetaStoreClient, TException> action, boolean retry)
return clientPool().run(action, retry);
}
}

0 comments on commit 5b46c84

Please sign in to comment.