From 9477fcd1da14b4b27b8a953b8a7605d8f00420c1 Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Sun, 24 Nov 2024 20:23:37 -0800 Subject: [PATCH] Refactor freeing --- internal/client.go | 2 +- internal/internal_eager_workflow.go | 10 +++++----- internal/internal_eager_workflow_test.go | 17 ++++++++--------- internal/internal_worker.go | 2 -- internal/internal_workflow_client_test.go | 8 ++++---- 5 files changed, 18 insertions(+), 21 deletions(-) diff --git a/internal/client.go b/internal/client.go index a4457bb46..99f9d390e 100644 --- a/internal/client.go +++ b/internal/client.go @@ -993,7 +993,7 @@ func NewServiceClient(workflowServiceClient workflowservice.WorkflowServiceClien workerInterceptors: workerInterceptors, excludeInternalFromRetry: options.ConnectionOptions.excludeInternalFromRetry, eagerDispatcher: &eagerWorkflowDispatcher{ - workersByTaskQueue: make(map[string]map[string]eagerWorker), + workersByTaskQueue: make(map[string]map[eagerWorker]struct{}), }, } diff --git a/internal/internal_eager_workflow.go b/internal/internal_eager_workflow.go index e4cedb1f1..029829b88 100644 --- a/internal/internal_eager_workflow.go +++ b/internal/internal_eager_workflow.go @@ -33,7 +33,7 @@ import ( // eagerWorkflowDispatcher is responsible for finding an available worker for an eager workflow task. type eagerWorkflowDispatcher struct { lock sync.RWMutex - workersByTaskQueue map[string]map[string]eagerWorker + workersByTaskQueue map[string]map[eagerWorker]struct{} } // registerWorker registers a worker that can be used for eager workflow dispatch @@ -42,16 +42,16 @@ func (e *eagerWorkflowDispatcher) registerWorker(worker *workflowWorker) { defer e.lock.Unlock() taskQueue := worker.executionParameters.TaskQueue if e.workersByTaskQueue[taskQueue] == nil { - e.workersByTaskQueue[taskQueue] = make(map[string]eagerWorker) + e.workersByTaskQueue[taskQueue] = make(map[eagerWorker]struct{}) } - e.workersByTaskQueue[taskQueue][worker.id] = worker.worker + e.workersByTaskQueue[taskQueue][worker.worker] = struct{}{} } // deregisterWorker deregister a worker so that it will not be used for eager workflow dispatch func (e *eagerWorkflowDispatcher) deregisterWorker(worker *workflowWorker) { e.lock.Lock() defer e.lock.Unlock() - delete(e.workersByTaskQueue[worker.executionParameters.TaskQueue], worker.id) + delete(e.workersByTaskQueue[worker.executionParameters.TaskQueue], worker.worker) } // applyToRequest updates request if eager workflow dispatch is possible and returns the eagerWorkflowExecutor to use @@ -61,7 +61,7 @@ func (e *eagerWorkflowDispatcher) applyToRequest(request *workflowservice.StartW workers := e.workersByTaskQueue[request.GetTaskQueue().Name] randWorkers := make([]eagerWorker, 0, len(workers)) // Copy the workers so we can release the lock. - for _, worker := range workers { + for worker := range workers { randWorkers = append(randWorkers, worker) } e.lock.RUnlock() diff --git a/internal/internal_eager_workflow_test.go b/internal/internal_eager_workflow_test.go index b239fb81d..db963551f 100644 --- a/internal/internal_eager_workflow_test.go +++ b/internal/internal_eager_workflow_test.go @@ -50,11 +50,10 @@ func (e *eagerWorkerMock) pushEagerTask(task eagerTask) { func TestEagerWorkflowDispatchNoWorkerOnTaskQueue(t *testing.T) { dispatcher := &eagerWorkflowDispatcher{ - workersByTaskQueue: make(map[string]map[string]eagerWorker), + workersByTaskQueue: make(map[string]map[eagerWorker]struct{}), } dispatcher.registerWorker(&workflowWorker{ executionParameters: workerExecutionParameters{TaskQueue: "bad-task-queue"}, - id: "worker-id", }) request := &workflowservice.StartWorkflowExecutionRequest{ @@ -67,20 +66,20 @@ func TestEagerWorkflowDispatchNoWorkerOnTaskQueue(t *testing.T) { func TestEagerWorkflowDispatchAvailableWorker(t *testing.T) { dispatcher := &eagerWorkflowDispatcher{ - workersByTaskQueue: make(map[string]map[string]eagerWorker), + workersByTaskQueue: make(map[string]map[eagerWorker]struct{}), } availableWorker := &eagerWorkerMock{ tryReserveSlotCallback: func() *SlotPermit { return &SlotPermit{} }, } - dispatcher.workersByTaskQueue["task-queue"] = map[string]eagerWorker{ - "1": &eagerWorkerMock{ + dispatcher.workersByTaskQueue["task-queue"] = map[eagerWorker]struct{}{ + &eagerWorkerMock{ tryReserveSlotCallback: func() *SlotPermit { return nil }, - }, - "2": &eagerWorkerMock{ + }: {}, + &eagerWorkerMock{ tryReserveSlotCallback: func() *SlotPermit { return nil }, - }, - "3": availableWorker, + }: {}, + availableWorker: {}, } request := &workflowservice.StartWorkflowExecutionRequest{ diff --git a/internal/internal_worker.go b/internal/internal_worker.go index cbcd95dc4..61f324f91 100644 --- a/internal/internal_worker.go +++ b/internal/internal_worker.go @@ -102,7 +102,6 @@ type ( localActivityWorker *baseWorker identity string stopC chan struct{} - id string // Unique identifier of this worker generated by the SDK. } // ActivityWorker wraps the code for hosting activity types. @@ -378,7 +377,6 @@ func newWorkflowTaskWorkerInternal( localActivityWorker: localActivityWorker, identity: params.Identity, stopC: stopC, - id: uuid.New(), } } diff --git a/internal/internal_workflow_client_test.go b/internal/internal_workflow_client_test.go index 4d44dd365..8617163cf 100644 --- a/internal/internal_workflow_client_test.go +++ b/internal/internal_workflow_client_test.go @@ -1407,7 +1407,7 @@ func (s *workflowClientTestSuite) TestEagerStartWorkflowNotSupported() { }, } client.eagerDispatcher = &eagerWorkflowDispatcher{ - workersByTaskQueue: map[string]map[string]eagerWorker{taskqueue: {"1": eagerMock}}, + workersByTaskQueue: map[string]map[eagerWorker]struct{}{taskqueue: {eagerMock: {}}}, } s.True(ok) options := StartWorkflowOptions{ @@ -1446,7 +1446,7 @@ func (s *workflowClientTestSuite) TestEagerStartWorkflowNoWorker() { tryReserveSlotCallback: func() *SlotPermit { return nil }, processTaskAsyncCallback: func(task eagerTask) { processTask = true }} client.eagerDispatcher = &eagerWorkflowDispatcher{ - workersByTaskQueue: map[string]map[string]eagerWorker{taskqueue: {"1": eagerMock}}, + workersByTaskQueue: map[string]map[eagerWorker]struct{}{taskqueue: {eagerMock: {}}}, } s.True(ok) options := StartWorkflowOptions{ @@ -1485,7 +1485,7 @@ func (s *workflowClientTestSuite) TestEagerStartWorkflow() { tryReserveSlotCallback: func() *SlotPermit { return &SlotPermit{} }, processTaskAsyncCallback: func(task eagerTask) { processTask = true }} client.eagerDispatcher = &eagerWorkflowDispatcher{ - workersByTaskQueue: map[string]map[string]eagerWorker{taskqueue: {"1": eagerMock}}} + workersByTaskQueue: map[string]map[eagerWorker]struct{}{taskqueue: {eagerMock: {}}}} s.True(ok) options := StartWorkflowOptions{ ID: workflowID, @@ -1525,7 +1525,7 @@ func (s *workflowClientTestSuite) TestEagerStartWorkflowStartRequestFail() { tryReserveSlotCallback: func() *SlotPermit { return &SlotPermit{} }, processTaskAsyncCallback: func(task eagerTask) { processTask = true }} client.eagerDispatcher = &eagerWorkflowDispatcher{ - workersByTaskQueue: map[string]map[string]eagerWorker{taskqueue: {"1": eagerMock}}} + workersByTaskQueue: map[string]map[eagerWorker]struct{}{taskqueue: {eagerMock: {}}}} s.True(ok) options := StartWorkflowOptions{ ID: workflowID,