Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix lost signal from Selector when Default path blocks #1682

Merged
merged 11 commits into from
Nov 18, 2024
132 changes: 132 additions & 0 deletions internal/internal_coroutines_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,138 @@ func TestBlockingSelect(t *testing.T) {
require.EqualValues(t, expected, history)
}

func TestSelectBlockingDefault(t *testing.T) {
yuandrew marked this conversation as resolved.
Show resolved Hide resolved
// manually create a dispatcher to ensure sdkFlags are not set
var history []string
env := &workflowEnvironmentImpl{
sdkFlags: &sdkFlags{},
commandsHelper: newCommandsHelper(),
dataConverter: converter.GetDefaultDataConverter(),
workflowInfo: &WorkflowInfo{
Namespace: "namespace:" + t.Name(),
TaskQueueName: "taskqueue:" + t.Name(),
},
}
interceptor, ctx, err := newWorkflowContext(env, nil)
require.NoError(t, err, "newWorkflowContext failed")
d, _ := newDispatcher(ctx, interceptor, func(ctx Context) {
c1 := NewChannel(ctx)
c2 := NewChannel(ctx)

Go(ctx, func(ctx Context) {
history = append(history, "add-one")
c1.Send(ctx, "one")
history = append(history, "add-one-done")

})

Go(ctx, func(ctx Context) {
history = append(history, "add-two")
c2.Send(ctx, "two")
history = append(history, "add-two-done")
})

selector := NewSelector(ctx)
var v string
selector.
AddReceive(c1, func(c ReceiveChannel, more bool) {
c.Receive(ctx, &v)
history = append(history, fmt.Sprintf("c1-%v", v))
}).
AddDefault(func() {
c2.Receive(ctx, &v)
history = append(history, fmt.Sprintf("c2-%v", v))
})
history = append(history, "select1")
require.False(t, selector.HasPending())
selector.Select(ctx)

// Default behavior this signal is lost
require.True(t, c1.Len() == 0 && v == "two")

history = append(history, "select2")
require.False(t, selector.HasPending())
history = append(history, "done")
}, func() bool { return false })
defer d.Close()
requireNoExecuteErr(t, d.ExecuteUntilAllBlocked(defaultDeadlockDetectionTimeout))
require.True(t, d.IsDone())

expected := []string{
"select1",
"add-one",
"add-one-done",
"add-two",
"add-two-done",
"c2-two",
"select2",
"done",
}
require.EqualValues(t, expected, history)
}

func TestSelectBlockingDefaultWithFlag(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reported bug was that blocking in the default case of a selector could cause signals to be lost, when I last looked at these tests we didn't seem to have any coverage for blocking in one selector case while a signal is received. Can we add tests to verify their is no bugs if a signal is received while blocking in another case of a selector, not just default?

// sdkFlags are set by default for tests
var history []string
d := createNewDispatcher(func(ctx Context) {
c1 := NewChannel(ctx)
c2 := NewChannel(ctx)

Go(ctx, func(ctx Context) {
history = append(history, "add-one")
c1.Send(ctx, "one")
history = append(history, "add-one-done")

})

Go(ctx, func(ctx Context) {
history = append(history, "add-two")
c2.Send(ctx, "two")
history = append(history, "add-two-done")
})

selector := NewSelector(ctx)
var v string
selector.
AddReceive(c1, func(c ReceiveChannel, more bool) {
c.Receive(ctx, &v)
history = append(history, fmt.Sprintf("c1-%v", v))
}).
AddDefault(func() {
c2.Receive(ctx, &v)
history = append(history, fmt.Sprintf("c2-%v", v))
})
history = append(history, "select1")
require.False(t, selector.HasPending())
selector.Select(ctx)

// Signal should not be lost
require.False(t, c1.Len() == 0 && v == "two")

history = append(history, "select2")
require.True(t, selector.HasPending())
selector.Select(ctx)
require.False(t, selector.HasPending())
history = append(history, "done")
})
defer d.Close()
requireNoExecuteErr(t, d.ExecuteUntilAllBlocked(defaultDeadlockDetectionTimeout))
require.True(t, d.IsDone())

expected := []string{
"select1",
"add-one",
"add-one-done",
"add-two",
"add-two-done",
"c2-two",
"select2",
"c1-one",
"done",
}
require.EqualValues(t, expected, history)
}

