From 30c3790bffec324b8270a73999b5e9170e678cff Mon Sep 17 00:00:00 2001
From: Andrew Dye
Date: Thu, 16 May 2024 10:52:48 -0700
Subject: [PATCH] Add pending owners cache (#267)

## Overview

When kicking off fast tasks, we typically have to do a second round of task evaluation before a worker is available, which adds latency to the initial task runs while the worker(s) come up. This change keeps an in-memory cache of tasks waiting on a worker so that when the first worker comes up, we can opportunistically enqueue the owning workflow for evaluation and avoid a ~10s delay.

I chose to use a service-wide lock, which trades off some lock contention for reduced complexity. This is acceptable since we already grab the service-wide `queuesLock` when discovering a new worker (call to `Heartbeat`).

## Test Plan

~~- [ ] Haven't added any unit tests yet; wanted to get feedback on the approach~~ Going to defer unit tests to the broader pass @hamersaw is doing
- [x] Ran locally and verified that, with the change, tasks do not require a second round

Without the enqueue call (2s delay from worker registered -> send task):
```
"2024-05-13T16:42:22-07:00" "adding pending owner flytesnacks-development/feb7da731f60c482db2d for task feb7da731f60c482db2d-n0-0 on queue 4fc648840f89c02"
"2024-05-13T16:42:22-07:00" "offering task feb7da731f60c482db2d-n0-0 on queue 4fc648840f89c02"
"2024-05-13T16:42:22-07:00" "offering task feb7da731f60c482db2d-n0-0 on queue 4fc648840f89c02"
"2024-05-13T16:42:22-07:00" "offering task feb7da731f60c482db2d-n0-0 on queue 4fc648840f89c02"
"2024-05-13T16:42:23-07:00" "worker 6b772a8b-7748-4819-99ea-140086ca27af registered with queue 4fc648840f89c02"
"2024-05-13T16:42:25-07:00" "offering task feb7da731f60c482db2d-n0-0 on queue 4fc648840f89c02"
"2024-05-13T16:42:25-07:00" "sending task feb7da731f60c482db2d-n0-0 to worker 6b772a8b-7748-4819-99ea-140086ca27af on queue 4fc648840f89c02"
```

With the enqueue call (worker registered -> send task in the same second):
```
"2024-05-13T16:48:25-07:00" "adding pending owner flytesnacks-development/f96f3fe69ae744129ab3 for task f96f3fe69ae744129ab3-n0-0 on queue 4fc648840f89c02"
"2024-05-13T16:48:25-07:00" "offering task f96f3fe69ae744129ab3-n0-0 on queue 4fc648840f89c02"
"2024-05-13T16:48:25-07:00" "offering task f96f3fe69ae744129ab3-n0-0 on queue 4fc648840f89c02"
"2024-05-13T16:48:25-07:00" "offering task f96f3fe69ae744129ab3-n0-0 on queue 4fc648840f89c02"
"2024-05-13T16:48:26-07:00" "worker abb8da76-b4f2-44a2-8fe9-c577f682c914 registered with queue 4fc648840f89c02"
"2024-05-13T16:48:26-07:00" "offering task f96f3fe69ae744129ab3-n0-0 on queue 4fc648840f89c02"
"2024-05-13T16:48:26-07:00" "sending task f96f3fe69ae744129ab3-n0-0 to worker abb8da76-b4f2-44a2-8fe9-c577f682c914 on queue 4fc648840f89c02"
```

## Rollout Plan (if applicable)

Not planning to put this behind a config (although we could potentially move `maxPendingOwnersPerQueue` to a config and treat 0 as disabled). Will bring to cloud and deploy in the coming days.

## Upstream Changes

Should this change be upstreamed to OSS (flyteorg/flyte)? If so, please check this box for auditing. Note, this is the responsibility of each developer. See [this guide](https://unionai.atlassian.net/wiki/spaces/ENG/pages/447610883/Flyte+-+Union+Cloud+Development+Runbook/#When-are-versions-updated%3F).

- [ ] To be upstreamed
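## Appendix: Pending Owners Flow (illustrative)

To make the flow concrete for reviewers, here is a trimmed-down model of the interaction; this is a sketch, not the patch's actual types. `NamespacedName` stands in for `types.NamespacedName`, the `enqueue` callback for the `core.EnqueueOwner` the service holds, and the names `service`, `offer`, and `workerArrived` are purely illustrative.

```go
package main

import (
	"fmt"
	"sync"
)

// NamespacedName stands in for k8s.io/apimachinery's types.NamespacedName.
type NamespacedName struct{ Namespace, Name string }

const maxPendingOwnersPerQueue = 100

type service struct {
	mu      sync.Mutex                           // models the service-wide pendingTaskOwnersLock
	pending map[string]map[string]NamespacedName // queueID -> taskID -> owner
	enqueue func(NamespacedName) error           // models core.EnqueueOwner
}

// offer models the OfferOnQueue path when a queue has no workers yet: remember
// the owning workflow (bounded per queue) so the first heartbeat can re-enqueue it.
func (s *service) offer(queueID, taskID string, owner NamespacedName) {
	s.mu.Lock()
	defer s.mu.Unlock()

	owners := s.pending[queueID]
	if owners == nil {
		owners = make(map[string]NamespacedName)
		s.pending[queueID] = owners
	}
	if len(owners) >= maxPendingOwnersPerQueue {
		return // cache full; the owner falls back to the normal ~10s reevaluation
	}
	owners[taskID] = owner
}

// workerArrived models the Heartbeat path: drain the queue's pending owners,
// enqueueing each distinct owner exactly once.
func (s *service) workerArrived(queueID string) {
	s.mu.Lock()
	defer s.mu.Unlock()

	seen := make(map[NamespacedName]bool)
	for _, owner := range s.pending[queueID] {
		if !seen[owner] {
			seen[owner] = true
			_ = s.enqueue(owner) // the real code logs a warning on error
		}
	}
	delete(s.pending, queueID)
}

func main() {
	s := &service{
		pending: make(map[string]map[string]NamespacedName),
		enqueue: func(o NamespacedName) error { fmt.Println("enqueue", o); return nil },
	}

	owner := NamespacedName{Namespace: "flytesnacks-development", Name: "feb7da731f60c482db2d"}
	s.offer("4fc648840f89c02", "feb7da731f60c482db2d-n0-0", owner)
	s.offer("4fc648840f89c02", "feb7da731f60c482db2d-n1-0", owner) // same owner, deduplicated
	s.workerArrived("4fc648840f89c02")                             // prints "enqueue" once
}
```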
---
 fasttask/plugin/plugin.go  |   1 -
 fasttask/plugin/service.go | 153 +++++++++++++++++++++++++++++--------
 2 files changed, 123 insertions(+), 31 deletions(-)

diff --git a/fasttask/plugin/plugin.go b/fasttask/plugin/plugin.go
index f78c46ff549..2c6e63d97aa 100644
--- a/fasttask/plugin/plugin.go
+++ b/fasttask/plugin/plugin.go
@@ -262,7 +262,6 @@ func (p *Plugin) Abort(ctx context.Context, tCtx core.TaskExecutionContext) erro
 	// active executions. this is performed in the `Finalize` function which is _always_ called
 	// during any abort. if this logic changes, we will need to add a call to
 	// `fastTaskService.Cleanup` to ensure proper abort here.
-	// TODO: add this since the service may now hold pending tasks without a worker up
 	return nil
 }
 
diff --git a/fasttask/plugin/service.go b/fasttask/plugin/service.go
index add8c63cef0..6edf3ad38b4 100644
--- a/fasttask/plugin/service.go
+++ b/fasttask/plugin/service.go
@@ -18,13 +18,22 @@ import (
 	"github.com/unionai/flyte/fasttask/plugin/pb"
 )
 
+var maxPendingOwnersPerQueue = 100
+
 // FastTaskService is a gRPC service that manages assignment and management of task executions with
 // respect to fasttask workers.
 type FastTaskService struct {
 	pb.UnimplementedFastTaskServer
-	enqueueOwner core.EnqueueOwner
-	queues       map[string]*Queue
-	queuesLock   sync.RWMutex
+	enqueueOwner core.EnqueueOwner
+
+	queues     map[string]*Queue
+	queuesLock sync.RWMutex
+
+	// A map of pending owners by queue. When a new worker becomes available, use this to enqueue owners for reevaluation.
+	// Note, this is an optimistic approach and may not include all pending owners.
+	pendingTaskOwners     map[string]map[string]types.NamespacedName // map[queueID]map[taskID]ownerID
+	pendingTaskOwnersLock sync.RWMutex
+
 	taskStatusChannels sync.Map // map[string]chan *WorkerTaskStatus
 	metrics            metrics
 }
@@ -107,33 +116,11 @@ func (f *FastTaskService) Heartbeat(stream pb.FastTask_HeartbeatServer) error {
 	}
 
 	// register worker with queue
-	f.queuesLock.Lock()
-	queue, exists := f.queues[heartbeatRequest.GetQueueId()]
-	if !exists {
-		queue = &Queue{
-			workers: make(map[string]*Worker),
-		}
-		f.queues[heartbeatRequest.GetQueueId()] = queue
-	}
-	f.queuesLock.Unlock()
-
-	queue.lock.Lock()
-	queue.workers[workerID] = worker
-	queue.lock.Unlock()
+	queue := f.addWorkerToQueue(heartbeatRequest.GetQueueId(), worker)
 
 	// cleanup worker on exit
 	defer func() {
-		f.queuesLock.Lock()
-		queue, exists := f.queues[heartbeatRequest.GetQueueId()]
-		if exists {
-			queue.lock.Lock()
-			delete(queue.workers, workerID)
-			if len(queue.workers) == 0 {
-				delete(f.queues, heartbeatRequest.GetQueueId())
-			}
-			queue.lock.Unlock()
-		}
-		f.queuesLock.Unlock()
+		f.removeWorkerFromQueue(heartbeatRequest.GetQueueId(), workerID)
 	}()
 
 	// start go routine to handle heartbeat responses
@@ -150,6 +137,9 @@ func (f *FastTaskService) Heartbeat(stream pb.FastTask_HeartbeatServer) error {
 		}
 	}()
 
+	// new worker available, enqueue owners
+	f.enqueuePendingOwners(heartbeatRequest.GetQueueId())
+
 	// handle heartbeat requests
 	for {
 		heartbeatRequest, err := stream.Recv()
@@ -198,6 +188,100 @@ func (f *FastTaskService) Heartbeat(stream pb.FastTask_HeartbeatServer) error {
 	return nil
 }
 
+func (f *FastTaskService) addWorkerToQueue(queueID string, worker *Worker) *Queue {
+	f.queuesLock.Lock()
+	defer f.queuesLock.Unlock()
+
+	queue, exists := f.queues[queueID]
+	if !exists {
+		queue = &Queue{
+			workers: make(map[string]*Worker),
+		}
+		f.queues[queueID] = queue
+	}
+
+	queue.lock.Lock()
+	defer queue.lock.Unlock()
+
+	queue.workers[worker.workerID] = worker
+	return queue
+}
+
+func (f *FastTaskService) removeWorkerFromQueue(queueID, workerID string) {
+	f.queuesLock.Lock()
+	defer f.queuesLock.Unlock()
+
+	queue, exists := f.queues[queueID]
+	if !exists {
+		return
+	}
+
+	queue.lock.Lock()
+	defer queue.lock.Unlock()
+
+	delete(queue.workers, workerID)
+	if len(queue.workers) == 0 {
+		delete(f.queues, queueID)
+	}
+}
+
+// addPendingOwner adds to the pending owners list for the queue, if not already full
+func (f *FastTaskService) addPendingOwner(queueID, taskID string, ownerID types.NamespacedName) {
+	f.pendingTaskOwnersLock.Lock()
+	defer f.pendingTaskOwnersLock.Unlock()
+
+	owners, exists := f.pendingTaskOwners[queueID]
+	if !exists {
+		owners = make(map[string]types.NamespacedName)
+		f.pendingTaskOwners[queueID] = owners
+	}
+
+	if len(owners) >= maxPendingOwnersPerQueue {
+		return
+	}
+	owners[taskID] = ownerID
+}
+
+// removePendingOwner removes the pending owner from the list if still there
+func (f *FastTaskService) removePendingOwner(queueID, taskID string) {
+	f.pendingTaskOwnersLock.Lock()
+	defer f.pendingTaskOwnersLock.Unlock()
+
+	owners, exists := f.pendingTaskOwners[queueID]
+	if !exists {
+		return
+	}
+
+	delete(owners, taskID)
+	if len(owners) == 0 {
+		delete(f.pendingTaskOwners, queueID)
+	}
+}
+
+// enqueuePendingOwners drains the pending owners list for the queue and enqueues them for reevaluation
+func (f *FastTaskService) enqueuePendingOwners(queueID string) {
+	f.pendingTaskOwnersLock.Lock()
+	defer f.pendingTaskOwnersLock.Unlock()
+
+	owners, exists := f.pendingTaskOwners[queueID]
+	if !exists {
+		return
+	}
+
+	enqueued := make(map[types.NamespacedName]bool)
+	for _, ownerID := range owners {
+		if _, ok := enqueued[ownerID]; ok {
+			continue
+		}
+		if err := f.enqueueOwner(ownerID); err != nil {
+			logger.Warnf(context.Background(), "failed to enqueue owner %s: %+v", ownerID, err)
+		}
+		enqueued[ownerID] = true
+	}
+
+	delete(f.pendingTaskOwners, queueID)
+}
+
 // OfferOnQueue offers a task to a worker on a specific queue. If no workers are available, an
 // empty string is returned.
 func (f *FastTaskService) OfferOnQueue(ctx context.Context, queueID, taskID, namespace, workflowID string, cmd []string) (string, error) {
@@ -206,6 +290,7 @@ func (f *FastTaskService) OfferOnQueue(ctx context.Context, queueID, taskID, nam
 
 	queue, exists := f.queues[queueID]
 	if !exists {
+		f.addPendingOwner(queueID, taskID, types.NamespacedName{Namespace: namespace, Name: workflowID})
 		f.metrics.taskNoWorkersAvailable.Inc()
 		return "", nil // no workers available
 	}
@@ -232,8 +317,11 @@ func (f *FastTaskService) OfferOnQueue(ctx context.Context, queueID, taskID, nam
 		worker = acceptedWorkers[rand.Intn(len(acceptedWorkers))]
 		worker.capacity.BacklogCount++
 	} else {
+		// No capacity available on any worker. Note, we do not add to pending owners at this time as we are
+		// optimizing for the worker startup case. The worker backlog should be sufficient to keep the worker busy
+		// without needing to proactively enqueue owners when capacity becomes available.
 		f.metrics.taskNoCapacityAvailable.Inc()
-		return "", nil // no workers available
+		return "", nil
 	}
 
 	// send assign message to worker
@@ -325,6 +413,10 @@ func (f *FastTaskService) Cleanup(ctx context.Context, taskID, queueID, workerID
 
 	// delete task context
 	f.taskStatusChannels.Delete(taskID)
+
+	// remove pending owner
+	f.removePendingOwner(queueID, taskID)
+
 	return nil
 }
 
@@ -332,8 +424,9 @@ func (f *FastTaskService) Cleanup(ctx context.Context, taskID, queueID, workerID
 func NewFastTaskService(enqueueOwner core.EnqueueOwner) *FastTaskService {
 	scope := promutils.NewScope("fasttask")
 	svc := &FastTaskService{
-		enqueueOwner: enqueueOwner,
-		queues:       make(map[string]*Queue),
+		enqueueOwner:      enqueueOwner,
+		queues:            make(map[string]*Queue),
+		pendingTaskOwners: make(map[string]map[string]types.NamespacedName),
 		metrics: metrics{
 			taskNoWorkersAvailable:  scope.MustNewCounter("task_no_workers_available", "Count of task assignment attempts with no workers available"),
 			taskNoCapacityAvailable: scope.MustNewCounter("task_no_capacity_available", "Count of task assignment attempts with no capacity available"),
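
A possible starting point for the unit tests deferred above, exercising the drain-and-dedupe behavior of the new helpers. This is a sketch only: it assumes `core.EnqueueOwner` is the usual `func(types.NamespacedName) error` callback, and the test name and literal values are illustrative.

```go
package plugin

import (
	"testing"

	"k8s.io/apimachinery/pkg/types"
)

func TestEnqueuePendingOwnersDrainsAndDedupes(t *testing.T) {
	var enqueued []types.NamespacedName
	f := &FastTaskService{
		enqueueOwner: func(id types.NamespacedName) error {
			enqueued = append(enqueued, id)
			return nil
		},
		pendingTaskOwners: make(map[string]map[string]types.NamespacedName),
	}

	owner := types.NamespacedName{Namespace: "flytesnacks-development", Name: "wf"}
	f.addPendingOwner("q1", "t1", owner)
	f.addPendingOwner("q1", "t2", owner) // same owner via a second task

	// first worker arrival enqueues each distinct owner exactly once
	f.enqueuePendingOwners("q1")
	if len(enqueued) != 1 {
		t.Fatalf("expected 1 enqueue for deduplicated owner, got %d", len(enqueued))
	}

	// the queue's list is drained, so a second arrival enqueues nothing
	f.enqueuePendingOwners("q1")
	if len(enqueued) != 1 {
		t.Fatalf("expected no further enqueues after drain, got %d", len(enqueued))
	}

	// cleanup of a task is a no-op once the list is drained
	f.removePendingOwner("q1", "t1")
}
```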