Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streaming PromQL engine: native histograms #8121

Merged
merged 31 commits into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
a021ff7
Streaming PromQL engine: native histograms
jhesketh May 13, 2024
3ce17f1
Merge branch 'main' into jhesketh/promql
jhesketh May 14, 2024
1970ce3
Use seriesPerResultBucketFactor for histogram pool
jhesketh May 14, 2024
a75e358
Don't defer returning slices
jhesketh May 14, 2024
0b8a160
Refactor slice returning logic
jhesketh May 15, 2024
5c7a2cd
Move returnSlices helper
jhesketh May 15, 2024
235a402
rename val for consistency
jhesketh May 15, 2024
72493bf
Update comment from feedback
jhesketh May 21, 2024
3ea8e20
Use returnSeriesDataSlices where missed
jhesketh May 21, 2024
885adfa
Comment out unsupported tests for now
jhesketh May 21, 2024
84be678
Enable upstream native_histogram test file
jhesketh May 21, 2024
d67498b
Remove duplicate test
jhesketh May 21, 2024
4cf927e
Merge branch 'main' into jhesketh/promql
jhesketh May 21, 2024
229b30a
Fix FloatHistogram selector
jhesketh May 24, 2024
7438138
Add range test
jhesketh May 24, 2024
999d860
Add test for mixed floats+histograms
jhesketh May 24, 2024
4d50e3c
Re-enable supported upstream tests
jhesketh May 24, 2024
91ba5c1
Load benchmark samples in batches to avoid OOM'ing
jhesketh May 27, 2024
aeb1b53
Revert "Load benchmark samples in batches to avoid OOM'ing"
jhesketh May 27, 2024
7b79add
Batch by samples instead of series
jhesketh May 27, 2024
1a38e40
Generate native histogram benchmark test data
jhesketh May 27, 2024
f8ae514
Add native histogram benchmark test
jhesketh May 27, 2024
063ed83
Move compare.sh next to other benchmark scripts
jhesketh May 27, 2024
0cff32b
Fix comparison tests for when there are no floats
jhesketh May 27, 2024
b9adee0
Add extra samples to the histogram tests
jhesketh May 27, 2024
7fbde86
Add note on batch size
jhesketh May 29, 2024
14ff8bd
Add quick check that histograms are loaded
jhesketh May 29, 2024
a08733c
Use clearer variables for indexes
jhesketh May 29, 2024
e94b596
Update changelog
jhesketh May 29, 2024
aa095fb
Merge branch 'main' into jhesketh/promql
jhesketh May 29, 2024
e8e5b24
Fix typo
jhesketh May 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 24 additions & 12 deletions pkg/streamingpromql/operator/instant_vector_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ package operator

