From a09c85e4d465a64e73dfb67925cc7ca0aeee3a19 Mon Sep 17 00:00:00 2001 From: Wei Liu Date: Mon, 23 Dec 2024 17:37:12 +0800 Subject: [PATCH] fix: Skip balance on collection with zero row count segment Signed-off-by: Wei Liu --- .../querycoordv2/checkers/balance_checker.go | 29 +++++++- .../checkers/balance_checker_test.go | 71 ++++++++++++++++++- internal/querycoordv2/checkers/controller.go | 2 +- 3 files changed, 99 insertions(+), 3 deletions(-) diff --git a/internal/querycoordv2/checkers/balance_checker.go b/internal/querycoordv2/checkers/balance_checker.go index fb7fa0447f40c..f8733c60161c5 100644 --- a/internal/querycoordv2/checkers/balance_checker.go +++ b/internal/querycoordv2/checkers/balance_checker.go @@ -41,6 +41,7 @@ import ( type BalanceChecker struct { *checkerActivation meta *meta.Meta + dist *meta.DistributionManager nodeManager *session.NodeManager normalBalanceCollectionsCurrentRound typeutil.UniqueSet scheduler task.Scheduler @@ -49,6 +50,7 @@ type BalanceChecker struct { } func NewBalanceChecker(meta *meta.Meta, + dist *meta.DistributionManager, targetMgr meta.TargetManagerInterface, nodeMgr *session.NodeManager, scheduler task.Scheduler, @@ -57,6 +59,7 @@ func NewBalanceChecker(meta *meta.Meta, return &BalanceChecker{ checkerActivation: newCheckerActivation(), meta: meta, + dist: dist, targetMgr: targetMgr, nodeManager: nodeMgr, normalBalanceCollectionsCurrentRound: typeutil.NewUniqueSet(), @@ -80,6 +83,25 @@ func (b *BalanceChecker) readyToCheck(ctx context.Context, collectionID int64) b return metaExist && targetExist } +// checkSegmentRowCount reports whether every segment of the collection in SegmentDistManager has a non-zero row count. +// After a QueryCoord (qc) restart, the recovered current target doesn't contain the segments' row counts, and +// dist_handler gets segmentInfo from the current target to update SegmentDistManager, so the row counts there can be zero. +// Balancing based on a wrong segment row count must not happen, so the caller skips such a collection in this round. +// This check may introduce some CPU cost, because the segment row counts are checked before each balance. +func (b
*BalanceChecker) checkSegmentRowCount(ctx context.Context, collectionID int64) bool { + segments := b.dist.SegmentDistManager.GetByFilter(meta.WithCollectionID(collectionID)) + for _, s := range segments { + if s.GetNumOfRows() == 0 { + log.Ctx(ctx).RatedDebug(10, "collection has segment with row count zero, skip balancing in this round", + zap.Int64("collectionID", collectionID), + zap.Int64("segmentID", s.GetID()), + ) + return false + } + } + return true +} + func (b *BalanceChecker) replicasToBalance(ctx context.Context) []int64 { ids := b.meta.GetAll(ctx) @@ -128,8 +150,13 @@ func (b *BalanceChecker) replicasToBalance(ctx context.Context) []int64 { zap.Int64("collectionID", cid)) continue } - hasUnbalancedCollection = true + b.normalBalanceCollectionsCurrentRound.Insert(cid) + if !b.checkSegmentRowCount(ctx, cid) { + continue + } + + hasUnbalancedCollection = true for _, replica := range b.meta.ReplicaManager.GetByCollection(ctx, cid) { normalReplicasToBalance = append(normalReplicasToBalance, replica.GetID()) } diff --git a/internal/querycoordv2/checkers/balance_checker_test.go b/internal/querycoordv2/checkers/balance_checker_test.go index 2bc24627f8721..4c5c2836c36c1 100644 --- a/internal/querycoordv2/checkers/balance_checker_test.go +++ b/internal/querycoordv2/checkers/balance_checker_test.go @@ -48,6 +48,7 @@ type BalanceCheckerTestSuite struct { nodeMgr *session.NodeManager scheduler *task.MockScheduler targetMgr *meta.TargetManager + dist *meta.DistributionManager } func (suite *BalanceCheckerTestSuite) SetupSuite() { @@ -78,7 +79,8 @@ func (suite *BalanceCheckerTestSuite) SetupTest() { suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta) suite.balancer = balance.NewMockBalancer(suite.T()) - suite.checker = NewBalanceChecker(suite.meta, suite.targetMgr, suite.nodeMgr, suite.scheduler, func() balance.Balance { return suite.balancer }) + suite.dist = meta.NewDistributionManager() + suite.checker = NewBalanceChecker(suite.meta, suite.dist,
suite.targetMgr, suite.nodeMgr, suite.scheduler, func() balance.Balance { return suite.balancer }) } func (suite *BalanceCheckerTestSuite) TearDownTest() { @@ -306,6 +308,73 @@ func (suite *BalanceCheckerTestSuite) TestStoppingBalance() { suite.Len(tasks, 2) } +func (suite *BalanceCheckerTestSuite) TestSkipBalanceOnCollection() { + ctx := context.Background() + + segments := []*datapb.SegmentInfo{ + { + ID: 1, + PartitionID: 1, + InsertChannel: "test-insert-channel", + }, + } + channels := []*datapb.VchannelInfo{ + { + CollectionID: 1, + ChannelName: "test-insert-channel", + }, + } + suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, mock.Anything).Return(channels, segments, nil) + + nodeID1, nodeID2 := int64(1), int64(2) + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: nodeID1, + Address: "localhost", + Hostname: "localhost", + })) + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: nodeID2, + Address: "localhost", + Hostname: "localhost", + })) + suite.checker.meta.ResourceManager.HandleNodeUp(ctx, nodeID1) + suite.checker.meta.ResourceManager.HandleNodeUp(ctx, nodeID2) + + cid1, replicaID1, partitionID1 := 1, 1, 1 + collection1 := utils.CreateTestCollection(int64(cid1), int32(replicaID1)) + collection1.Status = querypb.LoadStatus_Loaded + replica1 := utils.CreateTestReplica(int64(replicaID1), int64(cid1), []int64{nodeID1, nodeID2}) + partition1 := utils.CreateTestPartition(int64(cid1), int64(partitionID1)) + suite.checker.meta.CollectionManager.PutCollection(ctx, collection1, partition1) + suite.checker.meta.ReplicaManager.Put(ctx, replica1) + suite.targetMgr.UpdateCollectionNextTarget(ctx, int64(cid1)) + suite.targetMgr.UpdateCollectionCurrentTarget(ctx, int64(cid1)) + + // balance must be skipped while the collection has a segment with zero row count in dist + suite.dist.SegmentDistManager.Update(1, &meta.Segment{ + SegmentInfo: &datapb.SegmentInfo{ + ID: 1, + CollectionID: int64(cid1), + PartitionID: 1, + InsertChannel:
"test-insert-channel", + }, + }) + replicas := suite.checker.replicasToBalance(ctx) + suite.Len(replicas, 0) + + suite.dist.SegmentDistManager.Update(1, &meta.Segment{ + SegmentInfo: &datapb.SegmentInfo{ + ID: 1, + CollectionID: int64(cid1), + PartitionID: 1, + InsertChannel: "test-insert-channel", + NumOfRows: 1, + }, + }) + replicas = suite.checker.replicasToBalance(ctx) + suite.Len(replicas, 1) +} + func (suite *BalanceCheckerTestSuite) TestTargetNotReady() { ctx := context.Background() // set up nodes info, stopping node1 diff --git a/internal/querycoordv2/checkers/controller.go b/internal/querycoordv2/checkers/controller.go index 2cc46e5f1f11b..e8c6cc2dd676c 100644 --- a/internal/querycoordv2/checkers/controller.go +++ b/internal/querycoordv2/checkers/controller.go @@ -67,7 +67,7 @@ func NewCheckerController( checkers := map[utils.CheckerType]Checker{ utils.ChannelChecker: NewChannelChecker(meta, dist, targetMgr, nodeMgr, getBalancerFunc), utils.SegmentChecker: NewSegmentChecker(meta, dist, targetMgr, nodeMgr, getBalancerFunc), - utils.BalanceChecker: NewBalanceChecker(meta, targetMgr, nodeMgr, scheduler, getBalancerFunc), + utils.BalanceChecker: NewBalanceChecker(meta, dist, targetMgr, nodeMgr, scheduler, getBalancerFunc), utils.IndexChecker: NewIndexChecker(meta, dist, broker, nodeMgr, targetMgr), // todo temporary work around must fix // utils.LeaderChecker: NewLeaderChecker(meta, dist, targetMgr, nodeMgr, true),