From 7516b981161e080425bac24dedf351a65ee75d8c Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Wed, 4 Dec 2024 10:39:58 -0800 Subject: [PATCH] Fix the python test instead. --- .../pkg/beam/runners/prism/internal/engine/elementmanager.go | 4 ++-- .../runners/portability/fn_api_runner/fn_runner_test.py | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go index 2c81ce6a53ab..1739efdb742a 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go @@ -1579,7 +1579,7 @@ func (ss *stageState) updateWatermarks(em *ElementManager) set[string] { // TODO(#https://github.com/apache/beam/issues/31438): // Adjust with AllowedLateness // Clear out anything we've already used. - if win.MaxTimestamp()+1 < newOut { + if win.MaxTimestamp() < newOut { delete(wins, win) } } @@ -1588,7 +1588,7 @@ func (ss *stageState) updateWatermarks(em *ElementManager) set[string] { for win := range wins { // TODO(#https://github.com/apache/beam/issues/31438): // Adjust with AllowedLateness - if win.MaxTimestamp()+1 < newOut { + if win.MaxTimestamp() < newOut { delete(wins, win) } } diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py index c788caee68d1..45022ebc781f 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py @@ -776,7 +776,8 @@ def process( state.clear() yield buffer else: - timer.set(ts + 1) + # Set the timer to fire within it's window. + timer.set(ts + (1 - timestamp.Duration(micros=1000))) @userstate.on_timer(timer_spec) def process_timer(self, state=beam.DoFn.StateParam(state_spec)):