From f390efcf8c52dc00e188dba75658e8cf7f83fad3 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Thu, 22 Jun 2023 00:45:11 -0700 Subject: [PATCH 1/6] optimize postings fetching by checking postings and series size Signed-off-by: Ben Ye --- cmd/thanos/store.go | 5 + docs/components/store.md | 5 + pkg/block/block_test.go | 10 +- pkg/block/indexheader/binary_reader.go | 19 +- pkg/block/indexheader/header.go | 8 +- pkg/block/indexheader/header_test.go | 32 ++ pkg/block/indexheader/lazy_binary_reader.go | 13 + pkg/store/bucket.go | 290 +++++++----- pkg/store/bucket_test.go | 203 +++++---- pkg/store/lazy_postings.go | 260 +++++++++++ pkg/store/lazy_postings_test.go | 473 ++++++++++++++++++++ pkg/testutil/e2eutil/prometheus.go | 61 ++- test/e2e/store_gateway_test.go | 146 +++++- 13 files changed, 1327 insertions(+), 198 deletions(-) create mode 100644 pkg/store/lazy_postings.go create mode 100644 pkg/store/lazy_postings_test.go diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 9ddfbd89b7..29ac6921a2 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -88,6 +88,7 @@ type storeConfig struct { reqLogConfig *extflag.PathOrContent lazyIndexReaderEnabled bool lazyIndexReaderIdleTimeout time.Duration + lazyExpandedPostingsEnabled bool } func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) { @@ -182,6 +183,9 @@ func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) { cmd.Flag("store.index-header-lazy-reader-idle-timeout", "If index-header lazy reader is enabled and this idle timeout setting is > 0, memory map-ed index-headers will be automatically released after 'idle timeout' inactivity."). Hidden().Default("5m").DurationVar(&sc.lazyIndexReaderIdleTimeout) + cmd.Flag("store.enable-lazy-expanded-postings", "If true, Store Gateway will estimate postings size and try to lazily expand postings if it downloads less data than expanding all postings."). + Default("false").BoolVar(&sc.lazyExpandedPostingsEnabled) + cmd.Flag("web.disable", "Disable Block Viewer UI.").Default("false").BoolVar(&sc.disableWeb) cmd.Flag("web.external-prefix", "Static prefix for all HTML links and redirect URLs in the bucket web UI interface. Actual endpoints are still served on / or the web.route-prefix. This allows thanos bucket web UI to be served behind a reverse proxy that strips a URL sub-path."). @@ -382,6 +386,7 @@ func runStore( } return conf.estimatedMaxChunkSize }), + store.WithLazyExpandedPostings(conf.lazyExpandedPostingsEnabled), } if conf.debugLogging { diff --git a/docs/components/store.md b/docs/components/store.md index ac0234f1df..26b359f326 100644 --- a/docs/components/store.md +++ b/docs/components/store.md @@ -176,6 +176,11 @@ Flags: If true, Store Gateway will lazy memory map index-header only once the block is required by a query. + --store.enable-lazy-expanded-postings + If true, Store Gateway will estimate postings + size and try to lazily expand postings if + it downloads less data than expanding all + postings. 
--store.grpc.downloaded-bytes-limit=0 Maximum amount of downloaded (either fetched or touched) bytes in a single diff --git a/pkg/block/block_test.go b/pkg/block/block_test.go index 12eb5eed84..a271270590 100644 --- a/pkg/block/block_test.go +++ b/pkg/block/block_test.go @@ -144,7 +144,7 @@ func TestUpload(t *testing.T) { testutil.Equals(t, 3, len(bkt.Objects())) testutil.Equals(t, 3727, len(bkt.Objects()[path.Join(b1.String(), ChunksDirname, "000001")])) testutil.Equals(t, 401, len(bkt.Objects()[path.Join(b1.String(), IndexFilename)])) - testutil.Equals(t, 567, len(bkt.Objects()[path.Join(b1.String(), MetaFilename)])) + testutil.Equals(t, 595, len(bkt.Objects()[path.Join(b1.String(), MetaFilename)])) // File stats are gathered. testutil.Equals(t, fmt.Sprintf(`{ @@ -184,7 +184,9 @@ func TestUpload(t *testing.T) { "rel_path": "meta.json" } ], - "index_stats": {} + "index_stats": { + "series_max_size": 16 + } } } `, b1.String(), b1.String()), string(bkt.Objects()[path.Join(b1.String(), MetaFilename)])) @@ -195,7 +197,7 @@ func TestUpload(t *testing.T) { testutil.Equals(t, 3, len(bkt.Objects())) testutil.Equals(t, 3727, len(bkt.Objects()[path.Join(b1.String(), ChunksDirname, "000001")])) testutil.Equals(t, 401, len(bkt.Objects()[path.Join(b1.String(), IndexFilename)])) - testutil.Equals(t, 567, len(bkt.Objects()[path.Join(b1.String(), MetaFilename)])) + testutil.Equals(t, 595, len(bkt.Objects()[path.Join(b1.String(), MetaFilename)])) } { // Upload with no external labels should be blocked. @@ -227,7 +229,7 @@ func TestUpload(t *testing.T) { testutil.Equals(t, 6, len(bkt.Objects())) testutil.Equals(t, 3727, len(bkt.Objects()[path.Join(b2.String(), ChunksDirname, "000001")])) testutil.Equals(t, 401, len(bkt.Objects()[path.Join(b2.String(), IndexFilename)])) - testutil.Equals(t, 546, len(bkt.Objects()[path.Join(b2.String(), MetaFilename)])) + testutil.Equals(t, 574, len(bkt.Objects()[path.Join(b2.String(), MetaFilename)])) } } diff --git a/pkg/block/indexheader/binary_reader.go b/pkg/block/indexheader/binary_reader.go index 1befe63a7f..16ef73ac3b 100644 --- a/pkg/block/indexheader/binary_reader.go +++ b/pkg/block/indexheader/binary_reader.go @@ -47,6 +47,8 @@ const ( postingLengthFieldSize = 4 ) +var NotFoundRange = index.Range{Start: -1, End: -1} + // The table gets initialized with sync.Once but may still cause a race // with any other use of the crc32 package anywhere. Thus we initialize it // before. @@ -747,13 +749,18 @@ func (r *BinaryReader) IndexVersion() (int, error) { return r.indexVersion, nil } +// PostingsOffsets implements Reader. +func (r *BinaryReader) PostingsOffsets(name string, values ...string) ([]index.Range, error) { + return r.postingsOffset(name, values...) +} + // TODO(bwplotka): Get advantage of multi value offset fetch. func (r *BinaryReader) PostingsOffset(name, value string) (index.Range, error) { rngs, err := r.postingsOffset(name, value) if err != nil { return index.Range{}, err } - if len(rngs) != 1 { + if len(rngs) != 1 || rngs[0] == NotFoundRange { return index.Range{}, NotFoundRangeErr } return rngs[0], nil @@ -801,6 +808,7 @@ func (r *BinaryReader) postingsOffset(name string, values ...string) ([]index.Ra valueIndex := 0 for valueIndex < len(values) && values[valueIndex] < e.offsets[0].value { // Discard values before the start. 
+			rngs = append(rngs, NotFoundRange)
 			valueIndex++
 		}
 
@@ -811,6 +819,9 @@ func (r *BinaryReader) postingsOffset(name string, values ...string) ([]index.Ra
 		i := sort.Search(len(e.offsets), func(i int) bool { return e.offsets[i].value >= wantedValue })
 		if i == len(e.offsets) {
 			// We're past the end.
+			for len(rngs) < len(values) {
+				rngs = append(rngs, NotFoundRange)
+			}
 			break
 		}
 		if i > 0 && e.offsets[i].value != wantedValue {
@@ -858,6 +869,8 @@ func (r *BinaryReader) postingsOffset(name string, values ...string) ([]index.Ra
 			// Record on the way if wanted value is equal to the current value.
 			if string(value) == wantedValue {
 				newSameRngs = append(newSameRngs, index.Range{Start: postingOffset + postingLengthFieldSize})
+			} else {
+				rngs = append(rngs, NotFoundRange)
 			}
 			valueIndex++
 			if valueIndex == len(values) {
@@ -877,6 +890,10 @@ func (r *BinaryReader) postingsOffset(name string, values ...string) ([]index.Ra
 		}
 
 		if valueIndex != len(values) && wantedValue <= e.offsets[i+1].value {
+			// Increment i when the wanted value equals the next offset.
+			if wantedValue == e.offsets[i+1].value {
+				i++
+			}
 			// wantedValue is smaller or same as the next offset we know about, let's iterate further to add those.
 			continue
 		}
diff --git a/pkg/block/indexheader/header.go b/pkg/block/indexheader/header.go
index 8ecef33564..d0b4141afd 100644
--- a/pkg/block/indexheader/header.go
+++ b/pkg/block/indexheader/header.go
@@ -20,10 +20,16 @@ type Reader interface {
 	// IndexVersion returns version of index.
 	IndexVersion() (int, error)
 
+	// PostingsOffsets returns the start and end offsets of the postings for the given name and values.
+	// Input values need to be sorted.
+	// If the requested label name doesn't exist, no postings and no error are returned.
+	// If the requested label name exists but some values don't, the corresponding index ranges
+	// are set to NotFoundRange (-1 for both start and end).
+	PostingsOffsets(name string, value ...string) ([]index.Range, error)
+
 	// PostingsOffset returns start and end offsets of postings for given name and value.
 	// The end offset might be bigger than the actual posting ending, but not larger than the whole index file.
 	// NotFoundRangeErr is returned when no index can be found for given name and value.
-	// TODO(bwplotka): Move to PostingsOffsets(name string, value ...string) []index.Range and benchmark.
 	PostingsOffset(name string, value string) (index.Range, error)
 
 	// LookupSymbol returns string based on given reference.
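The new PostingsOffsets contract is easiest to see from the caller's side: ranges come back positionally aligned with the sorted input values, and a missing value yields NotFoundRange instead of an error. A minimal sketch (not part of this patch; the package and helper name are illustrative):

	package example

	import (
		"fmt"

		"github.com/thanos-io/thanos/pkg/block/indexheader"
	)

	// sumPostingsCardinality adds up the number of postings behind the given
	// sorted values, skipping values the index header reports as missing.
	func sumPostingsCardinality(r indexheader.Reader, name string, sortedValues ...string) (int64, error) {
		rngs, err := r.PostingsOffsets(name, sortedValues...)
		if err != nil {
			return 0, err
		}
		var total int64
		for i, rng := range rngs {
			if rng == indexheader.NotFoundRange {
				fmt.Printf("no postings for %s=%q\n", name, sortedValues[i])
				continue
			}
			// Each posting is 4 bytes; the range also covers a 4-byte length field.
			total += (rng.End - rng.Start - 4) / 4
		}
		return total, nil
	}

This positional contract is what lets optimizePostingsFetchByDownloadedBytes in pkg/store/lazy_postings.go sum per-group cardinalities in a single lookup.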
diff --git a/pkg/block/indexheader/header_test.go b/pkg/block/indexheader/header_test.go index d0d7eb5f7d..540e4d6c57 100644 --- a/pkg/block/indexheader/header_test.go +++ b/pkg/block/indexheader/header_test.go @@ -141,6 +141,38 @@ func TestReaders(t *testing.T) { testutil.Ok(t, err) testutil.Equals(t, []string(nil), vals) + // single value + rngs, err := br.PostingsOffsets("a", "9") + testutil.Ok(t, err) + for _, rng := range rngs { + testutil.Assert(t, rng.End > rng.Start) + } + + rngs, err = br.PostingsOffsets("a", "2", "3", "4", "5", "6", "7", "8", "9") + testutil.Ok(t, err) + for _, rng := range rngs { + testutil.Assert(t, rng.End > rng.Start) + } + + rngs, err = br.PostingsOffsets("a", "0") + testutil.Ok(t, err) + testutil.Assert(t, len(rngs) == 1) + testutil.Equals(t, NotFoundRange, rngs[0]) + + rngs, err = br.PostingsOffsets("a", "0", "10", "99") + testutil.Ok(t, err) + testutil.Assert(t, len(rngs) == 3) + for _, rng := range rngs { + testutil.Equals(t, NotFoundRange, rng) + } + + rngs, err = br.PostingsOffsets("a", "1", "10", "9") + testutil.Ok(t, err) + testutil.Assert(t, len(rngs) == 3) + testutil.Assert(t, rngs[0].End > rngs[0].Start) + testutil.Assert(t, rngs[2].End > rngs[2].Start) + testutil.Equals(t, NotFoundRange, rngs[1]) + // Regression tests for https://github.com/thanos-io/thanos/issues/2213. // Most of not existing value was working despite bug, except in certain unlucky cases // it was causing "invalid size" errors. diff --git a/pkg/block/indexheader/lazy_binary_reader.go b/pkg/block/indexheader/lazy_binary_reader.go index c3bee382c2..451a79b6ee 100644 --- a/pkg/block/indexheader/lazy_binary_reader.go +++ b/pkg/block/indexheader/lazy_binary_reader.go @@ -154,6 +154,19 @@ func (r *LazyBinaryReader) IndexVersion() (int, error) { return r.reader.IndexVersion() } +// PostingsOffsets implements Reader. +func (r *LazyBinaryReader) PostingsOffsets(name string, values ...string) ([]index.Range, error) { + r.readerMx.RLock() + defer r.readerMx.RUnlock() + + if err := r.load(); err != nil { + return nil, err + } + + r.usedAt.Store(time.Now().UnixNano()) + return r.reader.PostingsOffsets(name, values...) +} + // PostingsOffset implements Reader. 
func (r *LazyBinaryReader) PostingsOffset(name, value string) (index.Range, error) { r.readerMx.RLock() diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 027122447e..520ddfc912 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -117,26 +117,27 @@ var ( ) type bucketStoreMetrics struct { - blocksLoaded prometheus.Gauge - blockLoads prometheus.Counter - blockLoadFailures prometheus.Counter - lastLoadedBlock prometheus.Gauge - blockDrops prometheus.Counter - blockDropFailures prometheus.Counter - seriesDataTouched *prometheus.HistogramVec - seriesDataFetched *prometheus.HistogramVec - seriesDataSizeTouched *prometheus.HistogramVec - seriesDataSizeFetched *prometheus.HistogramVec - seriesBlocksQueried prometheus.Histogram - seriesGetAllDuration prometheus.Histogram - seriesMergeDuration prometheus.Histogram - resultSeriesCount prometheus.Histogram - chunkSizeBytes prometheus.Histogram - postingsSizeBytes prometheus.Histogram - queriesDropped *prometheus.CounterVec - seriesRefetches prometheus.Counter - chunkRefetches prometheus.Counter - emptyPostingCount prometheus.Counter + blocksLoaded prometheus.Gauge + blockLoads prometheus.Counter + blockLoadFailures prometheus.Counter + lastLoadedBlock prometheus.Gauge + blockDrops prometheus.Counter + blockDropFailures prometheus.Counter + seriesDataTouched *prometheus.HistogramVec + seriesDataFetched *prometheus.HistogramVec + seriesDataSizeTouched *prometheus.HistogramVec + seriesDataSizeFetched *prometheus.HistogramVec + seriesBlocksQueried prometheus.Histogram + seriesGetAllDuration prometheus.Histogram + seriesMergeDuration prometheus.Histogram + resultSeriesCount prometheus.Histogram + chunkSizeBytes prometheus.Histogram + postingsSizeBytes prometheus.Histogram + queriesDropped *prometheus.CounterVec + seriesRefetches prometheus.Counter + chunkRefetches prometheus.Counter + emptyPostingCount prometheus.Counter + lazyExpandedPostingsCount prometheus.Counter cachedPostingsCompressions *prometheus.CounterVec cachedPostingsCompressionErrors *prometheus.CounterVec @@ -302,6 +303,11 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics { Help: "Total number of empty postings when fetching block series.", }) + m.lazyExpandedPostingsCount = promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "thanos_bucket_store_lazy_expanded_postings_total", + Help: "Total number of lazy expanded postings when fetching block series.", + }) + return &m } @@ -366,6 +372,8 @@ type BucketStore struct { enableChunkHashCalculation bool + enabledLazyExpandedPostings bool + bmtx sync.Mutex labelNamesSet stringset.Set @@ -473,6 +481,13 @@ func WithBlockEstimatedMaxChunkFunc(f BlockEstimator) BucketStoreOption { } } +// WithLazyExpandedPostings enables lazy expanded postings. +func WithLazyExpandedPostings(enabled bool) BucketStoreOption { + return func(s *BucketStore) { + s.enabledLazyExpandedPostings = enabled + } +} + // NewBucketStore creates a new bucket backed store that implements the store API against // an object store bucket. It is optimized to work against high latency backends. func NewBucketStore( @@ -900,18 +915,20 @@ type blockSeriesClient struct { skipChunks bool shardMatcher *storepb.ShardMatcher + blockMatchers []*labels.Matcher calculateChunkHash bool chunkFetchDuration prometheus.Histogram // Internal state. 
-	i               uint64
-	postings        []storage.SeriesRef
-	chkMetas        []chunks.Meta
-	lset            labels.Labels
-	symbolizedLset  []symbolizedLabel
-	entries         []seriesEntry
-	hasMorePostings bool
-	batchSize       int
+	i                uint64
+	lazyPostings     *lazyExpandedPostings
+	expandedPostings []storage.SeriesRef
+	chkMetas         []chunks.Meta
+	lset             labels.Labels
+	symbolizedLset   []symbolizedLabel
+	entries          []seriesEntry
+	hasMorePostings  bool
+	batchSize        int
 }
 
 func newBlockSeriesClient(
@@ -921,6 +938,7 @@ func newBlockSeriesClient(
 	req *storepb.SeriesRequest,
 	limiter ChunksLimiter,
 	bytesLimiter BytesLimiter,
+	blockMatchers []*labels.Matcher,
 	shardMatcher *storepb.ShardMatcher,
 	calculateChunkHash bool,
 	batchSize int,
@@ -954,6 +972,7 @@ func newBlockSeriesClient(
 		loadAggregates: req.Aggregates,
 
 		shardMatcher:       shardMatcher,
+		blockMatchers:      blockMatchers,
 		calculateChunkHash: calculateChunkHash,
 		hasMorePostings:    true,
 		batchSize:          batchSize,
@@ -995,23 +1014,33 @@ func newSortedMatchers(matchers []*labels.Matcher) sortedMatchers {
 func (b *blockSeriesClient) ExpandPostings(
 	matchers sortedMatchers,
 	seriesLimiter SeriesLimiter,
+	lazyExpandedPostingEnabled bool,
+	lazyExpandedPostingsCount prometheus.Counter,
 ) error {
-	ps, err := b.indexr.ExpandedPostings(b.ctx, matchers, b.bytesLimiter)
+	ps, err := b.indexr.ExpandedPostings(b.ctx, matchers, b.bytesLimiter, lazyExpandedPostingEnabled)
 	if err != nil {
 		return errors.Wrap(err, "expanded matching posting")
 	}
 
-	if len(ps) == 0 {
+	if ps == nil || len(ps.postings) == 0 {
+		b.lazyPostings = emptyLazyPostings
 		return nil
 	}
+	b.lazyPostings = ps
 
-	if err := seriesLimiter.Reserve(uint64(len(ps))); err != nil {
+	// If lazy expanded postings is enabled, it is possible to fetch more series
+	// than needed, making it easier to hit the series limit.
+	if err := seriesLimiter.Reserve(uint64(len(ps.postings))); err != nil {
 		return httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded series limit: %s", err)
 	}
 
-	b.postings = ps
-	if b.batchSize > len(ps) {
-		b.batchSize = len(ps)
+	if b.batchSize > len(ps.postings) {
+		b.batchSize = len(ps.postings)
+	}
+	if b.lazyPostings.lazyExpanded() {
+		// Assume lazy expansion could cut actual expanded postings length to 50%.
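+		// The halved capacity is only a pre-allocation heuristic to reduce slice
+		// growth while filtering; correctness does not depend on the actual ratio.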
+ b.expandedPostings = make([]storage.SeriesRef, 0, len(b.lazyPostings.postings)/2) + lazyExpandedPostingsCount.Inc() } b.entries = make([]seriesEntry, 0, b.batchSize) return nil @@ -1043,14 +1072,26 @@ func (b *blockSeriesClient) Recv() (*storepb.SeriesResponse, error) { func (b *blockSeriesClient) nextBatch() error { start := b.i end := start + SeriesBatchSize - if end > uint64(len(b.postings)) { - end = uint64(len(b.postings)) + if end > uint64(len(b.lazyPostings.postings)) { + end = uint64(len(b.lazyPostings.postings)) } b.i = end - postingsBatch := b.postings[start:end] + postingsBatch := b.lazyPostings.postings[start:end] if len(postingsBatch) == 0 { b.hasMorePostings = false + if b.lazyPostings.lazyExpanded() { + v, err := b.indexr.IndexVersion() + if err != nil { + return errors.Wrap(err, "get index version") + } + if v >= 2 { + for i := range b.expandedPostings { + b.expandedPostings[i] = b.expandedPostings[i] / 16 + } + } + b.indexr.storeExpandedPostingsToCache(b.blockMatchers, index.NewListPostings(b.expandedPostings), len(b.expandedPostings)) + } return nil } @@ -1064,6 +1105,7 @@ func (b *blockSeriesClient) nextBatch() error { } b.entries = b.entries[:0] +OUTER: for i := 0; i < len(postingsBatch); i++ { if err := b.ctx.Err(); err != nil { return err @@ -1080,6 +1122,16 @@ func (b *blockSeriesClient) nextBatch() error { return errors.Wrap(err, "Lookup labels symbols") } + for _, matcher := range b.lazyPostings.matchers { + val := b.lset.Get(matcher.Name) + if !matcher.Matches(val) { + continue OUTER + } + } + if b.lazyPostings.lazyExpanded() { + b.expandedPostings = append(b.expandedPostings, postingsBatch[i]) + } + completeLabelset := labelpb.ExtendSortedLabels(b.lset, b.extLset) if b.extLsetToRemove != nil { completeLabelset = rmLabels(completeLabelset, b.extLsetToRemove) @@ -1318,6 +1370,17 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store if !ok { continue } + // Sort matchers to make sure we generate the same cache key + // when fetching expanded postings. 
+ sort.Slice(blockMatchers, func(i, j int) bool { + if blockMatchers[i].Type == blockMatchers[j].Type { + if blockMatchers[i].Name == blockMatchers[j].Name { + return blockMatchers[i].Value < blockMatchers[j].Value + } + return blockMatchers[i].Name < blockMatchers[j].Name + } + return blockMatchers[i].Type < blockMatchers[j].Type + }) sortedBlockMatchers := newSortedMatchers(blockMatchers) @@ -1345,6 +1408,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store req, chunksLimiter, bytesLimiter, + blockMatchers, shardMatcher, s.enableChunkHashCalculation, s.seriesBatchSize, @@ -1369,7 +1433,12 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store mtx.Unlock() } - if err := blockClient.ExpandPostings(sortedBlockMatchers, seriesLimiter); err != nil { + if err := blockClient.ExpandPostings( + sortedBlockMatchers, + seriesLimiter, + s.enabledLazyExpandedPostings, + s.metrics.lazyExpandedPostingsCount, + ); err != nil { onClose() span.Finish() return errors.Wrapf(err, "fetch postings for block %s", blk.meta.ULID) @@ -1643,6 +1712,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq seriesReq, nil, bytesLimiter, + reqSeriesMatchersNoExtLabels, nil, true, SeriesBatchSize, @@ -1654,6 +1724,8 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq if err := blockClient.ExpandPostings( sortedReqSeriesMatchersNoExtLabels, seriesLimiter, + s.enabledLazyExpandedPostings, + s.metrics.lazyExpandedPostingsCount, ); err != nil { return err } @@ -1871,6 +1943,7 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR seriesReq, nil, bytesLimiter, + reqSeriesMatchersNoExtLabels, nil, true, SeriesBatchSize, @@ -1882,6 +1955,8 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR if err := blockClient.ExpandPostings( sortedReqSeriesMatchersNoExtLabels, seriesLimiter, + s.enabledLazyExpandedPostings, + s.metrics.lazyExpandedPostingsCount, ); err != nil { return err } @@ -2275,6 +2350,8 @@ type bucketIndexReader struct { mtx sync.Mutex loadedSeries map[storage.SeriesRef][]byte + + indexVersion int } func newBucketIndexReader(block *bucketBlock) *bucketIndexReader { @@ -2288,6 +2365,20 @@ func newBucketIndexReader(block *bucketBlock) *bucketIndexReader { } return r } + +// IndexVersion caches the index header version. +func (r *bucketIndexReader) IndexVersion() (int, error) { + if r.indexVersion != 0 { + return r.indexVersion, nil + } + v, err := r.block.indexHeaderReader.IndexVersion() + if err != nil { + return 0, err + } + r.indexVersion = v + return v, nil +} + func (r *bucketIndexReader) reset() { r.loadedSeries = map[storage.SeriesRef][]byte{} } @@ -2301,7 +2392,7 @@ func (r *bucketIndexReader) reset() { // Reminder: A posting is a reference (represented as a uint64) to a series reference, which in turn points to the first // chunk where the series contains the matching label-value pair for a given block of data. Postings can be fetched by // single label name=value. -func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms sortedMatchers, bytesLimiter BytesLimiter) ([]storage.SeriesRef, error) { +func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms sortedMatchers, bytesLimiter BytesLimiter, lazyExpandedPostingEnabled bool) (*lazyExpandedPostings, error) { // Shortcut the case of `len(postingGroups) == 0`. 
It will only happen when no // matchers specified, and we don't need to fetch expanded postings from cache. if len(ms) == 0 { @@ -2313,12 +2404,11 @@ func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms sortedMatch return nil, err } if hit { - return postings, nil + return newLazyExpandedPostings(postings), nil } var ( allRequested = false hasAdds = false - keys []labels.Label ) postingGroups, err := matchersToPostingGroups(ctx, r.block.indexHeaderReader.LabelValues, ms) @@ -2329,83 +2419,50 @@ func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms sortedMatch r.storeExpandedPostingsToCache(ms, index.EmptyPostings(), 0) return nil, nil } + i := 0 for _, pg := range postingGroups { allRequested = allRequested || pg.addAll hasAdds = hasAdds || len(pg.addKeys) > 0 - // Postings returned by fetchPostings will be in the same order as keys - // so it's important that we iterate them in the same order later. - // We don't have any other way of pairing keys and fetched postings. - for _, key := range pg.addKeys { - keys = append(keys, labels.Label{Name: pg.name, Value: key}) - } - for _, key := range pg.removeKeys { - keys = append(keys, labels.Label{Name: pg.name, Value: key}) + // If a posting group doesn't have any keys, like posting group created + // from `=~".*"`, we don't have to keep the posting group as long as we + // keep track of whether we need to add all postings or not. + if len(pg.addKeys) == 0 && len(pg.removeKeys) == 0 { + continue } + postingGroups[i] = pg + i++ } + postingGroups = postingGroups[:i] + addAllPostings := allRequested && !hasAdds // We only need special All postings if there are no other adds. If there are, we can skip fetching // special All postings completely. - if allRequested && !hasAdds { + if addAllPostings { // add group with label to fetch "special All postings". name, value := index.AllPostingsKey() - allPostingsLabel := labels.Label{Name: name, Value: value} - postingGroups = append(postingGroups, newPostingGroup(true, name, []string{value}, nil)) - keys = append(keys, allPostingsLabel) } - fetchedPostings, closeFns, err := r.fetchPostings(ctx, keys, bytesLimiter) - defer func() { - for _, closeFn := range closeFns { - closeFn() - } - }() + ps, err := fetchLazyExpandedPostings(ctx, postingGroups, r, bytesLimiter, addAllPostings, lazyExpandedPostingEnabled) if err != nil { - return nil, errors.Wrap(err, "get postings") - } - - // Get "add" and "remove" postings from groups. We iterate over postingGroups and their keys - // again, and this is exactly the same order as before (when building the groups), so we can simply - // use one incrementing index to fetch postings from returned slice. - postingIndex := 0 - - var groupAdds, groupRemovals []index.Postings - for _, g := range postingGroups { - // We cannot add empty set to groupAdds, since they are intersected. 
- if len(g.addKeys) > 0 { - toMerge := make([]index.Postings, 0, len(g.addKeys)) - for _, l := range g.addKeys { - toMerge = append(toMerge, checkNilPosting(g.name, l, fetchedPostings[postingIndex])) - postingIndex++ - } - - groupAdds = append(groupAdds, index.Merge(toMerge...)) - } - - for _, l := range g.removeKeys { - groupRemovals = append(groupRemovals, checkNilPosting(g.name, l, fetchedPostings[postingIndex])) - postingIndex++ - } + return nil, errors.Wrap(err, "fetch and expand postings") } - - result := index.Without(index.Intersect(groupAdds...), index.Merge(groupRemovals...)) - ps, err := ExpandPostingsWithContext(ctx, result) - if err != nil { - return nil, errors.Wrap(err, "expand") + // If postings still have matchers to be applied lazily, cache expanded postings after filtering series so skip here. + if !ps.lazyExpanded() { + r.storeExpandedPostingsToCache(ms, index.NewListPostings(ps.postings), len(ps.postings)) } - r.storeExpandedPostingsToCache(ms, index.NewListPostings(ps), len(ps)) - if len(ps) > 0 { + if len(ps.postings) > 0 { // As of version two all series entries are 16 byte padded. All references // we get have to account for that to get the correct offset. - version, err := r.block.indexHeaderReader.IndexVersion() + version, err := r.IndexVersion() if err != nil { return nil, errors.Wrap(err, "get index version") } if version >= 2 { - for i, id := range ps { - ps[i] = id * 16 + for i, id := range ps.postings { + ps.postings[i] = id * 16 } } } @@ -2428,22 +2485,26 @@ func ExpandPostingsWithContext(ctx context.Context, p index.Postings) (res []sto // If addAll is not set: Merge of postings for "addKeys" labels minus postings for removeKeys labels // This computation happens in ExpandedPostings. type postingGroup struct { - addAll bool - name string - addKeys []string - removeKeys []string + addAll bool + name string + matchers []*labels.Matcher + addKeys []string + removeKeys []string + cardinality int64 + lazy bool } func newPostingGroup(addAll bool, name string, addKeys, removeKeys []string) *postingGroup { return &postingGroup{ - addAll: addAll, name: name, + addAll: addAll, addKeys: addKeys, removeKeys: removeKeys, } } -func (pg postingGroup) merge(other *postingGroup) *postingGroup { +// mergeKeys merges keys from two posting groups and ignores other fields. +func (pg postingGroup) mergeKeys(other *postingGroup) *postingGroup { if other == nil { return &pg } @@ -2539,12 +2600,16 @@ func checkNilPosting(name, value string, p index.Postings) index.Postings { } func matchersToPostingGroups(ctx context.Context, lvalsFn func(name string) ([]string, error), ms []*labels.Matcher) ([]*postingGroup, error) { - matchersMap := make(map[string][]*labels.Matcher) + matchersMap := make(map[string]map[string]*labels.Matcher) for _, m := range ms { - matchersMap[m.Name] = append(matchersMap[m.Name], m) + m := m + if _, ok := matchersMap[m.Name]; !ok { + matchersMap[m.Name] = make(map[string]*labels.Matcher) + } + matchersMap[m.Name][m.String()] = m } - pgs := make([]*postingGroup, 0) + pgs := make([]*postingGroup, 0, len(matchersMap)) // NOTE: Derived from tsdb.PostingsForMatchers. for _, values := range matchersMap { var ( @@ -2555,8 +2620,9 @@ func matchersToPostingGroups(ctx context.Context, lvalsFn func(name string) ([]s valuesCached bool ) lvalsFunc := lvalsFn + matchers := make([]*labels.Matcher, 0, len(vals)) // Merge PostingGroups with the same matcher into 1 to - // avoid fetching duplicate postings. + // avoid fetching duplicate postings. 
for _, val := range values { pg, vals, err = toPostingGroup(ctx, lvalsFunc, val) if err != nil { @@ -2579,7 +2645,7 @@ func matchersToPostingGroups(ctx context.Context, lvalsFn func(name string) ([]s if mergedPG == nil { mergedPG = pg } else { - mergedPG = mergedPG.merge(pg) + mergedPG = mergedPG.mergeKeys(pg) } // If this groups adds nothing, it's an empty group. We can shortcut this, since intersection with empty @@ -2588,7 +2654,19 @@ func matchersToPostingGroups(ctx context.Context, lvalsFn func(name string) ([]s if !mergedPG.addAll && len(mergedPG.addKeys) == 0 { return nil, nil } - } + matchers = append(matchers, val) + } + // Set and sort matchers to be used when picking up posting fetch strategy. + mergedPG.matchers = matchers + slices.SortFunc(mergedPG.matchers, func(a, b *labels.Matcher) bool { + if a.Type == b.Type { + if a.Name == b.Name { + return a.Value < b.Value + } + return a.Name < b.Name + } + return a.Type < b.Type + }) pgs = append(pgs, mergedPG) } slices.SortFunc(pgs, func(a, b *postingGroup) bool { diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index e13afd6518..79edc35ebd 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -31,6 +31,7 @@ import ( "github.com/leanovate/gopter/prop" "github.com/oklog/ulid" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" promtest "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/relabel" @@ -1099,20 +1100,27 @@ func uploadTestBlock(t testing.TB, tmpDir string, bkt objstore.Bucket, series in }() logger := log.NewNopLogger() + ctx := context.Background() - appendTestData(t, h.Appender(context.Background()), series) + appendTestData(t, h.Appender(ctx), series) - testutil.Ok(t, os.MkdirAll(filepath.Join(tmpDir, "tmp"), os.ModePerm)) - id := createBlockFromHead(t, filepath.Join(tmpDir, "tmp"), h) + dir := filepath.Join(tmpDir, "tmp") + testutil.Ok(t, os.MkdirAll(dir, os.ModePerm)) + id := createBlockFromHead(t, dir, h) + bdir := filepath.Join(dir, id.String()) + meta, err := metadata.ReadFromDir(bdir) + testutil.Ok(t, err) + stats, err := block.GatherIndexHealthStats(logger, filepath.Join(bdir, block.IndexFilename), meta.MinTime, meta.MaxTime) + testutil.Ok(t, err) _, err = metadata.InjectThanos(log.NewNopLogger(), filepath.Join(tmpDir, "tmp", id.String()), metadata.Thanos{ Labels: labels.Labels{{Name: "ext1", Value: "1"}}.Map(), Downsample: metadata.ThanosDownsample{Resolution: 0}, Source: metadata.TestSource, + IndexStats: metadata.IndexStats{SeriesMaxSize: stats.SeriesMaxSize, ChunkMaxSize: stats.ChunkMaxSize}, }, nil) testutil.Ok(t, err) - testutil.Ok(t, block.Upload(context.Background(), logger, bkt, filepath.Join(tmpDir, "tmp", id.String()), metadata.NoneFunc)) - testutil.Ok(t, block.Upload(context.Background(), logger, bkt, filepath.Join(tmpDir, "tmp", id.String()), metadata.NoneFunc)) + testutil.Ok(t, block.Upload(ctx, logger, bkt, bdir, metadata.NoneFunc)) return id } @@ -1229,9 +1237,9 @@ func benchmarkExpandedPostings( t.ResetTimer() for i := 0; i < t.N(); i++ { - p, err := indexr.ExpandedPostings(context.Background(), newSortedMatchers(c.matchers), NewBytesLimiterFactory(0)(nil)) + p, err := indexr.ExpandedPostings(context.Background(), newSortedMatchers(c.matchers), NewBytesLimiterFactory(0)(nil), false) testutil.Ok(t, err) - testutil.Equals(t, c.expectedLen, len(p)) + testutil.Equals(t, c.expectedLen, len(p.postings)) } }) } @@ -1262,9 
+1270,10 @@ func TestExpandedPostingsEmptyPostings(t *testing.T) { matcher1 := labels.MustNewMatcher(labels.MatchEqual, "j", "foo") // Match nothing. matcher2 := labels.MustNewMatcher(labels.MatchRegexp, "i", "500.*") - ps, err := indexr.ExpandedPostings(context.Background(), newSortedMatchers([]*labels.Matcher{matcher1, matcher2}), NewBytesLimiterFactory(0)(nil)) + ctx := context.Background() + ps, err := indexr.ExpandedPostings(ctx, newSortedMatchers([]*labels.Matcher{matcher1, matcher2}), NewBytesLimiterFactory(0)(nil), false) testutil.Ok(t, err) - testutil.Equals(t, len(ps), 0) + testutil.Equals(t, ps, (*lazyExpandedPostings)(nil)) // Make sure even if a matcher doesn't match any postings, we still cache empty expanded postings. testutil.Equals(t, 1, indexr.stats.cachedPostingsCompressions) } @@ -1272,21 +1281,28 @@ func TestExpandedPostingsEmptyPostings(t *testing.T) { func TestBucketSeries(t *testing.T) { tb := testutil.NewTB(t) storetestutil.RunSeriesInterestingCases(tb, 200e3, 200e3, func(t testutil.TB, samplesPerSeries, series int) { - benchBucketSeries(t, chunkenc.ValFloat, false, samplesPerSeries, series, 1) + benchBucketSeries(t, chunkenc.ValFloat, false, false, samplesPerSeries, series, 1) + }) +} + +func TestBucketSeriesLazyExpandedPostings(t *testing.T) { + tb := testutil.NewTB(t) + storetestutil.RunSeriesInterestingCases(tb, 200e3, 200e3, func(t testutil.TB, samplesPerSeries, series int) { + benchBucketSeries(t, chunkenc.ValFloat, false, true, samplesPerSeries, series, 1) }) } func TestBucketHistogramSeries(t *testing.T) { tb := testutil.NewTB(t) storetestutil.RunSeriesInterestingCases(tb, 200e3, 200e3, func(t testutil.TB, samplesPerSeries, series int) { - benchBucketSeries(t, chunkenc.ValHistogram, false, samplesPerSeries, series, 1) + benchBucketSeries(t, chunkenc.ValHistogram, false, false, samplesPerSeries, series, 1) }) } func TestBucketSkipChunksSeries(t *testing.T) { tb := testutil.NewTB(t) storetestutil.RunSeriesInterestingCases(tb, 200e3, 200e3, func(t testutil.TB, samplesPerSeries, series int) { - benchBucketSeries(t, chunkenc.ValFloat, true, samplesPerSeries, series, 1) + benchBucketSeries(t, chunkenc.ValFloat, true, false, samplesPerSeries, series, 1) }) } @@ -1294,7 +1310,7 @@ func BenchmarkBucketSeries(b *testing.B) { tb := testutil.NewTB(b) // 10e6 samples = ~1736 days with 15s scrape storetestutil.RunSeriesInterestingCases(tb, 10e6, 10e5, func(t testutil.TB, samplesPerSeries, series int) { - benchBucketSeries(t, chunkenc.ValFloat, false, samplesPerSeries, series, 1/100e6, 1/10e4, 1) + benchBucketSeries(t, chunkenc.ValFloat, false, false, samplesPerSeries, series, 1/100e6, 1/10e4, 1) }) } @@ -1302,11 +1318,11 @@ func BenchmarkBucketSkipChunksSeries(b *testing.B) { tb := testutil.NewTB(b) // 10e6 samples = ~1736 days with 15s scrape storetestutil.RunSeriesInterestingCases(tb, 10e6, 10e5, func(t testutil.TB, samplesPerSeries, series int) { - benchBucketSeries(t, chunkenc.ValFloat, true, samplesPerSeries, series, 1/100e6, 1/10e4, 1) + benchBucketSeries(t, chunkenc.ValFloat, true, false, samplesPerSeries, series, 1/100e6, 1/10e4, 1) }) } -func benchBucketSeries(t testutil.TB, sampleType chunkenc.ValueType, skipChunk bool, samplesPerSeries, totalSeries int, requestedRatios ...float64) { +func benchBucketSeries(t testutil.TB, sampleType chunkenc.ValueType, skipChunk, lazyExpandedPostings bool, samplesPerSeries, totalSeries int, requestedRatios ...float64) { const numOfBlocks = 4 tmpDir := t.TempDir() @@ -1322,12 +1338,6 @@ func benchBucketSeries(t testutil.TB, 
sampleType chunkenc.ValueType, skipChunk b ) extLset := labels.Labels{{Name: "ext1", Value: "1"}} - thanosMeta := metadata.Thanos{ - Labels: extLset.Map(), - Downsample: metadata.ThanosDownsample{Resolution: 0}, - Source: metadata.TestSource, - } - blockDir := filepath.Join(tmpDir, "tmp") samplesPerSeriesPerBlock := samplesPerSeries / numOfBlocks @@ -1355,19 +1365,33 @@ func benchBucketSeries(t testutil.TB, sampleType chunkenc.ValueType, skipChunk b }) id := createBlockFromHead(t, blockDir, head) testutil.Ok(t, head.Close()) + blockIDDir := filepath.Join(blockDir, id.String()) + meta, err := metadata.ReadFromDir(blockIDDir) + testutil.Ok(t, err) + stats, err := block.GatherIndexHealthStats(logger, filepath.Join(blockIDDir, block.IndexFilename), meta.MinTime, meta.MaxTime) + testutil.Ok(t, err) + thanosMeta := metadata.Thanos{ + Labels: extLset.Map(), + Downsample: metadata.ThanosDownsample{Resolution: 0}, + Source: metadata.TestSource, + IndexStats: metadata.IndexStats{ + SeriesMaxSize: stats.SeriesMaxSize, + ChunkMaxSize: stats.ChunkMaxSize, + }, + } // Histogram chunks are represented differently in memory and on disk. In order to // have a precise comparison, we need to use the on-disk representation as the expected value // instead of the in-memory one. - diskBlock, err := tsdb.OpenBlock(logger, path.Join(blockDir, id.String()), nil) + diskBlock, err := tsdb.OpenBlock(logger, blockIDDir, nil) testutil.Ok(t, err) series = append(series, storetestutil.ReadSeriesFromBlock(t, diskBlock, extLset, skipChunk)...) - meta, err := metadata.InjectThanos(logger, filepath.Join(blockDir, id.String()), thanosMeta, nil) + meta, err = metadata.InjectThanos(logger, blockIDDir, thanosMeta, nil) testutil.Ok(t, err) - testutil.Ok(t, meta.WriteToDir(logger, filepath.Join(blockDir, id.String()))) - testutil.Ok(t, block.Upload(context.Background(), logger, bkt, filepath.Join(blockDir, id.String()), metadata.NoneFunc)) + testutil.Ok(t, meta.WriteToDir(logger, blockIDDir)) + testutil.Ok(t, block.Upload(context.Background(), logger, bkt, blockIDDir, metadata.NoneFunc)) } ibkt := objstore.WithNoopInstr(bkt) @@ -1393,6 +1417,7 @@ func benchBucketSeries(t testutil.TB, sampleType chunkenc.ValueType, skipChunk b 0, WithLogger(logger), WithChunkPool(chunkPool), + WithLazyExpandedPostings(lazyExpandedPostings), ) testutil.Ok(t, err) @@ -2702,6 +2727,7 @@ func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMet wg := sync.WaitGroup{} wg.Add(concurrency) + dummyCounter := promauto.NewCounter(prometheus.CounterOpts{}) for w := 0; w < concurrency; w++ { go func() { defer wg.Done() @@ -2736,13 +2762,14 @@ func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMet req, chunksLimiter, NewBytesLimiterFactory(0)(nil), + matchers, nil, false, SeriesBatchSize, dummyHistogram, nil, ) - testutil.Ok(b, blockClient.ExpandPostings(sortedMatchers, seriesLimiter)) + testutil.Ok(b, blockClient.ExpandPostings(sortedMatchers, seriesLimiter, false, dummyCounter)) defer blockClient.Close() // Ensure at least 1 series has been returned (as expected). 
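The test changes above repeat one preparation step: gathering index health stats and persisting SeriesMaxSize/ChunkMaxSize into the block's Thanos metadata. Without SeriesMaxSize the store cannot estimate series size, so the lazy-postings optimization stays off. A condensed sketch of that step (the package and helper name are illustrative; the calls mirror the test code above):

	package example

	import (
		"path/filepath"

		"github.com/go-kit/log"

		"github.com/thanos-io/thanos/pkg/block"
		"github.com/thanos-io/thanos/pkg/block/metadata"
	)

	// injectIndexStats reads a block's meta.json, computes index health stats,
	// and rewrites the Thanos section with SeriesMaxSize/ChunkMaxSize filled in.
	func injectIndexStats(logger log.Logger, bdir string, extLabels map[string]string) error {
		meta, err := metadata.ReadFromDir(bdir)
		if err != nil {
			return err
		}
		stats, err := block.GatherIndexHealthStats(logger, filepath.Join(bdir, block.IndexFilename), meta.MinTime, meta.MaxTime)
		if err != nil {
			return err
		}
		_, err = metadata.InjectThanos(logger, bdir, metadata.Thanos{
			Labels:     extLabels,
			Downsample: metadata.ThanosDownsample{Resolution: 0},
			Source:     metadata.TestSource,
			IndexStats: metadata.IndexStats{SeriesMaxSize: stats.SeriesMaxSize, ChunkMaxSize: stats.ChunkMaxSize},
		}, nil)
		return err
	}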
@@ -2797,9 +2824,10 @@ func TestMatchersToPostingGroup(t *testing.T) { }, expected: []*postingGroup{ { - name: "foo", - addAll: false, - addKeys: []string{"bar"}, + name: "foo", + addAll: false, + addKeys: []string{"bar"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}, }, }, }, @@ -2814,9 +2842,10 @@ func TestMatchersToPostingGroup(t *testing.T) { }, expected: []*postingGroup{ { - name: "foo", - addAll: false, - addKeys: []string{"bar"}, + name: "foo", + addAll: false, + addKeys: []string{"bar"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}, }, }, }, @@ -2834,9 +2863,10 @@ func TestMatchersToPostingGroup(t *testing.T) { }, expected: []*postingGroup{ { - name: "foo", - addAll: false, - addKeys: []string{"bar"}, + name: "foo", + addAll: false, + addKeys: []string{"bar"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}, }, }, }, @@ -2852,14 +2882,16 @@ func TestMatchersToPostingGroup(t *testing.T) { }, expected: []*postingGroup{ { - name: "bar", - addAll: false, - addKeys: []string{"baz"}, + name: "bar", + addAll: false, + addKeys: []string{"baz"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "bar", "baz")}, }, { - name: "foo", - addAll: false, - addKeys: []string{"bar"}, + name: "foo", + addAll: false, + addKeys: []string{"bar"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}, }, }, }, @@ -2896,9 +2928,10 @@ func TestMatchersToPostingGroup(t *testing.T) { }, expected: []*postingGroup{ { - name: "foo", - addAll: false, - addKeys: []string{"bar"}, + name: "foo", + addAll: false, + addKeys: []string{"bar"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), labels.MustNewMatcher(labels.MatchNotEqual, "foo", "baz")}, }, }, }, @@ -2923,9 +2956,10 @@ func TestMatchersToPostingGroup(t *testing.T) { }, expected: []*postingGroup{ { - name: "foo", - addAll: false, - addKeys: []string{"bar"}, + name: "foo", + addAll: false, + addKeys: []string{"bar"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), labels.MustNewMatcher(labels.MatchRegexp, "foo", "b.*")}, }, }, }, @@ -2940,9 +2974,10 @@ func TestMatchersToPostingGroup(t *testing.T) { }, expected: []*postingGroup{ { - name: "foo", - addAll: false, - addKeys: []string{"bar"}, + name: "foo", + addAll: false, + addKeys: []string{"bar"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), labels.MustNewMatcher(labels.MatchNotEqual, "foo", "")}, }, }, }, @@ -2957,9 +2992,10 @@ func TestMatchersToPostingGroup(t *testing.T) { }, expected: []*postingGroup{ { - name: "foo", - addAll: false, - addKeys: []string{"bar"}, + name: "foo", + addAll: false, + addKeys: []string{"bar"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), labels.MustNewMatcher(labels.MatchRegexp, "foo", ".+")}, }, }, }, @@ -2974,9 +3010,10 @@ func TestMatchersToPostingGroup(t *testing.T) { }, expected: []*postingGroup{ { - name: "foo", - addAll: false, - addKeys: []string{"bar"}, + name: "foo", + addAll: false, + addKeys: []string{"bar"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar|baz"), labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar|buzz")}, }, }, }, @@ -2994,6 +3031,7 @@ func TestMatchersToPostingGroup(t *testing.T) { name: "foo", addAll: true, removeKeys: []string{"bar", "baz"}, + matchers: 
[]*labels.Matcher{labels.MustNewMatcher(labels.MatchNotEqual, "foo", "bar"), labels.MustNewMatcher(labels.MatchNotEqual, "foo", "baz")}, }, }, }, @@ -3011,8 +3049,9 @@ func TestMatchersToPostingGroup(t *testing.T) { }, expected: []*postingGroup{ { - name: labels.MetricName, - addAll: true, + name: labels.MetricName, + addAll: true, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*")}, }, }, }, @@ -3030,18 +3069,21 @@ func TestMatchersToPostingGroup(t *testing.T) { }, expected: []*postingGroup{ { - name: labels.MetricName, - addAll: false, - addKeys: []string{"up"}, + name: labels.MetricName, + addAll: false, + addKeys: []string{"up"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "__name__", "up")}, }, { - name: "cluster", - addAll: false, - addKeys: []string{"us-east-1", "us-west-2"}, + name: "cluster", + addAll: false, + addKeys: []string{"us-east-1", "us-west-2"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchNotEqual, "cluster", "")}, }, { - name: "job", - addAll: true, + name: "job", + addAll: true, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "job", ".*")}, }, }, }, @@ -3062,19 +3104,22 @@ func TestMatchersToPostingGroup(t *testing.T) { }, expected: []*postingGroup{ { - name: labels.MetricName, - addAll: false, - addKeys: []string{"go_info", "up"}, + name: labels.MetricName, + addAll: false, + addKeys: []string{"go_info", "up"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchNotEqual, "__name__", "")}, }, { - name: "cluster", - addAll: false, - addKeys: []string{"us-east-1", "us-west-2"}, + name: "cluster", + addAll: false, + addKeys: []string{"us-east-1", "us-west-2"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchNotEqual, "cluster", "")}, }, { - name: "job", - addAll: false, - addKeys: []string{"prometheus", "thanos"}, + name: "job", + addAll: false, + addKeys: []string{"prometheus", "thanos"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchNotEqual, "job", "")}, }, }, }, @@ -3209,7 +3254,7 @@ func TestPostingGroupMerge(t *testing.T) { slices.Sort(tc.group2.addKeys) slices.Sort(tc.group2.removeKeys) } - res := tc.group1.merge(tc.group2) + res := tc.group1.mergeKeys(tc.group2) testutil.Equals(t, tc.expected, res) }) } @@ -3312,16 +3357,16 @@ func TestExpandedPostingsRace(t *testing.T) { i := i bb := bb go func(i int, bb *bucketBlock) { - refs, err := bb.indexReader().ExpandedPostings(context.Background(), m, NewBytesLimiterFactory(0)(nil)) + refs, err := bb.indexReader().ExpandedPostings(context.Background(), m, NewBytesLimiterFactory(0)(nil), false) testutil.Ok(t, err) defer wg.Done() l.Lock() defer l.Unlock() if previousRefs[i] != nil { - testutil.Equals(t, previousRefs[i], refs) + testutil.Equals(t, previousRefs[i], refs.postings) } else { - previousRefs[i] = refs + previousRefs[i] = refs.postings } }(i, bb) } diff --git a/pkg/store/lazy_postings.go b/pkg/store/lazy_postings.go new file mode 100644 index 0000000000..6c48015429 --- /dev/null +++ b/pkg/store/lazy_postings.go @@ -0,0 +1,260 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. 
+
+package store
+
+import (
+	"context"
+	"math"
+
+	"github.com/pkg/errors"
+	"github.com/prometheus/prometheus/model/labels"
+	"github.com/prometheus/prometheus/storage"
+	"github.com/prometheus/prometheus/tsdb/index"
+	"golang.org/x/exp/slices"
+
+	"github.com/thanos-io/thanos/pkg/block/indexheader"
+)
+
+var emptyLazyPostings = &lazyExpandedPostings{postings: nil, matchers: nil}
+
+// lazyExpandedPostings contains expanded postings (series IDs). If lazy posting expansion is
+// enabled, it might also carry matchers that are applied lazily at series filtering time.
+type lazyExpandedPostings struct {
+	postings []storage.SeriesRef
+	matchers []*labels.Matcher
+}
+
+func newLazyExpandedPostings(ps []storage.SeriesRef, matchers ...*labels.Matcher) *lazyExpandedPostings {
+	return &lazyExpandedPostings{
+		postings: ps,
+		matchers: matchers,
+	}
+}
+
+func (p *lazyExpandedPostings) lazyExpanded() bool {
+	return p != nil && len(p.matchers) > 0
+}
+
+func optimizePostingsFetchByDownloadedBytes(r *bucketIndexReader, postingGroups []*postingGroup, seriesMaxSize int64, seriesMatchRatio float64) ([]*postingGroup, bool, error) {
+	if len(postingGroups) <= 1 {
+		return postingGroups, false, nil
+	}
+	// Collect posting cardinality of each posting group.
+	for _, pg := range postingGroups {
+		// A posting group can have either add keys or remove keys, but not both at the same time.
+		vals := pg.addKeys
+		if len(pg.removeKeys) > 0 {
+			vals = pg.removeKeys
+		}
+		rngs, err := r.block.indexHeaderReader.PostingsOffsets(pg.name, vals...)
+		if err != nil {
+			return nil, false, errors.Wrapf(err, "postings offsets for %s", pg.name)
+		}
+
+		// No posting ranges found means empty postings.
+		if len(rngs) == 0 {
+			return nil, true, nil
+		}
+		for _, r := range rngs {
+			if r == indexheader.NotFoundRange {
+				continue
+			}
+			// Each posting is 4 bytes; subtract the 4-byte length field included in the range.
+			pg.cardinality += (r.End - r.Start - 4) / 4
+		}
+	}
+	slices.SortFunc(postingGroups, func(a, b *postingGroup) bool {
+		if a.cardinality == b.cardinality {
+			return a.name < b.name
+		}
+		return a.cardinality < b.cardinality
+	})
+
+	/*
+	   Algorithm for choosing which postings we fetch right now and which
+	   postings we expand lazily.
+	   Sort posting groups by cardinality, so we can iterate starting from the posting group with the smallest postings.
+	   The algorithm focuses on fetching less data in total, counting both postings and series.
+
+	   We need to fetch at least 1 posting group in order to fetch series. So if we only fetch the first posting group,
+	   the data bytes we need to download are given by formula F1: P1 * 4 + P1 * S, where P1 is the number of postings in group 1
+	   and S is the size per series. 4 is the byte size per posting.
+
+	   If we fetch 2 posting groups, we can intersect the two postings lists to (hopefully) reduce the series we need to download.
+	   Assume that for each intersection the series match ratio is R (0 < R < 1). Then the data bytes we need to download follow
+	   formula F2: P1 * 4 + P2 * 4 + P1 * S * R.
+	   We get formula F3 if we fetch 3 posting groups:
+	   F3: P1 * 4 + P2 * 4 + P3 * 4 + P1 * S * R^2.
+
+	   Let's compare formula F2 and F1 first.
+	   P1 * 4 + P2 * 4 + P1 * S * R < P1 * 4 + P1 * S
+	   => P2 * 4 < P1 * S * (1 - R)
+	   The left-hand side is the size of the additional posting group and the right-hand side is the series bytes we no longer
+	   need to fetch thanks to the additional intersection. For F2 to fetch less data than F1, the additional postings just
+	   need to be smaller than the series bytes they save.
+
+	   Let's compare formula F3 and F2.
+	   P1 * 4 + P2 * 4 + P3 * 4 + P1 * S * R^2 < P1 * 4 + P2 * 4 + P1 * S * R
+	   => P3 * 4 < P1 * S * R * (1 - R)
+	   Same shape as the previous comparison.
+
+	   Compare formula F4 (cost to fetch up to 4 posting groups) and F3.
+	   P4 * 4 < P1 * S * R^2 * (1 - R)
+
+	   We can generalize this to: Pn * 4 < P1 * S * R^(n - 2) * (1 - R).
+	   As a concrete example, take S = 4096 bytes, R = 0.5, P1 = 1,000 and P2 = 600,000:
+	   P2 * 4 = 2,400,000 exceeds P1 * S * (1 - R) = 2,048,000, so group 2 and every
+	   group after it are marked lazy.
+
+	   The idea of the algorithm:
+	   Iterating over the posting groups in ascending order of cardinality, we check that fetching the current posting group
+	   still downloads less data in total than stopping at the previous posting group. If so, we continue to the next posting
+	   group; otherwise we stop.
+
+	   This ensures that once we stop at one posting group, every posting group after it would always need to fetch more data.
+	   In the formula Pn * 4 < P1 * S * R^(n - 2) * (1 - R), the left-hand side only grows as we iterate to larger
+	   posting groups, while the right-hand side only shrinks because R < 1.
+	*/
+	seriesBytesToFetch := postingGroups[0].cardinality * seriesMaxSize
+	p := float64(1)
+	i := 1 // Start from index 1 as we always need to fetch the smallest posting group.
+	for i < len(postingGroups) {
+		pg := postingGroups[i]
+		// Fetching this group's postings would cost more than the series bytes it saves,
+		// so stop here and lazily expand the remaining matchers.
+		if pg.cardinality*4 > int64(p*math.Ceil((1-seriesMatchRatio)*float64(seriesBytesToFetch))) {
+			break
+		}
+		p = p * seriesMatchRatio
+		i++
+	}
+	for i < len(postingGroups) {
+		postingGroups[i].lazy = true
+		i++
+	}
+	return postingGroups, false, nil
+}
+
+func fetchLazyExpandedPostings(
+	ctx context.Context,
+	postingGroups []*postingGroup,
+	r *bucketIndexReader,
+	bytesLimiter BytesLimiter,
+	addAllPostings bool,
+	lazyExpandedPostingEnabled bool,
+) (*lazyExpandedPostings, error) {
+	var (
+		err               error
+		emptyPostingGroup bool
+	)
+	/*
+		There are several cases where we skip the postings fetch optimization:
+		- Lazy expanded postings is disabled.
+		- Add all postings. This means we don't have a posting group with any add keys.
+		- `SeriesMaxSize` is not set for this block, so we have no way to estimate series size.
+		- Only one effective posting group is available. We need to download postings from at least 1 posting group, so there is nothing to optimize.
+	*/
+	if lazyExpandedPostingEnabled && !addAllPostings &&
+		r.block.meta.Thanos.IndexStats.SeriesMaxSize > 0 && len(postingGroups) > 1 {
+		postingGroups, emptyPostingGroup, err = optimizePostingsFetchByDownloadedBytes(
+			r,
+			postingGroups,
+			r.block.meta.Thanos.IndexStats.SeriesMaxSize,
+			0.5, // TODO(yeya24): Expose this as a flag.
+		)
+		if err != nil {
+			return nil, err
+		}
+		if emptyPostingGroup {
+			return emptyLazyPostings, nil
+		}
+	}
+
+	ps, matchers, err := fetchAndExpandPostingGroups(ctx, r, postingGroups, bytesLimiter)
+	if err != nil {
+		return nil, err
+	}
+	return &lazyExpandedPostings{postings: ps, matchers: matchers}, nil
+}
+
+// keysToFetchFromPostingGroups returns label pairs (postings) to fetch
+// and matchers we need to use for lazy posting expansion.
+// Input `postingGroups` needs to be ordered by cardinality when lazy
+// expansion is enabled, so we can stop at the first lazy posting group.
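+// A hedged illustration (hypothetical groups): given the sorted input
+//
+//	{name: "job", addKeys: ["prometheus"]}
+//	{name: "pod", lazy: true, matchers: [pod=~"a|b"]}
+//
+// the returned keys are [{job, prometheus}] and the returned matchers are
+// [pod=~"a|b"]: iteration stops at the first lazy group, and every remaining
+// group contributes only its matchers.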
+func keysToFetchFromPostingGroups(postingGroups []*postingGroup) ([]labels.Label, []*labels.Matcher) { + var lazyMatchers []*labels.Matcher + keys := make([]labels.Label, 0) + i := 0 + for i < len(postingGroups) { + pg := postingGroups[i] + if pg.lazy { + break + } + + // Postings returned by fetchPostings will be in the same order as keys + // so it's important that we iterate them in the same order later. + // We don't have any other way of pairing keys and fetched postings. + for _, key := range pg.addKeys { + keys = append(keys, labels.Label{Name: pg.name, Value: key}) + } + for _, key := range pg.removeKeys { + keys = append(keys, labels.Label{Name: pg.name, Value: key}) + } + i++ + } + if i < len(postingGroups) { + lazyMatchers = make([]*labels.Matcher, 0) + for i < len(postingGroups) { + lazyMatchers = append(lazyMatchers, postingGroups[i].matchers...) + i++ + } + } + return keys, lazyMatchers +} + +func fetchAndExpandPostingGroups(ctx context.Context, r *bucketIndexReader, postingGroups []*postingGroup, bytesLimiter BytesLimiter) ([]storage.SeriesRef, []*labels.Matcher, error) { + keys, lazyMatchers := keysToFetchFromPostingGroups(postingGroups) + fetchedPostings, closeFns, err := r.fetchPostings(ctx, keys, bytesLimiter) + defer func() { + for _, closeFn := range closeFns { + closeFn() + } + }() + if err != nil { + return nil, nil, errors.Wrap(err, "get postings") + } + + // Get "add" and "remove" postings from groups. We iterate over postingGroups and their keys + // again, and this is exactly the same order as before (when building the groups), so we can simply + // use one incrementing index to fetch postings from returned slice. + postingIndex := 0 + + var groupAdds, groupRemovals []index.Postings + for _, g := range postingGroups { + if g.lazy { + break + } + // We cannot add empty set to groupAdds, since they are intersected. + if len(g.addKeys) > 0 { + toMerge := make([]index.Postings, 0, len(g.addKeys)) + for _, l := range g.addKeys { + toMerge = append(toMerge, checkNilPosting(g.name, l, fetchedPostings[postingIndex])) + postingIndex++ + } + + groupAdds = append(groupAdds, index.Merge(toMerge...)) + } + + for _, l := range g.removeKeys { + groupRemovals = append(groupRemovals, checkNilPosting(g.name, l, fetchedPostings[postingIndex])) + postingIndex++ + } + } + + result := index.Without(index.Intersect(groupAdds...), index.Merge(groupRemovals...)) + + if ctx.Err() != nil { + return nil, nil, ctx.Err() + } + ps, err := ExpandPostingsWithContext(ctx, result) + if err != nil { + return nil, nil, errors.Wrap(err, "expand") + } + return ps, lazyMatchers, nil +} diff --git a/pkg/store/lazy_postings_test.go b/pkg/store/lazy_postings_test.go new file mode 100644 index 0000000000..ec1ac600d3 --- /dev/null +++ b/pkg/store/lazy_postings_test.go @@ -0,0 +1,473 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. 
+ +package store + +import ( + "context" + "path" + "testing" + + "github.com/efficientgo/core/testutil" + "github.com/go-kit/log" + "github.com/oklog/ulid" + "github.com/pkg/errors" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/tsdb" + "github.com/prometheus/prometheus/tsdb/index" + + "github.com/thanos-io/objstore/providers/filesystem" + "github.com/thanos-io/thanos/pkg/block/indexheader" + "github.com/thanos-io/thanos/pkg/block/metadata" +) + +func TestKeysToFetchFromPostingGroups(t *testing.T) { + for _, tc := range []struct { + name string + pgs []*postingGroup + expectedLabels []labels.Label + expectedMatchers []*labels.Matcher + }{ + { + name: "empty group", + pgs: []*postingGroup{ + { + addKeys: []string{}, + removeKeys: []string{}, + }, + }, + expectedLabels: []labels.Label{}, + }, + { + name: "empty groups", + pgs: []*postingGroup{ + { + addKeys: []string{}, + removeKeys: []string{}, + }, + { + addKeys: []string{}, + removeKeys: []string{}, + }, + { + addKeys: []string{}, + removeKeys: []string{}, + }, + }, + expectedLabels: []labels.Label{}, + }, + { + name: "group with add keys", + pgs: []*postingGroup{ + { + name: "test", + addKeys: []string{"foo", "bar"}, + removeKeys: []string{}, + }, + }, + expectedLabels: []labels.Label{{Name: "test", Value: "foo"}, {Name: "test", Value: "bar"}}, + }, + { + name: "group with remove keys", + pgs: []*postingGroup{ + { + name: "test", + addKeys: []string{}, + removeKeys: []string{"foo", "bar"}, + }, + }, + expectedLabels: []labels.Label{{Name: "test", Value: "foo"}, {Name: "test", Value: "bar"}}, + }, + { + name: "group with both add and remove keys", + pgs: []*postingGroup{ + { + name: "test", + addKeys: []string{"foo", "bar"}, + removeKeys: []string{"a", "b"}, + }, + }, + expectedLabels: []labels.Label{ + {Name: "test", Value: "foo"}, {Name: "test", Value: "bar"}, + {Name: "test", Value: "a"}, {Name: "test", Value: "b"}, + }, + }, + { + name: "groups with both add keys", + pgs: []*postingGroup{ + { + name: "test", + addKeys: []string{"foo", "bar"}, + }, + { + name: "foo", + addKeys: []string{"bar"}, + }, + }, + expectedLabels: []labels.Label{ + {Name: "test", Value: "foo"}, {Name: "test", Value: "bar"}, + {Name: "foo", Value: "bar"}, + }, + }, + { + name: "groups with add and remove keys", + pgs: []*postingGroup{ + { + name: "test", + addKeys: []string{"foo", "bar"}, + }, + { + name: "foo", + removeKeys: []string{"bar"}, + }, + }, + expectedLabels: []labels.Label{ + {Name: "test", Value: "foo"}, {Name: "test", Value: "bar"}, + {Name: "foo", Value: "bar"}, + }, + }, + { + name: "lazy posting group with empty matchers", + pgs: []*postingGroup{ + { + name: "test", + addKeys: []string{"foo", "bar"}, + matchers: []*labels.Matcher{}, + lazy: true, + }, + }, + expectedLabels: []labels.Label{}, + expectedMatchers: []*labels.Matcher{}, + }, + { + name: "lazy posting group", + pgs: []*postingGroup{ + { + name: "test", + addKeys: []string{"foo", "bar"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}, + lazy: true, + }, + }, + expectedLabels: []labels.Label{}, + expectedMatchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}, + }, + { + name: "multiple lazy posting groups", + pgs: []*postingGroup{ + { + name: "test", + addKeys: []string{"foo", "bar"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}, + lazy: true, + }, + { + name: "job", + addKeys: []string{"prometheus"}, + matchers: 
[]*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "job", "prometheus.*")}, + lazy: true, + }, + }, + expectedLabels: []labels.Label{}, + expectedMatchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), + labels.MustNewMatcher(labels.MatchRegexp, "job", "prometheus.*"), + }, + }, + { + name: "multiple non lazy and lazy posting groups", + pgs: []*postingGroup{ + { + name: "test", + addKeys: []string{"foo", "bar"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}, + }, + { + name: "test", + addKeys: []string{"foo", "bar"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}, + lazy: true, + }, + { + name: "job", + addKeys: []string{"prometheus"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "job", "prometheus.*")}, + lazy: true, + }, + }, + expectedLabels: []labels.Label{{Name: "test", Value: "foo"}, {Name: "test", Value: "bar"}}, + expectedMatchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), + labels.MustNewMatcher(labels.MatchRegexp, "job", "prometheus.*"), + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + keys, matchers := keysToFetchFromPostingGroups(tc.pgs) + testutil.Equals(t, tc.expectedLabels, keys) + testutil.Equals(t, tc.expectedMatchers, matchers) + }) + } +} + +type mockIndexHeaderReader struct { + postings map[string]map[string]index.Range + err error +} + +func (h *mockIndexHeaderReader) Close() error { return nil } + +func (h *mockIndexHeaderReader) IndexVersion() (int, error) { return 0, nil } + +func (h *mockIndexHeaderReader) PostingsOffsets(name string, value ...string) ([]index.Range, error) { + ranges := make([]index.Range, 0) + if _, ok := h.postings[name]; !ok { + return nil, nil + } + for _, val := range value { + if rng, ok := h.postings[name][val]; ok { + ranges = append(ranges, rng) + } else { + ranges = append(ranges, indexheader.NotFoundRange) + } + } + return ranges, h.err +} + +func (h *mockIndexHeaderReader) PostingsOffset(name string, value string) (index.Range, error) { + return index.Range{}, nil +} + +func (h *mockIndexHeaderReader) LookupSymbol(o uint32) (string, error) { return "", nil } + +func (h *mockIndexHeaderReader) LabelValues(name string) ([]string, error) { return nil, nil } + +func (h *mockIndexHeaderReader) LabelNames() ([]string, error) { return nil, nil } + +func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) { + ctx := context.Background() + logger := log.NewNopLogger() + dir := t.TempDir() + bkt, err := filesystem.NewBucket(dir) + testutil.Ok(t, err) + defer func() { testutil.Ok(t, bkt.Close()) }() + + inputError := errors.New("random") + blockID := ulid.MustNew(1, nil) + meta := &metadata.Meta{ + BlockMeta: tsdb.BlockMeta{ULID: blockID}, + Thanos: metadata.Thanos{ + Labels: map[string]string{ + "a": "b", + "c": "d", + }, + }, + } + for _, tc := range []struct { + name string + inputPostings map[string]map[string]index.Range + inputError error + postingGroups []*postingGroup + seriesMaxSize int64 + seriesMatchRatio float64 + expectedPostingGroups []*postingGroup + expectedEmptyPosting bool + expectedError string + }{ + { + name: "empty posting group", + }, + { + name: "one posting group", + postingGroups: []*postingGroup{ + {name: "foo"}, + }, + expectedPostingGroups: []*postingGroup{ + {name: "foo"}, + }, + }, + { + name: "posting offsets return error", + inputPostings: map[string]map[string]index.Range{ + "foo": {"bar": index.Range{End: 8}}, + "bar": 
{"foo": index.Range{Start: 8, End: 16}}, + }, + inputError: inputError, + seriesMaxSize: 1000, + seriesMatchRatio: 0.5, + postingGroups: []*postingGroup{ + {name: "foo", addKeys: []string{"bar"}}, + {name: "bar", addKeys: []string{"foo"}}, + }, + expectedPostingGroups: nil, + expectedError: "postings offsets for foo: random", + }, + { + name: "posting offsets empty", + inputPostings: map[string]map[string]index.Range{ + "foo": {"bar": index.Range{End: 8}}, + }, + seriesMaxSize: 1000, + seriesMatchRatio: 0.5, + postingGroups: []*postingGroup{ + {name: "foo", addKeys: []string{"bar"}}, + {name: "bar", addKeys: []string{"foo"}}, + }, + expectedPostingGroups: nil, + expectedEmptyPosting: true, + }, + { + name: "posting group label doesn't exist", + inputPostings: map[string]map[string]index.Range{ + "foo": {"bar": index.Range{End: 8}}, + }, + seriesMaxSize: 1000, + seriesMatchRatio: 0.5, + postingGroups: []*postingGroup{ + {name: "foo", addKeys: []string{"bar"}}, + {name: "bar", addKeys: []string{"foo"}}, + }, + expectedPostingGroups: nil, + expectedEmptyPosting: true, + }, + { + name: "posting group keys partial exist", + inputPostings: map[string]map[string]index.Range{ + "foo": {"bar": index.Range{End: 8}}, + "bar": {"foo": index.Range{Start: 8, End: 16}}, + }, + seriesMaxSize: 1000, + seriesMatchRatio: 0.5, + postingGroups: []*postingGroup{ + {name: "foo", addKeys: []string{"bar"}}, + {name: "bar", addKeys: []string{"foo", "buz"}}, + }, + expectedPostingGroups: []*postingGroup{ + {name: "bar", addKeys: []string{"foo", "buz"}, cardinality: 1}, + {name: "foo", addKeys: []string{"bar"}, cardinality: 1}, + }, + }, + { + name: "two posting groups with add keys, small postings and large series size", + inputPostings: map[string]map[string]index.Range{ + "foo": {"bar": index.Range{End: 8}}, + "bar": {"foo": index.Range{Start: 8, End: 16}}, + }, + seriesMaxSize: 1000, + seriesMatchRatio: 0.5, + postingGroups: []*postingGroup{ + {name: "foo", addKeys: []string{"bar"}}, + {name: "bar", addKeys: []string{"foo"}}, + }, + expectedPostingGroups: []*postingGroup{ + {name: "bar", addKeys: []string{"foo"}, cardinality: 1}, + {name: "foo", addKeys: []string{"bar"}, cardinality: 1}, + }, + }, + { + name: "two posting groups with remove keys, small postings and large series size", + inputPostings: map[string]map[string]index.Range{ + "foo": {"bar": index.Range{End: 8}}, + "bar": {"foo": index.Range{Start: 8, End: 16}}, + }, + seriesMaxSize: 1000, + seriesMatchRatio: 0.5, + postingGroups: []*postingGroup{ + {name: "foo", removeKeys: []string{"bar"}}, + {name: "bar", removeKeys: []string{"foo"}}, + }, + expectedPostingGroups: []*postingGroup{ + {name: "bar", removeKeys: []string{"foo"}, cardinality: 1}, + {name: "foo", removeKeys: []string{"bar"}, cardinality: 1}, + }, + }, + { + name: "two posting groups with add keys, very small series size, making one posting group lazy", + inputPostings: map[string]map[string]index.Range{ + "foo": {"bar": index.Range{End: 8}}, + "bar": {"foo": index.Range{Start: 8, End: 16}}, + }, + seriesMaxSize: 1, + seriesMatchRatio: 0.5, + postingGroups: []*postingGroup{ + {name: "foo", addKeys: []string{"bar"}}, + {name: "bar", addKeys: []string{"foo"}}, + }, + expectedPostingGroups: []*postingGroup{ + {name: "bar", addKeys: []string{"foo"}, cardinality: 1}, + {name: "foo", addKeys: []string{"bar"}, cardinality: 1, lazy: true}, + }, + }, + { + name: "two posting groups with add keys, one small posting group and a very large posting group, large one become lazy", + inputPostings: 
map[string]map[string]index.Range{ + "foo": {"bar": index.Range{End: 8}}, + "bar": {"foo": index.Range{Start: 8, End: 1000012}}, + }, + seriesMaxSize: 1000, + seriesMatchRatio: 0.5, + postingGroups: []*postingGroup{ + {name: "foo", addKeys: []string{"bar"}}, + {name: "bar", addKeys: []string{"foo"}}, + }, + expectedPostingGroups: []*postingGroup{ + {name: "foo", addKeys: []string{"bar"}, cardinality: 1}, + {name: "bar", addKeys: []string{"foo"}, cardinality: 250000, lazy: true}, + }, + }, + { + name: "three posting groups with add keys, two small posting group and a very large posting group, large one become lazy", + inputPostings: map[string]map[string]index.Range{ + "foo": {"bar": index.Range{End: 8}}, + "bar": {"foo": index.Range{Start: 8, End: 1000012}}, + "cluster": {"us": index.Range{Start: 1000012, End: 1000020}}, + }, + seriesMaxSize: 1000, + seriesMatchRatio: 0.5, + postingGroups: []*postingGroup{ + {name: "foo", addKeys: []string{"bar"}}, + {name: "bar", addKeys: []string{"foo"}}, + {name: "cluster", addKeys: []string{"us"}}, + }, + expectedPostingGroups: []*postingGroup{ + {name: "cluster", addKeys: []string{"us"}, cardinality: 1}, + {name: "foo", addKeys: []string{"bar"}, cardinality: 1}, + {name: "bar", addKeys: []string{"foo"}, cardinality: 250000, lazy: true}, + }, + }, + { + name: "three posting groups with either add or remove keys, two small posting group and a very large posting group, large one become lazy", + inputPostings: map[string]map[string]index.Range{ + "foo": {"bar": index.Range{End: 8}}, + "bar": {"foo": index.Range{Start: 8, End: 1000012}}, + "cluster": {"us": index.Range{Start: 1000012, End: 1000020}}, + }, + seriesMaxSize: 1000, + seriesMatchRatio: 0.5, + postingGroups: []*postingGroup{ + {name: "foo", removeKeys: []string{"bar"}}, + {name: "bar", removeKeys: []string{"foo"}}, + {name: "cluster", addKeys: []string{"us"}}, + }, + expectedPostingGroups: []*postingGroup{ + {name: "cluster", addKeys: []string{"us"}, cardinality: 1}, + {name: "foo", removeKeys: []string{"bar"}, cardinality: 1}, + {name: "bar", removeKeys: []string{"foo"}, cardinality: 250000, lazy: true}, + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + headerReader := &mockIndexHeaderReader{postings: tc.inputPostings, err: tc.inputError} + block, err := newBucketBlock(ctx, logger, newBucketStoreMetrics(nil), meta, bkt, path.Join(dir, blockID.String()), nil, nil, headerReader, nil, nil, nil) + testutil.Ok(t, err) + ir := newBucketIndexReader(block) + pgs, emptyPosting, err := optimizePostingsFetchByDownloadedBytes(ir, tc.postingGroups, tc.seriesMaxSize, tc.seriesMatchRatio) + if err != nil { + testutil.Equals(t, tc.expectedError, err.Error()) + return + } + testutil.Equals(t, tc.expectedEmptyPosting, emptyPosting) + testutil.Equals(t, tc.expectedPostingGroups, pgs) + }) + } +} diff --git a/pkg/testutil/e2eutil/prometheus.go b/pkg/testutil/e2eutil/prometheus.go index bf7e900a9b..9da879de82 100644 --- a/pkg/testutil/e2eutil/prometheus.go +++ b/pkg/testutil/e2eutil/prometheus.go @@ -24,6 +24,7 @@ import ( "testing" "time" + "github.com/efficientgo/core/testutil" "github.com/go-kit/log" "github.com/oklog/ulid" "github.com/pkg/errors" @@ -38,8 +39,6 @@ import ( "go.uber.org/atomic" "golang.org/x/sync/errgroup" - "github.com/efficientgo/core/testutil" - "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/runutil" ) @@ -446,15 +445,16 @@ func createBlockWithDelay(ctx context.Context, dir string, series []labels.Label return ulid.ULID{}, errors.Wrap(err, "create 
block id") } - m, err := metadata.ReadFromDir(path.Join(dir, blockID.String())) + bdir := path.Join(dir, blockID.String()) + m, err := metadata.ReadFromDir(bdir) if err != nil { return ulid.ULID{}, errors.Wrap(err, "open meta file") } + logger := log.NewNopLogger() m.ULID = id m.Compaction.Sources = []ulid.ULID{id} - - if err := m.WriteToDir(log.NewNopLogger(), path.Join(dir, blockID.String())); err != nil { + if err := m.WriteToDir(logger, path.Join(dir, blockID.String())); err != nil { return ulid.ULID{}, errors.Wrap(err, "write meta.json file") } @@ -555,6 +555,11 @@ func createBlock( } blockDir := filepath.Join(dir, id.String()) + logger := log.NewNopLogger() + seriesSize, err := gatherMaxSeriesSize(filepath.Join(blockDir, "index")) + if err != nil { + return id, errors.Wrap(err, "gather max series size") + } files := []metadata.File{} if hashFunc != metadata.NoneFunc { @@ -581,11 +586,12 @@ func createBlock( } } - if _, err = metadata.InjectThanos(log.NewNopLogger(), blockDir, metadata.Thanos{ + if _, err = metadata.InjectThanos(logger, blockDir, metadata.Thanos{ Labels: extLset.Map(), Downsample: metadata.ThanosDownsample{Resolution: resolution}, Source: metadata.TestSource, Files: files, + IndexStats: metadata.IndexStats{SeriesMaxSize: seriesSize}, }, nil); err != nil { return id, errors.Wrap(err, "finalize block") } @@ -599,6 +605,49 @@ func createBlock( return id, nil } +func gatherMaxSeriesSize(fn string) (int64, error) { + r, err := index.NewFileReader(fn) + if err != nil { + return 0, errors.Wrap(err, "open index file") + } + defer runutil.CloseWithErrCapture(&err, r, "gather index issue file reader") + + p, err := r.Postings(index.AllPostingsKey()) + if err != nil { + return 0, errors.Wrap(err, "get all postings") + } + + // As of version two all series entries are 16 byte padded. All references + // we get have to account for that to get the correct offset. + offsetMultiplier := 1 + version := r.Version() + if version >= 2 { + offsetMultiplier = 16 + } + + // Per series. + var ( + prevId storage.SeriesRef + maxSeriesSize int64 + ) + for p.Next() { + id := p.At() + if prevId != 0 { + // Approximate size. 
+			seriesSize := int64(id-prevId) * int64(offsetMultiplier)
+			if seriesSize > maxSeriesSize {
+				maxSeriesSize = seriesSize
+			}
+		}
+		prevId = id
+	}
+	if p.Err() != nil {
+		return 0, errors.Wrap(p.Err(), "walk postings")
+	}
+
+	return maxSeriesSize, nil
+}
+
 var indexFilename = "index"
 
 type indexWriterSeries struct {
diff --git a/test/e2e/store_gateway_test.go b/test/e2e/store_gateway_test.go
index d3c3de945e..9fec13287b 100644
--- a/test/e2e/store_gateway_test.go
+++ b/test/e2e/store_gateway_test.go
@@ -11,6 +11,7 @@ import (
 	"os"
 	"path"
 	"path/filepath"
+	"strconv"
 	"strings"
 	"testing"
 	"time"
@@ -878,7 +879,7 @@ config:
 	testutil.Ok(t, runutil.RetryWithLog(log.NewLogfmtLogger(os.Stdout), 5*time.Second, ctx.Done(), func() error {
 		if _, _, _, err := promclient.NewDefaultClient().QueryInstant(ctx, urlParse(t, "http://"+q1.Endpoint("http")), testQuery, now, opts); err != nil {
 			e := err.Error()
-			if strings.Contains(e, "expanded matching posting: get postings") && strings.Contains(e, "exceeded bytes limit while fetching postings: limit 1 violated") {
+			if strings.Contains(e, "expanded matching posting: fetch and expand postings") && strings.Contains(e, "exceeded bytes limit while fetching postings: limit 1 violated") {
 				return nil
 			}
 			return err
@@ -1047,3 +1048,146 @@ config:
 	testutil.Ok(t, s1.WaitSumMetricsWithOptions(e2emon.Equals(1), []string{`thanos_store_index_cache_hits_total`}, e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "item_type", "ExpandedPostings"))))
 	})
 }
+
+func TestStoreGatewayLazyExpandedPostingsEnabled(t *testing.T) {
+	t.Parallel()
+
+	e, err := e2e.NewDockerEnvironment("memcached-exp")
+	testutil.Ok(t, err)
+	t.Cleanup(e2ethanos.CleanScenario(t, e))
+
+	const bucket = "store-gateway-lazy-expanded-postings-test"
+	m := e2edb.NewMinio(e, "thanos-minio", bucket, e2edb.WithMinioTLS())
+	testutil.Ok(t, e2e.StartAndWaitReady(m))
+
+	// Create 2 store gateways, one with lazy expanded postings enabled and the other disabled.
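+	// s1 runs with --store.enable-lazy-expanded-postings; s2 keeps the default
+	// (disabled) so the lazy expanded postings metrics of the two can be compared.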
+ s1 := e2ethanos.NewStoreGW( + e, + "1", + client.BucketConfig{ + Type: client.S3, + Config: e2ethanos.NewS3Config(bucket, m.InternalEndpoint("http"), m.InternalDir()), + }, + "", + "", + []string{"--store.enable-lazy-expanded-postings"}, + ) + s2 := e2ethanos.NewStoreGW( + e, + "2", + client.BucketConfig{ + Type: client.S3, + Config: e2ethanos.NewS3Config(bucket, m.InternalEndpoint("http"), m.InternalDir()), + }, + "", + "", + nil, + ) + testutil.Ok(t, e2e.StartAndWaitReady(s1, s2)) + + q1 := e2ethanos.NewQuerierBuilder(e, "1", s1.InternalEndpoint("grpc")).Init() + q2 := e2ethanos.NewQuerierBuilder(e, "2", s2.InternalEndpoint("grpc")).Init() + testutil.Ok(t, e2e.StartAndWaitReady(q1, q2)) + + dir := filepath.Join(e.SharedDir(), "tmp") + testutil.Ok(t, os.MkdirAll(dir, os.ModePerm)) + + numSeries := 10000 + ss := make([]labels.Labels, 0, 10000) + for i := 0; i < numSeries; i++ { + ss = append(ss, labels.FromStrings("a", strconv.Itoa(i), "b", "1")) + } + extLset := labels.FromStrings("ext1", "value1", "replica", "1") + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + t.Cleanup(cancel) + + now := time.Now() + id, err := e2eutil.CreateBlockWithBlockDelay(ctx, dir, ss, 10, timestamp.FromTime(now), timestamp.FromTime(now.Add(2*time.Hour)), 30*time.Minute, extLset, 0, metadata.NoneFunc) + testutil.Ok(t, err) + + l := log.NewLogfmtLogger(os.Stdout) + bkt, err := s3.NewBucketWithConfig(l, + e2ethanos.NewS3Config(bucket, m.Endpoint("http"), m.Dir()), "test-feed") + testutil.Ok(t, err) + + testutil.Ok(t, objstore.UploadDir(ctx, l, bkt, path.Join(dir, id.String()), id.String())) + + // Wait for store to sync blocks. + // thanos_blocks_meta_synced: 1x loadedMeta 0x labelExcludedMeta 0x TooFreshMeta. + testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(1), "thanos_blocks_meta_synced")) + testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(0), "thanos_blocks_meta_sync_failures_total")) + testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(1), "thanos_bucket_store_blocks_loaded")) + testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(0), "thanos_bucket_store_block_drops_total")) + testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(0), "thanos_bucket_store_block_load_failures_total")) + + testutil.Ok(t, s2.WaitSumMetrics(e2emon.Equals(1), "thanos_blocks_meta_synced")) + testutil.Ok(t, s2.WaitSumMetrics(e2emon.Equals(0), "thanos_blocks_meta_sync_failures_total")) + testutil.Ok(t, s2.WaitSumMetrics(e2emon.Equals(1), "thanos_bucket_store_blocks_loaded")) + testutil.Ok(t, s2.WaitSumMetrics(e2emon.Equals(0), "thanos_bucket_store_block_drops_total")) + testutil.Ok(t, s2.WaitSumMetrics(e2emon.Equals(0), "thanos_bucket_store_block_load_failures_total")) + + t.Run("query with count", func(t *testing.T) { + queryAndAssert(t, ctx, q1.Endpoint("http"), func() string { return `count({b="1"})` }, + time.Now, promclient.QueryOptions{ + Deduplicate: false, + }, + model.Vector{ + { + Metric: map[model.LabelName]model.LabelValue{}, + Value: model.SampleValue(numSeries), + }, + }, + ) + + queryAndAssert(t, ctx, q2.Endpoint("http"), func() string { return `count({b="1"})` }, + time.Now, promclient.QueryOptions{ + Deduplicate: false, + }, + model.Vector{ + { + Metric: map[model.LabelName]model.LabelValue{}, + Value: model.SampleValue(numSeries), + }, + }, + ) + }) + + // We expect no lazy expanded postings as query `count({b="1"})` won't trigger the optimization. 
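+	// A single matcher produces a single posting group, and the optimization
+	// needs at least two groups to trade posting bytes against series bytes.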
+ testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(0), "thanos_bucket_store_lazy_expanded_postings_total")) + testutil.Ok(t, s2.WaitSumMetrics(e2emon.Equals(0), "thanos_bucket_store_lazy_expanded_postings_total")) + + t.Run("query specific series will trigger lazy posting", func(t *testing.T) { + queryAndAssertSeries(t, ctx, q1.Endpoint("http"), func() string { return `{a="1", b="1"}` }, + time.Now, promclient.QueryOptions{ + Deduplicate: false, + }, + []model.Metric{ + { + "a": "1", + "b": "1", + "ext1": "value1", + "replica": "1", + }, + }, + ) + + queryAndAssertSeries(t, ctx, q2.Endpoint("http"), func() string { return `{a="1", b="1"}` }, + time.Now, promclient.QueryOptions{ + Deduplicate: false, + }, + []model.Metric{ + { + "a": "1", + "b": "1", + "ext1": "value1", + "replica": "1", + }, + }, + ) + }) + + // Use greater or equal to handle flakiness. + testutil.Ok(t, s1.WaitSumMetrics(e2emon.GreaterOrEqual(1), "thanos_bucket_store_lazy_expanded_postings_total")) + testutil.Ok(t, s2.WaitSumMetrics(e2emon.Equals(0), "thanos_bucket_store_lazy_expanded_postings_total")) +} From 8fe9bd10c291df8323e7990d376251476e3ea1b8 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Mon, 4 Sep 2023 20:58:46 -0700 Subject: [PATCH 2/6] address some review comments Signed-off-by: Ben Ye --- pkg/store/bucket.go | 23 ++--------------------- pkg/store/lazy_postings.go | 3 +++ 2 files changed, 5 insertions(+), 21 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 520ddfc912..5b8bf7bbf5 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -1372,16 +1372,6 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store } // Sort matchers to make sure we generate the same cache key // when fetching expanded postings. - sort.Slice(blockMatchers, func(i, j int) bool { - if blockMatchers[i].Type == blockMatchers[j].Type { - if blockMatchers[i].Name == blockMatchers[j].Name { - return blockMatchers[i].Value < blockMatchers[j].Value - } - return blockMatchers[i].Name < blockMatchers[j].Name - } - return blockMatchers[i].Type < blockMatchers[j].Type - }) - sortedBlockMatchers := newSortedMatchers(blockMatchers) blocks := bs.getFor(req.MinTime, req.MaxTime, req.MaxResolutionWindow, reqBlockMatchers) @@ -1408,7 +1398,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store req, chunksLimiter, bytesLimiter, - blockMatchers, + sortedBlockMatchers, shardMatcher, s.enableChunkHashCalculation, s.seriesBatchSize, @@ -2657,16 +2647,7 @@ func matchersToPostingGroups(ctx context.Context, lvalsFn func(name string) ([]s matchers = append(matchers, val) } // Set and sort matchers to be used when picking up posting fetch strategy. - mergedPG.matchers = matchers - slices.SortFunc(mergedPG.matchers, func(a, b *labels.Matcher) bool { - if a.Type == b.Type { - if a.Name == b.Name { - return a.Value < b.Value - } - return a.Name < b.Name - } - return a.Type < b.Type - }) + mergedPG.matchers = newSortedMatchers(matchers) pgs = append(pgs, mergedPG) } slices.SortFunc(pgs, func(a, b *postingGroup) bool { diff --git a/pkg/store/lazy_postings.go b/pkg/store/lazy_postings.go index 6c48015429..c701b221bf 100644 --- a/pkg/store/lazy_postings.go +++ b/pkg/store/lazy_postings.go @@ -60,6 +60,9 @@ func optimizePostingsFetchByDownloadedBytes(r *bucketIndexReader, postingGroups if r == indexheader.NotFoundRange { continue } + // Each range starts from the #entries field which is 4 bytes. + // Need to subtract it when calculating number of postings. 
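+			// With postings being 4 bytes each: cardinality = (End - Start - 4) / 4.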
+ // https://github.com/prometheus/prometheus/blob/v2.46.0/tsdb/docs/format/index.md. pg.cardinality += (r.End - r.Start - 4) / 4 } } From bf2a26f4d58786cd2158278af31082772cb223f2 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Mon, 4 Sep 2023 21:49:27 -0700 Subject: [PATCH 3/6] add acceptance test and fixed bug of skipping posting groups with add keys Signed-off-by: Ben Ye --- pkg/store/acceptance_test.go | 148 +++++++++++++++++--------------- pkg/store/lazy_postings.go | 6 +- pkg/store/lazy_postings_test.go | 35 ++++++-- 3 files changed, 110 insertions(+), 79 deletions(-) diff --git a/pkg/store/acceptance_test.go b/pkg/store/acceptance_test.go index ecc23f5aa3..f6a5ef55ec 100644 --- a/pkg/store/acceptance_test.go +++ b/pkg/store/acceptance_test.go @@ -722,78 +722,86 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset func TestBucketStore_Acceptance(t *testing.T) { t.Cleanup(func() { custom.TolerantVerifyLeak(t) }) - testStoreAPIsAcceptance(t, func(tt *testing.T, extLset labels.Labels, appendFn func(app storage.Appender)) storepb.StoreServer { - tmpDir := tt.TempDir() - bktDir := filepath.Join(tmpDir, "bkt") - auxDir := filepath.Join(tmpDir, "aux") - metaDir := filepath.Join(tmpDir, "meta") - - testutil.Ok(tt, os.MkdirAll(metaDir, os.ModePerm)) - testutil.Ok(tt, os.MkdirAll(auxDir, os.ModePerm)) - - bkt, err := filesystem.NewBucket(bktDir) - testutil.Ok(tt, err) - tt.Cleanup(func() { testutil.Ok(tt, bkt.Close()) }) - - headOpts := tsdb.DefaultHeadOptions() - headOpts.ChunkDirRoot = tmpDir - headOpts.ChunkRange = 1000 - h, err := tsdb.NewHead(nil, nil, nil, nil, headOpts, nil) - testutil.Ok(tt, err) - tt.Cleanup(func() { testutil.Ok(tt, h.Close()) }) - logger := log.NewNopLogger() - - appendFn(h.Appender(context.Background())) - - if h.NumSeries() == 0 { - tt.Skip("Bucket Store cannot handle empty HEAD") - } - - id := createBlockFromHead(tt, auxDir, h) - - auxBlockDir := filepath.Join(auxDir, id.String()) - _, err = metadata.InjectThanos(log.NewNopLogger(), auxBlockDir, metadata.Thanos{ - Labels: extLset.Map(), - Downsample: metadata.ThanosDownsample{Resolution: 0}, - Source: metadata.TestSource, - }, nil) - testutil.Ok(tt, err) - - testutil.Ok(tt, block.Upload(context.Background(), logger, bkt, auxBlockDir, metadata.NoneFunc)) - testutil.Ok(tt, block.Upload(context.Background(), logger, bkt, auxBlockDir, metadata.NoneFunc)) - - chunkPool, err := NewDefaultChunkBytesPool(2e5) - testutil.Ok(tt, err) + for _, lazyExpandedPosting := range []bool{false, true} { + testStoreAPIsAcceptance(t, func(tt *testing.T, extLset labels.Labels, appendFn func(app storage.Appender)) storepb.StoreServer { + tmpDir := tt.TempDir() + bktDir := filepath.Join(tmpDir, "bkt") + auxDir := filepath.Join(tmpDir, "aux") + metaDir := filepath.Join(tmpDir, "meta") + + testutil.Ok(tt, os.MkdirAll(metaDir, os.ModePerm)) + testutil.Ok(tt, os.MkdirAll(auxDir, os.ModePerm)) + + bkt, err := filesystem.NewBucket(bktDir) + testutil.Ok(tt, err) + tt.Cleanup(func() { testutil.Ok(tt, bkt.Close()) }) + + headOpts := tsdb.DefaultHeadOptions() + headOpts.ChunkDirRoot = tmpDir + headOpts.ChunkRange = 1000 + h, err := tsdb.NewHead(nil, nil, nil, nil, headOpts, nil) + testutil.Ok(tt, err) + tt.Cleanup(func() { testutil.Ok(tt, h.Close()) }) + logger := log.NewNopLogger() + + appendFn(h.Appender(context.Background())) + + if h.NumSeries() == 0 { + tt.Skip("Bucket Store cannot handle empty HEAD") + } - metaFetcher, err := block.NewMetaFetcher(logger, 20, objstore.WithNoopInstr(bkt), metaDir, nil, 
[]block.MetadataFilter{
-			block.NewTimePartitionMetaFilter(allowAllFilterConf.MinTime, allowAllFilterConf.MaxTime),
+			id := createBlockFromHead(tt, auxDir, h)
+
+			auxBlockDir := filepath.Join(auxDir, id.String())
+			meta, err := metadata.ReadFromDir(auxBlockDir)
+			testutil.Ok(tt, err)
+			stats, err := block.GatherIndexHealthStats(logger, filepath.Join(auxBlockDir, block.IndexFilename), meta.MinTime, meta.MaxTime)
+			testutil.Ok(tt, err)
+			_, err = metadata.InjectThanos(log.NewNopLogger(), auxBlockDir, metadata.Thanos{
+				Labels:     extLset.Map(),
+				Downsample: metadata.ThanosDownsample{Resolution: 0},
+				Source:     metadata.TestSource,
+				IndexStats: metadata.IndexStats{SeriesMaxSize: stats.SeriesMaxSize, ChunkMaxSize: stats.ChunkMaxSize},
+			}, nil)
+			testutil.Ok(tt, err)
+
+			testutil.Ok(tt, block.Upload(context.Background(), logger, bkt, auxBlockDir, metadata.NoneFunc))
+			testutil.Ok(tt, block.Upload(context.Background(), logger, bkt, auxBlockDir, metadata.NoneFunc))
+
+			chunkPool, err := NewDefaultChunkBytesPool(2e5)
+			testutil.Ok(tt, err)
+
+			metaFetcher, err := block.NewMetaFetcher(logger, 20, objstore.WithNoopInstr(bkt), metaDir, nil, []block.MetadataFilter{
+				block.NewTimePartitionMetaFilter(allowAllFilterConf.MinTime, allowAllFilterConf.MaxTime),
+			})
+			testutil.Ok(tt, err)
+
+			bucketStore, err := NewBucketStore(
+				objstore.WithNoopInstr(bkt),
+				metaFetcher,
+				"",
+				NewChunksLimiterFactory(10e6),
+				NewSeriesLimiterFactory(10e6),
+				NewBytesLimiterFactory(10e6),
+				NewGapBasedPartitioner(PartitionerMaxGapSize),
+				20,
+				true,
+				DefaultPostingOffsetInMemorySampling,
+				false,
+				false,
+				1*time.Minute,
+				WithChunkPool(chunkPool),
+				WithFilterConfig(allowAllFilterConf),
+				WithLazyExpandedPostings(lazyExpandedPosting),
+			)
+			testutil.Ok(tt, err)
+			tt.Cleanup(func() { testutil.Ok(tt, bucketStore.Close()) })
+
+			testutil.Ok(tt, bucketStore.SyncBlocks(context.Background()))
+
+			return bucketStore
 		})
-		testutil.Ok(tt, err)
-
-		bucketStore, err := NewBucketStore(
-			objstore.WithNoopInstr(bkt),
-			metaFetcher,
-			"",
-			NewChunksLimiterFactory(10e6),
-			NewSeriesLimiterFactory(10e6),
-			NewBytesLimiterFactory(10e6),
-			NewGapBasedPartitioner(PartitionerMaxGapSize),
-			20,
-			true,
-			DefaultPostingOffsetInMemorySampling,
-			false,
-			false,
-			1*time.Minute,
-			WithChunkPool(chunkPool),
-			WithFilterConfig(allowAllFilterConf),
-		)
-		testutil.Ok(tt, err)
-		tt.Cleanup(func() { testutil.Ok(tt, bucketStore.Close()) })
-
-		testutil.Ok(tt, bucketStore.SyncBlocks(context.Background()))
-
-		return bucketStore
-	})
+	}
 }
 
 func TestPrometheusStore_Acceptance(t *testing.T) {
diff --git a/pkg/store/lazy_postings.go b/pkg/store/lazy_postings.go
index c701b221bf..4d3fadc983 100644
--- a/pkg/store/lazy_postings.go
+++ b/pkg/store/lazy_postings.go
@@ -118,12 +118,16 @@ func optimizePostingsFetchByDownloadedBytes(r *bucketIndexReader, postingGroups
 	seriesBytesToFetch := postingGroups[0].cardinality * seriesMaxSize
 	p := float64(1)
 	i := 1 // Start from index 1 as we always need to fetch the smallest posting group.
+	hasAdd := !postingGroups[0].addAll
 	for i < len(postingGroups) {
 		pg := postingGroups[i]
 		// Need to fetch more data on postings than series we avoid fetching, stop here and lazy expanding rest of matchers.
-		if pg.cardinality*4 > int64(p*math.Ceil((1-seriesMatchRatio)*float64(seriesBytesToFetch))) {
+		// If there is no posting group with add keys, don't skip any posting group until we have one.
+		// Fetching a posting group with addAll is much more expensive as it requires fetching all postings.
+		if hasAdd && pg.cardinality*4 > int64(p*math.Ceil((1-seriesMatchRatio)*float64(seriesBytesToFetch))) {
 			break
 		}
+		hasAdd = hasAdd || !pg.addAll
 		p = p * seriesMatchRatio
 		i++
 	}
diff --git a/pkg/store/lazy_postings_test.go b/pkg/store/lazy_postings_test.go
index ec1ac600d3..f2c20727fc 100644
--- a/pkg/store/lazy_postings_test.go
+++ b/pkg/store/lazy_postings_test.go
@@ -365,6 +365,8 @@ func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) {
 			},
 		},
 		{
+			// This test case won't be optimized in a real case because all posting groups are addAll,
+			// so it doesn't make sense to optimize postings fetching anyway.
 			name: "two posting groups with remove keys, small postings and large series size",
 			inputPostings: map[string]map[string]index.Range{
 				"foo": {"bar": index.Range{End: 8}},
@@ -373,12 +375,29 @@ func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) {
 			seriesMaxSize:    1000,
 			seriesMatchRatio: 0.5,
 			postingGroups: []*postingGroup{
-				{name: "foo", removeKeys: []string{"bar"}},
-				{name: "bar", removeKeys: []string{"foo"}},
+				{addAll: true, name: "foo", removeKeys: []string{"bar"}},
+				{addAll: true, name: "bar", removeKeys: []string{"foo"}},
 			},
 			expectedPostingGroups: []*postingGroup{
-				{name: "bar", removeKeys: []string{"foo"}, cardinality: 1},
-				{name: "foo", removeKeys: []string{"bar"}, cardinality: 1},
+				{addAll: true, name: "bar", removeKeys: []string{"foo"}, cardinality: 1},
+				{addAll: true, name: "foo", removeKeys: []string{"bar"}, cardinality: 1},
+			},
+		},
+		{
+			name: "one group with remove keys and another one with add keys. Always add the addKeys posting group to avoid fetching all postings",
+			inputPostings: map[string]map[string]index.Range{
+				"foo": {"bar": index.Range{End: 8}},
+				"bar": {"foo": index.Range{Start: 8, End: 1000012}},
+			},
+			seriesMaxSize:    1000,
+			seriesMatchRatio: 0.5,
+			postingGroups: []*postingGroup{
+				{addAll: true, name: "foo", removeKeys: []string{"bar"}},
+				{name: "bar", addKeys: []string{"foo"}},
+			},
+			expectedPostingGroups: []*postingGroup{
+				{addAll: true, name: "foo", removeKeys: []string{"bar"}, cardinality: 1},
+				{name: "bar", addKeys: []string{"foo"}, cardinality: 250000},
 			},
 		},
 		{
@@ -445,14 +464,14 @@ func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) {
 			seriesMaxSize:    1000,
 			seriesMatchRatio: 0.5,
 			postingGroups: []*postingGroup{
-				{name: "foo", removeKeys: []string{"bar"}},
-				{name: "bar", removeKeys: []string{"foo"}},
+				{addAll: true, name: "foo", removeKeys: []string{"bar"}},
+				{addAll: true, name: "bar", removeKeys: []string{"foo"}},
 				{name: "cluster", addKeys: []string{"us"}},
 			},
 			expectedPostingGroups: []*postingGroup{
 				{name: "cluster", addKeys: []string{"us"}, cardinality: 1},
-				{name: "foo", removeKeys: []string{"bar"}, cardinality: 1},
-				{name: "bar", removeKeys: []string{"foo"}, cardinality: 250000, lazy: true},
+				{addAll: true, name: "foo", removeKeys: []string{"bar"}, cardinality: 1},
+				{addAll: true, name: "bar", removeKeys: []string{"foo"}, cardinality: 250000, lazy: true},
 			},
 		},
 	} {

From 24c598d4c596a9dfb84ac789b4863cfd1dd43c25 Mon Sep 17 00:00:00 2001
From: Ben Ye
Date: Tue, 5 Sep 2023 01:40:22 -0700
Subject: [PATCH 4/6] add lazy postings param to block series client

Signed-off-by: Ben Ye
---
 pkg/store/bucket.go      | 26 ++++++++++++++++----------
 pkg/store/bucket_test.go |  4 +++-
 2 files changed, 19 insertions(+), 11 deletions(-)

diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go
index 5b8bf7bbf5..fdcd220ede 100644
--- a/pkg/store/bucket.go
+++ b/pkg/store/bucket.go
@@ -913,6 +913,9 @@ type blockSeriesClient struct {
 	chunksLimiter 
ChunksLimiter bytesLimiter BytesLimiter + lazyExpandedPostingEnabled bool + lazyExpandedPostingsCount prometheus.Counter + skipChunks bool shardMatcher *storepb.ShardMatcher blockMatchers []*labels.Matcher @@ -944,6 +947,8 @@ func newBlockSeriesClient( batchSize int, chunkFetchDuration prometheus.Histogram, extLsetToRemove map[string]struct{}, + lazyExpandedPostingEnabled bool, + lazyExpandedPostingsCount prometheus.Counter, ) *blockSeriesClient { var chunkr *bucketChunkReader if !req.SkipChunks { @@ -970,6 +975,9 @@ func newBlockSeriesClient( skipChunks: req.SkipChunks, chunkFetchDuration: chunkFetchDuration, + lazyExpandedPostingEnabled: lazyExpandedPostingEnabled, + lazyExpandedPostingsCount: lazyExpandedPostingsCount, + loadAggregates: req.Aggregates, shardMatcher: shardMatcher, blockMatchers: blockMatchers, @@ -1014,10 +1022,8 @@ func newSortedMatchers(matchers []*labels.Matcher) sortedMatchers { func (b *blockSeriesClient) ExpandPostings( matchers sortedMatchers, seriesLimiter SeriesLimiter, - lazyExpandedPostingEnabled bool, - lazyExpandedPostingsCount prometheus.Counter, ) error { - ps, err := b.indexr.ExpandedPostings(b.ctx, matchers, b.bytesLimiter, lazyExpandedPostingEnabled) + ps, err := b.indexr.ExpandedPostings(b.ctx, matchers, b.bytesLimiter, b.lazyExpandedPostingEnabled) if err != nil { return errors.Wrap(err, "expanded matching posting") } @@ -1040,7 +1046,7 @@ func (b *blockSeriesClient) ExpandPostings( if b.lazyPostings.lazyExpanded() { // Assume lazy expansion could cut actual expanded postings length to 50%. b.expandedPostings = make([]storage.SeriesRef, 0, len(b.lazyPostings.postings)/2) - lazyExpandedPostingsCount.Inc() + b.lazyExpandedPostingsCount.Inc() } b.entries = make([]seriesEntry, 0, b.batchSize) return nil @@ -1404,6 +1410,8 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store s.seriesBatchSize, s.metrics.chunkFetchDuration, extLsetToRemove, + s.enabledLazyExpandedPostings, + s.metrics.lazyExpandedPostingsCount, ) defer blockClient.Close() @@ -1426,8 +1434,6 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store if err := blockClient.ExpandPostings( sortedBlockMatchers, seriesLimiter, - s.enabledLazyExpandedPostings, - s.metrics.lazyExpandedPostingsCount, ); err != nil { onClose() span.Finish() @@ -1708,14 +1714,14 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq SeriesBatchSize, s.metrics.chunkFetchDuration, nil, + s.enabledLazyExpandedPostings, + s.metrics.lazyExpandedPostingsCount, ) defer blockClient.Close() if err := blockClient.ExpandPostings( sortedReqSeriesMatchersNoExtLabels, seriesLimiter, - s.enabledLazyExpandedPostings, - s.metrics.lazyExpandedPostingsCount, ); err != nil { return err } @@ -1939,14 +1945,14 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR SeriesBatchSize, s.metrics.chunkFetchDuration, nil, + s.enabledLazyExpandedPostings, + s.metrics.lazyExpandedPostingsCount, ) defer blockClient.Close() if err := blockClient.ExpandPostings( sortedReqSeriesMatchersNoExtLabels, seriesLimiter, - s.enabledLazyExpandedPostings, - s.metrics.lazyExpandedPostingsCount, ); err != nil { return err } diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index 79edc35ebd..c190bf82dd 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -2768,8 +2768,10 @@ func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMet SeriesBatchSize, dummyHistogram, nil, + false, + dummyCounter, ) - 
testutil.Ok(b, blockClient.ExpandPostings(sortedMatchers, seriesLimiter, false, dummyCounter))
+	testutil.Ok(b, blockClient.ExpandPostings(sortedMatchers, seriesLimiter))
 	defer blockClient.Close()
 
 	// Ensure at least 1 series has been returned (as expected).

From ca8a19d007fdd1df8ee6dc2c20dfa07983a06182 Mon Sep 17 00:00:00 2001
From: Ben Ye
Date: Tue, 5 Sep 2023 21:17:17 -0700
Subject: [PATCH 5/6] switch to use block estimated max series size

Signed-off-by: Ben Ye
---
 pkg/store/lazy_postings.go | 15 ++++++++-------
 1 file changed, 8 insertions(+), 7 deletions(-)

diff --git a/pkg/store/lazy_postings.go b/pkg/store/lazy_postings.go
index 4d3fadc983..29736d45c9 100644
--- a/pkg/store/lazy_postings.go
+++ b/pkg/store/lazy_postings.go
@@ -151,18 +151,19 @@ func fetchLazyExpandedPostings(
 		emptyPostingGroup bool
 	)
 	/*
-	There are several cases that we skip postings fetch optimization:
-	- Lazy expanded posting disabled.
-	- Add all postings. This means we don't have a posting group with any add keys.
-	- `SeriesMaxSize` not set for this block then we have no way to estimate series size.
-	- Only one effective posting group available. We need to at least download postings from 1 posting group so no need to optimize.
+		There are several cases that we skip postings fetch optimization:
+		- Lazy expanded posting disabled.
+		- Add all postings. This means we don't have a posting group with any add keys.
+		- Block estimated max series size not set, which means we don't have a way to estimate series bytes downloaded.
+		  (It is derived from the block's `SeriesMaxSize` index stat when available, so without that stat we cannot estimate series size.)
+		- Only one effective posting group available. We need to at least download postings from 1 posting group, so there is nothing to optimize.
 	*/
 	if lazyExpandedPostingEnabled && !addAllPostings &&
-		r.block.meta.Thanos.IndexStats.SeriesMaxSize > 0 && len(postingGroups) > 1 {
+		r.block.estimatedMaxSeriesSize > 0 && len(postingGroups) > 1 {
 		postingGroups, emptyPostingGroup, err = optimizePostingsFetchByDownloadedBytes(
 			r,
 			postingGroups,
-			r.block.meta.Thanos.IndexStats.SeriesMaxSize,
+			int64(r.block.estimatedMaxSeriesSize),
 			0.5, // TODO(yeya24): Expose this as a flag.
)
 		if err != nil {
 			return nil, err

From 80685af3746811bbf37b84d6fd9e2c44e7cf813a Mon Sep 17 00:00:00 2001
From: Ben Ye
Date: Fri, 8 Sep 2023 23:53:24 -0700
Subject: [PATCH 6/6] add two more metrics

Signed-off-by: Ben Ye
---
 pkg/store/bucket.go             | 86 ++++++++++++++++++++++-----------
 pkg/store/bucket_test.go        | 13 +++--
 pkg/store/lazy_postings.go      |  6 ++-
 pkg/store/lazy_postings_test.go | 16 +++++-
 4 files changed, 85 insertions(+), 36 deletions(-)

diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go
index fdcd220ede..4a8eae4572 100644
--- a/pkg/store/bucket.go
+++ b/pkg/store/bucket.go
@@ -117,27 +117,30 @@ var (
 )
 
 type bucketStoreMetrics struct {
-	blocksLoaded              prometheus.Gauge
-	blockLoads                prometheus.Counter
-	blockLoadFailures         prometheus.Counter
-	lastLoadedBlock           prometheus.Gauge
-	blockDrops                prometheus.Counter
-	blockDropFailures         prometheus.Counter
-	seriesDataTouched         *prometheus.HistogramVec
-	seriesDataFetched         *prometheus.HistogramVec
-	seriesDataSizeTouched     *prometheus.HistogramVec
-	seriesDataSizeFetched     *prometheus.HistogramVec
-	seriesBlocksQueried       prometheus.Histogram
-	seriesGetAllDuration      prometheus.Histogram
-	seriesMergeDuration       prometheus.Histogram
-	resultSeriesCount         prometheus.Histogram
-	chunkSizeBytes            prometheus.Histogram
-	postingsSizeBytes         prometheus.Histogram
-	queriesDropped            *prometheus.CounterVec
-	seriesRefetches           prometheus.Counter
-	chunkRefetches            prometheus.Counter
-	emptyPostingCount         prometheus.Counter
-	lazyExpandedPostingsCount prometheus.Counter
+	blocksLoaded          prometheus.Gauge
+	blockLoads            prometheus.Counter
+	blockLoadFailures     prometheus.Counter
+	lastLoadedBlock       prometheus.Gauge
+	blockDrops            prometheus.Counter
+	blockDropFailures     prometheus.Counter
+	seriesDataTouched     *prometheus.HistogramVec
+	seriesDataFetched     *prometheus.HistogramVec
+	seriesDataSizeTouched *prometheus.HistogramVec
+	seriesDataSizeFetched *prometheus.HistogramVec
+	seriesBlocksQueried   prometheus.Histogram
+	seriesGetAllDuration  prometheus.Histogram
+	seriesMergeDuration   prometheus.Histogram
+	resultSeriesCount     prometheus.Histogram
+	chunkSizeBytes        prometheus.Histogram
+	postingsSizeBytes     prometheus.Histogram
+	queriesDropped        *prometheus.CounterVec
+	seriesRefetches       prometheus.Counter
+	chunkRefetches        prometheus.Counter
+	emptyPostingCount     prometheus.Counter
+
+	lazyExpandedPostingsCount                     prometheus.Counter
+	lazyExpandedPostingSizeBytes                  prometheus.Counter
+	lazyExpandedPostingSeriesOverfetchedSizeBytes prometheus.Counter
 
 	cachedPostingsCompressions      *prometheus.CounterVec
 	cachedPostingsCompressionErrors *prometheus.CounterVec
@@ -305,7 +308,17 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics {
 
 	m.lazyExpandedPostingsCount = promauto.With(reg).NewCounter(prometheus.CounterOpts{
 		Name: "thanos_bucket_store_lazy_expanded_postings_total",
-		Help: "Total number of lazy expanded postings when fetching block series.",
+		Help: "Total number of times the lazy expanded posting optimization was applied.",
+	})
+
+	m.lazyExpandedPostingSizeBytes = promauto.With(reg).NewCounter(prometheus.CounterOpts{
+		Name: "thanos_bucket_store_lazy_expanded_posting_size_bytes_total",
+		Help: "Total size of lazy posting groups in bytes.",
+	})
+
+	m.lazyExpandedPostingSeriesOverfetchedSizeBytes = promauto.With(reg).NewCounter(prometheus.CounterOpts{
+		Name: "thanos_bucket_store_lazy_expanded_posting_series_overfetched_size_bytes_total",
+		Help: "Total size in bytes of series overfetched due to lazy posting expansion.",
+	})
 
 	return &m
@@ -913,8 +926,10 @@ type blockSeriesClient struct {
 	chunksLimiter 
ChunksLimiter bytesLimiter BytesLimiter - lazyExpandedPostingEnabled bool - lazyExpandedPostingsCount prometheus.Counter + lazyExpandedPostingEnabled bool + lazyExpandedPostingsCount prometheus.Counter + lazyExpandedPostingSizeBytes prometheus.Counter + lazyExpandedPostingSeriesOverfetchedSizeBytes prometheus.Counter skipChunks bool shardMatcher *storepb.ShardMatcher @@ -949,6 +964,8 @@ func newBlockSeriesClient( extLsetToRemove map[string]struct{}, lazyExpandedPostingEnabled bool, lazyExpandedPostingsCount prometheus.Counter, + lazyExpandedPostingSizeBytes prometheus.Counter, + lazyExpandedPostingSeriesOverfetchedSizeBytes prometheus.Counter, ) *blockSeriesClient { var chunkr *bucketChunkReader if !req.SkipChunks { @@ -975,8 +992,10 @@ func newBlockSeriesClient( skipChunks: req.SkipChunks, chunkFetchDuration: chunkFetchDuration, - lazyExpandedPostingEnabled: lazyExpandedPostingEnabled, - lazyExpandedPostingsCount: lazyExpandedPostingsCount, + lazyExpandedPostingEnabled: lazyExpandedPostingEnabled, + lazyExpandedPostingsCount: lazyExpandedPostingsCount, + lazyExpandedPostingSizeBytes: lazyExpandedPostingSizeBytes, + lazyExpandedPostingSeriesOverfetchedSizeBytes: lazyExpandedPostingSeriesOverfetchedSizeBytes, loadAggregates: req.Aggregates, shardMatcher: shardMatcher, @@ -1023,7 +1042,7 @@ func (b *blockSeriesClient) ExpandPostings( matchers sortedMatchers, seriesLimiter SeriesLimiter, ) error { - ps, err := b.indexr.ExpandedPostings(b.ctx, matchers, b.bytesLimiter, b.lazyExpandedPostingEnabled) + ps, err := b.indexr.ExpandedPostings(b.ctx, matchers, b.bytesLimiter, b.lazyExpandedPostingEnabled, b.lazyExpandedPostingSizeBytes) if err != nil { return errors.Wrap(err, "expanded matching posting") } @@ -1131,6 +1150,9 @@ OUTER: for _, matcher := range b.lazyPostings.matchers { val := b.lset.Get(matcher.Name) if !matcher.Matches(val) { + // Series not matched means series we overfetched due to lazy posting expansion. + seriesBytes := b.indexr.loadedSeries[postingsBatch[i]] + b.lazyExpandedPostingSeriesOverfetchedSizeBytes.Add(float64(len(seriesBytes))) continue OUTER } } @@ -1412,6 +1434,8 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store extLsetToRemove, s.enabledLazyExpandedPostings, s.metrics.lazyExpandedPostingsCount, + s.metrics.lazyExpandedPostingSizeBytes, + s.metrics.lazyExpandedPostingSeriesOverfetchedSizeBytes, ) defer blockClient.Close() @@ -1716,6 +1740,8 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq nil, s.enabledLazyExpandedPostings, s.metrics.lazyExpandedPostingsCount, + s.metrics.lazyExpandedPostingSizeBytes, + s.metrics.lazyExpandedPostingSeriesOverfetchedSizeBytes, ) defer blockClient.Close() @@ -1947,6 +1973,8 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR nil, s.enabledLazyExpandedPostings, s.metrics.lazyExpandedPostingsCount, + s.metrics.lazyExpandedPostingSizeBytes, + s.metrics.lazyExpandedPostingSeriesOverfetchedSizeBytes, ) defer blockClient.Close() @@ -2388,7 +2416,7 @@ func (r *bucketIndexReader) reset() { // Reminder: A posting is a reference (represented as a uint64) to a series reference, which in turn points to the first // chunk where the series contains the matching label-value pair for a given block of data. Postings can be fetched by // single label name=value. 
-func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms sortedMatchers, bytesLimiter BytesLimiter, lazyExpandedPostingEnabled bool) (*lazyExpandedPostings, error) { +func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms sortedMatchers, bytesLimiter BytesLimiter, lazyExpandedPostingEnabled bool, lazyExpandedPostingSizeBytes prometheus.Counter) (*lazyExpandedPostings, error) { // Shortcut the case of `len(postingGroups) == 0`. It will only happen when no // matchers specified, and we don't need to fetch expanded postings from cache. if len(ms) == 0 { @@ -2440,7 +2468,7 @@ func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms sortedMatch postingGroups = append(postingGroups, newPostingGroup(true, name, []string{value}, nil)) } - ps, err := fetchLazyExpandedPostings(ctx, postingGroups, r, bytesLimiter, addAllPostings, lazyExpandedPostingEnabled) + ps, err := fetchLazyExpandedPostings(ctx, postingGroups, r, bytesLimiter, addAllPostings, lazyExpandedPostingEnabled, lazyExpandedPostingSizeBytes) if err != nil { return nil, errors.Wrap(err, "fetch and expand postings") } diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index c190bf82dd..82d42dd9d7 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -1221,6 +1221,7 @@ func benchmarkExpandedPostings( {`uniq=~"9|random-shuffled-values|1"`, []*labels.Matcher{iRegexBigValueSet}, bigValueSetSize}, } + dummyCounter := promauto.NewCounter(prometheus.CounterOpts{Name: "test"}) for _, c := range cases { t.Run(c.name, func(t testutil.TB) { b := &bucketBlock{ @@ -1237,7 +1238,7 @@ func benchmarkExpandedPostings( t.ResetTimer() for i := 0; i < t.N(); i++ { - p, err := indexr.ExpandedPostings(context.Background(), newSortedMatchers(c.matchers), NewBytesLimiterFactory(0)(nil), false) + p, err := indexr.ExpandedPostings(context.Background(), newSortedMatchers(c.matchers), NewBytesLimiterFactory(0)(nil), false, dummyCounter) testutil.Ok(t, err) testutil.Equals(t, c.expectedLen, len(p.postings)) } @@ -1271,7 +1272,8 @@ func TestExpandedPostingsEmptyPostings(t *testing.T) { // Match nothing. matcher2 := labels.MustNewMatcher(labels.MatchRegexp, "i", "500.*") ctx := context.Background() - ps, err := indexr.ExpandedPostings(ctx, newSortedMatchers([]*labels.Matcher{matcher1, matcher2}), NewBytesLimiterFactory(0)(nil), false) + dummyCounter := promauto.With(prometheus.NewRegistry()).NewCounter(prometheus.CounterOpts{Name: "test"}) + ps, err := indexr.ExpandedPostings(ctx, newSortedMatchers([]*labels.Matcher{matcher1, matcher2}), NewBytesLimiterFactory(0)(nil), false, dummyCounter) testutil.Ok(t, err) testutil.Equals(t, ps, (*lazyExpandedPostings)(nil)) // Make sure even if a matcher doesn't match any postings, we still cache empty expanded postings. 
@@ -2727,7 +2729,7 @@ func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMet wg := sync.WaitGroup{} wg.Add(concurrency) - dummyCounter := promauto.NewCounter(prometheus.CounterOpts{}) + dummyCounter := promauto.NewCounter(prometheus.CounterOpts{Name: "test"}) for w := 0; w < concurrency; w++ { go func() { defer wg.Done() @@ -2770,6 +2772,8 @@ func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMet nil, false, dummyCounter, + dummyCounter, + dummyCounter, ) testutil.Ok(b, blockClient.ExpandPostings(sortedMatchers, seriesLimiter)) defer blockClient.Close() @@ -3337,6 +3341,7 @@ func TestExpandedPostingsRace(t *testing.T) { l := sync.Mutex{} previousRefs := make(map[int][]storage.SeriesRef) + dummyCounter := promauto.With(prometheus.NewRegistry()).NewCounter(prometheus.CounterOpts{Name: "test"}) for { if tm.Err() != nil { @@ -3359,7 +3364,7 @@ func TestExpandedPostingsRace(t *testing.T) { i := i bb := bb go func(i int, bb *bucketBlock) { - refs, err := bb.indexReader().ExpandedPostings(context.Background(), m, NewBytesLimiterFactory(0)(nil), false) + refs, err := bb.indexReader().ExpandedPostings(context.Background(), m, NewBytesLimiterFactory(0)(nil), false, dummyCounter) testutil.Ok(t, err) defer wg.Done() diff --git a/pkg/store/lazy_postings.go b/pkg/store/lazy_postings.go index 29736d45c9..2e02836c0c 100644 --- a/pkg/store/lazy_postings.go +++ b/pkg/store/lazy_postings.go @@ -8,6 +8,7 @@ import ( "math" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/index" @@ -36,7 +37,7 @@ func (p *lazyExpandedPostings) lazyExpanded() bool { return p != nil && len(p.matchers) > 0 } -func optimizePostingsFetchByDownloadedBytes(r *bucketIndexReader, postingGroups []*postingGroup, seriesMaxSize int64, seriesMatchRatio float64) ([]*postingGroup, bool, error) { +func optimizePostingsFetchByDownloadedBytes(r *bucketIndexReader, postingGroups []*postingGroup, seriesMaxSize int64, seriesMatchRatio float64, lazyExpandedPostingSizeBytes prometheus.Counter) ([]*postingGroup, bool, error) { if len(postingGroups) <= 1 { return postingGroups, false, nil } @@ -133,6 +134,7 @@ func optimizePostingsFetchByDownloadedBytes(r *bucketIndexReader, postingGroups } for i < len(postingGroups) { postingGroups[i].lazy = true + lazyExpandedPostingSizeBytes.Add(float64(4 * postingGroups[i].cardinality)) i++ } return postingGroups, false, nil @@ -145,6 +147,7 @@ func fetchLazyExpandedPostings( bytesLimiter BytesLimiter, addAllPostings bool, lazyExpandedPostingEnabled bool, + lazyExpandedPostingSizeBytes prometheus.Counter, ) (*lazyExpandedPostings, error) { var ( err error @@ -165,6 +168,7 @@ func fetchLazyExpandedPostings( postingGroups, int64(r.block.estimatedMaxSeriesSize), 0.5, // TODO(yeya24): Expose this as a flag. 
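+			// lazyExpandedPostingSizeBytes accumulates the posting bytes (4 bytes
+			// per posting) of every group that gets marked lazy.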
+ lazyExpandedPostingSizeBytes, ) if err != nil { return nil, err diff --git a/pkg/store/lazy_postings_test.go b/pkg/store/lazy_postings_test.go index f2c20727fc..7b17a59ec6 100644 --- a/pkg/store/lazy_postings_test.go +++ b/pkg/store/lazy_postings_test.go @@ -12,6 +12,9 @@ import ( "github.com/go-kit/log" "github.com/oklog/ulid" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + promtest "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/tsdb/index" @@ -477,16 +480,25 @@ func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) { } { t.Run(tc.name, func(t *testing.T) { headerReader := &mockIndexHeaderReader{postings: tc.inputPostings, err: tc.inputError} - block, err := newBucketBlock(ctx, logger, newBucketStoreMetrics(nil), meta, bkt, path.Join(dir, blockID.String()), nil, nil, headerReader, nil, nil, nil) + registry := prometheus.NewRegistry() + block, err := newBucketBlock(ctx, logger, newBucketStoreMetrics(registry), meta, bkt, path.Join(dir, blockID.String()), nil, nil, headerReader, nil, nil, nil) testutil.Ok(t, err) ir := newBucketIndexReader(block) - pgs, emptyPosting, err := optimizePostingsFetchByDownloadedBytes(ir, tc.postingGroups, tc.seriesMaxSize, tc.seriesMatchRatio) + dummyCounter := promauto.With(registry).NewCounter(prometheus.CounterOpts{Name: "test"}) + pgs, emptyPosting, err := optimizePostingsFetchByDownloadedBytes(ir, tc.postingGroups, tc.seriesMaxSize, tc.seriesMatchRatio, dummyCounter) if err != nil { testutil.Equals(t, tc.expectedError, err.Error()) return } testutil.Equals(t, tc.expectedEmptyPosting, emptyPosting) testutil.Equals(t, tc.expectedPostingGroups, pgs) + var c int64 + for _, pg := range pgs { + if pg.lazy { + c += pg.cardinality + } + } + testutil.Equals(t, float64(4*c), promtest.ToFloat64(dummyCounter)) }) } }
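
---

For readers following the heuristic across these patches, below is a minimal standalone sketch of the core loop of optimizePostingsFetchByDownloadedBytes. It is not part of the patch series: the group type and markLazy name are hypothetical, and both the addAll special-casing from PATCH 3/6 and the metrics from PATCH 6/6 are omitted.

package main

import (
	"fmt"
	"math"
)

// group is a simplified, hypothetical stand-in for the patch's postingGroup:
// just the fields the heuristic needs.
type group struct {
	name        string
	cardinality int64 // Number of postings in the group.
}

// markLazy sketches the core loop of optimizePostingsFetchByDownloadedBytes.
// Groups must be sorted by ascending cardinality. The smallest group is always
// fetched; each further group is fetched only while its posting bytes (4 bytes
// per posting) stay below the series bytes it is expected to filter out, with
// the expectation shrinking by seriesMatchRatio per intersected group. The
// remaining groups are marked lazy and applied as matchers after series fetch.
func markLazy(groups []group, seriesMaxSize int64, seriesMatchRatio float64) (fetch, lazy []group) {
	if len(groups) <= 1 {
		return groups, nil
	}
	seriesBytesToFetch := groups[0].cardinality * seriesMaxSize
	p := 1.0
	i := 1 // Start from index 1 as the smallest posting group is always fetched.
	for i < len(groups) {
		if groups[i].cardinality*4 > int64(p*math.Ceil((1-seriesMatchRatio)*float64(seriesBytesToFetch))) {
			break
		}
		p *= seriesMatchRatio
		i++
	}
	return groups[:i], groups[i:]
}

func main() {
	// Numbers from the unit test above: a tiny group plus a 250000-posting
	// group, with seriesMaxSize=1000 and seriesMatchRatio=0.5. The large group
	// would cost ~1MB of posting downloads to avoid at most 500B of series
	// downloads, so it becomes lazy.
	fetch, lazy := markLazy([]group{
		{name: "foo", cardinality: 1},
		{name: "bar", cardinality: 250000},
	}, 1000, 0.5)
	fmt.Println("fetch:", fetch) // fetch: [{foo 1}]
	fmt.Println("lazy:", lazy)   // lazy: [{bar 250000}]
}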