Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MQE: improve performance of subqueries when the parent query is a range query with many steps #9719

Merged
merged 16 commits into from
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* `cortex_alertmanager_silences`
* [CHANGE] Distributor: Drop experimental `-distributor.direct-otlp-translation-enabled` flag, since direct OTLP translation is well tested at this point. #9647
* [FEATURE] Distributor: Add support for `lz4` OTLP compression. #9763
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #9367 #9368 #9398 #9399 #9403 #9417 #9418 #9419 #9420 #9482 #9504 #9505 #9507 #9518 #9531 #9532 #9533 #9553 #9558 #9588 #9589 #9639 #9641 #9642 #9651 #9664 #9681 #9717
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #9367 #9368 #9398 #9399 #9403 #9417 #9418 #9419 #9420 #9482 #9504 #9505 #9507 #9518 #9531 #9532 #9533 #9553 #9558 #9588 #9589 #9639 #9641 #9642 #9651 #9664 #9681 #9717 #9719
* [FEATURE] Query-frontend: added experimental configuration options `query-frontend.cache-errors` and `query-frontend.results-cache-ttl-for-errors` to allow non-transient responses to be cached. When set to `true` error responses from hitting limits or bad data are cached for a short TTL. #9028
* [FEATURE] gRPC: Support S2 compression. #9322
* `-alertmanager.alertmanager-client.grpc-compression=s2`
Expand Down
2 changes: 1 addition & 1 deletion pkg/streamingpromql/operators/functions/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func PassthroughData(seriesData types.InstantVectorSeriesData, _ []types.ScalarD
// - h *histogram.FloatHistogram: nil if no histogram is present.
// - err error.
type RangeVectorStepFunction func(
step types.RangeVectorStepData,
step *types.RangeVectorStepData,
rangeSeconds float64,
emitAnnotation types.EmitAnnotationFunc,
) (f float64, hasFloat bool, h *histogram.FloatHistogram, err error)
Expand Down
86 changes: 26 additions & 60 deletions pkg/streamingpromql/operators/functions/range_vectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ var CountOverTime = FunctionOverRangeVectorDefinition{
StepFunc: countOverTime,
}

func countOverTime(step types.RangeVectorStepData, _ float64, _ types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
fPointCount := step.Floats.CountAtOrBefore(step.RangeEnd)
hPointCount := step.Histograms.CountAtOrBefore(step.RangeEnd)
func countOverTime(step *types.RangeVectorStepData, _ float64, _ types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
fPointCount := step.Floats.Count()
hPointCount := step.Histograms.Count()

if fPointCount == 0 && hPointCount == 0 {
return 0, false, nil, nil
Expand All @@ -37,9 +37,9 @@ var LastOverTime = FunctionOverRangeVectorDefinition{
StepFunc: lastOverTime,
}

func lastOverTime(step types.RangeVectorStepData, _ float64, _ types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
lastFloat, floatAvailable := step.Floats.LastAtOrBefore(step.RangeEnd)
lastHistogram, histogramAvailable := step.Histograms.LastAtOrBefore(step.RangeEnd)
func lastOverTime(step *types.RangeVectorStepData, _ float64, _ types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
lastFloat, floatAvailable := step.Floats.Last()
lastHistogram, histogramAvailable := step.Histograms.Last()

if !floatAvailable && !histogramAvailable {
return 0, false, nil, nil
Expand All @@ -58,8 +58,8 @@ var PresentOverTime = FunctionOverRangeVectorDefinition{
StepFunc: presentOverTime,
}

func presentOverTime(step types.RangeVectorStepData, _ float64, _ types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
if step.Floats.AnyAtOrBefore(step.RangeEnd) || step.Histograms.AnyAtOrBefore(step.RangeEnd) {
func presentOverTime(step *types.RangeVectorStepData, _ float64, _ types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
if step.Floats.Any() || step.Histograms.Any() {
return 1, true, nil, nil
}

Expand All @@ -71,22 +71,15 @@ var MaxOverTime = FunctionOverRangeVectorDefinition{
StepFunc: maxOverTime,
}

func maxOverTime(step types.RangeVectorStepData, _ float64, _ types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
head, tail := step.Floats.UnsafePoints(step.RangeEnd)
func maxOverTime(step *types.RangeVectorStepData, _ float64, _ types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
head, tail := step.Floats.UnsafePoints()

if len(head) == 0 && len(tail) == 0 {
return 0, false, nil, nil
}

var maxSoFar float64

if len(head) > 0 {
maxSoFar = head[0].F
head = head[1:]
} else {
maxSoFar = tail[0].F
tail = tail[1:]
}
maxSoFar := head[0].F
head = head[1:]

for _, p := range head {
if p.F > maxSoFar || math.IsNaN(maxSoFar) {
Expand All @@ -108,22 +101,15 @@ var MinOverTime = FunctionOverRangeVectorDefinition{
StepFunc: minOverTime,
}

func minOverTime(step types.RangeVectorStepData, _ float64, _ types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
head, tail := step.Floats.UnsafePoints(step.RangeEnd)
func minOverTime(step *types.RangeVectorStepData, _ float64, _ types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
head, tail := step.Floats.UnsafePoints()

if len(head) == 0 && len(tail) == 0 {
return 0, false, nil, nil
}

var minSoFar float64

if len(head) > 0 {
minSoFar = head[0].F
head = head[1:]
} else {
minSoFar = tail[0].F
tail = tail[1:]
}
minSoFar := head[0].F
head = head[1:]

for _, p := range head {
if p.F < minSoFar || math.IsNaN(minSoFar) {
Expand All @@ -146,9 +132,9 @@ var SumOverTime = FunctionOverRangeVectorDefinition{
NeedsSeriesNamesForAnnotations: true,
}

func sumOverTime(step types.RangeVectorStepData, _ float64, emitAnnotation types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
fHead, fTail := step.Floats.UnsafePoints(step.RangeEnd)
hHead, hTail := step.Histograms.UnsafePoints(step.RangeEnd)
func sumOverTime(step *types.RangeVectorStepData, _ float64, emitAnnotation types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
fHead, fTail := step.Floats.UnsafePoints()
hHead, hTail := step.Histograms.UnsafePoints()

haveFloats := len(fHead) > 0 || len(fTail) > 0
haveHistograms := len(hHead) > 0 || len(hTail) > 0
Expand Down Expand Up @@ -185,18 +171,8 @@ func sumFloats(head, tail []promql.FPoint) float64 {
}

func sumHistograms(head, tail []promql.HPoint, emitAnnotation types.EmitAnnotationFunc) (*histogram.FloatHistogram, error) {
var sum *histogram.FloatHistogram

if len(head) > 0 {
sum = head[0].H
head = head[1:]
} else {
sum = tail[0].H
tail = tail[1:]
}

// We must make a copy of the histogram, as the ring buffer may reuse the FloatHistogram instance on subsequent steps.
sum = sum.Copy()
sum := head[0].H.Copy() // We must make a copy of the histogram, as the ring buffer may reuse the FloatHistogram instance on subsequent steps.
head = head[1:]

for _, p := range head {
if _, err := sum.Add(p.H); err != nil {
Expand All @@ -221,9 +197,9 @@ var AvgOverTime = FunctionOverRangeVectorDefinition{
NeedsSeriesNamesForAnnotations: true,
}

func avgOverTime(step types.RangeVectorStepData, _ float64, emitAnnotation types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
fHead, fTail := step.Floats.UnsafePoints(step.RangeEnd)
hHead, hTail := step.Histograms.UnsafePoints(step.RangeEnd)
func avgOverTime(step *types.RangeVectorStepData, _ float64, emitAnnotation types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
fHead, fTail := step.Floats.UnsafePoints()
hHead, hTail := step.Histograms.UnsafePoints()

haveFloats := len(fHead) > 0 || len(fTail) > 0
haveHistograms := len(hHead) > 0 || len(hTail) > 0
Expand Down Expand Up @@ -306,20 +282,10 @@ func avgFloats(head, tail []promql.FPoint) float64 {
}

func avgHistograms(head, tail []promql.HPoint) (*histogram.FloatHistogram, error) {
var avgSoFar *histogram.FloatHistogram
avgSoFar := head[0].H.Copy() // We must make a copy of the histogram, as the ring buffer may reuse the FloatHistogram instance on subsequent steps.
head = head[1:]
count := 1.0

if len(head) > 0 {
avgSoFar = head[0].H
head = head[1:]
} else {
avgSoFar = tail[0].H
tail = tail[1:]
}

// We must make a copy of the histogram, as the ring buffer may reuse the FloatHistogram instance on subsequent steps.
avgSoFar = avgSoFar.Copy()

// Reuse these instances if we need them, to avoid allocating two FloatHistograms for every remaining histogram in the range.
var contributionByP *histogram.FloatHistogram
var contributionByAvgSoFar *histogram.FloatHistogram
Expand Down
32 changes: 14 additions & 18 deletions pkg/streamingpromql/operators/functions/rate_increase.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ var Increase = FunctionOverRangeVectorDefinition{

// isRate is true for `rate` function, or false for `instant` function
func rate(isRate bool) RangeVectorStepFunction {
return func(step types.RangeVectorStepData, rangeSeconds float64, emitAnnotation types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
fHead, fTail := step.Floats.UnsafePoints(step.RangeEnd)
return func(step *types.RangeVectorStepData, rangeSeconds float64, emitAnnotation types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
fHead, fTail := step.Floats.UnsafePoints()
fCount := len(fHead) + len(fTail)

hHead, hTail := step.Histograms.UnsafePoints(step.RangeEnd)
hHead, hTail := step.Histograms.UnsafePoints()
hCount := len(hHead) + len(hTail)

if fCount > 0 && hCount > 0 {
Expand All @@ -46,12 +46,13 @@ func rate(isRate bool) RangeVectorStepFunction {
}

if fCount >= 2 {
val := floatRate(isRate, fCount, step.Floats, step, fHead, fTail, rangeSeconds)
// TODO: just pass step here? (and below)
val := floatRate(isRate, fCount, fHead, fTail, step.RangeStart, step.RangeEnd, rangeSeconds)
return val, true, nil, nil
}

if hCount >= 2 {
val, err := histogramRate(isRate, step, hHead, hTail, rangeSeconds, hCount, emitAnnotation)
val, err := histogramRate(isRate, hCount, hHead, hTail, step.RangeStart, step.RangeEnd, rangeSeconds, emitAnnotation)
if err != nil {
err = NativeHistogramErrorToAnnotation(err, emitAnnotation)
return 0, false, nil, err
Expand All @@ -63,15 +64,9 @@ func rate(isRate bool) RangeVectorStepFunction {
}
}

func histogramRate(isRate bool, step types.RangeVectorStepData, hHead []promql.HPoint, hTail []promql.HPoint, rangeSeconds float64, hCount int, emitAnnotation types.EmitAnnotationFunc) (*histogram.FloatHistogram, error) {
var firstPoint promql.HPoint
if len(hHead) > 0 {
firstPoint = hHead[0]
hHead = hHead[1:]
} else {
firstPoint = hTail[0]
hTail = hTail[1:]
}
func histogramRate(isRate bool, hCount int, hHead []promql.HPoint, hTail []promql.HPoint, rangeStart int64, rangeEnd int64, rangeSeconds float64, emitAnnotation types.EmitAnnotationFunc) (*histogram.FloatHistogram, error) {
firstPoint := hHead[0]
hHead = hHead[1:]

var lastPoint promql.HPoint
if len(hTail) > 0 {
Expand Down Expand Up @@ -142,12 +137,13 @@ func histogramRate(isRate bool, step types.RangeVectorStepData, hHead []promql.H
delta = delta.CopyToSchema(desiredSchema)
}

val := calculateHistogramRate(isRate, step.RangeStart, step.RangeEnd, rangeSeconds, firstPoint, lastPoint, delta, hCount)
val := calculateHistogramRate(isRate, rangeStart, rangeEnd, rangeSeconds, firstPoint, lastPoint, delta, hCount)
return val, err
}

func floatRate(isRate bool, fCount int, floatBuffer *types.FPointRingBuffer, step types.RangeVectorStepData, fHead []promql.FPoint, fTail []promql.FPoint, rangeSeconds float64) float64 {
firstPoint := floatBuffer.First()
func floatRate(isRate bool, fCount int, fHead []promql.FPoint, fTail []promql.FPoint, rangeStart int64, rangeEnd int64, rangeSeconds float64) float64 {
firstPoint := fHead[0]
fHead = fHead[1:]

var lastPoint promql.FPoint
if len(fTail) > 0 {
Expand All @@ -173,7 +169,7 @@ func floatRate(isRate bool, fCount int, floatBuffer *types.FPointRingBuffer, ste
accumulate(fHead)
accumulate(fTail)

val := calculateFloatRate(isRate, step.RangeStart, step.RangeEnd, rangeSeconds, firstPoint, lastPoint, delta, fCount)
val := calculateFloatRate(isRate, rangeStart, rangeEnd, rangeSeconds, firstPoint, lastPoint, delta, fCount)
return val
}

Expand Down
32 changes: 16 additions & 16 deletions pkg/streamingpromql/operators/selectors/range_vector_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ type RangeVectorSelector struct {

rangeMilliseconds int64
chunkIterator chunkenc.Iterator
nextT int64
nextStepT int64
floats *types.FPointRingBuffer
histograms *types.HPointRingBuffer
stepData *types.RangeVectorStepData // Retain the last step data instance we used to avoid allocating it for every step.
}

var _ types.RangeVectorOperator = &RangeVectorSelector{}
Expand All @@ -36,6 +37,7 @@ func NewRangeVectorSelector(selector *Selector, memoryConsumptionTracker *limiti
Selector: selector,
floats: types.NewFPointRingBuffer(memoryConsumptionTracker),
histograms: types.NewHPointRingBuffer(memoryConsumptionTracker),
stepData: &types.RangeVectorStepData{},
}
}

Expand Down Expand Up @@ -65,19 +67,20 @@ func (m *RangeVectorSelector) NextSeries(ctx context.Context) error {
return err
}

m.nextT = m.Selector.TimeRange.StartT
m.nextStepT = m.Selector.TimeRange.StartT
m.floats.Reset()
m.histograms.Reset()
return nil
}

func (m *RangeVectorSelector) NextStepSamples() (types.RangeVectorStepData, error) {
if m.nextT > m.Selector.TimeRange.EndT {
return types.RangeVectorStepData{}, types.EOS
func (m *RangeVectorSelector) NextStepSamples() (*types.RangeVectorStepData, error) {
if m.nextStepT > m.Selector.TimeRange.EndT {
return nil, types.EOS
}

stepT := m.nextT
rangeEnd := stepT
m.stepData.StepT = m.nextStepT
rangeEnd := m.nextStepT
m.nextStepT += m.Selector.TimeRange.IntervalMilliseconds

if m.Selector.Timestamp != nil {
// Timestamp from @ modifier takes precedence over query evaluation timestamp.
Expand All @@ -91,18 +94,15 @@ func (m *RangeVectorSelector) NextStepSamples() (types.RangeVectorStepData, erro
m.histograms.DiscardPointsBefore(rangeStart)

if err := m.fillBuffer(m.floats, m.histograms, rangeStart, rangeEnd); err != nil {
return types.RangeVectorStepData{}, err
return nil, err
}

m.nextT += m.Selector.TimeRange.IntervalMilliseconds
m.stepData.Floats = m.floats.ViewUntilSearchingBackwards(rangeEnd, m.stepData.Floats)
m.stepData.Histograms = m.histograms.ViewUntilSearchingBackwards(rangeEnd, m.stepData.Histograms)
m.stepData.RangeStart = rangeStart
m.stepData.RangeEnd = rangeEnd

return types.RangeVectorStepData{
Floats: m.floats,
Histograms: m.histograms,
StepT: stepT,
RangeStart: rangeStart,
RangeEnd: rangeEnd,
}, nil
return m.stepData, nil
}

func (m *RangeVectorSelector) fillBuffer(floats *types.FPointRingBuffer, histograms *types.HPointRingBuffer, rangeStart, rangeEnd int64) error {
Expand Down
24 changes: 12 additions & 12 deletions pkg/streamingpromql/operators/subquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type Subquery struct {
rangeMilliseconds int64
floats *types.FPointRingBuffer
histograms *types.HPointRingBuffer
stepData *types.RangeVectorStepData // Retain the last step data instance we used to avoid allocating it for every step.
}

var _ types.RangeVectorOperator = &Subquery{}
Expand All @@ -49,6 +50,7 @@ func NewSubquery(
rangeMilliseconds: subqueryRange.Milliseconds(),
floats: types.NewFPointRingBuffer(memoryConsumptionTracker),
histograms: types.NewHPointRingBuffer(memoryConsumptionTracker),
stepData: &types.RangeVectorStepData{},
}
}

Expand All @@ -72,13 +74,14 @@ func (s *Subquery) NextSeries(ctx context.Context) error {
return nil
}

func (s *Subquery) NextStepSamples() (types.RangeVectorStepData, error) {
func (s *Subquery) NextStepSamples() (*types.RangeVectorStepData, error) {
if s.nextStepT > s.ParentQueryTimeRange.EndT {
return types.RangeVectorStepData{}, types.EOS
return nil, types.EOS
}

stepT := s.nextStepT
rangeEnd := stepT
s.stepData.StepT = s.nextStepT
rangeEnd := s.nextStepT
s.nextStepT += s.ParentQueryTimeRange.IntervalMilliseconds

if s.SubqueryTimestamp != nil {
// Timestamp from @ modifier takes precedence over query evaluation timestamp.
Expand All @@ -91,15 +94,12 @@ func (s *Subquery) NextStepSamples() (types.RangeVectorStepData, error) {
s.floats.DiscardPointsBefore(rangeStart)
s.histograms.DiscardPointsBefore(rangeStart)

s.nextStepT += s.ParentQueryTimeRange.IntervalMilliseconds
s.stepData.Floats = s.floats.ViewUntilSearchingForwards(rangeEnd, s.stepData.Floats)
s.stepData.Histograms = s.histograms.ViewUntilSearchingForwards(rangeEnd, s.stepData.Histograms)
s.stepData.RangeStart = rangeStart
s.stepData.RangeEnd = rangeEnd

return types.RangeVectorStepData{
Floats: s.floats,
Histograms: s.histograms,
StepT: stepT,
RangeStart: rangeStart,
RangeEnd: rangeEnd,
}, nil
return s.stepData, nil
}

func (s *Subquery) StepCount() int {
Expand Down
4 changes: 2 additions & 2 deletions pkg/streamingpromql/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -730,12 +730,12 @@ func (q *Query) populateMatrixFromRangeVectorOperator(ctx context.Context, o typ
return nil, err
}

floats, err := step.Floats.CopyPoints(step.RangeEnd)
floats, err := step.Floats.CopyPoints()
if err != nil {
return nil, err
}

histograms, err := step.Histograms.CopyPoints(step.RangeEnd)
histograms, err := step.Histograms.CopyPoints()
if err != nil {
return nil, err
}
Expand Down
Loading