Skip to content

Commit

Permalink
Merge branch 'master' into erroneous-replay-fix
Browse files Browse the repository at this point in the history
  • Loading branch information
yuandrew authored Dec 9, 2024
2 parents 6241536 + 41e9b33 commit 027e581
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 54 deletions.
16 changes: 10 additions & 6 deletions internal/internal_workflow_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1761,7 +1761,7 @@ func (w *workflowClientInterceptor) UpdateWithStartWorkflow(
// Create update request
updateReq, err := w.createUpdateWorkflowRequest(ctx, updateInput)
if err != nil {
return nil, fmt.Errorf("%w: %w", errInvalidWithStartWorkflowOperation, err)
return nil, err
}
if updateReq.WorkflowExecution.WorkflowId == "" {
updateReq.WorkflowExecution.WorkflowId = startReq.WorkflowId
Expand Down Expand Up @@ -1800,7 +1800,7 @@ func (w *workflowClientInterceptor) UpdateWithStartWorkflow(
}
handle, err := w.updateHandleFromResponse(ctx, updateReq.WaitPolicy.LifecycleStage, updateResp)
if err != nil {
return nil, fmt.Errorf("%w: %w", errInvalidWithStartWorkflowOperation, err)
return nil, err
}
return handle, nil
}
Expand Down Expand Up @@ -1862,27 +1862,31 @@ func (w *workflowClientInterceptor) updateWithStartWorkflow(
}

var abortedErr *serviceerror.MultiOperationAborted
startErr := errors.New("failed to start workflow")
for i, opReq := range multiRequest.Operations {
// if an operation error is of type MultiOperationAborted, it means it was only aborted because
// of another operation's error and is therefore not interesting or helpful
opErr := multiErr.OperationErrors()[i]
if opErr == nil {
continue
}

switch t := opReq.Operation.(type) {
case *workflowservice.ExecuteMultiOperationRequest_Operation_StartWorkflow:
if !errors.As(opErr, &abortedErr) {
startErr = opErr
return nil, fmt.Errorf("failed workflow start: %w", opErr)
}
case *workflowservice.ExecuteMultiOperationRequest_Operation_UpdateWorkflow:
if !errors.As(opErr, &abortedErr) {
startErr = fmt.Errorf("%w: %w", errInvalidWithStartWorkflowOperation, opErr)
return nil, fmt.Errorf("failed workflow update: %w", opErr)
}
default:
// this would only happen if a case statement for a newly added operation is missing above
return nil, fmt.Errorf("%w: %T", errUnsupportedOperation, t)
}
}
return nil, startErr

// this should never happen
return nil, errors.New(multiErr.Error())
} else if err != nil {
return nil, err
}
Expand Down
120 changes: 72 additions & 48 deletions internal/internal_workflow_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1113,30 +1113,80 @@ func (s *workflowRunSuite) TestExecuteWorkflowWithUpdate_Abort() {
}
}

