query-frontend: format query start/end/step, log query length (#6473)
* query-frontend: log query start/end/step in milliseconds

This PR adds another layer of stats to the query-frontend. These stats contain details about the parsed query and its execution. For now, these details are only:

* start time in milliseconds
* end time in milliseconds
* step in milliseconds

These are logged in query stats with precedence over the raw values received over the API. This makes machine-parsing the values easier.

The PR also logs the time elapsed since the param_start and param_end parameters. This will help determine whether the query would hit ingesters or store-gateways. I wasn't sure whether these should be easier to read for humans or for machines; I opted for having them as human-readable Go duration strings. Loki can also parse these, but I am not sure about other logging solutions.
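
For illustration, the relevant part of a "query stats" log line could then look roughly like the excerpt below. The field names (param_start, param_end, param_step, length, time_since_min_time, time_since_max_time) are taken from the code in this PR; the values are invented, assuming a one-hour range query with a 5m range selector executed an hour after its end time. In the final code, start/end are formatted as RFC3339 while step stays in milliseconds:

param_start=2023-11-14T10:00:00Z param_end=2023-11-14T11:00:00Z param_step=60000 length=1h5m0s time_since_min_time=2h5m0s time_since_max_time=1h0m0s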

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Update CHANGELOG.md

Signed-off-by: Dimitar Dimitrov <[email protected]>

---------

Signed-off-by: Dimitar Dimitrov <[email protected]>
dimitarvdimitrov authored Nov 14, 2023
1 parent a0635de commit 5fce9e9
Showing 6 changed files with 267 additions and 26 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -67,6 +67,7 @@
* [ENHANCEMENT] Server: Add `-server.report-grpc-codes-in-instrumentation-label-enabled` CLI flag to specify whether gRPC status codes should be used in `status_code` label of `cortex_request_duration_seconds` metric. It defaults to false, meaning that successful and erroneous gRPC status codes are represented with `success` and `error` respectively. #6562
* [ENHANCEMENT] Server: Add `-ingester.client.report-grpc-codes-in-instrumentation-label-enabled` CLI flag to specify whether gRPC status codes should be used in `status_code` label of `cortex_ingester_client_request_duration_seconds` metric. It defaults to false, meaning that successful and erroneous gRPC status codes are represented with `2xx` and `error` respectively. #6562
* [ENHANCEMENT] Server: Add `-server.http-log-closed-connections-without-response-enabled` option to log details about connections to HTTP server that were closed before any data was sent back. This can happen if client doesn't manage to send complete HTTP headers before timeout. #6612
* [ENHANCEMENT] Query-frontend: include length of query, time since the earliest and latest points of a query in "query stats" logs. Time parameters (start/end/time) are always formatted as RFC3339 now. #6473
* [BUGFIX] Ring: Ensure network addresses used for component hash rings are formatted correctly when using IPv6. #6068
* [BUGFIX] Query-scheduler: don't retain connections from queriers that have shut down, leading to gradually increasing enqueue latency over time. #6100 #6145
* [BUGFIX] Ingester: prevent query logic from continuing to execute after queries are canceled. #6085
2 changes: 1 addition & 1 deletion pkg/frontend/querymiddleware/roundtrip.go
@@ -204,7 +204,7 @@ func newQueryTripperware(

queryRangeMiddleware := []Middleware{
// Track query range statistics. Added first before any subsequent middleware modifies the request.
newQueryStatsMiddleware(registerer),
newQueryStatsMiddleware(registerer, engine),
newLimitsMiddleware(limits, log),
queryBlockerMiddleware,
}
80 changes: 79 additions & 1 deletion pkg/frontend/querymiddleware/stats.go
@@ -4,21 +4,28 @@ package querymiddleware

import (
"context"
"errors"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/storage"

"github.com/grafana/mimir/pkg/querier/stats"
)

type queryStatsMiddleware struct {
engine *promql.Engine
nonAlignedQueries prometheus.Counter
regexpMatcherCount prometheus.Counter
regexpMatcherOptimizedCount prometheus.Counter
next Handler
}

func newQueryStatsMiddleware(reg prometheus.Registerer) Middleware {
func newQueryStatsMiddleware(reg prometheus.Registerer, engine *promql.Engine) Middleware {
nonAlignedQueries := promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "cortex_query_frontend_non_step_aligned_queries_total",
Help: "Total queries sent that are not step aligned.",
@@ -34,6 +41,7 @@ func newQueryStatsMiddleware(reg prometheus.Registerer) Middleware {

return MiddlewareFunc(func(next Handler) Handler {
return &queryStatsMiddleware{
engine: engine,
nonAlignedQueries: nonAlignedQueries,
regexpMatcherCount: regexpMatcherCount,
regexpMatcherOptimizedCount: regexpMatcherOptimizedCount,
@@ -62,5 +70,75 @@ func (s queryStatsMiddleware) Do(ctx context.Context, req Request) (Response, er
}
}

s.populateQueryDetails(ctx, req)

return s.next.Do(ctx, req)
}

var queryStatsErrQueryable = &storage.MockQueryable{MockQuerier: &storage.MockQuerier{SelectMockFunction: func(bool, *storage.SelectHints, ...*labels.Matcher) storage.SeriesSet {
return storage.ErrSeriesSet(errors.New("cannot use query stats queryable for running queries"))
}}}

func (s queryStatsMiddleware) populateQueryDetails(ctx context.Context, req Request) {
details := QueryDetailsFromContext(ctx)
if details == nil {
return
}
details.Start = time.UnixMilli(req.GetStart())
details.End = time.UnixMilli(req.GetEnd())
details.Step = time.Duration(req.GetStep()) * time.Millisecond

query, err := newQuery(ctx, req, s.engine, queryStatsErrQueryable)
if err != nil {
return
}
defer query.Close()

evalStmt, ok := query.Statement().(*parser.EvalStmt)
if !ok {
return
}
minT, maxT := promql.FindMinMaxTime(evalStmt)
if minT != 0 {
details.MinT = time.UnixMilli(minT)
}
if maxT != 0 {
details.MaxT = time.UnixMilli(maxT)
}
}
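
To make the MinT/MaxT computation above concrete, a rough standalone sketch (not this PR's code; it only assumes the exported promql.FindMinMaxTime function and the parser.EvalStmt fields used above, and leaves exact boundary handling such as lookback delta to Prometheus):

package main

import (
	"fmt"
	"time"

	"github.com/prometheus/prometheus/promql"
	"github.com/prometheus/prometheus/promql/parser"
)

func main() {
	// Hypothetical range query with a 5m range selector.
	expr, err := parser.ParseExpr(`rate(http_requests_total[5m])`)
	if err != nil {
		panic(err)
	}
	end := time.Now()
	start := end.Add(-1 * time.Hour)
	stmt := &parser.EvalStmt{Expr: expr, Start: start, End: end, Interval: time.Minute}

	minT, maxT := promql.FindMinMaxTime(stmt)
	// minT is roughly start minus the 5m selector range and maxT is end, so the
	// "length" logged by this PR would come out to roughly 1h5m.
	fmt.Println(time.UnixMilli(minT), time.UnixMilli(maxT))
}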

type QueryDetails struct {
QuerierStats *stats.Stats

// Start and End are the parsed start and end times of the unmodified user request.
Start, End time.Time
// MinT and MaxT are the earliest and latest points in time which the query might try to use.
// For example, they account for range selectors and @ modifiers.
// MinT and MaxT may be zero-valued if the query doesn't process samples.
MinT, MaxT time.Time
Step time.Duration
}

type contextKey int

var ctxKey = contextKey(0)

// ContextWithEmptyDetails returns a context with empty QueryDetails.
// The returned context also has querier stats.Stats injected. The stats pointer in the context
// and the stats pointer in the QueryDetails are the same.
func ContextWithEmptyDetails(ctx context.Context) (*QueryDetails, context.Context) {
stats, ctx := stats.ContextWithEmptyStats(ctx)
details := &QueryDetails{QuerierStats: stats}
ctx = context.WithValue(ctx, ctxKey, details)
return details, ctx
}

// QueryDetailsFromContext gets the QueryDetails out of the Context. Returns nil if stats have not
// been initialised in the context.
func QueryDetailsFromContext(ctx context.Context) *QueryDetails {
o := ctx.Value(ctxKey)
if o == nil {
return nil
}
return o.(*QueryDetails)
}
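
A minimal usage sketch of the two context helpers above (not part of this PR; it assumes the real import path of this package and a context that flows from the HTTP handler into the middleware chain):

package main

import (
	"context"
	"fmt"

	"github.com/grafana/mimir/pkg/frontend/querymiddleware"
)

func main() {
	// The frontend handler installs empty details (and querier stats) once per request.
	details, ctx := querymiddleware.ContextWithEmptyDetails(context.Background())

	// ... the query-stats middleware later populates Start/End/Step and MinT/MaxT ...

	// Any code holding the context reads back the same pointer.
	if d := querymiddleware.QueryDetailsFromContext(ctx); d != nil {
		// Zero-valued MinT/MaxT mean the query didn't need to process samples.
		if !d.MinT.IsZero() && !d.MaxT.IsZero() {
			fmt.Println("query length:", d.MaxT.Sub(d.MinT))
		}
	}
	_ = details
}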
31 changes: 26 additions & 5 deletions pkg/frontend/querymiddleware/stats_test.go
@@ -6,13 +6,15 @@ import (
"context"
"strings"
"testing"
"time"

"github.com/grafana/dskit/user"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

querier_stats "github.com/grafana/mimir/pkg/querier/stats"
"github.com/grafana/mimir/pkg/util"
)

@@ -21,9 +23,10 @@ func Test_queryStatsMiddleware_Do(t *testing.T) {
req Request
}
tests := []struct {
name string
args args
expectedMetrics *strings.Reader
name string
args args
expectedMetrics *strings.Reader
expectedQueryDetails QueryDetails
}{
{
name: "happy path",
@@ -47,6 +50,14 @@
# TYPE cortex_query_frontend_regexp_matcher_optimized_count counter
cortex_query_frontend_regexp_matcher_optimized_count 1
`),
expectedQueryDetails: QueryDetails{
QuerierStats: &querier_stats.Stats{},
Start: start.Truncate(time.Millisecond),
End: end.Truncate(time.Millisecond),
MinT: start.Truncate(time.Millisecond).Add(-5 * time.Minute),
MaxT: end.Truncate(time.Millisecond),
Step: step,
},
},
{
name: "parseExpr failed",
@@ -70,15 +81,25 @@
# TYPE cortex_query_frontend_regexp_matcher_optimized_count counter
cortex_query_frontend_regexp_matcher_optimized_count 0
`),
expectedQueryDetails: QueryDetails{
QuerierStats: &querier_stats.Stats{},
Start: start.Truncate(time.Millisecond),
End: end.Truncate(time.Millisecond),
MinT: time.Time{}, // empty because the query is invalid
MaxT: time.Time{}, // empty because the query is invalid
Step: step,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
reg := prometheus.NewPedanticRegistry()
mw := newQueryStatsMiddleware(reg)
_, err := mw.Wrap(mockHandlerWith(nil, nil)).Do(user.InjectOrgID(context.Background(), "test"), tt.args.req)
mw := newQueryStatsMiddleware(reg, newEngine())
actualDetails, ctx := ContextWithEmptyDetails(context.Background())
_, err := mw.Wrap(mockHandlerWith(nil, nil)).Do(user.InjectOrgID(ctx, "test"), tt.args.req)
require.NoError(t, err)
assert.NoError(t, testutil.GatherAndCompare(reg, tt.expectedMetrics))
assert.Equal(t, tt.expectedQueryDetails, *actualDetails)
})
}
}
58 changes: 45 additions & 13 deletions pkg/frontend/transport/handler.go
@@ -28,6 +28,7 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"

apierror "github.com/grafana/mimir/pkg/api/error"
"github.com/grafana/mimir/pkg/frontend/querymiddleware"
querier_stats "github.com/grafana/mimir/pkg/querier/stats"
"github.com/grafana/mimir/pkg/util"
"github.com/grafana/mimir/pkg/util/activitytracker"
@@ -164,13 +165,13 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
f.mtx.Unlock()
}()

var stats *querier_stats.Stats
var queryDetails *querymiddleware.QueryDetails

// Initialise the stats in the context and make sure it's propagated
// Initialise the queryDetails in the context and make sure it's propagated
// down the request chain.
if f.cfg.QueryStatsEnabled {
var ctx context.Context
stats, ctx = querier_stats.ContextWithEmptyStats(r.Context())
queryDetails, ctx = querymiddleware.ContextWithEmptyDetails(r.Context())
r = r.WithContext(ctx)
}

@@ -195,7 +196,7 @@

if err != nil {
writeError(w, err)
f.reportQueryStats(r, params, queryResponseTime, 0, stats, err)
f.reportQueryStats(r, params, startTime, queryResponseTime, 0, queryDetails, err)
return
}

@@ -205,30 +206,30 @@
}

if f.cfg.QueryStatsEnabled {
writeServiceTimingHeader(queryResponseTime, hs, stats)
writeServiceTimingHeader(queryResponseTime, hs, queryDetails.QuerierStats)
}

w.WriteHeader(resp.StatusCode)
// we don't check for copy error as there is not much we can do at this point
queryResponseSize, _ := io.Copy(w, resp.Body)

if f.cfg.LogQueriesLongerThan > 0 && queryResponseTime > f.cfg.LogQueriesLongerThan {
f.reportSlowQuery(r, params, queryResponseTime)
f.reportSlowQuery(r, params, queryResponseTime, queryDetails)
}
if f.cfg.QueryStatsEnabled {
f.reportQueryStats(r, params, queryResponseTime, queryResponseSize, stats, nil)
f.reportQueryStats(r, params, startTime, queryResponseTime, queryResponseSize, queryDetails, nil)
}
}

// reportSlowQuery reports slow queries.
func (f *Handler) reportSlowQuery(r *http.Request, queryString url.Values, queryResponseTime time.Duration) {
func (f *Handler) reportSlowQuery(r *http.Request, queryString url.Values, queryResponseTime time.Duration, details *querymiddleware.QueryDetails) {
logMessage := append([]interface{}{
"msg", "slow query detected",
"method", r.Method,
"host", r.Host,
"path", r.URL.Path,
"time_taken", queryResponseTime.String(),
}, formatQueryString(queryString)...)
}, formatQueryString(details, queryString)...)

if len(f.cfg.LogQueryRequestHeaders) != 0 {
logMessage = append(logMessage, formatRequestHeaders(&r.Header, f.cfg.LogQueryRequestHeaders)...)
@@ -237,12 +238,16 @@ func (f *Handler) reportSlowQuery(r *http.Request, queryString url.Values, query
level.Info(util_log.WithContext(r.Context(), f.log)).Log(logMessage...)
}

func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, queryResponseTime time.Duration, queryResponseSizeBytes int64, stats *querier_stats.Stats, queryErr error) {
func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, queryStartTime time.Time, queryResponseTime time.Duration, queryResponseSizeBytes int64, details *querymiddleware.QueryDetails, queryErr error) {
tenantIDs, err := tenant.TenantIDs(r.Context())
if err != nil {
return
}
userID := tenant.JoinTenantIDs(tenantIDs)
var stats *querier_stats.Stats
if details != nil {
stats = details.QuerierStats
}
wallTime := stats.LoadWallTime()
numSeries := stats.LoadFetchedSeries()
numBytes := stats.LoadFetchedChunkBytes()
@@ -278,8 +283,21 @@ func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, quer
"split_queries", stats.LoadSplitQueries(),
"estimated_series_count", stats.GetEstimatedSeriesCount(),
"queue_time_seconds", stats.LoadQueueTime().Seconds(),
}, formatQueryString(queryString)...)
}, formatQueryString(details, queryString)...)

if details != nil {
// MinT and MaxT may be zero when the request wasn't a query (e.g. /metadata)
// or if the query was a constant expression and didn't need to process samples.
if !details.MinT.IsZero() && !details.MaxT.IsZero() {
logMessage = append(logMessage, "length", details.MaxT.Sub(details.MinT).String())
}
if !details.MinT.IsZero() {
logMessage = append(logMessage, "time_since_min_time", queryStartTime.Sub(details.MinT))
}
if !details.MaxT.IsZero() {
logMessage = append(logMessage, "time_since_max_time", queryStartTime.Sub(details.MaxT))
}
}
if len(f.cfg.LogQueryRequestHeaders) != 0 {
logMessage = append(logMessage, formatRequestHeaders(&r.Header, f.cfg.LogQueryRequestHeaders)...)
}
@@ -303,9 +321,23 @@ func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, quer
level.Info(util_log.WithContext(r.Context(), f.log)).Log(logMessage...)
}

func formatQueryString(queryString url.Values) (fields []interface{}) {
// formatQueryString prefers printing start, end, and step from details when details is not nil.
func formatQueryString(details *querymiddleware.QueryDetails, queryString url.Values) (fields []interface{}) {
for k, v := range queryString {
fields = append(fields, fmt.Sprintf("param_%s", k), strings.Join(v, ","))
var formattedValue string
if details != nil && (k == "start" || k == "end" || k == "step" || k == "time") {
switch k {
case "start", "time":
formattedValue = details.Start.Format(time.RFC3339Nano)
case "end":
formattedValue = details.End.Format(time.RFC3339Nano)
case "step":
formattedValue = strconv.FormatInt(details.Step.Milliseconds(), 10)
}
} else {
formattedValue = strings.Join(v, ",")
}
fields = append(fields, fmt.Sprintf("param_%s", k), formattedValue)
}
return fields
}
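
A standalone illustration of the same formatting rules (it does not call the unexported formatQueryString, and its values are invented): start, end, and time are logged as RFC3339Nano timestamps taken from the parsed request, step is logged as integer milliseconds, and every other parameter is joined verbatim.

package main

import (
	"fmt"
	"strconv"
	"strings"
	"time"
)

func main() {
	start := time.UnixMilli(1700000000000) // hypothetical parsed start time
	step := 15 * time.Second
	rawQuery := []string{`sum(rate(http_requests_total[5m]))`}

	fmt.Println("param_start", start.Format(time.RFC3339Nano))
	fmt.Println("param_step", strconv.FormatInt(step.Milliseconds(), 10)) // logged as "15000"
	fmt.Println("param_query", strings.Join(rawQuery, ","))
}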
