Skip to content

Commit

Permalink
enhance: Enable score based balance channel policy (#38143)
Browse files Browse the repository at this point in the history
issue: #38142
current balance channel policy only consider current collection's
distribution, so if all collections has 1 channel, and all channels has
been loaded on same querynode, after querynode num increase, balance
channel won't be triggered.

This PR enable score based balance channel policy, to achieve:
1. distribute all channels evenly across multiple querynodes
2. distribute each collection's channel evenly across multiple
querynodes.

---------

Signed-off-by: Wei Liu <[email protected]>
  • Loading branch information
weiliu1031 authored Dec 11, 2024
1 parent 0d7a89a commit e279ccf
Show file tree
Hide file tree
Showing 15 changed files with 545 additions and 40 deletions.
3 changes: 3 additions & 0 deletions configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,9 @@ queryCoord:
rowCountFactor: 0.4 # the row count weight used when balancing segments among queryNodes
segmentCountFactor: 0.4 # the segment count weight used when balancing segments among queryNodes
globalSegmentCountFactor: 0.1 # the segment count weight used when balancing segments among queryNodes
# the channel count weight used when balancing channels among queryNodes,
# A higher value reduces the likelihood of assigning channels from the same collection to the same QueryNode. Set to 1 to disable this feature.
collectionChannelCountFactor: 10
segmentCountMaxSteps: 50 # segment count based plan generator max steps
rowCountMaxSteps: 50 # segment count based plan generator max steps
randomMaxSteps: 10 # segment count based plan generator max steps
Expand Down
4 changes: 2 additions & 2 deletions internal/querycoordv2/balance/balance.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (chanPlan *ChannelAssignPlan) String() string {

type Balance interface {
AssignSegment(ctx context.Context, collectionID int64, segments []*meta.Segment, nodes []int64, manualBalance bool) []SegmentAssignPlan
AssignChannel(ctx context.Context, channels []*meta.DmChannel, nodes []int64, manualBalance bool) []ChannelAssignPlan
AssignChannel(ctx context.Context, collectionID int64, channels []*meta.DmChannel, nodes []int64, manualBalance bool) []ChannelAssignPlan
BalanceReplica(ctx context.Context, replica *meta.Replica) ([]SegmentAssignPlan, []ChannelAssignPlan)
}

Expand Down Expand Up @@ -104,7 +104,7 @@ func (b *RoundRobinBalancer) AssignSegment(ctx context.Context, collectionID int
return ret
}

func (b *RoundRobinBalancer) AssignChannel(ctx context.Context, channels []*meta.DmChannel, nodes []int64, manualBalance bool) []ChannelAssignPlan {
func (b *RoundRobinBalancer) AssignChannel(ctx context.Context, collectionID int64, channels []*meta.DmChannel, nodes []int64, manualBalance bool) []ChannelAssignPlan {
// skip out suspend node and stopping node during assignment, but skip this check for manual balance
if !manualBalance {
versionRangeFilter := semver.MustParseRange(">2.3.x")
Expand Down
2 changes: 1 addition & 1 deletion internal/querycoordv2/balance/balance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func (suite *BalanceTestSuite) TestAssignChannel() {
suite.mockScheduler.EXPECT().GetChannelTaskDelta(c.nodeIDs[i], int64(-1)).Return(c.deltaCnts[i])
}
}
plans := suite.roundRobinBalancer.AssignChannel(ctx, c.assignments, c.nodeIDs, false)
plans := suite.roundRobinBalancer.AssignChannel(ctx, 1, c.assignments, c.nodeIDs, false)
suite.ElementsMatch(c.expectPlans, plans)
})
}
Expand Down
6 changes: 3 additions & 3 deletions internal/querycoordv2/balance/channel_level_score_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func (b *ChannelLevelScoreBalancer) genStoppingChannelPlan(ctx context.Context,
channelPlans := make([]ChannelAssignPlan, 0)
for _, nodeID := range offlineNodes {
dmChannels := b.dist.ChannelDistManager.GetByCollectionAndFilter(replica.GetCollectionID(), meta.WithNodeID2Channel(nodeID), meta.WithChannelName2Channel(channelName))
plans := b.AssignChannel(ctx, dmChannels, onlineNodes, false)
plans := b.AssignChannel(ctx, replica.GetCollectionID(), dmChannels, onlineNodes, false)
for i := range plans {
plans[i].From = nodeID
plans[i].Replica = replica
Expand Down Expand Up @@ -176,7 +176,7 @@ func (b *ChannelLevelScoreBalancer) genStoppingSegmentPlan(ctx context.Context,

func (b *ChannelLevelScoreBalancer) genSegmentPlan(ctx context.Context, br *balanceReport, replica *meta.Replica, channelName string, onlineNodes []int64) []SegmentAssignPlan {
segmentDist := make(map[int64][]*meta.Segment)
nodeItemsMap := b.convertToNodeItems(br, replica.GetCollectionID(), onlineNodes)
nodeItemsMap := b.convertToNodeItemsBySegment(br, replica.GetCollectionID(), onlineNodes)
if len(nodeItemsMap) == 0 {
return nil
}
Expand Down Expand Up @@ -262,7 +262,7 @@ func (b *ChannelLevelScoreBalancer) genChannelPlan(ctx context.Context, replica
return nil
}

channelPlans := b.AssignChannel(ctx, channelsToMove, nodeWithLessChannel, false)
channelPlans := b.AssignChannel(ctx, replica.GetCollectionID(), channelsToMove, nodeWithLessChannel, false)
for i := range channelPlans {
channelPlans[i].From = channelPlans[i].Channel.Node
channelPlans[i].Replica = replica
Expand Down
21 changes: 11 additions & 10 deletions internal/querycoordv2/balance/mock_balancer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions internal/querycoordv2/balance/rowcount_based_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (b *RowCountBasedBalancer) AssignSegment(ctx context.Context, collectionID

// AssignSegment, when row count based balancer assign segments, it will assign channel to node with least global channel count.
// try to make every query node has channel count
func (b *RowCountBasedBalancer) AssignChannel(ctx context.Context, channels []*meta.DmChannel, nodes []int64, manualBalance bool) []ChannelAssignPlan {
func (b *RowCountBasedBalancer) AssignChannel(ctx context.Context, collectionID int64, channels []*meta.DmChannel, nodes []int64, manualBalance bool) []ChannelAssignPlan {
// skip out suspend node and stopping node during assignment, but skip this check for manual balance
if !manualBalance {
versionRangeFilter := semver.MustParseRange(">2.3.x")
Expand Down Expand Up @@ -311,7 +311,7 @@ func (b *RowCountBasedBalancer) genStoppingChannelPlan(ctx context.Context, repl
channelPlans := make([]ChannelAssignPlan, 0)
for _, nodeID := range roNodes {
dmChannels := b.dist.ChannelDistManager.GetByCollectionAndFilter(replica.GetCollectionID(), meta.WithNodeID2Channel(nodeID))
plans := b.AssignChannel(ctx, dmChannels, rwNodes, false)
plans := b.AssignChannel(ctx, replica.GetCollectionID(), dmChannels, rwNodes, false)
for i := range plans {
plans[i].From = nodeID
plans[i].Replica = replica
Expand Down Expand Up @@ -349,7 +349,7 @@ func (b *RowCountBasedBalancer) genChannelPlan(ctx context.Context, br *balanceR
return nil
}

channelPlans := b.AssignChannel(ctx, channelsToMove, nodeWithLessChannel, false)
channelPlans := b.AssignChannel(ctx, replica.GetCollectionID(), channelsToMove, nodeWithLessChannel, false)
for i := range channelPlans {
channelPlans[i].From = channelPlans[i].Channel.Node
channelPlans[i].Replica = replica
Expand Down
Loading

0 comments on commit e279ccf

Please sign in to comment.