From aacc07f75425c1dce1c2678b4d63f7d5d85e7c7a Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Fri, 23 Feb 2024 18:57:08 -0800 Subject: [PATCH] update --- go.mod | 1 + go.sum | 1 + recovery/recovery_workflow.go | 7 ++++--- schedule/workflow.go | 3 ++- searchattributes/searchattributes_workflow.go | 5 +++++ temporal-fixtures/namespaces/main.go | 3 ++- typed-searchattributes/searchattributes_workflow.go | 10 ++++++++-- .../searchattributes_workflow_test.go | 6 +++++- 8 files changed, 28 insertions(+), 8 deletions(-) diff --git a/go.mod b/go.mod index 981f3f4c..74d874cf 100644 --- a/go.mod +++ b/go.mod @@ -76,4 +76,5 @@ require ( google.golang.org/genproto/googleapis/rpc v0.0.0-20240125205218-1f4bbc51befe // indirect google.golang.org/grpc v1.61.0 // indirect google.golang.org/protobuf v1.32.0 // indirect + gopkg.in/yaml.v2 v2.4.0 // indirect ) diff --git a/go.sum b/go.sum index a5e07489..d781e739 100644 --- a/go.sum +++ b/go.sum @@ -484,6 +484,7 @@ gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/recovery/recovery_workflow.go b/recovery/recovery_workflow.go index d01ca30e..51f4551a 100644 --- a/recovery/recovery_workflow.go +++ b/recovery/recovery_workflow.go @@ -15,6 +15,7 @@ import ( "go.temporal.io/sdk/client" "go.temporal.io/sdk/temporal" "go.temporal.io/sdk/workflow" + "google.golang.org/protobuf/types/known/timestamppb" "github.com/temporalio/samples-go/recovery/cache" ) @@ -288,7 +289,7 @@ func extractStateFromEvent(workflowID string, event *historypb.HistoryEvent) (*R Options: client.StartWorkflowOptions{ ID: workflowID, TaskQueue: attr.TaskQueue.GetName(), - WorkflowTaskTimeout: *attr.GetWorkflowTaskTimeout(), + WorkflowTaskTimeout: attr.GetWorkflowTaskTimeout().AsDuration(), // RetryPolicy: attr.RetryPolicy, }, State: state, @@ -344,8 +345,8 @@ func getAllExecutionsOfType(ctx context.Context, c client.Client, workflowType s MaximumPageSize: 10, NextPageToken: nextPageToken, StartTimeFilter: &filterpb.StartTimeFilter{ - EarliestTime: &zeroTime, - LatestTime: &now, + EarliestTime: timestamppb.New(zeroTime), + LatestTime: timestamppb.New(now), }, Filters: &workflowservice.ListOpenWorkflowExecutionsRequest_TypeFilter{TypeFilter: &filterpb.WorkflowTypeFilter{ Name: workflowType, diff --git a/schedule/workflow.go b/schedule/workflow.go index 35198317..889bd987 100644 --- a/schedule/workflow.go +++ b/schedule/workflow.go @@ -23,13 +23,14 @@ func SampleScheduleWorkflow(ctx workflow.Context) error { info := workflow.GetInfo(ctx1) // Workflow Executions started by a Schedule have the following additional properties appended to their search attributes + //lint:ignore SA1019 - this is a sample scheduledByIDPayload := info.SearchAttributes.IndexedFields["TemporalScheduledById"] var scheduledByID string err := converter.GetDefaultDataConverter().FromPayload(scheduledByIDPayload, &scheduledByID) if err != nil { return err } - + //lint:ignore SA1019 - this is a sample startTimePayload := info.SearchAttributes.IndexedFields["TemporalScheduledStartTime"] var startTime time.Time err = converter.GetDefaultDataConverter().FromPayload(startTimePayload, &startTime) diff --git a/searchattributes/searchattributes_workflow.go b/searchattributes/searchattributes_workflow.go index 6c939d86..b210e4cc 100644 --- a/searchattributes/searchattributes_workflow.go +++ b/searchattributes/searchattributes_workflow.go @@ -43,6 +43,7 @@ func SearchAttributesWorkflow(ctx workflow.Context) error { // Get search attributes that were provided when workflow was started. info := workflow.GetInfo(ctx) + //lint:ignore SA1019 - this is a sample val := info.SearchAttributes.IndexedFields["CustomIntField"] var currentIntValue int err := converter.GetDefaultDataConverter().FromPayload(val, ¤tIntValue) @@ -63,6 +64,7 @@ func SearchAttributesWorkflow(ctx workflow.Context) error { } // This won't persist search attributes on server because commands are not sent to server, // but local cache will be updated. + //lint:ignore SA1019 - this is a sample err = workflow.UpsertSearchAttributes(ctx, attributes) if err != nil { return err @@ -70,6 +72,7 @@ func SearchAttributesWorkflow(ctx workflow.Context) error { // Print current search attributes with modifications above. info = workflow.GetInfo(ctx) + //lint:ignore SA1019 - this is a sample err = printSearchAttributes(info.SearchAttributes, logger) if err != nil { return err @@ -79,6 +82,7 @@ func SearchAttributesWorkflow(ctx workflow.Context) error { attributes = map[string]interface{}{ "CustomKeywordField": "Update2", } + //lint:ignore SA1019 - this is a sample err = workflow.UpsertSearchAttributes(ctx, attributes) if err != nil { return err @@ -86,6 +90,7 @@ func SearchAttributesWorkflow(ctx workflow.Context) error { // Print current search attributes. info = workflow.GetInfo(ctx) + //lint:ignore SA1019 - this is a sample err = printSearchAttributes(info.SearchAttributes, logger) if err != nil { return err diff --git a/temporal-fixtures/namespaces/main.go b/temporal-fixtures/namespaces/main.go index 892b3aed..e0c4faa9 100644 --- a/temporal-fixtures/namespaces/main.go +++ b/temporal-fixtures/namespaces/main.go @@ -8,6 +8,7 @@ import ( "log" "github.com/pborman/uuid" + "google.golang.org/protobuf/types/known/durationpb" "strconv" @@ -35,7 +36,7 @@ func main() { Namespace: uuidvar + "_" + strconv.Itoa(i), Description: "Namespace Description " + strconv.Itoa(i), OwnerEmail: "owner@mail.com", - WorkflowExecutionRetentionPeriod: &retention, + WorkflowExecutionRetentionPeriod: durationpb.New(retention), } if err = c.Register(context.Background(), req); err != nil { log.Fatalln("Unable to register namespace", err) diff --git a/typed-searchattributes/searchattributes_workflow.go b/typed-searchattributes/searchattributes_workflow.go index 6ecbc49a..c853b741 100644 --- a/typed-searchattributes/searchattributes_workflow.go +++ b/typed-searchattributes/searchattributes_workflow.go @@ -67,13 +67,19 @@ func SearchAttributesWorkflow(ctx workflow.Context) error { } // Update search attributes again. - err = workflow.UpsertTypedSearchAttributes(ctx, CustomKeyword.ValueSet("Update2")) + err = workflow.UpsertTypedSearchAttributes(ctx, + CustomKeyword.ValueSet("Update2"), + CustomIntKey.ValueUnset(), + ) if err != nil { return err } // Sleep to allow update to be visible in search. - workflow.Sleep(ctx, 1*time.Second) + err = workflow.Sleep(ctx, 1*time.Second) + if err != nil { + return err + } // Print current search attributes. searchAttributes = workflow.GetTypedSearchAttributes(ctx) diff --git a/typed-searchattributes/searchattributes_workflow_test.go b/typed-searchattributes/searchattributes_workflow_test.go index f590c94a..9743db1e 100644 --- a/typed-searchattributes/searchattributes_workflow_test.go +++ b/typed-searchattributes/searchattributes_workflow_test.go @@ -26,7 +26,11 @@ func Test_Workflow(t *testing.T) { CustomStringField.ValueSet("String field is for text. When query, it will be tokenized for partial match."), )).Return(nil).Once() - env.OnUpsertTypedSearchAttributes(temporal.NewSearchAttributes(CustomKeyword.ValueSet("Update2"))).Return(nil).Once() + env.OnUpsertTypedSearchAttributes( + temporal.NewSearchAttributes( + CustomKeyword.ValueSet("Update2"), + CustomIntKey.ValueUnset(), + )).Return(nil).Once() env.ExecuteWorkflow(SearchAttributesWorkflow) require.True(t, env.IsWorkflowCompleted())