From ee604a5eb8a49e6c89d5cae37c7122ce2972151f Mon Sep 17 00:00:00 2001 From: Michel Davit Date: Fri, 27 Sep 2024 23:50:02 +0200 Subject: [PATCH] Improve BQ <-> Avro conversions (#32482) * Improve BQ avro handling * Include minimal changeset * Apply review comments * Streamline avro type conversion with beam schema --- .../io/gcp/bigquery/BigQueryAvroUtils.java | 427 +++++++++--------- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 7 +- .../io/gcp/bigquery/BigQuerySourceBase.java | 3 +- .../sdk/io/gcp/bigquery/BigQueryUtils.java | 59 ++- .../gcp/bigquery/BigQueryAvroUtilsTest.java | 213 ++++++--- 5 files changed, 403 insertions(+), 306 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java index 911d5b4d0aa3..cddde05b194c 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java @@ -21,15 +21,12 @@ import static java.time.temporal.ChronoField.MINUTE_OF_HOUR; import static java.time.temporal.ChronoField.NANO_OF_SECOND; import static java.time.temporal.ChronoField.SECOND_OF_MINUTE; -import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects.firstNonNull; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Verify.verify; -import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Verify.verifyNotNull; import com.google.api.services.bigquery.model.TableFieldSchema; import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; -import java.math.BigDecimal; import java.nio.ByteBuffer; import java.time.LocalDate; import java.time.LocalTime; @@ -37,7 +34,6 @@ import java.util.ArrayList; import java.util.HashSet; import java.util.List; -import java.util.Optional; import java.util.Set; import org.apache.avro.Conversions; import org.apache.avro.LogicalType; @@ -45,10 +41,10 @@ import org.apache.avro.Schema; import org.apache.avro.Schema.Field; import org.apache.avro.Schema.Type; +import org.apache.avro.SchemaBuilder; +import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableCollection; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMultimap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.BaseEncoding; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.format.DateTimeFormat; @@ -62,35 +58,96 @@ */ class BigQueryAvroUtils { + // org.apache.avro.LogicalType + static class DateTimeLogicalType extends LogicalType { + public DateTimeLogicalType() { + super("datetime"); + } + } + + static final DateTimeLogicalType DATETIME_LOGICAL_TYPE = new DateTimeLogicalType(); + /** * Defines the valid mapping between BigQuery types and native Avro types. * - *

Some BigQuery types are duplicated here since slightly different Avro records are produced - * when exporting data in Avro format and when reading data directly using the read API. + * @see BQ avro + * export + * @see BQ + * avro storage */ - static final ImmutableMultimap BIG_QUERY_TO_AVRO_TYPES = - ImmutableMultimap.builder() - .put("STRING", Type.STRING) - .put("GEOGRAPHY", Type.STRING) - .put("BYTES", Type.BYTES) - .put("INTEGER", Type.LONG) - .put("INT64", Type.LONG) - .put("FLOAT", Type.DOUBLE) - .put("FLOAT64", Type.DOUBLE) - .put("NUMERIC", Type.BYTES) - .put("BIGNUMERIC", Type.BYTES) - .put("BOOLEAN", Type.BOOLEAN) - .put("BOOL", Type.BOOLEAN) - .put("TIMESTAMP", Type.LONG) - .put("RECORD", Type.RECORD) - .put("STRUCT", Type.RECORD) - .put("DATE", Type.STRING) - .put("DATE", Type.INT) - .put("DATETIME", Type.STRING) - .put("TIME", Type.STRING) - .put("TIME", Type.LONG) - .put("JSON", Type.STRING) - .build(); + static Schema getPrimitiveType(TableFieldSchema schema, Boolean useAvroLogicalTypes) { + String bqType = schema.getType(); + // see + // https://googleapis.dev/java/google-api-services-bigquery/latest/com/google/api/services/bigquery/model/TableFieldSchema.html#getType-- + switch (bqType) { + case "STRING": + // string + return SchemaBuilder.builder().stringType(); + case "BYTES": + // bytes + return SchemaBuilder.builder().bytesType(); + case "INTEGER": + case "INT64": + // long + return SchemaBuilder.builder().longType(); + case "FLOAT": + case "FLOAT64": + // double + return SchemaBuilder.builder().doubleType(); + case "BOOLEAN": + case "BOOL": + // boolean + return SchemaBuilder.builder().booleanType(); + case "TIMESTAMP": + // in Extract Jobs, it always uses the Avro logical type + // we may have to change this if we move to EXPORT DATA + return LogicalTypes.timestampMicros().addToSchema(SchemaBuilder.builder().longType()); + case "DATE": + if (useAvroLogicalTypes) { + return LogicalTypes.date().addToSchema(SchemaBuilder.builder().intType()); + } else { + return SchemaBuilder.builder().stringBuilder().prop("sqlType", bqType).endString(); + } + case "TIME": + if (useAvroLogicalTypes) { + return LogicalTypes.timeMicros().addToSchema(SchemaBuilder.builder().longType()); + } else { + return SchemaBuilder.builder().stringBuilder().prop("sqlType", bqType).endString(); + } + case "DATETIME": + if (useAvroLogicalTypes) { + return DATETIME_LOGICAL_TYPE.addToSchema(SchemaBuilder.builder().stringType()); + } else { + return SchemaBuilder.builder().stringBuilder().prop("sqlType", bqType).endString(); + } + case "NUMERIC": + case "BIGNUMERIC": + // decimal + LogicalType logicalType; + if (schema.getScale() != null) { + logicalType = + LogicalTypes.decimal(schema.getPrecision().intValue(), schema.getScale().intValue()); + } else if (schema.getPrecision() != null) { + logicalType = LogicalTypes.decimal(schema.getPrecision().intValue()); + } else if (bqType.equals("NUMERIC")) { + logicalType = LogicalTypes.decimal(38, 9); + } else { + // BIGNUMERIC + logicalType = LogicalTypes.decimal(77, 38); + } + return logicalType.addToSchema(SchemaBuilder.builder().bytesType()); + case "GEOGRAPHY": + case "JSON": + return SchemaBuilder.builder().stringBuilder().prop("sqlType", bqType).endString(); + case "RECORD": + case "STRUCT": + // record + throw new IllegalArgumentException("RECORD/STRUCT are not primitive types"); + case "RANGE": // TODO add support for range type + default: + throw new IllegalArgumentException("Unknown BigQuery type: " + bqType); + } + } /** * Formats BigQuery seconds-since-epoch into String matching JSON export. Thread-safe and @@ -179,20 +236,27 @@ private static String formatTime(long timeMicros) { * *

See "Avro * format" for more information. + * + * @deprecated Only kept for previous TableRowParser implementation */ + @Deprecated static TableRow convertGenericRecordToTableRow(GenericRecord record, TableSchema schema) { - return convertGenericRecordToTableRow(record, schema.getFields()); + return convertGenericRecordToTableRow(record); } - private static TableRow convertGenericRecordToTableRow( - GenericRecord record, List fields) { + /** + * Utility function to convert from an Avro {@link GenericRecord} to a BigQuery {@link TableRow}. + * + *

See "Avro + * format" for more information. + */ + static TableRow convertGenericRecordToTableRow(GenericRecord record) { TableRow row = new TableRow(); - for (TableFieldSchema subSchema : fields) { - // Per https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, the name field - // is required, so it may not be null. - Field field = record.getSchema().getField(subSchema.getName()); + Schema schema = record.getSchema(); + + for (Field field : schema.getFields()) { Object convertedValue = - getTypedCellValue(field.schema(), subSchema, record.get(field.name())); + getTypedCellValue(field.name(), field.schema(), record.get(field.pos())); if (convertedValue != null) { // To match the JSON files exported by BigQuery, do not include null values in the output. row.set(field.name(), convertedValue); @@ -202,32 +266,22 @@ private static TableRow convertGenericRecordToTableRow( return row; } - private static @Nullable Object getTypedCellValue( - Schema schema, TableFieldSchema fieldSchema, Object v) { - // Per https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, the mode field - // is optional (and so it may be null), but defaults to "NULLABLE". - String mode = firstNonNull(fieldSchema.getMode(), "NULLABLE"); - switch (mode) { - case "REQUIRED": - return convertRequiredField(schema.getType(), schema.getLogicalType(), fieldSchema, v); - case "REPEATED": - return convertRepeatedField(schema, fieldSchema, v); - case "NULLABLE": - return convertNullableField(schema, fieldSchema, v); - default: + private static @Nullable Object getTypedCellValue(String name, Schema schema, Object v) { + Type type = schema.getType(); + switch (type) { + case ARRAY: + return convertRepeatedField(name, schema.getElementType(), v); + case UNION: + return convertNullableField(name, schema, v); + case MAP: throw new UnsupportedOperationException( - "Parsing a field with BigQuery field schema mode " + fieldSchema.getMode()); + String.format("Unexpected Avro field schema type %s for field named %s", type, name)); + default: + return convertRequiredField(name, schema, v); } } - private static List convertRepeatedField( - Schema schema, TableFieldSchema fieldSchema, Object v) { - Type arrayType = schema.getType(); - verify( - arrayType == Type.ARRAY, - "BigQuery REPEATED field %s should be Avro ARRAY, not %s", - fieldSchema.getName(), - arrayType); + private static List convertRepeatedField(String name, Schema elementType, Object v) { // REPEATED fields are represented as Avro arrays. if (v == null) { // Handle the case of an empty repeated field. @@ -236,145 +290,100 @@ private static List convertRepeatedField( @SuppressWarnings("unchecked") List elements = (List) v; ArrayList values = new ArrayList<>(); - Type elementType = schema.getElementType().getType(); - LogicalType elementLogicalType = schema.getElementType().getLogicalType(); for (Object element : elements) { - values.add(convertRequiredField(elementType, elementLogicalType, fieldSchema, element)); + values.add(convertRequiredField(name, elementType, element)); } return values; } - private static Object convertRequiredField( - Type avroType, LogicalType avroLogicalType, TableFieldSchema fieldSchema, Object v) { + private static Object convertRequiredField(String name, Schema schema, Object v) { // REQUIRED fields are represented as the corresponding Avro types. For example, a BigQuery // INTEGER type maps to an Avro LONG type. - checkNotNull(v, "REQUIRED field %s should not be null", fieldSchema.getName()); - // Per https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, the type field - // is required, so it may not be null. - String bqType = fieldSchema.getType(); - ImmutableCollection expectedAvroTypes = BIG_QUERY_TO_AVRO_TYPES.get(bqType); - verifyNotNull(expectedAvroTypes, "Unsupported BigQuery type: %s", bqType); - verify( - expectedAvroTypes.contains(avroType), - "Expected Avro schema types %s for BigQuery %s field %s, but received %s", - expectedAvroTypes, - bqType, - fieldSchema.getName(), - avroType); - // For historical reasons, don't validate avroLogicalType except for with NUMERIC. - // BigQuery represents NUMERIC in Avro format as BYTES with a DECIMAL logical type. - switch (bqType) { - case "STRING": - case "DATETIME": - case "GEOGRAPHY": - case "JSON": - // Avro will use a CharSequence to represent String objects, but it may not always use - // java.lang.String; for example, it may prefer org.apache.avro.util.Utf8. - verify(v instanceof CharSequence, "Expected CharSequence (String), got %s", v.getClass()); - return v.toString(); - case "DATE": - if (avroType == Type.INT) { - verify(v instanceof Integer, "Expected Integer, got %s", v.getClass()); - verifyNotNull(avroLogicalType, "Expected Date logical type"); - verify(avroLogicalType instanceof LogicalTypes.Date, "Expected Date logical type"); + checkNotNull(v, "REQUIRED field %s should not be null", name); + + Type type = schema.getType(); + LogicalType logicalType = schema.getLogicalType(); + switch (type) { + case BOOLEAN: + // SQL types BOOL, BOOLEAN + return v; + case INT: + if (logicalType instanceof LogicalTypes.Date) { + // SQL types DATE return formatDate((Integer) v); } else { - verify(v instanceof CharSequence, "Expected CharSequence (String), got %s", v.getClass()); - return v.toString(); + throw new UnsupportedOperationException( + String.format("Unexpected Avro field schema type %s for field named %s", type, name)); } - case "TIME": - if (avroType == Type.LONG) { - verify(v instanceof Long, "Expected Long, got %s", v.getClass()); - verifyNotNull(avroLogicalType, "Expected TimeMicros logical type"); - verify( - avroLogicalType instanceof LogicalTypes.TimeMicros, - "Expected TimeMicros logical type"); + case LONG: + if (logicalType instanceof LogicalTypes.TimeMicros) { + // SQL types TIME return formatTime((Long) v); + } else if (logicalType instanceof LogicalTypes.TimestampMicros) { + // SQL types TIMESTAMP + return formatTimestamp((Long) v); } else { - verify(v instanceof CharSequence, "Expected CharSequence (String), got %s", v.getClass()); - return v.toString(); + // SQL types INT64 (INT, SMALLINT, INTEGER, BIGINT, TINYINT, BYTEINT) + return ((Long) v).toString(); } - case "INTEGER": - case "INT64": - verify(v instanceof Long, "Expected Long, got %s", v.getClass()); - return ((Long) v).toString(); - case "FLOAT": - case "FLOAT64": - verify(v instanceof Double, "Expected Double, got %s", v.getClass()); + case DOUBLE: + // SQL types FLOAT64 return v; - case "NUMERIC": - case "BIGNUMERIC": - // NUMERIC data types are represented as BYTES with the DECIMAL logical type. They are - // converted back to Strings with precision and scale determined by the logical type. - verify(v instanceof ByteBuffer, "Expected ByteBuffer, got %s", v.getClass()); - verifyNotNull(avroLogicalType, "Expected Decimal logical type"); - verify(avroLogicalType instanceof LogicalTypes.Decimal, "Expected Decimal logical type"); - BigDecimal numericValue = - new Conversions.DecimalConversion() - .fromBytes((ByteBuffer) v, Schema.create(avroType), avroLogicalType); - return numericValue.toString(); - case "BOOL": - case "BOOLEAN": - verify(v instanceof Boolean, "Expected Boolean, got %s", v.getClass()); - return v; - case "TIMESTAMP": - // TIMESTAMP data types are represented as Avro LONG types, microseconds since the epoch. - // Values may be negative since BigQuery timestamps start at 0001-01-01 00:00:00 UTC. - verify(v instanceof Long, "Expected Long, got %s", v.getClass()); - return formatTimestamp((Long) v); - case "RECORD": - case "STRUCT": - verify(v instanceof GenericRecord, "Expected GenericRecord, got %s", v.getClass()); - return convertGenericRecordToTableRow((GenericRecord) v, fieldSchema.getFields()); - case "BYTES": - verify(v instanceof ByteBuffer, "Expected ByteBuffer, got %s", v.getClass()); - ByteBuffer byteBuffer = (ByteBuffer) v; - byte[] bytes = new byte[byteBuffer.limit()]; - byteBuffer.get(bytes); - return BaseEncoding.base64().encode(bytes); + case BYTES: + if (logicalType instanceof LogicalTypes.Decimal) { + // SQL tpe NUMERIC, BIGNUMERIC + return new Conversions.DecimalConversion() + .fromBytes((ByteBuffer) v, schema, logicalType) + .toString(); + } else { + // SQL types BYTES + return BaseEncoding.base64().encode(((ByteBuffer) v).array()); + } + case STRING: + // SQL types STRING, DATETIME, GEOGRAPHY, JSON + // when not using logical type DATE, TIME too + return v.toString(); + case RECORD: + return convertGenericRecordToTableRow((GenericRecord) v); default: throw new UnsupportedOperationException( - String.format( - "Unexpected BigQuery field schema type %s for field named %s", - fieldSchema.getType(), fieldSchema.getName())); + String.format("Unexpected Avro field schema type %s for field named %s", type, name)); } } - private static @Nullable Object convertNullableField( - Schema avroSchema, TableFieldSchema fieldSchema, Object v) { + private static @Nullable Object convertNullableField(String name, Schema union, Object v) { // NULLABLE fields are represented as an Avro Union of the corresponding type and "null". verify( - avroSchema.getType() == Type.UNION, + union.getType() == Type.UNION, "Expected Avro schema type UNION, not %s, for BigQuery NULLABLE field %s", - avroSchema.getType(), - fieldSchema.getName()); - List unionTypes = avroSchema.getTypes(); + union.getType(), + name); + List unionTypes = union.getTypes(); verify( unionTypes.size() == 2, "BigQuery NULLABLE field %s should be an Avro UNION of NULL and another type, not %s", - fieldSchema.getName(), - unionTypes); + name, + union); - if (v == null) { + Schema type = union.getTypes().get(GenericData.get().resolveUnion(union, v)); + if (type.getType() == Type.NULL) { return null; + } else { + return convertRequiredField(name, type, v); } - - Type firstType = unionTypes.get(0).getType(); - if (!firstType.equals(Type.NULL)) { - return convertRequiredField(firstType, unionTypes.get(0).getLogicalType(), fieldSchema, v); - } - return convertRequiredField( - unionTypes.get(1).getType(), unionTypes.get(1).getLogicalType(), fieldSchema, v); } - static Schema toGenericAvroSchema( - String schemaName, List fieldSchemas, @Nullable String namespace) { + private static Schema toGenericAvroSchema( + String schemaName, + List fieldSchemas, + Boolean useAvroLogicalTypes, + @Nullable String namespace) { String nextNamespace = namespace == null ? null : String.format("%s.%s", namespace, schemaName); List avroFields = new ArrayList<>(); for (TableFieldSchema bigQueryField : fieldSchemas) { - avroFields.add(convertField(bigQueryField, nextNamespace)); + avroFields.add(convertField(bigQueryField, useAvroLogicalTypes, nextNamespace)); } return Schema.createRecord( schemaName, @@ -384,11 +393,19 @@ static Schema toGenericAvroSchema( avroFields); } - static Schema toGenericAvroSchema(String schemaName, List fieldSchemas) { - return toGenericAvroSchema( - schemaName, - fieldSchemas, - hasNamespaceCollision(fieldSchemas) ? "org.apache.beam.sdk.io.gcp.bigquery" : null); + static Schema toGenericAvroSchema(TableSchema tableSchema) { + return toGenericAvroSchema("root", tableSchema.getFields(), true); + } + + static Schema toGenericAvroSchema(TableSchema tableSchema, Boolean useAvroLogicalTypes) { + return toGenericAvroSchema("root", tableSchema.getFields(), useAvroLogicalTypes); + } + + static Schema toGenericAvroSchema( + String schemaName, List fieldSchemas, Boolean useAvroLogicalTypes) { + String namespace = + hasNamespaceCollision(fieldSchemas) ? "org.apache.beam.sdk.io.gcp.bigquery" : null; + return toGenericAvroSchema(schemaName, fieldSchemas, useAvroLogicalTypes, namespace); } // To maintain backwards compatibility we only disambiguate collisions in the field namespaces as @@ -415,64 +432,30 @@ private static boolean hasNamespaceCollision(List fieldSchemas @SuppressWarnings({ "nullness" // Avro library not annotated }) - private static Field convertField(TableFieldSchema bigQueryField, @Nullable String namespace) { - ImmutableCollection avroTypes = BIG_QUERY_TO_AVRO_TYPES.get(bigQueryField.getType()); - if (avroTypes.isEmpty()) { - throw new IllegalArgumentException( - "Unable to map BigQuery field type " + bigQueryField.getType() + " to avro type."); - } - - Type avroType = avroTypes.iterator().next(); - Schema elementSchema; - if (avroType == Type.RECORD) { - elementSchema = - toGenericAvroSchema(bigQueryField.getName(), bigQueryField.getFields(), namespace); - } else { - elementSchema = handleAvroLogicalTypes(bigQueryField, avroType); - } + private static Field convertField( + TableFieldSchema bigQueryField, Boolean useAvroLogicalTypes, @Nullable String namespace) { + String fieldName = bigQueryField.getName(); Schema fieldSchema; - if (bigQueryField.getMode() == null || "NULLABLE".equals(bigQueryField.getMode())) { - fieldSchema = Schema.createUnion(Schema.create(Type.NULL), elementSchema); - } else if ("REQUIRED".equals(bigQueryField.getMode())) { - fieldSchema = elementSchema; - } else if ("REPEATED".equals(bigQueryField.getMode())) { - fieldSchema = Schema.createArray(elementSchema); + String bqType = bigQueryField.getType(); + if ("RECORD".equals(bqType) || "STRUCT".equals(bqType)) { + fieldSchema = + toGenericAvroSchema(fieldName, bigQueryField.getFields(), useAvroLogicalTypes, namespace); } else { - throw new IllegalArgumentException( - String.format("Unknown BigQuery Field Mode: %s", bigQueryField.getMode())); + fieldSchema = getPrimitiveType(bigQueryField, useAvroLogicalTypes); + } + + String bqMode = bigQueryField.getMode(); + if (bqMode == null || "NULLABLE".equals(bqMode)) { + fieldSchema = SchemaBuilder.unionOf().nullType().and().type(fieldSchema).endUnion(); + } else if ("REPEATED".equals(bqMode)) { + fieldSchema = SchemaBuilder.array().items(fieldSchema); + } else if (!"REQUIRED".equals(bqMode)) { + throw new IllegalArgumentException(String.format("Unknown BigQuery Field Mode: %s", bqMode)); } return new Field( - bigQueryField.getName(), + fieldName, fieldSchema, bigQueryField.getDescription(), (Object) null /* Cast to avoid deprecated JsonNode constructor. */); } - - private static Schema handleAvroLogicalTypes(TableFieldSchema bigQueryField, Type avroType) { - String bqType = bigQueryField.getType(); - switch (bqType) { - case "NUMERIC": - // Default value based on - // https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#decimal_types - int precision = Optional.ofNullable(bigQueryField.getPrecision()).orElse(38L).intValue(); - int scale = Optional.ofNullable(bigQueryField.getScale()).orElse(9L).intValue(); - return LogicalTypes.decimal(precision, scale).addToSchema(Schema.create(Type.BYTES)); - case "BIGNUMERIC": - // Default value based on - // https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#decimal_types - int precisionBigNumeric = - Optional.ofNullable(bigQueryField.getPrecision()).orElse(77L).intValue(); - int scaleBigNumeric = Optional.ofNullable(bigQueryField.getScale()).orElse(38L).intValue(); - return LogicalTypes.decimal(precisionBigNumeric, scaleBigNumeric) - .addToSchema(Schema.create(Type.BYTES)); - case "TIMESTAMP": - return LogicalTypes.timestampMicros().addToSchema(Schema.create(Type.LONG)); - case "GEOGRAPHY": - Schema geoSchema = Schema.create(Type.STRING); - geoSchema.addProp(LogicalType.LOGICAL_TYPE_PROP, "geography_wkt"); - return geoSchema; - default: - return Schema.create(avroType); - } - } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index f3ade1948986..88dfa2c26348 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -627,9 +627,7 @@ public class BigQueryIO { GENERIC_DATUM_WRITER_FACTORY = schema -> new GenericDatumWriter<>(); private static final SerializableFunction - DEFAULT_AVRO_SCHEMA_FACTORY = - (SerializableFunction) - input -> BigQueryAvroUtils.toGenericAvroSchema("root", input.getFields()); + DEFAULT_AVRO_SCHEMA_FACTORY = BigQueryAvroUtils::toGenericAvroSchema; /** * @deprecated Use {@link #read(SerializableFunction)} or {@link #readTableRows} instead. {@link @@ -793,8 +791,7 @@ static class TableRowParser implements SerializableFunction executeExtract( List> createSources( List files, TableSchema schema, @Nullable List metadata) throws IOException, InterruptedException { - String avroSchema = - BigQueryAvroUtils.toGenericAvroSchema("root", schema.getFields()).toString(); + String avroSchema = BigQueryAvroUtils.toGenericAvroSchema(schema).toString(); AvroSource.DatumReaderFactory factory = readerFactory.apply(schema); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java index f2f997bdbfa9..b4d110f90fe2 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java @@ -312,38 +312,45 @@ static StandardSQLTypeName toStandardSQLTypeName(FieldType fieldType) { * *

Supports both standard and legacy SQL types. * - * @param typeName Name of the type + * @param typeName Name of the type returned by {@link TableFieldSchema#getType()} * @param nestedFields Nested fields for the given type (eg. RECORD type) * @return Corresponding Beam {@link FieldType} */ private static FieldType fromTableFieldSchemaType( String typeName, List nestedFields, SchemaConversionOptions options) { + // see + // https://googleapis.dev/java/google-api-services-bigquery/latest/com/google/api/services/bigquery/model/TableFieldSchema.html#getType-- switch (typeName) { case "STRING": return FieldType.STRING; case "BYTES": return FieldType.BYTES; - case "INT64": case "INTEGER": + case "INT64": return FieldType.INT64; - case "FLOAT64": case "FLOAT": + case "FLOAT64": return FieldType.DOUBLE; - case "BOOL": case "BOOLEAN": + case "BOOL": return FieldType.BOOLEAN; - case "NUMERIC": - return FieldType.DECIMAL; case "TIMESTAMP": return FieldType.DATETIME; - case "TIME": - return FieldType.logicalType(SqlTypes.TIME); case "DATE": return FieldType.logicalType(SqlTypes.DATE); + case "TIME": + return FieldType.logicalType(SqlTypes.TIME); case "DATETIME": return FieldType.logicalType(SqlTypes.DATETIME); - case "STRUCT": + case "NUMERIC": + case "BIGNUMERIC": + return FieldType.DECIMAL; + case "GEOGRAPHY": + case "JSON": + // TODO Add metadata for custom sql types ? + return FieldType.STRING; case "RECORD": + case "STRUCT": if (options.getInferMaps() && nestedFields.size() == 2) { TableFieldSchema key = nestedFields.get(0); TableFieldSchema value = nestedFields.get(1); @@ -354,9 +361,9 @@ private static FieldType fromTableFieldSchemaType( fromTableFieldSchemaType(value.getType(), value.getFields(), options)); } } - Schema rowSchema = fromTableFieldSchema(nestedFields, options); return FieldType.row(rowSchema); + case "RANGE": // TODO add support for range type default: throw new UnsupportedOperationException( "Converting BigQuery type " + typeName + " to Beam type is unsupported"); @@ -448,10 +455,27 @@ public static Schema fromTableSchema(TableSchema tableSchema, SchemaConversionOp return fromTableFieldSchema(tableSchema.getFields(), options); } + /** Convert a list of BigQuery {@link TableSchema} to Avro {@link org.apache.avro.Schema}. */ + public static org.apache.avro.Schema toGenericAvroSchema(TableSchema tableSchema) { + return toGenericAvroSchema(tableSchema, false); + } + + /** Convert a list of BigQuery {@link TableSchema} to Avro {@link org.apache.avro.Schema}. */ + public static org.apache.avro.Schema toGenericAvroSchema( + TableSchema tableSchema, Boolean useAvroLogicalTypes) { + return toGenericAvroSchema("root", tableSchema.getFields(), useAvroLogicalTypes); + } + /** Convert a list of BigQuery {@link TableFieldSchema} to Avro {@link org.apache.avro.Schema}. */ public static org.apache.avro.Schema toGenericAvroSchema( String schemaName, List fieldSchemas) { - return BigQueryAvroUtils.toGenericAvroSchema(schemaName, fieldSchemas); + return toGenericAvroSchema(schemaName, fieldSchemas, false); + } + + /** Convert a list of BigQuery {@link TableFieldSchema} to Avro {@link org.apache.avro.Schema}. */ + public static org.apache.avro.Schema toGenericAvroSchema( + String schemaName, List fieldSchemas, Boolean useAvroLogicalTypes) { + return BigQueryAvroUtils.toGenericAvroSchema(schemaName, fieldSchemas, useAvroLogicalTypes); } private static final BigQueryIO.TypedRead.ToBeamRowFunction @@ -516,9 +540,20 @@ public static Row toBeamRow(GenericRecord record, Schema schema, ConversionOptio return Row.withSchema(schema).addValues(valuesInOrder).build(); } + /** + * Convert generic record to Bq TableRow. + * + * @deprecated use {@link #convertGenericRecordToTableRow(GenericRecord)} + */ + @Deprecated public static TableRow convertGenericRecordToTableRow( GenericRecord record, TableSchema tableSchema) { - return BigQueryAvroUtils.convertGenericRecordToTableRow(record, tableSchema); + return convertGenericRecordToTableRow(record); + } + + /** Convert generic record to Bq TableRow. */ + public static TableRow convertGenericRecordToTableRow(GenericRecord record) { + return BigQueryAvroUtils.convertGenericRecordToTableRow(record); } /** Convert a Beam Row to a BigQuery TableRow. */ diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java index c87888134c8a..662f2658eb6b 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java @@ -28,6 +28,7 @@ import java.math.BigDecimal; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.time.Instant; import java.util.ArrayList; import java.util.List; import org.apache.avro.Conversions; @@ -38,14 +39,14 @@ import org.apache.avro.Schema.Type; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; +import org.apache.avro.reflect.AvroSchema; import org.apache.avro.reflect.Nullable; +import org.apache.avro.reflect.ReflectData; import org.apache.avro.util.Utf8; -import org.apache.beam.sdk.coders.DefaultCoder; import org.apache.beam.sdk.extensions.avro.coders.AvroCoder; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.BaseEncoding; -import org.apache.commons.lang3.tuple.Pair; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -96,64 +97,26 @@ public class BigQueryAvroUtilsTest { .setFields(subFields), new TableFieldSchema().setName("geoPositions").setType("GEOGRAPHY").setMode("NULLABLE")); - private Pair convertToByteBuffer(BigDecimal bigDecimal, Schema schema) { - LogicalType bigDecimalLogicalType = - LogicalTypes.decimal(bigDecimal.precision(), bigDecimal.scale()); - // DecimalConversion.toBytes returns a ByteBuffer, which can be mutated by callees if passed - // to other methods. We wrap the byte array as a ByteBuffer before adding it to the - // GenericRecords. - byte[] bigDecimalBytes = - new Conversions.DecimalConversion() - .toBytes(bigDecimal, schema, bigDecimalLogicalType) - .array(); - return Pair.of(bigDecimalLogicalType, bigDecimalBytes); + private ByteBuffer convertToBytes(BigDecimal bigDecimal, int precision, int scale) { + LogicalType bigDecimalLogicalType = LogicalTypes.decimal(precision, scale); + return new Conversions.DecimalConversion().toBytes(bigDecimal, null, bigDecimalLogicalType); } @Test public void testConvertGenericRecordToTableRow() throws Exception { - TableSchema tableSchema = new TableSchema(); - tableSchema.setFields(fields); - - // BigQuery encodes NUMERIC and BIGNUMERIC values to Avro using the BYTES type with the DECIMAL - // logical type. AvroCoder can't apply logical types to Schemas directly, so we need to get the - // Schema for the Bird class defined below, then replace the field used to test NUMERIC with - // a field that has the appropriate Schema. - Schema numericSchema = Schema.create(Type.BYTES); BigDecimal numeric = new BigDecimal("123456789.123456789"); - Pair numericPair = convertToByteBuffer(numeric, numericSchema); - Schema bigNumericSchema = Schema.create(Type.BYTES); + ByteBuffer numericBytes = convertToBytes(numeric, 38, 9); BigDecimal bigNumeric = new BigDecimal( "578960446186580977117854925043439539266.34992332820282019728792003956564819967"); - Pair bigNumericPair = convertToByteBuffer(bigNumeric, bigNumericSchema); - - // In order to update the Schema for NUMERIC and BIGNUMERIC values, we need to recreate all of - // the Fields. - List avroFields = new ArrayList<>(); - for (Schema.Field field : AvroCoder.of(Bird.class).getSchema().getFields()) { - Schema schema = field.schema(); - if ("birthdayMoney".equals(field.name())) { - // birthdayMoney is nullable field with type BYTES/DECIMAL. - schema = - Schema.createUnion( - Schema.create(Type.NULL), numericPair.getLeft().addToSchema(numericSchema)); - } else if ("lotteryWinnings".equals(field.name())) { - // lotteryWinnings is nullable field with type BYTES/DECIMAL. - schema = - Schema.createUnion( - Schema.create(Type.NULL), bigNumericPair.getLeft().addToSchema(bigNumericSchema)); - } - // After a Field is added to a Schema, it is assigned a position, so we can't simply reuse - // the existing Field. - avroFields.add(new Schema.Field(field.name(), schema, field.doc(), field.defaultVal())); - } - Schema avroSchema = Schema.createRecord(avroFields); + ByteBuffer bigNumericBytes = convertToBytes(bigNumeric, 77, 38); + Schema avroSchema = ReflectData.get().getSchema(Bird.class); { // Test nullable fields. GenericRecord record = new GenericData.Record(avroSchema); record.put("number", 5L); - TableRow convertedRow = BigQueryAvroUtils.convertGenericRecordToTableRow(record, tableSchema); + TableRow convertedRow = BigQueryAvroUtils.convertGenericRecordToTableRow(record); TableRow row = new TableRow().set("number", "5").set("associates", new ArrayList()); assertEquals(row, convertedRow); TableRow clonedRow = convertedRow.clone(); @@ -169,15 +132,15 @@ public void testConvertGenericRecordToTableRow() throws Exception { record.put("number", 5L); record.put("quality", 5.0); record.put("birthday", 5L); - record.put("birthdayMoney", ByteBuffer.wrap(numericPair.getRight())); - record.put("lotteryWinnings", ByteBuffer.wrap(bigNumericPair.getRight())); + record.put("birthdayMoney", numericBytes); + record.put("lotteryWinnings", bigNumericBytes); record.put("flighted", Boolean.TRUE); record.put("sound", soundByteBuffer); record.put("anniversaryDate", new Utf8("2000-01-01")); record.put("anniversaryDatetime", new String("2000-01-01 00:00:00.000005")); record.put("anniversaryTime", new Utf8("00:00:00.000005")); record.put("geoPositions", new String("LINESTRING(1 2, 3 4, 5 6, 7 8)")); - TableRow convertedRow = BigQueryAvroUtils.convertGenericRecordToTableRow(record, tableSchema); + TableRow convertedRow = BigQueryAvroUtils.convertGenericRecordToTableRow(record); TableRow row = new TableRow() .set("number", "5") @@ -204,9 +167,9 @@ public void testConvertGenericRecordToTableRow() throws Exception { GenericRecord record = new GenericData.Record(avroSchema); record.put("number", 5L); record.put("associates", Lists.newArrayList(nestedRecord)); - record.put("birthdayMoney", ByteBuffer.wrap(numericPair.getRight())); - record.put("lotteryWinnings", ByteBuffer.wrap(bigNumericPair.getRight())); - TableRow convertedRow = BigQueryAvroUtils.convertGenericRecordToTableRow(record, tableSchema); + record.put("birthdayMoney", numericBytes); + record.put("lotteryWinnings", bigNumericBytes); + TableRow convertedRow = BigQueryAvroUtils.convertGenericRecordToTableRow(record); TableRow row = new TableRow() .set("associates", Lists.newArrayList(new TableRow().set("species", "other"))) @@ -223,8 +186,7 @@ public void testConvertGenericRecordToTableRow() throws Exception { public void testConvertBigQuerySchemaToAvroSchema() { TableSchema tableSchema = new TableSchema(); tableSchema.setFields(fields); - Schema avroSchema = - BigQueryAvroUtils.toGenericAvroSchema("testSchema", tableSchema.getFields()); + Schema avroSchema = BigQueryAvroUtils.toGenericAvroSchema(tableSchema); assertThat(avroSchema.getField("number").schema(), equalTo(Schema.create(Type.LONG))); assertThat( @@ -260,17 +222,23 @@ public void testConvertBigQuerySchemaToAvroSchema() { assertThat( avroSchema.getField("sound").schema(), equalTo(Schema.createUnion(Schema.create(Type.NULL), Schema.create(Type.BYTES)))); + Schema dateSchema = Schema.create(Type.INT); + LogicalTypes.date().addToSchema(dateSchema); assertThat( avroSchema.getField("anniversaryDate").schema(), - equalTo(Schema.createUnion(Schema.create(Type.NULL), Schema.create(Type.STRING)))); + equalTo(Schema.createUnion(Schema.create(Type.NULL), dateSchema))); + Schema dateTimeSchema = Schema.create(Type.STRING); + BigQueryAvroUtils.DATETIME_LOGICAL_TYPE.addToSchema(dateTimeSchema); assertThat( avroSchema.getField("anniversaryDatetime").schema(), - equalTo(Schema.createUnion(Schema.create(Type.NULL), Schema.create(Type.STRING)))); + equalTo(Schema.createUnion(Schema.create(Type.NULL), dateTimeSchema))); + Schema timeSchema = Schema.create(Type.LONG); + LogicalTypes.timeMicros().addToSchema(timeSchema); assertThat( avroSchema.getField("anniversaryTime").schema(), - equalTo(Schema.createUnion(Schema.create(Type.NULL), Schema.create(Type.STRING)))); + equalTo(Schema.createUnion(Schema.create(Type.NULL), timeSchema))); Schema geoSchema = Schema.create(Type.STRING); - geoSchema.addProp(LogicalType.LOGICAL_TYPE_PROP, "geography_wkt"); + geoSchema.addProp("sqlType", "GEOGRAPHY"); assertThat( avroSchema.getField("geoPositions").schema(), equalTo(Schema.createUnion(Schema.create(Type.NULL), geoSchema))); @@ -309,6 +277,109 @@ public void testConvertBigQuerySchemaToAvroSchema() { (Object) null)))))); } + @Test + public void testConvertBigQuerySchemaToAvroSchemaWithoutLogicalTypes() { + TableSchema tableSchema = new TableSchema(); + tableSchema.setFields(fields); + Schema avroSchema = BigQueryAvroUtils.toGenericAvroSchema(tableSchema, false); + + assertThat(avroSchema.getField("number").schema(), equalTo(Schema.create(Schema.Type.LONG))); + assertThat( + avroSchema.getField("species").schema(), + equalTo( + Schema.createUnion( + Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING)))); + assertThat( + avroSchema.getField("quality").schema(), + equalTo( + Schema.createUnion( + Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.DOUBLE)))); + assertThat( + avroSchema.getField("quantity").schema(), + equalTo( + Schema.createUnion(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.LONG)))); + assertThat( + avroSchema.getField("birthday").schema(), + equalTo( + Schema.createUnion( + Schema.create(Schema.Type.NULL), + LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG))))); + assertThat( + avroSchema.getField("birthdayMoney").schema(), + equalTo( + Schema.createUnion( + Schema.create(Schema.Type.NULL), + LogicalTypes.decimal(38, 9).addToSchema(Schema.create(Schema.Type.BYTES))))); + assertThat( + avroSchema.getField("lotteryWinnings").schema(), + equalTo( + Schema.createUnion( + Schema.create(Schema.Type.NULL), + LogicalTypes.decimal(77, 38).addToSchema(Schema.create(Schema.Type.BYTES))))); + assertThat( + avroSchema.getField("flighted").schema(), + equalTo( + Schema.createUnion( + Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.BOOLEAN)))); + assertThat( + avroSchema.getField("sound").schema(), + equalTo( + Schema.createUnion(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.BYTES)))); + Schema dateSchema = Schema.create(Schema.Type.STRING); + dateSchema.addProp("sqlType", "DATE"); + assertThat( + avroSchema.getField("anniversaryDate").schema(), + equalTo(Schema.createUnion(Schema.create(Schema.Type.NULL), dateSchema))); + Schema dateTimeSchema = Schema.create(Schema.Type.STRING); + dateTimeSchema.addProp("sqlType", "DATETIME"); + assertThat( + avroSchema.getField("anniversaryDatetime").schema(), + equalTo(Schema.createUnion(Schema.create(Schema.Type.NULL), dateTimeSchema))); + Schema timeSchema = Schema.create(Schema.Type.STRING); + timeSchema.addProp("sqlType", "TIME"); + assertThat( + avroSchema.getField("anniversaryTime").schema(), + equalTo(Schema.createUnion(Schema.create(Schema.Type.NULL), timeSchema))); + Schema geoSchema = Schema.create(Type.STRING); + geoSchema.addProp("sqlType", "GEOGRAPHY"); + assertThat( + avroSchema.getField("geoPositions").schema(), + equalTo(Schema.createUnion(Schema.create(Schema.Type.NULL), geoSchema))); + assertThat( + avroSchema.getField("scion").schema(), + equalTo( + Schema.createUnion( + Schema.create(Schema.Type.NULL), + Schema.createRecord( + "scion", + "Translated Avro Schema for scion", + "org.apache.beam.sdk.io.gcp.bigquery", + false, + ImmutableList.of( + new Schema.Field( + "species", + Schema.createUnion( + Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING)), + null, + (Object) null)))))); + assertThat( + avroSchema.getField("associates").schema(), + equalTo( + Schema.createArray( + Schema.createRecord( + "associates", + "Translated Avro Schema for associates", + "org.apache.beam.sdk.io.gcp.bigquery", + false, + ImmutableList.of( + new Schema.Field( + "species", + Schema.createUnion( + Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING)), + null, + (Object) null)))))); + } + @Test public void testFormatTimestamp() { assertThat( @@ -427,22 +498,34 @@ public void testSchemaCollisionsInAvroConversion() { .setType("FLOAT"))))))))), new TableFieldSchema().setName("platform").setType("STRING"))); // To string should be sufficient here as this exercises Avro's conversion feature - String output = BigQueryAvroUtils.toGenericAvroSchema("root", schema.getFields()).toString(); + String output = BigQueryAvroUtils.toGenericAvroSchema(schema, false).toString(); assertThat(output.length(), greaterThan(0)); } /** Pojo class used as the record type in tests. */ - @DefaultCoder(AvroCoder.class) @SuppressWarnings("unused") // Used by Avro reflection. static class Bird { long number; @Nullable String species; @Nullable Double quality; @Nullable Long quantity; - @Nullable Long birthday; // Exercises TIMESTAMP. - @Nullable ByteBuffer birthdayMoney; // Exercises NUMERIC. - @Nullable ByteBuffer lotteryWinnings; // Exercises BIGNUMERIC. - @Nullable String geoPositions; // Exercises GEOGRAPHY. + + @AvroSchema(value = "[\"null\", {\"type\": \"long\", \"logicalType\": \"timestamp-micros\"}]") + Instant birthday; + + @AvroSchema( + value = + "[\"null\", {\"type\": \"bytes\", \"logicalType\": \"decimal\", \"precision\": 38, \"scale\": 9}]") + BigDecimal birthdayMoney; + + @AvroSchema( + value = + "[\"null\", {\"type\": \"bytes\", \"logicalType\": \"decimal\", \"precision\": 77, \"scale\": 38}]") + BigDecimal lotteryWinnings; + + @AvroSchema(value = "[\"null\", {\"type\": \"string\", \"sqlType\": \"GEOGRAPHY\"}]") + String geoPositions; + @Nullable Boolean flighted; @Nullable ByteBuffer sound; @Nullable Utf8 anniversaryDate;