From 3cd369dac31613dbb1adf3afcd2d5a1216b79f56 Mon Sep 17 00:00:00 2001 From: Wei Liu Date: Tue, 24 Dec 2024 21:33:35 +0800 Subject: [PATCH] fix: Skip normal balance if target lack of segment info Signed-off-by: Wei Liu --- .../querycoordv2/balance/mock_balancer.go | 32 +++++++------- .../querycoordv2/checkers/balance_checker.go | 11 +++++ .../checkers/balance_checker_test.go | 32 +++++++------- .../querycoordv2/meta/mock_target_manager.go | 42 +++++++++++++++++++ internal/querycoordv2/meta/target.go | 27 ++++++++++-- internal/querycoordv2/meta/target_manager.go | 12 ++++++ .../querycoordv2/meta/target_manager_test.go | 1 + 7 files changed, 119 insertions(+), 38 deletions(-) diff --git a/internal/querycoordv2/balance/mock_balancer.go b/internal/querycoordv2/balance/mock_balancer.go index c4880f1849803..de241f882a368 100644 --- a/internal/querycoordv2/balance/mock_balancer.go +++ b/internal/querycoordv2/balance/mock_balancer.go @@ -20,13 +20,13 @@ func (_m *MockBalancer) EXPECT() *MockBalancer_Expecter { return &MockBalancer_Expecter{mock: &_m.Mock} } -// AssignChannel provides a mock function with given fields: collectionID, channels, nodes, manualBalance -func (_m *MockBalancer) AssignChannel(collectionID int64, channels []*meta.DmChannel, nodes []int64, manualBalance bool) []ChannelAssignPlan { - ret := _m.Called(collectionID, channels, nodes, manualBalance) +// AssignChannel provides a mock function with given fields: collectionID, channels, nodes, forceAssign +func (_m *MockBalancer) AssignChannel(collectionID int64, channels []*meta.DmChannel, nodes []int64, forceAssign bool) []ChannelAssignPlan { + ret := _m.Called(collectionID, channels, nodes, forceAssign) var r0 []ChannelAssignPlan if rf, ok := ret.Get(0).(func(int64, []*meta.DmChannel, []int64, bool) []ChannelAssignPlan); ok { - r0 = rf(collectionID, channels, nodes, manualBalance) + r0 = rf(collectionID, channels, nodes, forceAssign) } else { if ret.Get(0) != nil { r0 = ret.Get(0).([]ChannelAssignPlan) @@ -45,12 +45,12 @@ type MockBalancer_AssignChannel_Call struct { // - collectionID int64 // - channels []*meta.DmChannel // - nodes []int64 -// - manualBalance bool -func (_e *MockBalancer_Expecter) AssignChannel(collectionID interface{}, channels interface{}, nodes interface{}, manualBalance interface{}) *MockBalancer_AssignChannel_Call { - return &MockBalancer_AssignChannel_Call{Call: _e.mock.On("AssignChannel", collectionID, channels, nodes, manualBalance)} +// - forceAssign bool +func (_e *MockBalancer_Expecter) AssignChannel(collectionID interface{}, channels interface{}, nodes interface{}, forceAssign interface{}) *MockBalancer_AssignChannel_Call { + return &MockBalancer_AssignChannel_Call{Call: _e.mock.On("AssignChannel", collectionID, channels, nodes, forceAssign)} } -func (_c *MockBalancer_AssignChannel_Call) Run(run func(collectionID int64, channels []*meta.DmChannel, nodes []int64, manualBalance bool)) *MockBalancer_AssignChannel_Call { +func (_c *MockBalancer_AssignChannel_Call) Run(run func(collectionID int64, channels []*meta.DmChannel, nodes []int64, forceAssign bool)) *MockBalancer_AssignChannel_Call { _c.Call.Run(func(args mock.Arguments) { run(args[0].(int64), args[1].([]*meta.DmChannel), args[2].([]int64), args[3].(bool)) }) @@ -67,13 +67,13 @@ func (_c *MockBalancer_AssignChannel_Call) RunAndReturn(run func(int64, []*meta. return _c } -// AssignSegment provides a mock function with given fields: collectionID, segments, nodes, manualBalance -func (_m *MockBalancer) AssignSegment(collectionID int64, segments []*meta.Segment, nodes []int64, manualBalance bool) []SegmentAssignPlan { - ret := _m.Called(collectionID, segments, nodes, manualBalance) +// AssignSegment provides a mock function with given fields: collectionID, segments, nodes, forceAssign +func (_m *MockBalancer) AssignSegment(collectionID int64, segments []*meta.Segment, nodes []int64, forceAssign bool) []SegmentAssignPlan { + ret := _m.Called(collectionID, segments, nodes, forceAssign) var r0 []SegmentAssignPlan if rf, ok := ret.Get(0).(func(int64, []*meta.Segment, []int64, bool) []SegmentAssignPlan); ok { - r0 = rf(collectionID, segments, nodes, manualBalance) + r0 = rf(collectionID, segments, nodes, forceAssign) } else { if ret.Get(0) != nil { r0 = ret.Get(0).([]SegmentAssignPlan) @@ -92,12 +92,12 @@ type MockBalancer_AssignSegment_Call struct { // - collectionID int64 // - segments []*meta.Segment // - nodes []int64 -// - manualBalance bool -func (_e *MockBalancer_Expecter) AssignSegment(collectionID interface{}, segments interface{}, nodes interface{}, manualBalance interface{}) *MockBalancer_AssignSegment_Call { - return &MockBalancer_AssignSegment_Call{Call: _e.mock.On("AssignSegment", collectionID, segments, nodes, manualBalance)} +// - forceAssign bool +func (_e *MockBalancer_Expecter) AssignSegment(collectionID interface{}, segments interface{}, nodes interface{}, forceAssign interface{}) *MockBalancer_AssignSegment_Call { + return &MockBalancer_AssignSegment_Call{Call: _e.mock.On("AssignSegment", collectionID, segments, nodes, forceAssign)} } -func (_c *MockBalancer_AssignSegment_Call) Run(run func(collectionID int64, segments []*meta.Segment, nodes []int64, manualBalance bool)) *MockBalancer_AssignSegment_Call { +func (_c *MockBalancer_AssignSegment_Call) Run(run func(collectionID int64, segments []*meta.Segment, nodes []int64, forceAssign bool)) *MockBalancer_AssignSegment_Call { _c.Call.Run(func(args mock.Arguments) { run(args[0].(int64), args[1].([]*meta.Segment), args[2].([]int64), args[3].(bool)) }) diff --git a/internal/querycoordv2/checkers/balance_checker.go b/internal/querycoordv2/checkers/balance_checker.go index 7acdb898e681e..9f8cc6c55c088 100644 --- a/internal/querycoordv2/checkers/balance_checker.go +++ b/internal/querycoordv2/checkers/balance_checker.go @@ -119,6 +119,17 @@ func (b *BalanceChecker) replicasToBalance() []int64 { return nil } + // Before performing balancing, check the CurrentTarget/LeaderView/Distribution for all collections. + // If any collection has unready info, skip the balance operation to avoid inconsistencies. + notReadyCollections := lo.Filter(loadedCollections, func(cid int64, _ int) bool { + // todo: should also check distribution and leader view in the future + return !b.targetMgr.IsCurrentTargetReady(cid) + }) + if len(notReadyCollections) > 0 { + log.RatedInfo(10, "skip normal balance, cause collection not ready for balance", zap.Int64s("collectionIDs", notReadyCollections)) + return nil + } + // iterator one normal collection in one round normalReplicasToBalance := make([]int64, 0) hasUnbalancedCollection := false diff --git a/internal/querycoordv2/checkers/balance_checker_test.go b/internal/querycoordv2/checkers/balance_checker_test.go index 744d9a2fc7dd7..34d7918ef4fcd 100644 --- a/internal/querycoordv2/checkers/balance_checker_test.go +++ b/internal/querycoordv2/checkers/balance_checker_test.go @@ -320,20 +320,8 @@ func (suite *BalanceCheckerTestSuite) TestTargetNotReady() { suite.checker.meta.ResourceManager.HandleNodeUp(nodeID1) suite.checker.meta.ResourceManager.HandleNodeUp(nodeID2) - segments := []*datapb.SegmentInfo{ - { - ID: 1, - PartitionID: 1, - InsertChannel: "test-insert-channel", - }, - } - channels := []*datapb.VchannelInfo{ - { - CollectionID: 1, - ChannelName: "test-insert-channel", - }, - } - suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, mock.Anything).Return(channels, segments, nil) + mockTarget := meta.NewMockTargetManager(suite.T()) + suite.checker.targetMgr = mockTarget // set collections meta cid1, replicaID1, partitionID1 := 1, 1, 1 @@ -343,8 +331,6 @@ func (suite *BalanceCheckerTestSuite) TestTargetNotReady() { partition1 := utils.CreateTestPartition(int64(cid1), int64(partitionID1)) suite.checker.meta.CollectionManager.PutCollection(collection1, partition1) suite.checker.meta.ReplicaManager.Put(replica1) - suite.targetMgr.UpdateCollectionNextTarget(int64(cid1)) - suite.targetMgr.UpdateCollectionCurrentTarget(int64(cid1)) cid2, replicaID2, partitionID2 := 2, 2, 2 collection2 := utils.CreateTestCollection(int64(cid2), int32(replicaID2)) @@ -354,6 +340,17 @@ func (suite *BalanceCheckerTestSuite) TestTargetNotReady() { suite.checker.meta.CollectionManager.PutCollection(collection2, partition2) suite.checker.meta.ReplicaManager.Put(replica2) + // test normal balance when one collection has unready target + mockTarget.EXPECT().IsNextTargetExist(mock.Anything).Return(true) + mockTarget.EXPECT().IsCurrentTargetReady(mock.Anything).Return(false) + replicasToBalance := suite.checker.replicasToBalance() + suite.Len(replicasToBalance, 0) + + // test stopping balance with target not ready + mockTarget.ExpectedCalls = nil + mockTarget.EXPECT().IsNextTargetExist(mock.Anything).Return(false) + mockTarget.EXPECT().IsCurrentTargetExist(int64(cid1), mock.Anything).Return(true) + mockTarget.EXPECT().IsCurrentTargetExist(int64(cid2), mock.Anything).Return(false) mr1 := replica1.CopyForWrite() mr1.AddRONode(1) suite.checker.meta.ReplicaManager.Put(mr1.IntoReplica()) @@ -362,9 +359,8 @@ func (suite *BalanceCheckerTestSuite) TestTargetNotReady() { mr2.AddRONode(1) suite.checker.meta.ReplicaManager.Put(mr2.IntoReplica()) - // test stopping balance idsToBalance := []int64{int64(replicaID1)} - replicasToBalance := suite.checker.replicasToBalance() + replicasToBalance = suite.checker.replicasToBalance() suite.ElementsMatch(idsToBalance, replicasToBalance) } diff --git a/internal/querycoordv2/meta/mock_target_manager.go b/internal/querycoordv2/meta/mock_target_manager.go index c3622c3fa5e8f..a5b3cd150f6ee 100644 --- a/internal/querycoordv2/meta/mock_target_manager.go +++ b/internal/querycoordv2/meta/mock_target_manager.go @@ -564,6 +564,48 @@ func (_c *MockTargetManager_IsCurrentTargetExist_Call) RunAndReturn(run func(int return _c } +// IsCurrentTargetReady provides a mock function with given fields: collectionID +func (_m *MockTargetManager) IsCurrentTargetReady(collectionID int64) bool { + ret := _m.Called(collectionID) + + var r0 bool + if rf, ok := ret.Get(0).(func(int64) bool); ok { + r0 = rf(collectionID) + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// MockTargetManager_IsCurrentTargetReady_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'IsCurrentTargetReady' +type MockTargetManager_IsCurrentTargetReady_Call struct { + *mock.Call +} + +// IsCurrentTargetReady is a helper method to define mock.On call +// - collectionID int64 +func (_e *MockTargetManager_Expecter) IsCurrentTargetReady(collectionID interface{}) *MockTargetManager_IsCurrentTargetReady_Call { + return &MockTargetManager_IsCurrentTargetReady_Call{Call: _e.mock.On("IsCurrentTargetReady", collectionID)} +} + +func (_c *MockTargetManager_IsCurrentTargetReady_Call) Run(run func(collectionID int64)) *MockTargetManager_IsCurrentTargetReady_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(int64)) + }) + return _c +} + +func (_c *MockTargetManager_IsCurrentTargetReady_Call) Return(_a0 bool) *MockTargetManager_IsCurrentTargetReady_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockTargetManager_IsCurrentTargetReady_Call) RunAndReturn(run func(int64) bool) *MockTargetManager_IsCurrentTargetReady_Call { + _c.Call.Return(run) + return _c +} + // IsNextTargetExist provides a mock function with given fields: collectionID func (_m *MockTargetManager) IsNextTargetExist(collectionID int64) bool { ret := _m.Called(collectionID) diff --git a/internal/querycoordv2/meta/target.go b/internal/querycoordv2/meta/target.go index 2fca901415405..3cef095ecbd35 100644 --- a/internal/querycoordv2/meta/target.go +++ b/internal/querycoordv2/meta/target.go @@ -19,7 +19,9 @@ package meta import ( "time" + "github.com/pingcap/log" "github.com/samber/lo" + "go.uber.org/zap" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/querypb" @@ -32,6 +34,9 @@ type CollectionTarget struct { dmChannels map[string]*DmChannel partitions typeutil.Set[int64] // stores target partitions info version int64 + + // record target status, if target has been save before milvus v2.4.19, then the target will lack of segment info. + lackSegmentInfo bool } func NewCollectionTarget(segments map[int64]*datapb.SegmentInfo, dmChannels map[string]*DmChannel, partitionIDs []int64) *CollectionTarget { @@ -48,9 +53,13 @@ func FromPbCollectionTarget(target *querypb.CollectionTarget) *CollectionTarget dmChannels := make(map[string]*DmChannel) var partitions []int64 + lackSegmentInfo := false for _, t := range target.GetChannelTargets() { for _, partition := range t.GetPartitionTargets() { for _, segment := range partition.GetSegments() { + if segment.GetNumOfRows() <= 0 { + lackSegmentInfo = true + } segments[segment.GetID()] = &datapb.SegmentInfo{ ID: segment.GetID(), Level: segment.GetLevel(), @@ -74,11 +83,16 @@ func FromPbCollectionTarget(target *querypb.CollectionTarget) *CollectionTarget } } + if lackSegmentInfo { + log.Info("target has lack of segment info", zap.Int64("collectionID", target.GetCollectionID())) + } + return &CollectionTarget{ - segments: segments, - dmChannels: dmChannels, - partitions: typeutil.NewSet(partitions...), - version: target.GetVersion(), + segments: segments, + dmChannels: dmChannels, + partitions: typeutil.NewSet(partitions...), + version: target.GetVersion(), + lackSegmentInfo: lackSegmentInfo, } } @@ -159,6 +173,11 @@ func (p *CollectionTarget) IsEmpty() bool { return len(p.dmChannels)+len(p.segments) == 0 } +// if target is ready, it should have all segment info +func (p *CollectionTarget) Ready() bool { + return !p.lackSegmentInfo +} + type target struct { // just maintain target at collection level collectionTargetMap map[int64]*CollectionTarget diff --git a/internal/querycoordv2/meta/target_manager.go b/internal/querycoordv2/meta/target_manager.go index 9c74792380778..456c6e6cddb79 100644 --- a/internal/querycoordv2/meta/target_manager.go +++ b/internal/querycoordv2/meta/target_manager.go @@ -68,6 +68,7 @@ type TargetManagerInterface interface { SaveCurrentTarget(catalog metastore.QueryCoordCatalog) Recover(catalog metastore.QueryCoordCatalog) error CanSegmentBeMoved(collectionID, segmentID int64) bool + IsCurrentTargetReady(collectionID int64) bool } type TargetManager struct { @@ -631,3 +632,14 @@ func (mgr *TargetManager) CanSegmentBeMoved(collectionID, segmentID int64) bool return false } + +func (mgr *TargetManager) IsCurrentTargetReady(collectionID int64) bool { + mgr.rwMutex.RLock() + defer mgr.rwMutex.RUnlock() + target, ok := mgr.current.collectionTargetMap[collectionID] + if !ok { + return false + } + + return target.Ready() +} diff --git a/internal/querycoordv2/meta/target_manager_test.go b/internal/querycoordv2/meta/target_manager_test.go index ede4d64eafe95..c76c78f5a8f0e 100644 --- a/internal/querycoordv2/meta/target_manager_test.go +++ b/internal/querycoordv2/meta/target_manager_test.go @@ -600,6 +600,7 @@ func (suite *TargetManagerSuite) TestRecover() { for _, segment := range target.GetAllSegments() { suite.Equal(int64(100), segment.GetNumOfRows()) } + suite.True(target.Ready()) // after recover, target info should be cleaned up targets, err := suite.catalog.GetCollectionTargets()