diff --git a/client/client.go b/client/client.go index e7eaf621f..cca121ba5 100644 --- a/client/client.go +++ b/client/client.go @@ -162,16 +162,11 @@ type ( // StartWorkflowOptions configuration parameters for starting a workflow execution. StartWorkflowOptions = internal.StartWorkflowOptions - // WithStartWorkflowOperation is a type of operation that can be executed as part of a workflow start. - // For example, use NewUpdateWithStartWorkflowOperation to perform Update-with-Start. + // WithStartWorkflowOperation defines how to start a workflow when using UpdateWithStartWorkflow. + // See [Client.NewWithStartWorkflowOperation] and [Client.UpdateWithStartWorkflow]. // NOTE: Experimental WithStartWorkflowOperation = internal.WithStartWorkflowOperation - // UpdateWithStartWorkflowOperation is used to perform Update-with-Start. - // See NewUpdateWithStartWorkflowOperation for details. - // NOTE: Experimental - UpdateWithStartWorkflowOperation = internal.UpdateWithStartWorkflowOperation - // HistoryEventIterator is a iterator which can return history events. HistoryEventIterator = internal.HistoryEventIterator @@ -279,6 +274,11 @@ type ( // NOTE: Experimental UpdateWorkflowOptions = internal.UpdateWorkflowOptions + // UpdateWithStartWorkflowOptions encapsulates the parameters used by UpdateWithStartWorkflow. + // See [Client.UpdateWithStartWorkflow] and [Client.NewWithStartWorkflowOperation]. + // NOTE: Experimental + UpdateWithStartWorkflowOptions = internal.UpdateWithStartWorkflowOptions + // WorkflowUpdateHandle represents a running or completed workflow // execution update and gives the holder access to the outcome of the same. // NOTE: Experimental @@ -564,6 +564,11 @@ type ( SignalWithStartWorkflow(ctx context.Context, workflowID string, signalName string, signalArg interface{}, options StartWorkflowOptions, workflow interface{}, workflowArgs ...interface{}) (WorkflowRun, error) + // NewWithStartWorkflowOperation returns a WithStartWorkflowOperation for use with UpdateWithStartWorkflow. + // See [Client.UpdateWithStartWorkflow]. + // NOTE: Experimental + NewWithStartWorkflowOperation(options StartWorkflowOptions, workflow interface{}, args ...interface{}) WithStartWorkflowOperation + // CancelWorkflow request cancellation of a workflow in execution. Cancellation request closes the channel // returned by the workflow.Context.Done() of the workflow that is target of the request. // - workflow ID of the workflow. @@ -826,20 +831,36 @@ type ( // API. If the check fails, an error is returned. CheckHealth(ctx context.Context, request *CheckHealthRequest) (*CheckHealthResponse, error) - // UpdateWorkflow issues an update request to the - // specified workflow execution and returns a handle to the update that - // is running in in parallel with the calling thread. Errors returned - // from the server will be exposed through the return value of - // WorkflowUpdateHandle.Get(). Errors that occur before the - // update is requested (e.g. if the required workflow ID field is - // missing from the UpdateWorkflowOptions) are returned - // directly from this function call. + // UpdateWorkflow issues an update request to the specified workflow and + // returns a handle to the update. The call will block until the update + // has reached the WaitForStage in the options. Note that this means + // that the call will not return successfully until the update has been + // delivered to a worker. Errors returned from the update handler or its + // validator will be exposed through the return value of + // WorkflowUpdateHandle.Get(). Errors that occur before the update is + // delivered to the workflow (e.g. if the required workflow ID field is + // missing from the UpdateWorkflowOptions) are returned directly from + // this function call. // // The errors it can return: // - WorkflowUpdateServiceTimeoutOrCanceledError // NOTE: Experimental UpdateWorkflow(ctx context.Context, options UpdateWorkflowOptions) (WorkflowUpdateHandle, error) + // UpdateWithStartWorkflow issues an update-with-start request. A + // WorkflowIDConflictPolicy must be set in the options. If the specified + // workflow execution is not running, then a new workflow execution is + // started and the update is sent in the first workflow task. + // Alternatively if the specified workflow execution is running then, if + // the WorkflowIDConflictPolicy is USE_EXISTING, the update is issued + // against the specified workflow, and if the WorkflowIDConflictPolicy + // is FAIL, an error is returned. The call will block until the update + // has reached the WaitForStage in the options. Note that this means + // that the call will not return successfully until the update has been + // delivered to a worker. + // NOTE: Experimental + UpdateWithStartWorkflow(ctx context.Context, options UpdateWithStartWorkflowOptions) (WorkflowUpdateHandle, error) + // GetWorkflowUpdateHandle creates a handle to the referenced update // which can be polled for an outcome. Note that runID is optional and // if not specified the most recent runID will be used. @@ -934,14 +955,6 @@ type MetricsTimer = metrics.Timer // MetricsNopHandler is a noop handler that does nothing with the metrics. var MetricsNopHandler = metrics.NopHandler -// NewUpdateWithStartWorkflowOperation returns an UpdateWithStartWorkflowOperation to perform Update-with-Start. -// After executing Client.ExecuteWorkflow with the UpdateWithStartWorkflow in the start options, -// the update result can be obtained. -// NOTE: Experimental -func NewUpdateWithStartWorkflowOperation(options UpdateWorkflowOptions) *UpdateWithStartWorkflowOperation { - return internal.NewUpdateWithStartWorkflowOperation(options) -} - // Dial creates an instance of a workflow client. This will attempt to connect // to the server eagerly and will return an error if the server is not // available. diff --git a/contrib/datadog/go.mod b/contrib/datadog/go.mod index 17b6c31fb..f1979594e 100644 --- a/contrib/datadog/go.mod +++ b/contrib/datadog/go.mod @@ -5,7 +5,7 @@ go 1.21 toolchain go1.21.1 require ( - github.com/stretchr/testify v1.9.0 + github.com/stretchr/testify v1.10.0 go.temporal.io/sdk v1.25.1 gopkg.in/DataDog/dd-trace-go.v1 v1.58.1 ) diff --git a/contrib/datadog/go.sum b/contrib/datadog/go.sum index 5db6faf5e..37260a995 100644 --- a/contrib/datadog/go.sum +++ b/contrib/datadog/go.sum @@ -129,8 +129,8 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= -github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= -github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/tinylib/msgp v1.1.8 h1:FCXC1xanKO4I8plpHGH2P7koL/RzZs12l/+r7vakfm0= github.com/tinylib/msgp v1.1.8/go.mod h1:qkpG+2ldGg4xRFmx+jfTvZPxfGFhi64BcnL9vkCm/Tw= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= diff --git a/contrib/opentelemetry/go.mod b/contrib/opentelemetry/go.mod index 0f98401b0..bae1612a6 100644 --- a/contrib/opentelemetry/go.mod +++ b/contrib/opentelemetry/go.mod @@ -3,7 +3,7 @@ module go.temporal.io/sdk/contrib/opentelemetry go 1.21 require ( - github.com/stretchr/testify v1.9.0 + github.com/stretchr/testify v1.10.0 go.opentelemetry.io/otel v1.27.0 go.opentelemetry.io/otel/sdk v1.27.0 go.opentelemetry.io/otel/trace v1.27.0 diff --git a/contrib/opentelemetry/go.sum b/contrib/opentelemetry/go.sum index 2f7894c50..648c847d1 100644 --- a/contrib/opentelemetry/go.sum +++ b/contrib/opentelemetry/go.sum @@ -74,8 +74,8 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= -github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= diff --git a/contrib/opentracing/go.mod b/contrib/opentracing/go.mod index e34e06e7e..31ab65de4 100644 --- a/contrib/opentracing/go.mod +++ b/contrib/opentracing/go.mod @@ -6,7 +6,7 @@ toolchain go1.21.1 require ( github.com/opentracing/opentracing-go v1.2.0 - github.com/stretchr/testify v1.9.0 + github.com/stretchr/testify v1.10.0 go.temporal.io/sdk v1.12.0 ) diff --git a/contrib/opentracing/go.sum b/contrib/opentracing/go.sum index a61a73ce5..208746f9e 100644 --- a/contrib/opentracing/go.sum +++ b/contrib/opentracing/go.sum @@ -71,8 +71,8 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= -github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= diff --git a/contrib/resourcetuner/go.mod b/contrib/resourcetuner/go.mod index f03b1bb36..8d27c529e 100644 --- a/contrib/resourcetuner/go.mod +++ b/contrib/resourcetuner/go.mod @@ -7,7 +7,7 @@ toolchain go1.22.5 require ( github.com/containerd/cgroups/v3 v3.0.3 github.com/shirou/gopsutil/v4 v4.24.8 - github.com/stretchr/testify v1.9.0 + github.com/stretchr/testify v1.10.0 go.einride.tech/pid v0.1.3 go.temporal.io/sdk v1.29.1 ) diff --git a/contrib/resourcetuner/go.sum b/contrib/resourcetuner/go.sum index 8d0b8854c..78d12a20e 100644 --- a/contrib/resourcetuner/go.sum +++ b/contrib/resourcetuner/go.sum @@ -96,8 +96,8 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= -github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFAEVmqU= github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI= github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk= diff --git a/contrib/tally/go.mod b/contrib/tally/go.mod index d10053101..7500cf4da 100644 --- a/contrib/tally/go.mod +++ b/contrib/tally/go.mod @@ -5,7 +5,7 @@ go 1.21 toolchain go1.21.1 require ( - github.com/stretchr/testify v1.9.0 + github.com/stretchr/testify v1.10.0 github.com/uber-go/tally/v4 v4.1.1 go.temporal.io/sdk v1.12.0 ) diff --git a/contrib/tally/go.sum b/contrib/tally/go.sum index 6e54f1b0e..bb633e07b 100644 --- a/contrib/tally/go.sum +++ b/contrib/tally/go.sum @@ -132,8 +132,8 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= -github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/twmb/murmur3 v1.1.5 h1:i9OLS9fkuLzBXjt6dptlAEyk58fJsSTXbRg3SgVyqgk= github.com/twmb/murmur3 v1.1.5/go.mod h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ= github.com/uber-go/tally/v4 v4.1.1 h1:jhy6WOZp4nHyCqeV43x3Wz370LXUGBhgW2JmzOIHCWI= diff --git a/go.mod b/go.mod index 7ed00c485..6a279e17e 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/nexus-rpc/sdk-go v0.0.12 github.com/pborman/uuid v1.2.1 github.com/robfig/cron v1.2.0 - github.com/stretchr/testify v1.9.0 + github.com/stretchr/testify v1.10.0 go.temporal.io/api v1.40.0 golang.org/x/sync v0.8.0 golang.org/x/sys v0.24.0 diff --git a/go.sum b/go.sum index 2f16f13e7..1bd59f679 100644 --- a/go.sum +++ b/go.sum @@ -69,8 +69,8 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= -github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= diff --git a/interceptor/interceptor.go b/interceptor/interceptor.go index aa269ca5c..6d289b7fa 100644 --- a/interceptor/interceptor.go +++ b/interceptor/interceptor.go @@ -220,6 +220,10 @@ type ScheduleClientCreateInput = internal.ScheduleClientCreateInput // ClientOutoundInterceptor.UpdateWorkflow. type ClientUpdateWorkflowInput = internal.ClientUpdateWorkflowInput +// ClientUpdateWithStartWorkflowInput is input for +// ClientOutboundInterceptor.UpdateWithStartWorkflow. +type ClientUpdateWithStartWorkflowInput = internal.ClientUpdateWithStartWorkflowInput + // Header provides Temporal header information from the context for reading or // writing during specific interceptor calls. // diff --git a/interceptor/tracing_interceptor.go b/interceptor/tracing_interceptor.go index 4f3a1e33d..cc6c7bdc1 100644 --- a/interceptor/tracing_interceptor.go +++ b/interceptor/tracing_interceptor.go @@ -379,6 +379,33 @@ func (t *tracingClientOutboundInterceptor) UpdateWorkflow( return val, err } +func (t *tracingClientOutboundInterceptor) UpdateWithStartWorkflow( + ctx context.Context, + in *ClientUpdateWithStartWorkflowInput, +) (client.WorkflowUpdateHandle, error) { + // Only add tracing if enabled + if t.root.options.DisableUpdateTracing { + return t.Next.UpdateWithStartWorkflow(ctx, in) + } + // Start span and write to header + span, ctx, err := t.root.startSpanFromContext(ctx, &TracerStartSpanOptions{ + Operation: "UpdateWithStartWorkflow", + Name: in.UpdateOptions.UpdateName, + Tags: map[string]string{workflowIDTagKey: in.UpdateOptions.WorkflowID, updateIDTagKey: in.UpdateOptions.UpdateID}, + ToHeader: true, + Time: time.Now(), + }) + if err != nil { + return nil, err + } + var finishOpts TracerFinishSpanOptions + defer span.Finish(&finishOpts) + + val, err := t.Next.UpdateWithStartWorkflow(ctx, in) + finishOpts.Error = err + return val, err +} + type tracingActivityOutboundInterceptor struct { ActivityOutboundInterceptorBase root *tracingInterceptor diff --git a/internal/client.go b/internal/client.go index edfe2b2a3..6a5e32833 100644 --- a/internal/client.go +++ b/internal/client.go @@ -27,7 +27,6 @@ package internal import ( "context" "crypto/tls" - "errors" "fmt" "sync/atomic" "time" @@ -141,6 +140,10 @@ type ( SignalWithStartWorkflow(ctx context.Context, workflowID string, signalName string, signalArg interface{}, options StartWorkflowOptions, workflow interface{}, workflowArgs ...interface{}) (WorkflowRun, error) + // NewWithStartWorkflowOperation returns a WithStartWorkflowOperation for use in UpdateWithStartWorkflow. + // NOTE: Experimental + NewWithStartWorkflowOperation(options StartWorkflowOptions, workflow interface{}, args ...interface{}) WithStartWorkflowOperation + // CancelWorkflow cancels a workflow in execution // - workflow ID of the workflow. // - runID can be default(empty string). if empty string then it will pick the running execution of that workflow ID. @@ -400,6 +403,17 @@ type ( // NOTE: Experimental UpdateWorkflow(ctx context.Context, options UpdateWorkflowOptions) (WorkflowUpdateHandle, error) + // UpdateWithStartWorkflow issues an update-with-start request. A + // WorkflowIDConflictPolicy must be set. If the specified workflow is + // not running, then a new workflow execution is started and the update + // is sent in the first workflow task. Alternatively if the specified + // workflow is running then, if the WorkflowIDConflictPolicy is + // USE_EXISTING, the update is issued against the specified workflow, + // and if the WorkflowIDConflictPolicy is FAIL, an error is returned. + // + // NOTE: Experimental + UpdateWithStartWorkflow(ctx context.Context, options UpdateWithStartWorkflowOptions) (WorkflowUpdateHandle, error) + // GetWorkflowUpdateHandle creates a handle to the referenced update // which can be polled for an outcome. Note that runID is optional and // if not specified the most recent runID will be used. @@ -661,18 +675,6 @@ type ( // Optional: defaulted to Fail. WorkflowIDConflictPolicy enumspb.WorkflowIdConflictPolicy - // WithStartOperation - Operation to execute with Workflow Start. - // For example, see NewUpdateWithStartWorkflowOperation to perform Update-with-Start. Note that if the workflow is - // already running and WorkflowIDConflictPolicy is set to UseExisting, the start is skipped and only the - // operation is executed. If instead the policy is set to Fail (the default), nothing is executed and - // an error will be returned (i.e. the option WorkflowExecutionErrorWhenAlreadyStarted is ignored). - // This option will be ignored when used with Client.SignalWithStartWorkflow. - // - // Optional: defaults to nil. - // - // NOTE: Experimental - WithStartOperation WithStartWorkflowOperation - // When WorkflowExecutionErrorWhenAlreadyStarted is true, Client.ExecuteWorkflow will return an error if the // workflow id has already been used and WorkflowIDReusePolicy or WorkflowIDConflictPolicy would // disallow a re-run. If it is set to false, rather than erroring a WorkflowRun instance representing @@ -765,24 +767,24 @@ type ( links []*commonpb.Link } - // WithStartWorkflowOperation is a type of operation that can be executed as part of a workflow start. + // WithStartWorkflowOperation defines how to start a workflow when using UpdateWithStartWorkflow. + // See [NewWithStartWorkflowOperation] and [UpdateWithStartWorkflow]. + // NOTE: Experimental WithStartWorkflowOperation interface { - isWithStartWorkflowOperation() + // Get returns the WorkflowRun that was targeted by the UpdateWithStartWorkflow call. + // This is a blocking API. + Get(ctx context.Context) (WorkflowRun, error) } - // UpdateWithStartWorkflowOperation is used to perform Update-with-Start. - // See NewUpdateWithStartWorkflowOperation for details. - // - // Exposed as: [go.temporal.io/sdk/client.UpdateWithStartWorkflowOperation] - UpdateWithStartWorkflowOperation struct { - input *ClientUpdateWorkflowInput + withStartWorkflowOperationImpl struct { + input *ClientExecuteWorkflowInput // flag to ensure the operation is only executed once executed atomic.Bool // channel to indicate that handle or err is available doneCh chan struct{} - // handle and err cannot be accessed before doneCh is closed - handle WorkflowUpdateHandle - err error + // workflowRun and err cannot be accessed before doneCh is closed + workflowRun WorkflowRun + err error } // RetryPolicy defines the retry policy. @@ -1089,32 +1091,10 @@ func DialCloudOperationsClient(ctx context.Context, options CloudOperationsClien }, nil } -// NewUpdateWithStartWorkflowOperation returns an UpdateWithStartWorkflowOperation that can be used to perform Update-with-Start. -// -// Exposed as: [go.temporal.io/sdk/client.NewUpdateWithStartWorkflowOperation] -func NewUpdateWithStartWorkflowOperation(options UpdateWorkflowOptions) *UpdateWithStartWorkflowOperation { - res := &UpdateWithStartWorkflowOperation{doneCh: make(chan struct{})} - - input, err := createUpdateWorkflowInput(options) - if err != nil { - res.set(nil, err) - } else if options.RunID != "" { - res.set(nil, errors.New("RunID cannot be set because the workflow might not be running")) - } - if options.FirstExecutionRunID != "" { - res.set(nil, errors.New("FirstExecutionRunID cannot be set because the workflow might not be running")) - } else { - res.input = input - } - - return res -} - -// Get blocks until a server response has been received; or the context deadline is exceeded. -func (op *UpdateWithStartWorkflowOperation) Get(ctx context.Context) (WorkflowUpdateHandle, error) { +func (op *withStartWorkflowOperationImpl) Get(ctx context.Context) (WorkflowRun, error) { select { case <-op.doneCh: - return op.handle, op.err + return op.workflowRun, op.err case <-ctx.Done(): if !op.executed.Load() { return nil, fmt.Errorf("%w: %w", ctx.Err(), fmt.Errorf("operation was not executed")) @@ -1123,21 +1103,19 @@ func (op *UpdateWithStartWorkflowOperation) Get(ctx context.Context) (WorkflowUp } } -func (op *UpdateWithStartWorkflowOperation) markExecuted() error { +func (op *withStartWorkflowOperationImpl) markExecuted() error { if op.executed.Swap(true) { return fmt.Errorf("was already executed") } return nil } -func (op *UpdateWithStartWorkflowOperation) set(handle WorkflowUpdateHandle, err error) { - op.handle = handle +func (op *withStartWorkflowOperationImpl) set(workflowRun WorkflowRun, err error) { + op.workflowRun = workflowRun op.err = err close(op.doneCh) } -func (op *UpdateWithStartWorkflowOperation) isWithStartWorkflowOperation() {} - // NewNamespaceClient creates an instance of a namespace client, to manager lifecycle of namespaces. // // Exposed as: [go.temporal.io/sdk/client.NewNamespaceClient] diff --git a/internal/cmd/build/go.mod b/internal/cmd/build/go.mod index f72b00271..130bfb993 100644 --- a/internal/cmd/build/go.mod +++ b/internal/cmd/build/go.mod @@ -22,7 +22,7 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect github.com/robfig/cron v1.2.0 // indirect github.com/stretchr/objx v0.5.2 // indirect - github.com/stretchr/testify v1.9.0 // indirect + github.com/stretchr/testify v1.10.0 // indirect go.temporal.io/api v1.40.0 // indirect golang.org/x/exp v0.0.0-20240409090435-93d18d7e34b8 // indirect golang.org/x/exp/typeparams v0.0.0-20240409090435-93d18d7e34b8 // indirect diff --git a/internal/cmd/build/go.sum b/internal/cmd/build/go.sum index c0d158470..ea74bd0d5 100644 --- a/internal/cmd/build/go.sum +++ b/internal/cmd/build/go.sum @@ -73,8 +73,8 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= -github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= diff --git a/internal/interceptor.go b/internal/interceptor.go index 021b9fa84..865dab8b9 100644 --- a/internal/interceptor.go +++ b/internal/interceptor.go @@ -423,6 +423,11 @@ type ClientOutboundInterceptor interface { // NOTE: Experimental UpdateWorkflow(context.Context, *ClientUpdateWorkflowInput) (WorkflowUpdateHandle, error) + // UpdateWithStartWorkflow intercepts client.Client.UpdateWithStartWorkflow. + // + // NOTE: Experimental + UpdateWithStartWorkflow(context.Context, *ClientUpdateWithStartWorkflowInput) (WorkflowUpdateHandle, error) + // PollWorkflowUpdate requests the outcome of a specific update from the // server. // @@ -448,6 +453,11 @@ type ClientUpdateWorkflowInput struct { WaitForStage WorkflowUpdateStage } +type ClientUpdateWithStartWorkflowInput struct { + UpdateOptions *UpdateWorkflowOptions + StartWorkflowOperation WithStartWorkflowOperation +} + // ClientPollWorkflowUpdateInput is the input to // ClientOutboundInterceptor.PollWorkflowUpdate. type ClientPollWorkflowUpdateInput struct { diff --git a/internal/interceptor_base.go b/internal/interceptor_base.go index 157c2c09f..72c1bdbf5 100644 --- a/internal/interceptor_base.go +++ b/internal/interceptor_base.go @@ -508,6 +508,13 @@ func (c *ClientOutboundInterceptorBase) PollWorkflowUpdate( return c.Next.PollWorkflowUpdate(ctx, in) } +func (c *ClientOutboundInterceptorBase) UpdateWithStartWorkflow( + ctx context.Context, + in *ClientUpdateWithStartWorkflowInput, +) (WorkflowUpdateHandle, error) { + return c.Next.UpdateWithStartWorkflow(ctx, in) +} + // ExecuteWorkflow implements ClientOutboundInterceptor.ExecuteWorkflow. func (c *ClientOutboundInterceptorBase) ExecuteWorkflow( ctx context.Context, diff --git a/internal/internal_workflow_client.go b/internal/internal_workflow_client.go index 2afc3065f..aa924a79f 100644 --- a/internal/internal_workflow_client.go +++ b/internal/internal_workflow_client.go @@ -69,9 +69,9 @@ var ( ) var ( - errUnsupportedOperation = fmt.Errorf("unsupported operation") - errInvalidServerResponse = fmt.Errorf("invalid server response") - errInvalidWorkflowOperation = fmt.Errorf("invalid WithStartOperation") + errUnsupportedOperation = fmt.Errorf("unsupported operation") + errInvalidServerResponse = fmt.Errorf("invalid server response") + errInvalidWithStartWorkflowOperation = fmt.Errorf("invalid WithStartWorkflowOperation") ) const ( @@ -237,29 +237,16 @@ func (wc *WorkflowClient) ExecuteWorkflow(ctx context.Context, options StartWork return nil, err } - // Default workflow ID - if options.ID == "" { - options.ID = uuid.New() - } + // Set header before interceptor run + ctx = contextWithNewHeader(ctx) - // Validate function and get name - if err := validateFunctionArgs(workflow, args, true); err != nil { - return nil, err - } - workflowType, err := getWorkflowFunctionName(wc.registry, workflow) + in, err := createStartWorkflowInput(options, workflow, args, wc.registry) if err != nil { return nil, err } - // Set header before interceptor run - ctx = contextWithNewHeader(ctx) - // Run via interceptor - return wc.interceptor.ExecuteWorkflow(ctx, &ClientExecuteWorkflowInput{ - Options: &options, - WorkflowType: workflowType, - Args: args, - }) + return wc.interceptor.ExecuteWorkflow(ctx, in) } // GetWorkflow gets a workflow execution and returns a WorkflowRun that will allow you to wait until this workflow @@ -338,9 +325,6 @@ func (wc *WorkflowClient) SignalWithStartWorkflow(ctx context.Context, workflowI if options.ID != "" && options.ID != workflowID { return nil, fmt.Errorf("workflow ID from options not used, must be unset or match workflow ID parameter") } - if options.WithStartOperation != nil { - return nil, fmt.Errorf("option WithStartOperation is not allowed") - } // Default workflow ID to UUID options.ID = workflowID @@ -370,6 +354,20 @@ func (wc *WorkflowClient) SignalWithStartWorkflow(ctx context.Context, workflowI }) } +func (wc *WorkflowClient) NewWithStartWorkflowOperation(options StartWorkflowOptions, workflow interface{}, args ...interface{}) WithStartWorkflowOperation { + op := &withStartWorkflowOperationImpl{doneCh: make(chan struct{})} + if options.WorkflowIDConflictPolicy == enumspb.WORKFLOW_ID_CONFLICT_POLICY_UNSPECIFIED { + op.err = errors.New("WorkflowIDConflictPolicy must be set in StartWorkflowOptions for update-with-start") + return op + } + input, err := createStartWorkflowInput(options, workflow, args, wc.registry) + if err != nil { + op.err = err + } + op.input = input + return op +} + // CancelWorkflow cancels a workflow in execution. It allows workflow to properly clean up and gracefully close. // workflowID is required, other parameters are optional. // If runID is omit, it will terminate currently running workflow (if there is one) based on the workflowID. @@ -805,6 +803,14 @@ type UpdateWorkflowOptions struct { FirstExecutionRunID string } +// UpdateWithStartWorkflowOptions encapsulates the parameters used by UpdateWithStartWorkflow. +// See UpdateWithStartWorkflow and NewWithStartWorkflowOperation. +// NOTE: Experimental +type UpdateWithStartWorkflowOptions struct { + StartWorkflowOperation WithStartWorkflowOperation + UpdateOptions UpdateWorkflowOptions +} + // WorkflowUpdateHandle is a handle to a workflow execution update process. The // update may or may not have completed so an instance of this type functions // similar to a Future with respect to the outcome of the update. If the update @@ -1170,7 +1176,7 @@ func (wc *WorkflowClient) UpdateWorkflow( return nil, err } - in, err := createUpdateWorkflowInput(options) + in, err := createUpdateWorkflowInput(&options) if err != nil { return nil, err } @@ -1180,6 +1186,35 @@ func (wc *WorkflowClient) UpdateWorkflow( return wc.interceptor.UpdateWorkflow(ctx, in) } +func (wc *WorkflowClient) UpdateWithStartWorkflow( + ctx context.Context, + options UpdateWithStartWorkflowOptions, +) (WorkflowUpdateHandle, error) { + startOp, ok := options.StartWorkflowOperation.(*withStartWorkflowOperationImpl) + if !ok { + return nil, fmt.Errorf("%w: startOperation must be created by NewWithStartWorkflowOperation", errInvalidWithStartWorkflowOperation) + } + if startOp.err != nil { + return nil, startOp.err + } + if err := wc.ensureInitialized(ctx); err != nil { + return nil, err + } + if options.UpdateOptions.RunID != "" { + return nil, errors.New("invalid UpdateWorkflowOptions: RunID cannot be set for UpdateWithStartWorkflow because the workflow might not be running") + } + if options.UpdateOptions.FirstExecutionRunID != "" { + return nil, errors.New("invalid UpdateWorkflowOptions: FirstExecutionRunID cannot be set for UpdateWithStartWorkflow because the workflow might not be running") + } + + ctx = contextWithNewHeader(ctx) + + return wc.interceptor.UpdateWithStartWorkflow(ctx, &ClientUpdateWithStartWorkflowInput{ + UpdateOptions: &options.UpdateOptions, + StartWorkflowOperation: startOp, + }) +} + // CheckHealthRequest is a request for Client.CheckHealth. type CheckHealthRequest struct{} @@ -1534,10 +1569,33 @@ type workflowClientInterceptor struct { client *WorkflowClient } -func (w *workflowClientInterceptor) ExecuteWorkflow( +func createStartWorkflowInput( + options StartWorkflowOptions, + workflow interface{}, + args []interface{}, + registry *registry, +) (*ClientExecuteWorkflowInput, error) { + if options.ID == "" { + options.ID = uuid.New() + } + if err := validateFunctionArgs(workflow, args, true); err != nil { + return nil, err + } + workflowType, err := getWorkflowFunctionName(registry, workflow) + if err != nil { + return nil, err + } + return &ClientExecuteWorkflowInput{ + Options: &options, + WorkflowType: workflowType, + Args: args, + }, nil +} + +func (w *workflowClientInterceptor) createStartWorkflowRequest( ctx context.Context, in *ClientExecuteWorkflowInput, -) (WorkflowRun, error) { +) (*workflowservice.StartWorkflowExecutionRequest, error) { // This is always set before interceptor is invoked workflowID := in.Options.ID if workflowID == "" { @@ -1608,44 +1666,49 @@ func (w *workflowClientInterceptor) ExecuteWorkflow( startRequest.RequestId = uuid.New() } + if in.Options.StartDelay != 0 { + startRequest.WorkflowStartDelay = durationpb.New(in.Options.StartDelay) + } + + return startRequest, nil +} + +func (w *workflowClientInterceptor) ExecuteWorkflow( + ctx context.Context, + in *ClientExecuteWorkflowInput, +) (WorkflowRun, error) { + startRequest, err := w.createStartWorkflowRequest(ctx, in) + if err != nil { + return nil, err + } + workflowID := startRequest.WorkflowId + var eagerExecutor *eagerWorkflowExecutor if in.Options.EnableEagerStart && w.client.capabilities.GetEagerWorkflowStart() && w.client.eagerDispatcher != nil { eagerExecutor = w.client.eagerDispatcher.applyToRequest(startRequest) } - if in.Options.StartDelay != 0 { - startRequest.WorkflowStartDelay = durationpb.New(in.Options.StartDelay) - } - grpcCtx, cancel := newGRPCContext(ctx, grpcMetricsHandler( w.client.metricsHandler.WithTags(metrics.RPCTags(in.WorkflowType, metrics.NoneTagValue, in.Options.TaskQueue))), defaultGrpcRetryParameters(ctx)) defer cancel() var runID string - if in.Options.WithStartOperation == nil { - response, err := w.client.workflowService.StartWorkflowExecution(grpcCtx, startRequest) - - eagerWorkflowTask := response.GetEagerWorkflowTask() - if eagerWorkflowTask != nil && eagerExecutor != nil { - eagerExecutor.handleResponse(eagerWorkflowTask) - } else if eagerExecutor != nil { - eagerExecutor.releaseUnused() - } + response, err := w.client.workflowService.StartWorkflowExecution(grpcCtx, startRequest) - // Allow already-started error - if e, ok := err.(*serviceerror.WorkflowExecutionAlreadyStarted); ok && !in.Options.WorkflowExecutionErrorWhenAlreadyStarted { - runID = e.RunId - } else if err != nil { - return nil, err - } else { - runID = response.RunId - } + eagerWorkflowTask := response.GetEagerWorkflowTask() + if eagerWorkflowTask != nil && eagerExecutor != nil { + eagerExecutor.handleResponse(eagerWorkflowTask) + } else if eagerExecutor != nil { + eagerExecutor.releaseUnused() + } + + // Allow already-started error + if e, ok := err.(*serviceerror.WorkflowExecutionAlreadyStarted); ok && !in.Options.WorkflowExecutionErrorWhenAlreadyStarted { + runID = e.RunId + } else if err != nil { + return nil, err } else { - response, err := w.executeWorkflowWithOperation(grpcCtx, startRequest, in.Options.WithStartOperation) - if err != nil { - return nil, err - } runID = response.RunId } @@ -1669,55 +1732,109 @@ func (w *workflowClientInterceptor) ExecuteWorkflow( }, nil } -func (w *workflowClientInterceptor) executeWorkflowWithOperation( +func (w *workflowClientInterceptor) UpdateWithStartWorkflow( ctx context.Context, - startRequest *workflowservice.StartWorkflowExecutionRequest, - operation WithStartWorkflowOperation, -) (*workflowservice.StartWorkflowExecutionResponse, error) { - startOp := &workflowservice.ExecuteMultiOperationRequest_Operation{ - Operation: &workflowservice.ExecuteMultiOperationRequest_Operation_StartWorkflow{ - StartWorkflow: startRequest, - }, + in *ClientUpdateWithStartWorkflowInput, +) (WorkflowUpdateHandle, error) { + startOp, ok := in.StartWorkflowOperation.(*withStartWorkflowOperationImpl) + if !ok { + return nil, fmt.Errorf("%w: startOperation must be created by NewWithStartWorkflowOperation", errInvalidWithStartWorkflowOperation) + } + if startOp.err != nil { + return nil, fmt.Errorf("%w: %w", errInvalidWithStartWorkflowOperation, startOp.err) } - var withStartOp *workflowservice.ExecuteMultiOperationRequest_Operation - switch t := operation.(type) { - case *UpdateWithStartWorkflowOperation: - if err := t.markExecuted(); err != nil { - return nil, fmt.Errorf("%w: %w", errInvalidWorkflowOperation, err) - } + // Create start request + if err := startOp.markExecuted(); err != nil { + return nil, fmt.Errorf("%w: %w", errInvalidWithStartWorkflowOperation, err) + } + startReq, err := w.createStartWorkflowRequest(ctx, startOp.input) + if err != nil { + return nil, err + } - if t.err != nil { - return nil, fmt.Errorf("%w: %w", errInvalidWorkflowOperation, t.err) - } + updateInput, err := createUpdateWorkflowInput(in.UpdateOptions) + if err != nil { + return nil, err + } - updateReq, err := w.createUpdateWorkflowRequest(ctx, t.input) - if err != nil { - return nil, fmt.Errorf("%w: %w", errInvalidWorkflowOperation, err) - } - if updateReq.WorkflowExecution.WorkflowId == "" { - updateReq.WorkflowExecution.WorkflowId = startRequest.WorkflowId - } + // Create update request + updateReq, err := w.createUpdateWorkflowRequest(ctx, updateInput) + if err != nil { + return nil, fmt.Errorf("%w: %w", errInvalidWithStartWorkflowOperation, err) + } + if updateReq.WorkflowExecution.WorkflowId == "" { + updateReq.WorkflowExecution.WorkflowId = startReq.WorkflowId + } - withStartOp = &workflowservice.ExecuteMultiOperationRequest_Operation{ - Operation: &workflowservice.ExecuteMultiOperationRequest_Operation_UpdateWorkflow{ - UpdateWorkflow: updateReq, - }, - } - default: - return nil, fmt.Errorf("%w: %T", errUnsupportedOperation, t) + grpcCtx, cancel := newGRPCContext( + ctx, + grpcMetricsHandler(w.client.metricsHandler.WithTags( + metrics.RPCTags(startOp.input.WorkflowType, metrics.NoneTagValue, startOp.input.Options.TaskQueue))), + defaultGrpcRetryParameters(ctx)) + defer cancel() + + iterFn := func(fnCtx context.Context, fnRunID string) HistoryEventIterator { + metricsHandler := w.client.metricsHandler.WithTags(metrics.RPCTags(startOp.input.WorkflowType, + metrics.NoneTagValue, startOp.input.Options.TaskQueue)) + return w.client.getWorkflowHistory(fnCtx, startOp.input.Options.ID, fnRunID, true, + enumspb.HISTORY_EVENT_FILTER_TYPE_CLOSE_EVENT, metricsHandler) + } + onStart := func(startResp *workflowservice.StartWorkflowExecutionResponse) { + runIDCell := util.PopulatedOnceCell(startResp.RunId) + startOp.set(&workflowRunImpl{ + workflowType: startOp.input.WorkflowType, + workflowID: startOp.input.Options.ID, + firstRunID: startResp.RunId, + currentRunID: &runIDCell, + iterFn: iterFn, + dataConverter: w.client.dataConverter, + failureConverter: w.client.failureConverter, + registry: w.client.registry, + }, nil) + } + + updateResp, err := w.updateWithStartWorkflow(grpcCtx, startReq, updateReq, onStart) + if err != nil { + return nil, err + } + handle, err := w.updateHandleFromResponse(ctx, updateReq.WaitPolicy.LifecycleStage, updateResp) + if err != nil { + return nil, fmt.Errorf("%w: %w", errInvalidWithStartWorkflowOperation, err) } + return handle, nil +} +// Perform update-with-start using the MultiOperation API. As with +// UpdateWorkflow, we issue the request repeatedly until the update is durable. +// The `onStart` callback is called once, the first time that a valid start +// response is received. +func (w *workflowClientInterceptor) updateWithStartWorkflow( + ctx context.Context, + startRequest *workflowservice.StartWorkflowExecutionRequest, + updateRequest *workflowservice.UpdateWorkflowExecutionRequest, + onStart func(*workflowservice.StartWorkflowExecutionResponse), +) (*workflowservice.UpdateWorkflowExecutionResponse, error) { + startOp := &workflowservice.ExecuteMultiOperationRequest_Operation{ + Operation: &workflowservice.ExecuteMultiOperationRequest_Operation_StartWorkflow{ + StartWorkflow: startRequest, + }, + } + updateOp := &workflowservice.ExecuteMultiOperationRequest_Operation{ + Operation: &workflowservice.ExecuteMultiOperationRequest_Operation_UpdateWorkflow{ + UpdateWorkflow: updateRequest, + }, + } multiRequest := workflowservice.ExecuteMultiOperationRequest{ Namespace: w.client.namespace, Operations: []*workflowservice.ExecuteMultiOperationRequest_Operation{ startOp, - withStartOp, + updateOp, }, } - var startResp *workflowservice.StartWorkflowExecutionResponse var updateResp *workflowservice.UpdateWorkflowExecutionResponse + seenStart := false for { multiResp, err := func() (*workflowservice.ExecuteMultiOperationResponse, error) { grpcCtx, cancel := newGRPCContext(ctx, grpcTimeout(pollUpdateTimeout), grpcLongPoll(true), defaultGrpcRetryParameters(ctx)) @@ -1758,7 +1875,7 @@ func (w *workflowClientInterceptor) executeWorkflowWithOperation( } case *workflowservice.ExecuteMultiOperationRequest_Operation_UpdateWorkflow: if !errors.As(opErr, &abortedErr) { - startErr = fmt.Errorf("%w: %w", errInvalidWorkflowOperation, opErr) + startErr = fmt.Errorf("%w: %w", errInvalidWithStartWorkflowOperation, opErr) } default: // this would only happen if a case statement for a newly added operation is missing above @@ -1781,7 +1898,10 @@ func (w *workflowClientInterceptor) executeWorkflowWithOperation( switch t := opReq.Operation.(type) { case *workflowservice.ExecuteMultiOperationRequest_Operation_StartWorkflow: if opResp, ok := resp.(*workflowservice.ExecuteMultiOperationResponse_Response_StartWorkflow); ok { - startResp = opResp.StartWorkflow + if !seenStart { + onStart(opResp.StartWorkflow) + seenStart = true + } } else { return nil, fmt.Errorf("%w: StartWorkflow response has the wrong type %T", errInvalidServerResponse, resp) } @@ -1801,14 +1921,7 @@ func (w *workflowClientInterceptor) executeWorkflowWithOperation( break } } - - handle, err := w.updateHandleFromResponse(ctx, enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_UNSPECIFIED, updateResp) - operation.(*UpdateWithStartWorkflowOperation).set(handle, err) - if err != nil { - return nil, fmt.Errorf("%w: %w", errInvalidWorkflowOperation, err) - } - - return startResp, nil + return updateResp, nil } func (w *workflowClientInterceptor) SignalWorkflow(ctx context.Context, in *ClientSignalWorkflowInput) error { @@ -2070,10 +2183,7 @@ func (w *workflowClientInterceptor) updateIsDurable(resp *workflowservice.Update resp.GetStage() != enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_UNSPECIFIED } -func createUpdateWorkflowInput( - options UpdateWorkflowOptions, -) (*ClientUpdateWorkflowInput, error) { - // Default update ID +func createUpdateWorkflowInput(options *UpdateWorkflowOptions) (*ClientUpdateWorkflowInput, error) { updateID := options.UpdateID if updateID == "" { updateID = uuid.New() diff --git a/internal/internal_workflow_client_test.go b/internal/internal_workflow_client_test.go index 8617163cf..f7b52abc4 100644 --- a/internal/internal_workflow_client_test.go +++ b/internal/internal_workflow_client_test.go @@ -1010,49 +1010,41 @@ func (s *workflowRunSuite) TestExecuteWorkflowWithUpdate_Retry() { }, }, nil) - updOp := NewUpdateWithStartWorkflowOperation( - UpdateWorkflowOptions{ - UpdateName: "update", - WaitForStage: WorkflowUpdateStageCompleted, - }) - - _, err := s.workflowClient.ExecuteWorkflow( - context.Background(), + startOp := s.workflowClient.NewWithStartWorkflowOperation( StartWorkflowOptions{ - ID: workflowID, - TaskQueue: taskqueue, - WithStartOperation: updOp, + ID: workflowID, + WorkflowIDConflictPolicy: enumspb.WORKFLOW_ID_CONFLICT_POLICY_FAIL, + TaskQueue: taskqueue, }, workflowType, ) + + _, err := s.workflowClient.UpdateWithStartWorkflow( + context.Background(), + UpdateWithStartWorkflowOptions{ + UpdateOptions: UpdateWorkflowOptions{ + UpdateName: "update", + WaitForStage: WorkflowUpdateStageCompleted, + }, + StartWorkflowOperation: startOp, + }, + ) s.NoError(err) } func (s *workflowRunSuite) TestExecuteWorkflowWithUpdate_OperationNotExecuted() { - s.workflowServiceClient.EXPECT().StartWorkflowExecution(gomock.Any(), gomock.Any(), gomock.Any()). - Return(&workflowservice.StartWorkflowExecutionResponse{ - RunId: runID, - }, nil) - - updOp := NewUpdateWithStartWorkflowOperation( - UpdateWorkflowOptions{ - UpdateName: "update", - WaitForStage: WorkflowUpdateStageCompleted, - }) - - ctxWithTimeout, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) - defer cancel() - _, err := s.workflowClient.ExecuteWorkflow( - ctxWithTimeout, + startOp := s.workflowClient.NewWithStartWorkflowOperation( StartWorkflowOptions{ - ID: workflowID, - TaskQueue: taskqueue, - // WithStartOperation is not specified! + ID: workflowID, + WorkflowIDConflictPolicy: enumspb.WORKFLOW_ID_CONFLICT_POLICY_FAIL, + TaskQueue: taskqueue, }, workflowType, ) - require.NoError(s.T(), err) - _, err = updOp.Get(ctxWithTimeout) + ctxWithTimeout, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer cancel() + + _, err := startOp.Get(ctxWithTimeout) require.EqualError(s.T(), err, "context deadline exceeded: operation was not executed") } @@ -1092,22 +1084,26 @@ func (s *workflowRunSuite) TestExecuteWorkflowWithUpdate_Abort() { ExecuteMultiOperation(gomock.Any(), gomock.Any(), gomock.Any()). DoAndReturn(tt.respFunc) - updOp := NewUpdateWithStartWorkflowOperation( - UpdateWorkflowOptions{ - UpdateName: "update", - WaitForStage: WorkflowUpdateStageCompleted, - }) + startOp := s.workflowClient.NewWithStartWorkflowOperation( + StartWorkflowOptions{ + ID: workflowID, + WorkflowIDConflictPolicy: enumspb.WORKFLOW_ID_CONFLICT_POLICY_FAIL, + TaskQueue: taskqueue, + }, workflowType, + ) ctxWithTimeout, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) defer cancel() - _, err := s.workflowClient.ExecuteWorkflow( + _, err := s.workflowClient.UpdateWithStartWorkflow( ctxWithTimeout, - StartWorkflowOptions{ - ID: workflowID, - TaskQueue: taskqueue, - WithStartOperation: updOp, - }, workflowType, + UpdateWithStartWorkflowOptions{ + UpdateOptions: UpdateWorkflowOptions{ + UpdateName: "update", + WaitForStage: WorkflowUpdateStageCompleted, + }, + StartWorkflowOperation: startOp, + }, ) var expectedErr *WorkflowUpdateServiceTimeoutOrCanceledError @@ -1122,20 +1118,24 @@ func (s *workflowRunSuite) TestExecuteWorkflowWithUpdate_NonMultiOperationError( ExecuteMultiOperation(gomock.Any(), gomock.Any(), gomock.Any()). Return(nil, serviceerror.NewInternal("internal error")).Times(1) - updOp := NewUpdateWithStartWorkflowOperation( - UpdateWorkflowOptions{ - UpdateName: "update", - WaitForStage: WorkflowUpdateStageCompleted, - }) - - _, err := s.workflowClient.ExecuteWorkflow( - context.Background(), + startOp := s.workflowClient.NewWithStartWorkflowOperation( StartWorkflowOptions{ - ID: workflowID, - TaskQueue: taskqueue, - WithStartOperation: updOp, + ID: workflowID, + WorkflowIDConflictPolicy: enumspb.WORKFLOW_ID_CONFLICT_POLICY_FAIL, + TaskQueue: taskqueue, }, workflowType, ) + + _, err := s.workflowClient.UpdateWithStartWorkflow( + context.Background(), + UpdateWithStartWorkflowOptions{ + UpdateOptions: UpdateWorkflowOptions{ + UpdateName: "update", + WaitForStage: WorkflowUpdateStageCompleted, + }, + StartWorkflowOperation: startOp, + }, + ) s.ErrorContains(err, "internal error") } @@ -1146,20 +1146,24 @@ func (s *workflowRunSuite) TestExecuteWorkflowWithUpdate_ServerResponseCountMism Responses: []*workflowservice.ExecuteMultiOperationResponse_Response{}, }, nil).Times(1) - updOp := NewUpdateWithStartWorkflowOperation( - UpdateWorkflowOptions{ - UpdateName: "update", - WaitForStage: WorkflowUpdateStageCompleted, - }) - - _, err := s.workflowClient.ExecuteWorkflow( - context.Background(), + startOp := s.workflowClient.NewWithStartWorkflowOperation( StartWorkflowOptions{ - ID: workflowID, - TaskQueue: taskqueue, - WithStartOperation: updOp, + ID: workflowID, + WorkflowIDConflictPolicy: enumspb.WORKFLOW_ID_CONFLICT_POLICY_FAIL, + TaskQueue: taskqueue, }, workflowType, ) + + _, err := s.workflowClient.UpdateWithStartWorkflow( + context.Background(), + UpdateWithStartWorkflowOptions{ + UpdateOptions: UpdateWorkflowOptions{ + UpdateName: "update", + WaitForStage: WorkflowUpdateStageCompleted, + }, + StartWorkflowOperation: startOp, + }, + ) s.ErrorContains(err, "invalid server response: 0 instead of 2 operation results") } @@ -1168,20 +1172,24 @@ func (s *workflowRunSuite) TestExecuteWorkflowWithUpdate_ServerErrorResponseCoun ExecuteMultiOperation(gomock.Any(), gomock.Any(), gomock.Any()). Return(nil, serviceerror.NewMultiOperationExecution("Error", []error{})).Times(1) - updOp := NewUpdateWithStartWorkflowOperation( - UpdateWorkflowOptions{ - UpdateName: "update", - WaitForStage: WorkflowUpdateStageCompleted, - }) - - _, err := s.workflowClient.ExecuteWorkflow( - context.Background(), + startOp := s.workflowClient.NewWithStartWorkflowOperation( StartWorkflowOptions{ - ID: workflowID, - TaskQueue: taskqueue, - WithStartOperation: updOp, + ID: workflowID, + WorkflowIDConflictPolicy: enumspb.WORKFLOW_ID_CONFLICT_POLICY_FAIL, + TaskQueue: taskqueue, }, workflowType, ) + + _, err := s.workflowClient.UpdateWithStartWorkflow( + context.Background(), + UpdateWithStartWorkflowOptions{ + UpdateOptions: UpdateWorkflowOptions{ + UpdateName: "update", + WaitForStage: WorkflowUpdateStageCompleted, + }, + StartWorkflowOperation: startOp, + }, + ) s.ErrorContains(err, "invalid server response: 0 instead of 2 operation errors") } @@ -1197,20 +1205,24 @@ func (s *workflowRunSuite) TestExecuteWorkflowWithUpdate_ServerStartResponseType }, }, nil).Times(1) - updOp := NewUpdateWithStartWorkflowOperation( - UpdateWorkflowOptions{ - UpdateName: "update", - WaitForStage: WorkflowUpdateStageCompleted, - }) - - _, err := s.workflowClient.ExecuteWorkflow( - context.Background(), + startOp := s.workflowClient.NewWithStartWorkflowOperation( StartWorkflowOptions{ - ID: workflowID, - TaskQueue: taskqueue, - WithStartOperation: updOp, + ID: workflowID, + WorkflowIDConflictPolicy: enumspb.WORKFLOW_ID_CONFLICT_POLICY_FAIL, + TaskQueue: taskqueue, }, workflowType, ) + + _, err := s.workflowClient.UpdateWithStartWorkflow( + context.Background(), + UpdateWithStartWorkflowOptions{ + UpdateOptions: UpdateWorkflowOptions{ + UpdateName: "update", + WaitForStage: WorkflowUpdateStageCompleted, + }, + StartWorkflowOperation: startOp, + }, + ) s.ErrorContains(err, "invalid server response: StartWorkflow response has the wrong type *workflowservice.ExecuteMultiOperationResponse_Response_UpdateWorkflow") } @@ -1220,7 +1232,11 @@ func (s *workflowRunSuite) TestExecuteWorkflowWithUpdate_ServerUpdateResponseTyp Return(&workflowservice.ExecuteMultiOperationResponse{ Responses: []*workflowservice.ExecuteMultiOperationResponse_Response{ { - Response: &workflowservice.ExecuteMultiOperationResponse_Response_StartWorkflow{}, + Response: &workflowservice.ExecuteMultiOperationResponse_Response_StartWorkflow{ + StartWorkflow: &workflowservice.StartWorkflowExecutionResponse{ + RunId: "RUN_ID", + }, + }, }, { Response: &workflowservice.ExecuteMultiOperationResponse_Response_StartWorkflow{}, // wrong! @@ -1228,20 +1244,24 @@ func (s *workflowRunSuite) TestExecuteWorkflowWithUpdate_ServerUpdateResponseTyp }, }, nil).Times(1) - updOp := NewUpdateWithStartWorkflowOperation( - UpdateWorkflowOptions{ - UpdateName: "update", - WaitForStage: WorkflowUpdateStageCompleted, - }) - - _, err := s.workflowClient.ExecuteWorkflow( - context.Background(), + startOp := s.workflowClient.NewWithStartWorkflowOperation( StartWorkflowOptions{ - ID: workflowID, - TaskQueue: taskqueue, - WithStartOperation: updOp, + ID: workflowID, + WorkflowIDConflictPolicy: enumspb.WORKFLOW_ID_CONFLICT_POLICY_FAIL, + TaskQueue: taskqueue, }, workflowType, ) + + _, err := s.workflowClient.UpdateWithStartWorkflow( + context.Background(), + UpdateWithStartWorkflowOptions{ + UpdateOptions: UpdateWorkflowOptions{ + UpdateName: "update", + WaitForStage: WorkflowUpdateStageCompleted, + }, + StartWorkflowOperation: startOp, + }, + ) s.ErrorContains(err, "invalid server response: UpdateWorkflow response has the wrong type *workflowservice.ExecuteMultiOperationResponse_Response_StartWorkflow") } @@ -1358,15 +1378,6 @@ func (s *workflowClientTestSuite) TestSignalWithStartWorkflowValidation() { context.Background(), "workflow-id-1", "my-signal", "my-signal-value", StartWorkflowOptions{ID: "workflow-id-2"}, workflowType) s.ErrorContains(err, "workflow ID from options not used") - - // unsupported WithStartOperation - _, err = s.client.SignalWithStartWorkflow( - context.Background(), "workflow-id", "my-signal", "my-signal-value", - StartWorkflowOptions{ - ID: "workflow-id", - WithStartOperation: &UpdateWithStartWorkflowOperation{}, - }, workflowType) - s.ErrorContains(err, "option WithStartOperation is not allowed") } func (s *workflowClientTestSuite) TestStartWorkflow() { diff --git a/internal/nexus_operations.go b/internal/nexus_operations.go index 25e7804ad..7d480f614 100644 --- a/internal/nexus_operations.go +++ b/internal/nexus_operations.go @@ -264,6 +264,10 @@ func (t *testSuiteClientForNexusOperations) ExecuteWorkflow(ctx context.Context, return run, nil } +func (t *testSuiteClientForNexusOperations) NewWithStartWorkflowOperation(options StartWorkflowOptions, workflow interface{}, args ...interface{}) WithStartWorkflowOperation { + panic("not implemented in the test environment") +} + // GetSearchAttributes implements Client. func (t *testSuiteClientForNexusOperations) GetSearchAttributes(ctx context.Context) (*workflowservice.GetSearchAttributesResponse, error) { panic("not implemented in the test environment") @@ -379,6 +383,11 @@ func (t *testSuiteClientForNexusOperations) UpdateWorkflow(ctx context.Context, panic("unimplemented in the test environment") } +// UpdateWithStartWorkflow implements Client. +func (t *testSuiteClientForNexusOperations) UpdateWithStartWorkflow(ctx context.Context, options UpdateWithStartWorkflowOptions) (WorkflowUpdateHandle, error) { + panic("unimplemented in the test environment") +} + // UpdateWorkerBuildIdCompatibility implements Client. func (t *testSuiteClientForNexusOperations) UpdateWorkerBuildIdCompatibility(ctx context.Context, options *UpdateWorkerBuildIdCompatibilityOptions) error { panic("not implemented in the test environment") diff --git a/internal/workflow_testsuite.go b/internal/workflow_testsuite.go index 40fe10d1c..0d161f453 100644 --- a/internal/workflow_testsuite.go +++ b/internal/workflow_testsuite.go @@ -359,6 +359,16 @@ func (e *TestWorkflowEnvironment) SetContinuedExecutionRunID(rid string) { e.impl.setContinuedExecutionRunID(rid) } +// InOrderMockCalls declares that the given calls should occur in order. Syntax sugar for NotBefore. +func (e *TestWorkflowEnvironment) InOrderMockCalls(calls ...*MockCallWrapper) { + wrappedCalls := make([]*mock.Call, 0, len(calls)) + for _, call := range calls { + wrappedCalls = append(wrappedCalls, call.call) + } + + mock.InOrder(wrappedCalls...) +} + // OnActivity setup a mock call for activity. Parameter activity must be activity function (func) or activity name (string). // You must call Return() with appropriate parameters on the returned *MockCallWrapper instance. The supplied parameters to // the Return() call should either be a function that has exact same signature as the mocked activity, or it should be diff --git a/internal/workflow_testsuite_test.go b/internal/workflow_testsuite_test.go index 2824d7d11..8030eca6d 100644 --- a/internal/workflow_testsuite_test.go +++ b/internal/workflow_testsuite_test.go @@ -974,6 +974,29 @@ func TestActivityMockingByNameWithoutRegistrationFails(t *testing.T) { assert.Panics(t, func() { env.OnActivity("SayHello", mock.Anything, mock.Anything) }, "The code did not panic") } +func TestMockCallWrapperInOrder(t *testing.T) { + testSuite := &WorkflowTestSuite{} + env := testSuite.NewTestWorkflowEnvironment() + env.RegisterActivity(namedActivity) + + env.InOrderMockCalls( + env.OnActivity(namedActivity, mock.Anything, "call1").Return("result1", nil), + env.OnActivity(namedActivity, mock.Anything, "call2").Return("result2", nil), + ) + + env.ExecuteWorkflow(func(ctx Context) error { + ctx = WithLocalActivityOptions(ctx, LocalActivityOptions{ + ScheduleToCloseTimeout: time.Hour, + StartToCloseTimeout: time.Hour, + }) + var result string + return ExecuteLocalActivity(ctx, "namedActivity", "call2").Get(ctx, &result) + }) + var expectedErr *PanicError + require.ErrorAs(t, env.GetWorkflowError(), &expectedErr) + require.ErrorContains(t, expectedErr, "Must not be called before") +} + func TestMockCallWrapperNotBefore(t *testing.T) { testSuite := &WorkflowTestSuite{} env := testSuite.NewTestWorkflowEnvironment() diff --git a/mocks/Client.go b/mocks/Client.go index 0b5f9102b..8b0ee7233 100644 --- a/mocks/Client.go +++ b/mocks/Client.go @@ -22,7 +22,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -// Code generated by mockery v1.0.0. +// Code generated by mockery v1.0.0, and other versions, with manual fixups. // Modified manually for type alias to work correctly. // https://github.com/vektra/mockery/issues/236 @@ -601,6 +601,29 @@ func (_m *Client) ListWorkflow(ctx context.Context, request *workflowservice.Lis return r0, r1 } +// NewWithStartWorkflowOperation provides a mock function with given fields: options, workflow, args +func (_m *Client) NewWithStartWorkflowOperation(options client.StartWorkflowOptions, workflow interface{}, args ...interface{}) client.WithStartWorkflowOperation { + var _ca []interface{} + _ca = append(_ca, options, workflow) + _ca = append(_ca, args...) + ret := _m.Called(_ca...) + + if len(ret) == 0 { + panic("no return value specified for NewWithStartWorkflowOperation") + } + + var r0 client.WithStartWorkflowOperation + if rf, ok := ret.Get(0).(func(client.StartWorkflowOptions, interface{}, ...interface{}) client.WithStartWorkflowOperation); ok { + r0 = rf(options, workflow, args...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(client.WithStartWorkflowOperation) + } + } + + return r0 +} + // OperatorService provides a mock function with given fields: func (_m *Client) OperatorService() operatorservice.OperatorServiceClient { ret := _m.Called() @@ -878,6 +901,35 @@ func (_m *Client) TerminateWorkflow(ctx context.Context, workflowID string, runI return r0 } +// UpdateWithStartWorkflow provides a mock function with given fields: ctx, options +func (_m *Client) UpdateWithStartWorkflow(ctx context.Context, options client.UpdateWithStartWorkflowOptions) (client.WorkflowUpdateHandle, error) { + ret := _m.Called(ctx, options) + + if len(ret) == 0 { + panic("no return value specified for UpdateWithStartWorkflow") + } + + var r0 client.WorkflowUpdateHandle + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, client.UpdateWithStartWorkflowOptions) (client.WorkflowUpdateHandle, error)); ok { + return rf(ctx, options) + } + if rf, ok := ret.Get(0).(func(context.Context, client.UpdateWithStartWorkflowOptions) client.WorkflowUpdateHandle); ok { + r0 = rf(ctx, options) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(client.WorkflowUpdateHandle) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, client.UpdateWithStartWorkflowOptions) error); ok { + r1 = rf(ctx, options) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} // UpdateWorkerBuildIdCompatibility provides a mock function with given fields: ctx, options // //lint:ignore SA1019 ignore for SDK mocks diff --git a/test/go.mod b/test/go.mod index c91e202f2..d32c8d521 100644 --- a/test/go.mod +++ b/test/go.mod @@ -10,7 +10,7 @@ require ( github.com/nexus-rpc/sdk-go v0.0.12 github.com/opentracing/opentracing-go v1.2.0 github.com/pborman/uuid v1.2.1 - github.com/stretchr/testify v1.9.0 + github.com/stretchr/testify v1.10.0 github.com/uber-go/tally/v4 v4.1.1 go.opentelemetry.io/otel v1.28.0 go.opentelemetry.io/otel/sdk v1.28.0 diff --git a/test/go.sum b/test/go.sum index c0ebd755e..fe0968320 100644 --- a/test/go.sum +++ b/test/go.sum @@ -166,8 +166,8 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= -github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFAEVmqU= github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI= github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk= diff --git a/test/integration_test.go b/test/integration_test.go index bf6b48df2..bafe9667a 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -2536,14 +2536,18 @@ func (ts *IntegrationTestSuite) TestInterceptorStartWithSignal() { } func (ts *IntegrationTestSuite) TestOpenTelemetryTracing() { - ts.testOpenTelemetryTracing(true) + ts.testOpenTelemetryTracing(true, false) +} + +func (ts *IntegrationTestSuite) TestOpenTelemetryTracingWithUpdateWithStart() { + ts.testOpenTelemetryTracing(true, true) } func (ts *IntegrationTestSuite) TestOpenTelemetryTracingWithoutMessages() { - ts.testOpenTelemetryTracing(false) + ts.testOpenTelemetryTracing(false, false) } -func (ts *IntegrationTestSuite) testOpenTelemetryTracing(withMessages bool) { +func (ts *IntegrationTestSuite) testOpenTelemetryTracing(withMessages bool, updateWithStart bool) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() // Start a top-level span @@ -2561,15 +2565,31 @@ func (ts *IntegrationTestSuite) testOpenTelemetryTracing(withMessages bool) { ts.NoError(val.Get(&queryResp)) ts.Equal("query-response", queryResp) - // Update - handle, err := ts.client.UpdateWorkflow(ctx, client.UpdateWorkflowOptions{ - WorkflowID: run.GetID(), - RunID: run.GetRunID(), - UpdateName: "workflow-update", - WaitForStage: client.WorkflowUpdateStageCompleted, - }) - ts.NoError(err) - ts.NoError(handle.Get(ctx, nil)) + if updateWithStart { + uwsStartOptions := ts.startWorkflowOptions(run.GetID()) + uwsStartOptions.EnableEagerStart = false + uwsStartOptions.WorkflowIDConflictPolicy = enumspb.WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING + startOp := ts.client.NewWithStartWorkflowOperation(uwsStartOptions, ts.workflows.SignalsQueriesAndUpdate, true, true) + updateHandle, err := ts.client.UpdateWithStartWorkflow(ctx, client.UpdateWithStartWorkflowOptions{ + UpdateOptions: client.UpdateWorkflowOptions{ + WorkflowID: run.GetID(), + UpdateName: "workflow-update", + WaitForStage: client.WorkflowUpdateStageCompleted, + }, + StartWorkflowOperation: startOp, + }) + ts.NoError(err) + ts.NoError(updateHandle.Get(ctx, nil)) + } else { + handle, err := ts.client.UpdateWorkflow(ctx, client.UpdateWorkflowOptions{ + WorkflowID: run.GetID(), + RunID: run.GetRunID(), + UpdateName: "workflow-update", + WaitForStage: client.WorkflowUpdateStageCompleted, + }) + ts.NoError(err) + ts.NoError(handle.Get(ctx, nil)) + } // Finish signal ts.NoError(ts.client.SignalWorkflow(ctx, run.GetID(), run.GetRunID(), "finish-signal", nil)) @@ -2579,6 +2599,11 @@ func (ts *IntegrationTestSuite) testOpenTelemetryTracing(withMessages bool) { rootSpan.End() spans := ts.openTelemetrySpanRecorder.Ended() + updateOpName := "UpdateWorkflow" + if updateWithStart { + updateOpName = "UpdateWithStartWorkflow" + } + // Span builder span := func(name string, children ...*interceptortest.SpanInfo) *interceptortest.SpanInfo { // If without signal-and-query headers, filter out those children in place @@ -2590,7 +2615,7 @@ func (ts *IntegrationTestSuite) testOpenTelemetryTracing(withMessages bool) { strings.HasPrefix(child.Name, "HandleSignal:") || strings.HasPrefix(child.Name, "QueryWorkflow:") || strings.HasPrefix(child.Name, "HandleQuery:") || - strings.HasPrefix(child.Name, "UpdateWorkflow:") || + strings.HasPrefix(child.Name, fmt.Sprintf("%s:", updateOpName)) || strings.HasPrefix(child.Name, "ValidateUpdate:") || strings.HasPrefix(child.Name, "HandleUpdate:") if !isMessage { @@ -2658,7 +2683,7 @@ func (ts *IntegrationTestSuite) testOpenTelemetryTracing(withMessages bool) { span("QueryWorkflow:workflow-query", span("HandleQuery:workflow-query"), ), - span("UpdateWorkflow:workflow-update", + span(fmt.Sprintf("%s:workflow-update", updateOpName), span("ValidateUpdate:workflow-update"), span("HandleUpdate:workflow-update", // Child workflow exec @@ -3982,231 +4007,266 @@ func (ts *IntegrationTestSuite) TestUpdateSettingHandlerInHandler() { ts.NoError(run.Get(ctx, nil)) } -func (ts *IntegrationTestSuite) TestExecuteWorkflowWithUpdate() { +func (ts *IntegrationTestSuite) TestUpdateWithStartWorkflow() { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - startOptionsWithOperation := func(op client.WithStartWorkflowOperation) client.StartWorkflowOptions { - startOptions := ts.startWorkflowOptions("test-update-with-start-" + uuid.New()) - startOptions.EnableEagerStart = false // not allowed to use with update-with-start - startOptions.WithStartOperation = op - return startOptions + startWorkflowOptions := func() client.StartWorkflowOptions { + opts := ts.startWorkflowOptions("test-update-with-start-" + uuid.New()) + opts.EnableEagerStart = false // not allowed to use with update-with-start + opts.WorkflowIDConflictPolicy = enumspb.WORKFLOW_ID_CONFLICT_POLICY_FAIL // required for update-with-start + return opts } ts.Run("sends update-with-start (no running workflow)", func() { - updateOp := client.NewUpdateWithStartWorkflowOperation( - client.UpdateWorkflowOptions{ + startOp := ts.client.NewWithStartWorkflowOperation( + startWorkflowOptions(), ts.workflows.UpdateEntityWorkflow, + ) + updHandle, err := ts.client.UpdateWithStartWorkflow(ctx, client.UpdateWithStartWorkflowOptions{ + UpdateOptions: client.UpdateWorkflowOptions{ UpdateName: "update", Args: []any{1}, WaitForStage: client.WorkflowUpdateStageAccepted, - }) - - startOptions := startOptionsWithOperation(updateOp) - run, err := ts.client.ExecuteWorkflow(ctx, startOptions, ts.workflows.UpdateEntityWorkflow) + }, + StartWorkflowOperation: startOp, + }) ts.NoError(err) var updateResult int - updHandle, err := updateOp.Get(ctx) - ts.NoError(err) ts.NoError(updHandle.Get(ctx, &updateResult)) ts.Equal(1, updateResult) + run, err := startOp.Get(ctx) + ts.NoError(err) var workflowResult int ts.NoError(run.Get(ctx, &workflowResult)) ts.Equal(1, workflowResult) }) ts.Run("sends update-with-start (already running workflow)", func() { - startOptions := startOptionsWithOperation(nil) + startOptions := startWorkflowOptions() run1, err := ts.client.ExecuteWorkflow(ctx, startOptions, ts.workflows.UpdateEntityWorkflow) ts.NoError(err) - updateOp := client.NewUpdateWithStartWorkflowOperation( - client.UpdateWorkflowOptions{ + startOptions.WorkflowIDConflictPolicy = enumspb.WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING + startOp := ts.client.NewWithStartWorkflowOperation(startOptions, ts.workflows.UpdateEntityWorkflow) + + updHandle, err := ts.client.UpdateWithStartWorkflow(ctx, client.UpdateWithStartWorkflowOptions{ + UpdateOptions: client.UpdateWorkflowOptions{ UpdateName: "update", Args: []any{1}, WaitForStage: client.WorkflowUpdateStageCompleted, - }) + }, + StartWorkflowOperation: startOp, + }) + ts.NoError(err) - startOptions.WithStartOperation = updateOp - startOptions.WorkflowIDConflictPolicy = enumspb.WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING - run2, err := ts.client.ExecuteWorkflow(ctx, startOptions, ts.workflows.UpdateEntityWorkflow) + run2, err := startOp.Get(ctx) ts.NoError(err) ts.Equal(run1.GetRunID(), run2.GetRunID()) var updateResult int - updHandle, err := updateOp.Get(ctx) - ts.NoError(err) ts.NoError(updHandle.Get(ctx, &updateResult)) ts.Equal(1, updateResult) }) ts.Run("sends update-with-start but update is rejected", func() { - updateOp := client.NewUpdateWithStartWorkflowOperation( - client.UpdateWorkflowOptions{ + startOp := ts.client.NewWithStartWorkflowOperation(startWorkflowOptions(), ts.workflows.UpdateEntityWorkflow) + + updHandle, err := ts.client.UpdateWithStartWorkflow(ctx, client.UpdateWithStartWorkflowOptions{ + UpdateOptions: client.UpdateWorkflowOptions{ UpdateName: "update", Args: []any{-1}, // rejected update payload WaitForStage: client.WorkflowUpdateStageCompleted, - }) + }, + StartWorkflowOperation: startOp, + }) + ts.NoError(err) - startOptions := startOptionsWithOperation(updateOp) - run, err := ts.client.ExecuteWorkflow(ctx, startOptions, ts.workflows.UpdateEntityWorkflow) + run, err := startOp.Get(ctx) ts.NoError(err) ts.NotNil(run) var updateResult int - updHandle, err := updateOp.Get(ctx) - ts.NoError(err) err = updHandle.Get(ctx, &updateResult) ts.ErrorContains(err, "addend must be non-negative") }) - ts.Run("receives update result in separate goroutines", func() { - updateOp := client.NewUpdateWithStartWorkflowOperation( - client.UpdateWorkflowOptions{ + ts.Run("receives results in separate goroutines", func() { + + startOp := ts.client.NewWithStartWorkflowOperation(startWorkflowOptions(), ts.workflows.UpdateEntityWorkflow) + + done1 := make(chan struct{}) + defer func() { <-done1 }() + go func() { + run, err := startOp.Get(ctx) + ts.NoError(err) + ts.NotNil(run) + done1 <- struct{}{} + }() + + updHandle, err := ts.client.UpdateWithStartWorkflow(ctx, client.UpdateWithStartWorkflowOptions{ + UpdateOptions: client.UpdateWorkflowOptions{ UpdateName: "update", Args: []any{1}, WaitForStage: client.WorkflowUpdateStageAccepted, - }) + }, + StartWorkflowOperation: startOp, + }) + ts.NoError(err) - done := make(chan struct{}) - defer func() { <-done }() + done2 := make(chan struct{}) + defer func() { <-done2 }() go func() { var updateResult int - updHandle, err := updateOp.Get(ctx) - ts.NoError(err) ts.NoError(updHandle.Get(ctx, &updateResult)) ts.Equal(1, updateResult) - done <- struct{}{} + done2 <- struct{}{} }() - startOptions := startOptionsWithOperation(updateOp) - _, err := ts.client.ExecuteWorkflow(ctx, startOptions, ts.workflows.UpdateEntityWorkflow) - ts.NoError(err) - var updateResult int - updHandle, err := updateOp.Get(ctx) - ts.NoError(err) ts.NoError(updHandle.Get(ctx, &updateResult)) ts.Equal(1, updateResult) }) ts.Run("fails when start request is invalid", func() { - updateOp := client.NewUpdateWithStartWorkflowOperation( - client.UpdateWorkflowOptions{ - UpdateName: "update", - WaitForStage: client.WorkflowUpdateStageCompleted, - }) + updateOptions := client.UpdateWorkflowOptions{ + UpdateName: "update", + WaitForStage: client.WorkflowUpdateStageCompleted, + } + startOptions := startWorkflowOptions() - startOptions := startOptionsWithOperation(updateOp) startOptions.CronSchedule = "invalid!" - _, err := ts.client.ExecuteWorkflow(ctx, startOptions, ts.workflows.UpdateEntityWorkflow) + startOp := ts.client.NewWithStartWorkflowOperation(startOptions, ts.workflows.UpdateEntityWorkflow) + _, err := ts.client.UpdateWithStartWorkflow(ctx, client.UpdateWithStartWorkflowOptions{ + UpdateOptions: updateOptions, + StartWorkflowOperation: startOp, + }) ts.Error(err) + + startOptions.WorkflowIDConflictPolicy = enumspb.WORKFLOW_ID_CONFLICT_POLICY_UNSPECIFIED + startOp = ts.client.NewWithStartWorkflowOperation(startOptions, ts.workflows.UpdateEntityWorkflow) + _, err = ts.client.UpdateWithStartWorkflow(ctx, client.UpdateWithStartWorkflowOptions{ + UpdateOptions: updateOptions, + StartWorkflowOperation: startOp, + }) + ts.ErrorContains(err, "WorkflowIDConflictPolicy must be set") }) ts.Run("fails when update operation is invalid", func() { - updateOp := client.NewUpdateWithStartWorkflowOperation( - client.UpdateWorkflowOptions{ - // invalid - }) + startOptions := startWorkflowOptions() - startOptions := startOptionsWithOperation(updateOp) - _, err := ts.client.ExecuteWorkflow(ctx, startOptions, ts.workflows.UpdateEntityWorkflow) - ts.ErrorContains(err, "invalid WithStartOperation: WaitForStage must be specified") + startOp := ts.client.NewWithStartWorkflowOperation(startOptions, ts.workflows.UpdateEntityWorkflow) + + _, err := ts.client.UpdateWithStartWorkflow(ctx, client.UpdateWithStartWorkflowOptions{ + UpdateOptions: client.UpdateWorkflowOptions{ + // invalid + }, + StartWorkflowOperation: startOp, + }) + ts.ErrorContains(err, "WaitForStage must be specified") - updateOp = client.NewUpdateWithStartWorkflowOperation( - client.UpdateWorkflowOptions{ + _, err = ts.client.UpdateWithStartWorkflow(ctx, client.UpdateWithStartWorkflowOptions{ + UpdateOptions: client.UpdateWorkflowOptions{ RunID: "invalid", WaitForStage: client.WorkflowUpdateStageCompleted, - }) - - startOptions = startOptionsWithOperation(updateOp) - _, err = ts.client.ExecuteWorkflow(ctx, startOptions, ts.workflows.UpdateEntityWorkflow) - ts.ErrorContains(err, "invalid WithStartOperation: RunID cannot be set because the workflow might not be running") + }, + StartWorkflowOperation: startOp, + }) + ts.ErrorContains(err, "invalid UpdateWorkflowOptions: RunID cannot be set for UpdateWithStartWorkflow because the workflow might not be running") - updateOp = client.NewUpdateWithStartWorkflowOperation( - client.UpdateWorkflowOptions{ + _, err = ts.client.UpdateWithStartWorkflow(ctx, client.UpdateWithStartWorkflowOptions{ + UpdateOptions: client.UpdateWorkflowOptions{ FirstExecutionRunID: "invalid", WaitForStage: client.WorkflowUpdateStageCompleted, - }) - - startOptions = startOptionsWithOperation(updateOp) - _, err = ts.client.ExecuteWorkflow(ctx, startOptions, ts.workflows.UpdateEntityWorkflow) - ts.ErrorContains(err, "invalid WithStartOperation: FirstExecutionRunID cannot be set because the workflow might not be running") + }, + StartWorkflowOperation: startOp, + }) + ts.ErrorContains(err, "invalid UpdateWorkflowOptions: FirstExecutionRunID cannot be set for UpdateWithStartWorkflow because the workflow might not be running") - updateOp = client.NewUpdateWithStartWorkflowOperation( - client.UpdateWorkflowOptions{ + _, err = ts.client.UpdateWithStartWorkflow(ctx, client.UpdateWithStartWorkflowOptions{ + UpdateOptions: client.UpdateWorkflowOptions{ UpdateName: "", // invalid WaitForStage: client.WorkflowUpdateStageCompleted, - }) - - startOptions = startOptionsWithOperation(updateOp) - _, err = ts.client.ExecuteWorkflow(ctx, startOptions, ts.workflows.UpdateEntityWorkflow) - ts.ErrorContains(err, "invalid WithStartOperation: ") // omitting server message intentionally + }, + StartWorkflowOperation: startOp, + }) + ts.ErrorContains(err, "invalid WithStartWorkflowOperation: ") // omitting server message intentionally - updateOp = client.NewUpdateWithStartWorkflowOperation( - client.UpdateWorkflowOptions{ + _, err = ts.client.UpdateWithStartWorkflow(ctx, client.UpdateWithStartWorkflowOptions{ + UpdateOptions: client.UpdateWorkflowOptions{ WorkflowID: "different", // does not match Start's UpdateName: "update", WaitForStage: client.WorkflowUpdateStageCompleted, - }) - - startOptions = startOptionsWithOperation(updateOp) - _, err = ts.client.ExecuteWorkflow(ctx, startOptions, ts.workflows.UpdateEntityWorkflow) - ts.ErrorContains(err, "invalid WithStartOperation: ") // omitting server message intentionally + }, + StartWorkflowOperation: startOp, + }) + ts.ErrorContains(err, "invalid WithStartWorkflowOperation: ") // omitting server message intentionally }) ts.Run("fails when workflow is already running", func() { - startOptions := startOptionsWithOperation(nil) + startOptions := startWorkflowOptions() _, err := ts.client.ExecuteWorkflow(ctx, startOptions, ts.workflows.UpdateEntityWorkflow) ts.NoError(err) + startOp := ts.client.NewWithStartWorkflowOperation(startOptions, ts.workflows.UpdateEntityWorkflow) - updateOp := client.NewUpdateWithStartWorkflowOperation( - client.UpdateWorkflowOptions{ + _, err = ts.client.UpdateWithStartWorkflow(ctx, client.UpdateWithStartWorkflowOptions{ + UpdateOptions: client.UpdateWorkflowOptions{ UpdateName: "update", Args: []any{1}, WaitForStage: client.WorkflowUpdateStageCompleted, - }) + }, + StartWorkflowOperation: startOp, + }) - startOptions.WithStartOperation = updateOp // NOTE that WorkflowExecutionErrorWhenAlreadyStarted (defaults to false) has no impact - _, err = ts.client.ExecuteWorkflow(ctx, startOptions, ts.workflows.UpdateEntityWorkflow) ts.ErrorContains(err, "Workflow execution is already running") }) ts.Run("fails when executed twice", func() { - updateOp := client.NewUpdateWithStartWorkflowOperation( - client.UpdateWorkflowOptions{ - UpdateName: "update", - Args: []any{1}, - WaitForStage: client.WorkflowUpdateStageCompleted, - }) + startOp := ts.client.NewWithStartWorkflowOperation(startWorkflowOptions(), ts.workflows.UpdateEntityWorkflow) - startOptions := startOptionsWithOperation(updateOp) - _, err := ts.client.ExecuteWorkflow(ctx, startOptions, ts.workflows.UpdateEntityWorkflow) + updateOptions := client.UpdateWorkflowOptions{ + UpdateName: "update", + Args: []any{1}, + WaitForStage: client.WorkflowUpdateStageCompleted, + } + _, err := ts.client.UpdateWithStartWorkflow(ctx, client.UpdateWithStartWorkflowOptions{ + UpdateOptions: updateOptions, + StartWorkflowOperation: startOp, + }) ts.NoError(err) - _, err = ts.client.ExecuteWorkflow(ctx, startOptions, ts.workflows.UpdateEntityWorkflow) - ts.ErrorContains(err, "invalid WithStartOperation: was already executed") + _, err = ts.client.UpdateWithStartWorkflow(ctx, client.UpdateWithStartWorkflowOptions{ + UpdateOptions: updateOptions, + StartWorkflowOperation: startOp, + }) + ts.ErrorContains(err, "invalid WithStartWorkflowOperation: was already executed") }) ts.Run("propagates context", func() { - updateOp := client.NewUpdateWithStartWorkflowOperation( - client.UpdateWorkflowOptions{ - UpdateName: "update", - Args: []any{1}, - WaitForStage: client.WorkflowUpdateStageCompleted, - }) + startOp := ts.client.NewWithStartWorkflowOperation(startWorkflowOptions(), ts.workflows.ContextPropagator, true) - var propagatedValues []string ctx := context.Background() // Propagate values using different context propagators. ctx = context.WithValue(ctx, contextKey(testContextKey1), "propagatedValue1") ctx = context.WithValue(ctx, contextKey(testContextKey2), "propagatedValue2") ctx = context.WithValue(ctx, contextKey(testContextKey3), "non-propagatedValue") - startOptions := startOptionsWithOperation(updateOp) - err := ts.executeWorkflowWithContextAndOption(ctx, startOptions, ts.workflows.ContextPropagator, &propagatedValues, true) + + _, err := ts.client.UpdateWithStartWorkflow(ctx, client.UpdateWithStartWorkflowOptions{ + UpdateOptions: client.UpdateWorkflowOptions{ + UpdateName: "update", + Args: []any{1}, + WaitForStage: client.WorkflowUpdateStageCompleted, + }, + StartWorkflowOperation: startOp, + }) + ts.NoError(err) + + var propagatedValues []string + run, err := startOp.Get(ctx) ts.NoError(err) + ts.NoError(run.Get(ctx, &propagatedValues)) // One copy from workflow and one copy from activity * 2 for child workflow ts.EqualValues([]string{