[Managed Iceberg] Make manifest file writes and commits more efficient (#32666)

* group all data files before writing a manifest file

* add to changes md

* add data file roundtrip equality test

* address comments
ahmedabu98 authored Oct 8, 2024
1 parent 41bfd79 commit c9aa996
Showing 12 changed files with 341 additions and 119 deletions.
2 changes: 1 addition & 1 deletion .github/trigger_files/IO_Iceberg_Integration_Tests.json
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 2
"modification": 3
}
1 change: 1 addition & 0 deletions CHANGES.md
@@ -107,6 +107,7 @@
* Significantly improved performance of Kafka IO reads that enable [commitOffsetsInFinalize](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#commitOffsetsInFinalize--) by removing the data reshuffle from SDF implementation. ([#31682](https://github.com/apache/beam/pull/31682)).
* Added support for dynamic writing in MqttIO (Java) ([#19376](https://github.com/apache/beam/issues/19376))
* Optimized Spark Runner parDo transform evaluator (Java) ([#32537](https://github.com/apache/beam/issues/32537))
+ * [Managed Iceberg] More efficient manifest file writes/commits ([#32666](https://github.com/apache/beam/issues/32666))

## Breaking Changes

AppendFilesToTables.java
@@ -20,6 +20,7 @@
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.metrics.Counter;
+ import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
@@ -31,6 +32,7 @@
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.iceberg.AppendFiles;
+ import org.apache.iceberg.DataFile;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
@@ -73,6 +75,12 @@ private static class AppendFilesToTablesDoFn
extends DoFn<KV<String, Iterable<FileWriteResult>>, KV<String, SnapshotInfo>> {
private final Counter snapshotsCreated =
Metrics.counter(AppendFilesToTables.class, "snapshotsCreated");
+ private final Counter dataFilesCommitted =
+ Metrics.counter(AppendFilesToTables.class, "dataFilesCommitted");
+ private final Distribution committedDataFileByteSize =
+ Metrics.distribution(RecordWriter.class, "committedDataFileByteSize");
+ private final Distribution committedDataFileRecordCount =
+ Metrics.distribution(RecordWriter.class, "committedDataFileRecordCount");

private final IcebergCatalogConfig catalogConfig;

@@ -94,18 +102,28 @@ public void processElement(
@Element KV<String, Iterable<FileWriteResult>> element,
OutputReceiver<KV<String, SnapshotInfo>> out,
BoundedWindow window) {
- if (!element.getValue().iterator().hasNext()) {
+ String tableStringIdentifier = element.getKey();
+ Iterable<FileWriteResult> fileWriteResults = element.getValue();
+ if (!fileWriteResults.iterator().hasNext()) {
return;
}

Table table = getCatalog().loadTable(TableIdentifier.parse(element.getKey()));
AppendFiles update = table.newAppend();
- for (FileWriteResult writtenFile : element.getValue()) {
- update.appendManifest(writtenFile.getManifestFile());
+ long numFiles = 0;
+ for (FileWriteResult result : fileWriteResults) {
+ DataFile dataFile = result.getDataFile(table.spec());
+ update.appendFile(dataFile);
+ committedDataFileByteSize.update(dataFile.fileSizeInBytes());
+ committedDataFileRecordCount.update(dataFile.recordCount());
+ numFiles++;
}
+ // this commit will create a ManifestFile. we don't need to manually create one.
update.commit();
+ dataFilesCommitted.inc(numFiles);

Snapshot snapshot = table.currentSnapshot();
LOG.info("Created new snapshot for table '{}': {}", element.getKey(), snapshot);
LOG.info("Created new snapshot for table '{}': {}", tableStringIdentifier, snapshot);
snapshotsCreated.inc();
out.outputWithTimestamp(
KV.of(element.getKey(), SnapshotInfo.fromSnapshot(snapshot)), window.maxTimestamp());
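For context, the committer's job now reduces to Iceberg's AppendFiles API: append the already-written DataFiles and let the commit create the manifest file(s). A minimal sketch of that flow follows; the class and method names here are illustrative, not part of the PR.

import java.util.List;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;

class AppendAndCommitSketch {
  // Append a batch of already-written data files to a table in one commit.
  // Iceberg writes the manifest file(s) internally as part of the commit,
  // so the connector no longer has to build them by hand.
  static Snapshot appendAndCommit(Table table, List<DataFile> dataFiles) {
    AppendFiles update = table.newAppend();
    for (DataFile dataFile : dataFiles) {
      update.appendFile(dataFile);
    }
    update.commit(); // produces one new snapshot for the whole batch
    return table.currentSnapshot();
  }
}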
FileWriteResult.java
@@ -18,12 +18,11 @@
package org.apache.beam.sdk.io.iceberg;

import com.google.auto.value.AutoValue;
- import java.io.IOException;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaIgnore;
- import org.apache.iceberg.ManifestFile;
- import org.apache.iceberg.ManifestFiles;
+ import org.apache.iceberg.DataFile;
+ import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.catalog.TableIdentifier;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;

@@ -32,12 +31,11 @@
abstract class FileWriteResult {

private transient @MonotonicNonNull TableIdentifier cachedTableIdentifier;
- private transient @MonotonicNonNull ManifestFile cachedManifestFile;
+ private transient @MonotonicNonNull DataFile cachedDataFile;

abstract String getTableIdentifierString();

- @SuppressWarnings("mutable")
- abstract byte[] getManifestFileBytes();
+ abstract SerializableDataFile getSerializableDataFile();

@SchemaIgnore
public TableIdentifier getTableIdentifier() {
@@ -48,15 +46,11 @@ public TableIdentifier getTableIdentifier() {
}

@SchemaIgnore
- public ManifestFile getManifestFile() {
- if (cachedManifestFile == null) {
- try {
- cachedManifestFile = ManifestFiles.decode(getManifestFileBytes());
- } catch (IOException exc) {
- throw new RuntimeException("Error decoding manifest file bytes");
- }
+ public DataFile getDataFile(PartitionSpec spec) {
+ if (cachedDataFile == null) {
+ cachedDataFile = getSerializableDataFile().createDataFile(spec);
}
- return cachedManifestFile;
+ return cachedDataFile;
}

public static Builder builder() {
@@ -68,18 +62,13 @@ abstract static class Builder {

abstract Builder setTableIdentifierString(String tableIdString);

- abstract Builder setManifestFileBytes(byte[] manifestFileBytes);
+ abstract Builder setSerializableDataFile(SerializableDataFile dataFile);

@SchemaIgnore
public Builder setTableIdentifier(TableIdentifier tableId) {
return setTableIdentifierString(tableId.toString());
}

- @SchemaIgnore
- public Builder setManifestFile(ManifestFile manifestFile) throws IOException {
- return setManifestFileBytes(ManifestFiles.encode(manifestFile));
- }

public abstract FileWriteResult build();
}
}
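The FileWriteResult change above swaps the encoded ManifestFile bytes for a SerializableDataFile, which the new roundtrip equality test exercises. Below is a small sketch of that roundtrip, assuming it lives in the same org.apache.beam.sdk.io.iceberg package (these classes are package-private); the helper name and wiring are illustrative.

import org.apache.iceberg.DataFile;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.catalog.TableIdentifier;

class DataFileRoundTripSketch {
  // Writer side: wrap the DataFile produced by a closed RecordWriter so it can be
  // shuffled through the pipeline; committer side: rebuild an equivalent DataFile
  // from the table's PartitionSpec. The roundtrip test checks the two are equal.
  static DataFile roundTrip(
      TableIdentifier tableId, PartitionSpec spec, PartitionKey partitionKey, DataFile written) {
    FileWriteResult result =
        FileWriteResult.builder()
            .setTableIdentifier(tableId)
            .setSerializableDataFile(SerializableDataFile.from(written, partitionKey))
            .build();
    return result.getDataFile(spec);
  }
}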
IcebergIO.java
@@ -318,7 +318,7 @@ public WriteRows to(DynamicDestinations destinations) {
* org.apache.iceberg.Snapshot} is produced.
*
* <p>Roughly every triggeringFrequency duration, records are written to data files and appended
- * to the respective table. Each append operation created a new table snapshot.
+ * to the respective table. Each append operation creates a new table snapshot.
*
* <p>Generally speaking, increasing this duration will result in fewer, larger data files and
* fewer snapshots.
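The javadoc fix above belongs to the streaming write path's triggering frequency. Assuming the WriteRows builder exposes a withTriggeringFrequency setter as that javadoc describes (an assumption, not shown in this diff), usage looks roughly like the sketch below; the table name and duration are illustrative.

import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.iceberg.catalog.TableIdentifier;
import org.joda.time.Duration;

class TriggeringFrequencySketch {
  // Roughly every five minutes, buffered records are written to data files and
  // appended to the table; each append creates a new snapshot.
  static void write(PCollection<Row> rows, IcebergCatalogConfig catalogConfig) {
    rows.apply(
        IcebergIO.writeRows(catalogConfig)
            .to(TableIdentifier.parse("db.output_table"))
            .withTriggeringFrequency(Duration.standardMinutes(5))); // assumed setter
  }
}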
RecordWriter.java
@@ -19,7 +19,6 @@

import java.io.IOException;
import org.apache.beam.sdk.metrics.Counter;
- import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
@@ -38,9 +37,8 @@
class RecordWriter {
private static final Logger LOG = LoggerFactory.getLogger(RecordWriter.class);
private final Counter activeIcebergWriters =
- Metrics.counter(RecordWriterManager.class, "activeIcebergWriters");
- private final Distribution dataFileByteSize =
- Metrics.distribution(RecordWriter.class, "dataFileByteSize");
+ Metrics.counter(RecordWriter.class, "activeIcebergWriters");
+ private final Counter dataFilesWritten = Metrics.counter(RecordWriter.class, "dataFilesWritten");
private final DataWriter<Record> icebergDataWriter;
private final Table table;
private final String absoluteFilename;
@@ -128,7 +126,7 @@ public void close() throws IOException {
dataFile.recordCount(),
dataFile.fileSizeInBytes(),
absoluteFilename);
- dataFileByteSize.update(dataFile.fileSizeInBytes());
+ dataFilesWritten.inc();
}

public long bytesWritten() {
RecordWriterManager.java
@@ -21,13 +21,11 @@
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;

import java.io.IOException;
+ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
- import org.apache.beam.sdk.metrics.Counter;
- import org.apache.beam.sdk.metrics.Metrics;
- import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.Row;
@@ -38,17 +36,12 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
import org.apache.iceberg.DataFile;
- import org.apache.iceberg.FileFormat;
- import org.apache.iceberg.ManifestFile;
- import org.apache.iceberg.ManifestFiles;
- import org.apache.iceberg.ManifestWriter;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.data.Record;
- import org.apache.iceberg.io.FileIO;
- import org.apache.iceberg.io.OutputFile;

/**
* A writer that manages multiple {@link RecordWriter}s to write to multiple tables and partitions.
@@ -66,19 +59,13 @@
*
* <ol>
* <li>Close all underlying {@link RecordWriter}s
- * <li>Collect all {@link DataFile}s
- * <li>Create a new {@link ManifestFile} referencing these {@link DataFile}s
+ * <li>Collect all {@link DataFile}s as {@link SerializableDataFile}s (a more Beam-friendly type)
* </ol>
*
- * <p>After closing, the resulting {@link ManifestFile}s can be retrieved using {@link
- * #getManifestFiles()}.
+ * <p>After closing, the resulting {@link SerializableDataFile}s can be retrieved using {@link
+ * #getSerializableDataFiles()}.
*/
class RecordWriterManager implements AutoCloseable {
- private final Counter dataFilesWritten =
- Metrics.counter(RecordWriterManager.class, "dataFilesWritten");
- private final Counter manifestFilesWritten =
- Metrics.counter(RecordWriterManager.class, "manifestFilesWritten");

/**
* Represents the state of one Iceberg table destination. Creates one {@link RecordWriter} per
* partition and manages them in a {@link Cache}.
@@ -90,11 +77,9 @@ class DestinationState {
private final PartitionSpec spec;
private final org.apache.iceberg.Schema schema;
private final PartitionKey partitionKey;
- private final String tableLocation;
- private final FileIO fileIO;
private final Table table;
private final String stateToken = UUID.randomUUID().toString();
- private final List<DataFile> dataFiles = Lists.newArrayList();
+ private final List<SerializableDataFile> dataFiles = Lists.newArrayList();
@VisibleForTesting final Cache<PartitionKey, RecordWriter> writers;
@VisibleForTesting final Map<PartitionKey, Integer> writerCounts = Maps.newHashMap();

@@ -103,8 +88,6 @@ class DestinationState {
this.schema = table.schema();
this.spec = table.spec();
this.partitionKey = new PartitionKey(spec, schema);
- this.tableLocation = table.location();
- this.fileIO = table.io();
this.table = table;

// build a cache of RecordWriters.
@@ -128,8 +111,7 @@ class DestinationState {
e);
}
openWriters--;
- dataFiles.add(recordWriter.getDataFile());
- dataFilesWritten.inc();
+ dataFiles.add(SerializableDataFile.from(recordWriter.getDataFile(), pk));
})
.build();
}
@@ -191,13 +173,6 @@ private RecordWriter createWriter(PartitionKey partitionKey) {
e);
}
}

- private String getManifestFileLocation(PaneInfo paneInfo) {
- return FileFormat.AVRO.addExtension(
- String.format(
- "%s/metadata/%s-%s-%s.manifest",
- tableLocation, filePrefix, stateToken, paneInfo.getIndex()));
- }
}

private final Catalog catalog;
@@ -209,8 +184,8 @@ private String getManifestFileLocation(PaneInfo paneInfo) {
@VisibleForTesting
final Map<WindowedValue<IcebergDestination>, DestinationState> destinations = Maps.newHashMap();

- private final Map<WindowedValue<IcebergDestination>, List<ManifestFile>> totalManifestFiles =
- Maps.newHashMap();
+ private final Map<WindowedValue<IcebergDestination>, List<SerializableDataFile>>
+ totalSerializableDataFiles = Maps.newHashMap();

private boolean isClosed = false;

@@ -249,7 +224,6 @@ public boolean write(WindowedValue<IcebergDestination> icebergDestination, Row r
public void close() throws IOException {
for (Map.Entry<WindowedValue<IcebergDestination>, DestinationState>
windowedDestinationAndState : destinations.entrySet()) {
- WindowedValue<IcebergDestination> windowedDestination = windowedDestinationAndState.getKey();
DestinationState state = windowedDestinationAndState.getValue();

// removing writers from the state's cache will trigger the logic to collect each writer's
@@ -259,21 +233,8 @@ public void close() throws IOException {
continue;
}

- OutputFile outputFile =
- state.fileIO.newOutputFile(state.getManifestFileLocation(windowedDestination.getPane()));
-
- ManifestWriter<DataFile> manifestWriter;
- try (ManifestWriter<DataFile> openWriter = ManifestFiles.write(state.spec, outputFile)) {
- openWriter.addAll(state.dataFiles);
- manifestWriter = openWriter;
- }
- ManifestFile manifestFile = manifestWriter.toManifestFile();
- manifestFilesWritten.inc();
-
- totalManifestFiles
- .computeIfAbsent(windowedDestination, dest -> Lists.newArrayList())
- .add(manifestFile);
-
+ totalSerializableDataFiles.put(
+ windowedDestinationAndState.getKey(), new ArrayList<>(state.dataFiles));
state.dataFiles.clear();
}
destinations.clear();
@@ -285,15 +246,16 @@ public void close() throws IOException {
}

/**
- * Returns a list of accumulated windowed {@link ManifestFile}s for each windowed {@link
+ * Returns a list of accumulated serializable {@link DataFile}s for each windowed {@link
* IcebergDestination}. The {@link RecordWriterManager} must first be closed before this is
* called.
*/
- public Map<WindowedValue<IcebergDestination>, List<ManifestFile>> getManifestFiles() {
+ public Map<WindowedValue<IcebergDestination>, List<SerializableDataFile>>
+ getSerializableDataFiles() {
checkState(
isClosed,
"Please close this %s before retrieving its manifest files.",
"Please close this %s before retrieving its data files.",
getClass().getSimpleName());
- return totalManifestFiles;
+ return totalSerializableDataFiles;
}
}
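On the writer side, the contract spelled out in the javadoc above is: close the manager first, then collect the per-destination data files. A hypothetical helper showing that order (same package assumed, since these classes are package-private):

import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.util.WindowedValue;

class WriterLifecycleSketch {
  // Closing the manager closes every cached RecordWriter and collects its DataFile
  // as a SerializableDataFile; only then may getSerializableDataFiles() be called,
  // otherwise its checkState precondition fails.
  static Map<WindowedValue<IcebergDestination>, List<SerializableDataFile>> finishBundle(
      RecordWriterManager manager) throws IOException {
    manager.close();
    return manager.getSerializableDataFiles();
  }
}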
