Skip to content

Commit

Permalink
Add query response series log
Browse files Browse the repository at this point in the history
Signed-off-by: SungJin1212 <[email protected]>
  • Loading branch information
SungJin1212 committed Dec 13, 2024
1 parent fbd81bb commit 2791f3d
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 24 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
* [FEATURE] Chunk Cache: Support multi level cache and add metrics. #6249
* [FEATURE] Distributor: Accept multiple HA Tracker pairs in the same request. #6256
* [FEATURE] Ruler: Add support for per-user external labels #6340
* [ENHANCEMENT] Query Frontend: Add a number of series in query response to query stat log. #6423
* [ENHANCEMENT] Store Gateway: Add a hedged request to reduce the tail latency. #6388
* [ENHANCEMENT] Ingester: Add metrics to track succeed/failed native histograms. #6370
* [ENHANCEMENT] Query Frontend/Querier: Add an experimental flag `-querier.enable-promql-experimental-functions` to enable experimental promQL functions. #6355
Expand Down
24 changes: 13 additions & 11 deletions pkg/frontend/transport/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ type Handler struct {

// Metrics.
querySeconds *prometheus.CounterVec
querySeries *prometheus.CounterVec
queryFetchedSeries *prometheus.CounterVec
queryFetchedSamples *prometheus.CounterVec
queryScannedSamples *prometheus.CounterVec
queryPeakSamples *prometheus.HistogramVec
Expand All @@ -114,7 +114,7 @@ func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logge
Help: "Total amount of wall clock time spend processing queries.",
}, []string{"user"})

h.querySeries = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
h.queryFetchedSeries = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_query_fetched_series_total",
Help: "Number of series fetched to execute a query.",
}, []string{"user"})
Expand Down Expand Up @@ -158,7 +158,7 @@ func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logge

