diff --git a/CHANGELOG.md b/CHANGELOG.md index 47bdd28aef..23c8b77bf9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,7 +13,7 @@ * `cortex_alertmanager_state_replication_failed_total` * `cortex_alertmanager_alerts` * `cortex_alertmanager_silences` -* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #9367 #9368 #9398 #9399 #9403 #9417 #9418 #9419 #9420 #9482 #9504 #9505 #9507 #9518 #9531 #9532 #9533 #9553 #9588 +* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #9367 #9368 #9398 #9399 #9403 #9417 #9418 #9419 #9420 #9482 #9504 #9505 #9507 #9518 #9531 #9532 #9533 #9553 #9558 #9588 * [FEATURE] Query-frontend: added experimental configuration options `query-frontend.cache-errors` and `query-frontend.results-cache-ttl-for-errors` to allow non-transient responses to be cached. When set to `true` error responses from hitting limits or bad data are cached for a short TTL. #9028 * [FEATURE] gRPC: Support S2 compression. #9322 * `-alertmanager.alertmanager-client.grpc-compression=s2` diff --git a/pkg/streamingpromql/functions.go b/pkg/streamingpromql/functions.go index b8e4471ffe..1a7cdd38f4 100644 --- a/pkg/streamingpromql/functions.go +++ b/pkg/streamingpromql/functions.go @@ -51,7 +51,7 @@ func SingleInputVectorFunctionOperatorFactory(name string, f functions.FunctionO var o types.InstantVectorOperator = operators.NewFunctionOverInstantVector(inner, memoryConsumptionTracker, f, expressionPosition) - if f.NeedsSeriesDeduplication { + if f.SeriesMetadataFunction.NeedsSeriesDeduplication { o = operators.NewDeduplicateAndMerge(o, memoryConsumptionTracker) } @@ -67,9 +67,8 @@ func SingleInputVectorFunctionOperatorFactory(name string, f functions.FunctionO // - seriesDataFunc: The function to handle series data func InstantVectorTransformationFunctionOperatorFactory(name string, seriesDataFunc functions.InstantVectorSeriesFunction) InstantVectorFunctionOperatorFactory { f := functions.FunctionOverInstantVector{ - SeriesDataFunc: seriesDataFunc, - SeriesMetadataFunc: functions.DropSeriesName, - NeedsSeriesDeduplication: true, + SeriesDataFunc: seriesDataFunc, + SeriesMetadataFunction: functions.DropSeriesName, } return SingleInputVectorFunctionOperatorFactory(name, f) @@ -83,12 +82,10 @@ func InstantVectorTransformationFunctionOperatorFactory(name string, seriesDataF // Parameters: // - name: The name of the function // - metadataFunc: The function for handling metadata -// - needsSeriesDeduplication: Set to true if metadataFunc may produce multiple series with the same labels and therefore deduplication is required -func InstantVectorLabelManipulationFunctionOperatorFactory(name string, metadataFunc functions.SeriesMetadataFunction, needsSeriesDeduplication bool) InstantVectorFunctionOperatorFactory { +func InstantVectorLabelManipulationFunctionOperatorFactory(name string, metadataFunc functions.SeriesMetadataFunctionDefinition) InstantVectorFunctionOperatorFactory { f := functions.FunctionOverInstantVector{ - SeriesDataFunc: functions.PassthroughData, - SeriesMetadataFunc: metadataFunc, - NeedsSeriesDeduplication: needsSeriesDeduplication, + SeriesDataFunc: functions.PassthroughData, + SeriesMetadataFunction: metadataFunc, } return SingleInputVectorFunctionOperatorFactory(name, f) @@ -118,7 +115,7 @@ func FunctionOverRangeVectorOperatorFactory( var o types.InstantVectorOperator = operators.NewFunctionOverRangeVector(inner, memoryConsumptionTracker, f, annotations, expressionPosition) - if f.NeedsSeriesDeduplication { + if f.SeriesMetadataFunction.NeedsSeriesDeduplication { o = operators.NewDeduplicateAndMerge(o, memoryConsumptionTracker) } @@ -179,9 +176,11 @@ func LabelReplaceFunctionOperatorFactory() InstantVectorFunctionOperatorFactory } f := functions.FunctionOverInstantVector{ - SeriesDataFunc: functions.PassthroughData, - SeriesMetadataFunc: functions.LabelReplaceFactory(dstLabel, replacement, srcLabel, regex), - NeedsSeriesDeduplication: true, + SeriesDataFunc: functions.PassthroughData, + SeriesMetadataFunction: functions.SeriesMetadataFunctionDefinition{ + Func: functions.LabelReplaceFactory(dstLabel, replacement, srcLabel, regex), + NeedsSeriesDeduplication: true, + }, } o := operators.NewFunctionOverInstantVector(inner, memoryConsumptionTracker, f, expressionPosition) @@ -283,9 +282,8 @@ func instantVectorToScalarOperatorFactory(args []types.Operator, memoryConsumpti func unaryNegationOfInstantVectorOperatorFactory(inner types.InstantVectorOperator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, expressionPosition posrange.PositionRange) types.InstantVectorOperator { f := functions.FunctionOverInstantVector{ - SeriesDataFunc: functions.UnaryNegation, - SeriesMetadataFunc: functions.DropSeriesName, - NeedsSeriesDeduplication: true, + SeriesDataFunc: functions.UnaryNegation, + SeriesMetadataFunction: functions.DropSeriesName, } o := operators.NewFunctionOverInstantVector(inner, memoryConsumptionTracker, f, expressionPosition) diff --git a/pkg/streamingpromql/functions/common.go b/pkg/streamingpromql/functions/common.go index a656a22bec..eddb42ab2e 100644 --- a/pkg/streamingpromql/functions/common.go +++ b/pkg/streamingpromql/functions/common.go @@ -13,15 +13,16 @@ import ( // SeriesMetadataFunction is a function to operate on the metadata across series. type SeriesMetadataFunction func(seriesMetadata []types.SeriesMetadata, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) ([]types.SeriesMetadata, error) -// DropSeriesName is a SeriesMetadataFunc that removes the __name__ label from all series in seriesMetadata. -// -// It does not check that the list of returned series is free of duplicates. -func DropSeriesName(seriesMetadata []types.SeriesMetadata, _ *limiting.MemoryConsumptionTracker) ([]types.SeriesMetadata, error) { - for i := range seriesMetadata { - seriesMetadata[i].Labels = seriesMetadata[i].Labels.DropMetricName() - } +// DropSeriesName is a series metadata function that removes the __name__ label from all series. +var DropSeriesName = SeriesMetadataFunctionDefinition{ + Func: func(seriesMetadata []types.SeriesMetadata, _ *limiting.MemoryConsumptionTracker) ([]types.SeriesMetadata, error) { + for i := range seriesMetadata { + seriesMetadata[i].Labels = seriesMetadata[i].Labels.DropMetricName() + } - return seriesMetadata, nil + return seriesMetadata, nil + }, + NeedsSeriesDeduplication: true, } // InstantVectorSeriesFunction is a function that takes in an instant vector and produces an instant vector. @@ -93,16 +94,10 @@ type FunctionOverInstantVector struct { // SeriesDataFunc is the function that computes an output series for a single input series. SeriesDataFunc InstantVectorSeriesFunction - // SeriesMetadataFunc is the function that computes the output series for this function based on the given input series. + // SeriesMetadataFunction is the function that computes the output series for this function based on the given input series. // - // If SeriesMetadataFunc is nil, the input series are used as-is. - SeriesMetadataFunc SeriesMetadataFunction - - // NeedsSeriesDeduplication enables deduplication and merging of output series with the same labels. - // - // This should be set to true if SeriesMetadataFunc modifies the input series labels in such a way that duplicates may be - // present in the output series labels (eg. dropping a label). - NeedsSeriesDeduplication bool + // If SeriesMetadataFunction.Func is nil, the input series are used as-is. + SeriesMetadataFunction SeriesMetadataFunctionDefinition } type FunctionOverRangeVector struct { @@ -120,17 +115,24 @@ type FunctionOverRangeVector struct { // SeriesValidationFuncFactory can be nil, in which case no validation is performed. SeriesValidationFuncFactory RangeVectorSeriesValidationFunctionFactory - // SeriesMetadataFunc is the function that computes the output series for this function based on the given input series. + // SeriesMetadataFunction is the function that computes the output series for this function based on the given input series. // - // If SeriesMetadataFunc is nil, the input series are used as-is. - SeriesMetadataFunc SeriesMetadataFunction + // If SeriesMetadataFunction.Func is nil, the input series are used as-is. + SeriesMetadataFunction SeriesMetadataFunctionDefinition + + // NeedsSeriesNamesForAnnotations indicates that this function uses the names of input series when emitting annotations. + NeedsSeriesNamesForAnnotations bool +} + +type SeriesMetadataFunctionDefinition struct { + // Func is the function that computes the output series for this function based on the given input series. + // + // If Func is nil, the input series are used as-is. + Func SeriesMetadataFunction // NeedsSeriesDeduplication enables deduplication and merging of output series with the same labels. // - // This should be set to true if SeriesMetadataFunc modifies the input series labels in such a way that duplicates may be + // This should be set to true if Func modifies the input series labels in such a way that duplicates may be // present in the output series labels (eg. dropping a label). NeedsSeriesDeduplication bool - - // NeedsSeriesNamesForAnnotations indicates that this function uses the names of input series when emitting annotations. - NeedsSeriesNamesForAnnotations bool } diff --git a/pkg/streamingpromql/functions/common_test.go b/pkg/streamingpromql/functions/common_test.go index 22d88f6cc4..2853aa3a13 100644 --- a/pkg/streamingpromql/functions/common_test.go +++ b/pkg/streamingpromql/functions/common_test.go @@ -25,7 +25,7 @@ func TestDropSeriesName(t *testing.T) { {Labels: labels.FromStrings("label2", "value2")}, } - modifiedMetadata, err := DropSeriesName(seriesMetadata, limiting.NewMemoryConsumptionTracker(0, nil)) + modifiedMetadata, err := DropSeriesName.Func(seriesMetadata, limiting.NewMemoryConsumptionTracker(0, nil)) require.NoError(t, err) require.Equal(t, expected, modifiedMetadata) } diff --git a/pkg/streamingpromql/functions/range_vectors.go b/pkg/streamingpromql/functions/range_vectors.go index 4d662a7623..df0193d860 100644 --- a/pkg/streamingpromql/functions/range_vectors.go +++ b/pkg/streamingpromql/functions/range_vectors.go @@ -17,9 +17,8 @@ import ( ) var CountOverTime = FunctionOverRangeVector{ - SeriesMetadataFunc: DropSeriesName, - NeedsSeriesDeduplication: true, - StepFunc: countOverTime, + SeriesMetadataFunction: DropSeriesName, + StepFunc: countOverTime, } func countOverTime(step types.RangeVectorStepData, _ float64, fPoints *types.FPointRingBuffer, hPoints *types.HPointRingBuffer, _ EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) { @@ -34,8 +33,8 @@ func countOverTime(step types.RangeVectorStepData, _ float64, fPoints *types.FPo } var LastOverTime = FunctionOverRangeVector{ - SeriesMetadataFunc: nil, // We want to use the input series as-is. - StepFunc: lastOverTime, + // We want to use the input series as-is, so no need to set SeriesMetadataFunction. + StepFunc: lastOverTime, } func lastOverTime(step types.RangeVectorStepData, _ float64, fPoints *types.FPointRingBuffer, hPoints *types.HPointRingBuffer, _ EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) { @@ -55,9 +54,8 @@ func lastOverTime(step types.RangeVectorStepData, _ float64, fPoints *types.FPoi } var PresentOverTime = FunctionOverRangeVector{ - SeriesMetadataFunc: DropSeriesName, - NeedsSeriesDeduplication: true, - StepFunc: presentOverTime, + SeriesMetadataFunction: DropSeriesName, + StepFunc: presentOverTime, } func presentOverTime(step types.RangeVectorStepData, _ float64, fPoints *types.FPointRingBuffer, hPoints *types.HPointRingBuffer, _ EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) { @@ -69,9 +67,8 @@ func presentOverTime(step types.RangeVectorStepData, _ float64, fPoints *types.F } var MaxOverTime = FunctionOverRangeVector{ - SeriesMetadataFunc: DropSeriesName, - NeedsSeriesDeduplication: true, - StepFunc: maxOverTime, + SeriesMetadataFunction: DropSeriesName, + StepFunc: maxOverTime, } func maxOverTime(step types.RangeVectorStepData, _ float64, fPoints *types.FPointRingBuffer, _ *types.HPointRingBuffer, _ EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) { @@ -107,9 +104,8 @@ func maxOverTime(step types.RangeVectorStepData, _ float64, fPoints *types.FPoin } var MinOverTime = FunctionOverRangeVector{ - SeriesMetadataFunc: DropSeriesName, - NeedsSeriesDeduplication: true, - StepFunc: minOverTime, + SeriesMetadataFunction: DropSeriesName, + StepFunc: minOverTime, } func minOverTime(step types.RangeVectorStepData, _ float64, fPoints *types.FPointRingBuffer, _ *types.HPointRingBuffer, _ EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) { @@ -145,8 +141,7 @@ func minOverTime(step types.RangeVectorStepData, _ float64, fPoints *types.FPoin } var SumOverTime = FunctionOverRangeVector{ - SeriesMetadataFunc: DropSeriesName, - NeedsSeriesDeduplication: true, + SeriesMetadataFunction: DropSeriesName, StepFunc: sumOverTime, NeedsSeriesNamesForAnnotations: true, } @@ -221,8 +216,7 @@ func sumHistograms(head, tail []promql.HPoint, emitAnnotation EmitAnnotationFunc } var AvgOverTime = FunctionOverRangeVector{ - SeriesMetadataFunc: DropSeriesName, - NeedsSeriesDeduplication: true, + SeriesMetadataFunction: DropSeriesName, StepFunc: avgOverTime, NeedsSeriesNamesForAnnotations: true, } diff --git a/pkg/streamingpromql/functions/rate_increase.go b/pkg/streamingpromql/functions/rate_increase.go index d5f8b26426..45a02aca27 100644 --- a/pkg/streamingpromql/functions/rate_increase.go +++ b/pkg/streamingpromql/functions/rate_increase.go @@ -18,17 +18,15 @@ import ( var Rate = FunctionOverRangeVector{ StepFunc: rate(true), SeriesValidationFuncFactory: rateSeriesValidator, - SeriesMetadataFunc: DropSeriesName, + SeriesMetadataFunction: DropSeriesName, NeedsSeriesNamesForAnnotations: true, - NeedsSeriesDeduplication: true, } var Increase = FunctionOverRangeVector{ StepFunc: rate(false), SeriesValidationFuncFactory: rateSeriesValidator, - SeriesMetadataFunc: DropSeriesName, + SeriesMetadataFunction: DropSeriesName, NeedsSeriesNamesForAnnotations: true, - NeedsSeriesDeduplication: true, } // isRate is true for `rate` function, or false for `instant` function diff --git a/pkg/streamingpromql/functions_test.go b/pkg/streamingpromql/functions_test.go index 265f90372f..626cea09f7 100644 --- a/pkg/streamingpromql/functions_test.go +++ b/pkg/streamingpromql/functions_test.go @@ -12,12 +12,12 @@ import ( func TestRegisterInstantVectorFunctionOperatorFactory(t *testing.T) { // Register an already existing function - err := RegisterInstantVectorFunctionOperatorFactory("acos", InstantVectorLabelManipulationFunctionOperatorFactory("acos", functions.DropSeriesName, true)) + err := RegisterInstantVectorFunctionOperatorFactory("acos", InstantVectorLabelManipulationFunctionOperatorFactory("acos", functions.DropSeriesName)) require.Error(t, err) require.Equal(t, "function 'acos' has already been registered", err.Error()) // Register a new function - newFunc := InstantVectorLabelManipulationFunctionOperatorFactory("new_function", functions.DropSeriesName, true) + newFunc := InstantVectorLabelManipulationFunctionOperatorFactory("new_function", functions.DropSeriesName) err = RegisterInstantVectorFunctionOperatorFactory("new_function", newFunc) require.NoError(t, err) require.Contains(t, instantVectorFunctionOperatorFactories, "new_function") diff --git a/pkg/streamingpromql/operators/function_over_instant_vector.go b/pkg/streamingpromql/operators/function_over_instant_vector.go index 116310380e..915cf23712 100644 --- a/pkg/streamingpromql/operators/function_over_instant_vector.go +++ b/pkg/streamingpromql/operators/function_over_instant_vector.go @@ -55,8 +55,8 @@ func (m *FunctionOverInstantVector) SeriesMetadata(ctx context.Context) ([]types return nil, err } - if m.Func.SeriesMetadataFunc != nil { - return m.Func.SeriesMetadataFunc(metadata, m.MemoryConsumptionTracker) + if m.Func.SeriesMetadataFunction.Func != nil { + return m.Func.SeriesMetadataFunction.Func(metadata, m.MemoryConsumptionTracker) } return metadata, nil diff --git a/pkg/streamingpromql/operators/function_over_instant_vector_test.go b/pkg/streamingpromql/operators/function_over_instant_vector_test.go index b28847efc2..ec143cc680 100644 --- a/pkg/streamingpromql/operators/function_over_instant_vector_test.go +++ b/pkg/streamingpromql/operators/function_over_instant_vector_test.go @@ -47,8 +47,10 @@ func TestFunctionOverInstantVector(t *testing.T) { Inner: inner, MemoryConsumptionTracker: limiting.NewMemoryConsumptionTracker(0, nil), Func: functions.FunctionOverInstantVector{ - SeriesMetadataFunc: mustBeCalledMetadata, - SeriesDataFunc: mustBeCalledSeriesData, + SeriesDataFunc: mustBeCalledSeriesData, + SeriesMetadataFunction: functions.SeriesMetadataFunctionDefinition{ + Func: mustBeCalledMetadata, + }, }, } diff --git a/pkg/streamingpromql/operators/function_over_range_vector.go b/pkg/streamingpromql/operators/function_over_range_vector.go index 9ebce607d4..b60c43a253 100644 --- a/pkg/streamingpromql/operators/function_over_range_vector.go +++ b/pkg/streamingpromql/operators/function_over_range_vector.go @@ -86,8 +86,8 @@ func (m *FunctionOverRangeVector) SeriesMetadata(ctx context.Context) ([]types.S m.numSteps = m.Inner.StepCount() m.rangeSeconds = m.Inner.Range().Seconds() - if m.Func.SeriesMetadataFunc != nil { - return m.Func.SeriesMetadataFunc(metadata, m.MemoryConsumptionTracker) + if m.Func.SeriesMetadataFunction.Func != nil { + return m.Func.SeriesMetadataFunction.Func(metadata, m.MemoryConsumptionTracker) } return metadata, nil diff --git a/pkg/streamingpromql/operators/vector_scalar_binary_operation.go b/pkg/streamingpromql/operators/vector_scalar_binary_operation.go index 806140dd79..368cf6ce8c 100644 --- a/pkg/streamingpromql/operators/vector_scalar_binary_operation.go +++ b/pkg/streamingpromql/operators/vector_scalar_binary_operation.go @@ -116,7 +116,7 @@ func (v *VectorScalarBinaryOperation) SeriesMetadata(ctx context.Context) ([]typ if !v.Op.IsComparisonOperator() || v.ReturnBool { // We don't need to do deduplication and merging of series in this operator: we expect that this operator // is wrapped in a DeduplicateAndMerge. - metadata, err = functions.DropSeriesName(metadata, v.MemoryConsumptionTracker) + metadata, err = functions.DropSeriesName.Func(metadata, v.MemoryConsumptionTracker) if err != nil { return nil, err }