Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streaming PromQL engine: native histograms #8121

Merged
merged 31 commits into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
a021ff7
Streaming PromQL engine: native histograms
jhesketh May 13, 2024
3ce17f1
Merge branch 'main' into jhesketh/promql
jhesketh May 14, 2024
1970ce3
Use seriesPerResultBucketFactor for histogram pool
jhesketh May 14, 2024
a75e358
Don't defer returning slices
jhesketh May 14, 2024
0b8a160
Refactor slice returning logic
jhesketh May 15, 2024
5c7a2cd
Move returnSlices helper
jhesketh May 15, 2024
235a402
rename val for consistency
jhesketh May 15, 2024
72493bf
Update comment from feedback
jhesketh May 21, 2024
3ea8e20
Use returnSeriesDataSlices where missed
jhesketh May 21, 2024
885adfa
Comment out unsupported tests for now
jhesketh May 21, 2024
84be678
Enable upstream native_histogram test file
jhesketh May 21, 2024
d67498b
Remove duplicate test
jhesketh May 21, 2024
4cf927e
Merge branch 'main' into jhesketh/promql
jhesketh May 21, 2024
229b30a
Fix FloatHistogram selector
jhesketh May 24, 2024
7438138
Add range test
jhesketh May 24, 2024
999d860
Add test for mixed floats+histograms
jhesketh May 24, 2024
4d50e3c
Re-enable supported upstream tests
jhesketh May 24, 2024
91ba5c1
Load benchmark samples in batches to avoid OOM'ing
jhesketh May 27, 2024
aeb1b53
Revert "Load benchmark samples in batches to avoid OOM'ing"
jhesketh May 27, 2024
7b79add
Batch by samples instead of series
jhesketh May 27, 2024
1a38e40
Generate native histogram benchmark test data
jhesketh May 27, 2024
f8ae514
Add native histogram benchmark test
jhesketh May 27, 2024
063ed83
Move compare.sh next to other benchmark scripts
jhesketh May 27, 2024
0cff32b
Fix comparison tests for when there are no floats
jhesketh May 27, 2024
b9adee0
Add extra samples to the histogram tests
jhesketh May 27, 2024
7fbde86
Add note on batch size
jhesketh May 29, 2024
14ff8bd
Add quick check that histograms are loaded
jhesketh May 29, 2024
a08733c
Use clearer variables for indexes
jhesketh May 29, 2024
e94b596
Update changelog
jhesketh May 29, 2024
aa095fb
Merge branch 'main' into jhesketh/promql
jhesketh May 29, 2024
e8e5b24
Fix typo
jhesketh May 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion pkg/streamingpromql/benchmarks/benchmarks.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,18 @@ func (c BenchCase) Run(ctx context.Context, t testing.TB, start, end time.Time,
return res, qry.Close
}

