Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#32211] Support OnWindowExpiration in Prism. #33337

Merged
merged 9 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@
## New Features / Improvements

* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* Support OnWindowExpiration in Prism ([#32211](https://github.com/apache/beam/issues/32211)).
* This enables initial Java GroupIntoBatches support.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May we consider:

This enables Java @OnWindowExpiration which enables initial GroupIntoBatches support.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, "@" based annotations are a notion in Java, and "@" indicates a Decorator in Python, which doesn't perform the same task as they do in Java.

We want to refer to the Beam feature, not list the specific and complete manifestations of that feature in all SDK languages, so we can't use @OnWindowExpiration. It's immaterial that the feature currently only exists in Java, and not the other SDKs.

What we could do, in a separate PR, is update the programming guide for the feature, and link to that. Then it would be pointing to the documentation for all SDKs, without fitting to a single SDK.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed thanks.


## Breaking Changes

Expand Down
18 changes: 13 additions & 5 deletions runners/prism/java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,12 @@ def sickbayTests = [
'org.apache.beam.sdk.testing.TestStreamTest.testMultipleStreams',
'org.apache.beam.sdk.testing.TestStreamTest.testProcessingTimeTrigger',

// GroupIntoBatchesTest tests that fail:
// Teststream has bad KV encodings due to using an outer context.
'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testInStreamingMode',
// ShardedKey not yet implemented.
'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testWithShardedKeyInGlobalWindow',

// Coding error somehow: short write: reached end of stream after reading 5 bytes; 98 bytes expected
'org.apache.beam.sdk.testing.TestStreamTest.testMultiStage',

Expand Down Expand Up @@ -228,14 +234,16 @@ def createPrismValidatesRunnerTask = { name, environmentType ->
excludeCategories 'org.apache.beam.sdk.testing.UsesSdkHarnessEnvironment'

// Not yet implemented in Prism
// https://github.com/apache/beam/issues/32211
excludeCategories 'org.apache.beam.sdk.testing.UsesOnWindowExpiration'
// https://github.com/apache/beam/issues/32929
excludeCategories 'org.apache.beam.sdk.testing.UsesOrderedListState'

// Not supported in Portable Java SDK yet.
// https://github.com/apache/beam/issues?q=is%3Aissue+is%3Aopen+MultimapState
excludeCategories 'org.apache.beam.sdk.testing.UsesMultimapState'
// Not supported in Portable Java SDK yet.
// https://github.com/apache/beam/issues?q=is%3Aissue+is%3Aopen+MultimapState
excludeCategories 'org.apache.beam.sdk.testing.UsesMultimapState'

// Processing time with TestStream is unreliable without being able to control
// SDK side time portably. Ignore these tests.
excludeCategories 'org.apache.beam.sdk.testing.UsesTestStreamWithProcessingTime'
}
filter {
for (String test : sickbayTests) {
Expand Down
224 changes: 190 additions & 34 deletions sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,8 @@ type Config struct {
//
// Watermarks are advanced based on consumed input, except if the stage produces residuals.
type ElementManager struct {
config Config
config Config
nextBundID func() string // Generates unique bundleIDs. Set in the Bundles method.

impulses set[string] // List of impulse stages.
stages map[string]*stageState // The state for each stage.
Expand All @@ -197,6 +198,7 @@ type ElementManager struct {
refreshCond sync.Cond // refreshCond protects the following fields with it's lock, and unblocks bundle scheduling.
inprogressBundles set[string] // Active bundleIDs
changedStages set[string] // Stages that have changed and need their watermark refreshed.
injectedBundles []RunBundle // Represents ready to execute bundles prepared outside of the main loop, such as for onWindowExpiration, or for Triggers.

livePending atomic.Int64 // An accessible live pending count. DEBUG USE ONLY
pendingElements sync.WaitGroup // pendingElements counts all unprocessed elements in a job. Jobs with no pending elements terminate successfully.
Expand Down Expand Up @@ -271,6 +273,16 @@ func (em *ElementManager) StageStateful(ID string) {
em.stages[ID].stateful = true
}

// StageOnWindowExpiration marks the given stage as stateful, which means elements are
// processed by key.
func (em *ElementManager) StageOnWindowExpiration(stageID string, timer StaticTimerID) {
ss := em.stages[stageID]
ss.onWindowExpiration = timer
ss.keysToExpireByWindow = map[typex.Window]set[string]{}
ss.inProgressExpiredWindows = map[typex.Window]int{}
ss.expiryWindowsByBundles = map[string]typex.Window{}
}

// StageProcessingTimeTimers indicates which timers are processingTime domain timers.
func (em *ElementManager) StageProcessingTimeTimers(ID string, ptTimers map[string]bool) {
em.stages[ID].processingTimeTimersFamilies = ptTimers
Expand Down Expand Up @@ -338,6 +350,8 @@ func (rb RunBundle) LogValue() slog.Value {
// The returned channel is closed when the context is canceled, or there are no pending elements
// remaining.
func (em *ElementManager) Bundles(ctx context.Context, upstreamCancelFn context.CancelCauseFunc, nextBundID func() string) <-chan RunBundle {
// Make it easier for injected bundles to get unique IDs.
em.nextBundID = nextBundID
runStageCh := make(chan RunBundle)
ctx, cancelFn := context.WithCancelCause(ctx)
go func() {
Expand Down Expand Up @@ -370,8 +384,9 @@ func (em *ElementManager) Bundles(ctx context.Context, upstreamCancelFn context.
changedByProcessingTime := em.processTimeEvents.AdvanceTo(emNow)
em.changedStages.merge(changedByProcessingTime)

// If there are no changed stages or ready processing time events available, we wait until there are.
for len(em.changedStages)+len(changedByProcessingTime) == 0 {
// If there are no changed stages, ready processing time events,
// or injected bundles available, we wait until there are.
for len(em.changedStages)+len(changedByProcessingTime)+len(em.injectedBundles) == 0 {
// Check to see if we must exit
select {
case <-ctx.Done():
Expand All @@ -386,6 +401,19 @@ func (em *ElementManager) Bundles(ctx context.Context, upstreamCancelFn context.
changedByProcessingTime = em.processTimeEvents.AdvanceTo(emNow)
em.changedStages.merge(changedByProcessingTime)
}
// Run any injected bundles first.
for len(em.injectedBundles) > 0 {
rb := em.injectedBundles[0]
em.injectedBundles = em.injectedBundles[1:]
em.refreshCond.L.Unlock()

select {
case <-ctx.Done():
return
case runStageCh <- rb:
}
em.refreshCond.L.Lock()
}

// We know there is some work we can do that may advance the watermarks,
// refresh them, and see which stages have advanced.
Expand Down Expand Up @@ -628,6 +656,12 @@ type Block struct {
Transform, Family string
}

// StaticTimerID represents the static user identifiers for a timer,
// in particular, the ID of the Transform, and the family for the timer.
type StaticTimerID struct {
Transform, TimerFamily string
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// Transform identifies fused topologically sorted ids of Pipeline steps.
Transform string

// TimerFamily is the user identifier for a timer family, such as the parameter annotation for the TimerMap in a Java SDK DoFn.
TimerFamily string

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Transform is the direct ID of a single transform, it's immaterial whether it's in a stage or not.

Stages (and their IDs) represent fused, topologically sorted transforms. Essentially, within Beam, everything has it's own ID, and we generally refer to things by ID, instead of by direct code (except for Schema Row types, which is why they're awkward).

The goal is to avoid putting SDK specific information in Prism, so referring to an SDK specific implementation isn't useful, unless you already know that implementation.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we get code comments for these properties?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed offline, I renamed the Transform field to TransformID, which makes comments on both fields redundant (they'd be of the form "TransformID is the ID of the transform containing the timer" etc).

Just to re-iterate the other point I made offline, documentation is best served in appropriate contexts. In this case, this is a type in an internal package, not intended for arbitrary user use, so the documentation bar is lower. That context is safe enough to assume that whomever needs to make a change here knows what a Timer is and how they are identified.

Otherwise they should really look into the beam Protos directly to start. (or some kind of SDK Authoring guide...)

}

// StateForBundle retreives relevant state for the given bundle, WRT the data in the bundle.
//
// TODO(lostluck): Consider unifiying with InputForBundle, to reduce lock contention.
Expand Down Expand Up @@ -847,6 +881,19 @@ func (em *ElementManager) PersistBundle(rb RunBundle, col2Coders map[string]PCol
}
delete(stage.inprogressHoldsByBundle, rb.BundleID)

// Clean up OnWindowExpiration bundle accounting, so window state
// may be garbage collected.
if stage.expiryWindowsByBundles != nil {
win, ok := stage.expiryWindowsByBundles[rb.BundleID]
if ok {
stage.inProgressExpiredWindows[win] -= 1
if stage.inProgressExpiredWindows[win] == 0 {
delete(stage.inProgressExpiredWindows, win)
}
delete(stage.expiryWindowsByBundles, rb.BundleID)
}
}

// If there are estimated output watermarks, set the estimated
// output watermark for the stage.
if len(residuals.MinOutputWatermarks) > 0 {
Expand Down Expand Up @@ -1068,6 +1115,12 @@ type stageState struct {
strat winStrat // Windowing Strategy for aggregation fireings.
processingTimeTimersFamilies map[string]bool // Indicates which timer families use the processing time domain.

// onWindowExpiration management
onWindowExpiration StaticTimerID // The static ID of the OnWindowExpiration callback.
keysToExpireByWindow map[typex.Window]set[string] // Tracks all keys ever used with a window, so they may be expired.
inProgressExpiredWindows map[typex.Window]int // Tracks the number of bundles currently expiring these windows, so we don't prematurely garbage collect them.
expiryWindowsByBundles map[string]typex.Window // Tracks which bundle is handling which window, so the above map can be cleared.

mu sync.Mutex
upstreamWatermarks sync.Map // watermark set from inputPCollection's parent.
input mtime.Time // input watermark for the parallel input.
Expand Down Expand Up @@ -1158,6 +1211,14 @@ func (ss *stageState) AddPending(newPending []element) int {
timers: map[timerKey]timerTimes{},
}
ss.pendingByKeys[string(e.keyBytes)] = dnt
if ss.keysToExpireByWindow != nil {
w, ok := ss.keysToExpireByWindow[e.window]
if !ok {
w = make(set[string])
ss.keysToExpireByWindow[e.window] = w
}
w.insert(string(e.keyBytes))
}
}
heap.Push(&dnt.elements, e)

Expand Down Expand Up @@ -1555,48 +1616,143 @@ func (ss *stageState) updateWatermarks(em *ElementManager) set[string] {
if minWatermarkHold < newOut {
newOut = minWatermarkHold
}
refreshes := set[string]{}
// If the newOut is smaller, then don't change downstream watermarks.
if newOut <= ss.output {
return nil
}

// If bigger, advance the output watermark
if newOut > ss.output {
ss.output = newOut
for _, outputCol := range ss.outputIDs {
consumers := em.consumers[outputCol]

for _, sID := range consumers {
em.stages[sID].updateUpstreamWatermark(outputCol, ss.output)
refreshes.insert(sID)
}
// Inform side input consumers, but don't update the upstream watermark.
for _, sID := range em.sideConsumers[outputCol] {
refreshes.insert(sID.Global)
}
}
// Garbage collect state, timers and side inputs, for all windows
// that are before the new output watermark.
// They'll never be read in again.
for _, wins := range ss.sideInputs {
for win := range wins {
// TODO(#https://github.com/apache/beam/issues/31438):
// Adjust with AllowedLateness
// Clear out anything we've already used.
if win.MaxTimestamp() < newOut {
delete(wins, win)
preventDownstreamUpdate := ss.createOnWindowExpirationBundles(newOut, em)

// Garbage collect state, timers and side inputs, for all windows
// that are before the new output watermark, if they aren't in progress
// of being expired.
// They'll never be read in again.
for _, wins := range ss.sideInputs {
for win := range wins {
// TODO(#https://github.com/apache/beam/issues/31438):
// Adjust with AllowedLateness
// Clear out anything we've already used.
if win.MaxTimestamp() < newOut {
// If the expiry is in progress, skip this window.
if ss.inProgressExpiredWindows[win] > 0 {
continue
}
delete(wins, win)
}
}
for _, wins := range ss.state {
for win := range wins {
// TODO(#https://github.com/apache/beam/issues/31438):
// Adjust with AllowedLateness
if win.MaxTimestamp() < newOut {
delete(wins, win)
}
for _, wins := range ss.state {
for win := range wins {
// TODO(#https://github.com/apache/beam/issues/31438):
// Adjust with AllowedLateness
if win.MaxTimestamp() < newOut {
// If the expiry is in progress, skip collecting this window.
if ss.inProgressExpiredWindows[win] > 0 {
continue
}
delete(wins, win)
}
}
}
// If there are windows to expire, we don't update the output watermark yet.
if preventDownstreamUpdate {
return nil
}

// Update this stage's output watermark, and then propagate that to downstream stages
refreshes := set[string]{}
ss.output = newOut
for _, outputCol := range ss.outputIDs {
consumers := em.consumers[outputCol]

for _, sID := range consumers {
em.stages[sID].updateUpstreamWatermark(outputCol, ss.output)
refreshes.insert(sID)
}
// Inform side input consumers, but don't update the upstream watermark.
for _, sID := range em.sideConsumers[outputCol] {
refreshes.insert(sID.Global)
}
}
return refreshes
}

// createOnWindowExpirationBundles injects bundles when windows
// expire for all keys that were used in that window. Returns true if any
lostluck marked this conversation as resolved.
Show resolved Hide resolved
// bundles are created, which means that the window must not yet be garbage
// collected.
//
// Must be called within the stageState.mu's and the ElementManager.refreshCond
// critical sections.
func (ss *stageState) createOnWindowExpirationBundles(newOut mtime.Time, em *ElementManager) bool {
var preventDownstreamUpdate bool
for win, keys := range ss.keysToExpireByWindow {
// Check if the window has expired.
// TODO(#https://github.com/apache/beam/issues/31438):
// Adjust with AllowedLateness
if win.MaxTimestamp() >= newOut {
continue
}
// We can't advance the output watermark if there's garbage to collect.
preventDownstreamUpdate = true
// Hold off on garbage collecting data for these windows while these
// are in progress.
ss.inProgressExpiredWindows[win] += 1

// Produce bundle(s) for these keys and window, and inject them.
wm := win.MaxTimestamp()
rb := RunBundle{StageID: ss.ID, BundleID: "owe-" + em.nextBundID(), Watermark: wm}

// Now we need to actually build the bundle.
var toProcess []element
busyKeys := set[string]{}
usedKeys := set[string]{}
for k := range keys {
if ss.inprogressKeys.present(k) {
busyKeys.insert(k)
continue
}
usedKeys.insert(k)
toProcess = append(toProcess, element{
window: win,
timestamp: wm,
pane: typex.NoFiringPane(),
holdTimestamp: wm,
transform: ss.onWindowExpiration.Transform,
family: ss.onWindowExpiration.TimerFamily,
sequence: 1,
keyBytes: []byte(k),
elmBytes: nil,
})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes the elements available to the SDK during its OnWindowExpiration callback associated with a given key/window?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct, this is where we're producing the timer values that the SDK will consume and ultimately call the OnWindowExpiration callbacks.

This sort of thing usually lives in "PersistBundle" but since these timers aren't directly produced by users, we need to synthesize them here.

}
em.addPending(len(toProcess))
ss.watermarkHolds.Add(wm, 1)
ss.makeInProgressBundle(
func() string { return rb.BundleID },
toProcess,
wm,
usedKeys,
map[mtime.Time]int{wm: 1},
)
ss.expiryWindowsByBundles[rb.BundleID] = win

slog.Debug("OnWindowExpiration-Bundle Created", slog.Any("bundle", rb), slog.Any("usedKeys", usedKeys), slog.Any("window", win), slog.Any("toProcess", toProcess), slog.Any("busyKeys", busyKeys))
// We're already in the refreshCond critical section.
// Insert that this is in progress here to avoid a race condition.
em.inprogressBundles.insert(rb.BundleID)
em.injectedBundles = append(em.injectedBundles, rb)

// Remove the key accounting, or continue tracking which keys still need clearing.
if len(busyKeys) == 0 {
delete(ss.keysToExpireByWindow, win)
} else {
ss.keysToExpireByWindow[win] = busyKeys
}
}
return preventDownstreamUpdate
}

// bundleReady returns the maximum allowed watermark for this stage, and whether
// it's permitted to execute by side inputs.
func (ss *stageState) bundleReady(em *ElementManager, emNow mtime.Time) (mtime.Time, bool, bool) {
Expand Down
Loading
Loading