From 34b1096ab129eabc59eed36c50197d4e286366ff Mon Sep 17 00:00:00 2001 From: Antonio Lain <135073478+antlai-temporal@users.noreply.github.com> Date: Mon, 2 Dec 2024 18:24:12 -0800 Subject: [PATCH] Add deployment API (#1733) --- client/client.go | 96 +++++++ internal/client.go | 3 + internal/deployment_client.go | 243 +++++++++++++++++ internal/internal_deployment_client.go | 255 ++++++++++++++++++ internal/internal_deployment_client_test.go | 182 +++++++++++++ internal/internal_workflow_client.go | 7 + internal/internal_workflow_client_test.go | 4 +- .../internal_workflow_execution_options.go | 15 +- ...nternal_workflow_execution_options_test.go | 2 +- internal/nexus_operations.go | 5 + mocks/Client.go | 20 ++ mocks/DeploymentClient.go | 200 ++++++++++++++ mocks/DeploymentListIterator.go | 104 +++++++ 13 files changed, 1120 insertions(+), 16 deletions(-) create mode 100644 internal/deployment_client.go create mode 100644 internal/internal_deployment_client.go create mode 100644 internal/internal_deployment_client_test.go create mode 100644 mocks/DeploymentClient.go create mode 100644 mocks/DeploymentListIterator.go diff --git a/client/client.go b/client/client.go index 7ac15bcad..037b47495 100644 --- a/client/client.go +++ b/client/client.go @@ -46,6 +46,36 @@ import ( "go.temporal.io/sdk/internal/common/metrics" ) +// DeploymentReachability specifies which category of tasks may reach a worker +// associated with a deployment, simplifying safe decommission. +// NOTE: Experimental +type DeploymentReachability = internal.DeploymentReachability + +const ( + // DeploymentReachabilityUnspecified - Reachability level not specified. + // NOTE: Experimental + DeploymentReachabilityUnspecified = internal.DeploymentReachabilityUnspecified + + // DeploymentReachabilityReachable - The deployment is reachable by new + // and/or open workflows. The deployment cannot be decommissioned safely. + // NOTE: Experimental + DeploymentReachabilityReachable = internal.DeploymentReachabilityReachable + + // DeploymentReachabilityClosedWorkflows - The deployment is not reachable + // by new or open workflows, but might be still needed by + // Queries sent to closed workflows. The deployment can be decommissioned + // safely if user does not query closed workflows. + // NOTE: Experimental + DeploymentReachabilityClosedWorkflows = internal.DeploymentReachabilityClosedWorkflows + + // DeploymentReachabilityUnreachable - The deployment is not reachable by + // any workflow because all the workflows who needed this + // deployment are out of the retention period. The deployment can be + // decommissioned safely. + // NOTE: Experimental + DeploymentReachabilityUnreachable = internal.DeploymentReachabilityUnreachable +) + // TaskReachability specifies which category of tasks may reach a worker on a versioned task queue. // Used both in a reachability query and its response. // @@ -279,6 +309,68 @@ type ( // NOTE: Experimental UpdateWorkflowOptions = internal.UpdateWorkflowOptions + // Deployment identifies a set of workers. This identifier combines + // the deployment series name with their Build ID. + // NOTE: Experimental + Deployment = internal.Deployment + + // DeploymentTaskQueueInfo describes properties of the Task Queues involved + // in a deployment. + // NOTE: Experimental + DeploymentTaskQueueInfo = internal.DeploymentTaskQueueInfo + + // DeploymentInfo holds information associated with + // workers in this deployment. + // Workers can poll multiple task queues in a single deployment, + // which are listed in this message. + // NOTE: Experimental + DeploymentInfo = internal.DeploymentInfo + + // DeploymentListEntry is a subset of fields from DeploymentInfo. + // NOTE: Experimental + DeploymentListEntry = internal.DeploymentListEntry + + // DeploymentListIterator is an iterator for deployments. + // NOTE: Experimental + DeploymentListIterator = internal.DeploymentListIterator + + // DeploymentListOptions are the parameters for configuring listing deployments. + // NOTE: Experimental + DeploymentListOptions = internal.DeploymentListOptions + + // DeploymentReachabilityInfo extends DeploymentInfo with reachability information. + // NOTE: Experimental + DeploymentReachabilityInfo = internal.DeploymentReachabilityInfo + + // DeploymentMetadataUpdate modifies user-defined metadata entries that describe + // a deployment. + // NOTE: Experimental + DeploymentMetadataUpdate = internal.DeploymentMetadataUpdate + + // DeploymentDescribeOptions provides options for DeploymentClient.Describe. + // NOTE: Experimental + DeploymentDescribeOptions = internal.DeploymentDescribeOptions + + // DeploymentGetReachabilityOptions provides options for DeploymentClient.GetReachability. + // NOTE: Experimental + DeploymentGetReachabilityOptions = internal.DeploymentGetReachabilityOptions + + // DeploymentGetCurrentOptions provides options for DeploymentClient.GetCurrent. + // NOTE: Experimental + DeploymentGetCurrentOptions = internal.DeploymentGetCurrentOptions + + // DeploymentSetCurrentOptions provides options for DeploymentClient.SetCurrent. + // NOTE: Experimental + DeploymentSetCurrentOptions = internal.DeploymentSetCurrentOptions + + // DeploymentSetCurrentResponse is the response type for DeploymentClient.SetCurrent. + // NOTE: Experimental + DeploymentSetCurrentResponse = internal.DeploymentSetCurrentResponse + + // DeploymentClient is the server interface to manage deployments. + // NOTE: Experimental + DeploymentClient = internal.DeploymentClient + // UpdateWorkflowExecutionOptionsRequest is a request for Client.UpdateWorkflowExecutionOptions. // NOTE: Experimental UpdateWorkflowExecutionOptionsRequest = internal.UpdateWorkflowExecutionOptionsRequest @@ -880,6 +972,10 @@ type ( // Schedule creates a new shedule client with the same gRPC connection as this client. ScheduleClient() ScheduleClient + // DeploymentClient create a new deployment client with the same gRPC connection as this client. + // NOTE: Experimental + DeploymentClient() DeploymentClient + // Close client and clean up underlying resources. // // If this client was created via NewClientFromExisting or this client has diff --git a/internal/client.go b/internal/client.go index 7cbec8645..4c7c16e68 100644 --- a/internal/client.go +++ b/internal/client.go @@ -417,6 +417,9 @@ type ( // Schedule creates a new shedule client with the same gRPC connection as this client. ScheduleClient() ScheduleClient + // DeploymentClient creates a new deployment client with the same gRPC connection as this client. + DeploymentClient() DeploymentClient + // Close client and clean up underlying resources. Close() } diff --git a/internal/deployment_client.go b/internal/deployment_client.go new file mode 100644 index 000000000..f9c693d3c --- /dev/null +++ b/internal/deployment_client.go @@ -0,0 +1,243 @@ +// The MIT License +// +// Copyright (c) 2022 Temporal Technologies Inc. All rights reserved. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package internal + +import ( + "context" + "time" + + commonpb "go.temporal.io/api/common/v1" +) + +// DeploymentReachability specifies which category of tasks may reach a worker +// associated with a deployment, simplifying safe decommission. +// NOTE: Experimental +type DeploymentReachability int + +const ( + // DeploymentReachabilityUnspecified - Reachability level not specified. + // NOTE: Experimental + DeploymentReachabilityUnspecified = iota + + // DeploymentReachabilityReachable - The deployment is reachable by new + // and/or open workflows. The deployment cannot be decommissioned safely. + // NOTE: Experimental + DeploymentReachabilityReachable + + // DeploymentReachabilityClosedWorkflows - The deployment is not reachable + // by new or open workflows, but might be still needed by + // Queries sent to closed workflows. The deployment can be decommissioned + // safely if user does not query closed workflows. + // NOTE: Experimental + DeploymentReachabilityClosedWorkflows + + // DeploymentReachabilityUnreachable - The deployment is not reachable by + // any workflow because all the workflows who needed this + // deployment are out of the retention period. The deployment can be + // decommissioned safely. + // NOTE: Experimental + DeploymentReachabilityUnreachable +) + +type ( + // Deployment identifies a set of workers. This identifier combines + // the deployment series name with their Build ID. + // NOTE: Experimental + Deployment struct { + // SeriesName - Name of the deployment series. Different versions of the same worker + // service/application are linked together by sharing a series name. + SeriesName string + + // BuildID - identifies the worker's code and configuration version. + BuildID string + } + + // DeploymentTaskQueueInfo describes properties of the Task Queues involved + // in a deployment. + // NOTE: Experimental + DeploymentTaskQueueInfo struct { + // Name - Task queue name. + Name string + + // Type - The type of this task queue. + Type TaskQueueType + + // FirstPollerTime - Time when the server saw the first poller for this task queue + // in this deployment. + FirstPollerTime time.Time + } + + // DeploymentInfo holds information associated with + // workers in this deployment. + // Workers can poll multiple task queues in a single deployment, + // which are listed in this message. + // NOTE: Experimental + DeploymentInfo struct { + // Deployment - An identifier for this deployment. + Deployment Deployment + + // CreateTime - When this deployment was created. + CreateTime time.Time + + // Current - Whether this deployment is the current one for its deployment series. + Current bool + + // TaskQueuesInfo - List of task queues polled by workers in this deployment. + TaskQueuesInfo []DeploymentTaskQueueInfo + + // Metadata - A user-defined set of key-values. Can be updated with `DeploymentClient.SetCurrent`. + Metadata map[string]*commonpb.Payload + } + + // DeploymentListEntry is a subset of fields from DeploymentInfo + // NOTE: Experimental + DeploymentListEntry struct { + // An identifier for this deployment. + Deployment Deployment + + // When this deployment was created. + CreateTime time.Time + + // Whether this deployment is the current one for its deployment series. + Current bool + } + + // DeploymentListIterator is an iterator for deployments. + // NOTE: Experimental + DeploymentListIterator interface { + // HasNext - Return whether this iterator has next value. + HasNext() bool + + // Next - Returns the next deployment and error + Next() (*DeploymentListEntry, error) + } + + // DeploymentListOptions are the parameters for configuring listing deployments. + // NOTE: Experimental + DeploymentListOptions struct { + // PageSize - How many results to fetch from the Server at a time. + // Optional: defaulted to 1000 + PageSize int + + // SeriesName - Filter with the name of the deployment series. + // Optional: If present, use an exact series name match. + SeriesName string + } + + // DeploymentReachabilityInfo extends DeploymentInfo with reachability information. + // NOTE: Experimental + DeploymentReachabilityInfo struct { + DeploymentInfo + + // Reachability - Kind of tasks that may reach a worker + // associated with a deployment. + Reachability DeploymentReachability + + // LastUpdateTime - When reachability was last computed. Computing reachability + // is an expensive operation, and the server caches results. + LastUpdateTime time.Time + } + + // DeploymentMetadataUpdate modifies user-defined metadata entries that describe + // a deployment. + // NOTE: Experimental + DeploymentMetadataUpdate struct { + // UpsertEntries - Metadata entries inserted or modified. When values are not + // of type *commonpb.Payload, the default data converter will be used to generate + // payloads. + UpsertEntries map[string]interface{} + + // RemoveEntries - List of keys to remove from the metadata. + RemoveEntries []string + } + + // DeploymentSetCurrentOptions provides options for DeploymentClient.SetCurrent. + // NOTE: Experimental + DeploymentSetCurrentOptions struct { + // Deployment - An identifier for this deployment. + Deployment Deployment + + // MetadataUpdate - Optional: Changes to the user-defined metadata entries + // for this deployment. + MetadataUpdate DeploymentMetadataUpdate + } + + // DeploymentSetCurrentResponse is the response type for DeploymentClient.SetCurrent. + // NOTE: Experimental + DeploymentSetCurrentResponse struct { + // Current - Information about the current deployment after this operation. + Current DeploymentInfo + + // Previous - Information about the last current deployment, i.e., before this operation. + Previous DeploymentInfo + } + + // DeploymentDescribeOptions provides options for DeploymentClient.Describe. + // NOTE: Experimental + DeploymentDescribeOptions struct { + // Deployment - Identifier that combines the deployment series name with their Build ID. + Deployment Deployment + } + + // DeploymentGetReachabilityOptions provides options for DeploymentClient.GetReachability. + // NOTE: Experimental + DeploymentGetReachabilityOptions struct { + // Deployment - Identifier that combines the deployment series name with their Build ID. + Deployment Deployment + } + + // DeploymentGetCurrentOptions provides options for DeploymentClient.GetCurrent. + // NOTE: Experimental + DeploymentGetCurrentOptions struct { + // SeriesName - Name of the deployment series. + SeriesName string + } + + // DeploymentClient is the client that manages deployments. + // NOTE: Experimental + DeploymentClient interface { + // Describes an existing deployment. + // NOTE: Experimental + Describe(ctx context.Context, options DeploymentDescribeOptions) (DeploymentInfo, error) + + // List returns an iterator to enumerate deployments in the client's namespace. + // It can also optionally filter deployments by series name. + // NOTE: Experimental + List(ctx context.Context, options DeploymentListOptions) (DeploymentListIterator, error) + + // GetReachability returns reachability information for a deployment. This operation is + // expensive, and results may be cached. Use the returned DeploymentReachabilityInfo.LastUpdateTime + // to estimate cache staleness. + // NOTE: Experimental + GetReachability(ctx context.Context, options DeploymentGetReachabilityOptions) (DeploymentReachabilityInfo, error) + + // GetCurrent returns the current deployment for a given deployment series. + // NOTE: Experimental + GetCurrent(ctx context.Context, options DeploymentGetCurrentOptions) (DeploymentInfo, error) + + // SetCurrent changes the current deployment for a given deployment series. It can also + // update metadata for this deployment. + // NOTE: Experimental + SetCurrent(ctx context.Context, options DeploymentSetCurrentOptions) (DeploymentSetCurrentResponse, error) + } +) diff --git a/internal/internal_deployment_client.go b/internal/internal_deployment_client.go new file mode 100644 index 000000000..86f49ab62 --- /dev/null +++ b/internal/internal_deployment_client.go @@ -0,0 +1,255 @@ +// The MIT License +// +// Copyright (c) 2022 Temporal Technologies Inc. All rights reserved. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package internal + +import ( + "context" + "fmt" + + "go.temporal.io/api/common/v1" + "go.temporal.io/api/deployment/v1" + "go.temporal.io/api/workflowservice/v1" + "go.temporal.io/sdk/converter" +) + +type ( + // deploymentClient is the client for managing deployments. + deploymentClient struct { + workflowClient *WorkflowClient + } + + // deploymentListIteratorImpl is the implementation of DeploymentListIterator. + // Adapted from scheduleListIteratorImpl. + deploymentListIteratorImpl struct { + // nextDeploymentIndex - Local index to cached deployments. + nextDeploymentIndex int + + // err - Error from getting the last page of deployments. + err error + + // response - Last page of deployments from server. + response *workflowservice.ListDeploymentsResponse + + // paginate - Function to get the next page of deployment from server. + paginate func(nexttoken []byte) (*workflowservice.ListDeploymentsResponse, error) + } +) + +func (iter *deploymentListIteratorImpl) HasNext() bool { + if iter.err == nil { + if iter.response == nil || + (iter.nextDeploymentIndex >= len(iter.response.Deployments) && len(iter.response.NextPageToken) > 0) { + iter.response, iter.err = iter.paginate(iter.response.GetNextPageToken()) + iter.nextDeploymentIndex = 0 + } + } + return iter.nextDeploymentIndex < len(iter.response.GetDeployments()) || iter.err != nil +} + +func (iter *deploymentListIteratorImpl) Next() (*DeploymentListEntry, error) { + if !iter.HasNext() { + panic("DeploymentListIterator Next() called without checking HasNext()") + } else if iter.err != nil { + return nil, iter.err + } + deployment := iter.response.Deployments[iter.nextDeploymentIndex] + iter.nextDeploymentIndex++ + return deploymentListEntryFromProto(deployment), nil +} + +func deploymentFromProto(deployment *deployment.Deployment) Deployment { + return Deployment{ + SeriesName: deployment.GetSeriesName(), + BuildID: deployment.GetBuildId(), + } +} + +func deploymentToProto(deploymentID Deployment) *deployment.Deployment { + return &deployment.Deployment{ + SeriesName: deploymentID.SeriesName, + BuildId: deploymentID.BuildID, + } +} + +func deploymentListEntryFromProto(deployment *deployment.DeploymentListInfo) *DeploymentListEntry { + return &DeploymentListEntry{ + Deployment: deploymentFromProto(deployment.GetDeployment()), + CreateTime: deployment.GetCreateTime().AsTime(), + Current: deployment.GetIsCurrent(), + } +} + +func deploymentTaskQueuesInfoFromProto(tqsInfo []*deployment.DeploymentInfo_TaskQueueInfo) []DeploymentTaskQueueInfo { + result := []DeploymentTaskQueueInfo{} + for _, info := range tqsInfo { + result = append(result, DeploymentTaskQueueInfo{ + Name: info.GetName(), + Type: TaskQueueType(info.GetType()), + FirstPollerTime: info.GetFirstPollerTime().AsTime(), + }) + } + return result +} + +func deploymentInfoFromProto(deploymentInfo *deployment.DeploymentInfo) DeploymentInfo { + return DeploymentInfo{ + Deployment: deploymentFromProto(deploymentInfo.GetDeployment()), + CreateTime: deploymentInfo.GetCreateTime().AsTime(), + Current: deploymentInfo.GetIsCurrent(), + TaskQueuesInfo: deploymentTaskQueuesInfoFromProto(deploymentInfo.GetTaskQueueInfos()), + Metadata: deploymentInfo.GetMetadata(), + } +} + +func deploymentReachabilityInfoFromProto(response *workflowservice.GetDeploymentReachabilityResponse) DeploymentReachabilityInfo { + return DeploymentReachabilityInfo{ + DeploymentInfo: deploymentInfoFromProto(response.GetDeploymentInfo()), + Reachability: DeploymentReachability(response.GetReachability()), + LastUpdateTime: response.GetLastUpdateTime().AsTime(), + } +} + +func deploymentMetadataUpdateToProto(dc converter.DataConverter, update DeploymentMetadataUpdate) *deployment.UpdateDeploymentMetadata { + upsertEntries := make(map[string]*common.Payload) + + for k, v := range update.UpsertEntries { + if enc, ok := v.(*common.Payload); ok { + upsertEntries[k] = enc + } else { + dataConverter := dc + if dataConverter == nil { + dataConverter = converter.GetDefaultDataConverter() + } + metadataBytes, err := dataConverter.ToPayload(v) + if err != nil { + panic(fmt.Sprintf("encode deployment metadata error: %v", err.Error())) + } + upsertEntries[k] = metadataBytes + } + } + + return &deployment.UpdateDeploymentMetadata{ + UpsertEntries: upsertEntries, + RemoveEntries: update.RemoveEntries, + } +} + +func (dc *deploymentClient) List(ctx context.Context, options DeploymentListOptions) (DeploymentListIterator, error) { + paginate := func(nextToken []byte) (*workflowservice.ListDeploymentsResponse, error) { + grpcCtx, cancel := newGRPCContext(ctx, defaultGrpcRetryParameters(ctx)) + defer cancel() + request := &workflowservice.ListDeploymentsRequest{ + Namespace: dc.workflowClient.namespace, + PageSize: int32(options.PageSize), + NextPageToken: nextToken, + SeriesName: options.SeriesName, + } + + return dc.workflowClient.workflowService.ListDeployments(grpcCtx, request) + } + + return &deploymentListIteratorImpl{ + paginate: paginate, + }, nil +} + +func (dc *deploymentClient) Describe(ctx context.Context, options DeploymentDescribeOptions) (DeploymentInfo, error) { + if err := dc.workflowClient.ensureInitialized(ctx); err != nil { + return DeploymentInfo{}, err + } + request := &workflowservice.DescribeDeploymentRequest{ + Namespace: dc.workflowClient.namespace, + Deployment: deploymentToProto(options.Deployment), + } + grpcCtx, cancel := newGRPCContext(ctx, defaultGrpcRetryParameters(ctx)) + defer cancel() + + resp, err := dc.workflowClient.workflowService.DescribeDeployment(grpcCtx, request) + if err != nil { + return DeploymentInfo{}, err + } + + return deploymentInfoFromProto(resp.GetDeploymentInfo()), nil +} + +func (dc *deploymentClient) GetReachability(ctx context.Context, options DeploymentGetReachabilityOptions) (DeploymentReachabilityInfo, error) { + if err := dc.workflowClient.ensureInitialized(ctx); err != nil { + return DeploymentReachabilityInfo{}, err + } + + request := &workflowservice.GetDeploymentReachabilityRequest{ + Namespace: dc.workflowClient.namespace, + Deployment: deploymentToProto(options.Deployment), + } + grpcCtx, cancel := newGRPCContext(ctx, defaultGrpcRetryParameters(ctx)) + defer cancel() + + resp, err := dc.workflowClient.workflowService.GetDeploymentReachability(grpcCtx, request) + if err != nil { + return DeploymentReachabilityInfo{}, err + } + + return deploymentReachabilityInfoFromProto(resp), nil +} + +func (dc *deploymentClient) GetCurrent(ctx context.Context, options DeploymentGetCurrentOptions) (DeploymentInfo, error) { + if err := dc.workflowClient.ensureInitialized(ctx); err != nil { + return DeploymentInfo{}, err + } + request := &workflowservice.GetCurrentDeploymentRequest{ + Namespace: dc.workflowClient.namespace, + SeriesName: options.SeriesName, + } + grpcCtx, cancel := newGRPCContext(ctx, defaultGrpcRetryParameters(ctx)) + defer cancel() + + resp, err := dc.workflowClient.workflowService.GetCurrentDeployment(grpcCtx, request) + if err != nil { + return DeploymentInfo{}, err + } + return deploymentInfoFromProto(resp.GetCurrentDeploymentInfo()), nil +} + +func (dc *deploymentClient) SetCurrent(ctx context.Context, options DeploymentSetCurrentOptions) (DeploymentSetCurrentResponse, error) { + if err := dc.workflowClient.ensureInitialized(ctx); err != nil { + return DeploymentSetCurrentResponse{}, err + } + request := &workflowservice.SetCurrentDeploymentRequest{ + Namespace: dc.workflowClient.namespace, + Deployment: deploymentToProto(options.Deployment), + Identity: dc.workflowClient.identity, + UpdateMetadata: deploymentMetadataUpdateToProto(dc.workflowClient.dataConverter, options.MetadataUpdate), + } + grpcCtx, cancel := newGRPCContext(ctx, defaultGrpcRetryParameters(ctx)) + defer cancel() + + resp, err := dc.workflowClient.workflowService.SetCurrentDeployment(grpcCtx, request) + if err != nil { + return DeploymentSetCurrentResponse{}, err + } + + return DeploymentSetCurrentResponse{ + Current: deploymentInfoFromProto(resp.GetCurrentDeploymentInfo()), + Previous: deploymentInfoFromProto(resp.GetPreviousDeploymentInfo()), + }, nil +} diff --git a/internal/internal_deployment_client_test.go b/internal/internal_deployment_client_test.go new file mode 100644 index 000000000..f23ee283a --- /dev/null +++ b/internal/internal_deployment_client_test.go @@ -0,0 +1,182 @@ +// The MIT License +// +// Copyright (c) 2022 Temporal Technologies Inc. All rights reserved. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package internal + +import ( + "context" + "testing" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/suite" + deploymentpb "go.temporal.io/api/deployment/v1" + "go.temporal.io/api/serviceerror" + "go.temporal.io/api/workflowservice/v1" + "go.temporal.io/api/workflowservicemock/v1" + "go.temporal.io/sdk/converter" +) + +// deployment client test suite +type ( + deploymentClientTestSuite struct { + suite.Suite + mockCtrl *gomock.Controller + service *workflowservicemock.MockWorkflowServiceClient + client Client + dataConverter converter.DataConverter + } +) + +func TestDeploymentClientSuite(t *testing.T) { + suite.Run(t, new(deploymentClientTestSuite)) +} + +func (d *deploymentClientTestSuite) SetupTest() { + d.mockCtrl = gomock.NewController(d.T()) + d.service = workflowservicemock.NewMockWorkflowServiceClient(d.mockCtrl) + d.service.EXPECT().GetSystemInfo(gomock.Any(), gomock.Any(), gomock.Any()).Return(&workflowservice.GetSystemInfoResponse{}, nil).AnyTimes() + d.client = NewServiceClient(d.service, nil, ClientOptions{}) + d.dataConverter = converter.GetDefaultDataConverter() +} + +func (d *deploymentClientTestSuite) TearDownTest() { + d.mockCtrl.Finish() // assert mock’s expectations +} + +func (d *deploymentClientTestSuite) TestSetCurrentDeployment() { + metadata := map[string]interface{}{ + "data1": "metadata 1", + } + + options := DeploymentSetCurrentOptions{ + Deployment: Deployment{ + BuildID: "bid1", + SeriesName: "series1", + }, + MetadataUpdate: DeploymentMetadataUpdate{ + UpsertEntries: metadata, + RemoveEntries: []string{"never"}, + }, + } + createResp := &workflowservice.SetCurrentDeploymentResponse{} + + d.service.EXPECT().SetCurrentDeployment(gomock.Any(), gomock.Any(), gomock.Any()).Return(createResp, nil). + Do(func(_ interface{}, req *workflowservice.SetCurrentDeploymentRequest, _ ...interface{}) { + var resultMeta string + // verify the metadata + err := d.dataConverter.FromPayload(req.UpdateMetadata.UpsertEntries["data1"], &resultMeta) + d.NoError(err) + d.Equal("metadata 1", resultMeta) + + d.Equal(req.UpdateMetadata.RemoveEntries, []string{"never"}) + d.Equal(req.Deployment.BuildId, "bid1") + d.Equal(req.Deployment.SeriesName, "series1") + }) + _, _ = d.client.DeploymentClient().SetCurrent(context.Background(), options) +} + +func getListDeploymentsRequest() *workflowservice.ListDeploymentsRequest { + request := &workflowservice.ListDeploymentsRequest{ + Namespace: DefaultNamespace, + } + + return request +} + +// DeploymentIterator + +func (d *deploymentClientTestSuite) TestDeploymentIterator_NoError() { + request1 := getListDeploymentsRequest() + response1 := &workflowservice.ListDeploymentsResponse{ + Deployments: []*deploymentpb.DeploymentListInfo{ + { + IsCurrent: false, + }, + }, + NextPageToken: []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, + } + request2 := getListDeploymentsRequest() + request2.NextPageToken = response1.NextPageToken + response2 := &workflowservice.ListDeploymentsResponse{ + Deployments: []*deploymentpb.DeploymentListInfo{ + { + IsCurrent: false, + }, + }, + NextPageToken: []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, + } + + request3 := getListDeploymentsRequest() + request3.NextPageToken = response2.NextPageToken + response3 := &workflowservice.ListDeploymentsResponse{ + Deployments: []*deploymentpb.DeploymentListInfo{ + { + IsCurrent: false, + }, + }, + NextPageToken: nil, + } + + d.service.EXPECT().ListDeployments(gomock.Any(), request1, gomock.Any()).Return(response1, nil).Times(1) + d.service.EXPECT().ListDeployments(gomock.Any(), request2, gomock.Any()).Return(response2, nil).Times(1) + d.service.EXPECT().ListDeployments(gomock.Any(), request3, gomock.Any()).Return(response3, nil).Times(1) + + var events []*DeploymentListEntry + iter, _ := d.client.DeploymentClient().List(context.Background(), DeploymentListOptions{}) + for iter.HasNext() { + event, err := iter.Next() + d.Nil(err) + events = append(events, event) + } + d.Equal(3, len(events)) +} + +func (d *deploymentClientTestSuite) TestIteratorError() { + request1 := getListDeploymentsRequest() + response1 := &workflowservice.ListDeploymentsResponse{ + Deployments: []*deploymentpb.DeploymentListInfo{ + { + IsCurrent: false, + }, + }, + NextPageToken: []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, + } + + request2 := getListDeploymentsRequest() + request2.NextPageToken = response1.NextPageToken + + d.service.EXPECT().ListDeployments(gomock.Any(), request1, gomock.Any()).Return(response1, nil).Times(1) + + iter, _ := d.client.DeploymentClient().List(context.Background(), DeploymentListOptions{}) + + d.True(iter.HasNext()) + event, err := iter.Next() + d.NotNil(event) + d.Nil(err) + + d.service.EXPECT().ListDeployments(gomock.Any(), request2, gomock.Any()).Return(nil, serviceerror.NewNotFound("")).Times(1) + + d.True(iter.HasNext()) + event, err = iter.Next() + d.Nil(event) + d.NotNil(err) +} diff --git a/internal/internal_workflow_client.go b/internal/internal_workflow_client.go index 6bc7f6bb7..686f9456e 100644 --- a/internal/internal_workflow_client.go +++ b/internal/internal_workflow_client.go @@ -1298,6 +1298,13 @@ func (wc *WorkflowClient) ScheduleClient() ScheduleClient { } } +// DeploymentClient implements Client.DeploymentClient. +func (wc *WorkflowClient) DeploymentClient() DeploymentClient { + return &deploymentClient{ + workflowClient: wc, + } +} + // Close client and clean up underlying resources. func (wc *WorkflowClient) Close() { // If there's a set of unclosed clients, we have to decrement it and then diff --git a/internal/internal_workflow_client_test.go b/internal/internal_workflow_client_test.go index 618924c7f..f3b9bfc85 100644 --- a/internal/internal_workflow_client_test.go +++ b/internal/internal_workflow_client_test.go @@ -1688,7 +1688,7 @@ func (s *workflowClientTestSuite) TestStartWorkflowWithVersioningOverride() { versioningOverride := VersioningOverride{ Behavior: VersioningBehaviorPinned, Deployment: Deployment{ - BuildId: "build1", + BuildID: "build1", SeriesName: "deployment1", }, } @@ -1719,7 +1719,7 @@ func (s *workflowClientTestSuite) TestSignalWithStartWorkflowWithVersioningOverr versioningOverride := VersioningOverride{ Behavior: VersioningBehaviorPinned, Deployment: Deployment{ - BuildId: "build1", + BuildID: "build1", SeriesName: "deployment1", }, } diff --git a/internal/internal_workflow_execution_options.go b/internal/internal_workflow_execution_options.go index 4d80c34bd..92f17e059 100644 --- a/internal/internal_workflow_execution_options.go +++ b/internal/internal_workflow_execution_options.go @@ -70,17 +70,6 @@ type ( // VersioningBehaviorPinned. Deployment Deployment } - - // Deployment identifies a set of workers. This identifier combines the deployment series - // name with their Build ID. - // NOTE: Experimental - Deployment struct { - // Name of the deployment series. Different versions of the same worker service/application are - // linked together by sharing a series name. - SeriesName string - // Build ID for the worker's code and configuration version. - BuildId string - } ) // Mapping WorkflowExecutionOptions field names to proto ones. @@ -113,7 +102,7 @@ func workflowExecutionOptionsMaskToProto(mask []string) *fieldmaskpb.FieldMask { func workerDeploymentToProto(d Deployment) *deploymentpb.Deployment { return &deploymentpb.Deployment{ SeriesName: d.SeriesName, - BuildId: d.BuildId, + BuildId: d.BuildID, } } @@ -136,7 +125,7 @@ func versioningOverrideFromProto(versioningOverride *workflowpb.VersioningOverri Behavior: VersioningBehavior(versioningOverride.GetBehavior()), Deployment: Deployment{ SeriesName: versioningOverride.GetDeployment().GetSeriesName(), - BuildId: versioningOverride.GetDeployment().GetBuildId(), + BuildID: versioningOverride.GetDeployment().GetBuildId(), }, } } diff --git a/internal/internal_workflow_execution_options_test.go b/internal/internal_workflow_execution_options_test.go index 6e2a63eff..19ebb80e0 100644 --- a/internal/internal_workflow_execution_options_test.go +++ b/internal/internal_workflow_execution_options_test.go @@ -61,7 +61,7 @@ func Test_WorkflowExecutionOptions_fromProtoResponse(t *testing.T) { Behavior: VersioningBehaviorPinned, Deployment: Deployment{ SeriesName: "my series", - BuildId: "v1", + BuildID: "v1", }, }, }, diff --git a/internal/nexus_operations.go b/internal/nexus_operations.go index e00522f1a..76350983c 100644 --- a/internal/nexus_operations.go +++ b/internal/nexus_operations.go @@ -394,6 +394,11 @@ func (t *testSuiteClientForNexusOperations) WorkflowService() workflowservice.Wo panic("not implemented in the test environment") } +// DeploymentClient implements Client. +func (t *testSuiteClientForNexusOperations) DeploymentClient() DeploymentClient { + panic("not implemented in the test environment") +} + // UpdateWorkflowExecutionOptions implements Client. func (t *testSuiteClientForNexusOperations) UpdateWorkflowExecutionOptions(ctx context.Context, options UpdateWorkflowExecutionOptionsRequest) (WorkflowExecutionOptions, error) { panic("not implemented in the test environment") diff --git a/mocks/Client.go b/mocks/Client.go index 2fe366fa8..24dfc5524 100644 --- a/mocks/Client.go +++ b/mocks/Client.go @@ -167,6 +167,26 @@ func (_m *Client) CountWorkflow(ctx context.Context, request *workflowservice.Co return r0, r1 } +// DeploymentClient provides a mock function with given fields: +func (_m *Client) DeploymentClient() client.DeploymentClient { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for DeploymentClient") + } + + var r0 client.DeploymentClient + if rf, ok := ret.Get(0).(func() client.DeploymentClient); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(client.DeploymentClient) + } + } + + return r0 +} + // DescribeTaskQueue provides a mock function with given fields: ctx, taskqueue, taskqueueType func (_m *Client) DescribeTaskQueue(ctx context.Context, taskqueue string, taskqueueType enums.TaskQueueType) (*workflowservice.DescribeTaskQueueResponse, error) { ret := _m.Called(ctx, taskqueue, taskqueueType) diff --git a/mocks/DeploymentClient.go b/mocks/DeploymentClient.go new file mode 100644 index 000000000..737ca50d0 --- /dev/null +++ b/mocks/DeploymentClient.go @@ -0,0 +1,200 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +// Code generated by mockery v1.0.0. +// Modified manually for type alias to work correctly. +// https://github.com/vektra/mockery/issues/236 + +// Code generated by mockery v2.42.1. DO NOT EDIT. + +package mocks + +import ( + context "context" + + "go.temporal.io/sdk/client" + + "github.com/stretchr/testify/mock" +) + +// DeploymentClient is an autogenerated mock type for the DeploymentClient type +type DeploymentClient struct { + mock.Mock +} + +// Describe provides a mock function with given fields: ctx, options +func (_m *DeploymentClient) Describe(ctx context.Context, options client.DeploymentDescribeOptions) (client.DeploymentInfo, error) { + ret := _m.Called(ctx, options) + + if len(ret) == 0 { + panic("no return value specified for Describe") + } + + var r0 client.DeploymentInfo + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, client.DeploymentDescribeOptions) (client.DeploymentInfo, error)); ok { + return rf(ctx, options) + } + if rf, ok := ret.Get(0).(func(context.Context, client.DeploymentDescribeOptions) client.DeploymentInfo); ok { + r0 = rf(ctx, options) + } else { + r0 = ret.Get(0).(client.DeploymentInfo) + } + + if rf, ok := ret.Get(1).(func(context.Context, client.DeploymentDescribeOptions) error); ok { + r1 = rf(ctx, options) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// GetCurrent provides a mock function with given fields: ctx, options +func (_m *DeploymentClient) GetCurrent(ctx context.Context, options client.DeploymentGetCurrentOptions) (client.DeploymentInfo, error) { + ret := _m.Called(ctx, options) + + if len(ret) == 0 { + panic("no return value specified for GetCurrent") + } + + var r0 client.DeploymentInfo + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, client.DeploymentGetCurrentOptions) (client.DeploymentInfo, error)); ok { + return rf(ctx, options) + } + if rf, ok := ret.Get(0).(func(context.Context, client.DeploymentGetCurrentOptions) client.DeploymentInfo); ok { + r0 = rf(ctx, options) + } else { + r0 = ret.Get(0).(client.DeploymentInfo) + } + + if rf, ok := ret.Get(1).(func(context.Context, client.DeploymentGetCurrentOptions) error); ok { + r1 = rf(ctx, options) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// GetReachability provides a mock function with given fields: ctx, options +func (_m *DeploymentClient) GetReachability(ctx context.Context, options client.DeploymentGetReachabilityOptions) (client.DeploymentReachabilityInfo, error) { + ret := _m.Called(ctx, options) + + if len(ret) == 0 { + panic("no return value specified for GetReachability") + } + + var r0 client.DeploymentReachabilityInfo + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, client.DeploymentGetReachabilityOptions) (client.DeploymentReachabilityInfo, error)); ok { + return rf(ctx, options) + } + if rf, ok := ret.Get(0).(func(context.Context, client.DeploymentGetReachabilityOptions) client.DeploymentReachabilityInfo); ok { + r0 = rf(ctx, options) + } else { + r0 = ret.Get(0).(client.DeploymentReachabilityInfo) + } + + if rf, ok := ret.Get(1).(func(context.Context, client.DeploymentGetReachabilityOptions) error); ok { + r1 = rf(ctx, options) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// List provides a mock function with given fields: ctx, options +func (_m *DeploymentClient) List(ctx context.Context, options client.DeploymentListOptions) (client.DeploymentListIterator, error) { + ret := _m.Called(ctx, options) + + if len(ret) == 0 { + panic("no return value specified for List") + } + + var r0 client.DeploymentListIterator + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, client.DeploymentListOptions) (client.DeploymentListIterator, error)); ok { + return rf(ctx, options) + } + if rf, ok := ret.Get(0).(func(context.Context, client.DeploymentListOptions) client.DeploymentListIterator); ok { + r0 = rf(ctx, options) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(client.DeploymentListIterator) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, client.DeploymentListOptions) error); ok { + r1 = rf(ctx, options) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// SetCurrent provides a mock function with given fields: ctx, options +func (_m *DeploymentClient) SetCurrent(ctx context.Context, options client.DeploymentSetCurrentOptions) (client.DeploymentSetCurrentResponse, error) { + ret := _m.Called(ctx, options) + + if len(ret) == 0 { + panic("no return value specified for SetCurrent") + } + + var r0 client.DeploymentSetCurrentResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, client.DeploymentSetCurrentOptions) (client.DeploymentSetCurrentResponse, error)); ok { + return rf(ctx, options) + } + if rf, ok := ret.Get(0).(func(context.Context, client.DeploymentSetCurrentOptions) client.DeploymentSetCurrentResponse); ok { + r0 = rf(ctx, options) + } else { + r0 = ret.Get(0).(client.DeploymentSetCurrentResponse) + } + + if rf, ok := ret.Get(1).(func(context.Context, client.DeploymentSetCurrentOptions) error); ok { + r1 = rf(ctx, options) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// NewDeploymentClient creates a new instance of DeploymentClient. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewDeploymentClient(t interface { + mock.TestingT + Cleanup(func()) +}) *DeploymentClient { + mock := &DeploymentClient{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/mocks/DeploymentListIterator.go b/mocks/DeploymentListIterator.go new file mode 100644 index 000000000..60dc3f517 --- /dev/null +++ b/mocks/DeploymentListIterator.go @@ -0,0 +1,104 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +// Code generated by mockery v1.0.0. +// Modified manually for type alias to work correctly. +// https://github.com/vektra/mockery/issues/236 + +// Code generated by mockery v2.42.1. DO NOT EDIT. + +package mocks + +import ( + "go.temporal.io/sdk/client" + + "github.com/stretchr/testify/mock" +) + +// DeploymentListIterator is an autogenerated mock type for the DeploymentListIterator type +type DeploymentListIterator struct { + mock.Mock +} + +// HasNext provides a mock function with given fields: +func (_m *DeploymentListIterator) HasNext() bool { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for HasNext") + } + + var r0 bool + if rf, ok := ret.Get(0).(func() bool); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// Next provides a mock function with given fields: +func (_m *DeploymentListIterator) Next() (*client.DeploymentListEntry, error) { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Next") + } + + var r0 *client.DeploymentListEntry + var r1 error + if rf, ok := ret.Get(0).(func() (*client.DeploymentListEntry, error)); ok { + return rf() + } + if rf, ok := ret.Get(0).(func() *client.DeploymentListEntry); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*client.DeploymentListEntry) + } + } + + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// NewDeploymentListIterator creates a new instance of DeploymentListIterator. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewDeploymentListIterator(t interface { + mock.TestingT + Cleanup(func()) +}) *DeploymentListIterator { + mock := &DeploymentListIterator{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +}