Skip to content

Commit

Permalink
Fix cleanup timer timestamp to not exceed max allowed timestamp (apac…
Browse files Browse the repository at this point in the history
…he#33037)

This fixes an exception during drain on jobs that combine GlobalWindows, AllowedLateness > 24h, and an @OnWindowExpiration callback.
  • Loading branch information
arunpandianp authored Nov 11, 2024
1 parent 073aac9 commit fca0bea
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
public class SimpleParDoFn<InputT, OutputT> implements ParDoFn {

// TODO: Remove once Distributions has shipped.
@VisibleForTesting
static final String OUTPUTS_PER_ELEMENT_EXPERIMENT = "outputs_per_element_counter";
Expand Down Expand Up @@ -174,6 +175,7 @@ private boolean hasExperiment(String experiment) {

/** Simple state tracker to calculate PerElementOutputCount counter. */
private interface OutputsPerElementTracker {

void onOutput();

void onProcessElement();
Expand All @@ -182,6 +184,7 @@ private interface OutputsPerElementTracker {
}

private class OutputsPerElementTrackerImpl implements OutputsPerElementTracker {

private long outputsPerElement;
private final Counter<Long, CounterFactory.CounterDistribution> counter;

Expand Down Expand Up @@ -214,6 +217,7 @@ private void reset() {

/** No-op {@link OutputsPerElementTracker} implementation used when the counter is disabled. */
private static class NoopOutputsPerElementTracker implements OutputsPerElementTracker {

private NoopOutputsPerElementTracker() {}

public static final OutputsPerElementTracker INSTANCE = new NoopOutputsPerElementTracker();
Expand Down Expand Up @@ -516,10 +520,14 @@ private <W extends BoundedWindow> void registerStateCleanup(

/**
 * Returns the earliest instant at which state for {@code window} may be cleaned up.
 *
 * <p>The cleanup time is one millisecond after the window's maximum timestamp plus the allowed
 * lateness of {@code windowingStrategy}, capped at {@link BoundedWindow#TIMESTAMP_MAX_VALUE}.
 * The cap is needed because the uncapped sum can exceed the maximum timestamp, for example for
 * the global window combined with an allowed lateness of two days.
 *
 * @param window the window whose state is to be cleaned up
 * @param windowingStrategy supplies the allowed lateness added to the window's max timestamp
 * @return the cleanup time, never after {@link BoundedWindow#TIMESTAMP_MAX_VALUE}
 */
private Instant earliestAllowableCleanupTime(
    BoundedWindow window, WindowingStrategy windowingStrategy) {
  Instant cleanupTime =
      window
          .maxTimestamp()
          .plus(windowingStrategy.getAllowedLateness())
          .plus(Duration.millis(1L));
  // Clamp so the timer timestamp never exceeds the maximum allowed timestamp.
  return cleanupTime.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)
      ? BoundedWindow.TIMESTAMP_MAX_VALUE
      : cleanupTime;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.beam.runners.dataflow.worker;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.theInstance;
Expand Down Expand Up @@ -153,6 +154,21 @@ private static class TestStatefulDoFn extends DoFn<KV<String, Integer>, Void> {
public void processElement(ProcessContext c) {}
}

/**
 * Stateful {@link DoFn} test fixture that declares a single string value state and an
 * {@code @OnWindowExpiration} callback. Both the element-processing method and the expiration
 * callback are intentionally empty; the fixture exists only so that a test can exercise the
 * state cleanup path for a DoFn that has a window-expiration callback.
 */
private static class TestStatefulDoFnWithWindowExpiration
extends DoFn<KV<String, Integer>, Void> {

// Identifier of the declared state; tests use it to build a matching StateTag.
public static final String STATE_ID = "state-id";

// A single string value state encoded with StringUtf8Coder.
@StateId(STATE_ID)
private final StateSpec<ValueState<String>> spec = StateSpecs.value(StringUtf8Coder.of());

// No-op: the fixture does not read or write state while processing elements.
@ProcessElement
public void processElement(ProcessContext c) {}

// No-op callback; its presence is what marks this DoFn as having window-expiration handling.
@OnWindowExpiration
public void onWindowExpiration() {}
}

private static final TupleTag<String> MAIN_OUTPUT = new TupleTag<>("1");

private UserParDoFnFactory factory = UserParDoFnFactory.createDefault();
Expand Down Expand Up @@ -373,6 +389,92 @@ public void testCleanupRegistered() throws Exception {
firstWindow.maxTimestamp().plus(Duration.millis(1L)));
}

/**
 * Regression test for a stateful DoFn with {@code @OnWindowExpiration} in the global window
 * when the allowed lateness pushes the cleanup time past
 * {@link BoundedWindow#TIMESTAMP_MAX_VALUE}.
 *
 * <p>Checks that the state cleanup timer is set with timestamps capped at
 * {@code TIMESTAMP_MAX_VALUE}, and that firing that timer leaves the global window's state
 * empty.
 */
@Test
public void testCleanupTimerForGlobalWindowWithAllowedLateness() throws Exception {
PipelineOptions options = PipelineOptionsFactory.create();
CounterSet counters = new CounterSet();
DoFn<?, ?> initialFn = new TestStatefulDoFnWithWindowExpiration();
// Two days of allowed lateness; asserted below to overflow TIMESTAMP_MAX_VALUE when added to
// the global window's max timestamp.
Duration allowedLateness = Duration.standardDays(2);
CloudObject cloudObject =
getCloudObject(
initialFn, WindowingStrategy.globalDefault().withAllowedLateness(allowedLateness));

// Real in-memory state so the test can check the actual state contents after cleanup.
StateInternals stateInternals = InMemoryStateInternals.forKey("dummy");

TimerInternals timerInternals = mock(TimerInternals.class);

// Mocked system and user step contexts, both backed by the same in-memory state.
DataflowStepContext stepContext = mock(DataflowStepContext.class);
when(stepContext.timerInternals()).thenReturn(timerInternals);
DataflowStepContext userStepContext = mock(DataflowStepContext.class);
when(stepContext.namespacedToUser()).thenReturn(userStepContext);
when(stepContext.stateInternals()).thenReturn(stateInternals);
when(userStepContext.stateInternals()).thenReturn((StateInternals) stateInternals);

DataflowExecutionContext<DataflowStepContext> executionContext =
mock(DataflowExecutionContext.class);
TestOperationContext operationContext = TestOperationContext.create(counters);
when(executionContext.getStepContext(operationContext)).thenReturn(stepContext);
when(executionContext.getSideInputReader(any(), any(), any()))
.thenReturn(NullSideInputReader.empty());

ParDoFn parDoFn =
factory.create(
options,
cloudObject,
Collections.emptyList(),
MAIN_OUTPUT,
ImmutableMap.of(MAIN_OUTPUT, 0),
executionContext,
operationContext);

Receiver rcvr = new OutputReceiver();
parDoFn.startBundle(rcvr);

// Process one element in the global window.
GlobalWindow globalWindow = GlobalWindow.INSTANCE;
parDoFn.processElement(
WindowedValue.of("foo", new Instant(1), globalWindow, PaneInfo.NO_FIRING));

// Precondition: the uncapped expiration time really is beyond the maximum timestamp.
assertThat(
globalWindow.maxTimestamp().plus(allowedLateness),
greaterThan(BoundedWindow.TIMESTAMP_MAX_VALUE));
// The cleanup timer must have been set with timestamps capped at TIMESTAMP_MAX_VALUE.
verify(stepContext)
.setStateCleanupTimer(
SimpleParDoFn.CLEANUP_TIMER_ID,
globalWindow,
GlobalWindow.Coder.INSTANCE,
BoundedWindow.TIMESTAMP_MAX_VALUE,
BoundedWindow.TIMESTAMP_MAX_VALUE.minus(Duration.millis(1)));

StateNamespace globalWindowNamespace =
StateNamespaces.window(GlobalWindow.Coder.INSTANCE, globalWindow);
StateTag<ValueState<String>> tag =
StateTags.tagForSpec(
TestStatefulDoFnWithWindowExpiration.STATE_ID, StateSpecs.value(StringUtf8Coder.of()));

// No user timers fire; the system step context yields the cleanup timer once, then null.
when(userStepContext.getNextFiredTimer((Coder) GlobalWindow.Coder.INSTANCE)).thenReturn(null);
when(stepContext.getNextFiredTimer((Coder) GlobalWindow.Coder.INSTANCE))
.thenReturn(
TimerData.of(
SimpleParDoFn.CLEANUP_TIMER_ID,
globalWindowNamespace,
BoundedWindow.TIMESTAMP_MAX_VALUE,
BoundedWindow.TIMESTAMP_MAX_VALUE.minus(Duration.millis(1)),
TimeDomain.EVENT_TIME))
.thenReturn(null);

// Set up non-empty state. We don't mock + verify calls to clear() but instead
// check that state is actually empty. We mustn't care how it is accomplished.
stateInternals.state(globalWindowNamespace, tag).write("first");

// Firing the cleanup timer should clear the global window's state.
parDoFn.processTimers();

assertThat(stateInternals.state(globalWindowNamespace, tag).read(), nullValue());
}

@Test
public void testCleanupWorks() throws Exception {
PipelineOptions options = PipelineOptionsFactory.create();
Expand Down

0 comments on commit fca0bea

Please sign in to comment.