Skip to content

Commit

Permalink
Add typed search attribute sample and Update Go SDK to v1.26.0 (#331)
Browse files Browse the repository at this point in the history
* Add typed search attribute sample

* Update to Go SDK v1.26.0
  • Loading branch information
Quinn-With-Two-Ns authored Mar 11, 2024
1 parent 3cc0870 commit 5c63363
Show file tree
Hide file tree
Showing 13 changed files with 290 additions and 57 deletions.
5 changes: 2 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,8 @@ Each sample demonstrates one feature of the SDK, together with tests.
- [**Memo**](./memo): Demonstrates how to use Memo that can be used
to store any kind of data.

- [**Search Attributes**](./searchattributes): Demonstrates how to
use custom Search Attributes that can be used to find Workflow Executions using predicates (must use
with [Elasticsearch](https://docs.temporal.io/cluster-deployment-guide/#elasticsearch)).
- [**Typed Search Attributes**](./typed-searchattributes): Demonstrates how to
use custom Search Attributes that can be used to find Workflow Executions using predicates/

- [**Timer Futures**](./timer): The sample starts a long running
order processing operation and starts a Timer (`workflow.NewTimer()`). If the processing time is too long, a
Expand Down
32 changes: 15 additions & 17 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,28 @@ replace github.com/cactus/go-statsd-client => github.com/cactus/go-statsd-client
require (
github.com/golang/mock v1.7.0-rc.1
github.com/golang/snappy v0.0.4
github.com/google/uuid v1.4.0
github.com/google/uuid v1.6.0
github.com/hashicorp/go-plugin v1.4.5
github.com/opentracing/opentracing-go v1.2.0
github.com/pborman/uuid v1.2.1
github.com/prometheus/client_golang v1.18.0
github.com/stretchr/testify v1.8.4
github.com/stretchr/testify v1.9.0
github.com/temporalio/tctl v1.18.0
github.com/uber-go/tally/v4 v4.1.7
github.com/uber/jaeger-client-go v2.30.0+incompatible
go.opentelemetry.io/otel v1.22.0
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.16.0
go.opentelemetry.io/otel/sdk v1.22.0
go.opentelemetry.io/otel/trace v1.22.0
go.temporal.io/api v1.26.0
go.temporal.io/sdk v1.25.1
go.temporal.io/api v1.29.1
go.temporal.io/sdk v1.26.0
go.temporal.io/sdk/contrib/opentelemetry v0.3.0
go.temporal.io/sdk/contrib/opentracing v0.1.0
go.temporal.io/sdk/contrib/tally v0.2.0
go.temporal.io/sdk/contrib/tools/workflowcheck v0.0.0-20230612164027-11c2cb9e7d2d
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.24.0
google.golang.org/protobuf v1.33.0
gopkg.in/square/go-jose.v2 v2.6.0
gopkg.in/yaml.v3 v3.0.1
)
Expand All @@ -42,12 +43,10 @@ require (
github.com/fatih/color v1.15.0 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/gogo/googleapis v1.4.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/gogo/status v1.1.1 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1 // indirect
github.com/hashicorp/go-hclog v1.3.1 // indirect
github.com/hashicorp/yamux v0.1.1 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
Expand All @@ -61,21 +60,20 @@ require (
github.com/prometheus/common v0.45.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
github.com/robfig/cron v1.2.0 // indirect
github.com/stretchr/objx v0.5.0 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/twmb/murmur3 v1.1.8 // indirect
github.com/uber/jaeger-lib v2.4.1+incompatible // indirect
go.opentelemetry.io/otel/metric v1.22.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
golang.org/x/crypto v0.16.0 // indirect
golang.org/x/net v0.19.0 // indirect
golang.org/x/sys v0.16.0 // indirect
golang.org/x/crypto v0.21.0 // indirect
golang.org/x/exp v0.0.0-20231127185646-65229373498e // indirect
golang.org/x/net v0.22.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.3.0 // indirect
golang.org/x/tools v0.16.0 // indirect
google.golang.org/genproto v0.0.0-20240102182953-50ed04b92917 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240102182953-50ed04b92917 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240102182953-50ed04b92917 // indirect
google.golang.org/grpc v1.60.1 // indirect
google.golang.org/protobuf v1.32.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240304212257-790db918fca8 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240304212257-790db918fca8 // indirect
google.golang.org/grpc v1.62.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
)
64 changes: 32 additions & 32 deletions go.sum

Large diffs are not rendered by default.

7 changes: 4 additions & 3 deletions recovery/recovery_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/workflow"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/temporalio/samples-go/recovery/cache"
)
Expand Down Expand Up @@ -288,7 +289,7 @@ func extractStateFromEvent(workflowID string, event *historypb.HistoryEvent) (*R
Options: client.StartWorkflowOptions{
ID: workflowID,
TaskQueue: attr.TaskQueue.GetName(),
WorkflowTaskTimeout: *attr.GetWorkflowTaskTimeout(),
WorkflowTaskTimeout: attr.GetWorkflowTaskTimeout().AsDuration(),
// RetryPolicy: attr.RetryPolicy,
},
State: state,
Expand Down Expand Up @@ -344,8 +345,8 @@ func getAllExecutionsOfType(ctx context.Context, c client.Client, workflowType s
MaximumPageSize: 10,
NextPageToken: nextPageToken,
StartTimeFilter: &filterpb.StartTimeFilter{
EarliestTime: &zeroTime,
LatestTime: &now,
EarliestTime: timestamppb.New(zeroTime),
LatestTime: timestamppb.New(now),
},
Filters: &workflowservice.ListOpenWorkflowExecutionsRequest_TypeFilter{TypeFilter: &filterpb.WorkflowTypeFilter{
Name: workflowType,
Expand Down
3 changes: 2 additions & 1 deletion schedule/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,14 @@ func SampleScheduleWorkflow(ctx workflow.Context) error {
info := workflow.GetInfo(ctx1)

// Workflow Executions started by a Schedule have the following additional properties appended to their search attributes
//lint:ignore SA1019 - this is a sample
scheduledByIDPayload := info.SearchAttributes.IndexedFields["TemporalScheduledById"]
var scheduledByID string
err := converter.GetDefaultDataConverter().FromPayload(scheduledByIDPayload, &scheduledByID)
if err != nil {
return err
}

//lint:ignore SA1019 - this is a sample
startTimePayload := info.SearchAttributes.IndexedFields["TemporalScheduledStartTime"]
var startTime time.Time
err = converter.GetDefaultDataConverter().FromPayload(startTimePayload, &startTime)
Expand Down
2 changes: 2 additions & 0 deletions searchattributes/README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
This sample is now out of data, please see the ./typed-searchattributes sample.

### Steps to run this sample:
1) Run [Temporal Server](Run a [Temporal service](https://github.com/temporalio/samples-go/tree/main/#how-to-use) with an Advanced Visibility store (Elasticsearch integrated).
If you are using the default `docker-compose` config, then Elasticsearch is already integrated.
Expand Down
5 changes: 5 additions & 0 deletions searchattributes/searchattributes_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func SearchAttributesWorkflow(ctx workflow.Context) error {

// Get search attributes that were provided when workflow was started.
info := workflow.GetInfo(ctx)
//lint:ignore SA1019 - this is a sample
val := info.SearchAttributes.IndexedFields["CustomIntField"]
var currentIntValue int
err := converter.GetDefaultDataConverter().FromPayload(val, &currentIntValue)
Expand All @@ -63,13 +64,15 @@ func SearchAttributesWorkflow(ctx workflow.Context) error {
}
// This won't persist search attributes on server because commands are not sent to server,
// but local cache will be updated.
//lint:ignore SA1019 - this is a sample
err = workflow.UpsertSearchAttributes(ctx, attributes)
if err != nil {
return err
}

// Print current search attributes with modifications above.
info = workflow.GetInfo(ctx)
//lint:ignore SA1019 - this is a sample
err = printSearchAttributes(info.SearchAttributes, logger)
if err != nil {
return err
Expand All @@ -79,13 +82,15 @@ func SearchAttributesWorkflow(ctx workflow.Context) error {
attributes = map[string]interface{}{
"CustomKeywordField": "Update2",
}
//lint:ignore SA1019 - this is a sample
err = workflow.UpsertSearchAttributes(ctx, attributes)
if err != nil {
return err
}

// Print current search attributes.
info = workflow.GetInfo(ctx)
//lint:ignore SA1019 - this is a sample
err = printSearchAttributes(info.SearchAttributes, logger)
if err != nil {
return err
Expand Down
3 changes: 2 additions & 1 deletion temporal-fixtures/namespaces/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"log"

"github.com/pborman/uuid"
"google.golang.org/protobuf/types/known/durationpb"

"strconv"

Expand Down Expand Up @@ -35,7 +36,7 @@ func main() {
Namespace: uuidvar + "_" + strconv.Itoa(i),
Description: "Namespace Description " + strconv.Itoa(i),
OwnerEmail: "[email protected]",
WorkflowExecutionRetentionPeriod: &retention,
WorkflowExecutionRetentionPeriod: durationpb.New(retention),
}
if err = c.Register(context.Background(), req); err != nil {
log.Fatalln("Unable to register namespace", err)
Expand Down
14 changes: 14 additions & 0 deletions typed-searchattributes/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
### Typed Search Attributes Sample

This sample shows how to use and test the Typed Search Attributes API.

### Steps to run this sample:
1) Run a [Temporal service](https://github.com/temporalio/samples-go/tree/main/#how-to-use).
2) Run the following command to start the worker:
```
go run typed-searchattributes/worker/main.go
```
3) Run the following command to start the example:
```
go run typed-searchattributes/starter/main.go
```
109 changes: 109 additions & 0 deletions typed-searchattributes/searchattributes_workflow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package typedsearchattributes

import (
"errors"
"fmt"
"strings"
"time"

"go.temporal.io/sdk/log"
"go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/workflow"
)

/*
* This sample shows how to use the typed search attributes API.
*/

var (
// CustomIntKey is the key for a custom int search attribute.
CustomIntKey = temporal.NewSearchAttributeKeyInt64("CustomIntField")
// CustomKeyword is the key for a custom keyword search attribute.
CustomKeyword = temporal.NewSearchAttributeKeyString("CustomKeywordField")
// CustomBool is the key for a custom bool search attribute.
CustomBool = temporal.NewSearchAttributeKeyBool("CustomBoolField")
// CustomDouble is the key for a custom double search attribute.
CustomDouble = temporal.NewSearchAttributeKeyFloat64("CustomDoubleField")
// CustomStringField is the key for a custom string search attribute.
CustomStringField = temporal.NewSearchAttributeKeyString("CustomStringField")
// CustomDatetimeField is the key for a custom datetime search attribute.
CustomDatetimeField = temporal.NewSearchAttributeKeyTime("CustomDatetimeField")
)

// SearchAttributesWorkflow workflow definition
func SearchAttributesWorkflow(ctx workflow.Context) error {
logger := workflow.GetLogger(ctx)
logger.Info("SearchAttributes workflow started")

// Get search attributes that were provided when workflow was started.
searchAttributes := workflow.GetTypedSearchAttributes(ctx)
currentIntValue, ok := searchAttributes.GetInt64(CustomIntKey)
if !ok {
return errors.New("CustomIntField is not set")
}
logger.Info("Current search attribute value.", "CustomIntField", currentIntValue)

// Upsert search attributes.

// This won't persist search attributes on server because commands are not sent to server,
// but local cache will be updated.
err := workflow.UpsertTypedSearchAttributes(ctx,
CustomIntKey.ValueSet(2),
CustomKeyword.ValueSet("Update1"),
CustomBool.ValueSet(true),
CustomDouble.ValueSet(3.14),
CustomDatetimeField.ValueSet(workflow.Now(ctx).UTC()),
CustomStringField.ValueSet("String field is for text. When query, it will be tokenized for partial match."),
)
if err != nil {
return err
}

// Print current search attributes with modifications above.
searchAttributes = workflow.GetTypedSearchAttributes(ctx)
err = printSearchAttributes(searchAttributes, logger)
if err != nil {
return err
}

// Update search attributes again.
err = workflow.UpsertTypedSearchAttributes(ctx,
CustomKeyword.ValueSet("Update2"),
CustomIntKey.ValueUnset(),
)
if err != nil {
return err
}

// Sleep to allow update to be visible in search.
err = workflow.Sleep(ctx, 1*time.Second)
if err != nil {
return err
}

// Print current search attributes.
searchAttributes = workflow.GetTypedSearchAttributes(ctx)
err = printSearchAttributes(searchAttributes, logger)
if err != nil {
return err
}

logger.Info("Workflow completed.")
return nil
}

func printSearchAttributes(searchAttributes temporal.SearchAttributes, logger log.Logger) error {
//workflowcheck:ignore
if searchAttributes.Size() == 0 {
logger.Info("Current search attributes are empty.")
return nil
}

var builder strings.Builder
//workflowcheck:ignore Only iterates for logging reasons
for k, v := range searchAttributes.GetUntypedValues() {
builder.WriteString(fmt.Sprintf("%s=%v\n", k.GetName(), v))
}
logger.Info(fmt.Sprintf("Current search attribute values:\n%s", builder.String()))
return nil
}
38 changes: 38 additions & 0 deletions typed-searchattributes/searchattributes_workflow_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package typedsearchattributes

import (
"testing"

"github.com/stretchr/testify/require"
"go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/testsuite"
)

func Test_Workflow(t *testing.T) {
testSuite := &testsuite.WorkflowTestSuite{}
env := testSuite.NewTestWorkflowEnvironment()

// mock search attributes on start
_ = env.SetTypedSearchAttributesOnStart(temporal.NewSearchAttributes(CustomIntKey.ValueSet(1)))

// mock upsert operations
env.OnUpsertTypedSearchAttributes(
temporal.NewSearchAttributes(
CustomIntKey.ValueSet(2),
CustomKeyword.ValueSet("Update1"),
CustomBool.ValueSet(true),
CustomDouble.ValueSet(3.14),
CustomDatetimeField.ValueSet(env.Now().UTC()),
CustomStringField.ValueSet("String field is for text. When query, it will be tokenized for partial match."),
)).Return(nil).Once()

env.OnUpsertTypedSearchAttributes(
temporal.NewSearchAttributes(
CustomKeyword.ValueSet("Update2"),
CustomIntKey.ValueUnset(),
)).Return(nil).Once()

env.ExecuteWorkflow(SearchAttributesWorkflow)
require.True(t, env.IsWorkflowCompleted())
require.NoError(t, env.GetWorkflowError())
}
35 changes: 35 additions & 0 deletions typed-searchattributes/starter/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package main

import (
"context"
"log"

"github.com/pborman/uuid"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/temporal"

typedsearchattributes "github.com/temporalio/samples-go/typed-searchattributes"
)

func main() {
// The client is a heavyweight object that should be created once per process.
c, err := client.Dial(client.Options{
HostPort: client.DefaultHostPort,
})
if err != nil {
log.Fatalln("Unable to create client", err)
}
defer c.Close()

workflowOptions := client.StartWorkflowOptions{
ID: "typed-search_attributes_" + uuid.New(),
TaskQueue: "typed-search-attributes",
TypedSearchAttributes: temporal.NewSearchAttributes(typedsearchattributes.CustomIntKey.ValueSet(1)),
}

we, err := c.ExecuteWorkflow(context.Background(), workflowOptions, typedsearchattributes.SearchAttributesWorkflow)
if err != nil {
log.Fatalln("Unable to execute workflow", err)
}
log.Println("Started workflow", "WorkflowID", we.GetID(), "RunID", we.GetRunID())
}
Loading

0 comments on commit 5c63363

Please sign in to comment.