Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
Quinn-With-Two-Ns committed Feb 24, 2024
1 parent 5e27adb commit aacc07f
Show file tree
Hide file tree
Showing 8 changed files with 28 additions and 8 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,5 @@ require (
google.golang.org/genproto/googleapis/rpc v0.0.0-20240125205218-1f4bbc51befe // indirect
google.golang.org/grpc v1.61.0 // indirect
google.golang.org/protobuf v1.32.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
)
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,7 @@ gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
Expand Down
7 changes: 4 additions & 3 deletions recovery/recovery_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/workflow"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/temporalio/samples-go/recovery/cache"
)
Expand Down Expand Up @@ -288,7 +289,7 @@ func extractStateFromEvent(workflowID string, event *historypb.HistoryEvent) (*R
Options: client.StartWorkflowOptions{
ID: workflowID,
TaskQueue: attr.TaskQueue.GetName(),
WorkflowTaskTimeout: *attr.GetWorkflowTaskTimeout(),
WorkflowTaskTimeout: attr.GetWorkflowTaskTimeout().AsDuration(),
// RetryPolicy: attr.RetryPolicy,
},
State: state,
Expand Down Expand Up @@ -344,8 +345,8 @@ func getAllExecutionsOfType(ctx context.Context, c client.Client, workflowType s
MaximumPageSize: 10,
NextPageToken: nextPageToken,
StartTimeFilter: &filterpb.StartTimeFilter{
EarliestTime: &zeroTime,
LatestTime: &now,
EarliestTime: timestamppb.New(zeroTime),
LatestTime: timestamppb.New(now),
},
Filters: &workflowservice.ListOpenWorkflowExecutionsRequest_TypeFilter{TypeFilter: &filterpb.WorkflowTypeFilter{
Name: workflowType,
Expand Down
3 changes: 2 additions & 1 deletion schedule/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,14 @@ func SampleScheduleWorkflow(ctx workflow.Context) error {
info := workflow.GetInfo(ctx1)

// Workflow Executions started by a Schedule have the following additional properties appended to their search attributes
//lint:ignore SA1019 - this is a sample
scheduledByIDPayload := info.SearchAttributes.IndexedFields["TemporalScheduledById"]
var scheduledByID string
err := converter.GetDefaultDataConverter().FromPayload(scheduledByIDPayload, &scheduledByID)
if err != nil {
return err
}

//lint:ignore SA1019 - this is a sample
startTimePayload := info.SearchAttributes.IndexedFields["TemporalScheduledStartTime"]
var startTime time.Time
err = converter.GetDefaultDataConverter().FromPayload(startTimePayload, &startTime)
Expand Down
5 changes: 5 additions & 0 deletions searchattributes/searchattributes_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func SearchAttributesWorkflow(ctx workflow.Context) error {

// Get search attributes that were provided when workflow was started.
info := workflow.GetInfo(ctx)
//lint:ignore SA1019 - this is a sample
val := info.SearchAttributes.IndexedFields["CustomIntField"]
var currentIntValue int
err := converter.GetDefaultDataConverter().FromPayload(val, &currentIntValue)
Expand All @@ -63,13 +64,15 @@ func SearchAttributesWorkflow(ctx workflow.Context) error {
}
// This won't persist search attributes on server because commands are not sent to server,
// but local cache will be updated.
//lint:ignore SA1019 - this is a sample
err = workflow.UpsertSearchAttributes(ctx, attributes)
if err != nil {
return err
}

// Print current search attributes with modifications above.
info = workflow.GetInfo(ctx)
//lint:ignore SA1019 - this is a sample
err = printSearchAttributes(info.SearchAttributes, logger)
if err != nil {
return err
Expand All @@ -79,13 +82,15 @@ func SearchAttributesWorkflow(ctx workflow.Context) error {
attributes = map[string]interface{}{
"CustomKeywordField": "Update2",
}
//lint:ignore SA1019 - this is a sample
err = workflow.UpsertSearchAttributes(ctx, attributes)
if err != nil {
return err
}

// Print current search attributes.
info = workflow.GetInfo(ctx)
//lint:ignore SA1019 - this is a sample
err = printSearchAttributes(info.SearchAttributes, logger)
if err != nil {
return err
Expand Down
3 changes: 2 additions & 1 deletion temporal-fixtures/namespaces/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"log"

"github.com/pborman/uuid"
"google.golang.org/protobuf/types/known/durationpb"

"strconv"

Expand Down Expand Up @@ -35,7 +36,7 @@ func main() {
Namespace: uuidvar + "_" + strconv.Itoa(i),
Description: "Namespace Description " + strconv.Itoa(i),
OwnerEmail: "[email protected]",
WorkflowExecutionRetentionPeriod: &retention,
WorkflowExecutionRetentionPeriod: durationpb.New(retention),
}
if err = c.Register(context.Background(), req); err != nil {
log.Fatalln("Unable to register namespace", err)
Expand Down
10 changes: 8 additions & 2 deletions typed-searchattributes/searchattributes_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,19 @@ func SearchAttributesWorkflow(ctx workflow.Context) error {
}

// Update search attributes again.
err = workflow.UpsertTypedSearchAttributes(ctx, CustomKeyword.ValueSet("Update2"))
err = workflow.UpsertTypedSearchAttributes(ctx,
CustomKeyword.ValueSet("Update2"),
CustomIntKey.ValueUnset(),
)
if err != nil {
return err
}

// Sleep to allow update to be visible in search.
workflow.Sleep(ctx, 1*time.Second)
err = workflow.Sleep(ctx, 1*time.Second)
if err != nil {
return err
}

// Print current search attributes.
searchAttributes = workflow.GetTypedSearchAttributes(ctx)
Expand Down
6 changes: 5 additions & 1 deletion typed-searchattributes/searchattributes_workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@ func Test_Workflow(t *testing.T) {
CustomStringField.ValueSet("String field is for text. When query, it will be tokenized for partial match."),
)).Return(nil).Once()

env.OnUpsertTypedSearchAttributes(temporal.NewSearchAttributes(CustomKeyword.ValueSet("Update2"))).Return(nil).Once()
env.OnUpsertTypedSearchAttributes(
temporal.NewSearchAttributes(
CustomKeyword.ValueSet("Update2"),
CustomIntKey.ValueUnset(),
)).Return(nil).Once()

env.ExecuteWorkflow(SearchAttributesWorkflow)
require.True(t, env.IsWorkflowCompleted())
Expand Down

0 comments on commit aacc07f

Please sign in to comment.