func TestBlockingSelectAsyncSend(t *testing.T) {
var history []string
d := createNewDispatcher(func(ctx Context) {
Expand Down
7 changes: 6 additions & 1 deletion internal/internal_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ const (
// SDKPriorityUpdateHandling will cause update request to be handled before the main workflow method.
// It will also cause the SDK to immediately handle updates when a handler is registered.
SDKPriorityUpdateHandling = 4
SDKFlagUnknown = math.MaxUint32
// SDKFlagBlockedSelectorSignalReceive will cause a signal to not be lost
// when the Default path is blocked.
SDKFlagBlockedSelectorSignalReceive = 5
SDKFlagUnknown = math.MaxUint32
)

func sdkFlagFromUint(value uint32) sdkFlag {
Expand All @@ -62,6 +65,8 @@ func sdkFlagFromUint(value uint32) sdkFlag {
return SDKFlagProtocolMessageCommand
case uint32(SDKPriorityUpdateHandling):
return SDKPriorityUpdateHandling
case uint32(SDKFlagBlockedSelectorSignalReceive):
return SDKFlagBlockedSelectorSignalReceive
default:
return SDKFlagUnknown
}
Expand Down
11 changes: 10 additions & 1 deletion internal/internal_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -1407,8 +1407,17 @@ func (s *selectorImpl) Select(ctx Context) {
if readyBranch != nil {
return false
}
readyBranch = func() {
// readyBranch is not executed when AddDefault is specified,
// setting the value here prevents the signal from being dropped
dropSignalFlag := getWorkflowEnvironment(ctx).TryUse(SDKFlagBlockedSelectorSignalReceive)
yuandrew marked this conversation as resolved.
Show resolved Hide resolved
if dropSignalFlag {
c.recValue = &v
}

readyBranch = func() {
if !dropSignalFlag {
c.recValue = &v
}
f(c, more)
}
return true
Expand Down
36 changes: 36 additions & 0 deletions internal/internal_workflow_testsuite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4240,3 +4240,39 @@ func (s *WorkflowTestSuiteUnitTest) Test_SameWorkflowAndActivityNames() {
s.Require().True(env.IsWorkflowCompleted())
s.Require().NoError(env.GetWorkflowError())
}

func (s *WorkflowTestSuiteUnitTest) Test_SignalLoss() {
workflowFn := func(ctx Context) error {
ch1 := GetSignalChannel(ctx, "test-signal")
ch2 := GetSignalChannel(ctx, "test-signal-2")
selector := NewSelector(ctx)
var v string
selector.AddReceive(ch1, func(c ReceiveChannel, more bool) {
c.Receive(ctx, &v)
fmt.Println("received signal from ch1")
})
selector.AddDefault(func() {
ch2.Receive(ctx, &v)
fmt.Println("received signal from ch2")
})
selector.Select(ctx)
fmt.Println("ch1.Len()", ch1.Len(), "s", v)
// testWorkflowEnvironmentImpl.TryUse always returns true for flags
// test for fixed behavior
s.Require().True(ch1.Len() == 1 && v == "s2")

return nil
}

// send a signal 5 seconds after workflow started
env := s.NewTestWorkflowEnvironment()
env.RegisterDelayedCallback(func() {
fmt.Println("sending signal to 1")
env.SignalWorkflow("test-signal", "s1")
fmt.Println("sending signal to 2")
env.SignalWorkflow("test-signal-2", "s2")
}, 5*time.Second)
env.ExecuteWorkflow(workflowFn)
s.True(env.IsWorkflowCompleted())
s.NoError(env.GetWorkflowError())
}
10 changes: 10 additions & 0 deletions test/replaytests/replay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,16 @@ func (s *replayTestSuite) TestGogoprotoPayloadWorkflow() {
s.NoError(err)
}

func (s *replayTestSuite) TestSelectorBlockingDefault() {
replayer := worker.NewWorkflowReplayer()
replayer.RegisterWorkflow(SelectorBlockingDefaultWorkflow)
// Verify we can still replay an old workflow that does
// not have the SDKFlagBlockedSelectorSignalReceive flag
err := replayer.ReplayWorkflowHistoryFromJSONFile(ilog.NewDefaultLogger(), "selector-blocking-default.json")
s.NoError(err)
require.NoError(s.T(), err)
}

type captureConverter struct {
converter.DataConverter
toPayloads []interface{}
Expand Down
89 changes: 89 additions & 0 deletions test/replaytests/selector-blocking-default.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
{
"events": [
{
"eventId": "1",
"eventTime": "2024-10-21T23:39:08.991521Z",
"eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_STARTED",
"taskId": "1048587",
"workflowExecutionStartedEventAttributes": {
"workflowType": {
"name": "SelectorBlockingDefaultWorkflow"
},
"taskQueue": {
"name": "hello-world",
"kind": "TASK_QUEUE_KIND_NORMAL"
},
"workflowExecutionTimeout": "0s",
"workflowRunTimeout": "0s",
"workflowTaskTimeout": "10s",
"originalExecutionRunId": "dde5f879-0e59-47e8-8048-ac0f164866fd",
"identity": "[email protected]@",
"firstExecutionRunId": "dde5f879-0e59-47e8-8048-ac0f164866fd",
"attempt": 1,
"firstWorkflowTaskBackoff": "0s",
"header": {},
"workflowId": "hello_world_workflowID"
}
},
{
"eventId": "2",
"eventTime": "2024-10-21T23:39:08.991569Z",
"eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED",
"taskId": "1048588",
"workflowTaskScheduledEventAttributes": {
"taskQueue": {
"name": "hello-world",
"kind": "TASK_QUEUE_KIND_NORMAL"
},
"startToCloseTimeout": "10s",
"attempt": 1
}
},
{
"eventId": "3",
"eventTime": "2024-10-21T23:39:08.994898Z",
"eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED",
"taskId": "1048593",
"workflowTaskStartedEventAttributes": {
"scheduledEventId": "2",
"identity": "[email protected]@",
"requestId": "a7a50c99-1d0d-449c-9d75-09458ac1e7af",
"historySizeBytes": "282",
"workerVersion": {
"buildId": "e15e79cbae5f5acc33774a930eed2f97"
}
}
},
{
"eventId": "4",
"eventTime": "2024-10-21T23:39:08.999006Z",
"eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED",
"taskId": "1048597",
"workflowTaskCompletedEventAttributes": {
"scheduledEventId": "2",
"startedEventId": "3",
"identity": "[email protected]@",
"workerVersion": {
"buildId": "e15e79cbae5f5acc33774a930eed2f97"
},
"sdkMetadata": {
"langUsedFlags": [
3
],
"sdkName": "temporal-go",
"sdkVersion": "1.29.1"
},
"meteringMetadata": {}
}
},
{
"eventId": "5",
"eventTime": "2024-10-21T23:39:08.999055Z",
"eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED",
"taskId": "1048598",
"workflowExecutionCompletedEventAttributes": {
"workflowTaskCompletedEventId": "4"
}
}
]
}
46 changes: 46 additions & 0 deletions test/replaytests/workflows.go
Original file line number Diff line number Diff line change
Expand Up @@ -610,3 +610,49 @@ func ListAndDescribeWorkflow(ctx workflow.Context) (int, error) {
}
return len(result.Executions), nil
}

func SelectorBlockingDefaultWorkflow(ctx workflow.Context) error {
logger := workflow.GetLogger(ctx)
ao := workflow.ActivityOptions{
StartToCloseTimeout: 10 * time.Second,
}
ctx = workflow.WithActivityOptions(ctx, ao)

ch1 := workflow.NewChannel(ctx)
ch2 := workflow.NewChannel(ctx)

workflow.Go(ctx, func(ctx workflow.Context) {
ch1.Send(ctx, "one")

})

workflow.Go(ctx, func(ctx workflow.Context) {
ch2.Send(ctx, "two")
})

selector := workflow.NewSelector(ctx)
var s string
selector.AddReceive(ch1, func(c workflow.ReceiveChannel, more bool) {
c.Receive(ctx, &s)
})
selector.AddDefault(func() {
ch2.Receive(ctx, &s)
})
selector.Select(ctx)
yuandrew marked this conversation as resolved.
Show resolved Hide resolved
if ch1.Len() == 0 && s == "two" {
logger.Info("Signal in ch1 lost")
return nil
} else {
var result string
activity := workflow.ExecuteActivity(ctx, SelectorBlockingDefaultActivity, "Signal not lost")
activity.Get(ctx, &result)
logger.Info("Result", result)
}
return nil
}

func SelectorBlockingDefaultActivity(ctx context.Context, value string) (string, error) {
logger := activity.GetLogger(ctx)
logger.Info("Activity", "value", value)
return value + " was logged!", nil
}
Loading