From 1b273666abc7da07b9c8fcb6a9718453e541124f Mon Sep 17 00:00:00 2001 From: yananhao12 Date: Tue, 8 Oct 2024 18:52:07 -0700 Subject: [PATCH 1/2] add config map to FlinkPipelineOptions --- .../flink/FlinkExecutionEnvironments.java | 22 ++++++++++++++----- .../runners/flink/FlinkPipelineOptions.java | 6 +++++ 2 files changed, 22 insertions(+), 6 deletions(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java index 45c87133c5d3..25a553af372c 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java @@ -21,6 +21,7 @@ import java.util.Collections; import java.util.List; +import java.util.Map; import org.apache.beam.sdk.util.InstanceBuilder; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects; @@ -77,7 +78,7 @@ static ExecutionEnvironment createBatchExecutionEnvironment( // Although Flink uses Rest, it expects the address not to contain a http scheme String flinkMasterHostPort = stripHttpSchema(options.getFlinkMaster()); - Configuration flinkConfiguration = getFlinkConfiguration(confDir); + Configuration flinkConfiguration = getFlinkConfiguration(confDir, options.getFlinkConfMap()); ExecutionEnvironment flinkBatchEnv; // depending on the master, create the right environment. @@ -163,7 +164,7 @@ public static StreamExecutionEnvironment createStreamExecutionEnvironment( // Although Flink uses Rest, it expects the address not to contain a http scheme String masterUrl = stripHttpSchema(options.getFlinkMaster()); - Configuration flinkConfiguration = getFlinkConfiguration(confDir); + Configuration flinkConfiguration = getFlinkConfiguration(confDir, options.getFlinkConfMap()); StreamExecutionEnvironment flinkStreamEnv; // depending on the master, create the right environment. @@ -376,10 +377,19 @@ private static int determineParallelism( return 1; } - private static Configuration getFlinkConfiguration(@Nullable String flinkConfDir) { - return flinkConfDir == null || flinkConfDir.isEmpty() - ? GlobalConfiguration.loadConfiguration() - : GlobalConfiguration.loadConfiguration(flinkConfDir); + private static Configuration getFlinkConfiguration( + @Nullable String flinkConfDir, @Nullable Map flinkConfMap) { + Configuration dynamicProperties = null; + if (flinkConfMap != null && !flinkConfMap.isEmpty()) { + dynamicProperties = Configuration.fromMap(flinkConfMap); + } + if (flinkConfDir != null && !flinkConfDir.isEmpty()) { + return GlobalConfiguration.loadConfiguration(flinkConfDir, dynamicProperties); + } else if (dynamicProperties != null) { + return GlobalConfiguration.loadConfiguration(dynamicProperties); + } else { + return GlobalConfiguration.loadConfiguration(); + } } private static void applyLatencyTrackingInterval( diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java index 829cd559d98a..aa83b60a8ac6 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java @@ -17,6 +17,7 @@ */ package org.apache.beam.runners.flink; +import java.util.Map; import org.apache.beam.sdk.options.ApplicationNameOptions; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; @@ -305,6 +306,11 @@ public interface FlinkPipelineOptions void setFlinkConfDir(String confDir); + @Description("Map containing Flink configurations") + Map getFlinkConfMap(); + + void setFlinkConfMap(Map flinkConfMap); + static FlinkPipelineOptions defaults() { return PipelineOptionsFactory.as(FlinkPipelineOptions.class); } From 9b29cb2b0ffbfc64e9df6602914fdc87bde3894f Mon Sep 17 00:00:00 2001 From: yananhao12 Date: Tue, 8 Oct 2024 19:06:36 -0700 Subject: [PATCH 2/2] update version --- .../groovy/org/apache/beam/gradle/BeamModulePlugin.groovy | 2 +- gradle.properties | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 4a7c859d961d..073552a6a4bf 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -398,7 +398,7 @@ class BeamModulePlugin implements Plugin { // Automatically use the official release version if we are performing a release // otherwise append '-SNAPSHOT' - project.version = '2.45.28' + project.version = '2.45.29' if (isLinkedin(project)) { project.ext.mavenGroupId = 'com.linkedin.beam' } diff --git a/gradle.properties b/gradle.properties index 4a98d4786b93..09f57de875e3 100644 --- a/gradle.properties +++ b/gradle.properties @@ -30,8 +30,8 @@ signing.gnupg.useLegacyGpg=true # buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy. # To build a custom Beam version make sure you change it in both places, see # https://github.com/apache/beam/issues/21302. -version=2.45.28 -sdk_version=2.45.28 +version=2.45.29 +sdk_version=2.45.29 javaVersion=1.8