// These test cases are taken from https://github.com/prometheus/prometheus/blob/main/promql/bench_test.go.
// These test cases are taken from https://github.com/prometheus/prometheus/blob/main/promql/bench_test.go
// and enhanced/added to.
func TestCases(metricSizes []int) []BenchCase {
cases := []BenchCase{
// Plain retrieval.
{
Expr: "a_X",
},
// Histogram retrieval
{
Expr: "nh_X",
},
// Range vector selector.
{
Expr: "a_X[1m]",
Expand Down
30 changes: 28 additions & 2 deletions pkg/streamingpromql/benchmarks/comparison_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,24 @@ func TestBenchmarkSetup(t *testing.T) {
}

require.Equal(t, expectedPoints, series.Floats)

// Check native histograms are set up correctly
query, err = streamingEngine.NewRangeQuery(ctx, q, nil, "nh_1", time.Unix(0, 0), time.Unix(int64(15*intervalSeconds), 0), interval)
require.NoError(t, err)

t.Cleanup(query.Close)
result = query.Exec(ctx)
require.NoError(t, result.Err)

matrix, err = result.Matrix()
require.NoError(t, err)

require.Len(t, matrix, 1)
series = matrix[0]
require.Equal(t, labels.FromStrings("__name__", "nh_1"), series.Metric)
require.Len(t, series.Floats, 0)
require.Len(t, series.Histograms, 16)
// TODO(jhesketh): Maybe check a histogram is set up as expected
jhesketh marked this conversation as resolved.
Show resolved Hide resolved
}

// Why do we do this rather than require.Equal(t, expected, actual)?
Expand Down Expand Up @@ -194,7 +212,11 @@ func requireEqualResults(t testing.TB, expected, actual *promql.Result) {
require.Equal(t, expectedSample.Metric, actualSample.Metric)
require.Equal(t, expectedSample.T, actualSample.T)
require.Equal(t, expectedSample.H, actualSample.H)
require.InEpsilon(t, expectedSample.F, actualSample.F, 1e-10)
if expectedSample.F == 0 {
require.Equal(t, expectedSample.F, actualSample.F)
} else {
require.InEpsilon(t, expectedSample.F, actualSample.F, 1e-10)
}
}
case parser.ValueTypeMatrix:
expectedMatrix, err := expected.Matrix()
Expand All @@ -214,7 +236,11 @@ func requireEqualResults(t testing.TB, expected, actual *promql.Result) {
actualPoint := actualSeries.Floats[j]

require.Equal(t, expectedPoint.T, actualPoint.T)
require.InEpsilonf(t, expectedPoint.F, actualPoint.F, 1e-10, "expected series %v to have points %v, but result is %v", expectedSeries.Metric.String(), expectedSeries.Floats, actualSeries.Floats)
if expectedPoint.F == 0 {
require.Equal(t, expectedPoint.F, actualPoint.F)
} else {
require.InEpsilonf(t, expectedPoint.F, actualPoint.F, 1e-10, "expected series %v to have points %v, but result is %v", expectedSeries.Metric.String(), expectedSeries.Floats, actualSeries.Floats)
}
}
}
default:
Expand Down
71 changes: 53 additions & 18 deletions pkg/streamingpromql/benchmarks/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"net"
"path/filepath"
"strconv"
"strings"
"time"

"github.com/go-kit/log"
Expand Down Expand Up @@ -174,7 +175,7 @@ func pushTestData(ing *ingester.Ingester, metricSizes []int) error {
totalMetrics := 0

for _, size := range metricSizes {
totalMetrics += (2 + histogramBuckets + 1) * size // 2 non-histogram metrics + 5 metrics for histogram buckets + 1 metric for +Inf histogram bucket
totalMetrics += (2 + histogramBuckets + 1 + 1) * size // 2 non-histogram metrics + 5 metrics for histogram buckets + 1 metric for +Inf histogram bucket + 1 metric for native-histograms
}

metrics := make([]labels.Labels, 0, totalMetrics)
Expand All @@ -183,6 +184,7 @@ func pushTestData(ing *ingester.Ingester, metricSizes []int) error {
aName := "a_" + strconv.Itoa(size)
bName := "b_" + strconv.Itoa(size)
histogramName := "h_" + strconv.Itoa(size)
nativeHistogramName := "nh_" + strconv.Itoa(size)

if size == 1 {
// We don't want a "l" label on metrics with one series (some test cases rely on this label not being present).
Expand All @@ -192,6 +194,7 @@ func pushTestData(ing *ingester.Ingester, metricSizes []int) error {
metrics = append(metrics, labels.FromStrings("__name__", histogramName, "le", strconv.Itoa(le)))
}
metrics = append(metrics, labels.FromStrings("__name__", histogramName, "le", "+Inf"))
metrics = append(metrics, labels.FromStrings("__name__", nativeHistogramName))
} else {
for i := 0; i < size; i++ {
metrics = append(metrics, labels.FromStrings("__name__", aName, "l", strconv.Itoa(i)))
Expand All @@ -200,34 +203,66 @@ func pushTestData(ing *ingester.Ingester, metricSizes []int) error {
metrics = append(metrics, labels.FromStrings("__name__", histogramName, "l", strconv.Itoa(i), "le", strconv.Itoa(le)))
}
metrics = append(metrics, labels.FromStrings("__name__", histogramName, "l", strconv.Itoa(i), "le", "+Inf"))
metrics = append(metrics, labels.FromStrings("__name__", nativeHistogramName, "l", strconv.Itoa(i)))
}
}
}

ctx := user.InjectOrgID(context.Background(), UserID)
req := &mimirpb.WriteRequest{
Timeseries: make([]mimirpb.PreallocTimeseries, len(metrics)),
}

for i, m := range metrics {
series := mimirpb.PreallocTimeseries{TimeSeries: &mimirpb.TimeSeries{
Labels: mimirpb.FromLabelsToLabelAdapters(m),
Samples: make([]mimirpb.Sample, NumIntervals),
}}
// Batch samples into separate requests
batchSize := 100
jhesketh marked this conversation as resolved.
Show resolved Hide resolved
for i := 0; i < NumIntervals; i += batchSize {
jhesketh marked this conversation as resolved.
Show resolved Hide resolved
end := i + batchSize
if end > NumIntervals {
end = NumIntervals
}

for s := 0; s < NumIntervals; s++ {
series.Samples[s].TimestampMs = int64(s) * interval.Milliseconds()
series.Samples[s].Value = float64(s) + float64(i)/float64(len(metrics))
req := &mimirpb.WriteRequest{
Timeseries: make([]mimirpb.PreallocTimeseries, len(metrics)),
}

req.Timeseries[i] = series
}
for j, m := range metrics {
if strings.HasPrefix(m.Get("__name__"), "nh_") {
series := mimirpb.PreallocTimeseries{TimeSeries: &mimirpb.TimeSeries{
Labels: mimirpb.FromLabelsToLabelAdapters(m.Copy()),
Histograms: make([]mimirpb.Histogram, end-i),
}}

for s := i; s < end; s++ {
// TODO(jhesketh): Fix this with some better data
charleskorn marked this conversation as resolved.
Show resolved Hide resolved
series.Histograms[s-i].Timestamp = int64(s) * interval.Milliseconds()
series.Histograms[s-i].Count = &mimirpb.Histogram_CountInt{CountInt: 12}
series.Histograms[s-i].ZeroCount = &mimirpb.Histogram_ZeroCountInt{ZeroCountInt: 2}
series.Histograms[s-i].ZeroThreshold = 0.001
series.Histograms[s-i].Sum = 18.4
series.Histograms[s-i].Schema = 0
series.Histograms[s-i].NegativeSpans = []mimirpb.BucketSpan{{Offset: 0, Length: 2}, {Offset: 1, Length: 2}}
series.Histograms[s-i].NegativeDeltas = []int64{1, 1, -1, 0}
series.Histograms[s-i].PositiveSpans = []mimirpb.BucketSpan{{Offset: 0, Length: 2}, {Offset: 1, Length: 2}}
series.Histograms[s-i].PositiveDeltas = []int64{1, 1, -1, 0}
}

if _, err := ing.Push(ctx, req); err != nil {
return fmt.Errorf("failed to push samples to ingester: %w", err)
}
req.Timeseries[j] = series
} else {
series := mimirpb.PreallocTimeseries{TimeSeries: &mimirpb.TimeSeries{
Labels: mimirpb.FromLabelsToLabelAdapters(m.Copy()),
Samples: make([]mimirpb.Sample, end-i),
}}

ing.Flush()
for s := i; s < end; s++ {
series.Samples[s-i].TimestampMs = int64(s) * interval.Milliseconds()
series.Samples[s-i].Value = float64(s) + float64(j)/float64(len(metrics))
}

req.Timeseries[j] = series
}
}
if _, err := ing.Push(ctx, req); err != nil {
return fmt.Errorf("failed to push samples to ingester: %w", err)
}
ing.Flush()
}

return nil
}
37 changes: 25 additions & 12 deletions pkg/streamingpromql/operator/instant_vector_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ package operator

import (
"context"
"errors"
"fmt"

"github.com/prometheus/prometheus/model/histogram"
Expand Down Expand Up @@ -48,13 +47,11 @@ func (v *InstantVectorSelector) NextSeries(_ context.Context) (InstantVectorSeri

v.memoizedIterator.Reset(v.chunkIterator)

data := InstantVectorSeriesData{
Floats: GetFPointSlice(v.numSteps), // TODO: only allocate this if we have any floats (once we support native histograms)
}
data := InstantVectorSeriesData{}

for stepT := v.Selector.Start; stepT <= v.Selector.End; stepT += v.Selector.Interval {
var t int64
jhesketh marked this conversation as resolved.
Show resolved Hide resolved
var val float64
var f float64
var h *histogram.FloatHistogram

ts := stepT
Expand All @@ -70,26 +67,42 @@ func (v *InstantVectorSelector) NextSeries(_ context.Context) (InstantVectorSeri
return InstantVectorSeriesData{}, v.memoizedIterator.Err()
}
case chunkenc.ValFloat:
t, val = v.memoizedIterator.At()
t, f = v.memoizedIterator.At()
case chunkenc.ValHistogram, chunkenc.ValFloatHistogram:
t, h = v.memoizedIterator.AtFloatHistogram()
default:
return InstantVectorSeriesData{}, fmt.Errorf("streaming PromQL engine: unknown value type %s", valueType.String())
}

if valueType == chunkenc.ValNone || t > ts {
var ok bool
t, val, h, ok = v.memoizedIterator.PeekPrev()
if h != nil {
return InstantVectorSeriesData{}, errors.New("streaming PromQL engine doesn't support histograms yet")
}
t, f, h, ok = v.memoizedIterator.PeekPrev()
if !ok || t < ts-v.Selector.LookbackDelta.Milliseconds() {
continue
}
}
if value.IsStaleNaN(val) || (h != nil && value.IsStaleNaN(h.Sum)) {
if value.IsStaleNaN(f) || (h != nil && value.IsStaleNaN(h.Sum)) {
continue
}

data.Floats = append(data.Floats, promql.FPoint{T: stepT, F: val})
// if (f, h) have been set by PeekPrev, we do not know if f is 0 because that's the actual value, or because
// the previous value had a histogram.
// PeekPrev will set the histogram to nil, or the value to 0 if the other type exists.
// So check if histograms is nil first. If we don't have a histogram, then we should have a value and vice-versa.
if h != nil {
if len(data.Histograms) == 0 {
// Only create the slice once we know the series is a histogram or not.
// (It is possible to over-allocate in the case where we have both floats and histograms, but that won't be common).
data.Histograms = GetHPointSlice(v.numSteps)
}
data.Histograms = append(data.Histograms, promql.HPoint{T: stepT, H: h})
} else {
if len(data.Floats) == 0 {
// Only create the slice once we know the series is a histogram or not
data.Floats = GetFPointSlice(v.numSteps)
}
data.Floats = append(data.Floats, promql.FPoint{T: stepT, F: f})
}
}

if v.memoizedIterator.Err() != nil {
Expand Down
12 changes: 12 additions & 0 deletions pkg/streamingpromql/operator/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ var (
return make([]promql.FPoint, 0, size)
})

hPointSlicePool = pool.NewBucketedPool(1, maxExpectedPointsPerSeries, seriesPerResultBucketFactor, func(size int) []promql.HPoint {
return make([]promql.HPoint, 0, size)
})

matrixPool = pool.NewBucketedPool(1, maxExpectedSeriesPerResult, seriesPerResultBucketFactor, func(size int) promql.Matrix {
return make(promql.Matrix, 0, size)
})
Expand Down Expand Up @@ -51,6 +55,14 @@ func PutFPointSlice(s []promql.FPoint) {
fPointSlicePool.Put(s)
}

func GetHPointSlice(size int) []promql.HPoint {
return hPointSlicePool.Get(size)
}

func PutHPointSlice(s []promql.HPoint) {
hPointSlicePool.Put(s)
}

func GetMatrix(size int) promql.Matrix {
return matrixPool.Get(size)
}
Expand Down
48 changes: 27 additions & 21 deletions pkg/streamingpromql/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,11 @@ func (q *Query) Exec(ctx context.Context) *promql.Result {
return q.result
}

func returnSeriesDataSlices(d operator.InstantVectorSeriesData) {
operator.PutFPointSlice(d.Floats)
operator.PutHPointSlice(d.Histograms)
}

func (q *Query) populateVectorFromInstantVectorOperator(ctx context.Context, o operator.InstantVectorOperator, series []operator.SeriesMetadata) (promql.Vector, error) {
ts := timeMilliseconds(q.statement.Start)
v := operator.GetVector(len(series))
Expand All @@ -292,26 +297,29 @@ func (q *Query) populateVectorFromInstantVectorOperator(ctx context.Context, o o
return nil, err
}

if len(d.Floats)+len(d.Histograms) != 1 {
operator.PutFPointSlice(d.Floats)
// TODO: put histogram point slice back in pool

if len(d.Floats)+len(d.Histograms) == 0 {
if len(d.Floats) == 1 && len(d.Histograms) == 0 {
point := d.Floats[0]
v = append(v, promql.Sample{
Metric: s.Labels,
T: ts,
F: point.F,
})
} else if len(d.Floats) == 0 && len(d.Histograms) == 1 {
point := d.Histograms[0]
v = append(v, promql.Sample{
Metric: s.Labels,
T: ts,
H: point.H,
})
} else {
returnSeriesDataSlices(d)
// A series may have no data points.
if len(d.Floats) == 0 && len(d.Histograms) == 0 {
continue
}

return nil, fmt.Errorf("expected exactly one sample for series %s, but got %v", s.Labels.String(), len(d.Floats))
return nil, fmt.Errorf("expected exactly one sample for series %s, but got %v floats, %v histograms", s.Labels.String(), len(d.Floats), len(d.Histograms))
}

point := d.Floats[0]
v = append(v, promql.Sample{
Metric: s.Labels,
T: ts,
F: point.F,
})

operator.PutFPointSlice(d.Floats)
// TODO: put histogram point slice back in pool
returnSeriesDataSlices(d)
}

return v, nil
Expand All @@ -331,9 +339,7 @@ func (q *Query) populateMatrixFromInstantVectorOperator(ctx context.Context, o o
}

if len(d.Floats) == 0 && len(d.Histograms) == 0 {
operator.PutFPointSlice(d.Floats)
// TODO: put histogram point slice back in pool

returnSeriesDataSlices(d)
continue
}

Expand Down Expand Up @@ -394,7 +400,7 @@ func (q *Query) Close() {
case promql.Matrix:
for _, s := range v {
operator.PutFPointSlice(s.Floats)
// TODO: put histogram point slice back in pool
operator.PutHPointSlice(s.Histograms)
}

operator.PutMatrix(v)
Expand Down
23 changes: 23 additions & 0 deletions pkg/streamingpromql/testdata/ours/native_histograms.test
jhesketh marked this conversation as resolved.
Show resolved Hide resolved
jhesketh marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# SPDX-License-Identifier: AGPL-3.0-only
# Provenance-includes-location: https://github.com/prometheus/prometheus/tree/main/promql/testdata/native_histograms.test
# Provenance-includes-license: Apache-2.0
# Provenance-includes-copyright: The Prometheus Authors

# buckets:[1 2 1] means 1 observation in the 1st bucket, 2 observations in the 2nd and 1 observation in the 3rd (total 4).
jhesketh marked this conversation as resolved.
Show resolved Hide resolved
load 5m
single_histogram {{schema:0 sum:5 count:4 buckets:[1 2 1]}} {{schema:0 sum:20 count:7 buckets:[9 10 1]}}

eval instant at 5m single_histogram
jhesketh marked this conversation as resolved.
Show resolved Hide resolved
{__name__="single_histogram"} {{schema:0 sum:20 count:7 buckets:[9 10 1]}}

eval range from 0 to 5m step 1m single_histogram
{__name__="single_histogram"} {{schema:0 sum:5 count:4 buckets:[1 2 1]}} {{schema:0 sum:5 count:4 buckets:[1 2 1]}} {{schema:0 sum:5 count:4 buckets:[1 2 1]}} {{schema:0 sum:5 count:4 buckets:[1 2 1]}} {{schema:0 sum:5 count:4 buckets:[1 2 1]}} {{schema:0 sum:20 count:7 buckets:[9 10 1]}}

clear

# Test metric with mixed floats and histograms
load 1m
mixed_metric 1 2 3 {{schema:0 sum:5 count:4 buckets:[1 2 1]}} {{schema:0 sum:8 count:6 buckets:[1 4 1]}}

eval range from 0 to 4m step 1m mixed_metric
{__name__="mixed_metric"} 1 2 3 {{count:4 sum:5 buckets:[1 2 1]}} {{count:6 sum:8 buckets:[1 4 1]}}
Loading
Loading