Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhance: Add ctx param to KV operation interfaces #38154

Merged
merged 1 commit into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions cmd/milvus/mck.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (c *mck) execute(args []string, flags *flag.FlagSet) {
func (c *mck) run() {
c.connectMinio()

_, values, err := c.metaKV.LoadWithPrefix(segmentPrefix)
_, values, err := c.metaKV.LoadWithPrefix(context.TODO(), segmentPrefix)
if err != nil {
log.Fatal("failed to list the segment info", zap.String("key", segmentPrefix), zap.Error(err))
}
Expand All @@ -121,7 +121,7 @@ func (c *mck) run() {
c.segmentIDMap[info.ID] = info
}

_, values, err = c.metaKV.LoadWithPrefix(collectionPrefix)
_, values, err = c.metaKV.LoadWithPrefix(context.TODO(), collectionPrefix)
if err != nil {
log.Fatal("failed to list the collection info", zap.String("key", collectionPrefix), zap.Error(err))
}
Expand Down Expand Up @@ -150,13 +150,13 @@ func (c *mck) run() {
}
log.Info("partition ids", zap.Int64s("ids", ids))

keys, values, err := c.metaKV.LoadWithPrefix(triggerTaskPrefix)
keys, values, err := c.metaKV.LoadWithPrefix(context.TODO(), triggerTaskPrefix)
if err != nil {
log.Fatal("failed to list the trigger task info", zap.Error(err))
}
c.extractTask(triggerTaskPrefix, keys, values)

keys, values, err = c.metaKV.LoadWithPrefix(activeTaskPrefix)
keys, values, err = c.metaKV.LoadWithPrefix(context.TODO(), activeTaskPrefix)
if err != nil {
log.Fatal("failed to list the active task info", zap.Error(err))
}
Expand Down Expand Up @@ -259,7 +259,7 @@ func getConfigValue(a string, b string, name string) string {
}

func (c *mck) cleanTrash() {
keys, _, err := c.metaKV.LoadWithPrefix(MckTrash)
keys, _, err := c.metaKV.LoadWithPrefix(context.TODO(), MckTrash)
if err != nil {
log.Error("failed to load backup info", zap.Error(err))
return
Expand All @@ -273,7 +273,7 @@ func (c *mck) cleanTrash() {
deleteAll := ""
fmt.Scanln(&deleteAll)
if deleteAll == "Y" {
err = c.metaKV.RemoveWithPrefix(MckTrash)
err = c.metaKV.RemoveWithPrefix(context.TODO(), MckTrash)
if err != nil {
log.Error("failed to remove backup infos", zap.String("key", MckTrash), zap.Error(err))
return
Expand Down Expand Up @@ -395,31 +395,31 @@ func (c *mck) extractTask(prefix string, keys []string, values []string) {
func (c *mck) removeTask(invalidTask int64) bool {
taskType := c.taskNameMap[invalidTask]
key := c.taskKeyMap[invalidTask]
err := c.metaKV.Save(getTrashKey(taskType, key), c.allTaskInfo[key])
err := c.metaKV.Save(context.TODO(), getTrashKey(taskType, key), c.allTaskInfo[key])
if err != nil {
log.Warn("failed to backup task", zap.String("key", getTrashKey(taskType, key)), zap.Int64("task_id", invalidTask), zap.Error(err))
return false
}
fmt.Printf("Back up task successfully, back path: %s\n", getTrashKey(taskType, key))
err = c.metaKV.Remove(key)
err = c.metaKV.Remove(context.TODO(), key)
if err != nil {
log.Warn("failed to remove task", zap.Int64("task_id", invalidTask), zap.Error(err))
return false
}

key = fmt.Sprintf("%s/%d", taskInfoPrefix, invalidTask)
taskInfo, err := c.metaKV.Load(key)
taskInfo, err := c.metaKV.Load(context.TODO(), key)
if err != nil {
log.Warn("failed to load task info", zap.Int64("task_id", invalidTask), zap.Error(err))
return false
}
err = c.metaKV.Save(getTrashKey(taskType, key), taskInfo)
err = c.metaKV.Save(context.TODO(), getTrashKey(taskType, key), taskInfo)
if err != nil {
log.Warn("failed to backup task info", zap.Int64("task_id", invalidTask), zap.Error(err))
return false
}
fmt.Printf("Back up task info successfully, back path: %s\n", getTrashKey(taskType, key))
err = c.metaKV.Remove(key)
err = c.metaKV.Remove(context.TODO(), key)
if err != nil {
log.Warn("failed to remove task info", zap.Int64("task_id", invalidTask), zap.Error(err))
}
Expand Down
3 changes: 2 additions & 1 deletion cmd/tools/datameta/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"flag"
"fmt"
"sort"
Expand Down Expand Up @@ -37,7 +38,7 @@ func main() {

etcdkv := etcdkv.NewEtcdKV(etcdCli, *rootPath)

keys, values, err := etcdkv.LoadWithPrefix("/")
keys, values, err := etcdkv.LoadWithPrefix(context.TODO(), "/")
if err != nil {
log.Fatal("failed to list ", zap.Error(err))
}
Expand Down
4 changes: 3 additions & 1 deletion cmd/tools/migration/backend/etcd.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package backend

import (
"context"

clientv3 "go.etcd.io/etcd/client/v3"

"github.com/milvus-io/milvus/cmd/tools/migration/configs"
Expand All @@ -16,7 +18,7 @@ type etcdBasedBackend struct {
}

func (b etcdBasedBackend) CleanWithPrefix(prefix string) error {
return b.txn.RemoveWithPrefix(prefix)
return b.txn.RemoveWithPrefix(context.TODO(), prefix)
}

func newEtcdBasedBackend(cfg *configs.MilvusConfig) (*etcdBasedBackend, error) {
Expand Down
18 changes: 9 additions & 9 deletions cmd/tools/migration/backend/etcd210.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func newEtcd210(cfg *configs.MilvusConfig) (*etcd210, error) {
func (b etcd210) loadTtAliases() (meta.TtAliasesMeta210, error) {
ttAliases := make(meta.TtAliasesMeta210)
prefix := path.Join(rootcoord.SnapshotPrefix, rootcoord.CollectionAliasMetaPrefix210)
keys, values, err := b.txn.LoadWithPrefix(prefix)
keys, values, err := b.txn.LoadWithPrefix(context.TODO(), prefix)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -74,7 +74,7 @@ func (b etcd210) loadTtAliases() (meta.TtAliasesMeta210, error) {
func (b etcd210) loadAliases() (meta.AliasesMeta210, error) {
aliases := make(meta.AliasesMeta210)
prefix := rootcoord.CollectionAliasMetaPrefix210
keys, values, err := b.txn.LoadWithPrefix(prefix)
keys, values, err := b.txn.LoadWithPrefix(context.TODO(), prefix)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -102,7 +102,7 @@ func (b etcd210) loadAliases() (meta.AliasesMeta210, error) {
func (b etcd210) loadTtCollections() (meta.TtCollectionsMeta210, error) {
ttCollections := make(meta.TtCollectionsMeta210)
prefix := path.Join(rootcoord.SnapshotPrefix, rootcoord.CollectionMetaPrefix)
keys, values, err := b.txn.LoadWithPrefix(prefix)
keys, values, err := b.txn.LoadWithPrefix(context.TODO(), prefix)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -144,7 +144,7 @@ func (b etcd210) loadTtCollections() (meta.TtCollectionsMeta210, error) {
func (b etcd210) loadCollections() (meta.CollectionsMeta210, error) {
collections := make(meta.CollectionsMeta210)
prefix := rootcoord.CollectionMetaPrefix
keys, values, err := b.txn.LoadWithPrefix(prefix)
keys, values, err := b.txn.LoadWithPrefix(context.TODO(), prefix)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -199,7 +199,7 @@ func parseCollectionIndexKey(key string) (collectionID, indexID typeutil.UniqueI
func (b etcd210) loadCollectionIndexes() (meta.CollectionIndexesMeta210, error) {
collectionIndexes := make(meta.CollectionIndexesMeta210)
prefix := legacy.IndexMetaBefore220Prefix
keys, values, err := b.txn.LoadWithPrefix(prefix)
keys, values, err := b.txn.LoadWithPrefix(context.TODO(), prefix)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -227,7 +227,7 @@ func (b etcd210) loadCollectionIndexes() (meta.CollectionIndexesMeta210, error)
func (b etcd210) loadSegmentIndexes() (meta.SegmentIndexesMeta210, error) {
segmentIndexes := make(meta.SegmentIndexesMeta210)
prefix := legacy.SegmentIndexPrefixBefore220
keys, values, err := b.txn.LoadWithPrefix(prefix)
keys, values, err := b.txn.LoadWithPrefix(context.TODO(), prefix)
if err != nil {
return nil, err
}
Expand All @@ -250,7 +250,7 @@ func (b etcd210) loadSegmentIndexes() (meta.SegmentIndexesMeta210, error) {
func (b etcd210) loadIndexBuildMeta() (meta.IndexBuildMeta210, error) {
indexBuildMeta := make(meta.IndexBuildMeta210)
prefix := legacy.IndexBuildPrefixBefore220
keys, values, err := b.txn.LoadWithPrefix(prefix)
keys, values, err := b.txn.LoadWithPrefix(context.TODO(), prefix)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -279,7 +279,7 @@ func (b etcd210) loadLastDDLRecords() (meta.LastDDLRecords, error) {
path.Join(rootcoord.SnapshotPrefix, legacy.DDMsgSendPrefixBefore220),
}
for _, prefix := range prefixes {
keys, values, err := b.txn.LoadWithPrefix(prefix)
keys, values, err := b.txn.LoadWithPrefix(context.TODO(), prefix)
if err != nil {
return nil, err
}
Expand All @@ -295,7 +295,7 @@ func (b etcd210) loadLastDDLRecords() (meta.LastDDLRecords, error) {

func (b etcd210) loadLoadInfos() (meta.CollectionLoadInfo210, error) {
loadInfo := make(meta.CollectionLoadInfo210)
_, collectionValues, err := b.txn.LoadWithPrefix(legacy.CollectionLoadMetaPrefixV1)
_, collectionValues, err := b.txn.LoadWithPrefix(context.TODO(), legacy.CollectionLoadMetaPrefixV1)
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion cmd/tools/migration/backend/etcd220.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package backend

import (
"context"
"fmt"

"github.com/milvus-io/milvus/cmd/tools/migration/configs"
Expand Down Expand Up @@ -36,7 +37,7 @@ func printSaves(saves map[string]string) {

func (b etcd220) save(saves map[string]string) error {
for k, v := range saves {
if err := b.txn.Save(k, v); err != nil {
if err := b.txn.Save(context.TODO(), k, v); err != nil {
return err
}
}
Expand Down
10 changes: 5 additions & 5 deletions internal/datacoord/channel_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ type ChannelManagerSuite struct {
func (s *ChannelManagerSuite) prepareMeta(chNodes map[string]int64, state datapb.ChannelWatchState) {
s.SetupTest()
if chNodes == nil {
s.mockKv.EXPECT().LoadWithPrefix(mock.Anything).Return(nil, nil, nil).Once()
s.mockKv.EXPECT().LoadWithPrefix(mock.Anything, mock.Anything).Return(nil, nil, nil).Once()
return
}
var keys, values []string
Expand All @@ -65,7 +65,7 @@ func (s *ChannelManagerSuite) prepareMeta(chNodes map[string]int64, state datapb
s.Require().NoError(err)
values = append(values, string(bs))
}
s.mockKv.EXPECT().LoadWithPrefix(mock.Anything).Return(keys, values, nil).Once()
s.mockKv.EXPECT().LoadWithPrefix(mock.Anything, mock.Anything).Return(keys, values, nil).Once()
}

func (s *ChannelManagerSuite) checkAssignment(m *ChannelManagerImpl, nodeID int64, channel string, state ChannelState) {
Expand Down Expand Up @@ -104,8 +104,8 @@ func (s *ChannelManagerSuite) SetupTest() {
ChannelName: ch.GetName(),
}
}).Maybe()
s.mockKv.EXPECT().MultiSaveAndRemove(mock.Anything, mock.Anything).RunAndReturn(
func(save map[string]string, removals []string, preds ...predicates.Predicate) error {
s.mockKv.EXPECT().MultiSaveAndRemove(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(
func(ctx context.Context, save map[string]string, removals []string, preds ...predicates.Predicate) error {
log.Info("test save and remove", zap.Any("save", save), zap.Any("removals", removals))
return nil
}).Maybe()
Expand Down Expand Up @@ -723,7 +723,7 @@ func (s *ChannelManagerSuite) TestStartupNilSchema() {
s.Require().NoError(err)
values = append(values, string(bs))
}
s.mockKv.EXPECT().LoadWithPrefix(mock.Anything).Return(keys, values, nil).Once()
s.mockKv.EXPECT().LoadWithPrefix(mock.Anything, mock.Anything).Return(keys, values, nil).Once()
s.mockHandler.EXPECT().CheckShouldDropChannel(mock.Anything).Return(false)
m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
s.Require().NoError(err)
Expand Down
7 changes: 4 additions & 3 deletions internal/datacoord/channel_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package datacoord

import (
"context"
"fmt"
"math"
"strconv"
Expand Down Expand Up @@ -334,7 +335,7 @@

func (c *StateChannelStore) Reload() error {
record := timerecord.NewTimeRecorder("datacoord")
keys, values, err := c.store.LoadWithPrefix(Params.CommonCfg.DataCoordWatchSubPath.GetValue())
keys, values, err := c.store.LoadWithPrefix(context.TODO(), Params.CommonCfg.DataCoordWatchSubPath.GetValue())
if err != nil {
return err
}
Expand Down Expand Up @@ -592,7 +593,7 @@
saves = lo.Assign(opSaves, saves)
removals = append(removals, opRemovals...)
}
return c.store.MultiSaveAndRemove(saves, removals)
return c.store.MultiSaveAndRemove(context.TODO(), saves, removals)
}

func (c *StateChannelStore) RemoveNode(nodeID int64) {
Expand Down Expand Up @@ -735,5 +736,5 @@
// remove deletes kv pairs from the kv store where keys have given nodeID as prefix.
func (c *StateChannelStore) remove(nodeID int64) error {
k := buildKeyPrefix(nodeID)
return c.store.RemoveWithPrefix(k)
return c.store.RemoveWithPrefix(context.TODO(), k)

Check warning on line 739 in internal/datacoord/channel_store.go

View check run for this annotation

Codecov / codecov/patch

internal/datacoord/channel_store.go#L739

Added line #L739 was not covered by tests
}
15 changes: 8 additions & 7 deletions internal/datacoord/channel_store_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package datacoord

import (
"context"
"fmt"
"strconv"
"testing"
Expand Down Expand Up @@ -242,8 +243,8 @@ func (s *StateChannelStoreSuite) TestUpdateWithTxnLimit() {
for _, test := range tests {
s.SetupTest()
s.Run(test.description, func() {
s.mockTxn.EXPECT().MultiSaveAndRemove(mock.Anything, mock.Anything).
Run(func(saves map[string]string, removals []string, preds ...predicates.Predicate) {
s.mockTxn.EXPECT().MultiSaveAndRemove(mock.Anything, mock.Anything, mock.Anything).
Run(func(ctx context.Context, saves map[string]string, removals []string, preds ...predicates.Predicate) {
log.Info("test save and remove", zap.Any("saves", saves), zap.Any("removals", removals))
}).Return(nil).Times(test.outTxnCount)

Expand Down Expand Up @@ -276,8 +277,8 @@ func (s *StateChannelStoreSuite) TestUpdateMeta2000kSegs() {
NewChannelOp(100, Watch, ch),
)
s.SetupTest()
s.mockTxn.EXPECT().MultiSaveAndRemove(mock.Anything, mock.Anything).
Run(func(saves map[string]string, removals []string, preds ...predicates.Predicate) {
s.mockTxn.EXPECT().MultiSaveAndRemove(mock.Anything, mock.Anything, mock.Anything).
Run(func(ctx context.Context, saves map[string]string, removals []string, preds ...predicates.Predicate) {
}).Return(nil).Once()

store := NewStateChannelStore(s.mockTxn)
Expand Down Expand Up @@ -381,8 +382,8 @@ func (s *StateChannelStoreSuite) TestUpdateMeta() {
},
}
s.SetupTest()
s.mockTxn.EXPECT().MultiSaveAndRemove(mock.Anything, mock.Anything).
Run(func(saves map[string]string, removals []string, preds ...predicates.Predicate) {
s.mockTxn.EXPECT().MultiSaveAndRemove(mock.Anything, mock.Anything, mock.Anything).
Run(func(ctx context.Context, saves map[string]string, removals []string, preds ...predicates.Predicate) {
}).Return(nil).Times(len(tests))

for _, test := range tests {
Expand Down Expand Up @@ -495,7 +496,7 @@ func (s *StateChannelStoreSuite) TestReload() {
s.Require().NoError(err)
values = append(values, string(bs))
}
s.mockTxn.EXPECT().LoadWithPrefix(mock.AnythingOfType("string")).Return(keys, values, nil)
s.mockTxn.EXPECT().LoadWithPrefix(mock.Anything, mock.AnythingOfType("string")).Return(keys, values, nil)

store := NewStateChannelStore(s.mockTxn)
err := store.Reload()
Expand Down
16 changes: 8 additions & 8 deletions internal/datacoord/index_meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -566,9 +566,9 @@ func TestMeta_GetIndexIDByName(t *testing.T) {
}
)
metakv := mockkv.NewMetaKv(t)
metakv.EXPECT().Save(mock.Anything, mock.Anything).Return(errors.New("failed")).Maybe()
metakv.EXPECT().MultiSave(mock.Anything).Return(errors.New("failed")).Maybe()
metakv.EXPECT().LoadWithPrefix(mock.Anything).Return(nil, nil, nil).Maybe()
metakv.EXPECT().Save(mock.Anything, mock.Anything, mock.Anything).Return(errors.New("failed")).Maybe()
metakv.EXPECT().MultiSave(mock.Anything, mock.Anything).Return(errors.New("failed")).Maybe()
metakv.EXPECT().LoadWithPrefix(mock.Anything, mock.Anything).Return(nil, nil, nil).Maybe()

m := newSegmentIndexMeta(&datacoord.Catalog{MetaKv: metakv})
t.Run("no indexes", func(t *testing.T) {
Expand Down Expand Up @@ -621,9 +621,9 @@ func TestMeta_GetSegmentIndexState(t *testing.T) {
}
)
metakv := mockkv.NewMetaKv(t)
metakv.EXPECT().Save(mock.Anything, mock.Anything).Return(errors.New("failed")).Maybe()
metakv.EXPECT().MultiSave(mock.Anything).Return(errors.New("failed")).Maybe()
metakv.EXPECT().LoadWithPrefix(mock.Anything).Return(nil, nil, nil).Maybe()
metakv.EXPECT().Save(mock.Anything, mock.Anything, mock.Anything).Return(errors.New("failed")).Maybe()
metakv.EXPECT().MultiSave(mock.Anything, mock.Anything).Return(errors.New("failed")).Maybe()
metakv.EXPECT().LoadWithPrefix(mock.Anything, mock.Anything).Return(nil, nil, nil).Maybe()

m := newSegmentIndexMeta(&datacoord.Catalog{MetaKv: metakv})
m.segmentIndexes = map[UniqueID]map[UniqueID]*model.SegmentIndex{
Expand Down Expand Up @@ -1280,8 +1280,8 @@ func TestMeta_FinishTask(t *testing.T) {

t.Run("fail", func(t *testing.T) {
metakv := mockkv.NewMetaKv(t)
metakv.EXPECT().Save(mock.Anything, mock.Anything).Return(errors.New("failed")).Maybe()
metakv.EXPECT().MultiSave(mock.Anything).Return(errors.New("failed")).Maybe()
metakv.EXPECT().Save(mock.Anything, mock.Anything, mock.Anything).Return(errors.New("failed")).Maybe()
metakv.EXPECT().MultiSave(mock.Anything, mock.Anything).Return(errors.New("failed")).Maybe()
m.catalog = &datacoord.Catalog{
MetaKv: metakv,
}
Expand Down
4 changes: 2 additions & 2 deletions internal/datacoord/index_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,8 @@ func TestServer_CreateIndex(t *testing.T) {

t.Run("save index fail", func(t *testing.T) {
metakv := mockkv.NewMetaKv(t)
metakv.EXPECT().Save(mock.Anything, mock.Anything).Return(errors.New("failed")).Maybe()
metakv.EXPECT().MultiSave(mock.Anything).Return(errors.New("failed")).Maybe()
metakv.EXPECT().Save(mock.Anything, mock.Anything, mock.Anything).Return(errors.New("failed")).Maybe()
metakv.EXPECT().MultiSave(mock.Anything, mock.Anything).Return(errors.New("failed")).Maybe()
s.meta.indexMeta.indexes = map[UniqueID]map[UniqueID]*model.Index{}
s.meta.catalog = &datacoord.Catalog{MetaKv: metakv}
s.meta.indexMeta.catalog = s.meta.catalog
Expand Down
Loading
Loading