Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify querier merge code #620

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 0 additions & 23 deletions pkg/querier/chunk_store_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,9 @@ package querier

import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/tsdb/chunkenc"

"github.com/cortexproject/cortex/pkg/chunk"
)

type chunkIteratorFunc func(chunks []chunk.Chunk, from, through model.Time) chunkenc.Iterator

// Implements SeriesWithChunks
type chunkSeries struct {
labels labels.Labels
chunks []chunk.Chunk
chunkIteratorFunc chunkIteratorFunc
mint, maxt int64
}

func (s *chunkSeries) Labels() labels.Labels {
return s.labels
}

// Iterator returns a new iterator of the data of the series.
func (s *chunkSeries) Iterator(chunkenc.Iterator) chunkenc.Iterator {
return s.chunkIteratorFunc(s.chunks, model.Time(s.mint), model.Time(s.maxt))
}

// Chunks implements SeriesWithChunks interface.
func (s *chunkSeries) Chunks() []chunk.Chunk {
return s.chunks
}
37 changes: 0 additions & 37 deletions pkg/querier/chunk_store_queryable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package querier
import (
"context"
"fmt"
"sort"
"testing"
"time"

Expand All @@ -20,9 +19,6 @@ import (
promchunk "github.com/cortexproject/cortex/pkg/chunk/encoding"
)

// Make sure that chunkSeries implements SeriesWithChunks
var _ SeriesWithChunks = &chunkSeries{}

func TestChunkQueryable(t *testing.T) {
t.Parallel()
opts := promql.EngineOpts{
Expand Down Expand Up @@ -92,36 +88,3 @@ func mkChunk(t require.TestingT, mint, maxt model.Time, step time.Duration, enco
}
return chunk.NewChunk(metric, pc, mint, maxt)
}

func TestPartitionChunksOutputIsSortedByLabels(t *testing.T) {
t.Parallel()

var allChunks []chunk.Chunk

const count = 10
// go down, to add series in reversed order
for i := count; i > 0; i-- {
ch := mkChunk(t, model.Time(0), model.Time(1000), time.Millisecond, promchunk.PrometheusXorChunk)
// mkChunk uses `foo` as metric name, so we rename metric to be unique
ch.Metric[0].Value = fmt.Sprintf("%02d", i)

allChunks = append(allChunks, ch)
}

res := partitionChunks(allChunks, 0, 1000, mergeChunks)

// collect labels from each series
var seriesLabels []labels.Labels
for res.Next() {
seriesLabels = append(seriesLabels, res.At().Labels())
}

require.Len(t, seriesLabels, count)
require.True(t, sort.IsSorted(sortedByLabels(seriesLabels)))
}

type sortedByLabels []labels.Labels

func (b sortedByLabels) Len() int { return len(b) }
func (b sortedByLabels) Swap(i, j int) { b[i], b[j] = b[j], b[i] }
func (b sortedByLabels) Less(i, j int) bool { return labels.Compare(b[i], b[j]) < 0 }
12 changes: 6 additions & 6 deletions pkg/querier/distributor_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/scrape"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/util/annotations"

"github.com/cortexproject/cortex/pkg/cortexpb"
Expand Down Expand Up @@ -184,12 +185,11 @@ func (q *distributorQuerier) streamingSelect(ctx context.Context, sortSeries boo
return storage.ErrSeriesSet(err)
}

serieses = append(serieses, &chunkSeries{
labels: ls,
chunks: chunks,
chunkIteratorFunc: q.chunkIterFn,
mint: minT,
maxt: maxT,
serieses = append(serieses, &storage.SeriesEntry{
Lset: ls,
SampleIteratorFn: func(iterator chunkenc.Iterator) chunkenc.Iterator {
return q.chunkIterFn(chunks, model.Time(minT), model.Time(maxT))
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am also thinking about getting rid of this chunkIterFn completely so that we can remove a ton of code and just reuse the upstream interfaces. That would make it much easier to support native histograms.

But this breaks the batch iterator we have.

		iterables := make([]chunkenc.Iterable, len(chunks))
		serieses = append(serieses, &storage.SeriesEntry{
			Lset: ls,
			SampleIteratorFn: func(iterator chunkenc.Iterator) chunkenc.Iterator {
				for i, c := range chunks {
					iterables[i] = c.Data.SampleIterable()
				}
				return storage.ChainSampleIteratorFromIterables(iterator, iterables)
			},
		})

},
})
}

Expand Down
86 changes: 8 additions & 78 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/util/annotations"
v1 "github.com/prometheus/prometheus/web/api/v1"
"github.com/thanos-io/promql-engine/engine"
Expand Down Expand Up @@ -433,10 +434,7 @@ func (q querier) Select(ctx context.Context, sortSeries bool, sp *storage.Select
}
}

// we have all the sets from different sources (chunk from store, chunks from ingesters,
// time series from store and time series from ingesters).
// mergeSeriesSets will return sorted set.
return q.mergeSeriesSets(result)
return storage.NewMergeSeriesSet(result, storage.ChainedSeriesMerge)
}

// LabelValues implements storage.Querier.
Expand Down Expand Up @@ -552,74 +550,6 @@ func (querier) Close() error {
return nil
}

func (q querier) mergeSeriesSets(sets []storage.SeriesSet) storage.SeriesSet {
// Here we deal with sets that are based on chunks and build single set from them.
// Remaining sets are merged with chunks-based one using storage.NewMergeSeriesSet

otherSets := []storage.SeriesSet(nil)
chunks := []chunk.Chunk(nil)

for _, set := range sets {
nonChunkSeries := []storage.Series(nil)

// SeriesSet may have some series backed up by chunks, and some not.
for set.Next() {
s := set.At()

if sc, ok := s.(SeriesWithChunks); ok {
chunks = append(chunks, sc.Chunks()...)
} else {
nonChunkSeries = append(nonChunkSeries, s)
}
}

if err := set.Err(); err != nil {
otherSets = append(otherSets, storage.ErrSeriesSet(err))
} else if len(nonChunkSeries) > 0 {
otherSets = append(otherSets, &sliceSeriesSet{series: nonChunkSeries, ix: -1})
}
}

if len(chunks) == 0 {
return storage.NewMergeSeriesSet(otherSets, storage.ChainedSeriesMerge)
}

// partitionChunks returns set with sorted series, so it can be used by NewMergeSeriesSet
chunksSet := partitionChunks(chunks, q.mint, q.maxt, q.chunkIterFn)

if len(otherSets) == 0 {
return chunksSet
}

otherSets = append(otherSets, chunksSet)
return storage.NewMergeSeriesSet(otherSets, storage.ChainedSeriesMerge)
}

type sliceSeriesSet struct {
series []storage.Series
ix int
}

func (s *sliceSeriesSet) Next() bool {
s.ix++
return s.ix < len(s.series)
}

func (s *sliceSeriesSet) At() storage.Series {
if s.ix < 0 || s.ix >= len(s.series) {
return nil
}
return s.series[s.ix]
}

func (s *sliceSeriesSet) Err() error {
return nil
}

func (s *sliceSeriesSet) Warnings() annotations.Annotations {
return nil
}

type storeQueryable struct {
QueryableWithFilter
QueryStoreAfter time.Duration
Expand Down Expand Up @@ -717,6 +647,7 @@ func validateQueryTimeRange(ctx context.Context, userID string, startMs, endMs i
}

// Series in the returned set are sorted alphabetically by labels.
// Only used in tests now.
func partitionChunks(chunks []chunk.Chunk, mint, maxt int64, iteratorFunc chunkIteratorFunc) storage.SeriesSet {
chunksBySeries := map[string][]chunk.Chunk{}
for _, c := range chunks {
Expand All @@ -726,12 +657,11 @@ func partitionChunks(chunks []chunk.Chunk, mint, maxt int64, iteratorFunc chunkI

series := make([]storage.Series, 0, len(chunksBySeries))
for i := range chunksBySeries {
series = append(series, &chunkSeries{
labels: chunksBySeries[i][0].Metric,
chunks: chunksBySeries[i],
chunkIteratorFunc: iteratorFunc,
mint: mint,
maxt: maxt,
series = append(series, &storage.SeriesEntry{
Lset: chunksBySeries[i][0].Metric,
SampleIteratorFn: func(iterator chunkenc.Iterator) chunkenc.Iterator {
return iteratorFunc(chunksBySeries[i], model.Time(mint), model.Time(maxt))
},
})
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/series/series_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ type concreteSeriesIterator struct {
series *ConcreteSeries
}

// NewConcreteSeriesIterator instaniates an in memory chunkenc.Iterator
// NewConcreteSeriesIterator instantiates an in memory chunkenc.Iterator
func NewConcreteSeriesIterator(series *ConcreteSeries) chunkenc.Iterator {
return iterators.NewCompatibleChunksIterator(&concreteSeriesIterator{
cur: -1,
Expand Down
15 changes: 0 additions & 15 deletions pkg/querier/series_with_chunks.go

This file was deleted.

Loading