Skip to content

Commit

Permalink
Convert panics in update protocol state machine to errors
Browse files Browse the repository at this point in the history
  • Loading branch information
Quinn-With-Two-Ns committed Oct 2, 2023
1 parent 159a69f commit beb457e
Show file tree
Hide file tree
Showing 7 changed files with 167 additions and 6 deletions.
2 changes: 1 addition & 1 deletion internal/internal_command_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ type (
versionMarkerLookup map[int64]versionMarker
}

// panic when command state machine is in illegal state
// panic when command or message state machine is in illegal state
stateMachineIllegalStatePanic struct {
message string
}
Expand Down
9 changes: 9 additions & 0 deletions internal/internal_event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1223,6 +1223,15 @@ func (weh *workflowExecutionEventHandlerImpl) ProcessMessage(
isReplay bool,
isLast bool,
) error {
defer func() {
if p := recover(); p != nil {
weh.metricsHandler.Counter(metrics.WorkflowTaskExecutionFailureCounter).Inc(1)
topLine := fmt.Sprintf("process message for %s [panic]:", weh.workflowInfo.TaskQueueName)
st := getStackTraceRaw(topLine, 7, 0)
weh.Complete(nil, newWorkflowPanicError(p, st))
}
}()

ctor, err := weh.protocolConstructorForMessage(msg)
if err != nil {
return nil
Expand Down
64 changes: 64 additions & 0 deletions internal/internal_event_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,3 +482,67 @@ func TestUpdateEvents(t *testing.T) {
require.NoError(t, weh.ProcessEvent(&historypb.HistoryEvent{EventType: evtype}, false, false))
}
}

func TestUpdateEventsPanic(t *testing.T) {
mustPayload := func(i interface{}) *commonpb.Payload {
t.Helper()
p, err := converter.NewJSONPayloadConverter().ToPayload(i)
if err != nil {
t.FailNow()
}
return p
}

var (
gotName string
gotID string
gotArgs *commonpb.Payloads
gotHeader *commonpb.Header
)

weh := &workflowExecutionEventHandlerImpl{
workflowEnvironmentImpl: &workflowEnvironmentImpl{
updateHandler: func(name string, id string, args *commonpb.Payloads, header *commonpb.Header, cb UpdateCallbacks) {
gotName = name
gotID = id
gotArgs = args
gotHeader = header
},
protocols: protocol.NewRegistry(),
},
workflowDefinition: &mockWorkflowDefinition{
OnWorkflowTaskStartedFunc: func(time.Duration) {},
},
}

meta := &updatepb.Meta{
UpdateId: t.Name() + "-id",
Identity: t.Name() + "-identity",
}
input := &updatepb.Input{
Header: &commonpb.Header{Fields: map[string]*commonpb.Payload{"a": mustPayload("b")}},
Name: t.Name(),
Args: &commonpb.Payloads{Payloads: []*commonpb.Payload{mustPayload("arg0")}},
}

body, err := types.MarshalAny(&updatepb.Request{Meta: meta, Input: input})
require.NoError(t, err)

err = weh.ProcessMessage(&protocolpb.Message{
ProtocolInstanceId: t.Name(),
Body: body,
}, false, false)
require.NoError(t, err)

require.Equal(t, input.Name, gotName)
require.Equal(t, t.Name()+"-id", gotID)
require.True(t, proto.Equal(input.Header, gotHeader))
require.True(t, proto.Equal(input.Args, gotArgs))

require.Panics(t, func() {
weh.ProcessMessage(&protocolpb.Message{
ProtocolInstanceId: t.Name(),
Body: body,
}, false, false)
})
}
13 changes: 9 additions & 4 deletions internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1066,22 +1066,27 @@ ProcessEvents:
if err != nil {
return nil, err
}
if w.isWorkflowCompleted && !shouldForceReplayCheck() {
break ProcessEvents
}
}

err = eventHandler.ProcessEvent(event, isInReplay, isLast)
if err != nil {
return nil, err
}
if w.isWorkflowCompleted && !shouldForceReplayCheck() {
break ProcessEvents
}

for _, msg := range msgs.takeLTE(event.GetEventId()) {
err := eventHandler.ProcessMessage(msg, isInReplay, isLast)
if err != nil {
return nil, err
}
}

if w.isWorkflowCompleted && !shouldForceReplayCheck() {
break ProcessEvents
if w.isWorkflowCompleted && !shouldForceReplayCheck() {
break ProcessEvents
}
}
}

