From c9ad32ea8f44f6559222a8af274cdf552c6ba54c Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud <65791736+ahmedabu98@users.noreply.github.com> Date: Fri, 16 Aug 2024 08:34:51 -0700 Subject: [PATCH] [Managed Iceberg] Support writing to partitioned tables (#32102) * support writing partitioned data * trigger integration tests * partitioned record writer to manage writers for different partitions * partitioned record writer * reject rows when we are saturated with record writers * refactor record writer manager * add tests * add more tests * make record writer manager transient * clean up test path * cleanup * cleanup * address comments * revert readability change * add to changes md --- CHANGES.md | 1 + .../beam/sdk/io/iceberg/RecordWriter.java | 75 +++-- .../sdk/io/iceberg/RecordWriterManager.java | 298 ++++++++++++++++++ .../io/iceberg/WriteGroupedRowsToFiles.java | 49 ++- .../io/iceberg/WriteUngroupedRowsToFiles.java | 164 +++------- .../beam/sdk/io/iceberg/IcebergIOIT.java | 285 +++++++++-------- .../io/iceberg/RecordWriterManagerTest.java | 266 ++++++++++++++++ .../sdk/io/iceberg/TestDataWarehouse.java | 8 +- 8 files changed, 843 insertions(+), 303 deletions(-) create mode 100644 sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java create mode 100644 sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java diff --git a/CHANGES.md b/CHANGES.md index bb5e1f18db3e..c64042ecd8c9 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -64,6 +64,7 @@ * Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). * Improvements to the performance of BigqueryIO when using withPropagateSuccessfulStorageApiWrites(true) method (Java) ([#31840](https://github.com/apache/beam/pull/31840)). 
+* [Managed Iceberg] Added support for writing to partitioned tables ([#32102](https://github.com/apache/beam/pull/32102)) ## New Features / Improvements diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java index d7212783d1b6..576d1e32c463 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java @@ -17,15 +17,12 @@ */ package org.apache.beam.sdk.io.iceberg; -import static org.apache.beam.sdk.io.iceberg.IcebergUtils.beamRowToIcebergRecord; - import java.io.IOException; -import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; import org.apache.iceberg.DataFile; import org.apache.iceberg.FileFormat; -import org.apache.iceberg.ManifestFile; -import org.apache.iceberg.ManifestFiles; -import org.apache.iceberg.ManifestWriter; +import org.apache.iceberg.PartitionKey; import org.apache.iceberg.Table; import org.apache.iceberg.avro.Avro; import org.apache.iceberg.catalog.Catalog; @@ -34,23 +31,37 @@ import org.apache.iceberg.io.DataWriter; import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.parquet.Parquet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; class RecordWriter { - + private static final Logger LOG = LoggerFactory.getLogger(RecordWriter.class); + private final Counter activeWriters = Metrics.counter(RecordWriterManager.class, "activeWriters"); private final DataWriter icebergDataWriter; - private final Table table; private final String absoluteFilename; + private final FileFormat fileFormat; - RecordWriter(Catalog catalog, IcebergDestination destination, String filename) + RecordWriter( + Catalog catalog, IcebergDestination destination, String filename, PartitionKey partitionKey) throws IOException { this( - catalog.loadTable(destination.getTableIdentifier()), destination.getFileFormat(), filename); + catalog.loadTable(destination.getTableIdentifier()), + destination.getFileFormat(), + filename, + partitionKey); } - RecordWriter(Table table, FileFormat fileFormat, String filename) throws IOException { + RecordWriter(Table table, FileFormat fileFormat, String filename, PartitionKey partitionKey) + throws IOException { this.table = table; - this.absoluteFilename = table.location() + "/" + filename; + this.fileFormat = fileFormat; + if (table.spec().isUnpartitioned()) { + absoluteFilename = table.locationProvider().newDataLocation(filename); + } else { + absoluteFilename = + table.locationProvider().newDataLocation(table.spec(), partitionKey, filename); + } OutputFile outputFile = table.io().newOutputFile(absoluteFilename); switch (fileFormat) { @@ -60,6 +71,7 @@ class RecordWriter { .createWriterFunc(org.apache.iceberg.data.avro.DataWriter::create) .schema(table.schema()) .withSpec(table.spec()) + .withPartition(partitionKey) .overwrite() .build(); break; @@ -69,6 +81,7 @@ class RecordWriter { .createWriterFunc(GenericParquetWriter::buildWriter) .schema(table.schema()) .withSpec(table.spec()) + .withPartition(partitionKey) .overwrite() .build(); break; @@ -77,34 +90,38 @@ class RecordWriter { default: throw new RuntimeException("Unknown File Format: " + fileFormat); } + activeWriters.inc(); + LOG.info( + "Opened {} writer for table {}, partition {}. 
Writing to path: {}", + fileFormat, + table.name(), + partitionKey, + absoluteFilename); } - public void write(Row row) { - Record record = beamRowToIcebergRecord(table.schema(), row); + public void write(Record record) { icebergDataWriter.write(record); } public void close() throws IOException { - icebergDataWriter.close(); - } - - public Table getTable() { - return table; + try { + icebergDataWriter.close(); + } catch (IOException e) { + throw new IOException( + String.format( + "Failed to close %s writer for table %s, path: %s", + fileFormat, table.name(), absoluteFilename), + e); + } + activeWriters.dec(); + LOG.info("Closed {} writer for table {}, path: {}", fileFormat, table.name(), absoluteFilename); } public long bytesWritten() { return icebergDataWriter.length(); } - public ManifestFile getManifestFile() throws IOException { - String manifestFilename = FileFormat.AVRO.addExtension(absoluteFilename + ".manifest"); - OutputFile outputFile = table.io().newOutputFile(manifestFilename); - ManifestWriter manifestWriter; - try (ManifestWriter openWriter = ManifestFiles.write(getTable().spec(), outputFile)) { - openWriter.add(icebergDataWriter.toDataFile()); - manifestWriter = openWriter; - } - - return manifestWriter.toManifestFile(); + public DataFile getDataFile() { + return icebergDataWriter.toDataFile(); } } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java new file mode 100644 index 000000000000..b16f0caeb81b --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.iceberg; + +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.Preconditions; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.RemovalNotification; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.ManifestFiles; +import org.apache.iceberg.ManifestWriter; +import org.apache.iceberg.PartitionKey; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.OutputFile; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A writer that manages multiple {@link RecordWriter}s to write to multiple tables and partitions. + * Assigns one {@link DestinationState} per windowed destination. A {@link DestinationState} assigns + * one writer per partition in table destination. If the Iceberg {@link Table} is un-partitioned, + * the data is written normally using one {@link RecordWriter} (i.e. the {@link DestinationState} + * has one writer). At any given moment, the number of open data writers should be less than or + * equal to the number of total partitions (across all windowed destinations). + * + *
<p>
A {@link DestinationState} maintains its writers in a {@link Cache}. If a {@link RecordWriter} + * is inactive for 1 minute, the {@link DestinationState} will automatically close it to free up + * resources. Calling {@link #close()} on this {@link RecordWriterManager} will do the following for + * each {@link DestinationState}: + * + *

<ol>
+ *   <li>Close all underlying {@link RecordWriter}s
+ *   <li>Collect all {@link DataFile}s
+ *   <li>Create a new {@link ManifestFile} referencing these {@link DataFile}s
+ * </ol>
+ * + *
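[Editor's note] To make the close() sequence above concrete, here is a rough caller-side sketch of the manager's lifecycle, modeled on how WriteGroupedRowsToFiles drives it. It is an illustration only, not part of this patch: it assumes the code sits in the same org.apache.beam.sdk.io.iceberg package (RecordWriterManager and IcebergDestination are package-private), and the file prefix and size limit are arbitrary example values.

```java
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.Row;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.catalog.Catalog;

class RecordWriterManagerLifecycleSketch {

  /** Writes rows for one windowed destination and returns the manifest files produced for it. */
  static List<ManifestFile> writeAndCollect(
      Catalog catalog, WindowedValue<IcebergDestination> windowedDestination, Iterable<Row> rows)
      throws Exception {
    RecordWriterManager manager =
        new RecordWriterManager(
            catalog,
            UUID.randomUUID().toString(), // file prefix
            1L << 29, // max bytes per data file (example value)
            Integer.MAX_VALUE); // effectively unlimited writers, as in the grouped-rows path
    try {
      for (Row row : rows) {
        // Returns false when the manager is saturated and cannot open another writer;
        // with an unbounded writer limit this always succeeds.
        manager.write(windowedDestination, row);
      }
    } finally {
      // Closes all writers, collects their DataFiles, and writes one ManifestFile
      // per windowed destination.
      manager.close();
    }
    // Only valid after close().
    Map<WindowedValue<IcebergDestination>, List<ManifestFile>> manifests =
        manager.getManifestFiles();
    return manifests.get(windowedDestination);
  }
}
```

The grouped-rows path passes Integer.MAX_VALUE as the writer limit, so write() never rejects a record there; WriteUngroupedRowsToFiles uses DEFAULT_MAX_WRITERS_PER_BUNDLE instead and spills any rejected rows.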
<p>
After closing, the resulting {@link ManifestFile}s can be retrieved using {@link + * #getManifestFiles()}. + */ +class RecordWriterManager implements AutoCloseable { + private static final Logger LOG = LoggerFactory.getLogger(RecordWriterManager.class); + + /** + * Represents the state of one Iceberg table destination. Creates one {@link RecordWriter} per + * partition and manages them in a {@link Cache}. + * + *
<p>
On closing, each writer's output {@link DataFile} is collected. + */ + class DestinationState { + private final IcebergDestination icebergDestination; + private final PartitionSpec spec; + private final org.apache.iceberg.Schema schema; + private final PartitionKey partitionKey; + private final String tableLocation; + private final FileIO fileIO; + private final String stateToken = UUID.randomUUID().toString(); + private final List dataFiles = Lists.newArrayList(); + @VisibleForTesting final Cache writers; + @VisibleForTesting final Map writerCounts = Maps.newHashMap(); + + DestinationState(IcebergDestination icebergDestination, Table table) { + this.icebergDestination = icebergDestination; + this.schema = table.schema(); + this.spec = table.spec(); + this.partitionKey = new PartitionKey(spec, schema); + this.tableLocation = table.location(); + this.fileIO = table.io(); + + // build a cache of RecordWriters. + // writers will expire after 1 min of idle time. + // when a writer expires, its data file is collected. + this.writers = + CacheBuilder.newBuilder() + .expireAfterAccess(1, TimeUnit.MINUTES) + .removalListener( + (RemovalNotification removal) -> { + final PartitionKey pk = Preconditions.checkStateNotNull(removal.getKey()); + final RecordWriter recordWriter = + Preconditions.checkStateNotNull(removal.getValue()); + try { + recordWriter.close(); + } catch (IOException e) { + throw new RuntimeException( + String.format( + "Encountered an error when closing data writer for table '%s', partition %s", + icebergDestination.getTableIdentifier(), pk), + e); + } + openWriters--; + dataFiles.add(recordWriter.getDataFile()); + }) + .build(); + } + + /** + * Computes the partition key for this Iceberg {@link Record} and writes it using the + * appropriate {@link RecordWriter}, creating new writers as needed. + * + *
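[Editor's note] For readers less familiar with Iceberg's partitioning API, here is a small, self-contained sketch (an illustration, not part of the patch) of how a PartitionKey is derived from a Record. The two-field schema is hypothetical; the truncate and identity transforms mirror the ones used in this PR's tests.

```java
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.types.Types;

class PartitionKeySketch {
  public static void main(String[] args) {
    // Hypothetical schema; the PR's tests use larger ones.
    Schema schema =
        new Schema(
            Types.NestedField.required(1, "name", Types.StringType.get()),
            Types.NestedField.required(2, "bool", Types.BooleanType.get()));
    // Same transform types the new tests use: truncate("name", 3) + identity("bool").
    PartitionSpec spec =
        PartitionSpec.builderFor(schema).truncate("name", 3).identity("bool").build();

    PartitionKey partitionKey = new PartitionKey(spec, schema);

    GenericRecord record = GenericRecord.create(schema);
    record.setField("name", "aaa123");
    record.setField("bool", true);

    // Fill the key from the record's partition source fields.
    partitionKey.partition(record);

    // Records whose transformed values match share a partition (and therefore a RecordWriter),
    // e.g. this prints something like "name_trunc=aaa/bool=true".
    System.out.println(spec.partitionToPath(partitionKey));
  }
}
```

Because equal transformed values produce equal partition keys, the cache above can key its RecordWriters by PartitionKey.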
<p>
However, if this {@link RecordWriterManager} is already saturated with writers, and we + * can't create a new writer, the {@link Record} is rejected and {@code false} is returned. + */ + boolean write(Record record) { + partitionKey.partition(record); + + if (!writers.asMap().containsKey(partitionKey) && openWriters >= maxNumWriters) { + return false; + } + RecordWriter writer = fetchWriterForPartition(partitionKey); + writer.write(record); + return true; + } + + /** + * Checks if a viable {@link RecordWriter} already exists for this partition and returns it. If + * no {@link RecordWriter} exists or if it has reached the maximum limit of bytes written, a new + * one is created and returned. + */ + private RecordWriter fetchWriterForPartition(PartitionKey partitionKey) { + RecordWriter recordWriter = writers.getIfPresent(partitionKey); + + if (recordWriter == null || recordWriter.bytesWritten() > maxFileSize) { + // calling invalidate for a non-existent key is a safe operation + writers.invalidate(partitionKey); + recordWriter = createWriter(partitionKey); + writers.put(partitionKey, recordWriter); + } + return recordWriter; + } + + private RecordWriter createWriter(PartitionKey partitionKey) { + // keep track of how many writers we opened for each destination-partition path + // use this as a prefix to differentiate the new path. + // this avoids overwriting a data file written by a previous writer in this destination state. + int recordIndex = writerCounts.merge(partitionKey, 1, Integer::sum); + try { + RecordWriter writer = + new RecordWriter( + catalog, + icebergDestination, + filePrefix + "_" + stateToken + "_" + recordIndex, + partitionKey); + openWriters++; + return writer; + } catch (IOException e) { + throw new RuntimeException( + String.format( + "Encountered an error when creating a RecordWriter for table '%s', partition %s.", + icebergDestination.getTableIdentifier(), partitionKey), + e); + } + } + + private String getManifestFileLocation(PaneInfo paneInfo) { + return FileFormat.AVRO.addExtension( + String.format( + "%s/metadata/%s-%s-%s.manifest", + tableLocation, filePrefix, stateToken, paneInfo.getIndex())); + } + } + + private final Catalog catalog; + private final String filePrefix; + private final long maxFileSize; + private final int maxNumWriters; + @VisibleForTesting int openWriters = 0; + + @VisibleForTesting + final Map, DestinationState> destinations = Maps.newHashMap(); + + private final Map, List> totalManifestFiles = + Maps.newHashMap(); + + private boolean isClosed = false; + + RecordWriterManager(Catalog catalog, String filePrefix, long maxFileSize, int maxNumWriters) { + this.catalog = catalog; + this.filePrefix = filePrefix; + this.maxFileSize = maxFileSize; + this.maxNumWriters = maxNumWriters; + } + + /** + * Fetches the appropriate {@link RecordWriter} for this destination and partition and writes the + * record. + * + *
<p>
If the {@link RecordWriterManager} is saturated (i.e. has hit the maximum limit of open + * writers), the record is rejected and {@code false} is returned. + */ + public boolean write(WindowedValue icebergDestination, Row row) { + DestinationState destinationState = + destinations.computeIfAbsent( + icebergDestination, + destination -> { + Table table = catalog.loadTable(destination.getValue().getTableIdentifier()); + return new DestinationState(destination.getValue(), table); + }); + + Record icebergRecord = IcebergUtils.beamRowToIcebergRecord(destinationState.schema, row); + return destinationState.write(icebergRecord); + } + + /** + * Closes all remaining writers and collects all their {@link DataFile}s. Writes one {@link + * ManifestFile} per windowed table destination. + */ + @Override + public void close() throws IOException { + for (Map.Entry, DestinationState> + windowedDestinationAndState : destinations.entrySet()) { + WindowedValue windowedDestination = windowedDestinationAndState.getKey(); + DestinationState state = windowedDestinationAndState.getValue(); + + // removing writers from the state's cache will trigger the logic to collect each writer's + // data file. + state.writers.invalidateAll(); + if (state.dataFiles.isEmpty()) { + continue; + } + + OutputFile outputFile = + state.fileIO.newOutputFile(state.getManifestFileLocation(windowedDestination.getPane())); + + ManifestWriter manifestWriter; + try (ManifestWriter openWriter = ManifestFiles.write(state.spec, outputFile)) { + openWriter.addAll(state.dataFiles); + manifestWriter = openWriter; + } + ManifestFile manifestFile = manifestWriter.toManifestFile(); + + LOG.info( + "Successfully wrote manifest file, adding {} data files ({} rows) to table '{}': {}.", + manifestFile.addedFilesCount(), + manifestFile.addedRowsCount(), + windowedDestination.getValue().getTableIdentifier(), + outputFile.location()); + + totalManifestFiles + .computeIfAbsent(windowedDestination, dest -> Lists.newArrayList()) + .add(manifestFile); + + state.dataFiles.clear(); + } + destinations.clear(); + checkArgument( + openWriters == 0, + "Expected all data writers to be closed, but found %s data writer(s) still open", + openWriters); + isClosed = true; + } + + /** + * Returns a list of accumulated windowed {@link ManifestFile}s for each windowed {@link + * IcebergDestination}. The {@link RecordWriterManager} must first be closed before this is + * called. 
+ */ + public Map, List> getManifestFiles() { + checkState( + isClosed, + "Please close this %s before retrieving its manifest files.", + getClass().getSimpleName()); + return totalManifestFiles; + } +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java index c11263519442..65cc3f3c3059 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java @@ -17,15 +17,20 @@ */ package org.apache.beam.sdk.io.iceberg; -import java.io.IOException; +import java.util.List; import java.util.UUID; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.ShardedKey; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.iceberg.ManifestFile; import org.apache.iceberg.catalog.Catalog; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; @@ -59,6 +64,8 @@ private static class WriteGroupedRowsToFilesDoFn private final DynamicDestinations dynamicDestinations; private final IcebergCatalogConfig catalogConfig; private transient @MonotonicNonNull Catalog catalog; + private final String filePrefix; + private final long maxFileSize; WriteGroupedRowsToFilesDoFn( IcebergCatalogConfig catalogConfig, @@ -66,6 +73,8 @@ private static class WriteGroupedRowsToFilesDoFn long maxFileSize) { this.catalogConfig = catalogConfig; this.dynamicDestinations = dynamicDestinations; + this.filePrefix = UUID.randomUUID().toString(); + this.maxFileSize = maxFileSize; } private org.apache.iceberg.catalog.Catalog getCatalog() { @@ -75,28 +84,36 @@ private org.apache.iceberg.catalog.Catalog getCatalog() { return catalog; } - private RecordWriter createWriter(IcebergDestination destination) throws IOException { - return new RecordWriter(getCatalog(), destination, "-" + UUID.randomUUID()); - } - @ProcessElement public void processElement( - ProcessContext c, @Element KV, Iterable> element) throws Exception { + ProcessContext c, + @Element KV, Iterable> element, + BoundedWindow window, + PaneInfo pane) + throws Exception { Row destMetadata = element.getKey().getKey(); IcebergDestination destination = dynamicDestinations.instantiateDestination(destMetadata); - RecordWriter writer = createWriter(destination); - - for (Row e : element.getValue()) { - writer.write(e); + WindowedValue windowedDestination = + WindowedValue.of(destination, window.maxTimestamp(), window, pane); + RecordWriterManager writer; + try (RecordWriterManager openWriter = + new RecordWriterManager(getCatalog(), filePrefix, maxFileSize, Integer.MAX_VALUE)) { + writer = openWriter; + for (Row e : element.getValue()) { + writer.write(windowedDestination, e); + } } - writer.close(); - c.output( - FileWriteResult.builder() - .setTableIdentifier(destination.getTableIdentifier()) - .setManifestFile(writer.getManifestFile()) - .build()); + List manifestFiles = + 
Preconditions.checkNotNull(writer.getManifestFiles().get(windowedDestination)); + for (ManifestFile manifestFile : manifestFiles) { + c.output( + FileWriteResult.builder() + .setTableIdentifier(destination.getTableIdentifier()) + .setManifestFile(manifestFile) + .build()); + } } } } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteUngroupedRowsToFiles.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteUngroupedRowsToFiles.java index a00f3de4bb4e..0ca06d797750 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteUngroupedRowsToFiles.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteUngroupedRowsToFiles.java @@ -18,9 +18,7 @@ package org.apache.beam.sdk.io.iceberg; import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; -import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; -import java.io.IOException; import java.util.List; import java.util.Map; import java.util.UUID; @@ -29,6 +27,8 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.PInput; @@ -38,10 +38,11 @@ import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; +import org.apache.iceberg.ManifestFile; import org.apache.iceberg.catalog.Catalog; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; import org.checkerframework.checker.nullness.qual.Nullable; @@ -66,7 +67,7 @@ class WriteUngroupedRowsToFiles private static final TupleTag WRITTEN_ROWS_TAG = new TupleTag("writtenRows") {}; private static final TupleTag SPILLED_ROWS_TAG = new TupleTag("spilledRows") {}; - private final String fileSuffix; + private final String filePrefix; private final DynamicDestinations dynamicDestinations; private final IcebergCatalogConfig catalogConfig; @@ -74,7 +75,7 @@ class WriteUngroupedRowsToFiles IcebergCatalogConfig catalogConfig, DynamicDestinations dynamicDestinations) { this.catalogConfig = catalogConfig; this.dynamicDestinations = dynamicDestinations; - this.fileSuffix = UUID.randomUUID().toString(); + this.filePrefix = UUID.randomUUID().toString(); } @Override @@ -86,7 +87,7 @@ public Result expand(PCollection input) { new WriteUngroupedRowsToFilesDoFn( catalogConfig, dynamicDestinations, - fileSuffix, + filePrefix, DEFAULT_MAX_WRITERS_PER_BUNDLE, DEFAULT_MAX_BYTES_PER_FILE)) .withOutputTags( @@ -174,10 +175,8 @@ private static class WriteUngroupedRowsToFilesDoFn extends DoFn writers; - private transient @MonotonicNonNull Map windows; private transient @MonotonicNonNull Catalog catalog; + 
private transient @Nullable RecordWriterManager recordWriterManager; public WriteUngroupedRowsToFilesDoFn( IcebergCatalogConfig catalogConfig, @@ -192,20 +191,6 @@ public WriteUngroupedRowsToFilesDoFn( this.maxFileSize = maxFileSize; } - private Map getWriters() { - if (writers == null) { - writers = Maps.newHashMap(); - } - return writers; - } - - private Map getWindows() { - if (windows == null) { - windows = Maps.newHashMap(); - } - return windows; - } - private org.apache.iceberg.catalog.Catalog getCatalog() { if (catalog == null) { this.catalog = catalogConfig.catalog(); @@ -213,137 +198,62 @@ private org.apache.iceberg.catalog.Catalog getCatalog() { return catalog; } - private RecordWriter createAndInsertWriter(IcebergDestination destination, BoundedWindow window) - throws IOException { - RecordWriter writer = - new RecordWriter(getCatalog(), destination, filename + "-" + UUID.randomUUID()); - getWindows().put(destination, window); - getWriters().put(destination, writer); - return writer; - } - - /** - * Returns active writer for this destination if possible. If this returns null then we have - * reached the maximum number of writers and should spill any records associated. - */ - @Nullable - RecordWriter getWriterIfPossible(IcebergDestination destination, BoundedWindow window) - throws IOException { - - RecordWriter existingWriter = getWriters().get(destination); - if (existingWriter != null) { - return existingWriter; - } - - if (getWriters().size() > maxWritersPerBundle) { - return null; - } - - return createAndInsertWriter(destination, window); - } - @StartBundle - public void startBundle() {} + public void startBundle() { + recordWriterManager = + new RecordWriterManager(getCatalog(), filename, maxFileSize, maxWritersPerBundle); + } @ProcessElement - public void processElement(@Element Row element, BoundedWindow window, MultiOutputReceiver out) + public void processElement( + @Element Row element, BoundedWindow window, PaneInfo pane, MultiOutputReceiver out) throws Exception { Row data = checkArgumentNotNull(element.getRow("data"), "Input row missing `data` field."); Row destMetadata = checkArgumentNotNull(element.getRow("dest"), "Input row missing `dest` field."); IcebergDestination destination = dynamicDestinations.instantiateDestination(destMetadata); + WindowedValue windowedDestination = + WindowedValue.of(destination, window.maxTimestamp(), window, pane); - // Spill record if writer cannot be created - RecordWriter writer = getWriterIfPossible(destination, window); - if (writer == null) { - out.get(SPILLED_ROWS_TAG).output(element); - return; - } - - // Reset writer if max file size reached - if (writer.bytesWritten() > maxFileSize) { - writer.close(); - out.get(WRITTEN_FILES_TAG) - .output( - FileWriteResult.builder() - .setManifestFile(writer.getManifestFile()) - .setTableIdentifier(destination.getTableIdentifier()) - .build()); - writer = createAndInsertWriter(destination, window); - } - - // Actually write the data + // Attempt to write record. If the writer is saturated and cannot accept + // the record, spill it over to WriteGroupedRowsToFiles + boolean writeSuccess; try { - writer.write(data); - out.get(WRITTEN_ROWS_TAG).output(element); + writeSuccess = + Preconditions.checkNotNull(recordWriterManager).write(windowedDestination, data); } catch (Exception e) { try { - writer.close(); + Preconditions.checkNotNull(recordWriterManager).close(); } catch (Exception closeException) { e.addSuppressed(closeException); } throw e; } + out.get(writeSuccess ? 
WRITTEN_ROWS_TAG : SPILLED_ROWS_TAG).output(element); } @FinishBundle public void finishBundle(FinishBundleContext c) throws Exception { - closeAllWriters(); - outputFinalWrittenFiles(c); - getWriters().clear(); - } - - private void outputFinalWrittenFiles(DoFn.FinishBundleContext c) - throws Exception { - List exceptionList = Lists.newArrayList(); - for (Map.Entry entry : getWriters().entrySet()) { - try { - IcebergDestination destination = entry.getKey(); + if (recordWriterManager == null) { + return; + } + recordWriterManager.close(); + for (Map.Entry, List> destinationAndFiles : + Preconditions.checkNotNull(recordWriterManager).getManifestFiles().entrySet()) { + WindowedValue windowedDestination = destinationAndFiles.getKey(); - RecordWriter writer = entry.getValue(); - BoundedWindow window = - checkStateNotNull( - getWindows().get(destination), "internal error: no windows for destination"); + for (ManifestFile manifestFile : destinationAndFiles.getValue()) { c.output( FileWriteResult.builder() - .setManifestFile(writer.getManifestFile()) - .setTableIdentifier(destination.getTableIdentifier()) + .setManifestFile(manifestFile) + .setTableIdentifier(windowedDestination.getValue().getTableIdentifier()) .build(), - window.maxTimestamp(), - window); - } catch (Exception e) { - exceptionList.add(e); - } - } - - if (!exceptionList.isEmpty()) { - Exception e = - new IOException("Exception emitting writer metadata. See suppressed exceptions"); - for (Exception thrown : exceptionList) { - e.addSuppressed(thrown); + windowedDestination.getTimestamp(), + Iterables.getFirst(windowedDestination.getWindows(), null)); } - throw e; - } - } - - private void closeAllWriters() throws Exception { - List exceptionList = Lists.newArrayList(); - for (RecordWriter writer : getWriters().values()) { - try { - writer.close(); - } catch (Exception e) { - exceptionList.add(e); - } - } - - if (!exceptionList.isEmpty()) { - Exception e = new IOException("Exception closing some writers. 
See suppressed exceptions."); - for (Exception thrown : exceptionList) { - e.addSuppressed(thrown); - } - throw e; } + recordWriterManager = null; } } } diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java index 3a169eeb40da..2e748e9644e8 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java @@ -22,21 +22,18 @@ import java.io.IOException; import java.io.Serializable; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.UUID; import java.util.stream.Collectors; -import java.util.stream.IntStream; +import java.util.stream.LongStream; import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; import org.apache.beam.sdk.managed.Managed; -import org.apache.beam.sdk.options.Default; -import org.apache.beam.sdk.options.Description; -import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; @@ -45,12 +42,11 @@ import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.CombinedScanTask; import org.apache.iceberg.FileScanTask; -import org.apache.iceberg.Schema; +import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Table; import org.apache.iceberg.TableScan; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.TableIdentifier; -import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.Record; import org.apache.iceberg.data.parquet.GenericParquetReaders; import org.apache.iceberg.data.parquet.GenericParquetWriter; @@ -72,26 +68,76 @@ /** Integration tests for {@link IcebergIO} source and sink. 
*/ @RunWith(JUnit4.class) public class IcebergIOIT implements Serializable { + private static final org.apache.beam.sdk.schemas.Schema DOUBLY_NESTED_ROW_SCHEMA = + org.apache.beam.sdk.schemas.Schema.builder() + .addStringField("doubly_nested_str") + .addInt64Field("doubly_nested_float") + .build(); - public interface IcebergIOTestPipelineOptions extends GcpOptions { - @Description("Number of records that will be written and/or read by the test") - @Default.Integer(1000) - Integer getNumRecords(); - - void setNumRecords(Integer numRecords); - - @Description("Number of shards in the test table") - @Default.Integer(10) - Integer getNumShards(); - - void setNumShards(Integer numShards); - } + private static final org.apache.beam.sdk.schemas.Schema NESTED_ROW_SCHEMA = + org.apache.beam.sdk.schemas.Schema.builder() + .addStringField("nested_str") + .addRowField("nested_row", DOUBLY_NESTED_ROW_SCHEMA) + .addInt32Field("nested_int") + .addFloatField("nested_float") + .build(); + private static final org.apache.beam.sdk.schemas.Schema BEAM_SCHEMA = + org.apache.beam.sdk.schemas.Schema.builder() + .addStringField("str") + .addInt64Field("modulo_5") + .addBooleanField("bool") + .addInt32Field("int") + .addRowField("row", NESTED_ROW_SCHEMA) + .addArrayField("arr_long", org.apache.beam.sdk.schemas.Schema.FieldType.INT64) + .addNullableRowField("nullable_row", NESTED_ROW_SCHEMA) + .addNullableInt64Field("nullable_long") + .build(); - @Rule public TestPipeline writePipeline = TestPipeline.create(); + private static final SimpleFunction ROW_FUNC = + new SimpleFunction() { + @Override + public Row apply(Long num) { + String strNum = Long.toString(num); + Row nestedRow = + Row.withSchema(NESTED_ROW_SCHEMA) + .addValue("nested_str_value_" + strNum) + .addValue( + Row.withSchema(DOUBLY_NESTED_ROW_SCHEMA) + .addValue("doubly_nested_str_value_" + strNum) + .addValue(num) + .build()) + .addValue(Integer.valueOf(strNum)) + .addValue(Float.valueOf(strNum + "." + strNum)) + .build(); + + return Row.withSchema(BEAM_SCHEMA) + .addValue("value_" + strNum) + .addValue(num % 5) + .addValue(num % 2 == 0) + .addValue(Integer.valueOf(strNum)) + .addValue(nestedRow) + .addValue(LongStream.range(0, num % 10).boxed().collect(Collectors.toList())) + .addValue(num % 2 == 0 ? 
null : nestedRow) + .addValue(num) + .build(); + } + }; + + private static final org.apache.iceberg.Schema ICEBERG_SCHEMA = + IcebergUtils.beamSchemaToIcebergSchema(BEAM_SCHEMA); + private static final SimpleFunction RECORD_FUNC = + new SimpleFunction() { + @Override + public Record apply(Row input) { + return IcebergUtils.beamRowToIcebergRecord(ICEBERG_SCHEMA, input); + } + }; + private static final Integer NUM_RECORDS = 1000; + private static final Integer NUM_SHARDS = 10; - @Rule public TestPipeline readPipeline = TestPipeline.create(); + @Rule public TestPipeline pipeline = TestPipeline.create(); - static IcebergIOTestPipelineOptions options; + static GcpOptions options; static Configuration catalogHadoopConf; @@ -100,11 +146,11 @@ public interface IcebergIOTestPipelineOptions extends GcpOptions { private String warehouseLocation; private TableIdentifier tableId; + private Catalog catalog; @BeforeClass public static void beforeClass() { - PipelineOptionsFactory.register(IcebergIOTestPipelineOptions.class); - options = TestPipeline.testingPipelineOptions().as(IcebergIOTestPipelineOptions.class); + options = TestPipeline.testingPipelineOptions().as(GcpOptions.class); catalogHadoopConf = new Configuration(); catalogHadoopConf.set("fs.gs.project.id", options.getProject()); @@ -114,53 +160,21 @@ public static void beforeClass() { @Before public void setUp() { warehouseLocation = - String.format( - "%s/IcebergIOIT/%s/%s", - options.getTempLocation(), testName.getMethodName(), UUID.randomUUID()); + String.format("%s/IcebergIOIT/%s", options.getTempLocation(), UUID.randomUUID()); - tableId = - TableIdentifier.of( - testName.getMethodName(), "table" + Long.toString(UUID.randomUUID().hashCode(), 16)); + tableId = TableIdentifier.of(testName.getMethodName(), "test_table"); + catalog = new HadoopCatalog(catalogHadoopConf, warehouseLocation); } - static final org.apache.beam.sdk.schemas.Schema BEAM_SCHEMA = - org.apache.beam.sdk.schemas.Schema.builder() - .addInt32Field("int") - .addFloatField("float") - .addDoubleField("double") - .addInt64Field("long") - .addStringField("str") - .addBooleanField("bool") - .addByteArrayField("bytes") - .build(); - - static final Schema ICEBERG_SCHEMA = IcebergUtils.beamSchemaToIcebergSchema(BEAM_SCHEMA); - - Map getValues(int num) { - String strNum = Integer.toString(num); - return ImmutableMap.builder() - .put("int", num) - .put("float", Float.valueOf(strNum)) - .put("double", Double.valueOf(strNum)) - .put("long", Long.valueOf(strNum)) - .put("str", strNum) - .put("bool", num % 2 == 0) - .put("bytes", ByteBuffer.wrap(new byte[] {(byte) num})) - .build(); - } - - /** - * Populates the Iceberg table according to the configuration specified in {@link - * IcebergIOTestPipelineOptions}. Returns a {@link List} of expected elements. - */ - List populateTable(Table table) throws IOException { - double recordsPerShardFraction = options.getNumRecords().doubleValue() / options.getNumShards(); + /** Populates the Iceberg table and Returns a {@link List} of expected elements. 
*/ + private List populateTable(Table table) throws IOException { + double recordsPerShardFraction = NUM_RECORDS.doubleValue() / NUM_SHARDS; long maxRecordsPerShard = Math.round(Math.ceil(recordsPerShardFraction)); AppendFiles appendFiles = table.newAppend(); - List expectedRows = new ArrayList<>(options.getNumRecords()); + List expectedRows = new ArrayList<>(NUM_RECORDS); int totalRecords = 0; - for (int shardNum = 0; shardNum < options.getNumShards(); ++shardNum) { + for (int shardNum = 0; shardNum < NUM_SHARDS; ++shardNum) { String filepath = table.location() + "/" + UUID.randomUUID(); OutputFile file = table.io().newOutputFile(filepath); DataWriter writer = @@ -172,14 +186,14 @@ List populateTable(Table table) throws IOException { .build(); for (int recordNum = 0; - recordNum < maxRecordsPerShard && totalRecords < options.getNumRecords(); + recordNum < maxRecordsPerShard && totalRecords < NUM_RECORDS; ++recordNum, ++totalRecords) { - Map values = getValues(recordNum); - GenericRecord rec = GenericRecord.create(ICEBERG_SCHEMA).copy(values); - writer.write(rec); + Row expectedBeamRow = ROW_FUNC.apply((long) recordNum); + Record icebergRecord = RECORD_FUNC.apply(expectedBeamRow); - expectedRows.add(Row.withSchema(BEAM_SCHEMA).withFieldValues(values).build()); + writer.write(icebergRecord); + expectedRows.add(expectedBeamRow); } writer.close(); appendFiles.appendFile(writer.toDataFile()); @@ -189,6 +203,44 @@ List populateTable(Table table) throws IOException { return expectedRows; } + private List readRecords(Table table) { + TableScan tableScan = table.newScan().project(ICEBERG_SCHEMA); + List writtenRecords = new ArrayList<>(); + for (CombinedScanTask task : tableScan.planTasks()) { + InputFilesDecryptor descryptor = + new InputFilesDecryptor(task, table.io(), table.encryption()); + for (FileScanTask fileTask : task.files()) { + InputFile inputFile = descryptor.getInputFile(fileTask); + CloseableIterable iterable = + Parquet.read(inputFile) + .split(fileTask.start(), fileTask.length()) + .project(ICEBERG_SCHEMA) + .createReaderFunc( + fileSchema -> GenericParquetReaders.buildReader(ICEBERG_SCHEMA, fileSchema)) + .filter(fileTask.residual()) + .build(); + + for (Record rec : iterable) { + writtenRecords.add(rec); + } + } + } + return writtenRecords; + } + + private Map managedIcebergConfig() { + return ImmutableMap.builder() + .put("table", tableId.toString()) + .put("catalog_name", "test-name") + .put( + "catalog_properties", + ImmutableMap.builder() + .put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP) + .put("warehouse", warehouseLocation) + .build()) + .build(); + } + /** * Test of a predetermined moderate number of records written directly to Iceberg then read via a * Beam pipeline. 
Table initialization is done on a single process using the Iceberg APIs so the @@ -196,90 +248,63 @@ List populateTable(Table table) throws IOException { */ @Test public void testRead() throws Exception { - Catalog catalog = new HadoopCatalog(catalogHadoopConf, warehouseLocation); Table table = catalog.createTable(tableId, ICEBERG_SCHEMA); List expectedRows = populateTable(table); - Map config = - ImmutableMap.builder() - .put("table", tableId.toString()) - .put("catalog_name", "test-name") - .put( - "catalog_properties", - ImmutableMap.builder() - .put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP) - .put("warehouse", warehouseLocation) - .build()) - .build(); + Map config = managedIcebergConfig(); PCollection rows = - readPipeline.apply(Managed.read(Managed.ICEBERG).withConfig(config)).getSinglePCollection(); + pipeline.apply(Managed.read(Managed.ICEBERG).withConfig(config)).getSinglePCollection(); PAssert.that(rows).containsInAnyOrder(expectedRows); - readPipeline.run().waitUntilFinish(); + pipeline.run().waitUntilFinish(); } + private static final List INPUT_ROWS = + LongStream.range(0, NUM_RECORDS).boxed().map(ROW_FUNC::apply).collect(Collectors.toList()); + /** * Test of a predetermined moderate number of records written to Iceberg using a Beam pipeline, * then read directly using Iceberg API. */ @Test public void testWrite() { - Catalog catalog = new HadoopCatalog(catalogHadoopConf, warehouseLocation); Table table = catalog.createTable(tableId, ICEBERG_SCHEMA); - List inputRecords = - IntStream.range(0, options.getNumRecords()) - .boxed() - .map(i -> GenericRecord.create(ICEBERG_SCHEMA).copy(getValues(i))) - .collect(Collectors.toList()); + // Write with Beam + Map config = managedIcebergConfig(); + PCollection input = pipeline.apply(Create.of(INPUT_ROWS)).setRowSchema(BEAM_SCHEMA); + input.apply(Managed.write(Managed.ICEBERG).withConfig(config)); + pipeline.run().waitUntilFinish(); - List inputRows = - inputRecords.stream() - .map(record -> IcebergUtils.icebergRecordToBeamRow(BEAM_SCHEMA, record)) - .collect(Collectors.toList()); + // Read back and check records are correct + List returnedRecords = readRecords(table); + assertThat( + returnedRecords, containsInAnyOrder(INPUT_ROWS.stream().map(RECORD_FUNC::apply).toArray())); + } - // Write with Beam - Map config = - ImmutableMap.builder() - .put("table", tableId.toString()) - .put("catalog_name", "test-name") - .put( - "catalog_properties", - ImmutableMap.builder() - .put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP) - .put("warehouse", warehouseLocation) - .build()) + @Test + public void testWritePartitionedData() { + // For an example row where bool=true, modulo_5=3, str=value_303, + // this partition spec will create a partition like: /bool=true/modulo_5=3/str_trunc=value_3/ + PartitionSpec partitionSpec = + PartitionSpec.builderFor(ICEBERG_SCHEMA) + .identity("bool") + .identity("modulo_5") + .truncate("str", "value_x".length()) .build(); + Table table = catalog.createTable(tableId, ICEBERG_SCHEMA, partitionSpec); - PCollection input = writePipeline.apply(Create.of(inputRows)).setRowSchema(BEAM_SCHEMA); + // Write with Beam + Map config = managedIcebergConfig(); + PCollection input = pipeline.apply(Create.of(INPUT_ROWS)).setRowSchema(BEAM_SCHEMA); input.apply(Managed.write(Managed.ICEBERG).withConfig(config)); - - writePipeline.run().waitUntilFinish(); + pipeline.run().waitUntilFinish(); // Read back and check records are correct - TableScan tableScan = table.newScan().project(ICEBERG_SCHEMA); - List writtenRecords = new 
ArrayList<>(); - for (CombinedScanTask task : tableScan.planTasks()) { - InputFilesDecryptor decryptor = new InputFilesDecryptor(task, table.io(), table.encryption()); - for (FileScanTask fileTask : task.files()) { - InputFile inputFile = decryptor.getInputFile(fileTask); - CloseableIterable iterable = - Parquet.read(inputFile) - .split(fileTask.start(), fileTask.length()) - .project(ICEBERG_SCHEMA) - .createReaderFunc( - fileSchema -> GenericParquetReaders.buildReader(ICEBERG_SCHEMA, fileSchema)) - .filter(fileTask.residual()) - .build(); - - for (Record rec : iterable) { - writtenRecords.add(rec); - } - } - } - - assertThat(inputRecords, containsInAnyOrder(writtenRecords.toArray())); + List returnedRecords = readRecords(table); + assertThat( + returnedRecords, containsInAnyOrder(INPUT_ROWS.stream().map(RECORD_FUNC::apply).toArray())); } } diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java new file mode 100644 index 000000000000..1c2e8bc2c451 --- /dev/null +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.iceberg; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.Map; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; +import org.apache.commons.lang3.RandomStringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.PartitionKey; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.hadoop.HadoopCatalog; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Test class for {@link RecordWriterManager}. 
*/ +@RunWith(JUnit4.class) +public class RecordWriterManagerTest { + @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + @Rule + public transient TestDataWarehouse warehouse = new TestDataWarehouse(TEMPORARY_FOLDER, "default"); + + @Rule public TestName testName = new TestName(); + private static final Schema BEAM_SCHEMA = + Schema.builder().addInt32Field("id").addStringField("name").addBooleanField("bool").build(); + private static final org.apache.iceberg.Schema ICEBERG_SCHEMA = + IcebergUtils.beamSchemaToIcebergSchema(BEAM_SCHEMA); + private static final PartitionSpec PARTITION_SPEC = + PartitionSpec.builderFor(ICEBERG_SCHEMA).truncate("name", 3).identity("bool").build(); + + private WindowedValue windowedDestination; + private HadoopCatalog catalog; + + @Before + public void setUp() { + windowedDestination = + getWindowedDestination("table_" + testName.getMethodName(), PARTITION_SPEC); + catalog = new HadoopCatalog(new Configuration(), warehouse.location); + } + + private WindowedValue getWindowedDestination( + String tableName, @Nullable PartitionSpec partitionSpec) { + TableIdentifier tableIdentifier = TableIdentifier.of("default", tableName); + + warehouse.createTable(tableIdentifier, ICEBERG_SCHEMA, partitionSpec); + + IcebergDestination icebergDestination = + IcebergDestination.builder() + .setFileFormat(FileFormat.PARQUET) + .setTableIdentifier(tableIdentifier) + .build(); + return WindowedValue.of( + icebergDestination, + GlobalWindow.TIMESTAMP_MAX_VALUE, + GlobalWindow.INSTANCE, + PaneInfo.NO_FIRING); + } + + @Test + public void testCreateNewWriterForEachDestination() throws IOException { + // Writer manager with a maximum limit of 3 writers + RecordWriterManager writerManager = new RecordWriterManager(catalog, "test_file_name", 1000, 3); + assertEquals(0, writerManager.openWriters); + + boolean writeSuccess; + + WindowedValue dest1 = getWindowedDestination("dest1", null); + WindowedValue dest2 = getWindowedDestination("dest2", null); + WindowedValue dest3 = getWindowedDestination("dest3", PARTITION_SPEC); + WindowedValue dest4 = getWindowedDestination("dest4", null); + + // dest1 + // This is a new destination so a new writer will be created. + Row row = Row.withSchema(BEAM_SCHEMA).addValues(1, "aaa", true).build(); + writeSuccess = writerManager.write(dest1, row); + assertTrue(writeSuccess); + assertEquals(1, writerManager.openWriters); + + // dest2 + // This is a new destination so a new writer will be created. + row = Row.withSchema(BEAM_SCHEMA).addValues(1, "aaa", true).build(); + writeSuccess = writerManager.write(dest2, row); + assertTrue(writeSuccess); + assertEquals(2, writerManager.openWriters); + + // dest3, partition: [aaa, true] + // This is a new destination so a new writer will be created. + row = Row.withSchema(BEAM_SCHEMA).addValues(1, "aaa", true).build(); + writeSuccess = writerManager.write(dest3, row); + assertTrue(writeSuccess); + assertEquals(3, writerManager.openWriters); + + // dest4 + // This is a new destination, but the writer manager is saturated with 3 writers. reject the + // record + row = Row.withSchema(BEAM_SCHEMA).addValues(1, "aaa", true).build(); + writeSuccess = writerManager.write(dest4, row); + assertFalse(writeSuccess); + assertEquals(3, writerManager.openWriters); + + // dest3, partition: [aaa, false] + // new partition, but the writer manager is saturated with 3 writers. 
reject the record + row = Row.withSchema(BEAM_SCHEMA).addValues(1, "aaa", false).build(); + writeSuccess = writerManager.write(dest3, row); + assertFalse(writeSuccess); + assertEquals(3, writerManager.openWriters); + + // Closing PartitionRecordWriter will close all writers. + writerManager.close(); + assertEquals(0, writerManager.openWriters); + + // We should only have 3 manifest files (one for each destination we wrote to) + assertEquals(3, writerManager.getManifestFiles().keySet().size()); + assertThat(writerManager.getManifestFiles().keySet(), containsInAnyOrder(dest1, dest2, dest3)); + } + + @Test + public void testCreateNewWriterForEachPartition() throws IOException { + // Writer manager with a maximum limit of 3 writers + RecordWriterManager writerManager = new RecordWriterManager(catalog, "test_file_name", 1000, 3); + assertEquals(0, writerManager.openWriters); + + boolean writeSuccess; + + // partition: [aaa, true]. + // This is a new partition so a new writer will be created. + Row row = Row.withSchema(BEAM_SCHEMA).addValues(1, "aaa", true).build(); + writeSuccess = writerManager.write(windowedDestination, row); + assertTrue(writeSuccess); + assertEquals(1, writerManager.openWriters); + + // partition: [bbb, false]. + // This is a new partition so a new writer will be created. + row = Row.withSchema(BEAM_SCHEMA).addValues(2, "bbb", false).build(); + writeSuccess = writerManager.write(windowedDestination, row); + assertTrue(writeSuccess); + assertEquals(2, writerManager.openWriters); + + // partition: [bbb, false]. + // A writer already exists for this partition, so no new writers are created. + row = Row.withSchema(BEAM_SCHEMA).addValues(3, "bbbaaa", false).build(); + writeSuccess = writerManager.write(windowedDestination, row); + assertTrue(writeSuccess); + assertEquals(2, writerManager.openWriters); + + // partition: [bbb, true]. + // This is a new partition so a new writer will be created. + row = Row.withSchema(BEAM_SCHEMA).addValues(4, "bbb123", true).build(); + writeSuccess = writerManager.write(windowedDestination, row); + assertTrue(writeSuccess); + assertEquals(3, writerManager.openWriters); + + // partition: [aaa, false]. + // The writerManager is already saturated with three writers. This record is rejected. + row = Row.withSchema(BEAM_SCHEMA).addValues(5, "aaa123", false).build(); + writeSuccess = writerManager.write(windowedDestination, row); + assertFalse(writeSuccess); + assertEquals(3, writerManager.openWriters); + + // Closing PartitionRecordWriter will close all writers. + writerManager.close(); + assertEquals(0, writerManager.openWriters); + + assertEquals(1, writerManager.getManifestFiles().size()); + ManifestFile manifestFile = + Iterables.getOnlyElement(writerManager.getManifestFiles().get(windowedDestination)); + + assertEquals(3, manifestFile.addedFilesCount().intValue()); + assertEquals(4, manifestFile.addedRowsCount().intValue()); + } + + @Test + public void testRespectMaxFileSize() throws IOException { + // Writer manager with a maximum file size of 100 bytes + RecordWriterManager writerManager = new RecordWriterManager(catalog, "test_file_name", 100, 2); + assertEquals(0, writerManager.openWriters); + boolean writeSuccess; + + PartitionKey partitionKey = new PartitionKey(PARTITION_SPEC, ICEBERG_SCHEMA); + // row partition:: [aaa, true]. + // This is a new partition so a new writer will be created. 
+ Row row = Row.withSchema(BEAM_SCHEMA).addValues(1, "aaa", true).build(); + writeSuccess = writerManager.write(windowedDestination, row); + assertTrue(writeSuccess); + assertEquals(1, writerManager.openWriters); + + partitionKey.partition(IcebergUtils.beamRowToIcebergRecord(ICEBERG_SCHEMA, row)); + Map writerCounts = + writerManager.destinations.get(windowedDestination).writerCounts; + // this is our first writer + assertEquals(1, writerCounts.get(partitionKey).intValue()); + + // row partition:: [aaa, true]. + // existing partition. use existing writer + row = + Row.withSchema(BEAM_SCHEMA) + .addValues(2, "aaa" + RandomStringUtils.randomAlphanumeric(1000), true) + .build(); + writeSuccess = writerManager.write(windowedDestination, row); + assertTrue(writeSuccess); + assertEquals(1, writerManager.openWriters); + // check that we still use our first writer + assertEquals(1, writerCounts.get(partitionKey).intValue()); + + // row partition:: [aaa, true]. + // writer has reached max file size. create a new writer + row = Row.withSchema(BEAM_SCHEMA).addValues(2, "aaabb", true).build(); + writeSuccess = writerManager.write(windowedDestination, row); + assertTrue(writeSuccess); + // check that we have opened and are using a second writer + assertEquals(2, writerCounts.get(partitionKey).intValue()); + // check that only one writer is open (we have closed the first writer) + assertEquals(1, writerManager.openWriters); + + writerManager.close(); + assertEquals(0, writerManager.openWriters); + } + + @Test + public void testRequireClosingBeforeFetchingManifestFiles() { + RecordWriterManager writerManager = new RecordWriterManager(catalog, "test_file_name", 100, 2); + Row row = Row.withSchema(BEAM_SCHEMA).addValues(1, "aaa", true).build(); + writerManager.write(windowedDestination, row); + assertEquals(1, writerManager.openWriters); + + assertThrows(IllegalStateException.class, writerManager::getManifestFiles); + } +} diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/TestDataWarehouse.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/TestDataWarehouse.java index a8f63383801b..ad4fc6b382d4 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/TestDataWarehouse.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/TestDataWarehouse.java @@ -44,6 +44,7 @@ import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.orc.ORC; import org.apache.iceberg.parquet.Parquet; +import org.checkerframework.checker.nullness.qual.Nullable; import org.junit.Assert; import org.junit.rules.ExternalResource; import org.junit.rules.TemporaryFolder; @@ -140,7 +141,12 @@ public DataFile writeRecords(String filename, Schema schema, List record } public Table createTable(TableIdentifier tableId, Schema schema) { + return createTable(tableId, schema, null); + } + + public Table createTable( + TableIdentifier tableId, Schema schema, @Nullable PartitionSpec partitionSpec) { someTableHasBeenCreated = true; - return catalog.createTable(tableId, schema); + return catalog.createTable(tableId, schema, partitionSpec); } }
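[Editor's note] For context, the user-facing flow that the new testWritePartitionedData exercises looks roughly like the sketch below (an illustration, not part of the patch). The table identifier, warehouse path, and field names are placeholders; the key points are that the partitioned table is created up front with an Iceberg PartitionSpec, and the Managed Iceberg sink then writes into whatever spec the table declares.

```java
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;

class PartitionedManagedWriteSketch {

  /** Assumes {@code beamSchema} has "bool" and "str" fields and that {@code rows} conform to it. */
  static void run(
      org.apache.beam.sdk.schemas.Schema beamSchema, Iterable<Row> rows, String warehouse) {
    org.apache.iceberg.Schema icebergSchema = IcebergUtils.beamSchemaToIcebergSchema(beamSchema);

    // Pre-create the partitioned table; the sink writes into the spec the table declares.
    PartitionSpec spec =
        PartitionSpec.builderFor(icebergSchema).identity("bool").truncate("str", 7).build();
    TableIdentifier tableId = TableIdentifier.of("default", "partitioned_table");
    HadoopCatalog catalog = new HadoopCatalog(new Configuration(), warehouse);
    catalog.createTable(tableId, icebergSchema, spec);

    // Same Managed configuration shape the integration test builds in managedIcebergConfig().
    Map<String, Object> config =
        ImmutableMap.<String, Object>builder()
            .put("table", tableId.toString())
            .put("catalog_name", "test-name")
            .put(
                "catalog_properties",
                ImmutableMap.<String, String>builder()
                    .put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
                    .put("warehouse", warehouse)
                    .build())
            .build();

    Pipeline pipeline = Pipeline.create();
    PCollection<Row> input = pipeline.apply(Create.of(rows)).setRowSchema(beamSchema);
    input.apply(Managed.write(Managed.ICEBERG).withConfig(config));
    pipeline.run().waitUntilFinish();
  }
}
```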