Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix lost signal from Selector when Default path blocks #1682

Merged
merged 11 commits into from
Nov 18, 2024
249 changes: 249 additions & 0 deletions internal/internal_coroutines_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/sdk/converter"
)

Expand Down Expand Up @@ -551,6 +552,254 @@ func TestBlockingSelect(t *testing.T) {
require.EqualValues(t, expected, history)
}

func TestSelectBlockingDefault(t *testing.T) {
yuandrew marked this conversation as resolved.
Show resolved Hide resolved
var history []string
env := &workflowEnvironmentImpl{
sdkFlags: newSDKFlags(&workflowservice.GetSystemInfoResponse_Capabilities{SdkMetadata: true}),
commandsHelper: newCommandsHelper(),
dataConverter: converter.GetDefaultDataConverter(),
workflowInfo: &WorkflowInfo{
Namespace: "namespace:" + t.Name(),
TaskQueueName: "taskqueue:" + t.Name(),
},
}
// Verify that the flag is not set
require.False(t, env.GetFlag(SDKFlagBlockedSelectorSignalReceive))
interceptor, ctx, err := newWorkflowContext(env, nil)
require.NoError(t, err, "newWorkflowContext failed")
d, _ := newDispatcher(ctx, interceptor, func(ctx Context) {
c1 := NewChannel(ctx)
c2 := NewChannel(ctx)

Go(ctx, func(ctx Context) {
history = append(history, "add-one")
c1.Send(ctx, "one")
history = append(history, "add-one-done")

})

Go(ctx, func(ctx Context) {
history = append(history, "add-two")
c2.Send(ctx, "two")
history = append(history, "add-two-done")
})

selector := NewSelector(ctx)
var v string
selector.
AddReceive(c1, func(c ReceiveChannel, more bool) {
c.Receive(ctx, &v)
history = append(history, fmt.Sprintf("c1-%v", v))
}).
AddDefault(func() {
c2.Receive(ctx, &v)
history = append(history, fmt.Sprintf("c2-%v", v))
})
history = append(history, "select1")
selector.Select(ctx)

// Default behavior this signal is lost
require.True(t, c1.Len() == 0 && v == "two")

history = append(history, "select2")
selector.Select(ctx)
history = append(history, "done")
}, func() bool { return false })
defer d.Close()
requireNoExecuteErr(t, d.ExecuteUntilAllBlocked(defaultDeadlockDetectionTimeout))
require.False(t, d.IsDone())

expected := []string{
"select1",
"add-one",
"add-one-done",
"add-two",
"add-two-done",
"c2-two",
"select2",
}
require.EqualValues(t, expected, history)
}

func TestSelectBlockingDefaultWithFlag(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reported bug was that blocking in the default case of a selector could cause signals to be lost, when I last looked at these tests we didn't seem to have any coverage for blocking in one selector case while a signal is received. Can we add tests to verify their is no bugs if a signal is received while blocking in another case of a selector, not just default?

var history []string
env := &workflowEnvironmentImpl{
sdkFlags: newSDKFlags(&workflowservice.GetSystemInfoResponse_Capabilities{SdkMetadata: true}),
commandsHelper: newCommandsHelper(),
dataConverter: converter.GetDefaultDataConverter(),
workflowInfo: &WorkflowInfo{
Namespace: "namespace:" + t.Name(),
TaskQueueName: "taskqueue:" + t.Name(),
},
}
require.True(t, env.TryUse(SDKFlagBlockedSelectorSignalReceive))
interceptor, ctx, err := newWorkflowContext(env, nil)
require.NoError(t, err, "newWorkflowContext failed")
d, _ := newDispatcher(ctx, interceptor, func(ctx Context) {
c1 := NewChannel(ctx)
c2 := NewChannel(ctx)

Go(ctx, func(ctx Context) {
history = append(history, "add-one")
c1.Send(ctx, "one")
history = append(history, "add-one-done")

})

Go(ctx, func(ctx Context) {
history = append(history, "add-two")
c2.Send(ctx, "two")
history = append(history, "add-two-done")
})

selector := NewSelector(ctx)
var v string
selector.
AddReceive(c1, func(c ReceiveChannel, more bool) {
c.Receive(ctx, &v)
history = append(history, fmt.Sprintf("c1-%v", v))
}).
AddDefault(func() {
c2.Receive(ctx, &v)
history = append(history, fmt.Sprintf("c2-%v", v))
})
history = append(history, "select1")
selector.Select(ctx)

// Signal should not be lost
require.False(t, c1.Len() == 0 && v == "two")

history = append(history, "select2")
selector.Select(ctx)
history = append(history, "done")
}, func() bool { return false })
defer d.Close()
requireNoExecuteErr(t, d.ExecuteUntilAllBlocked(defaultDeadlockDetectionTimeout))
require.True(t, d.IsDone())

expected := []string{
"select1",
"add-one",
"add-one-done",
"add-two",
"add-two-done",
"c2-two",
"select2",
"c1-one",
"done",
}

require.EqualValues(t, expected, history)
}

