Skip to content

Commit

Permalink
Fix the python test instead.
Browse files Browse the repository at this point in the history
  • Loading branch information
lostluck committed Dec 4, 2024
1 parent 2e799bf commit 7516b98
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1579,7 +1579,7 @@ func (ss *stageState) updateWatermarks(em *ElementManager) set[string] {
// TODO(#https://github.com/apache/beam/issues/31438):
// Adjust with AllowedLateness
// Clear out anything we've already used.
if win.MaxTimestamp()+1 < newOut {
if win.MaxTimestamp() < newOut {
delete(wins, win)
}
}
Expand All @@ -1588,7 +1588,7 @@ func (ss *stageState) updateWatermarks(em *ElementManager) set[string] {
for win := range wins {
// TODO(#https://github.com/apache/beam/issues/31438):
// Adjust with AllowedLateness
if win.MaxTimestamp()+1 < newOut {
if win.MaxTimestamp() < newOut {
delete(wins, win)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -776,7 +776,8 @@ def process(
state.clear()
yield buffer
else:
timer.set(ts + 1)
# Set the timer to fire within it's window.
timer.set(ts + (1 - timestamp.Duration(micros=1000)))

@userstate.on_timer(timer_spec)
def process_timer(self, state=beam.DoFn.StateParam(state_spec)):
Expand Down

0 comments on commit 7516b98

Please sign in to comment.