Skip to content

Commit

Permalink
[apache#32221] Mark is-last for each timer stream correctly.
Browse files Browse the repository at this point in the history
  • Loading branch information
lostluck committed Aug 19, 2024
1 parent 872a97f commit 5ad16e8
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 12 deletions.
22 changes: 12 additions & 10 deletions sdks/go/pkg/beam/runners/prism/internal/stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,17 @@ type link struct {
// account, but all serialization boundaries remain since the pcollections
// would continue to get serialized.
type stage struct {
ID string
transforms []string
primaryInput string // PCollection used as the parallel input.
outputs []link // PCollections that must escape this stage.
sideInputs []engine.LinkID // Non-parallel input PCollections and their consumers
internalCols []string // PCollections that escape. Used for precise coder sending.
envID string
stateful bool
hasTimers []string
ID string
transforms []string
primaryInput string // PCollection used as the parallel input.
outputs []link // PCollections that must escape this stage.
sideInputs []engine.LinkID // Non-parallel input PCollections and their consumers
internalCols []string // PCollections that escape. Used for precise coder sending.
envID string
stateful bool
// hasTimers indicates the transform+timerfamily pairs that need to be waited on for
// the stage to be considered complete.
hasTimers []struct{ Transform, TimerFamily string }
processingTimeTimers map[string]bool

exe transformExecuter
Expand Down Expand Up @@ -396,7 +398,7 @@ func buildDescriptor(stg *stage, comps *pipepb.Components, wk *worker.W, em *eng
}
}
for timerID, v := range pardo.GetTimerFamilySpecs() {
stg.hasTimers = append(stg.hasTimers, tid)
stg.hasTimers = append(stg.hasTimers, struct{ Transform, TimerFamily string }{Transform: tid, TimerFamily: timerID})
if v.TimeDomain == pipepb.TimeDomain_PROCESSING_TIME {
if stg.processingTimeTimers == nil {
stg.processingTimeTimers = map[string]bool{}
Expand Down
5 changes: 3 additions & 2 deletions sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type B struct {
InputTransformID string
Input []*engine.Block // Data and Timers for this bundle.
EstimatedInputElements int
HasTimers []string
HasTimers []struct{ Transform, TimerFamily string } // Timer streams to terminate.

// IterableSideInputData is a map from transformID + inputID, to window, to data.
IterableSideInputData map[SideInputKey]map[typex.Window][][]byte
Expand Down Expand Up @@ -175,7 +175,8 @@ func (b *B) ProcessOn(ctx context.Context, wk *W) <-chan struct{} {
for _, tid := range b.HasTimers {
timers = append(timers, &fnpb.Elements_Timers{
InstructionId: b.InstID,
TransformId: tid,
TransformId: tid.Transform,
TimerFamilyId: tid.TimerFamily,
IsLast: true,
})
}
Expand Down

0 comments on commit 5ad16e8

Please sign in to comment.