[Managed Iceberg] Allow updating partition specs during pipeline runtime #32879

Merged · 8 commits · Nov 5, 2024
Changes from 4 commits
.github/trigger_files/IO_Iceberg_Integration_Tests.json (1 addition, 1 deletion)
@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run",
-  "modification": 4
+  "modification": 3
 }
CHANGES.md (1 addition)
@@ -68,6 +68,7 @@
 * [Managed Iceberg] Now available in Python SDK ([#31495](https://github.com/apache/beam/pull/31495))
 * [Managed Iceberg] Add support for TIMESTAMP, TIME, and DATE types ([#32688](https://github.com/apache/beam/pull/32688))
 * BigQuery CDC writes are now available in Python SDK, only supported when using StorageWrite API at least once mode ([#32527](https://github.com/apache/beam/issues/32527))
+* [Managed Iceberg] Allow updating partition specs at runtime ([#32879](https://github.com/apache/beam/pull/32879))
 
 ## New Features / Improvements
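For context, Iceberg lets a user evolve a table's partition layout at any time through its UpdatePartitionSpec API, which is exactly what can now happen while a write pipeline is running. A minimal sketch of such an update, mirroring the spec change exercised in the test at the bottom of this diff (the warehouse path, table identifier, and column names are illustrative placeholders):

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;

public class UpdateSpecSketch {
  public static void main(String[] args) {
    // Load the table (placeholder warehouse path and identifier).
    HadoopCatalog catalog = new HadoopCatalog(new Configuration(), "/tmp/warehouse");
    Table table = catalog.loadTable(TableIdentifier.of("db", "tbl"));

    // Start partitioning by "id", stop partitioning by "bool".
    // Committing bumps the table's current spec id; all previous specs
    // remain addressable through table.specs().
    table.updateSpec().addField("id").removeField("bool").commit();
  }
}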
AppendFilesToTables.java
@@ -112,7 +112,7 @@ public void processElement(
       AppendFiles update = table.newAppend();
       long numFiles = 0;
       for (FileWriteResult result : fileWriteResults) {
-        DataFile dataFile = result.getDataFile(table.spec());
+        DataFile dataFile = result.getDataFile(table.specs());
         update.appendFile(dataFile);
         committedDataFileByteSize.update(dataFile.fileSizeInBytes());
         committedDataFileRecordCount.update(dataFile.recordCount());
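The one-line change above is the crux of the fix: Table.spec() returns only the table's current PartitionSpec, while Table.specs() returns every spec the table has ever had, keyed by spec id. A rough sketch of the difference (a hypothetical helper; fileSpecId stands for the spec id a data file was written under):

import java.util.Map;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;

class SpecLookupSketch {
  // Resolve the spec a data file was written under, which may no longer
  // be the table's current spec after updateSpec(...).commit().
  static PartitionSpec specForFile(Table table, int fileSpecId) {
    Map<Integer, PartitionSpec> allSpecs = table.specs(); // every spec, keyed by id
    return allSpecs.get(fileSpecId); // null if the id is unknown to this table
  }
}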
FileWriteResult.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.io.iceberg;
 
 import com.google.auto.value.AutoValue;
+import java.util.Map;
 import org.apache.beam.sdk.schemas.AutoValueSchema;
 import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
 import org.apache.beam.sdk.schemas.annotations.SchemaIgnore;
@@ -46,9 +47,9 @@ public TableIdentifier getTableIdentifier() {
   }
 
   @SchemaIgnore
-  public DataFile getDataFile(PartitionSpec spec) {
+  public DataFile getDataFile(Map<Integer, PartitionSpec> specs) {
     if (cachedDataFile == null) {
-      cachedDataFile = getSerializableDataFile().createDataFile(spec);
+      cachedDataFile = getSerializableDataFile().createDataFile(specs);
     }
     return cachedDataFile;
   }
SerializableDataFile.java
@@ -17,14 +17,15 @@
  */
 package org.apache.beam.sdk.io.iceberg;
 
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+
 import com.google.auto.value.AutoValue;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.sdk.schemas.AutoValueSchema;
 import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.FileFormat;
@@ -141,12 +142,14 @@ static SerializableDataFile from(DataFile f, PartitionKey key) {
    * it from Beam-compatible types.
    */
   @SuppressWarnings("nullness")
-  DataFile createDataFile(PartitionSpec partitionSpec) {
-    Preconditions.checkState(
-        partitionSpec.specId() == getPartitionSpecId(),
-        "Invalid partition spec id '%s'. This DataFile was originally created with spec id '%s'.",
-        partitionSpec.specId(),
-        getPartitionSpecId());
+  DataFile createDataFile(Map<Integer, PartitionSpec> partitionSpecs) {
+    PartitionSpec partitionSpec =
+        checkStateNotNull(
+            partitionSpecs.get(getPartitionSpecId()),
+            "This DataFile was originally created with spec id '%s'. Could not find "
+                + "this spec id in table's partition specs: %s.",
+            getPartitionSpecId(),
+            partitionSpecs.keySet());
 
     Metrics dataFileMetrics =
         new Metrics(
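With the map-based lookup, the old "spec ids must match" assertion becomes a lookup that fails fast when a file's original spec has been dropped from the table's metadata. A hedged sketch of the new failure mode (assumes sdf is a SerializableDataFile created under spec id 0 and newerSpec is some other PartitionSpec; createDataFile is package-private, so this would live in org.apache.beam.sdk.io.iceberg):

import java.util.Map;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.PartitionSpec;

// Beam's checkStateNotNull throws IllegalStateException on a null value,
// so a missing spec id surfaces immediately instead of the file being
// silently rebuilt under the wrong spec.
Map<Integer, PartitionSpec> specs = ImmutableMap.of(1, newerSpec); // id 0 absent
try {
  sdf.createDataFile(specs);
} catch (IllegalStateException expected) {
  // "This DataFile was originally created with spec id '0'. Could not find
  // this spec id in table's partition specs: [1]."
}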
RecordWriterManagerTest.java
@@ -31,12 +31,14 @@
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionKey;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.hadoop.HadoopCatalog;
 import org.checkerframework.checker.nullness.qual.Nullable;
@@ -269,6 +271,25 @@ public void testRequireClosingBeforeFetchingDataFiles() {
     assertThrows(IllegalStateException.class, writerManager::getSerializableDataFiles);
   }

+  /** DataFile doesn't implement a .equals() method. Check equality manually. */
+  private static void checkDataFileEquality(DataFile d1, DataFile d2) {
+    assertEquals(d1.path(), d2.path());
+    assertEquals(d1.format(), d2.format());
+    assertEquals(d1.recordCount(), d2.recordCount());
+    assertEquals(d1.partition(), d2.partition());
+    assertEquals(d1.specId(), d2.specId());
+    assertEquals(d1.keyMetadata(), d2.keyMetadata());
+    assertEquals(d1.splitOffsets(), d2.splitOffsets());
+    assertEquals(d1.columnSizes(), d2.columnSizes());
+    assertEquals(d1.valueCounts(), d2.valueCounts());
+    assertEquals(d1.nullValueCounts(), d2.nullValueCounts());
+    assertEquals(d1.nanValueCounts(), d2.nanValueCounts());
+    assertEquals(d1.equalityFieldIds(), d2.equalityFieldIds());
+    assertEquals(d1.fileSequenceNumber(), d2.fileSequenceNumber());
+    assertEquals(d1.dataSequenceNumber(), d2.dataSequenceNumber());
+    assertEquals(d1.pos(), d2.pos());
+  }

   @Test
   public void testSerializableDataFileRoundTripEquality() throws IOException {
     PartitionKey partitionKey = new PartitionKey(PARTITION_SPEC, ICEBERG_SCHEMA);
@@ -288,22 +309,54 @@ public void testSerializableDataFileRoundTripEquality() throws IOException {
     assertEquals(2L, datafile.recordCount());
 
     DataFile roundTripDataFile =
-        SerializableDataFile.from(datafile, partitionKey).createDataFile(PARTITION_SPEC);
-    // DataFile doesn't implement a .equals() method. Check equality manually
-    assertEquals(datafile.path(), roundTripDataFile.path());
-    assertEquals(datafile.format(), roundTripDataFile.format());
-    assertEquals(datafile.recordCount(), roundTripDataFile.recordCount());
-    assertEquals(datafile.partition(), roundTripDataFile.partition());
-    assertEquals(datafile.specId(), roundTripDataFile.specId());
-    assertEquals(datafile.keyMetadata(), roundTripDataFile.keyMetadata());
-    assertEquals(datafile.splitOffsets(), roundTripDataFile.splitOffsets());
-    assertEquals(datafile.columnSizes(), roundTripDataFile.columnSizes());
-    assertEquals(datafile.valueCounts(), roundTripDataFile.valueCounts());
-    assertEquals(datafile.nullValueCounts(), roundTripDataFile.nullValueCounts());
-    assertEquals(datafile.nanValueCounts(), roundTripDataFile.nanValueCounts());
-    assertEquals(datafile.equalityFieldIds(), roundTripDataFile.equalityFieldIds());
-    assertEquals(datafile.fileSequenceNumber(), roundTripDataFile.fileSequenceNumber());
-    assertEquals(datafile.dataSequenceNumber(), roundTripDataFile.dataSequenceNumber());
-    assertEquals(datafile.pos(), roundTripDataFile.pos());
+        SerializableDataFile.from(datafile, partitionKey)
+            .createDataFile(
+                ImmutableMap.<Integer, PartitionSpec>builder()
+                    .put(PARTITION_SPEC.specId(), PARTITION_SPEC)
+                    .build());
+
+    checkDataFileEquality(datafile, roundTripDataFile);
   }

+  /**
+   * Users may update the table's spec while a write pipeline is running. Sometimes, this can happen
+   * after converting {@link DataFile} to {@link SerializableDataFile}s. When converting back to
+   * {@link DataFile} to commit in the {@link AppendFilesToTables} step, we need to make sure to use
+   * the same {@link PartitionSpec} it was originally created with.
+   *
+   * <p>This test checks that we're preserving the right {@link PartitionSpec} when such an update
+   * happens.
+   */
+  @Test
+  public void testRecreateSerializableDataAfterUpdatingPartitionSpec() throws IOException {
+    PartitionKey partitionKey = new PartitionKey(PARTITION_SPEC, ICEBERG_SCHEMA);
+
+    Row row = Row.withSchema(BEAM_SCHEMA).addValues(1, "abcdef", true).build();
+    Row row2 = Row.withSchema(BEAM_SCHEMA).addValues(2, "abcxyz", true).build();
+    // same partition for both records (name_trunc=abc, bool=true)
+    partitionKey.partition(IcebergUtils.beamRowToIcebergRecord(ICEBERG_SCHEMA, row));
+
+    // write some rows
+    RecordWriter writer =
+        new RecordWriter(catalog, windowedDestination.getValue(), "test_file_name", partitionKey);
+    writer.write(IcebergUtils.beamRowToIcebergRecord(ICEBERG_SCHEMA, row));
+    writer.write(IcebergUtils.beamRowToIcebergRecord(ICEBERG_SCHEMA, row2));
+    writer.close();
+
+    // fetch data file and its serializable version
+    DataFile datafile = writer.getDataFile();
+    SerializableDataFile serializableDataFile = SerializableDataFile.from(datafile, partitionKey);
+
+    assertEquals(2L, datafile.recordCount());
+    assertEquals(serializableDataFile.getPartitionSpecId(), datafile.specId());
+
+    // update spec
+    Table table = catalog.loadTable(windowedDestination.getValue().getTableIdentifier());
+    table.updateSpec().addField("id").removeField("bool").commit();
+
+    Map<Integer, PartitionSpec> updatedSpecs = table.specs();
+    DataFile roundTripDataFile = serializableDataFile.createDataFile(updatedSpecs);
+
+    checkDataFileEquality(datafile, roundTripDataFile);
+  }
 }
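Taken together, these changes mean a Managed Iceberg sink keeps committing files correctly even if the destination table's partition spec is evolved mid-run. A rough sketch of the kind of pipeline this protects (the config keys and values shown are illustrative of the Managed API surface, not a verified or exhaustive set):

import java.util.Map;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;

public class ManagedIcebergWriteSketch {
  // `rows` is assumed to come from some upstream (streaming) transform.
  static void writeToIceberg(PCollection<Row> rows) {
    Map<String, Object> config =
        ImmutableMap.of(
            "table", "db.tbl", // placeholder identifier
            "catalog_properties",
            ImmutableMap.of("type", "hadoop", "warehouse", "/tmp/warehouse"));
    // With this PR, files written before a spec update are still committed
    // under the spec they were originally created with.
    rows.apply(Managed.write(Managed.ICEBERG).withConfig(config));
  }
}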