Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

blob storage codec sample #373

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ Each sample demonstrates one feature of the SDK, together with tests.
server to decode payloads for display in tctl and Temporal Web. This setup can be used for any kind of codec, common
examples are compression or encryption.

- [**Blob Store using Data Converters**](./blob-store-data-converter):
Demonstrates how to use the DataConverter to store large payloads greater than a certain size in a blobstore and pass
the object path around in the Temporal Event History.

- [**Query Example**](./query): Demonstrates how to Query the state
of a single Workflow Execution using the `QueryWorkflow` and `SetQueryHandler` APIs. Additional
documentation: [How to Query a Workflow Execution in Go](https://docs.temporal.io/application-development/features/#queries).
Expand Down
53 changes: 53 additions & 0 deletions blob-store-data-converter/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# Blobstore DataConverter
This sample demonstrates how to use the DataConverter to store large payloads greater than a certain size
in a blobstore and passes the object path around in the Temporal Event History.

The payload size limit is set in [codec.go: `payloadSizeLimit`](./codec.go#L20).

It relies on the use of context propagation to pass blobstore config metadata, like object path prefixes.

In this example, we prefix all object paths with a `tenantID` to better object lifecycle in the blobstore.

> [!NOTE]
> The time it takes to encode/decode payloads is counted in the `StartWorkflowOptions.WorkflowTaskTimeout`,
> which has a [absolute max of 2 minutes](https://github.com/temporalio/temporal/blob/2a0f6b238f6cdab768098194436b0dda453c8064/common/constants.go#L68).

> [!WARNING]
> As of `Temporal UI v2.33.1` (`Temporal v1.25.2`), **does not** have the ability to send context headers.
> This means that Workflow Start, Signal, Queries, etc. from the UI/CLI will pass payloads to the codec-server but the
> worker needs to handle a missing context propagation header.
>
> In this sample when the header is missing, we use a default of `DefaultPropagatedValues()`,
> see [propagator.go: `missingHeaderContextPropagationKeyError`](./propagator.go#L66).
>
> This allows this sample to still work with the UI/CLI. This maybe not suitable depending on your requirements.


### Steps to run this sample:
1. Run a [Temporal service](https://github.com/temporalio/samples-go/tree/main/#how-to-use)
2. Run the following command to start the worker
```
go run worker/main.go
```
3. Run the following command to start the example
```
go run starter/main.go
```
4. Open the Temporal Web UI and observe the following:
- Workflow Input and Activity Input values will be in plain text
- Activity Result and Workflow Result will be an object path
5. Run the following command to start the remote codec server
```
go run ./codec-server
```
6. Open the Temporal Web UI and observe the workflow execution, all payloads will now be fully expanded.
7. You can use the Temporal CLI as well
```
# payloads will be obfuscated object paths
temporal workflow show --env local -w WORKFLOW_ID

# payloads will be fully rendered json
temporal --codec-endpoint 'http://localhost:8081/' workflow show -w WORKFLOW_ID
``````

Note: Please see the [codec-server](../codec-server/) sample for a more complete example of a codec server which provides oauth.
58 changes: 58 additions & 0 deletions blob-store-data-converter/blobstore/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package blobstore

import (
"fmt"
"os"
"strings"
"time"
)

type Client struct {
dir string
simulateNetworkLatency time.Duration
}

func NewClient() *Client {
return &Client{
dir: "/tmp/temporal-sample/blob-store-data-converter/blobs",
simulateNetworkLatency: 1 * time.Second,
}
}

func NewTestClient() *Client {
return &Client{
dir: "/tmp/temporal-sample/blob-store-data-converter/test-blobs",
simulateNetworkLatency: 0,
}
}

func (b *Client) SaveBlob(key string, data []byte) error {
err := os.MkdirAll(b.dir, 0755)
if err != nil {
return fmt.Errorf("failed to create directory %s: %w", b.dir, err)
}

path := fmt.Sprintf(b.dir + "/" + strings.ReplaceAll(key, "/", "_"))
fmt.Println("saving blob to: ", path)
err = os.WriteFile(path, data, 0644)
if err != nil {
return fmt.Errorf("failed to save blob: %w", err)
}

time.Sleep(b.simulateNetworkLatency)

return nil
}

func (b *Client) GetBlob(key string) ([]byte, error) {
path := fmt.Sprintf(b.dir + "/" + strings.ReplaceAll(key, "/", "_"))
fmt.Println("reading blob from: ", path)
data, err := os.ReadFile(path)
if err != nil {
return nil, fmt.Errorf("failed to read blob: %w", err)
}

time.Sleep(b.simulateNetworkLatency)

return data, nil
}
72 changes: 72 additions & 0 deletions blob-store-data-converter/codec-server/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package main

import (
"flag"
"fmt"
bsdc "github.com/temporalio/samples-go/blob-store-data-converter"
"github.com/temporalio/samples-go/blob-store-data-converter/blobstore"
"go.temporal.io/sdk/converter"
"log"
"net/http"
"os"
"os/signal"
"strconv"
)

var portFlag int
var web string

func init() {
flag.IntVar(&portFlag, "port", 8082, "Port to listen on")
flag.StringVar(&web, "web", "http://localhost:8233", "Temporal UI URL")
}

func main() {
flag.Parse()

// This example codec server does not support varying config per namespace,
// decoding for the Temporal Web UI or oauth.
// For a more complete example of a codec server please see the codec-server sample at:
// https://github.com/temporalio/samples-go/tree/main/codec-server
handler := converter.NewPayloadCodecHTTPHandler(
//bsdc.NewBaseCodec(blobstore.NewClient()),
bsdc.NewBlobCodec(blobstore.NewClient(), bsdc.PropagatedValues{}),
)

srv := &http.Server{
Addr: "localhost:" + strconv.Itoa(portFlag),
Handler: newCORSHTTPHandler(handler),
}

errCh := make(chan error, 1)
go func() {
fmt.Printf("allowing CORS Headers for %s\n", web)
fmt.Printf("Listening on http://%s/\n", srv.Addr)
errCh <- srv.ListenAndServe()
}()

sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, os.Interrupt)

select {
case <-sigCh:
_ = srv.Close()
case err := <-errCh:
log.Fatal(err)
}
}

// newCORSHTTPHandler wraps a HTTP handler with CORS support
func newCORSHTTPHandler(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", web)
w.Header().Set("Access-Control-Allow-Credentials", "true")
w.Header().Set("Access-Control-Allow-Headers", "Authorization,Content-Type,X-Namespace,X-CSRF-Token,Caller-Type")

if r.Method == "OPTIONS" {
return
}

next.ServeHTTP(w, r)
})
}
101 changes: 101 additions & 0 deletions blob-store-data-converter/codec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package blobstore_data_converter

import (
"fmt"
"github.com/google/uuid"
"github.com/temporalio/samples-go/blob-store-data-converter/blobstore"
commonpb "go.temporal.io/api/common/v1"
"go.temporal.io/sdk/converter"
"strings"
)

const (
MetadataEncodingBlobStorePlain = "blobstore/plain"

// gRPC has a 4MB limit.
// To save some space for other metadata, we should stay around half that.
//
// For this example, as a proof of concept, we'll use much smaller size limit.
payloadSizeLimit = 37
)

// BlobCodec knows where to store the blobs from the PropagatedValues
// Note, see readme for details on missing values
type BlobCodec struct {
client *blobstore.Client
bucket string
tenant string
pathPrefix []string
}

var _ = converter.PayloadCodec(&BlobCodec{}) // Ensure that BlobCodec implements converter.PayloadCodec

// NewBlobCodec is aware of where of the propagated context values from the data converter
func NewBlobCodec(c *blobstore.Client, values PropagatedValues) *BlobCodec {
return &BlobCodec{
client: c,
bucket: "blob://mybucket",
tenant: values.TenantID,
pathPrefix: values.BlobNamePrefix,
}
}

// Encode knows where to store the blobs from values stored in the context
func (c *BlobCodec) Encode(payloads []*commonpb.Payload) ([]*commonpb.Payload, error) {
result := make([]*commonpb.Payload, len(payloads))
for i, p := range payloads {
// if the payload is small enough, just send it as is
fmt.Printf("encoding payload with len(%s): %d\n", string(p.Data), len(p.Data))
if len(p.Data) < payloadSizeLimit {
result[i] = &commonpb.Payload{Metadata: p.Metadata, Data: p.Data}
continue
}

origBytes, err := p.Marshal()
if err != nil {
return payloads, err
}

// save the data in our blob store db
objectName := strings.Join(c.pathPrefix, "_") + "__" + uuid.New().String() // ensures each blob is unique
path := fmt.Sprintf("%s/%s/%s", c.bucket, c.tenant, objectName)
err = c.client.SaveBlob(path, origBytes)
if err != nil {
return payloads, err
}

result[i] = &commonpb.Payload{
Metadata: map[string][]byte{
"encoding": []byte(MetadataEncodingBlobStorePlain),
},
Data: []byte(path),
}
}

return result, nil
}

// Decode does not need to be context aware because it can fetch the blobs via the payload path
func (c *BlobCodec) Decode(payloads []*commonpb.Payload) ([]*commonpb.Payload, error) {
result := make([]*commonpb.Payload, len(payloads))
for i, p := range payloads {
if string(p.Metadata["encoding"]) != MetadataEncodingBlobStorePlain {
result[i] = &commonpb.Payload{Metadata: p.Metadata, Data: p.Data}
continue
}

// fetch it from our blob store db
data, err := c.client.GetBlob(string(p.Data))
if err != nil {
return payloads, err
}

result[i] = &commonpb.Payload{}
err = result[i].Unmarshal(data)
if err != nil {
return payloads, err
}
}

return result, nil
}
65 changes: 65 additions & 0 deletions blob-store-data-converter/data_converter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package blobstore_data_converter

import (
"context"
"github.com/temporalio/samples-go/blob-store-data-converter/blobstore"
"go.temporal.io/sdk/converter"
"go.temporal.io/sdk/workflow"
)

type DataConverter struct {
client *blobstore.Client

parent converter.DataConverter // Until EncodingDataConverter supports workflow.ContextAware we'll store parent here.

converter.DataConverter // embeds converter.DataConverter
}

var _ = workflow.ContextAware(&DataConverter{}) // Ensure that DataConverter implements workflow.ContextAware

// NewDataConverter returns DataConverter, which embeds converter.DataConverter
func NewDataConverter(parent converter.DataConverter, client *blobstore.Client) *DataConverter {
next := []converter.PayloadCodec{
NewBlobCodec(client, UnknownTenant()),
}

return &DataConverter{
client: client,
parent: parent,
DataConverter: converter.NewCodecDataConverter(parent, next...),
}
}

// WithContext will create a BlobCodec used to store and retrieve payloads from the blob storage
//
// This is called when payloads needs to be passed between the Clients/Activity and the Temporal Server. e.g.
// - From starter to encode/decode Workflow Input and Result
// - For each Activity to encode/decode it's Input and Result
func (dc *DataConverter) WithContext(ctx context.Context) converter.DataConverter {
if vals, ok := ctx.Value(PropagatedValuesKey).(PropagatedValues); ok {
parent := dc.parent
if parentWithContext, ok := parent.(workflow.ContextAware); ok {
parent = parentWithContext.WithContext(ctx)
}

return converter.NewCodecDataConverter(parent, NewBlobCodec(dc.client, vals))
}

return dc
}

// WithWorkflowContext will create a BlobCodec used to store payloads in blob storage
//
// This is called inside the Workflow to decode/encode the Workflow Input and Result
func (dc *DataConverter) WithWorkflowContext(ctx workflow.Context) converter.DataConverter {
if vals, ok := ctx.Value(PropagatedValuesKey).(PropagatedValues); ok {
parent := dc.parent
if parentWithContext, ok := parent.(workflow.ContextAware); ok {
parent = parentWithContext.WithWorkflowContext(ctx)
}

return converter.NewCodecDataConverter(parent, NewBlobCodec(dc.client, vals))
}

return dc
}
Loading
Loading