refine exists log print with ctx
Signed-off-by: tinswzy <[email protected]>
tinswzy committed Nov 28, 2024
1 parent 84698c0 commit 57e8238
Showing 58 changed files with 500 additions and 492 deletions.
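The change is largely mechanical: calls on the package-level logger (log.Info, log.Warn, log.Debug, ...) become log.Ctx(ctx).Info(...) and so on, so each entry picks up whatever request-scoped metadata (such as a trace ID) is attached to the context.Context. A minimal before/after sketch of the pattern, assuming the behaviour of the Milvus pkg/log helpers; the import path, function names and fields here are illustrative, not taken from the diff:

// Sketch of the substitution applied throughout this commit. It assumes
// log.Ctx(ctx) returns a logger enriched with fields carried by ctx,
// such as a trace ID; the functions and fields below are made up.
package example

import (
    "context"

    "github.com/milvus-io/milvus/pkg/log"
    "go.uber.org/zap"
)

// Before: the package-level logger knows nothing about the request, so
// the entry cannot be correlated with the RPC that produced it.
func addChannelBefore(ctx context.Context, channel string) {
    log.Info("Add channel", zap.String("channel", channel))
}

// After: deriving the logger from ctx lets trace metadata attached to
// the incoming request show up in the same log entry.
func addChannelAfter(ctx context.Context, channel string) {
    log.Ctx(ctx).Info("Add channel", zap.String("channel", channel))
}

When the context carries no extra metadata, the output should match the old package-level call, which is what makes the substitution safe to apply file by file.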
24 changes: 13 additions & 11 deletions internal/datacoord/channel_manager.go
@@ -166,15 +166,15 @@ func (m *ChannelManagerImpl) Startup(ctx context.Context, legacyNodes, allNodes
}

if m.balanceCheckLoop != nil {
log.Info("starting channel balance loop")
log.Ctx(ctx).Info("starting channel balance loop")
m.wg.Add(1)
go func() {
defer m.wg.Done()
m.balanceCheckLoop(ctx)
}()
}

log.Info("cluster start up",
log.Ctx(ctx).Info("cluster start up",
zap.Int64s("allNodes", allNodes),
zap.Int64s("legacyNodes", legacyNodes),
zap.Int64s("oldNodes", oNodes),
@@ -242,6 +242,7 @@ func (m *ChannelManagerImpl) Watch(ctx context.Context, ch RWChannel) error {

log.Info("Add channel")
updates := NewChannelOpSet(NewChannelOp(bufferID, Watch, ch))
// TODO fill in traceID to channelOp's watchInfo
err := m.execute(updates)
if err != nil {
log.Warn("fail to update new channel updates into meta",
@@ -255,6 +256,7 @@ func (m *ChannelManagerImpl) Watch(ctx context.Context, ch RWChannel) error {
return nil
}

// TODO fill in traceID to channelOp's watchInfo
if err := m.execute(updates); err != nil {
log.Warn("fail to assign channel, will retry later", zap.Array("updates", updates), zap.Error(err))
return nil
Expand Down Expand Up @@ -489,7 +491,7 @@ func (m *ChannelManagerImpl) finishRemoveChannel(nodeID int64, channels ...RWCha
}
}

func (m *ChannelManagerImpl) advanceStandbys(_ context.Context, standbys []*NodeChannelInfo) bool {
func (m *ChannelManagerImpl) advanceStandbys(ctx context.Context, standbys []*NodeChannelInfo) bool {
var advanced bool = false
for _, nodeAssign := range standbys {
validChannels := make(map[string]RWChannel)
@@ -516,15 +518,15 @@ func (m *ChannelManagerImpl) advanceStandbys(_ context.Context, standbys []*Node

chNames := lo.Keys(validChannels)
if err := m.reassign(nodeAssign); err != nil {
log.Warn("Reassign channels fail",
log.Ctx(ctx).Warn("Reassign channels fail",
zap.Int64("nodeID", nodeAssign.NodeID),
zap.Strings("channels", chNames),
zap.Error(err),
)
continue
}

log.Info("Reassign standby channels to node",
log.Ctx(ctx).Info("Reassign standby channels to node",
zap.Int64("nodeID", nodeAssign.NodeID),
zap.Strings("channels", chNames),
)
@@ -550,7 +552,7 @@ func (m *ChannelManagerImpl) advanceToNotifies(ctx context.Context, toNotifies [
)

chNames := lo.Keys(nodeAssign.Channels)
log.Info("Notify channel operations to datanode",
log.Ctx(ctx).Info("Notify channel operations to datanode",
zap.Int64("assignment", nodeAssign.NodeID),
zap.Int("total operation count", len(nodeAssign.Channels)),
zap.Strings("channel names", chNames),
@@ -577,7 +579,7 @@ }
}
}

log.Info("Finish to notify channel operations to datanode",
log.Ctx(ctx).Info("Finish to notify channel operations to datanode",
zap.Int64("assignment", nodeAssign.NodeID),
zap.Int("operation count", channelCount),
zap.Int("success count", len(succeededChannels)),
@@ -608,7 +610,7 @@ func (m *ChannelManagerImpl) advanceToChecks(ctx context.Context, toChecks []*No
futures := make([]*conc.Future[any], 0, len(nodeAssign.Channels))

chNames := lo.Keys(nodeAssign.Channels)
log.Info("Check ToWatch/ToRelease channel operations progress",
log.Ctx(ctx).Info("Check ToWatch/ToRelease channel operations progress",
zap.Int("channel count", len(nodeAssign.Channels)),
zap.Strings("channel names", chNames),
)
@@ -641,7 +643,7 @@ func (m *ChannelManagerImpl) advanceToChecks(ctx context.Context, toChecks []*No
}
}

log.Info("Finish to Check ToWatch/ToRelease channel operations progress",
log.Ctx(ctx).Info("Finish to Check ToWatch/ToRelease channel operations progress",
zap.Int("channel count", len(nodeAssign.Channels)),
zap.Strings("channel names", chNames),
)
@@ -650,7 +652,7 @@ func (m *ChannelManagerImpl) advanceToChecks(ctx context.Context, toChecks []*No
}

func (m *ChannelManagerImpl) Notify(ctx context.Context, nodeID int64, info *datapb.ChannelWatchInfo) error {
log := log.With(
log := log.Ctx(ctx).With(
zap.String("channel", info.GetVchan().GetChannelName()),
zap.Int64("assignment", nodeID),
zap.String("operation", info.GetState().String()),
@@ -666,7 +668,7 @@ func (m *ChannelManagerImpl) Notify(ctx context.Context, nodeID int64, info *dat
}

func (m *ChannelManagerImpl) Check(ctx context.Context, nodeID int64, info *datapb.ChannelWatchInfo) (successful bool, got bool) {
log := log.With(
log := log.Ctx(ctx).With(
zap.Int64("opID", info.GetOpID()),
zap.Int64("nodeID", nodeID),
zap.String("check operation", info.GetState().String()),
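Beyond the one-line substitutions, channel_manager.go shows two related shapes: Notify and Check now build a request-scoped logger once with log.Ctx(ctx).With(...) and reuse it for every message in the handler, and advanceStandbys stops discarding its context.Context parameter so the reassignment logs can be tied to the calling operation. A reduced sketch of the scoped-logger shape, with hypothetical helper and field names:

// Sketch only: a stand-in for the Notify/Check pattern above. It assumes
// log.Ctx(ctx).With returns a logger carrying both the bound fields and
// the trace metadata from ctx; notifyChannelOp and sendNotify are made up.
package example

import (
    "context"
    "errors"

    "github.com/milvus-io/milvus/pkg/log"
    "go.uber.org/zap"
)

func notifyChannelOp(ctx context.Context, nodeID int64, channel string) error {
    // Bind the fields shared by every log line in this handler once;
    // the local variable deliberately shadows the package-level logger.
    log := log.Ctx(ctx).With(
        zap.Int64("assignment", nodeID),
        zap.String("channel", channel),
    )

    log.Info("Notifying channel operation")
    if err := sendNotify(ctx, nodeID, channel); err != nil {
        log.Warn("failed to notify channel operation", zap.Error(err))
        return err
    }
    log.Info("Notified channel operation")
    return nil
}

// sendNotify is a placeholder for the DataNode RPC made by the real code.
func sendNotify(ctx context.Context, nodeID int64, channel string) error {
    return errors.New("not implemented")
}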
24 changes: 12 additions & 12 deletions internal/datacoord/index_meta.go
@@ -375,20 +375,20 @@ func (m *indexMeta) HasSameReq(req *indexpb.CreateIndexRequest) (bool, UniqueID)
}

func (m *indexMeta) CreateIndex(ctx context.Context, index *model.Index) error {
log.Info("meta update: CreateIndex", zap.Int64("collectionID", index.CollectionID),
log.Ctx(ctx).Info("meta update: CreateIndex", zap.Int64("collectionID", index.CollectionID),
zap.Int64("fieldID", index.FieldID), zap.Int64("indexID", index.IndexID), zap.String("indexName", index.IndexName))
m.Lock()
defer m.Unlock()

if err := m.catalog.CreateIndex(ctx, index); err != nil {
log.Error("meta update: CreateIndex save meta fail", zap.Int64("collectionID", index.CollectionID),
log.Ctx(ctx).Error("meta update: CreateIndex save meta fail", zap.Int64("collectionID", index.CollectionID),
zap.Int64("fieldID", index.FieldID), zap.Int64("indexID", index.IndexID),
zap.String("indexName", index.IndexName), zap.Error(err))
return err
}

m.updateCollectionIndex(index)
log.Info("meta update: CreateIndex success", zap.Int64("collectionID", index.CollectionID),
log.Ctx(ctx).Info("meta update: CreateIndex success", zap.Int64("collectionID", index.CollectionID),
zap.Int64("fieldID", index.FieldID), zap.Int64("indexID", index.IndexID), zap.String("indexName", index.IndexName))
return nil
}
@@ -415,19 +415,19 @@ func (m *indexMeta) AddSegmentIndex(ctx context.Context, segIndex *model.Segment
defer m.Unlock()

buildID := segIndex.BuildID
log.Info("meta update: adding segment index", zap.Int64("collectionID", segIndex.CollectionID),
log.Ctx(ctx).Info("meta update: adding segment index", zap.Int64("collectionID", segIndex.CollectionID),
zap.Int64("segmentID", segIndex.SegmentID), zap.Int64("indexID", segIndex.IndexID),
zap.Int64("buildID", buildID))

segIndex.IndexState = commonpb.IndexState_Unissued
if err := m.catalog.CreateSegmentIndex(ctx, segIndex); err != nil {
log.Warn("meta update: adding segment index failed",
log.Ctx(ctx).Warn("meta update: adding segment index failed",
zap.Int64("segmentID", segIndex.SegmentID), zap.Int64("indexID", segIndex.IndexID),
zap.Int64("buildID", segIndex.BuildID), zap.Error(err))
return err
}
m.updateSegmentIndex(segIndex)
log.Info("meta update: adding segment index success", zap.Int64("collectionID", segIndex.CollectionID),
log.Ctx(ctx).Info("meta update: adding segment index success", zap.Int64("collectionID", segIndex.CollectionID),
zap.Int64("segmentID", segIndex.SegmentID), zap.Int64("indexID", segIndex.IndexID),
zap.Int64("buildID", buildID))
m.updateIndexTasksMetrics()
@@ -562,7 +562,7 @@ func (m *indexMeta) GetFieldIndexes(collID, fieldID UniqueID, indexName string)

// MarkIndexAsDeleted will mark the corresponding index as deleted, and recycleUnusedIndexFiles will recycle these tasks.
func (m *indexMeta) MarkIndexAsDeleted(ctx context.Context, collID UniqueID, indexIDs []UniqueID) error {
log.Info("IndexCoord metaTable MarkIndexAsDeleted", zap.Int64("collectionID", collID),
log.Ctx(ctx).Info("IndexCoord metaTable MarkIndexAsDeleted", zap.Int64("collectionID", collID),
zap.Int64s("indexIDs", indexIDs))

m.Lock()
@@ -587,14 +587,14 @@ func (m *indexMeta) MarkIndexAsDeleted(ctx context.Context, collID UniqueID, ind
}
err := m.catalog.AlterIndexes(ctx, indexes)
if err != nil {
log.Error("failed to alter index meta in meta store", zap.Int("indexes num", len(indexes)), zap.Error(err))
log.Ctx(ctx).Error("failed to alter index meta in meta store", zap.Int("indexes num", len(indexes)), zap.Error(err))
return err
}
for _, index := range indexes {
m.indexes[index.CollectionID][index.IndexID] = index
}

log.Info("IndexCoord metaTable MarkIndexAsDeleted success", zap.Int64("collectionID", collID), zap.Int64s("indexIDs", indexIDs))
log.Ctx(ctx).Info("IndexCoord metaTable MarkIndexAsDeleted success", zap.Int64("collectionID", collID), zap.Int64s("indexIDs", indexIDs))
return nil
}

@@ -930,10 +930,10 @@ func (m *indexMeta) GetDeletedIndexes() []*model.Index {
func (m *indexMeta) RemoveIndex(ctx context.Context, collID, indexID UniqueID) error {
m.Lock()
defer m.Unlock()
log.Info("IndexCoord meta table remove index", zap.Int64("collectionID", collID), zap.Int64("indexID", indexID))
log.Ctx(ctx).Info("IndexCoord meta table remove index", zap.Int64("collectionID", collID), zap.Int64("indexID", indexID))
err := m.catalog.DropIndex(ctx, collID, indexID)
if err != nil {
log.Info("IndexCoord meta table remove index fail", zap.Int64("collectionID", collID),
log.Ctx(ctx).Info("IndexCoord meta table remove index fail", zap.Int64("collectionID", collID),
zap.Int64("indexID", indexID), zap.Error(err))
return err
}
@@ -946,7 +946,7 @@ func (m *indexMeta) RemoveIndex(ctx context.Context, collID, indexID UniqueID) e
metrics.IndexTaskNum.Delete(prometheus.Labels{"collection_id": strconv.FormatInt(collID, 10), "index_task_status": metrics.FinishedIndexTaskLabel})
metrics.IndexTaskNum.Delete(prometheus.Labels{"collection_id": strconv.FormatInt(collID, 10), "index_task_status": metrics.FailedIndexTaskLabel})
}
log.Info("IndexCoord meta table remove index success", zap.Int64("collectionID", collID), zap.Int64("indexID", indexID))
log.Ctx(ctx).Info("IndexCoord meta table remove index success", zap.Int64("collectionID", collID), zap.Int64("indexID", indexID))
return nil
}

10 changes: 5 additions & 5 deletions internal/datacoord/services.go
@@ -690,7 +690,7 @@ func (s *Server) GetStateCode() commonpb.StateCode {
// GetComponentStates returns DataCoord's current state
func (s *Server) GetComponentStates(ctx context.Context, req *milvuspb.GetComponentStatesRequest) (*milvuspb.ComponentStates, error) {
code := s.GetStateCode()
log.Debug("DataCoord current state", zap.String("StateCode", code.String()))
log.Ctx(ctx).Debug("DataCoord current state", zap.String("StateCode", code.String()))
nodeID := common.NotRegisteredID
if s.session != nil && s.session.Registered() {
nodeID = s.session.GetServerID() // or Params.NodeID
@@ -1528,12 +1528,12 @@ func (s *Server) handleDataNodeTtMsg(ctx context.Context, ttMsg *msgpb.DataNodeT
// An error status will be returned and error will be logged, if we failed to mark *all* segments.
// Deprecated, do not use it
func (s *Server) MarkSegmentsDropped(ctx context.Context, req *datapb.MarkSegmentsDroppedRequest) (*commonpb.Status, error) {
log.Info("marking segments dropped", zap.Int64s("segments", req.GetSegmentIds()))
log.Ctx(ctx).Info("marking segments dropped", zap.Int64s("segments", req.GetSegmentIds()))
var err error
for _, segID := range req.GetSegmentIds() {
if err = s.meta.SetState(ctx, segID, commonpb.SegmentState_Dropped); err != nil {
// Fail-open.
log.Error("failed to set segment state as dropped", zap.Int64("segmentID", segID))
log.Ctx(ctx).Error("failed to set segment state as dropped", zap.Int64("segmentID", segID))
break
}
}
@@ -1664,7 +1664,7 @@ func (s *Server) ImportV2(ctx context.Context, in *internalpb.ImportRequestInter
Status: merr.Success(),
}

log := log.With(zap.Int64("collection", in.GetCollectionID()),
log := log.Ctx(ctx).With(zap.Int64("collection", in.GetCollectionID()),
zap.Int64s("partitions", in.GetPartitionIDs()),
zap.Strings("channels", in.GetChannelNames()))
log.Info("receive import request", zap.Any("files", in.GetFiles()), zap.Any("options", in.GetOptions()))
@@ -1749,7 +1749,7 @@ func (s *Server) ImportV2(ctx context.Context, in *internalpb.ImportRequestInter
}

func (s *Server) GetImportProgress(ctx context.Context, in *internalpb.GetImportProgressRequest) (*internalpb.GetImportProgressResponse, error) {
log := log.With(zap.String("jobID", in.GetJobID()))
log := log.Ctx(ctx).With(zap.String("jobID", in.GetJobID()))
if err := merr.CheckHealthy(s.GetStateCode()); err != nil {
return &internalpb.GetImportProgressResponse{
Status: merr.Status(err),
16 changes: 8 additions & 8 deletions internal/datanode/services.go
@@ -51,7 +51,7 @@ import (

// WatchDmChannels is not in use
func (node *DataNode) WatchDmChannels(ctx context.Context, in *datapb.WatchDmChannelsRequest) (*commonpb.Status, error) {
log.Warn("DataNode WatchDmChannels is not in use")
log.Ctx(ctx).Warn("DataNode WatchDmChannels is not in use")

// TODO ERROR OF GRPC NOT IN USE
return merr.Success(), nil
@@ -61,7 +61,7 @@ func (node *DataNode) GetComponentStates(ctx context.Context, req *milvuspb.GetC
func (node *DataNode) GetComponentStates(ctx context.Context, req *milvuspb.GetComponentStatesRequest) (*milvuspb.ComponentStates, error) {
nodeID := common.NotRegisteredID
state := node.stateCode.Load().(commonpb.StateCode)
log.Debug("DataNode current state", zap.String("State", state.String()))
log.Ctx(ctx).Debug("DataNode current state", zap.String("State", state.String()))
if node.GetSession() != nil && node.session.Registered() {
nodeID = node.GetSession().ServerID
}
@@ -133,9 +133,9 @@ func (node *DataNode) GetStatisticsChannel(ctx context.Context, req *internalpb.

// ShowConfigurations returns the configurations of DataNode matching req.Pattern
func (node *DataNode) ShowConfigurations(ctx context.Context, req *internalpb.ShowConfigurationsRequest) (*internalpb.ShowConfigurationsResponse, error) {
log.Debug("DataNode.ShowConfigurations", zap.String("pattern", req.Pattern))
log.Ctx(ctx).Debug("DataNode.ShowConfigurations", zap.String("pattern", req.Pattern))
if err := merr.CheckHealthy(node.GetStateCode()); err != nil {
log.Warn("DataNode.ShowConfigurations failed", zap.Int64("nodeId", node.GetNodeID()), zap.Error(err))
log.Ctx(ctx).Warn("DataNode.ShowConfigurations failed", zap.Int64("nodeId", node.GetNodeID()), zap.Error(err))

return &internalpb.ShowConfigurationsResponse{
Status: merr.Status(err),
@@ -160,7 +160,7 @@ func (node *DataNode) ShowConfigurations(ctx context.Context, req *internalpb.Sh
// GetMetrics return datanode metrics
func (node *DataNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
if err := merr.CheckHealthy(node.GetStateCode()); err != nil {
log.Warn("DataNode.GetMetrics failed", zap.Int64("nodeId", node.GetNodeID()), zap.Error(err))
log.Ctx(ctx).Warn("DataNode.GetMetrics failed", zap.Int64("nodeId", node.GetNodeID()), zap.Error(err))

return &milvuspb.GetMetricsResponse{
Status: merr.Status(err),
@@ -252,7 +252,7 @@ func (node *DataNode) CompactionV2(ctx context.Context, req *datapb.CompactionPl
// return status of all compaction plans
func (node *DataNode) GetCompactionState(ctx context.Context, req *datapb.CompactionStateRequest) (*datapb.CompactionStateResponse, error) {
if err := merr.CheckHealthy(node.GetStateCode()); err != nil {
log.Warn("DataNode.GetCompactionState failed", zap.Int64("nodeId", node.GetNodeID()), zap.Error(err))
log.Ctx(ctx).Warn("DataNode.GetCompactionState failed", zap.Int64("nodeId", node.GetNodeID()), zap.Error(err))
return &datapb.CompactionStateResponse{
Status: merr.Status(err),
}, nil
@@ -351,9 +351,9 @@ func (node *DataNode) SyncSegments(ctx context.Context, req *datapb.SyncSegments
}

func (node *DataNode) NotifyChannelOperation(ctx context.Context, req *datapb.ChannelOperationsRequest) (*commonpb.Status, error) {
log.Ctx(ctx).Info("DataNode receives NotifyChannelOperation",
zap.Int("operation count", len(req.GetInfos())))
log := log.Ctx(ctx).With(zap.Int("operation count", len(req.GetInfos())))

log.Info("DataNode receives NotifyChannelOperation")
if node.channelManager == nil {
log.Warn("DataNode NotifyChannelOperation failed due to nil channelManager")
return merr.Status(merr.WrapErrServiceInternal("channelManager is nil! Ignore if you are upgrading datanode/coord to rpc based watch")), nil
4 changes: 2 additions & 2 deletions internal/distributed/datanode/client/client.go
@@ -56,7 +56,7 @@ func NewClient(ctx context.Context, addr string, serverID int64) (types.DataNode
sess := sessionutil.NewSession(ctx)
if sess == nil {
err := fmt.Errorf("new session error, maybe can not connect to etcd")
log.Debug("DataNodeClient New Etcd Session failed", zap.Error(err))
log.Ctx(ctx).Debug("DataNodeClient New Etcd Session failed", zap.Error(err))
return nil, err
}
config := &Params.DataNodeGrpcClientCfg
@@ -77,7 +77,7 @@ func NewClient(ctx context.Context, addr string, serverID int64) (types.DataNode
client.grpcClient.EnableEncryption()
cp, err := utils.CreateCertPoolforClient(Params.InternalTLSCfg.InternalTLSCaPemPath.GetValue(), "DataNode")
if err != nil {
log.Error("Failed to create cert pool for DataNode client")
log.Ctx(ctx).Error("Failed to create cert pool for DataNode client")
return nil, err
}
client.grpcClient.SetInternalTLSCertPool(cp)
4 changes: 2 additions & 2 deletions internal/distributed/querynode/client/client.go
@@ -56,7 +56,7 @@ func NewClient(ctx context.Context, addr string, nodeID int64) (types.QueryNodeC
sess := sessionutil.NewSession(ctx)
if sess == nil {
err := fmt.Errorf("new session error, maybe can not connect to etcd")
log.Debug("QueryNodeClient NewClient failed", zap.Error(err))
log.Ctx(ctx).Debug("QueryNodeClient NewClient failed", zap.Error(err))
return nil, err
}
config := &paramtable.Get().QueryNodeGrpcClientCfg
@@ -77,7 +77,7 @@ func NewClient(ctx context.Context, addr string, nodeID int64) (types.QueryNodeC
client.grpcClient.EnableEncryption()
cp, err := utils.CreateCertPoolforClient(Params.InternalTLSCfg.InternalTLSCaPemPath.GetValue(), "QueryNode")
if err != nil {
log.Error("Failed to create cert pool for QueryNode client")
log.Ctx(ctx).Error("Failed to create cert pool for QueryNode client")
return nil, err
}
client.grpcClient.SetInternalTLSCertPool(cp)
4 changes: 2 additions & 2 deletions internal/indexnode/task_scheduler.go
@@ -124,7 +124,7 @@ func (queue *IndexTaskQueue) PopActiveTask(tName string) task {
delete(queue.activeTasks, tName)
return t
}
log.Debug("IndexNode task was not found in the active task list", zap.String("TaskName", tName))
log.Ctx(queue.sched.ctx).Debug("IndexNode task was not found in the active task list", zap.String("TaskName", tName))
return nil
}

@@ -246,7 +246,7 @@ func (sched *TaskScheduler) processTask(t task, q TaskQueue) {
}

func (sched *TaskScheduler) indexBuildLoop() {
log.Debug("IndexNode TaskScheduler start build loop ...")
log.Ctx(sched.ctx).Debug("IndexNode TaskScheduler start build loop ...")
defer sched.wg.Done()
for {
select {
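Not every call site has a per-request context: the task_scheduler.go hunks above route logging through the scheduler's own stored context (sched.ctx, queue.sched.ctx) instead, so background loops still flow through log.Ctx. A reduced sketch of that shape, with a hypothetical scheduler type standing in for the real one:

// Sketch only: background loops have no caller context, so they log via
// the component's long-lived context. TaskScheduler here is a minimal,
// made-up stand-in for the IndexNode scheduler.
package example

import (
    "context"
    "sync"
    "time"

    "github.com/milvus-io/milvus/pkg/log"
)

type TaskScheduler struct {
    ctx context.Context
    wg  sync.WaitGroup
}

func (sched *TaskScheduler) indexBuildLoop() {
    // sched.ctx is created when the scheduler starts; using it keeps the
    // loop's log lines attributable to this component even without an RPC.
    log.Ctx(sched.ctx).Debug("IndexNode TaskScheduler start build loop ...")
    defer sched.wg.Done()

    ticker := time.NewTicker(time.Second)
    defer ticker.Stop()
    for {
        select {
        case <-sched.ctx.Done():
            log.Ctx(sched.ctx).Debug("IndexNode TaskScheduler build loop exit")
            return
        case <-ticker.C:
            // pick up and dispatch pending index-build tasks here (omitted).
        }
    }
}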
