From c514225d0f1e00266eb031ee429d69475c553a10 Mon Sep 17 00:00:00 2001 From: Jacob LeGrone Date: Fri, 1 Sep 2023 10:50:24 -0400 Subject: [PATCH] Add replay option to disable deadlock detection (#1221) --- internal/internal_worker.go | 9 +++ test/replaytests/deadlocked-workflow.json | 91 +++++++++++++++++++++++ test/replaytests/replay_test.go | 18 +++++ test/replaytests/workflows.go | 7 ++ 4 files changed, 125 insertions(+) create mode 100644 test/replaytests/deadlocked-workflow.json diff --git a/internal/internal_worker.go b/internal/internal_worker.go index 211834665..39947ceb5 100644 --- a/internal/internal_worker.go +++ b/internal/internal_worker.go @@ -1122,6 +1122,7 @@ type WorkflowReplayer struct { failureConverter converter.FailureConverter contextPropagators []ContextPropagator enableLoggingInReplay bool + disableDeadlockDetection bool mu sync.Mutex workflowExecutionResults map[string]*commonpb.Payloads } @@ -1153,6 +1154,10 @@ type WorkflowReplayerOptions struct { // This is only useful for debugging purpose. // default: false EnableLoggingInReplay bool + + // Optional: Disable the default 1 second deadlock detection timeout. This option can be used to step through + // workflow code with multiple breakpoints in a debugger. + DisableDeadlockDetection bool } // ReplayWorkflowHistoryOptions are options for replaying a workflow. @@ -1172,6 +1177,7 @@ func NewWorkflowReplayer(options WorkflowReplayerOptions) (*WorkflowReplayer, er failureConverter: options.FailureConverter, contextPropagators: options.ContextPropagators, enableLoggingInReplay: options.EnableLoggingInReplay, + disableDeadlockDetection: options.DisableDeadlockDetection, workflowExecutionResults: make(map[string]*commonpb.Payloads), }, nil } @@ -1370,6 +1376,9 @@ func (aw *WorkflowReplayer) replayWorkflowHistory(logger log.Logger, service wor SdkMetadata: true, }, } + if aw.disableDeadlockDetection { + params.DeadlockDetectionTimeout = math.MaxInt64 + } taskHandler := newWorkflowTaskHandler(params, nil, aw.registry) wfctx, err := taskHandler.GetOrCreateWorkflowContext(task, iterator) defer wfctx.Unlock(err) diff --git a/test/replaytests/deadlocked-workflow.json b/test/replaytests/deadlocked-workflow.json new file mode 100644 index 000000000..b7a03c789 --- /dev/null +++ b/test/replaytests/deadlocked-workflow.json @@ -0,0 +1,91 @@ +{ + "events": [ + { + "eventId": "1", + "eventTime": "2020-07-30T00:30:02.971655189Z", + "eventType": "WorkflowExecutionStarted", + "version": "-24", + "taskId": "1048576", + "workflowExecutionStartedEventAttributes": { + "workflowType": { + "name": "DeadlockedWorkflow" + }, + "taskQueue": { + "name": "replay-test", + "kind": "Normal" + }, + "input": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "IldvcmtmbG93MSI=" + } + ] + }, + "workflowExecutionTimeout": "315360000s", + "workflowRunTimeout": "315360000s", + "workflowTaskTimeout": "10s", + "initiator": "Workflow", + "originalExecutionRunId": "32c62bbb-dfa3-4558-8bab-11cd5b4e17b7", + "identity": "22866@ShtinUbuntu2@", + "firstExecutionRunId": "32c62bbb-dfa3-4558-8bab-11cd5b4e17b7", + "attempt": 1, + "workflowExecutionExpirationTime": "0001-01-01T00:00:00Z", + "firstWorkflowTaskBackoff": "0s", + "header": {} + } + }, + { + "eventId": "2", + "eventTime": "2020-07-30T00:30:02.971668264Z", + "eventType": "WorkflowTaskScheduled", + "version": "-24", + "taskId": "1048577", + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "replay-test", + "kind": "Normal" + }, + "startToCloseTimeout": "10s", + "attempt": "1" + } + }, + { + "eventId": "3", + "eventTime": "2020-07-30T00:30:02.981403193Z", + "eventType": "WorkflowTaskStarted", + "version": "-24", + "taskId": "1048582", + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "2", + "identity": "22866@ShtinUbuntu2@", + "requestId": "43107987-202a-44ca-b718-4aecc6cd6f3b" + } + }, + { + "eventId": "4", + "eventTime": "2020-07-30T00:30:02.992586820Z", + "eventType": "WorkflowTaskCompleted", + "version": "-24", + "taskId": "1048585", + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "2", + "startedEventId": "3", + "identity": "22866@ShtinUbuntu2@", + "binaryChecksum": "01c85c2da1ff4eb3ef3641a5746edef0" + } + }, + { + "eventId": "5", + "eventTime": "2020-07-30T00:30:03.070438610Z", + "eventType": "WorkflowExecutionCompleted", + "version": "-24", + "taskId": "1048640", + "workflowExecutionCompletedEventAttributes": { + "workflowTaskCompletedEventId": "4" + } + } + ] +} \ No newline at end of file diff --git a/test/replaytests/replay_test.go b/test/replaytests/replay_test.go index fda209f0c..871ed2ad8 100644 --- a/test/replaytests/replay_test.go +++ b/test/replaytests/replay_test.go @@ -34,6 +34,7 @@ import ( "github.com/stretchr/testify/suite" commonpb "go.temporal.io/api/common/v1" "go.temporal.io/api/workflowservicemock/v1" + "go.temporal.io/sdk/client" "go.temporal.io/sdk/converter" "go.temporal.io/sdk/internal" @@ -360,6 +361,23 @@ func TestReplayCustomConverter(t *testing.T) { require.Contains(t, conv.fromPayloads, "Hello Workflow2!") } +func TestReplayDeadlockDetection(t *testing.T) { + defaultReplayer := worker.NewWorkflowReplayer() + noDeadlockReplayer, err := worker.NewWorkflowReplayerWithOptions(worker.WorkflowReplayerOptions{ + DisableDeadlockDetection: true, + }) + require.NoError(t, err) + + defaultReplayer.RegisterWorkflow(DeadlockedWorkflow) + noDeadlockReplayer.RegisterWorkflow(DeadlockedWorkflow) + + err = defaultReplayer.ReplayWorkflowHistoryFromJSONFile(ilog.NewDefaultLogger(), "deadlocked-workflow.json") + require.Error(t, err) + + err = noDeadlockReplayer.ReplayWorkflowHistoryFromJSONFile(ilog.NewDefaultLogger(), "deadlocked-workflow.json") + require.NoError(t, err) +} + func (s *replayTestSuite) TestVersionAndMutableSideEffect() { replayer := worker.NewWorkflowReplayer() replayer.RegisterWorkflow(VersionAndMutableSideEffectWorkflow) diff --git a/test/replaytests/workflows.go b/test/replaytests/workflows.go index b30e6972e..6d7467b50 100644 --- a/test/replaytests/workflows.go +++ b/test/replaytests/workflows.go @@ -31,6 +31,7 @@ import ( "time" "github.com/google/uuid" + "go.temporal.io/sdk/activity" "go.temporal.io/sdk/temporal" "go.temporal.io/sdk/workflow" @@ -221,6 +222,12 @@ func EmptyWorkflow(ctx workflow.Context, _ string) error { return nil } +func DeadlockedWorkflow(ctx workflow.Context, _ string) error { + // Sleep for just over 1 second to trigger deadlock detection + time.Sleep(1100 * time.Millisecond) + return nil +} + func MutableSideEffectWorkflow(ctx workflow.Context) ([]int, error) { f := func(retVal int) (newVal int) { err := workflow.MutableSideEffect(