Skip to content

Commit

Permalink
Combine parameters in a single struct
Browse files Browse the repository at this point in the history
  • Loading branch information
dandavison committed Dec 4, 2024
1 parent 2cf86f4 commit 0ca798a
Show file tree
Hide file tree
Showing 7 changed files with 183 additions and 100 deletions.
23 changes: 17 additions & 6 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,8 @@ type (
// StartWorkflowOptions configuration parameters for starting a workflow execution.
StartWorkflowOptions = internal.StartWorkflowOptions

// WithStartWorkflowOperation defines how to start a workflow when using Update-With-Start.
// See [Client.NewWithStartWorkflowOperation] for details.
// WithStartWorkflowOperation defines how to start a workflow when using update-with-start.
// See [Client.NewWithStartWorkflowOperation] and [Client.UpdateWithStartWorkflow].
// NOTE: Experimental
WithStartWorkflowOperation = internal.WithStartWorkflowOperation

Expand Down Expand Up @@ -274,6 +274,11 @@ type (
// NOTE: Experimental
UpdateWorkflowOptions = internal.UpdateWorkflowOptions

// UpdateWithStartWorkflowOptions encapsulates the parameters for update-with-start.
// See [Client.UpdateWithStartWorkflow].
// NOTE: Experimental
UpdateWithStartWorkflowOptions = internal.UpdateWithStartWorkflowOptions

// WorkflowUpdateHandle represents a running or completed workflow
// execution update and gives the holder access to the outcome of the same.
// NOTE: Experimental
Expand Down Expand Up @@ -559,8 +564,8 @@ type (
SignalWithStartWorkflow(ctx context.Context, workflowID string, signalName string, signalArg interface{},
options StartWorkflowOptions, workflow interface{}, workflowArgs ...interface{}) (WorkflowRun, error)

// NewWithStartWorkflowOperation returns a WithStartWorkflowOperation to perform Update-with-Start.
// Returns an error if the WorkflowIDConflictPolicy is not set, or if the workflow or arguments are invalid.
// NewWithStartWorkflowOperation returns a WithStartWorkflowOperation to perform update-with-start.
// See [Client.UpdateWithStartWorkflow].
// NOTE: Experimental
NewWithStartWorkflowOperation(options StartWorkflowOptions, workflow interface{}, args ...interface{}) WithStartWorkflowOperation

Expand Down Expand Up @@ -840,10 +845,16 @@ type (
// NOTE: Experimental
UpdateWorkflow(ctx context.Context, options UpdateWorkflowOptions) (WorkflowUpdateHandle, error)

// UpdateWithStartWorkflow intercepts client.Client.UpdateWithStartWorkflow.
// UpdateWithStartWorkflow issues an update-with-start request. A
// WorkflowIDConflictPolicy must be set. If the specified workflow is
// not running, then a new workflow execution is started and the update
// is sent in the first workflow task. Alternatively if the specified
// workflow is running then, if the WorkflowIDConflictPolicy is
// USE_EXISTING, the update is issued against the specified workflow,
// and if the WorkflowIDConflictPolicy is FAIL, an error is returned.
//
// NOTE: Experimental
UpdateWithStartWorkflow(ctx context.Context, options UpdateWorkflowOptions, startOperation WithStartWorkflowOperation) (WorkflowUpdateHandle, error)
UpdateWithStartWorkflow(ctx context.Context, options UpdateWithStartWorkflowOptions) (WorkflowUpdateHandle, error)

// GetWorkflowUpdateHandle creates a handle to the referenced update
// which can be polled for an outcome. Note that runID is optional and
Expand Down
17 changes: 12 additions & 5 deletions internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ type (
SignalWithStartWorkflow(ctx context.Context, workflowID string, signalName string, signalArg interface{},
options StartWorkflowOptions, workflow interface{}, workflowArgs ...interface{}) (WorkflowRun, error)

// NewWithStartWorkflowOperation returns a WithStartWorkflowOperation to perform Update-with-Start.
// NewWithStartWorkflowOperation returns a WithStartWorkflowOperation for use in UpdateWithStartWorkflow.
// NOTE: Experimental
NewWithStartWorkflowOperation(options StartWorkflowOptions, workflow interface{}, args ...interface{}) WithStartWorkflowOperation

Expand Down Expand Up @@ -397,9 +397,16 @@ type (
// NOTE: Experimental
UpdateWorkflow(ctx context.Context, options UpdateWorkflowOptions) (WorkflowUpdateHandle, error)

// UpdateWithStartWorkflow issues an update request to the specified workflow execution, starting the workflow if appropriate.
// UpdateWithStartWorkflow issues an update-with-start request. A
// WorkflowIDConflictPolicy must be set. If the specified workflow is
// not running, then a new workflow execution is started and the update
// is sent in the first workflow task. Alternatively if the specified
// workflow is running then, if the WorkflowIDConflictPolicy is
// USE_EXISTING, the update is issued against the specified workflow,
// and if the WorkflowIDConflictPolicy is FAIL, an error is returned.
//
// NOTE: Experimental
UpdateWithStartWorkflow(ctx context.Context, options UpdateWorkflowOptions, startOperation WithStartWorkflowOperation) (WorkflowUpdateHandle, error)
UpdateWithStartWorkflow(ctx context.Context, options UpdateWithStartWorkflowOptions) (WorkflowUpdateHandle, error)

// GetWorkflowUpdateHandle creates a handle to the referenced update
// which can be polled for an outcome. Note that runID is optional and
Expand Down Expand Up @@ -746,8 +753,8 @@ type (
links []*commonpb.Link
}

// WithStartWorkflowOperation defines how to start a workflow when using Update-With-Start.
// See NewWithStartWorkflowOperation for details.
// WithStartWorkflowOperation defines how to start a workflow when using update-with-start.
// See [NewWithStartWorkflowOperation] and [UpdateWithStartWorkflow].
// NOTE: Experimental
WithStartWorkflowOperation interface {
Get(ctx context.Context) (WorkflowRun, error)
Expand Down
23 changes: 15 additions & 8 deletions internal/internal_workflow_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ func (wc *WorkflowClient) SignalWithStartWorkflow(ctx context.Context, workflowI
func (wc *WorkflowClient) NewWithStartWorkflowOperation(options StartWorkflowOptions, workflow interface{}, args ...interface{}) WithStartWorkflowOperation {
op := &WithStartWorkflowOperationImpl{doneCh: make(chan struct{})}
if options.WorkflowIDConflictPolicy == enumspb.WORKFLOW_ID_CONFLICT_POLICY_UNSPECIFIED {
op.err = errors.New("WorkflowIDConflictPolicy must be set in StartWorkflowOptions for Update-With-Start")
op.err = errors.New("WorkflowIDConflictPolicy must be set in StartWorkflowOptions for update-with-start")
return op
}
input, err := createStartWorkflowInput(options, workflow, args, wc.registry)
Expand Down Expand Up @@ -803,6 +803,14 @@ type UpdateWorkflowOptions struct {
FirstExecutionRunID string
}

// UpdateWithStartWorkflowOptions encapsulates the parameters for update-with-start.
// See [UpdateWithStartWorkflow].
// NOTE: Experimental
type UpdateWithStartWorkflowOptions struct {
StartOperation WithStartWorkflowOperation
UpdateOptions UpdateWorkflowOptions
}

// WorkflowUpdateHandle is a handle to a workflow execution update process. The
// update may or may not have completed so an instance of this type functions
// similar to a Future with respect to the outcome of the update. If the update
Expand Down Expand Up @@ -1180,10 +1188,9 @@ func (wc *WorkflowClient) UpdateWorkflow(

func (wc *WorkflowClient) UpdateWithStartWorkflow(
ctx context.Context,
updateOptions UpdateWorkflowOptions,
startOperation WithStartWorkflowOperation,
options UpdateWithStartWorkflowOptions,
) (WorkflowUpdateHandle, error) {
startOp, ok := startOperation.(*WithStartWorkflowOperationImpl)
startOp, ok := options.StartOperation.(*WithStartWorkflowOperationImpl)
if !ok {
panic("startOperation must be created by NewWithStartWorkflowOperation")
}
Expand All @@ -1193,21 +1200,21 @@ func (wc *WorkflowClient) UpdateWithStartWorkflow(
if err := wc.ensureInitialized(ctx); err != nil {
return nil, err
}
if updateOptions.RunID != "" {
if options.UpdateOptions.RunID != "" {
return nil, errors.New("invalid UpdateWorkflowOptions: RunID cannot be set for UpdateWithStartWorkflow because the workflow might not be running")
}
if updateOptions.FirstExecutionRunID != "" {
if options.UpdateOptions.FirstExecutionRunID != "" {
return nil, errors.New("invalid UpdateWorkflowOptions: FirstExecutionRunID cannot be set for UpdateWithStartWorkflow because the workflow might not be running")
}

updateInput, err := createUpdateWorkflowInput(updateOptions)
updateInput, err := createUpdateWorkflowInput(options.UpdateOptions)
if err != nil {
return nil, err
}

ctx = contextWithNewHeader(ctx)

return wc.interceptor.UpdateWithStartWorkflow(ctx, updateInput, startOperation)
return wc.interceptor.UpdateWithStartWorkflow(ctx, updateInput, startOp)
}

// CheckHealthRequest is a request for Client.CheckHealth.
Expand Down
77 changes: 49 additions & 28 deletions internal/internal_workflow_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1020,10 +1020,13 @@ func (s *workflowRunSuite) TestExecuteWorkflowWithUpdate_Retry() {

_, err := s.workflowClient.UpdateWithStartWorkflow(
context.Background(),
UpdateWorkflowOptions{
UpdateName: "update",
WaitForStage: WorkflowUpdateStageCompleted,
}, startOp,
UpdateWithStartWorkflowOptions{
UpdateOptions: UpdateWorkflowOptions{
UpdateName: "update",
WaitForStage: WorkflowUpdateStageCompleted,
},
StartOperation: startOp,
},
)
s.NoError(err)
}
Expand Down Expand Up @@ -1094,10 +1097,13 @@ func (s *workflowRunSuite) TestExecuteWorkflowWithUpdate_Abort() {

_, err := s.workflowClient.UpdateWithStartWorkflow(
ctxWithTimeout,
UpdateWorkflowOptions{
UpdateName: "update",
WaitForStage: WorkflowUpdateStageCompleted,
}, startOp,
UpdateWithStartWorkflowOptions{
UpdateOptions: UpdateWorkflowOptions{
UpdateName: "update",
WaitForStage: WorkflowUpdateStageCompleted,
},
StartOperation: startOp,
},
)

var expectedErr *WorkflowUpdateServiceTimeoutOrCanceledError
Expand All @@ -1122,10 +1128,13 @@ func (s *workflowRunSuite) TestExecuteWorkflowWithUpdate_NonMultiOperationError(

_, err := s.workflowClient.UpdateWithStartWorkflow(
context.Background(),
UpdateWorkflowOptions{
UpdateName: "update",
WaitForStage: WorkflowUpdateStageCompleted,
}, startOp,
UpdateWithStartWorkflowOptions{
UpdateOptions: UpdateWorkflowOptions{
UpdateName: "update",
WaitForStage: WorkflowUpdateStageCompleted,
},
StartOperation: startOp,
},
)
s.ErrorContains(err, "internal error")
}
Expand All @@ -1147,10 +1156,13 @@ func (s *workflowRunSuite) TestExecuteWorkflowWithUpdate_ServerResponseCountMism

_, err := s.workflowClient.UpdateWithStartWorkflow(
context.Background(),
UpdateWorkflowOptions{
UpdateName: "update",
WaitForStage: WorkflowUpdateStageCompleted,
}, startOp,
UpdateWithStartWorkflowOptions{
UpdateOptions: UpdateWorkflowOptions{
UpdateName: "update",
WaitForStage: WorkflowUpdateStageCompleted,
},
StartOperation: startOp,
},
)
s.ErrorContains(err, "invalid server response: 0 instead of 2 operation results")
}
Expand All @@ -1170,10 +1182,13 @@ func (s *workflowRunSuite) TestExecuteWorkflowWithUpdate_ServerErrorResponseCoun

_, err := s.workflowClient.UpdateWithStartWorkflow(
context.Background(),
UpdateWorkflowOptions{
UpdateName: "update",
WaitForStage: WorkflowUpdateStageCompleted,
}, startOp,
UpdateWithStartWorkflowOptions{
UpdateOptions: UpdateWorkflowOptions{
UpdateName: "update",
WaitForStage: WorkflowUpdateStageCompleted,
},
StartOperation: startOp,
},
)
s.ErrorContains(err, "invalid server response: 0 instead of 2 operation errors")
}
Expand All @@ -1200,10 +1215,13 @@ func (s *workflowRunSuite) TestExecuteWorkflowWithUpdate_ServerStartResponseType

_, err := s.workflowClient.UpdateWithStartWorkflow(
context.Background(),
UpdateWorkflowOptions{
UpdateName: "update",
WaitForStage: WorkflowUpdateStageCompleted,
}, startOp,
UpdateWithStartWorkflowOptions{
UpdateOptions: UpdateWorkflowOptions{
UpdateName: "update",
WaitForStage: WorkflowUpdateStageCompleted,
},
StartOperation: startOp,
},
)
s.ErrorContains(err, "invalid server response: StartWorkflow response has the wrong type *workflowservice.ExecuteMultiOperationResponse_Response_UpdateWorkflow")
}
Expand Down Expand Up @@ -1236,10 +1254,13 @@ func (s *workflowRunSuite) TestExecuteWorkflowWithUpdate_ServerUpdateResponseTyp

_, err := s.workflowClient.UpdateWithStartWorkflow(
context.Background(),
UpdateWorkflowOptions{
UpdateName: "update",
WaitForStage: WorkflowUpdateStageCompleted,
}, startOp,
UpdateWithStartWorkflowOptions{
UpdateOptions: UpdateWorkflowOptions{
UpdateName: "update",
WaitForStage: WorkflowUpdateStageCompleted,
},
StartOperation: startOp,
},
)
s.ErrorContains(err, "invalid server response: UpdateWorkflow response has the wrong type *workflowservice.ExecuteMultiOperationResponse_Response_StartWorkflow")
}
Expand Down
2 changes: 1 addition & 1 deletion internal/nexus_operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ func (t *testSuiteClientForNexusOperations) UpdateWorkflow(ctx context.Context,
}

// UpdateWithStartWorkflow implements Client.
func (t *testSuiteClientForNexusOperations) UpdateWithStartWorkflow(ctx context.Context, options UpdateWorkflowOptions, startOperation WithStartWorkflowOperation) (WorkflowUpdateHandle, error) {
func (t *testSuiteClientForNexusOperations) UpdateWithStartWorkflow(ctx context.Context, options UpdateWithStartWorkflowOptions) (WorkflowUpdateHandle, error) {
panic("unimplemented in the test environment")
}

Expand Down
18 changes: 9 additions & 9 deletions mocks/Client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 0ca798a

Please sign in to comment.