Skip to content

Commit

Permalink
Streaming PromQL engine: native histograms (#8121)
Browse files Browse the repository at this point in the history
* Streaming PromQL engine: native histograms

Add native histogram support to instant vectors.

* Use seriesPerResultBucketFactor for histogram pool

* Don't defer returning slices

* Refactor slice returning logic

* Move returnSlices helper

* rename val for consistency

* Update comment from feedback

* Use returnSeriesDataSlices where missed

* Comment out unsupported tests for now

* Enable upstream native_histogram test file

* Remove duplicate test

* Fix FloatHistogram selector

* Add range test

* Add test for mixed floats+histograms

* Re-enable supported upstream tests

* Load benchmark samples in batches to avoid OOM'ing

* Revert "Load benchmark samples in batches to avoid OOM'ing"

We can't use OOO samples with native histograms, so instead of
batching by series we'll batch by samples

This reverts commit 91ba5c1.

* Batch by samples instead of series

* Generate native histogram benchmark test data

* Add native histogram benchmark test

* Move compare.sh next to other benchmark scripts

* Fix comparison tests for when there are no floats

* Add extra samples to the histogram tests

* Add note on batch size

* Add quick check that histograms are loaded

* Use clearer variables for indexes

* Update changelog

* Fix typo
  • Loading branch information
jhesketh authored May 29, 2024
1 parent 1fb602b commit 6db3385
Show file tree
Hide file tree
Showing 12 changed files with 519 additions and 333 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* [FEATURE] Continuous-test: now runable as a module with `mimir -target=continuous-test`. #7747
* [FEATURE] Store-gateway: Allow specific tenants to be enabled or disabled via `-store-gateway.enabled-tenants` or `-store-gateway.disabled-tenants` CLI flags or their corresponding YAML settings. #7653
* [FEATURE] New `-<prefix>.s3.bucket-lookup-type` flag configures lookup style type, used to access bucket in s3 compatible providers. #7684
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.promql-engine=streaming`. #7693 #7898 #7899 #8023 #8058 #8096
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.promql-engine=streaming`. #7693 #7898 #7899 #8023 #8058 #8096 #8121
* [FEATURE] New `/ingester/unregister-on-shutdown` HTTP endpoint allows dynamic access to ingesters' `-ingester.ring.unregister-on-shutdown` configuration. #7739
* [FEATURE] Server: added experimental [PROXY protocol support](https://www.haproxy.org/download/2.3/doc/proxy-protocol.txt). The PROXY protocol support can be enabled via `-server.proxy-protocol-enabled=true`. When enabled, the support is added both to HTTP and gRPC listening ports. #7698
* [FEATURE] mimirtool: Add `runtime-config verify` sub-command, for verifying Mimir runtime config files. #8123
Expand Down
7 changes: 6 additions & 1 deletion pkg/streamingpromql/benchmarks/benchmarks.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,18 @@ func (c BenchCase) Run(ctx context.Context, t testing.TB, start, end time.Time,
return res, qry.Close
}

// These test cases are taken from https://github.com/prometheus/prometheus/blob/main/promql/bench_test.go.
// These test cases are taken from https://github.com/prometheus/prometheus/blob/main/promql/bench_test.go
// and enhanced/added to.
func TestCases(metricSizes []int) []BenchCase {
cases := []BenchCase{
// Plain retrieval.
{
Expr: "a_X",
},
// Histogram retrieval
{
Expr: "nh_X",
},
// Range vector selector.
{
Expr: "a_X[1m]",
Expand Down
34 changes: 32 additions & 2 deletions pkg/streamingpromql/benchmarks/comparison_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,28 @@ func TestBenchmarkSetup(t *testing.T) {
}

require.Equal(t, expectedPoints, series.Floats)

// Check native histograms are set up correctly
query, err = streamingEngine.NewRangeQuery(ctx, q, nil, "nh_1", time.Unix(0, 0), time.Unix(int64(15*intervalSeconds), 0), interval)
require.NoError(t, err)

t.Cleanup(query.Close)
result = query.Exec(ctx)
require.NoError(t, result.Err)

matrix, err = result.Matrix()
require.NoError(t, err)

require.Len(t, matrix, 1)
series = matrix[0]
require.Equal(t, labels.FromStrings("__name__", "nh_1"), series.Metric)
require.Len(t, series.Floats, 0)
require.Len(t, series.Histograms, 16)

// Check one histogram point is as expected
require.Equal(t, int64(0), series.Histograms[0].T)
require.Equal(t, 12.0, series.Histograms[0].H.Count)
require.Equal(t, 18.4, series.Histograms[0].H.Sum)
}

// Why do we do this rather than require.Equal(t, expected, actual)?
Expand Down Expand Up @@ -194,7 +216,11 @@ func requireEqualResults(t testing.TB, expected, actual *promql.Result) {
require.Equal(t, expectedSample.Metric, actualSample.Metric)
require.Equal(t, expectedSample.T, actualSample.T)
require.Equal(t, expectedSample.H, actualSample.H)
require.InEpsilon(t, expectedSample.F, actualSample.F, 1e-10)
if expectedSample.F == 0 {
require.Equal(t, expectedSample.F, actualSample.F)
} else {
require.InEpsilon(t, expectedSample.F, actualSample.F, 1e-10)
}
}
case parser.ValueTypeMatrix:
expectedMatrix, err := expected.Matrix()
Expand All @@ -214,7 +240,11 @@ func requireEqualResults(t testing.TB, expected, actual *promql.Result) {
actualPoint := actualSeries.Floats[j]

require.Equal(t, expectedPoint.T, actualPoint.T)
require.InEpsilonf(t, expectedPoint.F, actualPoint.F, 1e-10, "expected series %v to have points %v, but result is %v", expectedSeries.Metric.String(), expectedSeries.Floats, actualSeries.Floats)
if expectedPoint.F == 0 {
require.Equal(t, expectedPoint.F, actualPoint.F)
} else {
require.InEpsilonf(t, expectedPoint.F, actualPoint.F, 1e-10, "expected series %v to have points %v, but result is %v", expectedSeries.Metric.String(), expectedSeries.Floats, actualSeries.Floats)
}
}
}
default:
Expand Down
74 changes: 56 additions & 18 deletions pkg/streamingpromql/benchmarks/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"net"
"path/filepath"
"strconv"
"strings"
"time"

"github.com/go-kit/log"
Expand Down Expand Up @@ -174,7 +175,7 @@ func pushTestData(ing *ingester.Ingester, metricSizes []int) error {
totalMetrics := 0

for _, size := range metricSizes {
totalMetrics += (2 + histogramBuckets + 1) * size // 2 non-histogram metrics + 5 metrics for histogram buckets + 1 metric for +Inf histogram bucket
totalMetrics += (2 + histogramBuckets + 1 + 1) * size // 2 non-histogram metrics + 5 metrics for histogram buckets + 1 metric for +Inf histogram bucket + 1 metric for native-histograms
}

metrics := make([]labels.Labels, 0, totalMetrics)
Expand All @@ -183,6 +184,7 @@ func pushTestData(ing *ingester.Ingester, metricSizes []int) error {
aName := "a_" + strconv.Itoa(size)
bName := "b_" + strconv.Itoa(size)
histogramName := "h_" + strconv.Itoa(size)
nativeHistogramName := "nh_" + strconv.Itoa(size)

if size == 1 {
// We don't want a "l" label on metrics with one series (some test cases rely on this label not being present).
Expand All @@ -192,6 +194,7 @@ func pushTestData(ing *ingester.Ingester, metricSizes []int) error {
metrics = append(metrics, labels.FromStrings("__name__", histogramName, "le", strconv.Itoa(le)))
}
metrics = append(metrics, labels.FromStrings("__name__", histogramName, "le", "+Inf"))
metrics = append(metrics, labels.FromStrings("__name__", nativeHistogramName))
} else {
for i := 0; i < size; i++ {
metrics = append(metrics, labels.FromStrings("__name__", aName, "l", strconv.Itoa(i)))
Expand All @@ -200,34 +203,69 @@ func pushTestData(ing *ingester.Ingester, metricSizes []int) error {
metrics = append(metrics, labels.FromStrings("__name__", histogramName, "l", strconv.Itoa(i), "le", strconv.Itoa(le)))
}
metrics = append(metrics, labels.FromStrings("__name__", histogramName, "l", strconv.Itoa(i), "le", "+Inf"))
metrics = append(metrics, labels.FromStrings("__name__", nativeHistogramName, "l", strconv.Itoa(i)))
}
}
}

ctx := user.InjectOrgID(context.Background(), UserID)
req := &mimirpb.WriteRequest{
Timeseries: make([]mimirpb.PreallocTimeseries, len(metrics)),
}

for i, m := range metrics {
series := mimirpb.PreallocTimeseries{TimeSeries: &mimirpb.TimeSeries{
Labels: mimirpb.FromLabelsToLabelAdapters(m),
Samples: make([]mimirpb.Sample, NumIntervals),
}}
// Batch samples into separate requests
// There is no precise science behind this number currently.
// A quick run locally found batching by 100 did not increase the loading time by any noticeable amount.
// Additionally memory usage maxed about 4GB for the whole process.
batchSize := 100
for start := 0; start < NumIntervals; start += batchSize {
end := start + batchSize
if end > NumIntervals {
end = NumIntervals
}

for s := 0; s < NumIntervals; s++ {
series.Samples[s].TimestampMs = int64(s) * interval.Milliseconds()
series.Samples[s].Value = float64(s) + float64(i)/float64(len(metrics))
req := &mimirpb.WriteRequest{
Timeseries: make([]mimirpb.PreallocTimeseries, len(metrics)),
}

req.Timeseries[i] = series
}
for metricIdx, m := range metrics {
if strings.HasPrefix(m.Get("__name__"), "nh_") {
series := mimirpb.PreallocTimeseries{TimeSeries: &mimirpb.TimeSeries{
Labels: mimirpb.FromLabelsToLabelAdapters(m.Copy()),
Histograms: make([]mimirpb.Histogram, end-start),
}}

for ts := start; ts < end; ts++ {
// TODO(jhesketh): Fix this with some better data
series.Histograms[ts-start].Timestamp = int64(ts) * interval.Milliseconds()
series.Histograms[ts-start].Count = &mimirpb.Histogram_CountInt{CountInt: 12}
series.Histograms[ts-start].ZeroCount = &mimirpb.Histogram_ZeroCountInt{ZeroCountInt: 2}
series.Histograms[ts-start].ZeroThreshold = 0.001
series.Histograms[ts-start].Sum = 18.4
series.Histograms[ts-start].Schema = 0
series.Histograms[ts-start].NegativeSpans = []mimirpb.BucketSpan{{Offset: 0, Length: 2}, {Offset: 1, Length: 2}}
series.Histograms[ts-start].NegativeDeltas = []int64{1, 1, -1, 0}
series.Histograms[ts-start].PositiveSpans = []mimirpb.BucketSpan{{Offset: 0, Length: 2}, {Offset: 1, Length: 2}}
series.Histograms[ts-start].PositiveDeltas = []int64{1, 1, -1, 0}
}

if _, err := ing.Push(ctx, req); err != nil {
return fmt.Errorf("failed to push samples to ingester: %w", err)
}
req.Timeseries[metricIdx] = series
} else {
series := mimirpb.PreallocTimeseries{TimeSeries: &mimirpb.TimeSeries{
Labels: mimirpb.FromLabelsToLabelAdapters(m.Copy()),
Samples: make([]mimirpb.Sample, end-start),
}}

ing.Flush()
for ts := start; ts < end; ts++ {
series.Samples[ts-start].TimestampMs = int64(ts) * interval.Milliseconds()
series.Samples[ts-start].Value = float64(ts) + float64(metricIdx)/float64(len(metrics))
}

req.Timeseries[metricIdx] = series
}
}
if _, err := ing.Push(ctx, req); err != nil {
return fmt.Errorf("failed to push samples to ingester: %w", err)
}
ing.Flush()
}

return nil
}
37 changes: 25 additions & 12 deletions pkg/streamingpromql/operator/instant_vector_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ package operator

import (
"context"
"errors"
"fmt"

"github.com/prometheus/prometheus/model/histogram"
Expand Down Expand Up @@ -48,13 +47,11 @@ func (v *InstantVectorSelector) NextSeries(_ context.Context) (InstantVectorSeri

v.memoizedIterator.Reset(v.chunkIterator)

data := InstantVectorSeriesData{
Floats: GetFPointSlice(v.numSteps), // TODO: only allocate this if we have any floats (once we support native histograms)
}
data := InstantVectorSeriesData{}

for stepT := v.Selector.Start; stepT <= v.Selector.End; stepT += v.Selector.Interval {
var t int64
var val float64
var f float64
var h *histogram.FloatHistogram

ts := stepT
Expand All @@ -70,26 +67,42 @@ func (v *InstantVectorSelector) NextSeries(_ context.Context) (InstantVectorSeri
return InstantVectorSeriesData{}, v.memoizedIterator.Err()
}
case chunkenc.ValFloat:
t, val = v.memoizedIterator.At()
t, f = v.memoizedIterator.At()
case chunkenc.ValHistogram, chunkenc.ValFloatHistogram:
t, h = v.memoizedIterator.AtFloatHistogram()
default:
return InstantVectorSeriesData{}, fmt.Errorf("streaming PromQL engine: unknown value type %s", valueType.String())
}

if valueType == chunkenc.ValNone || t > ts {
var ok bool
t, val, h, ok = v.memoizedIterator.PeekPrev()
if h != nil {
return InstantVectorSeriesData{}, errors.New("streaming PromQL engine doesn't support histograms yet")
}
t, f, h, ok = v.memoizedIterator.PeekPrev()
if !ok || t < ts-v.Selector.LookbackDelta.Milliseconds() {
continue
}
}
if value.IsStaleNaN(val) || (h != nil && value.IsStaleNaN(h.Sum)) {
if value.IsStaleNaN(f) || (h != nil && value.IsStaleNaN(h.Sum)) {
continue
}

data.Floats = append(data.Floats, promql.FPoint{T: stepT, F: val})
// if (f, h) have been set by PeekPrev, we do not know if f is 0 because that's the actual value, or because
// the previous value had a histogram.
// PeekPrev will set the histogram to nil, or the value to 0 if the other type exists.
// So check if histograms is nil first. If we don't have a histogram, then we should have a value and vice-versa.
if h != nil {
if len(data.Histograms) == 0 {
// Only create the slice once we know the series is a histogram or not.
// (It is possible to over-allocate in the case where we have both floats and histograms, but that won't be common).
data.Histograms = GetHPointSlice(v.numSteps)
}
data.Histograms = append(data.Histograms, promql.HPoint{T: stepT, H: h})
} else {
if len(data.Floats) == 0 {
// Only create the slice once we know the series is a histogram or not
data.Floats = GetFPointSlice(v.numSteps)
}
data.Floats = append(data.Floats, promql.FPoint{T: stepT, F: f})
}
}

if v.memoizedIterator.Err() != nil {
Expand Down
12 changes: 12 additions & 0 deletions pkg/streamingpromql/operator/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ var (
return make([]promql.FPoint, 0, size)
})

hPointSlicePool = pool.NewBucketedPool(1, maxExpectedPointsPerSeries, seriesPerResultBucketFactor, func(size int) []promql.HPoint {
return make([]promql.HPoint, 0, size)
})

matrixPool = pool.NewBucketedPool(1, maxExpectedSeriesPerResult, seriesPerResultBucketFactor, func(size int) promql.Matrix {
return make(promql.Matrix, 0, size)
})
Expand Down Expand Up @@ -51,6 +55,14 @@ func PutFPointSlice(s []promql.FPoint) {
fPointSlicePool.Put(s)
}

func GetHPointSlice(size int) []promql.HPoint {
return hPointSlicePool.Get(size)
}

func PutHPointSlice(s []promql.HPoint) {
hPointSlicePool.Put(s)
}

func GetMatrix(size int) promql.Matrix {
return matrixPool.Get(size)
}
Expand Down
48 changes: 27 additions & 21 deletions pkg/streamingpromql/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,11 @@ func (q *Query) Exec(ctx context.Context) *promql.Result {
return q.result
}

func returnSeriesDataSlices(d operator.InstantVectorSeriesData) {
operator.PutFPointSlice(d.Floats)
operator.PutHPointSlice(d.Histograms)
}

func (q *Query) populateVectorFromInstantVectorOperator(ctx context.Context, o operator.InstantVectorOperator, series []operator.SeriesMetadata) (promql.Vector, error) {
ts := timeMilliseconds(q.statement.Start)
v := operator.GetVector(len(series))
Expand All @@ -292,26 +297,29 @@ func (q *Query) populateVectorFromInstantVectorOperator(ctx context.Context, o o
return nil, err
}

if len(d.Floats)+len(d.Histograms) != 1 {
operator.PutFPointSlice(d.Floats)
// TODO: put histogram point slice back in pool

if len(d.Floats)+len(d.Histograms) == 0 {
if len(d.Floats) == 1 && len(d.Histograms) == 0 {
point := d.Floats[0]
v = append(v, promql.Sample{
Metric: s.Labels,
T: ts,
F: point.F,
})
} else if len(d.Floats) == 0 && len(d.Histograms) == 1 {
point := d.Histograms[0]
v = append(v, promql.Sample{
Metric: s.Labels,
T: ts,
H: point.H,
})
} else {
returnSeriesDataSlices(d)
// A series may have no data points.
if len(d.Floats) == 0 && len(d.Histograms) == 0 {
continue
}

return nil, fmt.Errorf("expected exactly one sample for series %s, but got %v", s.Labels.String(), len(d.Floats))
return nil, fmt.Errorf("expected exactly one sample for series %s, but got %v floats, %v histograms", s.Labels.String(), len(d.Floats), len(d.Histograms))
}

point := d.Floats[0]
v = append(v, promql.Sample{
Metric: s.Labels,
T: ts,
F: point.F,
})

operator.PutFPointSlice(d.Floats)
// TODO: put histogram point slice back in pool
returnSeriesDataSlices(d)
}

return v, nil
Expand All @@ -331,9 +339,7 @@ func (q *Query) populateMatrixFromInstantVectorOperator(ctx context.Context, o o
}

if len(d.Floats) == 0 && len(d.Histograms) == 0 {
operator.PutFPointSlice(d.Floats)
// TODO: put histogram point slice back in pool

returnSeriesDataSlices(d)
continue
}

Expand Down Expand Up @@ -394,7 +400,7 @@ func (q *Query) Close() {
case promql.Matrix:
for _, s := range v {
operator.PutFPointSlice(s.Floats)
// TODO: put histogram point slice back in pool
operator.PutHPointSlice(s.Histograms)
}

operator.PutMatrix(v)
Expand Down
Loading

0 comments on commit 6db3385

Please sign in to comment.