diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 3bc0082da3e..39f97918329 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -88,6 +88,7 @@ type storeConfig struct { reqLogConfig *extflag.PathOrContent lazyIndexReaderEnabled bool lazyIndexReaderIdleTimeout time.Duration + lazyExpandedPostingsEnabled bool } func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) { @@ -182,6 +183,9 @@ func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) { cmd.Flag("store.index-header-lazy-reader-idle-timeout", "If index-header lazy reader is enabled and this idle timeout setting is > 0, memory map-ed index-headers will be automatically released after 'idle timeout' inactivity."). Hidden().Default("5m").DurationVar(&sc.lazyIndexReaderIdleTimeout) + cmd.Flag("store.enable-lazy-expanded-postings", "If true, Store Gateway will estimate postings size and try to lazily expand postings if it downloads less data than expanding all postings."). + Default("false").BoolVar(&sc.lazyExpandedPostingsEnabled) + cmd.Flag("web.disable", "Disable Block Viewer UI.").Default("false").BoolVar(&sc.disableWeb) cmd.Flag("web.external-prefix", "Static prefix for all HTML links and redirect URLs in the bucket web UI interface. Actual endpoints are still served on / or the web.route-prefix. This allows thanos bucket web UI to be served behind a reverse proxy that strips a URL sub-path."). @@ -382,6 +386,7 @@ func runStore( } return conf.estimatedMaxChunkSize }), + store.WithLazyExpandedPostings(conf.lazyExpandedPostingsEnabled), } if conf.debugLogging { diff --git a/docs/components/store.md b/docs/components/store.md index 229c036775b..7b16d67930d 100644 --- a/docs/components/store.md +++ b/docs/components/store.md @@ -176,6 +176,11 @@ Flags: If true, Store Gateway will lazy memory map index-header only once the block is required by a query. + --store.enable-lazy-expanded-postings + If true, Store Gateway will estimate postings + size and try to lazily expand postings if + it downloads less data than expanding all + postings. --store.grpc.downloaded-bytes-limit=0 Maximum amount of downloaded (either fetched or touched) bytes in a single diff --git a/pkg/block/indexheader/binary_reader.go b/pkg/block/indexheader/binary_reader.go index 1befe63a7f2..16ef73ac3b0 100644 --- a/pkg/block/indexheader/binary_reader.go +++ b/pkg/block/indexheader/binary_reader.go @@ -47,6 +47,8 @@ const ( postingLengthFieldSize = 4 ) +var NotFoundRange = index.Range{Start: -1, End: -1} + // The table gets initialized with sync.Once but may still cause a race // with any other use of the crc32 package anywhere. Thus we initialize it // before. @@ -747,13 +749,18 @@ func (r *BinaryReader) IndexVersion() (int, error) { return r.indexVersion, nil } +// PostingsOffsets implements Reader. +func (r *BinaryReader) PostingsOffsets(name string, values ...string) ([]index.Range, error) { + return r.postingsOffset(name, values...) +} + // TODO(bwplotka): Get advantage of multi value offset fetch. func (r *BinaryReader) PostingsOffset(name, value string) (index.Range, error) { rngs, err := r.postingsOffset(name, value) if err != nil { return index.Range{}, err } - if len(rngs) != 1 { + if len(rngs) != 1 || rngs[0] == NotFoundRange { return index.Range{}, NotFoundRangeErr } return rngs[0], nil @@ -801,6 +808,7 @@ func (r *BinaryReader) postingsOffset(name string, values ...string) ([]index.Ra valueIndex := 0 for valueIndex < len(values) && values[valueIndex] < e.offsets[0].value { // Discard values before the start. + rngs = append(rngs, NotFoundRange) valueIndex++ } @@ -811,6 +819,9 @@ func (r *BinaryReader) postingsOffset(name string, values ...string) ([]index.Ra i := sort.Search(len(e.offsets), func(i int) bool { return e.offsets[i].value >= wantedValue }) if i == len(e.offsets) { // We're past the end. + for len(rngs) < len(values) { + rngs = append(rngs, NotFoundRange) + } break } if i > 0 && e.offsets[i].value != wantedValue { @@ -858,6 +869,8 @@ func (r *BinaryReader) postingsOffset(name string, values ...string) ([]index.Ra // Record on the way if wanted value is equal to the current value. if string(value) == wantedValue { newSameRngs = append(newSameRngs, index.Range{Start: postingOffset + postingLengthFieldSize}) + } else { + rngs = append(rngs, NotFoundRange) } valueIndex++ if valueIndex == len(values) { @@ -877,6 +890,10 @@ func (r *BinaryReader) postingsOffset(name string, values ...string) ([]index.Ra } if valueIndex != len(values) && wantedValue <= e.offsets[i+1].value { + // Increment i when wanted value is same as next offset. + if wantedValue == e.offsets[i+1].value { + i++ + } // wantedValue is smaller or same as the next offset we know about, let's iterate further to add those. continue } diff --git a/pkg/block/indexheader/header.go b/pkg/block/indexheader/header.go index 8ecef33564d..d0b4141afd8 100644 --- a/pkg/block/indexheader/header.go +++ b/pkg/block/indexheader/header.go @@ -20,10 +20,16 @@ type Reader interface { // IndexVersion returns version of index. IndexVersion() (int, error) + // PostingsOffsets returns start and end offsets for postings for given name and values. + // Input values need to be sorted. + // If the requested label name doesn't exist, then no posting and error will be returned. + // If the requested label name exists, but some values don't exist, the corresponding index range + // will be set to -1 for both start and end. + PostingsOffsets(name string, value ...string) ([]index.Range, error) + // PostingsOffset returns start and end offsets of postings for given name and value. // The end offset might be bigger than the actual posting ending, but not larger than the whole index file. // NotFoundRangeErr is returned when no index can be found for given name and value. - // TODO(bwplotka): Move to PostingsOffsets(name string, value ...string) []index.Range and benchmark. PostingsOffset(name string, value string) (index.Range, error) // LookupSymbol returns string based on given reference. diff --git a/pkg/block/indexheader/header_test.go b/pkg/block/indexheader/header_test.go index d0d7eb5f7dd..540e4d6c579 100644 --- a/pkg/block/indexheader/header_test.go +++ b/pkg/block/indexheader/header_test.go @@ -141,6 +141,38 @@ func TestReaders(t *testing.T) { testutil.Ok(t, err) testutil.Equals(t, []string(nil), vals) + // single value + rngs, err := br.PostingsOffsets("a", "9") + testutil.Ok(t, err) + for _, rng := range rngs { + testutil.Assert(t, rng.End > rng.Start) + } + + rngs, err = br.PostingsOffsets("a", "2", "3", "4", "5", "6", "7", "8", "9") + testutil.Ok(t, err) + for _, rng := range rngs { + testutil.Assert(t, rng.End > rng.Start) + } + + rngs, err = br.PostingsOffsets("a", "0") + testutil.Ok(t, err) + testutil.Assert(t, len(rngs) == 1) + testutil.Equals(t, NotFoundRange, rngs[0]) + + rngs, err = br.PostingsOffsets("a", "0", "10", "99") + testutil.Ok(t, err) + testutil.Assert(t, len(rngs) == 3) + for _, rng := range rngs { + testutil.Equals(t, NotFoundRange, rng) + } + + rngs, err = br.PostingsOffsets("a", "1", "10", "9") + testutil.Ok(t, err) + testutil.Assert(t, len(rngs) == 3) + testutil.Assert(t, rngs[0].End > rngs[0].Start) + testutil.Assert(t, rngs[2].End > rngs[2].Start) + testutil.Equals(t, NotFoundRange, rngs[1]) + // Regression tests for https://github.com/thanos-io/thanos/issues/2213. // Most of not existing value was working despite bug, except in certain unlucky cases // it was causing "invalid size" errors. diff --git a/pkg/block/indexheader/lazy_binary_reader.go b/pkg/block/indexheader/lazy_binary_reader.go index c3bee382c2f..451a79b6ee5 100644 --- a/pkg/block/indexheader/lazy_binary_reader.go +++ b/pkg/block/indexheader/lazy_binary_reader.go @@ -154,6 +154,19 @@ func (r *LazyBinaryReader) IndexVersion() (int, error) { return r.reader.IndexVersion() } +// PostingsOffsets implements Reader. +func (r *LazyBinaryReader) PostingsOffsets(name string, values ...string) ([]index.Range, error) { + r.readerMx.RLock() + defer r.readerMx.RUnlock() + + if err := r.load(); err != nil { + return nil, err + } + + r.usedAt.Store(time.Now().UnixNano()) + return r.reader.PostingsOffsets(name, values...) +} + // PostingsOffset implements Reader. func (r *LazyBinaryReader) PostingsOffset(name, value string) (index.Range, error) { r.readerMx.RLock() diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index cbec28a7c7b..170449a2f17 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -359,6 +359,8 @@ type BucketStore struct { enableChunkHashCalculation bool + enabledLazyExpandedPostings bool + blockEstimatedMaxSeriesFunc BlockEstimator blockEstimatedMaxChunkFunc BlockEstimator } @@ -463,6 +465,13 @@ func WithBlockEstimatedMaxChunkFunc(f BlockEstimator) BucketStoreOption { } } +// WithLazyExpandedPostings enables lazy expanded postings. +func WithLazyExpandedPostings(enabled bool) BucketStoreOption { + return func(s *BucketStore) { + s.enabledLazyExpandedPostings = enabled + } +} + // NewBucketStore creates a new bucket backed store that implements the store API against // an object store bucket. It is optimized to work against high latency backends. func NewBucketStore( @@ -888,18 +897,20 @@ type blockSeriesClient struct { skipChunks bool shardMatcher *storepb.ShardMatcher + blockMatchers []*labels.Matcher calculateChunkHash bool chunkFetchDuration prometheus.Histogram // Internal state. - i uint64 - postings []storage.SeriesRef - chkMetas []chunks.Meta - lset labels.Labels - symbolizedLset []symbolizedLabel - entries []seriesEntry - hasMorePostings bool - batchSize int + i uint64 + lazyPostings *lazyExpandedPostings + expandedPostings []storage.SeriesRef + chkMetas []chunks.Meta + lset labels.Labels + symbolizedLset []symbolizedLabel + entries []seriesEntry + hasMorePostings bool + batchSize int } func newBlockSeriesClient( @@ -909,6 +920,7 @@ func newBlockSeriesClient( req *storepb.SeriesRequest, limiter ChunksLimiter, bytesLimiter BytesLimiter, + blockMatchers []*labels.Matcher, shardMatcher *storepb.ShardMatcher, calculateChunkHash bool, batchSize int, @@ -940,6 +952,7 @@ func newBlockSeriesClient( loadAggregates: req.Aggregates, shardMatcher: shardMatcher, + blockMatchers: blockMatchers, calculateChunkHash: calculateChunkHash, hasMorePostings: true, batchSize: batchSize, @@ -965,23 +978,31 @@ func (b *blockSeriesClient) MergeStats(stats *queryStats) *queryStats { func (b *blockSeriesClient) ExpandPostings( matchers []*labels.Matcher, seriesLimiter SeriesLimiter, + lazyExpandedPostingEnabled bool, ) error { - ps, err := b.indexr.ExpandedPostings(b.ctx, matchers, b.bytesLimiter) + ps, err := b.indexr.ExpandedPostings(b.ctx, matchers, b.bytesLimiter, lazyExpandedPostingEnabled) if err != nil { return errors.Wrap(err, "expanded matching posting") } - if len(ps) == 0 { + if ps == nil || len(ps.postings) == 0 { + b.lazyPostings = emptyLazyPostings return nil } + b.lazyPostings = ps - if err := seriesLimiter.Reserve(uint64(len(ps))); err != nil { + // If lazy expanded posting enabled, it is possible to fetch more series + // so easier to hit the series limit. + if err := seriesLimiter.Reserve(uint64(len(ps.postings))); err != nil { return httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded series limit: %s", err) } - b.postings = ps - if b.batchSize > len(ps) { - b.batchSize = len(ps) + if b.batchSize > len(ps.postings) { + b.batchSize = len(ps.postings) + } + if b.lazyPostings.lazyExpanded() { + // Assume lazy expansion could cut actual expanded postings length to 50%. + b.expandedPostings = make([]storage.SeriesRef, 0, len(b.lazyPostings.postings)/2) } b.entries = make([]seriesEntry, 0, b.batchSize) return nil @@ -1013,14 +1034,26 @@ func (b *blockSeriesClient) Recv() (*storepb.SeriesResponse, error) { func (b *blockSeriesClient) nextBatch() error { start := b.i end := start + SeriesBatchSize - if end > uint64(len(b.postings)) { - end = uint64(len(b.postings)) + if end > uint64(len(b.lazyPostings.postings)) { + end = uint64(len(b.lazyPostings.postings)) } b.i = end - postingsBatch := b.postings[start:end] + postingsBatch := b.lazyPostings.postings[start:end] if len(postingsBatch) == 0 { b.hasMorePostings = false + if b.lazyPostings.lazyExpanded() { + v, err := b.indexr.IndexVersion() + if err != nil { + return errors.Wrap(err, "get index version") + } + if v >= 2 { + for i := range b.expandedPostings { + b.expandedPostings[i] = b.expandedPostings[i] / 16 + } + } + b.indexr.storeExpandedPostingsToCache(b.blockMatchers, index.NewListPostings(b.expandedPostings), len(b.expandedPostings)) + } return nil } @@ -1050,6 +1083,16 @@ func (b *blockSeriesClient) nextBatch() error { return errors.Wrap(err, "Lookup labels symbols") } + for _, matcher := range b.lazyPostings.matchers { + val := b.lset.Get(matcher.Name) + if !matcher.Matches(val) { + continue + } + } + if b.lazyPostings.lazyExpanded() { + b.expandedPostings = append(b.expandedPostings, postingsBatch[i]) + } + completeLabelset := labelpb.ExtendSortedLabels(b.lset, b.extLset) if !b.shardMatcher.MatchesLabels(completeLabelset) { continue @@ -1286,6 +1329,17 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie if !ok { continue } + // Sort matchers to make sure we generate the same cache key + // when fetching expanded postings. + sort.Slice(blockMatchers, func(i, j int) bool { + if blockMatchers[i].Type == blockMatchers[j].Type { + if blockMatchers[i].Name == blockMatchers[j].Name { + return blockMatchers[i].Value < blockMatchers[j].Value + } + return blockMatchers[i].Name < blockMatchers[j].Name + } + return blockMatchers[i].Type < blockMatchers[j].Type + }) blocks := bs.getFor(req.MinTime, req.MaxTime, req.MaxResolutionWindow, reqBlockMatchers) @@ -1311,6 +1365,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie req, chunksLimiter, bytesLimiter, + blockMatchers, shardMatcher, s.enableChunkHashCalculation, s.seriesBatchSize, @@ -1329,7 +1384,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie "block.resolution": blk.meta.Thanos.Downsample.Resolution, }) - if err := blockClient.ExpandPostings(blockMatchers, seriesLimiter); err != nil { + if err := blockClient.ExpandPostings(blockMatchers, seriesLimiter, s.enabledLazyExpandedPostings); err != nil { span.Finish() return errors.Wrapf(err, "fetch series for block %s", blk.meta.ULID) } @@ -1578,6 +1633,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq seriesReq, nil, bytesLimiter, + reqSeriesMatchersNoExtLabels, nil, true, SeriesBatchSize, @@ -1589,6 +1645,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq if err := blockClient.ExpandPostings( reqSeriesMatchersNoExtLabels, seriesLimiter, + s.enabledLazyExpandedPostings, ); err != nil { return err } @@ -1775,6 +1832,7 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR seriesReq, nil, bytesLimiter, + reqSeriesMatchersNoExtLabels, nil, true, SeriesBatchSize, @@ -1786,6 +1844,7 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR if err := blockClient.ExpandPostings( reqSeriesMatchersNoExtLabels, seriesLimiter, + s.enabledLazyExpandedPostings, ); err != nil { return err } @@ -2179,6 +2238,8 @@ type bucketIndexReader struct { mtx sync.Mutex loadedSeries map[storage.SeriesRef][]byte + + indexVersion int } func newBucketIndexReader(block *bucketBlock) *bucketIndexReader { @@ -2192,6 +2253,20 @@ func newBucketIndexReader(block *bucketBlock) *bucketIndexReader { } return r } + +// IndexVersion caches the index header version. +func (r *bucketIndexReader) IndexVersion() (int, error) { + if r.indexVersion != 0 { + return r.indexVersion, nil + } + v, err := r.block.indexHeaderReader.IndexVersion() + if err != nil { + return 0, err + } + r.indexVersion = v + return v, nil +} + func (r *bucketIndexReader) reset() { r.loadedSeries = map[storage.SeriesRef][]byte{} } @@ -2205,34 +2280,23 @@ func (r *bucketIndexReader) reset() { // Reminder: A posting is a reference (represented as a uint64) to a series reference, which in turn points to the first // chunk where the series contains the matching label-value pair for a given block of data. Postings can be fetched by // single label name=value. -func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms []*labels.Matcher, bytesLimiter BytesLimiter) ([]storage.SeriesRef, error) { +func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms []*labels.Matcher, bytesLimiter BytesLimiter, lazyExpandedPostingEnabled bool) (*lazyExpandedPostings, error) { // Shortcut the case of `len(postingGroups) == 0`. It will only happen when no // matchers specified, and we don't need to fetch expanded postings from cache. if len(ms) == 0 { return nil, nil } - // Sort matchers to make sure we generate the same cache key. - sort.Slice(ms, func(i, j int) bool { - if ms[i].Type == ms[j].Type { - if ms[i].Name == ms[j].Name { - return ms[i].Value < ms[j].Value - } - return ms[i].Name < ms[j].Name - } - return ms[i].Type < ms[j].Type - }) hit, postings, err := r.fetchExpandedPostingsFromCache(ctx, ms, bytesLimiter) if err != nil { return nil, err } if hit { - return postings, nil + return newLazyExpandedPostings(postings), nil } var ( allRequested = false hasAdds = false - keys []labels.Label ) postingGroups, err := matchersToPostingGroups(ctx, r.block.indexHeaderReader.LabelValues, ms) @@ -2243,83 +2307,50 @@ func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms []*labels.M r.storeExpandedPostingsToCache(ms, index.EmptyPostings(), 0) return nil, nil } + i := 0 for _, pg := range postingGroups { allRequested = allRequested || pg.addAll hasAdds = hasAdds || len(pg.addKeys) > 0 - // Postings returned by fetchPostings will be in the same order as keys - // so it's important that we iterate them in the same order later. - // We don't have any other way of pairing keys and fetched postings. - for _, key := range pg.addKeys { - keys = append(keys, labels.Label{Name: pg.name, Value: key}) - } - for _, key := range pg.removeKeys { - keys = append(keys, labels.Label{Name: pg.name, Value: key}) + // If a posting group doesn't have any keys, like posting group created + // from `=~".*"`, we don't have to keep the posting group as long as we + // keep track of whether we need to add all postings or not. + if len(pg.addKeys) == 0 && len(pg.removeKeys) == 0 { + continue } + postingGroups[i] = pg + i++ } + postingGroups = postingGroups[:i] + addAllPostings := allRequested && !hasAdds // We only need special All postings if there are no other adds. If there are, we can skip fetching // special All postings completely. - if allRequested && !hasAdds { + if addAllPostings { // add group with label to fetch "special All postings". name, value := index.AllPostingsKey() - allPostingsLabel := labels.Label{Name: name, Value: value} - postingGroups = append(postingGroups, newPostingGroup(true, name, []string{value}, nil)) - keys = append(keys, allPostingsLabel) } - fetchedPostings, closeFns, err := r.fetchPostings(ctx, keys, bytesLimiter) - defer func() { - for _, closeFn := range closeFns { - closeFn() - } - }() + ps, err := fetchLazyExpandedPostings(ctx, postingGroups, r, bytesLimiter, addAllPostings, lazyExpandedPostingEnabled) if err != nil { - return nil, errors.Wrap(err, "get postings") - } - - // Get "add" and "remove" postings from groups. We iterate over postingGroups and their keys - // again, and this is exactly the same order as before (when building the groups), so we can simply - // use one incrementing index to fetch postings from returned slice. - postingIndex := 0 - - var groupAdds, groupRemovals []index.Postings - for _, g := range postingGroups { - // We cannot add empty set to groupAdds, since they are intersected. - if len(g.addKeys) > 0 { - toMerge := make([]index.Postings, 0, len(g.addKeys)) - for _, l := range g.addKeys { - toMerge = append(toMerge, checkNilPosting(g.name, l, fetchedPostings[postingIndex])) - postingIndex++ - } - - groupAdds = append(groupAdds, index.Merge(toMerge...)) - } - - for _, l := range g.removeKeys { - groupRemovals = append(groupRemovals, checkNilPosting(g.name, l, fetchedPostings[postingIndex])) - postingIndex++ - } + return nil, errors.Wrap(err, "fetch and expand postings") } - - result := index.Without(index.Intersect(groupAdds...), index.Merge(groupRemovals...)) - ps, err := ExpandPostingsWithContext(ctx, result) - if err != nil { - return nil, errors.Wrap(err, "expand") + // If postings still have matchers to be applied lazily, skip caching expanded postings. + if !ps.lazyExpanded() { + r.storeExpandedPostingsToCache(ms, index.NewListPostings(ps.postings), len(ps.postings)) } - r.storeExpandedPostingsToCache(ms, index.NewListPostings(ps), len(ps)) - if len(ps) > 0 { + if len(ps.postings) > 0 { // As of version two all series entries are 16 byte padded. All references // we get have to account for that to get the correct offset. - version, err := r.block.indexHeaderReader.IndexVersion() + version, err := r.IndexVersion() if err != nil { return nil, errors.Wrap(err, "get index version") } if version >= 2 { - for i, id := range ps { - ps[i] = id * 16 + for i, id := range ps.postings { + ps.postings[i] = id * 16 } } } @@ -2342,16 +2373,19 @@ func ExpandPostingsWithContext(ctx context.Context, p index.Postings) (res []sto // If addAll is not set: Merge of postings for "addKeys" labels minus postings for removeKeys labels // This computation happens in ExpandedPostings. type postingGroup struct { - addAll bool - name string - addKeys []string - removeKeys []string + addAll bool + name string + matchers []*labels.Matcher + addKeys []string + removeKeys []string + cardinality int64 + lazy bool } func newPostingGroup(addAll bool, name string, addKeys, removeKeys []string) *postingGroup { return &postingGroup{ - addAll: addAll, name: name, + addAll: addAll, addKeys: addKeys, removeKeys: removeKeys, } @@ -2453,12 +2487,16 @@ func checkNilPosting(name, value string, p index.Postings) index.Postings { } func matchersToPostingGroups(ctx context.Context, lvalsFn func(name string) ([]string, error), ms []*labels.Matcher) ([]*postingGroup, error) { - matchersMap := make(map[string][]*labels.Matcher) + matchersMap := make(map[string]map[string]*labels.Matcher) for _, m := range ms { - matchersMap[m.Name] = append(matchersMap[m.Name], m) + m := m + if _, ok := matchersMap[m.Name]; !ok { + matchersMap[m.Name] = make(map[string]*labels.Matcher) + } + matchersMap[m.Name][m.String()] = m } - pgs := make([]*postingGroup, 0) + pgs := make([]*postingGroup, 0, len(matchersMap)) // NOTE: Derived from tsdb.PostingsForMatchers. for _, values := range matchersMap { var ( @@ -2469,8 +2507,9 @@ func matchersToPostingGroups(ctx context.Context, lvalsFn func(name string) ([]s valuesCached bool ) lvalsFunc := lvalsFn + matchers := make([]*labels.Matcher, 0, len(vals)) // Merge PostingGroups with the same matcher into 1 to - // avoid fetching duplicate postings. + // avoid fetching duplicate postings. for _, val := range values { pg, vals, err = toPostingGroup(ctx, lvalsFunc, val) if err != nil { @@ -2502,7 +2541,19 @@ func matchersToPostingGroups(ctx context.Context, lvalsFn func(name string) ([]s if !mergedPG.addAll && len(mergedPG.addKeys) == 0 { return nil, nil } - } + matchers = append(matchers, val) + } + // Set and sort matchers to be used when picking up posting fetch strategy. + mergedPG.matchers = matchers + slices.SortFunc(mergedPG.matchers, func(a, b *labels.Matcher) bool { + if a.Type == b.Type { + if a.Name == b.Name { + return a.Value < b.Value + } + return a.Name < b.Name + } + return a.Type < b.Type + }) pgs = append(pgs, mergedPG) } slices.SortFunc(pgs, func(a, b *postingGroup) bool { diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index e4c1039e8ea..ee66008b266 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -1228,9 +1228,9 @@ func benchmarkExpandedPostings( t.ResetTimer() for i := 0; i < t.N(); i++ { - p, err := indexr.ExpandedPostings(context.Background(), c.matchers, NewBytesLimiterFactory(0)(nil)) + p, err := indexr.ExpandedPostings(context.Background(), c.matchers, NewBytesLimiterFactory(0)(nil), false) testutil.Ok(t, err) - testutil.Equals(t, c.expectedLen, len(p)) + testutil.Equals(t, c.expectedLen, len(p.postings)) } }) } @@ -1261,9 +1261,10 @@ func TestExpandedPostingsEmptyPostings(t *testing.T) { matcher1 := labels.MustNewMatcher(labels.MatchEqual, "j", "foo") // Match nothing. matcher2 := labels.MustNewMatcher(labels.MatchRegexp, "i", "500.*") - ps, err := indexr.ExpandedPostings(context.Background(), []*labels.Matcher{matcher1, matcher2}, NewBytesLimiterFactory(0)(nil)) + ctx := context.Background() + ps, err := indexr.ExpandedPostings(ctx, []*labels.Matcher{matcher1, matcher2}, NewBytesLimiterFactory(0)(nil), false) testutil.Ok(t, err) - testutil.Equals(t, len(ps), 0) + testutil.Equals(t, ps, (*lazyExpandedPostings)(nil)) // Make sure even if a matcher doesn't match any postings, we still cache empty expanded postings. testutil.Equals(t, 1, indexr.stats.cachedPostingsCompressions) } @@ -2571,13 +2572,14 @@ func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMet req, chunksLimiter, NewBytesLimiterFactory(0)(nil), + matchers, nil, false, SeriesBatchSize, dummyHistogram, nil, ) - testutil.Ok(b, blockClient.ExpandPostings(matchers, seriesLimiter)) + testutil.Ok(b, blockClient.ExpandPostings(matchers, seriesLimiter, false)) defer blockClient.Close() // Ensure at least 1 series has been returned (as expected). @@ -2632,9 +2634,10 @@ func TestMatchersToPostingGroup(t *testing.T) { }, expected: []*postingGroup{ { - name: "foo", - addAll: false, - addKeys: []string{"bar"}, + name: "foo", + addAll: false, + addKeys: []string{"bar"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}, }, }, }, @@ -2649,9 +2652,10 @@ func TestMatchersToPostingGroup(t *testing.T) { }, expected: []*postingGroup{ { - name: "foo", - addAll: false, - addKeys: []string{"bar"}, + name: "foo", + addAll: false, + addKeys: []string{"bar"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}, }, }, }, @@ -2669,9 +2673,10 @@ func TestMatchersToPostingGroup(t *testing.T) { }, expected: []*postingGroup{ { - name: "foo", - addAll: false, - addKeys: []string{"bar"}, + name: "foo", + addAll: false, + addKeys: []string{"bar"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}, }, }, }, @@ -2687,14 +2692,16 @@ func TestMatchersToPostingGroup(t *testing.T) { }, expected: []*postingGroup{ { - name: "bar", - addAll: false, - addKeys: []string{"baz"}, + name: "bar", + addAll: false, + addKeys: []string{"baz"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "bar", "baz")}, }, { - name: "foo", - addAll: false, - addKeys: []string{"bar"}, + name: "foo", + addAll: false, + addKeys: []string{"bar"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}, }, }, }, @@ -2731,9 +2738,10 @@ func TestMatchersToPostingGroup(t *testing.T) { }, expected: []*postingGroup{ { - name: "foo", - addAll: false, - addKeys: []string{"bar"}, + name: "foo", + addAll: false, + addKeys: []string{"bar"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), labels.MustNewMatcher(labels.MatchNotEqual, "foo", "baz")}, }, }, }, @@ -2758,9 +2766,10 @@ func TestMatchersToPostingGroup(t *testing.T) { }, expected: []*postingGroup{ { - name: "foo", - addAll: false, - addKeys: []string{"bar"}, + name: "foo", + addAll: false, + addKeys: []string{"bar"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), labels.MustNewMatcher(labels.MatchRegexp, "foo", "b.*")}, }, }, }, @@ -2775,9 +2784,10 @@ func TestMatchersToPostingGroup(t *testing.T) { }, expected: []*postingGroup{ { - name: "foo", - addAll: false, - addKeys: []string{"bar"}, + name: "foo", + addAll: false, + addKeys: []string{"bar"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), labels.MustNewMatcher(labels.MatchNotEqual, "foo", "")}, }, }, }, @@ -2792,9 +2802,10 @@ func TestMatchersToPostingGroup(t *testing.T) { }, expected: []*postingGroup{ { - name: "foo", - addAll: false, - addKeys: []string{"bar"}, + name: "foo", + addAll: false, + addKeys: []string{"bar"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), labels.MustNewMatcher(labels.MatchRegexp, "foo", ".+")}, }, }, }, @@ -2809,9 +2820,10 @@ func TestMatchersToPostingGroup(t *testing.T) { }, expected: []*postingGroup{ { - name: "foo", - addAll: false, - addKeys: []string{"bar"}, + name: "foo", + addAll: false, + addKeys: []string{"bar"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar|baz"), labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar|buzz")}, }, }, }, @@ -2829,6 +2841,7 @@ func TestMatchersToPostingGroup(t *testing.T) { name: "foo", addAll: true, removeKeys: []string{"bar", "baz"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchNotEqual, "foo", "bar"), labels.MustNewMatcher(labels.MatchNotEqual, "foo", "baz")}, }, }, }, @@ -2846,8 +2859,9 @@ func TestMatchersToPostingGroup(t *testing.T) { }, expected: []*postingGroup{ { - name: labels.MetricName, - addAll: true, + name: labels.MetricName, + addAll: true, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*")}, }, }, }, @@ -2865,18 +2879,21 @@ func TestMatchersToPostingGroup(t *testing.T) { }, expected: []*postingGroup{ { - name: labels.MetricName, - addAll: false, - addKeys: []string{"up"}, + name: labels.MetricName, + addAll: false, + addKeys: []string{"up"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "__name__", "up")}, }, { - name: "cluster", - addAll: false, - addKeys: []string{"us-east-1", "us-west-2"}, + name: "cluster", + addAll: false, + addKeys: []string{"us-east-1", "us-west-2"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchNotEqual, "cluster", "")}, }, { - name: "job", - addAll: true, + name: "job", + addAll: true, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "job", ".*")}, }, }, }, @@ -2897,19 +2914,22 @@ func TestMatchersToPostingGroup(t *testing.T) { }, expected: []*postingGroup{ { - name: labels.MetricName, - addAll: false, - addKeys: []string{"go_info", "up"}, + name: labels.MetricName, + addAll: false, + addKeys: []string{"go_info", "up"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchNotEqual, "__name__", "")}, }, { - name: "cluster", - addAll: false, - addKeys: []string{"us-east-1", "us-west-2"}, + name: "cluster", + addAll: false, + addKeys: []string{"us-east-1", "us-west-2"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchNotEqual, "cluster", "")}, }, { - name: "job", - addAll: false, - addKeys: []string{"prometheus", "thanos"}, + name: "job", + addAll: false, + addKeys: []string{"prometheus", "thanos"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchNotEqual, "job", "")}, }, }, }, diff --git a/pkg/store/lazy_postings.go b/pkg/store/lazy_postings.go new file mode 100644 index 00000000000..9fb5a175fe8 --- /dev/null +++ b/pkg/store/lazy_postings.go @@ -0,0 +1,259 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package store + +import ( + "context" + "github.com/thanos-io/thanos/pkg/block/indexheader" + "math" + + "github.com/pkg/errors" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb/index" + "golang.org/x/exp/slices" +) + +var emptyLazyPostings = &lazyExpandedPostings{postings: nil, matchers: nil} + +// lazyExpandedPostings contains expanded postings (series IDs). If lazy posting expansion is +// enabled, it might contain matchers that can be lazily applied during series filtering time. +type lazyExpandedPostings struct { + postings []storage.SeriesRef + matchers []*labels.Matcher +} + +func newLazyExpandedPostings(ps []storage.SeriesRef, matchers ...*labels.Matcher) *lazyExpandedPostings { + return &lazyExpandedPostings{ + postings: ps, + matchers: matchers, + } +} + +func (p *lazyExpandedPostings) lazyExpanded() bool { + return p != nil && len(p.matchers) > 0 +} + +func optimizePostingsFetchByDownloadedBytes(r *bucketIndexReader, postingGroups []*postingGroup, seriesMaxSize int64, seriesMatchRatio float64) ([]*postingGroup, bool, error) { + if len(postingGroups) <= 1 { + return postingGroups, false, nil + } + // Collect posting cardinality of each posting group. + for _, pg := range postingGroups { + // A posting group can have either add keys or remove keys but not both the same time. + vals := pg.addKeys + if len(pg.removeKeys) > 0 { + vals = pg.removeKeys + } + rngs, err := r.block.indexHeaderReader.PostingsOffsets(pg.name, vals...) + if err != nil { + return nil, false, errors.Wrapf(err, "postings offsets for %s", pg.name) + } + + // No posting ranges found means empty posting. + if len(rngs) == 0 { + return nil, true, nil + } + for _, r := range rngs { + if r == indexheader.NotFoundRange { + continue + } + pg.cardinality += (r.End - r.Start - 4) / 4 + } + } + slices.SortFunc(postingGroups, func(a, b *postingGroup) bool { + if a.cardinality == b.cardinality { + return a.name < b.name + } + return a.cardinality < b.cardinality + }) + + /* + Algorithm of choosing what postings we need to fetch right now and what + postings we expand lazily. + Sort posting groups by cardinality, so we can iterate from posting group with the smallest posting size. + The algorithm focuses on fetching fewer data, including postings and series. + + We need to fetch at least 1 posting group in order to fetch series. So if we only fetch the first posting group, + the data bytes we need to download is formula F1: P1 * 4 + P1 * S where P1 is the number of postings in group 1 + and S is the size per series. 4 is the byte size per posting. + + If we are going to fetch 2 posting groups, we can intersect the two postings to reduce series we need to download (hopefully). + Assuming for each intersection, the series matching ratio is R (0 < R < 1). Then the data bytes we need to download is + formula F2: P1 * 4 + P2 * 4 + P1 * S * R. + We can get formula F3 if we are going to fetch 3 posting groups: + F3: P1 * 4 + P2 * 4 + P3 * 4 + P1 * S * R^2. + + Let's compare formula F2 and F1 first. + P1 * 4 + P2 * 4 + P1 * S * R < P1 * 4 + P1 * S + => P2 * 4 < P1 * S * (1 - R) + Left hand side is the posting group size and right hand side is basically the series size we don't need to fetch + by having the additional intersection. In order to fetch less data for F2 than F1, we just need to ensure that + the additional postings size is smaller. + + Let's compare formula F3 and F2. + P1 * 4 + P2 * 4 + P3 * 4 + P1 * S * R^2 < P1 * 4 + P2 * 4 + P1 * S * R + => P3 * 4 < P1 * S * R * (1 - R) + Same as the previous formula. + + Compare formula F4 (Cost to fetch up to 4 posting groups) and F3. + P4 * 4 < P1 * S * R^2 * (1 - R) + + We can generalize this to formula: Pn * 4 < P1 * S * R^(n - 2) * (1 - R) + + The idea of the algorithm: + By iterating the posting group in sorted order of cardinality, we need to make sure that by fetching the current posting group, + the total data fetched is smaller than the previous posting group. If so, then we continue to next posting group, + otherwise we stop. + + This ensures that when we stop at one posting group, posting groups after it always need to fetch more data. + Based on formula Pn * 4 < P1 * S * R^(n - 2) * (1 - R), left hand side is always increasing while iterating to larger + posting groups while right hand side value is always decreasing as R < 1. + */ + seriesBytesToFetch := postingGroups[0].cardinality * seriesMaxSize + p := float64(1) + i := 1 // Start from index 1 as we always need to fetch the smallest posting group. + for i < len(postingGroups) { + pg := postingGroups[i] + // Need to fetch more data on postings than series we avoid fetching, stop here and lazy expanding rest of matchers. + if pg.cardinality*4 > int64(p*math.Ceil((1-seriesMatchRatio)*float64(seriesBytesToFetch))) { + break + } + p = p * seriesMatchRatio + i++ + } + for i < len(postingGroups) { + postingGroups[i].lazy = true + i++ + } + return postingGroups, false, nil +} + +func fetchLazyExpandedPostings( + ctx context.Context, + postingGroups []*postingGroup, + r *bucketIndexReader, + bytesLimiter BytesLimiter, + addAllPostings bool, + lazyExpandedPostingEnabled bool, +) (*lazyExpandedPostings, error) { + var ( + err error + emptyPostingGroup bool + ) + /* + There are several cases that we skip postings fetch optimization: + - Lazy expanded posting disabled. + - Add all postings. This means we don't have a posting group with any add keys. + - `SeriesMaxSize` not set for this block then we have no way to estimate series size. + - Only one effective posting group available. We need to at least download postings from 1 posting group so no need to optimize. + */ + if lazyExpandedPostingEnabled && !addAllPostings && + r.block.meta.Thanos.IndexStats.SeriesMaxSize > 0 && len(postingGroups) > 1 { + postingGroups, emptyPostingGroup, err = optimizePostingsFetchByDownloadedBytes( + r, + postingGroups, + r.block.meta.Thanos.IndexStats.SeriesMaxSize, + 0.5, // TODO(yeya24): Expose this as a flag. + ) + if err != nil { + return nil, err + } + if emptyPostingGroup { + return emptyLazyPostings, nil + } + } + + ps, matchers, err := fetchAndExpandPostingGroups(ctx, r, postingGroups, bytesLimiter) + if err != nil { + return nil, err + } + return &lazyExpandedPostings{postings: ps, matchers: matchers}, nil +} + +// keysToFetchFromPostingGroups returns label pairs (postings) to fetch +// and matchers we need to use for lazy posting expansion. +// Input `postingGroups` needs to be ordered by cardinality in case lazy +// expansion is enabled. When we find the first lazy posting group we can exit. +func keysToFetchFromPostingGroups(postingGroups []*postingGroup) ([]labels.Label, []*labels.Matcher) { + var lazyMatchers []*labels.Matcher + keys := make([]labels.Label, 0) + i := 0 + for i < len(postingGroups) { + pg := postingGroups[i] + if pg.lazy { + break + } + + // Postings returned by fetchPostings will be in the same order as keys + // so it's important that we iterate them in the same order later. + // We don't have any other way of pairing keys and fetched postings. + for _, key := range pg.addKeys { + keys = append(keys, labels.Label{Name: pg.name, Value: key}) + } + for _, key := range pg.removeKeys { + keys = append(keys, labels.Label{Name: pg.name, Value: key}) + } + i++ + } + if i < len(postingGroups) { + lazyMatchers = make([]*labels.Matcher, 0) + for i < len(postingGroups) { + lazyMatchers = append(lazyMatchers, postingGroups[i].matchers...) + i++ + } + } + return keys, lazyMatchers +} + +func fetchAndExpandPostingGroups(ctx context.Context, r *bucketIndexReader, postingGroups []*postingGroup, bytesLimiter BytesLimiter) ([]storage.SeriesRef, []*labels.Matcher, error) { + keys, lazyMatchers := keysToFetchFromPostingGroups(postingGroups) + fetchedPostings, closeFns, err := r.fetchPostings(ctx, keys, bytesLimiter) + defer func() { + for _, closeFn := range closeFns { + closeFn() + } + }() + if err != nil { + return nil, nil, errors.Wrap(err, "get postings") + } + + // Get "add" and "remove" postings from groups. We iterate over postingGroups and their keys + // again, and this is exactly the same order as before (when building the groups), so we can simply + // use one incrementing index to fetch postings from returned slice. + postingIndex := 0 + + var groupAdds, groupRemovals []index.Postings + for _, g := range postingGroups { + if g.lazy { + break + } + // We cannot add empty set to groupAdds, since they are intersected. + if len(g.addKeys) > 0 { + toMerge := make([]index.Postings, 0, len(g.addKeys)) + for _, l := range g.addKeys { + toMerge = append(toMerge, checkNilPosting(g.name, l, fetchedPostings[postingIndex])) + postingIndex++ + } + + groupAdds = append(groupAdds, index.Merge(toMerge...)) + } + + for _, l := range g.removeKeys { + groupRemovals = append(groupRemovals, checkNilPosting(g.name, l, fetchedPostings[postingIndex])) + postingIndex++ + } + } + + result := index.Without(index.Intersect(groupAdds...), index.Merge(groupRemovals...)) + + if ctx.Err() != nil { + return nil, nil, ctx.Err() + } + ps, err := ExpandPostingsWithContext(ctx, result) + if err != nil { + return nil, nil, errors.Wrap(err, "expand") + } + return ps, lazyMatchers, nil +} diff --git a/pkg/store/lazy_postings_test.go b/pkg/store/lazy_postings_test.go new file mode 100644 index 00000000000..bfe9c9e39a5 --- /dev/null +++ b/pkg/store/lazy_postings_test.go @@ -0,0 +1,472 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package store + +import ( + "context" + "errors" + "github.com/go-kit/log" + "github.com/oklog/ulid" + "github.com/prometheus/prometheus/tsdb" + "github.com/prometheus/prometheus/tsdb/index" + "github.com/thanos-io/objstore/providers/filesystem" + "github.com/thanos-io/thanos/pkg/block/indexheader" + "github.com/thanos-io/thanos/pkg/block/metadata" + "path" + "testing" + + "github.com/efficientgo/core/testutil" + "github.com/prometheus/prometheus/model/labels" +) + +func TestKeysToFetchFromPostingGroups(t *testing.T) { + for _, tc := range []struct { + name string + pgs []*postingGroup + expectedLabels []labels.Label + expectedMatchers []*labels.Matcher + }{ + { + name: "empty group", + pgs: []*postingGroup{ + { + addKeys: []string{}, + removeKeys: []string{}, + }, + }, + expectedLabels: []labels.Label{}, + }, + { + name: "empty groups", + pgs: []*postingGroup{ + { + addKeys: []string{}, + removeKeys: []string{}, + }, + { + addKeys: []string{}, + removeKeys: []string{}, + }, + { + addKeys: []string{}, + removeKeys: []string{}, + }, + }, + expectedLabels: []labels.Label{}, + }, + { + name: "group with add keys", + pgs: []*postingGroup{ + { + name: "test", + addKeys: []string{"foo", "bar"}, + removeKeys: []string{}, + }, + }, + expectedLabels: []labels.Label{{Name: "test", Value: "foo"}, {Name: "test", Value: "bar"}}, + }, + { + name: "group with remove keys", + pgs: []*postingGroup{ + { + name: "test", + addKeys: []string{}, + removeKeys: []string{"foo", "bar"}, + }, + }, + expectedLabels: []labels.Label{{Name: "test", Value: "foo"}, {Name: "test", Value: "bar"}}, + }, + { + name: "group with both add and remove keys", + pgs: []*postingGroup{ + { + name: "test", + addKeys: []string{"foo", "bar"}, + removeKeys: []string{"a", "b"}, + }, + }, + expectedLabels: []labels.Label{ + {Name: "test", Value: "foo"}, {Name: "test", Value: "bar"}, + {Name: "test", Value: "a"}, {Name: "test", Value: "b"}, + }, + }, + { + name: "groups with both add keys", + pgs: []*postingGroup{ + { + name: "test", + addKeys: []string{"foo", "bar"}, + }, + { + name: "foo", + addKeys: []string{"bar"}, + }, + }, + expectedLabels: []labels.Label{ + {Name: "test", Value: "foo"}, {Name: "test", Value: "bar"}, + {Name: "foo", Value: "bar"}, + }, + }, + { + name: "groups with add and remove keys", + pgs: []*postingGroup{ + { + name: "test", + addKeys: []string{"foo", "bar"}, + }, + { + name: "foo", + removeKeys: []string{"bar"}, + }, + }, + expectedLabels: []labels.Label{ + {Name: "test", Value: "foo"}, {Name: "test", Value: "bar"}, + {Name: "foo", Value: "bar"}, + }, + }, + { + name: "lazy posting group with empty matchers", + pgs: []*postingGroup{ + { + name: "test", + addKeys: []string{"foo", "bar"}, + matchers: []*labels.Matcher{}, + lazy: true, + }, + }, + expectedLabels: []labels.Label{}, + expectedMatchers: []*labels.Matcher{}, + }, + { + name: "lazy posting group", + pgs: []*postingGroup{ + { + name: "test", + addKeys: []string{"foo", "bar"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}, + lazy: true, + }, + }, + expectedLabels: []labels.Label{}, + expectedMatchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}, + }, + { + name: "multiple lazy posting groups", + pgs: []*postingGroup{ + { + name: "test", + addKeys: []string{"foo", "bar"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}, + lazy: true, + }, + { + name: "job", + addKeys: []string{"prometheus"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "job", "prometheus.*")}, + lazy: true, + }, + }, + expectedLabels: []labels.Label{}, + expectedMatchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), + labels.MustNewMatcher(labels.MatchRegexp, "job", "prometheus.*"), + }, + }, + { + name: "multiple non lazy and lazy posting groups", + pgs: []*postingGroup{ + { + name: "test", + addKeys: []string{"foo", "bar"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}, + }, + { + name: "test", + addKeys: []string{"foo", "bar"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}, + lazy: true, + }, + { + name: "job", + addKeys: []string{"prometheus"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "job", "prometheus.*")}, + lazy: true, + }, + }, + expectedLabels: []labels.Label{{Name: "test", Value: "foo"}, {Name: "test", Value: "bar"}}, + expectedMatchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), + labels.MustNewMatcher(labels.MatchRegexp, "job", "prometheus.*"), + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + keys, matchers := keysToFetchFromPostingGroups(tc.pgs) + testutil.Equals(t, tc.expectedLabels, keys) + testutil.Equals(t, tc.expectedMatchers, matchers) + }) + } +} + +type mockIndexHeaderReader struct { + postings map[string]map[string]index.Range + err error +} + +func (h *mockIndexHeaderReader) Close() error { return nil } + +func (h *mockIndexHeaderReader) IndexVersion() (int, error) { return 0, nil } + +func (h *mockIndexHeaderReader) PostingsOffsets(name string, value ...string) ([]index.Range, error) { + ranges := make([]index.Range, 0) + if _, ok := h.postings[name]; !ok { + return nil, nil + } + for _, val := range value { + if rng, ok := h.postings[name][val]; ok { + ranges = append(ranges, rng) + } else { + ranges = append(ranges, indexheader.NotFoundRange) + } + } + return ranges, h.err +} + +func (h *mockIndexHeaderReader) PostingsOffset(name string, value string) (index.Range, error) { + return index.Range{}, nil +} + +func (h *mockIndexHeaderReader) LookupSymbol(o uint32) (string, error) { return "", nil } + +func (h *mockIndexHeaderReader) LabelValues(name string) ([]string, error) { return nil, nil } + +func (h *mockIndexHeaderReader) LabelNames() ([]string, error) { return nil, nil } + +func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) { + ctx := context.Background() + logger := log.NewNopLogger() + dir := t.TempDir() + bkt, err := filesystem.NewBucket(dir) + testutil.Ok(t, err) + defer func() { testutil.Ok(t, bkt.Close()) }() + + inputError := errors.New("random") + blockID := ulid.MustNew(1, nil) + meta := &metadata.Meta{ + BlockMeta: tsdb.BlockMeta{ULID: blockID}, + Thanos: metadata.Thanos{ + Labels: map[string]string{ + "a": "b", + "c": "d", + }, + }, + } + for _, tc := range []struct { + name string + inputPostings map[string]map[string]index.Range + inputError error + postingGroups []*postingGroup + seriesMaxSize int64 + seriesMatchRatio float64 + expectedPostingGroups []*postingGroup + expectedEmptyPosting bool + expectedError string + }{ + { + name: "empty posting group", + }, + { + name: "one posting group", + postingGroups: []*postingGroup{ + {name: "foo"}, + }, + expectedPostingGroups: []*postingGroup{ + {name: "foo"}, + }, + }, + { + name: "posting offsets return error", + inputPostings: map[string]map[string]index.Range{ + "foo": {"bar": index.Range{0, 8}}, + "bar": {"foo": index.Range{8, 16}}, + }, + inputError: inputError, + seriesMaxSize: 1000, + seriesMatchRatio: 0.5, + postingGroups: []*postingGroup{ + {name: "foo", addKeys: []string{"bar"}}, + {name: "bar", addKeys: []string{"foo"}}, + }, + expectedPostingGroups: nil, + expectedError: "postings offsets for foo: random", + }, + { + name: "posting offsets empty", + inputPostings: map[string]map[string]index.Range{ + "foo": {"bar": index.Range{0, 8}}, + }, + seriesMaxSize: 1000, + seriesMatchRatio: 0.5, + postingGroups: []*postingGroup{ + {name: "foo", addKeys: []string{"bar"}}, + {name: "bar", addKeys: []string{"foo"}}, + }, + expectedPostingGroups: nil, + expectedEmptyPosting: true, + }, + { + name: "posting group label doesn't exist", + inputPostings: map[string]map[string]index.Range{ + "foo": {"bar": index.Range{0, 8}}, + }, + seriesMaxSize: 1000, + seriesMatchRatio: 0.5, + postingGroups: []*postingGroup{ + {name: "foo", addKeys: []string{"bar"}}, + {name: "bar", addKeys: []string{"foo"}}, + }, + expectedPostingGroups: nil, + expectedEmptyPosting: true, + }, + { + name: "posting group keys partial exist", + inputPostings: map[string]map[string]index.Range{ + "foo": {"bar": index.Range{0, 8}}, + "bar": {"foo": index.Range{8, 16}}, + }, + seriesMaxSize: 1000, + seriesMatchRatio: 0.5, + postingGroups: []*postingGroup{ + {name: "foo", addKeys: []string{"bar"}}, + {name: "bar", addKeys: []string{"foo", "buz"}}, + }, + expectedPostingGroups: []*postingGroup{ + {name: "bar", addKeys: []string{"foo", "buz"}, cardinality: 1}, + {name: "foo", addKeys: []string{"bar"}, cardinality: 1}, + }, + }, + { + name: "two posting groups with add keys, small postings and large series size", + inputPostings: map[string]map[string]index.Range{ + "foo": {"bar": index.Range{0, 8}}, + "bar": {"foo": index.Range{8, 16}}, + }, + seriesMaxSize: 1000, + seriesMatchRatio: 0.5, + postingGroups: []*postingGroup{ + {name: "foo", addKeys: []string{"bar"}}, + {name: "bar", addKeys: []string{"foo"}}, + }, + expectedPostingGroups: []*postingGroup{ + {name: "bar", addKeys: []string{"foo"}, cardinality: 1}, + {name: "foo", addKeys: []string{"bar"}, cardinality: 1}, + }, + }, + { + name: "two posting groups with remove keys, small postings and large series size", + inputPostings: map[string]map[string]index.Range{ + "foo": {"bar": index.Range{0, 8}}, + "bar": {"foo": index.Range{8, 16}}, + }, + seriesMaxSize: 1000, + seriesMatchRatio: 0.5, + postingGroups: []*postingGroup{ + {name: "foo", removeKeys: []string{"bar"}}, + {name: "bar", removeKeys: []string{"foo"}}, + }, + expectedPostingGroups: []*postingGroup{ + {name: "bar", removeKeys: []string{"foo"}, cardinality: 1}, + {name: "foo", removeKeys: []string{"bar"}, cardinality: 1}, + }, + }, + { + name: "two posting groups with add keys, very small series size, making one posting group lazy", + inputPostings: map[string]map[string]index.Range{ + "foo": {"bar": index.Range{0, 8}}, + "bar": {"foo": index.Range{8, 16}}, + }, + seriesMaxSize: 1, + seriesMatchRatio: 0.5, + postingGroups: []*postingGroup{ + {name: "foo", addKeys: []string{"bar"}}, + {name: "bar", addKeys: []string{"foo"}}, + }, + expectedPostingGroups: []*postingGroup{ + {name: "bar", addKeys: []string{"foo"}, cardinality: 1}, + {name: "foo", addKeys: []string{"bar"}, cardinality: 1, lazy: true}, + }, + }, + { + name: "two posting groups with add keys, one small posting group and a very large posting group, large one become lazy", + inputPostings: map[string]map[string]index.Range{ + "foo": {"bar": index.Range{0, 8}}, + "bar": {"foo": index.Range{8, 1000012}}, + }, + seriesMaxSize: 1000, + seriesMatchRatio: 0.5, + postingGroups: []*postingGroup{ + {name: "foo", addKeys: []string{"bar"}}, + {name: "bar", addKeys: []string{"foo"}}, + }, + expectedPostingGroups: []*postingGroup{ + {name: "foo", addKeys: []string{"bar"}, cardinality: 1}, + {name: "bar", addKeys: []string{"foo"}, cardinality: 250000, lazy: true}, + }, + }, + { + name: "three posting groups with add keys, two small posting group and a very large posting group, large one become lazy", + inputPostings: map[string]map[string]index.Range{ + "foo": {"bar": index.Range{0, 8}}, + "bar": {"foo": index.Range{8, 1000012}}, + "cluster": {"us": index.Range{1000012, 1000020}}, + }, + seriesMaxSize: 1000, + seriesMatchRatio: 0.5, + postingGroups: []*postingGroup{ + {name: "foo", addKeys: []string{"bar"}}, + {name: "bar", addKeys: []string{"foo"}}, + {name: "cluster", addKeys: []string{"us"}}, + }, + expectedPostingGroups: []*postingGroup{ + {name: "cluster", addKeys: []string{"us"}, cardinality: 1}, + {name: "foo", addKeys: []string{"bar"}, cardinality: 1}, + {name: "bar", addKeys: []string{"foo"}, cardinality: 250000, lazy: true}, + }, + }, + { + name: "three posting groups with either add or remove keys, two small posting group and a very large posting group, large one become lazy", + inputPostings: map[string]map[string]index.Range{ + "foo": {"bar": index.Range{0, 8}}, + "bar": {"foo": index.Range{8, 1000012}}, + "cluster": {"us": index.Range{1000012, 1000020}}, + }, + seriesMaxSize: 1000, + seriesMatchRatio: 0.5, + postingGroups: []*postingGroup{ + {name: "foo", removeKeys: []string{"bar"}}, + {name: "bar", removeKeys: []string{"foo"}}, + {name: "cluster", addKeys: []string{"us"}}, + }, + expectedPostingGroups: []*postingGroup{ + {name: "cluster", addKeys: []string{"us"}, cardinality: 1}, + {name: "foo", removeKeys: []string{"bar"}, cardinality: 1}, + {name: "bar", removeKeys: []string{"foo"}, cardinality: 250000, lazy: true}, + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + headerReader := &mockIndexHeaderReader{postings: tc.inputPostings, err: tc.inputError} + block, err := newBucketBlock(ctx, logger, newBucketStoreMetrics(nil), meta, bkt, path.Join(dir, blockID.String()), nil, nil, headerReader, nil, nil, nil) + testutil.Ok(t, err) + ir := newBucketIndexReader(block) + pgs, emptyPosting, err := optimizePostingsFetchByDownloadedBytes(ir, tc.postingGroups, tc.seriesMaxSize, tc.seriesMatchRatio) + if err != nil { + testutil.Equals(t, tc.expectedError, err.Error()) + return + } + testutil.Equals(t, tc.expectedEmptyPosting, emptyPosting) + testutil.Equals(t, tc.expectedPostingGroups, pgs) + }) + } +} diff --git a/pkg/testutil/e2eutil/prometheus.go b/pkg/testutil/e2eutil/prometheus.go index f7c0ad98139..c5be3b71ef6 100644 --- a/pkg/testutil/e2eutil/prometheus.go +++ b/pkg/testutil/e2eutil/prometheus.go @@ -8,6 +8,7 @@ import ( "context" "encoding/json" "fmt" + "github.com/thanos-io/thanos/pkg/block" "math" "math/rand" "net/http" @@ -550,6 +551,16 @@ func createBlock( blockDir := filepath.Join(dir, id.String()) + newMeta, err := metadata.ReadFromDir(blockDir) + if err != nil { + return id, errors.Wrap(err, "read metadata") + } + logger := log.NewNopLogger() + stats, err := block.GatherIndexHealthStats(logger, filepath.Join(blockDir, block.IndexFilename), newMeta.MinTime, newMeta.MaxTime) + if err != nil { + return id, errors.Wrap(err, "gather index stats") + } + files := []metadata.File{} if hashFunc != metadata.NoneFunc { paths := []string{} @@ -575,11 +586,12 @@ func createBlock( } } - if _, err = metadata.InjectThanos(log.NewNopLogger(), blockDir, metadata.Thanos{ + if _, err = metadata.InjectThanos(logger, blockDir, metadata.Thanos{ Labels: extLset.Map(), Downsample: metadata.ThanosDownsample{Resolution: resolution}, Source: metadata.TestSource, Files: files, + IndexStats: metadata.IndexStats{SeriesMaxSize: stats.SeriesMaxSize, ChunkMaxSize: stats.ChunkMaxSize}, }, nil); err != nil { return id, errors.Wrap(err, "finalize block") }