Skip to content

Commit

Permalink
[Prism] Support BundleFinalization DoFn parameter (apache#32425)
Browse files Browse the repository at this point in the history
* Support BundleFinalization DoFn parameter

* Replace beam.Register with register.DoFn2x0

* Add TestParDoBundleFinalizer.* to filters

* Register test funcs

* Add filter to portable runner tests

* Temporarily skip test

* Simplify tests; refactor per PR comments

* Skip tests when not in loopback mode

* Clean up tests; add to coverage

* Fix import ordering

---------

Co-authored-by: Robert Burke <[email protected]>
  • Loading branch information
2 people authored and reeba212 committed Dec 4, 2024
1 parent 1e9f49e commit c09cf58
Show file tree
Hide file tree
Showing 12 changed files with 177 additions and 7 deletions.
1 change: 1 addition & 0 deletions sdks/go/pkg/beam/core/runtime/graphx/translate.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,7 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) ([]string, error) {
m.requirements[URNRequiresSplittableDoFn] = true
}
if _, ok := edge.Edge.DoFn.ProcessElementFn().BundleFinalization(); ok {
payload.RequestsFinalization = true
m.requirements[URNRequiresBundleFinalization] = true
}
if _, ok := edge.Edge.DoFn.ProcessElementFn().StateProvider(); ok {
Expand Down
12 changes: 11 additions & 1 deletion sdks/go/pkg/beam/core/typex/special.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,18 @@ type Window interface {
Equals(o Window) bool
}

// BundleFinalization allows registering callbacks to be performed after the runner durably persists bundle results.
// BundleFinalization allows registering callbacks for the runner to invoke after the bundle completes and the runner
// commits the output. The parameter is accessible during DoFn StartBundle, ProcessElement, and FinishBundle.
// However, if your DoFn implementation requires BundleFinalization in StartBundle or FinishBundle, it must also
// appear in the ProcessElement signature, even if ProcessElement never uses it.
// Common use cases for BundleFinalization would be to perform work after elements in a bundle have been processed.
// See beam.ParDo for documentation on these DoFn lifecycle methods.
type BundleFinalization interface {

// RegisterCallback registers func() for the runner to invoke after the runner persists the bundle of processed
// elements.
// The time.Duration configures the callback expiration, after which the runner will not invoke func().
// Returning an error communicates to the runner that bundle finalization failed; the runner may choose to attempt
// finalization again.
RegisterCallback(time.Duration, func() error)
}

Expand Down
1 change: 1 addition & 0 deletions sdks/go/pkg/beam/forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ type Window = typex.Window

// BundleFinalization represents the parameter used to register callbacks to
// be run once the runner has durably persisted output for a bundle.
// See typex.BundleFinalization for more details.
type BundleFinalization = typex.BundleFinalization

// These are the reflect.Type instances of the universal types, which are used
Expand Down
6 changes: 1 addition & 5 deletions sdks/go/pkg/beam/runners/prism/internal/handlepardo.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,7 @@ func (h *pardo) PrepareTransform(tid string, t *pipepb.PTransform, comps *pipepb
}

// Let's check for and remove anything that makes things less simple.
if pdo.OnWindowExpirationTimerFamilySpec == "" &&
!pdo.RequestsFinalization &&
!pdo.RequiresStableInput &&
!pdo.RequiresTimeSortedInput &&
pdo.RestrictionCoderId == "" {
if pdo.RestrictionCoderId == "" {
// Which inputs are side inputs doesn't change the graph further,
// so they're not included here. Nearly any ParDo can have them.

Expand Down
1 change: 1 addition & 0 deletions sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
var supportedRequirements = map[string]struct{}{
urns.RequirementSplittableDoFn: {},
urns.RequirementStatefulProcessing: {},
urns.RequirementBundleFinalization: {},
}

// TODO, move back to main package, and key off of executor handlers?
Expand Down
1 change: 1 addition & 0 deletions sdks/go/pkg/beam/runners/prism/internal/preprocess.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,7 @@ func finalizeStage(stg *stage, comps *pipepb.Components, pipelineFacts *fusionFa
if err := (proto.UnmarshalOptions{}).Unmarshal(t.GetSpec().GetPayload(), pardo); err != nil {
return fmt.Errorf("unable to decode ParDoPayload for %v", link.Transform)
}
stg.finalize = pardo.RequestsFinalization
if len(pardo.GetTimerFamilySpecs())+len(pardo.GetStateSpecs())+len(pardo.GetOnWindowExpirationTimerFamilySpec()) > 0 {
stg.stateful = true
}
Expand Down
12 changes: 11 additions & 1 deletion sdks/go/pkg/beam/runners/prism/internal/stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"io"
"runtime/debug"
"sync/atomic"
"time"

Expand Down Expand Up @@ -63,6 +64,7 @@ type stage struct {
sideInputs []engine.LinkID // Non-parallel input PCollections and their consumers
internalCols []string // PCollections that escape. Used for precise coder sending.
envID string
finalize bool
stateful bool
// hasTimers indicates the transform+timerfamily pairs that need to be waited on for
// the stage to be considered complete.
Expand Down Expand Up @@ -106,7 +108,7 @@ func (s *stage) Execute(ctx context.Context, j *jobservices.Job, wk *worker.W, c
defer func() {
// Convert execution panics to errors to fail the bundle.
if e := recover(); e != nil {
err = fmt.Errorf("panic in stage.Execute bundle processing goroutine: %v, stage: %+v", e, s)
err = fmt.Errorf("panic in stage.Execute bundle processing goroutine: %v, stage: %+v,stackTrace:\n%s", e, s, debug.Stack())
}
}()
slog.Debug("Execute: starting bundle", "bundle", rb)
Expand Down Expand Up @@ -322,6 +324,14 @@ progress:
slog.Debug("returned empty residual application", "bundle", rb, slog.Int("numResiduals", l), slog.String("pcollection", s.primaryInput))
}
em.PersistBundle(rb, s.OutputsToCoders, b.OutputData, s.inputInfo, residuals)
if s.finalize {
_, err := b.Finalize(ctx, wk)
if err != nil {
slog.Error("SDK Error from bundle finalization", "bundle", rb, "error", err.Error())
panic(err)
}
slog.Info("finalized bundle", "bundle", rb)
}
b.OutputData = engine.TentativeData{} // Clear the data.
return nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func TestImplemented(t *testing.T) {
{pipeline: primitives.Checkpoints},
{pipeline: primitives.CoGBK},
{pipeline: primitives.ReshuffleKV},
{pipeline: primitives.ParDoProcessElementBundleFinalizer},

// The following have been "allowed" to unblock further development
// But it's not clear these tests truly validate the expected behavior
Expand Down
11 changes: 11 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,17 @@ func (b *B) Cleanup(wk *W) {
wk.mu.Unlock()
}

// Finalize sends a FinalizeBundle request for this bundle's instruction ID to
// the passed in worker and returns the worker's FinalizeBundleResponse.
//
// If the instruction response carries a non-empty error string, that error is
// returned and the response is nil, so callers can tell a failed finalization
// apart from a successful one instead of always receiving a nil error.
func (b *B) Finalize(ctx context.Context, wk *W) (*fnpb.FinalizeBundleResponse, error) {
	resp := wk.sendInstruction(ctx, &fnpb.InstructionRequest{
		Request: &fnpb.InstructionRequest_FinalizeBundle{
			FinalizeBundle: &fnpb.FinalizeBundleRequest{
				InstructionId: b.InstID,
			},
		},
	})
	// The SDK reports instruction failures through the response's error
	// string rather than through the transport, so it must be checked here.
	if msg := resp.GetError(); msg != "" {
		return nil, fmt.Errorf("finalize[%v] failed: %v", b.InstID, msg)
	}
	return resp.GetFinalizeBundle(), nil
}

// Progress sends a progress request for the given bundle to the passed in worker, blocking on the response.
func (b *B) Progress(ctx context.Context, wk *W) (*fnpb.ProcessBundleProgressResponse, error) {
resp := wk.sendInstruction(ctx, &fnpb.InstructionRequest{
Expand Down
15 changes: 15 additions & 0 deletions sdks/go/test/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ var directFilters = []string{
"TestSetState",
"TestSetStateClear",
"TestTimers.*", // no timer support for the go direct runner.

// no support for BundleFinalizer
"TestParDoBundleFinalizer.*",
}

var portableFilters = []string{
Expand Down Expand Up @@ -134,6 +137,9 @@ var portableFilters = []string{

// The portable runner does not uniquify timers. (data elements re-fired)
"TestTimers.*",

// no support for BundleFinalizer
"TestParDoBundleFinalizer.*",
}

var prismFilters = []string{
Expand Down Expand Up @@ -190,6 +196,9 @@ var flinkFilters = []string{

"TestTimers_EventTime_Unbounded", // (failure when comparing on side inputs (NPE on window lookup))
"TestTimers_ProcessingTime.*", // Flink doesn't support processing time timers.

// no support for BundleFinalizer
"TestParDoBundleFinalizer.*",
}

var samzaFilters = []string{
Expand Down Expand Up @@ -231,6 +240,9 @@ var samzaFilters = []string{

// Samza does not support state.
"TestTimers.*",

// no support for BundleFinalizer
"TestParDoBundleFinalizer.*",
}

var sparkFilters = []string{
Expand Down Expand Up @@ -265,6 +277,9 @@ var sparkFilters = []string{

"TestTimers_EventTime_Unbounded", // Side inputs in executable stage not supported.
"TestTimers_ProcessingTime_Infinity", // Spark doesn't support test stream.

// no support for BundleFinalizer
"TestParDoBundleFinalizer.*",
}

var dataflowFilters = []string{
Expand Down
80 changes: 80 additions & 0 deletions sdks/go/test/integration/primitives/pardo.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package primitives
import (
"flag"
"fmt"
"sync/atomic"
"time"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
Expand All @@ -32,6 +34,9 @@ func init() {
register.Function3x2(asymJoinFn)
register.Function5x0(splitByName)
register.Function2x0(emitPipelineOptions)
register.DoFn2x0[beam.BundleFinalization, []byte]((*processElemBundleFinalizer)(nil))
register.DoFn2x0[beam.BundleFinalization, []byte]((*finalizerInFinishBundle)(nil))
register.DoFn2x0[beam.BundleFinalization, []byte]((*finalizerInAll)(nil))

register.Iter1[int]()
register.Iter2[int, int]()
Expand Down Expand Up @@ -192,3 +197,78 @@ func emitPipelineOptions(_ []byte, emit func(string)) {
emit(fmt.Sprintf("%s: %s", "B", beam.PipelineOptions.Get("B")))
emit(fmt.Sprintf("%s: %s", "C", beam.PipelineOptions.Get("C")))
}

// CountInvokeBundleFinalizer accumulates the BundleFinalizer* values added by
// the bundle finalization callbacks registered by the DoFns in this file.
// Tests reset it to zero before running a pipeline and compare the final value
// against the expected sum.
var CountInvokeBundleFinalizer atomic.Int32

// Values added to CountInvokeBundleFinalizer by the finalization callbacks,
// one per DoFn lifecycle method that registered the callback. Each value is a
// distinct power of two, so a sum of them identifies which callbacks
// contributed, provided each one runs at most once.
const (
// BundleFinalizerStart is added by callbacks registered in StartBundle.
BundleFinalizerStart = 1
// BundleFinalizerProcess is added by callbacks registered in ProcessElement.
BundleFinalizerProcess = 2
// BundleFinalizerFinish is added by callbacks registered in FinishBundle.
BundleFinalizerFinish = 4
)

// ParDoProcessElementBundleFinalizer builds a pipeline in scope s that feeds a
// single beam.Impulse into a beam.ParDo0 running processElemBundleFinalizer,
// a DoFn that takes a beam.BundleFinalization in its ProcessElement method.
func ParDoProcessElementBundleFinalizer(s beam.Scope) {
	beam.ParDo0(s, &processElemBundleFinalizer{}, beam.Impulse(s))
}

// processElemBundleFinalizer is a DoFn whose ProcessElement registers a bundle
// finalization callback for each element it receives.
type processElemBundleFinalizer struct {
}

// ProcessElement registers a finalization callback, with a one second
// expiration, that adds BundleFinalizerProcess to CountInvokeBundleFinalizer.
// The element itself is ignored.
func (fn *processElemBundleFinalizer) ProcessElement(bf beam.BundleFinalization, _ []byte) {
	recordProcess := func() error {
		CountInvokeBundleFinalizer.Add(BundleFinalizerProcess)
		return nil
	}
	bf.RegisterCallback(time.Second, recordProcess)
}

// ParDoFinishBundleFinalizer builds a pipeline in scope s that feeds a single
// beam.Impulse into a beam.ParDo0 running finalizerInFinishBundle, a DoFn with
// a no-op beam.BundleFinalization parameter in ProcessElement and a
// beam.BundleFinalization that is used in FinishBundle.
func ParDoFinishBundleFinalizer(s beam.Scope) {
	beam.ParDo0(s, &finalizerInFinishBundle{}, beam.Impulse(s))
}

// finalizerInFinishBundle is a DoFn that registers its bundle finalization
// callback in FinishBundle; its ProcessElement does nothing.
type finalizerInFinishBundle struct{}

// ProcessElement is intentionally empty. It still declares a
// beam.BundleFinalization parameter because that parameter must appear in the
// ProcessElement signature in order for FinishBundle's beam.BundleFinalization
// to be invoked.
func (fn *finalizerInFinishBundle) ProcessElement(_ beam.BundleFinalization, _ []byte) {}

// FinishBundle registers a finalization callback, with a one second
// expiration, that adds BundleFinalizerFinish to CountInvokeBundleFinalizer.
func (fn *finalizerInFinishBundle) FinishBundle(bf beam.BundleFinalization) {
	recordFinish := func() error {
		CountInvokeBundleFinalizer.Add(BundleFinalizerFinish)
		return nil
	}
	bf.RegisterCallback(time.Second, recordFinish)
}

// ParDoFinalizerInAll builds a pipeline in scope s that feeds a single
// beam.Impulse into a beam.ParDo0 running finalizerInAll, a DoFn that takes a
// beam.BundleFinalization in all three lifecycle methods: StartBundle,
// ProcessElement and FinishBundle.
func ParDoFinalizerInAll(s beam.Scope) {
	beam.ParDo0(s, &finalizerInAll{}, beam.Impulse(s))
}

// finalizerInAll is a DoFn that registers a bundle finalization callback in
// each of StartBundle, ProcessElement and FinishBundle.
type finalizerInAll struct{}

// StartBundle registers a finalization callback, with a one second
// expiration, that adds BundleFinalizerStart to CountInvokeBundleFinalizer.
func (fn *finalizerInAll) StartBundle(bf beam.BundleFinalization) {
	recordStart := func() error {
		CountInvokeBundleFinalizer.Add(BundleFinalizerStart)
		return nil
	}
	bf.RegisterCallback(time.Second, recordStart)
}

// ProcessElement registers a finalization callback, with a one second
// expiration, that adds BundleFinalizerProcess to CountInvokeBundleFinalizer.
// The element itself is ignored.
func (fn *finalizerInAll) ProcessElement(bf beam.BundleFinalization, _ []byte) {
	recordProcess := func() error {
		CountInvokeBundleFinalizer.Add(BundleFinalizerProcess)
		return nil
	}
	bf.RegisterCallback(time.Second, recordProcess)
}

// FinishBundle registers a finalization callback, with a one second
// expiration, that adds BundleFinalizerFinish to CountInvokeBundleFinalizer.
func (fn *finalizerInAll) FinishBundle(bf beam.BundleFinalization) {
	recordFinish := func() error {
		CountInvokeBundleFinalizer.Add(BundleFinalizerFinish)
		return nil
	}
	bf.RegisterCallback(time.Second, recordFinish)
}
43 changes: 43 additions & 0 deletions sdks/go/test/integration/primitives/pardo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package primitives
import (
"testing"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/options/jobopts"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
"github.com/apache/beam/sdks/v2/go/test/integration"
)
Expand Down Expand Up @@ -46,3 +48,44 @@ func TestParDoPipelineOptions(t *testing.T) {
integration.CheckFilters(t)
ptest.RunAndValidate(t, ParDoPipelineOptions())
}

// TestParDoBundleFinalizer runs each bundle-finalizer pipeline and checks,
// through CountInvokeBundleFinalizer, that the expected finalization callbacks
// were invoked. The test is skipped unless the job runs in loopback mode.
func TestParDoBundleFinalizer(t *testing.T) {
	integration.CheckFilters(t)
	if !jobopts.IsLoopback() {
		t.Skip("Only Loopback mode is supported")
	}

	type finalizerCase struct {
		name       string
		pipelineFn func(s beam.Scope)
		want       int32
	}
	cases := []finalizerCase{
		{"InProcessElement", ParDoProcessElementBundleFinalizer, BundleFinalizerProcess},
		{"InFinishBundle", ParDoFinishBundleFinalizer, BundleFinalizerFinish},
		{"InStartProcessFinishBundle", ParDoFinalizerInAll, BundleFinalizerStart + BundleFinalizerProcess + BundleFinalizerFinish},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			// Start every case from a clean counter so results don't leak between pipelines.
			CountInvokeBundleFinalizer.Store(0)

			p, s := beam.NewPipelineWithRoot()
			tc.pipelineFn(s)
			if _, err := ptest.RunWithMetrics(p); err != nil {
				t.Fatalf("Failed to execute job: %v", err)
			}

			got := CountInvokeBundleFinalizer.Load()
			if got == tc.want {
				return
			}
			t.Errorf("BundleFinalization RegisterCallback not invoked as expected via proxy counts, got: %v, want: %v", got, tc.want)
		})
	}
}

0 comments on commit c09cf58

Please sign in to comment.