Skip to content

Commit

Permalink
Handle non serializable avro Schema
Browse files Browse the repository at this point in the history
  • Loading branch information
RustedBones committed Dec 10, 2024
1 parent c62c184 commit a7a81be
Showing 1 changed file with 18 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.cloud.bigquery.storage.v1.DataFormat;
import java.io.IOException;
import java.io.InvalidClassException;
import java.io.Serializable;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -156,7 +157,14 @@ public Row toConfigRow(TypedRead<?> transform) {
fieldValues.put("bigquery_services", toByteArray(transform.getBigQueryServices()));
}
if (transform.getAvroSchema() != null) {
fieldValues.put("avro_schema", toByteArray(transform.getAvroSchema()));
org.apache.avro.Schema avroSchema = transform.getAvroSchema();
// avro 1.8 Schema is not serializable
if (avroSchema instanceof Serializable) {
fieldValues.put("avro_schema", toByteArray(transform.getAvroSchema()));
} else {
String avroSchemaStr = avroSchema.toString();
fieldValues.put("avro_schema", toByteArray(avroSchemaStr));
}
}
if (transform.getDatumReaderFactory() != null) {
fieldValues.put("datum_reader_factory", toByteArray(transform.getDatumReaderFactory()));
Expand Down Expand Up @@ -278,7 +286,15 @@ public TypedRead<?> fromConfigRow(Row configRow, PipelineOptions options) {
}
byte[] avroSchemaBytes = configRow.getBytes("avro_schema");
if (avroSchemaBytes != null) {
builder = builder.setAvroSchema((org.apache.avro.Schema) fromByteArray(avroSchemaBytes));
Object avroSchemaObj = fromByteArray(avroSchemaBytes);
if (avroSchemaObj instanceof org.apache.avro.Schema) {
builder = builder.setAvroSchema((org.apache.avro.Schema) avroSchemaObj);
} else {
String avroSchemaStr = (String) avroSchemaObj;
org.apache.avro.Schema avroSchema =
new org.apache.avro.Schema.Parser().parse(avroSchemaStr);
builder = builder.setAvroSchema(avroSchema);
}
}
byte[] parseFnBytes = configRow.getBytes("parse_fn");
if (parseFnBytes != null) {
Expand Down

0 comments on commit a7a81be

Please sign in to comment.