diff --git a/internal/internal_workflow.go b/internal/internal_workflow.go index 062925509..62349900b 100644 --- a/internal/internal_workflow.go +++ b/internal/internal_workflow.go @@ -47,6 +47,7 @@ import ( "go.temporal.io/sdk/converter" "go.temporal.io/sdk/internal/common/metrics" + "go.temporal.io/sdk/log" ) const ( @@ -193,6 +194,7 @@ type ( mutex sync.Mutex // used to synchronize executing closed bool interceptor WorkflowOutboundInterceptor + logger log.Logger deadlockDetector *deadlockDetector readOnly bool // allBlockedCallback is called when all coroutines are blocked, @@ -675,8 +677,11 @@ func (d *syncWorkflowDefinition) Close() { // Context passed to the root function is child of the passed rootCtx. // This way rootCtx can be used to pass values to the coroutine code. func newDispatcher(rootCtx Context, interceptor *workflowEnvironmentInterceptor, root func(ctx Context), allBlockedCallback func() bool) (*dispatcherImpl, Context) { + env := getWorkflowEnvironment(rootCtx) + result := &dispatcherImpl{ interceptor: interceptor.outboundInterceptor, + logger: env.GetLogger(), deadlockDetector: newDeadlockDetector(), allBlockedCallback: allBlockedCallback, } @@ -1157,21 +1162,31 @@ func (s *coroutineState) close() { } // exit tries to run Goexit on the coroutine and wait for it to exit -// within timeout. -func (s *coroutineState) exit(timeout time.Duration) { +// within timeout. If it doesn't exit within timeout, it will log a warning. +func (s *coroutineState) exit(logger log.Logger, warnTimeout time.Duration) { if !s.closed.Load() { s.unblock <- func(status string, stackDepth int) bool { runtime.Goexit() return true } - timer := time.NewTimer(timeout) + timer := time.NewTimer(warnTimeout) defer timer.Stop() select { case <-s.aboutToBlock: + return case <-timer.C: + st, err := getCoroStackTrace(s, "running", 0) + if err != nil { + st = fmt.Sprintf("<%s>", err) + } + + logger.Warn(fmt.Sprintf("Workflow coroutine %q didn't exit within %v", s.name, warnTimeout), "stackTrace", st) } + // We need to make sure the coroutine is closed, otherwise we risk concurrent coroutines running + // at the same time causing a race condition. + <-s.aboutToBlock } } @@ -1331,7 +1346,7 @@ func (d *dispatcherImpl) Close() { // * On exit the coroutines defers will still run and that may block. go func() { for _, c := range d.coroutines { - c.exit(defaultCoroutineExitTimeout) + c.exit(d.logger, defaultDeadlockDetectionTimeout) } }() }