Skip to content

Commit

Permalink
change to use max key series ratio
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Ye <[email protected]>
  • Loading branch information
yeya24 committed Dec 9, 2024
1 parent 530a833 commit 61beef9
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 93 deletions.
76 changes: 38 additions & 38 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,41 +67,41 @@ const (
)

type storeConfig struct {
indexCacheConfigs extflag.PathOrContent
objStoreConfig extflag.PathOrContent
dataDir string
cacheIndexHeader bool
grpcConfig grpcConfig
httpConfig httpConfig
indexCacheSizeBytes units.Base2Bytes
chunkPoolSize units.Base2Bytes
estimatedMaxSeriesSize uint64
estimatedMaxChunkSize uint64
seriesBatchSize int
storeRateLimits store.SeriesSelectLimits
maxDownloadedBytes units.Base2Bytes
maxConcurrency int
component component.StoreAPI
debugLogging bool
syncInterval time.Duration
blockListStrategy string
blockSyncConcurrency int
blockMetaFetchConcurrency int
filterConf *store.FilterConfig
selectorRelabelConf extflag.PathOrContent
advertiseCompatibilityLabel bool
consistencyDelay commonmodel.Duration
ignoreDeletionMarksDelay commonmodel.Duration
disableWeb bool
webConfig webConfig
label string
postingOffsetsInMemSampling int
cachingBucketConfig extflag.PathOrContent
reqLogConfig *extflag.PathOrContent
lazyIndexReaderEnabled bool
lazyIndexReaderIdleTimeout time.Duration
lazyExpandedPostingsEnabled bool
PostingGroupMaxKeys int
indexCacheConfigs extflag.PathOrContent
objStoreConfig extflag.PathOrContent
dataDir string
cacheIndexHeader bool
grpcConfig grpcConfig
httpConfig httpConfig
indexCacheSizeBytes units.Base2Bytes
chunkPoolSize units.Base2Bytes
estimatedMaxSeriesSize uint64
estimatedMaxChunkSize uint64
seriesBatchSize int
storeRateLimits store.SeriesSelectLimits
maxDownloadedBytes units.Base2Bytes
maxConcurrency int
component component.StoreAPI
debugLogging bool
syncInterval time.Duration
blockListStrategy string
blockSyncConcurrency int
blockMetaFetchConcurrency int
filterConf *store.FilterConfig
selectorRelabelConf extflag.PathOrContent
advertiseCompatibilityLabel bool
consistencyDelay commonmodel.Duration
ignoreDeletionMarksDelay commonmodel.Duration
disableWeb bool
webConfig webConfig
label string
postingOffsetsInMemSampling int
cachingBucketConfig extflag.PathOrContent
reqLogConfig *extflag.PathOrContent
lazyIndexReaderEnabled bool
lazyIndexReaderIdleTimeout time.Duration
lazyExpandedPostingsEnabled bool
postingGroupMaxKeySeriesRatio float64

indexHeaderLazyDownloadStrategy string
}
Expand Down Expand Up @@ -205,8 +205,8 @@ func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) {
cmd.Flag("store.enable-lazy-expanded-postings", "If true, Store Gateway will estimate postings size and try to lazily expand postings if it downloads less data than expanding all postings.").
Default("false").BoolVar(&sc.lazyExpandedPostingsEnabled)

cmd.Flag("store.posting-group-max-keys", "Mark posting group as lazy if it fetches more keys than the configured number. Only valid if lazy expanded posting is enabled. 0 disables the limit.").
Default("0").IntVar(&sc.PostingGroupMaxKeys)
cmd.Flag("store.posting-group-max-key-series-ratio", "Mark posting group as lazy if it fetches more keys than R * max series the query should fetch. With R set to 100, a posting group which fetches 100K keys will be marked as lazy if the current query only fetches 1000 series. This config is only valid if lazy expanded posting is enabled. 0 disables the limit.").
Default("100").Float64Var(&sc.postingGroupMaxKeySeriesRatio)

cmd.Flag("store.index-header-lazy-download-strategy", "Strategy of how to download index headers lazily. Supported values: eager, lazy. If eager, always download index header during initial load. If lazy, download index header during query time.").
Default(string(indexheader.EagerDownloadStrategy)).
Expand Down Expand Up @@ -433,7 +433,7 @@ func runStore(
return conf.estimatedMaxChunkSize
}),
store.WithLazyExpandedPostings(conf.lazyExpandedPostingsEnabled),
store.WithPostingGroupMaxKeys(conf.PostingGroupMaxKeys),
store.WithPostingGroupMaxKeySeriesRatio(conf.postingGroupMaxKeySeriesRatio),
store.WithIndexHeaderLazyDownloadStrategy(
indexheader.IndexHeaderLazyDownloadStrategy(conf.indexHeaderLazyDownloadStrategy).StrategyToDownloadFunc(),
),
Expand Down
28 changes: 14 additions & 14 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,8 +425,8 @@ type BucketStore struct {

enableChunkHashCalculation bool

enabledLazyExpandedPostings bool
postingGroupMaxKeys int
enabledLazyExpandedPostings bool
postingGroupMaxKeySeriesRatio float64

sortingStrategy sortingStrategy

Expand Down Expand Up @@ -559,10 +559,10 @@ func WithLazyExpandedPostings(enabled bool) BucketStoreOption {
}
}

