fix: Prevent balancer from overloading the same QueryNode
The balancer tracks the workload of executing tasks as an ongoing score for each target node.
However, GetSegmentTaskDelta and GetChannelTaskDelta have a logic issue: when called
with collectionID=-1, they incorrectly return zero.

Because of this incorrect global score, the workload of executing tasks is not properly
reflected per collection. As a result, each collection submits its own balance task,
and the balancer ends up assigning excessive tasks to the same QueryNode.

Signed-off-by: Wei Liu <[email protected]>
weiliu1031 committed Dec 24, 2024
1 parent 90de37e commit 34026c1
Showing 2 changed files with 123 additions and 25 deletions.
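Before the diff, a minimal, self-contained sketch of the failure mode described in the commit message above. The action type and the rowCount lookup are simplified stand-ins, not the scheduler's real API: with a flat action list and a single collectionID of -1, every per-segment row lookup misses and the delta collapses to zero, whereas grouping actions under their own collection ID keeps the global score intact.

package main

import "fmt"

// action is a simplified stand-in for the scheduler's task actions.
type action struct {
	collectionID int64
	segmentID    int64
}

// rowCount mimics a target-manager lookup: rows are only found when the
// caller asks with the segment's actual collection ID.
func rowCount(collectionID, segmentID int64) int {
	rows := map[[2]int64]int{
		{1001, 1}: 100,
		{1005, 2}: 100,
	}
	return rows[[2]int64{collectionID, segmentID}]
}

// deltaFlat models the old shape: one flat slice plus a single collectionID.
// With collectionID=-1 every lookup misses, so the delta is always zero.
func deltaFlat(collectionID int64, actions []action) int {
	sum := 0
	for _, a := range actions {
		sum += rowCount(collectionID, a.segmentID)
	}
	return sum
}

// deltaGrouped models the new shape: actions grouped by their own collection,
// so each lookup uses the right collection ID and the global score survives.
func deltaGrouped(grouped map[int64][]action) int {
	sum := 0
	for collectionID, actions := range grouped {
		for _, a := range actions {
			sum += rowCount(collectionID, a.segmentID)
		}
	}
	return sum
}

func main() {
	actions := []action{{1001, 1}, {1005, 2}}
	fmt.Println(deltaFlat(-1, actions)) // 0: the pending workload is invisible to the balancer

	grouped := map[int64][]action{1001: {{1001, 1}}, 1005: {{1005, 2}}}
	fmt.Println(deltaGrouped(grouped)) // 200: rows are counted per collection
}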
58 changes: 33 additions & 25 deletions internal/querycoordv2/task/scheduler.go
@@ -487,61 +487,69 @@ func (scheduler *taskScheduler) GetSegmentTaskDelta(nodeID, collectionID int64)
 	scheduler.rwmutex.RLock()
 	defer scheduler.rwmutex.RUnlock()
 
-	targetActions := make([]Action, 0)
+	targetActions := make(map[int64][]Action, 0)
 	for _, t := range scheduler.segmentTasks {
 		if collectionID != -1 && collectionID != t.CollectionID() {
 			continue
 		}
 		for _, action := range t.Actions() {
-			if action.Node() == nodeID {
-				targetActions = append(targetActions, action)
+			if action.Node() == nodeID || nodeID == -1 {
+				if _, ok := targetActions[t.CollectionID()]; !ok {
+					targetActions[t.CollectionID()] = make([]Action, 0)
+				}
+				targetActions[t.CollectionID()] = append(targetActions[t.CollectionID()], action)
 			}
 		}
 	}
 
-	return scheduler.calculateTaskDelta(collectionID, targetActions)
+	return scheduler.calculateTaskDelta(targetActions)
 }
 
 func (scheduler *taskScheduler) GetChannelTaskDelta(nodeID, collectionID int64) int {
 	scheduler.rwmutex.RLock()
 	defer scheduler.rwmutex.RUnlock()
 
-	targetActions := make([]Action, 0)
+	targetActions := make(map[int64][]Action, 0)
 	for _, t := range scheduler.channelTasks {
 		if collectionID != -1 && collectionID != t.CollectionID() {
 			continue
 		}
 		for _, action := range t.Actions() {
-			if action.Node() == nodeID {
-				targetActions = append(targetActions, action)
+			if action.Node() == nodeID || nodeID == -1 {
+				if _, ok := targetActions[t.CollectionID()]; !ok {
+					targetActions[t.CollectionID()] = make([]Action, 0)
+				}
+				targetActions[t.CollectionID()] = append(targetActions[t.CollectionID()], action)
 			}
 		}
 	}
 
-	return scheduler.calculateTaskDelta(collectionID, targetActions)
+	return scheduler.calculateTaskDelta(targetActions)
 }
 
-func (scheduler *taskScheduler) calculateTaskDelta(collectionID int64, targetActions []Action) int {
+func (scheduler *taskScheduler) calculateTaskDelta(targetActions map[int64][]Action) int {
 	sum := 0
-	for _, action := range targetActions {
-		delta := 0
-		if action.Type() == ActionTypeGrow {
-			delta = 1
-		} else if action.Type() == ActionTypeReduce {
-			delta = -1
-		}
-
-		switch action := action.(type) {
-		case *SegmentAction:
-			// skip growing segment's count, cause doesn't know realtime row number of growing segment
-			if action.Scope == querypb.DataScope_Historical {
-				segment := scheduler.targetMgr.GetSealedSegment(scheduler.ctx, collectionID, action.SegmentID, meta.NextTargetFirst)
-				if segment != nil {
-					sum += int(segment.GetNumOfRows()) * delta
-				}
-			}
-		case *ChannelAction:
-			sum += delta
-		}
-	}
+	for collectionID, actions := range targetActions {
+		for _, action := range actions {
+			delta := 0
+			if action.Type() == ActionTypeGrow {
+				delta = 1
+			} else if action.Type() == ActionTypeReduce {
+				delta = -1
+			}
+
+			switch action := action.(type) {
+			case *SegmentAction:
+				// skip growing segment's count, cause doesn't know realtime row number of growing segment
+				if action.Scope == querypb.DataScope_Historical {
+					segment := scheduler.targetMgr.GetSealedSegment(scheduler.ctx, collectionID, action.SegmentID, meta.NextTargetFirst)
+					if segment != nil {
+						sum += int(segment.GetNumOfRows()) * delta
+					}
+				}
+			case *ChannelAction:
+				sum += delta
+			}
+		}
+	}
 	return sum
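A small aside on the map-of-slices idiom in the new code: the explicit make([]Action, 0) before the append is defensive but not strictly required, since appending through a missing key starts from a nil slice and append allocates one. A self-contained illustration (the key and the example strings are arbitrary, not scheduler values):

package main

import "fmt"

func main() {
	// Appending through a missing map key works: append grows the nil slice.
	byCollection := make(map[int64][]string)
	byCollection[1001] = append(byCollection[1001], "grow segment 1")
	byCollection[1001] = append(byCollection[1001], "grow channel dmc0")
	fmt.Println(byCollection) // map[1001:[grow segment 1 grow channel dmc0]]
}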
90 changes: 90 additions & 0 deletions internal/querycoordv2/task/task_test.go
@@ -1855,6 +1855,96 @@ func (suite *TaskSuite) TestGetTasksJSON() {
	suite.Equal(2, len(tasks))
}

func (suite *TaskSuite) TestCalculateTaskDelta() {
	ctx := context.Background()
	scheduler := suite.newScheduler()

	// every sealed-segment lookup is mocked to return a 100-row segment, so each
	// Grow segment action contributes 100 and each channel action contributes 1
	mockTarget := meta.NewMockTargetManager(suite.T())
	mockTarget.EXPECT().GetSealedSegment(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&datapb.SegmentInfo{
		NumOfRows: 100,
	})
	scheduler.targetMgr = mockTarget

	coll := int64(1001)
	nodeID := int64(1)
	channelName := "channel-1"
	segmentID := int64(1)
	// add segment task for collection
	task1, err := NewSegmentTask(
		ctx,
		10*time.Second,
		WrapIDSource(0),
		coll,
		suite.replica,
		NewSegmentActionWithScope(nodeID, ActionTypeGrow, "", segmentID, querypb.DataScope_Historical),
	)
	suite.NoError(err)
	err = scheduler.Add(task1)
	suite.NoError(err)
	task2, err := NewChannelTask(
		ctx,
		10*time.Second,
		WrapIDSource(0),
		coll,
		suite.replica,
		NewChannelAction(nodeID, ActionTypeGrow, channelName),
	)
	suite.NoError(err)
	err = scheduler.Add(task2)
	suite.NoError(err)

	coll2 := int64(1005)
	nodeID2 := int64(2)
	channelName2 := "channel-2"
	segmentID2 := int64(2)
	task3, err := NewSegmentTask(
		ctx,
		10*time.Second,
		WrapIDSource(0),
		coll2,
		suite.replica,
		NewSegmentActionWithScope(nodeID2, ActionTypeGrow, "", segmentID2, querypb.DataScope_Historical),
	)
	suite.NoError(err)
	err = scheduler.Add(task3)
	suite.NoError(err)
	task4, err := NewChannelTask(
		ctx,
		10*time.Second,
		WrapIDSource(0),
		coll2,
		suite.replica,
		NewChannelAction(nodeID2, ActionTypeGrow, channelName2),
	)
	suite.NoError(err)
	err = scheduler.Add(task4)
	suite.NoError(err)

	// check task delta with collectionID and nodeID
	suite.Equal(100, scheduler.GetSegmentTaskDelta(nodeID, coll))
	suite.Equal(1, scheduler.GetChannelTaskDelta(nodeID, coll))
	suite.Equal(100, scheduler.GetSegmentTaskDelta(nodeID2, coll2))
	suite.Equal(1, scheduler.GetChannelTaskDelta(nodeID2, coll2))

	// check task delta with collectionID=-1
	suite.Equal(100, scheduler.GetSegmentTaskDelta(nodeID, -1))
	suite.Equal(1, scheduler.GetChannelTaskDelta(nodeID, -1))
	suite.Equal(100, scheduler.GetSegmentTaskDelta(nodeID2, -1))
	suite.Equal(1, scheduler.GetChannelTaskDelta(nodeID2, -1))

	// check task delta with nodeID=-1
	suite.Equal(100, scheduler.GetSegmentTaskDelta(-1, coll))
	suite.Equal(1, scheduler.GetChannelTaskDelta(-1, coll))
	suite.Equal(100, scheduler.GetSegmentTaskDelta(-1, coll2))
	suite.Equal(1, scheduler.GetChannelTaskDelta(-1, coll2))

	// check task delta with nodeID=-1 and collectionID=-1
	suite.Equal(200, scheduler.GetSegmentTaskDelta(-1, -1))
	suite.Equal(2, scheduler.GetChannelTaskDelta(-1, -1))
	suite.Equal(200, scheduler.GetSegmentTaskDelta(-1, -1))
	suite.Equal(2, scheduler.GetChannelTaskDelta(-1, -1))
}

func TestTask(t *testing.T) {
	suite.Run(t, new(TaskSuite))
}
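To close the loop on the commit title, a hedged, self-contained sketch of why the corrected global delta matters on the balancer side. The fakeScheduler, the per-node row counts, and pickNode are illustrative assumptions rather than QueryCoord internals; only the GetSegmentTaskDelta(nodeID, -1) call shape comes from the code above.

// A toy model, not QueryCoord code: it only shows that a node-selection loop can
// avoid an already-loaded node when the pending (global) delta is reported correctly.
package main

import "fmt"

// deltaProvider mirrors the signature of GetSegmentTaskDelta shown in the diff above.
type deltaProvider interface {
	GetSegmentTaskDelta(nodeID, collectionID int64) int
}

// fakeScheduler maps nodeID -> pending segment rows across all collections.
type fakeScheduler map[int64]int

func (f fakeScheduler) GetSegmentTaskDelta(nodeID, collectionID int64) int {
	if collectionID != -1 {
		return 0 // per-collection detail is omitted in this sketch
	}
	return f[nodeID]
}

// pickNode chooses the node with the smallest current + pending workload. If the
// global delta were always zero (the bug), both nodes would look equally idle.
func pickNode(currentRows map[int64]int, s deltaProvider) int64 {
	best, bestScore := int64(-1), int(^uint(0)>>1)
	for node, rows := range currentRows {
		score := rows + s.GetSegmentTaskDelta(node, -1)
		if score < bestScore {
			best, bestScore = node, score
		}
	}
	return best
}

func main() {
	current := map[int64]int{1: 1000, 2: 1000} // rows already served per node (assumed)
	pending := fakeScheduler{1: 500, 2: 0}     // node 1 already has queued balance work
	fmt.Println(pickNode(current, pending))    // 2: new tasks avoid piling onto node 1
}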
