Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streaming PromQL engine: native histograms #8121

Merged
merged 31 commits into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
a021ff7
Streaming PromQL engine: native histograms
jhesketh May 13, 2024
3ce17f1
Merge branch 'main' into jhesketh/promql
jhesketh May 14, 2024
1970ce3
Use seriesPerResultBucketFactor for histogram pool
jhesketh May 14, 2024
a75e358
Don't defer returning slices
jhesketh May 14, 2024
0b8a160
Refactor slice returning logic
jhesketh May 15, 2024
5c7a2cd
Move returnSlices helper
jhesketh May 15, 2024
235a402
rename val for consistency
jhesketh May 15, 2024
72493bf
Update comment from feedback
jhesketh May 21, 2024
3ea8e20
Use returnSeriesDataSlices where missed
jhesketh May 21, 2024
885adfa
Comment out unsupported tests for now
jhesketh May 21, 2024
84be678
Enable upstream native_histogram test file
jhesketh May 21, 2024
d67498b
Remove duplicate test
jhesketh May 21, 2024
4cf927e
Merge branch 'main' into jhesketh/promql
jhesketh May 21, 2024
229b30a
Fix FloatHistogram selector
jhesketh May 24, 2024
7438138
Add range test
jhesketh May 24, 2024
999d860
Add test for mixed floats+histograms
jhesketh May 24, 2024
4d50e3c
Re-enable supported upstream tests
jhesketh May 24, 2024
91ba5c1
Load benchmark samples in batches to avoid OOM'ing
jhesketh May 27, 2024
aeb1b53
Revert "Load benchmark samples in batches to avoid OOM'ing"
jhesketh May 27, 2024
7b79add
Batch by samples instead of series
jhesketh May 27, 2024
1a38e40
Generate native histogram benchmark test data
jhesketh May 27, 2024
f8ae514
Add native histogram benchmark test
jhesketh May 27, 2024
063ed83
Move compare.sh next to other benchmark scripts
jhesketh May 27, 2024
0cff32b
Fix comparison tests for when there are no floats
jhesketh May 27, 2024
b9adee0
Add extra samples to the histogram tests
jhesketh May 27, 2024
7fbde86
Add note on batch size
jhesketh May 29, 2024
14ff8bd
Add quick check that histograms are loaded
jhesketh May 29, 2024
a08733c
Use clearer variables for indexes
jhesketh May 29, 2024
e94b596
Update changelog
jhesketh May 29, 2024
aa095fb
Merge branch 'main' into jhesketh/promql
jhesketh May 29, 2024
e8e5b24
Fix typo
jhesketh May 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* [FEATURE] Continuous-test: now runable as a module with `mimir -target=continuous-test`. #7747
* [FEATURE] Store-gateway: Allow specific tenants to be enabled or disabled via `-store-gateway.enabled-tenants` or `-store-gateway.disabled-tenants` CLI flags or their corresponding YAML settings. #7653
* [FEATURE] New `-<prefix>.s3.bucket-lookup-type` flag configures lookup style type, used to access bucket in s3 compatible providers. #7684
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.promql-engine=streaming`. #7693 #7898 #7899 #8023 #8058 #8096
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.promql-engine=streaming`. #7693 #7898 #7899 #8023 #8058 #8096 #8121
* [FEATURE] New `/ingester/unregister-on-shutdown` HTTP endpoint allows dynamic access to ingesters' `-ingester.ring.unregister-on-shutdown` configuration. #7739
* [FEATURE] Server: added experimental [PROXY protocol support](https://www.haproxy.org/download/2.3/doc/proxy-protocol.txt). The PROXY protocol support can be enabled via `-server.proxy-protocol-enabled=true`. When enabled, the support is added both to HTTP and gRPC listening ports. #7698
* [FEATURE] mimirtool: Add `runtime-config verify` sub-command, for verifying Mimir runtime config files. #8123
Expand Down
7 changes: 6 additions & 1 deletion pkg/streamingpromql/benchmarks/benchmarks.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,18 @@ func (c BenchCase) Run(ctx context.Context, t testing.TB, start, end time.Time,
return res, qry.Close
}

// These test cases are taken from https://github.com/prometheus/prometheus/blob/main/promql/bench_test.go.
// These test cases are taken from https://github.com/prometheus/prometheus/blob/main/promql/bench_test.go
// and enhanced/added to.
func TestCases(metricSizes []int) []BenchCase {
cases := []BenchCase{
// Plain retrieval.
{
Expr: "a_X",
},
// Histogram retrieval
{
Expr: "nh_X",
},
// Range vector selector.
{
Expr: "a_X[1m]",
Expand Down
34 changes: 32 additions & 2 deletions pkg/streamingpromql/benchmarks/comparison_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,28 @@ func TestBenchmarkSetup(t *testing.T) {
}

require.Equal(t, expectedPoints, series.Floats)

// Check native histograms are set up correctly
query, err = streamingEngine.NewRangeQuery(ctx, q, nil, "nh_1", time.Unix(0, 0), time.Unix(int64(15*intervalSeconds), 0), interval)
require.NoError(t, err)

t.Cleanup(query.Close)
result = query.Exec(ctx)
require.NoError(t, result.Err)

matrix, err = result.Matrix()
require.NoError(t, err)

require.Len(t, matrix, 1)
series = matrix[0]
require.Equal(t, labels.FromStrings("__name__", "nh_1"), series.Metric)
require.Len(t, series.Floats, 0)
require.Len(t, series.Histograms, 16)

// Check one histogram point is as expected
require.Equal(t, int64(0), series.Histograms[0].T)
require.Equal(t, 12.0, series.Histograms[0].H.Count)
require.Equal(t, 18.4, series.Histograms[0].H.Sum)
}

// Why do we do this rather than require.Equal(t, expected, actual)?
Expand Down Expand Up @@ -194,7 +216,11 @@ func requireEqualResults(t testing.TB, expected, actual *promql.Result) {
require.Equal(t, expectedSample.Metric, actualSample.Metric)
require.Equal(t, expectedSample.T, actualSample.T)
require.Equal(t, expectedSample.H, actualSample.H)
require.InEpsilon(t, expectedSample.F, actualSample.F, 1e-10)
if expectedSample.F == 0 {
require.Equal(t, expectedSample.F, actualSample.F)
} else {
require.InEpsilon(t, expectedSample.F, actualSample.F, 1e-10)
}
}
case parser.ValueTypeMatrix:
expectedMatrix, err := expected.Matrix()
Expand All @@ -214,7 +240,11 @@ func requireEqualResults(t testing.TB, expected, actual *promql.Result) {
actualPoint := actualSeries.Floats[j]

require.Equal(t, expectedPoint.T, actualPoint.T)
require.InEpsilonf(t, expectedPoint.F, actualPoint.F, 1e-10, "expected series %v to have points %v, but result is %v", expectedSeries.Metric.String(), expectedSeries.Floats, actualSeries.Floats)
if expectedPoint.F == 0 {
require.Equal(t, expectedPoint.F, actualPoint.F)
} else {
require.InEpsilonf(t, expectedPoint.F, actualPoint.F, 1e-10, "expected series %v to have points %v, but result is %v", expectedSeries.Metric.String(), expectedSeries.Floats, actualSeries.Floats)
}
}
}
default:
Expand Down
74 changes: 56 additions & 18 deletions pkg/streamingpromql/benchmarks/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"net"
"path/filepath"
"strconv"
"strings"
"time"

"github.com/go-kit/log"
Expand Down Expand Up @@ -174,7 +175,7 @@ func pushTestData(ing *ingester.Ingester, metricSizes []int) error {
totalMetrics := 0

for _, size := range metricSizes {
totalMetrics += (2 + histogramBuckets + 1) * size // 2 non-histogram metrics + 5 metrics for histogram buckets + 1 metric for +Inf histogram bucket
totalMetrics += (2 + histogramBuckets + 1 + 1) * size // 2 non-histogram metrics + 5 metrics for histogram buckets + 1 metric for +Inf histogram bucket + 1 metric for native-histograms
}

metrics := make([]labels.Labels, 0, totalMetrics)
Expand All @@ -183,6 +184,7 @@ func pushTestData(ing *ingester.Ingester, metricSizes []int) error {
aName := "a_" + strconv.Itoa(size)
bName := "b_" + strconv.Itoa(size)
histogramName := "h_" + strconv.Itoa(size)
nativeHistogramName := "nh_" + strconv.Itoa(size)

if size == 1 {
// We don't want a "l" label on metrics with one series (some test cases rely on this label not being present).
Expand All @@ -192,6 +194,7 @@ func pushTestData(ing *ingester.Ingester, metricSizes []int) error {
metrics = append(metrics, labels.FromStrings("__name__", histogramName, "le", strconv.Itoa(le)))
}
metrics = append(metrics, labels.FromStrings("__name__", histogramName, "le", "+Inf"))
metrics = append(metrics, labels.FromStrings("__name__", nativeHistogramName))
} else {
for i := 0; i < size; i++ {
metrics = append(metrics, labels.FromStrings("__name__", aName, "l", strconv.Itoa(i)))
Expand All @@ -200,34 +203,69 @@ func pushTestData(ing *ingester.Ingester, metricSizes []int) error {
metrics = append(metrics, labels.FromStrings("__name__", histogramName, "l", strconv.Itoa(i), "le", strconv.Itoa(le)))
}
metrics = append(metrics, labels.FromStrings("__name__", histogramName, "l", strconv.Itoa(i), "le", "+Inf"))
metrics = append(metrics, labels.FromStrings("__name__", nativeHistogramName, "l", strconv.Itoa(i)))
}
}
}

ctx := user.InjectOrgID(context.Background(), UserID)
req := &mimirpb.WriteRequest{
Timeseries: make([]mimirpb.PreallocTimeseries, len(metrics)),
}

for i, m := range metrics {
series := mimirpb.PreallocTimeseries{TimeSeries: &mimirpb.TimeSeries{
Labels: mimirpb.FromLabelsToLabelAdapters(m),
Samples: make([]mimirpb.Sample, NumIntervals),
}}
// Batch samples into separate requests
// There is no precise science behind this number currently.
// A quick run locally found batching by 100 did not increase the loading time by any noticeable amount.
// Additionally memory usage maxed about 4GB for the whole process.
batchSize := 100
jhesketh marked this conversation as resolved.
Show resolved Hide resolved
for start := 0; start < NumIntervals; start += batchSize {
end := start + batchSize
if end > NumIntervals {
end = NumIntervals
}

for s := 0; s < NumIntervals; s++ {
series.Samples[s].TimestampMs = int64(s) * interval.Milliseconds()
series.Samples[s].Value = float64(s) + float64(i)/float64(len(metrics))
req := &mimirpb.WriteRequest{
Timeseries: make([]mimirpb.PreallocTimeseries, len(metrics)),
}

req.Timeseries[i] = series
}
for metricIdx, m := range metrics {
if strings.HasPrefix(m.Get("__name__"), "nh_") {
series := mimirpb.PreallocTimeseries{TimeSeries: &mimirpb.TimeSeries{
Labels: mimirpb.FromLabelsToLabelAdapters(m.Copy()),
Histograms: make([]mimirpb.Histogram, end-start),
}}

for ts := start; ts < end; ts++ {
// TODO(jhesketh): Fix this with some better data
charleskorn marked this conversation as resolved.
Show resolved Hide resolved
series.Histograms[ts-start].Timestamp = int64(ts) * interval.Milliseconds()
series.Histograms[ts-start].Count = &mimirpb.Histogram_CountInt{CountInt: 12}
series.Histograms[ts-start].ZeroCount = &mimirpb.Histogram_ZeroCountInt{ZeroCountInt: 2}
series.Histograms[ts-start].ZeroThreshold = 0.001
series.Histograms[ts-start].Sum = 18.4
series.Histograms[ts-start].Schema = 0
series.Histograms[ts-start].NegativeSpans = []mimirpb.BucketSpan{{Offset: 0, Length: 2}, {Offset: 1, Length: 2}}
series.Histograms[ts-start].NegativeDeltas = []int64{1, 1, -1, 0}
series.Histograms[ts-start].PositiveSpans = []mimirpb.BucketSpan{{Offset: 0, Length: 2}, {Offset: 1, Length: 2}}
series.Histograms[ts-start].PositiveDeltas = []int64{1, 1, -1, 0}
}

if _, err := ing.Push(ctx, req); err != nil {
return fmt.Errorf("failed to push samples to ingester: %w", err)
}
req.Timeseries[metricIdx] = series
} else {
series := mimirpb.PreallocTimeseries{TimeSeries: &mimirpb.TimeSeries{
Labels: mimirpb.FromLabelsToLabelAdapters(m.Copy()),
Samples: make([]mimirpb.Sample, end-start),
}}

ing.Flush()
for ts := start; ts < end; ts++ {
series.Samples[ts-start].TimestampMs = int64(ts) * interval.Milliseconds()
series.Samples[ts-start].Value = float64(ts) + float64(metricIdx)/float64(len(metrics))
}

req.Timeseries[metricIdx] = series
}
}
if _, err := ing.Push(ctx, req); err != nil {
return fmt.Errorf("failed to push samples to ingester: %w", err)
}
ing.Flush()
}

return nil
}
37 changes: 25 additions & 12 deletions pkg/streamingpromql/operator/instant_vector_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ package operator

import (
"context"
"errors"
"fmt"

"github.com/prometheus/prometheus/model/histogram"
Expand Down Expand Up @@ -48,13 +47,11 @@ func (v *InstantVectorSelector) NextSeries(_ context.Context) (InstantVectorSeri

v.memoizedIterator.Reset(v.chunkIterator)

data := InstantVectorSeriesData{
Floats: GetFPointSlice(v.numSteps), // TODO: only allocate this if we have any floats (once we support native histograms)
}
data := InstantVectorSeriesData{}

for stepT := v.Selector.Start; stepT <= v.Selector.End; stepT += v.Selector.Interval {
var t int64
jhesketh marked this conversation as resolved.
Show resolved Hide resolved
var val float64
var f float64
var h *histogram.FloatHistogram

ts := stepT
Expand All @@ -70,26 +67,42 @@ func (v *InstantVectorSelector) NextSeries(_ context.Context) (InstantVectorSeri
return InstantVectorSeriesData{}, v.memoizedIterator.Err()
}
case chunkenc.ValFloat:
t, val = v.memoizedIterator.At()
t, f = v.memoizedIterator.At()
case chunkenc.ValHistogram, chunkenc.ValFloatHistogram:
t, h = v.memoizedIterator.AtFloatHistogram()
default:
return InstantVectorSeriesData{}, fmt.Errorf("streaming PromQL engine: unknown value type %s", valueType.String())
}

if valueType == chunkenc.ValNone || t > ts {
var ok bool
t, val, h, ok = v.memoizedIterator.PeekPrev()
if h != nil {
return InstantVectorSeriesData{}, errors.New("streaming PromQL engine doesn't support histograms yet")
}
t, f, h, ok = v.memoizedIterator.PeekPrev()
if !ok || t < ts-v.Selector.LookbackDelta.Milliseconds() {
continue
}
}
if value.IsStaleNaN(val) || (h != nil && value.IsStaleNaN(h.Sum)) {
if value.IsStaleNaN(f) || (h != nil && value.IsStaleNaN(h.Sum)) {
continue
}

data.Floats = append(data.Floats, promql.FPoint{T: stepT, F: val})
// if (f, h) have been set by PeekPrev, we do not know if f is 0 because that's the actual value, or because
// the previous value had a histogram.
// PeekPrev will set the histogram to nil, or the value to 0 if the other type exists.
// So check if histograms is nil first. If we don't have a histogram, then we should have a value and vice-versa.
if h != nil {
if len(data.Histograms) == 0 {
// Only create the slice once we know the series is a histogram or not.
// (It is possible to over-allocate in the case where we have both floats and histograms, but that won't be common).
data.Histograms = GetHPointSlice(v.numSteps)
}
data.Histograms = append(data.Histograms, promql.HPoint{T: stepT, H: h})
} else {
if len(data.Floats) == 0 {
// Only create the slice once we know the series is a histogram or not
data.Floats = GetFPointSlice(v.numSteps)
}
data.Floats = append(data.Floats, promql.FPoint{T: stepT, F: f})
}
}

if v.memoizedIterator.Err() != nil {
Expand Down
12 changes: 12 additions & 0 deletions pkg/streamingpromql/operator/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ var (
return make([]promql.FPoint, 0, size)
})

hPointSlicePool = pool.NewBucketedPool(1, maxExpectedPointsPerSeries, seriesPerResultBucketFactor, func(size int) []promql.HPoint {
return make([]promql.HPoint, 0, size)
})

matrixPool = pool.NewBucketedPool(1, maxExpectedSeriesPerResult, seriesPerResultBucketFactor, func(size int) promql.Matrix {
return make(promql.Matrix, 0, size)
})
Expand Down Expand Up @@ -51,6 +55,14 @@ func PutFPointSlice(s []promql.FPoint) {
fPointSlicePool.Put(s)
}

func GetHPointSlice(size int) []promql.HPoint {
return hPointSlicePool.Get(size)
}

func PutHPointSlice(s []promql.HPoint) {
hPointSlicePool.Put(s)
}

func GetMatrix(size int) promql.Matrix {
return matrixPool.Get(size)
}
Expand Down
48 changes: 27 additions & 21 deletions pkg/streamingpromql/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,11 @@ func (q *Query) Exec(ctx context.Context) *promql.Result {
return q.result
}

func returnSeriesDataSlices(d operator.InstantVectorSeriesData) {
operator.PutFPointSlice(d.Floats)
operator.PutHPointSlice(d.Histograms)
}

func (q *Query) populateVectorFromInstantVectorOperator(ctx context.Context, o operator.InstantVectorOperator, series []operator.SeriesMetadata) (promql.Vector, error) {
ts := timeMilliseconds(q.statement.Start)
v := operator.GetVector(len(series))
Expand All @@ -292,26 +297,29 @@ func (q *Query) populateVectorFromInstantVectorOperator(ctx context.Context, o o
return nil, err
}

if len(d.Floats)+len(d.Histograms) != 1 {
operator.PutFPointSlice(d.Floats)
// TODO: put histogram point slice back in pool

if len(d.Floats)+len(d.Histograms) == 0 {
if len(d.Floats) == 1 && len(d.Histograms) == 0 {
point := d.Floats[0]
v = append(v, promql.Sample{
Metric: s.Labels,
T: ts,
F: point.F,
})
} else if len(d.Floats) == 0 && len(d.Histograms) == 1 {
point := d.Histograms[0]
v = append(v, promql.Sample{
Metric: s.Labels,
T: ts,
H: point.H,
})
} else {
returnSeriesDataSlices(d)
// A series may have no data points.
if len(d.Floats) == 0 && len(d.Histograms) == 0 {
continue
}

return nil, fmt.Errorf("expected exactly one sample for series %s, but got %v", s.Labels.String(), len(d.Floats))
return nil, fmt.Errorf("expected exactly one sample for series %s, but got %v floats, %v histograms", s.Labels.String(), len(d.Floats), len(d.Histograms))
}

point := d.Floats[0]
v = append(v, promql.Sample{
Metric: s.Labels,
T: ts,
F: point.F,
})

operator.PutFPointSlice(d.Floats)
// TODO: put histogram point slice back in pool
returnSeriesDataSlices(d)
}

return v, nil
Expand All @@ -331,9 +339,7 @@ func (q *Query) populateMatrixFromInstantVectorOperator(ctx context.Context, o o
}

if len(d.Floats) == 0 && len(d.Histograms) == 0 {
operator.PutFPointSlice(d.Floats)
// TODO: put histogram point slice back in pool

returnSeriesDataSlices(d)
continue
}

Expand Down Expand Up @@ -394,7 +400,7 @@ func (q *Query) Close() {
case promql.Matrix:
for _, s := range v {
operator.PutFPointSlice(s.Floats)
// TODO: put histogram point slice back in pool
operator.PutHPointSlice(s.Histograms)
}

operator.PutMatrix(v)
Expand Down
Loading
Loading