diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java
index 7f2ff8945482..5be0c4a57c2c 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java
@@ -42,7 +42,7 @@ public class BigQueryIOTranslationTest {
 
   // A mapping from Read transform builder methods to the corresponding schema fields in
-  // KafkaIOTranslation.
+  // BigQueryIOTranslation.
   static final Map<String, String> READ_TRANSFORM_SCHEMA_MAPPING = new HashMap<>();
 
   static {
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergIO.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergIO.java
index 8b8d852e106b..542f134942ec 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergIO.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergIO.java
@@ -19,73 +19,116 @@
 
 import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
 
+import com.google.auto.value.AutoValue;
+import java.util.Arrays;
+import java.util.List;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicates;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.checkerframework.checker.nullness.qual.Nullable;
 
 public class IcebergIO {
 
-  public static WriteRows writeToDynamicDestinations(
-      IcebergCatalogConfig catalog, DynamicDestinations dynamicDestinations) {
-    return new WriteRows(catalog, dynamicDestinations);
+  public static WriteRows writeRows(IcebergCatalogConfig catalog) {
+    return new AutoValue_IcebergIO_WriteRows.Builder().setCatalogConfig(catalog).build();
   }
 
-  public static ReadTable readTable(IcebergCatalogConfig catalogConfig, TableIdentifier tableId) {
-    return new ReadTable(catalogConfig, tableId);
-  }
+  @AutoValue
+  public abstract static class WriteRows extends PTransform<PCollection<Row>, IcebergWriteResult> {
+
+    abstract IcebergCatalogConfig getCatalogConfig();
+
+    abstract @Nullable TableIdentifier getTableIdentifier();
+
+    abstract @Nullable DynamicDestinations getDynamicDestinations();
+
+    abstract Builder toBuilder();
 
-  static class WriteRows extends PTransform<PCollection<Row>, IcebergWriteResult> {
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setCatalogConfig(IcebergCatalogConfig config);
 
-    private final IcebergCatalogConfig catalog;
-    private final DynamicDestinations dynamicDestinations;
+      abstract Builder setTableIdentifier(TableIdentifier identifier);
 
-    private WriteRows(IcebergCatalogConfig catalog, DynamicDestinations dynamicDestinations) {
-      this.catalog = catalog;
-      this.dynamicDestinations = dynamicDestinations;
+      abstract Builder setDynamicDestinations(DynamicDestinations destinations);
+
+      abstract WriteRows build();
+    }
+
+    public WriteRows to(TableIdentifier identifier) {
+      return toBuilder().setTableIdentifier(identifier).build();
+    }
+
+    public WriteRows to(DynamicDestinations destinations) {
+      return toBuilder().setDynamicDestinations(destinations).build();
     }
 
     @Override
    public IcebergWriteResult expand(PCollection<Row> input) {
-
+      List<?> allToArgs = Arrays.asList(getTableIdentifier(), getDynamicDestinations());
+      Preconditions.checkArgument(
+          1 == allToArgs.stream().filter(Predicates.notNull()).count(),
+          "Must set exactly one of table identifier or dynamic destinations object.");
+
+      DynamicDestinations destinations = getDynamicDestinations();
+      if (destinations == null) {
+        destinations =
+            DynamicDestinations.singleTable(Preconditions.checkNotNull(getTableIdentifier()));
+      }
       return input
-          .apply("Set Destination Metadata", new AssignDestinations(dynamicDestinations))
+          .apply("Set Destination Metadata", new AssignDestinations(destinations))
           .apply(
-              "Write Rows to Destinations", new WriteToDestinations(catalog, dynamicDestinations));
+              "Write Rows to Destinations",
+              new WriteToDestinations(getCatalogConfig(), destinations));
     }
   }
 
-  public static class ReadTable extends PTransform<PBegin, PCollection<Row>> {
+  public static ReadRows readRows(IcebergCatalogConfig catalogConfig) {
+    return new AutoValue_IcebergIO_ReadRows.Builder().setCatalogConfig(catalogConfig).build();
+  }
+
+  @AutoValue
+  public abstract static class ReadRows extends PTransform<PBegin, PCollection<Row>> {
+
+    abstract IcebergCatalogConfig getCatalogConfig();
+
+    abstract @Nullable TableIdentifier getTableIdentifier();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setCatalogConfig(IcebergCatalogConfig config);
 
-    private final IcebergCatalogConfig catalogConfig;
-    private final transient @Nullable TableIdentifier tableId;
+      abstract Builder setTableIdentifier(TableIdentifier identifier);
 
-    private TableIdentifier getTableId() {
-      return checkStateNotNull(
-          tableId, "Transient field tableId null; it should not be accessed after serialization");
+      abstract ReadRows build();
     }
 
-    private ReadTable(IcebergCatalogConfig catalogConfig, TableIdentifier tableId) {
-      this.catalogConfig = catalogConfig;
-      this.tableId = tableId;
+    public ReadRows from(TableIdentifier tableIdentifier) {
+      return toBuilder().setTableIdentifier(tableIdentifier).build();
     }
 
     @Override
     public PCollection<Row> expand(PBegin input) {
+      TableIdentifier tableId =
+          checkStateNotNull(getTableIdentifier(), "Must set a table to read from.");
 
-      Table table = catalogConfig.catalog().loadTable(getTableId());
+      Table table = getCatalogConfig().catalog().loadTable(tableId);
 
       return input.apply(
           Read.from(
               new ScanSource(
                   IcebergScanConfig.builder()
-                      .setCatalogConfig(catalogConfig)
+                      .setCatalogConfig(getCatalogConfig())
                       .setScanType(IcebergScanConfig.ScanType.TABLE)
-                      .setTableIdentifier(getTableId())
+                      .setTableIdentifier(tableId)
                       .setSchema(SchemaAndRowConversions.icebergSchemaToBeamSchema(table.schema()))
                       .build())));
     }
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergIOTranslation.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergIOTranslation.java
new file mode 100644
index 000000000000..5dbc2a08f23b
--- /dev/null
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergIOTranslation.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.iceberg;
+
+import static org.apache.beam.io.iceberg.IcebergIO.ReadRows;
+import static org.apache.beam.io.iceberg.IcebergIO.WriteRows;
+import static org.apache.beam.sdk.util.construction.TransformUpgrader.fromByteArray;
+import static org.apache.beam.sdk.util.construction.TransformUpgrader.toByteArray;
+
+import com.google.auto.service.AutoService;
+import java.io.IOException;
+import java.io.InvalidClassException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.construction.PTransformTranslation.TransformPayloadTranslator;
+import org.apache.beam.sdk.util.construction.SdkComponents;
+import org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+@SuppressWarnings({"rawtypes", "nullness"})
+public class IcebergIOTranslation {
+  static class IcebergIOReadTranslator implements TransformPayloadTranslator<ReadRows> {
+
+    static Schema schema =
+        Schema.builder()
+            .addByteArrayField("catalog_config")
+            .addNullableArrayField("table_identifier", FieldType.STRING)
+            .build();
+
+    public static final String ICEBERG_READ_TRANSFORM_URN =
+        "beam:transform:org.apache.beam:iceberg_read:v1";
+
+    @Override
+    public String getUrn() {
+      return ICEBERG_READ_TRANSFORM_URN;
+    }
+
+    @Override
+    public @Nullable FunctionSpec translate(
+        AppliedPTransform<?, ?, ReadRows> application, SdkComponents components)
+        throws IOException {
+      // Setting an empty payload since Iceberg transform payload is not actually used by runners
+      // currently.
+      return FunctionSpec.newBuilder().setUrn(getUrn()).setPayload(ByteString.empty()).build();
+    }
+
+    @Override
+    public Row toConfigRow(ReadRows transform) {
+
+      Map<String, Object> fieldValues = new HashMap<>();
+
+      if (transform.getCatalogConfig() != null) {
+        fieldValues.put("catalog_config", toByteArray(transform.getCatalogConfig()));
+      }
+      if (transform.getTableIdentifier() != null) {
+        TableIdentifier identifier = transform.getTableIdentifier();
+        List<String> identifierParts =
+            Arrays.stream(identifier.namespace().levels()).collect(Collectors.toList());
+        identifierParts.add(identifier.name());
+        fieldValues.put("table_identifier", identifierParts);
+      }
+
+      return Row.withSchema(schema).withFieldValues(fieldValues).build();
+    }
+
+    @Override
+    public ReadRows fromConfigRow(Row configRow, PipelineOptions options) {
+      try {
+        ReadRows.Builder builder = new AutoValue_IcebergIO_ReadRows.Builder();
+
+        byte[] catalogBytes = configRow.getBytes("catalog_config");
+        if (catalogBytes != null) {
+          builder = builder.setCatalogConfig((IcebergCatalogConfig) fromByteArray(catalogBytes));
+        }
+        Collection<String> tableIdentifierParts = configRow.getArray("table_identifier");
+        if (tableIdentifierParts != null) {
+          builder =
+              builder.setTableIdentifier(
+                  TableIdentifier.parse(String.join(".", tableIdentifierParts)));
+        }
+        return builder.build();
+      } catch (InvalidClassException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  @AutoService(TransformPayloadTranslatorRegistrar.class)
+  public static class ReadRegistrar implements TransformPayloadTranslatorRegistrar {
+    @Override
+    @SuppressWarnings({
+      "rawtypes",
+    })
+    public Map<? extends Class<? extends PTransform>, ? extends TransformPayloadTranslator>
+        getTransformPayloadTranslators() {
+      return ImmutableMap.<Class<? extends PTransform>, TransformPayloadTranslator>builder()
+          .put(AutoValue_IcebergIO_ReadRows.class, new IcebergIOReadTranslator())
+          .build();
+    }
+  }
+
+  static class IcebergIOWriteTranslator implements TransformPayloadTranslator<WriteRows> {
+
+    static Schema schema =
+        Schema.builder()
+            .addByteArrayField("catalog_config")
+            .addNullableArrayField("table_identifier", FieldType.STRING)
+            .addNullableByteArrayField("dynamic_destinations")
+            .build();
+
+    public static final String ICEBERG_WRITE_TRANSFORM_URN =
+        "beam:transform:org.apache.beam:iceberg_write:v1";
+
+    @Override
+    public String getUrn() {
+      return ICEBERG_WRITE_TRANSFORM_URN;
+    }
+
+    @Override
+    public @Nullable FunctionSpec translate(
+        AppliedPTransform<?, ?, WriteRows> application, SdkComponents components)
+        throws IOException {
+      // Setting an empty payload since Iceberg transform payload is not actually used by runners
+      // currently.
+      return FunctionSpec.newBuilder().setUrn(getUrn()).setPayload(ByteString.empty()).build();
+    }
+
+    @Override
+    public Row toConfigRow(WriteRows transform) {
+
+      Map<String, Object> fieldValues = new HashMap<>();
+
+      if (transform.getCatalogConfig() != null) {
+        fieldValues.put("catalog_config", toByteArray(transform.getCatalogConfig()));
+      }
+      if (transform.getTableIdentifier() != null) {
+        TableIdentifier identifier = transform.getTableIdentifier();
+        List<String> identifierParts =
+            Arrays.stream(identifier.namespace().levels()).collect(Collectors.toList());
+        identifierParts.add(identifier.name());
+        fieldValues.put("table_identifier", identifierParts);
+      }
+      if (transform.getDynamicDestinations() != null) {
+        fieldValues.put("dynamic_destinations", toByteArray(transform.getDynamicDestinations()));
+      }
+
+      return Row.withSchema(schema).withFieldValues(fieldValues).build();
+    }
+
+    @Override
+    public WriteRows fromConfigRow(Row configRow, PipelineOptions options) {
+      try {
+        IcebergIO.WriteRows.Builder builder = new AutoValue_IcebergIO_WriteRows.Builder();
+
+        byte[] catalogBytes = configRow.getBytes("catalog_config");
+        if (catalogBytes != null) {
+          builder = builder.setCatalogConfig((IcebergCatalogConfig) fromByteArray(catalogBytes));
+        }
+        Collection<String> tableIdentifierParts = configRow.getArray("table_identifier");
+        if (tableIdentifierParts != null) {
+          builder =
+              builder.setTableIdentifier(
+                  TableIdentifier.parse(String.join(".", tableIdentifierParts)));
+        }
+        byte[] dynamicDestinationsBytes = configRow.getBytes("dynamic_destinations");
+        if (dynamicDestinationsBytes != null) {
+          builder =
+              builder.setDynamicDestinations(
+                  (DynamicDestinations) fromByteArray(dynamicDestinationsBytes));
+        }
+        return builder.build();
+      } catch (InvalidClassException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  @AutoService(TransformPayloadTranslatorRegistrar.class)
+  public static class WriteRegistrar implements TransformPayloadTranslatorRegistrar {
+    @Override
+    @SuppressWarnings({
+      "rawtypes",
+    })
+    public Map<? extends Class<? extends PTransform>, ? extends TransformPayloadTranslator>
+        getTransformPayloadTranslators() {
+      return ImmutableMap.<Class<? extends PTransform>, TransformPayloadTranslator>builder()
+          .put(AutoValue_IcebergIO_WriteRows.class, new IcebergIOWriteTranslator())
+          .build();
+    }
+  }
+}
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/IcebergIOReadTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/IcebergIOReadTest.java
index 0ae63439f76e..e7b75b9e57d7 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/IcebergIOReadTest.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/IcebergIOReadTest.java
@@ -102,7 +102,7 @@ public void testSimpleScan() throws Exception {
 
     PCollection<Row> output =
         testPipeline
-            .apply(IcebergIO.readTable(catalogConfig, tableId))
+            .apply(IcebergIO.readRows(catalogConfig).from(tableId))
             .apply(ParDo.of(new PrintRow()))
             .setCoder(
                 RowCoder.of(
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/IcebergIOTranslationTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/IcebergIOTranslationTest.java
new file mode 100644
index 000000000000..94692a78804f
--- /dev/null
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/IcebergIOTranslationTest.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.iceberg;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class IcebergIOTranslationTest {
+  // A mapping from WriteRows transform builder methods to the corresponding schema fields in
+  // IcebergIOTranslation.
+  static final Map<String, String> WRITE_TRANSFORM_SCHEMA_MAPPING =
+      ImmutableMap.<String, String>builder()
+          .put("getCatalogConfig", "catalog_config")
+          .put("getTableIdentifier", "table_identifier")
+          .put("getDynamicDestinations", "dynamic_destinations")
+          .build();
+
+  // A mapping from ReadRows transform builder methods to the corresponding schema fields in
+  // IcebergIOTranslation.
+  static final Map<String, String> READ_TRANSFORM_SCHEMA_MAPPING =
+      ImmutableMap.<String, String>builder()
+          .put("getCatalogConfig", "catalog_config")
+          .put("getTableIdentifier", "table_identifier")
+          .build();
+
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+  @Rule
+  public transient TestDataWarehouse warehouse = new TestDataWarehouse(TEMPORARY_FOLDER, "default");
+
+  @Test
+  public void testReCreateWriteTransformFromRowTable() {
+    // setting a subset of fields here.
+    IcebergCatalogConfig config =
+        IcebergCatalogConfig.builder()
+            .setName("test_catalog")
+            .setIcebergCatalogType(CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
+            .setWarehouseLocation(warehouse.location)
+            .build();
+    IcebergIO.WriteRows writeTransform =
+        IcebergIO.writeRows(config).to(TableIdentifier.of("test_namespace", "test_table"));
+
+    IcebergIOTranslation.IcebergIOWriteTranslator translator =
+        new IcebergIOTranslation.IcebergIOWriteTranslator();
+    Row row = translator.toConfigRow(writeTransform);
+
+    IcebergIO.WriteRows writeTransformFromRow =
+        translator.fromConfigRow(row, PipelineOptionsFactory.create());
+    assertNotNull(writeTransformFromRow.getTableIdentifier());
+    assertEquals(
+        "test_namespace", writeTransformFromRow.getTableIdentifier().namespace().levels()[0]);
+    assertEquals("test_table", writeTransformFromRow.getTableIdentifier().name());
+    assertEquals("test_catalog", writeTransformFromRow.getCatalogConfig().getName());
+    assertEquals(
+        CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP,
+        writeTransformFromRow.getCatalogConfig().getIcebergCatalogType());
+    assertEquals(
+        warehouse.location, writeTransformFromRow.getCatalogConfig().getWarehouseLocation());
+  }
+
+  @Test
+  public void testWriteTransformRowIncludesAllFields() {
+    List<String> getMethodNames =
+        Arrays.stream(IcebergIO.WriteRows.class.getDeclaredMethods())
+            .map(method -> method.getName())
+            .filter(methodName -> methodName.startsWith("get"))
+            .collect(Collectors.toList());
+
+    // Just to make sure that this does not pass trivially.
+    assertTrue(getMethodNames.size() > 0);
+
+    for (String getMethodName : getMethodNames) {
+      assertTrue(
+          "Method "
+              + getMethodName
+              + " will not be tracked when upgrading the 'IcebergIO.WriteRows' transform. Please "
+              + "update 'IcebergIOTranslation.IcebergIOWriteTranslator' to track the new method "
+              + "and update this test.",
+          WRITE_TRANSFORM_SCHEMA_MAPPING.keySet().contains(getMethodName));
+    }
+
+    // Confirming that all fields mentioned in `WRITE_TRANSFORM_SCHEMA_MAPPING` are
+    // actually available in the schema.
+    WRITE_TRANSFORM_SCHEMA_MAPPING.values().stream()
+        .forEach(
+            fieldName -> {
+              assertTrue(
+                  "Field name "
+                      + fieldName
+                      + " was not found in the transform schema defined in "
+                      + "IcebergIOTranslation.IcebergIOWriteTranslator.",
+                  IcebergIOTranslation.IcebergIOWriteTranslator.schema
+                      .getFieldNames()
+                      .contains(fieldName));
+            });
+  }
+
+  @Test
+  public void testReCreateReadTransformFromRowTable() {
+    // setting a subset of fields here.
+    IcebergCatalogConfig config =
+        IcebergCatalogConfig.builder()
+            .setName("test_catalog")
+            .setIcebergCatalogType(CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
+            .setWarehouseLocation(warehouse.location)
+            .build();
+    IcebergIO.ReadRows readTransform =
+        IcebergIO.readRows(config).from(TableIdentifier.of("test_namespace", "test_table"));
+
+    IcebergIOTranslation.IcebergIOReadTranslator translator =
+        new IcebergIOTranslation.IcebergIOReadTranslator();
+    Row row = translator.toConfigRow(readTransform);
+
+    IcebergIO.ReadRows readTransformFromRow =
+        translator.fromConfigRow(row, PipelineOptionsFactory.create());
+    assertNotNull(readTransformFromRow.getTableIdentifier());
+    assertEquals(
+        "test_namespace", readTransformFromRow.getTableIdentifier().namespace().levels()[0]);
+    assertEquals("test_table", readTransformFromRow.getTableIdentifier().name());
+    assertEquals("test_catalog", readTransformFromRow.getCatalogConfig().getName());
+    assertEquals(
+        CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP,
+        readTransformFromRow.getCatalogConfig().getIcebergCatalogType());
+    assertEquals(
+        warehouse.location, readTransformFromRow.getCatalogConfig().getWarehouseLocation());
+  }
+
+  @Test
+  public void testReadTransformRowIncludesAllFields() {
+    List<String> getMethodNames =
+        Arrays.stream(IcebergIO.ReadRows.class.getDeclaredMethods())
+            .map(method -> method.getName())
+            .filter(methodName -> methodName.startsWith("get"))
+            .collect(Collectors.toList());
+
+    // Just to make sure that this does not pass trivially.
+    assertTrue(getMethodNames.size() > 0);
+
+    for (String getMethodName : getMethodNames) {
+      assertTrue(
+          "Method "
+              + getMethodName
+              + " will not be tracked when upgrading the 'IcebergIO.ReadRows' transform. Please "
+              + "update 'IcebergIOTranslation.IcebergIOReadTranslator' to track the new method "
+              + "and update this test.",
+          READ_TRANSFORM_SCHEMA_MAPPING.keySet().contains(getMethodName));
+    }
+
+    // Confirming that all fields mentioned in `READ_TRANSFORM_SCHEMA_MAPPING` are
+    // actually available in the schema.
+    READ_TRANSFORM_SCHEMA_MAPPING.values().stream()
+        .forEach(
+            fieldName -> {
+              assertTrue(
+                  "Field name "
+                      + fieldName
+                      + " was not found in the transform schema defined in "
+                      + "IcebergIOTranslation.IcebergIOReadTranslator.",
+                  IcebergIOTranslation.IcebergIOReadTranslator.schema
+                      .getFieldNames()
+                      .contains(fieldName));
+            });
+  }
+}
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/IcebergIOWriteTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/IcebergIOWriteTest.java
index 011ab2662457..cd2303716198 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/IcebergIOWriteTest.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/IcebergIOWriteTest.java
@@ -82,12 +82,10 @@ public void testSimpleAppend() throws Exception {
         .setWarehouseLocation(warehouse.location)
         .build();
 
-    DynamicDestinations destination = DynamicDestinations.singleTable(tableId);
-
     testPipeline
         .apply("Records To Add", Create.of(TestFixtures.asRows(TestFixtures.FILE1SNAPSHOT1)))
         .setRowSchema(SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA))
-        .apply("Append To Table", IcebergIO.writeToDynamicDestinations(catalog, destination));
+        .apply("Append To Table", IcebergIO.writeRows(catalog).to(tableId));
 
     LOG.info("Executing pipeline");
     testPipeline.run().waitUntilFinish();
@@ -118,7 +116,7 @@ public void testDynamicDestinationsWithoutSpillover() throws Exception {
         .setWarehouseLocation(warehouse.location)
         .build();
 
-    DynamicDestinations destination =
+    DynamicDestinations dynamicDestinations =
         new DynamicDestinations() {
           private final Schema schema = Schema.builder().addInt64Field("tableNumber").build();
@@ -154,7 +152,7 @@ public IcebergDestination instantiateDestination(Row dest) {
                         TestFixtures.FILE1SNAPSHOT2,
                         TestFixtures.FILE1SNAPSHOT3))))
         .setRowSchema(SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA))
-        .apply("Append To Table", IcebergIO.writeToDynamicDestinations(catalog, destination));
+        .apply("Append To Table", IcebergIO.writeRows(catalog).to(dynamicDestinations));
 
     LOG.info("Executing pipeline");
     testPipeline.run().waitUntilFinish();
@@ -208,7 +206,7 @@ public void testDynamicDestinationsWithSpillover() throws Exception {
         .setWarehouseLocation(warehouse.location)
         .build();
 
-    DynamicDestinations destination =
+    DynamicDestinations dynamicDestinations =
         new DynamicDestinations() {
           private final Schema schema = Schema.builder().addInt64Field("tableNumber").build();
@@ -237,7 +235,7 @@ public IcebergDestination instantiateDestination(Row dest) {
     testPipeline
         .apply("Records To Add", Create.of(TestFixtures.asRows(elements)))
         .setRowSchema(SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA))
-        .apply("Append To Table", IcebergIO.writeToDynamicDestinations(catalog, destination));
+        .apply("Append To Table", IcebergIO.writeRows(catalog).to(dynamicDestinations));
 
     LOG.info("Executing pipeline");
     testPipeline.run().waitUntilFinish();
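
Note for reviewers: the snippet below is a minimal, self-contained sketch (not part of the patch) of how the renamed entry points compose after this change: writeToDynamicDestinations(catalog, destinations) becomes writeRows(catalog).to(...), and readTable(config, tableId) becomes readRows(config).from(tableId). The pipeline, catalog name, warehouse location, and table names are hypothetical placeholders; the IcebergCatalogConfig builder methods are the ones exercised in the tests above.

import org.apache.beam.io.iceberg.IcebergCatalogConfig;
import org.apache.beam.io.iceberg.IcebergIO;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.catalog.TableIdentifier;

public class IcebergIOUsageSketch {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create();

    // Hypothetical catalog: the name and warehouse location are placeholders.
    IcebergCatalogConfig catalog =
        IcebergCatalogConfig.builder()
            .setName("my_catalog")
            .setIcebergCatalogType(CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
            .setWarehouseLocation("/tmp/warehouse")
            .build();

    // Read rows from one table...
    PCollection<Row> rows =
        pipeline.apply(IcebergIO.readRows(catalog).from(TableIdentifier.of("db", "src_table")));

    // ...and append them to another. to(...) accepts either a TableIdentifier or a
    // DynamicDestinations object; WriteRows.expand() enforces that exactly one is set.
    rows.apply(IcebergIO.writeRows(catalog).to(TableIdentifier.of("db", "dst_table")));

    pipeline.run().waitUntilFinish();
  }
}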
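A second sketch, mirroring the new translation tests, of the round trip that IcebergIOTranslation enables for the transform-upgrade machinery: toConfigRow captures a configured transform as a Beam Row, and fromConfigRow rebuilds an equivalent transform. It reuses the hypothetical catalog from the previous sketch and assumes the code lives in org.apache.beam.io.iceberg, since the translator classes are package-private.

// Serialize the transform's configuration to a Row...
IcebergIO.ReadRows read =
    IcebergIO.readRows(catalog).from(TableIdentifier.of("db", "src_table"));
IcebergIOTranslation.IcebergIOReadTranslator translator =
    new IcebergIOTranslation.IcebergIOReadTranslator();
Row configRow = translator.toConfigRow(read);

// ...and rebuild an equivalent transform from it.
IcebergIO.ReadRows rebuilt =
    translator.fromConfigRow(configRow, PipelineOptionsFactory.create());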