diff --git a/CHANGELOG.md b/CHANGELOG.md index 114eef55d7..b894b378a1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,20 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re ### Added - [#7907](https://github.com/thanos-io/thanos/pull/7907) Receive: Add `--receive.grpc-service-config` flag to configure gRPC service config for the receivers. +- [#7961](https://github.com/thanos-io/thanos/pull/7961) Store Gateway: Add `--store.posting-group-max-keys` flag to mark posting group as lazy if it exceeds number of keys limit. Added `thanos_bucket_store_lazy_expanded_posting_groups_total` for total number of lazy posting groups and corresponding reasons. + +### Changed + +### Removed + +## [v0.37.2](https://github.com/thanos-io/thanos/tree/release-0.37) - 11.12.2024 + +### Fixed + +- [#7970](https://github.com/thanos-io/thanos/pull/7970) Sidecar: Respect min-time setting. +- [#7962](https://github.com/thanos-io/thanos/pull/7962) Store: Fix potential deadlock in hedging request. + +### Added ### Changed diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go index 127584ea94..531ffb8ab4 100644 --- a/cmd/thanos/sidecar.go +++ b/cmd/thanos/sidecar.go @@ -505,7 +505,7 @@ func (s *promMetadata) UpdateTimestamps(ctx context.Context) error { return err } - s.mint = min(s.limitMinTime.PrometheusTimestamp(), mint) + s.mint = max(s.limitMinTime.PrometheusTimestamp(), mint) s.maxt = math.MaxInt64 return nil diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 795c108cd3..1cdc12679c 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -67,40 +67,41 @@ const ( ) type storeConfig struct { - indexCacheConfigs extflag.PathOrContent - objStoreConfig extflag.PathOrContent - dataDir string - cacheIndexHeader bool - grpcConfig grpcConfig - httpConfig httpConfig - indexCacheSizeBytes units.Base2Bytes - chunkPoolSize units.Base2Bytes - estimatedMaxSeriesSize uint64 - estimatedMaxChunkSize uint64 - seriesBatchSize int - storeRateLimits store.SeriesSelectLimits - maxDownloadedBytes units.Base2Bytes - maxConcurrency int - component component.StoreAPI - debugLogging bool - syncInterval time.Duration - blockListStrategy string - blockSyncConcurrency int - blockMetaFetchConcurrency int - filterConf *store.FilterConfig - selectorRelabelConf extflag.PathOrContent - advertiseCompatibilityLabel bool - consistencyDelay commonmodel.Duration - ignoreDeletionMarksDelay commonmodel.Duration - disableWeb bool - webConfig webConfig - label string - postingOffsetsInMemSampling int - cachingBucketConfig extflag.PathOrContent - reqLogConfig *extflag.PathOrContent - lazyIndexReaderEnabled bool - lazyIndexReaderIdleTimeout time.Duration - lazyExpandedPostingsEnabled bool + indexCacheConfigs extflag.PathOrContent + objStoreConfig extflag.PathOrContent + dataDir string + cacheIndexHeader bool + grpcConfig grpcConfig + httpConfig httpConfig + indexCacheSizeBytes units.Base2Bytes + chunkPoolSize units.Base2Bytes + estimatedMaxSeriesSize uint64 + estimatedMaxChunkSize uint64 + seriesBatchSize int + storeRateLimits store.SeriesSelectLimits + maxDownloadedBytes units.Base2Bytes + maxConcurrency int + component component.StoreAPI + debugLogging bool + syncInterval time.Duration + blockListStrategy string + blockSyncConcurrency int + blockMetaFetchConcurrency int + filterConf *store.FilterConfig + selectorRelabelConf extflag.PathOrContent + advertiseCompatibilityLabel bool + consistencyDelay commonmodel.Duration + ignoreDeletionMarksDelay commonmodel.Duration + disableWeb bool + webConfig 
webConfig + label string + postingOffsetsInMemSampling int + cachingBucketConfig extflag.PathOrContent + reqLogConfig *extflag.PathOrContent + lazyIndexReaderEnabled bool + lazyIndexReaderIdleTimeout time.Duration + lazyExpandedPostingsEnabled bool + postingGroupMaxKeySeriesRatio float64 indexHeaderLazyDownloadStrategy string } @@ -204,6 +205,9 @@ func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) { cmd.Flag("store.enable-lazy-expanded-postings", "If true, Store Gateway will estimate postings size and try to lazily expand postings if it downloads less data than expanding all postings."). Default("false").BoolVar(&sc.lazyExpandedPostingsEnabled) + cmd.Flag("store.posting-group-max-key-series-ratio", "Mark posting group as lazy if it fetches more keys than R * max series the query should fetch. With R set to 100, a posting group which fetches 100K keys will be marked as lazy if the current query only fetches 1000 series. thanos_bucket_store_lazy_expanded_posting_groups_total shows lazy expanded postings groups with reasons and you can tune this config accordingly. This config is only valid if lazy expanded posting is enabled. 0 disables the limit."). + Default("100").Float64Var(&sc.postingGroupMaxKeySeriesRatio) + cmd.Flag("store.index-header-lazy-download-strategy", "Strategy of how to download index headers lazily. Supported values: eager, lazy. If eager, always download index header during initial load. If lazy, download index header during query time."). Default(string(indexheader.EagerDownloadStrategy)). EnumVar(&sc.indexHeaderLazyDownloadStrategy, string(indexheader.EagerDownloadStrategy), string(indexheader.LazyDownloadStrategy)) @@ -429,6 +433,7 @@ func runStore( return conf.estimatedMaxChunkSize }), store.WithLazyExpandedPostings(conf.lazyExpandedPostingsEnabled), + store.WithPostingGroupMaxKeySeriesRatio(conf.postingGroupMaxKeySeriesRatio), store.WithIndexHeaderLazyDownloadStrategy( indexheader.IndexHeaderLazyDownloadStrategy(conf.indexHeaderLazyDownloadStrategy).StrategyToDownloadFunc(), ), diff --git a/docs/components/store.md b/docs/components/store.md index dfab1855cc..ce2adb6d6d 100644 --- a/docs/components/store.md +++ b/docs/components/store.md @@ -250,6 +250,18 @@ Flags: The maximum series allowed for a single Series request. The Series call fails if this limit is exceeded. 0 means no limit. + --store.posting-group-max-key-series-ratio=100 + Mark posting group as lazy if it fetches more + keys than R * max series the query should + fetch. With R set to 100, a posting group which + fetches 100K keys will be marked as lazy if + the current query only fetches 1000 series. + thanos_bucket_store_lazy_expanded_posting_groups_total + shows lazy expanded postings groups with + reasons and you can tune this config + accordingly. This config is only valid if lazy + expanded posting is enabled. 0 disables the + limit. --sync-block-duration=15m Repeat interval for syncing the blocks between local and remote view. 
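For reviewers, a minimal standalone sketch (not part of the patch) of how the new `--store.posting-group-max-key-series-ratio` threshold is applied, based on the check this diff adds in `optimizePostingsFetchByDownloadedBytes` in pkg/store/lazy_postings.go. The function name `shouldMarkLazyByKeys` below is illustrative only; groups marked this way are counted in `thanos_bucket_store_lazy_expanded_posting_groups_total{reason="keys_limit"}`.

```go
package main

import "fmt"

// shouldMarkLazyByKeys mirrors the condition added by this patch:
// a posting group is marked lazy with reason "keys_limit" when the number of
// keys that actually exist in the index exceeds ratio * the maximum number of
// series the query is estimated to match so far. A ratio of 0 disables the check.
func shouldMarkLazyByKeys(existentKeys int, maxSeriesMatched int64, ratio float64) bool {
	if ratio <= 0 || maxSeriesMatched <= 0 {
		return false
	}
	return float64(existentKeys)/float64(maxSeriesMatched) > ratio
}

func main() {
	// With the default ratio of 100, a posting group that fetches 100K keys is
	// marked lazy when the query is only expected to fetch 1000 series.
	fmt.Println(shouldMarkLazyByKeys(100_000, 1_000, 100)) // true
	fmt.Println(shouldMarkLazyByKeys(5_000, 1_000, 100))   // false
	fmt.Println(shouldMarkLazyByKeys(100_000, 1_000, 0))   // false: 0 disables the limit
}
```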
--tracing.config= diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index d9940221ff..1c434f503f 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -151,6 +151,7 @@ type bucketStoreMetrics struct { emptyPostingCount *prometheus.CounterVec lazyExpandedPostingsCount prometheus.Counter + lazyExpandedPostingGroupsByReason *prometheus.CounterVec lazyExpandedPostingSizeBytes prometheus.Counter lazyExpandedPostingSeriesOverfetchedSizeBytes prometheus.Counter @@ -345,6 +346,11 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics { Help: "Total number of times when lazy expanded posting optimization applies.", }) + m.lazyExpandedPostingGroupsByReason = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "thanos_bucket_store_lazy_expanded_posting_groups_total", + Help: "Total number of posting groups that are marked as lazy and corresponding reason", + }, []string{"reason"}) + m.lazyExpandedPostingSizeBytes = promauto.With(reg).NewCounter(prometheus.CounterOpts{ Name: "thanos_bucket_store_lazy_expanded_posting_size_bytes_total", Help: "Total number of lazy posting group size in bytes.", @@ -419,7 +425,8 @@ type BucketStore struct { enableChunkHashCalculation bool - enabledLazyExpandedPostings bool + enabledLazyExpandedPostings bool + postingGroupMaxKeySeriesRatio float64 sortingStrategy sortingStrategy @@ -552,6 +559,13 @@ func WithLazyExpandedPostings(enabled bool) BucketStoreOption { } } +// WithPostingGroupMaxKeySeriesRatio configures a threshold to mark a posting group as lazy if it has more add keys. +func WithPostingGroupMaxKeySeriesRatio(postingGroupMaxKeySeriesRatio float64) BucketStoreOption { + return func(s *BucketStore) { + s.postingGroupMaxKeySeriesRatio = postingGroupMaxKeySeriesRatio + } +} + // WithDontResort disables series resorting in Store Gateway. func WithDontResort(true bool) BucketStoreOption { return func(s *BucketStore) { @@ -1002,8 +1016,11 @@ type blockSeriesClient struct { chunksLimiter ChunksLimiter bytesLimiter BytesLimiter - lazyExpandedPostingEnabled bool + lazyExpandedPostingEnabled bool + // Mark posting group as lazy if it adds too many keys. 0 to disable. 
+ postingGroupMaxKeySeriesRatio float64 lazyExpandedPostingsCount prometheus.Counter + lazyExpandedPostingGroupByReason *prometheus.CounterVec lazyExpandedPostingSizeBytes prometheus.Counter lazyExpandedPostingSeriesOverfetchedSizeBytes prometheus.Counter @@ -1046,7 +1063,9 @@ func newBlockSeriesClient( chunkFetchDurationSum *prometheus.HistogramVec, extLsetToRemove map[string]struct{}, lazyExpandedPostingEnabled bool, + postingGroupMaxKeySeriesRatio float64, lazyExpandedPostingsCount prometheus.Counter, + lazyExpandedPostingByReason *prometheus.CounterVec, lazyExpandedPostingSizeBytes prometheus.Counter, lazyExpandedPostingSeriesOverfetchedSizeBytes prometheus.Counter, tenant string, @@ -1081,7 +1100,9 @@ func newBlockSeriesClient( chunkFetchDurationSum: chunkFetchDurationSum, lazyExpandedPostingEnabled: lazyExpandedPostingEnabled, + postingGroupMaxKeySeriesRatio: postingGroupMaxKeySeriesRatio, lazyExpandedPostingsCount: lazyExpandedPostingsCount, + lazyExpandedPostingGroupByReason: lazyExpandedPostingByReason, lazyExpandedPostingSizeBytes: lazyExpandedPostingSizeBytes, lazyExpandedPostingSeriesOverfetchedSizeBytes: lazyExpandedPostingSeriesOverfetchedSizeBytes, @@ -1133,7 +1154,7 @@ func (b *blockSeriesClient) ExpandPostings( matchers sortedMatchers, seriesLimiter SeriesLimiter, ) error { - ps, err := b.indexr.ExpandedPostings(b.ctx, matchers, b.bytesLimiter, b.lazyExpandedPostingEnabled, b.lazyExpandedPostingSizeBytes, b.tenant) + ps, err := b.indexr.ExpandedPostings(b.ctx, matchers, b.bytesLimiter, b.lazyExpandedPostingEnabled, b.postingGroupMaxKeySeriesRatio, b.lazyExpandedPostingSizeBytes, b.lazyExpandedPostingGroupByReason, b.tenant) if err != nil { return errors.Wrap(err, "expanded matching posting") } @@ -1566,7 +1587,9 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store s.metrics.chunkFetchDurationSum, extLsetToRemove, s.enabledLazyExpandedPostings, + s.postingGroupMaxKeySeriesRatio, s.metrics.lazyExpandedPostingsCount, + s.metrics.lazyExpandedPostingGroupsByReason, s.metrics.lazyExpandedPostingSizeBytes, s.metrics.lazyExpandedPostingSeriesOverfetchedSizeBytes, tenant, @@ -1880,7 +1903,9 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq nil, extLsetToRemove, s.enabledLazyExpandedPostings, + s.postingGroupMaxKeySeriesRatio, s.metrics.lazyExpandedPostingsCount, + s.metrics.lazyExpandedPostingGroupsByReason, s.metrics.lazyExpandedPostingSizeBytes, s.metrics.lazyExpandedPostingSeriesOverfetchedSizeBytes, tenant, @@ -2106,7 +2131,9 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR nil, nil, s.enabledLazyExpandedPostings, + s.postingGroupMaxKeySeriesRatio, s.metrics.lazyExpandedPostingsCount, + s.metrics.lazyExpandedPostingGroupsByReason, s.metrics.lazyExpandedPostingSizeBytes, s.metrics.lazyExpandedPostingSeriesOverfetchedSizeBytes, tenant, @@ -2563,7 +2590,16 @@ func (r *bucketIndexReader) reset(size int) { // Reminder: A posting is a reference (represented as a uint64) to a series reference, which in turn points to the first // chunk where the series contains the matching label-value pair for a given block of data. Postings can be fetched by // single label name=value. 
-func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms sortedMatchers, bytesLimiter BytesLimiter, lazyExpandedPostingEnabled bool, lazyExpandedPostingSizeBytes prometheus.Counter, tenant string) (*lazyExpandedPostings, error) { +func (r *bucketIndexReader) ExpandedPostings( + ctx context.Context, + ms sortedMatchers, + bytesLimiter BytesLimiter, + lazyExpandedPostingEnabled bool, + postingGroupMaxKeySeriesRatio float64, + lazyExpandedPostingSizeBytes prometheus.Counter, + lazyExpandedPostingGroupsByReason *prometheus.CounterVec, + tenant string, +) (*lazyExpandedPostings, error) { // Shortcut the case of `len(postingGroups) == 0`. It will only happen when no // matchers specified, and we don't need to fetch expanded postings from cache. if len(ms) == 0 { @@ -2615,7 +2651,7 @@ func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms sortedMatch postingGroups = append(postingGroups, newPostingGroup(true, name, []string{value}, nil)) } - ps, err := fetchLazyExpandedPostings(ctx, postingGroups, r, bytesLimiter, addAllPostings, lazyExpandedPostingEnabled, lazyExpandedPostingSizeBytes, tenant) + ps, err := fetchLazyExpandedPostings(ctx, postingGroups, r, bytesLimiter, addAllPostings, lazyExpandedPostingEnabled, postingGroupMaxKeySeriesRatio, lazyExpandedPostingSizeBytes, lazyExpandedPostingGroupsByReason, tenant) if err != nil { return nil, errors.Wrap(err, "fetch and expand postings") } @@ -2661,13 +2697,14 @@ func ExpandPostingsWithContext(ctx context.Context, p index.Postings) ([]storage // If addAll is not set: Merge of postings for "addKeys" labels minus postings for removeKeys labels // This computation happens in ExpandedPostings. type postingGroup struct { - addAll bool - name string - matchers []*labels.Matcher - addKeys []string - removeKeys []string - cardinality int64 - lazy bool + addAll bool + name string + matchers []*labels.Matcher + addKeys []string + removeKeys []string + cardinality int64 + existentKeys int + lazy bool } func newPostingGroup(addAll bool, name string, addKeys, removeKeys []string) *postingGroup { diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index e8dffd093b..df4d1e189c 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -1288,7 +1288,9 @@ func benchmarkExpandedPostings( {`uniq=~"9|random-shuffled-values|1"`, []*labels.Matcher{iRegexBigValueSet}, bigValueSetSize}, } - dummyCounter := promauto.NewCounter(prometheus.CounterOpts{Name: "test"}) + reg := prometheus.NewRegistry() + dummyCounter := promauto.With(reg).NewCounter(prometheus.CounterOpts{Name: "test"}) + dummyCounterVec := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{Name: "test_counter_vec"}, []string{"reason"}) for _, c := range cases { t.Run(c.name, func(t testutil.TB) { b := &bucketBlock{ @@ -1304,7 +1306,7 @@ func benchmarkExpandedPostings( t.ResetTimer() for i := 0; i < t.N(); i++ { - p, err := indexr.ExpandedPostings(context.Background(), newSortedMatchers(c.matchers), NewBytesLimiterFactory(0)(nil), false, dummyCounter, tenancy.DefaultTenant) + p, err := indexr.ExpandedPostings(context.Background(), newSortedMatchers(c.matchers), NewBytesLimiterFactory(0)(nil), false, 0, dummyCounter, dummyCounterVec, tenancy.DefaultTenant) testutil.Ok(t, err) testutil.Equals(t, c.expectedLen, len(p.postings)) } @@ -1340,8 +1342,10 @@ func TestExpandedPostingsEmptyPostings(t *testing.T) { // Match nothing. 
matcher2 := labels.MustNewMatcher(labels.MatchRegexp, "i", "500.*") ctx := context.Background() - dummyCounter := promauto.With(prometheus.NewRegistry()).NewCounter(prometheus.CounterOpts{Name: "test"}) - ps, err := indexr.ExpandedPostings(ctx, newSortedMatchers([]*labels.Matcher{matcher1, matcher2}), NewBytesLimiterFactory(0)(nil), false, dummyCounter, tenancy.DefaultTenant) + reg := prometheus.NewRegistry() + dummyCounter := promauto.With(reg).NewCounter(prometheus.CounterOpts{Name: "test"}) + dummyCounterVec := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{Name: "test_counter_vec"}, []string{"reason"}) + ps, err := indexr.ExpandedPostings(ctx, newSortedMatchers([]*labels.Matcher{matcher1, matcher2}), NewBytesLimiterFactory(0)(nil), false, 0, dummyCounter, dummyCounterVec, tenancy.DefaultTenant) testutil.Ok(t, err) testutil.Equals(t, ps, (*lazyExpandedPostings)(nil)) // Make sure even if a matcher doesn't match any postings, we still cache empty expanded postings. @@ -1378,8 +1382,10 @@ func TestLazyExpandedPostingsEmptyPostings(t *testing.T) { matcher2 := labels.MustNewMatcher(labels.MatchRegexp, "n", "1_.*") matcher3 := labels.MustNewMatcher(labels.MatchRegexp, "i", ".+") ctx := context.Background() - dummyCounter := promauto.With(prometheus.NewRegistry()).NewCounter(prometheus.CounterOpts{Name: "test"}) - ps, err := indexr.ExpandedPostings(ctx, newSortedMatchers([]*labels.Matcher{matcher1, matcher2, matcher3}), NewBytesLimiterFactory(0)(nil), true, dummyCounter, tenancy.DefaultTenant) + reg := prometheus.NewRegistry() + dummyCounter := promauto.With(reg).NewCounter(prometheus.CounterOpts{Name: "test"}) + dummyCounterVec := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{Name: "test_counter_vec"}, []string{"reason"}) + ps, err := indexr.ExpandedPostings(ctx, newSortedMatchers([]*labels.Matcher{matcher1, matcher2, matcher3}), NewBytesLimiterFactory(0)(nil), true, 0, dummyCounter, dummyCounterVec, tenancy.DefaultTenant) testutil.Ok(t, err) // We expect emptyLazyPostings rather than lazy postings with 0 length but with matchers. 
testutil.Equals(t, ps, emptyLazyPostings) @@ -2872,7 +2878,9 @@ func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMet wg := sync.WaitGroup{} wg.Add(concurrency) - dummyCounter := promauto.NewCounter(prometheus.CounterOpts{Name: "test"}) + reg := prometheus.NewRegistry() + dummyCounter := promauto.With(reg).NewCounter(prometheus.CounterOpts{Name: "test"}) + dummyCounterVec := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{Name: "test_counter_vec"}, []string{"reason"}) for w := 0; w < concurrency; w++ { go func() { defer wg.Done() @@ -2917,7 +2925,9 @@ func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMet dummyHistogram, nil, false, + 0, dummyCounter, + dummyCounterVec, dummyCounter, dummyCounter, tenancy.DefaultTenant, @@ -3551,7 +3561,9 @@ func TestExpandedPostingsRace(t *testing.T) { l := sync.Mutex{} previousRefs := make(map[int][]storage.SeriesRef) - dummyCounter := promauto.With(prometheus.NewRegistry()).NewCounter(prometheus.CounterOpts{Name: "test"}) + reg := prometheus.NewRegistry() + dummyCounter := promauto.With(reg).NewCounter(prometheus.CounterOpts{Name: "test"}) + dummyCounterVec := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{Name: "test_counter_vec"}, []string{"reason"}) for { if tm.Err() != nil { @@ -3573,7 +3585,7 @@ func TestExpandedPostingsRace(t *testing.T) { wg.Add(1) go func(i int, bb *bucketBlock) { - refs, err := bb.indexReader(logger).ExpandedPostings(context.Background(), m, NewBytesLimiterFactory(0)(nil), false, dummyCounter, tenancy.DefaultTenant) + refs, err := bb.indexReader(logger).ExpandedPostings(context.Background(), m, NewBytesLimiterFactory(0)(nil), false, 0, dummyCounter, dummyCounterVec, tenancy.DefaultTenant) testutil.Ok(t, err) defer wg.Done() diff --git a/pkg/store/lazy_postings.go b/pkg/store/lazy_postings.go index f8363ab477..ef7ae5d00a 100644 --- a/pkg/store/lazy_postings.go +++ b/pkg/store/lazy_postings.go @@ -39,7 +39,15 @@ func (p *lazyExpandedPostings) lazyExpanded() bool { return p != nil && len(p.matchers) > 0 } -func optimizePostingsFetchByDownloadedBytes(r *bucketIndexReader, postingGroups []*postingGroup, seriesMaxSize int64, seriesMatchRatio float64, lazyExpandedPostingSizeBytes prometheus.Counter) ([]*postingGroup, bool, error) { +func optimizePostingsFetchByDownloadedBytes( + r *bucketIndexReader, + postingGroups []*postingGroup, + seriesMaxSize int64, + seriesMatchRatio float64, + postingGroupMaxKeySeriesRatio float64, + lazyExpandedPostingSizeBytes prometheus.Counter, + lazyExpandedPostingGroupsByReason *prometheus.CounterVec, +) ([]*postingGroup, bool, error) { if len(postingGroups) <= 1 { return postingGroups, false, nil } @@ -55,6 +63,7 @@ func optimizePostingsFetchByDownloadedBytes(r *bucketIndexReader, postingGroups return nil, false, errors.Wrapf(err, "postings offsets for %s", pg.name) } + existentKeys := 0 for _, rng := range rngs { if rng == indexheader.NotFoundRange { continue @@ -63,14 +72,16 @@ func optimizePostingsFetchByDownloadedBytes(r *bucketIndexReader, postingGroups level.Error(r.logger).Log("msg", "invalid index range, fallback to non lazy posting optimization") return postingGroups, false, nil } + existentKeys++ // Each range starts from the #entries field which is 4 bytes. // Need to subtract it when calculating number of postings. // https://github.com/prometheus/prometheus/blob/v2.46.0/tsdb/docs/format/index.md. 
pg.cardinality += (rng.End - rng.Start - 4) / 4 } + pg.existentKeys = existentKeys // If the posting group adds keys, 0 cardinality means the posting doesn't exist. // If the posting group removes keys, no posting ranges found is fine as it is a noop. - if len(pg.addKeys) > 0 && pg.cardinality == 0 { + if len(pg.addKeys) > 0 && pg.existentKeys == 0 { return nil, true, nil } } @@ -142,6 +153,7 @@ func optimizePostingsFetchByDownloadedBytes(r *bucketIndexReader, postingGroups // Assume only seriesMatchRatio postings will be matched every posting group. seriesMatched := postingGroups[i].cardinality - int64(math.Ceil(float64(negativeCardinalities)*seriesMatchRatio)) + maxSeriesMatched := seriesMatched i++ // Start from next posting group as we always need to fetch at least one posting group with add keys. for i < len(postingGroups) { pg := postingGroups[i] @@ -165,6 +177,13 @@ func optimizePostingsFetchByDownloadedBytes(r *bucketIndexReader, postingGroups seriesMatched -= underfetchedSeries underfetchedSeriesSize = underfetchedSeries * seriesMaxSize } else { + // Only mark posting group as lazy due to too many keys when those keys are known to be existent. + if postingGroupMaxKeySeriesRatio > 0 && maxSeriesMatched > 0 && + float64(pg.existentKeys)/float64(maxSeriesMatched) > postingGroupMaxKeySeriesRatio { + markPostingGroupLazy(pg, "keys_limit", lazyExpandedPostingSizeBytes, lazyExpandedPostingGroupsByReason) + i++ + continue + } underfetchedSeriesSize = seriesMaxSize * int64(math.Ceil(float64(seriesMatched)*(1-seriesMatchRatio))) seriesMatched = int64(math.Ceil(float64(seriesMatched) * seriesMatchRatio)) } @@ -176,13 +195,18 @@ func optimizePostingsFetchByDownloadedBytes(r *bucketIndexReader, postingGroups i++ } for i < len(postingGroups) { - postingGroups[i].lazy = true - lazyExpandedPostingSizeBytes.Add(float64(4 * postingGroups[i].cardinality)) + markPostingGroupLazy(postingGroups[i], "postings_size", lazyExpandedPostingSizeBytes, lazyExpandedPostingGroupsByReason) i++ } return postingGroups, false, nil } +func markPostingGroupLazy(pg *postingGroup, reason string, lazyExpandedPostingSizeBytes prometheus.Counter, lazyExpandedPostingGroupsByReason *prometheus.CounterVec) { + pg.lazy = true + lazyExpandedPostingSizeBytes.Add(float64(4 * pg.cardinality)) + lazyExpandedPostingGroupsByReason.WithLabelValues(reason).Inc() +} + func fetchLazyExpandedPostings( ctx context.Context, postingGroups []*postingGroup, @@ -190,7 +214,9 @@ func fetchLazyExpandedPostings( bytesLimiter BytesLimiter, addAllPostings bool, lazyExpandedPostingEnabled bool, + postingGroupMaxKeySeriesRatio float64, lazyExpandedPostingSizeBytes prometheus.Counter, + lazyExpandedPostingGroupsByReason *prometheus.CounterVec, tenant string, ) (*lazyExpandedPostings, error) { var ( @@ -212,7 +238,9 @@ func fetchLazyExpandedPostings( postingGroups, int64(r.block.estimatedMaxSeriesSize), 0.5, // TODO(yeya24): Expose this as a flag. + postingGroupMaxKeySeriesRatio, lazyExpandedPostingSizeBytes, + lazyExpandedPostingGroupsByReason, ) if err != nil { return nil, err @@ -243,27 +271,25 @@ func keysToFetchFromPostingGroups(postingGroups []*postingGroup) ([]labels.Label for i < len(postingGroups) { pg := postingGroups[i] if pg.lazy { - break + if len(lazyMatchers) == 0 { + lazyMatchers = make([]*labels.Matcher, 0) + } + lazyMatchers = append(lazyMatchers, postingGroups[i].matchers...) + } else { + // Postings returned by fetchPostings will be in the same order as keys + // so it's important that we iterate them in the same order later. 
+ // We don't have any other way of pairing keys and fetched postings. + for _, key := range pg.addKeys { + keys = append(keys, labels.Label{Name: pg.name, Value: key}) + } + for _, key := range pg.removeKeys { + keys = append(keys, labels.Label{Name: pg.name, Value: key}) + } } - // Postings returned by fetchPostings will be in the same order as keys - // so it's important that we iterate them in the same order later. - // We don't have any other way of pairing keys and fetched postings. - for _, key := range pg.addKeys { - keys = append(keys, labels.Label{Name: pg.name, Value: key}) - } - for _, key := range pg.removeKeys { - keys = append(keys, labels.Label{Name: pg.name, Value: key}) - } i++ } - if i < len(postingGroups) { - lazyMatchers = make([]*labels.Matcher, 0) - for i < len(postingGroups) { - lazyMatchers = append(lazyMatchers, postingGroups[i].matchers...) - i++ - } - } + return keys, lazyMatchers } @@ -279,6 +305,18 @@ func fetchAndExpandPostingGroups(ctx context.Context, r *bucketIndexReader, post return nil, nil, errors.Wrap(err, "get postings") } + result := mergeFetchedPostings(ctx, fetchedPostings, postingGroups) + if err := ctx.Err(); err != nil { + return nil, nil, err + } + ps, err := ExpandPostingsWithContext(ctx, result) + if err != nil { + return nil, nil, errors.Wrap(err, "expand") + } + return ps, lazyMatchers, nil +} + +func mergeFetchedPostings(ctx context.Context, fetchedPostings []index.Postings, postingGroups []*postingGroup) index.Postings { // Get "add" and "remove" postings from groups. We iterate over postingGroups and their keys // again, and this is exactly the same order as before (when building the groups), so we can simply // use one incrementing index to fetch postings from returned slice. @@ -287,7 +325,7 @@ func fetchAndExpandPostingGroups(ctx context.Context, r *bucketIndexReader, post var groupAdds, groupRemovals []index.Postings for _, g := range postingGroups { if g.lazy { - break + continue } // We cannot add empty set to groupAdds, since they are intersected. 
if len(g.addKeys) > 0 { @@ -307,13 +345,5 @@ func fetchAndExpandPostingGroups(ctx context.Context, r *bucketIndexReader, post } result := index.Without(index.Intersect(groupAdds...), index.Merge(ctx, groupRemovals...)) - - if err := ctx.Err(); err != nil { - return nil, nil, err - } - ps, err := ExpandPostingsWithContext(ctx, result) - if err != nil { - return nil, nil, errors.Wrap(err, "expand") - } - return ps, lazyMatchers, nil + return result } diff --git a/pkg/store/lazy_postings_test.go b/pkg/store/lazy_postings_test.go index 06157affe0..cb52dac412 100644 --- a/pkg/store/lazy_postings_test.go +++ b/pkg/store/lazy_postings_test.go @@ -16,8 +16,10 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" promtest "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/tsdb/index" + "github.com/stretchr/testify/require" "github.com/thanos-io/objstore/providers/filesystem" "github.com/thanos-io/thanos/pkg/block/indexheader" @@ -206,6 +208,38 @@ func TestKeysToFetchFromPostingGroups(t *testing.T) { labels.MustNewMatcher(labels.MatchRegexp, "job", "prometheus.*"), }, }, + { + name: "multiple non lazy and lazy posting groups with lazy posting groups in the middle", + pgs: []*postingGroup{ + { + name: "test", + addKeys: []string{"foo", "bar"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}, + }, + { + name: "cluster", + addKeys: []string{"bar"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "cluster", "bar")}, + lazy: true, + }, + { + name: "env", + addKeys: []string{"beta", "gamma", "prod"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "env", "beta|gamma|prod")}, + lazy: true, + }, + { + name: "job", + addKeys: []string{"prometheus"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "job", "prometheus.*")}, + }, + }, + expectedLabels: []labels.Label{{Name: "test", Value: "foo"}, {Name: "test", Value: "bar"}, {Name: "job", Value: "prometheus"}}, + expectedMatchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, "cluster", "bar"), + labels.MustNewMatcher(labels.MatchRegexp, "env", "beta|gamma|prod"), + }, + }, } { t.Run(tc.name, func(t *testing.T) { keys, matchers := keysToFetchFromPostingGroups(tc.pgs) @@ -276,15 +310,16 @@ func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) { }, } for _, tc := range []struct { - name string - inputPostings map[string]map[string]index.Range - inputError error - postingGroups []*postingGroup - seriesMaxSize int64 - seriesMatchRatio float64 - expectedPostingGroups []*postingGroup - expectedEmptyPosting bool - expectedError string + name string + inputPostings map[string]map[string]index.Range + inputError error + postingGroups []*postingGroup + seriesMaxSize int64 + seriesMatchRatio float64 + postingGroupMaxKeySeriesRatio float64 + expectedPostingGroups []*postingGroup + expectedEmptyPosting bool + expectedError string }{ { name: "empty posting group", @@ -353,7 +388,7 @@ func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) { }, expectedPostingGroups: []*postingGroup{ {name: "bar", removeKeys: []string{"foo"}, cardinality: 0, addAll: true}, - {name: "foo", addKeys: []string{"bar"}, cardinality: 1}, + {name: "foo", addKeys: []string{"bar"}, cardinality: 1, existentKeys: 1}, }, }, { @@ -385,7 +420,7 @@ func 
TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) { }, expectedPostingGroups: []*postingGroup{ {name: "bar", removeKeys: []string{"foo"}, cardinality: 0, addAll: true}, - {name: "foo", addKeys: []string{"bar"}, cardinality: 1}, + {name: "foo", addKeys: []string{"bar"}, cardinality: 1, existentKeys: 1}, }, }, { @@ -401,8 +436,8 @@ func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) { {name: "bar", addKeys: []string{"foo", "buz"}}, }, expectedPostingGroups: []*postingGroup{ - {name: "bar", addKeys: []string{"foo", "buz"}, cardinality: 1}, - {name: "foo", addKeys: []string{"bar"}, cardinality: 1}, + {name: "bar", addKeys: []string{"foo", "buz"}, cardinality: 1, existentKeys: 1}, + {name: "foo", addKeys: []string{"bar"}, cardinality: 1, existentKeys: 1}, }, }, { @@ -418,8 +453,97 @@ func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) { {name: "bar", addKeys: []string{"foo"}}, }, expectedPostingGroups: []*postingGroup{ - {name: "bar", addKeys: []string{"foo"}, cardinality: 1}, - {name: "foo", addKeys: []string{"bar"}, cardinality: 1}, + {name: "bar", addKeys: []string{"foo"}, cardinality: 1, existentKeys: 1}, + {name: "foo", addKeys: []string{"bar"}, cardinality: 1, existentKeys: 1}, + }, + }, + { + name: "two posting groups with add keys, posting group not marked as lazy due to some add keys don't exist", + inputPostings: map[string]map[string]index.Range{ + "foo": {"bar": index.Range{End: 8}}, + "bar": {"foo": index.Range{Start: 8, End: 16}}, + }, + seriesMaxSize: 1000, + seriesMatchRatio: 0.5, + postingGroups: []*postingGroup{ + {name: "foo", addKeys: []string{"bar"}}, + {name: "bar", addKeys: []string{"bar", "baz", "foo"}}, + }, + postingGroupMaxKeySeriesRatio: 2, + expectedPostingGroups: []*postingGroup{ + {name: "bar", addKeys: []string{"bar", "baz", "foo"}, cardinality: 1, existentKeys: 1}, + {name: "foo", addKeys: []string{"bar"}, cardinality: 1, existentKeys: 1}, + }, + }, + { + name: "two posting groups with add keys, first posting group not marked as lazy even though exceeding 2 keys due to we always mark first posting group as non lazy", + inputPostings: map[string]map[string]index.Range{ + "foo": {"bar": index.Range{End: 108}}, + "bar": {"foo": index.Range{Start: 108, End: 116}, "bar": index.Range{Start: 116, End: 124}, "baz": index.Range{Start: 124, End: 132}}, + }, + seriesMaxSize: 1000, + seriesMatchRatio: 0.5, + postingGroups: []*postingGroup{ + {name: "foo", addKeys: []string{"bar"}}, + {name: "bar", addKeys: []string{"bar", "baz", "foo"}}, + }, + postingGroupMaxKeySeriesRatio: 2, + expectedPostingGroups: []*postingGroup{ + {name: "bar", addKeys: []string{"bar", "baz", "foo"}, cardinality: 3, existentKeys: 3}, + {name: "foo", addKeys: []string{"bar"}, cardinality: 26, existentKeys: 1}, + }, + }, + { + name: "two posting groups with add keys, one posting group with too many keys not marked as lazy due to postingGroupMaxKeySeriesRatio not set", + inputPostings: map[string]map[string]index.Range{ + "foo": {"bar": index.Range{End: 8}}, + "bar": {"foo": index.Range{Start: 8, End: 16}, "bar": index.Range{Start: 16, End: 24}, "baz": index.Range{Start: 24, End: 32}}, + }, + seriesMaxSize: 1000, + seriesMatchRatio: 0.5, + postingGroups: []*postingGroup{ + {name: "foo", addKeys: []string{"bar"}}, + {name: "bar", addKeys: []string{"bar", "baz", "foo"}}, + }, + postingGroupMaxKeySeriesRatio: 0, + expectedPostingGroups: []*postingGroup{ + {name: "foo", addKeys: []string{"bar"}, cardinality: 1, existentKeys: 1}, + {name: "bar", addKeys: []string{"bar", "baz", 
"foo"}, cardinality: 3, existentKeys: 3}, + }, + }, + { + name: "two posting groups with add keys, one posting group marked as lazy due to exceeding postingGroupMaxKeySeriesRatio", + inputPostings: map[string]map[string]index.Range{ + "foo": {"bar": index.Range{End: 8}}, + "bar": {"foo": index.Range{Start: 8, End: 16}, "bar": index.Range{Start: 16, End: 24}, "baz": index.Range{Start: 24, End: 32}}, + }, + seriesMaxSize: 1000, + seriesMatchRatio: 0.5, + postingGroups: []*postingGroup{ + {name: "foo", addKeys: []string{"bar"}}, + {name: "bar", addKeys: []string{"bar", "baz", "foo"}}, + }, + postingGroupMaxKeySeriesRatio: 2, + expectedPostingGroups: []*postingGroup{ + {name: "foo", addKeys: []string{"bar"}, cardinality: 1, existentKeys: 1}, + {name: "bar", addKeys: []string{"bar", "baz", "foo"}, cardinality: 3, existentKeys: 3, lazy: true}, + }, + }, + { + name: "two posting groups with remove keys, minAddKeysToMarkLazy won't be applied", + inputPostings: map[string]map[string]index.Range{ + "foo": {"bar": index.Range{End: 8}}, + "bar": {"foo": index.Range{Start: 8, End: 16}, "baz": index.Range{Start: 16, End: 24}}, + }, + seriesMaxSize: 1000, + seriesMatchRatio: 0.5, + postingGroups: []*postingGroup{ + {addAll: true, name: "foo", removeKeys: []string{"bar"}}, + {addAll: true, name: "bar", removeKeys: []string{"baz", "foo"}}, + }, + expectedPostingGroups: []*postingGroup{ + {addAll: true, name: "foo", removeKeys: []string{"bar"}, cardinality: 1, existentKeys: 1}, + {addAll: true, name: "bar", removeKeys: []string{"baz", "foo"}, cardinality: 2, existentKeys: 2}, }, }, { @@ -437,8 +561,8 @@ func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) { {addAll: true, name: "bar", removeKeys: []string{"foo"}}, }, expectedPostingGroups: []*postingGroup{ - {addAll: true, name: "bar", removeKeys: []string{"foo"}, cardinality: 1}, - {addAll: true, name: "foo", removeKeys: []string{"bar"}, cardinality: 1}, + {addAll: true, name: "bar", removeKeys: []string{"foo"}, cardinality: 1, existentKeys: 1}, + {addAll: true, name: "foo", removeKeys: []string{"bar"}, cardinality: 1, existentKeys: 1}, }, }, { @@ -454,8 +578,8 @@ func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) { {name: "bar", addKeys: []string{"foo"}}, }, expectedPostingGroups: []*postingGroup{ - {addAll: true, name: "foo", removeKeys: []string{"bar"}, cardinality: 1}, - {name: "bar", addKeys: []string{"foo"}, cardinality: 250000}, + {addAll: true, name: "foo", removeKeys: []string{"bar"}, cardinality: 1, existentKeys: 1}, + {name: "bar", addKeys: []string{"foo"}, cardinality: 250000, existentKeys: 1}, }, }, { @@ -471,8 +595,8 @@ func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) { {name: "bar", addKeys: []string{"foo"}}, }, expectedPostingGroups: []*postingGroup{ - {name: "bar", addKeys: []string{"foo"}, cardinality: 1}, - {name: "foo", addKeys: []string{"bar"}, cardinality: 1, lazy: true}, + {name: "bar", addKeys: []string{"foo"}, cardinality: 1, existentKeys: 1}, + {name: "foo", addKeys: []string{"bar"}, cardinality: 1, existentKeys: 1, lazy: true}, }, }, { @@ -488,8 +612,8 @@ func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) { {name: "bar", addKeys: []string{"foo"}}, }, expectedPostingGroups: []*postingGroup{ - {name: "foo", addKeys: []string{"bar"}, cardinality: 1}, - {name: "bar", addKeys: []string{"foo"}, cardinality: 250000, lazy: true}, + {name: "foo", addKeys: []string{"bar"}, cardinality: 1, existentKeys: 1}, + {name: "bar", addKeys: []string{"foo"}, cardinality: 250000, existentKeys: 1, lazy: 
true}, }, }, { @@ -507,9 +631,51 @@ func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) { {name: "cluster", addKeys: []string{"us"}}, }, expectedPostingGroups: []*postingGroup{ - {name: "cluster", addKeys: []string{"us"}, cardinality: 1}, - {name: "foo", addKeys: []string{"bar"}, cardinality: 1}, - {name: "bar", addKeys: []string{"foo"}, cardinality: 250000, lazy: true}, + {name: "cluster", addKeys: []string{"us"}, cardinality: 1, existentKeys: 1}, + {name: "foo", addKeys: []string{"bar"}, cardinality: 1, existentKeys: 1}, + {name: "bar", addKeys: []string{"foo"}, cardinality: 250000, existentKeys: 1, lazy: true}, + }, + }, + { + name: "three posting groups with add keys, middle posting group marked as lazy due to too many add keys", + inputPostings: map[string]map[string]index.Range{ + "foo": {"bar": index.Range{End: 8}}, + "bar": {"bar": index.Range{Start: 8, End: 16}, "baz": index.Range{Start: 16, End: 24}, "foo": index.Range{Start: 24, End: 32}}, + "cluster": {"us": index.Range{Start: 32, End: 108}}, + }, + seriesMaxSize: 1000, + seriesMatchRatio: 0.5, + postingGroupMaxKeySeriesRatio: 2, + postingGroups: []*postingGroup{ + {name: "foo", addKeys: []string{"bar"}}, + {name: "bar", addKeys: []string{"bar", "baz", "foo"}}, + {name: "cluster", addKeys: []string{"us"}}, + }, + expectedPostingGroups: []*postingGroup{ + {name: "foo", addKeys: []string{"bar"}, cardinality: 1, existentKeys: 1}, + {name: "bar", addKeys: []string{"bar", "baz", "foo"}, cardinality: 3, existentKeys: 3, lazy: true}, + {name: "cluster", addKeys: []string{"us"}, cardinality: 18, existentKeys: 1}, + }, + }, + { + name: "three posting groups with add keys, bar not marked as lazy even though too many add keys due to first positive posting group sorted by cardinality", + inputPostings: map[string]map[string]index.Range{ + "foo": {"bar": index.Range{End: 8}}, + "bar": {"bar": index.Range{Start: 8, End: 16}, "baz": index.Range{Start: 16, End: 24}, "foo": index.Range{Start: 24, End: 32}}, + "cluster": {"us": index.Range{Start: 32, End: 108}}, + }, + seriesMaxSize: 1000, + seriesMatchRatio: 0.5, + postingGroupMaxKeySeriesRatio: 2, + postingGroups: []*postingGroup{ + {addAll: true, name: "foo", removeKeys: []string{"bar"}}, + {name: "bar", addKeys: []string{"bar", "baz", "foo"}}, + {name: "cluster", addKeys: []string{"us"}}, + }, + expectedPostingGroups: []*postingGroup{ + {addAll: true, name: "foo", removeKeys: []string{"bar"}, cardinality: 1, existentKeys: 1}, + {name: "bar", addKeys: []string{"bar", "baz", "foo"}, cardinality: 3, existentKeys: 3}, + {name: "cluster", addKeys: []string{"us"}, cardinality: 18, existentKeys: 1}, }, }, { @@ -527,9 +693,9 @@ func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) { {name: "cluster", addKeys: []string{"us"}}, }, expectedPostingGroups: []*postingGroup{ - {name: "cluster", addKeys: []string{"us"}, cardinality: 1}, - {addAll: true, name: "foo", removeKeys: []string{"bar"}, cardinality: 1}, - {addAll: true, name: "bar", removeKeys: []string{"foo"}, cardinality: 250000, lazy: true}, + {name: "cluster", addKeys: []string{"us"}, cardinality: 1, existentKeys: 1}, + {addAll: true, name: "foo", removeKeys: []string{"bar"}, cardinality: 1, existentKeys: 1}, + {addAll: true, name: "bar", removeKeys: []string{"foo"}, cardinality: 250000, existentKeys: 1, lazy: true}, }, }, { @@ -549,10 +715,10 @@ func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) { {name: "cluster", addKeys: []string{"us"}}, }, expectedPostingGroups: []*postingGroup{ - {addAll: true, name: "foo", 
removeKeys: []string{"bar"}, cardinality: 1}, - {name: "bar", addKeys: []string{"foo"}, cardinality: 500}, - {name: "baz", addKeys: []string{"foo"}, cardinality: 501}, - {name: "cluster", addKeys: []string{"us"}, cardinality: 250000, lazy: true}, + {addAll: true, name: "foo", removeKeys: []string{"bar"}, cardinality: 1, existentKeys: 1}, + {name: "bar", addKeys: []string{"foo"}, cardinality: 500, existentKeys: 1}, + {name: "baz", addKeys: []string{"foo"}, cardinality: 501, existentKeys: 1}, + {name: "cluster", addKeys: []string{"us"}, cardinality: 250000, existentKeys: 1, lazy: true}, }, }, } { @@ -563,7 +729,8 @@ func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) { testutil.Ok(t, err) ir := newBucketIndexReader(block, logger) dummyCounter := promauto.With(registry).NewCounter(prometheus.CounterOpts{Name: "test"}) - pgs, emptyPosting, err := optimizePostingsFetchByDownloadedBytes(ir, tc.postingGroups, tc.seriesMaxSize, tc.seriesMatchRatio, dummyCounter) + dummyCounterVec := promauto.With(registry).NewCounterVec(prometheus.CounterOpts{Name: "test_counter_vec"}, []string{"reason"}) + pgs, emptyPosting, err := optimizePostingsFetchByDownloadedBytes(ir, tc.postingGroups, tc.seriesMaxSize, tc.seriesMatchRatio, tc.postingGroupMaxKeySeriesRatio, dummyCounter, dummyCounterVec) if err != nil { testutil.Equals(t, tc.expectedError, err.Error()) return @@ -580,3 +747,94 @@ func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) { }) } } + +func TestMergeFetchedPostings(t *testing.T) { + ctx := context.Background() + for _, tc := range []struct { + name string + fetchedPostings []index.Postings + postingGroups []*postingGroup + expectedSeriesRefs []storage.SeriesRef + }{ + { + name: "empty fetched postings and posting groups", + }, + { + name: "single posting group with 1 add key", + fetchedPostings: []index.Postings{index.NewListPostings([]storage.SeriesRef{1, 2, 3, 4, 5})}, + postingGroups: []*postingGroup{ + {name: "foo", addKeys: []string{"bar"}}, + }, + expectedSeriesRefs: []storage.SeriesRef{1, 2, 3, 4, 5}, + }, + { + name: "single posting group with multiple add keys, merge", + fetchedPostings: []index.Postings{ + index.NewListPostings([]storage.SeriesRef{1, 2, 3, 4, 5}), + index.NewListPostings([]storage.SeriesRef{6, 7, 8, 9}), + }, + postingGroups: []*postingGroup{ + {name: "foo", addKeys: []string{"bar", "baz"}}, + }, + expectedSeriesRefs: []storage.SeriesRef{1, 2, 3, 4, 5, 6, 7, 8, 9}, + }, + { + name: "multiple posting groups with add key, intersect", + fetchedPostings: []index.Postings{ + index.NewListPostings([]storage.SeriesRef{1, 2, 3, 4, 5}), + index.NewListPostings([]storage.SeriesRef{1, 2, 4}), + }, + postingGroups: []*postingGroup{ + {name: "foo", addKeys: []string{"bar"}}, + {name: "bar", addKeys: []string{"foo"}}, + }, + expectedSeriesRefs: []storage.SeriesRef{1, 2, 4}, + }, + { + name: "posting group with remove keys", + fetchedPostings: []index.Postings{ + index.NewListPostings([]storage.SeriesRef{1, 2, 3, 4, 5}), + index.NewListPostings([]storage.SeriesRef{1, 2, 4}), + }, + postingGroups: []*postingGroup{ + {name: "foo", addKeys: []string{"bar"}}, + {name: "bar", removeKeys: []string{"foo"}, addAll: true}, + }, + expectedSeriesRefs: []storage.SeriesRef{3, 5}, + }, + { + name: "multiple posting groups with add key and ignore lazy posting groups", + fetchedPostings: []index.Postings{ + index.NewListPostings([]storage.SeriesRef{1, 2, 3, 4, 5}), + }, + postingGroups: []*postingGroup{ + {name: "foo", addKeys: []string{"bar"}}, + {name: "bar", addKeys: 
[]string{"foo"}, lazy: true}, + {name: "baz", addKeys: []string{"foo"}, lazy: true}, + {name: "job", addKeys: []string{"foo"}, lazy: true}, + }, + expectedSeriesRefs: []storage.SeriesRef{1, 2, 3, 4, 5}, + }, + { + name: "multiple posting groups with add key and non consecutive lazy posting groups", + fetchedPostings: []index.Postings{ + index.NewListPostings([]storage.SeriesRef{1, 2, 3, 4, 5}), + index.NewListPostings([]storage.SeriesRef{1, 2, 4}), + }, + postingGroups: []*postingGroup{ + {name: "foo", addKeys: []string{"bar"}}, + {name: "bar", addKeys: []string{"foo"}, lazy: true}, + {name: "baz", addKeys: []string{"foo"}}, + {name: "job", addKeys: []string{"foo"}, lazy: true}, + }, + expectedSeriesRefs: []storage.SeriesRef{1, 2, 4}, + }, + } { + t.Run(tc.name, func(t *testing.T) { + p := mergeFetchedPostings(ctx, tc.fetchedPostings, tc.postingGroups) + res, err := index.ExpandPostings(p) + require.NoError(t, err) + require.Equal(t, tc.expectedSeriesRefs, res) + }) + } +}