Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Nexus: Delete state machine on terminal state -- Part 3 #6984

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 42 additions & 11 deletions components/nexusoperations/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (d ScheduledEventDefinition) Apply(root *hsm.Node, event *historypb.History
if err != nil {
return err
}
_, err = AddChild(root, strconv.FormatInt(event.EventId, 10), event, token, true)
_, err = AddChild(root, strconv.FormatInt(event.EventId, 10), event, token)
return err
}

Expand All @@ -67,9 +67,11 @@ func (d CancelRequestedEventDefinition) Type() enumspb.EventType {
}

func (d CancelRequestedEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error {
return transitionOperation(root, event, func(node *hsm.Node, o Operation) (hsm.TransitionOutput, error) {
_, err := transitionOperation(root, event, func(node *hsm.Node, o Operation) (hsm.TransitionOutput, error) {
return o.Cancel(node, event.EventTime.AsTime())
})

return err
}

func (d CancelRequestedEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, _ map[enumspb.ResetReapplyExcludeType]struct{}) error {
Expand All @@ -88,13 +90,15 @@ func (d StartedEventDefinition) Type() enumspb.EventType {
}

func (d StartedEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error {
return transitionOperation(root, event, func(node *hsm.Node, o Operation) (hsm.TransitionOutput, error) {
_, err := transitionOperation(root, event, func(node *hsm.Node, o Operation) (hsm.TransitionOutput, error) {
return TransitionStarted.Apply(o, EventStarted{
Time: event.EventTime.AsTime(),
Node: node,
Attributes: event.GetNexusOperationStartedEventAttributes(),
})
})

return err
}

func (d StartedEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, excludeTypes map[enumspb.ResetReapplyExcludeType]struct{}) error {
Expand All @@ -111,12 +115,17 @@ func (d CompletedEventDefinition) IsWorkflowTaskTrigger() bool {
}

func (d CompletedEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error {
return transitionOperation(root, event, func(node *hsm.Node, o Operation) (hsm.TransitionOutput, error) {
node, err := transitionOperation(root, event, func(node *hsm.Node, o Operation) (hsm.TransitionOutput, error) {
return TransitionSucceeded.Apply(o, EventSucceeded{
Time: event.EventTime.AsTime(),
Node: node,
})
})
if err != nil {
return err
}

return root.DeleteChild(node.Key)
}

func (d CompletedEventDefinition) Type() enumspb.EventType {
Expand All @@ -141,13 +150,18 @@ func (d FailedEventDefinition) Type() enumspb.EventType {
}

func (d FailedEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error {
return transitionOperation(root, event, func(node *hsm.Node, o Operation) (hsm.TransitionOutput, error) {
node, err := transitionOperation(root, event, func(node *hsm.Node, o Operation) (hsm.TransitionOutput, error) {
return TransitionFailed.Apply(o, EventFailed{
Time: event.EventTime.AsTime(),
Attributes: event.GetNexusOperationFailedEventAttributes(),
Node: node,
})
})
if err != nil {
return err
}

return root.DeleteChild(node.Key)
}

func (d FailedEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, excludeTypes map[enumspb.ResetReapplyExcludeType]struct{}) error {
Expand All @@ -168,12 +182,17 @@ func (d CanceledEventDefinition) Type() enumspb.EventType {
}

func (d CanceledEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error {
return transitionOperation(root, event, func(node *hsm.Node, o Operation) (hsm.TransitionOutput, error) {
node, err := transitionOperation(root, event, func(node *hsm.Node, o Operation) (hsm.TransitionOutput, error) {
return TransitionCanceled.Apply(o, EventCanceled{
Time: event.EventTime.AsTime(),
Node: node,
})
})
if err != nil {
return err
}

return root.DeleteChild(node.Key)
}

func (d CanceledEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, excludeTypes map[enumspb.ResetReapplyExcludeType]struct{}) error {
Expand All @@ -194,11 +213,16 @@ func (d TimedOutEventDefinition) Type() enumspb.EventType {
}

func (d TimedOutEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error {
return transitionOperation(root, event, func(node *hsm.Node, o Operation) (hsm.TransitionOutput, error) {
node, err := transitionOperation(root, event, func(node *hsm.Node, o Operation) (hsm.TransitionOutput, error) {
return TransitionTimedOut.Apply(o, EventTimedOut{
Node: node,
})
})
if err != nil {
return err
}

return root.DeleteChild(node.Key)
}

func (d TimedOutEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, excludeTypes map[enumspb.ResetReapplyExcludeType]struct{}) error {
Expand Down Expand Up @@ -230,14 +254,21 @@ func RegisterEventDefinitions(reg *hsm.Registry) error {
return reg.RegisterEventDefinition(TimedOutEventDefinition{})
}

func transitionOperation(root *hsm.Node, event *historypb.HistoryEvent, fn func(node *hsm.Node, o Operation) (hsm.TransitionOutput, error)) error {
func transitionOperation(
root *hsm.Node,
event *historypb.HistoryEvent,
fn func(node *hsm.Node, o Operation) (hsm.TransitionOutput, error),
) (*hsm.Node, error) {
node, err := findOperationNode(root, event)
if err != nil {
return err
return nil, err
}
return hsm.MachineTransition(node, func(o Operation) (hsm.TransitionOutput, error) {
if err := hsm.MachineTransition(node, func(o Operation) (hsm.TransitionOutput, error) {
return fn(node, o)
})
}); err != nil {
return nil, err
}
return node, nil
}

func findOperationNode(root *hsm.Node, event *historypb.HistoryEvent) (*hsm.Node, error) {
Expand Down
107 changes: 107 additions & 0 deletions components/nexusoperations/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
package nexusoperations_test

import (
"strconv"
"testing"
"time"

Expand All @@ -32,6 +33,7 @@ import (
"go.temporal.io/server/components/nexusoperations"
"go.temporal.io/server/service/history/hsm"
"go.temporal.io/server/service/history/hsm/hsmtest"
"google.golang.org/protobuf/types/known/timestamppb"
)

func TestCherryPick(t *testing.T) {
Expand Down Expand Up @@ -140,3 +142,108 @@ func TestCherryPick(t *testing.T) {
}
})
}

func TestTerminalStatesDeletion(t *testing.T) {
applyEventAndCheckDeletion := func(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can just inline this in the loop body; there's no value in extracting it to a function.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done!

t *testing.T,
node *hsm.Node,
eventID int64,
def hsm.EventDefinition,
attr interface{},
) {
event := &historypb.HistoryEvent{
EventTime: timestamppb.Now(),
}

switch d := def.(type) {
case nexusoperations.CompletedEventDefinition:
event.EventType = d.Type()
event.Attributes = &historypb.HistoryEvent_NexusOperationCompletedEventAttributes{
NexusOperationCompletedEventAttributes: attr.(*historypb.NexusOperationCompletedEventAttributes),
}
case nexusoperations.FailedEventDefinition:
event.EventType = d.Type()
event.Attributes = &historypb.HistoryEvent_NexusOperationFailedEventAttributes{
NexusOperationFailedEventAttributes: attr.(*historypb.NexusOperationFailedEventAttributes),
}
case nexusoperations.CanceledEventDefinition:
event.EventType = d.Type()
event.Attributes = &historypb.HistoryEvent_NexusOperationCanceledEventAttributes{
NexusOperationCanceledEventAttributes: attr.(*historypb.NexusOperationCanceledEventAttributes),
}
case nexusoperations.TimedOutEventDefinition:
event.EventType = d.Type()
event.Attributes = &historypb.HistoryEvent_NexusOperationTimedOutEventAttributes{
NexusOperationTimedOutEventAttributes: attr.(*historypb.NexusOperationTimedOutEventAttributes),
}
default:
t.Fatalf("unknown event definition type: %T", def)
}

err := def.Apply(node.Parent, event)
require.NoError(t, err)

coll := nexusoperations.MachineCollection(node.Parent)
_, err = coll.Node(strconv.FormatInt(eventID, 10))
require.ErrorIs(t, err, hsm.ErrStateMachineNotFound)
}

testCases := []struct {
name string
def hsm.EventDefinition
attributes interface{}
}{
{
name: "CompletedDeletesStateMachine",
def: nexusoperations.CompletedEventDefinition{},
attributes: &historypb.NexusOperationCompletedEventAttributes{
ScheduledEventId: 0,
},
},
{
name: "FailedDeletesStateMachine",
def: nexusoperations.FailedEventDefinition{},
attributes: &historypb.NexusOperationFailedEventAttributes{
ScheduledEventId: 0,
},
},
{
name: "CanceledDeletesStateMachine",
def: nexusoperations.CanceledEventDefinition{},
attributes: &historypb.NexusOperationCanceledEventAttributes{
ScheduledEventId: 0,
},
},
{
name: "TimedOutDeletesStateMachine",
def: nexusoperations.TimedOutEventDefinition{},
attributes: &historypb.NexusOperationTimedOutEventAttributes{
ScheduledEventId: 0,
},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
node := newOperationNode(t, &hsmtest.NodeBackend{}, mustNewScheduledEvent(time.Now(), time.Hour))
op, err := hsm.MachineData[nexusoperations.Operation](node)
require.NoError(t, err)
eventID, err := hsm.EventIDFromToken(op.ScheduledEventToken)
require.NoError(t, err)

// Update the event ID in attributes
switch a := tc.attributes.(type) {
case *historypb.NexusOperationCompletedEventAttributes:
a.ScheduledEventId = eventID
case *historypb.NexusOperationFailedEventAttributes:
a.ScheduledEventId = eventID
case *historypb.NexusOperationCanceledEventAttributes:
a.ScheduledEventId = eventID
case *historypb.NexusOperationTimedOutEventAttributes:
a.ScheduledEventId = eventID
}

applyEventAndCheckDeletion(t, node, eventID, tc.def, tc.attributes)
})
}
}
2 changes: 1 addition & 1 deletion components/nexusoperations/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func newOperationNode(t *testing.T, backend *hsmtest.NodeBackend, event *history
root := newRoot(t, backend)
token, err := hsm.GenerateEventLoadToken(event)
require.NoError(t, err)
node, err := nexusoperations.AddChild(root, fmt.Sprintf("%d", event.EventId), event, token, false)
node, err := nexusoperations.AddChild(root, fmt.Sprintf("%d", event.EventId), event, token)
require.NoError(t, err)
return node
}
Expand Down
6 changes: 2 additions & 4 deletions components/nexusoperations/statemachine.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ type Operation struct {
}

// AddChild adds a new operation child machine to the given node and transitions it to the SCHEDULED state.
func AddChild(node *hsm.Node, id string, event *historypb.HistoryEvent, eventToken []byte, deleteOnCompletion bool) (*hsm.Node, error) {
func AddChild(node *hsm.Node, id string, event *historypb.HistoryEvent, eventToken []byte) (*hsm.Node, error) {
attrs := event.GetNexusOperationScheduledEventAttributes()

node, err := node.AddChild(hsm.Key{Type: OperationMachineType, ID: id}, Operation{
Expand All @@ -78,9 +78,7 @@ func AddChild(node *hsm.Node, id string, event *historypb.HistoryEvent, eventTok
ScheduleToCloseTimeout: attrs.ScheduleToCloseTimeout,
RequestId: attrs.RequestId,
State: enumsspb.NEXUS_OPERATION_STATE_UNSPECIFIED,
// TODO(bergundy): actually delete on completion if this is set.
DeleteOnCompletion: deleteOnCompletion,
ScheduledEventToken: eventToken,
ScheduledEventToken: eventToken,
},
})

Expand Down
2 changes: 1 addition & 1 deletion components/nexusoperations/statemachine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func TestAddChild(t *testing.T) {
},
},
}
child, err := nexusoperations.AddChild(root, "test-id", event, []byte("token"), false)
child, err := nexusoperations.AddChild(root, "test-id", event, []byte("token"))
require.NoError(t, err)
opLog, err := root.Outputs()
require.NoError(t, err)
Expand Down
37 changes: 18 additions & 19 deletions components/nexusoperations/workflow/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,34 +209,33 @@ func (ch *commandHandler) HandleCancelCommand(
Message: "empty CancelNexusOperationCommandAttributes",
}
}

coll := nexusoperations.MachineCollection(ms.HSM())
nodeID := strconv.FormatInt(attrs.ScheduledEventId, 10)
node, err := coll.Node(nodeID)
_, err := coll.Node(nodeID)
if err != nil {
if errors.Is(err, hsm.ErrStateMachineNotFound) {
if ms.HasAnyBufferedEvent(makeNexusOperationTerminalEventFilter(attrs.ScheduledEventId)) {
// Allow cancelation if there are buffered terminal events
event := ms.AddHistoryEvent(enumspb.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUESTED, func(he *historypb.HistoryEvent) {
he.Attributes = &historypb.HistoryEvent_NexusOperationCancelRequestedEventAttributes{
NexusOperationCancelRequestedEventAttributes: &historypb.NexusOperationCancelRequestedEventAttributes{
ScheduledEventId: attrs.ScheduledEventId,
WorkflowTaskCompletedEventId: workflowTaskCompletedEventID,
},
}
he.UserMetadata = command.UserMetadata
})
return nexusoperations.CancelRequestedEventDefinition{}.Apply(ms.HSM(), event)
}
return workflow.FailWorkflowTaskError{
Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_REQUEST_CANCEL_NEXUS_OPERATION_ATTRIBUTES,
Message: fmt.Sprintf("requested cancelation for a non-existing operation with scheduled event ID of %d", attrs.ScheduledEventId),
// TODO(bergundy): Message: fmt.Sprintf("requested cancelation for a non-existing or already completed operation with scheduled event ID of %d", attrs.ScheduledEventId),
Message: fmt.Sprintf("requested cancelation for a non-existing or already completed operation with scheduled event ID %d", attrs.ScheduledEventId),
}
}
return err
}
// TODO(bergundy): Remove this when operation auto-deletes itself on terminal state.
// Operation may already be in a terminal state because it doesn't yet delete itself. We don't want to accept
// cancelation in this case.
op, err := hsm.MachineData[nexusoperations.Operation](node)
if err != nil {
return err
}
// The operation is already in a terminal state and the terminal NexusOperation event has not just been buffered.
// We allow the workflow to request canceling an operation that has just completed while a workflow task is in
// flight since it cannot know about the state of the operation.
// TODO(bergundy): When we support state machine deletion, this condition will have to change.
if !nexusoperations.TransitionCanceled.Possible(op) && !ms.HasAnyBufferedEvent(makeNexusOperationTerminalEventFilter(attrs.ScheduledEventId)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You will need to allow cancel while a terminal event is buffered (e.g. the SDK isn't aware yet that the operation has completed).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The check for a buffered terminal event should be moved to where we error out when the node is not found.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is what you have in mind. Please take a look

return workflow.FailWorkflowTaskError{
Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_REQUEST_CANCEL_NEXUS_OPERATION_ATTRIBUTES,
Message: fmt.Sprintf("requested cancelation for an already complete operation with scheduled event ID of %d", attrs.ScheduledEventId),
Message: fmt.Sprintf("error looking up operation with scheduled event ID %d: %v", attrs.ScheduledEventId, err),
}
}

Expand All @@ -256,7 +255,7 @@ func (ch *commandHandler) HandleCancelCommand(
if errors.Is(err, hsm.ErrStateMachineAlreadyExists) {
return workflow.FailWorkflowTaskError{
Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_REQUEST_CANCEL_NEXUS_OPERATION_ATTRIBUTES,
Message: fmt.Sprintf("cancelation was already requested for an operation with scheduled event ID of %d", attrs.ScheduledEventId),
Message: fmt.Sprintf("requested cancelation for operation with scheduled event ID %d that is already being canceled", attrs.ScheduledEventId),
}
}

Expand Down
Loading
Loading