fix: Search may return fewer results after qn recover #36549

Merged
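After a querynode recovers, its shard leader can come back with a leader view whose target version has not yet been synced. Until the target observer pushes the current target version, the delegator sees an empty readable-segment list, so searches routed to that leader may silently return fewer results. This PR marks such a leader unserviceable (and triggers the target observer to sync) until the reported target version is valid, and makes LeaderView.Clone() carry the UnServiceableError field along.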
31 changes: 17 additions & 14 deletions internal/querycoordv2/dist/dist_controller.go
@@ -36,13 +36,14 @@ type Controller interface {
}

type ControllerImpl struct {
mu sync.RWMutex
handlers map[int64]*distHandler
client session.Cluster
nodeManager *session.NodeManager
dist *meta.DistributionManager
targetMgr meta.TargetManagerInterface
scheduler task.Scheduler
mu sync.RWMutex
handlers map[int64]*distHandler
client session.Cluster
nodeManager *session.NodeManager
dist *meta.DistributionManager
targetMgr meta.TargetManagerInterface
scheduler task.Scheduler
syncTargetVersionFn TriggerUpdateTargetVersion
}

func (dc *ControllerImpl) StartDistInstance(ctx context.Context, nodeID int64) {
@@ -52,7 +53,7 @@ func (dc *ControllerImpl) StartDistInstance(ctx context.Context, nodeID int64) {
log.Info("node has started", zap.Int64("nodeID", nodeID))
return
}
h := newDistHandler(ctx, nodeID, dc.client, dc.nodeManager, dc.scheduler, dc.dist, dc.targetMgr)
h := newDistHandler(ctx, nodeID, dc.client, dc.nodeManager, dc.scheduler, dc.dist, dc.targetMgr, dc.syncTargetVersionFn)
dc.handlers[nodeID] = h
}

@@ -100,13 +101,15 @@ func NewDistController(
dist *meta.DistributionManager,
targetMgr meta.TargetManagerInterface,
scheduler task.Scheduler,
syncTargetVersionFn TriggerUpdateTargetVersion,
) *ControllerImpl {
return &ControllerImpl{
handlers: make(map[int64]*distHandler),
client: client,
nodeManager: nodeManager,
dist: dist,
targetMgr: targetMgr,
scheduler: scheduler,
handlers: make(map[int64]*distHandler),
client: client,
nodeManager: nodeManager,
dist: dist,
targetMgr: targetMgr,
scheduler: scheduler,
syncTargetVersionFn: syncTargetVersionFn,
}
}
3 changes: 2 additions & 1 deletion internal/querycoordv2/dist/dist_controller_test.go
@@ -81,7 +81,8 @@ func (suite *DistControllerTestSuite) SetupTest() {
targetManager := meta.NewTargetManager(suite.broker, suite.meta)
suite.mockScheduler = task.NewMockScheduler(suite.T())
suite.mockScheduler.EXPECT().GetExecutedFlag(mock.Anything).Return(nil).Maybe()
suite.controller = NewDistController(suite.mockCluster, suite.nodeMgr, distManager, targetManager, suite.mockScheduler)
syncTargetVersionFn := func(collectionID int64) {}
suite.controller = NewDistController(suite.mockCluster, suite.nodeMgr, distManager, targetManager, suite.mockScheduler, syncTargetVersionFn)
}

func (suite *DistControllerTestSuite) TearDownSuite() {
50 changes: 40 additions & 10 deletions internal/querycoordv2/dist/dist_handler.go
@@ -18,6 +18,7 @@ package dist

import (
"context"
"fmt"
"sync"
"time"

@@ -39,6 +40,8 @@ import (
"github.com/milvus-io/milvus/pkg/util/paramtable"
)

type TriggerUpdateTargetVersion = func(collectionID int64)

type distHandler struct {
nodeID int64
c chan struct{}
@@ -51,6 +54,8 @@ type distHandler struct {
mu sync.Mutex
stopOnce sync.Once
lastUpdateTs int64

syncTargetVersionFn TriggerUpdateTargetVersion
}

func (dh *distHandler) start(ctx context.Context) {
@@ -222,12 +227,35 @@ func (dh *distHandler) updateLeaderView(resp *querypb.GetDataDistributionRespons
NumOfGrowingRows: lview.GetNumOfGrowingRows(),
PartitionStatsVersions: lview.PartitionStatsVersions,
}
updates = append(updates, view)

// check leader serviceable
// todo by weiliu1031: serviceable status should be maintained by delegator, to avoid heavy check here
if err := utils.CheckLeaderAvailable(dh.nodeManager, dh.target, view); err != nil {
if err := utils.CheckDelegatorDataReady(dh.nodeManager, dh.target, view, meta.CurrentTarget); err != nil {
view.UnServiceableError = err
log.Info("leader is not available due to distribution not ready",
zap.Int64("collectionID", view.CollectionID),
zap.Int64("nodeID", view.ID),
zap.String("channel", view.Channel),
zap.Error(err))
continue
}

// if target version hasn't been synced, delegator will get empty readable segment list
// so shard leader should be unserviceable until target version is synced
currentTargetVersion := dh.target.GetCollectionTargetVersion(lview.GetCollection(), meta.CurrentTarget)
if lview.TargetVersion <= 0 {
err := merr.WrapErrServiceInternal(fmt.Sprintf("target version mismatch, collection: %d, channel: %s, current target version: %v, leader version: %v",
lview.GetCollection(), lview.GetChannel(), currentTargetVersion, lview.TargetVersion))

// segment and channel already loaded, trigger target observer to check target version
dh.syncTargetVersionFn(lview.GetCollection())
view.UnServiceableError = err
log.Info("leader is not available due to target version not ready",
zap.Int64("collectionID", view.CollectionID),
zap.Int64("nodeID", view.ID),
zap.String("channel", view.Channel),
zap.Error(err))
}
updates = append(updates, view)
}

dh.dist.LeaderViewManager.Update(resp.GetNodeID(), updates...)
@@ -273,15 +301,17 @@ func newDistHandler(
scheduler task.Scheduler,
dist *meta.DistributionManager,
targetMgr meta.TargetManagerInterface,
syncTargetVersionFn TriggerUpdateTargetVersion,
) *distHandler {
h := &distHandler{
nodeID: nodeID,
c: make(chan struct{}),
client: client,
nodeManager: nodeManager,
scheduler: scheduler,
dist: dist,
target: targetMgr,
nodeID: nodeID,
c: make(chan struct{}),
client: client,
nodeManager: nodeManager,
scheduler: scheduler,
dist: dist,
target: targetMgr,
syncTargetVersionFn: syncTargetVersionFn,
}
h.wg.Add(1)
go h.start(ctx)
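The heart of the fix is the new gate in updateLeaderView above. Below is a minimal, runnable sketch of that logic, using simplified stand-in types rather than the real querypb/meta APIs (type and function names here are illustrative only):

```go
package main

import "fmt"

// Simplified stand-in for meta.LeaderView, plus the callback type the PR adds.
type LeaderView struct {
	CollectionID       int64
	Channel            string
	TargetVersion      int64
	UnServiceableError error
}

type TriggerUpdateTargetVersion = func(collectionID int64)

// gateOnTargetVersion mirrors the new check: a leader that has not yet
// received a target version would expose an empty readable-segment list,
// so it must stay unserviceable until the target observer syncs it.
func gateOnTargetVersion(view *LeaderView, currentTargetVersion int64, syncFn TriggerUpdateTargetVersion) {
	if view.TargetVersion <= 0 {
		view.UnServiceableError = fmt.Errorf(
			"target version mismatch, collection: %d, channel: %s, current target version: %d, leader version: %d",
			view.CollectionID, view.Channel, currentTargetVersion, view.TargetVersion)
		// Segments and channels are already loaded; nudge the target
		// observer to push the current target version to the delegator.
		syncFn(view.CollectionID)
	}
}

func main() {
	view := &LeaderView{CollectionID: 100, Channel: "100-dmc0", TargetVersion: 0}
	gateOnTargetVersion(view, 1011, func(collectionID int64) {
		fmt.Println("sync target version for collection", collectionID)
	})
	fmt.Println("serviceable:", view.UnServiceableError == nil) // false until synced
}
```

Note that the view is still appended to `updates` either way; it is published with a non-nil UnServiceableError rather than dropped, so the unserviceable state is visible to the rest of the coordinator.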
15 changes: 10 additions & 5 deletions internal/querycoordv2/dist/dist_handler_test.go
@@ -68,6 +68,7 @@ func (suite *DistHandlerSuite) SetupSuite() {
suite.scheduler.EXPECT().GetExecutedFlag(mock.Anything).Return(suite.executedFlagChan).Maybe()
suite.target.EXPECT().GetSealedSegment(mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe()
suite.target.EXPECT().GetDmChannel(mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe()
suite.target.EXPECT().GetCollectionTargetVersion(mock.Anything, mock.Anything).Return(1011).Maybe()
}

func (suite *DistHandlerSuite) TestBasic() {
@@ -105,14 +106,16 @@

LeaderViews: []*querypb.LeaderView{
{
Collection: 1,
Channel: "test-channel-1",
Collection: 1,
Channel: "test-channel-1",
TargetVersion: 1011,
},
},
LastModifyTs: 1,
}, nil)

suite.handler = newDistHandler(suite.ctx, suite.nodeID, suite.client, suite.nodeManager, suite.scheduler, suite.dist, suite.target)
syncTargetVersionFn := func(collectionID int64) {}
suite.handler = newDistHandler(suite.ctx, suite.nodeID, suite.client, suite.nodeManager, suite.scheduler, suite.dist, suite.target, syncTargetVersionFn)
defer suite.handler.stop()

time.Sleep(3 * time.Second)
@@ -132,7 +135,8 @@ func (suite *DistHandlerSuite) TestGetDistributionFailed() {
}))
suite.client.EXPECT().GetDataDistribution(mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("fake error"))

suite.handler = newDistHandler(suite.ctx, suite.nodeID, suite.client, suite.nodeManager, suite.scheduler, suite.dist, suite.target)
syncTargetVersionFn := func(collectionID int64) {}
suite.handler = newDistHandler(suite.ctx, suite.nodeID, suite.client, suite.nodeManager, suite.scheduler, suite.dist, suite.target, syncTargetVersionFn)
defer suite.handler.stop()

time.Sleep(3 * time.Second)
@@ -180,7 +184,8 @@ func (suite *DistHandlerSuite) TestForcePullDist() {
LastModifyTs: 1,
}, nil)
suite.executedFlagChan <- struct{}{}
suite.handler = newDistHandler(suite.ctx, suite.nodeID, suite.client, suite.nodeManager, suite.scheduler, suite.dist, suite.target)
syncTargetVersionFn := func(collectionID int64) {}
suite.handler = newDistHandler(suite.ctx, suite.nodeID, suite.client, suite.nodeManager, suite.scheduler, suite.dist, suite.target, syncTargetVersionFn)
defer suite.handler.stop()

time.Sleep(300 * time.Millisecond)
1 change: 1 addition & 0 deletions internal/querycoordv2/job/job_test.go
@@ -171,6 +171,7 @@ func (suite *JobSuite) SetupTest() {
suite.dist,
suite.broker,
suite.cluster,
suite.nodeMgr,
)
suite.targetObserver.Start()
suite.scheduler = NewScheduler()
1 change: 1 addition & 0 deletions internal/querycoordv2/meta/leader_view_manager.go
@@ -150,6 +150,7 @@ func (view *LeaderView) Clone() *LeaderView {
TargetVersion: view.TargetVersion,
NumOfGrowingRows: view.NumOfGrowingRows,
PartitionStatsVersions: view.PartitionStatsVersions,
UnServiceableError: view.UnServiceableError,
}
}

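The one-line change to Clone() above is easy to overlook but load-bearing: a clone that drops UnServiceableError would turn the copy of an unserviceable leader view back into an apparently healthy one. A trimmed illustration of the hazard (simplified struct, not the real meta package):

```go
package meta // illustrative; field set is reduced from the real LeaderView

type LeaderView struct {
	ID                 int64
	TargetVersion      int64
	UnServiceableError error
}

// Clone must copy every field; omitting UnServiceableError here would
// make Clone() of an unserviceable view report nil, i.e. serviceable.
func (view *LeaderView) Clone() *LeaderView {
	return &LeaderView{
		ID:                 view.ID,
		TargetVersion:      view.TargetVersion,
		UnServiceableError: view.UnServiceableError,
	}
}
```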
5 changes: 2 additions & 3 deletions internal/querycoordv2/meta/target_manager.go
@@ -197,9 +197,7 @@ func (mgr *TargetManager) UpdateCollectionNextTarget(collectionID int64) error {
mgr.next.updateCollectionTarget(collectionID, allocatedTarget)
log.Debug("finish to update next targets for collection",
zap.Int64("collectionID", collectionID),
zap.Int64s("PartitionIDs", partitionIDs),
zap.Int64s("segments", allocatedTarget.GetAllSegmentIDs()),
zap.Strings("channels", allocatedTarget.GetAllDmChannelNames()))
zap.Int64s("PartitionIDs", partitionIDs))

return nil
}
@@ -606,6 +604,7 @@ func (mgr *TargetManager) Recover(catalog metastore.QueryCoordCatalog) error {
zap.Int64("collectionID", t.GetCollectionID()),
zap.Strings("channels", newTarget.GetAllDmChannelNames()),
zap.Int("segmentNum", len(newTarget.GetAllSegmentIDs())),
zap.Int64("version", newTarget.GetTargetVersion()),
)

// clear target info in meta store
35 changes: 31 additions & 4 deletions internal/querycoordv2/observers/collection_observer_test.go
@@ -71,6 +71,8 @@ type CollectionObserverSuite struct {
targetObserver *TargetObserver
checkerController *checkers.CheckerController

nodeMgr *session.NodeManager

// Test object
ob *CollectionObserver
}
@@ -191,8 +193,8 @@ func (suite *CollectionObserverSuite) SetupTest() {

// Dependencies
suite.dist = meta.NewDistributionManager()
nodeMgr := session.NewNodeManager()
suite.meta = meta.NewMeta(suite.idAllocator, suite.store, nodeMgr)
suite.nodeMgr = session.NewNodeManager()
suite.meta = meta.NewMeta(suite.idAllocator, suite.store, suite.nodeMgr)
suite.broker = meta.NewMockBroker(suite.T())
suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta)
suite.cluster = session.NewMockCluster(suite.T())
@@ -201,6 +203,7 @@
suite.dist,
suite.broker,
suite.cluster,
suite.nodeMgr,
)
suite.checkerController = &checkers.CheckerController{}

@@ -223,6 +226,16 @@
suite.targetObserver.Start()
suite.ob.Start()
suite.loadAll()

suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 1,
}))
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 2,
}))
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 3,
}))
}

func (suite *CollectionObserverSuite) TearDownTest() {
@@ -248,12 +261,19 @@
Channel: "100-dmc0",
Segments: map[int64]*querypb.SegmentDist{1: {NodeID: 1, Version: 0}},
})
view := &meta.LeaderView{
ID: 2,
CollectionID: 103,
Channel: "103-dmc0",
Segments: make(map[int64]*querypb.SegmentDist),
}
suite.dist.LeaderViewManager.Update(2, &meta.LeaderView{
ID: 2,
CollectionID: 100,
Channel: "100-dmc1",
Segments: map[int64]*querypb.SegmentDist{2: {NodeID: 2, Version: 0}},
})
}, view)

view1 := &meta.LeaderView{
ID: 3,
CollectionID: 102,
@@ -265,17 +285,24 @@
suite.True(ok)
view2 := &meta.LeaderView{
ID: 3,
CollectionID: 13,
CollectionID: 103,
Channel: "103-dmc0",
Segments: make(map[int64]*querypb.SegmentDist),
}
for _, segment := range segmentsInfo {
view2.Segments[segment.GetID()] = &querypb.SegmentDist{
NodeID: 3, Version: 0,
}
view.Segments[segment.GetID()] = &querypb.SegmentDist{
NodeID: 2, Version: 0,
}
}
suite.dist.LeaderViewManager.Update(3, view1, view2)

suite.broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything).Return(nil, nil).Maybe()
suite.broker.EXPECT().ListIndexes(mock.Anything, mock.Anything).Return(nil, nil).Maybe()
suite.cluster.EXPECT().SyncDistribution(mock.Anything, mock.Anything, mock.Anything).Return(merr.Success(), nil).Maybe()

suite.Eventually(func() bool {
return suite.isCollectionLoadedContinue(suite.collections[2], time)
}, timeout-1, timeout/10)