Skip to content

Commit

Permalink
refine interfaces for kv operation
Browse files Browse the repository at this point in the history
Signed-off-by: tinswzy <[email protected]>
  • Loading branch information
tinswzy committed Dec 3, 2024
1 parent e359725 commit cf0456f
Show file tree
Hide file tree
Showing 36 changed files with 2,044 additions and 1,930 deletions.
22 changes: 11 additions & 11 deletions cmd/milvus/mck.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (c *mck) execute(args []string, flags *flag.FlagSet) {
func (c *mck) run() {
c.connectMinio()

_, values, err := c.metaKV.LoadWithPrefix(segmentPrefix)
_, values, err := c.metaKV.LoadWithPrefix(context.TODO(), segmentPrefix)
if err != nil {
log.Fatal("failed to list the segment info", zap.String("key", segmentPrefix), zap.Error(err))
}
Expand All @@ -121,7 +121,7 @@ func (c *mck) run() {
c.segmentIDMap[info.ID] = info
}

_, values, err = c.metaKV.LoadWithPrefix(collectionPrefix)
_, values, err = c.metaKV.LoadWithPrefix(context.TODO(), collectionPrefix)
if err != nil {
log.Fatal("failed to list the collection info", zap.String("key", collectionPrefix), zap.Error(err))
}
Expand Down Expand Up @@ -150,13 +150,13 @@ func (c *mck) run() {
}
log.Info("partition ids", zap.Int64s("ids", ids))

keys, values, err := c.metaKV.LoadWithPrefix(triggerTaskPrefix)
keys, values, err := c.metaKV.LoadWithPrefix(context.TODO(), triggerTaskPrefix)
if err != nil {
log.Fatal("failed to list the trigger task info", zap.Error(err))
}
c.extractTask(triggerTaskPrefix, keys, values)

keys, values, err = c.metaKV.LoadWithPrefix(activeTaskPrefix)
keys, values, err = c.metaKV.LoadWithPrefix(context.TODO(), activeTaskPrefix)
if err != nil {
log.Fatal("failed to list the active task info", zap.Error(err))
}
Expand Down Expand Up @@ -259,7 +259,7 @@ func getConfigValue(a string, b string, name string) string {
}

func (c *mck) cleanTrash() {
keys, _, err := c.metaKV.LoadWithPrefix(MckTrash)
keys, _, err := c.metaKV.LoadWithPrefix(context.TODO(), MckTrash)
if err != nil {
log.Error("failed to load backup info", zap.Error(err))
return
Expand All @@ -273,7 +273,7 @@ func (c *mck) cleanTrash() {
deleteAll := ""
fmt.Scanln(&deleteAll)
if deleteAll == "Y" {
err = c.metaKV.RemoveWithPrefix(MckTrash)
err = c.metaKV.RemoveWithPrefix(context.TODO(), MckTrash)
if err != nil {
log.Error("failed to remove backup infos", zap.String("key", MckTrash), zap.Error(err))
return
Expand Down Expand Up @@ -395,31 +395,31 @@ func (c *mck) extractTask(prefix string, keys []string, values []string) {
func (c *mck) removeTask(invalidTask int64) bool {
taskType := c.taskNameMap[invalidTask]
key := c.taskKeyMap[invalidTask]
err := c.metaKV.Save(getTrashKey(taskType, key), c.allTaskInfo[key])
err := c.metaKV.Save(context.TODO(), getTrashKey(taskType, key), c.allTaskInfo[key])
if err != nil {
log.Warn("failed to backup task", zap.String("key", getTrashKey(taskType, key)), zap.Int64("task_id", invalidTask), zap.Error(err))
return false
}
fmt.Printf("Back up task successfully, back path: %s\n", getTrashKey(taskType, key))
err = c.metaKV.Remove(key)
err = c.metaKV.Remove(context.TODO(), key)
if err != nil {
log.Warn("failed to remove task", zap.Int64("task_id", invalidTask), zap.Error(err))
return false
}

key = fmt.Sprintf("%s/%d", taskInfoPrefix, invalidTask)
taskInfo, err := c.metaKV.Load(key)
taskInfo, err := c.metaKV.Load(context.TODO(), key)
if err != nil {
log.Warn("failed to load task info", zap.Int64("task_id", invalidTask), zap.Error(err))
return false
}
err = c.metaKV.Save(getTrashKey(taskType, key), taskInfo)
err = c.metaKV.Save(context.TODO(), getTrashKey(taskType, key), taskInfo)
if err != nil {
log.Warn("failed to backup task info", zap.Int64("task_id", invalidTask), zap.Error(err))
return false
}
fmt.Printf("Back up task info successfully, back path: %s\n", getTrashKey(taskType, key))
err = c.metaKV.Remove(key)
err = c.metaKV.Remove(context.TODO(), key)
if err != nil {
log.Warn("failed to remove task info", zap.Int64("task_id", invalidTask), zap.Error(err))
}
Expand Down
3 changes: 2 additions & 1 deletion internal/kv/etcd/embed_etcd_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package etcdkv_test

import (
"context"
"os"
"testing"

Expand Down Expand Up @@ -49,7 +50,7 @@ func TestEtcdConfigLoad(te *testing.T) {
require.NoError(t, err)

defer metaKv.Close()
defer metaKv.RemoveWithPrefix("")
defer metaKv.RemoveWithPrefix(context.TODO(), "")

kv := metaKv.(*embed_etcd_kv.EmbedEtcdKV)
assert.Equal(t, kv.GetConfig().SnapshotCount, uint64(1000))
Expand Down
56 changes: 28 additions & 28 deletions internal/kv/etcd/embed_etcd_kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (kv *EmbedEtcdKV) GetPath(key string) string {
return path.Join(kv.rootPath, key)
}

func (kv *EmbedEtcdKV) WalkWithPrefix(prefix string, paginationSize int, fn func([]byte, []byte) error) error {
func (kv *EmbedEtcdKV) WalkWithPrefix(ctx context.Context, prefix string, paginationSize int, fn func([]byte, []byte) error) error {
prefix = path.Join(kv.rootPath, prefix)
ctx, cancel := context.WithTimeout(context.TODO(), kv.requestTimeout)
defer cancel()
Expand Down Expand Up @@ -158,7 +158,7 @@ func (kv *EmbedEtcdKV) WalkWithPrefix(prefix string, paginationSize int, fn func
}

// LoadWithPrefix returns all the keys and values with the given key prefix
func (kv *EmbedEtcdKV) LoadWithPrefix(key string) ([]string, []string, error) {
func (kv *EmbedEtcdKV) LoadWithPrefix(ctx context.Context, key string) ([]string, []string, error) {
key = path.Join(kv.rootPath, key)
log.Debug("LoadWithPrefix ", zap.String("prefix", key))
ctx, cancel := context.WithTimeout(context.TODO(), kv.requestTimeout)
Expand All @@ -178,7 +178,7 @@ func (kv *EmbedEtcdKV) LoadWithPrefix(key string) ([]string, []string, error) {
return keys, values, nil
}

func (kv *EmbedEtcdKV) Has(key string) (bool, error) {
func (kv *EmbedEtcdKV) Has(ctx context.Context, key string) (bool, error) {
key = path.Join(kv.rootPath, key)
log.Debug("Has", zap.String("key", key))
ctx, cancel := context.WithTimeout(context.TODO(), kv.requestTimeout)
Expand All @@ -190,7 +190,7 @@ func (kv *EmbedEtcdKV) Has(key string) (bool, error) {
return resp.Count != 0, nil
}

func (kv *EmbedEtcdKV) HasPrefix(prefix string) (bool, error) {
func (kv *EmbedEtcdKV) HasPrefix(ctx context.Context, prefix string) (bool, error) {
prefix = path.Join(kv.rootPath, prefix)
log.Debug("HasPrefix", zap.String("prefix", prefix))

Expand All @@ -206,7 +206,7 @@ func (kv *EmbedEtcdKV) HasPrefix(prefix string) (bool, error) {
}

// LoadBytesWithPrefix returns all the keys and values with the given key prefix
func (kv *EmbedEtcdKV) LoadBytesWithPrefix(key string) ([]string, [][]byte, error) {
func (kv *EmbedEtcdKV) LoadBytesWithPrefix(ctx context.Context, key string) ([]string, [][]byte, error) {
key = path.Join(kv.rootPath, key)
log.Debug("LoadBytesWithPrefix ", zap.String("prefix", key))
ctx, cancel := context.WithTimeout(context.TODO(), kv.requestTimeout)
Expand All @@ -226,7 +226,7 @@ func (kv *EmbedEtcdKV) LoadBytesWithPrefix(key string) ([]string, [][]byte, erro
}

// LoadBytesWithPrefix2 returns all the keys and values with versions by the given key prefix
func (kv *EmbedEtcdKV) LoadBytesWithPrefix2(key string) ([]string, [][]byte, []int64, error) {
func (kv *EmbedEtcdKV) LoadBytesWithPrefix2(ctx context.Context, key string) ([]string, [][]byte, []int64, error) {
key = path.Join(kv.rootPath, key)
log.Debug("LoadBytesWithPrefix2 ", zap.String("prefix", key))
ctx, cancel := context.WithTimeout(context.TODO(), kv.requestTimeout)
Expand All @@ -248,7 +248,7 @@ func (kv *EmbedEtcdKV) LoadBytesWithPrefix2(key string) ([]string, [][]byte, []i
}

// Load returns value of the given key
func (kv *EmbedEtcdKV) Load(key string) (string, error) {
func (kv *EmbedEtcdKV) Load(ctx context.Context, key string) (string, error) {
key = path.Join(kv.rootPath, key)
ctx, cancel := context.WithTimeout(context.TODO(), kv.requestTimeout)
defer cancel()
Expand All @@ -264,7 +264,7 @@ func (kv *EmbedEtcdKV) Load(key string) (string, error) {
}

// LoadBytes returns value of the given key
func (kv *EmbedEtcdKV) LoadBytes(key string) ([]byte, error) {
func (kv *EmbedEtcdKV) LoadBytes(ctx context.Context, key string) ([]byte, error) {
key = path.Join(kv.rootPath, key)
ctx, cancel := context.WithTimeout(context.TODO(), kv.requestTimeout)
defer cancel()
Expand All @@ -280,7 +280,7 @@ func (kv *EmbedEtcdKV) LoadBytes(key string) ([]byte, error) {
}

// MultiLoad returns values of a set of keys
func (kv *EmbedEtcdKV) MultiLoad(keys []string) ([]string, error) {
func (kv *EmbedEtcdKV) MultiLoad(ctx context.Context, keys []string) ([]string, error) {
ops := make([]clientv3.Op, 0, len(keys))
for _, keyLoad := range keys {
ops = append(ops, clientv3.OpGet(path.Join(kv.rootPath, keyLoad)))
Expand Down Expand Up @@ -316,7 +316,7 @@ func (kv *EmbedEtcdKV) MultiLoad(keys []string) ([]string, error) {
}

// MultiLoadBytes returns values of a set of keys
func (kv *EmbedEtcdKV) MultiLoadBytes(keys []string) ([][]byte, error) {
func (kv *EmbedEtcdKV) MultiLoadBytes(ctx context.Context, keys []string) ([][]byte, error) {
ops := make([]clientv3.Op, 0, len(keys))
for _, keyLoad := range keys {
ops = append(ops, clientv3.OpGet(path.Join(kv.rootPath, keyLoad)))
Expand Down Expand Up @@ -352,7 +352,7 @@ func (kv *EmbedEtcdKV) MultiLoadBytes(keys []string) ([][]byte, error) {
}

// LoadBytesWithRevision returns keys, values and revision with given key prefix.
func (kv *EmbedEtcdKV) LoadBytesWithRevision(key string) ([]string, [][]byte, int64, error) {
func (kv *EmbedEtcdKV) LoadBytesWithRevision(ctx context.Context, key string) ([]string, [][]byte, int64, error) {
key = path.Join(kv.rootPath, key)
log.Debug("LoadBytesWithRevision ", zap.String("prefix", key))
ctx, cancel := context.WithTimeout(context.TODO(), kv.requestTimeout)
Expand All @@ -372,7 +372,7 @@ func (kv *EmbedEtcdKV) LoadBytesWithRevision(key string) ([]string, [][]byte, in
}

// Save saves the key-value pair.
func (kv *EmbedEtcdKV) Save(key, value string) error {
func (kv *EmbedEtcdKV) Save(ctx context.Context, key, value string) error {
key = path.Join(kv.rootPath, key)
ctx, cancel := context.WithTimeout(context.TODO(), kv.requestTimeout)
defer cancel()
Expand All @@ -381,7 +381,7 @@ func (kv *EmbedEtcdKV) Save(key, value string) error {
}

// SaveBytes saves the key-value pair.
func (kv *EmbedEtcdKV) SaveBytes(key string, value []byte) error {
func (kv *EmbedEtcdKV) SaveBytes(ctx context.Context, key string, value []byte) error {
key = path.Join(kv.rootPath, key)
ctx, cancel := context.WithTimeout(context.TODO(), kv.requestTimeout)
defer cancel()
Expand All @@ -390,7 +390,7 @@ func (kv *EmbedEtcdKV) SaveBytes(key string, value []byte) error {
}

// SaveBytesWithLease is a function to put value in etcd with etcd lease options.
func (kv *EmbedEtcdKV) SaveBytesWithLease(key string, value []byte, id clientv3.LeaseID) error {
func (kv *EmbedEtcdKV) SaveBytesWithLease(ctx context.Context, key string, value []byte, id clientv3.LeaseID) error {
key = path.Join(kv.rootPath, key)
ctx, cancel := context.WithTimeout(context.TODO(), kv.requestTimeout)
defer cancel()
Expand All @@ -399,7 +399,7 @@ func (kv *EmbedEtcdKV) SaveBytesWithLease(key string, value []byte, id clientv3.
}

// MultiSave saves the key-value pairs in a transaction.
func (kv *EmbedEtcdKV) MultiSave(kvs map[string]string) error {
func (kv *EmbedEtcdKV) MultiSave(ctx context.Context, kvs map[string]string) error {
ops := make([]clientv3.Op, 0, len(kvs))
for key, value := range kvs {
ops = append(ops, clientv3.OpPut(path.Join(kv.rootPath, key), value))
Expand All @@ -413,7 +413,7 @@ func (kv *EmbedEtcdKV) MultiSave(kvs map[string]string) error {
}

// MultiSaveBytes saves the key-value pairs in a transaction.
func (kv *EmbedEtcdKV) MultiSaveBytes(kvs map[string][]byte) error {
func (kv *EmbedEtcdKV) MultiSaveBytes(ctx context.Context, kvs map[string][]byte) error {
ops := make([]clientv3.Op, 0, len(kvs))
for key, value := range kvs {
ops = append(ops, clientv3.OpPut(path.Join(kv.rootPath, key), string(value)))
Expand All @@ -427,7 +427,7 @@ func (kv *EmbedEtcdKV) MultiSaveBytes(kvs map[string][]byte) error {
}

// RemoveWithPrefix removes the keys with given prefix.
func (kv *EmbedEtcdKV) RemoveWithPrefix(prefix string) error {
func (kv *EmbedEtcdKV) RemoveWithPrefix(ctx context.Context, prefix string) error {
key := path.Join(kv.rootPath, prefix)
ctx, cancel := context.WithTimeout(context.TODO(), kv.requestTimeout)
defer cancel()
Expand All @@ -437,7 +437,7 @@ func (kv *EmbedEtcdKV) RemoveWithPrefix(prefix string) error {
}

// Remove removes the key.
func (kv *EmbedEtcdKV) Remove(key string) error {
func (kv *EmbedEtcdKV) Remove(ctx context.Context, key string) error {
key = path.Join(kv.rootPath, key)
ctx, cancel := context.WithTimeout(context.TODO(), kv.requestTimeout)
defer cancel()
Expand All @@ -447,7 +447,7 @@ func (kv *EmbedEtcdKV) Remove(key string) error {
}

// MultiRemove removes the keys in a transaction.
func (kv *EmbedEtcdKV) MultiRemove(keys []string) error {
func (kv *EmbedEtcdKV) MultiRemove(ctx context.Context, keys []string) error {
ops := make([]clientv3.Op, 0, len(keys))
for _, key := range keys {
ops = append(ops, clientv3.OpDelete(path.Join(kv.rootPath, key)))
Expand All @@ -461,7 +461,7 @@ func (kv *EmbedEtcdKV) MultiRemove(keys []string) error {
}

// MultiSaveAndRemove saves the key-value pairs and removes the keys in a transaction.
func (kv *EmbedEtcdKV) MultiSaveAndRemove(saves map[string]string, removals []string, preds ...predicates.Predicate) error {
func (kv *EmbedEtcdKV) MultiSaveAndRemove(ctx context.Context, saves map[string]string, removals []string, preds ...predicates.Predicate) error {
cmps, err := parsePredicates(kv.rootPath, preds...)
if err != nil {
return err
Expand Down Expand Up @@ -491,7 +491,7 @@ func (kv *EmbedEtcdKV) MultiSaveAndRemove(saves map[string]string, removals []st
}

// MultiSaveBytesAndRemove saves the key-value pairs and removes the keys in a transaction.
func (kv *EmbedEtcdKV) MultiSaveBytesAndRemove(saves map[string][]byte, removals []string) error {
func (kv *EmbedEtcdKV) MultiSaveBytesAndRemove(ctx context.Context, saves map[string][]byte, removals []string) error {
ops := make([]clientv3.Op, 0, len(saves)+len(removals))
for key, value := range saves {
ops = append(ops, clientv3.OpPut(path.Join(kv.rootPath, key), string(value)))
Expand All @@ -508,26 +508,26 @@ func (kv *EmbedEtcdKV) MultiSaveBytesAndRemove(saves map[string][]byte, removals
return err
}

func (kv *EmbedEtcdKV) Watch(key string) clientv3.WatchChan {
func (kv *EmbedEtcdKV) Watch(ctx context.Context, key string) clientv3.WatchChan {
key = path.Join(kv.rootPath, key)
rch := kv.client.Watch(context.Background(), key, clientv3.WithCreatedNotify())
return rch
}

func (kv *EmbedEtcdKV) WatchWithPrefix(key string) clientv3.WatchChan {
func (kv *EmbedEtcdKV) WatchWithPrefix(ctx context.Context, key string) clientv3.WatchChan {
key = path.Join(kv.rootPath, key)
rch := kv.client.Watch(context.Background(), key, clientv3.WithPrefix(), clientv3.WithCreatedNotify())
return rch
}

func (kv *EmbedEtcdKV) WatchWithRevision(key string, revision int64) clientv3.WatchChan {
func (kv *EmbedEtcdKV) WatchWithRevision(ctx context.Context, key string, revision int64) clientv3.WatchChan {
key = path.Join(kv.rootPath, key)
rch := kv.client.Watch(context.Background(), key, clientv3.WithPrefix(), clientv3.WithPrevKV(), clientv3.WithRev(revision))
return rch
}

// MultiSaveAndRemoveWithPrefix saves kv in @saves and removes the keys with given prefix in @removals.
func (kv *EmbedEtcdKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, preds ...predicates.Predicate) error {
func (kv *EmbedEtcdKV) MultiSaveAndRemoveWithPrefix(ctx context.Context, saves map[string]string, removals []string, preds ...predicates.Predicate) error {
cmps, err := parsePredicates(kv.rootPath, preds...)
if err != nil {
return err
Expand Down Expand Up @@ -557,7 +557,7 @@ func (kv *EmbedEtcdKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, rem
}

// MultiSaveBytesAndRemoveWithPrefix saves kv in @saves and removes the keys with given prefix in @removals.
func (kv *EmbedEtcdKV) MultiSaveBytesAndRemoveWithPrefix(saves map[string][]byte, removals []string) error {
func (kv *EmbedEtcdKV) MultiSaveBytesAndRemoveWithPrefix(ctx context.Context, saves map[string][]byte, removals []string) error {
ops := make([]clientv3.Op, 0, len(saves)+len(removals))
for key, value := range saves {
ops = append(ops, clientv3.OpPut(path.Join(kv.rootPath, key), string(value)))
Expand All @@ -576,7 +576,7 @@ func (kv *EmbedEtcdKV) MultiSaveBytesAndRemoveWithPrefix(saves map[string][]byte

// CompareVersionAndSwap compares the existing key-value's version with version, and if
// they are equal, the target is stored in etcd.
func (kv *EmbedEtcdKV) CompareVersionAndSwap(key string, version int64, target string) (bool, error) {
func (kv *EmbedEtcdKV) CompareVersionAndSwap(ctx context.Context, key string, version int64, target string) (bool, error) {
ctx, cancel := context.WithTimeout(context.TODO(), kv.requestTimeout)
defer cancel()
resp, err := kv.client.Txn(ctx).If(
Expand All @@ -593,7 +593,7 @@ func (kv *EmbedEtcdKV) CompareVersionAndSwap(key string, version int64, target s

// CompareVersionAndSwapBytes compares the existing key-value's version with version, and if
// they are equal, the target is stored in etcd.
func (kv *EmbedEtcdKV) CompareVersionAndSwapBytes(key string, version int64, target []byte, opts ...clientv3.OpOption) (bool, error) {
func (kv *EmbedEtcdKV) CompareVersionAndSwapBytes(ctx context.Context, key string, version int64, target []byte, opts ...clientv3.OpOption) (bool, error) {
ctx, cancel := context.WithTimeout(context.TODO(), kv.requestTimeout)
defer cancel()
resp, err := kv.client.Txn(ctx).If(
Expand Down
Loading

0 comments on commit cf0456f

Please sign in to comment.