From a021ff7facafe49f4f2b2a834c3dc074c70c0d0c Mon Sep 17 00:00:00 2001 From: Joshua Hesketh Date: Mon, 13 May 2024 20:48:54 +1000 Subject: [PATCH 01/28] Streaming PromQL engine: native histograms Add native histogram support to instant vectors. --- .../operator/instant_vector_selector.go | 28 +++++++---- pkg/streamingpromql/operator/pool.go | 12 +++++ pkg/streamingpromql/query.go | 46 +++++++++++-------- .../testdata/ours/native_histograms.test | 18 ++++++++ 4 files changed, 77 insertions(+), 27 deletions(-) create mode 100644 pkg/streamingpromql/testdata/ours/native_histograms.test diff --git a/pkg/streamingpromql/operator/instant_vector_selector.go b/pkg/streamingpromql/operator/instant_vector_selector.go index 5fd06435bc3..e784d60f974 100644 --- a/pkg/streamingpromql/operator/instant_vector_selector.go +++ b/pkg/streamingpromql/operator/instant_vector_selector.go @@ -7,7 +7,6 @@ package operator import ( "context" - "errors" "fmt" "github.com/prometheus/prometheus/model/histogram" @@ -48,9 +47,7 @@ func (v *InstantVectorSelector) NextSeries(_ context.Context) (InstantVectorSeri v.memoizedIterator.Reset(v.chunkIterator) - data := InstantVectorSeriesData{ - Floats: GetFPointSlice(v.numSteps), // TODO: only allocate this if we have any floats (once we support native histograms) - } + data := InstantVectorSeriesData{} for stepT := v.Selector.Start; stepT <= v.Selector.End; stepT += v.Selector.Interval { var t int64 @@ -71,6 +68,8 @@ func (v *InstantVectorSelector) NextSeries(_ context.Context) (InstantVectorSeri } case chunkenc.ValFloat: t, val = v.memoizedIterator.At() + case chunkenc.ValHistogram: + t, h = v.memoizedIterator.AtFloatHistogram() default: return InstantVectorSeriesData{}, fmt.Errorf("streaming PromQL engine: unknown value type %s", valueType.String()) } @@ -78,9 +77,6 @@ func (v *InstantVectorSelector) NextSeries(_ context.Context) (InstantVectorSeri if valueType == chunkenc.ValNone || t > ts { var ok bool t, val, h, ok = v.memoizedIterator.PeekPrev() - if h != nil { - return InstantVectorSeriesData{}, errors.New("streaming PromQL engine doesn't support histograms yet") - } if !ok || t < ts-v.Selector.LookbackDelta.Milliseconds() { continue } @@ -89,7 +85,23 @@ func (v *InstantVectorSelector) NextSeries(_ context.Context) (InstantVectorSeri continue } - data.Floats = append(data.Floats, promql.FPoint{T: stepT, F: val}) + // if (val, h have been set by PeekPrev, we do not know if val is 0 because that's the actual value, or because + // the previous value had a histogram. + // PeekPrev will set the histogram to nil, or the value to 0 if the other type exists. + // So check if histograms is nil first. If we don't have a histogram, then we should have a value and vice-versa. + if h != nil { + if len(data.Histograms) == 0 { + // Only create the pool once we know the series is a histogram or not + data.Histograms = GetHPointSlice(v.numSteps) + } + data.Histograms = append(data.Histograms, promql.HPoint{T: stepT, H: h}) + } else { + if len(data.Floats) == 0 { + // Only create the pool once we know the series is a histogram or not + data.Floats = GetFPointSlice(v.numSteps) + } + data.Floats = append(data.Floats, promql.FPoint{T: stepT, F: val}) + } } if v.memoizedIterator.Err() != nil { diff --git a/pkg/streamingpromql/operator/pool.go b/pkg/streamingpromql/operator/pool.go index d7b93e99f15..42598981d39 100644 --- a/pkg/streamingpromql/operator/pool.go +++ b/pkg/streamingpromql/operator/pool.go @@ -19,6 +19,10 @@ var ( return make([]promql.FPoint, 0, size) }) + hPointSlicePool = pool.NewBucketedPool(1, maxExpectedPointsPerSeries, 10, func(size int) []promql.HPoint { + return make([]promql.HPoint, 0, size) + }) + matrixPool = pool.NewBucketedPool(1, maxExpectedSeriesPerResult, 10, func(size int) promql.Matrix { return make(promql.Matrix, 0, size) }) @@ -49,6 +53,14 @@ func PutFPointSlice(s []promql.FPoint) { fPointSlicePool.Put(s) } +func GetHPointSlice(size int) []promql.HPoint { + return hPointSlicePool.Get(size) +} + +func PutHPointSlice(s []promql.HPoint) { + hPointSlicePool.Put(s) +} + func GetMatrix(size int) promql.Matrix { return matrixPool.Get(size) } diff --git a/pkg/streamingpromql/query.go b/pkg/streamingpromql/query.go index cae4ace5c61..f04b7d25a00 100644 --- a/pkg/streamingpromql/query.go +++ b/pkg/streamingpromql/query.go @@ -271,26 +271,35 @@ func (q *Query) populateVectorFromInstantVectorOperator(ctx context.Context, o o return nil, err } - if len(d.Floats)+len(d.Histograms) != 1 { - operator.PutFPointSlice(d.Floats) - // TODO: put histogram point slice back in pool - - if len(d.Floats)+len(d.Histograms) == 0 { - continue - } + defer operator.PutFPointSlice(d.Floats) + defer operator.PutHPointSlice(d.Histograms) - return nil, fmt.Errorf("expected exactly one sample for series %s, but got %v", s.Labels.String(), len(d.Floats)) + // A series may have no data points. + if len(d.Floats)+len(d.Histograms) == 0 { + continue } - point := d.Floats[0] - v = append(v, promql.Sample{ - Metric: s.Labels, - T: ts, - F: point.F, - }) + if len(d.Floats)+len(d.Histograms) != 1 { + return nil, fmt.Errorf("expected exactly one sample for series %s, but got %v Floats, %v Histograms", s.Labels.String(), len(d.Floats), len(d.Histograms)) + } - operator.PutFPointSlice(d.Floats) - // TODO: put histogram point slice back in pool + if len(d.Floats) == 1 { + point := d.Floats[0] + v = append(v, promql.Sample{ + Metric: s.Labels, + T: ts, + F: point.F, + }) + } else if len(d.Histograms) == 1 { + point := d.Histograms[0] + v = append(v, promql.Sample{ + Metric: s.Labels, + T: ts, + H: point.H, + }) + } else { + return nil, fmt.Errorf("expected exactly one sample for series %s, but got %v Floats, %v Histograms", s.Labels.String(), len(d.Floats), len(d.Histograms)) + } } return v, nil @@ -311,8 +320,7 @@ func (q *Query) populateMatrixFromInstantVectorOperator(ctx context.Context, o o if len(d.Floats) == 0 && len(d.Histograms) == 0 { operator.PutFPointSlice(d.Floats) - // TODO: put histogram point slice back in pool - + operator.PutHPointSlice(d.Histograms) continue } @@ -373,7 +381,7 @@ func (q *Query) Close() { case promql.Matrix: for _, s := range v { operator.PutFPointSlice(s.Floats) - // TODO: put histogram point slice back in pool + operator.PutHPointSlice(s.Histograms) } operator.PutMatrix(v) diff --git a/pkg/streamingpromql/testdata/ours/native_histograms.test b/pkg/streamingpromql/testdata/ours/native_histograms.test new file mode 100644 index 00000000000..3b9c08d80e1 --- /dev/null +++ b/pkg/streamingpromql/testdata/ours/native_histograms.test @@ -0,0 +1,18 @@ +# SPDX-License-Identifier: AGPL-3.0-only +# Provenance-includes-location: https://github.com/prometheus/prometheus/tree/main/promql/testdata/native_histograms.test +# Provenance-includes-license: Apache-2.0 +# Provenance-includes-copyright: The Prometheus Authors + +# Minimal valid case: an empty histogram. +load 5m + empty_histogram {{}} + +eval instant at 5m empty_histogram + {__name__="empty_histogram"} {{}} + +# buckets:[1 2 1] means 1 observation in the 1st bucket, 2 observations in the 2nd and 1 observation in the 3rd (total 4). +load 5m + single_histogram {{schema:0 sum:5 count:4 buckets:[1 2 1]}} + +eval instant at 5m single_histogram + {__name__="single_histogram"} {{count:4 sum:5 buckets:[1 2 1]}} From 1970ce31d4e34a580d6c0d07c13dd448e4f81c32 Mon Sep 17 00:00:00 2001 From: Joshua Hesketh Date: Tue, 14 May 2024 22:19:49 +1000 Subject: [PATCH 02/28] Use seriesPerResultBucketFactor for histogram pool --- pkg/streamingpromql/operator/pool.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/streamingpromql/operator/pool.go b/pkg/streamingpromql/operator/pool.go index 0fff4770ab6..b33f90db58e 100644 --- a/pkg/streamingpromql/operator/pool.go +++ b/pkg/streamingpromql/operator/pool.go @@ -21,7 +21,7 @@ var ( return make([]promql.FPoint, 0, size) }) - hPointSlicePool = pool.NewBucketedPool(1, maxExpectedPointsPerSeries, 10, func(size int) []promql.HPoint { + hPointSlicePool = pool.NewBucketedPool(1, maxExpectedPointsPerSeries, seriesPerResultBucketFactor, func(size int) []promql.HPoint { return make([]promql.HPoint, 0, size) }) From a75e358d4a600e8c1345ee7ab06c8d34b030e5b9 Mon Sep 17 00:00:00 2001 From: Joshua Hesketh Date: Tue, 14 May 2024 22:29:39 +1000 Subject: [PATCH 03/28] Don't defer returning slices --- pkg/streamingpromql/query.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/pkg/streamingpromql/query.go b/pkg/streamingpromql/query.go index 5154b28301b..3cafedf73d9 100644 --- a/pkg/streamingpromql/query.go +++ b/pkg/streamingpromql/query.go @@ -282,6 +282,11 @@ func (q *Query) populateVectorFromInstantVectorOperator(ctx context.Context, o o ts := timeMilliseconds(q.statement.Start) v := operator.GetVector(len(series)) + returnSlices := func(f []promql.FPoint, h []promql.HPoint) { + operator.PutFPointSlice(f) + operator.PutHPointSlice(h) + } + for i, s := range series { d, err := o.NextSeries(ctx) if err != nil { @@ -292,15 +297,13 @@ func (q *Query) populateVectorFromInstantVectorOperator(ctx context.Context, o o return nil, err } - defer operator.PutFPointSlice(d.Floats) - defer operator.PutHPointSlice(d.Histograms) - // A series may have no data points. if len(d.Floats)+len(d.Histograms) == 0 { continue } if len(d.Floats)+len(d.Histograms) != 1 { + returnSlices(d.Floats, d.Histograms) return nil, fmt.Errorf("expected exactly one sample for series %s, but got %v Floats, %v Histograms", s.Labels.String(), len(d.Floats), len(d.Histograms)) } @@ -319,8 +322,10 @@ func (q *Query) populateVectorFromInstantVectorOperator(ctx context.Context, o o H: point.H, }) } else { + returnSlices(d.Floats, d.Histograms) return nil, fmt.Errorf("expected exactly one sample for series %s, but got %v Floats, %v Histograms", s.Labels.String(), len(d.Floats), len(d.Histograms)) } + returnSlices(d.Floats, d.Histograms) } return v, nil From 0b8a1604455a629bec24b4caca7d869f49756e01 Mon Sep 17 00:00:00 2001 From: Joshua Hesketh Date: Wed, 15 May 2024 23:06:11 +1000 Subject: [PATCH 04/28] Refactor slice returning logic --- pkg/streamingpromql/query.go | 20 +++++++------------- 1 file changed, 7 insertions(+), 13 deletions(-) diff --git a/pkg/streamingpromql/query.go b/pkg/streamingpromql/query.go index 3cafedf73d9..c3041d48aa7 100644 --- a/pkg/streamingpromql/query.go +++ b/pkg/streamingpromql/query.go @@ -297,24 +297,14 @@ func (q *Query) populateVectorFromInstantVectorOperator(ctx context.Context, o o return nil, err } - // A series may have no data points. - if len(d.Floats)+len(d.Histograms) == 0 { - continue - } - - if len(d.Floats)+len(d.Histograms) != 1 { - returnSlices(d.Floats, d.Histograms) - return nil, fmt.Errorf("expected exactly one sample for series %s, but got %v Floats, %v Histograms", s.Labels.String(), len(d.Floats), len(d.Histograms)) - } - - if len(d.Floats) == 1 { + if len(d.Floats) == 1 && len(d.Histograms) == 0 { point := d.Floats[0] v = append(v, promql.Sample{ Metric: s.Labels, T: ts, F: point.F, }) - } else if len(d.Histograms) == 1 { + } else if len(d.Floats) == 0 && len(d.Histograms) == 1 { point := d.Histograms[0] v = append(v, promql.Sample{ Metric: s.Labels, @@ -323,7 +313,11 @@ func (q *Query) populateVectorFromInstantVectorOperator(ctx context.Context, o o }) } else { returnSlices(d.Floats, d.Histograms) - return nil, fmt.Errorf("expected exactly one sample for series %s, but got %v Floats, %v Histograms", s.Labels.String(), len(d.Floats), len(d.Histograms)) + // A series may have no data points. + if len(d.Floats) == 0 && len(d.Histograms) == 0 { + continue + } + return nil, fmt.Errorf("expected exactly one sample for series %s, but got %v floats, %v histograms", s.Labels.String(), len(d.Floats), len(d.Histograms)) } returnSlices(d.Floats, d.Histograms) } From 5c7a2cd9ae2645f15837b157de31f4b6d7ee94ff Mon Sep 17 00:00:00 2001 From: Joshua Hesketh Date: Wed, 15 May 2024 23:11:36 +1000 Subject: [PATCH 05/28] Move returnSlices helper --- pkg/streamingpromql/query.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/pkg/streamingpromql/query.go b/pkg/streamingpromql/query.go index c3041d48aa7..9e906bd33ff 100644 --- a/pkg/streamingpromql/query.go +++ b/pkg/streamingpromql/query.go @@ -278,15 +278,15 @@ func (q *Query) Exec(ctx context.Context) *promql.Result { return q.result } +func returnSeriesDataSlices(d operator.InstantVectorSeriesData) { + operator.PutFPointSlice(d.Floats) + operator.PutHPointSlice(d.Histograms) +} + func (q *Query) populateVectorFromInstantVectorOperator(ctx context.Context, o operator.InstantVectorOperator, series []operator.SeriesMetadata) (promql.Vector, error) { ts := timeMilliseconds(q.statement.Start) v := operator.GetVector(len(series)) - returnSlices := func(f []promql.FPoint, h []promql.HPoint) { - operator.PutFPointSlice(f) - operator.PutHPointSlice(h) - } - for i, s := range series { d, err := o.NextSeries(ctx) if err != nil { @@ -312,14 +312,14 @@ func (q *Query) populateVectorFromInstantVectorOperator(ctx context.Context, o o H: point.H, }) } else { - returnSlices(d.Floats, d.Histograms) + returnSeriesDataSlices(d) // A series may have no data points. if len(d.Floats) == 0 && len(d.Histograms) == 0 { continue } return nil, fmt.Errorf("expected exactly one sample for series %s, but got %v floats, %v histograms", s.Labels.String(), len(d.Floats), len(d.Histograms)) } - returnSlices(d.Floats, d.Histograms) + returnSeriesDataSlices(d) } return v, nil From 235a4024a70a52505cfd37b9dd351acb6a9a4baf Mon Sep 17 00:00:00 2001 From: Joshua Hesketh Date: Thu, 16 May 2024 00:09:34 +1000 Subject: [PATCH 06/28] rename val for consistency --- .../operator/instant_vector_selector.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/pkg/streamingpromql/operator/instant_vector_selector.go b/pkg/streamingpromql/operator/instant_vector_selector.go index e784d60f974..001340f0021 100644 --- a/pkg/streamingpromql/operator/instant_vector_selector.go +++ b/pkg/streamingpromql/operator/instant_vector_selector.go @@ -51,7 +51,7 @@ func (v *InstantVectorSelector) NextSeries(_ context.Context) (InstantVectorSeri for stepT := v.Selector.Start; stepT <= v.Selector.End; stepT += v.Selector.Interval { var t int64 - var val float64 + var f float64 var h *histogram.FloatHistogram ts := stepT @@ -67,7 +67,7 @@ func (v *InstantVectorSelector) NextSeries(_ context.Context) (InstantVectorSeri return InstantVectorSeriesData{}, v.memoizedIterator.Err() } case chunkenc.ValFloat: - t, val = v.memoizedIterator.At() + t, f = v.memoizedIterator.At() case chunkenc.ValHistogram: t, h = v.memoizedIterator.AtFloatHistogram() default: @@ -76,31 +76,31 @@ func (v *InstantVectorSelector) NextSeries(_ context.Context) (InstantVectorSeri if valueType == chunkenc.ValNone || t > ts { var ok bool - t, val, h, ok = v.memoizedIterator.PeekPrev() + t, f, h, ok = v.memoizedIterator.PeekPrev() if !ok || t < ts-v.Selector.LookbackDelta.Milliseconds() { continue } } - if value.IsStaleNaN(val) || (h != nil && value.IsStaleNaN(h.Sum)) { + if value.IsStaleNaN(f) || (h != nil && value.IsStaleNaN(h.Sum)) { continue } - // if (val, h have been set by PeekPrev, we do not know if val is 0 because that's the actual value, or because + // if (f, h) have been set by PeekPrev, we do not know if f is 0 because that's the actual value, or because // the previous value had a histogram. // PeekPrev will set the histogram to nil, or the value to 0 if the other type exists. // So check if histograms is nil first. If we don't have a histogram, then we should have a value and vice-versa. if h != nil { if len(data.Histograms) == 0 { - // Only create the pool once we know the series is a histogram or not + // Only create the slice once we know the series is a histogram or not data.Histograms = GetHPointSlice(v.numSteps) } data.Histograms = append(data.Histograms, promql.HPoint{T: stepT, H: h}) } else { if len(data.Floats) == 0 { - // Only create the pool once we know the series is a histogram or not + // Only create the slice once we know the series is a histogram or not data.Floats = GetFPointSlice(v.numSteps) } - data.Floats = append(data.Floats, promql.FPoint{T: stepT, F: val}) + data.Floats = append(data.Floats, promql.FPoint{T: stepT, F: f}) } } From 72493bf80a9fa8e834f915d6211d538c397ff4d6 Mon Sep 17 00:00:00 2001 From: Joshua Hesketh Date: Tue, 21 May 2024 21:41:55 +1000 Subject: [PATCH 07/28] Update comment from feedback --- pkg/streamingpromql/operator/instant_vector_selector.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/streamingpromql/operator/instant_vector_selector.go b/pkg/streamingpromql/operator/instant_vector_selector.go index 001340f0021..633c9218a72 100644 --- a/pkg/streamingpromql/operator/instant_vector_selector.go +++ b/pkg/streamingpromql/operator/instant_vector_selector.go @@ -91,7 +91,8 @@ func (v *InstantVectorSelector) NextSeries(_ context.Context) (InstantVectorSeri // So check if histograms is nil first. If we don't have a histogram, then we should have a value and vice-versa. if h != nil { if len(data.Histograms) == 0 { - // Only create the slice once we know the series is a histogram or not + // Only create the slice once we know the series is a histogram or not. + // (It is possible to over-allocate in the case where we have both floats and histograms, but that won't be common). data.Histograms = GetHPointSlice(v.numSteps) } data.Histograms = append(data.Histograms, promql.HPoint{T: stepT, H: h}) From 3ea8e20c5383e026eefc116f5d03a259d5b54243 Mon Sep 17 00:00:00 2001 From: Joshua Hesketh Date: Tue, 21 May 2024 21:42:03 +1000 Subject: [PATCH 08/28] Use returnSeriesDataSlices where missed --- pkg/streamingpromql/query.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/streamingpromql/query.go b/pkg/streamingpromql/query.go index 9e906bd33ff..93f424049b9 100644 --- a/pkg/streamingpromql/query.go +++ b/pkg/streamingpromql/query.go @@ -339,8 +339,7 @@ func (q *Query) populateMatrixFromInstantVectorOperator(ctx context.Context, o o } if len(d.Floats) == 0 && len(d.Histograms) == 0 { - operator.PutFPointSlice(d.Floats) - operator.PutHPointSlice(d.Histograms) + returnSeriesDataSlices(d) continue } From 885adfa831885716a3341edb3c8f9f3186162dd8 Mon Sep 17 00:00:00 2001 From: Joshua Hesketh Date: Tue, 21 May 2024 21:42:22 +1000 Subject: [PATCH 09/28] Comment out unsupported tests for now --- .../upstream/native_histograms.test.disabled | 305 +++++++++++------- 1 file changed, 183 insertions(+), 122 deletions(-) diff --git a/pkg/streamingpromql/testdata/upstream/native_histograms.test.disabled b/pkg/streamingpromql/testdata/upstream/native_histograms.test.disabled index 392294edd3c..3c76da796d1 100644 --- a/pkg/streamingpromql/testdata/upstream/native_histograms.test.disabled +++ b/pkg/streamingpromql/testdata/upstream/native_histograms.test.disabled @@ -10,20 +10,25 @@ load 5m eval instant at 5m empty_histogram {__name__="empty_histogram"} {{}} -eval instant at 5m histogram_count(empty_histogram) - {} 0 +# Unsupported by streaming engine. +# eval instant at 5m histogram_count(empty_histogram) +# {} 0 -eval instant at 5m histogram_sum(empty_histogram) - {} 0 +# Unsupported by streaming engine. +# eval instant at 5m histogram_sum(empty_histogram) +# {} 0 -eval instant at 5m histogram_avg(empty_histogram) - {} NaN +# Unsupported by streaming engine. +# eval instant at 5m histogram_avg(empty_histogram) +# {} NaN -eval instant at 5m histogram_fraction(-Inf, +Inf, empty_histogram) - {} NaN +# Unsupported by streaming engine. +# eval instant at 5m histogram_fraction(-Inf, +Inf, empty_histogram) +# {} NaN -eval instant at 5m histogram_fraction(0, 8, empty_histogram) - {} NaN +# Unsupported by streaming engine. +# eval instant at 5m histogram_fraction(0, 8, empty_histogram) +# {} NaN @@ -32,28 +37,34 @@ load 5m single_histogram {{schema:0 sum:5 count:4 buckets:[1 2 1]}} # histogram_count extracts the count property from the histogram. -eval instant at 5m histogram_count(single_histogram) - {} 4 +# Unsupported by streaming engine. +# eval instant at 5m histogram_count(single_histogram) +# {} 4 # histogram_sum extracts the sum property from the histogram. -eval instant at 5m histogram_sum(single_histogram) - {} 5 +# Unsupported by streaming engine. +# eval instant at 5m histogram_sum(single_histogram) +# {} 5 # histogram_avg calculates the average from sum and count properties. -eval instant at 5m histogram_avg(single_histogram) - {} 1.25 +# Unsupported by streaming engine. +# eval instant at 5m histogram_avg(single_histogram) +# {} 1.25 # We expect half of the values to fall in the range 1 < x <= 2. -eval instant at 5m histogram_fraction(1, 2, single_histogram) - {} 0.5 +# Unsupported by streaming engine. +# eval instant at 5m histogram_fraction(1, 2, single_histogram) +# {} 0.5 # We expect all values to fall in the range 0 < x <= 8. -eval instant at 5m histogram_fraction(0, 8, single_histogram) - {} 1 +# Unsupported by streaming engine. +# eval instant at 5m histogram_fraction(0, 8, single_histogram) +# {} 1 # Median is 1.5 due to linear estimation of the midpoint of the middle bucket, whose values are within range 1 < x <= 2. -eval instant at 5m histogram_quantile(0.5, single_histogram) - {} 1.5 +# Unsupported by streaming engine. +# eval instant at 5m histogram_quantile(0.5, single_histogram) +# {} 1.5 @@ -61,37 +72,47 @@ eval instant at 5m histogram_quantile(0.5, single_histogram) load 5m multi_histogram {{schema:0 sum:5 count:4 buckets:[1 2 1]}}x10 -eval instant at 5m histogram_count(multi_histogram) - {} 4 +# Unsupported by streaming engine. +# eval instant at 5m histogram_count(multi_histogram) +# {} 4 -eval instant at 5m histogram_sum(multi_histogram) - {} 5 +# Unsupported by streaming engine. +# eval instant at 5m histogram_sum(multi_histogram) +# {} 5 -eval instant at 5m histogram_avg(multi_histogram) - {} 1.25 +# Unsupported by streaming engine. +# eval instant at 5m histogram_avg(multi_histogram) +# {} 1.25 -eval instant at 5m histogram_fraction(1, 2, multi_histogram) - {} 0.5 +# Unsupported by streaming engine. +# eval instant at 5m histogram_fraction(1, 2, multi_histogram) +# {} 0.5 -eval instant at 5m histogram_quantile(0.5, multi_histogram) - {} 1.5 +# Unsupported by streaming engine. +# eval instant at 5m histogram_quantile(0.5, multi_histogram) +# {} 1.5 # Each entry should look the same as the first. -eval instant at 50m histogram_count(multi_histogram) - {} 4 +# Unsupported by streaming engine. +# eval instant at 50m histogram_count(multi_histogram) +# {} 4 -eval instant at 50m histogram_sum(multi_histogram) - {} 5 +# Unsupported by streaming engine. +# eval instant at 50m histogram_sum(multi_histogram) +# {} 5 -eval instant at 50m histogram_avg(multi_histogram) - {} 1.25 +# Unsupported by streaming engine. +# eval instant at 50m histogram_avg(multi_histogram) +# {} 1.25 -eval instant at 50m histogram_fraction(1, 2, multi_histogram) - {} 0.5 +# Unsupported by streaming engine. +# eval instant at 50m histogram_fraction(1, 2, multi_histogram) +# {} 0.5 -eval instant at 50m histogram_quantile(0.5, multi_histogram) - {} 1.5 +# Unsupported by streaming engine. +# eval instant at 50m histogram_quantile(0.5, multi_histogram) +# {} 1.5 @@ -101,49 +122,62 @@ eval instant at 50m histogram_quantile(0.5, multi_histogram) load 5m incr_histogram {{schema:0 sum:4 count:4 buckets:[1 2 1]}}+{{sum:2 count:1 buckets:[1] offset:1}}x10 -eval instant at 5m histogram_count(incr_histogram) - {} 5 +# Unsupported by streaming engine. +# eval instant at 5m histogram_count(incr_histogram) +# {} 5 -eval instant at 5m histogram_sum(incr_histogram) - {} 6 +# Unsupported by streaming engine. +# eval instant at 5m histogram_sum(incr_histogram) +# {} 6 -eval instant at 5m histogram_avg(incr_histogram) - {} 1.2 +# Unsupported by streaming engine. +# eval instant at 5m histogram_avg(incr_histogram) +# {} 1.2 # We expect 3/5ths of the values to fall in the range 1 < x <= 2. -eval instant at 5m histogram_fraction(1, 2, incr_histogram) - {} 0.6 +# Unsupported by streaming engine. +# eval instant at 5m histogram_fraction(1, 2, incr_histogram) +# {} 0.6 -eval instant at 5m histogram_quantile(0.5, incr_histogram) - {} 1.5 +# Unsupported by streaming engine. +# eval instant at 5m histogram_quantile(0.5, incr_histogram) +# {} 1.5 -eval instant at 50m incr_histogram - {__name__="incr_histogram"} {{count:14 sum:24 buckets:[1 12 1]}} +# Unsupported by streaming engine. +# eval instant at 50m incr_histogram +# {__name__="incr_histogram"} {{count:14 sum:24 buckets:[1 12 1]}} -eval instant at 50m histogram_count(incr_histogram) - {} 14 +# Unsupported by streaming engine. +# eval instant at 50m histogram_count(incr_histogram) +# {} 14 -eval instant at 50m histogram_sum(incr_histogram) - {} 24 +# Unsupported by streaming engine. +# eval instant at 50m histogram_sum(incr_histogram) +# {} 24 -eval instant at 50m histogram_avg(incr_histogram) - {} 1.7142857142857142 +# Unsupported by streaming engine. +# eval instant at 50m histogram_avg(incr_histogram) +# {} 1.7142857142857142 # We expect 12/14ths of the values to fall in the range 1 < x <= 2. -eval instant at 50m histogram_fraction(1, 2, incr_histogram) - {} 0.8571428571428571 +# Unsupported by streaming engine. +# eval instant at 50m histogram_fraction(1, 2, incr_histogram) +# {} 0.8571428571428571 -eval instant at 50m histogram_quantile(0.5, incr_histogram) - {} 1.5 +# Unsupported by streaming engine. +# eval instant at 50m histogram_quantile(0.5, incr_histogram) +# {} 1.5 # Per-second average rate of increase should be 1/(5*60) for count and buckets, then 2/(5*60) for sum. -eval instant at 50m rate(incr_histogram[5m]) - {} {{count:0.0033333333333333335 sum:0.006666666666666667 offset:1 buckets:[0.0033333333333333335]}} +# Unsupported by streaming engine. +# eval instant at 50m rate(incr_histogram[5m]) +# {} {{count:0.0033333333333333335 sum:0.006666666666666667 offset:1 buckets:[0.0033333333333333335]}} # Calculate the 50th percentile of observations over the last 10m. -eval instant at 50m histogram_quantile(0.5, rate(incr_histogram[10m])) - {} 1.5 +# Unsupported by streaming engine. +# eval instant at 50m histogram_quantile(0.5, rate(incr_histogram[10m])) +# {} 1.5 @@ -155,21 +189,26 @@ eval instant at 50m histogram_quantile(0.5, rate(incr_histogram[10m])) load 5m low_res_histogram {{schema:-1 sum:4 count:1 buckets:[1] offset:1}}+{{schema:0 sum:4 count:4 buckets:[2 2] offset:1}}x1 -eval instant at 5m low_res_histogram - {__name__="low_res_histogram"} {{schema:-1 count:5 sum:8 offset:1 buckets:[5]}} +# Unsupported by streaming engine. +# eval instant at 5m low_res_histogram +# {__name__="low_res_histogram"} {{schema:-1 count:5 sum:8 offset:1 buckets:[5]}} -eval instant at 5m histogram_count(low_res_histogram) - {} 5 +# Unsupported by streaming engine. +# eval instant at 5m histogram_count(low_res_histogram) +# {} 5 -eval instant at 5m histogram_sum(low_res_histogram) - {} 8 +# Unsupported by streaming engine. +# eval instant at 5m histogram_sum(low_res_histogram) +# {} 8 -eval instant at 5m histogram_avg(low_res_histogram) - {} 1.6 +# Unsupported by streaming engine. +# eval instant at 5m histogram_avg(low_res_histogram) +# {} 1.6 # We expect all values to fall into the lower-resolution bucket with the range 1 < x <= 4. -eval instant at 5m histogram_fraction(1, 4, low_res_histogram) - {} 1 +# Unsupported by streaming engine. +# eval instant at 5m histogram_fraction(1, 4, low_res_histogram) +# {} 1 @@ -178,24 +217,29 @@ eval instant at 5m histogram_fraction(1, 4, low_res_histogram) load 5m single_zero_histogram {{schema:0 z_bucket:1 z_bucket_w:0.5 sum:0.25 count:1}} -eval instant at 5m histogram_count(single_zero_histogram) - {} 1 +# Unsupported by streaming engine. +# eval instant at 5m histogram_count(single_zero_histogram) +# {} 1 -eval instant at 5m histogram_sum(single_zero_histogram) - {} 0.25 +# Unsupported by streaming engine. +# eval instant at 5m histogram_sum(single_zero_histogram) +# {} 0.25 -eval instant at 5m histogram_avg(single_zero_histogram) - {} 0.25 +# Unsupported by streaming engine. +# eval instant at 5m histogram_avg(single_zero_histogram) +# {} 0.25 # When only the zero bucket is populated, or there are negative buckets, the distribution is assumed to be equally # distributed around zero; i.e. that there are an equal number of positive and negative observations. Therefore the # entire distribution must lie within the full range of the zero bucket, in this case: -0.5 < x <= +0.5. -eval instant at 5m histogram_fraction(-0.5, 0.5, single_zero_histogram) - {} 1 +# Unsupported by streaming engine. +# eval instant at 5m histogram_fraction(-0.5, 0.5, single_zero_histogram) +# {} 1 # Half of the observations are estimated to be zero, as this is the midpoint between -0.5 and +0.5. -eval instant at 5m histogram_quantile(0.5, single_zero_histogram) - {} 0 +# Unsupported by streaming engine. +# eval instant at 5m histogram_quantile(0.5, single_zero_histogram) +# {} 0 @@ -203,21 +247,26 @@ eval instant at 5m histogram_quantile(0.5, single_zero_histogram) load 5m negative_histogram {{schema:0 sum:-5 count:4 n_buckets:[1 2 1]}} -eval instant at 5m histogram_count(negative_histogram) - {} 4 +# Unsupported by streaming engine. +# eval instant at 5m histogram_count(negative_histogram) +# {} 4 -eval instant at 5m histogram_sum(negative_histogram) - {} -5 +# Unsupported by streaming engine. +# eval instant at 5m histogram_sum(negative_histogram) +# {} -5 -eval instant at 5m histogram_avg(negative_histogram) - {} -1.25 +# Unsupported by streaming engine. +# eval instant at 5m histogram_avg(negative_histogram) +# {} -1.25 # We expect half of the values to fall in the range -2 < x <= -1. -eval instant at 5m histogram_fraction(-2, -1, negative_histogram) - {} 0.5 +# Unsupported by streaming engine. +# eval instant at 5m histogram_fraction(-2, -1, negative_histogram) +# {} 0.5 -eval instant at 5m histogram_quantile(0.5, negative_histogram) - {} -1.5 +# Unsupported by streaming engine. +# eval instant at 5m histogram_quantile(0.5, negative_histogram) +# {} -1.5 @@ -226,20 +275,25 @@ load 5m two_samples_histogram {{schema:0 sum:4 count:4 buckets:[1 2 1]}} {{schema:0 sum:-4 count:4 n_buckets:[1 2 1]}} # We expect to see the newest sample. -eval instant at 10m histogram_count(two_samples_histogram) - {} 4 +# Unsupported by streaming engine. +# eval instant at 10m histogram_count(two_samples_histogram) +# {} 4 -eval instant at 10m histogram_sum(two_samples_histogram) - {} -4 +# Unsupported by streaming engine. +# eval instant at 10m histogram_sum(two_samples_histogram) +# {} -4 -eval instant at 10m histogram_avg(two_samples_histogram) - {} -1 +# Unsupported by streaming engine. +# eval instant at 10m histogram_avg(two_samples_histogram) +# {} -1 -eval instant at 10m histogram_fraction(-2, -1, two_samples_histogram) - {} 0.5 +# Unsupported by streaming engine. +# eval instant at 10m histogram_fraction(-2, -1, two_samples_histogram) +# {} 0.5 -eval instant at 10m histogram_quantile(0.5, two_samples_histogram) - {} -1.5 +# Unsupported by streaming engine. +# eval instant at 10m histogram_quantile(0.5, two_samples_histogram) +# {} -1.5 @@ -247,30 +301,37 @@ eval instant at 10m histogram_quantile(0.5, two_samples_histogram) load 5m balanced_histogram {{schema:0 sum:4 count:4 buckets:[1 2 1]}}+{{schema:0 sum:-4 count:4 n_buckets:[1 2 1]}}x1 -eval instant at 5m histogram_count(balanced_histogram) - {} 8 +# Unsupported by streaming engine. +# eval instant at 5m histogram_count(balanced_histogram) +# {} 8 -eval instant at 5m histogram_sum(balanced_histogram) - {} 0 +# Unsupported by streaming engine. +# eval instant at 5m histogram_sum(balanced_histogram) +# {} 0 -eval instant at 5m histogram_avg(balanced_histogram) - {} 0 +# Unsupported by streaming engine. +# eval instant at 5m histogram_avg(balanced_histogram) +# {} 0 -eval instant at 5m histogram_fraction(0, 4, balanced_histogram) - {} 0.5 +# Unsupported by streaming engine. +# eval instant at 5m histogram_fraction(0, 4, balanced_histogram) +# {} 0.5 # If the quantile happens to be located in a span of empty buckets, the actually returned value is the lower bound of # the first populated bucket after the span of empty buckets. -eval instant at 5m histogram_quantile(0.5, balanced_histogram) - {} 0.5 +# Unsupported by streaming engine. +# eval instant at 5m histogram_quantile(0.5, balanced_histogram) +# {} 0.5 # Add histogram to test sum(last_over_time) regression load 5m incr_sum_histogram{number="1"} {{schema:0 sum:0 count:0 buckets:[1]}}+{{schema:0 sum:1 count:1 buckets:[1]}}x10 incr_sum_histogram{number="2"} {{schema:0 sum:0 count:0 buckets:[1]}}+{{schema:0 sum:2 count:1 buckets:[1]}}x10 -eval instant at 50m histogram_sum(sum(incr_sum_histogram)) - {} 30 +# Unsupported by streaming engine. +# eval instant at 50m histogram_sum(sum(incr_sum_histogram)) +# {} 30 -eval instant at 50m histogram_sum(sum(last_over_time(incr_sum_histogram[5m]))) - {} 30 +# Unsupported by streaming engine. +# eval instant at 50m histogram_sum(sum(last_over_time(incr_sum_histogram[5m]))) +# {} 30 From 84be678c184066da7a608578933d339b04d66971 Mon Sep 17 00:00:00 2001 From: Joshua Hesketh Date: Tue, 21 May 2024 21:42:37 +1000 Subject: [PATCH 10/28] Enable upstream native_histogram test file --- .../{native_histograms.test.disabled => native_histograms.test} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename pkg/streamingpromql/testdata/upstream/{native_histograms.test.disabled => native_histograms.test} (100%) diff --git a/pkg/streamingpromql/testdata/upstream/native_histograms.test.disabled b/pkg/streamingpromql/testdata/upstream/native_histograms.test similarity index 100% rename from pkg/streamingpromql/testdata/upstream/native_histograms.test.disabled rename to pkg/streamingpromql/testdata/upstream/native_histograms.test From d67498bd10a3b94588664e0f2e94f5634f90d6dc Mon Sep 17 00:00:00 2001 From: Joshua Hesketh Date: Tue, 21 May 2024 21:43:56 +1000 Subject: [PATCH 11/28] Remove duplicate test --- pkg/streamingpromql/testdata/ours/native_histograms.test | 7 ------- 1 file changed, 7 deletions(-) diff --git a/pkg/streamingpromql/testdata/ours/native_histograms.test b/pkg/streamingpromql/testdata/ours/native_histograms.test index 3b9c08d80e1..9466997c254 100644 --- a/pkg/streamingpromql/testdata/ours/native_histograms.test +++ b/pkg/streamingpromql/testdata/ours/native_histograms.test @@ -3,13 +3,6 @@ # Provenance-includes-license: Apache-2.0 # Provenance-includes-copyright: The Prometheus Authors -# Minimal valid case: an empty histogram. -load 5m - empty_histogram {{}} - -eval instant at 5m empty_histogram - {__name__="empty_histogram"} {{}} - # buckets:[1 2 1] means 1 observation in the 1st bucket, 2 observations in the 2nd and 1 observation in the 3rd (total 4). load 5m single_histogram {{schema:0 sum:5 count:4 buckets:[1 2 1]}} From 229b30ac89ef269e427d6a49345d5398a139294b Mon Sep 17 00:00:00 2001 From: Joshua Hesketh Date: Fri, 24 May 2024 18:02:21 +1000 Subject: [PATCH 12/28] Fix FloatHistogram selector --- pkg/streamingpromql/operator/instant_vector_selector.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/streamingpromql/operator/instant_vector_selector.go b/pkg/streamingpromql/operator/instant_vector_selector.go index 633c9218a72..d4903272bb4 100644 --- a/pkg/streamingpromql/operator/instant_vector_selector.go +++ b/pkg/streamingpromql/operator/instant_vector_selector.go @@ -68,7 +68,7 @@ func (v *InstantVectorSelector) NextSeries(_ context.Context) (InstantVectorSeri } case chunkenc.ValFloat: t, f = v.memoizedIterator.At() - case chunkenc.ValHistogram: + case chunkenc.ValHistogram, chunkenc.ValFloatHistogram: t, h = v.memoizedIterator.AtFloatHistogram() default: return InstantVectorSeriesData{}, fmt.Errorf("streaming PromQL engine: unknown value type %s", valueType.String()) From 7438138e1b90c9180bb805544d5ce9f4b0e05eaa Mon Sep 17 00:00:00 2001 From: Joshua Hesketh Date: Fri, 24 May 2024 18:02:49 +1000 Subject: [PATCH 13/28] Add range test --- pkg/streamingpromql/testdata/ours/native_histograms.test | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pkg/streamingpromql/testdata/ours/native_histograms.test b/pkg/streamingpromql/testdata/ours/native_histograms.test index 9466997c254..d1532d46d6b 100644 --- a/pkg/streamingpromql/testdata/ours/native_histograms.test +++ b/pkg/streamingpromql/testdata/ours/native_histograms.test @@ -9,3 +9,8 @@ load 5m eval instant at 5m single_histogram {__name__="single_histogram"} {{count:4 sum:5 buckets:[1 2 1]}} + +eval range from 0 to 4m step 1m single_histogram + {__name__="single_histogram"} {{schema:0 sum:5 count:4 buckets:[1 2 1]}}x4 + +clear From 999d860cb0116d36bb844bc3e215d6f7399fa357 Mon Sep 17 00:00:00 2001 From: Joshua Hesketh Date: Fri, 24 May 2024 18:03:04 +1000 Subject: [PATCH 14/28] Add test for mixed floats+histograms --- pkg/streamingpromql/testdata/ours/native_histograms.test | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/pkg/streamingpromql/testdata/ours/native_histograms.test b/pkg/streamingpromql/testdata/ours/native_histograms.test index d1532d46d6b..33594092d18 100644 --- a/pkg/streamingpromql/testdata/ours/native_histograms.test +++ b/pkg/streamingpromql/testdata/ours/native_histograms.test @@ -14,3 +14,10 @@ eval range from 0 to 4m step 1m single_histogram {__name__="single_histogram"} {{schema:0 sum:5 count:4 buckets:[1 2 1]}}x4 clear + +# Test metric with mixed floats and histograms +load 1m + mixed_metric 1 2 3 {{schema:0 sum:5 count:4 buckets:[1 2 1]}} {{schema:0 sum:8 count:6 buckets:[1 4 1]}} + +eval range from 0 to 4m step 1m mixed_metric + {__name__="mixed_metric"} 1 2 3 {{count:4 sum:5 buckets:[1 2 1]}} {{count:6 sum:8 buckets:[1 4 1]}} From 4d50e3ca2a489bd1ba4e0ce8da94ad314e877f8f Mon Sep 17 00:00:00 2001 From: Joshua Hesketh Date: Fri, 24 May 2024 18:03:17 +1000 Subject: [PATCH 15/28] Re-enable supported upstream tests --- .../testdata/upstream/native_histograms.test | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/pkg/streamingpromql/testdata/upstream/native_histograms.test b/pkg/streamingpromql/testdata/upstream/native_histograms.test index 3c76da796d1..ff107a7c20c 100644 --- a/pkg/streamingpromql/testdata/upstream/native_histograms.test +++ b/pkg/streamingpromql/testdata/upstream/native_histograms.test @@ -144,9 +144,8 @@ load 5m # {} 1.5 -# Unsupported by streaming engine. -# eval instant at 50m incr_histogram -# {__name__="incr_histogram"} {{count:14 sum:24 buckets:[1 12 1]}} +eval instant at 50m incr_histogram + {__name__="incr_histogram"} {{count:14 sum:24 buckets:[1 12 1]}} # Unsupported by streaming engine. # eval instant at 50m histogram_count(incr_histogram) @@ -189,9 +188,8 @@ load 5m load 5m low_res_histogram {{schema:-1 sum:4 count:1 buckets:[1] offset:1}}+{{schema:0 sum:4 count:4 buckets:[2 2] offset:1}}x1 -# Unsupported by streaming engine. -# eval instant at 5m low_res_histogram -# {__name__="low_res_histogram"} {{schema:-1 count:5 sum:8 offset:1 buckets:[5]}} +eval instant at 5m low_res_histogram + {__name__="low_res_histogram"} {{schema:-1 count:5 sum:8 offset:1 buckets:[5]}} # Unsupported by streaming engine. # eval instant at 5m histogram_count(low_res_histogram) From 91ba5c14709348ef5a64a6f1b2fb2caf224fd6a7 Mon Sep 17 00:00:00 2001 From: Joshua Hesketh Date: Mon, 27 May 2024 13:55:31 +1000 Subject: [PATCH 16/28] Load benchmark samples in batches to avoid OOM'ing --- pkg/streamingpromql/benchmarks/ingester.go | 50 ++++++++++++++-------- 1 file changed, 33 insertions(+), 17 deletions(-) diff --git a/pkg/streamingpromql/benchmarks/ingester.go b/pkg/streamingpromql/benchmarks/ingester.go index 61b5621d942..6bfe89b3551 100644 --- a/pkg/streamingpromql/benchmarks/ingester.go +++ b/pkg/streamingpromql/benchmarks/ingester.go @@ -23,6 +23,7 @@ import ( "github.com/grafana/dskit/ring" "github.com/grafana/dskit/services" "github.com/grafana/dskit/user" + "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "google.golang.org/grpc" @@ -64,6 +65,7 @@ func startBenchmarkIngester(rootDataDir string) (*ingester.Ingester, string, fun limits := defaultLimitsTestConfig() limits.NativeHistogramsIngestionEnabled = true + limits.OutOfOrderTimeWindow = model.Duration(time.Duration(NumIntervals+1) * interval) overrides, err := validation.NewOverrides(limits, nil) if err != nil { @@ -205,29 +207,43 @@ func pushTestData(ing *ingester.Ingester, metricSizes []int) error { } ctx := user.InjectOrgID(context.Background(), UserID) - req := &mimirpb.WriteRequest{ - Timeseries: make([]mimirpb.PreallocTimeseries, len(metrics)), - } - for i, m := range metrics { - series := mimirpb.PreallocTimeseries{TimeSeries: &mimirpb.TimeSeries{ - Labels: mimirpb.FromLabelsToLabelAdapters(m), - Samples: make([]mimirpb.Sample, NumIntervals), - }} + // Send the samples in batches to reduce total startup memory usage + numBatches := 10 + batchSize := int(math.Ceil(float64(totalMetrics) / float64(numBatches))) - for s := 0; s < NumIntervals; s++ { - series.Samples[s].TimestampMs = int64(s) * interval.Milliseconds() - series.Samples[s].Value = float64(s) + float64(i)/float64(len(metrics)) + for i := 0; i < totalMetrics; i += batchSize { + end := i + batchSize + if end > totalMetrics { + end = totalMetrics } + batch := metrics[i:end] - req.Timeseries[i] = series - } + // Create a request per batch + req := &mimirpb.WriteRequest{ + Timeseries: make([]mimirpb.PreallocTimeseries, len(batch)), + } - if _, err := ing.Push(ctx, req); err != nil { - return fmt.Errorf("failed to push samples to ingester: %w", err) - } + for j, m := range batch { + series := mimirpb.PreallocTimeseries{TimeSeries: &mimirpb.TimeSeries{ + Labels: mimirpb.FromLabelsToLabelAdapters(m), + Samples: make([]mimirpb.Sample, NumIntervals), + }} + + for s := 0; s < NumIntervals; s++ { + series.Samples[s].TimestampMs = int64(s) * interval.Milliseconds() + series.Samples[s].Value = float64(s) + float64(i+j)/float64(len(batch)) + } - ing.Flush() + req.Timeseries[j] = series + } + + if _, err := ing.Push(ctx, req); err != nil { + return fmt.Errorf("failed to push samples to ingester: %w", err) + } + + ing.Flush() + } return nil } From aeb1b537e69fd3ee231f72bee805eb06841ed7f1 Mon Sep 17 00:00:00 2001 From: Joshua Hesketh Date: Mon, 27 May 2024 14:55:19 +1000 Subject: [PATCH 17/28] Revert "Load benchmark samples in batches to avoid OOM'ing" We can't use OOO samples with native histograms, so instead of batching by series we'll batch by samples This reverts commit 91ba5c14709348ef5a64a6f1b2fb2caf224fd6a7. --- pkg/streamingpromql/benchmarks/ingester.go | 50 ++++++++-------------- 1 file changed, 17 insertions(+), 33 deletions(-) diff --git a/pkg/streamingpromql/benchmarks/ingester.go b/pkg/streamingpromql/benchmarks/ingester.go index 6bfe89b3551..61b5621d942 100644 --- a/pkg/streamingpromql/benchmarks/ingester.go +++ b/pkg/streamingpromql/benchmarks/ingester.go @@ -23,7 +23,6 @@ import ( "github.com/grafana/dskit/ring" "github.com/grafana/dskit/services" "github.com/grafana/dskit/user" - "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "google.golang.org/grpc" @@ -65,7 +64,6 @@ func startBenchmarkIngester(rootDataDir string) (*ingester.Ingester, string, fun limits := defaultLimitsTestConfig() limits.NativeHistogramsIngestionEnabled = true - limits.OutOfOrderTimeWindow = model.Duration(time.Duration(NumIntervals+1) * interval) overrides, err := validation.NewOverrides(limits, nil) if err != nil { @@ -207,43 +205,29 @@ func pushTestData(ing *ingester.Ingester, metricSizes []int) error { } ctx := user.InjectOrgID(context.Background(), UserID) + req := &mimirpb.WriteRequest{ + Timeseries: make([]mimirpb.PreallocTimeseries, len(metrics)), + } - // Send the samples in batches to reduce total startup memory usage - numBatches := 10 - batchSize := int(math.Ceil(float64(totalMetrics) / float64(numBatches))) - - for i := 0; i < totalMetrics; i += batchSize { - end := i + batchSize - if end > totalMetrics { - end = totalMetrics - } - batch := metrics[i:end] - - // Create a request per batch - req := &mimirpb.WriteRequest{ - Timeseries: make([]mimirpb.PreallocTimeseries, len(batch)), - } - - for j, m := range batch { - series := mimirpb.PreallocTimeseries{TimeSeries: &mimirpb.TimeSeries{ - Labels: mimirpb.FromLabelsToLabelAdapters(m), - Samples: make([]mimirpb.Sample, NumIntervals), - }} - - for s := 0; s < NumIntervals; s++ { - series.Samples[s].TimestampMs = int64(s) * interval.Milliseconds() - series.Samples[s].Value = float64(s) + float64(i+j)/float64(len(batch)) - } + for i, m := range metrics { + series := mimirpb.PreallocTimeseries{TimeSeries: &mimirpb.TimeSeries{ + Labels: mimirpb.FromLabelsToLabelAdapters(m), + Samples: make([]mimirpb.Sample, NumIntervals), + }} - req.Timeseries[j] = series + for s := 0; s < NumIntervals; s++ { + series.Samples[s].TimestampMs = int64(s) * interval.Milliseconds() + series.Samples[s].Value = float64(s) + float64(i)/float64(len(metrics)) } - if _, err := ing.Push(ctx, req); err != nil { - return fmt.Errorf("failed to push samples to ingester: %w", err) - } + req.Timeseries[i] = series + } - ing.Flush() + if _, err := ing.Push(ctx, req); err != nil { + return fmt.Errorf("failed to push samples to ingester: %w", err) } + ing.Flush() + return nil } From 7b79addc4a95aea4daefb1fe3e81b9a6e9ced6da Mon Sep 17 00:00:00 2001 From: Joshua Hesketh Date: Mon, 27 May 2024 15:18:05 +1000 Subject: [PATCH 18/28] Batch by samples instead of series --- pkg/streamingpromql/benchmarks/ingester.go | 43 +++++++++++++--------- 1 file changed, 26 insertions(+), 17 deletions(-) diff --git a/pkg/streamingpromql/benchmarks/ingester.go b/pkg/streamingpromql/benchmarks/ingester.go index 61b5621d942..de202e18e55 100644 --- a/pkg/streamingpromql/benchmarks/ingester.go +++ b/pkg/streamingpromql/benchmarks/ingester.go @@ -205,29 +205,38 @@ func pushTestData(ing *ingester.Ingester, metricSizes []int) error { } ctx := user.InjectOrgID(context.Background(), UserID) - req := &mimirpb.WriteRequest{ - Timeseries: make([]mimirpb.PreallocTimeseries, len(metrics)), - } - for i, m := range metrics { - series := mimirpb.PreallocTimeseries{TimeSeries: &mimirpb.TimeSeries{ - Labels: mimirpb.FromLabelsToLabelAdapters(m), - Samples: make([]mimirpb.Sample, NumIntervals), - }} + // Batch samples into separate requests + batchSize := 100 + for i := 0; i < NumIntervals; i += batchSize { + end := i + batchSize + if end > NumIntervals { + end = NumIntervals + } - for s := 0; s < NumIntervals; s++ { - series.Samples[s].TimestampMs = int64(s) * interval.Milliseconds() - series.Samples[s].Value = float64(s) + float64(i)/float64(len(metrics)) + req := &mimirpb.WriteRequest{ + Timeseries: make([]mimirpb.PreallocTimeseries, len(metrics)), } - req.Timeseries[i] = series - } + for j, m := range metrics { + series := mimirpb.PreallocTimeseries{TimeSeries: &mimirpb.TimeSeries{ + Labels: mimirpb.FromLabelsToLabelAdapters(m.Copy()), + Samples: make([]mimirpb.Sample, end-i), + }} - if _, err := ing.Push(ctx, req); err != nil { - return fmt.Errorf("failed to push samples to ingester: %w", err) - } + for s := i; s < end; s++ { + series.Samples[s-i].TimestampMs = int64(s) * interval.Milliseconds() + series.Samples[s-i].Value = float64(s) + float64(j)/float64(len(metrics)) + } - ing.Flush() + req.Timeseries[j] = series + } + + if _, err := ing.Push(ctx, req); err != nil { + return fmt.Errorf("failed to push samples to ingester: %w", err) + } + ing.Flush() + } return nil } From 1a38e40607b0dbb76a114169b5789bba1c32d64b Mon Sep 17 00:00:00 2001 From: Joshua Hesketh Date: Mon, 27 May 2024 15:34:19 +1000 Subject: [PATCH 19/28] Generate native histogram benchmark test data --- pkg/streamingpromql/benchmarks/ingester.go | 50 ++++++++++++++++------ 1 file changed, 38 insertions(+), 12 deletions(-) diff --git a/pkg/streamingpromql/benchmarks/ingester.go b/pkg/streamingpromql/benchmarks/ingester.go index de202e18e55..e9dfe9be8e5 100644 --- a/pkg/streamingpromql/benchmarks/ingester.go +++ b/pkg/streamingpromql/benchmarks/ingester.go @@ -14,6 +14,7 @@ import ( "net" "path/filepath" "strconv" + "strings" "time" "github.com/go-kit/log" @@ -174,7 +175,7 @@ func pushTestData(ing *ingester.Ingester, metricSizes []int) error { totalMetrics := 0 for _, size := range metricSizes { - totalMetrics += (2 + histogramBuckets + 1) * size // 2 non-histogram metrics + 5 metrics for histogram buckets + 1 metric for +Inf histogram bucket + totalMetrics += (2 + histogramBuckets + 1 + 1) * size // 2 non-histogram metrics + 5 metrics for histogram buckets + 1 metric for +Inf histogram bucket + 1 metric for native-histograms } metrics := make([]labels.Labels, 0, totalMetrics) @@ -183,6 +184,7 @@ func pushTestData(ing *ingester.Ingester, metricSizes []int) error { aName := "a_" + strconv.Itoa(size) bName := "b_" + strconv.Itoa(size) histogramName := "h_" + strconv.Itoa(size) + nativeHistogramName := "nh_" + strconv.Itoa(size) if size == 1 { // We don't want a "l" label on metrics with one series (some test cases rely on this label not being present). @@ -192,6 +194,7 @@ func pushTestData(ing *ingester.Ingester, metricSizes []int) error { metrics = append(metrics, labels.FromStrings("__name__", histogramName, "le", strconv.Itoa(le))) } metrics = append(metrics, labels.FromStrings("__name__", histogramName, "le", "+Inf")) + metrics = append(metrics, labels.FromStrings("__name__", nativeHistogramName)) } else { for i := 0; i < size; i++ { metrics = append(metrics, labels.FromStrings("__name__", aName, "l", strconv.Itoa(i))) @@ -200,6 +203,7 @@ func pushTestData(ing *ingester.Ingester, metricSizes []int) error { metrics = append(metrics, labels.FromStrings("__name__", histogramName, "l", strconv.Itoa(i), "le", strconv.Itoa(le))) } metrics = append(metrics, labels.FromStrings("__name__", histogramName, "l", strconv.Itoa(i), "le", "+Inf")) + metrics = append(metrics, labels.FromStrings("__name__", nativeHistogramName, "l", strconv.Itoa(i))) } } } @@ -219,19 +223,41 @@ func pushTestData(ing *ingester.Ingester, metricSizes []int) error { } for j, m := range metrics { - series := mimirpb.PreallocTimeseries{TimeSeries: &mimirpb.TimeSeries{ - Labels: mimirpb.FromLabelsToLabelAdapters(m.Copy()), - Samples: make([]mimirpb.Sample, end-i), - }} - - for s := i; s < end; s++ { - series.Samples[s-i].TimestampMs = int64(s) * interval.Milliseconds() - series.Samples[s-i].Value = float64(s) + float64(j)/float64(len(metrics)) - } + if strings.HasPrefix(m.Get("__name__"), "nh_") { + series := mimirpb.PreallocTimeseries{TimeSeries: &mimirpb.TimeSeries{ + Labels: mimirpb.FromLabelsToLabelAdapters(m.Copy()), + Histograms: make([]mimirpb.Histogram, end-i), + }} + + for s := i; s < end; s++ { + // TODO(jhesketh): Fix this with some better data + series.Histograms[s-i].Timestamp = int64(s) * interval.Milliseconds() + series.Histograms[s-i].Count = &mimirpb.Histogram_CountInt{CountInt: 12} + series.Histograms[s-i].ZeroCount = &mimirpb.Histogram_ZeroCountInt{ZeroCountInt: 2} + series.Histograms[s-i].ZeroThreshold = 0.001 + series.Histograms[s-i].Sum = 18.4 + series.Histograms[s-i].Schema = 0 + series.Histograms[s-i].NegativeSpans = []mimirpb.BucketSpan{{Offset: 0, Length: 2}, {Offset: 1, Length: 2}} + series.Histograms[s-i].NegativeDeltas = []int64{1, 1, -1, 0} + series.Histograms[s-i].PositiveSpans = []mimirpb.BucketSpan{{Offset: 0, Length: 2}, {Offset: 1, Length: 2}} + series.Histograms[s-i].PositiveDeltas = []int64{1, 1, -1, 0} + } - req.Timeseries[j] = series - } + req.Timeseries[j] = series + } else { + series := mimirpb.PreallocTimeseries{TimeSeries: &mimirpb.TimeSeries{ + Labels: mimirpb.FromLabelsToLabelAdapters(m.Copy()), + Samples: make([]mimirpb.Sample, end-i), + }} + for s := i; s < end; s++ { + series.Samples[s-i].TimestampMs = int64(s) * interval.Milliseconds() + series.Samples[s-i].Value = float64(s) + float64(j)/float64(len(metrics)) + } + + req.Timeseries[j] = series + } + } if _, err := ing.Push(ctx, req); err != nil { return fmt.Errorf("failed to push samples to ingester: %w", err) } From f8ae51456b2d14e7ded520d3ee5741e381cfb5ab Mon Sep 17 00:00:00 2001 From: Joshua Hesketh Date: Mon, 27 May 2024 15:34:34 +1000 Subject: [PATCH 20/28] Add native histogram benchmark test --- pkg/streamingpromql/benchmarks/benchmarks.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/streamingpromql/benchmarks/benchmarks.go b/pkg/streamingpromql/benchmarks/benchmarks.go index c94781ee7b9..38c22daab63 100644 --- a/pkg/streamingpromql/benchmarks/benchmarks.go +++ b/pkg/streamingpromql/benchmarks/benchmarks.go @@ -76,6 +76,10 @@ func TestCases(metricSizes []int) []BenchCase { { Expr: "a_X", }, + // Histogram retrieval + { + Expr: "nh_X", + }, // Range vector selector. { Expr: "a_X[1m]", From 063ed83acf93db2fe079d9d66f846b3ea31f36ee Mon Sep 17 00:00:00 2001 From: Joshua Hesketh Date: Mon, 27 May 2024 15:47:22 +1000 Subject: [PATCH 21/28] Move compare.sh next to other benchmark scripts --- tools/benchmark-query-engine/README.md | 2 +- .../streamingpromql => tools/benchmark-query-engine}/compare.sh | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) rename {pkg/streamingpromql => tools/benchmark-query-engine}/compare.sh (90%) diff --git a/tools/benchmark-query-engine/README.md b/tools/benchmark-query-engine/README.md index 96171d4d8f4..057f963d582 100644 --- a/tools/benchmark-query-engine/README.md +++ b/tools/benchmark-query-engine/README.md @@ -4,7 +4,7 @@ Each benchmark is run in a separate process to provide some kind of guarantee th An ingester is started in the `benchmark-query-engine` process (ie. not the benchmark process) to ensure the TSDB does not skew results. -Results from `benchmark-query-engine` can be summarised with `benchstat`, as well as [`compare.sh`](../../pkg/querier/engine/streaming/compare.sh). +Results from `benchmark-query-engine` can be summarised with `benchstat`, as well as [`compare.sh`](./compare.sh). Usage: diff --git a/pkg/streamingpromql/compare.sh b/tools/benchmark-query-engine/compare.sh similarity index 90% rename from pkg/streamingpromql/compare.sh rename to tools/benchmark-query-engine/compare.sh index 5c7cbbf345d..de21345e39d 100755 --- a/pkg/streamingpromql/compare.sh +++ b/tools/benchmark-query-engine/compare.sh @@ -4,7 +4,7 @@ set -euo pipefail -RESULTS_FILE="$1" # Should be the path to a file produced by a command like `go run ./tools/benchmark-query-engine -count=6 | tee output.txt` +RESULTS_FILE="$1" # Should be the path to a file produced by a command like `go run . -count=6 | tee output.txt` PROMETHEUS_RESULTS_FILE=$(mktemp /tmp/prometheus.XXXX) STREAMING_RESULTS_FILE=$(mktemp /tmp/streaming.XXXX) From 0cff32b5dad43af731d94e678c08db377b5bfc81 Mon Sep 17 00:00:00 2001 From: Joshua Hesketh Date: Mon, 27 May 2024 17:22:13 +1000 Subject: [PATCH 22/28] Fix comparison tests for when there are no floats --- pkg/streamingpromql/benchmarks/benchmarks.go | 3 +- .../benchmarks/comparison_test.go | 30 +++++++++++++++++-- 2 files changed, 30 insertions(+), 3 deletions(-) diff --git a/pkg/streamingpromql/benchmarks/benchmarks.go b/pkg/streamingpromql/benchmarks/benchmarks.go index 38c22daab63..7e84b54f4f0 100644 --- a/pkg/streamingpromql/benchmarks/benchmarks.go +++ b/pkg/streamingpromql/benchmarks/benchmarks.go @@ -69,7 +69,8 @@ func (c BenchCase) Run(ctx context.Context, t testing.TB, start, end time.Time, return res, qry.Close } -// These test cases are taken from https://github.com/prometheus/prometheus/blob/main/promql/bench_test.go. +// These test cases are taken from https://github.com/prometheus/prometheus/blob/main/promql/bench_test.go +// and enhanced/added to. func TestCases(metricSizes []int) []BenchCase { cases := []BenchCase{ // Plain retrieval. diff --git a/pkg/streamingpromql/benchmarks/comparison_test.go b/pkg/streamingpromql/benchmarks/comparison_test.go index 7d7d7e4dfa0..c0f44b33493 100644 --- a/pkg/streamingpromql/benchmarks/comparison_test.go +++ b/pkg/streamingpromql/benchmarks/comparison_test.go @@ -163,6 +163,24 @@ func TestBenchmarkSetup(t *testing.T) { } require.Equal(t, expectedPoints, series.Floats) + + // Check native histograms are set up correctly + query, err = streamingEngine.NewRangeQuery(ctx, q, nil, "nh_1", time.Unix(0, 0), time.Unix(int64(15*intervalSeconds), 0), interval) + require.NoError(t, err) + + t.Cleanup(query.Close) + result = query.Exec(ctx) + require.NoError(t, result.Err) + + matrix, err = result.Matrix() + require.NoError(t, err) + + require.Len(t, matrix, 1) + series = matrix[0] + require.Equal(t, labels.FromStrings("__name__", "nh_1"), series.Metric) + require.Len(t, series.Floats, 0) + require.Len(t, series.Histograms, 16) + // TODO(jhesketh): Maybe check a histogram is set up as expected } // Why do we do this rather than require.Equal(t, expected, actual)? @@ -194,7 +212,11 @@ func requireEqualResults(t testing.TB, expected, actual *promql.Result) { require.Equal(t, expectedSample.Metric, actualSample.Metric) require.Equal(t, expectedSample.T, actualSample.T) require.Equal(t, expectedSample.H, actualSample.H) - require.InEpsilon(t, expectedSample.F, actualSample.F, 1e-10) + if expectedSample.F == 0 { + require.Equal(t, expectedSample.F, actualSample.F) + } else { + require.InEpsilon(t, expectedSample.F, actualSample.F, 1e-10) + } } case parser.ValueTypeMatrix: expectedMatrix, err := expected.Matrix() @@ -214,7 +236,11 @@ func requireEqualResults(t testing.TB, expected, actual *promql.Result) { actualPoint := actualSeries.Floats[j] require.Equal(t, expectedPoint.T, actualPoint.T) - require.InEpsilonf(t, expectedPoint.F, actualPoint.F, 1e-10, "expected series %v to have points %v, but result is %v", expectedSeries.Metric.String(), expectedSeries.Floats, actualSeries.Floats) + if expectedPoint.F == 0 { + require.Equal(t, expectedPoint.F, actualPoint.F) + } else { + require.InEpsilonf(t, expectedPoint.F, actualPoint.F, 1e-10, "expected series %v to have points %v, but result is %v", expectedSeries.Metric.String(), expectedSeries.Floats, actualSeries.Floats) + } } } default: From b9adee0ed2bf30742de46065bd3d128ac9fb62a9 Mon Sep 17 00:00:00 2001 From: Joshua Hesketh Date: Mon, 27 May 2024 17:24:45 +1000 Subject: [PATCH 23/28] Add extra samples to the histogram tests --- pkg/streamingpromql/testdata/ours/native_histograms.test | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/streamingpromql/testdata/ours/native_histograms.test b/pkg/streamingpromql/testdata/ours/native_histograms.test index 33594092d18..6ada96ef656 100644 --- a/pkg/streamingpromql/testdata/ours/native_histograms.test +++ b/pkg/streamingpromql/testdata/ours/native_histograms.test @@ -5,13 +5,13 @@ # buckets:[1 2 1] means 1 observation in the 1st bucket, 2 observations in the 2nd and 1 observation in the 3rd (total 4). load 5m - single_histogram {{schema:0 sum:5 count:4 buckets:[1 2 1]}} + single_histogram {{schema:0 sum:5 count:4 buckets:[1 2 1]}} {{schema:0 sum:20 count:7 buckets:[9 10 1]}} eval instant at 5m single_histogram - {__name__="single_histogram"} {{count:4 sum:5 buckets:[1 2 1]}} + {__name__="single_histogram"} {{schema:0 sum:20 count:7 buckets:[9 10 1]}} -eval range from 0 to 4m step 1m single_histogram - {__name__="single_histogram"} {{schema:0 sum:5 count:4 buckets:[1 2 1]}}x4 +eval range from 0 to 5m step 1m single_histogram + {__name__="single_histogram"} {{schema:0 sum:5 count:4 buckets:[1 2 1]}} {{schema:0 sum:5 count:4 buckets:[1 2 1]}} {{schema:0 sum:5 count:4 buckets:[1 2 1]}} {{schema:0 sum:5 count:4 buckets:[1 2 1]}} {{schema:0 sum:5 count:4 buckets:[1 2 1]}} {{schema:0 sum:20 count:7 buckets:[9 10 1]}} clear From 7fbde860e9a521b05b70d3b1ed60a55211af661f Mon Sep 17 00:00:00 2001 From: Joshua Hesketh Date: Wed, 29 May 2024 13:35:50 +1000 Subject: [PATCH 24/28] Add note on batch size --- pkg/streamingpromql/benchmarks/ingester.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/streamingpromql/benchmarks/ingester.go b/pkg/streamingpromql/benchmarks/ingester.go index e9dfe9be8e5..05bceea5b3a 100644 --- a/pkg/streamingpromql/benchmarks/ingester.go +++ b/pkg/streamingpromql/benchmarks/ingester.go @@ -211,6 +211,9 @@ func pushTestData(ing *ingester.Ingester, metricSizes []int) error { ctx := user.InjectOrgID(context.Background(), UserID) // Batch samples into separate requests + // There is no precise science behind this number currently. + // A quick run locally found batching by 100 did not increase the loading time by any noticable amount. + // Additionally memory usage maxed about 4GB for the whole process. batchSize := 100 for i := 0; i < NumIntervals; i += batchSize { end := i + batchSize From 14ff8bd60ccc05a932e3272a9d55707a318094dc Mon Sep 17 00:00:00 2001 From: Joshua Hesketh Date: Wed, 29 May 2024 13:48:17 +1000 Subject: [PATCH 25/28] Add quick check that histograms are loaded --- pkg/streamingpromql/benchmarks/comparison_test.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pkg/streamingpromql/benchmarks/comparison_test.go b/pkg/streamingpromql/benchmarks/comparison_test.go index c0f44b33493..df5b2d38b59 100644 --- a/pkg/streamingpromql/benchmarks/comparison_test.go +++ b/pkg/streamingpromql/benchmarks/comparison_test.go @@ -180,7 +180,11 @@ func TestBenchmarkSetup(t *testing.T) { require.Equal(t, labels.FromStrings("__name__", "nh_1"), series.Metric) require.Len(t, series.Floats, 0) require.Len(t, series.Histograms, 16) - // TODO(jhesketh): Maybe check a histogram is set up as expected + + // Check one histogram point is as expected + require.Equal(t, int64(0), series.Histograms[0].T) + require.Equal(t, 12.0, series.Histograms[0].H.Count) + require.Equal(t, 18.4, series.Histograms[0].H.Sum) } // Why do we do this rather than require.Equal(t, expected, actual)? From a08733c3132b64027e6d3aaedda2dff41eee63b8 Mon Sep 17 00:00:00 2001 From: Joshua Hesketh Date: Wed, 29 May 2024 13:53:32 +1000 Subject: [PATCH 26/28] Use clearer variables for indexes --- pkg/streamingpromql/benchmarks/ingester.go | 42 +++++++++++----------- 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/pkg/streamingpromql/benchmarks/ingester.go b/pkg/streamingpromql/benchmarks/ingester.go index 05bceea5b3a..2938ce356c2 100644 --- a/pkg/streamingpromql/benchmarks/ingester.go +++ b/pkg/streamingpromql/benchmarks/ingester.go @@ -215,8 +215,8 @@ func pushTestData(ing *ingester.Ingester, metricSizes []int) error { // A quick run locally found batching by 100 did not increase the loading time by any noticable amount. // Additionally memory usage maxed about 4GB for the whole process. batchSize := 100 - for i := 0; i < NumIntervals; i += batchSize { - end := i + batchSize + for start := 0; start < NumIntervals; start += batchSize { + end := start + batchSize if end > NumIntervals { end = NumIntervals } @@ -225,40 +225,40 @@ func pushTestData(ing *ingester.Ingester, metricSizes []int) error { Timeseries: make([]mimirpb.PreallocTimeseries, len(metrics)), } - for j, m := range metrics { + for metricIdx, m := range metrics { if strings.HasPrefix(m.Get("__name__"), "nh_") { series := mimirpb.PreallocTimeseries{TimeSeries: &mimirpb.TimeSeries{ Labels: mimirpb.FromLabelsToLabelAdapters(m.Copy()), - Histograms: make([]mimirpb.Histogram, end-i), + Histograms: make([]mimirpb.Histogram, end-start), }} - for s := i; s < end; s++ { + for ts := start; ts < end; ts++ { // TODO(jhesketh): Fix this with some better data - series.Histograms[s-i].Timestamp = int64(s) * interval.Milliseconds() - series.Histograms[s-i].Count = &mimirpb.Histogram_CountInt{CountInt: 12} - series.Histograms[s-i].ZeroCount = &mimirpb.Histogram_ZeroCountInt{ZeroCountInt: 2} - series.Histograms[s-i].ZeroThreshold = 0.001 - series.Histograms[s-i].Sum = 18.4 - series.Histograms[s-i].Schema = 0 - series.Histograms[s-i].NegativeSpans = []mimirpb.BucketSpan{{Offset: 0, Length: 2}, {Offset: 1, Length: 2}} - series.Histograms[s-i].NegativeDeltas = []int64{1, 1, -1, 0} - series.Histograms[s-i].PositiveSpans = []mimirpb.BucketSpan{{Offset: 0, Length: 2}, {Offset: 1, Length: 2}} - series.Histograms[s-i].PositiveDeltas = []int64{1, 1, -1, 0} + series.Histograms[ts-start].Timestamp = int64(ts) * interval.Milliseconds() + series.Histograms[ts-start].Count = &mimirpb.Histogram_CountInt{CountInt: 12} + series.Histograms[ts-start].ZeroCount = &mimirpb.Histogram_ZeroCountInt{ZeroCountInt: 2} + series.Histograms[ts-start].ZeroThreshold = 0.001 + series.Histograms[ts-start].Sum = 18.4 + series.Histograms[ts-start].Schema = 0 + series.Histograms[ts-start].NegativeSpans = []mimirpb.BucketSpan{{Offset: 0, Length: 2}, {Offset: 1, Length: 2}} + series.Histograms[ts-start].NegativeDeltas = []int64{1, 1, -1, 0} + series.Histograms[ts-start].PositiveSpans = []mimirpb.BucketSpan{{Offset: 0, Length: 2}, {Offset: 1, Length: 2}} + series.Histograms[ts-start].PositiveDeltas = []int64{1, 1, -1, 0} } - req.Timeseries[j] = series + req.Timeseries[metricIdx] = series } else { series := mimirpb.PreallocTimeseries{TimeSeries: &mimirpb.TimeSeries{ Labels: mimirpb.FromLabelsToLabelAdapters(m.Copy()), - Samples: make([]mimirpb.Sample, end-i), + Samples: make([]mimirpb.Sample, end-start), }} - for s := i; s < end; s++ { - series.Samples[s-i].TimestampMs = int64(s) * interval.Milliseconds() - series.Samples[s-i].Value = float64(s) + float64(j)/float64(len(metrics)) + for ts := start; ts < end; ts++ { + series.Samples[ts-start].TimestampMs = int64(ts) * interval.Milliseconds() + series.Samples[ts-start].Value = float64(ts) + float64(metricIdx)/float64(len(metrics)) } - req.Timeseries[j] = series + req.Timeseries[metricIdx] = series } } if _, err := ing.Push(ctx, req); err != nil { From e94b5965a2eea77f4ade066a16b1ec3005b9ab14 Mon Sep 17 00:00:00 2001 From: Joshua Hesketh Date: Wed, 29 May 2024 13:54:10 +1000 Subject: [PATCH 27/28] Update changelog --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c330cbe84fe..d117dd26ef3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,7 +14,7 @@ * [FEATURE] Continuous-test: now runable as a module with `mimir -target=continuous-test`. #7747 * [FEATURE] Store-gateway: Allow specific tenants to be enabled or disabled via `-store-gateway.enabled-tenants` or `-store-gateway.disabled-tenants` CLI flags or their corresponding YAML settings. #7653 * [FEATURE] New `-.s3.bucket-lookup-type` flag configures lookup style type, used to access bucket in s3 compatible providers. #7684 -* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.promql-engine=streaming`. #7693 #7898 #7899 #8023 #8058 #8096 +* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.promql-engine=streaming`. #7693 #7898 #7899 #8023 #8058 #8096 #8121 * [FEATURE] New `/ingester/unregister-on-shutdown` HTTP endpoint allows dynamic access to ingesters' `-ingester.ring.unregister-on-shutdown` configuration. #7739 * [FEATURE] Server: added experimental [PROXY protocol support](https://www.haproxy.org/download/2.3/doc/proxy-protocol.txt). The PROXY protocol support can be enabled via `-server.proxy-protocol-enabled=true`. When enabled, the support is added both to HTTP and gRPC listening ports. #7698 * [FEATURE] mimirtool: Add `runtime-config verify` sub-command, for verifying Mimir runtime config files. #8123 From e8e5b24387fcaf4fed138d7b427900bf006cab4d Mon Sep 17 00:00:00 2001 From: Joshua Hesketh Date: Wed, 29 May 2024 14:09:36 +1000 Subject: [PATCH 28/28] Fix typo --- pkg/streamingpromql/benchmarks/ingester.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/streamingpromql/benchmarks/ingester.go b/pkg/streamingpromql/benchmarks/ingester.go index 2938ce356c2..5c6947c949e 100644 --- a/pkg/streamingpromql/benchmarks/ingester.go +++ b/pkg/streamingpromql/benchmarks/ingester.go @@ -212,7 +212,7 @@ func pushTestData(ing *ingester.Ingester, metricSizes []int) error { // Batch samples into separate requests // There is no precise science behind this number currently. - // A quick run locally found batching by 100 did not increase the loading time by any noticable amount. + // A quick run locally found batching by 100 did not increase the loading time by any noticeable amount. // Additionally memory usage maxed about 4GB for the whole process. batchSize := 100 for start := 0; start < NumIntervals; start += batchSize {