From 19044ec97729f1328577ac4930d79a517e713741 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Thu, 3 Oct 2024 15:46:21 -0400 Subject: [PATCH 1/3] Touch Flink trigger files for testing addition of Flink 1.19 support --- .../setup-default-test-properties/test-properties.json | 2 +- .github/trigger_files/beam_PostCommit_Go_VR_Flink.json | 3 ++- .../beam_PostCommit_Java_Examples_Flink.json | 3 +++ .../beam_PostCommit_Java_Jpms_Flink_Java11.json | 3 +++ .../beam_PostCommit_Java_ValidatesRunner_Flink.json | 3 ++- ...beam_PostCommit_Java_ValidatesRunner_Flink_Java11.json | 3 ++- .../beam_PostCommit_Java_ValidatesRunner_Flink_Java8.json | 4 ++++ .../beam_PostCommit_Python_ValidatesRunner_Flink.json | 3 +++ .github/trigger_files/beam_PostCommit_XVR_Flink.json | 3 +++ sdks/go/examples/wasm/README.md | 2 +- sdks/python/apache_beam/options/pipeline_options.py | 2 +- sdks/typescript/src/apache_beam/runners/flink.ts | 2 +- .../www/site/content/en/documentation/runners/flink.md | 8 +++++++- 13 files changed, 33 insertions(+), 8 deletions(-) create mode 100644 .github/trigger_files/beam_PostCommit_Java_Examples_Flink.json create mode 100644 .github/trigger_files/beam_PostCommit_Java_Jpms_Flink_Java11.json create mode 100644 .github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Flink_Java8.json create mode 100644 .github/trigger_files/beam_PostCommit_XVR_Flink.json diff --git a/.github/actions/setup-default-test-properties/test-properties.json b/.github/actions/setup-default-test-properties/test-properties.json index 098e4ca1935c..efe66de8ee1e 100644 --- a/.github/actions/setup-default-test-properties/test-properties.json +++ b/.github/actions/setup-default-test-properties/test-properties.json @@ -14,7 +14,7 @@ }, "JavaTestProperties": { "SUPPORTED_VERSIONS": ["8", "11", "17", "21"], - "FLINK_VERSIONS": ["1.15", "1.16", "1.17", "1.18"], + "FLINK_VERSIONS": ["1.15", "1.16", "1.17", "1.18", "1.19"], "SPARK_VERSIONS": ["2", "3"] }, "GoTestProperties": { diff --git a/.github/trigger_files/beam_PostCommit_Go_VR_Flink.json b/.github/trigger_files/beam_PostCommit_Go_VR_Flink.json index e3d6056a5de9..b98aece75634 100644 --- a/.github/trigger_files/beam_PostCommit_Go_VR_Flink.json +++ b/.github/trigger_files/beam_PostCommit_Go_VR_Flink.json @@ -1,4 +1,5 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 1 + "modification": 1, + "https://github.com/apache/beam/pull/32648": "testing addition of Flink 1.19 support" } diff --git a/.github/trigger_files/beam_PostCommit_Java_Examples_Flink.json b/.github/trigger_files/beam_PostCommit_Java_Examples_Flink.json new file mode 100644 index 000000000000..dd9afb90e638 --- /dev/null +++ b/.github/trigger_files/beam_PostCommit_Java_Examples_Flink.json @@ -0,0 +1,3 @@ +{ + "https://github.com/apache/beam/pull/32648": "testing flink 1.19 support" +} diff --git a/.github/trigger_files/beam_PostCommit_Java_Jpms_Flink_Java11.json b/.github/trigger_files/beam_PostCommit_Java_Jpms_Flink_Java11.json new file mode 100644 index 000000000000..dd9afb90e638 --- /dev/null +++ b/.github/trigger_files/beam_PostCommit_Java_Jpms_Flink_Java11.json @@ -0,0 +1,3 @@ +{ + "https://github.com/apache/beam/pull/32648": "testing flink 1.19 support" +} diff --git a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Flink.json b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Flink.json index b970762c8397..9200c368abbe 100644 --- a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Flink.json +++ b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Flink.json @@ -1,4 +1,5 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test" + "https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test", + "https://github.com/apache/beam/pull/32648": "testing addition of Flink 1.19 support" } diff --git a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Flink_Java11.json b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Flink_Java11.json index b970762c8397..9200c368abbe 100644 --- a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Flink_Java11.json +++ b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Flink_Java11.json @@ -1,4 +1,5 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test" + "https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test", + "https://github.com/apache/beam/pull/32648": "testing addition of Flink 1.19 support" } diff --git a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Flink_Java8.json b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Flink_Java8.json new file mode 100644 index 000000000000..b07a3c47e196 --- /dev/null +++ b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Flink_Java8.json @@ -0,0 +1,4 @@ +{ + + "https://github.com/apache/beam/pull/32648": "testing addition of Flink 1.19 support" +} diff --git a/.github/trigger_files/beam_PostCommit_Python_ValidatesRunner_Flink.json b/.github/trigger_files/beam_PostCommit_Python_ValidatesRunner_Flink.json index e69de29bb2d1..0b34d452d42c 100644 --- a/.github/trigger_files/beam_PostCommit_Python_ValidatesRunner_Flink.json +++ b/.github/trigger_files/beam_PostCommit_Python_ValidatesRunner_Flink.json @@ -0,0 +1,3 @@ +{ + "https://github.com/apache/beam/pull/32648": "testing addition of Flink 1.19 support" +} diff --git a/.github/trigger_files/beam_PostCommit_XVR_Flink.json b/.github/trigger_files/beam_PostCommit_XVR_Flink.json new file mode 100644 index 000000000000..0b34d452d42c --- /dev/null +++ b/.github/trigger_files/beam_PostCommit_XVR_Flink.json @@ -0,0 +1,3 @@ +{ + "https://github.com/apache/beam/pull/32648": "testing addition of Flink 1.19 support" +} diff --git a/sdks/go/examples/wasm/README.md b/sdks/go/examples/wasm/README.md index 84d30a3c6a63..a78649134305 100644 --- a/sdks/go/examples/wasm/README.md +++ b/sdks/go/examples/wasm/README.md @@ -68,7 +68,7 @@ cd $BEAM_HOME Expected output should include the following, from which you acquire the latest flink runner version. ```shell -'flink_versions: 1.15,1.16,1.17,1.18' +'flink_versions: 1.15,1.16,1.17,1.18,1.19' ``` #### 2. Set to the latest flink runner version i.e. 1.16 diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index 4497ab0993a4..837dc0f5439f 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -1679,7 +1679,7 @@ def _add_argparse_args(cls, parser): class FlinkRunnerOptions(PipelineOptions): # These should stay in sync with gradle.properties. - PUBLISHED_FLINK_VERSIONS = ['1.15', '1.16', '1.17', '1.18'] + PUBLISHED_FLINK_VERSIONS = ['1.15', '1.16', '1.17', '1.18', '1.19'] @classmethod def _add_argparse_args(cls, parser): diff --git a/sdks/typescript/src/apache_beam/runners/flink.ts b/sdks/typescript/src/apache_beam/runners/flink.ts index ad4339b431f5..e21876c0d517 100644 --- a/sdks/typescript/src/apache_beam/runners/flink.ts +++ b/sdks/typescript/src/apache_beam/runners/flink.ts @@ -28,7 +28,7 @@ import { JavaJarService } from "../utils/service"; const MAGIC_HOST_NAMES = ["[local]", "[auto]"]; // These should stay in sync with gradle.properties. -const PUBLISHED_FLINK_VERSIONS = ["1.15", "1.16", "1.17", "1.18"]; +const PUBLISHED_FLINK_VERSIONS = ["1.15", "1.16", "1.17", "1.18", "1.19"]; const defaultOptions = { flinkMaster: "[local]", diff --git a/website/www/site/content/en/documentation/runners/flink.md b/website/www/site/content/en/documentation/runners/flink.md index 7325c480955c..2c28aa7062ec 100644 --- a/website/www/site/content/en/documentation/runners/flink.md +++ b/website/www/site/content/en/documentation/runners/flink.md @@ -93,7 +93,7 @@ from the [compatibility table](#flink-version-compatibility) below. For example: {{< highlight java >}} org.apache.beam - beam-runners-flink-1.17 + beam-runners-flink-1.19 {{< param release_latest >}} {{< /highlight >}} @@ -200,6 +200,7 @@ Starting with Beam 2.18.0, pre-built Flink Job Service Docker images are availab [Flink 1.16](https://hub.docker.com/r/apache/beam_flink1.16_job_server). [Flink 1.17](https://hub.docker.com/r/apache/beam_flink1.17_job_server). [Flink 1.18](https://hub.docker.com/r/apache/beam_flink1.18_job_server). +[Flink 1.19](https://hub.docker.com/r/apache/beam_flink1.19_job_server). {{< /paragraph >}} @@ -326,6 +327,11 @@ To find out which version of Flink is compatible with Beam please see the table Artifact Id Supported Beam Versions + + 1.19.x + beam-runners-flink-1.19 + ≥ 2.61.0 + 1.18.x beam-runners-flink-1.18 From aa74cc0b413a4b7cfe8f95e02c9c3e144168ced8 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Wed, 16 Oct 2024 13:23:03 -0400 Subject: [PATCH 2/3] Pin snakeyaml-engine to 2.6 so it is not downgraded by transitive deps --- .../main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy | 2 ++ 1 file changed, 2 insertions(+) diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index a7e129211757..dd3b129e6c34 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -637,6 +637,7 @@ class BeamModulePlugin implements Plugin { def sbe_tool_version = "1.25.1" def singlestore_jdbc_version = "1.1.4" def slf4j_version = "1.7.30" + def snakeyaml_engine_version = "2.6" def snakeyaml_version = "2.2" def solace_version = "10.21.0" def spark2_version = "2.4.8" @@ -870,6 +871,7 @@ class BeamModulePlugin implements Plugin { singlestore_jdbc : "com.singlestore:singlestore-jdbc-client:$singlestore_jdbc_version", slf4j_api : "org.slf4j:slf4j-api:$slf4j_version", snake_yaml : "org.yaml:snakeyaml:$snakeyaml_version", + snakeyaml_engine : "org.snakeyaml:snakeyaml-engine:$snakeyaml_engine_version", slf4j_android : "org.slf4j:slf4j-android:$slf4j_version", slf4j_ext : "org.slf4j:slf4j-ext:$slf4j_version", slf4j_jdk14 : "org.slf4j:slf4j-jdk14:$slf4j_version", From 39789223b7da5cf0fc19e191595aad6a20839bab Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Thu, 3 Oct 2024 13:58:38 -0400 Subject: [PATCH 3/3] Add support for Flink 1.19 --- .../beam_PostCommit_Java_Tpcds_Flink.yml | 2 +- ..._PostCommit_Java_ValidatesRunner_Flink.yml | 2 +- CHANGES.md | 2 + contributor-docs/release-guide.md | 2 +- gradle.properties | 2 +- release/build.gradle.kts | 2 +- .../streaming/MemoryStateBackendWrapper.java | 76 ++++++++++++++++++ .../flink/streaming/StreamSources.java | 0 runners/flink/1.19/build.gradle | 25 ++++++ .../1.19/job-server-container/build.gradle | 26 ++++++ runners/flink/1.19/job-server/build.gradle | 31 +++++++ .../streaming/MemoryStateBackendWrapper.java | 80 +++++++++++++++++++ .../flink/streaming/StreamSources.java | 61 ++++++++++++++ .../FlinkBroadcastStateInternalsTest.java | 4 +- .../streaming/FlinkStateInternalsTest.java | 4 +- sdks/go/examples/stringsplit/stringsplit.go | 2 +- settings.gradle.kts | 4 + 17 files changed, 313 insertions(+), 12 deletions(-) create mode 100644 runners/flink/1.15/src/test/java/org/apache/beam/runners/flink/streaming/MemoryStateBackendWrapper.java rename runners/flink/{ => 1.15}/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java (100%) create mode 100644 runners/flink/1.19/build.gradle create mode 100644 runners/flink/1.19/job-server-container/build.gradle create mode 100644 runners/flink/1.19/job-server/build.gradle create mode 100644 runners/flink/1.19/src/test/java/org/apache/beam/runners/flink/streaming/MemoryStateBackendWrapper.java create mode 100644 runners/flink/1.19/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java diff --git a/.github/workflows/beam_PostCommit_Java_Tpcds_Flink.yml b/.github/workflows/beam_PostCommit_Java_Tpcds_Flink.yml index 19329026c034..cf85d9563122 100644 --- a/.github/workflows/beam_PostCommit_Java_Tpcds_Flink.yml +++ b/.github/workflows/beam_PostCommit_Java_Tpcds_Flink.yml @@ -101,5 +101,5 @@ jobs: with: gradle-command: :sdks:java:testing:tpcds:run arguments: | - -Ptpcds.runner=:runners:flink:1.18 \ + -Ptpcds.runner=:runners:flink:1.19 \ "-Ptpcds.args=${{env.tpcdsBigQueryArgs}} ${{env.tpcdsInfluxDBArgs}} ${{ env.GRADLE_COMMAND_ARGUMENTS }} --queries=${{env.tpcdsQueriesArg}}" \ diff --git a/.github/workflows/beam_PostCommit_Java_ValidatesRunner_Flink.yml b/.github/workflows/beam_PostCommit_Java_ValidatesRunner_Flink.yml index b6334d8e9858..f79ca8747828 100644 --- a/.github/workflows/beam_PostCommit_Java_ValidatesRunner_Flink.yml +++ b/.github/workflows/beam_PostCommit_Java_ValidatesRunner_Flink.yml @@ -78,7 +78,7 @@ jobs: - name: run validatesRunner script uses: ./.github/actions/gradle-command-self-hosted-action with: - gradle-command: :runners:flink:1.18:validatesRunner + gradle-command: :runners:flink:1.19:validatesRunner - name: Archive JUnit Test Results uses: actions/upload-artifact@v4 if: ${{ !success() }} diff --git a/CHANGES.md b/CHANGES.md index 9980643e5415..3ab52f077076 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -25,6 +25,8 @@ * New highly anticipated feature X added to Python SDK ([#X](https://github.com/apache/beam/issues/X)). * New highly anticipated feature Y added to Java SDK ([#Y](https://github.com/apache/beam/issues/Y)). +* Added support for Flink 1.19 + ## I/Os * Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). diff --git a/contributor-docs/release-guide.md b/contributor-docs/release-guide.md index 4fe35aa4aac2..d351049c96cd 100644 --- a/contributor-docs/release-guide.md +++ b/contributor-docs/release-guide.md @@ -887,7 +887,7 @@ write to BigQuery, and create a cluster of machines for running containers (for ``` **Flink Local Runner** ``` - ./gradlew :runners:flink:1.18:runQuickstartJavaFlinkLocal \ + ./gradlew :runners:flink:1.19:runQuickstartJavaFlinkLocal \ -Prepourl=https://repository.apache.org/content/repositories/orgapachebeam-${KEY} \ -Pver=${RELEASE_VERSION} ``` diff --git a/gradle.properties b/gradle.properties index f2a0b05eca09..f6e143690a34 100644 --- a/gradle.properties +++ b/gradle.properties @@ -39,6 +39,6 @@ docker_image_default_repo_root=apache docker_image_default_repo_prefix=beam_ # supported flink versions -flink_versions=1.15,1.16,1.17,1.18 +flink_versions=1.15,1.16,1.17,1.18,1.19 # supported python versions python_versions=3.8,3.9,3.10,3.11,3.12 diff --git a/release/build.gradle.kts b/release/build.gradle.kts index ca1c152c9eb5..7ec49b86aac2 100644 --- a/release/build.gradle.kts +++ b/release/build.gradle.kts @@ -39,7 +39,7 @@ task("runJavaExamplesValidationTask") { dependsOn(":runners:direct-java:runQuickstartJavaDirect") dependsOn(":runners:google-cloud-dataflow-java:runQuickstartJavaDataflow") dependsOn(":runners:spark:3:runQuickstartJavaSpark") - dependsOn(":runners:flink:1.18:runQuickstartJavaFlinkLocal") + dependsOn(":runners:flink:1.19:runQuickstartJavaFlinkLocal") dependsOn(":runners:direct-java:runMobileGamingJavaDirect") dependsOn(":runners:google-cloud-dataflow-java:runMobileGamingJavaDataflow") dependsOn(":runners:twister2:runQuickstartJavaTwister2") diff --git a/runners/flink/1.15/src/test/java/org/apache/beam/runners/flink/streaming/MemoryStateBackendWrapper.java b/runners/flink/1.15/src/test/java/org/apache/beam/runners/flink/streaming/MemoryStateBackendWrapper.java new file mode 100644 index 000000000000..7317788a72ee --- /dev/null +++ b/runners/flink/1.15/src/test/java/org/apache/beam/runners/flink/streaming/MemoryStateBackendWrapper.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.streaming; + +import java.util.Collection; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.query.TaskKvStateRegistry; +import org.apache.flink.runtime.state.AbstractKeyedStateBackend; +import org.apache.flink.runtime.state.BackendBuildingException; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.OperatorStateBackend; +import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; + +class MemoryStateBackendWrapper { + static AbstractKeyedStateBackend createKeyedStateBackend( + Environment env, + JobID jobID, + String operatorIdentifier, + TypeSerializer keySerializer, + int numberOfKeyGroups, + KeyGroupRange keyGroupRange, + TaskKvStateRegistry kvStateRegistry, + TtlTimeProvider ttlTimeProvider, + MetricGroup metricGroup, + Collection stateHandles, + CloseableRegistry cancelStreamRegistry) + throws BackendBuildingException { + + MemoryStateBackend backend = new MemoryStateBackend(); + return backend.createKeyedStateBackend( + env, + jobID, + operatorIdentifier, + keySerializer, + numberOfKeyGroups, + keyGroupRange, + kvStateRegistry, + ttlTimeProvider, + metricGroup, + stateHandles, + cancelStreamRegistry); + } + + static OperatorStateBackend createOperatorStateBackend( + Environment env, + String operatorIdentifier, + Collection stateHandles, + CloseableRegistry cancelStreamRegistry) + throws Exception { + MemoryStateBackend backend = new MemoryStateBackend(); + return backend.createOperatorStateBackend( + env, operatorIdentifier, stateHandles, cancelStreamRegistry); + } +} diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java b/runners/flink/1.15/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java similarity index 100% rename from runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java rename to runners/flink/1.15/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java diff --git a/runners/flink/1.19/build.gradle b/runners/flink/1.19/build.gradle new file mode 100644 index 000000000000..1545da258477 --- /dev/null +++ b/runners/flink/1.19/build.gradle @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +project.ext { + flink_major = '1.19' + flink_version = '1.19.0' +} + +// Load the main build script which contains all build logic. +apply from: "../flink_runner.gradle" diff --git a/runners/flink/1.19/job-server-container/build.gradle b/runners/flink/1.19/job-server-container/build.gradle new file mode 100644 index 000000000000..afdb68a0fc91 --- /dev/null +++ b/runners/flink/1.19/job-server-container/build.gradle @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +def basePath = '../../job-server-container' + +project.ext { + resource_path = basePath +} + +// Load the main build script which contains all build logic. +apply from: "$basePath/flink_job_server_container.gradle" diff --git a/runners/flink/1.19/job-server/build.gradle b/runners/flink/1.19/job-server/build.gradle new file mode 100644 index 000000000000..332f04e08ceb --- /dev/null +++ b/runners/flink/1.19/job-server/build.gradle @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +def basePath = '../../job-server' + +project.ext { + // Look for the source code in the parent module + main_source_dirs = ["$basePath/src/main/java"] + test_source_dirs = ["$basePath/src/test/java"] + main_resources_dirs = ["$basePath/src/main/resources"] + test_resources_dirs = ["$basePath/src/test/resources"] + archives_base_name = 'beam-runners-flink-1.19-job-server' +} + +// Load the main build script which contains all build logic. +apply from: "$basePath/flink_job_server.gradle" diff --git a/runners/flink/1.19/src/test/java/org/apache/beam/runners/flink/streaming/MemoryStateBackendWrapper.java b/runners/flink/1.19/src/test/java/org/apache/beam/runners/flink/streaming/MemoryStateBackendWrapper.java new file mode 100644 index 000000000000..cbaa6fd3a8c4 --- /dev/null +++ b/runners/flink/1.19/src/test/java/org/apache/beam/runners/flink/streaming/MemoryStateBackendWrapper.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.streaming; + +import java.util.Collection; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.query.TaskKvStateRegistry; +import org.apache.flink.runtime.state.AbstractKeyedStateBackend; +import org.apache.flink.runtime.state.BackendBuildingException; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyedStateBackendParametersImpl; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.OperatorStateBackend; +import org.apache.flink.runtime.state.OperatorStateBackendParametersImpl; +import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; + +class MemoryStateBackendWrapper { + static AbstractKeyedStateBackend createKeyedStateBackend( + Environment env, + JobID jobID, + String operatorIdentifier, + TypeSerializer keySerializer, + int numberOfKeyGroups, + KeyGroupRange keyGroupRange, + TaskKvStateRegistry kvStateRegistry, + TtlTimeProvider ttlTimeProvider, + MetricGroup metricGroup, + Collection stateHandles, + CloseableRegistry cancelStreamRegistry) + throws BackendBuildingException { + + MemoryStateBackend backend = new MemoryStateBackend(); + return backend.createKeyedStateBackend( + new KeyedStateBackendParametersImpl<>( + env, + jobID, + operatorIdentifier, + keySerializer, + numberOfKeyGroups, + keyGroupRange, + kvStateRegistry, + ttlTimeProvider, + metricGroup, + stateHandles, + cancelStreamRegistry)); + } + + static OperatorStateBackend createOperatorStateBackend( + Environment env, + String operatorIdentifier, + Collection stateHandles, + CloseableRegistry cancelStreamRegistry) + throws Exception { + MemoryStateBackend backend = new MemoryStateBackend(); + return backend.createOperatorStateBackend( + new OperatorStateBackendParametersImpl( + env, operatorIdentifier, stateHandles, cancelStreamRegistry)); + } +} diff --git a/runners/flink/1.19/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java b/runners/flink/1.19/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java new file mode 100644 index 000000000000..c03799d09535 --- /dev/null +++ b/runners/flink/1.19/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.streaming; + +import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.StreamSource; +import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.OperatorChain; +import org.apache.flink.streaming.runtime.tasks.RegularOperatorChain; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; + +/** {@link StreamSource} utilities, that bridge incompatibilities between Flink releases. */ +public class StreamSources { + + public static > void run( + StreamSource streamSource, + Object lockingObject, + Output> collector) + throws Exception { + streamSource.run(lockingObject, collector, createOperatorChain(streamSource)); + } + + private static OperatorChain createOperatorChain(AbstractStreamOperator operator) { + return new RegularOperatorChain<>( + operator.getContainingTask(), + StreamTask.createRecordWriterDelegate( + operator.getOperatorConfig(), new MockEnvironmentBuilder().build())); + } + + /** The emitWatermarkStatus method was added in Flink 1.14, so we need to wrap Output. */ + public interface OutputWrapper extends Output { + @Override + default void emitWatermarkStatus(WatermarkStatus watermarkStatus) {} + + /** In Flink 1.19 the {@code emitRecordAttributes} method was added. */ + @Override + default void emitRecordAttributes(RecordAttributes recordAttributes) { + throw new UnsupportedOperationException("emitRecordAttributes not implemented"); + } + } +} diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java index 10e20a6d47d3..c679c0725051 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java @@ -25,7 +25,6 @@ import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkBroadcastStateInternals; import org.apache.flink.runtime.operators.testutils.DummyEnvironment; import org.apache.flink.runtime.state.OperatorStateBackend; -import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.junit.Ignore; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -41,10 +40,9 @@ public class FlinkBroadcastStateInternalsTest extends StateInternalsTest { @Override protected StateInternals createStateInternals() { - MemoryStateBackend backend = new MemoryStateBackend(); try { OperatorStateBackend operatorStateBackend = - backend.createOperatorStateBackend( + MemoryStateBackendWrapper.createOperatorStateBackend( new DummyEnvironment("test", 1, 0), "", Collections.emptyList(), null); return new FlinkBroadcastStateInternals<>( 1, diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java index a2d6f5027abb..d0338ec3b0d3 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java @@ -47,7 +47,6 @@ import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyedStateBackend; -import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.hamcrest.Matchers; import org.joda.time.Instant; @@ -185,9 +184,8 @@ public void testGlobalWindowWatermarkHoldClear() throws Exception { } public static KeyedStateBackend createStateBackend() throws Exception { - MemoryStateBackend backend = new MemoryStateBackend(); AbstractKeyedStateBackend keyedStateBackend = - backend.createKeyedStateBackend( + MemoryStateBackendWrapper.createKeyedStateBackend( new DummyEnvironment("test", 1, 0), new JobID(), "test_op", diff --git a/sdks/go/examples/stringsplit/stringsplit.go b/sdks/go/examples/stringsplit/stringsplit.go index 266cdd99fb37..76140075b625 100644 --- a/sdks/go/examples/stringsplit/stringsplit.go +++ b/sdks/go/examples/stringsplit/stringsplit.go @@ -21,7 +21,7 @@ // 1. From a command line, navigate to the top-level beam/ directory and run // the Flink job server: // -// ./gradlew :runners:flink:1.18:job-server:runShadow -Djob-host=localhost -Dflink-master=local +// ./gradlew :runners:flink:1.19:job-server:runShadow -Djob-host=localhost -Dflink-master=local // // 2. The job server is ready to receive jobs once it outputs a log like the // following: `JobService started on localhost:8099`. Take note of the endpoint diff --git a/settings.gradle.kts b/settings.gradle.kts index 9701b4dbc06f..b71ed1ede134 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -141,6 +141,10 @@ include(":runners:flink:1.17:job-server-container") include(":runners:flink:1.18") include(":runners:flink:1.18:job-server") include(":runners:flink:1.18:job-server-container") +// Flink 1.19 +include(":runners:flink:1.19") +include(":runners:flink:1.19:job-server") +include(":runners:flink:1.19:job-server-container") /* End Flink Runner related settings */ include(":runners:twister2") include(":runners:google-cloud-dataflow-java")