Skip to content

Commit

Permalink
fix: Skip normal balance if target lack of segment info
Browse files Browse the repository at this point in the history
Signed-off-by: Wei Liu <[email protected]>
  • Loading branch information
weiliu1031 committed Dec 24, 2024
1 parent ebc8bf8 commit 3cd369d
Show file tree
Hide file tree
Showing 7 changed files with 119 additions and 38 deletions.
32 changes: 16 additions & 16 deletions internal/querycoordv2/balance/mock_balancer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions internal/querycoordv2/checkers/balance_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,17 @@ func (b *BalanceChecker) replicasToBalance() []int64 {
return nil
}

// Before performing balancing, check the CurrentTarget/LeaderView/Distribution for all collections.
// If any collection has unready info, skip the balance operation to avoid inconsistencies.
notReadyCollections := lo.Filter(loadedCollections, func(cid int64, _ int) bool {
// todo: should also check distribution and leader view in the future
return !b.targetMgr.IsCurrentTargetReady(cid)
})
if len(notReadyCollections) > 0 {
log.RatedInfo(10, "skip normal balance, cause collection not ready for balance", zap.Int64s("collectionIDs", notReadyCollections))
return nil
}

// iterator one normal collection in one round
normalReplicasToBalance := make([]int64, 0)
hasUnbalancedCollection := false
Expand Down
32 changes: 14 additions & 18 deletions internal/querycoordv2/checkers/balance_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,20 +320,8 @@ func (suite *BalanceCheckerTestSuite) TestTargetNotReady() {
suite.checker.meta.ResourceManager.HandleNodeUp(nodeID1)
suite.checker.meta.ResourceManager.HandleNodeUp(nodeID2)

segments := []*datapb.SegmentInfo{
{
ID: 1,
PartitionID: 1,
InsertChannel: "test-insert-channel",
},
}
channels := []*datapb.VchannelInfo{
{
CollectionID: 1,
ChannelName: "test-insert-channel",
},
}
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, mock.Anything).Return(channels, segments, nil)
mockTarget := meta.NewMockTargetManager(suite.T())
suite.checker.targetMgr = mockTarget

