Skip to content

Commit

Permalink
refine interfaces for kv operation
Browse files Browse the repository at this point in the history
Signed-off-by: tinswzy <[email protected]>
  • Loading branch information
tinswzy committed Dec 4, 2024
1 parent e359725 commit b0042e9
Show file tree
Hide file tree
Showing 36 changed files with 2,108 additions and 1,993 deletions.
22 changes: 11 additions & 11 deletions cmd/milvus/mck.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (c *mck) execute(args []string, flags *flag.FlagSet) {
func (c *mck) run() {
c.connectMinio()

_, values, err := c.metaKV.LoadWithPrefix(segmentPrefix)
_, values, err := c.metaKV.LoadWithPrefix(context.TODO(), segmentPrefix)
if err != nil {
log.Fatal("failed to list the segment info", zap.String("key", segmentPrefix), zap.Error(err))
}
Expand All @@ -121,7 +121,7 @@ func (c *mck) run() {
c.segmentIDMap[info.ID] = info
}

_, values, err = c.metaKV.LoadWithPrefix(collectionPrefix)
_, values, err = c.metaKV.LoadWithPrefix(context.TODO(), collectionPrefix)
if err != nil {
log.Fatal("failed to list the collection info", zap.String("key", collectionPrefix), zap.Error(err))
}
Expand Down Expand Up @@ -150,13 +150,13 @@ func (c *mck) run() {
}
log.Info("partition ids", zap.Int64s("ids", ids))

keys, values, err := c.metaKV.LoadWithPrefix(triggerTaskPrefix)
keys, values, err := c.metaKV.LoadWithPrefix(context.TODO(), triggerTaskPrefix)
if err != nil {
log.Fatal("failed to list the trigger task info", zap.Error(err))
}
c.extractTask(triggerTaskPrefix, keys, values)

keys, values, err = c.metaKV.LoadWithPrefix(activeTaskPrefix)
keys, values, err = c.metaKV.LoadWithPrefix(context.TODO(), activeTaskPrefix)
if err != nil {
log.Fatal("failed to list the active task info", zap.Error(err))
}
Expand Down Expand Up @@ -259,7 +259,7 @@ func getConfigValue(a string, b string, name string) string {
}

func (c *mck) cleanTrash() {
keys, _, err := c.metaKV.LoadWithPrefix(MckTrash)
keys, _, err := c.metaKV.LoadWithPrefix(context.TODO(), MckTrash)
if err != nil {
log.Error("failed to load backup info", zap.Error(err))
return
Expand All @@ -273,7 +273,7 @@ func (c *mck) cleanTrash() {
deleteAll := ""
fmt.Scanln(&deleteAll)
if deleteAll == "Y" {
err = c.metaKV.RemoveWithPrefix(MckTrash)
err = c.metaKV.RemoveWithPrefix(context.TODO(), MckTrash)
if err != nil {
log.Error("failed to remove backup infos", zap.String("key", MckTrash), zap.Error(err))
return
Expand Down Expand Up @@ -395,31 +395,31 @@ func (c *mck) extractTask(prefix string, keys []string, values []string) {
func (c *mck) removeTask(invalidTask int64) bool {
taskType := c.taskNameMap[invalidTask]
key := c.taskKeyMap[invalidTask]
err := c.metaKV.Save(getTrashKey(taskType, key), c.allTaskInfo[key])
err := c.metaKV.Save(context.TODO(), getTrashKey(taskType, key), c.allTaskInfo[key])
if err != nil {
log.Warn("failed to backup task", zap.String("key", getTrashKey(taskType, key)), zap.Int64("task_id", invalidTask), zap.Error(err))
return false
}
fmt.Printf("Back up task successfully, back path: %s\n", getTrashKey(taskType, key))
err = c.metaKV.Remove(key)
err = c.metaKV.Remove(context.TODO(), key)
if err != nil {
log.Warn("failed to remove task", zap.Int64("task_id", invalidTask), zap.Error(err))
return false
}

key = fmt.Sprintf("%s/%d", taskInfoPrefix, invalidTask)
taskInfo, err := c.metaKV.Load(key)
taskInfo, err := c.metaKV.Load(context.TODO(), key)
if err != nil {
log.Warn("failed to load task info", zap.Int64("task_id", invalidTask), zap.Error(err))
return false
}
err = c.metaKV.Save(getTrashKey(taskType, key), taskInfo)
err = c.metaKV.Save(context.TODO(), getTrashKey(taskType, key), taskInfo)
if err != nil {
log.Warn("failed to backup task info", zap.Int64("task_id", invalidTask), zap.Error(err))
return false
}
fmt.Printf("Back up task info successfully, back path: %s\n", getTrashKey(taskType, key))
err = c.metaKV.Remove(key)
err = c.metaKV.Remove(context.TODO(), key)
if err != nil {
log.Warn("failed to remove task info", zap.Int64("task_id", invalidTask), zap.Error(err))
}
Expand Down
3 changes: 2 additions & 1 deletion internal/kv/etcd/embed_etcd_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package etcdkv_test

import (
"context"
"os"
"testing"

Expand Down Expand Up @@ -49,7 +50,7 @@ func TestEtcdConfigLoad(te *testing.T) {
require.NoError(t, err)

defer metaKv.Close()
defer metaKv.RemoveWithPrefix("")
defer metaKv.RemoveWithPrefix(context.TODO(), "")

kv := metaKv.(*embed_etcd_kv.EmbedEtcdKV)
assert.Equal(t, kv.GetConfig().SnapshotCount, uint64(1000))
Expand Down
Loading

0 comments on commit b0042e9

Please sign in to comment.