MQE: add support for comparison operations (#9398)
* Add feature flag and expand unsupported features test.

* Simplify supported / unsupported assertions in tests

* Add test cases

* Implement comparison operations

* Enable upstream test cases

* Fix issue where scalar value is returned when doing a scalar / vector comparison with scalar on LHS

* Remove features that are now supported from `TestUnsupportedPromQLFeatures`

* Add test case for label manipulation when `on` or `ignoring` is used

* Add changelog entry

* Address PR feedback: update comment

Co-authored-by: Arve Knudsen <[email protected]>

---------

Co-authored-by: Arve Knudsen <[email protected]>
charleskorn and aknuds1 authored Sep 26, 2024
1 parent 3ef0ebf commit 76caff9
Showing 14 changed files with 652 additions and 91 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -36,7 +36,7 @@
* [CHANGE] Querier: Remove deprecated `-querier.max-query-into-future`. The feature was deprecated in Mimir 2.12. #9407
* [FEATURE] Alertmanager: Added `-alertmanager.log-parsing-label-matchers` to control logging when parsing label matchers. This flag is intended to be used with `-alertmanager.utf8-strict-mode-enabled` to validate UTF-8 strict mode is working as intended. The default value is `false`. #9173
* [FEATURE] Alertmanager: Added `-alertmanager.utf8-migration-logging-enabled` to enable logging of tenant configurations that are incompatible with UTF-8 strict mode. The default value is `false`. #9174
-* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8924 #8925 #8932 #8933 #8934 #8962 #8986 #8993 #8995 #9008 #9017 #9018 #9019 #9120 #9121 #9136 #9139 #9140 #9145 #9191 #9192 #9194 #9196 #9201 #9212 #9225 #9260 #9272 #9277 #9278 #9280 #9281 #9342 #9343 #9367 #9368 #9371 #9399
+* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8924 #8925 #8932 #8933 #8934 #8962 #8986 #8993 #8995 #9008 #9017 #9018 #9019 #9120 #9121 #9136 #9139 #9140 #9145 #9191 #9192 #9194 #9196 #9201 #9212 #9225 #9260 #9272 #9277 #9278 #9280 #9281 #9342 #9343 #9367 #9368 #9371 #9398 #9399
* [FEATURE] Experimental Kafka-based ingest storage. #6888 #6894 #6929 #6940 #6951 #6974 #6982 #7029 #7030 #7091 #7142 #7147 #7148 #7153 #7160 #7193 #7349 #7376 #7388 #7391 #7393 #7394 #7402 #7404 #7423 #7424 #7437 #7486 #7503 #7508 #7540 #7621 #7682 #7685 #7694 #7695 #7696 #7697 #7701 #7733 #7734 #7741 #7752 #7838 #7851 #7871 #7877 #7880 #7882 #7887 #7891 #7925 #7955 #7967 #8031 #8063 #8077 #8088 #8135 #8176 #8184 #8194 #8216 #8217 #8222 #8233 #8503 #8542 #8579 #8657 #8686 #8688 #8703 #8706 #8708 #8738 #8750 #8778 #8808 #8809 #8841 #8842 #8845 #8853 #8886 #8988
* What it is:
* When the new ingest storage architecture is enabled, distributors write incoming write requests to a Kafka-compatible backend, and the ingesters asynchronously replay ingested data from Kafka. In this architecture, the write and read path are de-coupled through a Kafka-compatible backend. The write path and Kafka load is a function of the incoming write traffic, the read path load is a function of received queries. Whatever the load on the read path, it doesn't affect the write path.
11 changes: 11 additions & 0 deletions cmd/mimir/config-descriptor.json
@@ -1978,6 +1978,17 @@
"fieldType": "boolean",
"fieldCategory": "experimental"
},
+{
+"kind": "field",
+"name": "enable_binary_comparison_operations",
+"required": false,
+"desc": "Enable support for binary comparison operations in Mimir's query engine. Only applies if the Mimir query engine is in use.",
+"fieldValue": null,
+"fieldDefaultValue": true,
+"fieldFlag": "querier.mimir-query-engine.enable-binary-comparison-operations",
+"fieldType": "boolean",
+"fieldCategory": "experimental"
+},
{
"kind": "field",
"name": "enable_scalars",
2 changes: 2 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
@@ -1921,6 +1921,8 @@ Usage of ./cmd/mimir/mimir:
Maximum number of samples a single query can load into memory. This config option should be set on query-frontend too when query sharding is enabled. (default 50000000)
-querier.mimir-query-engine.enable-aggregation-operations
[experimental] Enable support for aggregation operations in Mimir's query engine. Only applies if the Mimir query engine is in use. (default true)
-querier.mimir-query-engine.enable-binary-comparison-operations
[experimental] Enable support for binary comparison operations in Mimir's query engine. Only applies if the Mimir query engine is in use. (default true)
-querier.mimir-query-engine.enable-scalars
[experimental] Enable support for scalars in Mimir's query engine. Only applies if the Mimir query engine is in use. (default true)
-querier.minimize-ingester-requests
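The new flag defaults to `true`, so comparison support is on unless it is explicitly switched off. A minimal sketch of that behaviour using Go's standard `flag` package follows; `featureToggles`, `registerFlags`, and `parseToggles` are hypothetical stand-ins written for this illustration, not Mimir's own types, and only the flag name and default are taken from the diff.

```go
package main

import (
	"flag"
	"fmt"
)

// featureToggles is a trimmed-down, hypothetical stand-in for FeatureToggles.
type featureToggles struct {
	EnableBinaryComparisonOperations bool
}

// registerFlags registers the flag with the same name and default (true) as in the diff.
func (t *featureToggles) registerFlags(f *flag.FlagSet) {
	f.BoolVar(&t.EnableBinaryComparisonOperations,
		"querier.mimir-query-engine.enable-binary-comparison-operations",
		true,
		"Enable support for binary comparison operations in Mimir's query engine.")
}

// parseToggles is a hypothetical helper that parses args into a fresh featureToggles.
func parseToggles(args []string) (featureToggles, error) {
	var t featureToggles
	fs := flag.NewFlagSet("mimir", flag.ContinueOnError)
	t.registerFlags(fs)
	err := fs.Parse(args)
	return t, err
}

func main() {
	defaults, _ := parseToggles(nil)
	disabled, _ := parseToggles([]string{
		"-querier.mimir-query-engine.enable-binary-comparison-operations=false",
	})
	// The first value is true (the default), the second is false (explicitly disabled).
	fmt.Println(defaults.EnableBinaryComparisonOperations, disabled.EnableBinaryComparisonOperations)
}
```

Per the flag description, the setting only has an effect when the Mimir query engine is in use.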
@@ -1504,6 +1504,11 @@ mimir_query_engine:
# CLI flag: -querier.mimir-query-engine.enable-aggregation-operations
[enable_aggregation_operations: <boolean> | default = true]
+# (experimental) Enable support for binary comparison operations in Mimir's
+# query engine. Only applies if the Mimir query engine is in use.
+# CLI flag: -querier.mimir-query-engine.enable-binary-comparison-operations
+[enable_binary_comparison_operations: <boolean> | default = true]
# (experimental) Enable support for scalars in Mimir's query engine. Only
# applies if the Mimir query engine is in use.
# CLI flag: -querier.mimir-query-engine.enable-scalars
7 changes: 5 additions & 2 deletions pkg/streamingpromql/config.go
@@ -18,18 +18,21 @@ type EngineOpts struct {
}

type FeatureToggles struct {
-EnableAggregationOperations bool `yaml:"enable_aggregation_operations" category:"experimental"`
-EnableScalars bool `yaml:"enable_scalars" category:"experimental"`
+EnableAggregationOperations bool `yaml:"enable_aggregation_operations" category:"experimental"`
+EnableBinaryComparisonOperations bool `yaml:"enable_binary_comparison_operations" category:"experimental"`
+EnableScalars bool `yaml:"enable_scalars" category:"experimental"`
}

// EnableAllFeatures enables all features supported by MQE, including experimental or incomplete features.
var EnableAllFeatures = FeatureToggles{
// Note that we deliberately use a keyless literal here to force a compilation error if we don't keep this in sync with new fields added to FeatureToggles.
true,
true,
+true,
}

func (t *FeatureToggles) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&t.EnableAggregationOperations, "querier.mimir-query-engine.enable-aggregation-operations", true, "Enable support for aggregation operations in Mimir's query engine. Only applies if the Mimir query engine is in use.")
+f.BoolVar(&t.EnableBinaryComparisonOperations, "querier.mimir-query-engine.enable-binary-comparison-operations", true, "Enable support for binary comparison operations in Mimir's query engine. Only applies if the Mimir query engine is in use.")
f.BoolVar(&t.EnableScalars, "querier.mimir-query-engine.enable-scalars", true, "Enable support for scalars in Mimir's query engine. Only applies if the Mimir query engine is in use.")
}
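The comment on `EnableAllFeatures` relies on a property of Go composite literals: a keyless struct literal must supply a value for every field, whereas a keyed literal may omit fields. The sketch below illustrates the difference with a hypothetical lowercase copy of the struct; it is an illustration of the language rule, not code from the commit.

```go
package main

import "fmt"

// featureToggles is a hypothetical copy of the three fields shown in the diff.
type featureToggles struct {
	EnableAggregationOperations      bool
	EnableBinaryComparisonOperations bool
	EnableScalars                    bool
}

// Keyless literal: exactly one value per field, in declaration order.
// If a fourth field were added to featureToggles without adding a fourth
// value here, this declaration would no longer compile.
var enableAllFeatures = featureToggles{
	true,
	true,
	true,
}

// Keyed literal: omitted fields are allowed and silently take their zero
// value, so a forgotten toggle would default to false without any error.
var keyedLiteral = featureToggles{
	EnableAggregationOperations: true,
	EnableScalars:               true,
}

func main() {
	fmt.Println(enableAllFeatures.EnableBinaryComparisonOperations) // true
	fmt.Println(keyedLiteral.EnableBinaryComparisonOperations)      // false
}
```

This is why the commit adds a third `true` alongside the new field: the literal and the struct have to change together.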
56 changes: 49 additions & 7 deletions pkg/streamingpromql/engine_test.go
@@ -42,7 +42,6 @@ func TestUnsupportedPromQLFeatures(t *testing.T) {
// The goal of this is not to list every conceivable expression that is unsupported, but to cover all the
// different cases and make sure we produce a reasonable error message when these cases are encountered.
unsupportedExpressions := map[string]string{
-"metric{} < other_metric{}": "binary expression with '<'",
"metric{} or other_metric{}": "binary expression with many-to-many matching",
"metric{} + on() group_left() other_metric{}": "binary expression with many-to-one matching",
"metric{} + on() group_right() other_metric{}": "binary expression with one-to-many matching",
@@ -55,8 +54,7 @@ func TestUnsupportedPromQLFeatures(t *testing.T) {

for expression, expectedError := range unsupportedExpressions {
t.Run(expression, func(t *testing.T) {
-requireRangeQueryIsUnsupported(t, featureToggles, expression, expectedError)
-requireInstantQueryIsUnsupported(t, featureToggles, expression, expectedError)
+requireQueryIsUnsupported(t, featureToggles, expression, expectedError)
})
}

@@ -78,19 +76,43 @@ func TestUnsupportedPromQLFeaturesWithFeatureToggles(t *testing.T) {
featureToggles := EnableAllFeatures
featureToggles.EnableAggregationOperations = false

-requireRangeQueryIsUnsupported(t, featureToggles, "sum by (label) (metric)", "aggregation operations")
-requireInstantQueryIsUnsupported(t, featureToggles, "sum by (label) (metric)", "aggregation operations")
+requireQueryIsUnsupported(t, featureToggles, "sum by (label) (metric)", "aggregation operations")
})

+t.Run("binary expressions with comparison operation", func(t *testing.T) {
+featureToggles := EnableAllFeatures
+featureToggles.EnableBinaryComparisonOperations = false
+
+requireQueryIsUnsupported(t, featureToggles, "metric{} > other_metric{}", "binary expression with '>'")
+requireQueryIsUnsupported(t, featureToggles, "metric{} > 1", "binary expression with '>'")
+requireQueryIsUnsupported(t, featureToggles, "1 > metric{}", "binary expression with '>'")
+requireQueryIsUnsupported(t, featureToggles, "2 > bool 1", "binary expression with '>'")
+
+// Other operations should still be supported.
+requireQueryIsSupported(t, featureToggles, "metric{} + other_metric{}")
+requireQueryIsSupported(t, featureToggles, "metric{} + 1")
+requireQueryIsSupported(t, featureToggles, "1 + metric{}")
+requireQueryIsSupported(t, featureToggles, "2 + 1")
+})
+
t.Run("scalars", func(t *testing.T) {
featureToggles := EnableAllFeatures
featureToggles.EnableScalars = false

-requireRangeQueryIsUnsupported(t, featureToggles, "2", "scalar values")
-requireInstantQueryIsUnsupported(t, featureToggles, "2", "scalar values")
+requireQueryIsUnsupported(t, featureToggles, "2", "scalar values")
})
}

+func requireQueryIsUnsupported(t *testing.T, toggles FeatureToggles, expression string, expectedError string) {
+requireRangeQueryIsUnsupported(t, toggles, expression, expectedError)
+requireInstantQueryIsUnsupported(t, toggles, expression, expectedError)
+}
+
+func requireQueryIsSupported(t *testing.T, toggles FeatureToggles, expression string) {
+requireRangeQueryIsSupported(t, toggles, expression)
+requireInstantQueryIsSupported(t, toggles, expression)
+}
+
func requireRangeQueryIsUnsupported(t *testing.T, featureToggles FeatureToggles, expression string, expectedError string) {
opts := NewTestEngineOpts()
opts.FeatureToggles = featureToggles
@@ -117,6 +139,26 @@ func requireInstantQueryIsUnsupported(t *testing.T, featureToggles FeatureToggle
require.Nil(t, qry)
}

+func requireRangeQueryIsSupported(t *testing.T, featureToggles FeatureToggles, expression string) {
+opts := NewTestEngineOpts()
+opts.FeatureToggles = featureToggles
+engine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), stats.NewQueryMetrics(nil), log.NewNopLogger())
+require.NoError(t, err)
+
+_, err = engine.NewRangeQuery(context.Background(), nil, nil, expression, time.Now().Add(-time.Hour), time.Now(), time.Minute)
+require.NoError(t, err)
+}
+
+func requireInstantQueryIsSupported(t *testing.T, featureToggles FeatureToggles, expression string) {
+opts := NewTestEngineOpts()
+opts.FeatureToggles = featureToggles
+engine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), stats.NewQueryMetrics(nil), log.NewNopLogger())
+require.NoError(t, err)
+
+_, err = engine.NewInstantQuery(context.Background(), nil, nil, expression, time.Now())
+require.NoError(t, err)
+}
+
func TestNewRangeQuery_InvalidQueryTime(t *testing.T) {
opts := NewTestEngineOpts()
engine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), stats.NewQueryMetrics(nil), log.NewNopLogger())
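The tests above pin down the expected behaviour of the toggle: with it disabled, every comparison form (vector/vector, vector/scalar, scalar/vector, scalar/scalar) is rejected with an error containing `binary expression with '>'`, while arithmetic stays supported. The code that performs this check is not part of the hunks shown here, so the following is only a sketch of one way such a gate could look. `checkBinaryOperator`, `comparisonOperators`, and the `not supported:` prefix are all assumptions made for illustration.

```go
package main

import "fmt"

// featureToggles is a trimmed-down, hypothetical stand-in for FeatureToggles.
type featureToggles struct {
	EnableBinaryComparisonOperations bool
}

// comparisonOperators lists PromQL's six comparison operators.
var comparisonOperators = map[string]bool{
	"==": true, "!=": true, "<": true, "<=": true, ">": true, ">=": true,
}

// checkBinaryOperator is a hypothetical gate: it rejects comparison operators
// when the toggle is off and lets every other operator through.
func checkBinaryOperator(toggles featureToggles, op string) error {
	if comparisonOperators[op] && !toggles.EnableBinaryComparisonOperations {
		return fmt.Errorf("not supported: binary expression with '%s'", op)
	}
	return nil
}

func main() {
	disabled := featureToggles{EnableBinaryComparisonOperations: false}
	fmt.Println(checkBinaryOperator(disabled, ">")) // an error mentioning '>'
	fmt.Println(checkBinaryOperator(disabled, "+")) // <nil>
}
```

Because the sketch keys only on the operator, it rejects all four operand combinations exercised by the test in the same way.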
23 changes: 14 additions & 9 deletions pkg/streamingpromql/operators/scalar_scalar_binary_operation.go
@@ -32,20 +32,25 @@ func NewScalarScalarBinaryOperation(
memoryConsumptionTracker *limiting.MemoryConsumptionTracker,
expressionPosition posrange.PositionRange,
) (*ScalarScalarBinaryOperation, error) {
-f := arithmeticOperationFuncs[op]
-if f == nil {
-return nil, compat.NewNotSupportedError(fmt.Sprintf("binary expression with '%s'", op))
-}
-
-return &ScalarScalarBinaryOperation{
+s := &ScalarScalarBinaryOperation{
Left: left,
Right: right,
Op: op,
MemoryConsumptionTracker: memoryConsumptionTracker,
+expressionPosition: expressionPosition,
+}
+
+if op.IsComparisonOperator() {
+s.opFunc = boolComparisonOperationFuncs[op]
+} else {
+s.opFunc = arithmeticAndComparisonOperationFuncs[op]
+}
+
+if s.opFunc == nil {
+return nil, compat.NewNotSupportedError(fmt.Sprintf("binary expression with '%s'", op))
+}

-opFunc: f,
-expressionPosition: expressionPosition,
-}, nil
+return s, nil
}

func (s *ScalarScalarBinaryOperation) GetValues(ctx context.Context) (types.ScalarData, error) {
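In PromQL, a comparison between two scalars must use the `bool` modifier and yields 0 or 1, which is why the constructor above always picks the bool variant for comparison operators and falls back to the shared table otherwise. The sketch below mimics that selection with simplified, hypothetical tables: `opFunc` handles floats only (the real `binaryOperationFunc` also carries histograms), operators are plain strings, and only `+` and `>` are populated.

```go
package main

import "fmt"

// opFunc is a simplified stand-in for binaryOperationFunc: floats only.
// keep reports whether the result should be emitted at all.
type opFunc func(lhs, rhs float64) (value float64, keep bool)

// arithmeticAndComparisonFuncs holds arithmetic plus filter-style comparisons.
var arithmeticAndComparisonFuncs = map[string]opFunc{
	"+": func(lhs, rhs float64) (float64, bool) { return lhs + rhs, true },
	">": func(lhs, rhs float64) (float64, bool) { return lhs, lhs > rhs },
}

// boolComparisonFuncs holds comparisons that always emit 0 or 1.
var boolComparisonFuncs = map[string]opFunc{
	">": func(lhs, rhs float64) (float64, bool) {
		if lhs > rhs {
			return 1, true
		}
		return 0, true
	},
}

// isComparison is a hypothetical replacement for op.IsComparisonOperator().
func isComparison(op string) bool {
	_, ok := boolComparisonFuncs[op]
	return ok
}

// scalarScalarFunc selects the function the way the constructor in the diff does.
func scalarScalarFunc(op string) (opFunc, error) {
	var f opFunc
	if isComparison(op) {
		f = boolComparisonFuncs[op]
	} else {
		f = arithmeticAndComparisonFuncs[op]
	}
	if f == nil {
		return nil, fmt.Errorf("binary expression with '%s'", op)
	}
	return f, nil
}

func main() {
	gt, _ := scalarScalarFunc(">")
	v, _ := gt(2, 1)
	fmt.Println(v) // 1, as for `2 > bool 1`

	add, _ := scalarScalarFunc("+")
	v, _ = add(2, 1)
	fmt.Println(v) // 3

	_, err := scalarScalarFunc("%")
	fmt.Println(err) // "%" is absent from this sketch's tables
}
```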
40 changes: 31 additions & 9 deletions pkg/streamingpromql/operators/vector_scalar_binary_operation.go
@@ -24,6 +24,7 @@ type VectorScalarBinaryOperation struct {
Vector types.InstantVectorOperator
ScalarIsLeftSide bool
Op parser.ItemType
+ReturnBool bool
MemoryConsumptionTracker *limiting.MemoryConsumptionTracker

timeRange types.QueryTimeRange
@@ -42,12 +43,20 @@ func NewVectorScalarBinaryOperation(
vector types.InstantVectorOperator,
scalarIsLeftSide bool,
op parser.ItemType,
+returnBool bool,
timeRange types.QueryTimeRange,
memoryConsumptionTracker *limiting.MemoryConsumptionTracker,
annotations *annotations.Annotations,
expressionPosition posrange.PositionRange,
) (*VectorScalarBinaryOperation, error) {
-f := arithmeticOperationFuncs[op]
+var f binaryOperationFunc
+
+if returnBool {
+f = boolComparisonOperationFuncs[op]
+} else {
+f = arithmeticAndComparisonOperationFuncs[op]
+}
+
if f == nil {
return nil, compat.NewNotSupportedError(fmt.Sprintf("binary expression with '%s'", op))
}
@@ -57,6 +66,7 @@ func NewVectorScalarBinaryOperation(
Vector: vector,
ScalarIsLeftSide: scalarIsLeftSide,
Op: op,
+ReturnBool: returnBool,
MemoryConsumptionTracker: memoryConsumptionTracker,

timeRange: timeRange,
@@ -67,13 +77,23 @@ func NewVectorScalarBinaryOperation(
annotations.Add(generator("", expressionPosition))
}

-if b.ScalarIsLeftSide {
+if !b.ScalarIsLeftSide {
b.opFunc = func(scalar float64, vectorF float64, vectorH *histogram.FloatHistogram) (float64, *histogram.FloatHistogram, bool, error) {
-return f(scalar, vectorF, nil, vectorH)
+return f(vectorF, scalar, vectorH, nil)
}
+} else if op.IsComparisonOperator() && !returnBool {
+b.opFunc = func(scalar float64, vectorF float64, vectorH *histogram.FloatHistogram) (float64, *histogram.FloatHistogram, bool, error) {
+_, _, ok, err := f(scalar, vectorF, nil, vectorH)
+
+// We always want to return the value from the vector when we're doing a filter-style comparison.
+//
+// We deliberately ignore the histogram value as we need to treat it as if it were a float with value 0,
+// pending the resolution of the discussion in https://github.com/prometheus/prometheus/issues/13934#issuecomment-2372947976.
+return vectorF, nil, ok, err
+}
} else {
b.opFunc = func(scalar float64, vectorF float64, vectorH *histogram.FloatHistogram) (float64, *histogram.FloatHistogram, bool, error) {
-return f(vectorF, scalar, vectorH, nil)
+return f(scalar, vectorF, nil, vectorH)
}
}

@@ -93,11 +113,13 @@ func (v *VectorScalarBinaryOperation) SeriesMetadata(ctx context.Context) ([]typ
return nil, err
}

-// We don't need to do deduplication and merging of series in this operator: we expect that this operator
-// is wrapped in a DeduplicateAndMerge.
-metadata, err = functions.DropSeriesName(metadata, v.MemoryConsumptionTracker)
-if err != nil {
-return nil, err
+if !v.Op.IsComparisonOperator() || v.ReturnBool {
+// We don't need to do deduplication and merging of series in this operator: we expect that this operator
+// is wrapped in a DeduplicateAndMerge.
+metadata, err = functions.DropSeriesName(metadata, v.MemoryConsumptionTracker)
+if err != nil {
+return nil, err
+}
}

return metadata, nil
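The three branches of the constructor and the `SeriesMetadata` change can be summarised in a simplified sketch. It assumes float-only samples and uses hypothetical names (`binaryFunc`, `filterGreater`, `boolGreater`, `newVectorScalarFunc`, `dropsSeriesName`); the real operator also threads histograms and errors through. The middle branch corresponds to the fix named in the commit message: for a filter-style comparison with the scalar on the left, the sample from the vector is returned rather than the scalar.

```go
package main

import "fmt"

// binaryFunc is a simplified, float-only stand-in for binaryOperationFunc.
type binaryFunc func(lhs, rhs float64) (value float64, keep bool)

// filterGreater keeps the left-hand value when lhs > rhs (filter semantics).
func filterGreater(lhs, rhs float64) (float64, bool) { return lhs, lhs > rhs }

// boolGreater always emits a sample: 1 when lhs > rhs, 0 otherwise.
func boolGreater(lhs, rhs float64) (float64, bool) {
	if lhs > rhs {
		return 1, true
	}
	return 0, true
}

// newVectorScalarFunc mirrors the three branches in the diff.
func newVectorScalarFunc(f binaryFunc, scalarIsLeftSide, isComparison, returnBool bool) func(scalar, vectorF float64) (float64, bool) {
	switch {
	case !scalarIsLeftSide:
		// vector OP scalar: the vector sample is the left operand.
		return func(scalar, vectorF float64) (float64, bool) { return f(vectorF, scalar) }
	case isComparison && !returnBool:
		// scalar OP vector as a filter: evaluate the comparison, but emit
		// the sample from the vector instead of the scalar.
		return func(scalar, vectorF float64) (float64, bool) {
			_, ok := f(scalar, vectorF)
			return vectorF, ok
		}
	default:
		// scalar OP vector for arithmetic or bool comparisons.
		return func(scalar, vectorF float64) (float64, bool) { return f(scalar, vectorF) }
	}
}

// dropsSeriesName mirrors the SeriesMetadata condition: filter-style
// comparisons keep the metric name, everything else drops it.
func dropsSeriesName(isComparison, returnBool bool) bool {
	return !isComparison || returnBool
}

func main() {
	// `5 > metric` with a sample value of 3: kept, and the value stays 3.
	fmt.Println(newVectorScalarFunc(filterGreater, true, true, false)(5, 3))
	// `metric > 5` with a sample value of 3: filtered out.
	fmt.Println(newVectorScalarFunc(filterGreater, false, true, false)(5, 3))
	// `5 > bool metric` with a sample value of 3: emits 1.
	fmt.Println(newVectorScalarFunc(boolGreater, true, true, true)(5, 3))
}
```

Without the middle branch, `5 > metric` would emit 5 for every matching series, because the filter function returns its left-hand operand.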