From 1ebd5039a97f7fd680be8e271c6eee9959619230 Mon Sep 17 00:00:00 2001
From: Kenneth Knowles <kenn@apache.org>
Date: Tue, 23 Apr 2024 15:35:55 -0400
Subject: [PATCH] Simplify intermediate data in Iceberg sink; use manifest
 files

Instead of Avro-serializing each Iceberg DataFile with a hand-rolled
coder, write each data file's metadata into an Iceberg manifest file and
carry only the table identifier string plus the encoded manifest bytes
between stages. FileWriteResult becomes a plain AutoValue with an
inferred Beam schema, and AppendFilesToTables commits with
appendManifest() instead of appendFile().
---
 sdks/java/io/iceberg/build.gradle             |   1 -
 .../sdk/io/iceberg/AppendFilesToTables.java   |   2 +-
 .../beam/sdk/io/iceberg/FileWriteResult.java  | 210 ++++--------------
 .../beam/sdk/io/iceberg/RecordWriter.java     |  20 +-
 .../io/iceberg/WriteGroupedRowsToFiles.java   |   3 +-
 .../io/iceberg/WriteUngroupedRowsToFiles.java |   5 +-
 .../sdk/io/iceberg/FileWriteResultTest.java   | 166 --------------
 7 files changed, 61 insertions(+), 346 deletions(-)
 delete mode 100644 sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/FileWriteResultTest.java

