diff --git a/sdks/go/pkg/beam/runners/prism/internal/stage.go b/sdks/go/pkg/beam/runners/prism/internal/stage.go index 3d1e506f5e32..da23ca8ccce1 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/stage.go +++ b/sdks/go/pkg/beam/runners/prism/internal/stage.go @@ -55,15 +55,17 @@ type link struct { // account, but all serialization boundaries remain since the pcollections // would continue to get serialized. type stage struct { - ID string - transforms []string - primaryInput string // PCollection used as the parallel input. - outputs []link // PCollections that must escape this stage. - sideInputs []engine.LinkID // Non-parallel input PCollections and their consumers - internalCols []string // PCollections that escape. Used for precise coder sending. - envID string - stateful bool - hasTimers []string + ID string + transforms []string + primaryInput string // PCollection used as the parallel input. + outputs []link // PCollections that must escape this stage. + sideInputs []engine.LinkID // Non-parallel input PCollections and their consumers + internalCols []string // PCollections that escape. Used for precise coder sending. + envID string + stateful bool + // hasTimers indicates the transform+timerfamily pairs that need to be waited on for + // the stage to be considered complete. + hasTimers []struct{ Transform, TimerFamily string } processingTimeTimers map[string]bool exe transformExecuter @@ -396,7 +398,7 @@ func buildDescriptor(stg *stage, comps *pipepb.Components, wk *worker.W, em *eng } } for timerID, v := range pardo.GetTimerFamilySpecs() { - stg.hasTimers = append(stg.hasTimers, tid) + stg.hasTimers = append(stg.hasTimers, struct{ Transform, TimerFamily string }{Transform: tid, TimerFamily: timerID}) if v.TimeDomain == pipepb.TimeDomain_PROCESSING_TIME { if stg.processingTimeTimers == nil { stg.processingTimeTimers = map[string]bool{} diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go b/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go index 842c5fdfc19d..50e427ca36f5 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go +++ b/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go @@ -42,7 +42,7 @@ type B struct { InputTransformID string Input []*engine.Block // Data and Timers for this bundle. EstimatedInputElements int - HasTimers []string + HasTimers []struct{ Transform, TimerFamily string } // Timer streams to terminate. // IterableSideInputData is a map from transformID + inputID, to window, to data. IterableSideInputData map[SideInputKey]map[typex.Window][][]byte @@ -175,7 +175,8 @@ func (b *B) ProcessOn(ctx context.Context, wk *W) <-chan struct{} { for _, tid := range b.HasTimers { timers = append(timers, &fnpb.Elements_Timers{ InstructionId: b.InstID, - TransformId: tid, + TransformId: tid.Transform, + TimerFamilyId: tid.TimerFamily, IsLast: true, }) } diff --git a/sdks/python/apache_beam/runners/portability/prism_runner_test.py b/sdks/python/apache_beam/runners/portability/prism_runner_test.py index 387b7ba2bec7..04a2dbd4faed 100644 --- a/sdks/python/apache_beam/runners/portability/prism_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/prism_runner_test.py @@ -35,7 +35,6 @@ from apache_beam.runners.portability import portable_runner_test from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to -from apache_beam.transforms import userstate from apache_beam.transforms import window from apache_beam.utils import timestamp @@ -195,37 +194,6 @@ def test_windowing(self): assert_that( res, equal_to([('k', [1, 2]), ('k', [100, 101, 102]), ('k', [123])])) - # The fn_runner_test.py version of this test doesn't execute the process - # method for some reason. Overridden here to validate that the cleared - # timer won't re-fire. - def test_pardo_timers_clear(self): - timer_spec = userstate.TimerSpec('timer', userstate.TimeDomain.WATERMARK) - - class TimerDoFn(beam.DoFn): - def process(self, element, timer=beam.DoFn.TimerParam(timer_spec)): - unused_key, ts = element - timer.set(ts) - timer.set(2 * ts) - - @userstate.on_timer(timer_spec) - def process_timer( - self, - ts=beam.DoFn.TimestampParam, - timer=beam.DoFn.TimerParam(timer_spec)): - timer.set(timestamp.Timestamp(micros=2 * ts.micros)) - timer.clear() # Shouldn't fire again - yield 'fired' - - with self.create_pipeline() as p: - actual = ( - p - | beam.Create([('k1', 10), ('k2', 100)]) - | beam.ParDo(TimerDoFn()) - | beam.Map(lambda x, ts=beam.DoFn.TimestampParam: (x, ts))) - - expected = [('fired', ts) for ts in (20, 200)] - assert_that(actual, equal_to(expected)) - # Can't read host files from within docker, read a "local" file there. def test_read(self): print('name:', __name__)