// set collections meta
cid1, replicaID1, partitionID1 := 1, 1, 1
Expand All @@ -343,8 +331,6 @@ func (suite *BalanceCheckerTestSuite) TestTargetNotReady() {
partition1 := utils.CreateTestPartition(int64(cid1), int64(partitionID1))
suite.checker.meta.CollectionManager.PutCollection(collection1, partition1)
suite.checker.meta.ReplicaManager.Put(replica1)
suite.targetMgr.UpdateCollectionNextTarget(int64(cid1))
suite.targetMgr.UpdateCollectionCurrentTarget(int64(cid1))

cid2, replicaID2, partitionID2 := 2, 2, 2
collection2 := utils.CreateTestCollection(int64(cid2), int32(replicaID2))
Expand All @@ -354,6 +340,17 @@ func (suite *BalanceCheckerTestSuite) TestTargetNotReady() {
suite.checker.meta.CollectionManager.PutCollection(collection2, partition2)
suite.checker.meta.ReplicaManager.Put(replica2)

// test normal balance when one collection has unready target
mockTarget.EXPECT().IsNextTargetExist(mock.Anything).Return(true)
mockTarget.EXPECT().IsCurrentTargetReady(mock.Anything).Return(false)
replicasToBalance := suite.checker.replicasToBalance()
suite.Len(replicasToBalance, 0)

// test stopping balance with target not ready
mockTarget.ExpectedCalls = nil
mockTarget.EXPECT().IsNextTargetExist(mock.Anything).Return(false)
mockTarget.EXPECT().IsCurrentTargetExist(int64(cid1), mock.Anything).Return(true)
mockTarget.EXPECT().IsCurrentTargetExist(int64(cid2), mock.Anything).Return(false)
mr1 := replica1.CopyForWrite()
mr1.AddRONode(1)
suite.checker.meta.ReplicaManager.Put(mr1.IntoReplica())
Expand All @@ -362,9 +359,8 @@ func (suite *BalanceCheckerTestSuite) TestTargetNotReady() {
mr2.AddRONode(1)
suite.checker.meta.ReplicaManager.Put(mr2.IntoReplica())

// test stopping balance
idsToBalance := []int64{int64(replicaID1)}
replicasToBalance := suite.checker.replicasToBalance()
replicasToBalance = suite.checker.replicasToBalance()
suite.ElementsMatch(idsToBalance, replicasToBalance)
}

Expand Down
42 changes: 42 additions & 0 deletions internal/querycoordv2/meta/mock_target_manager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 23 additions & 4 deletions internal/querycoordv2/meta/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ package meta
import (
"time"

"github.com/pingcap/log"
"github.com/samber/lo"
"go.uber.org/zap"

"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
Expand All @@ -32,6 +34,9 @@ type CollectionTarget struct {
dmChannels map[string]*DmChannel
partitions typeutil.Set[int64] // stores target partitions info
version int64

// record target status, if target has been save before milvus v2.4.19, then the target will lack of segment info.
lackSegmentInfo bool
}

func NewCollectionTarget(segments map[int64]*datapb.SegmentInfo, dmChannels map[string]*DmChannel, partitionIDs []int64) *CollectionTarget {
Expand All @@ -48,9 +53,13 @@ func FromPbCollectionTarget(target *querypb.CollectionTarget) *CollectionTarget
dmChannels := make(map[string]*DmChannel)
var partitions []int64

lackSegmentInfo := false
for _, t := range target.GetChannelTargets() {
for _, partition := range t.GetPartitionTargets() {
for _, segment := range partition.GetSegments() {
if segment.GetNumOfRows() <= 0 {
lackSegmentInfo = true
}
segments[segment.GetID()] = &datapb.SegmentInfo{
ID: segment.GetID(),
Level: segment.GetLevel(),
Expand All @@ -74,11 +83,16 @@ func FromPbCollectionTarget(target *querypb.CollectionTarget) *CollectionTarget
}
}

if lackSegmentInfo {
log.Info("target has lack of segment info", zap.Int64("collectionID", target.GetCollectionID()))
}

return &CollectionTarget{
segments: segments,
dmChannels: dmChannels,
partitions: typeutil.NewSet(partitions...),
version: target.GetVersion(),
segments: segments,
dmChannels: dmChannels,
partitions: typeutil.NewSet(partitions...),
version: target.GetVersion(),
lackSegmentInfo: lackSegmentInfo,
}
}

Expand Down Expand Up @@ -159,6 +173,11 @@ func (p *CollectionTarget) IsEmpty() bool {
return len(p.dmChannels)+len(p.segments) == 0
}

// if target is ready, it should have all segment info
func (p *CollectionTarget) Ready() bool {
return !p.lackSegmentInfo
}

type target struct {
// just maintain target at collection level
collectionTargetMap map[int64]*CollectionTarget
Expand Down
12 changes: 12 additions & 0 deletions internal/querycoordv2/meta/target_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ type TargetManagerInterface interface {
SaveCurrentTarget(catalog metastore.QueryCoordCatalog)
Recover(catalog metastore.QueryCoordCatalog) error
CanSegmentBeMoved(collectionID, segmentID int64) bool
IsCurrentTargetReady(collectionID int64) bool
}

type TargetManager struct {
Expand Down Expand Up @@ -631,3 +632,14 @@ func (mgr *TargetManager) CanSegmentBeMoved(collectionID, segmentID int64) bool

return false
}

func (mgr *TargetManager) IsCurrentTargetReady(collectionID int64) bool {
mgr.rwMutex.RLock()
defer mgr.rwMutex.RUnlock()
target, ok := mgr.current.collectionTargetMap[collectionID]
if !ok {
return false
}

return target.Ready()
}
1 change: 1 addition & 0 deletions internal/querycoordv2/meta/target_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,7 @@ func (suite *TargetManagerSuite) TestRecover() {
for _, segment := range target.GetAllSegments() {
suite.Equal(int64(100), segment.GetNumOfRows())
}
suite.True(target.Ready())

// after recover, target info should be cleaned up
targets, err := suite.catalog.GetCollectionTargets()
Expand Down

0 comments on commit 3cd369d

Please sign in to comment.