diff --git a/sdks/java/io/iceberg/build.gradle b/sdks/java/io/iceberg/build.gradle
index e721b98f1029..f82284e3b393 100644
--- a/sdks/java/io/iceberg/build.gradle
+++ b/sdks/java/io/iceberg/build.gradle
@@ -52,7 +52,6 @@ dependencies {
   implementation "org.apache.iceberg:iceberg-api:$iceberg_version"
   implementation "org.apache.iceberg:iceberg-parquet:$iceberg_version"
   implementation "org.apache.iceberg:iceberg-orc:$iceberg_version"
-  implementation library.java.avro
   implementation library.java.hadoop_common
 
   testImplementation library.java.hadoop_client
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java
index e4ba60001824..bb42df5a9330 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java
@@ -90,7 +90,7 @@ public void processElement(
       Table table = getCatalog().loadTable(TableIdentifier.parse(element.getKey()));
       AppendFiles update = table.newAppend();
       for (FileWriteResult writtenFile : element.getValue()) {
-        update.appendFile(writtenFile.getDataFile());
+        update.appendManifest(writtenFile.getManifestFile());
       }
       update.commit();
       out.outputWithTimestamp(
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FileWriteResult.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FileWriteResult.java
index c12febc03f48..2459c0befde1 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FileWriteResult.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FileWriteResult.java
@@ -17,197 +17,69 @@
  */
 package org.apache.beam.sdk.io.iceberg;
 
-import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
-
 import com.google.auto.value.AutoValue;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import org.apache.avro.Schema;
-import org.apache.beam.sdk.coders.ByteArrayCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CoderProvider;
-import org.apache.beam.sdk.coders.CoderProviders;
-import org.apache.beam.sdk.coders.DefaultCoder;
-import org.apache.beam.sdk.coders.SerializableCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.StructuredCoder;
-import org.apache.beam.sdk.values.TypeDescriptor;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
-import org.apache.iceberg.DataFile;
-import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.avro.AvroEncoderUtil;
-import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaIgnore;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
 import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.types.Types;
-import org.checkerframework.checker.nullness.qual.Nullable;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
 
 @AutoValue
-@DefaultCoder(FileWriteResult.FileWriteResultCoder.class)
+@DefaultSchema(AutoValueSchema.class)
 abstract class FileWriteResult {
-  public abstract TableIdentifier getTableIdentifier();
 
-  public abstract PartitionSpec getPartitionSpec();
+  private transient @MonotonicNonNull TableIdentifier cachedTableIdentifier;
+  private transient @MonotonicNonNull ManifestFile cachedManifestFile;
 
-  public abstract DataFile getDataFile();
+  abstract String getTableIdentifierString();
 
-  public static Builder builder() {
-    return new AutoValue_FileWriteResult.Builder();
-  }
+  @SuppressWarnings("mutable")
+  abstract byte[] getManifestFileBytes();
 
-  @AutoValue.Builder
-  abstract static class Builder {
-    public abstract Builder setTableIdentifier(TableIdentifier tableId);
-
-    public abstract Builder setPartitionSpec(PartitionSpec partitionSpec);
-
-    public abstract Builder setDataFile(DataFile dataFiles);
-
-    public abstract FileWriteResult build();
-  }
-
-  public static class FileWriteResultCoder extends StructuredCoder<FileWriteResult> {
-    static final int VERSION = 0;
-    private static final FileWriteResultCoder SINGLETON = new FileWriteResultCoder();
-
-    private static final Coder<String> tableIdentifierCoder = StringUtf8Coder.of();
-    private static final Coder<PartitionSpec> partitionSpecCoder =
-        SerializableCoder.of(PartitionSpec.class);
-    private static final Coder<byte[]> dataFileBytesCoder = ByteArrayCoder.of();
-
-    private static Schema getDataFileAvroSchema(FileWriteResult fileWriteResult) {
-      Types.StructType partitionType = fileWriteResult.getPartitionSpec().partitionType();
-      Types.StructType dataFileStruct = DataFile.getType(partitionType);
-      Map<Types.StructType, String> dataFileNames =
-          ImmutableMap.of(
-              dataFileStruct, "org.apache.iceberg.GenericDataFile",
-              partitionType, "org.apache.iceberg.PartitionData");
-      return AvroSchemaUtil.convert(dataFileStruct, dataFileNames);
-    }
-
-    @Override
-    public void encode(FileWriteResult value, OutputStream outStream)
-        throws CoderException, IOException {
-      // "version" of this coder.
-      // If breaking changes are introduced (e.g. from Beam, Iceberg, Avro, etc..),
-      // then update this version and create a fork in decode() below for the new decode logic.
-      // This helps keep the pipeline update-compatible
-      outStream.write(VERSION);
-
-      tableIdentifierCoder.encode(value.getTableIdentifier().toString(), outStream);
-      partitionSpecCoder.encode(value.getPartitionSpec(), outStream);
-      dataFileBytesCoder.encode(
-          AvroEncoderUtil.encode(value.getDataFile(), getDataFileAvroSchema(value)), outStream);
-    }
-
-    @Override
-    public FileWriteResult decode(InputStream inStream) throws CoderException, IOException {
-      // Forking logic can be added here depending on the version of this coder
-      assert inStream.read() == 0;
-
-      TableIdentifier tableId = TableIdentifier.parse(tableIdentifierCoder.decode(inStream));
-      PartitionSpec partitionSpec = partitionSpecCoder.decode(inStream);
-      DataFile dataFile =
-          checkArgumentNotNull(
-              AvroEncoderUtil.decode(dataFileBytesCoder.decode(inStream)),
-              "Decoding of dataFile resulted in null");
-      return FileWriteResult.builder()
-          .setTableIdentifier(tableId)
-          .setDataFile(dataFile)
-          .setPartitionSpec(partitionSpec)
-          .build();
-    }
-
-    @Override
-    public List<? extends Coder<?>> getCoderArguments() {
-      return Collections.emptyList();
-    }
-
-    @Override
-    public Object structuralValue(FileWriteResult fileWriteResult) {
-      return new FileWriteResultDeepEqualityWrapper(fileWriteResult);
-    }
-
-    @Override
-    public void verifyDeterministic() throws NonDeterministicException {}
-
-    @Override
-    public TypeDescriptor<FileWriteResult> getEncodedTypeDescriptor() {
-      return TypeDescriptor.of(FileWriteResult.class);
+  @SchemaIgnore
+  public TableIdentifier getTableIdentifier() {
+    if (cachedTableIdentifier == null) {
+      cachedTableIdentifier = TableIdentifier.parse(getTableIdentifierString());
     }
+    return cachedTableIdentifier;
+  }
 
-    public static FileWriteResultCoder of() {
-      return SINGLETON;
+  @SchemaIgnore
+  public ManifestFile getManifestFile() {
+    if (cachedManifestFile == null) {
+      try {
+        cachedManifestFile = ManifestFiles.decode(getManifestFileBytes());
+      } catch (IOException exc) {
+        throw new RuntimeException("Error decoding manifest file bytes", exc);
+      }
     }
+    return cachedManifestFile;
+  }
 
-    @SuppressWarnings("unused") // used via `DefaultCoder` annotation
-    public static CoderProvider getCoderProvider() {
-      return CoderProviders.forCoder(
-          TypeDescriptor.of(FileWriteResult.class), FileWriteResultCoder.of());
-    }
+  public static Builder builder() {
+    return new AutoValue_FileWriteResult.Builder();
   }
 
-  private static class FileWriteResultDeepEqualityWrapper {
-    private final FileWriteResult fileWriteResult;
+  @AutoValue.Builder
+  abstract static class Builder {
 
-    private FileWriteResultDeepEqualityWrapper(FileWriteResult fileWriteResult) {
-      this.fileWriteResult = fileWriteResult;
-    }
+    abstract Builder setTableIdentifierString(String tableIdString);
 
-    @Override
-    public boolean equals(@Nullable Object obj) {
-      if (obj == this) {
-        return true;
-      }
-      if (obj == null) {
-        return false;
-      }
-      if (!(obj instanceof FileWriteResultDeepEqualityWrapper)) {
-        return false;
-      }
-      FileWriteResultDeepEqualityWrapper other = (FileWriteResultDeepEqualityWrapper) obj;
+    abstract Builder setManifestFileBytes(byte[] manifestFileBytes);
 
-      return Objects.equals(
-              fileWriteResult.getTableIdentifier(), other.fileWriteResult.getTableIdentifier())
-          && Objects.equals(
-              fileWriteResult.getPartitionSpec(), other.fileWriteResult.getPartitionSpec())
-          && dataFilesEqual(fileWriteResult.getDataFile(), other.fileWriteResult.getDataFile());
+    @SchemaIgnore
+    public Builder setTableIdentifier(TableIdentifier tableId) {
+      return setTableIdentifierString(tableId.toString());
     }
 
-    private boolean dataFilesEqual(DataFile first, DataFile second) {
-      return Objects.equals(first.pos(), second.pos())
-          && first.specId() == second.specId()
-          && Objects.equals(first.content(), second.content())
-          && Objects.equals(first.path(), second.path())
-          && Objects.equals(first.format(), second.format())
-          && Objects.equals(first.partition(), second.partition())
-          && first.recordCount() == second.recordCount()
-          && first.fileSizeInBytes() == second.fileSizeInBytes()
-          && Objects.equals(first.columnSizes(), second.columnSizes())
-          && Objects.equals(first.valueCounts(), second.valueCounts())
-          && Objects.equals(first.nullValueCounts(), second.nullValueCounts())
-          && Objects.equals(first.nanValueCounts(), second.nanValueCounts())
-          && Objects.equals(first.lowerBounds(), second.lowerBounds())
-          && Objects.equals(first.upperBounds(), second.upperBounds())
-          && Objects.equals(first.keyMetadata(), second.keyMetadata())
-          && Objects.equals(first.splitOffsets(), second.splitOffsets())
-          && Objects.equals(first.equalityFieldIds(), second.equalityFieldIds())
-          && Objects.equals(first.sortOrderId(), second.sortOrderId())
-          && Objects.equals(first.dataSequenceNumber(), second.dataSequenceNumber())
-          && Objects.equals(first.fileSequenceNumber(), second.fileSequenceNumber());
+    @SchemaIgnore
+    public Builder setManifestFile(ManifestFile manifestFile) throws IOException {
+      return setManifestFileBytes(ManifestFiles.encode(manifestFile));
     }
 
-    @Override
-    public int hashCode() {
-      return Objects.hash(
-          fileWriteResult.getTableIdentifier(),
-          fileWriteResult.getPartitionSpec(),
-          fileWriteResult.getDataFile());
-    }
+    public abstract FileWriteResult build();
   }
 }
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java
index aa203eb6eb66..859310bdcecb 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java
@@ -23,6 +23,9 @@
 import org.apache.beam.sdk.values.Row;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.ManifestWriter;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.catalog.Catalog;
@@ -37,6 +40,7 @@ class RecordWriter {
 
   private final DataWriter<Record> icebergDataWriter;
   private final Table table;
+  private final String absoluteFilename;
 
   RecordWriter(Catalog catalog, IcebergDestination destination, String filename)
       throws IOException {
@@ -46,9 +50,9 @@
 
   RecordWriter(Table table, FileFormat fileFormat, String filename) throws IOException {
     this.table = table;
-
-    String absoluteFilename = table.location() + "/" + filename;
+    this.absoluteFilename = table.location() + "/" + filename;
     OutputFile outputFile = table.io().newOutputFile(absoluteFilename);
+
     switch (fileFormat) {
       case AVRO:
         icebergDataWriter =
@@ -92,7 +96,15 @@ public long bytesWritten() {
     return icebergDataWriter.length();
   }
 
-  public DataFile dataFile() {
-    return icebergDataWriter.toDataFile();
+  public ManifestFile getManifestFile() throws IOException {
+    String manifestFilename = FileFormat.AVRO.addExtension(absoluteFilename + ".manifest");
+    OutputFile outputFile = table.io().newOutputFile(manifestFilename);
+    ManifestWriter<DataFile> manifestWriter;
+    try (ManifestWriter<DataFile> openWriter = ManifestFiles.write(getTable().spec(), outputFile)) {
+      openWriter.add(icebergDataWriter.toDataFile());
+      manifestWriter = openWriter;
+    }
+
+    return manifestWriter.toManifestFile();
   }
 }
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java
index 731a9fefb49d..c11263519442 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java
@@ -95,8 +95,7 @@ public void processElement(
       c.output(
           FileWriteResult.builder()
               .setTableIdentifier(destination.getTableIdentifier())
-              .setDataFile(writer.dataFile())
-              .setPartitionSpec(writer.getTable().spec())
+              .setManifestFile(writer.getManifestFile())
               .build());
     }
   }
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteUngroupedRowsToFiles.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteUngroupedRowsToFiles.java
index 917aab9e55c5..a00f3de4bb4e 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteUngroupedRowsToFiles.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteUngroupedRowsToFiles.java
@@ -267,7 +267,7 @@ public void processElement(@Element Row element, BoundedWindow window, MultiOutp
         out.get(WRITTEN_FILES_TAG)
             .output(
                 FileWriteResult.builder()
-                    .setDataFile(writer.dataFile())
+                    .setManifestFile(writer.getManifestFile())
                     .setTableIdentifier(destination.getTableIdentifier())
                     .build());
         writer = createAndInsertWriter(destination, window);
@@ -307,9 +307,8 @@ private void outputFinalWrittenFiles(DoFn<Row, FileWriteResult>.FinishBundleCont
             getWindows().get(destination), "internal error: no windows for destination");
     c.output(
         FileWriteResult.builder()
-            .setDataFile(writer.dataFile())
+            .setManifestFile(writer.getManifestFile())
             .setTableIdentifier(destination.getTableIdentifier())
-            .setPartitionSpec(writer.getTable().spec())
             .build(),
         window.maxTimestamp(),
         window);
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/FileWriteResultTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/FileWriteResultTest.java
deleted file mode 100644
index 644130593152..000000000000
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/FileWriteResultTest.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.iceberg;
-
-import static org.apache.iceberg.types.Types.NestedField.optional;
-import static org.apache.iceberg.types.Types.NestedField.required;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertEquals;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.Serializable;
-import java.util.List;
-import java.util.UUID;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.testing.CoderProperties;
-import org.apache.beam.sdk.values.Row;
-import org.apache.beam.sdk.values.TypeDescriptor;
-import org.apache.commons.compress.utils.Lists;
-import org.apache.iceberg.DataFile;
-import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.Schema;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.types.Types;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-@RunWith(JUnit4.class)
-public class FileWriteResultTest implements Serializable {
-
-  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
-
-  @Rule public TestDataWarehouse warehouse = new TestDataWarehouse(TEMPORARY_FOLDER, "default");
-
-  private static final Coder<FileWriteResult> TEST_CODER =
-      FileWriteResult.FileWriteResultCoder.of();
-
-  private List<FileWriteResult> getTestValues() throws Exception {
-    TableIdentifier tableId =
-        TableIdentifier.of("default", "table" + Long.toString(UUID.randomUUID().hashCode(), 16));
-
-    // Create a table so we can have some DataFile objects
-    Table table = warehouse.createTable(tableId, TestFixtures.SCHEMA);
-    List<FileWriteResult> values = Lists.newArrayList();
-
-    // A parquet file
-    RecordWriter writer =
-        new RecordWriter(table, FileFormat.PARQUET, TEMPORARY_FOLDER.newFile().toString());
-    writer.write(
-        Row.withSchema(SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA))
-            .addValues(42L, "bizzle")
-            .build());
-    writer.close();
-    DataFile dataFile = writer.dataFile();
-    values.add(
-        FileWriteResult.builder()
-            .setDataFile(dataFile)
-            .setPartitionSpec(table.spec())
-            .setTableIdentifier(tableId)
-            .build());
-
-    // An avro file
-    writer = new RecordWriter(table, FileFormat.AVRO, TEMPORARY_FOLDER.newFile().toString());
-    writer.write(
-        Row.withSchema(SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA))
-            .addValues(42L, "bizzle")
-            .build());
-    writer.close();
-    dataFile = writer.dataFile();
-    values.add(
-        FileWriteResult.builder()
-            .setDataFile(dataFile)
-            .setPartitionSpec(table.spec())
-            .setTableIdentifier(tableId)
-            .build());
-
-    // Parquet file with a different schema
-    TableIdentifier tableId2 =
-        TableIdentifier.of(
-            "default", "othertable" + Long.toString(UUID.randomUUID().hashCode(), 16));
-    Schema schema =
-        new Schema(
-            required(1, "id", Types.LongType.get()),
-            optional(2, "data", Types.StringType.get()),
-            optional(
-                3,
-                "extra",
-                Types.StructType.of(
-                    Types.NestedField.required(4, "inner", Types.BinaryType.get()))));
-    Table table2 = warehouse.createTable(tableId2, schema);
-
-    // A parquet file in this other table
-    writer = new RecordWriter(table2, FileFormat.PARQUET, TEMPORARY_FOLDER.newFile().toString());
-    writer.write(
-        Row.withSchema(SchemaAndRowConversions.icebergSchemaToBeamSchema(schema))
-            .addValues(
-                42L,
"bizzle", - Row.withSchema( - org.apache.beam.sdk.schemas.Schema.of( - org.apache.beam.sdk.schemas.Schema.Field.of( - "inner", org.apache.beam.sdk.schemas.Schema.FieldType.BYTES))) - .addValues(new byte[] {0xa}) - .build()) - .build()); - writer.close(); - DataFile dataFile2 = writer.dataFile(); - values.add( - FileWriteResult.builder() - .setDataFile(dataFile2) - .setPartitionSpec(table2.spec()) - .setTableIdentifier(tableId2) - .build()); - - return values; - } - - @Test - public void testDecodeEncodeEqual() throws Exception { - for (FileWriteResult value : getTestValues()) { - CoderProperties.structuralValueDecodeEncodeEqual(TEST_CODER, value); - } - } - - @Test - public void testDecodeEncodeVersionNumber() throws Exception { - ByteArrayOutputStream out = new ByteArrayOutputStream(); - ByteArrayInputStream in; - for (FileWriteResult value : getTestValues()) { - TEST_CODER.encode(value, out); - in = new ByteArrayInputStream(out.toByteArray()); - - assertEquals(FileWriteResult.FileWriteResultCoder.VERSION, in.read()); - } - } - - @Rule public ExpectedException thrown = ExpectedException.none(); - - @Test - public void testEncodedTypeDescriptor() throws Exception { - assertThat( - TEST_CODER.getEncodedTypeDescriptor(), equalTo(TypeDescriptor.of(FileWriteResult.class))); - } -}