[Managed Iceberg] Support partitioning time types (year, month, day, hour) #32939

Merged: 9 commits, Dec 17, 2024
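
For context, here is a minimal end-to-end sketch of what this change enables, loosely following the integration test added in this PR. The table name, warehouse location, field names, and sample values below are illustrative only and not part of the change:

import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.iceberg.IcebergUtils;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;

public class TimePartitionedWriteSketch {
  public static void main(String[] args) {
    // Beam schema with a DATE and a DATETIME column (field names are hypothetical).
    Schema beamSchema =
        Schema.builder()
            .addStringField("name")
            .addLogicalTypeField("created_date", SqlTypes.DATE)
            .addLogicalTypeField("event_datetime", SqlTypes.DATETIME)
            .build();
    org.apache.iceberg.Schema icebergSchema =
        IcebergUtils.beamSchemaToIcebergSchema(beamSchema);

    // Time-based partition transforms on those columns, now honored by the sink.
    PartitionSpec spec =
        PartitionSpec.builderFor(icebergSchema)
            .year("created_date")
            .hour("event_datetime")
            .build();

    // Illustrative local Hadoop catalog; any supported catalog works the same way.
    String warehouse = "/tmp/iceberg-warehouse";
    new HadoopCatalog(new Configuration(), warehouse)
        .createTable(TableIdentifier.parse("default.events"), icebergSchema, spec);

    Map<String, Object> config =
        ImmutableMap.of(
            "table", "default.events",
            "catalog_properties",
            ImmutableMap.of("type", "hadoop", "warehouse", warehouse));

    Row row =
        Row.withSchema(beamSchema)
            .addValues("a", LocalDate.of(2024, 5, 1), LocalDateTime.of(2024, 5, 1, 10, 0))
            .build();

    // Rows land in partitioned data files such as
    // created_date_year=2024/event_datetime_hour=2024-05-01-10/...
    Pipeline p = Pipeline.create();
    p.apply(Create.of(row))
        .setRowSchema(beamSchema)
        .apply(Managed.write(Managed.ICEBERG).withConfig(config));
    p.run().waitUntilFinish();
  }
}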
2 changes: 1 addition & 1 deletion .github/trigger_files/IO_Iceberg_Integration_Tests.json
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 2
"modification": 4
}
1 change: 1 addition & 0 deletions CHANGES.md
@@ -105,6 +105,7 @@
* [Managed Iceberg] Now available in Python SDK ([#31495](https://github.com/apache/beam/pull/31495))
* [Managed Iceberg] Add support for TIMESTAMP, TIME, and DATE types ([#32688](https://github.com/apache/beam/pull/32688))
* BigQuery CDC writes are now available in the Python SDK, supported only when using the StorageWrite API in at-least-once mode ([#32527](https://github.com/apache/beam/issues/32527))
* [Managed Iceberg] Support partitioning time types (year, month, day, hour) ([#32939](https://github.com/apache/beam/pull/32939))
* [Managed Iceberg] Allow updating table partition specs during pipeline runtime ([#32879](https://github.com/apache/beam/pull/32879))
* Added BigQueryIO as a Managed IO ([#31486](https://github.com/apache/beam/pull/31486))
* Support for writing to [Solace message queues](https://solace.com/) (`SolaceIO.Write`) added (Java) ([#31905](https://github.com/apache/beam/issues/31905)).
@@ -21,6 +21,11 @@
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;

import java.io.IOException;
import java.time.LocalDateTime;
import java.time.YearMonth;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -31,21 +36,29 @@
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.RemovalNotification;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.expressions.Literal;
import org.apache.iceberg.transforms.Transform;
import org.apache.iceberg.transforms.Transforms;
import org.apache.iceberg.types.Types;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -90,6 +103,7 @@ class DestinationState {
final Cache<PartitionKey, RecordWriter> writers;
private final List<SerializableDataFile> dataFiles = Lists.newArrayList();
@VisibleForTesting final Map<PartitionKey, Integer> writerCounts = Maps.newHashMap();
private final Map<String, PartitionField> partitionFieldMap = Maps.newHashMap();
private final List<Exception> exceptions = Lists.newArrayList();

DestinationState(IcebergDestination icebergDestination, Table table) {
@@ -98,6 +112,9 @@ class DestinationState {
this.spec = table.spec();
this.partitionKey = new PartitionKey(spec, schema);
this.table = table;
for (PartitionField partitionField : spec.fields()) {
partitionFieldMap.put(partitionField.name(), partitionField);
}

// build a cache of RecordWriters.
// writers will expire after 1 min of idle time.
@@ -123,7 +140,9 @@ class DestinationState {
throw rethrow;
}
openWriters--;
dataFiles.add(SerializableDataFile.from(recordWriter.getDataFile(), pk));
String partitionPath = getPartitionDataPath(pk.toPath(), partitionFieldMap);
dataFiles.add(
SerializableDataFile.from(recordWriter.getDataFile(), partitionPath));
})
.build();
}
@@ -136,7 +155,7 @@ class DestinationState {
* can't create a new writer, the {@link Record} is rejected and {@code false} is returned.
*/
boolean write(Record record) {
partitionKey.partition(record);
partitionKey.partition(getPartitionableRecord(record));

if (!writers.asMap().containsKey(partitionKey) && openWriters >= maxNumWriters) {
return false;
@@ -185,8 +204,65 @@ private RecordWriter createWriter(PartitionKey partitionKey) {
e);
}
}

/**
* Resolves an input {@link Record}'s partition values and returns another {@link Record} that
* can be applied to the destination's {@link PartitionSpec}.
*/
private Record getPartitionableRecord(Record record) {
if (spec.isUnpartitioned()) {
return record;
}
Record output = GenericRecord.create(schema);
for (PartitionField partitionField : spec.fields()) {
Transform<?, ?> transform = partitionField.transform();
Types.NestedField field = schema.findField(partitionField.sourceId());
String name = field.name();
Object value = record.getField(name);
@Nullable Literal<Object> literal = Literal.of(value.toString()).to(field.type());
if (literal == null || transform.isVoid() || transform.isIdentity()) {
output.setField(name, value);
} else {
output.setField(name, literal.value());
}
}
return output;
}
}

/**
* Returns an equivalent partition path that is made up of partition data. Needed to reconstruct a
* {@link DataFile}.
*/
@VisibleForTesting
static String getPartitionDataPath(
String partitionPath, Map<String, PartitionField> partitionFieldMap) {
if (Strings.isNullOrEmpty(partitionPath) || partitionFieldMap.isEmpty()) {
return partitionPath;
}
List<String> resolved = new ArrayList<>();
for (String partition : Splitter.on('/').splitToList(partitionPath)) {
List<String> parts = Splitter.on('=').splitToList(partition);
String name = parts.get(0);
String value = parts.get(1);
String transformName =
Preconditions.checkArgumentNotNull(partitionFieldMap.get(name)).transform().toString();
if (Transforms.month().toString().equals(transformName)) {
int month = YearMonth.parse(value).getMonthValue();
value = String.valueOf(month);
} else if (Transforms.hour().toString().equals(transformName)) {
long hour = ChronoUnit.HOURS.between(EPOCH, LocalDateTime.parse(value, HOUR_FORMATTER));
value = String.valueOf(hour);
}
resolved.add(name + "=" + value);
}
return String.join("/", resolved);
}

private static final DateTimeFormatter HOUR_FORMATTER =
DateTimeFormatter.ofPattern("yyyy-MM-dd-HH");
private static final LocalDateTime EPOCH = LocalDateTime.ofEpochSecond(0, 0, ZoneOffset.UTC);

private final Catalog catalog;
private final String filePrefix;
private final long maxFileSize;
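
For orientation: getPartitionableRecord above coerces each incoming field value to the Iceberg literal type of its source column before partitioning, and getPartitionDataPath rewrites the human-readable partition path produced by PartitionKey#toPath() into the ordinal form needed to rebuild a DataFile. Below is a small, self-contained sketch of that rewrite for the hour transform; the field name and timestamp are made up, and the conversion is reproduced with plain java.time rather than by calling the helper itself:

import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;

public class HourPartitionValueSketch {
  public static void main(String[] args) {
    // Value as it appears in a partition path, e.g. "h_datetime_hour=2024-05-01-10".
    String humanReadable = "2024-05-01-10";

    // Same formatter/epoch pair as HOUR_FORMATTER and EPOCH in the hunk above.
    DateTimeFormatter hourFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd-HH");
    LocalDateTime epoch = LocalDateTime.ofEpochSecond(0, 0, ZoneOffset.UTC);

    long hoursSinceEpoch =
        ChronoUnit.HOURS.between(epoch, LocalDateTime.parse(humanReadable, hourFormatter));

    // Prints "h_datetime_hour=476266": hours since the Unix epoch, which is the
    // value the hour transform stores in the DataFile's partition data.
    System.out.println("h_datetime_hour=" + hoursSinceEpoch);
  }
}

Month partitions get an analogous rewrite via YearMonth.parse, as shown in the hunk above.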
@@ -116,13 +116,14 @@ abstract static class Builder {
* Create a {@link SerializableDataFile} from a {@link DataFile} and its associated {@link
* PartitionKey}.
*/
static SerializableDataFile from(DataFile f, PartitionKey key) {
static SerializableDataFile from(DataFile f, String partitionPath) {

return SerializableDataFile.builder()
.setPath(f.path().toString())
.setFileFormat(f.format().toString())
.setRecordCount(f.recordCount())
.setFileSizeInBytes(f.fileSizeInBytes())
.setPartitionPath(key.toPath())
.setPartitionPath(partitionPath)
.setPartitionSpecId(f.specId())
.setKeyMetadata(f.keyMetadata())
.setSplitOffsets(f.splitOffsets())
@@ -354,7 +354,7 @@ public void testWritePartitionedData() {
PartitionSpec partitionSpec =
PartitionSpec.builderFor(ICEBERG_SCHEMA)
.identity("bool")
.identity("modulo_5")
.hour("datetime")
.truncate("str", "value_x".length())
.build();
Table table =
@@ -23,14 +23,19 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;

import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
@@ -49,12 +54,16 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.IcebergGenerics;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.util.DateTimeUtil;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.hamcrest.Matchers;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.ClassRule;
@@ -360,4 +369,93 @@ public Void apply(Iterable<Row> input) {
return null;
}
}

@Test
public void testWritePartitionedData() {
Schema schema =
Schema.builder()
.addStringField("str")
.addInt32Field("int")
.addLogicalTypeField("y_date", SqlTypes.DATE)
.addLogicalTypeField("y_datetime", SqlTypes.DATETIME)
.addDateTimeField("y_datetime_tz")
.addLogicalTypeField("m_date", SqlTypes.DATE)
.addLogicalTypeField("m_datetime", SqlTypes.DATETIME)
.addDateTimeField("m_datetime_tz")
.addLogicalTypeField("d_date", SqlTypes.DATE)
.addLogicalTypeField("d_datetime", SqlTypes.DATETIME)
.addDateTimeField("d_datetime_tz")
.addLogicalTypeField("h_datetime", SqlTypes.DATETIME)
.addDateTimeField("h_datetime_tz")
.build();
org.apache.iceberg.Schema icebergSchema = IcebergUtils.beamSchemaToIcebergSchema(schema);
PartitionSpec spec =
PartitionSpec.builderFor(icebergSchema)
.identity("str")
.bucket("int", 5)
.year("y_date")
.year("y_datetime")
.year("y_datetime_tz")
.month("m_date")
.month("m_datetime")
.month("m_datetime_tz")
.day("d_date")
.day("d_datetime")
.day("d_datetime_tz")
.hour("h_datetime")
.hour("h_datetime_tz")
.build();
String identifier = "default.table_" + Long.toString(UUID.randomUUID().hashCode(), 16);

warehouse.createTable(TableIdentifier.parse(identifier), icebergSchema, spec);
Map<String, Object> config =
ImmutableMap.of(
"table",
identifier,
"catalog_properties",
ImmutableMap.of("type", "hadoop", "warehouse", warehouse.location));

List<Row> rows = new ArrayList<>();
for (int i = 0; i < 30; i++) {
long millis = i * 10_000_000_000L;
LocalDate localDate = DateTimeUtil.dateFromDays(i * 100);
LocalDateTime localDateTime = DateTimeUtil.timestampFromMicros(millis * 1000);
DateTime dateTime = new DateTime(millis).withZone(DateTimeZone.forOffsetHoursMinutes(3, 25));
Row row =
Row.withSchema(schema)
.addValues(
"str_" + i,
i,
localDate,
localDateTime,
dateTime,
localDate,
localDateTime,
dateTime,
localDate,
localDateTime,
dateTime,
localDateTime,
dateTime)
.build();
rows.add(row);
}

PCollection<Row> result =
testPipeline
.apply("Records To Add", Create.of(rows))
.setRowSchema(schema)
.apply(Managed.write(Managed.ICEBERG).withConfig(config))
.get(SNAPSHOTS_TAG);

PAssert.that(result)
.satisfies(new VerifyOutputs(Collections.singletonList(identifier), "append"));
testPipeline.run().waitUntilFinish();

Pipeline p = Pipeline.create(TestPipeline.testingPipelineOptions());
PCollection<Row> readRows =
p.apply(Managed.read(Managed.ICEBERG).withConfig(config)).getSinglePCollection();
PAssert.that(readRows).containsInAnyOrder(rows);
p.run();
}
}