From deb4b4ada3a414205a87de0dcad745c95f45a2e5 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud <65791736+ahmedabu98@users.noreply.github.com> Date: Mon, 30 Sep 2024 19:30:22 -0400 Subject: [PATCH] Managed Iceberg dynamic destinations (#32565) * iceberg dynamic destinations * update CHANGES * spotless * fix null error * fix some tests * fix some tests * better row filter error message * clarify keep-only-drop mutual exclusivity * use constants for field names --- .../IO_Iceberg_Integration_Tests.json | 2 +- CHANGES.md | 2 + .../org/apache/beam/sdk/util/RowFilter.java | 30 ++- .../beam/sdk/util/RowStringInterpolator.java | 10 +- .../apache/beam/sdk/util/RowFilterTest.java | 8 +- .../sdk/util/RowStringInterpolatorTest.java | 34 ++- .../sdk/io/iceberg/AssignDestinations.java | 30 ++- .../sdk/io/iceberg/DynamicDestinations.java | 13 +- .../apache/beam/sdk/io/iceberg/IcebergIO.java | 22 +- .../IcebergWriteSchemaTransformProvider.java | 32 ++- .../iceberg/OneTableDynamicDestinations.java | 31 +-- .../iceberg/PortableIcebergDestinations.java | 81 +++++++ .../io/iceberg/WriteGroupedRowsToFiles.java | 12 +- .../sdk/io/iceberg/WriteToDestinations.java | 43 ++-- .../io/iceberg/WriteUngroupedRowsToFiles.java | 12 +- .../beam/sdk/io/iceberg/IcebergIOIT.java | 196 ++++++++++++++--- .../sdk/io/iceberg/IcebergIOWriteTest.java | 43 ++-- ...ebergWriteSchemaTransformProviderTest.java | 208 +++++++++++++++++- 18 files changed, 652 insertions(+), 157 deletions(-) create mode 100644 sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PortableIcebergDestinations.java diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json index 3f63c0c9975f..1efc8e9e4405 100644 --- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json +++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 2 + "modification": 1 } diff --git a/CHANGES.md b/CHANGES.md index e7b26851f998..728c8247acce 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -58,6 +58,8 @@ ## Highlights * Added support for using vLLM in the RunInference transform (Python) ([#32528](https://github.com/apache/beam/issues/32528)) +* [Managed Iceberg] Added support for streaming writes ([#32451](https://github.com/apache/beam/pull/32451)) +* [Managed Iceberg] Added support for writing to dynamic destinations ([#32565](https://github.com/apache/beam/pull/32565)) ## New Features / Improvements diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowFilter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowFilter.java index 4e0d9d3ff30d..a3187de05cb6 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowFilter.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowFilter.java @@ -53,21 +53,21 @@ * * // this filter will exclusively keep these fields and drop everything else * List fields = Arrays.asList("foo", "bar", "baz"); - * RowFilter keepingFilter = new RowFilter(beamSchema).keeping(fields); + * RowFilter keepFilter = new RowFilter(beamSchema).keep(fields); * * // this filter will drop these fields - * RowFilter droppingFilter = new RowFilter(beamSchema).dropping(fields); + * RowFilter dropFilter = new RowFilter(beamSchema).drop(fields); * * // this filter will only output the contents of row field "my_record" * String field = "my_record"; * RowFilter onlyFilter = new RowFilter(beamSchema).only(field); * * // produces a 
filtered row - * Row outputRow = keepingFilter.filter(row); + * Row outputRow = keepFilter.filter(row); * } * - * Check the documentation for {@link #keeping(List)}, {@link #dropping(List)}, and {@link - * #only(String)} for further details on what an output Row can look like. + * Check the documentation for {@link #keep(List)}, {@link #drop(List)}, and {@link #only(String)} + * for further details on what an output Row can look like. */ public class RowFilter implements Serializable { private final Schema rowSchema; @@ -103,7 +103,7 @@ public RowFilter(Schema rowSchema) { * nested_2: xyz * } */ - public RowFilter keeping(List fields) { + public RowFilter keep(List fields) { checkUnconfigured(); verifyNoNestedFields(fields, "keep"); validateSchemaContainsFields(rowSchema, fields, "keep"); @@ -132,7 +132,7 @@ public RowFilter keeping(List fields) { * bar: 456 * } */ - public RowFilter dropping(List fields) { + public RowFilter drop(List fields) { checkUnconfigured(); verifyNoNestedFields(fields, "drop"); validateSchemaContainsFields(rowSchema, fields, "drop"); @@ -168,6 +168,7 @@ public RowFilter dropping(List fields) { */ public RowFilter only(String field) { checkUnconfigured(); + verifyNoNestedFields(Collections.singletonList(field), "only"); validateSchemaContainsFields(rowSchema, Collections.singletonList(field), "only"); Schema.Field rowField = rowSchema.getField(field); Preconditions.checkArgument( @@ -184,8 +185,8 @@ public RowFilter only(String field) { /** * Performs a filter operation (keep or drop) on the input {@link Row}. Must have already - * configured a filter operation with {@link #dropping(List)} or {@link #keeping(List)} for this - * {@link RowFilter}. + * configured a filter operation with {@link #drop(List)} or {@link #keep(List)} for this {@link + * RowFilter}. * *

If not yet configured, will simply return the same {@link Row}. */ @@ -196,9 +197,9 @@ public Row filter(Row row) { Preconditions.checkState( row.getSchema().assignableTo(rowSchema), - "Encountered Row with schema that is incompatible with this RowFilter's schema." + "Encountered Row with schema that is incompatible with this filter's schema." + "\nRow schema: %s" - + "\nSchema used to initialize this RowFilter: %s", + + "\nSchema used to initialize this filter: %s", row.getSchema(), rowSchema); @@ -219,8 +220,7 @@ public Schema outputSchema() { private void checkUnconfigured() { Preconditions.checkState( transformedSchema == null, - "This RowFilter has already been configured to filter to the following Schema: %s", - transformedSchema); + "Invalid filter configuration: Please set only one of 'keep', 'drop', or 'only'."); } /** Verifies that this selection contains no nested fields. */ @@ -233,9 +233,7 @@ private void verifyNoNestedFields(List fields, String operation) { } if (!nestedFields.isEmpty()) { throw new IllegalArgumentException( - String.format( - "RowFilter does not support specifying nested fields to %s: %s", - operation, nestedFields)); + String.format("'%s' does not support nested fields: %s", operation, nestedFields)); } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowStringInterpolator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowStringInterpolator.java index 1f095522dd8b..46873d77642f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowStringInterpolator.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowStringInterpolator.java @@ -30,6 +30,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.ValueInSingleWindow; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; @@ -118,16 +119,17 @@ public RowStringInterpolator(String template, Schema rowSchema) { * Performs string interpolation on the template using values from the input {@link Row} and its * windowing metadata. 
*/ - public String interpolate(Row row, BoundedWindow window, PaneInfo paneInfo, Instant timestamp) { + public String interpolate(ValueInSingleWindow element) { String interpolated = this.template; for (String field : fieldsToReplace) { Object val; + Instant timestamp = element.getTimestamp(); switch (field) { case WINDOW: - val = window.toString(); + val = element.getWindow().toString(); break; case PANE_INDEX: - val = paneInfo.getIndex(); + val = element.getPane().getIndex(); break; case YYYY: val = timestamp.getChronology().year().get(timestamp.getMillis()); @@ -139,7 +141,7 @@ public String interpolate(Row row, BoundedWindow window, PaneInfo paneInfo, Inst val = timestamp.getChronology().dayOfMonth().get(timestamp.getMillis()); break; default: - val = MoreObjects.firstNonNull(getValue(row, field), ""); + val = MoreObjects.firstNonNull(getValue(element.getValue(), field), ""); break; } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowFilterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowFilterTest.java index 22c17f6d07c9..cc3e2bac9958 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowFilterTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowFilterTest.java @@ -254,10 +254,10 @@ public void testKeepSchemaFields() { @Test public void testDropNestedFieldsFails() { thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("RowFilter does not support specifying nested fields to drop"); + thrown.expectMessage("'drop' does not support nested fields"); new RowFilter(ROW_SCHEMA) - .dropping( + .drop( Arrays.asList( "bool", "nullable_int", @@ -270,10 +270,10 @@ public void testDropNestedFieldsFails() { @Test public void testKeepNestedFieldsFails() { thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("RowFilter does not support specifying nested fields to keep"); + thrown.expectMessage("'keep' does not support nested fields"); new RowFilter(ROW_SCHEMA) - .keeping( + .keep( Arrays.asList("str", "arr_int", "row.nested_str", "row.nested_row.doubly_nested_str")); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowStringInterpolatorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowStringInterpolatorTest.java index 0b1295c38533..fed55cc4cb89 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowStringInterpolatorTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowStringInterpolatorTest.java @@ -24,6 +24,7 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.ValueInSingleWindow; import org.joda.time.DateTime; import org.joda.time.Instant; import org.junit.Rule; @@ -68,7 +69,9 @@ public void testInvalidRowThrowsHelpfulError() { thrown.expect(IllegalArgumentException.class); thrown.expectMessage("Invalid row does not contain field 'str'."); - interpolator.interpolate(invalidRow, null, null, null); + interpolator.interpolate( + ValueInSingleWindow.of( + invalidRow, new Instant(0), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING)); } @Test @@ -85,7 +88,9 @@ public void testInvalidRowThrowsHelpfulErrorForNestedFields() { thrown.expect(IllegalArgumentException.class); thrown.expectMessage("Invalid row does not contain field 'nested_int'."); - interpolator.interpolate(invalidRow, null, null, null); + interpolator.interpolate( + ValueInSingleWindow.of( + invalidRow, new Instant(0), 
GlobalWindow.INSTANCE, PaneInfo.NO_FIRING)); } @Test @@ -105,7 +110,9 @@ public void testInvalidRowThrowsHelpfulErrorForDoublyNestedFields() { thrown.expect(IllegalArgumentException.class); thrown.expectMessage("Invalid row does not contain field 'doubly_nested_int'."); - interpolator.interpolate(invalidRow, null, null, null); + interpolator.interpolate( + ValueInSingleWindow.of( + invalidRow, new Instant(0), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING)); } private static final Row ROW = @@ -134,7 +141,9 @@ public void testTopLevelInterpolation() { String template = "foo {str}, bar {bool}, baz {int}, xyz {nullable_int}"; RowStringInterpolator interpolator = new RowStringInterpolator(template, ROW_SCHEMA); - String output = interpolator.interpolate(ROW, null, null, null); + String output = + interpolator.interpolate( + ValueInSingleWindow.of(ROW, new Instant(0), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING)); assertEquals("foo str_value, bar true, baz 123, xyz ", output); } @@ -144,7 +153,9 @@ public void testNestedLevelInterpolation() { String template = "foo {str}, bar {row.nested_str}, baz {row.nested_float}"; RowStringInterpolator interpolator = new RowStringInterpolator(template, ROW_SCHEMA); - String output = interpolator.interpolate(ROW, null, null, null); + String output = + interpolator.interpolate( + ValueInSingleWindow.of(ROW, new Instant(0), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING)); assertEquals("foo str_value, bar nested_str_value, baz 1.234", output); } @@ -155,7 +166,9 @@ public void testDoublyNestedInterpolation() { "foo {str}, bar {row.nested_row.doubly_nested_str}, baz {row.nested_row.doubly_nested_int}"; RowStringInterpolator interpolator = new RowStringInterpolator(template, ROW_SCHEMA); - String output = interpolator.interpolate(ROW, null, null, null); + String output = + interpolator.interpolate( + ValueInSingleWindow.of(ROW, new Instant(0), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING)); assertEquals("foo str_value, bar doubly_nested_str_value, baz 789", output); } @@ -177,10 +190,11 @@ public void testInterpolateWindowingInformation() { String output = interpolator.interpolate( - ROW, - GlobalWindow.INSTANCE, - PaneInfo.createPane(false, false, PaneInfo.Timing.ON_TIME, 2, 0), - instant); + ValueInSingleWindow.of( + ROW, + instant, + GlobalWindow.INSTANCE, + PaneInfo.createPane(false, false, PaneInfo.Timing.ON_TIME, 2, 0))); String expected = String.format( "str: str_value, window: %s, pane: 2, year: 2024, month: 8, day: 28", diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AssignDestinations.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AssignDestinations.java index 4008fcc6f5bd..37b7dbf107e6 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AssignDestinations.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AssignDestinations.java @@ -17,12 +17,19 @@ */ package org.apache.beam.sdk.io.iceberg; +import static org.apache.beam.sdk.io.iceberg.WriteToDestinations.DATA; +import static org.apache.beam.sdk.io.iceberg.WriteToDestinations.DEST; + import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; +import 
org.apache.beam.sdk.values.ValueInSingleWindow; +import org.joda.time.Instant; /** * Assigns the destination metadata for each input record. @@ -32,7 +39,7 @@ */ class AssignDestinations extends PTransform, PCollection> { - private DynamicDestinations dynamicDestinations; + private final DynamicDestinations dynamicDestinations; public AssignDestinations(DynamicDestinations dynamicDestinations) { this.dynamicDestinations = dynamicDestinations; @@ -41,11 +48,10 @@ public AssignDestinations(DynamicDestinations dynamicDestinations) { @Override public PCollection expand(PCollection input) { - final Schema inputSchema = input.getSchema(); final Schema outputSchema = Schema.builder() - .addRowField("data", inputSchema) - .addRowField("dest", dynamicDestinations.getMetadataSchema()) + .addStringField(DEST) + .addRowField(DATA, dynamicDestinations.getDataSchema()) .build(); return input @@ -53,11 +59,19 @@ public PCollection expand(PCollection input) { ParDo.of( new DoFn() { @ProcessElement - public void processElement(@Element Row data, OutputReceiver out) { + public void processElement( + @Element Row element, + BoundedWindow window, + PaneInfo paneInfo, + @Timestamp Instant timestamp, + OutputReceiver out) { + String tableIdentifier = + dynamicDestinations.getTableStringIdentifier( + ValueInSingleWindow.of(element, timestamp, window, paneInfo)); + Row data = dynamicDestinations.getData(element); + out.output( - Row.withSchema(outputSchema) - .addValues(data, dynamicDestinations.assignDestinationMetadata(data)) - .build()); + Row.withSchema(outputSchema).addValues(tableIdentifier, data).build()); } })) .setRowSchema(outputSchema); diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/DynamicDestinations.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/DynamicDestinations.java index 6fc3c139bdc1..0185758c8aeb 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/DynamicDestinations.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/DynamicDestinations.java @@ -20,17 +20,20 @@ import java.io.Serializable; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.ValueInSingleWindow; import org.apache.iceberg.catalog.TableIdentifier; public interface DynamicDestinations extends Serializable { - Schema getMetadataSchema(); + Schema getDataSchema(); - Row assignDestinationMetadata(Row data); + Row getData(Row element); - IcebergDestination instantiateDestination(Row dest); + IcebergDestination instantiateDestination(String destination); - static DynamicDestinations singleTable(TableIdentifier tableId) { - return new OneTableDynamicDestinations(tableId); + String getTableStringIdentifier(ValueInSingleWindow element); + + static DynamicDestinations singleTable(TableIdentifier tableId, Schema inputSchema) { + return new OneTableDynamicDestinations(tableId, inputSchema); } } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java index 0f9612339f48..282be826cf41 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java @@ -115,15 +115,21 @@ public IcebergWriteResult expand(PCollection input) { DynamicDestinations destinations = getDynamicDestinations(); if (destinations == null) { destinations = - 
DynamicDestinations.singleTable(Preconditions.checkNotNull(getTableIdentifier())); + DynamicDestinations.singleTable( + Preconditions.checkNotNull(getTableIdentifier()), input.getSchema()); } - if (input.isBounded().equals(PCollection.IsBounded.UNBOUNDED)) { + // Assign destinations before re-windowing to global because + // user's dynamic destination may depend on windowing properties + PCollection assignedRows = + input.apply("Set Destination Metadata", new AssignDestinations(destinations)); + + if (assignedRows.isBounded().equals(PCollection.IsBounded.UNBOUNDED)) { Duration triggeringFrequency = getTriggeringFrequency(); checkArgumentNotNull( triggeringFrequency, "Streaming pipelines must set a triggering frequency."); - input = - input.apply( + assignedRows = + assignedRows.apply( "WindowIntoGlobal", Window.into(new GlobalWindows()) .triggering( @@ -138,11 +144,9 @@ public IcebergWriteResult expand(PCollection input) { getTriggeringFrequency() == null, "Triggering frequency is only applicable for streaming pipelines."); } - return input - .apply("Set Destination Metadata", new AssignDestinations(destinations)) - .apply( - "Write Rows to Destinations", - new WriteToDestinations(getCatalogConfig(), destinations, getTriggeringFrequency())); + return assignedRows.apply( + "Write Rows to Destinations", + new WriteToDestinations(getCatalogConfig(), destinations, getTriggeringFrequency())); } } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java index 9f1b51cf2300..89e4804de369 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java @@ -41,7 +41,7 @@ import org.apache.beam.sdk.values.PCollectionRowTuple; import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; -import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.FileFormat; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; @@ -89,6 +89,21 @@ public static Builder builder() { "For a streaming pipeline, sets the frequency at which snapshots are produced.") public abstract @Nullable Integer getTriggeringFrequencySeconds(); + @SchemaFieldDescription( + "A list of field names to keep in the input record. All other fields are dropped before writing. " + + "Is mutually exclusive with 'drop' and 'only'.") + public abstract @Nullable List getKeep(); + + @SchemaFieldDescription( + "A list of field names to drop from the input record before writing. " + + "Is mutually exclusive with 'keep' and 'only'.") + public abstract @Nullable List getDrop(); + + @SchemaFieldDescription( + "The name of a single record field that should be written. 
" + + "Is mutually exclusive with 'keep' and 'drop'.") + public abstract @Nullable String getOnly(); + @AutoValue.Builder public abstract static class Builder { public abstract Builder setTable(String table); @@ -101,6 +116,12 @@ public abstract static class Builder { public abstract Builder setTriggeringFrequencySeconds(Integer triggeringFrequencySeconds); + public abstract Builder setKeep(List keep); + + public abstract Builder setDrop(List drop); + + public abstract Builder setOnly(String only); + public abstract Configuration build(); } @@ -160,7 +181,14 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { IcebergIO.WriteRows writeTransform = IcebergIO.writeRows(configuration.getIcebergCatalog()) - .to(TableIdentifier.parse(configuration.getTable())); + .to( + new PortableIcebergDestinations( + configuration.getTable(), + FileFormat.PARQUET.toString(), + rows.getSchema(), + configuration.getDrop(), + configuration.getKeep(), + configuration.getOnly())); Integer trigFreq = configuration.getTriggeringFrequencySeconds(); if (trigFreq != null) { diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/OneTableDynamicDestinations.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/OneTableDynamicDestinations.java index e09fdf171fd6..861a8ad198a8 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/OneTableDynamicDestinations.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/OneTableDynamicDestinations.java @@ -17,52 +17,57 @@ */ package org.apache.beam.sdk.io.iceberg; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; + import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.ValueInSingleWindow; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; import org.apache.iceberg.FileFormat; import org.apache.iceberg.catalog.TableIdentifier; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; class OneTableDynamicDestinations implements DynamicDestinations, Externalizable { - - private static final Schema EMPTY_SCHEMA = Schema.builder().build(); - private static final Row EMPTY_ROW = Row.nullRow(EMPTY_SCHEMA); - // TableId represented as String for serializability private transient @MonotonicNonNull String tableIdString; private transient @MonotonicNonNull TableIdentifier tableId; + private transient @MonotonicNonNull Schema rowSchema; @VisibleForTesting TableIdentifier getTableIdentifier() { if (tableId == null) { - tableId = TableIdentifier.parse(Preconditions.checkNotNull(tableIdString)); + tableId = TableIdentifier.parse(checkStateNotNull(tableIdString)); } return tableId; } - OneTableDynamicDestinations(TableIdentifier tableId) { + OneTableDynamicDestinations(TableIdentifier tableId, Schema rowSchema) { this.tableIdString = tableId.toString(); + this.rowSchema = rowSchema; + } + + @Override + public Schema getDataSchema() { + return checkStateNotNull(rowSchema); } @Override - public Schema getMetadataSchema() { - return EMPTY_SCHEMA; + public Row getData(Row element) { + return element; } @Override - public Row assignDestinationMetadata(Row data) { - return EMPTY_ROW; + public String getTableStringIdentifier(ValueInSingleWindow element) { + 
return checkStateNotNull(tableIdString); } @Override - public IcebergDestination instantiateDestination(Row dest) { + public IcebergDestination instantiateDestination(String unused) { return IcebergDestination.builder() .setTableIdentifier(getTableIdentifier()) .setTableCreateConfig(null) @@ -75,7 +80,7 @@ public OneTableDynamicDestinations() {} @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeUTF(Preconditions.checkNotNull(tableIdString)); + out.writeUTF(checkStateNotNull(tableIdString)); } @Override diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PortableIcebergDestinations.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PortableIcebergDestinations.java new file mode 100644 index 000000000000..47f661bba3f8 --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PortableIcebergDestinations.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.iceberg; + +import java.util.List; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.util.RowFilter; +import org.apache.beam.sdk.util.RowStringInterpolator; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.ValueInSingleWindow; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.catalog.TableIdentifier; +import org.checkerframework.checker.nullness.qual.Nullable; + +class PortableIcebergDestinations implements DynamicDestinations { + private final RowFilter rowFilter; + private final RowStringInterpolator interpolator; + private final String fileFormat; + + public PortableIcebergDestinations( + String destinationTemplate, + String fileFormat, + Schema inputSchema, + @Nullable List fieldsToDrop, + @Nullable List fieldsToKeep, + @Nullable String onlyField) { + interpolator = new RowStringInterpolator(destinationTemplate, inputSchema); + RowFilter rf = new RowFilter(inputSchema); + + if (fieldsToDrop != null) { + rf = rf.drop(fieldsToDrop); + } + if (fieldsToKeep != null) { + rf = rf.keep(fieldsToKeep); + } + if (onlyField != null) { + rf = rf.only(onlyField); + } + rowFilter = rf; + this.fileFormat = fileFormat; + } + + @Override + public Schema getDataSchema() { + return rowFilter.outputSchema(); + } + + @Override + public Row getData(Row element) { + return rowFilter.filter(element); + } + + @Override + public String getTableStringIdentifier(ValueInSingleWindow element) { + return interpolator.interpolate(element); + } + + @Override + public IcebergDestination instantiateDestination(String dest) { + return IcebergDestination.builder() + .setTableIdentifier(TableIdentifier.parse(dest)) + .setTableCreateConfig(null) + .setFileFormat(FileFormat.fromString(fileFormat)) + 
.build(); + } +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java index 65cc3f3c3059..0bc18ffcf421 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java @@ -36,7 +36,7 @@ class WriteGroupedRowsToFiles extends PTransform< - PCollection, Iterable>>, PCollection> { + PCollection, Iterable>>, PCollection> { static final long DEFAULT_MAX_BYTES_PER_FILE = (1L << 40); // 1TB @@ -51,7 +51,7 @@ class WriteGroupedRowsToFiles @Override public PCollection expand( - PCollection, Iterable>> input) { + PCollection, Iterable>> input) { return input.apply( ParDo.of( new WriteGroupedRowsToFilesDoFn( @@ -59,7 +59,7 @@ public PCollection expand( } private static class WriteGroupedRowsToFilesDoFn - extends DoFn, Iterable>, FileWriteResult> { + extends DoFn, Iterable>, FileWriteResult> { private final DynamicDestinations dynamicDestinations; private final IcebergCatalogConfig catalogConfig; @@ -87,13 +87,13 @@ private org.apache.iceberg.catalog.Catalog getCatalog() { @ProcessElement public void processElement( ProcessContext c, - @Element KV, Iterable> element, + @Element KV, Iterable> element, BoundedWindow window, PaneInfo pane) throws Exception { - Row destMetadata = element.getKey().getKey(); - IcebergDestination destination = dynamicDestinations.instantiateDestination(destMetadata); + String tableIdentifier = element.getKey().getKey(); + IcebergDestination destination = dynamicDestinations.instantiateDestination(tableIdentifier); WindowedValue windowedDestination = WindowedValue.of(destination, window.maxTimestamp(), window, pane); RecordWriterManager writer; diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java index f71ff24a1a37..60d23f2dd394 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java @@ -19,9 +19,11 @@ import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; +import java.util.concurrent.ThreadLocalRandom; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.RowCoder; import org.apache.beam.sdk.coders.ShardedKeyCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.GroupByKey; @@ -38,6 +40,7 @@ import org.apache.beam.sdk.values.PCollectionList; import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.ShardedKey; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; @@ -45,6 +48,9 @@ class WriteToDestinations extends PTransform, IcebergWriteResul static final long DEFAULT_MAX_BYTES_PER_FILE = (1L << 40); // 1TB static final int DEFAULT_NUM_FILE_SHARDS = 0; + // constant field names representing table identifier string and the record + static final String DEST = "dest"; + static final String DATA = "data"; private final IcebergCatalogConfig catalogConfig; private final DynamicDestinations 
dynamicDestinations; @@ -71,24 +77,20 @@ public IcebergWriteResult expand(PCollection input) { new WriteUngroupedRowsToFiles(catalogConfig, dynamicDestinations)); // Then write the rest by shuffling on the destination metadata - Schema destSchema = - checkArgumentNotNull( - writeUngroupedResult - .getSpilledRows() - .getSchema() - .getField("dest") - .getType() - .getRowSchema(), - "Input schema missing `dest` field."); + Preconditions.checkState( + writeUngroupedResult.getSpilledRows().getSchema().hasField(DEST), + "Input schema missing `%s` field.", + DEST); Schema dataSchema = checkArgumentNotNull( writeUngroupedResult .getSpilledRows() .getSchema() - .getField("data") + .getField(DATA) .getType() .getRowSchema(), - "Input schema missing `data` field"); + "Input schema missing `%s` field", + DATA); PCollection writeGroupedResult = writeUngroupedResult @@ -96,24 +98,25 @@ public IcebergWriteResult expand(PCollection input) { .apply( "Key by destination and shard", MapElements.via( - new SimpleFunction, Row>>() { + new SimpleFunction, Row>>() { private static final int SPILLED_ROWS_SHARDING_FACTOR = 10; - private int shardNumber = 0; + private int shardNumber = + ThreadLocalRandom.current().nextInt(SPILLED_ROWS_SHARDING_FACTOR); @Override - public KV, Row> apply(Row elem) { + public KV, Row> apply(Row elem) { Row data = checkArgumentNotNull( - elem.getRow("data"), "Element missing `data` field"); - Row dest = + elem.getRow(DATA), "Element missing `%s` field", DATA); + String dest = checkArgumentNotNull( - elem.getRow("dest"), "Element missing `dest` field"); + elem.getString(DEST), "Element missing `%s` field", DEST); return KV.of( - ShardedKey.of(dest, shardNumber % SPILLED_ROWS_SHARDING_FACTOR), data); + ShardedKey.of(dest, ++shardNumber % SPILLED_ROWS_SHARDING_FACTOR), + data); } })) - .setCoder( - KvCoder.of(ShardedKeyCoder.of(RowCoder.of(destSchema)), RowCoder.of(dataSchema))) + .setCoder(KvCoder.of(ShardedKeyCoder.of(StringUtf8Coder.of()), RowCoder.of(dataSchema))) .apply("Group spilled rows by destination shard", GroupByKey.create()) .apply( "Write remaining rows to files", diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteUngroupedRowsToFiles.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteUngroupedRowsToFiles.java index 0ca06d797750..1982c7fcbad0 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteUngroupedRowsToFiles.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteUngroupedRowsToFiles.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.io.iceberg; +import static org.apache.beam.sdk.io.iceberg.WriteToDestinations.DATA; +import static org.apache.beam.sdk.io.iceberg.WriteToDestinations.DEST; import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; import java.util.List; @@ -208,11 +210,11 @@ public void startBundle() { public void processElement( @Element Row element, BoundedWindow window, PaneInfo pane, MultiOutputReceiver out) throws Exception { - - Row data = checkArgumentNotNull(element.getRow("data"), "Input row missing `data` field."); - Row destMetadata = - checkArgumentNotNull(element.getRow("dest"), "Input row missing `dest` field."); - IcebergDestination destination = dynamicDestinations.instantiateDestination(destMetadata); + String dest = + checkArgumentNotNull(element.getString(DEST), "Input row missing `%s` field.", DEST); + Row data = + checkArgumentNotNull(element.getRow(DATA), "Input row missing `data` field.", DATA); 
+ IcebergDestination destination = dynamicDestinations.instantiateDestination(dest); WindowedValue windowedDestination = WindowedValue.of(destination, window.maxTimestamp(), window, pane); diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java index 8c6d3d99e35e..84f2146275f0 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.io.iceberg; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; @@ -24,12 +25,14 @@ import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; import java.util.stream.Collectors; import java.util.stream.LongStream; +import java.util.stream.Stream; import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; import org.apache.beam.sdk.managed.Managed; import org.apache.beam.sdk.testing.PAssert; @@ -37,12 +40,15 @@ import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PeriodicImpulse; +import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.util.RowFilter; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TypeDescriptors; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.AppendFiles; @@ -50,6 +56,7 @@ import org.apache.iceberg.CombinedScanTask; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.TableScan; import org.apache.iceberg.catalog.Catalog; @@ -64,6 +71,7 @@ import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.parquet.Parquet; +import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.Before; @@ -93,6 +101,7 @@ public class IcebergIOIT implements Serializable { private static final org.apache.beam.sdk.schemas.Schema BEAM_SCHEMA = org.apache.beam.sdk.schemas.Schema.builder() .addStringField("str") + .addStringField("char") .addInt64Field("modulo_5") .addBooleanField("bool") .addInt32Field("int") @@ -121,6 +130,7 @@ public Row apply(Long num) { return Row.withSchema(BEAM_SCHEMA) .addValue("value_" + strNum) + .addValue(String.valueOf((char) (97 + num % 5))) .addValue(num % 5) .addValue(num % 2 == 0) .addValue(Integer.valueOf(strNum)) @@ -154,7 +164,7 @@ public Record apply(Row input) { private String warehouseLocation; - private TableIdentifier tableId; + private String tableId; private Catalog catalog; @BeforeClass @@ -171,7 +181,7 @@ public void 
setUp() { warehouseLocation = String.format("%s/IcebergIOIT/%s", options.getTempLocation(), UUID.randomUUID()); - tableId = TableIdentifier.of(testName.getMethodName(), "test_table"); + tableId = testName.getMethodName() + ".test_table"; catalog = new HadoopCatalog(catalogHadoopConf, warehouseLocation); } @@ -213,7 +223,8 @@ private List populateTable(Table table) throws IOException { } private List readRecords(Table table) { - TableScan tableScan = table.newScan().project(ICEBERG_SCHEMA); + Schema tableSchema = table.schema(); + TableScan tableScan = table.newScan().project(tableSchema); List writtenRecords = new ArrayList<>(); for (CombinedScanTask task : tableScan.planTasks()) { InputFilesDecryptor descryptor = @@ -223,9 +234,9 @@ private List readRecords(Table table) { CloseableIterable iterable = Parquet.read(inputFile) .split(fileTask.start(), fileTask.length()) - .project(ICEBERG_SCHEMA) + .project(tableSchema) .createReaderFunc( - fileSchema -> GenericParquetReaders.buildReader(ICEBERG_SCHEMA, fileSchema)) + fileSchema -> GenericParquetReaders.buildReader(tableSchema, fileSchema)) .filter(fileTask.residual()) .build(); @@ -237,9 +248,9 @@ private List readRecords(Table table) { return writtenRecords; } - private Map managedIcebergConfig() { + private Map managedIcebergConfig(String tableId) { return ImmutableMap.builder() - .put("table", tableId.toString()) + .put("table", tableId) .put("catalog_name", "test-name") .put( "catalog_properties", @@ -257,11 +268,11 @@ private Map managedIcebergConfig() { */ @Test public void testRead() throws Exception { - Table table = catalog.createTable(tableId, ICEBERG_SCHEMA); + Table table = catalog.createTable(TableIdentifier.parse(tableId), ICEBERG_SCHEMA); List expectedRows = populateTable(table); - Map config = managedIcebergConfig(); + Map config = managedIcebergConfig(tableId); PCollection rows = pipeline.apply(Managed.read(Managed.ICEBERG).withConfig(config)).getSinglePCollection(); @@ -279,10 +290,10 @@ public void testRead() throws Exception { */ @Test public void testWrite() { - Table table = catalog.createTable(tableId, ICEBERG_SCHEMA); + Table table = catalog.createTable(TableIdentifier.parse(tableId), ICEBERG_SCHEMA); // Write with Beam - Map config = managedIcebergConfig(); + Map config = managedIcebergConfig(tableId); PCollection input = pipeline.apply(Create.of(INPUT_ROWS)).setRowSchema(BEAM_SCHEMA); input.apply(Managed.write(Managed.ICEBERG).withConfig(config)); pipeline.run().waitUntilFinish(); @@ -303,10 +314,11 @@ public void testWritePartitionedData() { .identity("modulo_5") .truncate("str", "value_x".length()) .build(); - Table table = catalog.createTable(tableId, ICEBERG_SCHEMA, partitionSpec); + Table table = + catalog.createTable(TableIdentifier.parse(tableId), ICEBERG_SCHEMA, partitionSpec); // Write with Beam - Map config = managedIcebergConfig(); + Map config = managedIcebergConfig(tableId); PCollection input = pipeline.apply(Create.of(INPUT_ROWS)).setRowSchema(BEAM_SCHEMA); input.apply(Managed.write(Managed.ICEBERG).withConfig(config)); pipeline.run().waitUntilFinish(); @@ -317,25 +329,29 @@ public void testWritePartitionedData() { returnedRecords, containsInAnyOrder(INPUT_ROWS.stream().map(RECORD_FUNC::apply).toArray())); } + private PeriodicImpulse getStreamingSource() { + return PeriodicImpulse.create() + .stopAfter(Duration.millis(NUM_RECORDS - 1)) + .withInterval(Duration.millis(1)); + } + @Test public void testStreamingWrite() { PartitionSpec partitionSpec = 
PartitionSpec.builderFor(ICEBERG_SCHEMA).identity("bool").identity("modulo_5").build(); - Table table = catalog.createTable(tableId, ICEBERG_SCHEMA, partitionSpec); + Table table = + catalog.createTable(TableIdentifier.parse(tableId), ICEBERG_SCHEMA, partitionSpec); - Map config = new HashMap<>(managedIcebergConfig()); + Map config = new HashMap<>(managedIcebergConfig(tableId)); config.put("triggering_frequency_seconds", 4); - // over a span of 10 seconds, create elements from longs in range [0, 1000) + // create elements from longs in range [0, 1000) PCollection input = pipeline - .apply( - PeriodicImpulse.create() - .stopAfter(Duration.millis(9_990)) - .withInterval(Duration.millis(10))) + .apply(getStreamingSource()) .apply( MapElements.into(TypeDescriptors.rows()) - .via(instant -> ROW_FUNC.apply((instant.getMillis() / 10) % 1000))) + .via(instant -> ROW_FUNC.apply(instant.getMillis() % NUM_RECORDS))) .setRowSchema(BEAM_SCHEMA); assertThat(input.isBounded(), equalTo(PCollection.IsBounded.UNBOUNDED)); @@ -352,24 +368,22 @@ public void testStreamingWrite() { public void testStreamingWriteWithPriorWindowing() { PartitionSpec partitionSpec = PartitionSpec.builderFor(ICEBERG_SCHEMA).identity("bool").identity("modulo_5").build(); - Table table = catalog.createTable(tableId, ICEBERG_SCHEMA, partitionSpec); + Table table = + catalog.createTable(TableIdentifier.parse(tableId), ICEBERG_SCHEMA, partitionSpec); - Map config = new HashMap<>(managedIcebergConfig()); + Map config = new HashMap<>(managedIcebergConfig(tableId)); config.put("triggering_frequency_seconds", 4); // over a span of 10 seconds, create elements from longs in range [0, 1000) PCollection input = pipeline - .apply( - PeriodicImpulse.create() - .stopAfter(Duration.millis(9_990)) - .withInterval(Duration.millis(10))) + .apply(getStreamingSource()) .apply( Window.into(FixedWindows.of(Duration.standardSeconds(1))) .accumulatingFiredPanes()) .apply( MapElements.into(TypeDescriptors.rows()) - .via(instant -> ROW_FUNC.apply((instant.getMillis() / 10) % 1000))) + .via(instant -> ROW_FUNC.apply(instant.getMillis() % NUM_RECORDS))) .setRowSchema(BEAM_SCHEMA); assertThat(input.isBounded(), equalTo(PCollection.IsBounded.UNBOUNDED)); @@ -381,4 +395,130 @@ public void testStreamingWriteWithPriorWindowing() { assertThat( returnedRecords, containsInAnyOrder(INPUT_ROWS.stream().map(RECORD_FUNC::apply).toArray())); } + + private void writeToDynamicDestinations(@Nullable String filterOp) { + writeToDynamicDestinations(filterOp, false, false); + } + + /** + * @param filterOp if null, just perform a normal dynamic destination write test; otherwise, + * performs a simple filter on the record before writing. 
Valid options are "keep", "drop", + * and "only" + */ + private void writeToDynamicDestinations( + @Nullable String filterOp, boolean streaming, boolean partitioning) { + String tableIdentifierTemplate = tableId + "_{modulo_5}_{char}"; + Map writeConfig = new HashMap<>(managedIcebergConfig(tableIdentifierTemplate)); + + List fieldsToFilter = Arrays.asList("row", "str", "int", "nullable_long"); + // an un-configured filter will just return the same row + RowFilter rowFilter = new RowFilter(BEAM_SCHEMA); + if (filterOp != null) { + switch (filterOp) { + case "drop": + rowFilter = rowFilter.drop(fieldsToFilter); + writeConfig.put(filterOp, fieldsToFilter); + break; + case "keep": + rowFilter = rowFilter.keep(fieldsToFilter); + writeConfig.put(filterOp, fieldsToFilter); + break; + case "only": + rowFilter = rowFilter.only(fieldsToFilter.get(0)); + writeConfig.put(filterOp, fieldsToFilter.get(0)); + break; + default: + throw new UnsupportedOperationException("Unknown operation: " + filterOp); + } + } + + Schema tableSchema = IcebergUtils.beamSchemaToIcebergSchema(rowFilter.outputSchema()); + + PartitionSpec partitionSpec = null; + if (partitioning) { + Preconditions.checkState(filterOp == null || !filterOp.equals("only")); + partitionSpec = + PartitionSpec.builderFor(tableSchema).identity("bool").identity("modulo_5").build(); + } + Table table0 = + catalog.createTable(TableIdentifier.parse(tableId + "_0_a"), tableSchema, partitionSpec); + Table table1 = + catalog.createTable(TableIdentifier.parse(tableId + "_1_b"), tableSchema, partitionSpec); + Table table2 = + catalog.createTable(TableIdentifier.parse(tableId + "_2_c"), tableSchema, partitionSpec); + Table table3 = + catalog.createTable(TableIdentifier.parse(tableId + "_3_d"), tableSchema, partitionSpec); + Table table4 = + catalog.createTable(TableIdentifier.parse(tableId + "_4_e"), tableSchema, partitionSpec); + + // Write with Beam + PCollection input; + if (streaming) { + writeConfig.put("triggering_frequency_seconds", 5); + input = + pipeline + .apply(getStreamingSource()) + .apply( + MapElements.into(TypeDescriptors.rows()) + .via(instant -> ROW_FUNC.apply(instant.getMillis() % NUM_RECORDS))); + } else { + input = pipeline.apply(Create.of(INPUT_ROWS)); + } + input.setRowSchema(BEAM_SCHEMA).apply(Managed.write(Managed.ICEBERG).withConfig(writeConfig)); + pipeline.run().waitUntilFinish(); + + // Read back and check records are correct + List> returnedRecords = + Arrays.asList( + readRecords(table0), + readRecords(table1), + readRecords(table2), + readRecords(table3), + readRecords(table4)); + + SerializableFunction recordFunc = + row -> IcebergUtils.beamRowToIcebergRecord(tableSchema, row); + + for (int i = 0; i < returnedRecords.size(); i++) { + List records = returnedRecords.get(i); + long l = i; + Stream expectedRecords = + INPUT_ROWS.stream() + .filter(rec -> checkStateNotNull(rec.getInt64("modulo_5")) == l) + .map(rowFilter::filter) + .map(recordFunc::apply); + + assertThat(records, containsInAnyOrder(expectedRecords.toArray())); + } + } + + @Test + public void testWriteToDynamicDestinations() { + writeToDynamicDestinations(null); + } + + @Test + public void testWriteToDynamicDestinationsAndDropFields() { + writeToDynamicDestinations("drop"); + } + + @Test + public void testWriteToDynamicDestinationsAndKeepFields() { + writeToDynamicDestinations("keep"); + } + + @Test + public void testWriteToDynamicDestinationsWithOnlyRecord() { + writeToDynamicDestinations("only"); + } + + @Test + public void 
testStreamToDynamicDestinationsAndKeepFields() { + writeToDynamicDestinations("keep", true, false); + } + + @Test + public void testStreamToPartitionedDynamicDestinations() { + writeToDynamicDestinations(null, true, true); + } } diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java index d3bf13a16787..2f81db671dd7 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java @@ -33,6 +33,7 @@ import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.ValueInSingleWindow; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; @@ -135,25 +136,26 @@ public void testDynamicDestinationsWithoutSpillover() throws Exception { DynamicDestinations dynamicDestinations = new DynamicDestinations() { - private final Schema schema = Schema.builder().addInt64Field("tableNumber").build(); + @Override + public Schema getDataSchema() { + return IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA); + } @Override - public Schema getMetadataSchema() { - return schema; + public Row getData(Row element) { + return element; } @Override - public Row assignDestinationMetadata(Row data) { - long rowId = data.getInt64("id"); - return Row.withSchema(schema).addValues((rowId / 3) + 1).build(); + public String getTableStringIdentifier(ValueInSingleWindow element) { + long tableNumber = element.getValue().getInt64("id") / 3 + 1; + return String.format("default.table%s-%s", tableNumber, salt); } @Override - public IcebergDestination instantiateDestination(Row dest) { + public IcebergDestination instantiateDestination(String dest) { return IcebergDestination.builder() - .setTableIdentifier( - TableIdentifier.of( - "default", "table" + dest.getInt64("tableNumber") + "-" + salt)) + .setTableIdentifier(TableIdentifier.parse(dest)) .setFileFormat(FileFormat.PARQUET) .build(); } @@ -230,25 +232,26 @@ public void testDynamicDestinationsWithSpillover() throws Exception { DynamicDestinations dynamicDestinations = new DynamicDestinations() { - private final Schema schema = Schema.builder().addInt64Field("tableNumber").build(); + @Override + public Schema getDataSchema() { + return IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA); + } @Override - public Schema getMetadataSchema() { - return schema; + public Row getData(Row element) { + return element; } @Override - public Row assignDestinationMetadata(Row data) { - long rowId = data.getInt64("id"); - return Row.withSchema(schema).addValues(rowId % numDestinations).build(); + public String getTableStringIdentifier(ValueInSingleWindow element) { + long tableNumber = element.getValue().getInt64("id") % numDestinations; + return String.format("default.table%s-%s", tableNumber, salt); } @Override - public IcebergDestination instantiateDestination(Row dest) { + public IcebergDestination instantiateDestination(String dest) { return IcebergDestination.builder() - .setTableIdentifier( - TableIdentifier.of( - "default", "table" + dest.getInt64("tableNumber") + "-" + salt)) + 
.setTableIdentifier(TableIdentifier.parse(dest)) .setFileFormat(FileFormat.PARQUET) .build(); } diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java index 779687c97768..2d36a0409b86 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java @@ -23,25 +23,40 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; +import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; import org.apache.beam.sdk.managed.Managed; +import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.TestStream; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.util.RowFilter; +import org.apache.beam.sdk.util.RowStringInterpolator; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionRowTuple; import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.ValueInSingleWindow; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.Table; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.data.IcebergGenerics; import org.apache.iceberg.data.Record; +import org.checkerframework.checker.nullness.qual.Nullable; import org.hamcrest.Matchers; +import org.joda.time.Duration; +import org.joda.time.Instant; import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; @@ -109,7 +124,8 @@ public void testSimpleAppend() { .apply("Append To Table", new IcebergWriteSchemaTransformProvider().from(config)) .get(OUTPUT_TAG); - PAssert.that(result).satisfies(new VerifyOutputs(identifier, "append")); + PAssert.that(result) + .satisfies(new VerifyOutputs(Collections.singletonList(identifier), "append")); testPipeline.run().waitUntilFinish(); @@ -140,7 +156,8 @@ public void testWriteUsingManagedTransform() { PCollection result = inputRows.apply(Managed.write(Managed.ICEBERG).withConfig(configMap)).get(OUTPUT_TAG); - PAssert.that(result).satisfies(new VerifyOutputs(identifier, "append")); + PAssert.that(result) + .satisfies(new VerifyOutputs(Collections.singletonList(identifier), "append")); testPipeline.run().waitUntilFinish(); @@ -148,12 +165,191 @@ public void testWriteUsingManagedTransform() { assertThat(writtenRecords, Matchers.containsInAnyOrder(TestFixtures.FILE1SNAPSHOT1.toArray())); } + /** + * @param operation if null, just perform a normal dynamic destination write test; otherwise, + * performs a simple filter on the record before writing. 
Valid options are "keep", "drop", + * and "only" + */ + private void writeToDynamicDestinationsAndFilter(@Nullable String operation, boolean streaming) { + String salt = Long.toString(UUID.randomUUID().hashCode(), 16); + + Schema nestedSchema = + Schema.builder().addNullableStringField("str").addInt64Field("long").build(); + Schema beamSchema = + Schema.builder() + .addNullableInt32Field("id") + .addStringField("name") + .addFloatField("cost") + .addRowField("nested", nestedSchema) + .build(); + + String destinationTemplate = "default.table_{id}_{name}_"; + // for streaming, test substitution works for windowing + if (streaming) { + destinationTemplate += "{$DD}_"; + } + destinationTemplate += salt; + + Map writeConfig = + new HashMap<>( + ImmutableMap.builder() + .put("table", destinationTemplate) + .put("catalog_name", "test-name") + .put( + "catalog_properties", + ImmutableMap.builder() + .put("type", "hadoop") + .put("warehouse", warehouse.location) + .build()) + .build()); + + if (streaming) { + writeConfig.put("triggering_frequency_seconds", 100); + } + + // (drop) we drop these fields from our iceberg table, so we drop them from our input rows + // (keep) we want to include only these fields in our iceberg table, so we keep them and drop + // everything else + // (only) we unnest and write this single record field. + List filteredFields = Arrays.asList("nested", "id"); + RowFilter filter = new RowFilter(beamSchema); + if (operation != null) { + switch (operation) { + case "drop": + filter = filter.drop(filteredFields); + writeConfig.put(operation, filteredFields); + break; + case "keep": + filter = filter.keep(filteredFields); + writeConfig.put(operation, filteredFields); + break; + case "only": + filter = filter.only(filteredFields.get(0)); + writeConfig.put(operation, filteredFields.get(0)); + break; + default: + throw new UnsupportedOperationException("Unknown operation: " + operation); + } + } + + List rows = + Arrays.asList( + Row.withSchema(beamSchema) + .addValues(0, "a", 1.23f, Row.withSchema(nestedSchema).addValues("x", 1L).build()) + .build(), + Row.withSchema(beamSchema) + .addValues(1, "b", 4.56f, Row.withSchema(nestedSchema).addValues("y", 2L).build()) + .build(), + Row.withSchema(beamSchema) + .addValues(2, "c", 7.89f, Row.withSchema(nestedSchema).addValues("z", 3L).build()) + .build()); + + // use interpolator to fetch destinations identifiers. 
create iceberg tables beforehand + RowStringInterpolator interpolator = new RowStringInterpolator(destinationTemplate, beamSchema); + Instant first = new Instant(0); + Instant second = first.plus(Duration.standardDays(1)); + Instant third = second.plus(Duration.standardDays(1)); + String identifier0 = + interpolator.interpolate( + ValueInSingleWindow.of(rows.get(0), first, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING)); + String identifier1 = + interpolator.interpolate( + ValueInSingleWindow.of(rows.get(1), second, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING)); + String identifier2 = + interpolator.interpolate( + ValueInSingleWindow.of(rows.get(2), third, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING)); + + org.apache.iceberg.Schema icebergSchema = + IcebergUtils.beamSchemaToIcebergSchema(filter.outputSchema()); + Table table0 = warehouse.createTable(TableIdentifier.parse(identifier0), icebergSchema); + Table table1 = warehouse.createTable(TableIdentifier.parse(identifier1), icebergSchema); + Table table2 = warehouse.createTable(TableIdentifier.parse(identifier2), icebergSchema); + + TestStream stream = + TestStream.create(beamSchema) + .advanceWatermarkTo(first) + .addElements(rows.get(0)) + .advanceProcessingTime(Duration.standardDays(1)) + .advanceWatermarkTo(second) + .addElements(rows.get(1)) + .advanceProcessingTime(Duration.standardDays(1)) + .advanceWatermarkTo(third) + .addElements(rows.get(2)) + .advanceProcessingTime(Duration.standardDays(1)) + .advanceWatermarkToInfinity(); + + PCollection inputRows; + if (streaming) { + inputRows = + testPipeline + .apply(stream) + .apply( + Window.into(FixedWindows.of(Duration.standardMinutes(5))) + .accumulatingFiredPanes()); + } else { + inputRows = testPipeline.apply(Create.of(rows).withRowSchema(beamSchema)); + } + + PCollection result = + inputRows + .apply("Write records", Managed.write(Managed.ICEBERG).withConfig(writeConfig)) + .getSinglePCollection(); + + PAssert.that(result) + .satisfies( + new VerifyOutputs(Arrays.asList(identifier0, identifier1, identifier2), "append")); + + testPipeline.run().waitUntilFinish(); + + List table0Records = ImmutableList.copyOf(IcebergGenerics.read(table0).build()); + List table1Records = ImmutableList.copyOf(IcebergGenerics.read(table1).build()); + List table2Records = ImmutableList.copyOf(IcebergGenerics.read(table2).build()); + + assertThat( + table0Records, + Matchers.contains( + IcebergUtils.beamRowToIcebergRecord(icebergSchema, filter.filter(rows.get(0))))); + assertThat( + table1Records, + Matchers.contains( + IcebergUtils.beamRowToIcebergRecord(icebergSchema, filter.filter(rows.get(1))))); + assertThat( + table2Records, + Matchers.contains( + IcebergUtils.beamRowToIcebergRecord(icebergSchema, filter.filter(rows.get(2))))); + } + + @Test + public void testWriteToDynamicDestinations() { + writeToDynamicDestinationsAndFilter(null, false); + } + + @Test + public void testWriteToDynamicDestinationsAndDropFields() { + writeToDynamicDestinationsAndFilter("drop", false); + } + + @Test + public void testWriteToDynamicDestinationsAndKeepFields() { + writeToDynamicDestinationsAndFilter("keep", false); + } + + @Test + public void testWriteToDynamicDestinationsAndWriteOnlyRecord() { + writeToDynamicDestinationsAndFilter("only", false); + } + + @Test + public void testStreamToDynamicDestinationsAndKeepFields() { + writeToDynamicDestinationsAndFilter("keep", true); + } + private static class VerifyOutputs implements SerializableFunction, Void> { - private final String tableId; + private final List tableIds; private 
final String operation; - public VerifyOutputs(String identifier, String operation) { - this.tableId = identifier; + public VerifyOutputs(List identifier, String operation) { + this.tableIds = identifier; this.operation = operation; } @@ -161,7 +357,7 @@ public VerifyOutputs(String identifier, String operation) { public Void apply(Iterable input) { Row row = input.iterator().next(); - assertEquals(tableId, row.getString("table")); + assertThat(tableIds, Matchers.hasItem(row.getString("table"))); assertEquals(operation, row.getString("operation")); return null; }
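
---

For reviewers, a minimal usage sketch of the behavior this patch adds, adapted from the Managed write configuration used in the integration tests above. This is an illustration only: the table template placeholder ("customer_id"), catalog name, warehouse path, and the helper buildInput are assumptions for the example, not part of the patch. The configuration keys ("table", "catalog_name", "catalog_properties", "drop"/"keep"/"only", "triggering_frequency_seconds") are the ones introduced or used in this change.

    import java.util.Arrays;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.managed.Managed;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.Row;
    import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;

    public class DynamicDestinationsExample {
      public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create();

        // Rows containing (at least) a "customer_id" field; schema construction omitted.
        PCollection<Row> rows = buildInput(pipeline);

        // "table" is now a template: {customer_id} is interpolated per record, so each
        // record is routed to its own Iceberg table. "drop" removes the routing field
        // from the written record and is mutually exclusive with "keep" and "only".
        ImmutableMap<String, Object> config =
            ImmutableMap.<String, Object>builder()
                .put("table", "db.customers_{customer_id}")
                .put("catalog_name", "example-catalog")
                .put(
                    "catalog_properties",
                    ImmutableMap.<String, String>builder()
                        .put("type", "hadoop")
                        .put("warehouse", "gs://example-bucket/warehouse")
                        .build())
                .put("drop", Arrays.asList("customer_id"))
                // Only required when the input is unbounded (streaming writes).
                .put("triggering_frequency_seconds", 30)
                .build();

        rows.apply(Managed.write(Managed.ICEBERG).withConfig(config));
        pipeline.run().waitUntilFinish();
      }

      // Placeholder for whatever source produces the input rows in a real pipeline.
      static PCollection<Row> buildInput(Pipeline p) {
        throw new UnsupportedOperationException("example placeholder");
      }
    }

As in the tests, the destination tables are expected to exist (the patch's integration tests create them up front with the filtered schema); the template may also reference windowing metadata such as {$DD}, which is why destinations are now assigned before the global re-window in IcebergIO.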