Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add attempt count to HSM tasks #6982

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
328 changes: 169 additions & 159 deletions api/persistence/v1/hsm.pb.go

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions components/callbacks/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ func (t InvocationTask) Deadline() time.Time {
return hsm.Immediate
}

func (InvocationTask) Attempt() int32 {
return 0
}

func (InvocationTask) Validate(ref *persistencespb.StateMachineRef, node *hsm.Node) error {
return hsm.ValidateState[enumsspb.CallbackState, Callback](node, enumsspb.CALLBACK_STATE_SCHEDULED)
}
Expand Down Expand Up @@ -87,6 +91,10 @@ func (t BackoffTask) Deadline() time.Time {
return t.deadline
}

func (BackoffTask) Attempt() int32 {
return 0
}

func (BackoffTask) Destination() string {
return ""
}
Expand Down
8 changes: 8 additions & 0 deletions components/dummy/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ func (t ImmediateTask) Destination() string {
return t.destination
}

func (ImmediateTask) Attempt() int32 {
return 0
}

func (ImmediateTask) Validate(ref *persistencespb.StateMachineRef, node *hsm.Node) error {
return hsm.ValidateNotTransitioned(ref, node)
}
Expand Down Expand Up @@ -86,6 +90,10 @@ func (TimerTask) Destination() string {
return ""
}

func (TimerTask) Attempt() int32 {
return 0
}

