From ed728986b430e606f6439ee89e6f7b2a5fe54160 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Thu, 11 Apr 2024 12:04:19 -0400 Subject: [PATCH 1/2] IcebergIO translation and tests --- .../bigquery/BigQueryIOTranslationTest.java | 2 +- .../org/apache/beam/io/iceberg/IcebergIO.java | 62 ++++++-- .../beam/io/iceberg/IcebergIOTranslation.java | 136 ++++++++++++++++++ .../beam/io/iceberg/IcebergIOWriteTest.java | 4 +- .../io/iceberg/IcebergIoTranslationTest.java | 120 ++++++++++++++++ 5 files changed, 311 insertions(+), 13 deletions(-) create mode 100644 sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergIOTranslation.java create mode 100644 sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/IcebergIoTranslationTest.java diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java index 7f2ff8945482..5be0c4a57c2c 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java @@ -42,7 +42,7 @@ public class BigQueryIOTranslationTest { // A mapping from Read transform builder methods to the corresponding schema fields in - // KafkaIOTranslation. + // BigQueryIOTranslation. static final Map READ_TRANSFORM_SCHEMA_MAPPING = new HashMap<>(); static { diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergIO.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergIO.java index 4b9e3102a02e..84d2c2b468ec 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergIO.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergIO.java @@ -17,34 +17,78 @@ */ package org.apache.beam.io.iceberg; +import com.google.auto.value.AutoValue; +import java.util.Arrays; +import java.util.List; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicates; +import org.apache.iceberg.catalog.TableIdentifier; +import org.checkerframework.checker.nullness.qual.Nullable; public class IcebergIO { + public static WriteRows writeRows(IcebergCatalogConfig catalog) { + return new AutoValue_IcebergIO_WriteRows.Builder().setCatalogConfig(catalog).build(); + } + public static WriteRows writeToDynamicDestinations( IcebergCatalogConfig catalog, DynamicDestinations dynamicDestinations) { - return new WriteRows(catalog, dynamicDestinations); + return new AutoValue_IcebergIO_WriteRows.Builder() + .setCatalogConfig(catalog) + .setDynamicDestinations(dynamicDestinations) + .build(); } - static class WriteRows extends PTransform, IcebergWriteResult> { + @AutoValue + public abstract static class WriteRows extends PTransform, IcebergWriteResult> { + + abstract IcebergCatalogConfig getCatalogConfig(); + + abstract @Nullable TableIdentifier getTableIdentifier(); + + abstract @Nullable DynamicDestinations getDynamicDestinations(); + + abstract Builder toBuilder(); - private final IcebergCatalogConfig catalog; - private final DynamicDestinations dynamicDestinations; + @AutoValue.Builder + abstract static class Builder { + abstract Builder setCatalogConfig(IcebergCatalogConfig config); + + abstract Builder setTableIdentifier(TableIdentifier identifier); + + abstract Builder setDynamicDestinations(DynamicDestinations destinations); + + abstract WriteRows build(); + } + + public WriteRows to(TableIdentifier identifier) { + return toBuilder().setTableIdentifier(identifier).build(); + } - private WriteRows(IcebergCatalogConfig catalog, DynamicDestinations dynamicDestinations) { - this.catalog = catalog; - this.dynamicDestinations = dynamicDestinations; + public WriteRows to(DynamicDestinations destinations) { + return toBuilder().setDynamicDestinations(destinations).build(); } @Override public IcebergWriteResult expand(PCollection input) { + List allToArgs = Arrays.asList(getTableIdentifier(), getDynamicDestinations()); + Preconditions.checkArgument( + 1 == allToArgs.stream().filter(Predicates.notNull()).count(), + "Must set exactly one of table identifier or dynamic destinations object."); + DynamicDestinations destinations = getDynamicDestinations(); + if (destinations == null) { + destinations = + DynamicDestinations.singleTable(Preconditions.checkNotNull(getTableIdentifier())); + } return input - .apply("Set Destination Metadata", new AssignDestinations(dynamicDestinations)) + .apply("Set Destination Metadata", new AssignDestinations(destinations)) .apply( - "Write Rows to Destinations", new WriteToDestinations(catalog, dynamicDestinations)); + "Write Rows to Destinations", + new WriteToDestinations(getCatalogConfig(), destinations)); } } } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergIOTranslation.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergIOTranslation.java new file mode 100644 index 000000000000..04898d4ced25 --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergIOTranslation.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.io.iceberg; + +import static org.apache.beam.io.iceberg.IcebergIO.WriteRows; +import static org.apache.beam.sdk.util.construction.TransformUpgrader.fromByteArray; +import static org.apache.beam.sdk.util.construction.TransformUpgrader.toByteArray; + +import com.google.auto.service.AutoService; +import java.io.IOException; +import java.io.InvalidClassException; +import java.util.*; +import java.util.stream.Collectors; +import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.runners.AppliedPTransform; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.util.construction.PTransformTranslation.TransformPayloadTranslator; +import org.apache.beam.sdk.util.construction.SdkComponents; +import org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.catalog.TableIdentifier; +import org.checkerframework.checker.nullness.qual.Nullable; + +@SuppressWarnings({"rawtypes", "nullness"}) +public class IcebergIOTranslation { + static class IcebergIOWriteTranslator implements TransformPayloadTranslator { + + static Schema schema = + Schema.builder() + .addByteArrayField("catalog_config") + .addNullableArrayField("table_identifier", FieldType.STRING) + .addNullableByteArrayField("dynamic_destinations") + .build(); + + public static final String ICEBERG_WRITE_TRANSFORM_URN = + "beam:transform:org.apache.beam:iceberg_write:v1"; + + @Override + public String getUrn() { + return ICEBERG_WRITE_TRANSFORM_URN; + } + + @Override + public @Nullable FunctionSpec translate( + AppliedPTransform application, SdkComponents components) + throws IOException { + // Setting an empty payload since Iceberg transform payload is not actually used by runners + // currently. + return FunctionSpec.newBuilder().setUrn(getUrn()).setPayload(ByteString.empty()).build(); + } + + @Override + public Row toConfigRow(WriteRows transform) { + + Map fieldValues = new HashMap<>(); + + if (transform.getCatalogConfig() != null) { + fieldValues.put("catalog_config", toByteArray(transform.getCatalogConfig())); + } + if (transform.getTableIdentifier() != null) { + TableIdentifier identifier = transform.getTableIdentifier(); + List identifierParts = + Arrays.stream(identifier.namespace().levels()).collect(Collectors.toList()); + identifierParts.add(identifier.name()); + fieldValues.put("table_identifier", identifierParts); + } + if (transform.getDynamicDestinations() != null) { + fieldValues.put("dynamic_destinations", toByteArray(transform.getDynamicDestinations())); + } + + return Row.withSchema(schema).withFieldValues(fieldValues).build(); + } + + @Override + public WriteRows fromConfigRow(Row configRow, PipelineOptions options) { + try { + IcebergIO.WriteRows.Builder builder = new AutoValue_IcebergIO_WriteRows.Builder(); + + byte[] catalogBytes = configRow.getBytes("catalog_config"); + if (catalogBytes != null) { + builder = builder.setCatalogConfig((IcebergCatalogConfig) fromByteArray(catalogBytes)); + } + Collection tableIdentifierParts = configRow.getArray("table_identifier"); + if (tableIdentifierParts != null) { + builder = + builder.setTableIdentifier( + TableIdentifier.parse(String.join(".", tableIdentifierParts))); + } + byte[] dynamicDestinationsBytes = configRow.getBytes("dynamic_destinations"); + if (dynamicDestinationsBytes != null) { + builder = + builder.setDynamicDestinations( + (DynamicDestinations) fromByteArray(dynamicDestinationsBytes)); + } + return builder.build(); + } catch (InvalidClassException e) { + throw new RuntimeException(e); + } + } + } + + @AutoService(TransformPayloadTranslatorRegistrar.class) + public static class WriteRegistrar implements TransformPayloadTranslatorRegistrar { + + @Override + @SuppressWarnings({ + "rawtypes", + }) + public Map, ? extends TransformPayloadTranslator> + getTransformPayloadTranslators() { + return ImmutableMap., TransformPayloadTranslator>builder() + .put(AutoValue_IcebergIO_WriteRows.class, new IcebergIOWriteTranslator()) + .build(); + } + } +} diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/IcebergIOWriteTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/IcebergIOWriteTest.java index c77d162aafd4..15b72e7f7b20 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/IcebergIOWriteTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/IcebergIOWriteTest.java @@ -82,12 +82,10 @@ public void testSimpleAppend() throws Exception { .setWarehouseLocation(warehouse.location) .build(); - DynamicDestinations destination = DynamicDestinations.singleTable(tableId); - testPipeline .apply("Records To Add", Create.of(TestFixtures.asRows(TestFixtures.FILE1SNAPSHOT1))) .setRowSchema(SchemaHelper.convert(TestFixtures.SCHEMA)) - .apply("Append To Table", IcebergIO.writeToDynamicDestinations(catalog, destination)); + .apply("Append To Table", IcebergIO.writeRows(catalog).to(tableId)); LOG.info("Executing pipeline"); testPipeline.run().waitUntilFinish(); diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/IcebergIoTranslationTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/IcebergIoTranslationTest.java new file mode 100644 index 000000000000..018717bd480e --- /dev/null +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/IcebergIoTranslationTest.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.io.iceberg; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.catalog.TableIdentifier; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class IcebergIoTranslationTest { + // A mapping from Read transform builder methods to the corresponding schema fields in + // BigQueryIOTranslation. + static final Map WRITE_TRANSFORM_SCHEMA_MAPPING = + ImmutableMap.builder() + .put("getCatalogConfig", "catalog_config") + .put("getTableIdentifier", "table_identifier") + .put("getDynamicDestinations", "dynamic_destinations") + .build(); + + @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + @Rule + public transient TestDataWarehouse warehouse = new TestDataWarehouse(TEMPORARY_FOLDER, "default"); + + @Test + public void testReCreateWriteTransformFromRowTable() { + // setting a subset of fields here. + IcebergCatalogConfig config = + IcebergCatalogConfig.builder() + .setName("test_catalog") + .setIcebergCatalogType(CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP) + .setWarehouseLocation(warehouse.location) + .build(); + IcebergIO.WriteRows writeTransform = + IcebergIO.writeRows(config).to(TableIdentifier.of("test_namespace", "test_table")); + + IcebergIOTranslation.IcebergIOWriteTranslator translator = + new IcebergIOTranslation.IcebergIOWriteTranslator(); + Row row = translator.toConfigRow(writeTransform); + + IcebergIO.WriteRows writeTransformFromRow = + translator.fromConfigRow(row, PipelineOptionsFactory.create()); + assertNotNull(writeTransformFromRow.getTableIdentifier()); + assertEquals( + "test_namespace", writeTransformFromRow.getTableIdentifier().namespace().levels()[0]); + assertEquals("test_table", writeTransformFromRow.getTableIdentifier().name()); + assertEquals("test_catalog", writeTransformFromRow.getCatalogConfig().getName()); + assertEquals( + CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP, + writeTransformFromRow.getCatalogConfig().getIcebergCatalogType()); + assertEquals( + warehouse.location, writeTransformFromRow.getCatalogConfig().getWarehouseLocation()); + } + + @Test + public void testWriteTransformRowIncludesAllFields() { + // These fields do not represent properties of the transform. + List getMethodNames = + Arrays.stream(IcebergIO.WriteRows.class.getDeclaredMethods()) + .map(method -> method.getName()) + .filter(methodName -> methodName.startsWith("get")) + .collect(Collectors.toList()); + + // Just to make sure that this does not pass trivially. + assertTrue(getMethodNames.size() > 0); + + for (String getMethodName : getMethodNames) { + assertTrue( + "Method " + + getMethodName + + " will not be tracked when upgrading the 'IcebergIO.WriteRows' transform. Please update" + + "'IcebergIOTranslation.IcebergIOWriteTranslator' to track the new method " + + "and update this test.", + WRITE_TRANSFORM_SCHEMA_MAPPING.keySet().contains(getMethodName)); + } + + // Confirming that all fields mentioned in `WRITE_TRANSFORM_SCHEMA_MAPPING` are + // actually available in the schema. + WRITE_TRANSFORM_SCHEMA_MAPPING.values().stream() + .forEach( + fieldName -> { + assertTrue( + "Field name " + + fieldName + + " was not found in the transform schema defined in " + + "IcebergIOTranslation.IcebergIOWriteTranslator.", + IcebergIOTranslation.IcebergIOWriteTranslator.schema + .getFieldNames() + .contains(fieldName)); + }); + } +} From 79d2c9486d9d51019b2429cdb055bf5f03ae2b82 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Thu, 11 Apr 2024 12:36:15 -0400 Subject: [PATCH 2/2] spotless --- .../org/apache/beam/io/iceberg/IcebergIOTranslation.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergIOTranslation.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergIOTranslation.java index 04898d4ced25..29d9b3790830 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergIOTranslation.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergIOTranslation.java @@ -24,7 +24,11 @@ import com.google.auto.service.AutoService; import java.io.IOException; import java.io.InvalidClassException; -import java.util.*; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.stream.Collectors; import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec; import org.apache.beam.sdk.options.PipelineOptions;