From 69096ea90a7bf5c8b3fee280552a3a7c4496da4f Mon Sep 17 00:00:00 2001 From: Jan Lukavsky Date: Wed, 13 Mar 2024 10:58:22 +0100 Subject: [PATCH] [runners-flink] #30621 use groupBy for Reshuffle in batch --- runners/flink/flink_runner.gradle | 2 -- .../flink/FlinkBatchTransformTranslators.java | 17 +++++++++-------- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/runners/flink/flink_runner.gradle b/runners/flink/flink_runner.gradle index 1d91284a3d16..3bcbfdca0290 100644 --- a/runners/flink/flink_runner.gradle +++ b/runners/flink/flink_runner.gradle @@ -192,13 +192,11 @@ dependencies { if (flink_version.compareTo("1.14") >= 0) { implementation "org.apache.flink:flink-runtime:$flink_version" - implementation "org.apache.flink:flink-optimizer:$flink_version" implementation "org.apache.flink:flink-metrics-core:$flink_version" testImplementation "org.apache.flink:flink-runtime:$flink_version:tests" testImplementation "org.apache.flink:flink-rpc-akka:$flink_version" } else { implementation "org.apache.flink:flink-runtime_2.12:$flink_version" - implementation "org.apache.flink:flink-optimizer_2.12:$flink_version" testImplementation "org.apache.flink:flink-runtime_2.12:$flink_version:tests" } testImplementation project(path: ":sdks:java:core", configuration: "shadowTest") diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java index 8f37178d24f1..383730daeef8 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java @@ -94,8 +94,6 @@ import org.apache.flink.api.java.operators.MapOperator; import org.apache.flink.api.java.operators.SingleInputUdfOperator; import org.apache.flink.api.java.operators.UnsortedGrouping; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.optimizer.Optimizer; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Instant; @@ -413,12 +411,15 @@ public void translateNode( outputType, FlinkIdentityFunction.of(), getCurrentTransformName(context)); - final Configuration partitionOptions = new Configuration(); - partitionOptions.setString( - Optimizer.HINT_SHIP_STRATEGY, Optimizer.HINT_SHIP_STRATEGY_REPARTITION); - context.setOutputDataSet( - context.getOutput(transform), - retypedDataSet.map(FlinkIdentityFunction.of()).withParameters(partitionOptions)); + WindowedValue.WindowedValueCoder> kvWvCoder = + (WindowedValue.WindowedValueCoder>) outputType.getCoder(); + KvCoder kvCoder = (KvCoder) kvWvCoder.getValueCoder(); + DataSet>> reshuffle = + retypedDataSet + .groupBy(new KvKeySelector<>(kvCoder.getKeyCoder())) + .>>reduceGroup((i, c) -> i.forEach(c::collect)) + .returns(outputType); + context.setOutputDataSet(context.getOutput(transform), reshuffle); } }