From 7857f03f2566a2b124f2b8899e982ae22b784b0f Mon Sep 17 00:00:00 2001
From: Quinn Klassen
Date: Mon, 18 Sep 2023 15:01:59 -0700
Subject: [PATCH 1/2] Add session failure sample

---
 session-failure/README.md        | 25 +++++++++
 session-failure/activities.go    | 42 +++++++++++++++
 session-failure/starter/main.go  | 34 ++++++++++++
 session-failure/worker/main.go   | 34 ++++++++++++
 session-failure/workflow.go      | 91 ++++++++++++++++++++++++++++++++
 session-failure/workflow_test.go | 40 ++++++++++++++
 6 files changed, 266 insertions(+)
 create mode 100644 session-failure/README.md
 create mode 100644 session-failure/activities.go
 create mode 100644 session-failure/starter/main.go
 create mode 100644 session-failure/worker/main.go
 create mode 100644 session-failure/workflow.go
 create mode 100644 session-failure/workflow_test.go

diff --git a/session-failure/README.md b/session-failure/README.md
new file mode 100644
index 00000000..49823101
--- /dev/null
+++ b/session-failure/README.md
@@ -0,0 +1,25 @@
+This sample workflow demonstrates how to recover from a session failure inside a workflow.
+
+The workflow first creates a session, then starts a short activity meant to simulate preparing the session worker, and then starts a long-running activity on the session worker. If the session worker goes down for any reason, the session will fail to heartbeat and be marked as failed. This causes any activities running on the session to be cancelled and the workflow to retry the whole sequence on a new session after a timeout.
+
+### Note on session failure:
+
+Workflows detect that a session worker has gone down through heartbeats sent by the session worker, so the workflow has a stale view of the session worker's state. This is important to consider if your
+workflow schedules any activities on a session that can fail due to a timeout. When a session worker fails, if your activity's timeout is shorter than twice the session heartbeat timeout, the activity may fail with a timeout error before the session state is marked as failed.
+
+It is also worth noting that a restarted session worker is considered a new session worker and will not pick up any activities scheduled on the old session worker. If you want to keep scheduling activities on the same host after a restart, look at ../activities-sticky-queues
+
+Steps to run this sample:
+1) You need a Temporal service running. See details in README.md
+2) Run the following command multiple times in different console windows. This simulates running workers on multiple different machines.
+```
+go run session-failure/worker/main.go
+```
+3) Run the following command to submit a start request for this session failure workflow.
+```
+go run session-failure/starter/main.go
+```
+4) If you want to observe the workflow recover from a failed session, restart the
+worker from step 2) that the workflow's activities are running on.
+
+You should see that all activities for one particular workflow execution are scheduled to run in one console window.
diff --git a/session-failure/activities.go b/session-failure/activities.go
new file mode 100644
index 00000000..a2d78edf
--- /dev/null
+++ b/session-failure/activities.go
@@ -0,0 +1,42 @@
+package fileprocessing
+
+import (
+	"context"
+	"time"
+
+	"go.temporal.io/sdk/activity"
+)
+
+/**
+ * Sample activities used by the session failure sample workflow.
+ */
+
+type Activities struct {
+}
+
+func (a *Activities) PrepareWorkerActivity(ctx context.Context) error {
+	logger := activity.GetLogger(ctx)
+	logger.Info("Preparing session worker")
+	return nil
+}
+
+func (a *Activities) LongRunningActivity(ctx context.Context) error {
+	logger := activity.GetLogger(ctx)
+	logger.Info("Started running long running activity.")
+
+	hbTicker := time.NewTicker(20 * time.Second)
+	defer hbTicker.Stop()
+	// Create a 5 minute timer to simulate an activity doing some long work
+	timer := time.NewTimer(5 * time.Minute)
+	defer timer.Stop()
+	for {
+		select {
+		case <-hbTicker.C:
+			activity.RecordHeartbeat(ctx)
+		case <-timer.C:
+			return nil
+		case <-ctx.Done():
+			return ctx.Err()
+		}
+	}
+}
diff --git a/session-failure/starter/main.go b/session-failure/starter/main.go
new file mode 100644
index 00000000..b8d91582
--- /dev/null
+++ b/session-failure/starter/main.go
@@ -0,0 +1,34 @@
+package main
+
+import (
+	"context"
+	"log"
+
+	"github.com/pborman/uuid"
+	"go.temporal.io/sdk/client"
+
+	sessionfailure "github.com/temporalio/samples-go/session-failure"
+)
+
+func main() {
+	// The client is a heavyweight object that should be created once per process.
+	c, err := client.Dial(client.Options{
+		HostPort: client.DefaultHostPort,
+	})
+	if err != nil {
+		log.Fatalln("Unable to create client", err)
+	}
+	defer c.Close()
+
+	fileID := uuid.New()
+	workflowOptions := client.StartWorkflowOptions{
+		ID:        "session_failure_" + fileID,
+		TaskQueue: "session-failure",
+	}
+
+	we, err := c.ExecuteWorkflow(context.Background(), workflowOptions, sessionfailure.SampleSessionFailureRecoveryWorkflow, fileID)
+	if err != nil {
+		log.Fatalln("Unable to execute workflow", err)
+	}
+	log.Println("Started workflow", "WorkflowID", we.GetID(), "RunID", we.GetRunID())
+}
diff --git a/session-failure/worker/main.go b/session-failure/worker/main.go
new file mode 100644
index 00000000..1b28a0f4
--- /dev/null
+++ b/session-failure/worker/main.go
@@ -0,0 +1,34 @@
+package main
+
+import (
+	"log"
+
+	"go.temporal.io/sdk/client"
+	"go.temporal.io/sdk/worker"
+
+	sessionfailure "github.com/temporalio/samples-go/session-failure"
+)
+
+func main() {
+	// The client and worker are heavyweight objects that should be created once per process.
+	c, err := client.Dial(client.Options{
+		HostPort: client.DefaultHostPort,
+	})
+	if err != nil {
+		log.Fatalln("Unable to create client", err)
+	}
+	defer c.Close()
+
+	workerOptions := worker.Options{
+		EnableSessionWorker: true, // Important for a worker to participate in the session
+	}
+	w := worker.New(c, "session-failure", workerOptions)
+
+	w.RegisterWorkflow(sessionfailure.SampleSessionFailureRecoveryWorkflow)
+	w.RegisterActivity(&sessionfailure.Activities{})
+
+	err = w.Run(worker.InterruptCh())
+	if err != nil {
+		log.Fatalln("Unable to start worker", err)
+	}
+}
diff --git a/session-failure/workflow.go b/session-failure/workflow.go
new file mode 100644
index 00000000..464cbaed
--- /dev/null
+++ b/session-failure/workflow.go
@@ -0,0 +1,91 @@
+package fileprocessing
+
+import (
+	"errors"
+	"time"
+
+	"go.temporal.io/sdk/temporal"
+	"go.temporal.io/sdk/workflow"
+)
+
+var (
+	ErrSessionHostDown = errors.New("session host down")
+)
+
+// SampleSessionFailureRecoveryWorkflow workflow definition
+func SampleSessionFailureRecoveryWorkflow(ctx workflow.Context, fileName string) (err error) {
+
+	err = runSession(ctx, fileName)
+	numOfRetries := 10
+	for err != nil && numOfRetries >= 0 {
+		// Only retry if we detected the session failed. In a production application
+		// it may make sense to also retry if some other errors occur, depending
+		// on your business logic.
+		if errors.Is(err, ErrSessionHostDown) {
+			workflow.Sleep(ctx, 5*time.Minute)
+			err = runSession(ctx, fileName)
+		} else {
+			break
+		}
+		numOfRetries--
+	}
+
+	if err != nil {
+		workflow.GetLogger(ctx).Error("Workflow failed.", "Error", err.Error())
+	} else {
+		workflow.GetLogger(ctx).Info("Workflow completed.")
+	}
+	return err
+}
+
+func runSession(ctx workflow.Context, fileName string) (err error) {
+
+	so := &workflow.SessionOptions{
+		CreationTimeout:  time.Minute,
+		ExecutionTimeout: 20 * time.Minute,
+	}
+	sessionCtx, err := workflow.CreateSession(ctx, so)
+	if err != nil {
+		// In a production application you may want to distinguish between not being able to create
+		// a session and a host going down.
+		if temporal.IsTimeoutError(err) {
+			workflow.GetLogger(ctx).Error("Session failed", "Error", err.Error())
+			err = ErrSessionHostDown
+		}
+		return err
+	}
+
+	defer func() {
+		workflow.CompleteSession(sessionCtx)
+		// If the session host fails, any activity scheduled on that host will be cancelled.
+		//
+		// Note: SessionState is inherently a stale view of the session state; see the README.md of
+		// this sample for more details.
+		if workflow.GetSessionInfo(sessionCtx).SessionState == workflow.SessionStateFailed {
+			err = ErrSessionHostDown
+		}
+	}()
+
+	ao := workflow.ActivityOptions{
+		StartToCloseTimeout: 10 * time.Minute,
+		// When running an activity in a session you don't need to specify a heartbeat timeout to
+		// detect the host going down; the session heartbeat timeout will handle that for you.
+		// You may still want to specify a heartbeat timeout if the activity can get stuck or
+		// you want to record progress with the heartbeat details.
+		HeartbeatTimeout:    40 * time.Second,
+		RetryPolicy: &temporal.RetryPolicy{
+			InitialInterval:    time.Second,
+			BackoffCoefficient: 2.0,
+			MaximumInterval:    time.Minute,
+		},
+	}
+	sessionCtx = workflow.WithActivityOptions(sessionCtx, ao)
+
+	var a *Activities
+	err = workflow.ExecuteActivity(sessionCtx, a.PrepareWorkerActivity).Get(sessionCtx, nil)
+	if err != nil {
+		return err
+	}
+
+	return workflow.ExecuteActivity(sessionCtx, a.LongRunningActivity).Get(sessionCtx, nil)
+}
diff --git a/session-failure/workflow_test.go b/session-failure/workflow_test.go
new file mode 100644
index 00000000..34bc2b9c
--- /dev/null
+++ b/session-failure/workflow_test.go
@@ -0,0 +1,40 @@
+package fileprocessing
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/mock"
+	"go.temporal.io/sdk/worker"
+
+	"github.com/stretchr/testify/suite"
+	"go.temporal.io/sdk/testsuite"
+)
+
+type UnitTestSuite struct {
+	suite.Suite
+	testsuite.WorkflowTestSuite
+}
+
+func TestUnitTestSuite(t *testing.T) {
+	suite.Run(t, new(UnitTestSuite))
+}
+
+func (s *UnitTestSuite) Test_SampleSessionFailureRecoveryWorkflow() {
+	env := s.NewTestWorkflowEnvironment()
+	env.SetWorkerOptions(worker.Options{
+		EnableSessionWorker: true, // Important for a worker to participate in the session
+	})
+	var a *Activities
+
+	env.OnActivity(a.PrepareWorkerActivity, mock.Anything).Return(nil)
+	env.OnActivity(a.LongRunningActivity, mock.Anything).Return(nil)
+
+	env.RegisterActivity(a)
+
+	env.ExecuteWorkflow(SampleSessionFailureRecoveryWorkflow, "file1")
+
+	s.True(env.IsWorkflowCompleted())
+	s.NoError(env.GetWorkflowError())
+
+	env.AssertExpectations(s.T())
+}

