diff --git a/internal/proxy/impl.go b/internal/proxy/impl.go index 340b21fa33300..91fe1244b80fc 100644 --- a/internal/proxy/impl.go +++ b/internal/proxy/impl.go @@ -3415,21 +3415,8 @@ func (node *Proxy) Flush(ctx context.Context, request *milvuspb.FlushRequest) (* // Query get the records by primary keys. func (node *Proxy) query(ctx context.Context, qt *queryTask) (*milvuspb.QueryResults, error) { request := qt.request - receiveSize := proto.Size(request) - metrics.ProxyReceiveBytes.WithLabelValues( - strconv.FormatInt(paramtable.GetNodeID(), 10), - metrics.QueryLabel, - request.GetCollectionName(), - ).Add(float64(receiveSize)) - - metrics.ProxyReceivedNQ.WithLabelValues( - strconv.FormatInt(paramtable.GetNodeID(), 10), - metrics.SearchLabel, - request.GetCollectionName(), - ).Add(float64(1)) - - subLabel := GetCollectionRateSubLabel(request) - rateCol.Add(internalpb.RateType_DQLQuery.String(), 1, subLabel) + method := "Query" + isProxyRequest := GetRequestLabelFromContext(ctx) if err := merr.CheckHealthy(node.GetStateCode()); err != nil { return &milvuspb.QueryResults{ @@ -3437,20 +3424,6 @@ func (node *Proxy) query(ctx context.Context, qt *queryTask) (*milvuspb.QueryRes }, nil } - ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-Query") - defer sp.End() - tr := timerecord.NewTimeRecorder("Query") - - method := "Query" - - metrics.ProxyFunctionCall.WithLabelValues( - strconv.FormatInt(paramtable.GetNodeID(), 10), - method, - metrics.TotalLabel, - request.GetDbName(), - request.GetCollectionName(), - ).Inc() - log := log.Ctx(ctx).With( zap.String("role", typeutil.ProxyRole), zap.String("db", request.DbName), @@ -3458,6 +3431,16 @@ func (node *Proxy) query(ctx context.Context, qt *queryTask) (*milvuspb.QueryRes zap.Strings("partitions", request.PartitionNames), ) + log.Debug( + rpcReceived(method), + zap.String("expr", request.Expr), + zap.Strings("OutputFields", request.OutputFields), + zap.Uint64("travel_timestamp", request.TravelTimestamp), + zap.Uint64("guarantee_timestamp", request.GuaranteeTimestamp), + ) + + tr := timerecord.NewTimeRecorder(method) + defer func() { span := tr.ElapseSpan() if span >= paramtable.Get().ProxyCfg.SlowQuerySpanInSeconds.GetAsDuration(time.Second) { @@ -3475,27 +3458,21 @@ func (node *Proxy) query(ctx context.Context, qt *queryTask) (*milvuspb.QueryRes } }() - log.Debug( - rpcReceived(method), - zap.String("expr", request.Expr), - zap.Strings("OutputFields", request.OutputFields), - zap.Uint64("travel_timestamp", request.TravelTimestamp), - zap.Uint64("guarantee_timestamp", request.GuaranteeTimestamp), - ) - if err := node.sched.dqQueue.Enqueue(qt); err != nil { log.Warn( rpcFailedToEnqueue(method), zap.Error(err), ) - metrics.ProxyFunctionCall.WithLabelValues( - strconv.FormatInt(paramtable.GetNodeID(), 10), - method, - metrics.AbandonLabel, - request.GetDbName(), - request.GetCollectionName(), - ).Inc() + if isProxyRequest { + metrics.ProxyFunctionCall.WithLabelValues( + strconv.FormatInt(paramtable.GetNodeID(), 10), + method, + metrics.AbandonLabel, + request.GetDbName(), + request.GetCollectionName(), + ).Inc() + } return &milvuspb.QueryResults{ Status: merr.Status(err), @@ -3510,45 +3487,36 @@ func (node *Proxy) query(ctx context.Context, qt *queryTask) (*milvuspb.QueryRes rpcFailedToWaitToFinish(method), zap.Error(err)) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc() + if isProxyRequest { + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, + metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc() + } return &milvuspb.QueryResults{ Status: merr.Status(err), }, nil } - span := tr.CtxRecord(ctx, "wait query result") - metrics.ProxyWaitForSearchResultLatency.WithLabelValues( - strconv.FormatInt(paramtable.GetNodeID(), 10), - metrics.QueryLabel, - ).Observe(float64(span.Milliseconds())) - log.Debug(rpcDone(method)) - - metrics.ProxyFunctionCall.WithLabelValues( - strconv.FormatInt(paramtable.GetNodeID(), 10), - method, - metrics.SuccessLabel, - request.GetDbName(), - request.GetCollectionName(), - ).Inc() - - metrics.ProxySQLatency.WithLabelValues( - strconv.FormatInt(paramtable.GetNodeID(), 10), - metrics.QueryLabel, - request.GetDbName(), - request.GetCollectionName(), - ).Observe(float64(tr.ElapseSpan().Milliseconds())) + if isProxyRequest { + span := tr.CtxRecord(ctx, "wait query result") + metrics.ProxyWaitForSearchResultLatency.WithLabelValues( + strconv.FormatInt(paramtable.GetNodeID(), 10), + metrics.QueryLabel, + ).Observe(float64(span.Milliseconds())) - metrics.ProxyCollectionSQLatency.WithLabelValues( - strconv.FormatInt(paramtable.GetNodeID(), 10), - metrics.QueryLabel, - request.CollectionName, - ).Observe(float64(tr.ElapseSpan().Milliseconds())) + metrics.ProxySQLatency.WithLabelValues( + strconv.FormatInt(paramtable.GetNodeID(), 10), + metrics.QueryLabel, + request.GetDbName(), + request.GetCollectionName(), + ).Observe(float64(tr.ElapseSpan().Milliseconds())) - sentSize := proto.Size(qt.result) - rateCol.Add(metricsinfo.ReadResultThroughput, float64(sentSize), subLabel) - metrics.ProxyReadReqSendBytes.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Add(float64(sentSize)) + metrics.ProxyCollectionSQLatency.WithLabelValues( + strconv.FormatInt(paramtable.GetNodeID(), 10), + metrics.QueryLabel, + request.CollectionName, + ).Observe(float64(tr.ElapseSpan().Milliseconds())) + } return qt.result, nil } @@ -3570,22 +3538,73 @@ func (node *Proxy) Query(ctx context.Context, request *milvuspb.QueryRequest) (* lb: node.lbPolicy, mustUsePartitionKey: Params.ProxyCfg.MustUsePartitionKey.GetAsBool(), } + + subLabel := GetCollectionRateSubLabel(request) + receiveSize := proto.Size(request) + metrics.ProxyReceiveBytes.WithLabelValues( + strconv.FormatInt(paramtable.GetNodeID(), 10), + metrics.QueryLabel, + request.GetCollectionName(), + ).Add(float64(receiveSize)) + metrics.ProxyReceivedNQ.WithLabelValues( + strconv.FormatInt(paramtable.GetNodeID(), 10), + metrics.SearchLabel, + request.GetCollectionName(), + ).Add(float64(1)) + + rateCol.Add(internalpb.RateType_DQLQuery.String(), 1, subLabel) + + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { + return &milvuspb.QueryResults{ + Status: merr.Status(err), + }, nil + } + + ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-Query") + defer sp.End() + method := "Query" + + metrics.ProxyFunctionCall.WithLabelValues( + strconv.FormatInt(paramtable.GetNodeID(), 10), + method, + metrics.TotalLabel, + request.GetDbName(), + request.GetCollectionName(), + ).Inc() + + ctx = SetRequestLabelForContext(ctx) res, err := node.query(ctx, qt) - if merr.Ok(res.Status) && err == nil { - username := GetCurUserFromContextOrDefault(ctx) - nodeID := paramtable.GetStringNodeID() - v := Extension.Report(map[string]any{ - hookutil.OpTypeKey: hookutil.OpTypeQuery, - hookutil.DatabaseKey: request.DbName, - hookutil.UsernameKey: username, - hookutil.ResultDataSizeKey: proto.Size(res), - hookutil.RelatedDataSizeKey: qt.totalRelatedDataSize, - hookutil.RelatedCntKey: qt.allQueryCnt, - }) - SetReportValue(res.Status, v) - metrics.ProxyReportValue.WithLabelValues(nodeID, hookutil.OpTypeQuery, request.DbName, username).Add(float64(v)) + if err != nil || !merr.Ok(res.Status) { + return res, err } - return res, err + + log.Debug(rpcDone(method)) + + metrics.ProxyFunctionCall.WithLabelValues( + strconv.FormatInt(paramtable.GetNodeID(), 10), + method, + metrics.SuccessLabel, + request.GetDbName(), + request.GetCollectionName(), + ).Inc() + + sentSize := proto.Size(qt.result) + rateCol.Add(metricsinfo.ReadResultThroughput, float64(sentSize), subLabel) + metrics.ProxyReadReqSendBytes.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Add(float64(sentSize)) + + username := GetCurUserFromContextOrDefault(ctx) + nodeID := paramtable.GetStringNodeID() + v := Extension.Report(map[string]any{ + hookutil.OpTypeKey: hookutil.OpTypeQuery, + hookutil.DatabaseKey: request.DbName, + hookutil.UsernameKey: username, + hookutil.ResultDataSizeKey: proto.Size(res), + hookutil.RelatedDataSizeKey: qt.totalRelatedDataSize, + hookutil.RelatedCntKey: qt.allQueryCnt, + }) + SetReportValue(res.Status, v) + metrics.ProxyReportValue.WithLabelValues(nodeID, hookutil.OpTypeQuery, request.DbName, username).Add(float64(v)) + return res, nil } // CreateAlias create alias for collection, then you can search the collection with alias. diff --git a/internal/proxy/util.go b/internal/proxy/util.go index 0e6b373f27dc4..8982a17800aca 100644 --- a/internal/proxy/util.go +++ b/internal/proxy/util.go @@ -1620,3 +1620,22 @@ func GetCostValue(status *commonpb.Status) int { } return value } + +type isProxyRequestKeyType struct{} + +var ctxProxyRequestKey = isProxyRequestKeyType{} + +func SetRequestLabelForContext(ctx context.Context) context.Context { + return context.WithValue(ctx, ctxProxyRequestKey, true) +} + +func GetRequestLabelFromContext(ctx context.Context) bool { + if ctx == nil { + return false + } + v := ctx.Value(ctxProxyRequestKey) + if v == nil { + return false + } + return v.(bool) +} diff --git a/internal/proxy/util_test.go b/internal/proxy/util_test.go index 2d066d0f99add..46b3189351a0b 100644 --- a/internal/proxy/util_test.go +++ b/internal/proxy/util_test.go @@ -2294,3 +2294,24 @@ func TestGetCostValue(t *testing.T) { assert.Equal(t, 100, cost) }) } + +func TestRequestLabelWithContext(t *testing.T) { + ctx := context.Background() + + { + label := GetRequestLabelFromContext(ctx) + assert.False(t, label) + } + + ctx = SetRequestLabelForContext(ctx) + { + label := GetRequestLabelFromContext(ctx) + assert.True(t, label) + } + + { + // nolint + label := GetRequestLabelFromContext(nil) + assert.False(t, label) + } +}