From fcec4c21b99c83c45d03b2d4b7bd8e1de3a02649 Mon Sep 17 00:00:00 2001
From: jaime
Date: Fri, 2 Aug 2024 17:20:15 +0800
Subject: [PATCH] fix: check collection health(queryable) fail for releasing collection (#34947)

issue: #34946

Signed-off-by: jaime
---
 configs/milvus.yaml                           |  1 +
 .../balance/channel_level_score_balancer.go   |  2 +-
 .../balance/multi_target_balance.go           |  4 +-
 .../balance/rowcount_based_balancer.go        |  4 +-
 .../balance/score_based_balancer.go           |  2 +-
 internal/querycoordv2/dist/dist_controller.go |  4 +-
 internal/querycoordv2/job/job_load.go         |  8 +-
 internal/querycoordv2/job/job_release.go      |  8 +-
 internal/querycoordv2/job/undo.go             |  4 +-
 .../querycoordv2/meta/mock_target_manager.go  | 43 +++++++++++
 internal/querycoordv2/meta/target_manager.go  |  1 +
 .../observers/collection_observer.go          |  4 +-
 .../querycoordv2/observers/target_observer.go |  4 +-
 internal/querycoordv2/server.go               |  2 +-
 internal/querycoordv2/services_test.go        | 73 ++++++++++++-------
 internal/querycoordv2/task/executor.go        |  4 +-
 internal/querycoordv2/task/scheduler.go       |  4 +-
 internal/querycoordv2/utils/util.go           | 55 +++++++++-----
 pkg/util/paramtable/component_param.go        | 18 ++++-
 pkg/util/paramtable/component_param_test.go   |  3 +
 20 files changed, 173 insertions(+), 75 deletions(-)

diff --git a/configs/milvus.yaml b/configs/milvus.yaml
index af51ae932234a..1fa1ddd4e8b60 100644
--- a/configs/milvus.yaml
+++ b/configs/milvus.yaml
@@ -367,6 +367,7 @@ queryCoord:
   channelExclusiveNodeFactor: 4 # the least node number for enable channel's exclusive mode
   collectionObserverInterval: 200 # the interval of collection observer
   checkExecutedFlagInterval: 100 # the interval of check executed flag to force to pull dist
+  updateCollectionLoadStatusInterval: 5 # 5m, max interval of updating collection loaded status for the health check
   cleanExcludeSegmentInterval: 60 # the time duration of clean pipeline exclude segment which used for filter invalid data, in seconds
   ip: # TCP/IP address of queryCoord. If not specified, use the first unicastable address
   port: 19531 # TCP port of queryCoord
diff --git a/internal/querycoordv2/balance/channel_level_score_balancer.go b/internal/querycoordv2/balance/channel_level_score_balancer.go
index 29c17b894e7bf..bfbbb659bac77 100644
--- a/internal/querycoordv2/balance/channel_level_score_balancer.go
+++ b/internal/querycoordv2/balance/channel_level_score_balancer.go
@@ -41,7 +41,7 @@ func NewChannelLevelScoreBalancer(scheduler task.Scheduler,
 	nodeManager *session.NodeManager,
 	dist *meta.DistributionManager,
 	meta *meta.Meta,
-	targetMgr *meta.TargetManager,
+	targetMgr meta.TargetManagerInterface,
 ) *ChannelLevelScoreBalancer {
 	return &ChannelLevelScoreBalancer{
 		ScoreBasedBalancer: NewScoreBasedBalancer(scheduler, nodeManager, dist, meta, targetMgr),
diff --git a/internal/querycoordv2/balance/multi_target_balance.go b/internal/querycoordv2/balance/multi_target_balance.go
index 3670244cd5486..e311fd70c1cf2 100644
--- a/internal/querycoordv2/balance/multi_target_balance.go
+++ b/internal/querycoordv2/balance/multi_target_balance.go
@@ -452,7 +452,7 @@ func (g *randomPlanGenerator) generatePlans() []SegmentAssignPlan {
 type MultiTargetBalancer struct {
 	*ScoreBasedBalancer
 	dist      *meta.DistributionManager
-	targetMgr *meta.TargetManager
+	targetMgr meta.TargetManagerInterface
 }
 
 func (b *MultiTargetBalancer) BalanceReplica(replica *meta.Replica) ([]SegmentAssignPlan, []ChannelAssignPlan) {
@@ -548,7 +548,7 @@ func (b *MultiTargetBalancer) genPlanByDistributions(nodeSegments, globalNodeSeg
 	return plans
 }
 
-func NewMultiTargetBalancer(scheduler task.Scheduler, nodeManager *session.NodeManager, dist *meta.DistributionManager, meta *meta.Meta, targetMgr *meta.TargetManager) *MultiTargetBalancer {
+func NewMultiTargetBalancer(scheduler task.Scheduler, nodeManager *session.NodeManager, dist *meta.DistributionManager, meta *meta.Meta, targetMgr meta.TargetManagerInterface) *MultiTargetBalancer {
 	return &MultiTargetBalancer{
 		ScoreBasedBalancer: NewScoreBasedBalancer(scheduler, nodeManager, dist, meta, targetMgr),
 		dist:               dist,
diff --git a/internal/querycoordv2/balance/rowcount_based_balancer.go b/internal/querycoordv2/balance/rowcount_based_balancer.go
index f6c75f2ada929..d4fba76a65f0a 100644
--- a/internal/querycoordv2/balance/rowcount_based_balancer.go
+++ b/internal/querycoordv2/balance/rowcount_based_balancer.go
@@ -36,7 +36,7 @@ type RowCountBasedBalancer struct {
 	*RoundRobinBalancer
 	dist      *meta.DistributionManager
 	meta      *meta.Meta
-	targetMgr *meta.TargetManager
+	targetMgr meta.TargetManagerInterface
 }
 
 // AssignSegment, when row count based balancer assign segments, it will assign segment to node with least global row count.
@@ -354,7 +354,7 @@ func NewRowCountBasedBalancer(
 	nodeManager *session.NodeManager,
 	dist *meta.DistributionManager,
 	meta *meta.Meta,
-	targetMgr *meta.TargetManager,
+	targetMgr meta.TargetManagerInterface,
 ) *RowCountBasedBalancer {
 	return &RowCountBasedBalancer{
 		RoundRobinBalancer: NewRoundRobinBalancer(scheduler, nodeManager),
diff --git a/internal/querycoordv2/balance/score_based_balancer.go b/internal/querycoordv2/balance/score_based_balancer.go
index d97ca5dfc9296..270ed7f4ca239 100644
--- a/internal/querycoordv2/balance/score_based_balancer.go
+++ b/internal/querycoordv2/balance/score_based_balancer.go
@@ -41,7 +41,7 @@ func NewScoreBasedBalancer(scheduler task.Scheduler,
 	nodeManager *session.NodeManager,
 	dist *meta.DistributionManager,
 	meta *meta.Meta,
-	targetMgr *meta.TargetManager,
+	targetMgr meta.TargetManagerInterface,
 ) *ScoreBasedBalancer {
 	return &ScoreBasedBalancer{
 		RowCountBasedBalancer: NewRowCountBasedBalancer(scheduler, nodeManager, dist, meta, targetMgr),
diff --git a/internal/querycoordv2/dist/dist_controller.go b/internal/querycoordv2/dist/dist_controller.go
index 521b6cfd95edf..3c726091d3603 100644
--- a/internal/querycoordv2/dist/dist_controller.go
+++ b/internal/querycoordv2/dist/dist_controller.go
@@ -41,7 +41,7 @@ type ControllerImpl struct {
 	client      session.Cluster
 	nodeManager *session.NodeManager
 	dist        *meta.DistributionManager
-	targetMgr   *meta.TargetManager
+	targetMgr   meta.TargetManagerInterface
 	scheduler   task.Scheduler
 }
 
@@ -98,7 +98,7 @@ func NewDistController(
 	client session.Cluster,
 	nodeManager *session.NodeManager,
 	dist *meta.DistributionManager,
-	targetMgr *meta.TargetManager,
+	targetMgr meta.TargetManagerInterface,
 	scheduler task.Scheduler,
 ) *ControllerImpl {
 	return &ControllerImpl{
diff --git a/internal/querycoordv2/job/job_load.go b/internal/querycoordv2/job/job_load.go
index 03b4de5e332d1..bdefa088ca796 100644
--- a/internal/querycoordv2/job/job_load.go
+++ b/internal/querycoordv2/job/job_load.go
@@ -48,7 +48,7 @@ type LoadCollectionJob struct {
 	meta               *meta.Meta
 	broker             meta.Broker
 	cluster            session.Cluster
-	targetMgr          *meta.TargetManager
+	targetMgr          meta.TargetManagerInterface
 	targetObserver     *observers.TargetObserver
 	collectionObserver *observers.CollectionObserver
 	nodeMgr            *session.NodeManager
@@ -61,7 +61,7 @@ func NewLoadCollectionJob(
 	meta *meta.Meta,
 	broker meta.Broker,
 	cluster session.Cluster,
-	targetMgr *meta.TargetManager,
+	targetMgr meta.TargetManagerInterface,
 	targetObserver *observers.TargetObserver,
 	collectionObserver *observers.CollectionObserver,
 	nodeMgr *session.NodeManager,
@@ -239,7 +239,7 @@ type LoadPartitionJob struct {
 	meta               *meta.Meta
 	broker             meta.Broker
 	cluster            session.Cluster
-	targetMgr          *meta.TargetManager
+	targetMgr          meta.TargetManagerInterface
 	targetObserver     *observers.TargetObserver
 	collectionObserver *observers.CollectionObserver
 	nodeMgr            *session.NodeManager
@@ -252,7 +252,7 @@ func NewLoadPartitionJob(
 	meta *meta.Meta,
 	broker meta.Broker,
 	cluster session.Cluster,
-	targetMgr *meta.TargetManager,
+	targetMgr meta.TargetManagerInterface,
 	targetObserver *observers.TargetObserver,
 	collectionObserver *observers.CollectionObserver,
 	nodeMgr *session.NodeManager,
diff --git a/internal/querycoordv2/job/job_release.go b/internal/querycoordv2/job/job_release.go
index 57ad526d94071..49841510312f1 100644
--- a/internal/querycoordv2/job/job_release.go
+++ b/internal/querycoordv2/job/job_release.go
@@ -39,7 +39,7 @@ type ReleaseCollectionJob struct {
 	meta              *meta.Meta
 	broker            meta.Broker
 	cluster           session.Cluster
-	targetMgr         *meta.TargetManager
+	targetMgr         meta.TargetManagerInterface
 	targetObserver    *observers.TargetObserver
 	checkerController *checkers.CheckerController
 }
@@ -50,7 +50,7 @@ func NewReleaseCollectionJob(ctx context.Context,
 	meta *meta.Meta,
 	broker meta.Broker,
 	cluster session.Cluster,
-	targetMgr *meta.TargetManager,
+	targetMgr meta.TargetManagerInterface,
 	targetObserver *observers.TargetObserver,
 	checkerController *checkers.CheckerController,
 ) *ReleaseCollectionJob {
@@ -114,7 +114,7 @@ type ReleasePartitionJob struct {
 	meta              *meta.Meta
 	broker            meta.Broker
 	cluster           session.Cluster
-	targetMgr         *meta.TargetManager
+	targetMgr         meta.TargetManagerInterface
 	targetObserver    *observers.TargetObserver
 	checkerController *checkers.CheckerController
 }
@@ -125,7 +125,7 @@ func NewReleasePartitionJob(ctx context.Context,
 	meta *meta.Meta,
 	broker meta.Broker,
 	cluster session.Cluster,
-	targetMgr *meta.TargetManager,
+	targetMgr meta.TargetManagerInterface,
 	targetObserver *observers.TargetObserver,
 	checkerController *checkers.CheckerController,
 ) *ReleasePartitionJob {
diff --git a/internal/querycoordv2/job/undo.go b/internal/querycoordv2/job/undo.go
index 64b89bb78c2d2..6ac0b62296b3b 100644
--- a/internal/querycoordv2/job/undo.go
+++ b/internal/querycoordv2/job/undo.go
@@ -38,12 +38,12 @@ type UndoList struct {
 	ctx            context.Context
 	meta           *meta.Meta
 	cluster        session.Cluster
-	targetMgr      *meta.TargetManager
+	targetMgr      meta.TargetManagerInterface
 	targetObserver *observers.TargetObserver
 }
 
 func NewUndoList(ctx context.Context, meta *meta.Meta,
-	cluster session.Cluster, targetMgr *meta.TargetManager, targetObserver *observers.TargetObserver,
+	cluster session.Cluster, targetMgr meta.TargetManagerInterface, targetObserver *observers.TargetObserver,
 ) *UndoList {
 	return &UndoList{
 		ctx:     ctx,
diff --git a/internal/querycoordv2/meta/mock_target_manager.go b/internal/querycoordv2/meta/mock_target_manager.go
index 3637cc420483a..508e7cf3eb6b3 100644
--- a/internal/querycoordv2/meta/mock_target_manager.go
+++ b/internal/querycoordv2/meta/mock_target_manager.go
@@ -24,6 +24,49 @@ func (_m *MockTargetManager) EXPECT() *MockTargetManager_Expecter {
 	return &MockTargetManager_Expecter{mock: &_m.Mock}
 }
 
+// CanSegmentBeMoved provides a mock function with given fields: collectionID, segmentID
+func (_m *MockTargetManager) CanSegmentBeMoved(collectionID int64, segmentID int64) bool {
+	ret := _m.Called(collectionID, segmentID)
+
+	var r0 bool
+	if rf, ok := ret.Get(0).(func(int64, int64) bool); ok {
+		r0 = rf(collectionID, segmentID)
+	} else {
+		r0 = ret.Get(0).(bool)
+	}
+
+	return r0
+}
+
+// MockTargetManager_CanSegmentBeMoved_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CanSegmentBeMoved'
+type MockTargetManager_CanSegmentBeMoved_Call struct {
+	*mock.Call
+}
+
+// CanSegmentBeMoved is a helper method to define mock.On call
+//   - collectionID int64
+//   - segmentID int64
+func (_e *MockTargetManager_Expecter) CanSegmentBeMoved(collectionID interface{}, segmentID interface{}) *MockTargetManager_CanSegmentBeMoved_Call {
+	return &MockTargetManager_CanSegmentBeMoved_Call{Call: _e.mock.On("CanSegmentBeMoved", collectionID, segmentID)}
+}
+
+func (_c *MockTargetManager_CanSegmentBeMoved_Call) Run(run func(collectionID int64, segmentID int64)) *MockTargetManager_CanSegmentBeMoved_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run(args[0].(int64), args[1].(int64))
+	})
+	return _c
+}
+
+func (_c *MockTargetManager_CanSegmentBeMoved_Call) Return(_a0 bool) *MockTargetManager_CanSegmentBeMoved_Call {
+	_c.Call.Return(_a0)
+	return _c
+}
+
+func (_c *MockTargetManager_CanSegmentBeMoved_Call) RunAndReturn(run func(int64, int64) bool) *MockTargetManager_CanSegmentBeMoved_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
 // GetCollectionTargetVersion provides a mock function with given fields: collectionID, scope
 func (_m *MockTargetManager) GetCollectionTargetVersion(collectionID int64, scope int32) int64 {
 	ret := _m.Called(collectionID, scope)
diff --git a/internal/querycoordv2/meta/target_manager.go b/internal/querycoordv2/meta/target_manager.go
index 42c8022cb3c91..6d1eb062e4174 100644
--- a/internal/querycoordv2/meta/target_manager.go
+++ b/internal/querycoordv2/meta/target_manager.go
@@ -71,6 +71,7 @@ type TargetManagerInterface interface {
 	IsNextTargetExist(collectionID int64) bool
 	SaveCurrentTarget(catalog metastore.QueryCoordCatalog)
 	Recover(catalog metastore.QueryCoordCatalog) error
+	CanSegmentBeMoved(collectionID, segmentID int64) bool
 }
 
 type TargetManager struct {
diff --git a/internal/querycoordv2/observers/collection_observer.go b/internal/querycoordv2/observers/collection_observer.go
index 9080cee8f0f2a..6a935a6d521f4 100644
--- a/internal/querycoordv2/observers/collection_observer.go
+++ b/internal/querycoordv2/observers/collection_observer.go
@@ -43,7 +43,7 @@ type CollectionObserver struct {
 
 	dist                 *meta.DistributionManager
 	meta                 *meta.Meta
-	targetMgr            *meta.TargetManager
+	targetMgr            meta.TargetManagerInterface
 	targetObserver       *TargetObserver
 	checkerController    *checkers.CheckerController
 	partitionLoadedCount map[int64]int
@@ -62,7 +62,7 @@ type LoadTask struct {
 func NewCollectionObserver(
 	dist *meta.DistributionManager,
 	meta *meta.Meta,
-	targetMgr *meta.TargetManager,
+	targetMgr meta.TargetManagerInterface,
 	targetObserver *TargetObserver,
 	checherController *checkers.CheckerController,
 ) *CollectionObserver {
diff --git a/internal/querycoordv2/observers/target_observer.go b/internal/querycoordv2/observers/target_observer.go
index 7d3087b83dafa..8c5bf46596a61 100644
--- a/internal/querycoordv2/observers/target_observer.go
+++ b/internal/querycoordv2/observers/target_observer.go
@@ -55,7 +55,7 @@ type TargetObserver struct {
 	cancel    context.CancelFunc
 	wg        sync.WaitGroup
 	meta      *meta.Meta
-	targetMgr *meta.TargetManager
+	targetMgr meta.TargetManagerInterface
 	distMgr   *meta.DistributionManager
 	broker    meta.Broker
 	cluster   session.Cluster
@@ -76,7 +76,7 @@ type TargetObserver struct {
 
 func NewTargetObserver(
 	meta *meta.Meta,
-	targetMgr *meta.TargetManager,
+	targetMgr meta.TargetManagerInterface,
 	distMgr *meta.DistributionManager,
 	broker meta.Broker,
 	cluster session.Cluster,
diff --git a/internal/querycoordv2/server.go b/internal/querycoordv2/server.go
index 7e39f54a4bb0e..d2c997e339c88 100644
--- a/internal/querycoordv2/server.go
+++ b/internal/querycoordv2/server.go
@@ -90,7 +90,7 @@ type Server struct {
 	store     metastore.QueryCoordCatalog
 	meta      *meta.Meta
 	dist      *meta.DistributionManager
-	targetMgr *meta.TargetManager
+	targetMgr meta.TargetManagerInterface
 	broker    meta.Broker
 
 	// Session
diff --git a/internal/querycoordv2/services_test.go b/internal/querycoordv2/services_test.go
index 782b672ab5173..75348e7c832f4 100644
--- a/internal/querycoordv2/services_test.go
+++ b/internal/querycoordv2/services_test.go
@@ -1610,44 +1610,65 @@ func (suite *ServiceSuite) TestGetReplicasWhenNoAvailableNodes() {
 }
 
 func (suite *ServiceSuite) TestCheckHealth() {
+	suite.loadAll()
 	ctx := context.Background()
 	server := suite.server
 
-	// Test for server is not healthy
-	server.UpdateStateCode(commonpb.StateCode_Initializing)
-	resp, err := server.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
-	suite.NoError(err)
-	suite.Equal(resp.IsHealthy, false)
-	suite.NotEmpty(resp.Reasons)
+	assertCheckHealthResult := func(isHealthy bool) {
+		resp, err := server.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
+		suite.NoError(err)
+		suite.Equal(resp.IsHealthy, isHealthy)
+		if !isHealthy {
+			suite.NotEmpty(resp.Reasons)
+		} else {
+			suite.Empty(resp.Reasons)
+		}
+	}
 
-	// Test for components state fail
-	for _, node := range suite.nodes {
-		suite.cluster.EXPECT().GetComponentStates(mock.Anything, node).Return(
+	setNodeState := func(state commonpb.StateCode) {
+		// Test for components state fail
+		suite.cluster.EXPECT().GetComponentStates(mock.Anything, mock.Anything).Unset()
+		suite.cluster.EXPECT().GetComponentStates(mock.Anything, mock.Anything).Return(
 			&milvuspb.ComponentStates{
-				State:  &milvuspb.ComponentInfo{StateCode: commonpb.StateCode_Abnormal},
+				State:  &milvuspb.ComponentInfo{StateCode: state},
 				Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
 			},
-			nil).Once()
+			nil).Maybe()
 	}
+
+	// Test for server is not healthy
+	server.UpdateStateCode(commonpb.StateCode_Initializing)
+	assertCheckHealthResult(false)
+
+	// Test for components state fail
+	setNodeState(commonpb.StateCode_Abnormal)
 	server.UpdateStateCode(commonpb.StateCode_Healthy)
-	resp, err = server.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
-	suite.NoError(err)
-	suite.Equal(resp.IsHealthy, false)
-	suite.NotEmpty(resp.Reasons)
+	assertCheckHealthResult(false)
 
-	// Test for server is healthy
+	// Test for check load percentage fail
+	setNodeState(commonpb.StateCode_Healthy)
+	assertCheckHealthResult(true)
+
+	// Test for check channel ok
+	for _, collection := range suite.collections {
+		suite.updateCollectionStatus(collection, querypb.LoadStatus_Loaded)
+		suite.updateChannelDist(collection)
+	}
+	assertCheckHealthResult(true)
+
+	// Test for check channel fail
+	tm := meta.NewMockTargetManager(suite.T())
+	tm.EXPECT().GetDmChannelsByCollection(mock.Anything, mock.Anything).Return(nil).Maybe()
+	otm := server.targetMgr
+	server.targetMgr = tm
+	assertCheckHealthResult(true)
+
+	// Test for get shard leader fail
+	server.targetMgr = otm
 	for _, node := range suite.nodes {
-		suite.cluster.EXPECT().GetComponentStates(mock.Anything, node).Return(
-			&milvuspb.ComponentStates{
-				State:  &milvuspb.ComponentInfo{StateCode: commonpb.StateCode_Healthy},
-				Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
-			},
-			nil).Once()
+		suite.nodeMgr.Suspend(node)
 	}
-	resp, err = server.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
-	suite.NoError(err)
-	suite.Equal(resp.IsHealthy, true)
-	suite.Empty(resp.Reasons)
+	assertCheckHealthResult(true)
 }
 
 func (suite *ServiceSuite) TestGetShardLeaders() {
diff --git a/internal/querycoordv2/task/executor.go b/internal/querycoordv2/task/executor.go
index 1ee4df6a9cb5b..a4bac2ff11f4e 100644
--- a/internal/querycoordv2/task/executor.go
+++ b/internal/querycoordv2/task/executor.go
@@ -57,7 +57,7 @@ type Executor struct {
 	meta      *meta.Meta
 	dist      *meta.DistributionManager
 	broker    meta.Broker
-	targetMgr *meta.TargetManager
+	targetMgr meta.TargetManagerInterface
 	cluster   session.Cluster
 	nodeMgr   *session.NodeManager
 
@@ -69,7 +69,7 @@ type Executor struct {
 func NewExecutor(meta *meta.Meta,
 	dist *meta.DistributionManager,
 	broker meta.Broker,
-	targetMgr *meta.TargetManager,
+	targetMgr meta.TargetManagerInterface,
 	cluster session.Cluster,
 	nodeMgr *session.NodeManager,
 ) *Executor {
diff --git a/internal/querycoordv2/task/scheduler.go b/internal/querycoordv2/task/scheduler.go
index d167f864ddf08..641c2c58a4480 100644
--- a/internal/querycoordv2/task/scheduler.go
+++ b/internal/querycoordv2/task/scheduler.go
@@ -157,7 +157,7 @@ type taskScheduler struct {
 
 	distMgr   *meta.DistributionManager
 	meta      *meta.Meta
-	targetMgr *meta.TargetManager
+	targetMgr meta.TargetManagerInterface
 	broker    meta.Broker
 	cluster   session.Cluster
 	nodeMgr   *session.NodeManager
@@ -177,7 +177,7 @@ type taskScheduler struct {
 func NewScheduler(ctx context.Context,
 	meta *meta.Meta,
 	distMgr *meta.DistributionManager,
-	targetMgr *meta.TargetManager,
+	targetMgr meta.TargetManagerInterface,
 	broker meta.Broker,
 	cluster session.Cluster,
 	nodeMgr *session.NodeManager,
diff --git a/internal/querycoordv2/utils/util.go b/internal/querycoordv2/utils/util.go
index 6ebf34232d0f8..4ef2f685d368a 100644
--- a/internal/querycoordv2/utils/util.go
+++ b/internal/querycoordv2/utils/util.go
@@ -19,6 +19,7 @@ package utils
 import (
 	"context"
 	"fmt"
+	"time"
 
 	"go.uber.org/multierr"
 	"go.uber.org/zap"
@@ -29,6 +30,7 @@ import (
 	"github.com/milvus-io/milvus/internal/querycoordv2/session"
 	"github.com/milvus-io/milvus/pkg/log"
 	"github.com/milvus-io/milvus/pkg/util/merr"
+	"github.com/milvus-io/milvus/pkg/util/paramtable"
 )
 
 func CheckNodeAvailable(nodeID int64, info *session.NodeInfo) error {
@@ -104,7 +106,7 @@ func checkLoadStatus(m *meta.Meta, collectionID int64) error {
 	return nil
 }
 
-func GetShardLeadersWithChannels(m *meta.Meta, targetMgr *meta.TargetManager, dist *meta.DistributionManager,
+func GetShardLeadersWithChannels(m *meta.Meta, targetMgr meta.TargetManagerInterface, dist *meta.DistributionManager,
 	nodeMgr *session.NodeManager, collectionID int64, channels map[string]*meta.DmChannel,
 ) ([]*querypb.ShardLeadersList, error) {
 	ret := make([]*querypb.ShardLeadersList, 0)
@@ -163,7 +165,7 @@ func GetShardLeadersWithChannels(m *meta.Meta, targetMgr *meta.TargetManager, di
 	return ret, nil
 }
 
-func GetShardLeaders(m *meta.Meta, targetMgr *meta.TargetManager, dist *meta.DistributionManager, nodeMgr *session.NodeManager, collectionID int64) ([]*querypb.ShardLeadersList, error) {
+func GetShardLeaders(m *meta.Meta, targetMgr meta.TargetManagerInterface, dist *meta.DistributionManager, nodeMgr *session.NodeManager, collectionID int64) ([]*querypb.ShardLeadersList, error) {
 	if err := checkLoadStatus(m, collectionID); err != nil {
 		return nil, err
 	}
@@ -179,30 +181,45 @@ func GetShardLeaders(m *meta.Meta, targetMgr *meta.TargetManager, dist *meta.Dis
 }
 
 // CheckCollectionsQueryable check all channels are watched and all segments are loaded for this collection
-func CheckCollectionsQueryable(m *meta.Meta, targetMgr *meta.TargetManager, dist *meta.DistributionManager, nodeMgr *session.NodeManager) error {
+func CheckCollectionsQueryable(m *meta.Meta, targetMgr meta.TargetManagerInterface, dist *meta.DistributionManager, nodeMgr *session.NodeManager) error {
+	maxInterval := paramtable.Get().QueryCoordCfg.UpdateCollectionLoadStatusInterval.GetAsDuration(time.Minute)
 	for _, coll := range m.GetAllCollections() {
-		collectionID := coll.GetCollectionID()
-		if err := checkLoadStatus(m, collectionID); err != nil {
+		err := checkCollectionQueryable(m, targetMgr, dist, nodeMgr, coll)
+		// The collection is not queryable if it meets the following conditions:
+		// 1. Some segments are not loaded
+		// 2. Collection is not starting to release
+		// 3. The load percentage has not been updated in the last 5 minutes.
+		if err != nil && m.Exist(coll.CollectionID) && time.Since(coll.UpdatedAt) >= maxInterval {
 			return err
 		}
+	}
+	return nil
+}
 
-		channels := targetMgr.GetDmChannelsByCollection(collectionID, meta.CurrentTarget)
-		if len(channels) == 0 {
-			msg := "loaded collection do not found any channel in target, may be in recovery"
-			err := merr.WrapErrCollectionOnRecovering(collectionID, msg)
-			log.Warn("failed to get channels", zap.Error(err))
-			return err
-		}
+// checkCollectionQueryable check all channels are watched and all segments are loaded for this collection
+func checkCollectionQueryable(m *meta.Meta, targetMgr meta.TargetManagerInterface, dist *meta.DistributionManager, nodeMgr *session.NodeManager, coll *meta.Collection) error {
+	collectionID := coll.GetCollectionID()
+	if err := checkLoadStatus(m, collectionID); err != nil {
+		return err
+	}
 
-		shardList, err := GetShardLeadersWithChannels(m, targetMgr, dist, nodeMgr, collectionID, channels)
-		if err != nil {
-			return err
-		}
+	channels := targetMgr.GetDmChannelsByCollection(collectionID, meta.CurrentTarget)
+	if len(channels) == 0 {
+		msg := "loaded collection do not found any channel in target, may be in recovery"
+		err := merr.WrapErrCollectionOnRecovering(collectionID, msg)
+		log.Warn("failed to get channels", zap.Error(err))
+		return err
+	}
 
-		if len(channels) != len(shardList) {
-			return merr.WrapErrCollectionNotFullyLoaded(collectionID, "still have unwatched channels or loaded segments")
-		}
+	shardList, err := GetShardLeadersWithChannels(m, targetMgr, dist, nodeMgr, collectionID, channels)
+	if err != nil {
+		return err
 	}
+
+	if len(channels) != len(shardList) {
+		return merr.WrapErrCollectionNotFullyLoaded(collectionID, "still have unwatched channels or loaded segments")
+	}
+
 	return nil
 }
diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go
index 36e0c6027db51..09cb245575b69 100644
--- a/pkg/util/paramtable/component_param.go
+++ b/pkg/util/paramtable/component_param.go
@@ -1696,9 +1696,10 @@ type queryCoordConfig struct {
 	EnableStoppingBalance      ParamItem `refreshable:"true"`
 	ChannelExclusiveNodeFactor ParamItem `refreshable:"true"`
 
-	CollectionObserverInterval        ParamItem `refreshable:"false"`
-	CheckExecutedFlagInterval         ParamItem `refreshable:"false"`
-	CollectionBalanceSegmentBatchSize ParamItem `refreshable:"true"`
+	CollectionObserverInterval         ParamItem `refreshable:"false"`
+	CheckExecutedFlagInterval          ParamItem `refreshable:"false"`
+	CollectionBalanceSegmentBatchSize  ParamItem `refreshable:"true"`
+	UpdateCollectionLoadStatusInterval ParamItem `refreshable:"false"`
 }
 
 func (p *queryCoordConfig) init(base *BaseTable) {
@@ -2093,6 +2094,17 @@ If this parameter is set false, Milvus simply searches the growing segments with
 	}
 	p.CheckHealthInterval.Init(base.mgr)
 
+	p.UpdateCollectionLoadStatusInterval = ParamItem{
+		Key:          "queryCoord.updateCollectionLoadStatusInterval",
+		Version:      "2.4.7",
+		DefaultValue: "5",
+		PanicIfEmpty: true,
+		Doc:          "5m, max interval of updating collection loaded status for the health check",
+		Export:       true,
+	}
+
+	p.UpdateCollectionLoadStatusInterval.Init(base.mgr)
+
 	p.CheckHealthRPCTimeout = ParamItem{
 		Key:     "queryCoord.checkHealthRPCTimeout",
 		Version: "2.2.7",
diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go
index 33fc7466d0d51..f25f167c1488f 100644
--- a/pkg/util/paramtable/component_param_test.go
+++ b/pkg/util/paramtable/component_param_test.go
@@ -302,6 +302,9 @@ func TestComponentParam(t *testing.T) {
 		checkHealthRPCTimeout := Params.CheckHealthRPCTimeout.GetAsInt()
 		assert.Equal(t, 2000, checkHealthRPCTimeout)
 
+		updateInterval := Params.UpdateCollectionLoadStatusInterval.GetAsDuration(time.Minute)
+		assert.Equal(t, updateInterval, time.Minute*5)
+
 		assert.Equal(t, 0.1, Params.GlobalRowCountFactor.GetAsFloat())
 		params.Save("queryCoord.globalRowCountFactor", "0.4")
 		assert.Equal(t, 0.4, Params.GlobalRowCountFactor.GetAsFloat())
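
Note on the behavior this patch introduces: the essential change is the grace window applied in CheckCollectionsQueryable — an unqueryable collection no longer fails CheckHealth while it is being released, or while its load status was refreshed within queryCoord.updateCollectionLoadStatusInterval. The standalone Go sketch below only illustrates that gating logic under simplified assumptions; collState, checkHealth and their fields are made-up stand-ins for illustration, not the actual Milvus types or APIs.

package main

import (
	"fmt"
	"time"
)

// collState is a simplified stand-in for the query coordinator's collection metadata.
type collState struct {
	ID        int64
	Exists    bool      // false once the collection has started releasing
	UpdatedAt time.Time // last time the load status was refreshed
	Queryable bool      // result of the channel/segment checks
}

// checkHealth mirrors the gating added to CheckCollectionsQueryable: an
// unqueryable collection only fails the health check when it still exists
// and its load status has been stale for longer than maxInterval.
func checkHealth(colls []collState, maxInterval time.Duration) error {
	for _, c := range colls {
		if c.Queryable {
			continue
		}
		if c.Exists && time.Since(c.UpdatedAt) >= maxInterval {
			return fmt.Errorf("collection %d is not fully loaded", c.ID)
		}
		// collections that are releasing, or were updated recently, are tolerated
	}
	return nil
}

func main() {
	colls := []collState{
		{ID: 1, Exists: true, UpdatedAt: time.Now()},                  // recently updated: tolerated
		{ID: 2, Exists: false, UpdatedAt: time.Now().Add(-time.Hour)}, // releasing: tolerated
	}
	fmt.Println(checkHealth(colls, 5*time.Minute)) // <nil>
}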