From 9968f90dda8e469c0574e57c753342176634e4ca Mon Sep 17 00:00:00 2001
From: Kevin Woo <3469532+kevinawoo@users.noreply.github.com>
Date: Tue, 3 Dec 2024 10:39:49 -0800
Subject: [PATCH] blob storage codec sample

---
 README.md                                     |   4 +
 blob-store-data-converter/README.md           |  53 ++++++++
 blob-store-data-converter/blobstore/client.go |  58 +++++++++
 .../codec-server/main.go                      |  72 +++++++++++
 blob-store-data-converter/codec.go            | 101 ++++++++++++++++
 blob-store-data-converter/data_converter.go   |  65 ++++++++++
 .../data_converter_test.go                    |  43 +++++++
 blob-store-data-converter/propagator.go       | 113 ++++++++++++++++++
 blob-store-data-converter/starter/main.go     |  69 +++++++++++
 blob-store-data-converter/worker/main.go      |  44 +++++++
 blob-store-data-converter/workflow.go         |  67 +++++++++++
 blob-store-data-converter/workflow_test.go    |  42 +++++++
 12 files changed, 731 insertions(+)
 create mode 100644 blob-store-data-converter/README.md
 create mode 100644 blob-store-data-converter/blobstore/client.go
 create mode 100644 blob-store-data-converter/codec-server/main.go
 create mode 100644 blob-store-data-converter/codec.go
 create mode 100644 blob-store-data-converter/data_converter.go
 create mode 100644 blob-store-data-converter/data_converter_test.go
 create mode 100644 blob-store-data-converter/propagator.go
 create mode 100644 blob-store-data-converter/starter/main.go
 create mode 100644 blob-store-data-converter/worker/main.go
 create mode 100644 blob-store-data-converter/workflow.go
 create mode 100644 blob-store-data-converter/workflow_test.go

diff --git a/README.md b/README.md
index 330a0df8..f74e232a 100644
--- a/README.md
+++ b/README.md
@@ -82,6 +82,10 @@ Each sample demonstrates one feature of the SDK, together with tests.
   server to decode payloads for display in tctl and Temporal Web. This setup can be used for any kind of codec,
   common examples are compression or encryption.
 
+- [**Blob Store using Data Converters**](./blob-store-data-converter):
+  Demonstrates how to use the DataConverter to store payloads larger than a certain size in a blobstore and pass
+  the object path around in the Temporal Event History.
+
 - [**Query Example**](./query): Demonstrates how to Query the state of a single Workflow Execution using the `QueryWorkflow` and `SetQueryHandler` APIs. Additional documentation: [How to Query a Workflow Execution in Go](https://docs.temporal.io/application-development/features/#queries).
diff --git a/blob-store-data-converter/README.md b/blob-store-data-converter/README.md
new file mode 100644
index 00000000..48a0226f
--- /dev/null
+++ b/blob-store-data-converter/README.md
@@ -0,0 +1,53 @@
+# Blobstore DataConverter
+This sample demonstrates how to use the DataConverter to store payloads larger than a certain size
+in a blobstore and pass the object path around in the Temporal Event History.
+
+The payload size limit is set in [codec.go: `payloadSizeLimit`](./codec.go#L20).
+
+It relies on context propagation to pass blobstore config metadata, like object path prefixes.
+
+In this example, we prefix all object paths with a `tenantID` to better manage object lifecycles in the blobstore.
+
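+Conceptually, an offloaded payload in the Event History carries only the object path. The sketch
+below is illustrative (the IDs are hypothetical, and the wire format base64-encodes these fields):
+
+```
+{
+  "metadata": { "encoding": "blobstore/plain" },
+  "data": "blob://mybucket/tenant12/starter__2d95b8ba-9366-4be5-98c4-3a41e17906db"
+}
+```
+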
+> [!NOTE]
+> The time it takes to encode/decode payloads is counted in the `StartWorkflowOptions.WorkflowTaskTimeout`,
+> which has an [absolute max of 2 minutes](https://github.com/temporalio/temporal/blob/2a0f6b238f6cdab768098194436b0dda453c8064/common/constants.go#L68).
+
+> [!WARNING]
+> As of `Temporal UI v2.33.1` (`Temporal v1.25.2`), the UI **does not** have the ability to send context headers.
+> This means that Workflow Start, Signal, Queries, etc. from the UI/CLI will pass payloads to the codec-server, but the
+> worker needs to handle a missing context propagation header.
+>
+> In this sample, when the header is missing, we fall back to the default returned by `UnknownTenant()`,
+> see [propagator.go: `errMissingHeaderContextPropagationKey`](./propagator.go#L66).
+>
+> This allows the sample to still work with the UI/CLI. This may not be suitable depending on your requirements.
+
+
+### Steps to run this sample:
+1. Run a [Temporal service](https://github.com/temporalio/samples-go/tree/main/#how-to-use)
+2. Run the following command to start the worker
+   ```
+   go run worker/main.go
+   ```
+3. Run the following command to start the example
+   ```
+   go run starter/main.go
+   ```
+4. Open the Temporal Web UI and observe the following:
+   - Workflow Input and Activity Input values will be in plain text
+   - Activity Result and Workflow Result will be an object path
+5. Run the following command to start the remote codec server
+   ```
+   go run ./codec-server
+   ```
+6. Open the Temporal Web UI and observe the workflow execution: all payloads will now be fully expanded.
+7. You can use the Temporal CLI as well
+   ```
+   # payloads will be obfuscated object paths
+   temporal workflow show --env local -w WORKFLOW_ID
+
+   # payloads will be fully rendered json
+   temporal --codec-endpoint 'http://localhost:8082/' workflow show -w WORKFLOW_ID
+   ```
+
+Note: Please see the [codec-server](../codec-server/) sample for a more complete example of a codec server, which provides OAuth.
diff --git a/blob-store-data-converter/blobstore/client.go b/blob-store-data-converter/blobstore/client.go
new file mode 100644
index 00000000..70261117
--- /dev/null
+++ b/blob-store-data-converter/blobstore/client.go
@@ -0,0 +1,58 @@
+package blobstore
+
+import (
+	"fmt"
+	"os"
+	"strings"
+	"time"
+)
+
+type Client struct {
+	dir                    string
+	simulateNetworkLatency time.Duration
+}
+
+func NewClient() *Client {
+	return &Client{
+		dir:                    "/tmp/temporal-sample/blob-store-data-converter/blobs",
+		simulateNetworkLatency: 1 * time.Second,
+	}
+}
+
+func NewTestClient() *Client {
+	return &Client{
+		dir:                    "/tmp/temporal-sample/blob-store-data-converter/test-blobs",
+		simulateNetworkLatency: 0,
+	}
+}
+
+func (b *Client) SaveBlob(key string, data []byte) error {
+	err := os.MkdirAll(b.dir, 0755)
+	if err != nil {
+		return fmt.Errorf("failed to create directory %s: %w", b.dir, err)
+	}
+
+	path := fmt.Sprintf("%s/%s", b.dir, strings.ReplaceAll(key, "/", "_"))
+	fmt.Println("saving blob to:", path)
+	err = os.WriteFile(path, data, 0644)
+	if err != nil {
+		return fmt.Errorf("failed to save blob: %w", err)
+	}
+
+	time.Sleep(b.simulateNetworkLatency)
+
+	return nil
+}
+
+func (b *Client) GetBlob(key string) ([]byte, error) {
+	path := fmt.Sprintf("%s/%s", b.dir, strings.ReplaceAll(key, "/", "_"))
+	fmt.Println("reading blob from:", path)
+	data, err := os.ReadFile(path)
+	if err != nil {
+		return nil, fmt.Errorf("failed to read blob: %w", err)
+	}
+
+	time.Sleep(b.simulateNetworkLatency)
+
+	return data, nil
+}
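+
+// Illustrative usage of this client (the key below is hypothetical):
+//
+//	c := NewClient()
+//	if err := c.SaveBlob("blob://mybucket/tenant12/obj-1", []byte("payload")); err != nil {
+//		// handle error
+//	}
+//	data, err := c.GetBlob("blob://mybucket/tenant12/obj-1")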
"go.temporal.io/sdk/converter" + "log" + "net/http" + "os" + "os/signal" + "strconv" +) + +var portFlag int +var web string + +func init() { + flag.IntVar(&portFlag, "port", 8082, "Port to listen on") + flag.StringVar(&web, "web", "http://localhost:8233", "Temporal UI URL") +} + +func main() { + flag.Parse() + + // This example codec server does not support varying config per namespace, + // decoding for the Temporal Web UI or oauth. + // For a more complete example of a codec server please see the codec-server sample at: + // https://github.com/temporalio/samples-go/tree/main/codec-server + handler := converter.NewPayloadCodecHTTPHandler( + //bsdc.NewBaseCodec(blobstore.NewClient()), + bsdc.NewBlobCodec(blobstore.NewClient(), bsdc.PropagatedValues{}), + ) + + srv := &http.Server{ + Addr: "localhost:" + strconv.Itoa(portFlag), + Handler: newCORSHTTPHandler(handler), + } + + errCh := make(chan error, 1) + go func() { + fmt.Printf("allowing CORS Headers for %s\n", web) + fmt.Printf("Listening on http://%s/\n", srv.Addr) + errCh <- srv.ListenAndServe() + }() + + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, os.Interrupt) + + select { + case <-sigCh: + _ = srv.Close() + case err := <-errCh: + log.Fatal(err) + } +} + +// newCORSHTTPHandler wraps a HTTP handler with CORS support +func newCORSHTTPHandler(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Access-Control-Allow-Origin", web) + w.Header().Set("Access-Control-Allow-Credentials", "true") + w.Header().Set("Access-Control-Allow-Headers", "Authorization,Content-Type,X-Namespace,X-CSRF-Token,Caller-Type") + + if r.Method == "OPTIONS" { + return + } + + next.ServeHTTP(w, r) + }) +} diff --git a/blob-store-data-converter/codec.go b/blob-store-data-converter/codec.go new file mode 100644 index 00000000..69754f58 --- /dev/null +++ b/blob-store-data-converter/codec.go @@ -0,0 +1,101 @@ +package blobstore_data_converter + +import ( + "fmt" + "github.com/google/uuid" + "github.com/temporalio/samples-go/blob-store-data-converter/blobstore" + commonpb "go.temporal.io/api/common/v1" + "go.temporal.io/sdk/converter" + "strings" +) + +const ( + MetadataEncodingBlobStorePlain = "blobstore/plain" + + // gRPC has a 4MB limit. + // To save some space for other metadata, we should stay around half that. + // + // For this example, as a proof of concept, we'll use much smaller size limit. 
diff --git a/blob-store-data-converter/codec.go b/blob-store-data-converter/codec.go
new file mode 100644
index 00000000..69754f58
--- /dev/null
+++ b/blob-store-data-converter/codec.go
@@ -0,0 +1,101 @@
+package blobstore_data_converter
+
+import (
+	"fmt"
+	"github.com/google/uuid"
+	"github.com/temporalio/samples-go/blob-store-data-converter/blobstore"
+	commonpb "go.temporal.io/api/common/v1"
+	"go.temporal.io/sdk/converter"
+	"strings"
+)
+
+const (
+	MetadataEncodingBlobStorePlain = "blobstore/plain"
+
+	// gRPC has a 4MB limit.
+	// To save some space for other metadata, we should stay around half that.
+	//
+	// For this example, as a proof of concept, we'll use a much smaller size limit.
+	payloadSizeLimit = 33
+)
+
+// BlobCodec knows where to store blobs based on the PropagatedValues.
+// Note: see the README for details on handling a missing propagation header.
+type BlobCodec struct {
+	client     *blobstore.Client
+	bucket     string
+	tenant     string
+	pathPrefix []string
+}
+
+var _ = converter.PayloadCodec(&BlobCodec{}) // Ensure that BlobCodec implements converter.PayloadCodec
+
+// NewBlobCodec is aware of the propagated context values from the data converter
+func NewBlobCodec(c *blobstore.Client, values PropagatedValues) *BlobCodec {
+	return &BlobCodec{
+		client:     c,
+		bucket:     "blob://mybucket",
+		tenant:     values.TenantID,
+		pathPrefix: values.BlobNamePrefix,
+	}
+}
+
+// Encode stores large payloads in the blob store, using values from the context to build the object path
+func (c *BlobCodec) Encode(payloads []*commonpb.Payload) ([]*commonpb.Payload, error) {
+	result := make([]*commonpb.Payload, len(payloads))
+	for i, p := range payloads {
+		// if the payload is small enough, just send it as is
+		fmt.Printf("encoding payload of size %d: %s\n", len(p.Data), p.Data)
+		if len(p.Data) < payloadSizeLimit {
+			result[i] = &commonpb.Payload{Metadata: p.Metadata, Data: p.Data}
+			continue
+		}
+
+		origBytes, err := p.Marshal()
+		if err != nil {
+			return payloads, err
+		}
+
+		// save the data in our blob store db
+		objectName := strings.Join(c.pathPrefix, "_") + "__" + uuid.New().String() // ensures each blob is unique
+		path := fmt.Sprintf("%s/%s/%s", c.bucket, c.tenant, objectName)
+		err = c.client.SaveBlob(path, origBytes)
+		if err != nil {
+			return payloads, err
+		}
+
+		result[i] = &commonpb.Payload{
+			Metadata: map[string][]byte{
+				"encoding": []byte(MetadataEncodingBlobStorePlain),
+			},
+			Data: []byte(path),
+		}
+	}
+
+	return result, nil
+}
+
+// Decode does not need to be context aware because it can fetch the blobs via the payload path
+func (c *BlobCodec) Decode(payloads []*commonpb.Payload) ([]*commonpb.Payload, error) {
+	result := make([]*commonpb.Payload, len(payloads))
+	for i, p := range payloads {
+		if string(p.Metadata["encoding"]) != MetadataEncodingBlobStorePlain {
+			result[i] = &commonpb.Payload{Metadata: p.Metadata, Data: p.Data}
+			continue
+		}
+
+		// fetch it from our blob store db
+		data, err := c.client.GetBlob(string(p.Data))
+		if err != nil {
+			return payloads, err
+		}
+
+		result[i] = &commonpb.Payload{}
+		err = result[i].Unmarshal(data)
+		if err != nil {
+			return payloads, err
+		}
+	}
+
+	return result, nil
+}
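+
+// Putting it together, a codec scoped to one tenant could be constructed like this
+// (values are hypothetical):
+//
+//	codec := NewBlobCodec(blobstore.NewClient(), PropagatedValues{
+//		TenantID:       "tenant12",
+//		BlobNamePrefix: []string{"starter"},
+//	})
+//	encoded, err := codec.Encode(payloads) // large payloads come back as object paths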
diff --git a/blob-store-data-converter/data_converter.go b/blob-store-data-converter/data_converter.go
new file mode 100644
index 00000000..a52a92ae
--- /dev/null
+++ b/blob-store-data-converter/data_converter.go
@@ -0,0 +1,65 @@
+package blobstore_data_converter
+
+import (
+	"context"
+	"github.com/temporalio/samples-go/blob-store-data-converter/blobstore"
+	"go.temporal.io/sdk/converter"
+	"go.temporal.io/sdk/workflow"
+)
+
+type DataConverter struct {
+	client *blobstore.Client
+
+	parent converter.DataConverter // Until CodecDataConverter supports workflow.ContextAware, we'll store the parent here.
+
+	converter.DataConverter // embeds converter.DataConverter
+}
+
+var _ = workflow.ContextAware(&DataConverter{}) // Ensure that DataConverter implements workflow.ContextAware
+
+// NewDataConverter returns DataConverter, which embeds converter.DataConverter
+func NewDataConverter(parent converter.DataConverter, client *blobstore.Client) *DataConverter {
+	next := []converter.PayloadCodec{
+		NewBlobCodec(client, UnknownTenant()),
+	}
+
+	return &DataConverter{
+		client:        client,
+		parent:        parent,
+		DataConverter: converter.NewCodecDataConverter(parent, next...),
+	}
+}
+
+// WithContext will create a BlobCodec used to store and retrieve payloads from the blob storage
+//
+// This is called when payloads need to be passed between the Client/Activities and the Temporal Server, e.g.:
+// - From the starter to encode/decode Workflow Input and Result
+// - For each Activity to encode/decode its Input and Result
+func (dc *DataConverter) WithContext(ctx context.Context) converter.DataConverter {
+	if vals, ok := ctx.Value(PropagatedValuesKey).(PropagatedValues); ok {
+		parent := dc.parent
+		if parentWithContext, ok := parent.(workflow.ContextAware); ok {
+			parent = parentWithContext.WithContext(ctx)
+		}
+
+		return converter.NewCodecDataConverter(parent, NewBlobCodec(dc.client, vals))
+	}
+
+	return dc
+}
+
+// WithWorkflowContext will create a BlobCodec used to store payloads in blob storage
+//
+// This is called inside the Workflow to encode/decode the Workflow Input and Result
+func (dc *DataConverter) WithWorkflowContext(ctx workflow.Context) converter.DataConverter {
+	if vals, ok := ctx.Value(PropagatedValuesKey).(PropagatedValues); ok {
+		parent := dc.parent
+		if parentWithContext, ok := parent.(workflow.ContextAware); ok {
+			parent = parentWithContext.WithWorkflowContext(ctx)
+		}
+
+		return converter.NewCodecDataConverter(parent, NewBlobCodec(dc.client, vals))
+	}
+
+	return dc
+}
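+
+// For context (illustrative, not the SDK's exact code): before serializing arguments,
+// the SDK checks whether the configured data converter is workflow.ContextAware and,
+// if so, derives a per-call converter that carries the PropagatedValues, roughly:
+//
+//	var dc converter.DataConverter = myDataConverter
+//	if ca, ok := dc.(workflow.ContextAware); ok {
+//		dc = ca.WithContext(ctx)
+//	}
+//	payloads, err := dc.ToPayloads(args)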
diff --git a/blob-store-data-converter/data_converter_test.go b/blob-store-data-converter/data_converter_test.go
new file mode 100644
index 00000000..74ffb219
--- /dev/null
+++ b/blob-store-data-converter/data_converter_test.go
@@ -0,0 +1,43 @@
+package blobstore_data_converter
+
+import (
+	"context"
+	"github.com/temporalio/samples-go/blob-store-data-converter/blobstore"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+	"go.temporal.io/sdk/converter"
+)
+
+func Test_DataConverter(t *testing.T) {
+	defaultDc := converter.GetDefaultDataConverter()
+
+	ctx := context.Background()
+	ctx = context.WithValue(ctx, PropagatedValuesKey, PropagatedValues{
+		TenantID:       "t1",
+		BlobNamePrefix: []string{"t1", "starter"},
+	})
+
+	blobDc := NewDataConverter(
+		converter.GetDefaultDataConverter(),
+		blobstore.NewTestClient(),
+	)
+	blobDcCtx := blobDc.WithContext(ctx)
+
+	defaultPayloads, err := defaultDc.ToPayloads("small payload")
+	require.NoError(t, err)
+	require.Equal(t, string(defaultPayloads.Payloads[0].GetData()), `"small payload"`)
+
+	const largePayload = "really really really large giant payload"
+	require.Greater(t, len([]byte(largePayload)), payloadSizeLimit, "payload size should be larger than the limit in the example")
+
+	offloadedPayloads, err := blobDcCtx.ToPayloads(largePayload)
+	require.NoError(t, err)
+	require.Contains(t, string(offloadedPayloads.Payloads[0].GetData()), "blob://")
+
+	var result string
+	err = blobDc.FromPayloads(offloadedPayloads, &result)
+	require.NoError(t, err)
+
+	require.Equal(t, largePayload, result)
+}
diff --git a/blob-store-data-converter/propagator.go b/blob-store-data-converter/propagator.go
new file mode 100644
index 00000000..1fbc2a9a
--- /dev/null
+++ b/blob-store-data-converter/propagator.go
@@ -0,0 +1,113 @@
+package blobstore_data_converter
+
+import (
+	"context"
+	"fmt"
+	"go.temporal.io/sdk/converter"
+	"go.temporal.io/sdk/workflow"
+)
+
+type (
+	// contextKey is an unexported type used as key for items stored in the
+	// Context object
+	contextKey int
+
+	// propagator implements the custom context propagator
+	propagator struct{}
+)
+
+const (
+	_                   contextKey = iota
+	PropagatedValuesKey            // The key used to store PropagatedValues in the context
+)
+
+// propagationKey is the key used by the propagator to pass values through the
+// Temporal Workflow Event History headers
+const propagationKey = "context-propagation"
+
+// PropagatedValues is the struct stored on the context under PropagatedValuesKey
+//
+// converter.GetDefaultDataConverter() converts this into a JSON string to be stored in the
+// Temporal Workflow Event History headers under propagationKey
+type PropagatedValues struct {
+	TenantID       string   `json:"tenantID,omitempty"`
+	BlobNamePrefix []string `json:"bsPathSegs,omitempty"`
+}
+
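+// Serialized with the default data converter, the resulting header entry looks
+// roughly like this (hypothetical values):
+//
+//	context-propagation: {"tenantID": "tenant12", "bsPathSegs": ["starter"]}
+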
+// UnknownTenant returns a PropagatedValues struct with default values.
+// This happens in edge cases where the tenantID is not set in the context
+func UnknownTenant() PropagatedValues {
+	return PropagatedValues{
+		TenantID: "unknown-tenant",
+	}
+}
+
+// NewContextPropagator returns a context propagator that propagates a set of
+// string key-value pairs across a workflow
+func NewContextPropagator() workflow.ContextPropagator {
+	return &propagator{}
+}
+
+// Inject injects values from context into headers for propagation
+func (s *propagator) Inject(ctx context.Context, writer workflow.HeaderWriter) error {
+	value := ctx.Value(PropagatedValuesKey)
+	payload, err := converter.GetDefaultDataConverter().ToPayload(value)
+	if err != nil {
+		return err
+	}
+	writer.Set(propagationKey, payload)
+	return nil
+}
+
+// InjectFromWorkflow injects values from the workflow context into headers for propagation
+func (s *propagator) InjectFromWorkflow(ctx workflow.Context, writer workflow.HeaderWriter) error {
+	vals := ctx.Value(PropagatedValuesKey).(PropagatedValues)
+
+	payload, err := converter.GetDefaultDataConverter().ToPayload(vals)
+	if err != nil {
+		return err
+	}
+	writer.Set(propagationKey, payload)
+	return nil
+}
+
+// errMissingHeaderContextPropagationKey is an edge case that can happen when the UI/CLI is used
+// to start, signal, or query a workflow. It's up to the user to define this behavior.
+//
+// In this example, we just log the error and continue with a default value.
+// This allows UI/CLIs to send JSON payloads, and it protects the workflow from failing on the missing context key.
+var errMissingHeaderContextPropagationKey = fmt.Errorf("context propagation key not found in header: %s", propagationKey)
+
+// Extract extracts values from headers and puts them into context
+func (s *propagator) Extract(ctx context.Context, reader workflow.HeaderReader) (context.Context, error) {
+	value, ok := reader.Get(propagationKey)
+	if !ok {
+		fmt.Println(errMissingHeaderContextPropagationKey)
+		return context.WithValue(ctx, PropagatedValuesKey, UnknownTenant()), nil
+	}
+
+	var data PropagatedValues
+	if err := converter.GetDefaultDataConverter().FromPayload(value, &data); err != nil {
+		return ctx, fmt.Errorf("failed to extract value from header: %w", err)
+	}
+	ctx = context.WithValue(ctx, PropagatedValuesKey, data)
+
+	return ctx, nil
+}
+
+// ExtractToWorkflow extracts values from headers and puts them into the workflow context
+func (s *propagator) ExtractToWorkflow(ctx workflow.Context, reader workflow.HeaderReader) (workflow.Context, error) {
+	value, ok := reader.Get(propagationKey)
+	if !ok {
+		fmt.Println(errMissingHeaderContextPropagationKey)
+		return workflow.WithValue(ctx, PropagatedValuesKey, UnknownTenant()), nil
+	}
+
+	var data PropagatedValues
+	if err := converter.GetDefaultDataConverter().FromPayload(value, &data); err != nil {
+		return ctx, fmt.Errorf("failed to extract value from header: %w", err)
+	}
+	ctx = workflow.WithValue(ctx, PropagatedValuesKey, data)
+
+	return ctx, nil
+}
diff --git a/blob-store-data-converter/starter/main.go b/blob-store-data-converter/starter/main.go
new file mode 100644
index 00000000..745adc35
--- /dev/null
+++ b/blob-store-data-converter/starter/main.go
@@ -0,0 +1,69 @@
+package main
+
+import (
+	"context"
+	bsdc "github.com/temporalio/samples-go/blob-store-data-converter"
+	"github.com/temporalio/samples-go/blob-store-data-converter/blobstore"
+	"go.temporal.io/api/enums/v1"
+	"go.temporal.io/sdk/workflow"
+	"log"
+	"time"
+
+	"go.temporal.io/sdk/client"
+	"go.temporal.io/sdk/converter"
+)
+
+func main() {
+	ctx := context.Background()
+
+	bsClient := blobstore.NewClient()
+
+	// The client is a heavyweight object that should be created once per process.
+	c, err := client.Dial(client.Options{
+		DataConverter: bsdc.NewDataConverter(
+			converter.GetDefaultDataConverter(),
+			bsClient,
+		),
+		// Use a ContextPropagator so that the PropagatedValues set in the context are
+		// also available in the context for activities.
+		ContextPropagators: []workflow.ContextPropagator{
+			bsdc.NewContextPropagator(),
+		},
+	})
+	if err != nil {
+		log.Fatalln("Unable to create client", err)
+	}
+	defer c.Close()
+
+	ctx = context.WithValue(ctx, bsdc.PropagatedValuesKey, bsdc.PropagatedValues{
+		TenantID:       "tenant12",
+		BlobNamePrefix: []string{"starter"},
+	})
+
+	workflowOptions := client.StartWorkflowOptions{
+		ID:                       "blobstore_codec",
+		TaskQueue:                "blobstore_codec",
+		WorkflowTaskTimeout:      10 * time.Second, // encoding/decoding time counts towards this timeout
+		WorkflowIDConflictPolicy: enums.WORKFLOW_ID_CONFLICT_POLICY_TERMINATE_EXISTING,
+	}
+
+	we, err := c.ExecuteWorkflow(
+		ctx,
+		workflowOptions,
+		bsdc.Workflow,
+		"StarterSays: big big blob",
+	)
+	if err != nil {
+		log.Fatalln("Unable to execute workflow", err)
+	}
+
+	log.Println("Started workflow", "WorkflowID", we.GetID())
+
+	// Synchronously wait for the workflow completion.
+	var result string
+	err = we.Get(ctx, &result)
+	if err != nil {
+		log.Fatalln("Unable to get workflow result", err)
+	}
+	log.Println("Workflow result:", result)
+}
diff --git a/blob-store-data-converter/worker/main.go b/blob-store-data-converter/worker/main.go
new file mode 100644
index 00000000..e8747f97
--- /dev/null
+++ b/blob-store-data-converter/worker/main.go
@@ -0,0 +1,44 @@
+package main
+
+import (
+	bsdc "github.com/temporalio/samples-go/blob-store-data-converter"
+	"github.com/temporalio/samples-go/blob-store-data-converter/blobstore"
+	"go.temporal.io/sdk/client"
+	"go.temporal.io/sdk/converter"
+	"go.temporal.io/sdk/worker"
+	"go.temporal.io/sdk/workflow"
+	"log"
+)
+
+func main() {
+	bsClient := blobstore.NewClient()
+
+	// The client and worker are heavyweight objects that should be created once per process.
+	c, err := client.Dial(client.Options{
+		// Calls to the blob store are network calls with inherent latency, which may
+		// trigger the workflow deadlock detector, so it is disabled for this converter.
+		DataConverter: workflow.DataConverterWithoutDeadlockDetection(bsdc.NewDataConverter(
+			converter.GetDefaultDataConverter(),
+			bsClient,
+		)),
+
+		// Use a ContextPropagator so that the PropagatedValues set in the context are
+		// also available in the context for activities.
+		ContextPropagators: []workflow.ContextPropagator{
+			bsdc.NewContextPropagator(),
+		},
+	})
+	if err != nil {
+		log.Fatalln("Unable to create client", err)
+	}
+	defer c.Close()
+
+	w := worker.New(c, "blobstore_codec", worker.Options{})
+
+	w.RegisterWorkflow(bsdc.Workflow)
+	w.RegisterActivity(bsdc.Activity)
+
+	err = w.Run(worker.InterruptCh())
+	if err != nil {
+		log.Fatalln("Unable to start worker", err)
+	}
+}
diff --git a/blob-store-data-converter/workflow.go b/blob-store-data-converter/workflow.go
new file mode 100644
index 00000000..8ccaa891
--- /dev/null
+++ b/blob-store-data-converter/workflow.go
@@ -0,0 +1,67 @@
+package blobstore_data_converter
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"go.temporal.io/sdk/activity"
+	"go.temporal.io/sdk/workflow"
+)
+
+// Workflow is a standard workflow definition.
+// Note that the Workflow and Activity don't need to care that
+// their inputs/results are stored in a blob store rather than in the workflow history.
+func Workflow(ctx workflow.Context, name string) (string, error) {
+	ao := workflow.ActivityOptions{
+		StartToCloseTimeout: 10 * time.Second,
+	}
+	ctx = workflow.WithActivityOptions(ctx, ao)
+
+	logger := workflow.GetLogger(ctx)
+	logger.Info("workflow started", "name", name)
+
+	ctxVal, ok := ctx.Value(PropagatedValuesKey).(PropagatedValues)
+	if !ok {
+		err := fmt.Errorf("failed to find our propagated values in the context")
+		logger.Error(err.Error())
+		return "", err
+	}
+
+	fmt.Printf("workflow received propagated ctx value from starter: %+v\n", ctxVal)
+	wfInfo := workflow.GetInfo(ctx)
+	ctxVal.BlobNamePrefix = []string{wfInfo.WorkflowType.Name, wfInfo.WorkflowExecution.ID}
+	ctx = workflow.WithValue(ctx, PropagatedValuesKey, ctxVal)
+	fmt.Printf("workflow updated propagated ctx value to: %+v\n", ctxVal)
+
+	info := map[string]string{
+		"name": name,
+	}
+
+	var result string
+	err := workflow.ExecuteActivity(ctx, Activity, info).Get(ctx, &result)
+	if err != nil {
+		logger.Error("Activity failed.", "Error", err)
+		return "", err
+	}
+
+	result = "WorkflowSays: " + result
+	fmt.Println("workflow completed.", "result", result)
+
+	return result, nil
+}
+
+func Activity(ctx context.Context, info map[string]string) (string, error) {
+	logger := activity.GetLogger(ctx)
+	logger.Info("Activity", "info", info)
+
+	val := ctx.Value(PropagatedValuesKey)
+	fmt.Printf("Activity ctx value: %+v\n", val)
+
+	name, ok := info["name"]
+	if !ok {
+		name = "someone"
+	}
+
+	return "ActivitySays: " + name + "!", nil
+}
diff --git a/blob-store-data-converter/workflow_test.go b/blob-store-data-converter/workflow_test.go
new file mode 100644
index 00000000..52955198
--- /dev/null
+++ b/blob-store-data-converter/workflow_test.go
@@ -0,0 +1,42 @@
+package blobstore_data_converter
+
+import (
+	"github.com/stretchr/testify/mock"
+	commonpb "go.temporal.io/api/common/v1"
+	"go.temporal.io/sdk/converter"
+	"go.temporal.io/sdk/workflow"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+	"go.temporal.io/sdk/testsuite"
+)
+
+func Test_Workflow(t *testing.T) {
+	testSuite := &testsuite.WorkflowTestSuite{}
+	env := testSuite.NewTestWorkflowEnvironment()
+
+	// Set up the environment with the expected context propagator and inject the propagation header
+	env.SetContextPropagators([]workflow.ContextPropagator{NewContextPropagator()})
+	headerDC := converter.GetDefaultDataConverter()
+	p, err := headerDC.ToPayload(PropagatedValues{
+		TenantID:       "test-tenant",
+		BlobNamePrefix: []string{t.Name()},
+	})
+	require.NoError(t, err)
+	env.SetHeader(&commonpb.Header{
+		Fields: map[string]*commonpb.Payload{
+			propagationKey: p,
+		},
+	})
+
+	// Mock activity implementation
+	env.OnActivity(Activity, mock.Anything, mock.Anything).Return("Hello From TestActivity!", nil)
+
+	env.ExecuteWorkflow(Workflow, "Temporal")
+
+	require.True(t, env.IsWorkflowCompleted())
+	require.NoError(t, env.GetWorkflowError())
+	var result string
+	require.NoError(t, env.GetWorkflowResult(&result))
+	require.Equal(t, "WorkflowSays: Hello From TestActivity!", result)
+}