From 3cb482ad4818ecf310c576ae1f1b4f398116034d Mon Sep 17 00:00:00 2001 From: "yihao.dai" Date: Tue, 3 Dec 2024 16:28:39 +0800 Subject: [PATCH] fix: Fix timeout when listing meta Signed-off-by: bigsheeper --- internal/metastore/kv/datacoord/kv_catalog.go | 150 ++++++++++-------- .../metastore/kv/datacoord/kv_catalog_test.go | 58 ++++--- .../metastore/kv/querycoord/kv_catalog.go | 72 +++++---- .../kv/querycoord/kv_catalog_test.go | 2 +- 4 files changed, 161 insertions(+), 121 deletions(-) diff --git a/internal/metastore/kv/datacoord/kv_catalog.go b/internal/metastore/kv/datacoord/kv_catalog.go index 15a3bfde4096f..a7af54e0d2abc 100644 --- a/internal/metastore/kv/datacoord/kv_catalog.go +++ b/internal/metastore/kv/datacoord/kv_catalog.go @@ -451,23 +451,23 @@ func (kc *Catalog) DropChannel(ctx context.Context, channel string) error { } func (kc *Catalog) ListChannelCheckpoint(ctx context.Context) (map[string]*msgpb.MsgPosition, error) { - keys, values, err := kc.MetaKv.LoadWithPrefix(ChannelCheckpointPrefix) - if err != nil { - return nil, err - } - channelCPs := make(map[string]*msgpb.MsgPosition) - for i, key := range keys { - value := values[i] + applyFn := func(key []byte, value []byte) error { channelCP := &msgpb.MsgPosition{} - err = proto.Unmarshal([]byte(value), channelCP) + err := proto.Unmarshal(value, channelCP) if err != nil { log.Error("unmarshal channelCP failed when ListChannelCheckpoint", zap.Error(err)) - return nil, err + return err } - ss := strings.Split(key, "/") + ss := strings.Split(string(key), "/") vChannel := ss[len(ss)-1] channelCPs[vChannel] = channelCP + return nil + } + + err := kc.MetaKv.WalkWithPrefix(ChannelCheckpointPrefix, paginationSize, applyFn) + if err != nil { + return nil, err } return channelCPs, nil @@ -537,24 +537,23 @@ func (kc *Catalog) CreateIndex(ctx context.Context, index *model.Index) error { } func (kc *Catalog) ListIndexes(ctx context.Context) ([]*model.Index, error) { - _, values, err := kc.MetaKv.LoadWithPrefix(util.FieldIndexPrefix) - if err != nil { - log.Error("list index meta fail", zap.String("prefix", util.FieldIndexPrefix), zap.Error(err)) - return nil, err - } - indexes := make([]*model.Index, 0) - for _, value := range values { + applyFn := func(key []byte, value []byte) error { meta := &indexpb.FieldIndex{} - err = proto.Unmarshal([]byte(value), meta) + err := proto.Unmarshal(value, meta) if err != nil { log.Warn("unmarshal index info failed", zap.Error(err)) - return nil, err + return err } indexes = append(indexes, model.UnmarshalIndexModel(meta)) + return nil } + err := kc.MetaKv.WalkWithPrefix(util.FieldIndexPrefix, paginationSize, applyFn) + if err != nil { + return nil, err + } return indexes, nil } @@ -614,22 +613,22 @@ func (kc *Catalog) CreateSegmentIndex(ctx context.Context, segIdx *model.Segment } func (kc *Catalog) ListSegmentIndexes(ctx context.Context) ([]*model.SegmentIndex, error) { - _, values, err := kc.MetaKv.LoadWithPrefix(util.SegmentIndexPrefix) - if err != nil { - log.Error("list segment index meta fail", zap.String("prefix", util.SegmentIndexPrefix), zap.Error(err)) - return nil, err - } - segIndexes := make([]*model.SegmentIndex, 0) - for _, value := range values { + applyFn := func(key []byte, value []byte) error { segmentIndexInfo := &indexpb.SegmentIndex{} - err = proto.Unmarshal([]byte(value), segmentIndexInfo) + err := proto.Unmarshal(value, segmentIndexInfo) if err != nil { log.Warn("unmarshal segment index info failed", zap.Error(err)) - return segIndexes, err + return err } segIndexes = append(segIndexes, model.UnmarshalSegmentIndexModel(segmentIndexInfo)) + return nil + } + + err := kc.MetaKv.WalkWithPrefix(util.SegmentIndexPrefix, paginationSize, applyFn) + if err != nil { + return nil, err } return segIndexes, nil @@ -671,17 +670,19 @@ func (kc *Catalog) SaveImportJob(job *datapb.ImportJob) error { func (kc *Catalog) ListImportJobs() ([]*datapb.ImportJob, error) { jobs := make([]*datapb.ImportJob, 0) - _, values, err := kc.MetaKv.LoadWithPrefix(ImportJobPrefix) - if err != nil { - return nil, err - } - for _, value := range values { + applyFn := func(key []byte, value []byte) error { job := &datapb.ImportJob{} - err = proto.Unmarshal([]byte(value), job) + err := proto.Unmarshal(value, job) if err != nil { - return nil, err + return err } jobs = append(jobs, job) + return nil + } + + err := kc.MetaKv.WalkWithPrefix(ImportJobPrefix, paginationSize, applyFn) + if err != nil { + return nil, err } return jobs, nil } @@ -703,19 +704,20 @@ func (kc *Catalog) SavePreImportTask(task *datapb.PreImportTask) error { func (kc *Catalog) ListPreImportTasks() ([]*datapb.PreImportTask, error) { tasks := make([]*datapb.PreImportTask, 0) - _, values, err := kc.MetaKv.LoadWithPrefix(PreImportTaskPrefix) - if err != nil { - return nil, err - } - for _, value := range values { + applyFn := func(key []byte, value []byte) error { task := &datapb.PreImportTask{} - err = proto.Unmarshal([]byte(value), task) + err := proto.Unmarshal(value, task) if err != nil { - return nil, err + return err } tasks = append(tasks, task) + return nil } + err := kc.MetaKv.WalkWithPrefix(PreImportTaskPrefix, paginationSize, applyFn) + if err != nil { + return nil, err + } return tasks, nil } @@ -736,17 +738,19 @@ func (kc *Catalog) SaveImportTask(task *datapb.ImportTaskV2) error { func (kc *Catalog) ListImportTasks() ([]*datapb.ImportTaskV2, error) { tasks := make([]*datapb.ImportTaskV2, 0) - _, values, err := kc.MetaKv.LoadWithPrefix(ImportTaskPrefix) - if err != nil { - return nil, err - } - for _, value := range values { + applyFn := func(key []byte, value []byte) error { task := &datapb.ImportTaskV2{} - err = proto.Unmarshal([]byte(value), task) + err := proto.Unmarshal(value, task) if err != nil { - return nil, err + return err } tasks = append(tasks, task) + return nil + } + + err := kc.MetaKv.WalkWithPrefix(ImportTaskPrefix, paginationSize, applyFn) + if err != nil { + return nil, err } return tasks, nil } @@ -774,17 +778,19 @@ func (kc *Catalog) GcConfirm(ctx context.Context, collectionID, partitionID type func (kc *Catalog) ListCompactionTask(ctx context.Context) ([]*datapb.CompactionTask, error) { tasks := make([]*datapb.CompactionTask, 0) - _, values, err := kc.MetaKv.LoadWithPrefix(CompactionTaskPrefix) - if err != nil { - return nil, err - } - for _, value := range values { + applyFn := func(key []byte, value []byte) error { info := &datapb.CompactionTask{} - err = proto.Unmarshal([]byte(value), info) + err := proto.Unmarshal(value, info) if err != nil { - return nil, err + return err } tasks = append(tasks, info) + return nil + } + + err := kc.MetaKv.WalkWithPrefix(CompactionTaskPrefix, paginationSize, applyFn) + if err != nil { + return nil, err } return tasks, nil } @@ -811,17 +817,19 @@ func (kc *Catalog) DropCompactionTask(ctx context.Context, task *datapb.Compacti func (kc *Catalog) ListAnalyzeTasks(ctx context.Context) ([]*indexpb.AnalyzeTask, error) { tasks := make([]*indexpb.AnalyzeTask, 0) - _, values, err := kc.MetaKv.LoadWithPrefix(AnalyzeTaskPrefix) - if err != nil { - return nil, err - } - for _, value := range values { + applyFn := func(key []byte, value []byte) error { task := &indexpb.AnalyzeTask{} - err = proto.Unmarshal([]byte(value), task) + err := proto.Unmarshal(value, task) if err != nil { - return nil, err + return err } tasks = append(tasks, task) + return nil + } + + err := kc.MetaKv.WalkWithPrefix(AnalyzeTaskPrefix, paginationSize, applyFn) + if err != nil { + return nil, err } return tasks, nil } @@ -849,17 +857,19 @@ func (kc *Catalog) DropAnalyzeTask(ctx context.Context, taskID typeutil.UniqueID func (kc *Catalog) ListPartitionStatsInfos(ctx context.Context) ([]*datapb.PartitionStatsInfo, error) { infos := make([]*datapb.PartitionStatsInfo, 0) - _, values, err := kc.MetaKv.LoadWithPrefix(PartitionStatsInfoPrefix) - if err != nil { - return nil, err - } - for _, value := range values { + applyFn := func(key []byte, value []byte) error { info := &datapb.PartitionStatsInfo{} - err = proto.Unmarshal([]byte(value), info) + err := proto.Unmarshal(value, info) if err != nil { - return nil, err + return err } infos = append(infos, info) + return nil + } + + err := kc.MetaKv.WalkWithPrefix(PartitionStatsInfoPrefix, paginationSize, applyFn) + if err != nil { + return nil, err } return infos, nil } diff --git a/internal/metastore/kv/datacoord/kv_catalog_test.go b/internal/metastore/kv/datacoord/kv_catalog_test.go index 3c2e76999e381..7076e6f1ebb81 100644 --- a/internal/metastore/kv/datacoord/kv_catalog_test.go +++ b/internal/metastore/kv/datacoord/kv_catalog_test.go @@ -638,7 +638,9 @@ func TestChannelCP(t *testing.T) { err := catalog.SaveChannelCheckpoint(context.TODO(), mockVChannel, pos) assert.NoError(t, err) - txn.EXPECT().LoadWithPrefix(mock.Anything).Return([]string{k}, []string{string(v)}, nil) + txn.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(_ string, _ int, f func([]byte, []byte) error) error { + return f([]byte(k), v) + }) res, err := catalog.ListChannelCheckpoint(context.TODO()) assert.NoError(t, err) assert.True(t, len(res) > 0) @@ -647,7 +649,7 @@ func TestChannelCP(t *testing.T) { t.Run("ListChannelCheckpoint failed", func(t *testing.T) { txn := mocks.NewMetaKv(t) catalog := NewCatalog(txn, rootPath, "") - txn.EXPECT().LoadWithPrefix(mock.Anything).Return(nil, nil, errors.New("mock error")) + txn.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).Return(errors.New("mock error")) _, err = catalog.ListChannelCheckpoint(context.TODO()) assert.Error(t, err) }) @@ -692,7 +694,7 @@ func TestChannelCP(t *testing.T) { assert.NoError(t, err) txn.EXPECT().Remove(mock.Anything).Return(nil) - txn.EXPECT().LoadWithPrefix(mock.Anything).Return(nil, nil, nil) + txn.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).Return(nil) err = catalog.DropChannelCheckpoint(context.TODO(), mockVChannel) assert.NoError(t, err) res, err := catalog.ListChannelCheckpoint(context.TODO()) @@ -879,7 +881,7 @@ func TestCatalog_CreateIndex(t *testing.T) { func TestCatalog_ListIndexes(t *testing.T) { t.Run("success", func(t *testing.T) { metakv := mocks.NewMetaKv(t) - metakv.EXPECT().LoadWithPrefix(mock.Anything).RunAndReturn(func(s string) ([]string, []string, error) { + metakv.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(_ string, _ int, f func([]byte, []byte) error) error { i := &indexpb.FieldIndex{ IndexInfo: &indexpb.IndexInfo{ CollectionID: 0, @@ -894,7 +896,7 @@ func TestCatalog_ListIndexes(t *testing.T) { } v, err := proto.Marshal(i) assert.NoError(t, err) - return []string{"1"}, []string{string(v)}, nil + return f([]byte("1"), v) }) catalog := &Catalog{ @@ -907,7 +909,7 @@ func TestCatalog_ListIndexes(t *testing.T) { t.Run("failed", func(t *testing.T) { txn := mocks.NewMetaKv(t) - txn.EXPECT().LoadWithPrefix(mock.Anything).Return(nil, nil, errors.New("error")) + txn.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).Return(errors.New("error")) catalog := &Catalog{ MetaKv: txn, } @@ -917,7 +919,9 @@ func TestCatalog_ListIndexes(t *testing.T) { t.Run("unmarshal failed", func(t *testing.T) { txn := mocks.NewMetaKv(t) - txn.EXPECT().LoadWithPrefix(mock.Anything).Return([]string{"1"}, []string{"invalid"}, nil) + txn.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(_ string, _ int, f func([]byte, []byte) error) error { + return f([]byte("1"), []byte("invalid")) + }) catalog := &Catalog{ MetaKv: txn, @@ -1071,7 +1075,9 @@ func TestCatalog_ListSegmentIndexes(t *testing.T) { assert.NoError(t, err) metakv := mocks.NewMetaKv(t) - metakv.EXPECT().LoadWithPrefix(mock.Anything).Return([]string{"key"}, []string{string(v)}, nil) + metakv.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(_ string, _ int, f func([]byte, []byte) error) error { + return f([]byte("key"), v) + }) catalog := &Catalog{ MetaKv: metakv, } @@ -1083,7 +1089,7 @@ func TestCatalog_ListSegmentIndexes(t *testing.T) { t.Run("failed", func(t *testing.T) { metakv := mocks.NewMetaKv(t) - metakv.EXPECT().LoadWithPrefix(mock.Anything).Return([]string{}, []string{}, errors.New("error")) + metakv.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).Return(errors.New("error")) catalog := &Catalog{ MetaKv: metakv, } @@ -1094,7 +1100,9 @@ func TestCatalog_ListSegmentIndexes(t *testing.T) { t.Run("unmarshal failed", func(t *testing.T) { metakv := mocks.NewMetaKv(t) - metakv.EXPECT().LoadWithPrefix(mock.Anything).Return([]string{"key"}, []string{"invalid"}, nil) + metakv.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(_ string, _ int, f func([]byte, []byte) error) error { + return f([]byte("key"), []byte("invalid")) + }) catalog := &Catalog{ MetaKv: metakv, } @@ -1377,20 +1385,24 @@ func TestCatalog_Import(t *testing.T) { txn := mocks.NewMetaKv(t) value, err := proto.Marshal(job) assert.NoError(t, err) - txn.EXPECT().LoadWithPrefix(mock.Anything).Return(nil, []string{string(value)}, nil) + txn.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(_ string, _ int, f func([]byte, []byte) error) error { + return f(nil, value) + }) kc.MetaKv = txn jobs, err := kc.ListImportJobs() assert.NoError(t, err) assert.Equal(t, 1, len(jobs)) txn = mocks.NewMetaKv(t) - txn.EXPECT().LoadWithPrefix(mock.Anything).Return(nil, []string{"@#%#^#"}, nil) + txn.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(_ string, _ int, f func([]byte, []byte) error) error { + return f(nil, []byte("@#%#^#")) + }) kc.MetaKv = txn _, err = kc.ListImportJobs() assert.Error(t, err) txn = mocks.NewMetaKv(t) - txn.EXPECT().LoadWithPrefix(mock.Anything).Return(nil, nil, mockErr) + txn.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).Return(mockErr) kc.MetaKv = txn _, err = kc.ListImportJobs() assert.Error(t, err) @@ -1431,20 +1443,24 @@ func TestCatalog_Import(t *testing.T) { txn := mocks.NewMetaKv(t) value, err := proto.Marshal(pit) assert.NoError(t, err) - txn.EXPECT().LoadWithPrefix(mock.Anything).Return(nil, []string{string(value)}, nil) + txn.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(_ string, _ int, f func([]byte, []byte) error) error { + return f(nil, value) + }) kc.MetaKv = txn tasks, err := kc.ListPreImportTasks() assert.NoError(t, err) assert.Equal(t, 1, len(tasks)) txn = mocks.NewMetaKv(t) - txn.EXPECT().LoadWithPrefix(mock.Anything).Return(nil, []string{"@#%#^#"}, nil) + txn.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(_ string, _ int, f func([]byte, []byte) error) error { + return f(nil, []byte("@#%#^#")) + }) kc.MetaKv = txn _, err = kc.ListPreImportTasks() assert.Error(t, err) txn = mocks.NewMetaKv(t) - txn.EXPECT().LoadWithPrefix(mock.Anything).Return(nil, nil, mockErr) + txn.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).Return(mockErr) kc.MetaKv = txn _, err = kc.ListPreImportTasks() assert.Error(t, err) @@ -1485,20 +1501,24 @@ func TestCatalog_Import(t *testing.T) { txn := mocks.NewMetaKv(t) value, err := proto.Marshal(it) assert.NoError(t, err) - txn.EXPECT().LoadWithPrefix(mock.Anything).Return(nil, []string{string(value)}, nil) + txn.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(_ string, _ int, f func([]byte, []byte) error) error { + return f(nil, value) + }) kc.MetaKv = txn tasks, err := kc.ListImportTasks() assert.NoError(t, err) assert.Equal(t, 1, len(tasks)) txn = mocks.NewMetaKv(t) - txn.EXPECT().LoadWithPrefix(mock.Anything).Return(nil, []string{"@#%#^#"}, nil) + txn.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(_ string, _ int, f func([]byte, []byte) error) error { + return f(nil, []byte("@#%#^#")) + }) kc.MetaKv = txn _, err = kc.ListImportTasks() assert.Error(t, err) txn = mocks.NewMetaKv(t) - txn.EXPECT().LoadWithPrefix(mock.Anything).Return(nil, nil, mockErr) + txn.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).Return(mockErr) kc.MetaKv = txn _, err = kc.ListImportTasks() assert.Error(t, err) diff --git a/internal/metastore/kv/querycoord/kv_catalog.go b/internal/metastore/kv/querycoord/kv_catalog.go index ce546d52340c4..e070cb2acbb0b 100644 --- a/internal/metastore/kv/querycoord/kv_catalog.go +++ b/internal/metastore/kv/querycoord/kv_catalog.go @@ -18,6 +18,8 @@ import ( "github.com/milvus-io/milvus/pkg/util/compressor" ) +var paginationSize = 2000 + var ErrInvalidKey = errors.New("invalid load info key") const ( @@ -104,51 +106,57 @@ func (s Catalog) RemoveResourceGroup(rgName string) error { } func (s Catalog) GetCollections() ([]*querypb.CollectionLoadInfo, error) { - _, values, err := s.cli.LoadWithPrefix(CollectionLoadInfoPrefix) - if err != nil { - return nil, err - } - ret := make([]*querypb.CollectionLoadInfo, 0, len(values)) - for _, v := range values { + ret := make([]*querypb.CollectionLoadInfo, 0) + applyFn := func(key []byte, value []byte) error { info := querypb.CollectionLoadInfo{} - if err := proto.Unmarshal([]byte(v), &info); err != nil { - return nil, err + if err := proto.Unmarshal(value, &info); err != nil { + return err } ret = append(ret, &info) + return nil + } + + err := s.cli.WalkWithPrefix(CollectionLoadInfoPrefix, paginationSize, applyFn) + if err != nil { + return nil, err } return ret, nil } func (s Catalog) GetPartitions() (map[int64][]*querypb.PartitionLoadInfo, error) { - _, values, err := s.cli.LoadWithPrefix(PartitionLoadInfoPrefix) - if err != nil { - return nil, err - } ret := make(map[int64][]*querypb.PartitionLoadInfo) - for _, v := range values { + applyFn := func(key []byte, value []byte) error { info := querypb.PartitionLoadInfo{} - if err := proto.Unmarshal([]byte(v), &info); err != nil { - return nil, err + if err := proto.Unmarshal(value, &info); err != nil { + return err } ret[info.GetCollectionID()] = append(ret[info.GetCollectionID()], &info) + return nil + } + + err := s.cli.WalkWithPrefix(PartitionLoadInfoPrefix, paginationSize, applyFn) + if err != nil { + return nil, err } return ret, nil } func (s Catalog) GetReplicas() ([]*querypb.Replica, error) { - _, values, err := s.cli.LoadWithPrefix(ReplicaPrefix) - if err != nil { - return nil, err - } - ret := make([]*querypb.Replica, 0, len(values)) - for _, v := range values { + ret := make([]*querypb.Replica, 0) + applyFn := func(key []byte, value []byte) error { info := querypb.Replica{} - if err := proto.Unmarshal([]byte(v), &info); err != nil { - return nil, err + if err := proto.Unmarshal(value, &info); err != nil { + return err } ret = append(ret, &info) + return nil + } + + err := s.cli.WalkWithPrefix(ReplicaPrefix, paginationSize, applyFn) + if err != nil { + return nil, err } replicasV1, err := s.getReplicasFromV1() @@ -289,21 +297,23 @@ func (s Catalog) RemoveCollectionTarget(collectionID int64) error { } func (s Catalog) GetCollectionTargets() (map[int64]*querypb.CollectionTarget, error) { - keys, values, err := s.cli.LoadWithPrefix(CollectionTargetPrefix) - if err != nil { - return nil, err - } ret := make(map[int64]*querypb.CollectionTarget) - for i, v := range values { + applyFn := func(key []byte, value []byte) error { var decompressed bytes.Buffer - compressor.ZstdDecompress(bytes.NewReader([]byte(v)), io.Writer(&decompressed)) + compressor.ZstdDecompress(bytes.NewReader(value), io.Writer(&decompressed)) target := &querypb.CollectionTarget{} if err := proto.Unmarshal(decompressed.Bytes(), target); err != nil { // recover target from meta is a optimize policy, skip when failure happens - log.Warn("failed to unmarshal collection target", zap.String("key", keys[i]), zap.Error(err)) - continue + log.Warn("failed to unmarshal collection target", zap.String("key", string(key)), zap.Error(err)) + return nil } ret[target.GetCollectionID()] = target + return nil + } + + err := s.cli.WalkWithPrefix(CollectionTargetPrefix, paginationSize, applyFn) + if err != nil { + return nil, err } return ret, nil diff --git a/internal/metastore/kv/querycoord/kv_catalog_test.go b/internal/metastore/kv/querycoord/kv_catalog_test.go index 6dbdadfb1f004..078c943a91afa 100644 --- a/internal/metastore/kv/querycoord/kv_catalog_test.go +++ b/internal/metastore/kv/querycoord/kv_catalog_test.go @@ -231,7 +231,7 @@ func (suite *CatalogTestSuite) TestCollectionTarget() { mockStore := mocks.NewMetaKv(suite.T()) mockErr := errors.New("failed to access etcd") mockStore.EXPECT().MultiSave(mock.Anything).Return(mockErr) - mockStore.EXPECT().LoadWithPrefix(mock.Anything).Return(nil, nil, mockErr) + mockStore.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).Return(mockErr) suite.catalog.cli = mockStore err = suite.catalog.SaveCollectionTargets(&querypb.CollectionTarget{})