Skip to content

Commit

Permalink
MQE: Add tests for various mixed data and edge cases (#9281)
Browse files Browse the repository at this point in the history
* Move RequireEqualResults and export for other tests to use

* Expand RequireEqualResults to check histogram floats

* Add test to check various edge cases

* Return nil values where promql does

* Add some extra time combinations

* Add test for combinations

* Fix comment after reordering

* Test different size buckets

* Simplify span matching

* Return nil when query has no results

* Simplify combinations helper

* Move MetricsNames to types package

* Also check range vectors and rate function

* Temporarily work around different annotations

* Update CHANGELOG

* Fix linting

* Add a few extra combinations

* stash

* Address review feedback

* Create testutils package

* Actually add files

* Move metric names back

* Add comment
  • Loading branch information
jhesketh authored Sep 20, 2024
1 parent 940c696 commit aa08b49
Show file tree
Hide file tree
Showing 6 changed files with 390 additions and 78 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
* [CHANGE] Distributor: reject incoming requests until the distributor service has started. #9317
* [FEATURE] Alertmanager: Added `-alertmanager.log-parsing-label-matchers` to control logging when parsing label matchers. This flag is intended to be used with `-alertmanager.utf8-strict-mode-enabled` to validate UTF-8 strict mode is working as intended. The default value is `false`. #9173
* [FEATURE] Alertmanager: Added `-alertmanager.utf8-migration-logging-enabled` to enable logging of tenant configurations that are incompatible with UTF-8 strict mode. The default value is `false`. #9174
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8924 #8925 #8932 #8933 #8934 #8962 #8986 #8993 #8995 #9008 #9017 #9018 #9019 #9120 #9121 #9136 #9139 #9140 #9145 #9191 #9192 #9194 #9196 #9201 #9212 #9225 #9260 #9272 #9277 #9278 #9280
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8924 #8925 #8932 #8933 #8934 #8962 #8986 #8993 #8995 #9008 #9017 #9018 #9019 #9120 #9121 #9136 #9139 #9140 #9145 #9191 #9192 #9194 #9196 #9201 #9212 #9225 #9260 #9272 #9277 #9278 #9280 #9281
* [FEATURE] Experimental Kafka-based ingest storage. #6888 #6894 #6929 #6940 #6951 #6974 #6982 #7029 #7030 #7091 #7142 #7147 #7148 #7153 #7160 #7193 #7349 #7376 #7388 #7391 #7393 #7394 #7402 #7404 #7423 #7424 #7437 #7486 #7503 #7508 #7540 #7621 #7682 #7685 #7694 #7695 #7696 #7697 #7701 #7733 #7734 #7741 #7752 #7838 #7851 #7871 #7877 #7880 #7882 #7887 #7891 #7925 #7955 #7967 #8031 #8063 #8077 #8088 #8135 #8176 #8184 #8194 #8216 #8217 #8222 #8233 #8503 #8542 #8579 #8657 #8686 #8688 #8703 #8706 #8708 #8738 #8750 #8778 #8808 #8809 #8841 #8842 #8845 #8853 #8886 #8988
* What it is:
* When the new ingest storage architecture is enabled, distributors write incoming write requests to a Kafka-compatible backend, and the ingesters asynchronously replay ingested data from Kafka. In this architecture, the write and read path are de-coupled through a Kafka-compatible backend. The write path and Kafka load is a function of the incoming write traffic, the read path load is a function of received queries. Whatever the load on the read path, it doesn't affect the write path.
Expand Down
79 changes: 3 additions & 76 deletions pkg/streamingpromql/benchmarks/comparison_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"context"
"math"
"os"
"slices"
"testing"
"time"

Expand All @@ -23,7 +22,6 @@ import (
"github.com/grafana/dskit/user"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/storage"
"github.com/stretchr/testify/require"

Expand All @@ -33,6 +31,7 @@ import (
"github.com/grafana/mimir/pkg/querier"
"github.com/grafana/mimir/pkg/querier/stats"
"github.com/grafana/mimir/pkg/streamingpromql"
"github.com/grafana/mimir/pkg/streamingpromql/testutils"
"github.com/grafana/mimir/pkg/util/validation"
)

Expand Down Expand Up @@ -68,7 +67,7 @@ func BenchmarkQuery(b *testing.B) {
prometheusResult, prometheusClose := c.Run(ctx, b, start, end, interval, prometheusEngine, q)
mimirResult, mimirClose := c.Run(ctx, b, start, end, interval, mimirEngine, q)

requireEqualResults(b, c.Expr, prometheusResult, mimirResult)
testutils.RequireEqualResults(b, c.Expr, prometheusResult, mimirResult, true)

prometheusClose()
mimirClose()
Expand Down Expand Up @@ -109,7 +108,7 @@ func TestBothEnginesReturnSameResultsForBenchmarkQueries(t *testing.T) {
prometheusResult, prometheusClose := c.Run(ctx, t, start, end, interval, prometheusEngine, q)
mimirResult, mimirClose := c.Run(ctx, t, start, end, interval, mimirEngine, q)

requireEqualResults(t, c.Expr, prometheusResult, mimirResult)
testutils.RequireEqualResults(t, c.Expr, prometheusResult, mimirResult, true)

prometheusClose()
mimirClose()
Expand Down Expand Up @@ -175,72 +174,6 @@ func TestBenchmarkSetup(t *testing.T) {
require.Equal(t, 18.4, series.Histograms[0].H.Sum)
}

// Why do we do this rather than require.Equal(t, expected, actual)?
// It's possible that floating point values are slightly different due to imprecision, but require.Equal doesn't allow us to set an allowable difference.
func requireEqualResults(t testing.TB, expr string, expected, actual *promql.Result) {
require.Equal(t, expected.Err, actual.Err)
require.Equal(t, expected.Value.Type(), actual.Value.Type())

expectedWarnings, expectedInfos := expected.Warnings.AsStrings(expr, 0, 0)
actualWarnings, actualInfos := actual.Warnings.AsStrings(expr, 0, 0)
require.ElementsMatch(t, expectedWarnings, actualWarnings)
require.ElementsMatch(t, expectedInfos, actualInfos)

switch expected.Value.Type() {
case parser.ValueTypeVector:
expectedVector, err := expected.Vector()
require.NoError(t, err)
actualVector, err := actual.Vector()
require.NoError(t, err)

// Instant queries don't guarantee any particular sort order, so sort results here so that we can easily compare them.
sortVector(expectedVector)
sortVector(actualVector)

require.Len(t, actualVector, len(expectedVector))

for i, expectedSample := range expectedVector {
actualSample := actualVector[i]

require.Equal(t, expectedSample.Metric, actualSample.Metric)
require.Equal(t, expectedSample.T, actualSample.T)
require.Equal(t, expectedSample.H, actualSample.H)
if expectedSample.F == 0 {
require.Equal(t, expectedSample.F, actualSample.F)
} else {
require.InEpsilon(t, expectedSample.F, actualSample.F, 1e-10)
}
}
case parser.ValueTypeMatrix:
expectedMatrix, err := expected.Matrix()
require.NoError(t, err)
actualMatrix, err := actual.Matrix()
require.NoError(t, err)

require.Len(t, actualMatrix, len(expectedMatrix))

for i, expectedSeries := range expectedMatrix {
actualSeries := actualMatrix[i]

require.Equal(t, expectedSeries.Metric, actualSeries.Metric)
require.Equal(t, expectedSeries.Histograms, actualSeries.Histograms)

for j, expectedPoint := range expectedSeries.Floats {
actualPoint := actualSeries.Floats[j]

require.Equal(t, expectedPoint.T, actualPoint.T)
if expectedPoint.F == 0 {
require.Equal(t, expectedPoint.F, actualPoint.F)
} else {
require.InEpsilonf(t, expectedPoint.F, actualPoint.F, 1e-10, "expected series %v to have points %v, but result is %v", expectedSeries.Metric.String(), expectedSeries.Floats, actualSeries.Floats)
}
}
}
default:
require.Fail(t, "unexpected value type", "type: %v", expected.Value.Type())
}
}

func createBenchmarkQueryable(t testing.TB, metricSizes []int) storage.Queryable {
addr := os.Getenv("MIMIR_PROMQL_ENGINE_BENCHMARK_INGESTER_ADDR")

Expand Down Expand Up @@ -317,9 +250,3 @@ type alwaysQueryIngestersConfigProvider struct{}
func (a alwaysQueryIngestersConfigProvider) QueryIngestersWithin(string) time.Duration {
return time.Duration(math.MaxInt64)
}

func sortVector(v promql.Vector) {
slices.SortFunc(v, func(a, b promql.Sample) int {
return labels.Compare(a.Metric, b.Metric)
})
}
154 changes: 154 additions & 0 deletions pkg/streamingpromql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (

"github.com/grafana/mimir/pkg/querier/stats"
"github.com/grafana/mimir/pkg/streamingpromql/compat"
"github.com/grafana/mimir/pkg/streamingpromql/testutils"
"github.com/grafana/mimir/pkg/streamingpromql/types"
"github.com/grafana/mimir/pkg/util/globalerror"
)
Expand Down Expand Up @@ -1555,6 +1556,26 @@ func TestAnnotations(t *testing.T) {
`PromQL warning: vector contains histograms with incompatible custom buckets for metric name "metric" (1:6)`,
},
},
"rate() over metric without counter suffix with single float or histogram in range": {
data: `
series 3 1 {{schema:3 sum:12 count:7 buckets:[2 2 3]}}
`,
expr: "rate(series[45s])",
expectedWarningAnnotations: []string{},
expectedInfoAnnotations: []string{},
// This can be removed once https://github.com/prometheus/prometheus/pull/14910 is vendored.
skipComparisonWithPrometheusReason: "Prometheus only considers the type of the last point in the vector selector rather than the output value",
},
"rate() over one point in range": {
data: `
series 1
`,
expr: "rate(series[1m])",
expectedWarningAnnotations: []string{},
expectedInfoAnnotations: []string{},
// This can be removed once https://github.com/prometheus/prometheus/pull/14910 is vendored.
skipComparisonWithPrometheusReason: "Prometheus only considers the type of the last point in the vector selector rather than the output value",
},

"sum_over_time() over series with both floats and histograms": {
data: `some_metric 10 {{schema:0 sum:1 count:1 buckets:[1]}}`,
Expand Down Expand Up @@ -1682,3 +1703,136 @@ func TestAnnotations(t *testing.T) {
})
}
}

func TestCompareVariousMixedMetrics(t *testing.T) {
// Although most tests are covered with the promql test files (both ours and upstream),
// there is a lot of repetition around a few edge cases.
// This is not intended to be comprehensive, but instead check for some common edge cases
// ensuring MQE and Prometheus' engines return the same result when querying:
// - Series with mixed floats and histograms
// - Aggregations with mixed data types
// - Points with NaN
// - Stale markers
// - Look backs

opts := NewTestEngineOpts()
mimirEngine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), stats.NewQueryMetrics(nil), log.NewNopLogger())
require.NoError(t, err)

prometheusEngine := promql.NewEngine(opts.CommonOpts)

// We're loading series with the follow combinations of values. This is difficult to visually see in the actual
// data loaded, so it is represented in a table here.
// f = float value, h = native histogram, _ = no value, N = NaN, s = stale
// {a} f f f f f f
// {b} h h h h h h
// {c} f h f h N h
// {d} f _ _ s f f
// {e} h h _ s h N
// {f} f N _ f f N
// {g} N N N N N N
// {h} N N N _ N s
// {i} f h _ N h s
// {j} f f s s s s
// {k} 0 0 0 N s 0
// {l} h _ f _ s N
// {m} s s N _ _ f
// {n} _ _ _ _ _ _

pointsPerSeries := 6
samples := `
series{label="a", group="a"} 1 2 3 4 5 -50
series{label="b", group="a"} {{schema:1 sum:15 count:10 buckets:[3 2 5 7 9]}} {{schema:2 sum:20 count:15 buckets:[4]}} {{schema:3 sum:25 count:20 buckets:[5 8]}} {{schema:4 sum:30 count:25 buckets:[6 9 10 11]}} {{schema:5 sum:35 count:30 buckets:[7 10 13]}} {{schema:6 sum:40 count:35 buckets:[8 11 14]}}
series{label="c", group="a"} 1 {{schema:3 sum:5 count:3 buckets:[1 1 1]}} 3 {{schema:3 sum:10 count:6 buckets:[2 2 2]}} NaN {{schema:3 sum:12 count:7 buckets:[2 2 3]}}
series{label="d", group="a"} 1 _ _ stale 5 6
series{label="e", group="b"} {{schema:4 sum:12 count:8 buckets:[2 3 3]}} {{schema:4 sum:14 count:9 buckets:[3 3 3]}} _ stale {{schema:4 sum:18 count:11 buckets:[4 4 3]}} NaN
series{label="f", group="b"} 1 NaN _ 4 5 NaN
series{label="g", group="b"} NaN NaN NaN NaN NaN NaN
series{label="h", group="b"} NaN NaN NaN _ NaN stale
series{label="i", group="c"} 1 {{schema:5 sum:15 count:10 buckets:[3 2 5]}} _ NaN {{schema:2 sum:30 count:25 buckets:[6 9 10 9 1]}} stale
series{label="j", group="c"} 1 -20 stale stale stale stale
series{label="k", group="c"} 0 0 0 NaN stale 0
series{label="l", group="d"} {{schema:1 sum:10 count:5 buckets:[1 2]}} _ 3 _ stale NaN
series{label="m", group="d"} stale stale NaN _ _ 4
series{label="n", group="d"}
`

// Labels for generating combinations
labels := []string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k"}

// Generate combinations of 2 and 3 labels. (e.g., "a,b", "e,f", "c,d,e" etc)
// These will be used for binary operations, so we can add up to 3 series.
labelCombinations := testutils.Combinations(labels, 2)
labelCombinations = append(labelCombinations, testutils.Combinations(labels, 3)...)

expressions := []string{}

// Binary operations
for _, labels := range labelCombinations {
if len(labels) >= 2 {
for _, op := range []string{"+", "-", "*", "/"} {
binaryExpr := fmt.Sprintf(`series{label="%s"}`, labels[0])
for _, label := range labels[1:] {
binaryExpr += fmt.Sprintf(` %s series{label="%s"}`, op, label)
}
expressions = append(expressions, binaryExpr)
}
}
}

// For aggregations, also add combinations of 4 labels. (e.g., "a,b,c,d", "c,d,e,f" etc)
labelCombinations = append(labelCombinations, testutils.Combinations(labels, 4)...)

for _, labels := range labelCombinations {
labelRegex := strings.Join(labels, "|")
// Aggregations
for _, aggFunc := range []string{"sum", "avg", "min", "max"} {
expressions = append(expressions, fmt.Sprintf(`%s(series{label=~"(%s)"})`, aggFunc, labelRegex))
expressions = append(expressions, fmt.Sprintf(`%s by (group) (series{label=~"(%s)"})`, aggFunc, labelRegex))
expressions = append(expressions, fmt.Sprintf(`%s without (group) (series{label=~"(%s)"})`, aggFunc, labelRegex))
}
// Multiple range-vector times are used to check lookbacks that only select single points, multiple points, and boundaries.
expressions = append(expressions, fmt.Sprintf(`rate(series{label=~"(%s)"}[45s])`, labelRegex))
expressions = append(expressions, fmt.Sprintf(`rate(series{label=~"(%s)"}[1m])`, labelRegex))
expressions = append(expressions, fmt.Sprintf(`avg(rate(series{label=~"(%s)"}[2m15s]))`, labelRegex))
expressions = append(expressions, fmt.Sprintf(`avg(rate(series{label=~"(%s)"}[5m]))`, labelRegex))
}

timeRanges := []struct {
loadStep int
interval time.Duration
}{
{loadStep: 1, interval: 1 * time.Minute},
{loadStep: 1, interval: 6 * time.Minute},
{loadStep: 1, interval: 5 * time.Minute},
{loadStep: 6, interval: 6 * time.Minute},
{loadStep: 6, interval: 5 * time.Minute},
}

for _, tr := range timeRanges {
start := timestamp.Time(0)
end := start.Add(time.Duration(pointsPerSeries*tr.loadStep) * time.Minute) // Deliberately queries 1 step past the final loaded point

storage := promqltest.LoadedStorage(t, fmt.Sprintf("load %dm", tr.loadStep)+samples)
t.Cleanup(func() { require.NoError(t, storage.Close()) })

for _, expr := range expressions {
testName := fmt.Sprintf("Expr: %s, Start: %d, End: %d, Interval: %s", expr, start.Unix(), end.Unix(), tr.interval)
t.Run(testName, func(t *testing.T) {
q, err := prometheusEngine.NewRangeQuery(context.Background(), storage, nil, expr, start, end, tr.interval)
require.NoError(t, err)
defer q.Close()
expectedResults := q.Exec(context.Background())

q, err = mimirEngine.NewRangeQuery(context.Background(), storage, nil, expr, start, end, tr.interval)
require.NoError(t, err)
defer q.Close()
mimirResults := q.Exec(context.Background())

// We currently omit checking the annotations due to a difference between the engines.
// This can be re-enabled once https://github.com/prometheus/prometheus/pull/14910 is vendored.
testutils.RequireEqualResults(t, expr, expectedResults, mimirResults, false)
})
}
}
}
9 changes: 8 additions & 1 deletion pkg/streamingpromql/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,10 @@ func (q *Query) Exec(ctx context.Context) *promql.Result {
return &promql.Result{Err: compat.NewNotSupportedError(fmt.Sprintf("unsupported result type %s", parser.DocumentedType(q.statement.Expr.Type())))}
}

q.result.Warnings = *q.annotations
// To make comparing to Prometheus' engine easier, only return the annotations if there are some, otherwise, return nil.
if len(*q.annotations) > 0 {
q.result.Warnings = *q.annotations
}

return q.result
}
Expand Down Expand Up @@ -571,6 +574,10 @@ func (q *Query) populateMatrixFromInstantVectorOperator(ctx context.Context, o t
})
}

if len(m) == 0 {
return nil, nil
}

slices.SortFunc(m, func(a, b promql.Series) int {
return labels.Compare(a.Metric, b.Metric)
})
Expand Down
Loading

0 comments on commit aa08b49

Please sign in to comment.