Skip to content

Commit

Permalink
enhance: Add collection id to search request count metrics (milvus-io…
Browse files Browse the repository at this point in the history
  • Loading branch information
weiliu1031 authored Dec 3, 2024
1 parent c5c449f commit b29237e
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 17 deletions.
16 changes: 8 additions & 8 deletions internal/querynodev2/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,10 +193,10 @@ func (node *QueryNode) queryChannel(ctx context.Context, req *querypb.QueryReque
)

var err error
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.QueryLabel, metrics.TotalLabel, metrics.Leader).Inc()
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.QueryLabel, metrics.TotalLabel, metrics.Leader, fmt.Sprint(req.GetReq().GetCollectionID())).Inc()
defer func() {
if err != nil {
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.QueryLabel, metrics.FailLabel, metrics.Leader).Inc()
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.QueryLabel, metrics.FailLabel, metrics.Leader, fmt.Sprint(req.GetReq().GetCollectionID())).Inc()
}
}()

Expand Down Expand Up @@ -252,12 +252,12 @@ func (node *QueryNode) queryChannel(ctx context.Context, req *querypb.QueryReque

latency := tr.ElapseSpan()
metrics.QueryNodeSQReqLatency.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.QueryLabel, metrics.Leader).Observe(float64(latency.Milliseconds()))
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.QueryLabel, metrics.SuccessLabel, metrics.Leader).Inc()
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.QueryLabel, metrics.SuccessLabel, metrics.Leader, fmt.Sprint(req.GetReq().GetCollectionID())).Inc()
return resp, nil
}

func (node *QueryNode) queryChannelStream(ctx context.Context, req *querypb.QueryRequest, channel string, srv streamrpc.QueryStreamServer) error {
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.QueryLabel, metrics.TotalLabel, metrics.Leader).Inc()
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.QueryLabel, metrics.TotalLabel, metrics.Leader, fmt.Sprint(req.GetReq().GetCollectionID())).Inc()
msgID := req.Req.Base.GetMsgID()
log := log.Ctx(ctx).With(
zap.Int64("msgID", msgID),
Expand All @@ -269,7 +269,7 @@ func (node *QueryNode) queryChannelStream(ctx context.Context, req *querypb.Quer
var err error
defer func() {
if err != nil {
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.QueryLabel, metrics.FailLabel, metrics.Leader).Inc()
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.QueryLabel, metrics.FailLabel, metrics.Leader, fmt.Sprint(req.GetReq().GetCollectionID())).Inc()
}
}()

Expand Down Expand Up @@ -350,10 +350,10 @@ func (node *QueryNode) searchChannel(ctx context.Context, req *querypb.SearchReq
defer node.lifetime.Done()

var err error
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.SearchLabel, metrics.TotalLabel, metrics.Leader).Inc()
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.SearchLabel, metrics.TotalLabel, metrics.Leader, fmt.Sprint(req.GetReq().GetCollectionID())).Inc()
defer func() {
if err != nil {
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.SearchLabel, metrics.FailLabel, metrics.Leader).Inc()
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.SearchLabel, metrics.FailLabel, metrics.Leader, fmt.Sprint(req.GetReq().GetCollectionID())).Inc()
}
}()

Expand Down Expand Up @@ -404,7 +404,7 @@ func (node *QueryNode) searchChannel(ctx context.Context, req *querypb.SearchReq
// update metric to prometheus
latency := tr.ElapseSpan()
metrics.QueryNodeSQReqLatency.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.SearchLabel, metrics.Leader).Observe(float64(latency.Milliseconds()))
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.SearchLabel, metrics.SuccessLabel, metrics.Leader).Inc()
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.SearchLabel, metrics.SuccessLabel, metrics.Leader, fmt.Sprint(req.GetReq().GetCollectionID())).Inc()
metrics.QueryNodeSearchNQ.WithLabelValues(fmt.Sprint(node.GetNodeID())).Observe(float64(req.Req.GetNq()))
metrics.QueryNodeSearchTopK.WithLabelValues(fmt.Sprint(node.GetNodeID())).Observe(float64(req.Req.GetTopk()))
return resp, nil
Expand Down
18 changes: 9 additions & 9 deletions internal/querynodev2/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -674,10 +674,10 @@ func (node *QueryNode) SearchSegments(ctx context.Context, req *querypb.SearchRe
}
defer node.lifetime.Done()

metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.SearchLabel, metrics.TotalLabel, metrics.FromLeader).Inc()
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.SearchLabel, metrics.TotalLabel, metrics.FromLeader, fmt.Sprint(req.GetReq().GetCollectionID())).Inc()
defer func() {
if !merr.Ok(resp.GetStatus()) {
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.SearchLabel, metrics.FailLabel, metrics.FromLeader).Inc()
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.SearchLabel, metrics.FailLabel, metrics.FromLeader, fmt.Sprint(req.GetReq().GetCollectionID())).Inc()
}
}()

Expand Down Expand Up @@ -725,7 +725,7 @@ func (node *QueryNode) SearchSegments(ctx context.Context, req *querypb.SearchRe

latency := tr.ElapseSpan()
metrics.QueryNodeSQReqLatency.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.SearchLabel, metrics.FromLeader).Observe(float64(latency.Milliseconds()))
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.SearchLabel, metrics.SuccessLabel, metrics.FromLeader).Inc()
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.SearchLabel, metrics.SuccessLabel, metrics.FromLeader, fmt.Sprint(req.GetReq().GetCollectionID())).Inc()

