Skip to content

Commit

Permalink
Merge branch 'main' into charleskorn/mqe-functions
Browse files Browse the repository at this point in the history
  • Loading branch information
charleskorn authored Oct 14, 2024
2 parents 925e240 + dd18bcb commit 9742b34
Show file tree
Hide file tree
Showing 11 changed files with 87 additions and 142 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* `cortex_alertmanager_state_replication_failed_total`
* `cortex_alertmanager_alerts`
* `cortex_alertmanager_silences`
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #9367 #9368 #9398 #9399 #9403 #9417 #9418 #9419 #9420 #9482 #9504 #9505 #9507 #9518 #9531 #9532 #9533 #9553 #9558
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #9367 #9368 #9398 #9399 #9403 #9417 #9418 #9419 #9420 #9482 #9504 #9505 #9507 #9518 #9531 #9532 #9533 #9553 #9558 #9588
* [FEATURE] Query-frontend: added experimental configuration options `query-frontend.cache-errors` and `query-frontend.results-cache-ttl-for-errors` to allow non-transient responses to be cached. When set to `true` error responses from hitting limits or bad data are cached for a short TTL. #9028
* [FEATURE] gRPC: Support S2 compression. #9322
* `-alertmanager.alertmanager-client.grpc-compression=s2`
Expand Down
29 changes: 16 additions & 13 deletions pkg/streamingpromql/aggregations/aggregations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,11 @@ func TestAggregationGroupNativeHistogramSafety(t *testing.T) {
h1 := &histogram.FloatHistogram{Sum: 1}
h2 := &histogram.FloatHistogram{Sum: 2}
h3 := &histogram.FloatHistogram{Sum: 3}
h4 := &histogram.FloatHistogram{Sum: 4}
histograms = append(histograms, promql.HPoint{T: 0, H: h1})
histograms = append(histograms, promql.HPoint{T: 1, H: h2})
histograms = append(histograms, promql.HPoint{T: 2, H: h2}) // T=2 is a lookback and refers to the same histogram as T=1.
histograms = append(histograms, promql.HPoint{T: 4, H: h3})
histograms = append(histograms, promql.HPoint{T: 2, H: h3})
histograms = append(histograms, promql.HPoint{T: 4, H: h4})
series := types.InstantVectorSeriesData{Histograms: histograms}

require.NoError(t, group.AccumulateSeries(series, timeRange, memoryConsumptionTracker, nil))
Expand All @@ -49,24 +50,26 @@ func TestAggregationGroupNativeHistogramSafety(t *testing.T) {
// Second series: all histograms that are not retained should be nil-ed out after returning.
histograms, err = types.HPointSlicePool.Get(5, memoryConsumptionTracker)
require.NoError(t, err)
h4 := &histogram.FloatHistogram{Sum: 4}
h5 := &histogram.FloatHistogram{Sum: 5}
h6 := &histogram.FloatHistogram{Sum: 6}
histograms = append(histograms, promql.HPoint{T: 0, H: h4})
histograms = append(histograms, promql.HPoint{T: 1, H: h5})
histograms = append(histograms, promql.HPoint{T: 2, H: h6})
histograms = append(histograms, promql.HPoint{T: 3, H: h6}) // T=3 is a lookback and refers to the same histogram as T=2.
histograms = append(histograms, promql.HPoint{T: 4, H: h6})
h7 := &histogram.FloatHistogram{Sum: 7}
h8 := &histogram.FloatHistogram{Sum: 8}
h9 := &histogram.FloatHistogram{Sum: 9}
histograms = append(histograms, promql.HPoint{T: 0, H: h5})
histograms = append(histograms, promql.HPoint{T: 1, H: h6})
histograms = append(histograms, promql.HPoint{T: 2, H: h7})
histograms = append(histograms, promql.HPoint{T: 3, H: h8})
histograms = append(histograms, promql.HPoint{T: 4, H: h9})
series = types.InstantVectorSeriesData{Histograms: histograms}

require.NoError(t, group.AccumulateSeries(series, timeRange, memoryConsumptionTracker, nil))

expected := []promql.HPoint{
{T: 0, H: h4}, // h4 not retained (added to h1)
{T: 1, H: h5}, // h5 not retained (added to h2)
{T: 2, H: nil}, // h6 is retained for T=3
{T: 3, H: nil}, // h6 is retained for this point
{T: 4, H: nil}, // h6 is retained for T=3
{T: 0, H: h5}, // h5 not retained (added to h1)
{T: 1, H: h6}, // h6 not retained (added to h2)
{T: 2, H: h7}, // h7 not retained (added to h3)
{T: 3, H: nil}, // h8 is retained for this point
{T: 4, H: h9}, // h9 not retained (added to h4)
}

require.Equal(t, expected, series.Histograms, "all histograms retained should be nil-ed out after accumulating series")
Expand Down
37 changes: 6 additions & 31 deletions pkg/streamingpromql/aggregations/avg.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,6 @@ func (g *AvgAggregationGroup) accumulateHistograms(data types.InstantVectorSerie
g.histograms = g.histograms[:timeRange.StepCount]
}

var lastUncopiedHistogram *histogram.FloatHistogram

for inputIdx, p := range data.Histograms {
outputIdx := timeRange.PointIndex(p.T)
g.groupSeriesCounts[outputIdx]++
Expand All @@ -176,41 +174,18 @@ func (g *AvgAggregationGroup) accumulateHistograms(data types.InstantVectorSerie
continue
}

if lastUncopiedHistogram == p.H {
// Ensure the FloatHistogram instance is not reused when the HPoint slice is reused, as we're retaining a reference to it.
data.Histograms[inputIdx].H = nil
}

if g.histograms[outputIdx] == nil {
if lastUncopiedHistogram == p.H {
// We've already used this histogram for a previous point due to lookback.
// Make a copy of it so we don't modify the other point.
g.histograms[outputIdx] = p.H.Copy()
g.histogramPointCount++

continue
}

// We have not previously used this histogram as the start of an output point.
// It is safe to store it and modify it later without copying, as we'll make copies above if the same histogram is used for subsequent points.
// First sample for this output point, retain the histogram as-is.
g.histograms[outputIdx] = p.H
g.histogramPointCount++
lastUncopiedHistogram = p.H

// Ensure the FloatHistogram instance is not reused when the HPoint slice data.Histograms is reused, including if it was used at previous points.
data.RemoveReferencesToRetainedHistogram(p.H, inputIdx)
// Ensure the FloatHistogram instance is not reused when the HPoint slice data.Histograms is reused.
data.Histograms[inputIdx].H = nil

continue
}

// Check if the next point in data.Histograms is the same as the current point (due to lookback)
// If it is, create a copy before modifying it.
toAdd := p.H
if inputIdx+1 < len(data.Histograms) && data.Histograms[inputIdx+1].H == p.H {
toAdd = p.H.Copy()
}

_, err = toAdd.Sub(g.histograms[outputIdx])
_, err = p.H.Sub(g.histograms[outputIdx])
if err != nil {
// Unable to subtract histograms (likely due to invalid combination of histograms). Make sure we don't emit a sample at this timestamp.
g.histograms[outputIdx] = invalidCombinationOfHistograms
Expand All @@ -223,8 +198,8 @@ func (g *AvgAggregationGroup) accumulateHistograms(data types.InstantVectorSerie
continue
}

toAdd.Div(g.groupSeriesCounts[outputIdx])
_, err = g.histograms[outputIdx].Add(toAdd)
p.H.Div(g.groupSeriesCounts[outputIdx])
_, err = g.histograms[outputIdx].Add(p.H)
if err != nil {
// Unable to add histograms together (likely due to invalid combination of histograms). Make sure we don't emit a sample at this timestamp.
g.histograms[outputIdx] = invalidCombinationOfHistograms
Expand Down
23 changes: 3 additions & 20 deletions pkg/streamingpromql/aggregations/sum.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ func (g *SumAggregationGroup) accumulateFloats(data types.InstantVectorSeriesDat

func (g *SumAggregationGroup) accumulateHistograms(data types.InstantVectorSeriesData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, emitAnnotationFunc functions.EmitAnnotationFunc) error {
var err error
var lastUncopiedHistogram *histogram.FloatHistogram

if len(data.Histograms) > 0 && g.histogramSums == nil {
// First series with histogram values for this group, populate it.
Expand All @@ -97,29 +96,13 @@ func (g *SumAggregationGroup) accumulateHistograms(data types.InstantVectorSerie
continue
}

if lastUncopiedHistogram == p.H {
// Ensure the FloatHistogram instance is not reused when the HPoint slice is reused, as we're retaining a reference to it.
data.Histograms[inputIdx].H = nil
}

if g.histogramSums[outputIdx] == nil {
if lastUncopiedHistogram == p.H {
// We've already used this histogram for a previous point due to lookback.
// Make a copy of it so we don't modify the other point.
g.histogramSums[outputIdx] = p.H.Copy()
g.histogramPointCount++

continue
}

// We have not previously used this histogram as the start of an output point.
// It is safe to store it and modify it later without copying, as we'll make copies above if the same histogram is used for subsequent points.
// First sample for this output point, retain the histogram as-is.
g.histogramSums[outputIdx] = p.H
g.histogramPointCount++
lastUncopiedHistogram = p.H

// Ensure the FloatHistogram instance is not reused when the HPoint slice data.Histograms is reused, including if it was used at previous points.
data.RemoveReferencesToRetainedHistogram(p.H, inputIdx)
// Ensure the FloatHistogram instance is not reused when the HPoint slice data.Histograms is reused.
data.Histograms[inputIdx].H = nil

continue
}
Expand Down
5 changes: 0 additions & 5 deletions pkg/streamingpromql/functions/math.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,6 @@ var UnaryNegation InstantVectorSeriesFunction = func(seriesData types.InstantVec
}

for i := range seriesData.Histograms {
if i > 0 && seriesData.Histograms[i].H == seriesData.Histograms[i-1].H {
// Previous point shares the same histogram instance, which we've already negated, so don't negate it again.
continue
}

seriesData.Histograms[i].H.Mul(-1) // Mul modifies the histogram in-place, so we don't need to do anything with the result here.
}

Expand Down
18 changes: 10 additions & 8 deletions pkg/streamingpromql/operators/instant_vector_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,8 @@ func (v *InstantVectorSelector) NextSeries(ctx context.Context) (types.InstantVe
t, f = v.memoizedIterator.At()
case chunkenc.ValHistogram, chunkenc.ValFloatHistogram:
if atT := v.memoizedIterator.AtT(); atT == lastHistogramT && lastHistogram != nil {
// We're still looking at the last histogram we used, don't bother creating another FloatHistogram.
// Consuming operators are expected to check for the same FloatHistogram instance used at multiple points and copy it
// if they are going to mutate it, so this is safe to do.
// We're still looking at the last histogram we used, don't bother creating another FloatHistogram yet as we might not need it.
// If we're going to return this histogram, we'll make a copy below.
t, h = atT, lastHistogram
} else {
t, h = v.memoizedIterator.AtFloatHistogram()
Expand All @@ -109,12 +108,9 @@ func (v *InstantVectorSelector) NextSeries(ctx context.Context) (types.InstantVe
}
if h != nil {
if t == lastHistogramT && lastHistogram != nil {
// Reuse exactly the same FloatHistogram as last time.
// Reuse exactly the same FloatHistogram as last time, don't bother creating another FloatHistogram yet.
// PeekPrev can return a new FloatHistogram instance with the same underlying bucket slices as a previous call
// to AtFloatHistogram.
// Consuming operators are expected to check for the same FloatHistogram instance used at multiple points and copy
// it if they are going to mutate it, but consuming operators don't check the underlying bucket slices, so without
// this, we can end up with incorrect query results.
// to AtFloatHistogram, so if we're going to return this histogram, we'll make a copy below.
h = lastHistogram
}
}
Expand All @@ -136,6 +132,12 @@ func (v *InstantVectorSelector) NextSeries(ctx context.Context) (types.InstantVe
return types.InstantVectorSeriesData{}, err
}
}

if t == lastHistogramT {
// We're returning a histogram we've previously used, so make a copy of it now.
h = h.Copy()
}

data.Histograms = append(data.Histograms, promql.HPoint{T: stepT, H: h})
lastHistogramT = t
lastHistogram = h
Expand Down
60 changes: 19 additions & 41 deletions pkg/streamingpromql/operators/instant_vector_selector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package operators

import (
"context"
"fmt"
"testing"
"time"

Expand All @@ -18,14 +19,14 @@ import (
)

func TestInstantVectorSelector_NativeHistogramPointerHandling(t *testing.T) {
requireNotSame := func(t *testing.T, h1, h2 *histogram.FloatHistogram) {
require.NotSame(t, h1, h2, "must not point to the same *FloatHistogram")

requireNotSameSlices(t, h1.PositiveSpans, h2.PositiveSpans, "positive spans")
requireNotSameSlices(t, h1.NegativeSpans, h2.NegativeSpans, "negative spans")
requireNotSameSlices(t, h1.PositiveBuckets, h2.PositiveBuckets, "positive buckets")
requireNotSameSlices(t, h1.NegativeBuckets, h2.NegativeBuckets, "negative buckets")
requireNotSameSlices(t, h1.CustomValues, h2.CustomValues, "custom values")
requireNotSame := func(t *testing.T, h1, h2 *histogram.FloatHistogram, context string) {
require.NotSamef(t, h1, h2, "%v: must not point to the same *FloatHistogram", context)

requireNotSameSlices(t, h1.PositiveSpans, h2.PositiveSpans, "positive spans", context)
requireNotSameSlices(t, h1.NegativeSpans, h2.NegativeSpans, "negative spans", context)
requireNotSameSlices(t, h1.PositiveBuckets, h2.PositiveBuckets, "positive buckets", context)
requireNotSameSlices(t, h1.NegativeBuckets, h2.NegativeBuckets, "negative buckets", context)
requireNotSameSlices(t, h1.CustomValues, h2.CustomValues, "custom values", context)
}

testCases := map[string]struct {
Expand All @@ -45,9 +46,6 @@ func TestInstantVectorSelector_NativeHistogramPointerHandling(t *testing.T) {
require.Equal(t, 5.0, points[0].H.Sum)
require.Equal(t, 20.0, points[1].H.Sum)
require.Equal(t, 21.0, points[2].H.Sum)

requireNotSame(t, points[0].H, points[1].H)
requireNotSame(t, points[1].H, points[2].H)
},
},
"different histograms at each point, some due to lookback": {
Expand All @@ -63,10 +61,6 @@ func TestInstantVectorSelector_NativeHistogramPointerHandling(t *testing.T) {
require.Equal(t, 5.0, points[1].H.Sum)
require.Equal(t, 20.0, points[2].H.Sum)
require.Equal(t, 21.0, points[3].H.Sum)

requireNotSame(t, points[0].H, points[1].H)
requireNotSame(t, points[1].H, points[2].H)
requireNotSame(t, points[2].H, points[3].H)
},
},
"same histogram at each point due to lookback": {
Expand All @@ -80,9 +74,6 @@ func TestInstantVectorSelector_NativeHistogramPointerHandling(t *testing.T) {
require.Equal(t, 5.0, points[0].H.Sum)
require.Equal(t, 5.0, points[1].H.Sum)
require.Equal(t, 5.0, points[2].H.Sum)

require.Same(t, points[0].H, points[1].H)
require.Same(t, points[1].H, points[2].H)
},
},
"same histogram at each point not due to lookback": {
Expand All @@ -95,8 +86,6 @@ func TestInstantVectorSelector_NativeHistogramPointerHandling(t *testing.T) {
require.Len(t, points, 2)
require.Equal(t, 5.0, points[0].H.Sum)
require.Equal(t, 5.0, points[1].H.Sum)

requireNotSame(t, points[0].H, points[1].H)
},
},
"last point is from lookback and is the same as the previous point": {
Expand All @@ -111,9 +100,6 @@ func TestInstantVectorSelector_NativeHistogramPointerHandling(t *testing.T) {
require.Equal(t, 3.0, points[0].H.Sum)
require.Equal(t, 5.0, points[1].H.Sum)
require.Equal(t, 5.0, points[2].H.Sum)

requireNotSame(t, points[0].H, points[1].H)
require.Same(t, points[1].H, points[2].H)
},
},
"last point is from lookback but is not the same as the previous point": {
Expand All @@ -128,9 +114,6 @@ func TestInstantVectorSelector_NativeHistogramPointerHandling(t *testing.T) {
require.Equal(t, 3.0, points[0].H.Sum)
require.Equal(t, 5.0, points[1].H.Sum)
require.Equal(t, 20.0, points[2].H.Sum)

requireNotSame(t, points[0].H, points[1].H)
requireNotSame(t, points[1].H, points[2].H)
},
},

Expand All @@ -145,11 +128,6 @@ func TestInstantVectorSelector_NativeHistogramPointerHandling(t *testing.T) {
require.Equal(t, 3.0, points[0].H.Sum)
require.Equal(t, 5.0, points[1].H.Sum)
require.Equal(t, 3.0, points[2].H.Sum)

requireNotSame(t, points[0].H, points[1].H)
requireNotSame(t, points[1].H, points[2].H)

requireNotSame(t, points[0].H, points[2].H)
},
},
"different histograms should have different spans": {
Expand All @@ -160,7 +138,6 @@ func TestInstantVectorSelector_NativeHistogramPointerHandling(t *testing.T) {
stepCount: 2,
check: func(t *testing.T, points []promql.HPoint, _ []promql.FPoint) {
require.Len(t, points, 2)
requireNotSame(t, points[0].H, points[1].H)
},
},
"successive histograms returned due to lookback should create different histograms at each point": {
Expand All @@ -172,7 +149,6 @@ func TestInstantVectorSelector_NativeHistogramPointerHandling(t *testing.T) {
stepCount: 3,
check: func(t *testing.T, points []promql.HPoint, _ []promql.FPoint) {
require.Len(t, points, 2)
requireNotSame(t, points[0].H, points[1].H)
},
},
"lookback points in middle of series reuse existing histogram": {
Expand All @@ -183,9 +159,6 @@ func TestInstantVectorSelector_NativeHistogramPointerHandling(t *testing.T) {
stepCount: 5,
check: func(t *testing.T, points []promql.HPoint, _ []promql.FPoint) {
require.Len(t, points, 4)
requireNotSame(t, points[0].H, points[2].H)
require.Same(t, points[0].H, points[1].H)
require.Same(t, points[2].H, points[3].H)
},
},
// FIXME: this test currently fails due to https://github.com/prometheus/prometheus/issues/14172
Expand All @@ -201,8 +174,6 @@ func TestInstantVectorSelector_NativeHistogramPointerHandling(t *testing.T) {
// require.Equal(t, 3.0, hPoints[0].H.Sum)
// require.Equal(t, 3.0, hPoints[1].H.Sum)
//
// require.Same(t, hPoints[0].H, hPoints[1].H)
//
// require.Equal(t, []promql.FPoint{{T: 60000, F: 2}}, fPoints)
// },
//},
Expand Down Expand Up @@ -234,16 +205,23 @@ func TestInstantVectorSelector_NativeHistogramPointerHandling(t *testing.T) {
series, err := selector.NextSeries(ctx)
require.NoError(t, err)
testCase.check(t, series.Histograms, series.Floats)

for i := 1; i < len(series.Histograms); i++ {
first := series.Histograms[i-1].H
second := series.Histograms[i].H

requireNotSame(t, first, second, fmt.Sprintf("histograms for points at index %v and %v in %v", i-1, i, series.Histograms))
}
})
}
}

func requireNotSameSlices[T any](t *testing.T, s1, s2 []T, description string) {
require.NotSamef(t, s1, s2, "must not point to the same %v slice", description)
func requireNotSameSlices[T any](t *testing.T, s1, s2 []T, description string, context string) {
require.NotSamef(t, s1, s2, "%v: must not point to the same %v slice", context, description)

// require.NotSame only checks the slice headers are different. It does not check that the slices do not point the same underlying arrays.
// So specifically check if the first elements are different.
if len(s1) > 0 && len(s2) > 0 {
require.NotSamef(t, &s1[0], &s2[0], "must not point to the same underlying %v array", description)
require.NotSamef(t, &s1[0], &s2[0], "%v: must not point to the same underlying %v array", context, description)
}
}
Loading

0 comments on commit 9742b34

Please sign in to comment.