From 1abc6c8195aefdcecace3890bacc6f650e6f1898 Mon Sep 17 00:00:00 2001 From: Filipe Regadas Date: Fri, 27 Dec 2024 15:38:22 +0000 Subject: [PATCH] Support Iceberg partition identity transform (#33332) * Support Iceberg partition identity transform * remove uneeded avro dep * Trigger icerberg integration tests * Revert "remove uneeded avro dep" This reverts commit 0b075af322c0cdec3f7ed06593d7c0766c8b654c. --- .../IO_Iceberg_Integration_Tests.json | 2 +- sdks/java/io/iceberg/build.gradle | 1 + .../beam/sdk/io/iceberg/ScanTaskReader.java | 36 +++++- .../sdk/io/iceberg/IcebergIOReadTest.java | 72 ++++++++++++ .../sdk/io/iceberg/TestDataWarehouse.java | 15 ++- .../beam/sdk/io/iceberg/TestFixtures.java | 111 +++++++++++------- 6 files changed, 188 insertions(+), 49 deletions(-) diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json index a84f69a97721..5cf4f475f317 100644 --- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json +++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 6 + "modification": 7 } diff --git a/sdks/java/io/iceberg/build.gradle b/sdks/java/io/iceberg/build.gradle index 319848b7626b..cd9e7044632b 100644 --- a/sdks/java/io/iceberg/build.gradle +++ b/sdks/java/io/iceberg/build.gradle @@ -45,6 +45,7 @@ dependencies { implementation library.java.vendored_guava_32_1_2_jre implementation project(path: ":sdks:java:core", configuration: "shadow") implementation project(path: ":model:pipeline", configuration: "shadow") + implementation library.java.avro implementation library.java.slf4j_api implementation library.java.joda_time implementation "org.apache.parquet:parquet-column:$parquet_version" diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java index b7cb42b2eacb..5784dfd79744 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java @@ -21,15 +21,22 @@ import java.io.IOException; import java.util.ArrayDeque; +import java.util.Collections; +import java.util.Map; import java.util.NoSuchElementException; import java.util.Queue; +import java.util.Set; +import java.util.function.BiFunction; import javax.annotation.Nullable; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.values.Row; import org.apache.iceberg.DataFile; import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.data.IdentityPartitionConverters; import org.apache.iceberg.data.Record; import org.apache.iceberg.data.avro.DataReader; import org.apache.iceberg.data.orc.GenericOrcReader; @@ -42,6 +49,9 @@ import org.apache.iceberg.io.InputFile; import org.apache.iceberg.orc.ORC; import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.util.PartitionUtil; import org.checkerframework.checker.nullness.qual.NonNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -112,6 +122,8 @@ public boolean advance() throws IOException { FileScanTask fileTask = fileScanTasks.remove(); DataFile file = fileTask.file(); InputFile input = decryptor.getInputFile(fileTask); + Map idToConstants = + constantsMap(fileTask, IdentityPartitionConverters::convertConstant, project); CloseableIterable iterable; switch (file.format()) { @@ -121,7 +133,9 @@ public boolean advance() throws IOException { ORC.read(input) .split(fileTask.start(), fileTask.length()) .project(project) - .createReaderFunc(fileSchema -> GenericOrcReader.buildReader(project, fileSchema)) + .createReaderFunc( + fileSchema -> + GenericOrcReader.buildReader(project, fileSchema, idToConstants)) .filter(fileTask.residual()) .build(); break; @@ -132,7 +146,8 @@ public boolean advance() throws IOException { .split(fileTask.start(), fileTask.length()) .project(project) .createReaderFunc( - fileSchema -> GenericParquetReaders.buildReader(project, fileSchema)) + fileSchema -> + GenericParquetReaders.buildReader(project, fileSchema, idToConstants)) .filter(fileTask.residual()) .build(); break; @@ -142,7 +157,8 @@ public boolean advance() throws IOException { Avro.read(input) .split(fileTask.start(), fileTask.length()) .project(project) - .createReaderFunc(DataReader::create) + .createReaderFunc( + fileSchema -> DataReader.create(project, fileSchema, idToConstants)) .build(); break; default: @@ -155,6 +171,20 @@ public boolean advance() throws IOException { return false; } + private Map constantsMap( + FileScanTask task, BiFunction converter, Schema schema) { + PartitionSpec spec = task.spec(); + Set idColumns = spec.identitySourceIds(); + Schema partitionSchema = TypeUtil.select(schema, idColumns); + boolean projectsIdentityPartitionColumns = !partitionSchema.columns().isEmpty(); + + if (projectsIdentityPartitionColumns) { + return PartitionUtil.constantsMap(task, converter); + } else { + return Collections.emptyMap(); + } + } + @Override public Row getCurrent() throws NoSuchElementException { if (current == null) { diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java index fe4a07dedfdf..39c621975547 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.io.iceberg; +import static org.apache.beam.sdk.io.iceberg.TestFixtures.createRecord; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsInAnyOrder; @@ -35,8 +36,11 @@ import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.PartitionKey; +import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Table; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.types.Types; import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; @@ -122,4 +126,72 @@ public void testSimpleScan() throws Exception { testPipeline.run(); } + + @Test + public void testIdentityColumnScan() throws Exception { + TableIdentifier tableId = + TableIdentifier.of("default", "table" + Long.toString(UUID.randomUUID().hashCode(), 16)); + Table simpleTable = warehouse.createTable(tableId, TestFixtures.SCHEMA); + + String identityColumnName = "identity"; + String identityColumnValue = "some-value"; + simpleTable.updateSchema().addColumn(identityColumnName, Types.StringType.get()).commit(); + simpleTable.updateSpec().addField(identityColumnName).commit(); + + PartitionSpec spec = simpleTable.spec(); + PartitionKey partitionKey = new PartitionKey(simpleTable.spec(), simpleTable.schema()); + partitionKey.set(0, identityColumnValue); + + simpleTable + .newFastAppend() + .appendFile( + warehouse.writeRecords( + "file1s1.parquet", + TestFixtures.SCHEMA, + spec, + partitionKey, + TestFixtures.FILE1SNAPSHOT1)) + .commit(); + + final Schema schema = IcebergUtils.icebergSchemaToBeamSchema(simpleTable.schema()); + final List expectedRows = + Stream.of(TestFixtures.FILE1SNAPSHOT1_DATA) + .flatMap(List::stream) + .map( + d -> + ImmutableMap.builder() + .putAll(d) + .put(identityColumnName, identityColumnValue) + .build()) + .map(r -> createRecord(simpleTable.schema(), r)) + .map(record -> IcebergUtils.icebergRecordToBeamRow(schema, record)) + .collect(Collectors.toList()); + + Map catalogProps = + ImmutableMap.builder() + .put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP) + .put("warehouse", warehouse.location) + .build(); + + IcebergCatalogConfig catalogConfig = + IcebergCatalogConfig.builder() + .setCatalogName("name") + .setCatalogProperties(catalogProps) + .build(); + + PCollection output = + testPipeline + .apply(IcebergIO.readRows(catalogConfig).from(tableId)) + .apply(ParDo.of(new PrintRow())) + .setCoder(RowCoder.of(IcebergUtils.icebergSchemaToBeamSchema(simpleTable.schema()))); + + PAssert.that(output) + .satisfies( + (Iterable rows) -> { + assertThat(rows, containsInAnyOrder(expectedRows.toArray())); + return null; + }); + + testPipeline.run(); + } } diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/TestDataWarehouse.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/TestDataWarehouse.java index 1e1c84d31de9..9352123b5c77 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/TestDataWarehouse.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/TestDataWarehouse.java @@ -32,6 +32,7 @@ import org.apache.iceberg.FileFormat; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.Namespace; @@ -108,6 +109,16 @@ protected void after() { public DataFile writeRecords(String filename, Schema schema, List records) throws IOException { + return writeRecords(filename, schema, PartitionSpec.unpartitioned(), null, records); + } + + public DataFile writeRecords( + String filename, + Schema schema, + PartitionSpec spec, + StructLike partition, + List records) + throws IOException { Path path = new Path(location, filename); FileFormat format = FileFormat.fromFileName(filename); @@ -134,9 +145,11 @@ public DataFile writeRecords(String filename, Schema schema, List record } appender.addAll(records); appender.close(); - return DataFiles.builder(PartitionSpec.unpartitioned()) + + return DataFiles.builder(spec) .withInputFile(HadoopInputFile.fromPath(path, hadoopConf)) .withMetrics(appender.metrics()) + .withPartition(partition) .build(); } diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/TestFixtures.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/TestFixtures.java index 6143bd03491d..a2ca86d1b5a2 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/TestFixtures.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/TestFixtures.java @@ -21,11 +21,13 @@ import static org.apache.iceberg.types.Types.NestedField.required; import java.util.ArrayList; +import java.util.List; +import java.util.Map; import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; import org.apache.iceberg.Schema; -import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.Record; import org.apache.iceberg.types.Types; @@ -34,58 +36,75 @@ public class TestFixtures { new Schema( required(1, "id", Types.LongType.get()), optional(2, "data", Types.StringType.get())); - private static final Record genericRecord = GenericRecord.create(SCHEMA); - - /* First file in test table */ - public static final ImmutableList FILE1SNAPSHOT1 = + public static final List> FILE1SNAPSHOT1_DATA = ImmutableList.of( - genericRecord.copy(ImmutableMap.of("id", 0L, "data", "clarification")), - genericRecord.copy(ImmutableMap.of("id", 1L, "data", "risky")), - genericRecord.copy(ImmutableMap.of("id", 2L, "data", "falafel"))); - public static final ImmutableList FILE1SNAPSHOT2 = + ImmutableMap.of("id", 0L, "data", "clarification"), + ImmutableMap.of("id", 1L, "data", "risky"), + ImmutableMap.of("id", 2L, "data", "falafel")); + public static final List> FILE1SNAPSHOT2_DATA = ImmutableList.of( - genericRecord.copy(ImmutableMap.of("id", 3L, "data", "obscure")), - genericRecord.copy(ImmutableMap.of("id", 4L, "data", "secure")), - genericRecord.copy(ImmutableMap.of("id", 5L, "data", "feta"))); - public static final ImmutableList FILE1SNAPSHOT3 = + ImmutableMap.of("id", 3L, "data", "obscure"), + ImmutableMap.of("id", 4L, "data", "secure"), + ImmutableMap.of("id", 5L, "data", "feta")); + public static final List> FILE1SNAPSHOT3_DATA = ImmutableList.of( - genericRecord.copy(ImmutableMap.of("id", 6L, "data", "brainy")), - genericRecord.copy(ImmutableMap.of("id", 7L, "data", "film")), - genericRecord.copy(ImmutableMap.of("id", 8L, "data", "feta"))); - - /* Second file in test table */ - public static final ImmutableList FILE2SNAPSHOT1 = + ImmutableMap.of("id", 6L, "data", "brainy"), + ImmutableMap.of("id", 7L, "data", "film"), + ImmutableMap.of("id", 8L, "data", "feta")); + public static final List> FILE2SNAPSHOT1_DATA = ImmutableList.of( - genericRecord.copy(ImmutableMap.of("id", 10L, "data", "clammy")), - genericRecord.copy(ImmutableMap.of("id", 11L, "data", "evacuate")), - genericRecord.copy(ImmutableMap.of("id", 12L, "data", "tissue"))); - public static final ImmutableList FILE2SNAPSHOT2 = + ImmutableMap.of("id", 10L, "data", "clammy"), + ImmutableMap.of("id", 11L, "data", "evacuate"), + ImmutableMap.of("id", 12L, "data", "tissue")); + public static final List> FILE2SNAPSHOT2_DATA = ImmutableList.of( - genericRecord.copy(ImmutableMap.of("id", 14L, "data", "radical")), - genericRecord.copy(ImmutableMap.of("id", 15L, "data", "collocation")), - genericRecord.copy(ImmutableMap.of("id", 16L, "data", "book"))); - public static final ImmutableList FILE2SNAPSHOT3 = + ImmutableMap.of("id", 14L, "data", "radical"), + ImmutableMap.of("id", 15L, "data", "collocation"), + ImmutableMap.of("id", 16L, "data", "book")); + public static final List> FILE2SNAPSHOT3_DATA = ImmutableList.of( - genericRecord.copy(ImmutableMap.of("id", 16L, "data", "cake")), - genericRecord.copy(ImmutableMap.of("id", 17L, "data", "intrinsic")), - genericRecord.copy(ImmutableMap.of("id", 18L, "data", "paper"))); - - /* Third file in test table */ - public static final ImmutableList FILE3SNAPSHOT1 = + ImmutableMap.of("id", 16L, "data", "cake"), + ImmutableMap.of("id", 17L, "data", "intrinsic"), + ImmutableMap.of("id", 18L, "data", "paper")); + public static final List> FILE3SNAPSHOT1_DATA = ImmutableList.of( - genericRecord.copy(ImmutableMap.of("id", 20L, "data", "ocean")), - genericRecord.copy(ImmutableMap.of("id", 21L, "data", "holistic")), - genericRecord.copy(ImmutableMap.of("id", 22L, "data", "preventative"))); - public static final ImmutableList FILE3SNAPSHOT2 = + ImmutableMap.of("id", 20L, "data", "ocean"), + ImmutableMap.of("id", 21L, "data", "holistic"), + ImmutableMap.of("id", 22L, "data", "preventative")); + public static final List> FILE3SNAPSHOT2_DATA = ImmutableList.of( - genericRecord.copy(ImmutableMap.of("id", 24L, "data", "cloud")), - genericRecord.copy(ImmutableMap.of("id", 25L, "data", "zen")), - genericRecord.copy(ImmutableMap.of("id", 26L, "data", "sky"))); - public static final ImmutableList FILE3SNAPSHOT3 = + ImmutableMap.of("id", 24L, "data", "cloud"), + ImmutableMap.of("id", 25L, "data", "zen"), + ImmutableMap.of("id", 26L, "data", "sky")); + public static final List> FILE3SNAPSHOT3_DATA = ImmutableList.of( - genericRecord.copy(ImmutableMap.of("id", 26L, "data", "belleview")), - genericRecord.copy(ImmutableMap.of("id", 27L, "data", "overview")), - genericRecord.copy(ImmutableMap.of("id", 28L, "data", "tender"))); + ImmutableMap.of("id", 26L, "data", "belleview"), + ImmutableMap.of("id", 27L, "data", "overview"), + ImmutableMap.of("id", 28L, "data", "tender")); + + /* First file in test table */ + public static final List FILE1SNAPSHOT1 = + Lists.transform(FILE1SNAPSHOT1_DATA, d -> createRecord(SCHEMA, d)); + public static final List FILE1SNAPSHOT2 = + Lists.transform(FILE1SNAPSHOT2_DATA, d -> createRecord(SCHEMA, d)); + public static final List FILE1SNAPSHOT3 = + Lists.transform(FILE1SNAPSHOT3_DATA, d -> createRecord(SCHEMA, d)); + + /* Second file in test table */ + public static final List FILE2SNAPSHOT1 = + Lists.transform(FILE2SNAPSHOT1_DATA, d -> createRecord(SCHEMA, d)); + public static final List FILE2SNAPSHOT2 = + Lists.transform(FILE2SNAPSHOT2_DATA, d -> createRecord(SCHEMA, d)); + public static final List FILE2SNAPSHOT3 = + Lists.transform(FILE2SNAPSHOT3_DATA, d -> createRecord(SCHEMA, d)); + + /* Third file in test table */ + public static final List FILE3SNAPSHOT1 = + Lists.transform(FILE3SNAPSHOT1_DATA, d -> createRecord(SCHEMA, d)); + public static final List FILE3SNAPSHOT2 = + Lists.transform(FILE3SNAPSHOT2_DATA, d -> createRecord(SCHEMA, d)); + public static final List FILE3SNAPSHOT3 = + Lists.transform(FILE3SNAPSHOT3_DATA, d -> createRecord(SCHEMA, d)); public static final ImmutableList asRows(Iterable records) { ArrayList rows = new ArrayList<>(); @@ -98,4 +117,8 @@ public static final ImmutableList asRows(Iterable records) { } return ImmutableList.copyOf(rows); } + + public static Record createRecord(org.apache.iceberg.Schema schema, Map values) { + return org.apache.iceberg.data.GenericRecord.create(schema).copy(values); + } }