From c554a04b1f03787c2efbd5293afaa4c4e2e16add Mon Sep 17 00:00:00 2001 From: Wei Liu Date: Wed, 25 Dec 2024 11:53:55 +0800 Subject: [PATCH] fix review comment Signed-off-by: Wei Liu --- internal/querycoordv2/task/scheduler.go | 45 ++++++++++++++----------- 1 file changed, 25 insertions(+), 20 deletions(-) diff --git a/internal/querycoordv2/task/scheduler.go b/internal/querycoordv2/task/scheduler.go index f26c5b561a6a4..316f1a552be71 100644 --- a/internal/querycoordv2/task/scheduler.go +++ b/internal/querycoordv2/task/scheduler.go @@ -487,18 +487,15 @@ func (scheduler *taskScheduler) GetSegmentTaskDelta(nodeID, collectionID int64) scheduler.rwmutex.RLock() defer scheduler.rwmutex.RUnlock() - targetActions := make(map[int64][]Action, 0) - for _, t := range scheduler.segmentTasks { - if collectionID != -1 && collectionID != t.CollectionID() { + targetActions := make(map[int64][]Action) + for _, task := range scheduler.segmentTasks { // Map key: replicaSegmentIndex + taskCollID := task.CollectionID() + if collectionID != -1 && collectionID != taskCollID { continue } - for _, action := range t.Actions() { - if action.Node() == nodeID || nodeID == -1 { - if _, ok := targetActions[t.CollectionID()]; !ok { - targetActions[t.CollectionID()] = make([]Action, 0) - } - targetActions[t.CollectionID()] = append(targetActions[t.CollectionID()], action) - } + actions := filterActions(task.Actions(), nodeID) + if len(actions) > 0 { + targetActions[taskCollID] = append(targetActions[taskCollID], actions...) } } @@ -509,24 +506,32 @@ func (scheduler *taskScheduler) GetChannelTaskDelta(nodeID, collectionID int64) scheduler.rwmutex.RLock() defer scheduler.rwmutex.RUnlock() - targetActions := make(map[int64][]Action, 0) - for _, t := range scheduler.channelTasks { - if collectionID != -1 && collectionID != t.CollectionID() { + targetActions := make(map[int64][]Action) + for _, task := range scheduler.channelTasks { // Map key: replicaChannelIndex + taskCollID := task.CollectionID() + if collectionID != -1 && collectionID != taskCollID { continue } - for _, action := range t.Actions() { - if action.Node() == nodeID || nodeID == -1 { - if _, ok := targetActions[t.CollectionID()]; !ok { - targetActions[t.CollectionID()] = make([]Action, 0) - } - targetActions[t.CollectionID()] = append(targetActions[t.CollectionID()], action) - } + actions := filterActions(task.Actions(), nodeID) + if len(actions) > 0 { + targetActions[taskCollID] = append(targetActions[taskCollID], actions...) } } return scheduler.calculateTaskDelta(targetActions) } +// filter actions by nodeID +func filterActions(actions []Action, nodeID int64) []Action { + filtered := make([]Action, 0, len(actions)) + for _, action := range actions { + if nodeID == -1 || action.Node() == nodeID { + filtered = append(filtered, action) + } + } + return filtered +} + func (scheduler *taskScheduler) calculateTaskDelta(targetActions map[int64][]Action) int { sum := 0 for collectionID, actions := range targetActions {