diff --git a/sdks/go/pkg/beam/runners/prism/internal/stage.go b/sdks/go/pkg/beam/runners/prism/internal/stage.go index 3d1e506f5e32..da23ca8ccce1 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/stage.go +++ b/sdks/go/pkg/beam/runners/prism/internal/stage.go @@ -55,15 +55,17 @@ type link struct { // account, but all serialization boundaries remain since the pcollections // would continue to get serialized. type stage struct { - ID string - transforms []string - primaryInput string // PCollection used as the parallel input. - outputs []link // PCollections that must escape this stage. - sideInputs []engine.LinkID // Non-parallel input PCollections and their consumers - internalCols []string // PCollections that escape. Used for precise coder sending. - envID string - stateful bool - hasTimers []string + ID string + transforms []string + primaryInput string // PCollection used as the parallel input. + outputs []link // PCollections that must escape this stage. + sideInputs []engine.LinkID // Non-parallel input PCollections and their consumers + internalCols []string // PCollections that escape. Used for precise coder sending. + envID string + stateful bool + // hasTimers indicates the transform+timerfamily pairs that need to be waited on for + // the stage to be considered complete. + hasTimers []struct{ Transform, TimerFamily string } processingTimeTimers map[string]bool exe transformExecuter @@ -396,7 +398,7 @@ func buildDescriptor(stg *stage, comps *pipepb.Components, wk *worker.W, em *eng } } for timerID, v := range pardo.GetTimerFamilySpecs() { - stg.hasTimers = append(stg.hasTimers, tid) + stg.hasTimers = append(stg.hasTimers, struct{ Transform, TimerFamily string }{Transform: tid, TimerFamily: timerID}) if v.TimeDomain == pipepb.TimeDomain_PROCESSING_TIME { if stg.processingTimeTimers == nil { stg.processingTimeTimers = map[string]bool{} diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go b/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go index 842c5fdfc19d..50e427ca36f5 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go +++ b/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go @@ -42,7 +42,7 @@ type B struct { InputTransformID string Input []*engine.Block // Data and Timers for this bundle. EstimatedInputElements int - HasTimers []string + HasTimers []struct{ Transform, TimerFamily string } // Timer streams to terminate. // IterableSideInputData is a map from transformID + inputID, to window, to data. IterableSideInputData map[SideInputKey]map[typex.Window][][]byte @@ -175,7 +175,8 @@ func (b *B) ProcessOn(ctx context.Context, wk *W) <-chan struct{} { for _, tid := range b.HasTimers { timers = append(timers, &fnpb.Elements_Timers{ InstructionId: b.InstID, - TransformId: tid, + TransformId: tid.Transform, + TimerFamilyId: tid.TimerFamily, IsLast: true, }) }