Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Prism] Terminate Job with CancelFn instead of panic #31599

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 14 additions & 11 deletions sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"container/heap"
"context"
"fmt"
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This import is in the wrong group. Please put it in the group with the other sdk imports, not the standard library imports.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm so embarrassed. I'll fix it.

"io"
"sort"
"strings"
Expand Down Expand Up @@ -290,9 +291,9 @@ func (rb RunBundle) LogValue() slog.Value {
// Bundles is the core execution loop. It produces a sequence of bundles able to be executed.
// The returned channel is closed when the context is canceled, or there are no pending elements
// remaining.
func (em *ElementManager) Bundles(ctx context.Context, nextBundID func() string) <-chan RunBundle {
runStageCh := make(chan RunBundle)
func (em *ElementManager) Bundles(ctx context.Context, upstreamCancelFn context.CancelCauseFunc, nextBundID func() string) <-chan RunBundle {
ctx, cancelFn := context.WithCancelCause(ctx)
runStageCh := make(chan RunBundle)
damondouglas marked this conversation as resolved.
Show resolved Hide resolved
go func() {
em.pendingElements.Wait()
slog.Debug("no more pending elements: terminating pipeline")
Expand Down Expand Up @@ -384,7 +385,9 @@ func (em *ElementManager) Bundles(ctx context.Context, nextBundID func() string)
}
}
}
em.checkForQuiescence(advanced)
if err := em.checkForQuiescence(advanced); err != nil {
upstreamCancelFn(err)
}
}
}()
return runStageCh
Expand All @@ -400,11 +403,11 @@ func (em *ElementManager) Bundles(ctx context.Context, nextBundID func() string)
// executing off the next TestStream event.
//
// Must be called while holding em.refreshCond.L.
func (em *ElementManager) checkForQuiescence(advanced set[string]) {
func (em *ElementManager) checkForQuiescence(advanced set[string]) error {
defer em.refreshCond.L.Unlock()
if len(em.inprogressBundles) > 0 {
// If there are bundles in progress, then there may be watermark refreshes when they terminate.
return
return nil
}
if len(em.watermarkRefreshes) > 0 {
// If there are watermarks to refresh, we aren't yet stuck.
Expand All @@ -414,12 +417,12 @@ func (em *ElementManager) checkForQuiescence(advanced set[string]) {
slog.Int("refreshCount", len(em.watermarkRefreshes)),
slog.Int64("pendingElementCount", v),
)
return
return nil
}
if em.testStreamHandler == nil && len(em.processTimeEvents.events) > 0 {
// If there's no test stream involved, and processing time events exist, then
// it's only a matter of time.
return
return nil
}
// The job has quiesced!

Expand All @@ -433,20 +436,20 @@ func (em *ElementManager) checkForQuiescence(advanced set[string]) {
// Note: it's a prism bug if test stream never causes a refresh to occur for a given event.
// It's not correct to move to the next event if no refreshes would occur.
if len(em.watermarkRefreshes) > 0 {
return
return nil
} else if _, ok := nextEvent.(tsProcessingTimeEvent); ok {
// It's impossible to fully control processing time SDK side handling for processing time
// Runner side, so we specialize refresh handling here to avoid spuriously getting stuck.
em.watermarkRefreshes.insert(em.testStreamHandler.ID)
return
return nil
}
// If there are no refreshes, then there's no mechanism to make progress, so it's time to fast fail.
}

v := em.livePending.Load()
if v == 0 {
// Since there are no further pending elements, the job will be terminating successfully.
return
return nil
}
// The job is officially stuck. Fail fast and produce debugging information.
// Jobs must never get stuck so this indicates a bug in prism to be investigated.
Expand All @@ -469,7 +472,7 @@ func (em *ElementManager) checkForQuiescence(advanced set[string]) {
upS := em.pcolParents[upPCol]
stageState = append(stageState, fmt.Sprintln(id, "watermark in", inW, "out", outW, "upstream", upW, "from", upS, "pending", ss.pending, "byKey", ss.pendingByKeys, "inprogressKeys", ss.inprogressKeys, "byBundle", ss.inprogressKeysByBundle, "holds", ss.watermarkHolds.heap, "holdCounts", ss.watermarkHolds.counts, "holdsInBundle", ss.inprogressHoldsByBundle, "pttEvents", ss.processingTimeTimers.toFire))
}
panic(fmt.Sprintf("nothing in progress and no refreshes with non zero pending elements: %v\n%v", v, strings.Join(stageState, "")))
return errors.Errorf("nothing in progress and no refreshes with non zero pending elements: %v\n%v", v, strings.Join(stageState, ""))
}

// InputForBundle returns pre-allocated data for the given bundle, encoding the elements using
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ func TestStageState_updateWatermarks(t *testing.T) {

func TestElementManager(t *testing.T) {
t.Run("impulse", func(t *testing.T) {
ctx, cancelFn := context.WithCancelCause(context.Background())
em := NewElementManager(Config{})
em.AddStage("impulse", nil, []string{"output"}, nil)
em.AddStage("dofn", []string{"output"}, nil, nil)
Expand All @@ -327,7 +328,7 @@ func TestElementManager(t *testing.T) {
}

var i int
ch := em.Bundles(context.Background(), func() string {
ch := em.Bundles(ctx, cancelFn, func() string {
defer func() { i++ }()
return fmt.Sprintf("%v", i)
})
Expand Down Expand Up @@ -371,14 +372,15 @@ func TestElementManager(t *testing.T) {
}

t.Run("dofn", func(t *testing.T) {
ctx, cancelFn := context.WithCancelCause(context.Background())
em := NewElementManager(Config{})
em.AddStage("impulse", nil, []string{"input"}, nil)
em.AddStage("dofn1", []string{"input"}, []string{"output"}, nil)
em.AddStage("dofn2", []string{"output"}, nil, nil)
em.Impulse("impulse")

var i int
ch := em.Bundles(context.Background(), func() string {
ch := em.Bundles(ctx, cancelFn, func() string {
defer func() { i++ }()
t.Log("generating bundle", i)
return fmt.Sprintf("%v", i)
Expand Down Expand Up @@ -422,14 +424,15 @@ func TestElementManager(t *testing.T) {
})

t.Run("side", func(t *testing.T) {
ctx, cancelFn := context.WithCancelCause(context.Background())
em := NewElementManager(Config{})
em.AddStage("impulse", nil, []string{"input"}, nil)
em.AddStage("dofn1", []string{"input"}, []string{"output"}, nil)
em.AddStage("dofn2", []string{"input"}, nil, []LinkID{{Transform: "dofn2", Global: "output", Local: "local"}})
em.Impulse("impulse")

var i int
ch := em.Bundles(context.Background(), func() string {
ch := em.Bundles(ctx, cancelFn, func() string {
defer func() { i++ }()
t.Log("generating bundle", i)
return fmt.Sprintf("%v", i)
Expand Down Expand Up @@ -473,13 +476,14 @@ func TestElementManager(t *testing.T) {
}
})
t.Run("residual", func(t *testing.T) {
ctx, cancelFn := context.WithCancelCause(context.Background())
em := NewElementManager(Config{})
em.AddStage("impulse", nil, []string{"input"}, nil)
em.AddStage("dofn", []string{"input"}, nil, nil)
em.Impulse("impulse")

var i int
ch := em.Bundles(context.Background(), func() string {
ch := em.Bundles(ctx, cancelFn, func() string {
defer func() { i++ }()
t.Log("generating bundle", i)
return fmt.Sprintf("%v", i)
Expand Down
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/runners/prism/internal/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic
eg.SetLimit(8)

var instID uint64
bundles := em.Bundles(egctx, func() string {
bundles := em.Bundles(egctx, j.CancelFn, func() string {
return fmt.Sprintf("inst%03d", atomic.AddUint64(&instID, 1))
})
for {
Expand Down
Loading