Skip to content

Commit

Permalink
fix GlobalWindows + keyedwindows
Browse files Browse the repository at this point in the history
  • Loading branch information
arunpandianp committed Dec 17, 2024
1 parent 76f471d commit 217feb1
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,10 @@ class StateMultiplexingGroupByKeyTransformMatcher implements PTransformMatcher {

@Override
public boolean matches(AppliedPTransform<?, ?, ?> application) {
LOG.info(application.getFullName());
if (!(application.getTransform() instanceof GroupByKey)) {
LOG.info(application.getFullName() + " returning false");
return false;
}
for (PCollection<?> pCollection : application.getMainInputs().values()) {
LOG.info(application.getFullName() + " " + pCollection.getCoder());
Coder<?> coder = pCollection.getCoder();
if (!(coder instanceof KvCoder)) {
return false;
Expand All @@ -52,7 +49,7 @@ public boolean matches(AppliedPTransform<?, ?, ?> application) {
return false;
}
}
LOG.info(application.getFullName() + " returning true");
LOG.info("Enabling State Multiplexing on {}", application.getFullName());
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.util.Arrays;
import java.util.Collections;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.internal.KeyedWindow.KeyedWindowFn;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.util.ByteStringCoder;
import org.apache.beam.sdk.coders.Coder;
Expand All @@ -36,6 +35,8 @@
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.KeyedWindow;
import org.apache.beam.sdk.transforms.windowing.KeyedWindow.KeyedWindowFn;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
Expand Down Expand Up @@ -194,7 +195,7 @@ public KV<K, Iterable<V>> apply(KV<ByteString, Iterable<V>> kv) {
PCollection<KV<K, Iterable<V>>> smallKeyBranch =
mapKeysToBytes
.get(smallKeys)
.apply(Window.into(new KeyedWindowFn<>(originalWindowFn)))
.apply(Window.into(new KeyedWindowFn<>(ByteStringCoder.of(), originalWindowFn)))
.apply(
"MapKeysToVirtualKeys",
MapElements.via(
Expand All @@ -215,13 +216,13 @@ public KV<Integer, V> apply(KV<ByteString, V> value) {
new DoFn<KV<Integer, Iterable<V>>, KV<K, Iterable<V>>>() {
@ProcessElement
public void processElement(ProcessContext c, BoundedWindow w, PaneInfo pane) {
ByteString key = ((KeyedWindow<?>) w).getKey();
ByteString key = ((KeyedWindow<ByteString, ?>) w).getKey();
try {
// is it correct to use the pane from Keyed window here?
c.outputWindowedValue(
KV.of(keyCoder.decode(key.newInput()), c.element().getValue()),
c.timestamp(),
Collections.singleton(((KeyedWindow<?>) w).getWindow()),
Collections.singleton(((KeyedWindow<ByteString, ?>) w).getWindow()),
pane);
} catch (IOException e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.internal;
package org.apache.beam.sdk.transforms.windowing;

import java.io.IOException;
import java.io.InputStream;
Expand All @@ -27,33 +27,27 @@
import java.util.Map.Entry;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.beam.runners.dataflow.util.ByteStringCoder;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IncompatibleWindowException;
import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Instant;

public class KeyedWindow<W extends @NonNull BoundedWindow> extends BoundedWindow {
@Internal
public class KeyedWindow<K, W extends @NonNull BoundedWindow> extends BoundedWindow {

private final ByteString key;
private final K key;
private final W window;

public KeyedWindow(ByteString name, W window) {
this.key = name;
public KeyedWindow(K key, W window) {
this.key = key;
this.window = window;
}

public ByteString getKey() {
public K getKey() {
return key;
}

Expand Down Expand Up @@ -82,7 +76,7 @@ public boolean equals(@Nullable Object o) {
if (!(o instanceof KeyedWindow)) {
return false;
}
KeyedWindow<?> that = (KeyedWindow<?>) o;
KeyedWindow<?, ?> that = (KeyedWindow<?, ?>) o;
return Objects.equals(key, that.key) && Objects.equals(window, that.window);
}

Expand All @@ -91,18 +85,25 @@ public int hashCode() {
return Objects.hash(key, window);
}

public static class KeyedWindowFn<K extends ByteString, V, W extends BoundedWindow>
extends WindowFn<KV<K, V>, KeyedWindow<W>> {
@Internal
public static class KeyedWindowFn<K, V, W extends BoundedWindow>
extends WindowFn<KV<K, V>, KeyedWindow<K, W>> {

private final WindowFn<V, W> windowFn;
private final Coder<K> keyCoder;

public KeyedWindowFn(WindowFn<?, ?> windowFn) {
public KeyedWindowFn(Coder<K> keyCoder, WindowFn<?, ?> windowFn) {
this.keyCoder = keyCoder;
this.windowFn = (WindowFn<V, W>) windowFn;
}

public WindowFn<V, W> getInnerWindowFn() {
return windowFn;
}

@Override
public Collection<KeyedWindow<W>> assignWindows(
WindowFn<KV<K, V>, KeyedWindow<W>>.AssignContext c) throws Exception {
public Collection<KeyedWindow<K, W>> assignWindows(
WindowFn<KV<K, V>, KeyedWindow<K, W>>.AssignContext c) throws Exception {

return windowFn
.assignWindows(
Expand All @@ -129,20 +130,21 @@ public BoundedWindow window() {
}

@Override
public void mergeWindows(WindowFn<KV<K, V>, KeyedWindow<W>>.MergeContext c) throws Exception {
public void mergeWindows(WindowFn<KV<K, V>, KeyedWindow<K, W>>.MergeContext c)
throws Exception {
if (windowFn instanceof NonMergingWindowFn) {
return;
}
HashMap<ByteString, List<W>> keyToWindow = new HashMap<>();
HashMap<K, List<W>> keyToWindow = new HashMap<>();
c.windows()
.forEach(
keyedWindow -> {
List<W> windows =
keyToWindow.computeIfAbsent(keyedWindow.getKey(), k -> new ArrayList<>());
windows.add(keyedWindow.getWindow());
});
for (Entry<ByteString, List<W>> entry : keyToWindow.entrySet()) {
ByteString key = entry.getKey();
for (Entry<K, List<W>> entry : keyToWindow.entrySet()) {
K key = entry.getKey();
List<W> windows = entry.getValue();
windowFn.mergeWindows(
new WindowFn<V, W>.MergeContext() {
Expand All @@ -153,7 +155,7 @@ public Collection<W> windows() {

@Override
public void merge(Collection<W> toBeMerged, W mergeResult) throws Exception {
List<KeyedWindow<W>> toMergedKeyedWindows =
List<KeyedWindow<K, W>> toMergedKeyedWindows =
toBeMerged.stream()
.map(window -> new KeyedWindow<>(key, window))
.collect(Collectors.toList());
Expand All @@ -170,22 +172,13 @@ public boolean isCompatible(WindowFn<?, ?> other) {
}

@Override
public Coder<KeyedWindow<W>> windowCoder() {
return new KeyedWindowCoder<>(windowFn.windowCoder());
public Coder<KeyedWindow<K, W>> windowCoder() {
return new KeyedWindowCoder<>(keyCoder, windowFn.windowCoder());
}

@Override
public WindowMappingFn<KeyedWindow<W>> getDefaultWindowMappingFn() {
return new WindowMappingFn<KeyedWindow<W>>() {
@Override
public KeyedWindow<W> getSideInputWindow(BoundedWindow mainWindow) {
Preconditions.checkArgument(mainWindow instanceof KeyedWindow);
KeyedWindow<W> mainKeyedWindow = (KeyedWindow<W>) mainWindow;
return new KeyedWindow<>(
mainKeyedWindow.getKey(),
windowFn.getDefaultWindowMappingFn().getSideInputWindow(mainKeyedWindow.getWindow()));
}
};
public WindowMappingFn<KeyedWindow<K, W>> getDefaultWindowMappingFn() {
throw new UnsupportedOperationException("KeyedWindow not supported with side inputs");
}

@Override
Expand All @@ -211,23 +204,25 @@ public void populateDisplayData(DisplayData.Builder builder) {
}
}

public static class KeyedWindowCoder<W extends BoundedWindow> extends Coder<KeyedWindow<W>> {
@Internal
public static class KeyedWindowCoder<K, W extends BoundedWindow>
extends Coder<KeyedWindow<K, W>> {

private final KvCoder<ByteString, W> coder;
private final KvCoder<K, W> coder;

public KeyedWindowCoder(Coder<W> windowCoder) {
public KeyedWindowCoder(Coder<K> keyCoder, Coder<W> windowCoder) {
// :TODO consider swapping the order for improved state locality
this.coder = KvCoder.of(ByteStringCoder.of(), windowCoder);
this.coder = KvCoder.of(keyCoder, windowCoder);
}

@Override
public void encode(KeyedWindow<W> value, OutputStream outStream) throws IOException {
public void encode(KeyedWindow<K, W> value, OutputStream outStream) throws IOException {
coder.encode(KV.of(value.getKey(), value.getWindow()), outStream);
}

@Override
public KeyedWindow<W> decode(InputStream inStream) throws IOException {
KV<ByteString, W> decode = coder.decode(inStream);
public KeyedWindow<K, W> decode(InputStream inStream) throws IOException {
KV<K, W> decode = coder.decode(inStream);
return new KeyedWindow<>(decode.getKey(), decode.getValue());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.KeyedWindow.KeyedWindowFn;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.WindowingStrategy;
Expand Down Expand Up @@ -214,6 +215,7 @@ public static <T> Window<T> configure() {

@AutoValue.Builder
abstract static class Builder<T> {

abstract Builder<T> setWindowFn(WindowFn<? super T, ?> windowFn);

abstract Builder<T> setTrigger(Trigger trigger);
Expand Down Expand Up @@ -352,12 +354,17 @@ private void applicableTo(PCollection<?> input) {
WindowingStrategy<?, ?> outputStrategy =
getOutputStrategyInternal(input.getWindowingStrategy());

boolean isGlobalWindow = (outputStrategy.getWindowFn() instanceof GlobalWindows);
if (outputStrategy.getWindowFn() instanceof KeyedWindow.KeyedWindowFn) {
isGlobalWindow =
(((KeyedWindowFn<?, ?, ?>) outputStrategy.getWindowFn()).getInnerWindowFn()
instanceof GlobalWindows);
}

// Make sure that the windowing strategy is complete & valid.
if (outputStrategy.isTriggerSpecified()
&& !(outputStrategy.getTrigger() instanceof DefaultTrigger)
&& !(outputStrategy.getWindowFn() instanceof GlobalWindows)
// :TODO Add proper logic for Keyed Window here
&& !(outputStrategy.getWindowFn().getClass().getName().contains("KeyedWindow"))
&& !(isGlobalWindow)
&& !outputStrategy.isAllowedLatenessSpecified()) {
throw new IllegalArgumentException(
"Except when using GlobalWindows,"
Expand Down Expand Up @@ -456,6 +463,7 @@ protected String getKindString() {
* Pipeline authors should use {@link Window} directly instead.
*/
public static class Assign<T> extends PTransform<PCollection<T>, PCollection<T>> {

private final @Nullable Window<T> original;
private final WindowingStrategy<T, ?> updatedStrategy;

Expand Down Expand Up @@ -506,6 +514,7 @@ public static <T> Remerge<T> remerge() {
* again as part of the next {@link org.apache.beam.sdk.transforms.GroupByKey}.
*/
private static class Remerge<T> extends PTransform<PCollection<T>, PCollection<T>> {

@Override
public PCollection<T> expand(PCollection<T> input) {
return input
Expand Down

0 comments on commit 217feb1

Please sign in to comment.