Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[java] Fix avro logical-types conversions for BQ storage #33422

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import com.google.protobuf.Descriptors.FieldDescriptor;
import com.google.protobuf.DynamicMessage;
import java.math.BigDecimal;
import java.math.MathContext;
import java.math.RoundingMode;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Optional;
Expand All @@ -44,15 +46,15 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Functions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.joda.time.Days;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;

/**
* Utility methods for converting Avro {@link GenericRecord} objects to dynamic protocol message,
* for use with the Storage write API.
*/
public class AvroGenericRecordToStorageApiProto {

private static final org.joda.time.LocalDate EPOCH_DATE = new org.joda.time.LocalDate(1970, 1, 1);

static final Map<Schema.Type, TableFieldSchema.Type> PRIMITIVE_TYPES =
ImmutableMap.<Schema.Type, TableFieldSchema.Type>builder()
.put(Schema.Type.INT, TableFieldSchema.Type.INT64)
Expand All @@ -69,11 +71,15 @@ public class AvroGenericRecordToStorageApiProto {
// A map of supported logical types to the protobuf field type.
static final Map<String, TableFieldSchema.Type> LOGICAL_TYPES =
ImmutableMap.<String, TableFieldSchema.Type>builder()
.put(LogicalTypes.date().getName(), TableFieldSchema.Type.DATE)
.put(LogicalTypes.decimal(1).getName(), TableFieldSchema.Type.BIGNUMERIC)
.put(LogicalTypes.timestampMicros().getName(), TableFieldSchema.Type.TIMESTAMP)
.put(LogicalTypes.timestampMillis().getName(), TableFieldSchema.Type.TIMESTAMP)
.put(LogicalTypes.uuid().getName(), TableFieldSchema.Type.STRING)
.put("date", TableFieldSchema.Type.DATE)
.put("time-micros", TableFieldSchema.Type.TIME)
.put("time-millis", TableFieldSchema.Type.TIME)
.put("decimal", TableFieldSchema.Type.BIGNUMERIC)
.put("timestamp-micros", TableFieldSchema.Type.TIMESTAMP)
.put("timestamp-millis", TableFieldSchema.Type.TIMESTAMP)
.put("local-timestamp-micros", TableFieldSchema.Type.DATETIME)
.put("local-timestamp-millis", TableFieldSchema.Type.DATETIME)
.put("uuid", TableFieldSchema.Type.STRING)
.build();

static final Map<Schema.Type, Function<Object, Object>> PRIMITIVE_ENCODERS =
Expand All @@ -92,16 +98,15 @@ public class AvroGenericRecordToStorageApiProto {
// A map of supported logical types to their encoding functions.
static final Map<String, BiFunction<LogicalType, Object, Object>> LOGICAL_TYPE_ENCODERS =
ImmutableMap.<String, BiFunction<LogicalType, Object, Object>>builder()
.put(LogicalTypes.date().getName(), (logicalType, value) -> convertDate(value))
.put(
LogicalTypes.decimal(1).getName(), AvroGenericRecordToStorageApiProto::convertDecimal)
.put(
LogicalTypes.timestampMicros().getName(),
(logicalType, value) -> convertTimestamp(value, true))
.put(
LogicalTypes.timestampMillis().getName(),
(logicalType, value) -> convertTimestamp(value, false))
.put(LogicalTypes.uuid().getName(), (logicalType, value) -> convertUUID(value))
.put("date", (logicalType, value) -> convertDate(value))
.put("time-micros", (logicalType, value) -> convertTime(value, true))
.put("time-millis", (logicalType, value) -> convertTime(value, false))
.put("decimal", AvroGenericRecordToStorageApiProto::convertDecimal)
.put("timestamp-micros", (logicalType, value) -> convertTimestamp(value, true))
.put("timestamp-millis", (logicalType, value) -> convertTimestamp(value, false))
.put("local-timestamp-micros", (logicalType, value) -> convertDateTime(value, true))
.put("local-timestamp-millis", (logicalType, value) -> convertDateTime(value, false))
.put("uuid", (logicalType, value) -> convertUUID(value))
.build();

static String convertUUID(Object value) {
Expand All @@ -115,34 +120,97 @@ static String convertUUID(Object value) {
}

static Long convertTimestamp(Object value, boolean micros) {
if (value instanceof ReadableInstant) {
return ((ReadableInstant) value).getMillis() * (micros ? 1000 : 1);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was wrong. BQ always expects epoch microseconds. The conversion should be applied to the raw value depending on whether it represents millis or micros.

if (value instanceof org.joda.time.ReadableInstant) {
return ((org.joda.time.ReadableInstant) value).getMillis() * 1_000L;
} else if (value instanceof java.time.Instant) {
java.time.Instant instant = (java.time.Instant) value;
long seconds = instant.getEpochSecond();
int nanos = instant.getNano();

if (seconds < 0 && nanos > 0) {
long ms = Math.multiplyExact(seconds + 1, 1_000_000L);
long adjustment = (nanos / 1_000L) - 1_000_000L;
return Math.addExact(ms, adjustment);
} else {
long ms = Math.multiplyExact(seconds, 1_000_000L);
return Math.addExact(ms, nanos / 1_000L);
}
} else {
Preconditions.checkArgument(
value instanceof Long, "Expecting a value as Long type (millis).");
return (Long) value;
value instanceof Long, "Expecting a value as Long type (timestamp).");
return (micros ? 1 : 1_000L) * ((Long) value);
}
}

static Integer convertDate(Object value) {
if (value instanceof ReadableInstant) {
return Days.daysBetween(Instant.EPOCH, (ReadableInstant) value).getDays();
if (value instanceof org.joda.time.LocalDate) {
return org.joda.time.Days.daysBetween(EPOCH_DATE, (org.joda.time.LocalDate) value).getDays();
} else if (value instanceof java.time.LocalDate) {
return (int) ((java.time.LocalDate) value).toEpochDay();
} else {
Preconditions.checkArgument(
value instanceof Integer, "Expecting a value as Integer type (days).");
return (Integer) value;
}
}

/**
 * Converts an Avro {@code time-millis} / {@code time-micros} value to a microsecond count.
 *
 * <p>Supported inputs are a joda {@code LocalTime}, a {@code java.time.LocalTime}, or the raw
 * Avro representation: a {@code Long} when {@code micros} is true, an {@code Integer} when it is
 * false. For the two typed inputs the {@code micros} flag is not consulted.
 *
 * <p>NOTE(review): the result is microseconds of the day; confirm this is the encoding the
 * BigQuery Storage Write API expects for TIME columns.
 *
 * @param value the time value to convert
 * @param micros whether a raw value is expressed in microseconds (otherwise milliseconds)
 * @return the time expressed in microseconds
 * @throws IllegalArgumentException if a raw value does not have the expected boxed type
 */
static Long convertTime(Object value, boolean micros) {
  if (value instanceof org.joda.time.LocalTime) {
    // joda only exposes millisecond precision; scale up to microseconds.
    org.joda.time.LocalTime jodaTime = (org.joda.time.LocalTime) value;
    long millisOfDay = jodaTime.getMillisOfDay();
    return 1_000L * millisOfDay;
  }

  if (value instanceof java.time.LocalTime) {
    long nanoOfDay = ((java.time.LocalTime) value).toNanoOfDay();
    return java.util.concurrent.TimeUnit.NANOSECONDS.toMicros(nanoOfDay);
  }

  // Raw Avro value: time-millis is carried as an Integer, time-micros as a Long.
  if (!micros) {
    Preconditions.checkArgument(
        value instanceof Integer, "Expecting a value as Integer type (time).");
    return 1_000L * (Integer) value;
  }

  Preconditions.checkArgument(value instanceof Long, "Expecting a value as Long type (time).");
  return (Long) value;
}

/**
 * Converts an Avro {@code local-timestamp-millis} / {@code local-timestamp-micros} value to a
 * microsecond count, interpreting typed local date-times at the UTC offset.
 *
 * <p>Supported inputs are a joda {@code LocalDateTime}, a {@code java.time.LocalDateTime}, or a
 * raw {@code Long} expressed in the unit selected by {@code micros}. Millisecond inputs are
 * scaled with overflow checking so that out-of-range values fail loudly instead of silently
 * wrapping around.
 *
 * <p>NOTE(review): the result is a microsecond count relative to the epoch; confirm this is the
 * encoding the BigQuery Storage Write API expects for DATETIME columns (as opposed to a packed
 * civil-time value).
 *
 * @param value the local date-time value to convert
 * @param micros whether a raw {@code Long} is expressed in microseconds (otherwise
 *     milliseconds); forwarded unchanged to {@code convertTimestamp} for {@code java.time} input
 * @return the date-time expressed in microseconds
 * @throws IllegalArgumentException if {@code value} is none of the supported types
 * @throws ArithmeticException if scaling milliseconds to microseconds overflows a {@code long}
 */
static Long convertDateTime(Object value, boolean micros) {
  if (value instanceof org.joda.time.LocalDateTime) {
    // Not expected in practice: the local-timestamp logical types were added after joda was
    // deprecated. Implemented nonetheless for consistency with the other converters.
    org.joda.time.DateTime dateTime =
        ((org.joda.time.LocalDateTime) value).toDateTime(org.joda.time.DateTimeZone.UTC);
    return Math.multiplyExact(dateTime.getMillis(), 1_000L);
  } else if (value instanceof java.time.LocalDateTime) {
    java.time.Instant instant =
        ((java.time.LocalDateTime) value).toInstant(java.time.ZoneOffset.UTC);
    return convertTimestamp(instant, micros);
  } else {
    Preconditions.checkArgument(
        value instanceof Long, "Expecting a value as Long type (local-timestamp).");
    long raw = (Long) value;
    // Raw micros pass through untouched; raw millis are scaled up with an overflow check.
    return micros ? raw : Math.multiplyExact(raw, 1_000L);
  }
}

static ByteString convertDecimal(LogicalType logicalType, Object value) {
ByteBuffer byteBuffer = (ByteBuffer) value;
BigDecimal bigDecimal =
new Conversions.DecimalConversion()
.fromBytes(
byteBuffer.duplicate(),
Schema.create(Schema.Type.NULL), // dummy schema, not used
logicalType);
return BeamRowToStorageApiProto.serializeBigDecimalToNumeric(bigDecimal);
if (value instanceof BigDecimal) {
LogicalTypes.Decimal type = (LogicalTypes.Decimal) logicalType;
BigDecimal bigDecimal =
((BigDecimal) value)
.setScale(type.getScale(), RoundingMode.DOWN)
.round(new MathContext(type.getPrecision(), RoundingMode.DOWN));
Comment on lines +199 to +200
Copy link
Contributor Author

@RustedBones RustedBones Dec 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it seem legitimate to round here? Alternatively, we could fail if the BigDecimal's precision and scale do not match the expected logical type.

return BeamRowToStorageApiProto.serializeBigDecimalToNumeric(bigDecimal);
} else {
Preconditions.checkArgument(
value instanceof ByteBuffer, "Expecting a value as ByteBuffer type (decimal).");
ByteBuffer byteBuffer = (ByteBuffer) value;
BigDecimal bigDecimal =
new Conversions.DecimalConversion()
.fromBytes(
byteBuffer.duplicate(),
Schema.create(Schema.Type.NULL), // dummy schema, not used
logicalType);
return BeamRowToStorageApiProto.serializeBigDecimalToNumeric(bigDecimal);
}
}

/**
Expand Down Expand Up @@ -213,7 +281,7 @@ public static DynamicMessage messageFromGenericRecord(
return builder.build();
}

private static TableFieldSchema fieldDescriptorFromAvroField(Schema.Field field) {
private static TableFieldSchema fieldDescriptorFromAvroField(org.apache.avro.Schema.Field field) {
@Nullable Schema schema = field.schema();
Preconditions.checkNotNull(schema, "Unexpected null schema!");
if (StorageApiCDC.COLUMNS.contains(field.name())) {
Expand Down Expand Up @@ -309,7 +377,10 @@ private static TableFieldSchema fieldDescriptorFromAvroField(Schema.Field field)

@Nullable
private static Object messageValueFromGenericRecordValue(
FieldDescriptor fieldDescriptor, Schema.Field avroField, String name, GenericRecord record) {
FieldDescriptor fieldDescriptor,
org.apache.avro.Schema.Field avroField,
String name,
GenericRecord record) {
@Nullable Object value = record.get(name);
if (value == null) {
if (fieldDescriptor.isOptional()
Expand All @@ -325,7 +396,7 @@ private static Object messageValueFromGenericRecordValue(
}

private static Object toProtoValue(
FieldDescriptor fieldDescriptor, Schema avroSchema, Object value) {
FieldDescriptor fieldDescriptor, org.apache.avro.Schema avroSchema, Object value) {
switch (avroSchema.getType()) {
case RECORD:
return messageFromGenericRecord(
Expand Down
Loading
Loading