Skip to content

Commit

Permalink
Query Frontend: Expose samples_processed in Server-Timing header (#10103
Browse files Browse the repository at this point in the history
)

* Query Frontend: Expose total_samples in Server-Timing header

* Update Changelog

* Update integration test

* Refactor total_samples to samples_processed

* Added stats testing

* Reviewed stats_renderer testing

* Reset model.NameValidationScheme to model.LegacyValidation as this was affecting other tests and Mimir doesn't support Prometheus' UTF-8 metric/label name scheme yet.

* Reorganize imports

* Remove unnecessary comment
  • Loading branch information
tinitiuset authored Dec 19, 2024
1 parent 45cd8ed commit a53f79d
Show file tree
Hide file tree
Showing 12 changed files with 282 additions and 29 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

* [CHANGE] Distributor: OTLP and push handler replace all non-UTF8 characters with the unicode replacement character `\uFFFD` in error messages before propagating them. #10236
* [CHANGE] Querier: pass query matchers to queryable `IsApplicable` hook. #10256
* [ENHANCEMENT] Query Frontend: Return server-side `samples_processed` statistics. #10103
* [ENHANCEMENT] Distributor: OTLP receiver now converts also metric metadata. See also https://github.com/prometheus/prometheus/pull/15416. #10168
* [ENHANCEMENT] Distributor: discard float and histogram samples with duplicated timestamps from each timeseries in a request before the request is forwarded to ingesters. Discarded samples are tracked by the `cortex_discarded_samples_total` metrics with reason `sample_duplicate_timestamp`. #10145
* [ENHANCEMENT] Ruler: Add `cortex_prometheus_rule_group_last_rule_duration_sum_seconds` metric to track the total evaluation duration of a rule group regardless of concurrency #10189
Expand Down
2 changes: 1 addition & 1 deletion integration/query_frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) {
if userID == 0 && cfg.queryStatsEnabled {
res, _, err := c.QueryRaw("{instance=~\"hello.*\"}")
require.NoError(t, err)
require.Regexp(t, "querier_wall_time;dur=[0-9.]*, response_time;dur=[0-9.]*, bytes_processed;val=[0-9.]*$", res.Header.Values("Server-Timing")[0])
require.Regexp(t, "querier_wall_time;dur=[0-9.]*, response_time;dur=[0-9.]*, bytes_processed;val=[0-9.]*, samples_processed;val=[0-9.]*$", res.Header.Values("Server-Timing")[0])
}

// Beyond the range of -querier.query-ingesters-within should return nothing. No need to repeat it for each user.
Expand Down
2 changes: 1 addition & 1 deletion pkg/api/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ func NewQuerierHandler(
// This is used for the stats API which we should not support. Or find other ways to.
prometheus.GathererFunc(func() ([]*dto.MetricFamily, error) { return nil, nil }),
reg,
nil,
querier.StatsRenderer,
remoteWriteEnabled,
nil,
otlpEnabled,
Expand Down
2 changes: 2 additions & 0 deletions pkg/frontend/transport/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,7 @@ func (f *Handler) reportQueryStats(
"estimated_series_count", stats.GetEstimatedSeriesCount(),
"queue_time_seconds", stats.LoadQueueTime().Seconds(),
"encode_time_seconds", stats.LoadEncodeTime().Seconds(),
"samples_processed", stats.LoadSamplesProcessed(),
}, formatQueryString(details, queryString)...)

if details != nil {
Expand Down Expand Up @@ -485,6 +486,7 @@ func writeServiceTimingHeader(queryResponseTime time.Duration, headers http.Head
parts = append(parts, statsValue("querier_wall_time", stats.LoadWallTime()))
parts = append(parts, statsValue("response_time", queryResponseTime))
parts = append(parts, statsValue("bytes_processed", stats.LoadFetchedChunkBytes()+stats.LoadFetchedIndexBytes()))
parts = append(parts, statsValue("samples_processed", stats.GetSamplesProcessed()))
headers.Set(ServiceTimingHeaderName, strings.Join(parts, ", "))
}
}
Expand Down
1 change: 1 addition & 0 deletions pkg/frontend/transport/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
expectedReadConsistency: "",
assertHeaders: func(t *testing.T, headers http.Header) {
assert.Contains(t, headers.Get(ServiceTimingHeaderName), "bytes_processed;val=0")
assert.Contains(t, headers.Get(ServiceTimingHeaderName), "samples_processed;val=0")
},
},
} {
Expand Down
4 changes: 4 additions & 0 deletions pkg/querier/remote_read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ type mockQuerier struct {
selectFn func(ctx context.Context, sorted bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet
}

// Close releases querier resources. The mock holds none, so it always
// returns nil.
func (m mockQuerier) Close() error {
	return nil
}

func (m mockQuerier) Select(ctx context.Context, sorted bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
if m.selectFn != nil {
return m.selectFn(ctx, sorted, hints, matchers...)
Expand Down
17 changes: 17 additions & 0 deletions pkg/querier/stats/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,22 @@ func (s *Stats) LoadEncodeTime() time.Duration {
return time.Duration(atomic.LoadInt64((*int64)(&s.EncodeTime)))
}

// AddSamplesProcessed atomically adds c to the running count of samples
// processed while evaluating the query. It is a no-op on a nil receiver, so
// callers don't need to check whether stats tracking is enabled.
func (s *Stats) AddSamplesProcessed(c uint64) {
	if s == nil {
		return
	}

	atomic.AddUint64(&s.SamplesProcessed, c)
}

// LoadSamplesProcessed atomically reads the count of samples processed so
// far. It returns 0 on a nil receiver.
func (s *Stats) LoadSamplesProcessed() uint64 {
	if s == nil {
		return 0
	}

	return atomic.LoadUint64(&s.SamplesProcessed)
}

// Merge the provided Stats into this one.
func (s *Stats) Merge(other *Stats) {
if s == nil || other == nil {
Expand All @@ -219,6 +235,7 @@ func (s *Stats) Merge(other *Stats) {
s.AddEstimatedSeriesCount(other.LoadEstimatedSeriesCount())
s.AddQueueTime(other.LoadQueueTime())
s.AddEncodeTime(other.LoadEncodeTime())
s.AddSamplesProcessed(other.LoadSamplesProcessed())
}

// Copy returns a copy of the stats. Use this rather than regular struct assignment
Expand Down
97 changes: 70 additions & 27 deletions pkg/querier/stats/stats.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions pkg/querier/stats/stats.proto
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,6 @@ message Stats {
google.protobuf.Duration queue_time = 9 [(gogoproto.stdduration) = true, (gogoproto.nullable) = false];
// The time spent at the frontend encoding the query's final results. Does not include time spent serializing results at the querier.
google.protobuf.Duration encode_time = 10 [(gogoproto.stdduration) = true, (gogoproto.nullable) = false];
// SamplesProcessed represents the total number of samples scanned while evaluating a query.
uint64 samples_processed = 11;
}
22 changes: 22 additions & 0 deletions pkg/querier/stats/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,23 @@ func TestStats_QueueTime(t *testing.T) {
})
}

// TestStats_SamplesProcessed verifies that AddSamplesProcessed accumulates
// counts, and that both Add and Load are safe no-ops on a nil *Stats.
func TestStats_SamplesProcessed(t *testing.T) {
	t.Run("add and load samples processed", func(t *testing.T) {
		stats, _ := ContextWithEmptyStats(context.Background())
		for _, n := range []uint64{10, 20} {
			stats.AddSamplesProcessed(n)
		}

		assert.Equal(t, uint64(30), stats.LoadSamplesProcessed())
	})

	t.Run("add and load samples processed nil receiver", func(t *testing.T) {
		var nilStats *Stats
		nilStats.AddSamplesProcessed(10)

		assert.Equal(t, uint64(0), nilStats.LoadSamplesProcessed())
	})
}

func TestStats_Merge(t *testing.T) {
t.Run("merge two stats objects", func(t *testing.T) {
stats1 := &Stats{}
Expand All @@ -142,6 +159,7 @@ func TestStats_Merge(t *testing.T) {
stats1.AddShardedQueries(20)
stats1.AddSplitQueries(10)
stats1.AddQueueTime(5 * time.Second)
stats1.AddSamplesProcessed(10)

stats2 := &Stats{}
stats2.AddWallTime(time.Second)
Expand All @@ -151,6 +169,7 @@ func TestStats_Merge(t *testing.T) {
stats2.AddShardedQueries(21)
stats2.AddSplitQueries(11)
stats2.AddQueueTime(10 * time.Second)
stats2.AddSamplesProcessed(20)

stats1.Merge(stats2)

Expand All @@ -161,6 +180,7 @@ func TestStats_Merge(t *testing.T) {
assert.Equal(t, uint32(41), stats1.LoadShardedQueries())
assert.Equal(t, uint32(21), stats1.LoadSplitQueries())
assert.Equal(t, 15*time.Second, stats1.LoadQueueTime())
assert.Equal(t, uint64(30), stats1.LoadSamplesProcessed())
})

t.Run("merge two nil stats objects", func(t *testing.T) {
Expand All @@ -176,6 +196,7 @@ func TestStats_Merge(t *testing.T) {
assert.Equal(t, uint32(0), stats1.LoadShardedQueries())
assert.Equal(t, uint32(0), stats1.LoadSplitQueries())
assert.Equal(t, time.Duration(0), stats1.LoadQueueTime())
assert.Equal(t, uint64(0), stats1.LoadSamplesProcessed())
})
}

Expand All @@ -190,6 +211,7 @@ func TestStats_Copy(t *testing.T) {
FetchedIndexBytes: 7,
EstimatedSeriesCount: 8,
QueueTime: 9,
SamplesProcessed: 10,
}
s2 := s1.Copy()
assert.NotSame(t, s1, s2)
Expand Down
20 changes: 20 additions & 0 deletions pkg/querier/stats_renderer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// SPDX-License-Identifier: AGPL-3.0-only

package querier

import (
"context"

promql_stats "github.com/prometheus/prometheus/util/stats"
prom_api "github.com/prometheus/prometheus/web/api/v1"

"github.com/grafana/mimir/pkg/querier/stats"
)

// StatsRenderer records the query's total processed samples into Mimir's
// per-query stats (taken from the request context), then delegates to the
// upstream Prometheus renderer to produce the stats payload for the response.
func StatsRenderer(ctx context.Context, s *promql_stats.Statistics, param string) promql_stats.QueryStats {
	if queryStats := stats.FromContext(ctx); queryStats != nil && s != nil {
		queryStats.AddSamplesProcessed(uint64(s.Samples.TotalSamples))
	}
	return prom_api.DefaultStatsRenderer(ctx, s, param)
}
Loading

0 comments on commit a53f79d

Please sign in to comment.