Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhance: Avoid assign too much segment/channels to new querynode #34096

Merged
merged 2 commits into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions internal/querycoordv2/balance/balance.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (b *RoundRobinBalancer) AssignSegment(collectionID int64, segments []*meta.
sort.Slice(nodesInfo, func(i, j int) bool {
cnt1, cnt2 := nodesInfo[i].SegmentCnt(), nodesInfo[j].SegmentCnt()
id1, id2 := nodesInfo[i].ID(), nodesInfo[j].ID()
delta1, delta2 := b.scheduler.GetNodeSegmentDelta(id1), b.scheduler.GetNodeSegmentDelta(id2)
delta1, delta2 := b.scheduler.GetSegmentTaskDelta(id1, -1), b.scheduler.GetSegmentTaskDelta(id2, -1)
return cnt1+delta1 < cnt2+delta2
})
ret := make([]SegmentAssignPlan, 0, len(segments))
Expand Down Expand Up @@ -112,7 +112,7 @@ func (b *RoundRobinBalancer) AssignChannel(channels []*meta.DmChannel, nodes []i
sort.Slice(nodesInfo, func(i, j int) bool {
cnt1, cnt2 := nodesInfo[i].ChannelCnt(), nodesInfo[j].ChannelCnt()
id1, id2 := nodesInfo[i].ID(), nodesInfo[j].ID()
delta1, delta2 := b.scheduler.GetNodeChannelDelta(id1), b.scheduler.GetNodeChannelDelta(id2)
delta1, delta2 := b.scheduler.GetChannelTaskDelta(id1, -1), b.scheduler.GetChannelTaskDelta(id2, -1)
return cnt1+delta1 < cnt2+delta2
})
ret := make([]ChannelAssignPlan, 0, len(channels))
Expand Down
10 changes: 8 additions & 2 deletions internal/querycoordv2/balance/balance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package balance
import (
"testing"

"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"

"github.com/milvus-io/milvus/internal/proto/datapb"
Expand All @@ -38,6 +39,9 @@ func (suite *BalanceTestSuite) SetupTest() {
nodeManager := session.NewNodeManager()
suite.mockScheduler = task.NewMockScheduler(suite.T())
suite.roundRobinBalancer = NewRoundRobinBalancer(suite.mockScheduler, nodeManager)

suite.mockScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
suite.mockScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
}

func (suite *BalanceTestSuite) TestAssignBalance() {
Expand Down Expand Up @@ -85,6 +89,7 @@ func (suite *BalanceTestSuite) TestAssignBalance() {
for _, c := range cases {
suite.Run(c.name, func() {
suite.SetupTest()
suite.mockScheduler.ExpectedCalls = nil
for i := range c.nodeIDs {
nodeInfo := session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: c.nodeIDs[i],
Expand All @@ -95,7 +100,7 @@ func (suite *BalanceTestSuite) TestAssignBalance() {
nodeInfo.SetState(c.states[i])
suite.roundRobinBalancer.nodeManager.Add(nodeInfo)
if !nodeInfo.IsStoppingState() {
suite.mockScheduler.EXPECT().GetNodeSegmentDelta(c.nodeIDs[i]).Return(c.deltaCnts[i])
suite.mockScheduler.EXPECT().GetSegmentTaskDelta(c.nodeIDs[i], int64(-1)).Return(c.deltaCnts[i])
}
}
plans := suite.roundRobinBalancer.AssignSegment(0, c.assignments, c.nodeIDs, false)
Expand Down Expand Up @@ -149,6 +154,7 @@ func (suite *BalanceTestSuite) TestAssignChannel() {
for _, c := range cases {
suite.Run(c.name, func() {
suite.SetupTest()
suite.mockScheduler.ExpectedCalls = nil
for i := range c.nodeIDs {
nodeInfo := session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: c.nodeIDs[i],
Expand All @@ -160,7 +166,7 @@ func (suite *BalanceTestSuite) TestAssignChannel() {
nodeInfo.SetState(c.states[i])
suite.roundRobinBalancer.nodeManager.Add(nodeInfo)
if !nodeInfo.IsStoppingState() {
suite.mockScheduler.EXPECT().GetNodeChannelDelta(c.nodeIDs[i]).Return(c.deltaCnts[i])
suite.mockScheduler.EXPECT().GetChannelTaskDelta(c.nodeIDs[i], int64(-1)).Return(c.deltaCnts[i])
}
}
plans := suite.roundRobinBalancer.AssignChannel(c.assignments, c.nodeIDs, false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ func (suite *ChannelLevelScoreBalancerTestSuite) SetupTest() {
distManager := meta.NewDistributionManager()
suite.mockScheduler = task.NewMockScheduler(suite.T())
suite.balancer = NewChannelLevelScoreBalancer(suite.mockScheduler, nodeManager, distManager, testMeta, testTarget)

suite.mockScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
suite.mockScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
}

func (suite *ChannelLevelScoreBalancerTestSuite) TearDownTest() {
Expand Down Expand Up @@ -685,11 +688,8 @@ func (suite *ChannelLevelScoreBalancerTestSuite) TestStoppedBalance() {
expectChannelPlans: []ChannelAssignPlan{},
},
}
for i, c := range cases {
for _, c := range cases {
suite.Run(c.name, func() {
if i == 0 {
suite.mockScheduler.Mock.On("GetNodeChannelDelta", mock.Anything).Return(0).Maybe()
}
suite.SetupSuite()
defer suite.TearDownTest()
balancer := suite.balancer
Expand Down
8 changes: 7 additions & 1 deletion internal/querycoordv2/balance/rowcount_based_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,9 @@ func (b *RowCountBasedBalancer) convertToNodeItemsBySegment(nodeIDs []int64) []*
rowcnt += int(view.NumOfGrowingRows)
}

// calculate executing task cost in scheduler
rowcnt += b.scheduler.GetSegmentTaskDelta(node, -1)

// more row count, less priority
nodeItem := newNodeItem(rowcnt, node)
ret = append(ret, &nodeItem)
Expand All @@ -152,8 +155,11 @@ func (b *RowCountBasedBalancer) convertToNodeItemsByChannel(nodeIDs []int64) []*
for _, node := range nodeIDs {
channels := b.dist.ChannelDistManager.GetByFilter(meta.WithNodeID2Channel(node))

channelCount := len(channels)
// calculate executing task cost in scheduler
channelCount += b.scheduler.GetChannelTaskDelta(node, -1)
// more channel num, less priority
nodeItem := newNodeItem(len(channels), node)
nodeItem := newNodeItem(channelCount, node)
ret = append(ret, &nodeItem)
}
return ret
Expand Down
7 changes: 3 additions & 4 deletions internal/querycoordv2/balance/rowcount_based_balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ func (suite *RowCountBasedBalancerTestSuite) SetupTest() {
suite.balancer = NewRowCountBasedBalancer(suite.mockScheduler, nodeManager, distManager, testMeta, testTarget)

suite.broker.EXPECT().GetPartitions(mock.Anything, int64(1)).Return([]int64{1}, nil).Maybe()

suite.mockScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
suite.mockScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
}

func (suite *RowCountBasedBalancerTestSuite) TearDownTest() {
Expand Down Expand Up @@ -461,7 +464,6 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalance() {
balancer.targetMgr.UpdateCollectionNextTarget(int64(1))
balancer.targetMgr.UpdateCollectionCurrentTarget(1)
balancer.targetMgr.UpdateCollectionNextTarget(int64(1))
suite.mockScheduler.Mock.On("GetNodeChannelDelta", mock.Anything).Return(0).Maybe()
for node, s := range c.distributions {
balancer.dist.SegmentDistManager.Update(node, s...)
}
Expand Down Expand Up @@ -675,7 +677,6 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalanceOnPartStopping() {
suite.broker.ExpectedCalls = nil
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(nil, c.segmentInNext, nil)
balancer.targetMgr.UpdateCollectionNextTarget(int64(1))
suite.mockScheduler.Mock.On("GetNodeChannelDelta", mock.Anything).Return(0).Maybe()
for node, s := range c.distributions {
balancer.dist.SegmentDistManager.Update(node, s...)
}
Expand Down Expand Up @@ -780,7 +781,6 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalanceOutboundNodes() {
},
}

suite.mockScheduler.Mock.On("GetNodeChannelDelta", mock.Anything).Return(0).Maybe()
for _, c := range cases {
suite.Run(c.name, func() {
suite.SetupSuite()
Expand Down Expand Up @@ -1052,7 +1052,6 @@ func (suite *RowCountBasedBalancerTestSuite) TestDisableBalanceChannel() {
balancer.targetMgr.UpdateCollectionNextTarget(int64(1))
balancer.targetMgr.UpdateCollectionCurrentTarget(1)
balancer.targetMgr.UpdateCollectionNextTarget(int64(1))
suite.mockScheduler.Mock.On("GetNodeChannelDelta", mock.Anything).Return(0).Maybe()
for node, s := range c.distributions {
balancer.dist.SegmentDistManager.Update(node, s...)
}
Expand Down
15 changes: 11 additions & 4 deletions internal/querycoordv2/balance/score_based_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,19 +150,22 @@ func (b *ScoreBasedBalancer) convertToNodeItems(collectionID int64, nodeIDs []in
}

func (b *ScoreBasedBalancer) calculateScore(collectionID, nodeID int64) int {
rowCount := 0
nodeRowCount := 0
// calculate global sealed segment row count
globalSegments := b.dist.SegmentDistManager.GetByFilter(meta.WithNodeID(nodeID))
for _, s := range globalSegments {
rowCount += int(s.GetNumOfRows())
nodeRowCount += int(s.GetNumOfRows())
}

// calculate global growing segment row count
views := b.dist.LeaderViewManager.GetByFilter(meta.WithNodeID2LeaderView(nodeID))
for _, view := range views {
rowCount += int(float64(view.NumOfGrowingRows) * params.Params.QueryCoordCfg.GrowingRowCountWeight.GetAsFloat())
nodeRowCount += int(float64(view.NumOfGrowingRows) * params.Params.QueryCoordCfg.GrowingRowCountWeight.GetAsFloat())
}

// calculate executing task cost in scheduler
nodeRowCount += b.scheduler.GetSegmentTaskDelta(nodeID, -1)

collectionRowCount := 0
// calculate collection sealed segment row count
collectionSegments := b.dist.SegmentDistManager.GetByFilter(meta.WithCollectionID(collectionID), meta.WithNodeID(nodeID))
Expand All @@ -175,7 +178,11 @@ func (b *ScoreBasedBalancer) calculateScore(collectionID, nodeID int64) int {
for _, view := range collectionViews {
collectionRowCount += int(float64(view.NumOfGrowingRows) * params.Params.QueryCoordCfg.GrowingRowCountWeight.GetAsFloat())
}
return collectionRowCount + int(float64(rowCount)*

// calculate executing task cost in scheduler
collectionRowCount += b.scheduler.GetSegmentTaskDelta(nodeID, collectionID)

return collectionRowCount + int(float64(nodeRowCount)*
params.Params.QueryCoordCfg.GlobalRowCountFactor.GetAsFloat())
}

Expand Down
105 changes: 101 additions & 4 deletions internal/querycoordv2/balance/score_based_balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ func (suite *ScoreBasedBalancerTestSuite) SetupTest() {
distManager := meta.NewDistributionManager()
suite.mockScheduler = task.NewMockScheduler(suite.T())
suite.balancer = NewScoreBasedBalancer(suite.mockScheduler, nodeManager, distManager, testMeta, testTarget)

suite.mockScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
suite.mockScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
}

func (suite *ScoreBasedBalancerTestSuite) TearDownTest() {
Expand Down Expand Up @@ -449,6 +452,103 @@ func (suite *ScoreBasedBalancerTestSuite) TestBalanceOneRound() {
}
}

func (suite *ScoreBasedBalancerTestSuite) TestBalanceWithExecutingTask() {
cases := []struct {
name string
nodes []int64
collectionID int64
replicaID int64
collectionsSegments []*datapb.SegmentInfo
states []session.State
shouldMock bool
distributions map[int64][]*meta.Segment
distributionChannels map[int64][]*meta.DmChannel
deltaCounts []int
expectPlans []SegmentAssignPlan
expectChannelPlans []ChannelAssignPlan
}{
{
name: "normal balance for one collection only",
nodes: []int64{1, 2, 3},
deltaCounts: []int{30, 0, 0},
collectionID: 1,
replicaID: 1,
collectionsSegments: []*datapb.SegmentInfo{
{ID: 1, PartitionID: 1}, {ID: 2, PartitionID: 1}, {ID: 3, PartitionID: 1},
},
states: []session.State{session.NodeStateNormal, session.NodeStateNormal, session.NodeStateNormal},
distributions: map[int64][]*meta.Segment{
1: {{SegmentInfo: &datapb.SegmentInfo{ID: 1, CollectionID: 1, NumOfRows: 10}, Node: 1}},
2: {{SegmentInfo: &datapb.SegmentInfo{ID: 2, CollectionID: 1, NumOfRows: 10}, Node: 2}},
3: {
{SegmentInfo: &datapb.SegmentInfo{ID: 3, CollectionID: 1, NumOfRows: 20}, Node: 3},
{SegmentInfo: &datapb.SegmentInfo{ID: 4, CollectionID: 1, NumOfRows: 30}, Node: 3},
},
},
expectPlans: []SegmentAssignPlan{
{Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 3, CollectionID: 1, NumOfRows: 20}, Node: 3}, From: 3, To: 2, Replica: newReplicaDefaultRG(1)},
},
expectChannelPlans: []ChannelAssignPlan{},
},
}

for _, c := range cases {
suite.Run(c.name, func() {
suite.SetupSuite()
defer suite.TearDownTest()
balancer := suite.balancer

// 1. set up target for multi collections
collection := utils.CreateTestCollection(c.collectionID, int32(c.replicaID))
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, c.collectionID).Return(
nil, c.collectionsSegments, nil)
suite.broker.EXPECT().GetPartitions(mock.Anything, c.collectionID).Return([]int64{c.collectionID}, nil).Maybe()
collection.LoadPercentage = 100
collection.Status = querypb.LoadStatus_Loaded
balancer.meta.CollectionManager.PutCollection(collection)
balancer.meta.CollectionManager.PutPartition(utils.CreateTestPartition(c.collectionID, c.collectionID))
balancer.meta.ReplicaManager.Put(utils.CreateTestReplica(c.replicaID, c.collectionID, c.nodes))
balancer.targetMgr.UpdateCollectionNextTarget(c.collectionID)
balancer.targetMgr.UpdateCollectionCurrentTarget(c.collectionID)
balancer.targetMgr.UpdateCollectionNextTarget(c.collectionID)

// 2. set up target for distribution for multi collections
for node, s := range c.distributions {
balancer.dist.SegmentDistManager.Update(node, s...)
}
for node, v := range c.distributionChannels {
balancer.dist.ChannelDistManager.Update(node, v...)
}

// 3. set up nodes info and resourceManager for balancer
for i := range c.nodes {
nodeInfo := session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: c.nodes[i],
Address: "127.0.0.1:0",
Hostname: "localhost",
})
nodeInfo.UpdateStats(session.WithChannelCnt(len(c.distributionChannels[c.nodes[i]])))
nodeInfo.SetState(c.states[i])
suite.balancer.nodeManager.Add(nodeInfo)
suite.balancer.meta.ResourceManager.HandleNodeUp(c.nodes[i])
}
utils.RecoverAllCollection(balancer.meta)

// set node delta count
suite.mockScheduler.ExpectedCalls = nil
for i, node := range c.nodes {
suite.mockScheduler.EXPECT().GetSegmentTaskDelta(node, int64(1)).Return(c.deltaCounts[i]).Maybe()
suite.mockScheduler.EXPECT().GetSegmentTaskDelta(node, int64(-1)).Return(c.deltaCounts[i]).Maybe()
}

// 4. balance and verify result
segmentPlans, channelPlans := suite.getCollectionBalancePlans(balancer, c.collectionID)
assertChannelAssignPlanElementMatch(&suite.Suite, c.expectChannelPlans, channelPlans)
assertSegmentAssignPlanElementMatch(&suite.Suite, c.expectPlans, segmentPlans)
})
}
}

func (suite *ScoreBasedBalancerTestSuite) TestBalanceMultiRound() {
balanceCase := struct {
name string
Expand Down Expand Up @@ -658,11 +758,8 @@ func (suite *ScoreBasedBalancerTestSuite) TestStoppedBalance() {
expectChannelPlans: []ChannelAssignPlan{},
},
}
for i, c := range cases {
for _, c := range cases {
suite.Run(c.name, func() {
if i == 0 {
suite.mockScheduler.Mock.On("GetNodeChannelDelta", mock.Anything).Return(0).Maybe()
}
suite.SetupSuite()
defer suite.TearDownTest()
balancer := suite.balancer
Expand Down
15 changes: 15 additions & 0 deletions internal/querycoordv2/ops_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ func (suite *OpsServiceSuite) SetupTest() {
suite.cluster = session.NewMockCluster(suite.T())
suite.jobScheduler = job.NewScheduler()
suite.taskScheduler = task.NewMockScheduler(suite.T())
suite.taskScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
suite.taskScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()

suite.jobScheduler.Start()
suite.balancer = balance.NewScoreBasedBalancer(
suite.taskScheduler,
Expand Down Expand Up @@ -609,6 +612,8 @@ func (suite *OpsServiceSuite) TestTransferSegment() {

// test copy mode, expect generate 1 load segment task
suite.taskScheduler.ExpectedCalls = nil
suite.taskScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
suite.taskScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
suite.taskScheduler.EXPECT().Add(mock.Anything).RunAndReturn(func(t task.Task) error {
actions := t.Actions()
suite.Equal(len(actions), 1)
Expand All @@ -626,6 +631,8 @@ func (suite *OpsServiceSuite) TestTransferSegment() {

// test transfer all segments, expect generate 4 load segment task
suite.taskScheduler.ExpectedCalls = nil
suite.taskScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
suite.taskScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
counter := atomic.NewInt64(0)
suite.taskScheduler.EXPECT().Add(mock.Anything).RunAndReturn(func(t task.Task) error {
actions := t.Actions()
Expand All @@ -645,6 +652,8 @@ func (suite *OpsServiceSuite) TestTransferSegment() {

// test transfer all segment to all nodes, expect generate 4 load segment task
suite.taskScheduler.ExpectedCalls = nil
suite.taskScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
suite.taskScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
counter = atomic.NewInt64(0)
nodeSet := typeutil.NewUniqueSet()
suite.taskScheduler.EXPECT().Add(mock.Anything).RunAndReturn(func(t task.Task) error {
Expand Down Expand Up @@ -827,6 +836,8 @@ func (suite *OpsServiceSuite) TestTransferChannel() {

// test copy mode, expect generate 1 load segment task
suite.taskScheduler.ExpectedCalls = nil
suite.taskScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
suite.taskScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
suite.taskScheduler.EXPECT().Add(mock.Anything).RunAndReturn(func(t task.Task) error {
actions := t.Actions()
suite.Equal(len(actions), 1)
Expand All @@ -844,6 +855,8 @@ func (suite *OpsServiceSuite) TestTransferChannel() {

// test transfer all channels, expect generate 4 load segment task
suite.taskScheduler.ExpectedCalls = nil
suite.taskScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
suite.taskScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
counter := atomic.NewInt64(0)
suite.taskScheduler.EXPECT().Add(mock.Anything).RunAndReturn(func(t task.Task) error {
actions := t.Actions()
Expand All @@ -863,6 +876,8 @@ func (suite *OpsServiceSuite) TestTransferChannel() {

// test transfer all channels to all nodes, expect generate 4 load segment task
suite.taskScheduler.ExpectedCalls = nil
suite.taskScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
suite.taskScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
counter = atomic.NewInt64(0)
nodeSet := typeutil.NewUniqueSet()
suite.taskScheduler.EXPECT().Add(mock.Anything).RunAndReturn(func(t task.Task) error {
Expand Down
Loading
Loading