diff --git a/.github/workflows/IO_Iceberg.yml b/.github/workflows/IO_Iceberg.yml
new file mode 100644
index 000000000000..abc75836322c
--- /dev/null
+++ b/.github/workflows/IO_Iceberg.yml
@@ -0,0 +1,119 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name: IcebergIO Unit Tests
+
+on:
+  push:
+    tags: ['v*']
+    branches: ['master', 'release-*']
+    paths:
+      - "sdks/java/io/iceberg/**"
+      - ".github/workflows/IO_Iceberg.yml"
+  pull_request_target:
+    branches: ['master', 'release-*']
+    paths:
+      - "sdks/java/io/iceberg/**"
+      - 'release/trigger_all_tests.json'
+      - '.github/trigger_files/IO_Iceberg.json'
+  issue_comment:
+    types: [created]
+  schedule:
+    - cron: '15 1/6 * * *'
+  workflow_dispatch:
+
+# Setting explicit permissions for the action to avoid the default permissions which are `write-all` in case of pull_request_target event
+permissions:
+  actions: write
+  pull-requests: write
+  checks: write
+  contents: read
+  deployments: read
+  id-token: none
+  issues: write
+  discussions: read
+  packages: read
+  pages: read
+  repository-projects: read
+  security-events: read
+  statuses: read
+
+# This allows a subsequently queued workflow run to interrupt previous runs
+concurrency:
+  group: '${{ github.workflow }} @ ${{ github.event.issue.number || github.event.pull_request.head.label || github.sha || github.head_ref || github.ref }}-${{ github.event.schedule || github.event.comment.id || github.event.sender.login }}'
+  cancel-in-progress: true
+
+env:
+  GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
+  GRADLE_ENTERPRISE_CACHE_USERNAME: ${{ secrets.GE_CACHE_USERNAME }}
+  GRADLE_ENTERPRISE_CACHE_PASSWORD: ${{ secrets.GE_CACHE_PASSWORD }}
+
+jobs:
+  IO_Iceberg:
+    name: ${{ matrix.job_name }} (${{ matrix.job_phrase }})
+    strategy:
+      matrix:
+        job_name: ["IO_Iceberg"]
+        job_phrase: ["Run IcebergIO Unit Tests"]
+    timeout-minutes: 60
+    if: |
+      github.event_name == 'push' ||
+      github.event_name == 'pull_request_target' ||
+      (github.event_name == 'schedule' && github.repository == 'apache/beam') ||
+      github.event_name == 'workflow_dispatch' ||
+      github.event.comment.body == 'Run IcebergIO Unit Tests'
+    runs-on: [self-hosted, ubuntu-20.04, main]
+    steps:
+      - uses: actions/checkout@v4
+      - name: Setup repository
+        uses: ./.github/actions/setup-action
+        with:
+          comment_phrase: ${{ matrix.job_phrase }}
+          github_token: ${{ secrets.GITHUB_TOKEN }}
+          github_job: ${{ matrix.job_name }} (${{ matrix.job_phrase }})
+      - name: Setup environment
+        uses: ./.github/actions/setup-environment-action
+      - name: run IcebergIO build script
+        uses: ./.github/actions/gradle-command-self-hosted-action
+        with:
+          gradle-command: :sdks:java:io:iceberg:build
+          arguments: |
+            -PdisableSpotlessCheck=true \
+            -PdisableCheckStyle=true \
+      - name: Archive JUnit Test Results
+        uses: actions/upload-artifact@v4
+        if: ${{ !success() }}
with: + name: JUnit Test Results + path: "**/build/reports/tests/" + - name: Publish JUnit Test Results + uses: EnricoMi/publish-unit-test-result-action@v2 + if: always() + with: + commit: '${{ env.prsha || env.GITHUB_SHA }}' + comment_mode: ${{ github.event_name == 'issue_comment' && 'always' || 'off' }} + files: '**/build/test-results/**/*.xml' + - name: Archive SpotBugs Results + uses: actions/upload-artifact@v4 + if: always() + with: + name: SpotBugs Results + path: '**/build/reports/spotbugs/*.html' + - name: Publish SpotBugs Results + uses: jwgmeligmeyling/spotbugs-github-action@v1.2 + if: always() + with: + name: Publish SpotBugs + path: '**/build/reports/spotbugs/*.html' diff --git a/sdks/java/io/iceberg/build.gradle b/sdks/java/io/iceberg/build.gradle new file mode 100644 index 000000000000..6e39575c1679 --- /dev/null +++ b/sdks/java/io/iceberg/build.gradle @@ -0,0 +1,98 @@ +import java.util.stream.Collectors + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +plugins { id 'org.apache.beam.module' } +applyJavaNature( + automaticModuleName: 'org.apache.beam.sdk.io.iceberg', +) + +description = "Apache Beam :: SDKs :: Java :: IO :: Iceberg" +ext.summary = "Integration with Iceberg data warehouses." 
+ +def hadoopVersions = [ + "285": "2.8.5", + "292": "2.9.2", + "2102": "2.10.2", + "324": "3.2.4", +] + +hadoopVersions.each {kv -> configurations.create("hadoopVersion$kv.key")} + +def iceberg_version = "1.4.2" +def parquet_version = "1.12.0" +def orc_version = "1.9.2" +def hive_version = "3.1.3" + +dependencies { + implementation library.java.vendored_guava_32_1_2_jre + implementation project(path: ":sdks:java:core", configuration: "shadow") + implementation project(":sdks:java:io:hadoop-common") + implementation library.java.slf4j_api + implementation "org.apache.parquet:parquet-column:$parquet_version" + implementation "org.apache.parquet:parquet-common:$parquet_version" + implementation "org.apache.orc:orc-core:$orc_version" + implementation "org.apache.iceberg:iceberg-core:$iceberg_version" + implementation "org.apache.iceberg:iceberg-api:$iceberg_version" + implementation "org.apache.iceberg:iceberg-parquet:$iceberg_version" + implementation "org.apache.iceberg:iceberg-orc:$iceberg_version" + implementation "org.apache.iceberg:iceberg-arrow:$iceberg_version" + implementation "org.apache.iceberg:iceberg-data:$iceberg_version" + + + + provided library.java.avro + provided library.java.hadoop_client + permitUnusedDeclared library.java.hadoop_client + provided library.java.hadoop_common + testImplementation library.java.hadoop_client + + testImplementation "org.apache.iceberg:iceberg-gcp:$iceberg_version" + testImplementation project(path: ":sdks:java:core", configuration: "shadowTest") + testImplementation library.java.junit + testRuntimeOnly library.java.slf4j_jdk14 + testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow") + hadoopVersions.each {kv -> + "hadoopVersion$kv.key" "org.apache.hadoop:hadoop-client:$kv.value" + } +} + +hadoopVersions.each {kv -> + configurations."hadoopVersion$kv.key" { + resolutionStrategy { + force "org.apache.hadoop:hadoop-client:$kv.value" + } + } +} + +task hadoopVersionsTest(group: "Verification") { + description = "Runs Iceberg tests with different Hadoop versions" + def taskNames = hadoopVersions.keySet().stream() + .map{num -> "hadoopVersion${num}Test"} + .collect(Collectors.toList()) + dependsOn taskNames +} + +hadoopVersions.each { kv -> + task "hadoopVersion${kv.key}Test"(type: Test, group: "Verification") { + description = "Runs Iceberg tests with Hadoop version $kv.value" + classpath = configurations."hadoopVersion$kv.key" + sourceSets.test.runtimeClasspath + include '**/*Test.class' + } +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/AppendFilesToTables.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/AppendFilesToTables.java new file mode 100644 index 000000000000..468bdcf5790d --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/AppendFilesToTables.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.iceberg;
+
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+
+class AppendFilesToTables
+    extends PTransform<PCollection<FileWriteResult>, PCollection<KV<String, Snapshot>>> {
+
+  private final IcebergCatalogConfig catalogConfig;
+
+  AppendFilesToTables(IcebergCatalogConfig catalogConfig) {
+    this.catalogConfig = catalogConfig;
+  }
+
+  @Override
+  public PCollection<KV<String, Snapshot>> expand(PCollection<FileWriteResult> writtenFiles) {
+
+    // Apply any sharded writes and flatten everything for catalog updates
+    return writtenFiles
+        .apply(
+            "Key metadata updates by table",
+            WithKeys.of(
+                new SerializableFunction<FileWriteResult, String>() {
+                  @Override
+                  public String apply(FileWriteResult input) {
+                    return input.getTableIdentifier().toString();
+                  }
+                }))
+        .apply("Group metadata updates by table", GroupByKey.create())
+        .apply(
+            "Append metadata updates to tables",
+            ParDo.of(new AppendFilesToTablesDoFn(catalogConfig)))
+        .setCoder(KvCoder.of(StringUtf8Coder.of(), SerializableCoder.of(Snapshot.class)));
+  }
+
+  private static class AppendFilesToTablesDoFn
+      extends DoFn<KV<String, Iterable<FileWriteResult>>, KV<String, Snapshot>> {
+
+    private final IcebergCatalogConfig catalogConfig;
+
+    private transient @MonotonicNonNull Catalog catalog;
+
+    private AppendFilesToTablesDoFn(IcebergCatalogConfig catalogConfig) {
+      this.catalogConfig = catalogConfig;
+    }
+
+    private Catalog getCatalog() {
+      if (catalog == null) {
+        catalog = catalogConfig.catalog();
+      }
+      return catalog;
+    }
+
+    @ProcessElement
+    public void processElement(
+        @Element KV<String, Iterable<FileWriteResult>> element,
+        OutputReceiver<KV<String, Snapshot>> out,
+        BoundedWindow window) {
+      Table table = getCatalog().loadTable(TableIdentifier.parse(element.getKey()));
+      AppendFiles update = table.newAppend();
+      for (FileWriteResult writtenFile : element.getValue()) {
+        update.appendFile(writtenFile.getDataFile());
+      }
+      update.commit();
+      out.outputWithTimestamp(
+          KV.of(element.getKey(), table.currentSnapshot()), window.maxTimestamp());
+    }
+  }
+}
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/AssignDestinations.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/AssignDestinations.java
new file mode 100644
index 000000000000..a9c4e82a7767
--- /dev/null
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/AssignDestinations.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or
more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.io.iceberg; + +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; + +/** + * Assigns the destination metadata for each input record. + * + *
+ * <p>
The output record will have the format { dest: ..., data: ...} where the dest field has the + * assigned metadata and the data field has the original row. + */ +class AssignDestinations extends PTransform, PCollection> { + + private DynamicDestinations dynamicDestinations; + + public AssignDestinations(DynamicDestinations dynamicDestinations) { + this.dynamicDestinations = dynamicDestinations; + } + + @Override + public PCollection expand(PCollection input) { + + final Schema inputSchema = input.getSchema(); + final Schema outputSchema = + Schema.builder() + .addRowField("data", inputSchema) + .addRowField("dest", dynamicDestinations.getMetadataSchema()) + .build(); + + return input + .apply( + ParDo.of( + new DoFn() { + @ProcessElement + public void processElement(@Element Row data, OutputReceiver out) { + out.output( + Row.withSchema(outputSchema) + .addValues(data, dynamicDestinations.assignDestinationMetadata(data)) + .build()); + } + })) + .setRowSchema(outputSchema); + } +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/DynamicDestinations.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/DynamicDestinations.java new file mode 100644 index 000000000000..a395086403f6 --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/DynamicDestinations.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.io.iceberg; + +import java.io.Serializable; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.values.Row; +import org.apache.iceberg.catalog.TableIdentifier; + +public interface DynamicDestinations extends Serializable { + + Schema getMetadataSchema(); + + Row assignDestinationMetadata(Row data); + + IcebergDestination instantiateDestination(Row dest); + + static DynamicDestinations singleTable(TableIdentifier tableId) { + return new OneTableDynamicDestinations(tableId); + } +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/FileWriteResult.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/FileWriteResult.java new file mode 100644 index 000000000000..09645cdd827b --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/FileWriteResult.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.io.iceberg; + +import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; + +import com.google.auto.value.AutoValue; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import org.apache.avro.Schema; +import org.apache.beam.sdk.coders.ByteArrayCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.CoderProvider; +import org.apache.beam.sdk.coders.CoderProviders; +import org.apache.beam.sdk.coders.DefaultCoder; +import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.StructuredCoder; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.avro.AvroEncoderUtil; +import org.apache.iceberg.avro.AvroSchemaUtil; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.types.Types; +import org.checkerframework.checker.nullness.qual.Nullable; + +@AutoValue +@DefaultCoder(FileWriteResult.FileWriteResultCoder.class) +abstract class FileWriteResult { + public abstract TableIdentifier getTableIdentifier(); + + public abstract PartitionSpec getPartitionSpec(); + + public abstract DataFile getDataFile(); + + public static Builder builder() { + return new AutoValue_FileWriteResult.Builder(); + } + + @AutoValue.Builder + abstract static class Builder { + public abstract Builder setTableIdentifier(TableIdentifier tableId); + + public abstract Builder setPartitionSpec(PartitionSpec partitionSpec); + + public abstract Builder setDataFile(DataFile dataFiles); + + public abstract FileWriteResult build(); + } + + public static class FileWriteResultCoder extends StructuredCoder { + private static final FileWriteResultCoder SINGLETON = new FileWriteResultCoder(); + + private static final Coder tableIdentifierCoder = StringUtf8Coder.of(); + private static final Coder partitionSpecCoder = + SerializableCoder.of(PartitionSpec.class); + private static final Coder dataFileBytesCoder = ByteArrayCoder.of(); + + private static Schema getDataFileAvroSchema(FileWriteResult fileWriteResult) { + Types.StructType partitionType = fileWriteResult.getPartitionSpec().partitionType(); + Types.StructType dataFileStruct = DataFile.getType(partitionType); + Map dataFileNames = + ImmutableMap.of( + dataFileStruct, "org.apache.iceberg.GenericDataFile", + partitionType, "org.apache.iceberg.PartitionData"); + return AvroSchemaUtil.convert(dataFileStruct, dataFileNames); + } + + @Override + public void encode(FileWriteResult value, OutputStream outStream) + throws CoderException, IOException { + tableIdentifierCoder.encode(value.getTableIdentifier().toString(), outStream); + partitionSpecCoder.encode(value.getPartitionSpec(), outStream); + dataFileBytesCoder.encode( + 
AvroEncoderUtil.encode(value.getDataFile(), getDataFileAvroSchema(value)), outStream); + } + + @Override + public FileWriteResult decode(InputStream inStream) throws CoderException, IOException { + TableIdentifier tableId = TableIdentifier.parse(tableIdentifierCoder.decode(inStream)); + PartitionSpec partitionSpec = partitionSpecCoder.decode(inStream); + DataFile dataFile = + checkArgumentNotNull( + AvroEncoderUtil.decode(dataFileBytesCoder.decode(inStream)), + "Decoding of dataFile resulted in null"); + return FileWriteResult.builder() + .setTableIdentifier(tableId) + .setDataFile(dataFile) + .setPartitionSpec(partitionSpec) + .build(); + } + + @Override + public List> getCoderArguments() { + return Collections.emptyList(); + } + + @Override + public Object structuralValue(FileWriteResult fileWriteResult) { + return new FileWriteResultDeepEqualityWrapper(fileWriteResult); + } + + @Override + public void verifyDeterministic() throws NonDeterministicException {} + + @Override + public TypeDescriptor getEncodedTypeDescriptor() { + return TypeDescriptor.of(FileWriteResult.class); + } + + public static FileWriteResultCoder of() { + return SINGLETON; + } + + @SuppressWarnings("unused") // used via `DefaultCoder` annotation + public static CoderProvider getCoderProvider() { + return CoderProviders.forCoder( + TypeDescriptor.of(FileWriteResult.class), FileWriteResultCoder.of()); + } + } + + private static class FileWriteResultDeepEqualityWrapper { + private final FileWriteResult fileWriteResult; + + private FileWriteResultDeepEqualityWrapper(FileWriteResult fileWriteResult) { + this.fileWriteResult = fileWriteResult; + } + + @Override + public boolean equals(@Nullable Object obj) { + if (obj == this) { + return true; + } + if (obj == null) { + return false; + } + if (!(obj instanceof FileWriteResultDeepEqualityWrapper)) { + return false; + } + FileWriteResultDeepEqualityWrapper other = (FileWriteResultDeepEqualityWrapper) obj; + + return Objects.equals( + fileWriteResult.getTableIdentifier(), other.fileWriteResult.getTableIdentifier()) + && Objects.equals( + fileWriteResult.getPartitionSpec(), other.fileWriteResult.getPartitionSpec()) + && dataFilesEqual(fileWriteResult.getDataFile(), other.fileWriteResult.getDataFile()); + } + + private boolean dataFilesEqual(DataFile first, DataFile second) { + return Objects.equals(first.pos(), second.pos()) + && first.specId() == second.specId() + && Objects.equals(first.content(), second.content()) + && Objects.equals(first.path(), second.path()) + && Objects.equals(first.format(), second.format()) + && Objects.equals(first.partition(), second.partition()) + && first.recordCount() == second.recordCount() + && first.fileSizeInBytes() == second.fileSizeInBytes() + && Objects.equals(first.columnSizes(), second.columnSizes()) + && Objects.equals(first.valueCounts(), second.valueCounts()) + && Objects.equals(first.nullValueCounts(), second.nullValueCounts()) + && Objects.equals(first.nanValueCounts(), second.nanValueCounts()) + && Objects.equals(first.lowerBounds(), second.lowerBounds()) + && Objects.equals(first.upperBounds(), second.upperBounds()) + && Objects.equals(first.keyMetadata(), second.keyMetadata()) + && Objects.equals(first.splitOffsets(), second.splitOffsets()) + && Objects.equals(first.equalityFieldIds(), second.equalityFieldIds()) + && Objects.equals(first.sortOrderId(), second.sortOrderId()) + && Objects.equals(first.dataSequenceNumber(), second.dataSequenceNumber()) + && Objects.equals(first.fileSequenceNumber(), second.fileSequenceNumber()); + 
} + + @Override + public int hashCode() { + return Objects.hash( + fileWriteResult.getTableIdentifier(), + fileWriteResult.getPartitionSpec(), + fileWriteResult.getDataFile()); + } + } +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergCatalogConfig.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergCatalogConfig.java new file mode 100644 index 000000000000..06a29ac14652 --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergCatalogConfig.java @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.io.iceberg; + +import com.google.auto.value.AutoValue; +import java.io.Serializable; +import javax.annotation.Nullable; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.CatalogUtil; +import org.checkerframework.dataflow.qual.Pure; + +@AutoValue +public abstract class IcebergCatalogConfig implements Serializable { + + @Pure + public abstract String getName(); + + /* Core Properties */ + @Pure + public abstract @Nullable String getIcebergCatalogType(); + + @Pure + public abstract @Nullable String getCatalogImplementation(); + + @Pure + public abstract @Nullable String getFileIOImplementation(); + + @Pure + public abstract @Nullable String getWarehouseLocation(); + + @Pure + public abstract @Nullable String getMetricsReporterImplementation(); + + /* Caching */ + @Pure + public abstract boolean getCacheEnabled(); + + @Pure + public abstract boolean getCacheCaseSensitive(); + + @Pure + public abstract long getCacheExpirationIntervalMillis(); + + @Pure + public abstract boolean getIOManifestCacheEnabled(); + + @Pure + public abstract long getIOManifestCacheExpirationIntervalMillis(); + + @Pure + public abstract long getIOManifestCacheMaxTotalBytes(); + + @Pure + public abstract long getIOManifestCacheMaxContentLength(); + + @Pure + public abstract @Nullable String getUri(); + + @Pure + public abstract int getClientPoolSize(); + + @Pure + public abstract long getClientPoolEvictionIntervalMs(); + + @Pure + public abstract @Nullable String getClientPoolCacheKeys(); + + @Pure + public abstract @Nullable String getLockImplementation(); + + @Pure + public abstract long getLockHeartbeatIntervalMillis(); + + @Pure + public abstract long getLockHeartbeatTimeoutMillis(); + + @Pure + public abstract int getLockHeartbeatThreads(); + + @Pure + public abstract long getLockAcquireIntervalMillis(); + + @Pure + public abstract long getLockAcquireTimeoutMillis(); + + @Pure + public abstract @Nullable String getAppIdentifier(); + + @Pure + public abstract @Nullable String getUser(); + 
+ @Pure + public abstract long getAuthSessionTimeoutMillis(); + + @Pure + public abstract @Nullable Configuration getConfiguration(); + + @Pure + public static Builder builder() { + return new AutoValue_IcebergCatalogConfig.Builder() + .setIcebergCatalogType(null) + .setCatalogImplementation(null) + .setFileIOImplementation(null) + .setWarehouseLocation(null) + .setMetricsReporterImplementation(null) // TODO: Set this to our implementation + .setCacheEnabled(CatalogProperties.CACHE_ENABLED_DEFAULT) + .setCacheCaseSensitive(CatalogProperties.CACHE_CASE_SENSITIVE_DEFAULT) + .setCacheExpirationIntervalMillis(CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS_DEFAULT) + .setIOManifestCacheEnabled(CatalogProperties.IO_MANIFEST_CACHE_ENABLED_DEFAULT) + .setIOManifestCacheExpirationIntervalMillis( + CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS_DEFAULT) + .setIOManifestCacheMaxTotalBytes( + CatalogProperties.IO_MANIFEST_CACHE_MAX_TOTAL_BYTES_DEFAULT) + .setIOManifestCacheMaxContentLength( + CatalogProperties.IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH_DEFAULT) + .setUri(null) + .setClientPoolSize(CatalogProperties.CLIENT_POOL_SIZE_DEFAULT) + .setClientPoolEvictionIntervalMs( + CatalogProperties.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS_DEFAULT) + .setClientPoolCacheKeys(null) + .setLockImplementation(null) + .setLockHeartbeatIntervalMillis(CatalogProperties.LOCK_HEARTBEAT_INTERVAL_MS_DEFAULT) + .setLockHeartbeatTimeoutMillis(CatalogProperties.LOCK_HEARTBEAT_TIMEOUT_MS_DEFAULT) + .setLockHeartbeatThreads(CatalogProperties.LOCK_HEARTBEAT_THREADS_DEFAULT) + .setLockAcquireIntervalMillis(CatalogProperties.LOCK_ACQUIRE_INTERVAL_MS_DEFAULT) + .setLockAcquireTimeoutMillis(CatalogProperties.LOCK_HEARTBEAT_TIMEOUT_MS_DEFAULT) + .setAppIdentifier(null) + .setUser(null) + .setAuthSessionTimeoutMillis(CatalogProperties.AUTH_SESSION_TIMEOUT_MS_DEFAULT) + .setConfiguration(null); + } + + @Pure + public ImmutableMap properties() { + return new PropertyBuilder() + .put(CatalogUtil.ICEBERG_CATALOG_TYPE, getIcebergCatalogType()) + .put(CatalogProperties.CATALOG_IMPL, getCatalogImplementation()) + .put(CatalogProperties.FILE_IO_IMPL, getFileIOImplementation()) + .put(CatalogProperties.WAREHOUSE_LOCATION, getWarehouseLocation()) + .put(CatalogProperties.METRICS_REPORTER_IMPL, getMetricsReporterImplementation()) + .put(CatalogProperties.CACHE_ENABLED, getCacheEnabled()) + .put(CatalogProperties.CACHE_CASE_SENSITIVE, getCacheCaseSensitive()) + .put(CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS, getCacheExpirationIntervalMillis()) + .build(); + } + + public org.apache.iceberg.catalog.Catalog catalog() { + Configuration conf = getConfiguration(); + if (conf == null) { + conf = new Configuration(); + } + return CatalogUtil.buildIcebergCatalog(getName(), properties(), conf); + } + + @AutoValue.Builder + public abstract static class Builder { + + /* Core Properties */ + public abstract Builder setName(String name); + + public abstract Builder setIcebergCatalogType(@Nullable String icebergType); + + public abstract Builder setCatalogImplementation(@Nullable String catalogImpl); + + public abstract Builder setFileIOImplementation(@Nullable String fileIOImpl); + + public abstract Builder setWarehouseLocation(@Nullable String warehouse); + + public abstract Builder setMetricsReporterImplementation(@Nullable String metricsImpl); + + /* Caching */ + public abstract Builder setCacheEnabled(boolean cacheEnabled); + + public abstract Builder setCacheCaseSensitive(boolean cacheCaseSensitive); + + public abstract Builder 
setCacheExpirationIntervalMillis(long expiration); + + public abstract Builder setIOManifestCacheEnabled(boolean enabled); + + public abstract Builder setIOManifestCacheExpirationIntervalMillis(long expiration); + + public abstract Builder setIOManifestCacheMaxTotalBytes(long bytes); + + public abstract Builder setIOManifestCacheMaxContentLength(long length); + + public abstract Builder setUri(@Nullable String uri); + + public abstract Builder setClientPoolSize(int size); + + public abstract Builder setClientPoolEvictionIntervalMs(long interval); + + public abstract Builder setClientPoolCacheKeys(@Nullable String keys); + + public abstract Builder setLockImplementation(@Nullable String lockImplementation); + + public abstract Builder setLockHeartbeatIntervalMillis(long interval); + + public abstract Builder setLockHeartbeatTimeoutMillis(long timeout); + + public abstract Builder setLockHeartbeatThreads(int threads); + + public abstract Builder setLockAcquireIntervalMillis(long interval); + + public abstract Builder setLockAcquireTimeoutMillis(long timeout); + + public abstract Builder setAppIdentifier(@Nullable String id); + + public abstract Builder setUser(@Nullable String user); + + public abstract Builder setAuthSessionTimeoutMillis(long timeout); + + public abstract Builder setConfiguration(@Nullable Configuration conf); + + public abstract IcebergCatalogConfig build(); + } +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergDestination.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergDestination.java new file mode 100644 index 000000000000..dcd69c87e9d0 --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergDestination.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.io.iceberg; + +import com.google.auto.value.AutoValue; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.catalog.TableIdentifier; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.checkerframework.dataflow.qual.Pure; + +@AutoValue +public abstract class IcebergDestination { + + /** + * The iceberg table identifier to write data to. This is relative to the catalog, which is + * presumed to be configured outside of this destination specification. + */ + @Pure + public abstract TableIdentifier getTableIdentifier(); + + /** File format for created files. */ + @Pure + public abstract FileFormat getFileFormat(); + + /** + * Metadata and constraints for creating a new table, if it must be done dynamically. + * + *
+ * <p>
If null, dynamic table creation will fail, and this should be disallowed at the top level + * configuration. + */ + @Pure + public abstract @Nullable IcebergTableCreateConfig getTableCreateConfig(); + + @Pure + public static Builder builder() { + return new AutoValue_IcebergDestination.Builder(); + } + + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder setTableIdentifier(TableIdentifier tableId); + + public abstract Builder setFileFormat(FileFormat fileFormat); + + public abstract Builder setTableCreateConfig(@Nullable IcebergTableCreateConfig createConfig); + + @Pure + public abstract IcebergDestination build(); + } +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergIO.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergIO.java new file mode 100644 index 000000000000..4b9e3102a02e --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergIO.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.io.iceberg; + +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; + +public class IcebergIO { + + public static WriteRows writeToDynamicDestinations( + IcebergCatalogConfig catalog, DynamicDestinations dynamicDestinations) { + return new WriteRows(catalog, dynamicDestinations); + } + + static class WriteRows extends PTransform, IcebergWriteResult> { + + private final IcebergCatalogConfig catalog; + private final DynamicDestinations dynamicDestinations; + + private WriteRows(IcebergCatalogConfig catalog, DynamicDestinations dynamicDestinations) { + this.catalog = catalog; + this.dynamicDestinations = dynamicDestinations; + } + + @Override + public IcebergWriteResult expand(PCollection input) { + + return input + .apply("Set Destination Metadata", new AssignDestinations(dynamicDestinations)) + .apply( + "Write Rows to Destinations", new WriteToDestinations(catalog, dynamicDestinations)); + } + } +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergTableCreateConfig.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergTableCreateConfig.java new file mode 100644 index 000000000000..c1041f026c31 --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergTableCreateConfig.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.io.iceberg; + +import com.google.auto.value.AutoValue; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.checkerframework.dataflow.qual.Pure; + +@AutoValue +public abstract class IcebergTableCreateConfig { + + /** Schema for the destination, in the event that it must be dynamically created. */ + @Pure + public abstract Schema getSchema(); + + /** Partition spec destination, in the event that it must be dynamically created. */ + @Pure + public abstract PartitionSpec getPartitionSpec(); + + @Pure + public Builder builder() { + return new AutoValue_IcebergTableCreateConfig.Builder(); + } + + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder setSchema(Schema schema); + + public abstract Builder setPartitionSpec(PartitionSpec partitionSpec); + + @Pure + public abstract IcebergTableCreateConfig build(); + } +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergWriteResult.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergWriteResult.java new file mode 100644 index 000000000000..94ac576674ed --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergWriteResult.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.io.iceberg; + +import java.util.Map; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PInput; +import org.apache.beam.sdk.values.POutput; +import org.apache.beam.sdk.values.PValue; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.Snapshot; + +public final class IcebergWriteResult implements POutput { + + private static final TupleTag> SNAPSHOTS_TAG = + new TupleTag>() {}; + + private final Pipeline pipeline; + + private final PCollection> snapshots; + + public PCollection> getSnapshots() { + return snapshots; + } + + IcebergWriteResult(Pipeline pipeline, PCollection> snapshots) { + this.pipeline = pipeline; + this.snapshots = snapshots; + } + + @Override + public Pipeline getPipeline() { + return pipeline; + } + + @Override + public Map, PValue> expand() { + ImmutableMap.Builder, PValue> output = ImmutableMap.builder(); + output.put(SNAPSHOTS_TAG, snapshots); + return output.build(); + } + + @Override + public void finishSpecifyingOutput( + String transformName, PInput input, PTransform transform) {} +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/OneTableDynamicDestinations.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/OneTableDynamicDestinations.java new file mode 100644 index 000000000000..dc6815235a06 --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/OneTableDynamicDestinations.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.io.iceberg; + +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.values.Row; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.catalog.TableIdentifier; +import org.checkerframework.checker.nullness.qual.MonotonicNonNull; + +class OneTableDynamicDestinations implements DynamicDestinations { + + private static final Schema EMPTY_SCHEMA = Schema.builder().build(); + private static final Row EMPTY_ROW = Row.nullRow(EMPTY_SCHEMA); + + // TableId represented as String for serializability + private final String tableIdString; + + private transient @MonotonicNonNull TableIdentifier tableId; + + private TableIdentifier getTableIdentifier() { + if (tableId == null) { + tableId = TableIdentifier.parse(tableIdString); + } + return tableId; + } + + OneTableDynamicDestinations(TableIdentifier tableId) { + this.tableIdString = tableId.toString(); + } + + @Override + public Schema getMetadataSchema() { + return EMPTY_SCHEMA; + } + + @Override + public Row assignDestinationMetadata(Row data) { + return EMPTY_ROW; + } + + @Override + public IcebergDestination instantiateDestination(Row dest) { + return IcebergDestination.builder() + .setTableIdentifier(getTableIdentifier()) + .setTableCreateConfig(null) + .setFileFormat(FileFormat.PARQUET) + .build(); + } +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/PropertyBuilder.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/PropertyBuilder.java new file mode 100644 index 000000000000..53184c70dfca --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/PropertyBuilder.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.io.iceberg; + +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** Convenience utility class to build immutable maps that drops attempts to set null values. */ +class PropertyBuilder { + + ImmutableMap.Builder builder = ImmutableMap.builder(); + + public PropertyBuilder put(String key, @Nullable Object value) { + if (value != null) { + builder = builder.put(key, "" + value); + } + return this; + } + + public ImmutableMap build() { + return builder.build(); + } +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/RecordWriter.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/RecordWriter.java new file mode 100644 index 000000000000..d9a3427c11fc --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/RecordWriter.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.io.iceberg; + +import static org.apache.beam.io.iceberg.RowHelper.rowToRecord; + +import java.io.IOException; +import org.apache.beam.sdk.values.Row; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Table; +import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.data.parquet.GenericParquetWriter; +import org.apache.iceberg.io.DataWriter; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.orc.ORC; +import org.apache.iceberg.parquet.Parquet; + +class RecordWriter { + + private final DataWriter icebergDataWriter; + + private final Table table; + + RecordWriter(Catalog catalog, IcebergDestination destination, String filename) + throws IOException { + this( + catalog.loadTable(destination.getTableIdentifier()), destination.getFileFormat(), filename); + } + + RecordWriter(Table table, FileFormat fileFormat, String filename) throws IOException { + this.table = table; + + String absoluteFilename = table.location() + "/" + filename; + OutputFile outputFile = table.io().newOutputFile(absoluteFilename); + switch (fileFormat) { + case AVRO: + icebergDataWriter = + Avro.writeData(outputFile) + .createWriterFunc(org.apache.iceberg.data.avro.DataWriter::create) + .schema(table.schema()) + .withSpec(table.spec()) + .overwrite() + .build(); + break; + case PARQUET: + icebergDataWriter = + Parquet.writeData(outputFile) + .createWriterFunc(GenericParquetWriter::buildWriter) + .schema(table.schema()) + .withSpec(table.spec()) + .overwrite() + .build(); + break; + case ORC: + throw new UnsupportedOperationException("ORC file format not currently supported."); + default: + throw new RuntimeException("Unknown File Format: " + fileFormat); + } + } + + public void write(Row row) { + Record record = rowToRecord(table.schema(), row); + icebergDataWriter.write(record); + } + + public void close() throws IOException { + icebergDataWriter.close(); + } + + public Table getTable() { + return table; + } + + public long bytesWritten() { + return icebergDataWriter.length(); + } + + public DataFile dataFile() { + return icebergDataWriter.toDataFile(); + } +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/RowHelper.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/RowHelper.java new file mode 100644 index 000000000000..92f55208a0da --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/RowHelper.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.io.iceberg; + +import java.nio.ByteBuffer; +import java.util.Optional; +import java.util.UUID; +import org.apache.beam.sdk.values.Row; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.types.Types.NestedField; + +class RowHelper { + + // static helper functions only + private RowHelper() {} + + public static Record rowToRecord(org.apache.iceberg.Schema schema, Row row) { + return copy(GenericRecord.create(schema), row); + } + + private static Record copy(Record baseRecord, Row value) { + Record rec = baseRecord.copy(); + for (NestedField f : rec.struct().fields()) { + copyInto(rec, f, value); + } + return rec; + } + + private static void copyInto(Record rec, NestedField field, Row value) { + String name = field.name(); + switch (field.type().typeId()) { + case BOOLEAN: + Optional.ofNullable(value.getBoolean(name)).ifPresent(v -> rec.setField(name, v)); + break; + case INTEGER: + Optional.ofNullable(value.getInt32(name)).ifPresent(v -> rec.setField(name, v)); + break; + case LONG: + Optional.ofNullable(value.getInt64(name)).ifPresent(v -> rec.setField(name, v)); + break; + case FLOAT: + Optional.ofNullable(value.getFloat(name)).ifPresent(v -> rec.setField(name, v)); + break; + case DOUBLE: + Optional.ofNullable(value.getDouble(name)).ifPresent(v -> rec.setField(name, v)); + break; + case DATE: + throw new UnsupportedOperationException("Date fields not yet supported"); + case TIME: + throw new UnsupportedOperationException("Time fields not yet supported"); + case TIMESTAMP: + Optional.ofNullable(value.getDateTime(name)) + .ifPresent(v -> rec.setField(name, v.getMillis())); + break; + case STRING: + Optional.ofNullable(value.getString(name)).ifPresent(v -> rec.setField(name, v)); + break; + case UUID: + Optional.ofNullable(value.getBytes(name)) + .ifPresent(v -> rec.setField(name, UUID.nameUUIDFromBytes(v))); + break; + case FIXED: + throw new UnsupportedOperationException("Fixed-precision fields are not yet supported."); + case BINARY: + Optional.ofNullable(value.getBytes(name)) + .ifPresent(v -> rec.setField(name, ByteBuffer.wrap(v))); + break; + case DECIMAL: + Optional.ofNullable(value.getDecimal(name)).ifPresent(v -> rec.setField(name, v)); + break; + case STRUCT: + Optional.ofNullable(value.getRow(name)) + .ifPresent( + row -> + rec.setField( + name, copy(GenericRecord.create(field.type().asStructType()), row))); + break; + case LIST: + throw new UnsupportedOperationException("List fields are not yet supported."); + case MAP: + throw new UnsupportedOperationException("Map fields are not yet supported."); + } + } +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/SchemaHelper.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/SchemaHelper.java new file mode 100644 index 000000000000..bbd23239de66 --- /dev/null +++ 
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/SchemaHelper.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.io.iceberg; + +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; + +@SuppressWarnings({"dereference.of.nullable"}) +class SchemaHelper { + + private SchemaHelper() {} + + public static final String ICEBERG_TYPE_OPTION_NAME = "icebergTypeID"; + + public static Schema.FieldType fieldTypeForType(final Type type) { + switch (type.typeId()) { + case BOOLEAN: + return FieldType.BOOLEAN; + case INTEGER: + return FieldType.INT32; + case LONG: + return FieldType.INT64; + case FLOAT: + return FieldType.FLOAT; + case DOUBLE: + return FieldType.DOUBLE; + case DATE: + case TIME: + case TIMESTAMP: // TODO: Logical types? + return FieldType.DATETIME; + case STRING: + return FieldType.STRING; + case UUID: + case BINARY: + return FieldType.BYTES; + case FIXED: + case DECIMAL: + return FieldType.DECIMAL; + case STRUCT: + return FieldType.row(convert(type.asStructType())); + case LIST: + return FieldType.iterable(fieldTypeForType(type.asListType().elementType())); + case MAP: + return FieldType.map( + fieldTypeForType(type.asMapType().keyType()), + fieldTypeForType(type.asMapType().valueType())); + } + throw new RuntimeException("Unrecognized IcebergIO Type"); + } + + public static Schema.Field convert(final Types.NestedField field) { + return Schema.Field.of(field.name(), fieldTypeForType(field.type())) + .withOptions( + Schema.Options.builder() + .setOption( + ICEBERG_TYPE_OPTION_NAME, Schema.FieldType.STRING, field.type().typeId().name()) + .build()) + .withNullable(field.isOptional()); + } + + public static Schema convert(final org.apache.iceberg.Schema schema) { + Schema.Builder builder = Schema.builder(); + for (Types.NestedField f : schema.columns()) { + builder.addField(convert(f)); + } + return builder.build(); + } + + public static Schema convert(final Types.StructType struct) { + Schema.Builder builder = Schema.builder(); + for (Types.NestedField f : struct.fields()) { + builder.addField(convert(f)); + } + return builder.build(); + } + + public static Types.NestedField convert(int fieldId, final Schema.Field field) { + String typeId = field.getOptions().getValue(ICEBERG_TYPE_OPTION_NAME, String.class); + if (typeId != null) { + return Types.NestedField.of( + fieldId, + field.getType().getNullable(), + field.getName(), + Types.fromPrimitiveString(typeId)); + } else { + return Types.NestedField.of( + fieldId, field.getType().getNullable(), field.getName(), Types.StringType.get()); + } + } + + public static org.apache.iceberg.Schema convert(final Schema 
schema) { + Types.NestedField[] fields = new Types.NestedField[schema.getFieldCount()]; + int fieldId = 0; + for (Schema.Field f : schema.getFields()) { + fields[fieldId++] = convert(fieldId, f); + } + return new org.apache.iceberg.Schema(fields); + } +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/WriteGroupedRowsToFiles.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/WriteGroupedRowsToFiles.java new file mode 100644 index 000000000000..7a152b602581 --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/WriteGroupedRowsToFiles.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.io.iceberg; + +import java.io.IOException; +import java.util.UUID; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.ShardedKey; +import org.apache.iceberg.catalog.Catalog; +import org.checkerframework.checker.nullness.qual.MonotonicNonNull; + +class WriteGroupedRowsToFiles + extends PTransform< + PCollection, Iterable>>, PCollection> { + + static final long DEFAULT_MAX_BYTES_PER_FILE = (1L << 40); // 1TB + + private final DynamicDestinations dynamicDestinations; + private final IcebergCatalogConfig catalogConfig; + + WriteGroupedRowsToFiles( + IcebergCatalogConfig catalogConfig, DynamicDestinations dynamicDestinations) { + this.catalogConfig = catalogConfig; + this.dynamicDestinations = dynamicDestinations; + } + + @Override + public PCollection expand( + PCollection, Iterable>> input) { + return input.apply( + ParDo.of( + new WriteGroupedRowsToFilesDoFn( + catalogConfig, dynamicDestinations, DEFAULT_MAX_BYTES_PER_FILE))); + } + + private static class WriteGroupedRowsToFilesDoFn + extends DoFn, Iterable>, FileWriteResult> { + + private final DynamicDestinations dynamicDestinations; + private final IcebergCatalogConfig catalogConfig; + private transient @MonotonicNonNull Catalog catalog; + + WriteGroupedRowsToFilesDoFn( + IcebergCatalogConfig catalogConfig, + DynamicDestinations dynamicDestinations, + long maxFileSize) { + this.catalogConfig = catalogConfig; + this.dynamicDestinations = dynamicDestinations; + } + + private org.apache.iceberg.catalog.Catalog getCatalog() { + if (catalog == null) { + this.catalog = catalogConfig.catalog(); + } + return catalog; + } + + private RecordWriter createWriter(IcebergDestination destination) throws IOException { + return new RecordWriter(getCatalog(), destination, "-" + UUID.randomUUID()); + } + + @ProcessElement + public void processElement( + 
ProcessContext c, @Element KV, Iterable> element) throws Exception { + + Row destMetadata = element.getKey().getKey(); + IcebergDestination destination = dynamicDestinations.instantiateDestination(destMetadata); + RecordWriter writer = createWriter(destination); + + for (Row e : element.getValue()) { + writer.write(e); + } + + writer.close(); + c.output( + FileWriteResult.builder() + .setTableIdentifier(destination.getTableIdentifier()) + .setDataFile(writer.dataFile()) + .setPartitionSpec(writer.getTable().spec()) + .build()); + } + } +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/WriteToDestinations.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/WriteToDestinations.java new file mode 100644 index 000000000000..22e27a4df7ca --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/WriteToDestinations.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.io.iceberg; + +import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; + +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.RowCoder; +import org.apache.beam.sdk.coders.ShardedKeyCoder; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionList; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.ShardedKey; +import org.apache.iceberg.Snapshot; + +class WriteToDestinations extends PTransform, IcebergWriteResult> { + + static final long DEFAULT_MAX_BYTES_PER_FILE = (1L << 40); // 1TB + static final int DEFAULT_NUM_FILE_SHARDS = 0; + static final int FILE_TRIGGERING_RECORD_COUNT = 50_000; + + private final IcebergCatalogConfig catalogConfig; + private final DynamicDestinations dynamicDestinations; + + WriteToDestinations(IcebergCatalogConfig catalogConfig, DynamicDestinations dynamicDestinations) { + this.dynamicDestinations = dynamicDestinations; + this.catalogConfig = catalogConfig; + } + + @Override + public IcebergWriteResult expand(PCollection input) { + + // First, attempt to write directly to files without shuffling. 
If there are + // too many distinct destinations in a single bundle, the remaining + // elements will be emitted to take the "slow path" that involves a shuffle + WriteUngroupedRowsToFiles.Result writeUngroupedResult = + input.apply( + "Fast-path write rows", + new WriteUngroupedRowsToFiles(catalogConfig, dynamicDestinations)); + + // Then write the rest by shuffling on the destination metadata + Schema destSchema = + checkArgumentNotNull( + writeUngroupedResult + .getSpilledRows() + .getSchema() + .getField("dest") + .getType() + .getRowSchema(), + "Input schema missing `dest` field."); + Schema dataSchema = + checkArgumentNotNull( + writeUngroupedResult + .getSpilledRows() + .getSchema() + .getField("data") + .getType() + .getRowSchema(), + "Input schema missing `data` field"); + + PCollection writeGroupedResult = + writeUngroupedResult + .getSpilledRows() + .apply( + "Key by destination and shard", + MapElements.via( + new SimpleFunction, Row>>() { + private static final int SPILLED_ROWS_SHARDING_FACTOR = 10; + private int shardNumber = 0; + + @Override + public KV, Row> apply(Row elem) { + Row data = + checkArgumentNotNull( + elem.getRow("data"), "Element missing `data` field"); + Row dest = + checkArgumentNotNull( + elem.getRow("dest"), "Element missing `dest` field"); + return KV.of( + ShardedKey.of(dest, shardNumber % SPILLED_ROWS_SHARDING_FACTOR), data); + } + })) + .setCoder( + KvCoder.of(ShardedKeyCoder.of(RowCoder.of(destSchema)), RowCoder.of(dataSchema))) + .apply("Group spilled rows by destination shard", GroupByKey.create()) + .apply( + "Write remaining rows to files", + new WriteGroupedRowsToFiles(catalogConfig, dynamicDestinations)); + + PCollection allWrittenFiles = + PCollectionList.of(writeUngroupedResult.getWrittenFiles()) + .and(writeGroupedResult) + .apply("Flatten Written Files", Flatten.pCollections()); + + // Apply any sharded writes and flatten everything for catalog updates + PCollection> snapshots = + allWrittenFiles.apply(new AppendFilesToTables(catalogConfig)); + + return new IcebergWriteResult(input.getPipeline(), snapshots); + } +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/WriteUngroupedRowsToFiles.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/WriteUngroupedRowsToFiles.java new file mode 100644 index 000000000000..240678c83697 --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/WriteUngroupedRowsToFiles.java @@ -0,0 +1,350 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.io.iceberg; + +import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.PInput; +import org.apache.beam.sdk.values.POutput; +import org.apache.beam.sdk.values.PValue; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; +import org.apache.iceberg.catalog.Catalog; +import org.checkerframework.checker.nullness.qual.MonotonicNonNull; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** + * A PTransform that writes rows to files according to their dynamic destination. If there are too + * many destinations in a single bundle, some rows will be written to a secondary output and must be + * written via another method. + */ +class WriteUngroupedRowsToFiles + extends PTransform, WriteUngroupedRowsToFiles.Result> { + + /** + * Maximum number of writers that will be created per bundle. Any elements requiring more writers + * will be spilled. 
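+   * <p>Spilled rows are later grouped by destination in {@code WriteToDestinations} and written via {@code WriteGroupedRowsToFiles}.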
+ */ + @VisibleForTesting static final int DEFAULT_MAX_WRITERS_PER_BUNDLE = 20; + + private static final long DEFAULT_MAX_BYTES_PER_FILE = (1L << 29); // 512mb + + private static final TupleTag WRITTEN_FILES_TAG = new TupleTag<>("writtenFiles"); + private static final TupleTag WRITTEN_ROWS_TAG = new TupleTag("writtenRows") {}; + private static final TupleTag SPILLED_ROWS_TAG = new TupleTag("spilledRows") {}; + + private final String fileSuffix; + private final DynamicDestinations dynamicDestinations; + private final IcebergCatalogConfig catalogConfig; + + WriteUngroupedRowsToFiles( + IcebergCatalogConfig catalogConfig, DynamicDestinations dynamicDestinations) { + this.catalogConfig = catalogConfig; + this.dynamicDestinations = dynamicDestinations; + this.fileSuffix = UUID.randomUUID().toString(); + } + + @Override + public Result expand(PCollection input) { + + PCollectionTuple resultTuple = + input.apply( + ParDo.of( + new WriteUngroupedRowsToFilesDoFn( + catalogConfig, + dynamicDestinations, + fileSuffix, + DEFAULT_MAX_WRITERS_PER_BUNDLE, + DEFAULT_MAX_BYTES_PER_FILE)) + .withOutputTags( + WRITTEN_FILES_TAG, + TupleTagList.of(ImmutableList.of(WRITTEN_ROWS_TAG, SPILLED_ROWS_TAG)))); + + return new Result( + input.getPipeline(), + resultTuple.get(WRITTEN_FILES_TAG), + resultTuple.get(WRITTEN_ROWS_TAG).setCoder(input.getCoder()), + resultTuple.get(SPILLED_ROWS_TAG).setCoder(input.getCoder())); + } + + /** + * The result of this transform has two components: the records that were written and the records + * that spilled over and need to be written by a subsquent method. + */ + static class Result implements POutput { + + private final Pipeline pipeline; + private final PCollection writtenRows; + private final PCollection spilledRows; + private final PCollection writtenFiles; + + private Result( + Pipeline pipeline, + PCollection writtenFiles, + PCollection writtenRows, + PCollection spilledRows) { + this.pipeline = pipeline; + this.writtenFiles = writtenFiles; + this.writtenRows = writtenRows; + this.spilledRows = spilledRows; + } + + public PCollection getWrittenRows() { + return writtenRows; + } + + public PCollection getSpilledRows() { + return spilledRows; + } + + public PCollection getWrittenFiles() { + return writtenFiles; + } + + @Override + public Pipeline getPipeline() { + return pipeline; + } + + @Override + public Map, PValue> expand() { + return ImmutableMap., PValue>builder() + .put(WRITTEN_FILES_TAG, writtenFiles) + .put(WRITTEN_ROWS_TAG, writtenRows) + .put(SPILLED_ROWS_TAG, spilledRows) + .build(); + } + + @Override + public void finishSpecifyingOutput( + String transformName, PInput input, PTransform transform) { + // noop + } + } + + /** + * A DoFn that writes each input row to its assigned destination and outputs a result object + * summarizing what it accomplished for a given bundle. + * + *

+   * <p>Specifically, the outputs are:
+   *
+   * <ul>
+   *   <li>(main output) the written files
+   *   <li>the written records
+   *   <li>the spilled records which were not written
+   * </ul>
+ */ + private static class WriteUngroupedRowsToFilesDoFn extends DoFn { + + private final String filename; + private final int maxWritersPerBundle; + private final long maxFileSize; + private final DynamicDestinations dynamicDestinations; + private final IcebergCatalogConfig catalogConfig; + + private transient @MonotonicNonNull Map writers; + private transient @MonotonicNonNull Map windows; + private transient @MonotonicNonNull Catalog catalog; + + public WriteUngroupedRowsToFilesDoFn( + IcebergCatalogConfig catalogConfig, + DynamicDestinations dynamicDestinations, + String filename, + int maximumWritersPerBundle, + long maxFileSize) { + this.catalogConfig = catalogConfig; + this.dynamicDestinations = dynamicDestinations; + this.filename = filename; + this.maxWritersPerBundle = maximumWritersPerBundle; + this.maxFileSize = maxFileSize; + } + + private Map getWriters() { + if (writers == null) { + writers = Maps.newHashMap(); + } + return writers; + } + + private Map getWindows() { + if (windows == null) { + windows = Maps.newHashMap(); + } + return windows; + } + + private org.apache.iceberg.catalog.Catalog getCatalog() { + if (catalog == null) { + this.catalog = catalogConfig.catalog(); + } + return catalog; + } + + private RecordWriter createAndInsertWriter(IcebergDestination destination, BoundedWindow window) + throws IOException { + RecordWriter writer = + new RecordWriter(getCatalog(), destination, filename + "-" + UUID.randomUUID()); + getWindows().put(destination, window); + getWriters().put(destination, writer); + return writer; + } + + /** + * Returns active writer for this destination if possible. If this returns null then we have + * reached the maximum number of writers and should spill any records associated. + */ + @Nullable + RecordWriter getWriterIfPossible(IcebergDestination destination, BoundedWindow window) + throws IOException { + + RecordWriter existingWriter = getWriters().get(destination); + if (existingWriter != null) { + return existingWriter; + } + + if (getWriters().size() > maxWritersPerBundle) { + return null; + } + + return createAndInsertWriter(destination, window); + } + + @StartBundle + public void startBundle() {} + + @ProcessElement + public void processElement(@Element Row element, BoundedWindow window, MultiOutputReceiver out) + throws Exception { + + Row data = checkArgumentNotNull(element.getRow("data"), "Input row missing `data` field."); + Row destMetadata = + checkArgumentNotNull(element.getRow("dest"), "Input row missing `dest` field."); + IcebergDestination destination = dynamicDestinations.instantiateDestination(destMetadata); + + // Spill record if writer cannot be created + RecordWriter writer = getWriterIfPossible(destination, window); + if (writer == null) { + out.get(SPILLED_ROWS_TAG).output(element); + return; + } + + // Reset writer if max file size reached + if (writer.bytesWritten() > maxFileSize) { + writer.close(); + out.get(WRITTEN_FILES_TAG) + .output( + FileWriteResult.builder() + .setDataFile(writer.dataFile()) + .setTableIdentifier(destination.getTableIdentifier()) + .build()); + writer = createAndInsertWriter(destination, window); + } + + // Actually write the data + try { + writer.write(data); + out.get(WRITTEN_ROWS_TAG).output(element); + } catch (Exception e) { + try { + writer.close(); + } catch (Exception closeException) { + e.addSuppressed(closeException); + } + throw e; + } + } + + @FinishBundle + public void finishBundle(FinishBundleContext c) throws Exception { + closeAllWriters(); + outputFinalWrittenFiles(c); + 
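+      // Writer state is per-bundle; clear it so the next bundle lazily creates fresh writers.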
getWriters().clear(); + } + + private void outputFinalWrittenFiles(DoFn.FinishBundleContext c) + throws Exception { + List exceptionList = Lists.newArrayList(); + for (Map.Entry entry : getWriters().entrySet()) { + try { + IcebergDestination destination = entry.getKey(); + + RecordWriter writer = entry.getValue(); + BoundedWindow window = + checkStateNotNull( + getWindows().get(destination), "internal error: no windows for destination"); + c.output( + FileWriteResult.builder() + .setDataFile(writer.dataFile()) + .setTableIdentifier(destination.getTableIdentifier()) + .setPartitionSpec(writer.getTable().spec()) + .build(), + window.maxTimestamp(), + window); + } catch (Exception e) { + exceptionList.add(e); + } + } + + if (!exceptionList.isEmpty()) { + Exception e = + new IOException("Exception emitting writer metadata. See suppressed exceptions"); + for (Exception thrown : exceptionList) { + e.addSuppressed(thrown); + } + throw e; + } + } + + private void closeAllWriters() throws Exception { + List exceptionList = Lists.newArrayList(); + for (RecordWriter writer : getWriters().values()) { + try { + writer.close(); + } catch (Exception e) { + exceptionList.add(e); + } + } + + if (!exceptionList.isEmpty()) { + Exception e = new IOException("Exception closing some writers. See suppressed exceptions."); + for (Exception thrown : exceptionList) { + e.addSuppressed(thrown); + } + throw e; + } + } + } +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/package-info.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/package-info.java new file mode 100644 index 000000000000..f97ff98677f9 --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/package-info.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** Iceberg connectors. */ +package org.apache.beam.io.iceberg; diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/FileWriteResultTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/FileWriteResultTest.java new file mode 100644 index 000000000000..2499331beadc --- /dev/null +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/FileWriteResultTest.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.io.iceberg; + +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; + +import java.io.Serializable; +import java.util.List; +import java.util.UUID; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.testing.CoderProperties; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.commons.compress.utils.Lists; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.types.Types; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class FileWriteResultTest implements Serializable { + + @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + @Rule public TestDataWarehouse warehouse = new TestDataWarehouse(TEMPORARY_FOLDER, "default"); + + private static final Coder TEST_CODER = + FileWriteResult.FileWriteResultCoder.of(); + + private List getTestValues() throws Exception { + TableIdentifier tableId = + TableIdentifier.of("default", "table" + Long.toString(UUID.randomUUID().hashCode(), 16)); + + // Create a table so we can have some DataFile objects + Table table = warehouse.createTable(tableId, TestFixtures.SCHEMA); + List values = Lists.newArrayList(); + + // A parquet file + RecordWriter writer = + new RecordWriter(table, FileFormat.PARQUET, TEMPORARY_FOLDER.newFile().toString()); + writer.write( + Row.withSchema(SchemaHelper.convert(TestFixtures.SCHEMA)).addValues(42L, "bizzle").build()); + writer.close(); + DataFile dataFile = writer.dataFile(); + values.add( + FileWriteResult.builder() + .setDataFile(dataFile) + .setPartitionSpec(table.spec()) + .setTableIdentifier(tableId) + .build()); + + // An avro file + writer = new RecordWriter(table, FileFormat.AVRO, TEMPORARY_FOLDER.newFile().toString()); + writer.write( + Row.withSchema(SchemaHelper.convert(TestFixtures.SCHEMA)).addValues(42L, "bizzle").build()); + writer.close(); + dataFile = writer.dataFile(); + values.add( + FileWriteResult.builder() + .setDataFile(dataFile) + .setPartitionSpec(table.spec()) + .setTableIdentifier(tableId) + .build()); + + // Parquet file with a different schema + TableIdentifier tableId2 = + TableIdentifier.of( + "default", "othertable" + Long.toString(UUID.randomUUID().hashCode(), 16)); + Schema schema = + new Schema( + required(1, "id", Types.LongType.get()), + optional(2, "data", Types.StringType.get()), + optional( + 3, + "extra", + Types.StructType.of( + Types.NestedField.required(4, "inner", Types.BinaryType.get())))); + Table table2 = warehouse.createTable(tableId2, schema); + + // A parquet file in this 
other table + writer = new RecordWriter(table2, FileFormat.PARQUET, TEMPORARY_FOLDER.newFile().toString()); + writer.write( + Row.withSchema(SchemaHelper.convert(schema)) + .addValues( + 42L, + "bizzle", + Row.withSchema( + org.apache.beam.sdk.schemas.Schema.of( + org.apache.beam.sdk.schemas.Schema.Field.of( + "inner", org.apache.beam.sdk.schemas.Schema.FieldType.BYTES))) + .addValues(new byte[] {0xa}) + .build()) + .build()); + writer.close(); + DataFile dataFile2 = writer.dataFile(); + values.add( + FileWriteResult.builder() + .setDataFile(dataFile2) + .setPartitionSpec(table2.spec()) + .setTableIdentifier(tableId2) + .build()); + + return values; + } + + @Test + public void testDecodeEncodeEqual() throws Exception { + for (FileWriteResult value : getTestValues()) { + CoderProperties.structuralValueDecodeEncodeEqual(TEST_CODER, value); + } + } + + @Rule public ExpectedException thrown = ExpectedException.none(); + + @Test + public void testEncodedTypeDescriptor() throws Exception { + assertThat( + TEST_CODER.getEncodedTypeDescriptor(), equalTo(TypeDescriptor.of(FileWriteResult.class))); + } +} diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/IcebergIOWriteTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/IcebergIOWriteTest.java new file mode 100644 index 000000000000..c77d162aafd4 --- /dev/null +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/IcebergIOWriteTest.java @@ -0,0 +1,293 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.io.iceberg; + +import static org.apache.beam.io.iceberg.RowHelper.rowToRecord; +import static org.hamcrest.MatcherAssert.assertThat; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; +import org.apache.commons.compress.utils.Lists; +import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.IcebergGenerics; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.data.parquet.GenericParquetWriter; +import org.apache.iceberg.io.DataWriter; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.parquet.Parquet; +import org.hamcrest.Matchers; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@RunWith(JUnit4.class) +public class IcebergIOWriteTest implements Serializable { + private static final Logger LOG = LoggerFactory.getLogger(IcebergIOWriteTest.class); + + @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + @Rule + public transient TestDataWarehouse warehouse = new TestDataWarehouse(TEMPORARY_FOLDER, "default"); + + @Rule public transient TestPipeline testPipeline = TestPipeline.create(); + + @Test + public void testSimpleAppend() throws Exception { + TableIdentifier tableId = + TableIdentifier.of("default", "table" + Long.toString(UUID.randomUUID().hashCode(), 16)); + + // Create a table and add records to it. + Table table = warehouse.createTable(tableId, TestFixtures.SCHEMA); + + IcebergCatalogConfig catalog = + IcebergCatalogConfig.builder() + .setName("hadoop") + .setIcebergCatalogType(CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP) + .setWarehouseLocation(warehouse.location) + .build(); + + DynamicDestinations destination = DynamicDestinations.singleTable(tableId); + + testPipeline + .apply("Records To Add", Create.of(TestFixtures.asRows(TestFixtures.FILE1SNAPSHOT1))) + .setRowSchema(SchemaHelper.convert(TestFixtures.SCHEMA)) + .apply("Append To Table", IcebergIO.writeToDynamicDestinations(catalog, destination)); + + LOG.info("Executing pipeline"); + testPipeline.run().waitUntilFinish(); + LOG.info("Done running pipeline"); + + List writtenRecords = ImmutableList.copyOf(IcebergGenerics.read(table).build()); + + assertThat(writtenRecords, Matchers.containsInAnyOrder(TestFixtures.FILE1SNAPSHOT1.toArray())); + } + + /** Tests that a small write to three different tables with dynamic destinations works. 
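+   * Rows are routed by {@code (id / 3) + 1}, so each snapshot's records land in their own table and no spilling occurs.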
*/ + @Test + public void testDynamicDestinationsWithoutSpillover() throws Exception { + final String salt = Long.toString(UUID.randomUUID().hashCode(), 16); + final TableIdentifier table1Id = TableIdentifier.of("default", "table1-" + salt); + final TableIdentifier table2Id = TableIdentifier.of("default", "table2-" + salt); + final TableIdentifier table3Id = TableIdentifier.of("default", "table3-" + salt); + + // Create a table and add records to it. + Table table1 = warehouse.createTable(table1Id, TestFixtures.SCHEMA); + Table table2 = warehouse.createTable(table2Id, TestFixtures.SCHEMA); + Table table3 = warehouse.createTable(table3Id, TestFixtures.SCHEMA); + + IcebergCatalogConfig catalog = + IcebergCatalogConfig.builder() + .setName("hadoop") + .setIcebergCatalogType(CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP) + .setWarehouseLocation(warehouse.location) + .build(); + + DynamicDestinations destination = + new DynamicDestinations() { + private final Schema schema = Schema.builder().addInt64Field("tableNumber").build(); + + @Override + public Schema getMetadataSchema() { + return schema; + } + + @Override + public Row assignDestinationMetadata(Row data) { + long rowId = data.getInt64("id"); + return Row.withSchema(schema).addValues((rowId / 3) + 1).build(); + } + + @Override + public IcebergDestination instantiateDestination(Row dest) { + return IcebergDestination.builder() + .setTableIdentifier( + TableIdentifier.of( + "default", "table" + dest.getInt64("tableNumber") + "-" + salt)) + .setFileFormat(FileFormat.PARQUET) + .build(); + } + }; + + testPipeline + .apply( + "Records To Add", + Create.of( + TestFixtures.asRows( + Iterables.concat( + TestFixtures.FILE1SNAPSHOT1, + TestFixtures.FILE1SNAPSHOT2, + TestFixtures.FILE1SNAPSHOT3)))) + .setRowSchema(SchemaHelper.convert(TestFixtures.SCHEMA)) + .apply("Append To Table", IcebergIO.writeToDynamicDestinations(catalog, destination)); + + LOG.info("Executing pipeline"); + testPipeline.run().waitUntilFinish(); + LOG.info("Done running pipeline"); + + List writtenRecords1 = ImmutableList.copyOf(IcebergGenerics.read(table1).build()); + List writtenRecords2 = ImmutableList.copyOf(IcebergGenerics.read(table2).build()); + List writtenRecords3 = ImmutableList.copyOf(IcebergGenerics.read(table3).build()); + + assertThat(writtenRecords1, Matchers.containsInAnyOrder(TestFixtures.FILE1SNAPSHOT1.toArray())); + assertThat(writtenRecords2, Matchers.containsInAnyOrder(TestFixtures.FILE1SNAPSHOT2.toArray())); + assertThat(writtenRecords3, Matchers.containsInAnyOrder(TestFixtures.FILE1SNAPSHOT3.toArray())); + } + + /** + * Tests writing to enough destinations to spill over to the "slow" write path. + * + *

Note that we could have added a config to lower the spill number but exceeding the default + * is fine + */ + @Test + public void testDynamicDestinationsWithSpillover() throws Exception { + final String salt = Long.toString(UUID.randomUUID().hashCode(), 16); + + // Create far more tables than the max writers per bundle + int numDestinations = 5 * WriteUngroupedRowsToFiles.DEFAULT_MAX_WRITERS_PER_BUNDLE; + List tableIdentifiers = Lists.newArrayList(); + List tables = Lists.newArrayList(); + for (int i = 0; i < numDestinations; ++i) { + TableIdentifier id = TableIdentifier.of("default", "table" + i + "-" + salt); + tableIdentifiers.add(id); + tables.add(warehouse.createTable(id, TestFixtures.SCHEMA)); + } + + // Create plenty of data to hit each table + int numElements = 10 * numDestinations; + List elements = Lists.newArrayList(); + final Record genericRecord = GenericRecord.create(TestFixtures.SCHEMA); + Map> elementsPerTable = Maps.newHashMap(); + for (int i = 0; i < numElements; ++i) { + Record element = genericRecord.copy(ImmutableMap.of("id", (long) i, "data", "data for " + i)); + TableIdentifier tableId = tableIdentifiers.get(i % numDestinations); + elements.add(element); + elementsPerTable.computeIfAbsent(tableId, ignored -> Lists.newArrayList()).add(element); + } + + IcebergCatalogConfig catalog = + IcebergCatalogConfig.builder() + .setName("hadoop") + .setIcebergCatalogType(CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP) + .setWarehouseLocation(warehouse.location) + .build(); + + DynamicDestinations destination = + new DynamicDestinations() { + private final Schema schema = Schema.builder().addInt64Field("tableNumber").build(); + + @Override + public Schema getMetadataSchema() { + return schema; + } + + @Override + public Row assignDestinationMetadata(Row data) { + long rowId = data.getInt64("id"); + return Row.withSchema(schema).addValues(rowId % numDestinations).build(); + } + + @Override + public IcebergDestination instantiateDestination(Row dest) { + return IcebergDestination.builder() + .setTableIdentifier( + TableIdentifier.of( + "default", "table" + dest.getInt64("tableNumber") + "-" + salt)) + .setFileFormat(FileFormat.PARQUET) + .build(); + } + }; + + testPipeline + .apply("Records To Add", Create.of(TestFixtures.asRows(elements))) + .setRowSchema(SchemaHelper.convert(TestFixtures.SCHEMA)) + .apply("Append To Table", IcebergIO.writeToDynamicDestinations(catalog, destination)); + + LOG.info("Executing pipeline"); + testPipeline.run().waitUntilFinish(); + LOG.info("Done running pipeline"); + + for (int i = 0; i < numDestinations; ++i) { + TableIdentifier tableId = tableIdentifiers.get(i); + Table table = tables.get(i); + List writtenRecords = ImmutableList.copyOf(IcebergGenerics.read(table).build()); + assertThat( + writtenRecords, Matchers.containsInAnyOrder(elementsPerTable.get(tableId).toArray())); + } + } + + /** + * A test of our assumptions about how two commits of the same file work in iceberg's Java API. + */ + @Test + public void testIdempotentCommit() throws Exception { + TableIdentifier tableId = + TableIdentifier.of("default", "table" + Long.toString(UUID.randomUUID().hashCode(), 16)); + + // Create a table and add records to it. 
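+    // The same DataFile is appended by two separate commits below; the second commit must also succeed.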
+ Table table = warehouse.createTable(tableId, TestFixtures.SCHEMA); + Record record = + rowToRecord( + table.schema(), + Row.withSchema(SchemaHelper.convert(TestFixtures.SCHEMA)) + .addValues(42L, "bizzle") + .build()); + + OutputFile outputFile = table.io().newOutputFile(TEMPORARY_FOLDER.newFile().toString()); + DataWriter icebergDataWriter = + Parquet.writeData(outputFile) + .createWriterFunc(GenericParquetWriter::buildWriter) + .schema(table.schema()) + .withSpec(table.spec()) + .overwrite() + .build(); + + icebergDataWriter.write(record); + icebergDataWriter.close(); + DataFile dataFile = icebergDataWriter.toDataFile(); + + AppendFiles update = table.newAppend(); + update.appendFile(dataFile); + update.commit(); + + AppendFiles secondUpdate = table.newAppend(); + secondUpdate.appendFile(dataFile); + secondUpdate.commit(); + } +} diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/RowHelperTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/RowHelperTest.java new file mode 100644 index 000000000000..931937f407dd --- /dev/null +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/RowHelperTest.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.io.iceberg; + +import static org.apache.iceberg.types.Types.NestedField.required; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; + +import java.io.Serializable; +import java.nio.ByteBuffer; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.values.Row; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class RowHelperTest implements Serializable { + + /** + * Checks a value that when converted to Iceberg type is the same value when interpreted in Java. 
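+   * For example, a Beam INT64 value converted via {@code RowHelper.rowToRecord} should read back from the Iceberg {@code Record} as the same Java long.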
+ */ + private void checkTypeConversion(Schema.FieldType sourceType, Type destType, Object value) { + checkTypeConversion(sourceType, value, destType, value); + } + + private void checkTypeConversion( + Schema.FieldType sourceType, Object sourceValue, Type destType, Object destValue) { + Schema beamSchema = Schema.of(Schema.Field.of("v", sourceType)); + Row row = Row.withSchema(beamSchema).addValues(sourceValue).build(); + + org.apache.iceberg.Schema icebergSchema = + new org.apache.iceberg.Schema(required(0, "v", destType)); + Record record = RowHelper.rowToRecord(icebergSchema, row); + + assertThat(record.getField("v"), equalTo(destValue)); + } + + @Test + public void testBoolean() throws Exception { + checkTypeConversion(Schema.FieldType.BOOLEAN, Types.BooleanType.get(), true); + checkTypeConversion(Schema.FieldType.BOOLEAN, Types.BooleanType.get(), false); + } + + @Test + public void testInteger() throws Exception { + checkTypeConversion(Schema.FieldType.INT32, Types.IntegerType.get(), -13); + checkTypeConversion(Schema.FieldType.INT32, Types.IntegerType.get(), 42); + checkTypeConversion(Schema.FieldType.INT32, Types.IntegerType.get(), 0); + } + + @Test + public void testLong() throws Exception { + checkTypeConversion(Schema.FieldType.INT64, Types.LongType.get(), 13L); + checkTypeConversion(Schema.FieldType.INT64, Types.LongType.get(), 42L); + } + + @Test + public void testFloat() throws Exception { + checkTypeConversion(Schema.FieldType.FLOAT, Types.FloatType.get(), 3.14159f); + checkTypeConversion(Schema.FieldType.FLOAT, Types.FloatType.get(), 42.0f); + } + + @Test + public void testDouble() throws Exception { + checkTypeConversion(Schema.FieldType.DOUBLE, Types.DoubleType.get(), 3.14159); + } + + @Test + public void testDate() throws Exception {} + + @Test + public void testTime() throws Exception {} + + @Test + public void testTimestamp() throws Exception { + DateTime dateTime = + new DateTime().withDate(1979, 03, 14).withTime(1, 2, 3, 4).withZone(DateTimeZone.UTC); + + checkTypeConversion( + Schema.FieldType.DATETIME, + dateTime.toInstant(), + Types.TimestampType.withoutZone(), + dateTime.getMillis()); + } + + @Test + public void testFixed() throws Exception {} + + @Test + public void testBinary() throws Exception { + byte[] bytes = new byte[] {1, 2, 3, 4}; + checkTypeConversion( + Schema.FieldType.BYTES, bytes, Types.BinaryType.get(), ByteBuffer.wrap(bytes)); + } + + @Test + public void testDecimal() throws Exception {} + + @Test + public void testStruct() throws Exception {} + + @Test + public void testMap() throws Exception {} + + @Test + public void testList() throws Exception {} +} diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/TestDataWarehouse.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/TestDataWarehouse.java new file mode 100644 index 000000000000..0fc704cfc087 --- /dev/null +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/TestDataWarehouse.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.io.iceberg; + +import static org.apache.iceberg.hadoop.HadoopOutputFile.fromPath; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.data.orc.GenericOrcWriter; +import org.apache.iceberg.data.parquet.GenericParquetWriter; +import org.apache.iceberg.hadoop.HadoopCatalog; +import org.apache.iceberg.hadoop.HadoopInputFile; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.orc.ORC; +import org.apache.iceberg.parquet.Parquet; +import org.junit.Assert; +import org.junit.rules.ExternalResource; +import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TestDataWarehouse extends ExternalResource { + private static final Logger LOG = LoggerFactory.getLogger(TestDataWarehouse.class); + + protected final TemporaryFolder temporaryFolder; + protected final String database; + + protected final Configuration hadoopConf; + + protected String location; + protected Catalog catalog; + protected boolean someTableHasBeenCreated = false; + + public TestDataWarehouse(TemporaryFolder temporaryFolder, String database) { + this.temporaryFolder = temporaryFolder; + this.database = database; + this.hadoopConf = new Configuration(); + } + + @Override + protected void before() throws Throwable { + File warehouseFile = temporaryFolder.newFolder(); + Assert.assertTrue(warehouseFile.delete()); + location = "file:" + warehouseFile.toString(); + catalog = + CatalogUtil.loadCatalog( + CatalogUtil.ICEBERG_CATALOG_HADOOP, + "hadoop", + ImmutableMap.of(CatalogProperties.WAREHOUSE_LOCATION, location), + hadoopConf); + } + + @Override + protected void after() { + if (!someTableHasBeenCreated) { + return; + } + + List tables = catalog.listTables(Namespace.of(database)); + + LOG.info("Cleaning up {} tables in test warehouse", tables.size()); + for (TableIdentifier t : tables) { + try { + LOG.info("Removing table {}", t); + catalog.dropTable(t); + } catch (Exception e) { + LOG.error("Unable to remove table", e); + } + } + try { + ((HadoopCatalog) catalog).close(); + } catch (Exception e) { + LOG.error("Unable to close catalog", e); + } + } + + public DataFile writeRecords(String filename, Schema schema, List records) + throws IOException { + Path path = new Path(location, filename); + FileFormat format = FileFormat.fromFileName(filename); + + FileAppender appender; + switch (format) { + case PARQUET: + appender 
= + Parquet.write(fromPath(path, hadoopConf)) + .createWriterFunc(GenericParquetWriter::buildWriter) + .schema(schema) + .overwrite() + .build(); + break; + case ORC: + appender = + ORC.write(fromPath(path, hadoopConf)) + .createWriterFunc(GenericOrcWriter::buildWriter) + .schema(schema) + .overwrite() + .build(); + break; + default: + throw new IOException("Unable to create appender for " + format); + } + appender.addAll(records); + appender.close(); + return DataFiles.builder(PartitionSpec.unpartitioned()) + .withInputFile(HadoopInputFile.fromPath(path, hadoopConf)) + .withMetrics(appender.metrics()) + .build(); + } + + public Table createTable(TableIdentifier tableId, Schema schema) { + someTableHasBeenCreated = true; + return catalog.createTable(tableId, schema); + } +} diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/TestFixtures.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/TestFixtures.java new file mode 100644 index 000000000000..a39754fb7149 --- /dev/null +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/TestFixtures.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.io.iceberg; + +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; + +import java.util.ArrayList; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.types.Types; + +public class TestFixtures { + public static final Schema SCHEMA = + new Schema( + required(1, "id", Types.LongType.get()), optional(2, "data", Types.StringType.get())); + + private static final Record genericRecord = GenericRecord.create(SCHEMA); + + /* First file in test table */ + public static final ImmutableList FILE1SNAPSHOT1 = + ImmutableList.of( + genericRecord.copy(ImmutableMap.of("id", 0L, "data", "clarification")), + genericRecord.copy(ImmutableMap.of("id", 1L, "data", "risky")), + genericRecord.copy(ImmutableMap.of("id", 2L, "data", "falafel"))); + public static final ImmutableList FILE1SNAPSHOT2 = + ImmutableList.of( + genericRecord.copy(ImmutableMap.of("id", 3L, "data", "obscure")), + genericRecord.copy(ImmutableMap.of("id", 4L, "data", "secure")), + genericRecord.copy(ImmutableMap.of("id", 5L, "data", "feta"))); + public static final ImmutableList FILE1SNAPSHOT3 = + ImmutableList.of( + genericRecord.copy(ImmutableMap.of("id", 6L, "data", "brainy")), + genericRecord.copy(ImmutableMap.of("id", 7L, "data", "film")), + genericRecord.copy(ImmutableMap.of("id", 8L, "data", "feta"))); + + /* Second file in test table */ + public static final ImmutableList FILE2SNAPSHOT1 = + ImmutableList.of( + genericRecord.copy(ImmutableMap.of("id", 10L, "data", "clammy")), + genericRecord.copy(ImmutableMap.of("id", 11L, "data", "evacuate")), + genericRecord.copy(ImmutableMap.of("id", 12L, "data", "tissue"))); + public static final ImmutableList FILE2SNAPSHOT2 = + ImmutableList.of( + genericRecord.copy(ImmutableMap.of("id", 14L, "data", "radical")), + genericRecord.copy(ImmutableMap.of("id", 15L, "data", "collocation")), + genericRecord.copy(ImmutableMap.of("id", 16L, "data", "book"))); + public static final ImmutableList FILE2SNAPSHOT3 = + ImmutableList.of( + genericRecord.copy(ImmutableMap.of("id", 16L, "data", "cake")), + genericRecord.copy(ImmutableMap.of("id", 17L, "data", "intrinsic")), + genericRecord.copy(ImmutableMap.of("id", 18L, "data", "paper"))); + + /* Third file in test table */ + public static final ImmutableList FILE3SNAPSHOT1 = + ImmutableList.of( + genericRecord.copy(ImmutableMap.of("id", 20L, "data", "ocean")), + genericRecord.copy(ImmutableMap.of("id", 21L, "data", "holistic")), + genericRecord.copy(ImmutableMap.of("id", 22L, "data", "preventative"))); + public static final ImmutableList FILE3SNAPSHOT2 = + ImmutableList.of( + genericRecord.copy(ImmutableMap.of("id", 24L, "data", "cloud")), + genericRecord.copy(ImmutableMap.of("id", 25L, "data", "zen")), + genericRecord.copy(ImmutableMap.of("id", 26L, "data", "sky"))); + public static final ImmutableList FILE3SNAPSHOT3 = + ImmutableList.of( + genericRecord.copy(ImmutableMap.of("id", 26L, "data", "belleview")), + genericRecord.copy(ImmutableMap.of("id", 27L, "data", "overview")), + genericRecord.copy(ImmutableMap.of("id", 28L, "data", "tender"))); + + public static final ImmutableList asRows(Iterable records) { + ArrayList rows = new 
ArrayList<>(); + for (Record record : records) { + rows.add( + Row.withSchema(SchemaHelper.convert(SCHEMA)) + .withFieldValue("id", record.getField("id")) + .withFieldValue("data", record.getField("data")) + .build()); + } + return ImmutableList.copyOf(rows); + } +} diff --git a/settings.gradle.kts b/settings.gradle.kts index 1e52e425b215..cca547c9e04e 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -355,3 +355,5 @@ include("sdks:java:io:kafka:kafka-01103") findProject(":sdks:java:io:kafka:kafka-01103")?.name = "kafka-01103" include("sdks:java:managed") findProject(":sdks:java:managed")?.name = "managed" +include("sdks:java:io:iceberg") +findProject(":sdks:java:io:iceberg")?.name = "iceberg"
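Reviewer note: below is a minimal usage sketch assembled from testSimpleAppend above, showing how the new write path is expected to be wired into a pipeline. It is not part of the patch; the warehouse location and table name are placeholders, the target table is assumed to already exist, and the class sits in the connector's package only so the package-private SchemaHelper is reachable.

package org.apache.beam.io.iceberg;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.Row;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.types.Types;

public class IcebergWriteExample {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Placeholder identifiers; the table is assumed to already exist in the warehouse.
    TableIdentifier tableId = TableIdentifier.of("default", "demo_table");

    // Same Hadoop catalog configuration the tests use, pointed at a placeholder location.
    IcebergCatalogConfig catalog =
        IcebergCatalogConfig.builder()
            .setName("hadoop")
            .setIcebergCatalogType(CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
            .setWarehouseLocation("file:/tmp/iceberg-warehouse")
            .build();

    // Describe the table schema once in Iceberg terms and derive the Beam Row schema from it.
    org.apache.iceberg.Schema icebergSchema =
        new org.apache.iceberg.Schema(
            Types.NestedField.required(1, "id", Types.LongType.get()),
            Types.NestedField.optional(2, "data", Types.StringType.get()));
    org.apache.beam.sdk.schemas.Schema beamSchema = SchemaHelper.convert(icebergSchema);

    pipeline
        .apply(
            "Rows to append",
            Create.of(
                Row.withSchema(beamSchema).addValues(1L, "hello").build(),
                Row.withSchema(beamSchema).addValues(2L, "world").build()))
        .setRowSchema(beamSchema)
        .apply(
            "Append to Iceberg",
            IcebergIO.writeToDynamicDestinations(catalog, DynamicDestinations.singleTable(tableId)));

    pipeline.run().waitUntilFinish();
  }
}

As in the tests, the Beam Row schema is derived from the Iceberg schema through SchemaHelper.convert so the two stay in sync.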