Skip to content

Commit

Permalink
MQE: Add support for more aggregation functions (#9008)
Browse files Browse the repository at this point in the history
* MQE: Add feature flag for disabling aggregation operations

* Move sum aggregation into AggregationGroup struct

* Use AggregationFunctionFactories in prep to expand

* Add MAX aggregation

* Update comment from promql issue

* Rename aggregation group

* Split out accumulatePoint

* Move error declaration

* Use emitAnnotationFunc optimisation

* Remove old comment

* Remove unnecessary resetting of values

* Finish renaming AggregationGroup

* Add support for min aggregation

* Optimisation: Check for type conflicts as we go

* Move invalidCombinationOfHistograms to common in prep for avg

* Compact resulting histogram (consistent with promql)

* Remove avg that was not ready to be committed

* Do not compact for now

* Rename aggregation

* Construct min/max accumulation point once

* Rename compensation values

* Add extra test for conflicting series

* Revert "Optimisation: Check for type conflicts as we go"

This reverts commit 6605803.

* Fix lint

* Update CHANGELOG

* Update docs

* Address review feedback
  • Loading branch information
jhesketh authored Aug 28, 2024
1 parent 1b3eb87 commit ce3b88a
Show file tree
Hide file tree
Showing 14 changed files with 460 additions and 233 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
* [CHANGE] Distributor: Replace `-distributor.retry-after-header.max-backoff-exponent` and `-distributor.retry-after-header.base-seconds` with `-distributor.retry-after-header.min-backoff` and `-distributor.retry-after-header.max-backoff` for easier configuration. #8694
* [CHANGE] Ingester: increase the default inactivity timeout of active series (`-ingester.active-series-metrics-idle-timeout`) from `10m` to `20m`. #8975
* [CHANGE] Distributor: Remove `-distributor.enable-otlp-metadata-storage` flag, which was deprecated in version 2.12. #9069
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8924 #8925 #8932 #8933 #8934 #8962 #8986 #8993 #8995 #9017 #9018
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8924 #8925 #8932 #8933 #8934 #8962 #8986 #8993 #8995 #9017 #9018 #9008
* [FEATURE] Experimental Kafka-based ingest storage. #6888 #6894 #6929 #6940 #6951 #6974 #6982 #7029 #7030 #7091 #7142 #7147 #7148 #7153 #7160 #7193 #7349 #7376 #7388 #7391 #7393 #7394 #7402 #7404 #7423 #7424 #7437 #7486 #7503 #7508 #7540 #7621 #7682 #7685 #7694 #7695 #7696 #7697 #7701 #7733 #7734 #7741 #7752 #7838 #7851 #7871 #7877 #7880 #7882 #7887 #7891 #7925 #7955 #7967 #8031 #8063 #8077 #8088 #8135 #8176 #8184 #8194 #8216 #8217 #8222 #8233 #8503 #8542 #8579 #8657 #8686 #8688 #8703 #8706 #8708 #8738 #8750 #8778 #8808 #8809 #8841 #8842 #8845 #8853 #8886 #8988
* What it is:
* When the new ingest storage architecture is enabled, distributors write incoming write requests to a Kafka-compatible backend, and the ingesters asynchronously replay ingested data from Kafka. In this architecture, the write and read path are de-coupled through a Kafka-compatible backend. The write path and Kafka load is a function of the incoming write traffic, the read path load is a function of received queries. Whatever the load on the read path, it doesn't affect the write path.
Expand Down
23 changes: 17 additions & 6 deletions cmd/mimir/config-descriptor.json
Original file line number Diff line number Diff line change
Expand Up @@ -1991,23 +1991,23 @@
"blockEntries": [
{
"kind": "field",
"name": "enable_binary_operations",
"name": "enable_aggregation_operations",
"required": false,
"desc": "Enable support for binary operations in Mimir's query engine. Only applies if the Mimir query engine is in use.",
"desc": "Enable support for aggregation operations in Mimir's query engine. Only applies if the Mimir query engine is in use.",
"fieldValue": null,
"fieldDefaultValue": true,
"fieldFlag": "querier.mimir-query-engine.enable-binary-operations",
"fieldFlag": "querier.mimir-query-engine.enable-aggregation-operations",
"fieldType": "boolean",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "enable_over_time_functions",
"name": "enable_binary_operations",
"required": false,
"desc": "Enable support for ..._over_time functions in Mimir's query engine. Only applies if the Mimir query engine is in use.",
"desc": "Enable support for binary operations in Mimir's query engine. Only applies if the Mimir query engine is in use.",
"fieldValue": null,
"fieldDefaultValue": true,
"fieldFlag": "querier.mimir-query-engine.enable-over-time-functions",
"fieldFlag": "querier.mimir-query-engine.enable-binary-operations",
"fieldType": "boolean",
"fieldCategory": "experimental"
},
Expand All @@ -2022,6 +2022,17 @@
"fieldType": "boolean",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "enable_over_time_functions",
"required": false,
"desc": "Enable support for ..._over_time functions in Mimir's query engine. Only applies if the Mimir query engine is in use.",
"fieldValue": null,
"fieldDefaultValue": true,
"fieldFlag": "querier.mimir-query-engine.enable-over-time-functions",
"fieldType": "boolean",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "enable_scalars",
Expand Down
2 changes: 2 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -1897,6 +1897,8 @@ Usage of ./cmd/mimir/mimir:
Maximum number of split (by time) or partial (by shard) queries that will be scheduled in parallel by the query-frontend for a single input query. This limit is introduced to have a fairer query scheduling and avoid a single query over a large time range saturating all available queriers. (default 14)
-querier.max-samples int
Maximum number of samples a single query can load into memory. This config option should be set on query-frontend too when query sharding is enabled. (default 50000000)
-querier.mimir-query-engine.enable-aggregation-operations
[experimental] Enable support for aggregation operations in Mimir's query engine. Only applies if the Mimir query engine is in use. (default true)
-querier.mimir-query-engine.enable-binary-operations
[experimental] Enable support for binary operations in Mimir's query engine. Only applies if the Mimir query engine is in use. (default true)
-querier.mimir-query-engine.enable-offset-modifier
Expand Down
15 changes: 10 additions & 5 deletions docs/sources/mimir/configure/configuration-parameters/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -1513,21 +1513,26 @@ store_gateway_client:
[promql_experimental_functions_enabled: <boolean> | default = false]
mimir_query_engine:
# (experimental) Enable support for aggregation operations in Mimir's query
# engine. Only applies if the Mimir query engine is in use.
# CLI flag: -querier.mimir-query-engine.enable-aggregation-operations
[enable_aggregation_operations: <boolean> | default = true]
# (experimental) Enable support for binary operations in Mimir's query engine.
# Only applies if the Mimir query engine is in use.
# CLI flag: -querier.mimir-query-engine.enable-binary-operations
[enable_binary_operations: <boolean> | default = true]
# (experimental) Enable support for ..._over_time functions in Mimir's query
# engine. Only applies if the Mimir query engine is in use.
# CLI flag: -querier.mimir-query-engine.enable-over-time-functions
[enable_over_time_functions: <boolean> | default = true]
# (experimental) Enable support for offset modifier in Mimir's query engine.
# Only applies if the Mimir query engine is in use.
# CLI flag: -querier.mimir-query-engine.enable-offset-modifier
[enable_offset_modifier: <boolean> | default = true]
# (experimental) Enable support for ..._over_time functions in Mimir's query
# engine. Only applies if the Mimir query engine is in use.
# CLI flag: -querier.mimir-query-engine.enable-over-time-functions
[enable_over_time_functions: <boolean> | default = true]
# (experimental) Enable support for scalars in Mimir's query engine. Only
# applies if the Mimir query engine is in use.
# CLI flag: -querier.mimir-query-engine.enable-scalars
Expand Down
33 changes: 33 additions & 0 deletions pkg/streamingpromql/aggregations/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// SPDX-License-Identifier: AGPL-3.0-only

package aggregations

import (
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/promql/parser"

"github.com/grafana/mimir/pkg/streamingpromql/functions"
"github.com/grafana/mimir/pkg/streamingpromql/limiting"
"github.com/grafana/mimir/pkg/streamingpromql/types"
)

// AggregationGroup accumulates series that have been grouped together and computes the output series data.
type AggregationGroup interface {
	// AccumulateSeries takes in a series as part of the group.
	// Implementations may return data's slices to the pool (see min_max.go's use of
	// PutInstantVectorSeriesData), so callers must not use data after calling this.
	AccumulateSeries(data types.InstantVectorSeriesData, steps int, start int64, interval int64, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, emitAnnotationFunc functions.EmitAnnotationFunc) error
	// ComputeOutputSeries does any final calculations and returns the grouped series data.
	ComputeOutputSeries(start int64, interval int64, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, bool, error)
}

// AggregationGroupFactory constructs an empty AggregationGroup ready to accumulate series.
type AggregationGroupFactory func() AggregationGroup

// AggregationGroupFactories maps each supported aggregation operator to a factory
// for the AggregationGroup implementation that computes it.
var AggregationGroupFactories = map[parser.ItemType]AggregationGroupFactory{
	parser.MAX: func() AggregationGroup { return NewMinMaxAggregationGroup(true) },
	parser.MIN: func() AggregationGroup { return NewMinMaxAggregationGroup(false) },
	parser.SUM: func() AggregationGroup { return &SumAggregationGroup{} },
}

// invalidCombinationOfHistograms is a sentinel value used to indicate a sample has seen
// an invalid combination of histograms and should be ignored.
//
// Invalid combinations include exponential and custom buckets, and histograms with incompatible custom buckets.
var invalidCombinationOfHistograms = &histogram.FloatHistogram{}
114 changes: 114 additions & 0 deletions pkg/streamingpromql/aggregations/min_max.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// SPDX-License-Identifier: AGPL-3.0-only
// Provenance-includes-location: https://github.com/prometheus/prometheus/blob/main/promql/engine.go
// Provenance-includes-license: Apache-2.0
// Provenance-includes-copyright: The Prometheus Authors

package aggregations

import (
"math"

"github.com/prometheus/prometheus/promql"

"github.com/grafana/mimir/pkg/streamingpromql/functions"
"github.com/grafana/mimir/pkg/streamingpromql/limiting"
"github.com/grafana/mimir/pkg/streamingpromql/types"
)

// MinMaxAggregationGroup computes either the `min` or `max` aggregation over the series
// accumulated into it. Which of the two it computes is fixed at construction time via
// the accumulatePoint function (see NewMinMaxAggregationGroup).
type MinMaxAggregationGroup struct {
	// floatValues holds the current min/max value for each output step.
	floatValues []float64
	// floatPresent records, per output step, whether any value has been accumulated yet.
	floatPresent []bool

	// accumulatePoint folds one float sample into the step at idx; it is bound to either
	// maxAccumulatePoint or minAccumulatePoint by NewMinMaxAggregationGroup.
	accumulatePoint func(idx int64, f float64)
}

// NewMinMaxAggregationGroup returns a group computing the `max` aggregation when max is
// true, and the `min` aggregation when max is false.
func NewMinMaxAggregationGroup(max bool) *MinMaxAggregationGroup {
	group := &MinMaxAggregationGroup{}

	// Bind the comparison once up front so the per-point hot path avoids re-checking max.
	if !max {
		group.accumulatePoint = group.minAccumulatePoint
		return group
	}

	group.accumulatePoint = group.maxAccumulatePoint
	return group
}

// maxAccumulatePoint records f as the value for step idx if it is the first value seen
// for that step, or if it is greater than the current maximum for that step.
func (g *MinMaxAggregationGroup) maxAccumulatePoint(idx int64, f float64) {
	// Note: the second half of the condition only runs when floatPresent[idx] is true,
	// so re-testing it (as `present && f > v`) was redundant and has been removed.
	if !g.floatPresent[idx] || f > g.floatValues[idx] {
		g.floatValues[idx] = f
		g.floatPresent[idx] = true
	}
}

// minAccumulatePoint records f as the value for step idx if it is the first value seen
// for that step, or if it is less than the current minimum for that step.
func (g *MinMaxAggregationGroup) minAccumulatePoint(idx int64, f float64) {
	// Note: the second half of the condition only runs when floatPresent[idx] is true,
	// so re-testing it (as `present && f < v`) was redundant and has been removed.
	if !g.floatPresent[idx] || f < g.floatValues[idx] {
		g.floatValues[idx] = f
		g.floatPresent[idx] = true
	}
}

// AccumulateSeries folds one input series into the group's per-step min/max values.
// It returns data's slices to the pool, so callers must not use data afterwards.
func (g *MinMaxAggregationGroup) AccumulateSeries(data types.InstantVectorSeriesData, steps int, start int64, interval int64, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ functions.EmitAnnotationFunc) error {
	// Allocate the working slices on the first series that contributes any points.
	// BUG FIX: the histogram loop below also calls accumulatePoint, so we must allocate
	// when the first series contains only histograms too — previously the check was
	// `len(data.Floats) > 0` alone, and a histogram-only first series would index the
	// nil floatValues/floatPresent slices and panic.
	if (len(data.Floats) > 0 || len(data.Histograms) > 0) && g.floatValues == nil {
		var err error
		g.floatValues, err = types.Float64SlicePool.Get(steps, memoryConsumptionTracker)
		if err != nil {
			return err
		}

		g.floatPresent, err = types.BoolSlicePool.Get(steps, memoryConsumptionTracker)
		if err != nil {
			return err
		}
		g.floatValues = g.floatValues[:steps]
		g.floatPresent = g.floatPresent[:steps]
	}

	for _, p := range data.Floats {
		// NaN values are skipped entirely rather than compared.
		if math.IsNaN(p.F) {
			continue
		}
		idx := (p.T - start) / interval
		g.accumulatePoint(idx, p.F)
	}

	// If a histogram exists max treats it as 0. We have to detect this here so that we return a 0 value instead of nothing.
	// This is consistent with prometheus but may not be the desired value: https://github.com/prometheus/prometheus/issues/14711
	for _, p := range data.Histograms {
		idx := (p.T - start) / interval
		g.accumulatePoint(idx, 0)
	}

	types.PutInstantVectorSeriesData(data, memoryConsumptionTracker)
	return nil
}

// ComputeOutputSeries builds the final output series from the accumulated per-step
// values, returns the group's working slices to their pools, and returns the result.
// The second return value is always false (no additional annotation is emitted).
func (g *MinMaxAggregationGroup) ComputeOutputSeries(start int64, interval int64, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, bool, error) {
	// First pass: count populated steps so the output slice can be sized exactly.
	pointCount := 0
	for _, present := range g.floatPresent {
		if present {
			pointCount++
		}
	}

	var points []promql.FPoint
	if pointCount > 0 {
		var err error
		points, err = types.FPointSlicePool.Get(pointCount, memoryConsumptionTracker)
		if err != nil {
			return types.InstantVectorSeriesData{}, false, err
		}

		// Second pass: emit one FPoint per populated step, reconstructing each
		// timestamp from the step index.
		for i := range g.floatPresent {
			if !g.floatPresent[i] {
				continue
			}
			points = append(points, promql.FPoint{T: start + int64(i)*interval, F: g.floatValues[i]})
		}
	}

	// The working slices are no longer needed once the output has been built.
	types.Float64SlicePool.Put(g.floatValues, memoryConsumptionTracker)
	types.BoolSlicePool.Put(g.floatPresent, memoryConsumptionTracker)

	return types.InstantVectorSeriesData{Floats: points}, false, nil
}
Loading

0 comments on commit ce3b88a

Please sign in to comment.