CP iceberg autosharding #32663

Merged 1 commit on Oct 4, 2024
CHANGES.md (1 change: 1 addition & 0 deletions)
@@ -59,6 +59,7 @@

* Added support for using vLLM in the RunInference transform (Python) ([#32528](https://github.com/apache/beam/issues/32528))
* [Managed Iceberg] Added support for streaming writes ([#32451](https://github.com/apache/beam/pull/32451))
* [Managed Iceberg] Added auto-sharding for streaming writes ([#32612](https://github.com/apache/beam/pull/32612))
* [Managed Iceberg] Added support for writing to dynamic destinations ([#32565](https://github.com/apache/beam/pull/32565))

## New Features / Improvements
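The two changelog entries above cover the Managed Iceberg streaming write path that this PR extends with auto-sharding. As a rough illustration only, the sketch below shows how such a streaming write might be configured through the Managed API; the table name, catalog properties, input tag, and the triggering_frequency_seconds value are assumptions rather than details taken from this PR.

import java.util.Map;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;

// Hypothetical configuration for a Managed Iceberg streaming write.
Map<String, Object> config =
    ImmutableMap.<String, Object>builder()
        .put("table", "db.events") // assumed table name
        .put("catalog_name", "my_catalog") // assumed catalog name
        .put(
            "catalog_properties",
            ImmutableMap.of("type", "hadoop", "warehouse", "gs://my-bucket/warehouse"))
        .put("triggering_frequency_seconds", 60) // flush/commit roughly once a minute
        .build();

// 'rows' is an unbounded PCollection<Row>; the connector applies auto-sharding internally
// before records are written to data files and committed as snapshots.
PCollectionRowTuple.of("input", rows) // "input" tag assumed
    .apply(Managed.write(Managed.ICEBERG).withConfig(config));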
@@ -105,7 +105,7 @@ public void processElement(
}
update.commit();
Snapshot snapshot = table.currentSnapshot();
LOG.info("Created new snapshot for table '{}': {}.", element.getKey(), snapshot);
LOG.info("Created new snapshot for table '{}': {}", element.getKey(), snapshot);
snapshotsCreated.inc();
out.outputWithTimestamp(
KV.of(element.getKey(), SnapshotInfo.fromSnapshot(snapshot)), window.maxTimestamp());
@@ -17,15 +17,15 @@
*/
package org.apache.beam.sdk.io.iceberg;

import static org.apache.beam.sdk.io.iceberg.WriteToDestinations.DATA;
import static org.apache.beam.sdk.io.iceberg.WriteToDestinations.DEST;

import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.ValueInSingleWindow;
@@ -37,7 +37,7 @@
* <p>The output record will have the format { dest: ..., data: ...} where the dest field has the
* assigned metadata and the data field has the original row.
*/
class AssignDestinations extends PTransform<PCollection<Row>, PCollection<Row>> {
class AssignDestinations extends PTransform<PCollection<Row>, PCollection<KV<String, Row>>> {

private final DynamicDestinations dynamicDestinations;

@@ -46,34 +46,27 @@ public AssignDestinations(DynamicDestinations dynamicDestinations) {
}

@Override
public PCollection<Row> expand(PCollection<Row> input) {

final Schema outputSchema =
Schema.builder()
.addStringField(DEST)
.addRowField(DATA, dynamicDestinations.getDataSchema())
.build();

public PCollection<KV<String, Row>> expand(PCollection<Row> input) {
return input
.apply(
ParDo.of(
new DoFn<Row, Row>() {
new DoFn<Row, KV<String, Row>>() {
@ProcessElement
public void processElement(
@Element Row element,
BoundedWindow window,
PaneInfo paneInfo,
@Timestamp Instant timestamp,
OutputReceiver<Row> out) {
OutputReceiver<KV<String, Row>> out) {
String tableIdentifier =
dynamicDestinations.getTableStringIdentifier(
ValueInSingleWindow.of(element, timestamp, window, paneInfo));
Row data = dynamicDestinations.getData(element);

out.output(
Row.withSchema(outputSchema).addValues(tableIdentifier, data).build());
out.output(KV.of(tableIdentifier, data));
}
}))
.setRowSchema(outputSchema);
.setCoder(
KvCoder.of(StringUtf8Coder.of(), RowCoder.of(dynamicDestinations.getDataSchema())));
}
}
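The change above reworks AssignDestinations to emit KV<String, Row> pairs keyed by the destination table string, setting an explicit KvCoder because the Row component has no inferable coder at this point. A minimal standalone sketch of the same pattern, with a hypothetical fixed destination and schema:

import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

Schema dataSchema = Schema.builder().addStringField("name").build(); // assumed row schema

PCollection<KV<String, Row>> keyed =
    rows.apply(
            "Key By Destination",
            ParDo.of(
                new DoFn<Row, KV<String, Row>>() {
                  @ProcessElement
                  public void processElement(
                      @Element Row row, OutputReceiver<KV<String, Row>> out) {
                    // A real implementation derives the key from the element, timestamp, and
                    // window; a fixed table identifier stands in here.
                    out.output(KV.of("db.events", row));
                  }
                }))
        // KV<String, Row> has no default coder for the Row side, so set it explicitly.
        .setCoder(KvCoder.of(StringUtf8Coder.of(), RowCoder.of(dataSchema)));

Keying by destination, rather than wrapping destination and data in a composite Row, is what allows the downstream write stage to group and auto-shard records per table.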
@@ -17,7 +17,6 @@
*/
package org.apache.beam.sdk.io.iceberg;

import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;

import com.google.auto.value.AutoValue;
Expand All @@ -28,12 +27,6 @@
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.AfterFirst;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
@@ -288,7 +281,6 @@ public static WriteRows writeRows(IcebergCatalogConfig catalog) {

@AutoValue
public abstract static class WriteRows extends PTransform<PCollection<Row>, IcebergWriteResult> {
private static final int TRIGGERING_RECORD_COUNT = 50_000;

abstract IcebergCatalogConfig getCatalogConfig();

@@ -322,12 +314,14 @@ public WriteRows to(DynamicDestinations destinations) {
}

/**
* Sets the frequency at which data is committed and a new {@link org.apache.iceberg.Snapshot}
* is produced.
* Sets the frequency at which data is written to files and a new {@link
* org.apache.iceberg.Snapshot} is produced.
*
* <p>Roughly every triggeringFrequency duration, this connector will try to accumulate all
* {@link org.apache.iceberg.ManifestFile}s and commit them to the table as appended files. Each
* commit results in a new table {@link org.apache.iceberg.Snapshot}.
* <p>Roughly every triggeringFrequency duration, records are written to data files and appended
to the respective table. Each append operation creates a new table snapshot.
*
* <p>Generally speaking, increasing this duration will result in fewer, larger data files and
* fewer snapshots.
*
* <p>This is only applicable when writing an unbounded {@link PCollection} (i.e. a streaming
* pipeline).
@@ -350,34 +344,13 @@ public IcebergWriteResult expand(PCollection<Row> input) {
Preconditions.checkNotNull(getTableIdentifier()), input.getSchema());
}

// Assign destinations before re-windowing to global because
// Assign destinations before re-windowing to global in WriteToDestinations because
// user's dynamic destination may depend on windowing properties
PCollection<Row> assignedRows =
input.apply("Set Destination Metadata", new AssignDestinations(destinations));

if (assignedRows.isBounded().equals(PCollection.IsBounded.UNBOUNDED)) {
Duration triggeringFrequency = getTriggeringFrequency();
checkArgumentNotNull(
triggeringFrequency, "Streaming pipelines must set a triggering frequency.");
assignedRows =
assignedRows.apply(
"WindowIntoGlobal",
Window.<Row>into(new GlobalWindows())
.triggering(
Repeatedly.forever(
AfterFirst.of(
AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(triggeringFrequency),
AfterPane.elementCountAtLeast(TRIGGERING_RECORD_COUNT))))
.discardingFiredPanes());
} else {
Preconditions.checkArgument(
getTriggeringFrequency() == null,
"Triggering frequency is only applicable for streaming pipelines.");
}
return assignedRows.apply(
"Write Rows to Destinations",
new WriteToDestinations(getCatalogConfig(), destinations, getTriggeringFrequency()));
return input
.apply("Assign Table Destinations", new AssignDestinations(destinations))
.apply(
"Write Rows to Destinations",
new WriteToDestinations(getCatalogConfig(), destinations, getTriggeringFrequency()));
}
}

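The Javadoc above describes the triggering frequency for unbounded inputs: roughly every triggeringFrequency, buffered records are flushed to data files and appended to the table as a new snapshot. A hedged sketch of the direct IcebergIO API follows; the builder methods on IcebergCatalogConfig, the withTriggeringFrequency name, and the catalog/table values are assumptions based on context, not verbatim from this PR.

import org.apache.beam.sdk.io.iceberg.IcebergCatalogConfig;
import org.apache.beam.sdk.io.iceberg.IcebergIO;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.catalog.TableIdentifier;
import org.joda.time.Duration;

IcebergCatalogConfig catalog =
    IcebergCatalogConfig.builder()
        .setCatalogName("my_catalog") // assumed catalog name
        .setCatalogProperties(
            ImmutableMap.of("type", "hadoop", "warehouse", "gs://my-bucket/warehouse"))
        .build();

// For a streaming pipeline, a longer triggering frequency generally means fewer, larger data
// files and fewer snapshots, as noted in the Javadoc above.
rows.apply(
    IcebergIO.writeRows(catalog)
        .to(TableIdentifier.of("db", "events")) // assumed table identifier
        .withTriggeringFrequency(Duration.standardSeconds(60)));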
@@ -19,6 +19,7 @@

import java.io.IOException;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
@@ -38,6 +39,8 @@ class RecordWriter {
private static final Logger LOG = LoggerFactory.getLogger(RecordWriter.class);
private final Counter activeIcebergWriters =
Metrics.counter(RecordWriterManager.class, "activeIcebergWriters");
private final Distribution dataFileByteSize =
Metrics.distribution(RecordWriter.class, "dataFileByteSize");
private final DataWriter<Record> icebergDataWriter;
private final Table table;
private final String absoluteFilename;
@@ -95,7 +98,7 @@ class RecordWriter {
}
activeIcebergWriters.inc();
LOG.info(
"Opened {} writer for table {}, partition {}. Writing to path: {}",
"Opened {} writer for table '{}', partition {}. Writing to path: {}",
fileFormat,
table.name(),
partitionKey,
@@ -117,7 +120,15 @@ public void close() throws IOException {
e);
}
activeIcebergWriters.dec();
LOG.info("Closed {} writer for table {}, path: {}", fileFormat, table.name(), absoluteFilename);
DataFile dataFile = icebergDataWriter.toDataFile();
LOG.info(
"Closed {} writer for table '{}' ({} records, {} bytes), path: {}",
fileFormat,
table.name(),
dataFile.recordCount(),
dataFile.fileSizeInBytes(),
absoluteFilename);
dataFileByteSize.update(dataFile.fileSizeInBytes());
}

public long bytesWritten() {
@@ -24,11 +24,11 @@
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.ShardedKey;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.ShardedKey;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.catalog.Catalog;
@@ -38,7 +38,7 @@ class WriteGroupedRowsToFiles
extends PTransform<
PCollection<KV<ShardedKey<String>, Iterable<Row>>>, PCollection<FileWriteResult>> {

static final long DEFAULT_MAX_BYTES_PER_FILE = (1L << 40); // 1TB
private static final long DEFAULT_MAX_BYTES_PER_FILE = (1L << 29); // 512mb

private final DynamicDestinations dynamicDestinations;
private final IcebergCatalogConfig catalogConfig;
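WriteGroupedRowsToFiles consumes KV<ShardedKey<String>, Iterable<Row>>, which matches the output shape of GroupIntoBatches.withShardedKey(); that pairing is what lets the runner auto-shard writes for hot destinations. A rough sketch of such a grouping stage, where the batch size and buffering duration are illustrative assumptions rather than the connector's actual defaults:

import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.util.ShardedKey;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.joda.time.Duration;

// 'keyed' is a PCollection<KV<String, Row>> keyed by destination table (see AssignDestinations).
PCollection<KV<ShardedKey<String>, Iterable<Row>>> grouped =
    keyed.apply(
        "Group Into Batches",
        GroupIntoBatches.<String, Row>ofByteSize(1L << 20) // assumed ~1 MB batches
            .withMaxBufferingDuration(Duration.standardSeconds(60)) // assumed flush interval
            .withShardedKey());

Each batch is then written out as one or more data files, with individual file size presumably capped by the DEFAULT_MAX_BYTES_PER_FILE constant shown above.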