Skip to content

Commit

Permalink
[apache#32498][prism] Add split / progress back off + catch-up. (apac…
Browse files Browse the repository at this point in the history
…he#32526)

* [apache#32498] Add split / progress back off.

* Use 100 milliseconds, and decrease "additively".

---------

Co-authored-by: lostluck <[email protected]>
  • Loading branch information
2 people authored and reeba212 committed Dec 4, 2024
1 parent 0be6d76 commit 1e9f49e
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 2 deletions.
6 changes: 5 additions & 1 deletion sdks/go/pkg/beam/runners/prism/internal/preprocess_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,11 @@ func Test_preprocessor_preProcessGraph(t *testing.T) {
}})

gotStages := pre.preProcessGraph(test.input, nil)
if diff := cmp.Diff(test.wantStages, gotStages, cmp.AllowUnexported(stage{}, link{}), cmpopts.EquateEmpty()); diff != "" {
if diff := cmp.Diff(test.wantStages, gotStages,
cmp.AllowUnexported(stage{}, link{}),
cmpopts.EquateEmpty(),
cmpopts.IgnoreFields(stage{}, "baseProgTick"),
); diff != "" {
t.Errorf("preProcessGraph(%q) stages diff (-want,+got)\n%v", test.name, diff)
}

Expand Down
46 changes: 45 additions & 1 deletion sdks/go/pkg/beam/runners/prism/internal/stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"io"
"sync/atomic"
"time"

"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
Expand Down Expand Up @@ -76,9 +77,32 @@ type stage struct {

SinkToPCollection map[string]string
OutputsToCoders map[string]engine.PColInfo

// Stage specific progress and splitting interval.
baseProgTick atomic.Value // time.Duration
}

// The minimum and maximum durations between each ProgressBundleRequest and split evaluation.
const (
minimumProgTick = 100 * time.Millisecond
maximumProgTick = 30 * time.Second
)

func clampTick(dur time.Duration) time.Duration {
switch {
case dur < minimumProgTick:
return minimumProgTick
case dur > maximumProgTick:
return maximumProgTick
default:
return dur
}
}

func (s *stage) Execute(ctx context.Context, j *jobservices.Job, wk *worker.W, comps *pipepb.Components, em *engine.ElementManager, rb engine.RunBundle) (err error) {
if s.baseProgTick.Load() == nil {
s.baseProgTick.Store(minimumProgTick)
}
defer func() {
// Convert execution panics to errors to fail the bundle.
if e := recover(); e != nil {
Expand Down Expand Up @@ -142,7 +166,9 @@ func (s *stage) Execute(ctx context.Context, j *jobservices.Job, wk *worker.W, c
previousTotalCount := int64(-2) // Total count of all pcollection elements.

unsplit := true
progTick := time.NewTicker(100 * time.Millisecond)
baseTick := s.baseProgTick.Load().(time.Duration)
ticked := false
progTick := time.NewTicker(baseTick)
defer progTick.Stop()
var dataFinished, bundleFinished bool
// If we have no data outputs, we still need to have progress & splits
Expand Down Expand Up @@ -170,6 +196,7 @@ progress:
break progress // exit progress loop on close.
}
case <-progTick.C:
ticked = true
resp, err := b.Progress(ctx, wk)
if err != nil {
slog.Debug("SDK Error from progress, aborting progress", "bundle", rb, "error", err.Error())
Expand All @@ -196,6 +223,7 @@ progress:
unsplit = false
continue progress
}

// TODO sort out rescheduling primary Roots on bundle failure.
var residuals []engine.Residual
for _, rr := range sr.GetResidualRoots() {
Expand All @@ -220,12 +248,28 @@ progress:
Data: residuals,
})
}

// Any split means we're processing slower than desired, but splitting should increase
// throughput. Back off for this and other bundles for this stage
baseTime := s.baseProgTick.Load().(time.Duration)
newTime := clampTick(baseTime * 4)
if s.baseProgTick.CompareAndSwap(baseTime, newTime) {
progTick.Reset(newTime)
} else {
progTick.Reset(s.baseProgTick.Load().(time.Duration))
}
} else {
previousIndex = index["index"]
previousTotalCount = index["totalCount"]
}
}
}
// If we never received any progress ticks, we may have too long a time, shrink it for new runs instead.
if !ticked {
newTick := clampTick(baseTick - minimumProgTick)
// If it's otherwise unchanged, apply the new duration.
s.baseProgTick.CompareAndSwap(baseTick, newTick)
}
// Tentative Data is ready, commit it to the main datastore.
slog.Debug("Execute: committing data", "bundle", rb, slog.Any("outputsWithData", maps.Keys(b.OutputData.Raw)), slog.Any("outputs", maps.Keys(s.OutputsToCoders)))

Expand Down

0 comments on commit 1e9f49e

Please sign in to comment.