-
Notifications
You must be signed in to change notification settings - Fork 202
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Early-return sample #366
Early-return sample #366
Changes from all commits
7c726e9
509449f
3565530
9b4a47a
733a0b6
ecd78c8
2097b86
a0d812c
d400b38
c509f54
3c68ea4
eac75d8
fb87344
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
### Early-Return Sample | ||
|
||
This sample demonstrates an early-return from a workflow. | ||
|
||
By utilizing Update-with-Start, a client can start a new workflow and synchronously receive | ||
a response mid-workflow, while the workflow continues to run to completion. | ||
|
||
### Steps to run this sample: | ||
1) Run a [Temporal service](https://github.com/temporalio/samples-go/tree/main/#how-to-use). | ||
2) Run the following command to start the worker | ||
``` | ||
go run early-return/worker/main.go | ||
``` | ||
3) Run the following command to start the example | ||
``` | ||
go run early-return/starter/main.go | ||
``` |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
package main | ||
|
||
import ( | ||
"context" | ||
"log" | ||
"time" | ||
|
||
"github.com/pborman/uuid" | ||
"github.com/temporalio/samples-go/early-return" | ||
"go.temporal.io/sdk/client" | ||
) | ||
|
||
func main() { | ||
c, err := client.Dial(client.Options{}) | ||
if err != nil { | ||
log.Fatalln("Unable to create client", err) | ||
} | ||
defer c.Close() | ||
|
||
ctxWithTimeout, cancel := context.WithTimeout(context.Background(), 5*time.Second) | ||
defer cancel() | ||
|
||
updateOperation := client.NewUpdateWithStartWorkflowOperation( | ||
client.UpdateWorkflowOptions{ | ||
UpdateName: earlyreturn.UpdateName, | ||
WaitForStage: client.WorkflowUpdateStageCompleted, | ||
}) | ||
|
||
tx := earlyreturn.Transaction{ID: uuid.New(), SourceAccount: "Bob", TargetAccount: "Alice", Amount: 100} | ||
workflowOptions := client.StartWorkflowOptions{ | ||
ID: "early-return-workflow-ID-" + tx.ID, | ||
TaskQueue: earlyreturn.TaskQueueName, | ||
WithStartOperation: updateOperation, | ||
} | ||
we, err := c.ExecuteWorkflow(ctxWithTimeout, workflowOptions, earlyreturn.Workflow, tx) | ||
if err != nil { | ||
log.Fatalln("Error executing workflow:", err) | ||
} | ||
|
||
log.Println("Started workflow", "WorkflowID", we.GetID(), "RunID", we.GetRunID()) | ||
|
||
updateHandle, err := updateOperation.Get(ctxWithTimeout) | ||
if err != nil { | ||
log.Fatalln("Error obtaining update handle:", err) | ||
} | ||
|
||
err = updateHandle.Get(ctxWithTimeout, nil) | ||
if err != nil { | ||
// The workflow will continue running, cancelling the transaction. | ||
|
||
// NOTE: If the error is retryable, a retry attempt must use a unique workflow ID. | ||
log.Fatalln("Error obtaining update result:", err) | ||
} | ||
|
||
log.Println("Transaction initialized successfully") | ||
// The workflow will continue running, completing the transaction. | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
package main | ||
|
||
import ( | ||
"log" | ||
|
||
"github.com/temporalio/samples-go/early-return" | ||
"go.temporal.io/sdk/client" | ||
"go.temporal.io/sdk/worker" | ||
) | ||
|
||
func main() { | ||
// The client and worker are heavyweight objects that should be created once per process. | ||
c, err := client.Dial(client.Options{}) | ||
if err != nil { | ||
log.Fatalln("Unable to create client", err) | ||
} | ||
defer c.Close() | ||
|
||
w := worker.New(c, earlyreturn.TaskQueueName, worker.Options{}) | ||
|
||
w.RegisterWorkflow(earlyreturn.Workflow) | ||
|
||
err = w.Run(worker.InterruptCh()) | ||
if err != nil { | ||
log.Fatalln("Unable to start worker", err) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,116 @@ | ||
package earlyreturn | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"fmt" | ||
"time" | ||
|
||
"go.temporal.io/sdk/activity" | ||
"go.temporal.io/sdk/workflow" | ||
) | ||
|
||
const ( | ||
UpdateName = "early-return" | ||
TaskQueueName = "early-return-tq" | ||
) | ||
|
||
type Transaction struct { | ||
ID string | ||
SourceAccount string | ||
TargetAccount string | ||
Amount int // in cents | ||
|
||
initErr error | ||
initDone bool | ||
} | ||
|
||
// Workflow processes a transaction in two phases. First, the transaction is initialized, and if successful, | ||
// it proceeds to completion. However, if initialization fails - due to validation errors or transient | ||
// issues (e.g., network connectivity problems) - the transaction is cancelled. | ||
// | ||
// By utilizing Update-with-Start, the client can initiate the workflow and immediately receive the result of | ||
// the initialization in a single round trip, even before the transaction processing completes. The remainder | ||
// of the transaction is then processed asynchronously. | ||
func Workflow(ctx workflow.Context, tx Transaction) error { | ||
return run(ctx, tx) | ||
} | ||
|
||
func run(ctx workflow.Context, tx Transaction) error { | ||
logger := workflow.GetLogger(ctx) | ||
|
||
if err := workflow.SetUpdateHandler( | ||
ctx, | ||
UpdateName, | ||
tx.returnInitResult, | ||
); err != nil { | ||
return err | ||
} | ||
|
||
// Phase 1: Initialize the transaction synchronously. | ||
// | ||
// By using a local activity, an additional server roundtrip is avoided. | ||
// See https://docs.temporal.io/activities#local-activity for more details. | ||
|
||
activityOptions := workflow.WithLocalActivityOptions(ctx, workflow.LocalActivityOptions{ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I doubt you'll want the default retry options with such an aggressive schedule to close of 2s |
||
ScheduleToCloseTimeout: 5 * time.Second, // short timeout to avoid another Workflow Task being scheduled | ||
}) | ||
tx.initErr = workflow.ExecuteLocalActivity(activityOptions, tx.InitTransaction).Get(ctx, nil) | ||
tx.initDone = true | ||
|
||
// Phase 2: Complete or cancel the transaction asychronously. | ||
|
||
activityCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ | ||
StartToCloseTimeout: 30 * time.Second, | ||
}) | ||
if tx.initErr != nil { | ||
logger.Error(fmt.Sprintf("cancelling transaction due to init error: %v", tx.initErr)) | ||
|
||
// Transaction failed to be initialized or not quickly enough; cancel the transaction. | ||
if err := workflow.ExecuteActivity(activityCtx, tx.CancelTransaction).Get(ctx, nil); err != nil { | ||
return fmt.Errorf("cancelling the transaction failed: %w", err) | ||
} | ||
|
||
return tx.initErr | ||
} | ||
|
||
logger.Info("completing transaction") | ||
|
||
// Transaction was initialized successfully; complete the transaction. | ||
if err := workflow.ExecuteActivity(activityCtx, tx.CompleteTransaction).Get(ctx, nil); err != nil { | ||
return fmt.Errorf("completing the transaction failed: %w", err) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (tx *Transaction) returnInitResult(ctx workflow.Context) error { | ||
if err := workflow.Await(ctx, func() bool { return tx.initDone }); err != nil { | ||
return fmt.Errorf("transaction init cancelled: %w", err) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. AFAICT, this is the only untested line of the workflow. I'm not quite sure how to use the Go SDK testing env to trigger this case. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You would need to test cancellation using https://pkg.go.dev/go.temporal.io/[email protected]/internal#TestWorkflowEnvironment.CancelWorkflow no? Not saying you need to for this sample though. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah, that's the missing piece! Yeah, I agree, it's prob fine without. |
||
} | ||
return tx.initErr | ||
} | ||
|
||
func (tx *Transaction) InitTransaction(ctx context.Context) error { | ||
logger := activity.GetLogger(ctx) | ||
if tx.Amount <= 0 { | ||
return errors.New("invalid Amount") | ||
} | ||
time.Sleep(500 * time.Millisecond) | ||
logger.Info("Transaction initialized") | ||
return nil | ||
} | ||
|
||
func (tx *Transaction) CancelTransaction(ctx context.Context) error { | ||
logger := activity.GetLogger(ctx) | ||
time.Sleep(1 * time.Second) | ||
logger.Info("Transaction cancelled") | ||
return nil | ||
} | ||
|
||
func (tx *Transaction) CompleteTransaction(ctx context.Context) error { | ||
logger := activity.GetLogger(ctx) | ||
time.Sleep(1 * time.Second) | ||
logger.Info("Transaction completed") | ||
return nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,102 @@ | ||
package earlyreturn | ||
|
||
import ( | ||
"fmt" | ||
"testing" | ||
|
||
"github.com/pborman/uuid" | ||
"github.com/stretchr/testify/mock" | ||
"github.com/stretchr/testify/require" | ||
"go.temporal.io/sdk/testsuite" | ||
) | ||
|
||
func Test_CompleteTransaction(t *testing.T) { | ||
testSuite := &testsuite.WorkflowTestSuite{} | ||
env := testSuite.NewTestWorkflowEnvironment() | ||
|
||
tx := Transaction{ID: uuid.New(), SourceAccount: "Bob", TargetAccount: "Alice", Amount: 100} | ||
env.RegisterActivity(tx.InitTransaction) | ||
env.RegisterActivity(tx.CompleteTransaction) | ||
|
||
uc := &updateCallback{} | ||
env.RegisterDelayedCallback(func() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: would add a comment explaining this will guarantee the update is sent in the first WFT. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. FWIW I'm confused even after Quinn's comment. Why is this test seemingly not using the normal interface for UwS? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it not backed by the Java test service? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Correct; Go SDK has it's own time-skipping test server, and it has its own APIs. We haven't added an UwS API since this approach here works, too. But I agree, it's not immediately obvious (at least wasn't for me, had to ask Quinn). |
||
env.UpdateWorkflow(UpdateName, uuid.New(), uc) | ||
}, 0) // NOTE: zero delay ensures Update is delivered in first workflow task | ||
env.ExecuteWorkflow(Workflow, tx) | ||
|
||
require.True(t, env.IsWorkflowCompleted()) | ||
require.NoError(t, env.GetWorkflowError()) | ||
require.NoError(t, uc.completeErr) | ||
} | ||
|
||
func Test_CompleteTransaction_Fails(t *testing.T) { | ||
testSuite := &testsuite.WorkflowTestSuite{} | ||
env := testSuite.NewTestWorkflowEnvironment() | ||
|
||
tx := Transaction{ID: uuid.New(), SourceAccount: "Bob", TargetAccount: "Alice", Amount: 100} | ||
env.RegisterActivity(tx.InitTransaction) | ||
env.RegisterActivity(tx.CompleteTransaction) | ||
|
||
env.OnActivity(tx.CompleteTransaction, mock.Anything).Return(fmt.Errorf("crash")) | ||
|
||
uc := &updateCallback{} | ||
env.RegisterDelayedCallback(func() { | ||
env.UpdateWorkflow(UpdateName, uuid.New(), uc) | ||
}, 0) | ||
env.ExecuteWorkflow(Workflow, tx) | ||
|
||
require.True(t, env.IsWorkflowCompleted()) | ||
require.ErrorContains(t, env.GetWorkflowError(), "crash") | ||
} | ||
|
||
func Test_CancelTransaction(t *testing.T) { | ||
testSuite := &testsuite.WorkflowTestSuite{} | ||
env := testSuite.NewTestWorkflowEnvironment() | ||
|
||
tx := Transaction{ID: uuid.New(), SourceAccount: "Bob", TargetAccount: "Alice", Amount: -1} // invalid! | ||
env.RegisterActivity(tx.InitTransaction) | ||
env.RegisterActivity(tx.CancelTransaction) | ||
|
||
uc := &updateCallback{} | ||
env.RegisterDelayedCallback(func() { | ||
env.UpdateWorkflow(UpdateName, uuid.New(), uc) | ||
}, 0) | ||
env.ExecuteWorkflow(Workflow, tx) | ||
|
||
require.True(t, env.IsWorkflowCompleted()) | ||
require.ErrorContains(t, uc.completeErr, "invalid Amount") | ||
require.ErrorContains(t, env.GetWorkflowError(), "invalid Amount") | ||
} | ||
|
||
func Test_CancelTransaction_Fails(t *testing.T) { | ||
testSuite := &testsuite.WorkflowTestSuite{} | ||
env := testSuite.NewTestWorkflowEnvironment() | ||
|
||
tx := Transaction{ID: uuid.New(), SourceAccount: "Bob", TargetAccount: "Alice", Amount: -1} // invalid! | ||
env.RegisterActivity(tx.InitTransaction) | ||
env.RegisterActivity(tx.CancelTransaction) | ||
|
||
env.OnActivity(tx.CancelTransaction, mock.Anything).Return(fmt.Errorf("crash")) | ||
|
||
uc := &updateCallback{} | ||
env.RegisterDelayedCallback(func() { | ||
env.UpdateWorkflow(UpdateName, uuid.New(), uc) | ||
}, 0) | ||
env.ExecuteWorkflow(Workflow, tx) | ||
|
||
require.True(t, env.IsWorkflowCompleted()) | ||
require.ErrorContains(t, uc.completeErr, "invalid Amount") | ||
require.ErrorContains(t, env.GetWorkflowError(), "crash") | ||
} | ||
|
||
type updateCallback struct { | ||
completeErr error | ||
} | ||
|
||
func (uc *updateCallback) Accept() {} | ||
|
||
func (uc *updateCallback) Reject(err error) {} | ||
|
||
func (uc *updateCallback) Complete(success interface{}, err error) { | ||
uc.completeErr = err | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Arguably this logic could be flipped and users may prefer that in many scenarios. Can flip where the update is the init and the primary workflow waits for an init update before continuing. There are tradeoffs to both.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In all my examples until now, I've actually had it flipped. Drew convinced me to do it the other way around, but I'm not quite sure anymore why. What makes you say that the other way might be more preferable?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since updates are not durable on admitted that is why we did it this way so this is the only safe way to write this type of workflow. if I remember correctly
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't necessarily think it's more preferable, there are just tradeoffs. The main tradeoff is probably what you want the workflow to do when it's not called via update with start. If you want it to function normally, no problem, if you want it to wait for an update to get it moving, probably want logic flipped.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It isn't more preferable this is the only safe way to write it since update with start is not transactional
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My main reasoning is what chad said: you want the workflow to function properly when the client doesn't call it with an update.
Quinn's reasoning makes sense too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we at least guarantee the update and the start are in the same task? If we don't, all latency bets are off anyways. But whether primary workflow waits on init from update or update waits on init from primary workflow is immaterial I'd think (except if the update can come in a separate task which would be a concern).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We do guarantee it for Update-with-Start.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 Then yeah I think it's probably just semantics on which coroutine waits on the other and probably doesn't matter
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another strong reason to do it this way is that, if the update did the init, then the workflow author has to make sure the workflow is correct in the face of multiple calls to the update handler, i.e. normal updates being sent subsequent to the update with start. But with all steps in the main workflow, multiple calls to the update handler are automatically correct.