fix: Segment may bounce between delegator and worker (#34830)
issue: #34595

In pr#34596 we added an overloaded factor to segments on the delegator, which caused the same segment to receive different scores on the delegator and on the worker. That mismatch could make a segment bounce back and forth between delegator and worker.
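
To make the failure mode concrete, here is a hedged sketch of the old behavior (the 0.1 factor and the row counts are made-up illustration values, not the real config defaults):

// bounce_old.go: illustrative only; oldScore is a toy model of the
// pre-fix scheme, where a node hosting a delegator had its own row
// count inflated by the overload factor.
package main

import "fmt"

const factor = 0.1

func oldScore(rows int, hostsDelegator bool) int {
	if hostsDelegator {
		return rows + int(float64(rows)*factor)
	}
	return rows
}

func main() {
	// Round 1: 100 rows on each node; the delegator node looks overloaded,
	// so a 10-row segment is moved to the worker.
	fmt.Println(oldScore(100, true), oldScore(100, false)) // 110 vs 100

	// Round 2: moving 10 rows changed the delegator's score by 11 but the
	// worker's by 10; the gap now points the other way, so the same segment
	// is moved straight back -- an endless bounce.
	fmt.Println(oldScore(90, true), oldScore(110, false)) // 99 vs 110
}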

This PR uses the average score to compute the delegator overloaded factor, so every node computes the same score for the same segment layout and segments no longer bounce between delegator and worker.
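
A minimal sketch of the fixed scheme (the helper below and its names are illustrative, not the balancer's actual API): the reservation for a delegator is derived from the cluster-wide average score, so moving one segment changes every node's score by exactly that segment's row count, and the scores can no longer disagree.

// average_reserve.go: toy model of the average-based reservation.
package main

import "fmt"

const factor = 0.1 // hypothetical DelegatorMemoryOverloadFactor value

// scoreNodes adds average*factor per hosted delegator on top of each
// node's raw row count, mirroring the idea in convertToNodeItems.
func scoreNodes(rows map[int64]int, delegators map[int64]int) map[int64]int {
	total := 0
	for _, r := range rows {
		total += r
	}
	if len(rows) == 0 || total == 0 {
		return rows
	}
	average := total / len(rows)
	scores := make(map[int64]int, len(rows))
	for node, r := range rows {
		scores[node] = r + int(float64(average)*factor)*delegators[node]
	}
	return scores
}

func main() {
	// Node 1 hosts one delegator; its reserved slice is average*factor = 10
	// in both rounds, so the marginal cost of a 10-row segment is 10 on both
	// nodes and the oscillation from the old scheme disappears.
	fmt.Println(scoreNodes(map[int64]int{1: 100, 2: 100}, map[int64]int{1: 1})) // map[1:110 2:100]
	fmt.Println(scoreNodes(map[int64]int{1: 90, 2: 110}, map[int64]int{1: 1}))  // map[1:100 2:110]
}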

---------

Signed-off-by: Wei Liu <[email protected]>
weiliu1031 authored Jul 23, 2024
1 parent a4d7da4 commit 92de49e
Showing 4 changed files with 177 additions and 56 deletions.
9 changes: 3 additions & 6 deletions internal/querycoordv2/balance/channel_level_score_balancer.go
@@ -164,7 +164,7 @@ func (b *ChannelLevelScoreBalancer) genStoppingSegmentPlan(replica *meta.Replica

func (b *ChannelLevelScoreBalancer) genSegmentPlan(replica *meta.Replica, channelName string, onlineNodes []int64) []SegmentAssignPlan {
segmentDist := make(map[int64][]*meta.Segment)
nodeScore := make(map[int64]int, 0)
nodeScore := b.convertToNodeItems(replica.GetCollectionID(), onlineNodes)
totalScore := 0

// list all segment which could be balanced, and calculate node's score
@@ -176,10 +176,7 @@ func (b *ChannelLevelScoreBalancer) genSegmentPlan(replica *meta.Replica, channe
segment.GetLevel() != datapb.SegmentLevel_L0
})
segmentDist[node] = segments

rowCount := b.calculateScore(replica.GetCollectionID(), node)
totalScore += rowCount
nodeScore[node] = rowCount
totalScore += nodeScore[node].getPriority()
}

if totalScore == 0 {
@@ -190,7 +187,7 @@ func (b *ChannelLevelScoreBalancer) genSegmentPlan(replica *meta.Replica, channe
segmentsToMove := make([]*meta.Segment, 0)
average := totalScore / len(onlineNodes)
for node, segments := range segmentDist {
leftScore := nodeScore[node]
leftScore := nodeScore[node].getPriority()
if leftScore <= average {
continue
}
34 changes: 20 additions & 14 deletions internal/querycoordv2/balance/channel_level_score_balancer_test.go
@@ -84,15 +84,16 @@ func (suite *ChannelLevelScoreBalancerTestSuite) TearDownTest() {

func (suite *ChannelLevelScoreBalancerTestSuite) TestAssignSegment() {
cases := []struct {
name string
comment string
distributions map[int64][]*meta.Segment
assignments [][]*meta.Segment
nodes []int64
collectionIDs []int64
segmentCnts []int
states []session.State
expectPlans [][]SegmentAssignPlan
name string
comment string
distributions map[int64][]*meta.Segment
assignments [][]*meta.Segment
nodes []int64
collectionIDs []int64
segmentCnts []int
states []session.State
expectPlans [][]SegmentAssignPlan
unstableAssignment bool
}{
{
name: "test empty cluster assigning one collection",
@@ -105,10 +106,11 @@ func (suite *ChannelLevelScoreBalancerTestSuite) TestAssignSegment() {
{SegmentInfo: &datapb.SegmentInfo{ID: 3, NumOfRows: 15, CollectionID: 1}},
},
},
nodes: []int64{1, 2, 3},
collectionIDs: []int64{0},
states: []session.State{session.NodeStateNormal, session.NodeStateNormal, session.NodeStateNormal},
segmentCnts: []int{0, 0, 0},
nodes: []int64{1, 2, 3},
collectionIDs: []int64{0},
states: []session.State{session.NodeStateNormal, session.NodeStateNormal, session.NodeStateNormal},
segmentCnts: []int{0, 0, 0},
unstableAssignment: true,
expectPlans: [][]SegmentAssignPlan{
{
// as assign segments is used while loading collection,
@@ -237,7 +239,11 @@ }
}
for i := range c.collectionIDs {
plans := balancer.AssignSegment(c.collectionIDs[i], c.assignments[i], c.nodes, false)
assertSegmentAssignPlanElementMatch(&suite.Suite, c.expectPlans[i], plans)
if c.unstableAssignment {
suite.Equal(len(plans), len(c.expectPlans[i]))
} else {
assertSegmentAssignPlanElementMatch(&suite.Suite, c.expectPlans[i], plans)
}
}
})
}
51 changes: 29 additions & 22 deletions internal/querycoordv2/balance/score_based_balancer.go
@@ -60,14 +60,13 @@ func (b *ScoreBasedBalancer) AssignSegment(collectionID int64, segments []*meta.
}

// calculate each node's score
nodeItems := b.convertToNodeItems(collectionID, nodes)
if len(nodeItems) == 0 {
nodeItemsMap := b.convertToNodeItems(collectionID, nodes)
if len(nodeItemsMap) == 0 {
return nil
}

nodeItemsMap := lo.SliceToMap(nodeItems, func(item *nodeItem) (int64, *nodeItem) { return item.nodeID, item })
queue := newPriorityQueue()
for _, item := range nodeItems {
for _, item := range nodeItemsMap {
queue.push(item)
}

@@ -139,33 +138,45 @@ func (b *ScoreBasedBalancer) hasEnoughBenefit(sourceNode *nodeItem, targetNode *
return true
}

func (b *ScoreBasedBalancer) convertToNodeItems(collectionID int64, nodeIDs []int64) []*nodeItem {
ret := make([]*nodeItem, 0, len(nodeIDs))
func (b *ScoreBasedBalancer) convertToNodeItems(collectionID int64, nodeIDs []int64) map[int64]*nodeItem {
totalScore := 0
nodeScoreMap := make(map[int64]*nodeItem)
for _, node := range nodeIDs {
priority := b.calculateScore(collectionID, node)
nodeItem := newNodeItem(priority, node)
ret = append(ret, &nodeItem)
score := b.calculateScore(collectionID, node)
nodeItem := newNodeItem(score, node)
nodeScoreMap[node] = &nodeItem
totalScore += score
}
return ret
}

func (b *ScoreBasedBalancer) calculateScore(collectionID, nodeID int64) int {
if totalScore == 0 {
return nodeScoreMap
}

average := totalScore / len(nodeIDs)
delegatorOverloadFactor := params.Params.QueryCoordCfg.DelegatorMemoryOverloadFactor.GetAsFloat()
// use average * delegatorOverloadFactor * delegator_num, to preserve fixed memory size for delegator
for _, node := range nodeIDs {
collectionViews := b.dist.LeaderViewManager.GetByFilter(meta.WithCollectionID2LeaderView(collectionID), meta.WithNodeID2LeaderView(node))
if len(collectionViews) > 0 {
newScore := nodeScoreMap[node].getPriority() + int(float64(average)*delegatorOverloadFactor)*len(collectionViews)
nodeScoreMap[node].setPriority(newScore)
}
}
return nodeScoreMap
}

func (b *ScoreBasedBalancer) calculateScore(collectionID, nodeID int64) int {
nodeRowCount := 0
nodeCollectionRowCount := make(map[int64]int)
// calculate global sealed segment row count
globalSegments := b.dist.SegmentDistManager.GetByFilter(meta.WithNodeID(nodeID))
for _, s := range globalSegments {
nodeRowCount += int(s.GetNumOfRows())
nodeCollectionRowCount[s.CollectionID] += int(s.GetNumOfRows())
}

// calculate global growing segment row count
views := b.dist.LeaderViewManager.GetByFilter(meta.WithNodeID2LeaderView(nodeID))
for _, view := range views {
nodeRowCount += int(float64(view.NumOfGrowingRows))
nodeRowCount += int(float64(nodeCollectionRowCount[view.CollectionID]) * delegatorOverloadFactor)
}

// calculate executing task cost in scheduler
Expand All @@ -182,7 +193,6 @@ func (b *ScoreBasedBalancer) calculateScore(collectionID, nodeID int64) int {
collectionViews := b.dist.LeaderViewManager.GetByFilter(meta.WithCollectionID2LeaderView(collectionID), meta.WithNodeID2LeaderView(nodeID))
for _, view := range collectionViews {
collectionRowCount += int(float64(view.NumOfGrowingRows))
collectionRowCount += int(float64(collectionRowCount) * delegatorOverloadFactor)
}

// calculate executing task cost in scheduler
@@ -266,7 +276,7 @@ func (b *ScoreBasedBalancer) genStoppingSegmentPlan(replica *meta.Replica, onlin

func (b *ScoreBasedBalancer) genSegmentPlan(replica *meta.Replica, onlineNodes []int64) []SegmentAssignPlan {
segmentDist := make(map[int64][]*meta.Segment)
nodeScore := make(map[int64]int, 0)
nodeScore := b.convertToNodeItems(replica.GetCollectionID(), onlineNodes)
totalScore := 0

// list all segment which could be balanced, and calculate node's score
Expand All @@ -278,10 +288,7 @@ func (b *ScoreBasedBalancer) genSegmentPlan(replica *meta.Replica, onlineNodes [
segment.GetLevel() != datapb.SegmentLevel_L0
})
segmentDist[node] = segments

rowCount := b.calculateScore(replica.GetCollectionID(), node)
totalScore += rowCount
nodeScore[node] = rowCount
totalScore += nodeScore[node].getPriority()
}

if totalScore == 0 {
Expand All @@ -292,7 +299,7 @@ func (b *ScoreBasedBalancer) genSegmentPlan(replica *meta.Replica, onlineNodes [
segmentsToMove := make([]*meta.Segment, 0)
average := totalScore / len(onlineNodes)
for node, segments := range segmentDist {
leftScore := nodeScore[node]
leftScore := nodeScore[node].getPriority()
if leftScore <= average {
continue
}
139 changes: 125 additions & 14 deletions internal/querycoordv2/balance/score_based_balancer_test.go
@@ -84,15 +84,16 @@ func (suite *ScoreBasedBalancerTestSuite) TearDownTest() {

func (suite *ScoreBasedBalancerTestSuite) TestAssignSegment() {
cases := []struct {
name string
comment string
distributions map[int64][]*meta.Segment
assignments [][]*meta.Segment
nodes []int64
collectionIDs []int64
segmentCnts []int
states []session.State
expectPlans [][]SegmentAssignPlan
name string
comment string
distributions map[int64][]*meta.Segment
assignments [][]*meta.Segment
nodes []int64
collectionIDs []int64
segmentCnts []int
states []session.State
expectPlans [][]SegmentAssignPlan
unstableAssignment bool
}{
{
name: "test empty cluster assigning one collection",
@@ -105,10 +106,11 @@ func (suite *ScoreBasedBalancerTestSuite) TestAssignSegment() {
{SegmentInfo: &datapb.SegmentInfo{ID: 3, NumOfRows: 15, CollectionID: 1}},
},
},
nodes: []int64{1, 2, 3},
collectionIDs: []int64{0},
states: []session.State{session.NodeStateNormal, session.NodeStateNormal, session.NodeStateNormal},
segmentCnts: []int{0, 0, 0},
nodes: []int64{1, 2, 3},
collectionIDs: []int64{0},
states: []session.State{session.NodeStateNormal, session.NodeStateNormal, session.NodeStateNormal},
segmentCnts: []int{0, 0, 0},
unstableAssignment: true,
expectPlans: [][]SegmentAssignPlan{
{
// as assign segments is used while loading collection,
@@ -237,7 +239,11 @@ }
}
for i := range c.collectionIDs {
plans := balancer.AssignSegment(c.collectionIDs[i], c.assignments[i], c.nodes, false)
assertSegmentAssignPlanElementMatch(&suite.Suite, c.expectPlans[i], plans)
if c.unstableAssignment {
suite.Len(plans, len(c.expectPlans[i]))
} else {
assertSegmentAssignPlanElementMatch(&suite.Suite, c.expectPlans[i], plans)
}
}
})
}
@@ -461,6 +467,111 @@ func (suite *ScoreBasedBalancerTestSuite) TestBalanceOneRound() {
}
}

func (suite *ScoreBasedBalancerTestSuite) TestDelegatorPreserveMemory() {
cases := []struct {
name string
nodes []int64
collectionID int64
replicaID int64
collectionsSegments []*datapb.SegmentInfo
states []session.State
shouldMock bool
distributions map[int64][]*meta.Segment
distributionChannels map[int64][]*meta.DmChannel
expectPlans []SegmentAssignPlan
expectChannelPlans []ChannelAssignPlan
}{
{
name: "normal balance for one collection only",
nodes: []int64{1, 2},
collectionID: 1,
replicaID: 1,
collectionsSegments: []*datapb.SegmentInfo{
{ID: 1, PartitionID: 1}, {ID: 2, PartitionID: 1}, {ID: 3, PartitionID: 1}, {ID: 4, PartitionID: 1}, {ID: 5, PartitionID: 1},
},
states: []session.State{session.NodeStateNormal, session.NodeStateNormal},
distributions: map[int64][]*meta.Segment{
1: {{SegmentInfo: &datapb.SegmentInfo{ID: 1, CollectionID: 1, NumOfRows: 10}, Node: 1}},
2: {
{SegmentInfo: &datapb.SegmentInfo{ID: 2, CollectionID: 1, NumOfRows: 10}, Node: 2},
{SegmentInfo: &datapb.SegmentInfo{ID: 3, CollectionID: 1, NumOfRows: 10}, Node: 2},
{SegmentInfo: &datapb.SegmentInfo{ID: 4, CollectionID: 1, NumOfRows: 10}, Node: 2},
{SegmentInfo: &datapb.SegmentInfo{ID: 5, CollectionID: 1, NumOfRows: 10}, Node: 2},
},
},
expectPlans: []SegmentAssignPlan{},
expectChannelPlans: []ChannelAssignPlan{},
},
}

for _, c := range cases {
suite.Run(c.name, func() {
suite.SetupSuite()
defer suite.TearDownTest()
balancer := suite.balancer

// 1. set up target for multi collections
collection := utils.CreateTestCollection(c.collectionID, int32(c.replicaID))
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, c.collectionID).Return(
nil, c.collectionsSegments, nil)
suite.broker.EXPECT().GetPartitions(mock.Anything, c.collectionID).Return([]int64{c.collectionID}, nil).Maybe()
collection.LoadPercentage = 100
collection.Status = querypb.LoadStatus_Loaded
balancer.meta.CollectionManager.PutCollection(collection)
balancer.meta.CollectionManager.PutPartition(utils.CreateTestPartition(c.collectionID, c.collectionID))
balancer.meta.ReplicaManager.Put(utils.CreateTestReplica(c.replicaID, c.collectionID, c.nodes))
balancer.targetMgr.UpdateCollectionNextTarget(c.collectionID)
balancer.targetMgr.UpdateCollectionCurrentTarget(c.collectionID)
balancer.targetMgr.UpdateCollectionNextTarget(c.collectionID)

// 2. set up target for distribution for multi collections
for node, s := range c.distributions {
balancer.dist.SegmentDistManager.Update(node, s...)
}
for node, v := range c.distributionChannels {
balancer.dist.ChannelDistManager.Update(node, v...)
}

leaderView := &meta.LeaderView{
ID: 1,
CollectionID: 1,
}
suite.balancer.dist.LeaderViewManager.Update(1, leaderView)

// 3. set up nodes info and resourceManager for balancer
for i := range c.nodes {
nodeInfo := session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: c.nodes[i],
Address: "127.0.0.1:0",
Hostname: "localhost",
})
nodeInfo.UpdateStats(session.WithChannelCnt(len(c.distributionChannels[c.nodes[i]])))
nodeInfo.SetState(c.states[i])
suite.balancer.nodeManager.Add(nodeInfo)
suite.balancer.meta.ResourceManager.HandleNodeUp(c.nodes[i])
}
utils.RecoverAllCollection(balancer.meta)

// disable delegator preserve memory
paramtable.Get().Save(paramtable.Get().QueryCoordCfg.DelegatorMemoryOverloadFactor.Key, "0")
segmentPlans, channelPlans := suite.getCollectionBalancePlans(balancer, c.collectionID)
suite.Len(channelPlans, 0)
suite.Len(segmentPlans, 1)
suite.Equal(segmentPlans[0].To, int64(1))

paramtable.Get().Save(paramtable.Get().QueryCoordCfg.DelegatorMemoryOverloadFactor.Key, "1")
segmentPlans, channelPlans = suite.getCollectionBalancePlans(balancer, c.collectionID)
suite.Len(channelPlans, 0)
suite.Len(segmentPlans, 0)

paramtable.Get().Save(paramtable.Get().QueryCoordCfg.DelegatorMemoryOverloadFactor.Key, "2")
segmentPlans, channelPlans = suite.getCollectionBalancePlans(balancer, c.collectionID)
suite.Len(segmentPlans, 1)
suite.Equal(segmentPlans[0].To, int64(2))
})
}
}

func (suite *ScoreBasedBalancerTestSuite) TestBalanceWithExecutingTask() {
cases := []struct {
name string
