diff --git a/CHANGELOG.md b/CHANGELOG.md index 201fff5c40e..6966665ee82 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,7 +14,7 @@ * `cortex_alertmanager_alerts` * `cortex_alertmanager_silences` * [CHANGE] Distributor: Drop experimental `-distributor.direct-otlp-translation-enabled` flag, since direct OTLP translation is well tested at this point. #9647 -* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #9367 #9368 #9398 #9399 #9403 #9417 #9418 #9419 #9420 #9482 #9504 #9505 #9507 #9518 #9531 #9532 #9533 #9553 #9558 #9588 #9589 #9639 #9641 #9642 +* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #9367 #9368 #9398 #9399 #9403 #9417 #9418 #9419 #9420 #9482 #9504 #9505 #9507 #9518 #9531 #9532 #9533 #9553 #9558 #9588 #9589 #9639 #9641 #9642 #9664 * [FEATURE] Query-frontend: added experimental configuration options `query-frontend.cache-errors` and `query-frontend.results-cache-ttl-for-errors` to allow non-transient responses to be cached. When set to `true` error responses from hitting limits or bad data are cached for a short TTL. #9028 * [FEATURE] gRPC: Support S2 compression. #9322 * `-alertmanager.alertmanager-client.grpc-compression=s2` diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json index 3ffd8bf20cf..c9c0754f665 100644 --- a/cmd/mimir/config-descriptor.json +++ b/cmd/mimir/config-descriptor.json @@ -2032,6 +2032,17 @@ "fieldFlag": "querier.mimir-query-engine.enable-scalars", "fieldType": "boolean", "fieldCategory": "experimental" + }, + { + "kind": "field", + "name": "enable_subqueries", + "required": false, + "desc": "Enable support for subqueries in Mimir's query engine. Only applies if the Mimir query engine is in use.", + "fieldValue": null, + "fieldDefaultValue": true, + "fieldFlag": "querier.mimir-query-engine.enable-subqueries", + "fieldType": "boolean", + "fieldCategory": "experimental" } ], "fieldValue": null, diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl index f9c2835e18b..6417172c9f9 100644 --- a/cmd/mimir/help-all.txt.tmpl +++ b/cmd/mimir/help-all.txt.tmpl @@ -1957,6 +1957,8 @@ Usage of ./cmd/mimir/mimir: [experimental] Enable support for binary comparison operations between two scalars in Mimir's query engine. Only applies if the Mimir query engine is in use. (default true) -querier.mimir-query-engine.enable-scalars [experimental] Enable support for scalars in Mimir's query engine. Only applies if the Mimir query engine is in use. (default true) + -querier.mimir-query-engine.enable-subqueries + [experimental] Enable support for subqueries in Mimir's query engine. Only applies if the Mimir query engine is in use. (default true) -querier.mimir-query-engine.enable-vector-scalar-binary-comparison-operations [experimental] Enable support for binary comparison operations between a vector and a scalar in Mimir's query engine. Only applies if the Mimir query engine is in use. (default true) -querier.mimir-query-engine.enable-vector-vector-binary-comparison-operations diff --git a/docs/sources/mimir/configure/configuration-parameters/index.md b/docs/sources/mimir/configure/configuration-parameters/index.md index baaa03e4d2a..5da9d5db8f8 100644 --- a/docs/sources/mimir/configure/configuration-parameters/index.md +++ b/docs/sources/mimir/configure/configuration-parameters/index.md @@ -1534,6 +1534,11 @@ mimir_query_engine: # applies if the Mimir query engine is in use. # CLI flag: -querier.mimir-query-engine.enable-scalars [enable_scalars: | default = true] + + # (experimental) Enable support for subqueries in Mimir's query engine. Only + # applies if the Mimir query engine is in use. + # CLI flag: -querier.mimir-query-engine.enable-subqueries + [enable_subqueries: | default = true] ``` ### frontend diff --git a/pkg/streamingpromql/benchmarks/benchmarks.go b/pkg/streamingpromql/benchmarks/benchmarks.go index 96a5022aca2..61888124340 100644 --- a/pkg/streamingpromql/benchmarks/benchmarks.go +++ b/pkg/streamingpromql/benchmarks/benchmarks.go @@ -130,6 +130,16 @@ func TestCases(metricSizes []int) []BenchCase { //{ // Expr: "absent_over_time(a_X[1d])", //}, + // Subqueries. + { + Expr: "sum_over_time(a_X[10m:3m])", + }, + { + Expr: "sum_over_time(nh_X[10m:3m])", + }, + { + Expr: "sum(sum_over_time(a_X[10m:3m]))", + }, //// Unary operators. //{ // Expr: "-a_X", diff --git a/pkg/streamingpromql/config.go b/pkg/streamingpromql/config.go index 9a994eb3352..f9689de27cb 100644 --- a/pkg/streamingpromql/config.go +++ b/pkg/streamingpromql/config.go @@ -24,6 +24,7 @@ type FeatureToggles struct { EnableScalarScalarBinaryComparisonOperations bool `yaml:"enable_scalar_scalar_binary_comparison_operations" category:"experimental"` EnableBinaryLogicalOperations bool `yaml:"enable_binary_logical_operations" category:"experimental"` EnableScalars bool `yaml:"enable_scalars" category:"experimental"` + EnableSubqueries bool `yaml:"enable_subqueries" category:"experimental"` } // EnableAllFeatures enables all features supported by MQE, including experimental or incomplete features. @@ -35,6 +36,7 @@ var EnableAllFeatures = FeatureToggles{ true, true, true, + true, } func (t *FeatureToggles) RegisterFlags(f *flag.FlagSet) { @@ -44,4 +46,5 @@ func (t *FeatureToggles) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&t.EnableScalarScalarBinaryComparisonOperations, "querier.mimir-query-engine.enable-scalar-scalar-binary-comparison-operations", true, "Enable support for binary comparison operations between two scalars in Mimir's query engine. Only applies if the Mimir query engine is in use.") f.BoolVar(&t.EnableBinaryLogicalOperations, "querier.mimir-query-engine.enable-binary-logical-operations", true, "Enable support for binary logical operations in Mimir's query engine. Only applies if the Mimir query engine is in use.") f.BoolVar(&t.EnableScalars, "querier.mimir-query-engine.enable-scalars", true, "Enable support for scalars in Mimir's query engine. Only applies if the Mimir query engine is in use.") + f.BoolVar(&t.EnableSubqueries, "querier.mimir-query-engine.enable-subqueries", true, "Enable support for subqueries in Mimir's query engine. Only applies if the Mimir query engine is in use.") } diff --git a/pkg/streamingpromql/engine.go b/pkg/streamingpromql/engine.go index bf31ade83c4..23425acdf12 100644 --- a/pkg/streamingpromql/engine.go +++ b/pkg/streamingpromql/engine.go @@ -45,11 +45,12 @@ func NewEngine(opts EngineOpts, limitsProvider QueryLimitsProvider, metrics *sta } return &Engine{ - lookbackDelta: lookbackDelta, - timeout: opts.CommonOpts.Timeout, - limitsProvider: limitsProvider, - activeQueryTracker: opts.CommonOpts.ActiveQueryTracker, - featureToggles: opts.FeatureToggles, + lookbackDelta: lookbackDelta, + timeout: opts.CommonOpts.Timeout, + limitsProvider: limitsProvider, + activeQueryTracker: opts.CommonOpts.ActiveQueryTracker, + featureToggles: opts.FeatureToggles, + noStepSubqueryIntervalFn: opts.CommonOpts.NoStepSubqueryIntervalFn, logger: logger, estimatedPeakMemoryConsumption: promauto.With(opts.CommonOpts.Reg).NewHistogram(prometheus.HistogramOpts{ @@ -64,11 +65,12 @@ func NewEngine(opts EngineOpts, limitsProvider QueryLimitsProvider, metrics *sta } type Engine struct { - lookbackDelta time.Duration - timeout time.Duration - limitsProvider QueryLimitsProvider - activeQueryTracker promql.QueryTracker - featureToggles FeatureToggles + lookbackDelta time.Duration + timeout time.Duration + limitsProvider QueryLimitsProvider + activeQueryTracker promql.QueryTracker + featureToggles FeatureToggles + noStepSubqueryIntervalFn func(rangeMillis int64) int64 logger log.Logger estimatedPeakMemoryConsumption prometheus.Histogram diff --git a/pkg/streamingpromql/engine_test.go b/pkg/streamingpromql/engine_test.go index 9cc0d81d646..8ad786e5896 100644 --- a/pkg/streamingpromql/engine_test.go +++ b/pkg/streamingpromql/engine_test.go @@ -1,4 +1,7 @@ // SPDX-License-Identifier: AGPL-3.0-only +// Provenance-includes-location: https://github.com/prometheus/prometheus/tree/main/promql/engine_test.go +// Provenance-includes-license: Apache-2.0 +// Provenance-includes-copyright: The Prometheus Authors package streamingpromql @@ -23,6 +26,7 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/timestamp" "github.com/prometheus/prometheus/promql" + "github.com/prometheus/prometheus/promql/parser/posrange" "github.com/prometheus/prometheus/promql/promqltest" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/util/annotations" @@ -46,7 +50,6 @@ func TestUnsupportedPromQLFeatures(t *testing.T) { "metric{} + on() group_right() other_metric{}": "binary expression with one-to-many matching", "topk(5, metric{})": "'topk' aggregation with parameter", `count_values("foo", metric{})`: "'count_values' aggregation with parameter", - "rate(metric{}[5m:1m])": "PromQL expression type *parser.SubqueryExpr for range vectors", "quantile_over_time(0.4, metric{}[5m])": "'quantile_over_time' function", "quantile(0.95, metric{})": "'quantile' aggregation with parameter", } @@ -56,17 +59,6 @@ func TestUnsupportedPromQLFeatures(t *testing.T) { requireQueryIsUnsupported(t, featureToggles, expression, expectedError) }) } - - // These expressions are also unsupported, but are only valid as instant queries. - unsupportedInstantQueryExpressions := map[string]string{ - "metric{}[5m:1m]": "PromQL expression type *parser.SubqueryExpr for range vectors", - } - - for expression, expectedError := range unsupportedInstantQueryExpressions { - t.Run(expression, func(t *testing.T) { - requireInstantQueryIsUnsupported(t, featureToggles, expression, expectedError) - }) - } } func TestUnsupportedPromQLFeaturesWithFeatureToggles(t *testing.T) { @@ -153,6 +145,13 @@ func TestUnsupportedPromQLFeaturesWithFeatureToggles(t *testing.T) { requireQueryIsUnsupported(t, featureToggles, "2", "scalar values") }) + + t.Run("subqueries", func(t *testing.T) { + featureToggles := EnableAllFeatures + featureToggles.EnableSubqueries = false + + requireQueryIsUnsupported(t, featureToggles, "sum_over_time(metric[1m:10s])", "subquery") + }) } func requireQueryIsUnsupported(t *testing.T, toggles FeatureToggles, expression string, expectedError string) { @@ -861,6 +860,333 @@ func TestRangeVectorSelectors(t *testing.T) { } } +func TestSubqueries(t *testing.T) { + // This test is based on Prometheus' TestSubquerySelector. + data := `load 10s + metric{type="floats"} 1 2 + metric{type="histograms"} {{count:1}} {{count:2}} + http_requests{job="api-server", instance="0", group="production"} 0+10x1000 100+30x1000 + http_requests{job="api-server", instance="1", group="production"} 0+20x1000 200+30x1000 + http_requests{job="api-server", instance="0", group="canary"} 0+30x1000 300+80x1000 + http_requests{job="api-server", instance="1", group="canary"} 0+40x2000 + other_metric{type="floats"} 0 4 3 6 -1 10 + other_metric{type="histograms"} {{count:0}} {{count:4}} {{count:3}} {{count:6}} {{count:-1}} {{count:10}} + other_metric{type="mixed"} 0 4 3 6 {{count:-1}} {{count:10}} + ` + + opts := NewTestEngineOpts() + mimirEngine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), stats.NewQueryMetrics(nil), log.NewNopLogger()) + require.NoError(t, err) + prometheusEngine := promql.NewEngine(opts.CommonOpts) + storage := promqltest.LoadedStorage(t, data) + t.Cleanup(func() { storage.Close() }) + + testCases := []struct { + Query string + Result promql.Result + Start time.Time + }{ + { + Query: "metric[20s:10s]", + Result: promql.Result{ + Value: promql.Matrix{ + promql.Series{ + Floats: []promql.FPoint{{F: 1, T: 0}, {F: 2, T: 10000}}, + Metric: labels.FromStrings("__name__", "metric", "type", "floats"), + }, + promql.Series{ + Histograms: []promql.HPoint{{H: &histogram.FloatHistogram{Count: 1}, T: 0}, {H: &histogram.FloatHistogram{Count: 2}, T: 10000}}, + Metric: labels.FromStrings("__name__", "metric", "type", "histograms"), + }, + }, + }, + Start: time.Unix(10, 0), + }, + { + Query: "metric[20s:5s]", + Result: promql.Result{ + Value: promql.Matrix{ + promql.Series{ + Floats: []promql.FPoint{{F: 1, T: 0}, {F: 1, T: 5000}, {F: 2, T: 10000}}, + Metric: labels.FromStrings("__name__", "metric", "type", "floats"), + }, + promql.Series{ + Histograms: []promql.HPoint{{H: &histogram.FloatHistogram{Count: 1}, T: 0}, {H: &histogram.FloatHistogram{Count: 1}, T: 5000}, {H: &histogram.FloatHistogram{Count: 2}, T: 10000}}, + Metric: labels.FromStrings("__name__", "metric", "type", "histograms"), + }, + }, + }, + Start: time.Unix(10, 0), + }, + { + Query: "metric[20s:5s] offset 2s", + Result: promql.Result{ + Value: promql.Matrix{ + promql.Series{ + Floats: []promql.FPoint{{F: 1, T: 0}, {F: 1, T: 5000}, {F: 2, T: 10000}}, + Metric: labels.FromStrings("__name__", "metric", "type", "floats"), + }, + promql.Series{ + Histograms: []promql.HPoint{{H: &histogram.FloatHistogram{Count: 1}, T: 0}, {H: &histogram.FloatHistogram{Count: 1}, T: 5000}, {H: &histogram.FloatHistogram{Count: 2}, T: 10000}}, + Metric: labels.FromStrings("__name__", "metric", "type", "histograms"), + }, + }, + }, + Start: time.Unix(12, 0), + }, + { + Query: "metric[20s:5s] offset 6s", + Result: promql.Result{ + Value: promql.Matrix{ + promql.Series{ + Floats: []promql.FPoint{{F: 1, T: 0}, {F: 1, T: 5000}, {F: 2, T: 10000}}, + Metric: labels.FromStrings("__name__", "metric", "type", "floats"), + }, + promql.Series{ + Histograms: []promql.HPoint{{H: &histogram.FloatHistogram{Count: 1}, T: 0}, {H: &histogram.FloatHistogram{Count: 1}, T: 5000}, {H: &histogram.FloatHistogram{Count: 2}, T: 10000}}, + Metric: labels.FromStrings("__name__", "metric", "type", "histograms"), + }, + }, + }, + Start: time.Unix(20, 0), + }, + { + Query: "metric[20s:5s] offset 4s", + Result: promql.Result{ + Value: promql.Matrix{ + promql.Series{ + Floats: []promql.FPoint{{F: 2, T: 15000}, {F: 2, T: 20000}, {F: 2, T: 25000}, {F: 2, T: 30000}}, + Metric: labels.FromStrings("__name__", "metric", "type", "floats"), + }, + promql.Series{ + Histograms: []promql.HPoint{{H: &histogram.FloatHistogram{Count: 2}, T: 15000}, {H: &histogram.FloatHistogram{Count: 2}, T: 20000}, {H: &histogram.FloatHistogram{Count: 2}, T: 25000}, {H: &histogram.FloatHistogram{Count: 2}, T: 30000}}, + Metric: labels.FromStrings("__name__", "metric", "type", "histograms"), + }, + }, + }, + Start: time.Unix(35, 0), + }, + { + Query: "metric[20s:5s] offset 5s", + Result: promql.Result{ + Value: promql.Matrix{ + promql.Series{ + Floats: []promql.FPoint{{F: 2, T: 10000}, {F: 2, T: 15000}, {F: 2, T: 20000}, {F: 2, T: 25000}, {F: 2, T: 30000}}, + Metric: labels.FromStrings("__name__", "metric", "type", "floats"), + }, + promql.Series{ + Histograms: []promql.HPoint{{H: &histogram.FloatHistogram{Count: 2}, T: 10000}, {H: &histogram.FloatHistogram{Count: 2}, T: 15000}, {H: &histogram.FloatHistogram{Count: 2}, T: 20000}, {H: &histogram.FloatHistogram{Count: 2}, T: 25000}, {H: &histogram.FloatHistogram{Count: 2}, T: 30000}}, + Metric: labels.FromStrings("__name__", "metric", "type", "histograms"), + }, + }, + }, + Start: time.Unix(35, 0), + }, + { + Query: "metric[20s:5s] offset 6s", + Result: promql.Result{ + Value: promql.Matrix{ + promql.Series{ + Floats: []promql.FPoint{{F: 2, T: 10000}, {F: 2, T: 15000}, {F: 2, T: 20000}, {F: 2, T: 25000}}, + Metric: labels.FromStrings("__name__", "metric", "type", "floats"), + }, + promql.Series{ + Histograms: []promql.HPoint{{H: &histogram.FloatHistogram{Count: 2}, T: 10000}, {H: &histogram.FloatHistogram{Count: 2}, T: 15000}, {H: &histogram.FloatHistogram{Count: 2}, T: 20000}, {H: &histogram.FloatHistogram{Count: 2}, T: 25000}}, + Metric: labels.FromStrings("__name__", "metric", "type", "histograms"), + }, + }, + }, + Start: time.Unix(35, 0), + }, + { + Query: "metric[20s:5s] offset 7s", + Result: promql.Result{ + Value: promql.Matrix{ + promql.Series{ + Floats: []promql.FPoint{{F: 2, T: 10000}, {F: 2, T: 15000}, {F: 2, T: 20000}, {F: 2, T: 25000}}, + Metric: labels.FromStrings("__name__", "metric", "type", "floats"), + }, + promql.Series{ + Histograms: []promql.HPoint{{H: &histogram.FloatHistogram{Count: 2}, T: 10000}, {H: &histogram.FloatHistogram{Count: 2}, T: 15000}, {H: &histogram.FloatHistogram{Count: 2}, T: 20000}, {H: &histogram.FloatHistogram{Count: 2}, T: 25000}}, + Metric: labels.FromStrings("__name__", "metric", "type", "histograms"), + }, + }, + }, + Start: time.Unix(35, 0), + }, + { // Normal selector. + Query: `http_requests{group=~"pro.*",instance="0"}[30s:10s]`, + Result: promql.Result{ + Value: promql.Matrix{ + promql.Series{ + Floats: []promql.FPoint{{F: 9990, T: 9990000}, {F: 10000, T: 10000000}, {F: 100, T: 10010000}, {F: 130, T: 10020000}}, + Metric: labels.FromStrings("__name__", "http_requests", "job", "api-server", "instance", "0", "group", "production"), + }, + }, + }, + Start: time.Unix(10020, 0), + }, + { // Default step. + Query: `http_requests{group=~"pro.*",instance="0"}[5m:]`, + Result: promql.Result{ + Value: promql.Matrix{ + promql.Series{ + Floats: []promql.FPoint{{F: 9840, T: 9840000}, {F: 9900, T: 9900000}, {F: 9960, T: 9960000}, {F: 130, T: 10020000}, {F: 310, T: 10080000}}, + Metric: labels.FromStrings("__name__", "http_requests", "job", "api-server", "instance", "0", "group", "production"), + }, + }, + }, + Start: time.Unix(10100, 0), + }, + { // Checking if high offset (>LookbackDelta) is being taken care of. + Query: `http_requests{group=~"pro.*",instance="0"}[5m:] offset 20m`, + Result: promql.Result{ + Value: promql.Matrix{ + promql.Series{ + Floats: []promql.FPoint{{F: 8640, T: 8640000}, {F: 8700, T: 8700000}, {F: 8760, T: 8760000}, {F: 8820, T: 8820000}, {F: 8880, T: 8880000}}, + Metric: labels.FromStrings("__name__", "http_requests", "job", "api-server", "instance", "0", "group", "production"), + }, + }, + }, + Start: time.Unix(10100, 0), + }, + { + Query: `rate(http_requests[1m])[15s:5s]`, + Result: promql.Result{ + Value: promql.Matrix{ + promql.Series{ + Floats: []promql.FPoint{{F: 3, T: 7985000}, {F: 3, T: 7990000}, {F: 3, T: 7995000}, {F: 3, T: 8000000}}, + Metric: labels.FromStrings("job", "api-server", "instance", "0", "group", "canary"), + DropName: true, + }, + promql.Series{ + Floats: []promql.FPoint{{F: 4, T: 7985000}, {F: 4, T: 7990000}, {F: 4, T: 7995000}, {F: 4, T: 8000000}}, + Metric: labels.FromStrings("job", "api-server", "instance", "1", "group", "canary"), + DropName: true, + }, + promql.Series{ + Floats: []promql.FPoint{{F: 1, T: 7985000}, {F: 1, T: 7990000}, {F: 1, T: 7995000}, {F: 1, T: 8000000}}, + Metric: labels.FromStrings("job", "api-server", "instance", "0", "group", "production"), + DropName: true, + }, + promql.Series{ + Floats: []promql.FPoint{{F: 2, T: 7985000}, {F: 2, T: 7990000}, {F: 2, T: 7995000}, {F: 2, T: 8000000}}, + Metric: labels.FromStrings("job", "api-server", "instance", "1", "group", "production"), + DropName: true, + }, + }, + Warnings: annotations.New().Add(annotations.NewPossibleNonCounterInfo("http_requests", posrange.PositionRange{Start: 5})), + }, + Start: time.Unix(8000, 0), + }, + { + Query: `sum(http_requests{group=~"pro.*"})[30s:10s]`, + Result: promql.Result{ + Value: promql.Matrix{ + promql.Series{ + Floats: []promql.FPoint{{F: 270, T: 90000}, {F: 300, T: 100000}, {F: 330, T: 110000}, {F: 360, T: 120000}}, + Metric: labels.EmptyLabels(), + }, + }, + }, + Start: time.Unix(120, 0), + }, + { + Query: `sum(http_requests)[40s:10s]`, + Result: promql.Result{ + Value: promql.Matrix{ + promql.Series{ + Floats: []promql.FPoint{{F: 800, T: 80000}, {F: 900, T: 90000}, {F: 1000, T: 100000}, {F: 1100, T: 110000}, {F: 1200, T: 120000}}, + Metric: labels.EmptyLabels(), + }, + }, + }, + Start: time.Unix(120, 0), + }, + { + Query: `(sum(http_requests{group=~"p.*"})+sum(http_requests{group=~"c.*"}))[20s:5s]`, + Result: promql.Result{ + Value: promql.Matrix{ + promql.Series{ + Floats: []promql.FPoint{{F: 1000, T: 100000}, {F: 1000, T: 105000}, {F: 1100, T: 110000}, {F: 1100, T: 115000}, {F: 1200, T: 120000}}, + Metric: labels.EmptyLabels(), + }, + }, + }, + Start: time.Unix(120, 0), + }, + // These tests exercise @ start() and @ end(), and use the same data as testdata/ours/subqueries.test, to + // mirror the range query tests there. + { + Query: `last_over_time(other_metric[20s:10s] @ start())`, + Result: promql.Result{ + Value: promql.Vector{ + { + F: -1, + T: 40000, + Metric: labels.FromStrings(labels.MetricName, "other_metric", "type", "floats"), + }, + { + H: &histogram.FloatHistogram{Count: -1, CounterResetHint: histogram.CounterReset}, + T: 40000, + Metric: labels.FromStrings(labels.MetricName, "other_metric", "type", "histograms"), + }, + { + H: &histogram.FloatHistogram{Count: -1, CounterResetHint: histogram.UnknownCounterReset}, + T: 40000, + Metric: labels.FromStrings(labels.MetricName, "other_metric", "type", "mixed"), + }, + }, + }, + Start: time.Unix(40, 0), + }, + { + Query: `last_over_time(other_metric[20s:10s] @ end())`, + Result: promql.Result{ + Value: promql.Vector{ + { + F: 6, + T: 30000, + Metric: labels.FromStrings(labels.MetricName, "other_metric", "type", "floats"), + }, + { + H: &histogram.FloatHistogram{Count: 6, CounterResetHint: histogram.NotCounterReset}, + T: 30000, + Metric: labels.FromStrings(labels.MetricName, "other_metric", "type", "histograms"), + }, + { + F: 6, + T: 30000, + Metric: labels.FromStrings(labels.MetricName, "other_metric", "type", "mixed"), + }, + }, + }, + Start: time.Unix(30, 0), + }, + } + + for _, testCase := range testCases { + t.Run(fmt.Sprintf("%v evaluated at %v", testCase.Query, testCase.Start.Unix()), func(t *testing.T) { + runTest := func(t *testing.T, engine promql.QueryEngine) { + qry, err := engine.NewInstantQuery(context.Background(), storage, nil, testCase.Query, testCase.Start) + require.NoError(t, err) + + res := qry.Exec(context.Background()) + testutils.RequireEqualResults(t, testCase.Query, &testCase.Result, res) + } + + // Ensure our test cases are correct by running them against Prometheus' engine too. + t.Run("Prometheus' engine", func(t *testing.T) { + runTest(t, prometheusEngine) + }) + + t.Run("Mimir's engine", func(t *testing.T) { + runTest(t, mimirEngine) + }) + }) + } +} + func TestQueryCancellation(t *testing.T) { opts := NewTestEngineOpts() engine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), stats.NewQueryMetrics(nil), log.NewNopLogger()) diff --git a/pkg/streamingpromql/functions/common.go b/pkg/streamingpromql/functions/common.go index 021421d614c..c9f90522860 100644 --- a/pkg/streamingpromql/functions/common.go +++ b/pkg/streamingpromql/functions/common.go @@ -73,8 +73,6 @@ func PassthroughData(seriesData types.InstantVectorSeriesData, _ []types.ScalarD type RangeVectorStepFunction func( step types.RangeVectorStepData, rangeSeconds float64, - floatBuffer *types.FPointRingBuffer, - histogramBuffer *types.HPointRingBuffer, emitAnnotation EmitAnnotationFunc, ) (f float64, hasFloat bool, h *histogram.FloatHistogram, err error) diff --git a/pkg/streamingpromql/functions/range_vectors.go b/pkg/streamingpromql/functions/range_vectors.go index df0193d8606..1c3b86d090c 100644 --- a/pkg/streamingpromql/functions/range_vectors.go +++ b/pkg/streamingpromql/functions/range_vectors.go @@ -21,9 +21,9 @@ var CountOverTime = FunctionOverRangeVector{ StepFunc: countOverTime, } -func countOverTime(step types.RangeVectorStepData, _ float64, fPoints *types.FPointRingBuffer, hPoints *types.HPointRingBuffer, _ EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) { - fPointCount := fPoints.CountAtOrBefore(step.RangeEnd) - hPointCount := hPoints.CountAtOrBefore(step.RangeEnd) +func countOverTime(step types.RangeVectorStepData, _ float64, _ EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) { + fPointCount := step.Floats.CountAtOrBefore(step.RangeEnd) + hPointCount := step.Histograms.CountAtOrBefore(step.RangeEnd) if fPointCount == 0 && hPointCount == 0 { return 0, false, nil, nil @@ -37,9 +37,9 @@ var LastOverTime = FunctionOverRangeVector{ StepFunc: lastOverTime, } -func lastOverTime(step types.RangeVectorStepData, _ float64, fPoints *types.FPointRingBuffer, hPoints *types.HPointRingBuffer, _ EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) { - lastFloat, floatAvailable := fPoints.LastAtOrBefore(step.RangeEnd) - lastHistogram, histogramAvailable := hPoints.LastAtOrBefore(step.RangeEnd) +func lastOverTime(step types.RangeVectorStepData, _ float64, _ EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) { + lastFloat, floatAvailable := step.Floats.LastAtOrBefore(step.RangeEnd) + lastHistogram, histogramAvailable := step.Histograms.LastAtOrBefore(step.RangeEnd) if !floatAvailable && !histogramAvailable { return 0, false, nil, nil @@ -58,8 +58,8 @@ var PresentOverTime = FunctionOverRangeVector{ StepFunc: presentOverTime, } -func presentOverTime(step types.RangeVectorStepData, _ float64, fPoints *types.FPointRingBuffer, hPoints *types.HPointRingBuffer, _ EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) { - if fPoints.AnyAtOrBefore(step.RangeEnd) || hPoints.AnyAtOrBefore(step.RangeEnd) { +func presentOverTime(step types.RangeVectorStepData, _ float64, _ EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) { + if step.Floats.AnyAtOrBefore(step.RangeEnd) || step.Histograms.AnyAtOrBefore(step.RangeEnd) { return 1, true, nil, nil } @@ -71,8 +71,8 @@ var MaxOverTime = FunctionOverRangeVector{ StepFunc: maxOverTime, } -func maxOverTime(step types.RangeVectorStepData, _ float64, fPoints *types.FPointRingBuffer, _ *types.HPointRingBuffer, _ EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) { - head, tail := fPoints.UnsafePoints(step.RangeEnd) +func maxOverTime(step types.RangeVectorStepData, _ float64, _ EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) { + head, tail := step.Floats.UnsafePoints(step.RangeEnd) if len(head) == 0 && len(tail) == 0 { return 0, false, nil, nil @@ -108,8 +108,8 @@ var MinOverTime = FunctionOverRangeVector{ StepFunc: minOverTime, } -func minOverTime(step types.RangeVectorStepData, _ float64, fPoints *types.FPointRingBuffer, _ *types.HPointRingBuffer, _ EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) { - head, tail := fPoints.UnsafePoints(step.RangeEnd) +func minOverTime(step types.RangeVectorStepData, _ float64, _ EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) { + head, tail := step.Floats.UnsafePoints(step.RangeEnd) if len(head) == 0 && len(tail) == 0 { return 0, false, nil, nil @@ -146,9 +146,9 @@ var SumOverTime = FunctionOverRangeVector{ NeedsSeriesNamesForAnnotations: true, } -func sumOverTime(step types.RangeVectorStepData, _ float64, fPoints *types.FPointRingBuffer, hPoints *types.HPointRingBuffer, emitAnnotation EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) { - fHead, fTail := fPoints.UnsafePoints(step.RangeEnd) - hHead, hTail := hPoints.UnsafePoints(step.RangeEnd) +func sumOverTime(step types.RangeVectorStepData, _ float64, emitAnnotation EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) { + fHead, fTail := step.Floats.UnsafePoints(step.RangeEnd) + hHead, hTail := step.Histograms.UnsafePoints(step.RangeEnd) haveFloats := len(fHead) > 0 || len(fTail) > 0 haveHistograms := len(hHead) > 0 || len(hTail) > 0 @@ -221,9 +221,9 @@ var AvgOverTime = FunctionOverRangeVector{ NeedsSeriesNamesForAnnotations: true, } -func avgOverTime(step types.RangeVectorStepData, _ float64, fPoints *types.FPointRingBuffer, hPoints *types.HPointRingBuffer, emitAnnotation EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) { - fHead, fTail := fPoints.UnsafePoints(step.RangeEnd) - hHead, hTail := hPoints.UnsafePoints(step.RangeEnd) +func avgOverTime(step types.RangeVectorStepData, _ float64, emitAnnotation EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) { + fHead, fTail := step.Floats.UnsafePoints(step.RangeEnd) + hHead, hTail := step.Histograms.UnsafePoints(step.RangeEnd) haveFloats := len(fHead) > 0 || len(fTail) > 0 haveHistograms := len(hHead) > 0 || len(hTail) > 0 diff --git a/pkg/streamingpromql/functions/rate_increase.go b/pkg/streamingpromql/functions/rate_increase.go index 45a02aca27e..6fe3160b0b6 100644 --- a/pkg/streamingpromql/functions/rate_increase.go +++ b/pkg/streamingpromql/functions/rate_increase.go @@ -31,11 +31,11 @@ var Increase = FunctionOverRangeVector{ // isRate is true for `rate` function, or false for `instant` function func rate(isRate bool) RangeVectorStepFunction { - return func(step types.RangeVectorStepData, rangeSeconds float64, floatBuffer *types.FPointRingBuffer, histogramBuffer *types.HPointRingBuffer, emitAnnotation EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) { - fHead, fTail := floatBuffer.UnsafePoints(step.RangeEnd) + return func(step types.RangeVectorStepData, rangeSeconds float64, emitAnnotation EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) { + fHead, fTail := step.Floats.UnsafePoints(step.RangeEnd) fCount := len(fHead) + len(fTail) - hHead, hTail := histogramBuffer.UnsafePoints(step.RangeEnd) + hHead, hTail := step.Histograms.UnsafePoints(step.RangeEnd) hCount := len(hHead) + len(hTail) if fCount > 0 && hCount > 0 { @@ -46,7 +46,7 @@ func rate(isRate bool) RangeVectorStepFunction { } if fCount >= 2 { - val := floatRate(isRate, fCount, floatBuffer, step, fHead, fTail, rangeSeconds) + val := floatRate(isRate, fCount, step.Floats, step, fHead, fTail, rangeSeconds) return val, true, nil, nil } diff --git a/pkg/streamingpromql/operators/function_over_range_vector.go b/pkg/streamingpromql/operators/function_over_range_vector.go index b60c43a2538..d2fa3e1421c 100644 --- a/pkg/streamingpromql/operators/function_over_range_vector.go +++ b/pkg/streamingpromql/operators/function_over_range_vector.go @@ -29,10 +29,8 @@ type FunctionOverRangeVector struct { metricNames *MetricNames currentSeriesIndex int - numSteps int - rangeSeconds float64 - floatBuffer *types.FPointRingBuffer - histogramBuffer *types.HPointRingBuffer + numSteps int + rangeSeconds float64 expressionPosition posrange.PositionRange emitAnnotationFunc functions.EmitAnnotationFunc @@ -102,21 +100,10 @@ func (m *FunctionOverRangeVector) NextSeries(ctx context.Context) (types.Instant m.currentSeriesIndex++ }() - if m.floatBuffer == nil { - m.floatBuffer = types.NewFPointRingBuffer(m.MemoryConsumptionTracker) - } - - if m.histogramBuffer == nil { - m.histogramBuffer = types.NewHPointRingBuffer(m.MemoryConsumptionTracker) - } - - m.floatBuffer.Reset() - m.histogramBuffer.Reset() - data := types.InstantVectorSeriesData{} for { - step, err := m.Inner.NextStepSamples(m.floatBuffer, m.histogramBuffer) + step, err := m.Inner.NextStepSamples() // nolint:errorlint // errors.Is introduces a performance overhead, and NextStepSamples is guaranteed to return exactly EOS, never a wrapped error. if err == types.EOS { @@ -129,7 +116,7 @@ func (m *FunctionOverRangeVector) NextSeries(ctx context.Context) (types.Instant return types.InstantVectorSeriesData{}, err } - f, hasFloat, h, err := m.Func.StepFunc(step, m.rangeSeconds, m.floatBuffer, m.histogramBuffer, m.emitAnnotationFunc) + f, hasFloat, h, err := m.Func.StepFunc(step, m.rangeSeconds, m.emitAnnotationFunc) if err != nil { return types.InstantVectorSeriesData{}, err } @@ -167,11 +154,4 @@ func (m *FunctionOverRangeVector) emitAnnotation(generator functions.AnnotationG func (m *FunctionOverRangeVector) Close() { m.Inner.Close() - - if m.floatBuffer != nil { - m.floatBuffer.Close() - } - if m.histogramBuffer != nil { - m.histogramBuffer.Close() - } } diff --git a/pkg/streamingpromql/operators/range_vector_selector.go b/pkg/streamingpromql/operators/range_vector_selector.go index 9f08e41acf0..e6ac6fa5bc9 100644 --- a/pkg/streamingpromql/operators/range_vector_selector.go +++ b/pkg/streamingpromql/operators/range_vector_selector.go @@ -15,6 +15,7 @@ import ( "github.com/prometheus/prometheus/promql/parser/posrange" "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/grafana/mimir/pkg/streamingpromql/limiting" "github.com/grafana/mimir/pkg/streamingpromql/types" ) @@ -24,10 +25,20 @@ type RangeVectorSelector struct { rangeMilliseconds int64 chunkIterator chunkenc.Iterator nextT int64 + floats *types.FPointRingBuffer + histograms *types.HPointRingBuffer } var _ types.RangeVectorOperator = &RangeVectorSelector{} +func NewRangeVectorSelector(selector *Selector, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) *RangeVectorSelector { + return &RangeVectorSelector{ + Selector: selector, + floats: types.NewFPointRingBuffer(memoryConsumptionTracker), + histograms: types.NewHPointRingBuffer(memoryConsumptionTracker), + } +} + func (m *RangeVectorSelector) ExpressionPosition() posrange.PositionRange { return m.Selector.ExpressionPosition } @@ -55,10 +66,12 @@ func (m *RangeVectorSelector) NextSeries(ctx context.Context) error { } m.nextT = m.Selector.TimeRange.StartT + m.floats.Reset() + m.histograms.Reset() return nil } -func (m *RangeVectorSelector) NextStepSamples(floats *types.FPointRingBuffer, histograms *types.HPointRingBuffer) (types.RangeVectorStepData, error) { +func (m *RangeVectorSelector) NextStepSamples() (types.RangeVectorStepData, error) { if m.nextT > m.Selector.TimeRange.EndT { return types.RangeVectorStepData{}, types.EOS } @@ -74,16 +87,18 @@ func (m *RangeVectorSelector) NextStepSamples(floats *types.FPointRingBuffer, hi // Apply offset after adjusting for timestamp from @ modifier. rangeEnd = rangeEnd - m.Selector.Offset rangeStart := rangeEnd - m.rangeMilliseconds - floats.DiscardPointsBefore(rangeStart) - histograms.DiscardPointsBefore(rangeStart) + m.floats.DiscardPointsBefore(rangeStart) + m.histograms.DiscardPointsBefore(rangeStart) - if err := m.fillBuffer(floats, histograms, rangeStart, rangeEnd); err != nil { + if err := m.fillBuffer(m.floats, m.histograms, rangeStart, rangeEnd); err != nil { return types.RangeVectorStepData{}, err } m.nextT += m.Selector.TimeRange.IntervalMilliseconds return types.RangeVectorStepData{ + Floats: m.floats, + Histograms: m.histograms, StepT: stepT, RangeStart: rangeStart, RangeEnd: rangeEnd, @@ -143,4 +158,6 @@ func (m *RangeVectorSelector) fillBuffer(floats *types.FPointRingBuffer, histogr func (m *RangeVectorSelector) Close() { m.Selector.Close() + m.floats.Close() + m.histograms.Close() } diff --git a/pkg/streamingpromql/operators/subquery.go b/pkg/streamingpromql/operators/subquery.go new file mode 100644 index 00000000000..f57a05cfe6d --- /dev/null +++ b/pkg/streamingpromql/operators/subquery.go @@ -0,0 +1,121 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package operators + +import ( + "context" + "time" + + "github.com/prometheus/prometheus/promql/parser/posrange" + + "github.com/grafana/mimir/pkg/streamingpromql/limiting" + "github.com/grafana/mimir/pkg/streamingpromql/types" +) + +type Subquery struct { + Inner types.InstantVectorOperator + ParentQueryTimeRange types.QueryTimeRange + + SubqueryTimestamp *int64 // Milliseconds since Unix epoch, only set if selector uses @ modifier (eg. metric{...} @ 123) + SubqueryOffset int64 // In milliseconds + SubqueryRange time.Duration + + expressionPosition posrange.PositionRange + + nextStepT int64 + rangeMilliseconds int64 + floats *types.FPointRingBuffer + histograms *types.HPointRingBuffer +} + +var _ types.RangeVectorOperator = &Subquery{} + +func NewSubquery( + inner types.InstantVectorOperator, + parentQueryTimeRange types.QueryTimeRange, + subqueryTimestamp *int64, + subqueryOffset time.Duration, + subqueryRange time.Duration, + expressionPosition posrange.PositionRange, + memoryConsumptionTracker *limiting.MemoryConsumptionTracker, +) *Subquery { + return &Subquery{ + Inner: inner, + ParentQueryTimeRange: parentQueryTimeRange, + SubqueryTimestamp: subqueryTimestamp, + SubqueryOffset: subqueryOffset.Milliseconds(), + SubqueryRange: subqueryRange, + expressionPosition: expressionPosition, + rangeMilliseconds: subqueryRange.Milliseconds(), + floats: types.NewFPointRingBuffer(memoryConsumptionTracker), + histograms: types.NewHPointRingBuffer(memoryConsumptionTracker), + } +} + +func (s *Subquery) SeriesMetadata(ctx context.Context) ([]types.SeriesMetadata, error) { + return s.Inner.SeriesMetadata(ctx) +} + +func (s *Subquery) NextSeries(ctx context.Context) error { + // Release the previous series' slices now, so we are likely to reuse the slices for the next series. + s.floats.Release() + s.histograms.Release() + + data, err := s.Inner.NextSeries(ctx) + if err != nil { + return err + } + + s.nextStepT = s.ParentQueryTimeRange.StartT + s.floats.Use(data.Floats) + s.histograms.Use(data.Histograms) + return nil +} + +func (s *Subquery) NextStepSamples() (types.RangeVectorStepData, error) { + if s.nextStepT > s.ParentQueryTimeRange.EndT { + return types.RangeVectorStepData{}, types.EOS + } + + stepT := s.nextStepT + rangeEnd := stepT + + if s.SubqueryTimestamp != nil { + // Timestamp from @ modifier takes precedence over query evaluation timestamp. + rangeEnd = *s.SubqueryTimestamp + } + + // Apply offset after adjusting for timestamp from @ modifier. + rangeEnd = rangeEnd - s.SubqueryOffset + rangeStart := rangeEnd - s.rangeMilliseconds + s.floats.DiscardPointsBefore(rangeStart) + s.histograms.DiscardPointsBefore(rangeStart) + + s.nextStepT += s.ParentQueryTimeRange.IntervalMilliseconds + + return types.RangeVectorStepData{ + Floats: s.floats, + Histograms: s.histograms, + StepT: stepT, + RangeStart: rangeStart, + RangeEnd: rangeEnd, + }, nil +} + +func (s *Subquery) StepCount() int { + return s.ParentQueryTimeRange.StepCount +} + +func (s *Subquery) Range() time.Duration { + return s.SubqueryRange +} + +func (s *Subquery) ExpressionPosition() posrange.PositionRange { + return s.expressionPosition +} + +func (s *Subquery) Close() { + s.Inner.Close() + s.histograms.Close() + s.floats.Close() +} diff --git a/pkg/streamingpromql/query.go b/pkg/streamingpromql/query.go index 8f9758df56f..0243e478676 100644 --- a/pkg/streamingpromql/query.go +++ b/pkg/streamingpromql/query.go @@ -14,6 +14,7 @@ import ( "github.com/go-kit/log/level" "github.com/grafana/dskit/cancellation" "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/model/timestamp" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/storage" @@ -43,7 +44,9 @@ type Query struct { memoryConsumptionTracker *limiting.MemoryConsumptionTracker annotations *annotations.Annotations - timeRange types.QueryTimeRange + // Time range of the top-level query. + // Subqueries may use a different range. + topLevelQueryTimeRange types.QueryTimeRange result *promql.Result } @@ -83,16 +86,16 @@ func newQuery(ctx context.Context, queryable storage.Queryable, opts promql.Quer } if q.IsInstant() { - q.timeRange = types.NewInstantQueryTimeRange(start) + q.topLevelQueryTimeRange = types.NewInstantQueryTimeRange(start) } else { - q.timeRange = types.NewRangeQueryTimeRange(start, end, interval) + q.topLevelQueryTimeRange = types.NewRangeQueryTimeRange(start, end, interval) if expr.Type() != parser.ValueTypeVector && expr.Type() != parser.ValueTypeScalar { return nil, fmt.Errorf("query expression produces a %s, but expression for range queries must produce an instant vector or scalar", parser.DocumentedType(expr.Type())) } } - q.root, err = q.convertToOperator(expr) + q.root, err = q.convertToOperator(expr, q.topLevelQueryTimeRange) if err != nil { return nil, err } @@ -100,14 +103,14 @@ func newQuery(ctx context.Context, queryable storage.Queryable, opts promql.Quer return q, nil } -func (q *Query) convertToOperator(expr parser.Expr) (types.Operator, error) { +func (q *Query) convertToOperator(expr parser.Expr, timeRange types.QueryTimeRange) (types.Operator, error) { switch expr.Type() { case parser.ValueTypeMatrix: - return q.convertToRangeVectorOperator(expr) + return q.convertToRangeVectorOperator(expr, timeRange) case parser.ValueTypeVector: - return q.convertToInstantVectorOperator(expr) + return q.convertToInstantVectorOperator(expr, timeRange) case parser.ValueTypeScalar: - return q.convertToScalarOperator(expr) + return q.convertToScalarOperator(expr, timeRange) case parser.ValueTypeString: return q.convertToStringOperator(expr) default: @@ -133,7 +136,7 @@ func (q *Query) convertToStringOperator(expr parser.Expr) (types.StringOperator, } } -func (q *Query) convertToInstantVectorOperator(expr parser.Expr) (types.InstantVectorOperator, error) { +func (q *Query) convertToInstantVectorOperator(expr parser.Expr, timeRange types.QueryTimeRange) (types.InstantVectorOperator, error) { if expr.Type() != parser.ValueTypeVector { return nil, fmt.Errorf("cannot create instant vector operator for expression that produces a %s", parser.DocumentedType(expr.Type())) } @@ -149,7 +152,7 @@ func (q *Query) convertToInstantVectorOperator(expr parser.Expr) (types.InstantV MemoryConsumptionTracker: q.memoryConsumptionTracker, Selector: &operators.Selector{ Queryable: q.queryable, - TimeRange: q.timeRange, + TimeRange: timeRange, Timestamp: e.Timestamp, Offset: e.OriginalOffset.Milliseconds(), LookbackDelta: lookbackDelta, @@ -167,14 +170,14 @@ func (q *Query) convertToInstantVectorOperator(expr parser.Expr) (types.InstantV return nil, compat.NewNotSupportedError(fmt.Sprintf("'%s' aggregation with parameter", e.Op)) } - inner, err := q.convertToInstantVectorOperator(e.Expr) + inner, err := q.convertToInstantVectorOperator(e.Expr, timeRange) if err != nil { return nil, err } return operators.NewAggregation( inner, - q.timeRange, + timeRange, e.Grouping, e.Without, e.Op, @@ -183,7 +186,7 @@ func (q *Query) convertToInstantVectorOperator(expr parser.Expr) (types.InstantV e.PosRange, ) case *parser.Call: - return q.convertFunctionCallToInstantVectorOperator(e) + return q.convertFunctionCallToInstantVectorOperator(e, timeRange) case *parser.BinaryExpr: // We only need to handle three combinations of types here: // Scalar on left, vector on right @@ -202,22 +205,22 @@ func (q *Query) convertToInstantVectorOperator(expr parser.Expr) (types.InstantV var err error if e.LHS.Type() == parser.ValueTypeScalar { - scalar, err = q.convertToScalarOperator(e.LHS) + scalar, err = q.convertToScalarOperator(e.LHS, timeRange) if err != nil { return nil, err } - vector, err = q.convertToInstantVectorOperator(e.RHS) + vector, err = q.convertToInstantVectorOperator(e.RHS, timeRange) if err != nil { return nil, err } } else { - scalar, err = q.convertToScalarOperator(e.RHS) + scalar, err = q.convertToScalarOperator(e.RHS, timeRange) if err != nil { return nil, err } - vector, err = q.convertToInstantVectorOperator(e.LHS) + vector, err = q.convertToInstantVectorOperator(e.LHS, timeRange) if err != nil { return nil, err } @@ -225,7 +228,7 @@ func (q *Query) convertToInstantVectorOperator(expr parser.Expr) (types.InstantV scalarIsLeftSide := e.LHS.Type() == parser.ValueTypeScalar - o, err := operators.NewVectorScalarBinaryOperation(scalar, vector, scalarIsLeftSide, e.Op, e.ReturnBool, q.timeRange, q.memoryConsumptionTracker, q.annotations, e.PositionRange()) + o, err := operators.NewVectorScalarBinaryOperation(scalar, vector, scalarIsLeftSide, e.Op, e.ReturnBool, timeRange, q.memoryConsumptionTracker, q.annotations, e.PositionRange()) if err != nil { return nil, err } @@ -246,21 +249,21 @@ func (q *Query) convertToInstantVectorOperator(expr parser.Expr) (types.InstantV return nil, compat.NewNotSupportedError(fmt.Sprintf("binary expression with %v matching", e.VectorMatching.Card)) } - lhs, err := q.convertToInstantVectorOperator(e.LHS) + lhs, err := q.convertToInstantVectorOperator(e.LHS, timeRange) if err != nil { return nil, err } - rhs, err := q.convertToInstantVectorOperator(e.RHS) + rhs, err := q.convertToInstantVectorOperator(e.RHS, timeRange) if err != nil { return nil, err } switch e.Op { case parser.LAND, parser.LUNLESS: - return operators.NewAndUnlessBinaryOperation(lhs, rhs, *e.VectorMatching, q.memoryConsumptionTracker, e.Op == parser.LUNLESS, q.timeRange, e.PositionRange()), nil + return operators.NewAndUnlessBinaryOperation(lhs, rhs, *e.VectorMatching, q.memoryConsumptionTracker, e.Op == parser.LUNLESS, timeRange, e.PositionRange()), nil case parser.LOR: - return operators.NewOrBinaryOperation(lhs, rhs, *e.VectorMatching, q.memoryConsumptionTracker, q.timeRange, e.PositionRange()), nil + return operators.NewOrBinaryOperation(lhs, rhs, *e.VectorMatching, q.memoryConsumptionTracker, timeRange, e.PositionRange()), nil default: return operators.NewVectorVectorBinaryOperation(lhs, rhs, *e.VectorMatching, e.Op, e.ReturnBool, q.memoryConsumptionTracker, q.annotations, e.PositionRange()) } @@ -270,7 +273,7 @@ func (q *Query) convertToInstantVectorOperator(expr parser.Expr) (types.InstantV return nil, compat.NewNotSupportedError(fmt.Sprintf("unary expression with '%s'", e.Op)) } - inner, err := q.convertToInstantVectorOperator(e.Expr) + inner, err := q.convertToInstantVectorOperator(e.Expr, timeRange) if err != nil { return nil, err } @@ -279,15 +282,15 @@ func (q *Query) convertToInstantVectorOperator(expr parser.Expr) (types.InstantV case *parser.StepInvariantExpr: // One day, we'll do something smarter here. - return q.convertToInstantVectorOperator(e.Expr) + return q.convertToInstantVectorOperator(e.Expr, timeRange) case *parser.ParenExpr: - return q.convertToInstantVectorOperator(e.Expr) + return q.convertToInstantVectorOperator(e.Expr, timeRange) default: return nil, compat.NewNotSupportedError(fmt.Sprintf("PromQL expression type %T for instant vectors", e)) } } -func (q *Query) convertFunctionCallToInstantVectorOperator(e *parser.Call) (types.InstantVectorOperator, error) { +func (q *Query) convertFunctionCallToInstantVectorOperator(e *parser.Call, timeRange types.QueryTimeRange) (types.InstantVectorOperator, error) { factory, ok := instantVectorFunctionOperatorFactories[e.Func.Name] if !ok { return nil, compat.NewNotSupportedError(fmt.Sprintf("'%s' function", e.Func.Name)) @@ -295,7 +298,7 @@ func (q *Query) convertFunctionCallToInstantVectorOperator(e *parser.Call) (type args := make([]types.Operator, len(e.Args)) for i := range e.Args { - a, err := q.convertToOperator(e.Args[i]) + a, err := q.convertToOperator(e.Args[i], timeRange) if err != nil { return nil, err } @@ -305,7 +308,7 @@ func (q *Query) convertFunctionCallToInstantVectorOperator(e *parser.Call) (type return factory(args, q.memoryConsumptionTracker, q.annotations, e.PosRange) } -func (q *Query) convertToRangeVectorOperator(expr parser.Expr) (types.RangeVectorOperator, error) { +func (q *Query) convertToRangeVectorOperator(expr parser.Expr, timeRange types.QueryTimeRange) (types.RangeVectorOperator, error) { if expr.Type() != parser.ValueTypeMatrix { return nil, fmt.Errorf("cannot create range vector operator for expression that produces a %s", parser.DocumentedType(expr.Type())) } @@ -313,30 +316,88 @@ func (q *Query) convertToRangeVectorOperator(expr parser.Expr) (types.RangeVecto switch e := expr.(type) { case *parser.MatrixSelector: vectorSelector := e.VectorSelector.(*parser.VectorSelector) + selector := &operators.Selector{ + Queryable: q.queryable, + TimeRange: timeRange, + Timestamp: vectorSelector.Timestamp, + Offset: vectorSelector.OriginalOffset.Milliseconds(), + Range: e.Range, + Matchers: vectorSelector.LabelMatchers, - return &operators.RangeVectorSelector{ - Selector: &operators.Selector{ - Queryable: q.queryable, - TimeRange: q.timeRange, - Timestamp: vectorSelector.Timestamp, - Offset: vectorSelector.OriginalOffset.Milliseconds(), - Range: e.Range, - Matchers: vectorSelector.LabelMatchers, + ExpressionPosition: e.PositionRange(), + } + + return operators.NewRangeVectorSelector(selector, q.memoryConsumptionTracker), nil + + case *parser.SubqueryExpr: + if !q.engine.featureToggles.EnableSubqueries { + return nil, compat.NewNotSupportedError("subquery") + } + + // Subqueries are evaluated as a single range query with steps aligned to Unix epoch time 0. + // They are not evaluated as queries aligned to the individual step timestamps. + // See https://www.robustperception.io/promql-subqueries-and-alignment/ for an explanation. + // Subquery evaluation aligned to step timestamps is not supported by Prometheus, but may be + // introduced in the future in https://github.com/prometheus/prometheus/pull/9114. + // + // While this makes subqueries simpler to implement and more efficient in most cases, it does + // mean we could waste time evaluating steps that won't be used if the subquery range is less + // than the parent query step. For example, if the parent query is running with a step of 1h, + // and the subquery is for a 10m range with 1m steps, then we'll evaluate ~50m of steps that + // won't be used. + // This is relatively uncommon, and Prometheus' engine does the same thing. In the future, we + // could be smarter about this if it turns out to be a big problem. + step := e.Step.Milliseconds() + + if step == 0 { + step = q.engine.noStepSubqueryIntervalFn(e.Range.Milliseconds()) + } + + start := timeRange.StartT + end := timeRange.EndT + + if e.Timestamp != nil { + start = *e.Timestamp + end = *e.Timestamp + } + + // Find the first timestamp inside the subquery range that is aligned to the step. + alignedStart := step * ((start - e.OriginalOffset.Milliseconds() - e.Range.Milliseconds()) / step) + if alignedStart < start-e.OriginalOffset.Milliseconds()-e.Range.Milliseconds() { + alignedStart += step + } + + end = end - e.OriginalOffset.Milliseconds() + + subqueryTimeRange := types.NewRangeQueryTimeRange(timestamp.Time(alignedStart), timestamp.Time(end), time.Duration(step)*time.Millisecond) + inner, err := q.convertToInstantVectorOperator(e.Expr, subqueryTimeRange) + if err != nil { + return nil, err + } + + subquery := operators.NewSubquery( + inner, + timeRange, + e.Timestamp, + e.OriginalOffset, + e.Range, + e.PositionRange(), + q.memoryConsumptionTracker, + ) + + return subquery, nil - ExpressionPosition: e.PositionRange(), - }, - }, nil case *parser.StepInvariantExpr: // One day, we'll do something smarter here. - return q.convertToRangeVectorOperator(e.Expr) + return q.convertToRangeVectorOperator(e.Expr, timeRange) case *parser.ParenExpr: - return q.convertToRangeVectorOperator(e.Expr) + return q.convertToRangeVectorOperator(e.Expr, timeRange) default: return nil, compat.NewNotSupportedError(fmt.Sprintf("PromQL expression type %T for range vectors", e)) } } -func (q *Query) convertToScalarOperator(expr parser.Expr) (types.ScalarOperator, error) { +func (q *Query) convertToScalarOperator(expr parser.Expr, timeRange types.QueryTimeRange) (types.ScalarOperator, error) { if expr.Type() != parser.ValueTypeScalar { return nil, fmt.Errorf("cannot create scalar operator for expression that produces a %s", parser.DocumentedType(expr.Type())) } @@ -349,7 +410,7 @@ func (q *Query) convertToScalarOperator(expr parser.Expr) (types.ScalarOperator, case *parser.NumberLiteral: o := operators.NewScalarConstant( e.Val, - q.timeRange, + timeRange, q.memoryConsumptionTracker, e.PositionRange(), ) @@ -357,14 +418,14 @@ func (q *Query) convertToScalarOperator(expr parser.Expr) (types.ScalarOperator, return o, nil case *parser.Call: - return q.convertFunctionCallToScalarOperator(e) + return q.convertFunctionCallToScalarOperator(e, timeRange) case *parser.UnaryExpr: if e.Op != parser.SUB { return nil, compat.NewNotSupportedError(fmt.Sprintf("unary expression with '%s'", e.Op)) } - inner, err := q.convertToScalarOperator(e.Expr) + inner, err := q.convertToScalarOperator(e.Expr, timeRange) if err != nil { return nil, err } @@ -373,20 +434,20 @@ func (q *Query) convertToScalarOperator(expr parser.Expr) (types.ScalarOperator, case *parser.StepInvariantExpr: // One day, we'll do something smarter here. - return q.convertToScalarOperator(e.Expr) + return q.convertToScalarOperator(e.Expr, timeRange) case *parser.ParenExpr: - return q.convertToScalarOperator(e.Expr) + return q.convertToScalarOperator(e.Expr, timeRange) case *parser.BinaryExpr: if e.Op.IsComparisonOperator() && !q.engine.featureToggles.EnableScalarScalarBinaryComparisonOperations { return nil, compat.NewNotSupportedError(fmt.Sprintf("scalar/scalar binary expression with '%v'", e.Op)) } - lhs, err := q.convertToScalarOperator(e.LHS) + lhs, err := q.convertToScalarOperator(e.LHS, timeRange) if err != nil { return nil, err } - rhs, err := q.convertToScalarOperator(e.RHS) + rhs, err := q.convertToScalarOperator(e.RHS, timeRange) if err != nil { return nil, err } @@ -398,7 +459,7 @@ func (q *Query) convertToScalarOperator(expr parser.Expr) (types.ScalarOperator, } } -func (q *Query) convertFunctionCallToScalarOperator(e *parser.Call) (types.ScalarOperator, error) { +func (q *Query) convertFunctionCallToScalarOperator(e *parser.Call, timeRange types.QueryTimeRange) (types.ScalarOperator, error) { factory, ok := scalarFunctionOperatorFactories[e.Func.Name] if !ok { return nil, compat.NewNotSupportedError(fmt.Sprintf("'%s' function", e.Func.Name)) @@ -406,14 +467,14 @@ func (q *Query) convertFunctionCallToScalarOperator(e *parser.Call) (types.Scala args := make([]types.Operator, len(e.Args)) for i := range e.Args { - a, err := q.convertToOperator(e.Args[i]) + a, err := q.convertToOperator(e.Args[i], timeRange) if err != nil { return nil, err } args[i] = a } - return factory(args, q.memoryConsumptionTracker, q.annotations, e.PosRange, q.timeRange) + return factory(args, q.memoryConsumptionTracker, q.annotations, e.PosRange, timeRange) } func (q *Query) IsInstant() bool { @@ -459,14 +520,14 @@ func (q *Query) Exec(ctx context.Context) *promql.Result { if q.IsInstant() { msg = append(msg, "queryType", "instant", - "time", q.timeRange.StartT, + "time", q.topLevelQueryTimeRange.StartT, ) } else { msg = append(msg, "queryType", "range", - "start", q.timeRange.StartT, - "end", q.timeRange.EndT, - "step", q.timeRange.IntervalMilliseconds, + "start", q.topLevelQueryTimeRange.StartT, + "end", q.topLevelQueryTimeRange.EndT, + "step", q.topLevelQueryTimeRange.IntervalMilliseconds, ) } @@ -639,10 +700,6 @@ func (q *Query) populateMatrixFromInstantVectorOperator(ctx context.Context, o t func (q *Query) populateMatrixFromRangeVectorOperator(ctx context.Context, o types.RangeVectorOperator, series []types.SeriesMetadata) (promql.Matrix, error) { m := types.GetMatrix(len(series)) - floatBuffer := types.NewFPointRingBuffer(q.memoryConsumptionTracker) - histogramBuffer := types.NewHPointRingBuffer(q.memoryConsumptionTracker) - defer floatBuffer.Close() - defer histogramBuffer.Close() for i, s := range series { err := o.NextSeries(ctx) @@ -654,19 +711,17 @@ func (q *Query) populateMatrixFromRangeVectorOperator(ctx context.Context, o typ return nil, err } - floatBuffer.Reset() - histogramBuffer.Reset() - step, err := o.NextStepSamples(floatBuffer, histogramBuffer) + step, err := o.NextStepSamples() if err != nil { return nil, err } - floats, err := floatBuffer.CopyPoints(step.RangeEnd) + floats, err := step.Floats.CopyPoints(step.RangeEnd) if err != nil { return nil, err } - histograms, err := histogramBuffer.CopyPoints(step.RangeEnd) + histograms, err := step.Histograms.CopyPoints(step.RangeEnd) if err != nil { return nil, err } diff --git a/pkg/streamingpromql/testdata/ours/subqueries.test b/pkg/streamingpromql/testdata/ours/subqueries.test new file mode 100644 index 00000000000..475acde23cb --- /dev/null +++ b/pkg/streamingpromql/testdata/ours/subqueries.test @@ -0,0 +1,129 @@ +# SPDX-License-Identifier: AGPL-3.0-only + +# Most cases for functions are covered already in the upstream test cases. +# These test cases cover scenarios not covered by the upstream test cases, such as range queries, or edge cases that are uniquely likely to cause issues in the streaming engine. + +load 1m + metric{type="floats"} 0 4 3 6 -1 10 + metric{type="histograms"} {{count:0}} {{count:4}} {{count:3}} {{count:6}} {{count:-1}} {{count:10}} + metric{type="mixed"} 0 4 3 6 {{count:-1}} {{count:10}} + +# Test that both ends of the time range selected are inclusive. +eval instant at 4m59s count_over_time(metric[4m:30s]) + {type="floats"} 8 + {type="histograms"} 8 + {type="mixed"} 8 + +eval instant at 5m count_over_time(metric[4m:30s]) + {type="floats"} 9 + {type="histograms"} 9 + {type="mixed"} 9 + +eval instant at 5m count_over_time(metric[3m59s:30s]) + {type="floats"} 8 + {type="histograms"} 8 + {type="mixed"} 8 + +eval range from 4m59s to 5m step 1s count_over_time(metric[4m:30s]) + {type="floats"} 8 9 + {type="histograms"} 8 9 + {type="mixed"} 8 9 + +eval range from 5m to 5m1s step 1s count_over_time(metric[3m59s:30s]) + {type="floats"} 8 8 + {type="histograms"} 8 8 + {type="mixed"} 8 8 + +# Evaluation step should be aligned to T=0, not the query evaluation time. +eval instant at 5m last_over_time(metric[4m:3m]) + metric{type="floats"} 6 + metric{type="histograms"} {{count:6}} + metric{type="mixed"} 6 + +eval range from 0 to 14m step 1m last_over_time(metric[4m:3m]) + metric{type="floats"} 0 0 0 6 6 6 10 10 10 10 10 10 10 10 _ + metric{type="histograms"} {{count:0}} {{count:0}} {{count:0}} {{count:6}} {{count:6}} {{count:6}} {{count:10}} {{count:10}} {{count:10}} {{count:10}} {{count:10}} {{count:10}} {{count:10}} {{count:10}} _ + metric{type="mixed"} 0 0 0 6 6 6 {{count:10}} {{count:10}} {{count:10}} {{count:10}} {{count:10}} {{count:10}} {{count:10}} {{count:10}} _ + +# Subquery with @ +eval instant at 5m last_over_time(metric[2m:1m] @ 1m) + metric{type="floats"} 4 + metric{type="histograms"} {{count:4}} + metric{type="mixed"} 4 + +eval range from 0 to 5m step 1m last_over_time(metric[2m:1m] @ 1m) + metric{type="floats"} 4 4 4 4 4 4 + metric{type="histograms"} {{count:4}} {{count:4}} {{count:4}} {{count:4}} {{count:4}} {{count:4}} + metric{type="mixed"} 4 4 4 4 4 4 + +# Instant queries with @ start() and @ end() are exercised in TestSubqueries, as the range mode of 'eval instant' +# alters the start timestamp, making testing this here impossible. +eval range from 0 to 5m step 1m last_over_time(metric[2m:1m] @ start()) + metric{type="floats"} 0 0 0 0 0 0 + metric{type="histograms"} {{count:0}} {{count:0}} {{count:0}} {{count:0}} {{count:0}} {{count:0}} + metric{type="mixed"} 0 0 0 0 0 0 + +eval range from 0 to 5m step 1m last_over_time(metric[2m:1m] @ end()) + metric{type="floats"} 10 10 10 10 10 10 + metric{type="histograms"} {{count:10}} {{count:10}} {{count:10}} {{count:10}} {{count:10}} {{count:10}} + metric{type="mixed"} {{count:10}} {{count:10}} {{count:10}} {{count:10}} {{count:10}} {{count:10}} + +# Subquery with 'offset' +# Start with query without offset to show non-offset results +eval range from 0 to 5m step 1m last_over_time(metric[2m:1m]) + metric{type="floats"} 0 4 3 6 -1 10 + metric{type="histograms"} {{count:0}} {{count:4}} {{count:3}} {{count:6}} {{count:-1}} {{count:10}} + metric{type="mixed"} 0 4 3 6 {{count:-1}} {{count:10}} + +eval instant at 5m last_over_time(metric[2m:1m] offset 1m) + metric{type="floats"} -1 + metric{type="histograms"} {{count:-1}} + metric{type="mixed"} {{count:-1}} + +eval range from 0 to 5m step 1m last_over_time(metric[2m:1m] offset 1m) + metric{type="floats"} _ 0 4 3 6 -1 + metric{type="histograms"} _ {{count:0}} {{count:4}} {{count:3}} {{count:6}} {{count:-1}} + metric{type="mixed"} _ 0 4 3 6 {{count:-1}} + +# Subquery range smaller than subquery step +eval instant at 5m last_over_time(metric[1m:2m]) + metric{type="floats"} -1 + metric{type="histograms"} {{count:-1}} + metric{type="mixed"} {{count:-1}} + +eval range from 0 to 5m step 1m last_over_time(metric[1m:2m]) + metric{type="floats"} 0 0 3 3 -1 -1 + metric{type="histograms"} {{count:0}} {{count:0}} {{count:3}} {{count:3}} {{count:-1}} {{count:-1}} + metric{type="mixed"} 0 0 3 3 {{count:-1}} {{count:-1}} + +# Nesting +# +# last_over_time[2m:1m] produces these results: +# T=0m T=1m T=2m T=3m T=4m T=5m +# 0 4 3 6 -1 10 + +eval_warn instant at 5m sum_over_time(last_over_time(metric[2m:1m])[5m:90s]) + {type="floats"} 9 + {type="histograms"} {{count:9}} + # No results for {type="mixed"} due to mixture of floats and histograms + +eval_warn range from 0 to 5m step 1m sum_over_time(last_over_time(metric[2m:1m])[5m:90s]) + {type="floats"} 0 0 4 10 10 9 + {type="histograms"} {{count:0}} {{count:0}} {{count:4}} {{count:10}} {{count:10}} {{count:9}} + {type="mixed"} 0 0 4 10 10 _ + # Last sample for {type="mixed"} dropped due to mixture of floats and histograms + +clear + +# Test deeply nested subquery with changing step. +load 1m + metric 0 1 2 3 4 + +eval range from 0 to 4m step 15s sum_over_time(metric[2m:30s]) + {} 0 0 0 0 1 1 2 2 4 4 6 6 9 8 11 10 14 + +eval range from 0 to 4m step 20s sum_over_time(sum_over_time(metric[2m:30s])[3m:15s]) + {} 0 0 0 1 2 4 10 14 20 35 43 54 78 + +eval range from 0 to 4m step 3m sum_over_time(sum_over_time(sum_over_time(metric[2m:30s])[3m:15s])[4m:20s]) + {} 0 86 diff --git a/pkg/streamingpromql/testdata/upstream/aggregators.test b/pkg/streamingpromql/testdata/upstream/aggregators.test index e29151a04dc..0ac575e970b 100644 --- a/pkg/streamingpromql/testdata/upstream/aggregators.test +++ b/pkg/streamingpromql/testdata/upstream/aggregators.test @@ -219,9 +219,8 @@ load 5m http_requests{job="api-server", instance="1", group="production"} 0+1.33x10 http_requests{job="api-server", instance="0", group="canary"} 0+1.33x10 -# Unsupported by streaming engine. -# eval instant at 50m stddev(http_requests) -# {} 0.0 +eval instant at 50m stddev(http_requests) + {} 0.0 eval instant at 50m stdvar(http_requests) {} 0.0 @@ -458,11 +457,10 @@ load 10s data{test="uneven samples",point="c"} 4 foo .8 -# Unsupported by streaming engine. -# eval instant at 1m group without(point)(data) -# {test="two samples"} 1 -# {test="three samples"} 1 -# {test="uneven samples"} 1 +eval instant at 1m group without(point)(data) + {test="two samples"} 1 + {test="three samples"} 1 + {test="uneven samples"} 1 eval instant at 1m group(foo) {} 1 diff --git a/pkg/streamingpromql/testdata/upstream/at_modifier.test b/pkg/streamingpromql/testdata/upstream/at_modifier.test index 755ac85b6e5..1dc18eff70d 100644 --- a/pkg/streamingpromql/testdata/upstream/at_modifier.test +++ b/pkg/streamingpromql/testdata/upstream/at_modifier.test @@ -107,57 +107,49 @@ eval instant at 25s sum_over_time(metric{job="1"}[100] @ 100) + label_replace(su # Subqueries. # 10*(1+2+...+9) + 10. -# Unsupported by streaming engine. -# eval instant at 25s sum_over_time(metric{job="1"}[100s:1s] @ 100) -# {job="1"} 460 +eval instant at 25s sum_over_time(metric{job="1"}[100s:1s] @ 100) + {job="1"} 460 # 10*(1+2+...+7) + 8. -# Unsupported by streaming engine. -# eval instant at 25s sum_over_time(metric{job="1"}[100s:1s] @ 100 offset 20s) -# {job="1"} 288 +eval instant at 25s sum_over_time(metric{job="1"}[100s:1s] @ 100 offset 20s) + {job="1"} 288 # 10*(1+2+...+7) + 8. -# Unsupported by streaming engine. -# eval instant at 25s sum_over_time(metric{job="1"}[100s:1s] offset 20s @ 100) -# {job="1"} 288 +eval instant at 25s sum_over_time(metric{job="1"}[100s:1s] offset 20s @ 100) + {job="1"} 288 # 10*(1+2+...+7) + 8. -# Unsupported by streaming engine. -# eval instant at 25s sum_over_time(metric{job="1"}[100:1] offset 20 @ 100) -# {job="1"} 288 +eval instant at 25s sum_over_time(metric{job="1"}[100:1] offset 20 @ 100) + {job="1"} 288 # Subquery with different timestamps. # Since vector selector has timestamp, the result value does not depend on the timestamp of subqueries. # Inner most sum=1+2+...+10=55. # With [100s:25s] subquery, it's 55*5. -# Unsupported by streaming engine. -# eval instant at 100s sum_over_time(sum_over_time(metric{job="1"}[100s] @ 100)[100s:25s] @ 50) -# {job="1"} 275 +eval instant at 100s sum_over_time(sum_over_time(metric{job="1"}[100s] @ 100)[100s:25s] @ 50) + {job="1"} 275 # Nested subqueries with different timestamps on both. # Since vector selector has timestamp, the result value does not depend on the timestamp of subqueries. # Sum of innermost subquery is 275 as above. The outer subquery repeats it 4 times. -# Unsupported by streaming engine. -# eval instant at 0s sum_over_time(sum_over_time(sum_over_time(metric{job="1"}[100s] @ 100)[100s:25s] @ 50)[3s:1s] @ 3000) -# {job="1"} 1100 +eval instant at 0s sum_over_time(sum_over_time(sum_over_time(metric{job="1"}[100s] @ 100)[100s:25s] @ 50)[3s:1s] @ 3000) + {job="1"} 1100 # Testing the inner subquery timestamp since vector selector does not have @. # Inner sum for subquery [100s:25s] @ 50 are # at -50 nothing, at -25 nothing, at 0=0, at 25=2, at 50=4+5=9. # This sum of 11 is repeated 4 times by outer subquery. -# Unsupported by streaming engine. -# eval instant at 0s sum_over_time(sum_over_time(sum_over_time(metric{job="1"}[10s])[100s:25s] @ 50)[3s:1s] @ 200) -# {job="1"} 44 +eval instant at 0s sum_over_time(sum_over_time(sum_over_time(metric{job="1"}[10s])[100s:25s] @ 50)[3s:1s] @ 200) + {job="1"} 44 # Inner sum for subquery [100s:25s] @ 200 are # at 100=9+10, at 125=12, at 150=14+15, at 175=17, at 200=19+20. # This sum of 116 is repeated 4 times by outer subquery. -# Unsupported by streaming engine. -# eval instant at 0s sum_over_time(sum_over_time(sum_over_time(metric{job="1"}[10s])[100s:25s] @ 200)[3s:1s] @ 50) -# {job="1"} 464 +eval instant at 0s sum_over_time(sum_over_time(sum_over_time(metric{job="1"}[10s])[100s:25s] @ 200)[3s:1s] @ 50) + {job="1"} 464 # Nested subqueries with timestamp only on outer subquery. # Outer most subquery: @@ -171,9 +163,8 @@ eval instant at 25s sum_over_time(metric{job="1"}[100] @ 100) + label_replace(su # inner subquery: at 945=94+93, at 955=95+94, at 965=96+95 # at 1000=873 # inner subquery: at 970=97+96+95, at 980=98+97+96, at 990=99+98+97 -# Unsupported by streaming engine. -# eval instant at 0s sum_over_time(sum_over_time(sum_over_time(metric{job="1"}[20s])[20s:10s] offset 10s)[100s:25s] @ 1000) -# {job="1"} 3588 +eval instant at 0s sum_over_time(sum_over_time(sum_over_time(metric{job="1"}[20s])[20s:10s] offset 10s)[100s:25s] @ 1000) + {job="1"} 3588 # minute is counted on the value of the sample. # Unsupported by streaming engine. diff --git a/pkg/streamingpromql/testdata/upstream/functions.test b/pkg/streamingpromql/testdata/upstream/functions.test index fc22e60cd69..a74d23f2365 100644 --- a/pkg/streamingpromql/testdata/upstream/functions.test +++ b/pkg/streamingpromql/testdata/upstream/functions.test @@ -1245,8 +1245,7 @@ eval instant at 1m present_over_time(http_requests{handler!="/foo"}[5m]) eval instant at 1m present_over_time(http_requests{handler="/foo", handler="/bar", handler="/foobar"}[5m]) -# Unsupported by streaming engine. -# eval instant at 1m present_over_time(rate(nonexistant[5m])[5m:]) +eval instant at 1m present_over_time(rate(nonexistant[5m])[5m:]) eval instant at 1m present_over_time(http_requests{handler="/foo", handler="/bar", instance="127.0.0.1"}[5m]) @@ -1261,10 +1260,9 @@ eval instant at 5m present_over_time(http_requests[5m]) {instance="127.0.0.1", job="httpd", path="/bar"} 1 {instance="127.0.0.1", job="httpd", path="/foo"} 1 -# Unsupported by streaming engine. -# eval instant at 5m present_over_time(rate(http_requests[5m])[5m:1m]) -# {instance="127.0.0.1", job="httpd", path="/bar"} 1 -# {instance="127.0.0.1", job="httpd", path="/foo"} 1 +eval instant at 5m present_over_time(rate(http_requests[5m])[5m:1m]) + {instance="127.0.0.1", job="httpd", path="/bar"} 1 + {instance="127.0.0.1", job="httpd", path="/foo"} 1 eval instant at 0m present_over_time(httpd_log_lines_total[30s]) {instance="127.0.0.1",job="node"} 1 @@ -1289,8 +1287,7 @@ eval instant at 16m present_over_time({instance="127.0.0.1"}[5m]) eval instant at 21m present_over_time({job="grok"}[20m]) -# Unsupported by streaming engine. -# eval instant at 30m present_over_time({instance="127.0.0.1"}[5m:5s]) +eval instant at 30m present_over_time({instance="127.0.0.1"}[5m:5s]) eval instant at 5m present_over_time({job="ingress"}[4m]) {job="ingress"} 1 diff --git a/pkg/streamingpromql/testdata/upstream/native_histograms.test b/pkg/streamingpromql/testdata/upstream/native_histograms.test index 0e731ace9c7..3da3301493e 100644 --- a/pkg/streamingpromql/testdata/upstream/native_histograms.test +++ b/pkg/streamingpromql/testdata/upstream/native_histograms.test @@ -1146,10 +1146,8 @@ clear load 1m histogram_sum_over_time {{schema:0 count:25 sum:1234.5 z_bucket:4 z_bucket_w:0.001 buckets:[1 2 0 1 1] n_buckets:[2 4 0 0 1 9]}} {{schema:0 count:41 sum:2345.6 z_bucket:5 z_bucket_w:0.001 buckets:[1 3 1 2 1 1 1] n_buckets:[0 1 4 2 7 0 0 0 0 5 5 2]}} {{schema:0 count:41 sum:1111.1 z_bucket:5 z_bucket_w:0.001 buckets:[1 3 1 2 1 1 1] n_buckets:[0 1 4 2 7 0 0 0 0 5 5 2]}} {{schema:1 count:0}} -# Unsupported by streaming engine. -# eval instant at 3m sum_over_time(histogram_sum_over_time[3m:1m]) -# {} {{schema:0 count:107 sum:4691.2 z_bucket:14 z_bucket_w:0.001 buckets:[3 8 2 5 3 2 2] n_buckets:[2 6 8 4 15 9 0 0 0 10 10 4]}} +eval instant at 3m sum_over_time(histogram_sum_over_time[3m:1m]) + {} {{schema:0 count:107 sum:4691.2 z_bucket:14 z_bucket_w:0.001 buckets:[3 8 2 5 3 2 2] n_buckets:[2 6 8 4 15 9 0 0 0 10 10 4]}} -# Unsupported by streaming engine. -# eval instant at 3m avg_over_time(histogram_sum_over_time[3m:1m]) -# {} {{schema:0 count:26.75 sum:1172.8 z_bucket:3.5 z_bucket_w:0.001 buckets:[0.75 2 0.5 1.25 0.75 0.5 0.5] n_buckets:[0.5 1.5 2 1 3.75 2.25 0 0 0 2.5 2.5 1]}} +eval instant at 3m avg_over_time(histogram_sum_over_time[3m:1m]) + {} {{schema:0 count:26.75 sum:1172.8 z_bucket:3.5 z_bucket_w:0.001 buckets:[0.75 2 0.5 1.25 0.75 0.5 0.5] n_buckets:[0.5 1.5 2 1 3.75 2.25 0 0 0 2.5 2.5 1]}} diff --git a/pkg/streamingpromql/testdata/upstream/subquery.test b/pkg/streamingpromql/testdata/upstream/subquery.test index e9d2e516005..4056278afef 100644 --- a/pkg/streamingpromql/testdata/upstream/subquery.test +++ b/pkg/streamingpromql/testdata/upstream/subquery.test @@ -7,32 +7,26 @@ load 10s metric 1 2 # Evaluation before 0s gets no sample. -# Unsupported by streaming engine. -# eval instant at 10s sum_over_time(metric[50s:10s]) -# {} 3 +eval instant at 10s sum_over_time(metric[50s:10s]) + {} 3 -# Unsupported by streaming engine. -# eval instant at 10s sum_over_time(metric[50s:5s]) -# {} 4 +eval instant at 10s sum_over_time(metric[50s:5s]) + {} 4 # Every evaluation yields the last value, i.e. 2 -# Unsupported by streaming engine. -# eval instant at 5m sum_over_time(metric[50s:10s]) -# {} 12 +eval instant at 5m sum_over_time(metric[50s:10s]) + {} 12 # Series becomes stale at 5m10s (5m after last sample) # Hence subquery gets a single sample at 6m-50s=5m10s. -# Unsupported by streaming engine. -# eval instant at 6m sum_over_time(metric[50s:10s]) -# {} 2 +eval instant at 6m sum_over_time(metric[50s:10s]) + {} 2 -# Unsupported by streaming engine. -# eval instant at 10s rate(metric[20s:10s]) -# {} 0.1 +eval instant at 10s rate(metric[20s:10s]) + {} 0.1 -# Unsupported by streaming engine. -# eval instant at 20s rate(metric[20s:5s]) -# {} 0.05 +eval instant at 20s rate(metric[20s:5s]) + {} 0.05 clear @@ -42,17 +36,15 @@ load 10s http_requests{job="api-server", instance="0", group="canary"} 0+30x1000 300+80x1000 http_requests{job="api-server", instance="1", group="canary"} 0+40x2000 -# Unsupported by streaming engine. -# eval instant at 8000s rate(http_requests{group=~"pro.*"}[1m:10s]) -# {job="api-server", instance="0", group="production"} 1 -# {job="api-server", instance="1", group="production"} 2 +eval instant at 8000s rate(http_requests{group=~"pro.*"}[1m:10s]) + {job="api-server", instance="0", group="production"} 1 + {job="api-server", instance="1", group="production"} 2 -# Unsupported by streaming engine. -# eval instant at 20000s avg_over_time(rate(http_requests[1m])[1m:1s]) -# {job="api-server", instance="0", group="canary"} 8 -# {job="api-server", instance="1", group="canary"} 4 -# {job="api-server", instance="1", group="production"} 3 -# {job="api-server", instance="0", group="production"} 3 +eval instant at 20000s avg_over_time(rate(http_requests[1m])[1m:1s]) + {job="api-server", instance="0", group="canary"} 8 + {job="api-server", instance="1", group="canary"} 4 + {job="api-server", instance="1", group="production"} 3 + {job="api-server", instance="0", group="production"} 3 clear @@ -61,78 +53,61 @@ load 10s metric2 0+2x1000 metric3 0+3x1000 -# Unsupported by streaming engine. -# eval instant at 1000s sum_over_time(metric1[30s:10s]) -# {} 394 +eval instant at 1000s sum_over_time(metric1[30s:10s]) + {} 394 # This is (394*2 - 100), because other than the last 100 at 1000s, # everything else is repeated with the 5s step. -# Unsupported by streaming engine. -# eval instant at 1000s sum_over_time(metric1[30s:5s]) -# {} 688 +eval instant at 1000s sum_over_time(metric1[30s:5s]) + {} 688 # Offset is aligned with the step. -# Unsupported by streaming engine. -# eval instant at 1010s sum_over_time(metric1[30s:10s] offset 10s) -# {} 394 +eval instant at 1010s sum_over_time(metric1[30s:10s] offset 10s) + {} 394 # Same result for different offsets due to step alignment. -# Unsupported by streaming engine. -# eval instant at 1010s sum_over_time(metric1[30s:10s] offset 9s) -# {} 297 +eval instant at 1010s sum_over_time(metric1[30s:10s] offset 9s) + {} 297 -# Unsupported by streaming engine. -# eval instant at 1010s sum_over_time(metric1[30s:10s] offset 7s) -# {} 297 +eval instant at 1010s sum_over_time(metric1[30s:10s] offset 7s) + {} 297 -# Unsupported by streaming engine. -# eval instant at 1010s sum_over_time(metric1[30s:10s] offset 5s) -# {} 297 +eval instant at 1010s sum_over_time(metric1[30s:10s] offset 5s) + {} 297 -# Unsupported by streaming engine. -# eval instant at 1010s sum_over_time(metric1[30s:10s] offset 3s) -# {} 297 +eval instant at 1010s sum_over_time(metric1[30s:10s] offset 3s) + {} 297 -# Unsupported by streaming engine. -# eval instant at 1010s sum_over_time((metric1)[30s:10s] offset 3s) -# {} 297 +eval instant at 1010s sum_over_time((metric1)[30s:10s] offset 3s) + {} 297 -# Unsupported by streaming engine. -# eval instant at 1010s sum_over_time(metric1[30:10] offset 3) -# {} 297 +eval instant at 1010s sum_over_time(metric1[30:10] offset 3) + {} 297 -# Unsupported by streaming engine. -# eval instant at 1010s sum_over_time((metric1)[30:10s] offset 3s) -# {} 297 +eval instant at 1010s sum_over_time((metric1)[30:10s] offset 3s) + {} 297 -# Unsupported by streaming engine. -# eval instant at 1010s sum_over_time((metric1)[30:10s] offset 3s) -# {} 297 +eval instant at 1010s sum_over_time((metric1)[30:10s] offset 3s) + {} 297 -# Unsupported by streaming engine. -# eval instant at 1010s sum_over_time((metric1)[30:10] offset 3s) -# {} 297 +eval instant at 1010s sum_over_time((metric1)[30:10] offset 3s) + {} 297 -# Unsupported by streaming engine. -# eval instant at 1010s sum_over_time((metric1)[30:10] offset 3) -# {} 297 +eval instant at 1010s sum_over_time((metric1)[30:10] offset 3) + {} 297 # Nested subqueries -# Unsupported by streaming engine. -# eval instant at 1000s rate(sum_over_time(metric1[30s:10s])[50s:10s]) -# {} 0.4 +eval instant at 1000s rate(sum_over_time(metric1[30s:10s])[50s:10s]) + {} 0.4 -# Unsupported by streaming engine. -# eval instant at 1000s rate(sum_over_time(metric2[30s:10s])[50s:10s]) -# {} 0.8 +eval instant at 1000s rate(sum_over_time(metric2[30s:10s])[50s:10s]) + {} 0.8 -# Unsupported by streaming engine. -# eval instant at 1000s rate(sum_over_time(metric3[30s:10s])[50s:10s]) -# {} 1.2 +eval instant at 1000s rate(sum_over_time(metric3[30s:10s])[50s:10s]) + {} 1.2 -# Unsupported by streaming engine. -# eval instant at 1000s rate(sum_over_time((metric1+metric2+metric3)[30s:10s])[30s:10s]) -# {} 2.4 +eval instant at 1000s rate(sum_over_time((metric1+metric2+metric3)[30s:10s])[30s:10s]) + {} 2.4 clear @@ -146,20 +121,17 @@ eval instant at 80s rate(metric[1m]) {} 2.517857143 # No extrapolation, [2@20, 144@80]: (144 - 2) / 60 -# Unsupported by streaming engine. -# eval instant at 80s rate(metric[1m:10s]) -# {} 2.366666667 +eval instant at 80s rate(metric[1m:10s]) + {} 2.366666667 # Only one value between 10s and 20s, 2@14 eval instant at 20s min_over_time(metric[10s]) {} 2 # min(1@10, 2@20) -# Unsupported by streaming engine. -# eval instant at 20s min_over_time(metric[10s:10s]) -# {} 1 +eval instant at 20s min_over_time(metric[10s:10s]) + {} 1 -# Unsupported by streaming engine. -# eval instant at 20m min_over_time(rate(metric[5m])[20m:1m]) -# {} 0.12119047619047618 +eval instant at 20m min_over_time(rate(metric[5m])[20m:1m]) + {} 0.12119047619047618 diff --git a/pkg/streamingpromql/testing.go b/pkg/streamingpromql/testing.go index 01ff956bb8d..51599ecee64 100644 --- a/pkg/streamingpromql/testing.go +++ b/pkg/streamingpromql/testing.go @@ -12,12 +12,13 @@ import ( func NewTestEngineOpts() EngineOpts { return EngineOpts{ CommonOpts: promql.EngineOpts{ - Logger: nil, - Reg: nil, - MaxSamples: math.MaxInt, - Timeout: 100 * time.Second, - EnableAtModifier: true, - EnableNegativeOffset: true, + Logger: nil, + Reg: nil, + MaxSamples: math.MaxInt, + Timeout: 100 * time.Second, + EnableAtModifier: true, + EnableNegativeOffset: true, + NoStepSubqueryIntervalFn: func(int64) int64 { return time.Minute.Milliseconds() }, }, FeatureToggles: EnableAllFeatures, diff --git a/pkg/streamingpromql/types/data.go b/pkg/streamingpromql/types/data.go index 2ea14d9fa2c..3c2ce8e6606 100644 --- a/pkg/streamingpromql/types/data.go +++ b/pkg/streamingpromql/types/data.go @@ -67,10 +67,10 @@ func (i *InstantVectorSeriesDataIterator) Next() (t int64, f float64, h *histogr return point.T, 0, point.H, true } -// RangeVectorStepData contains the timestamps associated with a single time step produced by a +// RangeVectorStepData contains the data and timestamps associated with a single time step produced by a // RangeVectorOperator. // -// All values are in milliseconds since the Unix epoch. +// All timestamps are in milliseconds since the Unix epoch. // // For example, if the operator represents the selector "some_metric[5m]", and this time step is for // 2024-05-02T00:00:00Z, then: @@ -84,6 +84,26 @@ func (i *InstantVectorSeriesDataIterator) Next() (t int64, f float64, h *histogr // - RangeStart is 1712015700000 (2024-04-01T23:55:00Z) // - RangeEnd is 1712016000000 (2024-04-02T00:00:00Z) type RangeVectorStepData struct { + // Floats contains the float samples for this time step, and possibly points beyond the end of the + // selected range. Callers should compare points' timestamps to RangeEnd. + // + // The ring buffer must not be modified, including closing it, as RangeVectorOperator implementations + // may return the same ring buffer for subsequent steps and reuse the same points, if the ranges for + // both steps overlap. + Floats *FPointRingBuffer + + // Histograms contains the histogram samples for this time step, and possibly points beyond the end of the + // selected range. Callers should compare points' timestamps to RangeEnd. + // + // The ring buffer must not be modified, including closing it, as RangeVectorOperator implementations + // may return the same ring buffer for subsequent steps and reuse the same points, if the ranges for + // both steps overlap. + // + // FloatHistogram instances in the buffer must not be modified as they may be returned for subsequent steps. + // FloatHistogram instances that are retained after the next call to NextStepSamples must be copied, as they + // may be modified on subsequent calls to NextStepSamples. + Histograms *HPointRingBuffer + // StepT is the timestamp of this time step. StepT int64 diff --git a/pkg/streamingpromql/types/fpoint_ring_buffer.go b/pkg/streamingpromql/types/fpoint_ring_buffer.go index fc1335567a3..cdbf1649312 100644 --- a/pkg/streamingpromql/types/fpoint_ring_buffer.go +++ b/pkg/streamingpromql/types/fpoint_ring_buffer.go @@ -150,12 +150,33 @@ func (b *FPointRingBuffer) Append(p promql.FPoint) error { return nil } -// Reset clears the contents of this buffer. +// Reset clears the contents of this buffer, but retains the underlying point slice for future reuse. func (b *FPointRingBuffer) Reset() { b.firstIndex = 0 b.size = 0 } +// Release clears the contents of this buffer and releases the underlying point slice. +// The buffer can be used again and will acquire a new slice when required. +func (b *FPointRingBuffer) Release() { + b.Reset() + putFPointSliceForRingBuffer(b.points, b.memoryConsumptionTracker) + b.points = nil +} + +// Use replaces the contents of this buffer with s. +// The points in s must be in time order, not contain duplicate timestamps and start at index 0. +// s will be modified in place when the buffer is modified, and callers should not modify s after passing it off to the ring buffer via Use. +// s will be returned to the pool when Close is called, Use is called again, or the buffer needs to expand, so callers +// should not return s to the pool themselves. +func (b *FPointRingBuffer) Use(s []promql.FPoint) { + putFPointSliceForRingBuffer(b.points, b.memoryConsumptionTracker) + + b.points = s + b.firstIndex = 0 + b.size = len(s) +} + // Close releases any resources associated with this buffer. func (b *FPointRingBuffer) Close() { putFPointSliceForRingBuffer(b.points, b.memoryConsumptionTracker) diff --git a/pkg/streamingpromql/types/hpoint_ring_buffer.go b/pkg/streamingpromql/types/hpoint_ring_buffer.go index 945db554d16..1887ac12b89 100644 --- a/pkg/streamingpromql/types/hpoint_ring_buffer.go +++ b/pkg/streamingpromql/types/hpoint_ring_buffer.go @@ -201,12 +201,33 @@ func (b *HPointRingBuffer) RemoveLastPoint() { } } -// Reset clears the contents of this buffer. +// Reset clears the contents of this buffer, but retains the underlying point slice for future reuse. func (b *HPointRingBuffer) Reset() { b.firstIndex = 0 b.size = 0 } +// Release clears the contents of this buffer and releases the underlying point slice. +// The buffer can be used again and will acquire a new slice when required. +func (b *HPointRingBuffer) Release() { + b.Reset() + putHPointSliceForRingBuffer(b.points, b.memoryConsumptionTracker) + b.points = nil +} + +// Use replaces the contents of this buffer with s. +// The points in s must be in time order, not contain duplicate timestamps and start at index 0. +// s will be modified in place when the buffer is modified, and callers should not modify s after passing it off to the ring buffer via Use. +// s will be returned to the pool when Close is called, Use is called again, or the buffer needs to expand, so callers +// should not return s to the pool themselves. +func (b *HPointRingBuffer) Use(s []promql.HPoint) { + putHPointSliceForRingBuffer(b.points, b.memoryConsumptionTracker) + + b.points = s + b.firstIndex = 0 + b.size = len(s) +} + // Close releases any resources associated with this buffer. func (b *HPointRingBuffer) Close() { putHPointSliceForRingBuffer(b.points, b.memoryConsumptionTracker) diff --git a/pkg/streamingpromql/types/operator.go b/pkg/streamingpromql/types/operator.go index 42815f3a46b..c4553c8b434 100644 --- a/pkg/streamingpromql/types/operator.go +++ b/pkg/streamingpromql/types/operator.go @@ -60,17 +60,10 @@ type RangeVectorOperator interface { // SeriesMetadata must be called exactly once before calling NextSeries. NextSeries(ctx context.Context) error - // NextStepSamples populates the provided RingBuffers with the samples for the next time step for the - // current series and returns the timestamps of the next time step, or returns EOS if no more time + // NextStepSamples returns populated RingBuffers with the samples for the next time step for the + // current series and the timestamps of the next time step, or returns EOS if no more time // steps are available. - // The provided RingBuffers are expected to only contain points for the current series, and the same - // RingBuffers should be passed to subsequent NextStepSamples calls for the same series. - // The provided RingBuffers may be populated with points beyond the end of the expected time range, and - // callers should compare returned points' timestamps to the returned RangeVectorStepData.RangeEnd. - // Next must be called at least once before calling NextStepSamples. - // Keep in mind that HPoint contains a pointer to a histogram, so it is generally not safe to - // modify directly as the histogram may be used for other HPoint values, such as when lookback has occurred. - NextStepSamples(floats *FPointRingBuffer, histograms *HPointRingBuffer) (RangeVectorStepData, error) + NextStepSamples() (RangeVectorStepData, error) } // ScalarOperator represents all operators that produce scalars. diff --git a/pkg/streamingpromql/types/ring_buffer_test.go b/pkg/streamingpromql/types/ring_buffer_test.go index 1e34b0d24c9..ba70c135a8e 100644 --- a/pkg/streamingpromql/types/ring_buffer_test.go +++ b/pkg/streamingpromql/types/ring_buffer_test.go @@ -26,6 +26,8 @@ type ringBuffer[T any] interface { AnyAtOrBefore(maxT int64) bool First() T Reset() + Use(s []T) + Release() GetPoints() []T GetFirstIndex() int GetTimestamp(point T) int64 @@ -110,6 +112,18 @@ func testRingBuffer[T any](t *testing.T, buf ringBuffer[T], points []T) { require.NoError(t, buf.Append(points[8])) shouldHavePoints(t, buf, points[8]) + + buf.Use(points) + shouldHavePoints(t, buf, points...) + + buf.DiscardPointsBefore(5) + shouldHavePoints(t, buf, points[4:]...) + + buf.Release() + shouldHaveNoPoints(t, buf) + + buf.Use(points[4:]) + shouldHavePoints(t, buf, points[4:]...) } func TestRingBuffer_DiscardPointsBefore_ThroughWrapAround(t *testing.T) {