diff --git a/configs/milvus.yaml b/configs/milvus.yaml index b6107345f0c98..9aaa85694ddec 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -347,6 +347,9 @@ queryCoord: rowCountFactor: 0.4 # the row count weight used when balancing segments among queryNodes segmentCountFactor: 0.4 # the segment count weight used when balancing segments among queryNodes globalSegmentCountFactor: 0.1 # the segment count weight used when balancing segments among queryNodes + # the channel count weight used when balancing channels among queryNodes, + # A higher value reduces the likelihood of assigning channels from the same collection to the same QueryNode. Set to 1 to disable this feature. + collectionChannelCountFactor: 10 segmentCountMaxSteps: 50 # segment count based plan generator max steps rowCountMaxSteps: 50 # segment count based plan generator max steps randomMaxSteps: 10 # segment count based plan generator max steps diff --git a/internal/querycoordv2/balance/balance.go b/internal/querycoordv2/balance/balance.go index 2c1b3ffe97d26..04a9494141a79 100644 --- a/internal/querycoordv2/balance/balance.go +++ b/internal/querycoordv2/balance/balance.go @@ -59,7 +59,7 @@ func (chanPlan *ChannelAssignPlan) String() string { type Balance interface { AssignSegment(ctx context.Context, collectionID int64, segments []*meta.Segment, nodes []int64, manualBalance bool) []SegmentAssignPlan - AssignChannel(ctx context.Context, channels []*meta.DmChannel, nodes []int64, manualBalance bool) []ChannelAssignPlan + AssignChannel(ctx context.Context, collectionID int64, channels []*meta.DmChannel, nodes []int64, manualBalance bool) []ChannelAssignPlan BalanceReplica(ctx context.Context, replica *meta.Replica) ([]SegmentAssignPlan, []ChannelAssignPlan) } @@ -104,7 +104,7 @@ func (b *RoundRobinBalancer) AssignSegment(ctx context.Context, collectionID int return ret } -func (b *RoundRobinBalancer) AssignChannel(ctx context.Context, channels []*meta.DmChannel, nodes []int64, manualBalance bool) []ChannelAssignPlan { +func (b *RoundRobinBalancer) AssignChannel(ctx context.Context, collectionID int64, channels []*meta.DmChannel, nodes []int64, manualBalance bool) []ChannelAssignPlan { // skip out suspend node and stopping node during assignment, but skip this check for manual balance if !manualBalance { versionRangeFilter := semver.MustParseRange(">2.3.x") diff --git a/internal/querycoordv2/balance/balance_test.go b/internal/querycoordv2/balance/balance_test.go index c76cf019112c3..0aaf5f3bb0706 100644 --- a/internal/querycoordv2/balance/balance_test.go +++ b/internal/querycoordv2/balance/balance_test.go @@ -177,7 +177,7 @@ func (suite *BalanceTestSuite) TestAssignChannel() { suite.mockScheduler.EXPECT().GetChannelTaskDelta(c.nodeIDs[i], int64(-1)).Return(c.deltaCnts[i]) } } - plans := suite.roundRobinBalancer.AssignChannel(ctx, c.assignments, c.nodeIDs, false) + plans := suite.roundRobinBalancer.AssignChannel(ctx, 1, c.assignments, c.nodeIDs, false) suite.ElementsMatch(c.expectPlans, plans) }) } diff --git a/internal/querycoordv2/balance/channel_level_score_balancer.go b/internal/querycoordv2/balance/channel_level_score_balancer.go index fb4afd1521fab..92d12f0b1f20f 100644 --- a/internal/querycoordv2/balance/channel_level_score_balancer.go +++ b/internal/querycoordv2/balance/channel_level_score_balancer.go @@ -147,7 +147,7 @@ func (b *ChannelLevelScoreBalancer) genStoppingChannelPlan(ctx context.Context, channelPlans := make([]ChannelAssignPlan, 0) for _, nodeID := range offlineNodes { dmChannels := 
b.dist.ChannelDistManager.GetByCollectionAndFilter(replica.GetCollectionID(), meta.WithNodeID2Channel(nodeID), meta.WithChannelName2Channel(channelName)) - plans := b.AssignChannel(ctx, dmChannels, onlineNodes, false) + plans := b.AssignChannel(ctx, replica.GetCollectionID(), dmChannels, onlineNodes, false) for i := range plans { plans[i].From = nodeID plans[i].Replica = replica @@ -176,7 +176,7 @@ func (b *ChannelLevelScoreBalancer) genStoppingSegmentPlan(ctx context.Context, func (b *ChannelLevelScoreBalancer) genSegmentPlan(ctx context.Context, br *balanceReport, replica *meta.Replica, channelName string, onlineNodes []int64) []SegmentAssignPlan { segmentDist := make(map[int64][]*meta.Segment) - nodeItemsMap := b.convertToNodeItems(br, replica.GetCollectionID(), onlineNodes) + nodeItemsMap := b.convertToNodeItemsBySegment(br, replica.GetCollectionID(), onlineNodes) if len(nodeItemsMap) == 0 { return nil } @@ -262,7 +262,7 @@ func (b *ChannelLevelScoreBalancer) genChannelPlan(ctx context.Context, replica return nil } - channelPlans := b.AssignChannel(ctx, channelsToMove, nodeWithLessChannel, false) + channelPlans := b.AssignChannel(ctx, replica.GetCollectionID(), channelsToMove, nodeWithLessChannel, false) for i := range channelPlans { channelPlans[i].From = channelPlans[i].Channel.Node channelPlans[i].Replica = replica diff --git a/internal/querycoordv2/balance/mock_balancer.go b/internal/querycoordv2/balance/mock_balancer.go index 3ba72b89fb729..5935b3c36c15a 100644 --- a/internal/querycoordv2/balance/mock_balancer.go +++ b/internal/querycoordv2/balance/mock_balancer.go @@ -22,17 +22,17 @@ func (_m *MockBalancer) EXPECT() *MockBalancer_Expecter { return &MockBalancer_Expecter{mock: &_m.Mock} } -// AssignChannel provides a mock function with given fields: ctx, channels, nodes, manualBalance -func (_m *MockBalancer) AssignChannel(ctx context.Context, channels []*meta.DmChannel, nodes []int64, manualBalance bool) []ChannelAssignPlan { - ret := _m.Called(ctx, channels, nodes, manualBalance) +// AssignChannel provides a mock function with given fields: ctx, collectionID, channels, nodes, manualBalance +func (_m *MockBalancer) AssignChannel(ctx context.Context, collectionID int64, channels []*meta.DmChannel, nodes []int64, manualBalance bool) []ChannelAssignPlan { + ret := _m.Called(ctx, collectionID, channels, nodes, manualBalance) if len(ret) == 0 { panic("no return value specified for AssignChannel") } var r0 []ChannelAssignPlan - if rf, ok := ret.Get(0).(func(context.Context, []*meta.DmChannel, []int64, bool) []ChannelAssignPlan); ok { - r0 = rf(ctx, channels, nodes, manualBalance) + if rf, ok := ret.Get(0).(func(context.Context, int64, []*meta.DmChannel, []int64, bool) []ChannelAssignPlan); ok { + r0 = rf(ctx, collectionID, channels, nodes, manualBalance) } else { if ret.Get(0) != nil { r0 = ret.Get(0).([]ChannelAssignPlan) @@ -49,16 +49,17 @@ type MockBalancer_AssignChannel_Call struct { // AssignChannel is a helper method to define mock.On call // - ctx context.Context +// - collectionID int64 // - channels []*meta.DmChannel // - nodes []int64 // - manualBalance bool -func (_e *MockBalancer_Expecter) AssignChannel(ctx interface{}, channels interface{}, nodes interface{}, manualBalance interface{}) *MockBalancer_AssignChannel_Call { - return &MockBalancer_AssignChannel_Call{Call: _e.mock.On("AssignChannel", ctx, channels, nodes, manualBalance)} +func (_e *MockBalancer_Expecter) AssignChannel(ctx interface{}, collectionID interface{}, channels interface{}, nodes interface{}, 
manualBalance interface{}) *MockBalancer_AssignChannel_Call { + return &MockBalancer_AssignChannel_Call{Call: _e.mock.On("AssignChannel", ctx, collectionID, channels, nodes, manualBalance)} } -func (_c *MockBalancer_AssignChannel_Call) Run(run func(ctx context.Context, channels []*meta.DmChannel, nodes []int64, manualBalance bool)) *MockBalancer_AssignChannel_Call { +func (_c *MockBalancer_AssignChannel_Call) Run(run func(ctx context.Context, collectionID int64, channels []*meta.DmChannel, nodes []int64, manualBalance bool)) *MockBalancer_AssignChannel_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].([]*meta.DmChannel), args[2].([]int64), args[3].(bool)) + run(args[0].(context.Context), args[1].(int64), args[2].([]*meta.DmChannel), args[3].([]int64), args[4].(bool)) }) return _c } @@ -68,7 +69,7 @@ func (_c *MockBalancer_AssignChannel_Call) Return(_a0 []ChannelAssignPlan) *Mock return _c } -func (_c *MockBalancer_AssignChannel_Call) RunAndReturn(run func(context.Context, []*meta.DmChannel, []int64, bool) []ChannelAssignPlan) *MockBalancer_AssignChannel_Call { +func (_c *MockBalancer_AssignChannel_Call) RunAndReturn(run func(context.Context, int64, []*meta.DmChannel, []int64, bool) []ChannelAssignPlan) *MockBalancer_AssignChannel_Call { _c.Call.Return(run) return _c } diff --git a/internal/querycoordv2/balance/rowcount_based_balancer.go b/internal/querycoordv2/balance/rowcount_based_balancer.go index 60d36607136d3..0323bbb81b912 100644 --- a/internal/querycoordv2/balance/rowcount_based_balancer.go +++ b/internal/querycoordv2/balance/rowcount_based_balancer.go @@ -87,7 +87,7 @@ func (b *RowCountBasedBalancer) AssignSegment(ctx context.Context, collectionID // AssignSegment, when row count based balancer assign segments, it will assign channel to node with least global channel count. 
// try to make every query node has channel count -func (b *RowCountBasedBalancer) AssignChannel(ctx context.Context, channels []*meta.DmChannel, nodes []int64, manualBalance bool) []ChannelAssignPlan { +func (b *RowCountBasedBalancer) AssignChannel(ctx context.Context, collectionID int64, channels []*meta.DmChannel, nodes []int64, manualBalance bool) []ChannelAssignPlan { // skip out suspend node and stopping node during assignment, but skip this check for manual balance if !manualBalance { versionRangeFilter := semver.MustParseRange(">2.3.x") @@ -311,7 +311,7 @@ func (b *RowCountBasedBalancer) genStoppingChannelPlan(ctx context.Context, repl channelPlans := make([]ChannelAssignPlan, 0) for _, nodeID := range roNodes { dmChannels := b.dist.ChannelDistManager.GetByCollectionAndFilter(replica.GetCollectionID(), meta.WithNodeID2Channel(nodeID)) - plans := b.AssignChannel(ctx, dmChannels, rwNodes, false) + plans := b.AssignChannel(ctx, replica.GetCollectionID(), dmChannels, rwNodes, false) for i := range plans { plans[i].From = nodeID plans[i].Replica = replica @@ -349,7 +349,7 @@ func (b *RowCountBasedBalancer) genChannelPlan(ctx context.Context, br *balanceR return nil } - channelPlans := b.AssignChannel(ctx, channelsToMove, nodeWithLessChannel, false) + channelPlans := b.AssignChannel(ctx, replica.GetCollectionID(), channelsToMove, nodeWithLessChannel, false) for i := range channelPlans { channelPlans[i].From = channelPlans[i].Channel.Node channelPlans[i].Replica = replica diff --git a/internal/querycoordv2/balance/score_based_balancer.go b/internal/querycoordv2/balance/score_based_balancer.go index af1092d016273..84fd81ada0273 100644 --- a/internal/querycoordv2/balance/score_based_balancer.go +++ b/internal/querycoordv2/balance/score_based_balancer.go @@ -70,7 +70,7 @@ func (b *ScoreBasedBalancer) assignSegment(br *balanceReport, collectionID int64 } // calculate each node's score - nodeItemsMap := b.convertToNodeItems(br, collectionID, nodes) + nodeItemsMap := b.convertToNodeItemsBySegment(br, collectionID, nodes) if len(nodeItemsMap) == 0 { return nil } @@ -93,6 +93,9 @@ func (b *ScoreBasedBalancer) assignSegment(br *balanceReport, collectionID int64 }) balanceBatchSize := paramtable.Get().QueryCoordCfg.CollectionBalanceSegmentBatchSize.GetAsInt() + if manualBalance { + balanceBatchSize = math.MaxInt64 + } plans := make([]SegmentAssignPlan, 0, len(segments)) for _, s := range segments { func(s *meta.Segment) { @@ -143,6 +146,90 @@ func (b *ScoreBasedBalancer) assignSegment(br *balanceReport, collectionID int64 return plans } +func (b *ScoreBasedBalancer) AssignChannel(ctx context.Context, collectionID int64, channels []*meta.DmChannel, nodes []int64, manualBalance bool) []ChannelAssignPlan { + br := NewBalanceReport() + return b.assignChannel(br, collectionID, channels, nodes, manualBalance) +} + +func (b *ScoreBasedBalancer) assignChannel(br *balanceReport, collectionID int64, channels []*meta.DmChannel, nodes []int64, manualBalance bool) []ChannelAssignPlan { + // skip out suspend node and stopping node during assignment, but skip this check for manual balance + if !manualBalance { + nodes = lo.Filter(nodes, func(node int64, _ int) bool { + info := b.nodeManager.Get(node) + normalNode := info != nil && info.GetState() == session.NodeStateNormal + if !normalNode { + br.AddRecord(StrRecord(fmt.Sprintf("non-manual balance, skip abnormal node: %d", node))) + } + return normalNode + }) + } + + // calculate each node's score + nodeItemsMap := b.convertToNodeItemsByChannel(br, collectionID, 
nodes) + if len(nodeItemsMap) == 0 { + return nil + } + + queue := newPriorityQueue() + for _, item := range nodeItemsMap { + queue.push(item) + } + + balanceBatchSize := paramtable.Get().QueryCoordCfg.CollectionBalanceChannelBatchSize.GetAsInt() + if manualBalance { + balanceBatchSize = math.MaxInt64 + } + plans := make([]ChannelAssignPlan, 0, len(channels)) + for _, ch := range channels { + func(ch *meta.DmChannel) { + // for each channel, pick the node with the least score + targetNode := queue.pop().(*nodeItem) + // make sure the candidate is always pushed back + defer queue.push(targetNode) + scoreChanges := b.calculateChannelScore(ch, collectionID) + + sourceNode := nodeItemsMap[ch.Node] + // if the channel's source node exists, the channel comes from the balancer, so we should consider the benefit + // if the channel reassignment doesn't bring enough benefit, we should skip this reassignment + // notice: we should skip the benefit check for manual balance + if !manualBalance && sourceNode != nil && !b.hasEnoughBenefit(sourceNode, targetNode, scoreChanges) { + br.AddRecord(StrRecordf("skip generate balance plan for channel %s since no enough benefit", ch.GetChannelName())) + return + } + + from := int64(-1) + // fromScore := int64(0) + if sourceNode != nil { + from = sourceNode.nodeID + // fromScore = int64(sourceNode.getPriority()) + } + + plan := ChannelAssignPlan{ + From: from, + To: targetNode.nodeID, + Channel: ch, + // FromScore: fromScore, + // ToScore: int64(targetNode.getPriority()), + // SegmentScore: int64(scoreChanges), + } + br.AddRecord(StrRecordf("add channel plan %s", plan)) + plans = append(plans, plan) + + // update the sourceNode and targetNode's score + if sourceNode != nil { + sourceNode.AddCurrentScoreDelta(-scoreChanges) + } + targetNode.AddCurrentScoreDelta(scoreChanges) + }(ch) + + if len(plans) > balanceBatchSize { + break + } + } + + return plans +} + func (b *ScoreBasedBalancer) hasEnoughBenefit(sourceNode *nodeItem, targetNode *nodeItem, scoreChanges float64) bool { // if the score diff between sourceNode and targetNode is lower than the unbalance toleration factor, there is no need to assign it targetNode oldPriorityDiff := math.Abs(float64(sourceNode.getPriority()) - float64(targetNode.getPriority())) @@ -165,14 +252,14 @@ func (b *ScoreBasedBalancer) hasEnoughBenefit(sourceNode *nodeItem, targetNode * return true } -func (b *ScoreBasedBalancer) convertToNodeItems(br *balanceReport, collectionID int64, nodeIDs []int64) map[int64]*nodeItem { +func (b *ScoreBasedBalancer) convertToNodeItemsBySegment(br *balanceReport, collectionID int64, nodeIDs []int64) map[int64]*nodeItem { totalScore := 0 nodeScoreMap := make(map[int64]*nodeItem) nodeMemMap := make(map[int64]float64) totalMemCapacity := float64(0) allNodeHasMemInfo := true for _, node := range nodeIDs { - score := b.calculateScore(br, collectionID, node) + score := b.calculateScoreBySegment(br, collectionID, node) nodeItem := newNodeItem(score, node) nodeScoreMap[node] = &nodeItem totalScore += score @@ -219,7 +306,53 @@ func (b *ScoreBasedBalancer) convertToNodeItems(br *balanceReport, collectionID return nodeScoreMap } -func (b *ScoreBasedBalancer) calculateScore(br *balanceReport, collectionID, nodeID int64) int { +func (b *ScoreBasedBalancer) convertToNodeItemsByChannel(br *balanceReport, collectionID int64, nodeIDs []int64) map[int64]*nodeItem { + totalScore := 0 + nodeScoreMap := make(map[int64]*nodeItem) + nodeMemMap := make(map[int64]float64) + totalMemCapacity := float64(0) + allNodeHasMemInfo := true + for _, 
node := range nodeIDs { + score := b.calculateScoreByChannel(br, collectionID, node) + nodeItem := newNodeItem(score, node) + nodeScoreMap[node] = &nodeItem + totalScore += score + br.AddNodeItem(nodeScoreMap[node]) + + // set memory default to 1.0, will multiply average value to compute assigned score + nodeInfo := b.nodeManager.Get(node) + if nodeInfo != nil { + totalMemCapacity += nodeInfo.MemCapacity() + nodeMemMap[node] = nodeInfo.MemCapacity() + } + allNodeHasMemInfo = allNodeHasMemInfo && nodeInfo != nil && nodeInfo.MemCapacity() > 0 + } + + if totalScore == 0 { + return nodeScoreMap + } + + // if all node has memory info, we will use totalScore / totalMemCapacity to calculate the score, then average means average score on memory unit + // otherwise, we will use totalScore / len(nodeItemsMap) to calculate the score, then average means average score on node unit + average := float64(0) + if allNodeHasMemInfo { + average = float64(totalScore) / totalMemCapacity + } else { + average = float64(totalScore) / float64(len(nodeIDs)) + } + + for _, node := range nodeIDs { + if allNodeHasMemInfo { + nodeScoreMap[node].setAssignedScore(nodeMemMap[node] * average) + br.SetMemoryFactor(node, nodeMemMap[node]) + } else { + nodeScoreMap[node].setAssignedScore(average) + } + } + return nodeScoreMap +} + +func (b *ScoreBasedBalancer) calculateScoreBySegment(br *balanceReport, collectionID, nodeID int64) int { nodeRowCount := 0 // calculate global sealed segment row count globalSegments := b.dist.SegmentDistManager.GetByFilter(meta.WithNodeID(nodeID)) @@ -259,11 +392,45 @@ func (b *ScoreBasedBalancer) calculateScore(br *balanceReport, collectionID, nod params.Params.QueryCoordCfg.GlobalRowCountFactor.GetAsFloat()) } +func (b *ScoreBasedBalancer) calculateScoreByChannel(br *balanceReport, collectionID, nodeID int64) int { + // count the channels on this node, separating the current collection's channels from the others + channels := b.dist.ChannelDistManager.GetByFilter(meta.WithNodeID2Channel(nodeID)) + + nodeChannelNum, collectionChannelNum := 0, 0 + for _, ch := range channels { + if ch.GetCollectionID() == collectionID { + collectionChannelNum += 1 + } else { + nodeChannelNum += int(b.calculateChannelScore(ch, -1)) + } + } + + // calculate executing task cost in scheduler + nodeChannelNum += b.scheduler.GetChannelTaskDelta(nodeID, -1) + collectionChannelNum += b.scheduler.GetChannelTaskDelta(nodeID, collectionID) + + br.AddDetailRecord(StrRecordf("Calculate score for collection %d on node %d, global channel count: %d, collection channel count: %d", + collectionID, nodeID, nodeChannelNum, collectionChannelNum)) + + // give a higher weight to distribute collection's channels evenly across multiple nodes. + channelWeight := paramtable.Get().QueryCoordCfg.CollectionChannelCountFactor.GetAsFloat() + return nodeChannelNum + int(float64(collectionChannelNum)*math.Max(1.0, channelWeight)) +} + // calculateSegmentScore calculate the score which the segment represented func (b *ScoreBasedBalancer) calculateSegmentScore(s *meta.Segment) float64 { return float64(s.GetNumOfRows()) * (1 + params.Params.QueryCoordCfg.GlobalRowCountFactor.GetAsFloat()) } +func (b *ScoreBasedBalancer) calculateChannelScore(ch *meta.DmChannel, currentCollection int64) float64 { + if ch.GetCollectionID() == currentCollection { + // give a higher weight to distribute current collection's channels evenly across multiple nodes. 
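+ // e.g. with the default collectionChannelCountFactor of 10, a channel of the collection being balanced
+ // contributes 10 to a node's score while a channel of any other collection contributes 1 (see
+ // calculateScoreByChannel), so nodes already serving this collection's channels look heavier and the
+ // collection's remaining channels are steered elsewhere; setting the factor to 1 disables the weighting.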
+ channelWeight := paramtable.Get().QueryCoordCfg.CollectionChannelCountFactor.GetAsFloat() + return math.Max(1.0, channelWeight) + } + return 1 +} + func (b *ScoreBasedBalancer) BalanceReplica(ctx context.Context, replica *meta.Replica) (segmentPlans []SegmentAssignPlan, channelPlans []ChannelAssignPlan) { log := log.With( zap.Int64("collection", replica.GetCollectionID()), @@ -346,12 +513,12 @@ func (b *ScoreBasedBalancer) genStoppingSegmentPlan(ctx context.Context, replica func (b *ScoreBasedBalancer) genSegmentPlan(ctx context.Context, br *balanceReport, replica *meta.Replica, onlineNodes []int64) []SegmentAssignPlan { segmentDist := make(map[int64][]*meta.Segment) - nodeItemsMap := b.convertToNodeItems(br, replica.GetCollectionID(), onlineNodes) + nodeItemsMap := b.convertToNodeItemsBySegment(br, replica.GetCollectionID(), onlineNodes) if len(nodeItemsMap) == 0 { return nil } - log.Info("node workload status", + log.Info("node segment workload status", zap.Int64("collectionID", replica.GetCollectionID()), zap.Int64("replicaID", replica.GetID()), zap.Stringers("nodes", lo.Values(nodeItemsMap))) @@ -365,8 +532,6 @@ func (b *ScoreBasedBalancer) genSegmentPlan(ctx context.Context, br *balanceRepo segmentDist[node] = segments } - balanceBatchSize := paramtable.Get().QueryCoordCfg.CollectionBalanceSegmentBatchSize.GetAsInt() - // find the segment from the node which has more score than the average segmentsToMove := make([]*meta.Segment, 0) for node, segments := range segmentDist { @@ -384,11 +549,6 @@ func (b *ScoreBasedBalancer) genSegmentPlan(ctx context.Context, br *balanceRepo segmentScore := b.calculateSegmentScore(s) br.AddRecord(StrRecordf("pick segment %d with score %f from node %d", s.ID, segmentScore, node)) segmentsToMove = append(segmentsToMove, s) - if len(segmentsToMove) >= balanceBatchSize { - br.AddRecord(StrRecordf("stop add segment candidate since current plan is equal to batch max(%d)", balanceBatchSize)) - break - } - currentScore -= segmentScore if currentScore <= assignedScore { br.AddRecord(StrRecordf("stop add segment candidate since node[%d] current score(%f) below assigned(%f)", node, currentScore, assignedScore)) @@ -419,3 +579,70 @@ func (b *ScoreBasedBalancer) genSegmentPlan(ctx context.Context, br *balanceRepo return segmentPlans } + +func (b *ScoreBasedBalancer) genChannelPlan(ctx context.Context, br *balanceReport, replica *meta.Replica, onlineNodes []int64) []ChannelAssignPlan { + nodeItemsMap := b.convertToNodeItemsByChannel(br, replica.GetCollectionID(), onlineNodes) + if len(nodeItemsMap) == 0 { + return nil + } + + log.Info("node channel workload status", + zap.Int64("collectionID", replica.GetCollectionID()), + zap.Int64("replicaID", replica.GetID()), + zap.Stringers("nodes", lo.Values(nodeItemsMap))) + + channelDist := make(map[int64][]*meta.DmChannel) + for _, node := range onlineNodes { + channelDist[node] = b.dist.ChannelDistManager.GetByFilter(meta.WithCollectionID2Channel(replica.GetCollectionID()), meta.WithNodeID2Channel(node)) + } + + balanceBatchSize := paramtable.Get().QueryCoordCfg.CollectionBalanceSegmentBatchSize.GetAsInt() + // find the segment from the node which has more score than the average + channelsToMove := make([]*meta.DmChannel, 0) + for node, channels := range channelDist { + currentScore := nodeItemsMap[node].getCurrentScore() + assignedScore := nodeItemsMap[node].getAssignedScore() + if currentScore <= assignedScore { + br.AddRecord(StrRecordf("node %d skip balance since current score(%f) lower than assigned one (%f)", node, 
currentScore, assignedScore)) + continue + } + + for _, ch := range channels { + channelScore := b.calculateChannelScore(ch, replica.GetCollectionID()) + br.AddRecord(StrRecordf("pick channel %s with score %f from node %d", ch.GetChannelName(), channelScore, node)) + channelsToMove = append(channelsToMove, ch) + if len(channelsToMove) >= balanceBatchSize { + br.AddRecord(StrRecordf("stop add channel candidate since current plan is equal to batch max(%d)", balanceBatchSize)) + break + } + + currentScore -= channelScore + if currentScore <= assignedScore { + br.AddRecord(StrRecordf("stop add channel candidate since node[%d] current score(%f) below assigned(%f)", node, currentScore, assignedScore)) + break + } + } + } + + // if the channel are redundant, skip it's balance for now + channelsToMove = lo.Filter(channelsToMove, func(ch *meta.DmChannel, _ int) bool { + times := len(b.dist.ChannelDistManager.GetByFilter(meta.WithReplica2Channel(replica), meta.WithChannelName2Channel(ch.GetChannelName()))) + channelUnique := times == 1 + if !channelUnique { + br.AddRecord(StrRecordf("abort balancing channel %s since it appear multiple times(%d) in distribution", ch.GetChannelName(), times)) + } + return channelUnique + }) + + if len(channelsToMove) == 0 { + return nil + } + + channelPlans := b.assignChannel(br, replica.GetCollectionID(), channelsToMove, onlineNodes, false) + for i := range channelPlans { + channelPlans[i].From = channelPlans[i].Channel.Node + channelPlans[i].Replica = replica + } + + return channelPlans +} diff --git a/internal/querycoordv2/balance/score_based_balancer_test.go b/internal/querycoordv2/balance/score_based_balancer_test.go index 225c7c0f0bc91..abb41ffb74830 100644 --- a/internal/querycoordv2/balance/score_based_balancer_test.go +++ b/internal/querycoordv2/balance/score_based_balancer_test.go @@ -17,6 +17,7 @@ package balance import ( "context" + "fmt" "testing" "github.com/samber/lo" @@ -608,9 +609,9 @@ func (suite *ScoreBasedBalancerTestSuite) TestBalanceWithExecutingTask() { // set node delta count suite.mockScheduler.ExpectedCalls = nil - for i, node := range c.nodes { - suite.mockScheduler.EXPECT().GetSegmentTaskDelta(node, int64(1)).Return(c.deltaCounts[i]).Maybe() - suite.mockScheduler.EXPECT().GetSegmentTaskDelta(node, int64(-1)).Return(c.deltaCounts[i]).Maybe() + for i := range c.nodes { + suite.mockScheduler.EXPECT().GetSegmentTaskDelta(c.nodes[i], mock.Anything).Return(c.deltaCounts[i]).Maybe() + suite.mockScheduler.EXPECT().GetChannelTaskDelta(c.nodes[i], mock.Anything).Return(c.deltaCounts[i]).Maybe() suite.mockScheduler.EXPECT().GetSegmentTaskNum(mock.Anything, mock.Anything).Return(0).Maybe() suite.mockScheduler.EXPECT().GetChannelTaskNum(mock.Anything, mock.Anything).Return(0).Maybe() } @@ -1221,3 +1222,251 @@ func (suite *ScoreBasedBalancerTestSuite) TestBalanceSegmentAndChannel() { _, channelPlans = suite.getCollectionBalancePlans(balancer, collectionID) suite.Equal(len(channelPlans), 0) } + +func (suite *ScoreBasedBalancerTestSuite) TestBalanceChannelOnMultiCollections() { + ctx := context.Background() + balancer := suite.balancer + + // mock 10 collections with each collection has 1 channel + collectionNum := 10 + channelNum := 1 + for i := 1; i <= collectionNum; i++ { + collectionID := int64(i) + collection := utils.CreateTestCollection(collectionID, int32(1)) + collection.LoadPercentage = 100 + collection.Status = querypb.LoadStatus_Loaded + balancer.meta.CollectionManager.PutCollection(ctx, collection) + 
balancer.meta.CollectionManager.PutPartition(ctx, utils.CreateTestPartition(collectionID, collectionID)) + balancer.meta.ReplicaManager.Spawn(ctx, collectionID, map[string]int{meta.DefaultResourceGroupName: 1}, nil) + + channels := make([]*datapb.VchannelInfo, channelNum) + for i := 0; i < channelNum; i++ { + channels[i] = &datapb.VchannelInfo{CollectionID: collectionID, ChannelName: fmt.Sprintf("channel-%d-%d", collectionID, i)} + } + suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, collectionID).Return( + channels, nil, nil) + suite.broker.EXPECT().GetPartitions(mock.Anything, collectionID).Return([]int64{collectionID}, nil).Maybe() + balancer.targetMgr.UpdateCollectionNextTarget(ctx, collectionID) + balancer.targetMgr.UpdateCollectionCurrentTarget(ctx, collectionID) + } + + // mock querynode-1 to node manager + nodeInfo := session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 1, + Address: "127.0.0.1:0", + Hostname: "localhost", + Version: common.Version, + }) + nodeInfo.SetState(session.NodeStateNormal) + suite.balancer.nodeManager.Add(nodeInfo) + suite.balancer.meta.ResourceManager.HandleNodeUp(ctx, 1) + utils.RecoverAllCollection(balancer.meta) + + // mock channel distribution + channelDist := make([]*meta.DmChannel, 0) + for i := 1; i <= collectionNum; i++ { + collectionID := int64(i) + for i := 0; i < channelNum; i++ { + channelDist = append(channelDist, &meta.DmChannel{ + VchannelInfo: &datapb.VchannelInfo{CollectionID: collectionID, ChannelName: fmt.Sprintf("channel-%d-%d", collectionID, i)}, Node: 1, + }) + } + } + balancer.dist.ChannelDistManager.Update(1, channelDist...) + + // assert balance channel won't happens on 1 querynode + ret := make([]ChannelAssignPlan, 0) + for i := 1; i <= collectionNum; i++ { + collectionID := int64(i) + _, channelPlans := suite.getCollectionBalancePlans(balancer, collectionID) + ret = append(ret, channelPlans...) + } + suite.Len(ret, 0) + + // mock querynode-2 to node manager + nodeInfo2 := session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 2, + Address: "127.0.0.1:0", + Hostname: "localhost", + Version: common.Version, + }) + suite.balancer.nodeManager.Add(nodeInfo2) + suite.balancer.meta.ResourceManager.HandleNodeUp(ctx, 2) + utils.RecoverAllCollection(balancer.meta) + + _, channelPlans := suite.getCollectionBalancePlans(balancer, 1) + suite.Len(channelPlans, 1) + + // mock 1 channel has been move to querynode-2 + balancer.dist.ChannelDistManager.Update(1, channelDist[1:]...) + balancer.dist.ChannelDistManager.Update(2, channelDist[:1]...) + _, channelPlans = suite.getCollectionBalancePlans(balancer, 6) + suite.Len(channelPlans, 1) + + // mock 5 channel has been move to querynode-2 + balancer.dist.ChannelDistManager.Update(1, channelDist[5:]...) + balancer.dist.ChannelDistManager.Update(2, channelDist[:5]...) 
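+ // with 5 of the 10 single-channel collections now on each QueryNode, channel load is already even,
+ // so balancing collection 6 is expected to produce no further channel plans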
+ _, channelPlans = suite.getCollectionBalancePlans(balancer, 6) + suite.Len(channelPlans, 0) +} + +func (suite *ScoreBasedBalancerTestSuite) TestBalanceChannelOnDifferentQN() { + ctx := context.Background() + balancer := suite.balancer + + // mock 10 collections with each collection has 1 channel + channelNum := 5 + collectionID := int64(1) + collection := utils.CreateTestCollection(collectionID, int32(1)) + collection.LoadPercentage = 100 + collection.Status = querypb.LoadStatus_Loaded + balancer.meta.CollectionManager.PutCollection(ctx, collection) + balancer.meta.CollectionManager.PutPartition(ctx, utils.CreateTestPartition(collectionID, collectionID)) + balancer.meta.ReplicaManager.Spawn(ctx, collectionID, map[string]int{meta.DefaultResourceGroupName: 1}, nil) + + channels := make([]*datapb.VchannelInfo, channelNum) + for i := 0; i < channelNum; i++ { + channels[i] = &datapb.VchannelInfo{CollectionID: collectionID, ChannelName: fmt.Sprintf("channel-%d-%d", collectionID, i)} + } + suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, collectionID).Return( + channels, nil, nil) + suite.broker.EXPECT().GetPartitions(mock.Anything, collectionID).Return([]int64{collectionID}, nil).Maybe() + balancer.targetMgr.UpdateCollectionNextTarget(ctx, collectionID) + balancer.targetMgr.UpdateCollectionCurrentTarget(ctx, collectionID) + + // mock querynode-1 to node manager + nodeInfo := session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 1, + Address: "127.0.0.1:0", + Hostname: "localhost", + Version: common.Version, + }) + nodeInfo.UpdateStats(session.WithMemCapacity(1024)) + suite.balancer.nodeManager.Add(nodeInfo) + suite.balancer.meta.ResourceManager.HandleNodeUp(ctx, 1) + utils.RecoverAllCollection(balancer.meta) + + // mock channel distribution + channelDist := make([]*meta.DmChannel, 0) + for i := 0; i < channelNum; i++ { + channelDist = append(channelDist, &meta.DmChannel{ + VchannelInfo: &datapb.VchannelInfo{CollectionID: collectionID, ChannelName: fmt.Sprintf("channel-%d-%d", collectionID, i)}, Node: 1, + }) + } + balancer.dist.ChannelDistManager.Update(1, channelDist...) 
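+ // all 5 channels of the collection start on querynode-1; the assertions below expect no movement while
+ // only one node exists, 2 channel plans once a second node joins, and 4 plans after node 2 reports four
+ // times the memory capacity of node 1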
+ + // assert balance channel won't happens on 1 querynode + _, channelPlans := suite.getCollectionBalancePlans(balancer, collectionID) + suite.Len(channelPlans, 0) + + // mock querynode-2 to node manager + nodeInfo2 := session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 2, + Address: "127.0.0.1:0", + Hostname: "localhost", + Version: common.Version, + }) + suite.balancer.nodeManager.Add(nodeInfo2) + suite.balancer.meta.ResourceManager.HandleNodeUp(ctx, 2) + utils.RecoverAllCollection(balancer.meta) + + paramtable.Get().Save(paramtable.Get().QueryCoordCfg.CollectionBalanceChannelBatchSize.Key, "10") + defer paramtable.Get().Reset(paramtable.Get().QueryCoordCfg.CollectionBalanceChannelBatchSize.Key) + + // test balance channel on same query node + _, channelPlans = suite.getCollectionBalancePlans(balancer, collectionID) + suite.Len(channelPlans, 2) + + // test balance on different query node + nodeInfo2.UpdateStats(session.WithMemCapacity(4096)) + _, channelPlans = suite.getCollectionBalancePlans(balancer, collectionID) + suite.Len(channelPlans, 4) +} + +func (suite *ScoreBasedBalancerTestSuite) TestBalanceChannelOnChannelExclusive() { + ctx := context.Background() + balancer := suite.balancer + + collectionNum := 3 + channelNum := 3 + nodeNum := 3 + // mock 10 collections with each collection has 1 channel + for i := 1; i <= collectionNum; i++ { + collectionID := int64(i) + collection := utils.CreateTestCollection(collectionID, int32(1)) + collection.LoadPercentage = 100 + collection.Status = querypb.LoadStatus_Loaded + balancer.meta.CollectionManager.PutCollection(ctx, collection) + balancer.meta.CollectionManager.PutPartition(ctx, utils.CreateTestPartition(collectionID, collectionID)) + balancer.meta.ReplicaManager.Spawn(ctx, collectionID, map[string]int{meta.DefaultResourceGroupName: 1}, nil) + + channels := make([]*datapb.VchannelInfo, channelNum) + for i := 0; i < channelNum; i++ { + channels[i] = &datapb.VchannelInfo{CollectionID: collectionID, ChannelName: fmt.Sprintf("channel-%d-%d", collectionID, i)} + } + suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, collectionID).Return( + channels, nil, nil) + suite.broker.EXPECT().GetPartitions(mock.Anything, collectionID).Return([]int64{collectionID}, nil).Maybe() + balancer.targetMgr.UpdateCollectionNextTarget(ctx, collectionID) + balancer.targetMgr.UpdateCollectionCurrentTarget(ctx, collectionID) + } + + // mock querynode to node manager + for i := 1; i <= nodeNum; i++ { + nodeID := int64(i) + nodeInfo := session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: nodeID, + Address: "127.0.0.1:0", + Hostname: "localhost", + Version: common.Version, + }) + nodeInfo.SetState(session.NodeStateNormal) + suite.balancer.nodeManager.Add(nodeInfo) + suite.balancer.meta.ResourceManager.HandleNodeUp(ctx, nodeID) + } + + utils.RecoverAllCollection(balancer.meta) + + // mock channels on collection-a to node 1 + collectionID := int64(1) + channelDist1 := make([]*meta.DmChannel, 0) + for i := 0; i < channelNum; i++ { + channelDist1 = append(channelDist1, &meta.DmChannel{ + VchannelInfo: &datapb.VchannelInfo{CollectionID: collectionID, ChannelName: fmt.Sprintf("channel-%d-%d", collectionID, i)}, Node: 1, + }) + } + balancer.dist.ChannelDistManager.Update(1, channelDist1...) 
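+ // collection 1's three channels are stacked on node 1; with the collection-channel weighting the balancer
+ // is expected to propose moving two of them so that each node serves one channel of the collection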
+ + collectionID = int64(2) + channelDist2 := make([]*meta.DmChannel, 0) + for i := 0; i < channelNum; i++ { + channelDist2 = append(channelDist2, &meta.DmChannel{ + VchannelInfo: &datapb.VchannelInfo{CollectionID: collectionID, ChannelName: fmt.Sprintf("channel-%d-%d", collectionID, i)}, Node: 1, + }) + } + balancer.dist.ChannelDistManager.Update(2, channelDist2...) + + collectionID = int64(3) + channelDist3 := make([]*meta.DmChannel, 0) + for i := 0; i < channelNum; i++ { + channelDist3 = append(channelDist3, &meta.DmChannel{ + VchannelInfo: &datapb.VchannelInfo{CollectionID: collectionID, ChannelName: fmt.Sprintf("channel-%d-%d", collectionID, i)}, Node: 1, + }) + } + balancer.dist.ChannelDistManager.Update(3, channelDist3...) + + // test balance on collection 1 + _, channelPlans := suite.getCollectionBalancePlans(balancer, 1) + suite.Len(channelPlans, 2) + + // mock collection 1 has balanced + balancer.dist.ChannelDistManager.Update(1, channelDist1[0]) + balancer.dist.ChannelDistManager.Update(2, channelDist1[1], channelDist2[0], channelDist2[1], channelDist2[2]) + balancer.dist.ChannelDistManager.Update(3, channelDist1[2], channelDist3[0], channelDist3[1], channelDist3[2]) + _, channelPlans = suite.getCollectionBalancePlans(balancer, 1) + suite.Len(channelPlans, 0) + _, channelPlans = suite.getCollectionBalancePlans(balancer, 2) + suite.Len(channelPlans, 2) + _, channelPlans = suite.getCollectionBalancePlans(balancer, 3) + suite.Len(channelPlans, 2) +} diff --git a/internal/querycoordv2/checkers/channel_checker.go b/internal/querycoordv2/checkers/channel_checker.go index ce7e36c31e00a..b1352a4f74f15 100644 --- a/internal/querycoordv2/checkers/channel_checker.go +++ b/internal/querycoordv2/checkers/channel_checker.go @@ -232,7 +232,7 @@ func (c *ChannelChecker) createChannelLoadTask(ctx context.Context, channels []* if len(rwNodes) == 0 { rwNodes = replica.GetRWNodes() } - plan := c.getBalancerFunc().AssignChannel(ctx, []*meta.DmChannel{ch}, rwNodes, false) + plan := c.getBalancerFunc().AssignChannel(ctx, replica.GetCollectionID(), []*meta.DmChannel{ch}, rwNodes, false) plans = append(plans, plan...) 
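// passing the replica's collection ID through to AssignChannel lets the score-based balancer apply the
// collectionChannelCountFactor weighting, so channels of the same collection tend to spread across
// QueryNodes instead of piling onto a single node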
} diff --git a/internal/querycoordv2/checkers/channel_checker_test.go b/internal/querycoordv2/checkers/channel_checker_test.go index 9ea63f1da1ea1..2012f3185ba17 100644 --- a/internal/querycoordv2/checkers/channel_checker_test.go +++ b/internal/querycoordv2/checkers/channel_checker_test.go @@ -100,7 +100,7 @@ func (suite *ChannelCheckerTestSuite) setNodeAvailable(nodes ...int64) { func (suite *ChannelCheckerTestSuite) createMockBalancer() balance.Balance { balancer := balance.NewMockBalancer(suite.T()) - balancer.EXPECT().AssignChannel(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Maybe().Return(func(ctx context.Context, channels []*meta.DmChannel, nodes []int64, _ bool) []balance.ChannelAssignPlan { + balancer.EXPECT().AssignChannel(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Maybe().Return(func(ctx context.Context, collectionID int64, channels []*meta.DmChannel, nodes []int64, _ bool) []balance.ChannelAssignPlan { plans := make([]balance.ChannelAssignPlan, 0, len(channels)) for i, c := range channels { plan := balance.ChannelAssignPlan{ diff --git a/internal/querycoordv2/checkers/controller_test.go b/internal/querycoordv2/checkers/controller_test.go index 60200c847bb5c..1caef1db720a1 100644 --- a/internal/querycoordv2/checkers/controller_test.go +++ b/internal/querycoordv2/checkers/controller_test.go @@ -140,7 +140,7 @@ func (suite *CheckerControllerSuite) TestBasic() { assignSegCounter.Inc() return nil }) - suite.balancer.EXPECT().AssignChannel(mock.Anything, mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, dc []*meta.DmChannel, i []int64, _ bool) []balance.ChannelAssignPlan { + suite.balancer.EXPECT().AssignChannel(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, collectionID int64, dc []*meta.DmChannel, i []int64, _ bool) []balance.ChannelAssignPlan { assingChanCounter.Inc() return nil }) diff --git a/internal/querycoordv2/handlers.go b/internal/querycoordv2/handlers.go index c6b2e92dfe40d..27329973f5662 100644 --- a/internal/querycoordv2/handlers.go +++ b/internal/querycoordv2/handlers.go @@ -183,7 +183,7 @@ func (s *Server) balanceChannels(ctx context.Context, ) error { log := log.Ctx(ctx).With(zap.Int64("collectionID", collectionID)) - plans := s.getBalancerFunc().AssignChannel(ctx, channels, dstNodes, true) + plans := s.getBalancerFunc().AssignChannel(ctx, collectionID, channels, dstNodes, true) for i := range plans { plans[i].From = srcNode plans[i].Replica = replica diff --git a/internal/querycoordv2/utils/meta.go b/internal/querycoordv2/utils/meta.go index b5e3e7ee50ca2..1744e2367849f 100644 --- a/internal/querycoordv2/utils/meta.go +++ b/internal/querycoordv2/utils/meta.go @@ -82,7 +82,7 @@ func RecoverReplicaOfCollection(ctx context.Context, m *meta.Meta, collectionID logger := log.With(zap.Int64("collectionID", collectionID)) rgNames := m.ReplicaManager.GetResourceGroupByCollection(ctx, collectionID) if rgNames.Len() == 0 { - logger.Error("no resource group found for collection", zap.Int64("collectionID", collectionID)) + logger.Error("no resource group found for collection") return } rgs, err := m.ResourceManager.GetNodesOfMultiRG(ctx, rgNames.Collect()) diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index ec5aefbbe1ec8..b125ec380d54a 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -1780,6 +1780,7 @@ type queryCoordConfig struct { 
RowCountFactor ParamItem `refreshable:"true"` SegmentCountFactor ParamItem `refreshable:"true"` GlobalSegmentCountFactor ParamItem `refreshable:"true"` + CollectionChannelCountFactor ParamItem `refreshable:"true"` SegmentCountMaxSteps ParamItem `refreshable:"true"` RowCountMaxSteps ParamItem `refreshable:"true"` RandomMaxSteps ParamItem `refreshable:"true"` @@ -1827,6 +1828,7 @@ type queryCoordConfig struct { CollectionObserverInterval ParamItem `refreshable:"false"` CheckExecutedFlagInterval ParamItem `refreshable:"false"` CollectionBalanceSegmentBatchSize ParamItem `refreshable:"true"` + CollectionBalanceChannelBatchSize ParamItem `refreshable:"true"` UpdateCollectionLoadStatusInterval ParamItem `refreshable:"false"` ClusterLevelLoadReplicaNumber ParamItem `refreshable:"true"` ClusterLevelLoadResourceGroups ParamItem `refreshable:"true"` @@ -1945,6 +1947,17 @@ If this parameter is set false, Milvus simply searches the growing segments with } p.GlobalSegmentCountFactor.Init(base.mgr) + p.CollectionChannelCountFactor = ParamItem{ + Key: "queryCoord.collectionChannelCountFactor", + Version: "2.4.18", + DefaultValue: "10", + PanicIfEmpty: true, + Doc: `the channel count weight used when balancing channels among queryNodes, + A higher value reduces the likelihood of assigning channels from the same collection to the same QueryNode. Set to 1 to disable this feature.`, + Export: true, + } + p.CollectionChannelCountFactor.Init(base.mgr) + p.SegmentCountMaxSteps = ParamItem{ Key: "queryCoord.segmentCountMaxSteps", Version: "2.3.0", @@ -2367,6 +2380,15 @@ If this parameter is set false, Milvus simply searches the growing segments with } p.CollectionBalanceSegmentBatchSize.Init(base.mgr) + p.CollectionBalanceChannelBatchSize = ParamItem{ + Key: "queryCoord.collectionBalanceChannelBatchSize", + Version: "2.4.18", + DefaultValue: "1", + Doc: "the max balance task number for channel at each round", + Export: false, + } + p.CollectionBalanceChannelBatchSize.Init(base.mgr) + p.ClusterLevelLoadReplicaNumber = ParamItem{ Key: "queryCoord.clusterLevelLoadReplicaNumber", Version: "2.4.7", diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index 204200a0fbad6..63c7c0b54db81 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -369,9 +369,12 @@ func TestComponentParam(t *testing.T) { assert.Equal(t, 0.1, Params.DelegatorMemoryOverloadFactor.GetAsFloat()) assert.Equal(t, 5, Params.CollectionBalanceSegmentBatchSize.GetAsInt()) + assert.Equal(t, 1, Params.CollectionBalanceChannelBatchSize.GetAsInt()) assert.Equal(t, 0, Params.ClusterLevelLoadReplicaNumber.GetAsInt()) assert.Len(t, Params.ClusterLevelLoadResourceGroups.GetAsStrings(), 0) + + assert.Equal(t, 10, Params.CollectionChannelCountFactor.GetAsInt()) }) t.Run("test queryNodeConfig", func(t *testing.T) {
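
For reference, a minimal, self-contained sketch of the scoring rule this patch introduces in calculateScoreByChannel and calculateChannelScore. The channel struct and nodeChannelScore helper below are illustrative stand-ins, not Milvus APIs; only the max(1, collectionChannelCountFactor) weighting and its default of 10 come from the diff.

package main

import (
	"fmt"
	"math"
)

// channel is a stand-in for meta.DmChannel: only the owning collection matters here.
type channel struct{ collectionID int64 }

// nodeChannelScore mirrors the patched scoring rule: channels of the collection being
// balanced are weighted by collectionChannelCountFactor (default 10), every other
// channel counts as 1.
func nodeChannelScore(chs []channel, collectionID int64, factor float64) float64 {
	score := 0.0
	for _, ch := range chs {
		if ch.collectionID == collectionID {
			score += math.Max(1.0, factor)
		} else {
			score += 1
		}
	}
	return score
}

func main() {
	const factor = 10 // default collectionChannelCountFactor from configs/milvus.yaml
	// node A already serves one channel of collection 100 plus two other channels,
	// node B serves four channels of other collections.
	nodeA := []channel{{100}, {200}, {300}}
	nodeB := []channel{{200}, {300}, {400}, {500}}

	fmt.Println("score A:", nodeChannelScore(nodeA, 100, factor)) // 10 + 1 + 1 = 12
	fmt.Println("score B:", nodeChannelScore(nodeB, 100, factor)) // 1 + 1 + 1 + 1 = 4
	// With the factor applied, the next channel of collection 100 goes to node B even
	// though it hosts more channels overall; with factor = 1 the weighting is disabled
	// and node A (3 channels) would win instead, stacking the collection on one node.
}

Operators can tune this behavior through queryCoord.collectionChannelCountFactor in milvus.yaml; the paramtable entry above marks it refreshable at runtime.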