diff --git a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming_GBK_Multiplexing.json b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming_GBK_Multiplexing.json new file mode 100644 index 000000000000..da1664d2f250 --- /dev/null +++ b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming_GBK_Multiplexing.json @@ -0,0 +1,4 @@ +{ + "comment": "Modify this file in a trivial way to cause this test suite to run", + "https://github.com/apache/beam/pull/33318": "noting that PR #33318 should run this test" +} diff --git a/.github/workflows/README.md b/.github/workflows/README.md index 206364f416f7..006a65ddacc4 100644 --- a/.github/workflows/README.md +++ b/.github/workflows/README.md @@ -336,6 +336,7 @@ PostCommit Jobs run in a schedule against master branch and generally do not get | [ PostCommit Java Tpcds Spark ](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_Tpcds_Spark.yml) | N/A |`beam_PostCommit_Java_Tpcds_Spark.json`| [![.github/workflows/beam_PostCommit_Java_Tpcds_Spark.yml](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_Tpcds_Spark.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_Tpcds_Spark.yml?query=event%3Aschedule) | | [ PostCommit Java ValidatesRunner Dataflow JavaVersions ](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow_JavaVersions.yml) | ['8','21'] |`beam_PostCommit_Java_ValidatesRunner_Dataflow_JavaVersions.json`| [![.github/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow_JavaVersions.yml](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow_JavaVersions.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow_JavaVersions.yml?query=event%3Aschedule) | | [ PostCommit Java ValidatesRunner Dataflow Streaming ](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.yml) | N/A |`beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json`| [![.github/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.yml](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.yml?query=event%3Aschedule) | +| [ PostCommit Java ValidatesRunner Dataflow Streaming GBK Multiplexing](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming_GBK_Multiplexing.yml) | N/A | `beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming_GBK_Multiplexing.json` | [![.github/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming_GBK_Multiplexing.yml](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming_GBK_Multiplexing.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming_GBK_Multiplexing.yml?query=event%3Aschedule) | | [ PostCommit Java ValidatesRunner Dataflow V2 Streaming ](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow_V2_Streaming.yml) | N/A |`beam_PostCommit_Java_ValidatesRunner_Dataflow_V2_Streaming.json`| [![.github/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow_V2_Streaming.yml](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow_V2_Streaming.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow_V2_Streaming.yml?query=event%3Aschedule) | | [ PostCommit Java ValidatesRunner Dataflow V2 ](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow_V2.yml) | N/A |`beam_PostCommit_Java_ValidatesRunner_Dataflow_V2.json`| [![.github/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow_V2.yml](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow_V2.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow_V2.yml?query=event%3Aschedule) | | [ PostCommit Java ValidatesRunner Dataflow ](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow.yml) | N/A |`beam_PostCommit_Java_ValidatesRunner_Dataflow.json`| [![.github/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow.yml](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow.yml?query=event%3Aschedule) | diff --git a/.github/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming_GBK_Multiplexing.yml b/.github/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming_GBK_Multiplexing.yml new file mode 100644 index 000000000000..caf78b428389 --- /dev/null +++ b/.github/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming_GBK_Multiplexing.yml @@ -0,0 +1,97 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +name: PostCommit Java ValidatesRunner Dataflow Streaming GBK Multiplexing + +on: + schedule: + - cron: '30 4/8 * * *' + pull_request_target: + paths: ['release/trigger_all_tests.json', '.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming_GBK_Multiplexing.json'] + workflow_dispatch: + +# This allows a subsequently queued workflow run to interrupt previous runs +concurrency: + group: '${{ github.workflow }} @ ${{ github.event.issue.number || github.sha || github.head_ref || github.ref }}-${{ github.event.schedule || github.event.comment.id || github.event.sender.login }}' + cancel-in-progress: true + +#Setting explicit permissions for the action to avoid the default permissions which are `write-all` in case of pull_request_target event +permissions: + actions: write + pull-requests: write + checks: write + contents: read + deployments: read + id-token: none + issues: write + discussions: read + packages: read + pages: read + repository-projects: read + security-events: read + statuses: read + +env: + DEVELOCITY_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }} + GRADLE_ENTERPRISE_CACHE_USERNAME: ${{ secrets.GE_CACHE_USERNAME }} + GRADLE_ENTERPRISE_CACHE_PASSWORD: ${{ secrets.GE_CACHE_PASSWORD }} + +jobs: + beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming_GBK_Multiplexing: + name: ${{ matrix.job_name }} (${{ matrix.job_phrase }}) + runs-on: [self-hosted, ubuntu-20.04, main] + timeout-minutes: 720 + strategy: + matrix: + job_name: [beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming_GBK_Multiplexing] + job_phrase: [Run Dataflow Streaming ValidatesRunner GBK Multiplexing] + if: | + github.event_name == 'workflow_dispatch' || + github.event_name == 'pull_request_target' || + (github.event_name == 'schedule' && github.repository == 'apache/beam') || + github.event.comment.body == 'Run Dataflow Streaming ValidatesRunner' + steps: + - uses: actions/checkout@v4 + - name: Setup repository + uses: ./.github/actions/setup-action + with: + comment_phrase: ${{ matrix.job_phrase }} + github_token: ${{ secrets.GITHUB_TOKEN }} + github_job: ${{ matrix.job_name }} (${{ matrix.job_phrase }}) + - name: Setup environment + uses: ./.github/actions/setup-environment-action + with: + java-version: default + - name: run validatesRunnerStreamingWithGbkMultiplexing script + uses: ./.github/actions/gradle-command-self-hosted-action + with: + gradle-command: :runners:google-cloud-dataflow-java:validatesRunnerStreamingWithGbkMultiplexing + max-workers: 12 + - name: Archive JUnit Test Results + uses: actions/upload-artifact@v4 + if: ${{ !success() }} + with: + name: JUnit Test Results + path: "**/build/reports/tests/" + - name: Publish JUnit Test Results + uses: EnricoMi/publish-unit-test-result-action@v2 + if: always() + with: + commit: '${{ env.prsha || env.GITHUB_SHA }}' + comment_mode: ${{ github.event_name == 'issue_comment' && 'always' || 'off' }} + files: '**/build/test-results/**/*.xml' + large_files: true \ No newline at end of file diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle index aeace769b4c1..ba0d1b05e8bf 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -242,6 +242,34 @@ def createLegacyWorkerValidatesRunnerTest = { Map args -> } } +def createValidatesRunnerStreamingTest = { boolean enableGbkMultiplexing -> + def pipelineOptions = legacyPipelineOptions + ['--streaming'] + if (enableGbkMultiplexing) { + pipelineOptions = pipelineOptions + ['--experiments=enable_gbk_state_multiplexing'] + } + def name = 'validatesRunnerLegacyWorkerTestStreaming' + if (enableGbkMultiplexing) { + name = 'validatesRunnerLegacyWorkerTestStreamingGbkMultiplexing' + } + return createLegacyWorkerValidatesRunnerTest( + name: name, + pipelineOptions: pipelineOptions, + excludedCategories: [ + 'org.apache.beam.sdk.testing.UsesCommittedMetrics', + 'org.apache.beam.sdk.testing.UsesMapState', + 'org.apache.beam.sdk.testing.UsesRequiresTimeSortedInput', + 'org.apache.beam.sdk.testing.UsesSetState', + ], + excludedTests: [ + // TODO(https://github.com/apache/beam/issues/21472) + 'org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testAfterProcessingTimeContinuationTriggerUsingState', + // GroupIntoBatches.withShardedKey not supported on streaming runner v1 + // https://github.com/apache/beam/issues/22592 + 'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testWithShardedKeyInGlobalWindow', + ] + ) +} + def createRunnerV2ValidatesRunnerTest = { Map args -> def name = args.name def pipelineOptions = args.pipelineOptions ?: runnerV2PipelineOptions @@ -460,24 +488,13 @@ task validatesRunner { task validatesRunnerStreaming { group = "Verification" description "Validates Dataflow runner forcing streaming mode" - dependsOn(createLegacyWorkerValidatesRunnerTest( - name: 'validatesRunnerLegacyWorkerTestStreaming', - pipelineOptions: legacyPipelineOptions + ['--streaming'] - + ['--experiments=enable_gbk_state_multiplexing'], - excludedCategories: [ - 'org.apache.beam.sdk.testing.UsesCommittedMetrics', - 'org.apache.beam.sdk.testing.UsesMapState', - 'org.apache.beam.sdk.testing.UsesRequiresTimeSortedInput', - 'org.apache.beam.sdk.testing.UsesSetState', - ], - excludedTests: [ - // TODO(https://github.com/apache/beam/issues/21472) - 'org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testAfterProcessingTimeContinuationTriggerUsingState', - // GroupIntoBatches.withShardedKey not supported on streaming runner v1 - // https://github.com/apache/beam/issues/22592 - 'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testWithShardedKeyInGlobalWindow', - ] - )) + dependsOn(createValidatesRunnerStreamingTest(/*enableGbkMultiplexing=*/false)) +} + +task validatesRunnerStreamingWithGbkMultiplexing { + group = "Verification" + description "Validates Dataflow runner forcing streaming mode" + dependsOn(createValidatesRunnerStreamingTest(/*enableGbkMultiplexing=*/true)) } def setupXVR = tasks.register("setupXVR") {