Skip to content

Commit

Permalink
MQE: Add support for stdvar (#9419)
Browse files Browse the repository at this point in the history
* MQE: Add support for stdvar

* Update CHANGELOG
  • Loading branch information
jhesketh authored Sep 26, 2024
1 parent 588ffbc commit b212502
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 18 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
* [CHANGE] Querier: Remove deprecated `-querier.max-query-into-future`. The feature was deprecated in Mimir 2.12. #9407
* [FEATURE] Alertmanager: Added `-alertmanager.log-parsing-label-matchers` to control logging when parsing label matchers. This flag is intended to be used with `-alertmanager.utf8-strict-mode-enabled` to validate UTF-8 strict mode is working as intended. The default value is `false`. #9173
* [FEATURE] Alertmanager: Added `-alertmanager.utf8-migration-logging-enabled` to enable logging of tenant configurations that are incompatible with UTF-8 strict mode. The default value is `false`. #9174
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8924 #8925 #8932 #8933 #8934 #8962 #8986 #8993 #8995 #9008 #9017 #9018 #9019 #9120 #9121 #9136 #9139 #9140 #9145 #9191 #9192 #9194 #9196 #9201 #9212 #9225 #9260 #9272 #9277 #9278 #9280 #9281 #9342 #9343 #9367 #9368 #9371 #9398 #9399 #9403 #9417 #9418
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8924 #8925 #8932 #8933 #8934 #8962 #8986 #8993 #8995 #9008 #9017 #9018 #9019 #9120 #9121 #9136 #9139 #9140 #9145 #9191 #9192 #9194 #9196 #9201 #9212 #9225 #9260 #9272 #9277 #9278 #9280 #9281 #9342 #9343 #9367 #9368 #9371 #9398 #9399 #9403 #9417 #9418 #9419
* [FEATURE] Experimental Kafka-based ingest storage. #6888 #6894 #6929 #6940 #6951 #6974 #6982 #7029 #7030 #7091 #7142 #7147 #7148 #7153 #7160 #7193 #7349 #7376 #7388 #7391 #7393 #7394 #7402 #7404 #7423 #7424 #7437 #7486 #7503 #7508 #7540 #7621 #7682 #7685 #7694 #7695 #7696 #7697 #7701 #7733 #7734 #7741 #7752 #7838 #7851 #7871 #7877 #7880 #7882 #7887 #7891 #7925 #7955 #7967 #8031 #8063 #8077 #8088 #8135 #8176 #8184 #8194 #8216 #8217 #8222 #8233 #8503 #8542 #8579 #8657 #8686 #8688 #8703 #8706 #8708 #8738 #8750 #8778 #8808 #8809 #8841 #8842 #8845 #8853 #8886 #8988
* What it is:
* When the new ingest storage architecture is enabled, distributors write incoming write requests to a Kafka-compatible backend, and the ingesters asynchronously replay ingested data from Kafka. In this architecture, the write and read path are de-coupled through a Kafka-compatible backend. The write path and Kafka load is a function of the incoming write traffic, the read path load is a function of received queries. Whatever the load on the read path, it doesn't affect the write path.
Expand Down
3 changes: 2 additions & 1 deletion pkg/streamingpromql/aggregations/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ var AggregationGroupFactories = map[parser.ItemType]AggregationGroupFactory{
parser.GROUP: func() AggregationGroup { return NewCountGroupAggregationGroup(false) },
parser.MAX: func() AggregationGroup { return NewMinMaxAggregationGroup(true) },
parser.MIN: func() AggregationGroup { return NewMinMaxAggregationGroup(false) },
parser.STDDEV: func() AggregationGroup { return &StddevAggregationGroup{} },
parser.STDDEV: func() AggregationGroup { return NewStddevStdvarAggregationGroup(true) },
parser.STDVAR: func() AggregationGroup { return NewStddevStdvarAggregationGroup(false) },
parser.SUM: func() AggregationGroup { return &SumAggregationGroup{} },
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,26 @@ import (
"github.com/grafana/mimir/pkg/streamingpromql/types"
)

type StddevAggregationGroup struct {
// stddev represents whether this aggregation is `stddev` (true), or `stdvar` (false)
func NewStddevStdvarAggregationGroup(stddev bool) *StddevStdvarAggregationGroup {
return &StddevStdvarAggregationGroup{stddev: stddev}
}

type StddevStdvarAggregationGroup struct {
floats []float64
floatMeans []float64

// stddev represents whether this aggregation is `stddev` (true), or `stdvar` (false)
stddev bool

// Keeps track of how many samples we have encountered thus far for the group at this point
// This is necessary to do per point (instead of just counting the input series) as a series may have
// stale or non-existent values that are not added towards the count.
// We use float64 instead of uint64 so that we can reuse the float pool and don't have to retype on each division.
groupSeriesCounts []float64
}

func (g *StddevAggregationGroup) AccumulateSeries(data types.InstantVectorSeriesData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ functions.EmitAnnotationFunc) error {
func (g *StddevStdvarAggregationGroup) AccumulateSeries(data types.InstantVectorSeriesData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ functions.EmitAnnotationFunc) error {
var err error

// Native histograms are ignored for stddev
Expand Down Expand Up @@ -63,7 +71,7 @@ func (g *StddevAggregationGroup) AccumulateSeries(data types.InstantVectorSeries
return nil
}

func (g *StddevAggregationGroup) ComputeOutputSeries(timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, bool, error) {
func (g *StddevStdvarAggregationGroup) ComputeOutputSeries(timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, bool, error) {
floatPointCount := 0
for _, sc := range g.groupSeriesCounts {
if sc > 0 {
Expand All @@ -82,7 +90,14 @@ func (g *StddevAggregationGroup) ComputeOutputSeries(timeRange types.QueryTimeRa
for i, sc := range g.groupSeriesCounts {
if sc > 0 {
t := timeRange.StartT + int64(i)*timeRange.IntervalMilliseconds
f := math.Sqrt(g.floats[i] / g.groupSeriesCounts[i])
var f float64
if g.stddev {
// stddev
f = math.Sqrt(g.floats[i] / g.groupSeriesCounts[i])
} else {
// stdvar
f = g.floats[i] / g.groupSeriesCounts[i]
}
floatPoints = append(floatPoints, promql.FPoint{T: t, F: f})
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/streamingpromql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1853,8 +1853,8 @@ func TestCompareVariousMixedMetricsAggregations(t *testing.T) {

for _, labels := range labelCombinations {
labelRegex := strings.Join(labels, "|")
// TODO(jhesketh): Add stddev back in.
// stddev is excluded until https://github.com/prometheus/prometheus/pull/14941 is merged
// TODO(jhesketh): Add stddev/stdvar back in.
// stddev/stdvar are excluded until https://github.com/prometheus/prometheus/pull/14941 is merged
// fixing an inconsistency in the Prometheus' engine where if a native histogram is the first sample
// loaded, it is incorrectly treated as a 0 float point.
for _, aggFunc := range []string{"avg", "count", "group", "min", "max", "sum"} {
Expand Down
7 changes: 7 additions & 0 deletions pkg/streamingpromql/testdata/ours-only/aggregators.test
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,11 @@ eval range from 0 to 6m step 1m stddev by (group) (series)
{group="d"} NaN 0 0 0 NaN 0 0
{group="e"} NaN 0 0 0 NaN 0 0

eval range from 0 to 6m step 1m stdvar by (group) (series)
{group="a"} 16 9 4 49 56.25 676 676
{group="b"} 0 NaN NaN 324 NaN 625 625
{group="c"} NaN 0.25 2.25 6.25 NaN 900 900
{group="d"} NaN 0 0 0 NaN 0 0
{group="e"} NaN 0 0 0 NaN 0 0

clear
17 changes: 7 additions & 10 deletions pkg/streamingpromql/testdata/upstream/aggregators.test
Original file line number Diff line number Diff line change
Expand Up @@ -206,14 +206,12 @@ eval instant at 50m stddev by (instance)(http_requests)
{instance="0"} 223.60679774998
{instance="1"} 223.60679774998

# Unsupported by streaming engine.
# eval instant at 50m stdvar(http_requests)
# {} 52500
eval instant at 50m stdvar(http_requests)
{} 52500

# Unsupported by streaming engine.
# eval instant at 50m stdvar by (instance)(http_requests)
# {instance="0"} 50000
# {instance="1"} 50000
eval instant at 50m stdvar by (instance)(http_requests)
{instance="0"} 50000
{instance="1"} 50000

# Float precision test for standard deviation and variance
clear
Expand All @@ -226,9 +224,8 @@ load 5m
# eval instant at 50m stddev(http_requests)
# {} 0.0

# Unsupported by streaming engine.
# eval instant at 50m stdvar(http_requests)
# {} 0.0
eval instant at 50m stdvar(http_requests)
{} 0.0


# Regression test for missing separator byte in labelsToGroupingKey.
Expand Down

0 comments on commit b212502

Please sign in to comment.