Skip to content

Commit

Permalink
fix: Search may return less result after qn recover (milvus-io#36549)
Browse files Browse the repository at this point in the history
issue: milvus-io#36293 milvus-io#36242
after qn recover, delegator may be loaded in new node, after all segment
has been loaded, delegator becomes serviceable. but delegator's target
version hasn't been synced, and if search/query comes, delegator will
use wrong target version to filter out a empty segment list, which
caused empty search result.

This pr will block delegator's serviceable status until target version
is synced

---------

Signed-off-by: Wei Liu <[email protected]>
  • Loading branch information
weiliu1031 committed Nov 12, 2024
1 parent 25c9699 commit f8606d8
Show file tree
Hide file tree
Showing 18 changed files with 268 additions and 128 deletions.
31 changes: 17 additions & 14 deletions internal/querycoordv2/dist/dist_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,14 @@ type Controller interface {
}

type ControllerImpl struct {
mu sync.RWMutex
handlers map[int64]*distHandler
client session.Cluster
nodeManager *session.NodeManager
dist *meta.DistributionManager
targetMgr *meta.TargetManager
scheduler task.Scheduler
mu sync.RWMutex
handlers map[int64]*distHandler
client session.Cluster
nodeManager *session.NodeManager
dist *meta.DistributionManager
targetMgr meta.TargetManagerInterface
scheduler task.Scheduler
syncTargetVersionFn TriggerUpdateTargetVersion
}

func (dc *ControllerImpl) StartDistInstance(ctx context.Context, nodeID int64) {
Expand All @@ -52,7 +53,7 @@ func (dc *ControllerImpl) StartDistInstance(ctx context.Context, nodeID int64) {
log.Info("node has started", zap.Int64("nodeID", nodeID))
return
}
h := newDistHandler(ctx, nodeID, dc.client, dc.nodeManager, dc.scheduler, dc.dist, dc.targetMgr)
h := newDistHandler(ctx, nodeID, dc.client, dc.nodeManager, dc.scheduler, dc.dist, dc.targetMgr, dc.syncTargetVersionFn)
dc.handlers[nodeID] = h
}

Expand Down Expand Up @@ -100,13 +101,15 @@ func NewDistController(
dist *meta.DistributionManager,
targetMgr *meta.TargetManager,
scheduler task.Scheduler,
syncTargetVersionFn TriggerUpdateTargetVersion,
) *ControllerImpl {
return &ControllerImpl{
handlers: make(map[int64]*distHandler),
client: client,
nodeManager: nodeManager,
dist: dist,
targetMgr: targetMgr,
scheduler: scheduler,
handlers: make(map[int64]*distHandler),
client: client,
nodeManager: nodeManager,
dist: dist,
targetMgr: targetMgr,
scheduler: scheduler,
syncTargetVersionFn: syncTargetVersionFn,
}
}
3 changes: 2 additions & 1 deletion internal/querycoordv2/dist/dist_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ func (suite *DistControllerTestSuite) SetupTest() {
targetManager := meta.NewTargetManager(suite.broker, suite.meta)
suite.mockScheduler = task.NewMockScheduler(suite.T())
suite.mockScheduler.EXPECT().GetExecutedFlag(mock.Anything).Return(nil).Maybe()
suite.controller = NewDistController(suite.mockCluster, suite.nodeMgr, distManager, targetManager, suite.mockScheduler)
syncTargetVersionFn := func(collectionID int64) {}
suite.controller = NewDistController(suite.mockCluster, suite.nodeMgr, distManager, targetManager, suite.mockScheduler, syncTargetVersionFn)
}

func (suite *DistControllerTestSuite) TearDownSuite() {
Expand Down
50 changes: 40 additions & 10 deletions internal/querycoordv2/dist/dist_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package dist

import (
"context"
"fmt"
"sync"
"time"

Expand All @@ -39,6 +40,8 @@ import (
"github.com/milvus-io/milvus/pkg/util/paramtable"
)

type TriggerUpdateTargetVersion = func(collectionID int64)

type distHandler struct {
nodeID int64
c chan struct{}
Expand All @@ -51,6 +54,8 @@ type distHandler struct {
mu sync.Mutex
stopOnce sync.Once
lastUpdateTs int64

syncTargetVersionFn TriggerUpdateTargetVersion
}

func (dh *distHandler) start(ctx context.Context) {
Expand Down Expand Up @@ -221,12 +226,35 @@ func (dh *distHandler) updateLeaderView(resp *querypb.GetDataDistributionRespons
NumOfGrowingRows: lview.GetNumOfGrowingRows(),
PartitionStatsVersions: lview.PartitionStatsVersions,
}
updates = append(updates, view)

// check leader serviceable
// todo by weiliu1031: serviceable status should be maintained by delegator, to avoid heavy check here
if err := utils.CheckLeaderAvailable(dh.nodeManager, dh.target, view); err != nil {
if err := utils.CheckDelegatorDataReady(dh.nodeManager, dh.target, view, meta.CurrentTarget); err != nil {
view.UnServiceableError = err
log.Info("leader is not available due to distribution not ready",
zap.Int64("collectionID", view.CollectionID),
zap.Int64("nodeID", view.ID),
zap.String("channel", view.Channel),
zap.Error(err))
continue
}

// if target version hasn't been synced, delegator will get empty readable segment list
// so shard leader should be unserviceable until target version is synced
currentTargetVersion := dh.target.GetCollectionTargetVersion(lview.GetCollection(), meta.CurrentTarget)
if lview.TargetVersion <= 0 {
err := merr.WrapErrServiceInternal(fmt.Sprintf("target version mismatch, collection: %d, channel: %s, current target version: %v, leader version: %v",
lview.GetCollection(), lview.GetChannel(), currentTargetVersion, lview.TargetVersion))

// segment and channel already loaded, trigger target observer to check target version
dh.syncTargetVersionFn(lview.GetCollection())
view.UnServiceableError = err
log.Info("leader is not available due to target version not ready",
zap.Int64("collectionID", view.CollectionID),
zap.Int64("nodeID", view.ID),
zap.String("channel", view.Channel),
zap.Error(err))
}
updates = append(updates, view)
}

dh.dist.LeaderViewManager.Update(resp.GetNodeID(), updates...)
Expand Down Expand Up @@ -272,15 +300,17 @@ func newDistHandler(
scheduler task.Scheduler,
dist *meta.DistributionManager,
targetMgr meta.TargetManagerInterface,
syncTargetVersionFn TriggerUpdateTargetVersion,
) *distHandler {
h := &distHandler{
nodeID: nodeID,
c: make(chan struct{}),
client: client,
nodeManager: nodeManager,
scheduler: scheduler,
dist: dist,
target: targetMgr,
nodeID: nodeID,
c: make(chan struct{}),
client: client,
nodeManager: nodeManager,
scheduler: scheduler,
dist: dist,
target: targetMgr,
syncTargetVersionFn: syncTargetVersionFn,
}
h.wg.Add(1)
go h.start(ctx)
Expand Down
12 changes: 8 additions & 4 deletions internal/querycoordv2/dist/dist_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func (suite *DistHandlerSuite) SetupSuite() {
suite.scheduler.EXPECT().GetExecutedFlag(mock.Anything).Return(nil).Maybe()
suite.target.EXPECT().GetSealedSegment(mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe()
suite.target.EXPECT().GetDmChannel(mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe()
suite.target.EXPECT().GetCollectionTargetVersion(mock.Anything, mock.Anything).Return(1011).Maybe()
}

func (suite *DistHandlerSuite) TestBasic() {
Expand Down Expand Up @@ -97,14 +98,16 @@ func (suite *DistHandlerSuite) TestBasic() {

LeaderViews: []*querypb.LeaderView{
{
Collection: 1,
Channel: "test-channel-1",
Collection: 1,
Channel: "test-channel-1",
TargetVersion: 1011,
},
},
LastModifyTs: 1,
}, nil)

suite.handler = newDistHandler(suite.ctx, suite.nodeID, suite.client, suite.nodeManager, suite.scheduler, suite.dist, suite.target)
syncTargetVersionFn := func(collectionID int64) {}
suite.handler = newDistHandler(suite.ctx, suite.nodeID, suite.client, suite.nodeManager, suite.scheduler, suite.dist, suite.target, syncTargetVersionFn)
defer suite.handler.stop()

time.Sleep(10 * time.Second)
Expand All @@ -119,7 +122,8 @@ func (suite *DistHandlerSuite) TestGetDistributionFailed() {
}))
suite.client.EXPECT().GetDataDistribution(mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("fake error"))

suite.handler = newDistHandler(suite.ctx, suite.nodeID, suite.client, suite.nodeManager, suite.scheduler, suite.dist, suite.target)
syncTargetVersionFn := func(collectionID int64) {}
suite.handler = newDistHandler(suite.ctx, suite.nodeID, suite.client, suite.nodeManager, suite.scheduler, suite.dist, suite.target, syncTargetVersionFn)
defer suite.handler.stop()

time.Sleep(10 * time.Second)
Expand Down
1 change: 1 addition & 0 deletions internal/querycoordv2/job/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ func (suite *JobSuite) SetupTest() {
suite.dist,
suite.broker,
suite.cluster,
suite.nodeMgr,
)
suite.targetObserver.Start()
suite.scheduler = NewScheduler()
Expand Down
1 change: 1 addition & 0 deletions internal/querycoordv2/meta/leader_view_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ func (view *LeaderView) Clone() *LeaderView {
TargetVersion: view.TargetVersion,
NumOfGrowingRows: view.NumOfGrowingRows,
PartitionStatsVersions: view.PartitionStatsVersions,
UnServiceableError: view.UnServiceableError,
}
}

Expand Down
5 changes: 2 additions & 3 deletions internal/querycoordv2/meta/target_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,9 +195,7 @@ func (mgr *TargetManager) UpdateCollectionNextTarget(collectionID int64) error {
mgr.next.updateCollectionTarget(collectionID, NewCollectionTarget(segments, dmChannels, partitionIDs))
log.Debug("finish to update next targets for collection",
zap.Int64("collectionID", collectionID),
zap.Int64s("PartitionIDs", partitionIDs),
zap.Int64s("segments", allocatedTarget.GetAllSegmentIDs()),
zap.Strings("channels", allocatedTarget.GetAllDmChannelNames()))
zap.Int64s("PartitionIDs", partitionIDs))

return nil
}
Expand Down Expand Up @@ -604,6 +602,7 @@ func (mgr *TargetManager) Recover(catalog metastore.QueryCoordCatalog) error {
zap.Int64("collectionID", t.GetCollectionID()),
zap.Strings("channels", newTarget.GetAllDmChannelNames()),
zap.Int("segmentNum", len(newTarget.GetAllSegmentIDs())),
zap.Int64("version", newTarget.GetTargetVersion()),
)

// clear target info in meta store
Expand Down
35 changes: 31 additions & 4 deletions internal/querycoordv2/observers/collection_observer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ type CollectionObserverSuite struct {
targetObserver *TargetObserver
checkerController *checkers.CheckerController

nodeMgr *session.NodeManager

// Test object
ob *CollectionObserver
}
Expand Down Expand Up @@ -191,8 +193,8 @@ func (suite *CollectionObserverSuite) SetupTest() {

// Dependencies
suite.dist = meta.NewDistributionManager()
nodeMgr := session.NewNodeManager()
suite.meta = meta.NewMeta(suite.idAllocator, suite.store, nodeMgr)
suite.nodeMgr = session.NewNodeManager()
suite.meta = meta.NewMeta(suite.idAllocator, suite.store, suite.nodeMgr)
suite.broker = meta.NewMockBroker(suite.T())
suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta)
suite.cluster = session.NewMockCluster(suite.T())
Expand All @@ -201,6 +203,7 @@ func (suite *CollectionObserverSuite) SetupTest() {
suite.dist,
suite.broker,
suite.cluster,
suite.nodeMgr,
)
suite.checkerController = &checkers.CheckerController{}

Expand All @@ -223,6 +226,16 @@ func (suite *CollectionObserverSuite) SetupTest() {
suite.targetObserver.Start()
suite.ob.Start()
suite.loadAll()

suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 1,
}))
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 2,
}))
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 3,
}))
}

