From 2791f3d79e60bc09c69cd12d3302f16c907d13f0 Mon Sep 17 00:00:00 2001
From: SungJin1212
Date: Sat, 14 Dec 2024 07:52:45 +0900
Subject: [PATCH] Add query response series count to query stats log

Signed-off-by: SungJin1212
---
 CHANGELOG.md                                  |  1 +
 pkg/frontend/transport/handler.go             | 24 +++++++++--------
 pkg/frontend/transport/handler_test.go        | 18 +++++++------
 pkg/querier/stats/stats.go                    | 27 +++++++++++++++----
 .../tripperware/instantquery/instant_query.go |  4 +++
 .../tripperware/queryrange/query_range.go     |  4 +++
 pkg/querier/tripperware/util.go               | 19 ++++++++++++++
 7 files changed, 73 insertions(+), 24 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 37ea06d80f..2afed0590d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -17,6 +17,7 @@
 * [FEATURE] Chunk Cache: Support multi level cache and add metrics. #6249
 * [FEATURE] Distributor: Accept multiple HA Tracker pairs in the same request. #6256
 * [FEATURE] Ruler: Add support for per-user external labels #6340
+* [ENHANCEMENT] Query Frontend: Add the number of series in the query response to the query stats log. #6423
 * [ENHANCEMENT] Store Gateway: Add a hedged request to reduce the tail latency. #6388
 * [ENHANCEMENT] Ingester: Add metrics to track succeed/failed native histograms. #6370
 * [ENHANCEMENT] Query Frontend/Querier: Add an experimental flag `-querier.enable-promql-experimental-functions` to enable experimental promQL functions. #6355
diff --git a/pkg/frontend/transport/handler.go b/pkg/frontend/transport/handler.go
index 9a2292bb46..bba995988d 100644
--- a/pkg/frontend/transport/handler.go
+++ b/pkg/frontend/transport/handler.go
@@ -90,7 +90,7 @@ type Handler struct {
 
     // Metrics.
     querySeconds        *prometheus.CounterVec
-    querySeries         *prometheus.CounterVec
+    queryFetchedSeries  *prometheus.CounterVec
     queryFetchedSamples *prometheus.CounterVec
     queryScannedSamples *prometheus.CounterVec
     queryPeakSamples    *prometheus.HistogramVec
@@ -114,7 +114,7 @@ func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logge
         Help: "Total amount of wall clock time spend processing queries.",
     }, []string{"user"})

-    h.querySeries = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
+    h.queryFetchedSeries = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
         Name: "cortex_query_fetched_series_total",
         Help: "Number of series fetched to execute a query.",
     }, []string{"user"})
@@ -158,7 +158,7 @@ func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logge

     h.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(func(user string) {
         h.querySeconds.DeleteLabelValues(user)
-        h.querySeries.DeleteLabelValues(user)
+        h.queryFetchedSeries.DeleteLabelValues(user)
         h.queryFetchedSamples.DeleteLabelValues(user)
         h.queryScannedSamples.DeleteLabelValues(user)
         h.queryPeakSamples.DeleteLabelValues(user)
@@ -338,9 +338,10 @@ func (f *Handler) reportSlowQuery(r *http.Request, queryString url.Values, query
 func (f *Handler) reportQueryStats(r *http.Request, userID string, queryString url.Values, queryResponseTime time.Duration, stats *querier_stats.QueryStats, error error, statusCode int, resp *http.Response) {
     wallTime := stats.LoadWallTime()
     queryStorageWallTime := stats.LoadQueryStorageWallTime()
-    numSeries := stats.LoadFetchedSeries()
-    numChunks := stats.LoadFetchedChunks()
-    numSamples := stats.LoadFetchedSamples()
+    numResponseSeries := stats.LoadResponseSeries()
+    numFetchedSeries := stats.LoadFetchedSeries()
+    numFetchedChunks := stats.LoadFetchedChunks()
+    numFetchedSamples := stats.LoadFetchedSamples()
     numScannedSamples := stats.LoadScannedSamples()
     numPeakSamples := stats.LoadPeakSamples()
     numChunkBytes := stats.LoadFetchedChunkBytes()
@@ -353,8 +354,8 @@ func (f *Handler) reportQueryStats(r *http.Request, userID string, queryString u

     // Track stats.
     f.querySeconds.WithLabelValues(userID).Add(wallTime.Seconds())
-    f.querySeries.WithLabelValues(userID).Add(float64(numSeries))
-    f.queryFetchedSamples.WithLabelValues(userID).Add(float64(numSamples))
+    f.queryFetchedSeries.WithLabelValues(userID).Add(float64(numFetchedSeries))
+    f.queryFetchedSamples.WithLabelValues(userID).Add(float64(numFetchedSamples))
     f.queryScannedSamples.WithLabelValues(userID).Add(float64(numScannedSamples))
     f.queryPeakSamples.WithLabelValues(userID).Observe(float64(numPeakSamples))
     f.queryChunkBytes.WithLabelValues(userID).Add(float64(numChunkBytes))
@@ -378,9 +379,10 @@ func (f *Handler) reportQueryStats(r *http.Request, userID string, queryString u
         "path", r.URL.Path,
         "response_time", queryResponseTime,
         "query_wall_time_seconds", wallTime.Seconds(),
-        "fetched_series_count", numSeries,
-        "fetched_chunks_count", numChunks,
-        "fetched_samples_count", numSamples,
+        "response_series_count", numResponseSeries,
+        "fetched_series_count", numFetchedSeries,
+        "fetched_chunks_count", numFetchedChunks,
+        "fetched_samples_count", numFetchedSamples,
         "fetched_chunks_bytes", numChunkBytes,
         "fetched_data_bytes", numDataBytes,
         "split_queries", splitQueries,
diff --git a/pkg/frontend/transport/handler_test.go b/pkg/frontend/transport/handler_test.go
index d394fb2670..2c773d42d6 100644
--- a/pkg/frontend/transport/handler_test.go
+++ b/pkg/frontend/transport/handler_test.go
@@ -429,14 +429,15 @@ func TestReportQueryStatsFormat(t *testing.T) {

     tests := map[string]testCase{
         "should not include query and header details if empty": {
-            expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000`,
+            expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 response_series_count=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000`,
         },
         "should include query length and string at the end": {
             queryString: url.Values(map[string][]string{"query": {"up"}}),
-            expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000 query_length=2 param_query=up`,
+            expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 response_series_count=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000 query_length=2 param_query=up`,
         },
         "should include query stats": {
             queryStats: &querier_stats.QueryStats{
+                QueryResponseSeries: 100,
                 Stats: querier_stats.Stats{
                     WallTime:             3 * time.Second,
                     QueryStorageWallTime: 100 * time.Minute,
@@ -448,15 +449,15 @@ func TestReportQueryStatsFormat(t *testing.T) {
                     SplitQueries:         10,
                 },
             },
-            expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=3 fetched_series_count=100 fetched_chunks_count=200 fetched_samples_count=300 fetched_chunks_bytes=1024 fetched_data_bytes=2048 split_queries=10 status_code=200 response_size=1000 query_storage_wall_time_seconds=6000`,
+            expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=3 response_series_count=100 fetched_series_count=100 fetched_chunks_count=200 fetched_samples_count=300 fetched_chunks_bytes=1024 fetched_data_bytes=2048 split_queries=10 status_code=200 response_size=1000 query_storage_wall_time_seconds=6000`,
         },
         "should include user agent": {
             header:      http.Header{"User-Agent": []string{"Grafana"}},
-            expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000 user_agent=Grafana`,
+            expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 response_series_count=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000 user_agent=Grafana`,
         },
         "should include response error": {
             responseErr: errors.New("foo_err"),
-            expectedLog: `level=error msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000 error=foo_err`,
+            expectedLog: `level=error msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 response_series_count=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000 error=foo_err`,
         },
         "should include query priority": {
             queryString: url.Values(map[string][]string{"query": {"up"}}),
@@ -464,7 +465,7 @@ func TestReportQueryStatsFormat(t *testing.T) {
                 Priority:         99,
                 PriorityAssigned: true,
             },
-            expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000 query_length=2 priority=99 param_query=up`,
+            expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 response_series_count=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000 query_length=2 priority=99 param_query=up`,
         },
         "should include data fetch min and max time": {
             queryString: url.Values(map[string][]string{"query": {"up"}}),
@@ -472,10 +473,11 @@ func TestReportQueryStatsFormat(t *testing.T) {
                 DataSelectMaxTime: 1704153600000,
                 DataSelectMinTime: 1704067200000,
             },
-            expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000 data_select_max_time=1704153600 data_select_min_time=1704067200 query_length=2 param_query=up`,
+            expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 response_series_count=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000 data_select_max_time=1704153600 data_select_min_time=1704067200 query_length=2 param_query=up`,
         },
         "should include query stats with store gateway stats": {
             queryStats: &querier_stats.QueryStats{
+                QueryResponseSeries: 100,
                 Stats: querier_stats.Stats{
                     WallTime:             3 * time.Second,
                     QueryStorageWallTime: 100 * time.Minute,
@@ -489,7 +491,7 @@ func TestReportQueryStatsFormat(t *testing.T) {
                     StoreGatewayTouchedPostingBytes: 200,
                 },
             },
-            expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=3 fetched_series_count=100 fetched_chunks_count=200 fetched_samples_count=300 fetched_chunks_bytes=1024 fetched_data_bytes=2048 split_queries=10 status_code=200 response_size=1000 store_gateway_touched_postings_count=20 store_gateway_touched_posting_bytes=200 query_storage_wall_time_seconds=6000`,
+            expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=3 response_series_count=100 fetched_series_count=100 fetched_chunks_count=200 fetched_samples_count=300 fetched_chunks_bytes=1024 fetched_data_bytes=2048 split_queries=10 status_code=200 response_size=1000 store_gateway_touched_postings_count=20 store_gateway_touched_posting_bytes=200 query_storage_wall_time_seconds=6000`,
         },
     }

diff --git a/pkg/querier/stats/stats.go b/pkg/querier/stats/stats.go
index 1559731dad..d4eda0f756 100644
--- a/pkg/querier/stats/stats.go
+++ b/pkg/querier/stats/stats.go
@@ -16,11 +16,12 @@ var ctxKey = contextKey(0)

 type QueryStats struct {
     Stats
-    PriorityAssigned  bool
-    Priority          int64
-    DataSelectMaxTime int64
-    DataSelectMinTime int64
-    m                 sync.Mutex
+    QueryResponseSeries uint64
+    PriorityAssigned    bool
+    Priority            int64
+    DataSelectMaxTime   int64
+    DataSelectMinTime   int64
+    m                   sync.Mutex
 }

 // ContextWithEmptyStats returns a context with empty stats.
@@ -65,6 +66,22 @@ func (s *QueryStats) LoadWallTime() time.Duration {
     return time.Duration(atomic.LoadInt64((*int64)(&s.WallTime)))
 }

+func (s *QueryStats) AddResponseSeries(series uint64) {
+    if s == nil {
+        return
+    }
+
+    atomic.AddUint64(&s.QueryResponseSeries, series)
+}
+
+func (s *QueryStats) LoadResponseSeries() uint64 {
+    if s == nil {
+        return 0
+    }
+
+    return atomic.LoadUint64(&s.QueryResponseSeries)
+}
+
 func (s *QueryStats) AddFetchedSeries(series uint64) {
     if s == nil {
         return
diff --git a/pkg/querier/tripperware/instantquery/instant_query.go b/pkg/querier/tripperware/instantquery/instant_query.go
index 034090a8b9..75c9715d39 100644
--- a/pkg/querier/tripperware/instantquery/instant_query.go
+++ b/pkg/querier/tripperware/instantquery/instant_query.go
@@ -17,6 +17,7 @@ import (
     "github.com/weaveworks/common/httpgrpc"
     "google.golang.org/grpc/status"

+    "github.com/cortexproject/cortex/pkg/querier/stats"
     "github.com/cortexproject/cortex/pkg/querier/tripperware"
     "github.com/cortexproject/cortex/pkg/util"
     "github.com/cortexproject/cortex/pkg/util/spanlogger"
@@ -176,6 +177,9 @@ func (instantQueryCodec) EncodeResponse(ctx context.Context, res tripperware.Res
         return nil, httpgrpc.Errorf(http.StatusInternalServerError, "invalid response format")
     }

+    queryStats := stats.FromContext(ctx)
+    tripperware.SetQueryResponseStats(a, queryStats)
+
     b, err := json.Marshal(a)
     if err != nil {
         return nil, httpgrpc.Errorf(http.StatusInternalServerError, "error encoding response: %v", err)
diff --git a/pkg/querier/tripperware/queryrange/query_range.go b/pkg/querier/tripperware/queryrange/query_range.go
index 46f1affb48..875519cd97 100644
--- a/pkg/querier/tripperware/queryrange/query_range.go
+++ b/pkg/querier/tripperware/queryrange/query_range.go
@@ -18,6 +18,7 @@ import (
     "github.com/prometheus/common/model"
     "github.com/weaveworks/common/httpgrpc"

+    "github.com/cortexproject/cortex/pkg/querier/stats"
     "github.com/cortexproject/cortex/pkg/querier/tripperware"
     "github.com/cortexproject/cortex/pkg/util"
     "github.com/cortexproject/cortex/pkg/util/spanlogger"
@@ -241,6 +242,9 @@ func (prometheusCodec) EncodeResponse(ctx context.Context, res tripperware.Respo
     if a != nil {
         m := a.Data.Result.GetMatrix()
         sp.LogFields(otlog.Int("series", len(m.GetSampleStreams())))
+
+        queryStats := stats.FromContext(ctx)
+        tripperware.SetQueryResponseStats(a, queryStats)
     }

     b, err := json.Marshal(a)
diff --git a/pkg/querier/tripperware/util.go b/pkg/querier/tripperware/util.go
index 9ff4e74e55..d9b3551ce2 100644
--- a/pkg/querier/tripperware/util.go
+++ b/pkg/querier/tripperware/util.go
@@ -6,6 +6,7 @@ import (

     "github.com/weaveworks/common/httpgrpc"

+    "github.com/cortexproject/cortex/pkg/querier/stats"
     "github.com/cortexproject/cortex/pkg/tenant"
     "github.com/cortexproject/cortex/pkg/util/validation"
 )
@@ -70,3 +71,21 @@ func DoRequests(ctx context.Context, downstream Handler, reqs []Request, limits

     return resps, firstErr
 }
+
+// SetQueryResponseStats records the number of series contained in the query
+// response (vector samples or matrix sample streams) on the query stats.
+func SetQueryResponseStats(a *PrometheusResponse, queryStats *stats.QueryStats) {
+    if queryStats != nil {
+        v := a.Data.Result.GetVector()
+        if v != nil {
+            queryStats.AddResponseSeries(uint64(len(v.GetSamples())))
+            return
+        }
+
+        m := a.Data.Result.GetMatrix()
+        if m != nil {
+            queryStats.AddResponseSeries(uint64(len(m.GetSampleStreams())))
+            return
+        }
+    }
+}
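For reference, here is a minimal, self-contained Go sketch of the accounting rule the patch adds, assuming only what the diff above shows. The queryStats, vector, matrix, and response types and the lower-case setQueryResponseStats helper are hypothetical stand-ins for the real querier_stats.QueryStats and tripperware.PrometheusResponse types; the rule itself mirrors the new code: an instant-query vector counts one series per sample, a range-query matrix counts one series per sample stream, and the counter is nil-safe and updated atomically.

package main

import (
    "fmt"
    "sync/atomic"
)

// queryStats mirrors the nil-safe, atomically updated counter the patch adds
// to querier_stats.QueryStats.
type queryStats struct {
    queryResponseSeries uint64
}

func (s *queryStats) AddResponseSeries(n uint64) {
    if s == nil {
        return // stats are optional; a nil receiver is a no-op
    }
    atomic.AddUint64(&s.queryResponseSeries, n)
}

func (s *queryStats) LoadResponseSeries() uint64 {
    if s == nil {
        return 0
    }
    return atomic.LoadUint64(&s.queryResponseSeries)
}

// vector, matrix, and response are hypothetical stand-ins for the payloads of
// tripperware.PrometheusResponse: an instant query yields a vector, a range
// query yields a matrix.
type vector struct{ samples []string }
type matrix struct{ sampleStreams []string }

type response struct {
    vector *vector
    matrix *matrix
}

// setQueryResponseStats applies the same counting rule as the new
// tripperware.SetQueryResponseStats: one series per vector sample, otherwise
// one series per matrix sample stream.
func setQueryResponseStats(r *response, st *queryStats) {
    if r == nil || st == nil {
        return
    }
    if r.vector != nil {
        st.AddResponseSeries(uint64(len(r.vector.samples)))
        return
    }
    if r.matrix != nil {
        st.AddResponseSeries(uint64(len(r.matrix.sampleStreams)))
    }
}

func main() {
    st := &queryStats{}
    resp := &response{matrix: &matrix{sampleStreams: []string{`up{instance="a"}`, `up{instance="b"}`}}}
    setQueryResponseStats(resp, st)
    // The query frontend would report this value as response_series_count=2
    // in its "query stats" log line.
    fmt.Println("response_series_count =", st.LoadResponseSeries())
}

Since both codecs call SetQueryResponseStats from EncodeResponse, the point where the merged response is serialized for the caller, response_series_count reflects the series actually returned to the client; it complements fetched_series_count, which counts series read from storage and can be much larger for aggregating queries.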