From 20ee51e1881a3b7b780b2987c79b41f9c16f2305 Mon Sep 17 00:00:00 2001
From: Quinn Klassen
Date: Mon, 25 Sep 2023 08:58:21 -0700
Subject: [PATCH 2/2] Refactor retries

---
 session-failure/starter/main.go  |  5 ++---
 session-failure/workflow.go      | 36 +++++++++++++------------------
 session-failure/workflow_test.go |  2 +-
 3 files changed, 18 insertions(+), 25 deletions(-)

diff --git a/session-failure/starter/main.go b/session-failure/starter/main.go
index b8d91582..3aac0d47 100644
--- a/session-failure/starter/main.go
+++ b/session-failure/starter/main.go
@@ -20,13 +20,12 @@
 	}
 	defer c.Close()
 
-	fileID := uuid.New()
 	workflowOptions := client.StartWorkflowOptions{
-		ID:        "session_failure_" + fileID,
+		ID:        "session_failure_" + uuid.New(),
 		TaskQueue: "session-failure",
 	}
 
-	we, err := c.ExecuteWorkflow(context.Background(), workflowOptions, sessionfailure.SampleSessionFailureRecoveryWorkflow, fileID)
+	we, err := c.ExecuteWorkflow(context.Background(), workflowOptions, sessionfailure.SampleSessionFailureRecoveryWorkflow)
 	if err != nil {
 		log.Fatalln("Unable to execute workflow", err)
 	}
