[Scheduled Actions V2] WIP Generator component #6905

Draft · wants to merge 3 commits into base: sched2_core
Changes from 1 commit
289 changes: 289 additions & 0 deletions components/scheduler2/generator/executors.go
@@ -0,0 +1,289 @@
package generator

import (
"context"
"fmt"
"time"

"go.temporal.io/api/serviceerror"
schedpb "go.temporal.io/server/api/schedule/v1"
servercommon "go.temporal.io/server/common"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/primitives/timestamp"
"go.temporal.io/server/components/scheduler2/common"
"go.temporal.io/server/components/scheduler2/core"
"go.temporal.io/server/components/scheduler2/executor"
"go.temporal.io/server/service/history/consts"
"go.temporal.io/server/service/history/hsm"
"go.temporal.io/server/service/worker/scheduler"
scheduler1 "go.temporal.io/server/service/worker/scheduler"
Member: Duplicate import.

Contributor Author: Will fix.

"go.uber.org/fx"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/types/known/timestamppb"
)

type (
TaskExecutorOptions struct {
fx.In

Config *common.Config
MetricsHandler metrics.Handler
Logger log.Logger
SpecBuilder *scheduler1.SpecBuilder
}

taskExecutor struct {
TaskExecutorOptions
}
)

func RegisterExecutor(registry *hsm.Registry, options TaskExecutorOptions) error {
e := taskExecutor{options}
if err := hsm.RegisterTimerExecutor(registry, e.executeSleepTask); err != nil {
return err
}
if err := hsm.RegisterImmediateExecutor(registry, e.executeBufferTask); err != nil {
return err
}
return nil
}

func (e taskExecutor) executeSleepTask(env hsm.Environment, node *hsm.Node, task SleepTask) error {
return hsm.MachineTransition(node, func(g Generator) (hsm.TransitionOutput, error) {
return TransitionBuffer.Apply(g, EventBuffer{
Node: node,
})
})
}

func (e taskExecutor) executeBufferTask(ctx context.Context, env hsm.Environment, ref hsm.Ref, task BufferTask) error {
scheduler, err := common.LoadSchedulerFromParent(ctx, env, ref)
if err != nil {
return err
}
tweakables := e.Config.Tweakables(scheduler.Namespace)

generator, err := e.loadGenerator(ctx, env, ref)
if err != nil {
return err
}

// if we have no last processed time, this is a new schedule
if generator.LastProcessedTime == nil {
generator.LastProcessedTime = timestamppb.Now()
// TODO - update schedule info with create time

e.logSchedule("Starting schedule", scheduler)
}

// process time range between last high water mark and system time
t1 := timestamp.TimeValue(generator.LastProcessedTime)
Member: Why not this? (out of curiosity)

Suggested change:
-	t1 := timestamp.TimeValue(generator.LastProcessedTime)
+	t1 := generator.LastProcessedTime.AsTime()

Contributor Author: I'm not sure; an artifact from the original implementation, I suppose. Will fix :)
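A quick aside on why the two calls differ at all (the helper's nil handling is an assumption here, worth checking against common/primitives/timestamp):

// Assumed difference: timestamp.TimeValue(nil) yields the zero time.Time,
// while a nil *timestamppb.Timestamp's AsTime() yields the Unix epoch.
// For non-nil values the two are equivalent, and the nil case is already
// excluded by the LastProcessedTime check above, so the suggestion should
// be safe.
var ts *timestamppb.Timestamp        // nil
fmt.Println(timestamp.TimeValue(ts)) // 0001-01-01 00:00:00 +0000 UTC (assumed)
fmt.Println(ts.AsTime().UTC())       // 1970-01-01 00:00:00 +0000 UTC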

t2 := time.Now()
if t2.Before(t1) {
e.Logger.Warn("Time went backwards",
Member: Wondering how this log would be printed. Could be worth baking a logger with some relevant tags.

tag.NewStringTag("time", t1.String()),
tag.NewStringTag("time", t2.String()))
t2 = t1
}

res, err := e.processTimeRange(scheduler, tweakables, t1, t2, nil)
if err != nil {
// An error here should be impossible and go to the DLQ
e.Logger.Error("Error processing time range", tag.Error(err))

return fmt.Errorf(
"%w: %w",
err,
serviceerror.NewInternal("Scheduler's Generator failed to process a time range"),
)
}
generator.LastProcessedTime = timestamppb.New(res.LastActionTime)
generator.NextInvocationTime = timestamppb.New(res.NextWakeupTime)

return env.Access(ctx, ref, hsm.AccessWrite, func(generatorNode *hsm.Node) error {
// Check for scheduler version conflict. We don't need to bump it, as the
// Generator's buffered actions are idempotent when pushed to the Executor, and the
// Generator's own state is also versioned. This helps Generator quickly react to
// schedule updates/pauses.
refreshedScheduler, err := hsm.MachineData[core.Scheduler](generatorNode.Parent)
if err != nil {
return err
}
if refreshedScheduler.ConflictToken != scheduler.ConflictToken {
// schedule was updated while processing, retry
Member: Do we actually want to retry the task here? I commented on the transition definition too, but I think you can do all of this under a write lock, since there's no IO involved AFAICT.

Contributor Author: Maybe not. If we were doing this under the write lock on the generator node, is the expectation that we'd also attempt to acquire the generator node for write access during an update request? I've been assuming that env.Access scopes apply only to the given ref; do they/can they apply more broadly?

Member: The lock is held for the entire mutable state record, including the whole tree structure.

Contributor Author: Ah! Okay, I'd been thinking that each component could potentially map to its own mutable state. One MS simplifies that. Will update to just fail out the task (I believe ErrStaleReference is for this purpose?).
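As an illustration of the single-write-access shape discussed above (a sketch under the assumption that the whole step involves no IO; not part of this diff), the buffering step could be restructured roughly like this, with a detected conflict failing the task instead of retrying:

// Sketch (assumption): run the whole buffering step under one write access,
// since the mutable-state lock covers the entire HSM tree. Scheduler and
// Generator state are read and transitioned while the lock is held, so the
// separate read pass and the conflict-token retry go away.
return env.Access(ctx, ref, hsm.AccessWrite, func(generatorNode *hsm.Node) error {
	sched, err := hsm.MachineData[core.Scheduler](generatorNode.Parent)
	if err != nil {
		return err
	}
	gen, err := hsm.MachineData[Generator](generatorNode)
	if err != nil {
		return err
	}
	// ... processTimeRange and the Executor/Generator transitions would go
	// here; a detected conflict would return a terminal stale-state error ...
	_, _ = sched, gen
	return nil
})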

return fmt.Errorf(
"%w: Scheduler state was updated while buffering actions",
consts.ErrStaleState,
)
}

// transition the executor substate machine to execute the new buffered actions
executorNode, err := generatorNode.Parent.Child([]hsm.Key{executor.MachineKey})
if err != nil {
return fmt.Errorf(
"%w: %w",
err,
serviceerror.NewInternal("Scheduler is missing its Executor node"),
)
}
err = hsm.MachineTransition(executorNode, func(e executor.Executor) (hsm.TransitionOutput, error) {
return executor.TransitionExecute.Apply(e, executor.EventExecute{
Node: executorNode,
BufferedActions: []*schedpb.BufferedStart{},
})
})
if err != nil {
return fmt.Errorf(
"%w: unable to transition Executor to Executing state",
err,
)
}

// transition the generator back to waiting with new wait times. if we fail
// conflict here (after transitioning Executor), that's fine; buffered actions
// are idempotent through their generated request IDs
return hsm.MachineTransition(generatorNode, func(g Generator) (hsm.TransitionOutput, error) {
if g.ConflictToken != generator.ConflictToken {
return hsm.TransitionOutput{}, fmt.Errorf(
"%w: conflicting Generator state while buffering actions",
consts.ErrStaleState,
)
}

g.GeneratorInternalState = generator.GeneratorInternalState
g.ConflictToken++
return TransitionSleep.Apply(g, EventSleep{
Node: generatorNode,
Deadline: generator.NextInvocationTime.AsTime(),
})
})
})
}

func (e taskExecutor) logSchedule(msg string, scheduler core.Scheduler) {
// log spec as json since it's more readable than the Go representation
specJson, _ := protojson.Marshal(scheduler.Schedule.Spec)
policiesJson, _ := protojson.Marshal(scheduler.Schedule.Policies)
e.Logger.Info(msg,
tag.NewStringTag("spec", string(specJson)),
tag.NewStringTag("policies", string(policiesJson)))
}

type processedTimeRangeResult struct {
NextWakeupTime time.Time
LastActionTime time.Time
BufferedStarts []*schedpb.BufferedStart
}

// Processes the given time range, generating buffered actions according to the
// schedule spec.
func (e taskExecutor) processTimeRange(
scheduler core.Scheduler,
tweakables common.Tweakables,
start, end time.Time,
limit *int,
) (*processedTimeRangeResult, error) {
overlapPolicy := scheduler.OverlapPolicy()

e.Logger.Debug("processTimeRange",
tag.NewTimeTag("start", start),
tag.NewTimeTag("end", end),
tag.NewAnyTag("overlap-policy", overlapPolicy),
tag.NewBoolTag("manual", false))

catchupWindow := e.catchupWindow(scheduler, tweakables)

// Peek at paused/remaining actions state and don't bother if we're not going to
// take an action now. (Don't count as missed catchup window either.)
// Skip over entire time range if paused or no actions can be taken.
if !scheduler.CanTakeScheduledAction(false) {
// use end as last action time so that we don't reprocess time spent paused
next, err := e.getNextTime(scheduler, end)
if err != nil {
return nil, err
}

return &processedTimeRangeResult{
NextWakeupTime: next.Next,
LastActionTime: end,
BufferedStarts: nil,
}, nil
}

lastAction := start
var next scheduler1.GetNextTimeResult
var bufferedStarts []*schedpb.BufferedStart
for next, err := e.getNextTime(scheduler, start); err == nil && !(next.Next.IsZero() || next.Next.After(end)); next, err = e.getNextTime(scheduler, next.Next) {
if scheduler.Info.UpdateTime.AsTime().After(next.Next) {
// We're reprocessing since the most recent event after an update. Discard actions before
// the update time (which was just set to "now"). This doesn't have to be guarded with
// hasMinVersion because this condition couldn't happen in previous versions.
Member: I imagine this comment should be rewritten somewhat. We don't have versions anymore, at least. Also, I'm not sure how updates fit into this execution model; it may not be true that "we just set update time to 'now'".

Contributor Author: Oops, yes, will rewrite this.

continue
}

if end.Sub(next.Next) > catchupWindow {
e.Logger.Warn("Schedule missed catchup window", tag.NewTimeTag("now", end), tag.NewTimeTag("time", next.Next))
e.MetricsHandler.Counter(metrics.ScheduleMissedCatchupWindow.Name()).Record(1)

// TODO - update Info.MissedCatchupWindow
// s.Info.MissedCatchupWindow++
// or write that to the generator's persisted state?
continue
}

bufferedStarts = append(bufferedStarts, &schedpb.BufferedStart{
NominalTime: timestamppb.New(next.Nominal),
ActualTime: timestamppb.New(next.Next),
OverlapPolicy: scheduler.OverlapPolicy(),
Member: Suggested change (?):
-	OverlapPolicy: scheduler.OverlapPolicy(),
+	OverlapPolicy: overlapPolicy,

Manual: false,
RequestId: common.GenerateRequestID(scheduler, "", next.Nominal, next.Next),
})
lastAction = next.Next

if limit != nil {
if (*limit)--; *limit <= 0 {
break
}
}
}

return &processedTimeRangeResult{
NextWakeupTime: next.Next,
LastActionTime: lastAction,
BufferedStarts: bufferedStarts,
}, nil
}

func (e taskExecutor) catchupWindow(s core.Scheduler, tweakables common.Tweakables) time.Duration {
cw := s.Schedule.Policies.CatchupWindow
if cw == nil {
return tweakables.DefaultCatchupWindow
}

return max(cw.AsDuration(), tweakables.MinCatchupWindow)
}

// Returns the next time result, or an error if the schedule cannot be compiled.
func (e taskExecutor) getNextTime(s core.Scheduler, after time.Time) (scheduler.GetNextTimeResult, error) {
spec, err := s.CompiledSpec(e.SpecBuilder)
Member (@dnr, Dec 4, 2024): Just curious if these can get cached at all. It's not that expensive to construct, so probably not worth it, except for backfills, where it's doing this in a loop during one transition.

Oh, I see, the caching is elsewhere. nm.

if err != nil {
e.Logger.Error("Invalid schedule", tag.Error(err))
return scheduler1.GetNextTimeResult{}, err
}

return spec.GetNextTime(s.JitterSeed(), after), nil
}

// Loads the generator's persisted state, returning a cloned copy.
func (e taskExecutor) loadGenerator(ctx context.Context, env hsm.Environment, ref hsm.Ref) (generator Generator, err error) {
err = env.Access(ctx, ref, hsm.AccessRead, func(node *hsm.Node) error {
prevGenerator, err := hsm.MachineData[Generator](node)
generator = Generator{
GeneratorInternalState: servercommon.CloneProto(prevGenerator.GeneratorInternalState),
}
return err
})
return
}
90 changes: 90 additions & 0 deletions components/scheduler2/generator/statemachine.go
@@ -0,0 +1,90 @@
package generator

import (
"fmt"

enumsspb "go.temporal.io/server/api/enums/v1"
schedspb "go.temporal.io/server/api/schedule/v1"
"go.temporal.io/server/service/history/hsm"
"google.golang.org/protobuf/proto"
)

type (
// The Generator substate machine is responsible for buffering actions according
// to the schedule's specification. Manually requested actions (from an immediate
// request or backfill) are separately handled in the Backfiller substate machine.
Generator struct {
*schedspb.GeneratorInternalState
}

// The machine definition provides serialization/deserialization and type information.
machineDefinition struct{}
)

const (
// Unique identifier for the Generator substate machine.
MachineType = "scheduler.Generator"
)

var (
_ hsm.StateMachine[enumsspb.SchedulerGeneratorState] = Generator{}
_ hsm.StateMachineDefinition = &machineDefinition{}

// Each substate machine is a singleton of the top-level Scheduler, accessed with
// a fixed key
MachineKey = hsm.Key{Type: MachineType, ID: ""}
)

func (g Generator) State() enumsspb.SchedulerGeneratorState {
return g.GeneratorInternalState.State
}

func (g Generator) SetState(state enumsspb.SchedulerGeneratorState) {
g.GeneratorInternalState.State = state
}

func (g Generator) RegenerateTasks(node *hsm.Node) ([]hsm.Task, error) {
return g.tasks()
}

func (machineDefinition) Type() string {
return MachineType
}

func (machineDefinition) Serialize(state any) ([]byte, error) {
if state, ok := state.(Generator); ok {
return proto.Marshal(state.GeneratorInternalState)
}
return nil, fmt.Errorf("invalid generator state provided: %v", state)
}

func (machineDefinition) Deserialize(body []byte) (any, error) {
state := &schedspb.GeneratorInternalState{}
return Generator{
GeneratorInternalState: state,
}, proto.Unmarshal(body, state)
}

// Returns:
//
// 0 when states are equal
// 1 when a is newer than b
// -1 when b is newer than a
func (machineDefinition) CompareState(a any, b any) (int, error) {
s1, ok := a.(Generator)
if !ok {
return 0, fmt.Errorf("%w: expected state1 to be a Generator instance, got %v", hsm.ErrIncompatibleType, s1)
}
s2, ok := a.(Generator)
Member: Suggested change:
-	s2, ok := a.(Generator)
+	s2, ok := b.(Generator)

if !ok {
return 0, fmt.Errorf("%w: expected state1 to be a Generator instance, got %v", hsm.ErrIncompatibleType, s2)
}

if s1.State() > s2.State() {
return 1, nil
} else if s1.State() < s2.State() {
return -1, nil
}

return 0, nil
Member: Suggested change:
-	if s1.State() > s2.State() {
-		return 1, nil
-	} else if s1.State() < s2.State() {
-		return -1, nil
-	}
-	return 0, nil
+	return cmp.Compare(s1.State(), s2.State()), nil
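Folding in both suggestions above, the method could end up roughly like this (a sketch; cmp is the standard library package added in Go 1.21 and would need importing):

import "cmp"

// cmp.Compare returns -1, 0, or +1, matching the contract documented on
// CompareState; the enum's underlying integer type satisfies cmp.Ordered.
func (machineDefinition) CompareState(a any, b any) (int, error) {
	s1, ok := a.(Generator)
	if !ok {
		return 0, fmt.Errorf("%w: expected state1 to be a Generator instance, got %v", hsm.ErrIncompatibleType, a)
	}
	s2, ok := b.(Generator)
	if !ok {
		return 0, fmt.Errorf("%w: expected state2 to be a Generator instance, got %v", hsm.ErrIncompatibleType, b)
	}
	return cmp.Compare(s1.State(), s2.State()), nil
}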

}