From e28a520503d7b8914264f8adb6302951aa03a8de Mon Sep 17 00:00:00 2001 From: Joshua Hesketh Date: Thu, 5 Dec 2024 21:30:46 +1100 Subject: [PATCH] MQE: Add support for histogram_stddev function --- pkg/streamingpromql/engine_test.go | 1 + pkg/streamingpromql/functions.go | 1 + .../operators/functions/native_histograms.go | 61 +++++++++++++++++-- .../testdata/ours/native_histograms.test | 25 ++++++-- .../testdata/upstream/native_histograms.test | 40 +++++------- 5 files changed, 94 insertions(+), 34 deletions(-) diff --git a/pkg/streamingpromql/engine_test.go b/pkg/streamingpromql/engine_test.go index a987f0df4b0..d3e4ff2dfc8 100644 --- a/pkg/streamingpromql/engine_test.go +++ b/pkg/streamingpromql/engine_test.go @@ -2670,6 +2670,7 @@ func TestCompareVariousMixedMetricsFunctions(t *testing.T) { expressions = append(expressions, fmt.Sprintf(`histogram_fraction(scalar(series{label="i"}), scalar(series{label="i"}), series{label=~"(%s)"})`, labelRegex)) expressions = append(expressions, fmt.Sprintf(`histogram_quantile(0.8, series{label=~"(%s)"})`, labelRegex)) expressions = append(expressions, fmt.Sprintf(`histogram_quantile(scalar(series{label="i"}), series{label=~"(%s)"})`, labelRegex)) + expressions = append(expressions, fmt.Sprintf(`histogram_stddev(series{label=~"(%s)"})`, labelRegex)) expressions = append(expressions, fmt.Sprintf(`histogram_sum(series{label=~"(%s)"})`, labelRegex)) } diff --git a/pkg/streamingpromql/functions.go b/pkg/streamingpromql/functions.go index 95267bbf9fd..77ed149dd7c 100644 --- a/pkg/streamingpromql/functions.go +++ b/pkg/streamingpromql/functions.go @@ -371,6 +371,7 @@ var instantVectorFunctionOperatorFactories = map[string]InstantVectorFunctionOpe "histogram_count": InstantVectorTransformationFunctionOperatorFactory("histogram_count", functions.HistogramCount), "histogram_fraction": HistogramFractionFunctionOperatorFactory(), "histogram_quantile": HistogramQuantileFunctionOperatorFactory(), + "histogram_stddev": 
InstantVectorTransformationFunctionOperatorFactory("histogram_stddev", functions.HistogramStdDev), "histogram_sum": InstantVectorTransformationFunctionOperatorFactory("histogram_sum", functions.HistogramSum), "increase": FunctionOverRangeVectorOperatorFactory("increase", functions.Increase), "label_replace": LabelReplaceFunctionOperatorFactory(), diff --git a/pkg/streamingpromql/operators/functions/native_histograms.go b/pkg/streamingpromql/operators/functions/native_histograms.go index 46c1795e6ac..fe902acaec7 100644 --- a/pkg/streamingpromql/operators/functions/native_histograms.go +++ b/pkg/streamingpromql/operators/functions/native_histograms.go @@ -4,23 +4,25 @@ package functions import ( "errors" + "math" "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/util/annotations" + "github.com/grafana/mimir/pkg/streamingpromql/floats" "github.com/grafana/mimir/pkg/streamingpromql/limiting" "github.com/grafana/mimir/pkg/streamingpromql/types" ) func HistogramAvg(seriesData types.InstantVectorSeriesData, _ []types.ScalarData, _ types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) { - floats, err := types.FPointSlicePool.Get(len(seriesData.Histograms), memoryConsumptionTracker) + fPoints, err := types.FPointSlicePool.Get(len(seriesData.Histograms), memoryConsumptionTracker) if err != nil { return types.InstantVectorSeriesData{}, err } data := types.InstantVectorSeriesData{ - Floats: floats, + Floats: fPoints, } for _, histogram := range seriesData.Histograms { @@ -36,13 +38,13 @@ func HistogramAvg(seriesData types.InstantVectorSeriesData, _ []types.ScalarData } func HistogramCount(seriesData types.InstantVectorSeriesData, _ []types.ScalarData, _ types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) { - floats, err := 
types.FPointSlicePool.Get(len(seriesData.Histograms), memoryConsumptionTracker) + fPoints, err := types.FPointSlicePool.Get(len(seriesData.Histograms), memoryConsumptionTracker) if err != nil { return types.InstantVectorSeriesData{}, err } data := types.InstantVectorSeriesData{ - Floats: floats, + Floats: fPoints, } for _, histogram := range seriesData.Histograms { @@ -58,13 +60,13 @@ func HistogramCount(seriesData types.InstantVectorSeriesData, _ []types.ScalarDa } func HistogramFraction(seriesData types.InstantVectorSeriesData, scalarArgsData []types.ScalarData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) { - floats, err := types.FPointSlicePool.Get(len(seriesData.Histograms), memoryConsumptionTracker) + fPoints, err := types.FPointSlicePool.Get(len(seriesData.Histograms), memoryConsumptionTracker) if err != nil { return types.InstantVectorSeriesData{}, err } data := types.InstantVectorSeriesData{ - Floats: floats, + Floats: fPoints, } lower := scalarArgsData[0] @@ -87,6 +89,53 @@ func HistogramFraction(seriesData types.InstantVectorSeriesData, scalarArgsData return data, nil } +// HistogramStdDev returns the estimated standard deviation of observations in a native histogram. +// Float values are ignored. 
+func HistogramStdDev(seriesData types.InstantVectorSeriesData, _ []types.ScalarData, _ types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) { + fPoints, err := types.FPointSlicePool.Get(len(seriesData.Histograms), memoryConsumptionTracker) + if err != nil { + return types.InstantVectorSeriesData{}, err + } + + data := types.InstantVectorSeriesData{ + Floats: fPoints, + } + + for _, histogram := range seriesData.Histograms { + mean := histogram.H.Sum / histogram.H.Count + var variance, cVariance float64 + it := histogram.H.AllBucketIterator() + for it.Next() { + bucket := it.At() + if bucket.Count == 0 { + continue + } + var val float64 + if bucket.Lower <= 0 && 0 <= bucket.Upper { + val = 0 + } else { + val = math.Sqrt(bucket.Upper * bucket.Lower) + if bucket.Upper < 0 { + val = -val + } + } + delta := val - mean + variance, cVariance = floats.KahanSumInc(bucket.Count*delta*delta, variance, cVariance) + } + variance += cVariance + variance /= histogram.H.Count + + data.Floats = append(data.Floats, promql.FPoint{ + T: histogram.T, + F: math.Sqrt(variance), + }) + } + + types.PutInstantVectorSeriesData(seriesData, memoryConsumptionTracker) + + return data, nil +} + func HistogramSum(seriesData types.InstantVectorSeriesData, _ []types.ScalarData, _ types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) { floats, err := types.FPointSlicePool.Get(len(seriesData.Histograms), memoryConsumptionTracker) if err != nil { diff --git a/pkg/streamingpromql/testdata/ours/native_histograms.test b/pkg/streamingpromql/testdata/ours/native_histograms.test index 8282fbc2e04..a71c5047902 100644 --- a/pkg/streamingpromql/testdata/ours/native_histograms.test +++ b/pkg/streamingpromql/testdata/ours/native_histograms.test @@ -14,18 +14,21 @@ eval range from 0 to 5m step 1m single_histogram {__name__="single_histogram"} {{schema:0 sum:5 count:4 buckets:[1 
2 1]}} {{schema:0 sum:5 count:4 buckets:[1 2 1]}} {{schema:0 sum:5 count:4 buckets:[1 2 1]}} {{schema:0 sum:5 count:4 buckets:[1 2 1]}} {{schema:0 sum:5 count:4 buckets:[1 2 1]}} {{schema:0 sum:20 count:7 buckets:[9 10 1]}} # histogram_avg is sum / count. -eval range from 0 to 5m step 1m histogram_avg(single_histogram) +eval range from 0 to 5m step 1m histogram_avg(single_histogram) {} 1.25 1.25 1.25 1.25 1.25 2.857142857142857 # histogram_count extracts the count property from the histogram. -eval range from 0 to 5m step 1m histogram_count(single_histogram) +eval range from 0 to 5m step 1m histogram_count(single_histogram) {} 4 4 4 4 4 7 -eval range from 0 to 5m step 1m histogram_fraction(0, 2, single_histogram) +eval range from 0 to 5m step 1m histogram_fraction(0, 2, single_histogram) {} 0.75 0.75 0.75 0.75 0.75 1 +eval range from 0 to 5m step 1m histogram_stddev(single_histogram) + {} 0.842629429717281 0.842629429717281 0.842629429717281 0.842629429717281 0.842629429717281 2.986282214238901 + # histogram_sum extracts the sum property from the histogram. 
-eval range from 0 to 5m step 1m histogram_sum(single_histogram) +eval range from 0 to 5m step 1m histogram_sum(single_histogram) {} 5 5 5 5 5 20 clear @@ -46,6 +49,9 @@ eval instant at 3m histogram_count(mixed_metric) eval instant at 3m histogram_fraction(0, 1, mixed_metric) {} 0.25 +eval instant at 4m histogram_stddev(mixed_metric) + {} 0.6650352854715079 + eval instant at 4m histogram_sum(mixed_metric) {} 8 @@ -64,6 +70,9 @@ eval instant at 2m histogram_count(mixed_metric) # histogram_fraction ignores any float values eval instant at 2m histogram_fraction(0, 1, mixed_metric) +# histogram_stddev ignores any float values +eval instant at 2m histogram_stddev(mixed_metric) + # histogram_sum ignores any float values eval instant at 2m histogram_sum(mixed_metric) @@ -95,6 +104,11 @@ eval instant at 0 histogram_fraction(-10, 20, route) {path="two"} 1 {path="three"} 1 +eval instant at 0 histogram_stddev(route) + {path="one"} 0.842629429717281 + {path="two"} 0.8415900492770793 + {path="three"} 1.1865698706402301 + eval instant at 0 histogram_sum(route) {path="one"} 5 {path="two"} 10 @@ -132,6 +146,9 @@ eval range from 0 to 8m step 1m histogram_count(mixed_metric) eval range from 0 to 8m step 1m histogram_fraction(0, 1, mixed_metric) {} _ _ _ _ _ _ _ 0.3 0.3 +eval range from 0 to 8m step 1m histogram_stddev(mixed_metric) +{} _ _ _ _ _ _ _ 0.8574122997574659 0.8574122997574659 + eval range from 0 to 8m step 1m histogram_sum(mixed_metric) {} _ _ _ _ _ _ _ 18 18 diff --git a/pkg/streamingpromql/testdata/upstream/native_histograms.test b/pkg/streamingpromql/testdata/upstream/native_histograms.test index 93b946d863a..5be3e54a4e4 100644 --- a/pkg/streamingpromql/testdata/upstream/native_histograms.test +++ b/pkg/streamingpromql/testdata/upstream/native_histograms.test @@ -317,9 +317,8 @@ clear load 10m histogram_stddev_stdvar_1 {{schema:2 count:4 sum:10 buckets:[1 0 0 0 1 0 0 1 1]}}x1 -# Unsupported by streaming engine. 
-# eval instant at 10m histogram_stddev(histogram_stddev_stdvar_1) -# {} 1.0787993180043811 +eval instant at 10m histogram_stddev(histogram_stddev_stdvar_1) + {} 1.0787993180043811 # Unsupported by streaming engine. # eval instant at 10m histogram_stdvar(histogram_stddev_stdvar_1) @@ -331,9 +330,8 @@ clear load 10m histogram_stddev_stdvar_2 {{schema:8 count:10 sum:10 buckets:[1 2 3 4]}}x1 -# Unsupported by streaming engine. -# eval instant at 10m histogram_stddev(histogram_stddev_stdvar_2) -# {} 0.0048960313898237465 +eval instant at 10m histogram_stddev(histogram_stddev_stdvar_2) + {} 0.0048960313898237465 # Unsupported by streaming engine. # eval instant at 10m histogram_stdvar(histogram_stddev_stdvar_2) @@ -345,9 +343,8 @@ clear load 10m histogram_stddev_stdvar_3 {{schema:3 count:7 sum:62 z_bucket:1 buckets:[0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 1 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 ] n_buckets:[0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 ]}}x1 -# Unsupported by streaming engine. -# eval instant at 10m histogram_stddev(histogram_stddev_stdvar_3) -# {} 42.947236400258 +eval instant at 10m histogram_stddev(histogram_stddev_stdvar_3) + {} 42.947236400258 # Unsupported by streaming engine. # eval instant at 10m histogram_stdvar(histogram_stddev_stdvar_3) @@ -359,9 +356,8 @@ clear load 10m histogram_stddev_stdvar_4 {{schema:0 count:10 sum:-112946 z_bucket:0 n_buckets:[0 0 1 1 1 0 1 1 0 0 3 0 0 0 1 0 0 1]}}x1 -# Unsupported by streaming engine. -# eval instant at 10m histogram_stddev(histogram_stddev_stdvar_4) -# {} 27556.344499842 +eval instant at 10m histogram_stddev(histogram_stddev_stdvar_4) + {} 27556.344499842 # Unsupported by streaming engine. # eval instant at 10m histogram_stdvar(histogram_stddev_stdvar_4) @@ -373,9 +369,8 @@ clear load 10m histogram_stddev_stdvar_5 {{schema:0 count:10 sum:-100 z_bucket:0 n_buckets:[0 0 0 0 10]}}x1 -# Unsupported by streaming engine. 
-# eval instant at 10m histogram_stddev(histogram_stddev_stdvar_5) -# {} 1.3137084989848 +eval instant at 10m histogram_stddev(histogram_stddev_stdvar_5) + {} 1.3137084989848 # Unsupported by streaming engine. # eval instant at 10m histogram_stdvar(histogram_stddev_stdvar_5) @@ -387,9 +382,8 @@ clear load 10m histogram_stddev_stdvar_6 {{schema:3 count:7 sum:NaN z_bucket:1 buckets:[0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 1 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 ] n_buckets:[0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 ]}}x1 -# Unsupported by streaming engine. -# eval instant at 10m histogram_stddev(histogram_stddev_stdvar_6) -# {} NaN +eval instant at 10m histogram_stddev(histogram_stddev_stdvar_6) + {} NaN # Unsupported by streaming engine. # eval instant at 10m histogram_stdvar(histogram_stddev_stdvar_6) @@ -401,9 +395,8 @@ clear load 10m histogram_stddev_stdvar_7 {{schema:3 count:7 sum:Inf z_bucket:1 buckets:[0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 1 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 ] n_buckets:[0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 ]}}x1 -# Unsupported by streaming engine. -# eval instant at 10m histogram_stddev(histogram_stddev_stdvar_7) -# {} Inf +eval instant at 10m histogram_stddev(histogram_stddev_stdvar_7) + {} Inf # Unsupported by streaming engine. # eval instant at 10m histogram_stdvar(histogram_stddev_stdvar_7) @@ -1105,9 +1098,8 @@ eval instant at 5m histogram_quantile(1.0, rate(const_histogram[5m])) {} NaN # Zero buckets mean no observations, so there is no standard deviation. -# Unsupported by streaming engine. -# eval instant at 5m histogram_stddev(rate(const_histogram[5m])) -# {} NaN +eval instant at 5m histogram_stddev(rate(const_histogram[5m])) + {} NaN # Zero buckets mean no observations, so there is no standard variance. # Unsupported by streaming engine.