From 76f471d140dfc24074e3aaf05d97af2c47f35c0f Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Tue, 17 Dec 2024 00:04:11 -0800 Subject: [PATCH] fix GlobalWindows + IdentityWindows --- .../beam/sdk/transforms/windowing/Window.java | 2 ++ .../org/apache/beam/sdk/util/IdentityWindowFn.java | 14 ++++++++------ 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java index 32e218d3cb11..2aa18fa5ab5b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java @@ -356,6 +356,8 @@ private void applicableTo(PCollection input) { if (outputStrategy.isTriggerSpecified() && !(outputStrategy.getTrigger() instanceof DefaultTrigger) && !(outputStrategy.getWindowFn() instanceof GlobalWindows) + // :TODO Add proper logic for Keyed Window here + && !(outputStrategy.getWindowFn().getClass().getName().contains("KeyedWindow")) && !outputStrategy.isAllowedLatenessSpecified()) { throw new IllegalArgumentException( "Except when using GlobalWindows," diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java index ca78dc60f2fe..a3d802cc740b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java @@ -70,12 +70,14 @@ public Collection assignWindows(WindowFn.Assign @Override public boolean isCompatible(WindowFn other) { - throw new UnsupportedOperationException( - String.format( - "%s.isCompatible() should never be called." - + " It is a private implementation detail of sdk utilities." - + " This message indicates a bug in the Beam SDK.", - getClass().getCanonicalName())); + // :TODO anything else to consider here? + // throw new UnsupportedOperationException( + // String.format( + // "%s.isCompatible() should never be called." + // + " It is a private implementation detail of sdk utilities." + // + " This message indicates a bug in the Beam SDK.", + // getClass().getCanonicalName())); + return other instanceof IdentityWindowFn; } @Override