h.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(func(user string) {
h.querySeconds.DeleteLabelValues(user)
h.querySeries.DeleteLabelValues(user)
h.queryFetchedSeries.DeleteLabelValues(user)
h.queryFetchedSamples.DeleteLabelValues(user)
h.queryScannedSamples.DeleteLabelValues(user)
h.queryPeakSamples.DeleteLabelValues(user)
Expand Down Expand Up @@ -338,9 +338,10 @@ func (f *Handler) reportSlowQuery(r *http.Request, queryString url.Values, query
func (f *Handler) reportQueryStats(r *http.Request, userID string, queryString url.Values, queryResponseTime time.Duration, stats *querier_stats.QueryStats, error error, statusCode int, resp *http.Response) {
wallTime := stats.LoadWallTime()
queryStorageWallTime := stats.LoadQueryStorageWallTime()
numSeries := stats.LoadFetchedSeries()
numChunks := stats.LoadFetchedChunks()
numSamples := stats.LoadFetchedSamples()
numResponseSeries := stats.LoadResponseSeries()
numFetchedSeries := stats.LoadFetchedSeries()
numFetchedChunks := stats.LoadFetchedChunks()
numFetchedSamples := stats.LoadFetchedSamples()
numScannedSamples := stats.LoadScannedSamples()
numPeakSamples := stats.LoadPeakSamples()
numChunkBytes := stats.LoadFetchedChunkBytes()
Expand All @@ -353,8 +354,8 @@ func (f *Handler) reportQueryStats(r *http.Request, userID string, queryString u

// Track stats.
f.querySeconds.WithLabelValues(userID).Add(wallTime.Seconds())
f.querySeries.WithLabelValues(userID).Add(float64(numSeries))
f.queryFetchedSamples.WithLabelValues(userID).Add(float64(numSamples))
f.queryFetchedSeries.WithLabelValues(userID).Add(float64(numFetchedSeries))
f.queryFetchedSamples.WithLabelValues(userID).Add(float64(numFetchedSamples))
f.queryScannedSamples.WithLabelValues(userID).Add(float64(numScannedSamples))
f.queryPeakSamples.WithLabelValues(userID).Observe(float64(numPeakSamples))
f.queryChunkBytes.WithLabelValues(userID).Add(float64(numChunkBytes))
Expand All @@ -378,9 +379,10 @@ func (f *Handler) reportQueryStats(r *http.Request, userID string, queryString u
"path", r.URL.Path,
"response_time", queryResponseTime,
"query_wall_time_seconds", wallTime.Seconds(),
"fetched_series_count", numSeries,
"fetched_chunks_count", numChunks,
"fetched_samples_count", numSamples,
"response_series_count", numResponseSeries,
"fetched_series_count", numFetchedSeries,
"fetched_chunks_count", numFetchedChunks,
"fetched_samples_count", numFetchedSamples,
"fetched_chunks_bytes", numChunkBytes,
"fetched_data_bytes", numDataBytes,
"split_queries", splitQueries,
Expand Down
18 changes: 10 additions & 8 deletions pkg/frontend/transport/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,14 +429,15 @@ func TestReportQueryStatsFormat(t *testing.T) {

tests := map[string]testCase{
"should not include query and header details if empty": {
expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000`,
expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 response_series_count=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000`,
},
"should include query length and string at the end": {
queryString: url.Values(map[string][]string{"query": {"up"}}),
expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000 query_length=2 param_query=up`,
expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 response_series_count=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000 query_length=2 param_query=up`,
},
"should include query stats": {
queryStats: &querier_stats.QueryStats{
QueryResponseSeries: 100,
Stats: querier_stats.Stats{
WallTime: 3 * time.Second,
QueryStorageWallTime: 100 * time.Minute,
Expand All @@ -448,34 +449,35 @@ func TestReportQueryStatsFormat(t *testing.T) {
SplitQueries: 10,
},
},
expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=3 fetched_series_count=100 fetched_chunks_count=200 fetched_samples_count=300 fetched_chunks_bytes=1024 fetched_data_bytes=2048 split_queries=10 status_code=200 response_size=1000 query_storage_wall_time_seconds=6000`,
expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=3 response_series_count=100 fetched_series_count=100 fetched_chunks_count=200 fetched_samples_count=300 fetched_chunks_bytes=1024 fetched_data_bytes=2048 split_queries=10 status_code=200 response_size=1000 query_storage_wall_time_seconds=6000`,
},
"should include user agent": {
header: http.Header{"User-Agent": []string{"Grafana"}},
expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000 user_agent=Grafana`,
expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 response_series_count=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000 user_agent=Grafana`,
},
"should include response error": {
responseErr: errors.New("foo_err"),
expectedLog: `level=error msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000 error=foo_err`,
expectedLog: `level=error msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 response_series_count=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000 error=foo_err`,
},
"should include query priority": {
queryString: url.Values(map[string][]string{"query": {"up"}}),
queryStats: &querier_stats.QueryStats{
Priority: 99,
PriorityAssigned: true,
},
expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000 query_length=2 priority=99 param_query=up`,
expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 response_series_count=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000 query_length=2 priority=99 param_query=up`,
},
"should include data fetch min and max time": {
queryString: url.Values(map[string][]string{"query": {"up"}}),
queryStats: &querier_stats.QueryStats{
DataSelectMaxTime: 1704153600000,
DataSelectMinTime: 1704067200000,
},
expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000 data_select_max_time=1704153600 data_select_min_time=1704067200 query_length=2 param_query=up`,
expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 response_series_count=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000 data_select_max_time=1704153600 data_select_min_time=1704067200 query_length=2 param_query=up`,
},
"should include query stats with store gateway stats": {
queryStats: &querier_stats.QueryStats{
QueryResponseSeries: 100,
Stats: querier_stats.Stats{
WallTime: 3 * time.Second,
QueryStorageWallTime: 100 * time.Minute,
Expand All @@ -489,7 +491,7 @@ func TestReportQueryStatsFormat(t *testing.T) {
StoreGatewayTouchedPostingBytes: 200,
},
},
expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=3 fetched_series_count=100 fetched_chunks_count=200 fetched_samples_count=300 fetched_chunks_bytes=1024 fetched_data_bytes=2048 split_queries=10 status_code=200 response_size=1000 store_gateway_touched_postings_count=20 store_gateway_touched_posting_bytes=200 query_storage_wall_time_seconds=6000`,
expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=3 response_series_count=100 fetched_series_count=100 fetched_chunks_count=200 fetched_samples_count=300 fetched_chunks_bytes=1024 fetched_data_bytes=2048 split_queries=10 status_code=200 response_size=1000 store_gateway_touched_postings_count=20 store_gateway_touched_posting_bytes=200 query_storage_wall_time_seconds=6000`,
},
}

Expand Down
27 changes: 22 additions & 5 deletions pkg/querier/stats/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@ var ctxKey = contextKey(0)

type QueryStats struct {
Stats
PriorityAssigned bool
Priority int64
DataSelectMaxTime int64
DataSelectMinTime int64
m sync.Mutex
QueryResponseSeries uint64
PriorityAssigned bool
Priority int64
DataSelectMaxTime int64
DataSelectMinTime int64
m sync.Mutex
}

// ContextWithEmptyStats returns a context with empty stats.
Expand Down Expand Up @@ -65,6 +66,22 @@ func (s *QueryStats) LoadWallTime() time.Duration {
return time.Duration(atomic.LoadInt64((*int64)(&s.WallTime)))
}

func (s *QueryStats) AddResponseSeries(series uint64) {
if s == nil {
return
}

atomic.AddUint64(&s.QueryResponseSeries, series)
}

func (s *QueryStats) LoadResponseSeries() uint64 {
if s == nil {
return 0
}

return atomic.LoadUint64(&s.QueryResponseSeries)
}

func (s *QueryStats) AddFetchedSeries(series uint64) {
if s == nil {
return
Expand Down
4 changes: 4 additions & 0 deletions pkg/querier/tripperware/instantquery/instant_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/weaveworks/common/httpgrpc"
"google.golang.org/grpc/status"

"github.com/cortexproject/cortex/pkg/querier/stats"
"github.com/cortexproject/cortex/pkg/querier/tripperware"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/spanlogger"
Expand Down Expand Up @@ -176,6 +177,9 @@ func (instantQueryCodec) EncodeResponse(ctx context.Context, res tripperware.Res
return nil, httpgrpc.Errorf(http.StatusInternalServerError, "invalid response format")
}

queryStats := stats.FromContext(ctx)
tripperware.SetQueryResponseStats(a, queryStats)

b, err := json.Marshal(a)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusInternalServerError, "error encoding response: %v", err)
Expand Down
4 changes: 4 additions & 0 deletions pkg/querier/tripperware/queryrange/query_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/prometheus/common/model"
"github.com/weaveworks/common/httpgrpc"

"github.com/cortexproject/cortex/pkg/querier/stats"
"github.com/cortexproject/cortex/pkg/querier/tripperware"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/spanlogger"
Expand Down Expand Up @@ -241,6 +242,9 @@ func (prometheusCodec) EncodeResponse(ctx context.Context, res tripperware.Respo
if a != nil {
m := a.Data.Result.GetMatrix()
sp.LogFields(otlog.Int("series", len(m.GetSampleStreams())))

queryStats := stats.FromContext(ctx)
tripperware.SetQueryResponseStats(a, queryStats)
}

b, err := json.Marshal(a)
Expand Down
17 changes: 17 additions & 0 deletions pkg/querier/tripperware/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/weaveworks/common/httpgrpc"

"github.com/cortexproject/cortex/pkg/querier/stats"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util/validation"
)
Expand Down Expand Up @@ -70,3 +71,19 @@ func DoRequests(ctx context.Context, downstream Handler, reqs []Request, limits

return resps, firstErr
}

func SetQueryResponseStats(a *PrometheusResponse, queryStats *stats.QueryStats) {
if queryStats != nil {
v := a.Data.Result.GetVector()
if v != nil {
queryStats.AddResponseSeries(uint64(len(v.GetSamples())))
return
}

m := a.Data.Result.GetMatrix()
if m != nil {
queryStats.AddResponseSeries(uint64(len(m.GetSampleStreams())))
return
}
}
}

0 comments on commit 2791f3d

Please sign in to comment.