diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go index 2c81ce6a53ab..1739efdb742a 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go @@ -1579,7 +1579,7 @@ func (ss *stageState) updateWatermarks(em *ElementManager) set[string] { // TODO(#https://github.com/apache/beam/issues/31438): // Adjust with AllowedLateness // Clear out anything we've already used. - if win.MaxTimestamp()+1 < newOut { + if win.MaxTimestamp() < newOut { delete(wins, win) } } @@ -1588,7 +1588,7 @@ func (ss *stageState) updateWatermarks(em *ElementManager) set[string] { for win := range wins { // TODO(#https://github.com/apache/beam/issues/31438): // Adjust with AllowedLateness - if win.MaxTimestamp()+1 < newOut { + if win.MaxTimestamp() < newOut { delete(wins, win) } } diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py index c788caee68d1..45022ebc781f 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py @@ -776,7 +776,8 @@ def process( state.clear() yield buffer else: - timer.set(ts + 1) + # Set the timer to fire within it's window. + timer.set(ts + (1 - timestamp.Duration(micros=1000))) @userstate.on_timer(timer_spec) def process_timer(self, state=beam.DoFn.StateParam(state_spec)):