Skip to content

Commit

Permalink
Replace use of remainder with bit masking
Browse files Browse the repository at this point in the history
  • Loading branch information
charleskorn committed Nov 10, 2024
1 parent 9fe0ad8 commit c4b87bc
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 12 deletions.
25 changes: 22 additions & 3 deletions pkg/streamingpromql/types/fpoint_ring_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package types

import (
"fmt"
"github.com/prometheus/prometheus/promql"

"github.com/grafana/mimir/pkg/streamingpromql/limiting"
Expand All @@ -18,6 +19,7 @@ import (
type FPointRingBuffer struct {
memoryConsumptionTracker *limiting.MemoryConsumptionTracker
points []promql.FPoint
pointsIndexMask int // Bitmask used to calculate indices into points efficiently. Computing modulo is relatively expensive, but points is always sized as a power of two, so we can a bitmask to calculate remainders cheaply.
firstIndex int // Index into 'points' of first point in this buffer.
size int // Number of points in this buffer.
}
Expand Down Expand Up @@ -58,6 +60,11 @@ func (b *FPointRingBuffer) Append(p promql.FPoint) error {
return err
}

if !isPowerOfTwo(cap(newSlice)) {
// We rely on the capacity being a power of two for the pointsIndexMask optimisation below.
panic(fmt.Sprintf("pool returned slice of capacity %v (requested %v), but wanted a power of two", cap(newSlice), newSize))
}

newSlice = newSlice[:cap(newSlice)]
pointsAtEnd := b.size - b.firstIndex
copy(newSlice, b.points[b.firstIndex:])
Expand All @@ -66,9 +73,10 @@ func (b *FPointRingBuffer) Append(p promql.FPoint) error {
putFPointSliceForRingBuffer(b.points, b.memoryConsumptionTracker)
b.points = newSlice
b.firstIndex = 0
b.pointsIndexMask = cap(newSlice) - 1
}

nextIndex := (b.firstIndex + b.size) % len(b.points)
nextIndex := (b.firstIndex + b.size) & b.pointsIndexMask
b.points[nextIndex] = p
b.size++
return nil
Expand Down Expand Up @@ -114,7 +122,7 @@ func (b *FPointRingBuffer) ViewUntilSearchingBackwards(maxT int64, existing *FPo

// pointAt returns the point at index 'position'.
func (b *FPointRingBuffer) pointAt(position int) promql.FPoint {
return b.points[(b.firstIndex+position)%len(b.points)]
return b.points[(b.firstIndex+position)&b.pointsIndexMask]
}

// Reset clears the contents of this buffer, but retains the underlying point slice for future reuse.
Expand All @@ -136,12 +144,19 @@ func (b *FPointRingBuffer) Release() {
// s will be modified in place when the buffer is modified, and callers should not modify s after passing it off to the ring buffer via Use.
// s will be returned to the pool when Close is called, Use is called again, or the buffer needs to expand, so callers
// should not return s to the pool themselves.
// s must have a capacity that is a power of two.
func (b *FPointRingBuffer) Use(s []promql.FPoint) {
if !isPowerOfTwo(cap(s)) {
// We rely on the capacity being a power of two for the pointsIndexMask optimisation below.
panic(fmt.Sprintf("slice capacity must be a power of two, but is %v", cap(s)))
}

putFPointSliceForRingBuffer(b.points, b.memoryConsumptionTracker)

b.points = s
b.points = s[:cap(s)]
b.firstIndex = 0
b.size = len(s)
b.pointsIndexMask = cap(s) - 1
}

// Close releases any resources associated with this buffer.
Expand Down Expand Up @@ -243,3 +258,7 @@ func (v FPointRingBufferView) Any() bool {
// These hooks exist so we can override them during unit tests.
var getFPointSliceForRingBuffer = FPointSlicePool.Get
var putFPointSliceForRingBuffer = FPointSlicePool.Put

func isPowerOfTwo(n int) bool {
return (n & (n - 1)) == 0
}
21 changes: 18 additions & 3 deletions pkg/streamingpromql/types/hpoint_ring_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package types

import (
"fmt"
"github.com/prometheus/prometheus/promql"

"github.com/grafana/mimir/pkg/streamingpromql/limiting"
Expand All @@ -18,6 +19,7 @@ import (
type HPointRingBuffer struct {
memoryConsumptionTracker *limiting.MemoryConsumptionTracker
points []promql.HPoint
pointsIndexMask int // Bitmask used to calculate indices into points efficiently. Computing modulo is relatively expensive, but points is always sized as a power of two, so we can a bitmask to calculate remainders cheaply.
firstIndex int // Index into 'points' of first point in this buffer.
size int // Number of points in this buffer.
}
Expand Down Expand Up @@ -95,7 +97,7 @@ func (b *HPointRingBuffer) ViewUntilSearchingBackwards(maxT int64, existing *HPo

// pointAt returns the point at index 'position'.
func (b *HPointRingBuffer) pointAt(position int) promql.HPoint {
return b.points[(b.firstIndex+position)%len(b.points)]
return b.points[(b.firstIndex+position)&b.pointsIndexMask]
}

// NextPoint gets the next point in this buffer, expanding it if required.
Expand All @@ -118,6 +120,11 @@ func (b *HPointRingBuffer) NextPoint() (*promql.HPoint, error) {
return nil, err
}

if !isPowerOfTwo(cap(newSlice)) {
// We rely on the capacity being a power of two for the pointsIndexMask optimisation below.
panic(fmt.Sprintf("pool returned slice of capacity %v (requested %v), but wanted a power of two", cap(newSlice), newSize))
}

newSlice = newSlice[:cap(newSlice)]
pointsAtEnd := b.size - b.firstIndex
copy(newSlice, b.points[b.firstIndex:])
Expand All @@ -131,9 +138,10 @@ func (b *HPointRingBuffer) NextPoint() (*promql.HPoint, error) {
putHPointSliceForRingBuffer(b.points, b.memoryConsumptionTracker)
b.points = newSlice
b.firstIndex = 0
b.pointsIndexMask = cap(newSlice) - 1
}

nextIndex := (b.firstIndex + b.size) % len(b.points)
nextIndex := (b.firstIndex + b.size) & b.pointsIndexMask
b.size++
return &b.points[nextIndex], nil
}
Expand Down Expand Up @@ -173,12 +181,19 @@ func (b *HPointRingBuffer) Release() {
// s will be modified in place when the buffer is modified, and callers should not modify s after passing it off to the ring buffer via Use.
// s will be returned to the pool when Close is called, Use is called again, or the buffer needs to expand, so callers
// should not return s to the pool themselves.
// s must have a capacity that is a power of two.
func (b *HPointRingBuffer) Use(s []promql.HPoint) {
if !isPowerOfTwo(cap(s)) {
// We rely on the capacity being a power of two for the pointsIndexMask optimisation below.
panic(fmt.Sprintf("slice capacity must be a power of two, but is %v", cap(s)))
}

putHPointSliceForRingBuffer(b.points, b.memoryConsumptionTracker)

b.points = s
b.points = s[:cap(s)]
b.firstIndex = 0
b.size = len(s)
b.pointsIndexMask = cap(s) - 1
}

// Close releases any resources associated with this buffer.
Expand Down
2 changes: 1 addition & 1 deletion pkg/streamingpromql/types/limiting_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (

const (
maxExpectedPointsPerSeries = 100_000 // There's not too much science behind this number: 100000 points allows for a point per minute for just under 70 days.
pointsPerSeriesBucketFactor = 2.0
pointsPerSeriesBucketFactor = 2

// Treat a native histogram sample as equivalent to this many float samples when considering max in-memory bytes limit.
// Keep in mind that float sample = timestamp + float value, so 5x this is equivalent to five timestamps and five floats.
Expand Down
2 changes: 1 addition & 1 deletion pkg/streamingpromql/types/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

const (
maxExpectedSeriesPerResult = 10_000_000 // Likewise, there's not too much science behind this number: this is the based on examining the largest queries seen at Grafana Labs.
seriesPerResultBucketFactor = 2.0
seriesPerResultBucketFactor = 2
)

var (
Expand Down
8 changes: 6 additions & 2 deletions pkg/streamingpromql/types/ring_buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,9 @@ func testRingBuffer[T any](t *testing.T, buf ringBuffer[T], points []T) {
require.NoError(t, buf.Append(points[8]))
shouldHavePoints(t, buf, points[8])

buf.Use(points)
pointsWithPowerOfTwoCapacity := make([]T, 0, 16) // Use must be passed a slice with a capacity that is equal to a power of 2.
pointsWithPowerOfTwoCapacity = append(pointsWithPowerOfTwoCapacity, points...)
buf.Use(pointsWithPowerOfTwoCapacity)
shouldHavePoints(t, buf, points...)

buf.DiscardPointsBefore(5)
Expand All @@ -126,7 +128,9 @@ func testRingBuffer[T any](t *testing.T, buf ringBuffer[T], points []T) {
buf.Release()
shouldHaveNoPoints(t, buf)

buf.Use(points[4:])
subsliceWithPowerOfTwoCapacity := make([]T, 0, 8) // Use must be passed a slice with a capacity that is equal to a power of 2.
subsliceWithPowerOfTwoCapacity = append(subsliceWithPowerOfTwoCapacity, points[4:]...)
buf.Use(subsliceWithPowerOfTwoCapacity)
shouldHavePoints(t, buf, points[4:]...)
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/util/pool/bucketed_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type BucketedPool[T ~[]E, E any] struct {

// NewBucketedPool returns a new BucketedPool with size buckets for minSize to maxSize
// increasing by the given factor.
func NewBucketedPool[T ~[]E, E any](minSize, maxSize int, factor float64, makeFunc func(int) T) *BucketedPool[T, E] {
func NewBucketedPool[T ~[]E, E any](minSize, maxSize int, factor int, makeFunc func(int) T) *BucketedPool[T, E] {
if minSize < 1 {
panic("invalid minimum pool size")
}
Expand All @@ -34,7 +34,7 @@ func NewBucketedPool[T ~[]E, E any](minSize, maxSize int, factor float64, makeFu

var sizes []int

for s := minSize; s <= maxSize; s = int(float64(s) * factor) {
for s := minSize; s <= maxSize; s = s * factor {
sizes = append(sizes, s)
}

Expand Down

0 comments on commit c4b87bc

Please sign in to comment.