import (
"context"
"errors"
"fmt"

"github.com/prometheus/prometheus/model/histogram"
Expand Down Expand Up @@ -48,13 +47,11 @@ func (v *InstantVectorSelector) NextSeries(_ context.Context) (InstantVectorSeri

v.memoizedIterator.Reset(v.chunkIterator)

data := InstantVectorSeriesData{
Floats: GetFPointSlice(v.numSteps), // TODO: only allocate this if we have any floats (once we support native histograms)
}
data := InstantVectorSeriesData{}

for stepT := v.Selector.Start; stepT <= v.Selector.End; stepT += v.Selector.Interval {
var t int64
jhesketh marked this conversation as resolved.
Show resolved Hide resolved
var val float64
var f float64
var h *histogram.FloatHistogram

ts := stepT
Expand All @@ -70,26 +67,41 @@ func (v *InstantVectorSelector) NextSeries(_ context.Context) (InstantVectorSeri
return InstantVectorSeriesData{}, v.memoizedIterator.Err()
}
case chunkenc.ValFloat:
t, val = v.memoizedIterator.At()
t, f = v.memoizedIterator.At()
case chunkenc.ValHistogram:
t, h = v.memoizedIterator.AtFloatHistogram()
default:
return InstantVectorSeriesData{}, fmt.Errorf("streaming PromQL engine: unknown value type %s", valueType.String())
}

if valueType == chunkenc.ValNone || t > ts {
var ok bool
t, val, h, ok = v.memoizedIterator.PeekPrev()
if h != nil {
return InstantVectorSeriesData{}, errors.New("streaming PromQL engine doesn't support histograms yet")
}
t, f, h, ok = v.memoizedIterator.PeekPrev()
if !ok || t < ts-v.Selector.LookbackDelta.Milliseconds() {
continue
}
}
if value.IsStaleNaN(val) || (h != nil && value.IsStaleNaN(h.Sum)) {
if value.IsStaleNaN(f) || (h != nil && value.IsStaleNaN(h.Sum)) {
continue
}

data.Floats = append(data.Floats, promql.FPoint{T: stepT, F: val})
// if (f, h) have been set by PeekPrev, we do not know if f is 0 because that's the actual value, or because
// the previous value had a histogram.
// PeekPrev will set the histogram to nil, or the value to 0 if the other type exists.
// So check if histograms is nil first. If we don't have a histogram, then we should have a value and vice-versa.
if h != nil {
if len(data.Histograms) == 0 {
// Only create the slice once we know the series is a histogram or not
data.Histograms = GetHPointSlice(v.numSteps)
}
data.Histograms = append(data.Histograms, promql.HPoint{T: stepT, H: h})
} else {
if len(data.Floats) == 0 {
// Only create the slice once we know the series is a histogram or not
data.Floats = GetFPointSlice(v.numSteps)
}
data.Floats = append(data.Floats, promql.FPoint{T: stepT, F: f})
}
}

if v.memoizedIterator.Err() != nil {
Expand Down
12 changes: 12 additions & 0 deletions pkg/streamingpromql/operator/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ var (
return make([]promql.FPoint, 0, size)
})

hPointSlicePool = pool.NewBucketedPool(1, maxExpectedPointsPerSeries, seriesPerResultBucketFactor, func(size int) []promql.HPoint {
return make([]promql.HPoint, 0, size)
})

matrixPool = pool.NewBucketedPool(1, maxExpectedSeriesPerResult, seriesPerResultBucketFactor, func(size int) promql.Matrix {
return make(promql.Matrix, 0, size)
})
Expand Down Expand Up @@ -51,6 +55,14 @@ func PutFPointSlice(s []promql.FPoint) {
fPointSlicePool.Put(s)
}

func GetHPointSlice(size int) []promql.HPoint {
return hPointSlicePool.Get(size)
}

func PutHPointSlice(s []promql.HPoint) {
hPointSlicePool.Put(s)
}

func GetMatrix(size int) promql.Matrix {
return matrixPool.Get(size)
}
Expand Down
47 changes: 27 additions & 20 deletions pkg/streamingpromql/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,11 @@ func (q *Query) Exec(ctx context.Context) *promql.Result {
return q.result
}

func returnSeriesDataSlices(d operator.InstantVectorSeriesData) {
operator.PutFPointSlice(d.Floats)
operator.PutHPointSlice(d.Histograms)
}

func (q *Query) populateVectorFromInstantVectorOperator(ctx context.Context, o operator.InstantVectorOperator, series []operator.SeriesMetadata) (promql.Vector, error) {
ts := timeMilliseconds(q.statement.Start)
v := operator.GetVector(len(series))
Expand All @@ -292,26 +297,29 @@ func (q *Query) populateVectorFromInstantVectorOperator(ctx context.Context, o o
return nil, err
}

if len(d.Floats)+len(d.Histograms) != 1 {
operator.PutFPointSlice(d.Floats)
// TODO: put histogram point slice back in pool

if len(d.Floats)+len(d.Histograms) == 0 {
if len(d.Floats) == 1 && len(d.Histograms) == 0 {
point := d.Floats[0]
v = append(v, promql.Sample{
Metric: s.Labels,
T: ts,
F: point.F,
})
} else if len(d.Floats) == 0 && len(d.Histograms) == 1 {
point := d.Histograms[0]
v = append(v, promql.Sample{
Metric: s.Labels,
T: ts,
H: point.H,
})
} else {
returnSeriesDataSlices(d)
// A series may have no data points.
if len(d.Floats) == 0 && len(d.Histograms) == 0 {
continue
}

return nil, fmt.Errorf("expected exactly one sample for series %s, but got %v", s.Labels.String(), len(d.Floats))
return nil, fmt.Errorf("expected exactly one sample for series %s, but got %v floats, %v histograms", s.Labels.String(), len(d.Floats), len(d.Histograms))
}

point := d.Floats[0]
v = append(v, promql.Sample{
Metric: s.Labels,
T: ts,
F: point.F,
})

operator.PutFPointSlice(d.Floats)
// TODO: put histogram point slice back in pool
returnSeriesDataSlices(d)
}

return v, nil
Expand All @@ -332,8 +340,7 @@ func (q *Query) populateMatrixFromInstantVectorOperator(ctx context.Context, o o

if len(d.Floats) == 0 && len(d.Histograms) == 0 {
operator.PutFPointSlice(d.Floats)
// TODO: put histogram point slice back in pool

operator.PutHPointSlice(d.Histograms)
jhesketh marked this conversation as resolved.
Show resolved Hide resolved
continue
}

Expand Down Expand Up @@ -394,7 +401,7 @@ func (q *Query) Close() {
case promql.Matrix:
for _, s := range v {
operator.PutFPointSlice(s.Floats)
// TODO: put histogram point slice back in pool
operator.PutHPointSlice(s.Histograms)
}

operator.PutMatrix(v)
Expand Down
18 changes: 18 additions & 0 deletions pkg/streamingpromql/testdata/ours/native_histograms.test
jhesketh marked this conversation as resolved.
Show resolved Hide resolved
jhesketh marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# SPDX-License-Identifier: AGPL-3.0-only
# Provenance-includes-location: https://github.com/prometheus/prometheus/tree/main/promql/testdata/native_histograms.test
# Provenance-includes-license: Apache-2.0
# Provenance-includes-copyright: The Prometheus Authors

# Minimal valid case: an empty histogram.
load 5m
empty_histogram {{}}

eval instant at 5m empty_histogram
{__name__="empty_histogram"} {{}}
jhesketh marked this conversation as resolved.
Show resolved Hide resolved

# buckets:[1 2 1] means 1 observation in the 1st bucket, 2 observations in the 2nd and 1 observation in the 3rd (total 4).
jhesketh marked this conversation as resolved.
Show resolved Hide resolved
load 5m
single_histogram {{schema:0 sum:5 count:4 buckets:[1 2 1]}}

eval instant at 5m single_histogram
jhesketh marked this conversation as resolved.
Show resolved Hide resolved
{__name__="single_histogram"} {{count:4 sum:5 buckets:[1 2 1]}}
Loading