From 1d5a58d819daa0e3b7dc83347062edaa8fc66b9f Mon Sep 17 00:00:00 2001 From: Wei Liu Date: Tue, 24 Dec 2024 20:04:01 +0800 Subject: [PATCH 1/2] fix: [2.4] Prevent balancer from overloading the same QueryNode The balancer calculates the workload of executing tasks as an ongoing score for target nodes. However, a logic issue arises when GetSegmentTaskDelta or GetChannelTaskDelta is called with collectionID=-1, which incorrectly returns zero. Due to the incorrect global score, the executing task's workload is not properly reflected for each collection. Consequently, each collection submits its own balance task, leading to the balancer assigning excessive tasks to the same QueryNode. Signed-off-by: Wei Liu --- internal/querycoordv2/task/scheduler.go | 58 +++++++++------- internal/querycoordv2/task/task_test.go | 90 +++++++++++++++++++++++++ 2 files changed, 123 insertions(+), 25 deletions(-) diff --git a/internal/querycoordv2/task/scheduler.go b/internal/querycoordv2/task/scheduler.go index 416d3ea1d4675..0958b602cbae3 100644 --- a/internal/querycoordv2/task/scheduler.go +++ b/internal/querycoordv2/task/scheduler.go @@ -481,61 +481,69 @@ func (scheduler *taskScheduler) GetSegmentTaskDelta(nodeID, collectionID int64) scheduler.rwmutex.RLock() defer scheduler.rwmutex.RUnlock() - targetActions := make([]Action, 0) + targetActions := make(map[int64][]Action, 0) for _, t := range scheduler.segmentTasks { if collectionID != -1 && collectionID != t.CollectionID() { continue } for _, action := range t.Actions() { - if action.Node() == nodeID { - targetActions = append(targetActions, action) + if action.Node() == nodeID || nodeID == -1 { + if _, ok := targetActions[t.CollectionID()]; !ok { + targetActions[t.CollectionID()] = make([]Action, 0) + } + targetActions[t.CollectionID()] = append(targetActions[t.ReplicaID()], action) } } } - return scheduler.calculateTaskDelta(collectionID, targetActions) + return scheduler.calculateTaskDelta(targetActions) } func (scheduler *taskScheduler) GetChannelTaskDelta(nodeID, collectionID int64) int { scheduler.rwmutex.RLock() defer scheduler.rwmutex.RUnlock() - targetActions := make([]Action, 0) + targetActions := make(map[int64][]Action, 0) for _, t := range scheduler.channelTasks { if collectionID != -1 && collectionID != t.CollectionID() { continue } for _, action := range t.Actions() { - if action.Node() == nodeID { - targetActions = append(targetActions, action) + if action.Node() == nodeID || nodeID == -1 { + if _, ok := targetActions[t.CollectionID()]; !ok { + targetActions[t.CollectionID()] = make([]Action, 0) + } + targetActions[t.CollectionID()] = append(targetActions[t.ReplicaID()], action) } } } - return scheduler.calculateTaskDelta(collectionID, targetActions) + return scheduler.calculateTaskDelta(targetActions) } -func (scheduler *taskScheduler) calculateTaskDelta(collectionID int64, targetActions []Action) int { +func (scheduler *taskScheduler) calculateTaskDelta(targetActions map[int64][]Action) int { sum := 0 - for _, action := range targetActions { - delta := 0 - if action.Type() == ActionTypeGrow { - delta = 1 - } else if action.Type() == ActionTypeReduce { - delta = -1 - } + for collectionID, actions := range targetActions { + for _, action := range actions { + delta := 0 + if action.Type() == ActionTypeGrow { + delta = 1 + } else if action.Type() == ActionTypeReduce { + delta = -1 + } - switch action := action.(type) { - case *SegmentAction: - // skip growing segment's count, cause doesn't know realtime row number of growing segment - if action.Scope() == querypb.DataScope_Historical { - segment := scheduler.targetMgr.GetSealedSegment(collectionID, action.segmentID, meta.NextTargetFirst) - if segment != nil { - sum += int(segment.GetNumOfRows()) * delta + switch action := action.(type) { + case *SegmentAction: + // skip growing segment's count, cause doesn't know realtime row number of growing segment + if action.Scope() == querypb.DataScope_Historical { + segment := scheduler.targetMgr.GetSealedSegment(collectionID, action.SegmentID(), meta.NextTargetFirst) + if segment != nil { + sum += int(segment.GetNumOfRows()) * delta + } } + case *ChannelAction: + sum += delta } - case *ChannelAction: - sum += delta } } return sum diff --git a/internal/querycoordv2/task/task_test.go b/internal/querycoordv2/task/task_test.go index 9e82566bfee6b..f9aa3fbe41cb8 100644 --- a/internal/querycoordv2/task/task_test.go +++ b/internal/querycoordv2/task/task_test.go @@ -1810,6 +1810,96 @@ func (suite *TaskSuite) TestBalanceChannelWithL0SegmentTask() { suite.Equal(2, task.step) } +func (suite *TaskSuite) TestCalculateTaskDelta() { + ctx := context.Background() + scheduler := suite.newScheduler() + + mockTarget := meta.NewMockTargetManager(suite.T()) + mockTarget.EXPECT().GetSealedSegment(mock.Anything, mock.Anything, mock.Anything).Return(&datapb.SegmentInfo{ + NumOfRows: 100, + }) + scheduler.targetMgr = mockTarget + + coll := int64(1001) + nodeID := int64(1) + channelName := "channel-1" + segmentID := int64(1) + // add segment task for collection + task1, err := NewSegmentTask( + ctx, + 10*time.Second, + WrapIDSource(0), + coll, + suite.replica, + NewSegmentActionWithScope(nodeID, ActionTypeGrow, "", segmentID, querypb.DataScope_Historical), + ) + suite.NoError(err) + err = scheduler.Add(task1) + suite.NoError(err) + task2, err := NewChannelTask( + ctx, + 10*time.Second, + WrapIDSource(0), + coll, + suite.replica, + NewChannelAction(nodeID, ActionTypeGrow, channelName), + ) + suite.NoError(err) + err = scheduler.Add(task2) + suite.NoError(err) + + coll2 := int64(1005) + nodeID2 := int64(2) + channelName2 := "channel-2" + segmentID2 := int64(2) + task3, err := NewSegmentTask( + ctx, + 10*time.Second, + WrapIDSource(0), + coll2, + suite.replica, + NewSegmentActionWithScope(nodeID2, ActionTypeGrow, "", segmentID2, querypb.DataScope_Historical), + ) + suite.NoError(err) + err = scheduler.Add(task3) + suite.NoError(err) + task4, err := NewChannelTask( + ctx, + 10*time.Second, + WrapIDSource(0), + coll2, + suite.replica, + NewChannelAction(nodeID2, ActionTypeGrow, channelName2), + ) + suite.NoError(err) + err = scheduler.Add(task4) + suite.NoError(err) + + // check task delta with collectionID and nodeID + suite.Equal(100, scheduler.GetSegmentTaskDelta(nodeID, coll)) + suite.Equal(1, scheduler.GetChannelTaskDelta(nodeID, coll)) + suite.Equal(100, scheduler.GetSegmentTaskDelta(nodeID2, coll2)) + suite.Equal(1, scheduler.GetChannelTaskDelta(nodeID2, coll2)) + + // check task delta with collectionID=-1 + suite.Equal(100, scheduler.GetSegmentTaskDelta(nodeID, -1)) + suite.Equal(1, scheduler.GetChannelTaskDelta(nodeID, -1)) + suite.Equal(100, scheduler.GetSegmentTaskDelta(nodeID2, -1)) + suite.Equal(1, scheduler.GetChannelTaskDelta(nodeID2, -1)) + + // check task delta with nodeID=-1 + suite.Equal(100, scheduler.GetSegmentTaskDelta(-1, coll)) + suite.Equal(1, scheduler.GetChannelTaskDelta(-1, coll)) + suite.Equal(100, scheduler.GetSegmentTaskDelta(-1, coll)) + suite.Equal(1, scheduler.GetChannelTaskDelta(-1, coll)) + + // check task delta with nodeID=-1 and collectionID=-1 + suite.Equal(200, scheduler.GetSegmentTaskDelta(-1, -1)) + suite.Equal(2, scheduler.GetChannelTaskDelta(-1, -1)) + suite.Equal(200, scheduler.GetSegmentTaskDelta(-1, -1)) + suite.Equal(2, scheduler.GetChannelTaskDelta(-1, -1)) +} + func TestTask(t *testing.T) { suite.Run(t, new(TaskSuite)) } From 6fd0ad3830d02875fadee4f655bcada01b035cef Mon Sep 17 00:00:00 2001 From: Wei Liu Date: Wed, 25 Dec 2024 11:53:55 +0800 Subject: [PATCH 2/2] fix review comment Signed-off-by: Wei Liu --- internal/querycoordv2/task/scheduler.go | 45 ++++++++++++++----------- 1 file changed, 25 insertions(+), 20 deletions(-) diff --git a/internal/querycoordv2/task/scheduler.go b/internal/querycoordv2/task/scheduler.go index 0958b602cbae3..54945ed915edf 100644 --- a/internal/querycoordv2/task/scheduler.go +++ b/internal/querycoordv2/task/scheduler.go @@ -481,18 +481,15 @@ func (scheduler *taskScheduler) GetSegmentTaskDelta(nodeID, collectionID int64) scheduler.rwmutex.RLock() defer scheduler.rwmutex.RUnlock() - targetActions := make(map[int64][]Action, 0) - for _, t := range scheduler.segmentTasks { - if collectionID != -1 && collectionID != t.CollectionID() { + targetActions := make(map[int64][]Action) + for _, task := range scheduler.segmentTasks { // Map key: replicaSegmentIndex + taskCollID := task.CollectionID() + if collectionID != -1 && collectionID != taskCollID { continue } - for _, action := range t.Actions() { - if action.Node() == nodeID || nodeID == -1 { - if _, ok := targetActions[t.CollectionID()]; !ok { - targetActions[t.CollectionID()] = make([]Action, 0) - } - targetActions[t.CollectionID()] = append(targetActions[t.ReplicaID()], action) - } + actions := filterActions(task.Actions(), nodeID) + if len(actions) > 0 { + targetActions[taskCollID] = append(targetActions[taskCollID], actions...) } } @@ -503,24 +500,32 @@ func (scheduler *taskScheduler) GetChannelTaskDelta(nodeID, collectionID int64) scheduler.rwmutex.RLock() defer scheduler.rwmutex.RUnlock() - targetActions := make(map[int64][]Action, 0) - for _, t := range scheduler.channelTasks { - if collectionID != -1 && collectionID != t.CollectionID() { + targetActions := make(map[int64][]Action) + for _, task := range scheduler.channelTasks { // Map key: replicaChannelIndex + taskCollID := task.CollectionID() + if collectionID != -1 && collectionID != taskCollID { continue } - for _, action := range t.Actions() { - if action.Node() == nodeID || nodeID == -1 { - if _, ok := targetActions[t.CollectionID()]; !ok { - targetActions[t.CollectionID()] = make([]Action, 0) - } - targetActions[t.CollectionID()] = append(targetActions[t.ReplicaID()], action) - } + actions := filterActions(task.Actions(), nodeID) + if len(actions) > 0 { + targetActions[taskCollID] = append(targetActions[taskCollID], actions...) } } return scheduler.calculateTaskDelta(targetActions) } +// filter actions by nodeID +func filterActions(actions []Action, nodeID int64) []Action { + filtered := make([]Action, 0, len(actions)) + for _, action := range actions { + if nodeID == -1 || action.Node() == nodeID { + filtered = append(filtered, action) + } + } + return filtered +} + func (scheduler *taskScheduler) calculateTaskDelta(targetActions map[int64][]Action) int { sum := 0 for collectionID, actions := range targetActions {