diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json index 3f63c0c9975f..bbdc3a3910ef 100644 --- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json +++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 2 + "modification": 3 } diff --git a/CHANGES.md b/CHANGES.md index be4e0ba4d0f6..364b1a5fbdef 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -107,6 +107,7 @@ * Significantly improved performance of Kafka IO reads that enable [commitOffsetsInFinalize](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#commitOffsetsInFinalize--) by removing the data reshuffle from SDF implementation. ([#31682](https://github.com/apache/beam/pull/31682)). * Added support for dynamic writing in MqttIO (Java) ([#19376](https://github.com/apache/beam/issues/19376)) * Optimized Spark Runner parDo transform evaluator (Java) ([#32537](https://github.com/apache/beam/issues/32537)) +* [Managed Iceberg] More efficient manifest file writes/commits ([#32666](https://github.com/apache/beam/issues/32666)) ## Breaking Changes diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java index b91253cf3c12..defe4f2a603d 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java @@ -20,6 +20,7 @@ import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Distribution; import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupByKey; @@ -31,6 +32,7 @@ import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.DataFile; import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; import org.apache.iceberg.catalog.Catalog; @@ -73,6 +75,12 @@ private static class AppendFilesToTablesDoFn extends DoFn>, KV> { private final Counter snapshotsCreated = Metrics.counter(AppendFilesToTables.class, "snapshotsCreated"); + private final Counter dataFilesCommitted = + Metrics.counter(AppendFilesToTables.class, "dataFilesCommitted"); + private final Distribution committedDataFileByteSize = + Metrics.distribution(RecordWriter.class, "committedDataFileByteSize"); + private final Distribution committedDataFileRecordCount = + Metrics.distribution(RecordWriter.class, "committedDataFileRecordCount"); private final IcebergCatalogConfig catalogConfig; @@ -94,18 +102,28 @@ public void processElement( @Element KV> element, OutputReceiver> out, BoundedWindow window) { - if (!element.getValue().iterator().hasNext()) { + String tableStringIdentifier = element.getKey(); + Iterable fileWriteResults = element.getValue(); + if (!fileWriteResults.iterator().hasNext()) { return; } Table table = getCatalog().loadTable(TableIdentifier.parse(element.getKey())); AppendFiles update = table.newAppend(); - for (FileWriteResult writtenFile : element.getValue()) { - update.appendManifest(writtenFile.getManifestFile()); + long numFiles = 0; + for (FileWriteResult result : 
fileWriteResults) { + DataFile dataFile = result.getDataFile(table.spec()); + update.appendFile(dataFile); + committedDataFileByteSize.update(dataFile.fileSizeInBytes()); + committedDataFileRecordCount.update(dataFile.recordCount()); + numFiles++; } + // this commit will create a ManifestFile. we don't need to manually create one. update.commit(); + dataFilesCommitted.inc(numFiles); + Snapshot snapshot = table.currentSnapshot(); - LOG.info("Created new snapshot for table '{}': {}", element.getKey(), snapshot); + LOG.info("Created new snapshot for table '{}': {}", tableStringIdentifier, snapshot); snapshotsCreated.inc(); out.outputWithTimestamp( KV.of(element.getKey(), SnapshotInfo.fromSnapshot(snapshot)), window.maxTimestamp()); diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FileWriteResult.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FileWriteResult.java index 2459c0befde1..c4090d9e7e53 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FileWriteResult.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FileWriteResult.java @@ -18,12 +18,11 @@ package org.apache.beam.sdk.io.iceberg; import com.google.auto.value.AutoValue; -import java.io.IOException; import org.apache.beam.sdk.schemas.AutoValueSchema; import org.apache.beam.sdk.schemas.annotations.DefaultSchema; import org.apache.beam.sdk.schemas.annotations.SchemaIgnore; -import org.apache.iceberg.ManifestFile; -import org.apache.iceberg.ManifestFiles; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.catalog.TableIdentifier; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; @@ -32,12 +31,11 @@ abstract class FileWriteResult { private transient @MonotonicNonNull TableIdentifier cachedTableIdentifier; - private transient @MonotonicNonNull ManifestFile cachedManifestFile; + private transient @MonotonicNonNull DataFile cachedDataFile; abstract String getTableIdentifierString(); - @SuppressWarnings("mutable") - abstract byte[] getManifestFileBytes(); + abstract SerializableDataFile getSerializableDataFile(); @SchemaIgnore public TableIdentifier getTableIdentifier() { @@ -48,15 +46,11 @@ public TableIdentifier getTableIdentifier() { } @SchemaIgnore - public ManifestFile getManifestFile() { - if (cachedManifestFile == null) { - try { - cachedManifestFile = ManifestFiles.decode(getManifestFileBytes()); - } catch (IOException exc) { - throw new RuntimeException("Error decoding manifest file bytes"); - } + public DataFile getDataFile(PartitionSpec spec) { + if (cachedDataFile == null) { + cachedDataFile = getSerializableDataFile().createDataFile(spec); } - return cachedManifestFile; + return cachedDataFile; } public static Builder builder() { @@ -68,18 +62,13 @@ abstract static class Builder { abstract Builder setTableIdentifierString(String tableIdString); - abstract Builder setManifestFileBytes(byte[] manifestFileBytes); + abstract Builder setSerializableDataFile(SerializableDataFile dataFile); @SchemaIgnore public Builder setTableIdentifier(TableIdentifier tableId) { return setTableIdentifierString(tableId.toString()); } - @SchemaIgnore - public Builder setManifestFile(ManifestFile manifestFile) throws IOException { - return setManifestFileBytes(ManifestFiles.encode(manifestFile)); - } - public abstract FileWriteResult build(); } } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java 
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java index 6321f9006e2a..5b63803a52d0 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java @@ -318,7 +318,7 @@ public WriteRows to(DynamicDestinations destinations) { * org.apache.iceberg.Snapshot} is produced. * *

Roughly every triggeringFrequency duration, records are written to data files and appended - * to the respective table. Each append operation created a new table snapshot. + * to the respective table. Each append operation creates a new table snapshot. * *

Generally speaking, increasing this duration will result in fewer, larger data files and * fewer snapshots. diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java index 92b5dd58b51e..9a3262e19845 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java @@ -19,7 +19,6 @@ import java.io.IOException; import org.apache.beam.sdk.metrics.Counter; -import org.apache.beam.sdk.metrics.Distribution; import org.apache.beam.sdk.metrics.Metrics; import org.apache.iceberg.DataFile; import org.apache.iceberg.FileFormat; @@ -38,9 +37,8 @@ class RecordWriter { private static final Logger LOG = LoggerFactory.getLogger(RecordWriter.class); private final Counter activeIcebergWriters = - Metrics.counter(RecordWriterManager.class, "activeIcebergWriters"); - private final Distribution dataFileByteSize = - Metrics.distribution(RecordWriter.class, "dataFileByteSize"); + Metrics.counter(RecordWriter.class, "activeIcebergWriters"); + private final Counter dataFilesWritten = Metrics.counter(RecordWriter.class, "dataFilesWritten"); private final DataWriter icebergDataWriter; private final Table table; private final String absoluteFilename; @@ -128,7 +126,7 @@ public void close() throws IOException { dataFile.recordCount(), dataFile.fileSizeInBytes(), absoluteFilename); - dataFileByteSize.update(dataFile.fileSizeInBytes()); + dataFilesWritten.inc(); } public long bytesWritten() { diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java index 5979e2a60131..055c8882b72c 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java @@ -21,13 +21,11 @@ import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.TimeUnit; -import org.apache.beam.sdk.metrics.Counter; -import org.apache.beam.sdk.metrics.Metrics; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.Preconditions; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.Row; @@ -38,17 +36,12 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; import org.apache.iceberg.DataFile; -import org.apache.iceberg.FileFormat; import org.apache.iceberg.ManifestFile; -import org.apache.iceberg.ManifestFiles; -import org.apache.iceberg.ManifestWriter; import org.apache.iceberg.PartitionKey; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Table; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.data.Record; -import org.apache.iceberg.io.FileIO; -import org.apache.iceberg.io.OutputFile; /** * A writer that manages multiple {@link RecordWriter}s to write to multiple tables and partitions. @@ -66,19 +59,13 @@ * *

    *
  1. Close all underlying {@link RecordWriter}s - *
  2. Collect all {@link DataFile}s - *
  3. Create a new {@link ManifestFile} referencing these {@link DataFile}s + *
  2. Collect all {@link DataFile}s as {@link SerializableDataFile}s (a more Beam-friendly type) *
* - *

After closing, the resulting {@link ManifestFile}s can be retrieved using {@link - * #getManifestFiles()}. + *

After closing, the resulting {@link SerializableDataFile}s can be retrieved using {@link + * #getSerializableDataFiles()}. */ class RecordWriterManager implements AutoCloseable { - private final Counter dataFilesWritten = - Metrics.counter(RecordWriterManager.class, "dataFilesWritten"); - private final Counter manifestFilesWritten = - Metrics.counter(RecordWriterManager.class, "manifestFilesWritten"); - /** * Represents the state of one Iceberg table destination. Creates one {@link RecordWriter} per * partition and manages them in a {@link Cache}. @@ -90,11 +77,9 @@ class DestinationState { private final PartitionSpec spec; private final org.apache.iceberg.Schema schema; private final PartitionKey partitionKey; - private final String tableLocation; - private final FileIO fileIO; private final Table table; private final String stateToken = UUID.randomUUID().toString(); - private final List dataFiles = Lists.newArrayList(); + private final List dataFiles = Lists.newArrayList(); @VisibleForTesting final Cache writers; @VisibleForTesting final Map writerCounts = Maps.newHashMap(); @@ -103,8 +88,6 @@ class DestinationState { this.schema = table.schema(); this.spec = table.spec(); this.partitionKey = new PartitionKey(spec, schema); - this.tableLocation = table.location(); - this.fileIO = table.io(); this.table = table; // build a cache of RecordWriters. @@ -128,8 +111,7 @@ class DestinationState { e); } openWriters--; - dataFiles.add(recordWriter.getDataFile()); - dataFilesWritten.inc(); + dataFiles.add(SerializableDataFile.from(recordWriter.getDataFile(), pk)); }) .build(); } @@ -191,13 +173,6 @@ private RecordWriter createWriter(PartitionKey partitionKey) { e); } } - - private String getManifestFileLocation(PaneInfo paneInfo) { - return FileFormat.AVRO.addExtension( - String.format( - "%s/metadata/%s-%s-%s.manifest", - tableLocation, filePrefix, stateToken, paneInfo.getIndex())); - } } private final Catalog catalog; @@ -209,8 +184,8 @@ private String getManifestFileLocation(PaneInfo paneInfo) { @VisibleForTesting final Map, DestinationState> destinations = Maps.newHashMap(); - private final Map, List> totalManifestFiles = - Maps.newHashMap(); + private final Map, List> + totalSerializableDataFiles = Maps.newHashMap(); private boolean isClosed = false; @@ -249,7 +224,6 @@ public boolean write(WindowedValue icebergDestination, Row r public void close() throws IOException { for (Map.Entry, DestinationState> windowedDestinationAndState : destinations.entrySet()) { - WindowedValue windowedDestination = windowedDestinationAndState.getKey(); DestinationState state = windowedDestinationAndState.getValue(); // removing writers from the state's cache will trigger the logic to collect each writer's @@ -259,21 +233,8 @@ public void close() throws IOException { continue; } - OutputFile outputFile = - state.fileIO.newOutputFile(state.getManifestFileLocation(windowedDestination.getPane())); - - ManifestWriter manifestWriter; - try (ManifestWriter openWriter = ManifestFiles.write(state.spec, outputFile)) { - openWriter.addAll(state.dataFiles); - manifestWriter = openWriter; - } - ManifestFile manifestFile = manifestWriter.toManifestFile(); - manifestFilesWritten.inc(); - - totalManifestFiles - .computeIfAbsent(windowedDestination, dest -> Lists.newArrayList()) - .add(manifestFile); - + totalSerializableDataFiles.put( + windowedDestinationAndState.getKey(), new ArrayList<>(state.dataFiles)); state.dataFiles.clear(); } destinations.clear(); @@ -285,15 +246,16 @@ public void close() throws IOException { } /** - 
* Returns a list of accumulated windowed {@link ManifestFile}s for each windowed {@link + * Returns a list of accumulated serializable {@link DataFile}s for each windowed {@link * IcebergDestination}. The {@link RecordWriterManager} must first be closed before this is * called. */ - public Map, List> getManifestFiles() { + public Map, List> + getSerializableDataFiles() { checkState( isClosed, - "Please close this %s before retrieving its manifest files.", + "Please close this %s before retrieving its data files.", getClass().getSimpleName()); - return totalManifestFiles; + return totalSerializableDataFiles; } } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableDataFile.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableDataFile.java new file mode 100644 index 000000000000..699d4fa4dfd0 --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableDataFile.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.iceberg; + +import com.google.auto.value.AutoValue; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Metrics; +import org.apache.iceberg.PartitionKey; +import org.apache.iceberg.PartitionSpec; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** + * Serializable version of an Iceberg {@link DataFile}. + * + *

{@link DataFile} is not serializable and the Iceberg API doesn't offer an easy way to + * encode/decode it. This class is an identical version that can be used as a PCollection element + * type. + * + *

Use {@link #from(DataFile, PartitionKey)} to create a {@link SerializableDataFile} and {@link + * #createDataFile(PartitionSpec)} to reconstruct the original {@link DataFile}. + */ +@DefaultSchema(AutoValueSchema.class) +@AutoValue +abstract class SerializableDataFile { + public static Builder builder() { + return new AutoValue_SerializableDataFile.Builder(); + } + + abstract String getPath(); + + abstract String getFileFormat(); + + abstract long getRecordCount(); + + abstract long getFileSizeInBytes(); + + abstract String getPartitionPath(); + + abstract int getPartitionSpecId(); + + abstract @Nullable ByteBuffer getKeyMetadata(); + + abstract @Nullable List getSplitOffsets(); + + abstract @Nullable Map getColumnSizes(); + + abstract @Nullable Map getValueCounts(); + + abstract @Nullable Map getNullValueCounts(); + + abstract @Nullable Map getNanValueCounts(); + + abstract @Nullable Map getLowerBounds(); + + abstract @Nullable Map getUpperBounds(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setPath(String path); + + abstract Builder setFileFormat(String fileFormat); + + abstract Builder setRecordCount(long recordCount); + + abstract Builder setFileSizeInBytes(long fileSizeInBytes); + + abstract Builder setPartitionPath(String partitionPath); + + abstract Builder setPartitionSpecId(int partitionSpec); + + abstract Builder setKeyMetadata(ByteBuffer keyMetadata); + + abstract Builder setSplitOffsets(List splitOffsets); + + abstract Builder setColumnSizes(Map columnSizes); + + abstract Builder setValueCounts(Map valueCounts); + + abstract Builder setNullValueCounts(Map nullValueCounts); + + abstract Builder setNanValueCounts(Map nanValueCounts); + + abstract Builder setLowerBounds(@Nullable Map lowerBounds); + + abstract Builder setUpperBounds(@Nullable Map upperBounds); + + abstract SerializableDataFile build(); + } + + /** + * Create a {@link SerializableDataFile} from a {@link DataFile} and its associated {@link + * PartitionKey}. + */ + static SerializableDataFile from(DataFile f, PartitionKey key) { + return SerializableDataFile.builder() + .setPath(f.path().toString()) + .setFileFormat(f.format().toString()) + .setRecordCount(f.recordCount()) + .setFileSizeInBytes(f.fileSizeInBytes()) + .setPartitionPath(key.toPath()) + .setPartitionSpecId(f.specId()) + .setKeyMetadata(f.keyMetadata()) + .setSplitOffsets(f.splitOffsets()) + .setColumnSizes(f.columnSizes()) + .setValueCounts(f.valueCounts()) + .setNullValueCounts(f.nullValueCounts()) + .setNanValueCounts(f.nanValueCounts()) + .setLowerBounds(toByteArrayMap(f.lowerBounds())) + .setUpperBounds(toByteArrayMap(f.upperBounds())) + .build(); + } + + /** + * Reconstructs the original {@link DataFile} from this {@link SerializableDataFile}. + * + *

We require an input {@link PartitionSpec} as well because there's no easy way to reconstruct + * it from Beam-compatible types. + */ + @SuppressWarnings("nullness") + DataFile createDataFile(PartitionSpec partitionSpec) { + Preconditions.checkState( + partitionSpec.specId() == getPartitionSpecId(), + "Invalid partition spec id '%s'. This DataFile was originally created with spec id '%s'.", + partitionSpec.specId(), + getPartitionSpecId()); + + Metrics dataFileMetrics = + new Metrics( + getRecordCount(), + getColumnSizes(), + getValueCounts(), + getNullValueCounts(), + getNanValueCounts(), + toByteBufferMap(getLowerBounds()), + toByteBufferMap(getUpperBounds())); + + return DataFiles.builder(partitionSpec) + .withFormat(FileFormat.fromString(getFileFormat())) + .withPath(getPath()) + .withPartitionPath(getPartitionPath()) + .withEncryptionKeyMetadata(getKeyMetadata()) + .withFileSizeInBytes(getFileSizeInBytes()) + .withMetrics(dataFileMetrics) + .withSplitOffsets(getSplitOffsets()) + .build(); + } + + // ByteBuddyUtils has trouble converting Map value type ByteBuffer + // to byte[] and back to ByteBuffer, so we perform these conversions manually + // TODO(https://github.com/apache/beam/issues/32701) + private static @Nullable Map toByteArrayMap( + @Nullable Map input) { + if (input == null) { + return null; + } + Map output = new HashMap<>(input.size()); + for (Map.Entry e : input.entrySet()) { + output.put(e.getKey(), e.getValue().array()); + } + return output; + } + + private static @Nullable Map toByteBufferMap( + @Nullable Map input) { + if (input == null) { + return null; + } + Map output = new HashMap<>(input.size()); + for (Map.Entry e : input.entrySet()) { + output.put(e.getKey(), ByteBuffer.wrap(e.getValue())); + } + return output; + } +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java index 1926a769a6da..6a61aafbe8b9 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java @@ -18,7 +18,6 @@ package org.apache.beam.sdk.io.iceberg; import java.util.List; -import java.util.UUID; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; @@ -30,7 +29,6 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; -import org.apache.iceberg.ManifestFile; import org.apache.iceberg.catalog.Catalog; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; @@ -42,11 +40,15 @@ class WriteGroupedRowsToFiles private final DynamicDestinations dynamicDestinations; private final IcebergCatalogConfig catalogConfig; + private final String filePrefix; WriteGroupedRowsToFiles( - IcebergCatalogConfig catalogConfig, DynamicDestinations dynamicDestinations) { + IcebergCatalogConfig catalogConfig, + DynamicDestinations dynamicDestinations, + String filePrefix) { this.catalogConfig = catalogConfig; this.dynamicDestinations = dynamicDestinations; + this.filePrefix = filePrefix; } @Override @@ -55,7 +57,7 @@ public PCollection expand( return input.apply( ParDo.of( new WriteGroupedRowsToFilesDoFn( - catalogConfig, dynamicDestinations, DEFAULT_MAX_BYTES_PER_FILE))); + catalogConfig, dynamicDestinations, 
DEFAULT_MAX_BYTES_PER_FILE, filePrefix))); } private static class WriteGroupedRowsToFilesDoFn @@ -70,10 +72,11 @@ private static class WriteGroupedRowsToFilesDoFn WriteGroupedRowsToFilesDoFn( IcebergCatalogConfig catalogConfig, DynamicDestinations dynamicDestinations, - long maxFileSize) { + long maxFileSize, + String filePrefix) { this.catalogConfig = catalogConfig; this.dynamicDestinations = dynamicDestinations; - this.filePrefix = UUID.randomUUID().toString(); + this.filePrefix = filePrefix; this.maxFileSize = maxFileSize; } @@ -105,13 +108,13 @@ public void processElement( } } - List manifestFiles = - Preconditions.checkNotNull(writer.getManifestFiles().get(windowedDestination)); - for (ManifestFile manifestFile : manifestFiles) { + List serializableDataFiles = + Preconditions.checkNotNull(writer.getSerializableDataFiles().get(windowedDestination)); + for (SerializableDataFile dataFile : serializableDataFiles) { c.output( FileWriteResult.builder() .setTableIdentifier(destination.getTableIdentifier()) - .setManifestFile(manifestFile) + .setSerializableDataFile(dataFile) .build()); } } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java index 4d03f3a3bc58..a2d0c320f58f 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java @@ -19,6 +19,7 @@ import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; +import java.util.UUID; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.RowCoder; @@ -50,6 +51,7 @@ class WriteToDestinations extends PTransform>, Icebe private final IcebergCatalogConfig catalogConfig; private final DynamicDestinations dynamicDestinations; private final @Nullable Duration triggeringFrequency; + private final String filePrefix; WriteToDestinations( IcebergCatalogConfig catalogConfig, @@ -58,6 +60,8 @@ class WriteToDestinations extends PTransform>, Icebe this.dynamicDestinations = dynamicDestinations; this.catalogConfig = catalogConfig; this.triggeringFrequency = triggeringFrequency; + // single unique prefix per write transform + this.filePrefix = UUID.randomUUID().toString(); } @Override @@ -97,7 +101,9 @@ private PCollection writeTriggered(PCollection> IterableCoder.of(RowCoder.of(dynamicDestinations.getDataSchema())))); return groupedRecords - .apply("WriteGroupedRows", new WriteGroupedRowsToFiles(catalogConfig, dynamicDestinations)) + .apply( + "WriteGroupedRows", + new WriteGroupedRowsToFiles(catalogConfig, dynamicDestinations, filePrefix)) // Respect user's triggering frequency before committing snapshots .apply( "ApplyUserTrigger", @@ -120,7 +126,7 @@ private PCollection writeUntriggered(PCollection writeGroupedResult = @@ -129,7 +135,7 @@ private PCollection writeUntriggered(PCollection, List> destinationAndFiles : - Preconditions.checkNotNull(recordWriterManager).getManifestFiles().entrySet()) { + + for (Map.Entry, List> + destinationAndFiles : + Preconditions.checkNotNull(recordWriterManager) + .getSerializableDataFiles() + .entrySet()) { WindowedValue windowedDestination = destinationAndFiles.getKey(); - for (ManifestFile manifestFile : destinationAndFiles.getValue()) { + for (SerializableDataFile dataFile : destinationAndFiles.getValue()) { c.output( FileWriteResult.builder() - 
.setManifestFile(manifestFile) + .setSerializableDataFile(dataFile) .setTableIdentifier(windowedDestination.getValue().getTableIdentifier()) .build(), windowedDestination.getTimestamp(), diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java index 1c2e8bc2c451..7adf6defe520 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java @@ -31,11 +31,10 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.Row; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; import org.apache.commons.lang3.RandomStringUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.DataFile; import org.apache.iceberg.FileFormat; -import org.apache.iceberg.ManifestFile; import org.apache.iceberg.PartitionKey; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.catalog.TableIdentifier; @@ -147,9 +146,10 @@ public void testCreateNewWriterForEachDestination() throws IOException { writerManager.close(); assertEquals(0, writerManager.openWriters); - // We should only have 3 manifest files (one for each destination we wrote to) - assertEquals(3, writerManager.getManifestFiles().keySet().size()); - assertThat(writerManager.getManifestFiles().keySet(), containsInAnyOrder(dest1, dest2, dest3)); + // We should only have 3 data files (one for each destination we wrote to) + assertEquals(3, writerManager.getSerializableDataFiles().keySet().size()); + assertThat( + writerManager.getSerializableDataFiles().keySet(), containsInAnyOrder(dest1, dest2, dest3)); } @Test @@ -195,16 +195,21 @@ public void testCreateNewWriterForEachPartition() throws IOException { assertFalse(writeSuccess); assertEquals(3, writerManager.openWriters); - // Closing PartitionRecordWriter will close all writers. + // Closing RecordWriterManager will close all writers. 
writerManager.close(); assertEquals(0, writerManager.openWriters); - assertEquals(1, writerManager.getManifestFiles().size()); - ManifestFile manifestFile = - Iterables.getOnlyElement(writerManager.getManifestFiles().get(windowedDestination)); - - assertEquals(3, manifestFile.addedFilesCount().intValue()); - assertEquals(4, manifestFile.addedRowsCount().intValue()); + // We should have only one destination + assertEquals(1, writerManager.getSerializableDataFiles().size()); + assertTrue(writerManager.getSerializableDataFiles().containsKey(windowedDestination)); + // We should have 3 data files (one for each partition we wrote to) + assertEquals(3, writerManager.getSerializableDataFiles().get(windowedDestination).size()); + long totalRows = 0; + for (SerializableDataFile dataFile : + writerManager.getSerializableDataFiles().get(windowedDestination)) { + totalRows += dataFile.getRecordCount(); + } + assertEquals(4L, totalRows); } @Test @@ -255,12 +260,50 @@ public void testRespectMaxFileSize() throws IOException { } @Test - public void testRequireClosingBeforeFetchingManifestFiles() { + public void testRequireClosingBeforeFetchingDataFiles() { RecordWriterManager writerManager = new RecordWriterManager(catalog, "test_file_name", 100, 2); Row row = Row.withSchema(BEAM_SCHEMA).addValues(1, "aaa", true).build(); writerManager.write(windowedDestination, row); assertEquals(1, writerManager.openWriters); - assertThrows(IllegalStateException.class, writerManager::getManifestFiles); + assertThrows(IllegalStateException.class, writerManager::getSerializableDataFiles); + } + + @Test + public void testSerializableDataFileRoundTripEquality() throws IOException { + PartitionKey partitionKey = new PartitionKey(PARTITION_SPEC, ICEBERG_SCHEMA); + + Row row = Row.withSchema(BEAM_SCHEMA).addValues(1, "abcdef", true).build(); + Row row2 = Row.withSchema(BEAM_SCHEMA).addValues(2, "abcxyz", true).build(); + // same partition for both records (name_trunc=abc, bool=true) + partitionKey.partition(IcebergUtils.beamRowToIcebergRecord(ICEBERG_SCHEMA, row)); + + RecordWriter writer = + new RecordWriter(catalog, windowedDestination.getValue(), "test_file_name", partitionKey); + writer.write(IcebergUtils.beamRowToIcebergRecord(ICEBERG_SCHEMA, row)); + writer.write(IcebergUtils.beamRowToIcebergRecord(ICEBERG_SCHEMA, row2)); + + writer.close(); + DataFile datafile = writer.getDataFile(); + assertEquals(2L, datafile.recordCount()); + + DataFile roundTripDataFile = + SerializableDataFile.from(datafile, partitionKey).createDataFile(PARTITION_SPEC); + // DataFile doesn't implement a .equals() method. 
Check equality manually + assertEquals(datafile.path(), roundTripDataFile.path()); + assertEquals(datafile.format(), roundTripDataFile.format()); + assertEquals(datafile.recordCount(), roundTripDataFile.recordCount()); + assertEquals(datafile.partition(), roundTripDataFile.partition()); + assertEquals(datafile.specId(), roundTripDataFile.specId()); + assertEquals(datafile.keyMetadata(), roundTripDataFile.keyMetadata()); + assertEquals(datafile.splitOffsets(), roundTripDataFile.splitOffsets()); + assertEquals(datafile.columnSizes(), roundTripDataFile.columnSizes()); + assertEquals(datafile.valueCounts(), roundTripDataFile.valueCounts()); + assertEquals(datafile.nullValueCounts(), roundTripDataFile.nullValueCounts()); + assertEquals(datafile.nanValueCounts(), roundTripDataFile.nanValueCounts()); + assertEquals(datafile.equalityFieldIds(), roundTripDataFile.equalityFieldIds()); + assertEquals(datafile.fileSequenceNumber(), roundTripDataFile.fileSequenceNumber()); + assertEquals(datafile.dataSequenceNumber(), roundTripDataFile.dataSequenceNumber()); + assertEquals(datafile.pos(), roundTripDataFile.pos()); } }
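
For illustration (not part of the patch): the core of this change is the DataFile -> SerializableDataFile -> DataFile round trip, which lets data files travel through a PCollection instead of pre-built manifest files. A minimal sketch under stated assumptions: it lives in org.apache.beam.sdk.io.iceberg (these classes are package-private), the Table and PartitionKey come from the surrounding write path, and the class/method names here (SerializableDataFileRoundTripSketch, roundTrip) are hypothetical. It mirrors what testSerializableDataFileRoundTripEquality above exercises.

package org.apache.beam.sdk.io.iceberg;

import org.apache.iceberg.DataFile;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.Table;

class SerializableDataFileRoundTripSketch {

  // Hypothetical helper: capture a freshly written DataFile in Beam-friendly form and rebuild it.
  static DataFile roundTrip(Table table, DataFile original, PartitionKey partitionKey) {
    // from() records path, format, record count, file size, partition path and column metrics
    // as schema-compatible fields (byte[] maps instead of ByteBuffers).
    SerializableDataFile serializable = SerializableDataFile.from(original, partitionKey);

    // createDataFile() needs the table's PartitionSpec because the spec itself is not
    // serialized; it checks that the spec id matches the one recorded at write time.
    return serializable.createDataFile(table.spec());
  }
}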
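
On the commit side, AppendFilesToTables now appends the reconstructed DataFiles directly and lets the single commit() produce the manifest, instead of appending manifest files built by the workers. A compact sketch of that pattern, again illustrative only and assumed to sit in the same package; the class and method names (AppendDataFilesSketch, appendAll) are hypothetical.

package org.apache.beam.sdk.io.iceberg;

import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;

class AppendDataFilesSketch {

  // Mirrors the updated AppendFilesToTablesDoFn: one append update per bundle of results.
  static void appendAll(Table table, Iterable<FileWriteResult> results) {
    AppendFiles update = table.newAppend();
    for (FileWriteResult result : results) {
      // getDataFile() rebuilds the DataFile from its serializable form using the table's spec.
      DataFile dataFile = result.getDataFile(table.spec());
      update.appendFile(dataFile);
    }
    // One commit creates one new snapshot; Iceberg writes the manifest file itself.
    update.commit();
  }
}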