Skip to content

Commit

Permalink
Avro datum factory fixup (#27173)
Browse files Browse the repository at this point in the history
* Remove DecimalConversion from logical-type conversions

* Restore AvroCloudObject compatibility
  • Loading branch information
RustedBones authored Jun 20, 2023
1 parent 7a4cbc1 commit 4c66866
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
class AvroCoderCloudObjectTranslator implements CloudObjectTranslator<AvroCoder> {
private static final String DATUM_FACTORY_FIELD = "datum_factory";
private static final String SCHEMA_FIELD = "schema";
// deprecated fields
private static final String TYPE_FIELD = "type";
private static final String REFLECT_API_FIELD = "reflect_api";

@Override
public CloudObject toCloudObject(AvroCoder target, SdkComponents sdkComponents) {
Expand All @@ -45,15 +48,28 @@ public CloudObject toCloudObject(AvroCoder target, SdkComponents sdkComponents)

@Override
public AvroCoder<?> fromCloudObject(CloudObject cloudObject) {
Schema.Parser parser = new Schema.Parser();
byte[] deserializedDatumFactory =
StringUtils.jsonStringToByteArray(Structs.getString(cloudObject, DATUM_FACTORY_FIELD));
String schemaString = Structs.getString(cloudObject, SCHEMA_FIELD);
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(schemaString);
AvroDatumFactory<?> datumFactory =
(AvroDatumFactory)
SerializableUtils.deserializeFromByteArray(
deserializedDatumFactory, DATUM_FACTORY_FIELD);
AvroDatumFactory<?> datumFactory;
if (cloudObject.containsKey(TYPE_FIELD)) {
// coder was created with an older beam version. use default datum factory
try {
String className = Structs.getString(cloudObject, TYPE_FIELD);
Class<?> type = Class.forName(className);
boolean useReflectApi = Structs.getBoolean(cloudObject, REFLECT_API_FIELD);
datumFactory = AvroDatumFactory.of(type, useReflectApi);
} catch (ClassNotFoundException e) {
throw new IllegalArgumentException(e);
}
} else {
byte[] deserializedDatumFactory =
StringUtils.jsonStringToByteArray(Structs.getString(cloudObject, DATUM_FACTORY_FIELD));
datumFactory =
(AvroDatumFactory)
SerializableUtils.deserializeFromByteArray(
deserializedDatumFactory, DATUM_FACTORY_FIELD);
}
return AvroCoder.of(datumFactory, schema);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,8 @@ public class AvroUtils {
private static final ForLoadedType JODA_INSTANT = new ForLoadedType(Instant.class);

public static void addLogicalTypeConversions(final GenericData data) {
data.addLogicalTypeConversion(new Conversions.DecimalConversion());
// do not add DecimalConversion by default as schema must have extra 'scale' and 'precision'
// properties. avro reflect already handles BigDecimal as string with the 'java-class' property
data.addLogicalTypeConversion(new Conversions.UUIDConversion());
// joda-time
data.addLogicalTypeConversion(new AvroJodaTimeConversions.DateConversion());
Expand Down

0 comments on commit 4c66866

Please sign in to comment.