Skip to content

Commit

Permalink
fix: Fix timeout when listing meta
Browse files Browse the repository at this point in the history
Signed-off-by: bigsheeper <[email protected]>
  • Loading branch information
bigsheeper committed Dec 4, 2024
1 parent febed0a commit 3cb482a
Show file tree
Hide file tree
Showing 4 changed files with 161 additions and 121 deletions.
150 changes: 80 additions & 70 deletions internal/metastore/kv/datacoord/kv_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,23 +451,23 @@ func (kc *Catalog) DropChannel(ctx context.Context, channel string) error {
}

func (kc *Catalog) ListChannelCheckpoint(ctx context.Context) (map[string]*msgpb.MsgPosition, error) {
keys, values, err := kc.MetaKv.LoadWithPrefix(ChannelCheckpointPrefix)
if err != nil {
return nil, err
}

channelCPs := make(map[string]*msgpb.MsgPosition)
for i, key := range keys {
value := values[i]
applyFn := func(key []byte, value []byte) error {
channelCP := &msgpb.MsgPosition{}
err = proto.Unmarshal([]byte(value), channelCP)
err := proto.Unmarshal(value, channelCP)
if err != nil {
log.Error("unmarshal channelCP failed when ListChannelCheckpoint", zap.Error(err))
return nil, err
return err

Check warning on line 460 in internal/metastore/kv/datacoord/kv_catalog.go

View check run for this annotation

Codecov / codecov/patch

internal/metastore/kv/datacoord/kv_catalog.go#L460

Added line #L460 was not covered by tests
}
ss := strings.Split(key, "/")
ss := strings.Split(string(key), "/")
vChannel := ss[len(ss)-1]
channelCPs[vChannel] = channelCP
return nil
}

err := kc.MetaKv.WalkWithPrefix(ChannelCheckpointPrefix, paginationSize, applyFn)
if err != nil {
return nil, err
}

return channelCPs, nil
Expand Down Expand Up @@ -537,24 +537,23 @@ func (kc *Catalog) CreateIndex(ctx context.Context, index *model.Index) error {
}

func (kc *Catalog) ListIndexes(ctx context.Context) ([]*model.Index, error) {
_, values, err := kc.MetaKv.LoadWithPrefix(util.FieldIndexPrefix)
if err != nil {
log.Error("list index meta fail", zap.String("prefix", util.FieldIndexPrefix), zap.Error(err))
return nil, err
}

indexes := make([]*model.Index, 0)
for _, value := range values {
applyFn := func(key []byte, value []byte) error {
meta := &indexpb.FieldIndex{}
err = proto.Unmarshal([]byte(value), meta)
err := proto.Unmarshal(value, meta)
if err != nil {
log.Warn("unmarshal index info failed", zap.Error(err))
return nil, err
return err
}

indexes = append(indexes, model.UnmarshalIndexModel(meta))
return nil
}

err := kc.MetaKv.WalkWithPrefix(util.FieldIndexPrefix, paginationSize, applyFn)
if err != nil {
return nil, err
}
return indexes, nil
}

Expand Down Expand Up @@ -614,22 +613,22 @@ func (kc *Catalog) CreateSegmentIndex(ctx context.Context, segIdx *model.Segment
}

func (kc *Catalog) ListSegmentIndexes(ctx context.Context) ([]*model.SegmentIndex, error) {
_, values, err := kc.MetaKv.LoadWithPrefix(util.SegmentIndexPrefix)
if err != nil {
log.Error("list segment index meta fail", zap.String("prefix", util.SegmentIndexPrefix), zap.Error(err))
return nil, err
}

segIndexes := make([]*model.SegmentIndex, 0)
for _, value := range values {
applyFn := func(key []byte, value []byte) error {
segmentIndexInfo := &indexpb.SegmentIndex{}
err = proto.Unmarshal([]byte(value), segmentIndexInfo)
err := proto.Unmarshal(value, segmentIndexInfo)
if err != nil {
log.Warn("unmarshal segment index info failed", zap.Error(err))
return segIndexes, err
return err
}

segIndexes = append(segIndexes, model.UnmarshalSegmentIndexModel(segmentIndexInfo))
return nil
}

err := kc.MetaKv.WalkWithPrefix(util.SegmentIndexPrefix, paginationSize, applyFn)
if err != nil {
return nil, err
}

return segIndexes, nil
Expand Down Expand Up @@ -671,17 +670,19 @@ func (kc *Catalog) SaveImportJob(job *datapb.ImportJob) error {

func (kc *Catalog) ListImportJobs() ([]*datapb.ImportJob, error) {
jobs := make([]*datapb.ImportJob, 0)
_, values, err := kc.MetaKv.LoadWithPrefix(ImportJobPrefix)
if err != nil {
return nil, err
}
for _, value := range values {
applyFn := func(key []byte, value []byte) error {
job := &datapb.ImportJob{}
err = proto.Unmarshal([]byte(value), job)
err := proto.Unmarshal(value, job)
if err != nil {
return nil, err
return err
}
jobs = append(jobs, job)
return nil
}

err := kc.MetaKv.WalkWithPrefix(ImportJobPrefix, paginationSize, applyFn)
if err != nil {
return nil, err
}
return jobs, nil
}
Expand All @@ -703,19 +704,20 @@ func (kc *Catalog) SavePreImportTask(task *datapb.PreImportTask) error {
func (kc *Catalog) ListPreImportTasks() ([]*datapb.PreImportTask, error) {
tasks := make([]*datapb.PreImportTask, 0)

_, values, err := kc.MetaKv.LoadWithPrefix(PreImportTaskPrefix)
if err != nil {
return nil, err
}
for _, value := range values {
applyFn := func(key []byte, value []byte) error {
task := &datapb.PreImportTask{}
err = proto.Unmarshal([]byte(value), task)
err := proto.Unmarshal(value, task)
if err != nil {
return nil, err
return err
}
tasks = append(tasks, task)
return nil
}

err := kc.MetaKv.WalkWithPrefix(PreImportTaskPrefix, paginationSize, applyFn)
if err != nil {
return nil, err
}
return tasks, nil
}

Expand All @@ -736,17 +738,19 @@ func (kc *Catalog) SaveImportTask(task *datapb.ImportTaskV2) error {
func (kc *Catalog) ListImportTasks() ([]*datapb.ImportTaskV2, error) {
tasks := make([]*datapb.ImportTaskV2, 0)

_, values, err := kc.MetaKv.LoadWithPrefix(ImportTaskPrefix)
if err != nil {
return nil, err
}
for _, value := range values {
applyFn := func(key []byte, value []byte) error {
task := &datapb.ImportTaskV2{}
err = proto.Unmarshal([]byte(value), task)
err := proto.Unmarshal(value, task)
if err != nil {
return nil, err
return err
}
tasks = append(tasks, task)
return nil
}

err := kc.MetaKv.WalkWithPrefix(ImportTaskPrefix, paginationSize, applyFn)
if err != nil {
return nil, err
}
return tasks, nil
}
Expand Down Expand Up @@ -774,17 +778,19 @@ func (kc *Catalog) GcConfirm(ctx context.Context, collectionID, partitionID type
func (kc *Catalog) ListCompactionTask(ctx context.Context) ([]*datapb.CompactionTask, error) {
tasks := make([]*datapb.CompactionTask, 0)

_, values, err := kc.MetaKv.LoadWithPrefix(CompactionTaskPrefix)
if err != nil {
return nil, err
}
for _, value := range values {
applyFn := func(key []byte, value []byte) error {
info := &datapb.CompactionTask{}
err = proto.Unmarshal([]byte(value), info)
err := proto.Unmarshal(value, info)
if err != nil {
return nil, err
return err

Check warning on line 785 in internal/metastore/kv/datacoord/kv_catalog.go

View check run for this annotation

Codecov / codecov/patch

internal/metastore/kv/datacoord/kv_catalog.go#L785

Added line #L785 was not covered by tests
}
tasks = append(tasks, info)
return nil
}

err := kc.MetaKv.WalkWithPrefix(CompactionTaskPrefix, paginationSize, applyFn)
if err != nil {
return nil, err

Check warning on line 793 in internal/metastore/kv/datacoord/kv_catalog.go

View check run for this annotation

Codecov / codecov/patch

internal/metastore/kv/datacoord/kv_catalog.go#L793

Added line #L793 was not covered by tests
}
return tasks, nil
}
Expand All @@ -811,17 +817,19 @@ func (kc *Catalog) DropCompactionTask(ctx context.Context, task *datapb.Compacti
func (kc *Catalog) ListAnalyzeTasks(ctx context.Context) ([]*indexpb.AnalyzeTask, error) {
tasks := make([]*indexpb.AnalyzeTask, 0)

_, values, err := kc.MetaKv.LoadWithPrefix(AnalyzeTaskPrefix)
if err != nil {
return nil, err
}
for _, value := range values {
applyFn := func(key []byte, value []byte) error {
task := &indexpb.AnalyzeTask{}
err = proto.Unmarshal([]byte(value), task)
err := proto.Unmarshal(value, task)

Check warning on line 822 in internal/metastore/kv/datacoord/kv_catalog.go

View check run for this annotation

Codecov / codecov/patch

internal/metastore/kv/datacoord/kv_catalog.go#L822

Added line #L822 was not covered by tests
if err != nil {
return nil, err
return err

Check warning on line 824 in internal/metastore/kv/datacoord/kv_catalog.go

View check run for this annotation

Codecov / codecov/patch

internal/metastore/kv/datacoord/kv_catalog.go#L824

Added line #L824 was not covered by tests
}
tasks = append(tasks, task)
return nil

Check warning on line 827 in internal/metastore/kv/datacoord/kv_catalog.go

View check run for this annotation

Codecov / codecov/patch

internal/metastore/kv/datacoord/kv_catalog.go#L827

Added line #L827 was not covered by tests
}

err := kc.MetaKv.WalkWithPrefix(AnalyzeTaskPrefix, paginationSize, applyFn)
if err != nil {
return nil, err

Check warning on line 832 in internal/metastore/kv/datacoord/kv_catalog.go

View check run for this annotation

Codecov / codecov/patch

internal/metastore/kv/datacoord/kv_catalog.go#L832

Added line #L832 was not covered by tests
}
return tasks, nil
}
Expand Down Expand Up @@ -849,17 +857,19 @@ func (kc *Catalog) DropAnalyzeTask(ctx context.Context, taskID typeutil.UniqueID
func (kc *Catalog) ListPartitionStatsInfos(ctx context.Context) ([]*datapb.PartitionStatsInfo, error) {
infos := make([]*datapb.PartitionStatsInfo, 0)

_, values, err := kc.MetaKv.LoadWithPrefix(PartitionStatsInfoPrefix)
if err != nil {
return nil, err
}
for _, value := range values {
applyFn := func(key []byte, value []byte) error {
info := &datapb.PartitionStatsInfo{}
err = proto.Unmarshal([]byte(value), info)
err := proto.Unmarshal(value, info)

Check warning on line 862 in internal/metastore/kv/datacoord/kv_catalog.go

View check run for this annotation

Codecov / codecov/patch

internal/metastore/kv/datacoord/kv_catalog.go#L862

Added line #L862 was not covered by tests
if err != nil {
return nil, err
return err

Check warning on line 864 in internal/metastore/kv/datacoord/kv_catalog.go

View check run for this annotation

Codecov / codecov/patch

internal/metastore/kv/datacoord/kv_catalog.go#L864

Added line #L864 was not covered by tests
}
infos = append(infos, info)
return nil

Check warning on line 867 in internal/metastore/kv/datacoord/kv_catalog.go

View check run for this annotation

Codecov / codecov/patch

internal/metastore/kv/datacoord/kv_catalog.go#L867

Added line #L867 was not covered by tests
}

err := kc.MetaKv.WalkWithPrefix(PartitionStatsInfoPrefix, paginationSize, applyFn)
if err != nil {
return nil, err

Check warning on line 872 in internal/metastore/kv/datacoord/kv_catalog.go

View check run for this annotation

Codecov / codecov/patch

internal/metastore/kv/datacoord/kv_catalog.go#L872

Added line #L872 was not covered by tests
}
return infos, nil
}
Expand Down
Loading

0 comments on commit 3cb482a

Please sign in to comment.