From ede9deab9b0bea0c235f8041c6e6c88050ff2e3e Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Fri, 6 Dec 2024 14:28:12 -0800 Subject: [PATCH 01/11] Add an experiment `enable_gbk_state_multiplexing` for multiplexing GroupByKey state over a fixed number of sharding keys. The number of sharding keys is fixed at 32k. --- .../beam/runners/dataflow/DataflowRunner.java | 17 + ...MultiplexingGroupByKeyOverrideFactory.java | 42 +++ .../internal/StateMultiplexingGroupByKey.java | 316 ++++++++++++++++++ runners/prism/java/build.gradle | 9 +- .../sdk/transforms/windowing/KeyedWindow.java | 269 +++++++++++++++ 5 files changed, 652 insertions(+), 1 deletion(-) create mode 100644 runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StateMultiplexingGroupByKeyOverrideFactory.java create mode 100644 runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/StateMultiplexingGroupByKey.java create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/KeyedWindow.java diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index ce99958c57fd..9f41ea138bd5 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -117,6 +117,7 @@ import org.apache.beam.sdk.transforms.Combine.GroupedValues; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.GroupIntoBatches; import org.apache.beam.sdk.transforms.Impulse; import org.apache.beam.sdk.transforms.PTransform; @@ -214,6 +215,8 @@ public class DataflowRunner extends PipelineRunner { 
"unsafely_attempt_to_process_unbounded_data_in_batch_mode"; private static final Logger LOG = LoggerFactory.getLogger(DataflowRunner.class); + private static final String EXPERIMENT_ENABLE_GBK_STATE_MULTIPLEXING = + "enable_gbk_state_multiplexing"; /** Provided configuration options. */ private final DataflowPipelineOptions options; @@ -596,6 +599,7 @@ protected DataflowRunner(DataflowPipelineOptions options) { private static class AlwaysCreateViaRead implements PTransformOverrideFactory, Create.Values> { + @Override public PTransformOverrideFactory.PTransformReplacement> getReplacementTransform( @@ -797,6 +801,12 @@ private List getOverrides(boolean streaming) { new RedistributeByKeyOverrideFactory())); if (streaming) { + if (DataflowRunner.hasExperiment(options, EXPERIMENT_ENABLE_GBK_STATE_MULTIPLEXING)) { + overridesBuilder.add( + PTransformOverride.of( + PTransformMatchers.classEqualTo(GroupByKey.class), + new StateMultiplexingGroupByKeyOverrideFactory<>())); + } // For update compatibility, always use a Read for Create in streaming mode. overridesBuilder .add( @@ -1180,6 +1190,7 @@ private List getDefaultArtifacts() { @VisibleForTesting static boolean isMultiLanguagePipeline(Pipeline pipeline) { class IsMultiLanguageVisitor extends PipelineVisitor.Defaults { + private boolean isMultiLanguage = false; private void performMultiLanguageTest(Node node) { @@ -1648,6 +1659,7 @@ private static EnvironmentInfo getEnvironmentInfoFromEnvironmentId( @AutoValue abstract static class EnvironmentInfo { + static EnvironmentInfo create( String environmentId, String containerUrl, List capabilities) { return new AutoValue_DataflowRunner_EnvironmentInfo( @@ -2105,6 +2117,7 @@ protected String getKindString() { } private static class StreamingPubsubSinkTranslators { + /** Rewrite {@link StreamingPubsubIOWrite} to the appropriate internal node. 
*/ static class StreamingPubsubIOWriteTranslator implements TransformTranslator { @@ -2161,6 +2174,7 @@ private static void translate( private static class SingleOutputExpandableTransformTranslator implements TransformTranslator { + @Override public void translate( External.SingleOutputExpandableTransform transform, TranslationContext context) { @@ -2178,6 +2192,7 @@ public void translate( private static class MultiOutputExpandableTransformTranslator implements TransformTranslator { + @Override public void translate( External.MultiOutputExpandableTransform transform, TranslationContext context) { @@ -2726,6 +2741,7 @@ static void verifyStateSupportForWindowingStrategy(WindowingStrategy strategy) { */ private static class DataflowPayloadTranslator implements TransformPayloadTranslator> { + @Override public String getUrn(PTransform transform) { return "dataflow_stub:" + transform.getClass().getName(); @@ -2750,6 +2766,7 @@ public RunnerApi.FunctionSpec translate( }) @AutoService(TransformPayloadTranslatorRegistrar.class) public static class DataflowTransformTranslator implements TransformPayloadTranslatorRegistrar { + @Override public Map, ? extends TransformPayloadTranslator> getTransformPayloadTranslators() { diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StateMultiplexingGroupByKeyOverrideFactory.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StateMultiplexingGroupByKeyOverrideFactory.java new file mode 100644 index 000000000000..468a0a95d77c --- /dev/null +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StateMultiplexingGroupByKeyOverrideFactory.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow; + +import org.apache.beam.runners.dataflow.internal.StateMultiplexingGroupByKey; +import org.apache.beam.sdk.runners.AppliedPTransform; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.util.construction.PTransformReplacements; +import org.apache.beam.sdk.util.construction.SingleInputOutputOverrideFactory; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; + +class StateMultiplexingGroupByKeyOverrideFactory + extends SingleInputOutputOverrideFactory< + PCollection>, PCollection>>, GroupByKey> { + + @Override + public PTransformReplacement>, PCollection>>> + getReplacementTransform( + AppliedPTransform< + PCollection>, PCollection>>, GroupByKey> + transform) { + return PTransformReplacement.of( + PTransformReplacements.getSingletonMainInput(transform), + StateMultiplexingGroupByKey.create(transform.getTransform().fewKeys())); + } +} diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/StateMultiplexingGroupByKey.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/StateMultiplexingGroupByKey.java new file mode 100644 index 000000000000..661652ce3453 --- /dev/null +++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/StateMultiplexingGroupByKey.java @@ -0,0 +1,316 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.internal; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.Coder.NonDeterministicException; +import org.apache.beam.sdk.coders.IterableCoder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.windowing.AfterWatermark.AfterWatermarkEarlyAndLate; +import org.apache.beam.sdk.transforms.windowing.AfterWatermark.FromEndOfWindow; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import 
org.apache.beam.sdk.transforms.windowing.DefaultTrigger; +import org.apache.beam.sdk.transforms.windowing.GlobalWindows; +import org.apache.beam.sdk.transforms.windowing.KeyedWindow; +import org.apache.beam.sdk.transforms.windowing.KeyedWindow.KeyedWindowFn; +import org.apache.beam.sdk.transforms.windowing.Never.NeverTrigger; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollection.IsBounded; +import org.apache.beam.sdk.values.PCollectionList; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; +import org.apache.beam.sdk.values.WindowingStrategy; +import org.apache.beam.vendor.grpc.v1p60p1.com.google.common.base.Preconditions; +import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString; +import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString.Output; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** + * A GroupByKey implementation that multiplexes many small user keys over a fixed set of sharding + * keys for reducing per key overhead. + */ +public class StateMultiplexingGroupByKey + extends PTransform>, PCollection>>> { + + /* + * Keys larger than this threshold will not be multiplexed. + */ + private static final int SMALL_KEY_BYTES_THRESHOLD = 4096; + private final boolean fewKeys; + private final int numShardingKeys; + + private StateMultiplexingGroupByKey(boolean fewKeys) { + // :TODO plumb fewKeys to DataflowGroupByKey + this.fewKeys = fewKeys; + // :TODO Make this configurable + this.numShardingKeys = 32 << 10; + } + + /** + * Returns a {@code GroupByKey} {@code PTransform}. 
+ * + * @param the type of the keys of the input and output {@code PCollection}s + * @param the type of the values of the input {@code PCollection} and the elements of the + * {@code Iterable}s in the output {@code PCollection} + */ + public static StateMultiplexingGroupByKey create(boolean fewKeys) { + return new StateMultiplexingGroupByKey<>(fewKeys); + } + + ///////////////////////////////////////////////////////////////////////////// + + public static void applicableTo(PCollection input) { + WindowingStrategy windowingStrategy = input.getWindowingStrategy(); + // Verify that the input PCollection is bounded, or that there is windowing/triggering being + // used. Without this, the watermark (at end of global window) will never be reached. + if (windowingStrategy.getWindowFn() instanceof GlobalWindows + && windowingStrategy.getTrigger() instanceof DefaultTrigger + && input.isBounded() != IsBounded.BOUNDED) { + throw new IllegalStateException( + "GroupByKey cannot be applied to non-bounded PCollection in the GlobalWindow without a" + + " trigger. Use a Window.into or Window.triggering transform prior to GroupByKey."); + } + + // Validate that the trigger does not finish before garbage collection time + if (!triggerIsSafe(windowingStrategy)) { + throw new IllegalArgumentException( + String.format( + "Unsafe trigger '%s' may lose data, did you mean to wrap it in" + + "`Repeatedly.forever(...)`?%nSee " + + "https://s.apache.org/finishing-triggers-drop-data " + + "for details.", + windowingStrategy.getTrigger())); + } + } + + @Override + public void validate( + @Nullable PipelineOptions options, + Map, PCollection> inputs, + Map, PCollection> outputs) { + PCollection input = Iterables.getOnlyElement(inputs.values()); + KvCoder inputCoder = getInputKvCoder(input.getCoder()); + + // Ensure that the output coder key and value types aren't different. 
+ Coder outputCoder = Iterables.getOnlyElement(outputs.values()).getCoder(); + KvCoder expectedOutputCoder = getOutputKvCoder(inputCoder); + if (!expectedOutputCoder.equals(outputCoder)) { + throw new IllegalStateException( + String.format( + "the GroupByKey requires its output coder to be %s but found %s.", + expectedOutputCoder, outputCoder)); + } + } + + // Note that Never trigger finishes *at* GC time so it is OK, and + // AfterWatermark.fromEndOfWindow() finishes at end-of-window time so it is + // OK if there is no allowed lateness. + private static boolean triggerIsSafe(WindowingStrategy windowingStrategy) { + if (!windowingStrategy.getTrigger().mayFinish()) { + return true; + } + + if (windowingStrategy.getTrigger() instanceof NeverTrigger) { + return true; + } + + if (windowingStrategy.getTrigger() instanceof FromEndOfWindow + && windowingStrategy.getAllowedLateness().getMillis() == 0) { + return true; + } + + if (windowingStrategy.getTrigger() instanceof AfterWatermarkEarlyAndLate + && windowingStrategy.getAllowedLateness().getMillis() == 0) { + return true; + } + + if (windowingStrategy.getTrigger() instanceof AfterWatermarkEarlyAndLate + && ((AfterWatermarkEarlyAndLate) windowingStrategy.getTrigger()).getLateTrigger() != null) { + return true; + } + + return false; + } + + @Override + public PCollection>> expand(PCollection> input) { + applicableTo(input); + // Verify that the input Coder> is a KvCoder, and that + // the key coder is deterministic. 
+ Coder keyCoder = getKeyCoder(input.getCoder()); + Coder valueCoder = getInputValueCoder(input.getCoder()); + KvCoder> outputKvCoder = getOutputKvCoder(input.getCoder()); + + try { + keyCoder.verifyDeterministic(); + } catch (NonDeterministicException e) { + throw new IllegalStateException("the keyCoder of a GroupByKey must be deterministic", e); + } + Preconditions.checkArgument(numShardingKeys > 0); + final TupleTag> largeKeys = new TupleTag>() {}; + final TupleTag> smallKeys = new TupleTag>() {}; + WindowingStrategy originalWindowingStrategy = input.getWindowingStrategy(); + + PCollectionTuple mapKeysToBytes = + input.apply( + "MapKeysToBytes", + ParDo.of( + new DoFn, KV>() { + @ProcessElement + public void processElement(ProcessContext c) { + KV kv = c.element(); + Output output = ByteString.newOutput(); + try { + keyCoder.encode(kv.getKey(), output); + } catch (IOException e) { + throw new RuntimeException(e); + } + + KV outputKV = KV.of(output.toByteString(), kv.getValue()); + if (outputKV.getKey().size() <= SMALL_KEY_BYTES_THRESHOLD) { + c.output(smallKeys, outputKV); + } else { + c.output(largeKeys, outputKV); + } + } + }) + .withOutputTags(largeKeys, TupleTagList.of(smallKeys))); + + PCollection>> largeKeyBranch = + mapKeysToBytes + .get(largeKeys) + .setCoder(KvCoder.of(KeyedWindow.ByteStringCoder.of(), valueCoder)) + .apply(DataflowGroupByKey.create()) + .apply( + "DecodeKey", + MapElements.via( + new SimpleFunction>, KV>>() { + @Override + public KV> apply(KV> kv) { + try { + return KV.of(keyCoder.decode(kv.getKey().newInput()), kv.getValue()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + })) + .setCoder(outputKvCoder); + + WindowFn windowFn = originalWindowingStrategy.getWindowFn(); + PCollection>> smallKeyBranch = + mapKeysToBytes + .get(smallKeys) + .apply(Window.into(new KeyedWindowFn<>(windowFn))) + .apply( + "MapKeys", + MapElements.via( + new SimpleFunction, KV>() { + @Override + public KV apply(KV value) { + return 
KV.of(value.getKey().hashCode() % numShardingKeys, value.getValue()); + } + })) + .apply(DataflowGroupByKey.create()) + .apply( + "Restore Keys", + ParDo.of( + new DoFn>, KV>>() { + @ProcessElement + public void processElement(ProcessContext c, BoundedWindow w, PaneInfo pane) { + ByteString key = ((KeyedWindow) w).getKey(); + try { + + // is it correct to use the pane from Keyed window here? + c.outputWindowedValue( + KV.of(keyCoder.decode(key.newInput()), c.element().getValue()), + c.timestamp(), + Collections.singleton(((KeyedWindow) w).getWindow()), + pane); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + })) + .setWindowingStrategyInternal(originalWindowingStrategy) + .setCoder(outputKvCoder); + return PCollectionList.of(Arrays.asList(smallKeyBranch, largeKeyBranch)) + .apply(Flatten.pCollections()); + } + + /** + * Returns the {@code Coder} of the input to this transform, which should be a {@code KvCoder}. + */ + @SuppressWarnings("unchecked") + static KvCoder getInputKvCoder(Coder inputCoder) { + if (!(inputCoder instanceof KvCoder)) { + throw new IllegalStateException("GroupByKey requires its input to use KvCoder"); + } + return (KvCoder) inputCoder; + } + + ///////////////////////////////////////////////////////////////////////////// + + /** + * Returns the {@code Coder} of the keys of the input to this transform, which is also used as the + * {@code Coder} of the keys of the output of this transform. + */ + public static Coder getKeyCoder(Coder> inputCoder) { + return StateMultiplexingGroupByKey.getInputKvCoder(inputCoder).getKeyCoder(); + } + + /** Returns the {@code Coder} of the values of the input to this transform. */ + public static Coder getInputValueCoder(Coder> inputCoder) { + return StateMultiplexingGroupByKey.getInputKvCoder(inputCoder).getValueCoder(); + } + + /** Returns the {@code Coder} of the {@code Iterable} values of the output of this transform. 
*/ + static Coder> getOutputValueCoder(Coder> inputCoder) { + return IterableCoder.of(getInputValueCoder(inputCoder)); + } + + /** Returns the {@code Coder} of the output of this transform. */ + public static KvCoder> getOutputKvCoder(Coder> inputCoder) { + return KvCoder.of(getKeyCoder(inputCoder), getOutputValueCoder(inputCoder)); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + if (fewKeys) { + builder.add(DisplayData.item("fewKeys", true).withLabel("Has Few Keys")); + } + } +} diff --git a/runners/prism/java/build.gradle b/runners/prism/java/build.gradle index 82eb62b9e207..f2dfa2bb1a28 100644 --- a/runners/prism/java/build.gradle +++ b/runners/prism/java/build.gradle @@ -178,7 +178,10 @@ def sickbayTests = [ 'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testTwoRequiresTimeSortedInputWithLateData', 'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testRequiresTimeSortedInputWithLateData', - // Missing output due to processing time timer skew. + // Timer race condition/ordering issue in Prism. + 'org.apache.beam.sdk.transforms.ParDoTest$TimerTests.testTwoTimersSettingEachOtherWithCreateAsInputUnbounded', + + // Missing output due to timer skew. 'org.apache.beam.sdk.transforms.ParDoTest$TimestampTests.testProcessElementSkew', // TestStream + BundleFinalization. @@ -238,6 +241,10 @@ def createPrismValidatesRunnerTask = { name, environmentType -> excludeCategories 'org.apache.beam.sdk.testing.UsesMultimapState' } filter { + // Hangs forever with prism. Put here instead of sickbay to allow sickbay runs to terminate. 
+ // https://github.com/apache/beam/issues/32222 + excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest$TimerTests.testEventTimeTimerOrderingWithCreate' + for (String test : sickbayTests) { excludeTestsMatching test } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/KeyedWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/KeyedWindow.java new file mode 100644 index 000000000000..c0e9e513afda --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/KeyedWindow.java @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.transforms.windowing; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.stream.Collectors; +import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.util.VarInt; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.vendor.grpc.v1p60p1.com.google.common.base.Preconditions; +import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.ByteStreams; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Instant; + +public class KeyedWindow extends BoundedWindow { + + private final ByteString key; + private final W window; + + public KeyedWindow(ByteString name, W window) { + this.key = name; + this.window = window; + } + + public ByteString getKey() { + return key; + } + + public W getWindow() { + return window; + } + + @Override + public Instant maxTimestamp() { + return window.maxTimestamp(); + } + + @Override + public String toString() { + return "NamedWindow{" + "name='" + key + '\'' + ", window=" + window + '}'; + } + + @Override + public boolean equals(@Nullable Object o) { + if (o == null) { + return false; + } + if (this == o) { + return true; + } + if (!(o instanceof KeyedWindow)) { + return false; + } + KeyedWindow that = (KeyedWindow) o; + return Objects.equals(key, that.key) && Objects.equals(window, that.window); + } + + @Override + public int hashCode() { + return Objects.hash(key, window); + } + + public static class KeyedWindowFn + extends WindowFn, 
KeyedWindow> { + + private final WindowFn windowFn; + + public KeyedWindowFn(WindowFn windowFn) { + this.windowFn = (WindowFn) windowFn; + } + + @Override + public Collection> assignWindows( + WindowFn, KeyedWindow>.AssignContext c) throws Exception { + + return windowFn + .assignWindows( + new WindowFn.AssignContext() { + + @Override + public V element() { + return c.element().getValue(); + } + + @Override + public Instant timestamp() { + return c.timestamp(); + } + + @Override + public BoundedWindow window() { + return c.window(); + } + }) + .stream() + .map(window -> new KeyedWindow<>(c.element().getKey(), window)) + .collect(Collectors.toList()); + } + + @Override + public void mergeWindows(WindowFn, KeyedWindow>.MergeContext c) throws Exception { + if (windowFn instanceof NonMergingWindowFn) { + return; + } + HashMap> keyToWindow = new HashMap<>(); + c.windows() + .forEach( + keyedWindow -> { + List windows = + keyToWindow.computeIfAbsent(keyedWindow.getKey(), k -> new ArrayList<>()); + windows.add(keyedWindow.getWindow()); + }); + for (Entry> entry : keyToWindow.entrySet()) { + ByteString key = entry.getKey(); + List windows = entry.getValue(); + windowFn.mergeWindows( + new WindowFn.MergeContext() { + @Override + public Collection windows() { + return windows; + } + + @Override + public void merge(Collection toBeMerged, W mergeResult) throws Exception { + List> toMergedKeyedWindows = + toBeMerged.stream() + .map(window -> new KeyedWindow<>(key, window)) + .collect(Collectors.toList()); + c.merge(toMergedKeyedWindows, new KeyedWindow<>(key, mergeResult)); + } + }); + } + } + + @Override + public boolean isCompatible(WindowFn other) { + return (other instanceof KeyedWindowFn) + && windowFn.isCompatible(((KeyedWindowFn) other).windowFn); + } + + @Override + public Coder> windowCoder() { + return new KeyedWindowCoder<>(windowFn.windowCoder()); + } + + @Override + public WindowMappingFn> getDefaultWindowMappingFn() { + return new WindowMappingFn>() { + @Override 
+ public KeyedWindow getSideInputWindow(BoundedWindow mainWindow) { + Preconditions.checkArgument(mainWindow instanceof KeyedWindow); + KeyedWindow mainKeyedWindow = (KeyedWindow) mainWindow; + return new KeyedWindow<>( + mainKeyedWindow.getKey(), + windowFn.getDefaultWindowMappingFn().getSideInputWindow(mainKeyedWindow.getWindow())); + } + }; + } + + @Override + public boolean isNonMerging() { + return windowFn.isNonMerging(); + } + + @Override + public boolean assignsToOneWindow() { + return windowFn.assignsToOneWindow(); + } + + @Override + public void verifyCompatibility(WindowFn other) throws IncompatibleWindowException { + if (other instanceof KeyedWindowFn) { + windowFn.verifyCompatibility(((KeyedWindowFn) other).windowFn); + } + ; + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + windowFn.populateDisplayData(builder); + } + } + + public static class KeyedWindowCoder extends Coder> { + + private final KvCoder coder; + + public KeyedWindowCoder(Coder windowCoder) { + //:TODO consider swapping the order for improved state locality + this.coder = KvCoder.of(ByteStringCoder.of(), windowCoder); + } + + @Override + public void encode(KeyedWindow value, OutputStream outStream) throws IOException { + coder.encode(KV.of(value.getKey(), value.getWindow()), outStream); + } + + @Override + public KeyedWindow decode(InputStream inStream) throws IOException { + KV decode = coder.decode(inStream); + return new KeyedWindow<>(decode.getKey(), decode.getValue()); + } + + @Override + public List> getCoderArguments() { + return coder.getCoderArguments(); + } + + @Override + public void verifyDeterministic() throws NonDeterministicException { + coder.verifyDeterministic(); + } + + @Override + public boolean consistentWithEquals() { + return coder.getValueCoder().consistentWithEquals(); + } + } + + public static class ByteStringCoder extends AtomicCoder { + public static ByteStringCoder of() { + return INSTANCE; + } + + private static final 
ByteStringCoder INSTANCE = new ByteStringCoder(); + + private ByteStringCoder() {} + + @Override + public void encode(ByteString value, OutputStream os) throws IOException { + VarInt.encode(value.size(), os); + value.writeTo(os); + } + + @Override + public ByteString decode(InputStream is) throws IOException { + int size = VarInt.decodeInt(is); + return ByteString.readFrom(ByteStreams.limit(is, size), size); + } + } +} From 1e28c575dbc38f599510ce00b700a16889a1f69c Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Sun, 15 Dec 2024 04:49:16 -0800 Subject: [PATCH 02/11] cleanup + plumb fewKeys + make keys configurable --- ...tCommit_Java_ValidatesRunner_Dataflow.json | 3 +- ...va_ValidatesRunner_Dataflow_Streaming.json | 3 +- .../google-cloud-dataflow-java/build.gradle | 3 +- .../dataflow/DataflowPipelineTranslator.java | 15 +- .../beam/runners/dataflow/DataflowRunner.java | 25 +- ...MultiplexingGroupByKeyOverrideFactory.java | 8 +- .../dataflow/internal/DataflowGroupByKey.java | 17 +- .../dataflow/internal}/KeyedWindow.java | 35 +-- .../internal/StateMultiplexingGroupByKey.java | 225 ++++++------------ .../dataflow/util}/ByteStringCoder.java | 2 +- 10 files changed, 141 insertions(+), 195 deletions(-) rename {sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing => runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal}/KeyedWindow.java (89%) rename runners/google-cloud-dataflow-java/{worker/src/main/java/org/apache/beam/runners/dataflow/worker => src/main/java/org/apache/beam/runners/dataflow/util}/ByteStringCoder.java (97%) diff --git a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow.json b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow.json index 96e098eb7f97..c98ca2b07f9d 100644 --- a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow.json +++ b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow.json @@ -2,5 +2,6 @@ "comment": "Modify 
this file in a trivial way to cause this test suite to run", "https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test", "https://github.com/apache/beam/pull/31268": "noting that PR #31268 should run this test", - "https://github.com/apache/beam/pull/31490": "noting that PR #31490 should run this test" + "https://github.com/apache/beam/pull/31490": "noting that PR #31490 should run this test", + "https://github.com/apache/beam/pull/33318": "noting that PR #33318 should run this test" } diff --git a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json index 96e098eb7f97..c98ca2b07f9d 100644 --- a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json +++ b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json @@ -2,5 +2,6 @@ "comment": "Modify this file in a trivial way to cause this test suite to run", "https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test", "https://github.com/apache/beam/pull/31268": "noting that PR #31268 should run this test", - "https://github.com/apache/beam/pull/31490": "noting that PR #31490 should run this test" + "https://github.com/apache/beam/pull/31490": "noting that PR #31490 should run this test", + "https://github.com/apache/beam/pull/33318": "noting that PR #33318 should run this test" } diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle index 811a3c15f836..aeace769b4c1 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -462,7 +462,8 @@ task validatesRunnerStreaming { description "Validates Dataflow runner forcing streaming mode" dependsOn(createLegacyWorkerValidatesRunnerTest( name: 'validatesRunnerLegacyWorkerTestStreaming', - pipelineOptions: legacyPipelineOptions + 
['--streaming'], + pipelineOptions: legacyPipelineOptions + ['--streaming'] + + ['--experiments=enable_gbk_state_multiplexing'], excludedCategories: [ 'org.apache.beam.sdk.testing.UsesCommittedMetrics', 'org.apache.beam.sdk.testing.UsesMapState', diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java index c01096716c97..a548cbf63eba 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java @@ -935,7 +935,20 @@ private void dataflowGroupByKeyHelper( stepContext.addOutput(PropertyNames.OUTPUT, context.getOutput(transform)); WindowingStrategy windowingStrategy = input.getWindowingStrategy(); - stepContext.addInput(PropertyNames.DISALLOW_COMBINER_LIFTING, true); + boolean isStreaming = + context.getPipelineOptions().as(StreamingOptions.class).isStreaming(); + // :TODO do we set this for batch? + boolean allowCombinerLifting = false; + if (isStreaming) { + allowCombinerLifting = + !windowingStrategy.needsMerge() + && windowingStrategy.getWindowFn().assignsToOneWindow(); + allowCombinerLifting &= transform.fewKeys(); + // TODO: Allow combiner lifting on the non-default trigger, as appropriate. 
+ allowCombinerLifting &= (windowingStrategy.getTrigger() instanceof DefaultTrigger); + } + stepContext.addInput(PropertyNames.DISALLOW_COMBINER_LIFTING, !allowCombinerLifting); + stepContext.addInput( PropertyNames.SERIALIZED_FN, byteArrayToJsonString( diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 9f41ea138bd5..676ceb495c21 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -53,6 +53,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Random; import java.util.Set; import java.util.SortedSet; @@ -64,6 +65,7 @@ import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecification; import org.apache.beam.runners.dataflow.StreamingViewOverrides.StreamingCreatePCollectionViewFactory; import org.apache.beam.runners.dataflow.TransformTranslator.StepTranslationContext; +import org.apache.beam.runners.dataflow.internal.StateMultiplexingGroupByKey; import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions; @@ -215,8 +217,6 @@ public class DataflowRunner extends PipelineRunner { "unsafely_attempt_to_process_unbounded_data_in_batch_mode"; private static final Logger LOG = LoggerFactory.getLogger(DataflowRunner.class); - private static final String EXPERIMENT_ENABLE_GBK_STATE_MULTIPLEXING = - "enable_gbk_state_multiplexing"; /** Provided configuration options. 
*/ private final DataflowPipelineOptions options; @@ -801,11 +801,12 @@ private List getOverrides(boolean streaming) { new RedistributeByKeyOverrideFactory())); if (streaming) { - if (DataflowRunner.hasExperiment(options, EXPERIMENT_ENABLE_GBK_STATE_MULTIPLEXING)) { + if (DataflowRunner.hasExperiment( + options, StateMultiplexingGroupByKey.EXPERIMENT_ENABLE_GBK_STATE_MULTIPLEXING)) { overridesBuilder.add( PTransformOverride.of( PTransformMatchers.classEqualTo(GroupByKey.class), - new StateMultiplexingGroupByKeyOverrideFactory<>())); + new StateMultiplexingGroupByKeyOverrideFactory<>(options))); } // For update compatibility, always use a Read for Create in streaming mode. overridesBuilder @@ -1714,6 +1715,22 @@ public static boolean hasExperiment(DataflowPipelineDebugOptions options, String return experiments.contains(experiment); } + /** Return the value for the specified experiment or null if not present. */ + public static Optional getExperimentValue( + DataflowPipelineDebugOptions options, String experiment) { + List experiments = options.getExperiments(); + if (experiments == null) { + return Optional.empty(); + } + String prefix = experiment + "="; + for (String experimentEntry : experiments) { + if (experimentEntry.startsWith(prefix)) { + return Optional.of(experimentEntry.substring(prefix.length())); + } + } + return Optional.empty(); + } + /** Helper to configure the Dataflow Job Environment based on the user's job options. 
*/ private static Map getEnvironmentVersion(DataflowPipelineOptions options) { DataflowRunnerInfo runnerInfo = DataflowRunnerInfo.getDataflowRunnerInfo(); diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StateMultiplexingGroupByKeyOverrideFactory.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StateMultiplexingGroupByKeyOverrideFactory.java index 468a0a95d77c..93c97a547fc1 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StateMultiplexingGroupByKeyOverrideFactory.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StateMultiplexingGroupByKeyOverrideFactory.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.dataflow; import org.apache.beam.runners.dataflow.internal.StateMultiplexingGroupByKey; +import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.util.construction.PTransformReplacements; @@ -28,6 +29,11 @@ class StateMultiplexingGroupByKeyOverrideFactory extends SingleInputOutputOverrideFactory< PCollection>, PCollection>>, GroupByKey> { + private final DataflowPipelineOptions options; + + StateMultiplexingGroupByKeyOverrideFactory(DataflowPipelineOptions options) { + this.options = options; + } @Override public PTransformReplacement>, PCollection>>> @@ -37,6 +43,6 @@ class StateMultiplexingGroupByKeyOverrideFactory transform) { return PTransformReplacement.of( PTransformReplacements.getSingletonMainInput(transform), - StateMultiplexingGroupByKey.create(transform.getTransform().fewKeys())); + StateMultiplexingGroupByKey.create(options, transform.getTransform().fewKeys())); } } diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowGroupByKey.java 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowGroupByKey.java index 89135641689e..811eb888379b 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowGroupByKey.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowGroupByKey.java @@ -47,8 +47,11 @@ public class DataflowGroupByKey // Plumbed from Redistribute transform. private final boolean allowDuplicates; - private DataflowGroupByKey(boolean allowDuplicates) { + private final boolean fewKeys; + + private DataflowGroupByKey(boolean allowDuplicates, boolean fewKeys) { this.allowDuplicates = allowDuplicates; + this.fewKeys = fewKeys; } /** @@ -59,7 +62,11 @@ private DataflowGroupByKey(boolean allowDuplicates) { * {@code Iterable}s in the output {@code PCollection} */ public static DataflowGroupByKey create() { - return new DataflowGroupByKey<>(false); + return new DataflowGroupByKey<>(/*allowDuplicates=*/ false, /*fewKeys=*/ false); + } + + static DataflowGroupByKey createWithFewKeys() { + return new DataflowGroupByKey<>(/*allowDuplicates=*/ false, /*fewKeys=*/ true); } /** @@ -71,7 +78,7 @@ public static DataflowGroupByKey create() { * {@code Iterable}s in the output {@code PCollection} */ public static DataflowGroupByKey createWithAllowDuplicates() { - return new DataflowGroupByKey<>(true); + return new DataflowGroupByKey<>(/*allowDuplicates=*/ true, /*fewKeys=*/ false); } /** Returns whether it allows duplicated elements in the output. */ @@ -79,6 +86,10 @@ public boolean allowDuplicates() { return allowDuplicates; } + /** Returns whether it groups just few keys. 
*/ + public boolean fewKeys() { + return fewKeys; + } ///////////////////////////////////////////////////////////////////////////// public static void applicableTo(PCollection input) { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/KeyedWindow.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/KeyedWindow.java similarity index 89% rename from sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/KeyedWindow.java rename to runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/KeyedWindow.java index c0e9e513afda..1f7f568c142f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/KeyedWindow.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/KeyedWindow.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.transforms.windowing; +package org.apache.beam.runners.dataflow.internal; import java.io.IOException; import java.io.InputStream; @@ -27,15 +27,18 @@ import java.util.Map.Entry; import java.util.Objects; import java.util.stream.Collectors; -import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.runners.dataflow.util.ByteStringCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.util.VarInt; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.IncompatibleWindowException; +import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.transforms.windowing.WindowMappingFn; import org.apache.beam.sdk.values.KV; import 
org.apache.beam.vendor.grpc.v1p60p1.com.google.common.base.Preconditions; import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.ByteStreams; import org.checkerframework.checker.nullness.qual.NonNull; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Instant; @@ -214,7 +217,7 @@ public static class KeyedWindowCoder extends Coder coder; public KeyedWindowCoder(Coder windowCoder) { - //:TODO consider swapping the order for improved state locality + // :TODO consider swapping the order for improved state locality this.coder = KvCoder.of(ByteStringCoder.of(), windowCoder); } @@ -244,26 +247,4 @@ public boolean consistentWithEquals() { return coder.getValueCoder().consistentWithEquals(); } } - - public static class ByteStringCoder extends AtomicCoder { - public static ByteStringCoder of() { - return INSTANCE; - } - - private static final ByteStringCoder INSTANCE = new ByteStringCoder(); - - private ByteStringCoder() {} - - @Override - public void encode(ByteString value, OutputStream os) throws IOException { - VarInt.encode(value.size(), os); - value.writeTo(os); - } - - @Override - public ByteString decode(InputStream is) throws IOException { - int size = VarInt.decodeInt(is); - return ByteString.readFrom(ByteStreams.limit(is, size), size); - } - } } diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/StateMultiplexingGroupByKey.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/StateMultiplexingGroupByKey.java index 661652ce3453..487032dd977d 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/StateMultiplexingGroupByKey.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/StateMultiplexingGroupByKey.java @@ -20,12 +20,13 @@ import java.io.IOException; 
import java.util.Arrays; import java.util.Collections; -import java.util.Map; +import org.apache.beam.runners.dataflow.DataflowRunner; +import org.apache.beam.runners.dataflow.internal.KeyedWindow.KeyedWindowFn; +import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; +import org.apache.beam.runners.dataflow.util.ByteStringCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.Coder.NonDeterministicException; -import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.MapElements; @@ -33,20 +34,13 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.transforms.windowing.AfterWatermark.AfterWatermarkEarlyAndLate; -import org.apache.beam.sdk.transforms.windowing.AfterWatermark.FromEndOfWindow; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; -import org.apache.beam.sdk.transforms.windowing.GlobalWindows; -import org.apache.beam.sdk.transforms.windowing.KeyedWindow; -import org.apache.beam.sdk.transforms.windowing.KeyedWindow.KeyedWindowFn; -import org.apache.beam.sdk.transforms.windowing.Never.NeverTrigger; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.util.ByteStringOutputStream; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollection.IsBounded; import org.apache.beam.sdk.values.PCollectionList; import org.apache.beam.sdk.values.PCollectionTuple; import 
org.apache.beam.sdk.values.TupleTag; @@ -54,9 +48,6 @@ import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.beam.vendor.grpc.v1p60p1.com.google.common.base.Preconditions; import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString; -import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString.Output; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; -import org.checkerframework.checker.nullness.qual.Nullable; /** * A GroupByKey implementation that multiplexes many small user keys over a fixed set of sharding @@ -65,142 +56,97 @@ public class StateMultiplexingGroupByKey extends PTransform>, PCollection>>> { + public static final String EXPERIMENT_ENABLE_GBK_STATE_MULTIPLEXING = + "enable_gbk_state_multiplexing"; + private static final String EXPERIMENT_NUM_VIRTUAL_KEYS = "gbk_state_multiplexing_num_keys"; + private static final String EXPERIMENT_SMALL_KEY_BYTES_THRESHOLD = + "gbk_state_multiplexing_small_key_bytes"; + /* * Keys larger than this threshold will not be multiplexed. 
*/ - private static final int SMALL_KEY_BYTES_THRESHOLD = 4096; + private static final int DEFAULT_SMALL_KEY_BYTES_THRESHOLD = 4096; + private static final int DEFAULT_NUM_VIRTUAL_KEYS = 32 << 10; private final boolean fewKeys; - private final int numShardingKeys; + private final int numVirtualKeys; + private final int smallKeyBytesThreshold; - private StateMultiplexingGroupByKey(boolean fewKeys) { - // :TODO plumb fewKeys to DataflowGroupByKey + private StateMultiplexingGroupByKey(DataflowPipelineOptions options, boolean fewKeys) { this.fewKeys = fewKeys; - // :TODO Make this configurable - this.numShardingKeys = 32 << 10; + this.numVirtualKeys = + getExperimentValue(options, EXPERIMENT_NUM_VIRTUAL_KEYS, DEFAULT_NUM_VIRTUAL_KEYS); + this.smallKeyBytesThreshold = + getExperimentValue( + options, EXPERIMENT_SMALL_KEY_BYTES_THRESHOLD, DEFAULT_SMALL_KEY_BYTES_THRESHOLD); + } + + private static int getExperimentValue( + DataflowPipelineOptions options, String experiment, int defaultValue) { + return DataflowRunner.getExperimentValue(options, experiment) + .map(Integer::parseInt) + .orElse(defaultValue); } /** - * Returns a {@code GroupByKey} {@code PTransform}. + * Returns a {@code StateMultiplexingGroupByKey} {@code PTransform}. * * @param the type of the keys of the input and output {@code PCollection}s * @param the type of the values of the input {@code PCollection} and the elements of the * {@code Iterable}s in the output {@code PCollection} */ - public static StateMultiplexingGroupByKey create(boolean fewKeys) { - return new StateMultiplexingGroupByKey<>(fewKeys); - } - - ///////////////////////////////////////////////////////////////////////////// - - public static void applicableTo(PCollection input) { - WindowingStrategy windowingStrategy = input.getWindowingStrategy(); - // Verify that the input PCollection is bounded, or that there is windowing/triggering being - // used. Without this, the watermark (at end of global window) will never be reached. 
- if (windowingStrategy.getWindowFn() instanceof GlobalWindows - && windowingStrategy.getTrigger() instanceof DefaultTrigger - && input.isBounded() != IsBounded.BOUNDED) { - throw new IllegalStateException( - "GroupByKey cannot be applied to non-bounded PCollection in the GlobalWindow without a" - + " trigger. Use a Window.into or Window.triggering transform prior to GroupByKey."); - } - - // Validate that the trigger does not finish before garbage collection time - if (!triggerIsSafe(windowingStrategy)) { - throw new IllegalArgumentException( - String.format( - "Unsafe trigger '%s' may lose data, did you mean to wrap it in" - + "`Repeatedly.forever(...)`?%nSee " - + "https://s.apache.org/finishing-triggers-drop-data " - + "for details.", - windowingStrategy.getTrigger())); - } - } - - @Override - public void validate( - @Nullable PipelineOptions options, - Map, PCollection> inputs, - Map, PCollection> outputs) { - PCollection input = Iterables.getOnlyElement(inputs.values()); - KvCoder inputCoder = getInputKvCoder(input.getCoder()); - - // Ensure that the output coder key and value types aren't different. - Coder outputCoder = Iterables.getOnlyElement(outputs.values()).getCoder(); - KvCoder expectedOutputCoder = getOutputKvCoder(inputCoder); - if (!expectedOutputCoder.equals(outputCoder)) { - throw new IllegalStateException( - String.format( - "the GroupByKey requires its output coder to be %s but found %s.", - expectedOutputCoder, outputCoder)); - } - } - - // Note that Never trigger finishes *at* GC time so it is OK, and - // AfterWatermark.fromEndOfWindow() finishes at end-of-window time so it is - // OK if there is no allowed lateness. 
- private static boolean triggerIsSafe(WindowingStrategy windowingStrategy) { - if (!windowingStrategy.getTrigger().mayFinish()) { - return true; - } - - if (windowingStrategy.getTrigger() instanceof NeverTrigger) { - return true; - } - - if (windowingStrategy.getTrigger() instanceof FromEndOfWindow - && windowingStrategy.getAllowedLateness().getMillis() == 0) { - return true; - } - - if (windowingStrategy.getTrigger() instanceof AfterWatermarkEarlyAndLate - && windowingStrategy.getAllowedLateness().getMillis() == 0) { - return true; - } - - if (windowingStrategy.getTrigger() instanceof AfterWatermarkEarlyAndLate - && ((AfterWatermarkEarlyAndLate) windowingStrategy.getTrigger()).getLateTrigger() != null) { - return true; - } - - return false; + public static StateMultiplexingGroupByKey create( + DataflowPipelineOptions options, boolean fewKeys) { + return new StateMultiplexingGroupByKey<>(options, fewKeys); } @Override public PCollection>> expand(PCollection> input) { - applicableTo(input); + DataflowGroupByKey.applicableTo(input); // Verify that the input Coder> is a KvCoder, and that // the key coder is deterministic. 
- Coder keyCoder = getKeyCoder(input.getCoder()); - Coder valueCoder = getInputValueCoder(input.getCoder()); - KvCoder> outputKvCoder = getOutputKvCoder(input.getCoder()); - + Coder keyCoder = DataflowGroupByKey.getKeyCoder(input.getCoder()); try { keyCoder.verifyDeterministic(); } catch (NonDeterministicException e) { throw new IllegalStateException("the keyCoder of a GroupByKey must be deterministic", e); } - Preconditions.checkArgument(numShardingKeys > 0); - final TupleTag> largeKeys = new TupleTag>() {}; - final TupleTag> smallKeys = new TupleTag>() {}; + Coder valueCoder = DataflowGroupByKey.getInputValueCoder(input.getCoder()); + KvCoder> outputKvCoder = DataflowGroupByKey.getOutputKvCoder(input.getCoder()); + + Preconditions.checkArgument(numVirtualKeys > 0); + final TupleTag> largeKeys = new TupleTag>() { + }; + final TupleTag> smallKeys = new TupleTag>() { + }; WindowingStrategy originalWindowingStrategy = input.getWindowingStrategy(); + WindowFn originalWindowFn = originalWindowingStrategy.getWindowFn(); PCollectionTuple mapKeysToBytes = input.apply( "MapKeysToBytes", ParDo.of( new DoFn, KV>() { + transient ByteStringOutputStream byteStringOutputStream; + + @StartBundle + public void setup() { + byteStringOutputStream = new ByteStringOutputStream(); + } + @ProcessElement public void processElement(ProcessContext c) { KV kv = c.element(); - Output output = ByteString.newOutput(); try { - keyCoder.encode(kv.getKey(), output); + // clear output stream + byteStringOutputStream.toByteStringAndReset(); + keyCoder.encode(kv.getKey(), byteStringOutputStream); } catch (IOException e) { throw new RuntimeException(e); } - KV outputKV = KV.of(output.toByteString(), kv.getValue()); - if (outputKV.getKey().size() <= SMALL_KEY_BYTES_THRESHOLD) { + ByteString keyBytes = byteStringOutputStream.toByteStringAndReset(); + KV outputKV = KV.of(keyBytes, kv.getValue()); + if (keyBytes.size() <= smallKeyBytesThreshold) { c.output(smallKeys, outputKV); } else { 
c.output(largeKeys, outputKV); @@ -209,11 +155,12 @@ public void processElement(ProcessContext c) { }) .withOutputTags(largeKeys, TupleTagList.of(smallKeys))); + // Pass large keys as it is through DataflowGroupByKey PCollection>> largeKeyBranch = mapKeysToBytes .get(largeKeys) - .setCoder(KvCoder.of(KeyedWindow.ByteStringCoder.of(), valueCoder)) - .apply(DataflowGroupByKey.create()) + .setCoder(KvCoder.of(ByteStringCoder.of(), valueCoder)) + .apply(fewKeys ? DataflowGroupByKey.createWithFewKeys() : DataflowGroupByKey.create()) .apply( "DecodeKey", MapElements.via( @@ -229,30 +176,31 @@ public KV> apply(KV> kv) { })) .setCoder(outputKvCoder); - WindowFn windowFn = originalWindowingStrategy.getWindowFn(); + // Multiplex small keys over `numShardingKeys` virtual keys. + // Original user keys are sent as part of windows. + // After GroupByKey the original keys are restored from windows. PCollection>> smallKeyBranch = mapKeysToBytes .get(smallKeys) - .apply(Window.into(new KeyedWindowFn<>(windowFn))) + .apply(Window.into(new KeyedWindowFn<>(originalWindowFn))) .apply( - "MapKeys", + "MapKeysToVirtualKeys", MapElements.via( new SimpleFunction, KV>() { @Override public KV apply(KV value) { - return KV.of(value.getKey().hashCode() % numShardingKeys, value.getValue()); + return KV.of(value.getKey().hashCode() % numVirtualKeys, value.getValue()); } })) - .apply(DataflowGroupByKey.create()) + .apply(fewKeys ? DataflowGroupByKey.createWithFewKeys() : DataflowGroupByKey.create()) .apply( - "Restore Keys", + "RestoreOriginalKeys", ParDo.of( new DoFn>, KV>>() { @ProcessElement public void processElement(ProcessContext c, BoundedWindow w, PaneInfo pane) { ByteString key = ((KeyedWindow) w).getKey(); try { - // is it correct to use the pane from Keyed window here? 
c.outputWindowedValue( KV.of(keyCoder.decode(key.newInput()), c.element().getValue()), @@ -270,47 +218,14 @@ public void processElement(ProcessContext c, BoundedWindow w, PaneInfo pane) { .apply(Flatten.pCollections()); } - /** - * Returns the {@code Coder} of the input to this transform, which should be a {@code KvCoder}. - */ - @SuppressWarnings("unchecked") - static KvCoder getInputKvCoder(Coder inputCoder) { - if (!(inputCoder instanceof KvCoder)) { - throw new IllegalStateException("GroupByKey requires its input to use KvCoder"); - } - return (KvCoder) inputCoder; - } - - ///////////////////////////////////////////////////////////////////////////// - - /** - * Returns the {@code Coder} of the keys of the input to this transform, which is also used as the - * {@code Coder} of the keys of the output of this transform. - */ - public static Coder getKeyCoder(Coder> inputCoder) { - return StateMultiplexingGroupByKey.getInputKvCoder(inputCoder).getKeyCoder(); - } - - /** Returns the {@code Coder} of the values of the input to this transform. */ - public static Coder getInputValueCoder(Coder> inputCoder) { - return StateMultiplexingGroupByKey.getInputKvCoder(inputCoder).getValueCoder(); - } - - /** Returns the {@code Coder} of the {@code Iterable} values of the output of this transform. */ - static Coder> getOutputValueCoder(Coder> inputCoder) { - return IterableCoder.of(getInputValueCoder(inputCoder)); - } - - /** Returns the {@code Coder} of the output of this transform. 
*/ - public static KvCoder> getOutputKvCoder(Coder> inputCoder) { - return KvCoder.of(getKeyCoder(inputCoder), getOutputValueCoder(inputCoder)); - } - @Override public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); if (fewKeys) { builder.add(DisplayData.item("fewKeys", true).withLabel("Has Few Keys")); } + builder.add(DisplayData.item("numVirtualKeys", numVirtualKeys).withLabel("Num Virtual Keys")); + builder.add(DisplayData.item("smallKeyBytesThreshold", smallKeyBytesThreshold) + .withLabel("Small Key Bytes Threshold")); } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ByteStringCoder.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/ByteStringCoder.java similarity index 97% rename from runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ByteStringCoder.java rename to runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/ByteStringCoder.java index b9b1b45902c8..0c1a80aa2ace 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ByteStringCoder.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/ByteStringCoder.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.beam.runners.dataflow.worker; +package org.apache.beam.runners.dataflow.util; import java.io.IOException; import java.io.InputStream; From 2c19b34209cb1dfbf6ee82e8c326cb65a4fd8ed5 Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Sun, 15 Dec 2024 05:15:06 -0800 Subject: [PATCH 03/11] fix build --- .../internal/StateMultiplexingGroupByKey.java | 24 ++++++++++++------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/StateMultiplexingGroupByKey.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/StateMultiplexingGroupByKey.java index 487032dd977d..54dd63e7a893 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/StateMultiplexingGroupByKey.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/StateMultiplexingGroupByKey.java @@ -114,10 +114,8 @@ public PCollection>> expand(PCollection> input) { KvCoder> outputKvCoder = DataflowGroupByKey.getOutputKvCoder(input.getCoder()); Preconditions.checkArgument(numVirtualKeys > 0); - final TupleTag> largeKeys = new TupleTag>() { - }; - final TupleTag> smallKeys = new TupleTag>() { - }; + final TupleTag> largeKeys = new TupleTag>() {}; + final TupleTag> smallKeys = new TupleTag>() {}; WindowingStrategy originalWindowingStrategy = input.getWindowingStrategy(); WindowFn originalWindowFn = originalWindowingStrategy.getWindowFn(); @@ -126,7 +124,8 @@ public PCollection>> expand(PCollection> input) { "MapKeysToBytes", ParDo.of( new DoFn, KV>() { - transient ByteStringOutputStream byteStringOutputStream; + transient ByteStringOutputStream byteStringOutputStream = + new ByteStringOutputStream(); @StartBundle public void setup() { @@ -160,7 +159,10 @@ public void processElement(ProcessContext c) { mapKeysToBytes .get(largeKeys) .setCoder(KvCoder.of(ByteStringCoder.of(), 
valueCoder)) - .apply(fewKeys ? DataflowGroupByKey.createWithFewKeys() : DataflowGroupByKey.create()) + .apply( + fewKeys + ? DataflowGroupByKey.createWithFewKeys() + : DataflowGroupByKey.create()) .apply( "DecodeKey", MapElements.via( @@ -192,7 +194,10 @@ public KV apply(KV value) { return KV.of(value.getKey().hashCode() % numVirtualKeys, value.getValue()); } })) - .apply(fewKeys ? DataflowGroupByKey.createWithFewKeys() : DataflowGroupByKey.create()) + .apply( + fewKeys + ? DataflowGroupByKey.createWithFewKeys() + : DataflowGroupByKey.create()) .apply( "RestoreOriginalKeys", ParDo.of( @@ -225,7 +230,8 @@ public void populateDisplayData(DisplayData.Builder builder) { builder.add(DisplayData.item("fewKeys", true).withLabel("Has Few Keys")); } builder.add(DisplayData.item("numVirtualKeys", numVirtualKeys).withLabel("Num Virtual Keys")); - builder.add(DisplayData.item("smallKeyBytesThreshold", smallKeyBytesThreshold) - .withLabel("Small Key Bytes Threshold")); + builder.add( + DisplayData.item("smallKeyBytesThreshold", smallKeyBytesThreshold) + .withLabel("Small Key Bytes Threshold")); } } From 8ad6a4041c34954e5972bf01410240931e327d26 Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Mon, 16 Dec 2024 02:11:51 -0800 Subject: [PATCH 04/11] don't multiplex null keys to fix Combine.globally --- .../apache/beam/runners/dataflow/internal/KeyedWindow.java | 1 - .../dataflow/internal/StateMultiplexingGroupByKey.java | 6 ++++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/KeyedWindow.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/KeyedWindow.java index 1f7f568c142f..0ec578b679c7 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/KeyedWindow.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/KeyedWindow.java @@ -203,7 
+203,6 @@ public void verifyCompatibility(WindowFn other) throws IncompatibleWindowE if (other instanceof KeyedWindowFn) { windowFn.verifyCompatibility(((KeyedWindowFn) other).windowFn); } - ; } @Override diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/StateMultiplexingGroupByKey.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/StateMultiplexingGroupByKey.java index 54dd63e7a893..8eec34c3bbc0 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/StateMultiplexingGroupByKey.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/StateMultiplexingGroupByKey.java @@ -135,6 +135,12 @@ public void setup() { @ProcessElement public void processElement(ProcessContext c) { KV kv = c.element(); + if (kv.getKey() == null) { + // Combine.globally treats null keys specially + // so don't multiplex them. 
+ c.output(largeKeys, KV.of(null, kv.getValue())); + return; + } try { // clear output stream byteStringOutputStream.toByteStringAndReset(); From 9625af8337fa8d02e4e9ac3376d524bf370951a4 Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Mon, 16 Dec 2024 02:31:25 -0800 Subject: [PATCH 05/11] fix spotless, use @Setup instead of @StartBundle to initialize buffer --- .../apache/beam/runners/dataflow/internal/KeyedWindow.java | 2 +- .../dataflow/internal/StateMultiplexingGroupByKey.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/KeyedWindow.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/KeyedWindow.java index 0ec578b679c7..06ad80e455c8 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/KeyedWindow.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/KeyedWindow.java @@ -17,6 +17,7 @@ */ package org.apache.beam.runners.dataflow.internal; +import com.google.common.base.Preconditions; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -37,7 +38,6 @@ import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.transforms.windowing.WindowMappingFn; import org.apache.beam.sdk.values.KV; -import org.apache.beam.vendor.grpc.v1p60p1.com.google.common.base.Preconditions; import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString; import org.checkerframework.checker.nullness.qual.NonNull; import org.checkerframework.checker.nullness.qual.Nullable; diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/StateMultiplexingGroupByKey.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/StateMultiplexingGroupByKey.java index 8eec34c3bbc0..5d97ef77d1e8 
100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/StateMultiplexingGroupByKey.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/StateMultiplexingGroupByKey.java @@ -17,6 +17,7 @@ */ package org.apache.beam.runners.dataflow.internal; +import com.google.common.base.Preconditions; import java.io.IOException; import java.util.Arrays; import java.util.Collections; @@ -46,7 +47,6 @@ import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; import org.apache.beam.sdk.values.WindowingStrategy; -import org.apache.beam.vendor.grpc.v1p60p1.com.google.common.base.Preconditions; import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString; /** @@ -127,7 +127,7 @@ public PCollection>> expand(PCollection> input) { transient ByteStringOutputStream byteStringOutputStream = new ByteStringOutputStream(); - @StartBundle + @Setup public void setup() { byteStringOutputStream = new ByteStringOutputStream(); } From 68d30471859b144e4344c033856f8c29cb7201db Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Mon, 16 Dec 2024 02:58:04 -0800 Subject: [PATCH 06/11] add test suite --- ...r_Dataflow_Streaming_GBK_Multiplexing.json | 4 + .github/workflows/README.md | 1 + ...er_Dataflow_Streaming_GBK_Multiplexing.yml | 97 +++++++++++++++++++ .../google-cloud-dataflow-java/build.gradle | 53 ++++++---- 4 files changed, 137 insertions(+), 18 deletions(-) create mode 100644 .github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming_GBK_Multiplexing.json create mode 100644 .github/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming_GBK_Multiplexing.yml diff --git a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming_GBK_Multiplexing.json b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming_GBK_Multiplexing.json new file mode 100644 index 000000000000..da1664d2f250 --- 
/dev/null +++ b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming_GBK_Multiplexing.json @@ -0,0 +1,4 @@ +{ + "comment": "Modify this file in a trivial way to cause this test suite to run", + "https://github.com/apache/beam/pull/33318": "noting that PR #33318 should run this test" +} diff --git a/.github/workflows/README.md b/.github/workflows/README.md index 206364f416f7..006a65ddacc4 100644 --- a/.github/workflows/README.md +++ b/.github/workflows/README.md @@ -336,6 +336,7 @@ PostCommit Jobs run in a schedule against master branch and generally do not get | [ PostCommit Java Tpcds Spark ](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_Tpcds_Spark.yml) | N/A |`beam_PostCommit_Java_Tpcds_Spark.json`| [![.github/workflows/beam_PostCommit_Java_Tpcds_Spark.yml](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_Tpcds_Spark.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_Tpcds_Spark.yml?query=event%3Aschedule) | | [ PostCommit Java ValidatesRunner Dataflow JavaVersions ](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow_JavaVersions.yml) | ['8','21'] |`beam_PostCommit_Java_ValidatesRunner_Dataflow_JavaVersions.json`| [![.github/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow_JavaVersions.yml](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow_JavaVersions.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow_JavaVersions.yml?query=event%3Aschedule) | | [ PostCommit Java ValidatesRunner Dataflow Streaming ](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.yml) | N/A |`beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json`| 
[![.github/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.yml](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.yml?query=event%3Aschedule) | +| [ PostCommit Java ValidatesRunner Dataflow Streaming GBK Multiplexing](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming_GBK_Multiplexing.yml) | N/A | `beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming_GBK_Multiplexing.json` | [![.github/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming_GBK_Multiplexing.yml](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming_GBK_Multiplexing.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming_GBK_Multiplexing.yml?query=event%3Aschedule) | | [ PostCommit Java ValidatesRunner Dataflow V2 Streaming ](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow_V2_Streaming.yml) | N/A |`beam_PostCommit_Java_ValidatesRunner_Dataflow_V2_Streaming.json`| [![.github/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow_V2_Streaming.yml](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow_V2_Streaming.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow_V2_Streaming.yml?query=event%3Aschedule) | | [ PostCommit Java ValidatesRunner Dataflow V2 ](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow_V2.yml) | N/A |`beam_PostCommit_Java_ValidatesRunner_Dataflow_V2.json`| 
[![.github/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow_V2.yml](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow_V2.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow_V2.yml?query=event%3Aschedule) | | [ PostCommit Java ValidatesRunner Dataflow ](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow.yml) | N/A |`beam_PostCommit_Java_ValidatesRunner_Dataflow.json`| [![.github/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow.yml](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow.yml?query=event%3Aschedule) | diff --git a/.github/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming_GBK_Multiplexing.yml b/.github/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming_GBK_Multiplexing.yml new file mode 100644 index 000000000000..caf78b428389 --- /dev/null +++ b/.github/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming_GBK_Multiplexing.yml @@ -0,0 +1,97 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and +# under the License. + +name: PostCommit Java ValidatesRunner Dataflow Streaming GBK Multiplexing + +on: + schedule: + - cron: '30 4/8 * * *' + pull_request_target: + paths: ['release/trigger_all_tests.json', '.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming_GBK_Multiplexing.json'] + workflow_dispatch: + +# This allows a subsequently queued workflow run to interrupt previous runs +concurrency: + group: '${{ github.workflow }} @ ${{ github.event.issue.number || github.sha || github.head_ref || github.ref }}-${{ github.event.schedule || github.event.comment.id || github.event.sender.login }}' + cancel-in-progress: true + +#Setting explicit permissions for the action to avoid the default permissions which are `write-all` in case of pull_request_target event +permissions: + actions: write + pull-requests: write + checks: write + contents: read + deployments: read + id-token: none + issues: write + discussions: read + packages: read + pages: read + repository-projects: read + security-events: read + statuses: read + +env: + DEVELOCITY_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }} + GRADLE_ENTERPRISE_CACHE_USERNAME: ${{ secrets.GE_CACHE_USERNAME }} + GRADLE_ENTERPRISE_CACHE_PASSWORD: ${{ secrets.GE_CACHE_PASSWORD }} + +jobs: + beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming_GBK_Multiplexing: + name: ${{ matrix.job_name }} (${{ matrix.job_phrase }}) + runs-on: [self-hosted, ubuntu-20.04, main] + timeout-minutes: 720 + strategy: + matrix: + job_name: [beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming_GBK_Multiplexing] + job_phrase: [Run Dataflow Streaming ValidatesRunner GBK Multiplexing] + if: | + github.event_name == 'workflow_dispatch' || + github.event_name == 'pull_request_target' || + (github.event_name == 'schedule' && github.repository == 'apache/beam') || + github.event.comment.body == 'Run Dataflow Streaming ValidatesRunner GBK Multiplexing' + steps: + -
uses: actions/checkout@v4 + - name: Setup repository + uses: ./.github/actions/setup-action + with: + comment_phrase: ${{ matrix.job_phrase }} + github_token: ${{ secrets.GITHUB_TOKEN }} + github_job: ${{ matrix.job_name }} (${{ matrix.job_phrase }}) + - name: Setup environment + uses: ./.github/actions/setup-environment-action + with: + java-version: default + - name: run validatesRunnerStreamingWithGbkMultiplexing script + uses: ./.github/actions/gradle-command-self-hosted-action + with: + gradle-command: :runners:google-cloud-dataflow-java:validatesRunnerStreamingWithGbkMultiplexing + max-workers: 12 + - name: Archive JUnit Test Results + uses: actions/upload-artifact@v4 + if: ${{ !success() }} + with: + name: JUnit Test Results + path: "**/build/reports/tests/" + - name: Publish JUnit Test Results + uses: EnricoMi/publish-unit-test-result-action@v2 + if: always() + with: + commit: '${{ env.prsha || env.GITHUB_SHA }}' + comment_mode: ${{ github.event_name == 'issue_comment' && 'always' || 'off' }} + files: '**/build/test-results/**/*.xml' + large_files: true \ No newline at end of file diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle index aeace769b4c1..ba0d1b05e8bf 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -242,6 +242,34 @@ def createLegacyWorkerValidatesRunnerTest = { Map args -> } } +def createValidatesRunnerStreamingTest = { boolean enableGbkMultiplexing -> + def pipelineOptions = legacyPipelineOptions + ['--streaming'] + if (enableGbkMultiplexing) { + pipelineOptions = pipelineOptions + ['--experiments=enable_gbk_state_multiplexing'] + } + def name = 'validatesRunnerLegacyWorkerTestStreaming' + if (enableGbkMultiplexing) { + name = 'validatesRunnerLegacyWorkerTestStreamingGbkMultiplexing' + } + return createLegacyWorkerValidatesRunnerTest( + name: name, + pipelineOptions: pipelineOptions, + excludedCategories: [ + 
'org.apache.beam.sdk.testing.UsesCommittedMetrics', + 'org.apache.beam.sdk.testing.UsesMapState', + 'org.apache.beam.sdk.testing.UsesRequiresTimeSortedInput', + 'org.apache.beam.sdk.testing.UsesSetState', + ], + excludedTests: [ + // TODO(https://github.com/apache/beam/issues/21472) + 'org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testAfterProcessingTimeContinuationTriggerUsingState', + // GroupIntoBatches.withShardedKey not supported on streaming runner v1 + // https://github.com/apache/beam/issues/22592 + 'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testWithShardedKeyInGlobalWindow', + ] + ) +} + def createRunnerV2ValidatesRunnerTest = { Map args -> def name = args.name def pipelineOptions = args.pipelineOptions ?: runnerV2PipelineOptions @@ -460,24 +488,13 @@ task validatesRunner { task validatesRunnerStreaming { group = "Verification" description "Validates Dataflow runner forcing streaming mode" - dependsOn(createLegacyWorkerValidatesRunnerTest( - name: 'validatesRunnerLegacyWorkerTestStreaming', - pipelineOptions: legacyPipelineOptions + ['--streaming'] - + ['--experiments=enable_gbk_state_multiplexing'], - excludedCategories: [ - 'org.apache.beam.sdk.testing.UsesCommittedMetrics', - 'org.apache.beam.sdk.testing.UsesMapState', - 'org.apache.beam.sdk.testing.UsesRequiresTimeSortedInput', - 'org.apache.beam.sdk.testing.UsesSetState', - ], - excludedTests: [ - // TODO(https://github.com/apache/beam/issues/21472) - 'org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testAfterProcessingTimeContinuationTriggerUsingState', - // GroupIntoBatches.withShardedKey not supported on streaming runner v1 - // https://github.com/apache/beam/issues/22592 - 'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testWithShardedKeyInGlobalWindow', - ] - )) + dependsOn(createValidatesRunnerStreamingTest(/*enableGbkMultiplexing=*/false)) +} + +task validatesRunnerStreamingWithGbkMultiplexing { + group = "Verification" + description "Validates Dataflow runner 
forcing streaming mode" + dependsOn(createValidatesRunnerStreamingTest(/*enableGbkMultiplexing=*/true)) } def setupXVR = tasks.register("setupXVR") { From e432f933989aa89c3c019a7cdd4984d95bf2c6d5 Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Mon, 16 Dec 2024 03:04:04 -0800 Subject: [PATCH 07/11] fix --- runners/google-cloud-dataflow-java/build.gradle | 8 ++++---- .../beam/runners/dataflow/internal/KeyedWindow.java | 2 +- .../dataflow/internal/StateMultiplexingGroupByKey.java | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle index ba0d1b05e8bf..9cf8f3ad2517 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -244,10 +244,10 @@ def createLegacyWorkerValidatesRunnerTest = { Map args -> def createValidatesRunnerStreamingTest = { boolean enableGbkMultiplexing -> def pipelineOptions = legacyPipelineOptions + ['--streaming'] - if (enableGbkMultiplexing) { - pipelineOptions = pipelineOptions + ['--experiments=enable_gbk_state_multiplexing'] - } def name = 'validatesRunnerLegacyWorkerTestStreaming' + if (enableGbkMultiplexing) { + pipelineOptions = pipelineOptions + ['--experiments=enable_gbk_state_multiplexing'] + } if (enableGbkMultiplexing) { name = 'validatesRunnerLegacyWorkerTestStreamingGbkMultiplexing' } @@ -493,7 +493,7 @@ task validatesRunnerStreaming { task validatesRunnerStreamingWithGbkMultiplexing { group = "Verification" - description "Validates Dataflow runner forcing streaming mode" + description "Validates Dataflow runner forcing streaming mode with GBK state multiplexing" dependsOn(createValidatesRunnerStreamingTest(/*enableGbkMultiplexing=*/true)) } diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/KeyedWindow.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/KeyedWindow.java index 06ad80e455c8..0268d50025b1 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/KeyedWindow.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/KeyedWindow.java @@ -17,7 +17,6 @@ */ package org.apache.beam.runners.dataflow.internal; -import com.google.common.base.Preconditions; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -39,6 +38,7 @@ import org.apache.beam.sdk.transforms.windowing.WindowMappingFn; import org.apache.beam.sdk.values.KV; import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; import org.checkerframework.checker.nullness.qual.NonNull; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Instant; diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/StateMultiplexingGroupByKey.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/StateMultiplexingGroupByKey.java index 5d97ef77d1e8..41514d5b0cb3 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/StateMultiplexingGroupByKey.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/StateMultiplexingGroupByKey.java @@ -17,7 +17,6 @@ */ package org.apache.beam.runners.dataflow.internal; -import com.google.common.base.Preconditions; import java.io.IOException; import java.util.Arrays; import java.util.Collections; @@ -48,6 +47,7 @@ import org.apache.beam.sdk.values.TupleTagList; import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString; +import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; /** * A GroupByKey implementation that multiplexes many small user keys over a fixed set of sharding From d6a2974690107030b40e2d0ed28661bd0251c668 Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Mon, 16 Dec 2024 04:08:49 -0800 Subject: [PATCH 08/11] use nullable coder --- .../dataflow/internal/StateMultiplexingGroupByKey.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/StateMultiplexingGroupByKey.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/StateMultiplexingGroupByKey.java index 41514d5b0cb3..4df6a745c783 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/StateMultiplexingGroupByKey.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/StateMultiplexingGroupByKey.java @@ -27,6 +27,7 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.Coder.NonDeterministicException; import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.NullableCoder; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.MapElements; @@ -164,7 +165,7 @@ public void processElement(ProcessContext c) { PCollection>> largeKeyBranch = mapKeysToBytes .get(largeKeys) - .setCoder(KvCoder.of(ByteStringCoder.of(), valueCoder)) + .setCoder(KvCoder.of(NullableCoder.of(ByteStringCoder.of()), valueCoder)) .apply( fewKeys ? 
DataflowGroupByKey.createWithFewKeys() @@ -176,6 +177,9 @@ public void processElement(ProcessContext c) { @Override public KV> apply(KV> kv) { try { + if (kv.getKey() == null) { + return KV.of((K) null, kv.getValue()); + } return KV.of(keyCoder.decode(kv.getKey().newInput()), kv.getValue()); } catch (IOException e) { throw new RuntimeException(e); From b15fe3536c7fc8fcd2f4cc4bf77653a9db1f0c3e Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Mon, 16 Dec 2024 05:45:46 -0800 Subject: [PATCH 09/11] disable multiplexing on VoidCoder --- .../beam/runners/dataflow/DataflowRunner.java | 3 +- ...ultiplexingGroupByKeyTransformMatcher.java | 62 +++++++++++++++++++ .../internal/StateMultiplexingGroupByKey.java | 1 + 3 files changed, 64 insertions(+), 2 deletions(-) create mode 100644 runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StateMultiplexingGroupByKeyTransformMatcher.java diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 676ceb495c21..a7b080e87841 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -119,7 +119,6 @@ import org.apache.beam.sdk.transforms.Combine.GroupedValues; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.GroupIntoBatches; import org.apache.beam.sdk.transforms.Impulse; import org.apache.beam.sdk.transforms.PTransform; @@ -805,7 +804,7 @@ private List getOverrides(boolean streaming) { options, StateMultiplexingGroupByKey.EXPERIMENT_ENABLE_GBK_STATE_MULTIPLEXING)) { overridesBuilder.add( PTransformOverride.of( - 
PTransformMatchers.classEqualTo(GroupByKey.class), + StateMultiplexingGroupByKeyTransformMatcher.getInstance(), new StateMultiplexingGroupByKeyOverrideFactory<>(options))); } // For update compatibility, always use a Read for Create in streaming mode. diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StateMultiplexingGroupByKeyTransformMatcher.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StateMultiplexingGroupByKeyTransformMatcher.java new file mode 100644 index 000000000000..06953a0928d5 --- /dev/null +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StateMultiplexingGroupByKeyTransformMatcher.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.dataflow; + +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.VoidCoder; +import org.apache.beam.sdk.runners.AppliedPTransform; +import org.apache.beam.sdk.runners.PTransformMatcher; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.values.PCollection; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class StateMultiplexingGroupByKeyTransformMatcher implements PTransformMatcher { + + private static final Logger LOG = + LoggerFactory.getLogger(StateMultiplexingGroupByKeyTransformMatcher.class); + private static final StateMultiplexingGroupByKeyTransformMatcher INSTANCE = + new StateMultiplexingGroupByKeyTransformMatcher(); + + @Override + public boolean matches(AppliedPTransform application) { + LOG.info(application.getFullName()); + if (!(application.getTransform() instanceof GroupByKey)) { + LOG.info(application.getFullName() + " returning false"); + return false; + } + for (PCollection pCollection : application.getMainInputs().values()) { + LOG.info(application.getFullName() + " " + pCollection.getCoder()); + Coder coder = pCollection.getCoder(); + if (!(coder instanceof KvCoder)) { + return false; + } + // Don't enable multiplexing on Void keys + if (((KvCoder) coder).getKeyCoder() instanceof VoidCoder) { + return false; + } + } + LOG.info(application.getFullName() + " returning true"); + return true; + } + + public static StateMultiplexingGroupByKeyTransformMatcher getInstance() { + return INSTANCE; + } +} diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/StateMultiplexingGroupByKey.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/StateMultiplexingGroupByKey.java index 4df6a745c783..831d9493af82 100644 --- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/StateMultiplexingGroupByKey.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/StateMultiplexingGroupByKey.java @@ -201,6 +201,7 @@ public KV> apply(KV> kv) { new SimpleFunction, KV>() { @Override public KV apply(KV value) { + // TODO: hashCode() % numVirtualKeys can be negative in Java; consider Math.floorMod to keep virtual keys in [0, numVirtualKeys). return KV.of(value.getKey().hashCode() % numVirtualKeys, value.getValue()); } })) From 76f471d140dfc24074e3aaf05d97af2c47f35c0f Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Tue, 17 Dec 2024 00:04:11 -0800 Subject: [PATCH 10/11] fix GlobalWindows + IdentityWindows --- .../beam/sdk/transforms/windowing/Window.java | 2 ++ .../org/apache/beam/sdk/util/IdentityWindowFn.java | 14 ++++++++------ 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java index 32e218d3cb11..2aa18fa5ab5b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java @@ -356,6 +356,8 @@ private void applicableTo(PCollection input) { if (outputStrategy.isTriggerSpecified() && !(outputStrategy.getTrigger() instanceof DefaultTrigger) && !(outputStrategy.getWindowFn() instanceof GlobalWindows) + // TODO: Add proper logic for KeyedWindow here + && !(outputStrategy.getWindowFn().getClass().getName().contains("KeyedWindow")) && !outputStrategy.isAllowedLatenessSpecified()) { throw new IllegalArgumentException( "Except when using GlobalWindows," diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java index ca78dc60f2fe..a3d802cc740b 100644 ---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java @@ -70,12 +70,14 @@ public Collection assignWindows(WindowFn.Assign @Override public boolean isCompatible(WindowFn other) { - throw new UnsupportedOperationException( - String.format( - "%s.isCompatible() should never be called." - + " It is a private implementation detail of sdk utilities." - + " This message indicates a bug in the Beam SDK.", - getClass().getCanonicalName())); + // :TODO anything else to consider here? + // throw new UnsupportedOperationException( + // String.format( + // "%s.isCompatible() should never be called." + // + " It is a private implementation detail of sdk utilities." + // + " This message indicates a bug in the Beam SDK.", + // getClass().getCanonicalName())); + return other instanceof IdentityWindowFn; } @Override From 217feb164956afb04ee9499822fa62ab6711ec8e Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Tue, 17 Dec 2024 05:12:20 -0800 Subject: [PATCH 11/11] fix GlobalWindows + keyedwindows --- ...ultiplexingGroupByKeyTransformMatcher.java | 5 +- .../internal/StateMultiplexingGroupByKey.java | 9 +- .../transforms/windowing}/KeyedWindow.java | 85 +++++++++---------- .../beam/sdk/transforms/windowing/Window.java | 15 +++- 4 files changed, 58 insertions(+), 56 deletions(-) rename {runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal => sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing}/KeyedWindow.java (66%) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StateMultiplexingGroupByKeyTransformMatcher.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StateMultiplexingGroupByKeyTransformMatcher.java index 06953a0928d5..df4aca80d6ea 100644 --- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StateMultiplexingGroupByKeyTransformMatcher.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StateMultiplexingGroupByKeyTransformMatcher.java @@ -36,13 +36,10 @@ class StateMultiplexingGroupByKeyTransformMatcher implements PTransformMatcher { @Override public boolean matches(AppliedPTransform application) { - LOG.info(application.getFullName()); if (!(application.getTransform() instanceof GroupByKey)) { - LOG.info(application.getFullName() + " returning false"); return false; } for (PCollection pCollection : application.getMainInputs().values()) { - LOG.info(application.getFullName() + " " + pCollection.getCoder()); Coder coder = pCollection.getCoder(); if (!(coder instanceof KvCoder)) { return false; @@ -52,7 +49,7 @@ public boolean matches(AppliedPTransform application) { return false; } } - LOG.info(application.getFullName() + " returning true"); + LOG.info("Enabling State Multiplexing on {}", application.getFullName()); return true; } diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/StateMultiplexingGroupByKey.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/StateMultiplexingGroupByKey.java index 831d9493af82..78ad6635813e 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/StateMultiplexingGroupByKey.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/StateMultiplexingGroupByKey.java @@ -21,7 +21,6 @@ import java.util.Arrays; import java.util.Collections; import org.apache.beam.runners.dataflow.DataflowRunner; -import org.apache.beam.runners.dataflow.internal.KeyedWindow.KeyedWindowFn; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.runners.dataflow.util.ByteStringCoder; import 
org.apache.beam.sdk.coders.Coder; @@ -36,6 +35,8 @@ import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.KeyedWindow; +import org.apache.beam.sdk.transforms.windowing.KeyedWindow.KeyedWindowFn; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.transforms.windowing.WindowFn; @@ -194,7 +195,7 @@ public KV> apply(KV> kv) { PCollection>> smallKeyBranch = mapKeysToBytes .get(smallKeys) - .apply(Window.into(new KeyedWindowFn<>(originalWindowFn))) + .apply(Window.into(new KeyedWindowFn<>(ByteStringCoder.of(), originalWindowFn))) .apply( "MapKeysToVirtualKeys", MapElements.via( @@ -215,13 +216,13 @@ public KV apply(KV value) { new DoFn>, KV>>() { @ProcessElement public void processElement(ProcessContext c, BoundedWindow w, PaneInfo pane) { - ByteString key = ((KeyedWindow) w).getKey(); + ByteString key = ((KeyedWindow) w).getKey(); try { // is it correct to use the pane from Keyed window here? 
c.outputWindowedValue( KV.of(keyCoder.decode(key.newInput()), c.element().getValue()), c.timestamp(), - Collections.singleton(((KeyedWindow) w).getWindow()), + Collections.singleton(((KeyedWindow) w).getWindow()), pane); } catch (IOException e) { throw new RuntimeException(e); diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/KeyedWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/KeyedWindow.java similarity index 66% rename from runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/KeyedWindow.java rename to sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/KeyedWindow.java index 0268d50025b1..7c6b0c1105c6 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/KeyedWindow.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/KeyedWindow.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.beam.runners.dataflow.internal; +package org.apache.beam.sdk.transforms.windowing; import java.io.IOException; import java.io.InputStream; @@ -27,33 +27,27 @@ import java.util.Map.Entry; import java.util.Objects; import java.util.stream.Collectors; -import org.apache.beam.runners.dataflow.util.ByteStringCoder; +import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.IncompatibleWindowException; -import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn; -import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.transforms.windowing.WindowMappingFn; import org.apache.beam.sdk.values.KV; -import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; import org.checkerframework.checker.nullness.qual.NonNull; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Instant; -public class KeyedWindow extends BoundedWindow { +@Internal +public class KeyedWindow extends BoundedWindow { - private final ByteString key; + private final K key; private final W window; - public KeyedWindow(ByteString name, W window) { - this.key = name; + public KeyedWindow(K key, W window) { + this.key = key; this.window = window; } - public ByteString getKey() { + public K getKey() { return key; } @@ -82,7 +76,7 @@ public boolean equals(@Nullable Object o) { if (!(o instanceof KeyedWindow)) { return false; } - KeyedWindow that = (KeyedWindow) o; + KeyedWindow that = (KeyedWindow) o; return Objects.equals(key, that.key) && Objects.equals(window, that.window); } @@ -91,18 +85,25 @@ public int hashCode() { return Objects.hash(key, window); } - public static class 
KeyedWindowFn - extends WindowFn, KeyedWindow> { + @Internal + public static class KeyedWindowFn + extends WindowFn, KeyedWindow> { private final WindowFn windowFn; + private final Coder keyCoder; - public KeyedWindowFn(WindowFn windowFn) { + public KeyedWindowFn(Coder keyCoder, WindowFn windowFn) { + this.keyCoder = keyCoder; this.windowFn = (WindowFn) windowFn; } + public WindowFn getInnerWindowFn() { + return windowFn; + } + @Override - public Collection> assignWindows( - WindowFn, KeyedWindow>.AssignContext c) throws Exception { + public Collection> assignWindows( + WindowFn, KeyedWindow>.AssignContext c) throws Exception { return windowFn .assignWindows( @@ -129,11 +130,12 @@ public BoundedWindow window() { } @Override - public void mergeWindows(WindowFn, KeyedWindow>.MergeContext c) throws Exception { + public void mergeWindows(WindowFn, KeyedWindow>.MergeContext c) + throws Exception { if (windowFn instanceof NonMergingWindowFn) { return; } - HashMap> keyToWindow = new HashMap<>(); + HashMap> keyToWindow = new HashMap<>(); c.windows() .forEach( keyedWindow -> { @@ -141,8 +143,8 @@ public void mergeWindows(WindowFn, KeyedWindow>.MergeContext c) thro keyToWindow.computeIfAbsent(keyedWindow.getKey(), k -> new ArrayList<>()); windows.add(keyedWindow.getWindow()); }); - for (Entry> entry : keyToWindow.entrySet()) { - ByteString key = entry.getKey(); + for (Entry> entry : keyToWindow.entrySet()) { + K key = entry.getKey(); List windows = entry.getValue(); windowFn.mergeWindows( new WindowFn.MergeContext() { @@ -153,7 +155,7 @@ public Collection windows() { @Override public void merge(Collection toBeMerged, W mergeResult) throws Exception { - List> toMergedKeyedWindows = + List> toMergedKeyedWindows = toBeMerged.stream() .map(window -> new KeyedWindow<>(key, window)) .collect(Collectors.toList()); @@ -170,22 +172,13 @@ public boolean isCompatible(WindowFn other) { } @Override - public Coder> windowCoder() { - return new KeyedWindowCoder<>(windowFn.windowCoder()); + 
public Coder> windowCoder() { + return new KeyedWindowCoder<>(keyCoder, windowFn.windowCoder()); } @Override - public WindowMappingFn> getDefaultWindowMappingFn() { - return new WindowMappingFn>() { - @Override - public KeyedWindow getSideInputWindow(BoundedWindow mainWindow) { - Preconditions.checkArgument(mainWindow instanceof KeyedWindow); - KeyedWindow mainKeyedWindow = (KeyedWindow) mainWindow; - return new KeyedWindow<>( - mainKeyedWindow.getKey(), - windowFn.getDefaultWindowMappingFn().getSideInputWindow(mainKeyedWindow.getWindow())); - } - }; + public WindowMappingFn> getDefaultWindowMappingFn() { + throw new UnsupportedOperationException("KeyedWindow not supported with side inputs"); } @Override @@ -211,23 +204,25 @@ public void populateDisplayData(DisplayData.Builder builder) { } } - public static class KeyedWindowCoder extends Coder> { + @Internal + public static class KeyedWindowCoder + extends Coder> { - private final KvCoder coder; + private final KvCoder coder; - public KeyedWindowCoder(Coder windowCoder) { + public KeyedWindowCoder(Coder keyCoder, Coder windowCoder) { // :TODO consider swapping the order for improved state locality - this.coder = KvCoder.of(ByteStringCoder.of(), windowCoder); + this.coder = KvCoder.of(keyCoder, windowCoder); } @Override - public void encode(KeyedWindow value, OutputStream outStream) throws IOException { + public void encode(KeyedWindow value, OutputStream outStream) throws IOException { coder.encode(KV.of(value.getKey(), value.getWindow()), outStream); } @Override - public KeyedWindow decode(InputStream inStream) throws IOException { - KV decode = coder.decode(inStream); + public KeyedWindow decode(InputStream inStream) throws IOException { + KV decode = coder.decode(inStream); return new KeyedWindow<>(decode.getKey(), decode.getValue()); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java 
index 2aa18fa5ab5b..7ebf03ec86e0 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java @@ -25,6 +25,7 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.windowing.KeyedWindow.KeyedWindowFn; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionList; import org.apache.beam.sdk.values.WindowingStrategy; @@ -214,6 +215,7 @@ public static Window configure() { @AutoValue.Builder abstract static class Builder { + abstract Builder setWindowFn(WindowFn windowFn); abstract Builder setTrigger(Trigger trigger); @@ -352,12 +354,17 @@ private void applicableTo(PCollection input) { WindowingStrategy outputStrategy = getOutputStrategyInternal(input.getWindowingStrategy()); + boolean isGlobalWindow = (outputStrategy.getWindowFn() instanceof GlobalWindows); + if (outputStrategy.getWindowFn() instanceof KeyedWindow.KeyedWindowFn) { + isGlobalWindow = + (((KeyedWindowFn) outputStrategy.getWindowFn()).getInnerWindowFn() + instanceof GlobalWindows); + } + // Make sure that the windowing strategy is complete & valid. if (outputStrategy.isTriggerSpecified() && !(outputStrategy.getTrigger() instanceof DefaultTrigger) - && !(outputStrategy.getWindowFn() instanceof GlobalWindows) - // :TODO Add proper logic for Keyed Window here - && !(outputStrategy.getWindowFn().getClass().getName().contains("KeyedWindow")) + && !(isGlobalWindow) && !outputStrategy.isAllowedLatenessSpecified()) { throw new IllegalArgumentException( "Except when using GlobalWindows," @@ -456,6 +463,7 @@ protected String getKindString() { * Pipeline authors should use {@link Window} directly instead. 
*/ public static class Assign extends PTransform, PCollection> { + private final @Nullable Window original; private final WindowingStrategy updatedStrategy; @@ -506,6 +514,7 @@ public static Remerge remerge() { * again as part of the next {@link org.apache.beam.sdk.transforms.GroupByKey}. */ private static class Remerge extends PTransform, PCollection> { + @Override public PCollection expand(PCollection input) { return input