From 8eef4a3c3a3a676761e4f6748fcc933e70f6ffc0 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud <65791736+ahmedabu98@users.noreply.github.com> Date: Fri, 4 Oct 2024 20:33:06 +0300 Subject: [PATCH] [Managed Iceberg] add GiB autosharding (#32612) (#32663) * [Managed Iceberg] add GiB autosharding * trigger iceberg integration tests * fix test * add to CHANGES.md * increase GiB limits * increase GiB limits * data file size distribution metric; max file size 512mb --- CHANGES.md | 1 + .../sdk/io/iceberg/AppendFilesToTables.java | 2 +- .../sdk/io/iceberg/AssignDestinations.java | 29 ++-- .../apache/beam/sdk/io/iceberg/IcebergIO.java | 53 ++---- .../beam/sdk/io/iceberg/RecordWriter.java | 15 +- .../io/iceberg/WriteGroupedRowsToFiles.java | 4 +- .../sdk/io/iceberg/WriteToDestinations.java | 159 +++++++----------- .../io/iceberg/WriteUngroupedRowsToFiles.java | 62 ++++--- .../sdk/io/iceberg/IcebergIOWriteTest.java | 2 +- 9 files changed, 149 insertions(+), 178 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 9980643e5415..93c0e19dcba6 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -59,6 +59,7 @@ * Added support for using vLLM in the RunInference transform (Python) ([#32528](https://github.com/apache/beam/issues/32528)) * [Managed Iceberg] Added support for streaming writes ([#32451](https://github.com/apache/beam/pull/32451)) +* [Managed Iceberg] Added auto-sharding for streaming writes ([#32612](https://github.com/apache/beam/pull/32612)) * [Managed Iceberg] Added support for writing to dynamic destinations ([#32565](https://github.com/apache/beam/pull/32565)) ## New Features / Improvements diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java index b26ae83f0866..b91253cf3c12 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java @@ -105,7 +105,7 @@ public void processElement( } update.commit(); Snapshot snapshot = table.currentSnapshot(); - LOG.info("Created new snapshot for table '{}': {}.", element.getKey(), snapshot); + LOG.info("Created new snapshot for table '{}': {}", element.getKey(), snapshot); snapshotsCreated.inc(); out.outputWithTimestamp( KV.of(element.getKey(), SnapshotInfo.fromSnapshot(snapshot)), window.maxTimestamp()); diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AssignDestinations.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AssignDestinations.java index 37b7dbf107e6..9aba3d830234 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AssignDestinations.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AssignDestinations.java @@ -17,15 +17,15 @@ */ package org.apache.beam.sdk.io.iceberg; -import static org.apache.beam.sdk.io.iceberg.WriteToDestinations.DATA; -import static org.apache.beam.sdk.io.iceberg.WriteToDestinations.DEST; - -import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.RowCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import 
org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.ValueInSingleWindow; @@ -37,7 +37,7 @@ *

The output record will have the format { dest: ..., data: ...} where the dest field has the * assigned metadata and the data field has the original row. */ -class AssignDestinations extends PTransform, PCollection> { +class AssignDestinations extends PTransform, PCollection>> { private final DynamicDestinations dynamicDestinations; @@ -46,34 +46,27 @@ public AssignDestinations(DynamicDestinations dynamicDestinations) { } @Override - public PCollection expand(PCollection input) { - - final Schema outputSchema = - Schema.builder() - .addStringField(DEST) - .addRowField(DATA, dynamicDestinations.getDataSchema()) - .build(); - + public PCollection> expand(PCollection input) { return input .apply( ParDo.of( - new DoFn() { + new DoFn>() { @ProcessElement public void processElement( @Element Row element, BoundedWindow window, PaneInfo paneInfo, @Timestamp Instant timestamp, - OutputReceiver out) { + OutputReceiver> out) { String tableIdentifier = dynamicDestinations.getTableStringIdentifier( ValueInSingleWindow.of(element, timestamp, window, paneInfo)); Row data = dynamicDestinations.getData(element); - out.output( - Row.withSchema(outputSchema).addValues(tableIdentifier, data).build()); + out.output(KV.of(tableIdentifier, data)); } })) - .setRowSchema(outputSchema); + .setCoder( + KvCoder.of(StringUtf8Coder.of(), RowCoder.of(dynamicDestinations.getDataSchema()))); } } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java index 6d418ff5cffb..6321f9006e2a 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java @@ -17,7 +17,6 @@ */ package org.apache.beam.sdk.io.iceberg; -import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; import com.google.auto.value.AutoValue; @@ -28,12 +27,6 @@ import org.apache.beam.sdk.managed.Managed; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.windowing.AfterFirst; -import org.apache.beam.sdk.transforms.windowing.AfterPane; -import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; -import org.apache.beam.sdk.transforms.windowing.GlobalWindows; -import org.apache.beam.sdk.transforms.windowing.Repeatedly; -import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; @@ -288,7 +281,6 @@ public static WriteRows writeRows(IcebergCatalogConfig catalog) { @AutoValue public abstract static class WriteRows extends PTransform, IcebergWriteResult> { - private static final int TRIGGERING_RECORD_COUNT = 50_000; abstract IcebergCatalogConfig getCatalogConfig(); @@ -322,12 +314,14 @@ public WriteRows to(DynamicDestinations destinations) { } /** - * Sets the frequency at which data is committed and a new {@link org.apache.iceberg.Snapshot} - * is produced. + * Sets the frequency at which data is written to files and a new {@link + * org.apache.iceberg.Snapshot} is produced. * - *

Roughly every triggeringFrequency duration, this connector will try to accumulate all - * {@link org.apache.iceberg.ManifestFile}s and commit them to the table as appended files. Each - * commit results in a new table {@link org.apache.iceberg.Snapshot}. + *

Roughly every triggeringFrequency duration, records are written to data files and appended + to the respective table. Each append operation creates a new table snapshot. + * +

Generally speaking, increasing this duration will result in fewer, larger data files and + * fewer snapshots. * *

This is only applicable when writing an unbounded {@link PCollection} (i.e. a streaming * pipeline). @@ -350,34 +344,13 @@ public IcebergWriteResult expand(PCollection input) { Preconditions.checkNotNull(getTableIdentifier()), input.getSchema()); } - // Assign destinations before re-windowing to global because + // Assign destinations before re-windowing to global in WriteToDestinations because // user's dynamic destination may depend on windowing properties - PCollection assignedRows = - input.apply("Set Destination Metadata", new AssignDestinations(destinations)); - - if (assignedRows.isBounded().equals(PCollection.IsBounded.UNBOUNDED)) { - Duration triggeringFrequency = getTriggeringFrequency(); - checkArgumentNotNull( - triggeringFrequency, "Streaming pipelines must set a triggering frequency."); - assignedRows = - assignedRows.apply( - "WindowIntoGlobal", - Window.into(new GlobalWindows()) - .triggering( - Repeatedly.forever( - AfterFirst.of( - AfterProcessingTime.pastFirstElementInPane() - .plusDelayOf(triggeringFrequency), - AfterPane.elementCountAtLeast(TRIGGERING_RECORD_COUNT)))) - .discardingFiredPanes()); - } else { - Preconditions.checkArgument( - getTriggeringFrequency() == null, - "Triggering frequency is only applicable for streaming pipelines."); - } - return assignedRows.apply( - "Write Rows to Destinations", - new WriteToDestinations(getCatalogConfig(), destinations, getTriggeringFrequency())); + return input + .apply("Assign Table Destinations", new AssignDestinations(destinations)) + .apply( + "Write Rows to Destinations", + new WriteToDestinations(getCatalogConfig(), destinations, getTriggeringFrequency())); } } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java index 1434400563bb..92b5dd58b51e 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java @@ -19,6 +19,7 @@ import java.io.IOException; import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Distribution; import org.apache.beam.sdk.metrics.Metrics; import org.apache.iceberg.DataFile; import org.apache.iceberg.FileFormat; @@ -38,6 +39,8 @@ class RecordWriter { private static final Logger LOG = LoggerFactory.getLogger(RecordWriter.class); private final Counter activeIcebergWriters = Metrics.counter(RecordWriterManager.class, "activeIcebergWriters"); + private final Distribution dataFileByteSize = + Metrics.distribution(RecordWriter.class, "dataFileByteSize"); private final DataWriter icebergDataWriter; private final Table table; private final String absoluteFilename; @@ -95,7 +98,7 @@ class RecordWriter { } activeIcebergWriters.inc(); LOG.info( - "Opened {} writer for table {}, partition {}. Writing to path: {}", + "Opened {} writer for table '{}', partition {}. 
Writing to path: {}", fileFormat, table.name(), partitionKey, @@ -117,7 +120,15 @@ public void close() throws IOException { e); } activeIcebergWriters.dec(); - LOG.info("Closed {} writer for table {}, path: {}", fileFormat, table.name(), absoluteFilename); + DataFile dataFile = icebergDataWriter.toDataFile(); + LOG.info( + "Closed {} writer for table '{}' ({} records, {} bytes), path: {}", + fileFormat, + table.name(), + dataFile.recordCount(), + dataFile.fileSizeInBytes(), + absoluteFilename); + dataFileByteSize.update(dataFile.fileSizeInBytes()); } public long bytesWritten() { diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java index 0bc18ffcf421..1926a769a6da 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java @@ -24,11 +24,11 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.ShardedKey; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; -import org.apache.beam.sdk.values.ShardedKey; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; import org.apache.iceberg.ManifestFile; import org.apache.iceberg.catalog.Catalog; @@ -38,7 +38,7 @@ class WriteGroupedRowsToFiles extends PTransform< PCollection, Iterable>>, PCollection> { - static final long DEFAULT_MAX_BYTES_PER_FILE = (1L << 40); // 1TB + private static final long DEFAULT_MAX_BYTES_PER_FILE = (1L << 29); // 512mb private final DynamicDestinations dynamicDestinations; private final IcebergCatalogConfig catalogConfig; diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java index 60d23f2dd394..4d03f3a3bc58 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java @@ -19,39 +19,34 @@ import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; -import java.util.concurrent.ThreadLocalRandom; +import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.RowCoder; -import org.apache.beam.sdk.coders.ShardedKeyCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.GroupIntoBatches; import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.SimpleFunction; -import org.apache.beam.sdk.transforms.windowing.AfterPane; import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import org.apache.beam.sdk.transforms.windowing.Repeatedly; import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.util.ShardedKey; import 
org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionList; import org.apache.beam.sdk.values.Row; -import org.apache.beam.sdk.values.ShardedKey; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; -class WriteToDestinations extends PTransform, IcebergWriteResult> { +class WriteToDestinations extends PTransform>, IcebergWriteResult> { - static final long DEFAULT_MAX_BYTES_PER_FILE = (1L << 40); // 1TB + // Used for auto-sharding in streaming. Limits number of records per batch/file + private static final int FILE_TRIGGERING_RECORD_COUNT = 500_000; + // Used for auto-sharding in streaming. Limits total byte size per batch/file + public static final int FILE_TRIGGERING_BYTE_COUNT = 1 << 30; // 1GiB static final int DEFAULT_NUM_FILE_SHARDS = 0; - // constant field names representing table identifier string and the record - static final String DEST = "dest"; - static final String DATA = "data"; - private final IcebergCatalogConfig catalogConfig; private final DynamicDestinations dynamicDestinations; private final @Nullable Duration triggeringFrequency; @@ -66,7 +61,58 @@ class WriteToDestinations extends PTransform, IcebergWriteResul } @Override - public IcebergWriteResult expand(PCollection input) { + public IcebergWriteResult expand(PCollection> input) { + // Write records to files + PCollection writtenFiles = + input.isBounded().equals(PCollection.IsBounded.UNBOUNDED) + ? writeTriggered(input) + : writeUntriggered(input); + + // Commit files to tables + PCollection> snapshots = + writtenFiles.apply(new AppendFilesToTables(catalogConfig)); + + return new IcebergWriteResult(input.getPipeline(), snapshots); + } + + private PCollection writeTriggered(PCollection> input) { + checkArgumentNotNull( + triggeringFrequency, "Streaming pipelines must set a triggering frequency."); + + // Group records into batches to avoid writing thousands of small files + PCollection, Iterable>> groupedRecords = + input + .apply("WindowIntoGlobal", Window.into(new GlobalWindows())) + // We rely on GroupIntoBatches to group and parallelize records properly, + // respecting our thresholds for number of records and bytes per batch. + // Each output batch will be written to a file. + .apply( + GroupIntoBatches.ofSize(FILE_TRIGGERING_RECORD_COUNT) + .withByteSize(FILE_TRIGGERING_BYTE_COUNT) + .withMaxBufferingDuration(checkArgumentNotNull(triggeringFrequency)) + .withShardedKey()) + .setCoder( + KvCoder.of( + org.apache.beam.sdk.util.ShardedKey.Coder.of(StringUtf8Coder.of()), + IterableCoder.of(RowCoder.of(dynamicDestinations.getDataSchema())))); + + return groupedRecords + .apply("WriteGroupedRows", new WriteGroupedRowsToFiles(catalogConfig, dynamicDestinations)) + // Respect user's triggering frequency before committing snapshots + .apply( + "ApplyUserTrigger", + Window.into(new GlobalWindows()) + .triggering( + Repeatedly.forever( + AfterProcessingTime.pastFirstElementInPane() + .plusDelayOf(checkArgumentNotNull(triggeringFrequency)))) + .discardingFiredPanes()); + } + + private PCollection writeUntriggered(PCollection> input) { + Preconditions.checkArgument( + triggeringFrequency == null, + "Triggering frequency is only applicable for streaming pipelines."); // First, attempt to write directly to files without shuffling. 
If there are // too many distinct destinations in a single bundle, the remaining @@ -76,94 +122,17 @@ public IcebergWriteResult expand(PCollection input) { "Fast-path write rows", new WriteUngroupedRowsToFiles(catalogConfig, dynamicDestinations)); - // Then write the rest by shuffling on the destination metadata - Preconditions.checkState( - writeUngroupedResult.getSpilledRows().getSchema().hasField(DEST), - "Input schema missing `%s` field.", - DEST); - Schema dataSchema = - checkArgumentNotNull( - writeUngroupedResult - .getSpilledRows() - .getSchema() - .getField(DATA) - .getType() - .getRowSchema(), - "Input schema missing `%s` field", - DATA); - + // Then write the rest by shuffling on the destination PCollection writeGroupedResult = writeUngroupedResult .getSpilledRows() - .apply( - "Key by destination and shard", - MapElements.via( - new SimpleFunction, Row>>() { - private static final int SPILLED_ROWS_SHARDING_FACTOR = 10; - private int shardNumber = - ThreadLocalRandom.current().nextInt(SPILLED_ROWS_SHARDING_FACTOR); - - @Override - public KV, Row> apply(Row elem) { - Row data = - checkArgumentNotNull( - elem.getRow(DATA), "Element missing `%s` field", DATA); - String dest = - checkArgumentNotNull( - elem.getString(DEST), "Element missing `%s` field", DEST); - return KV.of( - ShardedKey.of(dest, ++shardNumber % SPILLED_ROWS_SHARDING_FACTOR), - data); - } - })) - .setCoder(KvCoder.of(ShardedKeyCoder.of(StringUtf8Coder.of()), RowCoder.of(dataSchema))) .apply("Group spilled rows by destination shard", GroupByKey.create()) .apply( "Write remaining rows to files", new WriteGroupedRowsToFiles(catalogConfig, dynamicDestinations)); - PCollection writeUngroupedResultPColl = writeUngroupedResult.getWrittenFiles(); - - if (input.isBounded().equals(PCollection.IsBounded.UNBOUNDED)) { - // for streaming pipelines, re-window both outputs to keep Flatten happy - writeGroupedResult = - writeGroupedResult.apply( - "RewindowGroupedRecords", - Window.into(new GlobalWindows()) - .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))) - .discardingFiredPanes()); - writeUngroupedResultPColl = - writeUngroupedResultPColl.apply( - "RewindowUnGroupedRecords", - Window.into(new GlobalWindows()) - .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))) - .discardingFiredPanes()); - } - - PCollection allWrittenFiles = - PCollectionList.of(writeUngroupedResultPColl) - .and(writeGroupedResult) - .apply("Flatten Written Files", Flatten.pCollections()); - - if (input.isBounded().equals(PCollection.IsBounded.UNBOUNDED)) { - checkArgumentNotNull( - triggeringFrequency, "Streaming pipelines must set a triggering frequency."); - // apply the user's trigger before we start committing and creating snapshots - allWrittenFiles = - allWrittenFiles.apply( - "ApplyUserTrigger", - Window.into(new GlobalWindows()) - .triggering( - Repeatedly.forever( - AfterProcessingTime.pastFirstElementInPane() - .plusDelayOf(checkArgumentNotNull(triggeringFrequency)))) - .discardingFiredPanes()); - } - - // Apply any sharded writes and flatten everything for catalog updates - PCollection> snapshots = - allWrittenFiles.apply(new AppendFilesToTables(catalogConfig)); - - return new IcebergWriteResult(input.getPipeline(), snapshots); + return PCollectionList.of(writeUngroupedResult.getWrittenFiles()) + .and(writeGroupedResult) + .apply("Flatten Written Files", Flatten.pCollections()); } } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteUngroupedRowsToFiles.java 
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteUngroupedRowsToFiles.java index 1982c7fcbad0..3b2308fca89a 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteUngroupedRowsToFiles.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteUngroupedRowsToFiles.java @@ -17,20 +17,23 @@ */ package org.apache.beam.sdk.io.iceberg; -import static org.apache.beam.sdk.io.iceberg.WriteToDestinations.DATA; -import static org.apache.beam.sdk.io.iceberg.WriteToDestinations.DEST; -import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; - +import java.nio.ByteBuffer; import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.ThreadLocalRandom; import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.RowCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.ShardedKey; import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.PInput; @@ -55,7 +58,7 @@ * written via another method. */ class WriteUngroupedRowsToFiles - extends PTransform, WriteUngroupedRowsToFiles.Result> { + extends PTransform>, WriteUngroupedRowsToFiles.Result> { /** * Maximum number of writers that will be created per bundle. Any elements requiring more writers @@ -67,7 +70,8 @@ class WriteUngroupedRowsToFiles private static final TupleTag WRITTEN_FILES_TAG = new TupleTag<>("writtenFiles"); private static final TupleTag WRITTEN_ROWS_TAG = new TupleTag("writtenRows") {}; - private static final TupleTag SPILLED_ROWS_TAG = new TupleTag("spilledRows") {}; + private static final TupleTag, Row>> SPILLED_ROWS_TAG = + new TupleTag, Row>>("spilledRows") {}; private final String filePrefix; private final DynamicDestinations dynamicDestinations; @@ -81,7 +85,7 @@ class WriteUngroupedRowsToFiles } @Override - public Result expand(PCollection input) { + public Result expand(PCollection> input) { PCollectionTuple resultTuple = input.apply( @@ -99,8 +103,15 @@ public Result expand(PCollection input) { return new Result( input.getPipeline(), resultTuple.get(WRITTEN_FILES_TAG), - resultTuple.get(WRITTEN_ROWS_TAG).setCoder(input.getCoder()), - resultTuple.get(SPILLED_ROWS_TAG).setCoder(input.getCoder())); + resultTuple + .get(WRITTEN_ROWS_TAG) + .setCoder(RowCoder.of(dynamicDestinations.getDataSchema())), + resultTuple + .get(SPILLED_ROWS_TAG) + .setCoder( + KvCoder.of( + ShardedKey.Coder.of(StringUtf8Coder.of()), + RowCoder.of(dynamicDestinations.getDataSchema())))); } /** @@ -111,14 +122,14 @@ static class Result implements POutput { private final Pipeline pipeline; private final PCollection writtenRows; - private final PCollection spilledRows; + private final PCollection, Row>> spilledRows; private final PCollection writtenFiles; private Result( Pipeline pipeline, PCollection writtenFiles, PCollection writtenRows, - PCollection spilledRows) { + PCollection, Row>> spilledRows) { this.pipeline = pipeline; this.writtenFiles = writtenFiles; this.writtenRows = writtenRows; @@ -129,7 +140,7 @@ public PCollection 
getWrittenRows() { return writtenRows; } - public PCollection getSpilledRows() { + public PCollection, Row>> getSpilledRows() { return spilledRows; } @@ -170,8 +181,11 @@ public void finishSpecifyingOutput( *

  • the spilled records which were not written * */ - private static class WriteUngroupedRowsToFilesDoFn extends DoFn { + private static class WriteUngroupedRowsToFilesDoFn + extends DoFn, FileWriteResult> { + // When we spill records, shard the output keys to prevent hotspots. + private static final int SPILLED_RECORD_SHARDING_FACTOR = 10; private final String filename; private final int maxWritersPerBundle; private final long maxFileSize; @@ -179,6 +193,7 @@ private static class WriteUngroupedRowsToFilesDoFn extends DoFn element, + BoundedWindow window, + PaneInfo pane, + MultiOutputReceiver out) throws Exception { - String dest = - checkArgumentNotNull(element.getString(DEST), "Input row missing `%s` field.", DEST); - Row data = - checkArgumentNotNull(element.getRow(DATA), "Input row missing `data` field.", DATA); + String dest = element.getKey(); + Row data = element.getValue(); IcebergDestination destination = dynamicDestinations.instantiateDestination(dest); WindowedValue windowedDestination = WindowedValue.of(destination, window.maxTimestamp(), window, pane); @@ -232,7 +249,14 @@ public void processElement( } throw e; } - out.get(writeSuccess ? WRITTEN_ROWS_TAG : SPILLED_ROWS_TAG).output(element); + + if (writeSuccess) { + out.get(WRITTEN_ROWS_TAG).output(data); + } else { + ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES); + buffer.putInt(++spilledShardNumber % SPILLED_RECORD_SHARDING_FACTOR); + out.get(SPILLED_ROWS_TAG).output(KV.of(ShardedKey.of(dest, buffer.array()), data)); + } } @FinishBundle diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java index 2f81db671dd7..e62c22be7968 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java @@ -362,7 +362,7 @@ public void testStreamingWrite() { .getSnapshots(); // verify that 2 snapshots are created (one per triggering interval) PCollection snapshots = output.apply(Count.globally()); - PAssert.that(snapshots).containsInAnyOrder(1L, 1L); + PAssert.that(snapshots).containsInAnyOrder(2L); testPipeline.run().waitUntilFinish(); List writtenRecords = ImmutableList.copyOf(IcebergGenerics.read(table).build());
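
A minimal usage sketch (not part of this patch) of the streaming write path touched by this change, assuming the withTriggeringFrequency(Duration) setter implied by the WriteRows javadoc above and the IcebergCatalogConfig builder setters; the catalog properties, warehouse path, table name, and input PCollection are placeholders.

    // Hypothetical sketch: stream rows into an Iceberg table with a 5-minute
    // triggering frequency. The GroupIntoBatches-based auto-sharding added in
    // this patch (record-count and byte-size limits) controls how many data
    // files each flush produces.
    IcebergCatalogConfig catalogConfig =
        IcebergCatalogConfig.builder()
            .setCatalogName("my_catalog")                          // placeholder
            .setCatalogProperties(
                ImmutableMap.of(
                    "type", "hadoop",
                    "warehouse", "gs://my-bucket/warehouse"))      // placeholder
            .build();

    PCollection<Row> rows = ...; // unbounded input with a known Beam schema

    IcebergWriteResult result =
        rows.apply(
            IcebergIO.writeRows(catalogConfig)
                .to(TableIdentifier.parse("db.my_table"))
                // Roughly every 5 minutes, buffered records are written to
                // data files and committed to the table as a new snapshot.
                .withTriggeringFrequency(Duration.standardMinutes(5)));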