From 665bb0fe99f29064c4251c82fe21371f4efe8c18 Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Fri, 4 Oct 2024 13:57:51 +1000 Subject: [PATCH] MQE: add support for `and` (#9507) * Enable upstream test cases * Add feature flag # Conflicts: # cmd/mimir/help-all.txt.tmpl # pkg/streamingpromql/config.go # pkg/streamingpromql/engine_test.go * Add test cases. * Add skeleton * Initial implementation * Add to concurrency test * Add to edge cases test * Enable benchmarks * Clarify comment * Add changelog entry # Conflicts: # CHANGELOG.md * Address PR feedback: move `vectorMatchingGroupKeyFunc` to its own file. * Fix test * Address PR feedback: simplify `NextSeries` * Fix unpooled metadata slice creation and add linting script to catch similar issues in the future * Reuse slices in `FilterLeftSeries` if we can * Add missing license header to script * Reuse left series metadata slice --- CHANGELOG.md | 2 +- Makefile | 2 + cmd/mimir/config-descriptor.json | 11 + cmd/mimir/help-all.txt.tmpl | 2 + .../configuration-parameters/index.md | 5 + pkg/streamingpromql/benchmarks/benchmarks.go | 12 +- pkg/streamingpromql/config.go | 3 + .../engine_concurrency_test.go | 24 ++ pkg/streamingpromql/engine_test.go | 27 +- .../operators/and_binary_operation.go | 291 ++++++++++++++++++ .../operators/binary_operation.go | 41 +++ .../vector_vector_binary_operation.go | 34 +- pkg/streamingpromql/query.go | 14 +- .../testdata/ours/binary_operators.test | 70 +++++ .../testdata/upstream/operators.test | 80 +++-- tools/find-unpooled-slice-creation.sh | 30 ++ 16 files changed, 561 insertions(+), 87 deletions(-) create mode 100644 pkg/streamingpromql/operators/and_binary_operation.go create mode 100644 pkg/streamingpromql/operators/binary_operation.go create mode 100755 tools/find-unpooled-slice-creation.sh diff --git a/CHANGELOG.md b/CHANGELOG.md index fc5683f9e9..dd7abe2d40 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,7 +14,7 @@ * `cortex_alertmanager_alerts` * `cortex_alertmanager_silences` * [CHANGE] Cache: Deprecate experimental support for Redis as a cache backend. #9453 -* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #9367 #9368 #9398 #9399 #9403 #9417 #9418 #9419 #9420 #9482 #9504 #9505 +* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #9367 #9368 #9398 #9399 #9403 #9417 #9418 #9419 #9420 #9482 #9504 #9505 #9507 * [FEATURE] Query-frontend: added experimental configuration options `query-frontend.cache-errors` and `query-frontend.results-cache-ttl-for-errors` to allow non-transient responses to be cached. When set to `true` error responses from hitting limits or bad data are cached for a short TTL. #9028 * [FEATURE] gRPC: Support S2 compression. #9322 * `-alertmanager.alertmanager-client.grpc-compression=s2` diff --git a/Makefile b/Makefile index 6460ca17b0..a963d7b505 100644 --- a/Makefile +++ b/Makefile @@ -340,6 +340,8 @@ lint: ## Run lints to check for style issues. lint: check-makefiles misspell -error $(DOC_SOURCES_PATH) + ./tools/find-unpooled-slice-creation.sh + # Configured via .golangci.yml. golangci-lint run diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json index ca3c0572bf..181140bef2 100644 --- a/cmd/mimir/config-descriptor.json +++ b/cmd/mimir/config-descriptor.json @@ -2011,6 +2011,17 @@ "fieldType": "boolean", "fieldCategory": "experimental" }, + { + "kind": "field", + "name": "enable_binary_logical_operations", + "required": false, + "desc": "Enable support for binary logical operations in Mimir's query engine. Only applies if the Mimir query engine is in use.", + "fieldValue": null, + "fieldDefaultValue": true, + "fieldFlag": "querier.mimir-query-engine.enable-binary-logical-operations", + "fieldType": "boolean", + "fieldCategory": "experimental" + }, { "kind": "field", "name": "enable_scalars", diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl index 8de8807dbf..98b795f307 100644 --- a/cmd/mimir/help-all.txt.tmpl +++ b/cmd/mimir/help-all.txt.tmpl @@ -1941,6 +1941,8 @@ Usage of ./cmd/mimir/mimir: Maximum number of samples a single query can load into memory. This config option should be set on query-frontend too when query sharding is enabled. (default 50000000) -querier.mimir-query-engine.enable-aggregation-operations [experimental] Enable support for aggregation operations in Mimir's query engine. Only applies if the Mimir query engine is in use. (default true) + -querier.mimir-query-engine.enable-binary-logical-operations + [experimental] Enable support for binary logical operations in Mimir's query engine. Only applies if the Mimir query engine is in use. (default true) -querier.mimir-query-engine.enable-scalar-scalar-binary-comparison-operations [experimental] Enable support for binary comparison operations between two scalars in Mimir's query engine. Only applies if the Mimir query engine is in use. (default true) -querier.mimir-query-engine.enable-scalars diff --git a/docs/sources/mimir/configure/configuration-parameters/index.md b/docs/sources/mimir/configure/configuration-parameters/index.md index 28daa471cd..09122061fb 100644 --- a/docs/sources/mimir/configure/configuration-parameters/index.md +++ b/docs/sources/mimir/configure/configuration-parameters/index.md @@ -1522,6 +1522,11 @@ mimir_query_engine: # CLI flag: -querier.mimir-query-engine.enable-scalar-scalar-binary-comparison-operations [enable_scalar_scalar_binary_comparison_operations: | default = true] + # (experimental) Enable support for binary logical operations in Mimir's query + # engine. Only applies if the Mimir query engine is in use. + # CLI flag: -querier.mimir-query-engine.enable-binary-logical-operations + [enable_binary_logical_operations: | default = true] + # (experimental) Enable support for scalars in Mimir's query engine. Only # applies if the Mimir query engine is in use. # CLI flag: -querier.mimir-query-engine.enable-scalars diff --git a/pkg/streamingpromql/benchmarks/benchmarks.go b/pkg/streamingpromql/benchmarks/benchmarks.go index 6996f6e4e7..270e004bec 100644 --- a/pkg/streamingpromql/benchmarks/benchmarks.go +++ b/pkg/streamingpromql/benchmarks/benchmarks.go @@ -167,18 +167,18 @@ func TestCases(metricSizes []int) []BenchCase { { Expr: `a_2000 - b_2000{l="1234"}`, }, - //{ - // Expr: "a_X and b_X{l=~'.*[0-4]$'}", - //}, + { + Expr: "a_X and b_X{l=~'.*[0-4]$'}", + }, //{ // Expr: "a_X or b_X{l=~'.*[0-4]$'}", //}, //{ // Expr: "a_X unless b_X{l=~'.*[0-4]$'}", //}, - //{ - // Expr: "a_X and b_X{l='notfound'}", - //}, + { + Expr: "a_X and b_X{l='notfound'}", + }, //// Simple functions. //{ // Expr: "abs(a_X)", diff --git a/pkg/streamingpromql/config.go b/pkg/streamingpromql/config.go index d2ce79d0e0..9a994eb335 100644 --- a/pkg/streamingpromql/config.go +++ b/pkg/streamingpromql/config.go @@ -22,6 +22,7 @@ type FeatureToggles struct { EnableVectorVectorBinaryComparisonOperations bool `yaml:"enable_vector_vector_binary_comparison_operations" category:"experimental"` EnableVectorScalarBinaryComparisonOperations bool `yaml:"enable_vector_scalar_binary_comparison_operations" category:"experimental"` EnableScalarScalarBinaryComparisonOperations bool `yaml:"enable_scalar_scalar_binary_comparison_operations" category:"experimental"` + EnableBinaryLogicalOperations bool `yaml:"enable_binary_logical_operations" category:"experimental"` EnableScalars bool `yaml:"enable_scalars" category:"experimental"` } @@ -33,6 +34,7 @@ var EnableAllFeatures = FeatureToggles{ true, true, true, + true, } func (t *FeatureToggles) RegisterFlags(f *flag.FlagSet) { @@ -40,5 +42,6 @@ func (t *FeatureToggles) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&t.EnableVectorVectorBinaryComparisonOperations, "querier.mimir-query-engine.enable-vector-vector-binary-comparison-operations", true, "Enable support for binary comparison operations between two vectors in Mimir's query engine. Only applies if the Mimir query engine is in use.") f.BoolVar(&t.EnableVectorScalarBinaryComparisonOperations, "querier.mimir-query-engine.enable-vector-scalar-binary-comparison-operations", true, "Enable support for binary comparison operations between a vector and a scalar in Mimir's query engine. Only applies if the Mimir query engine is in use.") f.BoolVar(&t.EnableScalarScalarBinaryComparisonOperations, "querier.mimir-query-engine.enable-scalar-scalar-binary-comparison-operations", true, "Enable support for binary comparison operations between two scalars in Mimir's query engine. Only applies if the Mimir query engine is in use.") + f.BoolVar(&t.EnableBinaryLogicalOperations, "querier.mimir-query-engine.enable-binary-logical-operations", true, "Enable support for binary logical operations in Mimir's query engine. Only applies if the Mimir query engine is in use.") f.BoolVar(&t.EnableScalars, "querier.mimir-query-engine.enable-scalars", true, "Enable support for scalars in Mimir's query engine. Only applies if the Mimir query engine is in use.") } diff --git a/pkg/streamingpromql/engine_concurrency_test.go b/pkg/streamingpromql/engine_concurrency_test.go index 4f966a370e..83896be91f 100644 --- a/pkg/streamingpromql/engine_concurrency_test.go +++ b/pkg/streamingpromql/engine_concurrency_test.go @@ -133,6 +133,30 @@ func TestConcurrentQueries(t *testing.T) { end: startT.Add(10 * time.Minute), step: time.Minute, }, + { + expr: `float{group="a"} and on (instance) float{group="b"}`, + start: startT, + end: startT.Add(10 * time.Minute), + step: time.Minute, + }, + { + expr: `native_histogram{group="a"} and on (instance) float{group="b"}`, + start: startT, + end: startT.Add(10 * time.Minute), + step: time.Minute, + }, + { + expr: `native_histogram{group="a"} and on (instance) native_histogram{group="b"}`, + start: startT, + end: startT.Add(10 * time.Minute), + step: time.Minute, + }, + { + expr: `{group="a"} and on (instance) float{group="b"}`, + start: startT, + end: startT.Add(10 * time.Minute), + step: time.Minute, + }, } storage := promqltest.LoadedStorage(t, data) diff --git a/pkg/streamingpromql/engine_test.go b/pkg/streamingpromql/engine_test.go index d2d3ade261..7ff5acf99a 100644 --- a/pkg/streamingpromql/engine_test.go +++ b/pkg/streamingpromql/engine_test.go @@ -42,7 +42,8 @@ func TestUnsupportedPromQLFeatures(t *testing.T) { // The goal of this is not to list every conceivable expression that is unsupported, but to cover all the // different cases and make sure we produce a reasonable error message when these cases are encountered. unsupportedExpressions := map[string]string{ - "metric{} or other_metric{}": "binary expression with many-to-many matching", + "metric{} or other_metric{}": "binary expression with 'or'", + "metric{} unless other_metric{}": "binary expression with 'unless'", "metric{} + on() group_left() other_metric{}": "binary expression with many-to-one matching", "metric{} + on() group_right() other_metric{}": "binary expression with one-to-many matching", "topk(5, metric{})": "'topk' aggregation with parameter", @@ -93,6 +94,7 @@ func TestUnsupportedPromQLFeaturesWithFeatureToggles(t *testing.T) { requireQueryIsSupported(t, featureToggles, "metric{} + 1") requireQueryIsSupported(t, featureToggles, "1 + metric{}") requireQueryIsSupported(t, featureToggles, "2 + 1") + requireQueryIsSupported(t, featureToggles, "metric{} and other_metric{}") }) t.Run("vector/scalar binary expressions with comparison operation", func(t *testing.T) { @@ -109,6 +111,7 @@ func TestUnsupportedPromQLFeaturesWithFeatureToggles(t *testing.T) { requireQueryIsSupported(t, featureToggles, "metric{} + 1") requireQueryIsSupported(t, featureToggles, "1 + metric{}") requireQueryIsSupported(t, featureToggles, "2 + 1") + requireQueryIsSupported(t, featureToggles, "metric{} and other_metric{}") }) t.Run("scalar/scalar binary expressions with comparison operation", func(t *testing.T) { @@ -125,6 +128,26 @@ func TestUnsupportedPromQLFeaturesWithFeatureToggles(t *testing.T) { requireQueryIsSupported(t, featureToggles, "metric{} + 1") requireQueryIsSupported(t, featureToggles, "1 + metric{}") requireQueryIsSupported(t, featureToggles, "2 + 1") + requireQueryIsSupported(t, featureToggles, "metric{} and other_metric{}") + }) + + t.Run("binary expressions with logical operations", func(t *testing.T) { + featureToggles := EnableAllFeatures + featureToggles.EnableBinaryLogicalOperations = false + + requireQueryIsUnsupported(t, featureToggles, "metric{} and other_metric{}", "binary expression with 'and'") + requireQueryIsUnsupported(t, featureToggles, "metric{} or other_metric{}", "binary expression with 'or'") + requireQueryIsUnsupported(t, featureToggles, "metric{} unless other_metric{}", "binary expression with 'unless'") + + // Other operations should still be supported. + requireQueryIsSupported(t, featureToggles, "metric{} + other_metric{}") + requireQueryIsSupported(t, featureToggles, "metric{} + 1") + requireQueryIsSupported(t, featureToggles, "1 + metric{}") + requireQueryIsSupported(t, featureToggles, "2 + 1") + requireQueryIsSupported(t, featureToggles, "metric{} > other_metric{}") + requireQueryIsSupported(t, featureToggles, "metric{} > 1") + requireQueryIsSupported(t, featureToggles, "1 > metric{}") + requireQueryIsSupported(t, featureToggles, "2 > bool 1") }) t.Run("scalars", func(t *testing.T) { @@ -1880,7 +1903,7 @@ func TestCompareVariousMixedMetricsBinaryOperations(t *testing.T) { expressions := []string{} for _, labels := range labelCombinations { - for _, op := range []string{"+", "-", "*", "/"} { + for _, op := range []string{"+", "-", "*", "/", "and"} { binaryExpr := fmt.Sprintf(`series{label="%s"}`, labels[0]) for _, label := range labels[1:] { binaryExpr += fmt.Sprintf(` %s series{label="%s"}`, op, label) diff --git a/pkg/streamingpromql/operators/and_binary_operation.go b/pkg/streamingpromql/operators/and_binary_operation.go new file mode 100644 index 0000000000..5d16a396a9 --- /dev/null +++ b/pkg/streamingpromql/operators/and_binary_operation.go @@ -0,0 +1,291 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package operators + +import ( + "context" + + "github.com/prometheus/prometheus/promql/parser" + "github.com/prometheus/prometheus/promql/parser/posrange" + + "github.com/grafana/mimir/pkg/streamingpromql/limiting" + "github.com/grafana/mimir/pkg/streamingpromql/types" +) + +// AndBinaryOperation represents a logical 'and' between two vectors. +type AndBinaryOperation struct { + Left types.InstantVectorOperator + Right types.InstantVectorOperator + VectorMatching parser.VectorMatching + MemoryConsumptionTracker *limiting.MemoryConsumptionTracker + + timeRange types.QueryTimeRange + expressionPosition posrange.PositionRange + leftSeriesGroups []*andGroup + rightSeriesGroups []*andGroup + nextRightSeriesIndex int +} + +var _ types.InstantVectorOperator = &AndBinaryOperation{} + +func NewAndBinaryOperation( + left types.InstantVectorOperator, + right types.InstantVectorOperator, + vectorMatching parser.VectorMatching, + memoryConsumptionTracker *limiting.MemoryConsumptionTracker, + timeRange types.QueryTimeRange, + expressionPosition posrange.PositionRange, +) *AndBinaryOperation { + return &AndBinaryOperation{ + Left: left, + Right: right, + VectorMatching: vectorMatching, + MemoryConsumptionTracker: memoryConsumptionTracker, + timeRange: timeRange, + expressionPosition: expressionPosition, + } +} + +func (a *AndBinaryOperation) SeriesMetadata(ctx context.Context) ([]types.SeriesMetadata, error) { + leftMetadata, err := a.Left.SeriesMetadata(ctx) + if err != nil { + return nil, err + } + + if len(leftMetadata) == 0 { + // We can't produce any series, we are done. + types.PutSeriesMetadataSlice(leftMetadata) + return nil, nil + } + + rightMetadata, err := a.Right.SeriesMetadata(ctx) + if err != nil { + return nil, err + } + + defer types.PutSeriesMetadataSlice(rightMetadata) + + if len(rightMetadata) == 0 { + // We can't produce any series, we are done. + return nil, nil + } + + groupMap := map[string]*andGroup{} + groupKeyFunc := vectorMatchingGroupKeyFunc(a.VectorMatching) + + // Iterate through the left-hand series, and create groups for each based on the matching labels. + a.leftSeriesGroups = make([]*andGroup, 0, len(leftMetadata)) + + for _, s := range leftMetadata { + groupKey := groupKeyFunc(s.Labels) + group, exists := groupMap[string(groupKey)] // Important: don't extract the string(...) call here - passing it directly allows us to avoid allocating it. + + if !exists { + group = &andGroup{lastRightSeriesIndex: -1} + groupMap[string(groupKey)] = group + } + + group.leftSeriesCount++ + a.leftSeriesGroups = append(a.leftSeriesGroups, group) + } + + // Iterate through the right-hand series, and find groups for each based on the matching labels. + a.rightSeriesGroups = make([]*andGroup, 0, len(rightMetadata)) + + for idx, s := range rightMetadata { + groupKey := groupKeyFunc(s.Labels) + group, exists := groupMap[string(groupKey)] // Important: don't extract the string(...) call here - passing it directly allows us to avoid allocating it. + + if exists { + group.lastRightSeriesIndex = idx + } + + // Even if there is no matching group, we want to store a nil value here so we know to throw the series away when we read it later. + a.rightSeriesGroups = append(a.rightSeriesGroups, group) + } + + // Iterate through the left-hand series again, and build the list of output series based on those that matched at least one series on the right. + // It's safe to reuse the left metadata slice as we'll return series in the same order, and only ever return fewer series than the left operator produces. + nextOutputSeriesIndex := 0 + + for seriesIdx, group := range a.leftSeriesGroups { + if group.lastRightSeriesIndex == -1 { + // This series doesn't match any series from the right side. + // Discard the group. + a.leftSeriesGroups[seriesIdx] = nil + } else { + leftMetadata[nextOutputSeriesIndex] = leftMetadata[seriesIdx] + nextOutputSeriesIndex++ + } + } + + return leftMetadata[:nextOutputSeriesIndex], nil +} + +func (a *AndBinaryOperation) NextSeries(ctx context.Context) (types.InstantVectorSeriesData, error) { + for { + if len(a.leftSeriesGroups) == 0 { + // No more series to return. + return types.InstantVectorSeriesData{}, types.EOS + } + + thisSeriesGroup := a.leftSeriesGroups[0] + a.leftSeriesGroups = a.leftSeriesGroups[1:] + + if thisSeriesGroup == nil { + // This series from the left side has no matching series on the right side. + // Read it, discard it, and move on to the next series. + d, err := a.Left.NextSeries(ctx) + if err != nil { + return types.InstantVectorSeriesData{}, err + } + + types.PutInstantVectorSeriesData(d, a.MemoryConsumptionTracker) + continue + } + + if err := a.readRightSideUntilGroupComplete(ctx, thisSeriesGroup); err != nil { + return types.InstantVectorSeriesData{}, err + } + + // Only read the left series after we've finished reading right series, to minimise the number of series we're + // holding in memory at once. + // We deliberately don't return this data to the pool, as FilterLeftSeries reuses the slices. + originalData, err := a.Left.NextSeries(ctx) + if err != nil { + return types.InstantVectorSeriesData{}, err + } + + filteredData, err := thisSeriesGroup.FilterLeftSeries(originalData, a.MemoryConsumptionTracker, a.timeRange) + if err != nil { + return types.InstantVectorSeriesData{}, err + } + + thisSeriesGroup.leftSeriesCount-- + + if thisSeriesGroup.leftSeriesCount == 0 { + // This is the last series for this group, return it to the pool. + thisSeriesGroup.Close(a.MemoryConsumptionTracker) + } + + return filteredData, nil + } +} + +// readRightSideUntilGroupComplete reads series from the right-hand side until all series for desiredGroup have been read. +func (a *AndBinaryOperation) readRightSideUntilGroupComplete(ctx context.Context, desiredGroup *andGroup) error { + for a.nextRightSeriesIndex <= desiredGroup.lastRightSeriesIndex { + groupForRightSeries := a.rightSeriesGroups[0] + a.rightSeriesGroups = a.rightSeriesGroups[1:] + + data, err := a.Right.NextSeries(ctx) + if err != nil { + return err + } + + if groupForRightSeries != nil { + if err := groupForRightSeries.AccumulateRightSeriesPresence(data, a.MemoryConsumptionTracker, a.timeRange); err != nil { + return err + } + } + + types.PutInstantVectorSeriesData(data, a.MemoryConsumptionTracker) + a.nextRightSeriesIndex++ + } + + return nil +} + +func (a *AndBinaryOperation) ExpressionPosition() posrange.PositionRange { + return a.expressionPosition +} + +func (a *AndBinaryOperation) Close() { + a.Left.Close() + a.Right.Close() +} + +type andGroup struct { + leftSeriesCount int + lastRightSeriesIndex int + rightSamplePresence []bool // FIXME: this would be a good candidate for a bitmap type +} + +// AccumulateRightSeriesPresence records the presence of samples on the right-hand side. +func (g *andGroup) AccumulateRightSeriesPresence(data types.InstantVectorSeriesData, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, timeRange types.QueryTimeRange) error { + if g.rightSamplePresence == nil { + var err error + g.rightSamplePresence, err = types.BoolSlicePool.Get(timeRange.StepCount, memoryConsumptionTracker) + + if err != nil { + return err + } + + g.rightSamplePresence = g.rightSamplePresence[:timeRange.StepCount] + } + + for _, p := range data.Floats { + g.rightSamplePresence[timeRange.PointIndex(p.T)] = true + } + + for _, p := range data.Histograms { + g.rightSamplePresence[timeRange.PointIndex(p.T)] = true + } + + return nil +} + +// FilterLeftSeries returns leftData filtered based on samples seen for the right-hand side. +// The return value reuses the slices from leftData, and returns any unused slices to the pool. +func (g *andGroup) FilterLeftSeries(leftData types.InstantVectorSeriesData, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, timeRange types.QueryTimeRange) (types.InstantVectorSeriesData, error) { + filteredData := types.InstantVectorSeriesData{} + nextOutputFloatIndex := 0 + + for _, p := range leftData.Floats { + if !g.rightSamplePresence[timeRange.PointIndex(p.T)] { + continue + } + + leftData.Floats[nextOutputFloatIndex] = p + nextOutputFloatIndex++ + } + + if nextOutputFloatIndex > 0 { + // We have at least one output float point to return. + filteredData.Floats = leftData.Floats[:nextOutputFloatIndex] + } else { + // We don't have any float points to return, return the original slice to the pool. + types.FPointSlicePool.Put(leftData.Floats, memoryConsumptionTracker) + } + + nextOutputHistogramIndex := 0 + + for idx, p := range leftData.Histograms { + if !g.rightSamplePresence[timeRange.PointIndex(p.T)] { + continue + } + + leftData.Histograms[nextOutputHistogramIndex] = p + + if idx > nextOutputHistogramIndex { + // Remove the histogram from the original point to ensure that it's not mutated unexpectedly when the HPoint slice is reused. + leftData.Histograms[idx].H = nil + } + + nextOutputHistogramIndex++ + } + + if nextOutputHistogramIndex > 0 { + // We have at least one output histogram point to return. + filteredData.Histograms = leftData.Histograms[:nextOutputHistogramIndex] + } else { + // We don't have any histogram points to return, return the original slice to the pool. + types.HPointSlicePool.Put(leftData.Histograms, memoryConsumptionTracker) + } + + return filteredData, nil +} + +func (g *andGroup) Close(memoryConsumptionTracker *limiting.MemoryConsumptionTracker) { + types.BoolSlicePool.Put(g.rightSamplePresence, memoryConsumptionTracker) +} diff --git a/pkg/streamingpromql/operators/binary_operation.go b/pkg/streamingpromql/operators/binary_operation.go new file mode 100644 index 0000000000..f588a23255 --- /dev/null +++ b/pkg/streamingpromql/operators/binary_operation.go @@ -0,0 +1,41 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package operators + +import ( + "slices" + + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/promql/parser" +) + +// vectorMatchingGroupKeyFunc returns a function that computes the grouping key of the output group a series belongs to. +// +// The return value from the function is valid until it is called again. +func vectorMatchingGroupKeyFunc(vectorMatching parser.VectorMatching) func(labels.Labels) []byte { + buf := make([]byte, 0, 1024) + + if vectorMatching.On { + slices.Sort(vectorMatching.MatchingLabels) + + return func(l labels.Labels) []byte { + return l.BytesWithLabels(buf, vectorMatching.MatchingLabels...) + } + } + + if len(vectorMatching.MatchingLabels) == 0 { + // Fast path for common case for expressions like "a + b" with no 'on' or 'without' labels. + return func(l labels.Labels) []byte { + return l.BytesWithoutLabels(buf, labels.MetricName) + } + } + + lbls := make([]string, 0, len(vectorMatching.MatchingLabels)+1) + lbls = append(lbls, labels.MetricName) + lbls = append(lbls, vectorMatching.MatchingLabels...) + slices.Sort(lbls) + + return func(l labels.Labels) []byte { + return l.BytesWithoutLabels(buf, lbls...) + } +} diff --git a/pkg/streamingpromql/operators/vector_vector_binary_operation.go b/pkg/streamingpromql/operators/vector_vector_binary_operation.go index dbf98af096..a91278e80b 100644 --- a/pkg/streamingpromql/operators/vector_vector_binary_operation.go +++ b/pkg/streamingpromql/operators/vector_vector_binary_operation.go @@ -9,7 +9,6 @@ import ( "context" "fmt" "math" - "slices" "sort" "time" @@ -199,7 +198,7 @@ func (b *VectorVectorBinaryOperation) loadSeriesMetadata(ctx context.Context) (b // - a list indicating which series from the right side are needed to compute the output func (b *VectorVectorBinaryOperation) computeOutputSeries() ([]types.SeriesMetadata, []*binaryOperationOutputSeries, []bool, []bool, error) { labelsFunc := b.groupLabelsFunc() - groupKeyFunc := b.groupKeyFunc() + groupKeyFunc := vectorMatchingGroupKeyFunc(b.VectorMatching) outputSeriesMap := map[string]*binaryOperationOutputSeries{} // Use the smaller side to populate the map of possible output series first. @@ -256,7 +255,7 @@ func (b *VectorVectorBinaryOperation) computeOutputSeries() ([]types.SeriesMetad } } - allMetadata := make([]types.SeriesMetadata, 0, len(outputSeriesMap)) + allMetadata := types.GetSeriesMetadataSlice(len(outputSeriesMap)) allSeries := make([]*binaryOperationOutputSeries, 0, len(outputSeriesMap)) leftSeriesUsed, err := types.BoolSlicePool.Get(len(b.leftMetadata), b.MemoryConsumptionTracker) @@ -397,35 +396,6 @@ func (b *VectorVectorBinaryOperation) groupLabelsFunc() func(labels.Labels) labe } } -// groupKeyFunc returns a function that computes the grouping key of the output group this series belongs to. -func (b *VectorVectorBinaryOperation) groupKeyFunc() func(labels.Labels) []byte { - buf := make([]byte, 0, 1024) - - if b.VectorMatching.On { - slices.Sort(b.VectorMatching.MatchingLabels) - - return func(l labels.Labels) []byte { - return l.BytesWithLabels(buf, b.VectorMatching.MatchingLabels...) - } - } - - if len(b.VectorMatching.MatchingLabels) == 0 { - // Fast path for common case for expressions like "a + b" with no 'on' or 'without' labels. - return func(l labels.Labels) []byte { - return l.BytesWithoutLabels(buf, labels.MetricName) - } - } - - lbls := make([]string, 0, len(b.VectorMatching.MatchingLabels)+1) - lbls = append(lbls, labels.MetricName) - lbls = append(lbls, b.VectorMatching.MatchingLabels...) - slices.Sort(lbls) - - return func(l labels.Labels) []byte { - return l.BytesWithoutLabels(buf, lbls...) - } -} - func (b *VectorVectorBinaryOperation) NextSeries(ctx context.Context) (types.InstantVectorSeriesData, error) { if len(b.remainingSeries) == 0 { return types.InstantVectorSeriesData{}, types.EOS diff --git a/pkg/streamingpromql/query.go b/pkg/streamingpromql/query.go index 164e9939c5..c4d14273bb 100644 --- a/pkg/streamingpromql/query.go +++ b/pkg/streamingpromql/query.go @@ -218,7 +218,11 @@ func (q *Query) convertToInstantVectorOperator(expr parser.Expr) (types.InstantV return nil, compat.NewNotSupportedError(fmt.Sprintf("vector/vector binary expression with '%v'", e.Op)) } - if e.VectorMatching.Card != parser.CardOneToOne { + if e.Op.IsSetOperator() && !q.engine.featureToggles.EnableBinaryLogicalOperations { + return nil, compat.NewNotSupportedError(fmt.Sprintf("binary expression with '%v'", e.Op)) + } + + if !e.Op.IsSetOperator() && e.VectorMatching.Card != parser.CardOneToOne { return nil, compat.NewNotSupportedError(fmt.Sprintf("binary expression with %v matching", e.VectorMatching.Card)) } @@ -232,7 +236,13 @@ func (q *Query) convertToInstantVectorOperator(expr parser.Expr) (types.InstantV return nil, err } - return operators.NewVectorVectorBinaryOperation(lhs, rhs, *e.VectorMatching, e.Op, e.ReturnBool, q.memoryConsumptionTracker, q.annotations, e.PositionRange()) + switch e.Op { + case parser.LAND: + return operators.NewAndBinaryOperation(lhs, rhs, *e.VectorMatching, q.memoryConsumptionTracker, q.timeRange, e.PositionRange()), nil + default: + return operators.NewVectorVectorBinaryOperation(lhs, rhs, *e.VectorMatching, e.Op, e.ReturnBool, q.memoryConsumptionTracker, q.annotations, e.PositionRange()) + } + case *parser.UnaryExpr: if e.Op != parser.SUB { return nil, compat.NewNotSupportedError(fmt.Sprintf("unary expression with '%s'", e.Op)) diff --git a/pkg/streamingpromql/testdata/ours/binary_operators.test b/pkg/streamingpromql/testdata/ours/binary_operators.test index 944901b117..1427313c63 100644 --- a/pkg/streamingpromql/testdata/ours/binary_operators.test +++ b/pkg/streamingpromql/testdata/ours/binary_operators.test @@ -759,3 +759,73 @@ eval range from 0 to 24m step 6m left_side > bool on (env) right_side eval range from 0 to 24m step 6m left_side > bool ignoring (pod) right_side {env="test"} 0 0 0 1 1 {env="prod"} 0 0 0 1 1 + +clear + +# Logical operations: and, or and unless. +# Single matching series on each side. +load 6m + left_side{env="test", pod="a"} 1 2 3 4 {{count:5 sum:5}} + left_side{env="test", pod="b"} 6 7 8 _ 10 + left_side{env="prod", pod="a"} 11 12 13 14 15 + left_side{env="prod", pod="b"} 16 17 18 _ _ + right_side{env="test", pod="a"} 0 0 0 {{count:0 sum:0}} {{count:0 sum:0}} + right_side{env="test", pod="b"} _ 0 _ 0 _ + right_side{env="prod", pod="b"} _ _ _ 0 0 + right_side{env="foo", pod="a"} 0 0 0 0 0 + +# {env="test", pod="a"}: Matching series, all samples align +# {env="test", pod="b"}: Matching series, only some samples align +# {env="prod", pod="a"}: No matching series on RHS +# {env="prod", pod="b"}: Matching series, but no samples align +# {env="foo", pod="a"}: No matching series on LHS +eval range from 0 to 24m step 6m left_side and right_side + left_side{env="test", pod="a"} 1 2 3 4 {{count:5 sum:5}} + left_side{env="test", pod="b"} _ 7 _ _ _ + +clear + +# Multiple matching series on each side. +load 6m + left_side{env="test", cluster="blah", pod="a"} 1 2 3 4 + left_side{env="test", cluster="blah", pod="b"} _ 6 7 8 + left_side{env="prod", cluster="blah", pod="a"} 9 10 11 12 + left_side{env="prod", cluster="blah", pod="b"} 13 14 15 16 + left_side{env="test", cluster="food", pod="a"} 17 18 19 20 + right_side{env="test", cluster="blah", idx="1"} 0 _ 0 _ + right_side{env="test", cluster="blah", idx="2"} 0 0 _ _ + right_side{env="prod", cluster="blah", idx="3"} _ 0 _ 0 + right_side{env="test", cluster="food", idx="4"} _ _ _ 0 + +eval range from 0 to 24m step 6m left_side and right_side + # Should return no results. + +eval range from 0 to 24m step 6m left_side and on(cluster, env) right_side + left_side{env="test", cluster="blah", pod="a"} 1 2 3 _ + left_side{env="test", cluster="blah", pod="b"} _ 6 7 _ + left_side{env="prod", cluster="blah", pod="a"} _ 10 _ 12 + left_side{env="prod", cluster="blah", pod="b"} _ 14 _ 16 + left_side{env="test", cluster="food", pod="a"} _ _ _ 20 + +# Same thing again, with labels in different order. +eval range from 0 to 24m step 6m left_side and on(env, cluster) right_side + left_side{env="test", cluster="blah", pod="a"} 1 2 3 _ + left_side{env="test", cluster="blah", pod="b"} _ 6 7 _ + left_side{env="prod", cluster="blah", pod="a"} _ 10 _ 12 + left_side{env="prod", cluster="blah", pod="b"} _ 14 _ 16 + left_side{env="test", cluster="food", pod="a"} _ _ _ 20 + +eval range from 0 to 24m step 6m left_side and ignoring(idx, pod) right_side + left_side{env="test", cluster="blah", pod="a"} 1 2 3 _ + left_side{env="test", cluster="blah", pod="b"} _ 6 7 _ + left_side{env="prod", cluster="blah", pod="a"} _ 10 _ 12 + left_side{env="prod", cluster="blah", pod="b"} _ 14 _ 16 + left_side{env="test", cluster="food", pod="a"} _ _ _ 20 + +# Same thing again, with labels in different order. +eval range from 0 to 24m step 6m left_side and ignoring(pod, idx) right_side + left_side{env="test", cluster="blah", pod="a"} 1 2 3 _ + left_side{env="test", cluster="blah", pod="b"} _ 6 7 _ + left_side{env="prod", cluster="blah", pod="a"} _ 10 _ 12 + left_side{env="prod", cluster="blah", pod="b"} _ 14 _ 16 + left_side{env="test", cluster="food", pod="a"} _ _ _ 20 diff --git a/pkg/streamingpromql/testdata/upstream/operators.test b/pkg/streamingpromql/testdata/upstream/operators.test index b1d0754a16..035804cf10 100644 --- a/pkg/streamingpromql/testdata/upstream/operators.test +++ b/pkg/streamingpromql/testdata/upstream/operators.test @@ -144,35 +144,29 @@ eval instant at 50m (rate((http_requests[25m])) * 25) * 60 {group="production", instance="1", job="app-server"} 300 -# Unsupported by streaming engine. -# eval instant at 50m http_requests{group="canary"} and http_requests{instance="0"} -# http_requests{group="canary", instance="0", job="api-server"} 300 -# http_requests{group="canary", instance="0", job="app-server"} 700 +eval instant at 50m http_requests{group="canary"} and http_requests{instance="0"} + http_requests{group="canary", instance="0", job="api-server"} 300 + http_requests{group="canary", instance="0", job="app-server"} 700 -# Unsupported by streaming engine. -# eval instant at 50m (http_requests{group="canary"} + 1) and http_requests{instance="0"} -# {group="canary", instance="0", job="api-server"} 301 -# {group="canary", instance="0", job="app-server"} 701 +eval instant at 50m (http_requests{group="canary"} + 1) and http_requests{instance="0"} + {group="canary", instance="0", job="api-server"} 301 + {group="canary", instance="0", job="app-server"} 701 -# Unsupported by streaming engine. -# eval instant at 50m (http_requests{group="canary"} + 1) and on(instance, job) http_requests{instance="0", group="production"} -# {group="canary", instance="0", job="api-server"} 301 -# {group="canary", instance="0", job="app-server"} 701 +eval instant at 50m (http_requests{group="canary"} + 1) and on(instance, job) http_requests{instance="0", group="production"} + {group="canary", instance="0", job="api-server"} 301 + {group="canary", instance="0", job="app-server"} 701 -# Unsupported by streaming engine. -# eval instant at 50m (http_requests{group="canary"} + 1) and on(instance) http_requests{instance="0", group="production"} -# {group="canary", instance="0", job="api-server"} 301 -# {group="canary", instance="0", job="app-server"} 701 +eval instant at 50m (http_requests{group="canary"} + 1) and on(instance) http_requests{instance="0", group="production"} + {group="canary", instance="0", job="api-server"} 301 + {group="canary", instance="0", job="app-server"} 701 -# Unsupported by streaming engine. -# eval instant at 50m (http_requests{group="canary"} + 1) and ignoring(group) http_requests{instance="0", group="production"} -# {group="canary", instance="0", job="api-server"} 301 -# {group="canary", instance="0", job="app-server"} 701 +eval instant at 50m (http_requests{group="canary"} + 1) and ignoring(group) http_requests{instance="0", group="production"} + {group="canary", instance="0", job="api-server"} 301 + {group="canary", instance="0", job="app-server"} 701 -# Unsupported by streaming engine. -# eval instant at 50m (http_requests{group="canary"} + 1) and ignoring(group, job) http_requests{instance="0", group="production"} -# {group="canary", instance="0", job="api-server"} 301 -# {group="canary", instance="0", job="app-server"} 701 +eval instant at 50m (http_requests{group="canary"} + 1) and ignoring(group, job) http_requests{instance="0", group="production"} + {group="canary", instance="0", job="api-server"} 301 + {group="canary", instance="0", job="app-server"} 701 # Unsupported by streaming engine. # eval instant at 50m http_requests{group="canary"} or http_requests{group="production"} @@ -250,27 +244,25 @@ eval instant at 50m http_requests{group="canary"} / ignoring(group) http_request {instance="1", job="app-server"} 1.3333333333333333 # https://github.com/prometheus/prometheus/issues/1489 -# Unsupported by streaming engine. -# eval instant at 50m http_requests AND ON (dummy) vector(1) -# http_requests{group="canary", instance="0", job="api-server"} 300 -# http_requests{group="canary", instance="0", job="app-server"} 700 -# http_requests{group="canary", instance="1", job="api-server"} 400 -# http_requests{group="canary", instance="1", job="app-server"} 800 -# http_requests{group="production", instance="0", job="api-server"} 100 -# http_requests{group="production", instance="0", job="app-server"} 500 -# http_requests{group="production", instance="1", job="api-server"} 200 -# http_requests{group="production", instance="1", job="app-server"} 600 +eval instant at 50m http_requests AND ON (dummy) vector(1) + http_requests{group="canary", instance="0", job="api-server"} 300 + http_requests{group="canary", instance="0", job="app-server"} 700 + http_requests{group="canary", instance="1", job="api-server"} 400 + http_requests{group="canary", instance="1", job="app-server"} 800 + http_requests{group="production", instance="0", job="api-server"} 100 + http_requests{group="production", instance="0", job="app-server"} 500 + http_requests{group="production", instance="1", job="api-server"} 200 + http_requests{group="production", instance="1", job="app-server"} 600 -# Unsupported by streaming engine. -# eval instant at 50m http_requests AND IGNORING (group, instance, job) vector(1) -# http_requests{group="canary", instance="0", job="api-server"} 300 -# http_requests{group="canary", instance="0", job="app-server"} 700 -# http_requests{group="canary", instance="1", job="api-server"} 400 -# http_requests{group="canary", instance="1", job="app-server"} 800 -# http_requests{group="production", instance="0", job="api-server"} 100 -# http_requests{group="production", instance="0", job="app-server"} 500 -# http_requests{group="production", instance="1", job="api-server"} 200 -# http_requests{group="production", instance="1", job="app-server"} 600 +eval instant at 50m http_requests AND IGNORING (group, instance, job) vector(1) + http_requests{group="canary", instance="0", job="api-server"} 300 + http_requests{group="canary", instance="0", job="app-server"} 700 + http_requests{group="canary", instance="1", job="api-server"} 400 + http_requests{group="canary", instance="1", job="app-server"} 800 + http_requests{group="production", instance="0", job="api-server"} 100 + http_requests{group="production", instance="0", job="app-server"} 500 + http_requests{group="production", instance="1", job="api-server"} 200 + http_requests{group="production", instance="1", job="app-server"} 600 # Comparisons. diff --git a/tools/find-unpooled-slice-creation.sh b/tools/find-unpooled-slice-creation.sh new file mode 100755 index 0000000000..5d5b8f584b --- /dev/null +++ b/tools/find-unpooled-slice-creation.sh @@ -0,0 +1,30 @@ +#! /usr/bin/env bash +# SPDX-License-Identifier: AGPL-3.0-only + +set -euo pipefail + +SCRIPT_DIR=$(realpath "$(dirname "${0}")") +PROJECT_DIR=$(cd "$SCRIPT_DIR/.." && pwd) + +MATCHES=$( + cd "$PROJECT_DIR"; + grep \ + -R \ + -n \ + --exclude '*_test.go' \ + --exclude '*pool.go' \ + -e 'make(\[\]types\.SeriesMetadata,' \ + -e 'make(\[\]promql\.FPoint,' \ + -e 'make(\[\]promql\.HPoint,' \ + -e 'make(\[\]float64,' \ + -e 'make(\[\]bool,' \ + -e 'make(\[\]\*histogram\.FloatHistogram,' \ + -e 'make(promql\.Vector,' \ + 'pkg/streamingpromql' || true +) + +if [ -n "$MATCHES" ]; then + echo "Found one or more instances of creating a slice directly that should be taken from a pool:" + echo "$MATCHES" + exit 1 +fi