Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MQE: Add support for histogram_stddev function #10139

Merged
merged 1 commit into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/streamingpromql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2670,6 +2670,7 @@ func TestCompareVariousMixedMetricsFunctions(t *testing.T) {
expressions = append(expressions, fmt.Sprintf(`histogram_fraction(scalar(series{label="i"}), scalar(series{label="i"}), series{label=~"(%s)"})`, labelRegex))
expressions = append(expressions, fmt.Sprintf(`histogram_quantile(0.8, series{label=~"(%s)"})`, labelRegex))
expressions = append(expressions, fmt.Sprintf(`histogram_quantile(scalar(series{label="i"}), series{label=~"(%s)"})`, labelRegex))
expressions = append(expressions, fmt.Sprintf(`histogram_stddev(series{label=~"(%s)"})`, labelRegex))
expressions = append(expressions, fmt.Sprintf(`histogram_sum(series{label=~"(%s)"})`, labelRegex))
}

Expand Down
1 change: 1 addition & 0 deletions pkg/streamingpromql/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,7 @@ var instantVectorFunctionOperatorFactories = map[string]InstantVectorFunctionOpe
"histogram_count": InstantVectorTransformationFunctionOperatorFactory("histogram_count", functions.HistogramCount),
"histogram_fraction": HistogramFractionFunctionOperatorFactory(),
"histogram_quantile": HistogramQuantileFunctionOperatorFactory(),
"histogram_stddev": InstantVectorTransformationFunctionOperatorFactory("histogram_stddev", functions.HistogramStdDev),
"histogram_sum": InstantVectorTransformationFunctionOperatorFactory("histogram_sum", functions.HistogramSum),
"increase": FunctionOverRangeVectorOperatorFactory("increase", functions.Increase),
"label_replace": LabelReplaceFunctionOperatorFactory(),
Expand Down
61 changes: 55 additions & 6 deletions pkg/streamingpromql/operators/functions/native_histograms.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,25 @@ package functions

import (
"errors"
"math"

"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/util/annotations"

"github.com/grafana/mimir/pkg/streamingpromql/floats"
"github.com/grafana/mimir/pkg/streamingpromql/limiting"
"github.com/grafana/mimir/pkg/streamingpromql/types"
)

