Skip to content

Commit

Permalink
[#32003][prism] Support empty transform input sets, such as for flatt…
Browse files Browse the repository at this point in the history
…ens. (#32029)

Co-authored-by: lostluck <[email protected]>
  • Loading branch information
lostluck and lostluck authored Jul 30, 2024
1 parent 121ac71 commit 88a0102
Showing 1 changed file with 32 additions and 5 deletions.
37 changes: 32 additions & 5 deletions sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,16 @@ func (em *ElementManager) AddStage(ID string, inputIDs, outputIDs []string, side
for _, input := range inputIDs {
em.consumers[input] = append(em.consumers[input], ss.ID)
}

// In very rare cases, we can have a stage without any inputs, such as a flatten.
// In that case, there's nothing that will start the watermark refresh cycle,
// so we must do it here.
if len(inputIDs) == 0 {
refreshes := singleSet(ss.ID)
em.addToTestStreamImpulseSet(refreshes)
em.addRefreshes(refreshes)
}

for _, side := range ss.sides {
// Note that we use the StageID as the global ID in the value since we need
// to be able to look up the consuming stage, from the global PCollectionID.
Expand Down Expand Up @@ -265,14 +275,19 @@ func (em *ElementManager) Impulse(stageID string) {
}
refreshes := stage.updateWatermarks(em)

// Since impulses are synthetic, we need to simulate them properly
// if a pipeline is only test stream driven.
em.addToTestStreamImpulseSet(refreshes)
em.addRefreshes(refreshes)
}

// addToTestStreamImpulseSet adds to the set of stages to refresh on pipeline start.
// We keep this separate since impulses are synthetic. In a test stream driven pipeline
// these will need to be stimulated separately, to ensure the test stream has progressed.
func (em *ElementManager) addToTestStreamImpulseSet(refreshes set[string]) {
if em.impulses == nil {
em.impulses = refreshes
} else {
em.impulses.merge(refreshes)
}
em.addRefreshes(refreshes)
}

type RunBundle struct {
Expand Down Expand Up @@ -470,7 +485,19 @@ func (em *ElementManager) checkForQuiescence(advanced set[string]) error {
outW := ss.OutputWatermark()
upPCol, upW := ss.UpstreamWatermark()
upS := em.pcolParents[upPCol]
if upS == "" {
upS = "IMPULSE " // (extra spaces to allow print to align better.)
}
stageState = append(stageState, fmt.Sprintln(id, "watermark in", inW, "out", outW, "upstream", upW, "from", upS, "pending", ss.pending, "byKey", ss.pendingByKeys, "inprogressKeys", ss.inprogressKeys, "byBundle", ss.inprogressKeysByBundle, "holds", ss.watermarkHolds.heap, "holdCounts", ss.watermarkHolds.counts, "holdsInBundle", ss.inprogressHoldsByBundle, "pttEvents", ss.processingTimeTimers.toFire))

var outputConsumers, sideConsumers []string
for _, col := range ss.outputIDs {
outputConsumers = append(outputConsumers, em.consumers[col]...)
for _, l := range em.sideConsumers[col] {
sideConsumers = append(sideConsumers, l.Global)
}
}
stageState = append(stageState, fmt.Sprintf("\tsideInputs: %v outputCols: %v outputConsumers: %v sideConsumers: %v\n", ss.sides, ss.outputIDs, outputConsumers, sideConsumers))
}
return errors.Errorf("nothing in progress and no refreshes with non zero pending elements: %v\n%v", v, strings.Join(stageState, ""))
}
Expand Down Expand Up @@ -721,13 +748,13 @@ func (em *ElementManager) PersistBundle(rb RunBundle, col2Coders map[string]PCol
}
}
consumers := em.consumers[output]
slog.Debug("PersistBundle: bundle has downstream consumers.", "bundle", rb, slog.Int("newPending", len(newPending)), "consumers", consumers)
sideConsumers := em.sideConsumers[output]
slog.Debug("PersistBundle: bundle has downstream consumers.", "bundle", rb, slog.Int("newPending", len(newPending)), "consumers", consumers, "sideConsumers", sideConsumers)
for _, sID := range consumers {
consumer := em.stages[sID]
count := consumer.AddPending(newPending)
em.addPending(count)
}
sideConsumers := em.sideConsumers[output]
for _, link := range sideConsumers {
consumer := em.stages[link.Global]
consumer.AddPendingSide(newPending, link.Transform, link.Local)
Expand Down

0 comments on commit 88a0102

Please sign in to comment.