resp = task.SearchResult()
resp.GetCostAggregation().ResponseTime = tr.ElapseSpan().Milliseconds()
Expand Down Expand Up @@ -852,10 +852,10 @@ func (node *QueryNode) QuerySegments(ctx context.Context, req *querypb.QueryRequ
}
defer node.lifetime.Done()

metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.QueryLabel, metrics.TotalLabel, metrics.FromLeader).Inc()
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.QueryLabel, metrics.TotalLabel, metrics.FromLeader, fmt.Sprint(req.GetReq().GetCollectionID())).Inc()
defer func() {
if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.QueryLabel, metrics.FailLabel, metrics.FromLeader).Inc()
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.QueryLabel, metrics.FailLabel, metrics.FromLeader, fmt.Sprint(req.GetReq().GetCollectionID())).Inc()
}
}()

Expand Down Expand Up @@ -894,7 +894,7 @@ func (node *QueryNode) QuerySegments(ctx context.Context, req *querypb.QueryRequ
// TODO QueryNodeSQLatencyInQueue QueryNodeReduceLatency
latency := tr.ElapseSpan()
metrics.QueryNodeSQReqLatency.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.QueryLabel, metrics.FromLeader).Observe(float64(latency.Milliseconds()))
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.QueryLabel, metrics.SuccessLabel, metrics.FromLeader).Inc()
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.QueryLabel, metrics.SuccessLabel, metrics.FromLeader, fmt.Sprint(req.GetReq().GetCollectionID())).Inc()
result := task.Result()
result.GetCostAggregation().ResponseTime = latency.Milliseconds()
result.GetCostAggregation().TotalNQ = node.scheduler.GetWaitingTaskTotalNQ()
Expand Down Expand Up @@ -1049,10 +1049,10 @@ func (node *QueryNode) QueryStreamSegments(req *querypb.QueryRequest, srv queryp
)

resp := &internalpb.RetrieveResults{}
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.QueryLabel, metrics.TotalLabel, metrics.FromLeader).Inc()
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.QueryLabel, metrics.TotalLabel, metrics.FromLeader, fmt.Sprint(req.GetReq().GetCollectionID())).Inc()
defer func() {
if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.QueryLabel, metrics.FailLabel, metrics.FromLeader).Inc()
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.QueryLabel, metrics.FailLabel, metrics.FromLeader, fmt.Sprint(req.GetReq().GetCollectionID())).Inc()
}
}()

Expand Down Expand Up @@ -1083,7 +1083,7 @@ func (node *QueryNode) QueryStreamSegments(req *querypb.QueryRequest, srv queryp
// TODO QueryNodeSQLatencyInQueue QueryNodeReduceLatency
latency := tr.ElapseSpan()
metrics.QueryNodeSQReqLatency.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.QueryLabel, metrics.FromLeader).Observe(float64(latency.Milliseconds()))
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.QueryLabel, metrics.SuccessLabel, metrics.FromLeader).Inc()
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.QueryLabel, metrics.SuccessLabel, metrics.FromLeader, fmt.Sprint(req.GetReq().GetCollectionID())).Inc()
return nil
}

Expand Down
56 changes: 56 additions & 0 deletions pkg/metrics/querynode_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ var (
queryTypeLabelName,
statusLabelName,
requestScope,
collectionIDLabelName,
})

QueryNodeSQReqLatency = prometheus.NewHistogramVec(
Expand Down Expand Up @@ -905,4 +906,59 @@ func CleanupQueryNodeCollectionMetrics(nodeID int64, collectionID int64) {
nodeIDLabelName: nodeIDLabel,
collectionIDLabelName: collectionIDLabel,
})
QueryNodeNumSegments.
DeletePartialMatch(
prometheus.Labels{
nodeIDLabelName: nodeIDLabel,
collectionIDLabelName: collectionIDLabel,
})

QueryNodeSQCount.
DeletePartialMatch(
prometheus.Labels{
nodeIDLabelName: nodeIDLabel,
collectionIDLabelName: collectionIDLabel,
})

QueryNodeSearchHitSegmentNum.
DeletePartialMatch(
prometheus.Labels{
nodeIDLabelName: nodeIDLabel,
collectionIDLabelName: collectionIDLabel,
})

QueryNodeSegmentPruneRatio.
DeletePartialMatch(
prometheus.Labels{
nodeIDLabelName: nodeIDLabel,
collectionIDLabelName: collectionIDLabel,
})

QueryNodeSegmentPruneBias.
DeletePartialMatch(
prometheus.Labels{
nodeIDLabelName: nodeIDLabel,
collectionIDLabelName: collectionIDLabel,
})

QueryNodeSegmentPruneLatency.
DeletePartialMatch(
prometheus.Labels{
nodeIDLabelName: nodeIDLabel,
collectionIDLabelName: collectionIDLabel,
})

QueryNodeEntitiesSize.
DeletePartialMatch(
prometheus.Labels{
nodeIDLabelName: nodeIDLabel,
collectionIDLabelName: collectionIDLabel,
})

QueryNodeLevelZeroSize.
DeletePartialMatch(
prometheus.Labels{
nodeIDLabelName: nodeIDLabel,
collectionIDLabelName: collectionIDLabel,
})
}

0 comments on commit b29237e

Please sign in to comment.