Skip to content

Commit

Permalink
Merge pull request apache#30797: Initial Iceberg Sink
Browse files Browse the repository at this point in the history
  • Loading branch information
kennknowles authored Apr 9, 2024
2 parents 812e98f + a7a6515 commit 819e54c
Show file tree
Hide file tree
Showing 26 changed files with 2,916 additions and 0 deletions.
118 changes: 118 additions & 0 deletions .github/workflows/IO_Iceberg.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

name: IcebergIO Unit Tests

on:
push:
tags: ['v*']
branches: ['master', 'release-*']
paths:
- "sdks/java/io/iceberg/**"
- ".github/workflows/IO_Iceberg.yml"
pull_request_target:
branches: ['master', 'release-*']
paths:
- "sdks/java/io/iceberg/**"
- 'release/trigger_all_tests.json'
- '.github/trigger_files/IO_Iceberg.json'
issue_comment:
types: [created]
schedule:
- cron: '15 1/6 * * *'
workflow_dispatch:

#Setting explicit permissions for the action to avoid the default permissions which are `write-all` in case of pull_request_target event
permissions:
actions: write
pull-requests: write
checks: write
contents: read
deployments: read
id-token: none
issues: write
discussions: read
packages: read
pages: read
repository-projects: read
security-events: read
statuses: read

# This allows a subsequently queued workflow run to interrupt previous runs
concurrency:
group: '${{ github.workflow }} @ ${{ github.event.issue.number || github.event.pull_request.head.label || github.sha || github.head_ref || github.ref }}-${{ github.event.schedule || github.event.comment.id || github.event.sender.login }}'
cancel-in-progress: true

env:
GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
GRADLE_ENTERPRISE_CACHE_USERNAME: ${{ secrets.GE_CACHE_USERNAME }}
GRADLE_ENTERPRISE_CACHE_PASSWORD: ${{ secrets.GE_CACHE_PASSWORD }}

jobs:
IO_Iceberg:
name: ${{ matrix.job_name }} (${{ matrix.job_phrase }})
strategy:
matrix:
job_name: ["IO_Iceberg"]
timeout-minutes: 60
if: |
github.event_name == 'push' ||
github.event_name == 'pull_request_target' ||
(github.event_name == 'schedule' && github.repository == 'apache/beam') ||
github.event_name == 'workflow_dispatch' ||
github.event.comment.body == 'Run Java_Amqp_IO_Direct PreCommit'
runs-on: [self-hosted, ubuntu-20.04, main]
steps:
- uses: actions/checkout@v4
- name: Setup repository
uses: ./.github/actions/setup-action
with:
comment_phrase: ${{ matrix.job_phrase }}
github_token: ${{ secrets.GITHUB_TOKEN }}
github_job: ${{ matrix.job_name }} (${{ matrix.job_phrase }})
- name: Setup environment
uses: ./.github/actions/setup-environment-action
- name: run Amqp IO build script
uses: ./.github/actions/gradle-command-self-hosted-action
with:
gradle-command: :sdks:java:io:iceberg:build
arguments: |
-PdisableSpotlessCheck=true \
-PdisableCheckStyle=true \
- name: Archive JUnit Test Results
uses: actions/upload-artifact@v4
if: ${{ !success() }}
with:
name: JUnit Test Results
path: "**/build/reports/tests/"
- name: Publish JUnit Test Results
uses: EnricoMi/publish-unit-test-result-action@v2
if: always()
with:
commit: '${{ env.prsha || env.GITHUB_SHA }}'
comment_mode: ${{ github.event_name == 'issue_comment' && 'always' || 'off' }}
files: '**/build/test-results/**/*.xml'
- name: Archive SpotBugs Results
uses: actions/upload-artifact@v4
if: always()
with:
name: SpotBugs Results
path: '**/build/reports/spotbugs/*.html'
- name: Publish SpotBugs Results
uses: jwgmeligmeyling/[email protected]
if: always()
with:
name: Publish SpotBugs
path: '**/build/reports/spotbugs/*.html'
98 changes: 98 additions & 0 deletions sdks/java/io/iceberg/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import java.util.stream.Collectors

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

plugins { id 'org.apache.beam.module' }
applyJavaNature(
automaticModuleName: 'org.apache.beam.sdk.io.iceberg',
)

description = "Apache Beam :: SDKs :: Java :: IO :: Iceberg"
ext.summary = "Integration with Iceberg data warehouses."

def hadoopVersions = [
"285": "2.8.5",
"292": "2.9.2",
"2102": "2.10.2",
"324": "3.2.4",
]

hadoopVersions.each {kv -> configurations.create("hadoopVersion$kv.key")}

def iceberg_version = "1.4.2"
def parquet_version = "1.12.0"
def orc_version = "1.9.2"
def hive_version = "3.1.3"

