diff --git a/internal/querycoordv2/balance/mock_balancer.go b/internal/querycoordv2/balance/mock_balancer.go index 5935b3c36c15a..e7f082f66745a 100644 --- a/internal/querycoordv2/balance/mock_balancer.go +++ b/internal/querycoordv2/balance/mock_balancer.go @@ -22,9 +22,9 @@ func (_m *MockBalancer) EXPECT() *MockBalancer_Expecter { return &MockBalancer_Expecter{mock: &_m.Mock} } -// AssignChannel provides a mock function with given fields: ctx, collectionID, channels, nodes, manualBalance -func (_m *MockBalancer) AssignChannel(ctx context.Context, collectionID int64, channels []*meta.DmChannel, nodes []int64, manualBalance bool) []ChannelAssignPlan { - ret := _m.Called(ctx, collectionID, channels, nodes, manualBalance) +// AssignChannel provides a mock function with given fields: ctx, collectionID, channels, nodes, forceAssign +func (_m *MockBalancer) AssignChannel(ctx context.Context, collectionID int64, channels []*meta.DmChannel, nodes []int64, forceAssign bool) []ChannelAssignPlan { + ret := _m.Called(ctx, collectionID, channels, nodes, forceAssign) if len(ret) == 0 { panic("no return value specified for AssignChannel") @@ -32,7 +32,7 @@ func (_m *MockBalancer) AssignChannel(ctx context.Context, collectionID int64, c var r0 []ChannelAssignPlan if rf, ok := ret.Get(0).(func(context.Context, int64, []*meta.DmChannel, []int64, bool) []ChannelAssignPlan); ok { - r0 = rf(ctx, collectionID, channels, nodes, manualBalance) + r0 = rf(ctx, collectionID, channels, nodes, forceAssign) } else { if ret.Get(0) != nil { r0 = ret.Get(0).([]ChannelAssignPlan) @@ -52,12 +52,12 @@ type MockBalancer_AssignChannel_Call struct { // - collectionID int64 // - channels []*meta.DmChannel // - nodes []int64 -// - manualBalance bool -func (_e *MockBalancer_Expecter) AssignChannel(ctx interface{}, collectionID interface{}, channels interface{}, nodes interface{}, manualBalance interface{}) *MockBalancer_AssignChannel_Call { - return &MockBalancer_AssignChannel_Call{Call: _e.mock.On("AssignChannel", ctx, collectionID, channels, nodes, manualBalance)} +// - forceAssign bool +func (_e *MockBalancer_Expecter) AssignChannel(ctx interface{}, collectionID interface{}, channels interface{}, nodes interface{}, forceAssign interface{}) *MockBalancer_AssignChannel_Call { + return &MockBalancer_AssignChannel_Call{Call: _e.mock.On("AssignChannel", ctx, collectionID, channels, nodes, forceAssign)} } -func (_c *MockBalancer_AssignChannel_Call) Run(run func(ctx context.Context, collectionID int64, channels []*meta.DmChannel, nodes []int64, manualBalance bool)) *MockBalancer_AssignChannel_Call { +func (_c *MockBalancer_AssignChannel_Call) Run(run func(ctx context.Context, collectionID int64, channels []*meta.DmChannel, nodes []int64, forceAssign bool)) *MockBalancer_AssignChannel_Call { _c.Call.Run(func(args mock.Arguments) { run(args[0].(context.Context), args[1].(int64), args[2].([]*meta.DmChannel), args[3].([]int64), args[4].(bool)) }) @@ -74,9 +74,9 @@ func (_c *MockBalancer_AssignChannel_Call) RunAndReturn(run func(context.Context return _c } -// AssignSegment provides a mock function with given fields: ctx, collectionID, segments, nodes, manualBalance -func (_m *MockBalancer) AssignSegment(ctx context.Context, collectionID int64, segments []*meta.Segment, nodes []int64, manualBalance bool) []SegmentAssignPlan { - ret := _m.Called(ctx, collectionID, segments, nodes, manualBalance) +// AssignSegment provides a mock function with given fields: ctx, collectionID, segments, nodes, forceAssign +func (_m *MockBalancer) AssignSegment(ctx context.Context, collectionID int64, segments []*meta.Segment, nodes []int64, forceAssign bool) []SegmentAssignPlan { + ret := _m.Called(ctx, collectionID, segments, nodes, forceAssign) if len(ret) == 0 { panic("no return value specified for AssignSegment") @@ -84,7 +84,7 @@ func (_m *MockBalancer) AssignSegment(ctx context.Context, collectionID int64, s var r0 []SegmentAssignPlan if rf, ok := ret.Get(0).(func(context.Context, int64, []*meta.Segment, []int64, bool) []SegmentAssignPlan); ok { - r0 = rf(ctx, collectionID, segments, nodes, manualBalance) + r0 = rf(ctx, collectionID, segments, nodes, forceAssign) } else { if ret.Get(0) != nil { r0 = ret.Get(0).([]SegmentAssignPlan) @@ -104,12 +104,12 @@ type MockBalancer_AssignSegment_Call struct { // - collectionID int64 // - segments []*meta.Segment // - nodes []int64 -// - manualBalance bool -func (_e *MockBalancer_Expecter) AssignSegment(ctx interface{}, collectionID interface{}, segments interface{}, nodes interface{}, manualBalance interface{}) *MockBalancer_AssignSegment_Call { - return &MockBalancer_AssignSegment_Call{Call: _e.mock.On("AssignSegment", ctx, collectionID, segments, nodes, manualBalance)} +// - forceAssign bool +func (_e *MockBalancer_Expecter) AssignSegment(ctx interface{}, collectionID interface{}, segments interface{}, nodes interface{}, forceAssign interface{}) *MockBalancer_AssignSegment_Call { + return &MockBalancer_AssignSegment_Call{Call: _e.mock.On("AssignSegment", ctx, collectionID, segments, nodes, forceAssign)} } -func (_c *MockBalancer_AssignSegment_Call) Run(run func(ctx context.Context, collectionID int64, segments []*meta.Segment, nodes []int64, manualBalance bool)) *MockBalancer_AssignSegment_Call { +func (_c *MockBalancer_AssignSegment_Call) Run(run func(ctx context.Context, collectionID int64, segments []*meta.Segment, nodes []int64, forceAssign bool)) *MockBalancer_AssignSegment_Call { _c.Call.Run(func(args mock.Arguments) { run(args[0].(context.Context), args[1].(int64), args[2].([]*meta.Segment), args[3].([]int64), args[4].(bool)) }) diff --git a/internal/querycoordv2/checkers/balance_checker.go b/internal/querycoordv2/checkers/balance_checker.go index fb7fa0447f40c..23f7aadb45f46 100644 --- a/internal/querycoordv2/checkers/balance_checker.go +++ b/internal/querycoordv2/checkers/balance_checker.go @@ -119,6 +119,17 @@ func (b *BalanceChecker) replicasToBalance(ctx context.Context) []int64 { return nil } + // Before performing balancing, check the CurrentTarget/LeaderView/Distribution for all collections. + // If any collection has unready info, skip the balance operation to avoid inconsistencies. + notReadyCollections := lo.Filter(loadedCollections, func(cid int64, _ int) bool { + // todo: should also check distribution and leader view in the future + return !b.targetMgr.IsCurrentTargetReady(ctx, cid) + }) + if len(notReadyCollections) > 0 { + log.RatedInfo(10, "skip normal balance, cause collection not ready for balance", zap.Int64s("collectionIDs", notReadyCollections)) + return nil + } + // iterator one normal collection in one round normalReplicasToBalance := make([]int64, 0) hasUnbalancedCollection := false diff --git a/internal/querycoordv2/checkers/balance_checker_test.go b/internal/querycoordv2/checkers/balance_checker_test.go index 2bc24627f8721..45e6eb3b9e8af 100644 --- a/internal/querycoordv2/checkers/balance_checker_test.go +++ b/internal/querycoordv2/checkers/balance_checker_test.go @@ -324,20 +324,8 @@ func (suite *BalanceCheckerTestSuite) TestTargetNotReady() { suite.checker.meta.ResourceManager.HandleNodeUp(ctx, nodeID1) suite.checker.meta.ResourceManager.HandleNodeUp(ctx, nodeID2) - segments := []*datapb.SegmentInfo{ - { - ID: 1, - PartitionID: 1, - InsertChannel: "test-insert-channel", - }, - } - channels := []*datapb.VchannelInfo{ - { - CollectionID: 1, - ChannelName: "test-insert-channel", - }, - } - suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, mock.Anything).Return(channels, segments, nil) + mockTarget := meta.NewMockTargetManager(suite.T()) + suite.checker.targetMgr = mockTarget // set collections meta cid1, replicaID1, partitionID1 := 1, 1, 1 @@ -347,8 +335,6 @@ func (suite *BalanceCheckerTestSuite) TestTargetNotReady() { partition1 := utils.CreateTestPartition(int64(cid1), int64(partitionID1)) suite.checker.meta.CollectionManager.PutCollection(ctx, collection1, partition1) suite.checker.meta.ReplicaManager.Put(ctx, replica1) - suite.targetMgr.UpdateCollectionNextTarget(ctx, int64(cid1)) - suite.targetMgr.UpdateCollectionCurrentTarget(ctx, int64(cid1)) cid2, replicaID2, partitionID2 := 2, 2, 2 collection2 := utils.CreateTestCollection(int64(cid2), int32(replicaID2)) @@ -358,6 +344,17 @@ func (suite *BalanceCheckerTestSuite) TestTargetNotReady() { suite.checker.meta.CollectionManager.PutCollection(ctx, collection2, partition2) suite.checker.meta.ReplicaManager.Put(ctx, replica2) + // test normal balance when one collection has unready target + mockTarget.EXPECT().IsNextTargetExist(mock.Anything, mock.Anything).Return(true) + mockTarget.EXPECT().IsCurrentTargetReady(mock.Anything, mock.Anything).Return(false) + replicasToBalance := suite.checker.replicasToBalance(ctx) + suite.Len(replicasToBalance, 0) + + // test stopping balance with target not ready + mockTarget.ExpectedCalls = nil + mockTarget.EXPECT().IsNextTargetExist(mock.Anything, mock.Anything).Return(false) + mockTarget.EXPECT().IsCurrentTargetExist(mock.Anything, int64(cid1), mock.Anything).Return(true) + mockTarget.EXPECT().IsCurrentTargetExist(mock.Anything, int64(cid2), mock.Anything).Return(false) mr1 := replica1.CopyForWrite() mr1.AddRONode(1) suite.checker.meta.ReplicaManager.Put(ctx, mr1.IntoReplica()) @@ -366,9 +363,8 @@ func (suite *BalanceCheckerTestSuite) TestTargetNotReady() { mr2.AddRONode(1) suite.checker.meta.ReplicaManager.Put(ctx, mr2.IntoReplica()) - // test stopping balance idsToBalance := []int64{int64(replicaID1)} - replicasToBalance := suite.checker.replicasToBalance(ctx) + replicasToBalance = suite.checker.replicasToBalance(ctx) suite.ElementsMatch(idsToBalance, replicasToBalance) } diff --git a/internal/querycoordv2/meta/mock_target_manager.go b/internal/querycoordv2/meta/mock_target_manager.go index cbf588960e166..6c69dff9f24fb 100644 --- a/internal/querycoordv2/meta/mock_target_manager.go +++ b/internal/querycoordv2/meta/mock_target_manager.go @@ -733,6 +733,53 @@ func (_c *MockTargetManager_IsCurrentTargetExist_Call) RunAndReturn(run func(con return _c } +// IsCurrentTargetReady provides a mock function with given fields: ctx, collectionID +func (_m *MockTargetManager) IsCurrentTargetReady(ctx context.Context, collectionID int64) bool { + ret := _m.Called(ctx, collectionID) + + if len(ret) == 0 { + panic("no return value specified for IsCurrentTargetReady") + } + + var r0 bool + if rf, ok := ret.Get(0).(func(context.Context, int64) bool); ok { + r0 = rf(ctx, collectionID) + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// MockTargetManager_IsCurrentTargetReady_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'IsCurrentTargetReady' +type MockTargetManager_IsCurrentTargetReady_Call struct { + *mock.Call +} + +// IsCurrentTargetReady is a helper method to define mock.On call +// - ctx context.Context +// - collectionID int64 +func (_e *MockTargetManager_Expecter) IsCurrentTargetReady(ctx interface{}, collectionID interface{}) *MockTargetManager_IsCurrentTargetReady_Call { + return &MockTargetManager_IsCurrentTargetReady_Call{Call: _e.mock.On("IsCurrentTargetReady", ctx, collectionID)} +} + +func (_c *MockTargetManager_IsCurrentTargetReady_Call) Run(run func(ctx context.Context, collectionID int64)) *MockTargetManager_IsCurrentTargetReady_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(int64)) + }) + return _c +} + +func (_c *MockTargetManager_IsCurrentTargetReady_Call) Return(_a0 bool) *MockTargetManager_IsCurrentTargetReady_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockTargetManager_IsCurrentTargetReady_Call) RunAndReturn(run func(context.Context, int64) bool) *MockTargetManager_IsCurrentTargetReady_Call { + _c.Call.Return(run) + return _c +} + // IsNextTargetExist provides a mock function with given fields: ctx, collectionID func (_m *MockTargetManager) IsNextTargetExist(ctx context.Context, collectionID int64) bool { ret := _m.Called(ctx, collectionID) diff --git a/internal/querycoordv2/meta/target.go b/internal/querycoordv2/meta/target.go index 9526d697e3ef1..4795eade4cfae 100644 --- a/internal/querycoordv2/meta/target.go +++ b/internal/querycoordv2/meta/target.go @@ -20,10 +20,12 @@ import ( "time" "github.com/samber/lo" + "go.uber.org/zap" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/util/metrics" + "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -34,6 +36,9 @@ type CollectionTarget struct { dmChannels map[string]*DmChannel partitions typeutil.Set[int64] // stores target partitions info version int64 + + // record target status, if target has been save before milvus v2.4.19, then the target will lack of segment info. + lackSegmentInfo bool } func NewCollectionTarget(segments map[int64]*datapb.SegmentInfo, dmChannels map[string]*DmChannel, partitionIDs []int64) *CollectionTarget { @@ -50,9 +55,13 @@ func FromPbCollectionTarget(target *querypb.CollectionTarget) *CollectionTarget dmChannels := make(map[string]*DmChannel) var partitions []int64 + lackSegmentInfo := false for _, t := range target.GetChannelTargets() { for _, partition := range t.GetPartitionTargets() { for _, segment := range partition.GetSegments() { + if segment.GetNumOfRows() <= 0 { + lackSegmentInfo = true + } segments[segment.GetID()] = &datapb.SegmentInfo{ ID: segment.GetID(), Level: segment.GetLevel(), @@ -76,11 +85,16 @@ func FromPbCollectionTarget(target *querypb.CollectionTarget) *CollectionTarget } } + if lackSegmentInfo { + log.Info("target has lack of segment info", zap.Int64("collectionID", target.GetCollectionID())) + } + return &CollectionTarget{ - segments: segments, - dmChannels: dmChannels, - partitions: typeutil.NewSet(partitions...), - version: target.GetVersion(), + segments: segments, + dmChannels: dmChannels, + partitions: typeutil.NewSet(partitions...), + version: target.GetVersion(), + lackSegmentInfo: lackSegmentInfo, } } @@ -161,6 +175,11 @@ func (p *CollectionTarget) IsEmpty() bool { return len(p.dmChannels)+len(p.segments) == 0 } +// if target is ready, it should have all segment info +func (p *CollectionTarget) Ready() bool { + return !p.lackSegmentInfo +} + type target struct { // just maintain target at collection level collectionTargetMap map[int64]*CollectionTarget diff --git a/internal/querycoordv2/meta/target_manager.go b/internal/querycoordv2/meta/target_manager.go index 5b7c0f6567281..10fe0b787b55d 100644 --- a/internal/querycoordv2/meta/target_manager.go +++ b/internal/querycoordv2/meta/target_manager.go @@ -72,6 +72,7 @@ type TargetManagerInterface interface { CanSegmentBeMoved(ctx context.Context, collectionID, segmentID int64) bool GetTargetJSON(ctx context.Context, scope TargetScope) string GetPartitions(ctx context.Context, collectionID int64, scope TargetScope) ([]int64, error) + IsCurrentTargetReady(ctx context.Context, collectionID int64) bool } type TargetManager struct { @@ -673,3 +674,14 @@ func (mgr *TargetManager) getTarget(scope TargetScope) *target { return mgr.next } + +func (mgr *TargetManager) IsCurrentTargetReady(ctx context.Context, collectionID int64) bool { + mgr.rwMutex.RLock() + defer mgr.rwMutex.RUnlock() + target, ok := mgr.current.collectionTargetMap[collectionID] + if !ok { + return false + } + + return target.Ready() +} diff --git a/internal/querycoordv2/meta/target_manager_test.go b/internal/querycoordv2/meta/target_manager_test.go index d8bd7b05c362e..34bf64136a2e2 100644 --- a/internal/querycoordv2/meta/target_manager_test.go +++ b/internal/querycoordv2/meta/target_manager_test.go @@ -614,6 +614,7 @@ func (suite *TargetManagerSuite) TestRecover() { for _, segment := range target.GetAllSegments() { suite.Equal(int64(100), segment.GetNumOfRows()) } + suite.True(target.Ready()) // after recover, target info should be cleaned up targets, err := suite.catalog.GetCollectionTargets(ctx)