-
Notifications
You must be signed in to change notification settings - Fork 202
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
12 changed files
with
731 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
# Blobstore DataConverter | ||
This sample demonstrates how to use the DataConverter to store large payloads greater than a certain size | ||
in a blobstore and passes the object path around in the Temporal Event History. | ||
|
||
The payload size limit is set in [codec.go: `payloadSizeLimit`](./codec.go#L20). | ||
|
||
It relies on the use of context propagation to pass blobstore config metadata, like object path prefixes. | ||
|
||
In this example, we prefix all object paths with a `tenantID` to better object lifecycle in the blobstore. | ||
|
||
> [!NOTE] | ||
> The time it takes to encode/decode payloads is counted in the `StartWorkflowOptions.WorkflowTaskTimeout`, | ||
> which has a [absolute max of 2 minutes](https://github.com/temporalio/temporal/blob/2a0f6b238f6cdab768098194436b0dda453c8064/common/constants.go#L68). | ||
> [!WARNING] | ||
> As of `Temporal UI v2.33.1` (`Temporal v1.25.2`), **does not** have the ability to send context headers. | ||
> This means that Workflow Start, Signal, Queries, etc. from the UI/CLI will pass payloads to the codec-server but the | ||
> worker needs to handle a missing context propagation header. | ||
> | ||
> In this sample when the header is missing, we use a default of `DefaultPropagatedValues()`, | ||
> see [propagator.go: `missingHeaderContextPropagationKeyError`](./propagator.go#L66). | ||
> | ||
> This allows this sample to still work with the UI/CLI. This maybe not suitable depending on your requirements. | ||
|
||
### Steps to run this sample: | ||
1. Run a [Temporal service](https://github.com/temporalio/samples-go/tree/main/#how-to-use) | ||
2. Run the following command to start the worker | ||
``` | ||
go run worker/main.go | ||
``` | ||
3. Run the following command to start the example | ||
``` | ||
go run starter/main.go | ||
``` | ||
4. Open the Temporal Web UI and observe the following: | ||
- Workflow Input and Activity Input values will be in plain text | ||
- Activity Result and Workflow Result will be an object path | ||
5. Run the following command to start the remote codec server | ||
``` | ||
go run ./codec-server | ||
``` | ||
6. Open the Temporal Web UI and observe the workflow execution, all payloads will now be fully expanded. | ||
7. You can use the Temporal CLI as well | ||
``` | ||
# payloads will be obfuscated object paths | ||
temporal workflow show --env local -w WORKFLOW_ID | ||
# payloads will be fully rendered json | ||
temporal --codec-endpoint 'http://localhost:8081/' workflow show -w WORKFLOW_ID | ||
`````` | ||
Note: Please see the [codec-server](../codec-server/) sample for a more complete example of a codec server which provides oauth. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
package blobstore | ||
|
||
import ( | ||
"fmt" | ||
"os" | ||
"strings" | ||
"time" | ||
) | ||
|
||
type Client struct { | ||
dir string | ||
simulateNetworkLatency time.Duration | ||
} | ||
|
||
func NewClient() *Client { | ||
return &Client{ | ||
dir: "/tmp/temporal-sample/blob-store-data-converter/blobs", | ||
simulateNetworkLatency: 1 * time.Second, | ||
} | ||
} | ||
|
||
func NewTestClient() *Client { | ||
return &Client{ | ||
dir: "/tmp/temporal-sample/blob-store-data-converter/test-blobs", | ||
simulateNetworkLatency: 0, | ||
} | ||
} | ||
|
||
func (b *Client) SaveBlob(key string, data []byte) error { | ||
err := os.MkdirAll(b.dir, 0755) | ||
if err != nil { | ||
return fmt.Errorf("failed to create directory %s: %w", b.dir, err) | ||
} | ||
|
||
path := fmt.Sprintf(b.dir + "/" + strings.ReplaceAll(key, "/", "_")) | ||
fmt.Println("saving blob to: ", path) | ||
err = os.WriteFile(path, data, 0644) | ||
if err != nil { | ||
return fmt.Errorf("failed to save blob: %w", err) | ||
} | ||
|
||
time.Sleep(b.simulateNetworkLatency) | ||
|
||
return nil | ||
} | ||
|
||
func (b *Client) GetBlob(key string) ([]byte, error) { | ||
path := fmt.Sprintf(b.dir + "/" + strings.ReplaceAll(key, "/", "_")) | ||
fmt.Println("reading blob from: ", path) | ||
data, err := os.ReadFile(path) | ||
if err != nil { | ||
return nil, fmt.Errorf("failed to read blob: %w", err) | ||
} | ||
|
||
time.Sleep(b.simulateNetworkLatency) | ||
|
||
return data, nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,72 @@ | ||
package main | ||
|
||
import ( | ||
"flag" | ||
"fmt" | ||
bsdc "github.com/temporalio/samples-go/blob-store-data-converter" | ||
"github.com/temporalio/samples-go/blob-store-data-converter/blobstore" | ||
"go.temporal.io/sdk/converter" | ||
"log" | ||
"net/http" | ||
"os" | ||
"os/signal" | ||
"strconv" | ||
) | ||
|
||
var portFlag int | ||
var web string | ||
|
||
func init() { | ||
flag.IntVar(&portFlag, "port", 8082, "Port to listen on") | ||
flag.StringVar(&web, "web", "http://localhost:8233", "Temporal UI URL") | ||
} | ||
|
||
func main() { | ||
flag.Parse() | ||
|
||
// This example codec server does not support varying config per namespace, | ||
// decoding for the Temporal Web UI or oauth. | ||
// For a more complete example of a codec server please see the codec-server sample at: | ||
// https://github.com/temporalio/samples-go/tree/main/codec-server | ||
handler := converter.NewPayloadCodecHTTPHandler( | ||
//bsdc.NewBaseCodec(blobstore.NewClient()), | ||
bsdc.NewBlobCodec(blobstore.NewClient(), bsdc.PropagatedValues{}), | ||
) | ||
|
||
srv := &http.Server{ | ||
Addr: "localhost:" + strconv.Itoa(portFlag), | ||
Handler: newCORSHTTPHandler(handler), | ||
} | ||
|
||
errCh := make(chan error, 1) | ||
go func() { | ||
fmt.Printf("allowing CORS Headers for %s\n", web) | ||
fmt.Printf("Listening on http://%s/\n", srv.Addr) | ||
errCh <- srv.ListenAndServe() | ||
}() | ||
|
||
sigCh := make(chan os.Signal, 1) | ||
signal.Notify(sigCh, os.Interrupt) | ||
|
||
select { | ||
case <-sigCh: | ||
_ = srv.Close() | ||
case err := <-errCh: | ||
log.Fatal(err) | ||
} | ||
} | ||
|
||
// newCORSHTTPHandler wraps a HTTP handler with CORS support | ||
func newCORSHTTPHandler(next http.Handler) http.Handler { | ||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||
w.Header().Set("Access-Control-Allow-Origin", web) | ||
w.Header().Set("Access-Control-Allow-Credentials", "true") | ||
w.Header().Set("Access-Control-Allow-Headers", "Authorization,Content-Type,X-Namespace,X-CSRF-Token,Caller-Type") | ||
|
||
if r.Method == "OPTIONS" { | ||
return | ||
} | ||
|
||
next.ServeHTTP(w, r) | ||
}) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,101 @@ | ||
package blobstore_data_converter | ||
|
||
import ( | ||
"fmt" | ||
"github.com/google/uuid" | ||
"github.com/temporalio/samples-go/blob-store-data-converter/blobstore" | ||
commonpb "go.temporal.io/api/common/v1" | ||
"go.temporal.io/sdk/converter" | ||
"strings" | ||
) | ||
|
||
const ( | ||
MetadataEncodingBlobStorePlain = "blobstore/plain" | ||
|
||
// gRPC has a 4MB limit. | ||
// To save some space for other metadata, we should stay around half that. | ||
// | ||
// For this example, as a proof of concept, we'll use much smaller size limit. | ||
payloadSizeLimit = 33 | ||
) | ||
|
||
// BlobCodec knows where to store the blobs from the PropagatedValues | ||
// Note, see readme for details on missing values | ||
type BlobCodec struct { | ||
client *blobstore.Client | ||
bucket string | ||
tenant string | ||
pathPrefix []string | ||
} | ||
|
||
var _ = converter.PayloadCodec(&BlobCodec{}) // Ensure that BlobCodec implements converter.PayloadCodec | ||
|
||
// NewBlobCodec is aware of where of the propagated context values from the data converter | ||
func NewBlobCodec(c *blobstore.Client, values PropagatedValues) *BlobCodec { | ||
return &BlobCodec{ | ||
client: c, | ||
bucket: "blob://mybucket", | ||
tenant: values.TenantID, | ||
pathPrefix: values.BlobNamePrefix, | ||
} | ||
} | ||
|
||
// Encode knows where to store the blobs from values stored in the context | ||
func (c *BlobCodec) Encode(payloads []*commonpb.Payload) ([]*commonpb.Payload, error) { | ||
result := make([]*commonpb.Payload, len(payloads)) | ||
for i, p := range payloads { | ||
// if the payload is small enough, just send it as is | ||
fmt.Printf("encoding payload with len(%s): %d\n", string(p.Data), len(p.Data)) | ||
if len(p.Data) < payloadSizeLimit { | ||
result[i] = &commonpb.Payload{Metadata: p.Metadata, Data: p.Data} | ||
continue | ||
} | ||
|
||
origBytes, err := p.Marshal() | ||
if err != nil { | ||
return payloads, err | ||
} | ||
|
||
// save the data in our blob store db | ||
objectName := strings.Join(c.pathPrefix, "_") + "__" + uuid.New().String() // ensures each blob is unique | ||
path := fmt.Sprintf("%s/%s/%s", c.bucket, c.tenant, objectName) | ||
err = c.client.SaveBlob(path, origBytes) | ||
if err != nil { | ||
return payloads, err | ||
} | ||
|
||
result[i] = &commonpb.Payload{ | ||
Metadata: map[string][]byte{ | ||
"encoding": []byte(MetadataEncodingBlobStorePlain), | ||
}, | ||
Data: []byte(path), | ||
} | ||
} | ||
|
||
return result, nil | ||
} | ||
|
||
// Decode does not need to be context aware because it can fetch the blobs via the payload path | ||
func (c *BlobCodec) Decode(payloads []*commonpb.Payload) ([]*commonpb.Payload, error) { | ||
result := make([]*commonpb.Payload, len(payloads)) | ||
for i, p := range payloads { | ||
if string(p.Metadata["encoding"]) != MetadataEncodingBlobStorePlain { | ||
result[i] = &commonpb.Payload{Metadata: p.Metadata, Data: p.Data} | ||
continue | ||
} | ||
|
||
// fetch it from our blob store db | ||
data, err := c.client.GetBlob(string(p.Data)) | ||
if err != nil { | ||
return payloads, err | ||
} | ||
|
||
result[i] = &commonpb.Payload{} | ||
err = result[i].Unmarshal(data) | ||
if err != nil { | ||
return payloads, err | ||
} | ||
} | ||
|
||
return result, nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
package blobstore_data_converter | ||
|
||
import ( | ||
"context" | ||
"github.com/temporalio/samples-go/blob-store-data-converter/blobstore" | ||
"go.temporal.io/sdk/converter" | ||
"go.temporal.io/sdk/workflow" | ||
) | ||
|
||
type DataConverter struct { | ||
client *blobstore.Client | ||
|
||
parent converter.DataConverter // Until EncodingDataConverter supports workflow.ContextAware we'll store parent here. | ||
|
||
converter.DataConverter // embeds converter.DataConverter | ||
} | ||
|
||
var _ = workflow.ContextAware(&DataConverter{}) // Ensure that DataConverter implements workflow.ContextAware | ||
|
||
// NewDataConverter returns DataConverter, which embeds converter.DataConverter | ||
func NewDataConverter(parent converter.DataConverter, client *blobstore.Client) *DataConverter { | ||
next := []converter.PayloadCodec{ | ||
NewBlobCodec(client, UnknownTenant()), | ||
} | ||
|
||
return &DataConverter{ | ||
client: client, | ||
parent: parent, | ||
DataConverter: converter.NewCodecDataConverter(parent, next...), | ||
} | ||
} | ||
|
||
// WithContext will create a BlobCodec used to store and retrieve payloads from the blob storage | ||
// | ||
// This is called when payloads needs to be passed between the Clients/Activity and the Temporal Server. e.g. | ||
// - From starter to encode/decode Workflow Input and Result | ||
// - For each Activity to encode/decode it's Input and Result | ||
func (dc *DataConverter) WithContext(ctx context.Context) converter.DataConverter { | ||
if vals, ok := ctx.Value(PropagatedValuesKey).(PropagatedValues); ok { | ||
parent := dc.parent | ||
if parentWithContext, ok := parent.(workflow.ContextAware); ok { | ||
parent = parentWithContext.WithContext(ctx) | ||
} | ||
|
||
return converter.NewCodecDataConverter(parent, NewBlobCodec(dc.client, vals)) | ||
} | ||
|
||
return dc | ||
} | ||
|
||
// WithWorkflowContext will create a BlobCodec used to store payloads in blob storage | ||
// | ||
// This is called inside the Workflow to decode/encode the Workflow Input and Result | ||
func (dc *DataConverter) WithWorkflowContext(ctx workflow.Context) converter.DataConverter { | ||
if vals, ok := ctx.Value(PropagatedValuesKey).(PropagatedValues); ok { | ||
parent := dc.parent | ||
if parentWithContext, ok := parent.(workflow.ContextAware); ok { | ||
parent = parentWithContext.WithWorkflowContext(ctx) | ||
} | ||
|
||
return converter.NewCodecDataConverter(parent, NewBlobCodec(dc.client, vals)) | ||
} | ||
|
||
return dc | ||
} |
Oops, something went wrong.