Skip to content

Commit

Permalink
refine exists log print with ctx
Browse files Browse the repository at this point in the history
Signed-off-by: tinswzy <[email protected]>
  • Loading branch information
tinswzy committed Dec 5, 2024
1 parent 6d0a4fd commit 5bd70fa
Show file tree
Hide file tree
Showing 157 changed files with 1,068 additions and 866 deletions.
4 changes: 2 additions & 2 deletions cmd/components/data_coord.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@ func (s *DataCoord) Prepare() error {
// Run starts service
func (s *DataCoord) Run() error {
if err := s.svr.Run(); err != nil {
log.Error("DataCoord starts error", zap.Error(err))
log.Ctx(s.ctx).Error("DataCoord starts error", zap.Error(err))
return err
}
log.Debug("DataCoord successfully started")
log.Ctx(s.ctx).Debug("DataCoord successfully started")
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions cmd/components/data_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,10 @@ func (d *DataNode) Prepare() error {
// Run starts service
func (d *DataNode) Run() error {
if err := d.svr.Run(); err != nil {
log.Error("DataNode starts error", zap.Error(err))
log.Ctx(d.ctx).Error("DataNode starts error", zap.Error(err))
return err
}
log.Debug("Datanode successfully started")
log.Ctx(d.ctx).Info("Datanode successfully started")
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions cmd/components/index_coord.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@ func (s *IndexCoord) Prepare() error {

// Run starts service
func (s *IndexCoord) Run() error {
log.Info("IndexCoord running ...")
log.Ctx(context.TODO()).Info("IndexCoord running ...")
return nil
}

// Stop terminates service
func (s *IndexCoord) Stop() error {
log.Info("IndexCoord stopping ...")
log.Ctx(context.TODO()).Info("IndexCoord stopping ...")
return nil
}

Expand Down
9 changes: 6 additions & 3 deletions cmd/components/index_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,16 @@ import (

// IndexNode implements IndexNode grpc server
type IndexNode struct {
ctx context.Context
svr *grpcindexnode.Server
}

// NewIndexNode creates a new IndexNode
func NewIndexNode(ctx context.Context, factory dependency.Factory) (*IndexNode, error) {
var err error
n := &IndexNode{}
n := &IndexNode{
ctx: ctx,
}
svr, err := grpcindexnode.NewServer(ctx, factory)
if err != nil {
return nil, err
Expand All @@ -55,10 +58,10 @@ func (n *IndexNode) Prepare() error {
// Run starts service
func (n *IndexNode) Run() error {
if err := n.svr.Run(); err != nil {
log.Error("IndexNode starts error", zap.Error(err))
log.Ctx(n.ctx).Error("IndexNode starts error", zap.Error(err))
return err
}
log.Debug("IndexNode successfully started")
log.Ctx(n.ctx).Info("IndexNode successfully started")
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions cmd/components/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@ func (n *Proxy) Prepare() error {
// Run starts service
func (n *Proxy) Run() error {
if err := n.svr.Run(); err != nil {
log.Error("Proxy starts error", zap.Error(err))
log.Ctx(context.TODO()).Error("Proxy starts error", zap.Error(err))
return err
}
log.Info("Proxy successfully started")
log.Ctx(context.TODO()).Info("Proxy successfully started")
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions cmd/components/query_coord.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,10 @@ func (qs *QueryCoord) Prepare() error {
// Run starts service
func (qs *QueryCoord) Run() error {
if err := qs.svr.Run(); err != nil {
log.Error("QueryCoord starts error", zap.Error(err))
log.Ctx(qs.ctx).Error("QueryCoord starts error", zap.Error(err))
return err
}
log.Debug("QueryCoord successfully started")
log.Ctx(qs.ctx).Info("QueryCoord successfully started")
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions cmd/components/query_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,10 @@ func (q *QueryNode) Prepare() error {
// Run starts service
func (q *QueryNode) Run() error {
if err := q.svr.Run(); err != nil {
log.Error("QueryNode starts error", zap.Error(err))
log.Ctx(q.ctx).Error("QueryNode starts error", zap.Error(err))
return err
}
log.Debug("QueryNode successfully started")
log.Ctx(q.ctx).Info("QueryNode successfully started")
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions cmd/components/root_coord.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,10 @@ func (rc *RootCoord) Prepare() error {
// Run starts service
func (rc *RootCoord) Run() error {
if err := rc.svr.Run(); err != nil {
log.Error("RootCoord starts error", zap.Error(err))
log.Ctx(rc.ctx).Error("RootCoord starts error", zap.Error(err))
return err
}
log.Info("RootCoord successfully started")
log.Ctx(rc.ctx).Info("RootCoord successfully started")
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions cmd/components/streaming_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ type StreamingNode struct {
}

// NewStreamingNode creates a new StreamingNode
func NewStreamingNode(_ context.Context, factory dependency.Factory) (*StreamingNode, error) {
svr, err := streamingnode.NewServer(factory)
func NewStreamingNode(ctx context.Context, factory dependency.Factory) (*StreamingNode, error) {
svr, err := streamingnode.NewServer(ctx, factory)
if err != nil {
return nil, err
}
Expand Down
12 changes: 9 additions & 3 deletions cmd/milvus/mck.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ func (c *mck) execute(args []string, flags *flag.FlagSet) {

func (c *mck) run() {
c.connectMinio()
log := log.Ctx(context.TODO())

_, values, err := c.metaKV.LoadWithPrefix(segmentPrefix)
if err != nil {
Expand Down Expand Up @@ -206,13 +207,14 @@ func (c *mck) formatFlags(args []string, flags *flag.FlagSet) {
if err := flags.Parse(os.Args[2:]); err != nil {
log.Fatal("failed to parse flags", zap.Error(err))
}
log.Info("args", zap.Strings("args", args))
log.Ctx(context.TODO()).Info("args", zap.Strings("args", args))
}

func (c *mck) connectEctd() {
c.params.Init(paramtable.NewBaseTable())
var etcdCli *clientv3.Client
var err error
log := log.Ctx(context.TODO())
if c.etcdIP != "" {
etcdCli, err = etcd.GetRemoteEtcdClient([]string{c.etcdIP})
} else {
Expand Down Expand Up @@ -243,7 +245,7 @@ func (c *mck) connectMinio() {
var err error
c.minioChunkManager, err = chunkManagerFactory.NewPersistentStorageChunkManager(context.Background())
if err != nil {
log.Fatal("failed to connect to minio", zap.Error(err))
log.Ctx(context.TODO()).Fatal("failed to connect to minio", zap.Error(err))
}
}

Expand All @@ -254,11 +256,12 @@ func getConfigValue(a string, b string, name string) string {
if b != "" {
return b
}
log.Panic(fmt.Sprintf("the config '%s' is empty", name))
log.Ctx(context.TODO()).Panic(fmt.Sprintf("the config '%s' is empty", name))
return ""
}

func (c *mck) cleanTrash() {
log := log.Ctx(context.TODO())
keys, _, err := c.metaKV.LoadWithPrefix(MckTrash)
if err != nil {
log.Error("failed to load backup info", zap.Error(err))
Expand Down Expand Up @@ -367,6 +370,7 @@ func getTrashKey(taskType, key string) string {
}

func (c *mck) extractTask(prefix string, keys []string, values []string) {
log := log.Ctx(context.TODO())
for i := range keys {
taskID, err := strconv.ParseInt(filepath.Base(keys[i]), 10, 64)
if err != nil {
Expand All @@ -393,6 +397,7 @@ func (c *mck) extractTask(prefix string, keys []string, values []string) {
}

func (c *mck) removeTask(invalidTask int64) bool {
log := log.Ctx(context.TODO())
taskType := c.taskNameMap[invalidTask]
key := c.taskKeyMap[invalidTask]
err := c.metaKV.Save(getTrashKey(taskType, key), c.allTaskInfo[key])
Expand Down Expand Up @@ -520,6 +525,7 @@ func (c *mck) extractVecFieldIndexInfo(taskID int64, infos []*querypb.FieldIndex

// return partitionIDs,segmentIDs,error
func (c *mck) unmarshalTask(taskID int64, t string) (string, []int64, []int64, error) {
log := log.Ctx(context.TODO())
header := commonpb.MsgHeader{}
err := proto.Unmarshal([]byte(t), &header)
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion cmd/milvus/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ func CleanSession(metaPath string, etcdEndpoints []string, sessionSuffix []strin
for _, key := range keys {
_, _ = etcdCli.Delete(ctx, key)
}
log.Info("clean sessions from etcd", zap.Any("keys", keys))
log.Ctx(ctx).Info("clean sessions from etcd", zap.Any("keys", keys))
return nil
}

Expand All @@ -259,6 +259,7 @@ func getSessionPaths(ctx context.Context, client *clientv3.Client, metaPath stri

// filterUnmatchedKey skip active keys that don't match completed key, the latest active key may from standby server
func addActiveKeySuffix(ctx context.Context, client *clientv3.Client, sessionPathPrefix string, sessionSuffix []string) []string {
log := log.Ctx(ctx)
suffixSet := lo.SliceToMap(sessionSuffix, func(t string) (string, struct{}) {
return t, struct{}{}
})
Expand Down
11 changes: 6 additions & 5 deletions cmd/tools/config/generate.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"encoding/csv"
"fmt"
"io"
Expand Down Expand Up @@ -59,7 +60,7 @@ func collectRecursive(params *paramtable.ComponentParam, data *[]DocContent, val
if val.Kind() != reflect.Struct {
return
}
log.Debug("enter", zap.Any("variable", val.String()))
log.Ctx(context.TODO()).Debug("enter", zap.Any("variable", val.String()))
for j := 0; j < val.NumField(); j++ {
subVal := val.Field(j)
tag := val.Type().Field(j).Tag
Expand All @@ -71,11 +72,11 @@ func collectRecursive(params *paramtable.ComponentParam, data *[]DocContent, val
if strings.HasPrefix(item.DefaultValue, "\"") && strings.HasSuffix(item.DefaultValue, "\"") {
defaultValue = fmt.Sprintf("\"%s\"", defaultValue)
}
log.Debug("got key", zap.String("key", item.Key), zap.Any("value", defaultValue), zap.String("variable", val.Type().Field(j).Name))
log.Ctx(context.TODO()).Debug("got key", zap.String("key", item.Key), zap.Any("value", defaultValue), zap.String("variable", val.Type().Field(j).Name))
*data = append(*data, DocContent{item.Key, defaultValue, item.Version, refreshable, item.Export, item.Doc})
} else if t == "paramtable.ParamGroup" {
item := subVal.Interface().(paramtable.ParamGroup)
log.Debug("got key", zap.String("key", item.KeyPrefix), zap.String("variable", val.Type().Field(j).Name))
log.Ctx(context.TODO()).Debug("got key", zap.String("key", item.KeyPrefix), zap.String("variable", val.Type().Field(j).Name))
refreshable := tag.Get("refreshable")

// Sort group items to stablize the output order
Expand All @@ -87,7 +88,7 @@ func collectRecursive(params *paramtable.ComponentParam, data *[]DocContent, val
sort.Strings(keys)
for _, key := range keys {
value := m[key]
log.Debug("got group entry", zap.String("key", key), zap.String("value", value))
log.Ctx(context.TODO()).Debug("got group entry", zap.String("key", key), zap.String("value", value))
*data = append(*data, DocContent{fmt.Sprintf("%s%s", item.KeyPrefix, key), quoteIfNeeded(value), item.Version, refreshable, item.Export, item.GetDoc(key)})
}
} else {
Expand Down Expand Up @@ -148,7 +149,7 @@ func (m *YamlMarshaller) writeYamlRecursive(data []DocContent, level int) {
for _, key := range keys {
contents, ok := topLevels.Get(key)
if !ok {
log.Debug("didnot found config for " + key)
log.Ctx(context.TODO()).Debug("didnot found config for " + key)
continue
}
content := contents[0]
Expand Down
2 changes: 1 addition & 1 deletion cmd/tools/migration/mmap/tool/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func prepareRootCoordMeta(ctx context.Context, allocator tso.Allocator) rootcoor
}
catalog = &kvmetestore.Catalog{Txn: metaKV, Snapshot: ss}
case util.MetaStoreTypeTiKV:
log.Info("Using tikv as meta storage.")
log.Ctx(ctx).Info("Using tikv as meta storage.")
var metaKV kv.MetaKv
var ss *kvmetestore.SuffixSnapshot
var err error
Expand Down
6 changes: 3 additions & 3 deletions internal/coordinator/coordclient/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func RegisterQueryCoordServer(server querypb.QueryCoordServer) {
}
newLocalClient := grpcclient.NewLocalGRPCClient(&querypb.QueryCoord_ServiceDesc, server, querypb.NewQueryCoordClient)
glocalClient.queryCoordClient.Set(&nopCloseQueryCoordClient{newLocalClient})
log.Info("register query coord server", zap.Any("enableLocalClient", enableLocal))
log.Ctx(context.TODO()).Info("register query coord server", zap.Any("enableLocalClient", enableLocal))
}

// RegsterDataCoordServer register data coord server
Expand All @@ -76,7 +76,7 @@ func RegisterDataCoordServer(server datapb.DataCoordServer) {
}
newLocalClient := grpcclient.NewLocalGRPCClient(&datapb.DataCoord_ServiceDesc, server, datapb.NewDataCoordClient)
glocalClient.dataCoordClient.Set(&nopCloseDataCoordClient{newLocalClient})
log.Info("register data coord server", zap.Any("enableLocalClient", enableLocal))
log.Ctx(context.TODO()).Info("register data coord server", zap.Any("enableLocalClient", enableLocal))
}

// RegisterRootCoordServer register root coord server
Expand All @@ -86,7 +86,7 @@ func RegisterRootCoordServer(server rootcoordpb.RootCoordServer) {
}
newLocalClient := grpcclient.NewLocalGRPCClient(&rootcoordpb.RootCoord_ServiceDesc, server, rootcoordpb.NewRootCoordClient)
glocalClient.rootCoordClient.Set(&nopCloseRootCoordClient{newLocalClient})
log.Info("register root coord server", zap.Any("enableLocalClient", enableLocal))
log.Ctx(context.TODO()).Info("register root coord server", zap.Any("enableLocalClient", enableLocal))
}

// GetQueryCoordClient return query coord client
Expand Down
Loading

0 comments on commit 5bd70fa

Please sign in to comment.