From 051bc280dd8b3e76a1639719b715fd7d577b9861 Mon Sep 17 00:00:00 2001 From: congqixia Date: Thu, 5 Dec 2024 16:24:40 +0800 Subject: [PATCH] enhance: Make dynamic load/release partition follow targets (#38059) Related to #37849 --------- Signed-off-by: Congqi Xia --- internal/querycoordv2/job/job_load.go | 22 +- internal/querycoordv2/job/job_release.go | 8 +- internal/querycoordv2/job/job_sync.go | 34 +- internal/querycoordv2/job/job_test.go | 309 ++++++------------ internal/querycoordv2/job/undo.go | 5 +- internal/querycoordv2/job/utils.go | 91 +----- .../querycoordv2/meta/mock_target_manager.go | 60 ++++ internal/querycoordv2/meta/target_manager.go | 18 +- .../observers/collection_observer_test.go | 1 + .../querycoordv2/observers/target_observer.go | 20 +- internal/querycoordv2/services.go | 6 +- internal/querycoordv2/services_test.go | 42 +-- internal/querycoordv2/task/executor.go | 4 +- internal/querycoordv2/utils/meta.go | 16 +- internal/querynodev2/delegator/delegator.go | 63 +--- .../querynodev2/delegator/delegator_data.go | 4 +- .../delegator/delegator_data_test.go | 9 +- .../querynodev2/delegator/delegator_test.go | 2 +- .../delegator/delta_forward_test.go | 34 +- .../querynodev2/delegator/distribution.go | 61 +++- .../delegator/distribution_test.go | 38 ++- .../querynodev2/delegator/mock_delegator.go | 17 +- internal/querynodev2/delegator/snapshot.go | 3 + internal/querynodev2/segments/validate.go | 44 +-- internal/querynodev2/services.go | 4 +- internal/querynodev2/services_test.go | 23 ++ 26 files changed, 399 insertions(+), 539 deletions(-) diff --git a/internal/querycoordv2/job/job_load.go b/internal/querycoordv2/job/job_load.go index 9457b1b303ab3..5009be26fb4ad 100644 --- a/internal/querycoordv2/job/job_load.go +++ b/internal/querycoordv2/job/job_load.go @@ -50,7 +50,6 @@ type LoadCollectionJob struct { dist *meta.DistributionManager meta *meta.Meta broker meta.Broker - cluster session.Cluster targetMgr meta.TargetManagerInterface targetObserver *observers.TargetObserver collectionObserver *observers.CollectionObserver @@ -63,7 +62,6 @@ func NewLoadCollectionJob( dist *meta.DistributionManager, meta *meta.Meta, broker meta.Broker, - cluster session.Cluster, targetMgr meta.TargetManagerInterface, targetObserver *observers.TargetObserver, collectionObserver *observers.CollectionObserver, @@ -72,11 +70,10 @@ func NewLoadCollectionJob( return &LoadCollectionJob{ BaseJob: NewBaseJob(ctx, req.Base.GetMsgID(), req.GetCollectionID()), req: req, - undo: NewUndoList(ctx, meta, cluster, targetMgr, targetObserver), + undo: NewUndoList(ctx, meta, targetMgr, targetObserver), dist: dist, meta: meta, broker: broker, - cluster: cluster, targetMgr: targetMgr, targetObserver: targetObserver, collectionObserver: collectionObserver, @@ -193,12 +190,6 @@ func (job *LoadCollectionJob) Execute() error { job.undo.IsReplicaCreated = true } - // 3. loadPartitions on QueryNodes - err = loadPartitions(job.ctx, job.meta, job.cluster, job.broker, true, req.GetCollectionID(), lackPartitionIDs...) - if err != nil { - return err - } - // 4. 
put collection/partitions meta partitions := lo.Map(lackPartitionIDs, func(partID int64, _ int) *meta.Partition { return &meta.Partition{ @@ -264,7 +255,6 @@ type LoadPartitionJob struct { dist *meta.DistributionManager meta *meta.Meta broker meta.Broker - cluster session.Cluster targetMgr meta.TargetManagerInterface targetObserver *observers.TargetObserver collectionObserver *observers.CollectionObserver @@ -277,7 +267,6 @@ func NewLoadPartitionJob( dist *meta.DistributionManager, meta *meta.Meta, broker meta.Broker, - cluster session.Cluster, targetMgr meta.TargetManagerInterface, targetObserver *observers.TargetObserver, collectionObserver *observers.CollectionObserver, @@ -286,11 +275,10 @@ func NewLoadPartitionJob( return &LoadPartitionJob{ BaseJob: NewBaseJob(ctx, req.Base.GetMsgID(), req.GetCollectionID()), req: req, - undo: NewUndoList(ctx, meta, cluster, targetMgr, targetObserver), + undo: NewUndoList(ctx, meta, targetMgr, targetObserver), dist: dist, meta: meta, broker: broker, - cluster: cluster, targetMgr: targetMgr, targetObserver: targetObserver, collectionObserver: collectionObserver, @@ -399,12 +387,6 @@ func (job *LoadPartitionJob) Execute() error { job.undo.IsReplicaCreated = true } - // 3. loadPartitions on QueryNodes - err = loadPartitions(job.ctx, job.meta, job.cluster, job.broker, true, req.GetCollectionID(), lackPartitionIDs...) - if err != nil { - return err - } - // 4. put collection/partitions meta partitions := lo.Map(lackPartitionIDs, func(partID int64, _ int) *meta.Partition { return &meta.Partition{ diff --git a/internal/querycoordv2/job/job_release.go b/internal/querycoordv2/job/job_release.go index 8dcf4466f6785..3ffccecd7bfae 100644 --- a/internal/querycoordv2/job/job_release.go +++ b/internal/querycoordv2/job/job_release.go @@ -53,7 +53,6 @@ func NewReleaseCollectionJob(ctx context.Context, dist *meta.DistributionManager, meta *meta.Meta, broker meta.Broker, - cluster session.Cluster, targetMgr meta.TargetManagerInterface, targetObserver *observers.TargetObserver, checkerController *checkers.CheckerController, @@ -65,7 +64,6 @@ func NewReleaseCollectionJob(ctx context.Context, dist: dist, meta: meta, broker: broker, - cluster: cluster, targetMgr: targetMgr, targetObserver: targetObserver, checkerController: checkerController, @@ -86,7 +84,6 @@ func (job *ReleaseCollectionJob) Execute() error { toRelease := lo.Map(loadedPartitions, func(partition *meta.Partition, _ int) int64 { return partition.GetPartitionID() }) - releasePartitions(job.ctx, job.meta, job.cluster, req.GetCollectionID(), toRelease...) err := job.meta.CollectionManager.RemoveCollection(job.ctx, req.GetCollectionID()) if err != nil { @@ -137,7 +134,6 @@ func NewReleasePartitionJob(ctx context.Context, dist *meta.DistributionManager, meta *meta.Meta, broker meta.Broker, - cluster session.Cluster, targetMgr meta.TargetManagerInterface, targetObserver *observers.TargetObserver, checkerController *checkers.CheckerController, @@ -149,7 +145,6 @@ func NewReleasePartitionJob(ctx context.Context, dist: dist, meta: meta, broker: broker, - cluster: cluster, targetMgr: targetMgr, targetObserver: targetObserver, checkerController: checkerController, @@ -178,7 +173,6 @@ func (job *ReleasePartitionJob) Execute() error { log.Warn("releasing partition(s) not loaded") return nil } - releasePartitions(job.ctx, job.meta, job.cluster, req.GetCollectionID(), toRelease...) 
// If all partitions are released, clear all if len(toRelease) == len(loadedPartitions) { @@ -211,6 +205,8 @@ func (job *ReleasePartitionJob) Execute() error { return errors.Wrap(err, msg) } job.targetObserver.ReleasePartition(req.GetCollectionID(), toRelease...) + // wait current target updated, so following querys will act as expected + waitCurrentTargetUpdated(job.ctx, job.targetObserver, job.req.GetCollectionID()) waitCollectionReleased(job.dist, job.checkerController, req.GetCollectionID(), toRelease...) } return nil diff --git a/internal/querycoordv2/job/job_sync.go b/internal/querycoordv2/job/job_sync.go index 49cb805b7d449..899cbe56495fc 100644 --- a/internal/querycoordv2/job/job_sync.go +++ b/internal/querycoordv2/job/job_sync.go @@ -25,31 +25,36 @@ import ( "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/querycoordv2/meta" + "github.com/milvus-io/milvus/internal/querycoordv2/observers" "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/pkg/log" ) type SyncNewCreatedPartitionJob struct { *BaseJob - req *querypb.SyncNewCreatedPartitionRequest - meta *meta.Meta - cluster session.Cluster - broker meta.Broker + req *querypb.SyncNewCreatedPartitionRequest + meta *meta.Meta + cluster session.Cluster + broker meta.Broker + targetObserver *observers.TargetObserver + targetMgr meta.TargetManagerInterface } func NewSyncNewCreatedPartitionJob( ctx context.Context, req *querypb.SyncNewCreatedPartitionRequest, meta *meta.Meta, - cluster session.Cluster, broker meta.Broker, + targetObserver *observers.TargetObserver, + targetMgr meta.TargetManagerInterface, ) *SyncNewCreatedPartitionJob { return &SyncNewCreatedPartitionJob{ - BaseJob: NewBaseJob(ctx, req.Base.GetMsgID(), req.GetCollectionID()), - req: req, - meta: meta, - cluster: cluster, - broker: broker, + BaseJob: NewBaseJob(ctx, req.Base.GetMsgID(), req.GetCollectionID()), + req: req, + meta: meta, + broker: broker, + targetObserver: targetObserver, + targetMgr: targetMgr, } } @@ -75,11 +80,6 @@ func (job *SyncNewCreatedPartitionJob) Execute() error { return nil } - err := loadPartitions(job.ctx, job.meta, job.cluster, job.broker, false, req.GetCollectionID(), req.GetPartitionID()) - if err != nil { - return err - } - partition := &meta.Partition{ PartitionLoadInfo: &querypb.PartitionLoadInfo{ CollectionID: req.GetCollectionID(), @@ -89,12 +89,12 @@ func (job *SyncNewCreatedPartitionJob) Execute() error { LoadPercentage: 100, CreatedAt: time.Now(), } - err = job.meta.CollectionManager.PutPartition(job.ctx, partition) + err := job.meta.CollectionManager.PutPartition(job.ctx, partition) if err != nil { msg := "failed to store partitions" log.Warn(msg, zap.Error(err)) return errors.Wrap(err, msg) } - return nil + return waitCurrentTargetUpdated(job.ctx, job.targetObserver, job.req.GetCollectionID()) } diff --git a/internal/querycoordv2/job/job_test.go b/internal/querycoordv2/job/job_test.go index 02615713a5b00..7c7e188c9d56e 100644 --- a/internal/querycoordv2/job/job_test.go +++ b/internal/querycoordv2/job/job_test.go @@ -18,8 +18,8 @@ package job import ( "context" - "fmt" "testing" + "time" "github.com/cockroachdb/errors" "github.com/samber/lo" @@ -39,11 +39,13 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/observers" . 
"github.com/milvus-io/milvus/internal/querycoordv2/params" "github.com/milvus-io/milvus/internal/querycoordv2/session" + "github.com/milvus-io/milvus/internal/querycoordv2/utils" "github.com/milvus-io/milvus/internal/util/proxyutil" "github.com/milvus-io/milvus/pkg/kv" "github.com/milvus-io/milvus/pkg/util/etcd" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/typeutil" ) const ( @@ -136,15 +138,10 @@ func (suite *JobSuite) SetupSuite() { suite.broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything). Return(nil, nil) suite.broker.EXPECT().ListIndexes(mock.Anything, mock.Anything). - Return(nil, nil) + Return(nil, nil).Maybe() suite.cluster = session.NewMockCluster(suite.T()) - suite.cluster.EXPECT(). - LoadPartitions(mock.Anything, mock.Anything, mock.Anything). - Return(merr.Success(), nil) - suite.cluster.EXPECT(). - ReleasePartitions(mock.Anything, mock.Anything, mock.Anything). - Return(merr.Success(), nil).Maybe() + suite.cluster.EXPECT().SyncDistribution(mock.Anything, mock.Anything, mock.Anything).Return(merr.Success(), nil).Maybe() suite.proxyManager = proxyutil.NewMockProxyClientManager(suite.T()) suite.proxyManager.EXPECT().InvalidateCollectionMetaCache(mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() @@ -247,7 +244,6 @@ func (suite *JobSuite) TestLoadCollection() { suite.dist, suite.meta, suite.broker, - suite.cluster, suite.targetMgr, suite.targetObserver, suite.collectionObserver, @@ -275,7 +271,6 @@ func (suite *JobSuite) TestLoadCollection() { suite.dist, suite.meta, suite.broker, - suite.cluster, suite.targetMgr, suite.targetObserver, suite.collectionObserver, @@ -301,7 +296,6 @@ func (suite *JobSuite) TestLoadCollection() { suite.dist, suite.meta, suite.broker, - suite.cluster, suite.targetMgr, suite.targetObserver, suite.collectionObserver, @@ -329,7 +323,6 @@ func (suite *JobSuite) TestLoadCollection() { suite.dist, suite.meta, suite.broker, - suite.cluster, suite.targetMgr, suite.targetObserver, suite.collectionObserver, @@ -365,7 +358,6 @@ func (suite *JobSuite) TestLoadCollection() { suite.dist, suite.meta, suite.broker, - suite.cluster, suite.targetMgr, suite.targetObserver, suite.collectionObserver, @@ -387,7 +379,6 @@ func (suite *JobSuite) TestLoadCollection() { suite.dist, suite.meta, suite.broker, - suite.cluster, suite.targetMgr, suite.targetObserver, suite.collectionObserver, @@ -417,7 +408,6 @@ func (suite *JobSuite) TestLoadCollectionWithReplicas() { suite.dist, suite.meta, suite.broker, - suite.cluster, suite.targetMgr, suite.targetObserver, suite.collectionObserver, @@ -449,7 +439,6 @@ func (suite *JobSuite) TestLoadCollectionWithLoadFields() { suite.dist, suite.meta, suite.broker, - suite.cluster, suite.targetMgr, suite.targetObserver, suite.collectionObserver, @@ -479,7 +468,6 @@ func (suite *JobSuite) TestLoadCollectionWithLoadFields() { suite.dist, suite.meta, suite.broker, - suite.cluster, suite.targetMgr, suite.targetObserver, suite.collectionObserver, @@ -507,7 +495,6 @@ func (suite *JobSuite) TestLoadCollectionWithLoadFields() { suite.dist, suite.meta, suite.broker, - suite.cluster, suite.targetMgr, suite.targetObserver, suite.collectionObserver, @@ -541,7 +528,6 @@ func (suite *JobSuite) TestLoadCollectionWithLoadFields() { suite.dist, suite.meta, suite.broker, - suite.cluster, suite.targetMgr, suite.targetObserver, suite.collectionObserver, @@ -574,7 +560,6 @@ func (suite *JobSuite) TestLoadPartition() { suite.dist, suite.meta, 
suite.broker, - suite.cluster, suite.targetMgr, suite.targetObserver, suite.collectionObserver, @@ -605,7 +590,6 @@ func (suite *JobSuite) TestLoadPartition() { suite.dist, suite.meta, suite.broker, - suite.cluster, suite.targetMgr, suite.targetObserver, suite.collectionObserver, @@ -633,7 +617,6 @@ func (suite *JobSuite) TestLoadPartition() { suite.dist, suite.meta, suite.broker, - suite.cluster, suite.targetMgr, suite.targetObserver, suite.collectionObserver, @@ -661,7 +644,6 @@ func (suite *JobSuite) TestLoadPartition() { suite.dist, suite.meta, suite.broker, - suite.cluster, suite.targetMgr, suite.targetObserver, suite.collectionObserver, @@ -688,7 +670,6 @@ func (suite *JobSuite) TestLoadPartition() { suite.dist, suite.meta, suite.broker, - suite.cluster, suite.targetMgr, suite.targetObserver, suite.collectionObserver, @@ -724,7 +705,6 @@ func (suite *JobSuite) TestLoadPartition() { suite.dist, suite.meta, suite.broker, - suite.cluster, suite.targetMgr, suite.targetObserver, suite.collectionObserver, @@ -747,7 +727,6 @@ func (suite *JobSuite) TestLoadPartition() { suite.dist, suite.meta, suite.broker, - suite.cluster, suite.targetMgr, suite.targetObserver, suite.collectionObserver, @@ -780,7 +759,6 @@ func (suite *JobSuite) TestLoadPartitionWithLoadFields() { suite.dist, suite.meta, suite.broker, - suite.cluster, suite.targetMgr, suite.targetObserver, suite.collectionObserver, @@ -813,7 +791,6 @@ func (suite *JobSuite) TestLoadPartitionWithLoadFields() { suite.dist, suite.meta, suite.broker, - suite.cluster, suite.targetMgr, suite.targetObserver, suite.collectionObserver, @@ -843,7 +820,6 @@ func (suite *JobSuite) TestLoadPartitionWithLoadFields() { suite.dist, suite.meta, suite.broker, - suite.cluster, suite.targetMgr, suite.targetObserver, suite.collectionObserver, @@ -879,7 +855,6 @@ func (suite *JobSuite) TestLoadPartitionWithLoadFields() { suite.dist, suite.meta, suite.broker, - suite.cluster, suite.targetMgr, suite.targetObserver, suite.collectionObserver, @@ -909,7 +884,6 @@ func (suite *JobSuite) TestDynamicLoad() { suite.dist, suite.meta, suite.broker, - suite.cluster, suite.targetMgr, suite.targetObserver, suite.collectionObserver, @@ -928,7 +902,6 @@ func (suite *JobSuite) TestDynamicLoad() { suite.dist, suite.meta, suite.broker, - suite.cluster, suite.targetMgr, suite.targetObserver, suite.collectionObserver, @@ -1028,7 +1001,6 @@ func (suite *JobSuite) TestLoadPartitionWithReplicas() { suite.dist, suite.meta, suite.broker, - suite.cluster, suite.targetMgr, suite.targetObserver, suite.collectionObserver, @@ -1056,7 +1028,6 @@ func (suite *JobSuite) TestReleaseCollection() { suite.dist, suite.meta, suite.broker, - suite.cluster, suite.targetMgr, suite.targetObserver, @@ -1080,7 +1051,6 @@ func (suite *JobSuite) TestReleaseCollection() { suite.dist, suite.meta, suite.broker, - suite.cluster, suite.targetMgr, suite.targetObserver, suite.checkerController, @@ -1110,7 +1080,6 @@ func (suite *JobSuite) TestReleasePartition() { suite.dist, suite.meta, suite.broker, - suite.cluster, suite.targetMgr, suite.targetObserver, suite.checkerController, @@ -1134,7 +1103,6 @@ func (suite *JobSuite) TestReleasePartition() { suite.dist, suite.meta, suite.broker, - suite.cluster, suite.targetMgr, suite.targetObserver, suite.checkerController, @@ -1149,6 +1117,12 @@ func (suite *JobSuite) TestReleasePartition() { // Test release partial partitions suite.releaseAll() suite.loadAll() + for _, collectionID := range suite.collections { + // make collection able to get into loaded state + 
suite.updateChannelDist(ctx, collectionID, true) + suite.updateSegmentDist(collectionID, 3000, suite.partitions[collectionID]...) + waitCurrentTargetUpdated(ctx, suite.targetObserver, collectionID) + } for _, collection := range suite.collections { req := &querypb.ReleasePartitionsRequest{ CollectionID: collection, @@ -1160,13 +1134,14 @@ func (suite *JobSuite) TestReleasePartition() { suite.dist, suite.meta, suite.broker, - suite.cluster, suite.targetMgr, suite.targetObserver, suite.checkerController, suite.proxyManager, ) suite.scheduler.Add(job) + suite.updateChannelDist(ctx, collection, true) + suite.updateSegmentDist(collection, 3000, suite.partitions[collection][:1]...) err := job.Wait() suite.NoError(err) suite.True(suite.meta.Exist(ctx, collection)) @@ -1194,7 +1169,6 @@ func (suite *JobSuite) TestDynamicRelease() { suite.dist, suite.meta, suite.broker, - suite.cluster, suite.targetMgr, suite.targetObserver, suite.checkerController, @@ -1212,7 +1186,6 @@ func (suite *JobSuite) TestDynamicRelease() { suite.dist, suite.meta, suite.broker, - suite.cluster, suite.targetMgr, suite.targetObserver, suite.checkerController, @@ -1225,8 +1198,18 @@ func (suite *JobSuite) TestDynamicRelease() { // action: release p0 // expect: p0 released, p1, p2 loaded suite.loadAll() + for _, collectionID := range suite.collections { + // make collection able to get into loaded state + suite.updateChannelDist(ctx, collectionID, true) + suite.updateSegmentDist(collectionID, 3000, suite.partitions[collectionID]...) + waitCurrentTargetUpdated(ctx, suite.targetObserver, collectionID) + } + job := newReleasePartJob(col0, p0) suite.scheduler.Add(job) + // update segments + suite.updateSegmentDist(col0, 3000, p1, p2) + suite.updateChannelDist(ctx, col0, true) err := job.Wait() suite.NoError(err) suite.assertPartitionReleased(col0, p0) @@ -1237,6 +1220,8 @@ func (suite *JobSuite) TestDynamicRelease() { // expect: p1 released, p2 loaded job = newReleasePartJob(col0, p0, p1) suite.scheduler.Add(job) + suite.updateSegmentDist(col0, 3000, p2) + suite.updateChannelDist(ctx, col0, true) err = job.Wait() suite.NoError(err) suite.assertPartitionReleased(col0, p0, p1) @@ -1247,6 +1232,8 @@ func (suite *JobSuite) TestDynamicRelease() { // expect: loadType=col: col loaded, p2 released job = newReleasePartJob(col0, p2) suite.scheduler.Add(job) + suite.updateSegmentDist(col0, 3000) + suite.updateChannelDist(ctx, col0, false) err = job.Wait() suite.NoError(err) suite.assertPartitionReleased(col0, p0, p1, p2) @@ -1307,7 +1294,6 @@ func (suite *JobSuite) TestLoadCollectionStoreFailed() { suite.dist, suite.meta, suite.broker, - suite.cluster, suite.targetMgr, suite.targetObserver, suite.collectionObserver, @@ -1350,7 +1336,6 @@ func (suite *JobSuite) TestLoadPartitionStoreFailed() { suite.dist, suite.meta, suite.broker, - suite.cluster, suite.targetMgr, suite.targetObserver, suite.collectionObserver, @@ -1378,7 +1363,6 @@ func (suite *JobSuite) TestLoadCreateReplicaFailed() { suite.dist, suite.meta, suite.broker, - suite.cluster, suite.targetMgr, suite.targetObserver, suite.collectionObserver, @@ -1390,183 +1374,28 @@ func (suite *JobSuite) TestLoadCreateReplicaFailed() { } } -func (suite *JobSuite) TestCallLoadPartitionFailed() { - // call LoadPartitions failed at get index info - getIndexErr := fmt.Errorf("mock get index error") - suite.broker.ExpectedCalls = lo.Filter(suite.broker.ExpectedCalls, func(call *mock.Call, _ int) bool { - return call.Method != "ListIndexes" - }) - for _, collection := range suite.collections { - 
suite.broker.EXPECT().ListIndexes(mock.Anything, collection).Return(nil, getIndexErr) - loadCollectionReq := &querypb.LoadCollectionRequest{ - CollectionID: collection, - } - loadCollectionJob := NewLoadCollectionJob( - context.Background(), - loadCollectionReq, - suite.dist, - suite.meta, - suite.broker, - suite.cluster, - suite.targetMgr, - suite.targetObserver, - suite.collectionObserver, - suite.nodeMgr, - ) - suite.scheduler.Add(loadCollectionJob) - err := loadCollectionJob.Wait() - suite.T().Logf("%s", err) - suite.ErrorIs(err, getIndexErr) - - loadPartitionReq := &querypb.LoadPartitionsRequest{ - CollectionID: collection, - PartitionIDs: suite.partitions[collection], - } - loadPartitionJob := NewLoadPartitionJob( - context.Background(), - loadPartitionReq, - suite.dist, - suite.meta, - suite.broker, - suite.cluster, - suite.targetMgr, - suite.targetObserver, - suite.collectionObserver, - suite.nodeMgr, - ) - suite.scheduler.Add(loadPartitionJob) - err = loadPartitionJob.Wait() - suite.ErrorIs(err, getIndexErr) - } - - // call LoadPartitions failed at get schema - getSchemaErr := fmt.Errorf("mock get schema error") - suite.broker.ExpectedCalls = lo.Filter(suite.broker.ExpectedCalls, func(call *mock.Call, _ int) bool { - return call.Method != "DescribeCollection" - }) - for _, collection := range suite.collections { - suite.broker.EXPECT().DescribeCollection(mock.Anything, collection).Return(nil, getSchemaErr) - loadCollectionReq := &querypb.LoadCollectionRequest{ - CollectionID: collection, - } - loadCollectionJob := NewLoadCollectionJob( - context.Background(), - loadCollectionReq, - suite.dist, - suite.meta, - suite.broker, - suite.cluster, - suite.targetMgr, - suite.targetObserver, - suite.collectionObserver, - suite.nodeMgr, - ) - suite.scheduler.Add(loadCollectionJob) - err := loadCollectionJob.Wait() - suite.ErrorIs(err, getSchemaErr) - - loadPartitionReq := &querypb.LoadPartitionsRequest{ - CollectionID: collection, - PartitionIDs: suite.partitions[collection], - } - loadPartitionJob := NewLoadPartitionJob( - context.Background(), - loadPartitionReq, - suite.dist, - suite.meta, - suite.broker, - suite.cluster, - suite.targetMgr, - suite.targetObserver, - suite.collectionObserver, - suite.nodeMgr, - ) - suite.scheduler.Add(loadPartitionJob) - err = loadPartitionJob.Wait() - suite.ErrorIs(err, getSchemaErr) - } - - suite.broker.ExpectedCalls = lo.Filter(suite.broker.ExpectedCalls, func(call *mock.Call, _ int) bool { - return call.Method != "ListIndexes" && call.Method != "DescribeCollection" - }) - suite.broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything).Return(nil, nil) - suite.broker.EXPECT().ListIndexes(mock.Anything, mock.Anything).Return(nil, nil) -} - -func (suite *JobSuite) TestCallReleasePartitionFailed() { - ctx := context.Background() - suite.loadAll() - - releasePartitionErr := fmt.Errorf("mock release partitions error") - suite.cluster.ExpectedCalls = lo.Filter(suite.cluster.ExpectedCalls, func(call *mock.Call, _ int) bool { - return call.Method != "ReleasePartitions" - }) - suite.cluster.EXPECT().ReleasePartitions(mock.Anything, mock.Anything, mock.Anything). 
- Return(nil, releasePartitionErr) - for _, collection := range suite.collections { - releaseCollectionReq := &querypb.ReleaseCollectionRequest{ - CollectionID: collection, - } - releaseCollectionJob := NewReleaseCollectionJob( - ctx, - releaseCollectionReq, - suite.dist, - suite.meta, - suite.broker, - suite.cluster, - suite.targetMgr, - suite.targetObserver, - suite.checkerController, - suite.proxyManager, - ) - suite.scheduler.Add(releaseCollectionJob) - err := releaseCollectionJob.Wait() - suite.NoError(err) - - releasePartitionReq := &querypb.ReleasePartitionsRequest{ - CollectionID: collection, - PartitionIDs: suite.partitions[collection], - } - releasePartitionJob := NewReleasePartitionJob( - ctx, - releasePartitionReq, - suite.dist, - suite.meta, - suite.broker, - suite.cluster, - suite.targetMgr, - suite.targetObserver, - suite.checkerController, - suite.proxyManager, - ) - suite.scheduler.Add(releasePartitionJob) - err = releasePartitionJob.Wait() - suite.NoError(err) - } - - suite.cluster.ExpectedCalls = lo.Filter(suite.cluster.ExpectedCalls, func(call *mock.Call, _ int) bool { - return call.Method != "ReleasePartitions" - }) - suite.cluster.EXPECT().ReleasePartitions(mock.Anything, mock.Anything, mock.Anything). - Return(merr.Success(), nil) -} - func (suite *JobSuite) TestSyncNewCreatedPartition() { newPartition := int64(999) ctx := context.Background() // test sync new created partition suite.loadAll() + collectionID := suite.collections[0] + // make collection able to get into loaded state + suite.updateChannelDist(ctx, collectionID, true) + suite.updateSegmentDist(collectionID, 3000, suite.partitions[collectionID]...) + req := &querypb.SyncNewCreatedPartitionRequest{ - CollectionID: suite.collections[0], + CollectionID: collectionID, PartitionID: newPartition, } job := NewSyncNewCreatedPartitionJob( - context.Background(), + ctx, req, suite.meta, - suite.cluster, suite.broker, + suite.targetObserver, + suite.targetMgr, ) suite.scheduler.Add(job) err := job.Wait() @@ -1581,11 +1410,12 @@ func (suite *JobSuite) TestSyncNewCreatedPartition() { PartitionID: newPartition, } job = NewSyncNewCreatedPartitionJob( - context.Background(), + ctx, req, suite.meta, - suite.cluster, suite.broker, + suite.targetObserver, + suite.targetMgr, ) suite.scheduler.Add(job) err = job.Wait() @@ -1597,11 +1427,12 @@ func (suite *JobSuite) TestSyncNewCreatedPartition() { PartitionID: newPartition, } job = NewSyncNewCreatedPartitionJob( - context.Background(), + ctx, req, suite.meta, - suite.cluster, suite.broker, + suite.targetObserver, + suite.targetMgr, ) suite.scheduler.Add(job) err = job.Wait() @@ -1621,7 +1452,6 @@ func (suite *JobSuite) loadAll() { suite.dist, suite.meta, suite.broker, - suite.cluster, suite.targetMgr, suite.targetObserver, suite.collectionObserver, @@ -1646,7 +1476,6 @@ func (suite *JobSuite) loadAll() { suite.dist, suite.meta, suite.broker, - suite.cluster, suite.targetMgr, suite.targetObserver, suite.collectionObserver, @@ -1676,7 +1505,6 @@ func (suite *JobSuite) releaseAll() { suite.dist, suite.meta, suite.broker, - suite.cluster, suite.targetMgr, suite.targetObserver, suite.checkerController, @@ -1746,6 +1574,59 @@ func (suite *JobSuite) assertPartitionReleased(collection int64, partitionIDs .. } } +func (suite *JobSuite) updateSegmentDist(collection, node int64, partitions ...int64) { + partitionSet := typeutil.NewSet(partitions...) 
+ metaSegments := make([]*meta.Segment, 0) + for partition, segments := range suite.segments[collection] { + if !partitionSet.Contain(partition) { + continue + } + for _, segment := range segments { + metaSegments = append(metaSegments, + utils.CreateTestSegment(collection, partition, segment, node, 1, "test-channel")) + } + } + suite.dist.SegmentDistManager.Update(node, metaSegments...) +} + +func (suite *JobSuite) updateChannelDist(ctx context.Context, collection int64, loaded bool) { + channels := suite.channels[collection] + segments := lo.Flatten(lo.Values(suite.segments[collection])) + + replicas := suite.meta.ReplicaManager.GetByCollection(ctx, collection) + for _, replica := range replicas { + if loaded { + i := 0 + for _, node := range replica.GetNodes() { + suite.dist.ChannelDistManager.Update(node, meta.DmChannelFromVChannel(&datapb.VchannelInfo{ + CollectionID: collection, + ChannelName: channels[i], + })) + suite.dist.LeaderViewManager.Update(node, &meta.LeaderView{ + ID: node, + CollectionID: collection, + Channel: channels[i], + Segments: lo.SliceToMap(segments, func(segment int64) (int64, *querypb.SegmentDist) { + return segment, &querypb.SegmentDist{ + NodeID: node, + Version: time.Now().Unix(), + } + }), + }) + i++ + if i >= len(channels) { + break + } + } + } else { + for _, node := range replica.GetNodes() { + suite.dist.ChannelDistManager.Update(node) + suite.dist.LeaderViewManager.Update(node) + } + } + } +} + func TestJob(t *testing.T) { suite.Run(t, new(JobSuite)) } diff --git a/internal/querycoordv2/job/undo.go b/internal/querycoordv2/job/undo.go index 3fe97e98aff63..d7d5427e3c66f 100644 --- a/internal/querycoordv2/job/undo.go +++ b/internal/querycoordv2/job/undo.go @@ -23,7 +23,6 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/meta" "github.com/milvus-io/milvus/internal/querycoordv2/observers" - "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/pkg/log" ) @@ -37,18 +36,16 @@ type UndoList struct { ctx context.Context meta *meta.Meta - cluster session.Cluster targetMgr meta.TargetManagerInterface targetObserver *observers.TargetObserver } func NewUndoList(ctx context.Context, meta *meta.Meta, - cluster session.Cluster, targetMgr meta.TargetManagerInterface, targetObserver *observers.TargetObserver, + targetMgr meta.TargetManagerInterface, targetObserver *observers.TargetObserver, ) *UndoList { return &UndoList{ ctx: ctx, meta: meta, - cluster: cluster, targetMgr: targetMgr, targetObserver: targetObserver, } diff --git a/internal/querycoordv2/job/utils.go b/internal/querycoordv2/job/utils.go index a202d03662b78..a98abfbe8f8be 100644 --- a/internal/querycoordv2/job/utils.go +++ b/internal/querycoordv2/job/utils.go @@ -23,14 +23,10 @@ import ( "github.com/samber/lo" "go.uber.org/zap" - "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" - "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" - "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/querycoordv2/checkers" "github.com/milvus-io/milvus/internal/querycoordv2/meta" - "github.com/milvus-io/milvus/internal/querycoordv2/session" + "github.com/milvus-io/milvus/internal/querycoordv2/observers" "github.com/milvus-io/milvus/pkg/log" - "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -68,82 +64,21 @@ func waitCollectionReleased(dist *meta.DistributionManager, checkerController *c } } -func loadPartitions(ctx context.Context, - meta *meta.Meta, - cluster session.Cluster, - 
broker meta.Broker, - withSchema bool, - collection int64, - partitions ...int64, -) error { - var err error - var schema *schemapb.CollectionSchema - if withSchema { - collectionInfo, err := broker.DescribeCollection(ctx, collection) - if err != nil { - return err - } - schema = collectionInfo.GetSchema() - } - indexes, err := broker.ListIndexes(ctx, collection) +func waitCurrentTargetUpdated(ctx context.Context, targetObserver *observers.TargetObserver, collection int64) error { + // manual trigger update next target + ready, err := targetObserver.UpdateNextTarget(collection) if err != nil { + log.Warn("failed to update next target for sync partition job", zap.Error(err)) return err } - replicas := meta.ReplicaManager.GetByCollection(ctx, collection) - loadReq := &querypb.LoadPartitionsRequest{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_LoadPartitions, - }, - CollectionID: collection, - PartitionIDs: partitions, - Schema: schema, - IndexInfoList: indexes, - } - for _, replica := range replicas { - for _, node := range replica.GetNodes() { - status, err := cluster.LoadPartitions(ctx, node, loadReq) - // There is no need to rollback LoadPartitions as the load job will fail - // and the Delegator will not be created, - // resulting in search and query requests failing due to the absence of Delegator. - if err != nil { - return err - } - if !merr.Ok(status) { - return merr.Error(status) - } - } - } - return nil -} - -func releasePartitions(ctx context.Context, - meta *meta.Meta, - cluster session.Cluster, - collection int64, - partitions ...int64, -) { - log := log.Ctx(ctx).With(zap.Int64("collection", collection), zap.Int64s("partitions", partitions)) - replicas := meta.ReplicaManager.GetByCollection(ctx, collection) - releaseReq := &querypb.ReleasePartitionsRequest{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_ReleasePartitions, - }, - CollectionID: collection, - PartitionIDs: partitions, - } - for _, replica := range replicas { - for _, node := range replica.GetNodes() { - status, err := cluster.ReleasePartitions(ctx, node, releaseReq) - // Ignore error as the Delegator will be removed from the query node, - // causing search and query requests to fail due to the absence of Delegator. 
- if err != nil { - log.Warn("failed to ReleasePartitions", zap.Int64("node", node), zap.Error(err)) - continue - } - if !merr.Ok(status) { - log.Warn("failed to ReleasePartitions", zap.Int64("node", node), zap.Error(merr.Error(status))) - } - } + // accelerate check + targetObserver.TriggerUpdateCurrentTarget(collection) + // wait current target ready + select { + case <-ready: + return nil + case <-ctx.Done(): + return ctx.Err() } } diff --git a/internal/querycoordv2/meta/mock_target_manager.go b/internal/querycoordv2/meta/mock_target_manager.go index d31dece57cbd0..cbf588960e166 100644 --- a/internal/querycoordv2/meta/mock_target_manager.go +++ b/internal/querycoordv2/meta/mock_target_manager.go @@ -375,6 +375,66 @@ func (_c *MockTargetManager_GetGrowingSegmentsByCollection_Call) RunAndReturn(ru return _c } +// GetPartitions provides a mock function with given fields: ctx, collectionID, scope +func (_m *MockTargetManager) GetPartitions(ctx context.Context, collectionID int64, scope int32) ([]int64, error) { + ret := _m.Called(ctx, collectionID, scope) + + if len(ret) == 0 { + panic("no return value specified for GetPartitions") + } + + var r0 []int64 + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, int64, int32) ([]int64, error)); ok { + return rf(ctx, collectionID, scope) + } + if rf, ok := ret.Get(0).(func(context.Context, int64, int32) []int64); ok { + r0 = rf(ctx, collectionID, scope) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]int64) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, int64, int32) error); ok { + r1 = rf(ctx, collectionID, scope) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockTargetManager_GetPartitions_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetPartitions' +type MockTargetManager_GetPartitions_Call struct { + *mock.Call +} + +// GetPartitions is a helper method to define mock.On call +// - ctx context.Context +// - collectionID int64 +// - scope int32 +func (_e *MockTargetManager_Expecter) GetPartitions(ctx interface{}, collectionID interface{}, scope interface{}) *MockTargetManager_GetPartitions_Call { + return &MockTargetManager_GetPartitions_Call{Call: _e.mock.On("GetPartitions", ctx, collectionID, scope)} +} + +func (_c *MockTargetManager_GetPartitions_Call) Run(run func(ctx context.Context, collectionID int64, scope int32)) *MockTargetManager_GetPartitions_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(int64), args[2].(int32)) + }) + return _c +} + +func (_c *MockTargetManager_GetPartitions_Call) Return(_a0 []int64, _a1 error) *MockTargetManager_GetPartitions_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockTargetManager_GetPartitions_Call) RunAndReturn(run func(context.Context, int64, int32) ([]int64, error)) *MockTargetManager_GetPartitions_Call { + _c.Call.Return(run) + return _c +} + // GetSealedSegment provides a mock function with given fields: ctx, collectionID, id, scope func (_m *MockTargetManager) GetSealedSegment(ctx context.Context, collectionID int64, id int64, scope int32) *datapb.SegmentInfo { ret := _m.Called(ctx, collectionID, id, scope) diff --git a/internal/querycoordv2/meta/target_manager.go b/internal/querycoordv2/meta/target_manager.go index 68f5b8cf7ba55..cfedeaa93672e 100644 --- a/internal/querycoordv2/meta/target_manager.go +++ b/internal/querycoordv2/meta/target_manager.go @@ -34,6 +34,7 @@ import ( "github.com/milvus-io/milvus/pkg/log" 
"github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/util/conc" + "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/retry" "github.com/milvus-io/milvus/pkg/util/tsoutil" @@ -70,6 +71,7 @@ type TargetManagerInterface interface { Recover(ctx context.Context, catalog metastore.QueryCoordCatalog) error CanSegmentBeMoved(ctx context.Context, collectionID, segmentID int64) bool GetTargetJSON(ctx context.Context, scope TargetScope) string + GetPartitions(ctx context.Context, collectionID int64, scope TargetScope) ([]int64, error) } type TargetManager struct { @@ -140,9 +142,9 @@ func (mgr *TargetManager) UpdateCollectionCurrentTarget(ctx context.Context, col func (mgr *TargetManager) UpdateCollectionNextTarget(ctx context.Context, collectionID int64) error { var vChannelInfos []*datapb.VchannelInfo var segmentInfos []*datapb.SegmentInfo - err := retry.Handle(context.TODO(), func() (bool, error) { + err := retry.Handle(ctx, func() (bool, error) { var err error - vChannelInfos, segmentInfos, err = mgr.broker.GetRecoveryInfoV2(context.TODO(), collectionID) + vChannelInfos, segmentInfos, err = mgr.broker.GetRecoveryInfoV2(ctx, collectionID) if err != nil { return true, err } @@ -651,6 +653,18 @@ func (mgr *TargetManager) GetTargetJSON(ctx context.Context, scope TargetScope) return string(v) } +func (mgr *TargetManager) GetPartitions(ctx context.Context, collectionID int64, scope TargetScope) ([]int64, error) { + mgr.rwMutex.RLock() + defer mgr.rwMutex.RUnlock() + + ret := mgr.getCollectionTarget(scope, collectionID) + if len(ret) == 0 { + return nil, merr.WrapErrCollectionNotLoaded(collectionID) + } + + return ret[0].partitions.Collect(), nil +} + func (mgr *TargetManager) getTarget(scope TargetScope) *target { if scope == CurrentTarget { return mgr.current diff --git a/internal/querycoordv2/observers/collection_observer_test.go b/internal/querycoordv2/observers/collection_observer_test.go index 8bb4f6cad876f..53b2e47a1b711 100644 --- a/internal/querycoordv2/observers/collection_observer_test.go +++ b/internal/querycoordv2/observers/collection_observer_test.go @@ -174,6 +174,7 @@ func (suite *CollectionObserverSuite) SetupSuite() { } func (suite *CollectionObserverSuite) SetupTest() { + suite.ctx = context.Background() // Mocks var err error suite.idAllocator = RandomIncrementIDAllocator() diff --git a/internal/querycoordv2/observers/target_observer.go b/internal/querycoordv2/observers/target_observer.go index eb415ba567620..ac636c40f31f4 100644 --- a/internal/querycoordv2/observers/target_observer.go +++ b/internal/querycoordv2/observers/target_observer.go @@ -26,7 +26,6 @@ import ( "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" - "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus/internal/proto/indexpb" "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/querycoordv2/meta" @@ -398,7 +397,6 @@ func (ob *TargetObserver) shouldUpdateCurrentTarget(ctx context.Context, collect collectionReadyLeaders = append(collectionReadyLeaders, channelReadyLeaders...) 
} - var collectionInfo *milvuspb.DescribeCollectionResponse var partitions []int64 var indexInfo []*indexpb.IndexInfo var err error @@ -413,16 +411,9 @@ func (ob *TargetObserver) shouldUpdateCurrentTarget(ctx context.Context, collect log.Warn("replica not found", zap.Int64("nodeID", leader.ID), zap.Int64("collectionID", collectionID)) continue } - // init all the meta information - if collectionInfo == nil { - collectionInfo, err = ob.broker.DescribeCollection(ctx, collectionID) - if err != nil { - log.Warn("failed to get collection info", zap.Error(err)) - return false - } - - partitions, err = utils.GetPartitions(ctx, ob.meta.CollectionManager, collectionID) + if partitions == nil { + partitions, err = utils.GetPartitions(ctx, ob.targetMgr, collectionID) if err != nil { log.Warn("failed to get partitions", zap.Error(err)) return false @@ -436,7 +427,7 @@ func (ob *TargetObserver) shouldUpdateCurrentTarget(ctx context.Context, collect } } - if !ob.sync(ctx, replica, leader, []*querypb.SyncAction{updateVersionAction}, collectionInfo, partitions, indexInfo) { + if !ob.sync(ctx, replica, leader, []*querypb.SyncAction{updateVersionAction}, partitions, indexInfo) { return false } } @@ -444,7 +435,7 @@ func (ob *TargetObserver) shouldUpdateCurrentTarget(ctx context.Context, collect } func (ob *TargetObserver) sync(ctx context.Context, replica *meta.Replica, leaderView *meta.LeaderView, diffs []*querypb.SyncAction, - collectionInfo *milvuspb.DescribeCollectionResponse, partitions []int64, indexInfo []*indexpb.IndexInfo, + partitions []int64, indexInfo []*indexpb.IndexInfo, ) bool { if len(diffs) == 0 { return true @@ -465,12 +456,10 @@ func (ob *TargetObserver) sync(ctx context.Context, replica *meta.Replica, leade ReplicaID: replicaID, Channel: leaderView.Channel, Actions: diffs, - Schema: collectionInfo.GetSchema(), LoadMeta: &querypb.LoadMetaInfo{ LoadType: ob.meta.GetLoadType(ctx, leaderView.CollectionID), CollectionID: leaderView.CollectionID, PartitionIDs: partitions, - DbName: collectionInfo.GetDbName(), ResourceGroup: replica.GetResourceGroup(), }, Version: time.Now().UnixNano(), @@ -478,6 +467,7 @@ func (ob *TargetObserver) sync(ctx context.Context, replica *meta.Replica, leade } ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.SegmentTaskTimeout.GetAsDuration(time.Millisecond)) defer cancel() + resp, err := ob.cluster.SyncDistribution(ctx, leaderView.ID, req) if err != nil { log.Warn("failed to sync distribution", zap.Error(err)) diff --git a/internal/querycoordv2/services.go b/internal/querycoordv2/services.go index d764dcaddba01..456711e326315 100644 --- a/internal/querycoordv2/services.go +++ b/internal/querycoordv2/services.go @@ -287,7 +287,6 @@ func (s *Server) LoadCollection(ctx context.Context, req *querypb.LoadCollection s.dist, s.meta, s.broker, - s.cluster, s.targetMgr, s.targetObserver, s.collectionObserver, @@ -328,7 +327,6 @@ func (s *Server) ReleaseCollection(ctx context.Context, req *querypb.ReleaseColl s.dist, s.meta, s.broker, - s.cluster, s.targetMgr, s.targetObserver, s.checkerController, @@ -404,7 +402,6 @@ func (s *Server) LoadPartitions(ctx context.Context, req *querypb.LoadPartitions s.dist, s.meta, s.broker, - s.cluster, s.targetMgr, s.targetObserver, s.collectionObserver, @@ -451,7 +448,6 @@ func (s *Server) ReleasePartitions(ctx context.Context, req *querypb.ReleasePart s.dist, s.meta, s.broker, - s.cluster, s.targetMgr, s.targetObserver, s.checkerController, @@ -596,7 +592,7 @@ func (s *Server) SyncNewCreatedPartition(ctx 
context.Context, req *querypb.SyncN return merr.Status(err), nil } - syncJob := job.NewSyncNewCreatedPartitionJob(ctx, req, s.meta, s.cluster, s.broker) + syncJob := job.NewSyncNewCreatedPartitionJob(ctx, req, s.meta, s.broker, s.targetObserver, s.targetMgr) s.jobScheduler.Add(syncJob) err := syncJob.Wait() if err != nil { diff --git a/internal/querycoordv2/services_test.go b/internal/querycoordv2/services_test.go index 451bf85596b73..1381bc2a23b51 100644 --- a/internal/querycoordv2/services_test.go +++ b/internal/querycoordv2/services_test.go @@ -151,6 +151,8 @@ func (suite *ServiceSuite) SetupTest() { suite.meta = meta.NewMeta(params.RandomIncrementIDAllocator(), suite.store, suite.nodeMgr) suite.broker = meta.NewMockBroker(suite.T()) suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta) + suite.cluster = session.NewMockCluster(suite.T()) + suite.cluster.EXPECT().SyncDistribution(mock.Anything, mock.Anything, mock.Anything).Return(merr.Success(), nil).Maybe() suite.targetObserver = observers.NewTargetObserver( suite.meta, suite.targetMgr, @@ -168,8 +170,6 @@ func (suite *ServiceSuite) SetupTest() { })) suite.meta.ResourceManager.HandleNodeUp(context.TODO(), node) } - suite.cluster = session.NewMockCluster(suite.T()) - suite.cluster.EXPECT().SyncDistribution(mock.Anything, mock.Anything, mock.Anything).Return(merr.Success(), nil).Maybe() suite.jobScheduler = job.NewScheduler() suite.taskScheduler = task.NewMockScheduler(suite.T()) suite.taskScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe() @@ -345,8 +345,9 @@ func (suite *ServiceSuite) TestLoadCollection() { // Test load all collections for _, collection := range suite.collections { + suite.broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything). + Return(nil, nil) suite.expectGetRecoverInfo(collection) - suite.expectLoadPartitions() req := &querypb.LoadCollectionRequest{ CollectionID: collection, @@ -914,7 +915,8 @@ func (suite *ServiceSuite) TestLoadPartition() { // Test load all partitions for _, collection := range suite.collections { - suite.expectLoadPartitions() + suite.broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything). + Return(nil, nil) suite.expectGetRecoverInfo(collection) req := &querypb.LoadPartitionsRequest{ @@ -1009,9 +1011,6 @@ func (suite *ServiceSuite) TestReleaseCollection() { ctx := context.Background() server := suite.server - suite.cluster.EXPECT().ReleasePartitions(mock.Anything, mock.Anything, mock.Anything). - Return(merr.Success(), nil) - // Test release all collections for _, collection := range suite.collections { req := &querypb.ReleaseCollectionRequest{ @@ -1044,18 +1043,23 @@ func (suite *ServiceSuite) TestReleaseCollection() { } func (suite *ServiceSuite) TestReleasePartition() { - suite.loadAll() ctx := context.Background() + suite.loadAll() + for _, collection := range suite.collections { + suite.updateChannelDist(ctx, collection) + suite.updateSegmentDist(collection, suite.nodes[0]) + } + server := suite.server // Test release all partitions - suite.cluster.EXPECT().ReleasePartitions(mock.Anything, mock.Anything, mock.Anything). - Return(merr.Success(), nil) for _, collection := range suite.collections { req := &querypb.ReleasePartitionsRequest{ CollectionID: collection, PartitionIDs: suite.partitions[collection][0:1], } + suite.updateChannelDist(ctx, collection) + suite.updateSegmentDist(collection, suite.nodes[0], suite.partitions[collection][1:]...) 
resp, err := server.ReleasePartitions(ctx, req) suite.NoError(err) suite.Equal(commonpb.ErrorCode_Success, resp.ErrorCode) @@ -1826,7 +1830,7 @@ func (suite *ServiceSuite) TestHandleNodeUp() { func (suite *ServiceSuite) loadAll() { ctx := context.Background() for _, collection := range suite.collections { - suite.expectLoadPartitions() + suite.expectLoadMetaRPCs() suite.expectGetRecoverInfo(collection) if suite.loadTypes[collection] == querypb.LoadType_LoadCollection { req := &querypb.LoadCollectionRequest{ @@ -1839,7 +1843,6 @@ func (suite *ServiceSuite) loadAll() { suite.dist, suite.meta, suite.broker, - suite.cluster, suite.targetMgr, suite.targetObserver, suite.collectionObserver, @@ -1864,7 +1867,6 @@ func (suite *ServiceSuite) loadAll() { suite.dist, suite.meta, suite.broker, - suite.cluster, suite.targetMgr, suite.targetObserver, suite.collectionObserver, @@ -1963,13 +1965,11 @@ func (suite *ServiceSuite) expectGetRecoverInfo(collection int64) { Return(vChannels, segmentBinlogs, nil).Maybe() } -func (suite *ServiceSuite) expectLoadPartitions() { +func (suite *ServiceSuite) expectLoadMetaRPCs() { suite.broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything). - Return(nil, nil) + Return(nil, nil).Maybe() suite.broker.EXPECT().ListIndexes(mock.Anything, mock.Anything). - Return(nil, nil) - suite.cluster.EXPECT().LoadPartitions(mock.Anything, mock.Anything, mock.Anything). - Return(merr.Success(), nil) + Return(nil, nil).Maybe() } func (suite *ServiceSuite) getAllSegments(collection int64) []int64 { @@ -1980,9 +1980,13 @@ func (suite *ServiceSuite) getAllSegments(collection int64) []int64 { return allSegments } -func (suite *ServiceSuite) updateSegmentDist(collection, node int64) { +func (suite *ServiceSuite) updateSegmentDist(collection, node int64, partitions ...int64) { + partitionSet := typeutil.NewSet(partitions...) 
metaSegments := make([]*meta.Segment, 0) for partition, segments := range suite.segments[collection] { + if partitionSet.Len() > 0 && !partitionSet.Contain(partition) { + continue + } for _, segment := range segments { metaSegments = append(metaSegments, utils.CreateTestSegment(collection, partition, segment, node, 1, "test-channel")) diff --git a/internal/querycoordv2/task/executor.go b/internal/querycoordv2/task/executor.go index fbdc9ddc14db9..4a44836258e5b 100644 --- a/internal/querycoordv2/task/executor.go +++ b/internal/querycoordv2/task/executor.go @@ -345,7 +345,7 @@ func (ex *Executor) subscribeChannel(task *ChannelTask, step int) error { return err } loadFields := ex.meta.GetLoadFields(ctx, task.CollectionID()) - partitions, err := utils.GetPartitions(ctx, ex.meta.CollectionManager, task.CollectionID()) + partitions, err := utils.GetPartitions(ctx, ex.targetMgr, task.CollectionID()) if err != nil { log.Warn("failed to get partitions of collection") return err @@ -653,7 +653,7 @@ func (ex *Executor) getMetaInfo(ctx context.Context, task Task) (*milvuspb.Descr return nil, nil, nil, err } loadFields := ex.meta.GetLoadFields(ctx, task.CollectionID()) - partitions, err := utils.GetPartitions(ctx, ex.meta.CollectionManager, collectionID) + partitions, err := utils.GetPartitions(ctx, ex.targetMgr, collectionID) if err != nil { log.Warn("failed to get partitions of collection", zap.Error(err)) return nil, nil, nil, err diff --git a/internal/querycoordv2/utils/meta.go b/internal/querycoordv2/utils/meta.go index 9139379a5b784..b5e3e7ee50ca2 100644 --- a/internal/querycoordv2/utils/meta.go +++ b/internal/querycoordv2/utils/meta.go @@ -30,18 +30,10 @@ import ( "github.com/milvus-io/milvus/pkg/util/typeutil" ) -func GetPartitions(ctx context.Context, collectionMgr *meta.CollectionManager, collectionID int64) ([]int64, error) { - collection := collectionMgr.GetCollection(ctx, collectionID) - if collection != nil { - partitions := collectionMgr.GetPartitionsByCollection(ctx, collectionID) - if partitions != nil { - return lo.Map(partitions, func(partition *meta.Partition, i int) int64 { - return partition.PartitionID - }), nil - } - } - - return nil, merr.WrapErrCollectionNotLoaded(collectionID) +func GetPartitions(ctx context.Context, targetMgr meta.TargetManagerInterface, collectionID int64) ([]int64, error) { + // fetch next target first, sync next target contains the wanted partition list + // if not found, current will be used instead for dist adjustment requests + return targetMgr.GetPartitions(ctx, collectionID, meta.NextTargetFirst) } // GroupNodesByReplica groups nodes by replica, diff --git a/internal/querynodev2/delegator/delegator.go b/internal/querynodev2/delegator/delegator.go index 01e6f70c53675..f856bd48a9af8 100644 --- a/internal/querynodev2/delegator/delegator.go +++ b/internal/querynodev2/delegator/delegator.go @@ -82,7 +82,7 @@ type ShardDelegator interface { LoadGrowing(ctx context.Context, infos []*querypb.SegmentLoadInfo, version int64) error LoadSegments(ctx context.Context, req *querypb.LoadSegmentsRequest) error ReleaseSegments(ctx context.Context, req *querypb.ReleaseSegmentsRequest, force bool) error - SyncTargetVersion(newVersion int64, growingInTarget []int64, sealedInTarget []int64, droppedInTarget []int64, checkpoint *msgpb.MsgPosition) + SyncTargetVersion(newVersion int64, partitions []int64, growingInTarget []int64, sealedInTarget []int64, droppedInTarget []int64, checkpoint *msgpb.MsgPosition) GetTargetVersion() int64 GetDeleteBufferSize() (entryNum int64, 
memorySize int64) @@ -268,25 +268,6 @@ func (sd *shardDelegator) modifyQueryRequest(req *querypb.QueryRequest, scope qu return nodeReq } -func (sd *shardDelegator) getTargetPartitions(reqPartitions []int64) (searchPartitions []int64, err error) { - existPartitions := sd.collection.GetPartitions() - - // search all loaded partitions if req partition ids not provided - if len(reqPartitions) == 0 { - searchPartitions = existPartitions - return searchPartitions, nil - } - - // use brute search to avoid map struct cost - for _, partition := range reqPartitions { - if !funcutil.SliceContain(existPartitions, partition) { - return nil, merr.WrapErrPartitionNotLoaded(reqPartitions) - } - } - searchPartitions = reqPartitions - return searchPartitions, nil -} - // Search preforms search operation on shard. func (sd *shardDelegator) search(ctx context.Context, req *querypb.SearchRequest, sealed []SnapshotItem, growing []SegmentEntry) ([]*internalpb.SearchResults, error) { log := sd.getLogger(ctx) @@ -382,19 +363,10 @@ func (sd *shardDelegator) Search(ctx context.Context, req *querypb.SearchRequest sealed, growing, version, err := sd.distribution.PinReadableSegments(req.GetReq().GetPartitionIDs()...) if err != nil { - log.Warn("delegator failed to search, current distribution is not serviceable") - return nil, merr.WrapErrChannelNotAvailable(sd.vchannelName, "distribution is not servcieable") - } - defer sd.distribution.Unpin(version) - targetPartitions, err := sd.getTargetPartitions(req.GetReq().GetPartitionIDs()) - if err != nil { + log.Warn("delegator failed to search, current distribution is not serviceable", zap.Error(err)) return nil, err } - // set target partition ids to sub task request - req.Req.PartitionIDs = targetPartitions - growing = lo.Filter(growing, func(segment SegmentEntry, _ int) bool { - return funcutil.SliceContain(targetPartitions, segment.PartitionID) - }) + defer sd.distribution.Unpin(version) if req.GetReq().GetIsAdvanced() { futures := make([]*conc.Future[*internalpb.SearchResults], len(req.GetReq().GetSubReqs())) @@ -499,21 +471,11 @@ func (sd *shardDelegator) QueryStream(ctx context.Context, req *querypb.QueryReq sealed, growing, version, err := sd.distribution.PinReadableSegments(req.GetReq().GetPartitionIDs()...) if err != nil { - log.Warn("delegator failed to query, current distribution is not serviceable") - return merr.WrapErrChannelNotAvailable(sd.vchannelName, "distribution is not servcieable") - } - defer sd.distribution.Unpin(version) - - targetPartitions, err := sd.getTargetPartitions(req.GetReq().GetPartitionIDs()) - if err != nil { + log.Warn("delegator failed to query, current distribution is not serviceable", zap.Error(err)) return err } - // set target partition ids to sub task request - req.Req.PartitionIDs = targetPartitions + defer sd.distribution.Unpin(version) - growing = lo.Filter(growing, func(segment SegmentEntry, _ int) bool { - return funcutil.SliceContain(targetPartitions, segment.PartitionID) - }) if req.Req.IgnoreGrowing { growing = []SegmentEntry{} } @@ -572,24 +534,13 @@ func (sd *shardDelegator) Query(ctx context.Context, req *querypb.QueryRequest) sealed, growing, version, err := sd.distribution.PinReadableSegments(req.GetReq().GetPartitionIDs()...) 
if err != nil { - log.Warn("delegator failed to query, current distribution is not serviceable") - return nil, merr.WrapErrChannelNotAvailable(sd.vchannelName, "distribution is not servcieable") - } - defer sd.distribution.Unpin(version) - - targetPartitions, err := sd.getTargetPartitions(req.GetReq().GetPartitionIDs()) - if err != nil { + log.Warn("delegator failed to query, current distribution is not serviceable", zap.Error(err)) return nil, err } - // set target partition ids to sub task request - req.Req.PartitionIDs = targetPartitions + defer sd.distribution.Unpin(version) if req.Req.IgnoreGrowing { growing = []SegmentEntry{} - } else { - growing = lo.Filter(growing, func(segment SegmentEntry, _ int) bool { - return funcutil.SliceContain(targetPartitions, segment.PartitionID) - }) } if paramtable.Get().QueryNodeCfg.EnableSegmentPrune.GetAsBool() { diff --git a/internal/querynodev2/delegator/delegator_data.go b/internal/querynodev2/delegator/delegator_data.go index ba454152ea343..421fe3b99e0ca 100644 --- a/internal/querynodev2/delegator/delegator_data.go +++ b/internal/querynodev2/delegator/delegator_data.go @@ -948,7 +948,7 @@ func (sd *shardDelegator) ReleaseSegments(ctx context.Context, req *querypb.Rele return nil } -func (sd *shardDelegator) SyncTargetVersion(newVersion int64, growingInTarget []int64, +func (sd *shardDelegator) SyncTargetVersion(newVersion int64, partitions []int64, growingInTarget []int64, sealedInTarget []int64, droppedInTarget []int64, checkpoint *msgpb.MsgPosition, ) { growings := sd.segmentManager.GetBy( @@ -980,7 +980,7 @@ func (sd *shardDelegator) SyncTargetVersion(newVersion int64, growingInTarget [] log.Warn("found redundant growing segments", zap.Int64s("growingSegments", redundantGrowingIDs)) } - sd.distribution.SyncTargetVersion(newVersion, growingInTarget, sealedInTarget, redundantGrowingIDs) + sd.distribution.SyncTargetVersion(newVersion, partitions, growingInTarget, sealedInTarget, redundantGrowingIDs) sd.deleteBuffer.TryDiscard(checkpoint.GetTimestamp()) } diff --git a/internal/querynodev2/delegator/delegator_data_test.go b/internal/querynodev2/delegator/delegator_data_test.go index 7fc98c8c7e944..60d99be805f1d 100644 --- a/internal/querynodev2/delegator/delegator_data_test.go +++ b/internal/querynodev2/delegator/delegator_data_test.go @@ -1363,9 +1363,10 @@ func (s *DelegatorDataSuite) TestReleaseSegment() { s.ElementsMatch([]SegmentEntry{ { - SegmentID: 1001, - NodeID: 1, - PartitionID: 500, + SegmentID: 1001, + NodeID: 1, + PartitionID: 500, + TargetVersion: unreadableTargetVersion, }, }, growing) @@ -1503,7 +1504,7 @@ func (s *DelegatorDataSuite) TestSyncTargetVersion() { s.manager.Segment.Put(context.Background(), segments.SegmentTypeGrowing, ms) } - s.delegator.SyncTargetVersion(int64(5), []int64{1}, []int64{2}, []int64{3, 4}, &msgpb.MsgPosition{}) + s.delegator.SyncTargetVersion(int64(5), []int64{1}, []int64{1}, []int64{2}, []int64{3, 4}, &msgpb.MsgPosition{}) s.Equal(int64(5), s.delegator.GetTargetVersion()) } diff --git a/internal/querynodev2/delegator/delegator_test.go b/internal/querynodev2/delegator/delegator_test.go index 1bd61173c2aee..b55c52f5cac0b 100644 --- a/internal/querynodev2/delegator/delegator_test.go +++ b/internal/querynodev2/delegator/delegator_test.go @@ -331,7 +331,7 @@ func (s *DelegatorSuite) initSegments() { Version: 2001, }, ) - s.delegator.SyncTargetVersion(2001, []int64{1004}, []int64{1000, 1001, 1002, 1003}, []int64{}, &msgpb.MsgPosition{}) + s.delegator.SyncTargetVersion(2001, []int64{500, 501}, []int64{1004}, 
[]int64{1000, 1001, 1002, 1003}, []int64{}, &msgpb.MsgPosition{}) } func (s *DelegatorSuite) TestSearch() { diff --git a/internal/querynodev2/delegator/delta_forward_test.go b/internal/querynodev2/delegator/delta_forward_test.go index ae6f832266b93..108b9210a5d7d 100644 --- a/internal/querynodev2/delegator/delta_forward_test.go +++ b/internal/querynodev2/delegator/delta_forward_test.go @@ -174,18 +174,21 @@ func (s *StreamingForwardSuite) TestBFStreamingForward() { // Setup distribution delegator.distribution.AddGrowing(SegmentEntry{ - NodeID: 1, - SegmentID: 100, + NodeID: 1, + PartitionID: 1, + SegmentID: 100, }) delegator.distribution.AddDistributions(SegmentEntry{ - NodeID: 1, - SegmentID: 101, + NodeID: 1, + PartitionID: 1, + SegmentID: 101, }) delegator.distribution.AddDistributions(SegmentEntry{ - NodeID: 1, - SegmentID: 102, + NodeID: 1, + PartitionID: 1, + SegmentID: 102, }) - delegator.distribution.SyncTargetVersion(1, []int64{100}, []int64{101, 102}, nil) + delegator.distribution.SyncTargetVersion(1, []int64{1}, []int64{100}, []int64{101, 102}, nil) // Setup pk oracle // empty bfs will not match @@ -224,18 +227,21 @@ func (s *StreamingForwardSuite) TestDirectStreamingForward() { // Setup distribution delegator.distribution.AddGrowing(SegmentEntry{ - NodeID: 1, - SegmentID: 100, + NodeID: 1, + PartitionID: 1, + SegmentID: 100, }) delegator.distribution.AddDistributions(SegmentEntry{ - NodeID: 1, - SegmentID: 101, + NodeID: 1, + PartitionID: 1, + SegmentID: 101, }) delegator.distribution.AddDistributions(SegmentEntry{ - NodeID: 1, - SegmentID: 102, + NodeID: 1, + PartitionID: 1, + SegmentID: 102, }) - delegator.distribution.SyncTargetVersion(1, []int64{100}, []int64{101, 102}, nil) + delegator.distribution.SyncTargetVersion(1, []int64{1}, []int64{100}, []int64{101, 102}, nil) // Setup pk oracle // empty bfs will not match diff --git a/internal/querynodev2/delegator/distribution.go b/internal/querynodev2/delegator/distribution.go index 0bcbea20b869e..08fa2d327aa68 100644 --- a/internal/querynodev2/delegator/distribution.go +++ b/internal/querynodev2/delegator/distribution.go @@ -117,10 +117,17 @@ func (d *distribution) PinReadableSegments(partitions ...int64) (sealed []Snapsh defer d.mut.RUnlock() if !d.Serviceable() { - return nil, nil, -1, merr.WrapErrServiceInternal("channel distribution is not serviceable") + return nil, nil, -1, merr.WrapErrChannelNotAvailable("channel distribution is not serviceable") } current := d.current.Load() + // snapshot sanity check + // if user specified a partition id which is not serviceable, return err + for _, partition := range partitions { + if !current.partitions.Contain(partition) { + return nil, nil, -1, merr.WrapErrPartitionNotLoaded(partition) + } + } sealed, growing = current.Get(partitions...) 
 	version = current.version
 	targetVersion := current.GetTargetVersion()
@@ -261,7 +268,7 @@ func (d *distribution) AddOfflines(segmentIDs ...int64) {
 }
 
 // UpdateTargetVersion update readable segment version
-func (d *distribution) SyncTargetVersion(newVersion int64, growingInTarget []int64, sealedInTarget []int64, redundantGrowings []int64) {
+func (d *distribution) SyncTargetVersion(newVersion int64, partitions []int64, growingInTarget []int64, sealedInTarget []int64, redundantGrowings []int64) {
 	d.mut.Lock()
 	defer d.mut.Unlock()
 
@@ -299,10 +306,12 @@ func (d *distribution) SyncTargetVersion(newVersion int64, growingInTarget []int
 
 	oldValue := d.targetVersion.Load()
 	d.targetVersion.Store(newVersion)
-	d.genSnapshot()
+	// update working partition list
+	d.genSnapshot(WithPartitions(partitions))
 	// if sealed segment in leader view is less than sealed segment in target, set delegator to unserviceable
 	d.serviceable.Store(available)
 	log.Info("Update readable segment version",
+		zap.Int64s("partitions", partitions),
 		zap.Int64("oldVersion", oldValue),
 		zap.Int64("newVersion", newVersion),
 		zap.Int("growingSegmentNum", len(growingInTarget)),
@@ -345,33 +354,55 @@ func (d *distribution) RemoveDistributions(sealedSegments []SegmentEntry, growin
 // getSnapshot converts current distribution to snapshot format.
 // in which, user could use found nodeID=>segmentID list.
 // mutex RLock is required before calling this method.
-func (d *distribution) genSnapshot() chan struct{} {
+func (d *distribution) genSnapshot(opts ...genSnapshotOpt) chan struct{} {
+	// stores last snapshot
+	// ok to be nil
+	last := d.current.Load()
+	option := &genSnapshotOption{
+		partitions: typeutil.NewSet[int64](), // if no working list provided, snapshot shall have no items
+	}
+	// use the last snapshot's working partition list by default
+	if last != nil {
+		option.partitions = last.partitions
+	}
+	for _, opt := range opts {
+		opt(option)
+	}
+
 	nodeSegments := make(map[int64][]SegmentEntry)
 	for _, entry := range d.sealedSegments {
 		nodeSegments[entry.NodeID] = append(nodeSegments[entry.NodeID], entry)
 	}
 
+	// only store working partition entry in snapshot to reduce calculation
 	dist := make([]SnapshotItem, 0, len(nodeSegments))
 	for nodeID, items := range nodeSegments {
 		dist = append(dist, SnapshotItem{
-			NodeID:   nodeID,
-			Segments: items,
+			NodeID: nodeID,
+			Segments: lo.Map(items, func(entry SegmentEntry, _ int) SegmentEntry {
+				if !option.partitions.Contain(entry.PartitionID) {
+					entry.TargetVersion = unreadableTargetVersion
+				}
+				return entry
+			}),
 		})
 	}
 
 	growing := make([]SegmentEntry, 0, len(d.growingSegments))
 	for _, entry := range d.growingSegments {
+		if !option.partitions.Contain(entry.PartitionID) {
+			entry.TargetVersion = unreadableTargetVersion
+		}
 		growing = append(growing, entry)
 	}
 
 	d.serviceable.Store(d.offlines.Len() == 0)
-	// stores last snapshot
-	// ok to be nil
-	last := d.current.Load()
 	// update snapshot version
 	d.snapshotVersion++
 	newSnapShot := NewSnapshot(dist, growing, last, d.snapshotVersion, d.targetVersion.Load())
+	newSnapShot.partitions = option.partitions
+
 	d.current.Store(newSnapShot) // shall be a new one
 	d.snapshots.GetOrInsert(d.snapshotVersion, newSnapShot)
@@ -404,3 +435,15 @@ func (d *distribution) getCleanup(version int64) snapshotCleanup {
 		d.snapshots.GetAndRemove(version)
 	}
 }
+
+type genSnapshotOption struct {
+	partitions typeutil.Set[int64]
+}
+
+type genSnapshotOpt func(*genSnapshotOption)
+
+func WithPartitions(partitions []int64) genSnapshotOpt {
+	return func(opt *genSnapshotOption) {
+
opt.partitions = typeutil.NewSet(partitions...) + } +} diff --git a/internal/querynodev2/delegator/distribution_test.go b/internal/querynodev2/delegator/distribution_test.go index aa8c534e7d7e2..6b19e2fb3d9c6 100644 --- a/internal/querynodev2/delegator/distribution_test.go +++ b/internal/querynodev2/delegator/distribution_test.go @@ -217,9 +217,10 @@ func (s *DistributionSuite) compareSnapshotItems(target, value []SnapshotItem) { func (s *DistributionSuite) TestAddGrowing() { type testCase struct { - tag string - input []SegmentEntry - expected []SegmentEntry + tag string + workingParts []int64 + input []SegmentEntry + expected []SegmentEntry } cases := []testCase{ @@ -229,15 +230,27 @@ func (s *DistributionSuite) TestAddGrowing() { expected: []SegmentEntry{}, }, { - tag: "normal case", + tag: "normal_case", input: []SegmentEntry{ {SegmentID: 1, PartitionID: 1}, {SegmentID: 2, PartitionID: 2}, }, + workingParts: []int64{1, 2}, expected: []SegmentEntry{ + {SegmentID: 1, PartitionID: 1, TargetVersion: 1000}, + {SegmentID: 2, PartitionID: 2, TargetVersion: 1000}, + }, + }, + { + tag: "partial_partition_working", + input: []SegmentEntry{ {SegmentID: 1, PartitionID: 1}, {SegmentID: 2, PartitionID: 2}, }, + workingParts: []int64{1}, + expected: []SegmentEntry{ + {SegmentID: 1, PartitionID: 1, TargetVersion: 1000}, + }, }, } @@ -247,6 +260,7 @@ func (s *DistributionSuite) TestAddGrowing() { defer s.TearDownTest() s.dist.AddGrowing(tc.input...) + s.dist.SyncTargetVersion(1000, tc.workingParts, []int64{1, 2}, nil, nil) _, growing, version, err := s.dist.PinReadableSegments() s.Require().NoError(err) defer s.dist.Unpin(version) @@ -305,7 +319,7 @@ func (s *DistributionSuite) TestRemoveDistribution() { }, }, }, - expectGrowing: []SegmentEntry{{SegmentID: 4}}, + expectGrowing: []SegmentEntry{{SegmentID: 4, TargetVersion: unreadableTargetVersion}}, }, { tag: "remove with wrong nodeID", @@ -341,7 +355,7 @@ func (s *DistributionSuite) TestRemoveDistribution() { }, }, }, - expectGrowing: []SegmentEntry{{SegmentID: 4}, {SegmentID: 5}}, + expectGrowing: []SegmentEntry{{SegmentID: 4, TargetVersion: unreadableTargetVersion}, {SegmentID: 5, TargetVersion: unreadableTargetVersion}}, }, { tag: "remove with wildcardNodeID", @@ -376,7 +390,7 @@ func (s *DistributionSuite) TestRemoveDistribution() { }, }, }, - expectGrowing: []SegmentEntry{{SegmentID: 4}, {SegmentID: 5}}, + expectGrowing: []SegmentEntry{{SegmentID: 4, TargetVersion: unreadableTargetVersion}, {SegmentID: 5, TargetVersion: unreadableTargetVersion}}, }, { tag: "remove with read", @@ -421,7 +435,7 @@ func (s *DistributionSuite) TestRemoveDistribution() { }, }, }, - expectGrowing: []SegmentEntry{{SegmentID: 4}}, + expectGrowing: []SegmentEntry{{SegmentID: 4, TargetVersion: unreadableTargetVersion}}, }, } @@ -714,7 +728,7 @@ func (s *DistributionSuite) Test_SyncTargetVersion() { s.dist.AddGrowing(growing...) s.dist.AddDistributions(sealed...) 
- s.dist.SyncTargetVersion(2, []int64{2, 3}, []int64{6}, []int64{}) + s.dist.SyncTargetVersion(2, []int64{1}, []int64{2, 3}, []int64{6}, []int64{}) s1, s2, _, err := s.dist.PinReadableSegments() s.Require().NoError(err) @@ -726,13 +740,13 @@ func (s *DistributionSuite) Test_SyncTargetVersion() { s.Len(s2, 3) s.dist.serviceable.Store(true) - s.dist.SyncTargetVersion(2, []int64{222}, []int64{}, []int64{}) + s.dist.SyncTargetVersion(2, []int64{1}, []int64{222}, []int64{}, []int64{}) s.True(s.dist.Serviceable()) - s.dist.SyncTargetVersion(2, []int64{}, []int64{333}, []int64{}) + s.dist.SyncTargetVersion(2, []int64{1}, []int64{}, []int64{333}, []int64{}) s.False(s.dist.Serviceable()) - s.dist.SyncTargetVersion(2, []int64{}, []int64{333}, []int64{1, 2, 3}) + s.dist.SyncTargetVersion(2, []int64{1}, []int64{}, []int64{333}, []int64{1, 2, 3}) _, _, _, err = s.dist.PinReadableSegments() s.Error(err) } diff --git a/internal/querynodev2/delegator/mock_delegator.go b/internal/querynodev2/delegator/mock_delegator.go index 3ef1a03ea87cb..c1144db4dc501 100644 --- a/internal/querynodev2/delegator/mock_delegator.go +++ b/internal/querynodev2/delegator/mock_delegator.go @@ -940,9 +940,9 @@ func (_c *MockShardDelegator_SyncPartitionStats_Call) RunAndReturn(run func(cont return _c } -// SyncTargetVersion provides a mock function with given fields: newVersion, growingInTarget, sealedInTarget, droppedInTarget, checkpoint -func (_m *MockShardDelegator) SyncTargetVersion(newVersion int64, growingInTarget []int64, sealedInTarget []int64, droppedInTarget []int64, checkpoint *msgpb.MsgPosition) { - _m.Called(newVersion, growingInTarget, sealedInTarget, droppedInTarget, checkpoint) +// SyncTargetVersion provides a mock function with given fields: newVersion, partitions, growingInTarget, sealedInTarget, droppedInTarget, checkpoint +func (_m *MockShardDelegator) SyncTargetVersion(newVersion int64, partitions []int64, growingInTarget []int64, sealedInTarget []int64, droppedInTarget []int64, checkpoint *msgpb.MsgPosition) { + _m.Called(newVersion, partitions, growingInTarget, sealedInTarget, droppedInTarget, checkpoint) } // MockShardDelegator_SyncTargetVersion_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SyncTargetVersion' @@ -952,17 +952,18 @@ type MockShardDelegator_SyncTargetVersion_Call struct { // SyncTargetVersion is a helper method to define mock.On call // - newVersion int64 +// - partitions []int64 // - growingInTarget []int64 // - sealedInTarget []int64 // - droppedInTarget []int64 // - checkpoint *msgpb.MsgPosition -func (_e *MockShardDelegator_Expecter) SyncTargetVersion(newVersion interface{}, growingInTarget interface{}, sealedInTarget interface{}, droppedInTarget interface{}, checkpoint interface{}) *MockShardDelegator_SyncTargetVersion_Call { - return &MockShardDelegator_SyncTargetVersion_Call{Call: _e.mock.On("SyncTargetVersion", newVersion, growingInTarget, sealedInTarget, droppedInTarget, checkpoint)} +func (_e *MockShardDelegator_Expecter) SyncTargetVersion(newVersion interface{}, partitions interface{}, growingInTarget interface{}, sealedInTarget interface{}, droppedInTarget interface{}, checkpoint interface{}) *MockShardDelegator_SyncTargetVersion_Call { + return &MockShardDelegator_SyncTargetVersion_Call{Call: _e.mock.On("SyncTargetVersion", newVersion, partitions, growingInTarget, sealedInTarget, droppedInTarget, checkpoint)} } -func (_c *MockShardDelegator_SyncTargetVersion_Call) Run(run func(newVersion int64, growingInTarget []int64, sealedInTarget 
[]int64, droppedInTarget []int64, checkpoint *msgpb.MsgPosition)) *MockShardDelegator_SyncTargetVersion_Call { +func (_c *MockShardDelegator_SyncTargetVersion_Call) Run(run func(newVersion int64, partitions []int64, growingInTarget []int64, sealedInTarget []int64, droppedInTarget []int64, checkpoint *msgpb.MsgPosition)) *MockShardDelegator_SyncTargetVersion_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(int64), args[1].([]int64), args[2].([]int64), args[3].([]int64), args[4].(*msgpb.MsgPosition)) + run(args[0].(int64), args[1].([]int64), args[2].([]int64), args[3].([]int64), args[4].([]int64), args[5].(*msgpb.MsgPosition)) }) return _c } @@ -972,7 +973,7 @@ func (_c *MockShardDelegator_SyncTargetVersion_Call) Return() *MockShardDelegato return _c } -func (_c *MockShardDelegator_SyncTargetVersion_Call) RunAndReturn(run func(int64, []int64, []int64, []int64, *msgpb.MsgPosition)) *MockShardDelegator_SyncTargetVersion_Call { +func (_c *MockShardDelegator_SyncTargetVersion_Call) RunAndReturn(run func(int64, []int64, []int64, []int64, []int64, *msgpb.MsgPosition)) *MockShardDelegator_SyncTargetVersion_Call { _c.Call.Return(run) return _c } diff --git a/internal/querynodev2/delegator/snapshot.go b/internal/querynodev2/delegator/snapshot.go index c48965b6b4518..ae6e88e423d1c 100644 --- a/internal/querynodev2/delegator/snapshot.go +++ b/internal/querynodev2/delegator/snapshot.go @@ -23,6 +23,7 @@ import ( "go.uber.org/atomic" "github.com/milvus-io/milvus/pkg/util/funcutil" + "github.com/milvus-io/milvus/pkg/util/typeutil" ) // SnapshotItem group segmentEntry slice @@ -36,6 +37,7 @@ type snapshotCleanup func() // snapshot records segment distribution with ref count. type snapshot struct { + partitions typeutil.Set[int64] dist []SnapshotItem growing []SegmentEntry targetVersion int64 @@ -60,6 +62,7 @@ type snapshot struct { // NewSnapshot returns a prepared snapshot with channel initialized. 
func NewSnapshot(sealed []SnapshotItem, growing []SegmentEntry, last *snapshot, version int64, targetVersion int64) *snapshot { return &snapshot{ + partitions: typeutil.NewSet[int64](), version: version, growing: growing, dist: sealed, diff --git a/internal/querynodev2/segments/validate.go b/internal/querynodev2/segments/validate.go index c421bc1129744..68fa8adf75f9e 100644 --- a/internal/querynodev2/segments/validate.go +++ b/internal/querynodev2/segments/validate.go @@ -18,45 +18,20 @@ package segments import ( "context" - "fmt" - "github.com/cockroachdb/errors" "go.uber.org/zap" - "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/pkg/log" - "github.com/milvus-io/milvus/pkg/util/funcutil" "github.com/milvus-io/milvus/pkg/util/merr" ) func validate(ctx context.Context, manager *Manager, collectionID int64, partitionIDs []int64, segmentIDs []int64, segmentFilter SegmentFilter) ([]Segment, error) { - var searchPartIDs []int64 - collection := manager.Collection.Get(collectionID) if collection == nil { return nil, merr.WrapErrCollectionNotFound(collectionID) } - // validate partition - // no partition id specified, get all partition ids in collection - if len(partitionIDs) == 0 { - searchPartIDs = collection.GetPartitions() - } else { - // use request partition ids directly, ignoring meta partition ids - // partitions shall be controlled by delegator distribution - searchPartIDs = partitionIDs - } - - log.Ctx(ctx).Debug("read target partitions", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", searchPartIDs)) - - // all partitions have been released - if len(searchPartIDs) == 0 && collection.GetLoadType() == querypb.LoadType_LoadPartition { - return nil, errors.Newf("partitions have been released , collectionID = %d target partitionIDs = %v", collectionID, searchPartIDs) - } - - if len(searchPartIDs) == 0 && collection.GetLoadType() == querypb.LoadType_LoadCollection { - return []Segment{}, nil - } + log.Ctx(ctx).Debug("read target partitions", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIDs)) // validate segment segments := make([]Segment, 0, len(segmentIDs)) @@ -67,23 +42,18 @@ func validate(ctx context.Context, manager *Manager, collectionID int64, partiti } }() if len(segmentIDs) == 0 { - for _, partID := range searchPartIDs { - segments, err = manager.Segment.GetAndPinBy(WithPartition(partID), segmentFilter) - if err != nil { - return nil, err - } + // legacy logic + segments, err = manager.Segment.GetAndPinBy(segmentFilter, SegmentFilterFunc(func(s Segment) bool { + return s.Collection() == collectionID + })) + if err != nil { + return nil, err } } else { segments, err = manager.Segment.GetAndPin(segmentIDs, segmentFilter) if err != nil { return nil, err } - for _, segment := range segments { - if !funcutil.SliceContain(searchPartIDs, segment.Partition()) { - err = fmt.Errorf("segment %d belongs to partition %d, which is not in %v", segment.ID(), segment.Partition(), searchPartIDs) - return nil, err - } - } } return segments, nil } diff --git a/internal/querynodev2/services.go b/internal/querynodev2/services.go index 734d8e4d717c6..35a1b4fa89878 100644 --- a/internal/querynodev2/services.go +++ b/internal/querynodev2/services.go @@ -1292,7 +1292,7 @@ func (node *QueryNode) SyncDistribution(ctx context.Context, req *querypb.SyncDi }) }) case querypb.SyncType_UpdateVersion: - log.Info("sync action", zap.Int64("TargetVersion", action.GetTargetVersion())) + log.Info("sync action", zap.Int64("TargetVersion", 
action.GetTargetVersion()), zap.Int64s("partitions", req.GetLoadMeta().GetPartitionIDs())) droppedInfos := lo.SliceToMap(action.GetDroppedInTarget(), func(id int64) (int64, uint64) { if action.GetCheckpoint() == nil { return id, typeutil.MaxTimestamp @@ -1307,7 +1307,7 @@ func (node *QueryNode) SyncDistribution(ctx context.Context, req *querypb.SyncDi return id, action.GetCheckpoint().Timestamp }) shardDelegator.AddExcludedSegments(flushedInfo) - shardDelegator.SyncTargetVersion(action.GetTargetVersion(), action.GetGrowingInTarget(), + shardDelegator.SyncTargetVersion(action.GetTargetVersion(), req.GetLoadMeta().GetPartitionIDs(), action.GetGrowingInTarget(), action.GetSealedInTarget(), action.GetDroppedInTarget(), action.GetCheckpoint()) case querypb.SyncType_UpdatePartitionStats: log.Info("sync update partition stats versions") diff --git a/internal/querynodev2/services_test.go b/internal/querynodev2/services_test.go index 075290bf0543c..98ca580086f82 100644 --- a/internal/querynodev2/services_test.go +++ b/internal/querynodev2/services_test.go @@ -1162,6 +1162,20 @@ func (suite *ServiceSuite) TestGetSegmentInfo_Failed() { suite.Equal(commonpb.ErrorCode_NotReadyServe, rsp.GetStatus().GetErrorCode()) } +func (suite *ServiceSuite) syncDistribution(ctx context.Context) { + suite.node.SyncDistribution(ctx, &querypb.SyncDistributionRequest{ + Channel: suite.vchannel, + CollectionID: suite.collectionID, + LoadMeta: &querypb.LoadMetaInfo{ + CollectionID: suite.collectionID, + PartitionIDs: suite.partitionIDs, + }, + Actions: []*querypb.SyncAction{ + {Type: querypb.SyncType_UpdateVersion, SealedInTarget: suite.validSegmentIDs, TargetVersion: time.Now().UnixNano()}, + }, + }) +} + // Test Search func (suite *ServiceSuite) genCSearchRequest(nq int64, dataType schemapb.DataType, fieldID int64, metricType string, isTopkReduce bool) (*internalpb.SearchRequest, error) { placeHolder, err := genPlaceHolderGroup(nq) @@ -1196,6 +1210,7 @@ func (suite *ServiceSuite) TestSearch_Normal() { // pre suite.TestWatchDmChannelsInt64() suite.TestLoadSegments_Int64() + suite.syncDistribution(ctx) creq, err := suite.genCSearchRequest(10, schemapb.DataType_FloatVector, 107, defaultMetricType, false) req := &querypb.SearchRequest{ @@ -1216,6 +1231,7 @@ func (suite *ServiceSuite) TestSearch_Concurrent() { // pre suite.TestWatchDmChannelsInt64() suite.TestLoadSegments_Int64() + suite.syncDistribution(ctx) concurrency := 16 futures := make([]*conc.Future[*internalpb.SearchResults], 0, concurrency) @@ -1278,6 +1294,7 @@ func (suite *ServiceSuite) TestSearch_Failed() { suite.TestWatchDmChannelsInt64() suite.TestLoadSegments_Int64() + // suite.syncDistribution(ctx) // sync segment data syncReq := &querypb.SyncDistributionRequest{ @@ -1287,6 +1304,10 @@ func (suite *ServiceSuite) TestSearch_Failed() { }, CollectionID: suite.collectionID, Channel: suite.vchannel, + LoadMeta: &querypb.LoadMetaInfo{ + CollectionID: suite.collectionID, + PartitionIDs: suite.partitionIDs, + }, } syncVersionAction := &querypb.SyncAction{ @@ -1458,6 +1479,7 @@ func (suite *ServiceSuite) TestQuery_Normal() { // pre suite.TestWatchDmChannelsInt64() suite.TestLoadSegments_Int64() + suite.syncDistribution(ctx) // data schema := mock_segcore.GenTestCollectionSchema(suite.collectionName, schemapb.DataType_Int64, false) @@ -1539,6 +1561,7 @@ func (suite *ServiceSuite) TestQueryStream_Normal() { // prepare suite.TestWatchDmChannelsInt64() suite.TestLoadSegments_Int64() + suite.syncDistribution(ctx) // data schema := 
mock_segcore.GenTestCollectionSchema(suite.collectionName, schemapb.DataType_Int64, false)
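
Reviewer note: the delegator-side change is easier to follow with the data flow spelled out. The per-request partition filtering that used to live in getTargetPartitions and in segments/validate is removed; instead, each snapshot produced by genSnapshot carries the working partition set handed down through SyncTargetVersion, entries outside that set keep their place in the distribution but are tagged with unreadableTargetVersion, and PinReadableSegments rejects requests that name an unloaded partition (merr.WrapErrPartitionNotLoaded). The Go program below is a minimal, self-contained sketch of that gating; segmentEntry, snapshotView, genSnapshot, and pinReadable are simplified stand-ins for illustration only, not the actual Milvus types or APIs.

package main

import "fmt"

// Sentinel meaning "present in the distribution but not readable right now";
// a stand-in for the delegator's unreadableTargetVersion constant.
const unreadableTargetVersion int64 = -1

type segmentEntry struct {
	SegmentID     int64
	PartitionID   int64
	TargetVersion int64
}

// snapshotView is a toy counterpart of the delegator snapshot: it remembers
// the working partition set that was in effect when it was generated.
type snapshotView struct {
	partitions map[int64]struct{}
	segments   []segmentEntry
}

// genSnapshot mimics the partition-scoped snapshot generation: segments whose
// partition is outside the working set keep their entries but are tagged
// unreadable instead of being dropped.
func genSnapshot(all []segmentEntry, workingPartitions []int64, targetVersion int64) snapshotView {
	working := make(map[int64]struct{}, len(workingPartitions))
	for _, p := range workingPartitions {
		working[p] = struct{}{}
	}
	segments := make([]segmentEntry, 0, len(all))
	for _, entry := range all {
		if _, ok := working[entry.PartitionID]; ok {
			entry.TargetVersion = targetVersion
		} else {
			entry.TargetVersion = unreadableTargetVersion
		}
		segments = append(segments, entry)
	}
	return snapshotView{partitions: working, segments: segments}
}

// pinReadable mimics the sanity check added to PinReadableSegments: naming a
// partition outside the working set fails fast; otherwise only readable
// entries are returned.
func (s snapshotView) pinReadable(reqPartitions ...int64) ([]segmentEntry, error) {
	for _, p := range reqPartitions {
		if _, ok := s.partitions[p]; !ok {
			return nil, fmt.Errorf("partition %d is not loaded", p)
		}
	}
	readable := make([]segmentEntry, 0, len(s.segments))
	for _, entry := range s.segments {
		if entry.TargetVersion != unreadableTargetVersion {
			readable = append(readable, entry)
		}
	}
	return readable, nil
}

func main() {
	all := []segmentEntry{
		{SegmentID: 100, PartitionID: 1},
		{SegmentID: 101, PartitionID: 1},
		{SegmentID: 102, PartitionID: 2}, // partition 2 was released
	}
	snap := genSnapshot(all, []int64{1}, 1000)

	readable, err := snap.pinReadable(1)
	fmt.Println(readable, err) // segments 100 and 101, <nil>

	_, err = snap.pinReadable(2)
	fmt.Println(err) // partition 2 is not loaded
}

Against this toy data, pinning partition 1 returns only its segments while pinning the released partition 2 fails fast, which mirrors the behavior exercised by the updated TestAddGrowing, TestRemoveDistribution, and Test_SyncTargetVersion cases above.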