From 13ff6a7da2dc3e403f275ae19897b368798cd32d Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Tue, 10 Dec 2024 00:01:41 -0800 Subject: [PATCH] update thanos to latest main to pull lazy posting improvements Signed-off-by: Ben Ye --- go.mod | 2 +- go.sum | 6 +- pkg/storage/tsdb/config.go | 14 +++- pkg/storegateway/bucket_store_metrics.go | 7 ++ pkg/storegateway/bucket_store_metrics_test.go | 12 ++++ pkg/storegateway/bucket_stores.go | 1 + .../thanos-io/thanos/pkg/store/bucket.go | 61 +++++++++++++---- .../thanos/pkg/store/lazy_postings.go | 68 +++++++++++++------ vendor/modules.txt | 2 +- 9 files changed, 132 insertions(+), 41 deletions(-) diff --git a/go.mod b/go.mod index 0c89405290..ab8919571b 100644 --- a/go.mod +++ b/go.mod @@ -53,7 +53,7 @@ require ( github.com/stretchr/testify v1.10.0 github.com/thanos-io/objstore v0.0.0-20241111205755-d1dd89d41f97 github.com/thanos-io/promql-engine v0.0.0-20241203103240-2f49f80c7c68 - github.com/thanos-io/thanos v0.37.2-0.20241205123958-d0d93dbf3efc + github.com/thanos-io/thanos v0.37.2-0.20241210071311-51c7dcd8c278 github.com/uber/jaeger-client-go v2.30.0+incompatible github.com/weaveworks/common v0.0.0-20230728070032-dd9e68f319d5 go.etcd.io/etcd/api/v3 v3.5.17 diff --git a/go.sum b/go.sum index f9c500b6d6..8516f29d23 100644 --- a/go.sum +++ b/go.sum @@ -1502,7 +1502,7 @@ github.com/opencontainers/image-spec v1.1.0 h1:8SG7/vwALn54lVB/0yZ/MMwhFrPYtpEHQ github.com/opencontainers/image-spec v1.1.0/go.mod h1:W4s4sFTMaBeK1BQLXbG4AdM2szdn85PY75RI83NrTrM= github.com/opentracing-contrib/go-grpc v0.1.0 h1:9JHDtQXv6UL0tFF8KJB/4ApJgeOcaHp1h07d0PJjESc= github.com/opentracing-contrib/go-grpc v0.1.0/go.mod h1:i3/jx/TvJZ/HKidtT4XGIi/NosUEpzS9xjVJctbKZzI= -github.com/opentracing-contrib/go-stdlib v1.1.0 h1:cZBWc4pA4e65tqTJddbflK435S0tDImj6c9BMvkdUH0= +github.com/opentracing-contrib/go-stdlib v1.1.0 h1:hSJ8yYaiAO/k2YZUeWJWpQCPE2wRCDtxRnir0gU6wbA= github.com/opentracing-contrib/go-stdlib v1.1.0/go.mod 
h1:S0p+X9p6dcBkoMTL+Qq2VOvxKs9ys5PpYWXWqlCS0bQ= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs= @@ -1665,8 +1665,8 @@ github.com/thanos-io/objstore v0.0.0-20241111205755-d1dd89d41f97 h1:VjG0mwhN1Dkn github.com/thanos-io/objstore v0.0.0-20241111205755-d1dd89d41f97/go.mod h1:vyzFrBXgP+fGNG2FopEGWOO/zrIuoy7zt3LpLeezRsw= github.com/thanos-io/promql-engine v0.0.0-20241203103240-2f49f80c7c68 h1:cChM/FbpXeYmrSmXO1/MmmSlONviLVxWAWCB0/g4JrY= github.com/thanos-io/promql-engine v0.0.0-20241203103240-2f49f80c7c68/go.mod h1:wx0JlRZtsB2S10JYUgeg5GqLfMxw31SzArP+28yyE00= -github.com/thanos-io/thanos v0.37.2-0.20241205123958-d0d93dbf3efc h1:LMpGIErJWqv+9FmCHAcl9t+6VL8gn6lptIKDgglbNnU= -github.com/thanos-io/thanos v0.37.2-0.20241205123958-d0d93dbf3efc/go.mod h1:5Ni7Uc1Bc8UCGOYmZ/2f/LVvDkZKNDdqDJZqjDFG+rI= +github.com/thanos-io/thanos v0.37.2-0.20241210071311-51c7dcd8c278 h1:5MYGbe7gYtPE/DYReOxrevi++3+mgwz5ud9ji/lwXrg= +github.com/thanos-io/thanos v0.37.2-0.20241210071311-51c7dcd8c278/go.mod h1:5Ni7Uc1Bc8UCGOYmZ/2f/LVvDkZKNDdqDJZqjDFG+rI= github.com/tjhop/slog-gokit v0.1.2 h1:pmQI4SvU9h4gA0vIQsdhJQSqQg4mOmsPykG2/PM3j1I= github.com/tjhop/slog-gokit v0.1.2/go.mod h1:8fhlcp8C8ELbg3GCyKv06tgt4B5sDq2P1r2DQAu1HuM= github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go index 612061676c..a48cd3839c 100644 --- a/pkg/storage/tsdb/config.go +++ b/pkg/storage/tsdb/config.go @@ -52,9 +52,10 @@ var ( errEmptyBlockranges = errors.New("empty block ranges for TSDB") errUnSupportedWALCompressionType = errors.New("unsupported WAL compression type, valid types are (zstd, snappy and '')") - ErrInvalidBucketIndexBlockDiscoveryStrategy = errors.New("bucket index block discovery strategy can only be enabled when bucket index is enabled") 
- ErrBlockDiscoveryStrategy = errors.New("invalid block discovery strategy") - ErrInvalidTokenBucketBytesLimiterMode = errors.New("invalid token bucket bytes limiter mode") + ErrInvalidBucketIndexBlockDiscoveryStrategy = errors.New("bucket index block discovery strategy can only be enabled when bucket index is enabled") + ErrBlockDiscoveryStrategy = errors.New("invalid block discovery strategy") + ErrInvalidTokenBucketBytesLimiterMode = errors.New("invalid token bucket bytes limiter mode") + ErrInvalidLazyExpandedPostingGroupMaxKeySeriesRatio = errors.New("lazy expanded posting group max key series ratio needs to be greater than 1.0") ) // BlocksStorageConfig holds the config information for the blocks storage. @@ -291,6 +292,9 @@ type BucketStoreConfig struct { // Controls whether lazy expanded posting optimization is enabled or not. LazyExpandedPostingsEnabled bool `yaml:"lazy_expanded_postings_enabled"` + // Controls whether expanded posting group is marked as lazy or not depending on number of keys to fetch. + LazyExpandedPostingGroupMaxKeySeriesRatio float64 `yaml:"lazy_expanded_posting_group_max_key_series_ratio"` + // Controls the partitioner, used to aggregate multiple GET object API requests. // The config option is hidden until experimental. PartitionerMaxGapBytes uint64 `yaml:"partitioner_max_gap_bytes" doc:"hidden"` @@ -356,6 +360,7 @@ func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) { f.Uint64Var(&cfg.EstimatedMaxSeriesSizeBytes, "blocks-storage.bucket-store.estimated-max-series-size-bytes", store.EstimatedMaxSeriesSize, "Estimated max series size in bytes. Setting a large value might result in over fetching data while a small value might result in data refetch. Default value is 64KB.") f.Uint64Var(&cfg.EstimatedMaxChunkSizeBytes, "blocks-storage.bucket-store.estimated-max-chunk-size-bytes", store.EstimatedMaxChunkSize, "Estimated max chunk size in bytes. 
Setting a large value might result in over fetching data while a small value might result in data refetch. Default value is 16KiB.") f.BoolVar(&cfg.LazyExpandedPostingsEnabled, "blocks-storage.bucket-store.lazy-expanded-postings-enabled", false, "If true, Store Gateway will estimate postings size and try to lazily expand postings if it downloads less data than expanding all postings.") + f.Float64Var(&cfg.LazyExpandedPostingGroupMaxKeySeriesRatio, "blocks-storage.bucket-store.lazy-expanded-posting-group-max-key-series-ratio", 100, "Mark posting group as lazy if it fetches more keys than R * max series the query should fetch. With R set to 100, a posting group which fetches 100K keys will be marked as lazy if the current query only fetches 1000 series. This config is only valid if lazy expanded posting is enabled. 0 disables the limit.") f.IntVar(&cfg.SeriesBatchSize, "blocks-storage.bucket-store.series-batch-size", store.SeriesBatchSize, "Controls how many series to fetch per batch in Store Gateway. Default value is 10000.") f.StringVar(&cfg.BlockDiscoveryStrategy, "blocks-storage.bucket-store.block-discovery-strategy", string(ConcurrentDiscovery), "One of "+strings.Join(supportedBlockDiscoveryStrategies, ", ")+". When set to concurrent, stores will concurrently issue one call per directory to discover active blocks in the bucket. The recursive strategy iterates through all objects in the bucket, recursively traversing into each directory. This avoids N+1 calls at the expense of having slower bucket iterations. bucket_index strategy can be used in Compactor only and utilizes the existing bucket index to fetch block IDs to sync. This avoids iterating the bucket but can be impacted by delays of cleaner creating bucket index.") f.StringVar(&cfg.TokenBucketBytesLimiter.Mode, "blocks-storage.bucket-store.token-bucket-bytes-limiter.mode", string(TokenBucketBytesLimiterDisabled), fmt.Sprintf("Token bucket bytes limiter mode. 
Supported values are: %s", strings.Join(supportedTokenBucketBytesLimiterModes, ", "))) @@ -390,6 +395,9 @@ func (cfg *BucketStoreConfig) Validate() error { if !util.StringsContain(supportedTokenBucketBytesLimiterModes, cfg.TokenBucketBytesLimiter.Mode) { return ErrInvalidTokenBucketBytesLimiterMode } + if cfg.LazyExpandedPostingGroupMaxKeySeriesRatio != 0 && cfg.LazyExpandedPostingGroupMaxKeySeriesRatio <= 1.0 { + return ErrInvalidLazyExpandedPostingGroupMaxKeySeriesRatio + } return nil } diff --git a/pkg/storegateway/bucket_store_metrics.go b/pkg/storegateway/bucket_store_metrics.go index f351940bcf..8319e6a70f 100644 --- a/pkg/storegateway/bucket_store_metrics.go +++ b/pkg/storegateway/bucket_store_metrics.go @@ -46,6 +46,7 @@ type BucketStoreMetrics struct { chunkFetchDurationSum *prometheus.Desc lazyExpandedPostingsCount *prometheus.Desc + lazyExpandedPostingGroups *prometheus.Desc lazyExpandedPostingSizeBytes *prometheus.Desc lazyExpandedPostingSeriesOverfetchedSizeBytes *prometheus.Desc @@ -209,6 +210,10 @@ func NewBucketStoreMetrics() *BucketStoreMetrics { "cortex_bucket_store_lazy_expanded_postings_total", "Total number of lazy expanded postings when fetching block series.", nil, nil), + lazyExpandedPostingGroups: prometheus.NewDesc( + "cortex_bucket_store_lazy_expanded_posting_groups_total", + "Total number of posting groups that are marked as lazy and corresponding reason.", + []string{"reason"}, nil), lazyExpandedPostingSizeBytes: prometheus.NewDesc( "cortex_bucket_store_lazy_expanded_posting_size_bytes_total", "Total number of lazy posting group size in bytes.", @@ -269,6 +274,7 @@ func (m *BucketStoreMetrics) Describe(out chan<- *prometheus.Desc) { out <- m.indexHeaderLazyLoadDuration out <- m.lazyExpandedPostingsCount + out <- m.lazyExpandedPostingGroups out <- m.lazyExpandedPostingSizeBytes out <- m.lazyExpandedPostingSeriesOverfetchedSizeBytes } @@ -319,6 +325,7 @@ func (m *BucketStoreMetrics) Collect(out chan<- prometheus.Metric) { data.SendSumOfHistograms(out, m.indexHeaderLazyLoadDuration,
"thanos_bucket_store_indexheader_lazy_load_duration_seconds") data.SendSumOfCounters(out, m.lazyExpandedPostingsCount, "thanos_bucket_store_lazy_expanded_postings_total") + data.SendSumOfCountersWithLabels(out, m.lazyExpandedPostingGroups, "thanos_bucket_store_lazy_expanded_posting_groups_total", "reason") data.SendSumOfCounters(out, m.lazyExpandedPostingSizeBytes, "thanos_bucket_store_lazy_expanded_posting_size_bytes_total") data.SendSumOfCounters(out, m.lazyExpandedPostingSeriesOverfetchedSizeBytes, "thanos_bucket_store_lazy_expanded_posting_series_overfetched_size_bytes_total") } diff --git a/pkg/storegateway/bucket_store_metrics_test.go b/pkg/storegateway/bucket_store_metrics_test.go index 650a015a49..079f2017f0 100644 --- a/pkg/storegateway/bucket_store_metrics_test.go +++ b/pkg/storegateway/bucket_store_metrics_test.go @@ -543,6 +543,10 @@ func TestBucketStoreMetrics(t *testing.T) { # HELP cortex_bucket_store_indexheader_lazy_unload_total Total number of index-header lazy unload operations. # TYPE cortex_bucket_store_indexheader_lazy_unload_total counter cortex_bucket_store_indexheader_lazy_unload_total 1.396178e+06 + # HELP cortex_bucket_store_lazy_expanded_posting_groups_total Total number of posting groups that are marked as lazy and corresponding reason. + # TYPE cortex_bucket_store_lazy_expanded_posting_groups_total counter + cortex_bucket_store_lazy_expanded_posting_groups_total{reason="keys_limit"} 202671 + cortex_bucket_store_lazy_expanded_posting_groups_total{reason="postings_size"} 225190 # HELP cortex_bucket_store_lazy_expanded_posting_series_overfetched_size_bytes_total Total number of series size in bytes overfetched due to posting lazy expansion. 
# TYPE cortex_bucket_store_lazy_expanded_posting_series_overfetched_size_bytes_total counter cortex_bucket_store_lazy_expanded_posting_series_overfetched_size_bytes_total 180152 @@ -687,6 +691,8 @@ func populateMockedBucketStoreMetrics(base float64) *prometheus.Registry { m.lazyExpandedPostingsCount.Add(6 * base) m.lazyExpandedPostingSizeBytes.Add(7 * base) m.lazyExpandedPostingSeriesOverfetchedSizeBytes.Add(8 * base) + m.lazyExpandedPostingGroups.WithLabelValues("keys_limit").Add(9 * base) + m.lazyExpandedPostingGroups.WithLabelValues("postings_size").Add(10 * base) return reg } @@ -733,6 +739,7 @@ type mockedBucketStoreMetrics struct { indexHeaderLazyLoadDuration prometheus.Histogram lazyExpandedPostingsCount prometheus.Counter + lazyExpandedPostingGroups *prometheus.CounterVec lazyExpandedPostingSizeBytes prometheus.Counter lazyExpandedPostingSeriesOverfetchedSizeBytes prometheus.Counter } @@ -917,6 +924,11 @@ func newMockedBucketStoreMetrics(reg prometheus.Registerer) *mockedBucketStoreMe Help: "Total number of times when lazy expanded posting optimization applies.", }) + m.lazyExpandedPostingGroups = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "thanos_bucket_store_lazy_expanded_posting_groups_total", + Help: "Total number of posting groups that are marked as lazy and corresponding reason.", + }, []string{"reason"}) + m.lazyExpandedPostingSizeBytes = promauto.With(reg).NewCounter(prometheus.CounterOpts{ Name: "thanos_bucket_store_lazy_expanded_posting_size_bytes_total", Help: "Total number of lazy posting group size in bytes.", diff --git a/pkg/storegateway/bucket_stores.go b/pkg/storegateway/bucket_stores.go index 003fc91b03..5b6a3e07b8 100644 --- a/pkg/storegateway/bucket_stores.go +++ b/pkg/storegateway/bucket_stores.go @@ -625,6 +625,7 @@ func (u *BucketStores) getOrCreateStore(userID string) (*store.BucketStore, erro return u.cfg.BucketStore.EstimatedMaxSeriesSizeBytes }), 
store.WithLazyExpandedPostings(u.cfg.BucketStore.LazyExpandedPostingsEnabled), + store.WithPostingGroupMaxKeySeriesRatio(u.cfg.BucketStore.LazyExpandedPostingGroupMaxKeySeriesRatio), store.WithDontResort(true), // Cortex doesn't need to resort series in store gateway. } if u.logLevel.String() == "debug" { diff --git a/vendor/github.com/thanos-io/thanos/pkg/store/bucket.go b/vendor/github.com/thanos-io/thanos/pkg/store/bucket.go index d9940221ff..1c434f503f 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/store/bucket.go +++ b/vendor/github.com/thanos-io/thanos/pkg/store/bucket.go @@ -151,6 +151,7 @@ type bucketStoreMetrics struct { emptyPostingCount *prometheus.CounterVec lazyExpandedPostingsCount prometheus.Counter + lazyExpandedPostingGroupsByReason *prometheus.CounterVec lazyExpandedPostingSizeBytes prometheus.Counter lazyExpandedPostingSeriesOverfetchedSizeBytes prometheus.Counter @@ -345,6 +346,11 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics { Help: "Total number of times when lazy expanded posting optimization applies.", }) + m.lazyExpandedPostingGroupsByReason = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "thanos_bucket_store_lazy_expanded_posting_groups_total", + Help: "Total number of posting groups that are marked as lazy and corresponding reason", + }, []string{"reason"}) + m.lazyExpandedPostingSizeBytes = promauto.With(reg).NewCounter(prometheus.CounterOpts{ Name: "thanos_bucket_store_lazy_expanded_posting_size_bytes_total", Help: "Total number of lazy posting group size in bytes.", @@ -419,7 +425,8 @@ type BucketStore struct { enableChunkHashCalculation bool - enabledLazyExpandedPostings bool + enabledLazyExpandedPostings bool + postingGroupMaxKeySeriesRatio float64 sortingStrategy sortingStrategy @@ -552,6 +559,13 @@ func WithLazyExpandedPostings(enabled bool) BucketStoreOption { } } +// WithPostingGroupMaxKeySeriesRatio configures a threshold to mark a posting group as lazy if it has more add 
keys. +func WithPostingGroupMaxKeySeriesRatio(postingGroupMaxKeySeriesRatio float64) BucketStoreOption { + return func(s *BucketStore) { + s.postingGroupMaxKeySeriesRatio = postingGroupMaxKeySeriesRatio + } +} + // WithDontResort disables series resorting in Store Gateway. func WithDontResort(true bool) BucketStoreOption { return func(s *BucketStore) { @@ -1002,8 +1016,11 @@ type blockSeriesClient struct { chunksLimiter ChunksLimiter bytesLimiter BytesLimiter - lazyExpandedPostingEnabled bool + lazyExpandedPostingEnabled bool + // Mark posting group as lazy if it adds too many keys. 0 to disable. + postingGroupMaxKeySeriesRatio float64 lazyExpandedPostingsCount prometheus.Counter + lazyExpandedPostingGroupByReason *prometheus.CounterVec lazyExpandedPostingSizeBytes prometheus.Counter lazyExpandedPostingSeriesOverfetchedSizeBytes prometheus.Counter @@ -1046,7 +1063,9 @@ func newBlockSeriesClient( chunkFetchDurationSum *prometheus.HistogramVec, extLsetToRemove map[string]struct{}, lazyExpandedPostingEnabled bool, + postingGroupMaxKeySeriesRatio float64, lazyExpandedPostingsCount prometheus.Counter, + lazyExpandedPostingByReason *prometheus.CounterVec, lazyExpandedPostingSizeBytes prometheus.Counter, lazyExpandedPostingSeriesOverfetchedSizeBytes prometheus.Counter, tenant string, @@ -1081,7 +1100,9 @@ func newBlockSeriesClient( chunkFetchDurationSum: chunkFetchDurationSum, lazyExpandedPostingEnabled: lazyExpandedPostingEnabled, + postingGroupMaxKeySeriesRatio: postingGroupMaxKeySeriesRatio, lazyExpandedPostingsCount: lazyExpandedPostingsCount, + lazyExpandedPostingGroupByReason: lazyExpandedPostingByReason, lazyExpandedPostingSizeBytes: lazyExpandedPostingSizeBytes, lazyExpandedPostingSeriesOverfetchedSizeBytes: lazyExpandedPostingSeriesOverfetchedSizeBytes, @@ -1133,7 +1154,7 @@ func (b *blockSeriesClient) ExpandPostings( matchers sortedMatchers, seriesLimiter SeriesLimiter, ) error { - ps, err := b.indexr.ExpandedPostings(b.ctx, matchers, b.bytesLimiter, 
b.lazyExpandedPostingEnabled, b.lazyExpandedPostingSizeBytes, b.tenant) + ps, err := b.indexr.ExpandedPostings(b.ctx, matchers, b.bytesLimiter, b.lazyExpandedPostingEnabled, b.postingGroupMaxKeySeriesRatio, b.lazyExpandedPostingSizeBytes, b.lazyExpandedPostingGroupByReason, b.tenant) if err != nil { return errors.Wrap(err, "expanded matching posting") } @@ -1566,7 +1587,9 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store s.metrics.chunkFetchDurationSum, extLsetToRemove, s.enabledLazyExpandedPostings, + s.postingGroupMaxKeySeriesRatio, s.metrics.lazyExpandedPostingsCount, + s.metrics.lazyExpandedPostingGroupsByReason, s.metrics.lazyExpandedPostingSizeBytes, s.metrics.lazyExpandedPostingSeriesOverfetchedSizeBytes, tenant, @@ -1880,7 +1903,9 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq nil, extLsetToRemove, s.enabledLazyExpandedPostings, + s.postingGroupMaxKeySeriesRatio, s.metrics.lazyExpandedPostingsCount, + s.metrics.lazyExpandedPostingGroupsByReason, s.metrics.lazyExpandedPostingSizeBytes, s.metrics.lazyExpandedPostingSeriesOverfetchedSizeBytes, tenant, @@ -2106,7 +2131,9 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR nil, nil, s.enabledLazyExpandedPostings, + s.postingGroupMaxKeySeriesRatio, s.metrics.lazyExpandedPostingsCount, + s.metrics.lazyExpandedPostingGroupsByReason, s.metrics.lazyExpandedPostingSizeBytes, s.metrics.lazyExpandedPostingSeriesOverfetchedSizeBytes, tenant, @@ -2563,7 +2590,16 @@ func (r *bucketIndexReader) reset(size int) { // Reminder: A posting is a reference (represented as a uint64) to a series reference, which in turn points to the first // chunk where the series contains the matching label-value pair for a given block of data. Postings can be fetched by // single label name=value. 
-func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms sortedMatchers, bytesLimiter BytesLimiter, lazyExpandedPostingEnabled bool, lazyExpandedPostingSizeBytes prometheus.Counter, tenant string) (*lazyExpandedPostings, error) { +func (r *bucketIndexReader) ExpandedPostings( + ctx context.Context, + ms sortedMatchers, + bytesLimiter BytesLimiter, + lazyExpandedPostingEnabled bool, + postingGroupMaxKeySeriesRatio float64, + lazyExpandedPostingSizeBytes prometheus.Counter, + lazyExpandedPostingGroupsByReason *prometheus.CounterVec, + tenant string, +) (*lazyExpandedPostings, error) { // Shortcut the case of `len(postingGroups) == 0`. It will only happen when no // matchers specified, and we don't need to fetch expanded postings from cache. if len(ms) == 0 { @@ -2615,7 +2651,7 @@ func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms sortedMatch postingGroups = append(postingGroups, newPostingGroup(true, name, []string{value}, nil)) } - ps, err := fetchLazyExpandedPostings(ctx, postingGroups, r, bytesLimiter, addAllPostings, lazyExpandedPostingEnabled, lazyExpandedPostingSizeBytes, tenant) + ps, err := fetchLazyExpandedPostings(ctx, postingGroups, r, bytesLimiter, addAllPostings, lazyExpandedPostingEnabled, postingGroupMaxKeySeriesRatio, lazyExpandedPostingSizeBytes, lazyExpandedPostingGroupsByReason, tenant) if err != nil { return nil, errors.Wrap(err, "fetch and expand postings") } @@ -2661,13 +2697,14 @@ func ExpandPostingsWithContext(ctx context.Context, p index.Postings) ([]storage // If addAll is not set: Merge of postings for "addKeys" labels minus postings for removeKeys labels // This computation happens in ExpandedPostings. 
type postingGroup struct { - addAll bool - name string - matchers []*labels.Matcher - addKeys []string - removeKeys []string - cardinality int64 - lazy bool + addAll bool + name string + matchers []*labels.Matcher + addKeys []string + removeKeys []string + cardinality int64 + existentKeys int + lazy bool } func newPostingGroup(addAll bool, name string, addKeys, removeKeys []string) *postingGroup { diff --git a/vendor/github.com/thanos-io/thanos/pkg/store/lazy_postings.go b/vendor/github.com/thanos-io/thanos/pkg/store/lazy_postings.go index f8363ab477..81b977f5d3 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/store/lazy_postings.go +++ b/vendor/github.com/thanos-io/thanos/pkg/store/lazy_postings.go @@ -39,7 +39,15 @@ func (p *lazyExpandedPostings) lazyExpanded() bool { return p != nil && len(p.matchers) > 0 } -func optimizePostingsFetchByDownloadedBytes(r *bucketIndexReader, postingGroups []*postingGroup, seriesMaxSize int64, seriesMatchRatio float64, lazyExpandedPostingSizeBytes prometheus.Counter) ([]*postingGroup, bool, error) { +func optimizePostingsFetchByDownloadedBytes( + r *bucketIndexReader, + postingGroups []*postingGroup, + seriesMaxSize int64, + seriesMatchRatio float64, + postingGroupMaxKeySeriesRatio float64, + lazyExpandedPostingSizeBytes prometheus.Counter, + lazyExpandedPostingGroupsByReason *prometheus.CounterVec, +) ([]*postingGroup, bool, error) { if len(postingGroups) <= 1 { return postingGroups, false, nil } @@ -55,6 +63,7 @@ func optimizePostingsFetchByDownloadedBytes(r *bucketIndexReader, postingGroups return nil, false, errors.Wrapf(err, "postings offsets for %s", pg.name) } + existentKeys := 0 for _, rng := range rngs { if rng == indexheader.NotFoundRange { continue @@ -63,14 +72,16 @@ func optimizePostingsFetchByDownloadedBytes(r *bucketIndexReader, postingGroups level.Error(r.logger).Log("msg", "invalid index range, fallback to non lazy posting optimization") return postingGroups, false, nil } + existentKeys++ // Each range starts 
from the #entries field which is 4 bytes. // Need to subtract it when calculating number of postings. // https://github.com/prometheus/prometheus/blob/v2.46.0/tsdb/docs/format/index.md. pg.cardinality += (rng.End - rng.Start - 4) / 4 } + pg.existentKeys = existentKeys // If the posting group adds keys, 0 cardinality means the posting doesn't exist. // If the posting group removes keys, no posting ranges found is fine as it is a noop. - if len(pg.addKeys) > 0 && pg.cardinality == 0 { + if len(pg.addKeys) > 0 && pg.existentKeys == 0 { return nil, true, nil } } @@ -142,6 +153,7 @@ func optimizePostingsFetchByDownloadedBytes(r *bucketIndexReader, postingGroups // Assume only seriesMatchRatio postings will be matched every posting group. seriesMatched := postingGroups[i].cardinality - int64(math.Ceil(float64(negativeCardinalities)*seriesMatchRatio)) + maxSeriesMatched := seriesMatched i++ // Start from next posting group as we always need to fetch at least one posting group with add keys. for i < len(postingGroups) { pg := postingGroups[i] @@ -165,6 +177,13 @@ func optimizePostingsFetchByDownloadedBytes(r *bucketIndexReader, postingGroups seriesMatched -= underfetchedSeries underfetchedSeriesSize = underfetchedSeries * seriesMaxSize } else { + // Only mark posting group as lazy due to too many keys when those keys are known to be existent. 
+ if postingGroupMaxKeySeriesRatio > 0 && maxSeriesMatched > 0 && + float64(pg.existentKeys)/float64(maxSeriesMatched) > postingGroupMaxKeySeriesRatio { + markPostingGroupLazy(pg, "keys_limit", lazyExpandedPostingSizeBytes, lazyExpandedPostingGroupsByReason) + i++ + continue + } underfetchedSeriesSize = seriesMaxSize * int64(math.Ceil(float64(seriesMatched)*(1-seriesMatchRatio))) seriesMatched = int64(math.Ceil(float64(seriesMatched) * seriesMatchRatio)) } @@ -176,13 +195,18 @@ func optimizePostingsFetchByDownloadedBytes(r *bucketIndexReader, postingGroups i++ } for i < len(postingGroups) { - postingGroups[i].lazy = true - lazyExpandedPostingSizeBytes.Add(float64(4 * postingGroups[i].cardinality)) + markPostingGroupLazy(postingGroups[i], "postings_size", lazyExpandedPostingSizeBytes, lazyExpandedPostingGroupsByReason) i++ } return postingGroups, false, nil } +func markPostingGroupLazy(pg *postingGroup, reason string, lazyExpandedPostingSizeBytes prometheus.Counter, lazyExpandedPostingGroupsByReason *prometheus.CounterVec) { + pg.lazy = true + lazyExpandedPostingSizeBytes.Add(float64(4 * pg.cardinality)) + lazyExpandedPostingGroupsByReason.WithLabelValues(reason).Inc() +} + func fetchLazyExpandedPostings( ctx context.Context, postingGroups []*postingGroup, @@ -190,7 +214,9 @@ func fetchLazyExpandedPostings( bytesLimiter BytesLimiter, addAllPostings bool, lazyExpandedPostingEnabled bool, + postingGroupMaxKeySeriesRatio float64, lazyExpandedPostingSizeBytes prometheus.Counter, + lazyExpandedPostingGroupsByReason *prometheus.CounterVec, tenant string, ) (*lazyExpandedPostings, error) { var ( @@ -212,7 +238,9 @@ func fetchLazyExpandedPostings( postingGroups, int64(r.block.estimatedMaxSeriesSize), 0.5, // TODO(yeya24): Expose this as a flag. 
+ postingGroupMaxKeySeriesRatio, lazyExpandedPostingSizeBytes, + lazyExpandedPostingGroupsByReason, ) if err != nil { return nil, err @@ -243,27 +271,25 @@ func keysToFetchFromPostingGroups(postingGroups []*postingGroup) ([]labels.Label for i < len(postingGroups) { pg := postingGroups[i] if pg.lazy { - break + if len(lazyMatchers) == 0 { + lazyMatchers = make([]*labels.Matcher, 0) + } + lazyMatchers = append(lazyMatchers, postingGroups[i].matchers...) + } else { + // Postings returned by fetchPostings will be in the same order as keys + // so it's important that we iterate them in the same order later. + // We don't have any other way of pairing keys and fetched postings. + for _, key := range pg.addKeys { + keys = append(keys, labels.Label{Name: pg.name, Value: key}) + } + for _, key := range pg.removeKeys { + keys = append(keys, labels.Label{Name: pg.name, Value: key}) + } } - // Postings returned by fetchPostings will be in the same order as keys - // so it's important that we iterate them in the same order later. - // We don't have any other way of pairing keys and fetched postings. - for _, key := range pg.addKeys { - keys = append(keys, labels.Label{Name: pg.name, Value: key}) - } - for _, key := range pg.removeKeys { - keys = append(keys, labels.Label{Name: pg.name, Value: key}) - } i++ } - if i < len(postingGroups) { - lazyMatchers = make([]*labels.Matcher, 0) - for i < len(postingGroups) { - lazyMatchers = append(lazyMatchers, postingGroups[i].matchers...) 
- i++ - } - } + return keys, lazyMatchers } diff --git a/vendor/modules.txt b/vendor/modules.txt index 590d80fbb5..fb01be4153 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -990,7 +990,7 @@ github.com/thanos-io/promql-engine/query github.com/thanos-io/promql-engine/ringbuffer github.com/thanos-io/promql-engine/storage github.com/thanos-io/promql-engine/storage/prometheus -# github.com/thanos-io/thanos v0.37.2-0.20241205123958-d0d93dbf3efc +# github.com/thanos-io/thanos v0.37.2-0.20241210071311-51c7dcd8c278 ## explicit; go 1.23.0 github.com/thanos-io/thanos/pkg/api/query/querypb github.com/thanos-io/thanos/pkg/block