diff --git a/internal/distributed/proxy/httpserver/handler_v2.go b/internal/distributed/proxy/httpserver/handler_v2.go index 40a9a9b812508..d2093e52996db 100644 --- a/internal/distributed/proxy/httpserver/handler_v2.go +++ b/internal/distributed/proxy/httpserver/handler_v2.go @@ -29,6 +29,7 @@ import ( "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/util/crypto" "github.com/milvus-io/milvus/pkg/util/funcutil" "github.com/milvus-io/milvus/pkg/util/merr" @@ -61,41 +62,49 @@ func (h *HandlersV2) RegisterRoutesToV2(router gin.IRouter) { router.POST(CollectionCategory+LoadAction, timeoutMiddleware(wrapperPost(func() any { return &CollectionNameReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.loadCollection))))) router.POST(CollectionCategory+ReleaseAction, timeoutMiddleware(wrapperPost(func() any { return &CollectionNameReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.releaseCollection))))) - router.POST(EntityCategory+QueryAction, timeoutMiddleware(wrapperPost(func() any { + // Query + router.POST(EntityCategory+QueryAction, restfulSizeMiddleware(timeoutMiddleware(wrapperPost(func() any { return &QueryReqV2{ Limit: 100, OutputFields: []string{DefaultOutputFields}, } - }, wrapperTraceLog(h.wrapperCheckDatabase(h.query))))) - router.POST(EntityCategory+GetAction, timeoutMiddleware(wrapperPost(func() any { + }, wrapperTraceLog(h.wrapperCheckDatabase(h.query)))), true)) + // Get + router.POST(EntityCategory+GetAction, restfulSizeMiddleware(timeoutMiddleware(wrapperPost(func() any { return &CollectionIDReq{ OutputFields: []string{DefaultOutputFields}, } - }, wrapperTraceLog(h.wrapperCheckDatabase(h.get))))) - router.POST(EntityCategory+DeleteAction, timeoutMiddleware(wrapperPost(func() any { + }, wrapperTraceLog(h.wrapperCheckDatabase(h.get)))), true)) + // Delete + router.POST(EntityCategory+DeleteAction, restfulSizeMiddleware(timeoutMiddleware(wrapperPost(func() any { return &CollectionFilterReq{} - }, wrapperTraceLog(h.wrapperCheckDatabase(h.delete))))) - router.POST(EntityCategory+InsertAction, timeoutMiddleware(wrapperPost(func() any { + }, wrapperTraceLog(h.wrapperCheckDatabase(h.delete)))), false)) + // Insert + router.POST(EntityCategory+InsertAction, restfulSizeMiddleware(timeoutMiddleware(wrapperPost(func() any { return &CollectionDataReq{} - }, wrapperTraceLog(h.wrapperCheckDatabase(h.insert))))) - router.POST(EntityCategory+UpsertAction, timeoutMiddleware(wrapperPost(func() any { + }, wrapperTraceLog(h.wrapperCheckDatabase(h.insert)))), false)) + // Upsert + router.POST(EntityCategory+UpsertAction, restfulSizeMiddleware(timeoutMiddleware(wrapperPost(func() any { return &CollectionDataReq{} - }, wrapperTraceLog(h.wrapperCheckDatabase(h.upsert))))) - router.POST(EntityCategory+SearchAction, timeoutMiddleware(wrapperPost(func() any { + }, wrapperTraceLog(h.wrapperCheckDatabase(h.upsert)))), false)) + // Search + router.POST(EntityCategory+SearchAction, restfulSizeMiddleware(timeoutMiddleware(wrapperPost(func() any { return &SearchReqV2{ Limit: 100, } - }, wrapperTraceLog(h.wrapperCheckDatabase(h.search))))) - router.POST(EntityCategory+AdvancedSearchAction, timeoutMiddleware(wrapperPost(func() any { + }, wrapperTraceLog(h.wrapperCheckDatabase(h.search)))), true)) + // advanced_search, backward compatible uri + router.POST(EntityCategory+AdvancedSearchAction, restfulSizeMiddleware(timeoutMiddleware(wrapperPost(func() any { return &HybridSearchReq{ Limit: 100, } - }, wrapperTraceLog(h.wrapperCheckDatabase(h.advancedSearch))))) - router.POST(EntityCategory+HybridSearchAction, timeoutMiddleware(wrapperPost(func() any { + }, wrapperTraceLog(h.wrapperCheckDatabase(h.advancedSearch)))), true)) + // HybridSearch + router.POST(EntityCategory+HybridSearchAction, restfulSizeMiddleware(timeoutMiddleware(wrapperPost(func() any { return &HybridSearchReq{ Limit: 100, } - }, wrapperTraceLog(h.wrapperCheckDatabase(h.advancedSearch))))) + }, wrapperTraceLog(h.wrapperCheckDatabase(h.advancedSearch)))), true)) router.POST(PartitionCategory+ListAction, timeoutMiddleware(wrapperPost(func() any { return &CollectionNameReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.listPartitions))))) router.POST(PartitionCategory+HasAction, timeoutMiddleware(wrapperPost(func() any { return &PartitionReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.hasPartitions))))) @@ -183,7 +192,7 @@ func wrapperPost(newReq newReqFunc, v2 handlerFuncV2) gin.HandlerFunc { } } username, _ := c.Get(ContextUsername) - ctx, span := otel.Tracer(typeutil.ProxyRole).Start(context.Background(), c.Request.URL.Path) + ctx, span := otel.Tracer(typeutil.ProxyRole).Start(c, c.Request.URL.Path) defer span.End() ctx = proxy.NewContextWithMetadata(ctx, username.(string), dbName) traceID := span.SpanContext().TraceID().String() @@ -195,6 +204,15 @@ func wrapperPost(newReq newReqFunc, v2 handlerFuncV2) gin.HandlerFunc { } } +// restfulSizeMiddleware is the middleware fetchs metrics stats from gin struct. +func restfulSizeMiddleware(handler gin.HandlerFunc, observeOutbound bool) gin.HandlerFunc { + return func(ctx *gin.Context) { + h := metrics.WrapRestfulContext(ctx, ctx.Request.ContentLength) + handler(ctx) + metrics.RecordRestfulMetrics(h, int64(ctx.Writer.Size()), observeOutbound) + } +} + func wrapperTraceLog(v2 handlerFuncV2) handlerFuncV2 { return func(ctx context.Context, c *gin.Context, req any, dbName string) (interface{}, error) { switch proxy.Params.CommonCfg.TraceLogMode.GetAsInt() { diff --git a/pkg/metrics/grpc_stats_handler.go b/pkg/metrics/grpc_stats_handler.go index 0a473e83e88f8..50b887db10fd1 100644 --- a/pkg/metrics/grpc_stats_handler.go +++ b/pkg/metrics/grpc_stats_handler.go @@ -25,12 +25,12 @@ import ( "github.com/milvus-io/milvus/pkg/util/typeutil" ) -// milvusGrpcKey is context key type. -type milvusGrpcKey struct{} +// milvusStatsKey is context key type. +type milvusStatsKey struct{} -// GrpcStats stores the meta and payload size info +// RPCStats stores the meta and payload size info // it should be attached to context so that request sizing could be avoided -type GrpcStats struct { +type RPCStats struct { fullMethodName string collectionName string inboundPayloadSize int @@ -38,7 +38,7 @@ type GrpcStats struct { nodeID int64 } -func (s *GrpcStats) SetCollectionName(collName string) *GrpcStats { +func (s *RPCStats) SetCollectionName(collName string) *RPCStats { if s == nil { return s } @@ -46,7 +46,7 @@ func (s *GrpcStats) SetCollectionName(collName string) *GrpcStats { return s } -func (s *GrpcStats) SetInboundLabel(label string) *GrpcStats { +func (s *RPCStats) SetInboundLabel(label string) *RPCStats { if s == nil { return s } @@ -54,7 +54,7 @@ func (s *GrpcStats) SetInboundLabel(label string) *GrpcStats { return s } -func (s *GrpcStats) SetNodeID(nodeID int64) *GrpcStats { +func (s *RPCStats) SetNodeID(nodeID int64) *RPCStats { if s == nil { return s } @@ -62,12 +62,12 @@ func (s *GrpcStats) SetNodeID(nodeID int64) *GrpcStats { return s } -func attachStats(ctx context.Context, stats *GrpcStats) context.Context { - return context.WithValue(ctx, milvusGrpcKey{}, stats) +func attachStats(ctx context.Context, stats *RPCStats) context.Context { + return context.WithValue(ctx, milvusStatsKey{}, stats) } -func GetStats(ctx context.Context) *GrpcStats { - stats, ok := ctx.Value(milvusGrpcKey{}).(*GrpcStats) +func GetStats(ctx context.Context) *RPCStats { + stats, ok := ctx.Value(milvusStatsKey{}).(*RPCStats) if !ok { return nil } @@ -122,7 +122,7 @@ func (h *grpcSizeStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInf return ctx } // attach stats - return attachStats(ctx, &GrpcStats{fullMethodName: info.FullMethodName}) + return attachStats(ctx, &RPCStats{fullMethodName: info.FullMethodName}) } // HandleRPC implements per-RPC stats instrumentation. diff --git a/pkg/metrics/restful_middleware.go b/pkg/metrics/restful_middleware.go new file mode 100644 index 0000000000000..9461c0e3b677a --- /dev/null +++ b/pkg/metrics/restful_middleware.go @@ -0,0 +1,45 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metrics + +import ( + "context" + "strconv" +) + +func WrapRestfulContext(ctx context.Context, inputLength int64) context.Context { + return context.WithValue(ctx, milvusStatsKey{}, &RPCStats{ + inboundPayloadSize: int(inputLength), + }) +} + +func RecordRestfulMetrics(ctx context.Context, outputLength int64, observeOutbound bool) { + if mstats := GetStats(ctx); mstats != nil { + // all info set + // set metrics with inbound size and related meta + nodeIDValue := strconv.FormatInt(mstats.nodeID, 10) + if mstats.inboundPayloadSize > 0 { + ProxyReceiveBytes.WithLabelValues( + nodeIDValue, + mstats.inboundLabel, mstats.collectionName).Add(float64(mstats.inboundPayloadSize)) + } + // set outbound payload size metrics + if outputLength > 0 && observeOutbound { + ProxyReadReqSendBytes.WithLabelValues(nodeIDValue).Add(float64(outputLength)) + } + } +}