Skip to content

Commit

Permalink
Fix worker leak in eager dispatcher
Browse files Browse the repository at this point in the history
  • Loading branch information
Quinn-With-Two-Ns committed Nov 23, 2024
1 parent c31c2f2 commit 6aafe2e
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 16 deletions.
2 changes: 1 addition & 1 deletion internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -993,7 +993,7 @@ func NewServiceClient(workflowServiceClient workflowservice.WorkflowServiceClien
workerInterceptors: workerInterceptors,
excludeInternalFromRetry: options.ConnectionOptions.excludeInternalFromRetry,
eagerDispatcher: &eagerWorkflowDispatcher{
workersByTaskQueue: make(map[string][]eagerWorker),
workersByTaskQueue: make(map[string]map[string]eagerWorker),
},
}

Expand Down
23 changes: 18 additions & 5 deletions internal/internal_eager_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,24 +33,37 @@ import (
// eagerWorkflowDispatcher is responsible for finding an available worker for an eager workflow task.
//
// NOTE(review): this listing looks like a rendered diff without +/- markers, so
// workersByTaskQueue appears twice below; presumably the slice-valued form is the
// removed line and the map-valued form is its replacement — confirm against the
// repository before relying on either.
type eagerWorkflowDispatcher struct {
// lock guards workersByTaskQueue; the dispatcher methods shown here acquire it before touching the map.
lock sync.RWMutex
// Slice-valued form: task queue name -> list of workers.
workersByTaskQueue map[string][]eagerWorker
// Map-valued form: task queue name -> worker id -> worker, so one worker can be removed by its id.
workersByTaskQueue map[string]map[string]eagerWorker
}

// registerWorker registers a worker that can be used for eager workflow dispatch.
// The dispatcher's write lock is held for the whole call.
//
// NOTE(review): this listing looks like a rendered diff without +/- markers; the
// append line below presumably belongs to the earlier slice-based implementation
// and the lines after it to the map-based replacement — confirm against the
// repository.
func (e *eagerWorkflowDispatcher) registerWorker(worker *workflowWorker) {
e.lock.Lock()
defer e.lock.Unlock()
// Slice-based form: append the worker to the list kept for its task queue.
e.workersByTaskQueue[worker.executionParameters.TaskQueue] = append(e.workersByTaskQueue[worker.executionParameters.TaskQueue], worker.worker)
// Map-based form: lazily create the per-task-queue map, since writing to a nil map panics.
taskQueue := worker.executionParameters.TaskQueue
if e.workersByTaskQueue[taskQueue] == nil {
e.workersByTaskQueue[taskQueue] = make(map[string]eagerWorker)
}
// Entries are keyed by worker.id, so registering the same worker again overwrites its entry.
e.workersByTaskQueue[taskQueue][worker.id] = worker.worker
}

// deregisterWorker deregisters a worker so that it will not be used for eager
// workflow dispatch.
//
// The worker is looked up by its task queue and id. Deregistering a worker that
// was never registered (or was already deregistered) is a no-op. When the last
// worker of a task queue is removed, the task queue's entry is dropped as well
// so that empty per-task-queue maps do not accumulate over the lifetime of the
// dispatcher.
func (e *eagerWorkflowDispatcher) deregisterWorker(worker *workflowWorker) {
	e.lock.Lock()
	defer e.lock.Unlock()

	taskQueue := worker.executionParameters.TaskQueue
	workers, ok := e.workersByTaskQueue[taskQueue]
	if !ok {
		// Nothing registered for this task queue.
		return
	}
	delete(workers, worker.id)
	if len(workers) == 0 {
		// Last worker gone: release the now-empty inner map.
		delete(e.workersByTaskQueue, taskQueue)
	}
}

// applyToRequest updates request if eager workflow dispatch is possible and returns the eagerWorkflowExecutor to use
func (e *eagerWorkflowDispatcher) applyToRequest(request *workflowservice.StartWorkflowExecutionRequest) *eagerWorkflowExecutor {
// Try every worker that is assigned to the desired task queue.
e.lock.RLock()
workers := e.workersByTaskQueue[request.GetTaskQueue().Name]
randWorkers := make([]eagerWorker, len(workers))
// Copy the slice so we can release the lock.
copy(randWorkers, workers)
randWorkers := make([]eagerWorker, 0, len(workers))
// Copy the workers so we can release the lock.
for _, worker := range workers {
randWorkers = append(randWorkers, worker)
}
e.lock.RUnlock()
rand.Shuffle(len(randWorkers), func(i, j int) { randWorkers[i], randWorkers[j] = randWorkers[j], randWorkers[i] })
for _, worker := range randWorkers {
Expand Down
13 changes: 7 additions & 6 deletions internal/internal_eager_workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,11 @@ func (e *eagerWorkerMock) pushEagerTask(task eagerTask) {

func TestEagerWorkflowDispatchNoWorkerOnTaskQueue(t *testing.T) {
dispatcher := &eagerWorkflowDispatcher{
workersByTaskQueue: make(map[string][]eagerWorker),
workersByTaskQueue: make(map[string]map[string]eagerWorker),
}
dispatcher.registerWorker(&workflowWorker{
executionParameters: workerExecutionParameters{TaskQueue: "bad-task-queue"},
id: "worker-id",
})

request := &workflowservice.StartWorkflowExecutionRequest{
Expand All @@ -66,20 +67,20 @@ func TestEagerWorkflowDispatchNoWorkerOnTaskQueue(t *testing.T) {

func TestEagerWorkflowDispatchAvailableWorker(t *testing.T) {
dispatcher := &eagerWorkflowDispatcher{
workersByTaskQueue: make(map[string][]eagerWorker),
workersByTaskQueue: make(map[string]map[string]eagerWorker),
}

availableWorker := &eagerWorkerMock{
tryReserveSlotCallback: func() *SlotPermit { return &SlotPermit{} },
}
dispatcher.workersByTaskQueue["task-queue"] = []eagerWorker{
&eagerWorkerMock{
dispatcher.workersByTaskQueue["task-queue"] = map[string]eagerWorker{
"1": &eagerWorkerMock{
tryReserveSlotCallback: func() *SlotPermit { return nil },
},
&eagerWorkerMock{
"2": &eagerWorkerMock{
tryReserveSlotCallback: func() *SlotPermit { return nil },
},
availableWorker,
"3": availableWorker,
}

request := &workflowservice.StartWorkflowExecutionRequest{
Expand Down
8 changes: 8 additions & 0 deletions internal/internal_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ type (
localActivityWorker *baseWorker
identity string
stopC chan struct{}
id string // Unique identifier of this worker generated by the SDK.
}

// ActivityWorker wraps the code for hosting activity types.
Expand Down Expand Up @@ -377,6 +378,7 @@ func newWorkflowTaskWorkerInternal(
localActivityWorker: localActivityWorker,
identity: params.Identity,
stopC: stopC,
id: uuid.New(),
}
}

Expand Down Expand Up @@ -1050,6 +1052,9 @@ func (aw *AggregatedWorker) start() error {
// stop workflow worker.
if !util.IsInterfaceNil(aw.workflowWorker) {
if aw.workflowWorker.worker.isWorkerStarted {
if aw.client.eagerDispatcher != nil {
aw.client.eagerDispatcher.deregisterWorker(aw.workflowWorker)
}
aw.workflowWorker.Stop()
}
}
Expand Down Expand Up @@ -1218,6 +1223,9 @@ func (aw *AggregatedWorker) Stop() {
}

if !util.IsInterfaceNil(aw.workflowWorker) {
if aw.client.eagerDispatcher != nil {
aw.client.eagerDispatcher.deregisterWorker(aw.workflowWorker)
}
aw.workflowWorker.Stop()
}
if !util.IsInterfaceNil(aw.activityWorker) {
Expand Down
3 changes: 3 additions & 0 deletions internal/internal_workers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ func (s *WorkersTestSuite) TestWorkflowWorker() {
workflowWorker.Stop()

s.NoError(ctx.Err())

}

type CountingSlotSupplier struct {
Expand Down Expand Up @@ -736,6 +737,8 @@ func (s *WorkersTestSuite) TestWorkerMultipleStop() {
worker := NewAggregatedWorker(client, "multi-stop-tq", WorkerOptions{})
s.NoError(worker.Start())
worker.Stop()
// Verify stopping the worker removes it from the eager dispatcher
s.Empty(client.eagerDispatcher.workersByTaskQueue["multi-stop-tq"])
worker.Stop()
}

Expand Down
8 changes: 4 additions & 4 deletions internal/internal_workflow_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1407,7 +1407,7 @@ func (s *workflowClientTestSuite) TestEagerStartWorkflowNotSupported() {
},
}
client.eagerDispatcher = &eagerWorkflowDispatcher{
workersByTaskQueue: map[string][]eagerWorker{taskqueue: {eagerMock}},
workersByTaskQueue: map[string]map[string]eagerWorker{taskqueue: {"1": eagerMock}},
}
s.True(ok)
options := StartWorkflowOptions{
Expand Down Expand Up @@ -1446,7 +1446,7 @@ func (s *workflowClientTestSuite) TestEagerStartWorkflowNoWorker() {
tryReserveSlotCallback: func() *SlotPermit { return nil },
processTaskAsyncCallback: func(task eagerTask) { processTask = true }}
client.eagerDispatcher = &eagerWorkflowDispatcher{
workersByTaskQueue: map[string][]eagerWorker{taskqueue: {eagerMock}},
workersByTaskQueue: map[string]map[string]eagerWorker{taskqueue: {"1": eagerMock}},
}
s.True(ok)
options := StartWorkflowOptions{
Expand Down Expand Up @@ -1485,7 +1485,7 @@ func (s *workflowClientTestSuite) TestEagerStartWorkflow() {
tryReserveSlotCallback: func() *SlotPermit { return &SlotPermit{} },
processTaskAsyncCallback: func(task eagerTask) { processTask = true }}
client.eagerDispatcher = &eagerWorkflowDispatcher{
workersByTaskQueue: map[string][]eagerWorker{taskqueue: {eagerMock}}}
workersByTaskQueue: map[string]map[string]eagerWorker{taskqueue: {"1": eagerMock}}}
s.True(ok)
options := StartWorkflowOptions{
ID: workflowID,
Expand Down Expand Up @@ -1525,7 +1525,7 @@ func (s *workflowClientTestSuite) TestEagerStartWorkflowStartRequestFail() {
tryReserveSlotCallback: func() *SlotPermit { return &SlotPermit{} },
processTaskAsyncCallback: func(task eagerTask) { processTask = true }}
client.eagerDispatcher = &eagerWorkflowDispatcher{
workersByTaskQueue: map[string][]eagerWorker{taskqueue: {eagerMock}}}
workersByTaskQueue: map[string]map[string]eagerWorker{taskqueue: {"1": eagerMock}}}
s.True(ok)
options := StartWorkflowOptions{
ID: workflowID,
Expand Down

0 comments on commit 6aafe2e

Please sign in to comment.