Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[POC] Show a way to implement cancellable updates #297

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ go 1.16

replace github.com/cactus/go-statsd-client => github.com/cactus/go-statsd-client v3.2.1+incompatible

replace (
go.temporal.io/sdk v1.23.1 => ../sdk-go
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update interceptors don't work on the current release of the Go SDK

)
require (
github.com/HdrHistogram/hdrhistogram-go v1.1.2 // indirect
github.com/golang/mock v1.6.0
Expand Down
14 changes: 14 additions & 0 deletions update-cancel/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
### Update Cancel Sample

Here we show an example of a workflow with a long running update. Through the use of an interceptor we are able to cancel the update by sending another special "cancel" update.

### Steps to run this sample:
1) Run a [Temporal service](https://github.com/temporalio/samples-go/tree/main/#how-to-use).
2) Run the following command to start the worker
```
go run update-cancel/worker/main.go
```
3) Run the following command to start the example
```
go run update-cancel/starter/main.go
```
66 changes: 66 additions & 0 deletions update-cancel/interceptor.go
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hrmm, not sure this is a pattern we should encourage, especially hiding this logic in an interceptor. I think it may be clearer if we should how to cancel an update context from another update explicitly in workflow code. If they want to make that generic to apply to all updates hidden away in an interceptor, then they can.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason I moved it into the interceptor is to show you don't need to explicitly add support in your workflow

Copy link
Member

@cretz cretz Jul 18, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, but I don't think we should encourage interceptors like this nor provide a general purpose utility. I think we should show a simple example how you can cancel a single update and if someone wants to go the extra step of hiding this from workflow authors, they can.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it just makes the code a less elegant to do it in the workflow, but for the samples repo your right we shouldn't encourage usage of interceptors like this

Copy link
Member

@cretz cretz Jul 18, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Definitely less elegant and less reusable. Acknowledged. Kinda on purpose, which is admittedly a bit strange, but we don't want people to blindly copy the approach. What I've found is that many times people want to customize these patterns so seeing them in-workflow is easy to understand.

And really, if we wanted to write this properly for reuse, I might recommend a utility instead of an interceptor.

Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package update_cancel

import (
"errors"

"go.temporal.io/sdk/interceptor"
"go.temporal.io/sdk/workflow"
)

const (
UpdateCancelHandle = "update-cancel"
)

type workerInterceptor struct {
interceptor.WorkerInterceptorBase
}

func NewWorkerInterceptor() interceptor.WorkerInterceptor {
return &workerInterceptor{}
}

func (w *workerInterceptor) InterceptWorkflow(
ctx workflow.Context,
next interceptor.WorkflowInboundInterceptor,
) interceptor.WorkflowInboundInterceptor {
i := &workflowInboundInterceptor{root: w}
i.Next = next
return i
}

type workflowInboundInterceptor struct {
ctxMap map[string]workflow.CancelFunc
interceptor.WorkflowInboundInterceptorBase
root *workerInterceptor
}

func (w *workflowInboundInterceptor) Init(outbound interceptor.WorkflowOutboundInterceptor) error {
w.ctxMap = make(map[string]workflow.CancelFunc)
return w.Next.Init(outbound)
}

func (w *workflowInboundInterceptor) ExecuteWorkflow(ctx workflow.Context, in *interceptor.ExecuteWorkflowInput) (interface{}, error) {
err := workflow.SetUpdateHandlerWithOptions(ctx, UpdateCancelHandle, func(ctx workflow.Context, updateID string) error {
// Cancel the update
w.ctxMap[updateID]()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably want to delete from ctxMap here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't delete because I wanted duplicate cancels to behave the same as the server, but I didn't implement that behaviour yet

return nil
}, workflow.UpdateHandlerOptions{
Validator: func(ctx workflow.Context, updateID string) error {
// Validate that the update ID is known
if _, ok := w.ctxMap[updateID]; !ok {
return errors.New("unknown update ID")
}
return nil
},
})
if err != nil {
return nil, err
}
return w.Next.ExecuteWorkflow(ctx, in)
}

func (w *workflowInboundInterceptor) ExecuteUpdate(ctx workflow.Context, in *interceptor.UpdateInput) (interface{}, error) {
ctx, cancel := workflow.WithCancel(ctx)
w.ctxMap[workflow.GetUpdateInfo(ctx).ID] = cancel
return w.Next.ExecuteUpdate(ctx, in)
}
80 changes: 80 additions & 0 deletions update-cancel/starter/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package main