diff --git a/session-failure/workflow.go b/session-failure/workflow.go
index 464cbaed..5eba2f4d 100644
--- a/session-failure/workflow.go
+++ b/session-failure/workflow.go
@@ -13,32 +13,26 @@ var (
 )
 
 // SampleSessionFailureRecoveryWorkflow workflow definition
-func SampleSessionFailureRecoveryWorkflow(ctx workflow.Context, fileName string) (err error) {
-
-	err = runSession(ctx, fileName)
-	numOfRetries := 10
-	for err != nil && numOfRetries >= 0 {
-		// Only retry if we detected the session failed. In a production application
-		// it may make sense to also retry if some other errors occur, depending
-		// on your business logic.
-		if errors.Is(err, ErrSessionHostDown) {
-			workflow.Sleep(ctx, 5*time.Minute)
-			err = runSession(ctx, fileName)
- if errors.Is(err, ErrSessionHostDown) { - workflow.Sleep(ctx, 5*time.Minute) - err = runSession(ctx, fileName) +func SampleSessionFailureRecoveryWorkflow(ctx workflow.Context) (err error) { + for retryNum := 0; retryNum < 10; retryNum++ { + if err = runSession(ctx); errors.Is(err, ErrSessionHostDown) { + if sleepErr := workflow.Sleep(ctx, 5*time.Minute); sleepErr != nil { + return sleepErr + } + continue + } + if err != nil { + workflow.GetLogger(ctx).Error("Workflow failed.", "Error", err.Error()) } else { - break + workflow.GetLogger(ctx).Info("Workflow completed.") } - numOfRetries-- - } - - if err != nil { - workflow.GetLogger(ctx).Error("Workflow failed.", "Error", err.Error()) - } else { - workflow.GetLogger(ctx).Info("Workflow completed.") + return } - return err + workflow.GetLogger(ctx).Error("Workflow failed after multiple session retries.", "Error", err.Error()) + return } -func runSession(ctx workflow.Context, fileName string) (err error) { +func runSession(ctx workflow.Context) (err error) { so := &workflow.SessionOptions{ CreationTimeout: time.Minute, diff --git a/session-failure/workflow_test.go b/session-failure/workflow_test.go index 34bc2b9c..16411757 100644 --- a/session-failure/workflow_test.go +++ b/session-failure/workflow_test.go @@ -31,7 +31,7 @@ func (s *UnitTestSuite) Test_SampleFileProcessingWorkflow() { env.RegisterActivity(a) - env.ExecuteWorkflow(SampleSessionFailureRecoveryWorkflow, "file1") + env.ExecuteWorkflow(SampleSessionFailureRecoveryWorkflow) s.True(env.IsWorkflowCompleted()) s.NoError(env.GetWorkflowError())