From aa8b1ed7e354203629320ada135454256b0eeb27 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Mon, 15 Apr 2024 12:53:59 -0400 Subject: [PATCH] pulling in IcebergIO changes; spotless --- .../IcebergReadSchemaTransformProvider.java | 135 ++++++------ .../IcebergWriteSchemaTransformProvider.java | 18 +- .../iceberg/SchemaTransformCatalogConfig.java | 80 ++++--- ...cebergReadSchemaTransformProviderTest.java | 196 ++++++++++-------- ...ebergWriteSchemaTransformProviderTest.java | 11 +- 5 files changed, 229 insertions(+), 211 deletions(-) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergReadSchemaTransformProvider.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergReadSchemaTransformProvider.java index 465e55129414..b5d7539ce491 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergReadSchemaTransformProvider.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergReadSchemaTransformProvider.java @@ -19,98 +19,99 @@ import com.google.auto.service.AutoService; import com.google.auto.value.AutoValue; +import java.util.Collections; +import java.util.List; +import org.apache.beam.io.iceberg.IcebergReadSchemaTransformProvider.Config; import org.apache.beam.sdk.schemas.AutoValueSchema; import org.apache.beam.sdk.schemas.annotations.DefaultSchema; import org.apache.beam.sdk.schemas.transforms.SchemaTransform; import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; -import org.apache.beam.io.iceberg.IcebergReadSchemaTransformProvider.Config; - import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionRowTuple; import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; import org.apache.iceberg.catalog.TableIdentifier; -import java.util.Collections; -import java.util.List; - /** - * SchemaTransform implementation for {@link IcebergIO#readTable}. Reads records from Iceberg and outputs a - * {@link org.apache.beam.sdk.values.PCollection} of Beam {@link org.apache.beam.sdk.values.Row}s. + * SchemaTransform implementation for {@link IcebergIO#readRows}. Reads records from Iceberg and + * outputs a {@link org.apache.beam.sdk.values.PCollection} of Beam {@link + * org.apache.beam.sdk.values.Row}s. 
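+ *
+ * <p>Illustrative usage sketch (the {@code pipeline}, table name, catalog name, and warehouse
+ * path below are placeholder values; {@code from(Config)} has package-level access, as exercised
+ * in the provider tests):
+ *
+ * <pre>{@code
+ * Config config =
+ *     Config.builder()
+ *         .setTable("db.my_table")
+ *         .setCatalogConfig(
+ *             SchemaTransformCatalogConfig.builder()
+ *                 .setCatalogName("my_catalog")
+ *                 .setCatalogType(CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
+ *                 .setWarehouseLocation("file:///tmp/warehouse")
+ *                 .build())
+ *         .build();
+ *
+ * PCollection<Row> rows =
+ *     PCollectionRowTuple.empty(pipeline)
+ *         .apply(new IcebergReadSchemaTransformProvider().from(config))
+ *         .get("output");
+ * }</pre>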
 */
@AutoService(SchemaTransformProvider.class)
public class IcebergReadSchemaTransformProvider extends TypedSchemaTransformProvider<Config> {
-    static final String OUTPUT_TAG = "output";
-
-    @Override
-    protected SchemaTransform from(Config configuration) {
-        configuration.validate();
-        return new IcebergReadSchemaTransform(configuration);
-    }
-
-    @Override
-    public List<String> outputCollectionNames() {
-        return Collections.singletonList(OUTPUT_TAG);
+  static final String OUTPUT_TAG = "output";
+
+  @Override
+  protected SchemaTransform from(Config configuration) {
+    configuration.validate();
+    return new IcebergReadSchemaTransform(configuration);
+  }
+
+  @Override
+  public List<String> outputCollectionNames() {
+    return Collections.singletonList(OUTPUT_TAG);
+  }
+
+  @Override
+  public String identifier() {
+    return "beam:schematransform:org.apache.beam:iceberg_read:v1";
+  }
+
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  public abstract static class Config {
+    public static Builder builder() {
+      return new AutoValue_IcebergReadSchemaTransformProvider_Config.Builder();
     }
 
-    @Override
-    public String identifier() {
-        return "beam:schematransform:org.apache.beam:iceberg_read:v1";
-    }
-
-    @DefaultSchema(AutoValueSchema.class)
-    @AutoValue
-    public abstract static class Config {
-        public static Builder builder() {
-            return new AutoValue_IcebergReadSchemaTransformProvider_Config.Builder();
-        }
-
-        public abstract String getTable();
+    public abstract String getTable();
 
-        public abstract SchemaTransformCatalogConfig getCatalogConfig();
+    public abstract SchemaTransformCatalogConfig getCatalogConfig();
 
-        @AutoValue.Builder
-        public abstract static class Builder {
-            public abstract Builder setTable(String tables);
+    @AutoValue.Builder
+    public abstract static class Builder {
+      public abstract Builder setTable(String tables);
 
-            public abstract Builder setCatalogConfig(SchemaTransformCatalogConfig catalogConfig);
+      public abstract Builder setCatalogConfig(SchemaTransformCatalogConfig catalogConfig);
 
-            public abstract Config build();
-        }
-
-        public void validate() {
-            getCatalogConfig().validate();
-        }
+      public abstract Config build();
     }
 
+    public void validate() {
+      getCatalogConfig().validate();
+    }
+  }
+
+  private static class IcebergReadSchemaTransform extends SchemaTransform {
+    private final Config configuration;
 
-    private static class IcebergReadSchemaTransform extends SchemaTransform {
-        private final Config configuration;
-
-        IcebergReadSchemaTransform(Config configuration) {
-            this.configuration = configuration;
-        }
-
-        @Override
-        public PCollectionRowTuple expand(PCollectionRowTuple input) {
-            SchemaTransformCatalogConfig catalogConfig = configuration.getCatalogConfig();
-
-            IcebergCatalogConfig.Builder catalogBuilder =
-                IcebergCatalogConfig.builder()
-                    .setName(catalogConfig.getCatalogName());
-
-            if (!Strings.isNullOrEmpty(catalogConfig.getCatalogType())) {
-                catalogBuilder = catalogBuilder.setIcebergCatalogType(catalogConfig.getCatalogType());
-            }
-            if (!Strings.isNullOrEmpty(catalogConfig.getWarehouseLocation())) {
-                catalogBuilder = catalogBuilder.setWarehouseLocation(catalogConfig.getWarehouseLocation());
-            }
-
-            PCollection<Row> output = input.getPipeline().apply(IcebergIO.readTable(catalogBuilder.build(), TableIdentifier.parse(configuration.getTable())));
+    IcebergReadSchemaTransform(Config configuration) {
+      this.configuration = configuration;
+    }
 
-            return PCollectionRowTuple.of(OUTPUT_TAG, output);
-        }
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
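+      // The Config fields are mirrored onto an IcebergCatalogConfig below: the catalog name is
+      // always carried over, while the catalog type and warehouse location are copied only when
+      // they were actually set. The read is applied at the pipeline level, so the incoming
+      // PCollectionRowTuple is expected to be empty.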
+      SchemaTransformCatalogConfig catalogConfig = configuration.getCatalogConfig();
+
+      IcebergCatalogConfig.Builder catalogBuilder =
+          IcebergCatalogConfig.builder().setName(catalogConfig.getCatalogName());
+
+      if (!Strings.isNullOrEmpty(catalogConfig.getCatalogType())) {
+        catalogBuilder = catalogBuilder.setIcebergCatalogType(catalogConfig.getCatalogType());
+      }
+      if (!Strings.isNullOrEmpty(catalogConfig.getWarehouseLocation())) {
+        catalogBuilder = catalogBuilder.setWarehouseLocation(catalogConfig.getWarehouseLocation());
+      }
+
+      PCollection<Row> output =
+          input
+              .getPipeline()
+              .apply(
+                  IcebergIO.readRows(catalogBuilder.build())
+                      .from(TableIdentifier.parse(configuration.getTable())));
+
+      return PCollectionRowTuple.of(OUTPUT_TAG, output);
+    }
+  }
 }
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergWriteSchemaTransformProvider.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergWriteSchemaTransformProvider.java
index a65351c639d5..e1e51d564d12 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergWriteSchemaTransformProvider.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergWriteSchemaTransformProvider.java
@@ -17,13 +17,10 @@
  */
 package org.apache.beam.io.iceberg;
 
-import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
-
 import com.google.auto.service.AutoService;
 import com.google.auto.value.AutoValue;
 import java.util.Collections;
 import java.util.List;
-import java.util.Set;
 import org.apache.beam.io.iceberg.IcebergWriteSchemaTransformProvider.Config;
 import org.apache.beam.sdk.schemas.AutoValueSchema;
 import org.apache.beam.sdk.schemas.Schema;
@@ -33,22 +30,18 @@
 import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.SimpleFunction;
-import org.apache.beam.sdk.util.Preconditions;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionRowTuple;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
-import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.catalog.TableIdentifier;
-import org.checkerframework.checker.nullness.qual.Nullable;
 
 /**
- * SchemaTransform implementation for {@link IcebergIO#writeToDynamicDestinations}. Writes Beam Rows
- * to Iceberg and outputs a {@code PCollection<Row>} representing snapshots created in the process.
+ * SchemaTransform implementation for {@link IcebergIO#writeRows}. Writes Beam Rows to Iceberg and
+ * outputs a {@code PCollection<Row>} representing snapshots created in the process.
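+ *
+ * <p>Illustrative usage sketch (assumes the transform's input and output tags are {@code "input"}
+ * and {@code "output"}; {@code rows}, {@code catalogConfig}, and the table name are placeholders):
+ *
+ * <pre>{@code
+ * Config config =
+ *     Config.builder()
+ *         .setTable("db.my_table")
+ *         .setCatalogConfig(catalogConfig)
+ *         .build();
+ *
+ * PCollection<Row> snapshots =
+ *     PCollectionRowTuple.of("input", rows)
+ *         .apply(new IcebergWriteSchemaTransformProvider().from(config))
+ *         .get("output");
+ * }</pre>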
 */
@AutoService(SchemaTransformProvider.class)
public class IcebergWriteSchemaTransformProvider extends TypedSchemaTransformProvider<Config> {
@@ -117,7 +110,6 @@ public void validate() {
     }
   }
 
-  @VisibleForTesting
   private static class IcebergWriteSchemaTransform extends SchemaTransform {
     private final Config configuration;
 
@@ -134,8 +126,7 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
       SchemaTransformCatalogConfig catalogConfig = configuration.getCatalogConfig();
 
       IcebergCatalogConfig.Builder catalogBuilder =
-          IcebergCatalogConfig.builder()
-              .setName(catalogConfig.getCatalogName());
+          IcebergCatalogConfig.builder().setName(catalogConfig.getCatalogName());
 
       if (!Strings.isNullOrEmpty(catalogConfig.getCatalogType())) {
         catalogBuilder = catalogBuilder.setIcebergCatalogType(catalogConfig.getCatalogType());
@@ -149,8 +140,7 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
           DynamicDestinations.singleTable(TableIdentifier.parse(configuration.getTable()));
 
       IcebergWriteResult result =
-          rows.apply(
-              IcebergIO.writeToDynamicDestinations(catalogBuilder.build(), dynamicDestinations));
+          rows.apply(IcebergIO.writeRows(catalogBuilder.build()).to(dynamicDestinations));
 
       PCollection<Row> snapshots =
           result
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/SchemaTransformCatalogConfig.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/SchemaTransformCatalogConfig.java
index 5d687f0409de..9dd7b0f71554 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/SchemaTransformCatalogConfig.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/SchemaTransformCatalogConfig.java
@@ -1,6 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.beam.io.iceberg;
 
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+
 import com.google.auto.value.AutoValue;
+import java.util.Set;
 import org.apache.beam.sdk.schemas.AutoValueSchema;
 import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
 import org.apache.beam.sdk.util.Preconditions;
@@ -9,51 +29,47 @@
 import org.apache.iceberg.CatalogUtil;
 import org.checkerframework.checker.nullness.qual.Nullable;
 
-import java.util.Set;
-
-import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
-
 @DefaultSchema(AutoValueSchema.class)
 @AutoValue
 public abstract class SchemaTransformCatalogConfig {
-    public static Builder builder() {
-        return new AutoValue_SchemaTransformCatalogConfig.Builder();
-    }
+  public static Builder builder() {
+    return new AutoValue_SchemaTransformCatalogConfig.Builder();
+  }
 
-    public abstract String getCatalogName();
+  public abstract String getCatalogName();
 
-    public abstract @Nullable String getCatalogType();
+  public abstract @Nullable String getCatalogType();
 
-    public abstract @Nullable String getCatalogImplementation();
+  public abstract @Nullable String getCatalogImplementation();
 
-    public abstract @Nullable String getWarehouseLocation();
+  public abstract @Nullable String getWarehouseLocation();
 
-    @AutoValue.Builder
-    public abstract static class Builder {
+  @AutoValue.Builder
+  public abstract static class Builder {
 
-        public abstract Builder setCatalogName(String catalogName);
+    public abstract Builder setCatalogName(String catalogName);
 
-        public abstract Builder setCatalogType(String catalogType);
+    public abstract Builder setCatalogType(String catalogType);
 
-        public abstract Builder setCatalogImplementation(String catalogImplementation);
+    public abstract Builder setCatalogImplementation(String catalogImplementation);
 
-        public abstract Builder setWarehouseLocation(String warehouseLocation);
+    public abstract Builder setWarehouseLocation(String warehouseLocation);
 
-        public abstract SchemaTransformCatalogConfig build();
-    }
+    public abstract SchemaTransformCatalogConfig build();
+  }
+
+  Set<String> validTypes =
+      Sets.newHashSet(
+          CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP,
+          CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE,
+          CatalogUtil.ICEBERG_CATALOG_TYPE_REST);
 
-    Set<String> validTypes =
-        Sets.newHashSet(
-            CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP,
-            CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE,
-            CatalogUtil.ICEBERG_CATALOG_TYPE_REST);
-
-    public void validate() {
-        if (Strings.isNullOrEmpty(getCatalogType())) {
-            checkArgument(
-                validTypes.contains(Preconditions.checkArgumentNotNull(getCatalogType())),
-                "Invalid catalog type. Please pick one of %s",
-                validTypes);
-        }
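+  // The catalog type is optional; when one is supplied it must be a member of validTypes
+  // (the Iceberg hadoop, hive, and rest catalog types).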
+  public void validate() {
+    if (!Strings.isNullOrEmpty(getCatalogType())) {
+      checkArgument(
+          validTypes.contains(Preconditions.checkArgumentNotNull(getCatalogType())),
+          "Invalid catalog type. 
Please pick one of %s", + validTypes); } + } } diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/IcebergReadSchemaTransformProviderTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/IcebergReadSchemaTransformProviderTest.java index 71b55234b321..9b73743fa3e7 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/IcebergReadSchemaTransformProviderTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/IcebergReadSchemaTransformProviderTest.java @@ -1,18 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.beam.io.iceberg; -import org.apache.beam.sdk.coders.RowCoder; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; + +import java.util.List; +import java.util.UUID; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.beam.sdk.schemas.NoSuchSchemaException; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.SchemaRegistry; import org.apache.beam.sdk.schemas.transforms.SchemaTransform; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.MapElements; -import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionRowTuple; import org.apache.beam.sdk.values.Row; -import org.apache.beam.sdk.values.TypeDescriptors; import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.Table; import org.apache.iceberg.catalog.TableIdentifier; @@ -21,93 +41,87 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; -import java.util.List; -import java.util.UUID; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.containsInAnyOrder; - public class IcebergReadSchemaTransformProviderTest { - @ClassRule - public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); - - @Rule - public TestDataWarehouse warehouse = new TestDataWarehouse(TEMPORARY_FOLDER, "default"); - - @Rule public TestPipeline testPipeline = TestPipeline.create(); - - - @Test - public void testBuildTransformWithRow() throws NoSuchSchemaException { - Row catalogConfigRow = - Row.withSchema( - SchemaRegistry.createDefault() - .getSchema(SchemaTransformCatalogConfig.class)) - .withFieldValue("catalogName", "test_name") - .withFieldValue("catalogType", "test_type") - .withFieldValue("catalogImplementation", "testImplementation") - .withFieldValue("warehouseLocation", "test_location") - .build(); - Row transformConfigRow = - Row.withSchema(new 
IcebergReadSchemaTransformProvider().configurationSchema())
-                .withFieldValue("table", "test_table_identifier")
-                .withFieldValue("catalogConfig", catalogConfigRow)
-                .build();
-
-        SchemaTransform transform = new IcebergReadSchemaTransformProvider().from(transformConfigRow);
-
-        System.out.println(transform.getName());
-    }
-
-    @Test
-    public void testSimpleScan() throws Exception {
-        String identifier = "default.table_" + Long.toString(UUID.randomUUID().hashCode(), 16);
-        TableIdentifier tableId = TableIdentifier.parse(identifier);
-
-        Table simpleTable = warehouse.createTable(tableId, TestFixtures.SCHEMA);
-        final Schema schema = SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA);
-
-        simpleTable
-            .newFastAppend()
-            .appendFile(
-                warehouse.writeRecords(
-                    "file1s1.parquet", simpleTable.schema(), TestFixtures.FILE1SNAPSHOT1))
-            .appendFile(
-                warehouse.writeRecords(
-                    "file2s1.parquet", simpleTable.schema(), TestFixtures.FILE2SNAPSHOT1))
-            .appendFile(
-                warehouse.writeRecords(
-                    "file3s1.parquet", simpleTable.schema(), TestFixtures.FILE3SNAPSHOT1))
-            .commit();
-
-        final List<Row> expectedRows =
-            Stream.of(
-                TestFixtures.FILE1SNAPSHOT1,
-                TestFixtures.FILE2SNAPSHOT1,
-                TestFixtures.FILE3SNAPSHOT1)
-            .flatMap(List::stream)
-            .map(record -> SchemaAndRowConversions.recordToRow(schema, record))
-            .collect(Collectors.toList());
-
-        SchemaTransformCatalogConfig catalogConfig = SchemaTransformCatalogConfig.builder()
-            .setCatalogName("hadoop")
-            .setCatalogType(CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
-            .setWarehouseLocation(warehouse.location).build();
-
-        IcebergReadSchemaTransformProvider.Config readConfig = IcebergReadSchemaTransformProvider.Config.builder().setTable(identifier).setCatalogConfig(catalogConfig).build();
-
-        PCollection<Row> output = PCollectionRowTuple.empty(testPipeline).apply(new IcebergReadSchemaTransformProvider().from(readConfig)).get(IcebergReadSchemaTransformProvider.OUTPUT_TAG);
-
-        PAssert.that(output)
-            .satisfies(
-                (Iterable<Row> rows) -> {
-                    assertThat(rows, containsInAnyOrder(expectedRows.toArray()));
-                    return null;
-                });
-
-        testPipeline.run();
-    }
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+  @Rule public TestDataWarehouse warehouse = new TestDataWarehouse(TEMPORARY_FOLDER, "default");
+
+  @Rule public TestPipeline testPipeline = TestPipeline.create();
+
+  @Test
+  public void testBuildTransformWithRow() throws NoSuchSchemaException {
+    Row catalogConfigRow =
+        Row.withSchema(SchemaRegistry.createDefault().getSchema(SchemaTransformCatalogConfig.class))
+            .withFieldValue("catalogName", "test_name")
+            .withFieldValue("catalogType", "test_type")
+            .withFieldValue("catalogImplementation", "testImplementation")
+            .withFieldValue("warehouseLocation", "test_location")
+            .build();
+    Row transformConfigRow =
+        Row.withSchema(new IcebergReadSchemaTransformProvider().configurationSchema())
+            .withFieldValue("table", "test_table_identifier")
+            .withFieldValue("catalogConfig", catalogConfigRow)
+            .build();
+
+    new IcebergReadSchemaTransformProvider().from(transformConfigRow);
+  }
+
+  @Test
+  public void testSimpleScan() throws Exception {
+    String identifier = "default.table_" + Long.toString(UUID.randomUUID().hashCode(), 16);
+    TableIdentifier tableId = TableIdentifier.parse(identifier);
+
+    Table simpleTable = warehouse.createTable(tableId, TestFixtures.SCHEMA);
+    final Schema schema = SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA);
+
+    simpleTable
+        .newFastAppend()
+        .appendFile(
+            warehouse.writeRecords(
+                "file1s1.parquet", simpleTable.schema(), TestFixtures.FILE1SNAPSHOT1))
+        .appendFile(
+            warehouse.writeRecords(
+                "file2s1.parquet", simpleTable.schema(), TestFixtures.FILE2SNAPSHOT1))
+        .appendFile(
+            warehouse.writeRecords(
+                "file3s1.parquet", simpleTable.schema(), TestFixtures.FILE3SNAPSHOT1))
+        .commit();
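+
+    // The single fast-append commit above adds all three data files at once, so the table gains
+    // one snapshot and the read below should see the union of the three files' fixture records.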
+
+    final List<Row> expectedRows =
+        Stream.of(
+                TestFixtures.FILE1SNAPSHOT1,
+                TestFixtures.FILE2SNAPSHOT1,
+                TestFixtures.FILE3SNAPSHOT1)
+            .flatMap(List::stream)
+            .map(record -> SchemaAndRowConversions.recordToRow(schema, record))
+            .collect(Collectors.toList());
+
+    SchemaTransformCatalogConfig catalogConfig =
+        SchemaTransformCatalogConfig.builder()
+            .setCatalogName("hadoop")
+            .setCatalogType(CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
+            .setWarehouseLocation(warehouse.location)
+            .build();
+
+    IcebergReadSchemaTransformProvider.Config readConfig =
+        IcebergReadSchemaTransformProvider.Config.builder()
+            .setTable(identifier)
+            .setCatalogConfig(catalogConfig)
+            .build();
+
+    PCollection<Row> output =
+        PCollectionRowTuple.empty(testPipeline)
+            .apply(new IcebergReadSchemaTransformProvider().from(readConfig))
+            .get(IcebergReadSchemaTransformProvider.OUTPUT_TAG);
+
+    PAssert.that(output)
+        .satisfies(
+            (Iterable<Row> rows) -> {
+              assertThat(rows, containsInAnyOrder(expectedRows.toArray()));
+              return null;
+            });
+
+    testPipeline.run();
+  }
 }
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/IcebergWriteSchemaTransformProviderTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/IcebergWriteSchemaTransformProviderTest.java
index ac113b6317d3..639fbdd331de 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/IcebergWriteSchemaTransformProviderTest.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/IcebergWriteSchemaTransformProviderTest.java
@@ -62,9 +62,7 @@ public class IcebergWriteSchemaTransformProviderTest {
   @Test
   public void testBuildTransformWithRow() throws NoSuchSchemaException {
     Row catalogConfigRow =
-        Row.withSchema(
-                SchemaRegistry.createDefault()
-                    .getSchema(SchemaTransformCatalogConfig.class))
+        Row.withSchema(SchemaRegistry.createDefault().getSchema(SchemaTransformCatalogConfig.class))
             .withFieldValue("catalogName", "test_name")
             .withFieldValue("catalogType", "test_type")
             .withFieldValue("catalogImplementation", "testImplementation")
@@ -76,9 +74,7 @@ public void testBuildTransformWithRow() throws NoSuchSchemaException {
             .withFieldValue("catalogConfig", catalogConfigRow)
             .build();
 
-    SchemaTransform transform = new IcebergWriteSchemaTransformProvider().from(transformConfigRow);
-
-    System.out.println(transform.getName());
+    new IcebergWriteSchemaTransformProvider().from(transformConfigRow);
   }
 
   @Test
@@ -107,7 +103,8 @@ public void testSimpleAppend() {
         testPipeline
             .apply(
                 "Records To Add", Create.of(TestFixtures.asRows(TestFixtures.FILE1SNAPSHOT1)))
-            .setRowSchema(SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA)));
+            .setRowSchema(
+                SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA)));
 
     PCollection<Row> result = input