Skip to content

Commit

Permalink
Mimir query engine: binary operations between two scalars (#9277)
Browse files Browse the repository at this point in the history
* Add support for binary operations between two scalars

* Add changelog entry

* Address PR feedback
  • Loading branch information
charleskorn authored Sep 12, 2024
1 parent 98f0544 commit 4627dd9
Show file tree
Hide file tree
Showing 7 changed files with 197 additions and 61 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
* [CHANGE] Query-scheduler: Remove the experimental `-query-scheduler.use-multi-algorithm-query-queue` flag. The new multi-algorithm tree queue is always used for the scheduler. #9210
* [FEATURE] Alertmanager: Added `-alertmanager.log-parsing-label-matchers` to control logging when parsing label matchers. This flag is intended to be used with `-alertmanager.utf8-strict-mode-enabled` to validate UTF-8 strict mode is working as intended. The default value is `false`. #9173
* [FEATURE] Alertmanager: Added `-alertmanager.utf8-migration-logging-enabled` to enable logging of tenant configurations that are incompatible with UTF-8 strict mode. The default value is `false`. #9174
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8924 #8925 #8932 #8933 #8934 #8962 #8986 #8993 #8995 #9008 #9017 #9018 #9019 #9120 #9121 #9136 #9139 #9140 #9145 #9191 #9192 #9194 #9196 #9201 #9212 #9225 #9260 #9272 #9278
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8924 #8925 #8932 #8933 #8934 #8962 #8986 #8993 #8995 #9008 #9017 #9018 #9019 #9120 #9121 #9136 #9139 #9140 #9145 #9191 #9192 #9194 #9196 #9201 #9212 #9225 #9260 #9272 #9277 #9278
* [FEATURE] Experimental Kafka-based ingest storage. #6888 #6894 #6929 #6940 #6951 #6974 #6982 #7029 #7030 #7091 #7142 #7147 #7148 #7153 #7160 #7193 #7349 #7376 #7388 #7391 #7393 #7394 #7402 #7404 #7423 #7424 #7437 #7486 #7503 #7508 #7540 #7621 #7682 #7685 #7694 #7695 #7696 #7697 #7701 #7733 #7734 #7741 #7752 #7838 #7851 #7871 #7877 #7880 #7882 #7887 #7891 #7925 #7955 #7967 #8031 #8063 #8077 #8088 #8135 #8176 #8184 #8194 #8216 #8217 #8222 #8233 #8503 #8542 #8579 #8657 #8686 #8688 #8703 #8706 #8708 #8738 #8750 #8778 #8808 #8809 #8841 #8842 #8845 #8853 #8886 #8988
* What it is:
* When the new ingest storage architecture is enabled, distributors write incoming write requests to a Kafka-compatible backend, and the ingesters asynchronously replay ingested data from Kafka. In this architecture, the write and read paths are de-coupled through a Kafka-compatible backend. The write path and Kafka load are a function of the incoming write traffic, while the read path load is a function of received queries. Whatever the load on the read path, it doesn't affect the write path.
Expand Down
14 changes: 11 additions & 3 deletions pkg/streamingpromql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,8 @@ func TestUnsupportedPromQLFeatures(t *testing.T) {
// The goal of this is not to list every conceivable expression that is unsupported, but to cover all the
// different cases and make sure we produce a reasonable error message when these cases are encountered.
unsupportedExpressions := map[string]string{
"1 + 2": "binary expression between two scalars",
"metric{} < other_metric{}": "binary expression with '<'",
"metric{} or other_metric{}": "binary expression with many-to-many matching",
"metric{} < other_metric{}": "binary expression with '<'",
"metric{} or other_metric{}": "binary expression with many-to-many matching",
"metric{} + on() group_left() other_metric{}": "binary expression with many-to-one matching",
"metric{} + on() group_right() other_metric{}": "binary expression with one-to-many matching",
"topk(5, metric{})": "'topk' aggregation with parameter",
Expand Down Expand Up @@ -88,6 +87,15 @@ func TestUnsupportedPromQLFeaturesWithFeatureToggles(t *testing.T) {

requireRangeQueryIsUnsupported(t, featureToggles, "metric{} + other_metric{}", "binary expressions")
requireInstantQueryIsUnsupported(t, featureToggles, "metric{} + other_metric{}", "binary expressions")

requireRangeQueryIsUnsupported(t, featureToggles, "metric{} + 1", "binary expressions")
requireInstantQueryIsUnsupported(t, featureToggles, "metric{} + 1", "binary expressions")

requireRangeQueryIsUnsupported(t, featureToggles, "1 + metric{}", "binary expressions")
requireInstantQueryIsUnsupported(t, featureToggles, "1 + metric{}", "binary expressions")

requireRangeQueryIsUnsupported(t, featureToggles, "2 + 1", "binary expressions")
requireInstantQueryIsUnsupported(t, featureToggles, "2 + 1", "binary expressions")
})

t.Run("..._over_time functions", func(t *testing.T) {
Expand Down
102 changes: 102 additions & 0 deletions pkg/streamingpromql/operators/scalar_scalar_binary_operation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// SPDX-License-Identifier: AGPL-3.0-only

package operators

import (
"context"
"fmt"

"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/promql/parser/posrange"

"github.com/grafana/mimir/pkg/streamingpromql/compat"
"github.com/grafana/mimir/pkg/streamingpromql/limiting"
"github.com/grafana/mimir/pkg/streamingpromql/types"
)

// ScalarScalarBinaryOperation is a scalar operator that applies a binary operation to the
// values produced by two other scalar operators.
type ScalarScalarBinaryOperation struct {
// Left and Right are the operators that produce the two operands.
Left types.ScalarOperator
Right types.ScalarOperator
// Op is the binary operator being evaluated. It is also used when formatting panic messages in GetValues.
Op parser.ItemType
// MemoryConsumptionTracker is passed to types.FPointSlicePool when GetValues releases the right operand's samples.
MemoryConsumptionTracker *limiting.MemoryConsumptionTracker

// opFunc is the function that evaluates Op for a single pair of values. It is looked up once by
// NewScalarScalarBinaryOperation.
opFunc binaryOperationFunc
// expressionPosition is the value returned by ExpressionPosition.
expressionPosition posrange.PositionRange
}

// Compile-time check that *ScalarScalarBinaryOperation implements types.ScalarOperator.
var _ types.ScalarOperator = &ScalarScalarBinaryOperation{}

// NewScalarScalarBinaryOperation creates an operator that evaluates op between the scalars
// produced by left and right.
//
// It returns a not-supported error if arithmeticOperationFuncs has no function for op.
func NewScalarScalarBinaryOperation(
	left, right types.ScalarOperator,
	op parser.ItemType,
	memoryConsumptionTracker *limiting.MemoryConsumptionTracker,
	expressionPosition posrange.PositionRange,
) (*ScalarScalarBinaryOperation, error) {
	// Resolve the evaluation function up front so GetValues doesn't need to look it up again.
	opFunc := arithmeticOperationFuncs[op]
	if opFunc == nil {
		message := fmt.Sprintf("binary expression with '%s'", op)
		return nil, compat.NewNotSupportedError(message)
	}

	operation := &ScalarScalarBinaryOperation{
		Left:                     left,
		Right:                    right,
		Op:                       op,
		MemoryConsumptionTracker: memoryConsumptionTracker,

		opFunc:             opFunc,
		expressionPosition: expressionPosition,
	}

	return operation, nil
}

// GetValues evaluates both operands and returns the result of applying the operation to each
// pair of samples.
//
// The results are written into the left operand's sample slice, which is what is returned to the
// caller. On success, the right operand's sample slice is put back into types.FPointSlicePool.
func (s *ScalarScalarBinaryOperation) GetValues(ctx context.Context) (types.ScalarData, error) {
	lhs, err := s.Left.GetValues(ctx)
	if err != nil {
		return types.ScalarData{}, err
	}

	rhs, err := s.Right.GetValues(ctx)
	if err != nil {
		return types.ScalarData{}, err
	}

	// A binary operation between two scalars always yields a float: only arithmetic operators and
	// comparison operators with the bool keyword are supported between two scalars.
	//
	// Scalars also have a value at every step, so the samples from each side can be paired up by
	// index, without looking at their timestamps.
	//
	// Each result overwrites the corresponding sample in the left operand's slice.
	for idx := range lhs.Samples {
		leftValue := lhs.Samples[idx].F
		rightValue := rhs.Samples[idx].F

		result, histogramResult, produced, err := s.opFunc(leftValue, rightValue, nil, nil)

		switch {
		case err != nil:
			return types.ScalarData{}, err
		case !produced:
			panic(fmt.Sprintf("%v binary operation between two scalars (%v and %v) did not produce a result, this should never happen", s.Op.String(), leftValue, rightValue))
		case histogramResult != nil:
			panic(fmt.Sprintf("%v binary operation between two scalars (%v and %v) produced a histogram result, this should never happen", s.Op.String(), leftValue, rightValue))
		}

		lhs.Samples[idx].F = result
	}

	// The right operand's samples are no longer needed, so hand the slice back to the pool.
	types.FPointSlicePool.Put(rhs.Samples, s.MemoryConsumptionTracker)

	return lhs, nil
}

// ExpressionPosition returns the position range that was passed to NewScalarScalarBinaryOperation.
func (s *ScalarScalarBinaryOperation) ExpressionPosition() posrange.PositionRange {
return s.expressionPosition
}

// Close closes the left operand's operator, then the right operand's operator.
func (s *ScalarScalarBinaryOperation) Close() {
s.Left.Close()
s.Right.Close()
}
17 changes: 16 additions & 1 deletion pkg/streamingpromql/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,22 @@ func (q *Query) convertToScalarOperator(expr parser.Expr) (types.ScalarOperator,
case *parser.ParenExpr:
return q.convertToScalarOperator(e.Expr)
case *parser.BinaryExpr:
return nil, compat.NewNotSupportedError("binary expression between two scalars")
if !q.engine.featureToggles.EnableBinaryOperations {
return nil, compat.NewNotSupportedError("binary expressions")
}

lhs, err := q.convertToScalarOperator(e.LHS)
if err != nil {
return nil, err
}

rhs, err := q.convertToScalarOperator(e.RHS)
if err != nil {
return nil, err
}

return operators.NewScalarScalarBinaryOperation(lhs, rhs, e.Op, q.memoryConsumptionTracker, e.PositionRange())

default:
return nil, compat.NewNotSupportedError(fmt.Sprintf("PromQL expression type %T for scalars", e))
}
Expand Down
28 changes: 28 additions & 0 deletions pkg/streamingpromql/testdata/ours/binary_operators.test
Original file line number Diff line number Diff line change
Expand Up @@ -359,4 +359,32 @@ eval range from 0m to 18m step 6m 2 * {job="bar"}
eval_fail range from 0m to 24m step 6m 2 * {job="bar"}
expected_fail_message vector cannot contain metrics with the same labelset

clear

load 6m
metric 1 2 _ 3 stale 4 {{schema:3 sum:4 count:4 buckets:[1 2 1]}} 5

# Scalars on both sides.
eval range from 0m to 3m step 1m 1 + 2
{} 3 3 3 3

eval range from 0m to 3m step 1m 1 - 2
{} -1 -1 -1 -1

eval range from 0m to 3m step 1m 2 * 3
{} 6 6 6 6

eval range from 0m to 3m step 1m 1 / 2
{} 0.5 0.5 0.5 0.5

eval range from 0m to 3m step 1m 5 % 2
{} 1 1 1 1

eval range from 0m to 3m step 1m 2 ^ 3
{} 8 8 8 8

eval range from 0m to 3m step 1m 2 atan2 3
{} 0.5880026035475675 0.5880026035475675 0.5880026035475675 0.5880026035475675

eval range from 0m to 42m step 6m scalar(metric) + 2
{} 3 4 NaN 5 NaN 6 NaN 7
40 changes: 16 additions & 24 deletions pkg/streamingpromql/testdata/upstream/literals.test
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,14 @@ eval instant at 50m 12.34e+6
eval instant at 50m 12.34e-6
0.00001234

# Unsupported by streaming engine.
# eval instant at 50m 1+1
# 2
eval instant at 50m 1+1
2

# Unsupported by streaming engine.
# eval instant at 50m 1-1
# 0
eval instant at 50m 1-1
0

# Unsupported by streaming engine.
# eval instant at 50m 1 - -1
# 2
eval instant at 50m 1 - -1
2

eval instant at 50m .2
0.2
Expand Down Expand Up @@ -51,22 +48,17 @@ eval instant at 50m nan
eval instant at 50m 2.
2

# Unsupported by streaming engine.
# eval instant at 50m 1 / 0
# +Inf
eval instant at 50m 1 / 0
+Inf

# Unsupported by streaming engine.
# eval instant at 50m ((1) / (0))
# +Inf
eval instant at 50m ((1) / (0))
+Inf

# Unsupported by streaming engine.
# eval instant at 50m -1 / 0
# -Inf
eval instant at 50m -1 / 0
-Inf

# Unsupported by streaming engine.
# eval instant at 50m 0 / 0
# NaN
eval instant at 50m 0 / 0
NaN

# Unsupported by streaming engine.
# eval instant at 50m 1 % 0
# NaN
eval instant at 50m 1 % 0
NaN
55 changes: 23 additions & 32 deletions pkg/streamingpromql/testdata/upstream/operators.test
Original file line number Diff line number Diff line change
Expand Up @@ -42,18 +42,15 @@ eval instant at 50m - - - SUM(http_requests) BY (job)
eval instant at 50m - - - 1
-1

# Unsupported by streaming engine.
# eval instant at 50m -2^---1*3
# -1.5
eval instant at 50m -2^---1*3
-1.5

# Unsupported by streaming engine.
# eval instant at 50m 2/-2^---1*3+2
# -10
eval instant at 50m 2/-2^---1*3+2
-10

# Unsupported by streaming engine.
# eval instant at 50m -10^3 * - SUM(http_requests) BY (job) ^ -1
# {job="api-server"} 1
# {job="app-server"} 0.38461538461538464
eval instant at 50m -10^3 * - SUM(http_requests) BY (job) ^ -1
{job="api-server"} 1
{job="app-server"} 0.38461538461538464

eval instant at 50m 1000 / SUM(http_requests) BY (job)
{job="api-server"} 1
Expand All @@ -75,25 +72,21 @@ eval instant at 50m SUM(http_requests) BY (job) ^ 2
{job="api-server"} 1000000
{job="app-server"} 6760000

# Unsupported by streaming engine.
# eval instant at 50m SUM(http_requests) BY (job) % 3 ^ 2
# {job="api-server"} 1
# {job="app-server"} 8
eval instant at 50m SUM(http_requests) BY (job) % 3 ^ 2
{job="api-server"} 1
{job="app-server"} 8

# Unsupported by streaming engine.
# eval instant at 50m SUM(http_requests) BY (job) % 2 ^ (3 ^ 2)
# {job="api-server"} 488
# {job="app-server"} 40
eval instant at 50m SUM(http_requests) BY (job) % 2 ^ (3 ^ 2)
{job="api-server"} 488
{job="app-server"} 40

# Unsupported by streaming engine.
# eval instant at 50m SUM(http_requests) BY (job) % 2 ^ 3 ^ 2
# {job="api-server"} 488
# {job="app-server"} 40
eval instant at 50m SUM(http_requests) BY (job) % 2 ^ 3 ^ 2
{job="api-server"} 488
{job="app-server"} 40

# Unsupported by streaming engine.
# eval instant at 50m SUM(http_requests) BY (job) % 2 ^ 3 ^ 2 ^ 2
# {job="api-server"} 1000
# {job="app-server"} 2600
eval instant at 50m SUM(http_requests) BY (job) % 2 ^ 3 ^ 2 ^ 2
{job="api-server"} 1000
{job="app-server"} 2600

# Unsupported by streaming engine.
# eval instant at 50m COUNT(http_requests) BY (job) ^ COUNT(http_requests) BY (job)
Expand Down Expand Up @@ -545,10 +538,8 @@ eval instant at 5m trigy atan2 trigx
eval instant at 5m trigy atan2 trigNaN
{} NaN

# Unsupported by streaming engine.
# eval instant at 5m 10 atan2 20
# 0.4636476090008061
eval instant at 5m 10 atan2 20
0.4636476090008061

# Unsupported by streaming engine.
# eval instant at 5m 10 atan2 NaN
# NaN
eval instant at 5m 10 atan2 NaN
NaN

0 comments on commit 4627dd9

Please sign in to comment.