[java] BQ: add missing avro conversions to BQ TableRow (apache#33221)
* [java] BQ: add missing avro conversions to BQ TableRow

Avro float fields can be used to write BQ FLOAT columns.
Add a TableRow conversion for such fields (see the sketch after
this message).

Also add conversions for the Avro 1.10+ logical types
local-timestamp-millis and local-timestamp-micros.

* Rework tests

* Add map and fixed types conversion

* Fix checkstyle

* Use valid parameters

* Test record nullable field
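
As a rough sketch of the Avro shapes this commit starts handling (assuming Avro 1.10+ on the classpath; the class and variable names are illustrative):

import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class NewlyHandledAvroTypes {
  public static void main(String[] args) {
    // A plain Avro float, now convertible to a BQ FLOAT column value.
    Schema floatType = SchemaBuilder.builder().floatType();
    // The Avro 1.10+ logical types, now convertible to BQ DATETIME values.
    Schema localTsMillis =
        LogicalTypes.localTimestampMillis().addToSchema(SchemaBuilder.builder().longType());
    Schema localTsMicros =
        LogicalTypes.localTimestampMicros().addToSchema(SchemaBuilder.builder().longType());
    System.out.println(floatType + " " + localTsMillis + " " + localTsMicros);
  }
}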
RustedBones authored Dec 10, 2024
1 parent 354e8e3 commit 6d8e02e
Showing 2 changed files with 746 additions and 403 deletions.
@@ -34,6 +34,8 @@
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.avro.Conversions;
import org.apache.avro.LogicalType;
@@ -50,14 +52,14 @@
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

/**
* A set of utilities for working with Avro files.
*
* <p>These utilities are based on the <a href="https://avro.apache.org/docs/1.8.1/spec.html">Avro
* 1.8.1</a> specification.
*/
/** A set of utilities for working with Avro files. */
class BigQueryAvroUtils {

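// Runtime Avro version; used below to skip logical types missing from pre-1.10 Avro.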
private static final String VERSION_AVRO =
Optional.ofNullable(Schema.class.getPackage())
.map(Package::getImplementationVersion)
.orElse("");

// org.apache.avro.LogicalType
static class DateTimeLogicalType extends LogicalType {
public DateTimeLogicalType() {
@@ -74,6 +76,8 @@ public DateTimeLogicalType() {
* export</a>
* @see <a href=https://cloud.google.com/bigquery/docs/reference/storage#avro_schema_details>BQ
* avro storage</a>
* @see <a href=https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-avro>BQ avro
* load</a>
*/
static Schema getPrimitiveType(TableFieldSchema schema, Boolean useAvroLogicalTypes) {
String bqType = schema.getType();
@@ -116,6 +120,9 @@ static Schema getPrimitiveType(TableFieldSchema schema, Boolean useAvroLogicalTy
}
case "DATETIME":
if (useAvroLogicalTypes) {
// BQ export uses a custom logical type
// TODO for load/storage use
// LogicalTypes.date().addToSchema(SchemaBuilder.builder().intType())
return DATETIME_LOGICAL_TYPE.addToSchema(SchemaBuilder.builder().stringType());
} else {
return SchemaBuilder.builder().stringBuilder().prop("sqlType", bqType).endString();
@@ -158,6 +165,12 @@ static Schema getPrimitiveType(TableFieldSchema schema, Boolean useAvroLogicalTy

@VisibleForTesting
static String formatTimestamp(Long timestampMicro) {
String dateTime = formatDatetime(timestampMicro);
return dateTime + " UTC";
}

@VisibleForTesting
static String formatDatetime(Long timestampMicro) {
// timestampMicro is in "microseconds since epoch" format,
// e.g., 1452062291123456L means "2016-01-06 06:38:11.123456 UTC".
// Separate into seconds and microseconds.
@@ -168,11 +181,13 @@ static String formatTimestamp(Long timestampMicro) {
timestampSec -= 1;
}
String dayAndTime = DATE_AND_SECONDS_FORMATTER.print(timestampSec * 1000);

if (micros == 0) {
return String.format("%s UTC", dayAndTime);
return dayAndTime;
} else if (micros % 1000 == 0) {
return String.format("%s.%03d", dayAndTime, micros / 1000);
} else {
return String.format("%s.%06d", dayAndTime, micros);
}
return String.format("%s.%06d UTC", dayAndTime, micros);
}
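
A sketch of the expected output, reusing the example value from the comment above (the test class name is hypothetical, and it would need to live in the same package since the methods are package-private):

import static org.junit.Assert.assertEquals;
import org.junit.Test;

public class BigQueryAvroUtilsFormatSketchTest {
  @Test
  public void formatsMicrosecondTimestamps() {
    long micros = 1452062291123456L; // "2016-01-06 06:38:11.123456 UTC"
    assertEquals("2016-01-06 06:38:11.123456", BigQueryAvroUtils.formatDatetime(micros));
    assertEquals("2016-01-06 06:38:11.123456 UTC", BigQueryAvroUtils.formatTimestamp(micros));
    // The fraction is trimmed: whole seconds print none, whole millis print 3 digits.
    assertEquals("2016-01-06 06:38:11", BigQueryAvroUtils.formatDatetime(1452062291000000L));
    assertEquals("2016-01-06 06:38:11.123", BigQueryAvroUtils.formatDatetime(1452062291123000L));
  }
}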

/**
@@ -274,8 +289,7 @@ static TableRow convertGenericRecordToTableRow(GenericRecord record) {
case UNION:
return convertNullableField(name, schema, v);
case MAP:
throw new UnsupportedOperationException(
String.format("Unexpected Avro field schema type %s for field named %s", type, name));
return convertMapField(name, schema, v);
default:
return convertRequiredField(name, schema, v);
}
@@ -296,6 +310,26 @@ private static List<Object> convertRepeatedField(String name, Schema elementType
return values;
}

private static List<TableRow> convertMapField(String name, Schema map, Object v) {
// Avro maps are represented as key/value RECORD.
if (v == null) {
// Handle the case of an empty map.
return new ArrayList<>();
}

Schema type = map.getValueType();
Map<String, Object> elements = (Map<String, Object>) v;
ArrayList<TableRow> values = new ArrayList<>();
for (Map.Entry<String, Object> element : elements.entrySet()) {
TableRow row =
new TableRow()
.set("key", element.getKey())
.set("value", convertRequiredField(name, type, element.getValue()));
values.add(row);
}
return values;
}
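
For reference, a minimal sketch of the shape convertMapField produces (it is private, so this only mirrors its output for an Avro map<string, long> entry {"a": 1}; the class name is made up):

import com.google.api.services.bigquery.model.TableRow;
import java.util.ArrayList;
import java.util.List;

public class MapConversionShapeSketch {
  public static void main(String[] args) {
    // Each Avro map entry becomes a key/value TableRow; the long value is
    // stringified, matching the INT64 handling in convertRequiredField below.
    List<TableRow> rows = new ArrayList<>();
    rows.add(new TableRow().set("key", "a").set("value", "1"));
    System.out.println(rows); // prints one key/value row per map entry
  }
}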

private static Object convertRequiredField(String name, Schema schema, Object v) {
// REQUIRED fields are represented as the corresponding Avro types. For example, a BigQuery
// INTEGER type maps to an Avro LONG type.
@@ -305,45 +339,83 @@ private static Object convertRequiredField(String name, Schema schema, Object v)
LogicalType logicalType = schema.getLogicalType();
switch (type) {
case BOOLEAN:
// SQL types BOOL, BOOLEAN
// SQL type BOOL (BOOLEAN)
return v;
case INT:
if (logicalType instanceof LogicalTypes.Date) {
// SQL types DATE
// SQL type DATE
// ideally LocalDate but TableRowJsonCoder encodes as String
return formatDate((Integer) v);
} else if (logicalType instanceof LogicalTypes.TimeMillis) {
// Write only: SQL type TIME
// ideally LocalTime but TableRowJsonCoder encodes as String
return formatTime(((Integer) v) * 1000L);
} else {
throw new UnsupportedOperationException(
String.format("Unexpected Avro field schema type %s for field named %s", type, name));
// Write only: SQL type INT64 (INT, SMALLINT, INTEGER, BIGINT, TINYINT, BYTEINT)
// ideally Integer but keep consistency with BQ JSON export that uses String
return ((Integer) v).toString();
}
case LONG:
if (logicalType instanceof LogicalTypes.TimeMicros) {
// SQL types TIME
// SQL type TIME
// ideally LocalTime but TableRowJsonCoder encodes as String
return formatTime((Long) v);
} else if (logicalType instanceof LogicalTypes.TimestampMillis) {
// Write only: SQL type TIMESTAMP
// ideally Instant but TableRowJsonCoder encodes as String
return formatTimestamp((Long) v * 1000L);
} else if (logicalType instanceof LogicalTypes.TimestampMicros) {
// SQL types TIMESTAMP
// SQL type TIMESTAMP
// ideally Instant but TableRowJsonCoder encodes as String
return formatTimestamp((Long) v);
} else if (!(VERSION_AVRO.startsWith("1.8") || VERSION_AVRO.startsWith("1.9"))
&& logicalType instanceof LogicalTypes.LocalTimestampMillis) {
// Write only: SQL type DATETIME
// ideally LocalDateTime but TableRowJsonCoder encodes as String
return formatDatetime(((Long) v) * 1000);
} else if (!(VERSION_AVRO.startsWith("1.8") || VERSION_AVRO.startsWith("1.9"))
&& logicalType instanceof LogicalTypes.LocalTimestampMicros) {
// Write only: SQL type DATETIME
// ideally LocalDateTime but TableRowJsonCoder encodes as String
return formatDatetime((Long) v);
} else {
// SQL types INT64 (INT, SMALLINT, INTEGER, BIGINT, TINYINT, BYTEINT)
// SQL type INT64 (INT, SMALLINT, INTEGER, BIGINT, TINYINT, BYTEINT)
// ideally Long if in [2^53+1, 2^53-1] but keep consistency with BQ JSON export that uses
// String
return ((Long) v).toString();
}
case FLOAT:
// Write only: SQL type FLOAT64
// ideally Float but TableRowJsonCoder decodes as Double
return Double.valueOf(v.toString());
case DOUBLE:
// SQL types FLOAT64
// SQL type FLOAT64
return v;
case BYTES:
if (logicalType instanceof LogicalTypes.Decimal) {
// SQL type NUMERIC, BIGNUMERIC
// ideally BigDecimal but TableRowJsonCoder encodes as String
return new Conversions.DecimalConversion()
.fromBytes((ByteBuffer) v, schema, logicalType)
.toString();
} else {
// SQL types BYTES
// SQL type BYTES
// ideally byte[] but TableRowJsonCoder encodes as String
return BaseEncoding.base64().encode(((ByteBuffer) v).array());
}
case STRING:
// SQL types STRING, DATETIME, GEOGRAPHY, JSON
// and also DATE, TIME when logical types are not used
return v.toString();
case ENUM:
// SQL type STRING
return v.toString();
case FIXED:
// SQL type BYTES
// ideally byte[] but TableRowJsonCoder encodes as String
return BaseEncoding.base64().encode(((GenericFixed) v).bytes());
case RECORD:
// SQL types RECORD
return convertGenericRecordToTableRow((GenericRecord) v);
default:
throw new UnsupportedOperationException(
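
End to end, the new FLOAT branch behaves roughly as below (a sketch only: convertGenericRecordToTableRow is package-private, so this assumes a caller in the same package, and the record/field names are made up):

import com.google.api.services.bigquery.model.TableRow;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class FloatConversionSketch {
  public static void main(String[] args) {
    Schema schema =
        SchemaBuilder.record("Row").fields().requiredFloat("temperature").endRecord();
    GenericRecord record = new GenericData.Record(schema);
    record.put("temperature", 3.5f);
    TableRow row = BigQueryAvroUtils.convertGenericRecordToTableRow(record);
    // The Avro float is widened via its string form, so 3.5f becomes the Double 3.5.
    System.out.println(row.get("temperature")); // 3.5
  }
}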
