From a514f839b242fca8dd92f1721b84d1ec18eeabc2 Mon Sep 17 00:00:00 2001 From: "yihao.dai" Date: Thu, 12 Dec 2024 10:58:43 +0800 Subject: [PATCH] enhance: Speed up meta recovery (#38285) Increase the batchSize in WalkWithPrefix operations to 10000. issue: https://github.com/milvus-io/milvus/issues/37630 --------- Signed-off-by: bigsheeper --- internal/metastore/kv/datacoord/kv_catalog.go | 38 +++++++++++-------- .../metastore/kv/querycoord/kv_catalog.go | 17 +++++---- .../metastore/kv/rootcoord/suffix_snapshot.go | 16 ++++---- pkg/util/paramtable/service_param.go | 9 +++++ pkg/util/paramtable/service_param_test.go | 10 +++++ 5 files changed, 58 insertions(+), 32 deletions(-) diff --git a/internal/metastore/kv/datacoord/kv_catalog.go b/internal/metastore/kv/datacoord/kv_catalog.go index 7bea3819433ca..989fb3e2e0d04 100644 --- a/internal/metastore/kv/datacoord/kv_catalog.go +++ b/internal/metastore/kv/datacoord/kv_catalog.go @@ -44,19 +44,25 @@ import ( "github.com/milvus-io/milvus/pkg/util" "github.com/milvus-io/milvus/pkg/util/etcd" "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" ) -var paginationSize = 2000 - type Catalog struct { - MetaKv kv.MetaKv + MetaKv kv.MetaKv + + paginationSize int ChunkManagerRootPath string metaRootpath string } func NewCatalog(MetaKv kv.MetaKv, chunkManagerRootPath string, metaRootpath string) *Catalog { - return &Catalog{MetaKv: MetaKv, ChunkManagerRootPath: chunkManagerRootPath, metaRootpath: metaRootpath} + return &Catalog{ + MetaKv: MetaKv, + paginationSize: paramtable.Get().MetaStoreCfg.PaginationSize.GetAsInt(), + ChunkManagerRootPath: chunkManagerRootPath, + metaRootpath: metaRootpath, + } } func (kc *Catalog) ListSegments(ctx context.Context) ([]*datapb.SegmentInfo, error) { @@ -130,7 +136,7 @@ func (kc *Catalog) listSegments(ctx context.Context) ([]*datapb.SegmentInfo, err return nil } - err := kc.MetaKv.WalkWithPrefix(ctx, SegmentPrefix+"/", paginationSize, applyFn) + err := kc.MetaKv.WalkWithPrefix(ctx, SegmentPrefix+"/", kc.paginationSize, applyFn) if err != nil { return nil, err } @@ -215,7 +221,7 @@ func (kc *Catalog) listBinlogs(ctx context.Context, binlogType storage.BinlogTyp return nil } - err = kc.MetaKv.WalkWithPrefix(ctx, logPathPrefix, paginationSize, applyFn) + err = kc.MetaKv.WalkWithPrefix(ctx, logPathPrefix, kc.paginationSize, applyFn) if err != nil { return nil, err } @@ -483,7 +489,7 @@ func (kc *Catalog) ListChannelCheckpoint(ctx context.Context) (map[string]*msgpb return nil } - err := kc.MetaKv.WalkWithPrefix(ctx, ChannelCheckpointPrefix, paginationSize, applyFn) + err := kc.MetaKv.WalkWithPrefix(ctx, ChannelCheckpointPrefix, kc.paginationSize, applyFn) if err != nil { return nil, err } @@ -568,7 +574,7 @@ func (kc *Catalog) ListIndexes(ctx context.Context) ([]*model.Index, error) { return nil } - err := kc.MetaKv.WalkWithPrefix(ctx, util.FieldIndexPrefix, paginationSize, applyFn) + err := kc.MetaKv.WalkWithPrefix(ctx, util.FieldIndexPrefix, kc.paginationSize, applyFn) if err != nil { return nil, err } @@ -644,7 +650,7 @@ func (kc *Catalog) ListSegmentIndexes(ctx context.Context) ([]*model.SegmentInde return nil } - err := kc.MetaKv.WalkWithPrefix(ctx, util.SegmentIndexPrefix, paginationSize, applyFn) + err := kc.MetaKv.WalkWithPrefix(ctx, util.SegmentIndexPrefix, kc.paginationSize, applyFn) if err != nil { return nil, err } @@ -698,7 +704,7 @@ func (kc *Catalog) ListImportJobs(ctx context.Context) ([]*datapb.ImportJob, err return nil } - err := kc.MetaKv.WalkWithPrefix(ctx, ImportJobPrefix, paginationSize, applyFn) + err := kc.MetaKv.WalkWithPrefix(ctx, ImportJobPrefix, kc.paginationSize, applyFn) if err != nil { return nil, err } @@ -732,7 +738,7 @@ func (kc *Catalog) ListPreImportTasks(ctx context.Context) ([]*datapb.PreImportT return nil } - err := kc.MetaKv.WalkWithPrefix(ctx, PreImportTaskPrefix, paginationSize, applyFn) + err := kc.MetaKv.WalkWithPrefix(ctx, PreImportTaskPrefix, kc.paginationSize, applyFn) if err != nil { return nil, err } @@ -766,7 +772,7 @@ func (kc *Catalog) ListImportTasks(ctx context.Context) ([]*datapb.ImportTaskV2, return nil } - err := kc.MetaKv.WalkWithPrefix(ctx, ImportTaskPrefix, paginationSize, applyFn) + err := kc.MetaKv.WalkWithPrefix(ctx, ImportTaskPrefix, kc.paginationSize, applyFn) if err != nil { return nil, err } @@ -806,7 +812,7 @@ func (kc *Catalog) ListCompactionTask(ctx context.Context) ([]*datapb.Compaction return nil } - err := kc.MetaKv.WalkWithPrefix(ctx, CompactionTaskPrefix, paginationSize, applyFn) + err := kc.MetaKv.WalkWithPrefix(ctx, CompactionTaskPrefix, kc.paginationSize, applyFn) if err != nil { return nil, err } @@ -845,7 +851,7 @@ func (kc *Catalog) ListAnalyzeTasks(ctx context.Context) ([]*indexpb.AnalyzeTask return nil } - err := kc.MetaKv.WalkWithPrefix(ctx, AnalyzeTaskPrefix, paginationSize, applyFn) + err := kc.MetaKv.WalkWithPrefix(ctx, AnalyzeTaskPrefix, kc.paginationSize, applyFn) if err != nil { return nil, err } @@ -885,7 +891,7 @@ func (kc *Catalog) ListPartitionStatsInfos(ctx context.Context) ([]*datapb.Parti return nil } - err := kc.MetaKv.WalkWithPrefix(ctx, PartitionStatsInfoPrefix, paginationSize, applyFn) + err := kc.MetaKv.WalkWithPrefix(ctx, PartitionStatsInfoPrefix, kc.paginationSize, applyFn) if err != nil { return nil, err } @@ -948,7 +954,7 @@ func (kc *Catalog) ListStatsTasks(ctx context.Context) ([]*indexpb.StatsTask, er return nil } - err := kc.MetaKv.WalkWithPrefix(ctx, StatsTaskPrefix, paginationSize, applyFn) + err := kc.MetaKv.WalkWithPrefix(ctx, StatsTaskPrefix, kc.paginationSize, applyFn) if err != nil { return nil, err } diff --git a/internal/metastore/kv/querycoord/kv_catalog.go b/internal/metastore/kv/querycoord/kv_catalog.go index 026bf2b01da69..3531d8e1e43dc 100644 --- a/internal/metastore/kv/querycoord/kv_catalog.go +++ b/internal/metastore/kv/querycoord/kv_catalog.go @@ -17,10 +17,9 @@ import ( "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/pkg/kv" "github.com/milvus-io/milvus/pkg/util/compressor" + "github.com/milvus-io/milvus/pkg/util/paramtable" ) -var paginationSize = 2000 - var ErrInvalidKey = errors.New("invalid load info key") const ( @@ -36,12 +35,14 @@ const ( ) type Catalog struct { - cli kv.MetaKv + cli kv.MetaKv + paginationSize int } func NewCatalog(cli kv.MetaKv) Catalog { return Catalog{ - cli: cli, + cli: cli, + paginationSize: paramtable.Get().MetaStoreCfg.PaginationSize.GetAsInt(), } } @@ -117,7 +118,7 @@ func (s Catalog) GetCollections(ctx context.Context) ([]*querypb.CollectionLoadI return nil } - err := s.cli.WalkWithPrefix(ctx, CollectionLoadInfoPrefix, paginationSize, applyFn) + err := s.cli.WalkWithPrefix(ctx, CollectionLoadInfoPrefix, s.paginationSize, applyFn) if err != nil { return nil, err } @@ -136,7 +137,7 @@ func (s Catalog) GetPartitions(ctx context.Context) (map[int64][]*querypb.Partit return nil } - err := s.cli.WalkWithPrefix(ctx, PartitionLoadInfoPrefix, paginationSize, applyFn) + err := s.cli.WalkWithPrefix(ctx, PartitionLoadInfoPrefix, s.paginationSize, applyFn) if err != nil { return nil, err } @@ -155,7 +156,7 @@ func (s Catalog) GetReplicas(ctx context.Context) ([]*querypb.Replica, error) { return nil } - err := s.cli.WalkWithPrefix(ctx, ReplicaPrefix, paginationSize, applyFn) + err := s.cli.WalkWithPrefix(ctx, ReplicaPrefix, s.paginationSize, applyFn) if err != nil { return nil, err } @@ -318,7 +319,7 @@ func (s Catalog) GetCollectionTargets(ctx context.Context) (map[int64]*querypb.C return nil } - err := s.cli.WalkWithPrefix(ctx, CollectionTargetPrefix, paginationSize, applyFn) + err := s.cli.WalkWithPrefix(ctx, CollectionTargetPrefix, s.paginationSize, applyFn) if err != nil { return nil, err } diff --git a/internal/metastore/kv/rootcoord/suffix_snapshot.go b/internal/metastore/kv/rootcoord/suffix_snapshot.go index e305bfbfa9b83..60a5080d58bfc 100644 --- a/internal/metastore/kv/rootcoord/suffix_snapshot.go +++ b/internal/metastore/kv/rootcoord/suffix_snapshot.go @@ -42,11 +42,8 @@ import ( "github.com/milvus-io/milvus/pkg/util/typeutil" ) -var ( - // SuffixSnapshotTombstone special value for tombstone mark - SuffixSnapshotTombstone = []byte{0xE2, 0x9B, 0xBC} - PaginationSize = 5000 -) +// SuffixSnapshotTombstone special value for tombstone mark +var SuffixSnapshotTombstone = []byte{0xE2, 0x9B, 0xBC} // IsTombstone used in migration tool also. func IsTombstone(value string) bool { @@ -84,6 +81,8 @@ type SuffixSnapshot struct { // snapshotLen pre calculated offset when parsing snapshot key snapshotLen int + paginationSize int + closeGC chan struct{} } @@ -118,6 +117,7 @@ func NewSuffixSnapshot(metaKV kv.MetaKv, sep, root, snapshot string) (*SuffixSna snapshotLen: snapshotLen, rootPrefix: root, rootLen: rootLen, + paginationSize: paramtable.Get().MetaStoreCfg.PaginationSize.GetAsInt(), closeGC: make(chan struct{}, 1), } go ss.startBackgroundGC(context.TODO()) @@ -449,7 +449,7 @@ func (ss *SuffixSnapshot) LoadWithPrefix(ctx context.Context, key string, ts typ return nil } - err := ss.MetaKv.WalkWithPrefix(ctx, key, PaginationSize, applyFn) + err := ss.MetaKv.WalkWithPrefix(ctx, key, ss.paginationSize, applyFn) return fks, fvs, err } ss.Lock() @@ -472,7 +472,7 @@ func (ss *SuffixSnapshot) LoadWithPrefix(ctx context.Context, key string, ts typ resultValues = append(resultValues, value) } - err := ss.MetaKv.WalkWithPrefix(ctx, prefix, PaginationSize, func(k []byte, v []byte) error { + err := ss.MetaKv.WalkWithPrefix(ctx, prefix, ss.paginationSize, func(k []byte, v []byte) error { sKey := string(k) sValue := string(v) @@ -693,7 +693,7 @@ func (ss *SuffixSnapshot) removeExpiredKvs(ctx context.Context, now time.Time) e } // Walk through all keys with the snapshot prefix - err := ss.MetaKv.WalkWithPrefix(ctx, ss.snapshotPrefix, PaginationSize, func(k []byte, v []byte) error { + err := ss.MetaKv.WalkWithPrefix(ctx, ss.snapshotPrefix, ss.paginationSize, func(k []byte, v []byte) error { key := ss.hideRootPrefix(string(k)) ts, ok := ss.isTSKey(key) if !ok { diff --git a/pkg/util/paramtable/service_param.go b/pkg/util/paramtable/service_param.go index 0a6091a8c87a6..42bff06595e40 100644 --- a/pkg/util/paramtable/service_param.go +++ b/pkg/util/paramtable/service_param.go @@ -458,6 +458,7 @@ type MetaStoreConfig struct { MetaStoreType ParamItem `refreshable:"false"` SnapshotTTLSeconds ParamItem `refreshable:"true"` SnapshotReserveTimeSeconds ParamItem `refreshable:"true"` + PaginationSize ParamItem `refreshable:"true"` } func (p *MetaStoreConfig) Init(base *BaseTable) { @@ -488,6 +489,14 @@ func (p *MetaStoreConfig) Init(base *BaseTable) { } p.SnapshotReserveTimeSeconds.Init(base.mgr) + p.PaginationSize = ParamItem{ + Key: "metastore.paginationSize", + Version: "2.5.1", + DefaultValue: "10000", + Doc: `limits the number of results to return from metastore.`, + } + p.PaginationSize.Init(base.mgr) + // TODO: The initialization operation of metadata storage is called in the initialization phase of every node. // There should be a single initialization operation for meta store, then move the metrics registration to there. metrics.RegisterMetaType(p.MetaStoreType.GetValue()) diff --git a/pkg/util/paramtable/service_param_test.go b/pkg/util/paramtable/service_param_test.go index 2e9c49f6b019c..404ef45e6d0ea 100644 --- a/pkg/util/paramtable/service_param_test.go +++ b/pkg/util/paramtable/service_param_test.go @@ -23,6 +23,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/milvus-io/milvus/pkg/config" + "github.com/milvus-io/milvus/pkg/util" "github.com/milvus-io/milvus/pkg/util/metricsinfo" ) @@ -210,4 +211,13 @@ func TestServiceParam(t *testing.T) { t.Logf("Minio rootpath = %s", Params.RootPath.GetValue()) }) + + t.Run("test metastore config", func(t *testing.T) { + Params := &SParams.MetaStoreCfg + + assert.Equal(t, util.MetaStoreTypeEtcd, Params.MetaStoreType.GetValue()) + assert.Equal(t, 86400*time.Second, Params.SnapshotTTLSeconds.GetAsDuration(time.Second)) + assert.Equal(t, 3600*time.Second, Params.SnapshotReserveTimeSeconds.GetAsDuration(time.Second)) + assert.Equal(t, 10000, Params.PaginationSize.GetAsInt()) + }) }