From deee0e9cc8c6d374a5848fee882413e11f671a31 Mon Sep 17 00:00:00 2001 From: Jan Lukavsky Date: Fri, 2 Feb 2024 17:17:33 +0100 Subject: [PATCH] [flink] add support for Flink 1.17 (#29939) --- ...stCommit_Python_ValidatesRunner_Flink.json | 0 ..._PostCommit_Java_ValidatesRunner_Flink.yml | 4 +- gradle.properties | 2 +- runners/flink/1.12/build.gradle | 12 +- .../types/CoderTypeSerializer.java | 10 +- .../types/CoderTypeSerializerTest.java | 2 +- runners/flink/1.13/build.gradle | 12 +- runners/flink/1.14/build.gradle | 13 +- runners/flink/1.15/build.gradle | 13 +- runners/flink/1.16/build.gradle | 13 +- runners/flink/1.17/build.gradle | 25 +++ .../1.17/job-server-container/build.gradle | 26 +++ runners/flink/1.17/job-server/build.gradle | 31 ++++ .../types/CoderTypeSerializer.java | 164 ++++++++++++++++++ runners/flink/flink_runner.gradle | 20 ++- .../UnversionedTypeSerializerSnapshot.java | 86 +++++++++ ...UnversionedTypeSerializerSnapshotTest.java | 52 ++++++ settings.gradle.kts | 4 + .../flink_java_pipeline_options.html | 15 ++ .../flink_python_pipeline_options.html | 15 ++ 20 files changed, 459 insertions(+), 60 deletions(-) create mode 100644 .github/trigger_files/beam_PostCommit_Python_ValidatesRunner_Flink.json rename runners/flink/{ => 1.12}/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java (94%) rename runners/flink/{ => 1.12}/src/test/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializerTest.java (97%) create mode 100644 runners/flink/1.17/build.gradle create mode 100644 runners/flink/1.17/job-server-container/build.gradle create mode 100644 runners/flink/1.17/job-server/build.gradle create mode 100644 runners/flink/1.17/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java create mode 100644 runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/UnversionedTypeSerializerSnapshot.java create mode 100644 
runners/flink/src/test/java/org/apache/beam/runners/flink/translation/types/UnversionedTypeSerializerSnapshotTest.java diff --git a/.github/trigger_files/beam_PostCommit_Python_ValidatesRunner_Flink.json b/.github/trigger_files/beam_PostCommit_Python_ValidatesRunner_Flink.json new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/.github/workflows/beam_PostCommit_Java_ValidatesRunner_Flink.yml b/.github/workflows/beam_PostCommit_Java_ValidatesRunner_Flink.yml index 22fd277470c8..00828674f653 100644 --- a/.github/workflows/beam_PostCommit_Java_ValidatesRunner_Flink.yml +++ b/.github/workflows/beam_PostCommit_Java_ValidatesRunner_Flink.yml @@ -78,7 +78,7 @@ jobs: - name: run validatesRunner script uses: ./.github/actions/gradle-command-self-hosted-action with: - gradle-command: :runners:flink:1.15:validatesRunner + gradle-command: :runners:flink:1.17:validatesRunner - name: Archive JUnit Test Results uses: actions/upload-artifact@v4 if: ${{ !success() }} @@ -92,4 +92,4 @@ jobs: large_files: true commit: '${{ env.prsha || env.GITHUB_SHA }}' comment_mode: ${{ github.event_name == 'issue_comment' && 'always' || 'off' }} - files: '**/build/test-results/**/*.xml' \ No newline at end of file + files: '**/build/test-results/**/*.xml' diff --git a/gradle.properties b/gradle.properties index e007151c459d..5b07dd6c2b4e 100644 --- a/gradle.properties +++ b/gradle.properties @@ -39,6 +39,6 @@ docker_image_default_repo_root=apache docker_image_default_repo_prefix=beam_ # supported flink versions -flink_versions=1.12,1.13,1.14,1.15,1.16 +flink_versions=1.12,1.13,1.14,1.15,1.16,1.17 # supported python versions python_versions=3.8,3.9,3.10,3.11 diff --git a/runners/flink/1.12/build.gradle b/runners/flink/1.12/build.gradle index 96ecec453b0b..2acee16c6e87 100644 --- a/runners/flink/1.12/build.gradle +++ b/runners/flink/1.12/build.gradle @@ -16,18 +16,10 @@ * limitations under the License. */ -def basePath = '..' 
-/* All properties required for loading the Flink build script */ project.ext { - // Set the version of all Flink-related dependencies here. + flink_major = '1.12' flink_version = '1.12.7' - // Version specific code overrides. - main_source_overrides = ['./src/main/java'] - test_source_overrides = ['./src/test/java'] - main_resources_overrides = [] - test_resources_overrides = [] - archives_base_name = 'beam-runners-flink-1.12' } // Load the main build script which contains all build logic. -apply from: "$basePath/flink_runner.gradle" +apply from: "../flink_runner.gradle" diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java b/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java similarity index 94% rename from runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java rename to runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java index 2195ecdf1ab7..956aad428d8b 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java +++ b/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java @@ -46,6 +46,8 @@ }) public class CoderTypeSerializer extends TypeSerializer { + private static final long serialVersionUID = 7247319138941746449L; + private final Coder coder; /** @@ -155,10 +157,13 @@ public int hashCode() { @Override public TypeSerializerSnapshot snapshotConfiguration() { - return new LegacySnapshot<>(this); + return new UnversionedTypeSerializerSnapshot<>(this); } - /** A legacy snapshot which does not care about schema compatibility. */ + /** + * A legacy snapshot which does not care about schema compatibility. This is used only to restore + * state that was created by Beam 2.54.0 and below for Flink 1.16 and below.
+ */ public static class LegacySnapshot extends TypeSerializerConfigSnapshot { /** Needs to be public to work with {@link VersionedIOReadableWritable}. */ @@ -177,6 +182,7 @@ public int getVersion() { @Override public TypeSerializerSchemaCompatibility resolveSchemaCompatibility( TypeSerializer newSerializer) { + // We assume compatibility because we don't have a way of checking schema compatibility return TypeSerializerSchemaCompatibility.compatibleAsIs(); } diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializerTest.java b/runners/flink/1.12/src/test/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializerTest.java similarity index 97% rename from runners/flink/src/test/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializerTest.java rename to runners/flink/1.12/src/test/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializerTest.java index 3c4e43bd339f..fe05517333a8 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializerTest.java +++ b/runners/flink/1.12/src/test/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializerTest.java @@ -71,7 +71,7 @@ private void testWriteAndReadConfigSnapshot(Coder coder) throws IOExcept ComparatorTestBase.TestOutputView outView = new ComparatorTestBase.TestOutputView(); writtenSnapshot.writeSnapshot(outView); - TypeSerializerSnapshot readSnapshot = new CoderTypeSerializer.LegacySnapshot(); + TypeSerializerSnapshot readSnapshot = new UnversionedTypeSerializerSnapshot(); readSnapshot.readSnapshot( writtenSnapshot.getCurrentVersion(), outView.getInputView(), getClass().getClassLoader()); diff --git a/runners/flink/1.13/build.gradle b/runners/flink/1.13/build.gradle index 4045df69e2bb..1aa19e6f071c 100644 --- a/runners/flink/1.13/build.gradle +++ b/runners/flink/1.13/build.gradle @@ -16,18 +16,10 @@ * limitations under the License. */ -def basePath = '..' 
-/* All properties required for loading the Flink build script */ project.ext { - // Set the version of all Flink-related dependencies here. + flink_major = '1.13' flink_version = '1.13.5' - // Version specific code overrides. - main_source_overrides = ["${basePath}/1.12/src/main/java", './src/main/java'] - test_source_overrides = ["${basePath}/1.12/src/test/java", './src/test/java'] - main_resources_overrides = [] - test_resources_overrides = [] - archives_base_name = 'beam-runners-flink-1.13' } // Load the main build script which contains all build logic. -apply from: "$basePath/flink_runner.gradle" +apply from: "../flink_runner.gradle" diff --git a/runners/flink/1.14/build.gradle b/runners/flink/1.14/build.gradle index 30cb130249c6..cfbd8f8dde79 100644 --- a/runners/flink/1.14/build.gradle +++ b/runners/flink/1.14/build.gradle @@ -16,19 +16,10 @@ * limitations under the License. */ -def basePath = '..' - -/* All properties required for loading the Flink build script */ project.ext { - // Set the version of all Flink-related dependencies here. + flink_major = '1.14' flink_version = '1.14.3' - // Version specific code overrides. - main_source_overrides = ["${basePath}/1.12/src/main/java", "${basePath}/1.13/src/main/java", './src/main/java'] - test_source_overrides = ["${basePath}/1.12/src/test/java", "${basePath}/1.13/src/test/java", './src/test/java'] - main_resources_overrides = [] - test_resources_overrides = [] - archives_base_name = 'beam-runners-flink-1.14' } // Load the main build script which contains all build logic. -apply from: "$basePath/flink_runner.gradle" +apply from: "../flink_runner.gradle" diff --git a/runners/flink/1.15/build.gradle b/runners/flink/1.15/build.gradle index a3b5fb24699f..8055cf593ad0 100644 --- a/runners/flink/1.15/build.gradle +++ b/runners/flink/1.15/build.gradle @@ -16,19 +16,10 @@ * limitations under the License. */ -def basePath = '..' 
- -/* All properties required for loading the Flink build script */ project.ext { - // Set the version of all Flink-related dependencies here. + flink_major = '1.15' flink_version = '1.15.0' - // Version specific code overrides. - main_source_overrides = ["${basePath}/1.12/src/main/java", "${basePath}/1.13/src/main/java", "${basePath}/1.14/src/main/java", './src/main/java'] - test_source_overrides = ["${basePath}/1.12/src/test/java", "${basePath}/1.13/src/test/java", "${basePath}/1.14/src/test/java", './src/test/java'] - main_resources_overrides = [] - test_resources_overrides = [] - archives_base_name = 'beam-runners-flink-1.15' } // Load the main build script which contains all build logic. -apply from: "$basePath/flink_runner.gradle" +apply from: "../flink_runner.gradle" diff --git a/runners/flink/1.16/build.gradle b/runners/flink/1.16/build.gradle index 772e190aa457..21a222864a27 100644 --- a/runners/flink/1.16/build.gradle +++ b/runners/flink/1.16/build.gradle @@ -16,19 +16,10 @@ * limitations under the License. */ -def basePath = '..' - -/* All properties required for loading the Flink build script */ project.ext { - // Set the version of all Flink-related dependencies here. + flink_major = '1.16' flink_version = '1.16.0' - // Version specific code overrides. - main_source_overrides = ["${basePath}/1.12/src/main/java", "${basePath}/1.13/src/main/java", "${basePath}/1.14/src/main/java", "${basePath}/1.15/src/main/java", './src/main/java'] - test_source_overrides = ["${basePath}/1.12/src/test/java", "${basePath}/1.13/src/test/java", "${basePath}/1.14/src/test/java", "${basePath}/1.15/src/test/java", './src/test/java'] - main_resources_overrides = [] - test_resources_overrides = [] - archives_base_name = 'beam-runners-flink-1.16' } // Load the main build script which contains all build logic. 
-apply from: "$basePath/flink_runner.gradle" +apply from: "../flink_runner.gradle" diff --git a/runners/flink/1.17/build.gradle b/runners/flink/1.17/build.gradle new file mode 100644 index 000000000000..ae69b879eba9 --- /dev/null +++ b/runners/flink/1.17/build.gradle @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +project.ext { + flink_major = '1.17' + flink_version = '1.17.0' +} + +// Load the main build script which contains all build logic. +apply from: "../flink_runner.gradle" diff --git a/runners/flink/1.17/job-server-container/build.gradle b/runners/flink/1.17/job-server-container/build.gradle new file mode 100644 index 000000000000..afdb68a0fc91 --- /dev/null +++ b/runners/flink/1.17/job-server-container/build.gradle @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +def basePath = '../../job-server-container' + +project.ext { + resource_path = basePath +} + +// Load the main build script which contains all build logic. +apply from: "$basePath/flink_job_server_container.gradle" diff --git a/runners/flink/1.17/job-server/build.gradle b/runners/flink/1.17/job-server/build.gradle new file mode 100644 index 000000000000..89915349ae9a --- /dev/null +++ b/runners/flink/1.17/job-server/build.gradle @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +def basePath = '../../job-server' + +project.ext { + // Look for the source code in the parent module + main_source_dirs = ["$basePath/src/main/java"] + test_source_dirs = ["$basePath/src/test/java"] + main_resources_dirs = ["$basePath/src/main/resources"] + test_resources_dirs = ["$basePath/src/test/resources"] + archives_base_name = 'beam-runners-flink-1.17-job-server' +} + +// Load the main build script which contains all build logic. +apply from: "$basePath/flink_job_server.gradle" diff --git a/runners/flink/1.17/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java b/runners/flink/1.17/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java new file mode 100644 index 000000000000..0f87271a9779 --- /dev/null +++ b/runners/flink/1.17/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.flink.translation.types; + +import java.io.EOFException; +import java.io.IOException; +import org.apache.beam.runners.core.construction.SerializablePipelineOptions; +import org.apache.beam.runners.flink.FlinkPipelineOptions; +import org.apache.beam.runners.flink.translation.wrappers.DataInputViewWrapper; +import org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.util.CoderUtils; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** + * Flink {@link org.apache.flink.api.common.typeutils.TypeSerializer} for Beam {@link + * org.apache.beam.sdk.coders.Coder Coders}. + */ +@SuppressWarnings({ + "rawtypes", // TODO(https://github.com/apache/beam/issues/20447) + "nullness" // TODO(https://github.com/apache/beam/issues/20497) +}) +public class CoderTypeSerializer extends TypeSerializer { + + private static final long serialVersionUID = 7247319138941746449L; + + private final Coder coder; + + /** + * {@link SerializablePipelineOptions} deserialization will cause {@link + * org.apache.beam.sdk.io.FileSystems} registration needed for {@link + * org.apache.beam.sdk.transforms.Reshuffle} translation. 
+ */ + private final SerializablePipelineOptions pipelineOptions; + + private final boolean fasterCopy; + + public CoderTypeSerializer(Coder coder, SerializablePipelineOptions pipelineOptions) { + Preconditions.checkNotNull(coder); + Preconditions.checkNotNull(pipelineOptions); + this.coder = coder; + this.pipelineOptions = pipelineOptions; + + FlinkPipelineOptions options = pipelineOptions.get().as(FlinkPipelineOptions.class); + this.fasterCopy = options.getFasterCopy(); + } + + @Override + public boolean isImmutableType() { + return false; + } + + @Override + public CoderTypeSerializer duplicate() { + return new CoderTypeSerializer<>(coder, pipelineOptions); + } + + @Override + public T createInstance() { + return null; + } + + @Override + public T copy(T t) { + if (fasterCopy) { + return t; + } + try { + return CoderUtils.clone(coder, t); + } catch (CoderException e) { + throw new RuntimeException("Could not clone.", e); + } + } + + @Override + public T copy(T t, T reuse) { + return copy(t); + } + + @Override + public int getLength() { + return -1; + } + + @Override + public void serialize(T t, DataOutputView dataOutputView) throws IOException { + DataOutputViewWrapper outputWrapper = new DataOutputViewWrapper(dataOutputView); + coder.encode(t, outputWrapper); + } + + @Override + public T deserialize(DataInputView dataInputView) throws IOException { + try { + DataInputViewWrapper inputWrapper = new DataInputViewWrapper(dataInputView); + return coder.decode(inputWrapper); + } catch (CoderException e) { + Throwable cause = e.getCause(); + if (cause instanceof EOFException) { + throw (EOFException) cause; + } else { + throw e; + } + } + } + + @Override + public T deserialize(T t, DataInputView dataInputView) throws IOException { + return deserialize(dataInputView); + } + + @Override + public void copy(DataInputView dataInputView, DataOutputView dataOutputView) throws IOException { + serialize(deserialize(dataInputView), dataOutputView); + } + + @Override + public 
boolean equals(@Nullable Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + CoderTypeSerializer that = (CoderTypeSerializer) o; + return coder.equals(that.coder); + } + + @Override + public int hashCode() { + return coder.hashCode(); + } + + @Override + public TypeSerializerSnapshot snapshotConfiguration() { + return new UnversionedTypeSerializerSnapshot<>(this); + } + + @Override + public String toString() { + return "CoderTypeSerializer{" + "coder=" + coder + '}'; + } +} diff --git a/runners/flink/flink_runner.gradle b/runners/flink/flink_runner.gradle index 3bcbfdca0290..f786bcf8d882 100644 --- a/runners/flink/flink_runner.gradle +++ b/runners/flink/flink_runner.gradle @@ -25,11 +25,29 @@ import groovy.json.JsonOutput +def base_path = ".." + +def overrides(versions, type, base_path) { + versions.collect { "${base_path}/${it}/src/${type}/java" } + ["./src/${type}/java"] +} + +def all_versions = ['1.12', '1.13', '1.14', '1.15', '1.16', '1.17'] + +def previous_versions = all_versions.findAll { it < flink_major } + +// Version specific code overrides. +def main_source_overrides = overrides(previous_versions, "main", base_path) +def test_source_overrides = overrides(previous_versions, "test", base_path) +def main_resources_overrides = [] +def test_resources_overrides = [] + +def archivesBaseName = "beam-runners-flink-${flink_major}" + apply plugin: 'org.apache.beam.module' applyJavaNature( enableStrictDependencies:true, automaticModuleName: 'org.apache.beam.runners.flink', - archivesBaseName: (project.hasProperty('archives_base_name') ? 
archives_base_name : archivesBaseName), + archivesBaseName: archivesBaseName, ) description = "Apache Beam :: Runners :: Flink $flink_version" diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/UnversionedTypeSerializerSnapshot.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/UnversionedTypeSerializerSnapshot.java new file mode 100644 index 000000000000..f7e4d6ab4f3f --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/UnversionedTypeSerializerSnapshot.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.flink.translation.types; + +import java.io.IOException; +import javax.annotation.Nullable; +import org.apache.beam.sdk.util.SerializableUtils; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; +import org.apache.flink.core.io.VersionedIOReadableWritable; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.util.TemporaryClassLoaderContext; + +/** A {@link TypeSerializerSnapshot} of a {@link CoderTypeSerializer} which stores the serializer itself as serialized bytes and does not care about schema compatibility. */ +@SuppressWarnings("allcheckers") +public class UnversionedTypeSerializerSnapshot implements TypeSerializerSnapshot { + + private @Nullable CoderTypeSerializer serializer; + + /** Needs to be public to work with {@link VersionedIOReadableWritable}. */ + public UnversionedTypeSerializerSnapshot() { + this(null); + } + + @SuppressWarnings("initialization") + public UnversionedTypeSerializerSnapshot(CoderTypeSerializer serializer) { + this.serializer = serializer; + } + + @Override + public int getCurrentVersion() { + return 1; + } + + @Override + public void writeSnapshot(DataOutputView dataOutputView) throws IOException { + byte[] bytes = SerializableUtils.serializeToByteArray(serializer); + dataOutputView.writeInt(bytes.length); + dataOutputView.write(bytes); + } + + @SuppressWarnings("unchecked") + @Override + public void readSnapshot(int version, DataInputView dataInputView, ClassLoader classLoader) + throws IOException { + + try (TemporaryClassLoaderContext context = TemporaryClassLoaderContext.of(classLoader)) { + int length = dataInputView.readInt(); + byte[] bytes = new byte[length]; + dataInputView.readFully(bytes); + this.serializer = + (CoderTypeSerializer) + SerializableUtils.deserializeFromByteArray( + bytes, CoderTypeSerializer.class.getName()); + } + } + + @Override +
public TypeSerializer restoreSerializer() { + return serializer; + } + + @Override + public TypeSerializerSchemaCompatibility resolveSchemaCompatibility( + TypeSerializer newSerializer) { + + return TypeSerializerSchemaCompatibility.compatibleAsIs(); + } +} diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/types/UnversionedTypeSerializerSnapshotTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/types/UnversionedTypeSerializerSnapshotTest.java new file mode 100644 index 000000000000..b327b5a65835 --- /dev/null +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/types/UnversionedTypeSerializerSnapshotTest.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.flink.translation.types; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import org.apache.beam.runners.core.construction.SerializablePipelineOptions; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.junit.Test; + +public class UnversionedTypeSerializerSnapshotTest { + + @Test + public void testSerialization() throws IOException { + PipelineOptions opts = PipelineOptionsFactory.create(); + CoderTypeSerializer serializer = + new CoderTypeSerializer<>(VarIntCoder.of(), new SerializablePipelineOptions(opts)); + TypeSerializerSnapshot snapshot = serializer.snapshotConfiguration(); + assertTrue(snapshot instanceof UnversionedTypeSerializerSnapshot); + assertEquals(1, snapshot.getCurrentVersion()); + DataOutputSerializer output = new DataOutputSerializer(1); + snapshot.writeSnapshot(output); + DataInputDeserializer input = new DataInputDeserializer(); + input.setBuffer(output.wrapAsByteBuffer()); + TypeSerializerSnapshot emptySnapshot = new UnversionedTypeSerializerSnapshot<>(); + emptySnapshot.readSnapshot( + snapshot.getCurrentVersion(), input, Thread.currentThread().getContextClassLoader()); + assertEquals(emptySnapshot.restoreSerializer(), serializer); + } +} diff --git a/settings.gradle.kts b/settings.gradle.kts index cca547c9e04e..c2dd9bac1bac 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -141,6 +141,10 @@ include(":runners:flink:1.15:job-server-container") include(":runners:flink:1.16") include(":runners:flink:1.16:job-server") include(":runners:flink:1.16:job-server-container") +// Flink 1.17 +include(":runners:flink:1.17") 
+include(":runners:flink:1.17:job-server") +include(":runners:flink:1.17:job-server-container") /* End Flink Runner related settings */ include(":runners:twister2") include(":runners:google-cloud-dataflow-java") diff --git a/website/www/site/layouts/shortcodes/flink_java_pipeline_options.html b/website/www/site/layouts/shortcodes/flink_java_pipeline_options.html index 87d69ee60fe3..a6b7ed28f403 100644 --- a/website/www/site/layouts/shortcodes/flink_java_pipeline_options.html +++ b/website/www/site/layouts/shortcodes/flink_java_pipeline_options.html @@ -22,6 +22,11 @@ Flag indicating whether non restored state is allowed if the savepoint contains state for an operator that is no longer part of the pipeline. Default: false + + attachedMode + Specifies if the pipeline is submitted in attached or detached mode + Default: true + autoBalanceWriteFilesShardingEnabled Flag indicating whether auto-balance sharding for WriteFiles transform should be enabled. This might prove useful in streaming use-case, where pipeline needs to write quite many events into files, typically divided into N shards. Default behavior on Flink would be, that some workers will receive more shards to take care of than others. This cause workers to go out of balance in terms of processing backlog and memory usage. Enabling this feature will make shards to be spread evenly among available workers in improve throughput and memory usage stability. @@ -82,6 +87,11 @@ Remove unneeded deep copy between operators. See https://issues.apache.org/jira/browse/BEAM-11146 Default: false + + fileInputSplitMaxSizeMB + Set the maximum size of input split when data is read from a filesystem. 0 implies no max size. + Default: 0 + finishBundleBeforeCheckpointing If set, finishes the current bundle and flushes all output before checkpointing the state of the operators. By default, starts checkpointing immediately and buffers any remaining bundle output as part of the checkpoint. 
The setting may affect the checkpoint alignment. @@ -97,6 +107,11 @@ Address of the Flink Master where the Pipeline should be executed. Can either be of the form "host:port" or one of the special values [local], [collection] or [auto]. Default: [auto] + + jobCheckIntervalInSecs + Set job check interval in seconds under detached mode in method waitUntilFinish, by default it is 5 seconds + Default: 5 + latencyTrackingInterval Interval in milliseconds for sending latency tracking marks from the sources to the sinks. Interval value <= 0 disables the feature. diff --git a/website/www/site/layouts/shortcodes/flink_python_pipeline_options.html b/website/www/site/layouts/shortcodes/flink_python_pipeline_options.html index 27ae27ad05a3..494b01bc1d02 100644 --- a/website/www/site/layouts/shortcodes/flink_python_pipeline_options.html +++ b/website/www/site/layouts/shortcodes/flink_python_pipeline_options.html @@ -22,6 +22,11 @@ Flag indicating whether non restored state is allowed if the savepoint contains state for an operator that is no longer part of the pipeline. Default: false + + attached_mode + Specifies if the pipeline is submitted in attached or detached mode + Default: true + auto_balance_write_files_sharding_enabled Flag indicating whether auto-balance sharding for WriteFiles transform should be enabled. This might prove useful in streaming use-case, where pipeline needs to write quite many events into files, typically divided into N shards. Default behavior on Flink would be, that some workers will receive more shards to take care of than others. This cause workers to go out of balance in terms of processing backlog and memory usage. Enabling this feature will make shards to be spread evenly among available workers in improve throughput and memory usage stability. @@ -82,6 +87,11 @@ Remove unneeded deep copy between operators. 
See https://issues.apache.org/jira/browse/BEAM-11146 Default: false + + file_input_split_max_size_m_b + Set the maximum size of input split when data is read from a filesystem. 0 implies no max size. + Default: 0 + finish_bundle_before_checkpointing If set, finishes the current bundle and flushes all output before checkpointing the state of the operators. By default, starts checkpointing immediately and buffers any remaining bundle output as part of the checkpoint. The setting may affect the checkpoint alignment. @@ -97,6 +107,11 @@ Address of the Flink Master where the Pipeline should be executed. Can either be of the form "host:port" or one of the special values [local], [collection] or [auto]. Default: [auto] + + job_check_interval_in_secs + Set job check interval in seconds under detached mode in method waitUntilFinish, by default it is 5 seconds + Default: 5 + latency_tracking_interval Interval in milliseconds for sending latency tracking marks from the sources to the sinks. Interval value <= 0 disables the feature.