Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Nexus: Delete state machine on terminal state -- Part 3 #6984

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 36 additions & 4 deletions components/nexusoperations/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,18 @@ func (d CompletedEventDefinition) IsWorkflowTaskTrigger() bool {

func (d CompletedEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error {
return transitionOperation(root, event, func(node *hsm.Node, o Operation) (hsm.TransitionOutput, error) {
return TransitionSucceeded.Apply(o, EventSucceeded{
output, err := TransitionSucceeded.Apply(o, EventSucceeded{
Time: event.EventTime.AsTime(),
Node: node,
})
if err != nil {
return output, err
}

if err := root.DeleteChild(node.Key); err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically you'll want to separate this into two operations, a transition, followed by a deletion. Move the DeleteChild call to after the transitionOperation call.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's also a boolean on the operation state saying whether it should be deleted on completion but we can get rid of that and just delete here always.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

return output, err
}
return output, nil
})
}

Expand All @@ -142,11 +150,19 @@ func (d FailedEventDefinition) Type() enumspb.EventType {

func (d FailedEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error {
return transitionOperation(root, event, func(node *hsm.Node, o Operation) (hsm.TransitionOutput, error) {
return TransitionFailed.Apply(o, EventFailed{
output, err := TransitionFailed.Apply(o, EventFailed{
Time: event.EventTime.AsTime(),
Attributes: event.GetNexusOperationFailedEventAttributes(),
Node: node,
})
if err != nil {
return output, err
}

if err := root.DeleteChild(node.Key); err != nil {
return output, err
}
return output, nil
})
}

Expand All @@ -169,10 +185,18 @@ func (d CanceledEventDefinition) Type() enumspb.EventType {

func (d CanceledEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error {
return transitionOperation(root, event, func(node *hsm.Node, o Operation) (hsm.TransitionOutput, error) {
return TransitionCanceled.Apply(o, EventCanceled{
output, err := TransitionCanceled.Apply(o, EventCanceled{
Time: event.EventTime.AsTime(),
Node: node,
})
if err != nil {
return output, err
}

if err := root.DeleteChild(node.Key); err != nil {
return output, err
}
return output, nil
})
}

Expand All @@ -195,9 +219,17 @@ func (d TimedOutEventDefinition) Type() enumspb.EventType {

func (d TimedOutEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error {
return transitionOperation(root, event, func(node *hsm.Node, o Operation) (hsm.TransitionOutput, error) {
return TransitionTimedOut.Apply(o, EventTimedOut{
output, err := TransitionTimedOut.Apply(o, EventTimedOut{
Node: node,
})
if err != nil {
return output, err
}

if err := root.DeleteChild(node.Key); err != nil {
return output, err
}
return output, nil
})
}

Expand Down
112 changes: 112 additions & 0 deletions components/nexusoperations/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
package nexusoperations_test

import (
"strconv"
"testing"
"time"

Expand All @@ -32,6 +33,7 @@ import (
"go.temporal.io/server/components/nexusoperations"
"go.temporal.io/server/service/history/hsm"
"go.temporal.io/server/service/history/hsm/hsmtest"
"google.golang.org/protobuf/types/known/timestamppb"
)

func TestCherryPick(t *testing.T) {
Expand Down Expand Up @@ -140,3 +142,113 @@ func TestCherryPick(t *testing.T) {
}
})
}

func TestTerminalStatesDeletion(t *testing.T) {
setup := func(t *testing.T) (*hsm.Node, nexusoperations.Operation, int64) {
node := newOperationNode(t, &hsmtest.NodeBackend{}, mustNewScheduledEvent(time.Now(), time.Hour))
op, err := hsm.MachineData[nexusoperations.Operation](node)
require.NoError(t, err)
eventID, err := hsm.EventIDFromToken(op.ScheduledEventToken)
require.NoError(t, err)
return node, op, eventID
}

applyEventAndCheckDeletion := func(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can just inline this in the loop body, there's no value in extracting to a function.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done!

t *testing.T,
node *hsm.Node,
eventID int64,
def hsm.EventDefinition,
attr interface{},
) {
event := &historypb.HistoryEvent{
EventTime: timestamppb.Now(),
}

switch d := def.(type) {
case nexusoperations.CompletedEventDefinition:
event.EventType = d.Type()
event.Attributes = &historypb.HistoryEvent_NexusOperationCompletedEventAttributes{
NexusOperationCompletedEventAttributes: attr.(*historypb.NexusOperationCompletedEventAttributes),
}
case nexusoperations.FailedEventDefinition:
event.EventType = d.Type()
event.Attributes = &historypb.HistoryEvent_NexusOperationFailedEventAttributes{
NexusOperationFailedEventAttributes: attr.(*historypb.NexusOperationFailedEventAttributes),
}
case nexusoperations.CanceledEventDefinition:
event.EventType = d.Type()
event.Attributes = &historypb.HistoryEvent_NexusOperationCanceledEventAttributes{
NexusOperationCanceledEventAttributes: attr.(*historypb.NexusOperationCanceledEventAttributes),
}
case nexusoperations.TimedOutEventDefinition:
event.EventType = d.Type()
event.Attributes = &historypb.HistoryEvent_NexusOperationTimedOutEventAttributes{
NexusOperationTimedOutEventAttributes: attr.(*historypb.NexusOperationTimedOutEventAttributes),
}
default:
t.Fatalf("unknown event definition type: %T", def)
}

err := def.Apply(node.Parent, event)
require.NoError(t, err)

coll := nexusoperations.MachineCollection(node.Parent)
_, err = coll.Node(strconv.FormatInt(eventID, 10))
require.ErrorIs(t, err, hsm.ErrStateMachineNotFound)
}

testCases := []struct {
name string
def hsm.EventDefinition
attributes interface{}
}{
{
name: "CompletedDeletesStateMachine",
def: nexusoperations.CompletedEventDefinition{},
attributes: &historypb.NexusOperationCompletedEventAttributes{
ScheduledEventId: 0,
},
},
{
name: "FailedDeletesStateMachine",
def: nexusoperations.FailedEventDefinition{},
attributes: &historypb.NexusOperationFailedEventAttributes{
ScheduledEventId: 0,
},
},
{
name: "CanceledDeletesStateMachine",
def: nexusoperations.CanceledEventDefinition{},
attributes: &historypb.NexusOperationCanceledEventAttributes{
ScheduledEventId: 0,
},
},
{
name: "TimedOutDeletesStateMachine",
def: nexusoperations.TimedOutEventDefinition{},
attributes: &historypb.NexusOperationTimedOutEventAttributes{
ScheduledEventId: 0,
},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
node, _, eventID := setup(t)

// Update the event ID in attributes
switch a := tc.attributes.(type) {
case *historypb.NexusOperationCompletedEventAttributes:
a.ScheduledEventId = eventID
case *historypb.NexusOperationFailedEventAttributes:
a.ScheduledEventId = eventID
case *historypb.NexusOperationCanceledEventAttributes:
a.ScheduledEventId = eventID
case *historypb.NexusOperationTimedOutEventAttributes:
a.ScheduledEventId = eventID
}

applyEventAndCheckDeletion(t, node, eventID, tc.def, tc.attributes)
})
}
}
19 changes: 7 additions & 12 deletions components/nexusoperations/workflow/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,27 +216,22 @@ func (ch *commandHandler) HandleCancelCommand(
if errors.Is(err, hsm.ErrStateMachineNotFound) {
return workflow.FailWorkflowTaskError{
Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_REQUEST_CANCEL_NEXUS_OPERATION_ATTRIBUTES,
Message: fmt.Sprintf("requested cancelation for a non-existing operation with scheduled event ID of %d", attrs.ScheduledEventId),
// TODO(bergundy): Message: fmt.Sprintf("requested cancelation for a non-existing or already completed operation with scheduled event ID of %d", attrs.ScheduledEventId),
Message: fmt.Sprintf("requested cancelation for a nonexistent or terminated operation with scheduled event ID %d", attrs.ScheduledEventId),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The message I had in the todo is better. The operation isn't terminated.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}
}
return err
}
// TODO(bergundy): Remove this when operation auto-deletes itself on terminal state.
// Operation may already be in a terminal state because it doesn't yet delete itself. We don't want to accept
// cancelation in this case.

op, err := hsm.MachineData[nexusoperations.Operation](node)
if err != nil {
return err
}
// The operation is already in a terminal state and the terminal NexusOperation event has not just been buffered.
// We allow the workflow to request canceling an operation that has just completed while a workflow task is in
// flight since it cannot know about the state of the operation.
// TODO(bergundy): When we support state machine deletion, this condition will have to change.
if !nexusoperations.TransitionCanceled.Possible(op) && !ms.HasAnyBufferedEvent(makeNexusOperationTerminalEventFilter(attrs.ScheduledEventId)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You will need allow cancel while a terminal event is buffered (e.g. the SDK isn't aware yet that the operation has completed).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The check for buffered terminal event should be moved to where we error out when the node is not found.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is what you have in mind. Please take a look


// Operation exists but can't be canceled - must be in terminal state
if !nexusoperations.TransitionCanceled.Possible(op) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not possible anymore. And please distinguish between a terminal state and a terminated operation. The latter is not a term that we use.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

return workflow.FailWorkflowTaskError{
Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_REQUEST_CANCEL_NEXUS_OPERATION_ATTRIBUTES,
Message: fmt.Sprintf("requested cancelation for an already complete operation with scheduled event ID of %d", attrs.ScheduledEventId),
Message: fmt.Sprintf("requested cancelation for operation with scheduled event ID %d that has already terminated", attrs.ScheduledEventId),
}
}

Expand All @@ -256,7 +251,7 @@ func (ch *commandHandler) HandleCancelCommand(
if errors.Is(err, hsm.ErrStateMachineAlreadyExists) {
return workflow.FailWorkflowTaskError{
Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_REQUEST_CANCEL_NEXUS_OPERATION_ATTRIBUTES,
Message: fmt.Sprintf("cancelation was already requested for an operation with scheduled event ID of %d", attrs.ScheduledEventId),
Message: fmt.Sprintf("requested cancelation for operation with scheduled event ID %d that is already being canceled", attrs.ScheduledEventId),
}
}

Expand Down
Loading
Loading