Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add config map to FlinkPipelineOptions #129

Merged
merged 2 commits into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ class BeamModulePlugin implements Plugin<Project> {

// Automatically use the official release version if we are performing a release
// otherwise append '-SNAPSHOT'
project.version = '2.45.28'
project.version = '2.45.29'
if (isLinkedin(project)) {
project.ext.mavenGroupId = 'com.linkedin.beam'
}
Expand Down
4 changes: 2 additions & 2 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ signing.gnupg.useLegacyGpg=true
# buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy.
# To build a custom Beam version make sure you change it in both places, see
# https://github.com/apache/beam/issues/21302.
version=2.45.28
sdk_version=2.45.28
version=2.45.29
sdk_version=2.45.29

javaVersion=1.8

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.util.InstanceBuilder;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
Expand Down Expand Up @@ -77,7 +78,7 @@ static ExecutionEnvironment createBatchExecutionEnvironment(

// Although Flink uses Rest, it expects the address not to contain a http scheme
String flinkMasterHostPort = stripHttpSchema(options.getFlinkMaster());
Configuration flinkConfiguration = getFlinkConfiguration(confDir);
Configuration flinkConfiguration = getFlinkConfiguration(confDir, options.getFlinkConfMap());
ExecutionEnvironment flinkBatchEnv;

// depending on the master, create the right environment.
Expand Down Expand Up @@ -163,7 +164,7 @@ public static StreamExecutionEnvironment createStreamExecutionEnvironment(

// Although Flink uses Rest, it expects the address not to contain a http scheme
String masterUrl = stripHttpSchema(options.getFlinkMaster());
Configuration flinkConfiguration = getFlinkConfiguration(confDir);
Configuration flinkConfiguration = getFlinkConfiguration(confDir, options.getFlinkConfMap());
StreamExecutionEnvironment flinkStreamEnv;

// depending on the master, create the right environment.
Expand Down Expand Up @@ -376,10 +377,19 @@ private static int determineParallelism(
return 1;
}

private static Configuration getFlinkConfiguration(@Nullable String flinkConfDir) {
return flinkConfDir == null || flinkConfDir.isEmpty()
? GlobalConfiguration.loadConfiguration()
: GlobalConfiguration.loadConfiguration(flinkConfDir);
private static Configuration getFlinkConfiguration(
@Nullable String flinkConfDir, @Nullable Map<String, String> flinkConfMap) {
Configuration dynamicProperties = null;
if (flinkConfMap != null && !flinkConfMap.isEmpty()) {
dynamicProperties = Configuration.fromMap(flinkConfMap);
}
if (flinkConfDir != null && !flinkConfDir.isEmpty()) {
return GlobalConfiguration.loadConfiguration(flinkConfDir, dynamicProperties);
} else if (dynamicProperties != null) {
return GlobalConfiguration.loadConfiguration(dynamicProperties);
} else {
return GlobalConfiguration.loadConfiguration();
}
}

private static void applyLatencyTrackingInterval(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.flink;

import java.util.Map;
import org.apache.beam.sdk.options.ApplicationNameOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
Expand Down Expand Up @@ -305,6 +306,11 @@ public interface FlinkPipelineOptions

void setFlinkConfDir(String confDir);

@Description("Map containing Flink configurations")
Map<String, String> getFlinkConfMap();

void setFlinkConfMap(Map<String, String> flinkConfMap);

static FlinkPipelineOptions defaults() {
return PipelineOptionsFactory.as(FlinkPipelineOptions.class);
}
Expand Down
Loading