diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go index 5d665edf2862..76c60e810d40 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go @@ -169,6 +169,7 @@ type ElementManager struct { livePending atomic.Int64 // An accessible live pending count. DEBUG USE ONLY pendingElements sync.WaitGroup // pendingElements counts all unprocessed elements in a job. Jobs with no pending elements terminate successfully. + processTimeEvents *stageRefreshQueue // Manages sequence of stage updates when interfacing with processing time. testStreamHandler *testStreamHandler // Optional test stream handler when a test stream is in the pipeline. } @@ -192,6 +193,7 @@ func NewElementManager(config Config) *ElementManager { watermarkRefreshes: set[string]{}, inprogressBundles: set[string]{}, refreshCond: sync.Cond{L: &sync.Mutex{}}, + processTimeEvents: newStageRefreshQueue(), } } @@ -227,6 +229,11 @@ func (em *ElementManager) StageStateful(ID string) { em.stages[ID].stateful = true } +// StageProcessingTimeTimers indicates which timers are processingTime domain timers. +func (em *ElementManager) StageProcessingTimeTimers(ID string, ptTimers map[string]bool) { + em.stages[ID].processingTimeTimersFamilies = ptTimers +} + // AddTestStream provides a builder interface for the execution layer to build the test stream from // the protos. func (em *ElementManager) AddTestStream(id string, tagToPCol map[string]string) TestStreamBuilder { @@ -305,8 +312,13 @@ func (em *ElementManager) Bundles(ctx context.Context, nextBundID func() string) for { em.refreshCond.L.Lock() - // If there are no watermark refreshes available, we wait until there are. - for len(em.watermarkRefreshes) == 0 { + // Check if processing time has advanced before the wait loop. 
+ emNow := em.ProcessingTimeNow() + ptRefreshed := em.processTimeEvents.AdvanceTo(emNow) + em.watermarkRefreshes.merge(ptRefreshed) + + // If there are no watermark refreshes or ready processing time events available, we wait until there are. + for len(em.watermarkRefreshes)+len(ptRefreshed) == 0 { // Check to see if we must exit select { case <-ctx.Done(): @@ -315,36 +327,61 @@ func (em *ElementManager) Bundles(ctx context.Context, nextBundID func() string) default: } em.refreshCond.Wait() // until watermarks may have changed. + + // Update if the processing time has advanced while we waited, and add refreshes here. (TODO waking on real time here for prod mode) + emNow = em.ProcessingTimeNow() + ptRefreshed = em.processTimeEvents.AdvanceTo(emNow) + em.watermarkRefreshes.merge(ptRefreshed) } // We know there is some work we can do that may advance the watermarks, // refresh them, and see which stages have advanced. advanced := em.refreshWatermarks() + advanced.merge(ptRefreshed) // Check each advanced stage, to see if it's able to execute based on the watermark. for stageID := range advanced { ss := em.stages[stageID] - watermark, ready := ss.bundleReady(em) + watermark, ready, ptimeEventsReady := ss.bundleReady(em, emNow) if ready { - bundleID, ok, reschedule := ss.startBundle(watermark, nextBundID) + bundleID, ok, reschedule := ss.startEventTimeBundle(watermark, nextBundID) // Handle the reschedule even when there's no bundle. 
if reschedule { em.watermarkRefreshes.insert(stageID) } - if !ok { - continue + if ok { + rb := RunBundle{StageID: stageID, BundleID: bundleID, Watermark: watermark} + + em.inprogressBundles.insert(rb.BundleID) + em.refreshCond.L.Unlock() + + select { + case <-ctx.Done(): + return + case runStageCh <- rb: + } + em.refreshCond.L.Lock() } - rb := RunBundle{StageID: stageID, BundleID: bundleID, Watermark: watermark} - - em.inprogressBundles.insert(rb.BundleID) - em.refreshCond.L.Unlock() - - select { - case <-ctx.Done(): - return - case runStageCh <- rb: + } + if ptimeEventsReady { + bundleID, ok, reschedule := ss.startProcessingTimeBundle(em, emNow, nextBundID) + // Handle the reschedule even when there's no bundle. + if reschedule { + em.watermarkRefreshes.insert(stageID) + } + if ok { + rb := RunBundle{StageID: stageID, BundleID: bundleID, Watermark: watermark} + + em.inprogressBundles.insert(rb.BundleID) + em.refreshCond.L.Unlock() + + select { + case <-ctx.Done(): + return + case runStageCh <- rb: + } + em.refreshCond.L.Lock() } - em.refreshCond.L.Lock() } } em.checkForQuiescence(advanced) @@ -379,6 +416,11 @@ func (em *ElementManager) checkForQuiescence(advanced set[string]) { ) return } + if em.testStreamHandler == nil && len(em.processTimeEvents.events) > 0 { + // If there's no test stream involved, and processing time events exist, then + // it's only a matter of time. + return + } // The job has quiesced! // There are no further incoming watermark changes, see if there are test stream events for this job. @@ -392,6 +434,11 @@ func (em *ElementManager) checkForQuiescence(advanced set[string]) { // It's not correct to move to the next event if no refreshes would occur. if len(em.watermarkRefreshes) > 0 { return + } else if _, ok := nextEvent.(tsProcessingTimeEvent); ok { + // It's impossible to fully control processing time SDK side handling for processing time + // Runner side, so we specialize refresh handling here to avoid spuriously getting stuck. 
+ em.watermarkRefreshes.insert(em.testStreamHandler.ID) + return } // If there are no refreshes, then there's no mechanism to make progress, so it's time to fast fail. } @@ -407,6 +454,12 @@ func (em *ElementManager) checkForQuiescence(advanced set[string]) { slog.Debug("Bundles: nothing in progress and no refreshes", slog.Int64("pendingElementCount", v)) var stageState []string ids := maps.Keys(em.stages) + if em.testStreamHandler != nil { + stageState = append(stageState, fmt.Sprintf("TestStreamHandler: completed %v, curIndex %v of %v events: %+v, processingTime %v, %v, ptEvents %v \n", + em.testStreamHandler.completed, em.testStreamHandler.nextEventIndex, len(em.testStreamHandler.events), em.testStreamHandler.events, em.testStreamHandler.processingTime, mtime.FromTime(em.testStreamHandler.processingTime), em.processTimeEvents)) + } else { + stageState = append(stageState, fmt.Sprintf("ElementManager Now: %v processingTimeEvents: %v\n", em.ProcessingTimeNow(), em.processTimeEvents.events)) + } sort.Strings(ids) for _, id := range ids { ss := em.stages[id] @@ -414,7 +467,7 @@ func (em *ElementManager) checkForQuiescence(advanced set[string]) { outW := ss.OutputWatermark() upPCol, upW := ss.UpstreamWatermark() upS := em.pcolParents[upPCol] - stageState = append(stageState, fmt.Sprintln(id, "watermark in", inW, "out", outW, "upstream", upW, "from", upS, "pending", ss.pending, "byKey", ss.pendingByKeys, "inprogressKeys", ss.inprogressKeys, "byBundle", ss.inprogressKeysByBundle, "holds", ss.watermarkHolds.heap, "holdCounts", ss.watermarkHolds.counts)) + stageState = append(stageState, fmt.Sprintln(id, "watermark in", inW, "out", outW, "upstream", upW, "from", upS, "pending", ss.pending, "byKey", ss.pendingByKeys, "inprogressKeys", ss.inprogressKeys, "byBundle", ss.inprogressKeysByBundle, "holds", ss.watermarkHolds.heap, "holdCounts", ss.watermarkHolds.counts, "holdsInBundle", ss.inprogressHoldsByBundle, "pttEvents", ss.processingTimeTimers.toFire)) } 
panic(fmt.Sprintf("nothing in progress and no refreshes with non zero pending elements: %v\n%v", v, strings.Join(stageState, ""))) } @@ -683,13 +736,15 @@ func (em *ElementManager) PersistBundle(rb RunBundle, col2Coders map[string]PCol // Triage timers into their time domains for scheduling. // EventTime timers are handled with normal elements, // ProcessingTime timers need to be scheduled into the processing time based queue. - em.triageTimers(d, inputInfo, stage) + newHolds, ptRefreshes := em.triageTimers(d, inputInfo, stage) // Return unprocessed to this stage's pending + // TODO sort out pending element watermark holds for process continuation residuals. unprocessedElements := reElementResiduals(residuals.Data, inputInfo, rb) // Add unprocessed back to the pending stack. if len(unprocessedElements) > 0 { + // TODO actually reschedule based on the residuals delay... count := stage.AddPending(unprocessedElements) em.addPending(count) } @@ -705,6 +760,14 @@ func (em *ElementManager) PersistBundle(rb RunBundle, col2Coders map[string]PCol } delete(stage.inprogressKeysByBundle, rb.BundleID) + // Adjust holds as needed. + for h, c := range newHolds { + if c > 0 { + stage.watermarkHolds.Add(h, c) + } else if c < 0 { + stage.watermarkHolds.Drop(h, -c) + } + } for hold, v := range stage.inprogressHoldsByBundle[rb.BundleID] { stage.watermarkHolds.Drop(hold, v) } @@ -740,11 +803,11 @@ func (em *ElementManager) PersistBundle(rb RunBundle, col2Coders map[string]PCol } stage.mu.Unlock() - em.addRefreshAndClearBundle(stage.ID, rb.BundleID) + em.addRefreshAndClearBundle(stage.ID, rb.BundleID, ptRefreshes) } // triageTimers prepares received timers for eventual firing, as well as rebasing processing time timers as needed. 
-func (em *ElementManager) triageTimers(d TentativeData, inputInfo PColInfo, stage *stageState) { +func (em *ElementManager) triageTimers(d TentativeData, inputInfo PColInfo, stage *stageState) (map[mtime.Time]int, set[mtime.Time]) { // Process each timer family in the order we received them, so we can filter to the last one. // Since we're process each timer family individually, use a unique key for each userkey, tag, window. // The last timer set for each combination is the next one we're keeping. @@ -753,8 +816,13 @@ func (em *ElementManager) triageTimers(d TentativeData, inputInfo PColInfo, stag tag string win typex.Window } + em.refreshCond.L.Lock() + emNow := em.ProcessingTimeNow() + em.refreshCond.L.Unlock() var pendingEventTimers []element + var pendingProcessingTimers []fireElement + stageRefreshTimes := set[mtime.Time]{} for tentativeKey, timers := range d.timers { keyToTimers := map[timerKey]element{} for _, t := range timers { @@ -763,7 +831,7 @@ func (em *ElementManager) triageTimers(d TentativeData, inputInfo PColInfo, stag keyToTimers[timerKey{key: string(key), tag: tag, win: e.window}] = e } if len(elms) == 0 { - // TODO(lostluck): Determine best way to mark clear a timer cleared. + // TODO(lostluck): Determine best way to mark a timer cleared. continue } } @@ -772,7 +840,20 @@ func (em *ElementManager) triageTimers(d TentativeData, inputInfo PColInfo, stag elm.transform = tentativeKey.Transform elm.family = tentativeKey.Family - pendingEventTimers = append(pendingEventTimers, elm) + if stage.processingTimeTimersFamilies[elm.family] { + // Conditionally rebase processing time or always rebase? + newTimerFire := rebaseProcessingTime(emNow, elm.timestamp) + elm.timestamp = elm.holdTimestamp // Processing Time always uses the hold timestamp as the resulting event time. 
+ pendingProcessingTimers = append(pendingProcessingTimers, fireElement{ + firing: newTimerFire, + timer: elm, + }) + + // Add pending Processing timers to the stage's processing time store & schedule event in the manager. + stageRefreshTimes.insert(newTimerFire) + } else { + pendingEventTimers = append(pendingEventTimers, elm) + } } } @@ -780,6 +861,17 @@ func (em *ElementManager) triageTimers(d TentativeData, inputInfo PColInfo, stag count := stage.AddPending(pendingEventTimers) em.addPending(count) } + changedHolds := map[mtime.Time]int{} + if len(pendingProcessingTimers) > 0 { + stage.mu.Lock() + var count int + for _, v := range pendingProcessingTimers { + count += stage.processingTimeTimers.Persist(v.firing, v.timer, changedHolds) + } + em.addPending(count) + stage.mu.Unlock() + } + return changedHolds, stageRefreshTimes } // FailBundle clears the extant data allowing the execution to shut down. @@ -790,7 +882,7 @@ func (em *ElementManager) FailBundle(rb RunBundle) { em.addPending(-len(completed.es)) delete(stage.inprogress, rb.BundleID) stage.mu.Unlock() - em.addRefreshAndClearBundle(rb.StageID, rb.BundleID) + em.addRefreshAndClearBundle(rb.StageID, rb.BundleID, nil) } // ReturnResiduals is called after a successful split, so the remaining work @@ -815,11 +907,14 @@ func (em *ElementManager) addRefreshes(stages set[string]) { em.refreshCond.Broadcast() } -func (em *ElementManager) addRefreshAndClearBundle(stageID, bundID string) { +func (em *ElementManager) addRefreshAndClearBundle(stageID, bundID string, ptRefreshes set[mtime.Time]) { em.refreshCond.L.Lock() defer em.refreshCond.L.Unlock() delete(em.inprogressBundles, bundID) em.watermarkRefreshes.insert(stageID) + for t := range ptRefreshes { + em.processTimeEvents.Schedule(t, stageID) + } em.refreshCond.Broadcast() } @@ -883,9 +978,10 @@ type stageState struct { sides []LinkID // PCollection IDs of side inputs that can block execution. 
// Special handling bits - stateful bool // whether this stage uses state or timers, and needs keyed processing. - aggregate bool // whether this stage needs to block for aggregation. - strat winStrat // Windowing Strategy for aggregation fireings. + stateful bool // whether this stage uses state or timers, and needs keyed processing. + aggregate bool // whether this stage needs to block for aggregation. + strat winStrat // Windowing Strategy for aggregation fireings. + processingTimeTimersFamilies map[string]bool // Indicates which timer families use the processing time domain. mu sync.Mutex upstreamWatermarks sync.Map // watermark set from inputPCollection's parent. @@ -909,6 +1005,8 @@ type stageState struct { // This avoids scanning the heap to remove or access a hold for each element. watermarkHolds *holdTracker inprogressHoldsByBundle map[string]map[mtime.Time]int // bundle to associated holds. + + processingTimeTimers *timerHandler } // timerKey uniquely identifies a given timer within the space of a user key. @@ -941,6 +1039,8 @@ func makeStageState(ID string, inputIDs, outputIDs []string, sides []LinkID) *st input: mtime.MinTimestamp, output: mtime.MinTimestamp, estimatedOutput: mtime.MinTimestamp, + + processingTimeTimers: newTimerHandler(), } // Initialize the upstream watermarks to minTime. @@ -1085,10 +1185,10 @@ var ( // startBundle initializes a bundle with elements if possible. // A bundle only starts if there are elements at all, and if it's // an aggregation stage, if the windowing stratgy allows it. 
-func (ss *stageState) startBundle(watermark mtime.Time, genBundID func() string) (string, bool, bool) { +func (ss *stageState) startEventTimeBundle(watermark mtime.Time, genBundID func() string) (string, bool, bool) { defer func() { if e := recover(); e != nil { - panic(fmt.Sprintf("generating bundle for stage %v at %v panicked\n%v", ss.ID, watermark, e)) + panic(fmt.Sprintf("generating bundle for stage %v at watermark %v panicked\n%v", ss.ID, watermark, e)) } }() ss.mu.Lock() @@ -1112,7 +1212,6 @@ func (ss *stageState) startBundle(watermark mtime.Time, genBundID func() string) // TODO: when we do, we need to ensure that the stage remains schedualable for bundle execution, for remaining pending elements and keys. // With the greedy approach, we don't need to since "new data" triggers a refresh, and so should completing processing of a bundle. newKeys := set[string]{} - stillSchedulable := true holdsInBundle := map[mtime.Time]int{} @@ -1147,7 +1246,7 @@ keysPerBundle: timerCleared = true continue } - holdsInBundle[e.holdTimestamp] = holdsInBundle[e.holdTimestamp] + 1 + holdsInBundle[e.holdTimestamp] += 1 // Clear the "fired" timer so subsequent matches can be ignored. delete(dnt.timers, timerKey{family: e.family, tag: e.tag, window: e.window}) } @@ -1163,8 +1262,9 @@ keysPerBundle: break keysPerBundle } } + stillSchedulable := true if len(ss.pendingByKeys) == 0 && !timerCleared { - // If we're out of data, and timers were not cleared then the watermark is are accurate. + // If we're out of data, and timers were not cleared then the watermark is accurate. 
stillSchedulable = false } @@ -1173,8 +1273,92 @@ keysPerBundle: return "", false, stillSchedulable } + bundID := ss.makeInProgressBundle(genBundID, toProcess, minTs, newKeys, holdsInBundle) + return bundID, true, stillSchedulable +} + +func (ss *stageState) startProcessingTimeBundle(em *ElementManager, emNow mtime.Time, genBundID func() string) (string, bool, bool) { + ss.mu.Lock() + defer ss.mu.Unlock() + + // TODO: Determine if it's possible and a good idea to treat all EventTime processing as a MinTime + // Special Case for ProcessingTime handling. + // Eg. Always queue EventTime elements at minTime. + // Iterate all available processingTime events until we can't anymore. + // + // Potentially puts too much work on the scheduling thread though. + + var toProcess []element + minTs := mtime.MaxTimestamp + holdsInBundle := map[mtime.Time]int{} + + var notYet []fireElement + + nextTime := ss.processingTimeTimers.Peek() + keyCounts := map[string]int{} + newKeys := set[string]{} + + for nextTime <= emNow { + elems := ss.processingTimeTimers.FireAt(nextTime) + for _, e := range elems { + // Check if we're already executing this timer's key. + if ss.inprogressKeys.present(string(e.keyBytes)) { + notYet = append(notYet, fireElement{firing: nextTime, timer: e}) + continue + } + + // If we are set to have OneKeyPerBundle, and we already have a key for this bundle, we process it later. + if len(keyCounts) > 0 && OneKeyPerBundle { + notYet = append(notYet, fireElement{firing: nextTime, timer: e}) + continue + } + // If we are set to have OneElementPerKey, and we already have an element for this key we set this to process later. + if v := keyCounts[string(e.keyBytes)]; v > 0 && OneElementPerKey { + notYet = append(notYet, fireElement{firing: nextTime, timer: e}) + continue + } + keyCounts[string(e.keyBytes)]++ + newKeys.insert(string(e.keyBytes)) + if e.timestamp < minTs { + minTs = e.timestamp + } + holdsInBundle[e.holdTimestamp]++ + + // We're going to process this timer! 
+ toProcess = append(toProcess, e) + } + + nextTime = ss.processingTimeTimers.Peek() + if nextTime == mtime.MaxTimestamp { + // Escape the loop if there are no more events. + break + } + } + + // Reschedule unfired timers. + notYetHolds := map[mtime.Time]int{} + for _, v := range notYet { + ss.processingTimeTimers.Persist(v.firing, v.timer, notYetHolds) + em.processTimeEvents.Schedule(v.firing, ss.ID) + } + + // Add a refresh if there are still processing time events to process. + stillSchedulable := (nextTime < emNow && nextTime != mtime.MaxTimestamp || len(notYet) > 0) + + if len(toProcess) == 0 { + // If we have nothing to process, there's no bundle to start. + return "", false, stillSchedulable + } + bundID := ss.makeInProgressBundle(genBundID, toProcess, minTs, newKeys, holdsInBundle) + return bundID, true, stillSchedulable +} + +// makeInProgressBundle is common code to store a set of elements as a bundle in progress. +// +// Callers must hold the stage lock. +func (ss *stageState) makeInProgressBundle(genBundID func() string, toProcess []element, minTs mtime.Time, newKeys set[string], holdsInBundle map[mtime.Time]int) string { + // Catch the ordinary case for the minimum timestamp. if toProcess[0].timestamp < minTs { - // Catch the ordinary case. minTs = toProcess[0].timestamp } @@ -1196,7 +1380,7 @@ keysPerBundle: ss.inprogressKeysByBundle[bundID] = newKeys ss.inprogressKeys.merge(newKeys) ss.inprogressHoldsByBundle[bundID] = holdsInBundle - return bundID, true, stillSchedulable + return bundID } func (ss *stageState) splitBundle(rb RunBundle, firstResidual int) { @@ -1327,9 +1511,12 @@ func (ss *stageState) updateWatermarks(em *ElementManager) set[string] { // bundleReady returns the maximum allowed watermark for this stage, and whether // it's permitted to execute by side inputs. 
-func (ss *stageState) bundleReady(em *ElementManager) (mtime.Time, bool) { +func (ss *stageState) bundleReady(em *ElementManager, emNow mtime.Time) (mtime.Time, bool, bool) { ss.mu.Lock() defer ss.mu.Unlock() + + ptimeEventsReady := ss.processingTimeTimers.Peek() <= emNow || emNow == mtime.MaxTimestamp + // If the upstream watermark and the input watermark are the same, // then we can't yet process this stage. inputW := ss.input @@ -1340,7 +1527,7 @@ func (ss *stageState) bundleReady(em *ElementManager) (mtime.Time, bool) { slog.Group("watermark", slog.Any("upstream", upstreamW), slog.Any("input", inputW))) - return mtime.MinTimestamp, false + return mtime.MinTimestamp, false, ptimeEventsReady } ready := true for _, side := range ss.sides { @@ -1357,5 +1544,28 @@ func (ss *stageState) bundleReady(em *ElementManager) (mtime.Time, bool) { ready = false } } - return upstreamW, ready + return upstreamW, ready, ptimeEventsReady +} + +// ProcessingTimeNow gives the current processing time for the runner. +func (em *ElementManager) ProcessingTimeNow() (ret mtime.Time) { + if em.testStreamHandler != nil && !em.testStreamHandler.completed { + return em.testStreamHandler.Now() + } + // TODO toggle between testmode and production mode. + // "Test" mode -> advance to next processing time event if any, to allow execution. + // if test mode... + if t, ok := em.processTimeEvents.Peek(); ok { + return t + } + + // "Production" mode, always real time now. + now := mtime.Now() + return now +} + +// rebaseProcessingTime turns an absolute processing time to be relative to the provided local clock now. +// Necessary to reasonably schedule ProcessingTime timers within a TestStream using pipeline. 
+func rebaseProcessingTime(localNow, scheduled mtime.Time) mtime.Time { + return localNow + (scheduled - mtime.Now()) } diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/engine_test.go b/sdks/go/pkg/beam/runners/prism/internal/engine/engine_test.go index 0c042d731d6a..b6e4412c3a83 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/engine_test.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/engine_test.go @@ -221,3 +221,46 @@ func TestTestStream(t *testing.T) { } } } + +// TestProcessingTime is the suite for validating behaviors around ProcessingTime. +// Separate from the TestStream, Timers, and Triggers tests due to the unique nature +// of the time domain. +func TestProcessingTime(t *testing.T) { + initRunner(t) + + tests := []struct { + pipeline func(s beam.Scope) + }{ + {pipeline: primitives.TimersProcessingTimeTestStream_Infinity}, + {pipeline: primitives.TimersProcessingTime_Bounded}, + {pipeline: primitives.TimersProcessingTime_Unbounded}, + } + + configs := []struct { + name string + OneElementPerKey, OneKeyPerBundle bool + }{ + {"Greedy", false, false}, + {"AllElementsPerKey", false, true}, + {"OneElementPerKey", true, false}, + // {"OneElementPerBundle", true, true}, // Reveals flaky behavior + } + for _, config := range configs { + for _, test := range tests { + t.Run(initTestName(test.pipeline)+"_"+config.name, func(t *testing.T) { + t.Cleanup(func() { + engine.OneElementPerKey = false + engine.OneKeyPerBundle = false + }) + engine.OneElementPerKey = config.OneElementPerKey + engine.OneKeyPerBundle = config.OneKeyPerBundle + p, s := beam.NewPipelineWithRoot() + test.pipeline(s) + _, err := executeWithT(context.Background(), t, p) + if err != nil { + t.Fatalf("pipeline failed, but feature should be implemented in Prism: %v", err) + } + }) + } + } +} diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/holds.go b/sdks/go/pkg/beam/runners/prism/internal/engine/holds.go index 9077b3f439d6..796ce1c2276a 100644 --- 
a/sdks/go/pkg/beam/runners/prism/internal/engine/holds.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/holds.go @@ -22,21 +22,23 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime" ) -// holdHeap orders holds based on their timestamps -// so we can always find the minimum timestamp of pending holds. -type holdHeap []mtime.Time +// mtimeHeap is a minHeap to find the earliest processing time event. +// Used for holds, and general processing time event ordering. +type mtimeHeap []mtime.Time -func (h holdHeap) Len() int { return len(h) } -func (h holdHeap) Less(i, j int) bool { return h[i] < h[j] } -func (h holdHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } +func (h mtimeHeap) Len() int { return len(h) } +func (h mtimeHeap) Less(i, j int) bool { + return h[i] < h[j] +} +func (h mtimeHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } -func (h *holdHeap) Push(x any) { +func (h *mtimeHeap) Push(x any) { // Push and Pop use pointer receivers because they modify the slice's length, // not just its contents. *h = append(*h, x.(mtime.Time)) } -func (h *holdHeap) Pop() any { +func (h *mtimeHeap) Pop() any { old := *h n := len(old) x := old[n-1] @@ -44,6 +46,15 @@ func (h *holdHeap) Pop() any { return x } +func (h *mtimeHeap) Remove(toRemove mtime.Time) { + for i, v := range *h { + if v == toRemove { + heap.Remove(h, i) + return + } + } +} + // holdTracker track the watermark holds for a stage. // // Timers hold back the watermark until they fire, but multiple @@ -55,7 +66,7 @@ func (h *holdHeap) Pop() any { // A heap of the hold times is kept so we have quick access to the minimum hold, for calculating // how to advance the watermark. 
type holdTracker struct { - heap holdHeap + heap mtimeHeap counts map[mtime.Time]int } @@ -76,19 +87,13 @@ func (ht *holdTracker) Drop(hold mtime.Time, v int) { panic(fmt.Sprintf("prism error: negative watermark hold count %v for time %v", n, hold)) } delete(ht.counts, hold) - for i, h := range ht.heap { - if hold == h { - heap.Remove(&ht.heap, i) - break - } - } + ht.heap.Remove(hold) } // Add a hold a number of times to heap. If the hold time isn't already present in the heap, it is added. func (ht *holdTracker) Add(hold mtime.Time, v int) { // Mark the hold in the heap. - ht.counts[hold] = ht.counts[hold] + v - + ht.counts[hold] += v if len(ht.counts) != len(ht.heap) { // Since there's a difference, the hold should not be in the heap, so we add it. heap.Push(&ht.heap, hold) diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/processingtime.go b/sdks/go/pkg/beam/runners/prism/internal/engine/processingtime.go new file mode 100644 index 000000000000..9e55c8c43b62 --- /dev/null +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/processingtime.go @@ -0,0 +1,96 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+package engine + +import ( + "container/heap" + + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime" +) + +// Notes on Processing Time handling: +// +// ProcessingTime events (processingTime timers, process continuations, triggers) necessarily need to operate on a global queue. +// However, PT timers are per key+family+tag, and may be overwritten by subsequent elements. +// So, similarly to event time timers, we need to manage a "last set" queue, and to manage the holds. +// This implies they should probably be handled by state, instead of globally. +// In reality, it's probably going to be "both", a global PT event queue, and per stage state. +// +// In principle, timers would be how to implement the related features, so getting those right will simplify their handling. +// Test stream is already central, but doesn't set events, it controls their execution. +// +// The ElementManager doesn't retain any data itself, so it should not hold material data about what is being triggered. +// The ElementManager should only contain which stage state should be triggered when in a time domain. +// +// ProcessContinuations count as pending events, and must be drained accordingly before time expires. +// +// A stage may trigger on multiple ticks. +// It's up to a stage to schedule additional work on those notices. + +// stageRefreshQueue manages ProcessingTime events, in particular, which stages need notification +// at which points in processing time they occur. It doesn't handle the interface between +// walltime or any synthetic notions of time. +// +// stageRefreshQueue is not goroutine safe and relies on external synchronization. +type stageRefreshQueue struct { + events map[mtime.Time]set[string] + order mtimeHeap +} + +// newStageRefreshQueue creates an initialized stageRefreshQueue. +func newStageRefreshQueue() *stageRefreshQueue { + return &stageRefreshQueue{ + events: map[mtime.Time]set[string]{}, + } +} + +// Schedule a stage event at the given time. 
+func (q *stageRefreshQueue) Schedule(t mtime.Time, stageID string) { + if s, ok := q.events[t]; ok { + // We already have a trigger at this time, mutate that instead. + if s.present(stageID) { + // We already notify this stage at this time, no action required. + return + } + s.insert(stageID) + return + } + q.events[t] = set[string]{stageID: struct{}{}} + heap.Push(&q.order, t) +} + +// Peek returns the minimum time in the queue and whether it is valid. +// If there are no times left in the queue, the boolean will be false. +func (q *stageRefreshQueue) Peek() (mtime.Time, bool) { + if len(q.order) == 0 { + return mtime.MaxTimestamp, false + } + return q.order[0], true +} + +// AdvanceTo takes in the current now time, and returns the set of ids that need a refresh. +func (q *stageRefreshQueue) AdvanceTo(now mtime.Time) set[string] { + notify := set[string]{} + for { + // If there are no elements, then we're done. + if len(q.order) == 0 || q.order[0] > now { + return notify + } + // pop elements off the queue until the next time is later than now. + next := heap.Pop(&q.order).(mtime.Time) + notify.merge(q.events[next]) + delete(q.events, next) + } +} diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/processingtime_test.go b/sdks/go/pkg/beam/runners/prism/internal/engine/processingtime_test.go new file mode 100644 index 000000000000..0f0b67b46135 --- /dev/null +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/processingtime_test.go @@ -0,0 +1,139 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package engine + +import ( + "testing" + + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime" + "github.com/google/go-cmp/cmp" +) + +func TestProcessingTimeQueue(t *testing.T) { + t.Run("empty", func(t *testing.T) { + q := newStageRefreshQueue() + emptyTime, ok := q.Peek() + if ok != false { + t.Errorf("q.Peek() on empty queue should have returned false") + } + if got, want := emptyTime, mtime.MaxTimestamp; got != want { + t.Errorf("q.Peek() on empty queue returned %v, want %v", got, want) + } + + tests := []mtime.Time{ + mtime.MinTimestamp, + -273, + 0, + 42, + mtime.EndOfGlobalWindowTime, + mtime.MaxTimestamp, + } + for _, test := range tests { + if got, want := q.AdvanceTo(test), (set[string]{}); len(got) > 0 { + t.Errorf("q.AdvanceTo(%v) on empty queue returned %v, want %v", test, got, want) + } + } + }) + t.Run("scheduled", func(t *testing.T) { + type event struct { + t mtime.Time + stage string + } + + s := func(ids ...string) set[string] { + ret := set[string]{} + for _, id := range ids { + ret.insert(id) + } + return ret + } + + tests := []struct { + name string + events []event + + minTime mtime.Time + + advanceTime mtime.Time + want set[string] + }{ + { + "singleBefore", + []event{{1, "test1"}}, + 1, + 0, + s(), + }, { + "singleAt", + []event{{1, "test1"}}, + 1, + 1, + s("test1"), + }, { + "singleAfter", + []event{{1, "test1"}}, + 1, + 2, + s("test1"), + }, { + "trioDistinct", + []event{{1, "test1"}, {2, "test2"}, {3, "test3"}}, + 1, + 2, + s("test1", "test2"), + }, { + "trioDistinctReversed", + []event{{3, "test3"}, {2, "test2"}, 
{1, "test1"}}, + 1, + 2, + s("test1", "test2"), + }, { + "trioDistinctTimeSameId", + []event{{3, "test"}, {2, "test"}, {1, "test"}}, + 1, + 2, + s("test"), + }, { + "trioOneTime", + []event{{1, "test3"}, {1, "test2"}, {1, "test1"}}, + 1, + 1, + s("test1", "test2", "test3"), + }, { + "trioDuplicates", + []event{{1, "test"}, {1, "test"}, {1, "test"}}, + 1, + 1, + s("test", "test", "test"), + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + q := newStageRefreshQueue() + for _, e := range test.events { + q.Schedule(e.t, e.stage) + } + if got, _ := q.Peek(); got != test.minTime { + t.Errorf("q.Peek() = %v, want %v", got, test.minTime) + } + + if got, want := q.AdvanceTo(test.advanceTime), test.want; !cmp.Equal(got, want) { + t.Errorf("q.AdvanceTo(%v) = %v, want %v", test.advanceTime, got, want) + } + }) + } + }) +} diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/teststream.go b/sdks/go/pkg/beam/runners/prism/internal/engine/teststream.go index 34b79d455ce1..5f99c2591752 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/teststream.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/teststream.go @@ -66,8 +66,8 @@ type tagState struct { // Now represents the overridden ProcessingTime, which is only advanced when directed by an event. // Overrides the elementManager "clock". -func (ts *testStreamHandler) Now() time.Time { - return ts.processingTime +func (ts *testStreamHandler) Now() mtime.Time { + return mtime.FromTime(ts.processingTime) } // TagsToPCollections recieves the map of local output tags to global pcollection ids. @@ -234,6 +234,14 @@ type tsProcessingTimeEvent struct { // Execute this ProcessingTime event by advancing the synthetic processing time. 
func (ev tsProcessingTimeEvent) Execute(em *ElementManager) { em.testStreamHandler.processingTime = em.testStreamHandler.processingTime.Add(ev.AdvanceBy) + if em.testStreamHandler.processingTime.After(mtime.MaxTimestamp.ToTime()) || ev.AdvanceBy == time.Duration(mtime.MaxTimestamp) { + em.testStreamHandler.processingTime = mtime.MaxTimestamp.ToTime() + } + + // Add the refreshes now so our block prevention logic works. + emNow := em.ProcessingTimeNow() + toRefresh := em.processTimeEvents.AdvanceTo(emNow) + em.watermarkRefreshes.merge(toRefresh) } // tsFinalEvent is the "last" event we perform after all preceeding events. diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/timers.go b/sdks/go/pkg/beam/runners/prism/internal/engine/timers.go index 245b82dd10dd..3f52ebc4510c 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/timers.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/timers.go @@ -17,10 +17,12 @@ package engine import ( "bytes" + "container/heap" "encoding/binary" "fmt" "io" "math" + "sync" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime" @@ -172,3 +174,167 @@ func (d *decoder) Pane() typex.PaneInfo { } return pn } + +// timerHandler tracks timers and ensures that the timer invariant is maintained +// and reports changes in watermark holds. +// +// The invariant is that only a single timer exists for a given userKey+timerID+tag+window. +// +// Timers may prevent the event time watermark using a watermark Hold. +// However due to the invariant, the watermark hold must be changed if a given timer +// has it's firing + hold time updated. +// +// A timerHandler may only hold timers of a single domain, either event time timers, or +// processing time timers. They must not be mixed. +type timerHandler struct { + order mtimeHeap // Maintain the next times to fire at. 
+ toFire map[mtime.Time]map[string]set[timerKey] // fireing time -> userkey -> timerID+tag+window: Lookup + nextFiring map[string]map[timerKey]fireElement // userkey -> timerID+tag+window: actual timer + + timerKeySetPool sync.Pool // set[timerKey]{} + userKeysSetPool sync.Pool // map[string]set[timerKey] + firingMapPool sync.Pool // map[timerKey]element +} + +type fireElement struct { + firing mtime.Time + timer element +} + +func newTimerHandler() *timerHandler { + return &timerHandler{ + toFire: map[mtime.Time]map[string]set[timerKey]{}, + nextFiring: map[string]map[timerKey]fireElement{}, + + timerKeySetPool: sync.Pool{New: func() any { + return set[timerKey]{} + }}, + userKeysSetPool: sync.Pool{New: func() any { + return map[string]set[timerKey]{} + }}, + firingMapPool: sync.Pool{New: func() any { + return map[timerKey]fireElement{} + }}, + } +} + +// timers returns the timers for the userkey. +func (th *timerHandler) timers(timer element) map[timerKey]fireElement { + timers, ok := th.nextFiring[string(timer.keyBytes)] + if !ok { + timers = th.firingMapPool.Get().(map[timerKey]fireElement) + th.nextFiring[string(timer.keyBytes)] = timers + } + return timers +} + +func (th *timerHandler) removeTimer(userKey string, key timerKey) element { + timers, ok := th.nextFiring[userKey] + if !ok { + panic(fmt.Sprintf("prism consistency error: trying to remove a timer for a key without timers: %v,%+v", userKey, key)) + } + times, ok := timers[key] + if !ok { + panic(fmt.Sprintf("prism consistency error: trying to remove a non-existent timer for a key: %v,%+v", userKey, key)) + } + delete(timers, key) + if len(timers) == 0 { + delete(th.nextFiring, userKey) + th.firingMapPool.Put(timers) + } + return times.timer +} + +func (th *timerHandler) add(key timerKey, newFire fireElement) { + byKeys, ok := th.toFire[newFire.firing] + if !ok { + byKeys = th.userKeysSetPool.Get().(map[string]set[timerKey]) + th.toFire[newFire.firing] = byKeys + heap.Push(&th.order, newFire.firing) // 
We only need to add a firing order when inserting. + } + timers, ok := byKeys[string(newFire.timer.keyBytes)] + if !ok { + timers = th.timerKeySetPool.Get().(set[timerKey]) + byKeys[string(newFire.timer.keyBytes)] = timers + + } + timers.insert(key) + +} + +func (th *timerHandler) replace(key timerKey, oldTimer, newTimer fireElement) { + byKeys := th.toFire[oldTimer.firing] + timers := byKeys[string(oldTimer.timer.keyBytes)] + timers.remove(key) + + th.add(key, newTimer) + + // Clean up timers. + if len(timers) == 0 { + th.timerKeySetPool.Put(timers) + delete(byKeys, string(oldTimer.timer.keyBytes)) + } + if len(byKeys) > 0 { + return + } + th.userKeysSetPool.Put(byKeys) + delete(th.toFire, oldTimer.firing) + th.order.Remove(oldTimer.firing) +} + +// Persist the given timer, and updates the provided hold times map with changes to the hold counts. +func (th *timerHandler) Persist(fire mtime.Time, timer element, holdChanges map[mtime.Time]int) int { + timers := th.timers(timer) + key := timerKey{family: timer.family, tag: timer.tag, window: timer.window} + newTimer := fireElement{firing: fire, timer: timer} + if oldTimer, ok := timers[key]; ok { + // Update with the new times + timers[key] = newTimer + th.replace(key, oldTimer, newTimer) + + holdChanges[newTimer.timer.holdTimestamp] += 1 + holdChanges[oldTimer.timer.holdTimestamp] -= 1 + if holdChanges[oldTimer.timer.holdTimestamp] == 0 { + delete(holdChanges, oldTimer.timer.holdTimestamp) + } + return 0 + } + timers[key] = newTimer + th.add(key, newTimer) + holdChanges[newTimer.timer.holdTimestamp] += 1 + return 1 +} + +// FireAt returns all timers for a key able to fire at the given time. 
+func (th *timerHandler) FireAt(now mtime.Time) []element { + if th.order.Len() == 0 { + return nil + } + var ret []element + for len(th.order) > 0 && th.order[0] <= now { + next := th.order[0] + byKeys, ok := th.toFire[next] + if ok { + for k, vs := range byKeys { + for v := range vs { + timer := th.removeTimer(k, v) + ret = append(ret, timer) + } + delete(byKeys, k) + } + } + delete(th.toFire, next) + th.userKeysSetPool.Put(byKeys) + heap.Pop(&th.order) + } + return ret +} + +// Peek returns the next scheduled event in the queue. +// Returns [mtime.MaxTimestamp] if the queue is empty. +func (th *timerHandler) Peek() mtime.Time { + if th.order.Len() == 0 { + return mtime.MaxTimestamp + } + return th.order[0] +} diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/timers_test.go b/sdks/go/pkg/beam/runners/prism/internal/engine/timers_test.go new file mode 100644 index 000000000000..b1878ca7991b --- /dev/null +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/timers_test.go @@ -0,0 +1,291 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package engine + +import ( + "slices" + "testing" + + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" + "github.com/google/go-cmp/cmp" +) + +func TestTimerHandler(t *testing.T) { + fireTime1, fireTime2, fireTime3 := mtime.FromMilliseconds(1000), mtime.FromMilliseconds(1100), mtime.FromMilliseconds(1200) + eventTime1, eventTime2, eventTime3 := mtime.FromMilliseconds(200), mtime.FromMilliseconds(210), mtime.FromMilliseconds(220) + holdTime1, holdTime2, holdTime3 := mtime.FromMilliseconds(300), mtime.FromMilliseconds(301), mtime.FromMilliseconds(303) + tid := "testtransform" + family := "testfamily" + userKey1, userKey2, userKey3 := []byte("userKey1"), []byte("userKey2"), []byte("userKey3") + glo := window.SingleGlobalWindow[0] + iw1, iw2, iw3 := window.IntervalWindow{End: fireTime1}, window.IntervalWindow{End: fireTime2}, window.IntervalWindow{End: fireTime3} + + elemTagWin := func(userKey []byte, eventTime, holdTime mtime.Time, tag string, window typex.Window) element { + return element{ + window: window, + timestamp: eventTime, + holdTimestamp: holdTime, + pane: typex.NoFiringPane(), // TODO, do something with pane. 
+ transform: tid, + family: family, + tag: tag, + keyBytes: userKey, + } + } + + elem := func(userKey []byte, eventTime, holdTime mtime.Time) element { + return elemTagWin(userKey, eventTime, holdTime, "", glo) + } + + fireElem := func(fire mtime.Time, userKey []byte, eventTime, holdTime mtime.Time) fireElement { + return fireElement{ + firing: fire, + timer: elem(userKey, eventTime, holdTime), + } + } + + tests := []struct { + name string + insert []fireElement + + wantHolds map[mtime.Time]int + + onFire mtime.Time + wantTimers []element + }{{ + name: "noTimers", + insert: []fireElement{}, + + wantHolds: map[mtime.Time]int{}, + onFire: fireTime1, + wantTimers: nil, + }, { + name: "singleTimer-singleKey", + insert: []fireElement{fireElem(fireTime1, userKey1, eventTime1, holdTime1)}, + + wantHolds: map[mtime.Time]int{ + holdTime1: 1, + }, + onFire: fireTime1, + wantTimers: []element{elem(userKey1, eventTime1, holdTime1)}, + }, { + name: "singleTimer-multipleKeys", + insert: []fireElement{ + fireElem(fireTime1, userKey1, eventTime1, holdTime1), + fireElem(fireTime1, userKey2, eventTime1, holdTime1), + fireElem(fireTime1, userKey3, eventTime1, holdTime1), + }, + wantHolds: map[mtime.Time]int{ + holdTime1: 3, + }, + onFire: fireTime1, + + wantTimers: []element{ + elem(userKey1, eventTime1, holdTime1), + elem(userKey2, eventTime1, holdTime1), + elem(userKey3, eventTime1, holdTime1), + }, + }, { + name: "multipleTimers-holdsChange", + insert: []fireElement{ + fireElem(fireTime1, userKey1, eventTime1, holdTime1), + fireElem(fireTime1, userKey1, eventTime2, holdTime2), + fireElem(fireTime1, userKey1, eventTime3, holdTime3), + }, + wantHolds: map[mtime.Time]int{ + holdTime3: 1, + }, + onFire: fireTime3, + wantTimers: []element{ + elem(userKey1, eventTime3, holdTime3), + }, + }, { + name: "multipleTimers-firesChange-nofire", + insert: []fireElement{ + fireElem(fireTime1, userKey1, eventTime1, holdTime1), + fireElem(fireTime2, userKey1, eventTime2, holdTime2), + 
fireElem(fireTime3, userKey1, eventTime3, holdTime3), + }, + wantHolds: map[mtime.Time]int{ + holdTime3: 1, + }, + onFire: fireTime1, + wantTimers: nil, // Nothing should fire. + }, { + name: "multipleTimerKeys-firesAll", + insert: []fireElement{ + fireElem(fireTime1, userKey1, eventTime1, holdTime1), + fireElem(fireTime2, userKey2, eventTime2, holdTime2), + fireElem(fireTime3, userKey3, eventTime3, holdTime3), + }, + wantHolds: map[mtime.Time]int{ + holdTime1: 1, + holdTime2: 1, + holdTime3: 1, + }, + onFire: fireTime3, + wantTimers: []element{ + elem(userKey1, eventTime1, holdTime1), + elem(userKey2, eventTime2, holdTime2), + elem(userKey3, eventTime3, holdTime3), + }, + }, { + name: "multipleTimerKeys-firesTwo", + insert: []fireElement{ + fireElem(fireTime3, userKey3, eventTime3, holdTime3), + fireElem(fireTime2, userKey2, eventTime2, holdTime2), + fireElem(fireTime1, userKey1, eventTime1, holdTime1), + }, + wantHolds: map[mtime.Time]int{ + holdTime1: 1, + holdTime2: 1, + holdTime3: 1, + }, + onFire: fireTime2, + wantTimers: []element{ + elem(userKey1, eventTime1, holdTime1), + elem(userKey2, eventTime2, holdTime2), + }, + }, { + name: "multipleTimerKeys-multiple-replacements", + insert: []fireElement{ + fireElem(fireTime3, userKey3, eventTime3, holdTime3), + fireElem(fireTime2, userKey2, eventTime2, holdTime2), + fireElem(fireTime1, userKey1, eventTime1, holdTime1), + + fireElem(fireTime1, userKey3, eventTime3, holdTime1), // last userKey3 - present at fireTime2 + fireElem(fireTime2, userKey2, eventTime1, holdTime1), + fireElem(fireTime3, userKey1, eventTime1, holdTime1), + + fireElem(fireTime3, userKey1, eventTime3, holdTime3), + fireElem(fireTime2, userKey2, eventTime2, holdTime2), + fireElem(fireTime1, userKey1, eventTime2, holdTime2), + + fireElem(fireTime2, userKey2, eventTime1, holdTime3), + fireElem(fireTime3, userKey2, eventTime2, holdTime2), // last userkey2 - not present at fireTime2 + fireElem(fireTime1, userKey1, eventTime1, holdTime3), // last 
userkey1 - present at fireTime2 + }, + wantHolds: map[mtime.Time]int{ + holdTime1: 1, + holdTime2: 1, + holdTime3: 1, + }, + onFire: fireTime2, + wantTimers: []element{ + elem(userKey3, eventTime3, holdTime1), + elem(userKey1, eventTime1, holdTime3), + }, + }, { + name: "multipleTimerTags-firesAll", + insert: []fireElement{ + {firing: fireTime1, timer: elemTagWin(userKey1, eventTime1, holdTime1, "tag1", glo)}, + {firing: fireTime2, timer: elemTagWin(userKey1, eventTime1, holdTime1, "tag2", glo)}, + {firing: fireTime3, timer: elemTagWin(userKey1, eventTime1, holdTime1, "tag3", glo)}, + + // Validate replacements on tags + {firing: fireTime1, timer: elemTagWin(userKey1, eventTime2, holdTime1, "tag1", glo)}, + {firing: fireTime2, timer: elemTagWin(userKey1, eventTime2, holdTime1, "tag2", glo)}, + {firing: fireTime3, timer: elemTagWin(userKey1, eventTime2, holdTime1, "tag3", glo)}, + }, + wantHolds: map[mtime.Time]int{ + holdTime1: 3, + }, + onFire: fireTime3, + wantTimers: []element{ + elemTagWin(userKey1, eventTime2, holdTime1, "tag1", glo), + elemTagWin(userKey1, eventTime2, holdTime1, "tag2", glo), + elemTagWin(userKey1, eventTime2, holdTime1, "tag3", glo), + }, + }, { + name: "multipleTimerTags-firesAll", + insert: []fireElement{ + {firing: fireTime1, timer: elemTagWin(userKey1, eventTime1, holdTime1, "", iw1)}, + {firing: fireTime2, timer: elemTagWin(userKey1, eventTime1, holdTime1, "", iw2)}, + {firing: fireTime3, timer: elemTagWin(userKey1, eventTime1, holdTime1, "", iw3)}, + + // Validate replacements on windows + {firing: fireTime1, timer: elemTagWin(userKey1, eventTime2, holdTime1, "", iw1)}, + {firing: fireTime2, timer: elemTagWin(userKey1, eventTime2, holdTime1, "", iw2)}, + {firing: fireTime3, timer: elemTagWin(userKey1, eventTime2, holdTime1, "", iw3)}, + }, + wantHolds: map[mtime.Time]int{ + holdTime1: 3, + }, + onFire: fireTime3, + wantTimers: []element{ + elemTagWin(userKey1, eventTime2, holdTime1, "", iw1), + elemTagWin(userKey1, eventTime2, 
holdTime1, "", iw2), + elemTagWin(userKey1, eventTime2, holdTime1, "", iw3), + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + th := newTimerHandler() + + holdChanges := map[mtime.Time]int{} + + for _, ft := range test.insert { + if ft.timer.IsData() { + t.Fatalf("generated bad timer: %+v", ft) + } + th.Persist(ft.firing, ft.timer, holdChanges) + } + + if d := cmp.Diff(test.wantHolds, holdChanges, cmp.AllowUnexported(element{}, fireElement{})); d != "" { + t.Errorf("Persist(): diff (-want,+got):\n%v", d) + } + fired := th.FireAt(test.onFire) + + lessElement := func(a, b element) int { + if a.timestamp < b.timestamp { + return -1 + } else if a.timestamp > b.timestamp { + return 1 + } + if a.holdTimestamp < b.holdTimestamp { + return -1 + } else if a.holdTimestamp > b.holdTimestamp { + return 1 + } + if string(a.keyBytes) < string(b.keyBytes) { + return -1 + } else if string(a.keyBytes) > string(b.keyBytes) { + return 1 + } + if a.tag < b.tag { + return -1 + } else if a.tag > b.tag { + return 1 + } + if a.window.MaxTimestamp() < b.window.MaxTimestamp() { + return -1 + } else if a.window.MaxTimestamp() > b.window.MaxTimestamp() { + return 1 + } + return 0 + } + slices.SortFunc(fired, lessElement) + slices.SortFunc(test.wantTimers, lessElement) + if d := cmp.Diff(test.wantTimers, fired, cmp.AllowUnexported(element{})); d != "" { + t.Errorf("FireAt(%v): diff (-want,+got):\n%v", test.onFire, d) + } + }) + } +} diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go b/sdks/go/pkg/beam/runners/prism/internal/execute.go index b218d84b891c..7276b725a7e0 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/execute.go +++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go @@ -286,7 +286,12 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic case *pipepb.TestStreamPayload_Event_WatermarkEvent: tsb.AddWatermarkEvent(ev.WatermarkEvent.GetTag(), mtime.Time(ev.WatermarkEvent.GetNewWatermark())) case 
*pipepb.TestStreamPayload_Event_ProcessingTimeEvent: - tsb.AddProcessingTimeEvent(time.Duration(ev.ProcessingTimeEvent.GetAdvanceDuration()) * time.Millisecond) + if ev.ProcessingTimeEvent.GetAdvanceDuration() == int64(mtime.MaxTimestamp) { + // TODO: Determine the SDK common formalism for setting processing time to infinity. + tsb.AddProcessingTimeEvent(time.Duration(mtime.MaxTimestamp)) + } else { + tsb.AddProcessingTimeEvent(time.Duration(ev.ProcessingTimeEvent.GetAdvanceDuration()) * time.Millisecond) + } default: return fmt.Errorf("prism error building stage %v - unknown TestStream event type: %T", stage.ID, ev) } @@ -310,6 +315,9 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic if stage.stateful { em.StageStateful(stage.ID) } + if len(stage.processingTimeTimers) > 0 { + em.StageProcessingTimeTimers(stage.ID, stage.processingTimeTimers) + } default: err := fmt.Errorf("unknown environment[%v]", t.GetEnvironmentId()) slog.Error("Execute", err) diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go index 4cff2ae92e7c..6eae311b08f5 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go +++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go @@ -160,7 +160,7 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (*jo } // Validate all the timer features for _, spec := range pardo.GetTimerFamilySpecs() { - check("TimerFamilySpecs.TimeDomain.Urn", spec.GetTimeDomain(), pipepb.TimeDomain_EVENT_TIME) + check("TimerFamilySpecs.TimeDomain.Urn", spec.GetTimeDomain(), pipepb.TimeDomain_EVENT_TIME, pipepb.TimeDomain_PROCESSING_TIME) } check("OnWindowExpirationTimerFamily", pardo.GetOnWindowExpirationTimerFamilySpec(), "") // Unsupported for now. 
@@ -176,11 +176,6 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (*jo if err := proto.Unmarshal(t.GetSpec().GetPayload(), &testStream); err != nil { return nil, fmt.Errorf("unable to unmarshal TestStreamPayload for %v - %q: %w", tid, t.GetUniqueName(), err) } - for _, ev := range testStream.GetEvents() { - if ev.GetProcessingTimeEvent() != nil { - check("TestStream.Event - ProcessingTimeEvents unsupported.", ev.GetProcessingTimeEvent()) - } - } t.EnvironmentId = "" // Unset the environment, to ensure it's handled prism side. testStreamIds = append(testStreamIds, tid) diff --git a/sdks/go/pkg/beam/runners/prism/internal/stage.go b/sdks/go/pkg/beam/runners/prism/internal/stage.go index 6762f2a3ff70..a8b8bdd918e4 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/stage.go +++ b/sdks/go/pkg/beam/runners/prism/internal/stage.go @@ -55,15 +55,16 @@ type link struct { // account, but all serialization boundaries remain since the pcollections // would continue to get serialized. type stage struct { - ID string - transforms []string - primaryInput string // PCollection used as the parallel input. - outputs []link // PCollections that must escape this stage. - sideInputs []engine.LinkID // Non-parallel input PCollections and their consumers - internalCols []string // PCollections that escape. Used for precise coder sending. - envID string - stateful bool - hasTimers []string + ID string + transforms []string + primaryInput string // PCollection used as the parallel input. + outputs []link // PCollections that must escape this stage. + sideInputs []engine.LinkID // Non-parallel input PCollections and their consumers + internalCols []string // PCollections that escape. Used for precise coder sending. 
+ envID string + stateful bool + hasTimers []string + processingTimeTimers map[string]bool exe transformExecuter inputTransformID string @@ -367,6 +368,12 @@ func buildDescriptor(stg *stage, comps *pipepb.Components, wk *worker.W, em *eng } for timerID, v := range pardo.GetTimerFamilySpecs() { stg.hasTimers = append(stg.hasTimers, tid) + if v.TimeDomain == pipepb.TimeDomain_PROCESSING_TIME { + if stg.processingTimeTimers == nil { + stg.processingTimeTimers = map[string]bool{} + } + stg.processingTimeTimers[timerID] = true + } rewrite = true newCid, err := lpUnknownCoders(v.GetTimerFamilyCoderId(), coders, comps.GetCoders()) if err != nil { diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go index 155f59a1487b..47fc2cccfc54 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go +++ b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go @@ -354,7 +354,7 @@ func (wk *W) Data(data fnpb.BeamFnData_DataServer) error { for _, d := range resp.GetData() { cr, ok := wk.activeInstructions[d.GetInstructionId()] if !ok { - slog.Info("data.Recv for unknown bundle", "response", resp) + slog.Info("data.Recv data for unknown bundle", "response", resp) continue } // Received data is always for an active ProcessBundle instruction @@ -373,7 +373,7 @@ func (wk *W) Data(data fnpb.BeamFnData_DataServer) error { for _, t := range resp.GetTimers() { cr, ok := wk.activeInstructions[t.GetInstructionId()] if !ok { - slog.Info("data.Recv for unknown bundle", "response", resp) + slog.Info("data.Recv timers for unknown bundle", "response", resp) continue } // Received data is always for an active ProcessBundle instruction diff --git a/sdks/go/test/integration/integration.go b/sdks/go/test/integration/integration.go index 310d6b2b1797..d8aee63718c1 100644 --- a/sdks/go/test/integration/integration.go +++ b/sdks/go/test/integration/integration.go @@ -154,9 +154,6 @@ var prismFilters = []string{ 
"TestFhirIO.*", // OOMs currently only lead to heap dumps on Dataflow runner "TestOomParDo", - - // The prism runner does not support processing time timers https://github.com/apache/beam/issues/29772. - "TestTimers_ProcessingTime.*", } var flinkFilters = []string{ @@ -188,6 +185,7 @@ var flinkFilters = []string{ "TestTestStreamInt16Sequence", "TestTimers_EventTime_Unbounded", // (failure when comparing on side inputs (NPE on window lookup)) + "TestTimers_ProcessingTime.*", // Flink doesn't support processing time timers. } var samzaFilters = []string{ @@ -292,6 +290,9 @@ var dataflowFilters = []string{ "TestSpannerIO.*", // Dataflow does not drain jobs by itself. "TestDrain", + // Timers + "TestTimers_ProcessingTime_Infinity", // Uses test stream. + "TestTimers_ProcessingTime_Bounded", // Dataflow ignores processing time timers in batch. } // CheckFilters checks if an integration test is filtered to be skipped, either diff --git a/sdks/go/test/integration/primitives/timers.go b/sdks/go/test/integration/primitives/timers.go index 4b7f6e9765f0..40afe98234a7 100644 --- a/sdks/go/test/integration/primitives/timers.go +++ b/sdks/go/test/integration/primitives/timers.go @@ -17,14 +17,17 @@ package primitives import ( "context" + "fmt" "strconv" "time" "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/state" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/timers" "github.com/apache/beam/sdks/v2/go/pkg/beam/register" "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert" + "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/teststream" "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/periodic" ) @@ -150,3 +153,151 @@ func TimersEventTimeUnbounded(s beam.Scope) { return periodic.Impulse(s, now, now.Add(10*time.Second), 0, false) })(s) } + +// Below here are tests for ProcessingTime timers. 
+ +func init() { + register.DoFn2x0[[]byte, func(string, int)](&inputFn[string, int]{}) + register.DoFn5x0[state.Provider, timers.Provider, string, int, func(string, int)](&processingTimeFn{}) +} + +type processingTimeFn struct { + Callback timers.ProcessingTime + MyValue state.Value[int] + Emissions state.Value[int] + + Offset int + TimerOutput int + Cap int +} + +func (fn *processingTimeFn) ProcessElement(sp state.Provider, tp timers.Provider, key string, value int, emit func(string, int)) { + // Sets a processing time callback to occur. + fn.Callback.Set(tp, time.Now().Add(9*time.Second)) + + // Only write to the state if we haven't done so already. + // Writing blind would reset the state, and cause duplicated outputs. + _, ok, err := fn.MyValue.Read(sp) + if err != nil { + panic(err) + } + if !ok { + if err := fn.MyValue.Write(sp, 0); err != nil { + panic(err) + } + } +} + +func (fn *processingTimeFn) OnTimer(ctx context.Context, ts beam.EventTime, sp state.Provider, tp timers.Provider, key string, timer timers.Context, emit func(string, int)) { + switch timer.Family { + case fn.Callback.Family: + switch timer.Tag { + case "": + read, ok, err := fn.MyValue.Read(sp) + if err != nil { + panic(err) + } + if !ok { + panic("State must be set for key: " + key) + } + emit(key, read) + if read < fn.Cap-1 { + if err := fn.MyValue.Write(sp, read+1); err != nil { + panic(err) + } + fn.Callback.Set(tp, time.Now().Add(9*time.Second)) + } + if num, _, err := fn.Emissions.Read(sp); err != nil { + panic(err) + } else if num == fn.Cap { + panic(fmt.Sprintf("cap reached! This shouldn't be possible. 
key %v, num: %v, cap %v read %v", key, num, fn.Cap, read)) + } else { + if err := fn.Emissions.Write(sp, num+1); err != nil { + panic(err) + } + } + default: + panic("unexpected timer tag: " + timer.Family + " tag:" + timer.Tag + " want: \"\", for key:" + key) + } + default: + if fn.Callback.Family != timer.Family || timer.Tag != "" { + panic("unexpected timer family: " + timer.Family + " tag:" + timer.Tag + " want: " + fn.Callback.Family + ", for key:" + key) + } + } +} + +func regroup(key string, vs func(*int) bool, emit func(kv[string, int])) { + var v int + for vs(&v) { + emit(kvfn(key, v)) + } +} + +func init() { + register.Function3x0(regroup) +} + +func timersProcessingTimePipelineBuilder(makeImp func(s beam.Scope) beam.PCollection) func(s beam.Scope) { + return func(s beam.Scope) { + var inputs, wantOutputs []kv[string, int] + + offset := 5000 + timerOutput := 4093 + + numKeys := 40 + numDuplicateTimers := 15 + + for key := 0; key < numKeys; key++ { + k := strconv.Itoa(key) + for i := 0; i < numDuplicateTimers; i++ { + inputs = append(inputs, kvfn(k, i)) + wantOutputs = append(wantOutputs, kvfn(k, i)) + } + } + + imp := makeImp(s) + + keyed := beam.ParDo(s, &inputFn[string, int]{ + Inputs: inputs, + }, imp) + times := beam.ParDo(s, &processingTimeFn{ + Offset: offset, + TimerOutput: timerOutput, + Callback: timers.InProcessingTime("Callback"), + MyValue: state.MakeValueState[int]("MyValue"), + Cap: numDuplicateTimers, // Syncs the cycles to the number of duplicate keyed inputs. + }, keyed) + // We GroupByKey here so input to passert is blocked until teststream advances time to Infinity. + gbk := beam.GroupByKey(s, times) + regrouped := beam.ParDo(s, regroup, gbk) + passert.EqualsList(s, regrouped, wantOutputs) + } +} + +// TimersProcessingTimeTestStream_Infinity validates processing time timers in a bounded pipeline +// kicked along by TestStream. 
+func TimersProcessingTimeTestStream_Infinity(s beam.Scope) { + timersProcessingTimePipelineBuilder(func(s beam.Scope) beam.PCollection { + c := teststream.NewConfig() + c.AddElements(123456, []byte{42}) + c.AdvanceWatermarkToInfinity() + c.AdvanceProcessingTime(int64(mtime.FromDuration(10 * time.Second))) + c.AdvanceProcessingTime(int64(mtime.FromDuration(10 * time.Second))) + c.AdvanceProcessingTime(int64(mtime.FromDuration(10 * time.Second))) + c.AdvanceProcessingTimeToInfinity() + return teststream.Create(s, c) + })(s) +} + +// TimersProcessingTimeBounded validates processing time timers in a bounded pipeline. +func TimersProcessingTime_Bounded(s beam.Scope) { + timersProcessingTimePipelineBuilder(beam.Impulse)(s) +} + +// TimersProcessingTimeBounded validates processing time timers in an unbounded pipeline. +func TimersProcessingTime_Unbounded(s beam.Scope) { + timersProcessingTimePipelineBuilder(func(s beam.Scope) beam.PCollection { + now := time.Now() + return periodic.Impulse(s, now, now.Add(10*time.Second), 0, false) + })(s) +} diff --git a/sdks/go/test/integration/primitives/timers_test.go b/sdks/go/test/integration/primitives/timers_test.go index 01f2706c6293..7e62e9da6920 100644 --- a/sdks/go/test/integration/primitives/timers_test.go +++ b/sdks/go/test/integration/primitives/timers_test.go @@ -32,4 +32,12 @@ func TestTimers_EventTime_Unbounded(t *testing.T) { ptest.BuildAndRun(t, TimersEventTimeUnbounded) } -// TODO(https://github.com/apache/beam/issues/29772): Add ProcessingTime Timer tests. +func TestTimers_ProcessingTime_Infinity(t *testing.T) { + integration.CheckFilters(t) + ptest.BuildAndRun(t, TimersProcessingTimeTestStream_Infinity) +} + +func TestTimers_ProcessingTime_Bounded(t *testing.T) { + integration.CheckFilters(t) + ptest.BuildAndRun(t, TimersProcessingTime_Bounded) +}