diff --git a/internal/querycoordv2/observers/collection_observer.go b/internal/querycoordv2/observers/collection_observer.go index 685074749690d..8186a23106025 100644 --- a/internal/querycoordv2/observers/collection_observer.go +++ b/internal/querycoordv2/observers/collection_observer.go @@ -226,7 +226,15 @@ func (ob *CollectionObserver) observePartitionLoadStatus(ctx context.Context, pa } ob.partitionLoadedCount[partition.GetPartitionID()] = loadedCount - if loadPercentage == 100 && ob.targetObserver.Check(ctx, partition.GetCollectionID()) && ob.leaderObserver.CheckTargetVersion(ctx, partition.GetCollectionID()) { + if loadPercentage == 100 { + if !ob.targetObserver.Check(ctx, partition.GetCollectionID()) { + log.Warn("failed to manual check current target, skip update load status") + return + } + if !ob.leaderObserver.CheckTargetVersion(ctx, partition.GetCollectionID()) { + log.Warn("failed to manual check leader target version ,skip update load status") + return + } delete(ob.partitionLoadedCount, partition.GetPartitionID()) } collectionPercentage, err := ob.meta.CollectionManager.UpdateLoadPercent(partition.PartitionID, loadPercentage) diff --git a/internal/querycoordv2/observers/leader_observer.go b/internal/querycoordv2/observers/leader_observer.go index 0e01477fbdfa9..5a4b95c18d7da 100644 --- a/internal/querycoordv2/observers/leader_observer.go +++ b/internal/querycoordv2/observers/leader_observer.go @@ -41,14 +41,15 @@ const ( // LeaderObserver is to sync the distribution with leader type LeaderObserver struct { - wg sync.WaitGroup - cancel context.CancelFunc - dist *meta.DistributionManager - meta *meta.Meta - target *meta.TargetManager - broker meta.Broker - cluster session.Cluster - manualCheck chan checkRequest + wg sync.WaitGroup + cancel context.CancelFunc + dist *meta.DistributionManager + meta *meta.Meta + target *meta.TargetManager + broker meta.Broker + cluster session.Cluster + + dispatcher *taskDispatcher[int64] stopOnce sync.Once } @@ -57,27 +58,12 @@ func (o *LeaderObserver) Start() { ctx, cancel := context.WithCancel(context.Background()) o.cancel = cancel + o.dispatcher.Start() + o.wg.Add(1) go func() { defer o.wg.Done() - ticker := time.NewTicker(interval) - defer ticker.Stop() - for { - select { - case <-ctx.Done(): - log.Info("stop leader observer") - return - - case req := <-o.manualCheck: - log.Info("triggering manual check") - ret := o.observeCollection(ctx, req.CollectionID) - req.Notifier <- ret - log.Info("manual check done", zap.Bool("result", ret)) - - case <-ticker.C: - o.observe(ctx) - } - } + o.schedule(ctx) }() } @@ -87,9 +73,26 @@ func (o *LeaderObserver) Stop() { o.cancel() } o.wg.Wait() + + o.dispatcher.Stop() }) } +func (o *LeaderObserver) schedule(ctx context.Context) { + ticker := time.NewTicker(interval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + log.Info("stop leader observer") + return + + case <-ticker.C: + o.observe(ctx) + } + } +} + func (o *LeaderObserver) observe(ctx context.Context) { o.observeSegmentsDist(ctx) } @@ -105,14 +108,13 @@ func (o *LeaderObserver) observeSegmentsDist(ctx context.Context) { collectionIDs := o.meta.CollectionManager.GetAll() for _, cid := range collectionIDs { if o.readyToObserve(cid) { - o.observeCollection(ctx, cid) + o.dispatcher.AddTask(cid) } } } -func (o *LeaderObserver) observeCollection(ctx context.Context, collection int64) bool { +func (o *LeaderObserver) observeCollection(ctx context.Context, collection int64) { replicas := o.meta.ReplicaManager.GetByCollection(collection) - result := true for _, replica := range replicas { leaders := o.dist.ChannelDistManager.GetShardLeadersByReplica(replica) for ch, leaderID := range leaders { @@ -128,29 +130,42 @@ func (o *LeaderObserver) observeCollection(ctx context.Context, collection int64 if updateVersionAction != nil { actions = append(actions, updateVersionAction) } - success := o.sync(ctx, replica.GetID(), leaderView, actions) - if !success { - result = false - } + o.sync(ctx, replica.GetID(), leaderView, actions) } } - return result } -func (ob *LeaderObserver) CheckTargetVersion(ctx context.Context, collectionID int64) bool { - notifier := make(chan bool) - select { - case ob.manualCheck <- checkRequest{CollectionID: collectionID, Notifier: notifier}: - case <-ctx.Done(): +func (o *LeaderObserver) CheckTargetVersion(ctx context.Context, collectionID int64) bool { + // if not ready to observer, skip add task + if !o.readyToObserve(collectionID) { return false } - select { - case result := <-notifier: - return result - case <-ctx.Done(): - return false + result := o.checkCollectionLeaderVersionIsCurrent(ctx, collectionID) + if !result { + o.dispatcher.AddTask(collectionID) } + + return result +} + +func (o *LeaderObserver) checkCollectionLeaderVersionIsCurrent(ctx context.Context, collectionID int64) bool { + replicas := o.meta.ReplicaManager.GetByCollection(collectionID) + for _, replica := range replicas { + leaders := o.dist.ChannelDistManager.GetShardLeadersByReplica(replica) + for ch, leaderID := range leaders { + leaderView := o.dist.LeaderViewManager.GetLeaderShardView(leaderID, ch) + if leaderView == nil { + return false + } + + action := o.checkNeedUpdateTargetVersion(ctx, leaderView) + if action != nil { + return false + } + } + } + return true } func (o *LeaderObserver) checkNeedUpdateTargetVersion(ctx context.Context, leaderView *meta.LeaderView) *querypb.SyncAction { @@ -312,12 +327,16 @@ func NewLeaderObserver( broker meta.Broker, cluster session.Cluster, ) *LeaderObserver { - return &LeaderObserver{ - dist: dist, - meta: meta, - target: targetMgr, - broker: broker, - cluster: cluster, - manualCheck: make(chan checkRequest, 10), + ob := &LeaderObserver{ + dist: dist, + meta: meta, + target: targetMgr, + broker: broker, + cluster: cluster, } + + dispatcher := newTaskDispatcher[int64](ob.observeCollection) + ob.dispatcher = dispatcher + + return ob } diff --git a/internal/querycoordv2/observers/leader_observer_test.go b/internal/querycoordv2/observers/leader_observer_test.go index c2f1f771d2ece..3a2738f4ff1aa 100644 --- a/internal/querycoordv2/observers/leader_observer_test.go +++ b/internal/querycoordv2/observers/leader_observer_test.go @@ -591,44 +591,6 @@ func (suite *LeaderObserverTestSuite) TestSyncTargetVersion() { suite.Len(action.SealedInTarget, 1) } -func (suite *LeaderObserverTestSuite) TestCheckTargetVersion() { - collectionID := int64(1001) - observer := suite.observer - - suite.Run("check_channel_blocked", func() { - oldCh := observer.manualCheck - defer func() { - observer.manualCheck = oldCh - }() - - // zero-length channel - observer.manualCheck = make(chan checkRequest) - - ctx, cancel := context.WithCancel(context.Background()) - // cancel context, make test return fast - cancel() - - result := observer.CheckTargetVersion(ctx, collectionID) - suite.False(result) - }) - - suite.Run("check_return_ctx_timeout", func() { - oldCh := observer.manualCheck - defer func() { - observer.manualCheck = oldCh - }() - - // make channel length = 1, task received - observer.manualCheck = make(chan checkRequest, 1) - - ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*200) - defer cancel() - - result := observer.CheckTargetVersion(ctx, collectionID) - suite.False(result) - }) -} - func TestLeaderObserverSuite(t *testing.T) { suite.Run(t, new(LeaderObserverTestSuite)) } diff --git a/internal/querycoordv2/observers/target_observer.go b/internal/querycoordv2/observers/target_observer.go index d1f210fed083e..da3a7f9d17356 100644 --- a/internal/querycoordv2/observers/target_observer.go +++ b/internal/querycoordv2/observers/target_observer.go @@ -51,36 +51,48 @@ type TargetObserver struct { distMgr *meta.DistributionManager broker meta.Broker - initChan chan initRequest - manualCheck chan checkRequest - nextTargetLastUpdate map[int64]time.Time + initChan chan initRequest + manualCheck chan checkRequest + // nextTargetLastUpdate map[int64]time.Time + nextTargetLastUpdate *typeutil.ConcurrentMap[int64, time.Time] updateChan chan targetUpdateRequest mut sync.Mutex // Guard readyNotifiers readyNotifiers map[int64][]chan struct{} // CollectionID -> Notifiers + dispatcher *taskDispatcher[int64] + stopOnce sync.Once } func NewTargetObserver(meta *meta.Meta, targetMgr *meta.TargetManager, distMgr *meta.DistributionManager, broker meta.Broker) *TargetObserver { - return &TargetObserver{ + result := &TargetObserver{ meta: meta, targetMgr: targetMgr, distMgr: distMgr, broker: broker, manualCheck: make(chan checkRequest, 10), - nextTargetLastUpdate: make(map[int64]time.Time), + nextTargetLastUpdate: typeutil.NewConcurrentMap[int64, time.Time](), updateChan: make(chan targetUpdateRequest), readyNotifiers: make(map[int64][]chan struct{}), initChan: make(chan initRequest), } + + dispatcher := newTaskDispatcher(result.check) + result.dispatcher = dispatcher + return result } func (ob *TargetObserver) Start() { ctx, cancel := context.WithCancel(context.Background()) ob.cancel = cancel + ob.dispatcher.Start() + ob.wg.Add(1) - go ob.schedule(ctx) + go func() { + defer ob.wg.Done() + ob.schedule(ctx) + }() // after target observer start, update target for all collection ob.initChan <- initRequest{} @@ -92,11 +104,12 @@ func (ob *TargetObserver) Stop() { ob.cancel() } ob.wg.Wait() + + ob.dispatcher.Stop() }) } func (ob *TargetObserver) schedule(ctx context.Context) { - defer ob.wg.Done() log.Info("Start update next target loop") ticker := time.NewTicker(params.Params.QueryCoordCfg.UpdateNextTargetInterval.GetAsDuration(time.Second)) @@ -111,16 +124,11 @@ func (ob *TargetObserver) schedule(ctx context.Context) { for _, collectionID := range ob.meta.GetAll() { ob.init(collectionID) } + log.Info("target observer init done") case <-ticker.C: ob.clean() - for _, collectionID := range ob.meta.GetAll() { - ob.check(collectionID) - } - - case req := <-ob.manualCheck: - ob.check(req.CollectionID) - req.Notifier <- ob.targetMgr.IsCurrentTargetExist(req.CollectionID) + ob.dispatcher.AddTask(ob.meta.GetAll()...) case req := <-ob.updateChan: err := ob.updateNextTarget(req.CollectionID) @@ -137,26 +145,17 @@ func (ob *TargetObserver) schedule(ctx context.Context) { } } -// Check checks whether the next target is ready, -// and updates the current target if it is, -// returns true if current target is not nil +// Check whether provided collection is has current target. +// If not, submit a async task into dispatcher. func (ob *TargetObserver) Check(ctx context.Context, collectionID int64) bool { - notifier := make(chan bool) - select { - case ob.manualCheck <- checkRequest{CollectionID: collectionID, Notifier: notifier}: - case <-ctx.Done(): - return false - } - - select { - case result := <-notifier: - return result - case <-ctx.Done(): - return false + result := ob.targetMgr.IsCurrentTargetExist(collectionID) + if !result { + ob.dispatcher.AddTask(collectionID) } + return result } -func (ob *TargetObserver) check(collectionID int64) { +func (ob *TargetObserver) check(ctx context.Context, collectionID int64) { if !ob.meta.Exist(collectionID) { ob.ReleaseCollection(collectionID) ob.targetMgr.RemoveCollection(collectionID) @@ -215,11 +214,12 @@ func (ob *TargetObserver) ReleaseCollection(collectionID int64) { func (ob *TargetObserver) clean() { collectionSet := typeutil.NewUniqueSet(ob.meta.GetAll()...) // for collection which has been removed from target, try to clear nextTargetLastUpdate - for collection := range ob.nextTargetLastUpdate { - if !collectionSet.Contain(collection) { - delete(ob.nextTargetLastUpdate, collection) + ob.nextTargetLastUpdate.Range(func(collectionID int64, _ time.Time) bool { + if !collectionSet.Contain(collectionID) { + ob.nextTargetLastUpdate.Remove(collectionID) } - } + return true + }) ob.mut.Lock() defer ob.mut.Unlock() @@ -238,7 +238,11 @@ func (ob *TargetObserver) shouldUpdateNextTarget(collectionID int64) bool { } func (ob *TargetObserver) isNextTargetExpired(collectionID int64) bool { - return time.Since(ob.nextTargetLastUpdate[collectionID]) > params.Params.QueryCoordCfg.NextTargetSurviveTime.GetAsDuration(time.Second) + lastUpdated, has := ob.nextTargetLastUpdate.Get(collectionID) + if !has { + return true + } + return time.Since(lastUpdated) > params.Params.QueryCoordCfg.NextTargetSurviveTime.GetAsDuration(time.Second) } func (ob *TargetObserver) updateNextTarget(collectionID int64) error { @@ -256,7 +260,7 @@ func (ob *TargetObserver) updateNextTarget(collectionID int64) error { } func (ob *TargetObserver) updateNextTargetTimestamp(collectionID int64) { - ob.nextTargetLastUpdate[collectionID] = time.Now() + ob.nextTargetLastUpdate.Insert(collectionID, time.Now()) } func (ob *TargetObserver) shouldUpdateCurrentTarget(collectionID int64) bool { diff --git a/internal/querycoordv2/observers/target_observer_test.go b/internal/querycoordv2/observers/target_observer_test.go index 2fee0098b0bb0..66015a897bab0 100644 --- a/internal/querycoordv2/observers/target_observer_test.go +++ b/internal/querycoordv2/observers/target_observer_test.go @@ -273,41 +273,10 @@ func (suite *TargetObserverCheckSuite) SetupTest() { suite.NoError(err) } -func (suite *TargetObserverCheckSuite) TestCheckCtxDone() { - observer := suite.observer - - suite.Run("check_channel_blocked", func() { - oldCh := observer.manualCheck - defer func() { - observer.manualCheck = oldCh - }() - - // zero-length channel - observer.manualCheck = make(chan checkRequest) - - ctx, cancel := context.WithCancel(context.Background()) - // cancel context, make test return fast - cancel() - - result := observer.Check(ctx, suite.collectionID) - suite.False(result) - }) - - suite.Run("check_return_ctx_timeout", func() { - oldCh := observer.manualCheck - defer func() { - observer.manualCheck = oldCh - }() - - // make channel length = 1, task received - observer.manualCheck = make(chan checkRequest, 1) - - ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*200) - defer cancel() - - result := observer.Check(ctx, suite.collectionID) - suite.False(result) - }) +func (s *TargetObserverCheckSuite) TestCheck() { + r := s.observer.Check(context.Background(), s.collectionID) + s.False(r) + s.True(s.observer.dispatcher.tasks.Contain(s.collectionID)) } func TestTargetObserver(t *testing.T) { diff --git a/internal/querycoordv2/observers/task_dispatcher.go b/internal/querycoordv2/observers/task_dispatcher.go new file mode 100644 index 0000000000000..87a254c5f69fb --- /dev/null +++ b/internal/querycoordv2/observers/task_dispatcher.go @@ -0,0 +1,104 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package observers + +import ( + "context" + "sync" + + "github.com/milvus-io/milvus/pkg/util/conc" + "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/typeutil" +) + +// taskDispatcher is the utility to provide task dedup and dispatch feature +type taskDispatcher[K comparable] struct { + tasks *typeutil.ConcurrentSet[K] + pool *conc.Pool[any] + notifyCh chan struct{} + taskRunner task[K] + wg sync.WaitGroup + cancel context.CancelFunc + stopOnce sync.Once +} + +type task[K comparable] func(context.Context, K) + +func newTaskDispatcher[K comparable](runner task[K]) *taskDispatcher[K] { + return &taskDispatcher[K]{ + tasks: typeutil.NewConcurrentSet[K](), + pool: conc.NewPool[any](paramtable.Get().QueryCoordCfg.ObserverTaskParallel.GetAsInt()), + notifyCh: make(chan struct{}, 1), + taskRunner: runner, + } +} + +func (d *taskDispatcher[K]) Start() { + ctx, cancel := context.WithCancel(context.Background()) + d.cancel = cancel + + d.wg.Add(1) + go func() { + defer d.wg.Done() + d.schedule(ctx) + }() +} + +func (d *taskDispatcher[K]) Stop() { + d.stopOnce.Do(func() { + if d.cancel != nil { + d.cancel() + } + d.wg.Wait() + }) +} + +func (d *taskDispatcher[K]) AddTask(keys ...K) { + var added bool + for _, key := range keys { + added = added || d.tasks.Insert(key) + } + if added { + d.notify() + } +} + +func (d *taskDispatcher[K]) notify() { + select { + case d.notifyCh <- struct{}{}: + default: + } +} + +func (d *taskDispatcher[K]) schedule(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case <-d.notifyCh: + d.tasks.Range(func(k K) bool { + d.tasks.Insert(k) + d.pool.Submit(func() (any, error) { + d.taskRunner(ctx, k) + d.tasks.Remove(k) + return struct{}{}, nil + }) + return true + }) + } + } +} diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 14082440d6713..e1b1f642355c3 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -1204,6 +1204,7 @@ type queryCoordConfig struct { CheckHealthRPCTimeout ParamItem `refreshable:"true"` BrokerTimeout ParamItem `refreshable:"false"` CollectionRecoverTimesLimit ParamItem `refreshable:"true"` + ObserverTaskParallel ParamItem `refreshable:"false"` } func (p *queryCoordConfig) init(base *BaseTable) { @@ -1515,6 +1516,16 @@ func (p *queryCoordConfig) init(base *BaseTable) { Export: true, } p.CollectionRecoverTimesLimit.Init(base.mgr) + + p.ObserverTaskParallel = ParamItem{ + Key: "queryCoord.observerTaskParallel", + Version: "2.3.2", + DefaultValue: "16", + PanicIfEmpty: true, + Doc: "the parallel observer dispatcher task number", + Export: true, + } + p.ObserverTaskParallel.Init(base.mgr) } // /////////////////////////////////////////////////////////////////////////////