WIP: OnWindowExpiry, and OrderedListState WIP: DO NOT SUBMIT #32950

Draft · wants to merge 4 commits into master
20 changes: 11 additions & 9 deletions runners/prism/java/build.gradle
@@ -96,6 +96,7 @@ def sickbayTests = [
'org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testAfterProcessingTimeContinuationTriggerEarly',
'org.apache.beam.sdk.transforms.ParDoTest$BundleInvariantsTests.testWatermarkUpdateMidBundle',
'org.apache.beam.sdk.transforms.ViewTest.testTriggeredLatestSingleton',
'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testBufferingTimerInGlobalWindow',
// Requires Allowed Lateness, among others.
'org.apache.beam.sdk.transforms.ParDoTest$TimerTests.testEventTimeTimerSetWithinAllowedLateness',
'org.apache.beam.sdk.testing.TestStreamTest.testFirstElementLate',
@@ -105,6 +106,7 @@ def sickbayTests = [
'org.apache.beam.sdk.testing.TestStreamTest.testLateDataAccumulating',
'org.apache.beam.sdk.testing.TestStreamTest.testMultipleStreams',
'org.apache.beam.sdk.testing.TestStreamTest.testProcessingTimeTrigger',
'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testRequiresTimeSortedInputWithLateDataAndAllowedLateness',

// GroupIntoBatchesTest tests that fail:
// Teststream has bad KV encodings due to using an outer context.
@@ -177,13 +179,6 @@ def sickbayTests = [
// java.lang.IllegalStateException: java.io.EOFException
'org.apache.beam.sdk.transforms.ViewTest.testSideInputWithNestedIterables',

// Requires Time Sorted Input
'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testRequiresTimeSortedInput',
'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testRequiresTimeSortedInputWithTestStream',
'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testRequiresTimeSortedInputWithLateDataAndAllowedLateness',
'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testTwoRequiresTimeSortedInputWithLateData',
'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testRequiresTimeSortedInputWithLateData',

// Missing output due to processing time timer skew.
'org.apache.beam.sdk.transforms.ParDoTest$TimestampTests.testProcessElementSkew',

@@ -195,6 +190,13 @@ def sickbayTests = [
'org.apache.beam.sdk.transforms.ParDoTest$BundleFinalizationTests.testBundleFinalization',
'org.apache.beam.sdk.transforms.ParDoTest$BundleFinalizationTests.testBundleFinalizationWithSideInputs',

// sharded_key coder isn't implemented in Prism.
'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testWithShardedKeyInGlobalWindow',

// The SDK dies, terminating the job, due to a bug in how onWindowExpiry is implemented in prism.
// excludeTestsMatching 'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testBufferingTimerInFixedWindow'
// excludeTestsMatching 'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testInStreamingMode'

// Filtered by PortableRunner tests.
// Teardown not called in exceptions
// https://github.com/apache/beam/issues/20372
@@ -222,9 +224,9 @@ def createPrismValidatesRunnerTask = { name, environmentType ->
"--runner=TestPrismRunner",
"--experiments=beam_fn_api",
"--defaultEnvironmentType=${environmentType}",
"--prismLogLevel=warn",
"--prismLogLevel=debug",
"--prismLocation=${prismBuildTask.project.property('buildTarget').toString()}",
"--enableWebUI=false",
"--enableWebUI=true",
])
testClassesDirs = files(project(":sdks:java:core").sourceSets.test.output.classesDirs)
useJUnit {
16 changes: 8 additions & 8 deletions sdks/go/pkg/beam/core/runtime/metricsx/metricsx.go
@@ -58,12 +58,12 @@ func groupByType(p *pipepb.Pipeline, minfos []*pipepb.MonitoringInfo) (
}
}

var errs []error
var errs []string

for _, minfo := range minfos {
key, err := extractKey(minfo, pcolToTransform)
if err != nil {
errs = append(errs, err)
errs = append(errs, err.Error())
continue
}

@@ -72,14 +72,14 @@ func groupByType(p *pipepb.Pipeline, minfos []*pipepb.MonitoringInfo) (
case UrnToString(UrnUserSumInt64):
value, err := extractCounterValue(r)
if err != nil {
errs = append(errs, err)
errs = append(errs, err.Error())
continue
}
counters[key] = value
case UrnToString(UrnUserDistInt64):
value, err := extractDistributionValue(r)
if err != nil {
errs = append(errs, err)
errs = append(errs, err.Error())
continue
}
distributions[key] = value
@@ -89,7 +89,7 @@ func groupByType(p *pipepb.Pipeline, minfos []*pipepb.MonitoringInfo) (
UrnToString(UrnUserBottomNInt64):
value, err := extractGaugeValue(r)
if err != nil {
errs = append(errs, err)
errs = append(errs, err.Error())
continue
}
gauges[key] = value
@@ -100,7 +100,7 @@ func groupByType(p *pipepb.Pipeline, minfos []*pipepb.MonitoringInfo) (
UrnToString(UrnTransformTotalTime):
value, err := extractMsecValue(r)
if err != nil {
errs = append(errs, err)
errs = append(errs, err.Error())
continue
}
v := msecs[key]
@@ -118,7 +118,7 @@ func groupByType(p *pipepb.Pipeline, minfos []*pipepb.MonitoringInfo) (
case UrnToString(UrnElementCount):
value, err := extractCounterValue(r)
if err != nil {
errs = append(errs, err)
errs = append(errs, err.Error())
continue
}
v := pcols[key]
@@ -127,7 +127,7 @@ func groupByType(p *pipepb.Pipeline, minfos []*pipepb.MonitoringInfo) (
case UrnToString(UrnSampledByteSize):
value, err := extractDistributionValue(r)
if err != nil {
errs = append(errs, err)
errs = append(errs, err.Error())
continue
}
v := pcols[key]
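Note on the hunks above: switching `errs` from `[]error` to `[]string` and appending `err.Error()` lets the loop keep going past individual bad monitoring infos. The tail of `groupByType` isn't shown in this diff, so the following is only a sketch of the presumed pattern, with hypothetical names and the assumption that the collected messages are eventually joined into a single error:

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// validate is a stand-in for the per-item extraction that can fail.
func validate(in string) error {
	if in == "" {
		return errors.New("empty input")
	}
	return nil
}

// collect keeps processing after failures, gathering messages as strings
// and surfacing them as one error at the end.
func collect(inputs []string) error {
	var errs []string
	for _, in := range inputs {
		if err := validate(in); err != nil {
			errs = append(errs, err.Error())
			continue // skip this item, but report it later
		}
	}
	if len(errs) > 0 {
		return fmt.Errorf("%d failures:\n%s", len(errs), strings.Join(errs, "\n"))
	}
	return nil
}

func main() {
	fmt.Println(collect([]string{"ok", "", ""})) // reports 2 failures
}
```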
1 change: 1 addition & 0 deletions sdks/go/pkg/beam/runners/prism/internal/engine/data.go
@@ -71,6 +71,7 @@ func (d *TentativeData) WriteTimers(transformID, familyID string, timers []byte)
}
link := TimerKey{Transform: transformID, Family: familyID}
d.timers[link] = append(d.timers[link], timers)
// slog.Debug("Data() WriteTimers", slog.Any("transformID", transformID), slog.Any("familyID", familyID), slog.Any("Data", timers))
}

func (d *TentativeData) toWindow(wKey []byte) typex.Window {
39 changes: 30 additions & 9 deletions sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
@@ -709,6 +709,7 @@ func (em *ElementManager) StateForBundle(rb RunBundle) TentativeData {
Bag: append([][]byte(nil), data.Bag...),
Multimap: mm,
}
slog.Debug("StateForBundle", slog.String("bundleID", rb.BundleID), slog.Any("state", wlinkMap[key]), slog.Any("key", key), slog.Any("window", w), slog.Any("ret", ret))
}
}
}
@@ -848,7 +849,7 @@ func (em *ElementManager) PersistBundle(rb RunBundle, col2Coders map[string]PCol
// Triage timers into their time domains for scheduling.
// EventTime timers are handled with normal elements,
// ProcessingTime timers need to be scheduled into the processing time based queue.
newHolds, ptRefreshes := em.triageTimers(d, inputInfo, stage)
newHolds, ptRefreshes := em.triageTimers(rb, d, inputInfo, stage)

// Return unprocessed to this stage's pending
// TODO sort out pending element watermark holds for process continuation residuals.
@@ -865,6 +866,7 @@
// watermark advancement.
stage.mu.Lock()
completed := stage.inprogress[rb.BundleID]
slog.Warn("PersistBundle-cleanup", slog.String("bundle", rb.BundleID), slog.String("stage", rb.StageID))
em.addPending(-len(completed.es))
delete(stage.inprogress, rb.BundleID)
for k := range stage.inprogressKeysByBundle[rb.BundleID] {
@@ -926,20 +928,21 @@
}
}
}
slog.Warn("PersistBundle-cleanup finished", slog.String("bundle", rb.BundleID), slog.String("stage", rb.StageID))
stage.mu.Unlock()

em.markChangedAndClearBundle(stage.ID, rb.BundleID, ptRefreshes)
}

// triageTimers prepares received timers for eventual firing, as well as rebasing processing time timers as needed.
func (em *ElementManager) triageTimers(d TentativeData, inputInfo PColInfo, stage *stageState) (map[mtime.Time]int, set[mtime.Time]) {
func (em *ElementManager) triageTimers(rb RunBundle, d TentativeData, inputInfo PColInfo, stage *stageState) (map[mtime.Time]int, set[mtime.Time]) {
// Process each timer family in the order we received them, so we can filter to the last one.
// Since we process each timer family individually, use a unique key for each userkey, tag, and window.
// The last timer set for each combination is the next one we're keeping.
type timerKey struct {
key string
tag string
win typex.Window
Key string
Tag string
Win typex.Window
}
em.refreshCond.L.Lock()
emNow := em.ProcessingTimeNow()
@@ -955,19 +958,20 @@ func (em *ElementManager) triageTimers(d TentativeData, inputInfo PColInfo, stag
iter := decodeTimerIter(inputInfo.KeyDec, inputInfo.WindowCoder, t)
iter(func(ret timerRet) bool {
for _, e := range ret.elms {
keyToTimers[timerKey{key: string(ret.keyBytes), tag: ret.tag, win: e.window}] = e
keyToTimers[timerKey{Key: string(ret.keyBytes), Tag: ret.tag, Win: e.window}] = e
}
if len(ret.elms) == 0 {
for _, w := range ret.windows {
delete(keyToTimers, timerKey{key: string(ret.keyBytes), tag: ret.tag, win: w})
delete(keyToTimers, timerKey{Key: string(ret.keyBytes), Tag: ret.tag, Win: w})
}
}
// Indicate we'd like to continue iterating.
return true
})
}

for _, elm := range keyToTimers {
for tk, elm := range keyToTimers {
slog.Debug("PersistBundle() triageTimers", slog.String("bundle", rb.BundleID), slog.Any("staticTimerKey", tentativeKey), slog.Any("timerKey", tk), slog.Any("timerElm", elm))
elm.transform = tentativeKey.Transform
elm.family = tentativeKey.Family

@@ -1006,7 +1010,8 @@
}

// FailBundle clears the extant data allowing the execution to shut down.
func (em *ElementManager) FailBundle(rb RunBundle) {
func (em *ElementManager) FailBundle(rb RunBundle, err error) {
slog.Debug("FailBundle", slog.String("bundle", rb.BundleID), slog.Int64("livePending", em.livePending.Load()), slog.Any("error", err))
stage := em.stages[rb.StageID]
stage.mu.Lock()
completed := stage.inprogress[rb.BundleID]
@@ -1200,6 +1205,18 @@ func makeStageState(ID string, inputIDs, outputIDs []string, sides []LinkID) *st
func (ss *stageState) AddPending(newPending []element) int {
ss.mu.Lock()
defer ss.mu.Unlock()
// TODO(https://github.com/apache/beam/issues/31438):
// Adjust with AllowedLateness
// Data that arrives after the *output* watermark is late.
threshold := ss.output
origPending := make([]element, 0, ss.pending.Len())
for _, e := range newPending {
if e.timestamp < threshold {
continue
}
origPending = append(origPending, e)
}
newPending = origPending
if ss.stateful {
if ss.pendingByKeys == nil {
ss.pendingByKeys = map[string]*dataAndTimers{}
@@ -1389,6 +1406,7 @@ keysPerBundle:
e := heap.Pop(&dnt.elements).(element)
if e.IsTimer() {
lastSet, ok := dnt.timers[timerKey{family: e.family, tag: e.tag, window: e.window}]
slog.Debug("startEventTimeBundle: timer read", slog.String("family", e.family), slog.Any("EventTime", lastSet.firing))
if !ok {
timerCleared = true
continue // Timer has "fired" already, so this can be ignored.
@@ -1402,6 +1420,9 @@
delete(dnt.timers, timerKey{family: e.family, tag: e.tag, window: e.window})
}
toProcess = append(toProcess, e)
if e.family == "ts-gc" {
slog.Debug("startEventTimeBundle: ts-gc timer to be processed")
}
if OneElementPerKey {
break
}
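The triage comments above describe a last-set-wins rule: timers are keyed by (user key, tag, window), a later set overwrites an earlier one, and a set with no elements clears the pending timer for its windows. A minimal standalone sketch of that behavior, using illustrative types rather than Prism's actual `timerKey`/`element`:

```go
package main

import "fmt"

// timerKey stands in for the (user key, tag, window) triple; a real window
// type is reduced to a string here for illustration.
type timerKey struct{ Key, Tag, Win string }

// setEvent is either a timer set (with a firing time) or a clear.
type setEvent struct {
	at    timerKey
	fire  int64
	clear bool
}

// triage replays set/clear events in bundle order, keeping only the last
// set per (key, tag, window).
func triage(events []setEvent) map[timerKey]int64 {
	last := map[timerKey]int64{}
	for _, ev := range events {
		if ev.clear {
			delete(last, ev.at) // an explicit clear drops any pending set
			continue
		}
		last[ev.at] = ev.fire // a later set overwrites an earlier one
	}
	return last
}

func main() {
	k := timerKey{Key: "user1", Tag: "", Win: "w0"}
	got := triage([]setEvent{
		{at: k, fire: 100},
		{at: k, fire: 250}, // overwrites the set at 100
	})
	fmt.Println(got[k]) // 250: only the final set survives
}
```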
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/runners/prism/internal/execute.go
@@ -360,7 +360,7 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic
wk := wks[s.envID]
if err := s.Execute(ctx, j, wk, comps, em, rb); err != nil {
// Ensure we clean up on bundle failure
em.FailBundle(rb)
em.FailBundle(rb, err)
return err
}
return nil
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/runners/prism/internal/handlerunner.go
@@ -129,7 +129,7 @@ func (h *runner) handleFlatten(tid string, t *pipepb.PTransform, comps *pipepb.C
return prepareResult{}
}

func (h *runner) handleReshuffle(tid string, t *pipepb.PTransform, comps *pipepb.Components) prepareResult {
func (h *runner) handleReshuffle(_ string, t *pipepb.PTransform, comps *pipepb.Components) prepareResult {
// TODO: Implement the windowing strategy the "backup" transforms used for Reshuffle.

if h.config.SDKReshuffle {
2 changes: 2 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
@@ -46,6 +46,8 @@ var supportedRequirements = map[string]struct{}{
urns.RequirementStatefulProcessing: {},
urns.RequirementBundleFinalization: {},
urns.RequirementOnWindowExpiration: {},

urns.RequirementTimeSortedInput: {},
}

// TODO, move back to main package, and key off of executor handlers?
@@ -180,7 +180,8 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (_ *
// Validate all the timer features
for _, spec := range pardo.GetTimerFamilySpecs() {
isStateful = true
check("TimerFamilySpecs.TimeDomain.Urn", spec.GetTimeDomain(), pipepb.TimeDomain_EVENT_TIME, pipepb.TimeDomain_PROCESSING_TIME)
check("TimerFamilySpecs.TimeDomain.Urn", spec.GetTimeDomain(),
pipepb.TimeDomain_EVENT_TIME, pipepb.TimeDomain_PROCESSING_TIME)
}

// Check for a stateful SDF and direct user to https://github.com/apache/beam/issues/32139
5 changes: 5 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/preprocess.go
@@ -449,6 +449,11 @@ func finalizeStage(stg *stage, comps *pipepb.Components, pipelineFacts *fusionFa
if len(pardo.GetTimerFamilySpecs())+len(pardo.GetStateSpecs())+len(pardo.GetOnWindowExpirationTimerFamilySpec()) > 0 {
stg.stateful = true
}
// Just make things that require time sorted input be stateful.
// Probably need to just make them a fusion break, but let's start here.
if pardo.GetRequiresTimeSortedInput() {
stg.stateful = true
}
if pardo.GetOnWindowExpirationTimerFamilySpec() != "" {
stg.onWindowExpiration = engine.StaticTimerID{TransformID: link.Transform, TimerFamily: pardo.GetOnWindowExpirationTimerFamilySpec()}
}
19 changes: 17 additions & 2 deletions sdks/go/pkg/beam/runners/prism/internal/web/web.go
@@ -100,7 +100,7 @@ type jobDetailsData struct {
JobID, JobName string
State jobpb.JobState_Enum
Transforms []pTransform
PCols map[metrics.StepKey]metrics.PColResult
PCols []metrics.PColResult
DisplayData []*pipepb.LabelledPayload

errorHolder
@@ -189,12 +189,27 @@ func (h *jobDetailsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
results := metricsx.FromMonitoringInfos(pipeResp.GetPipeline(), mets.GetAttempted(), mets.GetCommitted())

pcols := map[metrics.StepKey]metrics.PColResult{}
var pcolList []metrics.PColResult
allMetsPCol := results.AllMetrics().PCols()
for _, res := range allMetsPCol {
pcols[res.Key] = res
pcolList = append(pcolList, res)
}
sort.Slice(pcolList, func(i, j int) bool {
if pcolList[i].Key.Step < pcolList[j].Key.Step {
return true
} else if pcolList[i].Key.Step > pcolList[j].Key.Step {
return false
}
if pcolList[i].Key.Namespace < pcolList[j].Key.Namespace {
return true
} else if pcolList[i].Key.Namespace > pcolList[j].Key.Namespace {
return false
}
return pcolList[i].Key.Name < pcolList[j].Key.Name
})

data.PCols = pcols
data.PCols = pcolList
trs := pipeResp.GetPipeline().GetComponents().GetTransforms()
col2T, topo := preprocessTransforms(trs)

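The three-way `sort.Slice` above exists because Go randomizes map iteration order; without it the web UI would reshuffle PCollection metrics on every page load. The same Step, then Namespace, then Name ordering can also be written more compactly with `slices.SortFunc` and `cmp.Or`; a sketch assuming Go 1.22+, with an illustrative `key` struct standing in for `metrics.StepKey`:

```go
package main

import (
	"cmp"
	"fmt"
	"slices"
)

// key mirrors the fields compared in the handler above.
type key struct{ Step, Namespace, Name string }

func main() {
	ks := []key{
		{"s2", "ns", "a"},
		{"s1", "beam", "b"},
		{"s1", "app", "c"},
	}
	// cmp.Or returns the first non-zero comparison result, giving the
	// same lexicographic (Step, Namespace, Name) ordering as sort.Slice.
	slices.SortFunc(ks, func(a, b key) int {
		return cmp.Or(
			cmp.Compare(a.Step, b.Step),
			cmp.Compare(a.Namespace, b.Namespace),
			cmp.Compare(a.Name, b.Name),
		)
	})
	fmt.Println(ks) // [{s1 app c} {s1 beam b} {s2 ns a}]
}
```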
10 changes: 8 additions & 2 deletions sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
@@ -575,14 +575,19 @@ func (wk *W) State(state fnpb.BeamFnState_StateServer) error {
}

case *fnpb.StateRequest_Append:
slog.Debug("StateRequest_Append", "request", prototext.Format(req), "bundle", b)
key := req.GetStateKey()
switch key.GetType().(type) {
case *fnpb.StateKey_BagUserState_:
bagkey := key.GetBagUserState()
b.OutputData.AppendBagState(engine.LinkID{Transform: bagkey.GetTransformId(), Local: bagkey.GetUserStateId()}, bagkey.GetWindow(), bagkey.GetKey(), req.GetAppend().GetData())
b.OutputData.AppendBagState(
engine.LinkID{Transform: bagkey.GetTransformId(), Local: bagkey.GetUserStateId()},
bagkey.GetWindow(), bagkey.GetKey(), req.GetAppend().GetData())
case *fnpb.StateKey_MultimapUserState_:
mmkey := key.GetMultimapUserState()
b.OutputData.AppendMultimapState(engine.LinkID{Transform: mmkey.GetTransformId(), Local: mmkey.GetUserStateId()}, mmkey.GetWindow(), mmkey.GetKey(), mmkey.GetMapKey(), req.GetAppend().GetData())
b.OutputData.AppendMultimapState(
engine.LinkID{Transform: mmkey.GetTransformId(), Local: mmkey.GetUserStateId()},
mmkey.GetWindow(), mmkey.GetKey(), mmkey.GetMapKey(), req.GetAppend().GetData())
case *fnpb.StateKey_OrderedListUserState_:
olkey := key.GetOrderedListUserState()
b.OutputData.AppendOrderedListState(
@@ -600,6 +605,7 @@
}

case *fnpb.StateRequest_Clear:
slog.Debug("StateRequest_Clear", "request", prototext.Format(req), "bundle", b)
key := req.GetStateKey()
switch key.GetType().(type) {
case *fnpb.StateKey_BagUserState_:
4 changes: 2 additions & 2 deletions sdks/go/pkg/beam/runners/prism/internal/worker/worker_test.go
@@ -345,7 +345,7 @@ func TestWorker_State_MultimapKeysSideInput(t *testing.T) {
instID := wk.NextInst()
wk.activeInstructions[instID] = &B{
MultiMapSideInputData: map[SideInputKey]map[typex.Window]map[string][][]byte{
SideInputKey{
{
TransformID: "transformID",
Local: "i1",
}: {
@@ -419,7 +419,7 @@ func TestWorker_State_MultimapSideInput(t *testing.T) {
instID := wk.NextInst()
wk.activeInstructions[instID] = &B{
MultiMapSideInputData: map[SideInputKey]map[typex.Window]map[string][][]byte{
SideInputKey{
{
TransformID: "transformID",
Local: "i1",
}: {