From 5ca94a09a3b5c428a8401e827000351403c36042 Mon Sep 17 00:00:00 2001 From: Justin Jung Date: Wed, 13 Sep 2023 03:45:24 -0700 Subject: [PATCH] Add max-inflight-requests limit to store gateway (#5553) * Add max-inflight-request limit to bucket stores Signed-off-by: Justin Jung * Add changelog + fix the config name Signed-off-by: Justin Jung * nit Signed-off-by: Justin Jung * Lint Signed-off-by: Justin Jung * Update docs Signed-off-by: Justin Jung * Add read lock + increment right before sending series request Signed-off-by: Justin Jung * Fix typo in config description + move max inflight request check to right before making the series call Signed-off-by: Justin Jung * Fix test Signed-off-by: Justin Jung * Renamed metric name to match convention Signed-off-by: Justin Jung --------- Signed-off-by: Justin Jung --- CHANGELOG.md | 1 + docs/blocks-storage/querier.md | 5 ++ docs/blocks-storage/store-gateway.md | 5 ++ docs/configuration/config-file-reference.md | 5 ++ pkg/querier/blocks_store_queryable.go | 2 + pkg/querier/blocks_store_queryable_test.go | 51 +++++++++++++++++++++ pkg/storage/tsdb/config.go | 2 + pkg/storegateway/bucket_stores.go | 35 ++++++++++++++ pkg/storegateway/bucket_stores_test.go | 42 +++++++++++++++++ pkg/storegateway/gateway.go | 20 ++++++++ 10 files changed, 168 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6be45a15e6..c6ebf564e5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -35,6 +35,7 @@ * [FEATURE] Ruler: Support for filtering rules in the API. #5417 * [FEATURE] Compactor: Add `-compactor.ring.tokens-file-path` to store generated tokens locally. #5432 * [FEATURE] Query Frontend: Add `-frontend.retry-on-too-many-outstanding-requests` to re-enqueue 429 requests if there are multiple query-schedulers available. #5496 +* [FEATURE] Store Gateway: Add `-blocks-storage.bucket-store.max-inflight-requests` for store gateways to reject further requests upon reaching the limit. 
#5553 * [ENHANCEMENT] Distributor/Ingester: Add span on push path #5319 * [ENHANCEMENT] Support object storage backends for runtime configuration file. #5292 * [ENHANCEMENT] Query Frontend: Reject subquery with too small step size. #5323 diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md index 01361c8e5c..76ecf8a179 100644 --- a/docs/blocks-storage/querier.md +++ b/docs/blocks-storage/querier.md @@ -499,6 +499,11 @@ blocks_storage: # CLI flag: -blocks-storage.bucket-store.max-concurrent [max_concurrent: <int> | default = 100] + # Max number of inflight queries to execute against the long-term storage. + # The limit is shared across all tenants. 0 to disable. + # CLI flag: -blocks-storage.bucket-store.max-inflight-requests + [max_inflight_requests: <int> | default = 0] + # Maximum number of concurrent tenants synching blocks. # CLI flag: -blocks-storage.bucket-store.tenant-sync-concurrency [tenant_sync_concurrency: <int> | default = 10] diff --git a/docs/blocks-storage/store-gateway.md b/docs/blocks-storage/store-gateway.md index e210238508..d407806542 100644 --- a/docs/blocks-storage/store-gateway.md +++ b/docs/blocks-storage/store-gateway.md @@ -602,6 +602,11 @@ blocks_storage: # CLI flag: -blocks-storage.bucket-store.max-concurrent [max_concurrent: <int> | default = 100] + # Max number of inflight queries to execute against the long-term storage. + # The limit is shared across all tenants. 0 to disable. + # CLI flag: -blocks-storage.bucket-store.max-inflight-requests + [max_inflight_requests: <int> | default = 0] + # Maximum number of concurrent tenants synching blocks. 
# CLI flag: -blocks-storage.bucket-store.tenant-sync-concurrency [tenant_sync_concurrency: | default = 10] diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 817d89ed46..0fb7777c30 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -1042,6 +1042,11 @@ bucket_store: # CLI flag: -blocks-storage.bucket-store.max-concurrent [max_concurrent: | default = 100] + # Max number of inflight queries to execute against the long-term storage. The + # limit is shared across all tenants. 0 to disable. + # CLI flag: -blocks-storage.bucket-store.max-inflight-requests + [max_inflight_requests: | default = 0] + # Maximum number of concurrent tenants synching blocks. # CLI flag: -blocks-storage.bucket-store.tenant-sync-concurrency [tenant_sync_concurrency: | default = 10] diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go index c78d895236..eee891b0fd 100644 --- a/pkg/querier/blocks_store_queryable.go +++ b/pkg/querier/blocks_store_queryable.go @@ -1116,6 +1116,8 @@ func isRetryableError(err error) bool { switch status.Code(err) { case codes.Unavailable: return true + case codes.ResourceExhausted: + return errors.Is(err, storegateway.ErrTooManyInflightRequests) default: return false } diff --git a/pkg/querier/blocks_store_queryable_test.go b/pkg/querier/blocks_store_queryable_test.go index 26935a1f39..920312430b 100644 --- a/pkg/querier/blocks_store_queryable_test.go +++ b/pkg/querier/blocks_store_queryable_test.go @@ -34,6 +34,7 @@ import ( "google.golang.org/grpc/status" "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" + "github.com/cortexproject/cortex/pkg/storegateway" "github.com/cortexproject/cortex/pkg/storegateway/storegatewaypb" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/limiter" @@ -708,6 +709,56 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { }, }, }, + 
"multiple store-gateways has the block, but one of them had too many inflight requests": { + finderResult: bucketindex.Blocks{ + {ID: block1}, + }, + storeSetResponses: []interface{}{ + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{ + remoteAddr: "1.1.1.1", + mockedSeriesErr: storegateway.ErrTooManyInflightRequests, + }: {block1}, + }, + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedSeriesResponses: []*storepb.SeriesResponse{ + mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT, 2), + mockHintsResponse(block1), + }}: {block1}, + }, + }, + limits: &blocksStoreLimitsMock{}, + queryLimiter: noOpQueryLimiter, + expectedSeries: []seriesResult{ + { + lbls: labels.New(metricNameLabel, series1Label), + values: []valueResult{ + {t: minT, v: 2}, + }, + }, + }, + }, + "store gateway returns resource exhausted error other than max inflight request": { + finderResult: bucketindex.Blocks{ + {ID: block1}, + }, + storeSetResponses: []interface{}{ + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{ + remoteAddr: "1.1.1.1", + mockedSeriesErr: status.Error(codes.ResourceExhausted, "some other resource"), + }: {block1}, + }, + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedSeriesResponses: []*storepb.SeriesResponse{ + mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT, 2), + mockHintsResponse(block1), + }}: {block1}, + }, + }, + limits: &blocksStoreLimitsMock{}, + expectedErr: errors.Wrapf(status.Error(codes.ResourceExhausted, "some other resource"), "failed to fetch series from 1.1.1.1"), + }, } for testName, testData := range tests { diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go index d28901d351..e8af5e1c41 100644 --- a/pkg/storage/tsdb/config.go +++ b/pkg/storage/tsdb/config.go @@ -241,6 +241,7 @@ type BucketStoreConfig struct { SyncDir string `yaml:"sync_dir"` SyncInterval time.Duration `yaml:"sync_interval"` 
MaxConcurrent int `yaml:"max_concurrent"` + MaxInflightRequests int `yaml:"max_inflight_requests"` TenantSyncConcurrency int `yaml:"tenant_sync_concurrency"` BlockSyncConcurrency int `yaml:"block_sync_concurrency"` MetaSyncConcurrency int `yaml:"meta_sync_concurrency"` @@ -294,6 +295,7 @@ func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) { f.IntVar(&cfg.ChunkPoolMinBucketSizeBytes, "blocks-storage.bucket-store.chunk-pool-min-bucket-size-bytes", ChunkPoolDefaultMinBucketSize, "Size - in bytes - of the smallest chunks pool bucket.") f.IntVar(&cfg.ChunkPoolMaxBucketSizeBytes, "blocks-storage.bucket-store.chunk-pool-max-bucket-size-bytes", ChunkPoolDefaultMaxBucketSize, "Size - in bytes - of the largest chunks pool bucket.") f.IntVar(&cfg.MaxConcurrent, "blocks-storage.bucket-store.max-concurrent", 100, "Max number of concurrent queries to execute against the long-term storage. The limit is shared across all tenants.") + f.IntVar(&cfg.MaxInflightRequests, "blocks-storage.bucket-store.max-inflight-requests", 0, "Max number of inflight queries to execute against the long-term storage. The limit is shared across all tenants. 
0 to disable.") f.IntVar(&cfg.TenantSyncConcurrency, "blocks-storage.bucket-store.tenant-sync-concurrency", 10, "Maximum number of concurrent tenants synching blocks.") f.IntVar(&cfg.BlockSyncConcurrency, "blocks-storage.bucket-store.block-sync-concurrency", 20, "Maximum number of concurrent blocks synching per tenant.") f.IntVar(&cfg.MetaSyncConcurrency, "blocks-storage.bucket-store.meta-sync-concurrency", 20, "Number of Go routines to use when syncing block meta files from object storage per tenant.") diff --git a/pkg/storegateway/bucket_stores.go b/pkg/storegateway/bucket_stores.go index ef493ec65b..d7c709c4ec 100644 --- a/pkg/storegateway/bucket_stores.go +++ b/pkg/storegateway/bucket_stores.go @@ -31,6 +31,7 @@ import ( "github.com/weaveworks/common/logging" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" "github.com/cortexproject/cortex/pkg/storage/bucket" "github.com/cortexproject/cortex/pkg/storage/tsdb" @@ -72,6 +73,10 @@ type BucketStores struct { storesErrorsMu sync.RWMutex storesErrors map[string]error + // Keeps number of inflight requests + inflightRequestCnt int + inflightRequestMu sync.RWMutex + // Metrics. syncTimes prometheus.Histogram syncLastSuccess prometheus.Gauge @@ -79,6 +84,8 @@ type BucketStores struct { tenantsSynced prometheus.Gauge } +var ErrTooManyInflightRequests = status.Error(codes.ResourceExhausted, "too many inflight requests in store gateway") + // NewBucketStores makes a new BucketStores. 
func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStrategy, bucketClient objstore.Bucket, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer) (*BucketStores, error) { cachingBucket, err := tsdb.CreateCachingBucket(cfg.BucketStore.ChunksCache, cfg.BucketStore.MetadataCache, bucketClient, logger, reg) @@ -313,6 +320,16 @@ func (u *BucketStores) Series(req *storepb.SeriesRequest, srv storepb.Store_Seri return nil } + maxInflightRequests := u.cfg.BucketStore.MaxInflightRequests + if maxInflightRequests > 0 { + if u.getInflightRequestCnt() >= maxInflightRequests { + return ErrTooManyInflightRequests + } + + u.incrementInflightRequestCnt() + defer u.decrementInflightRequestCnt() + } + err = store.Series(req, spanSeriesServer{ Store_SeriesServer: srv, ctx: spanCtx, @@ -321,6 +338,24 @@ func (u *BucketStores) Series(req *storepb.SeriesRequest, srv storepb.Store_Seri return err } +func (u *BucketStores) getInflightRequestCnt() int { + u.inflightRequestMu.RLock() + defer u.inflightRequestMu.RUnlock() + return u.inflightRequestCnt +} + +func (u *BucketStores) incrementInflightRequestCnt() { + u.inflightRequestMu.Lock() + u.inflightRequestCnt++ + u.inflightRequestMu.Unlock() +} + +func (u *BucketStores) decrementInflightRequestCnt() { + u.inflightRequestMu.Lock() + u.inflightRequestCnt-- + u.inflightRequestMu.Unlock() +} + // LabelNames implements the Storegateway proto service. 
func (u *BucketStores) LabelNames(ctx context.Context, req *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) { spanLog, spanCtx := spanlogger.New(ctx, "BucketStores.LabelNames") diff --git a/pkg/storegateway/bucket_stores_test.go b/pkg/storegateway/bucket_stores_test.go index 7cb3188e74..1b9b488768 100644 --- a/pkg/storegateway/bucket_stores_test.go +++ b/pkg/storegateway/bucket_stores_test.go @@ -514,6 +514,48 @@ func testBucketStoresSeriesShouldCorrectlyQuerySeriesSpanningMultipleChunks(t *t } } +func TestBucketStores_Series_ShouldReturnErrorIfMaxInflightRequestIsReached(t *testing.T) { + cfg := prepareStorageConfig(t) + cfg.BucketStore.MaxInflightRequests = 10 + reg := prometheus.NewPedanticRegistry() + storageDir := t.TempDir() + generateStorageBlock(t, storageDir, "user_id", "series_1", 0, 100, 15) + bucket, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir}) + require.NoError(t, err) + + stores, err := NewBucketStores(cfg, NewNoShardingStrategy(), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) + require.NoError(t, err) + require.NoError(t, stores.InitialSync(context.Background())) + + stores.inflightRequestMu.Lock() + stores.inflightRequestCnt = 10 + stores.inflightRequestMu.Unlock() + series, warnings, err := querySeries(stores, "user_id", "series_1", 0, 100) + assert.ErrorIs(t, err, ErrTooManyInflightRequests) + assert.Empty(t, series) + assert.Empty(t, warnings) +} + +func TestBucketStores_Series_ShouldNotCheckMaxInflightRequestsIfTheLimitIsDisabled(t *testing.T) { + cfg := prepareStorageConfig(t) + reg := prometheus.NewPedanticRegistry() + storageDir := t.TempDir() + generateStorageBlock(t, storageDir, "user_id", "series_1", 0, 100, 15) + bucket, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir}) + require.NoError(t, err) + + stores, err := NewBucketStores(cfg, NewNoShardingStrategy(), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), 
reg) + require.NoError(t, err) + require.NoError(t, stores.InitialSync(context.Background())) + + stores.inflightRequestMu.Lock() + stores.inflightRequestCnt = 10 // max_inflight_request is set to 0 by default = disabled + stores.inflightRequestMu.Unlock() + series, _, err := querySeries(stores, "user_id", "series_1", 0, 100) + require.NoError(t, err) + assert.Equal(t, 1, len(series)) +} + func prepareStorageConfig(t *testing.T) cortex_tsdb.BlocksStorageConfig { cfg := cortex_tsdb.BlocksStorageConfig{} flagext.DefaultValues(&cfg) diff --git a/pkg/storegateway/gateway.go b/pkg/storegateway/gateway.go index cdf4930d8f..fe99a32fa1 100644 --- a/pkg/storegateway/gateway.go +++ b/pkg/storegateway/gateway.go @@ -39,6 +39,10 @@ const ( // ringAutoForgetUnhealthyPeriods is how many consecutive timeout periods an unhealthy instance // in the ring will be automatically removed. ringAutoForgetUnhealthyPeriods = 10 + + instanceLimitsMetric = "cortex_storegateway_instance_limits" + instanceLimitsMetricHelp = "Instance limits used by this store gateway." 
+ limitLabel = "limit" ) var ( @@ -142,6 +146,22 @@ func newStoreGateway(gatewayCfg Config, storageCfg cortex_tsdb.BlocksStorageConf g.bucketSync.WithLabelValues(syncReasonPeriodic) g.bucketSync.WithLabelValues(syncReasonRingChange) + promauto.With(reg).NewGauge(prometheus.GaugeOpts{ + Name: instanceLimitsMetric, + Help: instanceLimitsMetricHelp, + ConstLabels: map[string]string{limitLabel: "max_inflight_requests"}, + }).Set(float64(storageCfg.BucketStore.MaxInflightRequests)) + promauto.With(reg).NewGauge(prometheus.GaugeOpts{ + Name: instanceLimitsMetric, + Help: instanceLimitsMetricHelp, + ConstLabels: map[string]string{limitLabel: "max_concurrent"}, + }).Set(float64(storageCfg.BucketStore.MaxConcurrent)) + promauto.With(reg).NewGauge(prometheus.GaugeOpts{ + Name: instanceLimitsMetric, + Help: instanceLimitsMetricHelp, + ConstLabels: map[string]string{limitLabel: "max_chunk_pool_bytes"}, + }).Set(float64(storageCfg.BucketStore.MaxChunkPoolBytes)) + // Init sharding strategy. var shardingStrategy ShardingStrategy