diff --git a/pkg/streamingpromql/types/fpoint_ring_buffer.go b/pkg/streamingpromql/types/fpoint_ring_buffer.go index 473a021b750..464bc3ab30e 100644 --- a/pkg/streamingpromql/types/fpoint_ring_buffer.go +++ b/pkg/streamingpromql/types/fpoint_ring_buffer.go @@ -3,6 +3,7 @@ package types import ( + "fmt" "github.com/prometheus/prometheus/promql" "github.com/grafana/mimir/pkg/streamingpromql/limiting" @@ -18,6 +19,7 @@ import ( type FPointRingBuffer struct { memoryConsumptionTracker *limiting.MemoryConsumptionTracker points []promql.FPoint + pointsIndexMask int // Bitmask used to calculate indices into points efficiently. Computing modulo is relatively expensive, but points is always sized as a power of two, so we can a bitmask to calculate remainders cheaply. firstIndex int // Index into 'points' of first point in this buffer. size int // Number of points in this buffer. } @@ -58,6 +60,11 @@ func (b *FPointRingBuffer) Append(p promql.FPoint) error { return err } + if !isPowerOfTwo(cap(newSlice)) { + // We rely on the capacity being a power of two for the pointsIndexMask optimisation below. + panic(fmt.Sprintf("pool returned slice of capacity %v (requested %v), but wanted a power of two", cap(newSlice), newSize)) + } + newSlice = newSlice[:cap(newSlice)] pointsAtEnd := b.size - b.firstIndex copy(newSlice, b.points[b.firstIndex:]) @@ -66,9 +73,10 @@ func (b *FPointRingBuffer) Append(p promql.FPoint) error { putFPointSliceForRingBuffer(b.points, b.memoryConsumptionTracker) b.points = newSlice b.firstIndex = 0 + b.pointsIndexMask = cap(newSlice) - 1 } - nextIndex := (b.firstIndex + b.size) % len(b.points) + nextIndex := (b.firstIndex + b.size) & b.pointsIndexMask b.points[nextIndex] = p b.size++ return nil @@ -114,7 +122,7 @@ func (b *FPointRingBuffer) ViewUntilSearchingBackwards(maxT int64, existing *FPo // pointAt returns the point at index 'position'. func (b *FPointRingBuffer) pointAt(position int) promql.FPoint { - return b.points[(b.firstIndex+position)%len(b.points)] + return b.points[(b.firstIndex+position)&b.pointsIndexMask] } // Reset clears the contents of this buffer, but retains the underlying point slice for future reuse. @@ -136,12 +144,19 @@ func (b *FPointRingBuffer) Release() { // s will be modified in place when the buffer is modified, and callers should not modify s after passing it off to the ring buffer via Use. // s will be returned to the pool when Close is called, Use is called again, or the buffer needs to expand, so callers // should not return s to the pool themselves. +// s must have a capacity that is a power of two. func (b *FPointRingBuffer) Use(s []promql.FPoint) { + if !isPowerOfTwo(cap(s)) { + // We rely on the capacity being a power of two for the pointsIndexMask optimisation below. + panic(fmt.Sprintf("slice capacity must be a power of two, but is %v", cap(s))) + } + putFPointSliceForRingBuffer(b.points, b.memoryConsumptionTracker) - b.points = s + b.points = s[:cap(s)] b.firstIndex = 0 b.size = len(s) + b.pointsIndexMask = cap(s) - 1 } // Close releases any resources associated with this buffer. @@ -243,3 +258,7 @@ func (v FPointRingBufferView) Any() bool { // These hooks exist so we can override them during unit tests. var getFPointSliceForRingBuffer = FPointSlicePool.Get var putFPointSliceForRingBuffer = FPointSlicePool.Put + +func isPowerOfTwo(n int) bool { + return (n & (n - 1)) == 0 +} diff --git a/pkg/streamingpromql/types/hpoint_ring_buffer.go b/pkg/streamingpromql/types/hpoint_ring_buffer.go index b75c9eb8cf9..7aea59e251d 100644 --- a/pkg/streamingpromql/types/hpoint_ring_buffer.go +++ b/pkg/streamingpromql/types/hpoint_ring_buffer.go @@ -3,6 +3,7 @@ package types import ( + "fmt" "github.com/prometheus/prometheus/promql" "github.com/grafana/mimir/pkg/streamingpromql/limiting" @@ -18,6 +19,7 @@ import ( type HPointRingBuffer struct { memoryConsumptionTracker *limiting.MemoryConsumptionTracker points []promql.HPoint + pointsIndexMask int // Bitmask used to calculate indices into points efficiently. Computing modulo is relatively expensive, but points is always sized as a power of two, so we can a bitmask to calculate remainders cheaply. firstIndex int // Index into 'points' of first point in this buffer. size int // Number of points in this buffer. } @@ -95,7 +97,7 @@ func (b *HPointRingBuffer) ViewUntilSearchingBackwards(maxT int64, existing *HPo // pointAt returns the point at index 'position'. func (b *HPointRingBuffer) pointAt(position int) promql.HPoint { - return b.points[(b.firstIndex+position)%len(b.points)] + return b.points[(b.firstIndex+position)&b.pointsIndexMask] } // NextPoint gets the next point in this buffer, expanding it if required. @@ -118,6 +120,11 @@ func (b *HPointRingBuffer) NextPoint() (*promql.HPoint, error) { return nil, err } + if !isPowerOfTwo(cap(newSlice)) { + // We rely on the capacity being a power of two for the pointsIndexMask optimisation below. + panic(fmt.Sprintf("pool returned slice of capacity %v (requested %v), but wanted a power of two", cap(newSlice), newSize)) + } + newSlice = newSlice[:cap(newSlice)] pointsAtEnd := b.size - b.firstIndex copy(newSlice, b.points[b.firstIndex:]) @@ -131,9 +138,10 @@ func (b *HPointRingBuffer) NextPoint() (*promql.HPoint, error) { putHPointSliceForRingBuffer(b.points, b.memoryConsumptionTracker) b.points = newSlice b.firstIndex = 0 + b.pointsIndexMask = cap(newSlice) - 1 } - nextIndex := (b.firstIndex + b.size) % len(b.points) + nextIndex := (b.firstIndex + b.size) & b.pointsIndexMask b.size++ return &b.points[nextIndex], nil } @@ -173,12 +181,19 @@ func (b *HPointRingBuffer) Release() { // s will be modified in place when the buffer is modified, and callers should not modify s after passing it off to the ring buffer via Use. // s will be returned to the pool when Close is called, Use is called again, or the buffer needs to expand, so callers // should not return s to the pool themselves. +// s must have a capacity that is a power of two. func (b *HPointRingBuffer) Use(s []promql.HPoint) { + if !isPowerOfTwo(cap(s)) { + // We rely on the capacity being a power of two for the pointsIndexMask optimisation below. + panic(fmt.Sprintf("slice capacity must be a power of two, but is %v", cap(s))) + } + putHPointSliceForRingBuffer(b.points, b.memoryConsumptionTracker) - b.points = s + b.points = s[:cap(s)] b.firstIndex = 0 b.size = len(s) + b.pointsIndexMask = cap(s) - 1 } // Close releases any resources associated with this buffer. diff --git a/pkg/streamingpromql/types/limiting_pool.go b/pkg/streamingpromql/types/limiting_pool.go index 97869d92385..5fe1563f6aa 100644 --- a/pkg/streamingpromql/types/limiting_pool.go +++ b/pkg/streamingpromql/types/limiting_pool.go @@ -14,7 +14,7 @@ import ( const ( maxExpectedPointsPerSeries = 100_000 // There's not too much science behind this number: 100000 points allows for a point per minute for just under 70 days. - pointsPerSeriesBucketFactor = 2.0 + pointsPerSeriesBucketFactor = 2 // Treat a native histogram sample as equivalent to this many float samples when considering max in-memory bytes limit. // Keep in mind that float sample = timestamp + float value, so 5x this is equivalent to five timestamps and five floats. diff --git a/pkg/streamingpromql/types/pool.go b/pkg/streamingpromql/types/pool.go index 889dd43680e..eebc9c3ea7e 100644 --- a/pkg/streamingpromql/types/pool.go +++ b/pkg/streamingpromql/types/pool.go @@ -10,7 +10,7 @@ import ( const ( maxExpectedSeriesPerResult = 10_000_000 // Likewise, there's not too much science behind this number: this is the based on examining the largest queries seen at Grafana Labs. - seriesPerResultBucketFactor = 2.0 + seriesPerResultBucketFactor = 2 ) var ( diff --git a/pkg/streamingpromql/types/ring_buffer_test.go b/pkg/streamingpromql/types/ring_buffer_test.go index bb7bd48a328..982d1b6cadf 100644 --- a/pkg/streamingpromql/types/ring_buffer_test.go +++ b/pkg/streamingpromql/types/ring_buffer_test.go @@ -117,7 +117,9 @@ func testRingBuffer[T any](t *testing.T, buf ringBuffer[T], points []T) { require.NoError(t, buf.Append(points[8])) shouldHavePoints(t, buf, points[8]) - buf.Use(points) + pointsWithPowerOfTwoCapacity := make([]T, 0, 16) // Use must be passed a slice with a capacity that is equal to a power of 2. + pointsWithPowerOfTwoCapacity = append(pointsWithPowerOfTwoCapacity, points...) + buf.Use(pointsWithPowerOfTwoCapacity) shouldHavePoints(t, buf, points...) buf.DiscardPointsBefore(5) @@ -126,7 +128,9 @@ func testRingBuffer[T any](t *testing.T, buf ringBuffer[T], points []T) { buf.Release() shouldHaveNoPoints(t, buf) - buf.Use(points[4:]) + subsliceWithPowerOfTwoCapacity := make([]T, 0, 8) // Use must be passed a slice with a capacity that is equal to a power of 2. + subsliceWithPowerOfTwoCapacity = append(subsliceWithPowerOfTwoCapacity, points[4:]...) + buf.Use(subsliceWithPowerOfTwoCapacity) shouldHavePoints(t, buf, points[4:]...) } diff --git a/pkg/util/pool/bucketed_pool.go b/pkg/util/pool/bucketed_pool.go index 6c10d884e44..bbdfaa1e6f8 100644 --- a/pkg/util/pool/bucketed_pool.go +++ b/pkg/util/pool/bucketed_pool.go @@ -21,7 +21,7 @@ type BucketedPool[T ~[]E, E any] struct { // NewBucketedPool returns a new BucketedPool with size buckets for minSize to maxSize // increasing by the given factor. -func NewBucketedPool[T ~[]E, E any](minSize, maxSize int, factor float64, makeFunc func(int) T) *BucketedPool[T, E] { +func NewBucketedPool[T ~[]E, E any](minSize, maxSize int, factor int, makeFunc func(int) T) *BucketedPool[T, E] { if minSize < 1 { panic("invalid minimum pool size") } @@ -34,7 +34,7 @@ func NewBucketedPool[T ~[]E, E any](minSize, maxSize int, factor float64, makeFu var sizes []int - for s := minSize; s <= maxSize; s = int(float64(s) * factor) { + for s := minSize; s <= maxSize; s = s * factor { sizes = append(sizes, s) }