Skip to content

Commit

Permalink
Add versioning override to start workflow (#1727)
Browse files Browse the repository at this point in the history
  • Loading branch information
antlai-temporal authored Nov 26, 2024
1 parent 83ad667 commit 7ea6a02
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 11 deletions.
8 changes: 8 additions & 0 deletions internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -747,6 +747,14 @@ type (
// NOTE: Experimental
StaticDetails string

// VersioningOverride - Sets the versioning configuration of a specific workflow execution, ignoring current
// server or worker default policies. This enables running canary tests without affecting existing workflows.
// To unset the override after the workflow is running, use UpdateWorkflowExecutionOptions.
// Optional: defaults to no override.
//
// NOTE: Experimental
VersioningOverride VersioningOverride

// request ID. Only settable by the SDK - e.g. [temporalnexus.workflowRunOperation].
requestID string
// workflow completion callback. Only settable by the SDK - e.g. [temporalnexus.workflowRunOperation].
Expand Down
2 changes: 2 additions & 0 deletions internal/internal_schedule_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -657,6 +657,7 @@ func convertToPBScheduleAction(
SearchAttributes: searchAttrs,
Header: header,
UserMetadata: userMetadata,
VersioningOverride: versioningOverrideToProto(action.VersioningOverride),
},
},
}, nil
Expand Down Expand Up @@ -708,6 +709,7 @@ func convertFromPBScheduleAction(logger log.Logger, action *schedulepb.ScheduleA
Memo: memos,
TypedSearchAttributes: searchAttrs,
UntypedSearchAttributes: untypedSearchAttrs,
VersioningOverride: versioningOverrideFromProto(workflow.VersioningOverride),
}, nil
default:
// TODO maybe just panic instead?
Expand Down
2 changes: 2 additions & 0 deletions internal/internal_workflow_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1625,6 +1625,7 @@ func (w *workflowClientInterceptor) ExecuteWorkflow(
Header: header,
CompletionCallbacks: in.Options.callbacks,
Links: in.Options.links,
VersioningOverride: versioningOverrideToProto(in.Options.VersioningOverride),
}

startRequest.UserMetadata, err = buildUserMetadata(in.Options.StaticSummary, in.Options.StaticDetails, dataConverter)
Expand Down Expand Up @@ -1929,6 +1930,7 @@ func (w *workflowClientInterceptor) SignalWithStartWorkflow(
WorkflowIdReusePolicy: in.Options.WorkflowIDReusePolicy,
WorkflowIdConflictPolicy: in.Options.WorkflowIDConflictPolicy,
Header: header,
VersioningOverride: versioningOverrideToProto(in.Options.VersioningOverride),
}

if in.Options.StartDelay != 0 {
Expand Down
61 changes: 61 additions & 0 deletions internal/internal_workflow_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1684,6 +1684,67 @@ func (s *workflowClientTestSuite) TestSignalWithStartWorkflowWithMemoAndSearchAt
_, _ = s.client.SignalWithStartWorkflow(context.Background(), "wid", "signal", "value", options, wf)
}

func (s *workflowClientTestSuite) TestStartWorkflowWithVersioningOverride() {
versioningOverride := VersioningOverride{
Behavior: VersioningBehaviorPinned,
Deployment: Deployment{
BuildId: "build1",
SeriesName: "deployment1",
},
}

options := StartWorkflowOptions{
ID: workflowID,
TaskQueue: taskqueue,
WorkflowExecutionTimeout: timeoutInSeconds,
WorkflowTaskTimeout: timeoutInSeconds,
VersioningOverride: versioningOverride,
}

wf := func(ctx Context) string {
panic("this is just a stub")
}
startResp := &workflowservice.StartWorkflowExecutionResponse{}

s.service.EXPECT().StartWorkflowExecution(gomock.Any(), gomock.Any(), gomock.Any()).Return(startResp, nil).
Do(func(_ interface{}, req *workflowservice.StartWorkflowExecutionRequest, _ ...interface{}) {
s.Equal(versioningBehaviorToProto(VersioningBehaviorPinned), req.VersioningOverride.GetBehavior())
s.Equal("build1", req.VersioningOverride.GetDeployment().GetBuildId())
s.Equal("deployment1", req.VersioningOverride.GetDeployment().GetSeriesName())
})
_, _ = s.client.ExecuteWorkflow(context.Background(), options, wf)
}