// WithPostingGroupMaxKeys configures a threshold to mark a posting group as lazy if it has more add keys.
func WithPostingGroupMaxKeys(postingGroupMaxKeys int) BucketStoreOption {
// WithPostingGroupMaxKeySeriesRatio configures a threshold to mark a posting group as lazy if it has more add keys.
func WithPostingGroupMaxKeySeriesRatio(postingGroupMaxKeySeriesRatio float64) BucketStoreOption {
return func(s *BucketStore) {
s.postingGroupMaxKeys = postingGroupMaxKeys
s.postingGroupMaxKeySeriesRatio = postingGroupMaxKeySeriesRatio
}
}

Expand Down Expand Up @@ -1018,7 +1018,7 @@ type blockSeriesClient struct {

lazyExpandedPostingEnabled bool
// Mark posting group as lazy if it adds too many keys. 0 to disable.
postingGroupMaxKeys int
postingGroupMaxKeySeriesRatio float64
lazyExpandedPostingsCount prometheus.Counter
lazyExpandedPostingGroupByReason *prometheus.CounterVec
lazyExpandedPostingSizeBytes prometheus.Counter
Expand Down Expand Up @@ -1063,7 +1063,7 @@ func newBlockSeriesClient(
chunkFetchDurationSum *prometheus.HistogramVec,
extLsetToRemove map[string]struct{},
lazyExpandedPostingEnabled bool,
postingGroupMaxKeys int,
postingGroupMaxKeySeriesRatio float64,
lazyExpandedPostingsCount prometheus.Counter,
lazyExpandedPostingByReason *prometheus.CounterVec,
lazyExpandedPostingSizeBytes prometheus.Counter,
Expand Down Expand Up @@ -1100,7 +1100,7 @@ func newBlockSeriesClient(
chunkFetchDurationSum: chunkFetchDurationSum,

lazyExpandedPostingEnabled: lazyExpandedPostingEnabled,
postingGroupMaxKeys: postingGroupMaxKeys,
postingGroupMaxKeySeriesRatio: postingGroupMaxKeySeriesRatio,
lazyExpandedPostingsCount: lazyExpandedPostingsCount,
lazyExpandedPostingGroupByReason: lazyExpandedPostingByReason,
lazyExpandedPostingSizeBytes: lazyExpandedPostingSizeBytes,
Expand Down Expand Up @@ -1154,7 +1154,7 @@ func (b *blockSeriesClient) ExpandPostings(
matchers sortedMatchers,
seriesLimiter SeriesLimiter,
) error {
ps, err := b.indexr.ExpandedPostings(b.ctx, matchers, b.bytesLimiter, b.lazyExpandedPostingEnabled, b.postingGroupMaxKeys, b.lazyExpandedPostingSizeBytes, b.lazyExpandedPostingGroupByReason, b.tenant)
ps, err := b.indexr.ExpandedPostings(b.ctx, matchers, b.bytesLimiter, b.lazyExpandedPostingEnabled, b.postingGroupMaxKeySeriesRatio, b.lazyExpandedPostingSizeBytes, b.lazyExpandedPostingGroupByReason, b.tenant)
if err != nil {
return errors.Wrap(err, "expanded matching posting")
}
Expand Down Expand Up @@ -1587,7 +1587,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store
s.metrics.chunkFetchDurationSum,
extLsetToRemove,
s.enabledLazyExpandedPostings,
s.postingGroupMaxKeys,
s.postingGroupMaxKeySeriesRatio,
s.metrics.lazyExpandedPostingsCount,
s.metrics.lazyExpandedPostingGroupsByReason,
s.metrics.lazyExpandedPostingSizeBytes,
Expand Down Expand Up @@ -1903,7 +1903,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
nil,
extLsetToRemove,
s.enabledLazyExpandedPostings,
s.postingGroupMaxKeys,
s.postingGroupMaxKeySeriesRatio,
s.metrics.lazyExpandedPostingsCount,
s.metrics.lazyExpandedPostingGroupsByReason,
s.metrics.lazyExpandedPostingSizeBytes,
Expand Down Expand Up @@ -2131,7 +2131,7 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
nil,
nil,
s.enabledLazyExpandedPostings,
s.postingGroupMaxKeys,
s.postingGroupMaxKeySeriesRatio,
s.metrics.lazyExpandedPostingsCount,
s.metrics.lazyExpandedPostingGroupsByReason,
s.metrics.lazyExpandedPostingSizeBytes,
Expand Down Expand Up @@ -2595,7 +2595,7 @@ func (r *bucketIndexReader) ExpandedPostings(
ms sortedMatchers,
bytesLimiter BytesLimiter,
lazyExpandedPostingEnabled bool,
postingGroupMaxKeys int,
postingGroupMaxKeySeriesRatio float64,
lazyExpandedPostingSizeBytes prometheus.Counter,
lazyExpandedPostingGroupsByReason *prometheus.CounterVec,
tenant string,
Expand Down Expand Up @@ -2651,7 +2651,7 @@ func (r *bucketIndexReader) ExpandedPostings(
postingGroups = append(postingGroups, newPostingGroup(true, name, []string{value}, nil))
}

ps, err := fetchLazyExpandedPostings(ctx, postingGroups, r, bytesLimiter, addAllPostings, lazyExpandedPostingEnabled, postingGroupMaxKeys, lazyExpandedPostingSizeBytes, lazyExpandedPostingGroupsByReason, tenant)
ps, err := fetchLazyExpandedPostings(ctx, postingGroups, r, bytesLimiter, addAllPostings, lazyExpandedPostingEnabled, postingGroupMaxKeySeriesRatio, lazyExpandedPostingSizeBytes, lazyExpandedPostingGroupsByReason, tenant)
if err != nil {
return nil, errors.Wrap(err, "fetch and expand postings")
}
Expand Down
9 changes: 5 additions & 4 deletions pkg/store/lazy_postings.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func optimizePostingsFetchByDownloadedBytes(
postingGroups []*postingGroup,
seriesMaxSize int64,
seriesMatchRatio float64,
postingGroupMaxKeys int,
postingGroupMaxKeySeriesRatio float64,
lazyExpandedPostingSizeBytes prometheus.Counter,
lazyExpandedPostingGroupsByReason *prometheus.CounterVec,
) ([]*postingGroup, bool, error) {
Expand Down Expand Up @@ -153,6 +153,7 @@ func optimizePostingsFetchByDownloadedBytes(

// Assume only seriesMatchRatio postings will be matched every posting group.
seriesMatched := postingGroups[i].cardinality - int64(math.Ceil(float64(negativeCardinalities)*seriesMatchRatio))
maxSeriesMatched := seriesMatched
i++ // Start from next posting group as we always need to fetch at least one posting group with add keys.
for i < len(postingGroups) {
pg := postingGroups[i]
Expand All @@ -177,7 +178,7 @@ func optimizePostingsFetchByDownloadedBytes(
underfetchedSeriesSize = underfetchedSeries * seriesMaxSize
} else {
// Only mark posting group as lazy due to too many keys when those keys are known to be existent.
if postingGroupMaxKeys > 0 && pg.existentKeys > postingGroupMaxKeys {
if postingGroupMaxKeySeriesRatio > 0 && float64(pg.existentKeys)/float64(maxSeriesMatched) > postingGroupMaxKeySeriesRatio {
markPostingGroupLazy(pg, "keys_limit", lazyExpandedPostingSizeBytes, lazyExpandedPostingGroupsByReason)
i++
continue
Expand Down Expand Up @@ -212,7 +213,7 @@ func fetchLazyExpandedPostings(
bytesLimiter BytesLimiter,
addAllPostings bool,
lazyExpandedPostingEnabled bool,
postingGroupMaxKeys int,
postingGroupMaxKeySeriesRatio float64,
lazyExpandedPostingSizeBytes prometheus.Counter,
lazyExpandedPostingGroupsByReason *prometheus.CounterVec,
tenant string,
Expand All @@ -236,7 +237,7 @@ func fetchLazyExpandedPostings(
postingGroups,
int64(r.block.estimatedMaxSeriesSize),
0.5, // TODO(yeya24): Expose this as a flag.
postingGroupMaxKeys,
postingGroupMaxKeySeriesRatio,
lazyExpandedPostingSizeBytes,
lazyExpandedPostingGroupsByReason,
)
Expand Down
Loading

0 comments on commit 61beef9

Please sign in to comment.