From 88a01027f4cc1c96fbfadb5cf7d26fb0434447dc Mon Sep 17 00:00:00 2001 From: Robert Burke Date: Tue, 30 Jul 2024 16:37:35 -0700 Subject: [PATCH] [#32003][prism] Support empty transform input sets, such as for flattens. (#32029) Co-authored-by: lostluck <13907733+lostluck@users.noreply.github.com> --- .../prism/internal/engine/elementmanager.go | 37 ++++++++++++++++--- 1 file changed, 32 insertions(+), 5 deletions(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go index 2c4e08bcd094..a632318e02c7 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go @@ -211,6 +211,16 @@ func (em *ElementManager) AddStage(ID string, inputIDs, outputIDs []string, side for _, input := range inputIDs { em.consumers[input] = append(em.consumers[input], ss.ID) } + + // In very rare cases, we can have a stage without any inputs, such as a flatten. + // In that case, there's nothing that will start the watermark refresh cycle, + // so we must do it here. + if len(inputIDs) == 0 { + refreshes := singleSet(ss.ID) + em.addToTestStreamImpulseSet(refreshes) + em.addRefreshes(refreshes) + } + for _, side := range ss.sides { // Note that we use the StageID as the global ID in the value since we need // to be able to look up the consuming stage, from the global PCollectionID. @@ -265,14 +275,19 @@ func (em *ElementManager) Impulse(stageID string) { } refreshes := stage.updateWatermarks(em) - // Since impulses are synthetic, we need to simulate them properly - // if a pipeline is only test stream driven. + em.addToTestStreamImpulseSet(refreshes) + em.addRefreshes(refreshes) +} + +// addToTestStreamImpulseSet adds to the set of stages to refresh on pipeline start. +// We keep this separate since impulses are synthetic. In a test stream driven pipeline +// these will need to be stimulated separately, to ensure the test stream has progressed. +func (em *ElementManager) addToTestStreamImpulseSet(refreshes set[string]) { if em.impulses == nil { em.impulses = refreshes } else { em.impulses.merge(refreshes) } - em.addRefreshes(refreshes) } type RunBundle struct { @@ -470,7 +485,19 @@ func (em *ElementManager) checkForQuiescence(advanced set[string]) error { outW := ss.OutputWatermark() upPCol, upW := ss.UpstreamWatermark() upS := em.pcolParents[upPCol] + if upS == "" { + upS = "IMPULSE " // (extra spaces to allow print to align better.) + } stageState = append(stageState, fmt.Sprintln(id, "watermark in", inW, "out", outW, "upstream", upW, "from", upS, "pending", ss.pending, "byKey", ss.pendingByKeys, "inprogressKeys", ss.inprogressKeys, "byBundle", ss.inprogressKeysByBundle, "holds", ss.watermarkHolds.heap, "holdCounts", ss.watermarkHolds.counts, "holdsInBundle", ss.inprogressHoldsByBundle, "pttEvents", ss.processingTimeTimers.toFire)) + + var outputConsumers, sideConsumers []string + for _, col := range ss.outputIDs { + outputConsumers = append(outputConsumers, em.consumers[col]...) + for _, l := range em.sideConsumers[col] { + sideConsumers = append(sideConsumers, l.Global) + } + } + stageState = append(stageState, fmt.Sprintf("\tsideInputs: %v outputCols: %v outputConsumers: %v sideConsumers: %v\n", ss.sides, ss.outputIDs, outputConsumers, sideConsumers)) } return errors.Errorf("nothing in progress and no refreshes with non zero pending elements: %v\n%v", v, strings.Join(stageState, "")) } @@ -721,13 +748,13 @@ func (em *ElementManager) PersistBundle(rb RunBundle, col2Coders map[string]PCol } } consumers := em.consumers[output] - slog.Debug("PersistBundle: bundle has downstream consumers.", "bundle", rb, slog.Int("newPending", len(newPending)), "consumers", consumers) + sideConsumers := em.sideConsumers[output] + slog.Debug("PersistBundle: bundle has downstream consumers.", "bundle", rb, slog.Int("newPending", len(newPending)), "consumers", consumers, "sideConsumers", sideConsumers) for _, sID := range consumers { consumer := em.stages[sID] count := consumer.AddPending(newPending) em.addPending(count) } - sideConsumers := em.sideConsumers[output] for _, link := range sideConsumers { consumer := em.stages[link.Global] consumer.AddPendingSide(newPending, link.Transform, link.Local)