func TestBlockingSelectFuture(t *testing.T) {
var history []string
d := createNewDispatcher(func(ctx Context) {
c1 := NewChannel(ctx)
f1, s1 := NewFuture(ctx)

Go(ctx, func(ctx Context) {
history = append(history, "add-one")
c1.Send(ctx, "one")
history = append(history, "add-one-done")
})
Go(ctx, func(ctx Context) {
history = append(history, "add-two")
s1.SetValue("one-future")
})

selector := NewSelector(ctx)
selector.
AddReceive(c1, func(c ReceiveChannel, more bool) {
var v string
c.Receive(ctx, &v)
history = append(history, fmt.Sprintf("c1-%v", v))
}).
AddFuture(f1, func(f Future) {
var v string
err := f.Get(ctx, &v)
require.NoError(t, err)
history = append(history, fmt.Sprintf("f1-%v", v))
})
history = append(history, "select1")
selector.Select(ctx)
fmt.Println("select1 done", history)

history = append(history, "select2")
selector.Select(ctx)
history = append(history, "done")

})
defer d.Close()
requireNoExecuteErr(t, d.ExecuteUntilAllBlocked(defaultDeadlockDetectionTimeout))
require.True(t, d.IsDone(), strings.Join(history, "\n"))
expected := []string{
"select1",
"add-one",
"add-one-done",
"add-two",
"c1-one",
"select2",
"f1-one-future",
"done",
}
require.EqualValues(t, expected, history)
}

func TestBlockingSelectSend(t *testing.T) {
var history []string
d := createNewDispatcher(func(ctx Context) {
c1 := NewChannel(ctx)
c2 := NewChannel(ctx)

Go(ctx, func(ctx Context) {
history = append(history, "add-one")
c1.Send(ctx, "one")
history = append(history, "add-one-done")
})
Go(ctx, func(ctx Context) {
require.True(t, c2.Len() == 1)
history = append(history, "receiver")
var v string
more := c2.Receive(ctx, &v)
require.True(t, more)
history = append(history, fmt.Sprintf("c2-%v", v))
require.True(t, c2.Len() == 0)
})

selector := NewSelector(ctx)
selector.
AddReceive(c1, func(c ReceiveChannel, more bool) {
var v string
c.Receive(ctx, &v)
history = append(history, fmt.Sprintf("c1-%v", v))
}).
AddSend(c2, "two", func() { history = append(history, "send2") })
history = append(history, "select1")
selector.Select(ctx)

history = append(history, "select2")
selector.Select(ctx)
history = append(history, "done")

})
defer d.Close()
requireNoExecuteErr(t, d.ExecuteUntilAllBlocked(defaultDeadlockDetectionTimeout))
require.True(t, d.IsDone(), strings.Join(history, "\n"))
expected := []string{
"select1",
"add-one",
"add-one-done",
"receiver",
"c1-one",
"select2",
"send2",
"done",
"c2-two",
}
require.EqualValues(t, expected, history)
}

