Skip to content

Commit

Permalink
Do not count 404 errors when checking if tenant deletion markers exists
Browse files Browse the repository at this point in the history
Signed-off-by: Alan Protasio <[email protected]>
  • Loading branch information
alanprot committed Dec 5, 2023
1 parent 23294aa commit b15db24
Show file tree
Hide file tree
Showing 12 changed files with 41 additions and 32 deletions.
4 changes: 2 additions & 2 deletions pkg/compactor/blocks_cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type BlocksCleaner struct {
cfg BlocksCleanerConfig
cfgProvider ConfigProvider
logger log.Logger
bucketClient objstore.Bucket
bucketClient objstore.InstrumentedBucket
usersScanner *cortex_tsdb.UsersScanner

// Keep track of the last owned users.
Expand All @@ -64,7 +64,7 @@ type BlocksCleaner struct {
tenantBucketIndexLastUpdate *prometheus.GaugeVec
}

func NewBlocksCleaner(cfg BlocksCleanerConfig, bucketClient objstore.Bucket, usersScanner *cortex_tsdb.UsersScanner, cfgProvider ConfigProvider, logger log.Logger, reg prometheus.Registerer) *BlocksCleaner {
func NewBlocksCleaner(cfg BlocksCleanerConfig, bucketClient objstore.InstrumentedBucket, usersScanner *cortex_tsdb.UsersScanner, cfgProvider ConfigProvider, logger log.Logger, reg prometheus.Registerer) *BlocksCleaner {
c := &BlocksCleaner{
cfg: cfg,
bucketClient: bucketClient,
Expand Down
8 changes: 4 additions & 4 deletions pkg/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ type Compactor struct {

// Functions that creates bucket client, grouper, planner and compactor using the context.
// Useful for injecting mock objects from tests.
bucketClientFactory func(ctx context.Context) (objstore.Bucket, error)
bucketClientFactory func(ctx context.Context) (objstore.InstrumentedBucket, error)
blocksGrouperFactory BlocksGrouperFactory
blocksCompactorFactory BlocksCompactorFactory

Expand All @@ -312,7 +312,7 @@ type Compactor struct {
blocksPlannerFactory PlannerFactory

// Client used to run operations on the bucket storing blocks.
bucketClient objstore.Bucket
bucketClient objstore.InstrumentedBucket

// Ring used for sharding compactions.
ringLifecycler *ring.Lifecycler
Expand Down Expand Up @@ -345,7 +345,7 @@ type Compactor struct {

// NewCompactor makes a new Compactor.
func NewCompactor(compactorCfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, logger log.Logger, registerer prometheus.Registerer, limits *validation.Overrides) (*Compactor, error) {
bucketClientFactory := func(ctx context.Context) (objstore.Bucket, error) {
bucketClientFactory := func(ctx context.Context) (objstore.InstrumentedBucket, error) {
return bucket.NewClient(ctx, storageCfg.Bucket, "compactor", logger, registerer)
}

Expand Down Expand Up @@ -380,7 +380,7 @@ func newCompactor(
storageCfg cortex_tsdb.BlocksStorageConfig,
logger log.Logger,
registerer prometheus.Registerer,
bucketClientFactory func(ctx context.Context) (objstore.Bucket, error),
bucketClientFactory func(ctx context.Context) (objstore.InstrumentedBucket, error),
blocksGrouperFactory BlocksGrouperFactory,
blocksCompactorFactory BlocksCompactorFactory,
limits *validation.Overrides,
Expand Down
8 changes: 4 additions & 4 deletions pkg/compactor/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1647,7 +1647,7 @@ func prepareConfig() Config {
return compactorCfg
}

func prepare(t *testing.T, compactorCfg Config, bucketClient objstore.Bucket, limits *validation.Limits) (*Compactor, *tsdbCompactorMock, *tsdbPlannerMock, *concurrency.SyncBuffer, prometheus.Gatherer) {
func prepare(t *testing.T, compactorCfg Config, bucketClient objstore.InstrumentedBucket, limits *validation.Limits) (*Compactor, *tsdbCompactorMock, *tsdbPlannerMock, *concurrency.SyncBuffer, prometheus.Gatherer) {
storageCfg := cortex_tsdb.BlocksStorageConfig{}
flagext.DefaultValues(&storageCfg)

Expand All @@ -1670,7 +1670,7 @@ func prepare(t *testing.T, compactorCfg Config, bucketClient objstore.Bucket, li
overrides, err := validation.NewOverrides(*limits, nil)
require.NoError(t, err)

bucketClientFactory := func(ctx context.Context) (objstore.Bucket, error) {
bucketClientFactory := func(ctx context.Context) (objstore.InstrumentedBucket, error) {
return bucketClient, nil
}

Expand Down Expand Up @@ -1845,7 +1845,7 @@ func TestCompactor_DeleteLocalSyncFiles(t *testing.T) {
userIDs = append(userIDs, fmt.Sprintf("user-%d", i))
}

inmem := objstore.NewInMemBucket()
inmem := objstore.WithNoopInstr(objstore.NewInMemBucket())
for _, userID := range userIDs {
id, err := ulid.New(ulid.Now(), rand.Reader)
require.NoError(t, err)
Expand Down Expand Up @@ -1956,7 +1956,7 @@ func TestCompactor_ShouldFailCompactionOnTimeout(t *testing.T) {
}

func TestCompactor_ShouldNotTreatInterruptionsAsErrors(t *testing.T) {
bucketClient := objstore.NewInMemBucket()
bucketClient := objstore.WithNoopInstr(objstore.NewInMemBucket())
id := ulid.MustNew(ulid.Now(), rand.Reader)
require.NoError(t, bucketClient.Upload(context.Background(), "user-1/"+id.String()+"/meta.json", strings.NewReader(mockBlockMetaJSON(id.String()))))

Expand Down
11 changes: 6 additions & 5 deletions pkg/storage/bucket/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ type Config struct {

// Not used internally, meant to allow callers to wrap Buckets
// created using this config
Middlewares []func(objstore.Bucket) (objstore.Bucket, error) `yaml:"-"`
Middlewares []func(objstore.InstrumentedBucket) (objstore.InstrumentedBucket, error) `yaml:"-"`

// Used to inject additional backends into the config. Allows for this config to
// be embedded in multiple contexts and support non-object storage based backends.
Expand Down Expand Up @@ -103,7 +103,8 @@ func (cfg *Config) Validate() error {
}

// NewClient creates a new bucket client based on the configured backend
func NewClient(ctx context.Context, cfg Config, name string, logger log.Logger, reg prometheus.Registerer) (client objstore.Bucket, err error) {
func NewClient(ctx context.Context, cfg Config, name string, logger log.Logger, reg prometheus.Registerer) (bucket objstore.InstrumentedBucket, err error) {
var client objstore.Bucket
switch cfg.Backend {
case S3:
client, err = s3.NewBucketClient(cfg.S3, name, logger)
Expand All @@ -123,17 +124,17 @@ func NewClient(ctx context.Context, cfg Config, name string, logger log.Logger,
return nil, err
}

client = opentracing.WrapWithTraces(bucketWithMetrics(client, name, reg))
iClient := opentracing.WrapWithTraces(bucketWithMetrics(client, name, reg))

// Wrap the client with any provided middleware
for _, wrap := range cfg.Middlewares {
client, err = wrap(client)
iClient, err = wrap(iClient)
if err != nil {
return nil, err
}
}

return client, nil
return iClient, nil
}

func bucketWithMetrics(bucketClient objstore.Bucket, name string, reg prometheus.Registerer) objstore.Bucket {
Expand Down
8 changes: 8 additions & 0 deletions pkg/storage/bucket/client_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,14 @@ type ClientMock struct {
uploaded sync.Map
}

func (m *ClientMock) WithExpectedErrs(objstore.IsOpFailureExpectedFunc) objstore.Bucket {
return m
}

func (m *ClientMock) ReaderWithExpectedErrs(objstore.IsOpFailureExpectedFunc) objstore.BucketReader {
return m
}

// Upload mocks objstore.Bucket.Upload()
func (m *ClientMock) Upload(ctx context.Context, name string, r io.Reader) error {
if _, ok := m.uploaded.Load(name); ok {
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/tsdb/bucketindex/markers_bucket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type globalMarkersBucket struct {

// BucketWithGlobalMarkers wraps the input bucket into a bucket which also keeps track of markers
// in the global markers location.
func BucketWithGlobalMarkers(b objstore.Bucket) objstore.Bucket {
func BucketWithGlobalMarkers(b objstore.InstrumentedBucket) objstore.InstrumentedBucket {
return &globalMarkersBucket{
parent: b,
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/tsdb/caching_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (cfg *MetadataCacheConfig) Validate() error {
return cfg.CacheBackend.Validate()
}

func CreateCachingBucket(chunksConfig ChunksCacheConfig, metadataConfig MetadataCacheConfig, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer) (objstore.Bucket, error) {
func CreateCachingBucket(chunksConfig ChunksCacheConfig, metadataConfig MetadataCacheConfig, bkt objstore.InstrumentedBucket, logger log.Logger, reg prometheus.Registerer) (objstore.InstrumentedBucket, error) {
cfg := cache.NewCachingBucketConfig()
cachingConfigured := false

Expand Down
8 changes: 4 additions & 4 deletions pkg/storage/tsdb/tenant_deletion_mark.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,22 +43,22 @@ func TenantDeletionMarkExists(ctx context.Context, bkt objstore.BucketReader, us
}

// Uploads deletion mark to the tenant location in the bucket.
func WriteTenantDeletionMark(ctx context.Context, bkt objstore.Bucket, userID string, mark *TenantDeletionMark) error {
func WriteTenantDeletionMark(ctx context.Context, bkt objstore.InstrumentedBucket, userID string, mark *TenantDeletionMark) error {
markerFile := GetGlobalDeletionMarkPath(userID)
return write(ctx, bkt, markerFile, mark)
}

// Returns tenant deletion mark for given user, if it exists. If it doesn't exist, returns nil mark, and no error.
func ReadTenantDeletionMark(ctx context.Context, bkt objstore.BucketReader, userID string) (*TenantDeletionMark, error) {
func ReadTenantDeletionMark(ctx context.Context, bkt objstore.InstrumentedBucket, userID string) (*TenantDeletionMark, error) {
markerFile := GetGlobalDeletionMarkPath(userID)
if mark, err := read(ctx, bkt, markerFile); err != nil {
if mark, err := read(ctx, bkt.WithExpectedErrs(bkt.IsObjNotFoundErr), markerFile); err != nil {
return nil, err
} else if mark != nil {
return mark, nil
}

markerFile = GetLocalDeletionMarkPath(userID)
return read(ctx, bkt, markerFile)
return read(ctx, bkt.WithExpectedErrs(bkt.IsObjNotFoundErr), markerFile)
}

// Deletes the tenant deletion mark for given user if it exists.
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/tsdb/testutil/objstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (

var ErrKeyAccessDeniedError = errors.New("test key access denied")

func PrepareFilesystemBucket(t testing.TB) (objstore.Bucket, string) {
func PrepareFilesystemBucket(t testing.TB) (objstore.InstrumentedBucket, string) {
storageDir, err := os.MkdirTemp(os.TempDir(), "bucket")
require.NoError(t, err)

Expand Down
2 changes: 1 addition & 1 deletion pkg/storegateway/bucket_stores.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ type BucketStores struct {
var ErrTooManyInflightRequests = status.Error(codes.ResourceExhausted, "too many inflight requests in store gateway")

// NewBucketStores makes a new BucketStores.
func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStrategy, bucketClient objstore.Bucket, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer) (*BucketStores, error) {
func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStrategy, bucketClient objstore.InstrumentedBucket, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer) (*BucketStores, error) {
cachingBucket, err := tsdb.CreateCachingBucket(cfg.BucketStore.ChunksCache, cfg.BucketStore.MetadataCache, bucketClient, logger, reg)
if err != nil {
return nil, errors.Wrapf(err, "create caching bucket")
Expand Down
14 changes: 7 additions & 7 deletions pkg/storegateway/bucket_stores_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func TestBucketStores_InitialSync(t *testing.T) {
require.NoError(t, err)

reg := prometheus.NewPedanticRegistry()
stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
require.NoError(t, err)

// Query series before the initial sync.
Expand Down Expand Up @@ -276,7 +276,7 @@ func TestBucketStores_InitialSyncShouldRetryOnFailure(t *testing.T) {
bucket = &failFirstGetBucket{Bucket: bucket}

reg := prometheus.NewPedanticRegistry()
stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
require.NoError(t, err)

// Initial sync should succeed even if a transient error occurs.
Expand Down Expand Up @@ -336,7 +336,7 @@ func TestBucketStores_SyncBlocks(t *testing.T) {
require.NoError(t, err)

reg := prometheus.NewPedanticRegistry()
stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
require.NoError(t, err)

// Run an initial sync to discover 1 block.
Expand Down Expand Up @@ -470,7 +470,7 @@ func testBucketStoresSeriesShouldCorrectlyQuerySeriesSpanningMultipleChunks(t *t
require.NoError(t, err)

reg := prometheus.NewPedanticRegistry()
stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
require.NoError(t, err)
require.NoError(t, stores.InitialSync(ctx))

Expand Down Expand Up @@ -526,7 +526,7 @@ func TestBucketStores_Series_ShouldReturnErrorIfMaxInflightRequestIsReached(t *t
bucket, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir})
require.NoError(t, err)

stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
require.NoError(t, err)
require.NoError(t, stores.InitialSync(context.Background()))

Expand All @@ -547,7 +547,7 @@ func TestBucketStores_Series_ShouldNotCheckMaxInflightRequestsIfTheLimitIsDisabl
bucket, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir})
require.NoError(t, err)

stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
require.NoError(t, err)
require.NoError(t, stores.InitialSync(context.Background()))

Expand Down Expand Up @@ -691,7 +691,7 @@ func TestBucketStores_deleteLocalFilesForExcludedTenants(t *testing.T) {
sharding := userShardingStrategy{}

reg := prometheus.NewPedanticRegistry()
stores, err := NewBucketStores(cfg, &sharding, bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
stores, err := NewBucketStores(cfg, &sharding, objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
require.NoError(t, err)

// Perform sync.
Expand Down
4 changes: 2 additions & 2 deletions pkg/storegateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func NewStoreGateway(gatewayCfg Config, storageCfg cortex_tsdb.BlocksStorageConf
return newStoreGateway(gatewayCfg, storageCfg, bucketClient, ringStore, limits, logLevel, logger, reg)
}

func newStoreGateway(gatewayCfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, bucketClient objstore.Bucket, ringStore kv.Client, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer) (*StoreGateway, error) {
func newStoreGateway(gatewayCfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, bucketClient objstore.InstrumentedBucket, ringStore kv.Client, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer) (*StoreGateway, error) {
var err error

g := &StoreGateway{
Expand Down Expand Up @@ -408,7 +408,7 @@ func (g *StoreGateway) OnRingInstanceStopping(_ *ring.BasicLifecycler)
func (g *StoreGateway) OnRingInstanceHeartbeat(_ *ring.BasicLifecycler, _ *ring.Desc, _ *ring.InstanceDesc) {
}

func createBucketClient(cfg cortex_tsdb.BlocksStorageConfig, logger log.Logger, reg prometheus.Registerer) (objstore.Bucket, error) {
func createBucketClient(cfg cortex_tsdb.BlocksStorageConfig, logger log.Logger, reg prometheus.Registerer) (objstore.InstrumentedBucket, error) {
bucketClient, err := bucket.NewClient(context.Background(), cfg.Bucket, "store-gateway", logger, reg)
if err != nil {
return nil, errors.Wrap(err, "create bucket client")
Expand Down

0 comments on commit b15db24

Please sign in to comment.