import (
"context"
"log"
"time"

update_cancel "github.com/temporalio/samples-go/update-cancel"
enumspb "go.temporal.io/api/enums/v1"
updatepb "go.temporal.io/api/update/v1"
"go.temporal.io/sdk/client"
)

func main() {
c, err := client.Dial(client.Options{})
if err != nil {
log.Fatalln("Unable to create client", err)
}
defer c.Close()

workflowOptions := client.StartWorkflowOptions{
ID: "update_cancel-workflow-ID",
TaskQueue: "update_cancel",
}

we, err := c.ExecuteWorkflow(context.Background(), workflowOptions, update_cancel.UpdateWorkflow)
if err != nil {
log.Fatalln("Unable to execute workflow", err)
}

log.Println("Started workflow", "WorkflowID", we.GetID(), "RunID", we.GetRunID())

cancellableUpdateID := "cancellable-update-ID"
log.Println("Sending update", "UpdateID", cancellableUpdateID)

// Send an async update request.
handle, err := c.UpdateWorkflowWithOptions(context.Background(), &client.UpdateWorkflowWithOptionsRequest{
WorkflowID: we.GetID(),
RunID: we.GetRunID(),
UpdateName: update_cancel.UpdateHandle,
UpdateID: cancellableUpdateID,
WaitPolicy: &updatepb.WaitPolicy{
LifecycleStage: enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED,
},
Args: []interface{}{
4 * time.Hour,
},
})
if err != nil {
log.Fatalln("Unable to execute update", err)
}
log.Println("Sent update")

log.Println("Waiting 5s to send cancel")
time.Sleep(5 * time.Second)
log.Println("Sending cancel to update", "UpdateID", cancellableUpdateID)

_, err = c.UpdateWorkflow(context.Background(), we.GetID(), we.GetRunID(), update_cancel.UpdateCancelHandle, cancellableUpdateID)
if err != nil {
log.Fatalln("Unable to send cancel", err)
}
log.Println("Sent cancel")

var sleepTime time.Duration
err = handle.Get(context.Background(), &sleepTime)
if err != nil {
log.Fatalln("Unable to get update result", err)
}
// Update will only sleep for 5s because it was cancelled.
log.Println("Update slept for:", sleepTime)

if err = c.SignalWorkflow(context.Background(), we.GetID(), we.GetRunID(), update_cancel.Done, nil); err != nil {
log.Fatalf("failed to send %q signal to workflow: %v", update_cancel.Done, err)
}
var wfresult int
if err = we.Get(context.Background(), &wfresult); err != nil {
log.Fatalf("unable get workflow result: %v", err)
}
log.Println("workflow result:", wfresult)
}
29 changes: 29 additions & 0 deletions update-cancel/update.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package update_cancel

import (
"time"

"go.temporal.io/sdk/workflow"
)

const (
UpdateHandle = "update_handle"
Done = "done"
)

func UpdateWorkflow(ctx workflow.Context) error {
if err := workflow.SetUpdateHandler(
ctx,
UpdateHandle,
func(ctx workflow.Context, sleepTime time.Duration) (time.Duration, error) {
dt := workflow.Now(ctx)
workflow.Sleep(ctx, sleepTime)
return workflow.Now(ctx).Sub(dt), nil
},
); err != nil {
return err
}

_ = workflow.GetSignalChannel(ctx, Done).Receive(ctx, nil)
return ctx.Err()
}
30 changes: 30 additions & 0 deletions update-cancel/worker/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package main

import (
"log"

update_cancel "github.com/temporalio/samples-go/update-cancel"
"go.temporal.io/sdk/client"
sdkinterceptor "go.temporal.io/sdk/interceptor"
"go.temporal.io/sdk/worker"
)

func main() {
// The client and worker are heavyweight objects that should be created once per process.
c, err := client.Dial(client.Options{})
if err != nil {
log.Fatalln("Unable to create client", err)
}
defer c.Close()

w := worker.New(c, "update_cancel", worker.Options{
Interceptors: []sdkinterceptor.WorkerInterceptor{update_cancel.NewWorkerInterceptor()},
})

w.RegisterWorkflow(update_cancel.UpdateWorkflow)

err = w.Run(worker.InterruptCh())
if err != nil {
log.Fatalln("Unable to start worker", err)
}
}