From b7f7b10701e9e07ffcc1aa24ffe228b5c25508eb Mon Sep 17 00:00:00 2001 From: Justin Jung Date: Wed, 6 Sep 2023 10:07:14 -0700 Subject: [PATCH] Add max-inflight-request limit to bucket stores --- pkg/querier/blocks_store_queryable.go | 2 + pkg/querier/blocks_store_queryable_test.go | 51 ++++++++++++++++++++++ pkg/storage/tsdb/config.go | 2 + pkg/storegateway/bucket_stores.go | 29 ++++++++++++ pkg/storegateway/bucket_stores_test.go | 38 ++++++++++++++++ 5 files changed, 122 insertions(+) diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go index c78d8952362..eee891b0fdc 100644 --- a/pkg/querier/blocks_store_queryable.go +++ b/pkg/querier/blocks_store_queryable.go @@ -1116,6 +1116,8 @@ func isRetryableError(err error) bool { switch status.Code(err) { case codes.Unavailable: return true + case codes.ResourceExhausted: + return errors.Is(err, storegateway.ErrTooManyInflightRequests) default: return false } diff --git a/pkg/querier/blocks_store_queryable_test.go b/pkg/querier/blocks_store_queryable_test.go index 26935a1f397..a5347628504 100644 --- a/pkg/querier/blocks_store_queryable_test.go +++ b/pkg/querier/blocks_store_queryable_test.go @@ -3,6 +3,7 @@ package querier import ( "context" "fmt" + "github.com/cortexproject/cortex/pkg/storegateway" "io" "sort" "strings" @@ -708,6 +709,56 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { }, }, }, + "multiple store-gateways has the block, but one of them had too many inflight requests": { + finderResult: bucketindex.Blocks{ + {ID: block1}, + }, + storeSetResponses: []interface{}{ + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{ + remoteAddr: "1.1.1.1", + mockedSeriesErr: storegateway.ErrTooManyInflightRequests, + }: {block1}, + }, + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedSeriesResponses: []*storepb.SeriesResponse{ + mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT, 2), + mockHintsResponse(block1), + }}: {block1}, + }, + }, + limits: &blocksStoreLimitsMock{}, + queryLimiter: noOpQueryLimiter, + expectedSeries: []seriesResult{ + { + lbls: labels.New(metricNameLabel, series1Label), + values: []valueResult{ + {t: minT, v: 2}, + }, + }, + }, + }, + "store gateway returns resource exhausted error other than max inflight request": { + finderResult: bucketindex.Blocks{ + {ID: block1}, + }, + storeSetResponses: []interface{}{ + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{ + remoteAddr: "1.1.1.1", + mockedSeriesErr: status.Error(codes.ResourceExhausted, "some other resource"), + }: {block1}, + }, + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedSeriesResponses: []*storepb.SeriesResponse{ + mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT, 2), + mockHintsResponse(block1), + }}: {block1}, + }, + }, + limits: &blocksStoreLimitsMock{}, + expectedErr: errors.Wrapf(status.Error(codes.ResourceExhausted, "some other resource"), "failed to fetch series from 1.1.1.1"), + }, } for testName, testData := range tests { diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go index 73897cc1122..086e0030145 100644 --- a/pkg/storage/tsdb/config.go +++ b/pkg/storage/tsdb/config.go @@ -241,6 +241,7 @@ type BucketStoreConfig struct { SyncDir string `yaml:"sync_dir"` SyncInterval time.Duration `yaml:"sync_interval"` MaxConcurrent int `yaml:"max_concurrent"` + MaxInflightRequest int `yaml:"max_inflight_request"` TenantSyncConcurrency int `yaml:"tenant_sync_concurrency"` BlockSyncConcurrency int `yaml:"block_sync_concurrency"` MetaSyncConcurrency int `yaml:"meta_sync_concurrency"` @@ -291,6 +292,7 @@ func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) { f.IntVar(&cfg.ChunkPoolMinBucketSizeBytes, "blocks-storage.bucket-store.chunk-pool-min-bucket-size-bytes", ChunkPoolDefaultMinBucketSize, "Size - in bytes - of the smallest chunks pool bucket.") f.IntVar(&cfg.ChunkPoolMaxBucketSizeBytes, "blocks-storage.bucket-store.chunk-pool-max-bucket-size-bytes", ChunkPoolDefaultMaxBucketSize, "Size - in bytes - of the largest chunks pool bucket.") f.IntVar(&cfg.MaxConcurrent, "blocks-storage.bucket-store.max-concurrent", 100, "Max number of concurrent queries to execute against the long-term storage. The limit is shared across all tenants.") + f.IntVar(&cfg.MaxInflightRequest, "blocks-storage.bucket-store.max-inflight-request", 0, "Max number of inflight queries to execute against the long-term storage. THe limit is shared across all tenants. 0 to disable.") f.IntVar(&cfg.TenantSyncConcurrency, "blocks-storage.bucket-store.tenant-sync-concurrency", 10, "Maximum number of concurrent tenants synching blocks.") f.IntVar(&cfg.BlockSyncConcurrency, "blocks-storage.bucket-store.block-sync-concurrency", 20, "Maximum number of concurrent blocks synching per tenant.") f.IntVar(&cfg.MetaSyncConcurrency, "blocks-storage.bucket-store.meta-sync-concurrency", 20, "Number of Go routines to use when syncing block meta files from object storage per tenant.") diff --git a/pkg/storegateway/bucket_stores.go b/pkg/storegateway/bucket_stores.go index cc5cb5a5270..6431b7157fa 100644 --- a/pkg/storegateway/bucket_stores.go +++ b/pkg/storegateway/bucket_stores.go @@ -3,6 +3,7 @@ package storegateway import ( "context" "fmt" + "google.golang.org/grpc/status" "math" "net/http" "os" @@ -72,6 +73,10 @@ type BucketStores struct { storesErrorsMu sync.RWMutex storesErrors map[string]error + // Keeps number of inflight requests + inflightRequestCnt int + inflightRequestMu sync.RWMutex + // Metrics. syncTimes prometheus.Histogram syncLastSuccess prometheus.Gauge @@ -79,6 +84,8 @@ type BucketStores struct { tenantsSynced prometheus.Gauge } +var ErrTooManyInflightRequests = status.Error(codes.ResourceExhausted, "too many inflight requests in store gateway") + // NewBucketStores makes a new BucketStores. func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStrategy, bucketClient objstore.Bucket, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer) (*BucketStores, error) { cachingBucket, err := tsdb.CreateCachingBucket(cfg.BucketStore.ChunksCache, cfg.BucketStore.MetadataCache, bucketClient, logger, reg) @@ -293,6 +300,16 @@ func (u *BucketStores) Series(req *storepb.SeriesRequest, srv storepb.Store_Seri spanLog, spanCtx := spanlogger.New(srv.Context(), "BucketStores.Series") defer spanLog.Span.Finish() + maxInflightRequest := u.cfg.BucketStore.MaxInflightRequest + if maxInflightRequest > 0 { + if u.inflightRequestCnt >= maxInflightRequest { + return ErrTooManyInflightRequests + } + + u.incrementInflightRequestCnt() + defer u.decrementInflightRequestCnt() + } + userID := getUserIDFromGRPCContext(spanCtx) if userID == "" { return fmt.Errorf("no userID") @@ -321,6 +338,18 @@ func (u *BucketStores) Series(req *storepb.SeriesRequest, srv storepb.Store_Seri return err } +func (u *BucketStores) incrementInflightRequestCnt() { + u.inflightRequestMu.Lock() + u.inflightRequestCnt++ + u.inflightRequestMu.Unlock() +} + +func (u *BucketStores) decrementInflightRequestCnt() { + u.inflightRequestMu.Lock() + u.inflightRequestCnt-- + u.inflightRequestMu.Unlock() +} + // LabelNames implements the Storegateway proto service. func (u *BucketStores) LabelNames(ctx context.Context, req *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) { spanLog, spanCtx := spanlogger.New(ctx, "BucketStores.LabelNames") diff --git a/pkg/storegateway/bucket_stores_test.go b/pkg/storegateway/bucket_stores_test.go index 7cb3188e745..842ac135cd8 100644 --- a/pkg/storegateway/bucket_stores_test.go +++ b/pkg/storegateway/bucket_stores_test.go @@ -514,6 +514,44 @@ func testBucketStoresSeriesShouldCorrectlyQuerySeriesSpanningMultipleChunks(t *t } } +func TestBucketStores_Series_ShouldReturnErrorIfMaxInflightRequestIsReached(t *testing.T) { + cfg := prepareStorageConfig(t) + cfg.BucketStore.MaxInflightRequest = 10 + reg := prometheus.NewPedanticRegistry() + storageDir := t.TempDir() + bucket, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir}) + require.NoError(t, err) + + stores, err := NewBucketStores(cfg, NewNoShardingStrategy(), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) + require.NoError(t, err) + + stores.inflightRequestMu.Lock() + stores.inflightRequestCnt = 10 + stores.inflightRequestMu.Unlock() + series, warnings, err := querySeries(stores, "user_id", "metric_name", 0, 0) + require.Errorf(t, err, "too many inflight requests in store gateway, limit = 10") + assert.Empty(t, series) + assert.Empty(t, warnings) +} + +func TestBucketStores_Series_ShouldNotCheckMaxInflightRequestsIfTheLimitIsDisabled(t *testing.T) { + cfg := prepareStorageConfig(t) + cfg.BucketStore.MaxInflightRequest = 0 // disables the limit + reg := prometheus.NewPedanticRegistry() + storageDir := t.TempDir() + bucket, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir}) + require.NoError(t, err) + + stores, err := NewBucketStores(cfg, NewNoShardingStrategy(), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) + require.NoError(t, err) + + stores.inflightRequestMu.Lock() + stores.inflightRequestCnt = 10 + stores.inflightRequestMu.Unlock() + _, _, err = querySeries(stores, "user_id", "metric_name", 0, 0) + require.NoError(t, err) +} + func prepareStorageConfig(t *testing.T) cortex_tsdb.BlocksStorageConfig { cfg := cortex_tsdb.BlocksStorageConfig{} flagext.DefaultValues(&cfg)