Skip to content

Commit

Permalink
[Managed Iceberg] Support writing to partitioned tables (#32102)
Browse files Browse the repository at this point in the history
* support writing partitioned data

* trigger integration tests

* partitioned record writer to manage writers for different partitions

* partitioned record writer

* reject rows when we are saturated with record writers

* refactor record writer manager

* add tests

* add more tests

* make record writer manager transient

* clean up test path

* cleanup

* cleanup

* address comments

* revert readability change

* add to changes md
  • Loading branch information
ahmedabu98 authored Aug 16, 2024
1 parent f1e2147 commit c9ad32e
Show file tree
Hide file tree
Showing 8 changed files with 843 additions and 303 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@

* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* Improvements to the performance of BigqueryIO when using withPropagateSuccessfulStorageApiWrites(true) method (Java) ([#31840](https://github.com/apache/beam/pull/31840)).
* [Managed Iceberg] Added support for writing to partitioned tables ([#32102](https://github.com/apache/beam/pull/32102))

## New Features / Improvements

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,12 @@
*/
package org.apache.beam.sdk.io.iceberg;

import static org.apache.beam.sdk.io.iceberg.IcebergUtils.beamRowToIcebergRecord;

import java.io.IOException;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestWriter;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.Table;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.catalog.Catalog;
Expand All @@ -34,23 +31,37 @@
import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.parquet.Parquet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class RecordWriter {

private static final Logger LOG = LoggerFactory.getLogger(RecordWriter.class);
private final Counter activeWriters = Metrics.counter(RecordWriterManager.class, "activeWriters");
private final DataWriter<Record> icebergDataWriter;

private final Table table;
private final String absoluteFilename;
private final FileFormat fileFormat;

RecordWriter(Catalog catalog, IcebergDestination destination, String filename)
RecordWriter(
Catalog catalog, IcebergDestination destination, String filename, PartitionKey partitionKey)
throws IOException {
this(
catalog.loadTable(destination.getTableIdentifier()), destination.getFileFormat(), filename);
catalog.loadTable(destination.getTableIdentifier()),
destination.getFileFormat(),
filename,
partitionKey);
}

RecordWriter(Table table, FileFormat fileFormat, String filename) throws IOException {
RecordWriter(Table table, FileFormat fileFormat, String filename, PartitionKey partitionKey)
throws IOException {
this.table = table;
this.absoluteFilename = table.location() + "/" + filename;
this.fileFormat = fileFormat;
if (table.spec().isUnpartitioned()) {
absoluteFilename = table.locationProvider().newDataLocation(filename);
} else {
absoluteFilename =
table.locationProvider().newDataLocation(table.spec(), partitionKey, filename);
}
OutputFile outputFile = table.io().newOutputFile(absoluteFilename);

switch (fileFormat) {
Expand All @@ -60,6 +71,7 @@ class RecordWriter {
.createWriterFunc(org.apache.iceberg.data.avro.DataWriter::create)
.schema(table.schema())
.withSpec(table.spec())
.withPartition(partitionKey)
.overwrite()
.build();
break;
Expand All @@ -69,6 +81,7 @@ class RecordWriter {
.createWriterFunc(GenericParquetWriter::buildWriter)
.schema(table.schema())
.withSpec(table.spec())
.withPartition(partitionKey)
.overwrite()
.build();
break;
Expand All @@ -77,34 +90,38 @@ class RecordWriter {
default:
throw new RuntimeException("Unknown File Format: " + fileFormat);
}
activeWriters.inc();
LOG.info(
"Opened {} writer for table {}, partition {}. Writing to path: {}",
fileFormat,
table.name(),
partitionKey,
absoluteFilename);
}

public void write(Row row) {
Record record = beamRowToIcebergRecord(table.schema(), row);
public void write(Record record) {
icebergDataWriter.write(record);
}

public void close() throws IOException {
icebergDataWriter.close();
}

public Table getTable() {
return table;
try {
icebergDataWriter.close();
} catch (IOException e) {
throw new IOException(
String.format(
"Failed to close %s writer for table %s, path: %s",
fileFormat, table.name(), absoluteFilename),
e);
}
activeWriters.dec();
LOG.info("Closed {} writer for table {}, path: {}", fileFormat, table.name(), absoluteFilename);
}

public long bytesWritten() {
return icebergDataWriter.length();
}

public ManifestFile getManifestFile() throws IOException {
String manifestFilename = FileFormat.AVRO.addExtension(absoluteFilename + ".manifest");
OutputFile outputFile = table.io().newOutputFile(manifestFilename);
ManifestWriter<DataFile> manifestWriter;
try (ManifestWriter<DataFile> openWriter = ManifestFiles.write(getTable().spec(), outputFile)) {
openWriter.add(icebergDataWriter.toDataFile());
manifestWriter = openWriter;
}

return manifestWriter.toManifestFile();
public DataFile getDataFile() {
return icebergDataWriter.toDataFile();
}
}
Loading

0 comments on commit c9ad32e

Please sign in to comment.