From 5dfa1c33972cb5c9167c29bd1180d31f1cb9f468 Mon Sep 17 00:00:00 2001
From: wei liu
Date: Thu, 26 Sep 2024 15:13:15 +0800
Subject: [PATCH] fix: Segment unbalance after many times load/release (#36537)

issue: #36536

Query coord uses `segmentTaskDelta/channelTaskDelta` to measure the executing
workload of each QueryNode in the scheduler. These values are maintained by
`scheduler.Add(task)` and `scheduler.remove(task)`, but `scheduler.remove(task)`
can be called in unexpected ways, which leaves wrong
`segmentTaskDelta/channelTaskDelta` values, distorts the segment assignment
logic, and causes segment imbalance.

This PR computes `segmentTaskDelta/channelTaskDelta` on access instead, so a
stale value can no longer affect assignment; a short sketch of this
compute-on-access idea follows the diff.

Signed-off-by: Wei Liu
---
 internal/querycoordv2/task/scheduler.go | 126 ++++++++++--------------
 1 file changed, 50 insertions(+), 76 deletions(-)

diff --git a/internal/querycoordv2/task/scheduler.go b/internal/querycoordv2/task/scheduler.go
index 4dd3561305124..b2f93b94fbfa3 100644
--- a/internal/querycoordv2/task/scheduler.go
+++ b/internal/querycoordv2/task/scheduler.go
@@ -167,11 +167,6 @@ type taskScheduler struct {
 	channelTasks  map[replicaChannelIndex]Task
 	processQueue  *taskQueue
 	waitQueue     *taskQueue
-
-	// executing task delta changes on node: nodeID -> collectionID -> delta changes
-	// delta changes measure by segment row count and channel num
-	segmentExecutingTaskDelta map[int64]map[int64]int
-	channelExecutingTaskDelta map[int64]map[int64]int
 }
 
 func NewScheduler(ctx context.Context,
@@ -198,13 +193,11 @@ func NewScheduler(ctx context.Context,
 		cluster:   cluster,
 		nodeMgr:   nodeMgr,
 
-		tasks:                     make(UniqueSet),
-		segmentTasks:              make(map[replicaSegmentIndex]Task),
-		channelTasks:              make(map[replicaChannelIndex]Task),
-		processQueue:              newTaskQueue(),
-		waitQueue:                 newTaskQueue(),
-		segmentExecutingTaskDelta: make(map[int64]map[int64]int),
-		channelExecutingTaskDelta: make(map[int64]map[int64]int),
+		tasks:        make(UniqueSet),
+		segmentTasks: make(map[replicaSegmentIndex]Task),
+		channelTasks: make(map[replicaChannelIndex]Task),
+		processQueue: newTaskQueue(),
+		waitQueue:    newTaskQueue(),
 	}
 }
 
@@ -217,8 +210,6 @@ func (scheduler *taskScheduler) Stop() {
 	for nodeID, executor := range scheduler.executors {
 		executor.Stop()
 		delete(scheduler.executors, nodeID)
-		delete(scheduler.segmentExecutingTaskDelta, nodeID)
-		delete(scheduler.channelExecutingTaskDelta, nodeID)
 	}
 
 	for _, task := range scheduler.segmentTasks {
@@ -244,8 +235,6 @@ func (scheduler *taskScheduler) AddExecutor(nodeID int64) {
 		scheduler.cluster,
 		scheduler.nodeMgr)
 
-	scheduler.segmentExecutingTaskDelta[nodeID] = make(map[int64]int)
-	scheduler.channelExecutingTaskDelta[nodeID] = make(map[int64]int)
 	scheduler.executors[nodeID] = executor
 	executor.Start(scheduler.ctx)
 	log.Info("add executor for new QueryNode", zap.Int64("nodeID", nodeID))
@@ -259,8 +248,6 @@ func (scheduler *taskScheduler) RemoveExecutor(nodeID int64) {
 	if ok {
 		executor.Stop()
 		delete(scheduler.executors, nodeID)
-		delete(scheduler.segmentExecutingTaskDelta, nodeID)
-		delete(scheduler.channelExecutingTaskDelta, nodeID)
 		log.Info("remove executor of offline QueryNode", zap.Int64("nodeID", nodeID))
 	}
 }
@@ -293,52 +280,11 @@ func (scheduler *taskScheduler) Add(task Task) error {
 	}
 
 	scheduler.updateTaskMetrics()
-	scheduler.updateTaskDelta(task)
-
 	log.Ctx(task.Context()).Info("task added", zap.String("task", task.String()))
 	task.RecordStartTs()
 	return nil
 }
 
-func (scheduler *taskScheduler) updateTaskDelta(task Task) {
-	var delta int
-	var deltaMap map[int64]map[int64]int
-	switch task := task.(type) {
-	case *SegmentTask:
-		// skip growing segment's count, cause doesn't know realtime row number of growing segment
-		if task.Actions()[0].(*SegmentAction).Scope() == querypb.DataScope_Historical {
-			segment := scheduler.targetMgr.GetSealedSegment(task.CollectionID(), task.SegmentID(), meta.NextTargetFirst)
-			if segment != nil {
-				delta = int(segment.GetNumOfRows())
-			}
-		}
-
-		deltaMap = scheduler.segmentExecutingTaskDelta
-
-	case *ChannelTask:
-		delta = 1
-		deltaMap = scheduler.channelExecutingTaskDelta
-	}
-
-	// turn delta to negative when try to remove task
-	if task.Status() == TaskStatusSucceeded || task.Status() == TaskStatusFailed || task.Status() == TaskStatusCanceled {
-		delta = -delta
-	}
-
-	if delta != 0 {
-		for _, action := range task.Actions() {
-			if deltaMap[action.Node()] == nil {
-				deltaMap[action.Node()] = make(map[int64]int)
-			}
-			if action.Type() == ActionTypeGrow {
-				deltaMap[action.Node()][task.CollectionID()] += delta
-			} else if action.Type() == ActionTypeReduce {
-				deltaMap[action.Node()][task.CollectionID()] -= delta
-			}
-		}
-	}
-}
-
 func (scheduler *taskScheduler) updateTaskMetrics() {
 	segmentGrowNum, segmentReduceNum, segmentMoveNum := 0, 0, 0
 	channelGrowNum, channelReduceNum, channelMoveNum := 0, 0, 0
@@ -533,34 +479,63 @@ func (scheduler *taskScheduler) GetSegmentTaskDelta(nodeID, collectionID int64)
 	scheduler.rwmutex.RLock()
 	defer scheduler.rwmutex.RUnlock()
 
-	return scheduler.calculateTaskDelta(nodeID, collectionID, scheduler.segmentExecutingTaskDelta)
+	targetActions := make([]Action, 0)
+	for _, t := range scheduler.segmentTasks {
+		if collectionID != -1 && collectionID != t.CollectionID() {
+			continue
+		}
+		for _, action := range t.Actions() {
+			if action.Node() == nodeID {
+				targetActions = append(targetActions, action)
+			}
+		}
+	}
+
+	return scheduler.calculateTaskDelta(collectionID, targetActions)
 }
 
 func (scheduler *taskScheduler) GetChannelTaskDelta(nodeID, collectionID int64) int {
 	scheduler.rwmutex.RLock()
 	defer scheduler.rwmutex.RUnlock()
 
-	return scheduler.calculateTaskDelta(nodeID, collectionID, scheduler.channelExecutingTaskDelta)
-}
-
-func (scheduler *taskScheduler) calculateTaskDelta(nodeID, collectionID int64, deltaMap map[int64]map[int64]int) int {
-	if nodeID == -1 && collectionID == -1 {
-		return 0
-	}
-
-	sum := 0
-	for nid, nInfo := range deltaMap {
-		if nid != nodeID && -1 != nodeID {
+	targetActions := make([]Action, 0)
+	for _, t := range scheduler.channelTasks {
+		if collectionID != -1 && collectionID != t.CollectionID() {
 			continue
 		}
-
-		for cid, cInfo := range nInfo {
-			if cid == collectionID || -1 == collectionID {
-				sum += cInfo
+		for _, action := range t.Actions() {
+			if action.Node() == nodeID {
+				targetActions = append(targetActions, action)
 			}
 		}
 	}
 
+	return scheduler.calculateTaskDelta(collectionID, targetActions)
+}
+
+func (scheduler *taskScheduler) calculateTaskDelta(collectionID int64, targetActions []Action) int {
+	sum := 0
+	for _, action := range targetActions {
+		delta := 0
+		if action.Type() == ActionTypeGrow {
+			delta = 1
+		} else if action.Type() == ActionTypeReduce {
+			delta = -1
+		}
+
+		switch action := action.(type) {
+		case *SegmentAction:
+			// skip growing segment's count, cause doesn't know realtime row number of growing segment
+			if action.Scope() == querypb.DataScope_Historical {
+				segment := scheduler.targetMgr.GetSealedSegment(collectionID, action.segmentID, meta.NextTargetFirst)
+				if segment != nil {
+					sum += int(segment.GetNumOfRows()) * delta
+				}
+			}
+		case *ChannelAction:
+			sum += delta
+		}
+	}
 	return sum
 }
 
@@ -836,7 +811,6 @@ func (scheduler *taskScheduler) remove(task Task) {
 		log = log.With(zap.Int64("segmentID", task.SegmentID()))
 	}
 
-	scheduler.updateTaskDelta(task)
 	scheduler.updateTaskMetrics()
 	log.Info("task removed")
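The heart of the change is the new `calculateTaskDelta`: rather than reading counters that `Add` and `remove` keep up to date, `GetSegmentTaskDelta` and `GetChannelTaskDelta` now derive the delta from the task maps on every call, so the value can never drift out of sync with the tasks that actually exist. The sketch below shows that compute-on-access pattern in isolation; the types and names (`exampleScheduler`, `taskDelta`, `action`, and so on) are simplified stand-ins for this note, not the actual Milvus types.

// Minimal, self-contained sketch of the compute-on-access idea.
// The types are simplified stand-ins, not the real Milvus types; only the
// shape of the calculation mirrors the patch.
package main

import "fmt"

type actionType int

const (
	actionGrow actionType = iota
	actionReduce
)

type action struct {
	node    int64
	typ     actionType
	numRows int // weight of the action: 1 for a channel, row count for a sealed segment
}

type task struct {
	collectionID int64
	actions      []action
}

type exampleScheduler struct {
	// the task map is the single source of truth; no counters are kept alongside it
	tasks map[int64]task // taskID -> task
}

// taskDelta derives the executing workload for (nodeID, collectionID) from the
// pending tasks each time it is called, so it cannot drift out of sync with the
// task map, no matter how often tasks are added or removed.
func (s *exampleScheduler) taskDelta(nodeID, collectionID int64) int {
	sum := 0
	for _, t := range s.tasks {
		if collectionID != -1 && collectionID != t.collectionID {
			continue
		}
		for _, a := range t.actions {
			if a.node != nodeID {
				continue
			}
			switch a.typ {
			case actionGrow:
				sum += a.numRows
			case actionReduce:
				sum -= a.numRows
			}
		}
	}
	return sum
}

func main() {
	s := &exampleScheduler{tasks: map[int64]task{
		1: {collectionID: 100, actions: []action{{node: 1, typ: actionGrow, numRows: 500}}},
		2: {collectionID: 100, actions: []action{{node: 1, typ: actionReduce, numRows: 200}}},
	}}
	fmt.Println(s.taskDelta(1, 100)) // prints 300

	// removing a task, even "unexpectedly", cannot leave a stale counter behind,
	// because the next call simply recomputes from whatever tasks remain
	delete(s.tasks, 2)
	fmt.Println(s.taskDelta(1, 100)) // prints 500
}

The trade-off is recomputing over the pending tasks on each query instead of reading a counter; in exchange, the reported delta is correct by construction because it is always derived from the scheduler's task maps.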