diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
index cddde05b194c..1af44ba7a012 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
@@ -34,6 +34,8 @@
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import org.apache.avro.Conversions;
import org.apache.avro.LogicalType;
@@ -50,14 +52,14 @@
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
-/**
- * A set of utilities for working with Avro files.
- *
- *
These utilities are based on the Avro
- * 1.8.1 specification.
- */
+/** A set of utilities for working with Avro files. */
class BigQueryAvroUtils {
+ private static final String VERSION_AVRO =
+ Optional.ofNullable(Schema.class.getPackage())
+ .map(Package::getImplementationVersion)
+ .orElse("");
+
// org.apache.avro.LogicalType
static class DateTimeLogicalType extends LogicalType {
public DateTimeLogicalType() {
@@ -74,6 +76,8 @@ public DateTimeLogicalType() {
* export
* @see BQ
* avro storage
+ * @see BQ avro
+ * load
*/
static Schema getPrimitiveType(TableFieldSchema schema, Boolean useAvroLogicalTypes) {
String bqType = schema.getType();
@@ -116,6 +120,9 @@ static Schema getPrimitiveType(TableFieldSchema schema, Boolean useAvroLogicalTy
}
case "DATETIME":
if (useAvroLogicalTypes) {
+ // BQ export uses a custom logical type
+ // TODO for load/storage use
+ // LogicalTypes.date().addToSchema(SchemaBuilder.builder().intType())
return DATETIME_LOGICAL_TYPE.addToSchema(SchemaBuilder.builder().stringType());
} else {
return SchemaBuilder.builder().stringBuilder().prop("sqlType", bqType).endString();
@@ -158,6 +165,12 @@ static Schema getPrimitiveType(TableFieldSchema schema, Boolean useAvroLogicalTy
@VisibleForTesting
static String formatTimestamp(Long timestampMicro) {
+ String dateTime = formatDatetime(timestampMicro);
+ return dateTime + " UTC";
+ }
+
+ @VisibleForTesting
+ static String formatDatetime(Long timestampMicro) {
// timestampMicro is in "microseconds since epoch" format,
// e.g., 1452062291123456L means "2016-01-06 06:38:11.123456 UTC".
// Separate into seconds and microseconds.
@@ -168,11 +181,13 @@ static String formatTimestamp(Long timestampMicro) {
timestampSec -= 1;
}
String dayAndTime = DATE_AND_SECONDS_FORMATTER.print(timestampSec * 1000);
-
if (micros == 0) {
- return String.format("%s UTC", dayAndTime);
+ return dayAndTime;
+ } else if (micros % 1000 == 0) {
+ return String.format("%s.%03d", dayAndTime, micros / 1000);
+ } else {
+ return String.format("%s.%06d", dayAndTime, micros);
}
- return String.format("%s.%06d UTC", dayAndTime, micros);
}
/**
@@ -274,8 +289,7 @@ static TableRow convertGenericRecordToTableRow(GenericRecord record) {
case UNION:
return convertNullableField(name, schema, v);
case MAP:
- throw new UnsupportedOperationException(
- String.format("Unexpected Avro field schema type %s for field named %s", type, name));
+ return convertMapField(name, schema, v);
default:
return convertRequiredField(name, schema, v);
}
@@ -296,6 +310,26 @@ private static List