Skip to content

Commit

Permalink
MQE: introduce series metadata function type (#9558)
Browse files Browse the repository at this point in the history
* MQE: introduce series metadata function type

* Add changelog entry
  • Loading branch information
charleskorn authored Oct 14, 2024
1 parent dd18bcb commit 7423f25
Show file tree
Hide file tree
Showing 11 changed files with 67 additions and 73 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* `cortex_alertmanager_state_replication_failed_total`
* `cortex_alertmanager_alerts`
* `cortex_alertmanager_silences`
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #9367 #9368 #9398 #9399 #9403 #9417 #9418 #9419 #9420 #9482 #9504 #9505 #9507 #9518 #9531 #9532 #9533 #9553 #9588
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #9367 #9368 #9398 #9399 #9403 #9417 #9418 #9419 #9420 #9482 #9504 #9505 #9507 #9518 #9531 #9532 #9533 #9553 #9558 #9588
* [FEATURE] Query-frontend: added experimental configuration options `query-frontend.cache-errors` and `query-frontend.results-cache-ttl-for-errors` to allow non-transient responses to be cached. When set to `true` error responses from hitting limits or bad data are cached for a short TTL. #9028
* [FEATURE] gRPC: Support S2 compression. #9322
* `-alertmanager.alertmanager-client.grpc-compression=s2`
Expand Down
30 changes: 14 additions & 16 deletions pkg/streamingpromql/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func SingleInputVectorFunctionOperatorFactory(name string, f functions.FunctionO

var o types.InstantVectorOperator = operators.NewFunctionOverInstantVector(inner, memoryConsumptionTracker, f, expressionPosition)

if f.NeedsSeriesDeduplication {
if f.SeriesMetadataFunction.NeedsSeriesDeduplication {
o = operators.NewDeduplicateAndMerge(o, memoryConsumptionTracker)
}

Expand All @@ -67,9 +67,8 @@ func SingleInputVectorFunctionOperatorFactory(name string, f functions.FunctionO
// - seriesDataFunc: The function to handle series data
func InstantVectorTransformationFunctionOperatorFactory(name string, seriesDataFunc functions.InstantVectorSeriesFunction) InstantVectorFunctionOperatorFactory {
f := functions.FunctionOverInstantVector{
SeriesDataFunc: seriesDataFunc,
SeriesMetadataFunc: functions.DropSeriesName,
NeedsSeriesDeduplication: true,
SeriesDataFunc: seriesDataFunc,
SeriesMetadataFunction: functions.DropSeriesName,
}

return SingleInputVectorFunctionOperatorFactory(name, f)
Expand All @@ -83,12 +82,10 @@ func InstantVectorTransformationFunctionOperatorFactory(name string, seriesDataF
// Parameters:
// - name: The name of the function
// - metadataFunc: The function for handling metadata
// - needsSeriesDeduplication: Set to true if metadataFunc may produce multiple series with the same labels and therefore deduplication is required
func InstantVectorLabelManipulationFunctionOperatorFactory(name string, metadataFunc functions.SeriesMetadataFunction, needsSeriesDeduplication bool) InstantVectorFunctionOperatorFactory {
func InstantVectorLabelManipulationFunctionOperatorFactory(name string, metadataFunc functions.SeriesMetadataFunctionDefinition) InstantVectorFunctionOperatorFactory {
f := functions.FunctionOverInstantVector{
SeriesDataFunc: functions.PassthroughData,
SeriesMetadataFunc: metadataFunc,
NeedsSeriesDeduplication: needsSeriesDeduplication,
SeriesDataFunc: functions.PassthroughData,
SeriesMetadataFunction: metadataFunc,
}

return SingleInputVectorFunctionOperatorFactory(name, f)
Expand Down Expand Up @@ -118,7 +115,7 @@ func FunctionOverRangeVectorOperatorFactory(

var o types.InstantVectorOperator = operators.NewFunctionOverRangeVector(inner, memoryConsumptionTracker, f, annotations, expressionPosition)

if f.NeedsSeriesDeduplication {
if f.SeriesMetadataFunction.NeedsSeriesDeduplication {
o = operators.NewDeduplicateAndMerge(o, memoryConsumptionTracker)
}

Expand Down Expand Up @@ -179,9 +176,11 @@ func LabelReplaceFunctionOperatorFactory() InstantVectorFunctionOperatorFactory
}

f := functions.FunctionOverInstantVector{
SeriesDataFunc: functions.PassthroughData,
SeriesMetadataFunc: functions.LabelReplaceFactory(dstLabel, replacement, srcLabel, regex),
NeedsSeriesDeduplication: true,
SeriesDataFunc: functions.PassthroughData,
SeriesMetadataFunction: functions.SeriesMetadataFunctionDefinition{
Func: functions.LabelReplaceFactory(dstLabel, replacement, srcLabel, regex),
NeedsSeriesDeduplication: true,
},
}

o := operators.NewFunctionOverInstantVector(inner, memoryConsumptionTracker, f, expressionPosition)
Expand Down Expand Up @@ -283,9 +282,8 @@ func instantVectorToScalarOperatorFactory(args []types.Operator, memoryConsumpti

func unaryNegationOfInstantVectorOperatorFactory(inner types.InstantVectorOperator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, expressionPosition posrange.PositionRange) types.InstantVectorOperator {
f := functions.FunctionOverInstantVector{
SeriesDataFunc: functions.UnaryNegation,
SeriesMetadataFunc: functions.DropSeriesName,
NeedsSeriesDeduplication: true,
SeriesDataFunc: functions.UnaryNegation,
SeriesMetadataFunction: functions.DropSeriesName,
}

o := operators.NewFunctionOverInstantVector(inner, memoryConsumptionTracker, f, expressionPosition)
Expand Down
50 changes: 26 additions & 24 deletions pkg/streamingpromql/functions/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,16 @@ import (
// SeriesMetadataFunction is a function to operate on the metadata across series.
type SeriesMetadataFunction func(seriesMetadata []types.SeriesMetadata, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) ([]types.SeriesMetadata, error)

// DropSeriesName is a SeriesMetadataFunc that removes the __name__ label from all series in seriesMetadata.
//
// It does not check that the list of returned series is free of duplicates.
func DropSeriesName(seriesMetadata []types.SeriesMetadata, _ *limiting.MemoryConsumptionTracker) ([]types.SeriesMetadata, error) {
for i := range seriesMetadata {
seriesMetadata[i].Labels = seriesMetadata[i].Labels.DropMetricName()
}
// DropSeriesName is a series metadata function that removes the __name__ label from all series.
var DropSeriesName = SeriesMetadataFunctionDefinition{
Func: func(seriesMetadata []types.SeriesMetadata, _ *limiting.MemoryConsumptionTracker) ([]types.SeriesMetadata, error) {
for i := range seriesMetadata {
seriesMetadata[i].Labels = seriesMetadata[i].Labels.DropMetricName()
}

return seriesMetadata, nil
return seriesMetadata, nil
},
NeedsSeriesDeduplication: true,
}

// InstantVectorSeriesFunction is a function that takes in an instant vector and produces an instant vector.
Expand Down Expand Up @@ -93,16 +94,10 @@ type FunctionOverInstantVector struct {
// SeriesDataFunc is the function that computes an output series for a single input series.
SeriesDataFunc InstantVectorSeriesFunction

// SeriesMetadataFunc is the function that computes the output series for this function based on the given input series.
// SeriesMetadataFunction is the function that computes the output series for this function based on the given input series.
//
// If SeriesMetadataFunc is nil, the input series are used as-is.
SeriesMetadataFunc SeriesMetadataFunction

// NeedsSeriesDeduplication enables deduplication and merging of output series with the same labels.
//
// This should be set to true if SeriesMetadataFunc modifies the input series labels in such a way that duplicates may be
// present in the output series labels (eg. dropping a label).
NeedsSeriesDeduplication bool
// If SeriesMetadataFunction.Func is nil, the input series are used as-is.
SeriesMetadataFunction SeriesMetadataFunctionDefinition
}

type FunctionOverRangeVector struct {
Expand All @@ -120,17 +115,24 @@ type FunctionOverRangeVector struct {
// SeriesValidationFuncFactory can be nil, in which case no validation is performed.
SeriesValidationFuncFactory RangeVectorSeriesValidationFunctionFactory

// SeriesMetadataFunc is the function that computes the output series for this function based on the given input series.
// SeriesMetadataFunction is the function that computes the output series for this function based on the given input series.
//
// If SeriesMetadataFunc is nil, the input series are used as-is.
SeriesMetadataFunc SeriesMetadataFunction
// If SeriesMetadataFunction.Func is nil, the input series are used as-is.
SeriesMetadataFunction SeriesMetadataFunctionDefinition

// NeedsSeriesNamesForAnnotations indicates that this function uses the names of input series when emitting annotations.
NeedsSeriesNamesForAnnotations bool
}

type SeriesMetadataFunctionDefinition struct {
// Func is the function that computes the output series for this function based on the given input series.
//
// If Func is nil, the input series are used as-is.
Func SeriesMetadataFunction

// NeedsSeriesDeduplication enables deduplication and merging of output series with the same labels.
//
// This should be set to true if SeriesMetadataFunc modifies the input series labels in such a way that duplicates may be
// This should be set to true if Func modifies the input series labels in such a way that duplicates may be
// present in the output series labels (eg. dropping a label).
NeedsSeriesDeduplication bool

// NeedsSeriesNamesForAnnotations indicates that this function uses the names of input series when emitting annotations.
NeedsSeriesNamesForAnnotations bool
}
2 changes: 1 addition & 1 deletion pkg/streamingpromql/functions/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestDropSeriesName(t *testing.T) {
{Labels: labels.FromStrings("label2", "value2")},
}

modifiedMetadata, err := DropSeriesName(seriesMetadata, limiting.NewMemoryConsumptionTracker(0, nil))
modifiedMetadata, err := DropSeriesName.Func(seriesMetadata, limiting.NewMemoryConsumptionTracker(0, nil))
require.NoError(t, err)
require.Equal(t, expected, modifiedMetadata)
}
Expand Down
30 changes: 12 additions & 18 deletions pkg/streamingpromql/functions/range_vectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@ import (
)

var CountOverTime = FunctionOverRangeVector{
SeriesMetadataFunc: DropSeriesName,
NeedsSeriesDeduplication: true,
StepFunc: countOverTime,
SeriesMetadataFunction: DropSeriesName,
StepFunc: countOverTime,
}

func countOverTime(step types.RangeVectorStepData, _ float64, fPoints *types.FPointRingBuffer, hPoints *types.HPointRingBuffer, _ EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
Expand All @@ -34,8 +33,8 @@ func countOverTime(step types.RangeVectorStepData, _ float64, fPoints *types.FPo
}

var LastOverTime = FunctionOverRangeVector{
SeriesMetadataFunc: nil, // We want to use the input series as-is.
StepFunc: lastOverTime,
// We want to use the input series as-is, so no need to set SeriesMetadataFunction.
StepFunc: lastOverTime,
}

func lastOverTime(step types.RangeVectorStepData, _ float64, fPoints *types.FPointRingBuffer, hPoints *types.HPointRingBuffer, _ EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
Expand All @@ -55,9 +54,8 @@ func lastOverTime(step types.RangeVectorStepData, _ float64, fPoints *types.FPoi
}

var PresentOverTime = FunctionOverRangeVector{
SeriesMetadataFunc: DropSeriesName,
NeedsSeriesDeduplication: true,
StepFunc: presentOverTime,
SeriesMetadataFunction: DropSeriesName,
StepFunc: presentOverTime,
}

func presentOverTime(step types.RangeVectorStepData, _ float64, fPoints *types.FPointRingBuffer, hPoints *types.HPointRingBuffer, _ EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
Expand All @@ -69,9 +67,8 @@ func presentOverTime(step types.RangeVectorStepData, _ float64, fPoints *types.F
}

var MaxOverTime = FunctionOverRangeVector{
SeriesMetadataFunc: DropSeriesName,
NeedsSeriesDeduplication: true,
StepFunc: maxOverTime,
SeriesMetadataFunction: DropSeriesName,
StepFunc: maxOverTime,
}

func maxOverTime(step types.RangeVectorStepData, _ float64, fPoints *types.FPointRingBuffer, _ *types.HPointRingBuffer, _ EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
Expand Down Expand Up @@ -107,9 +104,8 @@ func maxOverTime(step types.RangeVectorStepData, _ float64, fPoints *types.FPoin
}

var MinOverTime = FunctionOverRangeVector{
SeriesMetadataFunc: DropSeriesName,
NeedsSeriesDeduplication: true,
StepFunc: minOverTime,
SeriesMetadataFunction: DropSeriesName,
StepFunc: minOverTime,
}

func minOverTime(step types.RangeVectorStepData, _ float64, fPoints *types.FPointRingBuffer, _ *types.HPointRingBuffer, _ EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
Expand Down Expand Up @@ -145,8 +141,7 @@ func minOverTime(step types.RangeVectorStepData, _ float64, fPoints *types.FPoin
}

var SumOverTime = FunctionOverRangeVector{
SeriesMetadataFunc: DropSeriesName,
NeedsSeriesDeduplication: true,
SeriesMetadataFunction: DropSeriesName,
StepFunc: sumOverTime,
NeedsSeriesNamesForAnnotations: true,
}
Expand Down Expand Up @@ -221,8 +216,7 @@ func sumHistograms(head, tail []promql.HPoint, emitAnnotation EmitAnnotationFunc
}

var AvgOverTime = FunctionOverRangeVector{
SeriesMetadataFunc: DropSeriesName,
NeedsSeriesDeduplication: true,
SeriesMetadataFunction: DropSeriesName,
StepFunc: avgOverTime,
NeedsSeriesNamesForAnnotations: true,
}
Expand Down
6 changes: 2 additions & 4 deletions pkg/streamingpromql/functions/rate_increase.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,15 @@ import (
var Rate = FunctionOverRangeVector{
StepFunc: rate(true),
SeriesValidationFuncFactory: rateSeriesValidator,
SeriesMetadataFunc: DropSeriesName,
SeriesMetadataFunction: DropSeriesName,
NeedsSeriesNamesForAnnotations: true,
NeedsSeriesDeduplication: true,
}

var Increase = FunctionOverRangeVector{
StepFunc: rate(false),
SeriesValidationFuncFactory: rateSeriesValidator,
SeriesMetadataFunc: DropSeriesName,
SeriesMetadataFunction: DropSeriesName,
NeedsSeriesNamesForAnnotations: true,
NeedsSeriesDeduplication: true,
}

// isRate is true for `rate` function, or false for `instant` function
Expand Down
4 changes: 2 additions & 2 deletions pkg/streamingpromql/functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ import (

func TestRegisterInstantVectorFunctionOperatorFactory(t *testing.T) {
// Register an already existing function
err := RegisterInstantVectorFunctionOperatorFactory("acos", InstantVectorLabelManipulationFunctionOperatorFactory("acos", functions.DropSeriesName, true))
err := RegisterInstantVectorFunctionOperatorFactory("acos", InstantVectorLabelManipulationFunctionOperatorFactory("acos", functions.DropSeriesName))
require.Error(t, err)
require.Equal(t, "function 'acos' has already been registered", err.Error())

// Register a new function
newFunc := InstantVectorLabelManipulationFunctionOperatorFactory("new_function", functions.DropSeriesName, true)
newFunc := InstantVectorLabelManipulationFunctionOperatorFactory("new_function", functions.DropSeriesName)
err = RegisterInstantVectorFunctionOperatorFactory("new_function", newFunc)
require.NoError(t, err)
require.Contains(t, instantVectorFunctionOperatorFactories, "new_function")
Expand Down
4 changes: 2 additions & 2 deletions pkg/streamingpromql/operators/function_over_instant_vector.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ func (m *FunctionOverInstantVector) SeriesMetadata(ctx context.Context) ([]types
return nil, err
}

if m.Func.SeriesMetadataFunc != nil {
return m.Func.SeriesMetadataFunc(metadata, m.MemoryConsumptionTracker)
if m.Func.SeriesMetadataFunction.Func != nil {
return m.Func.SeriesMetadataFunction.Func(metadata, m.MemoryConsumptionTracker)
}

return metadata, nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,10 @@ func TestFunctionOverInstantVector(t *testing.T) {
Inner: inner,
MemoryConsumptionTracker: limiting.NewMemoryConsumptionTracker(0, nil),
Func: functions.FunctionOverInstantVector{
SeriesMetadataFunc: mustBeCalledMetadata,
SeriesDataFunc: mustBeCalledSeriesData,
SeriesDataFunc: mustBeCalledSeriesData,
SeriesMetadataFunction: functions.SeriesMetadataFunctionDefinition{
Func: mustBeCalledMetadata,
},
},
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/streamingpromql/operators/function_over_range_vector.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ func (m *FunctionOverRangeVector) SeriesMetadata(ctx context.Context) ([]types.S
m.numSteps = m.Inner.StepCount()
m.rangeSeconds = m.Inner.Range().Seconds()

if m.Func.SeriesMetadataFunc != nil {
return m.Func.SeriesMetadataFunc(metadata, m.MemoryConsumptionTracker)
if m.Func.SeriesMetadataFunction.Func != nil {
return m.Func.SeriesMetadataFunction.Func(metadata, m.MemoryConsumptionTracker)
}

return metadata, nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (v *VectorScalarBinaryOperation) SeriesMetadata(ctx context.Context) ([]typ
if !v.Op.IsComparisonOperator() || v.ReturnBool {
// We don't need to do deduplication and merging of series in this operator: we expect that this operator
// is wrapped in a DeduplicateAndMerge.
metadata, err = functions.DropSeriesName(metadata, v.MemoryConsumptionTracker)
metadata, err = functions.DropSeriesName.Func(metadata, v.MemoryConsumptionTracker)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 7423f25

Please sign in to comment.