diff --git a/pkg/compactor/blocks_cleaner.go b/pkg/compactor/blocks_cleaner.go index 5cdda9d764..d1a81f401c 100644 --- a/pkg/compactor/blocks_cleaner.go +++ b/pkg/compactor/blocks_cleaner.go @@ -43,7 +43,7 @@ type BlocksCleaner struct { cfg BlocksCleanerConfig cfgProvider ConfigProvider logger log.Logger - bucketClient objstore.Bucket + bucketClient objstore.InstrumentedBucket usersScanner *cortex_tsdb.UsersScanner // Keep track of the last owned users. @@ -64,7 +64,7 @@ type BlocksCleaner struct { tenantBucketIndexLastUpdate *prometheus.GaugeVec } -func NewBlocksCleaner(cfg BlocksCleanerConfig, bucketClient objstore.Bucket, usersScanner *cortex_tsdb.UsersScanner, cfgProvider ConfigProvider, logger log.Logger, reg prometheus.Registerer) *BlocksCleaner { +func NewBlocksCleaner(cfg BlocksCleanerConfig, bucketClient objstore.InstrumentedBucket, usersScanner *cortex_tsdb.UsersScanner, cfgProvider ConfigProvider, logger log.Logger, reg prometheus.Registerer) *BlocksCleaner { c := &BlocksCleaner{ cfg: cfg, bucketClient: bucketClient, diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index 06f242103a..a7e5cf429d 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -296,7 +296,7 @@ type Compactor struct { // Functions that creates bucket client, grouper, planner and compactor using the context. // Useful for injecting mock objects from tests. - bucketClientFactory func(ctx context.Context) (objstore.Bucket, error) + bucketClientFactory func(ctx context.Context) (objstore.InstrumentedBucket, error) blocksGrouperFactory BlocksGrouperFactory blocksCompactorFactory BlocksCompactorFactory @@ -312,7 +312,7 @@ type Compactor struct { blocksPlannerFactory PlannerFactory // Client used to run operations on the bucket storing blocks. - bucketClient objstore.Bucket + bucketClient objstore.InstrumentedBucket // Ring used for sharding compactions. ringLifecycler *ring.Lifecycler @@ -345,7 +345,7 @@ type Compactor struct { // NewCompactor makes a new Compactor. func NewCompactor(compactorCfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, logger log.Logger, registerer prometheus.Registerer, limits *validation.Overrides) (*Compactor, error) { - bucketClientFactory := func(ctx context.Context) (objstore.Bucket, error) { + bucketClientFactory := func(ctx context.Context) (objstore.InstrumentedBucket, error) { return bucket.NewClient(ctx, storageCfg.Bucket, "compactor", logger, registerer) } @@ -380,7 +380,7 @@ func newCompactor( storageCfg cortex_tsdb.BlocksStorageConfig, logger log.Logger, registerer prometheus.Registerer, - bucketClientFactory func(ctx context.Context) (objstore.Bucket, error), + bucketClientFactory func(ctx context.Context) (objstore.InstrumentedBucket, error), blocksGrouperFactory BlocksGrouperFactory, blocksCompactorFactory BlocksCompactorFactory, limits *validation.Overrides, diff --git a/pkg/compactor/compactor_test.go b/pkg/compactor/compactor_test.go index 03659a5363..8998fa4fae 100644 --- a/pkg/compactor/compactor_test.go +++ b/pkg/compactor/compactor_test.go @@ -1647,7 +1647,7 @@ func prepareConfig() Config { return compactorCfg } -func prepare(t *testing.T, compactorCfg Config, bucketClient objstore.Bucket, limits *validation.Limits) (*Compactor, *tsdbCompactorMock, *tsdbPlannerMock, *concurrency.SyncBuffer, prometheus.Gatherer) { +func prepare(t *testing.T, compactorCfg Config, bucketClient objstore.InstrumentedBucket, limits *validation.Limits) (*Compactor, *tsdbCompactorMock, *tsdbPlannerMock, *concurrency.SyncBuffer, prometheus.Gatherer) { storageCfg := cortex_tsdb.BlocksStorageConfig{} flagext.DefaultValues(&storageCfg) @@ -1670,7 +1670,7 @@ func prepare(t *testing.T, compactorCfg Config, bucketClient objstore.Bucket, li overrides, err := validation.NewOverrides(*limits, nil) require.NoError(t, err) - bucketClientFactory := func(ctx context.Context) (objstore.Bucket, error) { + bucketClientFactory := func(ctx context.Context) (objstore.InstrumentedBucket, error) { return bucketClient, nil } @@ -1845,7 +1845,7 @@ func TestCompactor_DeleteLocalSyncFiles(t *testing.T) { userIDs = append(userIDs, fmt.Sprintf("user-%d", i)) } - inmem := objstore.NewInMemBucket() + inmem := objstore.WithNoopInstr(objstore.NewInMemBucket()) for _, userID := range userIDs { id, err := ulid.New(ulid.Now(), rand.Reader) require.NoError(t, err) @@ -1956,7 +1956,7 @@ func TestCompactor_ShouldFailCompactionOnTimeout(t *testing.T) { } func TestCompactor_ShouldNotTreatInterruptionsAsErrors(t *testing.T) { - bucketClient := objstore.NewInMemBucket() + bucketClient := objstore.WithNoopInstr(objstore.NewInMemBucket()) id := ulid.MustNew(ulid.Now(), rand.Reader) require.NoError(t, bucketClient.Upload(context.Background(), "user-1/"+id.String()+"/meta.json", strings.NewReader(mockBlockMetaJSON(id.String())))) diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index 78dd53f7b9..1e1ee46b71 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -2731,7 +2731,7 @@ func TestIngester_dontShipBlocksWhenTenantDeletionMarkerIsPresent(t *testing.T) numObjects := len(bucket.Objects()) require.NotZero(t, numObjects) - require.NoError(t, cortex_tsdb.WriteTenantDeletionMark(context.Background(), bucket, userID, cortex_tsdb.NewTenantDeletionMark(time.Now()))) + require.NoError(t, cortex_tsdb.WriteTenantDeletionMark(context.Background(), objstore.WithNoopInstr(bucket), userID, cortex_tsdb.NewTenantDeletionMark(time.Now()))) numObjects++ // For deletion marker db := i.getTSDB(userID) @@ -2763,7 +2763,7 @@ func TestIngester_seriesCountIsCorrectAfterClosingTSDBForDeletedTenant(t *testin bucket := objstore.NewInMemBucket() // Write tenant deletion mark. - require.NoError(t, cortex_tsdb.WriteTenantDeletionMark(context.Background(), bucket, userID, cortex_tsdb.NewTenantDeletionMark(time.Now()))) + require.NoError(t, cortex_tsdb.WriteTenantDeletionMark(context.Background(), objstore.WithNoopInstr(bucket), userID, cortex_tsdb.NewTenantDeletionMark(time.Now()))) i.TSDBState.bucket = bucket require.NoError(t, services.StartAndAwaitRunning(context.Background(), i)) diff --git a/pkg/purger/tenant_deletion_api.go b/pkg/purger/tenant_deletion_api.go index a80b9c0f7c..1b8310f826 100644 --- a/pkg/purger/tenant_deletion_api.go +++ b/pkg/purger/tenant_deletion_api.go @@ -20,7 +20,7 @@ import ( ) type TenantDeletionAPI struct { - bucketClient objstore.Bucket + bucketClient objstore.InstrumentedBucket logger log.Logger cfgProvider bucket.TenantConfigProvider } @@ -34,7 +34,7 @@ func NewTenantDeletionAPI(storageCfg cortex_tsdb.BlocksStorageConfig, cfgProvide return newTenantDeletionAPI(bucketClient, cfgProvider, logger), nil } -func newTenantDeletionAPI(bkt objstore.Bucket, cfgProvider bucket.TenantConfigProvider, logger log.Logger) *TenantDeletionAPI { +func newTenantDeletionAPI(bkt objstore.InstrumentedBucket, cfgProvider bucket.TenantConfigProvider, logger log.Logger) *TenantDeletionAPI { return &TenantDeletionAPI{ bucketClient: bkt, cfgProvider: cfgProvider, @@ -118,7 +118,7 @@ func (api *TenantDeletionAPI) isBlocksForUserDeleted(ctx context.Context, userID return true, nil } -func createBucketClient(cfg cortex_tsdb.BlocksStorageConfig, logger log.Logger, reg prometheus.Registerer) (objstore.Bucket, error) { +func createBucketClient(cfg cortex_tsdb.BlocksStorageConfig, logger log.Logger, reg prometheus.Registerer) (objstore.InstrumentedBucket, error) { bucketClient, err := bucket.NewClient(context.Background(), cfg.Bucket, "purger", logger, reg) if err != nil { return nil, errors.Wrap(err, "create bucket client") diff --git a/pkg/purger/tenant_deletion_api_test.go b/pkg/purger/tenant_deletion_api_test.go index 0d8129f80b..6667438b1b 100644 --- a/pkg/purger/tenant_deletion_api_test.go +++ b/pkg/purger/tenant_deletion_api_test.go @@ -17,7 +17,7 @@ import ( func TestDeleteTenant(t *testing.T) { bkt := objstore.NewInMemBucket() - api := newTenantDeletionAPI(bkt, nil, log.NewNopLogger()) + api := newTenantDeletionAPI(objstore.WithNoopInstr(bkt), nil, log.NewNopLogger()) { resp := httptest.NewRecorder() @@ -80,7 +80,7 @@ func TestDeleteTenantStatus(t *testing.T) { require.NoError(t, bkt.Upload(context.Background(), objName, bytes.NewReader(data))) } - api := newTenantDeletionAPI(bkt, nil, log.NewNopLogger()) + api := newTenantDeletionAPI(objstore.WithNoopInstr(bkt), nil, log.NewNopLogger()) res, err := api.isBlocksForUserDeleted(context.Background(), username) require.NoError(t, err) diff --git a/pkg/storage/bucket/client.go b/pkg/storage/bucket/client.go index 15133f0028..7d120cd6d7 100644 --- a/pkg/storage/bucket/client.go +++ b/pkg/storage/bucket/client.go @@ -57,7 +57,7 @@ type Config struct { // Not used internally, meant to allow callers to wrap Buckets // created using this config - Middlewares []func(objstore.Bucket) (objstore.Bucket, error) `yaml:"-"` + Middlewares []func(objstore.InstrumentedBucket) (objstore.InstrumentedBucket, error) `yaml:"-"` // Used to inject additional backends into the config. Allows for this config to // be embedded in multiple contexts and support non-object storage based backends. @@ -103,7 +103,8 @@ func (cfg *Config) Validate() error { } // NewClient creates a new bucket client based on the configured backend -func NewClient(ctx context.Context, cfg Config, name string, logger log.Logger, reg prometheus.Registerer) (client objstore.Bucket, err error) { +func NewClient(ctx context.Context, cfg Config, name string, logger log.Logger, reg prometheus.Registerer) (bucket objstore.InstrumentedBucket, err error) { + var client objstore.Bucket switch cfg.Backend { case S3: client, err = s3.NewBucketClient(cfg.S3, name, logger) @@ -123,17 +124,17 @@ func NewClient(ctx context.Context, cfg Config, name string, logger log.Logger, return nil, err } - client = opentracing.WrapWithTraces(bucketWithMetrics(client, name, reg)) + iClient := opentracing.WrapWithTraces(bucketWithMetrics(client, name, reg)) // Wrap the client with any provided middleware for _, wrap := range cfg.Middlewares { - client, err = wrap(client) + iClient, err = wrap(iClient) if err != nil { return nil, err } } - return client, nil + return iClient, nil } func bucketWithMetrics(bucketClient objstore.Bucket, name string, reg prometheus.Registerer) objstore.Bucket { diff --git a/pkg/storage/bucket/client_mock.go b/pkg/storage/bucket/client_mock.go index 55ab7f7dc1..ac96370aa9 100644 --- a/pkg/storage/bucket/client_mock.go +++ b/pkg/storage/bucket/client_mock.go @@ -23,6 +23,14 @@ type ClientMock struct { uploaded sync.Map } +func (m *ClientMock) WithExpectedErrs(objstore.IsOpFailureExpectedFunc) objstore.Bucket { + return m +} + +func (m *ClientMock) ReaderWithExpectedErrs(objstore.IsOpFailureExpectedFunc) objstore.BucketReader { + return m +} + // Upload mocks objstore.Bucket.Upload() func (m *ClientMock) Upload(ctx context.Context, name string, r io.Reader) error { if _, ok := m.uploaded.Load(name); ok { diff --git a/pkg/storage/tsdb/bucketindex/markers_bucket_client.go b/pkg/storage/tsdb/bucketindex/markers_bucket_client.go index 4585d842ad..c0dbb6e766 100644 --- a/pkg/storage/tsdb/bucketindex/markers_bucket_client.go +++ b/pkg/storage/tsdb/bucketindex/markers_bucket_client.go @@ -18,7 +18,7 @@ type globalMarkersBucket struct { // BucketWithGlobalMarkers wraps the input bucket into a bucket which also keeps track of markers // in the global markers location. -func BucketWithGlobalMarkers(b objstore.Bucket) objstore.Bucket { +func BucketWithGlobalMarkers(b objstore.InstrumentedBucket) objstore.InstrumentedBucket { return &globalMarkersBucket{ parent: b, } diff --git a/pkg/storage/tsdb/bucketindex/markers_bucket_client_test.go b/pkg/storage/tsdb/bucketindex/markers_bucket_client_test.go index faea4d30aa..e6a41b5eb0 100644 --- a/pkg/storage/tsdb/bucketindex/markers_bucket_client_test.go +++ b/pkg/storage/tsdb/bucketindex/markers_bucket_client_test.go @@ -195,8 +195,8 @@ func TestBucketWithGlobalMarkers_ShouldRetryUpload(t *testing.T) { Bucket: bkt, UploadFailures: map[string]error{p: errors.New("test")}, } - bkt, _ = s3.NewBucketWithRetries(mBucket, 5, 0, 0, log.NewNopLogger()) - bkt = BucketWithGlobalMarkers(bkt) + s3Bkt, _ := s3.NewBucketWithRetries(mBucket, 5, 0, 0, log.NewNopLogger()) + bkt = BucketWithGlobalMarkers(objstore.WithNoopInstr(s3Bkt)) originalPath := block1.String() + "/" + tc.mark err := bkt.Upload(ctx, originalPath, strings.NewReader("{}")) require.Equal(t, errors.New("test"), err) diff --git a/pkg/storage/tsdb/bucketindex/storage_test.go b/pkg/storage/tsdb/bucketindex/storage_test.go index e1c36b3ed2..ec6866cd98 100644 --- a/pkg/storage/tsdb/bucketindex/storage_test.go +++ b/pkg/storage/tsdb/bucketindex/storage_test.go @@ -10,6 +10,7 @@ import ( "github.com/go-kit/log" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/thanos-io/objstore" "github.com/cortexproject/cortex/pkg/storage/bucket" "github.com/cortexproject/cortex/pkg/storage/bucket/s3" @@ -91,8 +92,8 @@ func TestReadIndex_ShouldRetryUpload(t *testing.T) { Bucket: bkt, UploadFailures: map[string]error{userID: errors.New("test")}, } - bkt, _ = s3.NewBucketWithRetries(mBucket, 5, 0, 0, log.NewNopLogger()) - bkt = BucketWithGlobalMarkers(bkt) + s3Bkt, _ := s3.NewBucketWithRetries(mBucket, 5, 0, 0, log.NewNopLogger()) + bkt = BucketWithGlobalMarkers(objstore.WithNoopInstr(s3Bkt)) u := NewUpdater(bkt, userID, nil, logger) expectedIdx, _, _, err := u.UpdateIndex(ctx, nil) diff --git a/pkg/storage/tsdb/caching_bucket.go b/pkg/storage/tsdb/caching_bucket.go index 535562b703..990f8119e0 100644 --- a/pkg/storage/tsdb/caching_bucket.go +++ b/pkg/storage/tsdb/caching_bucket.go @@ -110,7 +110,7 @@ func (cfg *MetadataCacheConfig) Validate() error { return cfg.CacheBackend.Validate() } -func CreateCachingBucket(chunksConfig ChunksCacheConfig, metadataConfig MetadataCacheConfig, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer) (objstore.Bucket, error) { +func CreateCachingBucket(chunksConfig ChunksCacheConfig, metadataConfig MetadataCacheConfig, bkt objstore.InstrumentedBucket, logger log.Logger, reg prometheus.Registerer) (objstore.InstrumentedBucket, error) { cfg := cache.NewCachingBucketConfig() cachingConfigured := false diff --git a/pkg/storage/tsdb/tenant_deletion_mark.go b/pkg/storage/tsdb/tenant_deletion_mark.go index 8b07013d2c..33ba7b22ce 100644 --- a/pkg/storage/tsdb/tenant_deletion_mark.go +++ b/pkg/storage/tsdb/tenant_deletion_mark.go @@ -43,22 +43,22 @@ func TenantDeletionMarkExists(ctx context.Context, bkt objstore.BucketReader, us } // Uploads deletion mark to the tenant location in the bucket. -func WriteTenantDeletionMark(ctx context.Context, bkt objstore.Bucket, userID string, mark *TenantDeletionMark) error { +func WriteTenantDeletionMark(ctx context.Context, bkt objstore.InstrumentedBucket, userID string, mark *TenantDeletionMark) error { markerFile := GetGlobalDeletionMarkPath(userID) return write(ctx, bkt, markerFile, mark) } // Returns tenant deletion mark for given user, if it exists. If it doesn't exist, returns nil mark, and no error. -func ReadTenantDeletionMark(ctx context.Context, bkt objstore.BucketReader, userID string) (*TenantDeletionMark, error) { +func ReadTenantDeletionMark(ctx context.Context, bkt objstore.InstrumentedBucket, userID string) (*TenantDeletionMark, error) { markerFile := GetGlobalDeletionMarkPath(userID) - if mark, err := read(ctx, bkt, markerFile); err != nil { + if mark, err := read(ctx, bkt.WithExpectedErrs(bkt.IsObjNotFoundErr), markerFile); err != nil { return nil, err } else if mark != nil { return mark, nil } markerFile = GetLocalDeletionMarkPath(userID) - return read(ctx, bkt, markerFile) + return read(ctx, bkt.WithExpectedErrs(bkt.IsObjNotFoundErr), markerFile) } // Deletes the tenant deletion mark for given user if it exists. diff --git a/pkg/storage/tsdb/tenant_deletion_mark_test.go b/pkg/storage/tsdb/tenant_deletion_mark_test.go index 22a9ade6b2..f95388be06 100644 --- a/pkg/storage/tsdb/tenant_deletion_mark_test.go +++ b/pkg/storage/tsdb/tenant_deletion_mark_test.go @@ -59,7 +59,7 @@ func TestTenantDeletionMarkExists(t *testing.T) { } for _, user := range tc.deletedUsers { - require.NoError(t, WriteTenantDeletionMark(context.Background(), bkt, user, &TenantDeletionMark{})) + require.NoError(t, WriteTenantDeletionMark(context.Background(), objstore.WithNoopInstr(bkt), user, &TenantDeletionMark{})) } res, err := TenantDeletionMarkExists(context.Background(), bkt, username) diff --git a/pkg/storage/tsdb/testutil/objstore.go b/pkg/storage/tsdb/testutil/objstore.go index f9d5e67a72..d879ab2bb4 100644 --- a/pkg/storage/tsdb/testutil/objstore.go +++ b/pkg/storage/tsdb/testutil/objstore.go @@ -19,7 +19,7 @@ import ( var ErrKeyAccessDeniedError = errors.New("test key access denied") -func PrepareFilesystemBucket(t testing.TB) (objstore.Bucket, string) { +func PrepareFilesystemBucket(t testing.TB) (objstore.InstrumentedBucket, string) { storageDir, err := os.MkdirTemp(os.TempDir(), "bucket") require.NoError(t, err) diff --git a/pkg/storegateway/bucket_stores.go b/pkg/storegateway/bucket_stores.go index ab82d3037a..dd2b5ad633 100644 --- a/pkg/storegateway/bucket_stores.go +++ b/pkg/storegateway/bucket_stores.go @@ -87,7 +87,7 @@ type BucketStores struct { var ErrTooManyInflightRequests = status.Error(codes.ResourceExhausted, "too many inflight requests in store gateway") // NewBucketStores makes a new BucketStores. -func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStrategy, bucketClient objstore.Bucket, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer) (*BucketStores, error) { +func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStrategy, bucketClient objstore.InstrumentedBucket, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer) (*BucketStores, error) { cachingBucket, err := tsdb.CreateCachingBucket(cfg.BucketStore.ChunksCache, cfg.BucketStore.MetadataCache, bucketClient, logger, reg) if err != nil { return nil, errors.Wrapf(err, "create caching bucket") diff --git a/pkg/storegateway/bucket_stores_test.go b/pkg/storegateway/bucket_stores_test.go index da38ce2eaa..a10a4599fb 100644 --- a/pkg/storegateway/bucket_stores_test.go +++ b/pkg/storegateway/bucket_stores_test.go @@ -200,7 +200,7 @@ func TestBucketStores_InitialSync(t *testing.T) { require.NoError(t, err) reg := prometheus.NewPedanticRegistry() - stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) + stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) require.NoError(t, err) // Query series before the initial sync. @@ -276,7 +276,7 @@ func TestBucketStores_InitialSyncShouldRetryOnFailure(t *testing.T) { bucket = &failFirstGetBucket{Bucket: bucket} reg := prometheus.NewPedanticRegistry() - stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) + stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) require.NoError(t, err) // Initial sync should succeed even if a transient error occurs. @@ -336,7 +336,7 @@ func TestBucketStores_SyncBlocks(t *testing.T) { require.NoError(t, err) reg := prometheus.NewPedanticRegistry() - stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) + stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) require.NoError(t, err) // Run an initial sync to discover 1 block. @@ -470,7 +470,7 @@ func testBucketStoresSeriesShouldCorrectlyQuerySeriesSpanningMultipleChunks(t *t require.NoError(t, err) reg := prometheus.NewPedanticRegistry() - stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) + stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) require.NoError(t, err) require.NoError(t, stores.InitialSync(ctx)) @@ -526,7 +526,7 @@ func TestBucketStores_Series_ShouldReturnErrorIfMaxInflightRequestIsReached(t *t bucket, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir}) require.NoError(t, err) - stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) + stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) require.NoError(t, err) require.NoError(t, stores.InitialSync(context.Background())) @@ -547,7 +547,7 @@ func TestBucketStores_Series_ShouldNotCheckMaxInflightRequestsIfTheLimitIsDisabl bucket, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir}) require.NoError(t, err) - stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) + stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) require.NoError(t, err) require.NoError(t, stores.InitialSync(context.Background())) @@ -691,7 +691,7 @@ func TestBucketStores_deleteLocalFilesForExcludedTenants(t *testing.T) { sharding := userShardingStrategy{} reg := prometheus.NewPedanticRegistry() - stores, err := NewBucketStores(cfg, &sharding, bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) + stores, err := NewBucketStores(cfg, &sharding, objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) require.NoError(t, err) // Perform sync. diff --git a/pkg/storegateway/gateway.go b/pkg/storegateway/gateway.go index 5633974913..ebea731689 100644 --- a/pkg/storegateway/gateway.go +++ b/pkg/storegateway/gateway.go @@ -134,7 +134,7 @@ func NewStoreGateway(gatewayCfg Config, storageCfg cortex_tsdb.BlocksStorageConf return newStoreGateway(gatewayCfg, storageCfg, bucketClient, ringStore, limits, logLevel, logger, reg) } -func newStoreGateway(gatewayCfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, bucketClient objstore.Bucket, ringStore kv.Client, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer) (*StoreGateway, error) { +func newStoreGateway(gatewayCfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, bucketClient objstore.InstrumentedBucket, ringStore kv.Client, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer) (*StoreGateway, error) { var err error g := &StoreGateway{ @@ -408,7 +408,7 @@ func (g *StoreGateway) OnRingInstanceStopping(_ *ring.BasicLifecycler) func (g *StoreGateway) OnRingInstanceHeartbeat(_ *ring.BasicLifecycler, _ *ring.Desc, _ *ring.InstanceDesc) { } -func createBucketClient(cfg cortex_tsdb.BlocksStorageConfig, logger log.Logger, reg prometheus.Registerer) (objstore.Bucket, error) { +func createBucketClient(cfg cortex_tsdb.BlocksStorageConfig, logger log.Logger, reg prometheus.Registerer) (objstore.InstrumentedBucket, error) { bucketClient, err := bucket.NewClient(context.Background(), cfg.Bucket, "store-gateway", logger, reg) if err != nil { return nil, errors.Wrap(err, "create bucket client") diff --git a/pkg/storegateway/gateway_test.go b/pkg/storegateway/gateway_test.go index 387f991cb6..0908701baf 100644 --- a/pkg/storegateway/gateway_test.go +++ b/pkg/storegateway/gateway_test.go @@ -957,7 +957,7 @@ func TestStoreGateway_SeriesQueryingShouldRemoveExternalLabels(t *testing.T) { storageCfg := mockStorageConfig(t) storageCfg.BucketStore.BucketIndex.Enabled = bucketIndexEnabled - g, err := newStoreGateway(gatewayCfg, storageCfg, bucketClient, nil, defaultLimitsOverrides(t), mockLoggingLevel(), logger, nil) + g, err := newStoreGateway(gatewayCfg, storageCfg, objstore.WithNoopInstr(bucketClient), nil, defaultLimitsOverrides(t), mockLoggingLevel(), logger, nil) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(ctx, g)) defer services.StopAndAwaitTerminated(ctx, g) //nolint:errcheck @@ -1056,7 +1056,7 @@ func TestStoreGateway_SeriesQueryingShouldEnforceMaxChunksPerQueryLimit(t *testi gatewayCfg.ShardingEnabled = false storageCfg := mockStorageConfig(t) - g, err := newStoreGateway(gatewayCfg, storageCfg, bucketClient, nil, overrides, mockLoggingLevel(), logger, nil) + g, err := newStoreGateway(gatewayCfg, storageCfg, objstore.WithNoopInstr(bucketClient), nil, overrides, mockLoggingLevel(), logger, nil) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(ctx, g)) defer services.StopAndAwaitTerminated(ctx, g) //nolint:errcheck @@ -1145,7 +1145,7 @@ func TestStoreGateway_SeriesQueryingShouldEnforceMaxSeriesPerQueryLimit(t *testi gatewayCfg.ShardingEnabled = false storageCfg := mockStorageConfig(t) - g, err := newStoreGateway(gatewayCfg, storageCfg, bucketClient, nil, overrides, mockLoggingLevel(), logger, nil) + g, err := newStoreGateway(gatewayCfg, storageCfg, objstore.WithNoopInstr(bucketClient), nil, overrides, mockLoggingLevel(), logger, nil) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(ctx, g)) defer services.StopAndAwaitTerminated(ctx, g) //nolint:errcheck