From 99da46dd0b35f02cf7a33e4c5c6e29582b5621c0 Mon Sep 17 00:00:00 2001 From: "yihao.dai" Date: Thu, 21 Nov 2024 15:11:03 +0800 Subject: [PATCH] fix: [10kcp] Fix load slowly (#37454) (#37878) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When there're a lot of loaded collections, they would occupy the target observer scheduler’s pool. This prevents loading collections from updating the current target in time, slowing down the load process. This PR adds a separate target dispatcher for loading collections. issue: https://github.com/milvus-io/milvus/issues/37166 pr: https://github.com/milvus-io/milvus/pull/37454 Signed-off-by: bigsheeper --- .../querycoordv2/observers/target_observer.go | 30 +++++++++++++------ .../observers/target_observer_test.go | 7 +++-- 2 files changed, 26 insertions(+), 11 deletions(-) diff --git a/internal/querycoordv2/observers/target_observer.go b/internal/querycoordv2/observers/target_observer.go index 87caa9e91c036..3bb1c31924a40 100644 --- a/internal/querycoordv2/observers/target_observer.go +++ b/internal/querycoordv2/observers/target_observer.go @@ -88,8 +88,12 @@ type TargetObserver struct { mut sync.Mutex // Guard readyNotifiers readyNotifiers map[int64][]chan struct{} // CollectionID -> Notifiers - dispatcher *taskDispatcher[int64] - keylocks *lock.KeyLock[int64] + // loadingDispatcher updates targets for collections that are loading (also collections without a current target). + loadingDispatcher *taskDispatcher[int64] + // loadedDispatcher updates targets for loaded collections. + loadedDispatcher *taskDispatcher[int64] + + keylocks *lock.KeyLock[int64] startOnce sync.Once stopOnce sync.Once @@ -117,8 +121,8 @@ func NewTargetObserver( keylocks: lock.NewKeyLock[int64](), } - dispatcher := newTaskDispatcher(result.check) - result.dispatcher = dispatcher + result.loadingDispatcher = newTaskDispatcher(result.check) + result.loadedDispatcher = newTaskDispatcher(result.check) return result } @@ -127,7 +131,8 @@ func (ob *TargetObserver) Start() { ctx, cancel := context.WithCancel(context.Background()) ob.cancel = cancel - ob.dispatcher.Start() + ob.loadingDispatcher.Start() + ob.loadedDispatcher.Start() ob.wg.Add(1) go func() { @@ -147,7 +152,8 @@ func (ob *TargetObserver) Stop() { } ob.wg.Wait() - ob.dispatcher.Stop() + ob.loadingDispatcher.Stop() + ob.loadedDispatcher.Stop() }) } @@ -170,7 +176,13 @@ func (ob *TargetObserver) schedule(ctx context.Context) { case <-ticker.C: ob.clean() - ob.dispatcher.AddTask(ob.meta.GetAll()...) + loaded := lo.FilterMap(ob.meta.GetAllCollections(), func(collection *meta.Collection, _ int) (int64, bool) { + if collection.GetStatus() == querypb.LoadStatus_Loaded { + return collection.GetCollectionID(), true + } + return 0, false + }) + ob.loadedDispatcher.AddTask(loaded...) case req := <-ob.updateChan: log.Info("manually trigger update target", @@ -220,13 +232,13 @@ func (ob *TargetObserver) schedule(ctx context.Context) { func (ob *TargetObserver) Check(ctx context.Context, collectionID int64, partitionID int64) bool { result := ob.targetMgr.IsCurrentTargetExist(collectionID, partitionID) if !result { - ob.dispatcher.AddTask(collectionID) + ob.loadingDispatcher.AddTask(collectionID) } return result } func (ob *TargetObserver) TriggerUpdateCurrentTarget(collectionID int64) { - ob.dispatcher.AddTask(collectionID) + ob.loadingDispatcher.AddTask(collectionID) } func (ob *TargetObserver) check(ctx context.Context, collectionID int64) { diff --git a/internal/querycoordv2/observers/target_observer_test.go b/internal/querycoordv2/observers/target_observer_test.go index dc269264d8a82..dcbd8ec5f247e 100644 --- a/internal/querycoordv2/observers/target_observer_test.go +++ b/internal/querycoordv2/observers/target_observer_test.go @@ -98,7 +98,9 @@ func (suite *TargetObserverSuite) SetupTest() { suite.collectionID = int64(1000) suite.partitionID = int64(100) - err = suite.meta.CollectionManager.PutCollection(utils.CreateTestCollection(suite.collectionID, 1)) + testCollection := utils.CreateTestCollection(suite.collectionID, 1) + testCollection.Status = querypb.LoadStatus_Loaded + err = suite.meta.CollectionManager.PutCollection(testCollection) suite.NoError(err) err = suite.meta.CollectionManager.PutPartition(utils.CreateTestPartition(suite.collectionID, suite.partitionID)) suite.NoError(err) @@ -318,7 +320,8 @@ func (suite *TargetObserverCheckSuite) SetupTest() { func (s *TargetObserverCheckSuite) TestCheck() { r := s.observer.Check(context.Background(), s.collectionID, common.AllPartitionsID) s.False(r) - s.True(s.observer.dispatcher.tasks.Contain(s.collectionID)) + s.False(s.observer.loadedDispatcher.tasks.Contain(s.collectionID)) + s.True(s.observer.loadingDispatcher.tasks.Contain(s.collectionID)) } func TestTargetObserver(t *testing.T) {