diff --git a/CHANGES.md b/CHANGES.md
index 9980643e5415..93c0e19dcba6 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -59,6 +59,7 @@
* Added support for using vLLM in the RunInference transform (Python) ([#32528](https://github.com/apache/beam/issues/32528))
* [Managed Iceberg] Added support for streaming writes ([#32451](https://github.com/apache/beam/pull/32451))
+* [Managed Iceberg] Added auto-sharding for streaming writes ([#32612](https://github.com/apache/beam/pull/32612))
* [Managed Iceberg] Added support for writing to dynamic destinations ([#32565](https://github.com/apache/beam/pull/32565))
## New Features / Improvements
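For context on the auto-sharding entry above: it applies to the Managed Iceberg streaming write path. Below is a minimal sketch (not taken from this diff) of how such a write might be configured; the `"input"` tag, the table/catalog values, and the `triggering_frequency_seconds` key are assumptions based on the existing Managed Iceberg options.

```java
// Hypothetical example of a Managed Iceberg streaming write; values are placeholders.
import java.util.Map;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;

class ManagedIcebergStreamingWrite {
  static void write(PCollection<Row> rows) {
    Map<String, Object> config =
        Map.of(
            "table", "db.events", // placeholder destination table
            "catalog_name", "local", // placeholder catalog name
            "catalog_properties",
                Map.of("type", "hadoop", "warehouse", "file:///tmp/warehouse"),
            "triggering_frequency_seconds", 30); // commit roughly every 30 seconds
    // Assumes the Managed transform reads its single input from the "input" tag.
    PCollectionRowTuple.of("input", rows)
        .apply(Managed.write(Managed.ICEBERG).withConfig(config));
  }
}
```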
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java
index b26ae83f0866..b91253cf3c12 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java
@@ -105,7 +105,7 @@ public void processElement(
}
update.commit();
Snapshot snapshot = table.currentSnapshot();
- LOG.info("Created new snapshot for table '{}': {}.", element.getKey(), snapshot);
+ LOG.info("Created new snapshot for table '{}': {}", element.getKey(), snapshot);
snapshotsCreated.inc();
out.outputWithTimestamp(
KV.of(element.getKey(), SnapshotInfo.fromSnapshot(snapshot)), window.maxTimestamp());
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AssignDestinations.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AssignDestinations.java
index 37b7dbf107e6..9aba3d830234 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AssignDestinations.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AssignDestinations.java
@@ -17,15 +17,15 @@
*/
package org.apache.beam.sdk.io.iceberg;
-import static org.apache.beam.sdk.io.iceberg.WriteToDestinations.DATA;
-import static org.apache.beam.sdk.io.iceberg.WriteToDestinations.DEST;
-
-import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.ValueInSingleWindow;
@@ -37,7 +37,7 @@
*
 * <p>The output record will have the format { dest: ..., data: ...} where the dest field has the
* assigned metadata and the data field has the original row.
*/
-class AssignDestinations extends PTransform<PCollection<Row>, PCollection<Row>> {
+class AssignDestinations extends PTransform<PCollection<Row>, PCollection<KV<String, Row>>> {
private final DynamicDestinations dynamicDestinations;
@@ -46,34 +46,27 @@ public AssignDestinations(DynamicDestinations dynamicDestinations) {
}
@Override
- public PCollection<Row> expand(PCollection<Row> input) {
-
- final Schema outputSchema =
- Schema.builder()
- .addStringField(DEST)
- .addRowField(DATA, dynamicDestinations.getDataSchema())
- .build();
-
+ public PCollection<KV<String, Row>> expand(PCollection<Row> input) {
return input
.apply(
ParDo.of(
- new DoFn<Row, Row>() {
+ new DoFn<Row, KV<String, Row>>() {
@ProcessElement
public void processElement(
@Element Row element,
BoundedWindow window,
PaneInfo paneInfo,
@Timestamp Instant timestamp,
- OutputReceiver<Row> out) {
+ OutputReceiver<KV<String, Row>> out) {
String tableIdentifier =
dynamicDestinations.getTableStringIdentifier(
ValueInSingleWindow.of(element, timestamp, window, paneInfo));
Row data = dynamicDestinations.getData(element);
- out.output(
- Row.withSchema(outputSchema).addValues(tableIdentifier, data).build());
+ out.output(KV.of(tableIdentifier, data));
}
}))
- .setRowSchema(outputSchema);
+ .setCoder(
+ KvCoder.of(StringUtf8Coder.of(), RowCoder.of(dynamicDestinations.getDataSchema())));
}
}
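The change above replaces the `{dest, data}` wrapper Row with a `KV<String, Row>` keyed by the table identifier, which is what the downstream grouping and auto-sharding operate on. A standalone sketch of the same keying pattern follows, with a hypothetical routing rule (a `country` field) standing in for `DynamicDestinations`; the explicit coder is needed because the Row schema is only known at pipeline construction time.

```java
// Standalone sketch of keying rows by destination; the routing rule is hypothetical.
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

class KeyByDestinationSketch {
  static PCollection<KV<String, Row>> keyByDestination(PCollection<Row> rows, Schema dataSchema) {
    return rows
        .apply(
            ParDo.of(
                new DoFn<Row, KV<String, Row>>() {
                  @ProcessElement
                  public void processElement(
                      @Element Row row, OutputReceiver<KV<String, Row>> out) {
                    // Hypothetical routing rule: one Iceberg table per "country" value.
                    String dest = "db.events_" + row.getString("country");
                    out.output(KV.of(dest, row));
                  }
                }))
        // The element coder must be set explicitly because the Row schema is
        // supplied at construction time and cannot be inferred.
        .setCoder(KvCoder.of(StringUtf8Coder.of(), RowCoder.of(dataSchema)));
  }
}
```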
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java
index 6d418ff5cffb..6321f9006e2a 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java
@@ -17,7 +17,6 @@
*/
package org.apache.beam.sdk.io.iceberg;
-import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
import com.google.auto.value.AutoValue;
@@ -28,12 +27,6 @@
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.windowing.AfterFirst;
-import org.apache.beam.sdk.transforms.windowing.AfterPane;
-import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
-import org.apache.beam.sdk.transforms.windowing.Repeatedly;
-import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
@@ -288,7 +281,6 @@ public static WriteRows writeRows(IcebergCatalogConfig catalog) {
@AutoValue
public abstract static class WriteRows extends PTransform<PCollection<Row>, IcebergWriteResult> {
- private static final int TRIGGERING_RECORD_COUNT = 50_000;
abstract IcebergCatalogConfig getCatalogConfig();
@@ -322,12 +314,14 @@ public WriteRows to(DynamicDestinations destinations) {
}
/**
- * Sets the frequency at which data is committed and a new {@link org.apache.iceberg.Snapshot}
- * is produced.
+ * Sets the frequency at which data is written to files and a new {@link
+ * org.apache.iceberg.Snapshot} is produced.
*
- * Roughly every triggeringFrequency duration, this connector will try to accumulate all
- * {@link org.apache.iceberg.ManifestFile}s and commit them to the table as appended files. Each
- * commit results in a new table {@link org.apache.iceberg.Snapshot}.
+ *
+ * <p>Roughly every triggeringFrequency duration, records are written to data files and appended
+ * to the respective table. Each append operation creates a new table snapshot.
+ *
+ * <p>Generally speaking, increasing this duration will result in fewer, larger data files and
+ * fewer snapshots.
*
* <p>This is only applicable when writing an unbounded {@link PCollection} (i.e. a streaming
* pipeline).
@@ -350,34 +344,13 @@ public IcebergWriteResult expand(PCollection<Row> input) {
Preconditions.checkNotNull(getTableIdentifier()), input.getSchema());
}
- // Assign destinations before re-windowing to global because
+ // Assign destinations before re-windowing to global in WriteToDestinations because
// user's dynamic destination may depend on windowing properties
- PCollection<Row> assignedRows =
- input.apply("Set Destination Metadata", new AssignDestinations(destinations));
-
- if (assignedRows.isBounded().equals(PCollection.IsBounded.UNBOUNDED)) {
- Duration triggeringFrequency = getTriggeringFrequency();
- checkArgumentNotNull(
- triggeringFrequency, "Streaming pipelines must set a triggering frequency.");
- assignedRows =
- assignedRows.apply(
- "WindowIntoGlobal",
- Window.into(new GlobalWindows())
- .triggering(
- Repeatedly.forever(
- AfterFirst.of(
- AfterProcessingTime.pastFirstElementInPane()
- .plusDelayOf(triggeringFrequency),
- AfterPane.elementCountAtLeast(TRIGGERING_RECORD_COUNT))))
- .discardingFiredPanes());
- } else {
- Preconditions.checkArgument(
- getTriggeringFrequency() == null,
- "Triggering frequency is only applicable for streaming pipelines.");
- }
- return assignedRows.apply(
- "Write Rows to Destinations",
- new WriteToDestinations(getCatalogConfig(), destinations, getTriggeringFrequency()));
+ return input
+ .apply("Assign Table Destinations", new AssignDestinations(destinations))
+ .apply(
+ "Write Rows to Destinations",
+ new WriteToDestinations(getCatalogConfig(), destinations, getTriggeringFrequency()));
}
}
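With the windowing and triggering logic removed from `IcebergIO.WriteRows`, the triggering frequency now bounds both how long records are buffered before being flushed to data files and how often snapshots are committed. A minimal caller-side sketch, assuming the existing `withTriggeringFrequency` setter and a caller-supplied catalog config; the table name is a placeholder.

```java
// Caller-side sketch of a streaming write via IcebergIO; catalog config is supplied
// by the caller and the table identifier is a placeholder.
import org.apache.beam.sdk.io.iceberg.IcebergCatalogConfig;
import org.apache.beam.sdk.io.iceberg.IcebergIO;
import org.apache.beam.sdk.io.iceberg.IcebergWriteResult;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.iceberg.catalog.TableIdentifier;
import org.joda.time.Duration;

class StreamingIcebergWriteSketch {
  static IcebergWriteResult write(PCollection<Row> rows, IcebergCatalogConfig catalog) {
    return rows.apply(
        IcebergIO.writeRows(catalog)
            .to(TableIdentifier.parse("db.events")) // placeholder table name
            // Required for unbounded inputs; after this change it also bounds how long
            // GroupIntoBatches buffers records before flushing them to data files.
            .withTriggeringFrequency(Duration.standardSeconds(30)));
  }
}
```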
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java
index 1434400563bb..92b5dd58b51e 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java
@@ -19,6 +19,7 @@
import java.io.IOException;
import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
@@ -38,6 +39,8 @@ class RecordWriter {
private static final Logger LOG = LoggerFactory.getLogger(RecordWriter.class);
private final Counter activeIcebergWriters =
Metrics.counter(RecordWriterManager.class, "activeIcebergWriters");
+ private final Distribution dataFileByteSize =
+ Metrics.distribution(RecordWriter.class, "dataFileByteSize");
private final DataWriter<Record> icebergDataWriter;
private final Table table;
private final String absoluteFilename;
@@ -95,7 +98,7 @@ class RecordWriter {
}
activeIcebergWriters.inc();
LOG.info(
- "Opened {} writer for table {}, partition {}. Writing to path: {}",
+ "Opened {} writer for table '{}', partition {}. Writing to path: {}",
fileFormat,
table.name(),
partitionKey,
@@ -117,7 +120,15 @@ public void close() throws IOException {
e);
}
activeIcebergWriters.dec();
- LOG.info("Closed {} writer for table {}, path: {}", fileFormat, table.name(), absoluteFilename);
+ DataFile dataFile = icebergDataWriter.toDataFile();
+ LOG.info(
+ "Closed {} writer for table '{}' ({} records, {} bytes), path: {}",
+ fileFormat,
+ table.name(),
+ dataFile.recordCount(),
+ dataFile.fileSizeInBytes(),
+ absoluteFilename);
+ dataFileByteSize.update(dataFile.fileSizeInBytes());
}
public long bytesWritten() {
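The new `dataFileByteSize` distribution is updated with the size of every closed data file, which makes the effect of the batching thresholds observable. A sketch of querying it from a pipeline result, assuming the metric keeps the `RecordWriter` namespace (the fully qualified class name) and name used above:

```java
// Sketch: read the dataFileByteSize distribution from a pipeline result.
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.DistributionResult;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricsFilter;

class DataFileSizeMetricsSketch {
  static void printDataFileSizes(PipelineResult result) {
    MetricQueryResults metrics =
        result
            .metrics()
            .queryMetrics(
                MetricsFilter.builder()
                    // Assumption: the namespace is RecordWriter's fully qualified class name.
                    .addNameFilter(
                        MetricNameFilter.named(
                            "org.apache.beam.sdk.io.iceberg.RecordWriter", "dataFileByteSize"))
                    .build());
    for (MetricResult<DistributionResult> dist : metrics.getDistributions()) {
      DistributionResult attempted = dist.getAttempted();
      System.out.printf(
          "data files: count=%d, mean bytes=%.0f, max bytes=%d%n",
          attempted.getCount(), attempted.getMean(), attempted.getMax());
    }
  }
}
```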
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java
index 0bc18ffcf421..1926a769a6da 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java
@@ -24,11 +24,11 @@
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.ShardedKey;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
-import org.apache.beam.sdk.values.ShardedKey;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.catalog.Catalog;
@@ -38,7 +38,7 @@ class WriteGroupedRowsToFiles
extends PTransform<
PCollection<KV<ShardedKey<String>, Iterable<Row>>>, PCollection<FileWriteResult>> {
- static final long DEFAULT_MAX_BYTES_PER_FILE = (1L << 40); // 1TB
+ private static final long DEFAULT_MAX_BYTES_PER_FILE = (1L << 29); // 512mb
private final DynamicDestinations dynamicDestinations;
private final IcebergCatalogConfig catalogConfig;
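For reference, a quick check of the shift arithmetic behind the constant change above (a sketch, not part of the diff):

```java
// Verifies the byte-size constants: 1L << 40 is 1 TiB, 1L << 29 is 512 MiB.
class FileSizeLimitsCheck {
  public static void main(String[] args) {
    long oldLimit = 1L << 40; // 1,099,511,627,776 bytes = 1 TiB
    long newLimit = 1L << 29; //       536,870,912 bytes = 512 MiB
    System.out.println(oldLimit == 1024L * 1024 * 1024 * 1024); // true
    System.out.println(newLimit == 512L * 1024 * 1024); // true
  }
}
```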
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java
index 60d23f2dd394..4d03f3a3bc58 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java
@@ -19,39 +19,34 @@
import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
-import java.util.concurrent.ThreadLocalRandom;
+import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.RowCoder;
-import org.apache.beam.sdk.coders.ShardedKeyCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.SimpleFunction;
-import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.ShardedKey;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.Row;
-import org.apache.beam.sdk.values.ShardedKey;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
-class WriteToDestinations extends PTransform<PCollection<Row>, IcebergWriteResult> {
+class WriteToDestinations extends PTransform<PCollection<KV<String, Row>>, IcebergWriteResult> {
- static final long DEFAULT_MAX_BYTES_PER_FILE = (1L << 40); // 1TB
+ // Used for auto-sharding in streaming. Limits number of records per batch/file
+ private static final int FILE_TRIGGERING_RECORD_COUNT = 500_000;
+ // Used for auto-sharding in streaming. Limits total byte size per batch/file
+ public static final int FILE_TRIGGERING_BYTE_COUNT = 1 << 30; // 1GiB
static final int DEFAULT_NUM_FILE_SHARDS = 0;
- // constant field names representing table identifier string and the record
- static final String DEST = "dest";
- static final String DATA = "data";
-
private final IcebergCatalogConfig catalogConfig;
private final DynamicDestinations dynamicDestinations;
private final @Nullable Duration triggeringFrequency;
@@ -66,7 +61,58 @@ class WriteToDestinations extends PTransform<PCollection<Row>, IcebergWriteResul
}
@Override
- public IcebergWriteResult expand(PCollection<Row> input) {
+ public IcebergWriteResult expand(PCollection<KV<String, Row>> input) {
+ // Write records to files
+ PCollection<FileWriteResult> writtenFiles =
+ input.isBounded().equals(PCollection.IsBounded.UNBOUNDED)
+ ? writeTriggered(input)
+ : writeUntriggered(input);
+
+ // Commit files to tables
+ PCollection<KV<String, SnapshotInfo>> snapshots =
+ writtenFiles.apply(new AppendFilesToTables(catalogConfig));
+
+ return new IcebergWriteResult(input.getPipeline(), snapshots);
+ }
+
+ private PCollection<FileWriteResult> writeTriggered(PCollection<KV<String, Row>> input) {
+ checkArgumentNotNull(
+ triggeringFrequency, "Streaming pipelines must set a triggering frequency.");
+
+ // Group records into batches to avoid writing thousands of small files
+ PCollection<KV<ShardedKey<String>, Iterable<Row>>> groupedRecords =
+ input
+ .apply("WindowIntoGlobal", Window.into(new GlobalWindows()))
+ // We rely on GroupIntoBatches to group and parallelize records properly,
+ // respecting our thresholds for number of records and bytes per batch.
+ // Each output batch will be written to a file.
+ .apply(
+ GroupIntoBatches.<String, Row>ofSize(FILE_TRIGGERING_RECORD_COUNT)
+ .withByteSize(FILE_TRIGGERING_BYTE_COUNT)
+ .withMaxBufferingDuration(checkArgumentNotNull(triggeringFrequency))
+ .withShardedKey())
+ .setCoder(
+ KvCoder.of(
+ org.apache.beam.sdk.util.ShardedKey.Coder.of(StringUtf8Coder.of()),
+ IterableCoder.of(RowCoder.of(dynamicDestinations.getDataSchema()))));
+
+ return groupedRecords
+ .apply("WriteGroupedRows", new WriteGroupedRowsToFiles(catalogConfig, dynamicDestinations))
+ // Respect user's triggering frequency before committing snapshots
+ .apply(
+ "ApplyUserTrigger",
+ Window.into(new GlobalWindows())
+ .triggering(
+ Repeatedly.forever(
+ AfterProcessingTime.pastFirstElementInPane()
+ .plusDelayOf(checkArgumentNotNull(triggeringFrequency))))
+ .discardingFiredPanes());
+ }
+
+ private PCollection<FileWriteResult> writeUntriggered(PCollection<KV<String, Row>> input) {
+ Preconditions.checkArgument(
+ triggeringFrequency == null,
+ "Triggering frequency is only applicable for streaming pipelines.");
// First, attempt to write directly to files without shuffling. If there are
// too many distinct destinations in a single bundle, the remaining
@@ -76,94 +122,17 @@ public IcebergWriteResult expand(PCollection<Row> input) {
"Fast-path write rows",
new WriteUngroupedRowsToFiles(catalogConfig, dynamicDestinations));
- // Then write the rest by shuffling on the destination metadata
- Preconditions.checkState(
- writeUngroupedResult.getSpilledRows().getSchema().hasField(DEST),
- "Input schema missing `%s` field.",
- DEST);
- Schema dataSchema =
- checkArgumentNotNull(
- writeUngroupedResult
- .getSpilledRows()
- .getSchema()
- .getField(DATA)
- .getType()
- .getRowSchema(),
- "Input schema missing `%s` field",
- DATA);
-
+ // Then write the rest by shuffling on the destination
PCollection<FileWriteResult> writeGroupedResult =
writeUngroupedResult
.getSpilledRows()
- .apply(
- "Key by destination and shard",
- MapElements.via(
- new SimpleFunction<Row, KV<ShardedKey<String>, Row>>() {
- private static final int SPILLED_ROWS_SHARDING_FACTOR = 10;
- private int shardNumber =
- ThreadLocalRandom.current().nextInt(SPILLED_ROWS_SHARDING_FACTOR);
-
- @Override
- public KV<ShardedKey<String>, Row> apply(Row elem) {
- Row data =
- checkArgumentNotNull(
- elem.getRow(DATA), "Element missing `%s` field", DATA);
- String dest =
- checkArgumentNotNull(
- elem.getString(DEST), "Element missing `%s` field", DEST);
- return KV.of(
- ShardedKey.of(dest, ++shardNumber % SPILLED_ROWS_SHARDING_FACTOR),
- data);
- }
- }))
- .setCoder(KvCoder.of(ShardedKeyCoder.of(StringUtf8Coder.of()), RowCoder.of(dataSchema)))
.apply("Group spilled rows by destination shard", GroupByKey.create())
.apply(
"Write remaining rows to files",
new WriteGroupedRowsToFiles(catalogConfig, dynamicDestinations));
- PCollection<FileWriteResult> writeUngroupedResultPColl = writeUngroupedResult.getWrittenFiles();
-
- if (input.isBounded().equals(PCollection.IsBounded.UNBOUNDED)) {
- // for streaming pipelines, re-window both outputs to keep Flatten happy
- writeGroupedResult =
- writeGroupedResult.apply(
- "RewindowGroupedRecords",
- Window.into(new GlobalWindows())
- .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
- .discardingFiredPanes());
- writeUngroupedResultPColl =
- writeUngroupedResultPColl.apply(
- "RewindowUnGroupedRecords",
- Window.into(new GlobalWindows())
- .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
- .discardingFiredPanes());
- }
-
- PCollection<FileWriteResult> allWrittenFiles =
- PCollectionList.of(writeUngroupedResultPColl)
- .and(writeGroupedResult)
- .apply("Flatten Written Files", Flatten.pCollections());
-
- if (input.isBounded().equals(PCollection.IsBounded.UNBOUNDED)) {
- checkArgumentNotNull(
- triggeringFrequency, "Streaming pipelines must set a triggering frequency.");
- // apply the user's trigger before we start committing and creating snapshots
- allWrittenFiles =
- allWrittenFiles.apply(
- "ApplyUserTrigger",
- Window.into(new GlobalWindows())
- .triggering(
- Repeatedly.forever(
- AfterProcessingTime.pastFirstElementInPane()
- .plusDelayOf(checkArgumentNotNull(triggeringFrequency))))
- .discardingFiredPanes());
- }
-
- // Apply any sharded writes and flatten everything for catalog updates
- PCollection<KV<String, SnapshotInfo>> snapshots =
- allWrittenFiles.apply(new AppendFilesToTables(catalogConfig));
-
- return new IcebergWriteResult(input.getPipeline(), snapshots);
+ return PCollectionList.of(writeUngroupedResult.getWrittenFiles())
+ .and(writeGroupedResult)
+ .apply("Flatten Written Files", Flatten.pCollections());
}
}
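The heart of the auto-sharding change is `writeTriggered()`, which delegates batching to `GroupIntoBatches` with a sharded key so the runner can split hot destination keys across workers while still bounding each batch by record count, byte size, and buffering time. A standalone sketch of that pattern on plain string records, with illustrative thresholds rather than the connector's constants:

```java
// Standalone sketch of GroupIntoBatches with a sharded key; thresholds are illustrative.
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.util.ShardedKey;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

class AutoShardedBatchingSketch {
  static PCollection<KV<ShardedKey<String>, Iterable<String>>> batch(
      PCollection<KV<String, String>> keyedRecords) {
    return keyedRecords
        .apply(
            GroupIntoBatches.<String, String>ofSize(10_000) // at most 10k records per batch
                .withByteSize(64 * 1024 * 1024) // or ~64 MiB, whichever is hit first
                .withMaxBufferingDuration(Duration.standardSeconds(30)) // flush at least every 30s
                .withShardedKey()) // let the runner pick shard counts per key
        .setCoder(
            KvCoder.of(
                ShardedKey.Coder.of(StringUtf8Coder.of()),
                IterableCoder.of(StringUtf8Coder.of())));
  }
}
```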
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteUngroupedRowsToFiles.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteUngroupedRowsToFiles.java
index 1982c7fcbad0..3b2308fca89a 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteUngroupedRowsToFiles.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteUngroupedRowsToFiles.java
@@ -17,20 +17,23 @@
*/
package org.apache.beam.sdk.io.iceberg;
-import static org.apache.beam.sdk.io.iceberg.WriteToDestinations.DATA;
-import static org.apache.beam.sdk.io.iceberg.WriteToDestinations.DEST;
-import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
-
+import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.ShardedKey;
import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PInput;
@@ -55,7 +58,7 @@
* written via another method.
*/
class WriteUngroupedRowsToFiles
- extends PTransform<PCollection<Row>, WriteUngroupedRowsToFiles.Result> {
+ extends PTransform<PCollection<KV<String, Row>>, WriteUngroupedRowsToFiles.Result> {
/**
* Maximum number of writers that will be created per bundle. Any elements requiring more writers
@@ -67,7 +70,8 @@ class WriteUngroupedRowsToFiles
private static final TupleTag<FileWriteResult> WRITTEN_FILES_TAG = new TupleTag<>("writtenFiles");
private static final TupleTag<Row> WRITTEN_ROWS_TAG = new TupleTag<Row>("writtenRows") {};
- private static final TupleTag<Row> SPILLED_ROWS_TAG = new TupleTag<Row>("spilledRows") {};
+ private static final TupleTag<KV<ShardedKey<String>, Row>> SPILLED_ROWS_TAG =
+ new TupleTag<KV<ShardedKey<String>, Row>>("spilledRows") {};
private final String filePrefix;
private final DynamicDestinations dynamicDestinations;
@@ -81,7 +85,7 @@ class WriteUngroupedRowsToFiles
}
@Override
- public Result expand(PCollection<Row> input) {
+ public Result expand(PCollection<KV<String, Row>> input) {
PCollectionTuple resultTuple =
input.apply(
@@ -99,8 +103,15 @@ public Result expand(PCollection<Row> input) {
return new Result(
input.getPipeline(),
resultTuple.get(WRITTEN_FILES_TAG),
- resultTuple.get(WRITTEN_ROWS_TAG).setCoder(input.getCoder()),
- resultTuple.get(SPILLED_ROWS_TAG).setCoder(input.getCoder()));
+ resultTuple
+ .get(WRITTEN_ROWS_TAG)
+ .setCoder(RowCoder.of(dynamicDestinations.getDataSchema())),
+ resultTuple
+ .get(SPILLED_ROWS_TAG)
+ .setCoder(
+ KvCoder.of(
+ ShardedKey.Coder.of(StringUtf8Coder.of()),
+ RowCoder.of(dynamicDestinations.getDataSchema()))));
}
/**
@@ -111,14 +122,14 @@ static class Result implements POutput {
private final Pipeline pipeline;
private final PCollection<Row> writtenRows;
- private final PCollection<Row> spilledRows;
+ private final PCollection<KV<ShardedKey<String>, Row>> spilledRows;
private final PCollection<FileWriteResult> writtenFiles;
private Result(
Pipeline pipeline,
PCollection<FileWriteResult> writtenFiles,
PCollection<Row> writtenRows,
- PCollection<Row> spilledRows) {
+ PCollection<KV<ShardedKey<String>, Row>> spilledRows) {
this.pipeline = pipeline;
this.writtenFiles = writtenFiles;
this.writtenRows = writtenRows;
@@ -129,7 +140,7 @@ public PCollection<Row> getWrittenRows() {
return writtenRows;
}
- public PCollection<Row> getSpilledRows() {
+ public PCollection<KV<ShardedKey<String>, Row>> getSpilledRows() {
return spilledRows;
}
@@ -170,8 +181,11 @@ public void finishSpecifyingOutput(
* the spilled records which were not written
*
*/
- private static class WriteUngroupedRowsToFilesDoFn extends DoFn<Row, FileWriteResult> {
+ private static class WriteUngroupedRowsToFilesDoFn
+ extends DoFn<KV<String, Row>, FileWriteResult> {
+ // When we spill records, shard the output keys to prevent hotspots.
+ private static final int SPILLED_RECORD_SHARDING_FACTOR = 10;
private final String filename;
private final int maxWritersPerBundle;
private final long maxFileSize;
@@ -179,6 +193,7 @@ private static class WriteUngroupedRowsToFilesDoFn extends DoFn<Row, FileWriteResult> {
+ @Element KV<String, Row> element,
+ BoundedWindow window,
+ PaneInfo pane,
+ MultiOutputReceiver out)
throws Exception {
- String dest =
- checkArgumentNotNull(element.getString(DEST), "Input row missing `%s` field.", DEST);
- Row data =
- checkArgumentNotNull(element.getRow(DATA), "Input row missing `data` field.", DATA);
+ String dest = element.getKey();
+ Row data = element.getValue();
IcebergDestination destination = dynamicDestinations.instantiateDestination(dest);
WindowedValue<IcebergDestination> windowedDestination =
WindowedValue.of(destination, window.maxTimestamp(), window, pane);
@@ -232,7 +249,14 @@ public void processElement(
}
throw e;
}
- out.get(writeSuccess ? WRITTEN_ROWS_TAG : SPILLED_ROWS_TAG).output(element);
+
+ if (writeSuccess) {
+ out.get(WRITTEN_ROWS_TAG).output(data);
+ } else {
+ ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES);
+ buffer.putInt(++spilledShardNumber % SPILLED_RECORD_SHARDING_FACTOR);
+ out.get(SPILLED_ROWS_TAG).output(KV.of(ShardedKey.of(dest, buffer.array()), data));
+ }
}
@FinishBundle
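The spilled-row path now spreads each destination over a small set of sharded keys instead of one hot key, encoding a round-robin counter as a 4-byte shard id. A compact sketch of that scheme in isolation; the randomized starting offset mirrors the `ThreadLocalRandom` pattern visible elsewhere in this diff and is an assumption here.

```java
// Sketch of the spill-sharding scheme: round-robin shard ids per destination key.
import java.nio.ByteBuffer;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.sdk.util.ShardedKey;

class SpillShardingSketch {
  private static final int SPILLED_RECORD_SHARDING_FACTOR = 10;

  // Assumption: start at a random shard so concurrent bundles don't all hit shard 0.
  private int spilledShardNumber =
      ThreadLocalRandom.current().nextInt(SPILLED_RECORD_SHARDING_FACTOR);

  ShardedKey<String> nextShardedKey(String dest) {
    // Encode the next shard number as a 4-byte id, cycling through the sharding factor.
    ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES);
    buffer.putInt(++spilledShardNumber % SPILLED_RECORD_SHARDING_FACTOR);
    return ShardedKey.of(dest, buffer.array());
  }
}
```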
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java
index 2f81db671dd7..e62c22be7968 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java
@@ -362,7 +362,7 @@ public void testStreamingWrite() {
.getSnapshots();
// verify that 2 snapshots are created (one per triggering interval)
PCollection<Long> snapshots = output.apply(Count.globally());
- PAssert.that(snapshots).containsInAnyOrder(1L, 1L);
+ PAssert.that(snapshots).containsInAnyOrder(2L);
testPipeline.run().waitUntilFinish();
List writtenRecords = ImmutableList.copyOf(IcebergGenerics.read(table).build());