From 41e9b33d85c067b75b3fea81e563cfe76fbd785d Mon Sep 17 00:00:00 2001 From: Stephan Behnke Date: Mon, 9 Dec 2024 09:01:34 -0800 Subject: [PATCH] Tweak Update-with-Start error reporting (#1746) --- internal/internal_workflow_client.go | 16 +-- internal/internal_workflow_client_test.go | 120 +++++++++++++--------- 2 files changed, 82 insertions(+), 54 deletions(-) diff --git a/internal/internal_workflow_client.go b/internal/internal_workflow_client.go index aa924a79f..cd4045fc4 100644 --- a/internal/internal_workflow_client.go +++ b/internal/internal_workflow_client.go @@ -1761,7 +1761,7 @@ func (w *workflowClientInterceptor) UpdateWithStartWorkflow( // Create update request updateReq, err := w.createUpdateWorkflowRequest(ctx, updateInput) if err != nil { - return nil, fmt.Errorf("%w: %w", errInvalidWithStartWorkflowOperation, err) + return nil, err } if updateReq.WorkflowExecution.WorkflowId == "" { updateReq.WorkflowExecution.WorkflowId = startReq.WorkflowId @@ -1800,7 +1800,7 @@ func (w *workflowClientInterceptor) UpdateWithStartWorkflow( } handle, err := w.updateHandleFromResponse(ctx, updateReq.WaitPolicy.LifecycleStage, updateResp) if err != nil { - return nil, fmt.Errorf("%w: %w", errInvalidWithStartWorkflowOperation, err) + return nil, err } return handle, nil } @@ -1862,27 +1862,31 @@ func (w *workflowClientInterceptor) updateWithStartWorkflow( } var abortedErr *serviceerror.MultiOperationAborted - startErr := errors.New("failed to start workflow") for i, opReq := range multiRequest.Operations { // if an operation error is of type MultiOperationAborted, it means it was only aborted because // of another operation's error and is therefore not interesting or helpful opErr := multiErr.OperationErrors()[i] + if opErr == nil { + continue + } switch t := opReq.Operation.(type) { case *workflowservice.ExecuteMultiOperationRequest_Operation_StartWorkflow: if !errors.As(opErr, &abortedErr) { - startErr = opErr + return nil, fmt.Errorf("failed workflow start: %w", opErr) } case *workflowservice.ExecuteMultiOperationRequest_Operation_UpdateWorkflow: if !errors.As(opErr, &abortedErr) { - startErr = fmt.Errorf("%w: %w", errInvalidWithStartWorkflowOperation, opErr) + return nil, fmt.Errorf("failed workflow update: %w", opErr) } default: // this would only happen if a case statement for a newly added operation is missing above return nil, fmt.Errorf("%w: %T", errUnsupportedOperation, t) } } - return nil, startErr + + // this should never happen + return nil, errors.New(multiErr.Error()) } else if err != nil { return nil, err } diff --git a/internal/internal_workflow_client_test.go b/internal/internal_workflow_client_test.go index f7b52abc4..af55097c5 100644 --- a/internal/internal_workflow_client_test.go +++ b/internal/internal_workflow_client_test.go @@ -1113,30 +1113,80 @@ func (s *workflowRunSuite) TestExecuteWorkflowWithUpdate_Abort() { } } -func (s *workflowRunSuite) TestExecuteWorkflowWithUpdate_NonMultiOperationError() { - s.workflowServiceClient.EXPECT(). - ExecuteMultiOperation(gomock.Any(), gomock.Any(), gomock.Any()). - Return(nil, serviceerror.NewInternal("internal error")).Times(1) +func (s *workflowRunSuite) TestExecuteWorkflowWithUpdate_Errors() { + tests := []struct { + name string + returnedErr error + expectedErr string + }{ + { + name: "NonMultiOperationError", + returnedErr: serviceerror.NewInternal("internal error"), + expectedErr: "internal error", + }, + { + name: "CountMismatch", + returnedErr: serviceerror.NewMultiOperationExecution("Error", []error{}), + expectedErr: "invalid server response: 0 instead of 2 operation errors", + }, + { + name: "NilErrors", + returnedErr: serviceerror.NewMultiOperationExecution("MultiOperation failed", []error{ + nil, nil, + }), + expectedErr: "MultiOperation failed", + }, + { + name: "StartOperationError", + returnedErr: serviceerror.NewMultiOperationExecution("MultiOperation failed", []error{ + serviceerror.NewInvalidArgument("invalid Start"), + serviceerror.NewMultiOperationAborted("aborted Update"), + }), + expectedErr: "failed workflow start: invalid Start", + }, + { + name: "UpdateOperationError_AbortedStart", + returnedErr: serviceerror.NewMultiOperationExecution("MultiOperation failed", []error{ + serviceerror.NewMultiOperationAborted("aborted Start"), + serviceerror.NewInvalidArgument("invalid Update"), + }), + expectedErr: "failed workflow update: invalid Update", + }, + { + name: "UpdateOperationError_SuccessfulStart", + returnedErr: serviceerror.NewMultiOperationExecution("MultiOperation failed", []error{ + nil, // ie successful start + serviceerror.NewInvalidArgument("bad Update"), + }), + expectedErr: "failed workflow update: bad Update", + }, + } - startOp := s.workflowClient.NewWithStartWorkflowOperation( - StartWorkflowOptions{ - ID: workflowID, - WorkflowIDConflictPolicy: enumspb.WORKFLOW_ID_CONFLICT_POLICY_FAIL, - TaskQueue: taskqueue, - }, workflowType, - ) + for _, tt := range tests { + s.Run(tt.name, func() { + s.workflowServiceClient.EXPECT(). + ExecuteMultiOperation(gomock.Any(), gomock.Any(), gomock.Any()). + Return(nil, tt.returnedErr).Times(1) - _, err := s.workflowClient.UpdateWithStartWorkflow( - context.Background(), - UpdateWithStartWorkflowOptions{ - UpdateOptions: UpdateWorkflowOptions{ - UpdateName: "update", - WaitForStage: WorkflowUpdateStageCompleted, - }, - StartWorkflowOperation: startOp, - }, - ) - s.ErrorContains(err, "internal error") + _, err := s.workflowClient.UpdateWithStartWorkflow( + context.Background(), + UpdateWithStartWorkflowOptions{ + UpdateOptions: UpdateWorkflowOptions{ + UpdateName: "update", + WaitForStage: WorkflowUpdateStageCompleted, + }, + StartWorkflowOperation: s.workflowClient.NewWithStartWorkflowOperation( + StartWorkflowOptions{ + ID: workflowID, + WorkflowIDConflictPolicy: enumspb.WORKFLOW_ID_CONFLICT_POLICY_FAIL, + TaskQueue: taskqueue, + }, workflowType, + ), + }, + ) + s.EqualError(err, tt.expectedErr) + }) + } } func (s *workflowRunSuite) TestExecuteWorkflowWithUpdate_ServerResponseCountMismatch() { @@ -1167,32 +1217,6 @@ func (s *workflowRunSuite) TestExecuteWorkflowWithUpdate_ServerResponseCountMism s.ErrorContains(err, "invalid server response: 0 instead of 2 operation results") } -func (s *workflowRunSuite) TestExecuteWorkflowWithUpdate_ServerErrorResponseCountMismatch() { - s.workflowServiceClient.EXPECT(). - ExecuteMultiOperation(gomock.Any(), gomock.Any(), gomock.Any()). - Return(nil, serviceerror.NewMultiOperationExecution("Error", []error{})).Times(1) - - startOp := s.workflowClient.NewWithStartWorkflowOperation( - StartWorkflowOptions{ - ID: workflowID, - WorkflowIDConflictPolicy: enumspb.WORKFLOW_ID_CONFLICT_POLICY_FAIL, - TaskQueue: taskqueue, - }, workflowType, - ) - - _, err := s.workflowClient.UpdateWithStartWorkflow( - context.Background(), - UpdateWithStartWorkflowOptions{ - UpdateOptions: UpdateWorkflowOptions{ - UpdateName: "update", - WaitForStage: WorkflowUpdateStageCompleted, - }, - StartWorkflowOperation: startOp, - }, - ) - s.ErrorContains(err, "invalid server response: 0 instead of 2 operation errors") -} - func (s *workflowRunSuite) TestExecuteWorkflowWithUpdate_ServerStartResponseTypeMismatch() { s.workflowServiceClient.EXPECT(). ExecuteMultiOperation(gomock.Any(), gomock.Any(), gomock.Any()).