[Managed Iceberg] Add support for TIMESTAMP, TIME, and DATE types (apache#32688)

* support timestamp, time, date types

* add to changes md

* always write java time LocalDateTime for iceberg TIMESTAMP

* update java doc

* add timezone support with Strings

* clean up; reading iceberg timestamptz will return sqltype.datetime

* support string, long, sql.datetime, and datetime; timestamp returns sql.datetime and timestamptz returns datetime
ahmedabu98 authored and reeba212 committed Dec 4, 2024
1 parent a781083 commit 0e90f5a
Showing 7 changed files with 355 additions and 41 deletions.
2 changes: 1 addition & 1 deletion .github/trigger_files/IO_Iceberg_Integration_Tests.json
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 2
"modification": 3
}
1 change: 1 addition & 0 deletions CHANGES.md
@@ -64,6 +64,7 @@

* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* [Managed Iceberg] Support creating tables if needed ([#32686](https://github.com/apache/beam/pull/32686))
* [Managed Iceberg] Add support for TIMESTAMP, TIME, and DATE types ([#32688](https://github.com/apache/beam/pull/32688))

## New Features / Improvements

@@ -32,6 +32,7 @@
import org.apache.beam.sdk.io.iceberg.hive.testutils.HiveMetastoreExtension;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
@@ -64,7 +65,10 @@
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.util.DateTimeUtil;
import org.apache.thrift.TException;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
@@ -100,6 +104,10 @@ public class IcebergHiveCatalogIT {
.addArrayField("arr_long", Schema.FieldType.INT64)
.addRowField("row", NESTED_ROW_SCHEMA)
.addNullableRowField("nullable_row", NESTED_ROW_SCHEMA)
.addDateTimeField("datetime_tz")
.addLogicalTypeField("datetime", SqlTypes.DATETIME)
.addLogicalTypeField("date", SqlTypes.DATE)
.addLogicalTypeField("time", SqlTypes.TIME)
.build();

private static final SimpleFunction<Long, Row> ROW_FUNC =
@@ -127,6 +135,10 @@ public Row apply(Long num) {
.addValue(LongStream.range(1, num % 10).boxed().collect(Collectors.toList()))
.addValue(nestedRow)
.addValue(num % 2 == 0 ? null : nestedRow)
.addValue(new DateTime(num).withZone(DateTimeZone.forOffsetHoursMinutes(3, 25)))
.addValue(DateTimeUtil.timestampFromMicros(num))
.addValue(DateTimeUtil.dateFromDays(Integer.parseInt(strNum)))
.addValue(DateTimeUtil.timeFromMicros(num))
.build();
}
};
@@ -150,7 +150,16 @@
* <td> DOUBLE </td> <td> DOUBLE </td>
* </tr>
* <tr>
* <td> DATETIME </td> <td> STRING </td>
* <td> SqlTypes.DATETIME </td> <td> TIMESTAMP </td>
* </tr>
* <tr>
* <td> DATETIME </td> <td> TIMESTAMPTZ </td>
* </tr>
* <tr>
* <td> SqlTypes.DATE </td> <td> DATE </td>
* </tr>
* <tr>
* <td> SqlTypes.TIME </td> <td> TIME </td>
* </tr>
* <tr>
* <td> ITERABLE </td> <td> LIST </td>
@@ -166,6 +175,29 @@
* </tr>
* </table>
*
* <p><b>Note:</b> {@code SqlTypes} are Beam logical types.
*
* <h3>Note on timestamps</h3>
*
* <p>For an existing table, the following Beam types are supported for both {@code timestamp} and
* {@code timestamptz}:
*
* <ul>
* <li>{@code SqlTypes.DATETIME} --> Using a {@link java.time.LocalDateTime} object
* <li>{@code DATETIME} --> Using a {@link org.joda.time.DateTime} object
* <li>{@code INT64} --> Using a {@link Long} representing micros since EPOCH
* <li>{@code STRING} --> Using a timestamp {@link String} representation (e.g. {@code
* "2024-10-08T13:18:20.053+03:27"})
* </ul>
*
* <p><b>Note</b>: If you expect Beam to create the Iceberg table at runtime, please provide {@code
* SqlTypes.DATETIME} for a {@code timestamp} column and {@code DATETIME} for a {@code timestamptz}
* column. If the table does not exist, Beam will treat {@code STRING} and {@code INT64} at
face value and create equivalent column types.
*
* <p>For Iceberg reads, the connector will produce Beam {@code SqlTypes.DATETIME} types for
* Iceberg's {@code timestamp} and {@code DATETIME} types for {@code timestamptz}.
*
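 * <p>For example, a write to an existing table with one column of each flavor might build rows
 * like this (an illustrative sketch; the field names here are hypothetical):
 *
 * <pre>{@code
 * Schema schema =
 *     Schema.builder()
 *         .addLogicalTypeField("ts", SqlTypes.DATETIME) // Iceberg "timestamp"
 *         .addDateTimeField("ts_tz") // Iceberg "timestamptz"
 *         .build();
 * Row row =
 *     Row.withSchema(schema)
 *         .addValue(java.time.LocalDateTime.parse("2024-10-08T13:18:20.053"))
 *         .addValue(org.joda.time.DateTime.parse("2024-10-08T13:18:20.053+03:27"))
 *         .build();
 * }</pre>
 *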
* <h3>Dynamic Destinations</h3>
*
* <p>Managed Iceberg supports writing to dynamic destinations. To do so, please provide an
@@ -20,12 +20,18 @@
import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;

import java.nio.ByteBuffer;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
@@ -34,15 +40,13 @@
import org.apache.iceberg.data.Record;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.DateTimeUtil;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Instant;

/** Utilities for converting between Beam and Iceberg types. */
/** Utilities for converting between Beam and Iceberg types, made public for users' convenience. */
public class IcebergUtils {
// This is made public for users convenience, as many may have more experience working with
// Iceberg types.

private IcebergUtils() {}

private static final Map<Schema.TypeName, Type> BEAM_TYPES_TO_ICEBERG_TYPES =
@@ -54,6 +58,14 @@ private IcebergUtils() {}
.put(Schema.TypeName.DOUBLE, Types.DoubleType.get())
.put(Schema.TypeName.STRING, Types.StringType.get())
.put(Schema.TypeName.BYTES, Types.BinaryType.get())
.put(Schema.TypeName.DATETIME, Types.TimestampType.withZone())
.build();

private static final Map<String, Type> BEAM_LOGICAL_TYPES_TO_ICEBERG_TYPES =
ImmutableMap.<String, Type>builder()
.put(SqlTypes.DATE.getIdentifier(), Types.DateType.get())
.put(SqlTypes.TIME.getIdentifier(), Types.TimeType.get())
.put(SqlTypes.DATETIME.getIdentifier(), Types.TimestampType.withoutZone())
.build();

private static Schema.FieldType icebergTypeToBeamFieldType(final Type type) {
@@ -69,9 +81,15 @@ private static Schema.FieldType icebergTypeToBeamFieldType(final Type type) {
case DOUBLE:
return Schema.FieldType.DOUBLE;
case DATE:
return Schema.FieldType.logicalType(SqlTypes.DATE);
case TIME:
case TIMESTAMP: // TODO: Logical types?
return Schema.FieldType.DATETIME;
return Schema.FieldType.logicalType(SqlTypes.TIME);
case TIMESTAMP:
Types.TimestampType ts = (Types.TimestampType) type.asPrimitiveType();
if (ts.shouldAdjustToUTC()) {
return Schema.FieldType.DATETIME;
}
return Schema.FieldType.logicalType(SqlTypes.DATETIME);
case STRING:
return Schema.FieldType.STRING;
case UUID:
@@ -151,6 +169,14 @@ static TypeAndMaxId beamFieldTypeToIcebergFieldType(
// other types.
return new TypeAndMaxId(
--nestedFieldId, BEAM_TYPES_TO_ICEBERG_TYPES.get(beamType.getTypeName()));
} else if (beamType.getTypeName().isLogicalType()) {
String logicalTypeIdentifier =
checkArgumentNotNull(beamType.getLogicalType()).getIdentifier();
@Nullable Type type = BEAM_LOGICAL_TYPES_TO_ICEBERG_TYPES.get(logicalTypeIdentifier);
if (type == null) {
throw new RuntimeException("Unsupported Beam logical type " + logicalTypeIdentifier);
}
return new TypeAndMaxId(--nestedFieldId, type);
} else if (beamType.getTypeName().isCollectionType()) { // ARRAY or ITERABLE
Schema.FieldType beamCollectionType =
Preconditions.checkArgumentNotNull(beamType.getCollectionElementType());
@@ -227,8 +253,6 @@ static TypeAndMaxId beamFieldTypeToIcebergFieldType(
*
* <p>The following unsupported Beam types will be defaulted to {@link Types.StringType}:
* <li>{@link Schema.TypeName.DECIMAL}
* <li>{@link Schema.TypeName.DATETIME}
* <li>{@link Schema.TypeName.LOGICAL_TYPE}
*/
public static org.apache.iceberg.Schema beamSchemaToIcebergSchema(final Schema schema) {
List<Types.NestedField> fields = new ArrayList<>(schema.getFieldCount());
@@ -282,12 +306,20 @@ private static void copyFieldIntoRecord(Record rec, Types.NestedField field, Row
Optional.ofNullable(value.getDouble(name)).ifPresent(v -> rec.setField(name, v));
break;
case DATE:
throw new UnsupportedOperationException("Date fields not yet supported");
Optional.ofNullable(value.getLogicalTypeValue(name, LocalDate.class))
.ifPresent(v -> rec.setField(name, v));
break;
case TIME:
throw new UnsupportedOperationException("Time fields not yet supported");
Optional.ofNullable(value.getLogicalTypeValue(name, LocalTime.class))
.ifPresent(v -> rec.setField(name, v));
break;
case TIMESTAMP:
Optional.ofNullable(value.getDateTime(name))
.ifPresent(v -> rec.setField(name, v.getMillis()));
Object val = value.getValue(name);
if (val == null) {
break;
}
Types.TimestampType ts = (Types.TimestampType) field.type().asPrimitiveType();
rec.setField(name, getIcebergTimestampValue(val, ts.shouldAdjustToUTC()));
break;
case STRING:
Optional.ofNullable(value.getString(name)).ifPresent(v -> rec.setField(name, v));
@@ -322,6 +354,55 @@ private static void copyFieldIntoRecord(Record rec, Types.NestedField field, Row
}
}

/**
* Returns the appropriate value for an Iceberg timestamp field.
*
* <p>If `timestamp`, we resolve incoming values to a {@link LocalDateTime}.
*
* <p>If `timestamptz`, we resolve to a UTC {@link OffsetDateTime}. Iceberg already resolves all
* incoming timestamps to UTC, so there is no harm in doing it from our side.
*
* <p>Valid types are:
*
* <ul>
* <li>{@link SqlTypes.DATETIME} --> {@link LocalDateTime}
* <li>{@link Schema.FieldType.DATETIME} --> {@link Instant}
* <li>{@link Schema.FieldType.INT64} --> {@link Long}
* <li>{@link Schema.FieldType.STRING} --> {@link String}
* </ul>
*/
private static Object getIcebergTimestampValue(Object beamValue, boolean shouldAdjustToUtc) {
// timestamptz
if (shouldAdjustToUtc) {
if (beamValue instanceof LocalDateTime) { // SqlTypes.DATETIME
return OffsetDateTime.of((LocalDateTime) beamValue, ZoneOffset.UTC);
} else if (beamValue instanceof Instant) { // FieldType.DATETIME
return DateTimeUtil.timestamptzFromMicros(((Instant) beamValue).getMillis() * 1000L);
} else if (beamValue instanceof Long) { // FieldType.INT64
return DateTimeUtil.timestamptzFromMicros((Long) beamValue);
} else if (beamValue instanceof String) { // FieldType.STRING
return OffsetDateTime.parse((String) beamValue).withOffsetSameInstant(ZoneOffset.UTC);
} else {
throw new UnsupportedOperationException(
"Unsupported Beam type for Iceberg timestamp with timezone: " + beamValue.getClass());
}
}

// timestamp
if (beamValue instanceof LocalDateTime) { // SqlTypes.DATETIME
return beamValue;
} else if (beamValue instanceof Instant) { // FieldType.DATETIME
return DateTimeUtil.timestampFromMicros(((Instant) beamValue).getMillis() * 1000L);
} else if (beamValue instanceof Long) { // FieldType.INT64
return DateTimeUtil.timestampFromMicros((Long) beamValue);
} else if (beamValue instanceof String) { // FieldType.STRING
return LocalDateTime.parse((String) beamValue);
} else {
throw new UnsupportedOperationException(
"Unsupported Beam type for Iceberg timestamp with timezone: " + beamValue.getClass());
}
}
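
  // For example (illustrative): writing the string "2024-10-08T13:18:20.053+03:27" to a
  // timestamptz column stores the UTC instant 2024-10-08T09:51:20.053Z, whereas writing the
  // same string to a plain timestamp column fails, since LocalDateTime.parse() rejects zone
  // offsets.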

/** Converts an Iceberg {@link Record} to a Beam {@link Row}. */
public static Row icebergRecordToBeamRow(Schema schema, Record record) {
Row.Builder rowBuilder = Row.withSchema(schema);
@@ -345,16 +426,17 @@ public static Row icebergRecordToBeamRow(Schema schema, Record record) {
case FLOAT: // Iceberg and Beam both use float
case DOUBLE: // Iceberg and Beam both use double
case STRING: // Iceberg and Beam both use String
case BOOLEAN: // Iceberg and Beam both use String
case BOOLEAN: // Iceberg and Beam both use boolean
case ARRAY:
case ITERABLE:
case MAP:
rowBuilder.addValue(icebergValue);
break;
case DATETIME:
// Iceberg uses a long for millis; Beam uses joda time DateTime
long millis = (long) icebergValue;
rowBuilder.addValue(new DateTime(millis, DateTimeZone.UTC));
// Iceberg uses a long for micros.
// Beam DATETIME uses joda's DateTime, which only supports millis,
// so we do lose some precision here
rowBuilder.addValue(getBeamDateTimeValue(icebergValue));
break;
case BYTES:
// Iceberg uses ByteBuffer; Beam uses byte[]
@@ -369,13 +451,59 @@ public static Row icebergRecordToBeamRow(Schema schema, Record record) {
rowBuilder.addValue(icebergRecordToBeamRow(nestedSchema, nestedRecord));
break;
case LOGICAL_TYPE:
throw new UnsupportedOperationException(
"Cannot convert iceberg field to Beam logical type");
rowBuilder.addValue(getLogicalTypeValue(icebergValue, field.getType()));
break;
default:
throw new UnsupportedOperationException(
"Unsupported Beam type: " + field.getType().getTypeName());
}
}
return rowBuilder.build();
}

private static DateTime getBeamDateTimeValue(Object icebergValue) {
long micros;
if (icebergValue instanceof OffsetDateTime) {
micros = DateTimeUtil.microsFromTimestamptz((OffsetDateTime) icebergValue);
} else if (icebergValue instanceof LocalDateTime) {
micros = DateTimeUtil.microsFromTimestamp((LocalDateTime) icebergValue);
} else if (icebergValue instanceof Long) {
micros = (long) icebergValue;
} else if (icebergValue instanceof String) {
return DateTime.parse((String) icebergValue);
} else {
throw new UnsupportedOperationException(
"Unsupported Iceberg type for Beam type DATETIME: " + icebergValue.getClass());
}
return new DateTime(micros / 1000L);
}

private static Object getLogicalTypeValue(Object icebergValue, Schema.FieldType type) {
if (icebergValue instanceof String) {
String strValue = (String) icebergValue;
if (type.isLogicalType(SqlTypes.DATE.getIdentifier())) {
return LocalDate.parse(strValue);
} else if (type.isLogicalType(SqlTypes.TIME.getIdentifier())) {
return LocalTime.parse(strValue);
} else if (type.isLogicalType(SqlTypes.DATETIME.getIdentifier())) {
return LocalDateTime.parse(strValue);
}
} else if (icebergValue instanceof Long) {
if (type.isLogicalType(SqlTypes.TIME.getIdentifier())) {
return DateTimeUtil.timeFromMicros((Long) icebergValue);
} else if (type.isLogicalType(SqlTypes.DATETIME.getIdentifier())) {
return DateTimeUtil.timestampFromMicros((Long) icebergValue);
}
} else if (icebergValue instanceof Integer
&& type.isLogicalType(SqlTypes.DATE.getIdentifier())) {
return DateTimeUtil.dateFromDays((Integer) icebergValue);
} else if (icebergValue instanceof OffsetDateTime
&& type.isLogicalType(SqlTypes.DATETIME.getIdentifier())) {
return ((OffsetDateTime) icebergValue)
.withOffsetSameInstant(ZoneOffset.UTC)
.toLocalDateTime();
}
// LocalDateTime, LocalDate, LocalTime
return icebergValue;
}
}
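
A minimal end-to-end sketch of the new type mappings (illustrative only, not part of this commit; the class name is hypothetical):

import org.apache.beam.sdk.io.iceberg.IcebergUtils;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;

public class TypeMappingSketch {
  public static void main(String[] args) {
    // A Beam schema exercising each newly supported type.
    Schema beamSchema =
        Schema.builder()
            .addLogicalTypeField("ts", SqlTypes.DATETIME) // -> Iceberg timestamp
            .addDateTimeField("ts_tz") // -> Iceberg timestamptz
            .addLogicalTypeField("date", SqlTypes.DATE) // -> Iceberg date
            .addLogicalTypeField("time", SqlTypes.TIME) // -> Iceberg time
            .build();

    org.apache.iceberg.Schema icebergSchema = IcebergUtils.beamSchemaToIcebergSchema(beamSchema);

    // Converting back distinguishes the two timestamp flavors:
    // timestamp -> SqlTypes.DATETIME, timestamptz -> DATETIME (joda).
    Schema roundTripped = IcebergUtils.icebergSchemaToBeamSchema(icebergSchema);
    System.out.println(roundTripped.getField("ts").getType());
    System.out.println(roundTripped.getField("ts_tz").getType());
  }
}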