From 10074a9313316238baa877bfa8b91a29bb642ac4 Mon Sep 17 00:00:00 2001 From: Xinyu Liu Date: Wed, 31 Jul 2024 11:44:05 -0700 Subject: [PATCH] Fix the repeatively calling toProto() when creating ReduceFniRunner (#122) * Fix the repeatively calling toProto() when creating ReduceFniRunner * Update beam version * Fix avro plugin repo --------- Co-authored-by: Xinyu Liu --- buildSrc/build.gradle.kts | 7 ++++++- .../org/apache/beam/gradle/BeamModulePlugin.groovy | 2 +- gradle.properties | 4 ++-- .../core/GroupAlsoByWindowViaWindowSetNewDoFn.java | 9 ++++++--- .../beam/runners/core/metrics/DistributionData.java | 1 - 5 files changed, 15 insertions(+), 8 deletions(-) diff --git a/buildSrc/build.gradle.kts b/buildSrc/build.gradle.kts index 9d2f8f08169c..e373fb2efd09 100644 --- a/buildSrc/build.gradle.kts +++ b/buildSrc/build.gradle.kts @@ -30,6 +30,11 @@ repositories { url = uri("https://repo.spring.io/plugins-release/") content { includeGroup("io.spring.gradle") } } + // For obsolete Avro plugin + maven { + url = uri("https://jitpack.io") + content { includeGroup("com.github.davidmc24.gradle-avro-plugin") } + } } // Dependencies on other plugins used when this plugin is invoked @@ -40,7 +45,7 @@ dependencies { implementation("com.github.spotbugs.snom:spotbugs-gradle-plugin:5.0.3") runtimeOnly("com.google.protobuf:protobuf-gradle-plugin:0.8.13") // Enable proto code generation - runtimeOnly("com.commercehub.gradle.plugin:gradle-avro-plugin:0.11.0") // Enable Avro code generation + runtimeOnly("com.github.davidmc24.gradle-avro-plugin:gradle-avro-plugin:0.16.0") // Enable Avro code generation runtimeOnly("com.diffplug.spotless:spotless-plugin-gradle:5.6.1") // Enable a code formatting plugin runtimeOnly("gradle.plugin.com.palantir.gradle.docker:gradle-docker:0.22.0") // Enable building Docker containers runtimeOnly("gradle.plugin.com.dorongold.plugins:task-tree:1.5") // Adds a 'taskTree' task to print task dependency tree diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 3e1f4ff6e036..8e51b6a749f1 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -398,7 +398,7 @@ class BeamModulePlugin implements Plugin { // Automatically use the official release version if we are performing a release // otherwise append '-SNAPSHOT' - project.version = '2.45.23' + project.version = '2.45.24' if (isLinkedin(project)) { project.ext.mavenGroupId = 'com.linkedin.beam' } diff --git a/gradle.properties b/gradle.properties index 0126fa707469..440fcc3051f1 100644 --- a/gradle.properties +++ b/gradle.properties @@ -30,8 +30,8 @@ signing.gnupg.useLegacyGpg=true # buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy. # To build a custom Beam version make sure you change it in both places, see # https://github.com/apache/beam/issues/21302. -version=2.45.23 -sdk_version=2.45.23 +version=2.45.24 +sdk_version=2.45.24 javaVersion=1.8 diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java index 5a0a85433d53..cc3f41a62470 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java @@ -68,6 +68,7 @@ DoFn, KV> create( private transient SideInputReader sideInputReader; private transient DoFnRunners.OutputManager outputManager; private TupleTag> mainTag; + private ExecutableTriggerStateMachine triggerStateMachine; public GroupAlsoByWindowViaWindowSetNewDoFn( WindowingStrategy windowingStrategy, @@ -86,6 +87,10 @@ public GroupAlsoByWindowViaWindowSetNewDoFn( this.windowingStrategy = noWildcard; this.reduceFn = reduceFn; this.stateInternalsFactory = stateInternalsFactory; + this.triggerStateMachine = + ExecutableTriggerStateMachine.create( + TriggerStateMachines.stateMachineForTrigger( + TriggerTranslation.toProto(windowingStrategy.getTrigger()))); } private OutputWindowedValue> outputWindowedValue() { @@ -123,9 +128,7 @@ public void processElement(ProcessContext c) throws Exception { new ReduceFnRunner<>( key, windowingStrategy, - ExecutableTriggerStateMachine.create( - TriggerStateMachines.stateMachineForTrigger( - TriggerTranslation.toProto(windowingStrategy.getTrigger()))), + triggerStateMachine, stateInternals, timerInternals, outputWindowedValue(), diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DistributionData.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DistributionData.java index bbbab400abf9..85124152dff4 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DistributionData.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DistributionData.java @@ -36,7 +36,6 @@ import org.apache.datasketches.quantiles.UpdateDoublesSketch; import org.checkerframework.checker.nullness.qual.Nullable; - /** * Data describing the the distribution. This should retain enough detail that it can be combined * with other {@link DistributionData}.