From 92de49e38c1703deddb00977c4fedc7063dee545 Mon Sep 17 00:00:00 2001
From: wei liu
Date: Tue, 23 Jul 2024 15:57:49 +0800
Subject: [PATCH] fix: Segment may bounce between delegator and worker (#34830)

issue: #34595

In pr #34596 we added an overloaded factor to segments on the delegator, which causes the same segment to get different scores on the delegator and on the worker, and the segment may bounce between delegator and worker. This PR uses the average score to compute the delegator overloaded factor, to avoid segments bouncing between delegator and worker.

---------

Signed-off-by: Wei Liu
---
 .../balance/channel_level_score_balancer.go  |   9 +-
 .../channel_level_score_balancer_test.go     |  34 +++--
 .../balance/score_based_balancer.go          |  51 ++++---
 .../balance/score_based_balancer_test.go     | 139 ++++++++++++++++--
 4 files changed, 177 insertions(+), 56 deletions(-)

diff --git a/internal/querycoordv2/balance/channel_level_score_balancer.go b/internal/querycoordv2/balance/channel_level_score_balancer.go
index cb59eb67a15ae..332762ec100c2 100644
--- a/internal/querycoordv2/balance/channel_level_score_balancer.go
+++ b/internal/querycoordv2/balance/channel_level_score_balancer.go
@@ -164,7 +164,7 @@ func (b *ChannelLevelScoreBalancer) genStoppingSegmentPlan(replica *meta.Replica
 
 func (b *ChannelLevelScoreBalancer) genSegmentPlan(replica *meta.Replica, channelName string, onlineNodes []int64) []SegmentAssignPlan {
 	segmentDist := make(map[int64][]*meta.Segment)
-	nodeScore := make(map[int64]int, 0)
+	nodeScore := b.convertToNodeItems(replica.GetCollectionID(), onlineNodes)
 	totalScore := 0
 
 	// list all segment which could be balanced, and calculate node's score
@@ -176,10 +176,7 @@ func (b *ChannelLevelScoreBalancer) genSegmentPlan(replica *meta.Replica, channe
 			segment.GetLevel() != datapb.SegmentLevel_L0
 		})
 		segmentDist[node] = segments
-
-		rowCount := b.calculateScore(replica.GetCollectionID(), node)
-		totalScore += rowCount
-		nodeScore[node] = rowCount
+		totalScore += nodeScore[node].getPriority()
 	}
 
 	if totalScore == 0 {
@@ -190,7 +187,7 @@ func (b *ChannelLevelScoreBalancer) genSegmentPlan(replica *meta.Replica, channe
 	segmentsToMove := make([]*meta.Segment, 0)
 	average := totalScore / len(onlineNodes)
 	for node, segments := range segmentDist {
-		leftScore := nodeScore[node]
+		leftScore := nodeScore[node].getPriority()
 		if leftScore <= average {
 			continue
 		}
diff --git a/internal/querycoordv2/balance/channel_level_score_balancer_test.go b/internal/querycoordv2/balance/channel_level_score_balancer_test.go
index c9a3ea0bb15cc..d0c3bc079ccec 100644
--- a/internal/querycoordv2/balance/channel_level_score_balancer_test.go
+++ b/internal/querycoordv2/balance/channel_level_score_balancer_test.go
@@ -84,15 +84,16 @@ func (suite *ChannelLevelScoreBalancerTestSuite) TearDownTest() {
 
 func (suite *ChannelLevelScoreBalancerTestSuite) TestAssignSegment() {
 	cases := []struct {
-		name          string
-		comment       string
-		distributions map[int64][]*meta.Segment
-		assignments   [][]*meta.Segment
-		nodes         []int64
-		collectionIDs []int64
-		segmentCnts   []int
-		states        []session.State
-		expectPlans   [][]SegmentAssignPlan
+		name               string
+		comment            string
+		distributions      map[int64][]*meta.Segment
+		assignments        [][]*meta.Segment
+		nodes              []int64
+		collectionIDs      []int64
+		segmentCnts        []int
+		states             []session.State
+		expectPlans        [][]SegmentAssignPlan
+		unstableAssignment bool
 	}{
 		{
 			name: "test empty cluster assigning one collection",
@@ -105,10 +106,11 @@ func (suite *ChannelLevelScoreBalancerTestSuite) TestAssignSegment() {
 				{SegmentInfo: &datapb.SegmentInfo{ID: 3, NumOfRows: 15,
CollectionID: 1}}, }, }, - nodes: []int64{1, 2, 3}, - collectionIDs: []int64{0}, - states: []session.State{session.NodeStateNormal, session.NodeStateNormal, session.NodeStateNormal}, - segmentCnts: []int{0, 0, 0}, + nodes: []int64{1, 2, 3}, + collectionIDs: []int64{0}, + states: []session.State{session.NodeStateNormal, session.NodeStateNormal, session.NodeStateNormal}, + segmentCnts: []int{0, 0, 0}, + unstableAssignment: true, expectPlans: [][]SegmentAssignPlan{ { // as assign segments is used while loading collection, @@ -237,7 +239,11 @@ func (suite *ChannelLevelScoreBalancerTestSuite) TestAssignSegment() { } for i := range c.collectionIDs { plans := balancer.AssignSegment(c.collectionIDs[i], c.assignments[i], c.nodes, false) - assertSegmentAssignPlanElementMatch(&suite.Suite, c.expectPlans[i], plans) + if c.unstableAssignment { + suite.Equal(len(plans), len(c.expectPlans[i])) + } else { + assertSegmentAssignPlanElementMatch(&suite.Suite, c.expectPlans[i], plans) + } } }) } diff --git a/internal/querycoordv2/balance/score_based_balancer.go b/internal/querycoordv2/balance/score_based_balancer.go index b1e2b23f6bdac..87d7c6b5e03bb 100644 --- a/internal/querycoordv2/balance/score_based_balancer.go +++ b/internal/querycoordv2/balance/score_based_balancer.go @@ -60,14 +60,13 @@ func (b *ScoreBasedBalancer) AssignSegment(collectionID int64, segments []*meta. } // calculate each node's score - nodeItems := b.convertToNodeItems(collectionID, nodes) - if len(nodeItems) == 0 { + nodeItemsMap := b.convertToNodeItems(collectionID, nodes) + if len(nodeItemsMap) == 0 { return nil } - nodeItemsMap := lo.SliceToMap(nodeItems, func(item *nodeItem) (int64, *nodeItem) { return item.nodeID, item }) queue := newPriorityQueue() - for _, item := range nodeItems { + for _, item := range nodeItemsMap { queue.push(item) } @@ -139,33 +138,45 @@ func (b *ScoreBasedBalancer) hasEnoughBenefit(sourceNode *nodeItem, targetNode * return true } -func (b *ScoreBasedBalancer) convertToNodeItems(collectionID int64, nodeIDs []int64) []*nodeItem { - ret := make([]*nodeItem, 0, len(nodeIDs)) +func (b *ScoreBasedBalancer) convertToNodeItems(collectionID int64, nodeIDs []int64) map[int64]*nodeItem { + totalScore := 0 + nodeScoreMap := make(map[int64]*nodeItem) for _, node := range nodeIDs { - priority := b.calculateScore(collectionID, node) - nodeItem := newNodeItem(priority, node) - ret = append(ret, &nodeItem) + score := b.calculateScore(collectionID, node) + nodeItem := newNodeItem(score, node) + nodeScoreMap[node] = &nodeItem + totalScore += score } - return ret -} -func (b *ScoreBasedBalancer) calculateScore(collectionID, nodeID int64) int { + if totalScore == 0 { + return nodeScoreMap + } + + average := totalScore / len(nodeIDs) delegatorOverloadFactor := params.Params.QueryCoordCfg.DelegatorMemoryOverloadFactor.GetAsFloat() + // use average * delegatorOverloadFactor * delegator_num, to preserve fixed memory size for delegator + for _, node := range nodeIDs { + collectionViews := b.dist.LeaderViewManager.GetByFilter(meta.WithCollectionID2LeaderView(collectionID), meta.WithNodeID2LeaderView(node)) + if len(collectionViews) > 0 { + newScore := nodeScoreMap[node].getPriority() + int(float64(average)*delegatorOverloadFactor)*len(collectionViews) + nodeScoreMap[node].setPriority(newScore) + } + } + return nodeScoreMap +} +func (b *ScoreBasedBalancer) calculateScore(collectionID, nodeID int64) int { nodeRowCount := 0 - nodeCollectionRowCount := make(map[int64]int) // calculate global sealed segment row count globalSegments := 
b.dist.SegmentDistManager.GetByFilter(meta.WithNodeID(nodeID)) for _, s := range globalSegments { nodeRowCount += int(s.GetNumOfRows()) - nodeCollectionRowCount[s.CollectionID] += int(s.GetNumOfRows()) } // calculate global growing segment row count views := b.dist.LeaderViewManager.GetByFilter(meta.WithNodeID2LeaderView(nodeID)) for _, view := range views { nodeRowCount += int(float64(view.NumOfGrowingRows)) - nodeRowCount += int(float64(nodeCollectionRowCount[view.CollectionID]) * delegatorOverloadFactor) } // calculate executing task cost in scheduler @@ -182,7 +193,6 @@ func (b *ScoreBasedBalancer) calculateScore(collectionID, nodeID int64) int { collectionViews := b.dist.LeaderViewManager.GetByFilter(meta.WithCollectionID2LeaderView(collectionID), meta.WithNodeID2LeaderView(nodeID)) for _, view := range collectionViews { collectionRowCount += int(float64(view.NumOfGrowingRows)) - collectionRowCount += int(float64(collectionRowCount) * delegatorOverloadFactor) } // calculate executing task cost in scheduler @@ -266,7 +276,7 @@ func (b *ScoreBasedBalancer) genStoppingSegmentPlan(replica *meta.Replica, onlin func (b *ScoreBasedBalancer) genSegmentPlan(replica *meta.Replica, onlineNodes []int64) []SegmentAssignPlan { segmentDist := make(map[int64][]*meta.Segment) - nodeScore := make(map[int64]int, 0) + nodeScore := b.convertToNodeItems(replica.GetCollectionID(), onlineNodes) totalScore := 0 // list all segment which could be balanced, and calculate node's score @@ -278,10 +288,7 @@ func (b *ScoreBasedBalancer) genSegmentPlan(replica *meta.Replica, onlineNodes [ segment.GetLevel() != datapb.SegmentLevel_L0 }) segmentDist[node] = segments - - rowCount := b.calculateScore(replica.GetCollectionID(), node) - totalScore += rowCount - nodeScore[node] = rowCount + totalScore += nodeScore[node].getPriority() } if totalScore == 0 { @@ -292,7 +299,7 @@ func (b *ScoreBasedBalancer) genSegmentPlan(replica *meta.Replica, onlineNodes [ segmentsToMove := make([]*meta.Segment, 0) average := totalScore / len(onlineNodes) for node, segments := range segmentDist { - leftScore := nodeScore[node] + leftScore := nodeScore[node].getPriority() if leftScore <= average { continue } diff --git a/internal/querycoordv2/balance/score_based_balancer_test.go b/internal/querycoordv2/balance/score_based_balancer_test.go index 27382dc7e0a41..b125a9ead97e8 100644 --- a/internal/querycoordv2/balance/score_based_balancer_test.go +++ b/internal/querycoordv2/balance/score_based_balancer_test.go @@ -84,15 +84,16 @@ func (suite *ScoreBasedBalancerTestSuite) TearDownTest() { func (suite *ScoreBasedBalancerTestSuite) TestAssignSegment() { cases := []struct { - name string - comment string - distributions map[int64][]*meta.Segment - assignments [][]*meta.Segment - nodes []int64 - collectionIDs []int64 - segmentCnts []int - states []session.State - expectPlans [][]SegmentAssignPlan + name string + comment string + distributions map[int64][]*meta.Segment + assignments [][]*meta.Segment + nodes []int64 + collectionIDs []int64 + segmentCnts []int + states []session.State + expectPlans [][]SegmentAssignPlan + unstableAssignment bool }{ { name: "test empty cluster assigning one collection", @@ -105,10 +106,11 @@ func (suite *ScoreBasedBalancerTestSuite) TestAssignSegment() { {SegmentInfo: &datapb.SegmentInfo{ID: 3, NumOfRows: 15, CollectionID: 1}}, }, }, - nodes: []int64{1, 2, 3}, - collectionIDs: []int64{0}, - states: []session.State{session.NodeStateNormal, session.NodeStateNormal, session.NodeStateNormal}, - segmentCnts: []int{0, 0, 0}, 
+ nodes: []int64{1, 2, 3}, + collectionIDs: []int64{0}, + states: []session.State{session.NodeStateNormal, session.NodeStateNormal, session.NodeStateNormal}, + segmentCnts: []int{0, 0, 0}, + unstableAssignment: true, expectPlans: [][]SegmentAssignPlan{ { // as assign segments is used while loading collection, @@ -237,7 +239,11 @@ func (suite *ScoreBasedBalancerTestSuite) TestAssignSegment() { } for i := range c.collectionIDs { plans := balancer.AssignSegment(c.collectionIDs[i], c.assignments[i], c.nodes, false) - assertSegmentAssignPlanElementMatch(&suite.Suite, c.expectPlans[i], plans) + if c.unstableAssignment { + suite.Len(plans, len(c.expectPlans[i])) + } else { + assertSegmentAssignPlanElementMatch(&suite.Suite, c.expectPlans[i], plans) + } } }) } @@ -461,6 +467,111 @@ func (suite *ScoreBasedBalancerTestSuite) TestBalanceOneRound() { } } +func (suite *ScoreBasedBalancerTestSuite) TestDelegatorPreserveMemory() { + cases := []struct { + name string + nodes []int64 + collectionID int64 + replicaID int64 + collectionsSegments []*datapb.SegmentInfo + states []session.State + shouldMock bool + distributions map[int64][]*meta.Segment + distributionChannels map[int64][]*meta.DmChannel + expectPlans []SegmentAssignPlan + expectChannelPlans []ChannelAssignPlan + }{ + { + name: "normal balance for one collection only", + nodes: []int64{1, 2}, + collectionID: 1, + replicaID: 1, + collectionsSegments: []*datapb.SegmentInfo{ + {ID: 1, PartitionID: 1}, {ID: 2, PartitionID: 1}, {ID: 3, PartitionID: 1}, {ID: 4, PartitionID: 1}, {ID: 5, PartitionID: 1}, + }, + states: []session.State{session.NodeStateNormal, session.NodeStateNormal}, + distributions: map[int64][]*meta.Segment{ + 1: {{SegmentInfo: &datapb.SegmentInfo{ID: 1, CollectionID: 1, NumOfRows: 10}, Node: 1}}, + 2: { + {SegmentInfo: &datapb.SegmentInfo{ID: 2, CollectionID: 1, NumOfRows: 10}, Node: 2}, + {SegmentInfo: &datapb.SegmentInfo{ID: 3, CollectionID: 1, NumOfRows: 10}, Node: 2}, + {SegmentInfo: &datapb.SegmentInfo{ID: 4, CollectionID: 1, NumOfRows: 10}, Node: 2}, + {SegmentInfo: &datapb.SegmentInfo{ID: 5, CollectionID: 1, NumOfRows: 10}, Node: 2}, + }, + }, + expectPlans: []SegmentAssignPlan{}, + expectChannelPlans: []ChannelAssignPlan{}, + }, + } + + for _, c := range cases { + suite.Run(c.name, func() { + suite.SetupSuite() + defer suite.TearDownTest() + balancer := suite.balancer + + // 1. set up target for multi collections + collection := utils.CreateTestCollection(c.collectionID, int32(c.replicaID)) + suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, c.collectionID).Return( + nil, c.collectionsSegments, nil) + suite.broker.EXPECT().GetPartitions(mock.Anything, c.collectionID).Return([]int64{c.collectionID}, nil).Maybe() + collection.LoadPercentage = 100 + collection.Status = querypb.LoadStatus_Loaded + balancer.meta.CollectionManager.PutCollection(collection) + balancer.meta.CollectionManager.PutPartition(utils.CreateTestPartition(c.collectionID, c.collectionID)) + balancer.meta.ReplicaManager.Put(utils.CreateTestReplica(c.replicaID, c.collectionID, c.nodes)) + balancer.targetMgr.UpdateCollectionNextTarget(c.collectionID) + balancer.targetMgr.UpdateCollectionCurrentTarget(c.collectionID) + balancer.targetMgr.UpdateCollectionNextTarget(c.collectionID) + + // 2. set up target for distribution for multi collections + for node, s := range c.distributions { + balancer.dist.SegmentDistManager.Update(node, s...) + } + for node, v := range c.distributionChannels { + balancer.dist.ChannelDistManager.Update(node, v...) 
+ } + + leaderView := &meta.LeaderView{ + ID: 1, + CollectionID: 1, + } + suite.balancer.dist.LeaderViewManager.Update(1, leaderView) + + // 3. set up nodes info and resourceManager for balancer + for i := range c.nodes { + nodeInfo := session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: c.nodes[i], + Address: "127.0.0.1:0", + Hostname: "localhost", + }) + nodeInfo.UpdateStats(session.WithChannelCnt(len(c.distributionChannels[c.nodes[i]]))) + nodeInfo.SetState(c.states[i]) + suite.balancer.nodeManager.Add(nodeInfo) + suite.balancer.meta.ResourceManager.HandleNodeUp(c.nodes[i]) + } + utils.RecoverAllCollection(balancer.meta) + + // disable delegator preserve memory + paramtable.Get().Save(paramtable.Get().QueryCoordCfg.DelegatorMemoryOverloadFactor.Key, "0") + segmentPlans, channelPlans := suite.getCollectionBalancePlans(balancer, c.collectionID) + suite.Len(channelPlans, 0) + suite.Len(segmentPlans, 1) + suite.Equal(segmentPlans[0].To, int64(1)) + + paramtable.Get().Save(paramtable.Get().QueryCoordCfg.DelegatorMemoryOverloadFactor.Key, "1") + segmentPlans, channelPlans = suite.getCollectionBalancePlans(balancer, c.collectionID) + suite.Len(channelPlans, 0) + suite.Len(segmentPlans, 0) + + paramtable.Get().Save(paramtable.Get().QueryCoordCfg.DelegatorMemoryOverloadFactor.Key, "2") + segmentPlans, channelPlans = suite.getCollectionBalancePlans(balancer, c.collectionID) + suite.Len(segmentPlans, 1) + suite.Equal(segmentPlans[0].To, int64(2)) + }) + } +} + func (suite *ScoreBasedBalancerTestSuite) TestBalanceWithExecutingTask() { cases := []struct { name string
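For reference, a minimal, self-contained Go sketch of the scoring idea this patch applies in convertToNodeItems: base node scores are computed first, then every node hosting a delegator (leader view) of the collection is padded by average score * DelegatorMemoryOverloadFactor per delegator. Because the padding comes from the cluster-wide average rather than from the delegator node's own row count, the delegator and the worker now agree on a segment's score, which removes the bouncing described in the commit message. The nodeState type and scoreWithDelegatorOverload helper below are illustrative stand-ins, not the actual querycoordv2 types; the numbers mirror the TestDelegatorPreserveMemory case (one delegator node holding one segment vs. a worker holding four).

package main

import "fmt"

// nodeState is a hypothetical stand-in for a query node's balance state: its
// base score (sealed + growing row count plus pending task cost) and the
// number of delegators (leader views) of the collection it hosts.
type nodeState struct {
	id         int64
	baseScore  int
	delegators int
}

// scoreWithDelegatorOverload sketches the averaging idea: instead of inflating
// each node's own row count by a per-node factor (which let the same segment
// score differently on delegator and worker), it reserves average*factor score
// per hosted delegator, so the reservation is identical across nodes.
func scoreWithDelegatorOverload(nodes []nodeState, factor float64) map[int64]int {
	scores := make(map[int64]int, len(nodes))
	totalScore := 0
	for _, n := range nodes {
		scores[n.id] = n.baseScore
		totalScore += n.baseScore
	}
	if len(nodes) == 0 || totalScore == 0 {
		return scores
	}

	average := totalScore / len(nodes)
	for _, n := range nodes {
		if n.delegators > 0 {
			// preserve a fixed amount of score per delegator, derived from the
			// cluster-wide average rather than the node's own row count
			scores[n.id] += int(float64(average)*factor) * n.delegators
		}
	}
	return scores
}

func main() {
	nodes := []nodeState{
		{id: 1, baseScore: 10, delegators: 1}, // delegator node with one 10-row segment
		{id: 2, baseScore: 40, delegators: 0}, // worker node with four 10-row segments
	}
	// factor 0: scores stay 10 vs 40, so the balancer would move a segment to node 1;
	// factor 1: node 1 is padded by the average (25) to 35, and no move is worthwhile.
	fmt.Println(scoreWithDelegatorOverload(nodes, 0))
	fmt.Println(scoreWithDelegatorOverload(nodes, 1))
}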