Skip to content

Commit

Permalink
enhance: the proxy metric in the query request (milvus-io#33307)
Browse files Browse the repository at this point in the history
/kind improvement
issue: milvus-io#33306

Signed-off-by: SimFG <[email protected]>
  • Loading branch information
SimFG authored May 24, 2024
1 parent e895cfe commit 2964f60
Show file tree
Hide file tree
Showing 3 changed files with 148 additions and 89 deletions.
197 changes: 108 additions & 89 deletions internal/proxy/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -3415,49 +3415,32 @@ func (node *Proxy) Flush(ctx context.Context, request *milvuspb.FlushRequest) (*
// Query get the records by primary keys.
func (node *Proxy) query(ctx context.Context, qt *queryTask) (*milvuspb.QueryResults, error) {
request := qt.request
receiveSize := proto.Size(request)
metrics.ProxyReceiveBytes.WithLabelValues(
strconv.FormatInt(paramtable.GetNodeID(), 10),
metrics.QueryLabel,
request.GetCollectionName(),
).Add(float64(receiveSize))

metrics.ProxyReceivedNQ.WithLabelValues(
strconv.FormatInt(paramtable.GetNodeID(), 10),
metrics.SearchLabel,
request.GetCollectionName(),
).Add(float64(1))

subLabel := GetCollectionRateSubLabel(request)
rateCol.Add(internalpb.RateType_DQLQuery.String(), 1, subLabel)
method := "Query"
isProxyRequest := GetRequestLabelFromContext(ctx)

if err := merr.CheckHealthy(node.GetStateCode()); err != nil {
return &milvuspb.QueryResults{
Status: merr.Status(err),
}, nil
}

ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-Query")
defer sp.End()
tr := timerecord.NewTimeRecorder("Query")

method := "Query"

metrics.ProxyFunctionCall.WithLabelValues(
strconv.FormatInt(paramtable.GetNodeID(), 10),
method,
metrics.TotalLabel,
request.GetDbName(),
request.GetCollectionName(),
).Inc()

log := log.Ctx(ctx).With(
zap.String("role", typeutil.ProxyRole),
zap.String("db", request.DbName),
zap.String("collection", request.CollectionName),
zap.Strings("partitions", request.PartitionNames),
)

log.Debug(
rpcReceived(method),
zap.String("expr", request.Expr),
zap.Strings("OutputFields", request.OutputFields),
zap.Uint64("travel_timestamp", request.TravelTimestamp),
zap.Uint64("guarantee_timestamp", request.GuaranteeTimestamp),
)

tr := timerecord.NewTimeRecorder(method)

defer func() {
span := tr.ElapseSpan()
if span >= paramtable.Get().ProxyCfg.SlowQuerySpanInSeconds.GetAsDuration(time.Second) {
Expand All @@ -3475,27 +3458,21 @@ func (node *Proxy) query(ctx context.Context, qt *queryTask) (*milvuspb.QueryRes
}
}()

log.Debug(
rpcReceived(method),
zap.String("expr", request.Expr),
zap.Strings("OutputFields", request.OutputFields),
zap.Uint64("travel_timestamp", request.TravelTimestamp),
zap.Uint64("guarantee_timestamp", request.GuaranteeTimestamp),
)

if err := node.sched.dqQueue.Enqueue(qt); err != nil {
log.Warn(
rpcFailedToEnqueue(method),
zap.Error(err),
)

metrics.ProxyFunctionCall.WithLabelValues(
strconv.FormatInt(paramtable.GetNodeID(), 10),
method,
metrics.AbandonLabel,
request.GetDbName(),
request.GetCollectionName(),
).Inc()
if isProxyRequest {
metrics.ProxyFunctionCall.WithLabelValues(
strconv.FormatInt(paramtable.GetNodeID(), 10),
method,
metrics.AbandonLabel,
request.GetDbName(),
request.GetCollectionName(),
).Inc()
}

return &milvuspb.QueryResults{
Status: merr.Status(err),
Expand All @@ -3510,45 +3487,36 @@ func (node *Proxy) query(ctx context.Context, qt *queryTask) (*milvuspb.QueryRes
rpcFailedToWaitToFinish(method),
zap.Error(err))

metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc()
if isProxyRequest {
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc()
}

return &milvuspb.QueryResults{
Status: merr.Status(err),
}, nil
}
span := tr.CtxRecord(ctx, "wait query result")
metrics.ProxyWaitForSearchResultLatency.WithLabelValues(
strconv.FormatInt(paramtable.GetNodeID(), 10),
metrics.QueryLabel,
).Observe(float64(span.Milliseconds()))

log.Debug(rpcDone(method))

metrics.ProxyFunctionCall.WithLabelValues(
strconv.FormatInt(paramtable.GetNodeID(), 10),
method,
metrics.SuccessLabel,
request.GetDbName(),
request.GetCollectionName(),
).Inc()

metrics.ProxySQLatency.WithLabelValues(
strconv.FormatInt(paramtable.GetNodeID(), 10),
metrics.QueryLabel,
request.GetDbName(),
request.GetCollectionName(),
).Observe(float64(tr.ElapseSpan().Milliseconds()))
if isProxyRequest {
span := tr.CtxRecord(ctx, "wait query result")
metrics.ProxyWaitForSearchResultLatency.WithLabelValues(
strconv.FormatInt(paramtable.GetNodeID(), 10),
metrics.QueryLabel,
).Observe(float64(span.Milliseconds()))

metrics.ProxyCollectionSQLatency.WithLabelValues(
strconv.FormatInt(paramtable.GetNodeID(), 10),
metrics.QueryLabel,
request.CollectionName,
).Observe(float64(tr.ElapseSpan().Milliseconds()))
metrics.ProxySQLatency.WithLabelValues(
strconv.FormatInt(paramtable.GetNodeID(), 10),
metrics.QueryLabel,
request.GetDbName(),
request.GetCollectionName(),
).Observe(float64(tr.ElapseSpan().Milliseconds()))

sentSize := proto.Size(qt.result)
rateCol.Add(metricsinfo.ReadResultThroughput, float64(sentSize), subLabel)
metrics.ProxyReadReqSendBytes.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Add(float64(sentSize))
metrics.ProxyCollectionSQLatency.WithLabelValues(
strconv.FormatInt(paramtable.GetNodeID(), 10),
metrics.QueryLabel,
request.CollectionName,
).Observe(float64(tr.ElapseSpan().Milliseconds()))
}

return qt.result, nil
}
Expand All @@ -3570,22 +3538,73 @@ func (node *Proxy) Query(ctx context.Context, request *milvuspb.QueryRequest) (*
lb: node.lbPolicy,
mustUsePartitionKey: Params.ProxyCfg.MustUsePartitionKey.GetAsBool(),
}

subLabel := GetCollectionRateSubLabel(request)
receiveSize := proto.Size(request)
metrics.ProxyReceiveBytes.WithLabelValues(
strconv.FormatInt(paramtable.GetNodeID(), 10),
metrics.QueryLabel,
request.GetCollectionName(),
).Add(float64(receiveSize))
metrics.ProxyReceivedNQ.WithLabelValues(
strconv.FormatInt(paramtable.GetNodeID(), 10),
metrics.SearchLabel,
request.GetCollectionName(),
).Add(float64(1))

rateCol.Add(internalpb.RateType_DQLQuery.String(), 1, subLabel)

if err := merr.CheckHealthy(node.GetStateCode()); err != nil {
return &milvuspb.QueryResults{
Status: merr.Status(err),
}, nil
}

ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-Query")
defer sp.End()
method := "Query"

metrics.ProxyFunctionCall.WithLabelValues(
strconv.FormatInt(paramtable.GetNodeID(), 10),
method,
metrics.TotalLabel,
request.GetDbName(),
request.GetCollectionName(),
).Inc()

ctx = SetRequestLabelForContext(ctx)
res, err := node.query(ctx, qt)
if merr.Ok(res.Status) && err == nil {
username := GetCurUserFromContextOrDefault(ctx)
nodeID := paramtable.GetStringNodeID()
v := Extension.Report(map[string]any{
hookutil.OpTypeKey: hookutil.OpTypeQuery,
hookutil.DatabaseKey: request.DbName,
hookutil.UsernameKey: username,
hookutil.ResultDataSizeKey: proto.Size(res),
hookutil.RelatedDataSizeKey: qt.totalRelatedDataSize,
hookutil.RelatedCntKey: qt.allQueryCnt,
})
SetReportValue(res.Status, v)
metrics.ProxyReportValue.WithLabelValues(nodeID, hookutil.OpTypeQuery, request.DbName, username).Add(float64(v))
if err != nil || !merr.Ok(res.Status) {
return res, err
}
return res, err

log.Debug(rpcDone(method))

metrics.ProxyFunctionCall.WithLabelValues(
strconv.FormatInt(paramtable.GetNodeID(), 10),
method,
metrics.SuccessLabel,
request.GetDbName(),
request.GetCollectionName(),
).Inc()

sentSize := proto.Size(qt.result)
rateCol.Add(metricsinfo.ReadResultThroughput, float64(sentSize), subLabel)
metrics.ProxyReadReqSendBytes.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Add(float64(sentSize))

username := GetCurUserFromContextOrDefault(ctx)
nodeID := paramtable.GetStringNodeID()
v := Extension.Report(map[string]any{
hookutil.OpTypeKey: hookutil.OpTypeQuery,
hookutil.DatabaseKey: request.DbName,
hookutil.UsernameKey: username,
hookutil.ResultDataSizeKey: proto.Size(res),
hookutil.RelatedDataSizeKey: qt.totalRelatedDataSize,
hookutil.RelatedCntKey: qt.allQueryCnt,
})
SetReportValue(res.Status, v)
metrics.ProxyReportValue.WithLabelValues(nodeID, hookutil.OpTypeQuery, request.DbName, username).Add(float64(v))
return res, nil
}

// CreateAlias create alias for collection, then you can search the collection with alias.
Expand Down
19 changes: 19 additions & 0 deletions internal/proxy/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -1620,3 +1620,22 @@ func GetCostValue(status *commonpb.Status) int {
}
return value
}

type isProxyRequestKeyType struct{}

var ctxProxyRequestKey = isProxyRequestKeyType{}

func SetRequestLabelForContext(ctx context.Context) context.Context {
return context.WithValue(ctx, ctxProxyRequestKey, true)
}

func GetRequestLabelFromContext(ctx context.Context) bool {
if ctx == nil {
return false
}
v := ctx.Value(ctxProxyRequestKey)
if v == nil {
return false
}
return v.(bool)
}
21 changes: 21 additions & 0 deletions internal/proxy/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2294,3 +2294,24 @@ func TestGetCostValue(t *testing.T) {
assert.Equal(t, 100, cost)
})
}

func TestRequestLabelWithContext(t *testing.T) {
ctx := context.Background()

{
label := GetRequestLabelFromContext(ctx)
assert.False(t, label)
}

ctx = SetRequestLabelForContext(ctx)
{
label := GetRequestLabelFromContext(ctx)
assert.True(t, label)
}

{
// nolint
label := GetRequestLabelFromContext(nil)
assert.False(t, label)
}
}

0 comments on commit 2964f60

Please sign in to comment.