func TestBlockingSelectAsyncSend(t *testing.T) {
var history []string
d := createNewDispatcher(func(ctx Context) {
Expand Down
4 changes: 4 additions & 0 deletions internal/internal_event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -978,6 +978,10 @@ func (wc *workflowEnvironmentImpl) TryUse(flag sdkFlag) bool {
return wc.sdkFlags.tryUse(flag, !wc.isReplay)
}

func (wc *workflowEnvironmentImpl) GetFlag(flag sdkFlag) bool {
return wc.sdkFlags.getFlag(flag)
}

func (wc *workflowEnvironmentImpl) QueueUpdate(name string, f func()) {
wc.bufferedUpdateRequests[name] = append(wc.bufferedUpdateRequests[name], f)
}
Expand Down
20 changes: 19 additions & 1 deletion internal/internal_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,14 @@ const (
// SDKPriorityUpdateHandling will cause update request to be handled before the main workflow method.
// It will also cause the SDK to immediately handle updates when a handler is registered.
SDKPriorityUpdateHandling = 4
SDKFlagUnknown = math.MaxUint32
// SDKFlagBlockedSelectorSignalReceive will cause a signal to not be lost
// when the Default path is blocked.
SDKFlagBlockedSelectorSignalReceive = 5
SDKFlagUnknown = math.MaxUint32
)

var unblockSelectorSignal bool

func sdkFlagFromUint(value uint32) sdkFlag {
switch value {
case uint32(SDKFlagUnset):
Expand All @@ -62,6 +67,8 @@ func sdkFlagFromUint(value uint32) sdkFlag {
return SDKFlagProtocolMessageCommand
case uint32(SDKPriorityUpdateHandling):
return SDKPriorityUpdateHandling
case uint32(SDKFlagBlockedSelectorSignalReceive):
return SDKFlagBlockedSelectorSignalReceive
default:
return SDKFlagUnknown
}
Expand Down Expand Up @@ -105,6 +112,11 @@ func (sf *sdkFlags) tryUse(flag sdkFlag, record bool) bool {
}
}

// getFlag returns true if the flag is currently set.
func (sf *sdkFlags) getFlag(flag sdkFlag) bool {
return sf.currentFlags[flag] || sf.newFlags[flag]
}

// set marks a flag as in current use regardless of replay status.
func (sf *sdkFlags) set(flags ...sdkFlag) {
if !sf.capabilities.GetSdkMetadata() {
Expand All @@ -131,3 +143,9 @@ func (sf *sdkFlags) gatherNewSDKFlags() []sdkFlag {
}
return flags
}

// SetUnblockSelectorSignal toggles the flag to unblock the selector signal.
// For test use only,
func SetUnblockSelectorSignal(unblockSignal bool) {
unblockSelectorSignal = unblockSignal
}
2 changes: 2 additions & 0 deletions internal/internal_worker_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ type (
DrainUnhandledUpdates() bool
// TryUse returns true if this flag may currently be used.
TryUse(flag sdkFlag) bool
// GetFlag returns if the flag is currently used.
GetFlag(flag sdkFlag) bool
}

// WorkflowDefinitionFactory factory for creating WorkflowDefinition instances.
Expand Down
18 changes: 17 additions & 1 deletion internal/internal_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -1407,8 +1407,24 @@ func (s *selectorImpl) Select(ctx Context) {
if readyBranch != nil {
return false
}
readyBranch = func() {
// readyBranch is not executed when AddDefault is specified,
// setting the value here prevents the signal from being dropped
env := getWorkflowEnvironment(ctx)
var dropSignalFlag bool
if unblockSelectorSignal {
dropSignalFlag = env.TryUse(SDKFlagBlockedSelectorSignalReceive)
} else {
dropSignalFlag = env.GetFlag(SDKFlagBlockedSelectorSignalReceive)
}

if dropSignalFlag {
c.recValue = &v
}

readyBranch = func() {
if !dropSignalFlag {
c.recValue = &v
}
f(c, more)
}
return true
Expand Down
9 changes: 8 additions & 1 deletion internal/internal_workflow_testsuite.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,8 @@ type (

workflowFunctionExecuting bool
bufferedUpdateRequests map[string][]func()

sdkFlags *sdkFlags
}

testSessionEnvironmentImpl struct {
Expand Down Expand Up @@ -313,6 +315,7 @@ func newTestWorkflowEnvironmentImpl(s *WorkflowTestSuite, parentRegistry *regist
failureConverter: GetDefaultFailureConverter(),
runTimeout: maxWorkflowTimeout,
bufferedUpdateRequests: make(map[string][]func()),
sdkFlags: newSDKFlags(&workflowservice.GetSystemInfoResponse_Capabilities{SdkMetadata: true}),
}

if debugMode {
Expand Down Expand Up @@ -605,7 +608,11 @@ func (env *testWorkflowEnvironmentImpl) getWorkflowDefinition(wt WorkflowType) (
}

func (env *testWorkflowEnvironmentImpl) TryUse(flag sdkFlag) bool {
return true
return env.sdkFlags.tryUse(flag, true)
}

func (env *testWorkflowEnvironmentImpl) GetFlag(flag sdkFlag) bool {
return env.sdkFlags.getFlag(flag)
}

func (env *testWorkflowEnvironmentImpl) QueueUpdate(name string, f func()) {
Expand Down
Loading
Loading