From 27e5fb0beb33b153e328e97aa8e5277f94cd5fbc Mon Sep 17 00:00:00 2001
From: Ahmed Abualsaud
Date: Mon, 15 Apr 2024 12:49:12 -0400
Subject: [PATCH] Read schematransform and tests

---
 .../IcebergReadSchemaTransformProvider.java   | 116 ++++++++++++++++++
 .../IcebergWriteSchemaTransformProvider.java  |  74 ++---------
 .../iceberg/SchemaTransformCatalogConfig.java |  59 +++++++++
 ...cebergReadSchemaTransformProviderTest.java | 113 +++++++++++++++++
 ...ebergWriteSchemaTransformProviderTest.java |   8 +-
 5 files changed, 303 insertions(+), 67 deletions(-)
 create mode 100644 sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergReadSchemaTransformProvider.java
 create mode 100644 sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/SchemaTransformCatalogConfig.java
 create mode 100644 sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/IcebergReadSchemaTransformProviderTest.java

diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergReadSchemaTransformProvider.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergReadSchemaTransformProvider.java
new file mode 100644
index 000000000000..465e55129414
--- /dev/null
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergReadSchemaTransformProvider.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.iceberg;
+
+import com.google.auto.service.AutoService;
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.io.iceberg.IcebergReadSchemaTransformProvider.Config;
+
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
+import org.apache.iceberg.catalog.TableIdentifier;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * SchemaTransform implementation for {@link IcebergIO#readTable}. Reads records from Iceberg and
+ * outputs a {@link org.apache.beam.sdk.values.PCollection} of Beam
+ * {@link org.apache.beam.sdk.values.Row}s.
+ */
+@AutoService(SchemaTransformProvider.class)
+public class IcebergReadSchemaTransformProvider extends TypedSchemaTransformProvider<Config> {
+  static final String OUTPUT_TAG = "output";
+
+  @Override
+  protected SchemaTransform from(Config configuration) {
+    configuration.validate();
+    return new IcebergReadSchemaTransform(configuration);
+  }
+
+  @Override
+  public List<String> outputCollectionNames() {
+    return Collections.singletonList(OUTPUT_TAG);
+  }
+
+  @Override
+  public String identifier() {
+    return "beam:schematransform:org.apache.beam:iceberg_read:v1";
+  }
+
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  public abstract static class Config {
+    public static Builder builder() {
+      return new AutoValue_IcebergReadSchemaTransformProvider_Config.Builder();
+    }
+
+    public abstract String getTable();
+
+    public abstract SchemaTransformCatalogConfig getCatalogConfig();
+
+    @AutoValue.Builder
+    public abstract static class Builder {
+      public abstract Builder setTable(String table);
+
+      public abstract Builder setCatalogConfig(SchemaTransformCatalogConfig catalogConfig);
+
+      public abstract Config build();
+    }
+
+    public void validate() {
+      getCatalogConfig().validate();
+    }
+  }
+
+  private static class IcebergReadSchemaTransform extends SchemaTransform {
+    private final Config configuration;
+
+    IcebergReadSchemaTransform(Config configuration) {
+      this.configuration = configuration;
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      SchemaTransformCatalogConfig catalogConfig = configuration.getCatalogConfig();
+
+      IcebergCatalogConfig.Builder catalogBuilder =
+          IcebergCatalogConfig.builder().setName(catalogConfig.getCatalogName());
+
+      if (!Strings.isNullOrEmpty(catalogConfig.getCatalogType())) {
+        catalogBuilder = catalogBuilder.setIcebergCatalogType(catalogConfig.getCatalogType());
+      }
+      if (!Strings.isNullOrEmpty(catalogConfig.getWarehouseLocation())) {
+        catalogBuilder = catalogBuilder.setWarehouseLocation(catalogConfig.getWarehouseLocation());
+      }
+
+      PCollection<Row> output =
+          input
+              .getPipeline()
+              .apply(
+                  IcebergIO.readTable(
+                      catalogBuilder.build(), TableIdentifier.parse(configuration.getTable())));
+
+      return PCollectionRowTuple.of(OUTPUT_TAG, output);
+    }
+  }
+}
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergWriteSchemaTransformProvider.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergWriteSchemaTransformProvider.java
index ec4a946f22d4..a65351c639d5 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergWriteSchemaTransformProvider.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergWriteSchemaTransformProvider.java
@@ -101,13 +101,13 @@ public static Builder builder() {
 
     public abstract String getTable();
 
-    public abstract CatalogConfig getCatalogConfig();
+    public abstract SchemaTransformCatalogConfig getCatalogConfig();
 
     @AutoValue.Builder
     public abstract static class Builder {
       public abstract Builder setTable(String tables);
 
-      public abstract Builder setCatalogConfig(CatalogConfig catalogConfig);
+      public abstract Builder setCatalogConfig(SchemaTransformCatalogConfig catalogConfig);
 
       public abstract Config build();
     }
@@ -117,53 +117,9 @@ public void validate() {
     }
   }
 
-  @DefaultSchema(AutoValueSchema.class)
-  @AutoValue
-  public abstract static class CatalogConfig {
-    public static Builder builder() {
-      return new AutoValue_IcebergWriteSchemaTransformProvider_CatalogConfig.Builder();
-    }
-
-    public abstract String getCatalogName();
-
-    public abstract @Nullable String getCatalogType();
-
-    public abstract @Nullable String getCatalogImplementation();
-
-    public abstract @Nullable String getWarehouseLocation();
-
-    @AutoValue.Builder
-    public abstract static class Builder {
-
-      public abstract Builder setCatalogName(String catalogName);
-
-      public abstract Builder setCatalogType(String catalogType);
-
-      public abstract Builder setCatalogImplementation(String catalogImplementation);
-
-      public abstract Builder setWarehouseLocation(String warehouseLocation);
-
-      public abstract CatalogConfig build();
-    }
-
-    Set<String> validTypes =
-        Sets.newHashSet(
-            CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP,
-            CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE,
-            CatalogUtil.ICEBERG_CATALOG_TYPE_REST);
-
-    public void validate() {
-      if (Strings.isNullOrEmpty(getCatalogType())) {
-        checkArgument(
-            validTypes.contains(Preconditions.checkArgumentNotNull(getCatalogType())),
-            "Invalid catalog type. Please pick one of %s",
-            validTypes);
-      }
-    }
-  }
-
   @VisibleForTesting
-  static class IcebergWriteSchemaTransform extends SchemaTransform {
+  private static class IcebergWriteSchemaTransform extends SchemaTransform {
     private final Config configuration;
 
     IcebergWriteSchemaTransform(Config configuration) {
@@ -175,13 +131,11 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
 
       PCollection<Row> rows = input.get(INPUT_TAG);
 
-      CatalogConfig catalogConfig = configuration.getCatalogConfig();
+      SchemaTransformCatalogConfig catalogConfig = configuration.getCatalogConfig();
 
       IcebergCatalogConfig.Builder catalogBuilder =
           IcebergCatalogConfig.builder()
-              .setName(catalogConfig.getCatalogName())
-              .setIcebergCatalogType(catalogConfig.getCatalogType())
-              .setWarehouseLocation(catalogConfig.getWarehouseLocation());
+              .setName(catalogConfig.getCatalogName());
 
       if (!Strings.isNullOrEmpty(catalogConfig.getCatalogType())) {
         catalogBuilder = catalogBuilder.setIcebergCatalogType(catalogConfig.getCatalogType());
       }
@@ -212,17 +166,13 @@ static class SnapshotToRow extends SimpleFunction<KV<String, Snapshot>, Row> {
     @Override
     public Row apply(KV<String, Snapshot> input) {
       Snapshot snapshot = input.getValue();
-      Row row =
-          Row.withSchema(OUTPUT_SCHEMA)
-              .addValues(
-                  input.getKey(),
-                  snapshot.operation(),
-                  snapshot.summary(),
-                  snapshot.manifestListLocation())
-              .build();
-      System.out.println("SNAPSHOT: " + snapshot);
-      System.out.println("ROW: " + row);
-      return row;
+      return Row.withSchema(OUTPUT_SCHEMA)
+          .addValues(
+              input.getKey(),
+              snapshot.operation(),
+              snapshot.summary(),
+              snapshot.manifestListLocation())
+          .build();
     }
   }
 }
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/SchemaTransformCatalogConfig.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/SchemaTransformCatalogConfig.java
new file mode 100644
index 000000000000..5d687f0409de
--- /dev/null
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/SchemaTransformCatalogConfig.java
@@ -0,0 +1,59 @@
+package org.apache.beam.io.iceberg;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
+import org.apache.iceberg.CatalogUtil;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+import java.util.Set;
+
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class SchemaTransformCatalogConfig {
+  public static Builder builder() {
+    return new AutoValue_SchemaTransformCatalogConfig.Builder();
+  }
+
+  public abstract String getCatalogName();
+
+  public abstract @Nullable String getCatalogType();
+
+  public abstract @Nullable String getCatalogImplementation();
+
+  public abstract @Nullable String getWarehouseLocation();
+
+  @AutoValue.Builder
+  public abstract static class Builder {
+
+    public abstract Builder setCatalogName(String catalogName);
+
+    public abstract Builder setCatalogType(String catalogType);
+
+    public abstract Builder setCatalogImplementation(String catalogImplementation);
+
+    public abstract Builder setWarehouseLocation(String warehouseLocation);
+
+    public abstract SchemaTransformCatalogConfig build();
+  }
+
+  Set<String> validTypes =
+      Sets.newHashSet(
+          CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP,
+          CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE,
+          CatalogUtil.ICEBERG_CATALOG_TYPE_REST);
+
+  public void validate() {
+    // Catalog type is optional; when one is supplied, it must be a type CatalogUtil knows.
+    if (!Strings.isNullOrEmpty(getCatalogType())) {
+      checkArgument(
+          validTypes.contains(Preconditions.checkArgumentNotNull(getCatalogType())),
+          "Invalid catalog type. Please pick one of %s",
+          validTypes);
+    }
+  }
+}
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/IcebergReadSchemaTransformProviderTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/IcebergReadSchemaTransformProviderTest.java
new file mode 100644
index 000000000000..71b55234b321
--- /dev/null
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/IcebergReadSchemaTransformProviderTest.java
@@ -0,0 +1,113 @@
+package org.apache.beam.io.iceberg;
+
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+
+public class IcebergReadSchemaTransformProviderTest {
+
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+  @Rule public TestDataWarehouse warehouse = new TestDataWarehouse(TEMPORARY_FOLDER, "default");
+
+  @Rule public TestPipeline testPipeline = TestPipeline.create();
+
+  @Test
+  public void testBuildTransformWithRow() throws NoSuchSchemaException {
+    Row catalogConfigRow =
+        Row.withSchema(
+                SchemaRegistry.createDefault().getSchema(SchemaTransformCatalogConfig.class))
+            .withFieldValue("catalogName", "test_name")
+            .withFieldValue("catalogType", "test_type")
+            .withFieldValue("catalogImplementation", "testImplementation")
+            .withFieldValue("warehouseLocation", "test_location")
+            .build();
+    Row transformConfigRow =
+        Row.withSchema(new IcebergReadSchemaTransformProvider().configurationSchema())
+            .withFieldValue("table", "test_table_identifier")
+            .withFieldValue("catalogConfig", catalogConfigRow)
+            .build();
+
+    SchemaTransform transform = new IcebergReadSchemaTransformProvider().from(transformConfigRow);
+
+    System.out.println(transform.getName());
+  }
+
+  @Test
+  public void testSimpleScan() throws Exception {
+    String identifier = "default.table_" + Long.toString(UUID.randomUUID().hashCode(), 16);
+    TableIdentifier tableId = TableIdentifier.parse(identifier);
+
+    Table simpleTable = warehouse.createTable(tableId, TestFixtures.SCHEMA);
+    final Schema schema = SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA);
+
+    simpleTable
+        .newFastAppend()
+        .appendFile(
+            warehouse.writeRecords(
+                "file1s1.parquet", simpleTable.schema(), TestFixtures.FILE1SNAPSHOT1))
+        .appendFile(
+            warehouse.writeRecords(
+                "file2s1.parquet", simpleTable.schema(), TestFixtures.FILE2SNAPSHOT1))
+        .appendFile(
+            warehouse.writeRecords(
+                "file3s1.parquet", simpleTable.schema(), TestFixtures.FILE3SNAPSHOT1))
+        .commit();
+
+    final List<Row> expectedRows =
+        Stream.of(
+                TestFixtures.FILE1SNAPSHOT1,
+                TestFixtures.FILE2SNAPSHOT1,
+                TestFixtures.FILE3SNAPSHOT1)
+            .flatMap(List::stream)
+            .map(record -> SchemaAndRowConversions.recordToRow(schema, record))
+            .collect(Collectors.toList());
+
+    SchemaTransformCatalogConfig catalogConfig =
+        SchemaTransformCatalogConfig.builder()
+            .setCatalogName("hadoop")
+            .setCatalogType(CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
+            .setWarehouseLocation(warehouse.location)
+            .build();
+
+    IcebergReadSchemaTransformProvider.Config readConfig =
+        IcebergReadSchemaTransformProvider.Config.builder()
+            .setTable(identifier)
+            .setCatalogConfig(catalogConfig)
+            .build();
+
+    PCollection<Row> output =
+        PCollectionRowTuple.empty(testPipeline)
+            .apply(new IcebergReadSchemaTransformProvider().from(readConfig))
+            .get(IcebergReadSchemaTransformProvider.OUTPUT_TAG);
+
+    PAssert.that(output)
+        .satisfies(
+            (Iterable<Row> rows) -> {
+              assertThat(rows, containsInAnyOrder(expectedRows.toArray()));
+              return null;
+            });
+
+    testPipeline.run();
+  }
+}
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/IcebergWriteSchemaTransformProviderTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/IcebergWriteSchemaTransformProviderTest.java
index 9ae38ecfd843..ac113b6317d3 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/IcebergWriteSchemaTransformProviderTest.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/IcebergWriteSchemaTransformProviderTest.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.io.iceberg;
 
-import static org.apache.beam.io.iceberg.IcebergWriteSchemaTransformProvider.CatalogConfig;
 import static org.apache.beam.io.iceberg.IcebergWriteSchemaTransformProvider.Config;
 import static org.apache.beam.io.iceberg.IcebergWriteSchemaTransformProvider.INPUT_TAG;
 import static org.apache.beam.io.iceberg.IcebergWriteSchemaTransformProvider.OUTPUT_TAG;
@@ -65,7 +64,7 @@ public void testBuildTransformWithRow() throws NoSuchSchemaException {
     Row catalogConfigRow =
         Row.withSchema(
             SchemaRegistry.createDefault()
-                .getSchema(IcebergWriteSchemaTransformProvider.CatalogConfig.class))
+                .getSchema(SchemaTransformCatalogConfig.class))
             .withFieldValue("catalogName", "test_name")
             .withFieldValue("catalogType", "test_type")
             .withFieldValue("catalogImplementation", "testImplementation")
@@ -87,7 +86,6 @@ public void testSimpleAppend() {
     String identifier = "default.table_" + Long.toString(UUID.randomUUID().hashCode(), 16);
     TableIdentifier tableId = TableIdentifier.parse(identifier);
 
-    System.out.println(tableId);
     // Create a table and add records to it.
     Table table = warehouse.createTable(tableId, TestFixtures.SCHEMA);
 
@@ -96,7 +94,7 @@ public void testSimpleAppend() {
         Config.builder()
             .setTable(identifier)
             .setCatalogConfig(
-                CatalogConfig.builder()
+                SchemaTransformCatalogConfig.builder()
                     .setCatalogName("hadoop")
                     .setCatalogType(CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
                     .setWarehouseLocation(warehouse.location)
@@ -109,7 +107,7 @@ public void testSimpleAppend() {
         testPipeline
             .apply(
                 "Records To Add", Create.of(TestFixtures.asRows(TestFixtures.FILE1SNAPSHOT1)))
-            .setRowSchema(SchemaHelper.convert(TestFixtures.SCHEMA)));
+            .setRowSchema(SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA)));
 
     PCollection<Row> result = input
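
A minimal usage sketch of the new read transform, mirroring testSimpleScan above. The
warehouse location and table identifier are placeholder values, and "pipeline" is assumed
to be an existing Pipeline:

    // Point the transform at a catalog and table (placeholder values).
    SchemaTransformCatalogConfig catalogConfig =
        SchemaTransformCatalogConfig.builder()
            .setCatalogName("hadoop")
            .setCatalogType(CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
            .setWarehouseLocation("file:///tmp/iceberg-warehouse") // hypothetical path
            .build();

    IcebergReadSchemaTransformProvider.Config readConfig =
        IcebergReadSchemaTransformProvider.Config.builder()
            .setTable("default.my_table") // hypothetical table identifier
            .setCatalogConfig(catalogConfig)
            .build();

    // The transform consumes an empty PCollectionRowTuple and emits Beam Rows
    // under the "output" tag.
    PCollection<Row> rows =
        PCollectionRowTuple.empty(pipeline)
            .apply(new IcebergReadSchemaTransformProvider().from(readConfig))
            .get(IcebergReadSchemaTransformProvider.OUTPUT_TAG);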