Skip to content

Commit

Permalink
[#30083] Add synthetic processing time to prism. (#30492)
Browse files Browse the repository at this point in the history
* [prism] Add basic processing time queue.

* Initial residual handling refactor.

* Re-work teststream initilization. Remove pending element race.

* touch up

* rm merge duplicate

* Simplify watermark hold tracking.

* First successful run!

* Remove duplicated test run.

* Deduplicate processing time heap.

* rm debug text

* Remove some debug prints, cleanup.

* tiny todo cleanup

* ProcessingTime workming most of the time!

* Some cleanup

* try to get github suite to pass #1

* touch

* reduce counts a bit, filter tests some.

* Clean up unrelated state changes. Clean up comments somewhat.

* Filter out dataflow incompatible test.

* Refine processing time event comment.

* Remove test touch.

---------

Co-authored-by: lostluck <[email protected]>
  • Loading branch information
lostluck and lostluck authored May 29, 2024
1 parent 7d28155 commit 4daedbf
Show file tree
Hide file tree
Showing 15 changed files with 1,206 additions and 78 deletions.
284 changes: 247 additions & 37 deletions sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go

Large diffs are not rendered by default.

43 changes: 43 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/engine/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,3 +221,46 @@ func TestTestStream(t *testing.T) {
}
}
}

// TestProcessingTime is the suite for validating behaviors around ProcessingTime.
// Separate from the TestStream, Timers, and Triggers tests due to the unique nature
// of the time domain.
func TestProcessingTime(t *testing.T) {
initRunner(t)

tests := []struct {
pipeline func(s beam.Scope)
}{
{pipeline: primitives.TimersProcessingTimeTestStream_Infinity},
{pipeline: primitives.TimersProcessingTime_Bounded},
{pipeline: primitives.TimersProcessingTime_Unbounded},
}

configs := []struct {
name string
OneElementPerKey, OneKeyPerBundle bool
}{
{"Greedy", false, false},
{"AllElementsPerKey", false, true},
{"OneElementPerKey", true, false},
// {"OneElementPerBundle", true, true}, // Reveals flaky behavior
}
for _, config := range configs {
for _, test := range tests {
t.Run(initTestName(test.pipeline)+"_"+config.name, func(t *testing.T) {
t.Cleanup(func() {
engine.OneElementPerKey = false
engine.OneKeyPerBundle = false
})
engine.OneElementPerKey = config.OneElementPerKey
engine.OneKeyPerBundle = config.OneKeyPerBundle
p, s := beam.NewPipelineWithRoot()
test.pipeline(s)
_, err := executeWithT(context.Background(), t, p)
if err != nil {
t.Fatalf("pipeline failed, but feature should be implemented in Prism: %v", err)
}
})
}
}
}
39 changes: 22 additions & 17 deletions sdks/go/pkg/beam/runners/prism/internal/engine/holds.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,28 +22,39 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
)

// holdHeap orders holds based on their timestamps
// so we can always find the minimum timestamp of pending holds.
type holdHeap []mtime.Time
// mtimeHeap is a minHeap to find the earliest processing time event.
// Used for holds, and general processing time event ordering.
type mtimeHeap []mtime.Time

func (h holdHeap) Len() int { return len(h) }
func (h holdHeap) Less(i, j int) bool { return h[i] < h[j] }
func (h holdHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h mtimeHeap) Len() int { return len(h) }
func (h mtimeHeap) Less(i, j int) bool {
return h[i] < h[j]
}
func (h mtimeHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }

