Skip to content

Commit

Permalink
MQE: include more information in query stats log lines (#9505)
Browse files (browse the repository at this point in the history)
* MQE: include more information in query stats log lines

* Add changelog entry

# Conflicts:
#	CHANGELOG.md
  • Loading branch information
charleskorn authored Oct 3, 2024
1 parent 204d110 commit 5855711
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 22 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* `cortex_alertmanager_alerts`
* `cortex_alertmanager_silences`
* [CHANGE] Cache: Deprecate experimental support for Redis as a cache backend. #9453
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #9367 #9368 #9398 #9399 #9403 #9417 #9418 #9419 #9420 #9482 #9504
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #9367 #9368 #9398 #9399 #9403 #9417 #9418 #9419 #9420 #9482 #9504 #9505
* [FEATURE] Query-frontend: added experimental configuration options `query-frontend.cache-errors` and `query-frontend.results-cache-ttl-for-errors` to allow non-transient responses to be cached. When set to `true` error responses from hitting limits or bad data are cached for a short TTL. #9028
* [FEATURE] gRPC: Support S2 compression. #9322
* `-alertmanager.alertmanager-client.grpc-compression=s2`
Expand Down
60 changes: 40 additions & 20 deletions pkg/streamingpromql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1137,34 +1137,54 @@ func TestMemoryConsumptionLimit_SingleQueries(t *testing.T) {
return engine, reg, span, ctx
}

assertEstimatedPeakMemoryConsumption := func(t *testing.T, reg *prometheus.Registry, span opentracing.Span, expectedMemoryConsumptionEstimate uint64) {
peakMemoryConsumptionHistogram := getHistogram(t, reg, "cortex_mimir_query_engine_estimated_query_peak_memory_consumption")
require.Equal(t, float64(expectedMemoryConsumptionEstimate), peakMemoryConsumptionHistogram.GetSampleSum())

jaegerSpan, ok := span.(*jaeger.Span)
require.True(t, ok)
require.Len(t, jaegerSpan.Logs(), 1)
traceLog := jaegerSpan.Logs()[0]
expectedFields := []otlog.Field{
otlog.String("level", "info"),
otlog.String("msg", "query stats"),
otlog.Uint64("estimatedPeakMemoryConsumption", expectedMemoryConsumptionEstimate),
}
require.Equal(t, expectedFields, traceLog.Fields)
}

start := timestamp.Time(0)
end := start.Add(4 * time.Minute)
step := time.Minute

for name, testCase := range testCases {
assertEstimatedPeakMemoryConsumption := func(t *testing.T, reg *prometheus.Registry, span opentracing.Span, expectedMemoryConsumptionEstimate uint64, queryType string) {
peakMemoryConsumptionHistogram := getHistogram(t, reg, "cortex_mimir_query_engine_estimated_query_peak_memory_consumption")
require.Equal(t, float64(expectedMemoryConsumptionEstimate), peakMemoryConsumptionHistogram.GetSampleSum())

jaegerSpan, ok := span.(*jaeger.Span)
require.True(t, ok)
require.Len(t, jaegerSpan.Logs(), 1)
traceLog := jaegerSpan.Logs()[0]
expectedFields := []otlog.Field{
otlog.String("level", "info"),
otlog.String("msg", "query stats"),
otlog.Uint64("estimatedPeakMemoryConsumption", expectedMemoryConsumptionEstimate),
otlog.String("expr", testCase.expr),
otlog.String("queryType", queryType),
}

switch queryType {
case "instant":
expectedFields = append(expectedFields,
otlog.Int64("time", start.UnixMilli()),
)
case "range":
expectedFields = append(expectedFields,
otlog.Int64("start", start.UnixMilli()),
otlog.Int64("end", end.UnixMilli()),
otlog.Int64("step", step.Milliseconds()),
)
default:
panic(fmt.Sprintf("unknown query type: %s", queryType))
}

require.Equal(t, expectedFields, traceLog.Fields)
}

t.Run(name, func(t *testing.T) {
queryTypes := map[string]func(t *testing.T) (promql.Query, *prometheus.Registry, opentracing.Span, context.Context, uint64){
"range query": func(t *testing.T) (promql.Query, *prometheus.Registry, opentracing.Span, context.Context, uint64) {
"range": func(t *testing.T) (promql.Query, *prometheus.Registry, opentracing.Span, context.Context, uint64) {
engine, reg, span, ctx := createEngine(t, testCase.rangeQueryLimit)
q, err := engine.NewRangeQuery(ctx, storage, nil, testCase.expr, start, start.Add(4*time.Minute), time.Minute)
q, err := engine.NewRangeQuery(ctx, storage, nil, testCase.expr, start, end, step)
require.NoError(t, err)
return q, reg, span, ctx, testCase.rangeQueryExpectedPeak
},
"instant query": func(t *testing.T) (promql.Query, *prometheus.Registry, opentracing.Span, context.Context, uint64) {
"instant": func(t *testing.T) (promql.Query, *prometheus.Registry, opentracing.Span, context.Context, uint64) {
engine, reg, span, ctx := createEngine(t, testCase.instantQueryLimit)
q, err := engine.NewInstantQuery(ctx, storage, nil, testCase.expr, start)
require.NoError(t, err)
Expand All @@ -1187,7 +1207,7 @@ func TestMemoryConsumptionLimit_SingleQueries(t *testing.T) {
require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(rejectedMetrics(1)), "cortex_querier_queries_rejected_total"))
}

assertEstimatedPeakMemoryConsumption(t, reg, span, expectedPeakMemoryConsumption)
assertEstimatedPeakMemoryConsumption(t, reg, span, expectedPeakMemoryConsumption, queryType)
})
}
})
Expand Down
24 changes: 23 additions & 1 deletion pkg/streamingpromql/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,29 @@ func (q *Query) Exec(ctx context.Context) *promql.Result {

defer func() {
logger := spanlogger.FromContext(ctx, q.engine.logger)
level.Info(logger).Log("msg", "query stats", "estimatedPeakMemoryConsumption", q.memoryConsumptionTracker.PeakEstimatedMemoryConsumptionBytes)
msg := make([]interface{}, 0, 2*(3+4)) // 3 fields for all query types, plus worst case of 4 fields for range queries

msg = append(msg,
"msg", "query stats",
"estimatedPeakMemoryConsumption", q.memoryConsumptionTracker.PeakEstimatedMemoryConsumptionBytes,
"expr", q.qs,
)

if q.IsInstant() {
msg = append(msg,
"queryType", "instant",
"time", q.timeRange.StartT,
)
} else {
msg = append(msg,
"queryType", "range",
"start", q.timeRange.StartT,
"end", q.timeRange.EndT,
"step", q.timeRange.IntervalMilliseconds,
)
}

level.Info(logger).Log(msg...)
q.engine.estimatedPeakMemoryConsumption.Observe(float64(q.memoryConsumptionTracker.PeakEstimatedMemoryConsumptionBytes))
}()

Expand Down

0 comments on commit 5855711

Please sign in to comment.