Skip to content

Commit

Permalink
enhance: Avoid assign too much segment/channels to new querynode
Browse files Browse the repository at this point in the history
When a new query node comes online, the segment_checker, channel_checker, and balance_checker simultaneously attempt to allocate segments to it. If this occurs during the execution of a load task and the distribution of the new query node hasn't been updated, the query coordinator may mistakenly view the new query node as empty. As a result, it assigns segments or channels to it, potentially overloading the new query node with more segments or channels than expected.

This PR measures the workload of the executing tasks on the target query node to prevent assigning an excessive number of segments to it.

Signed-off-by: Wei Liu <[email protected]>
  • Loading branch information
weiliu1031 committed Jun 25, 2024
1 parent 940a0ac commit cc7bb0a
Show file tree
Hide file tree
Showing 11 changed files with 302 additions and 130 deletions.
4 changes: 2 additions & 2 deletions internal/querycoordv2/balance/balance.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (b *RoundRobinBalancer) AssignSegment(collectionID int64, segments []*meta.
sort.Slice(nodesInfo, func(i, j int) bool {
cnt1, cnt2 := nodesInfo[i].SegmentCnt(), nodesInfo[j].SegmentCnt()
id1, id2 := nodesInfo[i].ID(), nodesInfo[j].ID()
delta1, delta2 := b.scheduler.GetNodeSegmentDelta(id1), b.scheduler.GetNodeSegmentDelta(id2)
delta1, delta2 := b.scheduler.GetSegmentTaskDelta(id1, -1), b.scheduler.GetSegmentTaskDelta(id2, -1)
return cnt1+delta1 < cnt2+delta2
})
ret := make([]SegmentAssignPlan, 0, len(segments))
Expand Down Expand Up @@ -112,7 +112,7 @@ func (b *RoundRobinBalancer) AssignChannel(channels []*meta.DmChannel, nodes []i
sort.Slice(nodesInfo, func(i, j int) bool {
cnt1, cnt2 := nodesInfo[i].ChannelCnt(), nodesInfo[j].ChannelCnt()
id1, id2 := nodesInfo[i].ID(), nodesInfo[j].ID()
delta1, delta2 := b.scheduler.GetNodeChannelDelta(id1), b.scheduler.GetNodeChannelDelta(id2)
delta1, delta2 := b.scheduler.GetChannelTaskDelta(id1, -1), b.scheduler.GetChannelTaskDelta(id2, -1)
return cnt1+delta1 < cnt2+delta2
})
ret := make([]ChannelAssignPlan, 0, len(channels))
Expand Down
10 changes: 8 additions & 2 deletions internal/querycoordv2/balance/balance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package balance
import (
"testing"

"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"

"github.com/milvus-io/milvus/internal/proto/datapb"
Expand All @@ -38,6 +39,9 @@ func (suite *BalanceTestSuite) SetupTest() {
nodeManager := session.NewNodeManager()
suite.mockScheduler = task.NewMockScheduler(suite.T())
suite.roundRobinBalancer = NewRoundRobinBalancer(suite.mockScheduler, nodeManager)

suite.mockScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
suite.mockScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
}

func (suite *BalanceTestSuite) TestAssignBalance() {
Expand Down Expand Up @@ -85,6 +89,7 @@ func (suite *BalanceTestSuite) TestAssignBalance() {
for _, c := range cases {
suite.Run(c.name, func() {
suite.SetupTest()
suite.mockScheduler.ExpectedCalls = nil
for i := range c.nodeIDs {
nodeInfo := session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: c.nodeIDs[i],
Expand All @@ -95,7 +100,7 @@ func (suite *BalanceTestSuite) TestAssignBalance() {
nodeInfo.SetState(c.states[i])
suite.roundRobinBalancer.nodeManager.Add(nodeInfo)
if !nodeInfo.IsStoppingState() {
suite.mockScheduler.EXPECT().GetNodeSegmentDelta(c.nodeIDs[i]).Return(c.deltaCnts[i])
suite.mockScheduler.EXPECT().GetSegmentTaskDelta(c.nodeIDs[i], int64(-1)).Return(c.deltaCnts[i])
}
}
plans := suite.roundRobinBalancer.AssignSegment(0, c.assignments, c.nodeIDs, false)
Expand Down Expand Up @@ -149,6 +154,7 @@ func (suite *BalanceTestSuite) TestAssignChannel() {
for _, c := range cases {
suite.Run(c.name, func() {
suite.SetupTest()
suite.mockScheduler.ExpectedCalls = nil
for i := range c.nodeIDs {
nodeInfo := session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: c.nodeIDs[i],
Expand All @@ -160,7 +166,7 @@ func (suite *BalanceTestSuite) TestAssignChannel() {
nodeInfo.SetState(c.states[i])
suite.roundRobinBalancer.nodeManager.Add(nodeInfo)
if !nodeInfo.IsStoppingState() {
suite.mockScheduler.EXPECT().GetNodeChannelDelta(c.nodeIDs[i]).Return(c.deltaCnts[i])
suite.mockScheduler.EXPECT().GetChannelTaskDelta(c.nodeIDs[i], int64(-1)).Return(c.deltaCnts[i])
}
}
plans := suite.roundRobinBalancer.AssignChannel(c.assignments, c.nodeIDs, false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ func (suite *ChannelLevelScoreBalancerTestSuite) SetupTest() {
distManager := meta.NewDistributionManager()
suite.mockScheduler = task.NewMockScheduler(suite.T())
suite.balancer = NewChannelLevelScoreBalancer(suite.mockScheduler, nodeManager, distManager, testMeta, testTarget)

suite.mockScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
suite.mockScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
}

func (suite *ChannelLevelScoreBalancerTestSuite) TearDownTest() {
Expand Down Expand Up @@ -685,11 +688,8 @@ func (suite *ChannelLevelScoreBalancerTestSuite) TestStoppedBalance() {
expectChannelPlans: []ChannelAssignPlan{},
},
}
for i, c := range cases {
for _, c := range cases {
suite.Run(c.name, func() {
if i == 0 {
suite.mockScheduler.Mock.On("GetNodeChannelDelta", mock.Anything).Return(0).Maybe()
}
suite.SetupSuite()
defer suite.TearDownTest()
balancer := suite.balancer
Expand Down
8 changes: 7 additions & 1 deletion internal/querycoordv2/balance/rowcount_based_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,9 @@ func (b *RowCountBasedBalancer) convertToNodeItemsBySegment(nodeIDs []int64) []*
rowcnt += int(view.NumOfGrowingRows)
}

// calculate executing task cost in scheduler
rowcnt += b.scheduler.GetSegmentTaskDelta(node, -1)

// more row count, less priority
nodeItem := newNodeItem(rowcnt, node)
ret = append(ret, &nodeItem)
Expand All @@ -152,8 +155,11 @@ func (b *RowCountBasedBalancer) convertToNodeItemsByChannel(nodeIDs []int64) []*
for _, node := range nodeIDs {
channels := b.dist.ChannelDistManager.GetByFilter(meta.WithNodeID2Channel(node))

channelCount := len(channels)
// calculate executing task cost in scheduler
channelCount += b.scheduler.GetChannelTaskDelta(node, -1)
// more channel num, less priority
nodeItem := newNodeItem(len(channels), node)
nodeItem := newNodeItem(channelCount, node)
ret = append(ret, &nodeItem)
}
return ret
Expand Down
7 changes: 3 additions & 4 deletions internal/querycoordv2/balance/rowcount_based_balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ func (suite *RowCountBasedBalancerTestSuite) SetupTest() {
suite.balancer = NewRowCountBasedBalancer(suite.mockScheduler, nodeManager, distManager, testMeta, testTarget)

suite.broker.EXPECT().GetPartitions(mock.Anything, int64(1)).Return([]int64{1}, nil).Maybe()

suite.mockScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
suite.mockScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
}

func (suite *RowCountBasedBalancerTestSuite) TearDownTest() {
Expand Down Expand Up @@ -461,7 +464,6 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalance() {
balancer.targetMgr.UpdateCollectionNextTarget(int64(1))
balancer.targetMgr.UpdateCollectionCurrentTarget(1)
balancer.targetMgr.UpdateCollectionNextTarget(int64(1))
suite.mockScheduler.Mock.On("GetNodeChannelDelta", mock.Anything).Return(0).Maybe()
for node, s := range c.distributions {
balancer.dist.SegmentDistManager.Update(node, s...)
}
Expand Down Expand Up @@ -675,7 +677,6 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalanceOnPartStopping() {
suite.broker.ExpectedCalls = nil
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(nil, c.segmentInNext, nil)
balancer.targetMgr.UpdateCollectionNextTarget(int64(1))
suite.mockScheduler.Mock.On("GetNodeChannelDelta", mock.Anything).Return(0).Maybe()
for node, s := range c.distributions {
balancer.dist.SegmentDistManager.Update(node, s...)
}
Expand Down Expand Up @@ -780,7 +781,6 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalanceOutboundNodes() {
},
}

suite.mockScheduler.Mock.On("GetNodeChannelDelta", mock.Anything).Return(0).Maybe()
for _, c := range cases {
suite.Run(c.name, func() {
suite.SetupSuite()
Expand Down Expand Up @@ -1052,7 +1052,6 @@ func (suite *RowCountBasedBalancerTestSuite) TestDisableBalanceChannel() {
balancer.targetMgr.UpdateCollectionNextTarget(int64(1))
balancer.targetMgr.UpdateCollectionCurrentTarget(1)
balancer.targetMgr.UpdateCollectionNextTarget(int64(1))
suite.mockScheduler.Mock.On("GetNodeChannelDelta", mock.Anything).Return(0).Maybe()
for node, s := range c.distributions {
balancer.dist.SegmentDistManager.Update(node, s...)
}
Expand Down
15 changes: 11 additions & 4 deletions internal/querycoordv2/balance/score_based_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,19 +150,22 @@ func (b *ScoreBasedBalancer) convertToNodeItems(collectionID int64, nodeIDs []in
}

func (b *ScoreBasedBalancer) calculateScore(collectionID, nodeID int64) int {
rowCount := 0
nodeRowCount := 0
// calculate global sealed segment row count
globalSegments := b.dist.SegmentDistManager.GetByFilter(meta.WithNodeID(nodeID))
for _, s := range globalSegments {
rowCount += int(s.GetNumOfRows())
nodeRowCount += int(s.GetNumOfRows())
}

// calculate global growing segment row count
views := b.dist.LeaderViewManager.GetByFilter(meta.WithNodeID2LeaderView(nodeID))
for _, view := range views {
rowCount += int(float64(view.NumOfGrowingRows) * params.Params.QueryCoordCfg.GrowingRowCountWeight.GetAsFloat())
nodeRowCount += int(float64(view.NumOfGrowingRows) * params.Params.QueryCoordCfg.GrowingRowCountWeight.GetAsFloat())
}

// calculate executing task cost in scheduler
nodeRowCount += b.scheduler.GetSegmentTaskDelta(nodeID, -1)

collectionRowCount := 0
// calculate collection sealed segment row count
collectionSegments := b.dist.SegmentDistManager.GetByFilter(meta.WithCollectionID(collectionID), meta.WithNodeID(nodeID))
Expand All @@ -175,7 +178,11 @@ func (b *ScoreBasedBalancer) calculateScore(collectionID, nodeID int64) int {
for _, view := range collectionViews {
collectionRowCount += int(float64(view.NumOfGrowingRows) * params.Params.QueryCoordCfg.GrowingRowCountWeight.GetAsFloat())
}
return collectionRowCount + int(float64(rowCount)*

// calculate executing task cost in scheduler
collectionRowCount += b.scheduler.GetSegmentTaskDelta(nodeID, collectionID)

return collectionRowCount + int(float64(nodeRowCount)*
params.Params.QueryCoordCfg.GlobalRowCountFactor.GetAsFloat())
}

Expand Down
105 changes: 101 additions & 4 deletions internal/querycoordv2/balance/score_based_balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ func (suite *ScoreBasedBalancerTestSuite) SetupTest() {
distManager := meta.NewDistributionManager()
suite.mockScheduler = task.NewMockScheduler(suite.T())
suite.balancer = NewScoreBasedBalancer(suite.mockScheduler, nodeManager, distManager, testMeta, testTarget)

suite.mockScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
suite.mockScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
}

func (suite *ScoreBasedBalancerTestSuite) TearDownTest() {
Expand Down Expand Up @@ -449,6 +452,103 @@ func (suite *ScoreBasedBalancerTestSuite) TestBalanceOneRound() {
}
}

func (suite *ScoreBasedBalancerTestSuite) TestBalanceWithExecutingTask() {
cases := []struct {
name string
nodes []int64
collectionID int64
replicaID int64
collectionsSegments []*datapb.SegmentInfo
states []session.State
shouldMock bool
distributions map[int64][]*meta.Segment
distributionChannels map[int64][]*meta.DmChannel
deltaCounts []int
expectPlans []SegmentAssignPlan
expectChannelPlans []ChannelAssignPlan
}{
{
name: "normal balance for one collection only",
nodes: []int64{1, 2, 3},
deltaCounts: []int{30, 0, 0},
collectionID: 1,
replicaID: 1,
collectionsSegments: []*datapb.SegmentInfo{
{ID: 1, PartitionID: 1}, {ID: 2, PartitionID: 1}, {ID: 3, PartitionID: 1},
},
states: []session.State{session.NodeStateNormal, session.NodeStateNormal, session.NodeStateNormal},
distributions: map[int64][]*meta.Segment{
1: {{SegmentInfo: &datapb.SegmentInfo{ID: 1, CollectionID: 1, NumOfRows: 10}, Node: 1}},
2: {{SegmentInfo: &datapb.SegmentInfo{ID: 2, CollectionID: 1, NumOfRows: 10}, Node: 2}},
3: {
{SegmentInfo: &datapb.SegmentInfo{ID: 3, CollectionID: 1, NumOfRows: 20}, Node: 3},
{SegmentInfo: &datapb.SegmentInfo{ID: 4, CollectionID: 1, NumOfRows: 30}, Node: 3},
},
},
expectPlans: []SegmentAssignPlan{
{Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 3, CollectionID: 1, NumOfRows: 20}, Node: 3}, From: 3, To: 2, Replica: newReplicaDefaultRG(1)},
},
expectChannelPlans: []ChannelAssignPlan{},
},
}

for _, c := range cases {
suite.Run(c.name, func() {
suite.SetupSuite()
defer suite.TearDownTest()
balancer := suite.balancer

// 1. set up target for multi collections
collection := utils.CreateTestCollection(c.collectionID, int32(c.replicaID))
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, c.collectionID).Return(
nil, c.collectionsSegments, nil)
suite.broker.EXPECT().GetPartitions(mock.Anything, c.collectionID).Return([]int64{c.collectionID}, nil).Maybe()
collection.LoadPercentage = 100
collection.Status = querypb.LoadStatus_Loaded
balancer.meta.CollectionManager.PutCollection(collection)
balancer.meta.CollectionManager.PutPartition(utils.CreateTestPartition(c.collectionID, c.collectionID))
balancer.meta.ReplicaManager.Put(utils.CreateTestReplica(c.replicaID, c.collectionID, c.nodes))
balancer.targetMgr.UpdateCollectionNextTarget(c.collectionID)
balancer.targetMgr.UpdateCollectionCurrentTarget(c.collectionID)
balancer.targetMgr.UpdateCollectionNextTarget(c.collectionID)

// 2. set up target for distribution for multi collections
for node, s := range c.distributions {
balancer.dist.SegmentDistManager.Update(node, s...)
}
for node, v := range c.distributionChannels {
balancer.dist.ChannelDistManager.Update(node, v...)
}

// 3. set up nodes info and resourceManager for balancer
for i := range c.nodes {
nodeInfo := session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: c.nodes[i],
Address: "127.0.0.1:0",
Hostname: "localhost",
})
nodeInfo.UpdateStats(session.WithChannelCnt(len(c.distributionChannels[c.nodes[i]])))
nodeInfo.SetState(c.states[i])
suite.balancer.nodeManager.Add(nodeInfo)
suite.balancer.meta.ResourceManager.HandleNodeUp(c.nodes[i])
}
utils.RecoverAllCollection(balancer.meta)

// set node delta count
suite.mockScheduler.ExpectedCalls = nil
for i, node := range c.nodes {
suite.mockScheduler.EXPECT().GetSegmentTaskDelta(node, int64(1)).Return(c.deltaCounts[i]).Maybe()
suite.mockScheduler.EXPECT().GetSegmentTaskDelta(node, int64(-1)).Return(c.deltaCounts[i]).Maybe()
}

// 4. balance and verify result
segmentPlans, channelPlans := suite.getCollectionBalancePlans(balancer, c.collectionID)
assertChannelAssignPlanElementMatch(&suite.Suite, c.expectChannelPlans, channelPlans)
assertSegmentAssignPlanElementMatch(&suite.Suite, c.expectPlans, segmentPlans)
})
}
}

func (suite *ScoreBasedBalancerTestSuite) TestBalanceMultiRound() {
balanceCase := struct {
name string
Expand Down Expand Up @@ -658,11 +758,8 @@ func (suite *ScoreBasedBalancerTestSuite) TestStoppedBalance() {
expectChannelPlans: []ChannelAssignPlan{},
},
}
for i, c := range cases {
for _, c := range cases {
suite.Run(c.name, func() {
if i == 0 {
suite.mockScheduler.Mock.On("GetNodeChannelDelta", mock.Anything).Return(0).Maybe()
}
suite.SetupSuite()
defer suite.TearDownTest()
balancer := suite.balancer
Expand Down
15 changes: 15 additions & 0 deletions internal/querycoordv2/ops_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ func (suite *OpsServiceSuite) SetupTest() {
suite.cluster = session.NewMockCluster(suite.T())
suite.jobScheduler = job.NewScheduler()
suite.taskScheduler = task.NewMockScheduler(suite.T())
suite.taskScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
suite.taskScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()

suite.jobScheduler.Start()
suite.balancer = balance.NewScoreBasedBalancer(
suite.taskScheduler,
Expand Down Expand Up @@ -609,6 +612,8 @@ func (suite *OpsServiceSuite) TestTransferSegment() {

// test copy mode, expect generate 1 load segment task
suite.taskScheduler.ExpectedCalls = nil
suite.taskScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
suite.taskScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
suite.taskScheduler.EXPECT().Add(mock.Anything).RunAndReturn(func(t task.Task) error {
actions := t.Actions()
suite.Equal(len(actions), 1)
Expand All @@ -626,6 +631,8 @@ func (suite *OpsServiceSuite) TestTransferSegment() {

// test transfer all segments, expect generate 4 load segment task
suite.taskScheduler.ExpectedCalls = nil
suite.taskScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
suite.taskScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
counter := atomic.NewInt64(0)
suite.taskScheduler.EXPECT().Add(mock.Anything).RunAndReturn(func(t task.Task) error {
actions := t.Actions()
Expand All @@ -645,6 +652,8 @@ func (suite *OpsServiceSuite) TestTransferSegment() {

// test transfer all segment to all nodes, expect generate 4 load segment task
suite.taskScheduler.ExpectedCalls = nil
suite.taskScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
suite.taskScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
counter = atomic.NewInt64(0)
nodeSet := typeutil.NewUniqueSet()
suite.taskScheduler.EXPECT().Add(mock.Anything).RunAndReturn(func(t task.Task) error {
Expand Down Expand Up @@ -827,6 +836,8 @@ func (suite *OpsServiceSuite) TestTransferChannel() {

// test copy mode, expect generate 1 load segment task
suite.taskScheduler.ExpectedCalls = nil
suite.taskScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
suite.taskScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
suite.taskScheduler.EXPECT().Add(mock.Anything).RunAndReturn(func(t task.Task) error {
actions := t.Actions()
suite.Equal(len(actions), 1)
Expand All @@ -844,6 +855,8 @@ func (suite *OpsServiceSuite) TestTransferChannel() {

// test transfer all channels, expect generate 4 load segment task
suite.taskScheduler.ExpectedCalls = nil
suite.taskScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
suite.taskScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
counter := atomic.NewInt64(0)
suite.taskScheduler.EXPECT().Add(mock.Anything).RunAndReturn(func(t task.Task) error {
actions := t.Actions()
Expand All @@ -863,6 +876,8 @@ func (suite *OpsServiceSuite) TestTransferChannel() {

// test transfer all channels to all nodes, expect generate 4 load segment task
suite.taskScheduler.ExpectedCalls = nil
suite.taskScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
suite.taskScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
counter = atomic.NewInt64(0)
nodeSet := typeutil.NewUniqueSet()
suite.taskScheduler.EXPECT().Add(mock.Anything).RunAndReturn(func(t task.Task) error {
Expand Down
Loading

0 comments on commit cc7bb0a

Please sign in to comment.