Skip to content

Commit

Permalink
When evicting workflows from cache make sure all go routines are evic…
Browse files Browse the repository at this point in the history
…ted serially
  • Loading branch information
Quinn-With-Two-Ns committed Nov 26, 2024
1 parent c7fa7e8 commit 33fa708
Showing 1 changed file with 19 additions and 4 deletions.
23 changes: 19 additions & 4 deletions internal/internal_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (

"go.temporal.io/sdk/converter"
"go.temporal.io/sdk/internal/common/metrics"
"go.temporal.io/sdk/log"
)

const (
Expand Down Expand Up @@ -193,6 +194,7 @@ type (
mutex sync.Mutex // used to synchronize executing
closed bool
interceptor WorkflowOutboundInterceptor
logger log.Logger
deadlockDetector *deadlockDetector
readOnly bool
// allBlockedCallback is called when all coroutines are blocked,
Expand Down Expand Up @@ -675,8 +677,11 @@ func (d *syncWorkflowDefinition) Close() {
// Context passed to the root function is child of the passed rootCtx.
// This way rootCtx can be used to pass values to the coroutine code.
func newDispatcher(rootCtx Context, interceptor *workflowEnvironmentInterceptor, root func(ctx Context), allBlockedCallback func() bool) (*dispatcherImpl, Context) {
env := getWorkflowEnvironment(rootCtx)

result := &dispatcherImpl{
interceptor: interceptor.outboundInterceptor,
logger: env.GetLogger(),
deadlockDetector: newDeadlockDetector(),
allBlockedCallback: allBlockedCallback,
}
Expand Down Expand Up @@ -1157,21 +1162,31 @@ func (s *coroutineState) close() {
}

// exit tries to run Goexit on the coroutine and wait for it to exit
// within timeout.
func (s *coroutineState) exit(timeout time.Duration) {
// within timeout. If it doesn't exit within timeout, it will log a warning.
func (s *coroutineState) exit(logger log.Logger, warnTimeout time.Duration) {
if !s.closed.Load() {
s.unblock <- func(status string, stackDepth int) bool {
runtime.Goexit()
return true
}

timer := time.NewTimer(timeout)
timer := time.NewTimer(warnTimeout)
defer timer.Stop()

select {
case <-s.aboutToBlock:
return
case <-timer.C:
st, err := getCoroStackTrace(s, "running", 0)
if err != nil {
st = fmt.Sprintf("<%s>", err)
}

logger.Warn(fmt.Sprintf("Workflow coroutine %q didn't exit within %v", s.name, warnTimeout), "stackTrace", st)
}
// We need to make sure the coroutine is closed, otherwise we risk concurrent coroutines running
// at the same time causing a race condition.
<-s.aboutToBlock
}
}

Expand Down Expand Up @@ -1331,7 +1346,7 @@ func (d *dispatcherImpl) Close() {
// * On exit the coroutines defers will still run and that may block.
go func() {
for _, c := range d.coroutines {
c.exit(defaultCoroutineExitTimeout)
c.exit(d.logger, defaultDeadlockDetectionTimeout)
}
}()
}
Expand Down

0 comments on commit 33fa708

Please sign in to comment.