Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MQE: improve performance of subqueries when the parent query is a range query with many steps #9719

Merged
merged 16 commits into from
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/streamingpromql/operators/functions/rate_increase.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func histogramRate(isRate bool, step types.RangeVectorStepData, hHead []promql.H
return val, err
}

func floatRate(isRate bool, fCount int, floatBuffer types.FPointRingBufferView, step types.RangeVectorStepData, fHead []promql.FPoint, fTail []promql.FPoint, rangeSeconds float64) float64 {
func floatRate(isRate bool, fCount int, floatBuffer *types.FPointRingBufferView, step types.RangeVectorStepData, fHead []promql.FPoint, fTail []promql.FPoint, rangeSeconds float64) float64 {
firstPoint := floatBuffer.First()

var lastPoint promql.FPoint
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ type RangeVectorSelector struct {
chunkIterator chunkenc.Iterator
nextT int64
floats *types.FPointRingBuffer
floatView *types.FPointRingBufferView
histograms *types.HPointRingBuffer
histogramView *types.HPointRingBufferView
}

var _ types.RangeVectorOperator = &RangeVectorSelector{}
Expand Down Expand Up @@ -95,10 +97,12 @@ func (m *RangeVectorSelector) NextStepSamples() (types.RangeVectorStepData, erro
}

m.nextT += m.Selector.TimeRange.IntervalMilliseconds
m.floatView = m.floats.ViewUntil(rangeEnd, false, m.floatView)
m.histogramView = m.histograms.ViewUntil(rangeEnd, false, m.histogramView)

return types.RangeVectorStepData{
Floats: m.floats.ViewUntil(rangeEnd, false),
Histograms: m.histograms.ViewUntil(rangeEnd, false),
Floats: m.floatView,
Histograms: m.histogramView,
StepT: stepT,
RangeStart: rangeStart,
RangeEnd: rangeEnd,
Expand Down
8 changes: 6 additions & 2 deletions pkg/streamingpromql/operators/subquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ type Subquery struct {
nextStepT int64
rangeMilliseconds int64
floats *types.FPointRingBuffer
floatView *types.FPointRingBufferView
histograms *types.HPointRingBuffer
histogramView *types.HPointRingBufferView
}

var _ types.RangeVectorOperator = &Subquery{}
Expand Down Expand Up @@ -92,10 +94,12 @@ func (s *Subquery) NextStepSamples() (types.RangeVectorStepData, error) {
s.histograms.DiscardPointsBefore(rangeStart)

s.nextStepT += s.ParentQueryTimeRange.IntervalMilliseconds
s.floatView = s.floats.ViewUntil(rangeEnd, true, s.floatView)
s.histogramView = s.histograms.ViewUntil(rangeEnd, true, s.histogramView)

return types.RangeVectorStepData{
Floats: s.floats.ViewUntil(rangeEnd, true),
Histograms: s.histograms.ViewUntil(rangeEnd, true),
Floats: s.floatView,
Histograms: s.histogramView,
StepT: stepT,
RangeStart: rangeStart,
RangeEnd: rangeEnd,
Expand Down
4 changes: 2 additions & 2 deletions pkg/streamingpromql/types/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,14 @@ func (i *InstantVectorSeriesDataIterator) Next() (t int64, f float64, h *histogr
// - RangeEnd is 1712016000000 (2024-04-02T00:00:00Z)
type RangeVectorStepData struct {
// Floats contains the float samples for this time step.
Floats FPointRingBufferView
Floats *FPointRingBufferView

// Histograms contains the histogram samples for this time step.
//
// FloatHistogram instances in the buffer must not be modified as they may be returned for subsequent steps.
// FloatHistogram instances that are retained after the next call to NextStepSamples must be copied, as they
// may be modified on subsequent calls to NextStepSamples.
Histograms HPointRingBufferView
Histograms *HPointRingBufferView

// StepT is the timestamp of this time step.
StepT int64
Expand Down
13 changes: 10 additions & 3 deletions pkg/streamingpromql/types/fpoint_ring_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,16 +80,22 @@ func (b *FPointRingBuffer) Append(p promql.FPoint) error {
// Set searchForwards to true if it is expected that there are many points with timestamp greater than maxT, and few points with
// earlier timestamps.
// Set searchForwards to false if it is expected that only a few of the points will have timestamp greater than maxT.
// existing is an existing view instance for this buffer that is reused if provided. It can be nil.
// The returned view is no longer valid if this buffer is modified (eg. a point is added, or the buffer is reset or closed).
func (b *FPointRingBuffer) ViewUntil(maxT int64, searchForwards bool) FPointRingBufferView {
func (b *FPointRingBuffer) ViewUntil(maxT int64, searchForwards bool, existing *FPointRingBufferView) *FPointRingBufferView {
if existing == nil {
existing = &FPointRingBufferView{buffer: b}
}

if searchForwards {
size := 0

for size < b.size && b.pointAt(size).T <= maxT {
size++
}

return FPointRingBufferView{buffer: b, size: size}
existing.size = size
return existing
}

size := b.size
Expand All @@ -98,7 +104,8 @@ func (b *FPointRingBuffer) ViewUntil(maxT int64, searchForwards bool) FPointRing
size--
}

return FPointRingBufferView{buffer: b, size: size}
existing.size = size
return existing
}

// pointAt returns the point at index 'position'.
Expand Down
13 changes: 10 additions & 3 deletions pkg/streamingpromql/types/hpoint_ring_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,22 @@ func (b *HPointRingBuffer) Append(p promql.HPoint) error {
// Set searchForwards to true if it is expected that there are many points with timestamp greater than maxT, and few points with
// earlier timestamps.
// Set searchForwards to false if it is expected that only a few of the points will have timestamp greater than maxT.
// existing is an existing view instance for this buffer that is reused if provided. It can be nil.
// The returned view is no longer valid if this buffer is modified (eg. a point is added, or the buffer is reset or closed).
func (b *HPointRingBuffer) ViewUntil(maxT int64, searchForwards bool) HPointRingBufferView {
func (b *HPointRingBuffer) ViewUntil(maxT int64, searchForwards bool, existing *HPointRingBufferView) *HPointRingBufferView {
if existing == nil {
existing = &HPointRingBufferView{buffer: b}
}

if searchForwards {
size := 0

for size < b.size && b.pointAt(size).T <= maxT {
size++
}

return HPointRingBufferView{buffer: b, size: size}
existing.size = size
return existing
}

size := b.size
Expand All @@ -79,7 +85,8 @@ func (b *HPointRingBuffer) ViewUntil(maxT int64, searchForwards bool) HPointRing
size--
}

return HPointRingBufferView{buffer: b, size: size}
existing.size = size
return existing
}

// pointAt returns the point at index 'position'.
Expand Down
4 changes: 2 additions & 2 deletions pkg/streamingpromql/types/ring_buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ type fPointRingBufferWrapper struct {
}

func (w *fPointRingBufferWrapper) ViewUntilForTesting(maxT int64, searchForwards bool) ringBufferView[promql.FPoint] {
return w.ViewUntil(maxT, searchForwards)
return w.ViewUntil(maxT, searchForwards, nil)
charleskorn marked this conversation as resolved.
Show resolved Hide resolved
}

func (w *fPointRingBufferWrapper) GetPoints() []promql.FPoint {
Expand All @@ -391,7 +391,7 @@ type hPointRingBufferWrapper struct {
}

func (w *hPointRingBufferWrapper) ViewUntilForTesting(maxT int64, searchForwards bool) ringBufferView[promql.HPoint] {
return w.ViewUntil(maxT, searchForwards)
return w.ViewUntil(maxT, searchForwards, nil)
}

func (w *hPointRingBufferWrapper) GetPoints() []promql.HPoint {
Expand Down