Skip to content

Commit

Permalink
New Update-With-Start API (#1731)
Browse files Browse the repository at this point in the history
Instead of using the ExecuteWorkflow client method, update-with-start is invoked
via a new client method UpdateWithStartWorkflow.

To use this method, first use NewWithStartWorkflowOperation to define
the start-workflow operation. A workflow ID conflict policy is required. Then
call UpdateWithStartWorkflow, passing it an UpdateWithStartWorkflowOptions
containing your WithStartWorkflowOperation, together with an
UpdateWorkflowOptions defining the update operation. This will return an
UpdateHandle. The  WithStartWorkflowOperation exposes a blocking .Get(ctx)
method to obtain the workflow run targeted by the update.
  • Loading branch information
dandavison authored Dec 6, 2024
1 parent 9c4dde8 commit 9d59447
Show file tree
Hide file tree
Showing 11 changed files with 679 additions and 396 deletions.
41 changes: 26 additions & 15 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,16 +162,11 @@ type (
// StartWorkflowOptions configuration parameters for starting a workflow execution.
StartWorkflowOptions = internal.StartWorkflowOptions

// WithStartWorkflowOperation is a type of operation that can be executed as part of a workflow start.
// For example, use NewUpdateWithStartWorkflowOperation to perform Update-with-Start.
// WithStartWorkflowOperation defines how to start a workflow when using UpdateWithStartWorkflow.
// See [Client.NewWithStartWorkflowOperation] and [Client.UpdateWithStartWorkflow].
// NOTE: Experimental
WithStartWorkflowOperation = internal.WithStartWorkflowOperation

// UpdateWithStartWorkflowOperation is used to perform Update-with-Start.
// See NewUpdateWithStartWorkflowOperation for details.
// NOTE: Experimental
UpdateWithStartWorkflowOperation = internal.UpdateWithStartWorkflowOperation

// HistoryEventIterator is a iterator which can return history events.
HistoryEventIterator = internal.HistoryEventIterator

Expand Down Expand Up @@ -279,6 +274,11 @@ type (
// NOTE: Experimental
UpdateWorkflowOptions = internal.UpdateWorkflowOptions

// UpdateWithStartWorkflowOptions encapsulates the parameters used by UpdateWithStartWorkflow.
// See [Client.UpdateWithStartWorkflow] and [Client.NewWithStartWorkflowOperation].
// NOTE: Experimental
UpdateWithStartWorkflowOptions = internal.UpdateWithStartWorkflowOptions

// WorkflowUpdateHandle represents a running or completed workflow
// execution update and gives the holder access to the outcome of the same.
// NOTE: Experimental
Expand Down Expand Up @@ -564,6 +564,11 @@ type (
SignalWithStartWorkflow(ctx context.Context, workflowID string, signalName string, signalArg interface{},
options StartWorkflowOptions, workflow interface{}, workflowArgs ...interface{}) (WorkflowRun, error)

// NewWithStartWorkflowOperation returns a WithStartWorkflowOperation for use with UpdateWithStartWorkflow.
// See [Client.UpdateWithStartWorkflow].
// NOTE: Experimental
NewWithStartWorkflowOperation(options StartWorkflowOptions, workflow interface{}, args ...interface{}) WithStartWorkflowOperation

// CancelWorkflow request cancellation of a workflow in execution. Cancellation request closes the channel
// returned by the workflow.Context.Done() of the workflow that is target of the request.
// - workflow ID of the workflow.
Expand Down Expand Up @@ -840,6 +845,20 @@ type (
// NOTE: Experimental
UpdateWorkflow(ctx context.Context, options UpdateWorkflowOptions) (WorkflowUpdateHandle, error)

// UpdateWithStartWorkflow issues an update-with-start request. A
// WorkflowIDConflictPolicy must be set in the options. If the specified
// workflow execution is not running, then a new workflow execution is
// started and the update is sent in the first workflow task.
// Alternatively if the specified workflow execution is running then, if
// the WorkflowIDConflictPolicy is USE_EXISTING, the update is issued
// against the specified workflow, and if the WorkflowIDConflictPolicy
// is FAIL, an error is returned. The call will block until the update
// has reached the WaitForStage in the options. Note that this means
// that the call will not return successfully until the update has been
// delivered to a worker.
// NOTE: Experimental
UpdateWithStartWorkflow(ctx context.Context, options UpdateWithStartWorkflowOptions) (WorkflowUpdateHandle, error)

// GetWorkflowUpdateHandle creates a handle to the referenced update
// which can be polled for an outcome. Note that runID is optional and
// if not specified the most recent runID will be used.
Expand Down Expand Up @@ -934,14 +953,6 @@ type MetricsTimer = metrics.Timer
// MetricsNopHandler is a noop handler that does nothing with the metrics.
var MetricsNopHandler = metrics.NopHandler

// NewUpdateWithStartWorkflowOperation returns an UpdateWithStartWorkflowOperation to perform Update-with-Start.
// After executing Client.ExecuteWorkflow with the UpdateWithStartWorkflow in the start options,
// the update result can be obtained.
// NOTE: Experimental
func NewUpdateWithStartWorkflowOperation(options UpdateWorkflowOptions) *UpdateWithStartWorkflowOperation {
return internal.NewUpdateWithStartWorkflowOperation(options)
}

// Dial creates an instance of a workflow client. This will attempt to connect
// to the server eagerly and will return an error if the server is not
// available.
Expand Down
4 changes: 4 additions & 0 deletions interceptor/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,10 @@ type ScheduleClientCreateInput = internal.ScheduleClientCreateInput
// ClientOutoundInterceptor.UpdateWorkflow.
type ClientUpdateWorkflowInput = internal.ClientUpdateWorkflowInput

// ClientUpdateWithStartWorkflowInput is input for
// ClientOutboundInterceptor.UpdateWithStartWorkflow.
type ClientUpdateWithStartWorkflowInput = internal.ClientUpdateWithStartWorkflowInput

// Header provides Temporal header information from the context for reading or
// writing during specific interceptor calls.
//
Expand Down
27 changes: 27 additions & 0 deletions interceptor/tracing_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,33 @@ func (t *tracingClientOutboundInterceptor) UpdateWorkflow(
return val, err
}

func (t *tracingClientOutboundInterceptor) UpdateWithStartWorkflow(
ctx context.Context,
in *ClientUpdateWithStartWorkflowInput,
) (client.WorkflowUpdateHandle, error) {
// Only add tracing if enabled
if t.root.options.DisableUpdateTracing {
return t.Next.UpdateWithStartWorkflow(ctx, in)
}
// Start span and write to header
span, ctx, err := t.root.startSpanFromContext(ctx, &TracerStartSpanOptions{
Operation: "UpdateWithStartWorkflow",
Name: in.UpdateOptions.UpdateName,
Tags: map[string]string{workflowIDTagKey: in.UpdateOptions.WorkflowID, updateIDTagKey: in.UpdateOptions.UpdateID},
ToHeader: true,
Time: time.Now(),
})
if err != nil {
return nil, err
}
var finishOpts TracerFinishSpanOptions
defer span.Finish(&finishOpts)

val, err := t.Next.UpdateWithStartWorkflow(ctx, in)
finishOpts.Error = err
return val, err
}

type tracingActivityOutboundInterceptor struct {
ActivityOutboundInterceptorBase
root *tracingInterceptor
Expand Down
80 changes: 31 additions & 49 deletions internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ package internal
import (
"context"
"crypto/tls"
"errors"
"fmt"
"sync/atomic"
"time"
Expand Down Expand Up @@ -135,6 +134,10 @@ type (
SignalWithStartWorkflow(ctx context.Context, workflowID string, signalName string, signalArg interface{},
options StartWorkflowOptions, workflow interface{}, workflowArgs ...interface{}) (WorkflowRun, error)

// NewWithStartWorkflowOperation returns a WithStartWorkflowOperation for use in UpdateWithStartWorkflow.
// NOTE: Experimental
NewWithStartWorkflowOperation(options StartWorkflowOptions, workflow interface{}, args ...interface{}) WithStartWorkflowOperation

// CancelWorkflow cancels a workflow in execution
// - workflow ID of the workflow.
// - runID can be default(empty string). if empty string then it will pick the running execution of that workflow ID.
Expand Down Expand Up @@ -394,6 +397,17 @@ type (
// NOTE: Experimental
UpdateWorkflow(ctx context.Context, options UpdateWorkflowOptions) (WorkflowUpdateHandle, error)

// UpdateWithStartWorkflow issues an update-with-start request. A
// WorkflowIDConflictPolicy must be set. If the specified workflow is
// not running, then a new workflow execution is started and the update
// is sent in the first workflow task. Alternatively if the specified
// workflow is running then, if the WorkflowIDConflictPolicy is
// USE_EXISTING, the update is issued against the specified workflow,
// and if the WorkflowIDConflictPolicy is FAIL, an error is returned.
//
// NOTE: Experimental
UpdateWithStartWorkflow(ctx context.Context, options UpdateWithStartWorkflowOptions) (WorkflowUpdateHandle, error)

// GetWorkflowUpdateHandle creates a handle to the referenced update
// which can be polled for an outcome. Note that runID is optional and
// if not specified the most recent runID will be used.
Expand Down Expand Up @@ -647,18 +661,6 @@ type (
// Optional: defaulted to Fail.
WorkflowIDConflictPolicy enumspb.WorkflowIdConflictPolicy

// WithStartOperation - Operation to execute with Workflow Start.
// For example, see NewUpdateWithStartWorkflowOperation to perform Update-with-Start. Note that if the workflow is
// already running and WorkflowIDConflictPolicy is set to UseExisting, the start is skipped and only the
// operation is executed. If instead the policy is set to Fail (the default), nothing is executed and
// an error will be returned (i.e. the option WorkflowExecutionErrorWhenAlreadyStarted is ignored).
// This option will be ignored when used with Client.SignalWithStartWorkflow.
//
// Optional: defaults to nil.
//
// NOTE: Experimental
WithStartOperation WithStartWorkflowOperation

// When WorkflowExecutionErrorWhenAlreadyStarted is true, Client.ExecuteWorkflow will return an error if the
// workflow id has already been used and WorkflowIDReusePolicy or WorkflowIDConflictPolicy would
// disallow a re-run. If it is set to false, rather than erroring a WorkflowRun instance representing
Expand Down Expand Up @@ -751,22 +753,24 @@ type (
links []*commonpb.Link
}

// WithStartWorkflowOperation is a type of operation that can be executed as part of a workflow start.
// WithStartWorkflowOperation defines how to start a workflow when using UpdateWithStartWorkflow.
// See [NewWithStartWorkflowOperation] and [UpdateWithStartWorkflow].
// NOTE: Experimental
WithStartWorkflowOperation interface {
isWithStartWorkflowOperation()
// Get returns the WorkflowRun that was targeted by the UpdateWithStartWorkflow call.
// This is a blocking API.
Get(ctx context.Context) (WorkflowRun, error)
}

// UpdateWithStartWorkflowOperation is used to perform Update-with-Start.
// See NewUpdateWithStartWorkflowOperation for details.
UpdateWithStartWorkflowOperation struct {
input *ClientUpdateWorkflowInput
withStartWorkflowOperationImpl struct {
input *ClientExecuteWorkflowInput
// flag to ensure the operation is only executed once
executed atomic.Bool
// channel to indicate that handle or err is available
doneCh chan struct{}
// handle and err cannot be accessed before doneCh is closed
handle WorkflowUpdateHandle
err error
// workflowRun and err cannot be accessed before doneCh is closed
workflowRun WorkflowRun
err error
}

// RetryPolicy defines the retry policy.
Expand Down Expand Up @@ -1059,30 +1063,10 @@ func DialCloudOperationsClient(ctx context.Context, options CloudOperationsClien
}, nil
}

// NewUpdateWithStartWorkflowOperation returns an UpdateWithStartWorkflowOperation that can be used to perform Update-with-Start.
func NewUpdateWithStartWorkflowOperation(options UpdateWorkflowOptions) *UpdateWithStartWorkflowOperation {
res := &UpdateWithStartWorkflowOperation{doneCh: make(chan struct{})}

input, err := createUpdateWorkflowInput(options)
if err != nil {
res.set(nil, err)
} else if options.RunID != "" {
res.set(nil, errors.New("RunID cannot be set because the workflow might not be running"))
}
if options.FirstExecutionRunID != "" {
res.set(nil, errors.New("FirstExecutionRunID cannot be set because the workflow might not be running"))
} else {
res.input = input
}

return res
}

// Get blocks until a server response has been received; or the context deadline is exceeded.
func (op *UpdateWithStartWorkflowOperation) Get(ctx context.Context) (WorkflowUpdateHandle, error) {
func (op *withStartWorkflowOperationImpl) Get(ctx context.Context) (WorkflowRun, error) {
select {
case <-op.doneCh:
return op.handle, op.err
return op.workflowRun, op.err
case <-ctx.Done():
if !op.executed.Load() {
return nil, fmt.Errorf("%w: %w", ctx.Err(), fmt.Errorf("operation was not executed"))
Expand All @@ -1091,21 +1075,19 @@ func (op *UpdateWithStartWorkflowOperation) Get(ctx context.Context) (WorkflowUp
}
}

func (op *UpdateWithStartWorkflowOperation) markExecuted() error {
func (op *withStartWorkflowOperationImpl) markExecuted() error {
if op.executed.Swap(true) {
return fmt.Errorf("was already executed")
}
return nil
}

func (op *UpdateWithStartWorkflowOperation) set(handle WorkflowUpdateHandle, err error) {
op.handle = handle
func (op *withStartWorkflowOperationImpl) set(workflowRun WorkflowRun, err error) {
op.workflowRun = workflowRun
op.err = err
close(op.doneCh)
}

func (op *UpdateWithStartWorkflowOperation) isWithStartWorkflowOperation() {}

// NewNamespaceClient creates an instance of a namespace client, to manager lifecycle of namespaces.
func NewNamespaceClient(options ClientOptions) (NamespaceClient, error) {
// Initialize root tags
Expand Down
10 changes: 10 additions & 0 deletions internal/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,11 @@ type ClientOutboundInterceptor interface {
// NOTE: Experimental
UpdateWorkflow(context.Context, *ClientUpdateWorkflowInput) (WorkflowUpdateHandle, error)

// UpdateWithStartWorkflow intercepts client.Client.UpdateWithStartWorkflow.
//
// NOTE: Experimental
UpdateWithStartWorkflow(context.Context, *ClientUpdateWithStartWorkflowInput) (WorkflowUpdateHandle, error)

// PollWorkflowUpdate requests the outcome of a specific update from the
// server.
//
Expand All @@ -416,6 +421,11 @@ type ClientUpdateWorkflowInput struct {
WaitForStage WorkflowUpdateStage
}

type ClientUpdateWithStartWorkflowInput struct {
UpdateOptions *UpdateWorkflowOptions
StartWorkflowOperation WithStartWorkflowOperation
}

// ClientPollWorkflowUpdateInput is the input to
// ClientOutboundInterceptor.PollWorkflowUpdate.
type ClientPollWorkflowUpdateInput struct {
Expand Down
7 changes: 7 additions & 0 deletions internal/interceptor_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,13 @@ func (c *ClientOutboundInterceptorBase) PollWorkflowUpdate(
return c.Next.PollWorkflowUpdate(ctx, in)
}

func (c *ClientOutboundInterceptorBase) UpdateWithStartWorkflow(
ctx context.Context,
in *ClientUpdateWithStartWorkflowInput,
) (WorkflowUpdateHandle, error) {
return c.Next.UpdateWithStartWorkflow(ctx, in)
}

// ExecuteWorkflow implements ClientOutboundInterceptor.ExecuteWorkflow.
func (c *ClientOutboundInterceptorBase) ExecuteWorkflow(
ctx context.Context,
Expand Down
Loading

0 comments on commit 9d59447

Please sign in to comment.