From 168f67b2c8ac72315b4d6bd9c4c49dbe203fffc4 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Thu, 22 Jun 2023 00:45:11 -0700 Subject: [PATCH] optimize postings fetching by checking postings and series size Signed-off-by: Ben Ye --- cmd/thanos/store.go | 5 + docs/components/store.md | 5 + pkg/block/block_test.go | 10 +- pkg/block/indexheader/binary_reader.go | 19 +- pkg/block/indexheader/header.go | 8 +- pkg/block/indexheader/header_test.go | 32 ++ pkg/block/indexheader/lazy_binary_reader.go | 13 + pkg/store/bucket.go | 229 ++++++---- pkg/store/bucket_test.go | 199 ++++---- pkg/store/lazy_postings.go | 260 +++++++++++ pkg/store/lazy_postings_test.go | 473 ++++++++++++++++++++ pkg/testutil/e2eutil/prometheus.go | 61 ++- test/e2e/store_gateway_test.go | 105 +++++ 13 files changed, 1245 insertions(+), 174 deletions(-) create mode 100644 pkg/store/lazy_postings.go create mode 100644 pkg/store/lazy_postings_test.go diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 3bc0082da3e..39f97918329 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -88,6 +88,7 @@ type storeConfig struct { reqLogConfig *extflag.PathOrContent lazyIndexReaderEnabled bool lazyIndexReaderIdleTimeout time.Duration + lazyExpandedPostingsEnabled bool } func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) { @@ -182,6 +183,9 @@ func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) { cmd.Flag("store.index-header-lazy-reader-idle-timeout", "If index-header lazy reader is enabled and this idle timeout setting is > 0, memory map-ed index-headers will be automatically released after 'idle timeout' inactivity."). Hidden().Default("5m").DurationVar(&sc.lazyIndexReaderIdleTimeout) + cmd.Flag("store.enable-lazy-expanded-postings", "If true, Store Gateway will estimate postings size and try to lazily expand postings if it downloads less data than expanding all postings."). + Default("false").BoolVar(&sc.lazyExpandedPostingsEnabled) + cmd.Flag("web.disable", "Disable Block Viewer UI.").Default("false").BoolVar(&sc.disableWeb) cmd.Flag("web.external-prefix", "Static prefix for all HTML links and redirect URLs in the bucket web UI interface. Actual endpoints are still served on / or the web.route-prefix. This allows thanos bucket web UI to be served behind a reverse proxy that strips a URL sub-path."). @@ -382,6 +386,7 @@ func runStore( } return conf.estimatedMaxChunkSize }), + store.WithLazyExpandedPostings(conf.lazyExpandedPostingsEnabled), } if conf.debugLogging { diff --git a/docs/components/store.md b/docs/components/store.md index ac0234f1df3..26b359f3267 100644 --- a/docs/components/store.md +++ b/docs/components/store.md @@ -176,6 +176,11 @@ Flags: If true, Store Gateway will lazy memory map index-header only once the block is required by a query. + --store.enable-lazy-expanded-postings + If true, Store Gateway will estimate postings + size and try to lazily expand postings if + it downloads less data than expanding all + postings. 
--store.grpc.downloaded-bytes-limit=0 Maximum amount of downloaded (either fetched or touched) bytes in a single diff --git a/pkg/block/block_test.go b/pkg/block/block_test.go index 12eb5eed84b..a2712705904 100644 --- a/pkg/block/block_test.go +++ b/pkg/block/block_test.go @@ -144,7 +144,7 @@ func TestUpload(t *testing.T) { testutil.Equals(t, 3, len(bkt.Objects())) testutil.Equals(t, 3727, len(bkt.Objects()[path.Join(b1.String(), ChunksDirname, "000001")])) testutil.Equals(t, 401, len(bkt.Objects()[path.Join(b1.String(), IndexFilename)])) - testutil.Equals(t, 567, len(bkt.Objects()[path.Join(b1.String(), MetaFilename)])) + testutil.Equals(t, 595, len(bkt.Objects()[path.Join(b1.String(), MetaFilename)])) // File stats are gathered. testutil.Equals(t, fmt.Sprintf(`{ @@ -184,7 +184,9 @@ func TestUpload(t *testing.T) { "rel_path": "meta.json" } ], - "index_stats": {} + "index_stats": { + "series_max_size": 16 + } } } `, b1.String(), b1.String()), string(bkt.Objects()[path.Join(b1.String(), MetaFilename)])) @@ -195,7 +197,7 @@ func TestUpload(t *testing.T) { testutil.Equals(t, 3, len(bkt.Objects())) testutil.Equals(t, 3727, len(bkt.Objects()[path.Join(b1.String(), ChunksDirname, "000001")])) testutil.Equals(t, 401, len(bkt.Objects()[path.Join(b1.String(), IndexFilename)])) - testutil.Equals(t, 567, len(bkt.Objects()[path.Join(b1.String(), MetaFilename)])) + testutil.Equals(t, 595, len(bkt.Objects()[path.Join(b1.String(), MetaFilename)])) } { // Upload with no external labels should be blocked. @@ -227,7 +229,7 @@ func TestUpload(t *testing.T) { testutil.Equals(t, 6, len(bkt.Objects())) testutil.Equals(t, 3727, len(bkt.Objects()[path.Join(b2.String(), ChunksDirname, "000001")])) testutil.Equals(t, 401, len(bkt.Objects()[path.Join(b2.String(), IndexFilename)])) - testutil.Equals(t, 546, len(bkt.Objects()[path.Join(b2.String(), MetaFilename)])) + testutil.Equals(t, 574, len(bkt.Objects()[path.Join(b2.String(), MetaFilename)])) } } diff --git a/pkg/block/indexheader/binary_reader.go b/pkg/block/indexheader/binary_reader.go index 1befe63a7f2..16ef73ac3b0 100644 --- a/pkg/block/indexheader/binary_reader.go +++ b/pkg/block/indexheader/binary_reader.go @@ -47,6 +47,8 @@ const ( postingLengthFieldSize = 4 ) +var NotFoundRange = index.Range{Start: -1, End: -1} + // The table gets initialized with sync.Once but may still cause a race // with any other use of the crc32 package anywhere. Thus we initialize it // before. @@ -747,13 +749,18 @@ func (r *BinaryReader) IndexVersion() (int, error) { return r.indexVersion, nil } +// PostingsOffsets implements Reader. +func (r *BinaryReader) PostingsOffsets(name string, values ...string) ([]index.Range, error) { + return r.postingsOffset(name, values...) +} + // TODO(bwplotka): Get advantage of multi value offset fetch. func (r *BinaryReader) PostingsOffset(name, value string) (index.Range, error) { rngs, err := r.postingsOffset(name, value) if err != nil { return index.Range{}, err } - if len(rngs) != 1 { + if len(rngs) != 1 || rngs[0] == NotFoundRange { return index.Range{}, NotFoundRangeErr } return rngs[0], nil @@ -801,6 +808,7 @@ func (r *BinaryReader) postingsOffset(name string, values ...string) ([]index.Ra valueIndex := 0 for valueIndex < len(values) && values[valueIndex] < e.offsets[0].value { // Discard values before the start. 
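+		// Keep the result aligned with the requested values by recording a
+		// NotFoundRange placeholder for each discarded value.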
+		rngs = append(rngs, NotFoundRange)
 		valueIndex++
 	}
 
@@ -811,6 +819,9 @@ func (r *BinaryReader) postingsOffset(name string, values ...string) ([]index.Ra
 		i := sort.Search(len(e.offsets), func(i int) bool { return e.offsets[i].value >= wantedValue })
 		if i == len(e.offsets) {
 			// We're past the end.
+			for len(rngs) < len(values) {
+				rngs = append(rngs, NotFoundRange)
+			}
 			break
 		}
 		if i > 0 && e.offsets[i].value != wantedValue {
@@ -858,6 +869,8 @@ func (r *BinaryReader) postingsOffset(name string, values ...string) ([]index.Ra
 			// Record on the way if wanted value is equal to the current value.
 			if string(value) == wantedValue {
 				newSameRngs = append(newSameRngs, index.Range{Start: postingOffset + postingLengthFieldSize})
+			} else {
+				rngs = append(rngs, NotFoundRange)
 			}
 			valueIndex++
 			if valueIndex == len(values) {
@@ -877,6 +890,10 @@ func (r *BinaryReader) postingsOffset(name string, values ...string) ([]index.Ra
 		}
 
 		if valueIndex != len(values) && wantedValue <= e.offsets[i+1].value {
+			// Increment i when the wanted value is the same as the next offset.
+			if wantedValue == e.offsets[i+1].value {
+				i++
+			}
 			// wantedValue is smaller or same as the next offset we know about, let's iterate further to add those.
 			continue
 		}
diff --git a/pkg/block/indexheader/header.go b/pkg/block/indexheader/header.go
index 8ecef33564d..d0b4141afd8 100644
--- a/pkg/block/indexheader/header.go
+++ b/pkg/block/indexheader/header.go
@@ -20,10 +20,16 @@ type Reader interface {
 	// IndexVersion returns version of index.
 	IndexVersion() (int, error)
 
+	// PostingsOffsets returns start and end offsets of postings for the given name and values.
+	// Input values need to be sorted.
+	// If the requested label name doesn't exist, no postings and no error are returned.
+	// If the requested label name exists but some values don't, the corresponding index range
+	// is set to -1 for both start and end.
+	PostingsOffsets(name string, value ...string) ([]index.Range, error)
+
 	// PostingsOffset returns start and end offsets of postings for given name and value.
 	// The end offset might be bigger than the actual posting ending, but not larger than the whole index file.
 	// NotFoundRangeErr is returned when no index can be found for given name and value.
-	// TODO(bwplotka): Move to PostingsOffsets(name string, value ...string) []index.Range and benchmark.
 	PostingsOffset(name string, value string) (index.Range, error)
 
 	// LookupSymbol returns string based on given reference.
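
Below is a minimal, hypothetical sketch (not part of this patch) of how a caller is expected to use the batched lookup; `r` is any indexheader.Reader, and label "a" with values "1".."9" mirrors the tests that follow:

	// sketchPostingsOffsets is a hypothetical helper illustrating the contract.
	func sketchPostingsOffsets(r indexheader.Reader) ([]index.Range, error) {
		// Input values must be sorted; "10" sorts between "1" and "9" and is missing here.
		rngs, err := r.PostingsOffsets("a", "1", "10", "9")
		if err != nil {
			return nil, err
		}
		valid := rngs[:0]
		for _, rng := range rngs {
			if rng == indexheader.NotFoundRange {
				continue // missing value: a sentinel is returned instead of shrinking the slice
			}
			valid = append(valid, rng) // rng.Start/rng.End bound the postings list for that value
		}
		return valid, nil
	}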
diff --git a/pkg/block/indexheader/header_test.go b/pkg/block/indexheader/header_test.go index d0d7eb5f7dd..540e4d6c579 100644 --- a/pkg/block/indexheader/header_test.go +++ b/pkg/block/indexheader/header_test.go @@ -141,6 +141,38 @@ func TestReaders(t *testing.T) { testutil.Ok(t, err) testutil.Equals(t, []string(nil), vals) + // single value + rngs, err := br.PostingsOffsets("a", "9") + testutil.Ok(t, err) + for _, rng := range rngs { + testutil.Assert(t, rng.End > rng.Start) + } + + rngs, err = br.PostingsOffsets("a", "2", "3", "4", "5", "6", "7", "8", "9") + testutil.Ok(t, err) + for _, rng := range rngs { + testutil.Assert(t, rng.End > rng.Start) + } + + rngs, err = br.PostingsOffsets("a", "0") + testutil.Ok(t, err) + testutil.Assert(t, len(rngs) == 1) + testutil.Equals(t, NotFoundRange, rngs[0]) + + rngs, err = br.PostingsOffsets("a", "0", "10", "99") + testutil.Ok(t, err) + testutil.Assert(t, len(rngs) == 3) + for _, rng := range rngs { + testutil.Equals(t, NotFoundRange, rng) + } + + rngs, err = br.PostingsOffsets("a", "1", "10", "9") + testutil.Ok(t, err) + testutil.Assert(t, len(rngs) == 3) + testutil.Assert(t, rngs[0].End > rngs[0].Start) + testutil.Assert(t, rngs[2].End > rngs[2].Start) + testutil.Equals(t, NotFoundRange, rngs[1]) + // Regression tests for https://github.com/thanos-io/thanos/issues/2213. // Most of not existing value was working despite bug, except in certain unlucky cases // it was causing "invalid size" errors. diff --git a/pkg/block/indexheader/lazy_binary_reader.go b/pkg/block/indexheader/lazy_binary_reader.go index c3bee382c2f..451a79b6ee5 100644 --- a/pkg/block/indexheader/lazy_binary_reader.go +++ b/pkg/block/indexheader/lazy_binary_reader.go @@ -154,6 +154,19 @@ func (r *LazyBinaryReader) IndexVersion() (int, error) { return r.reader.IndexVersion() } +// PostingsOffsets implements Reader. +func (r *LazyBinaryReader) PostingsOffsets(name string, values ...string) ([]index.Range, error) { + r.readerMx.RLock() + defer r.readerMx.RUnlock() + + if err := r.load(); err != nil { + return nil, err + } + + r.usedAt.Store(time.Now().UnixNano()) + return r.reader.PostingsOffsets(name, values...) +} + // PostingsOffset implements Reader. func (r *LazyBinaryReader) PostingsOffset(name, value string) (index.Range, error) { r.readerMx.RLock() diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index a9aef3b881a..8197de55c36 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -359,6 +359,8 @@ type BucketStore struct { enableChunkHashCalculation bool + enabledLazyExpandedPostings bool + blockEstimatedMaxSeriesFunc BlockEstimator blockEstimatedMaxChunkFunc BlockEstimator } @@ -463,6 +465,13 @@ func WithBlockEstimatedMaxChunkFunc(f BlockEstimator) BucketStoreOption { } } +// WithLazyExpandedPostings enables lazy expanded postings. +func WithLazyExpandedPostings(enabled bool) BucketStoreOption { + return func(s *BucketStore) { + s.enabledLazyExpandedPostings = enabled + } +} + // NewBucketStore creates a new bucket backed store that implements the store API against // an object store bucket. It is optimized to work against high latency backends. func NewBucketStore( @@ -888,18 +897,20 @@ type blockSeriesClient struct { skipChunks bool shardMatcher *storepb.ShardMatcher + blockMatchers []*labels.Matcher calculateChunkHash bool chunkFetchDuration prometheus.Histogram // Internal state. 
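+	// lazyPostings (added below) replaces the plain postings slice: it carries the
+	// fetched postings plus any matchers whose evaluation was deferred, while
+	// expandedPostings accumulates the series that survive those matchers.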
-	i               uint64
-	postings        []storage.SeriesRef
-	chkMetas        []chunks.Meta
-	lset            labels.Labels
-	symbolizedLset  []symbolizedLabel
-	entries         []seriesEntry
-	hasMorePostings bool
-	batchSize       int
+	i                uint64
+	lazyPostings     *lazyExpandedPostings
+	expandedPostings []storage.SeriesRef
+	chkMetas         []chunks.Meta
+	lset             labels.Labels
+	symbolizedLset   []symbolizedLabel
+	entries          []seriesEntry
+	hasMorePostings  bool
+	batchSize        int
 }
 
 func newBlockSeriesClient(
@@ -909,6 +920,7 @@ func newBlockSeriesClient(
 	req *storepb.SeriesRequest,
 	limiter ChunksLimiter,
 	bytesLimiter BytesLimiter,
+	blockMatchers []*labels.Matcher,
 	shardMatcher *storepb.ShardMatcher,
 	calculateChunkHash bool,
 	batchSize int,
@@ -940,6 +952,7 @@ func newBlockSeriesClient(
 		loadAggregates: req.Aggregates,
 
 		shardMatcher:       shardMatcher,
+		blockMatchers:      blockMatchers,
 		calculateChunkHash: calculateChunkHash,
 		hasMorePostings:    true,
 		batchSize:          batchSize,
@@ -981,23 +994,31 @@ func newSortedMatchers(matchers []*labels.Matcher) sortedMatchers {
 func (b *blockSeriesClient) ExpandPostings(
 	matchers sortedMatchers,
 	seriesLimiter SeriesLimiter,
+	lazyExpandedPostingEnabled bool,
 ) error {
-	ps, err := b.indexr.ExpandedPostings(b.ctx, matchers, b.bytesLimiter)
+	ps, err := b.indexr.ExpandedPostings(b.ctx, matchers, b.bytesLimiter, lazyExpandedPostingEnabled)
 	if err != nil {
 		return errors.Wrap(err, "expanded matching posting")
 	}
 
-	if len(ps) == 0 {
+	if ps == nil || len(ps.postings) == 0 {
+		b.lazyPostings = emptyLazyPostings
 		return nil
 	}
+	b.lazyPostings = ps
 
-	if err := seriesLimiter.Reserve(uint64(len(ps))); err != nil {
+	// If lazy expanded postings are enabled, it is possible to fetch more series,
+	// which makes it easier to hit the series limit.
+	if err := seriesLimiter.Reserve(uint64(len(ps.postings))); err != nil {
 		return httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded series limit: %s", err)
 	}
 
-	b.postings = ps
-	if b.batchSize > len(ps) {
-		b.batchSize = len(ps)
+	if b.batchSize > len(ps.postings) {
+		b.batchSize = len(ps.postings)
+	}
+	if b.lazyPostings.lazyExpanded() {
+		// Assume lazy expansion could cut actual expanded postings length to 50%.
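+		// This is only a capacity hint; the slice grows as needed if more
+		// than half of the postings end up matching.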
+		b.expandedPostings = make([]storage.SeriesRef, 0, len(b.lazyPostings.postings)/2)
 	}
 	b.entries = make([]seriesEntry, 0, b.batchSize)
 	return nil
@@ -1029,14 +1050,26 @@ func (b *blockSeriesClient) Recv() (*storepb.SeriesResponse, error) {
 func (b *blockSeriesClient) nextBatch() error {
 	start := b.i
 	end := start + SeriesBatchSize
-	if end > uint64(len(b.postings)) {
-		end = uint64(len(b.postings))
+	if end > uint64(len(b.lazyPostings.postings)) {
+		end = uint64(len(b.lazyPostings.postings))
 	}
 	b.i = end
 
-	postingsBatch := b.postings[start:end]
+	postingsBatch := b.lazyPostings.postings[start:end]
 	if len(postingsBatch) == 0 {
 		b.hasMorePostings = false
+		if b.lazyPostings.lazyExpanded() {
+			v, err := b.indexr.IndexVersion()
+			if err != nil {
+				return errors.Wrap(err, "get index version")
+			}
+			if v >= 2 {
+				for i := range b.expandedPostings {
+					b.expandedPostings[i] = b.expandedPostings[i] / 16
+				}
+			}
+			b.indexr.storeExpandedPostingsToCache(b.blockMatchers, index.NewListPostings(b.expandedPostings), len(b.expandedPostings))
+		}
 		return nil
 	}
 
@@ -1066,6 +1099,22 @@ func (b *blockSeriesClient) nextBatch() error {
 		return errors.Wrap(err, "Lookup labels symbols")
 	}
 
+		seriesMatched := true
+		for _, matcher := range b.lazyPostings.matchers {
+			val := b.lset.Get(matcher.Name)
+			if !matcher.Matches(val) {
+				// A lazily-applied matcher rejects this series; skip it.
+				seriesMatched = false
+				break
+			}
+		}
+		if !seriesMatched {
+			continue
+		}
+		if b.lazyPostings.lazyExpanded() {
+			b.expandedPostings = append(b.expandedPostings, postingsBatch[i])
+		}
+
 		completeLabelset := labelpb.ExtendSortedLabels(b.lset, b.extLset)
 		if !b.shardMatcher.MatchesLabels(completeLabelset) {
 			continue
@@ -1299,6 +1342,17 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
 		if !ok {
 			continue
 		}
+		// Sort matchers to make sure we generate the same cache key
+		// when fetching expanded postings.
+		sort.Slice(blockMatchers, func(i, j int) bool {
+			if blockMatchers[i].Type == blockMatchers[j].Type {
+				if blockMatchers[i].Name == blockMatchers[j].Name {
+					return blockMatchers[i].Value < blockMatchers[j].Value
+				}
+				return blockMatchers[i].Name < blockMatchers[j].Name
+			}
+			return blockMatchers[i].Type < blockMatchers[j].Type
+		})
 
 		sortedBlockMatchers := newSortedMatchers(blockMatchers)
 
@@ -1326,6 +1380,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
 			req,
 			chunksLimiter,
 			bytesLimiter,
+			blockMatchers,
 			shardMatcher,
 			s.enableChunkHashCalculation,
 			s.seriesBatchSize,
@@ -1344,7 +1399,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
 				"block.resolution": blk.meta.Thanos.Downsample.Resolution,
 			})
 
-			if err := blockClient.ExpandPostings(sortedBlockMatchers, seriesLimiter); err != nil {
+			if err := blockClient.ExpandPostings(sortedBlockMatchers, seriesLimiter, s.enabledLazyExpandedPostings); err != nil {
 				span.Finish()
 				return errors.Wrapf(err, "fetch series for block %s", blk.meta.ULID)
 			}
@@ -1592,6 +1647,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
 			seriesReq,
 			nil,
 			bytesLimiter,
+			reqSeriesMatchersNoExtLabels,
 			nil,
 			true,
 			SeriesBatchSize,
@@ -1603,6 +1659,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
 		if err := blockClient.ExpandPostings(
 			sortedReqSeriesMatchersNoExtLabels,
 			seriesLimiter,
+			s.enabledLazyExpandedPostings,
 		); err != nil {
 			return err
 		}
@@ -1788,6 +1845,7 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
 			seriesReq,
 			nil,
 			bytesLimiter,
+			reqSeriesMatchersNoExtLabels,
 			nil,
 			true,
 			SeriesBatchSize,
@@ -1799,6 +1857,7 @@ func (s *BucketStore) LabelValues(ctx
context.Context, req *storepb.LabelValuesR if err := blockClient.ExpandPostings( sortedReqSeriesMatchersNoExtLabels, seriesLimiter, + s.enabledLazyExpandedPostings, ); err != nil { return err } @@ -2192,6 +2251,8 @@ type bucketIndexReader struct { mtx sync.Mutex loadedSeries map[storage.SeriesRef][]byte + + indexVersion int } func newBucketIndexReader(block *bucketBlock) *bucketIndexReader { @@ -2205,6 +2266,20 @@ func newBucketIndexReader(block *bucketBlock) *bucketIndexReader { } return r } + +// IndexVersion caches the index header version. +func (r *bucketIndexReader) IndexVersion() (int, error) { + if r.indexVersion != 0 { + return r.indexVersion, nil + } + v, err := r.block.indexHeaderReader.IndexVersion() + if err != nil { + return 0, err + } + r.indexVersion = v + return v, nil +} + func (r *bucketIndexReader) reset() { r.loadedSeries = map[storage.SeriesRef][]byte{} } @@ -2218,7 +2293,7 @@ func (r *bucketIndexReader) reset() { // Reminder: A posting is a reference (represented as a uint64) to a series reference, which in turn points to the first // chunk where the series contains the matching label-value pair for a given block of data. Postings can be fetched by // single label name=value. -func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms sortedMatchers, bytesLimiter BytesLimiter) ([]storage.SeriesRef, error) { +func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms sortedMatchers, bytesLimiter BytesLimiter, lazyExpandedPostingEnabled bool) (*lazyExpandedPostings, error) { // Shortcut the case of `len(postingGroups) == 0`. It will only happen when no // matchers specified, and we don't need to fetch expanded postings from cache. if len(ms) == 0 { @@ -2230,12 +2305,11 @@ func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms sortedMatch return nil, err } if hit { - return postings, nil + return newLazyExpandedPostings(postings), nil } var ( allRequested = false hasAdds = false - keys []labels.Label ) postingGroups, err := matchersToPostingGroups(ctx, r.block.indexHeaderReader.LabelValues, ms) @@ -2246,83 +2320,50 @@ func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms sortedMatch r.storeExpandedPostingsToCache(ms, index.EmptyPostings(), 0) return nil, nil } + i := 0 for _, pg := range postingGroups { allRequested = allRequested || pg.addAll hasAdds = hasAdds || len(pg.addKeys) > 0 - // Postings returned by fetchPostings will be in the same order as keys - // so it's important that we iterate them in the same order later. - // We don't have any other way of pairing keys and fetched postings. - for _, key := range pg.addKeys { - keys = append(keys, labels.Label{Name: pg.name, Value: key}) - } - for _, key := range pg.removeKeys { - keys = append(keys, labels.Label{Name: pg.name, Value: key}) + // If a posting group doesn't have any keys, like posting group created + // from `=~".*"`, we don't have to keep the posting group as long as we + // keep track of whether we need to add all postings or not. + if len(pg.addKeys) == 0 && len(pg.removeKeys) == 0 { + continue } + postingGroups[i] = pg + i++ } + postingGroups = postingGroups[:i] + addAllPostings := allRequested && !hasAdds // We only need special All postings if there are no other adds. If there are, we can skip fetching // special All postings completely. - if allRequested && !hasAdds { + if addAllPostings { // add group with label to fetch "special All postings". 
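+		// The key returned by index.AllPostingsKey() refers to the postings list
+		// that covers every series in the block.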
name, value := index.AllPostingsKey() - allPostingsLabel := labels.Label{Name: name, Value: value} - postingGroups = append(postingGroups, newPostingGroup(true, name, []string{value}, nil)) - keys = append(keys, allPostingsLabel) } - fetchedPostings, closeFns, err := r.fetchPostings(ctx, keys, bytesLimiter) - defer func() { - for _, closeFn := range closeFns { - closeFn() - } - }() + ps, err := fetchLazyExpandedPostings(ctx, postingGroups, r, bytesLimiter, addAllPostings, lazyExpandedPostingEnabled) if err != nil { - return nil, errors.Wrap(err, "get postings") - } - - // Get "add" and "remove" postings from groups. We iterate over postingGroups and their keys - // again, and this is exactly the same order as before (when building the groups), so we can simply - // use one incrementing index to fetch postings from returned slice. - postingIndex := 0 - - var groupAdds, groupRemovals []index.Postings - for _, g := range postingGroups { - // We cannot add empty set to groupAdds, since they are intersected. - if len(g.addKeys) > 0 { - toMerge := make([]index.Postings, 0, len(g.addKeys)) - for _, l := range g.addKeys { - toMerge = append(toMerge, checkNilPosting(g.name, l, fetchedPostings[postingIndex])) - postingIndex++ - } - - groupAdds = append(groupAdds, index.Merge(toMerge...)) - } - - for _, l := range g.removeKeys { - groupRemovals = append(groupRemovals, checkNilPosting(g.name, l, fetchedPostings[postingIndex])) - postingIndex++ - } + return nil, errors.Wrap(err, "fetch and expand postings") } - - result := index.Without(index.Intersect(groupAdds...), index.Merge(groupRemovals...)) - ps, err := ExpandPostingsWithContext(ctx, result) - if err != nil { - return nil, errors.Wrap(err, "expand") + // If postings still have matchers to be applied lazily, skip caching expanded postings. + if !ps.lazyExpanded() { + r.storeExpandedPostingsToCache(ms, index.NewListPostings(ps.postings), len(ps.postings)) } - r.storeExpandedPostingsToCache(ms, index.NewListPostings(ps), len(ps)) - if len(ps) > 0 { + if len(ps.postings) > 0 { // As of version two all series entries are 16 byte padded. All references // we get have to account for that to get the correct offset. - version, err := r.block.indexHeaderReader.IndexVersion() + version, err := r.IndexVersion() if err != nil { return nil, errors.Wrap(err, "get index version") } if version >= 2 { - for i, id := range ps { - ps[i] = id * 16 + for i, id := range ps.postings { + ps.postings[i] = id * 16 } } } @@ -2345,16 +2386,19 @@ func ExpandPostingsWithContext(ctx context.Context, p index.Postings) (res []sto // If addAll is not set: Merge of postings for "addKeys" labels minus postings for removeKeys labels // This computation happens in ExpandedPostings. 
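+// For example, the matchers {foo="bar", job!="a"} produce two groups: one with
+// addAll=false and addKeys=["bar"], and one with addAll=true and removeKeys=["a"].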
type postingGroup struct { - addAll bool - name string - addKeys []string - removeKeys []string + addAll bool + name string + matchers []*labels.Matcher + addKeys []string + removeKeys []string + cardinality int64 + lazy bool } func newPostingGroup(addAll bool, name string, addKeys, removeKeys []string) *postingGroup { return &postingGroup{ - addAll: addAll, name: name, + addAll: addAll, addKeys: addKeys, removeKeys: removeKeys, } @@ -2456,12 +2500,16 @@ func checkNilPosting(name, value string, p index.Postings) index.Postings { } func matchersToPostingGroups(ctx context.Context, lvalsFn func(name string) ([]string, error), ms []*labels.Matcher) ([]*postingGroup, error) { - matchersMap := make(map[string][]*labels.Matcher) + matchersMap := make(map[string]map[string]*labels.Matcher) for _, m := range ms { - matchersMap[m.Name] = append(matchersMap[m.Name], m) + m := m + if _, ok := matchersMap[m.Name]; !ok { + matchersMap[m.Name] = make(map[string]*labels.Matcher) + } + matchersMap[m.Name][m.String()] = m } - pgs := make([]*postingGroup, 0) + pgs := make([]*postingGroup, 0, len(matchersMap)) // NOTE: Derived from tsdb.PostingsForMatchers. for _, values := range matchersMap { var ( @@ -2472,8 +2520,9 @@ func matchersToPostingGroups(ctx context.Context, lvalsFn func(name string) ([]s valuesCached bool ) lvalsFunc := lvalsFn + matchers := make([]*labels.Matcher, 0, len(vals)) // Merge PostingGroups with the same matcher into 1 to - // avoid fetching duplicate postings. + // avoid fetching duplicate postings. for _, val := range values { pg, vals, err = toPostingGroup(ctx, lvalsFunc, val) if err != nil { @@ -2505,7 +2554,19 @@ func matchersToPostingGroups(ctx context.Context, lvalsFn func(name string) ([]s if !mergedPG.addAll && len(mergedPG.addKeys) == 0 { return nil, nil } - } + matchers = append(matchers, val) + } + // Set and sort matchers to be used when picking up posting fetch strategy. 
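+		// Sorting makes the matcher order deterministic, so the same query
+		// always yields the same posting groups for the strategy decision below.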
+ mergedPG.matchers = matchers + slices.SortFunc(mergedPG.matchers, func(a, b *labels.Matcher) bool { + if a.Type == b.Type { + if a.Name == b.Name { + return a.Value < b.Value + } + return a.Name < b.Name + } + return a.Type < b.Type + }) pgs = append(pgs, mergedPG) } slices.SortFunc(pgs, func(a, b *postingGroup) bool { diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index fa29c69bc55..4c29d345992 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -1097,20 +1097,27 @@ func uploadTestBlock(t testing.TB, tmpDir string, bkt objstore.Bucket, series in }() logger := log.NewNopLogger() + ctx := context.Background() - appendTestData(t, h.Appender(context.Background()), series) + appendTestData(t, h.Appender(ctx), series) - testutil.Ok(t, os.MkdirAll(filepath.Join(tmpDir, "tmp"), os.ModePerm)) - id := createBlockFromHead(t, filepath.Join(tmpDir, "tmp"), h) + dir := filepath.Join(tmpDir, "tmp") + testutil.Ok(t, os.MkdirAll(dir, os.ModePerm)) + id := createBlockFromHead(t, dir, h) + bdir := filepath.Join(dir, id.String()) + meta, err := metadata.ReadFromDir(bdir) + testutil.Ok(t, err) + stats, err := block.GatherIndexHealthStats(logger, filepath.Join(bdir, block.IndexFilename), meta.MinTime, meta.MaxTime) + testutil.Ok(t, err) _, err = metadata.InjectThanos(log.NewNopLogger(), filepath.Join(tmpDir, "tmp", id.String()), metadata.Thanos{ Labels: labels.Labels{{Name: "ext1", Value: "1"}}.Map(), Downsample: metadata.ThanosDownsample{Resolution: 0}, Source: metadata.TestSource, + IndexStats: metadata.IndexStats{SeriesMaxSize: stats.SeriesMaxSize, ChunkMaxSize: stats.ChunkMaxSize}, }, nil) testutil.Ok(t, err) - testutil.Ok(t, block.Upload(context.Background(), logger, bkt, filepath.Join(tmpDir, "tmp", id.String()), metadata.NoneFunc)) - testutil.Ok(t, block.Upload(context.Background(), logger, bkt, filepath.Join(tmpDir, "tmp", id.String()), metadata.NoneFunc)) + testutil.Ok(t, block.Upload(ctx, logger, bkt, bdir, metadata.NoneFunc)) return id } @@ -1228,9 +1235,9 @@ func benchmarkExpandedPostings( t.ResetTimer() for i := 0; i < t.N(); i++ { - p, err := indexr.ExpandedPostings(context.Background(), newSortedMatchers(c.matchers), NewBytesLimiterFactory(0)(nil)) + p, err := indexr.ExpandedPostings(context.Background(), newSortedMatchers(c.matchers), NewBytesLimiterFactory(0)(nil), false) testutil.Ok(t, err) - testutil.Equals(t, c.expectedLen, len(p)) + testutil.Equals(t, c.expectedLen, len(p.postings)) } }) } @@ -1261,9 +1268,10 @@ func TestExpandedPostingsEmptyPostings(t *testing.T) { matcher1 := labels.MustNewMatcher(labels.MatchEqual, "j", "foo") // Match nothing. matcher2 := labels.MustNewMatcher(labels.MatchRegexp, "i", "500.*") - ps, err := indexr.ExpandedPostings(context.Background(), newSortedMatchers([]*labels.Matcher{matcher1, matcher2}), NewBytesLimiterFactory(0)(nil)) + ctx := context.Background() + ps, err := indexr.ExpandedPostings(ctx, newSortedMatchers([]*labels.Matcher{matcher1, matcher2}), NewBytesLimiterFactory(0)(nil), false) testutil.Ok(t, err) - testutil.Equals(t, len(ps), 0) + testutil.Equals(t, ps, (*lazyExpandedPostings)(nil)) // Make sure even if a matcher doesn't match any postings, we still cache empty expanded postings. 
testutil.Equals(t, 1, indexr.stats.cachedPostingsCompressions) } @@ -1271,21 +1279,28 @@ func TestExpandedPostingsEmptyPostings(t *testing.T) { func TestBucketSeries(t *testing.T) { tb := testutil.NewTB(t) storetestutil.RunSeriesInterestingCases(tb, 200e3, 200e3, func(t testutil.TB, samplesPerSeries, series int) { - benchBucketSeries(t, chunkenc.ValFloat, false, samplesPerSeries, series, 1) + benchBucketSeries(t, chunkenc.ValFloat, false, false, samplesPerSeries, series, 1) + }) +} + +func TestBucketSeriesLazyExpandedPostings(t *testing.T) { + tb := testutil.NewTB(t) + storetestutil.RunSeriesInterestingCases(tb, 200e3, 200e3, func(t testutil.TB, samplesPerSeries, series int) { + benchBucketSeries(t, chunkenc.ValFloat, false, true, samplesPerSeries, series, 1) }) } func TestBucketHistogramSeries(t *testing.T) { tb := testutil.NewTB(t) storetestutil.RunSeriesInterestingCases(tb, 200e3, 200e3, func(t testutil.TB, samplesPerSeries, series int) { - benchBucketSeries(t, chunkenc.ValHistogram, false, samplesPerSeries, series, 1) + benchBucketSeries(t, chunkenc.ValHistogram, false, false, samplesPerSeries, series, 1) }) } func TestBucketSkipChunksSeries(t *testing.T) { tb := testutil.NewTB(t) storetestutil.RunSeriesInterestingCases(tb, 200e3, 200e3, func(t testutil.TB, samplesPerSeries, series int) { - benchBucketSeries(t, chunkenc.ValFloat, true, samplesPerSeries, series, 1) + benchBucketSeries(t, chunkenc.ValFloat, true, false, samplesPerSeries, series, 1) }) } @@ -1293,7 +1308,7 @@ func BenchmarkBucketSeries(b *testing.B) { tb := testutil.NewTB(b) // 10e6 samples = ~1736 days with 15s scrape storetestutil.RunSeriesInterestingCases(tb, 10e6, 10e5, func(t testutil.TB, samplesPerSeries, series int) { - benchBucketSeries(t, chunkenc.ValFloat, false, samplesPerSeries, series, 1/100e6, 1/10e4, 1) + benchBucketSeries(t, chunkenc.ValFloat, false, false, samplesPerSeries, series, 1/100e6, 1/10e4, 1) }) } @@ -1301,11 +1316,11 @@ func BenchmarkBucketSkipChunksSeries(b *testing.B) { tb := testutil.NewTB(b) // 10e6 samples = ~1736 days with 15s scrape storetestutil.RunSeriesInterestingCases(tb, 10e6, 10e5, func(t testutil.TB, samplesPerSeries, series int) { - benchBucketSeries(t, chunkenc.ValFloat, true, samplesPerSeries, series, 1/100e6, 1/10e4, 1) + benchBucketSeries(t, chunkenc.ValFloat, true, false, samplesPerSeries, series, 1/100e6, 1/10e4, 1) }) } -func benchBucketSeries(t testutil.TB, sampleType chunkenc.ValueType, skipChunk bool, samplesPerSeries, totalSeries int, requestedRatios ...float64) { +func benchBucketSeries(t testutil.TB, sampleType chunkenc.ValueType, skipChunk, lazyExpandedPostings bool, samplesPerSeries, totalSeries int, requestedRatios ...float64) { const numOfBlocks = 4 tmpDir := t.TempDir() @@ -1321,12 +1336,6 @@ func benchBucketSeries(t testutil.TB, sampleType chunkenc.ValueType, skipChunk b ) extLset := labels.Labels{{Name: "ext1", Value: "1"}} - thanosMeta := metadata.Thanos{ - Labels: extLset.Map(), - Downsample: metadata.ThanosDownsample{Resolution: 0}, - Source: metadata.TestSource, - } - blockDir := filepath.Join(tmpDir, "tmp") samplesPerSeriesPerBlock := samplesPerSeries / numOfBlocks @@ -1354,19 +1363,33 @@ func benchBucketSeries(t testutil.TB, sampleType chunkenc.ValueType, skipChunk b }) id := createBlockFromHead(t, blockDir, head) testutil.Ok(t, head.Close()) + blockIDDir := filepath.Join(blockDir, id.String()) + meta, err := metadata.ReadFromDir(blockIDDir) + testutil.Ok(t, err) + stats, err := block.GatherIndexHealthStats(logger, filepath.Join(blockIDDir, 
block.IndexFilename), meta.MinTime, meta.MaxTime) + testutil.Ok(t, err) + thanosMeta := metadata.Thanos{ + Labels: extLset.Map(), + Downsample: metadata.ThanosDownsample{Resolution: 0}, + Source: metadata.TestSource, + IndexStats: metadata.IndexStats{ + SeriesMaxSize: stats.SeriesMaxSize, + ChunkMaxSize: stats.ChunkMaxSize, + }, + } // Histogram chunks are represented differently in memory and on disk. In order to // have a precise comparison, we need to use the on-disk representation as the expected value // instead of the in-memory one. - diskBlock, err := tsdb.OpenBlock(logger, path.Join(blockDir, id.String()), nil) + diskBlock, err := tsdb.OpenBlock(logger, blockIDDir, nil) testutil.Ok(t, err) series = append(series, storetestutil.ReadSeriesFromBlock(t, diskBlock, extLset, skipChunk)...) - meta, err := metadata.InjectThanos(logger, filepath.Join(blockDir, id.String()), thanosMeta, nil) + meta, err = metadata.InjectThanos(logger, blockIDDir, thanosMeta, nil) testutil.Ok(t, err) - testutil.Ok(t, meta.WriteToDir(logger, filepath.Join(blockDir, id.String()))) - testutil.Ok(t, block.Upload(context.Background(), logger, bkt, filepath.Join(blockDir, id.String()), metadata.NoneFunc)) + testutil.Ok(t, meta.WriteToDir(logger, blockIDDir)) + testutil.Ok(t, block.Upload(context.Background(), logger, bkt, blockIDDir, metadata.NoneFunc)) } ibkt := objstore.WithNoopInstr(bkt) @@ -1392,6 +1415,7 @@ func benchBucketSeries(t testutil.TB, sampleType chunkenc.ValueType, skipChunk b 0, WithLogger(logger), WithChunkPool(chunkPool), + WithLazyExpandedPostings(lazyExpandedPostings), ) testutil.Ok(t, err) @@ -2572,13 +2596,14 @@ func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMet req, chunksLimiter, NewBytesLimiterFactory(0)(nil), + matchers, nil, false, SeriesBatchSize, dummyHistogram, nil, ) - testutil.Ok(b, blockClient.ExpandPostings(sortedMatchers, seriesLimiter)) + testutil.Ok(b, blockClient.ExpandPostings(sortedMatchers, seriesLimiter, false)) defer blockClient.Close() // Ensure at least 1 series has been returned (as expected). 
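
As a usage note for the option exercised above: the optimization is opt-in on the store gateway via the flag added in this patch (hypothetical invocation, other required flags elided):

    thanos store \
      --objstore.config-file=bucket.yml \
      --store.enable-lazy-expanded-postings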
@@ -2633,9 +2658,10 @@ func TestMatchersToPostingGroup(t *testing.T) { }, expected: []*postingGroup{ { - name: "foo", - addAll: false, - addKeys: []string{"bar"}, + name: "foo", + addAll: false, + addKeys: []string{"bar"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}, }, }, }, @@ -2650,9 +2676,10 @@ func TestMatchersToPostingGroup(t *testing.T) { }, expected: []*postingGroup{ { - name: "foo", - addAll: false, - addKeys: []string{"bar"}, + name: "foo", + addAll: false, + addKeys: []string{"bar"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}, }, }, }, @@ -2670,9 +2697,10 @@ func TestMatchersToPostingGroup(t *testing.T) { }, expected: []*postingGroup{ { - name: "foo", - addAll: false, - addKeys: []string{"bar"}, + name: "foo", + addAll: false, + addKeys: []string{"bar"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}, }, }, }, @@ -2688,14 +2716,16 @@ func TestMatchersToPostingGroup(t *testing.T) { }, expected: []*postingGroup{ { - name: "bar", - addAll: false, - addKeys: []string{"baz"}, + name: "bar", + addAll: false, + addKeys: []string{"baz"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "bar", "baz")}, }, { - name: "foo", - addAll: false, - addKeys: []string{"bar"}, + name: "foo", + addAll: false, + addKeys: []string{"bar"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}, }, }, }, @@ -2732,9 +2762,10 @@ func TestMatchersToPostingGroup(t *testing.T) { }, expected: []*postingGroup{ { - name: "foo", - addAll: false, - addKeys: []string{"bar"}, + name: "foo", + addAll: false, + addKeys: []string{"bar"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), labels.MustNewMatcher(labels.MatchNotEqual, "foo", "baz")}, }, }, }, @@ -2759,9 +2790,10 @@ func TestMatchersToPostingGroup(t *testing.T) { }, expected: []*postingGroup{ { - name: "foo", - addAll: false, - addKeys: []string{"bar"}, + name: "foo", + addAll: false, + addKeys: []string{"bar"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), labels.MustNewMatcher(labels.MatchRegexp, "foo", "b.*")}, }, }, }, @@ -2776,9 +2808,10 @@ func TestMatchersToPostingGroup(t *testing.T) { }, expected: []*postingGroup{ { - name: "foo", - addAll: false, - addKeys: []string{"bar"}, + name: "foo", + addAll: false, + addKeys: []string{"bar"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), labels.MustNewMatcher(labels.MatchNotEqual, "foo", "")}, }, }, }, @@ -2793,9 +2826,10 @@ func TestMatchersToPostingGroup(t *testing.T) { }, expected: []*postingGroup{ { - name: "foo", - addAll: false, - addKeys: []string{"bar"}, + name: "foo", + addAll: false, + addKeys: []string{"bar"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), labels.MustNewMatcher(labels.MatchRegexp, "foo", ".+")}, }, }, }, @@ -2810,9 +2844,10 @@ func TestMatchersToPostingGroup(t *testing.T) { }, expected: []*postingGroup{ { - name: "foo", - addAll: false, - addKeys: []string{"bar"}, + name: "foo", + addAll: false, + addKeys: []string{"bar"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar|baz"), labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar|buzz")}, }, }, }, @@ -2830,6 +2865,7 @@ func TestMatchersToPostingGroup(t *testing.T) { name: "foo", addAll: true, removeKeys: []string{"bar", "baz"}, + matchers: 
[]*labels.Matcher{labels.MustNewMatcher(labels.MatchNotEqual, "foo", "bar"), labels.MustNewMatcher(labels.MatchNotEqual, "foo", "baz")}, }, }, }, @@ -2847,8 +2883,9 @@ func TestMatchersToPostingGroup(t *testing.T) { }, expected: []*postingGroup{ { - name: labels.MetricName, - addAll: true, + name: labels.MetricName, + addAll: true, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*")}, }, }, }, @@ -2866,18 +2903,21 @@ func TestMatchersToPostingGroup(t *testing.T) { }, expected: []*postingGroup{ { - name: labels.MetricName, - addAll: false, - addKeys: []string{"up"}, + name: labels.MetricName, + addAll: false, + addKeys: []string{"up"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "__name__", "up")}, }, { - name: "cluster", - addAll: false, - addKeys: []string{"us-east-1", "us-west-2"}, + name: "cluster", + addAll: false, + addKeys: []string{"us-east-1", "us-west-2"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchNotEqual, "cluster", "")}, }, { - name: "job", - addAll: true, + name: "job", + addAll: true, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "job", ".*")}, }, }, }, @@ -2898,19 +2938,22 @@ func TestMatchersToPostingGroup(t *testing.T) { }, expected: []*postingGroup{ { - name: labels.MetricName, - addAll: false, - addKeys: []string{"go_info", "up"}, + name: labels.MetricName, + addAll: false, + addKeys: []string{"go_info", "up"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchNotEqual, "__name__", "")}, }, { - name: "cluster", - addAll: false, - addKeys: []string{"us-east-1", "us-west-2"}, + name: "cluster", + addAll: false, + addKeys: []string{"us-east-1", "us-west-2"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchNotEqual, "cluster", "")}, }, { - name: "job", - addAll: false, - addKeys: []string{"prometheus", "thanos"}, + name: "job", + addAll: false, + addKeys: []string{"prometheus", "thanos"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchNotEqual, "job", "")}, }, }, }, @@ -3148,16 +3191,16 @@ func TestExpandedPostingsRace(t *testing.T) { i := i bb := bb go func(i int, bb *bucketBlock) { - refs, err := bb.indexReader().ExpandedPostings(context.Background(), m, NewBytesLimiterFactory(0)(nil)) + refs, err := bb.indexReader().ExpandedPostings(context.Background(), m, NewBytesLimiterFactory(0)(nil), false) testutil.Ok(t, err) defer wg.Done() l.Lock() defer l.Unlock() if previousRefs[i] != nil { - testutil.Equals(t, previousRefs[i], refs) + testutil.Equals(t, previousRefs[i], refs.postings) } else { - previousRefs[i] = refs + previousRefs[i] = refs.postings } }(i, bb) } diff --git a/pkg/store/lazy_postings.go b/pkg/store/lazy_postings.go new file mode 100644 index 00000000000..6c480154298 --- /dev/null +++ b/pkg/store/lazy_postings.go @@ -0,0 +1,260 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package store + +import ( + "context" + "math" + + "github.com/pkg/errors" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb/index" + "golang.org/x/exp/slices" + + "github.com/thanos-io/thanos/pkg/block/indexheader" +) + +var emptyLazyPostings = &lazyExpandedPostings{postings: nil, matchers: nil} + +// lazyExpandedPostings contains expanded postings (series IDs). If lazy posting expansion is +// enabled, it might contain matchers that can be lazily applied during series filtering time. 
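+// For example, given {job="api", pod=~"api-.*"}, the store may fetch only the
+// cheaper "job" postings and keep the pod regex in matchers, filtering each
+// fetched series against it instead of downloading the regex's postings.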
+type lazyExpandedPostings struct {
+	postings []storage.SeriesRef
+	matchers []*labels.Matcher
+}
+
+func newLazyExpandedPostings(ps []storage.SeriesRef, matchers ...*labels.Matcher) *lazyExpandedPostings {
+	return &lazyExpandedPostings{
+		postings: ps,
+		matchers: matchers,
+	}
+}
+
+func (p *lazyExpandedPostings) lazyExpanded() bool {
+	return p != nil && len(p.matchers) > 0
+}
+
+func optimizePostingsFetchByDownloadedBytes(r *bucketIndexReader, postingGroups []*postingGroup, seriesMaxSize int64, seriesMatchRatio float64) ([]*postingGroup, bool, error) {
+	if len(postingGroups) <= 1 {
+		return postingGroups, false, nil
+	}
+	// Collect the posting cardinality of each posting group.
+	for _, pg := range postingGroups {
+		// A posting group can have either add keys or remove keys, but not both at the same time.
+		vals := pg.addKeys
+		if len(pg.removeKeys) > 0 {
+			vals = pg.removeKeys
+		}
+		rngs, err := r.block.indexHeaderReader.PostingsOffsets(pg.name, vals...)
+		if err != nil {
+			return nil, false, errors.Wrapf(err, "postings offsets for %s", pg.name)
+		}
+
+		// No posting ranges found means empty postings.
+		if len(rngs) == 0 {
+			return nil, true, nil
+		}
+		for _, r := range rngs {
+			if r == indexheader.NotFoundRange {
+				continue
+			}
+			pg.cardinality += (r.End - r.Start - 4) / 4
+		}
+	}
+	slices.SortFunc(postingGroups, func(a, b *postingGroup) bool {
+		if a.cardinality == b.cardinality {
+			return a.name < b.name
+		}
+		return a.cardinality < b.cardinality
+	})
+
+	/*
+	   Algorithm for choosing which postings we fetch right now and which
+	   postings we expand lazily.
+	   Sort posting groups by cardinality, so we can iterate from the posting group with the smallest posting size.
+	   The algorithm focuses on fetching less data, counting both postings and series.
+
+	   We need to fetch at least 1 posting group in order to fetch series. So if we only fetch the first posting group,
+	   the data bytes we need to download follow formula F1: P1 * 4 + P1 * S, where P1 is the number of postings in group 1
+	   and S is the size per series; 4 is the byte size per posting.
+
+	   If we are going to fetch 2 posting groups, we can intersect the two postings lists to (hopefully) reduce the series we need to download.
+	   Assuming that for each intersection the series matching ratio is R (0 < R < 1), the data bytes we need to download follow
+	   formula F2: P1 * 4 + P2 * 4 + P1 * S * R.
+	   We get formula F3 if we are going to fetch 3 posting groups:
+	   F3: P1 * 4 + P2 * 4 + P3 * 4 + P1 * S * R^2.
+
+	   Let's compare formulas F2 and F1 first.
+	   P1 * 4 + P2 * 4 + P1 * S * R < P1 * 4 + P1 * S
+	   => P2 * 4 < P1 * S * (1 - R)
+	   The left-hand side is the additional posting group's size and the right-hand side is basically the series bytes we
+	   no longer need to fetch thanks to the additional intersection. For F2 to fetch less data than F1, the additional
+	   postings size just needs to be smaller.
+
+	   Let's compare formulas F3 and F2.
+	   P1 * 4 + P2 * 4 + P3 * 4 + P1 * S * R^2 < P1 * 4 + P2 * 4 + P1 * S * R
+	   => P3 * 4 < P1 * S * R * (1 - R)
+	   Same as the previous formula.
+
+	   Comparing formula F4 (the cost to fetch up to 4 posting groups) with F3 gives:
+	   P4 * 4 < P1 * S * R^2 * (1 - R)
+
+	   We can generalize this to the formula: Pn * 4 < P1 * S * R^(n - 2) * (1 - R)
+
+	   The idea of the algorithm:
+	   Iterating posting groups in sorted order of cardinality, we fetch the current posting group only if doing so
+	   fetches less total data than stopping at the previous one. If so, we continue to the next posting group,
+	   otherwise we stop.
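+
+	   A worked example with assumed numbers (not taken from a real block): let S = 1000 bytes,
+	   R = 0.5, and sorted cardinalities P1 = 100, P2 = 10000, P3 = 200000.
+	   Fetching group 2 costs P2 * 4 = 40KB and can avoid up to P1 * S * (1 - R) = 50KB
+	   of series downloads, so we fetch it. Fetching group 3 would cost P3 * 4 = 800KB
+	   while avoiding at most P1 * S * R * (1 - R) = 25KB, so group 3 is marked lazy.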
+
+	   This ensures that when we stop at one posting group, the posting groups after it would always need to fetch more data.
+	   Based on the formula Pn * 4 < P1 * S * R^(n - 2) * (1 - R), the left-hand side keeps increasing as we iterate to larger
+	   posting groups, while the right-hand side keeps decreasing since R < 1.
+	*/
+	seriesBytesToFetch := postingGroups[0].cardinality * seriesMaxSize
+	p := float64(1)
+	i := 1 // Start from index 1 as we always need to fetch the smallest posting group.
+	for i < len(postingGroups) {
+		pg := postingGroups[i]
+		// Fetching these postings would cost more than the series bytes we avoid; stop here and lazily expand the rest of the matchers.
+		if pg.cardinality*4 > int64(p*math.Ceil((1-seriesMatchRatio)*float64(seriesBytesToFetch))) {
+			break
+		}
+		p = p * seriesMatchRatio
+		i++
+	}
+	for i < len(postingGroups) {
+		postingGroups[i].lazy = true
+		i++
+	}
+	return postingGroups, false, nil
+}
+
+func fetchLazyExpandedPostings(
+	ctx context.Context,
+	postingGroups []*postingGroup,
+	r *bucketIndexReader,
+	bytesLimiter BytesLimiter,
+	addAllPostings bool,
+	lazyExpandedPostingEnabled bool,
+) (*lazyExpandedPostings, error) {
+	var (
+		err               error
+		emptyPostingGroup bool
+	)
+	/*
+	   There are several cases where we skip the postings fetch optimization:
+	   - Lazy expanded postings are disabled.
+	   - Add all postings. This means we don't have a posting group with any add keys.
+	   - `SeriesMaxSize` is not set for this block, so we have no way to estimate series size.
+	   - Only one effective posting group is available. We have to download postings from at least 1 posting group, so there is nothing to optimize.
+	*/
+	if lazyExpandedPostingEnabled && !addAllPostings &&
+		r.block.meta.Thanos.IndexStats.SeriesMaxSize > 0 && len(postingGroups) > 1 {
+		postingGroups, emptyPostingGroup, err = optimizePostingsFetchByDownloadedBytes(
+			r,
+			postingGroups,
+			r.block.meta.Thanos.IndexStats.SeriesMaxSize,
+			0.5, // TODO(yeya24): Expose this as a flag.
+		)
+		if err != nil {
+			return nil, err
+		}
+		if emptyPostingGroup {
+			return emptyLazyPostings, nil
+		}
+	}
+
+	ps, matchers, err := fetchAndExpandPostingGroups(ctx, r, postingGroups, bytesLimiter)
+	if err != nil {
+		return nil, err
+	}
+	return &lazyExpandedPostings{postings: ps, matchers: matchers}, nil
+}
+
+// keysToFetchFromPostingGroups returns label pairs (postings) to fetch
+// and the matchers we need to use for lazy posting expansion.
+// The input postingGroups need to be ordered by cardinality when lazy
+// expansion is enabled. When we find the first lazy posting group we can exit.
+func keysToFetchFromPostingGroups(postingGroups []*postingGroup) ([]labels.Label, []*labels.Matcher) {
+	var lazyMatchers []*labels.Matcher
+	keys := make([]labels.Label, 0)
+	i := 0
+	for i < len(postingGroups) {
+		pg := postingGroups[i]
+		if pg.lazy {
+			break
+		}
+
+		// Postings returned by fetchPostings will be in the same order as keys,
+		// so it's important that we iterate them in the same order later.
+		// We don't have any other way of pairing keys and fetched postings.
+		for _, key := range pg.addKeys {
+			keys = append(keys, labels.Label{Name: pg.name, Value: key})
+		}
+		for _, key := range pg.removeKeys {
+			keys = append(keys, labels.Label{Name: pg.name, Value: key})
+		}
+		i++
+	}
+	if i < len(postingGroups) {
+		lazyMatchers = make([]*labels.Matcher, 0)
+		for i < len(postingGroups) {
+			lazyMatchers = append(lazyMatchers, postingGroups[i].matchers...)
+ i++ + } + } + return keys, lazyMatchers +} + +func fetchAndExpandPostingGroups(ctx context.Context, r *bucketIndexReader, postingGroups []*postingGroup, bytesLimiter BytesLimiter) ([]storage.SeriesRef, []*labels.Matcher, error) { + keys, lazyMatchers := keysToFetchFromPostingGroups(postingGroups) + fetchedPostings, closeFns, err := r.fetchPostings(ctx, keys, bytesLimiter) + defer func() { + for _, closeFn := range closeFns { + closeFn() + } + }() + if err != nil { + return nil, nil, errors.Wrap(err, "get postings") + } + + // Get "add" and "remove" postings from groups. We iterate over postingGroups and their keys + // again, and this is exactly the same order as before (when building the groups), so we can simply + // use one incrementing index to fetch postings from returned slice. + postingIndex := 0 + + var groupAdds, groupRemovals []index.Postings + for _, g := range postingGroups { + if g.lazy { + break + } + // We cannot add empty set to groupAdds, since they are intersected. + if len(g.addKeys) > 0 { + toMerge := make([]index.Postings, 0, len(g.addKeys)) + for _, l := range g.addKeys { + toMerge = append(toMerge, checkNilPosting(g.name, l, fetchedPostings[postingIndex])) + postingIndex++ + } + + groupAdds = append(groupAdds, index.Merge(toMerge...)) + } + + for _, l := range g.removeKeys { + groupRemovals = append(groupRemovals, checkNilPosting(g.name, l, fetchedPostings[postingIndex])) + postingIndex++ + } + } + + result := index.Without(index.Intersect(groupAdds...), index.Merge(groupRemovals...)) + + if ctx.Err() != nil { + return nil, nil, ctx.Err() + } + ps, err := ExpandPostingsWithContext(ctx, result) + if err != nil { + return nil, nil, errors.Wrap(err, "expand") + } + return ps, lazyMatchers, nil +} diff --git a/pkg/store/lazy_postings_test.go b/pkg/store/lazy_postings_test.go new file mode 100644 index 00000000000..ec1ac600d3c --- /dev/null +++ b/pkg/store/lazy_postings_test.go @@ -0,0 +1,473 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. 
+ +package store + +import ( + "context" + "path" + "testing" + + "github.com/efficientgo/core/testutil" + "github.com/go-kit/log" + "github.com/oklog/ulid" + "github.com/pkg/errors" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/tsdb" + "github.com/prometheus/prometheus/tsdb/index" + + "github.com/thanos-io/objstore/providers/filesystem" + "github.com/thanos-io/thanos/pkg/block/indexheader" + "github.com/thanos-io/thanos/pkg/block/metadata" +) + +func TestKeysToFetchFromPostingGroups(t *testing.T) { + for _, tc := range []struct { + name string + pgs []*postingGroup + expectedLabels []labels.Label + expectedMatchers []*labels.Matcher + }{ + { + name: "empty group", + pgs: []*postingGroup{ + { + addKeys: []string{}, + removeKeys: []string{}, + }, + }, + expectedLabels: []labels.Label{}, + }, + { + name: "empty groups", + pgs: []*postingGroup{ + { + addKeys: []string{}, + removeKeys: []string{}, + }, + { + addKeys: []string{}, + removeKeys: []string{}, + }, + { + addKeys: []string{}, + removeKeys: []string{}, + }, + }, + expectedLabels: []labels.Label{}, + }, + { + name: "group with add keys", + pgs: []*postingGroup{ + { + name: "test", + addKeys: []string{"foo", "bar"}, + removeKeys: []string{}, + }, + }, + expectedLabels: []labels.Label{{Name: "test", Value: "foo"}, {Name: "test", Value: "bar"}}, + }, + { + name: "group with remove keys", + pgs: []*postingGroup{ + { + name: "test", + addKeys: []string{}, + removeKeys: []string{"foo", "bar"}, + }, + }, + expectedLabels: []labels.Label{{Name: "test", Value: "foo"}, {Name: "test", Value: "bar"}}, + }, + { + name: "group with both add and remove keys", + pgs: []*postingGroup{ + { + name: "test", + addKeys: []string{"foo", "bar"}, + removeKeys: []string{"a", "b"}, + }, + }, + expectedLabels: []labels.Label{ + {Name: "test", Value: "foo"}, {Name: "test", Value: "bar"}, + {Name: "test", Value: "a"}, {Name: "test", Value: "b"}, + }, + }, + { + name: "groups with both add keys", + pgs: []*postingGroup{ + { + name: "test", + addKeys: []string{"foo", "bar"}, + }, + { + name: "foo", + addKeys: []string{"bar"}, + }, + }, + expectedLabels: []labels.Label{ + {Name: "test", Value: "foo"}, {Name: "test", Value: "bar"}, + {Name: "foo", Value: "bar"}, + }, + }, + { + name: "groups with add and remove keys", + pgs: []*postingGroup{ + { + name: "test", + addKeys: []string{"foo", "bar"}, + }, + { + name: "foo", + removeKeys: []string{"bar"}, + }, + }, + expectedLabels: []labels.Label{ + {Name: "test", Value: "foo"}, {Name: "test", Value: "bar"}, + {Name: "foo", Value: "bar"}, + }, + }, + { + name: "lazy posting group with empty matchers", + pgs: []*postingGroup{ + { + name: "test", + addKeys: []string{"foo", "bar"}, + matchers: []*labels.Matcher{}, + lazy: true, + }, + }, + expectedLabels: []labels.Label{}, + expectedMatchers: []*labels.Matcher{}, + }, + { + name: "lazy posting group", + pgs: []*postingGroup{ + { + name: "test", + addKeys: []string{"foo", "bar"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}, + lazy: true, + }, + }, + expectedLabels: []labels.Label{}, + expectedMatchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}, + }, + { + name: "multiple lazy posting groups", + pgs: []*postingGroup{ + { + name: "test", + addKeys: []string{"foo", "bar"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}, + lazy: true, + }, + { + name: "job", + addKeys: []string{"prometheus"}, + matchers: 
[]*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "job", "prometheus.*")}, + lazy: true, + }, + }, + expectedLabels: []labels.Label{}, + expectedMatchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), + labels.MustNewMatcher(labels.MatchRegexp, "job", "prometheus.*"), + }, + }, + { + name: "multiple non lazy and lazy posting groups", + pgs: []*postingGroup{ + { + name: "test", + addKeys: []string{"foo", "bar"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}, + }, + { + name: "test", + addKeys: []string{"foo", "bar"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}, + lazy: true, + }, + { + name: "job", + addKeys: []string{"prometheus"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "job", "prometheus.*")}, + lazy: true, + }, + }, + expectedLabels: []labels.Label{{Name: "test", Value: "foo"}, {Name: "test", Value: "bar"}}, + expectedMatchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), + labels.MustNewMatcher(labels.MatchRegexp, "job", "prometheus.*"), + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + keys, matchers := keysToFetchFromPostingGroups(tc.pgs) + testutil.Equals(t, tc.expectedLabels, keys) + testutil.Equals(t, tc.expectedMatchers, matchers) + }) + } +} + +type mockIndexHeaderReader struct { + postings map[string]map[string]index.Range + err error +} + +func (h *mockIndexHeaderReader) Close() error { return nil } + +func (h *mockIndexHeaderReader) IndexVersion() (int, error) { return 0, nil } + +func (h *mockIndexHeaderReader) PostingsOffsets(name string, value ...string) ([]index.Range, error) { + ranges := make([]index.Range, 0) + if _, ok := h.postings[name]; !ok { + return nil, nil + } + for _, val := range value { + if rng, ok := h.postings[name][val]; ok { + ranges = append(ranges, rng) + } else { + ranges = append(ranges, indexheader.NotFoundRange) + } + } + return ranges, h.err +} + +func (h *mockIndexHeaderReader) PostingsOffset(name string, value string) (index.Range, error) { + return index.Range{}, nil +} + +func (h *mockIndexHeaderReader) LookupSymbol(o uint32) (string, error) { return "", nil } + +func (h *mockIndexHeaderReader) LabelValues(name string) ([]string, error) { return nil, nil } + +func (h *mockIndexHeaderReader) LabelNames() ([]string, error) { return nil, nil } + +func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) { + ctx := context.Background() + logger := log.NewNopLogger() + dir := t.TempDir() + bkt, err := filesystem.NewBucket(dir) + testutil.Ok(t, err) + defer func() { testutil.Ok(t, bkt.Close()) }() + + inputError := errors.New("random") + blockID := ulid.MustNew(1, nil) + meta := &metadata.Meta{ + BlockMeta: tsdb.BlockMeta{ULID: blockID}, + Thanos: metadata.Thanos{ + Labels: map[string]string{ + "a": "b", + "c": "d", + }, + }, + } + for _, tc := range []struct { + name string + inputPostings map[string]map[string]index.Range + inputError error + postingGroups []*postingGroup + seriesMaxSize int64 + seriesMatchRatio float64 + expectedPostingGroups []*postingGroup + expectedEmptyPosting bool + expectedError string + }{ + { + name: "empty posting group", + }, + { + name: "one posting group", + postingGroups: []*postingGroup{ + {name: "foo"}, + }, + expectedPostingGroups: []*postingGroup{ + {name: "foo"}, + }, + }, + { + name: "posting offsets return error", + inputPostings: map[string]map[string]index.Range{ + "foo": {"bar": index.Range{End: 8}}, + "bar": 
{"foo": index.Range{Start: 8, End: 16}}, + }, + inputError: inputError, + seriesMaxSize: 1000, + seriesMatchRatio: 0.5, + postingGroups: []*postingGroup{ + {name: "foo", addKeys: []string{"bar"}}, + {name: "bar", addKeys: []string{"foo"}}, + }, + expectedPostingGroups: nil, + expectedError: "postings offsets for foo: random", + }, + { + name: "posting offsets empty", + inputPostings: map[string]map[string]index.Range{ + "foo": {"bar": index.Range{End: 8}}, + }, + seriesMaxSize: 1000, + seriesMatchRatio: 0.5, + postingGroups: []*postingGroup{ + {name: "foo", addKeys: []string{"bar"}}, + {name: "bar", addKeys: []string{"foo"}}, + }, + expectedPostingGroups: nil, + expectedEmptyPosting: true, + }, + { + name: "posting group label doesn't exist", + inputPostings: map[string]map[string]index.Range{ + "foo": {"bar": index.Range{End: 8}}, + }, + seriesMaxSize: 1000, + seriesMatchRatio: 0.5, + postingGroups: []*postingGroup{ + {name: "foo", addKeys: []string{"bar"}}, + {name: "bar", addKeys: []string{"foo"}}, + }, + expectedPostingGroups: nil, + expectedEmptyPosting: true, + }, + { + name: "posting group keys partial exist", + inputPostings: map[string]map[string]index.Range{ + "foo": {"bar": index.Range{End: 8}}, + "bar": {"foo": index.Range{Start: 8, End: 16}}, + }, + seriesMaxSize: 1000, + seriesMatchRatio: 0.5, + postingGroups: []*postingGroup{ + {name: "foo", addKeys: []string{"bar"}}, + {name: "bar", addKeys: []string{"foo", "buz"}}, + }, + expectedPostingGroups: []*postingGroup{ + {name: "bar", addKeys: []string{"foo", "buz"}, cardinality: 1}, + {name: "foo", addKeys: []string{"bar"}, cardinality: 1}, + }, + }, + { + name: "two posting groups with add keys, small postings and large series size", + inputPostings: map[string]map[string]index.Range{ + "foo": {"bar": index.Range{End: 8}}, + "bar": {"foo": index.Range{Start: 8, End: 16}}, + }, + seriesMaxSize: 1000, + seriesMatchRatio: 0.5, + postingGroups: []*postingGroup{ + {name: "foo", addKeys: []string{"bar"}}, + {name: "bar", addKeys: []string{"foo"}}, + }, + expectedPostingGroups: []*postingGroup{ + {name: "bar", addKeys: []string{"foo"}, cardinality: 1}, + {name: "foo", addKeys: []string{"bar"}, cardinality: 1}, + }, + }, + { + name: "two posting groups with remove keys, small postings and large series size", + inputPostings: map[string]map[string]index.Range{ + "foo": {"bar": index.Range{End: 8}}, + "bar": {"foo": index.Range{Start: 8, End: 16}}, + }, + seriesMaxSize: 1000, + seriesMatchRatio: 0.5, + postingGroups: []*postingGroup{ + {name: "foo", removeKeys: []string{"bar"}}, + {name: "bar", removeKeys: []string{"foo"}}, + }, + expectedPostingGroups: []*postingGroup{ + {name: "bar", removeKeys: []string{"foo"}, cardinality: 1}, + {name: "foo", removeKeys: []string{"bar"}, cardinality: 1}, + }, + }, + { + name: "two posting groups with add keys, very small series size, making one posting group lazy", + inputPostings: map[string]map[string]index.Range{ + "foo": {"bar": index.Range{End: 8}}, + "bar": {"foo": index.Range{Start: 8, End: 16}}, + }, + seriesMaxSize: 1, + seriesMatchRatio: 0.5, + postingGroups: []*postingGroup{ + {name: "foo", addKeys: []string{"bar"}}, + {name: "bar", addKeys: []string{"foo"}}, + }, + expectedPostingGroups: []*postingGroup{ + {name: "bar", addKeys: []string{"foo"}, cardinality: 1}, + {name: "foo", addKeys: []string{"bar"}, cardinality: 1, lazy: true}, + }, + }, + { + name: "two posting groups with add keys, one small posting group and a very large posting group, large one become lazy", + inputPostings: 
+ "foo": {"bar": index.Range{End: 8}},
+ "bar": {"foo": index.Range{Start: 8, End: 1000012}},
+ },
+ seriesMaxSize: 1000,
+ seriesMatchRatio: 0.5,
+ postingGroups: []*postingGroup{
+ {name: "foo", addKeys: []string{"bar"}},
+ {name: "bar", addKeys: []string{"foo"}},
+ },
+ expectedPostingGroups: []*postingGroup{
+ {name: "foo", addKeys: []string{"bar"}, cardinality: 1},
+ {name: "bar", addKeys: []string{"foo"}, cardinality: 250000, lazy: true},
+ },
+ },
+ {
+ name: "three posting groups with add keys, two small posting groups and a very large posting group, the large one becomes lazy",
+ inputPostings: map[string]map[string]index.Range{
+ "foo": {"bar": index.Range{End: 8}},
+ "bar": {"foo": index.Range{Start: 8, End: 1000012}},
+ "cluster": {"us": index.Range{Start: 1000012, End: 1000020}},
+ },
+ seriesMaxSize: 1000,
+ seriesMatchRatio: 0.5,
+ postingGroups: []*postingGroup{
+ {name: "foo", addKeys: []string{"bar"}},
+ {name: "bar", addKeys: []string{"foo"}},
+ {name: "cluster", addKeys: []string{"us"}},
+ },
+ expectedPostingGroups: []*postingGroup{
+ {name: "cluster", addKeys: []string{"us"}, cardinality: 1},
+ {name: "foo", addKeys: []string{"bar"}, cardinality: 1},
+ {name: "bar", addKeys: []string{"foo"}, cardinality: 250000, lazy: true},
+ },
+ },
+ {
+ name: "three posting groups with either add or remove keys, two small posting groups and a very large posting group, the large one becomes lazy",
+ inputPostings: map[string]map[string]index.Range{
+ "foo": {"bar": index.Range{End: 8}},
+ "bar": {"foo": index.Range{Start: 8, End: 1000012}},
+ "cluster": {"us": index.Range{Start: 1000012, End: 1000020}},
+ },
+ seriesMaxSize: 1000,
+ seriesMatchRatio: 0.5,
+ postingGroups: []*postingGroup{
+ {name: "foo", removeKeys: []string{"bar"}},
+ {name: "bar", removeKeys: []string{"foo"}},
+ {name: "cluster", addKeys: []string{"us"}},
+ },
+ expectedPostingGroups: []*postingGroup{
+ {name: "cluster", addKeys: []string{"us"}, cardinality: 1},
+ {name: "foo", removeKeys: []string{"bar"}, cardinality: 1},
+ {name: "bar", removeKeys: []string{"foo"}, cardinality: 250000, lazy: true},
+ },
+ },
+ } {
+ t.Run(tc.name, func(t *testing.T) {
+ headerReader := &mockIndexHeaderReader{postings: tc.inputPostings, err: tc.inputError}
+ block, err := newBucketBlock(ctx, logger, newBucketStoreMetrics(nil), meta, bkt, path.Join(dir, blockID.String()), nil, nil, headerReader, nil, nil, nil)
+ testutil.Ok(t, err)
+ ir := newBucketIndexReader(block)
+ pgs, emptyPosting, err := optimizePostingsFetchByDownloadedBytes(ir, tc.postingGroups, tc.seriesMaxSize, tc.seriesMatchRatio)
+ if err != nil {
+ testutil.Equals(t, tc.expectedError, err.Error())
+ return
+ }
+ testutil.Equals(t, tc.expectedEmptyPosting, emptyPosting)
+ testutil.Equals(t, tc.expectedPostingGroups, pgs)
+ })
+ }
+}
diff --git a/pkg/testutil/e2eutil/prometheus.go b/pkg/testutil/e2eutil/prometheus.go
index f7c0ad98139..b2672e65967 100644
--- a/pkg/testutil/e2eutil/prometheus.go
+++ b/pkg/testutil/e2eutil/prometheus.go
@@ -23,6 +23,7 @@ import (
 "testing"
 "time"
+ "github.com/efficientgo/core/testutil"
 "github.com/go-kit/log"
 "github.com/oklog/ulid"
 "github.com/pkg/errors"
@@ -37,8 +38,6 @@ import (
 "go.uber.org/atomic"
 "golang.org/x/sync/errgroup"
- "github.com/efficientgo/core/testutil"
-
 "github.com/thanos-io/thanos/pkg/block/metadata"
 "github.com/thanos-io/thanos/pkg/runutil"
 )
@@ -440,15 +439,16 @@ func createBlockWithDelay(ctx context.Context, dir string, series []labels.Label
 return ulid.ULID{}, errors.Wrap(err, "create block id")
block id") } - m, err := metadata.ReadFromDir(path.Join(dir, blockID.String())) + bdir := path.Join(dir, blockID.String()) + m, err := metadata.ReadFromDir(bdir) if err != nil { return ulid.ULID{}, errors.Wrap(err, "open meta file") } + logger := log.NewNopLogger() m.ULID = id m.Compaction.Sources = []ulid.ULID{id} - - if err := m.WriteToDir(log.NewNopLogger(), path.Join(dir, blockID.String())); err != nil { + if err := m.WriteToDir(logger, path.Join(dir, blockID.String())); err != nil { return ulid.ULID{}, errors.Wrap(err, "write meta.json file") } @@ -549,6 +549,11 @@ func createBlock( } blockDir := filepath.Join(dir, id.String()) + logger := log.NewNopLogger() + seriesSize, err := gatherMaxSeriesSize(filepath.Join(blockDir, "index")) + if err != nil { + return id, errors.Wrap(err, "gather max series size") + } files := []metadata.File{} if hashFunc != metadata.NoneFunc { @@ -575,11 +580,12 @@ func createBlock( } } - if _, err = metadata.InjectThanos(log.NewNopLogger(), blockDir, metadata.Thanos{ + if _, err = metadata.InjectThanos(logger, blockDir, metadata.Thanos{ Labels: extLset.Map(), Downsample: metadata.ThanosDownsample{Resolution: resolution}, Source: metadata.TestSource, Files: files, + IndexStats: metadata.IndexStats{SeriesMaxSize: seriesSize}, }, nil); err != nil { return id, errors.Wrap(err, "finalize block") } @@ -593,6 +599,49 @@ func createBlock( return id, nil } +func gatherMaxSeriesSize(fn string) (int64, error) { + r, err := index.NewFileReader(fn) + if err != nil { + return 0, errors.Wrap(err, "open index file") + } + defer runutil.CloseWithErrCapture(&err, r, "gather index issue file reader") + + p, err := r.Postings(index.AllPostingsKey()) + if err != nil { + return 0, errors.Wrap(err, "get all postings") + } + + // As of version two all series entries are 16 byte padded. All references + // we get have to account for that to get the correct offset. + offsetMultiplier := 1 + version := r.Version() + if version >= 2 { + offsetMultiplier = 16 + } + + // Per series. + var ( + prevId storage.SeriesRef + maxSeriesSize int64 + ) + for p.Next() { + id := p.At() + if prevId != 0 { + // Approximate size. 
+ seriesSize := int64(id-prevID) * int64(offsetMultiplier)
+ if seriesSize > maxSeriesSize {
+ maxSeriesSize = seriesSize
+ }
+ }
+ prevID = id
+ }
+ if p.Err() != nil {
+ return 0, errors.Wrap(p.Err(), "walk postings")
+ }
+
+ return maxSeriesSize, nil
+}
+
 var indexFilename = "index"
 type indexWriterSeries struct {
diff --git a/test/e2e/store_gateway_test.go b/test/e2e/store_gateway_test.go
index d3c3de945e3..7d6622ec854 100644
--- a/test/e2e/store_gateway_test.go
+++ b/test/e2e/store_gateway_test.go
@@ -1047,3 +1047,108 @@ config:
 testutil.Ok(t, s1.WaitSumMetricsWithOptions(e2emon.Equals(1), []string{`thanos_store_index_cache_hits_total`}, e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "item_type", "ExpandedPostings"))))
 })
}
+
+func TestStoreGatewayLazyExpandedPostingsEnabled(t *testing.T) {
+ t.Parallel()
+
+ e, err := e2e.NewDockerEnvironment("memcached-exp")
+ testutil.Ok(t, err)
+ t.Cleanup(e2ethanos.CleanScenario(t, e))
+
+ const bucket = "store-gateway-lazy-expanded-postings-test"
+ m := e2edb.NewMinio(e, "thanos-minio", bucket, e2edb.WithMinioTLS())
+ testutil.Ok(t, e2e.StartAndWaitReady(m))
+
+ memcached := e2ethanos.NewMemcached(e, "1")
+ testutil.Ok(t, e2e.StartAndWaitReady(memcached))
+
+ indexCacheConfig := fmt.Sprintf(`type: MEMCACHED
+config:
+  addresses: [%s]
+  max_async_concurrency: 10
+  dns_provider_update_interval: 1s
+  auto_discovery: false`, memcached.InternalEndpoint("memcached"))
+
+ s1 := e2ethanos.NewStoreGW(
+ e,
+ "1",
+ client.BucketConfig{
+ Type: client.S3,
+ Config: e2ethanos.NewS3Config(bucket, m.InternalEndpoint("http"), m.InternalDir()),
+ },
+ "",
+ indexCacheConfig,
+ []string{"--store.enable-lazy-expanded-postings"},
+ )
+ testutil.Ok(t, e2e.StartAndWaitReady(s1))
+
+ q := e2ethanos.NewQuerierBuilder(e, "1", s1.InternalEndpoint("grpc")).Init()
+ testutil.Ok(t, e2e.StartAndWaitReady(q))
+
+ dir := filepath.Join(e.SharedDir(), "tmp")
+ testutil.Ok(t, os.MkdirAll(dir, os.ModePerm))
+
+ series := []labels.Labels{labels.FromStrings("a", "1", "b", "2")}
+ extLset := labels.FromStrings("ext1", "value1", "replica", "1")
+
+ ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
+ t.Cleanup(cancel)
+
+ now := time.Now()
+ id, err := e2eutil.CreateBlockWithBlockDelay(ctx, dir, series, 10, timestamp.FromTime(now), timestamp.FromTime(now.Add(2*time.Hour)), 30*time.Minute, extLset, 0, metadata.NoneFunc)
+ testutil.Ok(t, err)
+
+ l := log.NewLogfmtLogger(os.Stdout)
+ bkt, err := s3.NewBucketWithConfig(l,
+ e2ethanos.NewS3Config(bucket, m.Endpoint("http"), m.Dir()), "test-feed")
+ testutil.Ok(t, err)
+
+ testutil.Ok(t, objstore.UploadDir(ctx, l, bkt, path.Join(dir, id.String()), id.String()))
+
+ // Wait for store to sync blocks.
+ // thanos_blocks_meta_synced: 1x loadedMeta 0x labelExcludedMeta 0x TooFreshMeta.
+ testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(1), "thanos_blocks_meta_synced")) + testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(0), "thanos_blocks_meta_sync_failures_total")) + + testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(1), "thanos_bucket_store_blocks_loaded")) + testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(0), "thanos_bucket_store_block_drops_total")) + testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(0), "thanos_bucket_store_block_load_failures_total")) + + t.Run("query with cache miss", func(t *testing.T) { + queryAndAssertSeries(t, ctx, q.Endpoint("http"), func() string { return testQuery }, + time.Now, promclient.QueryOptions{ + Deduplicate: false, + }, + []model.Metric{ + { + "a": "1", + "b": "2", + "ext1": "value1", + "replica": "1", + }, + }, + ) + + testutil.Ok(t, s1.WaitSumMetricsWithOptions(e2emon.Equals(1), []string{`thanos_store_index_cache_requests_total`}, e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "item_type", "ExpandedPostings")))) + testutil.Ok(t, s1.WaitSumMetricsWithOptions(e2emon.Equals(0), []string{`thanos_store_index_cache_hits_total`}, e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "item_type", "ExpandedPostings")))) + }) + + t.Run("query with cache hit", func(t *testing.T) { + queryAndAssertSeries(t, ctx, q.Endpoint("http"), func() string { return testQuery }, + time.Now, promclient.QueryOptions{ + Deduplicate: false, + }, + []model.Metric{ + { + "a": "1", + "b": "2", + "ext1": "value1", + "replica": "1", + }, + }, + ) + + testutil.Ok(t, s1.WaitSumMetricsWithOptions(e2emon.Equals(2), []string{`thanos_store_index_cache_requests_total`}, e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "item_type", "ExpandedPostings")))) + testutil.Ok(t, s1.WaitSumMetricsWithOptions(e2emon.Equals(1), []string{`thanos_store_index_cache_hits_total`}, e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "item_type", "ExpandedPostings")))) + }) +}
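
Reviewer note: the expected cardinalities in the unit test above follow from the TSDB index postings layout, where a postings list is a 4-byte entry count followed by one 4-byte series reference per entry, so cardinality = (End - Start - 4) / 4; e.g. index.Range{Start: 8, End: 1000012} yields (1000012 - 8 - 4) / 4 = 250000. The sketch below models the download-cost comparison these tests exercise. It is illustrative only: postingRange, group, markLazy and the exact seriesMatchRatio decay are invented for the example and are not the code in pkg/store/lazy_postings.go.

package main

import (
	"fmt"
	"sort"
)

// postingRange mirrors index.Range: start/end byte offsets of one postings list.
type postingRange struct{ start, end int64 }

// cardinality estimates the number of series refs in a postings list: the list
// is a 4-byte count followed by 4-byte refs, hence (end - start - 4) / 4.
func cardinality(r postingRange) int64 { return (r.end - r.start - 4) / 4 }

type group struct {
	name        string
	rng         postingRange
	cardinality int64
	lazy        bool
}

// markLazy sorts groups by ascending cardinality and walks them, keeping a
// running estimate of how many series survive the intersection so far. Once
// downloading a group's postings would cost more bytes than the series bytes
// it could save (estimated series left * seriesMaxSize), that group and all
// larger ones are deferred: their matchers are applied lazily to the series
// fetched via the cheaper groups.
func markLazy(groups []group, seriesMaxSize int64, seriesMatchRatio float64) {
	if len(groups) == 0 {
		return
	}
	for i := range groups {
		groups[i].cardinality = cardinality(groups[i].rng)
	}
	sort.Slice(groups, func(i, j int) bool { return groups[i].cardinality < groups[j].cardinality })
	seriesLeft := float64(groups[0].cardinality)
	for i := 1; i < len(groups); i++ {
		postingsBytes := groups[i].rng.end - groups[i].rng.start
		if float64(postingsBytes) > seriesLeft*float64(seriesMaxSize) {
			for ; i < len(groups); i++ {
				groups[i].lazy = true
			}
			return
		}
		// Assume each additional intersected group keeps only a fraction of series.
		seriesLeft *= seriesMatchRatio
	}
}

func main() {
	groups := []group{
		{name: "foo", rng: postingRange{start: 0, end: 8}},
		{name: "bar", rng: postingRange{start: 8, end: 1000012}},
	}
	markLazy(groups, 1000, 0.5)
	for _, g := range groups {
		// Prints foo cardinality=1 lazy=false, then bar cardinality=250000 lazy=true,
		// matching the "large one becomes lazy" expectation in the unit test.
		fmt.Printf("%s cardinality=%d lazy=%v\n", g.name, g.cardinality, g.lazy)
	}
}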