[Managed Iceberg] add GiB autosharding #32612

Merged (8 commits) on Oct 4, 2024

Changes from 3 commits
2 changes: 1 addition & 1 deletion .github/trigger_files/IO_Iceberg_Integration_Tests.json
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 1
"modification": 2
}
@@ -105,7 +105,7 @@ public void processElement(
}
update.commit();
Snapshot snapshot = table.currentSnapshot();
LOG.info("Created new snapshot for table '{}': {}.", element.getKey(), snapshot);
LOG.info("Created new snapshot for table '{}': {}", element.getKey(), snapshot);
snapshotsCreated.inc();
out.outputWithTimestamp(
KV.of(element.getKey(), SnapshotInfo.fromSnapshot(snapshot)), window.maxTimestamp());
@@ -17,15 +17,15 @@
*/
package org.apache.beam.sdk.io.iceberg;

import static org.apache.beam.sdk.io.iceberg.WriteToDestinations.DATA;
import static org.apache.beam.sdk.io.iceberg.WriteToDestinations.DEST;

import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.ValueInSingleWindow;
@@ -37,7 +37,7 @@
* <p>The output record will have the format { dest: ..., data: ...} where the dest field has the
* assigned metadata and the data field has the original row.
*/
class AssignDestinations extends PTransform<PCollection<Row>, PCollection<Row>> {
class AssignDestinations extends PTransform<PCollection<Row>, PCollection<KV<String, Row>>> {

private final DynamicDestinations dynamicDestinations;

@@ -46,34 +46,27 @@ public AssignDestinations(DynamicDestinations dynamicDestinations) {
}

@Override
public PCollection<Row> expand(PCollection<Row> input) {

final Schema outputSchema =
Schema.builder()
.addStringField(DEST)
.addRowField(DATA, dynamicDestinations.getDataSchema())
.build();

public PCollection<KV<String, Row>> expand(PCollection<Row> input) {
return input
.apply(
ParDo.of(
new DoFn<Row, Row>() {
new DoFn<Row, KV<String, Row>>() {
@ProcessElement
public void processElement(
@Element Row element,
BoundedWindow window,
PaneInfo paneInfo,
@Timestamp Instant timestamp,
OutputReceiver<Row> out) {
OutputReceiver<KV<String, Row>> out) {
String tableIdentifier =
dynamicDestinations.getTableStringIdentifier(
ValueInSingleWindow.of(element, timestamp, window, paneInfo));
Row data = dynamicDestinations.getData(element);

out.output(
Row.withSchema(outputSchema).addValues(tableIdentifier, data).build());
out.output(KV.of(tableIdentifier, data));
}
}))
.setRowSchema(outputSchema);
.setCoder(
KvCoder.of(StringUtf8Coder.of(), RowCoder.of(dynamicDestinations.getDataSchema())));
}
}
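For context, here is a minimal standalone sketch (not part of this PR) of the pattern the reworked AssignDestinations follows: a DoFn keys each Row by a destination string and sets an explicit KvCoder, so that downstream keyed transforms such as GroupIntoBatches can group records per table. The schema, field names, and pipeline below are hypothetical and only for illustration.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

public class KeyByDestinationSketch {
  public static void main(String[] args) {
    // Hypothetical record schema: a destination string plus a payload.
    Schema schema = Schema.builder().addStringField("dest").addStringField("payload").build();
    Pipeline pipeline = Pipeline.create();

    PCollection<Row> rows =
        pipeline.apply(
            Create.of(
                    Row.withSchema(schema).addValues("db.table_a", "r1").build(),
                    Row.withSchema(schema).addValues("db.table_b", "r2").build())
                .withCoder(RowCoder.of(schema)));

    PCollection<KV<String, Row>> keyed =
        rows.apply(
                "KeyByDestination",
                ParDo.of(
                    new DoFn<Row, KV<String, Row>>() {
                      @ProcessElement
                      public void processElement(
                          @Element Row row, OutputReceiver<KV<String, Row>> out) {
                        // Key each record by its destination, mirroring AssignDestinations.
                        out.output(KV.of(row.getString("dest"), row));
                      }
                    }))
            // KV outputs have no schema, so set the coder explicitly, as the PR does.
            .setCoder(KvCoder.of(StringUtf8Coder.of(), RowCoder.of(schema)));

    pipeline.run().waitUntilFinish();
  }
}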
@@ -17,7 +17,6 @@
*/
package org.apache.beam.sdk.io.iceberg;

import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;

import com.google.auto.value.AutoValue;
@@ -26,12 +25,6 @@
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.AfterFirst;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
@@ -57,7 +50,6 @@ public static WriteRows writeRows(IcebergCatalogConfig catalog) {

@AutoValue
public abstract static class WriteRows extends PTransform<PCollection<Row>, IcebergWriteResult> {
private static final int TRIGGERING_RECORD_COUNT = 50_000;

abstract IcebergCatalogConfig getCatalogConfig();

@@ -91,12 +83,14 @@ public WriteRows to(DynamicDestinations destinations) {
}

/**
* Sets the frequency at which data is committed and a new {@link org.apache.iceberg.Snapshot}
* is produced.
* Sets the frequency at which data is written to files and a new {@link
* org.apache.iceberg.Snapshot} is produced.
*
* <p>Roughly every triggeringFrequency duration, this connector will try to accumulate all
* {@link org.apache.iceberg.ManifestFile}s and commit them to the table as appended files. Each
* commit results in a new table {@link org.apache.iceberg.Snapshot}.
* <p>Roughly every triggeringFrequency duration, records are written to data files and appended
* to the respective table. Each append operation creates a new table snapshot.
*
* <p>Generally speaking, increasing this duration will result in fewer, larger data files and
* fewer snapshots.
*
* <p>This is only applicable when writing an unbounded {@link PCollection} (i.e. a streaming
* pipeline).
@@ -119,34 +113,13 @@ public IcebergWriteResult expand(PCollection<Row> input) {
Preconditions.checkNotNull(getTableIdentifier()), input.getSchema());
}

// Assign destinations before re-windowing to global because
// Assign destinations before re-windowing to global in WriteToDestinations because
// user's dynamic destination may depend on windowing properties
PCollection<Row> assignedRows =
input.apply("Set Destination Metadata", new AssignDestinations(destinations));

if (assignedRows.isBounded().equals(PCollection.IsBounded.UNBOUNDED)) {
Duration triggeringFrequency = getTriggeringFrequency();
checkArgumentNotNull(
triggeringFrequency, "Streaming pipelines must set a triggering frequency.");
assignedRows =
assignedRows.apply(
"WindowIntoGlobal",
Window.<Row>into(new GlobalWindows())
.triggering(
Repeatedly.forever(
AfterFirst.of(
AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(triggeringFrequency),
AfterPane.elementCountAtLeast(TRIGGERING_RECORD_COUNT))))
.discardingFiredPanes());
} else {
Preconditions.checkArgument(
getTriggeringFrequency() == null,
"Triggering frequency is only applicable for streaming pipelines.");
}
return assignedRows.apply(
"Write Rows to Destinations",
new WriteToDestinations(getCatalogConfig(), destinations, getTriggeringFrequency()));
return input
.apply("Assign Table Destinations", new AssignDestinations(destinations))
.apply(
"Write Rows to Destinations",
new WriteToDestinations(getCatalogConfig(), destinations, getTriggeringFrequency()));
}
}

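For readers who want to see the triggering-frequency knob in context, here is a hedged usage sketch (not part of this PR). It assumes a pre-built IcebergCatalogConfig and an unbounded PCollection<Row> whose schema matches the target table; the table name db.events is hypothetical, and note that IcebergIO is marked @Internal, with the Managed API being the recommended entry point.

import org.apache.beam.sdk.io.iceberg.IcebergCatalogConfig;
import org.apache.beam.sdk.io.iceberg.IcebergIO;
import org.apache.beam.sdk.io.iceberg.IcebergWriteResult;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.iceberg.catalog.TableIdentifier;
import org.joda.time.Duration;

class StreamingIcebergWriteSketch {
  static IcebergWriteResult write(
      PCollection<Row> unboundedRows, IcebergCatalogConfig catalogConfig) {
    return unboundedRows.apply(
        "WriteToIceberg",
        IcebergIO.writeRows(catalogConfig)
            // Hypothetical destination table.
            .to(TableIdentifier.of("db", "events"))
            // Roughly every 5 minutes, buffered records are flushed to data files and
            // appended to the table, producing a new snapshot.
            .withTriggeringFrequency(Duration.standardMinutes(5)));
  }
}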
@@ -24,11 +24,11 @@
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.ShardedKey;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.ShardedKey;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.catalog.Catalog;
@@ -19,39 +19,34 @@

import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;

import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.coders.ShardedKeyCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.ShardedKey;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.ShardedKey;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;

class WriteToDestinations extends PTransform<PCollection<Row>, IcebergWriteResult> {
class WriteToDestinations extends PTransform<PCollection<KV<String, Row>>, IcebergWriteResult> {

static final long DEFAULT_MAX_BYTES_PER_FILE = (1L << 40); // 1TB
// Used for auto-sharding in streaming. Limits number of records per batch/file
private static final int FILE_TRIGGERING_RECORD_COUNT = 100_000;
Contributor: Were these constants determined by experimentation, or by looking at another sink implementation?

Contributor (author): They're taken from WriteFiles:

// The record count and buffering duration to trigger flushing records to a tmp file. Mainly used
// for writing unbounded data to avoid generating too many small files.
public static final int FILE_TRIGGERING_RECORD_COUNT = 100000;
public static final int FILE_TRIGGERING_BYTE_COUNT = 64 * 1024 * 1024; // 64MiB as of now

Contributor (author): BigQuery batch loads is similar, but with a higher record count limit (500,000):

// If user triggering is supplied, we will trigger the file write after this many records are
// written.
static final int FILE_TRIGGERING_RECORD_COUNT = 500000;
// If user triggering is supplied, we will trigger the file write after this many bytes are
// written.
static final int DEFAULT_FILE_TRIGGERING_BYTE_COUNT =
AsyncWriteChannelOptions.UPLOAD_CHUNK_SIZE_DEFAULT; // 64MiB as of now

Contributor: It might be a good idea in a follow-up PR to expose the record and byte counts, in case users want more flexibility. Also, I'm not sure we want this new default of 100,000 to differ from the old default of TRIGGERING_RECORD_COUNT = 50,000.

Contributor: I think for Managed IO in general, it's good to limit the number of knobs we expose. The idea is for Beam and the runner to find reasonably optimal values and manage them on behalf of users.

Contributor (author): +1 to not exposing it (at least not from the get-go).

As for the new default of 100,000 differing from the old TRIGGERING_RECORD_COUNT of 50,000: before a recent PR (#32451), the old default actually wasn't used anywhere. This IO is still fairly new and we haven't stress-tested it to find the optimal values. I figured a good starting point would be to follow WriteFiles (100,000), since it's essentially the same function.

// Used for auto-sharding in streaming. Limits total byte size per batch/file
public static final int FILE_TRIGGERING_BYTE_COUNT = 64 * 1024 * 1024; // 64MiB
static final int DEFAULT_NUM_FILE_SHARDS = 0;
// constant field names representing table identifier string and the record
static final String DEST = "dest";
static final String DATA = "data";

private final IcebergCatalogConfig catalogConfig;
private final DynamicDestinations dynamicDestinations;
private final @Nullable Duration triggeringFrequency;
@@ -66,7 +61,58 @@ class WriteToDestinations extends PTransform<PCollection<Row>, IcebergWriteResult
}

@Override
public IcebergWriteResult expand(PCollection<Row> input) {
public IcebergWriteResult expand(PCollection<KV<String, Row>> input) {
// Write records to files
PCollection<FileWriteResult> writtenFiles =
input.isBounded().equals(PCollection.IsBounded.UNBOUNDED)
? writeTriggered(input)
: writeUntriggered(input);

// Commit files to tables
PCollection<KV<String, SnapshotInfo>> snapshots =
writtenFiles.apply(new AppendFilesToTables(catalogConfig));

return new IcebergWriteResult(input.getPipeline(), snapshots);
}

private PCollection<FileWriteResult> writeTriggered(PCollection<KV<String, Row>> input) {
checkArgumentNotNull(
triggeringFrequency, "Streaming pipelines must set a triggering frequency.");

// Group records into batches to avoid writing thousands of small files
PCollection<KV<ShardedKey<String>, Iterable<Row>>> groupedRecords =
input
.apply("WindowIntoGlobal", Window.into(new GlobalWindows()))
// We rely on GroupIntoBatches to group and parallelize records properly,
// respecting our thresholds for number of records and bytes per batch.
// Each output batch will be written to a file.
.apply(
GroupIntoBatches.<String, Row>ofSize(FILE_TRIGGERING_RECORD_COUNT)
.withByteSize(FILE_TRIGGERING_BYTE_COUNT)
.withMaxBufferingDuration(checkArgumentNotNull(triggeringFrequency))
.withShardedKey())
.setCoder(
KvCoder.of(
org.apache.beam.sdk.util.ShardedKey.Coder.of(StringUtf8Coder.of()),
IterableCoder.of(RowCoder.of(dynamicDestinations.getDataSchema()))));

return groupedRecords
.apply("WriteGroupedRows", new WriteGroupedRowsToFiles(catalogConfig, dynamicDestinations))
// Respect user's triggering frequency before committing snapshots
.apply(
"ApplyUserTrigger",
Window.<FileWriteResult>into(new GlobalWindows())
.triggering(
Repeatedly.forever(
AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(checkArgumentNotNull(triggeringFrequency))))
.discardingFiredPanes());
}

private PCollection<FileWriteResult> writeUntriggered(PCollection<KV<String, Row>> input) {
Preconditions.checkArgument(
triggeringFrequency == null,
"Triggering frequency is only applicable for streaming pipelines.");

// First, attempt to write directly to files without shuffling. If there are
// too many distinct destinations in a single bundle, the remaining
@@ -76,94 +122,17 @@ public IcebergWriteResult expand(PCollection<Row> input) {
"Fast-path write rows",
new WriteUngroupedRowsToFiles(catalogConfig, dynamicDestinations));

// Then write the rest by shuffling on the destination metadata
Preconditions.checkState(
writeUngroupedResult.getSpilledRows().getSchema().hasField(DEST),
"Input schema missing `%s` field.",
DEST);
Schema dataSchema =
checkArgumentNotNull(
writeUngroupedResult
.getSpilledRows()
.getSchema()
.getField(DATA)
.getType()
.getRowSchema(),
"Input schema missing `%s` field",
DATA);

// Then write the rest by shuffling on the destination
PCollection<FileWriteResult> writeGroupedResult =
writeUngroupedResult
.getSpilledRows()
.apply(
"Key by destination and shard",
MapElements.via(
new SimpleFunction<Row, KV<ShardedKey<String>, Row>>() {
private static final int SPILLED_ROWS_SHARDING_FACTOR = 10;
private int shardNumber =
ThreadLocalRandom.current().nextInt(SPILLED_ROWS_SHARDING_FACTOR);

@Override
public KV<ShardedKey<String>, Row> apply(Row elem) {
Row data =
checkArgumentNotNull(
elem.getRow(DATA), "Element missing `%s` field", DATA);
String dest =
checkArgumentNotNull(
elem.getString(DEST), "Element missing `%s` field", DEST);
return KV.of(
ShardedKey.of(dest, ++shardNumber % SPILLED_ROWS_SHARDING_FACTOR),
data);
}
}))
.setCoder(KvCoder.of(ShardedKeyCoder.of(StringUtf8Coder.of()), RowCoder.of(dataSchema)))
.apply("Group spilled rows by destination shard", GroupByKey.create())
.apply(
"Write remaining rows to files",
new WriteGroupedRowsToFiles(catalogConfig, dynamicDestinations));

PCollection<FileWriteResult> writeUngroupedResultPColl = writeUngroupedResult.getWrittenFiles();

if (input.isBounded().equals(PCollection.IsBounded.UNBOUNDED)) {
// for streaming pipelines, re-window both outputs to keep Flatten happy
writeGroupedResult =
writeGroupedResult.apply(
"RewindowGroupedRecords",
Window.<FileWriteResult>into(new GlobalWindows())
.triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
.discardingFiredPanes());
writeUngroupedResultPColl =
writeUngroupedResultPColl.apply(
"RewindowUnGroupedRecords",
Window.<FileWriteResult>into(new GlobalWindows())
.triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
.discardingFiredPanes());
}

PCollection<FileWriteResult> allWrittenFiles =
PCollectionList.of(writeUngroupedResultPColl)
.and(writeGroupedResult)
.apply("Flatten Written Files", Flatten.pCollections());

if (input.isBounded().equals(PCollection.IsBounded.UNBOUNDED)) {
checkArgumentNotNull(
triggeringFrequency, "Streaming pipelines must set a triggering frequency.");
// apply the user's trigger before we start committing and creating snapshots
allWrittenFiles =
allWrittenFiles.apply(
"ApplyUserTrigger",
Window.<FileWriteResult>into(new GlobalWindows())
.triggering(
Repeatedly.forever(
AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(checkArgumentNotNull(triggeringFrequency))))
.discardingFiredPanes());
}

// Apply any sharded writes and flatten everything for catalog updates
PCollection<KV<String, SnapshotInfo>> snapshots =
allWrittenFiles.apply(new AppendFilesToTables(catalogConfig));

return new IcebergWriteResult(input.getPipeline(), snapshots);
return PCollectionList.of(writeUngroupedResult.getWrittenFiles())
.and(writeGroupedResult)
.apply("Flatten Written Files", Flatten.pCollections());
}
}
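For readers unfamiliar with the GroupIntoBatches ("GiB") autosharding that writeTriggered relies on, here is a minimal standalone sketch (not part of this PR) of the same pattern on plain strings: a batch is emitted per key once it reaches a record count, a byte size, or the max buffering duration, and withShardedKey() lets the runner split a hot key across shards. The keys and thresholds below are illustrative, not the connector's defaults.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.util.ShardedKey;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

public class GroupIntoBatchesSketch {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create();

    PCollection<KV<String, String>> keyed =
        pipeline.apply(
            Create.of(
                    KV.of("db.table_a", "r1"),
                    KV.of("db.table_a", "r2"),
                    KV.of("db.table_b", "r3"))
                .withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())));

    // Each emitted element is one batch for a single sharded key, i.e. one file's worth of rows.
    PCollection<KV<ShardedKey<String>, Iterable<String>>> batches =
        keyed.apply(
            "GroupIntoBatches",
            GroupIntoBatches.<String, String>ofSize(1_000)
                .withByteSize(64 * 1024 * 1024)
                .withMaxBufferingDuration(Duration.standardMinutes(1))
                .withShardedKey());

    pipeline.run().waitUntilFinish();
  }
}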