diff --git a/cmd/tools/migration/mmap/tool/main.go b/cmd/tools/migration/mmap/tool/main.go index 8975ffe59ef54..a9957a6ea4a49 100644 --- a/cmd/tools/migration/mmap/tool/main.go +++ b/cmd/tools/migration/mmap/tool/main.go @@ -134,7 +134,7 @@ func prepareRootCoordMeta(ctx context.Context, allocator tso.Allocator) rootcoor if ss, err = kvmetestore.NewSuffixSnapshot(metaKV, kvmetestore.SnapshotsSep, paramtable.Get().EtcdCfg.MetaRootPath.GetValue(), kvmetestore.SnapshotPrefix); err != nil { panic(err) } - catalog = &kvmetestore.Catalog{Txn: metaKV, Snapshot: ss} + catalog = kvmetestore.NewCatalog(metaKV, ss) case util.MetaStoreTypeTiKV: log.Info("Using tikv as meta storage.") var metaKV kv.MetaKv @@ -148,7 +148,7 @@ func prepareRootCoordMeta(ctx context.Context, allocator tso.Allocator) rootcoor if ss, err = kvmetestore.NewSuffixSnapshot(metaKV, kvmetestore.SnapshotsSep, paramtable.Get().TiKVCfg.MetaRootPath.GetValue(), kvmetestore.SnapshotPrefix); err != nil { panic(err) } - catalog = &kvmetestore.Catalog{Txn: metaKV, Snapshot: ss} + catalog = kvmetestore.NewCatalog(metaKV, ss) default: panic(fmt.Sprintf("MetaStoreType %s not supported", paramtable.Get().MetaStoreCfg.MetaStoreType.GetValue())) } diff --git a/internal/datacoord/broker/coordinator_broker.go b/internal/datacoord/broker/coordinator_broker.go index 7f079be5f631a..5536618e58fb4 100644 --- a/internal/datacoord/broker/coordinator_broker.go +++ b/internal/datacoord/broker/coordinator_broker.go @@ -25,6 +25,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" + "github.com/milvus-io/milvus/internal/proto/rootcoordpb" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/commonpbutil" @@ -37,6 +38,7 @@ type Broker interface { DescribeCollectionInternal(ctx context.Context, collectionID int64) (*milvuspb.DescribeCollectionResponse, error) ShowPartitionsInternal(ctx context.Context, collectionID int64) ([]int64, error) ShowCollections(ctx context.Context, dbName string) (*milvuspb.ShowCollectionsResponse, error) + ShowCollectionsInternal(ctx context.Context) (*rootcoordpb.ShowCollectionsInternalResponse, error) ListDatabases(ctx context.Context) (*milvuspb.ListDatabasesResponse, error) HasCollection(ctx context.Context, collectionID int64) (bool, error) } @@ -116,6 +118,23 @@ func (b *coordinatorBroker) ShowCollections(ctx context.Context, dbName string) return resp, nil } +func (b *coordinatorBroker) ShowCollectionsInternal(ctx context.Context) (*rootcoordpb.ShowCollectionsInternalResponse, error) { + ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.BrokerTimeout.GetAsDuration(time.Millisecond)) + defer cancel() + resp, err := b.rootCoord.ShowCollectionsInternal(ctx, &rootcoordpb.ShowCollectionsInternalRequest{ + Base: commonpbutil.NewMsgBase( + commonpbutil.WithMsgType(commonpb.MsgType_ShowCollections), + ), + }) + + if err = merr.CheckRPCCall(resp, err); err != nil { + log.Warn("ShowCollectionsInternal failed", zap.Error(err)) + return nil, err + } + + return resp, nil +} + func (b *coordinatorBroker) ListDatabases(ctx context.Context) (*milvuspb.ListDatabasesResponse, error) { ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.BrokerTimeout.GetAsDuration(time.Millisecond)) defer cancel() diff --git a/internal/datacoord/broker/mock_coordinator_broker.go b/internal/datacoord/broker/mock_coordinator_broker.go index c952eba15b9bb..1ed21a129c7ad 100644 --- a/internal/datacoord/broker/mock_coordinator_broker.go +++ b/internal/datacoord/broker/mock_coordinator_broker.go @@ -7,6 +7,8 @@ import ( milvuspb "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" mock "github.com/stretchr/testify/mock" + + rootcoordpb "github.com/milvus-io/milvus/internal/proto/rootcoordpb" ) // MockBroker is an autogenerated mock type for the Broker type @@ -239,6 +241,64 @@ func (_c *MockBroker_ShowCollections_Call) RunAndReturn(run func(context.Context return _c } +// ShowCollectionsInternal provides a mock function with given fields: ctx +func (_m *MockBroker) ShowCollectionsInternal(ctx context.Context) (*rootcoordpb.ShowCollectionsInternalResponse, error) { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for ShowCollectionsInternal") + } + + var r0 *rootcoordpb.ShowCollectionsInternalResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) (*rootcoordpb.ShowCollectionsInternalResponse, error)); ok { + return rf(ctx) + } + if rf, ok := ret.Get(0).(func(context.Context) *rootcoordpb.ShowCollectionsInternalResponse); ok { + r0 = rf(ctx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*rootcoordpb.ShowCollectionsInternalResponse) + } + } + + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockBroker_ShowCollectionsInternal_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ShowCollectionsInternal' +type MockBroker_ShowCollectionsInternal_Call struct { + *mock.Call +} + +// ShowCollectionsInternal is a helper method to define mock.On call +// - ctx context.Context +func (_e *MockBroker_Expecter) ShowCollectionsInternal(ctx interface{}) *MockBroker_ShowCollectionsInternal_Call { + return &MockBroker_ShowCollectionsInternal_Call{Call: _e.mock.On("ShowCollectionsInternal", ctx)} +} + +func (_c *MockBroker_ShowCollectionsInternal_Call) Run(run func(ctx context.Context)) *MockBroker_ShowCollectionsInternal_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context)) + }) + return _c +} + +func (_c *MockBroker_ShowCollectionsInternal_Call) Return(_a0 *rootcoordpb.ShowCollectionsInternalResponse, _a1 error) *MockBroker_ShowCollectionsInternal_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockBroker_ShowCollectionsInternal_Call) RunAndReturn(run func(context.Context) (*rootcoordpb.ShowCollectionsInternalResponse, error)) *MockBroker_ShowCollectionsInternal_Call { + _c.Call.Return(run) + return _c +} + // ShowPartitionsInternal provides a mock function with given fields: ctx, collectionID func (_m *MockBroker) ShowPartitionsInternal(ctx context.Context, collectionID int64) ([]int64, error) { ret := _m.Called(ctx, collectionID) diff --git a/internal/datacoord/compaction_task_clustering_test.go b/internal/datacoord/compaction_task_clustering_test.go index 8072b270e80ff..0399d783cbad3 100644 --- a/internal/datacoord/compaction_task_clustering_test.go +++ b/internal/datacoord/compaction_task_clustering_test.go @@ -57,7 +57,9 @@ func (s *ClusteringCompactionTaskSuite) SetupTest() { ctx := context.Background() cm := storage.NewLocalChunkManager(storage.RootPath("")) catalog := datacoord.NewCatalog(NewMetaMemoryKV(), "", "") - meta, err := newMeta(ctx, catalog, cm) + broker := broker.NewMockBroker(s.T()) + broker.EXPECT().ShowCollectionsInternal(mock.Anything).Return(nil, nil) + meta, err := newMeta(ctx, catalog, cm, broker) s.NoError(err) s.meta = meta diff --git a/internal/datacoord/garbage_collector_test.go b/internal/datacoord/garbage_collector_test.go index 0a360a4ddb3a0..81df666915dae 100644 --- a/internal/datacoord/garbage_collector_test.go +++ b/internal/datacoord/garbage_collector_test.go @@ -63,7 +63,7 @@ func Test_garbageCollector_basic(t *testing.T) { cli, _, _, _, _, err := initUtOSSEnv(bucketName, rootPath, 0) require.NoError(t, err) - meta, err := newMemoryMeta() + meta, err := newMemoryMeta(t) assert.NoError(t, err) t.Run("normal gc", func(t *testing.T) { @@ -118,7 +118,7 @@ func Test_garbageCollector_scan(t *testing.T) { cli, inserts, stats, delta, others, err := initUtOSSEnv(bucketName, rootPath, 4) require.NoError(t, err) - meta, err := newMemoryMeta() + meta, err := newMemoryMeta(t) assert.NoError(t, err) t.Run("key is reference", func(t *testing.T) { @@ -1604,7 +1604,7 @@ func (s *GarbageCollectorSuite) SetupTest() { s.cli, s.inserts, s.stats, s.delta, s.others, err = initUtOSSEnv(s.bucketName, s.rootPath, 4) s.Require().NoError(err) - s.meta, err = newMemoryMeta() + s.meta, err = newMemoryMeta(s.T()) s.Require().NoError(err) } diff --git a/internal/datacoord/import_checker_test.go b/internal/datacoord/import_checker_test.go index 8197dd23ff6ec..7458cc7f32933 100644 --- a/internal/datacoord/import_checker_test.go +++ b/internal/datacoord/import_checker_test.go @@ -49,7 +49,7 @@ func (s *ImportCheckerSuite) SetupTest() { catalog.EXPECT().ListImportJobs().Return(nil, nil) catalog.EXPECT().ListPreImportTasks().Return(nil, nil) catalog.EXPECT().ListImportTasks().Return(nil, nil) - catalog.EXPECT().ListSegments(mock.Anything).Return(nil, nil) + catalog.EXPECT().ListSegments(mock.Anything, mock.Anything).Return(nil, nil) catalog.EXPECT().ListChannelCheckpoint(mock.Anything).Return(nil, nil) catalog.EXPECT().ListIndexes(mock.Anything).Return(nil, nil) catalog.EXPECT().ListSegmentIndexes(mock.Anything).Return(nil, nil) @@ -64,10 +64,11 @@ func (s *ImportCheckerSuite) SetupTest() { s.NoError(err) s.imeta = imeta - meta, err := newMeta(context.TODO(), catalog, nil) - s.NoError(err) - broker := broker2.NewMockBroker(s.T()) + broker.EXPECT().ShowCollectionsInternal(mock.Anything).Return(nil, nil) + + meta, err := newMeta(context.TODO(), catalog, nil, broker) + s.NoError(err) checker := NewImportChecker(meta, broker, cluster, alloc, imeta).(*importChecker) s.checker = checker diff --git a/internal/datacoord/import_scheduler_test.go b/internal/datacoord/import_scheduler_test.go index 79eeb2c10732c..2da6f291e6407 100644 --- a/internal/datacoord/import_scheduler_test.go +++ b/internal/datacoord/import_scheduler_test.go @@ -54,7 +54,7 @@ func (s *ImportSchedulerSuite) SetupTest() { s.catalog.EXPECT().ListImportJobs().Return(nil, nil) s.catalog.EXPECT().ListPreImportTasks().Return(nil, nil) s.catalog.EXPECT().ListImportTasks().Return(nil, nil) - s.catalog.EXPECT().ListSegments(mock.Anything).Return(nil, nil) + s.catalog.EXPECT().ListSegments(mock.Anything, mock.Anything).Return(nil, nil) s.catalog.EXPECT().ListChannelCheckpoint(mock.Anything).Return(nil, nil) s.catalog.EXPECT().ListIndexes(mock.Anything).Return(nil, nil) s.catalog.EXPECT().ListSegmentIndexes(mock.Anything).Return(nil, nil) @@ -64,7 +64,9 @@ func (s *ImportSchedulerSuite) SetupTest() { s.cluster = NewMockCluster(s.T()) s.alloc = NewNMockAllocator(s.T()) - s.meta, err = newMeta(context.TODO(), s.catalog, nil) + broker := broker.NewMockBroker(s.T()) + broker.EXPECT().ShowCollectionsInternal(mock.Anything).Return(nil, nil) + s.meta, err = newMeta(context.TODO(), s.catalog, nil, broker) s.NoError(err) s.meta.AddCollection(&collectionInfo{ ID: s.collectionID, diff --git a/internal/datacoord/import_util_test.go b/internal/datacoord/import_util_test.go index 72782e7709dff..7f06353a08eb5 100644 --- a/internal/datacoord/import_util_test.go +++ b/internal/datacoord/import_util_test.go @@ -100,7 +100,7 @@ func TestImportUtil_NewImportTasks(t *testing.T) { alloc.EXPECT().allocTimestamp(mock.Anything).Return(rand.Uint64(), nil) catalog := mocks.NewDataCoordCatalog(t) - catalog.EXPECT().ListSegments(mock.Anything).Return(nil, nil) + catalog.EXPECT().ListSegments(mock.Anything, mock.Anything).Return(nil, nil) catalog.EXPECT().ListChannelCheckpoint(mock.Anything).Return(nil, nil) catalog.EXPECT().ListIndexes(mock.Anything).Return(nil, nil) catalog.EXPECT().ListSegmentIndexes(mock.Anything).Return(nil, nil) @@ -109,7 +109,9 @@ func TestImportUtil_NewImportTasks(t *testing.T) { catalog.EXPECT().ListCompactionTask(mock.Anything).Return(nil, nil) catalog.EXPECT().ListPartitionStatsInfos(mock.Anything).Return(nil, nil) - meta, err := newMeta(context.TODO(), catalog, nil) + broker := broker.NewMockBroker(t) + broker.EXPECT().ShowCollectionsInternal(mock.Anything).Return(nil, nil) + meta, err := newMeta(context.TODO(), catalog, nil, broker) assert.NoError(t, err) tasks, err := NewImportTasks(fileGroups, job, alloc, meta) @@ -151,7 +153,7 @@ func TestImportUtil_AssembleRequest(t *testing.T) { } catalog := mocks.NewDataCoordCatalog(t) - catalog.EXPECT().ListSegments(mock.Anything).Return(nil, nil) + catalog.EXPECT().ListSegments(mock.Anything, mock.Anything).Return(nil, nil) catalog.EXPECT().ListChannelCheckpoint(mock.Anything).Return(nil, nil) catalog.EXPECT().ListIndexes(mock.Anything).Return(nil, nil) catalog.EXPECT().ListSegmentIndexes(mock.Anything).Return(nil, nil) @@ -167,7 +169,9 @@ func TestImportUtil_AssembleRequest(t *testing.T) { }) alloc.EXPECT().allocTimestamp(mock.Anything).Return(800, nil) - meta, err := newMeta(context.TODO(), catalog, nil) + broker := broker.NewMockBroker(t) + broker.EXPECT().ShowCollectionsInternal(mock.Anything).Return(nil, nil) + meta, err := newMeta(context.TODO(), catalog, nil, broker) assert.NoError(t, err) segment := &SegmentInfo{ SegmentInfo: &datapb.SegmentInfo{ID: 5, IsImporting: true}, @@ -236,7 +240,7 @@ func TestImportUtil_CheckDiskQuota(t *testing.T) { catalog.EXPECT().SavePreImportTask(mock.Anything).Return(nil) catalog.EXPECT().ListIndexes(mock.Anything).Return(nil, nil) catalog.EXPECT().ListSegmentIndexes(mock.Anything).Return(nil, nil) - catalog.EXPECT().ListSegments(mock.Anything).Return(nil, nil) + catalog.EXPECT().ListSegments(mock.Anything, mock.Anything).Return(nil, nil) catalog.EXPECT().ListChannelCheckpoint(mock.Anything).Return(nil, nil) catalog.EXPECT().AddSegment(mock.Anything, mock.Anything).Return(nil) catalog.EXPECT().ListAnalyzeTasks(mock.Anything).Return(nil, nil) @@ -246,7 +250,9 @@ func TestImportUtil_CheckDiskQuota(t *testing.T) { imeta, err := NewImportMeta(catalog) assert.NoError(t, err) - meta, err := newMeta(context.TODO(), catalog, nil) + broker := broker.NewMockBroker(t) + broker.EXPECT().ShowCollectionsInternal(mock.Anything).Return(nil, nil) + meta, err := newMeta(context.TODO(), catalog, nil, broker) assert.NoError(t, err) job := &importJob{ @@ -415,7 +421,7 @@ func TestImportUtil_GetImportProgress(t *testing.T) { catalog.EXPECT().ListImportJobs().Return(nil, nil) catalog.EXPECT().ListPreImportTasks().Return(nil, nil) catalog.EXPECT().ListImportTasks().Return(nil, nil) - catalog.EXPECT().ListSegments(mock.Anything).Return(nil, nil) + catalog.EXPECT().ListSegments(mock.Anything, mock.Anything).Return(nil, nil) catalog.EXPECT().ListChannelCheckpoint(mock.Anything).Return(nil, nil) catalog.EXPECT().ListIndexes(mock.Anything).Return(nil, nil) catalog.EXPECT().ListSegmentIndexes(mock.Anything).Return(nil, nil) @@ -431,7 +437,9 @@ func TestImportUtil_GetImportProgress(t *testing.T) { imeta, err := NewImportMeta(catalog) assert.NoError(t, err) - meta, err := newMeta(context.TODO(), catalog, nil) + broker := broker.NewMockBroker(t) + broker.EXPECT().ShowCollectionsInternal(mock.Anything).Return(nil, nil) + meta, err := newMeta(context.TODO(), catalog, nil, broker) assert.NoError(t, err) file1 := &internalpb.ImportFile{ diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index 961d37aa7ad7d..cb16bbddaddce 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -42,6 +42,7 @@ import ( "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" + "github.com/milvus-io/milvus/pkg/util/conc" "github.com/milvus-io/milvus/pkg/util/funcutil" "github.com/milvus-io/milvus/pkg/util/lock" "github.com/milvus-io/milvus/pkg/util/merr" @@ -137,7 +138,7 @@ type collectionInfo struct { } // NewMeta creates meta from provided `kv.TxnKV` -func newMeta(ctx context.Context, catalog metastore.DataCoordCatalog, chunkManager storage.ChunkManager) (*meta, error) { +func newMeta(ctx context.Context, catalog metastore.DataCoordCatalog, chunkManager storage.ChunkManager, broker broker.Broker) (*meta, error) { im, err := newIndexMeta(ctx, catalog) if err != nil { return nil, err @@ -169,7 +170,7 @@ func newMeta(ctx context.Context, catalog metastore.DataCoordCatalog, chunkManag partitionStatsMeta: psm, compactionTaskMeta: ctm, } - err = mt.reloadFromKV() + err = mt.reloadFromKV(broker) if err != nil { return nil, err } @@ -177,39 +178,73 @@ func newMeta(ctx context.Context, catalog metastore.DataCoordCatalog, chunkManag } // reloadFromKV loads meta from KV storage -func (m *meta) reloadFromKV() error { +func (m *meta) reloadFromKV(broker broker.Broker) error { record := timerecord.NewTimeRecorder("datacoord") - segments, err := m.catalog.ListSegments(m.ctx) + + resp, err := broker.ShowCollectionsInternal(m.ctx) if err != nil { return err } - metrics.DataCoordNumCollections.WithLabelValues().Set(0) - metrics.DataCoordNumSegments.Reset() - numStoredRows := int64(0) - for _, segment := range segments { - // segments from catalog.ListSegments will not have logPath - m.segments.SetSegment(segment.ID, NewSegmentInfo(segment)) - metrics.DataCoordNumSegments.WithLabelValues(segment.GetState().String(), segment.GetLevel().String()).Inc() - if segment.State == commonpb.SegmentState_Flushed { - numStoredRows += segment.NumOfRows - - insertFileNum := 0 - for _, fieldBinlog := range segment.GetBinlogs() { - insertFileNum += len(fieldBinlog.GetBinlogs()) - } - metrics.FlushedSegmentFileNum.WithLabelValues(metrics.InsertFileLabel).Observe(float64(insertFileNum)) + log.Info("datacoord show collections done", zap.Duration("dur", record.RecordSpan())) + + collectionIDs := make([]int64, 0, 4096) + for _, collections := range resp.GetDbCollections() { + collectionIDs = append(collectionIDs, collections.GetCollectionIDs()...) + } - statFileNum := 0 - for _, fieldBinlog := range segment.GetStatslogs() { - statFileNum += len(fieldBinlog.GetBinlogs()) + pool := conc.NewPool[any](paramtable.Get().MetaStoreCfg.ReadConcurrency.GetAsInt()) + futures := make([]*conc.Future[any], 0, len(collectionIDs)) + collectionSegments := make([][]*datapb.SegmentInfo, len(collectionIDs)) + for i, collectionID := range collectionIDs { + i := i + collectionID := collectionID + futures = append(futures, pool.Submit(func() (any, error) { + segments, err := m.catalog.ListSegments(m.ctx, collectionID) + if err != nil { + return nil, err } - metrics.FlushedSegmentFileNum.WithLabelValues(metrics.StatFileLabel).Observe(float64(statFileNum)) + collectionSegments[i] = segments + return nil, nil + })) + } + err = conc.AwaitAll(futures...) + if err != nil { + return err + } - deleteFileNum := 0 - for _, filedBinlog := range segment.GetDeltalogs() { - deleteFileNum += len(filedBinlog.GetBinlogs()) + log.Info("datacoord show segments done", zap.Duration("dur", record.RecordSpan())) + + metrics.DataCoordNumCollections.WithLabelValues().Set(0) + metrics.DataCoordNumSegments.Reset() + numStoredRows := int64(0) + numSegments := 0 + for _, segments := range collectionSegments { + numSegments += len(segments) + for _, segment := range segments { + // segments from catalog.ListSegments will not have logPath + m.segments.SetSegment(segment.ID, NewSegmentInfo(segment)) + metrics.DataCoordNumSegments.WithLabelValues(segment.GetState().String(), segment.GetLevel().String()).Inc() + if segment.State == commonpb.SegmentState_Flushed { + numStoredRows += segment.NumOfRows + + insertFileNum := 0 + for _, fieldBinlog := range segment.GetBinlogs() { + insertFileNum += len(fieldBinlog.GetBinlogs()) + } + metrics.FlushedSegmentFileNum.WithLabelValues(metrics.InsertFileLabel).Observe(float64(insertFileNum)) + + statFileNum := 0 + for _, fieldBinlog := range segment.GetStatslogs() { + statFileNum += len(fieldBinlog.GetBinlogs()) + } + metrics.FlushedSegmentFileNum.WithLabelValues(metrics.StatFileLabel).Observe(float64(statFileNum)) + + deleteFileNum := 0 + for _, filedBinlog := range segment.GetDeltalogs() { + deleteFileNum += len(filedBinlog.GetBinlogs()) + } + metrics.FlushedSegmentFileNum.WithLabelValues(metrics.DeleteFileLabel).Observe(float64(deleteFileNum)) } - metrics.FlushedSegmentFileNum.WithLabelValues(metrics.DeleteFileLabel).Observe(float64(deleteFileNum)) } } @@ -226,7 +261,7 @@ func (m *meta) reloadFromKV() error { Set(float64(ts.Unix())) } - log.Info("DataCoord meta reloadFromKV done", zap.Duration("duration", record.ElapseSpan())) + log.Info("DataCoord meta reloadFromKV done", zap.Int("numSegments", numSegments), zap.Duration("duration", record.ElapseSpan())) return nil } diff --git a/internal/datacoord/meta_test.go b/internal/datacoord/meta_test.go index 2f575ff67b589..735f52f47c982 100644 --- a/internal/datacoord/meta_test.go +++ b/internal/datacoord/meta_test.go @@ -38,6 +38,7 @@ import ( "github.com/milvus-io/milvus/internal/metastore/model" "github.com/milvus-io/milvus/internal/mocks" "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/internal/proto/rootcoordpb" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/kv" "github.com/milvus-io/milvus/pkg/metrics" @@ -67,23 +68,35 @@ func (suite *MetaReloadSuite) resetMock() { func (suite *MetaReloadSuite) TestReloadFromKV() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + suite.Run("ListSegments_fail", func() { defer suite.resetMock() - suite.catalog.EXPECT().ListSegments(mock.Anything).Return(nil, errors.New("mock")) + brk := broker.NewMockBroker(suite.T()) + brk.EXPECT().ShowCollectionsInternal(mock.Anything).Return(&rootcoordpb.ShowCollectionsInternalResponse{ + Status: merr.Success(), + DbCollections: []*rootcoordpb.DBCollections{ + { + DbName: "db_1", + CollectionIDs: []int64{100}, + }, + }, + }, nil) + suite.catalog.EXPECT().ListSegments(mock.Anything, mock.Anything).Return(nil, errors.New("mock")) suite.catalog.EXPECT().ListIndexes(mock.Anything).Return([]*model.Index{}, nil) suite.catalog.EXPECT().ListSegmentIndexes(mock.Anything).Return([]*model.SegmentIndex{}, nil) suite.catalog.EXPECT().ListAnalyzeTasks(mock.Anything).Return(nil, nil) suite.catalog.EXPECT().ListCompactionTask(mock.Anything).Return(nil, nil) suite.catalog.EXPECT().ListPartitionStatsInfos(mock.Anything).Return(nil, nil) - _, err := newMeta(ctx, suite.catalog, nil) + _, err := newMeta(ctx, suite.catalog, nil, brk) suite.Error(err) }) suite.Run("ListChannelCheckpoint_fail", func() { defer suite.resetMock() - - suite.catalog.EXPECT().ListSegments(mock.Anything).Return([]*datapb.SegmentInfo{}, nil) + brk := broker.NewMockBroker(suite.T()) + brk.EXPECT().ShowCollectionsInternal(mock.Anything).Return(nil, nil) + suite.catalog.EXPECT().ListSegments(mock.Anything, mock.Anything).Return([]*datapb.SegmentInfo{}, nil) suite.catalog.EXPECT().ListChannelCheckpoint(mock.Anything).Return(nil, errors.New("mock")) suite.catalog.EXPECT().ListIndexes(mock.Anything).Return([]*model.Index{}, nil) suite.catalog.EXPECT().ListSegmentIndexes(mock.Anything).Return([]*model.SegmentIndex{}, nil) @@ -91,18 +104,28 @@ func (suite *MetaReloadSuite) TestReloadFromKV() { suite.catalog.EXPECT().ListCompactionTask(mock.Anything).Return(nil, nil) suite.catalog.EXPECT().ListPartitionStatsInfos(mock.Anything).Return(nil, nil) - _, err := newMeta(ctx, suite.catalog, nil) + _, err := newMeta(ctx, suite.catalog, nil, brk) suite.Error(err) }) suite.Run("ok", func() { defer suite.resetMock() + brk := broker.NewMockBroker(suite.T()) + brk.EXPECT().ShowCollectionsInternal(mock.Anything).Return(&rootcoordpb.ShowCollectionsInternalResponse{ + Status: merr.Success(), + DbCollections: []*rootcoordpb.DBCollections{ + { + DbName: "db_1", + CollectionIDs: []int64{1}, + }, + }, + }, nil) suite.catalog.EXPECT().ListIndexes(mock.Anything).Return([]*model.Index{}, nil) suite.catalog.EXPECT().ListSegmentIndexes(mock.Anything).Return([]*model.SegmentIndex{}, nil) suite.catalog.EXPECT().ListAnalyzeTasks(mock.Anything).Return(nil, nil) suite.catalog.EXPECT().ListCompactionTask(mock.Anything).Return(nil, nil) suite.catalog.EXPECT().ListPartitionStatsInfos(mock.Anything).Return(nil, nil) - suite.catalog.EXPECT().ListSegments(mock.Anything).Return([]*datapb.SegmentInfo{ + suite.catalog.EXPECT().ListSegments(mock.Anything, mock.Anything).Return([]*datapb.SegmentInfo{ { ID: 1, CollectionID: 1, @@ -118,11 +141,56 @@ func (suite *MetaReloadSuite) TestReloadFromKV() { }, }, nil) - _, err := newMeta(ctx, suite.catalog, nil) + _, err := newMeta(ctx, suite.catalog, nil, brk) suite.NoError(err) suite.MetricsEqual(metrics.DataCoordNumSegments.WithLabelValues(metrics.FlushedSegmentLabel, datapb.SegmentLevel_Legacy.String()), 1) }) + + suite.Run("test list segments", func() { + defer suite.resetMock() + brk := broker.NewMockBroker(suite.T()) + brk.EXPECT().ShowCollectionsInternal(mock.Anything).Return(&rootcoordpb.ShowCollectionsInternalResponse{ + Status: merr.Success(), + DbCollections: []*rootcoordpb.DBCollections{ + { + DbName: "db_1", + CollectionIDs: []int64{100, 101, 102}, + }, + { + DbName: "db_2", + CollectionIDs: []int64{200, 201, 202}, + }, + }, + }, nil) + + suite.catalog.EXPECT().ListIndexes(mock.Anything).Return([]*model.Index{}, nil) + suite.catalog.EXPECT().ListSegmentIndexes(mock.Anything).Return([]*model.SegmentIndex{}, nil) + suite.catalog.EXPECT().ListAnalyzeTasks(mock.Anything).Return(nil, nil) + suite.catalog.EXPECT().ListCompactionTask(mock.Anything).Return(nil, nil) + suite.catalog.EXPECT().ListPartitionStatsInfos(mock.Anything).Return(nil, nil) + suite.catalog.EXPECT().ListStatsTasks(mock.Anything).Return(nil, nil) + suite.catalog.EXPECT().ListChannelCheckpoint(mock.Anything).Return(nil, nil) + + suite.catalog.EXPECT().ListSegments(mock.Anything, mock.Anything).RunAndReturn( + func(ctx context.Context, collectionID int64) ([]*datapb.SegmentInfo, error) { + return []*datapb.SegmentInfo{ + { + ID: rand.Int63(), + CollectionID: collectionID, + State: commonpb.SegmentState_Flushed, + }, + }, nil + }) + + meta, err := newMeta(ctx, suite.catalog, nil, brk) + suite.NoError(err) + for _, collectionID := range []int64{100, 101, 102, 200, 201, 202} { + segments := meta.GetSegmentsOfCollection(ctx, collectionID) + suite.Len(segments, 1) + suite.Equal(collectionID, segments[0].GetCollectionID()) + } + }) } type MetaBasicSuite struct { @@ -144,7 +212,7 @@ func (suite *MetaBasicSuite) SetupTest() { suite.partIDs = []int64{100, 101} suite.channelName = "c1" - meta, err := newMemoryMeta() + meta, err := newMemoryMeta(suite.T()) suite.Require().NoError(err) suite.meta = meta @@ -518,7 +586,7 @@ func TestMeta_Basic(t *testing.T) { ctx := context.Background() mockAllocator := newMockAllocator() - meta, err := newMemoryMeta() + meta, err := newMemoryMeta(t) assert.NoError(t, err) testSchema := newTestSchema() @@ -606,7 +674,9 @@ func TestMeta_Basic(t *testing.T) { metakv.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() metakv.EXPECT().LoadWithPrefix(mock.Anything).Return(nil, nil, nil).Maybe() catalog := datacoord.NewCatalog(metakv, "", "") - meta, err := newMeta(context.TODO(), catalog, nil) + broker := broker.NewMockBroker(t) + broker.EXPECT().ShowCollectionsInternal(mock.Anything).Return(nil, nil) + meta, err := newMeta(context.TODO(), catalog, nil, broker) assert.NoError(t, err) err = meta.AddSegment(context.TODO(), NewSegmentInfo(&datapb.SegmentInfo{})) @@ -621,7 +691,7 @@ func TestMeta_Basic(t *testing.T) { metakv2.EXPECT().LoadWithPrefix(mock.Anything).Return(nil, nil, nil).Maybe() metakv2.EXPECT().MultiSaveAndRemoveWithPrefix(mock.Anything, mock.Anything).Return(errors.New("failed")) catalog = datacoord.NewCatalog(metakv2, "", "") - meta, err = newMeta(context.TODO(), catalog, nil) + meta, err = newMeta(context.TODO(), catalog, nil, broker) assert.NoError(t, err) // nil, since no segment yet err = meta.DropSegment(0) @@ -634,7 +704,7 @@ func TestMeta_Basic(t *testing.T) { assert.Error(t, err) catalog = datacoord.NewCatalog(metakv, "", "") - meta, err = newMeta(context.TODO(), catalog, nil) + meta, err = newMeta(context.TODO(), catalog, nil, broker) assert.NoError(t, err) assert.NotNil(t, meta) }) @@ -758,7 +828,7 @@ func TestMeta_Basic(t *testing.T) { }) t.Run("Test AddAllocation", func(t *testing.T) { - meta, _ := newMemoryMeta() + meta, _ := newMemoryMeta(t) err := meta.AddAllocation(1, &Allocation{ SegmentID: 1, NumOfRows: 1, @@ -769,7 +839,7 @@ func TestMeta_Basic(t *testing.T) { } func TestGetUnFlushedSegments(t *testing.T) { - meta, err := newMemoryMeta() + meta, err := newMemoryMeta(t) assert.NoError(t, err) s1 := &datapb.SegmentInfo{ ID: 0, @@ -798,7 +868,7 @@ func TestGetUnFlushedSegments(t *testing.T) { func TestUpdateSegmentsInfo(t *testing.T) { t.Run("normal", func(t *testing.T) { - meta, err := newMemoryMeta() + meta, err := newMemoryMeta(t) assert.NoError(t, err) segment1 := NewSegmentInfo(&datapb.SegmentInfo{ @@ -847,7 +917,7 @@ func TestUpdateSegmentsInfo(t *testing.T) { }) t.Run("update compacted segment", func(t *testing.T) { - meta, err := newMemoryMeta() + meta, err := newMemoryMeta(t) assert.NoError(t, err) // segment not found @@ -871,7 +941,7 @@ func TestUpdateSegmentsInfo(t *testing.T) { assert.NoError(t, err) }) t.Run("update non-existed segment", func(t *testing.T) { - meta, err := newMemoryMeta() + meta, err := newMemoryMeta(t) assert.NoError(t, err) err = meta.UpdateSegmentsInfo( @@ -921,7 +991,7 @@ func TestUpdateSegmentsInfo(t *testing.T) { }) t.Run("update checkpoints and start position of non existed segment", func(t *testing.T) { - meta, err := newMemoryMeta() + meta, err := newMemoryMeta(t) assert.NoError(t, err) segment1 := &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{ID: 1, State: commonpb.SegmentState_Growing}} @@ -943,7 +1013,9 @@ func TestUpdateSegmentsInfo(t *testing.T) { metakv.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() metakv.EXPECT().LoadWithPrefix(mock.Anything).Return(nil, nil, nil).Maybe() catalog := datacoord.NewCatalog(metakv, "", "") - meta, err := newMeta(context.TODO(), catalog, nil) + broker := broker.NewMockBroker(t) + broker.EXPECT().ShowCollectionsInternal(mock.Anything).Return(nil, nil) + meta, err := newMeta(context.TODO(), catalog, nil, broker) assert.NoError(t, err) segmentInfo := &SegmentInfo{ @@ -1234,7 +1306,7 @@ func TestChannelCP(t *testing.T) { } t.Run("UpdateChannelCheckpoint", func(t *testing.T) { - meta, err := newMemoryMeta() + meta, err := newMemoryMeta(t) assert.NoError(t, err) // nil position @@ -1246,7 +1318,7 @@ func TestChannelCP(t *testing.T) { }) t.Run("UpdateChannelCheckpoints", func(t *testing.T) { - meta, err := newMemoryMeta() + meta, err := newMemoryMeta(t) assert.NoError(t, err) assert.Equal(t, 0, len(meta.channelCPs.checkpoints)) @@ -1262,7 +1334,7 @@ func TestChannelCP(t *testing.T) { }) t.Run("GetChannelCheckpoint", func(t *testing.T) { - meta, err := newMemoryMeta() + meta, err := newMemoryMeta(t) assert.NoError(t, err) position := meta.GetChannelCheckpoint(mockVChannel) @@ -1277,7 +1349,7 @@ func TestChannelCP(t *testing.T) { }) t.Run("DropChannelCheckpoint", func(t *testing.T) { - meta, err := newMemoryMeta() + meta, err := newMemoryMeta(t) assert.NoError(t, err) err = meta.DropChannelCheckpoint(mockVChannel) diff --git a/internal/datacoord/mock_test.go b/internal/datacoord/mock_test.go index 5012e309fca51..09220e7ba327c 100644 --- a/internal/datacoord/mock_test.go +++ b/internal/datacoord/mock_test.go @@ -83,9 +83,11 @@ func (mm *metaMemoryKV) CompareVersionAndSwap(key string, version int64, target panic("implement me") } -func newMemoryMeta() (*meta, error) { +func newMemoryMeta(t *testing.T) (*meta, error) { catalog := datacoord.NewCatalog(NewMetaMemoryKV(), "", "") - return newMeta(context.TODO(), catalog, nil) + broker := broker.NewMockBroker(t) + broker.EXPECT().ShowCollectionsInternal(mock.Anything).Return(nil, nil) + return newMeta(context.TODO(), catalog, nil, broker) } var _ allocator = (*MockAllocator)(nil) @@ -467,6 +469,12 @@ func (m *mockRootCoordClient) ShowCollections(ctx context.Context, req *milvuspb }, nil } +func (m *mockRootCoordClient) ShowCollectionsInternal(ctx context.Context, req *rootcoordpb.ShowCollectionsInternalRequest, opts ...grpc.CallOption) (*rootcoordpb.ShowCollectionsInternalResponse, error) { + return &rootcoordpb.ShowCollectionsInternalResponse{ + Status: merr.Success(), + }, nil +} + func (m *mockRootCoordClient) CreateDatabase(ctx context.Context, in *milvuspb.CreateDatabaseRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { panic("not implemented") // TODO: Implement } diff --git a/internal/datacoord/segment_manager_test.go b/internal/datacoord/segment_manager_test.go index 42e5fe7c51ca9..c04b10d786163 100644 --- a/internal/datacoord/segment_manager_test.go +++ b/internal/datacoord/segment_manager_test.go @@ -43,7 +43,7 @@ func TestManagerOptions(t *testing.T) { // ctx := context.Background() paramtable.Init() mockAllocator := newMockAllocator() - meta, err := newMemoryMeta() + meta, err := newMemoryMeta(t) assert.NoError(t, err) segmentManager, _ := newSegmentManager(meta, mockAllocator) t.Run("test with alloc helper", func(t *testing.T) { @@ -104,7 +104,7 @@ func TestAllocSegment(t *testing.T) { paramtable.Init() Params.Save(Params.DataCoordCfg.AllocLatestExpireAttempt.Key, "1") mockAllocator := newMockAllocator() - meta, err := newMemoryMeta() + meta, err := newMemoryMeta(t) assert.NoError(t, err) segmentManager, _ := newSegmentManager(meta, mockAllocator) @@ -190,7 +190,9 @@ func TestLastExpireReset(t *testing.T) { metaKV := etcdkv.NewEtcdKV(etcdCli, rootPath) metaKV.RemoveWithPrefix("") catalog := datacoord.NewCatalog(metaKV, "", "") - meta, err := newMeta(context.TODO(), catalog, nil) + broker := broker.NewMockBroker(t) + broker.EXPECT().ShowCollectionsInternal(mock.Anything).Return(nil, nil) + meta, err := newMeta(context.TODO(), catalog, nil, broker) assert.Nil(t, err) // add collection channelName := "c1" @@ -242,7 +244,7 @@ func TestLastExpireReset(t *testing.T) { newMetaKV := etcdkv.NewEtcdKV(newEtcdCli, rootPath) defer newMetaKV.RemoveWithPrefix("") newCatalog := datacoord.NewCatalog(newMetaKV, "", "") - restartedMeta, err := newMeta(context.TODO(), newCatalog, nil) + restartedMeta, err := newMeta(context.TODO(), newCatalog, nil, broker) restartedMeta.AddCollection(&collectionInfo{ID: collID, Schema: schema}) assert.Nil(t, err) newSegmentManager, _ := newSegmentManager(restartedMeta, mockAllocator) @@ -271,7 +273,7 @@ func TestLoadSegmentsFromMeta(t *testing.T) { ctx := context.Background() paramtable.Init() mockAllocator := newMockAllocator() - meta, err := newMemoryMeta() + meta, err := newMemoryMeta(t) assert.NoError(t, err) schema := newTestSchema() @@ -329,7 +331,7 @@ func TestLoadSegmentsFromMeta(t *testing.T) { func TestSaveSegmentsToMeta(t *testing.T) { paramtable.Init() mockAllocator := newMockAllocator() - meta, err := newMemoryMeta() + meta, err := newMemoryMeta(t) assert.NoError(t, err) schema := newTestSchema() @@ -351,7 +353,7 @@ func TestSaveSegmentsToMeta(t *testing.T) { func TestSaveSegmentsToMetaWithSpecificSegments(t *testing.T) { paramtable.Init() mockAllocator := newMockAllocator() - meta, err := newMemoryMeta() + meta, err := newMemoryMeta(t) assert.NoError(t, err) schema := newTestSchema() @@ -373,7 +375,7 @@ func TestSaveSegmentsToMetaWithSpecificSegments(t *testing.T) { func TestDropSegment(t *testing.T) { paramtable.Init() mockAllocator := newMockAllocator() - meta, err := newMemoryMeta() + meta, err := newMemoryMeta(t) assert.NoError(t, err) schema := newTestSchema() @@ -396,7 +398,7 @@ func TestDropSegment(t *testing.T) { func TestAllocRowsLargerThanOneSegment(t *testing.T) { paramtable.Init() mockAllocator := newMockAllocator() - meta, err := newMemoryMeta() + meta, err := newMemoryMeta(t) assert.NoError(t, err) schema := newTestSchema() @@ -418,7 +420,7 @@ func TestAllocRowsLargerThanOneSegment(t *testing.T) { func TestExpireAllocation(t *testing.T) { paramtable.Init() mockAllocator := newMockAllocator() - meta, err := newMemoryMeta() + meta, err := newMemoryMeta(t) assert.NoError(t, err) schema := newTestSchema() @@ -460,7 +462,7 @@ func TestGetFlushableSegments(t *testing.T) { t.Run("get flushable segments between small interval", func(t *testing.T) { paramtable.Init() mockAllocator := newMockAllocator() - meta, err := newMemoryMeta() + meta, err := newMemoryMeta(t) assert.NoError(t, err) schema := newTestSchema() @@ -506,7 +508,7 @@ func TestTryToSealSegment(t *testing.T) { t.Run("normal seal with segment policies", func(t *testing.T) { paramtable.Init() mockAllocator := newMockAllocator() - meta, err := newMemoryMeta() + meta, err := newMemoryMeta(t) assert.NoError(t, err) schema := newTestSchema() @@ -531,7 +533,7 @@ func TestTryToSealSegment(t *testing.T) { t.Run("normal seal with channel seal policies", func(t *testing.T) { paramtable.Init() mockAllocator := newMockAllocator() - meta, err := newMemoryMeta() + meta, err := newMemoryMeta(t) assert.NoError(t, err) schema := newTestSchema() @@ -556,7 +558,7 @@ func TestTryToSealSegment(t *testing.T) { t.Run("normal seal with both segment & channel seal policy", func(t *testing.T) { paramtable.Init() mockAllocator := newMockAllocator() - meta, err := newMemoryMeta() + meta, err := newMemoryMeta(t) assert.NoError(t, err) schema := newTestSchema() @@ -583,7 +585,7 @@ func TestTryToSealSegment(t *testing.T) { t.Run("test sealByMaxBinlogFileNumberPolicy", func(t *testing.T) { paramtable.Init() mockAllocator := newMockAllocator() - meta, err := newMemoryMeta() + meta, err := newMemoryMeta(t) assert.NoError(t, err) schema := newTestSchema() @@ -667,7 +669,9 @@ func TestTryToSealSegment(t *testing.T) { mockAllocator := newMockAllocator() memoryKV := NewMetaMemoryKV() catalog := datacoord.NewCatalog(memoryKV, "", "") - meta, err := newMeta(context.TODO(), catalog, nil) + broker := broker.NewMockBroker(t) + broker.EXPECT().ShowCollectionsInternal(mock.Anything).Return(nil, nil) + meta, err := newMeta(context.TODO(), catalog, nil, broker) assert.NoError(t, err) schema := newTestSchema() @@ -696,7 +700,9 @@ func TestTryToSealSegment(t *testing.T) { mockAllocator := newMockAllocator() memoryKV := NewMetaMemoryKV() catalog := datacoord.NewCatalog(memoryKV, "", "") - meta, err := newMeta(context.TODO(), catalog, nil) + broker := broker.NewMockBroker(t) + broker.EXPECT().ShowCollectionsInternal(mock.Anything).Return(nil, nil) + meta, err := newMeta(context.TODO(), catalog, nil, broker) assert.NoError(t, err) schema := newTestSchema() diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index b1517a638a707..9b0f373a3f994 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -46,6 +46,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/types" + "github.com/milvus-io/milvus/internal/util/componentutil" "github.com/milvus-io/milvus/internal/util/dependency" "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/pkg/kv" @@ -349,11 +350,20 @@ func (s *Server) UpdateStateCode(code commonpb.StateCode) { } func (s *Server) initDataCoord() error { - var err error - if err = s.initRootCoordClient(); err != nil { + // wait for master init or healthy + log.Info("DataCoord try to wait for RootCoord ready") + if err := s.initRootCoordClient(); err != nil { return err } log.Info("init rootcoord client done") + err := componentutil.WaitForComponentHealthy(s.ctx, s.rootCoordClient, "RootCoord", 1000000, time.Millisecond*200) + if err != nil { + log.Error("DataCoord wait for RootCoord ready failed", zap.Error(err)) + return err + } + log.Info("DataCoord report RootCoord ready") + + s.stateCode.Store(commonpb.StateCode_Initializing) s.broker = broker.NewCoordinatorBroker(s.rootCoordClient) s.allocator = newRootCoordAllocator(s.rootCoordClient) @@ -669,7 +679,7 @@ func (s *Server) initMeta(chunkManager storage.ChunkManager) error { reloadEtcdFn := func() error { var err error catalog := datacoord.NewCatalog(s.kv, chunkManager.RootPath(), metaRootPath) - s.meta, err = newMeta(s.ctx, catalog, chunkManager) + s.meta, err = newMeta(s.ctx, catalog, chunkManager, s.broker) if err != nil { return err } diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index 846f04e8cbc37..b8d5d1c1ac729 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -2181,7 +2181,7 @@ func TestDataCoordServer_SetSegmentState(t *testing.T) { }) t.Run("dataCoord meta set state not exists", func(t *testing.T) { - meta, err := newMemoryMeta() + meta, err := newMemoryMeta(t) assert.NoError(t, err) svr := newTestServer(t, WithMeta(meta)) defer closeTestServer(t, svr) diff --git a/internal/distributed/rootcoord/client/client.go b/internal/distributed/rootcoord/client/client.go index 6de68c45f7d66..45a1f8f457a28 100644 --- a/internal/distributed/rootcoord/client/client.go +++ b/internal/distributed/rootcoord/client/client.go @@ -218,6 +218,18 @@ func (c *Client) ShowCollections(ctx context.Context, in *milvuspb.ShowCollectio }) } +// ShowCollectionsInternal returns all collections, including unhealthy ones. +func (c *Client) ShowCollectionsInternal(ctx context.Context, in *rootcoordpb.ShowCollectionsInternalRequest, opts ...grpc.CallOption) (*rootcoordpb.ShowCollectionsInternalResponse, error) { + in = typeutil.Clone(in) + commonpbutil.UpdateMsgBase( + in.GetBase(), + commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())), + ) + return wrapGrpcCall(ctx, c, func(client rootcoordpb.RootCoordClient) (*rootcoordpb.ShowCollectionsInternalResponse, error) { + return client.ShowCollectionsInternal(ctx, in) + }) +} + func (c *Client) AlterCollection(ctx context.Context, request *milvuspb.AlterCollectionRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { request = typeutil.Clone(request) commonpbutil.UpdateMsgBase( diff --git a/internal/distributed/rootcoord/client/client_test.go b/internal/distributed/rootcoord/client/client_test.go index 5661298327fe6..f139630465921 100644 --- a/internal/distributed/rootcoord/client/client_test.go +++ b/internal/distributed/rootcoord/client/client_test.go @@ -104,6 +104,10 @@ func Test_NewClient(t *testing.T) { r, err := client.ShowCollections(ctx, nil) retCheck(retNotNil, r, err) } + { + r, err := client.ShowCollectionsInternal(ctx, nil) + retCheck(retNotNil, r, err) + } { r, err := client.CreatePartition(ctx, nil) retCheck(retNotNil, r, err) @@ -346,6 +350,10 @@ func Test_NewClient(t *testing.T) { rTimeout, err := client.ShowCollections(shortCtx, nil) retCheck(rTimeout, err) } + { + rTimeout, err := client.ShowCollectionsInternal(shortCtx, nil) + retCheck(rTimeout, err) + } { rTimeout, err := client.CreatePartition(shortCtx, nil) retCheck(rTimeout, err) diff --git a/internal/distributed/rootcoord/service.go b/internal/distributed/rootcoord/service.go index e99a81dd50cf9..2d4b4e6fc15ee 100644 --- a/internal/distributed/rootcoord/service.go +++ b/internal/distributed/rootcoord/service.go @@ -405,6 +405,11 @@ func (s *Server) ShowCollections(ctx context.Context, in *milvuspb.ShowCollectio return s.rootCoord.ShowCollections(ctx, in) } +// ShowCollectionsInternal returns all collections, including unhealthy ones. +func (s *Server) ShowCollectionsInternal(ctx context.Context, in *rootcoordpb.ShowCollectionsInternalRequest) (*rootcoordpb.ShowCollectionsInternalResponse, error) { + return s.rootCoord.ShowCollectionsInternal(ctx, in) +} + // CreatePartition creates a partition in a collection func (s *Server) CreatePartition(ctx context.Context, in *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) { return s.rootCoord.CreatePartition(ctx, in) diff --git a/internal/metastore/catalog.go b/internal/metastore/catalog.go index 2d23104912ee5..5cf363d2675ee 100644 --- a/internal/metastore/catalog.go +++ b/internal/metastore/catalog.go @@ -119,7 +119,7 @@ type BinlogsIncrement struct { //go:generate mockery --name=DataCoordCatalog --with-expecter type DataCoordCatalog interface { - ListSegments(ctx context.Context) ([]*datapb.SegmentInfo, error) + ListSegments(ctx context.Context, collectionID int64) ([]*datapb.SegmentInfo, error) AddSegment(ctx context.Context, segment *datapb.SegmentInfo) error // TODO Remove this later, we should update flush segments info for each segment separately, so far we still need transaction AlterSegments(ctx context.Context, newSegments []*datapb.SegmentInfo, binlogs ...BinlogsIncrement) error @@ -181,7 +181,7 @@ type QueryCoordCatalog interface { SavePartition(info ...*querypb.PartitionLoadInfo) error SaveReplica(replicas ...*querypb.Replica) error GetCollections() ([]*querypb.CollectionLoadInfo, error) - GetPartitions() (map[int64][]*querypb.PartitionLoadInfo, error) + GetPartitions(collectionIDs []int64) (map[int64][]*querypb.PartitionLoadInfo, error) GetReplicas() ([]*querypb.Replica, error) ReleaseCollection(collection int64) error ReleasePartition(collection int64, partitions ...int64) error diff --git a/internal/metastore/kv/datacoord/kv_catalog.go b/internal/metastore/kv/datacoord/kv_catalog.go index 7c8872a5fee15..0338aedcfe359 100644 --- a/internal/metastore/kv/datacoord/kv_catalog.go +++ b/internal/metastore/kv/datacoord/kv_catalog.go @@ -64,7 +64,7 @@ func NewCatalog(MetaKv kv.MetaKv, chunkManagerRootPath string, metaRootpath stri } } -func (kc *Catalog) ListSegments(ctx context.Context) ([]*datapb.SegmentInfo, error) { +func (kc *Catalog) ListSegments(ctx context.Context, collectionID int64) ([]*datapb.SegmentInfo, error) { group, _ := errgroup.WithContext(ctx) segments := make([]*datapb.SegmentInfo, 0) insertLogs := make(map[typeutil.UniqueID][]*datapb.FieldBinlog, 1) @@ -73,7 +73,7 @@ func (kc *Catalog) ListSegments(ctx context.Context) ([]*datapb.SegmentInfo, err executeFn := func(binlogType storage.BinlogType, result map[typeutil.UniqueID][]*datapb.FieldBinlog) { group.Go(func() error { - ret, err := kc.listBinlogs(binlogType) + ret, err := kc.listBinlogs(binlogType, collectionID) if err != nil { return err } @@ -88,7 +88,7 @@ func (kc *Catalog) ListSegments(ctx context.Context) ([]*datapb.SegmentInfo, err executeFn(storage.DeleteBinlog, deltaLogs) executeFn(storage.StatsBinlog, statsLogs) group.Go(func() error { - ret, err := kc.listSegments() + ret, err := kc.listSegments(collectionID) if err != nil { return err } @@ -108,7 +108,7 @@ func (kc *Catalog) ListSegments(ctx context.Context) ([]*datapb.SegmentInfo, err return segments, nil } -func (kc *Catalog) listSegments() ([]*datapb.SegmentInfo, error) { +func (kc *Catalog) listSegments(collectionID int64) ([]*datapb.SegmentInfo, error) { segments := make([]*datapb.SegmentInfo, 0) applyFn := func(key []byte, value []byte) error { @@ -127,7 +127,7 @@ func (kc *Catalog) listSegments() ([]*datapb.SegmentInfo, error) { return nil } - err := kc.MetaKv.WalkWithPrefix(SegmentPrefix+"/", kc.paginationSize, applyFn) + err := kc.MetaKv.WalkWithPrefix(buildCollectionPrefix(collectionID), kc.paginationSize, applyFn) if err != nil { return nil, err } @@ -135,43 +135,32 @@ func (kc *Catalog) listSegments() ([]*datapb.SegmentInfo, error) { return segments, nil } -func (kc *Catalog) parseBinlogKey(key string, prefixIdx int) (int64, int64, int64, error) { - remainedKey := key[prefixIdx:] - keyWordGroup := strings.Split(remainedKey, "/") +func (kc *Catalog) parseBinlogKey(key string) (int64, error) { + // by-dev/meta/datacoord-meta/binlog/454086059555817418/454086059555817543/454329387504816753/1 + // ---------------------------------|collectionID |partitionID |segmentID |fieldID + keyWordGroup := strings.Split(key, "/") if len(keyWordGroup) < 3 { - return 0, 0, 0, fmt.Errorf("parse key: %s failed, trimmed key:%s", key, remainedKey) + return 0, fmt.Errorf("parse key: %s failed, key:%s", key, key) } - - collectionID, err := strconv.ParseInt(keyWordGroup[0], 10, 64) - if err != nil { - return 0, 0, 0, fmt.Errorf("parse key: %s failed, trimmed key:%s, %w", key, remainedKey, err) - } - - partitionID, err := strconv.ParseInt(keyWordGroup[1], 10, 64) - if err != nil { - return 0, 0, 0, fmt.Errorf("parse key: %s failed, trimmed key:%s, %w", key, remainedKey, err) - } - - segmentID, err := strconv.ParseInt(keyWordGroup[2], 10, 64) + segmentID, err := strconv.ParseInt(keyWordGroup[len(keyWordGroup)-2], 10, 64) if err != nil { - return 0, 0, 0, fmt.Errorf("parse key: %s failed, trimmed key:%s, %w", key, remainedKey, err) + return 0, fmt.Errorf("parse key failed, key:%s, %w", key, err) } - - return collectionID, partitionID, segmentID, nil + return segmentID, nil } -func (kc *Catalog) listBinlogs(binlogType storage.BinlogType) (map[typeutil.UniqueID][]*datapb.FieldBinlog, error) { +func (kc *Catalog) listBinlogs(binlogType storage.BinlogType, collectionID int64) (map[typeutil.UniqueID][]*datapb.FieldBinlog, error) { ret := make(map[typeutil.UniqueID][]*datapb.FieldBinlog) var err error var logPathPrefix string switch binlogType { case storage.InsertBinlog: - logPathPrefix = SegmentBinlogPathPrefix + logPathPrefix = fmt.Sprintf("%s/%d", SegmentBinlogPathPrefix, collectionID) case storage.DeleteBinlog: - logPathPrefix = SegmentDeltalogPathPrefix + logPathPrefix = fmt.Sprintf("%s/%d", SegmentDeltalogPathPrefix, collectionID) case storage.StatsBinlog: - logPathPrefix = SegmentStatslogPathPrefix + logPathPrefix = fmt.Sprintf("%s/%d", SegmentStatslogPathPrefix, collectionID) default: err = fmt.Errorf("invalid binlog type: %d", binlogType) } @@ -179,13 +168,6 @@ func (kc *Catalog) listBinlogs(binlogType storage.BinlogType) (map[typeutil.Uniq return nil, err } - var prefixIdx int - if len(kc.metaRootpath) == 0 { - prefixIdx = len(logPathPrefix) + 1 - } else { - prefixIdx = len(kc.metaRootpath) + 1 + len(logPathPrefix) + 1 - } - applyFn := func(key []byte, value []byte) error { fieldBinlog := &datapb.FieldBinlog{} err := proto.Unmarshal(value, fieldBinlog) @@ -193,7 +175,7 @@ func (kc *Catalog) listBinlogs(binlogType storage.BinlogType) (map[typeutil.Uniq return fmt.Errorf("failed to unmarshal datapb.FieldBinlog: %d, err:%w", fieldBinlog.FieldID, err) } - _, _, segmentID, err := kc.parseBinlogKey(string(key), prefixIdx) + segmentID, err := kc.parseBinlogKey(string(key)) if err != nil { return fmt.Errorf("prefix:%s, %w", path.Join(kc.metaRootpath, logPathPrefix), err) } diff --git a/internal/metastore/kv/datacoord/kv_catalog_test.go b/internal/metastore/kv/datacoord/kv_catalog_test.go index 7076e6f1ebb81..2672399799a7a 100644 --- a/internal/metastore/kv/datacoord/kv_catalog_test.go +++ b/internal/metastore/kv/datacoord/kv_catalog_test.go @@ -42,7 +42,6 @@ import ( "github.com/milvus-io/milvus/internal/proto/indexpb" "github.com/milvus-io/milvus/pkg/kv/predicates" "github.com/milvus-io/milvus/pkg/util/etcd" - "github.com/milvus-io/milvus/pkg/util/metautil" "github.com/milvus-io/milvus/pkg/util/paramtable" ) @@ -172,7 +171,7 @@ func Test_ListSegments(t *testing.T) { metakv.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).Return(errors.New("error")) catalog := NewCatalog(metakv, rootPath, "") - ret, err := catalog.ListSegments(context.TODO()) + ret, err := catalog.ListSegments(context.TODO(), collectionID) assert.Nil(t, ret) assert.Error(t, err) }) @@ -219,7 +218,7 @@ func Test_ListSegments(t *testing.T) { }) catalog := NewCatalog(metakv, rootPath, "") - ret, err := catalog.ListSegments(context.TODO()) + ret, err := catalog.ListSegments(context.TODO(), collectionID) assert.NotNil(t, ret) assert.NoError(t, err) @@ -257,7 +256,7 @@ func Test_ListSegments(t *testing.T) { return errors.New("should not reach here") }) - ret, err := catalog.ListSegments(context.TODO()) + ret, err := catalog.ListSegments(context.TODO(), collectionID) assert.NotNil(t, ret) assert.NoError(t, err) @@ -746,44 +745,16 @@ func Test_ChannelExists_SaveError(t *testing.T) { func Test_parseBinlogKey(t *testing.T) { catalog := NewCatalog(nil, "", "") - t.Run("parse collection id fail", func(t *testing.T) { - ret1, ret2, ret3, err := catalog.parseBinlogKey("root/err/1/1/1", 5) - assert.Error(t, err) - assert.Equal(t, int64(0), ret1) - assert.Equal(t, int64(0), ret2) - assert.Equal(t, int64(0), ret3) - }) - - t.Run("parse partition id fail", func(t *testing.T) { - ret1, ret2, ret3, err := catalog.parseBinlogKey("root/1/err/1/1", 5) - assert.Error(t, err) - assert.Equal(t, int64(0), ret1) - assert.Equal(t, int64(0), ret2) - assert.Equal(t, int64(0), ret3) - }) - t.Run("parse segment id fail", func(t *testing.T) { - ret1, ret2, ret3, err := catalog.parseBinlogKey("root/1/1/err/1", 5) - assert.Error(t, err) - assert.Equal(t, int64(0), ret1) - assert.Equal(t, int64(0), ret2) - assert.Equal(t, int64(0), ret3) - }) - - t.Run("miss field", func(t *testing.T) { - ret1, ret2, ret3, err := catalog.parseBinlogKey("root/1/1/", 5) + segmentID, err := catalog.parseBinlogKey("root/1/1/err/1") assert.Error(t, err) - assert.Equal(t, int64(0), ret1) - assert.Equal(t, int64(0), ret2) - assert.Equal(t, int64(0), ret3) + assert.Equal(t, int64(0), segmentID) }) t.Run("test ok", func(t *testing.T) { - ret1, ret2, ret3, err := catalog.parseBinlogKey("root/1/1/1/1", 5) + segmentID, err := catalog.parseBinlogKey("root/1/1/1/1") assert.NoError(t, err) - assert.Equal(t, int64(1), ret1) - assert.Equal(t, int64(1), ret2) - assert.Equal(t, int64(1), ret3) + assert.Equal(t, int64(1), segmentID) }) } @@ -1194,7 +1165,7 @@ func BenchmarkCatalog_List1000Segments(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - segments, err := catalog.ListSegments(ctx) + segments, err := catalog.ListSegments(ctx, collectionID) assert.NoError(b, err) for _, s := range segments { assert.NotNil(b, s) @@ -1207,13 +1178,8 @@ func BenchmarkCatalog_List1000Segments(b *testing.B) { func generateSegments(ctx context.Context, catalog *Catalog, n int, rootPath string) { rand.Seed(time.Now().UnixNano()) - var collectionID int64 for i := 0; i < n; i++ { - if collectionID%25 == 0 { - collectionID = rand.Int63() - } - v := rand.Int63() segment := addSegment(rootPath, collectionID, v, v, v) err := catalog.AddSegment(ctx, segment) @@ -1230,7 +1196,7 @@ func addSegment(rootPath string, collectionID, partitionID, segmentID, fieldID i Binlogs: []*datapb.Binlog{ { EntriesNum: 10000, - LogPath: metautil.BuildInsertLogPath(rootPath, collectionID, partitionID, segmentID, fieldID, int64(rand.Int())), + LogID: int64(rand.Int()), }, }, }, @@ -1242,58 +1208,7 @@ func addSegment(rootPath string, collectionID, partitionID, segmentID, fieldID i Binlogs: []*datapb.Binlog{ { EntriesNum: 5, - LogPath: metautil.BuildDeltaLogPath(rootPath, collectionID, partitionID, segmentID, int64(rand.Int())), - }, - }, - }, - } - - statslogs = []*datapb.FieldBinlog{ - { - FieldID: 1, - Binlogs: []*datapb.Binlog{ - { - EntriesNum: 5, - LogPath: metautil.BuildStatsLogPath(rootPath, collectionID, partitionID, segmentID, fieldID, int64(rand.Int())), - }, - }, - }, - } - - return &datapb.SegmentInfo{ - ID: segmentID, - CollectionID: collectionID, - PartitionID: partitionID, - NumOfRows: 10000, - State: commonpb.SegmentState_Flushed, - Binlogs: binlogs, - Deltalogs: deltalogs, - Statslogs: statslogs, - } -} - -func getSegment(rootPath string, collectionID, partitionID, segmentID, fieldID int64, binlogNum int) *datapb.SegmentInfo { - binLogPaths := make([]*datapb.Binlog, binlogNum) - for i := 0; i < binlogNum; i++ { - binLogPaths[i] = &datapb.Binlog{ - EntriesNum: 10000, - LogPath: metautil.BuildInsertLogPath(rootPath, collectionID, partitionID, segmentID, fieldID, int64(i)), - } - } - binlogs = []*datapb.FieldBinlog{ - { - FieldID: fieldID, - Binlogs: binLogPaths, - }, - } - - deltalogs = []*datapb.FieldBinlog{ - { - FieldID: fieldID, - Binlogs: []*datapb.Binlog{ - { - EntriesNum: 5, - LogPath: metautil.BuildDeltaLogPath(rootPath, collectionID, partitionID, segmentID, int64(rand.Int())), + LogID: int64(rand.Int()), }, }, }, @@ -1305,7 +1220,7 @@ func getSegment(rootPath string, collectionID, partitionID, segmentID, fieldID i Binlogs: []*datapb.Binlog{ { EntriesNum: 5, - LogPath: metautil.BuildStatsLogPath(rootPath, collectionID, partitionID, segmentID, fieldID, int64(rand.Int())), + LogID: int64(rand.Int()), }, }, }, diff --git a/internal/metastore/kv/querycoord/kv_catalog.go b/internal/metastore/kv/querycoord/kv_catalog.go index fc0676eb682f6..607604f2867bf 100644 --- a/internal/metastore/kv/querycoord/kv_catalog.go +++ b/internal/metastore/kv/querycoord/kv_catalog.go @@ -16,6 +16,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/pkg/kv" "github.com/milvus-io/milvus/pkg/util/compressor" + "github.com/milvus-io/milvus/pkg/util/conc" "github.com/milvus-io/milvus/pkg/util/paramtable" ) @@ -36,12 +37,16 @@ const ( type Catalog struct { cli kv.MetaKv paginationSize int + + pool *conc.Pool[any] } func NewCatalog(cli kv.MetaKv) Catalog { + ioPool := conc.NewPool[any](paramtable.Get().MetaStoreCfg.ReadConcurrency.GetAsInt()) return Catalog{ cli: cli, paginationSize: paramtable.Get().MetaStoreCfg.PaginationSize.GetAsInt(), + pool: ioPool, } } @@ -125,23 +130,40 @@ func (s Catalog) GetCollections() ([]*querypb.CollectionLoadInfo, error) { return ret, nil } -func (s Catalog) GetPartitions() (map[int64][]*querypb.PartitionLoadInfo, error) { - ret := make(map[int64][]*querypb.PartitionLoadInfo) - applyFn := func(key []byte, value []byte) error { - info := querypb.PartitionLoadInfo{} - if err := proto.Unmarshal(value, &info); err != nil { - return err - } - ret[info.GetCollectionID()] = append(ret[info.GetCollectionID()], &info) - return nil +func (s Catalog) GetPartitions(collectionIDs []int64) (map[int64][]*querypb.PartitionLoadInfo, error) { + collectionPartitions := make([][]*querypb.PartitionLoadInfo, len(collectionIDs)) + futures := make([]*conc.Future[any], 0, len(collectionIDs)) + for i, collectionID := range collectionIDs { + i := i + collectionID := collectionID + futures = append(futures, s.pool.Submit(func() (any, error) { + prefix := EncodePartitionLoadInfoPrefix(collectionID) + _, values, err := s.cli.LoadWithPrefix(prefix) + if err != nil { + return nil, err + } + ret := make([]*querypb.PartitionLoadInfo, 0, len(values)) + for _, v := range values { + info := querypb.PartitionLoadInfo{} + if err = proto.Unmarshal([]byte(v), &info); err != nil { + return nil, err + } + ret = append(ret, &info) + } + collectionPartitions[i] = ret + return nil, nil + })) } - - err := s.cli.WalkWithPrefix(PartitionLoadInfoPrefix, s.paginationSize, applyFn) + err := conc.AwaitAll(futures...) if err != nil { return nil, err } - return ret, nil + result := make(map[int64][]*querypb.PartitionLoadInfo, len(collectionIDs)) + for i, partitions := range collectionPartitions { + result[collectionIDs[i]] = partitions + } + return result, nil } func (s Catalog) GetReplicas() ([]*querypb.Replica, error) { @@ -328,6 +350,10 @@ func EncodePartitionLoadInfoKey(collection, partition int64) string { return fmt.Sprintf("%s/%d/%d", PartitionLoadInfoPrefix, collection, partition) } +func EncodePartitionLoadInfoPrefix(collection int64) string { + return fmt.Sprintf("%s/%d/", PartitionLoadInfoPrefix, collection) +} + func encodeReplicaKey(collection, replica int64) string { return fmt.Sprintf("%s/%d/%d", ReplicaPrefix, collection, replica) } diff --git a/internal/metastore/kv/querycoord/kv_catalog_test.go b/internal/metastore/kv/querycoord/kv_catalog_test.go index 078c943a91afa..cb91834828f45 100644 --- a/internal/metastore/kv/querycoord/kv_catalog_test.go +++ b/internal/metastore/kv/querycoord/kv_catalog_test.go @@ -5,6 +5,7 @@ import ( "testing" "github.com/cockroachdb/errors" + "github.com/samber/lo" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" @@ -96,7 +97,9 @@ func (suite *CatalogTestSuite) TestCollectionWithPartition() { suite.NoError(err) suite.Len(collections, 1) suite.Equal(int64(3), collections[0].GetCollectionID()) - partitions, err := suite.catalog.GetPartitions() + partitions, err := suite.catalog.GetPartitions(lo.Map(collections, func(collection *querypb.CollectionLoadInfo, _ int) int64 { + return collection.GetCollectionID() + })) suite.NoError(err) suite.Len(partitions, 1) suite.Len(partitions[int64(3)], 1) @@ -119,11 +122,55 @@ func (suite *CatalogTestSuite) TestPartition() { suite.catalog.ReleasePartition(1) suite.catalog.ReleasePartition(2) - partitions, err := suite.catalog.GetPartitions() + partitions, err := suite.catalog.GetPartitions([]int64{0}) suite.NoError(err) suite.Len(partitions, 1) } +func (suite *CatalogTestSuite) TestGetPartitions() { + suite.catalog.SaveCollection(&querypb.CollectionLoadInfo{ + CollectionID: 1, + }) + suite.catalog.SavePartition(&querypb.PartitionLoadInfo{ + CollectionID: 1, + PartitionID: 100, + }) + suite.catalog.SaveCollection(&querypb.CollectionLoadInfo{ + CollectionID: 2, + }) + suite.catalog.SavePartition(&querypb.PartitionLoadInfo{ + CollectionID: 2, + PartitionID: 200, + }) + suite.catalog.SaveCollection(&querypb.CollectionLoadInfo{ + CollectionID: 3, + }) + suite.catalog.SavePartition(&querypb.PartitionLoadInfo{ + CollectionID: 3, + PartitionID: 300, + }) + + partitions, err := suite.catalog.GetPartitions([]int64{1, 2, 3}) + suite.NoError(err) + suite.Len(partitions, 3) + suite.Len(partitions[int64(1)], 1) + suite.Len(partitions[int64(2)], 1) + suite.Len(partitions[int64(3)], 1) + partitions, err = suite.catalog.GetPartitions([]int64{2, 3}) + suite.NoError(err) + suite.Len(partitions, 2) + suite.Len(partitions[int64(2)], 1) + suite.Len(partitions[int64(3)], 1) + partitions, err = suite.catalog.GetPartitions([]int64{3}) + suite.NoError(err) + suite.Len(partitions, 1) + suite.Len(partitions[int64(3)], 1) + suite.Equal(int64(300), partitions[int64(3)][0].GetPartitionID()) + partitions, err = suite.catalog.GetPartitions([]int64{}) + suite.NoError(err) + suite.Len(partitions, 0) +} + func (suite *CatalogTestSuite) TestReleaseManyPartitions() { partitionIDs := make([]int64, 0) for i := 1; i <= 150; i++ { @@ -136,9 +183,10 @@ func (suite *CatalogTestSuite) TestReleaseManyPartitions() { err := suite.catalog.ReleasePartition(1, partitionIDs...) suite.NoError(err) - partitions, err := suite.catalog.GetPartitions() + partitions, err := suite.catalog.GetPartitions([]int64{1}) suite.NoError(err) - suite.Len(partitions, 0) + suite.Len(partitions, 1) + suite.Len(partitions[int64(1)], 0) } func (suite *CatalogTestSuite) TestReplica() { diff --git a/internal/metastore/kv/rootcoord/kv_catalog.go b/internal/metastore/kv/rootcoord/kv_catalog.go index 6b50f5d38788c..c0cfe2cd9e002 100644 --- a/internal/metastore/kv/rootcoord/kv_catalog.go +++ b/internal/metastore/kv/rootcoord/kv_catalog.go @@ -24,10 +24,12 @@ import ( "github.com/milvus-io/milvus/pkg/kv" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util" + "github.com/milvus-io/milvus/pkg/util/conc" "github.com/milvus-io/milvus/pkg/util/crypto" "github.com/milvus-io/milvus/pkg/util/etcd" "github.com/milvus-io/milvus/pkg/util/funcutil" "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -39,6 +41,13 @@ import ( type Catalog struct { Txn kv.TxnKV Snapshot kv.SnapShotKV + + pool *conc.Pool[any] +} + +func NewCatalog(metaKV kv.TxnKV, ss kv.SnapShotKV) metastore.RootCoordCatalog { + ioPool := conc.NewPool[any](paramtable.Get().MetaStoreCfg.ReadConcurrency.GetAsInt()) + return &Catalog{Txn: metaKV, Snapshot: ss, pool: ioPool} } func BuildCollectionKey(dbID typeutil.UniqueID, collectionID typeutil.UniqueID) string { @@ -440,7 +449,6 @@ func (kc *Catalog) appendPartitionAndFieldsInfo(ctx context.Context, collMeta *p return collection, nil } -// TODO: This function will be invoked many times if there are many databases, leading to significant overhead. func (kc *Catalog) batchAppendPartitionAndFieldsInfo(ctx context.Context, collMeta []*pb.CollectionInfo, ts typeutil.Timestamp, ) ([]*model.Collection, error) { @@ -728,27 +736,33 @@ func (kc *Catalog) ListCollections(ctx context.Context, dbID int64, ts typeutil. } start := time.Now() - colls := make([]*pb.CollectionInfo, 0, len(vals)) - for _, val := range vals { - collMeta := &pb.CollectionInfo{} - err := proto.Unmarshal([]byte(val), collMeta) - if err != nil { - log.Warn("unmarshal collection info failed", zap.Error(err)) - continue - } - kc.fixDefaultDBIDConsistency(ctx, collMeta, ts) - colls = append(colls, collMeta) + colls := make([]*model.Collection, len(vals)) + futures := make([]*conc.Future[any], 0, len(vals)) + for i, val := range vals { + i := i + val := val + futures = append(futures, kc.pool.Submit(func() (any, error) { + collMeta := &pb.CollectionInfo{} + err := proto.Unmarshal([]byte(val), collMeta) + if err != nil { + log.Warn("unmarshal collection info failed", zap.Error(err)) + return nil, err + } + kc.fixDefaultDBIDConsistency(ctx, collMeta, ts) + collection, err := kc.appendPartitionAndFieldsInfo(ctx, collMeta, ts) + if err != nil { + return nil, err + } + colls[i] = collection + return nil, nil + })) } - log.Info("unmarshal all collection details cost", zap.Int64("db", dbID), zap.Duration("cost", time.Since(start))) - - start = time.Now() - ret, err := kc.batchAppendPartitionAndFieldsInfo(ctx, colls, ts) - log.Info("append partition and fields info cost", zap.Int64("db", dbID), zap.Duration("cost", time.Since(start))) + err = conc.AwaitAll(futures...) if err != nil { return nil, err } - - return ret, nil + log.Info("unmarshal all collection details cost", zap.Int64("db", dbID), zap.Duration("cost", time.Since(start))) + return colls, nil } // fixDefaultDBIDConsistency fix dbID consistency for collectionInfo. @@ -756,12 +770,12 @@ func (kc *Catalog) ListCollections(ctx context.Context, dbID int64, ts typeutil. // all collections in default database should be marked with dbID 1. // this method also update dbid in meta store when dbid is 0 // see also: https://github.com/milvus-io/milvus/issues/33608 -func (kv *Catalog) fixDefaultDBIDConsistency(_ context.Context, collMeta *pb.CollectionInfo, ts typeutil.Timestamp) { +func (kc *Catalog) fixDefaultDBIDConsistency(_ context.Context, collMeta *pb.CollectionInfo, ts typeutil.Timestamp) { if collMeta.DbId == util.NonDBID { coll := model.UnmarshalCollectionModel(collMeta) cloned := coll.Clone() cloned.DBID = util.DefaultDBID - kv.alterModifyCollection(coll, cloned, ts) + kc.alterModifyCollection(coll, cloned, ts) collMeta.DbId = util.DefaultDBID } diff --git a/internal/metastore/kv/rootcoord/kv_catalog_test.go b/internal/metastore/kv/rootcoord/kv_catalog_test.go index abe0f1f46be96..bc9b6e831408b 100644 --- a/internal/metastore/kv/rootcoord/kv_catalog_test.go +++ b/internal/metastore/kv/rootcoord/kv_catalog_test.go @@ -105,7 +105,7 @@ func TestCatalog_ListCollections(t *testing.T) { kv.On("LoadWithPrefix", CollectionMetaPrefix, ts). Return(nil, nil, targetErr) - kc := Catalog{Snapshot: kv} + kc := NewCatalog(nil, kv) ret, err := kc.ListCollections(ctx, util.NonDBID, ts) assert.ErrorIs(t, err, targetErr) assert.Nil(t, ret) @@ -119,12 +119,8 @@ func TestCatalog_ListCollections(t *testing.T) { assert.NoError(t, err) kv.On("LoadWithPrefix", CollectionMetaPrefix, ts). Return([]string{"key"}, []string{string(bColl)}, nil) - kv.On("LoadWithPrefix", mock.MatchedBy( - func(prefix string) bool { - return strings.HasPrefix(prefix, PartitionMetaPrefix) - }), ts). - Return(nil, nil, targetErr) - kc := Catalog{Snapshot: kv} + kv.EXPECT().LoadWithPrefix(mock.Anything, mock.Anything).Return(nil, nil, targetErr) + kc := NewCatalog(nil, kv) ret, err := kc.ListCollections(ctx, util.NonDBID, ts) assert.ErrorIs(t, err, targetErr) @@ -155,7 +151,7 @@ func TestCatalog_ListCollections(t *testing.T) { return strings.HasPrefix(prefix, FieldMetaPrefix) }), ts). Return(nil, nil, targetErr) - kc := Catalog{Snapshot: kv} + kc := NewCatalog(nil, kv) ret, err := kc.ListCollections(ctx, util.NonDBID, ts) assert.ErrorIs(t, err, targetErr) @@ -171,7 +167,7 @@ func TestCatalog_ListCollections(t *testing.T) { kv.On("LoadWithPrefix", CollectionMetaPrefix, ts). Return([]string{"key"}, []string{string(bColl)}, nil) kv.On("MultiSaveAndRemoveWithPrefix", mock.Anything, mock.Anything, ts).Return(nil) - kc := Catalog{Snapshot: kv} + kc := NewCatalog(nil, kv) ret, err := kc.ListCollections(ctx, util.NonDBID, ts) assert.NoError(t, err) @@ -208,7 +204,7 @@ func TestCatalog_ListCollections(t *testing.T) { return strings.HasPrefix(prefix, FieldMetaPrefix) }), ts). Return([]string{"rootcoord/fields/1/1"}, []string{string(fm)}, nil) - kc := Catalog{Snapshot: kv} + kc := NewCatalog(nil, kv) ret, err := kc.ListCollections(ctx, testDb, ts) assert.NoError(t, err) @@ -250,7 +246,7 @@ func TestCatalog_ListCollections(t *testing.T) { }), ts). Return([]string{"rootcoord/fields/1/1"}, []string{string(fm)}, nil) kv.On("MultiSaveAndRemoveWithPrefix", mock.Anything, mock.Anything, ts).Return(nil) - kc := Catalog{Snapshot: kv} + kc := NewCatalog(nil, kv) ret, err := kc.ListCollections(ctx, util.NonDBID, ts) assert.NoError(t, err) @@ -266,7 +262,7 @@ func TestCatalog_loadCollection(t *testing.T) { ctx := context.Background() kv := mocks.NewSnapShotKV(t) kv.EXPECT().Load(mock.Anything, mock.Anything).Return("", errors.New("mock")) - kc := Catalog{Snapshot: kv} + kc := NewCatalog(nil, kv).(*Catalog) _, err := kc.loadCollection(ctx, testDb, 1, 0) assert.Error(t, err) }) @@ -275,7 +271,7 @@ func TestCatalog_loadCollection(t *testing.T) { ctx := context.Background() kv := mocks.NewSnapShotKV(t) kv.EXPECT().Load(mock.Anything, mock.Anything).Return("not in pb format", nil) - kc := Catalog{Snapshot: kv} + kc := NewCatalog(nil, kv).(*Catalog) _, err := kc.loadCollection(ctx, testDb, 1, 0) assert.Error(t, err) }) @@ -287,7 +283,7 @@ func TestCatalog_loadCollection(t *testing.T) { assert.NoError(t, err) kv := mocks.NewSnapShotKV(t) kv.EXPECT().Load(mock.Anything, mock.Anything).Return(string(value), nil) - kc := Catalog{Snapshot: kv} + kc := NewCatalog(nil, kv).(*Catalog) got, err := kc.loadCollection(ctx, util.DefaultDBID, 1, 0) assert.NoError(t, err) assert.Equal(t, got.GetID(), coll.GetID()) @@ -305,7 +301,7 @@ func TestCatalog_loadCollection(t *testing.T) { kv := mocks.NewSnapShotKV(t) kv.EXPECT().Load(mock.Anything, mock.Anything).Return(string(value), nil) kv.EXPECT().MultiSaveAndRemoveWithPrefix(mock.Anything, mock.Anything, mock.Anything).Return(nil) - kc := Catalog{Snapshot: kv} + kc := NewCatalog(nil, kv).(*Catalog) got, err := kc.loadCollection(ctx, util.NonDBID, 1, 0) assert.NoError(t, err) assert.Equal(t, got.GetID(), coll.GetID()) @@ -359,7 +355,7 @@ func Test_partitionExistByName(t *testing.T) { func TestCatalog_GetCollectionByID(t *testing.T) { ctx := context.TODO() ss := mocks.NewSnapShotKV(t) - c := Catalog{Snapshot: ss} + c := NewCatalog(nil, ss) ss.EXPECT().Load(mock.Anything, mock.Anything).Return("", errors.New("load error")).Twice() coll, err := c.GetCollectionByID(ctx, 0, 1, 1) @@ -396,7 +392,7 @@ func TestCatalog_CreatePartitionV2(t *testing.T) { snapshot.LoadFunc = func(key string, ts typeutil.Timestamp) (string, error) { return "", errors.New("mock") } - kc := Catalog{Snapshot: snapshot} + kc := NewCatalog(nil, snapshot).(*Catalog) err := kc.CreatePartition(ctx, 0, &model.Partition{}, 0) assert.Error(t, err) }) @@ -418,7 +414,7 @@ func TestCatalog_CreatePartitionV2(t *testing.T) { return errors.New("mock") } - kc := Catalog{Snapshot: snapshot} + kc := NewCatalog(nil, snapshot).(*Catalog) err = kc.CreatePartition(ctx, 0, &model.Partition{}, 0) assert.Error(t, err) @@ -443,7 +439,7 @@ func TestCatalog_CreatePartitionV2(t *testing.T) { return string(value), nil } - kc := Catalog{Snapshot: snapshot} + kc := NewCatalog(nil, snapshot).(*Catalog) err = kc.CreatePartition(ctx, 0, &model.Partition{PartitionID: partID}, 0) assert.Error(t, err) @@ -462,7 +458,7 @@ func TestCatalog_CreatePartitionV2(t *testing.T) { return string(value), nil } - kc := Catalog{Snapshot: snapshot} + kc := NewCatalog(nil, snapshot).(*Catalog) err = kc.CreatePartition(ctx, 0, &model.Partition{PartitionName: partition}, 0) assert.Error(t, err) @@ -488,7 +484,7 @@ func TestCatalog_CreatePartitionV2(t *testing.T) { return errors.New("mock") } - kc := Catalog{Snapshot: snapshot} + kc := NewCatalog(nil, snapshot).(*Catalog) err = kc.CreatePartition(ctx, 0, &model.Partition{}, 0) assert.Error(t, err) @@ -509,7 +505,7 @@ func TestCatalog_CreateAliasV2(t *testing.T) { return errors.New("mock") } - kc := Catalog{Snapshot: snapshot} + kc := NewCatalog(nil, snapshot).(*Catalog) err := kc.CreateAlias(ctx, &model.Alias{}, 0) assert.Error(t, err) @@ -530,7 +526,7 @@ func TestCatalog_listPartitionsAfter210(t *testing.T) { return nil, nil, errors.New("mock") } - kc := Catalog{Snapshot: snapshot} + kc := NewCatalog(nil, snapshot).(*Catalog) _, err := kc.listPartitionsAfter210(ctx, 1, 0) assert.Error(t, err) @@ -544,7 +540,7 @@ func TestCatalog_listPartitionsAfter210(t *testing.T) { return []string{"key"}, []string{"not in pb format"}, nil } - kc := Catalog{Snapshot: snapshot} + kc := NewCatalog(nil, snapshot).(*Catalog) _, err := kc.listPartitionsAfter210(ctx, 1, 0) assert.Error(t, err) @@ -562,7 +558,7 @@ func TestCatalog_listPartitionsAfter210(t *testing.T) { return []string{"key"}, []string{string(value)}, nil } - kc := Catalog{Snapshot: snapshot} + kc := NewCatalog(nil, snapshot).(*Catalog) got, err := kc.listPartitionsAfter210(ctx, 1, 0) assert.NoError(t, err) @@ -588,7 +584,7 @@ func TestCatalog_listFieldsAfter210(t *testing.T) { return nil, nil, errors.New("mock") } - kc := Catalog{Snapshot: snapshot} + kc := NewCatalog(nil, snapshot).(*Catalog) _, err := kc.listFieldsAfter210(ctx, 1, 0) assert.Error(t, err) @@ -602,7 +598,7 @@ func TestCatalog_listFieldsAfter210(t *testing.T) { return []string{"key"}, []string{"not in pb format"}, nil } - kc := Catalog{Snapshot: snapshot} + kc := NewCatalog(nil, snapshot).(*Catalog) _, err := kc.listFieldsAfter210(ctx, 1, 0) assert.Error(t, err) @@ -620,7 +616,7 @@ func TestCatalog_listFieldsAfter210(t *testing.T) { return []string{"key"}, []string{string(value)}, nil } - kc := Catalog{Snapshot: snapshot} + kc := NewCatalog(nil, snapshot).(*Catalog) got, err := kc.listFieldsAfter210(ctx, 1, 0) assert.NoError(t, err) @@ -637,7 +633,7 @@ func TestCatalog_AlterAliasV2(t *testing.T) { return errors.New("mock") } - kc := Catalog{Snapshot: snapshot} + kc := NewCatalog(nil, snapshot).(*Catalog) err := kc.AlterAlias(ctx, &model.Alias{}, 0) assert.Error(t, err) @@ -686,7 +682,7 @@ func TestCatalog_DropPartitionV2(t *testing.T) { snapshot.On("Load", mock.Anything, mock.Anything).Return("not in codec format", nil) - kc := Catalog{Snapshot: snapshot} + kc := NewCatalog(nil, snapshot).(*Catalog) err := kc.DropPartition(ctx, 0, 100, 101, 0) assert.Error(t, err) @@ -699,7 +695,7 @@ func TestCatalog_DropPartitionV2(t *testing.T) { snapshot.On("Load", mock.Anything, mock.Anything).Return("", merr.WrapErrIoKeyNotFound("partition")) - kc := Catalog{Snapshot: snapshot} + kc := NewCatalog(nil, snapshot).(*Catalog) err := kc.DropPartition(ctx, 0, 100, 101, 0) assert.NoError(t, err) @@ -720,7 +716,7 @@ func TestCatalog_DropPartitionV2(t *testing.T) { return errors.New("mock") } - kc := Catalog{Snapshot: snapshot} + kc := NewCatalog(nil, snapshot).(*Catalog) err = kc.DropPartition(ctx, 0, 100, 101, 0) assert.Error(t, err) @@ -752,7 +748,7 @@ func TestCatalog_DropPartitionV2(t *testing.T) { return errors.New("mock") } - kc := Catalog{Snapshot: snapshot} + kc := NewCatalog(nil, snapshot).(*Catalog) err = kc.DropPartition(ctx, 0, 100, 101, 0) assert.Error(t, err) @@ -773,7 +769,7 @@ func TestCatalog_DropAliasV2(t *testing.T) { return errors.New("mock") } - kc := Catalog{Snapshot: snapshot} + kc := NewCatalog(nil, snapshot).(*Catalog) err := kc.DropAlias(ctx, testDb, "alias", 0) assert.Error(t, err) @@ -794,7 +790,7 @@ func TestCatalog_listAliasesBefore210(t *testing.T) { return nil, nil, errors.New("mock") } - kc := Catalog{Snapshot: snapshot} + kc := NewCatalog(nil, snapshot).(*Catalog) _, err := kc.listAliasesBefore210(ctx, 0) assert.Error(t, err) @@ -808,7 +804,7 @@ func TestCatalog_listAliasesBefore210(t *testing.T) { return []string{"key"}, []string{"not in pb format"}, nil } - kc := Catalog{Snapshot: snapshot} + kc := NewCatalog(nil, snapshot).(*Catalog) _, err := kc.listAliasesBefore210(ctx, 0) assert.Error(t, err) @@ -826,7 +822,7 @@ func TestCatalog_listAliasesBefore210(t *testing.T) { return []string{"key"}, []string{string(value)}, nil } - kc := Catalog{Snapshot: snapshot} + kc := NewCatalog(nil, snapshot).(*Catalog) got, err := kc.listAliasesBefore210(ctx, 0) assert.NoError(t, err) @@ -844,7 +840,7 @@ func TestCatalog_listAliasesAfter210(t *testing.T) { return nil, nil, errors.New("mock") } - kc := Catalog{Snapshot: snapshot} + kc := NewCatalog(nil, snapshot).(*Catalog) _, err := kc.listAliasesAfter210WithDb(ctx, testDb, 0) assert.Error(t, err) @@ -858,7 +854,7 @@ func TestCatalog_listAliasesAfter210(t *testing.T) { return []string{"key"}, []string{"not in pb format"}, nil } - kc := Catalog{Snapshot: snapshot} + kc := NewCatalog(nil, snapshot).(*Catalog) _, err := kc.listAliasesAfter210WithDb(ctx, testDb, 0) assert.Error(t, err) @@ -876,7 +872,7 @@ func TestCatalog_listAliasesAfter210(t *testing.T) { return []string{"key"}, []string{string(value)}, nil } - kc := Catalog{Snapshot: snapshot} + kc := NewCatalog(nil, snapshot).(*Catalog) got, err := kc.listAliasesAfter210WithDb(ctx, testDb, 0) assert.NoError(t, err) @@ -894,7 +890,7 @@ func TestCatalog_ListAliasesV2(t *testing.T) { return []string{"key"}, []string{"not in pb format"}, nil } - kc := Catalog{Snapshot: snapshot} + kc := NewCatalog(nil, snapshot).(*Catalog) _, err := kc.ListAliases(ctx, testDb, 0) assert.Error(t, err) @@ -919,7 +915,7 @@ func TestCatalog_ListAliasesV2(t *testing.T) { return []string{"key"}, []string{string(value)}, nil } - kc := Catalog{Snapshot: snapshot} + kc := NewCatalog(nil, snapshot).(*Catalog) _, err = kc.ListAliases(ctx, util.NonDBID, 0) assert.Error(t, err) @@ -944,7 +940,7 @@ func TestCatalog_ListAliasesV2(t *testing.T) { return []string{}, []string{}, nil } - kc := Catalog{Snapshot: snapshot} + kc := NewCatalog(nil, snapshot).(*Catalog) got, err := kc.ListAliases(ctx, testDb, 0) assert.NoError(t, err) @@ -1001,14 +997,14 @@ func Test_batchMultiSaveAndRemoveWithPrefix(t *testing.T) { func TestCatalog_AlterCollection(t *testing.T) { t.Run("add", func(t *testing.T) { - kc := &Catalog{} + kc := NewCatalog(nil, nil) ctx := context.Background() err := kc.AlterCollection(ctx, nil, nil, metastore.ADD, 0) assert.Error(t, err) }) t.Run("delete", func(t *testing.T) { - kc := &Catalog{} + kc := NewCatalog(nil, nil) ctx := context.Background() err := kc.AlterCollection(ctx, nil, nil, metastore.DELETE, 0) assert.Error(t, err) @@ -1021,7 +1017,7 @@ func TestCatalog_AlterCollection(t *testing.T) { kvs[key] = value return nil } - kc := &Catalog{Snapshot: snapshot} + kc := NewCatalog(nil, snapshot).(*Catalog) ctx := context.Background() var collectionID int64 = 1 oldC := &model.Collection{CollectionID: collectionID, State: pb.CollectionState_CollectionCreating} @@ -1039,7 +1035,7 @@ func TestCatalog_AlterCollection(t *testing.T) { }) t.Run("modify, tenant id changed", func(t *testing.T) { - kc := &Catalog{} + kc := NewCatalog(nil, nil) ctx := context.Background() var collectionID int64 = 1 oldC := &model.Collection{TenantID: "1", CollectionID: collectionID, State: pb.CollectionState_CollectionCreating} @@ -1058,7 +1054,7 @@ func TestCatalog_AlterCollection(t *testing.T) { return nil } - kc := &Catalog{Snapshot: snapshot} + kc := NewCatalog(nil, snapshot).(*Catalog) ctx := context.Background() oldC := &model.Collection{DBID: 0, CollectionID: collectionID, State: pb.CollectionState_CollectionCreated} newC := &model.Collection{DBID: 1, CollectionID: collectionID, State: pb.CollectionState_CollectionCreated} @@ -1069,14 +1065,14 @@ func TestCatalog_AlterCollection(t *testing.T) { func TestCatalog_AlterPartition(t *testing.T) { t.Run("add", func(t *testing.T) { - kc := &Catalog{} + kc := NewCatalog(nil, nil) ctx := context.Background() err := kc.AlterPartition(ctx, testDb, nil, nil, metastore.ADD, 0) assert.Error(t, err) }) t.Run("delete", func(t *testing.T) { - kc := &Catalog{} + kc := NewCatalog(nil, nil) ctx := context.Background() err := kc.AlterPartition(ctx, testDb, nil, nil, metastore.DELETE, 0) assert.Error(t, err) @@ -1089,7 +1085,7 @@ func TestCatalog_AlterPartition(t *testing.T) { kvs[key] = value return nil } - kc := &Catalog{Snapshot: snapshot} + kc := NewCatalog(nil, snapshot).(*Catalog) ctx := context.Background() var collectionID int64 = 1 var partitionID int64 = 2 @@ -1108,7 +1104,7 @@ func TestCatalog_AlterPartition(t *testing.T) { }) t.Run("modify, tenant id changed", func(t *testing.T) { - kc := &Catalog{} + kc := NewCatalog(nil, nil) ctx := context.Background() var collectionID int64 = 1 oldP := &model.Partition{PartitionID: 1, CollectionID: collectionID, State: pb.PartitionState_PartitionCreating} @@ -1162,7 +1158,7 @@ func withMockMultiSaveAndRemoveWithPrefix(err error) mockSnapshotOpt { func TestCatalog_CreateCollection(t *testing.T) { t.Run("collection not creating", func(t *testing.T) { - kc := &Catalog{} + kc := NewCatalog(nil, nil) ctx := context.Background() coll := &model.Collection{State: pb.CollectionState_CollectionDropping} err := kc.CreateCollection(ctx, coll, 100) @@ -1171,7 +1167,7 @@ func TestCatalog_CreateCollection(t *testing.T) { t.Run("failed to save collection", func(t *testing.T) { mockSnapshot := newMockSnapshot(t, withMockSave(errors.New("error mock Save"))) - kc := &Catalog{Snapshot: mockSnapshot} + kc := NewCatalog(nil, mockSnapshot) ctx := context.Background() coll := &model.Collection{State: pb.CollectionState_CollectionCreating} err := kc.CreateCollection(ctx, coll, 100) @@ -1180,7 +1176,7 @@ func TestCatalog_CreateCollection(t *testing.T) { t.Run("succeed to save collection but failed to save other keys", func(t *testing.T) { mockSnapshot := newMockSnapshot(t, withMockSave(nil), withMockMultiSave(errors.New("error mock MultiSave"))) - kc := &Catalog{Snapshot: mockSnapshot} + kc := NewCatalog(nil, mockSnapshot) ctx := context.Background() coll := &model.Collection{ Partitions: []*model.Partition{ @@ -1194,7 +1190,7 @@ func TestCatalog_CreateCollection(t *testing.T) { t.Run("normal case", func(t *testing.T) { mockSnapshot := newMockSnapshot(t, withMockSave(nil), withMockMultiSave(nil)) - kc := &Catalog{Snapshot: mockSnapshot} + kc := NewCatalog(nil, mockSnapshot) ctx := context.Background() coll := &model.Collection{ Partitions: []*model.Partition{ @@ -1210,7 +1206,7 @@ func TestCatalog_CreateCollection(t *testing.T) { func TestCatalog_DropCollection(t *testing.T) { t.Run("failed to remove", func(t *testing.T) { mockSnapshot := newMockSnapshot(t, withMockMultiSaveAndRemoveWithPrefix(errors.New("error mock MultiSaveAndRemoveWithPrefix"))) - kc := &Catalog{Snapshot: mockSnapshot} + kc := NewCatalog(nil, mockSnapshot) ctx := context.Background() coll := &model.Collection{ Partitions: []*model.Partition{ @@ -1244,7 +1240,7 @@ func TestCatalog_DropCollection(t *testing.T) { removeCollectionCalled = true return errors.New("error mock MultiSaveAndRemoveWithPrefix") }).Once() - kc := &Catalog{Snapshot: mockSnapshot} + kc := NewCatalog(nil, mockSnapshot) ctx := context.Background() coll := &model.Collection{ Partitions: []*model.Partition{ @@ -1260,7 +1256,7 @@ func TestCatalog_DropCollection(t *testing.T) { t.Run("normal case", func(t *testing.T) { mockSnapshot := newMockSnapshot(t, withMockMultiSaveAndRemoveWithPrefix(nil)) - kc := &Catalog{Snapshot: mockSnapshot} + kc := NewCatalog(nil, mockSnapshot) ctx := context.Background() coll := &model.Collection{ Partitions: []*model.Partition{ @@ -1285,7 +1281,7 @@ func TestRBAC_Credential(t *testing.T) { t.Run("test GetCredential", func(t *testing.T) { var ( kvmock = mocks.NewTxnKV(t) - c = &Catalog{Txn: kvmock} + c = NewCatalog(kvmock, nil) loadFailName = "invalid" loadFailKey = fmt.Sprintf("%s/%s", CredentialPrefix, loadFailName) @@ -1336,7 +1332,7 @@ func TestRBAC_Credential(t *testing.T) { t.Run("test CreateCredential", func(t *testing.T) { var ( kvmock = mocks.NewTxnKV(t) - c = &Catalog{Txn: kvmock} + c = NewCatalog(kvmock, nil) invalidName = "invalid" ) @@ -1386,7 +1382,7 @@ func TestRBAC_Credential(t *testing.T) { t.Run("test DropCredential", func(t *testing.T) { var ( kvmock = mocks.NewTxnKV(t) - c = &Catalog{Txn: kvmock} + c = NewCatalog(kvmock, nil) validName = "user1" validUserRoleKeyPrefix = funcutil.HandleTenantForEtcdKey(RoleMappingPrefix, util.DefaultTenant, validName) @@ -1443,7 +1439,7 @@ func TestRBAC_Credential(t *testing.T) { t.Run("test ListCredentials", func(t *testing.T) { var ( kvmock = mocks.NewTxnKV(t) - c = &Catalog{Txn: kvmock} + c = NewCatalog(kvmock, nil) cmu sync.RWMutex count = 0 @@ -1525,7 +1521,7 @@ func TestRBAC_Role(t *testing.T) { t.Run("test remove", func(t *testing.T) { var ( kvmock = mocks.NewTxnKV(t) - c = &Catalog{Txn: kvmock} + c = NewCatalog(kvmock, nil).(*Catalog) notExistKey = "not-exist" errorKey = "error" @@ -1569,7 +1565,7 @@ func TestRBAC_Role(t *testing.T) { t.Run("test save", func(t *testing.T) { var ( kvmock = mocks.NewTxnKV(t) - c = &Catalog{Txn: kvmock} + c = NewCatalog(kvmock, nil).(*Catalog) notExistKey = "not-exist" errorKey = "error" @@ -1614,7 +1610,7 @@ func TestRBAC_Role(t *testing.T) { t.Run("test CreateRole", func(t *testing.T) { var ( kvmock = mocks.NewTxnKV(t) - c = &Catalog{Txn: kvmock} + c = NewCatalog(kvmock, nil) notExistName = "not-exist" notExistPath = funcutil.HandleTenantForEtcdKey(RolePrefix, tenant, notExistName) @@ -1657,7 +1653,7 @@ func TestRBAC_Role(t *testing.T) { t.Run("test DropRole", func(t *testing.T) { var ( kvmock = mocks.NewTxnKV(t) - c = &Catalog{Txn: kvmock} + c = NewCatalog(kvmock, nil) validName = "role1" errorName = "error" @@ -1710,7 +1706,7 @@ func TestRBAC_Role(t *testing.T) { t.Run("test AlterUserRole", func(t *testing.T) { var ( kvmock = mocks.NewTxnKV(t) - c = &Catalog{Txn: kvmock} + c = NewCatalog(kvmock, nil) user = "default-user" @@ -1773,7 +1769,7 @@ func TestRBAC_Role(t *testing.T) { t.Run("test entity!=nil", func(t *testing.T) { var ( kvmock = mocks.NewTxnKV(t) - c = &Catalog{Txn: kvmock} + c = NewCatalog(kvmock, nil) errorLoad = "error" errorLoadPath = funcutil.HandleTenantForEtcdKey(RolePrefix, tenant, errorLoad) @@ -1849,7 +1845,7 @@ func TestRBAC_Role(t *testing.T) { t.Run("test entity is nil", func(t *testing.T) { var ( kvmock = mocks.NewTxnKV(t) - c = &Catalog{Txn: kvmock} + c = NewCatalog(kvmock, nil) ) // Return valid keys if loadWithPrefixReturn == True @@ -1906,7 +1902,7 @@ func TestRBAC_Role(t *testing.T) { t.Run("test ListUser", func(t *testing.T) { var ( kvmock = mocks.NewTxnKV(t) - c = &Catalog{Txn: kvmock} + c = NewCatalog(kvmock, nil).(*Catalog) invalidUser = "invalid-user" invalidUserKey = funcutil.HandleTenantForEtcdKey(RoleMappingPrefix, tenant, invalidUser) @@ -2048,7 +2044,7 @@ func TestRBAC_Role(t *testing.T) { var ( loadWithPrefixReturn atomic.Bool kvmock = mocks.NewTxnKV(t) - c = &Catalog{Txn: kvmock} + c = NewCatalog(kvmock, nil) ) // Return valid keys if loadWithPrefixReturn == True @@ -2124,7 +2120,7 @@ func TestRBAC_Grant(t *testing.T) { t.Run("test AlterGrant", func(t *testing.T) { var ( kvmock = mocks.NewTxnKV(t) - c = &Catalog{Txn: kvmock} + c = NewCatalog(kvmock, nil) ) validRoleKey := funcutil.HandleTenantForEtcdKey(GranteePrefix, tenant, fmt.Sprintf("%s/%s/%s", validRole, object, objName)) @@ -2310,7 +2306,7 @@ func TestRBAC_Grant(t *testing.T) { t.Run("test DeleteGrant", func(t *testing.T) { var ( kvmock = mocks.NewTxnKV(t) - c = &Catalog{Txn: kvmock} + c = NewCatalog(kvmock, nil) errorRole = "error-role" errorRolePrefix = funcutil.HandleTenantForEtcdKey(GranteePrefix, tenant, errorRole+"/") @@ -2350,7 +2346,7 @@ func TestRBAC_Grant(t *testing.T) { t.Run("test ListGrant", func(t *testing.T) { var ( kvmock = mocks.NewTxnKV(t) - c = &Catalog{Txn: kvmock} + c = NewCatalog(kvmock, nil) ) // Mock Load in kv_catalog.go:L901 @@ -2470,7 +2466,7 @@ func TestRBAC_Grant(t *testing.T) { t.Run("test ListPolicy", func(t *testing.T) { var ( kvmock = mocks.NewTxnKV(t) - c = &Catalog{Txn: kvmock} + c = NewCatalog(kvmock, nil) firstLoadWithPrefixReturn atomic.Bool secondLoadWithPrefixReturn atomic.Bool @@ -2576,7 +2572,7 @@ func TestRBAC_Backup(t *testing.T) { metaKV := etcdkv.NewEtcdKV(etcdCli, rootPath) defer metaKV.RemoveWithPrefix("") defer metaKV.Close() - c := &Catalog{Txn: metaKV} + c := NewCatalog(metaKV, nil) ctx := context.Background() c.CreateRole(ctx, util.DefaultTenant, &milvuspb.RoleEntity{Name: "role1"}) @@ -2629,7 +2625,7 @@ func TestRBAC_Restore(t *testing.T) { metaKV := etcdkv.NewEtcdKV(etcdCli, rootPath) defer metaKV.RemoveWithPrefix("") defer metaKV.Close() - c := &Catalog{Txn: metaKV} + c := NewCatalog(metaKV, nil) ctx := context.Background() @@ -2793,7 +2789,7 @@ func TestRBAC_PrivilegeGroup(t *testing.T) { t.Run("test GetPrivilegeGroup", func(t *testing.T) { var ( kvmock = mocks.NewTxnKV(t) - c = &Catalog{Txn: kvmock} + c = NewCatalog(kvmock, nil) ) kvmock.EXPECT().Load(key1).Return(string(v1), nil) kvmock.EXPECT().Load(key2).Return("", merr.ErrIoKeyNotFound) @@ -2823,7 +2819,7 @@ func TestRBAC_PrivilegeGroup(t *testing.T) { t.Run("test DropPrivilegeGroup", func(t *testing.T) { var ( kvmock = mocks.NewTxnKV(t) - c = &Catalog{Txn: kvmock} + c = NewCatalog(kvmock, nil) ) kvmock.EXPECT().Remove(key1).Return(nil) @@ -2853,7 +2849,7 @@ func TestRBAC_PrivilegeGroup(t *testing.T) { t.Run("test SavePrivilegeGroup", func(t *testing.T) { var ( kvmock = mocks.NewTxnKV(t) - c = &Catalog{Txn: kvmock} + c = NewCatalog(kvmock, nil) ) kvmock.EXPECT().Save(key1, mock.Anything).Return(nil) @@ -2883,7 +2879,7 @@ func TestRBAC_PrivilegeGroup(t *testing.T) { t.Run("test ListPrivilegeGroups", func(t *testing.T) { var ( kvmock = mocks.NewTxnKV(t) - c = &Catalog{Txn: kvmock} + c = NewCatalog(kvmock, nil) ) kvmock.EXPECT().LoadWithPrefix(PrivilegeGroupPrefix).Return( @@ -2913,7 +2909,7 @@ func getPrivilegeNames(privileges []*milvuspb.PrivilegeEntity) []string { func TestCatalog_AlterDatabase(t *testing.T) { kvmock := mocks.NewSnapShotKV(t) - c := &Catalog{Snapshot: kvmock} + c := NewCatalog(nil, kvmock) db := model.NewDatabase(1, "db", pb.DatabaseState_DatabaseCreated, nil) kvmock.EXPECT().Save(mock.Anything, mock.Anything, mock.Anything).Return(nil) diff --git a/internal/metastore/mocks/mock_datacoord_catalog.go b/internal/metastore/mocks/mock_datacoord_catalog.go index 259602ef8f36c..11c28b260abfc 100644 --- a/internal/metastore/mocks/mock_datacoord_catalog.go +++ b/internal/metastore/mocks/mock_datacoord_catalog.go @@ -1447,25 +1447,25 @@ func (_c *DataCoordCatalog_ListSegmentIndexes_Call) RunAndReturn(run func(contex return _c } -// ListSegments provides a mock function with given fields: ctx -func (_m *DataCoordCatalog) ListSegments(ctx context.Context) ([]*datapb.SegmentInfo, error) { - ret := _m.Called(ctx) +// ListSegments provides a mock function with given fields: ctx, collectionID +func (_m *DataCoordCatalog) ListSegments(ctx context.Context, collectionID int64) ([]*datapb.SegmentInfo, error) { + ret := _m.Called(ctx, collectionID) var r0 []*datapb.SegmentInfo var r1 error - if rf, ok := ret.Get(0).(func(context.Context) ([]*datapb.SegmentInfo, error)); ok { - return rf(ctx) + if rf, ok := ret.Get(0).(func(context.Context, int64) ([]*datapb.SegmentInfo, error)); ok { + return rf(ctx, collectionID) } - if rf, ok := ret.Get(0).(func(context.Context) []*datapb.SegmentInfo); ok { - r0 = rf(ctx) + if rf, ok := ret.Get(0).(func(context.Context, int64) []*datapb.SegmentInfo); ok { + r0 = rf(ctx, collectionID) } else { if ret.Get(0) != nil { r0 = ret.Get(0).([]*datapb.SegmentInfo) } } - if rf, ok := ret.Get(1).(func(context.Context) error); ok { - r1 = rf(ctx) + if rf, ok := ret.Get(1).(func(context.Context, int64) error); ok { + r1 = rf(ctx, collectionID) } else { r1 = ret.Error(1) } @@ -1480,13 +1480,14 @@ type DataCoordCatalog_ListSegments_Call struct { // ListSegments is a helper method to define mock.On call // - ctx context.Context -func (_e *DataCoordCatalog_Expecter) ListSegments(ctx interface{}) *DataCoordCatalog_ListSegments_Call { - return &DataCoordCatalog_ListSegments_Call{Call: _e.mock.On("ListSegments", ctx)} +// - collectionID int64 +func (_e *DataCoordCatalog_Expecter) ListSegments(ctx interface{}, collectionID interface{}) *DataCoordCatalog_ListSegments_Call { + return &DataCoordCatalog_ListSegments_Call{Call: _e.mock.On("ListSegments", ctx, collectionID)} } -func (_c *DataCoordCatalog_ListSegments_Call) Run(run func(ctx context.Context)) *DataCoordCatalog_ListSegments_Call { +func (_c *DataCoordCatalog_ListSegments_Call) Run(run func(ctx context.Context, collectionID int64)) *DataCoordCatalog_ListSegments_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context)) + run(args[0].(context.Context), args[1].(int64)) }) return _c } @@ -1496,7 +1497,7 @@ func (_c *DataCoordCatalog_ListSegments_Call) Return(_a0 []*datapb.SegmentInfo, return _c } -func (_c *DataCoordCatalog_ListSegments_Call) RunAndReturn(run func(context.Context) ([]*datapb.SegmentInfo, error)) *DataCoordCatalog_ListSegments_Call { +func (_c *DataCoordCatalog_ListSegments_Call) RunAndReturn(run func(context.Context, int64) ([]*datapb.SegmentInfo, error)) *DataCoordCatalog_ListSegments_Call { _c.Call.Return(run) return _c } diff --git a/internal/metastore/mocks/mock_querycoord_catalog.go b/internal/metastore/mocks/mock_querycoord_catalog.go index 3ce66e8441206..6b1d7ccf9ae01 100644 --- a/internal/metastore/mocks/mock_querycoord_catalog.go +++ b/internal/metastore/mocks/mock_querycoord_catalog.go @@ -126,25 +126,25 @@ func (_c *QueryCoordCatalog_GetCollections_Call) RunAndReturn(run func() ([]*que return _c } -// GetPartitions provides a mock function with given fields: -func (_m *QueryCoordCatalog) GetPartitions() (map[int64][]*querypb.PartitionLoadInfo, error) { - ret := _m.Called() +// GetPartitions provides a mock function with given fields: collectionIDs +func (_m *QueryCoordCatalog) GetPartitions(collectionIDs []int64) (map[int64][]*querypb.PartitionLoadInfo, error) { + ret := _m.Called(collectionIDs) var r0 map[int64][]*querypb.PartitionLoadInfo var r1 error - if rf, ok := ret.Get(0).(func() (map[int64][]*querypb.PartitionLoadInfo, error)); ok { - return rf() + if rf, ok := ret.Get(0).(func([]int64) (map[int64][]*querypb.PartitionLoadInfo, error)); ok { + return rf(collectionIDs) } - if rf, ok := ret.Get(0).(func() map[int64][]*querypb.PartitionLoadInfo); ok { - r0 = rf() + if rf, ok := ret.Get(0).(func([]int64) map[int64][]*querypb.PartitionLoadInfo); ok { + r0 = rf(collectionIDs) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(map[int64][]*querypb.PartitionLoadInfo) } } - if rf, ok := ret.Get(1).(func() error); ok { - r1 = rf() + if rf, ok := ret.Get(1).(func([]int64) error); ok { + r1 = rf(collectionIDs) } else { r1 = ret.Error(1) } @@ -158,13 +158,14 @@ type QueryCoordCatalog_GetPartitions_Call struct { } // GetPartitions is a helper method to define mock.On call -func (_e *QueryCoordCatalog_Expecter) GetPartitions() *QueryCoordCatalog_GetPartitions_Call { - return &QueryCoordCatalog_GetPartitions_Call{Call: _e.mock.On("GetPartitions")} +// - collectionIDs []int64 +func (_e *QueryCoordCatalog_Expecter) GetPartitions(collectionIDs interface{}) *QueryCoordCatalog_GetPartitions_Call { + return &QueryCoordCatalog_GetPartitions_Call{Call: _e.mock.On("GetPartitions", collectionIDs)} } -func (_c *QueryCoordCatalog_GetPartitions_Call) Run(run func()) *QueryCoordCatalog_GetPartitions_Call { +func (_c *QueryCoordCatalog_GetPartitions_Call) Run(run func(collectionIDs []int64)) *QueryCoordCatalog_GetPartitions_Call { _c.Call.Run(func(args mock.Arguments) { - run() + run(args[0].([]int64)) }) return _c } @@ -174,7 +175,7 @@ func (_c *QueryCoordCatalog_GetPartitions_Call) Return(_a0 map[int64][]*querypb. return _c } -func (_c *QueryCoordCatalog_GetPartitions_Call) RunAndReturn(run func() (map[int64][]*querypb.PartitionLoadInfo, error)) *QueryCoordCatalog_GetPartitions_Call { +func (_c *QueryCoordCatalog_GetPartitions_Call) RunAndReturn(run func([]int64) (map[int64][]*querypb.PartitionLoadInfo, error)) *QueryCoordCatalog_GetPartitions_Call { _c.Call.Return(run) return _c } diff --git a/internal/metastore/mocks/mock_rootcoord_catalog.go b/internal/metastore/mocks/mock_rootcoord_catalog.go index 646eb849ae756..b25411f439016 100644 --- a/internal/metastore/mocks/mock_rootcoord_catalog.go +++ b/internal/metastore/mocks/mock_rootcoord_catalog.go @@ -1003,6 +1003,49 @@ func (_c *RootCoordCatalog_DropPartition_Call) RunAndReturn(run func(context.Con return _c } +// DropPrivilegeGroup provides a mock function with given fields: ctx, groupName +func (_m *RootCoordCatalog) DropPrivilegeGroup(ctx context.Context, groupName string) error { + ret := _m.Called(ctx, groupName) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string) error); ok { + r0 = rf(ctx, groupName) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// RootCoordCatalog_DropPrivilegeGroup_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DropPrivilegeGroup' +type RootCoordCatalog_DropPrivilegeGroup_Call struct { + *mock.Call +} + +// DropPrivilegeGroup is a helper method to define mock.On call +// - ctx context.Context +// - groupName string +func (_e *RootCoordCatalog_Expecter) DropPrivilegeGroup(ctx interface{}, groupName interface{}) *RootCoordCatalog_DropPrivilegeGroup_Call { + return &RootCoordCatalog_DropPrivilegeGroup_Call{Call: _e.mock.On("DropPrivilegeGroup", ctx, groupName)} +} + +func (_c *RootCoordCatalog_DropPrivilegeGroup_Call) Run(run func(ctx context.Context, groupName string)) *RootCoordCatalog_DropPrivilegeGroup_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string)) + }) + return _c +} + +func (_c *RootCoordCatalog_DropPrivilegeGroup_Call) Return(_a0 error) *RootCoordCatalog_DropPrivilegeGroup_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *RootCoordCatalog_DropPrivilegeGroup_Call) RunAndReturn(run func(context.Context, string) error) *RootCoordCatalog_DropPrivilegeGroup_Call { + _c.Call.Return(run) + return _c +} + // DropRole provides a mock function with given fields: ctx, tenant, roleName func (_m *RootCoordCatalog) DropRole(ctx context.Context, tenant string, roleName string) error { ret := _m.Called(ctx, tenant, roleName) @@ -1216,6 +1259,61 @@ func (_c *RootCoordCatalog_GetCredential_Call) RunAndReturn(run func(context.Con return _c } +// GetPrivilegeGroup provides a mock function with given fields: ctx, groupName +func (_m *RootCoordCatalog) GetPrivilegeGroup(ctx context.Context, groupName string) (*milvuspb.PrivilegeGroupInfo, error) { + ret := _m.Called(ctx, groupName) + + var r0 *milvuspb.PrivilegeGroupInfo + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string) (*milvuspb.PrivilegeGroupInfo, error)); ok { + return rf(ctx, groupName) + } + if rf, ok := ret.Get(0).(func(context.Context, string) *milvuspb.PrivilegeGroupInfo); ok { + r0 = rf(ctx, groupName) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*milvuspb.PrivilegeGroupInfo) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { + r1 = rf(ctx, groupName) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// RootCoordCatalog_GetPrivilegeGroup_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetPrivilegeGroup' +type RootCoordCatalog_GetPrivilegeGroup_Call struct { + *mock.Call +} + +// GetPrivilegeGroup is a helper method to define mock.On call +// - ctx context.Context +// - groupName string +func (_e *RootCoordCatalog_Expecter) GetPrivilegeGroup(ctx interface{}, groupName interface{}) *RootCoordCatalog_GetPrivilegeGroup_Call { + return &RootCoordCatalog_GetPrivilegeGroup_Call{Call: _e.mock.On("GetPrivilegeGroup", ctx, groupName)} +} + +func (_c *RootCoordCatalog_GetPrivilegeGroup_Call) Run(run func(ctx context.Context, groupName string)) *RootCoordCatalog_GetPrivilegeGroup_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string)) + }) + return _c +} + +func (_c *RootCoordCatalog_GetPrivilegeGroup_Call) Return(_a0 *milvuspb.PrivilegeGroupInfo, _a1 error) *RootCoordCatalog_GetPrivilegeGroup_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *RootCoordCatalog_GetPrivilegeGroup_Call) RunAndReturn(run func(context.Context, string) (*milvuspb.PrivilegeGroupInfo, error)) *RootCoordCatalog_GetPrivilegeGroup_Call { + _c.Call.Return(run) + return _c +} + // ListAliases provides a mock function with given fields: ctx, dbID, ts func (_m *RootCoordCatalog) ListAliases(ctx context.Context, dbID int64, ts uint64) ([]*model.Alias, error) { ret := _m.Called(ctx, dbID, ts) @@ -1602,6 +1700,60 @@ func (_c *RootCoordCatalog_ListPolicy_Call) RunAndReturn(run func(context.Contex return _c } +// ListPrivilegeGroups provides a mock function with given fields: ctx +func (_m *RootCoordCatalog) ListPrivilegeGroups(ctx context.Context) ([]*milvuspb.PrivilegeGroupInfo, error) { + ret := _m.Called(ctx) + + var r0 []*milvuspb.PrivilegeGroupInfo + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) ([]*milvuspb.PrivilegeGroupInfo, error)); ok { + return rf(ctx) + } + if rf, ok := ret.Get(0).(func(context.Context) []*milvuspb.PrivilegeGroupInfo); ok { + r0 = rf(ctx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*milvuspb.PrivilegeGroupInfo) + } + } + + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// RootCoordCatalog_ListPrivilegeGroups_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ListPrivilegeGroups' +type RootCoordCatalog_ListPrivilegeGroups_Call struct { + *mock.Call +} + +// ListPrivilegeGroups is a helper method to define mock.On call +// - ctx context.Context +func (_e *RootCoordCatalog_Expecter) ListPrivilegeGroups(ctx interface{}) *RootCoordCatalog_ListPrivilegeGroups_Call { + return &RootCoordCatalog_ListPrivilegeGroups_Call{Call: _e.mock.On("ListPrivilegeGroups", ctx)} +} + +func (_c *RootCoordCatalog_ListPrivilegeGroups_Call) Run(run func(ctx context.Context)) *RootCoordCatalog_ListPrivilegeGroups_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context)) + }) + return _c +} + +func (_c *RootCoordCatalog_ListPrivilegeGroups_Call) Return(_a0 []*milvuspb.PrivilegeGroupInfo, _a1 error) *RootCoordCatalog_ListPrivilegeGroups_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *RootCoordCatalog_ListPrivilegeGroups_Call) RunAndReturn(run func(context.Context) ([]*milvuspb.PrivilegeGroupInfo, error)) *RootCoordCatalog_ListPrivilegeGroups_Call { + _c.Call.Return(run) + return _c +} + // ListRole provides a mock function with given fields: ctx, tenant, entity, includeUserInfo func (_m *RootCoordCatalog) ListRole(ctx context.Context, tenant string, entity *milvuspb.RoleEntity, includeUserInfo bool) ([]*milvuspb.RoleResult, error) { ret := _m.Called(ctx, tenant, entity, includeUserInfo) @@ -1815,119 +1967,7 @@ func (_c *RootCoordCatalog_RestoreRBAC_Call) RunAndReturn(run func(context.Conte return _c } -// NewRootCoordCatalog creates a new instance of RootCoordCatalog. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -// The first argument is typically a *testing.T value. -func NewRootCoordCatalog(t interface { - mock.TestingT - Cleanup(func()) -}) *RootCoordCatalog { - mock := &RootCoordCatalog{} - mock.Mock.Test(t) - - t.Cleanup(func() { mock.AssertExpectations(t) }) - - return mock -} - -// GetPrivilegeGroup provides a mock function with given fields: ctx, groupName -func (_m *RootCoordCatalog) GetPrivilegeGroup(ctx context.Context, groupName string) (*milvuspb.PrivilegeGroupInfo, error) { - ret := _m.Called(ctx, groupName) - - var r0 *milvuspb.PrivilegeGroupInfo - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, string) (*milvuspb.PrivilegeGroupInfo, error)); ok { - return rf(ctx, groupName) - } - if rf, ok := ret.Get(0).(func(context.Context, string) *milvuspb.PrivilegeGroupInfo); ok { - r0 = rf(ctx, groupName) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(*milvuspb.PrivilegeGroupInfo) - } - } - - if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { - r1 = rf(ctx, groupName) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// RootCoordCatalog_GetPrivilegeGroup_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetPrivilegeGroup' -type RootCoordCatalog_GetPrivilegeGroup_Call struct { - *mock.Call -} - -// GetPrivilegeGroup is a helper method to define mock.On call -// - ctx context.Context -// - groupName string -func (_e *RootCoordCatalog_Expecter) GetPrivilegeGroup(ctx interface{}, groupName interface{}) *RootCoordCatalog_GetPrivilegeGroup_Call { - return &RootCoordCatalog_GetPrivilegeGroup_Call{Call: _e.mock.On("GetPrivilegeGroup", ctx, groupName)} -} - -func (_c *RootCoordCatalog_GetPrivilegeGroup_Call) Run(run func(ctx context.Context, groupName string)) *RootCoordCatalog_GetPrivilegeGroup_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(string)) - }) - return _c -} - -func (_c *RootCoordCatalog_GetPrivilegeGroup_Call) Return(_a0 *milvuspb.PrivilegeGroupInfo, _a1 error) *RootCoordCatalog_GetPrivilegeGroup_Call { - _c.Call.Return(_a0, _a1) - return _c -} - -func (_c *RootCoordCatalog_GetPrivilegeGroup_Call) RunAndReturn(run func(context.Context, string) (*milvuspb.PrivilegeGroupInfo,error)) *RootCoordCatalog_GetPrivilegeGroup_Call { - _c.Call.Return(run) - return _c -} - -// DropPrivilegeGroup provides a mock function with given fields: ctx, groupName, privileges -func (_m *RootCoordCatalog) DropPrivilegeGroup(ctx context.Context, groupName string) error { - ret := _m.Called(ctx, groupName) - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, string) error); ok { - r0 = rf(ctx, groupName) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// RootCoordCatalog_DropPrivilegeGroup_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DropPrivilegeGroup' -type RootCoordCatalog_DropPrivilegeGroup_Call struct { - *mock.Call -} - -// DropPrivilegeGroup is a helper method to define mock.On call -// - ctx context.Context -// - groupName string -func (_e *RootCoordCatalog_Expecter) DropPrivilegeGroup(ctx interface{}, groupName interface{}) *RootCoordCatalog_DropPrivilegeGroup_Call { - return &RootCoordCatalog_DropPrivilegeGroup_Call{Call: _e.mock.On("DropPrivilegeGroup", ctx, groupName)} -} - -func (_c *RootCoordCatalog_DropPrivilegeGroup_Call) Run(run func(ctx context.Context, groupName string)) *RootCoordCatalog_DropPrivilegeGroup_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(string)) - }) - return _c -} - -func (_c *RootCoordCatalog_DropPrivilegeGroup_Call) Return(_a0 error) *RootCoordCatalog_DropPrivilegeGroup_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *RootCoordCatalog_DropPrivilegeGroup_Call) RunAndReturn(run func(context.Context, string) error) *RootCoordCatalog_DropPrivilegeGroup_Call { - _c.Call.Return(run) - return _c -} - -// SavePrivilegeGroup provides a mock function with given fields: ctx, groupName, privileges +// SavePrivilegeGroup provides a mock function with given fields: ctx, data func (_m *RootCoordCatalog) SavePrivilegeGroup(ctx context.Context, data *milvuspb.PrivilegeGroupInfo) error { ret := _m.Called(ctx, data) @@ -1948,7 +1988,7 @@ type RootCoordCatalog_SavePrivilegeGroup_Call struct { // SavePrivilegeGroup is a helper method to define mock.On call // - ctx context.Context -// - groupName string +// - data *milvuspb.PrivilegeGroupInfo func (_e *RootCoordCatalog_Expecter) SavePrivilegeGroup(ctx interface{}, data interface{}) *RootCoordCatalog_SavePrivilegeGroup_Call { return &RootCoordCatalog_SavePrivilegeGroup_Call{Call: _e.mock.On("SavePrivilegeGroup", ctx, data)} } @@ -1970,56 +2010,16 @@ func (_c *RootCoordCatalog_SavePrivilegeGroup_Call) RunAndReturn(run func(contex return _c } -// ListPrivilegeGroups provides a mock function with given fields: ctx -func (_m *RootCoordCatalog) ListPrivilegeGroups(ctx context.Context) ([]*milvuspb.PrivilegeGroupInfo, error) { - ret := _m.Called(ctx) - - var r0 []*milvuspb.PrivilegeGroupInfo - var r1 error - if rf, ok := ret.Get(0).(func(context.Context) ([]*milvuspb.PrivilegeGroupInfo, error)); ok { - return rf(ctx) - } - if rf, ok := ret.Get(0).(func(context.Context) []*milvuspb.PrivilegeGroupInfo); ok { - r0 = rf(ctx) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]*milvuspb.PrivilegeGroupInfo) - } - } - - if rf, ok := ret.Get(1).(func(context.Context) error); ok { - r1 = rf(ctx) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// RootCoordCatalog_ListPrivilegeGroups_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ListPrivilegeGroups' -type RootCoordCatalog_ListPrivilegeGroups_Call struct { - *mock.Call -} - -// ListPrivilegeGroups is a helper method to define mock.On call -// - ctx context.Context -func (_e *RootCoordCatalog_Expecter) ListPrivilegeGroups(ctx interface{}) *RootCoordCatalog_ListPrivilegeGroups_Call { - return &RootCoordCatalog_ListPrivilegeGroups_Call{Call: _e.mock.On("ListPrivilegeGroups", ctx)} -} - -func (_c *RootCoordCatalog_ListPrivilegeGroups_Call) Run(run func(ctx context.Context)) *RootCoordCatalog_ListPrivilegeGroups_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context)) - }) - return _c -} +// NewRootCoordCatalog creates a new instance of RootCoordCatalog. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewRootCoordCatalog(t interface { + mock.TestingT + Cleanup(func()) +}) *RootCoordCatalog { + mock := &RootCoordCatalog{} + mock.Mock.Test(t) -func (_c *RootCoordCatalog_ListPrivilegeGroups_Call) Return(_a0 []*milvuspb.PrivilegeGroupInfo, _a1 error) *RootCoordCatalog_ListPrivilegeGroups_Call { - _c.Call.Return(_a0, _a1) - return _c -} + t.Cleanup(func() { mock.AssertExpectations(t) }) -func (_c *RootCoordCatalog_ListPrivilegeGroups_Call) RunAndReturn(run func(context.Context) ([]*milvuspb.PrivilegeGroupInfo, error)) *RootCoordCatalog_ListPrivilegeGroups_Call { - _c.Call.Return(run) - return _c + return mock } diff --git a/internal/mocks/mock_rootcoord.go b/internal/mocks/mock_rootcoord.go index 877174804472f..a1f6fae5a704e 100644 --- a/internal/mocks/mock_rootcoord.go +++ b/internal/mocks/mock_rootcoord.go @@ -2919,6 +2919,65 @@ func (_c *RootCoord_ShowCollections_Call) RunAndReturn(run func(context.Context, return _c } +// ShowCollectionsInternal provides a mock function with given fields: _a0, _a1 +func (_m *RootCoord) ShowCollectionsInternal(_a0 context.Context, _a1 *rootcoordpb.ShowCollectionsInternalRequest) (*rootcoordpb.ShowCollectionsInternalResponse, error) { + ret := _m.Called(_a0, _a1) + + if len(ret) == 0 { + panic("no return value specified for ShowCollectionsInternal") + } + + var r0 *rootcoordpb.ShowCollectionsInternalResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *rootcoordpb.ShowCollectionsInternalRequest) (*rootcoordpb.ShowCollectionsInternalResponse, error)); ok { + return rf(_a0, _a1) + } + if rf, ok := ret.Get(0).(func(context.Context, *rootcoordpb.ShowCollectionsInternalRequest) *rootcoordpb.ShowCollectionsInternalResponse); ok { + r0 = rf(_a0, _a1) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*rootcoordpb.ShowCollectionsInternalResponse) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *rootcoordpb.ShowCollectionsInternalRequest) error); ok { + r1 = rf(_a0, _a1) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// RootCoord_ShowCollectionsInternal_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ShowCollectionsInternal' +type RootCoord_ShowCollectionsInternal_Call struct { + *mock.Call +} + +// ShowCollectionsInternal is a helper method to define mock.On call +// - _a0 context.Context +// - _a1 *rootcoordpb.ShowCollectionsInternalRequest +func (_e *RootCoord_Expecter) ShowCollectionsInternal(_a0 interface{}, _a1 interface{}) *RootCoord_ShowCollectionsInternal_Call { + return &RootCoord_ShowCollectionsInternal_Call{Call: _e.mock.On("ShowCollectionsInternal", _a0, _a1)} +} + +func (_c *RootCoord_ShowCollectionsInternal_Call) Run(run func(_a0 context.Context, _a1 *rootcoordpb.ShowCollectionsInternalRequest)) *RootCoord_ShowCollectionsInternal_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*rootcoordpb.ShowCollectionsInternalRequest)) + }) + return _c +} + +func (_c *RootCoord_ShowCollectionsInternal_Call) Return(_a0 *rootcoordpb.ShowCollectionsInternalResponse, _a1 error) *RootCoord_ShowCollectionsInternal_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *RootCoord_ShowCollectionsInternal_Call) RunAndReturn(run func(context.Context, *rootcoordpb.ShowCollectionsInternalRequest) (*rootcoordpb.ShowCollectionsInternalResponse, error)) *RootCoord_ShowCollectionsInternal_Call { + _c.Call.Return(run) + return _c +} + // ShowConfigurations provides a mock function with given fields: _a0, _a1 func (_m *RootCoord) ShowConfigurations(_a0 context.Context, _a1 *internalpb.ShowConfigurationsRequest) (*internalpb.ShowConfigurationsResponse, error) { ret := _m.Called(_a0, _a1) diff --git a/internal/mocks/mock_rootcoord_client.go b/internal/mocks/mock_rootcoord_client.go index 3d3db95c6963d..70055211ee8db 100644 --- a/internal/mocks/mock_rootcoord_client.go +++ b/internal/mocks/mock_rootcoord_client.go @@ -3364,6 +3364,80 @@ func (_c *MockRootCoordClient_ShowCollections_Call) RunAndReturn(run func(contex return _c } +// ShowCollectionsInternal provides a mock function with given fields: ctx, in, opts +func (_m *MockRootCoordClient) ShowCollectionsInternal(ctx context.Context, in *rootcoordpb.ShowCollectionsInternalRequest, opts ...grpc.CallOption) (*rootcoordpb.ShowCollectionsInternalResponse, error) { + _va := make([]interface{}, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, in) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + if len(ret) == 0 { + panic("no return value specified for ShowCollectionsInternal") + } + + var r0 *rootcoordpb.ShowCollectionsInternalResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *rootcoordpb.ShowCollectionsInternalRequest, ...grpc.CallOption) (*rootcoordpb.ShowCollectionsInternalResponse, error)); ok { + return rf(ctx, in, opts...) + } + if rf, ok := ret.Get(0).(func(context.Context, *rootcoordpb.ShowCollectionsInternalRequest, ...grpc.CallOption) *rootcoordpb.ShowCollectionsInternalResponse); ok { + r0 = rf(ctx, in, opts...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*rootcoordpb.ShowCollectionsInternalResponse) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *rootcoordpb.ShowCollectionsInternalRequest, ...grpc.CallOption) error); ok { + r1 = rf(ctx, in, opts...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockRootCoordClient_ShowCollectionsInternal_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ShowCollectionsInternal' +type MockRootCoordClient_ShowCollectionsInternal_Call struct { + *mock.Call +} + +// ShowCollectionsInternal is a helper method to define mock.On call +// - ctx context.Context +// - in *rootcoordpb.ShowCollectionsInternalRequest +// - opts ...grpc.CallOption +func (_e *MockRootCoordClient_Expecter) ShowCollectionsInternal(ctx interface{}, in interface{}, opts ...interface{}) *MockRootCoordClient_ShowCollectionsInternal_Call { + return &MockRootCoordClient_ShowCollectionsInternal_Call{Call: _e.mock.On("ShowCollectionsInternal", + append([]interface{}{ctx, in}, opts...)...)} +} + +func (_c *MockRootCoordClient_ShowCollectionsInternal_Call) Run(run func(ctx context.Context, in *rootcoordpb.ShowCollectionsInternalRequest, opts ...grpc.CallOption)) *MockRootCoordClient_ShowCollectionsInternal_Call { + _c.Call.Run(func(args mock.Arguments) { + variadicArgs := make([]grpc.CallOption, len(args)-2) + for i, a := range args[2:] { + if a != nil { + variadicArgs[i] = a.(grpc.CallOption) + } + } + run(args[0].(context.Context), args[1].(*rootcoordpb.ShowCollectionsInternalRequest), variadicArgs...) + }) + return _c +} + +func (_c *MockRootCoordClient_ShowCollectionsInternal_Call) Return(_a0 *rootcoordpb.ShowCollectionsInternalResponse, _a1 error) *MockRootCoordClient_ShowCollectionsInternal_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockRootCoordClient_ShowCollectionsInternal_Call) RunAndReturn(run func(context.Context, *rootcoordpb.ShowCollectionsInternalRequest, ...grpc.CallOption) (*rootcoordpb.ShowCollectionsInternalResponse, error)) *MockRootCoordClient_ShowCollectionsInternal_Call { + _c.Call.Return(run) + return _c +} + // ShowConfigurations provides a mock function with given fields: ctx, in, opts func (_m *MockRootCoordClient) ShowConfigurations(ctx context.Context, in *internalpb.ShowConfigurationsRequest, opts ...grpc.CallOption) (*internalpb.ShowConfigurationsResponse, error) { _va := make([]interface{}, len(opts)) diff --git a/internal/proto/root_coord.proto b/internal/proto/root_coord.proto index bc1aadc6d6525..b7b3363fd1c92 100644 --- a/internal/proto/root_coord.proto +++ b/internal/proto/root_coord.proto @@ -63,6 +63,8 @@ service RootCoord { */ rpc ShowCollections(milvus.ShowCollectionsRequest) returns (milvus.ShowCollectionsResponse) {} + rpc ShowCollectionsInternal(ShowCollectionsInternalRequest) returns (ShowCollectionsInternalResponse) {} + rpc AlterCollection(milvus.AlterCollectionRequest) returns (common.Status) {} /** @@ -233,3 +235,18 @@ message AlterDatabaseRequest { string db_id = 3; repeated common.KeyValuePair properties = 4; } + +message ShowCollectionsInternalRequest { + common.MsgBase base = 1; + repeated string db_names = 2; +} + +message DBCollections { + string db_name = 1; + repeated int64 collectionIDs = 2; +} + +message ShowCollectionsInternalResponse { + common.Status status = 1; + repeated DBCollections db_collections = 2; +} diff --git a/internal/proxy/rootcoord_mock_test.go b/internal/proxy/rootcoord_mock_test.go index 1fb988cbf554c..3c160c9960e77 100644 --- a/internal/proxy/rootcoord_mock_test.go +++ b/internal/proxy/rootcoord_mock_test.go @@ -576,6 +576,10 @@ func (coord *RootCoordMock) ShowCollections(ctx context.Context, req *milvuspb.S }, nil } +func (coord *RootCoordMock) ShowCollectionsInternal(ctx context.Context, req *rootcoordpb.ShowCollectionsInternalRequest, opts ...grpc.CallOption) (*rootcoordpb.ShowCollectionsInternalResponse, error) { + panic("implements me") +} + func (coord *RootCoordMock) CreatePartition(ctx context.Context, req *milvuspb.CreatePartitionRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { code := coord.state.Load().(commonpb.StateCode) if code != commonpb.StateCode_Healthy { diff --git a/internal/querycoordv2/meta/collection_manager.go b/internal/querycoordv2/meta/collection_manager.go index 5df4286417d3b..1afd3271718b1 100644 --- a/internal/querycoordv2/meta/collection_manager.go +++ b/internal/querycoordv2/meta/collection_manager.go @@ -136,18 +136,24 @@ func NewCollectionManager(catalog metastore.QueryCoordCatalog) *CollectionManage // Recover recovers collections from kv store, // panics if failed func (m *CollectionManager) Recover(broker Broker) error { + start := time.Now() collections, err := m.catalog.GetCollections() if err != nil { return err } - partitions, err := m.catalog.GetPartitions() + log.Info("recover collections from kv store", zap.Duration("dur", time.Since(start))) + + start = time.Now() + partitions, err := m.catalog.GetPartitions(lo.Map(collections, func(collection *querypb.CollectionLoadInfo, _ int) int64 { + return collection.GetCollectionID() + })) if err != nil { return err } ctx := log.WithTraceID(context.Background(), strconv.FormatInt(time.Now().UnixNano(), 10)) ctxLog := log.Ctx(ctx) - ctxLog.Info("recover collections and partitions from kv store") + ctxLog.Info("recover partitions from kv store", zap.Duration("dur", time.Since(start))) for _, collection := range collections { if collection.GetReplicaNumber() <= 0 { diff --git a/internal/rootcoord/meta_table.go b/internal/rootcoord/meta_table.go index 2c13631ec7218..8616ce6dc9092 100644 --- a/internal/rootcoord/meta_table.go +++ b/internal/rootcoord/meta_table.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "sync" + "time" "github.com/cockroachdb/errors" "github.com/samber/lo" @@ -184,6 +185,9 @@ func (mt *MetaTable) reload() error { collectionNum := int64(0) mt.names.createDbIfNotExist(dbName) + + start := time.Now() + // TODO: async list collections to accelerate cases with multiple databases. collections, err := mt.catalog.ListCollections(mt.ctx, db.ID, typeutil.MaxTimestamp) if err != nil { return err @@ -203,7 +207,8 @@ func (mt *MetaTable) reload() error { metrics.RootCoordNumOfPartitions.WithLabelValues().Add(float64(partitionNum)) log.Info("collections recovered from db", zap.String("db_name", dbName), zap.Int64("collection_num", collectionNum), - zap.Int64("partition_num", partitionNum)) + zap.Int64("partition_num", partitionNum), + zap.Duration("dur", time.Since(start))) } // recover aliases from db namespace diff --git a/internal/rootcoord/meta_table_test.go b/internal/rootcoord/meta_table_test.go index 4192b655c03be..3859b41a26709 100644 --- a/internal/rootcoord/meta_table_test.go +++ b/internal/rootcoord/meta_table_test.go @@ -43,7 +43,7 @@ import ( ) func generateMetaTable(t *testing.T) *MetaTable { - return &MetaTable{catalog: &rootcoord.Catalog{Txn: memkv.NewMemoryKV()}} + return &MetaTable{catalog: rootcoord.NewCatalog(memkv.NewMemoryKV(), nil)} } func TestRbacAddCredential(t *testing.T) { diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index 0cfd064577b92..07d05113dd358 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -350,7 +350,7 @@ func (c *Core) initMetaTable() error { if ss, err = kvmetestore.NewSuffixSnapshot(metaKV, kvmetestore.SnapshotsSep, Params.EtcdCfg.MetaRootPath.GetValue(), kvmetestore.SnapshotPrefix); err != nil { return err } - catalog = &kvmetestore.Catalog{Txn: metaKV, Snapshot: ss} + catalog = kvmetestore.NewCatalog(metaKV, ss) case util.MetaStoreTypeTiKV: log.Info("Using tikv as meta storage.") var metaKV kv.MetaKv @@ -364,7 +364,7 @@ func (c *Core) initMetaTable() error { if ss, err = kvmetestore.NewSuffixSnapshot(metaKV, kvmetestore.SnapshotsSep, Params.TiKVCfg.MetaRootPath.GetValue(), kvmetestore.SnapshotPrefix); err != nil { return err } - catalog = &kvmetestore.Catalog{Txn: metaKV, Snapshot: ss} + catalog = kvmetestore.NewCatalog(metaKV, ss) default: return retry.Unrecoverable(fmt.Errorf("not supported meta store: %s", Params.MetaStoreCfg.MetaStoreType.GetValue())) } @@ -1262,6 +1262,73 @@ func (c *Core) ShowCollections(ctx context.Context, in *milvuspb.ShowCollections return t.Rsp, nil } +// ShowCollectionsInternal returns all collections, including unhealthy ones. +func (c *Core) ShowCollectionsInternal(ctx context.Context, in *rootcoordpb.ShowCollectionsInternalRequest) (*rootcoordpb.ShowCollectionsInternalResponse, error) { + if err := merr.CheckHealthy(c.GetStateCode()); err != nil { + return &rootcoordpb.ShowCollectionsInternalResponse{ + Status: merr.Status(err), + }, nil + } + + metrics.RootCoordDDLReqCounter.WithLabelValues("ShowCollectionsInternal", metrics.TotalLabel).Inc() + tr := timerecord.NewTimeRecorder("ShowCollectionsInternal") + + ts := typeutil.MaxTimestamp + log := log.Ctx(ctx).With(zap.Strings("dbNames", in.GetDbNames())) + + // Currently, this interface is only called during startup, so there is no need to execute it within the scheduler. + var err error + var dbs []*model.Database + if len(in.GetDbNames()) == 0 { + // show all collections + dbs, err = c.meta.ListDatabases(ctx, ts) + if err != nil { + log.Info("failed to ListDatabases", zap.Error(err)) + metrics.RootCoordDDLReqCounter.WithLabelValues("ShowCollectionsInternal", metrics.FailLabel).Inc() + return &rootcoordpb.ShowCollectionsInternalResponse{ + Status: merr.Status(err), + }, nil + } + } else { + dbs = make([]*model.Database, 0, len(in.GetDbNames())) + for _, name := range in.GetDbNames() { + db, err := c.meta.GetDatabaseByName(ctx, name, ts) + if err != nil { + log.Info("failed to GetDatabaseByName", zap.Error(err)) + metrics.RootCoordDDLReqCounter.WithLabelValues("ShowCollectionsInternal", metrics.FailLabel).Inc() + return &rootcoordpb.ShowCollectionsInternalResponse{ + Status: merr.Status(err), + }, nil + } + dbs = append(dbs, db) + } + } + dbCollections := make([]*rootcoordpb.DBCollections, 0, len(dbs)) + for _, db := range dbs { + collections, err := c.meta.ListCollections(ctx, db.Name, ts, false) + if err != nil { + log.Info("failed to ListCollections", zap.Error(err)) + metrics.RootCoordDDLReqCounter.WithLabelValues("ShowCollectionsInternal", metrics.FailLabel).Inc() + return &rootcoordpb.ShowCollectionsInternalResponse{ + Status: merr.Status(err), + }, nil + } + dbCollections = append(dbCollections, &rootcoordpb.DBCollections{ + DbName: db.Name, + CollectionIDs: lo.Map(collections, func(col *model.Collection, _ int) int64 { + return col.CollectionID + }), + }) + } + metrics.RootCoordDDLReqCounter.WithLabelValues("ShowCollectionsInternal", metrics.SuccessLabel).Inc() + metrics.RootCoordDDLReqLatency.WithLabelValues("ShowCollectionsInternal").Observe(float64(tr.ElapseSpan().Milliseconds())) + + return &rootcoordpb.ShowCollectionsInternalResponse{ + Status: merr.Success(), + DbCollections: dbCollections, + }, nil +} + func (c *Core) AlterCollection(ctx context.Context, in *milvuspb.AlterCollectionRequest) (*commonpb.Status, error) { if err := merr.CheckHealthy(c.GetStateCode()); err != nil { return merr.Status(err), nil diff --git a/internal/rootcoord/root_coord_test.go b/internal/rootcoord/root_coord_test.go index bdf93e15a9a41..2024bd205ee60 100644 --- a/internal/rootcoord/root_coord_test.go +++ b/internal/rootcoord/root_coord_test.go @@ -730,6 +730,72 @@ func TestRootCoord_ShowCollections(t *testing.T) { }) } +func TestRootCoord_ShowCollectionsInternal(t *testing.T) { + t.Run("not healthy", func(t *testing.T) { + c := newTestCore(withAbnormalCode()) + ctx := context.Background() + resp, err := c.ShowCollectionsInternal(ctx, &rootcoordpb.ShowCollectionsInternalRequest{}) + assert.NoError(t, err) + assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) + }) + + t.Run("test failed", func(t *testing.T) { + c := newTestCore(withHealthyCode()) + meta := mockrootcoord.NewIMetaTable(t) + c.meta = meta + + ctx := context.Background() + + // specify db names + meta.EXPECT().GetDatabaseByName(mock.Anything, mock.Anything, typeutil.MaxTimestamp).Return(nil, fmt.Errorf("mock err")) + resp, err := c.ShowCollectionsInternal(ctx, &rootcoordpb.ShowCollectionsInternalRequest{ + DbNames: []string{"db1"}, + }) + assert.NoError(t, err) + assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) + + // not specify db names + meta.EXPECT().ListDatabases(mock.Anything, mock.Anything).Return(nil, fmt.Errorf("mock err")) + resp, err = c.ShowCollectionsInternal(ctx, &rootcoordpb.ShowCollectionsInternalRequest{}) + assert.NoError(t, err) + assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) + + // list collections failed + meta.ExpectedCalls = nil + meta.EXPECT().ListDatabases(mock.Anything, mock.Anything).Return( + []*model.Database{model.NewDatabase(rand.Int63(), "db1", etcdpb.DatabaseState_DatabaseCreated, nil)}, nil) + meta.EXPECT().ListCollections(mock.Anything, mock.Anything, typeutil.MaxTimestamp, false).Return(nil, fmt.Errorf("mock err")) + resp, err = c.ShowCollectionsInternal(ctx, &rootcoordpb.ShowCollectionsInternalRequest{}) + assert.NoError(t, err) + assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) + }) + + t.Run("normal case, everything is ok", func(t *testing.T) { + c := newTestCore(withHealthyCode()) + meta := mockrootcoord.NewIMetaTable(t) + meta.EXPECT().ListCollections(mock.Anything, mock.Anything, typeutil.MaxTimestamp, false).Return([]*model.Collection{}, nil) + c.meta = meta + + ctx := context.Background() + + // specify db names + meta.EXPECT().GetDatabaseByName(mock.Anything, mock.Anything, typeutil.MaxTimestamp).Return( + model.NewDatabase(rand.Int63(), "db1", etcdpb.DatabaseState_DatabaseCreated, nil), nil) + resp, err := c.ShowCollectionsInternal(ctx, &rootcoordpb.ShowCollectionsInternalRequest{ + DbNames: []string{"db1"}, + }) + assert.NoError(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) + + // not specify db names + meta.EXPECT().ListDatabases(mock.Anything, mock.Anything).Return( + []*model.Database{model.NewDatabase(rand.Int63(), "db1", etcdpb.DatabaseState_DatabaseCreated, nil)}, nil) + resp, err = c.ShowCollectionsInternal(ctx, &rootcoordpb.ShowCollectionsInternalRequest{}) + assert.NoError(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) + }) +} + func TestRootCoord_HasPartition(t *testing.T) { t.Run("not healthy", func(t *testing.T) { c := newTestCore(withAbnormalCode()) diff --git a/internal/util/mock/grpc_rootcoord_client.go b/internal/util/mock/grpc_rootcoord_client.go index 0c5d431af4bd8..2ab8941f97e8a 100644 --- a/internal/util/mock/grpc_rootcoord_client.go +++ b/internal/util/mock/grpc_rootcoord_client.go @@ -158,6 +158,10 @@ func (m *GrpcRootCoordClient) ShowCollections(ctx context.Context, in *milvuspb. return &milvuspb.ShowCollectionsResponse{}, m.Err } +func (m *GrpcRootCoordClient) ShowCollectionsInternal(ctx context.Context, in *rootcoordpb.ShowCollectionsInternalRequest, opts ...grpc.CallOption) (*rootcoordpb.ShowCollectionsInternalResponse, error) { + return &rootcoordpb.ShowCollectionsInternalResponse{}, m.Err +} + func (m *GrpcRootCoordClient) CreatePartition(ctx context.Context, in *milvuspb.CreatePartitionRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { return &commonpb.Status{}, m.Err } diff --git a/pkg/util/paramtable/service_param.go b/pkg/util/paramtable/service_param.go index 57b0cfb15ca5f..8467cb1e825b3 100644 --- a/pkg/util/paramtable/service_param.go +++ b/pkg/util/paramtable/service_param.go @@ -459,6 +459,7 @@ type MetaStoreConfig struct { SnapshotTTLSeconds ParamItem `refreshable:"true"` SnapshotReserveTimeSeconds ParamItem `refreshable:"true"` PaginationSize ParamItem `refreshable:"true"` + ReadConcurrency ParamItem `refreshable:"true"` } func (p *MetaStoreConfig) Init(base *BaseTable) { @@ -492,11 +493,19 @@ func (p *MetaStoreConfig) Init(base *BaseTable) { p.PaginationSize = ParamItem{ Key: "metastore.paginationSize", Version: "2.5.1", - DefaultValue: "10000", + DefaultValue: "100000", Doc: `limits the number of results to return from metastore.`, } p.PaginationSize.Init(base.mgr) + p.ReadConcurrency = ParamItem{ + Key: "metastore.readConcurrency", + Version: "2.5.1", + DefaultValue: "32", + Doc: `read concurrency for fetching metadata from the metastore.`, + } + p.ReadConcurrency.Init(base.mgr) + // TODO: The initialization operation of metadata storage is called in the initialization phase of every node. // There should be a single initialization operation for meta store, then move the metrics registration to there. metrics.RegisterMetaType(p.MetaStoreType.GetValue()) diff --git a/pkg/util/paramtable/service_param_test.go b/pkg/util/paramtable/service_param_test.go index 889a0ab048ded..38c2b9d15ac52 100644 --- a/pkg/util/paramtable/service_param_test.go +++ b/pkg/util/paramtable/service_param_test.go @@ -211,6 +211,7 @@ func TestServiceParam(t *testing.T) { assert.Equal(t, util.MetaStoreTypeEtcd, Params.MetaStoreType.GetValue()) assert.Equal(t, 86400*time.Second, Params.SnapshotTTLSeconds.GetAsDuration(time.Second)) assert.Equal(t, 3600*time.Second, Params.SnapshotReserveTimeSeconds.GetAsDuration(time.Second)) - assert.Equal(t, 10000, Params.PaginationSize.GetAsInt()) + assert.Equal(t, 100000, Params.PaginationSize.GetAsInt()) + assert.Equal(t, 32, Params.ReadConcurrency.GetAsInt()) }) }