Skip to content

Commit

Permalink
fix GlobalWindows + IdentityWindows
Browse files Browse the repository at this point in the history
  • Loading branch information
arunpandianp committed Dec 17, 2024
1 parent b15fe35 commit 76f471d
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,8 @@ private void applicableTo(PCollection<?> input) {
if (outputStrategy.isTriggerSpecified()
&& !(outputStrategy.getTrigger() instanceof DefaultTrigger)
&& !(outputStrategy.getWindowFn() instanceof GlobalWindows)
// :TODO Add proper logic for Keyed Window here
&& !(outputStrategy.getWindowFn().getClass().getName().contains("KeyedWindow"))
&& !outputStrategy.isAllowedLatenessSpecified()) {
throw new IllegalArgumentException(
"Except when using GlobalWindows,"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,14 @@ public Collection<BoundedWindow> assignWindows(WindowFn<T, BoundedWindow>.Assign

@Override
public boolean isCompatible(WindowFn<?, ?> other) {
throw new UnsupportedOperationException(
String.format(
"%s.isCompatible() should never be called."
+ " It is a private implementation detail of sdk utilities."
+ " This message indicates a bug in the Beam SDK.",
getClass().getCanonicalName()));
// :TODO anything else to consider here?
// throw new UnsupportedOperationException(
// String.format(
// "%s.isCompatible() should never be called."
// + " It is a private implementation detail of sdk utilities."
// + " This message indicates a bug in the Beam SDK.",
// getClass().getCanonicalName()));
return other instanceof IdentityWindowFn;
}

@Override
Expand Down

0 comments on commit 76f471d

Please sign in to comment.