Skip to content

Commit

Permalink
[prism] rename watermarkRefreshs concept to changedStages. (#32233)
Browse files Browse the repository at this point in the history
* [prism] rename watermarkRefreshes concept to changed stages.

* missed file

---------

Co-authored-by: lostluck <[email protected]>
  • Loading branch information
lostluck and lostluck authored Aug 19, 2024
1 parent 8c0fbf7 commit 88713ac
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 50 deletions.
96 changes: 52 additions & 44 deletions sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,9 +194,9 @@ type ElementManager struct {

pcolParents map[string]string // Map from pcollectionID to stageIDs that produce the pcollection.

refreshCond sync.Cond // refreshCond protects the following fields with it's lock, and unblocks bundle scheduling.
inprogressBundles set[string] // Active bundleIDs
watermarkRefreshes set[string] // Scheduled stageID watermark refreshes
refreshCond sync.Cond // refreshCond protects the following fields with its lock, and unblocks bundle scheduling.
inprogressBundles set[string] // Active bundleIDs
changedStages set[string] // Stages that have changed and need their watermark refreshed.

livePending atomic.Int64 // An accessible live pending count. DEBUG USE ONLY
pendingElements sync.WaitGroup // pendingElements counts all unprocessed elements in a job. Jobs with no pending elements terminate successfully.
Expand All @@ -217,15 +217,15 @@ type LinkID struct {

func NewElementManager(config Config) *ElementManager {
return &ElementManager{
config: config,
stages: map[string]*stageState{},
consumers: map[string][]string{},
sideConsumers: map[string][]LinkID{},
pcolParents: map[string]string{},
watermarkRefreshes: set[string]{},
inprogressBundles: set[string]{},
refreshCond: sync.Cond{L: &sync.Mutex{}},
processTimeEvents: newStageRefreshQueue(),
config: config,
stages: map[string]*stageState{},
consumers: map[string][]string{},
sideConsumers: map[string][]LinkID{},
pcolParents: map[string]string{},
changedStages: set[string]{},
inprogressBundles: set[string]{},
refreshCond: sync.Cond{L: &sync.Mutex{}},
processTimeEvents: newStageRefreshQueue(),
}
}

Expand All @@ -249,7 +249,7 @@ func (em *ElementManager) AddStage(ID string, inputIDs, outputIDs []string, side
if len(inputIDs) == 0 {
refreshes := singleSet(ss.ID)
em.addToTestStreamImpulseSet(refreshes)
em.addRefreshes(refreshes)
em.markStagesAsChanged(refreshes)
}

for _, side := range ss.sides {
Expand Down Expand Up @@ -307,7 +307,7 @@ func (em *ElementManager) Impulse(stageID string) {
refreshes := stage.updateWatermarks(em)

em.addToTestStreamImpulseSet(refreshes)
em.addRefreshes(refreshes)
em.markStagesAsChanged(refreshes)
}

// addToTestStreamImpulseSet adds to the set of stages to refresh on pipeline start.
Expand Down Expand Up @@ -357,21 +357,21 @@ func (em *ElementManager) Bundles(ctx context.Context, upstreamCancelFn context.
}()
defer close(runStageCh)

// If we have a test stream, clear out existing refreshes, so the test stream can
// insert any elements it needs.
// If we have a test stream, clear out existing changed stages,
// so the test stream can insert any elements it needs.
if em.testStreamHandler != nil {
em.watermarkRefreshes = singleSet(em.testStreamHandler.ID)
em.changedStages = singleSet(em.testStreamHandler.ID)
}

for {
em.refreshCond.L.Lock()
// Check if processing time has advanced before the wait loop.
emNow := em.ProcessingTimeNow()
ptRefreshed := em.processTimeEvents.AdvanceTo(emNow)
em.watermarkRefreshes.merge(ptRefreshed)
changedByProcessingTime := em.processTimeEvents.AdvanceTo(emNow)
em.changedStages.merge(changedByProcessingTime)

// If there are no watermark refreshes or ready processing time events available, we wait until there are.
for len(em.watermarkRefreshes)+len(ptRefreshed) == 0 {
// If there are no changed stages or ready processing time events available, we wait until there are.
for len(em.changedStages)+len(changedByProcessingTime) == 0 {
// Check to see if we must exit
select {
case <-ctx.Done():
Expand All @@ -383,14 +383,14 @@ func (em *ElementManager) Bundles(ctx context.Context, upstreamCancelFn context.

// Update if the processing time has advanced while we waited, and add refreshes here. (TODO waking on real time here for prod mode)
emNow = em.ProcessingTimeNow()
ptRefreshed = em.processTimeEvents.AdvanceTo(emNow)
em.watermarkRefreshes.merge(ptRefreshed)
changedByProcessingTime = em.processTimeEvents.AdvanceTo(emNow)
em.changedStages.merge(changedByProcessingTime)
}

// We know there is some work we can do that may advance the watermarks,
// refresh them, and see which stages have advanced.
advanced := em.refreshWatermarks()
advanced.merge(ptRefreshed)
advanced.merge(changedByProcessingTime)

// Check each advanced stage, to see if it's able to execute based on the watermark.
for stageID := range advanced {
Expand All @@ -400,7 +400,7 @@ func (em *ElementManager) Bundles(ctx context.Context, upstreamCancelFn context.
bundleID, ok, reschedule := ss.startEventTimeBundle(watermark, nextBundID)
// Handle the reschedule even when there's no bundle.
if reschedule {
em.watermarkRefreshes.insert(stageID)
em.changedStages.insert(stageID)
}
if ok {
rb := RunBundle{StageID: stageID, BundleID: bundleID, Watermark: watermark}
Expand All @@ -420,7 +420,7 @@ func (em *ElementManager) Bundles(ctx context.Context, upstreamCancelFn context.
bundleID, ok, reschedule := ss.startProcessingTimeBundle(em, emNow, nextBundID)
// Handle the reschedule even when there's no bundle.
if reschedule {
em.watermarkRefreshes.insert(stageID)
em.changedStages.insert(stageID)
}
if ok {
rb := RunBundle{StageID: stageID, BundleID: bundleID, Watermark: watermark}
Expand Down Expand Up @@ -461,12 +461,13 @@ func (em *ElementManager) checkForQuiescence(advanced set[string]) error {
// If there are bundles in progress, then there may be watermark refreshes when they terminate.
return nil
}
if len(em.watermarkRefreshes) > 0 {
// If there are watermarks to refresh, we aren't yet stuck.
if len(em.changedStages) > 0 {
// If there are changed stages that need a watermark refresh,
// we aren't yet stuck.
v := em.livePending.Load()
slog.Debug("Bundles: nothing in progress after advance",
slog.Any("advanced", advanced),
slog.Int("refreshCount", len(em.watermarkRefreshes)),
slog.Int("changeCount", len(em.changedStages)),
slog.Int64("pendingElementCount", v),
)
return nil
Expand All @@ -484,18 +485,20 @@ func (em *ElementManager) checkForQuiescence(advanced set[string]) error {
nextEvent.Execute(em)
// Decrement pending for the event being processed.
em.addPending(-1)
// If there are refreshes scheduled, then test stream permitted execution to continue.
// If there are changedStages scheduled for a watermark refresh,
// then test stream permits execution to continue.
// Note: it's a prism bug if test stream never causes a refresh to occur for a given event.
// It's not correct to move to the next event if no refreshes would occur.
if len(em.watermarkRefreshes) > 0 {
if len(em.changedStages) > 0 {
return nil
} else if _, ok := nextEvent.(tsProcessingTimeEvent); ok {
// It's impossible to fully control processing time SDK side handling for processing time
// Runner side, so we specialize refresh handling here to avoid spuriously getting stuck.
em.watermarkRefreshes.insert(em.testStreamHandler.ID)
em.changedStages.insert(em.testStreamHandler.ID)
return nil
}
// If there are no refreshes, then there's no mechanism to make progress, so it's time to fast fail.
// If there are no changed stages due to a test stream event
// then there's no mechanism to make progress, so it's time to fast fail.
}

v := em.livePending.Load()
Expand Down Expand Up @@ -874,7 +877,7 @@ func (em *ElementManager) PersistBundle(rb RunBundle, col2Coders map[string]PCol
}
stage.mu.Unlock()

em.addRefreshAndClearBundle(stage.ID, rb.BundleID, ptRefreshes)
em.markChangedAndClearBundle(stage.ID, rb.BundleID, ptRefreshes)
}

// triageTimers prepares received timers for eventual firing, as well as rebasing processing time timers as needed.
Expand Down Expand Up @@ -959,7 +962,7 @@ func (em *ElementManager) FailBundle(rb RunBundle) {
em.addPending(-len(completed.es))
delete(stage.inprogress, rb.BundleID)
stage.mu.Unlock()
em.addRefreshAndClearBundle(rb.StageID, rb.BundleID, nil)
em.markChangedAndClearBundle(rb.StageID, rb.BundleID, nil)
}

// ReturnResiduals is called after a successful split, so the remaining work
Expand All @@ -974,38 +977,43 @@ func (em *ElementManager) ReturnResiduals(rb RunBundle, firstRsIndex int, inputI
count := stage.AddPending(unprocessedElements)
em.addPending(count)
}
em.addRefreshes(singleSet(rb.StageID))
em.markStagesAsChanged(singleSet(rb.StageID))
}

func (em *ElementManager) addRefreshes(stages set[string]) {
// markStagesAsChanged updates the set of changed stages,
// and broadcasts that there may be watermark evaluation work to do.
func (em *ElementManager) markStagesAsChanged(stages set[string]) {
em.refreshCond.L.Lock()
defer em.refreshCond.L.Unlock()
em.watermarkRefreshes.merge(stages)
em.changedStages.merge(stages)
em.refreshCond.Broadcast()
}

func (em *ElementManager) addRefreshAndClearBundle(stageID, bundID string, ptRefreshes set[mtime.Time]) {
// markChangedAndClearBundle marks the current stage as changed,
// and removes the given bundle from being in progress.
func (em *ElementManager) markChangedAndClearBundle(stageID, bundID string, ptRefreshes set[mtime.Time]) {
em.refreshCond.L.Lock()
defer em.refreshCond.L.Unlock()
delete(em.inprogressBundles, bundID)
em.watermarkRefreshes.insert(stageID)
em.changedStages.insert(stageID)
for t := range ptRefreshes {
em.processTimeEvents.Schedule(t, stageID)
}
em.refreshCond.Broadcast()
}

// refreshWatermarks incrementally refreshes the watermarks, and returns the set of stages where the
// refreshWatermarks incrementally refreshes the watermarks of stages that have
// been marked as changed, and returns the set of stages where the
// watermark may have advanced.
// Must be called while holding em.refreshCond.L
func (em *ElementManager) refreshWatermarks() set[string] {
// Need to have at least one refresh signal.
nextUpdates := set[string]{}
refreshed := set[string]{}
var i int
for stageID := range em.watermarkRefreshes {
for stageID := range em.changedStages {
// clear out old one.
em.watermarkRefreshes.remove(stageID)
em.changedStages.remove(stageID)
ss := em.stages[stageID]
refreshed.insert(stageID)

Expand All @@ -1018,7 +1026,7 @@ func (em *ElementManager) refreshWatermarks() set[string] {
break
}
}
em.watermarkRefreshes.merge(nextUpdates)
em.changedStages.merge(nextUpdates)
return refreshed
}

Expand Down
12 changes: 6 additions & 6 deletions sdks/go/pkg/beam/runners/prism/internal/engine/teststream.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func (ts *testStreamHandler) UpdateHold(em *ElementManager, newHold mtime.Time)
kick.merge(em.impulses)

// This executes under the refreshCond lock, so we can't call em.markStagesAsChanged, which acquires that lock itself.
em.watermarkRefreshes.merge(kick)
em.changedStages.merge(kick)
em.refreshCond.Broadcast()
}

Expand Down Expand Up @@ -190,13 +190,13 @@ func (ev tsElementEvent) Execute(em *ElementManager) {
ss := em.stages[sID]
added := ss.AddPending(pending)
em.addPending(added)
em.watermarkRefreshes.insert(sID)
em.changedStages.insert(sID)
}

for _, link := range em.sideConsumers[t.pcollection] {
ss := em.stages[link.Global]
ss.AddPendingSide(pending, link.Transform, link.Local)
em.watermarkRefreshes.insert(link.Global)
em.changedStages.insert(link.Global)
}
}

Expand All @@ -220,7 +220,7 @@ func (ev tsWatermarkEvent) Execute(em *ElementManager) {
for _, sID := range em.consumers[t.pcollection] {
ss := em.stages[sID]
ss.updateUpstreamWatermark(ss.inputID, t.watermark)
em.watermarkRefreshes.insert(sID)
em.changedStages.insert(sID)
}
// Clear the default hold after the inserts have occurred.
em.testStreamHandler.UpdateHold(em, t.watermark)
Expand All @@ -241,7 +241,7 @@ func (ev tsProcessingTimeEvent) Execute(em *ElementManager) {
// Add the refreshes now so our block prevention logic works.
emNow := em.ProcessingTimeNow()
toRefresh := em.processTimeEvents.AdvanceTo(emNow)
em.watermarkRefreshes.merge(toRefresh)
em.changedStages.merge(toRefresh)
}

// tsFinalEvent is the "last" event we perform after all preceding events.
Expand All @@ -256,7 +256,7 @@ func (ev tsFinalEvent) Execute(em *ElementManager) {
ss := em.stages[ev.stageID]
kickSet := ss.updateWatermarks(em)
kickSet.insert(ev.stageID)
em.watermarkRefreshes.merge(kickSet)
em.changedStages.merge(kickSet)
}

// TestStreamBuilder builds a synthetic sequence of events for the engine to execute.
Expand Down

0 comments on commit 88713ac

Please sign in to comment.