From 217feb164956afb04ee9499822fa62ab6711ec8e Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Tue, 17 Dec 2024 05:12:20 -0800 Subject: [PATCH] fix GlobalWindows + keyedwindows --- ...ultiplexingGroupByKeyTransformMatcher.java | 5 +- .../internal/StateMultiplexingGroupByKey.java | 9 +- .../transforms/windowing}/KeyedWindow.java | 85 +++++++++---------- .../beam/sdk/transforms/windowing/Window.java | 15 +++- 4 files changed, 58 insertions(+), 56 deletions(-) rename {runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal => sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing}/KeyedWindow.java (66%) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StateMultiplexingGroupByKeyTransformMatcher.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StateMultiplexingGroupByKeyTransformMatcher.java index 06953a0928d5..df4aca80d6ea 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StateMultiplexingGroupByKeyTransformMatcher.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StateMultiplexingGroupByKeyTransformMatcher.java @@ -36,13 +36,10 @@ class StateMultiplexingGroupByKeyTransformMatcher implements PTransformMatcher { @Override public boolean matches(AppliedPTransform application) { - LOG.info(application.getFullName()); if (!(application.getTransform() instanceof GroupByKey)) { - LOG.info(application.getFullName() + " returning false"); return false; } for (PCollection pCollection : application.getMainInputs().values()) { - LOG.info(application.getFullName() + " " + pCollection.getCoder()); Coder coder = pCollection.getCoder(); if (!(coder instanceof KvCoder)) { return false; @@ -52,7 +49,7 @@ public boolean matches(AppliedPTransform application) { return false; } } - LOG.info(application.getFullName() + " returning true"); + LOG.info("Enabling State Multiplexing on {}", application.getFullName()); return true; } diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/StateMultiplexingGroupByKey.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/StateMultiplexingGroupByKey.java index 831d9493af82..78ad6635813e 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/StateMultiplexingGroupByKey.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/StateMultiplexingGroupByKey.java @@ -21,7 +21,6 @@ import java.util.Arrays; import java.util.Collections; import org.apache.beam.runners.dataflow.DataflowRunner; -import org.apache.beam.runners.dataflow.internal.KeyedWindow.KeyedWindowFn; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.runners.dataflow.util.ByteStringCoder; import org.apache.beam.sdk.coders.Coder; @@ -36,6 +35,8 @@ import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.KeyedWindow; +import org.apache.beam.sdk.transforms.windowing.KeyedWindow.KeyedWindowFn; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.transforms.windowing.WindowFn; @@ -194,7 +195,7 @@ public KV> apply(KV> kv) { PCollection>> smallKeyBranch = mapKeysToBytes .get(smallKeys) - .apply(Window.into(new KeyedWindowFn<>(originalWindowFn))) + .apply(Window.into(new KeyedWindowFn<>(ByteStringCoder.of(), originalWindowFn))) .apply( "MapKeysToVirtualKeys", MapElements.via( @@ -215,13 +216,13 @@ public KV apply(KV value) { new DoFn>, KV>>() { @ProcessElement public void processElement(ProcessContext c, BoundedWindow w, PaneInfo pane) { - ByteString key = ((KeyedWindow) w).getKey(); + ByteString key = ((KeyedWindow) w).getKey(); try { // is it correct to use the pane from Keyed window here? c.outputWindowedValue( KV.of(keyCoder.decode(key.newInput()), c.element().getValue()), c.timestamp(), - Collections.singleton(((KeyedWindow) w).getWindow()), + Collections.singleton(((KeyedWindow) w).getWindow()), pane); } catch (IOException e) { throw new RuntimeException(e); diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/KeyedWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/KeyedWindow.java similarity index 66% rename from runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/KeyedWindow.java rename to sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/KeyedWindow.java index 0268d50025b1..7c6b0c1105c6 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/KeyedWindow.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/KeyedWindow.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.runners.dataflow.internal; +package org.apache.beam.sdk.transforms.windowing; import java.io.IOException; import java.io.InputStream; @@ -27,33 +27,27 @@ import java.util.Map.Entry; import java.util.Objects; import java.util.stream.Collectors; -import org.apache.beam.runners.dataflow.util.ByteStringCoder; +import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.IncompatibleWindowException; -import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn; -import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.transforms.windowing.WindowMappingFn; import org.apache.beam.sdk.values.KV; -import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; import org.checkerframework.checker.nullness.qual.NonNull; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Instant; -public class KeyedWindow extends BoundedWindow { +@Internal +public class KeyedWindow extends BoundedWindow { - private final ByteString key; + private final K key; private final W window; - public KeyedWindow(ByteString name, W window) { - this.key = name; + public KeyedWindow(K key, W window) { + this.key = key; this.window = window; } - public ByteString getKey() { + public K getKey() { return key; } @@ -82,7 +76,7 @@ public boolean equals(@Nullable Object o) { if (!(o instanceof KeyedWindow)) { return false; } - KeyedWindow that = (KeyedWindow) o; + KeyedWindow that = (KeyedWindow) o; return Objects.equals(key, that.key) && Objects.equals(window, that.window); } @@ -91,18 +85,25 @@ public int hashCode() { return Objects.hash(key, window); } - public static class KeyedWindowFn - extends WindowFn, KeyedWindow> { + @Internal + public static class KeyedWindowFn + extends WindowFn, KeyedWindow> { private final WindowFn windowFn; + private final Coder keyCoder; - public KeyedWindowFn(WindowFn windowFn) { + public KeyedWindowFn(Coder keyCoder, WindowFn windowFn) { + this.keyCoder = keyCoder; this.windowFn = (WindowFn) windowFn; } + public WindowFn getInnerWindowFn() { + return windowFn; + } + @Override - public Collection> assignWindows( - WindowFn, KeyedWindow>.AssignContext c) throws Exception { + public Collection> assignWindows( + WindowFn, KeyedWindow>.AssignContext c) throws Exception { return windowFn .assignWindows( @@ -129,11 +130,12 @@ public BoundedWindow window() { } @Override - public void mergeWindows(WindowFn, KeyedWindow>.MergeContext c) throws Exception { + public void mergeWindows(WindowFn, KeyedWindow>.MergeContext c) + throws Exception { if (windowFn instanceof NonMergingWindowFn) { return; } - HashMap> keyToWindow = new HashMap<>(); + HashMap> keyToWindow = new HashMap<>(); c.windows() .forEach( keyedWindow -> { @@ -141,8 +143,8 @@ public void mergeWindows(WindowFn, KeyedWindow>.MergeContext c) thro keyToWindow.computeIfAbsent(keyedWindow.getKey(), k -> new ArrayList<>()); windows.add(keyedWindow.getWindow()); }); - for (Entry> entry : keyToWindow.entrySet()) { - ByteString key = entry.getKey(); + for (Entry> entry : keyToWindow.entrySet()) { + K key = entry.getKey(); List windows = entry.getValue(); windowFn.mergeWindows( new WindowFn.MergeContext() { @@ -153,7 +155,7 @@ public Collection windows() { @Override public void merge(Collection toBeMerged, W mergeResult) throws Exception { - List> toMergedKeyedWindows = + List> toMergedKeyedWindows = toBeMerged.stream() .map(window -> new KeyedWindow<>(key, window)) .collect(Collectors.toList()); @@ -170,22 +172,13 @@ public boolean isCompatible(WindowFn other) { } @Override - public Coder> windowCoder() { - return new KeyedWindowCoder<>(windowFn.windowCoder()); + public Coder> windowCoder() { + return new KeyedWindowCoder<>(keyCoder, windowFn.windowCoder()); } @Override - public WindowMappingFn> getDefaultWindowMappingFn() { - return new WindowMappingFn>() { - @Override - public KeyedWindow getSideInputWindow(BoundedWindow mainWindow) { - Preconditions.checkArgument(mainWindow instanceof KeyedWindow); - KeyedWindow mainKeyedWindow = (KeyedWindow) mainWindow; - return new KeyedWindow<>( - mainKeyedWindow.getKey(), - windowFn.getDefaultWindowMappingFn().getSideInputWindow(mainKeyedWindow.getWindow())); - } - }; + public WindowMappingFn> getDefaultWindowMappingFn() { + throw new UnsupportedOperationException("KeyedWindow not supported with side inputs"); } @Override @@ -211,23 +204,25 @@ public void populateDisplayData(DisplayData.Builder builder) { } } - public static class KeyedWindowCoder extends Coder> { + @Internal + public static class KeyedWindowCoder + extends Coder> { - private final KvCoder coder; + private final KvCoder coder; - public KeyedWindowCoder(Coder windowCoder) { + public KeyedWindowCoder(Coder keyCoder, Coder windowCoder) { // :TODO consider swapping the order for improved state locality - this.coder = KvCoder.of(ByteStringCoder.of(), windowCoder); + this.coder = KvCoder.of(keyCoder, windowCoder); } @Override - public void encode(KeyedWindow value, OutputStream outStream) throws IOException { + public void encode(KeyedWindow value, OutputStream outStream) throws IOException { coder.encode(KV.of(value.getKey(), value.getWindow()), outStream); } @Override - public KeyedWindow decode(InputStream inStream) throws IOException { - KV decode = coder.decode(inStream); + public KeyedWindow decode(InputStream inStream) throws IOException { + KV decode = coder.decode(inStream); return new KeyedWindow<>(decode.getKey(), decode.getValue()); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java index 2aa18fa5ab5b..7ebf03ec86e0 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java @@ -25,6 +25,7 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.windowing.KeyedWindow.KeyedWindowFn; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionList; import org.apache.beam.sdk.values.WindowingStrategy; @@ -214,6 +215,7 @@ public static Window configure() { @AutoValue.Builder abstract static class Builder { + abstract Builder setWindowFn(WindowFn windowFn); abstract Builder setTrigger(Trigger trigger); @@ -352,12 +354,17 @@ private void applicableTo(PCollection input) { WindowingStrategy outputStrategy = getOutputStrategyInternal(input.getWindowingStrategy()); + boolean isGlobalWindow = (outputStrategy.getWindowFn() instanceof GlobalWindows); + if (outputStrategy.getWindowFn() instanceof KeyedWindow.KeyedWindowFn) { + isGlobalWindow = + (((KeyedWindowFn) outputStrategy.getWindowFn()).getInnerWindowFn() + instanceof GlobalWindows); + } + // Make sure that the windowing strategy is complete & valid. if (outputStrategy.isTriggerSpecified() && !(outputStrategy.getTrigger() instanceof DefaultTrigger) - && !(outputStrategy.getWindowFn() instanceof GlobalWindows) - // :TODO Add proper logic for Keyed Window here - && !(outputStrategy.getWindowFn().getClass().getName().contains("KeyedWindow")) + && !(isGlobalWindow) && !outputStrategy.isAllowedLatenessSpecified()) { throw new IllegalArgumentException( "Except when using GlobalWindows," @@ -456,6 +463,7 @@ protected String getKindString() { * Pipeline authors should use {@link Window} directly instead. */ public static class Assign extends PTransform, PCollection> { + private final @Nullable Window original; private final WindowingStrategy updatedStrategy; @@ -506,6 +514,7 @@ public static Remerge remerge() { * again as part of the next {@link org.apache.beam.sdk.transforms.GroupByKey}. */ private static class Remerge extends PTransform, PCollection> { + @Override public PCollection expand(PCollection input) { return input