Expand Down
76 changes: 76 additions & 0 deletions internal/internal_task_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1342,6 +1342,82 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_PageToken() {
t.NotNil(response)
}

func (t *TaskHandlersTestSuite) TestWorkflowTask_DuplicateMessagesPanic() {
//taskQueue := "taskQueue"
testEvents := []*historypb.HistoryEvent{
createTestEventWorkflowExecutionStarted(1, &historypb.WorkflowExecutionStartedEventAttributes{TaskQueue: &taskqueuepb.TaskQueue{Name: testWorkflowTaskTaskqueue}}),
createTestEventWorkflowTaskScheduled(2, &historypb.WorkflowTaskScheduledEventAttributes{}),
createTestEventWorkflowTaskStarted(3),
createTestEventWorkflowTaskCompleted(4, &historypb.WorkflowTaskCompletedEventAttributes{
ScheduledEventId: 2,
StartedEventId: 3,
SdkMetadata: &sdk.WorkflowTaskCompletedMetadata{
LangUsedFlags: []uint32{
3,
},
},
}),
{
EventId: 5,
EventType: enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED,
Attributes: &historypb.HistoryEvent_WorkflowExecutionUpdateAcceptedEventAttributes{
WorkflowExecutionUpdateAcceptedEventAttributes: &historypb.WorkflowExecutionUpdateAcceptedEventAttributes{
AcceptedRequestSequencingEventId: 2,
ProtocolInstanceId: "test",
AcceptedRequest: &updatepb.Request{
Meta: &updatepb.Meta{
UpdateId: "test",
},
Input: &updatepb.Input{
Name: updateType,
},
},
},
},
},
{
EventId: 6,
EventType: enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED,
Attributes: &historypb.HistoryEvent_WorkflowExecutionUpdateAcceptedEventAttributes{
WorkflowExecutionUpdateAcceptedEventAttributes: &historypb.WorkflowExecutionUpdateAcceptedEventAttributes{
AcceptedRequestSequencingEventId: 2,
ProtocolInstanceId: "test",
AcceptedRequest: &updatepb.Request{
Meta: &updatepb.Meta{
UpdateId: "test",
},
Input: &updatepb.Input{
Name: updateType,
},
},
},
},
},
}
// createWorkflowTask add a schedule and start event
task := createWorkflowTask(testEvents, 0, "HelloUpdate_Workflow")
task.NextPageToken = []byte("token")
task.PreviousStartedEventId = 14

params := t.getTestWorkerExecutionParams()

historyIterator := &historyIteratorImpl{
nextPageToken: []byte("token"),
iteratorFunc: func(nextToken []byte) (*historypb.History, []byte, error) {
return &historypb.History{Events: nil}, nil, nil
},
}
taskHandler := newWorkflowTaskHandler(params, nil, t.registry)
wftask := workflowTask{task: task, historyIterator: historyIterator}
wfctx := t.mustWorkflowContextImpl(&wftask, taskHandler)
request, err := taskHandler.ProcessWorkflowTask(&wftask, wfctx, nil)
wfctx.Unlock(err)
t.Error(err)
t.Nil(request)
_, ok := err.(*workflowPanicError)
t.True(ok)
}

func (t *TaskHandlersTestSuite) TestWorkflowTask_Messages() {
taskQueue := "taskQueue"
testEvents := []*historypb.HistoryEvent{
Expand Down
2 changes: 1 addition & 1 deletion internal/internal_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (up *updateProtocol) requireState(action string, valid ...updateState) {
return
}
}
panic(fmt.Sprintf("invalid action %q in update protocol from state %s", action, up.state))
panicIllegalState(fmt.Sprintf("invalid action %q in update protocol %v", action, up))
}

func (up *updateProtocol) HandleMessage(msg *protocolpb.Message) error {
Expand Down
7 changes: 7 additions & 0 deletions test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4085,6 +4085,13 @@ func (ts *IntegrationTestSuite) TestSendsCorrectMeteringData() {
w.Stop()
}

func (ts *IntegrationTestSuite) TestNondeterministicUpdateRegistertion() {
var expected []string
err := ts.executeWorkflow("test-activity-retry-options-change", ts.workflows.ActivityRetryOptionsChange, &expected)
ts.NoError(err)
ts.EqualValues(expected, ts.activities.invoked())
}

// executeWorkflow executes a given workflow and waits for the result
func (ts *IntegrationTestSuite) executeWorkflow(
wfID string, wfFunc interface{}, retValPtr interface{}, args ...interface{},
Expand Down

0 comments on commit beb457e

Please sign in to comment.