func (s *workflowClientTestSuite) TestSignalWithStartWorkflowWithVersioningOverride() {
versioningOverride := VersioningOverride{
Behavior: VersioningBehaviorPinned,
Deployment: Deployment{
BuildId: "build1",
SeriesName: "deployment1",
},
}

options := StartWorkflowOptions{
ID: "wid",
TaskQueue: taskqueue,
WorkflowExecutionTimeout: timeoutInSeconds,
WorkflowTaskTimeout: timeoutInSeconds,
VersioningOverride: versioningOverride,
}
wf := func(ctx Context) string {
panic("this is just a stub")
}
startResp := &workflowservice.SignalWithStartWorkflowExecutionResponse{}

s.service.EXPECT().SignalWithStartWorkflowExecution(gomock.Any(), gomock.Any(), gomock.Any()).Return(startResp, nil).
Do(func(_ interface{}, req *workflowservice.SignalWithStartWorkflowExecutionRequest, _ ...interface{}) {
s.Equal(versioningBehaviorToProto(VersioningBehaviorPinned), req.VersioningOverride.GetBehavior())
s.Equal("build1", req.VersioningOverride.GetDeployment().GetBuildId())
s.Equal("deployment1", req.VersioningOverride.GetDeployment().GetSeriesName())
})
_, _ = s.client.SignalWithStartWorkflow(context.Background(), "wid", "signal", "value", options, wf)
}

func (s *workflowClientTestSuite) TestGetWorkflowMemo() {
var input1 map[string]interface{}
result1, err := getWorkflowMemo(input1, s.dataConverter)
Expand Down
37 changes: 26 additions & 11 deletions internal/internal_workflow_execution_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,33 @@ func workerDeploymentToProto(d Deployment) *deploymentpb.Deployment {
}
}

func versioningOverrideToProto(versioningOverride VersioningOverride) *workflowpb.VersioningOverride {
if (VersioningOverride{}) == versioningOverride {
return nil
}
return &workflowpb.VersioningOverride{
Behavior: versioningBehaviorToProto(versioningOverride.Behavior),
Deployment: workerDeploymentToProto(versioningOverride.Deployment),
}
}

func versioningOverrideFromProto(versioningOverride *workflowpb.VersioningOverride) VersioningOverride {
if versioningOverride == nil {
return VersioningOverride{}
}

return VersioningOverride{
Behavior: VersioningBehavior(versioningOverride.GetBehavior()),
Deployment: Deployment{
SeriesName: versioningOverride.GetDeployment().GetSeriesName(),
BuildId: versioningOverride.GetDeployment().GetBuildId(),
},
}
}

func workflowExecutionOptionsToProto(options WorkflowExecutionOptions) *workflowpb.WorkflowExecutionOptions {
return &workflowpb.WorkflowExecutionOptions{
VersioningOverride: &workflowpb.VersioningOverride{
Behavior: versioningBehaviorToProto(options.VersioningOverride.Behavior),
Deployment: workerDeploymentToProto(options.VersioningOverride.Deployment),
},
VersioningOverride: versioningOverrideToProto(options.VersioningOverride),
}
}

Expand All @@ -134,12 +155,6 @@ func workflowExecutionOptionsFromProtoUpdateResponse(response *workflowservice.U
versioningOverride := response.GetWorkflowExecutionOptions().GetVersioningOverride()

return WorkflowExecutionOptions{
VersioningOverride: VersioningOverride{
Behavior: VersioningBehavior(versioningOverride.GetBehavior()),
Deployment: Deployment{
SeriesName: versioningOverride.GetDeployment().GetSeriesName(),
BuildId: versioningOverride.GetDeployment().GetBuildId(),
},
},
VersioningOverride: versioningOverrideFromProto(versioningOverride),
}
}
8 changes: 8 additions & 0 deletions internal/schedule_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,14 @@ type (
// Deprecated - This is only for update of older search attributes. This may be removed in a future version.
UntypedSearchAttributes map[string]*commonpb.Payload

// VersioningOverride - Sets the versioning configuration of a specific workflow execution, ignoring current
// server or worker default policies. This enables running canary tests without affecting existing workflows.
// To unset the override after the workflow is running, use Client.UpdateWorkflowExecutionOptions.
// Optional: defaults to no override.
//
// NOTE: Experimental
VersioningOverride VersioningOverride

// TODO(cretz): Expose once https://github.com/temporalio/temporal/issues/6412 is fixed
staticSummary string
staticDetails string
Expand Down

0 comments on commit 7ea6a02

Please sign in to comment.