From b15db240fceb9384d620294bd2dfb47c318fe97b Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Mon, 4 Dec 2023 18:33:25 -0800 Subject: [PATCH] Do not count 404 errors when checking if tenant deletion markers exists Signed-off-by: Alan Protasio --- pkg/compactor/blocks_cleaner.go | 4 ++-- pkg/compactor/compactor.go | 8 ++++---- pkg/compactor/compactor_test.go | 8 ++++---- pkg/storage/bucket/client.go | 11 ++++++----- pkg/storage/bucket/client_mock.go | 8 ++++++++ .../tsdb/bucketindex/markers_bucket_client.go | 2 +- pkg/storage/tsdb/caching_bucket.go | 2 +- pkg/storage/tsdb/tenant_deletion_mark.go | 8 ++++---- pkg/storage/tsdb/testutil/objstore.go | 2 +- pkg/storegateway/bucket_stores.go | 2 +- pkg/storegateway/bucket_stores_test.go | 14 +++++++------- pkg/storegateway/gateway.go | 4 ++-- 12 files changed, 41 insertions(+), 32 deletions(-) diff --git a/pkg/compactor/blocks_cleaner.go b/pkg/compactor/blocks_cleaner.go index 5cdda9d7646..d1a81f401cf 100644 --- a/pkg/compactor/blocks_cleaner.go +++ b/pkg/compactor/blocks_cleaner.go @@ -43,7 +43,7 @@ type BlocksCleaner struct { cfg BlocksCleanerConfig cfgProvider ConfigProvider logger log.Logger - bucketClient objstore.Bucket + bucketClient objstore.InstrumentedBucket usersScanner *cortex_tsdb.UsersScanner // Keep track of the last owned users. @@ -64,7 +64,7 @@ type BlocksCleaner struct { tenantBucketIndexLastUpdate *prometheus.GaugeVec } -func NewBlocksCleaner(cfg BlocksCleanerConfig, bucketClient objstore.Bucket, usersScanner *cortex_tsdb.UsersScanner, cfgProvider ConfigProvider, logger log.Logger, reg prometheus.Registerer) *BlocksCleaner { +func NewBlocksCleaner(cfg BlocksCleanerConfig, bucketClient objstore.InstrumentedBucket, usersScanner *cortex_tsdb.UsersScanner, cfgProvider ConfigProvider, logger log.Logger, reg prometheus.Registerer) *BlocksCleaner { c := &BlocksCleaner{ cfg: cfg, bucketClient: bucketClient, diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index 06f242103a6..a7e5cf429dc 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -296,7 +296,7 @@ type Compactor struct { // Functions that creates bucket client, grouper, planner and compactor using the context. // Useful for injecting mock objects from tests. - bucketClientFactory func(ctx context.Context) (objstore.Bucket, error) + bucketClientFactory func(ctx context.Context) (objstore.InstrumentedBucket, error) blocksGrouperFactory BlocksGrouperFactory blocksCompactorFactory BlocksCompactorFactory @@ -312,7 +312,7 @@ type Compactor struct { blocksPlannerFactory PlannerFactory // Client used to run operations on the bucket storing blocks. - bucketClient objstore.Bucket + bucketClient objstore.InstrumentedBucket // Ring used for sharding compactions. ringLifecycler *ring.Lifecycler @@ -345,7 +345,7 @@ type Compactor struct { // NewCompactor makes a new Compactor. func NewCompactor(compactorCfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, logger log.Logger, registerer prometheus.Registerer, limits *validation.Overrides) (*Compactor, error) { - bucketClientFactory := func(ctx context.Context) (objstore.Bucket, error) { + bucketClientFactory := func(ctx context.Context) (objstore.InstrumentedBucket, error) { return bucket.NewClient(ctx, storageCfg.Bucket, "compactor", logger, registerer) } @@ -380,7 +380,7 @@ func newCompactor( storageCfg cortex_tsdb.BlocksStorageConfig, logger log.Logger, registerer prometheus.Registerer, - bucketClientFactory func(ctx context.Context) (objstore.Bucket, error), + bucketClientFactory func(ctx context.Context) (objstore.InstrumentedBucket, error), blocksGrouperFactory BlocksGrouperFactory, blocksCompactorFactory BlocksCompactorFactory, limits *validation.Overrides, diff --git a/pkg/compactor/compactor_test.go b/pkg/compactor/compactor_test.go index 03659a53638..8998fa4fae5 100644 --- a/pkg/compactor/compactor_test.go +++ b/pkg/compactor/compactor_test.go @@ -1647,7 +1647,7 @@ func prepareConfig() Config { return compactorCfg } -func prepare(t *testing.T, compactorCfg Config, bucketClient objstore.Bucket, limits *validation.Limits) (*Compactor, *tsdbCompactorMock, *tsdbPlannerMock, *concurrency.SyncBuffer, prometheus.Gatherer) { +func prepare(t *testing.T, compactorCfg Config, bucketClient objstore.InstrumentedBucket, limits *validation.Limits) (*Compactor, *tsdbCompactorMock, *tsdbPlannerMock, *concurrency.SyncBuffer, prometheus.Gatherer) { storageCfg := cortex_tsdb.BlocksStorageConfig{} flagext.DefaultValues(&storageCfg) @@ -1670,7 +1670,7 @@ func prepare(t *testing.T, compactorCfg Config, bucketClient objstore.Bucket, li overrides, err := validation.NewOverrides(*limits, nil) require.NoError(t, err) - bucketClientFactory := func(ctx context.Context) (objstore.Bucket, error) { + bucketClientFactory := func(ctx context.Context) (objstore.InstrumentedBucket, error) { return bucketClient, nil } @@ -1845,7 +1845,7 @@ func TestCompactor_DeleteLocalSyncFiles(t *testing.T) { userIDs = append(userIDs, fmt.Sprintf("user-%d", i)) } - inmem := objstore.NewInMemBucket() + inmem := objstore.WithNoopInstr(objstore.NewInMemBucket()) for _, userID := range userIDs { id, err := ulid.New(ulid.Now(), rand.Reader) require.NoError(t, err) @@ -1956,7 +1956,7 @@ func TestCompactor_ShouldFailCompactionOnTimeout(t *testing.T) { } func TestCompactor_ShouldNotTreatInterruptionsAsErrors(t *testing.T) { - bucketClient := objstore.NewInMemBucket() + bucketClient := objstore.WithNoopInstr(objstore.NewInMemBucket()) id := ulid.MustNew(ulid.Now(), rand.Reader) require.NoError(t, bucketClient.Upload(context.Background(), "user-1/"+id.String()+"/meta.json", strings.NewReader(mockBlockMetaJSON(id.String())))) diff --git a/pkg/storage/bucket/client.go b/pkg/storage/bucket/client.go index 15133f00287..7d120cd6d77 100644 --- a/pkg/storage/bucket/client.go +++ b/pkg/storage/bucket/client.go @@ -57,7 +57,7 @@ type Config struct { // Not used internally, meant to allow callers to wrap Buckets // created using this config - Middlewares []func(objstore.Bucket) (objstore.Bucket, error) `yaml:"-"` + Middlewares []func(objstore.InstrumentedBucket) (objstore.InstrumentedBucket, error) `yaml:"-"` // Used to inject additional backends into the config. Allows for this config to // be embedded in multiple contexts and support non-object storage based backends. @@ -103,7 +103,8 @@ func (cfg *Config) Validate() error { } // NewClient creates a new bucket client based on the configured backend -func NewClient(ctx context.Context, cfg Config, name string, logger log.Logger, reg prometheus.Registerer) (client objstore.Bucket, err error) { +func NewClient(ctx context.Context, cfg Config, name string, logger log.Logger, reg prometheus.Registerer) (bucket objstore.InstrumentedBucket, err error) { + var client objstore.Bucket switch cfg.Backend { case S3: client, err = s3.NewBucketClient(cfg.S3, name, logger) @@ -123,17 +124,17 @@ func NewClient(ctx context.Context, cfg Config, name string, logger log.Logger, return nil, err } - client = opentracing.WrapWithTraces(bucketWithMetrics(client, name, reg)) + iClient := opentracing.WrapWithTraces(bucketWithMetrics(client, name, reg)) // Wrap the client with any provided middleware for _, wrap := range cfg.Middlewares { - client, err = wrap(client) + iClient, err = wrap(iClient) if err != nil { return nil, err } } - return client, nil + return iClient, nil } func bucketWithMetrics(bucketClient objstore.Bucket, name string, reg prometheus.Registerer) objstore.Bucket { diff --git a/pkg/storage/bucket/client_mock.go b/pkg/storage/bucket/client_mock.go index 55ab7f7dc1a..ac96370aa92 100644 --- a/pkg/storage/bucket/client_mock.go +++ b/pkg/storage/bucket/client_mock.go @@ -23,6 +23,14 @@ type ClientMock struct { uploaded sync.Map } +func (m *ClientMock) WithExpectedErrs(objstore.IsOpFailureExpectedFunc) objstore.Bucket { + return m +} + +func (m *ClientMock) ReaderWithExpectedErrs(objstore.IsOpFailureExpectedFunc) objstore.BucketReader { + return m +} + // Upload mocks objstore.Bucket.Upload() func (m *ClientMock) Upload(ctx context.Context, name string, r io.Reader) error { if _, ok := m.uploaded.Load(name); ok { diff --git a/pkg/storage/tsdb/bucketindex/markers_bucket_client.go b/pkg/storage/tsdb/bucketindex/markers_bucket_client.go index 4585d842ad8..c0dbb6e7660 100644 --- a/pkg/storage/tsdb/bucketindex/markers_bucket_client.go +++ b/pkg/storage/tsdb/bucketindex/markers_bucket_client.go @@ -18,7 +18,7 @@ type globalMarkersBucket struct { // BucketWithGlobalMarkers wraps the input bucket into a bucket which also keeps track of markers // in the global markers location. -func BucketWithGlobalMarkers(b objstore.Bucket) objstore.Bucket { +func BucketWithGlobalMarkers(b objstore.InstrumentedBucket) objstore.InstrumentedBucket { return &globalMarkersBucket{ parent: b, } diff --git a/pkg/storage/tsdb/caching_bucket.go b/pkg/storage/tsdb/caching_bucket.go index 535562b703b..990f8119e0c 100644 --- a/pkg/storage/tsdb/caching_bucket.go +++ b/pkg/storage/tsdb/caching_bucket.go @@ -110,7 +110,7 @@ func (cfg *MetadataCacheConfig) Validate() error { return cfg.CacheBackend.Validate() } -func CreateCachingBucket(chunksConfig ChunksCacheConfig, metadataConfig MetadataCacheConfig, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer) (objstore.Bucket, error) { +func CreateCachingBucket(chunksConfig ChunksCacheConfig, metadataConfig MetadataCacheConfig, bkt objstore.InstrumentedBucket, logger log.Logger, reg prometheus.Registerer) (objstore.InstrumentedBucket, error) { cfg := cache.NewCachingBucketConfig() cachingConfigured := false diff --git a/pkg/storage/tsdb/tenant_deletion_mark.go b/pkg/storage/tsdb/tenant_deletion_mark.go index 8b07013d2c9..33ba7b22ceb 100644 --- a/pkg/storage/tsdb/tenant_deletion_mark.go +++ b/pkg/storage/tsdb/tenant_deletion_mark.go @@ -43,22 +43,22 @@ func TenantDeletionMarkExists(ctx context.Context, bkt objstore.BucketReader, us } // Uploads deletion mark to the tenant location in the bucket. -func WriteTenantDeletionMark(ctx context.Context, bkt objstore.Bucket, userID string, mark *TenantDeletionMark) error { +func WriteTenantDeletionMark(ctx context.Context, bkt objstore.InstrumentedBucket, userID string, mark *TenantDeletionMark) error { markerFile := GetGlobalDeletionMarkPath(userID) return write(ctx, bkt, markerFile, mark) } // Returns tenant deletion mark for given user, if it exists. If it doesn't exist, returns nil mark, and no error. -func ReadTenantDeletionMark(ctx context.Context, bkt objstore.BucketReader, userID string) (*TenantDeletionMark, error) { +func ReadTenantDeletionMark(ctx context.Context, bkt objstore.InstrumentedBucket, userID string) (*TenantDeletionMark, error) { markerFile := GetGlobalDeletionMarkPath(userID) - if mark, err := read(ctx, bkt, markerFile); err != nil { + if mark, err := read(ctx, bkt.WithExpectedErrs(bkt.IsObjNotFoundErr), markerFile); err != nil { return nil, err } else if mark != nil { return mark, nil } markerFile = GetLocalDeletionMarkPath(userID) - return read(ctx, bkt, markerFile) + return read(ctx, bkt.WithExpectedErrs(bkt.IsObjNotFoundErr), markerFile) } // Deletes the tenant deletion mark for given user if it exists. diff --git a/pkg/storage/tsdb/testutil/objstore.go b/pkg/storage/tsdb/testutil/objstore.go index f9d5e67a729..d879ab2bb42 100644 --- a/pkg/storage/tsdb/testutil/objstore.go +++ b/pkg/storage/tsdb/testutil/objstore.go @@ -19,7 +19,7 @@ import ( var ErrKeyAccessDeniedError = errors.New("test key access denied") -func PrepareFilesystemBucket(t testing.TB) (objstore.Bucket, string) { +func PrepareFilesystemBucket(t testing.TB) (objstore.InstrumentedBucket, string) { storageDir, err := os.MkdirTemp(os.TempDir(), "bucket") require.NoError(t, err) diff --git a/pkg/storegateway/bucket_stores.go b/pkg/storegateway/bucket_stores.go index ab82d3037a1..dd2b5ad6338 100644 --- a/pkg/storegateway/bucket_stores.go +++ b/pkg/storegateway/bucket_stores.go @@ -87,7 +87,7 @@ type BucketStores struct { var ErrTooManyInflightRequests = status.Error(codes.ResourceExhausted, "too many inflight requests in store gateway") // NewBucketStores makes a new BucketStores. -func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStrategy, bucketClient objstore.Bucket, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer) (*BucketStores, error) { +func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStrategy, bucketClient objstore.InstrumentedBucket, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer) (*BucketStores, error) { cachingBucket, err := tsdb.CreateCachingBucket(cfg.BucketStore.ChunksCache, cfg.BucketStore.MetadataCache, bucketClient, logger, reg) if err != nil { return nil, errors.Wrapf(err, "create caching bucket") diff --git a/pkg/storegateway/bucket_stores_test.go b/pkg/storegateway/bucket_stores_test.go index da38ce2eaa3..a10a4599fba 100644 --- a/pkg/storegateway/bucket_stores_test.go +++ b/pkg/storegateway/bucket_stores_test.go @@ -200,7 +200,7 @@ func TestBucketStores_InitialSync(t *testing.T) { require.NoError(t, err) reg := prometheus.NewPedanticRegistry() - stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) + stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) require.NoError(t, err) // Query series before the initial sync. @@ -276,7 +276,7 @@ func TestBucketStores_InitialSyncShouldRetryOnFailure(t *testing.T) { bucket = &failFirstGetBucket{Bucket: bucket} reg := prometheus.NewPedanticRegistry() - stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) + stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) require.NoError(t, err) // Initial sync should succeed even if a transient error occurs. @@ -336,7 +336,7 @@ func TestBucketStores_SyncBlocks(t *testing.T) { require.NoError(t, err) reg := prometheus.NewPedanticRegistry() - stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) + stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) require.NoError(t, err) // Run an initial sync to discover 1 block. @@ -470,7 +470,7 @@ func testBucketStoresSeriesShouldCorrectlyQuerySeriesSpanningMultipleChunks(t *t require.NoError(t, err) reg := prometheus.NewPedanticRegistry() - stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) + stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) require.NoError(t, err) require.NoError(t, stores.InitialSync(ctx)) @@ -526,7 +526,7 @@ func TestBucketStores_Series_ShouldReturnErrorIfMaxInflightRequestIsReached(t *t bucket, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir}) require.NoError(t, err) - stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) + stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) require.NoError(t, err) require.NoError(t, stores.InitialSync(context.Background())) @@ -547,7 +547,7 @@ func TestBucketStores_Series_ShouldNotCheckMaxInflightRequestsIfTheLimitIsDisabl bucket, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir}) require.NoError(t, err) - stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) + stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) require.NoError(t, err) require.NoError(t, stores.InitialSync(context.Background())) @@ -691,7 +691,7 @@ func TestBucketStores_deleteLocalFilesForExcludedTenants(t *testing.T) { sharding := userShardingStrategy{} reg := prometheus.NewPedanticRegistry() - stores, err := NewBucketStores(cfg, &sharding, bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) + stores, err := NewBucketStores(cfg, &sharding, objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) require.NoError(t, err) // Perform sync. diff --git a/pkg/storegateway/gateway.go b/pkg/storegateway/gateway.go index 5633974913c..ebea7316899 100644 --- a/pkg/storegateway/gateway.go +++ b/pkg/storegateway/gateway.go @@ -134,7 +134,7 @@ func NewStoreGateway(gatewayCfg Config, storageCfg cortex_tsdb.BlocksStorageConf return newStoreGateway(gatewayCfg, storageCfg, bucketClient, ringStore, limits, logLevel, logger, reg) } -func newStoreGateway(gatewayCfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, bucketClient objstore.Bucket, ringStore kv.Client, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer) (*StoreGateway, error) { +func newStoreGateway(gatewayCfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, bucketClient objstore.InstrumentedBucket, ringStore kv.Client, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer) (*StoreGateway, error) { var err error g := &StoreGateway{ @@ -408,7 +408,7 @@ func (g *StoreGateway) OnRingInstanceStopping(_ *ring.BasicLifecycler) func (g *StoreGateway) OnRingInstanceHeartbeat(_ *ring.BasicLifecycler, _ *ring.Desc, _ *ring.InstanceDesc) { } -func createBucketClient(cfg cortex_tsdb.BlocksStorageConfig, logger log.Logger, reg prometheus.Registerer) (objstore.Bucket, error) { +func createBucketClient(cfg cortex_tsdb.BlocksStorageConfig, logger log.Logger, reg prometheus.Registerer) (objstore.InstrumentedBucket, error) { bucketClient, err := bucket.NewClient(context.Background(), cfg.Bucket, "store-gateway", logger, reg) if err != nil { return nil, errors.Wrap(err, "create bucket client")