func (h *holdHeap) Push(x any) {
func (h *mtimeHeap) Push(x any) {
// Push and Pop use pointer receivers because they modify the slice's length,
// not just its contents.
*h = append(*h, x.(mtime.Time))
}

func (h *holdHeap) Pop() any {
func (h *mtimeHeap) Pop() any {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
return x
}

func (h *mtimeHeap) Remove(toRemove mtime.Time) {
for i, v := range *h {
if v == toRemove {
heap.Remove(h, i)
return
}
}
}

// holdTracker track the watermark holds for a stage.
//
// Timers hold back the watermark until they fire, but multiple
Expand All @@ -55,7 +66,7 @@ func (h *holdHeap) Pop() any {
// A heap of the hold times is kept so we have quick access to the minimum hold, for calculating
// how to advance the watermark.
type holdTracker struct {
heap holdHeap
heap mtimeHeap
counts map[mtime.Time]int
}

Expand All @@ -76,19 +87,13 @@ func (ht *holdTracker) Drop(hold mtime.Time, v int) {
panic(fmt.Sprintf("prism error: negative watermark hold count %v for time %v", n, hold))
}
delete(ht.counts, hold)
for i, h := range ht.heap {
if hold == h {
heap.Remove(&ht.heap, i)
break
}
}
ht.heap.Remove(hold)
}

// Add a hold a number of times to heap. If the hold time isn't already present in the heap, it is added.
func (ht *holdTracker) Add(hold mtime.Time, v int) {
// Mark the hold in the heap.
ht.counts[hold] = ht.counts[hold] + v

ht.counts[hold] += v
if len(ht.counts) != len(ht.heap) {
// Since there's a difference, the hold should not be in the heap, so we add it.
heap.Push(&ht.heap, hold)
Expand Down
96 changes: 96 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/engine/processingtime.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package engine

import (
"container/heap"

"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
)

// Notes on Processing Time handling:
//
// ProcessingTime events (processingTime timers, process continuations, triggers) necessarily need to operate on a global queue.
// However, PT timers are per key+family+tag, and may be overwritten by subsequent elements.
// So, similarly to event time timers, we need to manage a "last set" queue, and to manage the holds.
// This implies they should probably be handled by state, instead of globally.
// In reality, it's probably going to be "both", a global PT event queue, and per stage state.
//
// In principle, timers would be how to implement the related features, so getting those right will simplify their handling.
// Test stream is already central, but doesn't set events, it controls their execution.
//
// The ElementManager doesn't retain any data itself, so it should not hold material data about what is being triggered.
// The ElementManager should only contain which stage state should be triggered when in a time domain.
//
// ProcessContinuations count as pending events, and must be drained accordingly before time expires.
//
// A stage may trigger on multiple ticks.
// It's up to a stage to schedule additional work on those notices.

// stageRefreshQueue manages ProcessingTime events, in particular, which stages need notification
// at which points in processing time they occur. It doesn't handle the interface between
// walltime or any synthetic notions of time.
//
// stageRefreshQueue is not goroutine safe and relies on external synchronization.
type stageRefreshQueue struct {
events map[mtime.Time]set[string]
order mtimeHeap
}

// newStageRefreshQueue creates an initialized stageRefreshQueue.
func newStageRefreshQueue() *stageRefreshQueue {
return &stageRefreshQueue{
events: map[mtime.Time]set[string]{},
}
}

// Schedule a stage event at the given time.
func (q *stageRefreshQueue) Schedule(t mtime.Time, stageID string) {
if s, ok := q.events[t]; ok {
// We already have a trigger at this time, mutate that instead.
if s.present(stageID) {
// We already notify this stage at this time, no action required.
return
}
s.insert(stageID)
return
}
q.events[t] = set[string]{stageID: struct{}{}}
heap.Push(&q.order, t)
}

// Peek returns the minimum time in the queue and whether it is valid.
// If there are no times left in the queue, the boolean will be false.
func (q *stageRefreshQueue) Peek() (mtime.Time, bool) {
if len(q.order) == 0 {
return mtime.MaxTimestamp, false
}
return q.order[0], true
}

// AdvanceTo takes in the current now time, and returns the set of ids that need a refresh.
func (q *stageRefreshQueue) AdvanceTo(now mtime.Time) set[string] {
notify := set[string]{}
for {
// If there are no elements, then we're done.
if len(q.order) == 0 || q.order[0] > now {
return notify
}
// pop elements off the queue until the next time is later than now.
next := heap.Pop(&q.order).(mtime.Time)
notify.merge(q.events[next])
delete(q.events, next)
}
}
139 changes: 139 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/engine/processingtime_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package engine

import (
"testing"

"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
"github.com/google/go-cmp/cmp"
)

func TestProcessingTimeQueue(t *testing.T) {
t.Run("empty", func(t *testing.T) {
q := newStageRefreshQueue()
emptyTime, ok := q.Peek()
if ok != false {
t.Errorf("q.Peek() on empty queue should have returned false")
}
if got, want := emptyTime, mtime.MaxTimestamp; got != want {
t.Errorf("q.Peek() on empty queue returned %v, want %v", got, want)
}

tests := []mtime.Time{
mtime.MinTimestamp,
-273,
0,
42,
mtime.EndOfGlobalWindowTime,
mtime.MaxTimestamp,
}
for _, test := range tests {
if got, want := q.AdvanceTo(test), (set[string]{}); len(got) > 0 {
t.Errorf("q.AdvanceTo(%v) on empty queue returned %v, want %v", test, got, want)
}
}
})
t.Run("scheduled", func(t *testing.T) {
type event struct {
t mtime.Time
stage string
}

s := func(ids ...string) set[string] {
ret := set[string]{}
for _, id := range ids {
ret.insert(id)
}
return ret
}

tests := []struct {
name string
events []event

minTime mtime.Time

advanceTime mtime.Time
want set[string]
}{
{
"singleBefore",
[]event{{1, "test1"}},
1,
0,
s(),
}, {
"singleAt",
[]event{{1, "test1"}},
1,
1,
s("test1"),
}, {
"singleAfter",
[]event{{1, "test1"}},
1,
2,
s("test1"),
}, {
"trioDistinct",
[]event{{1, "test1"}, {2, "test2"}, {3, "test3"}},
1,
2,
s("test1", "test2"),
}, {
"trioDistinctReversed",
[]event{{3, "test3"}, {2, "test2"}, {1, "test1"}},
1,
2,
s("test1", "test2"),
}, {
"trioDistinctTimeSameId",
[]event{{3, "test"}, {2, "test"}, {1, "test"}},
1,
2,
s("test"),
}, {
"trioOneTime",
[]event{{1, "test3"}, {1, "test2"}, {1, "test1"}},
1,
1,
s("test1", "test2", "test3"),
}, {
"trioDuplicates",
[]event{{1, "test"}, {1, "test"}, {1, "test"}},
1,
1,
s("test", "test", "test"),
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
q := newStageRefreshQueue()
for _, e := range test.events {
q.Schedule(e.t, e.stage)
}
if got, _ := q.Peek(); got != test.minTime {
t.Errorf("q.Peek() = %v, want %v", got, test.minTime)
}

if got, want := q.AdvanceTo(test.advanceTime), test.want; !cmp.Equal(got, want) {
t.Errorf("q.AdvanceTo(%v) = %v, want %v", test.advanceTime, got, want)
}
})
}
})
}
12 changes: 10 additions & 2 deletions sdks/go/pkg/beam/runners/prism/internal/engine/teststream.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ type tagState struct {

// Now represents the overridden ProcessingTime, which is only advanced when directed by an event.
// Overrides the elementManager "clock".
func (ts *testStreamHandler) Now() time.Time {
return ts.processingTime
func (ts *testStreamHandler) Now() mtime.Time {
return mtime.FromTime(ts.processingTime)
}

// TagsToPCollections recieves the map of local output tags to global pcollection ids.
Expand Down Expand Up @@ -234,6 +234,14 @@ type tsProcessingTimeEvent struct {
// Execute this ProcessingTime event by advancing the synthetic processing time.
func (ev tsProcessingTimeEvent) Execute(em *ElementManager) {
em.testStreamHandler.processingTime = em.testStreamHandler.processingTime.Add(ev.AdvanceBy)
if em.testStreamHandler.processingTime.After(mtime.MaxTimestamp.ToTime()) || ev.AdvanceBy == time.Duration(mtime.MaxTimestamp) {
em.testStreamHandler.processingTime = mtime.MaxTimestamp.ToTime()
}

// Add the refreshes now so our block prevention logic works.
emNow := em.ProcessingTimeNow()
toRefresh := em.processTimeEvents.AdvanceTo(emNow)
em.watermarkRefreshes.merge(toRefresh)
}

// tsFinalEvent is the "last" event we perform after all preceeding events.
Expand Down
Loading

0 comments on commit 4daedbf

Please sign in to comment.