Skip to content

Commit

Permalink
MQE: Add support for histogram_stddev function
Browse files Browse the repository at this point in the history
  • Loading branch information
jhesketh committed Dec 5, 2024
1 parent be2f23b commit e28a520
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 34 deletions.
1 change: 1 addition & 0 deletions pkg/streamingpromql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2670,6 +2670,7 @@ func TestCompareVariousMixedMetricsFunctions(t *testing.T) {
expressions = append(expressions, fmt.Sprintf(`histogram_fraction(scalar(series{label="i"}), scalar(series{label="i"}), series{label=~"(%s)"})`, labelRegex))
expressions = append(expressions, fmt.Sprintf(`histogram_quantile(0.8, series{label=~"(%s)"})`, labelRegex))
expressions = append(expressions, fmt.Sprintf(`histogram_quantile(scalar(series{label="i"}), series{label=~"(%s)"})`, labelRegex))
expressions = append(expressions, fmt.Sprintf(`histogram_stddev(series{label=~"(%s)"})`, labelRegex))
expressions = append(expressions, fmt.Sprintf(`histogram_sum(series{label=~"(%s)"})`, labelRegex))
}

Expand Down
1 change: 1 addition & 0 deletions pkg/streamingpromql/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,7 @@ var instantVectorFunctionOperatorFactories = map[string]InstantVectorFunctionOpe
"histogram_count": InstantVectorTransformationFunctionOperatorFactory("histogram_count", functions.HistogramCount),
"histogram_fraction": HistogramFractionFunctionOperatorFactory(),
"histogram_quantile": HistogramQuantileFunctionOperatorFactory(),
"histogram_stddev": InstantVectorTransformationFunctionOperatorFactory("histogram_stddev", functions.HistogramStdDev),
"histogram_sum": InstantVectorTransformationFunctionOperatorFactory("histogram_sum", functions.HistogramSum),
"increase": FunctionOverRangeVectorOperatorFactory("increase", functions.Increase),
"label_replace": LabelReplaceFunctionOperatorFactory(),
Expand Down
61 changes: 55 additions & 6 deletions pkg/streamingpromql/operators/functions/native_histograms.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,25 @@ package functions

import (
"errors"
"math"

"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/util/annotations"

"github.com/grafana/mimir/pkg/streamingpromql/floats"
"github.com/grafana/mimir/pkg/streamingpromql/limiting"
"github.com/grafana/mimir/pkg/streamingpromql/types"
)

