From 9d59447f6e745714c4e512cd12fd15cae56b00f4 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Thu, 5 Dec 2024 20:29:44 -0500 Subject: [PATCH] New Update-With-Start API (#1731) Instead of using the ExecuteWorkflow client method, update-with-start is invoked via a new client method UpdateWithStartWorkflow. To use this method, first use NewWithStartWorkflowOperation to define the start-workflow operation. A workflow ID conflict policy is required. Then call UpdateWithStartWorkflow, passing it an UpdateWithStartWorkflowOptions containing your WithStartWorkflowOperation, together with an UpdateWorkflowOptions defining the update operation. This will return an UpdateHandle. The WithStartWorkflowOperation exposes a blocking .Get(ctx) method to obtain the workflow run targeted by the update. --- client/client.go | 41 +-- interceptor/interceptor.go | 4 + interceptor/tracing_interceptor.go | 27 ++ internal/client.go | 80 +++--- internal/interceptor.go | 10 + internal/interceptor_base.go | 7 + internal/internal_workflow_client.go | 306 ++++++++++++++------- internal/internal_workflow_client_test.go | 225 ++++++++-------- internal/nexus_operations.go | 9 + mocks/Client.go | 54 +++- test/integration_test.go | 312 +++++++++++++--------- 11 files changed, 679 insertions(+), 396 deletions(-) diff --git a/client/client.go b/client/client.go index e7eaf621f..95474689a 100644 --- a/client/client.go +++ b/client/client.go @@ -162,16 +162,11 @@ type ( // StartWorkflowOptions configuration parameters for starting a workflow execution. StartWorkflowOptions = internal.StartWorkflowOptions - // WithStartWorkflowOperation is a type of operation that can be executed as part of a workflow start. - // For example, use NewUpdateWithStartWorkflowOperation to perform Update-with-Start. + // WithStartWorkflowOperation defines how to start a workflow when using UpdateWithStartWorkflow. + // See [Client.NewWithStartWorkflowOperation] and [Client.UpdateWithStartWorkflow]. // NOTE: Experimental WithStartWorkflowOperation = internal.WithStartWorkflowOperation - // UpdateWithStartWorkflowOperation is used to perform Update-with-Start. - // See NewUpdateWithStartWorkflowOperation for details. - // NOTE: Experimental - UpdateWithStartWorkflowOperation = internal.UpdateWithStartWorkflowOperation - // HistoryEventIterator is a iterator which can return history events. HistoryEventIterator = internal.HistoryEventIterator @@ -279,6 +274,11 @@ type ( // NOTE: Experimental UpdateWorkflowOptions = internal.UpdateWorkflowOptions + // UpdateWithStartWorkflowOptions encapsulates the parameters used by UpdateWithStartWorkflow. + // See [Client.UpdateWithStartWorkflow] and [Client.NewWithStartWorkflowOperation]. + // NOTE: Experimental + UpdateWithStartWorkflowOptions = internal.UpdateWithStartWorkflowOptions + // WorkflowUpdateHandle represents a running or completed workflow // execution update and gives the holder access to the outcome of the same. // NOTE: Experimental @@ -564,6 +564,11 @@ type ( SignalWithStartWorkflow(ctx context.Context, workflowID string, signalName string, signalArg interface{}, options StartWorkflowOptions, workflow interface{}, workflowArgs ...interface{}) (WorkflowRun, error) + // NewWithStartWorkflowOperation returns a WithStartWorkflowOperation for use with UpdateWithStartWorkflow. + // See [Client.UpdateWithStartWorkflow]. + // NOTE: Experimental + NewWithStartWorkflowOperation(options StartWorkflowOptions, workflow interface{}, args ...interface{}) WithStartWorkflowOperation + // CancelWorkflow request cancellation of a workflow in execution. Cancellation request closes the channel // returned by the workflow.Context.Done() of the workflow that is target of the request. // - workflow ID of the workflow. @@ -840,6 +845,20 @@ type ( // NOTE: Experimental UpdateWorkflow(ctx context.Context, options UpdateWorkflowOptions) (WorkflowUpdateHandle, error) + // UpdateWithStartWorkflow issues an update-with-start request. A + // WorkflowIDConflictPolicy must be set in the options. If the specified + // workflow execution is not running, then a new workflow execution is + // started and the update is sent in the first workflow task. + // Alternatively if the specified workflow execution is running then, if + // the WorkflowIDConflictPolicy is USE_EXISTING, the update is issued + // against the specified workflow, and if the WorkflowIDConflictPolicy + // is FAIL, an error is returned. The call will block until the update + // has reached the WaitForStage in the options. Note that this means + // that the call will not return successfully until the update has been + // delivered to a worker. + // NOTE: Experimental + UpdateWithStartWorkflow(ctx context.Context, options UpdateWithStartWorkflowOptions) (WorkflowUpdateHandle, error) + // GetWorkflowUpdateHandle creates a handle to the referenced update // which can be polled for an outcome. Note that runID is optional and // if not specified the most recent runID will be used. @@ -934,14 +953,6 @@ type MetricsTimer = metrics.Timer // MetricsNopHandler is a noop handler that does nothing with the metrics. var MetricsNopHandler = metrics.NopHandler -// NewUpdateWithStartWorkflowOperation returns an UpdateWithStartWorkflowOperation to perform Update-with-Start. -// After executing Client.ExecuteWorkflow with the UpdateWithStartWorkflow in the start options, -// the update result can be obtained. -// NOTE: Experimental -func NewUpdateWithStartWorkflowOperation(options UpdateWorkflowOptions) *UpdateWithStartWorkflowOperation { - return internal.NewUpdateWithStartWorkflowOperation(options) -} - // Dial creates an instance of a workflow client. This will attempt to connect // to the server eagerly and will return an error if the server is not // available. diff --git a/interceptor/interceptor.go b/interceptor/interceptor.go index aa269ca5c..6d289b7fa 100644 --- a/interceptor/interceptor.go +++ b/interceptor/interceptor.go @@ -220,6 +220,10 @@ type ScheduleClientCreateInput = internal.ScheduleClientCreateInput // ClientOutoundInterceptor.UpdateWorkflow. type ClientUpdateWorkflowInput = internal.ClientUpdateWorkflowInput +// ClientUpdateWithStartWorkflowInput is input for +// ClientOutboundInterceptor.UpdateWithStartWorkflow. +type ClientUpdateWithStartWorkflowInput = internal.ClientUpdateWithStartWorkflowInput + // Header provides Temporal header information from the context for reading or // writing during specific interceptor calls. // diff --git a/interceptor/tracing_interceptor.go b/interceptor/tracing_interceptor.go index 4f3a1e33d..cc6c7bdc1 100644 --- a/interceptor/tracing_interceptor.go +++ b/interceptor/tracing_interceptor.go @@ -379,6 +379,33 @@ func (t *tracingClientOutboundInterceptor) UpdateWorkflow( return val, err } +func (t *tracingClientOutboundInterceptor) UpdateWithStartWorkflow( + ctx context.Context, + in *ClientUpdateWithStartWorkflowInput, +) (client.WorkflowUpdateHandle, error) { + // Only add tracing if enabled + if t.root.options.DisableUpdateTracing { + return t.Next.UpdateWithStartWorkflow(ctx, in) + } + // Start span and write to header + span, ctx, err := t.root.startSpanFromContext(ctx, &TracerStartSpanOptions{ + Operation: "UpdateWithStartWorkflow", + Name: in.UpdateOptions.UpdateName, + Tags: map[string]string{workflowIDTagKey: in.UpdateOptions.WorkflowID, updateIDTagKey: in.UpdateOptions.UpdateID}, + ToHeader: true, + Time: time.Now(), + }) + if err != nil { + return nil, err + } + var finishOpts TracerFinishSpanOptions + defer span.Finish(&finishOpts) + + val, err := t.Next.UpdateWithStartWorkflow(ctx, in) + finishOpts.Error = err + return val, err +} + type tracingActivityOutboundInterceptor struct { ActivityOutboundInterceptorBase root *tracingInterceptor diff --git a/internal/client.go b/internal/client.go index 99f9d390e..1564c282f 100644 --- a/internal/client.go +++ b/internal/client.go @@ -27,7 +27,6 @@ package internal import ( "context" "crypto/tls" - "errors" "fmt" "sync/atomic" "time" @@ -135,6 +134,10 @@ type ( SignalWithStartWorkflow(ctx context.Context, workflowID string, signalName string, signalArg interface{}, options StartWorkflowOptions, workflow interface{}, workflowArgs ...interface{}) (WorkflowRun, error) + // NewWithStartWorkflowOperation returns a WithStartWorkflowOperation for use in UpdateWithStartWorkflow. + // NOTE: Experimental + NewWithStartWorkflowOperation(options StartWorkflowOptions, workflow interface{}, args ...interface{}) WithStartWorkflowOperation + // CancelWorkflow cancels a workflow in execution // - workflow ID of the workflow. // - runID can be default(empty string). if empty string then it will pick the running execution of that workflow ID. @@ -394,6 +397,17 @@ type ( // NOTE: Experimental UpdateWorkflow(ctx context.Context, options UpdateWorkflowOptions) (WorkflowUpdateHandle, error) + // UpdateWithStartWorkflow issues an update-with-start request. A + // WorkflowIDConflictPolicy must be set. If the specified workflow is + // not running, then a new workflow execution is started and the update + // is sent in the first workflow task. Alternatively if the specified + // workflow is running then, if the WorkflowIDConflictPolicy is + // USE_EXISTING, the update is issued against the specified workflow, + // and if the WorkflowIDConflictPolicy is FAIL, an error is returned. + // + // NOTE: Experimental + UpdateWithStartWorkflow(ctx context.Context, options UpdateWithStartWorkflowOptions) (WorkflowUpdateHandle, error) + // GetWorkflowUpdateHandle creates a handle to the referenced update // which can be polled for an outcome. Note that runID is optional and // if not specified the most recent runID will be used. @@ -647,18 +661,6 @@ type ( // Optional: defaulted to Fail. WorkflowIDConflictPolicy enumspb.WorkflowIdConflictPolicy - // WithStartOperation - Operation to execute with Workflow Start. - // For example, see NewUpdateWithStartWorkflowOperation to perform Update-with-Start. Note that if the workflow is - // already running and WorkflowIDConflictPolicy is set to UseExisting, the start is skipped and only the - // operation is executed. If instead the policy is set to Fail (the default), nothing is executed and - // an error will be returned (i.e. the option WorkflowExecutionErrorWhenAlreadyStarted is ignored). - // This option will be ignored when used with Client.SignalWithStartWorkflow. - // - // Optional: defaults to nil. - // - // NOTE: Experimental - WithStartOperation WithStartWorkflowOperation - // When WorkflowExecutionErrorWhenAlreadyStarted is true, Client.ExecuteWorkflow will return an error if the // workflow id has already been used and WorkflowIDReusePolicy or WorkflowIDConflictPolicy would // disallow a re-run. If it is set to false, rather than erroring a WorkflowRun instance representing @@ -751,22 +753,24 @@ type ( links []*commonpb.Link } - // WithStartWorkflowOperation is a type of operation that can be executed as part of a workflow start. + // WithStartWorkflowOperation defines how to start a workflow when using UpdateWithStartWorkflow. + // See [NewWithStartWorkflowOperation] and [UpdateWithStartWorkflow]. + // NOTE: Experimental WithStartWorkflowOperation interface { - isWithStartWorkflowOperation() + // Get returns the WorkflowRun that was targeted by the UpdateWithStartWorkflow call. + // This is a blocking API. + Get(ctx context.Context) (WorkflowRun, error) } - // UpdateWithStartWorkflowOperation is used to perform Update-with-Start. - // See NewUpdateWithStartWorkflowOperation for details. - UpdateWithStartWorkflowOperation struct { - input *ClientUpdateWorkflowInput + withStartWorkflowOperationImpl struct { + input *ClientExecuteWorkflowInput // flag to ensure the operation is only executed once executed atomic.Bool // channel to indicate that handle or err is available doneCh chan struct{} - // handle and err cannot be accessed before doneCh is closed - handle WorkflowUpdateHandle - err error + // workflowRun and err cannot be accessed before doneCh is closed + workflowRun WorkflowRun + err error } // RetryPolicy defines the retry policy. @@ -1059,30 +1063,10 @@ func DialCloudOperationsClient(ctx context.Context, options CloudOperationsClien }, nil } -// NewUpdateWithStartWorkflowOperation returns an UpdateWithStartWorkflowOperation that can be used to perform Update-with-Start. -func NewUpdateWithStartWorkflowOperation(options UpdateWorkflowOptions) *UpdateWithStartWorkflowOperation { - res := &UpdateWithStartWorkflowOperation{doneCh: make(chan struct{})} - - input, err := createUpdateWorkflowInput(options) - if err != nil { - res.set(nil, err) - } else if options.RunID != "" { - res.set(nil, errors.New("RunID cannot be set because the workflow might not be running")) - } - if options.FirstExecutionRunID != "" { - res.set(nil, errors.New("FirstExecutionRunID cannot be set because the workflow might not be running")) - } else { - res.input = input - } - - return res -} - -// Get blocks until a server response has been received; or the context deadline is exceeded. -func (op *UpdateWithStartWorkflowOperation) Get(ctx context.Context) (WorkflowUpdateHandle, error) { +func (op *withStartWorkflowOperationImpl) Get(ctx context.Context) (WorkflowRun, error) { select { case <-op.doneCh: - return op.handle, op.err + return op.workflowRun, op.err case <-ctx.Done(): if !op.executed.Load() { return nil, fmt.Errorf("%w: %w", ctx.Err(), fmt.Errorf("operation was not executed")) @@ -1091,21 +1075,19 @@ func (op *UpdateWithStartWorkflowOperation) Get(ctx context.Context) (WorkflowUp } } -func (op *UpdateWithStartWorkflowOperation) markExecuted() error { +func (op *withStartWorkflowOperationImpl) markExecuted() error { if op.executed.Swap(true) { return fmt.Errorf("was already executed") } return nil } -func (op *UpdateWithStartWorkflowOperation) set(handle WorkflowUpdateHandle, err error) { - op.handle = handle +func (op *withStartWorkflowOperationImpl) set(workflowRun WorkflowRun, err error) { + op.workflowRun = workflowRun op.err = err close(op.doneCh) } -func (op *UpdateWithStartWorkflowOperation) isWithStartWorkflowOperation() {} - // NewNamespaceClient creates an instance of a namespace client, to manager lifecycle of namespaces. func NewNamespaceClient(options ClientOptions) (NamespaceClient, error) { // Initialize root tags diff --git a/internal/interceptor.go b/internal/interceptor.go index 41d3fb0c2..914cfd764 100644 --- a/internal/interceptor.go +++ b/internal/interceptor.go @@ -393,6 +393,11 @@ type ClientOutboundInterceptor interface { // NOTE: Experimental UpdateWorkflow(context.Context, *ClientUpdateWorkflowInput) (WorkflowUpdateHandle, error) + // UpdateWithStartWorkflow intercepts client.Client.UpdateWithStartWorkflow. + // + // NOTE: Experimental + UpdateWithStartWorkflow(context.Context, *ClientUpdateWithStartWorkflowInput) (WorkflowUpdateHandle, error) + // PollWorkflowUpdate requests the outcome of a specific update from the // server. // @@ -416,6 +421,11 @@ type ClientUpdateWorkflowInput struct { WaitForStage WorkflowUpdateStage } +type ClientUpdateWithStartWorkflowInput struct { + UpdateOptions *UpdateWorkflowOptions + StartWorkflowOperation WithStartWorkflowOperation +} + // ClientPollWorkflowUpdateInput is the input to // ClientOutboundInterceptor.PollWorkflowUpdate. type ClientPollWorkflowUpdateInput struct { diff --git a/internal/interceptor_base.go b/internal/interceptor_base.go index 7ce4758ed..ff00e89e6 100644 --- a/internal/interceptor_base.go +++ b/internal/interceptor_base.go @@ -485,6 +485,13 @@ func (c *ClientOutboundInterceptorBase) PollWorkflowUpdate( return c.Next.PollWorkflowUpdate(ctx, in) } +func (c *ClientOutboundInterceptorBase) UpdateWithStartWorkflow( + ctx context.Context, + in *ClientUpdateWithStartWorkflowInput, +) (WorkflowUpdateHandle, error) { + return c.Next.UpdateWithStartWorkflow(ctx, in) +} + // ExecuteWorkflow implements ClientOutboundInterceptor.ExecuteWorkflow. func (c *ClientOutboundInterceptorBase) ExecuteWorkflow( ctx context.Context, diff --git a/internal/internal_workflow_client.go b/internal/internal_workflow_client.go index 2afc3065f..aa924a79f 100644 --- a/internal/internal_workflow_client.go +++ b/internal/internal_workflow_client.go @@ -69,9 +69,9 @@ var ( ) var ( - errUnsupportedOperation = fmt.Errorf("unsupported operation") - errInvalidServerResponse = fmt.Errorf("invalid server response") - errInvalidWorkflowOperation = fmt.Errorf("invalid WithStartOperation") + errUnsupportedOperation = fmt.Errorf("unsupported operation") + errInvalidServerResponse = fmt.Errorf("invalid server response") + errInvalidWithStartWorkflowOperation = fmt.Errorf("invalid WithStartWorkflowOperation") ) const ( @@ -237,29 +237,16 @@ func (wc *WorkflowClient) ExecuteWorkflow(ctx context.Context, options StartWork return nil, err } - // Default workflow ID - if options.ID == "" { - options.ID = uuid.New() - } + // Set header before interceptor run + ctx = contextWithNewHeader(ctx) - // Validate function and get name - if err := validateFunctionArgs(workflow, args, true); err != nil { - return nil, err - } - workflowType, err := getWorkflowFunctionName(wc.registry, workflow) + in, err := createStartWorkflowInput(options, workflow, args, wc.registry) if err != nil { return nil, err } - // Set header before interceptor run - ctx = contextWithNewHeader(ctx) - // Run via interceptor - return wc.interceptor.ExecuteWorkflow(ctx, &ClientExecuteWorkflowInput{ - Options: &options, - WorkflowType: workflowType, - Args: args, - }) + return wc.interceptor.ExecuteWorkflow(ctx, in) } // GetWorkflow gets a workflow execution and returns a WorkflowRun that will allow you to wait until this workflow @@ -338,9 +325,6 @@ func (wc *WorkflowClient) SignalWithStartWorkflow(ctx context.Context, workflowI if options.ID != "" && options.ID != workflowID { return nil, fmt.Errorf("workflow ID from options not used, must be unset or match workflow ID parameter") } - if options.WithStartOperation != nil { - return nil, fmt.Errorf("option WithStartOperation is not allowed") - } // Default workflow ID to UUID options.ID = workflowID @@ -370,6 +354,20 @@ func (wc *WorkflowClient) SignalWithStartWorkflow(ctx context.Context, workflowI }) } +func (wc *WorkflowClient) NewWithStartWorkflowOperation(options StartWorkflowOptions, workflow interface{}, args ...interface{}) WithStartWorkflowOperation { + op := &withStartWorkflowOperationImpl{doneCh: make(chan struct{})} + if options.WorkflowIDConflictPolicy == enumspb.WORKFLOW_ID_CONFLICT_POLICY_UNSPECIFIED { + op.err = errors.New("WorkflowIDConflictPolicy must be set in StartWorkflowOptions for update-with-start") + return op + } + input, err := createStartWorkflowInput(options, workflow, args, wc.registry) + if err != nil { + op.err = err + } + op.input = input + return op +} + // CancelWorkflow cancels a workflow in execution. It allows workflow to properly clean up and gracefully close. // workflowID is required, other parameters are optional. // If runID is omit, it will terminate currently running workflow (if there is one) based on the workflowID. @@ -805,6 +803,14 @@ type UpdateWorkflowOptions struct { FirstExecutionRunID string } +// UpdateWithStartWorkflowOptions encapsulates the parameters used by UpdateWithStartWorkflow. +// See UpdateWithStartWorkflow and NewWithStartWorkflowOperation. +// NOTE: Experimental +type UpdateWithStartWorkflowOptions struct { + StartWorkflowOperation WithStartWorkflowOperation + UpdateOptions UpdateWorkflowOptions +} + // WorkflowUpdateHandle is a handle to a workflow execution update process. The // update may or may not have completed so an instance of this type functions // similar to a Future with respect to the outcome of the update. If the update @@ -1170,7 +1176,7 @@ func (wc *WorkflowClient) UpdateWorkflow( return nil, err } - in, err := createUpdateWorkflowInput(options) + in, err := createUpdateWorkflowInput(&options) if err != nil { return nil, err } @@ -1180,6 +1186,35 @@ func (wc *WorkflowClient) UpdateWorkflow( return wc.interceptor.UpdateWorkflow(ctx, in) } +func (wc *WorkflowClient) UpdateWithStartWorkflow( + ctx context.Context, + options UpdateWithStartWorkflowOptions, +) (WorkflowUpdateHandle, error) { + startOp, ok := options.StartWorkflowOperation.(*withStartWorkflowOperationImpl) + if !ok { + return nil, fmt.Errorf("%w: startOperation must be created by NewWithStartWorkflowOperation", errInvalidWithStartWorkflowOperation) + } + if startOp.err != nil { + return nil, startOp.err + } + if err := wc.ensureInitialized(ctx); err != nil { + return nil, err + } + if options.UpdateOptions.RunID != "" { + return nil, errors.New("invalid UpdateWorkflowOptions: RunID cannot be set for UpdateWithStartWorkflow because the workflow might not be running") + } + if options.UpdateOptions.FirstExecutionRunID != "" { + return nil, errors.New("invalid UpdateWorkflowOptions: FirstExecutionRunID cannot be set for UpdateWithStartWorkflow because the workflow might not be running") + } + + ctx = contextWithNewHeader(ctx) + + return wc.interceptor.UpdateWithStartWorkflow(ctx, &ClientUpdateWithStartWorkflowInput{ + UpdateOptions: &options.UpdateOptions, + StartWorkflowOperation: startOp, + }) +} + // CheckHealthRequest is a request for Client.CheckHealth. type CheckHealthRequest struct{} @@ -1534,10 +1569,33 @@ type workflowClientInterceptor struct { client *WorkflowClient } -func (w *workflowClientInterceptor) ExecuteWorkflow( +func createStartWorkflowInput( + options StartWorkflowOptions, + workflow interface{}, + args []interface{}, + registry *registry, +) (*ClientExecuteWorkflowInput, error) { + if options.ID == "" { + options.ID = uuid.New() + } + if err := validateFunctionArgs(workflow, args, true); err != nil { + return nil, err + } + workflowType, err := getWorkflowFunctionName(registry, workflow) + if err != nil { + return nil, err + } + return &ClientExecuteWorkflowInput{ + Options: &options, + WorkflowType: workflowType, + Args: args, + }, nil +} + +func (w *workflowClientInterceptor) createStartWorkflowRequest( ctx context.Context, in *ClientExecuteWorkflowInput, -) (WorkflowRun, error) { +) (*workflowservice.StartWorkflowExecutionRequest, error) { // This is always set before interceptor is invoked workflowID := in.Options.ID if workflowID == "" { @@ -1608,44 +1666,49 @@ func (w *workflowClientInterceptor) ExecuteWorkflow( startRequest.RequestId = uuid.New() } + if in.Options.StartDelay != 0 { + startRequest.WorkflowStartDelay = durationpb.New(in.Options.StartDelay) + } + + return startRequest, nil +} + +func (w *workflowClientInterceptor) ExecuteWorkflow( + ctx context.Context, + in *ClientExecuteWorkflowInput, +) (WorkflowRun, error) { + startRequest, err := w.createStartWorkflowRequest(ctx, in) + if err != nil { + return nil, err + } + workflowID := startRequest.WorkflowId + var eagerExecutor *eagerWorkflowExecutor if in.Options.EnableEagerStart && w.client.capabilities.GetEagerWorkflowStart() && w.client.eagerDispatcher != nil { eagerExecutor = w.client.eagerDispatcher.applyToRequest(startRequest) } - if in.Options.StartDelay != 0 { - startRequest.WorkflowStartDelay = durationpb.New(in.Options.StartDelay) - } - grpcCtx, cancel := newGRPCContext(ctx, grpcMetricsHandler( w.client.metricsHandler.WithTags(metrics.RPCTags(in.WorkflowType, metrics.NoneTagValue, in.Options.TaskQueue))), defaultGrpcRetryParameters(ctx)) defer cancel() var runID string - if in.Options.WithStartOperation == nil { - response, err := w.client.workflowService.StartWorkflowExecution(grpcCtx, startRequest) - - eagerWorkflowTask := response.GetEagerWorkflowTask() - if eagerWorkflowTask != nil && eagerExecutor != nil { - eagerExecutor.handleResponse(eagerWorkflowTask) - } else if eagerExecutor != nil { - eagerExecutor.releaseUnused() - } + response, err := w.client.workflowService.StartWorkflowExecution(grpcCtx, startRequest) - // Allow already-started error - if e, ok := err.(*serviceerror.WorkflowExecutionAlreadyStarted); ok && !in.Options.WorkflowExecutionErrorWhenAlreadyStarted { - runID = e.RunId - } else if err != nil { - return nil, err - } else { - runID = response.RunId - } + eagerWorkflowTask := response.GetEagerWorkflowTask() + if eagerWorkflowTask != nil && eagerExecutor != nil { + eagerExecutor.handleResponse(eagerWorkflowTask) + } else if eagerExecutor != nil { + eagerExecutor.releaseUnused() + } + + // Allow already-started error + if e, ok := err.(*serviceerror.WorkflowExecutionAlreadyStarted); ok && !in.Options.WorkflowExecutionErrorWhenAlreadyStarted { + runID = e.RunId + } else if err != nil { + return nil, err } else { - response, err := w.executeWorkflowWithOperation(grpcCtx, startRequest, in.Options.WithStartOperation) - if err != nil { - return nil, err - } runID = response.RunId } @@ -1669,55 +1732,109 @@ func (w *workflowClientInterceptor) ExecuteWorkflow( }, nil } -func (w *workflowClientInterceptor) executeWorkflowWithOperation( +func (w *workflowClientInterceptor) UpdateWithStartWorkflow( ctx context.Context, - startRequest *workflowservice.StartWorkflowExecutionRequest, - operation WithStartWorkflowOperation, -) (*workflowservice.StartWorkflowExecutionResponse, error) { - startOp := &workflowservice.ExecuteMultiOperationRequest_Operation{ - Operation: &workflowservice.ExecuteMultiOperationRequest_Operation_StartWorkflow{ - StartWorkflow: startRequest, - }, + in *ClientUpdateWithStartWorkflowInput, +) (WorkflowUpdateHandle, error) { + startOp, ok := in.StartWorkflowOperation.(*withStartWorkflowOperationImpl) + if !ok { + return nil, fmt.Errorf("%w: startOperation must be created by NewWithStartWorkflowOperation", errInvalidWithStartWorkflowOperation) + } + if startOp.err != nil { + return nil, fmt.Errorf("%w: %w", errInvalidWithStartWorkflowOperation, startOp.err) } - var withStartOp *workflowservice.ExecuteMultiOperationRequest_Operation - switch t := operation.(type) { - case *UpdateWithStartWorkflowOperation: - if err := t.markExecuted(); err != nil { - return nil, fmt.Errorf("%w: %w", errInvalidWorkflowOperation, err) - } + // Create start request + if err := startOp.markExecuted(); err != nil { + return nil, fmt.Errorf("%w: %w", errInvalidWithStartWorkflowOperation, err) + } + startReq, err := w.createStartWorkflowRequest(ctx, startOp.input) + if err != nil { + return nil, err + } - if t.err != nil { - return nil, fmt.Errorf("%w: %w", errInvalidWorkflowOperation, t.err) - } + updateInput, err := createUpdateWorkflowInput(in.UpdateOptions) + if err != nil { + return nil, err + } - updateReq, err := w.createUpdateWorkflowRequest(ctx, t.input) - if err != nil { - return nil, fmt.Errorf("%w: %w", errInvalidWorkflowOperation, err) - } - if updateReq.WorkflowExecution.WorkflowId == "" { - updateReq.WorkflowExecution.WorkflowId = startRequest.WorkflowId - } + // Create update request + updateReq, err := w.createUpdateWorkflowRequest(ctx, updateInput) + if err != nil { + return nil, fmt.Errorf("%w: %w", errInvalidWithStartWorkflowOperation, err) + } + if updateReq.WorkflowExecution.WorkflowId == "" { + updateReq.WorkflowExecution.WorkflowId = startReq.WorkflowId + } - withStartOp = &workflowservice.ExecuteMultiOperationRequest_Operation{ - Operation: &workflowservice.ExecuteMultiOperationRequest_Operation_UpdateWorkflow{ - UpdateWorkflow: updateReq, - }, - } - default: - return nil, fmt.Errorf("%w: %T", errUnsupportedOperation, t) + grpcCtx, cancel := newGRPCContext( + ctx, + grpcMetricsHandler(w.client.metricsHandler.WithTags( + metrics.RPCTags(startOp.input.WorkflowType, metrics.NoneTagValue, startOp.input.Options.TaskQueue))), + defaultGrpcRetryParameters(ctx)) + defer cancel() + + iterFn := func(fnCtx context.Context, fnRunID string) HistoryEventIterator { + metricsHandler := w.client.metricsHandler.WithTags(metrics.RPCTags(startOp.input.WorkflowType, + metrics.NoneTagValue, startOp.input.Options.TaskQueue)) + return w.client.getWorkflowHistory(fnCtx, startOp.input.Options.ID, fnRunID, true, + enumspb.HISTORY_EVENT_FILTER_TYPE_CLOSE_EVENT, metricsHandler) + } + onStart := func(startResp *workflowservice.StartWorkflowExecutionResponse) { + runIDCell := util.PopulatedOnceCell(startResp.RunId) + startOp.set(&workflowRunImpl{ + workflowType: startOp.input.WorkflowType, + workflowID: startOp.input.Options.ID, + firstRunID: startResp.RunId, + currentRunID: &runIDCell, + iterFn: iterFn, + dataConverter: w.client.dataConverter, + failureConverter: w.client.failureConverter, + registry: w.client.registry, + }, nil) + } + + updateResp, err := w.updateWithStartWorkflow(grpcCtx, startReq, updateReq, onStart) + if err != nil { + return nil, err + } + handle, err := w.updateHandleFromResponse(ctx, updateReq.WaitPolicy.LifecycleStage, updateResp) + if err != nil { + return nil, fmt.Errorf("%w: %w", errInvalidWithStartWorkflowOperation, err) } + return handle, nil +} +// Perform update-with-start using the MultiOperation API. As with +// UpdateWorkflow, we issue the request repeatedly until the update is durable. +// The `onStart` callback is called once, the first time that a valid start +// response is received. +func (w *workflowClientInterceptor) updateWithStartWorkflow( + ctx context.Context, + startRequest *workflowservice.StartWorkflowExecutionRequest, + updateRequest *workflowservice.UpdateWorkflowExecutionRequest, + onStart func(*workflowservice.StartWorkflowExecutionResponse), +) (*workflowservice.UpdateWorkflowExecutionResponse, error) { + startOp := &workflowservice.ExecuteMultiOperationRequest_Operation{ + Operation: &workflowservice.ExecuteMultiOperationRequest_Operation_StartWorkflow{ + StartWorkflow: startRequest, + }, + } + updateOp := &workflowservice.ExecuteMultiOperationRequest_Operation{ + Operation: &workflowservice.ExecuteMultiOperationRequest_Operation_UpdateWorkflow{ + UpdateWorkflow: updateRequest, + }, + } multiRequest := workflowservice.ExecuteMultiOperationRequest{ Namespace: w.client.namespace, Operations: []*workflowservice.ExecuteMultiOperationRequest_Operation{ startOp, - withStartOp, + updateOp, }, } - var startResp *workflowservice.StartWorkflowExecutionResponse var updateResp *workflowservice.UpdateWorkflowExecutionResponse + seenStart := false for { multiResp, err := func() (*workflowservice.ExecuteMultiOperationResponse, error) { grpcCtx, cancel := newGRPCContext(ctx, grpcTimeout(pollUpdateTimeout), grpcLongPoll(true), defaultGrpcRetryParameters(ctx)) @@ -1758,7 +1875,7 @@ func (w *workflowClientInterceptor) executeWorkflowWithOperation( } case *workflowservice.ExecuteMultiOperationRequest_Operation_UpdateWorkflow: if !errors.As(opErr, &abortedErr) { - startErr = fmt.Errorf("%w: %w", errInvalidWorkflowOperation, opErr) + startErr = fmt.Errorf("%w: %w", errInvalidWithStartWorkflowOperation, opErr) } default: // this would only happen if a case statement for a newly added operation is missing above @@ -1781,7 +1898,10 @@ func (w *workflowClientInterceptor) executeWorkflowWithOperation( switch t := opReq.Operation.(type) { case *workflowservice.ExecuteMultiOperationRequest_Operation_StartWorkflow: if opResp, ok := resp.(*workflowservice.ExecuteMultiOperationResponse_Response_StartWorkflow); ok { - startResp = opResp.StartWorkflow + if !seenStart { + onStart(opResp.StartWorkflow) + seenStart = true + } } else { return nil, fmt.Errorf("%w: StartWorkflow response has the wrong type %T", errInvalidServerResponse, resp) } @@ -1801,14 +1921,7 @@ func (w *workflowClientInterceptor) executeWorkflowWithOperation( break } } - - handle, err := w.updateHandleFromResponse(ctx, enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_UNSPECIFIED, updateResp) - operation.(*UpdateWithStartWorkflowOperation).set(handle, err) - if err != nil { - return nil, fmt.Errorf("%w: %w", errInvalidWorkflowOperation, err) - } - - return startResp, nil + return updateResp, nil } func (w *workflowClientInterceptor) SignalWorkflow(ctx context.Context, in *ClientSignalWorkflowInput) error { @@ -2070,10 +2183,7 @@ func (w *workflowClientInterceptor) updateIsDurable(resp *workflowservice.Update resp.GetStage() != enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_UNSPECIFIED } -func createUpdateWorkflowInput( - options UpdateWorkflowOptions, -) (*ClientUpdateWorkflowInput, error) { - // Default update ID +func createUpdateWorkflowInput(options *UpdateWorkflowOptions) (*ClientUpdateWorkflowInput, error) { updateID := options.UpdateID if updateID == "" { updateID = uuid.New() diff --git a/internal/internal_workflow_client_test.go b/internal/internal_workflow_client_test.go index 8617163cf..f7b52abc4 100644 --- a/internal/internal_workflow_client_test.go +++ b/internal/internal_workflow_client_test.go @@ -1010,49 +1010,41 @@ func (s *workflowRunSuite) TestExecuteWorkflowWithUpdate_Retry() { }, }, nil) - updOp := NewUpdateWithStartWorkflowOperation( - UpdateWorkflowOptions{ - UpdateName: "update", - WaitForStage: WorkflowUpdateStageCompleted, - }) - - _, err := s.workflowClient.ExecuteWorkflow( - context.Background(), + startOp := s.workflowClient.NewWithStartWorkflowOperation( StartWorkflowOptions{ - ID: workflowID, - TaskQueue: taskqueue, - WithStartOperation: updOp, + ID: workflowID, + WorkflowIDConflictPolicy: enumspb.WORKFLOW_ID_CONFLICT_POLICY_FAIL, + TaskQueue: taskqueue, }, workflowType, ) + + _, err := s.workflowClient.UpdateWithStartWorkflow( + context.Background(), + UpdateWithStartWorkflowOptions{ + UpdateOptions: UpdateWorkflowOptions{ + UpdateName: "update", + WaitForStage: WorkflowUpdateStageCompleted, + }, + StartWorkflowOperation: startOp, + }, + ) s.NoError(err) } func (s *workflowRunSuite) TestExecuteWorkflowWithUpdate_OperationNotExecuted() { - s.workflowServiceClient.EXPECT().StartWorkflowExecution(gomock.Any(), gomock.Any(), gomock.Any()). - Return(&workflowservice.StartWorkflowExecutionResponse{ - RunId: runID, - }, nil) - - updOp := NewUpdateWithStartWorkflowOperation( - UpdateWorkflowOptions{ - UpdateName: "update", - WaitForStage: WorkflowUpdateStageCompleted, - }) - - ctxWithTimeout, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) - defer cancel() - _, err := s.workflowClient.ExecuteWorkflow( - ctxWithTimeout, + startOp := s.workflowClient.NewWithStartWorkflowOperation( StartWorkflowOptions{ - ID: workflowID, - TaskQueue: taskqueue, - // WithStartOperation is not specified! + ID: workflowID, + WorkflowIDConflictPolicy: enumspb.WORKFLOW_ID_CONFLICT_POLICY_FAIL, + TaskQueue: taskqueue, }, workflowType, ) - require.NoError(s.T(), err) - _, err = updOp.Get(ctxWithTimeout) + ctxWithTimeout, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer cancel() + + _, err := startOp.Get(ctxWithTimeout) require.EqualError(s.T(), err, "context deadline exceeded: operation was not executed") } @@ -1092,22 +1084,26 @@ func (s *workflowRunSuite) TestExecuteWorkflowWithUpdate_Abort() { ExecuteMultiOperation(gomock.Any(), gomock.Any(), gomock.Any()). DoAndReturn(tt.respFunc) - updOp := NewUpdateWithStartWorkflowOperation( - UpdateWorkflowOptions{ - UpdateName: "update", - WaitForStage: WorkflowUpdateStageCompleted, - }) + startOp := s.workflowClient.NewWithStartWorkflowOperation( + StartWorkflowOptions{ + ID: workflowID, + WorkflowIDConflictPolicy: enumspb.WORKFLOW_ID_CONFLICT_POLICY_FAIL, + TaskQueue: taskqueue, + }, workflowType, + ) ctxWithTimeout, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) defer cancel() - _, err := s.workflowClient.ExecuteWorkflow( + _, err := s.workflowClient.UpdateWithStartWorkflow( ctxWithTimeout, - StartWorkflowOptions{ - ID: workflowID, - TaskQueue: taskqueue, - WithStartOperation: updOp, - }, workflowType, + UpdateWithStartWorkflowOptions{ + UpdateOptions: UpdateWorkflowOptions{ + UpdateName: "update", + WaitForStage: WorkflowUpdateStageCompleted, + }, + StartWorkflowOperation: startOp, + }, ) var expectedErr *WorkflowUpdateServiceTimeoutOrCanceledError @@ -1122,20 +1118,24 @@ func (s *workflowRunSuite) TestExecuteWorkflowWithUpdate_NonMultiOperationError( ExecuteMultiOperation(gomock.Any(), gomock.Any(), gomock.Any()). Return(nil, serviceerror.NewInternal("internal error")).Times(1) - updOp := NewUpdateWithStartWorkflowOperation( - UpdateWorkflowOptions{ - UpdateName: "update", - WaitForStage: WorkflowUpdateStageCompleted, - }) - - _, err := s.workflowClient.ExecuteWorkflow( - context.Background(), + startOp := s.workflowClient.NewWithStartWorkflowOperation( StartWorkflowOptions{ - ID: workflowID, - TaskQueue: taskqueue, - WithStartOperation: updOp, + ID: workflowID, + WorkflowIDConflictPolicy: enumspb.WORKFLOW_ID_CONFLICT_POLICY_FAIL, + TaskQueue: taskqueue, }, workflowType, ) + + _, err := s.workflowClient.UpdateWithStartWorkflow( + context.Background(), + UpdateWithStartWorkflowOptions{ + UpdateOptions: UpdateWorkflowOptions{ + UpdateName: "update", + WaitForStage: WorkflowUpdateStageCompleted, + }, + StartWorkflowOperation: startOp, + }, + ) s.ErrorContains(err, "internal error") } @@ -1146,20 +1146,24 @@ func (s *workflowRunSuite) TestExecuteWorkflowWithUpdate_ServerResponseCountMism Responses: []*workflowservice.ExecuteMultiOperationResponse_Response{}, }, nil).Times(1) - updOp := NewUpdateWithStartWorkflowOperation( - UpdateWorkflowOptions{ - UpdateName: "update", - WaitForStage: WorkflowUpdateStageCompleted, - }) - - _, err := s.workflowClient.ExecuteWorkflow( - context.Background(), + startOp := s.workflowClient.NewWithStartWorkflowOperation( StartWorkflowOptions{ - ID: workflowID, - TaskQueue: taskqueue, - WithStartOperation: updOp, + ID: workflowID, + WorkflowIDConflictPolicy: enumspb.WORKFLOW_ID_CONFLICT_POLICY_FAIL, + TaskQueue: taskqueue, }, workflowType, ) + + _, err := s.workflowClient.UpdateWithStartWorkflow( + context.Background(), + UpdateWithStartWorkflowOptions{ + UpdateOptions: UpdateWorkflowOptions{ + UpdateName: "update", + WaitForStage: WorkflowUpdateStageCompleted, + }, + StartWorkflowOperation: startOp, + }, + ) s.ErrorContains(err, "invalid server response: 0 instead of 2 operation results") } @@ -1168,20 +1172,24 @@ func (s *workflowRunSuite) TestExecuteWorkflowWithUpdate_ServerErrorResponseCoun ExecuteMultiOperation(gomock.Any(), gomock.Any(), gomock.Any()). Return(nil, serviceerror.NewMultiOperationExecution("Error", []error{})).Times(1) - updOp := NewUpdateWithStartWorkflowOperation( - UpdateWorkflowOptions{ - UpdateName: "update", - WaitForStage: WorkflowUpdateStageCompleted, - }) - - _, err := s.workflowClient.ExecuteWorkflow( - context.Background(), + startOp := s.workflowClient.NewWithStartWorkflowOperation( StartWorkflowOptions{ - ID: workflowID, - TaskQueue: taskqueue, - WithStartOperation: updOp, + ID: workflowID, + WorkflowIDConflictPolicy: enumspb.WORKFLOW_ID_CONFLICT_POLICY_FAIL, + TaskQueue: taskqueue, }, workflowType, ) + + _, err := s.workflowClient.UpdateWithStartWorkflow( + context.Background(), + UpdateWithStartWorkflowOptions{ + UpdateOptions: UpdateWorkflowOptions{ + UpdateName: "update", + WaitForStage: WorkflowUpdateStageCompleted, + }, + StartWorkflowOperation: startOp, + }, + ) s.ErrorContains(err, "invalid server response: 0 instead of 2 operation errors") } @@ -1197,20 +1205,24 @@ func (s *workflowRunSuite) TestExecuteWorkflowWithUpdate_ServerStartResponseType }, }, nil).Times(1) - updOp := NewUpdateWithStartWorkflowOperation( - UpdateWorkflowOptions{ - UpdateName: "update", - WaitForStage: WorkflowUpdateStageCompleted, - }) - - _, err := s.workflowClient.ExecuteWorkflow( - context.Background(), + startOp := s.workflowClient.NewWithStartWorkflowOperation( StartWorkflowOptions{ - ID: workflowID, - TaskQueue: taskqueue, - WithStartOperation: updOp, + ID: workflowID, + WorkflowIDConflictPolicy: enumspb.WORKFLOW_ID_CONFLICT_POLICY_FAIL, + TaskQueue: taskqueue, }, workflowType, ) + + _, err := s.workflowClient.UpdateWithStartWorkflow( + context.Background(), + UpdateWithStartWorkflowOptions{ + UpdateOptions: UpdateWorkflowOptions{ + UpdateName: "update", + WaitForStage: WorkflowUpdateStageCompleted, + }, + StartWorkflowOperation: startOp, + }, + ) s.ErrorContains(err, "invalid server response: StartWorkflow response has the wrong type *workflowservice.ExecuteMultiOperationResponse_Response_UpdateWorkflow") } @@ -1220,7 +1232,11 @@ func (s *workflowRunSuite) TestExecuteWorkflowWithUpdate_ServerUpdateResponseTyp Return(&workflowservice.ExecuteMultiOperationResponse{ Responses: []*workflowservice.ExecuteMultiOperationResponse_Response{ { - Response: &workflowservice.ExecuteMultiOperationResponse_Response_StartWorkflow{}, + Response: &workflowservice.ExecuteMultiOperationResponse_Response_StartWorkflow{ + StartWorkflow: &workflowservice.StartWorkflowExecutionResponse{ + RunId: "RUN_ID", + }, + }, }, { Response: &workflowservice.ExecuteMultiOperationResponse_Response_StartWorkflow{}, // wrong! @@ -1228,20 +1244,24 @@ func (s *workflowRunSuite) TestExecuteWorkflowWithUpdate_ServerUpdateResponseTyp }, }, nil).Times(1) - updOp := NewUpdateWithStartWorkflowOperation( - UpdateWorkflowOptions{ - UpdateName: "update", - WaitForStage: WorkflowUpdateStageCompleted, - }) - - _, err := s.workflowClient.ExecuteWorkflow( - context.Background(), + startOp := s.workflowClient.NewWithStartWorkflowOperation( StartWorkflowOptions{ - ID: workflowID, - TaskQueue: taskqueue, - WithStartOperation: updOp, + ID: workflowID, + WorkflowIDConflictPolicy: enumspb.WORKFLOW_ID_CONFLICT_POLICY_FAIL, + TaskQueue: taskqueue, }, workflowType, ) + + _, err := s.workflowClient.UpdateWithStartWorkflow( + context.Background(), + UpdateWithStartWorkflowOptions{ + UpdateOptions: UpdateWorkflowOptions{ + UpdateName: "update", + WaitForStage: WorkflowUpdateStageCompleted, + }, + StartWorkflowOperation: startOp, + }, + ) s.ErrorContains(err, "invalid server response: UpdateWorkflow response has the wrong type *workflowservice.ExecuteMultiOperationResponse_Response_StartWorkflow") } @@ -1358,15 +1378,6 @@ func (s *workflowClientTestSuite) TestSignalWithStartWorkflowValidation() { context.Background(), "workflow-id-1", "my-signal", "my-signal-value", StartWorkflowOptions{ID: "workflow-id-2"}, workflowType) s.ErrorContains(err, "workflow ID from options not used") - - // unsupported WithStartOperation - _, err = s.client.SignalWithStartWorkflow( - context.Background(), "workflow-id", "my-signal", "my-signal-value", - StartWorkflowOptions{ - ID: "workflow-id", - WithStartOperation: &UpdateWithStartWorkflowOperation{}, - }, workflowType) - s.ErrorContains(err, "option WithStartOperation is not allowed") } func (s *workflowClientTestSuite) TestStartWorkflow() { diff --git a/internal/nexus_operations.go b/internal/nexus_operations.go index 80bd4e98a..6a73e6fc8 100644 --- a/internal/nexus_operations.go +++ b/internal/nexus_operations.go @@ -264,6 +264,10 @@ func (t *testSuiteClientForNexusOperations) ExecuteWorkflow(ctx context.Context, return run, nil } +func (t *testSuiteClientForNexusOperations) NewWithStartWorkflowOperation(options StartWorkflowOptions, workflow interface{}, args ...interface{}) WithStartWorkflowOperation { + panic("not implemented in the test environment") +} + // GetSearchAttributes implements Client. func (t *testSuiteClientForNexusOperations) GetSearchAttributes(ctx context.Context) (*workflowservice.GetSearchAttributesResponse, error) { panic("not implemented in the test environment") @@ -379,6 +383,11 @@ func (t *testSuiteClientForNexusOperations) UpdateWorkflow(ctx context.Context, panic("unimplemented in the test environment") } +// UpdateWithStartWorkflow implements Client. +func (t *testSuiteClientForNexusOperations) UpdateWithStartWorkflow(ctx context.Context, options UpdateWithStartWorkflowOptions) (WorkflowUpdateHandle, error) { + panic("unimplemented in the test environment") +} + // UpdateWorkerBuildIdCompatibility implements Client. func (t *testSuiteClientForNexusOperations) UpdateWorkerBuildIdCompatibility(ctx context.Context, options *UpdateWorkerBuildIdCompatibilityOptions) error { panic("not implemented in the test environment") diff --git a/mocks/Client.go b/mocks/Client.go index 0b5f9102b..8b0ee7233 100644 --- a/mocks/Client.go +++ b/mocks/Client.go @@ -22,7 +22,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -// Code generated by mockery v1.0.0. +// Code generated by mockery v1.0.0, and other versions, with manual fixups. // Modified manually for type alias to work correctly. // https://github.com/vektra/mockery/issues/236 @@ -601,6 +601,29 @@ func (_m *Client) ListWorkflow(ctx context.Context, request *workflowservice.Lis return r0, r1 } +// NewWithStartWorkflowOperation provides a mock function with given fields: options, workflow, args +func (_m *Client) NewWithStartWorkflowOperation(options client.StartWorkflowOptions, workflow interface{}, args ...interface{}) client.WithStartWorkflowOperation { + var _ca []interface{} + _ca = append(_ca, options, workflow) + _ca = append(_ca, args...) + ret := _m.Called(_ca...) + + if len(ret) == 0 { + panic("no return value specified for NewWithStartWorkflowOperation") + } + + var r0 client.WithStartWorkflowOperation + if rf, ok := ret.Get(0).(func(client.StartWorkflowOptions, interface{}, ...interface{}) client.WithStartWorkflowOperation); ok { + r0 = rf(options, workflow, args...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(client.WithStartWorkflowOperation) + } + } + + return r0 +} + // OperatorService provides a mock function with given fields: func (_m *Client) OperatorService() operatorservice.OperatorServiceClient { ret := _m.Called() @@ -878,6 +901,35 @@ func (_m *Client) TerminateWorkflow(ctx context.Context, workflowID string, runI return r0 } +// UpdateWithStartWorkflow provides a mock function with given fields: ctx, options +func (_m *Client) UpdateWithStartWorkflow(ctx context.Context, options client.UpdateWithStartWorkflowOptions) (client.WorkflowUpdateHandle, error) { + ret := _m.Called(ctx, options) + + if len(ret) == 0 { + panic("no return value specified for UpdateWithStartWorkflow") + } + + var r0 client.WorkflowUpdateHandle + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, client.UpdateWithStartWorkflowOptions) (client.WorkflowUpdateHandle, error)); ok { + return rf(ctx, options) + } + if rf, ok := ret.Get(0).(func(context.Context, client.UpdateWithStartWorkflowOptions) client.WorkflowUpdateHandle); ok { + r0 = rf(ctx, options) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(client.WorkflowUpdateHandle) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, client.UpdateWithStartWorkflowOptions) error); ok { + r1 = rf(ctx, options) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} // UpdateWorkerBuildIdCompatibility provides a mock function with given fields: ctx, options // //lint:ignore SA1019 ignore for SDK mocks diff --git a/test/integration_test.go b/test/integration_test.go index bf6b48df2..bafe9667a 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -2536,14 +2536,18 @@ func (ts *IntegrationTestSuite) TestInterceptorStartWithSignal() { } func (ts *IntegrationTestSuite) TestOpenTelemetryTracing() { - ts.testOpenTelemetryTracing(true) + ts.testOpenTelemetryTracing(true, false) +} + +func (ts *IntegrationTestSuite) TestOpenTelemetryTracingWithUpdateWithStart() { + ts.testOpenTelemetryTracing(true, true) } func (ts *IntegrationTestSuite) TestOpenTelemetryTracingWithoutMessages() { - ts.testOpenTelemetryTracing(false) + ts.testOpenTelemetryTracing(false, false) } -func (ts *IntegrationTestSuite) testOpenTelemetryTracing(withMessages bool) { +func (ts *IntegrationTestSuite) testOpenTelemetryTracing(withMessages bool, updateWithStart bool) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() // Start a top-level span @@ -2561,15 +2565,31 @@ func (ts *IntegrationTestSuite) testOpenTelemetryTracing(withMessages bool) { ts.NoError(val.Get(&queryResp)) ts.Equal("query-response", queryResp) - // Update - handle, err := ts.client.UpdateWorkflow(ctx, client.UpdateWorkflowOptions{ - WorkflowID: run.GetID(), - RunID: run.GetRunID(), - UpdateName: "workflow-update", - WaitForStage: client.WorkflowUpdateStageCompleted, - }) - ts.NoError(err) - ts.NoError(handle.Get(ctx, nil)) + if updateWithStart { + uwsStartOptions := ts.startWorkflowOptions(run.GetID()) + uwsStartOptions.EnableEagerStart = false + uwsStartOptions.WorkflowIDConflictPolicy = enumspb.WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING + startOp := ts.client.NewWithStartWorkflowOperation(uwsStartOptions, ts.workflows.SignalsQueriesAndUpdate, true, true) + updateHandle, err := ts.client.UpdateWithStartWorkflow(ctx, client.UpdateWithStartWorkflowOptions{ + UpdateOptions: client.UpdateWorkflowOptions{ + WorkflowID: run.GetID(), + UpdateName: "workflow-update", + WaitForStage: client.WorkflowUpdateStageCompleted, + }, + StartWorkflowOperation: startOp, + }) + ts.NoError(err) + ts.NoError(updateHandle.Get(ctx, nil)) + } else { + handle, err := ts.client.UpdateWorkflow(ctx, client.UpdateWorkflowOptions{ + WorkflowID: run.GetID(), + RunID: run.GetRunID(), + UpdateName: "workflow-update", + WaitForStage: client.WorkflowUpdateStageCompleted, + }) + ts.NoError(err) + ts.NoError(handle.Get(ctx, nil)) + } // Finish signal ts.NoError(ts.client.SignalWorkflow(ctx, run.GetID(), run.GetRunID(), "finish-signal", nil)) @@ -2579,6 +2599,11 @@ func (ts *IntegrationTestSuite) testOpenTelemetryTracing(withMessages bool) { rootSpan.End() spans := ts.openTelemetrySpanRecorder.Ended() + updateOpName := "UpdateWorkflow" + if updateWithStart { + updateOpName = "UpdateWithStartWorkflow" + } + // Span builder span := func(name string, children ...*interceptortest.SpanInfo) *interceptortest.SpanInfo { // If without signal-and-query headers, filter out those children in place @@ -2590,7 +2615,7 @@ func (ts *IntegrationTestSuite) testOpenTelemetryTracing(withMessages bool) { strings.HasPrefix(child.Name, "HandleSignal:") || strings.HasPrefix(child.Name, "QueryWorkflow:") || strings.HasPrefix(child.Name, "HandleQuery:") || - strings.HasPrefix(child.Name, "UpdateWorkflow:") || + strings.HasPrefix(child.Name, fmt.Sprintf("%s:", updateOpName)) || strings.HasPrefix(child.Name, "ValidateUpdate:") || strings.HasPrefix(child.Name, "HandleUpdate:") if !isMessage { @@ -2658,7 +2683,7 @@ func (ts *IntegrationTestSuite) testOpenTelemetryTracing(withMessages bool) { span("QueryWorkflow:workflow-query", span("HandleQuery:workflow-query"), ), - span("UpdateWorkflow:workflow-update", + span(fmt.Sprintf("%s:workflow-update", updateOpName), span("ValidateUpdate:workflow-update"), span("HandleUpdate:workflow-update", // Child workflow exec @@ -3982,231 +4007,266 @@ func (ts *IntegrationTestSuite) TestUpdateSettingHandlerInHandler() { ts.NoError(run.Get(ctx, nil)) } -func (ts *IntegrationTestSuite) TestExecuteWorkflowWithUpdate() { +func (ts *IntegrationTestSuite) TestUpdateWithStartWorkflow() { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - startOptionsWithOperation := func(op client.WithStartWorkflowOperation) client.StartWorkflowOptions { - startOptions := ts.startWorkflowOptions("test-update-with-start-" + uuid.New()) - startOptions.EnableEagerStart = false // not allowed to use with update-with-start - startOptions.WithStartOperation = op - return startOptions + startWorkflowOptions := func() client.StartWorkflowOptions { + opts := ts.startWorkflowOptions("test-update-with-start-" + uuid.New()) + opts.EnableEagerStart = false // not allowed to use with update-with-start + opts.WorkflowIDConflictPolicy = enumspb.WORKFLOW_ID_CONFLICT_POLICY_FAIL // required for update-with-start + return opts } ts.Run("sends update-with-start (no running workflow)", func() { - updateOp := client.NewUpdateWithStartWorkflowOperation( - client.UpdateWorkflowOptions{ + startOp := ts.client.NewWithStartWorkflowOperation( + startWorkflowOptions(), ts.workflows.UpdateEntityWorkflow, + ) + updHandle, err := ts.client.UpdateWithStartWorkflow(ctx, client.UpdateWithStartWorkflowOptions{ + UpdateOptions: client.UpdateWorkflowOptions{ UpdateName: "update", Args: []any{1}, WaitForStage: client.WorkflowUpdateStageAccepted, - }) - - startOptions := startOptionsWithOperation(updateOp) - run, err := ts.client.ExecuteWorkflow(ctx, startOptions, ts.workflows.UpdateEntityWorkflow) + }, + StartWorkflowOperation: startOp, + }) ts.NoError(err) var updateResult int - updHandle, err := updateOp.Get(ctx) - ts.NoError(err) ts.NoError(updHandle.Get(ctx, &updateResult)) ts.Equal(1, updateResult) + run, err := startOp.Get(ctx) + ts.NoError(err) var workflowResult int ts.NoError(run.Get(ctx, &workflowResult)) ts.Equal(1, workflowResult) }) ts.Run("sends update-with-start (already running workflow)", func() { - startOptions := startOptionsWithOperation(nil) + startOptions := startWorkflowOptions() run1, err := ts.client.ExecuteWorkflow(ctx, startOptions, ts.workflows.UpdateEntityWorkflow) ts.NoError(err) - updateOp := client.NewUpdateWithStartWorkflowOperation( - client.UpdateWorkflowOptions{ + startOptions.WorkflowIDConflictPolicy = enumspb.WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING + startOp := ts.client.NewWithStartWorkflowOperation(startOptions, ts.workflows.UpdateEntityWorkflow) + + updHandle, err := ts.client.UpdateWithStartWorkflow(ctx, client.UpdateWithStartWorkflowOptions{ + UpdateOptions: client.UpdateWorkflowOptions{ UpdateName: "update", Args: []any{1}, WaitForStage: client.WorkflowUpdateStageCompleted, - }) + }, + StartWorkflowOperation: startOp, + }) + ts.NoError(err) - startOptions.WithStartOperation = updateOp - startOptions.WorkflowIDConflictPolicy = enumspb.WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING - run2, err := ts.client.ExecuteWorkflow(ctx, startOptions, ts.workflows.UpdateEntityWorkflow) + run2, err := startOp.Get(ctx) ts.NoError(err) ts.Equal(run1.GetRunID(), run2.GetRunID()) var updateResult int - updHandle, err := updateOp.Get(ctx) - ts.NoError(err) ts.NoError(updHandle.Get(ctx, &updateResult)) ts.Equal(1, updateResult) }) ts.Run("sends update-with-start but update is rejected", func() { - updateOp := client.NewUpdateWithStartWorkflowOperation( - client.UpdateWorkflowOptions{ + startOp := ts.client.NewWithStartWorkflowOperation(startWorkflowOptions(), ts.workflows.UpdateEntityWorkflow) + + updHandle, err := ts.client.UpdateWithStartWorkflow(ctx, client.UpdateWithStartWorkflowOptions{ + UpdateOptions: client.UpdateWorkflowOptions{ UpdateName: "update", Args: []any{-1}, // rejected update payload WaitForStage: client.WorkflowUpdateStageCompleted, - }) + }, + StartWorkflowOperation: startOp, + }) + ts.NoError(err) - startOptions := startOptionsWithOperation(updateOp) - run, err := ts.client.ExecuteWorkflow(ctx, startOptions, ts.workflows.UpdateEntityWorkflow) + run, err := startOp.Get(ctx) ts.NoError(err) ts.NotNil(run) var updateResult int - updHandle, err := updateOp.Get(ctx) - ts.NoError(err) err = updHandle.Get(ctx, &updateResult) ts.ErrorContains(err, "addend must be non-negative") }) - ts.Run("receives update result in separate goroutines", func() { - updateOp := client.NewUpdateWithStartWorkflowOperation( - client.UpdateWorkflowOptions{ + ts.Run("receives results in separate goroutines", func() { + + startOp := ts.client.NewWithStartWorkflowOperation(startWorkflowOptions(), ts.workflows.UpdateEntityWorkflow) + + done1 := make(chan struct{}) + defer func() { <-done1 }() + go func() { + run, err := startOp.Get(ctx) + ts.NoError(err) + ts.NotNil(run) + done1 <- struct{}{} + }() + + updHandle, err := ts.client.UpdateWithStartWorkflow(ctx, client.UpdateWithStartWorkflowOptions{ + UpdateOptions: client.UpdateWorkflowOptions{ UpdateName: "update", Args: []any{1}, WaitForStage: client.WorkflowUpdateStageAccepted, - }) + }, + StartWorkflowOperation: startOp, + }) + ts.NoError(err) - done := make(chan struct{}) - defer func() { <-done }() + done2 := make(chan struct{}) + defer func() { <-done2 }() go func() { var updateResult int - updHandle, err := updateOp.Get(ctx) - ts.NoError(err) ts.NoError(updHandle.Get(ctx, &updateResult)) ts.Equal(1, updateResult) - done <- struct{}{} + done2 <- struct{}{} }() - startOptions := startOptionsWithOperation(updateOp) - _, err := ts.client.ExecuteWorkflow(ctx, startOptions, ts.workflows.UpdateEntityWorkflow) - ts.NoError(err) - var updateResult int - updHandle, err := updateOp.Get(ctx) - ts.NoError(err) ts.NoError(updHandle.Get(ctx, &updateResult)) ts.Equal(1, updateResult) }) ts.Run("fails when start request is invalid", func() { - updateOp := client.NewUpdateWithStartWorkflowOperation( - client.UpdateWorkflowOptions{ - UpdateName: "update", - WaitForStage: client.WorkflowUpdateStageCompleted, - }) + updateOptions := client.UpdateWorkflowOptions{ + UpdateName: "update", + WaitForStage: client.WorkflowUpdateStageCompleted, + } + startOptions := startWorkflowOptions() - startOptions := startOptionsWithOperation(updateOp) startOptions.CronSchedule = "invalid!" - _, err := ts.client.ExecuteWorkflow(ctx, startOptions, ts.workflows.UpdateEntityWorkflow) + startOp := ts.client.NewWithStartWorkflowOperation(startOptions, ts.workflows.UpdateEntityWorkflow) + _, err := ts.client.UpdateWithStartWorkflow(ctx, client.UpdateWithStartWorkflowOptions{ + UpdateOptions: updateOptions, + StartWorkflowOperation: startOp, + }) ts.Error(err) + + startOptions.WorkflowIDConflictPolicy = enumspb.WORKFLOW_ID_CONFLICT_POLICY_UNSPECIFIED + startOp = ts.client.NewWithStartWorkflowOperation(startOptions, ts.workflows.UpdateEntityWorkflow) + _, err = ts.client.UpdateWithStartWorkflow(ctx, client.UpdateWithStartWorkflowOptions{ + UpdateOptions: updateOptions, + StartWorkflowOperation: startOp, + }) + ts.ErrorContains(err, "WorkflowIDConflictPolicy must be set") }) ts.Run("fails when update operation is invalid", func() { - updateOp := client.NewUpdateWithStartWorkflowOperation( - client.UpdateWorkflowOptions{ - // invalid - }) + startOptions := startWorkflowOptions() - startOptions := startOptionsWithOperation(updateOp) - _, err := ts.client.ExecuteWorkflow(ctx, startOptions, ts.workflows.UpdateEntityWorkflow) - ts.ErrorContains(err, "invalid WithStartOperation: WaitForStage must be specified") + startOp := ts.client.NewWithStartWorkflowOperation(startOptions, ts.workflows.UpdateEntityWorkflow) + + _, err := ts.client.UpdateWithStartWorkflow(ctx, client.UpdateWithStartWorkflowOptions{ + UpdateOptions: client.UpdateWorkflowOptions{ + // invalid + }, + StartWorkflowOperation: startOp, + }) + ts.ErrorContains(err, "WaitForStage must be specified") - updateOp = client.NewUpdateWithStartWorkflowOperation( - client.UpdateWorkflowOptions{ + _, err = ts.client.UpdateWithStartWorkflow(ctx, client.UpdateWithStartWorkflowOptions{ + UpdateOptions: client.UpdateWorkflowOptions{ RunID: "invalid", WaitForStage: client.WorkflowUpdateStageCompleted, - }) - - startOptions = startOptionsWithOperation(updateOp) - _, err = ts.client.ExecuteWorkflow(ctx, startOptions, ts.workflows.UpdateEntityWorkflow) - ts.ErrorContains(err, "invalid WithStartOperation: RunID cannot be set because the workflow might not be running") + }, + StartWorkflowOperation: startOp, + }) + ts.ErrorContains(err, "invalid UpdateWorkflowOptions: RunID cannot be set for UpdateWithStartWorkflow because the workflow might not be running") - updateOp = client.NewUpdateWithStartWorkflowOperation( - client.UpdateWorkflowOptions{ + _, err = ts.client.UpdateWithStartWorkflow(ctx, client.UpdateWithStartWorkflowOptions{ + UpdateOptions: client.UpdateWorkflowOptions{ FirstExecutionRunID: "invalid", WaitForStage: client.WorkflowUpdateStageCompleted, - }) - - startOptions = startOptionsWithOperation(updateOp) - _, err = ts.client.ExecuteWorkflow(ctx, startOptions, ts.workflows.UpdateEntityWorkflow) - ts.ErrorContains(err, "invalid WithStartOperation: FirstExecutionRunID cannot be set because the workflow might not be running") + }, + StartWorkflowOperation: startOp, + }) + ts.ErrorContains(err, "invalid UpdateWorkflowOptions: FirstExecutionRunID cannot be set for UpdateWithStartWorkflow because the workflow might not be running") - updateOp = client.NewUpdateWithStartWorkflowOperation( - client.UpdateWorkflowOptions{ + _, err = ts.client.UpdateWithStartWorkflow(ctx, client.UpdateWithStartWorkflowOptions{ + UpdateOptions: client.UpdateWorkflowOptions{ UpdateName: "", // invalid WaitForStage: client.WorkflowUpdateStageCompleted, - }) - - startOptions = startOptionsWithOperation(updateOp) - _, err = ts.client.ExecuteWorkflow(ctx, startOptions, ts.workflows.UpdateEntityWorkflow) - ts.ErrorContains(err, "invalid WithStartOperation: ") // omitting server message intentionally + }, + StartWorkflowOperation: startOp, + }) + ts.ErrorContains(err, "invalid WithStartWorkflowOperation: ") // omitting server message intentionally - updateOp = client.NewUpdateWithStartWorkflowOperation( - client.UpdateWorkflowOptions{ + _, err = ts.client.UpdateWithStartWorkflow(ctx, client.UpdateWithStartWorkflowOptions{ + UpdateOptions: client.UpdateWorkflowOptions{ WorkflowID: "different", // does not match Start's UpdateName: "update", WaitForStage: client.WorkflowUpdateStageCompleted, - }) - - startOptions = startOptionsWithOperation(updateOp) - _, err = ts.client.ExecuteWorkflow(ctx, startOptions, ts.workflows.UpdateEntityWorkflow) - ts.ErrorContains(err, "invalid WithStartOperation: ") // omitting server message intentionally + }, + StartWorkflowOperation: startOp, + }) + ts.ErrorContains(err, "invalid WithStartWorkflowOperation: ") // omitting server message intentionally }) ts.Run("fails when workflow is already running", func() { - startOptions := startOptionsWithOperation(nil) + startOptions := startWorkflowOptions() _, err := ts.client.ExecuteWorkflow(ctx, startOptions, ts.workflows.UpdateEntityWorkflow) ts.NoError(err) + startOp := ts.client.NewWithStartWorkflowOperation(startOptions, ts.workflows.UpdateEntityWorkflow) - updateOp := client.NewUpdateWithStartWorkflowOperation( - client.UpdateWorkflowOptions{ + _, err = ts.client.UpdateWithStartWorkflow(ctx, client.UpdateWithStartWorkflowOptions{ + UpdateOptions: client.UpdateWorkflowOptions{ UpdateName: "update", Args: []any{1}, WaitForStage: client.WorkflowUpdateStageCompleted, - }) + }, + StartWorkflowOperation: startOp, + }) - startOptions.WithStartOperation = updateOp // NOTE that WorkflowExecutionErrorWhenAlreadyStarted (defaults to false) has no impact - _, err = ts.client.ExecuteWorkflow(ctx, startOptions, ts.workflows.UpdateEntityWorkflow) ts.ErrorContains(err, "Workflow execution is already running") }) ts.Run("fails when executed twice", func() { - updateOp := client.NewUpdateWithStartWorkflowOperation( - client.UpdateWorkflowOptions{ - UpdateName: "update", - Args: []any{1}, - WaitForStage: client.WorkflowUpdateStageCompleted, - }) + startOp := ts.client.NewWithStartWorkflowOperation(startWorkflowOptions(), ts.workflows.UpdateEntityWorkflow) - startOptions := startOptionsWithOperation(updateOp) - _, err := ts.client.ExecuteWorkflow(ctx, startOptions, ts.workflows.UpdateEntityWorkflow) + updateOptions := client.UpdateWorkflowOptions{ + UpdateName: "update", + Args: []any{1}, + WaitForStage: client.WorkflowUpdateStageCompleted, + } + _, err := ts.client.UpdateWithStartWorkflow(ctx, client.UpdateWithStartWorkflowOptions{ + UpdateOptions: updateOptions, + StartWorkflowOperation: startOp, + }) ts.NoError(err) - _, err = ts.client.ExecuteWorkflow(ctx, startOptions, ts.workflows.UpdateEntityWorkflow) - ts.ErrorContains(err, "invalid WithStartOperation: was already executed") + _, err = ts.client.UpdateWithStartWorkflow(ctx, client.UpdateWithStartWorkflowOptions{ + UpdateOptions: updateOptions, + StartWorkflowOperation: startOp, + }) + ts.ErrorContains(err, "invalid WithStartWorkflowOperation: was already executed") }) ts.Run("propagates context", func() { - updateOp := client.NewUpdateWithStartWorkflowOperation( - client.UpdateWorkflowOptions{ - UpdateName: "update", - Args: []any{1}, - WaitForStage: client.WorkflowUpdateStageCompleted, - }) + startOp := ts.client.NewWithStartWorkflowOperation(startWorkflowOptions(), ts.workflows.ContextPropagator, true) - var propagatedValues []string ctx := context.Background() // Propagate values using different context propagators. ctx = context.WithValue(ctx, contextKey(testContextKey1), "propagatedValue1") ctx = context.WithValue(ctx, contextKey(testContextKey2), "propagatedValue2") ctx = context.WithValue(ctx, contextKey(testContextKey3), "non-propagatedValue") - startOptions := startOptionsWithOperation(updateOp) - err := ts.executeWorkflowWithContextAndOption(ctx, startOptions, ts.workflows.ContextPropagator, &propagatedValues, true) + + _, err := ts.client.UpdateWithStartWorkflow(ctx, client.UpdateWithStartWorkflowOptions{ + UpdateOptions: client.UpdateWorkflowOptions{ + UpdateName: "update", + Args: []any{1}, + WaitForStage: client.WorkflowUpdateStageCompleted, + }, + StartWorkflowOperation: startOp, + }) + ts.NoError(err) + + var propagatedValues []string + run, err := startOp.Get(ctx) ts.NoError(err) + ts.NoError(run.Get(ctx, &propagatedValues)) // One copy from workflow and one copy from activity * 2 for child workflow ts.EqualValues([]string{