Skip to content

Commit

Permalink
refine interfaces for kv operation
Browse files Browse the repository at this point in the history
Signed-off-by: tinswzy <[email protected]>
  • Loading branch information
tinswzy committed Dec 4, 2024
1 parent e359725 commit 0c861ca
Show file tree
Hide file tree
Showing 57 changed files with 2,210 additions and 2,086 deletions.
22 changes: 11 additions & 11 deletions cmd/milvus/mck.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (c *mck) execute(args []string, flags *flag.FlagSet) {
func (c *mck) run() {
c.connectMinio()

_, values, err := c.metaKV.LoadWithPrefix(segmentPrefix)
_, values, err := c.metaKV.LoadWithPrefix(context.TODO(), segmentPrefix)
if err != nil {
log.Fatal("failed to list the segment info", zap.String("key", segmentPrefix), zap.Error(err))
}
Expand All @@ -121,7 +121,7 @@ func (c *mck) run() {
c.segmentIDMap[info.ID] = info
}

_, values, err = c.metaKV.LoadWithPrefix(collectionPrefix)
_, values, err = c.metaKV.LoadWithPrefix(context.TODO(), collectionPrefix)
if err != nil {
log.Fatal("failed to list the collection info", zap.String("key", collectionPrefix), zap.Error(err))
}
Expand Down Expand Up @@ -150,13 +150,13 @@ func (c *mck) run() {
}
log.Info("partition ids", zap.Int64s("ids", ids))

keys, values, err := c.metaKV.LoadWithPrefix(triggerTaskPrefix)
keys, values, err := c.metaKV.LoadWithPrefix(context.TODO(), triggerTaskPrefix)
if err != nil {
log.Fatal("failed to list the trigger task info", zap.Error(err))
}
c.extractTask(triggerTaskPrefix, keys, values)

keys, values, err = c.metaKV.LoadWithPrefix(activeTaskPrefix)
keys, values, err = c.metaKV.LoadWithPrefix(context.TODO(), activeTaskPrefix)
if err != nil {
log.Fatal("failed to list the active task info", zap.Error(err))
}
Expand Down Expand Up @@ -259,7 +259,7 @@ func getConfigValue(a string, b string, name string) string {
}

func (c *mck) cleanTrash() {
keys, _, err := c.metaKV.LoadWithPrefix(MckTrash)
keys, _, err := c.metaKV.LoadWithPrefix(context.TODO(), MckTrash)
if err != nil {
log.Error("failed to load backup info", zap.Error(err))
return
Expand All @@ -273,7 +273,7 @@ func (c *mck) cleanTrash() {
deleteAll := ""
fmt.Scanln(&deleteAll)
if deleteAll == "Y" {
err = c.metaKV.RemoveWithPrefix(MckTrash)
err = c.metaKV.RemoveWithPrefix(context.TODO(), MckTrash)
if err != nil {
log.Error("failed to remove backup infos", zap.String("key", MckTrash), zap.Error(err))
return
Expand Down Expand Up @@ -395,31 +395,31 @@ func (c *mck) extractTask(prefix string, keys []string, values []string) {
func (c *mck) removeTask(invalidTask int64) bool {
taskType := c.taskNameMap[invalidTask]
key := c.taskKeyMap[invalidTask]
err := c.metaKV.Save(getTrashKey(taskType, key), c.allTaskInfo[key])
err := c.metaKV.Save(context.TODO(), getTrashKey(taskType, key), c.allTaskInfo[key])
if err != nil {
log.Warn("failed to backup task", zap.String("key", getTrashKey(taskType, key)), zap.Int64("task_id", invalidTask), zap.Error(err))
return false
}
fmt.Printf("Back up task successfully, back path: %s\n", getTrashKey(taskType, key))
err = c.metaKV.Remove(key)
err = c.metaKV.Remove(context.TODO(), key)
if err != nil {
log.Warn("failed to remove task", zap.Int64("task_id", invalidTask), zap.Error(err))
return false
}

key = fmt.Sprintf("%s/%d", taskInfoPrefix, invalidTask)
taskInfo, err := c.metaKV.Load(key)
taskInfo, err := c.metaKV.Load(context.TODO(), key)
if err != nil {
log.Warn("failed to load task info", zap.Int64("task_id", invalidTask), zap.Error(err))
return false
}
err = c.metaKV.Save(getTrashKey(taskType, key), taskInfo)
err = c.metaKV.Save(context.TODO(), getTrashKey(taskType, key), taskInfo)
if err != nil {
log.Warn("failed to backup task info", zap.Int64("task_id", invalidTask), zap.Error(err))
return false
}
fmt.Printf("Back up task info successfully, back path: %s\n", getTrashKey(taskType, key))
err = c.metaKV.Remove(key)
err = c.metaKV.Remove(context.TODO(), key)
if err != nil {
log.Warn("failed to remove task info", zap.Int64("task_id", invalidTask), zap.Error(err))
}
Expand Down
3 changes: 2 additions & 1 deletion cmd/tools/datameta/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"flag"
"fmt"
"sort"
Expand Down Expand Up @@ -37,7 +38,7 @@ func main() {

etcdkv := etcdkv.NewEtcdKV(etcdCli, *rootPath)

keys, values, err := etcdkv.LoadWithPrefix("/")
keys, values, err := etcdkv.LoadWithPrefix(context.TODO(), "/")
if err != nil {
log.Fatal("failed to list ", zap.Error(err))
}
Expand Down
4 changes: 3 additions & 1 deletion cmd/tools/migration/backend/etcd.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package backend

import (
"context"

clientv3 "go.etcd.io/etcd/client/v3"

"github.com/milvus-io/milvus/cmd/tools/migration/configs"
Expand All @@ -16,7 +18,7 @@ type etcdBasedBackend struct {
}

func (b etcdBasedBackend) CleanWithPrefix(prefix string) error {
return b.txn.RemoveWithPrefix(prefix)
return b.txn.RemoveWithPrefix(context.TODO(), prefix)
}

func newEtcdBasedBackend(cfg *configs.MilvusConfig) (*etcdBasedBackend, error) {
Expand Down
18 changes: 9 additions & 9 deletions cmd/tools/migration/backend/etcd210.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func newEtcd210(cfg *configs.MilvusConfig) (*etcd210, error) {
func (b etcd210) loadTtAliases() (meta.TtAliasesMeta210, error) {
ttAliases := make(meta.TtAliasesMeta210)
prefix := path.Join(rootcoord.SnapshotPrefix, rootcoord.CollectionAliasMetaPrefix210)
keys, values, err := b.txn.LoadWithPrefix(prefix)
keys, values, err := b.txn.LoadWithPrefix(context.TODO(), prefix)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -74,7 +74,7 @@ func (b etcd210) loadTtAliases() (meta.TtAliasesMeta210, error) {
func (b etcd210) loadAliases() (meta.AliasesMeta210, error) {
aliases := make(meta.AliasesMeta210)
prefix := rootcoord.CollectionAliasMetaPrefix210
keys, values, err := b.txn.LoadWithPrefix(prefix)
keys, values, err := b.txn.LoadWithPrefix(context.TODO(), prefix)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -102,7 +102,7 @@ func (b etcd210) loadAliases() (meta.AliasesMeta210, error) {
func (b etcd210) loadTtCollections() (meta.TtCollectionsMeta210, error) {
ttCollections := make(meta.TtCollectionsMeta210)
prefix := path.Join(rootcoord.SnapshotPrefix, rootcoord.CollectionMetaPrefix)
keys, values, err := b.txn.LoadWithPrefix(prefix)
keys, values, err := b.txn.LoadWithPrefix(context.TODO(), prefix)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -144,7 +144,7 @@ func (b etcd210) loadTtCollections() (meta.TtCollectionsMeta210, error) {
func (b etcd210) loadCollections() (meta.CollectionsMeta210, error) {
collections := make(meta.CollectionsMeta210)
prefix := rootcoord.CollectionMetaPrefix
keys, values, err := b.txn.LoadWithPrefix(prefix)
keys, values, err := b.txn.LoadWithPrefix(context.TODO(), prefix)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -199,7 +199,7 @@ func parseCollectionIndexKey(key string) (collectionID, indexID typeutil.UniqueI
func (b etcd210) loadCollectionIndexes() (meta.CollectionIndexesMeta210, error) {
collectionIndexes := make(meta.CollectionIndexesMeta210)
prefix := legacy.IndexMetaBefore220Prefix
keys, values, err := b.txn.LoadWithPrefix(prefix)
keys, values, err := b.txn.LoadWithPrefix(context.TODO(), prefix)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -227,7 +227,7 @@ func (b etcd210) loadCollectionIndexes() (meta.CollectionIndexesMeta210, error)
func (b etcd210) loadSegmentIndexes() (meta.SegmentIndexesMeta210, error) {
segmentIndexes := make(meta.SegmentIndexesMeta210)
prefix := legacy.SegmentIndexPrefixBefore220
keys, values, err := b.txn.LoadWithPrefix(prefix)
keys, values, err := b.txn.LoadWithPrefix(context.TODO(), prefix)
if err != nil {
return nil, err
}
Expand All @@ -250,7 +250,7 @@ func (b etcd210) loadSegmentIndexes() (meta.SegmentIndexesMeta210, error) {
func (b etcd210) loadIndexBuildMeta() (meta.IndexBuildMeta210, error) {
indexBuildMeta := make(meta.IndexBuildMeta210)
prefix := legacy.IndexBuildPrefixBefore220
keys, values, err := b.txn.LoadWithPrefix(prefix)
keys, values, err := b.txn.LoadWithPrefix(context.TODO(), prefix)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -279,7 +279,7 @@ func (b etcd210) loadLastDDLRecords() (meta.LastDDLRecords, error) {
path.Join(rootcoord.SnapshotPrefix, legacy.DDMsgSendPrefixBefore220),
}
for _, prefix := range prefixes {
keys, values, err := b.txn.LoadWithPrefix(prefix)
keys, values, err := b.txn.LoadWithPrefix(context.TODO(), prefix)
if err != nil {
return nil, err
}
Expand All @@ -295,7 +295,7 @@ func (b etcd210) loadLastDDLRecords() (meta.LastDDLRecords, error) {

func (b etcd210) loadLoadInfos() (meta.CollectionLoadInfo210, error) {
loadInfo := make(meta.CollectionLoadInfo210)
_, collectionValues, err := b.txn.LoadWithPrefix(legacy.CollectionLoadMetaPrefixV1)
_, collectionValues, err := b.txn.LoadWithPrefix(context.TODO(), legacy.CollectionLoadMetaPrefixV1)
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion cmd/tools/migration/backend/etcd220.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package backend

import (
"context"
"fmt"

"github.com/milvus-io/milvus/cmd/tools/migration/configs"
Expand Down Expand Up @@ -36,7 +37,7 @@ func printSaves(saves map[string]string) {

func (b etcd220) save(saves map[string]string) error {
for k, v := range saves {
if err := b.txn.Save(k, v); err != nil {
if err := b.txn.Save(context.TODO(), k, v); err != nil {
return err
}
}
Expand Down
10 changes: 5 additions & 5 deletions internal/datacoord/channel_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ type ChannelManagerSuite struct {
func (s *ChannelManagerSuite) prepareMeta(chNodes map[string]int64, state datapb.ChannelWatchState) {
s.SetupTest()
if chNodes == nil {
s.mockKv.EXPECT().LoadWithPrefix(mock.Anything).Return(nil, nil, nil).Once()
s.mockKv.EXPECT().LoadWithPrefix(mock.Anything, mock.Anything).Return(nil, nil, nil).Once()
return
}
var keys, values []string
Expand All @@ -65,7 +65,7 @@ func (s *ChannelManagerSuite) prepareMeta(chNodes map[string]int64, state datapb
s.Require().NoError(err)
values = append(values, string(bs))
}
s.mockKv.EXPECT().LoadWithPrefix(mock.Anything).Return(keys, values, nil).Once()
s.mockKv.EXPECT().LoadWithPrefix(mock.Anything, mock.Anything).Return(keys, values, nil).Once()
}

func (s *ChannelManagerSuite) checkAssignment(m *ChannelManagerImpl, nodeID int64, channel string, state ChannelState) {
Expand Down Expand Up @@ -104,8 +104,8 @@ func (s *ChannelManagerSuite) SetupTest() {
ChannelName: ch.GetName(),
}
}).Maybe()
s.mockKv.EXPECT().MultiSaveAndRemove(mock.Anything, mock.Anything).RunAndReturn(
func(save map[string]string, removals []string, preds ...predicates.Predicate) error {
s.mockKv.EXPECT().MultiSaveAndRemove(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(
func(ctx context.Context, save map[string]string, removals []string, preds ...predicates.Predicate) error {
log.Info("test save and remove", zap.Any("save", save), zap.Any("removals", removals))
return nil
}).Maybe()
Expand Down Expand Up @@ -723,7 +723,7 @@ func (s *ChannelManagerSuite) TestStartupNilSchema() {
s.Require().NoError(err)
values = append(values, string(bs))
}
s.mockKv.EXPECT().LoadWithPrefix(mock.Anything).Return(keys, values, nil).Once()
s.mockKv.EXPECT().LoadWithPrefix(mock.Anything, mock.Anything).Return(keys, values, nil).Once()
s.mockHandler.EXPECT().CheckShouldDropChannel(mock.Anything).Return(false)
m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
s.Require().NoError(err)
Expand Down
7 changes: 4 additions & 3 deletions internal/datacoord/channel_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package datacoord

import (
"context"
"fmt"
"math"
"strconv"
Expand Down Expand Up @@ -334,7 +335,7 @@ func NewStateChannelStore(kv kv.TxnKV) *StateChannelStore {

func (c *StateChannelStore) Reload() error {
record := timerecord.NewTimeRecorder("datacoord")
keys, values, err := c.store.LoadWithPrefix(Params.CommonCfg.DataCoordWatchSubPath.GetValue())
keys, values, err := c.store.LoadWithPrefix(context.TODO(), Params.CommonCfg.DataCoordWatchSubPath.GetValue())
if err != nil {
return err
}
Expand Down Expand Up @@ -592,7 +593,7 @@ func (c *StateChannelStore) txn(opSet *ChannelOpSet) error {
saves = lo.Assign(opSaves, saves)
removals = append(removals, opRemovals...)
}
return c.store.MultiSaveAndRemove(saves, removals)
return c.store.MultiSaveAndRemove(context.TODO(), saves, removals)
}

func (c *StateChannelStore) RemoveNode(nodeID int64) {
Expand Down Expand Up @@ -735,5 +736,5 @@ func (c *StateChannelStore) GetNodes() []int64 {
// remove deletes kv pairs from the kv store where keys have given nodeID as prefix.
func (c *StateChannelStore) remove(nodeID int64) error {
k := buildKeyPrefix(nodeID)
return c.store.RemoveWithPrefix(k)
return c.store.RemoveWithPrefix(context.TODO(), k)
}
15 changes: 8 additions & 7 deletions internal/datacoord/channel_store_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package datacoord

import (
"context"
"fmt"
"strconv"
"testing"
Expand Down Expand Up @@ -242,8 +243,8 @@ func (s *StateChannelStoreSuite) TestUpdateWithTxnLimit() {
for _, test := range tests {
s.SetupTest()
s.Run(test.description, func() {
s.mockTxn.EXPECT().MultiSaveAndRemove(mock.Anything, mock.Anything).
Run(func(saves map[string]string, removals []string, preds ...predicates.Predicate) {
s.mockTxn.EXPECT().MultiSaveAndRemove(mock.Anything, mock.Anything, mock.Anything).
Run(func(ctx context.Context, saves map[string]string, removals []string, preds ...predicates.Predicate) {
log.Info("test save and remove", zap.Any("saves", saves), zap.Any("removals", removals))
}).Return(nil).Times(test.outTxnCount)

Expand Down Expand Up @@ -276,8 +277,8 @@ func (s *StateChannelStoreSuite) TestUpdateMeta2000kSegs() {
NewChannelOp(100, Watch, ch),
)
s.SetupTest()
s.mockTxn.EXPECT().MultiSaveAndRemove(mock.Anything, mock.Anything).
Run(func(saves map[string]string, removals []string, preds ...predicates.Predicate) {
s.mockTxn.EXPECT().MultiSaveAndRemove(mock.Anything, mock.Anything, mock.Anything).
Run(func(ctx context.Context, saves map[string]string, removals []string, preds ...predicates.Predicate) {
}).Return(nil).Once()

store := NewStateChannelStore(s.mockTxn)
Expand Down Expand Up @@ -381,8 +382,8 @@ func (s *StateChannelStoreSuite) TestUpdateMeta() {
},
}
s.SetupTest()
s.mockTxn.EXPECT().MultiSaveAndRemove(mock.Anything, mock.Anything).
Run(func(saves map[string]string, removals []string, preds ...predicates.Predicate) {
s.mockTxn.EXPECT().MultiSaveAndRemove(mock.Anything, mock.Anything, mock.Anything).
Run(func(ctx context.Context, saves map[string]string, removals []string, preds ...predicates.Predicate) {
}).Return(nil).Times(len(tests))

for _, test := range tests {
Expand Down Expand Up @@ -495,7 +496,7 @@ func (s *StateChannelStoreSuite) TestReload() {
s.Require().NoError(err)
values = append(values, string(bs))
}
s.mockTxn.EXPECT().LoadWithPrefix(mock.AnythingOfType("string")).Return(keys, values, nil)
s.mockTxn.EXPECT().LoadWithPrefix(mock.Anything, mock.AnythingOfType("string")).Return(keys, values, nil)

store := NewStateChannelStore(s.mockTxn)
err := store.Reload()
Expand Down
16 changes: 8 additions & 8 deletions internal/datacoord/index_meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -566,9 +566,9 @@ func TestMeta_GetIndexIDByName(t *testing.T) {
}
)
metakv := mockkv.NewMetaKv(t)
metakv.EXPECT().Save(mock.Anything, mock.Anything).Return(errors.New("failed")).Maybe()
metakv.EXPECT().MultiSave(mock.Anything).Return(errors.New("failed")).Maybe()
metakv.EXPECT().LoadWithPrefix(mock.Anything).Return(nil, nil, nil).Maybe()
metakv.EXPECT().Save(mock.Anything, mock.Anything, mock.Anything).Return(errors.New("failed")).Maybe()
metakv.EXPECT().MultiSave(mock.Anything, mock.Anything).Return(errors.New("failed")).Maybe()
metakv.EXPECT().LoadWithPrefix(mock.Anything, mock.Anything).Return(nil, nil, nil).Maybe()

m := newSegmentIndexMeta(&datacoord.Catalog{MetaKv: metakv})
t.Run("no indexes", func(t *testing.T) {
Expand Down Expand Up @@ -621,9 +621,9 @@ func TestMeta_GetSegmentIndexState(t *testing.T) {
}
)
metakv := mockkv.NewMetaKv(t)
metakv.EXPECT().Save(mock.Anything, mock.Anything).Return(errors.New("failed")).Maybe()
metakv.EXPECT().MultiSave(mock.Anything).Return(errors.New("failed")).Maybe()
metakv.EXPECT().LoadWithPrefix(mock.Anything).Return(nil, nil, nil).Maybe()
metakv.EXPECT().Save(mock.Anything, mock.Anything, mock.Anything).Return(errors.New("failed")).Maybe()
metakv.EXPECT().MultiSave(mock.Anything, mock.Anything).Return(errors.New("failed")).Maybe()
metakv.EXPECT().LoadWithPrefix(mock.Anything, mock.Anything).Return(nil, nil, nil).Maybe()

m := newSegmentIndexMeta(&datacoord.Catalog{MetaKv: metakv})
m.segmentIndexes = map[UniqueID]map[UniqueID]*model.SegmentIndex{
Expand Down Expand Up @@ -1280,8 +1280,8 @@ func TestMeta_FinishTask(t *testing.T) {

t.Run("fail", func(t *testing.T) {
metakv := mockkv.NewMetaKv(t)
metakv.EXPECT().Save(mock.Anything, mock.Anything).Return(errors.New("failed")).Maybe()
metakv.EXPECT().MultiSave(mock.Anything).Return(errors.New("failed")).Maybe()
metakv.EXPECT().Save(mock.Anything, mock.Anything, mock.Anything).Return(errors.New("failed")).Maybe()
metakv.EXPECT().MultiSave(mock.Anything, mock.Anything).Return(errors.New("failed")).Maybe()
m.catalog = &datacoord.Catalog{
MetaKv: metakv,
}
Expand Down
4 changes: 2 additions & 2 deletions internal/datacoord/index_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,8 @@ func TestServer_CreateIndex(t *testing.T) {

t.Run("save index fail", func(t *testing.T) {
metakv := mockkv.NewMetaKv(t)
metakv.EXPECT().Save(mock.Anything, mock.Anything).Return(errors.New("failed")).Maybe()
metakv.EXPECT().MultiSave(mock.Anything).Return(errors.New("failed")).Maybe()
metakv.EXPECT().Save(mock.Anything, mock.Anything, mock.Anything).Return(errors.New("failed")).Maybe()
metakv.EXPECT().MultiSave(mock.Anything, mock.Anything).Return(errors.New("failed")).Maybe()
s.meta.indexMeta.indexes = map[UniqueID]map[UniqueID]*model.Index{}
s.meta.catalog = &datacoord.Catalog{MetaKv: metakv}
s.meta.indexMeta.catalog = s.meta.catalog
Expand Down
Loading

0 comments on commit 0c861ca

Please sign in to comment.