func HistogramAvg(seriesData types.InstantVectorSeriesData, _ []types.ScalarData, _ types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) {
floats, err := types.FPointSlicePool.Get(len(seriesData.Histograms), memoryConsumptionTracker)
fPoints, err := types.FPointSlicePool.Get(len(seriesData.Histograms), memoryConsumptionTracker)
if err != nil {
return types.InstantVectorSeriesData{}, err
}

data := types.InstantVectorSeriesData{
Floats: floats,
Floats: fPoints,
}

for _, histogram := range seriesData.Histograms {
Expand All @@ -36,13 +38,13 @@ func HistogramAvg(seriesData types.InstantVectorSeriesData, _ []types.ScalarData
}

func HistogramCount(seriesData types.InstantVectorSeriesData, _ []types.ScalarData, _ types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) {
floats, err := types.FPointSlicePool.Get(len(seriesData.Histograms), memoryConsumptionTracker)
fPoints, err := types.FPointSlicePool.Get(len(seriesData.Histograms), memoryConsumptionTracker)
if err != nil {
return types.InstantVectorSeriesData{}, err
}

data := types.InstantVectorSeriesData{
Floats: floats,
Floats: fPoints,
}

for _, histogram := range seriesData.Histograms {
Expand All @@ -58,13 +60,13 @@ func HistogramCount(seriesData types.InstantVectorSeriesData, _ []types.ScalarDa
}

func HistogramFraction(seriesData types.InstantVectorSeriesData, scalarArgsData []types.ScalarData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) {
floats, err := types.FPointSlicePool.Get(len(seriesData.Histograms), memoryConsumptionTracker)
fPoints, err := types.FPointSlicePool.Get(len(seriesData.Histograms), memoryConsumptionTracker)
if err != nil {
return types.InstantVectorSeriesData{}, err
}

data := types.InstantVectorSeriesData{
Floats: floats,
Floats: fPoints,
}

lower := scalarArgsData[0]
Expand All @@ -87,6 +89,53 @@ func HistogramFraction(seriesData types.InstantVectorSeriesData, scalarArgsData
return data, nil
}

// HistogramStdDev returns the estimated standard deviation of observations in a native histogram
// Float values are ignored.
func HistogramStdDev(seriesData types.InstantVectorSeriesData, _ []types.ScalarData, _ types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) {
fPoints, err := types.FPointSlicePool.Get(len(seriesData.Histograms), memoryConsumptionTracker)
if err != nil {
return types.InstantVectorSeriesData{}, err
}

data := types.InstantVectorSeriesData{
Floats: fPoints,
}

for _, histogram := range seriesData.Histograms {
mean := histogram.H.Sum / histogram.H.Count
var variance, cVariance float64
it := histogram.H.AllBucketIterator()
for it.Next() {
bucket := it.At()
if bucket.Count == 0 {
continue
}
var val float64
if bucket.Lower <= 0 && 0 <= bucket.Upper {
val = 0
} else {
val = math.Sqrt(bucket.Upper * bucket.Lower)
if bucket.Upper < 0 {
val = -val
}
}
delta := val - mean
variance, cVariance = floats.KahanSumInc(bucket.Count*delta*delta, variance, cVariance)
}
variance += cVariance
variance /= histogram.H.Count

data.Floats = append(data.Floats, promql.FPoint{
T: histogram.T,
F: math.Sqrt(variance),
})
}

types.PutInstantVectorSeriesData(seriesData, memoryConsumptionTracker)

return data, nil
}

func HistogramSum(seriesData types.InstantVectorSeriesData, _ []types.ScalarData, _ types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) {
floats, err := types.FPointSlicePool.Get(len(seriesData.Histograms), memoryConsumptionTracker)
if err != nil {
Expand Down
25 changes: 21 additions & 4 deletions pkg/streamingpromql/testdata/ours/native_histograms.test
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,21 @@ eval range from 0 to 5m step 1m single_histogram
{__name__="single_histogram"} {{schema:0 sum:5 count:4 buckets:[1 2 1]}} {{schema:0 sum:5 count:4 buckets:[1 2 1]}} {{schema:0 sum:5 count:4 buckets:[1 2 1]}} {{schema:0 sum:5 count:4 buckets:[1 2 1]}} {{schema:0 sum:5 count:4 buckets:[1 2 1]}} {{schema:0 sum:20 count:7 buckets:[9 10 1]}}

# histogram_avg is sum / count.
eval range from 0 to 5m step 1m histogram_avg(single_histogram)
eval range from 0 to 5m step 1m histogram_avg(single_histogram)
{} 1.25 1.25 1.25 1.25 1.25 2.857142857142857

# histogram_count extracts the count property from the histogram.
eval range from 0 to 5m step 1m histogram_count(single_histogram)
eval range from 0 to 5m step 1m histogram_count(single_histogram)
{} 4 4 4 4 4 7

eval range from 0 to 5m step 1m histogram_fraction(0, 2, single_histogram)
eval range from 0 to 5m step 1m histogram_fraction(0, 2, single_histogram)
{} 0.75 0.75 0.75 0.75 0.75 1

eval range from 0 to 5m step 1m histogram_stddev(single_histogram)
{} 0.842629429717281 0.842629429717281 0.842629429717281 0.842629429717281 0.842629429717281 2.986282214238901

# histogram_sum extracts the sum property from the histogram.
eval range from 0 to 5m step 1m histogram_sum(single_histogram)
eval range from 0 to 5m step 1m histogram_sum(single_histogram)
{} 5 5 5 5 5 20

clear
Expand All @@ -46,6 +49,9 @@ eval instant at 3m histogram_count(mixed_metric)
eval instant at 3m histogram_fraction(0, 1, mixed_metric)
{} 0.25

eval instant at 4m histogram_stddev(mixed_metric)
{} 0.6650352854715079

eval instant at 4m histogram_sum(mixed_metric)
{} 8

Expand All @@ -64,6 +70,9 @@ eval instant at 2m histogram_count(mixed_metric)
# histogram_fraction ignores any float values
eval instant at 2m histogram_fraction(0, 1, mixed_metric)

# histogram_stddev ignores any float values
eval instant at 2m histogram_stddev(mixed_metric)

# histogram_sum ignores any float values
eval instant at 2m histogram_sum(mixed_metric)

Expand Down Expand Up @@ -95,6 +104,11 @@ eval instant at 0 histogram_fraction(-10, 20, route)
{path="two"} 1
{path="three"} 1

eval instant at 0 histogram_stddev(route)
{path="one"} 0.842629429717281
{path="two"} 0.8415900492770793
{path="three"} 1.1865698706402301

eval instant at 0 histogram_sum(route)
{path="one"} 5
{path="two"} 10
Expand Down Expand Up @@ -132,6 +146,9 @@ eval range from 0 to 8m step 1m histogram_count(mixed_metric)
eval range from 0 to 8m step 1m histogram_fraction(0, 1, mixed_metric)
{} _ _ _ _ _ _ _ 0.3 0.3

eval range from 0 to 8m step 1m histogram_stddev(mixed_metric)
{} _ _ _ _ _ _ _ 0.8574122997574659 0.8574122997574659

eval range from 0 to 8m step 1m histogram_sum(mixed_metric)
{} _ _ _ _ _ _ _ 18 18

Expand Down
40 changes: 16 additions & 24 deletions pkg/streamingpromql/testdata/upstream/native_histograms.test
Original file line number Diff line number Diff line change
Expand Up @@ -317,9 +317,8 @@ clear
load 10m
histogram_stddev_stdvar_1 {{schema:2 count:4 sum:10 buckets:[1 0 0 0 1 0 0 1 1]}}x1

# Unsupported by streaming engine.
# eval instant at 10m histogram_stddev(histogram_stddev_stdvar_1)
# {} 1.0787993180043811
eval instant at 10m histogram_stddev(histogram_stddev_stdvar_1)
{} 1.0787993180043811

# Unsupported by streaming engine.
# eval instant at 10m histogram_stdvar(histogram_stddev_stdvar_1)
Expand All @@ -331,9 +330,8 @@ clear
load 10m
histogram_stddev_stdvar_2 {{schema:8 count:10 sum:10 buckets:[1 2 3 4]}}x1

# Unsupported by streaming engine.
# eval instant at 10m histogram_stddev(histogram_stddev_stdvar_2)
# {} 0.0048960313898237465
eval instant at 10m histogram_stddev(histogram_stddev_stdvar_2)
{} 0.0048960313898237465

# Unsupported by streaming engine.
# eval instant at 10m histogram_stdvar(histogram_stddev_stdvar_2)
Expand All @@ -345,9 +343,8 @@ clear
load 10m
histogram_stddev_stdvar_3 {{schema:3 count:7 sum:62 z_bucket:1 buckets:[0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 1 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 ] n_buckets:[0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 ]}}x1

# Unsupported by streaming engine.
# eval instant at 10m histogram_stddev(histogram_stddev_stdvar_3)
# {} 42.947236400258
eval instant at 10m histogram_stddev(histogram_stddev_stdvar_3)
{} 42.947236400258

# Unsupported by streaming engine.
# eval instant at 10m histogram_stdvar(histogram_stddev_stdvar_3)
Expand All @@ -359,9 +356,8 @@ clear
load 10m
histogram_stddev_stdvar_4 {{schema:0 count:10 sum:-112946 z_bucket:0 n_buckets:[0 0 1 1 1 0 1 1 0 0 3 0 0 0 1 0 0 1]}}x1

# Unsupported by streaming engine.
# eval instant at 10m histogram_stddev(histogram_stddev_stdvar_4)
# {} 27556.344499842
eval instant at 10m histogram_stddev(histogram_stddev_stdvar_4)
{} 27556.344499842

# Unsupported by streaming engine.
# eval instant at 10m histogram_stdvar(histogram_stddev_stdvar_4)
Expand All @@ -373,9 +369,8 @@ clear
load 10m
histogram_stddev_stdvar_5 {{schema:0 count:10 sum:-100 z_bucket:0 n_buckets:[0 0 0 0 10]}}x1

# Unsupported by streaming engine.
# eval instant at 10m histogram_stddev(histogram_stddev_stdvar_5)
# {} 1.3137084989848
eval instant at 10m histogram_stddev(histogram_stddev_stdvar_5)
{} 1.3137084989848

# Unsupported by streaming engine.
# eval instant at 10m histogram_stdvar(histogram_stddev_stdvar_5)
Expand All @@ -387,9 +382,8 @@ clear
load 10m
histogram_stddev_stdvar_6 {{schema:3 count:7 sum:NaN z_bucket:1 buckets:[0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 1 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 ] n_buckets:[0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 ]}}x1

# Unsupported by streaming engine.
# eval instant at 10m histogram_stddev(histogram_stddev_stdvar_6)
# {} NaN
eval instant at 10m histogram_stddev(histogram_stddev_stdvar_6)
{} NaN

# Unsupported by streaming engine.
# eval instant at 10m histogram_stdvar(histogram_stddev_stdvar_6)
Expand All @@ -401,9 +395,8 @@ clear
load 10m
histogram_stddev_stdvar_7 {{schema:3 count:7 sum:Inf z_bucket:1 buckets:[0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 1 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 ] n_buckets:[0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 ]}}x1

# Unsupported by streaming engine.
# eval instant at 10m histogram_stddev(histogram_stddev_stdvar_7)
# {} Inf
eval instant at 10m histogram_stddev(histogram_stddev_stdvar_7)
{} Inf

# Unsupported by streaming engine.
# eval instant at 10m histogram_stdvar(histogram_stddev_stdvar_7)
Expand Down Expand Up @@ -1105,9 +1098,8 @@ eval instant at 5m histogram_quantile(1.0, rate(const_histogram[5m]))
{} NaN

# Zero buckets mean no observations, so there is no standard deviation.
# Unsupported by streaming engine.
# eval instant at 5m histogram_stddev(rate(const_histogram[5m]))
# {} NaN
eval instant at 5m histogram_stddev(rate(const_histogram[5m]))
{} NaN

# Zero buckets mean no observations, so there is no standard variance.
# Unsupported by streaming engine.
Expand Down

0 comments on commit e28a520

Please sign in to comment.