From 4627dd98cd5be00020ef41ce1e03d83ec85fc240 Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Thu, 12 Sep 2024 15:59:44 +1000 Subject: [PATCH] Mimir query engine: binary operations between two scalars (#9277) * Add support for binary operations between two scalars * Add changelog entry * Address PR feedback --- CHANGELOG.md | 2 +- pkg/streamingpromql/engine_test.go | 14 ++- .../scalar_scalar_binary_operation.go | 102 ++++++++++++++++++ pkg/streamingpromql/query.go | 17 ++- .../testdata/ours/binary_operators.test | 28 +++++ .../testdata/upstream/literals.test | 40 +++---- .../testdata/upstream/operators.test | 55 ++++------ 7 files changed, 197 insertions(+), 61 deletions(-) create mode 100644 pkg/streamingpromql/operators/scalar_scalar_binary_operation.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 45e9214229..140bc43028 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -33,7 +33,7 @@ * [CHANGE] Query-scheduler: Remove the experimental `-query-scheduler.use-multi-algorithm-query-queue` flag. The new multi-algorithm tree queue is always used for the scheduler. #9210 * [FEATURE] Alertmanager: Added `-alertmanager.log-parsing-label-matchers` to control logging when parsing label matchers. This flag is intended to be used with `-alertmanager.utf8-strict-mode-enabled` to validate UTF-8 strict mode is working as intended. The default value is `false`. #9173 * [FEATURE] Alertmanager: Added `-alertmanager.utf8-migration-logging-enabled` to enable logging of tenant configurations that are incompatible with UTF-8 strict mode. The default value is `false`. #9174 -* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8924 #8925 #8932 #8933 #8934 #8962 #8986 #8993 #8995 #9008 #9017 #9018 #9019 #9120 #9121 #9136 #9139 #9140 #9145 #9191 #9192 #9194 #9196 #9201 #9212 #9225 #9260 #9272 #9278 +* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8924 #8925 #8932 #8933 #8934 #8962 #8986 #8993 #8995 #9008 #9017 #9018 #9019 #9120 #9121 #9136 #9139 #9140 #9145 #9191 #9192 #9194 #9196 #9201 #9212 #9225 #9260 #9272 #9277 #9278 * [FEATURE] Experimental Kafka-based ingest storage. #6888 #6894 #6929 #6940 #6951 #6974 #6982 #7029 #7030 #7091 #7142 #7147 #7148 #7153 #7160 #7193 #7349 #7376 #7388 #7391 #7393 #7394 #7402 #7404 #7423 #7424 #7437 #7486 #7503 #7508 #7540 #7621 #7682 #7685 #7694 #7695 #7696 #7697 #7701 #7733 #7734 #7741 #7752 #7838 #7851 #7871 #7877 #7880 #7882 #7887 #7891 #7925 #7955 #7967 #8031 #8063 #8077 #8088 #8135 #8176 #8184 #8194 #8216 #8217 #8222 #8233 #8503 #8542 #8579 #8657 #8686 #8688 #8703 #8706 #8708 #8738 #8750 #8778 #8808 #8809 #8841 #8842 #8845 #8853 #8886 #8988 * What it is: * When the new ingest storage architecture is enabled, distributors write incoming write requests to a Kafka-compatible backend, and the ingesters asynchronously replay ingested data from Kafka. In this architecture, the write and read path are de-coupled through a Kafka-compatible backend. The write path and Kafka load is a function of the incoming write traffic, the read path load is a function of received queries. Whatever the load on the read path, it doesn't affect the write path. diff --git a/pkg/streamingpromql/engine_test.go b/pkg/streamingpromql/engine_test.go index 20866fa9f6..4bbe2ecdab 100644 --- a/pkg/streamingpromql/engine_test.go +++ b/pkg/streamingpromql/engine_test.go @@ -41,9 +41,8 @@ func TestUnsupportedPromQLFeatures(t *testing.T) { // The goal of this is not to list every conceivable expression that is unsupported, but to cover all the // different cases and make sure we produce a reasonable error message when these cases are encountered. unsupportedExpressions := map[string]string{ - "1 + 2": "binary expression between two scalars", - "metric{} < other_metric{}": "binary expression with '<'", - "metric{} or other_metric{}": "binary expression with many-to-many matching", + "metric{} < other_metric{}": "binary expression with '<'", + "metric{} or other_metric{}": "binary expression with many-to-many matching", "metric{} + on() group_left() other_metric{}": "binary expression with many-to-one matching", "metric{} + on() group_right() other_metric{}": "binary expression with one-to-many matching", "topk(5, metric{})": "'topk' aggregation with parameter", @@ -88,6 +87,15 @@ func TestUnsupportedPromQLFeaturesWithFeatureToggles(t *testing.T) { requireRangeQueryIsUnsupported(t, featureToggles, "metric{} + other_metric{}", "binary expressions") requireInstantQueryIsUnsupported(t, featureToggles, "metric{} + other_metric{}", "binary expressions") + + requireRangeQueryIsUnsupported(t, featureToggles, "metric{} + 1", "binary expressions") + requireInstantQueryIsUnsupported(t, featureToggles, "metric{} + 1", "binary expressions") + + requireRangeQueryIsUnsupported(t, featureToggles, "1 + metric{}", "binary expressions") + requireInstantQueryIsUnsupported(t, featureToggles, "1 + metric{}", "binary expressions") + + requireRangeQueryIsUnsupported(t, featureToggles, "2 + 1", "binary expressions") + requireInstantQueryIsUnsupported(t, featureToggles, "2 + 1", "binary expressions") }) t.Run("..._over_time functions", func(t *testing.T) { diff --git a/pkg/streamingpromql/operators/scalar_scalar_binary_operation.go b/pkg/streamingpromql/operators/scalar_scalar_binary_operation.go new file mode 100644 index 0000000000..453d1823c3 --- /dev/null +++ b/pkg/streamingpromql/operators/scalar_scalar_binary_operation.go @@ -0,0 +1,102 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package operators + +import ( + "context" + "fmt" + + "github.com/prometheus/prometheus/promql/parser" + "github.com/prometheus/prometheus/promql/parser/posrange" + + "github.com/grafana/mimir/pkg/streamingpromql/compat" + "github.com/grafana/mimir/pkg/streamingpromql/limiting" + "github.com/grafana/mimir/pkg/streamingpromql/types" +) + +type ScalarScalarBinaryOperation struct { + Left types.ScalarOperator + Right types.ScalarOperator + Op parser.ItemType + MemoryConsumptionTracker *limiting.MemoryConsumptionTracker + + opFunc binaryOperationFunc + expressionPosition posrange.PositionRange +} + +var _ types.ScalarOperator = &ScalarScalarBinaryOperation{} + +func NewScalarScalarBinaryOperation( + left, right types.ScalarOperator, + op parser.ItemType, + memoryConsumptionTracker *limiting.MemoryConsumptionTracker, + expressionPosition posrange.PositionRange, +) (*ScalarScalarBinaryOperation, error) { + f := arithmeticOperationFuncs[op] + if f == nil { + return nil, compat.NewNotSupportedError(fmt.Sprintf("binary expression with '%s'", op)) + } + + return &ScalarScalarBinaryOperation{ + Left: left, + Right: right, + Op: op, + MemoryConsumptionTracker: memoryConsumptionTracker, + + opFunc: f, + expressionPosition: expressionPosition, + }, nil +} + +func (s *ScalarScalarBinaryOperation) GetValues(ctx context.Context) (types.ScalarData, error) { + leftValues, err := s.Left.GetValues(ctx) + if err != nil { + return types.ScalarData{}, err + } + + rightValues, err := s.Right.GetValues(ctx) + if err != nil { + return types.ScalarData{}, err + } + + // Binary operations between two scalars always produce a float value, as only arithmetic operators or comparison + // operators (with the bool keyword) are supported between two scalars. + // + // Furthermore, scalar values always have a value at each step. + // + // So we can just compute the result of each pairwise operation without examining the timestamps of each sample. + // + // We store the result in the slice from the left operator, and return the right operator's slice once we're done. + for i, left := range leftValues.Samples { + right := rightValues.Samples[i] + + f, h, ok, err := s.opFunc(left.F, right.F, nil, nil) + + if err != nil { + return types.ScalarData{}, err + } + + if !ok { + panic(fmt.Sprintf("%v binary operation between two scalars (%v and %v) did not produce a result, this should never happen", s.Op.String(), left.F, right.F)) + } + + if h != nil { + panic(fmt.Sprintf("%v binary operation between two scalars (%v and %v) produced a histogram result, this should never happen", s.Op.String(), left.F, right.F)) + } + + leftValues.Samples[i].F = f + } + + types.FPointSlicePool.Put(rightValues.Samples, s.MemoryConsumptionTracker) + + return leftValues, nil +} + +func (s *ScalarScalarBinaryOperation) ExpressionPosition() posrange.PositionRange { + return s.expressionPosition +} + +func (s *ScalarScalarBinaryOperation) Close() { + s.Left.Close() + s.Right.Close() +} diff --git a/pkg/streamingpromql/query.go b/pkg/streamingpromql/query.go index c059e517a2..d2531f3896 100644 --- a/pkg/streamingpromql/query.go +++ b/pkg/streamingpromql/query.go @@ -376,7 +376,22 @@ func (q *Query) convertToScalarOperator(expr parser.Expr) (types.ScalarOperator, case *parser.ParenExpr: return q.convertToScalarOperator(e.Expr) case *parser.BinaryExpr: - return nil, compat.NewNotSupportedError("binary expression between two scalars") + if !q.engine.featureToggles.EnableBinaryOperations { + return nil, compat.NewNotSupportedError("binary expressions") + } + + lhs, err := q.convertToScalarOperator(e.LHS) + if err != nil { + return nil, err + } + + rhs, err := q.convertToScalarOperator(e.RHS) + if err != nil { + return nil, err + } + + return operators.NewScalarScalarBinaryOperation(lhs, rhs, e.Op, q.memoryConsumptionTracker, e.PositionRange()) + default: return nil, compat.NewNotSupportedError(fmt.Sprintf("PromQL expression type %T for scalars", e)) } diff --git a/pkg/streamingpromql/testdata/ours/binary_operators.test b/pkg/streamingpromql/testdata/ours/binary_operators.test index 5c1a9da70f..3a8e281ba5 100644 --- a/pkg/streamingpromql/testdata/ours/binary_operators.test +++ b/pkg/streamingpromql/testdata/ours/binary_operators.test @@ -359,4 +359,32 @@ eval range from 0m to 18m step 6m 2 * {job="bar"} eval_fail range from 0m to 24m step 6m 2 * {job="bar"} expected_fail_message vector cannot contain metrics with the same labelset +clear + +load 6m + metric 1 2 _ 3 stale 4 {{schema:3 sum:4 count:4 buckets:[1 2 1]}} 5 + +# Scalars on both sides. +eval range from 0m to 3m step 1m 1 + 2 + {} 3 3 3 3 + +eval range from 0m to 3m step 1m 1 - 2 + {} -1 -1 -1 -1 + +eval range from 0m to 3m step 1m 2 * 3 + {} 6 6 6 6 + +eval range from 0m to 3m step 1m 1 / 2 + {} 0.5 0.5 0.5 0.5 + +eval range from 0m to 3m step 1m 5 % 2 + {} 1 1 1 1 + +eval range from 0m to 3m step 1m 2 ^ 3 + {} 8 8 8 8 + +eval range from 0m to 3m step 1m 2 atan2 3 + {} 0.5880026035475675 0.5880026035475675 0.5880026035475675 0.5880026035475675 +eval range from 0m to 42m step 6m scalar(metric) + 2 + {} 3 4 NaN 5 NaN 6 NaN 7 diff --git a/pkg/streamingpromql/testdata/upstream/literals.test b/pkg/streamingpromql/testdata/upstream/literals.test index 3e6d357d4d..1638b970f2 100644 --- a/pkg/streamingpromql/testdata/upstream/literals.test +++ b/pkg/streamingpromql/testdata/upstream/literals.test @@ -12,17 +12,14 @@ eval instant at 50m 12.34e+6 eval instant at 50m 12.34e-6 0.00001234 -# Unsupported by streaming engine. -# eval instant at 50m 1+1 -# 2 +eval instant at 50m 1+1 + 2 -# Unsupported by streaming engine. -# eval instant at 50m 1-1 -# 0 +eval instant at 50m 1-1 + 0 -# Unsupported by streaming engine. -# eval instant at 50m 1 - -1 -# 2 +eval instant at 50m 1 - -1 + 2 eval instant at 50m .2 0.2 @@ -51,22 +48,17 @@ eval instant at 50m nan eval instant at 50m 2. 2 -# Unsupported by streaming engine. -# eval instant at 50m 1 / 0 -# +Inf +eval instant at 50m 1 / 0 + +Inf -# Unsupported by streaming engine. -# eval instant at 50m ((1) / (0)) -# +Inf +eval instant at 50m ((1) / (0)) + +Inf -# Unsupported by streaming engine. -# eval instant at 50m -1 / 0 -# -Inf +eval instant at 50m -1 / 0 + -Inf -# Unsupported by streaming engine. -# eval instant at 50m 0 / 0 -# NaN +eval instant at 50m 0 / 0 + NaN -# Unsupported by streaming engine. -# eval instant at 50m 1 % 0 -# NaN +eval instant at 50m 1 % 0 + NaN diff --git a/pkg/streamingpromql/testdata/upstream/operators.test b/pkg/streamingpromql/testdata/upstream/operators.test index e26093bcdc..ba62c3492f 100644 --- a/pkg/streamingpromql/testdata/upstream/operators.test +++ b/pkg/streamingpromql/testdata/upstream/operators.test @@ -42,18 +42,15 @@ eval instant at 50m - - - SUM(http_requests) BY (job) eval instant at 50m - - - 1 -1 -# Unsupported by streaming engine. -# eval instant at 50m -2^---1*3 -# -1.5 +eval instant at 50m -2^---1*3 + -1.5 -# Unsupported by streaming engine. -# eval instant at 50m 2/-2^---1*3+2 -# -10 +eval instant at 50m 2/-2^---1*3+2 + -10 -# Unsupported by streaming engine. -# eval instant at 50m -10^3 * - SUM(http_requests) BY (job) ^ -1 -# {job="api-server"} 1 -# {job="app-server"} 0.38461538461538464 +eval instant at 50m -10^3 * - SUM(http_requests) BY (job) ^ -1 + {job="api-server"} 1 + {job="app-server"} 0.38461538461538464 eval instant at 50m 1000 / SUM(http_requests) BY (job) {job="api-server"} 1 @@ -75,25 +72,21 @@ eval instant at 50m SUM(http_requests) BY (job) ^ 2 {job="api-server"} 1000000 {job="app-server"} 6760000 -# Unsupported by streaming engine. -# eval instant at 50m SUM(http_requests) BY (job) % 3 ^ 2 -# {job="api-server"} 1 -# {job="app-server"} 8 +eval instant at 50m SUM(http_requests) BY (job) % 3 ^ 2 + {job="api-server"} 1 + {job="app-server"} 8 -# Unsupported by streaming engine. -# eval instant at 50m SUM(http_requests) BY (job) % 2 ^ (3 ^ 2) -# {job="api-server"} 488 -# {job="app-server"} 40 +eval instant at 50m SUM(http_requests) BY (job) % 2 ^ (3 ^ 2) + {job="api-server"} 488 + {job="app-server"} 40 -# Unsupported by streaming engine. -# eval instant at 50m SUM(http_requests) BY (job) % 2 ^ 3 ^ 2 -# {job="api-server"} 488 -# {job="app-server"} 40 +eval instant at 50m SUM(http_requests) BY (job) % 2 ^ 3 ^ 2 + {job="api-server"} 488 + {job="app-server"} 40 -# Unsupported by streaming engine. -# eval instant at 50m SUM(http_requests) BY (job) % 2 ^ 3 ^ 2 ^ 2 -# {job="api-server"} 1000 -# {job="app-server"} 2600 +eval instant at 50m SUM(http_requests) BY (job) % 2 ^ 3 ^ 2 ^ 2 + {job="api-server"} 1000 + {job="app-server"} 2600 # Unsupported by streaming engine. # eval instant at 50m COUNT(http_requests) BY (job) ^ COUNT(http_requests) BY (job) @@ -545,10 +538,8 @@ eval instant at 5m trigy atan2 trigx eval instant at 5m trigy atan2 trigNaN {} NaN -# Unsupported by streaming engine. -# eval instant at 5m 10 atan2 20 -# 0.4636476090008061 +eval instant at 5m 10 atan2 20 + 0.4636476090008061 -# Unsupported by streaming engine. -# eval instant at 5m 10 atan2 NaN -# NaN +eval instant at 5m 10 atan2 NaN + NaN