Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Managed Iceberg] Support partitioning time types (year, month, day, hour) #32939

Merged
merged 9 commits into from
Dec 17, 2024
2 changes: 1 addition & 1 deletion .github/trigger_files/IO_Iceberg_Integration_Tests.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 3
"modification": 5
}
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@

* gcs-connector config options can be set via GcsOptions (Java) ([#32769](https://github.com/apache/beam/pull/32769)).
* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* [Managed Iceberg] Support partitioning by time (year, month, day, hour) for types `date`, `time`, `timestamp`, and `timestamp(tz)` ([#32939](https://github.com/apache/beam/pull/32939)).
* Upgraded the default version of Hadoop dependencies to 3.4.1. Hadoop 2.10.2 is still supported (Java) ([#33011](https://github.com/apache/beam/issues/33011)).

## New Features / Improvements
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;

import java.io.IOException;
import java.time.LocalDateTime;
import java.time.YearMonth;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand All @@ -31,21 +36,28 @@
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.RemovalNotification;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.expressions.Literal;
import org.apache.iceberg.transforms.Transform;
import org.apache.iceberg.transforms.Transforms;
import org.apache.iceberg.types.Types;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -90,6 +102,7 @@ class DestinationState {
final Cache<PartitionKey, RecordWriter> writers;
private final List<SerializableDataFile> dataFiles = Lists.newArrayList();
@VisibleForTesting final Map<PartitionKey, Integer> writerCounts = Maps.newHashMap();
private final Map<String, PartitionField> partitionFieldMap = Maps.newHashMap();
private final List<Exception> exceptions = Lists.newArrayList();

DestinationState(IcebergDestination icebergDestination, Table table) {
Expand All @@ -98,6 +111,9 @@ class DestinationState {
this.spec = table.spec();
this.partitionKey = new PartitionKey(spec, schema);
this.table = table;
for (PartitionField partitionField : spec.fields()) {
partitionFieldMap.put(partitionField.name(), partitionField);
}

// build a cache of RecordWriters.
// writers will expire after 1 min of idle time.
Expand All @@ -123,7 +139,9 @@ class DestinationState {
throw rethrow;
}
openWriters--;
dataFiles.add(SerializableDataFile.from(recordWriter.getDataFile(), pk));
String partitionPath = getPartitionDataPath(pk.toPath(), partitionFieldMap);
dataFiles.add(
SerializableDataFile.from(recordWriter.getDataFile(), partitionPath));
})
.build();
}
Expand All @@ -136,7 +154,7 @@ class DestinationState {
* can't create a new writer, the {@link Record} is rejected and {@code false} is returned.
*/
boolean write(Record record) {
partitionKey.partition(record);
partitionKey.partition(getPartitionableRecord(record));

if (!writers.asMap().containsKey(partitionKey) && openWriters >= maxNumWriters) {
return false;
Expand Down Expand Up @@ -185,8 +203,65 @@ private RecordWriter createWriter(PartitionKey partitionKey) {
e);
}
}

/**
 * Produces a {@link Record} whose partition source columns hold values that the destination's
 * {@link PartitionSpec} can be applied to.
 *
 * <p>An unpartitioned spec needs no preparation, so the input record is returned as is. Otherwise
 * a fresh record is created for the table schema and only the source column of each partition
 * field is populated.
 */
private Record getPartitionableRecord(Record record) {
  if (spec.isUnpartitioned()) {
    return record;
  }

  Record partitionable = GenericRecord.create(schema);
  for (PartitionField partitionField : spec.fields()) {
    Types.NestedField sourceField = schema.findField(partitionField.sourceId());
    String sourceName = sourceField.name();
    Object rawValue = record.getField(sourceName);
    Transform<?, ?> transform = partitionField.transform();

    // Round-trip the value through its string form to obtain a literal of the source column's
    // type. A null result means no such conversion was produced.
    @Nullable
    Literal<Object> converted = Literal.of(rawValue.toString()).to(sourceField.type());

    // Void and identity transforms, and unconvertible values, keep the value untouched.
    partitionable.setField(
        sourceName,
        (converted == null || transform.isVoid() || transform.isIdentity())
            ? rawValue
            : converted.value());
  }
  return partitionable;
}
}

/**
 * Returns an equivalent partition path that is made up of partition data. Needed to reconstruct a
 * {@link DataFile}.
 *
 * <p>The incoming path carries human-readable values (for example {@code 2024} for a year,
 * {@code 2024-03} for a month and {@code 2024-03-05-13} for an hour). The year, month and hour
 * transforms, however, produce integer partition values counted from the Unix epoch
 * (1970-01-01T00:00). Each such segment is rewritten to that epoch-relative integer so that the
 * path can be parsed back into the partition tuple. Segments of every other transform are
 * copied through unchanged.
 *
 * @param partitionPath {@code '/'}-separated {@code name=value} segments; returned as is when
 *     empty
 * @param partitionFieldMap partition fields keyed by partition field name; when empty the path
 *     is returned as is. Every segment name in a non-empty path must be present in this map.
 * @return the path with year, month and hour values replaced by their epoch-relative integers
 */
@VisibleForTesting
static String getPartitionDataPath(
    String partitionPath, Map<String, PartitionField> partitionFieldMap) {
  if (partitionPath.isEmpty() || partitionFieldMap.isEmpty()) {
    return partitionPath;
  }

  // Transform names are loop-invariant, so resolve them once.
  String yearTransform = Transforms.year().toString();
  String monthTransform = Transforms.month().toString();
  String hourTransform = Transforms.hour().toString();

  List<String> resolved = new ArrayList<>();
  for (String partition : Splitter.on('/').splitToList(partitionPath)) {
    List<String> nameAndValue = Splitter.on('=').splitToList(partition);
    String name = nameAndValue.get(0);
    String value = nameAndValue.get(1);
    String transformName =
        Preconditions.checkArgumentNotNull(partitionFieldMap.get(name)).transform().toString();

    if (yearTransform.equals(transformName)) {
      // Human form is the calendar year; partition data is the number of years since 1970.
      int years = Integer.parseInt(value) - EPOCH.getYear();
      value = String.valueOf(years);
    } else if (monthTransform.equals(transformName)) {
      // Human form is yyyy-MM; partition data is the number of months since 1970-01, not the
      // month-of-year.
      YearMonth yearMonth = YearMonth.parse(value);
      long months =
          (yearMonth.getYear() - (long) EPOCH.getYear()) * 12
              + (yearMonth.getMonthValue() - EPOCH.getMonthValue());
      value = String.valueOf(months);
    } else if (hourTransform.equals(transformName)) {
      // Human form is yyyy-MM-dd-HH; partition data is the number of hours since the epoch.
      long hour = ChronoUnit.HOURS.between(EPOCH, LocalDateTime.parse(value, HOUR_FORMATTER));
      value = String.valueOf(hour);
    }
    resolved.add(name + "=" + value);
  }
  return String.join("/", resolved);
}

// Parses the "yyyy-MM-dd-HH" strings that getPartitionDataPath reads from hour-transformed
// partition path segments.
private static final DateTimeFormatter HOUR_FORMATTER =
DateTimeFormatter.ofPattern("yyyy-MM-dd-HH");
// Epoch second 0 at UTC, i.e. 1970-01-01T00:00; the reference point for the epoch-relative
// partition values computed in getPartitionDataPath.
private static final LocalDateTime EPOCH = LocalDateTime.ofEpochSecond(0, 0, ZoneOffset.UTC);

private final Catalog catalog;
private final String filePrefix;
private final long maxFileSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,13 +116,14 @@ abstract static class Builder {
* Create a {@link SerializableDataFile} from a {@link DataFile} and the partition path derived
* from its associated {@link PartitionKey}.
*/
static SerializableDataFile from(DataFile f, PartitionKey key) {
static SerializableDataFile from(DataFile f, String partitionPath) {

return SerializableDataFile.builder()
.setPath(f.path().toString())
.setFileFormat(f.format().toString())
.setRecordCount(f.recordCount())
.setFileSizeInBytes(f.fileSizeInBytes())
.setPartitionPath(key.toPath())
.setPartitionPath(partitionPath)
.setPartitionSpecId(f.specId())
.setKeyMetadata(f.keyMetadata())
.setSplitOffsets(f.splitOffsets())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ public void testWritePartitionedData() {
PartitionSpec partitionSpec =
PartitionSpec.builderFor(ICEBERG_SCHEMA)
.identity("bool")
.identity("modulo_5")
.hour("datetime")
.truncate("str", "value_x".length())
.build();
Table table =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,19 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;

import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
Expand All @@ -49,12 +54,16 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.IcebergGenerics;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.util.DateTimeUtil;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.hamcrest.Matchers;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.ClassRule;
Expand Down Expand Up @@ -360,4 +369,93 @@ public Void apply(Iterable<Row> input) {
return null;
}
}

/**
 * Writes rows to a table partitioned by identity, bucket, and every time transform (year, month,
 * day, hour) over date, datetime and zoned datetime columns, then reads the table back and
 * checks that the same rows are returned.
 */
@Test
public void testWritePartitionedData() {
  // One column per (time transform, temporal type) combination under test.
  Schema beamSchema =
      Schema.builder()
          .addStringField("str")
          .addInt32Field("int")
          .addLogicalTypeField("y_date", SqlTypes.DATE)
          .addLogicalTypeField("y_datetime", SqlTypes.DATETIME)
          .addDateTimeField("y_datetime_tz")
          .addLogicalTypeField("m_date", SqlTypes.DATE)
          .addLogicalTypeField("m_datetime", SqlTypes.DATETIME)
          .addDateTimeField("m_datetime_tz")
          .addLogicalTypeField("d_date", SqlTypes.DATE)
          .addLogicalTypeField("d_datetime", SqlTypes.DATETIME)
          .addDateTimeField("d_datetime_tz")
          .addLogicalTypeField("h_datetime", SqlTypes.DATETIME)
          .addDateTimeField("h_datetime_tz")
          .build();
  org.apache.iceberg.Schema icebergSchema = IcebergUtils.beamSchemaToIcebergSchema(beamSchema);

  PartitionSpec partitionSpec =
      PartitionSpec.builderFor(icebergSchema)
          .identity("str")
          .bucket("int", 5)
          .year("y_date")
          .year("y_datetime")
          .year("y_datetime_tz")
          .month("m_date")
          .month("m_datetime")
          .month("m_datetime_tz")
          .day("d_date")
          .day("d_datetime")
          .day("d_datetime_tz")
          .hour("h_datetime")
          .hour("h_datetime_tz")
          .build();

  // Create the partitioned table up front so the write targets the spec above.
  String tableName = "default.table_" + Long.toString(UUID.randomUUID().hashCode(), 16);
  warehouse.createTable(TableIdentifier.parse(tableName), icebergSchema, partitionSpec);

  Map<String, Object> catalogProperties =
      ImmutableMap.of("type", "hadoop", "warehouse", warehouse.location);
  Map<String, Object> managedConfig =
      ImmutableMap.of("table", tableName, "catalog_properties", catalogProperties);

  // 30 rows, spaced 100 days apart for dates and 10^10 ms apart for timestamps.
  List<Row> expectedRows = new ArrayList<>();
  for (int i = 0; i < 30; i++) {
    long epochMillis = i * 10_000_000_000L;
    LocalDate date = DateTimeUtil.dateFromDays(i * 100);
    LocalDateTime timestamp = DateTimeUtil.timestampFromMicros(epochMillis * 1000);
    DateTime zonedTimestamp =
        new DateTime(epochMillis).withZone(DateTimeZone.forOffsetHoursMinutes(3, 25));

    expectedRows.add(
        Row.withSchema(beamSchema)
            .addValues("str_" + i, i)
            // year columns
            .addValues(date, timestamp, zonedTimestamp)
            // month columns
            .addValues(date, timestamp, zonedTimestamp)
            // day columns
            .addValues(date, timestamp, zonedTimestamp)
            // hour columns
            .addValues(timestamp, zonedTimestamp)
            .build());
  }

  // Write phase: expect an "append" snapshot for the table.
  PCollection<Row> snapshots =
      testPipeline
          .apply("Records To Add", Create.of(expectedRows))
          .setRowSchema(beamSchema)
          .apply(Managed.write(Managed.ICEBERG).withConfig(managedConfig))
          .get(SNAPSHOTS_TAG);
  PAssert.that(snapshots)
      .satisfies(new VerifyOutputs(Collections.singletonList(tableName), "append"));
  testPipeline.run().waitUntilFinish();

  // Read phase: a separate pipeline reads the table back and compares against the input.
  Pipeline readPipeline = Pipeline.create(TestPipeline.testingPipelineOptions());
  PCollection<Row> actualRows =
      readPipeline
          .apply(Managed.read(Managed.ICEBERG).withConfig(managedConfig))
          .getSinglePCollection();
  PAssert.that(actualRows).containsInAnyOrder(expectedRows);
  readPipeline.run();
}
}
Loading
Loading