From 5356796efea7e54629fc1be70a9cdf0d75f1352f Mon Sep 17 00:00:00 2001 From: Justin Jung Date: Wed, 24 Jul 2024 14:28:59 -0700 Subject: [PATCH] [Store Gateway] Token bucket limiter (#6016) * Create TokenBucket Signed-off-by: Justin Jung * Update bucket stores to pass token bucket Signed-off-by: Justin Jung * Move limiters to a new file Signed-off-by: Justin Jung * Added tests for limiters and token bucket Signed-off-by: Justin Jung * Add more tests Signed-off-by: Justin Jung * Added enable flag Signed-off-by: Justin Jung * Add dryrun feature Signed-off-by: Justin Jung * Add doc Signed-off-by: Justin Jung * Add changelog Signed-off-by: Justin Jung * Lint Signed-off-by: Justin Jung * Do not create pod token bucket if the feature is not enabled Signed-off-by: Justin Jung * More docs Signed-off-by: Justin Jung * Address comments Signed-off-by: Justin Jung * Rename podTokenBucket to instanceTokenBucket Signed-off-by: Justin Jung * Updated default values Signed-off-by: Justin Jung * Rename TokenBucketLimiter to TokenBucketBytesLimiter Signed-off-by: Justin Jung * Changed error to httpgrpc Signed-off-by: Justin Jung * Nit Signed-off-by: Justin Jung * Increment failure metric when token bucket returns error Signed-off-by: Justin Jung * Simplify token bucket by making Retrieve to always deduct token Signed-off-by: Justin Jung * Throw 429 and 422 for different failure scenarios Signed-off-by: Justin Jung * Hide token factors from doc Signed-off-by: Justin Jung * Simplified config by combining dryrun and enabled Signed-off-by: Justin Jung * Remove test log Signed-off-by: Justin Jung * Fix tests Signed-off-by: Justin Jung * Fix Signed-off-by: Justin Jung --------- Signed-off-by: Justin Jung --- CHANGELOG.md | 1 + docs/blocks-storage/querier.md | 18 +++ docs/blocks-storage/store-gateway.md | 18 +++ docs/configuration/config-file-reference.md | 18 +++ pkg/storage/tsdb/config.go | 45 ++++++ pkg/storegateway/bucket_stores.go | 101 ++++++------- 
pkg/storegateway/bucket_stores_test.go | 92 ++++++++++++ pkg/storegateway/limiter.go | 148 ++++++++++++++++++++ pkg/storegateway/limiter_test.go | 148 ++++++++++++++++++++ pkg/util/token_bucket.go | 62 ++++++++ pkg/util/token_bucket_test.go | 23 +++ 11 files changed, 625 insertions(+), 49 deletions(-) create mode 100644 pkg/storegateway/limiter.go create mode 100644 pkg/storegateway/limiter_test.go create mode 100644 pkg/util/token_bucket.go create mode 100644 pkg/util/token_bucket_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index a01440613c..fb01005e0d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ * [FEATURE] OTLP: Support ingesting OTLP exponential metrics as native histograms. #6071 * [FEATURE] Ingester: Add `ingester.instance-limits.max-inflight-query-requests` to allow limiting ingester concurrent queries. #6081 * [FEATURE] Distributor: Add `validation.max-native-histogram-buckets` to limit max number of bucket count. Distributor will try to automatically reduce histogram resolution until it is within the bucket limit or resolution cannot be reduced anymore. #6104 +* [FEATURE] Store Gateway: Token bucket limiter. #6016 * [ENHANCEMENT] rulers: Add support to persist tokens in rulers. #5987 * [ENHANCEMENT] Query Frontend/Querier: Added store gateway postings touched count and touched size in Querier stats and log in Query Frontend. #5892 * [ENHANCEMENT] Query Frontend/Querier: Returns `warnings` on prometheus query responses. #5916 diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md index 1ca05c1b05..a9206c82ba 100644 --- a/docs/blocks-storage/querier.md +++ b/docs/blocks-storage/querier.md @@ -1341,6 +1341,24 @@ blocks_storage: # CLI flag: -blocks-storage.bucket-store.series-batch-size [series_batch_size: | default = 10000] + token_bucket_bytes_limiter: + # Token bucket bytes limiter mode. 
Supported values are: disabled, dryrun, + # enabled + # CLI flag: -blocks-storage.bucket-store.token-bucket-bytes-limiter.mode + [mode: | default = "disabled"] + + # Instance token bucket size + # CLI flag: -blocks-storage.bucket-store.token-bucket-bytes-limiter.instance-token-bucket-size + [instance_token_bucket_size: | default = 859832320] + + # User token bucket size + # CLI flag: -blocks-storage.bucket-store.token-bucket-bytes-limiter.user-token-bucket-size + [user_token_bucket_size: | default = 644874240] + + # Request token bucket size + # CLI flag: -blocks-storage.bucket-store.token-bucket-bytes-limiter.request-token-bucket-size + [request_token_bucket_size: | default = 4194304] + tsdb: # Local directory to store TSDBs in the ingesters. # CLI flag: -blocks-storage.tsdb.dir diff --git a/docs/blocks-storage/store-gateway.md b/docs/blocks-storage/store-gateway.md index 4b7e504fa4..6c31046b1f 100644 --- a/docs/blocks-storage/store-gateway.md +++ b/docs/blocks-storage/store-gateway.md @@ -1466,6 +1466,24 @@ blocks_storage: # CLI flag: -blocks-storage.bucket-store.series-batch-size [series_batch_size: | default = 10000] + token_bucket_bytes_limiter: + # Token bucket bytes limiter mode. Supported values are: disabled, dryrun, + # enabled + # CLI flag: -blocks-storage.bucket-store.token-bucket-bytes-limiter.mode + [mode: | default = "disabled"] + + # Instance token bucket size + # CLI flag: -blocks-storage.bucket-store.token-bucket-bytes-limiter.instance-token-bucket-size + [instance_token_bucket_size: | default = 859832320] + + # User token bucket size + # CLI flag: -blocks-storage.bucket-store.token-bucket-bytes-limiter.user-token-bucket-size + [user_token_bucket_size: | default = 644874240] + + # Request token bucket size + # CLI flag: -blocks-storage.bucket-store.token-bucket-bytes-limiter.request-token-bucket-size + [request_token_bucket_size: | default = 4194304] + tsdb: # Local directory to store TSDBs in the ingesters. 
# CLI flag: -blocks-storage.tsdb.dir diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 1e730d99e1..df036ed2f6 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -1899,6 +1899,24 @@ bucket_store: # CLI flag: -blocks-storage.bucket-store.series-batch-size [series_batch_size: | default = 10000] + token_bucket_bytes_limiter: + # Token bucket bytes limiter mode. Supported values are: disabled, dryrun, + # enabled + # CLI flag: -blocks-storage.bucket-store.token-bucket-bytes-limiter.mode + [mode: | default = "disabled"] + + # Instance token bucket size + # CLI flag: -blocks-storage.bucket-store.token-bucket-bytes-limiter.instance-token-bucket-size + [instance_token_bucket_size: | default = 859832320] + + # User token bucket size + # CLI flag: -blocks-storage.bucket-store.token-bucket-bytes-limiter.user-token-bucket-size + [user_token_bucket_size: | default = 644874240] + + # Request token bucket size + # CLI flag: -blocks-storage.bucket-store.token-bucket-bytes-limiter.request-token-bucket-size + [request_token_bucket_size: | default = 4194304] + tsdb: # Local directory to store TSDBs in the ingesters. # CLI flag: -blocks-storage.tsdb.dir diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go index 5c9f0c23cb..bd3099dba9 100644 --- a/pkg/storage/tsdb/config.go +++ b/pkg/storage/tsdb/config.go @@ -2,6 +2,7 @@ package tsdb import ( "flag" + "fmt" "path/filepath" "strings" "time" @@ -52,6 +53,7 @@ var ( ErrInvalidBucketIndexBlockDiscoveryStrategy = errors.New("bucket index block discovery strategy can only be enabled when bucket index is enabled") ErrBlockDiscoveryStrategy = errors.New("invalid block discovery strategy") + ErrInvalidTokenBucketBytesLimiterMode = errors.New("invalid token bucket bytes limiter mode") ) // BlocksStorageConfig holds the config information for the blocks storage. 
@@ -292,6 +294,22 @@ type BucketStoreConfig struct { // Controls how many series to fetch per batch in Store Gateway. Default value is 10000. SeriesBatchSize int `yaml:"series_batch_size"` + + // Token bucket configs + TokenBucketBytesLimiter TokenBucketBytesLimiterConfig `yaml:"token_bucket_bytes_limiter"` +} + +type TokenBucketBytesLimiterConfig struct { + Mode string `yaml:"mode"` + InstanceTokenBucketSize int64 `yaml:"instance_token_bucket_size"` + UserTokenBucketSize int64 `yaml:"user_token_bucket_size"` + RequestTokenBucketSize int64 `yaml:"request_token_bucket_size"` + FetchedPostingsTokenFactor float64 `yaml:"fetched_postings_token_factor" doc:"hidden"` + TouchedPostingsTokenFactor float64 `yaml:"touched_postings_token_factor" doc:"hidden"` + FetchedSeriesTokenFactor float64 `yaml:"fetched_series_token_factor" doc:"hidden"` + TouchedSeriesTokenFactor float64 `yaml:"touched_series_token_factor" doc:"hidden"` + FetchedChunksTokenFactor float64 `yaml:"fetched_chunks_token_factor" doc:"hidden"` + TouchedChunksTokenFactor float64 `yaml:"touched_chunks_token_factor" doc:"hidden"` } // RegisterFlags registers the BucketStore flags @@ -325,6 +343,16 @@ func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.LazyExpandedPostingsEnabled, "blocks-storage.bucket-store.lazy-expanded-postings-enabled", false, "If true, Store Gateway will estimate postings size and try to lazily expand postings if it downloads less data than expanding all postings.") f.IntVar(&cfg.SeriesBatchSize, "blocks-storage.bucket-store.series-batch-size", store.SeriesBatchSize, "Controls how many series to fetch per batch in Store Gateway. Default value is 10000.") f.StringVar(&cfg.BlockDiscoveryStrategy, "blocks-storage.bucket-store.block-discovery-strategy", string(ConcurrentDiscovery), "One of "+strings.Join(supportedBlockDiscoveryStrategies, ", ")+". When set to concurrent, stores will concurrently issue one call per directory to discover active blocks in the bucket. 
The recursive strategy iterates through all objects in the bucket, recursively traversing into each directory. This avoids N+1 calls at the expense of having slower bucket iterations. bucket_index strategy can be used in Compactor only and utilizes the existing bucket index to fetch block IDs to sync. This avoids iterating the bucket but can be impacted by delays of cleaner creating bucket index.") + f.StringVar(&cfg.TokenBucketBytesLimiter.Mode, "blocks-storage.bucket-store.token-bucket-bytes-limiter.mode", string(TokenBucketBytesLimiterDisabled), fmt.Sprintf("Token bucket bytes limiter mode. Supported values are: %s", strings.Join(supportedTokenBucketBytesLimiterModes, ", "))) + f.Int64Var(&cfg.TokenBucketBytesLimiter.InstanceTokenBucketSize, "blocks-storage.bucket-store.token-bucket-bytes-limiter.instance-token-bucket-size", int64(820*units.Mebibyte), "Instance token bucket size") + f.Int64Var(&cfg.TokenBucketBytesLimiter.UserTokenBucketSize, "blocks-storage.bucket-store.token-bucket-bytes-limiter.user-token-bucket-size", int64(615*units.Mebibyte), "User token bucket size") + f.Int64Var(&cfg.TokenBucketBytesLimiter.RequestTokenBucketSize, "blocks-storage.bucket-store.token-bucket-bytes-limiter.request-token-bucket-size", int64(4*units.Mebibyte), "Request token bucket size") + f.Float64Var(&cfg.TokenBucketBytesLimiter.FetchedPostingsTokenFactor, "blocks-storage.bucket-store.token-bucket-bytes-limiter.fetched-postings-token-factor", 0, "Multiplication factor used for fetched postings token") + f.Float64Var(&cfg.TokenBucketBytesLimiter.TouchedPostingsTokenFactor, "blocks-storage.bucket-store.token-bucket-bytes-limiter.touched-postings-token-factor", 5, "Multiplication factor used for touched postings token") + f.Float64Var(&cfg.TokenBucketBytesLimiter.FetchedSeriesTokenFactor, "blocks-storage.bucket-store.token-bucket-bytes-limiter.fetched-series-token-factor", 0, "Multiplication factor used for fetched series token") + 
f.Float64Var(&cfg.TokenBucketBytesLimiter.TouchedSeriesTokenFactor, "blocks-storage.bucket-store.token-bucket-bytes-limiter.touched-series-token-factor", 25, "Multiplication factor used for touched series token") + f.Float64Var(&cfg.TokenBucketBytesLimiter.FetchedChunksTokenFactor, "blocks-storage.bucket-store.token-bucket-bytes-limiter.fetched-chunks-token-factor", 0, "Multiplication factor used for fetched chunks token") + f.Float64Var(&cfg.TokenBucketBytesLimiter.TouchedChunksTokenFactor, "blocks-storage.bucket-store.token-bucket-bytes-limiter.touched-chunks-token-factor", 1, "Multiplication factor used for touched chunks token") } // Validate the config. @@ -344,6 +372,9 @@ func (cfg *BucketStoreConfig) Validate() error { if !util.StringsContain(supportedBlockDiscoveryStrategies, cfg.BlockDiscoveryStrategy) { return ErrInvalidBucketIndexBlockDiscoveryStrategy } + if !util.StringsContain(supportedTokenBucketBytesLimiterModes, cfg.TokenBucketBytesLimiter.Mode) { + return ErrInvalidTokenBucketBytesLimiterMode + } return nil } @@ -375,3 +406,17 @@ var supportedBlockDiscoveryStrategies = []string{ string(RecursiveDiscovery), string(BucketIndexDiscovery), } + +type TokenBucketBytesLimiterMode string + +const ( + TokenBucketBytesLimiterDisabled TokenBucketBytesLimiterMode = "disabled" + TokenBucketBytesLimiterDryRun TokenBucketBytesLimiterMode = "dryrun" + TokenBucketBytesLimiterEnabled TokenBucketBytesLimiterMode = "enabled" +) + +var supportedTokenBucketBytesLimiterModes = []string{ + string(TokenBucketBytesLimiterDisabled), + string(TokenBucketBytesLimiterDryRun), + string(TokenBucketBytesLimiterEnabled), +} diff --git a/pkg/storegateway/bucket_stores.go b/pkg/storegateway/bucket_stores.go index 5d2c2d0ec1..e95e7dd6bc 100644 --- a/pkg/storegateway/bucket_stores.go +++ b/pkg/storegateway/bucket_stores.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "math" - "net/http" "os" "path/filepath" "strings" @@ -35,6 +34,7 @@ import ( 
"github.com/cortexproject/cortex/pkg/storage/bucket" "github.com/cortexproject/cortex/pkg/storage/tsdb" + "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/backoff" cortex_errors "github.com/cortexproject/cortex/pkg/util/errors" util_log "github.com/cortexproject/cortex/pkg/util/log" @@ -73,6 +73,11 @@ type BucketStores struct { storesErrorsMu sync.RWMutex storesErrors map[string]error + instanceTokenBucket *util.TokenBucket + + userTokenBucketsMu sync.RWMutex + userTokenBuckets map[string]*util.TokenBucket + // Keeps number of inflight requests inflightRequestCnt int inflightRequestMu sync.RWMutex @@ -115,6 +120,7 @@ func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStra metaFetcherMetrics: NewMetadataFetcherMetrics(), queryGate: queryGate, partitioner: newGapBasedPartitioner(cfg.BucketStore.PartitionerMaxGapBytes, reg), + userTokenBuckets: make(map[string]*util.TokenBucket), syncTimes: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ Name: "cortex_bucket_stores_blocks_sync_seconds", Help: "The total time it takes to perform a sync stores", @@ -144,6 +150,13 @@ func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStra return nil, errors.Wrap(err, "create chunks bytes pool") } + if u.cfg.BucketStore.TokenBucketBytesLimiter.Mode != string(tsdb.TokenBucketBytesLimiterDisabled) { + u.instanceTokenBucket = util.NewTokenBucket(cfg.BucketStore.TokenBucketBytesLimiter.InstanceTokenBucketSize, promauto.With(reg).NewGauge(prometheus.GaugeOpts{ + Name: "cortex_bucket_stores_instance_token_bucket_remaining", + Help: "Number of tokens left in instance token bucket.", + })) + } + if reg != nil { reg.MustRegister(u.bucketStoreMetrics, u.metaFetcherMetrics) } @@ -475,6 +488,12 @@ func (u *BucketStores) closeEmptyBucketStore(userID string) error { unlockInDefer = false u.storesMu.Unlock() + if u.cfg.BucketStore.TokenBucketBytesLimiter.Mode != string(tsdb.TokenBucketBytesLimiterDisabled) { 
+ u.userTokenBucketsMu.Lock() + delete(u.userTokenBuckets, userID) + u.userTokenBucketsMu.Unlock() + } + u.metaFetcherMetrics.RemoveUserRegistry(userID) u.bucketStoreMetrics.RemoveUserRegistry(userID) return bs.Close() @@ -612,13 +631,19 @@ func (u *BucketStores) getOrCreateStore(userID string) (*store.BucketStore, erro bucketStoreOpts = append(bucketStoreOpts, store.WithDebugLogging()) } + if u.cfg.BucketStore.TokenBucketBytesLimiter.Mode != string(tsdb.TokenBucketBytesLimiterDisabled) { + u.userTokenBucketsMu.Lock() + u.userTokenBuckets[userID] = util.NewTokenBucket(u.cfg.BucketStore.TokenBucketBytesLimiter.UserTokenBucketSize, nil) + u.userTokenBucketsMu.Unlock() + } + bs, err := store.NewBucketStore( userBkt, fetcher, u.syncDirForUser(userID), newChunksLimiterFactory(u.limits, userID), newSeriesLimiterFactory(u.limits, userID), - newBytesLimiterFactory(u.limits, userID), + newBytesLimiterFactory(u.limits, userID, u.getUserTokenBucket(userID), u.instanceTokenBucket, u.cfg.BucketStore.TokenBucketBytesLimiter, u.getTokensToRetrieve), u.partitioner, u.cfg.BucketStore.BlockSyncConcurrency, false, // No need to enable backward compatibility with Thanos pre 0.8.0 queriers @@ -680,6 +705,31 @@ func (u *BucketStores) deleteLocalFilesForExcludedTenants(includeUserIDs map[str } } +func (u *BucketStores) getUserTokenBucket(userID string) *util.TokenBucket { + u.userTokenBucketsMu.RLock() + defer u.userTokenBucketsMu.RUnlock() + return u.userTokenBuckets[userID] +} + +func (u *BucketStores) getTokensToRetrieve(tokens uint64, dataType store.StoreDataType) int64 { + tokensToRetrieve := float64(tokens) + switch dataType { + case store.PostingsFetched: + tokensToRetrieve *= u.cfg.BucketStore.TokenBucketBytesLimiter.FetchedPostingsTokenFactor + case store.PostingsTouched: + tokensToRetrieve *= u.cfg.BucketStore.TokenBucketBytesLimiter.TouchedPostingsTokenFactor + case store.SeriesFetched: + tokensToRetrieve *= u.cfg.BucketStore.TokenBucketBytesLimiter.FetchedSeriesTokenFactor + 
case store.SeriesTouched: + tokensToRetrieve *= u.cfg.BucketStore.TokenBucketBytesLimiter.TouchedSeriesTokenFactor + case store.ChunksFetched: + tokensToRetrieve *= u.cfg.BucketStore.TokenBucketBytesLimiter.FetchedChunksTokenFactor + case store.ChunksTouched: + tokensToRetrieve *= u.cfg.BucketStore.TokenBucketBytesLimiter.TouchedChunksTokenFactor + } + return int64(tokensToRetrieve) +} + func getUserIDFromGRPCContext(ctx context.Context) string { meta, ok := metadata.FromIncomingContext(ctx) if !ok { @@ -730,50 +780,3 @@ type spanSeriesServer struct { func (s spanSeriesServer) Context() context.Context { return s.ctx } - -type limiter struct { - limiter *store.Limiter -} - -func (c *limiter) Reserve(num uint64) error { - return c.ReserveWithType(num, 0) -} - -func (c *limiter) ReserveWithType(num uint64, _ store.StoreDataType) error { - err := c.limiter.Reserve(num) - if err != nil { - return httpgrpc.Errorf(http.StatusUnprocessableEntity, err.Error()) - } - - return nil -} - -func newChunksLimiterFactory(limits *validation.Overrides, userID string) store.ChunksLimiterFactory { - return func(failedCounter prometheus.Counter) store.ChunksLimiter { - // Since limit overrides could be live reloaded, we have to get the current user's limit - // each time a new limiter is instantiated. - return &limiter{ - limiter: store.NewLimiter(uint64(limits.MaxChunksPerQueryFromStore(userID)), failedCounter), - } - } -} - -func newSeriesLimiterFactory(limits *validation.Overrides, userID string) store.SeriesLimiterFactory { - return func(failedCounter prometheus.Counter) store.SeriesLimiter { - // Since limit overrides could be live reloaded, we have to get the current user's limit - // each time a new limiter is instantiated. 
- return &limiter{ - limiter: store.NewLimiter(uint64(limits.MaxFetchedSeriesPerQuery(userID)), failedCounter), - } - } -} - -func newBytesLimiterFactory(limits *validation.Overrides, userID string) store.BytesLimiterFactory { - return func(failedCounter prometheus.Counter) store.BytesLimiter { - // Since limit overrides could be live reloaded, we have to get the current user's limit - // each time a new limiter is instantiated. - return &limiter{ - limiter: store.NewLimiter(uint64(limits.MaxDownloadedBytesPerRequest(userID)), failedCounter), - } - } -} diff --git a/pkg/storegateway/bucket_stores_test.go b/pkg/storegateway/bucket_stores_test.go index a10a4599fb..d3efcfc112 100644 --- a/pkg/storegateway/bucket_stores_test.go +++ b/pkg/storegateway/bucket_stores_test.go @@ -763,6 +763,98 @@ func TestBucketStores_deleteLocalFilesForExcludedTenants(t *testing.T) { `), metricNames...)) } +func TestBucketStores_tokenBuckets(t *testing.T) { + const ( + user1 = "user-1" + user2 = "user-2" + ) + + ctx := context.Background() + cfg := prepareStorageConfig(t) + cfg.BucketStore.TokenBucketBytesLimiter.Mode = string(cortex_tsdb.TokenBucketBytesLimiterEnabled) + + storageDir := t.TempDir() + userToMetric := map[string]string{ + user1: "series_1", + user2: "series_2", + } + for userID, metricName := range userToMetric { + generateStorageBlock(t, storageDir, userID, metricName, 10, 100, 15) + } + + sharding := userShardingStrategy{} + sharding.users = []string{user1, user2} + + bucket, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir}) + assert.NoError(t, err) + + reg := prometheus.NewPedanticRegistry() + stores, err := NewBucketStores(cfg, &sharding, objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) + assert.NoError(t, err) + assert.NotNil(t, stores.instanceTokenBucket) + + assert.NoError(t, stores.InitialSync(ctx)) + assert.NotNil(t, stores.getUserTokenBucket("user-1")) + assert.NotNil(t, 
stores.getUserTokenBucket("user-2")) + + sharding.users = []string{user1} + assert.NoError(t, stores.SyncBlocks(ctx)) + assert.NotNil(t, stores.getUserTokenBucket("user-1")) + assert.Nil(t, stores.getUserTokenBucket("user-2")) + + sharding.users = []string{} + assert.NoError(t, stores.SyncBlocks(ctx)) + assert.Nil(t, stores.getUserTokenBucket("user-1")) + assert.Nil(t, stores.getUserTokenBucket("user-2")) + + cfg.BucketStore.TokenBucketBytesLimiter.Mode = string(cortex_tsdb.TokenBucketBytesLimiterDryRun) + sharding.users = []string{user1, user2} + reg = prometheus.NewPedanticRegistry() + stores, err = NewBucketStores(cfg, &sharding, objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) + assert.NoError(t, err) + assert.NotNil(t, stores.instanceTokenBucket) + + assert.NoError(t, stores.InitialSync(ctx)) + assert.NotNil(t, stores.getUserTokenBucket("user-1")) + assert.NotNil(t, stores.getUserTokenBucket("user-2")) + + cfg.BucketStore.TokenBucketBytesLimiter.Mode = string(cortex_tsdb.TokenBucketBytesLimiterDisabled) + sharding.users = []string{user1, user2} + reg = prometheus.NewPedanticRegistry() + stores, err = NewBucketStores(cfg, &sharding, objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) + assert.NoError(t, err) + + assert.NoError(t, stores.InitialSync(ctx)) + assert.Nil(t, stores.instanceTokenBucket) + assert.Nil(t, stores.getUserTokenBucket("user-1")) + assert.Nil(t, stores.getUserTokenBucket("user-2")) +} + +func TestBucketStores_getTokensToRetrieve(t *testing.T) { + cfg := prepareStorageConfig(t) + cfg.BucketStore.TokenBucketBytesLimiter.FetchedPostingsTokenFactor = 1 + cfg.BucketStore.TokenBucketBytesLimiter.TouchedPostingsTokenFactor = 2 + cfg.BucketStore.TokenBucketBytesLimiter.FetchedSeriesTokenFactor = 3 + cfg.BucketStore.TokenBucketBytesLimiter.TouchedSeriesTokenFactor = 4 + cfg.BucketStore.TokenBucketBytesLimiter.FetchedChunksTokenFactor = 0 + 
cfg.BucketStore.TokenBucketBytesLimiter.TouchedChunksTokenFactor = 0.5 + + storageDir := t.TempDir() + bucket, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir}) + assert.NoError(t, err) + + reg := prometheus.NewPedanticRegistry() + stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) + assert.NoError(t, err) + + assert.Equal(t, int64(2), stores.getTokensToRetrieve(2, store.PostingsFetched)) + assert.Equal(t, int64(4), stores.getTokensToRetrieve(2, store.PostingsTouched)) + assert.Equal(t, int64(6), stores.getTokensToRetrieve(2, store.SeriesFetched)) + assert.Equal(t, int64(8), stores.getTokensToRetrieve(2, store.SeriesTouched)) + assert.Equal(t, int64(0), stores.getTokensToRetrieve(2, store.ChunksFetched)) + assert.Equal(t, int64(1), stores.getTokensToRetrieve(2, store.ChunksTouched)) +} + func getUsersInDir(t *testing.T, dir string) []string { fs, err := os.ReadDir(dir) require.NoError(t, err) diff --git a/pkg/storegateway/limiter.go b/pkg/storegateway/limiter.go new file mode 100644 index 0000000000..7a633e2422 --- /dev/null +++ b/pkg/storegateway/limiter.go @@ -0,0 +1,148 @@ +package storegateway + +import ( + "net/http" + "sync" + + "github.com/go-kit/log/level" + "github.com/prometheus/client_golang/prometheus" + "github.com/thanos-io/thanos/pkg/store" + "github.com/weaveworks/common/httpgrpc" + + "github.com/cortexproject/cortex/pkg/storage/tsdb" + "github.com/cortexproject/cortex/pkg/util" + util_log "github.com/cortexproject/cortex/pkg/util/log" + "github.com/cortexproject/cortex/pkg/util/validation" +) + +const tokenBucketBytesLimiterErrStr = "store gateway resource exhausted" + +type limiter struct { + limiter *store.Limiter +} + +func (c *limiter) Reserve(num uint64) error { + return c.ReserveWithType(num, 0) +} + +func (c *limiter) ReserveWithType(num uint64, _ store.StoreDataType) error { + err 
:= c.limiter.Reserve(num) + if err != nil { + return httpgrpc.Errorf(http.StatusUnprocessableEntity, err.Error()) + } + + return nil +} + +type compositeBytesLimiter struct { + limiters []store.BytesLimiter +} + +func (c *compositeBytesLimiter) ReserveWithType(num uint64, dataType store.StoreDataType) error { + for _, l := range c.limiters { + if err := l.ReserveWithType(num, dataType); err != nil { + return err // nested limiters are expected to return httpgrpc error + } + } + return nil +} + +type tokenBucketBytesLimiter struct { + instanceTokenBucket *util.TokenBucket + userTokenBucket *util.TokenBucket + requestTokenBucket *util.TokenBucket + getTokensToRetrieve func(tokens uint64, dataType store.StoreDataType) int64 + dryRun bool + + // Counter metric which we will increase if limit is exceeded. + failedCounter prometheus.Counter + failedOnce sync.Once +} + +func (t *tokenBucketBytesLimiter) Reserve(_ uint64) error { + return nil +} + +func (t *tokenBucketBytesLimiter) ReserveWithType(num uint64, dataType store.StoreDataType) error { + tokensToRetrieve := t.getTokensToRetrieve(num, dataType) + + requestTokenRemaining := t.requestTokenBucket.Retrieve(tokensToRetrieve) + userTokenRemaining := t.userTokenBucket.Retrieve(tokensToRetrieve) + instanceTokenRemaining := t.instanceTokenBucket.Retrieve(tokensToRetrieve) + + // if we can retrieve from request bucket, let the request go through + if requestTokenRemaining >= 0 { + return nil + } + + errCode := 0 + + if tokensToRetrieve > t.userTokenBucket.MaxCapacity() || tokensToRetrieve > t.instanceTokenBucket.MaxCapacity() { + errCode = http.StatusUnprocessableEntity + } else if userTokenRemaining < 0 || instanceTokenRemaining < 0 { + errCode = http.StatusTooManyRequests + } + + if errCode > 0 { + if t.dryRun { + level.Warn(util_log.Logger).Log("msg", tokenBucketBytesLimiterErrStr, "dataType", dataType, "dataSize", num, "tokens", tokensToRetrieve, "errorCode", errCode) + return nil + } + + // We need to protect from the 
counter being incremented twice due to concurrency + t.failedOnce.Do(t.failedCounter.Inc) + return httpgrpc.Errorf(errCode, tokenBucketBytesLimiterErrStr) + } + + return nil +} + +func newTokenBucketBytesLimiter(requestTokenBucket, userTokenBucket, instanceTokenBucket *util.TokenBucket, dryRun bool, failedCounter prometheus.Counter, getTokensToRetrieve func(tokens uint64, dataType store.StoreDataType) int64) *tokenBucketBytesLimiter { + return &tokenBucketBytesLimiter{ + requestTokenBucket: requestTokenBucket, + userTokenBucket: userTokenBucket, + instanceTokenBucket: instanceTokenBucket, + dryRun: dryRun, + failedCounter: failedCounter, + getTokensToRetrieve: getTokensToRetrieve, + } +} + +func newChunksLimiterFactory(limits *validation.Overrides, userID string) store.ChunksLimiterFactory { + return func(failedCounter prometheus.Counter) store.ChunksLimiter { + // Since limit overrides could be live reloaded, we have to get the current user's limit + // each time a new limiter is instantiated. + return &limiter{ + limiter: store.NewLimiter(uint64(limits.MaxChunksPerQueryFromStore(userID)), failedCounter), + } + } +} + +func newSeriesLimiterFactory(limits *validation.Overrides, userID string) store.SeriesLimiterFactory { + return func(failedCounter prometheus.Counter) store.SeriesLimiter { + // Since limit overrides could be live reloaded, we have to get the current user's limit + // each time a new limiter is instantiated. 
+ return &limiter{ + limiter: store.NewLimiter(uint64(limits.MaxFetchedSeriesPerQuery(userID)), failedCounter), + } + } +} + +func newBytesLimiterFactory(limits *validation.Overrides, userID string, userTokenBucket, instanceTokenBucket *util.TokenBucket, tokenBucketBytesLimiterCfg tsdb.TokenBucketBytesLimiterConfig, getTokensToRetrieve func(tokens uint64, dataType store.StoreDataType) int64) store.BytesLimiterFactory { + return func(failedCounter prometheus.Counter) store.BytesLimiter { + limiters := []store.BytesLimiter{} + // Since limit overrides could be live reloaded, we have to get the current user's limit + // each time a new limiter is instantiated. + limiters = append(limiters, &limiter{limiter: store.NewLimiter(uint64(limits.MaxDownloadedBytesPerRequest(userID)), failedCounter)}) + + if tokenBucketBytesLimiterCfg.Mode != string(tsdb.TokenBucketBytesLimiterDisabled) { + requestTokenBucket := util.NewTokenBucket(tokenBucketBytesLimiterCfg.RequestTokenBucketSize, nil) + dryRun := tokenBucketBytesLimiterCfg.Mode == string(tsdb.TokenBucketBytesLimiterDryRun) + limiters = append(limiters, newTokenBucketBytesLimiter(requestTokenBucket, userTokenBucket, instanceTokenBucket, dryRun, failedCounter, getTokensToRetrieve)) + } + + return &compositeBytesLimiter{ + limiters: limiters, + } + } +} diff --git a/pkg/storegateway/limiter_test.go b/pkg/storegateway/limiter_test.go new file mode 100644 index 0000000000..c2a61de249 --- /dev/null +++ b/pkg/storegateway/limiter_test.go @@ -0,0 +1,148 @@ +package storegateway + +import ( + "fmt" + "testing" + + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/assert" + "github.com/thanos-io/thanos/pkg/store" + + "github.com/cortexproject/cortex/pkg/util" +) + +func TestLimiter(t *testing.T) { + l := &limiter{ + limiter: store.NewLimiter(2, prometheus.NewCounter(prometheus.CounterOpts{})), + } + + assert.NoError(t, l.Reserve(1)) + assert.NoError(t, l.ReserveWithType(1, store.PostingsFetched)) + 
assert.Error(t, l.Reserve(1)) + assert.Error(t, l.ReserveWithType(1, store.PostingsFetched)) +} + +func TestCompositeLimiter(t *testing.T) { + l := &compositeBytesLimiter{ + limiters: []store.BytesLimiter{ + &limiter{limiter: store.NewLimiter(2, prometheus.NewCounter(prometheus.CounterOpts{}))}, + &limiter{limiter: store.NewLimiter(1, prometheus.NewCounter(prometheus.CounterOpts{}))}, + }, + } + + assert.NoError(t, l.ReserveWithType(1, store.PostingsFetched)) + assert.ErrorContains(t, l.ReserveWithType(1, store.PostingsFetched), "(422)") +} + +func TestNewTokenBucketBytesLimiter(t *testing.T) { + tests := map[string]struct { + tokensToRetrieve []uint64 + requestTokenBucketSize int64 + userTokenBucketSize int64 + instanceTokenBucketSize int64 + expectedRequestTokenRemaining int64 + expectedUserTokenRemaining int64 + expectedInstanceTokenRemaining int64 + getTokensToRetrieve func(tokens uint64, dataType store.StoreDataType) int64 + errCode int + dryRun bool + }{ + "should retrieve buckets from all buckets": { + tokensToRetrieve: []uint64{1}, + requestTokenBucketSize: 1, + userTokenBucketSize: 1, + instanceTokenBucketSize: 1, + }, + "should succeed if there is enough request token, regardless of user or instance bucket": { + tokensToRetrieve: []uint64{1}, + requestTokenBucketSize: 1, + userTokenBucketSize: 0, + instanceTokenBucketSize: 0, + expectedUserTokenRemaining: -1, + expectedInstanceTokenRemaining: -1, + }, + "should throw 429 if not enough user tokens remaining": { + tokensToRetrieve: []uint64{1, 1}, + requestTokenBucketSize: 1, + userTokenBucketSize: 1, + instanceTokenBucketSize: 2, + errCode: 429, + expectedRequestTokenRemaining: -1, + expectedUserTokenRemaining: -1, + }, + "should throw 422 if request is greater than user token bucket size": { + tokensToRetrieve: []uint64{2}, + requestTokenBucketSize: 1, + userTokenBucketSize: 1, + instanceTokenBucketSize: 2, + errCode: 422, + expectedRequestTokenRemaining: -1, + expectedUserTokenRemaining: -1, + }, + 
"should throw 429 if not enough instance tokens remaining": {
+			tokensToRetrieve:               []uint64{1, 1},
+			requestTokenBucketSize:         1,
+			userTokenBucketSize:            2,
+			instanceTokenBucketSize:        1,
+			errCode:                        429,
+			expectedRequestTokenRemaining:  -1,
+			expectedInstanceTokenRemaining: -1,
+		},
+		"should throw 422 if request is greater than instance token bucket size": {
+			tokensToRetrieve:               []uint64{2},
+			requestTokenBucketSize:         1,
+			userTokenBucketSize:            2,
+			instanceTokenBucketSize:        1,
+			errCode:                        422,
+			expectedRequestTokenRemaining:  -1,
+			expectedInstanceTokenRemaining: -1,
+		},
+		"should use getTokensToRetrieve to calculate tokens": {
+			tokensToRetrieve: []uint64{1},
+			getTokensToRetrieve: func(tokens uint64, dataType store.StoreDataType) int64 {
+				if dataType == store.PostingsFetched {
+					return 0
+				}
+				return 1
+			},
+		},
+		"should not fail if dryRun": {
+			tokensToRetrieve:               []uint64{1},
+			expectedRequestTokenRemaining:  -1,
+			expectedUserTokenRemaining:     -1,
+			expectedInstanceTokenRemaining: -1,
+			dryRun:                         true,
+		},
+	}
+
+	for testName, testData := range tests {
+		t.Run(testName, func(t *testing.T) {
+			requestTokenBucket := util.NewTokenBucket(testData.requestTokenBucketSize, nil)
+			userTokenBucket := util.NewTokenBucket(testData.userTokenBucketSize, nil)
+			instanceTokenBucket := util.NewTokenBucket(testData.instanceTokenBucketSize, nil)
+
+			getTokensToRetrieve := func(tokens uint64, dataType store.StoreDataType) int64 {
+				return int64(tokens)
+			}
+			if testData.getTokensToRetrieve != nil {
+				getTokensToRetrieve = testData.getTokensToRetrieve
+			}
+			l := newTokenBucketBytesLimiter(requestTokenBucket, userTokenBucket, instanceTokenBucket, testData.dryRun, prometheus.NewCounter(prometheus.CounterOpts{}), getTokensToRetrieve)
+
+			var err error
+			for _, token := range testData.tokensToRetrieve {
+				err = l.ReserveWithType(token, store.PostingsFetched)
+			}
+
+			assert.Equal(t, testData.expectedRequestTokenRemaining, requestTokenBucket.Retrieve(0))
+			assert.Equal(t, 
testData.expectedUserTokenRemaining, userTokenBucket.Retrieve(0))
+			assert.Equal(t, testData.expectedInstanceTokenRemaining, instanceTokenBucket.Retrieve(0))
+
+			if testData.errCode > 0 {
+				assert.ErrorContains(t, err, fmt.Sprintf("(%d)", testData.errCode))
+			} else {
+				assert.NoError(t, err)
+			}
+		})
+	}
+}
diff --git a/pkg/util/token_bucket.go b/pkg/util/token_bucket.go
new file mode 100644
index 0000000000..6e33c7aaf8
--- /dev/null
+++ b/pkg/util/token_bucket.go
@@ -0,0 +1,62 @@
+package util
+
+import (
+	"sync"
+	"time"
+
+	"github.com/prometheus/client_golang/prometheus"
+)
+
+type TokenBucket struct {
+	remainingTokens int64
+	maxCapacity     int64
+	refillRate      int64
+	lastRefill      time.Time
+	mu              sync.Mutex
+
+	remainingTokensMetric prometheus.Gauge
+}
+
+func NewTokenBucket(maxCapacity int64, remainingTokensMetric prometheus.Gauge) *TokenBucket {
+	if remainingTokensMetric != nil {
+		remainingTokensMetric.Set(float64(maxCapacity))
+	}
+
+	return &TokenBucket{
+		remainingTokens:       maxCapacity,
+		maxCapacity:           maxCapacity,
+		refillRate:            maxCapacity,
+		lastRefill:            time.Now(),
+		remainingTokensMetric: remainingTokensMetric,
+	}
+}
+
+// Retrieve always deducts tokens, even if there are not enough remaining tokens. 
+func (t *TokenBucket) Retrieve(amount int64) int64 { + t.mu.Lock() + defer t.mu.Unlock() + + t.updateTokens() + t.remainingTokens -= amount + + if t.remainingTokensMetric != nil { + t.remainingTokensMetric.Set(float64(t.remainingTokens)) + } + + return t.remainingTokens +} + +func (t *TokenBucket) MaxCapacity() int64 { + return t.maxCapacity +} + +func (t *TokenBucket) updateTokens() { + now := time.Now() + refilledTokens := int64(now.Sub(t.lastRefill).Seconds() * float64(t.refillRate)) + t.remainingTokens += refilledTokens + t.lastRefill = now + + if t.remainingTokens > t.maxCapacity { + t.remainingTokens = t.maxCapacity + } +} diff --git a/pkg/util/token_bucket_test.go b/pkg/util/token_bucket_test.go new file mode 100644 index 0000000000..155e36ce75 --- /dev/null +++ b/pkg/util/token_bucket_test.go @@ -0,0 +1,23 @@ +package util + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestTokenBucket_Retrieve(t *testing.T) { + bucket := NewTokenBucket(10, nil) + + assert.Equal(t, int64(0), bucket.Retrieve(10)) + assert.Negative(t, bucket.Retrieve(1)) + time.Sleep(time.Second) + assert.Positive(t, bucket.Retrieve(5)) +} + +func TestTokenBucket_MaxCapacity(t *testing.T) { + bucket := NewTokenBucket(10, nil) + + assert.Equal(t, int64(10), bucket.MaxCapacity()) +}