diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java index a413c2c03dbe..558848f488a7 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java @@ -77,6 +77,7 @@ "nullness" // TODO(https://github.com/apache/beam/issues/20497) }) public class SimpleParDoFn implements ParDoFn { + // TODO: Remove once Distributions has shipped. @VisibleForTesting static final String OUTPUTS_PER_ELEMENT_EXPERIMENT = "outputs_per_element_counter"; @@ -174,6 +175,7 @@ private boolean hasExperiment(String experiment) { /** Simple state tracker to calculate PerElementOutputCount counter. */ private interface OutputsPerElementTracker { + void onOutput(); void onProcessElement(); @@ -182,6 +184,7 @@ private interface OutputsPerElementTracker { } private class OutputsPerElementTrackerImpl implements OutputsPerElementTracker { + private long outputsPerElement; private final Counter counter; @@ -214,6 +217,7 @@ private void reset() { /** No-op {@link OutputsPerElementTracker} implementation used when the counter is disabled. 
*/ private static class NoopOutputsPerElementTracker implements OutputsPerElementTracker { + private NoopOutputsPerElementTracker() {} public static final OutputsPerElementTracker INSTANCE = new NoopOutputsPerElementTracker(); @@ -516,10 +520,14 @@ private void registerStateCleanup( private Instant earliestAllowableCleanupTime( BoundedWindow window, WindowingStrategy windowingStrategy) { - return window - .maxTimestamp() - .plus(windowingStrategy.getAllowedLateness()) - .plus(Duration.millis(1L)); + Instant cleanupTime = + window + .maxTimestamp() + .plus(windowingStrategy.getAllowedLateness()) + .plus(Duration.millis(1L)); + return cleanupTime.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE) + ? BoundedWindow.TIMESTAMP_MAX_VALUE + : cleanupTime; } /** diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/UserParDoFnFactoryTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/UserParDoFnFactoryTest.java index ff114ef2f078..c1e5000f03da 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/UserParDoFnFactoryTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/UserParDoFnFactoryTest.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.dataflow.worker; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.theInstance; @@ -153,6 +154,21 @@ private static class TestStatefulDoFn extends DoFn, Void> { public void processElement(ProcessContext c) {} } + private static class TestStatefulDoFnWithWindowExpiration + extends DoFn, Void> { + + public static final String STATE_ID = "state-id"; + + @StateId(STATE_ID) + private final StateSpec> spec = StateSpecs.value(StringUtf8Coder.of()); + + 
@ProcessElement + public void processElement(ProcessContext c) {} + + @OnWindowExpiration + public void onWindowExpiration() {} + } + private static final TupleTag MAIN_OUTPUT = new TupleTag<>("1"); private UserParDoFnFactory factory = UserParDoFnFactory.createDefault(); @@ -373,6 +389,92 @@ public void testCleanupRegistered() throws Exception { firstWindow.maxTimestamp().plus(Duration.millis(1L))); } + /** + * Regression test for global window + OnWindowExpiration + allowed lateness > max allowed time + */ + @Test + public void testCleanupTimerForGlobalWindowWithAllowedLateness() throws Exception { + PipelineOptions options = PipelineOptionsFactory.create(); + CounterSet counters = new CounterSet(); + DoFn initialFn = new TestStatefulDoFnWithWindowExpiration(); + Duration allowedLateness = Duration.standardDays(2); + CloudObject cloudObject = + getCloudObject( + initialFn, WindowingStrategy.globalDefault().withAllowedLateness(allowedLateness)); + + StateInternals stateInternals = InMemoryStateInternals.forKey("dummy"); + + TimerInternals timerInternals = mock(TimerInternals.class); + + DataflowStepContext stepContext = mock(DataflowStepContext.class); + when(stepContext.timerInternals()).thenReturn(timerInternals); + DataflowStepContext userStepContext = mock(DataflowStepContext.class); + when(stepContext.namespacedToUser()).thenReturn(userStepContext); + when(stepContext.stateInternals()).thenReturn(stateInternals); + when(userStepContext.stateInternals()).thenReturn((StateInternals) stateInternals); + + DataflowExecutionContext executionContext = + mock(DataflowExecutionContext.class); + TestOperationContext operationContext = TestOperationContext.create(counters); + when(executionContext.getStepContext(operationContext)).thenReturn(stepContext); + when(executionContext.getSideInputReader(any(), any(), any())) + .thenReturn(NullSideInputReader.empty()); + + ParDoFn parDoFn = + factory.create( + options, + cloudObject, + Collections.emptyList(), + MAIN_OUTPUT, + 
ImmutableMap.of(MAIN_OUTPUT, 0), + executionContext, + operationContext); + + Receiver rcvr = new OutputReceiver(); + parDoFn.startBundle(rcvr); + + GlobalWindow globalWindow = GlobalWindow.INSTANCE; + parDoFn.processElement( + WindowedValue.of("foo", new Instant(1), globalWindow, PaneInfo.NO_FIRING)); + + assertThat( + globalWindow.maxTimestamp().plus(allowedLateness), + greaterThan(BoundedWindow.TIMESTAMP_MAX_VALUE)); + verify(stepContext) + .setStateCleanupTimer( + SimpleParDoFn.CLEANUP_TIMER_ID, + globalWindow, + GlobalWindow.Coder.INSTANCE, + BoundedWindow.TIMESTAMP_MAX_VALUE, + BoundedWindow.TIMESTAMP_MAX_VALUE.minus(Duration.millis(1))); + + StateNamespace globalWindowNamespace = + StateNamespaces.window(GlobalWindow.Coder.INSTANCE, globalWindow); + StateTag> tag = + StateTags.tagForSpec( + TestStatefulDoFnWithWindowExpiration.STATE_ID, StateSpecs.value(StringUtf8Coder.of())); + + when(userStepContext.getNextFiredTimer((Coder) GlobalWindow.Coder.INSTANCE)).thenReturn(null); + when(stepContext.getNextFiredTimer((Coder) GlobalWindow.Coder.INSTANCE)) + .thenReturn( + TimerData.of( + SimpleParDoFn.CLEANUP_TIMER_ID, + globalWindowNamespace, + BoundedWindow.TIMESTAMP_MAX_VALUE, + BoundedWindow.TIMESTAMP_MAX_VALUE.minus(Duration.millis(1)), + TimeDomain.EVENT_TIME)) + .thenReturn(null); + + // Set up non-empty state. We don't mock + verify calls to clear() but instead + // check that state is actually empty. We mustn't care how it is accomplished. + stateInternals.state(globalWindowNamespace, tag).write("first"); + + // Firing the cleanup timer should clear the global window's state + parDoFn.processTimers(); + + assertThat(stateInternals.state(globalWindowNamespace, tag).read(), nullValue()); + } + @Test public void testCleanupWorks() throws Exception { PipelineOptions options = PipelineOptionsFactory.create();