Support Managed Iceberg streaming writes (apache#32451)
* iceberg streaming writes

* cleanup

* address comments
ahmedabu98 authored Sep 19, 2024
1 parent ec307a5 commit 75a4637
Showing 12 changed files with 355 additions and 42 deletions.
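To frame the file-by-file changes below, here is a minimal sketch of what this commit enables from the user side: writing an unbounded stream of Beam Rows to an Iceberg table through the Managed API, with a periodic commit cadence. The table name, catalog settings, and 30-second frequency are illustrative assumptions, not values taken from this commit.

```java
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.joda.time.Duration;

public class ManagedIcebergStreamingWrite {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create();

    Schema schema = Schema.builder().addInt64Field("id").build();

    // Unbounded source (one element per second) so the pipeline runs in streaming mode.
    PCollection<Row> rows =
        pipeline
            .apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(1)))
            .apply(
                MapElements.into(TypeDescriptors.rows())
                    .via((Long id) -> Row.withSchema(schema).addValue(id).build()))
            .setRowSchema(schema);

    // Hypothetical table and catalog values; replace with your own.
    Map<String, Object> config =
        ImmutableMap.<String, Object>of(
            "table", "db.events",
            "catalog_name", "my_catalog",
            "catalog_properties",
                ImmutableMap.of("type", "hadoop", "warehouse", "gs://my-bucket/warehouse"),
            "triggering_frequency_seconds", 30);

    // With a triggering frequency set, appended files are committed (a new snapshot)
    // roughly every 30 seconds.
    PCollectionRowTuple.of("input", rows)
        .apply(Managed.write(Managed.ICEBERG).withConfig(config));

    pipeline.run();
  }
}
```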
2 changes: 1 addition & 1 deletion .github/trigger_files/IO_Iceberg_Integration_Tests.json
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 1
"modification": 2
}
3 changes: 1 addition & 2 deletions sdks/java/io/expansion-service/build.gradle
@@ -47,8 +47,7 @@ dependencies {
// **** IcebergIO runtime dependencies ****
runtimeOnly library.java.hadoop_client
// Needed when using GCS as the warehouse location.
implementation library.java.bigdataoss_gcs_connector
permitUnusedDeclared library.java.bigdataoss_gcs_connector
runtimeOnly library.java.bigdataoss_gcs_connector
// Needed for HiveCatalog
runtimeOnly ("org.apache.iceberg:iceberg-hive-metastore:1.4.2")
runtimeOnly project(path: ":sdks:java:io:iceberg:hive:exec", configuration: "shadow")
15 changes: 11 additions & 4 deletions sdks/java/io/iceberg/hive/exec/build.gradle
@@ -39,10 +39,17 @@ artifacts {
shadowJar {
zip64 true

relocate 'com.google.common', getJavaRelocatedPath('iceberg.hive.com.google.common')
relocate 'com.google.protobuf', getJavaRelocatedPath('iceberg.hive.com.google.protobuf')
relocate 'shaded.parquet', getJavaRelocatedPath('iceberg.hive.shaded.parquet')
relocate 'org.apache.parquet', getJavaRelocatedPath('iceberg.hive.org.apache.parquet')
def problematicPackages = [
'com.google.protobuf',
'com.google.common',
'shaded.parquet',
'org.apache.parquet',
'org.joda'
]

problematicPackages.forEach {
relocate it, getJavaRelocatedPath("iceberg.hive.${it}")
}

version "3.1.3"
mergeServiceFiles()
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java
@@ -19,6 +19,8 @@

import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
@@ -29,14 +31,17 @@
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class AppendFilesToTables
extends PTransform<PCollection<FileWriteResult>, PCollection<KV<String, SnapshotInfo>>> {

private static final Logger LOG = LoggerFactory.getLogger(AppendFilesToTables.class);
private final IcebergCatalogConfig catalogConfig;

AppendFilesToTables(IcebergCatalogConfig catalogConfig) {
@@ -66,6 +71,8 @@ public String apply(FileWriteResult input) {

private static class AppendFilesToTablesDoFn
extends DoFn<KV<String, Iterable<FileWriteResult>>, KV<String, SnapshotInfo>> {
private final Counter snapshotsCreated =
Metrics.counter(AppendFilesToTables.class, "snapshotsCreated");

private final IcebergCatalogConfig catalogConfig;

@@ -87,15 +94,21 @@ public void processElement(
@Element KV<String, Iterable<FileWriteResult>> element,
OutputReceiver<KV<String, SnapshotInfo>> out,
BoundedWindow window) {
if (!element.getValue().iterator().hasNext()) {
return;
}

Table table = getCatalog().loadTable(TableIdentifier.parse(element.getKey()));
AppendFiles update = table.newAppend();
for (FileWriteResult writtenFile : element.getValue()) {
update.appendManifest(writtenFile.getManifestFile());
}
update.commit();
Snapshot snapshot = table.currentSnapshot();
LOG.info("Created new snapshot for table '{}': {}.", element.getKey(), snapshot);
snapshotsCreated.inc();
out.outputWithTimestamp(
KV.of(element.getKey(), SnapshotInfo.fromSnapshot(table.currentSnapshot())),
window.maxTimestamp());
KV.of(element.getKey(), SnapshotInfo.fromSnapshot(snapshot)), window.maxTimestamp());
}
}
}
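The new snapshotsCreated counter, together with the log line above, makes commit activity observable. As a sketch (not part of this commit) of one way to read it back after running a pipeline — assuming the runner reports attempted metrics, and using the class's fully-qualified name as the namespace since AppendFilesToTables is package-private:

```java
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricsFilter;

class SnapshotMetricsSketch {
  static void printSnapshotsCreated(PipelineResult result) {
    // Query the counter by namespace (the declaring class) and name.
    MetricQueryResults metrics =
        result
            .metrics()
            .queryMetrics(
                MetricsFilter.builder()
                    .addNameFilter(
                        MetricNameFilter.named(
                            "org.apache.beam.sdk.io.iceberg.AppendFilesToTables",
                            "snapshotsCreated"))
                    .build());
    for (MetricResult<Long> counter : metrics.getCounters()) {
      // getAttempted() is supported broadly; getCommitted() may throw on some runners.
      System.out.println(counter.getName() + " = " + counter.getAttempted());
    }
  }
}
```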
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.io.iceberg;

import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;

import com.google.auto.value.AutoValue;
@@ -25,6 +26,12 @@
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.AfterFirst;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
@@ -33,6 +40,7 @@
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;

/**
* The underlying Iceberg connector used by {@link org.apache.beam.sdk.managed.Managed#ICEBERG}. Not
@@ -49,13 +57,16 @@ public static WriteRows writeRows(IcebergCatalogConfig catalog) {

@AutoValue
public abstract static class WriteRows extends PTransform<PCollection<Row>, IcebergWriteResult> {
private static final int TRIGGERING_RECORD_COUNT = 50_000;

abstract IcebergCatalogConfig getCatalogConfig();

abstract @Nullable TableIdentifier getTableIdentifier();

abstract @Nullable DynamicDestinations getDynamicDestinations();

abstract @Nullable Duration getTriggeringFrequency();

abstract Builder toBuilder();

@AutoValue.Builder
@@ -66,6 +77,8 @@ abstract static class Builder {

abstract Builder setDynamicDestinations(DynamicDestinations destinations);

abstract Builder setTriggeringFrequency(Duration triggeringFrequency);

abstract WriteRows build();
}

@@ -77,6 +90,21 @@ public WriteRows to(DynamicDestinations destinations) {
return toBuilder().setDynamicDestinations(destinations).build();
}

/**
* Sets the frequency at which data is committed and a new {@link org.apache.iceberg.Snapshot}
* is produced.
*
* <p>Roughly every triggeringFrequency duration, this connector will try to accumulate all
* {@link org.apache.iceberg.ManifestFile}s and commit them to the table as appended files. Each
* commit results in a new table {@link org.apache.iceberg.Snapshot}.
*
* <p>This is only applicable when writing an unbounded {@link PCollection} (i.e. a streaming
* pipeline).
*/
public WriteRows withTriggeringFrequency(Duration triggeringFrequency) {
return toBuilder().setTriggeringFrequency(triggeringFrequency).build();
}

@Override
public IcebergWriteResult expand(PCollection<Row> input) {
List<?> allToArgs = Arrays.asList(getTableIdentifier(), getDynamicDestinations());
@@ -89,11 +117,32 @@ public IcebergWriteResult expand(PCollection<Row> input) {
destinations =
DynamicDestinations.singleTable(Preconditions.checkNotNull(getTableIdentifier()));
}

if (input.isBounded().equals(PCollection.IsBounded.UNBOUNDED)) {
Duration triggeringFrequency = getTriggeringFrequency();
checkArgumentNotNull(
triggeringFrequency, "Streaming pipelines must set a triggering frequency.");
input =
input.apply(
"WindowIntoGlobal",
Window.<Row>into(new GlobalWindows())
.triggering(
Repeatedly.forever(
AfterFirst.of(
AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(triggeringFrequency),
AfterPane.elementCountAtLeast(TRIGGERING_RECORD_COUNT))))
.discardingFiredPanes());
} else {
Preconditions.checkArgument(
getTriggeringFrequency() == null,
"Triggering frequency is only applicable for streaming pipelines.");
}
return input
.apply("Set Destination Metadata", new AssignDestinations(destinations))
.apply(
"Write Rows to Destinations",
new WriteToDestinations(getCatalogConfig(), destinations));
new WriteToDestinations(getCatalogConfig(), destinations, getTriggeringFrequency()));
}
}

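For the direct (non-Managed) API added in this file, a hedged usage sketch on an unbounded input; the table identifier and 60-second cadence are assumptions for illustration, and the rows and catalog config are expected to come from the surrounding pipeline. Per the trigger above, a pane also fires early once roughly 50,000 records (TRIGGERING_RECORD_COUNT) accumulate before the frequency elapses.

```java
import org.apache.beam.sdk.io.iceberg.IcebergCatalogConfig;
import org.apache.beam.sdk.io.iceberg.IcebergIO;
import org.apache.beam.sdk.io.iceberg.IcebergWriteResult;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.iceberg.catalog.TableIdentifier;
import org.joda.time.Duration;

class StreamingWriteSketch {
  // `rows` is expected to be UNBOUNDED; bounded inputs must not set a triggering frequency.
  static IcebergWriteResult write(PCollection<Row> rows, IcebergCatalogConfig catalogConfig) {
    return rows.apply(
        IcebergIO.writeRows(catalogConfig)
            .to(TableIdentifier.parse("db.events")) // hypothetical table
            // Commit accumulated manifest files as a new snapshot roughly every 60 seconds.
            .withTriggeringFrequency(Duration.standardSeconds(60)));
  }
}
```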
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java
@@ -17,13 +17,20 @@
*/
package org.apache.beam.sdk.io.iceberg;

import static org.apache.beam.sdk.io.iceberg.IcebergWriteSchemaTransformProvider.Configuration;

import com.google.auto.service.AutoService;
import com.google.auto.value.AutoValue;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.managed.ManagedTransformConstants;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
@@ -35,14 +42,16 @@
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.catalog.TableIdentifier;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;

/**
* SchemaTransform implementation for {@link IcebergIO#writeRows}. Writes Beam Rows to Iceberg and
* outputs a {@code PCollection<Row>} representing snapshots created in the process.
*/
@AutoService(SchemaTransformProvider.class)
public class IcebergWriteSchemaTransformProvider
extends TypedSchemaTransformProvider<SchemaTransformConfiguration> {
extends TypedSchemaTransformProvider<Configuration> {

static final String INPUT_TAG = "input";
static final String OUTPUT_TAG = "output";
@@ -57,8 +66,55 @@ public String description() {
+ "{\"table\" (str), \"operation\" (str), \"summary\" (map[str, str]), \"manifestListLocation\" (str)}";
}

@DefaultSchema(AutoValueSchema.class)
@AutoValue
public abstract static class Configuration {
public static Builder builder() {
return new AutoValue_IcebergWriteSchemaTransformProvider_Configuration.Builder();
}

@SchemaFieldDescription("Identifier of the Iceberg table.")
public abstract String getTable();

@SchemaFieldDescription("Name of the catalog containing the table.")
public abstract @Nullable String getCatalogName();

@SchemaFieldDescription("Properties used to set up the Iceberg catalog.")
public abstract @Nullable Map<String, String> getCatalogProperties();

@SchemaFieldDescription("Properties passed to the Hadoop Configuration.")
public abstract @Nullable Map<String, String> getConfigProperties();

@SchemaFieldDescription(
"For a streaming pipeline, sets the frequency at which snapshots are produced.")
public abstract @Nullable Integer getTriggeringFrequencySeconds();

@AutoValue.Builder
public abstract static class Builder {
public abstract Builder setTable(String table);

public abstract Builder setCatalogName(String catalogName);

public abstract Builder setCatalogProperties(Map<String, String> catalogProperties);

public abstract Builder setConfigProperties(Map<String, String> confProperties);

public abstract Builder setTriggeringFrequencySeconds(Integer triggeringFrequencySeconds);

public abstract Configuration build();
}

public IcebergCatalogConfig getIcebergCatalog() {
return IcebergCatalogConfig.builder()
.setCatalogName(getCatalogName())
.setCatalogProperties(getCatalogProperties())
.setConfigProperties(getConfigProperties())
.build();
}
}

@Override
protected SchemaTransform from(SchemaTransformConfiguration configuration) {
protected SchemaTransform from(Configuration configuration) {
return new IcebergWriteSchemaTransform(configuration);
}

@@ -78,9 +134,9 @@ public String identifier() {
}

static class IcebergWriteSchemaTransform extends SchemaTransform {
private final SchemaTransformConfiguration configuration;
private final Configuration configuration;

IcebergWriteSchemaTransform(SchemaTransformConfiguration configuration) {
IcebergWriteSchemaTransform(Configuration configuration) {
this.configuration = configuration;
}

@@ -89,7 +145,7 @@ Row getConfigurationRow() {
// To stay consistent with our SchemaTransform configuration naming conventions,
// we sort lexicographically and convert field names to snake_case
return SchemaRegistry.createDefault()
.getToRowFunction(SchemaTransformConfiguration.class)
.getToRowFunction(Configuration.class)
.apply(configuration)
.sorted()
.toSnakeCase();
@@ -102,11 +158,17 @@ Row getConfigurationRow() {
public PCollectionRowTuple expand(PCollectionRowTuple input) {
PCollection<Row> rows = input.get(INPUT_TAG);

IcebergIO.WriteRows writeTransform =
IcebergIO.writeRows(configuration.getIcebergCatalog())
.to(TableIdentifier.parse(configuration.getTable()));

Integer trigFreq = configuration.getTriggeringFrequencySeconds();
if (trigFreq != null) {
writeTransform = writeTransform.withTriggeringFrequency(Duration.standardSeconds(trigFreq));
}

// TODO: support dynamic destinations
IcebergWriteResult result =
rows.apply(
IcebergIO.writeRows(configuration.getIcebergCatalog())
.to(TableIdentifier.parse(configuration.getTable())));
IcebergWriteResult result = rows.apply(writeTransform);

PCollection<Row> snapshots =
result
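Because the configuration row is converted to snake_case, the Managed-level keys for this transform are table, catalog_name, catalog_properties, config_properties, and triggering_frequency_seconds. Below is a hedged sketch of consuming the snapshot summaries the transform emits under its "output" tag; the field names follow the provider's description string, and the rows/config arguments are assumed to be built as in the earlier sketch.

```java
import java.util.Map;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;

class SnapshotOutputSketch {
  static PCollection<String> writeAndDescribeSnapshots(
      PCollection<Row> rows, Map<String, Object> config) {
    PCollectionRowTuple result =
        PCollectionRowTuple.of("input", rows)
            .apply(Managed.write(Managed.ICEBERG).withConfig(config));

    // Each output row describes one committed snapshot:
    // {"table", "operation", "summary", "manifestListLocation"}.
    return result
        .get("output")
        .apply(
            MapElements.into(TypeDescriptors.strings())
                .via(
                    (Row snapshot) ->
                        snapshot.getString("operation") + " -> " + snapshot.getString("table")));
  }
}
```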
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java
@@ -36,7 +36,8 @@

class RecordWriter {
private static final Logger LOG = LoggerFactory.getLogger(RecordWriter.class);
private final Counter activeWriters = Metrics.counter(RecordWriterManager.class, "activeWriters");
private final Counter activeIcebergWriters =
Metrics.counter(RecordWriterManager.class, "activeIcebergWriters");
private final DataWriter<Record> icebergDataWriter;
private final Table table;
private final String absoluteFilename;
@@ -92,7 +93,7 @@ class RecordWriter {
default:
throw new RuntimeException("Unknown File Format: " + fileFormat);
}
activeWriters.inc();
activeIcebergWriters.inc();
LOG.info(
"Opened {} writer for table {}, partition {}. Writing to path: {}",
fileFormat,
@@ -115,7 +116,7 @@ public void close() throws IOException {
fileFormat, table.name(), absoluteFilename),
e);
}
activeWriters.dec();
activeIcebergWriters.dec();
LOG.info("Closed {} writer for table {}, path: {}", fileFormat, table.name(), absoluteFilename);
}
