[Managed Iceberg] add GiB autosharding
ahmedabu98 committed Oct 1, 2024
1 parent 301286f commit 384d8f6
Showing 6 changed files with 133 additions and 174 deletions.
@@ -105,7 +105,7 @@ public void processElement(
}
update.commit();
Snapshot snapshot = table.currentSnapshot();
LOG.info("Created new snapshot for table '{}': {}.", element.getKey(), snapshot);
LOG.info("Created new snapshot for table '{}': {}", element.getKey(), snapshot);
snapshotsCreated.inc();
out.outputWithTimestamp(
KV.of(element.getKey(), SnapshotInfo.fromSnapshot(snapshot)), window.maxTimestamp());
@@ -17,15 +17,15 @@
*/
package org.apache.beam.sdk.io.iceberg;

import static org.apache.beam.sdk.io.iceberg.WriteToDestinations.DATA;
import static org.apache.beam.sdk.io.iceberg.WriteToDestinations.DEST;

import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.ValueInSingleWindow;
@@ -37,7 +37,7 @@
* <p>The output record will have the format { dest: ..., data: ...} where the dest field has the
* assigned metadata and the data field has the original row.
*/
class AssignDestinations extends PTransform<PCollection<Row>, PCollection<Row>> {
class AssignDestinations extends PTransform<PCollection<Row>, PCollection<KV<String, Row>>> {

private final DynamicDestinations dynamicDestinations;

@@ -46,34 +46,27 @@ public AssignDestinations(DynamicDestinations dynamicDestinations) {
}

@Override
public PCollection<Row> expand(PCollection<Row> input) {

final Schema outputSchema =
Schema.builder()
.addStringField(DEST)
.addRowField(DATA, dynamicDestinations.getDataSchema())
.build();

public PCollection<KV<String, Row>> expand(PCollection<Row> input) {
return input
.apply(
ParDo.of(
new DoFn<Row, Row>() {
new DoFn<Row, KV<String, Row>>() {
@ProcessElement
public void processElement(
@Element Row element,
BoundedWindow window,
PaneInfo paneInfo,
@Timestamp Instant timestamp,
OutputReceiver<Row> out) {
OutputReceiver<KV<String, Row>> out) {
String tableIdentifier =
dynamicDestinations.getTableStringIdentifier(
ValueInSingleWindow.of(element, timestamp, window, paneInfo));
Row data = dynamicDestinations.getData(element);

out.output(
Row.withSchema(outputSchema).addValues(tableIdentifier, data).build());
out.output(KV.of(tableIdentifier, data));
}
}))
.setRowSchema(outputSchema);
.setCoder(
KvCoder.of(StringUtf8Coder.of(), RowCoder.of(dynamicDestinations.getDataSchema())));
}
}
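
For context, a minimal sketch (editorial, not part of this commit) of what the AssignDestinations change means for the element shape: the destination string moves from a "dest" field in a wrapper Row into the KV key. The data schema and values below are hypothetical placeholders for dynamicDestinations.getDataSchema().

import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.Row;

class AssignDestinationsShapeSketch {
  static void demo() {
    // Hypothetical data schema standing in for dynamicDestinations.getDataSchema().
    Schema dataSchema = Schema.builder().addStringField("name").build();
    Row data = Row.withSchema(dataSchema).addValue("alice").build();

    // Old output shape: a wrapper Row { dest: ..., data: ... } with an explicit output schema.
    Schema wrapperSchema =
        Schema.builder().addStringField("dest").addRowField("data", dataSchema).build();
    Row oldElement = Row.withSchema(wrapperSchema).addValues("db.users", data).build();

    // New output shape: the destination is the KV key and the data Row is the value,
    // encoded downstream with KvCoder.of(StringUtf8Coder.of(), RowCoder.of(dataSchema)).
    KV<String, Row> newElement = KV.of("db.users", data);
  }
}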
@@ -17,7 +17,6 @@
*/
package org.apache.beam.sdk.io.iceberg;

import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;

import com.google.auto.value.AutoValue;
@@ -26,12 +25,6 @@
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.AfterFirst;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
@@ -57,7 +50,6 @@ public static WriteRows writeRows(IcebergCatalogConfig catalog) {

@AutoValue
public abstract static class WriteRows extends PTransform<PCollection<Row>, IcebergWriteResult> {
private static final int TRIGGERING_RECORD_COUNT = 50_000;

abstract IcebergCatalogConfig getCatalogConfig();

@@ -91,12 +83,14 @@ public WriteRows to(DynamicDestinations destinations) {
}

/**
* Sets the frequency at which data is committed and a new {@link org.apache.iceberg.Snapshot}
* is produced.
* Sets the frequency at which data is written to files and a new {@link
* org.apache.iceberg.Snapshot} is produced.
*
* <p>Roughly every triggeringFrequency duration, this connector will try to accumulate all
* {@link org.apache.iceberg.ManifestFile}s and commit them to the table as appended files. Each
* commit results in a new table {@link org.apache.iceberg.Snapshot}.
* <p>Roughly every triggeringFrequency duration, records are written to data files and appended
to the respective table. Each append operation creates a new table snapshot.
*
* <p>Generally speaking, increasing this duration will result in fewer, larger data files and
* fewer snapshots.
*
* <p>This is only applicable when writing an unbounded {@link PCollection} (i.e. a streaming
* pipeline).
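
A minimal usage sketch of this setting (editorial, not part of the diff). It assumes the withTriggeringFrequency(Duration) setter documented by this javadoc, a to(TableIdentifier) overload alongside the to(DynamicDestinations) shown above, and a pre-built IcebergCatalogConfig; treat those exact names as assumptions rather than confirmed API.

import org.apache.beam.sdk.io.iceberg.IcebergCatalogConfig;
import org.apache.beam.sdk.io.iceberg.IcebergIO;
import org.apache.beam.sdk.io.iceberg.IcebergWriteResult;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.iceberg.catalog.TableIdentifier;
import org.joda.time.Duration;

class TriggeringFrequencySketch {
  // Streaming write: roughly once a minute, buffered rows are flushed to data files
  // and appended to the target table, producing a new Iceberg snapshot.
  static IcebergWriteResult write(PCollection<Row> rows, IcebergCatalogConfig catalogConfig) {
    return rows.apply(
        IcebergIO.writeRows(catalogConfig)
            .to(TableIdentifier.of("db", "users"))
            .withTriggeringFrequency(Duration.standardSeconds(60)));
  }
}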
@@ -119,34 +113,13 @@ public IcebergWriteResult expand(PCollection<Row> input) {
Preconditions.checkNotNull(getTableIdentifier()), input.getSchema());
}

// Assign destinations before re-windowing to global because
// Assign destinations before re-windowing to global in WriteToDestinations because
// user's dynamic destination may depend on windowing properties
PCollection<Row> assignedRows =
input.apply("Set Destination Metadata", new AssignDestinations(destinations));

if (assignedRows.isBounded().equals(PCollection.IsBounded.UNBOUNDED)) {
Duration triggeringFrequency = getTriggeringFrequency();
checkArgumentNotNull(
triggeringFrequency, "Streaming pipelines must set a triggering frequency.");
assignedRows =
assignedRows.apply(
"WindowIntoGlobal",
Window.<Row>into(new GlobalWindows())
.triggering(
Repeatedly.forever(
AfterFirst.of(
AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(triggeringFrequency),
AfterPane.elementCountAtLeast(TRIGGERING_RECORD_COUNT))))
.discardingFiredPanes());
} else {
Preconditions.checkArgument(
getTriggeringFrequency() == null,
"Triggering frequency is only applicable for streaming pipelines.");
}
return assignedRows.apply(
"Write Rows to Destinations",
new WriteToDestinations(getCatalogConfig(), destinations, getTriggeringFrequency()));
return input
.apply("Assign Table Destinations", new AssignDestinations(destinations))
.apply(
"Write Rows to Destinations",
new WriteToDestinations(getCatalogConfig(), destinations, getTriggeringFrequency()));
}
}

@@ -24,11 +24,11 @@
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.ShardedKey;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.ShardedKey;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.catalog.Catalog;
@@ -19,39 +19,34 @@

import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;

import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.coders.ShardedKeyCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.ShardedKey;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.ShardedKey;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;

class WriteToDestinations extends PTransform<PCollection<Row>, IcebergWriteResult> {
class WriteToDestinations extends PTransform<PCollection<KV<String, Row>>, IcebergWriteResult> {

static final long DEFAULT_MAX_BYTES_PER_FILE = (1L << 40); // 1TB
// Used for auto-sharding in streaming. Limits number of records per batch/file
private static final int FILE_TRIGGERING_RECORD_COUNT = 100_000;
// Used for auto-sharding in streaming. Limits total byte size per batch/file
public static final int FILE_TRIGGERING_BYTE_COUNT = 64 * 1024 * 1024; // 64MiB
static final int DEFAULT_NUM_FILE_SHARDS = 0;
// constant field names representing table identifier string and the record
static final String DEST = "dest";
static final String DATA = "data";

private final IcebergCatalogConfig catalogConfig;
private final DynamicDestinations dynamicDestinations;
private final @Nullable Duration triggeringFrequency;
@@ -66,7 +61,58 @@ class WriteToDestinations extends PTransform<PCollection<Row>, IcebergWriteResult> {
}

@Override
public IcebergWriteResult expand(PCollection<Row> input) {
public IcebergWriteResult expand(PCollection<KV<String, Row>> input) {
// Write records to files
PCollection<FileWriteResult> writtenFiles =
input.isBounded().equals(PCollection.IsBounded.UNBOUNDED)
? writeTriggered(input)
: writeUntriggered(input);

// Commit files to tables
PCollection<KV<String, SnapshotInfo>> snapshots =
writtenFiles.apply(new AppendFilesToTables(catalogConfig));

return new IcebergWriteResult(input.getPipeline(), snapshots);
}

private PCollection<FileWriteResult> writeTriggered(PCollection<KV<String, Row>> input) {
checkArgumentNotNull(
triggeringFrequency, "Streaming pipelines must set a triggering frequency.");

// Group records into batches to avoid writing thousands of small files
PCollection<KV<ShardedKey<String>, Iterable<Row>>> groupedRecords =
input
.apply("WindowIntoGlobal", Window.into(new GlobalWindows()))
// We rely on GroupIntoBatches to group and parallelize records properly,
// respecting our thresholds for number of records and bytes per batch.
// Each output batch will be written to a file.
.apply(
GroupIntoBatches.<String, Row>ofSize(FILE_TRIGGERING_RECORD_COUNT)
.withByteSize(FILE_TRIGGERING_BYTE_COUNT)
.withMaxBufferingDuration(checkArgumentNotNull(triggeringFrequency))
.withShardedKey())
.setCoder(
KvCoder.of(
org.apache.beam.sdk.util.ShardedKey.Coder.of(StringUtf8Coder.of()),
IterableCoder.of(RowCoder.of(dynamicDestinations.getDataSchema()))));

return groupedRecords
.apply("WriteGroupedRows", new WriteGroupedRowsToFiles(catalogConfig, dynamicDestinations))
// Respect user's triggering frequency before committing snapshots
.apply(
"ApplyUserTrigger",
Window.<FileWriteResult>into(new GlobalWindows())
.triggering(
Repeatedly.forever(
AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(checkArgumentNotNull(triggeringFrequency))))
.discardingFiredPanes());
}

private PCollection<FileWriteResult> writeUntriggered(PCollection<KV<String, Row>> input) {
Preconditions.checkArgument(
triggeringFrequency == null,
"Triggering frequency is only applicable for streaming pipelines.");

// First, attempt to write directly to files without shuffling. If there are
// too many distinct destinations in a single bundle, the remaining
@@ -76,94 +122,17 @@ public IcebergWriteResult expand(PCollection<Row> input) {
"Fast-path write rows",
new WriteUngroupedRowsToFiles(catalogConfig, dynamicDestinations));

// Then write the rest by shuffling on the destination metadata
Preconditions.checkState(
writeUngroupedResult.getSpilledRows().getSchema().hasField(DEST),
"Input schema missing `%s` field.",
DEST);
Schema dataSchema =
checkArgumentNotNull(
writeUngroupedResult
.getSpilledRows()
.getSchema()
.getField(DATA)
.getType()
.getRowSchema(),
"Input schema missing `%s` field",
DATA);

// Then write the rest by shuffling on the destination
PCollection<FileWriteResult> writeGroupedResult =
writeUngroupedResult
.getSpilledRows()
.apply(
"Key by destination and shard",
MapElements.via(
new SimpleFunction<Row, KV<ShardedKey<String>, Row>>() {
private static final int SPILLED_ROWS_SHARDING_FACTOR = 10;
private int shardNumber =
ThreadLocalRandom.current().nextInt(SPILLED_ROWS_SHARDING_FACTOR);

@Override
public KV<ShardedKey<String>, Row> apply(Row elem) {
Row data =
checkArgumentNotNull(
elem.getRow(DATA), "Element missing `%s` field", DATA);
String dest =
checkArgumentNotNull(
elem.getString(DEST), "Element missing `%s` field", DEST);
return KV.of(
ShardedKey.of(dest, ++shardNumber % SPILLED_ROWS_SHARDING_FACTOR),
data);
}
}))
.setCoder(KvCoder.of(ShardedKeyCoder.of(StringUtf8Coder.of()), RowCoder.of(dataSchema)))
.apply("Group spilled rows by destination shard", GroupByKey.create())
.apply(
"Write remaining rows to files",
new WriteGroupedRowsToFiles(catalogConfig, dynamicDestinations));

PCollection<FileWriteResult> writeUngroupedResultPColl = writeUngroupedResult.getWrittenFiles();

if (input.isBounded().equals(PCollection.IsBounded.UNBOUNDED)) {
// for streaming pipelines, re-window both outputs to keep Flatten happy
writeGroupedResult =
writeGroupedResult.apply(
"RewindowGroupedRecords",
Window.<FileWriteResult>into(new GlobalWindows())
.triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
.discardingFiredPanes());
writeUngroupedResultPColl =
writeUngroupedResultPColl.apply(
"RewindowUnGroupedRecords",
Window.<FileWriteResult>into(new GlobalWindows())
.triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
.discardingFiredPanes());
}

PCollection<FileWriteResult> allWrittenFiles =
PCollectionList.of(writeUngroupedResultPColl)
.and(writeGroupedResult)
.apply("Flatten Written Files", Flatten.pCollections());

if (input.isBounded().equals(PCollection.IsBounded.UNBOUNDED)) {
checkArgumentNotNull(
triggeringFrequency, "Streaming pipelines must set a triggering frequency.");
// apply the user's trigger before we start committing and creating snapshots
allWrittenFiles =
allWrittenFiles.apply(
"ApplyUserTrigger",
Window.<FileWriteResult>into(new GlobalWindows())
.triggering(
Repeatedly.forever(
AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(checkArgumentNotNull(triggeringFrequency))))
.discardingFiredPanes());
}

// Apply any sharded writes and flatten everything for catalog updates
PCollection<KV<String, SnapshotInfo>> snapshots =
allWrittenFiles.apply(new AppendFilesToTables(catalogConfig));

return new IcebergWriteResult(input.getPipeline(), snapshots);
return PCollectionList.of(writeUngroupedResult.getWrittenFiles())
.and(writeGroupedResult)
.apply("Flatten Written Files", Flatten.pCollections());
}
}
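
Since the "GiB" in the commit title refers to GroupIntoBatches, here is a self-contained sketch (editorial, separate from the diff) of the auto-sharding pattern the new writeTriggered path is built on. The element type and thresholds are placeholders, though the 100,000-record and 64 MiB limits mirror FILE_TRIGGERING_RECORD_COUNT and FILE_TRIGGERING_BYTE_COUNT above.

import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.util.ShardedKey;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

class GroupIntoBatchesSketch {
  // Batches elements per destination key and lets the runner decide how many shards
  // to use per key (auto-sharding). A batch is emitted once it reaches 100,000 elements
  // or 64 MiB, or once it has been buffered for maxBuffering, whichever comes first.
  static PCollection<KV<ShardedKey<String>, Iterable<String>>> batch(
      PCollection<KV<String, String>> keyedRecords, Duration maxBuffering) {
    return keyedRecords.apply(
        GroupIntoBatches.<String, String>ofSize(100_000)
            .withByteSize(64L * 1024 * 1024)
            .withMaxBufferingDuration(maxBuffering)
            .withShardedKey());
  }
}

As in the diff above, a real pipeline would typically set the output coder explicitly (ShardedKey.Coder over the key coder, IterableCoder of the value coder), which this sketch omits.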