func (s *workflowRunSuite) TestExecuteWorkflowWithUpdate_NonMultiOperationError() {
s.workflowServiceClient.EXPECT().
ExecuteMultiOperation(gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil, serviceerror.NewInternal("internal error")).Times(1)
func (s *workflowRunSuite) TestExecuteWorkflowWithUpdate_Errors() {
tests := []struct {
name string
returnedErr error
expectedErr string
}{
{
name: "NonMultiOperationError",
returnedErr: serviceerror.NewInternal("internal error"),
expectedErr: "internal error",
},
{
name: "CountMismatch",
returnedErr: serviceerror.NewMultiOperationExecution("Error", []error{}),
expectedErr: "invalid server response: 0 instead of 2 operation errors",
},
{
name: "NilErrors",
returnedErr: serviceerror.NewMultiOperationExecution("MultiOperation failed", []error{
nil, nil,
}),
expectedErr: "MultiOperation failed",
},
{
name: "StartOperationError",
returnedErr: serviceerror.NewMultiOperationExecution("MultiOperation failed", []error{
serviceerror.NewInvalidArgument("invalid Start"),
serviceerror.NewMultiOperationAborted("aborted Update"),
}),
expectedErr: "failed workflow start: invalid Start",
},
{
name: "UpdateOperationError_AbortedStart",
returnedErr: serviceerror.NewMultiOperationExecution("MultiOperation failed", []error{
serviceerror.NewMultiOperationAborted("aborted Start"),
serviceerror.NewInvalidArgument("invalid Update"),
}),
expectedErr: "failed workflow update: invalid Update",
},
{
name: "UpdateOperationError_SuccessfulStart",
returnedErr: serviceerror.NewMultiOperationExecution("MultiOperation failed", []error{
nil, // ie successful start
serviceerror.NewInvalidArgument("bad Update"),
}),
expectedErr: "failed workflow update: bad Update",
},
}

startOp := s.workflowClient.NewWithStartWorkflowOperation(
StartWorkflowOptions{
ID: workflowID,
WorkflowIDConflictPolicy: enumspb.WORKFLOW_ID_CONFLICT_POLICY_FAIL,
TaskQueue: taskqueue,
}, workflowType,
)
for _, tt := range tests {
s.Run(tt.name, func() {
s.workflowServiceClient.EXPECT().
ExecuteMultiOperation(gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil, tt.returnedErr).Times(1)

_, err := s.workflowClient.UpdateWithStartWorkflow(
context.Background(),
UpdateWithStartWorkflowOptions{
UpdateOptions: UpdateWorkflowOptions{
UpdateName: "update",
WaitForStage: WorkflowUpdateStageCompleted,
},
StartWorkflowOperation: startOp,
},
)
s.ErrorContains(err, "internal error")
_, err := s.workflowClient.UpdateWithStartWorkflow(
context.Background(),
UpdateWithStartWorkflowOptions{
UpdateOptions: UpdateWorkflowOptions{
UpdateName: "update",
WaitForStage: WorkflowUpdateStageCompleted,
},
StartWorkflowOperation: s.workflowClient.NewWithStartWorkflowOperation(
StartWorkflowOptions{
ID: workflowID,
WorkflowIDConflictPolicy: enumspb.WORKFLOW_ID_CONFLICT_POLICY_FAIL,
TaskQueue: taskqueue,
}, workflowType,
),
},
)
s.EqualError(err, tt.expectedErr)
})
}
}

func (s *workflowRunSuite) TestExecuteWorkflowWithUpdate_ServerResponseCountMismatch() {
Expand Down Expand Up @@ -1167,32 +1217,6 @@ func (s *workflowRunSuite) TestExecuteWorkflowWithUpdate_ServerResponseCountMism
s.ErrorContains(err, "invalid server response: 0 instead of 2 operation results")
}

func (s *workflowRunSuite) TestExecuteWorkflowWithUpdate_ServerErrorResponseCountMismatch() {
s.workflowServiceClient.EXPECT().
ExecuteMultiOperation(gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil, serviceerror.NewMultiOperationExecution("Error", []error{})).Times(1)

startOp := s.workflowClient.NewWithStartWorkflowOperation(
StartWorkflowOptions{
ID: workflowID,
WorkflowIDConflictPolicy: enumspb.WORKFLOW_ID_CONFLICT_POLICY_FAIL,
TaskQueue: taskqueue,
}, workflowType,
)

_, err := s.workflowClient.UpdateWithStartWorkflow(
context.Background(),
UpdateWithStartWorkflowOptions{
UpdateOptions: UpdateWorkflowOptions{
UpdateName: "update",
WaitForStage: WorkflowUpdateStageCompleted,
},
StartWorkflowOperation: startOp,
},
)
s.ErrorContains(err, "invalid server response: 0 instead of 2 operation errors")
}

func (s *workflowRunSuite) TestExecuteWorkflowWithUpdate_ServerStartResponseTypeMismatch() {
s.workflowServiceClient.EXPECT().
ExecuteMultiOperation(gomock.Any(), gomock.Any(), gomock.Any()).
Expand Down

0 comments on commit 027e581

Please sign in to comment.