func (suite *CollectionObserverSuite) TearDownTest() {
Expand All @@ -248,12 +261,19 @@ func (suite *CollectionObserverSuite) TestObserve() {
Channel: "100-dmc0",
Segments: map[int64]*querypb.SegmentDist{1: {NodeID: 1, Version: 0}},
})
view := &meta.LeaderView{
ID: 2,
CollectionID: 103,
Channel: "103-dmc0",
Segments: make(map[int64]*querypb.SegmentDist),
}
suite.dist.LeaderViewManager.Update(2, &meta.LeaderView{
ID: 2,
CollectionID: 100,
Channel: "100-dmc1",
Segments: map[int64]*querypb.SegmentDist{2: {NodeID: 2, Version: 0}},
})
}, view)

view1 := &meta.LeaderView{
ID: 3,
CollectionID: 102,
Expand All @@ -265,17 +285,24 @@ func (suite *CollectionObserverSuite) TestObserve() {
suite.True(ok)
view2 := &meta.LeaderView{
ID: 3,
CollectionID: 13,
CollectionID: 103,
Channel: "103-dmc0",
Segments: make(map[int64]*querypb.SegmentDist),
}
for _, segment := range segmentsInfo {
view2.Segments[segment.GetID()] = &querypb.SegmentDist{
NodeID: 3, Version: 0,
}
view.Segments[segment.GetID()] = &querypb.SegmentDist{
NodeID: 2, Version: 0,
}
}
suite.dist.LeaderViewManager.Update(3, view1, view2)

suite.broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything).Return(nil, nil).Maybe()
suite.broker.EXPECT().ListIndexes(mock.Anything, mock.Anything).Return(nil, nil).Maybe()
suite.cluster.EXPECT().SyncDistribution(mock.Anything, mock.Anything, mock.Anything).Return(merr.Success(), nil).Maybe()

suite.Eventually(func() bool {
return suite.isCollectionLoadedContinue(suite.collections[2], time)
}, timeout-1, timeout/10)
Expand Down
Loading

0 comments on commit f8606d8

Please sign in to comment.