func HistogramAvg(seriesData types.InstantVectorSeriesData, _ []types.ScalarData, _ types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) {
floats, err := types.FPointSlicePool.Get(len(seriesData.Histograms), memoryConsumptionTracker)
fPoints, err := types.FPointSlicePool.Get(len(seriesData.Histograms), memoryConsumptionTracker)
if err != nil {
return types.InstantVectorSeriesData{}, err
}

data := types.InstantVectorSeriesData{
Floats: floats,
Floats: fPoints,
}

for _, histogram := range seriesData.Histograms {
Expand All @@ -36,13 +38,13 @@ func HistogramAvg(seriesData types.InstantVectorSeriesData, _ []types.ScalarData
}

func HistogramCount(seriesData types.InstantVectorSeriesData, _ []types.ScalarData, _ types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) {
floats, err := types.FPointSlicePool.Get(len(seriesData.Histograms), memoryConsumptionTracker)
fPoints, err := types.FPointSlicePool.Get(len(seriesData.Histograms), memoryConsumptionTracker)
if err != nil {
return types.InstantVectorSeriesData{}, err
}

data := types.InstantVectorSeriesData{
Floats: floats,
Floats: fPoints,
}

for _, histogram := range seriesData.Histograms {
Expand All @@ -58,13 +60,13 @@ func HistogramCount(seriesData types.InstantVectorSeriesData, _ []types.ScalarDa
}

func HistogramFraction(seriesData types.InstantVectorSeriesData, scalarArgsData []types.ScalarData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) {
floats, err := types.FPointSlicePool.Get(len(seriesData.Histograms), memoryConsumptionTracker)
fPoints, err := types.FPointSlicePool.Get(len(seriesData.Histograms), memoryConsumptionTracker)
if err != nil {
return types.InstantVectorSeriesData{}, err
}

data := types.InstantVectorSeriesData{
Floats: floats,
Floats: fPoints,
}

lower := scalarArgsData[0]
Expand All @@ -87,6 +89,53 @@ func HistogramFraction(seriesData types.InstantVectorSeriesData, scalarArgsData
return data, nil
}

// HistogramStdDev computes, for every native histogram sample in seriesData,
// an estimate of the standard deviation of the observations it contains and
// emits the result as a float sample carrying the same timestamp.
//
// Only seriesData.Histograms is read, so float samples in the input produce no
// output. Each populated bucket is represented by a single value: 0 when the
// bucket's range includes zero, otherwise the geometric mean of its bounds
// (negated for buckets lying below zero). The squared distances of those values
// from the histogram mean (Sum / Count) are weighted by bucket count and summed
// with Kahan compensation before being divided by Count and square-rooted.
//
// The input series data is handed to types.PutInstantVectorSeriesData before
// returning. An error is returned only if the output slice cannot be obtained
// from the pool.
func HistogramStdDev(seriesData types.InstantVectorSeriesData, _ []types.ScalarData, _ types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) {
	points, err := types.FPointSlicePool.Get(len(seriesData.Histograms), memoryConsumptionTracker)
	if err != nil {
		return types.InstantVectorSeriesData{}, err
	}

	result := types.InstantVectorSeriesData{Floats: points}

	for _, sample := range seriesData.Histograms {
		h := sample.H
		mean := h.Sum / h.Count

		// Running Kahan sum of count-weighted squared deviations, plus its compensation term.
		var sumSq, comp float64

		for buckets := h.AllBucketIterator(); buckets.Next(); {
			b := buckets.At()
			if b.Count == 0 {
				// Empty buckets contribute nothing.
				continue
			}

			// Representative value for the bucket; stays 0 when the bucket spans zero.
			var mid float64
			if !(b.Lower <= 0 && b.Upper >= 0) {
				mid = math.Sqrt(b.Upper * b.Lower)
				if b.Upper < 0 {
					// Bucket lies entirely below zero, so the representative value is negative.
					mid = -mid
				}
			}

			dev := mid - mean
			sumSq, comp = floats.KahanSumInc(b.Count*dev*dev, sumSq, comp)
		}

		// Fold in the compensation term, then normalise by the observation count.
		stdDev := math.Sqrt((sumSq + comp) / h.Count)

		result.Floats = append(result.Floats, promql.FPoint{
			T: sample.T,
			F: stdDev,
		})
	}

	types.PutInstantVectorSeriesData(seriesData, memoryConsumptionTracker)

	return result, nil
}

func HistogramSum(seriesData types.InstantVectorSeriesData, _ []types.ScalarData, _ types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) {
floats, err := types.FPointSlicePool.Get(len(seriesData.Histograms), memoryConsumptionTracker)
if err != nil {
Expand Down
25 changes: 21 additions & 4 deletions pkg/streamingpromql/testdata/ours/native_histograms.test
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,21 @@ eval range from 0 to 5m step 1m single_histogram
{__name__="single_histogram"} {{schema:0 sum:5 count:4 buckets:[1 2 1]}} {{schema:0 sum:5 count:4 buckets:[1 2 1]}} {{schema:0 sum:5 count:4 buckets:[1 2 1]}} {{schema:0 sum:5 count:4 buckets:[1 2 1]}} {{schema:0 sum:5 count:4 buckets:[1 2 1]}} {{schema:0 sum:20 count:7 buckets:[9 10 1]}}

# histogram_avg is sum / count.
eval range from 0 to 5m step 1m histogram_avg(single_histogram)
eval range from 0 to 5m step 1m histogram_avg(single_histogram)
{} 1.25 1.25 1.25 1.25 1.25 2.857142857142857

# histogram_count extracts the count property from the histogram.
eval range from 0 to 5m step 1m histogram_count(single_histogram)
eval range from 0 to 5m step 1m histogram_count(single_histogram)
{} 4 4 4 4 4 7

eval range from 0 to 5m step 1m histogram_fraction(0, 2, single_histogram)
eval range from 0 to 5m step 1m histogram_fraction(0, 2, single_histogram)
{} 0.75 0.75 0.75 0.75 0.75 1

eval range from 0 to 5m step 1m histogram_stddev(single_histogram)
{} 0.842629429717281 0.842629429717281 0.842629429717281 0.842629429717281 0.842629429717281 2.986282214238901

# histogram_sum extracts the sum property from the histogram.
eval range from 0 to 5m step 1m histogram_sum(single_histogram)
eval range from 0 to 5m step 1m histogram_sum(single_histogram)
{} 5 5 5 5 5 20

clear
Expand All @@ -46,6 +49,9 @@ eval instant at 3m histogram_count(mixed_metric)
eval instant at 3m histogram_fraction(0, 1, mixed_metric)
{} 0.25

eval instant at 4m histogram_stddev(mixed_metric)
{} 0.6650352854715079

eval instant at 4m histogram_sum(mixed_metric)
{} 8

Expand All @@ -64,6 +70,9 @@ eval instant at 2m histogram_count(mixed_metric)
# histogram_fraction ignores any float values
eval instant at 2m histogram_fraction(0, 1, mixed_metric)

# histogram_stddev ignores any float values
eval instant at 2m histogram_stddev(mixed_metric)

# histogram_sum ignores any float values
eval instant at 2m histogram_sum(mixed_metric)

Expand Down Expand Up @@ -95,6 +104,11 @@ eval instant at 0 histogram_fraction(-10, 20, route)
{path="two"} 1
{path="three"} 1

eval instant at 0 histogram_stddev(route)
{path="one"} 0.842629429717281
{path="two"} 0.8415900492770793
{path="three"} 1.1865698706402301

eval instant at 0 histogram_sum(route)
{path="one"} 5
{path="two"} 10
Expand Down Expand Up @@ -132,6 +146,9 @@ eval range from 0 to 8m step 1m histogram_count(mixed_metric)
eval range from 0 to 8m step 1m histogram_fraction(0, 1, mixed_metric)
{} _ _ _ _ _ _ _ 0.3 0.3

eval range from 0 to 8m step 1m histogram_stddev(mixed_metric)
{} _ _ _ _ _ _ _ 0.8574122997574659 0.8574122997574659

eval range from 0 to 8m step 1m histogram_sum(mixed_metric)
{} _ _ _ _ _ _ _ 18 18

Expand Down
40 changes: 16 additions & 24 deletions pkg/streamingpromql/testdata/upstream/native_histograms.test
Original file line number Diff line number Diff line change
Expand Up @@ -317,9 +317,8 @@ clear
load 10m
histogram_stddev_stdvar_1 {{schema:2 count:4 sum:10 buckets:[1 0 0 0 1 0 0 1 1]}}x1

# Unsupported by streaming engine.
# eval instant at 10m histogram_stddev(histogram_stddev_stdvar_1)
# {} 1.0787993180043811
eval instant at 10m histogram_stddev(histogram_stddev_stdvar_1)
{} 1.0787993180043811

# Unsupported by streaming engine.
# eval instant at 10m histogram_stdvar(histogram_stddev_stdvar_1)
Expand All @@ -331,9 +330,8 @@ clear
load 10m
histogram_stddev_stdvar_2 {{schema:8 count:10 sum:10 buckets:[1 2 3 4]}}x1

# Unsupported by streaming engine.
# eval instant at 10m histogram_stddev(histogram_stddev_stdvar_2)
# {} 0.0048960313898237465
eval instant at 10m histogram_stddev(histogram_stddev_stdvar_2)
{} 0.0048960313898237465

# Unsupported by streaming engine.
# eval instant at 10m histogram_stdvar(histogram_stddev_stdvar_2)
Expand All @@ -345,9 +343,8 @@ clear
load 10m
histogram_stddev_stdvar_3 {{schema:3 count:7 sum:62 z_bucket:1 buckets:[0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 1 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 ] n_buckets:[0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 ]}}x1

# Unsupported by streaming engine.
# eval instant at 10m histogram_stddev(histogram_stddev_stdvar_3)
# {} 42.947236400258
eval instant at 10m histogram_stddev(histogram_stddev_stdvar_3)
{} 42.947236400258

# Unsupported by streaming engine.
# eval instant at 10m histogram_stdvar(histogram_stddev_stdvar_3)
Expand All @@ -359,9 +356,8 @@ clear
load 10m
histogram_stddev_stdvar_4 {{schema:0 count:10 sum:-112946 z_bucket:0 n_buckets:[0 0 1 1 1 0 1 1 0 0 3 0 0 0 1 0 0 1]}}x1

# Unsupported by streaming engine.
# eval instant at 10m histogram_stddev(histogram_stddev_stdvar_4)
# {} 27556.344499842
eval instant at 10m histogram_stddev(histogram_stddev_stdvar_4)
{} 27556.344499842

# Unsupported by streaming engine.
# eval instant at 10m histogram_stdvar(histogram_stddev_stdvar_4)
Expand All @@ -373,9 +369,8 @@ clear
load 10m
histogram_stddev_stdvar_5 {{schema:0 count:10 sum:-100 z_bucket:0 n_buckets:[0 0 0 0 10]}}x1

# Unsupported by streaming engine.
# eval instant at 10m histogram_stddev(histogram_stddev_stdvar_5)
# {} 1.3137084989848
eval instant at 10m histogram_stddev(histogram_stddev_stdvar_5)
{} 1.3137084989848

# Unsupported by streaming engine.
# eval instant at 10m histogram_stdvar(histogram_stddev_stdvar_5)
Expand All @@ -387,9 +382,8 @@ clear
load 10m
histogram_stddev_stdvar_6 {{schema:3 count:7 sum:NaN z_bucket:1 buckets:[0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 1 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 ] n_buckets:[0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 ]}}x1

# Unsupported by streaming engine.
# eval instant at 10m histogram_stddev(histogram_stddev_stdvar_6)
# {} NaN
eval instant at 10m histogram_stddev(histogram_stddev_stdvar_6)
{} NaN

# Unsupported by streaming engine.
# eval instant at 10m histogram_stdvar(histogram_stddev_stdvar_6)
Expand All @@ -401,9 +395,8 @@ clear
load 10m
histogram_stddev_stdvar_7 {{schema:3 count:7 sum:Inf z_bucket:1 buckets:[0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 1 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 ] n_buckets:[0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 ]}}x1

# Unsupported by streaming engine.
# eval instant at 10m histogram_stddev(histogram_stddev_stdvar_7)
# {} Inf
eval instant at 10m histogram_stddev(histogram_stddev_stdvar_7)
{} Inf

# Unsupported by streaming engine.
# eval instant at 10m histogram_stdvar(histogram_stddev_stdvar_7)
Expand Down Expand Up @@ -1105,9 +1098,8 @@ eval instant at 5m histogram_quantile(1.0, rate(const_histogram[5m]))
{} NaN

# Zero buckets mean no observations, so there is no standard deviation.
# Unsupported by streaming engine.
# eval instant at 5m histogram_stddev(rate(const_histogram[5m]))
# {} NaN
eval instant at 5m histogram_stddev(rate(const_histogram[5m]))
{} NaN

# Zero buckets mean no observations, so there is no standard variance.
# Unsupported by streaming engine.
Expand Down
Loading