Skip to content

Commit

Permalink
fix sync distribution with wrong version (#28130) (#28170)
Browse files Browse the repository at this point in the history
Signed-off-by: Wei Liu <[email protected]>
  • Loading branch information
weiliu1031 authored Nov 6, 2023
1 parent 5c44421 commit 87e8d04
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 25 deletions.
5 changes: 3 additions & 2 deletions internal/querycoordv2/observers/collection_observer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,8 @@ func (suite *CollectionObserverSuite) SetupTest() {

// Dependencies
suite.dist = meta.NewDistributionManager()
suite.meta = meta.NewMeta(suite.idAllocator, suite.store, session.NewNodeManager())
nodeMgr := session.NewNodeManager()
suite.meta = meta.NewMeta(suite.idAllocator, suite.store, nodeMgr)
suite.broker = meta.NewMockBroker(suite.T())
suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta)
suite.targetObserver = NewTargetObserver(suite.meta,
Expand All @@ -196,7 +197,7 @@ func (suite *CollectionObserverSuite) SetupTest() {
suite.checkerController = &checkers.CheckerController{}

mockCluster := session.NewMockCluster(suite.T())
suite.leaderObserver = NewLeaderObserver(suite.dist, suite.meta, suite.targetMgr, suite.broker, mockCluster)
suite.leaderObserver = NewLeaderObserver(suite.dist, suite.meta, suite.targetMgr, suite.broker, mockCluster, nodeMgr)
mockCluster.EXPECT().SyncDistribution(mock.Anything, mock.Anything, mock.Anything).Return(merr.Success(), nil).Maybe()

// Test object
Expand Down
8 changes: 8 additions & 0 deletions internal/querycoordv2/observers/leader_observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type LeaderObserver struct {
target *meta.TargetManager
broker meta.Broker
cluster session.Cluster
nodeMgr *session.NodeManager

dispatcher *taskDispatcher[int64]

Expand Down Expand Up @@ -118,6 +119,11 @@ func (o *LeaderObserver) observeCollection(ctx context.Context, collection int64
for _, replica := range replicas {
leaders := o.dist.ChannelDistManager.GetShardLeadersByReplica(replica)
for ch, leaderID := range leaders {
if ok, _ := o.nodeMgr.IsStoppingNode(leaderID); ok {
// no need to correct leader's view which is loaded on stopping node
continue
}

leaderView := o.dist.LeaderViewManager.GetLeaderShardView(leaderID, ch)
if leaderView == nil {
continue
Expand Down Expand Up @@ -326,13 +332,15 @@ func NewLeaderObserver(
targetMgr *meta.TargetManager,
broker meta.Broker,
cluster session.Cluster,
nodeMgr *session.NodeManager,
) *LeaderObserver {
ob := &LeaderObserver{
dist: dist,
meta: meta,
target: targetMgr,
broker: broker,
cluster: cluster,
nodeMgr: nodeMgr,
}

dispatcher := newTaskDispatcher[int64](ob.observeCollection)
Expand Down
5 changes: 3 additions & 2 deletions internal/querycoordv2/observers/leader_observer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ func (suite *LeaderObserverTestSuite) SetupTest() {
// meta
store := querycoord.NewCatalog(suite.kv)
idAllocator := RandomIncrementIDAllocator()
suite.meta = meta.NewMeta(idAllocator, store, session.NewNodeManager())
nodeMgr := session.NewNodeManager()
suite.meta = meta.NewMeta(idAllocator, store, nodeMgr)
suite.broker = meta.NewMockBroker(suite.T())

suite.mockCluster = session.NewMockCluster(suite.T())
Expand All @@ -80,7 +81,7 @@ func (suite *LeaderObserverTestSuite) SetupTest() {
// }, nil).Maybe()
distManager := meta.NewDistributionManager()
targetManager := meta.NewTargetManager(suite.broker, suite.meta)
suite.observer = NewLeaderObserver(distManager, suite.meta, targetManager, suite.broker, suite.mockCluster)
suite.observer = NewLeaderObserver(distManager, suite.meta, targetManager, suite.broker, suite.mockCluster, nodeMgr)
}

func (suite *LeaderObserverTestSuite) TearDownTest() {
Expand Down
1 change: 1 addition & 0 deletions internal/querycoordv2/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,7 @@ func (s *Server) initObserver() {
s.targetMgr,
s.broker,
s.cluster,
s.nodeMgr,
)
s.targetObserver = observers.NewTargetObserver(
s.meta,
Expand Down
46 changes: 25 additions & 21 deletions internal/querynodev2/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -1317,7 +1317,7 @@ func (node *QueryNode) SyncDistribution(ctx context.Context, req *querypb.SyncDi

// translate segment action
removeActions := make([]*querypb.SyncAction, 0)
addSegments := make(map[int64][]*querypb.SegmentLoadInfo)
group, ctx := errgroup.WithContext(ctx)
for _, action := range req.GetActions() {
log := log.With(zap.String("Action",
action.GetType().String()))
Expand All @@ -1331,7 +1331,26 @@ func (node *QueryNode) SyncDistribution(ctx context.Context, req *querypb.SyncDi
log.Warn("sync request from legacy querycoord without load info, skip")
continue
}
addSegments[action.GetNodeID()] = append(addSegments[action.GetNodeID()], action.GetInfo())

// to pass segment'version, we call load segment one by one
action := action
group.Go(func() error {
return shardDelegator.LoadSegments(ctx, &querypb.LoadSegmentsRequest{
Base: commonpbutil.NewMsgBase(
commonpbutil.WithMsgType(commonpb.MsgType_LoadSegments),
commonpbutil.WithMsgID(req.Base.GetMsgID()),
),
Infos: []*querypb.SegmentLoadInfo{action.GetInfo()},
Schema: req.GetSchema(),
LoadMeta: req.GetLoadMeta(),
CollectionID: req.GetCollectionID(),
ReplicaID: req.GetReplicaID(),
DstNodeID: action.GetNodeID(),
Version: action.GetVersion(),
NeedTransfer: false,
LoadScope: querypb.LoadScope_Delta,
})
})
case querypb.SyncType_UpdateVersion:
log.Info("sync action", zap.Int64("TargetVersion", action.GetTargetVersion()))
pipeline := node.pipelineManager.Get(req.GetChannel())
Expand All @@ -1353,25 +1372,10 @@ func (node *QueryNode) SyncDistribution(ctx context.Context, req *querypb.SyncDi
}
}

for nodeID, infos := range addSegments {
err := shardDelegator.LoadSegments(ctx, &querypb.LoadSegmentsRequest{
Base: commonpbutil.NewMsgBase(
commonpbutil.WithMsgType(commonpb.MsgType_LoadSegments),
commonpbutil.WithMsgID(req.Base.GetMsgID()),
),
Infos: infos,
Schema: req.GetSchema(),
LoadMeta: req.GetLoadMeta(),
CollectionID: req.GetCollectionID(),
ReplicaID: req.GetReplicaID(),
DstNodeID: nodeID,
Version: req.GetVersion(),
NeedTransfer: false,
LoadScope: querypb.LoadScope_Delta,
})
if err != nil {
return merr.Status(err), nil
}
err := group.Wait()
if err != nil {
log.Warn("failed to sync distribution", zap.Error(err))
return merr.Status(err), nil
}

for _, action := range removeActions {
Expand Down
34 changes: 34 additions & 0 deletions internal/querynodev2/services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
Expand All @@ -45,6 +46,7 @@ import (
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/internal/util/streamrpc"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/etcd"
Expand Down Expand Up @@ -1765,6 +1767,7 @@ func (suite *ServiceSuite) TestSyncDistribution_Normal() {
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_Success, status.ErrorCode)

// test sync targte version
syncVersionAction := &querypb.SyncAction{
Type: querypb.SyncType_UpdateVersion,
SealedInTarget: []int64{3},
Expand All @@ -1777,6 +1780,37 @@ func (suite *ServiceSuite) TestSyncDistribution_Normal() {
status, err = suite.node.SyncDistribution(ctx, req)
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_Success, status.GetErrorCode())

// test sync segments
segmentVersion := int64(111)
syncSegmentVersion := &querypb.SyncAction{
Type: querypb.SyncType_Set,
SegmentID: suite.validSegmentIDs[0],
NodeID: 0,
PartitionID: suite.partitionIDs[0],
Info: &querypb.SegmentLoadInfo{},
Version: segmentVersion,
}
req.Actions = []*querypb.SyncAction{syncSegmentVersion}

testChannel := "test_sync_segment"
req.Channel = testChannel

// expected call load segment with right segment version
var versionMatch bool
mockDelegator := delegator.NewMockShardDelegator(suite.T())
mockDelegator.EXPECT().LoadSegments(mock.Anything, mock.Anything).
RunAndReturn(func(ctx context.Context, req *querypb.LoadSegmentsRequest) error {
log.Info("version", zap.Int64("versionInload", req.GetVersion()))
versionMatch = req.GetVersion() == segmentVersion
return nil
})
suite.node.delegators.Insert(testChannel, mockDelegator)

status, err = suite.node.SyncDistribution(ctx, req)
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_Success, status.GetErrorCode())
suite.True(versionMatch)
}

func (suite *ServiceSuite) TestSyncDistribution_ReleaseResultCheck() {
Expand Down

0 comments on commit 87e8d04

Please sign in to comment.