func (t TimerTask) Validate(ref *persistencespb.StateMachineRef, node *hsm.Node) error {
if !t.concurrent {
return hsm.ValidateNotTransitioned(ref, node)
Expand Down
2 changes: 1 addition & 1 deletion components/nexusoperations/executors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ func TestProcessInvocationTask(t *testing.T) {
},
{
name: "invocation timeout by request timeout",
requestTimeout: 2 * time.Millisecond,
requestTimeout: 10 * time.Millisecond,
schedToCloseTimeout: time.Hour,
destinationDown: true,
expectedMetricOutcome: "request-timeout",
Expand Down
4 changes: 2 additions & 2 deletions components/nexusoperations/statemachine.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func (o Operation) transitionTasks() ([]hsm.Task, error) {
case enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF:
return []hsm.Task{BackoffTask{deadline: o.NextAttemptScheduleTime.AsTime()}}, nil
case enumsspb.NEXUS_OPERATION_STATE_SCHEDULED:
return []hsm.Task{InvocationTask{EndpointName: o.Endpoint}}, nil
return []hsm.Task{InvocationTask{EndpointName: o.Endpoint, attempt: o.Attempt}}, nil
default:
return nil, nil
}
Expand Down Expand Up @@ -581,7 +581,7 @@ func (c Cancelation) RegenerateTasks(node *hsm.Node) ([]hsm.Task, error) {
}
switch c.State() { // nolint:exhaustive
case enumspb.NEXUS_OPERATION_CANCELLATION_STATE_SCHEDULED:
return []hsm.Task{CancelationTask{EndpointName: op.Endpoint}}, nil
return []hsm.Task{CancelationTask{EndpointName: op.Endpoint, attempt: c.Attempt}}, nil
case enumspb.NEXUS_OPERATION_CANCELLATION_STATE_BACKING_OFF:
return []hsm.Task{CancelationBackoffTask{deadline: c.NextAttemptScheduleTime.AsTime()}}, nil
default:
Expand Down
2 changes: 2 additions & 0 deletions components/nexusoperations/statemachine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ func TestRegenerateTasks(t *testing.T) {
require.Equal(t, 2, len(tasks))
require.Equal(t, nexusoperations.TaskTypeInvocation, tasks[0].Type())
require.Equal(t, tasks[0].(nexusoperations.InvocationTask).EndpointName, "endpoint")
require.Equal(t, tasks[0].(nexusoperations.InvocationTask).Attempt(), int32(0))
require.Equal(t, nexusoperations.TaskTypeTimeout, tasks[1].Type())
},
},
Expand Down Expand Up @@ -221,6 +222,7 @@ func TestRetry(t *testing.T) {
require.Equal(t, 1, len(transitionOp.Output.Tasks))
invocationTask := transitionOp.Output.Tasks[0].(nexusoperations.InvocationTask) // nolint:revive
require.Equal(t, "endpoint", invocationTask.EndpointName)
require.Equal(t, int32(1), invocationTask.Attempt())
op, err = hsm.MachineData[nexusoperations.Operation](node)
require.NoError(t, err)
require.Equal(t, enumsspb.NEXUS_OPERATION_STATE_SCHEDULED, op.State())
Expand Down
26 changes: 24 additions & 2 deletions components/nexusoperations/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ func (TimeoutTask) Destination() string {
return ""
}

func (TimeoutTask) Attempt() int32 {
return 0
}

// Validate checks if the timeout task is still valid to execute for the given node state.
func (t TimeoutTask) Validate(ref *persistencespb.StateMachineRef, node *hsm.Node) error {
if err := node.CheckRunning(); err != nil {
Expand Down Expand Up @@ -91,6 +95,7 @@ func (TimeoutTaskSerializer) Serialize(hsm.Task) ([]byte, error) {

type InvocationTask struct {
EndpointName string
attempt int32
}

var _ hsm.Task = InvocationTask{}
Expand All @@ -107,6 +112,10 @@ func (t InvocationTask) Destination() string {
return t.EndpointName
}

func (t InvocationTask) Attempt() int32 {
return t.attempt
}

func (InvocationTask) Validate(ref *persistencespb.StateMachineRef, node *hsm.Node) error {
if err := node.CheckRunning(); err != nil {
return err
Expand All @@ -117,7 +126,7 @@ func (InvocationTask) Validate(ref *persistencespb.StateMachineRef, node *hsm.No
type InvocationTaskSerializer struct{}

func (InvocationTaskSerializer) Deserialize(data []byte, attrs hsm.TaskAttributes) (hsm.Task, error) {
return InvocationTask{EndpointName: attrs.Destination}, nil
return InvocationTask{EndpointName: attrs.Destination, attempt: attrs.Attempt}, nil
}

func (InvocationTaskSerializer) Serialize(hsm.Task) ([]byte, error) {
Expand All @@ -142,6 +151,10 @@ func (t BackoffTask) Destination() string {
return ""
}

func (BackoffTask) Attempt() int32 {
return 0
}

func (t BackoffTask) Validate(_ *persistencespb.StateMachineRef, node *hsm.Node) error {
if err := node.CheckRunning(); err != nil {
return err
Expand All @@ -161,6 +174,7 @@ func (BackoffTaskSerializer) Serialize(hsm.Task) ([]byte, error) {

type CancelationTask struct {
EndpointName string
attempt int32
}

var _ hsm.Task = CancelationTask{}
Expand All @@ -177,6 +191,10 @@ func (t CancelationTask) Destination() string {
return t.EndpointName
}

func (t CancelationTask) Attempt() int32 {
return t.attempt
}

func (CancelationTask) Validate(ref *persistencespb.StateMachineRef, node *hsm.Node) error {
if err := node.CheckRunning(); err != nil {
return err
Expand All @@ -187,7 +205,7 @@ func (CancelationTask) Validate(ref *persistencespb.StateMachineRef, node *hsm.N
type CancelationTaskSerializer struct{}

func (CancelationTaskSerializer) Deserialize(data []byte, attrs hsm.TaskAttributes) (hsm.Task, error) {
return CancelationTask{EndpointName: attrs.Destination}, nil
return CancelationTask{EndpointName: attrs.Destination, attempt: attrs.Attempt}, nil
}

func (CancelationTaskSerializer) Serialize(hsm.Task) ([]byte, error) {
Expand All @@ -212,6 +230,10 @@ func (CancelationBackoffTask) Destination() string {
return ""
}

func (CancelationBackoffTask) Attempt() int32 {
return 0
}

func (CancelationBackoffTask) Validate(ref *persistencespb.StateMachineRef, node *hsm.Node) error {
if err := node.CheckRunning(); err != nil {
return err
Expand Down
2 changes: 2 additions & 0 deletions proto/internal/temporal/server/api/persistence/v1/hsm.proto
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ message StateMachineTaskInfo {
string type = 2;
// Opaque data attached to this task. May be nil. Deserialized by a registered TaskSerializer for this type.
bytes data = 3;
// Task attempt count.
int32 attempt = 4;
}

// A group of state machine timer tasks for a given deadline, used for collapsing state machine timer tasks.
Expand Down
4 changes: 4 additions & 0 deletions service/history/hsm/hsmtest/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ func (t *Task) Destination() string {
return t.attrs.Destination
}

func (t *Task) Attempt() int32 {
return t.attrs.Attempt
}

func (t *Task) Validate(ref *persistencespb.StateMachineRef, node *hsm.Node) error {
if t.IsConcurrent {
return hsm.ValidateNotTransitioned(ref, node)
Expand Down
3 changes: 3 additions & 0 deletions service/history/hsm/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ type Task interface {
// If a destination is set, a task will be scheduled on the outbound queue.
// Currently Destination and Deadline are mutually exclusive.
Destination() string
// The attempt count for this task. Tasks may return 0 if the attempt count is not relevant for that task type.
Attempt() int32
// Validate checks if the task is still valid for processing for the current node state.
// Implementors may return [ErrStaleReference] or [consts.ErrWorkflowCompleted] if the task is no longer valid.
// A typical implementation may use [node.CheckRunning], [ValidateNotTransitioned], or check if the state of the
Expand All @@ -66,6 +68,7 @@ type Task interface {
type TaskAttributes struct {
Deadline time.Time
Destination string
Attempt int32
}

// TaskSerializer provides type information and a serializer for a state machine.
Expand Down
7 changes: 4 additions & 3 deletions service/history/workflow/task_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -899,9 +899,10 @@ func generateSubStateMachineTask(
}

taskInfo := &persistencespb.StateMachineTaskInfo{
Ref: ref,
Type: task.Type(),
Data: data,
Ref: ref,
Type: task.Type(),
Data: data,
Attempt: task.Attempt(),
}
// NOTE: at the moment deadline is mutually exclusive with destination.
// This will change when we add the outbound timer queue.
Expand Down
5 changes: 3 additions & 2 deletions service/history/workflow/task_generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,8 +436,9 @@ func TestTaskGenerator_GenerateDirtySubStateMachineTasks(t *testing.T) {
},
MachineTransitionCount: 1,
},
Type: callbacks.TaskTypeInvocation,
Data: nil,
Type: callbacks.TaskTypeInvocation,
Data: nil,
Attempt: int32(0),
}, invocationTask.Info)

require.Equal(t, tests.WorkflowKey, backoffTask.WorkflowKey)
Expand Down
Loading