Skip to content

Commit

Permalink
MQE: Add helper for idx calcs (#9417)
Browse files Browse the repository at this point in the history
* MQE: Add helper for idx calcs

* Update CHANGELOG
  • Loading branch information
jhesketh authored Sep 26, 2024
1 parent 3509c46 commit 45a14c9
Show file tree
Hide file tree
Showing 14 changed files with 114 additions and 38 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
* [CHANGE] Querier: Remove deprecated `-querier.max-query-into-future`. The feature was deprecated in Mimir 2.12. #9407
* [FEATURE] Alertmanager: Added `-alertmanager.log-parsing-label-matchers` to control logging when parsing label matchers. This flag is intended to be used with `-alertmanager.utf8-strict-mode-enabled` to validate UTF-8 strict mode is working as intended. The default value is `false`. #9173
* [FEATURE] Alertmanager: Added `-alertmanager.utf8-migration-logging-enabled` to enable logging of tenant configurations that are incompatible with UTF-8 strict mode. The default value is `false`. #9174
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8924 #8925 #8932 #8933 #8934 #8962 #8986 #8993 #8995 #9008 #9017 #9018 #9019 #9120 #9121 #9136 #9139 #9140 #9145 #9191 #9192 #9194 #9196 #9201 #9212 #9225 #9260 #9272 #9277 #9278 #9280 #9281 #9342 #9343 #9367 #9368 #9371 #9398 #9399 #9403
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8924 #8925 #8932 #8933 #8934 #8962 #8986 #8993 #8995 #9008 #9017 #9018 #9019 #9120 #9121 #9136 #9139 #9140 #9145 #9191 #9192 #9194 #9196 #9201 #9212 #9225 #9260 #9272 #9277 #9278 #9280 #9281 #9342 #9343 #9367 #9368 #9371 #9398 #9399 #9403 #9417
* [FEATURE] Experimental Kafka-based ingest storage. #6888 #6894 #6929 #6940 #6951 #6974 #6982 #7029 #7030 #7091 #7142 #7147 #7148 #7153 #7160 #7193 #7349 #7376 #7388 #7391 #7393 #7394 #7402 #7404 #7423 #7424 #7437 #7486 #7503 #7508 #7540 #7621 #7682 #7685 #7694 #7695 #7696 #7697 #7701 #7733 #7734 #7741 #7752 #7838 #7851 #7871 #7877 #7880 #7882 #7887 #7891 #7925 #7955 #7967 #8031 #8063 #8077 #8088 #8135 #8176 #8184 #8194 #8216 #8217 #8222 #8233 #8503 #8542 #8579 #8657 #8686 #8688 #8703 #8706 #8708 #8738 #8750 #8778 #8808 #8809 #8841 #8842 #8845 #8853 #8886 #8988
* What it is:
* When the new ingest storage architecture is enabled, distributors write incoming write requests to a Kafka-compatible backend, and the ingesters asynchronously replay ingested data from Kafka. In this architecture, the write and read path are de-coupled through a Kafka-compatible backend. The write path and Kafka load is a function of the incoming write traffic, the read path load is a function of received queries. Whatever the load on the read path, it doesn't affect the write path.
Expand Down
8 changes: 4 additions & 4 deletions pkg/streamingpromql/aggregations/avg.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (g *AvgAggregationGroup) accumulateFloats(data types.InstantVectorSeriesDat
}

for _, p := range data.Floats {
idx := (p.T - timeRange.StartT) / timeRange.IntervalMs
idx := timeRange.PointIndex(p.T)
g.groupSeriesCounts[idx]++
if !g.floatPresent[idx] {
// The first point is just taken as the value
Expand Down Expand Up @@ -168,7 +168,7 @@ func (g *AvgAggregationGroup) accumulateHistograms(data types.InstantVectorSerie
var lastUncopiedHistogram *histogram.FloatHistogram

for inputIdx, p := range data.Histograms {
outputIdx := (p.T - timeRange.StartT) / timeRange.IntervalMs
outputIdx := timeRange.PointIndex(p.T)
g.groupSeriesCounts[outputIdx]++

if g.histograms[outputIdx] == invalidCombinationOfHistograms {
Expand Down Expand Up @@ -294,7 +294,7 @@ func (g *AvgAggregationGroup) ComputeOutputSeries(timeRange types.QueryTimeRange

for i, havePoint := range g.floatPresent {
if havePoint {
t := timeRange.StartT + int64(i)*timeRange.IntervalMs
t := timeRange.StartT + int64(i)*timeRange.IntervalMilliseconds
var f float64
if g.incrementalMeans != nil && g.incrementalMeans[i] {
f = g.floatMeans[i] + g.floatCompensatingMeans[i]
Expand All @@ -315,7 +315,7 @@ func (g *AvgAggregationGroup) ComputeOutputSeries(timeRange types.QueryTimeRange

for i, h := range g.histograms {
if h != nil && h != invalidCombinationOfHistograms {
t := timeRange.StartT + int64(i)*timeRange.IntervalMs
t := timeRange.StartT + int64(i)*timeRange.IntervalMilliseconds
histogramPoints = append(histogramPoints, promql.HPoint{T: t, H: h.Compact(0)})
}
}
Expand Down
8 changes: 3 additions & 5 deletions pkg/streamingpromql/aggregations/count.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,11 @@ func (g *CountGroupAggregationGroup) AccumulateSeries(data types.InstantVectorSe
}

for _, p := range data.Floats {
idx := (p.T - timeRange.StartT) / timeRange.IntervalMs
g.accumulatePoint(idx)
g.accumulatePoint(timeRange.PointIndex(p.T))
}

for _, p := range data.Histograms {
idx := (p.T - timeRange.StartT) / timeRange.IntervalMs
g.accumulatePoint(idx)
g.accumulatePoint(timeRange.PointIndex(p.T))
}

types.PutInstantVectorSeriesData(data, memoryConsumptionTracker)
Expand All @@ -84,7 +82,7 @@ func (g *CountGroupAggregationGroup) ComputeOutputSeries(timeRange types.QueryTi

for i, fv := range g.values {
if fv > 0 {
t := timeRange.StartT + int64(i)*timeRange.IntervalMs
t := timeRange.StartT + int64(i)*timeRange.IntervalMilliseconds
floatPoints = append(floatPoints, promql.FPoint{T: t, F: fv})
}
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/streamingpromql/aggregations/min_max.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,14 @@ func (g *MinMaxAggregationGroup) AccumulateSeries(data types.InstantVectorSeries
}

for _, p := range data.Floats {
idx := (p.T - timeRange.StartT) / timeRange.IntervalMs
idx := timeRange.PointIndex(p.T)
g.accumulatePoint(idx, p.F)
}

// If a histogram exists max treats it as 0. We have to detect this here so that we return a 0 value instead of nothing.
// This is consistent with Prometheus but may not be the desired value: https://github.com/prometheus/prometheus/issues/14711
for _, p := range data.Histograms {
idx := (p.T - timeRange.StartT) / timeRange.IntervalMs
idx := timeRange.PointIndex(p.T)
g.accumulatePoint(idx, 0)
}

Expand All @@ -102,7 +102,7 @@ func (g *MinMaxAggregationGroup) ComputeOutputSeries(timeRange types.QueryTimeRa

for i, havePoint := range g.floatPresent {
if havePoint {
t := timeRange.StartT + int64(i)*timeRange.IntervalMs
t := timeRange.StartT + int64(i)*timeRange.IntervalMilliseconds
f := g.floatValues[i]
floatPoints = append(floatPoints, promql.FPoint{T: t, F: f})
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/streamingpromql/aggregations/stddev.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (g *StddevAggregationGroup) AccumulateSeries(data types.InstantVectorSeries
}

for _, p := range data.Floats {
idx := (p.T - timeRange.StartT) / timeRange.IntervalMs
idx := timeRange.PointIndex(p.T)

g.groupSeriesCounts[idx]++
delta := p.F - g.floatMeans[idx]
Expand Down Expand Up @@ -81,7 +81,7 @@ func (g *StddevAggregationGroup) ComputeOutputSeries(timeRange types.QueryTimeRa

for i, sc := range g.groupSeriesCounts {
if sc > 0 {
t := timeRange.StartT + int64(i)*timeRange.IntervalMs
t := timeRange.StartT + int64(i)*timeRange.IntervalMilliseconds
f := math.Sqrt(g.floats[i] / g.groupSeriesCounts[i])
floatPoints = append(floatPoints, promql.FPoint{T: t, F: f})
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/streamingpromql/aggregations/sum.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (g *SumAggregationGroup) accumulateFloats(data types.InstantVectorSeriesDat
}

for _, p := range data.Floats {
idx := (p.T - timeRange.StartT) / timeRange.IntervalMs
idx := timeRange.PointIndex(p.T)
g.floatSums[idx], g.floatCompensatingValues[idx] = floats.KahanSumInc(p.F, g.floatSums[idx], g.floatCompensatingValues[idx])
g.floatPresent[idx] = true
}
Expand All @@ -90,7 +90,7 @@ func (g *SumAggregationGroup) accumulateHistograms(data types.InstantVectorSerie
}

for inputIdx, p := range data.Histograms {
outputIdx := (p.T - timeRange.StartT) / timeRange.IntervalMs
outputIdx := timeRange.PointIndex(p.T)

if g.histogramSums[outputIdx] == invalidCombinationOfHistograms {
// We've already seen an invalid combination of histograms at this timestamp. Ignore this point.
Expand Down Expand Up @@ -193,7 +193,7 @@ func (g *SumAggregationGroup) ComputeOutputSeries(timeRange types.QueryTimeRange

for i, havePoint := range g.floatPresent {
if havePoint {
t := timeRange.StartT + int64(i)*timeRange.IntervalMs
t := timeRange.StartT + int64(i)*timeRange.IntervalMilliseconds
f := g.floatSums[i] + g.floatCompensatingValues[i]
floatPoints = append(floatPoints, promql.FPoint{T: t, F: f})
}
Expand All @@ -209,7 +209,7 @@ func (g *SumAggregationGroup) ComputeOutputSeries(timeRange types.QueryTimeRange

for i, h := range g.histogramSums {
if h != nil && h != invalidCombinationOfHistograms {
t := timeRange.StartT + int64(i)*timeRange.IntervalMs
t := timeRange.StartT + int64(i)*timeRange.IntervalMilliseconds
histogramPoints = append(histogramPoints, promql.HPoint{T: t, H: h.Compact(0)})
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/streamingpromql/operators/instant_vector_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (v *InstantVectorSelector) NextSeries(ctx context.Context) (types.InstantVe
lastHistogramT := int64(math.MinInt64)
var lastHistogram *histogram.FloatHistogram

for stepT := v.Selector.TimeRange.StartT; stepT <= v.Selector.TimeRange.EndT; stepT += v.Selector.TimeRange.IntervalMs {
for stepT := v.Selector.TimeRange.StartT; stepT <= v.Selector.TimeRange.EndT; stepT += v.Selector.TimeRange.IntervalMilliseconds {
var t int64
var f float64
var h *histogram.FloatHistogram
Expand Down
4 changes: 2 additions & 2 deletions pkg/streamingpromql/operators/instant_vector_to_scalar.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (i *InstantVectorToScalar) GetValues(ctx context.Context) (types.ScalarData
return types.ScalarData{}, err
}

for t := i.TimeRange.StartT; t <= i.TimeRange.EndT; t += i.TimeRange.IntervalMs {
for t := i.TimeRange.StartT; t <= i.TimeRange.EndT; t += i.TimeRange.IntervalMilliseconds {
output = append(output, promql.FPoint{
T: t,
F: math.NaN(),
Expand All @@ -71,7 +71,7 @@ func (i *InstantVectorToScalar) GetValues(ctx context.Context) (types.ScalarData
}

for _, p := range seriesData.Floats {
sampleIdx := (p.T - i.TimeRange.StartT) / i.TimeRange.IntervalMs
sampleIdx := (p.T - i.TimeRange.StartT) / i.TimeRange.IntervalMilliseconds

if seenPoint[sampleIdx] {
// We've already seen another point at this timestamp, so return NaN at this timestamp.
Expand Down
2 changes: 1 addition & 1 deletion pkg/streamingpromql/operators/range_vector_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (m *RangeVectorSelector) NextStepSamples(floats *types.FPointRingBuffer, hi
return types.RangeVectorStepData{}, err
}

m.nextT += m.Selector.TimeRange.IntervalMs
m.nextT += m.Selector.TimeRange.IntervalMilliseconds

return types.RangeVectorStepData{
StepT: stepT,
Expand Down
2 changes: 1 addition & 1 deletion pkg/streamingpromql/operators/scalar_constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (s *ScalarConstant) GetValues(_ context.Context) (types.ScalarData, error)
samples = samples[:s.TimeRange.StepCount]

for step := 0; step < s.TimeRange.StepCount; step++ {
samples[step].T = s.TimeRange.StartT + int64(step)*s.TimeRange.IntervalMs
samples[step].T = s.TimeRange.StartT + int64(step)*s.TimeRange.IntervalMilliseconds
samples[step].F = s.Value
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/streamingpromql/operators/selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (s *Selector) SeriesMetadata(ctx context.Context) ([]types.SeriesMetadata,
hints := &storage.SelectHints{
Start: startTimestamp,
End: endTimestamp,
Step: s.TimeRange.IntervalMs,
Step: s.TimeRange.IntervalMilliseconds,
Range: rangeMilliseconds,

// Mimir doesn't use Grouping or By, so there's no need to include them here.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func (v *VectorScalarBinaryOperation) NextSeries(ctx context.Context) (types.Ins
break
}

scalarIdx := (t - v.timeRange.StartT) / v.timeRange.IntervalMs // Scalars always have a value at every step, so we can just compute the index of the corresponding scalar value from the timestamp.
scalarIdx := (t - v.timeRange.StartT) / v.timeRange.IntervalMilliseconds // Scalars always have a value at every step, so we can just compute the index of the corresponding scalar value from the timestamp.
scalarValue := v.scalarData.Samples[scalarIdx].F

f, h, ok, err := v.opFunc(scalarValue, vectorF, vectorH)
Expand Down
30 changes: 18 additions & 12 deletions pkg/streamingpromql/types/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,9 @@ func HasDuplicateSeries(metadata []SeriesMetadata) bool {
}

// QueryTimeRange describes the time range and step of a query: the timestamps
// (in milliseconds since the Unix epoch) of the first and last evaluated steps,
// the interval between steps, and the total number of steps.
type QueryTimeRange struct {
	StartT               int64 // Start timestamp, in milliseconds since Unix epoch.
	EndT                 int64 // End timestamp, in milliseconds since Unix epoch.
	IntervalMilliseconds int64 // Range query interval, or 1 for instant queries. Note that this is deliberately different to parser.EvalStmt.Interval for instant queries (where it is 0) to simplify some loop conditions.

	StepCount int // 1 for instant queries.
}
Expand All @@ -167,22 +167,28 @@ func NewInstantQueryTimeRange(t time.Time) QueryTimeRange {
ts := timestamp.FromTime(t)

return QueryTimeRange{
StartT: ts,
EndT: ts,
IntervalMs: 1,
StepCount: 1,
StartT: ts,
EndT: ts,
IntervalMilliseconds: 1,
StepCount: 1,
}
}

// NewRangeQueryTimeRange returns the QueryTimeRange for a range query evaluated
// from start to end (both inclusive) with the given step interval.
// interval must be non-zero. StepCount uses integer division, so if
// (end - start) is not an exact multiple of interval, the final step falls
// before end — presumably callers always pass an aligned range; verify at call sites.
func NewRangeQueryTimeRange(start time.Time, end time.Time, interval time.Duration) QueryTimeRange {
	startT := timestamp.FromTime(start)
	endT := timestamp.FromTime(end)
	intervalMs := interval.Milliseconds()

	return QueryTimeRange{
		StartT:               startT,
		EndT:                 endT,
		IntervalMilliseconds: intervalMs,
		StepCount:            int((endT-startT)/intervalMs) + 1,
	}
}

// PointIndex returns the step index within the QueryTimeRange that the timestamp t falls on:
// 0 for StartT, 1 for StartT+IntervalMilliseconds, and so on.
// t must be aligned to the step (ie. t = StartT + n*IntervalMilliseconds for some n >= 0);
// for unaligned t, integer division yields the index of the nearest earlier step.
func (q *QueryTimeRange) PointIndex(t int64) int64 {
	return (t - q.StartT) / q.IntervalMilliseconds
}
72 changes: 72 additions & 0 deletions pkg/streamingpromql/types/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ package types

import (
"testing"
"time"

"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/promql"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -230,3 +232,73 @@ func TestHasDuplicateSeries(t *testing.T) {
})
}
}

// TestQueryTimeRange checks that instant and range query time ranges report the
// expected start, end, interval and step count, and that PointIndex maps each
// step-aligned timestamp to the expected step index.
func TestQueryTimeRange(t *testing.T) {
	type scenario struct {
		start, end     time.Time
		interval       time.Duration
		wantStart      int64
		wantEnd        int64
		wantIntervalMs int64
		wantSteps      int
		points         []time.Time // Step-aligned timestamps to feed to PointIndex.
		wantIndexes    []int64     // Expected PointIndex result for each entry in points.
	}

	now := time.Now()

	scenarios := map[string]scenario{
		"Instant query": {
			start:          now,
			end:            now,
			interval:       0,
			wantStart:      timestamp.FromTime(now),
			wantEnd:        timestamp.FromTime(now),
			wantIntervalMs: 1,
			wantSteps:      1,
			points:         []time.Time{now},
			wantIndexes:    []int64{0},
		},
		"Range query with 15-minute interval": {
			start:          now,
			end:            now.Add(time.Hour),
			interval:       15 * time.Minute,
			wantStart:      timestamp.FromTime(now),
			wantEnd:        timestamp.FromTime(now.Add(time.Hour)),
			wantIntervalMs: (15 * time.Minute).Milliseconds(),
			wantSteps:      5,
			points: []time.Time{
				now,
				now.Add(15 * time.Minute),
				now.Add(30 * time.Minute),
				now.Add(45 * time.Minute),
				now.Add(time.Hour),
			},
			wantIndexes: []int64{0, 1, 2, 3, 4},
		},
	}

	for name, s := range scenarios {
		t.Run(name, func(t *testing.T) {
			var timeRange QueryTimeRange

			// An interval of zero marks the scenario as an instant query.
			if s.interval == 0 {
				timeRange = NewInstantQueryTimeRange(s.start)
			} else {
				timeRange = NewRangeQueryTimeRange(s.start, s.end, s.interval)
			}

			require.Equal(t, s.wantStart, timeRange.StartT, "StartT matches")
			require.Equal(t, s.wantEnd, timeRange.EndT, "EndT matches")
			require.Equal(t, s.wantIntervalMs, timeRange.IntervalMilliseconds, "IntervalMs matches")
			require.Equal(t, s.wantSteps, timeRange.StepCount, "StepCount matches")

			for i, p := range s.points {
				require.Equal(t, s.wantIndexes[i], timeRange.PointIndex(timestamp.FromTime(p)), "PointIdx matches for time %v", p)
			}
		})
	}
}

0 comments on commit 45a14c9

Please sign in to comment.