From e7ad71df69f4c5783dc6dea1a767ce2a4c74a6cf Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Wed, 21 Dec 2022 13:32:38 -0500 Subject: [PATCH] Support SqlTypes Date in AvroUtils --- .../beam/sdk/schemas/utils/AvroUtils.java | 10 +++++++++- .../beam/sdk/schemas/utils/AvroUtilsTest.java | 20 +++++++++++++++++++ 2 files changed, 29 insertions(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java index 8a19672f9ea1..8aad6348fbad 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java @@ -76,6 +76,7 @@ import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes; import org.apache.beam.sdk.schemas.logicaltypes.FixedString; import org.apache.beam.sdk.schemas.logicaltypes.OneOfType; +import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; import org.apache.beam.sdk.schemas.logicaltypes.VariableBytes; import org.apache.beam.sdk.schemas.logicaltypes.VariableString; import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ConvertType; @@ -119,6 +120,7 @@ * RECORD <-----> ROW * UNION <-----> LogicalType(OneOfType) * LogicalTypes.Date <-----> LogicalType(DATE) + * <------ LogicalType(urn="beam:logical_type:date:v1") * LogicalTypes.TimestampMillis <-----> DATETIME * LogicalTypes.Decimal <-----> DECIMAL * @@ -966,7 +968,7 @@ private static org.apache.avro.Schema getFieldSchema( oneOfType.getOneOfSchema().getFields().stream() .map(x -> getFieldSchema(x.getType(), x.getName(), namespace)) .collect(Collectors.toList())); - } else if ("DATE".equals(identifier)) { + } else if ("DATE".equals(identifier) || SqlTypes.DATE.getIdentifier().equals(identifier)) { baseType = LogicalTypes.date().addToSchema(org.apache.avro.Schema.create(Type.INT)); } else if ("TIME".equals(identifier)) { baseType = LogicalTypes.timeMillis().addToSchema(org.apache.avro.Schema.create(Type.INT)); @@ -1094,7 +1096,11 @@ private static org.apache.avro.Schema getFieldSchema( oneOfValue.getValue()); } } else if ("DATE".equals(identifier)) { + // "Date" is backed by joda.time.Instant return Days.daysBetween(Instant.EPOCH, (Instant) value).getDays(); + } else if (SqlTypes.DATE.getIdentifier().equals(identifier)) { + // portable SqlTypes.DATE is backed by java.time.LocalDate + return ((java.time.LocalDate) value).toEpochDay(); } else if ("TIME".equals(identifier)) { return (int) ((Instant) value).getMillis(); } else { @@ -1174,6 +1180,8 @@ private static org.apache.avro.Schema getFieldSchema( if (value instanceof ReadableInstant) { int epochDays = Days.daysBetween(Instant.EPOCH, (ReadableInstant) value).getDays(); return convertDateStrict(epochDays, fieldType); + } else if (value instanceof java.time.LocalDate) { + return convertDateStrict((int) ((java.time.LocalDate) value).toEpochDay(), fieldType); } else { return convertDateStrict((Integer) value, fieldType); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/AvroUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/AvroUtilsTest.java index 3087959c1e02..5c115aa89bcc 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/AvroUtilsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/AvroUtilsTest.java @@ -46,6 +46,7 @@ import org.apache.beam.sdk.schemas.Schema.FieldType; import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType; import org.apache.beam.sdk.schemas.logicaltypes.OneOfType; +import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; import org.apache.beam.sdk.schemas.utils.AvroGenerators.RecordSchemaGenerator; import org.apache.beam.sdk.schemas.utils.AvroUtils.TypeWithNullability; import org.apache.beam.sdk.testing.CoderProperties; @@ -714,6 +715,25 @@ public void testJdbcLogicalDateAndTimeRowDataToGenericRecord() { assertEquals(expectedRecord, AvroUtils.toGenericRecord(rowData, avroSchema)); } + @Test + public void testSqlTypesToGenericRecord() { + // SqlTypes to LogicalTypes.date conversion is one direction + java.time.LocalDate localDate = java.time.LocalDate.of(1979, 3, 14); + + Schema beamSchema = + Schema.builder() + .addField(Field.of("local_date", FieldType.logicalType(SqlTypes.DATE))) + .build(); + + Row rowData = Row.withSchema(beamSchema).addValue(localDate).build(); + + org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(beamSchema); + GenericRecord expectedRecord = + new GenericRecordBuilder(avroSchema).set("local_date", localDate.toEpochDay()).build(); + + assertEquals(expectedRecord, AvroUtils.toGenericRecord(rowData, avroSchema)); + } + @Test public void testBeamRowToGenericRecord() { GenericRecord genericRecord = AvroUtils.toGenericRecord(getBeamRow(), null);