diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 9133795f702f..ba95ffb74089 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -398,7 +398,7 @@ class BeamModulePlugin implements Plugin { // Automatically use the official release version if we are performing a release // otherwise append '-SNAPSHOT' - project.version = '2.45.21' + project.version = '2.45.22' if (isLinkedin(project)) { project.ext.mavenGroupId = 'com.linkedin.beam' } diff --git a/gradle.properties b/gradle.properties index 2667e4b5ee4d..46bb2c87c3f6 100644 --- a/gradle.properties +++ b/gradle.properties @@ -30,8 +30,8 @@ signing.gnupg.useLegacyGpg=true # buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy. # To build a custom Beam version make sure you change it in both places, see # https://github.com/apache/beam/issues/21302. -version=2.45.21 -sdk_version=2.45.21 +version=2.45.22 +sdk_version=2.45.22 javaVersion=1.8 diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineLifeCycleListener.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineLifeCycleListener.java index 47f36a229ac1..643f0801f850 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineLifeCycleListener.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineLifeCycleListener.java @@ -17,13 +17,13 @@ */ package org.apache.beam.runners.samza; -import org.apache.samza.config.Config; +import java.util.Map; import org.apache.samza.context.ExternalContext; /** Life cycle listener for a Samza pipeline during runtime. */ public interface SamzaPipelineLifeCycleListener { /** Callback when the pipeline options is created. */ - void onInit(Config config, SamzaPipelineOptions options); + void onInit(Map config, SamzaPipelineOptions options); /** Callback when the pipeline is started. */ ExternalContext onStart(); diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java index 1817d6162bb8..b2c4b91b2525 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java @@ -49,6 +49,7 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators; import org.apache.samza.application.StreamApplication; import org.apache.samza.config.Config; +import org.apache.samza.config.MapConfig; import org.apache.samza.context.ExternalContext; import org.apache.samza.metrics.MetricsReporter; import org.apache.samza.metrics.MetricsReporterFactory; @@ -99,13 +100,14 @@ public PortablePipelineResult runPortablePipeline(RunnerApi.Pipeline pipeline, J configBuilder.put(BEAM_PIPELINE_PROTO, Base64Serializer.serializeUnchecked(pipeline)); configBuilder.put(BEAM_JOB_INFO, Base64Serializer.serializeUnchecked(jobInfo)); - final Config config = configBuilder.build(); - options.setConfigOverride(config); - + final Map config = configBuilder.build(); if (listener != null) { listener.onInit(config, options); } + final Config samzaConfig = new MapConfig(config); + options.setConfigOverride(samzaConfig); + final SamzaExecutionContext executionContext = new SamzaExecutionContext(options); final Map reporterFactories = getMetricsReporters(); final StreamApplication app = @@ -117,8 +119,8 @@ public PortablePipelineResult runPortablePipeline(RunnerApi.Pipeline pipeline, J pipeline, new PortableTranslationContext(appDescriptor, options, jobInfo)); }; - ApplicationRunner runner = runSamzaApp(app, config); - return new SamzaPortablePipelineResult(app, runner, executionContext, listener, config); + ApplicationRunner runner = runSamzaApp(app, samzaConfig); + return new SamzaPortablePipelineResult(app, runner, executionContext, listener, samzaConfig); } @Override @@ -153,13 +155,14 @@ public SamzaPipelineResult run(Pipeline pipeline) { configBuilder.put(BEAM_DOT_GRAPH, dotGraph); configBuilder.put(BEAM_JSON_GRAPH, jsonGraph); - final Config config = configBuilder.build(); - options.setConfigOverride(config); - + final Map config = configBuilder.build(); if (listener != null) { listener.onInit(config, options); } + final Config samzaConfig = new MapConfig(config); + options.setConfigOverride(samzaConfig); + final SamzaExecutionContext executionContext = new SamzaExecutionContext(options); final Map reporterFactories = getMetricsReporters(); @@ -175,8 +178,8 @@ public SamzaPipelineResult run(Pipeline pipeline) { // perform a final round of validation for the pipeline options now that all configs are // generated SamzaPipelineOptionsValidator.validate(options); - ApplicationRunner runner = runSamzaApp(app, config); - return new SamzaPipelineResult(runner, executionContext, listener, config); + ApplicationRunner runner = runSamzaApp(app, samzaConfig); + return new SamzaPipelineResult(runner, executionContext, listener, samzaConfig); } private Map getMetricsReporters() { diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java index 1d02102a1920..f5ed34637a8f 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java @@ -87,7 +87,7 @@ public void putAll(Map properties) { } /** @return built configuration */ - public Config build() { + public Map build() { try { // apply framework configs config.putAll(createSystemConfig(options, config)); @@ -111,7 +111,7 @@ public Config build() { validateConfigs(options, config); - return new MapConfig(config); + return new HashMap<>(config); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/translation/ConfigGeneratorTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/translation/ConfigGeneratorTest.java index 601831845e83..45c23c5579b8 100644 --- a/runners/samza/src/test/java/org/apache/beam/runners/samza/translation/ConfigGeneratorTest.java +++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/translation/ConfigGeneratorTest.java @@ -44,7 +44,6 @@ import org.apache.beam.sdk.values.TypeDescriptors; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps; -import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; import org.apache.samza.config.JobCoordinatorConfig; import org.apache.samza.config.TaskConfig; @@ -81,7 +80,7 @@ public void testStatefulBeamStoreConfig() { final ConfigContext configCtx = new ConfigContext(idMap, nonUniqueStateIds, options); final ConfigBuilder configBuilder = new ConfigBuilder(options); SamzaPipelineTranslator.createConfig(pipeline, configCtx, configBuilder); - final Config config = configBuilder.build(); + final Map config = configBuilder.build(); assertEquals( RocksDbKeyValueStorageEngineFactory.class.getName(), @@ -92,7 +91,7 @@ public void testStatefulBeamStoreConfig() { options.setStateDurable(true); SamzaPipelineTranslator.createConfig(pipeline, configCtx, configBuilder); - final Config config2 = configBuilder.build(); + final Map config2 = configBuilder.build(); assertEquals( "TestStoreConfig-1-beamStore-changelog", config2.get("stores.beamStore.changelog")); } @@ -113,7 +112,7 @@ public void testStatelessBeamStoreConfig() { final ConfigContext configCtx = new ConfigContext(idMap, nonUniqueStateIds, options); final ConfigBuilder configBuilder = new ConfigBuilder(options); SamzaPipelineTranslator.createConfig(pipeline, configCtx, configBuilder); - final Config config = configBuilder.build(); + final Map config = configBuilder.build(); assertEquals( InMemoryKeyValueStorageEngineFactory.class.getName(), @@ -124,7 +123,7 @@ public void testStatelessBeamStoreConfig() { options.setStateDurable(true); SamzaPipelineTranslator.createConfig(pipeline, configCtx, configBuilder); - final Config config2 = configBuilder.build(); + final Map config2 = configBuilder.build(); // For stateless jobs, ignore state durable pipeline option. assertNull(config2.get("stores.beamStore.changelog")); } @@ -146,7 +145,7 @@ public void testSamzaLocalExecutionEnvironmentConfig() { final ConfigContext configCtx = new ConfigContext(idMap, nonUniqueStateIds, options); final ConfigBuilder configBuilder = new ConfigBuilder(options); SamzaPipelineTranslator.createConfig(pipeline, configCtx, configBuilder); - final Config config = configBuilder.build(); + final Map config = configBuilder.build(); assertTrue( Maps.difference(config, ConfigBuilder.localRunConfig()).entriesOnlyOnRight().isEmpty()); @@ -177,7 +176,7 @@ public void testSamzaYarnExecutionEnvironmentConfig() { final ConfigBuilder configBuilder = new ConfigBuilder(options); SamzaPipelineTranslator.createConfig(pipeline, configCtx, configBuilder); try { - Config config = configBuilder.build(); + final Map config = configBuilder.build(); assertEquals(config.get(APP_RUNNER_CLASS), RemoteApplicationRunner.class.getName()); assertEquals(config.get(JOB_FACTORY_CLASS), YarnJobFactory.class.getName()); } catch (IllegalArgumentException e) { @@ -209,7 +208,7 @@ public void testSamzaStandAloneExecutionEnvironmentConfig() { final ConfigBuilder configBuilder = new ConfigBuilder(options); SamzaPipelineTranslator.createConfig(pipeline, configCtx, configBuilder); try { - Config config = configBuilder.build(); + final Map config = configBuilder.build(); assertEquals(config.get(APP_RUNNER_CLASS), LocalApplicationRunner.class.getName()); assertEquals( config.get(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY), @@ -252,7 +251,7 @@ public void processElement( final ConfigBuilder configBuilder = new ConfigBuilder(options); SamzaPipelineTranslator.createConfig(pipeline, configCtx, configBuilder); - final Config config = configBuilder.build(); + final Map config = configBuilder.build(); assertEquals( RocksDbKeyValueStorageEngineFactory.class.getName(), @@ -263,7 +262,7 @@ public void processElement( options.setStateDurable(true); SamzaPipelineTranslator.createConfig(pipeline, configCtx, configBuilder); - final Config config2 = configBuilder.build(); + final Map config2 = configBuilder.build(); assertEquals( "TestStoreConfig-1-testState-changelog", config2.get("stores.testState.changelog")); } @@ -312,7 +311,7 @@ public void processElement( final ConfigContext configCtx = new ConfigContext(idMap, nonUniqueStateIds, options); final ConfigBuilder configBuilder = new ConfigBuilder(options); SamzaPipelineTranslator.createConfig(pipeline, configCtx, configBuilder); - final Config config = configBuilder.build(); + final Map config = configBuilder.build(); assertEquals( RocksDbKeyValueStorageEngineFactory.class.getName(), @@ -330,7 +329,7 @@ public void processElement( options.setStateDurable(true); SamzaPipelineTranslator.createConfig(pipeline, configCtx, configBuilder); - final Config config2 = configBuilder.build(); + final Map config2 = configBuilder.build(); assertEquals( "TestStoreConfig-1-testState-First_stateful_ParDo-changelog", config2.get("stores.testState-First_stateful_ParDo.changelog")); @@ -384,7 +383,7 @@ public void processElement( final ConfigBuilder configBuilder = new ConfigBuilder(options); SamzaPipelineTranslator.createConfig(pipeline, configCtx, configBuilder); - final Config config = configBuilder.build(); + final Map config = configBuilder.build(); assertEquals( RocksDbKeyValueStorageEngineFactory.class.getName(), @@ -406,7 +405,7 @@ public void processElement( options.setStateDurable(true); SamzaPipelineTranslator.createConfig(pipeline, configCtx, configBuilder); - final Config config2 = configBuilder.build(); + final Map config2 = configBuilder.build(); assertEquals( "TestStoreConfig-1-testState-Same_stateful_ParDo_Name-changelog", config2.get("stores.testState-Same_stateful_ParDo_Name.changelog"));