Skip to content

Commit

Permalink
MQE: create sub-packages for operators (#9693)
Browse files Browse the repository at this point in the history
* Move scalar-related operators to their own package

* Move binary operations to their own package

* Move aggregations and functions to their own packages

* Move selectors to their own package

* Break dependency on `functions` package from `aggregations` and `binops`

* Update linting script to ignore testing utilities
  • Loading branch information
charleskorn authored Oct 21, 2024
1 parent cc59524 commit b2c35f5
Show file tree
Hide file tree
Showing 48 changed files with 282 additions and 268 deletions.
43 changes: 22 additions & 21 deletions pkg/streamingpromql/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ import (
"github.com/prometheus/prometheus/promql/parser/posrange"
"github.com/prometheus/prometheus/util/annotations"

"github.com/grafana/mimir/pkg/streamingpromql/functions"
"github.com/grafana/mimir/pkg/streamingpromql/limiting"
"github.com/grafana/mimir/pkg/streamingpromql/operators"
"github.com/grafana/mimir/pkg/streamingpromql/operators/functions"
"github.com/grafana/mimir/pkg/streamingpromql/operators/scalars"
"github.com/grafana/mimir/pkg/streamingpromql/types"
)

Expand All @@ -37,7 +38,7 @@ type ScalarFunctionOperatorFactory func(
// Parameters:
// - name: The name of the function
// - f: The function implementation
func SingleInputVectorFunctionOperatorFactory(name string, f functions.FunctionOverInstantVector) InstantVectorFunctionOperatorFactory {
func SingleInputVectorFunctionOperatorFactory(name string, f functions.FunctionOverInstantVectorDefinition) InstantVectorFunctionOperatorFactory {
return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, _ types.QueryTimeRange) (types.InstantVectorOperator, error) {
if len(args) != 1 {
// Should be caught by the PromQL parser, but we check here for safety.
Expand All @@ -50,7 +51,7 @@ func SingleInputVectorFunctionOperatorFactory(name string, f functions.FunctionO
return nil, fmt.Errorf("expected an instant vector argument for %s, got %T", name, args[0])
}

var o types.InstantVectorOperator = operators.NewFunctionOverInstantVector(inner, nil, memoryConsumptionTracker, f, expressionPosition)
var o types.InstantVectorOperator = functions.NewFunctionOverInstantVector(inner, nil, memoryConsumptionTracker, f, expressionPosition)

if f.SeriesMetadataFunction.NeedsSeriesDeduplication {
o = operators.NewDeduplicateAndMerge(o, memoryConsumptionTracker)
Expand All @@ -67,7 +68,7 @@ func SingleInputVectorFunctionOperatorFactory(name string, f functions.FunctionO
// - name: The name of the function
// - seriesDataFunc: The function to handle series data
func InstantVectorTransformationFunctionOperatorFactory(name string, seriesDataFunc functions.InstantVectorSeriesFunction) InstantVectorFunctionOperatorFactory {
f := functions.FunctionOverInstantVector{
f := functions.FunctionOverInstantVectorDefinition{
SeriesDataFunc: seriesDataFunc,
SeriesMetadataFunction: functions.DropSeriesName,
}
Expand All @@ -84,7 +85,7 @@ func InstantVectorTransformationFunctionOperatorFactory(name string, seriesDataF
// - name: The name of the function
// - metadataFunc: The function for handling metadata
func InstantVectorLabelManipulationFunctionOperatorFactory(name string, metadataFunc functions.SeriesMetadataFunctionDefinition) InstantVectorFunctionOperatorFactory {
f := functions.FunctionOverInstantVector{
f := functions.FunctionOverInstantVectorDefinition{
SeriesDataFunc: functions.PassthroughData,
SeriesMetadataFunction: metadataFunc,
}
Expand All @@ -100,7 +101,7 @@ func InstantVectorLabelManipulationFunctionOperatorFactory(name string, metadata
// - f: The function implementation
func FunctionOverRangeVectorOperatorFactory(
name string,
f functions.FunctionOverRangeVector,
f functions.FunctionOverRangeVectorDefinition,
) InstantVectorFunctionOperatorFactory {
return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, annotations *annotations.Annotations, expressionPosition posrange.PositionRange, _ types.QueryTimeRange) (types.InstantVectorOperator, error) {
if len(args) != 1 {
Expand All @@ -114,7 +115,7 @@ func FunctionOverRangeVectorOperatorFactory(
return nil, fmt.Errorf("expected a range vector argument for %s, got %T", name, args[0])
}

var o types.InstantVectorOperator = operators.NewFunctionOverRangeVector(inner, memoryConsumptionTracker, f, annotations, expressionPosition)
var o types.InstantVectorOperator = functions.NewFunctionOverRangeVector(inner, memoryConsumptionTracker, f, annotations, expressionPosition)

if f.SeriesMetadataFunction.NeedsSeriesDeduplication {
o = operators.NewDeduplicateAndMerge(o, memoryConsumptionTracker)
Expand All @@ -136,7 +137,7 @@ func scalarToInstantVectorOperatorFactory(args []types.Operator, _ *limiting.Mem
return nil, fmt.Errorf("expected a scalar argument for vector, got %T", args[0])
}

return operators.NewScalarToInstantVector(inner, expressionPosition), nil
return scalars.NewScalarToInstantVector(inner, expressionPosition), nil
}

func LabelReplaceFunctionOperatorFactory() InstantVectorFunctionOperatorFactory {
Expand Down Expand Up @@ -176,15 +177,15 @@ func LabelReplaceFunctionOperatorFactory() InstantVectorFunctionOperatorFactory
return nil, fmt.Errorf("expected a string for 5th argument for label_replace, got %T", args[4])
}

f := functions.FunctionOverInstantVector{
f := functions.FunctionOverInstantVectorDefinition{
SeriesDataFunc: functions.PassthroughData,
SeriesMetadataFunction: functions.SeriesMetadataFunctionDefinition{
Func: functions.LabelReplaceFactory(dstLabel, replacement, srcLabel, regex),
NeedsSeriesDeduplication: true,
},
}

o := operators.NewFunctionOverInstantVector(inner, nil, memoryConsumptionTracker, f, expressionPosition)
o := functions.NewFunctionOverInstantVector(inner, nil, memoryConsumptionTracker, f, expressionPosition)

return operators.NewDeduplicateAndMerge(o, memoryConsumptionTracker), nil
}
Expand Down Expand Up @@ -215,12 +216,12 @@ func ClampFunctionOperatorFactory() InstantVectorFunctionOperatorFactory {
return nil, fmt.Errorf("expected a scalar for 3rd argument for clamp, got %T", args[2])
}

f := functions.FunctionOverInstantVector{
f := functions.FunctionOverInstantVectorDefinition{
SeriesDataFunc: functions.Clamp,
SeriesMetadataFunction: functions.DropSeriesName,
}

return operators.NewFunctionOverInstantVector(inner, []types.ScalarOperator{min, max}, memoryConsumptionTracker, f, expressionPosition), nil
return functions.NewFunctionOverInstantVector(inner, []types.ScalarOperator{min, max}, memoryConsumptionTracker, f, expressionPosition), nil
}
}

Expand All @@ -243,12 +244,12 @@ func ClampMinMaxFunctionOperatorFactory(functionName string, isMin bool) Instant
return nil, fmt.Errorf("expected a scalar for 2nd argument for %s, got %T", functionName, args[1])
}

f := functions.FunctionOverInstantVector{
f := functions.FunctionOverInstantVectorDefinition{
SeriesDataFunc: functions.ClampMinMaxFactory(isMin),
SeriesMetadataFunction: functions.DropSeriesName,
}

return operators.NewFunctionOverInstantVector(inner, []types.ScalarOperator{clampTo}, memoryConsumptionTracker, f, expressionPosition), nil
return functions.NewFunctionOverInstantVector(inner, []types.ScalarOperator{clampTo}, memoryConsumptionTracker, f, expressionPosition), nil
}
}

Expand All @@ -273,18 +274,18 @@ func RoundFunctionOperatorFactory() InstantVectorFunctionOperatorFactory {
return nil, fmt.Errorf("expected a scalar for 2nd argument for round, got %T", args[1])
}
} else {
toNearest = operators.NewScalarConstant(float64(1), timeRange, memoryConsumptionTracker, expressionPosition)
toNearest = scalars.NewScalarConstant(float64(1), timeRange, memoryConsumptionTracker, expressionPosition)
}

f := functions.FunctionOverInstantVector{
f := functions.FunctionOverInstantVectorDefinition{
SeriesDataFunc: functions.Round,
// TODO(jhesketh): With the version of Prometheus vendored at the time of writing, round does not drop the
// __name__ label, and this is verified by our tests.
// We match this for consistency, but will need to drop them once prometheus 3.0 is vendored in.
SeriesMetadataFunction: functions.SeriesMetadataFunctionDefinition{},
}

return operators.NewFunctionOverInstantVector(inner, []types.ScalarOperator{toNearest}, memoryConsumptionTracker, f, expressionPosition), nil
return functions.NewFunctionOverInstantVector(inner, []types.ScalarOperator{toNearest}, memoryConsumptionTracker, f, expressionPosition), nil
}
}

Expand Down Expand Up @@ -365,7 +366,7 @@ func piOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting
return nil, fmt.Errorf("expected exactly 0 arguments for pi, got %v", len(args))
}

return operators.NewScalarConstant(math.Pi, timeRange, memoryConsumptionTracker, expressionPosition), nil
return scalars.NewScalarConstant(math.Pi, timeRange, memoryConsumptionTracker, expressionPosition), nil
}

func instantVectorToScalarOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.ScalarOperator, error) {
Expand All @@ -380,15 +381,15 @@ func instantVectorToScalarOperatorFactory(args []types.Operator, memoryConsumpti
return nil, fmt.Errorf("expected an instant vector argument for scalar, got %T", args[0])
}

return operators.NewInstantVectorToScalar(inner, timeRange, memoryConsumptionTracker, expressionPosition), nil
return scalars.NewInstantVectorToScalar(inner, timeRange, memoryConsumptionTracker, expressionPosition), nil
}

// unaryNegationOfInstantVectorOperatorFactory wraps inner in an operator that applies
// functions.UnaryNegation to each series' data and functions.DropSeriesName to its metadata,
// then wraps the result in a DeduplicateAndMerge operator.
//
// NOTE(review): this span is a rendered diff, so the paired `f :=` and `o :=` lines are the
// before/after sides of the same statement (the definition type gains a "Definition" suffix and
// the constructor moves from the operators package to the functions package).
func unaryNegationOfInstantVectorOperatorFactory(inner types.InstantVectorOperator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, expressionPosition posrange.PositionRange) types.InstantVectorOperator {
f := functions.FunctionOverInstantVector{
f := functions.FunctionOverInstantVectorDefinition{
SeriesDataFunc: functions.UnaryNegation,
SeriesMetadataFunction: functions.DropSeriesName,
}

// The nil second argument is the slot other factories in this file use for scalar arguments
// (e.g. clamp passes []types.ScalarOperator{min, max}); negation takes none.
o := operators.NewFunctionOverInstantVector(inner, nil, memoryConsumptionTracker, f, expressionPosition)
o := functions.NewFunctionOverInstantVector(inner, nil, memoryConsumptionTracker, f, expressionPosition)
// Always deduplicate and merge here, unlike the generic factories which only do so when
// NeedsSeriesDeduplication is set. Presumably dropping the name can make series collide — TODO confirm.
return operators.NewDeduplicateAndMerge(o, memoryConsumptionTracker)
}
2 changes: 1 addition & 1 deletion pkg/streamingpromql/functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (

"github.com/stretchr/testify/require"

"github.com/grafana/mimir/pkg/streamingpromql/functions"
"github.com/grafana/mimir/pkg/streamingpromql/operators/functions"
)

func TestRegisterInstantVectorFunctionOperatorFactory(t *testing.T) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
// Provenance-includes-license: Apache-2.0
// Provenance-includes-copyright: The Prometheus Authors

package operators
package aggregations

import (
"context"
Expand All @@ -18,10 +18,9 @@ import (
"github.com/prometheus/prometheus/util/annotations"
"github.com/prometheus/prometheus/util/zeropool"

"github.com/grafana/mimir/pkg/streamingpromql/aggregations"
"github.com/grafana/mimir/pkg/streamingpromql/compat"
"github.com/grafana/mimir/pkg/streamingpromql/functions"
"github.com/grafana/mimir/pkg/streamingpromql/limiting"
"github.com/grafana/mimir/pkg/streamingpromql/operators"
"github.com/grafana/mimir/pkg/streamingpromql/types"
)

Expand All @@ -32,15 +31,15 @@ type Aggregation struct {
Without bool
MemoryConsumptionTracker *limiting.MemoryConsumptionTracker

aggregationGroupFactory aggregations.AggregationGroupFactory
aggregationGroupFactory AggregationGroupFactory

Annotations *annotations.Annotations

metricNames *MetricNames
metricNames *operators.MetricNames
currentSeriesIndex int

expressionPosition posrange.PositionRange
emitAnnotationFunc functions.EmitAnnotationFunc
emitAnnotationFunc types.EmitAnnotationFunc

remainingInnerSeriesToGroup []*group // One entry per series produced by Inner, value is the group for that series
remainingGroups []*group // One entry per group, in the order we want to return them
Expand All @@ -58,7 +57,7 @@ func NewAggregation(
annotations *annotations.Annotations,
expressionPosition posrange.PositionRange,
) (*Aggregation, error) {
opGroupFactory := aggregations.AggregationGroupFactories[op]
opGroupFactory := AggregationGroupFactories[op]
if opGroupFactory == nil {
return nil, compat.NewNotSupportedError(fmt.Sprintf("aggregation operation with '%s'", op))
}
Expand All @@ -79,7 +78,7 @@ func NewAggregation(
Without: without,
MemoryConsumptionTracker: memoryConsumptionTracker,
Annotations: annotations,
metricNames: &MetricNames{},
metricNames: &operators.MetricNames{},
expressionPosition: expressionPosition,
aggregationGroupFactory: opGroupFactory,
}
Expand All @@ -103,7 +102,7 @@ type group struct {
lastSeriesIndex int

// The aggregation for this group of series.
aggregation aggregations.AggregationGroup
aggregation AggregationGroup
}

var _ types.InstantVectorOperator = &Aggregation{}
Expand Down Expand Up @@ -302,7 +301,7 @@ func (a *Aggregation) accumulateUntilGroupComplete(ctx context.Context, g *group
return nil
}

// emitAnnotation builds an annotation with generator and records it on a.Annotations.
// The generator receives the metric name looked up for a.currentSeriesIndex and the expression
// position of the inner operator (a.Inner), not the aggregation's own expressionPosition.
//
// NOTE(review): the two signature lines below are the before/after sides of the rendered diff;
// only the package of AnnotationGenerator changes (functions -> types).
func (a *Aggregation) emitAnnotation(generator functions.AnnotationGenerator) {
func (a *Aggregation) emitAnnotation(generator types.AnnotationGenerator) {
metricName := a.metricNames.GetMetricNameForSeries(a.currentSeriesIndex)
a.Annotations.Add(generator(metricName, a.Inner.ExpressionPosition()))
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// SPDX-License-Identifier: AGPL-3.0-only

package operators
package aggregations

import (
"context"
Expand All @@ -12,7 +12,8 @@ import (
"github.com/prometheus/prometheus/promql/parser/posrange"
"github.com/stretchr/testify/require"

"github.com/grafana/mimir/pkg/streamingpromql/aggregations"
"github.com/grafana/mimir/pkg/streamingpromql/operators"
"github.com/grafana/mimir/pkg/streamingpromql/testutils"
"github.com/grafana/mimir/pkg/streamingpromql/types"
)

Expand Down Expand Up @@ -79,15 +80,15 @@ func TestAggregation_ReturnsGroupsFinishedFirstEarliest(t *testing.T) {
for name, testCase := range testCases {
t.Run(name, func(t *testing.T) {
aggregator := &Aggregation{
Inner: &testOperator{series: testCase.inputSeries},
Inner: &operators.TestOperator{Series: testCase.inputSeries},
Grouping: testCase.grouping,
metricNames: &MetricNames{},
aggregationGroupFactory: func() aggregations.AggregationGroup { return &aggregations.SumAggregationGroup{} },
metricNames: &operators.MetricNames{},
aggregationGroupFactory: func() AggregationGroup { return &SumAggregationGroup{} },
}

outputSeries, err := aggregator.SeriesMetadata(context.Background())
require.NoError(t, err)
require.Equal(t, labelsToSeriesMetadata(testCase.expectedOutputSeriesOrder), outputSeries)
require.Equal(t, testutils.LabelsToSeriesMetadata(testCase.expectedOutputSeriesOrder), outputSeries)
})
}
}
Expand Down Expand Up @@ -253,17 +254,3 @@ func TestAggregation_GroupLabelling(t *testing.T) {
})
}
}

// labelsToSeriesMetadata converts a list of label sets into a list of types.SeriesMetadata of the
// same length and order, with each element's Labels field set to the corresponding input.
// It returns nil rather than an empty slice when lbls is empty.
//
// NOTE(review): in this commit the test calls testutils.LabelsToSeriesMetadata instead, so this
// local copy appears to be the removed side of the diff — confirm against the testutils package.
func labelsToSeriesMetadata(lbls []labels.Labels) []types.SeriesMetadata {
if len(lbls) == 0 {
return nil
}

m := make([]types.SeriesMetadata, len(lbls))

for i, l := range lbls {
m[i].Labels = l
}

return m
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import (
"github.com/prometheus/prometheus/promql"

"github.com/grafana/mimir/pkg/streamingpromql/floats"
"github.com/grafana/mimir/pkg/streamingpromql/functions"
"github.com/grafana/mimir/pkg/streamingpromql/limiting"
"github.com/grafana/mimir/pkg/streamingpromql/operators/functions"
"github.com/grafana/mimir/pkg/streamingpromql/types"
)

Expand All @@ -33,7 +33,7 @@ type AvgAggregationGroup struct {
groupSeriesCounts []float64
}

func (g *AvgAggregationGroup) AccumulateSeries(data types.InstantVectorSeriesData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, emitAnnotationFunc functions.EmitAnnotationFunc) error {
func (g *AvgAggregationGroup) AccumulateSeries(data types.InstantVectorSeriesData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, emitAnnotationFunc types.EmitAnnotationFunc) error {
defer types.PutInstantVectorSeriesData(data, memoryConsumptionTracker)
if len(data.Floats) == 0 && len(data.Histograms) == 0 {
// Nothing to do
Expand Down Expand Up @@ -154,7 +154,7 @@ func (g *AvgAggregationGroup) accumulateFloats(data types.InstantVectorSeriesDat
return nil
}

func (g *AvgAggregationGroup) accumulateHistograms(data types.InstantVectorSeriesData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, emitAnnotationFunc functions.EmitAnnotationFunc) error {
func (g *AvgAggregationGroup) accumulateHistograms(data types.InstantVectorSeriesData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, emitAnnotationFunc types.EmitAnnotationFunc) error {
var err error
if len(data.Histograms) > 0 && g.histograms == nil {
// First series with histogram values for this group, populate it.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,14 @@ import (
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/promql/parser"

"github.com/grafana/mimir/pkg/streamingpromql/functions"
"github.com/grafana/mimir/pkg/streamingpromql/limiting"
"github.com/grafana/mimir/pkg/streamingpromql/types"
)

// AggregationGroup accumulates series that have been grouped together and computes the output series data.
//
// NOTE(review): the two AccumulateSeries lines below are the before/after sides of the rendered
// diff; only the package of EmitAnnotationFunc changes (functions -> types).
type AggregationGroup interface {
// AccumulateSeries takes in a series as part of the group.
// Some implementations in this commit ignore emitAnnotationFunc by naming the parameter _
// (CountGroupAggregationGroup, MinMaxAggregationGroup).
AccumulateSeries(data types.InstantVectorSeriesData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, emitAnnotationFunc functions.EmitAnnotationFunc) error
AccumulateSeries(data types.InstantVectorSeriesData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, emitAnnotationFunc types.EmitAnnotationFunc) error
// ComputeOutputSeries does any final calculations and returns the grouped series data.
// NOTE(review): the meaning of the bool result is not visible in this span — confirm against an implementation.
ComputeOutputSeries(timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, bool, error)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ package aggregations
import (
"github.com/prometheus/prometheus/promql"

"github.com/grafana/mimir/pkg/streamingpromql/functions"
"github.com/grafana/mimir/pkg/streamingpromql/limiting"
"github.com/grafana/mimir/pkg/streamingpromql/types"
)
Expand Down Expand Up @@ -38,7 +37,7 @@ func (g *CountGroupAggregationGroup) groupAccumulatePoint(idx int64) {
g.values[idx] = 1
}

func (g *CountGroupAggregationGroup) AccumulateSeries(data types.InstantVectorSeriesData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ functions.EmitAnnotationFunc) error {
func (g *CountGroupAggregationGroup) AccumulateSeries(data types.InstantVectorSeriesData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ types.EmitAnnotationFunc) error {
if (len(data.Floats) > 0 || len(data.Histograms) > 0) && g.values == nil {
var err error
// First series with values for this group, populate it.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

"github.com/prometheus/prometheus/promql"

"github.com/grafana/mimir/pkg/streamingpromql/functions"
"github.com/grafana/mimir/pkg/streamingpromql/limiting"
"github.com/grafana/mimir/pkg/streamingpromql/types"
)
Expand Down Expand Up @@ -49,7 +48,7 @@ func (g *MinMaxAggregationGroup) minAccumulatePoint(idx int64, f float64) {
}
}

func (g *MinMaxAggregationGroup) AccumulateSeries(data types.InstantVectorSeriesData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ functions.EmitAnnotationFunc) error {
func (g *MinMaxAggregationGroup) AccumulateSeries(data types.InstantVectorSeriesData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ types.EmitAnnotationFunc) error {
if (len(data.Floats) > 0 || len(data.Histograms) > 0) && g.floatValues == nil {
// Even if we only have histograms, we have to populate the float slices, as we'll treat histograms as if they have value 0.
// This is consistent with Prometheus but may not be the desired value: https://github.com/prometheus/prometheus/issues/14711
Expand Down
Loading

0 comments on commit b2c35f5

Please sign in to comment.