Skip to content

Commit

Permalink
Allow SamzaPipelineLifeCycleListener to update configs (#121)
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyuiscool authored Apr 4, 2024
1 parent 2b9243e commit e1b1981
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ class BeamModulePlugin implements Plugin<Project> {

// Automatically use the official release version if we are performing a release
// otherwise append '-SNAPSHOT'
project.version = '2.45.21'
project.version = '2.45.22'
if (isLinkedin(project)) {
project.ext.mavenGroupId = 'com.linkedin.beam'
}
Expand Down
4 changes: 2 additions & 2 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ signing.gnupg.useLegacyGpg=true
# buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy.
# To build a custom Beam version make sure you change it in both places, see
# https://github.com/apache/beam/issues/21302.
version=2.45.21
sdk_version=2.45.21
version=2.45.22
sdk_version=2.45.22

javaVersion=1.8

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@
*/
package org.apache.beam.runners.samza;

import org.apache.samza.config.Config;
import java.util.Map;
import org.apache.samza.context.ExternalContext;

/** Life cycle listener for a Samza pipeline during runtime. */
public interface SamzaPipelineLifeCycleListener {
/** Callback when the pipeline options is created. */
void onInit(Config config, SamzaPipelineOptions options);
void onInit(Map<String, String> config, SamzaPipelineOptions options);

/** Callback when the pipeline is started. */
ExternalContext onStart();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.context.ExternalContext;
import org.apache.samza.metrics.MetricsReporter;
import org.apache.samza.metrics.MetricsReporterFactory;
Expand Down Expand Up @@ -99,13 +100,14 @@ public PortablePipelineResult runPortablePipeline(RunnerApi.Pipeline pipeline, J
configBuilder.put(BEAM_PIPELINE_PROTO, Base64Serializer.serializeUnchecked(pipeline));
configBuilder.put(BEAM_JOB_INFO, Base64Serializer.serializeUnchecked(jobInfo));

final Config config = configBuilder.build();
options.setConfigOverride(config);

final Map<String, String> config = configBuilder.build();
if (listener != null) {
listener.onInit(config, options);
}

final Config samzaConfig = new MapConfig(config);
options.setConfigOverride(samzaConfig);

final SamzaExecutionContext executionContext = new SamzaExecutionContext(options);
final Map<String, MetricsReporterFactory> reporterFactories = getMetricsReporters();
final StreamApplication app =
Expand All @@ -117,8 +119,8 @@ public PortablePipelineResult runPortablePipeline(RunnerApi.Pipeline pipeline, J
pipeline, new PortableTranslationContext(appDescriptor, options, jobInfo));
};

ApplicationRunner runner = runSamzaApp(app, config);
return new SamzaPortablePipelineResult(app, runner, executionContext, listener, config);
ApplicationRunner runner = runSamzaApp(app, samzaConfig);
return new SamzaPortablePipelineResult(app, runner, executionContext, listener, samzaConfig);
}

@Override
Expand Down Expand Up @@ -153,13 +155,14 @@ public SamzaPipelineResult run(Pipeline pipeline) {
configBuilder.put(BEAM_DOT_GRAPH, dotGraph);
configBuilder.put(BEAM_JSON_GRAPH, jsonGraph);

final Config config = configBuilder.build();
options.setConfigOverride(config);

final Map<String, String> config = configBuilder.build();
if (listener != null) {
listener.onInit(config, options);
}

final Config samzaConfig = new MapConfig(config);
options.setConfigOverride(samzaConfig);

final SamzaExecutionContext executionContext = new SamzaExecutionContext(options);
final Map<String, MetricsReporterFactory> reporterFactories = getMetricsReporters();

Expand All @@ -175,8 +178,8 @@ public SamzaPipelineResult run(Pipeline pipeline) {
// perform a final round of validation for the pipeline options now that all configs are
// generated
SamzaPipelineOptionsValidator.validate(options);
ApplicationRunner runner = runSamzaApp(app, config);
return new SamzaPipelineResult(runner, executionContext, listener, config);
ApplicationRunner runner = runSamzaApp(app, samzaConfig);
return new SamzaPipelineResult(runner, executionContext, listener, samzaConfig);
}

private Map<String, MetricsReporterFactory> getMetricsReporters() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public void putAll(Map<String, String> properties) {
}

/** @return built configuration */
public Config build() {
public Map<String, String> build() {
try {
// apply framework configs
config.putAll(createSystemConfig(options, config));
Expand All @@ -111,7 +111,7 @@ public Config build() {

validateConfigs(options, config);

return new MapConfig(config);
return new HashMap<>(config);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.JobCoordinatorConfig;
import org.apache.samza.config.TaskConfig;
Expand Down Expand Up @@ -81,7 +80,7 @@ public void testStatefulBeamStoreConfig() {
final ConfigContext configCtx = new ConfigContext(idMap, nonUniqueStateIds, options);
final ConfigBuilder configBuilder = new ConfigBuilder(options);
SamzaPipelineTranslator.createConfig(pipeline, configCtx, configBuilder);
final Config config = configBuilder.build();
final Map<String, String> config = configBuilder.build();

assertEquals(
RocksDbKeyValueStorageEngineFactory.class.getName(),
Expand All @@ -92,7 +91,7 @@ public void testStatefulBeamStoreConfig() {

options.setStateDurable(true);
SamzaPipelineTranslator.createConfig(pipeline, configCtx, configBuilder);
final Config config2 = configBuilder.build();
final Map<String, String> config2 = configBuilder.build();
assertEquals(
"TestStoreConfig-1-beamStore-changelog", config2.get("stores.beamStore.changelog"));
}
Expand All @@ -113,7 +112,7 @@ public void testStatelessBeamStoreConfig() {
final ConfigContext configCtx = new ConfigContext(idMap, nonUniqueStateIds, options);
final ConfigBuilder configBuilder = new ConfigBuilder(options);
SamzaPipelineTranslator.createConfig(pipeline, configCtx, configBuilder);
final Config config = configBuilder.build();
final Map<String, String> config = configBuilder.build();

assertEquals(
InMemoryKeyValueStorageEngineFactory.class.getName(),
Expand All @@ -124,7 +123,7 @@ public void testStatelessBeamStoreConfig() {

options.setStateDurable(true);
SamzaPipelineTranslator.createConfig(pipeline, configCtx, configBuilder);
final Config config2 = configBuilder.build();
final Map<String, String> config2 = configBuilder.build();
// For stateless jobs, ignore state durable pipeline option.
assertNull(config2.get("stores.beamStore.changelog"));
}
Expand All @@ -146,7 +145,7 @@ public void testSamzaLocalExecutionEnvironmentConfig() {
final ConfigContext configCtx = new ConfigContext(idMap, nonUniqueStateIds, options);
final ConfigBuilder configBuilder = new ConfigBuilder(options);
SamzaPipelineTranslator.createConfig(pipeline, configCtx, configBuilder);
final Config config = configBuilder.build();
final Map<String, String> config = configBuilder.build();

assertTrue(
Maps.difference(config, ConfigBuilder.localRunConfig()).entriesOnlyOnRight().isEmpty());
Expand Down Expand Up @@ -177,7 +176,7 @@ public void testSamzaYarnExecutionEnvironmentConfig() {
final ConfigBuilder configBuilder = new ConfigBuilder(options);
SamzaPipelineTranslator.createConfig(pipeline, configCtx, configBuilder);
try {
Config config = configBuilder.build();
final Map<String, String> config = configBuilder.build();
assertEquals(config.get(APP_RUNNER_CLASS), RemoteApplicationRunner.class.getName());
assertEquals(config.get(JOB_FACTORY_CLASS), YarnJobFactory.class.getName());
} catch (IllegalArgumentException e) {
Expand Down Expand Up @@ -209,7 +208,7 @@ public void testSamzaStandAloneExecutionEnvironmentConfig() {
final ConfigBuilder configBuilder = new ConfigBuilder(options);
SamzaPipelineTranslator.createConfig(pipeline, configCtx, configBuilder);
try {
Config config = configBuilder.build();
final Map<String, String> config = configBuilder.build();
assertEquals(config.get(APP_RUNNER_CLASS), LocalApplicationRunner.class.getName());
assertEquals(
config.get(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY),
Expand Down Expand Up @@ -252,7 +251,7 @@ public void processElement(
final ConfigBuilder configBuilder = new ConfigBuilder(options);

SamzaPipelineTranslator.createConfig(pipeline, configCtx, configBuilder);
final Config config = configBuilder.build();
final Map<String, String> config = configBuilder.build();

assertEquals(
RocksDbKeyValueStorageEngineFactory.class.getName(),
Expand All @@ -263,7 +262,7 @@ public void processElement(

options.setStateDurable(true);
SamzaPipelineTranslator.createConfig(pipeline, configCtx, configBuilder);
final Config config2 = configBuilder.build();
final Map<String, String> config2 = configBuilder.build();
assertEquals(
"TestStoreConfig-1-testState-changelog", config2.get("stores.testState.changelog"));
}
Expand Down Expand Up @@ -312,7 +311,7 @@ public void processElement(
final ConfigContext configCtx = new ConfigContext(idMap, nonUniqueStateIds, options);
final ConfigBuilder configBuilder = new ConfigBuilder(options);
SamzaPipelineTranslator.createConfig(pipeline, configCtx, configBuilder);
final Config config = configBuilder.build();
final Map<String, String> config = configBuilder.build();

assertEquals(
RocksDbKeyValueStorageEngineFactory.class.getName(),
Expand All @@ -330,7 +329,7 @@ public void processElement(

options.setStateDurable(true);
SamzaPipelineTranslator.createConfig(pipeline, configCtx, configBuilder);
final Config config2 = configBuilder.build();
final Map<String, String> config2 = configBuilder.build();
assertEquals(
"TestStoreConfig-1-testState-First_stateful_ParDo-changelog",
config2.get("stores.testState-First_stateful_ParDo.changelog"));
Expand Down Expand Up @@ -384,7 +383,7 @@ public void processElement(

final ConfigBuilder configBuilder = new ConfigBuilder(options);
SamzaPipelineTranslator.createConfig(pipeline, configCtx, configBuilder);
final Config config = configBuilder.build();
final Map<String, String> config = configBuilder.build();

assertEquals(
RocksDbKeyValueStorageEngineFactory.class.getName(),
Expand All @@ -406,7 +405,7 @@ public void processElement(

options.setStateDurable(true);
SamzaPipelineTranslator.createConfig(pipeline, configCtx, configBuilder);
final Config config2 = configBuilder.build();
final Map<String, String> config2 = configBuilder.build();
assertEquals(
"TestStoreConfig-1-testState-Same_stateful_ParDo_Name-changelog",
config2.get("stores.testState-Same_stateful_ParDo_Name.changelog"));
Expand Down

0 comments on commit e1b1981

Please sign in to comment.