Skip to content

Commit

Permalink
Merge branch 'iceberg_autosharding' of https://github.com/ahmedabu98/…
Browse files Browse the repository at this point in the history
…beam into cp32612
  • Loading branch information
ahmedabu98 committed Oct 4, 2024
2 parents cca83d2 + 9955868 commit 1889d4d
Show file tree
Hide file tree
Showing 9 changed files with 149 additions and 178 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@

* Added support for using vLLM in the RunInference transform (Python) ([#32528](https://github.com/apache/beam/issues/32528))
* [Managed Iceberg] Added support for streaming writes ([#32451](https://github.com/apache/beam/pull/32451))
* [Managed Iceberg] Added auto-sharding for streaming writes ([#32612](https://github.com/apache/beam/pull/32612))
* [Managed Iceberg] Added support for writing to dynamic destinations ([#32565](https://github.com/apache/beam/pull/32565))

## New Features / Improvements
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public void processElement(
}
update.commit();
Snapshot snapshot = table.currentSnapshot();
LOG.info("Created new snapshot for table '{}': {}.", element.getKey(), snapshot);
LOG.info("Created new snapshot for table '{}': {}", element.getKey(), snapshot);
snapshotsCreated.inc();
out.outputWithTimestamp(
KV.of(element.getKey(), SnapshotInfo.fromSnapshot(snapshot)), window.maxTimestamp());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@
*/
package org.apache.beam.sdk.io.iceberg;

import static org.apache.beam.sdk.io.iceberg.WriteToDestinations.DATA;
import static org.apache.beam.sdk.io.iceberg.WriteToDestinations.DEST;

import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.ValueInSingleWindow;
Expand All @@ -37,7 +37,7 @@
* <p>The output record will have the format { dest: ..., data: ...} where the dest field has the
* assigned metadata and the data field has the original row.
*/
class AssignDestinations extends PTransform<PCollection<Row>, PCollection<Row>> {
class AssignDestinations extends PTransform<PCollection<Row>, PCollection<KV<String, Row>>> {

private final DynamicDestinations dynamicDestinations;

Expand All @@ -46,34 +46,27 @@ public AssignDestinations(DynamicDestinations dynamicDestinations) {
}

@Override
public PCollection<Row> expand(PCollection<Row> input) {

final Schema outputSchema =
Schema.builder()
.addStringField(DEST)
.addRowField(DATA, dynamicDestinations.getDataSchema())
.build();

public PCollection<KV<String, Row>> expand(PCollection<Row> input) {
return input
.apply(
ParDo.of(
new DoFn<Row, Row>() {
new DoFn<Row, KV<String, Row>>() {
@ProcessElement
public void processElement(
@Element Row element,
BoundedWindow window,
PaneInfo paneInfo,
@Timestamp Instant timestamp,
OutputReceiver<Row> out) {
OutputReceiver<KV<String, Row>> out) {
String tableIdentifier =
dynamicDestinations.getTableStringIdentifier(
ValueInSingleWindow.of(element, timestamp, window, paneInfo));
Row data = dynamicDestinations.getData(element);

out.output(
Row.withSchema(outputSchema).addValues(tableIdentifier, data).build());
out.output(KV.of(tableIdentifier, data));
}
}))
.setRowSchema(outputSchema);
.setCoder(
KvCoder.of(StringUtf8Coder.of(), RowCoder.of(dynamicDestinations.getDataSchema())));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
*/
package org.apache.beam.sdk.io.iceberg;

import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;

import com.google.auto.value.AutoValue;
Expand All @@ -28,12 +27,6 @@
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.AfterFirst;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
Expand Down Expand Up @@ -288,7 +281,6 @@ public static WriteRows writeRows(IcebergCatalogConfig catalog) {

@AutoValue
public abstract static class WriteRows extends PTransform<PCollection<Row>, IcebergWriteResult> {
private static final int TRIGGERING_RECORD_COUNT = 50_000;

abstract IcebergCatalogConfig getCatalogConfig();

Expand Down Expand Up @@ -322,12 +314,14 @@ public WriteRows to(DynamicDestinations destinations) {
}

/**
* Sets the frequency at which data is committed and a new {@link org.apache.iceberg.Snapshot}
* is produced.
* Sets the frequency at which data is written to files and a new {@link
* org.apache.iceberg.Snapshot} is produced.
*
* <p>Roughly every triggeringFrequency duration, this connector will try to accumulate all
* {@link org.apache.iceberg.ManifestFile}s and commit them to the table as appended files. Each
* commit results in a new table {@link org.apache.iceberg.Snapshot}.
* <p>Roughly every triggeringFrequency duration, records are written to data files and appended
* to the respective table. Each append operation created a new table snapshot.
*
* <p>Generally speaking, increasing this duration will result in fewer, larger data files and
* fewer snapshots.
*
* <p>This is only applicable when writing an unbounded {@link PCollection} (i.e. a streaming
* pipeline).
Expand All @@ -350,34 +344,13 @@ public IcebergWriteResult expand(PCollection<Row> input) {
Preconditions.checkNotNull(getTableIdentifier()), input.getSchema());
}

// Assign destinations before re-windowing to global because
// Assign destinations before re-windowing to global in WriteToDestinations because
// user's dynamic destination may depend on windowing properties
PCollection<Row> assignedRows =
input.apply("Set Destination Metadata", new AssignDestinations(destinations));

if (assignedRows.isBounded().equals(PCollection.IsBounded.UNBOUNDED)) {
Duration triggeringFrequency = getTriggeringFrequency();
checkArgumentNotNull(
triggeringFrequency, "Streaming pipelines must set a triggering frequency.");
assignedRows =
assignedRows.apply(
"WindowIntoGlobal",
Window.<Row>into(new GlobalWindows())
.triggering(
Repeatedly.forever(
AfterFirst.of(
AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(triggeringFrequency),
AfterPane.elementCountAtLeast(TRIGGERING_RECORD_COUNT))))
.discardingFiredPanes());
} else {
Preconditions.checkArgument(
getTriggeringFrequency() == null,
"Triggering frequency is only applicable for streaming pipelines.");
}
return assignedRows.apply(
"Write Rows to Destinations",
new WriteToDestinations(getCatalogConfig(), destinations, getTriggeringFrequency()));
return input
.apply("Assign Table Destinations", new AssignDestinations(destinations))
.apply(
"Write Rows to Destinations",
new WriteToDestinations(getCatalogConfig(), destinations, getTriggeringFrequency()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.IOException;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
Expand All @@ -38,6 +39,8 @@ class RecordWriter {
private static final Logger LOG = LoggerFactory.getLogger(RecordWriter.class);
private final Counter activeIcebergWriters =
Metrics.counter(RecordWriterManager.class, "activeIcebergWriters");
private final Distribution dataFileByteSize =
Metrics.distribution(RecordWriter.class, "dataFileByteSize");
private final DataWriter<Record> icebergDataWriter;
private final Table table;
private final String absoluteFilename;
Expand Down Expand Up @@ -95,7 +98,7 @@ class RecordWriter {
}
activeIcebergWriters.inc();
LOG.info(
"Opened {} writer for table {}, partition {}. Writing to path: {}",
"Opened {} writer for table '{}', partition {}. Writing to path: {}",
fileFormat,
table.name(),
partitionKey,
Expand All @@ -117,7 +120,15 @@ public void close() throws IOException {
e);
}
activeIcebergWriters.dec();
LOG.info("Closed {} writer for table {}, path: {}", fileFormat, table.name(), absoluteFilename);
DataFile dataFile = icebergDataWriter.toDataFile();
LOG.info(
"Closed {} writer for table '{}' ({} records, {} bytes), path: {}",
fileFormat,
table.name(),
dataFile.recordCount(),
dataFile.fileSizeInBytes(),
absoluteFilename);
dataFileByteSize.update(dataFile.fileSizeInBytes());
}

public long bytesWritten() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.ShardedKey;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.ShardedKey;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.catalog.Catalog;
Expand All @@ -38,7 +38,7 @@ class WriteGroupedRowsToFiles
extends PTransform<
PCollection<KV<ShardedKey<String>, Iterable<Row>>>, PCollection<FileWriteResult>> {

static final long DEFAULT_MAX_BYTES_PER_FILE = (1L << 40); // 1TB
private static final long DEFAULT_MAX_BYTES_PER_FILE = (1L << 29); // 512mb

private final DynamicDestinations dynamicDestinations;
private final IcebergCatalogConfig catalogConfig;
Expand Down
Loading

0 comments on commit 1889d4d

Please sign in to comment.