Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Skip balance on collection with zero row count segment #38665

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 28 additions & 1 deletion internal/querycoordv2/checkers/balance_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
type BalanceChecker struct {
*checkerActivation
meta *meta.Meta
dist *meta.DistributionManager
nodeManager *session.NodeManager
normalBalanceCollectionsCurrentRound typeutil.UniqueSet
scheduler task.Scheduler
Expand All @@ -49,6 +50,7 @@ type BalanceChecker struct {
}

func NewBalanceChecker(meta *meta.Meta,
dist *meta.DistributionManager,
targetMgr meta.TargetManagerInterface,
nodeMgr *session.NodeManager,
scheduler task.Scheduler,
Expand All @@ -57,6 +59,7 @@ func NewBalanceChecker(meta *meta.Meta,
return &BalanceChecker{
checkerActivation: newCheckerActivation(),
meta: meta,
dist: dist,
targetMgr: targetMgr,
nodeManager: nodeMgr,
normalBalanceCollectionsCurrentRound: typeutil.NewUniqueSet(),
Expand All @@ -80,6 +83,25 @@ func (b *BalanceChecker) readyToCheck(ctx context.Context, collectionID int64) b
return metaExist && targetExist
}

// checkSegmentRowCount reports whether balancing may proceed for the given
// collection. It returns false as soon as any segment of that collection held
// in SegmentDistManager reports a row count of zero, and true otherwise
// (including when no segment of the collection is present in the distribution).
//
// Background: after a QueryCoord restart, the recovered current target does not
// contain segment row counts, and dist_handler reads segment info from the
// current target when it updates SegmentDistManager. Row counts stored in
// SegmentDistManager can therefore be zero, and balancing based on those wrong
// counts must not happen. The check runs before each balance, so it adds some
// CPU cost.
func (b *BalanceChecker) checkSegmentRowCount(ctx context.Context, collectionID int64) bool {
	for _, segment := range b.dist.SegmentDistManager.GetByFilter(meta.WithCollectionID(collectionID)) {
		if segment.GetNumOfRows() != 0 {
			continue
		}
		// A single zero-row segment is enough to skip the whole collection this round.
		log.Ctx(ctx).RatedDebug(10, "collection has segment with row count zero, skip balancing in this round",
			zap.Int64("collectionID", collectionID),
			zap.Int64("segmentID", segment.GetID()),
		)
		return false
	}
	return true
}

func (b *BalanceChecker) replicasToBalance(ctx context.Context) []int64 {
ids := b.meta.GetAll(ctx)

Expand Down Expand Up @@ -128,8 +150,13 @@ func (b *BalanceChecker) replicasToBalance(ctx context.Context) []int64 {
zap.Int64("collectionID", cid))
continue
}
hasUnbalancedCollection = true

b.normalBalanceCollectionsCurrentRound.Insert(cid)
if !b.checkSegmentRowCount(ctx, cid) {
continue
}

hasUnbalancedCollection = true
for _, replica := range b.meta.ReplicaManager.GetByCollection(ctx, cid) {
normalReplicasToBalance = append(normalReplicasToBalance, replica.GetID())
}
Expand Down
71 changes: 70 additions & 1 deletion internal/querycoordv2/checkers/balance_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type BalanceCheckerTestSuite struct {
nodeMgr *session.NodeManager
scheduler *task.MockScheduler
targetMgr *meta.TargetManager
dist *meta.DistributionManager
}

func (suite *BalanceCheckerTestSuite) SetupSuite() {
Expand Down Expand Up @@ -78,7 +79,8 @@ func (suite *BalanceCheckerTestSuite) SetupTest() {
suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta)

suite.balancer = balance.NewMockBalancer(suite.T())
suite.checker = NewBalanceChecker(suite.meta, suite.targetMgr, suite.nodeMgr, suite.scheduler, func() balance.Balance { return suite.balancer })
suite.dist = meta.NewDistributionManager()
suite.checker = NewBalanceChecker(suite.meta, suite.dist, suite.targetMgr, suite.nodeMgr, suite.scheduler, func() balance.Balance { return suite.balancer })
}

func (suite *BalanceCheckerTestSuite) TearDownTest() {
Expand Down Expand Up @@ -306,6 +308,73 @@ func (suite *BalanceCheckerTestSuite) TestStoppingBalance() {
suite.Len(tasks, 2)
}

// TestSkipBalanceOnCollection verifies that replicasToBalance returns no
// replicas while the collection's segment in the distribution has a zero row
// count, and returns the collection's single replica once that segment reports
// a non-zero row count.
func (suite *BalanceCheckerTestSuite) TestSkipBalanceOnCollection() {
	const insertChannel = "test-insert-channel"
	ctx := context.Background()

	// The mocked broker serves one segment on one channel as recovery info.
	recoverySegments := []*datapb.SegmentInfo{{
		ID:            1,
		PartitionID:   1,
		InsertChannel: insertChannel,
	}}
	recoveryChannels := []*datapb.VchannelInfo{{
		CollectionID: 1,
		ChannelName:  insertChannel,
	}}
	suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, mock.Anything).Return(recoveryChannels, recoverySegments, nil)

	// Register both nodes first, then report both as up (same call order as before).
	nodeIDs := []int64{1, 2}
	for _, id := range nodeIDs {
		suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
			NodeID:   id,
			Address:  "localhost",
			Hostname: "localhost",
		}))
	}
	for _, id := range nodeIDs {
		suite.checker.meta.ResourceManager.HandleNodeUp(ctx, id)
	}

	// One loaded collection with one partition and one replica spanning both nodes.
	collectionID, replicaID, partitionID := int64(1), int64(1), int64(1)
	collection := utils.CreateTestCollection(collectionID, int32(replicaID))
	collection.Status = querypb.LoadStatus_Loaded
	replica := utils.CreateTestReplica(replicaID, collectionID, nodeIDs)
	partition := utils.CreateTestPartition(collectionID, partitionID)
	suite.checker.meta.CollectionManager.PutCollection(ctx, collection, partition)
	suite.checker.meta.ReplicaManager.Put(ctx, replica)
	suite.targetMgr.UpdateCollectionNextTarget(ctx, collectionID)
	suite.targetMgr.UpdateCollectionCurrentTarget(ctx, collectionID)

	// publishSegment overwrites the distribution entry with segment 1 carrying
	// the given row count.
	publishSegment := func(numRows int64) {
		suite.dist.SegmentDistManager.Update(1, &meta.Segment{
			SegmentInfo: &datapb.SegmentInfo{
				ID:            1,
				CollectionID:  collectionID,
				PartitionID:   1,
				InsertChannel: insertChannel,
				NumOfRows:     numRows,
			},
		})
	}

	// test skip balance on collection with zero row count segment
	publishSegment(0)
	skipped := suite.checker.replicasToBalance(ctx)
	suite.Len(skipped, 0)

	// With a non-zero row count the replica becomes a balance candidate.
	publishSegment(1)
	selected := suite.checker.replicasToBalance(ctx)
	suite.Len(selected, 1)
}

func (suite *BalanceCheckerTestSuite) TestTargetNotReady() {
ctx := context.Background()
// set up nodes info, stopping node1
Expand Down
2 changes: 1 addition & 1 deletion internal/querycoordv2/checkers/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func NewCheckerController(
checkers := map[utils.CheckerType]Checker{
utils.ChannelChecker: NewChannelChecker(meta, dist, targetMgr, nodeMgr, getBalancerFunc),
utils.SegmentChecker: NewSegmentChecker(meta, dist, targetMgr, nodeMgr, getBalancerFunc),
utils.BalanceChecker: NewBalanceChecker(meta, targetMgr, nodeMgr, scheduler, getBalancerFunc),
utils.BalanceChecker: NewBalanceChecker(meta, dist, targetMgr, nodeMgr, scheduler, getBalancerFunc),
utils.IndexChecker: NewIndexChecker(meta, dist, broker, nodeMgr, targetMgr),
// todo temporary work around must fix
// utils.LeaderChecker: NewLeaderChecker(meta, dist, targetMgr, nodeMgr, true),
Expand Down
Loading