Timers exception on "Job Drain" while using stateful beam processing in global window #20203
Comments
Can reproduce this issue after migrating a

Is there any update regarding this issue?

@bvolpato Can this be fixed by not using GroupIntoBatches?

Is there any update regarding this issue?

#33037 should fix this in 2.61.0.

Hi @arunpandianp, thanks for the follow-up. I updated my Beam to 2.61.0 and unfortunately I am still getting these error messages when trying to drain my streaming pipeline.

@genyhamud was that on a new job created using 2.61.0, or an update from an older version? With the fix in #33037, new jobs created on 2.61.0 should not see the exception. Do you have a sample pipeline that can reproduce the exception?

I will try to set up a small project and share it here, thanks a lot for taking the time to help me! BTW, it was a plain 2.61.0 pipeline without update.
Hello,
I have a use case with two PCollections (RecordA and RecordB) coming from a real-time streaming source like Kafka.
Both records are correlated by a common key, let's say KEY.
The purpose is to enrich RecordA with RecordB's data, for which I am using CoGroupByKey. Since RecordA and RecordB for a common key can arrive within 1-2 minutes of each other in event time, I maintain a sliding window over both records and then apply CoGroupByKey to the two PCollections.
Any sliding window that contains both RecordA and RecordB for a common key KEY emits enriched output. Since multiple sliding windows can emit the same output, I finally remove duplicate results by feeding those outputs into a global window, where I maintain state to check whether an output has already been processed. Because it is a global window, I set a timer on the state (for GC) to expire it once 10 minutes have elapsed since the state was written.
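In condensed form, that state-expiry pattern in the global window looks like this (names taken from the full snippet further below):

// Per-key dedup state, garbage-collected by a processing-time timer
// that is reset on every write.
@StateId("processedRecordA")
private final StateSpec<ValueState<RecordA>> processedRecordASpec =
    StateSpecs.value(AvroCoder.of(RecordA.class));

@TimerId("stateExpiry")
private final TimerSpec stateExpirySpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

// inside @ProcessElement, after writing the state:
stateExpiryTimer.offset(Duration.standardMinutes(10)).setRelative();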
This approach works perfectly with respect to the expected results. However, I am unable to stop the job gracefully, i.e. drain it. I see the following exception:
java.lang.IllegalStateException: org.apache.beam.runners.dataflow.worker.SimpleParDoFn@16b089a6 received state cleanup timer for window org.apache.beam.sdk.transforms.windowing.GlobalWindow@29ca0210 that is before the appropriate cleanup time 294248-01-24T04:00:54.776Z
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:842)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processSystemTimer(SimpleParDoFn.java:384)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.access$700(SimpleParDoFn.java:73)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn$TimerType$2.processTimer(SimpleParDoFn.java:444)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:467)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:354)
org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.finish(ParDoOperation.java:52)
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1316)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:149)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1049)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
(the same IllegalStateException, with an identical stack trace, repeats for other SimpleParDoFn instances)
My code snippet:

PCollection<KV<MyKey, RecordA>> windowedRecordA =
    incompleteRecordALogs.apply("Applying_Sliding_Window_RecordA",
        Window.<KV<MyKey, RecordA>>into(
                SlidingWindows.of(Duration.standardSeconds(90)).every(Duration.standardSeconds(45)))
            .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
            .withAllowedLateness(Duration.standardSeconds(90))
            .discardingFiredPanes());

PCollection<KV<MyKey, RecordB>> windowedRecordB =
    recordBLogs.apply("Applying_Sliding_Window_RecordB",
        Window.<KV<MyKey, RecordB>>into(
                SlidingWindows.of(Duration.standardSeconds(90)).every(Duration.standardSeconds(45)))
            .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
            .withAllowedLateness(Duration.standardSeconds(90))
            .discardingFiredPanes());

PCollection<KV<MyKey, CoGbkResult>> coGbkRecords =
    KeyedPCollectionTuple.of(TagRecordA, windowedRecordA)
        .and(TagRecordB, windowedRecordB)
        .apply("CoGroupByKey", CoGroupByKey.create());

PCollection<RecordA> enrichedRecordA =
    coGbkRecords.apply("EnrichRecordAWithRecordB", new EnrichIncompleteRecordA());

class EnrichIncompleteRecordA
    extends PTransform<PCollection<KV<MyKey, CoGbkResult>>, PCollection<RecordA>> {

  @Override
  public PCollection<RecordA> expand(PCollection<KV<MyKey, CoGbkResult>> input) {
    logger.info("Enriching Incomplete RecordA with RecordB");
    return input
        .apply("Add_RecordBInfo_To_RecordA", ParDo.of(new AddRecordBData()))
        .apply("Applying_Windowing", Window.<KV<MyKey, RecordA>>into(new GlobalWindows())
            .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
            .discardingFiredPanes())
        .apply("Emit_Unique_RecordA", ParDo.of(new EmitUniqueRecordA()));
  }

  private class AddRecordBData extends DoFn<KV<MyKey, CoGbkResult>, KV<MyKey, RecordA>> {

    @Setup
    public void setup() {}

    @StartBundle
    public void startBundle() {}

    @ProcessElement
    public void processElement(
        @Element KV<MyKey, CoGbkResult> input, OutputReceiver<KV<MyKey, RecordA>> out) {
      Iterable<RecordA> allRecordALogs = input.getValue().getAll(TagRecordA);
      Iterable<RecordB> allRecordBLogs = input.getValue().getAll(TagRecordB);
      // There should be at most one RecordB per MyKey.
      if (allRecordALogs.iterator().hasNext() && allRecordBLogs.iterator().hasNext()) {
        RecordB recordB = Iterables.getFirst(allRecordBLogs, null);
        for (RecordA recordA : allRecordALogs) {
          if (null != recordB) {
            logger.info("Enriching incomplete recordA [{}] with recordB: [{}]", recordA, recordB);
            // <code to populate recordA object with recordB data>
            out.output(KV.of(input.getKey(), recordA));
          } else {
            logger.error("No recordB available for recordA log [{}]", recordA);
          }
        }
      } else {
        logger.info("Either recordA or recordB not present for myKey: {}", input.getKey());
      }
    }

    @FinishBundle
    public void finishBundle() {}

    @Teardown
    public void teardown() {}
  }

  private class EmitUniqueRecordA extends DoFn<KV<MyKey, RecordA>, RecordA> {

    @Setup
    public void setup() {}

    @StartBundle
    public void startBundle() {}

    // Per-key state remembering the RecordA that has already been emitted.
    @StateId("processedRecordA")
    private final StateSpec<ValueState<RecordA>> processedRecordASpec =
        StateSpecs.value(AvroCoder.of(RecordA.class));

    // Processing-time timer that garbage-collects the state above.
    @TimerId("stateExpiry")
    private final TimerSpec stateExpirySpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

    @ProcessElement
    public void processElement(@Element KV<MyKey, RecordA> input, OutputReceiver<RecordA> out,
        @StateId("processedRecordA") ValueState<RecordA> processedRecordAState,
        @TimerId("stateExpiry") Timer stateExpiryTimer) {
      // <code to check if recordA has already been processed by checking state>
      if (recordANeedsToBeEmitted) { // stands in for the elided dedup check
        processedRecordAState.write(input.getValue());
        // Push state expiry to 10 minutes after the most recent write.
        stateExpiryTimer.offset(Duration.standardMinutes(10)).setRelative();
        logger.info("Emitting unique recordA {} for myKey {}", input.getValue(), input.getKey());
        out.output(input.getValue());
      }
    }

    @OnTimer("stateExpiry")
    public void onExpiry(
        OnTimerContext context,
        @StateId("processedRecordA") ValueState<RecordA> processedRecordAState) {
      logger.info("Expiring State after timer expiry");
      processedRecordAState.clear();
    }

    @FinishBundle
    public void finishBundle() {}

    @Teardown
    public void teardown() {}
  }
}
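A possible way to sidestep the drain-time clash (a sketch, not the confirmed fix, which is #33037; it assumes event-time expiry is acceptable for this GC) is to replace the processing-time timer with an event-time timer capped just below the end of the global window, so the user timer always precedes the runner's own cleanup time:

// Sketch: event-time variant of the state-expiry timer from EmitUniqueRecordA above.
@TimerId("stateExpiry")
private final TimerSpec stateExpirySpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

@ProcessElement
public void processElement(@Element KV<MyKey, RecordA> input, OutputReceiver<RecordA> out,
    @Timestamp Instant elementTs,
    @StateId("processedRecordA") ValueState<RecordA> processedRecordAState,
    @TimerId("stateExpiry") Timer stateExpiryTimer) {
  // ... dedup check elided, as in the snippet above ...
  processedRecordAState.write(input.getValue());
  // Expire 10 minutes past the element's event time, capped so the timer can
  // never land at or beyond the end of the global window.
  Instant target = elementTs.plus(Duration.standardMinutes(10));
  Instant cap = GlobalWindow.INSTANCE.maxTimestamp().minus(Duration.millis(1));
  stateExpiryTimer.set(target.isBefore(cap) ? target : cap);
  out.output(input.getValue());
}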
Imported from Jira BEAM-10053. Original Jira may contain additional context.
Reported by: mohilkhare.