dependencies {
implementation library.java.vendored_guava_32_1_2_jre
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation project(":sdks:java:io:hadoop-common")
implementation library.java.slf4j_api
implementation "org.apache.parquet:parquet-column:$parquet_version"
implementation "org.apache.parquet:parquet-common:$parquet_version"
implementation "org.apache.orc:orc-core:$orc_version"
implementation "org.apache.iceberg:iceberg-core:$iceberg_version"
implementation "org.apache.iceberg:iceberg-api:$iceberg_version"
implementation "org.apache.iceberg:iceberg-parquet:$iceberg_version"
implementation "org.apache.iceberg:iceberg-orc:$iceberg_version"
implementation "org.apache.iceberg:iceberg-arrow:$iceberg_version"
implementation "org.apache.iceberg:iceberg-data:$iceberg_version"



provided library.java.avro
provided library.java.hadoop_client
permitUnusedDeclared library.java.hadoop_client
provided library.java.hadoop_common
testImplementation library.java.hadoop_client

testImplementation "org.apache.iceberg:iceberg-gcp:$iceberg_version"
testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
testImplementation library.java.junit
testRuntimeOnly library.java.slf4j_jdk14
testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
hadoopVersions.each {kv ->
"hadoopVersion$kv.key" "org.apache.hadoop:hadoop-client:$kv.value"
}
}

hadoopVersions.each {kv ->
configurations."hadoopVersion$kv.key" {
resolutionStrategy {
force "org.apache.hadoop:hadoop-client:$kv.value"
}
}
}

task hadoopVersionsTest(group: "Verification") {
description = "Runs Iceberg tests with different Hadoop versions"
def taskNames = hadoopVersions.keySet().stream()
.map{num -> "hadoopVersion${num}Test"}
.collect(Collectors.toList())
dependsOn taskNames
}

hadoopVersions.each { kv ->
task "hadoopVersion${kv.key}Test"(type: Test, group: "Verification") {
description = "Runs Iceberg tests with Hadoop version $kv.value"
classpath = configurations."hadoopVersion$kv.key" + sourceSets.test.runtimeClasspath
include '**/*Test.class'
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.io.iceberg;

import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;

class AppendFilesToTables
extends PTransform<PCollection<FileWriteResult>, PCollection<KV<String, Snapshot>>> {

private final IcebergCatalogConfig catalogConfig;

AppendFilesToTables(IcebergCatalogConfig catalogConfig) {
this.catalogConfig = catalogConfig;
}

@Override
public PCollection<KV<String, Snapshot>> expand(PCollection<FileWriteResult> writtenFiles) {

// Apply any sharded writes and flatten everything for catalog updates
return writtenFiles
.apply(
"Key metadata updates by table",
WithKeys.of(
new SerializableFunction<FileWriteResult, String>() {
@Override
public String apply(FileWriteResult input) {
return input.getTableIdentifier().toString();
}
}))
.apply("Group metadata updates by table", GroupByKey.create())
.apply(
"Append metadata updates to tables",
ParDo.of(new AppendFilesToTablesDoFn(catalogConfig)))
.setCoder(KvCoder.of(StringUtf8Coder.of(), SerializableCoder.of(Snapshot.class)));
}

private static class AppendFilesToTablesDoFn
extends DoFn<KV<String, Iterable<FileWriteResult>>, KV<String, Snapshot>> {

private final IcebergCatalogConfig catalogConfig;

private transient @MonotonicNonNull Catalog catalog;

private AppendFilesToTablesDoFn(IcebergCatalogConfig catalogConfig) {
this.catalogConfig = catalogConfig;
}

private Catalog getCatalog() {
if (catalog == null) {
catalog = catalogConfig.catalog();
}
return catalog;
}

@ProcessElement
public void processElement(
@Element KV<String, Iterable<FileWriteResult>> element,
OutputReceiver<KV<String, Snapshot>> out,
BoundedWindow window) {
Table table = getCatalog().loadTable(TableIdentifier.parse(element.getKey()));
AppendFiles update = table.newAppend();
for (FileWriteResult writtenFile : element.getValue()) {
update.appendFile(writtenFile.getDataFile());
}
update.commit();
out.outputWithTimestamp(
KV.of(element.getKey(), table.currentSnapshot()), window.maxTimestamp());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.io.iceberg;

import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

/**
* Assigns the destination metadata for each input record.
*
* <p>The output record will have the format { dest: ..., data: ...} where the dest field has the
* assigned metadata and the data field has the original row.
*/
class AssignDestinations extends PTransform<PCollection<Row>, PCollection<Row>> {

private DynamicDestinations dynamicDestinations;

public AssignDestinations(DynamicDestinations dynamicDestinations) {
this.dynamicDestinations = dynamicDestinations;
}

@Override
public PCollection<Row> expand(PCollection<Row> input) {

final Schema inputSchema = input.getSchema();
final Schema outputSchema =
Schema.builder()
.addRowField("data", inputSchema)
.addRowField("dest", dynamicDestinations.getMetadataSchema())
.build();

return input
.apply(
ParDo.of(
new DoFn<Row, Row>() {
@ProcessElement
public void processElement(@Element Row data, OutputReceiver<Row> out) {
out.output(
Row.withSchema(outputSchema)
.addValues(data, dynamicDestinations.assignDestinationMetadata(data))
.build());
}
}))
.setRowSchema(outputSchema);
}
}
Loading

0 comments on commit 819e54c

Please sign in to comment.