From 6aafe2e2cce1f78028f85b3efac0b16a5f549432 Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Sat, 23 Nov 2024 15:36:59 -0800 Subject: [PATCH] Fix worker leak in eager dispatcher --- internal/client.go | 2 +- internal/internal_eager_workflow.go | 23 ++++++++++++++++++----- internal/internal_eager_workflow_test.go | 13 +++++++------ internal/internal_worker.go | 8 ++++++++ internal/internal_workers_test.go | 3 +++ internal/internal_workflow_client_test.go | 8 ++++---- 6 files changed, 41 insertions(+), 16 deletions(-) diff --git a/internal/client.go b/internal/client.go index 867c1c3d9..a4457bb46 100644 --- a/internal/client.go +++ b/internal/client.go @@ -993,7 +993,7 @@ func NewServiceClient(workflowServiceClient workflowservice.WorkflowServiceClien workerInterceptors: workerInterceptors, excludeInternalFromRetry: options.ConnectionOptions.excludeInternalFromRetry, eagerDispatcher: &eagerWorkflowDispatcher{ - workersByTaskQueue: make(map[string][]eagerWorker), + workersByTaskQueue: make(map[string]map[string]eagerWorker), }, } diff --git a/internal/internal_eager_workflow.go b/internal/internal_eager_workflow.go index 9b8eac029..e4cedb1f1 100644 --- a/internal/internal_eager_workflow.go +++ b/internal/internal_eager_workflow.go @@ -33,14 +33,25 @@ import ( // eagerWorkflowDispatcher is responsible for finding an available worker for an eager workflow task. 
type eagerWorkflowDispatcher struct { lock sync.RWMutex - workersByTaskQueue map[string][]eagerWorker + workersByTaskQueue map[string]map[string]eagerWorker } // registerWorker registers a worker that can be used for eager workflow dispatch func (e *eagerWorkflowDispatcher) registerWorker(worker *workflowWorker) { e.lock.Lock() defer e.lock.Unlock() - e.workersByTaskQueue[worker.executionParameters.TaskQueue] = append(e.workersByTaskQueue[worker.executionParameters.TaskQueue], worker.worker) + taskQueue := worker.executionParameters.TaskQueue + if e.workersByTaskQueue[taskQueue] == nil { + e.workersByTaskQueue[taskQueue] = make(map[string]eagerWorker) + } + e.workersByTaskQueue[taskQueue][worker.id] = worker.worker +} + +// deregisterWorker deregisters a worker so that it will not be used for eager workflow dispatch +func (e *eagerWorkflowDispatcher) deregisterWorker(worker *workflowWorker) { + e.lock.Lock() + defer e.lock.Unlock() + delete(e.workersByTaskQueue[worker.executionParameters.TaskQueue], worker.id) } // applyToRequest updates request if eager workflow dispatch is possible and returns the eagerWorkflowExecutor to use @@ -48,9 +59,11 @@ func (e *eagerWorkflowDispatcher) applyToRequest(request *workflowservice.StartW // Try every worker that is assigned to the desired task queue. e.lock.RLock() workers := e.workersByTaskQueue[request.GetTaskQueue().Name] - randWorkers := make([]eagerWorker, len(workers)) - // Copy the slice so we can release the lock. - copy(randWorkers, workers) + randWorkers := make([]eagerWorker, 0, len(workers)) + // Copy the workers so we can release the lock. 
+ for _, worker := range workers { + randWorkers = append(randWorkers, worker) + } e.lock.RUnlock() rand.Shuffle(len(randWorkers), func(i, j int) { randWorkers[i], randWorkers[j] = randWorkers[j], randWorkers[i] }) for _, worker := range randWorkers { diff --git a/internal/internal_eager_workflow_test.go b/internal/internal_eager_workflow_test.go index 294a29c16..b239fb81d 100644 --- a/internal/internal_eager_workflow_test.go +++ b/internal/internal_eager_workflow_test.go @@ -50,10 +50,11 @@ func (e *eagerWorkerMock) pushEagerTask(task eagerTask) { func TestEagerWorkflowDispatchNoWorkerOnTaskQueue(t *testing.T) { dispatcher := &eagerWorkflowDispatcher{ - workersByTaskQueue: make(map[string][]eagerWorker), + workersByTaskQueue: make(map[string]map[string]eagerWorker), } dispatcher.registerWorker(&workflowWorker{ executionParameters: workerExecutionParameters{TaskQueue: "bad-task-queue"}, + id: "worker-id", }) request := &workflowservice.StartWorkflowExecutionRequest{ @@ -66,20 +67,20 @@ func TestEagerWorkflowDispatchNoWorkerOnTaskQueue(t *testing.T) { func TestEagerWorkflowDispatchAvailableWorker(t *testing.T) { dispatcher := &eagerWorkflowDispatcher{ - workersByTaskQueue: make(map[string][]eagerWorker), + workersByTaskQueue: make(map[string]map[string]eagerWorker), } availableWorker := &eagerWorkerMock{ tryReserveSlotCallback: func() *SlotPermit { return &SlotPermit{} }, } - dispatcher.workersByTaskQueue["task-queue"] = []eagerWorker{ - &eagerWorkerMock{ + dispatcher.workersByTaskQueue["task-queue"] = map[string]eagerWorker{ + "1": &eagerWorkerMock{ tryReserveSlotCallback: func() *SlotPermit { return nil }, }, - &eagerWorkerMock{ + "2": &eagerWorkerMock{ tryReserveSlotCallback: func() *SlotPermit { return nil }, }, - availableWorker, + "3": availableWorker, } request := &workflowservice.StartWorkflowExecutionRequest{ diff --git a/internal/internal_worker.go b/internal/internal_worker.go index 23beb0c59..cbcd95dc4 100644 --- a/internal/internal_worker.go +++ 
b/internal/internal_worker.go @@ -102,6 +102,7 @@ type ( localActivityWorker *baseWorker identity string stopC chan struct{} + id string // Unique identifier of this worker generated by the SDK. } // ActivityWorker wraps the code for hosting activity types. @@ -377,6 +378,7 @@ func newWorkflowTaskWorkerInternal( localActivityWorker: localActivityWorker, identity: params.Identity, stopC: stopC, + id: uuid.New(), } } @@ -1050,6 +1052,9 @@ func (aw *AggregatedWorker) start() error { // stop workflow worker. if !util.IsInterfaceNil(aw.workflowWorker) { if aw.workflowWorker.worker.isWorkerStarted { + if aw.client.eagerDispatcher != nil { + aw.client.eagerDispatcher.deregisterWorker(aw.workflowWorker) + } aw.workflowWorker.Stop() } } @@ -1218,6 +1223,9 @@ func (aw *AggregatedWorker) Stop() { } if !util.IsInterfaceNil(aw.workflowWorker) { + if aw.client.eagerDispatcher != nil { + aw.client.eagerDispatcher.deregisterWorker(aw.workflowWorker) + } aw.workflowWorker.Stop() } if !util.IsInterfaceNil(aw.activityWorker) { diff --git a/internal/internal_workers_test.go b/internal/internal_workers_test.go index 5622e0ca3..11159cf18 100644 --- a/internal/internal_workers_test.go +++ b/internal/internal_workers_test.go @@ -117,6 +117,7 @@ func (s *WorkersTestSuite) TestWorkflowWorker() { workflowWorker.Stop() s.NoError(ctx.Err()) + } type CountingSlotSupplier struct { @@ -736,6 +737,8 @@ func (s *WorkersTestSuite) TestWorkerMultipleStop() { worker := NewAggregatedWorker(client, "multi-stop-tq", WorkerOptions{}) s.NoError(worker.Start()) worker.Stop() + // Verify stopping the worker removes it from the eager dispatcher + s.Empty(client.eagerDispatcher.workersByTaskQueue["multi-stop-tq"]) worker.Stop() } diff --git a/internal/internal_workflow_client_test.go b/internal/internal_workflow_client_test.go index 270b27203..4d44dd365 100644 --- a/internal/internal_workflow_client_test.go +++ b/internal/internal_workflow_client_test.go @@ -1407,7 +1407,7 @@ func (s *workflowClientTestSuite) 
TestEagerStartWorkflowNotSupported() { }, } client.eagerDispatcher = &eagerWorkflowDispatcher{ - workersByTaskQueue: map[string][]eagerWorker{taskqueue: {eagerMock}}, + workersByTaskQueue: map[string]map[string]eagerWorker{taskqueue: {"1": eagerMock}}, } s.True(ok) options := StartWorkflowOptions{ @@ -1446,7 +1446,7 @@ func (s *workflowClientTestSuite) TestEagerStartWorkflowNoWorker() { tryReserveSlotCallback: func() *SlotPermit { return nil }, processTaskAsyncCallback: func(task eagerTask) { processTask = true }} client.eagerDispatcher = &eagerWorkflowDispatcher{ - workersByTaskQueue: map[string][]eagerWorker{taskqueue: {eagerMock}}, + workersByTaskQueue: map[string]map[string]eagerWorker{taskqueue: {"1": eagerMock}}, } s.True(ok) options := StartWorkflowOptions{ @@ -1485,7 +1485,7 @@ func (s *workflowClientTestSuite) TestEagerStartWorkflow() { tryReserveSlotCallback: func() *SlotPermit { return &SlotPermit{} }, processTaskAsyncCallback: func(task eagerTask) { processTask = true }} client.eagerDispatcher = &eagerWorkflowDispatcher{ - workersByTaskQueue: map[string][]eagerWorker{taskqueue: {eagerMock}}} + workersByTaskQueue: map[string]map[string]eagerWorker{taskqueue: {"1": eagerMock}}} s.True(ok) options := StartWorkflowOptions{ ID: workflowID, @@ -1525,7 +1525,7 @@ func (s *workflowClientTestSuite) TestEagerStartWorkflowStartRequestFail() { tryReserveSlotCallback: func() *SlotPermit { return &SlotPermit{} }, processTaskAsyncCallback: func(task eagerTask) { processTask = true }} client.eagerDispatcher = &eagerWorkflowDispatcher{ - workersByTaskQueue: map[string][]eagerWorker{taskqueue: {eagerMock}}} + workersByTaskQueue: map[string]map[string]eagerWorker{taskqueue: {"1": eagerMock}}} s.True(ok) options := StartWorkflowOptions{ ID: workflowID,