diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index 98519cd508ba..01b12cfa717a 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -812,7 +812,9 @@ public final void processElement2(StreamRecord streamRecord) thro WindowedValue element = it.next(); // we need to set the correct key in case the operator is // a (keyed) window operator - setKeyContextElement1(new StreamRecord<>(element)); + if (keySelector != null) { + setCurrentKey(keySelector.getKey(element)); + } Iterable> justPushedBack = pushbackDoFnRunner.processElementInReadyWindows(element); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java index 5464838ad4db..3e033183a019 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java @@ -254,9 +254,7 @@ public void testAfterProcessingTimeContinuationTriggerUsingState() throws Except final long waitMillis = 500; PCollection triggeredSums = - p.apply( - GenerateSequence.from(0) - .to(1)) // forces unbounded so delay cannot be fast-forwarded + p.apply(Create.of(0L)) .apply(WithKeys.of("dummy key")) .apply( "output then delay",