diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index ab68877ce0..a10873c6a5 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -110,8 +110,8 @@ func seriesSetFromResponseStream(s *mockQueryStreamServer) (storage.SeriesSet, e serieses = append(serieses, &storage.SeriesEntry{ Lset: ls, - SampleIteratorFn: func(_ chunkenc.Iterator) chunkenc.Iterator { - return batch.NewChunkMergeIterator(chunks, math.MinInt64, math.MaxInt64) + SampleIteratorFn: func(it chunkenc.Iterator) chunkenc.Iterator { + return batch.NewChunkMergeIterator(it, chunks, math.MinInt64, math.MaxInt64) }, }) } diff --git a/pkg/querier/batch/batch.go b/pkg/querier/batch/batch.go index ca7e1f79ee..79dfe8081e 100644 --- a/pkg/querier/batch/batch.go +++ b/pkg/querier/batch/batch.go @@ -52,19 +52,26 @@ type iterator interface { } // NewChunkMergeIterator returns a chunkenc.Iterator that merges Cortex chunks together. -func NewChunkMergeIterator(chunks []chunk.Chunk, _, _ model.Time) chunkenc.Iterator { +func NewChunkMergeIterator(it chunkenc.Iterator, chunks []chunk.Chunk, _, _ model.Time) chunkenc.Iterator { converted := make([]GenericChunk, len(chunks)) for i, c := range chunks { c := c converted[i] = NewGenericChunk(int64(c.From), int64(c.Through), c.NewIterator) } - return NewGenericChunkMergeIterator(converted) + return NewGenericChunkMergeIterator(it, converted) } // NewGenericChunkMergeIterator returns a chunkenc.Iterator that merges generic chunks together. 
-func NewGenericChunkMergeIterator(chunks []GenericChunk) chunkenc.Iterator { - iter := newMergeIterator(chunks) +func NewGenericChunkMergeIterator(it chunkenc.Iterator, chunks []GenericChunk) chunkenc.Iterator { + + var underlying iterator + + if ia, ok := it.(*iteratorAdapter); ok { + underlying = ia.underlying + } + + iter := newMergeIterator(underlying, chunks) return newIteratorAdapter(iter) } diff --git a/pkg/querier/batch/batch_test.go b/pkg/querier/batch/batch_test.go index 30c0a0e38c..4f4b57bfe4 100644 --- a/pkg/querier/batch/batch_test.go +++ b/pkg/querier/batch/batch_test.go @@ -55,8 +55,9 @@ func BenchmarkNewChunkMergeIterator_CreateAndIterate(b *testing.B) { b.Run(name, func(b *testing.B) { b.ReportAllocs() + var it chunkenc.Iterator for n := 0; n < b.N; n++ { - it := NewChunkMergeIterator(chunks, 0, 0) + it = NewChunkMergeIterator(it, chunks, 0, 0) for it.Next() != chunkenc.ValNone { it.At() } @@ -108,9 +109,9 @@ func BenchmarkNewChunkMergeIterator_Seek(b *testing.B) { b.ResetTimer() b.Run(name, func(b *testing.B) { b.ReportAllocs() - + var it chunkenc.Iterator for n := 0; n < b.N; n++ { - it := NewChunkMergeIterator(chunks, 0, 0) + it = NewChunkMergeIterator(it, chunks, 0, 0) i := int64(0) for it.Seek(i*scenario.seekStep.Milliseconds()) != chunkenc.ValNone { i++ @@ -132,7 +133,7 @@ func TestSeekCorrectlyDealWithSinglePointChunks(t *testing.T) { chunkTwo := util.GenerateChunk(t, step, model.Time(10*step/time.Millisecond), 1, enc) chunks := []chunk.Chunk{chunkOne, chunkTwo} - sut := NewChunkMergeIterator(chunks, 0, 0) + sut := NewChunkMergeIterator(nil, chunks, 0, 0) // Following calls mimics Prometheus's query engine behaviour for VectorSelector. 
require.Equal(t, valType, sut.Next()) diff --git a/pkg/querier/batch/merge.go b/pkg/querier/batch/merge.go index 8c7ebdf062..5d13453d52 100644 --- a/pkg/querier/batch/merge.go +++ b/pkg/querier/batch/merge.go @@ -23,18 +23,27 @@ type mergeIterator struct { currErr error } -func newMergeIterator(cs []GenericChunk) *mergeIterator { +func newMergeIterator(it iterator, cs []GenericChunk) *mergeIterator { css := partitionChunks(cs) - its := make([]*nonOverlappingIterator, 0, len(css)) - for _, cs := range css { - its = append(its, newNonOverlappingIterator(cs)) + + var c *mergeIterator + + if mIterator, ok := it.(*mergeIterator); ok && cap(mIterator.its) >= len(css) { + c = mIterator.Reset(len(css)) + } else { + c = &mergeIterator{ + h: make(iteratorHeap, 0, len(css)), + batches: make(batchStream, 0, len(css)), + batchesBuf: make(batchStream, len(css)), + } } - c := &mergeIterator{ - its: its, - h: make(iteratorHeap, 0, len(its)), - batches: make(batchStream, 0, len(its)), - batchesBuf: make(batchStream, len(its)), + if cap(c.its) < len(css) { + c.its = make([]*nonOverlappingIterator, 0, len(css)) + } + + for _, cs := range css { + c.its = append(c.its, newNonOverlappingIterator(cs)) } for _, iter := range c.its { @@ -52,6 +61,29 @@ return c } +func (c *mergeIterator) Reset(size int) *mergeIterator { + c.its = c.its[:0] + c.h = c.h[:0] + c.batches = c.batches[:0] + + if size <= cap(c.batchesBuf) { + c.batchesBuf = c.batchesBuf[:size] + for i := 0; i < size; i++ { + c.batchesBuf[i] = promchunk.Batch{} + } + } else { + c.batchesBuf = make(batchStream, size) + } + + for i := 0; i < len(c.nextBatchBuf); i++ { + c.nextBatchBuf[i] = promchunk.Batch{} + } + + c.currErr = nil + + return c +} + func (c *mergeIterator) Seek(t int64, size int) chunkenc.ValueType { // Optimisation to see if the seek is within our current caches batches.
diff --git a/pkg/querier/batch/merge_test.go b/pkg/querier/batch/merge_test.go index 8ad0d16df4..111edb5a0b 100644 --- a/pkg/querier/batch/merge_test.go +++ b/pkg/querier/batch/merge_test.go @@ -1,11 +1,13 @@ package batch import ( + "fmt" "testing" "time" "github.com/prometheus/common/model" - + "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/cortexproject/cortex/pkg/chunk/encoding" ) @@ -18,14 +20,40 @@ func TestMergeIter(t *testing.T) { chunk4 := mkGenericChunk(t, model.TimeFromUnix(75), 100, enc) chunk5 := mkGenericChunk(t, model.TimeFromUnix(100), 100, enc) - iter := newMergeIterator([]GenericChunk{chunk1, chunk2, chunk3, chunk4, chunk5}) + iter := newMergeIterator(nil, []GenericChunk{chunk1, chunk2, chunk3, chunk4, chunk5}) testIter(t, 200, newIteratorAdapter(iter), enc) - iter = newMergeIterator([]GenericChunk{chunk1, chunk2, chunk3, chunk4, chunk5}) + iter = newMergeIterator(iter, []GenericChunk{chunk1, chunk2, chunk3, chunk4, chunk5}) testSeek(t, 200, newIteratorAdapter(iter), enc) }) } +func BenchmarkMergeIterator(b *testing.B) { + chunks := make([]GenericChunk, 0, 10) + for i := 0; i < 10; i++ { + chunks = append(chunks, mkGenericChunk(b, model.Time(i*25), 120, encoding.PrometheusXorChunk)) + } + iter := newMergeIterator(nil, chunks) + + for _, r := range []bool{true, false} { + b.Run(fmt.Sprintf("reuse-%t", r), func(b *testing.B) { + b.ResetTimer() + b.ReportAllocs() + for i := 0; i < b.N; i++ { + if r { + iter = newMergeIterator(iter, chunks) + } else { + iter = newMergeIterator(nil, chunks) + } + a := newIteratorAdapter(iter) + for a.Next() != chunkenc.ValNone { + + } + } + }) + } +} + func TestMergeHarder(t *testing.T) { t.Parallel() forEncodings(t, func(t *testing.T, enc encoding.Encoding) { @@ -40,10 +68,10 @@ func TestMergeHarder(t *testing.T) { chunks = append(chunks, mkGenericChunk(t, from, samples, enc)) from = from.Add(time.Duration(offset) * time.Second) } - iter := newMergeIterator(chunks) + iter := newMergeIterator(nil, 
chunks) testIter(t, offset*numChunks+samples-offset, newIteratorAdapter(iter), enc) - iter = newMergeIterator(chunks) + iter = newMergeIterator(iter, chunks) testSeek(t, offset*numChunks+samples-offset, newIteratorAdapter(iter), enc) }) } diff --git a/pkg/querier/chunk_store_queryable.go b/pkg/querier/chunk_store_queryable.go index e072228ac7..0b3fabf011 100644 --- a/pkg/querier/chunk_store_queryable.go +++ b/pkg/querier/chunk_store_queryable.go @@ -7,4 +7,4 @@ import ( "github.com/cortexproject/cortex/pkg/chunk" ) -type chunkIteratorFunc func(chunks []chunk.Chunk, from, through model.Time) chunkenc.Iterator +type chunkIteratorFunc func(it chunkenc.Iterator, chunks []chunk.Chunk, from, through model.Time) chunkenc.Iterator diff --git a/pkg/querier/distributor_queryable.go b/pkg/querier/distributor_queryable.go index 8ae7c49106..709294e6fa 100644 --- a/pkg/querier/distributor_queryable.go +++ b/pkg/querier/distributor_queryable.go @@ -155,8 +155,8 @@ func (q *distributorQuerier) streamingSelect(ctx context.Context, sortSeries boo serieses = append(serieses, &storage.SeriesEntry{ Lset: ls, - SampleIteratorFn: func(_ chunkenc.Iterator) chunkenc.Iterator { - return q.chunkIterFn(chunks, model.Time(minT), model.Time(maxT)) + SampleIteratorFn: func(it chunkenc.Iterator) chunkenc.Iterator { + return q.chunkIterFn(it, chunks, model.Time(minT), model.Time(maxT)) }, }) } diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 784aef3bee..0f3821543e 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -188,7 +188,7 @@ func New(cfg Config, limits *validation.Overrides, distributor Distributor, stor QueryStoreAfter: cfg.QueryStoreAfter, } } - queryable := NewQueryable(distributorQueryable, ns, iteratorFunc, cfg, limits) + queryable := NewQueryable(distributorQueryable, ns, cfg, limits) exemplarQueryable := newDistributorExemplarQueryable(distributor) lazyQueryable := storage.QueryableFunc(func(mint int64, maxt int64) (storage.Querier, error) { @@ 
-275,13 +275,12 @@ type limiterHolder struct { } // NewQueryable creates a new Queryable for cortex. -func NewQueryable(distributor QueryableWithFilter, stores []QueryableWithFilter, chunkIterFn chunkIteratorFunc, cfg Config, limits *validation.Overrides) storage.Queryable { +func NewQueryable(distributor QueryableWithFilter, stores []QueryableWithFilter, cfg Config, limits *validation.Overrides) storage.Queryable { return storage.QueryableFunc(func(mint, maxt int64) (storage.Querier, error) { q := querier{ now: time.Now(), mint: mint, maxt: maxt, - chunkIterFn: chunkIterFn, limits: limits, maxQueryIntoFuture: cfg.MaxQueryIntoFuture, ignoreMaxQueryLength: cfg.IgnoreMaxQueryLength, @@ -295,10 +294,8 @@ func NewQueryable(distributor QueryableWithFilter, stores []QueryableWithFilter, } type querier struct { - chunkIterFn chunkIteratorFunc - now time.Time - mint, maxt int64 - + now time.Time + mint, maxt int64 limits *validation.Overrides maxQueryIntoFuture time.Duration distributor QueryableWithFilter @@ -683,8 +680,8 @@ func partitionChunks(chunks []chunk.Chunk, mint, maxt int64, iteratorFunc chunkI for i := range chunksBySeries { series = append(series, &storage.SeriesEntry{ Lset: chunksBySeries[i][0].Metric, - SampleIteratorFn: func(_ chunkenc.Iterator) chunkenc.Iterator { - return iteratorFunc(chunksBySeries[i], model.Time(mint), model.Time(maxt)) + SampleIteratorFn: func(it chunkenc.Iterator) chunkenc.Iterator { + return iteratorFunc(it, chunksBySeries[i], model.Time(mint), model.Time(maxt)) }, }) } diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index 044bf0e193..bf35fee85c 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -333,7 +333,7 @@ func TestShouldSortSeriesIfQueryingMultipleQueryables(t *testing.T) { for _, queryable := range tc.storeQueriables { wQueriables = append(wQueriables, &wrappedSampleAndChunkQueryable{QueryableWithFilter: queryable}) } - queryable := NewQueryable(wDistributorQueriable, 
wQueriables, batch.NewChunkMergeIterator, cfg, overrides) + queryable := NewQueryable(wDistributorQueriable, wQueriables, cfg, overrides) opts := promql.EngineOpts{ Logger: log.NewNopLogger(), MaxSamples: 1e6, @@ -521,7 +521,7 @@ func TestLimits(t *testing.T) { overrides, err := validation.NewOverrides(DefaultLimitsConfig(), tc.tenantLimit) require.NoError(t, err) - queryable := NewQueryable(wDistributorQueriable, wQueriables, batch.NewChunkMergeIterator, cfg, overrides) + queryable := NewQueryable(wDistributorQueriable, wQueriables, cfg, overrides) opts := promql.EngineOpts{ Logger: log.NewNopLogger(), MaxSamples: 1e6,