From bf962a04feb52ba52f4ac8f8f96cced645a79733 Mon Sep 17 00:00:00 2001
From: wei liu
Date: Tue, 12 Nov 2024 16:34:28 +0800
Subject: [PATCH] fix: Search may return fewer results after QueryNode recovery (#36549)

issue: #36293 #36242

After a QueryNode recovers, the delegator may be reloaded on a new node; once
all segments have been loaded, the delegator becomes serviceable. However, its
target version has not yet been synced, so an incoming search/query is filtered
against the wrong target version, yielding an empty readable segment list and
therefore an empty search result.

This PR keeps the delegator unserviceable until its target version has been
synced.

---------

Signed-off-by: Wei Liu

---
 internal/querycoordv2/dist/dist_controller.go |  31 +++--
 .../querycoordv2/dist/dist_controller_test.go |   3 +-
 internal/querycoordv2/dist/dist_handler.go    |  50 +++++--
 .../querycoordv2/dist/dist_handler_test.go    |  12 +-
 internal/querycoordv2/job/job_test.go         |   1 +
 .../querycoordv2/meta/leader_view_manager.go  |   1 +
 internal/querycoordv2/meta/target_manager.go  |   9 +-
 .../observers/collection_observer_test.go     |  35 ++++-
 .../querycoordv2/observers/target_observer.go | 131 +++++++-----------
 .../observers/target_observer_test.go         |  22 ++-
 internal/querycoordv2/ops_service_test.go     |   1 +
 internal/querycoordv2/server.go               |  25 ++--
 internal/querycoordv2/server_test.go          |   7 +
 internal/querycoordv2/services.go             |   2 +
 internal/querycoordv2/services_test.go        |   1 +
 internal/querycoordv2/utils/util.go           |   4 +-
 internal/querycoordv2/utils/util_test.go      |  57 +++++---
 tests/integration/replicas/load/load_test.go  |  27 +++-
 18 files changed, 263 insertions(+), 156 deletions(-)

diff --git a/internal/querycoordv2/dist/dist_controller.go b/internal/querycoordv2/dist/dist_controller.go
index 521b6cfd95edf..687e16fe5cfed 100644
--- a/internal/querycoordv2/dist/dist_controller.go
+++ b/internal/querycoordv2/dist/dist_controller.go
@@ -36,13 +36,14 @@ type Controller interface {
 }
 
 type ControllerImpl struct {
-	mu          sync.RWMutex
-	handlers    map[int64]*distHandler
-	client      session.Cluster
-	nodeManager *session.NodeManager
-	dist        *meta.DistributionManager
-	targetMgr   *meta.TargetManager
-	scheduler   task.Scheduler
+	mu                  sync.RWMutex
+	handlers            map[int64]*distHandler
+	client              session.Cluster
+	nodeManager         *session.NodeManager
+	dist                *meta.DistributionManager
+	targetMgr           meta.TargetManagerInterface
+	scheduler           task.Scheduler
+	syncTargetVersionFn TriggerUpdateTargetVersion
 }
 
 func (dc *ControllerImpl) StartDistInstance(ctx context.Context, nodeID int64) {
@@ -52,7 +53,7 @@ func (dc *ControllerImpl) StartDistInstance(ctx context.Context, nodeID int64) {
 		log.Info("node has started", zap.Int64("nodeID", nodeID))
 		return
 	}
-	h := newDistHandler(ctx, nodeID, dc.client, dc.nodeManager, dc.scheduler, dc.dist, dc.targetMgr)
+	h := newDistHandler(ctx, nodeID, dc.client, dc.nodeManager, dc.scheduler, dc.dist, dc.targetMgr, dc.syncTargetVersionFn)
 	dc.handlers[nodeID] = h
 }
 
@@ -100,13 +101,15 @@ func NewDistController(
 	dist *meta.DistributionManager,
 	targetMgr *meta.TargetManager,
 	scheduler task.Scheduler,
+	syncTargetVersionFn TriggerUpdateTargetVersion,
 ) *ControllerImpl {
 	return &ControllerImpl{
-		handlers:    make(map[int64]*distHandler),
-		client:      client,
-		nodeManager: nodeManager,
-		dist:        dist,
-		targetMgr:   targetMgr,
-		scheduler:   scheduler,
+		handlers:            make(map[int64]*distHandler),
+		client:              client,
+		nodeManager:         nodeManager,
+		dist:                dist,
+		targetMgr:           targetMgr,
+		scheduler:           scheduler,
+		syncTargetVersionFn: syncTargetVersionFn,
 	}
 }
diff --git a/internal/querycoordv2/dist/dist_controller_test.go
b/internal/querycoordv2/dist/dist_controller_test.go index 9929962039ef9..702a8608977fb 100644 --- a/internal/querycoordv2/dist/dist_controller_test.go +++ b/internal/querycoordv2/dist/dist_controller_test.go @@ -81,7 +81,8 @@ func (suite *DistControllerTestSuite) SetupTest() { targetManager := meta.NewTargetManager(suite.broker, suite.meta) suite.mockScheduler = task.NewMockScheduler(suite.T()) suite.mockScheduler.EXPECT().GetExecutedFlag(mock.Anything).Return(nil).Maybe() - suite.controller = NewDistController(suite.mockCluster, suite.nodeMgr, distManager, targetManager, suite.mockScheduler) + syncTargetVersionFn := func(collectionID int64) {} + suite.controller = NewDistController(suite.mockCluster, suite.nodeMgr, distManager, targetManager, suite.mockScheduler, syncTargetVersionFn) } func (suite *DistControllerTestSuite) TearDownSuite() { diff --git a/internal/querycoordv2/dist/dist_handler.go b/internal/querycoordv2/dist/dist_handler.go index 85c3c2fcdf684..4140fec8753c2 100644 --- a/internal/querycoordv2/dist/dist_handler.go +++ b/internal/querycoordv2/dist/dist_handler.go @@ -18,6 +18,7 @@ package dist import ( "context" + "fmt" "sync" "time" @@ -39,6 +40,8 @@ import ( "github.com/milvus-io/milvus/pkg/util/paramtable" ) +type TriggerUpdateTargetVersion = func(collectionID int64) + type distHandler struct { nodeID int64 c chan struct{} @@ -51,6 +54,8 @@ type distHandler struct { mu sync.Mutex stopOnce sync.Once lastUpdateTs int64 + + syncTargetVersionFn TriggerUpdateTargetVersion } func (dh *distHandler) start(ctx context.Context) { @@ -221,12 +226,35 @@ func (dh *distHandler) updateLeaderView(resp *querypb.GetDataDistributionRespons NumOfGrowingRows: lview.GetNumOfGrowingRows(), PartitionStatsVersions: lview.PartitionStatsVersions, } + updates = append(updates, view) + // check leader serviceable - // todo by weiliu1031: serviceable status should be maintained by delegator, to avoid heavy check here - if err := utils.CheckLeaderAvailable(dh.nodeManager, dh.target, view); err != nil { + if err := utils.CheckDelegatorDataReady(dh.nodeManager, dh.target, view, meta.CurrentTarget); err != nil { view.UnServiceableError = err + log.Info("leader is not available due to distribution not ready", + zap.Int64("collectionID", view.CollectionID), + zap.Int64("nodeID", view.ID), + zap.String("channel", view.Channel), + zap.Error(err)) + continue + } + + // if target version hasn't been synced, delegator will get empty readable segment list + // so shard leader should be unserviceable until target version is synced + currentTargetVersion := dh.target.GetCollectionTargetVersion(lview.GetCollection(), meta.CurrentTarget) + if lview.TargetVersion <= 0 { + err := merr.WrapErrServiceInternal(fmt.Sprintf("target version mismatch, collection: %d, channel: %s, current target version: %v, leader version: %v", + lview.GetCollection(), lview.GetChannel(), currentTargetVersion, lview.TargetVersion)) + + // segment and channel already loaded, trigger target observer to check target version + dh.syncTargetVersionFn(lview.GetCollection()) + view.UnServiceableError = err + log.Info("leader is not available due to target version not ready", + zap.Int64("collectionID", view.CollectionID), + zap.Int64("nodeID", view.ID), + zap.String("channel", view.Channel), + zap.Error(err)) } - updates = append(updates, view) } dh.dist.LeaderViewManager.Update(resp.GetNodeID(), updates...) 
@@ -272,15 +300,17 @@ func newDistHandler( scheduler task.Scheduler, dist *meta.DistributionManager, targetMgr meta.TargetManagerInterface, + syncTargetVersionFn TriggerUpdateTargetVersion, ) *distHandler { h := &distHandler{ - nodeID: nodeID, - c: make(chan struct{}), - client: client, - nodeManager: nodeManager, - scheduler: scheduler, - dist: dist, - target: targetMgr, + nodeID: nodeID, + c: make(chan struct{}), + client: client, + nodeManager: nodeManager, + scheduler: scheduler, + dist: dist, + target: targetMgr, + syncTargetVersionFn: syncTargetVersionFn, } h.wg.Add(1) go h.start(ctx) diff --git a/internal/querycoordv2/dist/dist_handler_test.go b/internal/querycoordv2/dist/dist_handler_test.go index fb9624b977a42..94de9c8a75cc1 100644 --- a/internal/querycoordv2/dist/dist_handler_test.go +++ b/internal/querycoordv2/dist/dist_handler_test.go @@ -66,6 +66,7 @@ func (suite *DistHandlerSuite) SetupSuite() { suite.scheduler.EXPECT().GetExecutedFlag(mock.Anything).Return(nil).Maybe() suite.target.EXPECT().GetSealedSegment(mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() suite.target.EXPECT().GetDmChannel(mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() + suite.target.EXPECT().GetCollectionTargetVersion(mock.Anything, mock.Anything).Return(1011).Maybe() } func (suite *DistHandlerSuite) TestBasic() { @@ -97,14 +98,16 @@ func (suite *DistHandlerSuite) TestBasic() { LeaderViews: []*querypb.LeaderView{ { - Collection: 1, - Channel: "test-channel-1", + Collection: 1, + Channel: "test-channel-1", + TargetVersion: 1011, }, }, LastModifyTs: 1, }, nil) - suite.handler = newDistHandler(suite.ctx, suite.nodeID, suite.client, suite.nodeManager, suite.scheduler, suite.dist, suite.target) + syncTargetVersionFn := func(collectionID int64) {} + suite.handler = newDistHandler(suite.ctx, suite.nodeID, suite.client, suite.nodeManager, suite.scheduler, suite.dist, suite.target, syncTargetVersionFn) defer suite.handler.stop() time.Sleep(10 * time.Second) @@ -119,7 +122,8 @@ func (suite *DistHandlerSuite) TestGetDistributionFailed() { })) suite.client.EXPECT().GetDataDistribution(mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("fake error")) - suite.handler = newDistHandler(suite.ctx, suite.nodeID, suite.client, suite.nodeManager, suite.scheduler, suite.dist, suite.target) + syncTargetVersionFn := func(collectionID int64) {} + suite.handler = newDistHandler(suite.ctx, suite.nodeID, suite.client, suite.nodeManager, suite.scheduler, suite.dist, suite.target, syncTargetVersionFn) defer suite.handler.stop() time.Sleep(10 * time.Second) diff --git a/internal/querycoordv2/job/job_test.go b/internal/querycoordv2/job/job_test.go index b3d0f58d62ed5..ff10f5003bc00 100644 --- a/internal/querycoordv2/job/job_test.go +++ b/internal/querycoordv2/job/job_test.go @@ -171,6 +171,7 @@ func (suite *JobSuite) SetupTest() { suite.dist, suite.broker, suite.cluster, + suite.nodeMgr, ) suite.targetObserver.Start() suite.scheduler = NewScheduler() diff --git a/internal/querycoordv2/meta/leader_view_manager.go b/internal/querycoordv2/meta/leader_view_manager.go index fdb56671e5a73..412a5a3c829a0 100644 --- a/internal/querycoordv2/meta/leader_view_manager.go +++ b/internal/querycoordv2/meta/leader_view_manager.go @@ -149,6 +149,7 @@ func (view *LeaderView) Clone() *LeaderView { TargetVersion: view.TargetVersion, NumOfGrowingRows: view.NumOfGrowingRows, PartitionStatsVersions: view.PartitionStatsVersions, + UnServiceableError: view.UnServiceableError, } } diff --git 
a/internal/querycoordv2/meta/target_manager.go b/internal/querycoordv2/meta/target_manager.go index 6f05d4c96e004..9c74792380778 100644 --- a/internal/querycoordv2/meta/target_manager.go +++ b/internal/querycoordv2/meta/target_manager.go @@ -157,7 +157,6 @@ func (mgr *TargetManager) UpdateCollectionNextTarget(collectionID int64) error { partitionIDs := lo.Map(partitions, func(partition *Partition, i int) int64 { return partition.PartitionID }) - allocatedTarget := NewCollectionTarget(nil, nil, partitionIDs) channelInfos := make(map[string][]*datapb.VchannelInfo) segments := make(map[int64]*datapb.SegmentInfo, 0) @@ -192,12 +191,11 @@ func (mgr *TargetManager) UpdateCollectionNextTarget(collectionID int64) error { return nil } - mgr.next.updateCollectionTarget(collectionID, NewCollectionTarget(segments, dmChannels, partitionIDs)) + allocatedTarget := NewCollectionTarget(segments, dmChannels, partitionIDs) + mgr.next.updateCollectionTarget(collectionID, allocatedTarget) log.Debug("finish to update next targets for collection", zap.Int64("collectionID", collectionID), - zap.Int64s("PartitionIDs", partitionIDs), - zap.Int64s("segments", allocatedTarget.GetAllSegmentIDs()), - zap.Strings("channels", allocatedTarget.GetAllDmChannelNames())) + zap.Int64s("PartitionIDs", partitionIDs)) return nil } @@ -604,6 +602,7 @@ func (mgr *TargetManager) Recover(catalog metastore.QueryCoordCatalog) error { zap.Int64("collectionID", t.GetCollectionID()), zap.Strings("channels", newTarget.GetAllDmChannelNames()), zap.Int("segmentNum", len(newTarget.GetAllSegmentIDs())), + zap.Int64("version", newTarget.GetTargetVersion()), ) // clear target info in meta store diff --git a/internal/querycoordv2/observers/collection_observer_test.go b/internal/querycoordv2/observers/collection_observer_test.go index 97c4dbe1d0656..3293103c855df 100644 --- a/internal/querycoordv2/observers/collection_observer_test.go +++ b/internal/querycoordv2/observers/collection_observer_test.go @@ -71,6 +71,8 @@ type CollectionObserverSuite struct { targetObserver *TargetObserver checkerController *checkers.CheckerController + nodeMgr *session.NodeManager + // Test object ob *CollectionObserver } @@ -191,8 +193,8 @@ func (suite *CollectionObserverSuite) SetupTest() { // Dependencies suite.dist = meta.NewDistributionManager() - nodeMgr := session.NewNodeManager() - suite.meta = meta.NewMeta(suite.idAllocator, suite.store, nodeMgr) + suite.nodeMgr = session.NewNodeManager() + suite.meta = meta.NewMeta(suite.idAllocator, suite.store, suite.nodeMgr) suite.broker = meta.NewMockBroker(suite.T()) suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta) suite.cluster = session.NewMockCluster(suite.T()) @@ -201,6 +203,7 @@ func (suite *CollectionObserverSuite) SetupTest() { suite.dist, suite.broker, suite.cluster, + suite.nodeMgr, ) suite.checkerController = &checkers.CheckerController{} @@ -223,6 +226,16 @@ func (suite *CollectionObserverSuite) SetupTest() { suite.targetObserver.Start() suite.ob.Start() suite.loadAll() + + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 1, + })) + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 2, + })) + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 3, + })) } func (suite *CollectionObserverSuite) TearDownTest() { @@ -248,12 +261,19 @@ func (suite *CollectionObserverSuite) TestObserve() { Channel: "100-dmc0", Segments: map[int64]*querypb.SegmentDist{1: {NodeID: 1, Version: 0}}, }) + view := &meta.LeaderView{ + ID: 2, + 
CollectionID: 103, + Channel: "103-dmc0", + Segments: make(map[int64]*querypb.SegmentDist), + } suite.dist.LeaderViewManager.Update(2, &meta.LeaderView{ ID: 2, CollectionID: 100, Channel: "100-dmc1", Segments: map[int64]*querypb.SegmentDist{2: {NodeID: 2, Version: 0}}, - }) + }, view) + view1 := &meta.LeaderView{ ID: 3, CollectionID: 102, @@ -265,7 +285,7 @@ func (suite *CollectionObserverSuite) TestObserve() { suite.True(ok) view2 := &meta.LeaderView{ ID: 3, - CollectionID: 13, + CollectionID: 103, Channel: "103-dmc0", Segments: make(map[int64]*querypb.SegmentDist), } @@ -273,9 +293,16 @@ func (suite *CollectionObserverSuite) TestObserve() { view2.Segments[segment.GetID()] = &querypb.SegmentDist{ NodeID: 3, Version: 0, } + view.Segments[segment.GetID()] = &querypb.SegmentDist{ + NodeID: 2, Version: 0, + } } suite.dist.LeaderViewManager.Update(3, view1, view2) + suite.broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything).Return(nil, nil).Maybe() + suite.broker.EXPECT().ListIndexes(mock.Anything, mock.Anything).Return(nil, nil).Maybe() + suite.cluster.EXPECT().SyncDistribution(mock.Anything, mock.Anything, mock.Anything).Return(merr.Success(), nil).Maybe() + suite.Eventually(func() bool { return suite.isCollectionLoadedContinue(suite.collections[2], time) }, timeout-1, timeout/10) diff --git a/internal/querycoordv2/observers/target_observer.go b/internal/querycoordv2/observers/target_observer.go index 3eef7d577c167..77595ebdf776c 100644 --- a/internal/querycoordv2/observers/target_observer.go +++ b/internal/querycoordv2/observers/target_observer.go @@ -79,6 +79,7 @@ type TargetObserver struct { distMgr *meta.DistributionManager broker meta.Broker cluster session.Cluster + nodeMgr *session.NodeManager initChan chan initRequest // nextTargetLastUpdate map[int64]time.Time @@ -100,6 +101,7 @@ func NewTargetObserver( distMgr *meta.DistributionManager, broker meta.Broker, cluster session.Cluster, + nodeMgr *session.NodeManager, ) *TargetObserver { result := &TargetObserver{ meta: meta, @@ -107,6 +109,7 @@ func NewTargetObserver( distMgr: distMgr, broker: broker, cluster: cluster, + nodeMgr: nodeMgr, nextTargetLastUpdate: typeutil.NewConcurrentMap[int64, time.Time](), updateChan: make(chan targetUpdateRequest, 10), readyNotifiers: make(map[int64][]chan struct{}), @@ -222,6 +225,10 @@ func (ob *TargetObserver) Check(ctx context.Context, collectionID int64, partiti return result } +func (ob *TargetObserver) TriggerUpdateCurrentTarget(collectionID int64) { + ob.dispatcher.AddTask(collectionID) +} + func (ob *TargetObserver) check(ctx context.Context, collectionID int64) { ob.keylocks.Lock(collectionID) defer ob.keylocks.Unlock(collectionID) @@ -362,89 +369,66 @@ func (ob *TargetObserver) shouldUpdateCurrentTarget(ctx context.Context, collect return false } - for _, channel := range channelNames { - views := ob.distMgr.LeaderViewManager.GetByFilter(meta.WithChannelName2LeaderView(channel.GetChannelName())) - nodes := lo.Map(views, func(v *meta.LeaderView, _ int) int64 { return v.ID }) - group := utils.GroupNodesByReplica(ob.meta.ReplicaManager, collectionID, nodes) - if int32(len(group)) < replicaNum { - log.RatedInfo(10, "channel not ready", - zap.Int("readyReplicaNum", len(group)), - zap.String("channelName", channel.GetChannelName()), - ) - return false - } - } + collectionReadyLeaders := make([]*meta.LeaderView, 0) + for channel := range channelNames { + channelReadyLeaders := lo.Filter(ob.distMgr.LeaderViewManager.GetByFilter(meta.WithChannelName2LeaderView(channel)), func(leader 
*meta.LeaderView, _ int) bool { + return utils.CheckDelegatorDataReady(ob.nodeMgr, ob.targetMgr, leader, meta.NextTarget) == nil + }) + collectionReadyLeaders = append(collectionReadyLeaders, channelReadyLeaders...) - // and last check historical segment - SealedSegments := ob.targetMgr.GetSealedSegmentsByCollection(collectionID, meta.NextTarget) - for _, segment := range SealedSegments { - views := ob.distMgr.LeaderViewManager.GetByFilter(meta.WithSegment2LeaderView(segment.GetID(), false)) - nodes := lo.Map(views, func(view *meta.LeaderView, _ int) int64 { return view.ID }) + nodes := lo.Map(channelReadyLeaders, func(view *meta.LeaderView, _ int) int64 { return view.ID }) group := utils.GroupNodesByReplica(ob.meta.ReplicaManager, collectionID, nodes) if int32(len(group)) < replicaNum { - log.RatedInfo(10, "segment not ready", - zap.Int("readyReplicaNum", len(group)), - zap.Int64("segmentID", segment.GetID()), + log.RatedInfo(10, "channel not ready", + zap.Int("readyReplicaNum", len(channelReadyLeaders)), + zap.String("channelName", channel), ) return false } } - replicas := ob.meta.ReplicaManager.GetByCollection(collectionID) - actions := make([]*querypb.SyncAction, 0, 1) var collectionInfo *milvuspb.DescribeCollectionResponse var partitions []int64 var indexInfo []*indexpb.IndexInfo var err error - for _, replica := range replicas { - leaders := ob.distMgr.ChannelDistManager.GetShardLeadersByReplica(replica) - for ch, leaderID := range leaders { - actions = actions[:0] - leaderView := ob.distMgr.LeaderViewManager.GetLeaderShardView(leaderID, ch) - if leaderView == nil { - log.RatedInfo(10, "leader view not ready", - zap.Int64("nodeID", leaderID), - zap.String("channel", ch), - ) - continue - } - updateVersionAction := ob.checkNeedUpdateTargetVersion(ctx, leaderView) - if updateVersionAction != nil { - actions = append(actions, updateVersionAction) - } + newVersion := ob.targetMgr.GetCollectionTargetVersion(collectionID, meta.NextTarget) + for _, leader := range collectionReadyLeaders { + updateVersionAction := ob.checkNeedUpdateTargetVersion(ctx, leader, newVersion) + if updateVersionAction == nil { + continue + } + replica := ob.meta.ReplicaManager.GetByCollectionAndNode(collectionID, leader.ID) + if replica == nil { + log.Warn("replica not found", zap.Int64("nodeID", leader.ID), zap.Int64("collectionID", collectionID)) + continue + } - if len(actions) == 0 { - continue + // init all the meta information + if collectionInfo == nil { + collectionInfo, err = ob.broker.DescribeCollection(ctx, collectionID) + if err != nil { + log.Warn("failed to get collection info", zap.Error(err)) + return false } - // init all the meta information - if collectionInfo == nil { - collectionInfo, err = ob.broker.DescribeCollection(ctx, collectionID) - if err != nil { - log.Warn("failed to get collection info", zap.Error(err)) - return false - } - - partitions, err = utils.GetPartitions(ob.meta.CollectionManager, collectionID) - if err != nil { - log.Warn("failed to get partitions", zap.Error(err)) - return false - } - - // Get collection index info - indexInfo, err = ob.broker.ListIndexes(ctx, collectionID) - if err != nil { - log.Warn("fail to get index info of collection", zap.Error(err)) - return false - } + partitions, err = utils.GetPartitions(ob.meta.CollectionManager, collectionID) + if err != nil { + log.Warn("failed to get partitions", zap.Error(err)) + return false } - if !ob.sync(ctx, replica, leaderView, actions, collectionInfo, partitions, indexInfo) { + // Get collection index info + 
indexInfo, err = ob.broker.ListIndexes(ctx, collectionID) + if err != nil { + log.Warn("fail to get index info of collection", zap.Error(err)) return false } } - } + if !ob.sync(ctx, replica, leader, []*querypb.SyncAction{updateVersionAction}, collectionInfo, partitions, indexInfo) { + return false + } + } return true } @@ -497,29 +481,8 @@ func (ob *TargetObserver) sync(ctx context.Context, replica *meta.Replica, leade return true } -func (ob *TargetObserver) checkCollectionLeaderVersionIsCurrent(ctx context.Context, collectionID int64) bool { - replicas := ob.meta.ReplicaManager.GetByCollection(collectionID) - for _, replica := range replicas { - leaders := ob.distMgr.ChannelDistManager.GetShardLeadersByReplica(replica) - for ch, leaderID := range leaders { - leaderView := ob.distMgr.LeaderViewManager.GetLeaderShardView(leaderID, ch) - if leaderView == nil { - return false - } - - action := ob.checkNeedUpdateTargetVersion(ctx, leaderView) - if action != nil { - return false - } - } - } - return true -} - -func (ob *TargetObserver) checkNeedUpdateTargetVersion(ctx context.Context, leaderView *meta.LeaderView) *querypb.SyncAction { +func (ob *TargetObserver) checkNeedUpdateTargetVersion(ctx context.Context, leaderView *meta.LeaderView, targetVersion int64) *querypb.SyncAction { log.Ctx(ctx).WithRateGroup("qcv2.LeaderObserver", 1, 60) - targetVersion := ob.targetMgr.GetCollectionTargetVersion(leaderView.CollectionID, meta.NextTarget) - if targetVersion <= leaderView.TargetVersion { return nil } diff --git a/internal/querycoordv2/observers/target_observer_test.go b/internal/querycoordv2/observers/target_observer_test.go index 45d804b0f897e..dc269264d8a82 100644 --- a/internal/querycoordv2/observers/target_observer_test.go +++ b/internal/querycoordv2/observers/target_observer_test.go @@ -36,6 +36,7 @@ import ( "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/kv" "github.com/milvus-io/milvus/pkg/util/etcd" + "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" ) @@ -78,15 +79,22 @@ func (suite *TargetObserverSuite) SetupTest() { suite.kv = etcdkv.NewEtcdKV(cli, config.MetaRootPath.GetValue()) // meta + nodeMgr := session.NewNodeManager() + nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 1, + })) + nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 2, + })) store := querycoord.NewCatalog(suite.kv) idAllocator := RandomIncrementIDAllocator() - suite.meta = meta.NewMeta(idAllocator, store, session.NewNodeManager()) + suite.meta = meta.NewMeta(idAllocator, store, nodeMgr) suite.broker = meta.NewMockBroker(suite.T()) suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta) suite.distMgr = meta.NewDistributionManager() suite.cluster = session.NewMockCluster(suite.T()) - suite.observer = NewTargetObserver(suite.meta, suite.targetMgr, suite.distMgr, suite.broker, suite.cluster) + suite.observer = NewTargetObserver(suite.meta, suite.targetMgr, suite.distMgr, suite.broker, suite.cluster, nodeMgr) suite.collectionID = int64(1000) suite.partitionID = int64(100) @@ -125,6 +133,7 @@ func (suite *TargetObserverSuite) SetupTest() { } suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, mock.Anything).Return(suite.nextTargetChannels, suite.nextTargetSegments, nil) + suite.broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything).Return(nil, nil).Maybe() suite.observer.Start() } @@ -171,6 +180,7 @@ func (suite *TargetObserverSuite) TestTriggerUpdateTarget() { suite.broker.EXPECT(). 
GetRecoveryInfoV2(mock.Anything, mock.Anything). Return(suite.nextTargetChannels, suite.nextTargetSegments, nil) + suite.Eventually(func() bool { return len(suite.targetMgr.GetSealedSegmentsByCollection(suite.collectionID, meta.NextTarget)) == 3 && len(suite.targetMgr.GetDmChannelsByCollection(suite.collectionID, meta.NextTarget)) == 2 @@ -201,6 +211,10 @@ func (suite *TargetObserverSuite) TestTriggerUpdateTarget() { }, ) + suite.broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything).Return(nil, nil).Maybe() + suite.broker.EXPECT().ListIndexes(mock.Anything, mock.Anything).Return(nil, nil).Maybe() + suite.cluster.EXPECT().SyncDistribution(mock.Anything, mock.Anything, mock.Anything).Return(merr.Success(), nil).Maybe() + // Able to update current if it's not empty suite.Eventually(func() bool { isReady := false @@ -272,7 +286,8 @@ func (suite *TargetObserverCheckSuite) SetupTest() { // meta store := querycoord.NewCatalog(suite.kv) idAllocator := RandomIncrementIDAllocator() - suite.meta = meta.NewMeta(idAllocator, store, session.NewNodeManager()) + nodeMgr := session.NewNodeManager() + suite.meta = meta.NewMeta(idAllocator, store, nodeMgr) suite.broker = meta.NewMockBroker(suite.T()) suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta) @@ -284,6 +299,7 @@ func (suite *TargetObserverCheckSuite) SetupTest() { suite.distMgr, suite.broker, suite.cluster, + nodeMgr, ) suite.collectionID = int64(1000) suite.partitionID = int64(100) diff --git a/internal/querycoordv2/ops_service_test.go b/internal/querycoordv2/ops_service_test.go index f670c6f6535c3..2eb4a1c34ae1f 100644 --- a/internal/querycoordv2/ops_service_test.go +++ b/internal/querycoordv2/ops_service_test.go @@ -109,6 +109,7 @@ func (suite *OpsServiceSuite) SetupTest() { suite.dist, suite.broker, suite.cluster, + suite.nodeMgr, ) suite.cluster = session.NewMockCluster(suite.T()) suite.jobScheduler = job.NewScheduler() diff --git a/internal/querycoordv2/server.go b/internal/querycoordv2/server.go index eff909f1a4879..cbd6527d02cb6 100644 --- a/internal/querycoordv2/server.go +++ b/internal/querycoordv2/server.go @@ -284,16 +284,6 @@ func (s *Server) initQueryCoord() error { s.proxyWatcher.DelSessionFunc(s.proxyClientManager.DelProxyClient) log.Info("init proxy manager done") - // Init heartbeat - log.Info("init dist controller") - s.distController = dist.NewDistController( - s.cluster, - s.nodeMgr, - s.dist, - s.targetMgr, - s.taskScheduler, - ) - // Init checker controller log.Info("init checker controller") s.getBalancerFunc = func() balance.Balance { @@ -339,6 +329,20 @@ func (s *Server) initQueryCoord() error { // Init observers s.initObserver() + // Init heartbeat + syncTargetVersionFn := func(collectionID int64) { + s.targetObserver.TriggerUpdateCurrentTarget(collectionID) + } + log.Info("init dist controller") + s.distController = dist.NewDistController( + s.cluster, + s.nodeMgr, + s.dist, + s.targetMgr, + s.taskScheduler, + syncTargetVersionFn, + ) + // Init load status cache meta.GlobalFailedLoadCache = meta.NewFailedLoadCache() @@ -407,6 +411,7 @@ func (s *Server) initObserver() { s.dist, s.broker, s.cluster, + s.nodeMgr, ) s.collectionObserver = observers.NewCollectionObserver( s.dist, diff --git a/internal/querycoordv2/server_test.go b/internal/querycoordv2/server_test.go index 570482982cbd4..70f5892e33e36 100644 --- a/internal/querycoordv2/server_test.go +++ b/internal/querycoordv2/server_test.go @@ -45,6 +45,7 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/session" 
"github.com/milvus-io/milvus/internal/querycoordv2/task" "github.com/milvus-io/milvus/internal/util/sessionutil" + "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/commonpbutil" "github.com/milvus-io/milvus/pkg/util/etcd" @@ -560,12 +561,17 @@ func (suite *ServerSuite) hackServer() { suite.server.cluster, suite.server.nodeMgr, ) + + syncTargetVersionFn := func(collectionID int64) { + suite.server.targetObserver.Check(context.Background(), collectionID, common.AllPartitionsID) + } suite.server.distController = dist.NewDistController( suite.server.cluster, suite.server.nodeMgr, suite.server.dist, suite.server.targetMgr, suite.server.taskScheduler, + syncTargetVersionFn, ) suite.server.checkerController = checkers.NewCheckerController( suite.server.meta, @@ -582,6 +588,7 @@ func (suite *ServerSuite) hackServer() { suite.server.dist, suite.broker, suite.server.cluster, + suite.server.nodeMgr, ) suite.server.collectionObserver = observers.NewCollectionObserver( suite.server.dist, diff --git a/internal/querycoordv2/services.go b/internal/querycoordv2/services.go index 045f04ead15b0..f762d0681137b 100644 --- a/internal/querycoordv2/services.go +++ b/internal/querycoordv2/services.go @@ -154,6 +154,7 @@ func (s *Server) ShowPartitions(ctx context.Context, req *querypb.ShowPartitions return partition.GetPartitionID() }) } + for _, partitionID := range partitions { percentage := s.meta.GetPartitionLoadPercentage(partitionID) if percentage < 0 { @@ -172,6 +173,7 @@ func (s *Server) ShowPartitions(ctx context.Context, req *querypb.ShowPartitions Status: merr.Status(err), }, nil } + percentages = append(percentages, int64(percentage)) } diff --git a/internal/querycoordv2/services_test.go b/internal/querycoordv2/services_test.go index 63c1896819c6e..d5be0f45323a9 100644 --- a/internal/querycoordv2/services_test.go +++ b/internal/querycoordv2/services_test.go @@ -157,6 +157,7 @@ func (suite *ServiceSuite) SetupTest() { suite.dist, suite.broker, suite.cluster, + suite.nodeMgr, ) suite.targetObserver.Start() for _, node := range suite.nodes { diff --git a/internal/querycoordv2/utils/util.go b/internal/querycoordv2/utils/util.go index bcb23138ba583..e37961c8bc0b4 100644 --- a/internal/querycoordv2/utils/util.go +++ b/internal/querycoordv2/utils/util.go @@ -45,7 +45,7 @@ func CheckNodeAvailable(nodeID int64, info *session.NodeInfo) error { // 2. All QueryNodes in the distribution are online // 3. The last heartbeat response time is within HeartbeatAvailableInterval for all QueryNodes(include leader) in the distribution // 4. All segments of the shard in target should be in the distribution -func CheckLeaderAvailable(nodeMgr *session.NodeManager, targetMgr meta.TargetManagerInterface, leader *meta.LeaderView) error { +func CheckDelegatorDataReady(nodeMgr *session.NodeManager, targetMgr meta.TargetManagerInterface, leader *meta.LeaderView, scope int32) error { log := log.Ctx(context.TODO()). WithRateGroup("utils.CheckLeaderAvailable", 1, 60). 
With(zap.Int64("leaderID", leader.ID)) @@ -68,7 +68,7 @@ func CheckLeaderAvailable(nodeMgr *session.NodeManager, targetMgr meta.TargetMan return err } } - segmentDist := targetMgr.GetSealedSegmentsByChannel(leader.CollectionID, leader.Channel, meta.CurrentTarget) + segmentDist := targetMgr.GetSealedSegmentsByChannel(leader.CollectionID, leader.Channel, scope) // Check whether segments are fully loaded for segmentID, info := range segmentDist { _, exist := leader.Segments[segmentID] diff --git a/internal/querycoordv2/utils/util_test.go b/internal/querycoordv2/utils/util_test.go index 9c414d352f1b1..f6f3d7fe285ab 100644 --- a/internal/querycoordv2/utils/util_test.go +++ b/internal/querycoordv2/utils/util_test.go @@ -52,9 +52,10 @@ func (suite *UtilTestSuite) setNodeAvailable(nodes ...int64) { func (suite *UtilTestSuite) TestCheckLeaderAvaliable() { leadview := &meta.LeaderView{ - ID: 1, - Channel: "test", - Segments: map[int64]*querypb.SegmentDist{2: {NodeID: 2}}, + ID: 1, + Channel: "test", + Segments: map[int64]*querypb.SegmentDist{2: {NodeID: 2}}, + TargetVersion: 1011, } mockTargetManager := meta.NewMockTargetManager(suite.T()) @@ -64,18 +65,20 @@ func (suite *UtilTestSuite) TestCheckLeaderAvaliable() { InsertChannel: "test", }, }).Maybe() + mockTargetManager.EXPECT().GetCollectionTargetVersion(mock.Anything, mock.Anything).Return(1011).Maybe() suite.setNodeAvailable(1, 2) - err := CheckLeaderAvailable(suite.nodeMgr, mockTargetManager, leadview) + err := CheckDelegatorDataReady(suite.nodeMgr, mockTargetManager, leadview, meta.CurrentTarget) suite.NoError(err) } func (suite *UtilTestSuite) TestCheckLeaderAvaliableFailed() { suite.Run("leader not available", func() { leadview := &meta.LeaderView{ - ID: 1, - Channel: "test", - Segments: map[int64]*querypb.SegmentDist{2: {NodeID: 2}}, + ID: 1, + Channel: "test", + Segments: map[int64]*querypb.SegmentDist{2: {NodeID: 2}}, + TargetVersion: 1011, } mockTargetManager := meta.NewMockTargetManager(suite.T()) mockTargetManager.EXPECT().GetSealedSegmentsByChannel(mock.Anything, mock.Anything, mock.Anything).Return(map[int64]*datapb.SegmentInfo{ @@ -84,18 +87,19 @@ func (suite *UtilTestSuite) TestCheckLeaderAvaliableFailed() { InsertChannel: "test", }, }).Maybe() + mockTargetManager.EXPECT().GetCollectionTargetVersion(mock.Anything, mock.Anything).Return(1011).Maybe() // leader nodeID=1 not available suite.setNodeAvailable(2) - err := CheckLeaderAvailable(suite.nodeMgr, mockTargetManager, leadview) + err := CheckDelegatorDataReady(suite.nodeMgr, mockTargetManager, leadview, meta.CurrentTarget) suite.Error(err) - suite.nodeMgr = session.NewNodeManager() }) suite.Run("shard worker not available", func() { leadview := &meta.LeaderView{ - ID: 1, - Channel: "test", - Segments: map[int64]*querypb.SegmentDist{2: {NodeID: 2}}, + ID: 11111, + Channel: "test", + Segments: map[int64]*querypb.SegmentDist{2: {NodeID: 2}}, + TargetVersion: 1011, } mockTargetManager := meta.NewMockTargetManager(suite.T()) @@ -105,14 +109,35 @@ func (suite *UtilTestSuite) TestCheckLeaderAvaliableFailed() { InsertChannel: "test", }, }).Maybe() + mockTargetManager.EXPECT().GetCollectionTargetVersion(mock.Anything, mock.Anything).Return(1011).Maybe() // leader nodeID=2 not available suite.setNodeAvailable(1) - err := CheckLeaderAvailable(suite.nodeMgr, mockTargetManager, leadview) + err := CheckDelegatorDataReady(suite.nodeMgr, mockTargetManager, leadview, meta.CurrentTarget) suite.Error(err) - suite.nodeMgr = session.NewNodeManager() }) suite.Run("segment lacks", func() { + leadview 
:= &meta.LeaderView{ + ID: 1, + Channel: "test", + Segments: map[int64]*querypb.SegmentDist{2: {NodeID: 2}}, + TargetVersion: 1011, + } + mockTargetManager := meta.NewMockTargetManager(suite.T()) + mockTargetManager.EXPECT().GetSealedSegmentsByChannel(mock.Anything, mock.Anything, mock.Anything).Return(map[int64]*datapb.SegmentInfo{ + // target segmentID=1 not in leadView + 1: { + ID: 1, + InsertChannel: "test", + }, + }).Maybe() + mockTargetManager.EXPECT().GetCollectionTargetVersion(mock.Anything, mock.Anything).Return(1011).Maybe() + suite.setNodeAvailable(1, 2) + err := CheckDelegatorDataReady(suite.nodeMgr, mockTargetManager, leadview, meta.CurrentTarget) + suite.Error(err) + }) + + suite.Run("target version not synced", func() { leadview := &meta.LeaderView{ ID: 1, Channel: "test", @@ -126,10 +151,10 @@ func (suite *UtilTestSuite) TestCheckLeaderAvaliableFailed() { InsertChannel: "test", }, }).Maybe() + mockTargetManager.EXPECT().GetCollectionTargetVersion(mock.Anything, mock.Anything).Return(1011).Maybe() suite.setNodeAvailable(1, 2) - err := CheckLeaderAvailable(suite.nodeMgr, mockTargetManager, leadview) + err := CheckDelegatorDataReady(suite.nodeMgr, mockTargetManager, leadview, meta.CurrentTarget) suite.Error(err) - suite.nodeMgr = session.NewNodeManager() }) } diff --git a/tests/integration/replicas/load/load_test.go b/tests/integration/replicas/load/load_test.go index d91fa3df19104..ecbb918d84b92 100644 --- a/tests/integration/replicas/load/load_test.go +++ b/tests/integration/replicas/load/load_test.go @@ -760,7 +760,14 @@ func (s *LoadTestSuite) TestDynamicUpdateLoadConfigs_WithRGLackOfNode() { s.Len(resp.GetResourceGroups(), rgNum+2) // test load collection with dynamic update - s.loadCollection(collectionName, dbName, 3, rgs[:3]) + loadStatus, err := s.Cluster.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{ + DbName: dbName, + CollectionName: collectionName, + ReplicaNumber: int32(3), + ResourceGroups: rgs[:3], + }) + s.NoError(err) + s.True(merr.Ok(loadStatus)) s.Eventually(func() bool { resp3, err := s.Cluster.Proxy.GetReplicas(ctx, &milvuspb.GetReplicasRequest{ DbName: dbName, @@ -771,7 +778,14 @@ func (s *LoadTestSuite) TestDynamicUpdateLoadConfigs_WithRGLackOfNode() { return len(resp3.GetReplicas()) == 3 }, 30*time.Second, 1*time.Second) - s.loadCollection(collectionName, dbName, 2, rgs[3:]) + loadStatus, err = s.Cluster.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{ + DbName: dbName, + CollectionName: collectionName, + ReplicaNumber: int32(2), + ResourceGroups: rgs[3:], + }) + s.NoError(err) + s.True(merr.Ok(loadStatus)) s.Eventually(func() bool { resp3, err := s.Cluster.Proxy.GetReplicas(ctx, &milvuspb.GetReplicasRequest{ DbName: dbName, @@ -783,7 +797,14 @@ func (s *LoadTestSuite) TestDynamicUpdateLoadConfigs_WithRGLackOfNode() { }, 30*time.Second, 1*time.Second) // test load collection with dynamic update - s.loadCollection(collectionName, dbName, 5, rgs) + loadStatus, err = s.Cluster.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{ + DbName: dbName, + CollectionName: collectionName, + ReplicaNumber: int32(5), + ResourceGroups: rgs, + }) + s.NoError(err) + s.True(merr.Ok(loadStatus)) s.Eventually(func() bool { resp3, err := s.Cluster.Proxy.GetReplicas(ctx, &milvuspb.GetReplicasRequest{ DbName: dbName,
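
For readers skimming the patch, the following is a minimal, illustrative Go sketch of the serviceability gate the dist handler now applies: a delegator only serves search/query once its distribution is ready and its target version has been synced; otherwise a target-version sync is triggered and the delegator stays unserviceable. It is not code from this patch; the type and function names are simplified stand-ins for the real querycoordv2 types.

// Illustrative sketch only -- simplified stand-ins, not code from this patch.
package main

import "fmt"

type LeaderView struct {
	CollectionID  int64
	Channel       string
	TargetVersion int64
}

// serviceable mirrors the gating rule: a delegator (shard leader) is only
// serviceable once its data distribution is ready AND its target version has
// been synced (i.e. is greater than zero). If data is ready but the version
// is unsynced, a sync is requested and the delegator stays unserviceable.
func serviceable(view LeaderView, dataReady bool, syncTargetVersion func(collectionID int64)) error {
	if !dataReady {
		return fmt.Errorf("channel %s: distribution not ready", view.Channel)
	}
	if view.TargetVersion <= 0 {
		// Segments and channel are loaded, but searches would still be
		// filtered with an unsynced target version, so request a sync.
		syncTargetVersion(view.CollectionID)
		return fmt.Errorf("channel %s: target version not synced", view.Channel)
	}
	return nil
}

func main() {
	view := LeaderView{CollectionID: 100, Channel: "by-dev-dmc0", TargetVersion: 0}
	err := serviceable(view, true, func(collectionID int64) {
		fmt.Println("trigger target-version sync for collection", collectionID)
	})
	fmt.Println("serviceable:", err == nil, "detail:", err)
}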