Skip to content

Commit

Permalink
Add replay option to disable deadlock detection (#1221)
Browse files Browse the repository at this point in the history
  • Loading branch information
jlegrone authored Sep 1, 2023
1 parent 90e7399 commit c514225
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 0 deletions.
9 changes: 9 additions & 0 deletions internal/internal_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -1122,6 +1122,7 @@ type WorkflowReplayer struct {
failureConverter converter.FailureConverter
contextPropagators []ContextPropagator
enableLoggingInReplay bool
disableDeadlockDetection bool
mu sync.Mutex
workflowExecutionResults map[string]*commonpb.Payloads
}
Expand Down Expand Up @@ -1153,6 +1154,10 @@ type WorkflowReplayerOptions struct {
// This is only useful for debugging purpose.
// default: false
EnableLoggingInReplay bool

// Optional: Disable the default 1 second deadlock detection timeout. This option can be used to step through
// workflow code with multiple breakpoints in a debugger.
DisableDeadlockDetection bool
}

// ReplayWorkflowHistoryOptions are options for replaying a workflow.
Expand All @@ -1172,6 +1177,7 @@ func NewWorkflowReplayer(options WorkflowReplayerOptions) (*WorkflowReplayer, er
failureConverter: options.FailureConverter,
contextPropagators: options.ContextPropagators,
enableLoggingInReplay: options.EnableLoggingInReplay,
disableDeadlockDetection: options.DisableDeadlockDetection,
workflowExecutionResults: make(map[string]*commonpb.Payloads),
}, nil
}
Expand Down Expand Up @@ -1370,6 +1376,9 @@ func (aw *WorkflowReplayer) replayWorkflowHistory(logger log.Logger, service wor
SdkMetadata: true,
},
}
if aw.disableDeadlockDetection {
params.DeadlockDetectionTimeout = math.MaxInt64
}
taskHandler := newWorkflowTaskHandler(params, nil, aw.registry)
wfctx, err := taskHandler.GetOrCreateWorkflowContext(task, iterator)
defer wfctx.Unlock(err)
Expand Down
91 changes: 91 additions & 0 deletions test/replaytests/deadlocked-workflow.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
{
"events": [
{
"eventId": "1",
"eventTime": "2020-07-30T00:30:02.971655189Z",
"eventType": "WorkflowExecutionStarted",
"version": "-24",
"taskId": "1048576",
"workflowExecutionStartedEventAttributes": {
"workflowType": {
"name": "DeadlockedWorkflow"
},
"taskQueue": {
"name": "replay-test",
"kind": "Normal"
},
"input": {
"payloads": [
{
"metadata": {
"encoding": "anNvbi9wbGFpbg=="
},
"data": "IldvcmtmbG93MSI="
}
]
},
"workflowExecutionTimeout": "315360000s",
"workflowRunTimeout": "315360000s",
"workflowTaskTimeout": "10s",
"initiator": "Workflow",
"originalExecutionRunId": "32c62bbb-dfa3-4558-8bab-11cd5b4e17b7",
"identity": "22866@ShtinUbuntu2@",
"firstExecutionRunId": "32c62bbb-dfa3-4558-8bab-11cd5b4e17b7",
"attempt": 1,
"workflowExecutionExpirationTime": "0001-01-01T00:00:00Z",
"firstWorkflowTaskBackoff": "0s",
"header": {}
}
},
{
"eventId": "2",
"eventTime": "2020-07-30T00:30:02.971668264Z",
"eventType": "WorkflowTaskScheduled",
"version": "-24",
"taskId": "1048577",
"workflowTaskScheduledEventAttributes": {
"taskQueue": {
"name": "replay-test",
"kind": "Normal"
},
"startToCloseTimeout": "10s",
"attempt": "1"
}
},
{
"eventId": "3",
"eventTime": "2020-07-30T00:30:02.981403193Z",
"eventType": "WorkflowTaskStarted",
"version": "-24",
"taskId": "1048582",
"workflowTaskStartedEventAttributes": {
"scheduledEventId": "2",
"identity": "22866@ShtinUbuntu2@",
"requestId": "43107987-202a-44ca-b718-4aecc6cd6f3b"
}
},
{
"eventId": "4",
"eventTime": "2020-07-30T00:30:02.992586820Z",
"eventType": "WorkflowTaskCompleted",
"version": "-24",
"taskId": "1048585",
"workflowTaskCompletedEventAttributes": {
"scheduledEventId": "2",
"startedEventId": "3",
"identity": "22866@ShtinUbuntu2@",
"binaryChecksum": "01c85c2da1ff4eb3ef3641a5746edef0"
}
},
{
"eventId": "5",
"eventTime": "2020-07-30T00:30:03.070438610Z",
"eventType": "WorkflowExecutionCompleted",
"version": "-24",
"taskId": "1048640",
"workflowExecutionCompletedEventAttributes": {
"workflowTaskCompletedEventId": "4"
}
}
]
}
18 changes: 18 additions & 0 deletions test/replaytests/replay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/stretchr/testify/suite"
commonpb "go.temporal.io/api/common/v1"
"go.temporal.io/api/workflowservicemock/v1"

"go.temporal.io/sdk/client"
"go.temporal.io/sdk/converter"
"go.temporal.io/sdk/internal"
Expand Down Expand Up @@ -360,6 +361,23 @@ func TestReplayCustomConverter(t *testing.T) {
require.Contains(t, conv.fromPayloads, "Hello Workflow2!")
}

func TestReplayDeadlockDetection(t *testing.T) {
defaultReplayer := worker.NewWorkflowReplayer()
noDeadlockReplayer, err := worker.NewWorkflowReplayerWithOptions(worker.WorkflowReplayerOptions{
DisableDeadlockDetection: true,
})
require.NoError(t, err)

defaultReplayer.RegisterWorkflow(DeadlockedWorkflow)
noDeadlockReplayer.RegisterWorkflow(DeadlockedWorkflow)

err = defaultReplayer.ReplayWorkflowHistoryFromJSONFile(ilog.NewDefaultLogger(), "deadlocked-workflow.json")
require.Error(t, err)

err = noDeadlockReplayer.ReplayWorkflowHistoryFromJSONFile(ilog.NewDefaultLogger(), "deadlocked-workflow.json")
require.NoError(t, err)
}

func (s *replayTestSuite) TestVersionAndMutableSideEffect() {
replayer := worker.NewWorkflowReplayer()
replayer.RegisterWorkflow(VersionAndMutableSideEffectWorkflow)
Expand Down
7 changes: 7 additions & 0 deletions test/replaytests/workflows.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"time"

"github.com/google/uuid"

"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/workflow"
Expand Down Expand Up @@ -221,6 +222,12 @@ func EmptyWorkflow(ctx workflow.Context, _ string) error {
return nil
}

func DeadlockedWorkflow(ctx workflow.Context, _ string) error {
// Sleep for just over 1 second to trigger deadlock detection
time.Sleep(1100 * time.Millisecond)
return nil
}

func MutableSideEffectWorkflow(ctx workflow.Context) ([]int, error) {
f := func(retVal int) (newVal int) {
err := workflow.MutableSideEffect(
Expand Down

0 comments on commit c514225

Please sign in to comment.