Skip to content

Commit

Permalink
Reusing Batch Iterators (#6403)
Browse files Browse the repository at this point in the history
* Reusing Batch Iterators

Signed-off-by: alanprot <[email protected]>

* addressing some comments

Signed-off-by: alanprot <[email protected]>

* lint

Signed-off-by: alanprot <[email protected]>

* fixing reset method

Signed-off-by: alanprot <[email protected]>

---------

Signed-off-by: alanprot <[email protected]>
  • Loading branch information
alanprot authored Dec 18, 2024
1 parent 3269f9d commit e359f50
Show file tree
Hide file tree
Showing 9 changed files with 121 additions and 62 deletions.
4 changes: 2 additions & 2 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ func seriesSetFromResponseStream(s *mockQueryStreamServer) (storage.SeriesSet, e

serieses = append(serieses, &storage.SeriesEntry{
Lset: ls,
SampleIteratorFn: func(_ chunkenc.Iterator) chunkenc.Iterator {
return batch.NewChunkMergeIterator(chunks, math.MinInt64, math.MaxInt64)
SampleIteratorFn: func(it chunkenc.Iterator) chunkenc.Iterator {
return batch.NewChunkMergeIterator(it, chunks, math.MinInt64, math.MaxInt64)
},
})
}
Expand Down
15 changes: 11 additions & 4 deletions pkg/querier/batch/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,19 +52,26 @@ type iterator interface {
}

// NewChunkMergeIterator returns a chunkenc.Iterator that merges Cortex chunks together.
func NewChunkMergeIterator(chunks []chunk.Chunk, _, _ model.Time) chunkenc.Iterator {
func NewChunkMergeIterator(it chunkenc.Iterator, chunks []chunk.Chunk, _, _ model.Time) chunkenc.Iterator {
converted := make([]GenericChunk, len(chunks))
for i, c := range chunks {
c := c
converted[i] = NewGenericChunk(int64(c.From), int64(c.Through), c.NewIterator)
}

return NewGenericChunkMergeIterator(converted)
return NewGenericChunkMergeIterator(it, converted)
}

// NewGenericChunkMergeIterator returns a chunkenc.Iterator that merges generic chunks together.
func NewGenericChunkMergeIterator(chunks []GenericChunk) chunkenc.Iterator {
iter := newMergeIterator(chunks)
func NewGenericChunkMergeIterator(it chunkenc.Iterator, chunks []GenericChunk) chunkenc.Iterator {

var underlying iterator

if ia, ok := it.(*iteratorAdapter); ok {
underlying = ia.underlying
}

iter := newMergeIterator(underlying, chunks)
return newIteratorAdapter(iter)
}

Expand Down
9 changes: 5 additions & 4 deletions pkg/querier/batch/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,9 @@ func BenchmarkNewChunkMergeIterator_CreateAndIterate(b *testing.B) {
b.Run(name, func(b *testing.B) {
b.ReportAllocs()

var it chunkenc.Iterator
for n := 0; n < b.N; n++ {
it := NewChunkMergeIterator(chunks, 0, 0)
it = NewChunkMergeIterator(it, chunks, 0, 0)
for it.Next() != chunkenc.ValNone {
it.At()
}
Expand Down Expand Up @@ -108,9 +109,9 @@ func BenchmarkNewChunkMergeIterator_Seek(b *testing.B) {
b.ResetTimer()
b.Run(name, func(b *testing.B) {
b.ReportAllocs()

var it chunkenc.Iterator
for n := 0; n < b.N; n++ {
it := NewChunkMergeIterator(chunks, 0, 0)
it = NewChunkMergeIterator(it, chunks, 0, 0)
i := int64(0)
for it.Seek(i*scenario.seekStep.Milliseconds()) != chunkenc.ValNone {
i++
Expand All @@ -132,7 +133,7 @@ func TestSeekCorrectlyDealWithSinglePointChunks(t *testing.T) {
chunkTwo := util.GenerateChunk(t, step, model.Time(10*step/time.Millisecond), 1, enc)
chunks := []chunk.Chunk{chunkOne, chunkTwo}

sut := NewChunkMergeIterator(chunks, 0, 0)
sut := NewChunkMergeIterator(nil, chunks, 0, 0)

// Following calls mimics Prometheus's query engine behaviour for VectorSelector.
require.Equal(t, valType, sut.Next())
Expand Down
50 changes: 41 additions & 9 deletions pkg/querier/batch/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,27 @@ type mergeIterator struct {
currErr error
}

func newMergeIterator(cs []GenericChunk) *mergeIterator {
func newMergeIterator(it iterator, cs []GenericChunk) *mergeIterator {
css := partitionChunks(cs)
its := make([]*nonOverlappingIterator, 0, len(css))
for _, cs := range css {
its = append(its, newNonOverlappingIterator(cs))

var c *mergeIterator

if mIterator, ok := it.(*mergeIterator); ok && cap(mIterator.its) >= len(css) {
c = mIterator.Reset(len(css))
} else {
c = &mergeIterator{
h: make(iteratorHeap, 0, len(css)),
batches: make(batchStream, 0, len(css)),
batchesBuf: make(batchStream, len(css)),
}
}

c := &mergeIterator{
its: its,
h: make(iteratorHeap, 0, len(its)),
batches: make(batchStream, 0, len(its)),
batchesBuf: make(batchStream, len(its)),
if cap(c.its) < len(css) {
c.its = make([]*nonOverlappingIterator, 0, len(css))
}

for _, cs := range css {
c.its = append(c.its, newNonOverlappingIterator(cs))
}

for _, iter := range c.its {
Expand All @@ -52,6 +61,29 @@ func newMergeIterator(cs []GenericChunk) *mergeIterator {
return c
}

func (c *mergeIterator) Reset(size int) *mergeIterator {
c.its = c.its[:0]
c.h = c.h[:0]
c.batches = c.batches[:0]

if size > cap(c.batchesBuf) {
c.batchesBuf = make(batchStream, len(c.its))
} else {
c.batchesBuf = c.batchesBuf[:size]
for i := 0; i < size; i++ {
c.batchesBuf[i] = promchunk.Batch{}
}
}

for i := 0; i < len(c.nextBatchBuf); i++ {
c.nextBatchBuf[i] = promchunk.Batch{}
}

c.currErr = nil

return c
}

func (c *mergeIterator) Seek(t int64, size int) chunkenc.ValueType {

// Optimisation to see if the seek is within our current caches batches.
Expand Down
36 changes: 32 additions & 4 deletions pkg/querier/batch/merge_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package batch

import (
"fmt"
"testing"
"time"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/tsdb/chunkenc"

"github.com/cortexproject/cortex/pkg/chunk/encoding"
)
Expand All @@ -18,14 +20,40 @@ func TestMergeIter(t *testing.T) {
chunk4 := mkGenericChunk(t, model.TimeFromUnix(75), 100, enc)
chunk5 := mkGenericChunk(t, model.TimeFromUnix(100), 100, enc)

iter := newMergeIterator([]GenericChunk{chunk1, chunk2, chunk3, chunk4, chunk5})
iter := newMergeIterator(nil, []GenericChunk{chunk1, chunk2, chunk3, chunk4, chunk5})
testIter(t, 200, newIteratorAdapter(iter), enc)

iter = newMergeIterator([]GenericChunk{chunk1, chunk2, chunk3, chunk4, chunk5})
iter = newMergeIterator(iter, []GenericChunk{chunk1, chunk2, chunk3, chunk4, chunk5})
testSeek(t, 200, newIteratorAdapter(iter), enc)
})
}

func BenchmarkMergeIterator(b *testing.B) {
chunks := make([]GenericChunk, 0, 10)
for i := 0; i < 10; i++ {
chunks = append(chunks, mkGenericChunk(b, model.Time(i*25), 120, encoding.PrometheusXorChunk))
}
iter := newMergeIterator(nil, chunks)

for _, r := range []bool{true, false} {
b.Run(fmt.Sprintf("reuse-%t", r), func(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
if r {
iter = newMergeIterator(iter, chunks)
} else {
iter = newMergeIterator(nil, chunks)
}
a := newIteratorAdapter(iter)
for a.Next() != chunkenc.ValNone {

}
}
})
}
}

func TestMergeHarder(t *testing.T) {
t.Parallel()
forEncodings(t, func(t *testing.T, enc encoding.Encoding) {
Expand All @@ -40,10 +68,10 @@ func TestMergeHarder(t *testing.T) {
chunks = append(chunks, mkGenericChunk(t, from, samples, enc))
from = from.Add(time.Duration(offset) * time.Second)
}
iter := newMergeIterator(chunks)
iter := newMergeIterator(nil, chunks)
testIter(t, offset*numChunks+samples-offset, newIteratorAdapter(iter), enc)

iter = newMergeIterator(chunks)
iter = newMergeIterator(iter, chunks)
testSeek(t, offset*numChunks+samples-offset, newIteratorAdapter(iter), enc)
})
}
2 changes: 1 addition & 1 deletion pkg/querier/chunk_store_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@ import (
"github.com/cortexproject/cortex/pkg/chunk"
)

type chunkIteratorFunc func(chunks []chunk.Chunk, from, through model.Time) chunkenc.Iterator
type chunkIteratorFunc func(it chunkenc.Iterator, chunks []chunk.Chunk, from, through model.Time) chunkenc.Iterator
4 changes: 2 additions & 2 deletions pkg/querier/distributor_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,8 @@ func (q *distributorQuerier) streamingSelect(ctx context.Context, sortSeries boo

serieses = append(serieses, &storage.SeriesEntry{
Lset: ls,
SampleIteratorFn: func(_ chunkenc.Iterator) chunkenc.Iterator {
return q.chunkIterFn(chunks, model.Time(minT), model.Time(maxT))
SampleIteratorFn: func(it chunkenc.Iterator) chunkenc.Iterator {
return q.chunkIterFn(it, chunks, model.Time(minT), model.Time(maxT))
},
})
}
Expand Down
36 changes: 4 additions & 32 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,14 @@ import (
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/util/annotations"
"github.com/thanos-io/promql-engine/engine"
"github.com/thanos-io/promql-engine/logicalplan"
"github.com/thanos-io/thanos/pkg/strutil"
"golang.org/x/sync/errgroup"

"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/querier/batch"
"github.com/cortexproject/cortex/pkg/querier/lazyquery"
seriesset "github.com/cortexproject/cortex/pkg/querier/series"
querier_stats "github.com/cortexproject/cortex/pkg/querier/stats"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util"
Expand Down Expand Up @@ -188,7 +184,7 @@ func New(cfg Config, limits *validation.Overrides, distributor Distributor, stor
QueryStoreAfter: cfg.QueryStoreAfter,
}
}
queryable := NewQueryable(distributorQueryable, ns, iteratorFunc, cfg, limits)
queryable := NewQueryable(distributorQueryable, ns, cfg, limits)
exemplarQueryable := newDistributorExemplarQueryable(distributor)

lazyQueryable := storage.QueryableFunc(func(mint int64, maxt int64) (storage.Querier, error) {
Expand Down Expand Up @@ -275,13 +271,12 @@ type limiterHolder struct {
}

// NewQueryable creates a new Queryable for cortex.
func NewQueryable(distributor QueryableWithFilter, stores []QueryableWithFilter, chunkIterFn chunkIteratorFunc, cfg Config, limits *validation.Overrides) storage.Queryable {
func NewQueryable(distributor QueryableWithFilter, stores []QueryableWithFilter, cfg Config, limits *validation.Overrides) storage.Queryable {
return storage.QueryableFunc(func(mint, maxt int64) (storage.Querier, error) {
q := querier{
now: time.Now(),
mint: mint,
maxt: maxt,
chunkIterFn: chunkIterFn,
limits: limits,
maxQueryIntoFuture: cfg.MaxQueryIntoFuture,
ignoreMaxQueryLength: cfg.IgnoreMaxQueryLength,
Expand All @@ -295,10 +290,8 @@ func NewQueryable(distributor QueryableWithFilter, stores []QueryableWithFilter,
}

type querier struct {
chunkIterFn chunkIteratorFunc
now time.Time
mint, maxt int64

now time.Time
mint, maxt int64
limits *validation.Overrides
maxQueryIntoFuture time.Duration
distributor QueryableWithFilter
Expand Down Expand Up @@ -670,24 +663,3 @@ func validateQueryTimeRange(ctx context.Context, userID string, startMs, endMs i

return int64(startTime), int64(endTime), nil
}

// Series in the returned set are sorted alphabetically by labels.
func partitionChunks(chunks []chunk.Chunk, mint, maxt int64, iteratorFunc chunkIteratorFunc) storage.SeriesSet {
chunksBySeries := map[string][]chunk.Chunk{}
for _, c := range chunks {
key := client.LabelsToKeyString(c.Metric)
chunksBySeries[key] = append(chunksBySeries[key], c)
}

series := make([]storage.Series, 0, len(chunksBySeries))
for i := range chunksBySeries {
series = append(series, &storage.SeriesEntry{
Lset: chunksBySeries[i][0].Metric,
SampleIteratorFn: func(_ chunkenc.Iterator) chunkenc.Iterator {
return iteratorFunc(chunksBySeries[i], model.Time(mint), model.Time(maxt))
},
})
}

return seriesset.NewConcreteSeriesSet(true, series)
}
27 changes: 23 additions & 4 deletions pkg/querier/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/scrape"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/tsdbutil"
"github.com/prometheus/prometheus/util/annotations"
"github.com/stretchr/testify/assert"
Expand All @@ -32,6 +33,7 @@ import (
"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/querier/batch"
"github.com/cortexproject/cortex/pkg/querier/series"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/chunkcompat"
"github.com/cortexproject/cortex/pkg/util/flagext"
Expand Down Expand Up @@ -333,7 +335,7 @@ func TestShouldSortSeriesIfQueryingMultipleQueryables(t *testing.T) {
for _, queryable := range tc.storeQueriables {
wQueriables = append(wQueriables, &wrappedSampleAndChunkQueryable{QueryableWithFilter: queryable})
}
queryable := NewQueryable(wDistributorQueriable, wQueriables, batch.NewChunkMergeIterator, cfg, overrides)
queryable := NewQueryable(wDistributorQueriable, wQueriables, cfg, overrides)
opts := promql.EngineOpts{
Logger: log.NewNopLogger(),
MaxSamples: 1e6,
Expand Down Expand Up @@ -521,7 +523,7 @@ func TestLimits(t *testing.T) {
overrides, err := validation.NewOverrides(DefaultLimitsConfig(), tc.tenantLimit)
require.NoError(t, err)

queryable := NewQueryable(wDistributorQueriable, wQueriables, batch.NewChunkMergeIterator, cfg, overrides)
queryable := NewQueryable(wDistributorQueriable, wQueriables, cfg, overrides)
opts := promql.EngineOpts{
Logger: log.NewNopLogger(),
MaxSamples: 1e6,
Expand Down Expand Up @@ -1476,7 +1478,7 @@ type mockStoreQuerier struct {

// Select implements storage.Querier interface.
// The bool passed is ignored because the series is always sorted.
func (q *mockStoreQuerier) Select(ctx context.Context, _ bool, sp *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
func (q *mockStoreQuerier) Select(_ context.Context, _ bool, sp *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
// If we don't skip here, it'll make /series lookups extremely slow as all the chunks will be loaded.
// That flag is only to be set with blocks storage engine, and this is a protective measure.
if sp != nil && sp.Func == "series" {
Expand All @@ -1488,7 +1490,24 @@ func (q *mockStoreQuerier) Select(ctx context.Context, _ bool, sp *storage.Selec
return storage.ErrSeriesSet(err)
}

return partitionChunks(chunks, q.mint, q.maxt, q.chunkIteratorFunc)
cs := make([]storage.Series, 0, len(chunks))
chunksBySeries := map[string][]chunk.Chunk{}

for _, c := range chunks {
key := client.LabelsToKeyString(c.Metric)
chunksBySeries[key] = append(chunksBySeries[key], c)
}

for i, c := range chunksBySeries {
cs = append(cs, &storage.SeriesEntry{
Lset: chunksBySeries[i][0].Metric,
SampleIteratorFn: func(it chunkenc.Iterator) chunkenc.Iterator {
return q.chunkIteratorFunc(it, c, model.Time(mint), model.Time(maxt))
},
})
}

return series.NewConcreteSeriesSet(true, cs)
}

func (q *mockStoreQuerier) LabelValues(ctx context.Context, name string, _ *storage.LabelHints, labels ...*labels.Matcher) ([]string, annotations.Annotations, error) {
Expand Down

0 comments on commit e359f50

Please sign in to comment.