From 6ccc10b121e29fcdb6127521b2c69a784ddc85d8 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Fri, 24 May 2024 09:33:44 -0400 Subject: [PATCH 01/10] add iceberg load test, put integration test in its own suite --- .../IO_Iceberg_Integration_Tests.yml | 81 +++++++ .../IO_Iceberg_Performance_Tests.yml | 81 +++++++ .github/workflows/IO_Iceberg_Unit_Tests.yml | 2 +- sdks/java/io/iceberg/build.gradle | 24 +- .../beam/sdk/io/iceberg/IcebergIOLT.java | 210 ++++++++++++++++++ 5 files changed, 396 insertions(+), 2 deletions(-) create mode 100644 .github/workflows/IO_Iceberg_Integration_Tests.yml create mode 100644 .github/workflows/IO_Iceberg_Performance_Tests.yml create mode 100644 sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOLT.java diff --git a/.github/workflows/IO_Iceberg_Integration_Tests.yml b/.github/workflows/IO_Iceberg_Integration_Tests.yml new file mode 100644 index 000000000000..58f9ee9adb9e --- /dev/null +++ b/.github/workflows/IO_Iceberg_Integration_Tests.yml @@ -0,0 +1,81 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +name: IcebergIO Integration Tests + +on: + schedule: + - cron: '15 4/6 * * *' + workflow_dispatch: + +#Setting explicit permissions for the action to avoid the default permissions which are `write-all` in case of pull_request_target event +permissions: + actions: write + pull-requests: write + checks: write + contents: read + deployments: read + id-token: none + issues: write + discussions: read + packages: read + pages: read + repository-projects: read + security-events: read + statuses: read + +# This allows a subsequently queued workflow run to interrupt previous runs +concurrency: + group: '${{ github.workflow }} @ ${{ github.event.issue.number || github.sha || github.head_ref || github.ref }}-${{ github.event.schedule || github.event.comment.id || github.event.sender.login }}' + cancel-in-progress: true + +env: + GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }} + GRADLE_ENTERPRISE_CACHE_USERNAME: ${{ secrets.GE_CACHE_USERNAME }} + GRADLE_ENTERPRISE_CACHE_PASSWORD: ${{ secrets.GE_CACHE_PASSWORD }} + +jobs: + IO_Iceberg_Integration_Tests: + if: | + github.event_name == 'workflow_dispatch' || + (github.event_name == 'schedule' && github.repository == 'apache/beam') || + github.event.comment.body == 'Run IcebergIO Integration Test' + runs-on: [self-hosted, ubuntu-20.04, main] + timeout-minutes: 120 + name: ${{ matrix.job_name }} (${{ matrix.job_phrase }}) + strategy: + matrix: + job_name: ["IO_Iceberg_Integration_Tests"] + job_phrase: ["Run IcebergIO Integration Test"] + steps: + - uses: actions/checkout@v4 + - name: Setup repository + uses: ./.github/actions/setup-action + with: + comment_phrase: ${{ matrix.job_phrase }} + github_token: ${{ secrets.GITHUB_TOKEN }} + github_job: ${{ matrix.job_name }} (${{ matrix.job_phrase }}) + - name: Setup environment + uses: ./.github/actions/setup-environment-action + - name: Authenticate on GCP + id: auth + uses: google-github-actions/auth@v1 + with: + credentials_json: ${{ secrets.GCP_SA_KEY }} + project_id: ${{ secrets.GCP_PROJECT_ID }} + - name: Run IcebergIO Integration Test + uses: ./.github/actions/gradle-command-self-hosted-action + with: + gradle-command: :sdks:java:io:iceberg:integrationTest \ No newline at end of file diff --git a/.github/workflows/IO_Iceberg_Performance_Tests.yml b/.github/workflows/IO_Iceberg_Performance_Tests.yml new file mode 100644 index 000000000000..27d93b8a40d4 --- /dev/null +++ b/.github/workflows/IO_Iceberg_Performance_Tests.yml @@ -0,0 +1,81 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +name: IcebergIO Performance Tests + +on: + schedule: + - cron: '10 10/12 * * *' + workflow_dispatch: + +#Setting explicit permissions for the action to avoid the default permissions which are `write-all` in case of pull_request_target event +permissions: + actions: write + pull-requests: write + checks: write + contents: read + deployments: read + id-token: none + issues: write + discussions: read + packages: read + pages: read + repository-projects: read + security-events: read + statuses: read + +# This allows a subsequently queued workflow run to interrupt previous runs +concurrency: + group: '${{ github.workflow }} @ ${{ github.event.issue.number || github.sha || github.head_ref || github.ref }}-${{ github.event.schedule || github.event.comment.id || github.event.sender.login }}' + cancel-in-progress: true + +env: + GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }} + GRADLE_ENTERPRISE_CACHE_USERNAME: ${{ secrets.GE_CACHE_USERNAME }} + GRADLE_ENTERPRISE_CACHE_PASSWORD: ${{ secrets.GE_CACHE_PASSWORD }} + +jobs: + IO_Iceberg_Performance_Tests: + if: | + github.event_name == 'workflow_dispatch' || + (github.event_name == 'schedule' && github.repository == 'apache/beam') || + github.event.comment.body == 'Run IcebergIO Performance Test' + runs-on: [self-hosted, ubuntu-20.04, main] + timeout-minutes: 120 + name: ${{ matrix.job_name }} (${{ matrix.job_phrase }}) + strategy: + matrix: + job_name: ["IO_Iceberg_Performance_Tests"] + job_phrase: ["Run IcebergIO Performance Test"] + steps: + - uses: actions/checkout@v4 + - name: Setup repository + uses: ./.github/actions/setup-action + with: + comment_phrase: ${{ matrix.job_phrase }} + github_token: ${{ secrets.GITHUB_TOKEN }} + github_job: ${{ matrix.job_name }} (${{ matrix.job_phrase }}) + - name: Setup environment + uses: ./.github/actions/setup-environment-action + - name: Authenticate on GCP + id: auth + uses: google-github-actions/auth@v1 + with: + credentials_json: ${{ secrets.GCP_SA_KEY }} + project_id: ${{ secrets.GCP_PROJECT_ID }} + - name: Run IcebergIO Performance Test + uses: ./.github/actions/gradle-command-self-hosted-action + with: + gradle-command: :sdks:java:io:iceberg:loadTest \ No newline at end of file diff --git a/.github/workflows/IO_Iceberg_Unit_Tests.yml b/.github/workflows/IO_Iceberg_Unit_Tests.yml index 33b703ee59b9..c6c556322dfd 100644 --- a/.github/workflows/IO_Iceberg_Unit_Tests.yml +++ b/.github/workflows/IO_Iceberg_Unit_Tests.yml @@ -94,7 +94,7 @@ jobs: - name: run IcebergIO build script uses: ./.github/actions/gradle-command-self-hosted-action with: - gradle-command: :sdks:java:io:iceberg:build :sdks:java:io:iceberg:integrationTest + gradle-command: :sdks:java:io:iceberg:build arguments: | -PdisableSpotlessCheck=true \ -PdisableCheckStyle=true \ diff --git a/sdks/java/io/iceberg/build.gradle b/sdks/java/io/iceberg/build.gradle index 6f1c885a7cc2..765fa4948752 100644 --- a/sdks/java/io/iceberg/build.gradle +++ b/sdks/java/io/iceberg/build.gradle @@ -66,6 +66,7 @@ dependencies { testImplementation library.java.junit testRuntimeOnly library.java.slf4j_jdk14 testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow") + testRuntimeOnly project(path: ":runners:google-cloud-dataflow-java") hadoopVersions.each {kv -> "hadoopVersion$kv.key" "org.apache.hadoop:hadoop-client:$kv.value" } @@ -95,7 +96,7 @@ hadoopVersions.each { kv -> } } -task integrationTest(type: Test, dependsOn: processTestResources) { +task integrationTest(type: Test) { group = "Verification" def gcpProject = project.findProperty('gcpProject') ?: 'apache-beam-testing' def gcpTempLocation = project.findProperty('gcpTempLocation') ?: 'gs://temp-storage-for-end-to-end-tests' @@ -113,3 +114,24 @@ task integrationTest(type: Test, dependsOn: processTestResources) { classpath = sourceSets.test.runtimeClasspath testClassesDirs = sourceSets.test.output.classesDirs } + +task loadTest(type: Test) { + def gcpProject = project.findProperty('gcpProject') ?: 'apache-beam-testing' + def gcpTempLocation = project.findProperty('gcpTempLocation') ?: 'gs://temp-storage-for-end-to-end-tests/temp-lt' + systemProperty "beamTestPipelineOptions", JsonOutput.toJson([ + "--project=${gcpProject}", + "--tempLocation=${gcpTempLocation}", + "--testSize=large", + "--runner=DataflowRunner", + "--region=us-central1" + ]) + + // Disable Gradle cache: these ITs interact with live service that should always be considered "out of date" + outputs.upToDateWhen { false } + + include '**/*LT.class' + + maxParallelForks 4 + classpath = sourceSets.test.runtimeClasspath + testClassesDirs = sourceSets.test.output.classesDirs +} diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOLT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOLT.java new file mode 100644 index 000000000000..2d8e5becd323 --- /dev/null +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOLT.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.iceberg; + +import com.google.auto.value.AutoValue; +import java.io.Serializable; +import java.util.Map; +import java.util.UUID; +import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; +import org.apache.beam.sdk.io.GenerateSequence; +import org.apache.beam.sdk.managed.Managed; +import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.*; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TypeDescriptors; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.Schema; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.hadoop.HadoopCatalog; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class IcebergIOLT implements Serializable { + private static final Map TEST_CONFIGS = + // num rows --- num fields --- byte size per field + ImmutableMap.of( + "local", // 1K rows, >50 KB + TestConfiguration.of(1_000L, 1, 50), + "small", // 100K rows, >50 MB + TestConfiguration.of(100_000L, 5, 100), + "medium", // 10M rows, >10 GB + TestConfiguration.of(10_000_000L, 10, 100), + "large", // 1B rows, >2 TB + TestConfiguration.of(1_000_000_000L, 20, 100)); + + /** Options for Iceberg IO load test. */ + @AutoValue + abstract static class TestConfiguration { + /** Number of rows to generate */ + abstract Long numRows(); + + /** Data shape: The number of fields per row. */ + abstract Integer numFields(); + + /** Data shape: The byte-size for each field. */ + abstract Integer byteSizePerField(); + + static TestConfiguration of(Long numRows, Integer numFields, Integer byteSizePerField) { + return new AutoValue_IcebergIOLT_TestConfiguration(numRows, numFields, byteSizePerField); + } + } + + public interface IcebergLoadTestPipelineOptions extends GcpOptions { + @Description( + "Size of the data written/read. Possible values are: ['local', 'small', 'medium', 'large'].") + @Default.String("local") + String getTestSize(); + + void setTestSize(String testSize); + } + + @Rule public TestPipeline writePipeline = TestPipeline.create(); + + @Rule public TestPipeline readPipeline = TestPipeline.create(); + + static IcebergLoadTestPipelineOptions options; + + @Rule public TestName testName = new TestName(); + + private String warehouseLocation; + + private TableIdentifier tableId; + + private static TestConfiguration testConfiguration; + + static org.apache.beam.sdk.schemas.Schema beamSchema; + static Schema icebergSchema; + + @BeforeClass + public static void beforeClass() { + PipelineOptionsFactory.register(IcebergLoadTestPipelineOptions.class); + options = TestPipeline.testingPipelineOptions().as(IcebergLoadTestPipelineOptions.class); + + testConfiguration = TEST_CONFIGS.get(options.getTestSize()); + + org.apache.beam.sdk.schemas.Schema.Builder builder = + org.apache.beam.sdk.schemas.Schema.builder(); + for (int i = 0; i < testConfiguration.numFields(); i++) { + builder = builder.addByteArrayField("bytes_" + i); + } + beamSchema = builder.build(); + icebergSchema = SchemaAndRowConversions.beamSchemaToIcebergSchema(beamSchema); + } + + @Before + public void setUp() { + warehouseLocation = + String.format( + "%s/IcebergIOLT/%s/%s", + options.getTempLocation(), testName.getMethodName(), UUID.randomUUID()); + + tableId = + TableIdentifier.of( + testName.getMethodName(), "table" + Long.toString(UUID.randomUUID().hashCode(), 16)); + } + + @Test + public void testReadWriteHadoopType() { + Configuration catalogHadoopConf = new Configuration(); + catalogHadoopConf.set("fs.gs.project.id", options.getProject()); + catalogHadoopConf.set("fs.gs.auth.type", "SERVICE_ACCOUNT_JSON_KEYFILE"); + catalogHadoopConf.set( + "fs.gs.auth.service.account.json.keyfile", System.getenv("GOOGLE_APPLICATION_CREDENTIALS")); + + Catalog catalog = new HadoopCatalog(catalogHadoopConf, warehouseLocation); + catalog.createTable(tableId, icebergSchema); + + Map config = + ImmutableMap.builder() + .put("table", tableId.toString()) + .put( + "catalog_config", + ImmutableMap.builder() + .put("catalog_name", "hadoop") + .put("catalog_type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP) + .put("warehouse_location", warehouseLocation) + .build()) + .build(); + + PCollection source = + writePipeline.apply(GenerateSequence.from(0).to(testConfiguration.numRows())); + + PCollection inputRows = + source + .apply( + "Create Rows", + MapElements.into(TypeDescriptors.rows()) + .via( + new GenerateRow( + testConfiguration.numFields(), + testConfiguration.byteSizePerField(), + beamSchema))) + .setRowSchema(beamSchema); + + PCollectionRowTuple.of("input", inputRows) + .apply(Managed.write(Managed.ICEBERG).withConfig(config)); + writePipeline.run().waitUntilFinish(); + + // read pipeline + PCollection countRows = + PCollectionRowTuple.empty(readPipeline) + .apply(Managed.read(Managed.ICEBERG).withConfig(config)) + .get("output") + .apply(Count.globally()); + + PAssert.thatSingleton(countRows).isEqualTo(testConfiguration.numRows()); + readPipeline.run().waitUntilFinish(); + } + + private static class GenerateRow implements SerializableFunction { + final int numFields; + final int byteSizePerField; + final org.apache.beam.sdk.schemas.Schema schema; + + GenerateRow(int numFields, int byteSizePerField, org.apache.beam.sdk.schemas.Schema schema) { + this.numFields = numFields; + this.byteSizePerField = byteSizePerField; + this.schema = schema; + } + + @Override + public Row apply(Long input) { + Row.Builder rowBuilder = Row.withSchema(schema); + for (int i = 0; i < numFields; i++) { + rowBuilder = rowBuilder.addValue(new byte[byteSizePerField]); + } + return rowBuilder.build(); + } + } +} From 6a21980b657d457c19789c65283b573f37e4bae6 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Fri, 24 May 2024 09:41:11 -0400 Subject: [PATCH 02/10] spotless --- .../java/org/apache/beam/sdk/io/iceberg/IcebergIOLT.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOLT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOLT.java index 2d8e5becd323..08c7627d198a 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOLT.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOLT.java @@ -29,7 +29,9 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.*; +import org.apache.beam.sdk.transforms.Count; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionRowTuple; import org.apache.beam.sdk.values.Row; @@ -66,7 +68,7 @@ public class IcebergIOLT implements Serializable { /** Options for Iceberg IO load test. */ @AutoValue abstract static class TestConfiguration { - /** Number of rows to generate */ + /** Number of rows to generate. */ abstract Long numRows(); /** Data shape: The number of fields per row. */ From ba1f07479144832b6ad9b2caa6eadb03c69e56d3 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Fri, 24 May 2024 09:59:11 -0400 Subject: [PATCH 03/10] small docstring --- .../test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java | 3 +++ .../test/java/org/apache/beam/sdk/io/iceberg/IcebergIOLT.java | 3 +++ 2 files changed, 6 insertions(+) diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java index c51ec5b8285e..e5607fec321d 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java @@ -70,6 +70,9 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +/** + * Integration tests for {@link IcebergIO} source and sink. + */ @RunWith(JUnit4.class) public class IcebergIOIT implements Serializable { diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOLT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOLT.java index 08c7627d198a..e01ebff567a3 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOLT.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOLT.java @@ -51,6 +51,9 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +/** + * Load tests for {@link IcebergIO} source and sink. + */ @RunWith(JUnit4.class) public class IcebergIOLT implements Serializable { private static final Map TEST_CONFIGS = From 46c14b0123f65f2eee464a580de86267f293f21e Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Fri, 24 May 2024 16:37:06 -0400 Subject: [PATCH 04/10] ignore --- .github/workflows/IO_Iceberg_Integration_Tests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/IO_Iceberg_Integration_Tests.yml b/.github/workflows/IO_Iceberg_Integration_Tests.yml index 58f9ee9adb9e..ab055c3a6094 100644 --- a/.github/workflows/IO_Iceberg_Integration_Tests.yml +++ b/.github/workflows/IO_Iceberg_Integration_Tests.yml @@ -20,7 +20,7 @@ on: - cron: '15 4/6 * * *' workflow_dispatch: -#Setting explicit permissions for the action to avoid the default permissions which are `write-all` in case of pull_request_target event +# Setting explicit permissions for the action to avoid the default permissions which are `write-all` in case of pull_request_target event permissions: actions: write pull-requests: write From b1208ece23c531fd44cd671caedb9ff3f8b895d6 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Fri, 24 May 2024 17:04:00 -0400 Subject: [PATCH 05/10] spotless --- .../test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java | 4 +--- .../test/java/org/apache/beam/sdk/io/iceberg/IcebergIOLT.java | 4 +--- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java index e5607fec321d..539bfbaacc8a 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java @@ -70,9 +70,7 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -/** - * Integration tests for {@link IcebergIO} source and sink. - */ +/** Integration tests for {@link IcebergIO} source and sink. */ @RunWith(JUnit4.class) public class IcebergIOIT implements Serializable { diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOLT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOLT.java index e01ebff567a3..4d0aafb6498e 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOLT.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOLT.java @@ -51,9 +51,7 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -/** - * Load tests for {@link IcebergIO} source and sink. - */ +/** Load tests for {@link IcebergIO} source and sink. */ @RunWith(JUnit4.class) public class IcebergIOLT implements Serializable { private static final Map TEST_CONFIGS = From 9b72f9e3bf9ad0d9328652889b7397956dc6b6fe Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Tue, 28 May 2024 16:07:29 -0400 Subject: [PATCH 06/10] trigger tests --- .github/trigger_files/IO_Iceberg_Integration_Tests.json | 4 ++++ .github/trigger_files/IO_Iceberg_Performance_Tests.json | 4 ++++ .github/trigger_files/IO_Iceberg_Unit_Tests.json | 2 +- .github/workflows/IO_Iceberg_Integration_Tests.yml | 2 ++ .github/workflows/IO_Iceberg_Performance_Tests.yml | 2 ++ 5 files changed, 13 insertions(+), 1 deletion(-) create mode 100644 .github/trigger_files/IO_Iceberg_Integration_Tests.json create mode 100644 .github/trigger_files/IO_Iceberg_Performance_Tests.json diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json new file mode 100644 index 000000000000..920c8d132e4a --- /dev/null +++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json @@ -0,0 +1,4 @@ +{ + "comment": "Modify this file in a trivial way to cause this test suite to run", + "modification": 1 +} \ No newline at end of file diff --git a/.github/trigger_files/IO_Iceberg_Performance_Tests.json b/.github/trigger_files/IO_Iceberg_Performance_Tests.json new file mode 100644 index 000000000000..920c8d132e4a --- /dev/null +++ b/.github/trigger_files/IO_Iceberg_Performance_Tests.json @@ -0,0 +1,4 @@ +{ + "comment": "Modify this file in a trivial way to cause this test suite to run", + "modification": 1 +} \ No newline at end of file diff --git a/.github/trigger_files/IO_Iceberg_Unit_Tests.json b/.github/trigger_files/IO_Iceberg_Unit_Tests.json index b26833333238..e3d6056a5de9 100644 --- a/.github/trigger_files/IO_Iceberg_Unit_Tests.json +++ b/.github/trigger_files/IO_Iceberg_Unit_Tests.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 2 + "modification": 1 } diff --git a/.github/workflows/IO_Iceberg_Integration_Tests.yml b/.github/workflows/IO_Iceberg_Integration_Tests.yml index ab055c3a6094..006cd9d13bee 100644 --- a/.github/workflows/IO_Iceberg_Integration_Tests.yml +++ b/.github/workflows/IO_Iceberg_Integration_Tests.yml @@ -18,6 +18,8 @@ name: IcebergIO Integration Tests on: schedule: - cron: '15 4/6 * * *' + pull_request_target: + paths: [ 'release/trigger_all_tests.json', '.github/trigger_files/IO_Iceberg_Integration_Tests.json' ] workflow_dispatch: # Setting explicit permissions for the action to avoid the default permissions which are `write-all` in case of pull_request_target event diff --git a/.github/workflows/IO_Iceberg_Performance_Tests.yml b/.github/workflows/IO_Iceberg_Performance_Tests.yml index 27d93b8a40d4..e9920a5a1386 100644 --- a/.github/workflows/IO_Iceberg_Performance_Tests.yml +++ b/.github/workflows/IO_Iceberg_Performance_Tests.yml @@ -18,6 +18,8 @@ name: IcebergIO Performance Tests on: schedule: - cron: '10 10/12 * * *' + pull_request_target: + paths: [ '.github/trigger_files/IO_Iceberg_Performance_Tests.json' ] workflow_dispatch: #Setting explicit permissions for the action to avoid the default permissions which are `write-all` in case of pull_request_target event From 0d729d71e373f56c09aaaaa9f488ea567438f2eb Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Tue, 28 May 2024 16:28:54 -0400 Subject: [PATCH 07/10] trigger --- .github/trigger_files/IO_Iceberg_Integration_Tests.json | 2 +- .github/trigger_files/IO_Iceberg_Performance_Tests.json | 2 +- .github/workflows/IO_Iceberg_Performance_Tests.yml | 1 + 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json index 920c8d132e4a..8784d0786c02 100644 --- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json +++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 1 + "modification": 2 } \ No newline at end of file diff --git a/.github/trigger_files/IO_Iceberg_Performance_Tests.json b/.github/trigger_files/IO_Iceberg_Performance_Tests.json index 920c8d132e4a..8784d0786c02 100644 --- a/.github/trigger_files/IO_Iceberg_Performance_Tests.json +++ b/.github/trigger_files/IO_Iceberg_Performance_Tests.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 1 + "modification": 2 } \ No newline at end of file diff --git a/.github/workflows/IO_Iceberg_Performance_Tests.yml b/.github/workflows/IO_Iceberg_Performance_Tests.yml index e9920a5a1386..20b2f3d40e31 100644 --- a/.github/workflows/IO_Iceberg_Performance_Tests.yml +++ b/.github/workflows/IO_Iceberg_Performance_Tests.yml @@ -51,6 +51,7 @@ env: jobs: IO_Iceberg_Performance_Tests: if: | + github.event_name == 'pull_request_target' || github.event_name == 'workflow_dispatch' || (github.event_name == 'schedule' && github.repository == 'apache/beam') || github.event.comment.body == 'Run IcebergIO Performance Test' From 1c5b4f00c3e3dfaa6f27bb35dad98aff2185bec5 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Tue, 28 May 2024 16:30:33 -0400 Subject: [PATCH 08/10] trigger on PR target --- .github/trigger_files/IO_Iceberg_Integration_Tests.json | 2 +- .github/trigger_files/IO_Iceberg_Performance_Tests.json | 2 +- .github/workflows/IO_Iceberg_Integration_Tests.yml | 1 + 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json index 8784d0786c02..920c8d132e4a 100644 --- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json +++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 2 + "modification": 1 } \ No newline at end of file diff --git a/.github/trigger_files/IO_Iceberg_Performance_Tests.json b/.github/trigger_files/IO_Iceberg_Performance_Tests.json index 8784d0786c02..920c8d132e4a 100644 --- a/.github/trigger_files/IO_Iceberg_Performance_Tests.json +++ b/.github/trigger_files/IO_Iceberg_Performance_Tests.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 2 + "modification": 1 } \ No newline at end of file diff --git a/.github/workflows/IO_Iceberg_Integration_Tests.yml b/.github/workflows/IO_Iceberg_Integration_Tests.yml index 006cd9d13bee..8b2ddfae72ec 100644 --- a/.github/workflows/IO_Iceberg_Integration_Tests.yml +++ b/.github/workflows/IO_Iceberg_Integration_Tests.yml @@ -51,6 +51,7 @@ env: jobs: IO_Iceberg_Integration_Tests: if: | + github.event_name == 'pull_request_target' || github.event_name == 'workflow_dispatch' || (github.event_name == 'schedule' && github.repository == 'apache/beam') || github.event.comment.body == 'Run IcebergIO Integration Test' From 1480ea552d95485a7955cc1b06f8a5cbbcd196a1 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Mon, 17 Jun 2024 20:45:10 -0400 Subject: [PATCH 09/10] addressing comments --- .../beam/sdk/io/iceberg/IcebergIOLT.java | 16 ++++++---- sdks/python/temp.py | 29 +++++++++++++++++++ 2 files changed, 39 insertions(+), 6 deletions(-) create mode 100644 sdks/python/temp.py diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOLT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOLT.java index 4d0aafb6498e..a11d77b866c1 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOLT.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOLT.java @@ -17,10 +17,14 @@ */ package org.apache.beam.sdk.io.iceberg; +import static org.junit.Assert.assertEquals; + import com.google.auto.value.AutoValue; import java.io.Serializable; import java.util.Map; import java.util.UUID; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.managed.Managed; @@ -33,7 +37,6 @@ import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionRowTuple; import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TypeDescriptors; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; @@ -175,13 +178,12 @@ public void testReadWriteHadoopType() { beamSchema))) .setRowSchema(beamSchema); - PCollectionRowTuple.of("input", inputRows) - .apply(Managed.write(Managed.ICEBERG).withConfig(config)); - writePipeline.run().waitUntilFinish(); + inputRows.apply(Managed.write(Managed.ICEBERG).withConfig(config)); + assertEquals(PipelineResult.State.DONE, writePipeline.run().waitUntilFinish()); // read pipeline PCollection countRows = - PCollectionRowTuple.empty(readPipeline) + readPipeline .apply(Managed.read(Managed.ICEBERG).withConfig(config)) .get("output") .apply(Count.globally()); @@ -205,7 +207,9 @@ private static class GenerateRow implements SerializableFunction { public Row apply(Long input) { Row.Builder rowBuilder = Row.withSchema(schema); for (int i = 0; i < numFields; i++) { - rowBuilder = rowBuilder.addValue(new byte[byteSizePerField]); + byte[] value = new byte[byteSizePerField]; + ThreadLocalRandom.current().nextBytes(value); + rowBuilder = rowBuilder.addValue(value); } return rowBuilder.build(); } diff --git a/sdks/python/temp.py b/sdks/python/temp.py new file mode 100644 index 000000000000..5adb5f4ab47d --- /dev/null +++ b/sdks/python/temp.py @@ -0,0 +1,29 @@ +""" +**Note:** input and output element types need to be PCollections of Beam :py:class:`apache_beam.pvalue.Row`. +""" + +import datetime + +import pytz + +import apache_beam as beam +from apache_beam.utils.timestamp import Timestamp + + +users_schema = { + "fields": [ + {"type": "INTEGER", "name": "id", "mode": "NULLABLE"}, + {"type": "TIMESTAMP", "name": "date", "mode": "NULLABLE"}, + ] +} + +with beam.Pipeline() as p: + p | beam.Create([{"id": 1, "date": Timestamp.from_utc_datetime(datetime.datetime(2000, 9, 15, tzinfo=pytz.UTC))}, + {"id": 2, "date": Timestamp.from_utc_datetime(datetime.datetime(100, 1, 1, tzinfo=pytz.UTC))}] + ) | beam.io.WriteToBigQuery( + create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, + write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND, + schema=users_schema, + table="google.com:clouddfe:ahmedabualsaud_test.jdbc-repro2", + method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API, + ) From 50b2488c6d8a9af39bec5705843d7484fc0787b8 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Mon, 17 Jun 2024 21:18:59 -0400 Subject: [PATCH 10/10] cleanup --- sdks/python/temp.py | 29 ----------------------------- 1 file changed, 29 deletions(-) delete mode 100644 sdks/python/temp.py diff --git a/sdks/python/temp.py b/sdks/python/temp.py deleted file mode 100644 index 5adb5f4ab47d..000000000000 --- a/sdks/python/temp.py +++ /dev/null @@ -1,29 +0,0 @@ -""" -**Note:** input and output element types need to be PCollections of Beam :py:class:`apache_beam.pvalue.Row`. -""" - -import datetime - -import pytz - -import apache_beam as beam -from apache_beam.utils.timestamp import Timestamp - - -users_schema = { - "fields": [ - {"type": "INTEGER", "name": "id", "mode": "NULLABLE"}, - {"type": "TIMESTAMP", "name": "date", "mode": "NULLABLE"}, - ] -} - -with beam.Pipeline() as p: - p | beam.Create([{"id": 1, "date": Timestamp.from_utc_datetime(datetime.datetime(2000, 9, 15, tzinfo=pytz.UTC))}, - {"id": 2, "date": Timestamp.from_utc_datetime(datetime.datetime(100, 1, 1, tzinfo=pytz.UTC))}] - ) | beam.io.WriteToBigQuery( - create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, - write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND, - schema=users_schema, - table="google.com:clouddfe:ahmedabualsaud_test.jdbc-repro2", - method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API, - )