Skip to content

Commit

Permalink
fix: [2.4] Querycoord will trigger unexpected balance task after rest…
Browse files Browse the repository at this point in the history
…art (#38714)

issue: #38606 
pr: #38630

---------

Signed-off-by: Wei Liu <[email protected]>
  • Loading branch information
weiliu1031 authored Dec 25, 2024
1 parent 648078e commit 8fe883f
Show file tree
Hide file tree
Showing 8 changed files with 129 additions and 40 deletions.
1 change: 1 addition & 0 deletions internal/proto/query_coord.proto
Original file line number Diff line number Diff line change
Expand Up @@ -827,6 +827,7 @@ message CheckerInfo {
message SegmentTarget {
int64 ID = 1;
data.SegmentLevel level = 2;
int64 num_of_rows = 3;
}

message PartitionTarget {
Expand Down
32 changes: 16 additions & 16 deletions internal/querycoordv2/balance/mock_balancer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions internal/querycoordv2/checkers/balance_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,17 @@ func (b *BalanceChecker) replicasToBalance() []int64 {
return nil
}

// Before performing balancing, check the CurrentTarget/LeaderView/Distribution for all collections.
// If any collection has unready info, skip the balance operation to avoid inconsistencies.
notReadyCollections := lo.Filter(loadedCollections, func(cid int64, _ int) bool {
// todo: should also check distribution and leader view in the future
return !b.targetMgr.IsCurrentTargetReady(cid)
})
if len(notReadyCollections) > 0 {
log.RatedInfo(10, "skip normal balance, cause collection not ready for balance", zap.Int64s("collectionIDs", notReadyCollections))
return nil
}

// iterator one normal collection in one round
normalReplicasToBalance := make([]int64, 0)
hasUnbalancedCollection := false
Expand Down
32 changes: 14 additions & 18 deletions internal/querycoordv2/checkers/balance_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,20 +320,8 @@ func (suite *BalanceCheckerTestSuite) TestTargetNotReady() {
suite.checker.meta.ResourceManager.HandleNodeUp(nodeID1)
suite.checker.meta.ResourceManager.HandleNodeUp(nodeID2)

segments := []*datapb.SegmentInfo{
{
ID: 1,
PartitionID: 1,
InsertChannel: "test-insert-channel",
},
}
channels := []*datapb.VchannelInfo{
{
CollectionID: 1,
ChannelName: "test-insert-channel",
},
}
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, mock.Anything).Return(channels, segments, nil)
mockTarget := meta.NewMockTargetManager(suite.T())
suite.checker.targetMgr = mockTarget

// set collections meta
cid1, replicaID1, partitionID1 := 1, 1, 1
Expand All @@ -343,8 +331,6 @@ func (suite *BalanceCheckerTestSuite) TestTargetNotReady() {
partition1 := utils.CreateTestPartition(int64(cid1), int64(partitionID1))
suite.checker.meta.CollectionManager.PutCollection(collection1, partition1)
suite.checker.meta.ReplicaManager.Put(replica1)
suite.targetMgr.UpdateCollectionNextTarget(int64(cid1))
suite.targetMgr.UpdateCollectionCurrentTarget(int64(cid1))

cid2, replicaID2, partitionID2 := 2, 2, 2
collection2 := utils.CreateTestCollection(int64(cid2), int32(replicaID2))
Expand All @@ -354,6 +340,17 @@ func (suite *BalanceCheckerTestSuite) TestTargetNotReady() {
suite.checker.meta.CollectionManager.PutCollection(collection2, partition2)
suite.checker.meta.ReplicaManager.Put(replica2)

// test normal balance when one collection has unready target
mockTarget.EXPECT().IsNextTargetExist(mock.Anything).Return(true)
mockTarget.EXPECT().IsCurrentTargetReady(mock.Anything).Return(false)
replicasToBalance := suite.checker.replicasToBalance()
suite.Len(replicasToBalance, 0)

// test stopping balance with target not ready
mockTarget.ExpectedCalls = nil
mockTarget.EXPECT().IsNextTargetExist(mock.Anything).Return(false)
mockTarget.EXPECT().IsCurrentTargetExist(int64(cid1), mock.Anything).Return(true)
mockTarget.EXPECT().IsCurrentTargetExist(int64(cid2), mock.Anything).Return(false)
mr1 := replica1.CopyForWrite()
mr1.AddRONode(1)
suite.checker.meta.ReplicaManager.Put(mr1.IntoReplica())
Expand All @@ -362,9 +359,8 @@ func (suite *BalanceCheckerTestSuite) TestTargetNotReady() {
mr2.AddRONode(1)
suite.checker.meta.ReplicaManager.Put(mr2.IntoReplica())

// test stopping balance
idsToBalance := []int64{int64(replicaID1)}
replicasToBalance := suite.checker.replicasToBalance()
replicasToBalance = suite.checker.replicasToBalance()
suite.ElementsMatch(idsToBalance, replicasToBalance)
}

Expand Down
42 changes: 42 additions & 0 deletions internal/querycoordv2/meta/mock_target_manager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 27 additions & 6 deletions internal/querycoordv2/meta/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ import (
"time"

"github.com/samber/lo"
"go.uber.org/zap"

"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

Expand All @@ -32,6 +34,9 @@ type CollectionTarget struct {
dmChannels map[string]*DmChannel
partitions typeutil.Set[int64] // stores target partitions info
version int64

// record target status, if target has been save before milvus v2.4.19, then the target will lack of segment info.
lackSegmentInfo bool
}

func NewCollectionTarget(segments map[int64]*datapb.SegmentInfo, dmChannels map[string]*DmChannel, partitionIDs []int64) *CollectionTarget {
Expand All @@ -48,15 +53,20 @@ func FromPbCollectionTarget(target *querypb.CollectionTarget) *CollectionTarget
dmChannels := make(map[string]*DmChannel)
var partitions []int64

lackSegmentInfo := false
for _, t := range target.GetChannelTargets() {
for _, partition := range t.GetPartitionTargets() {
for _, segment := range partition.GetSegments() {
if segment.GetNumOfRows() <= 0 {
lackSegmentInfo = true
}
segments[segment.GetID()] = &datapb.SegmentInfo{
ID: segment.GetID(),
Level: segment.GetLevel(),
CollectionID: target.GetCollectionID(),
PartitionID: partition.GetPartitionID(),
InsertChannel: t.GetChannelName(),
NumOfRows: segment.GetNumOfRows(),
}
}
partitions = append(partitions, partition.GetPartitionID())
Expand All @@ -73,11 +83,16 @@ func FromPbCollectionTarget(target *querypb.CollectionTarget) *CollectionTarget
}
}

if lackSegmentInfo {
log.Info("target has lack of segment info", zap.Int64("collectionID", target.GetCollectionID()))
}

return &CollectionTarget{
segments: segments,
dmChannels: dmChannels,
partitions: typeutil.NewSet(partitions...),
version: target.GetVersion(),
segments: segments,
dmChannels: dmChannels,
partitions: typeutil.NewSet(partitions...),
version: target.GetVersion(),
lackSegmentInfo: lackSegmentInfo,
}
}

Expand Down Expand Up @@ -111,8 +126,9 @@ func (p *CollectionTarget) toPbMsg() *querypb.CollectionTarget {
}

partitionTarget.Segments = append(partitionTarget.Segments, &querypb.SegmentTarget{
ID: info.GetID(),
Level: info.GetLevel(),
ID: info.GetID(),
Level: info.GetLevel(),
NumOfRows: info.GetNumOfRows(),
})
}
}
Expand Down Expand Up @@ -157,6 +173,11 @@ func (p *CollectionTarget) IsEmpty() bool {
return len(p.dmChannels)+len(p.segments) == 0
}

// if target is ready, it should have all segment info
func (p *CollectionTarget) Ready() bool {
return !p.lackSegmentInfo
}

type target struct {
// just maintain target at collection level
collectionTargetMap map[int64]*CollectionTarget
Expand Down
12 changes: 12 additions & 0 deletions internal/querycoordv2/meta/target_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ type TargetManagerInterface interface {
SaveCurrentTarget(catalog metastore.QueryCoordCatalog)
Recover(catalog metastore.QueryCoordCatalog) error
CanSegmentBeMoved(collectionID, segmentID int64) bool
IsCurrentTargetReady(collectionID int64) bool
}

type TargetManager struct {
Expand Down Expand Up @@ -631,3 +632,14 @@ func (mgr *TargetManager) CanSegmentBeMoved(collectionID, segmentID int64) bool

return false
}

func (mgr *TargetManager) IsCurrentTargetReady(collectionID int64) bool {
mgr.rwMutex.RLock()
defer mgr.rwMutex.RUnlock()
target, ok := mgr.current.collectionTargetMap[collectionID]
if !ok {
return false
}

return target.Ready()
}
6 changes: 6 additions & 0 deletions internal/querycoordv2/meta/target_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -570,11 +570,13 @@ func (suite *TargetManagerSuite) TestRecover() {
ID: 11,
PartitionID: 1,
InsertChannel: "channel-1",
NumOfRows: 100,
},
{
ID: 12,
PartitionID: 1,
InsertChannel: "channel-2",
NumOfRows: 100,
},
}

Expand All @@ -595,6 +597,10 @@ func (suite *TargetManagerSuite) TestRecover() {
suite.Len(target.GetAllDmChannelNames(), 2)
suite.Len(target.GetAllSegmentIDs(), 2)
suite.Equal(target.GetTargetVersion(), version)
for _, segment := range target.GetAllSegments() {
suite.Equal(int64(100), segment.GetNumOfRows())
}
suite.True(target.Ready())

// after recover, target info should be cleaned up
targets, err := suite.catalog.GetCollectionTargets()
Expand Down

0 comments on commit 8fe883f

Please sign in to comment.