Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Configurable block listing strategy #5828

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
* [CHANGE] Ruler: Remove `cortex_ruler_write_requests_total`, `cortex_ruler_write_requests_failed_total`, `cortex_ruler_queries_total`, `cortex_ruler_queries_failed_total`, and `cortex_ruler_query_seconds_total` metrics for the tenant when the ruler deletes the manager for the tenant. #5772
* [CHANGE] Main: Mark `mem-ballast-size-bytes` flag as deprecated. #5816
* [CHANGE] Querier: Mark `-querier.ingester-streaming` flag as deprecated. Now query ingester streaming is always enabled. #5817
* [CHANGE] AlertManager API: Removal of all api/v1/ endpoints following [2970](https://github.com/prometheus/alertmanager/pull/2970). [5841]
* [CHANGE] Compactor/Bucket Store: Added `-blocks-storage.bucket-store.block-discovery-strategy` to configure different block listing strategy. Reverted the current recursive block listing mechanism and use the strategy `Concurrent` as in 1.15. #5828
* [FEATURE] Ingester: Add per-tenant new metric `cortex_ingester_tsdb_data_replay_duration_seconds`. #5477
* [FEATURE] Query Frontend/Scheduler: Add query priority support. #5605
* [FEATURE] Tracing: Add `kuberesolver` to resolve endpoints address with `kubernetes://` prefix as Kubernetes service. #5731
Expand Down
11 changes: 11 additions & 0 deletions docs/blocks-storage/querier.md
Original file line number Diff line number Diff line change
Expand Up @@ -1314,6 +1314,17 @@ blocks_storage:
# CLI flag: -blocks-storage.bucket-store.bucket-index.max-stale-period
[max_stale_period: <duration> | default = 1h]

# One of concurrent, recursive, bucket_index. When set to concurrent, stores
# will concurrently issue one call per directory to discover active blocks
# in the bucket. The recursive strategy iterates through all objects in the
# bucket, recursively traversing into each directory. This avoids N+1 calls
# at the expense of having slower bucket iterations. bucket_index strategy
# can be used in Compactor only and utilizes the existing bucket index to
# fetch block IDs to sync. This avoids iterating the bucket but can be
# impacted by delays of cleaner creating bucket index.
# CLI flag: -blocks-storage.bucket-store.block-discovery-strategy
[block_discovery_strategy: <string> | default = "concurrent"]

# Max size - in bytes - of a chunks pool, used to reduce memory allocations.
# The pool is shared across all tenants. 0 to disable the limit.
# CLI flag: -blocks-storage.bucket-store.max-chunk-pool-bytes
Expand Down
11 changes: 11 additions & 0 deletions docs/blocks-storage/store-gateway.md
Original file line number Diff line number Diff line change
Expand Up @@ -1423,6 +1423,17 @@ blocks_storage:
# CLI flag: -blocks-storage.bucket-store.bucket-index.max-stale-period
[max_stale_period: <duration> | default = 1h]

# One of concurrent, recursive, bucket_index. When set to concurrent, stores
# will concurrently issue one call per directory to discover active blocks
# in the bucket. The recursive strategy iterates through all objects in the
# bucket, recursively traversing into each directory. This avoids N+1 calls
# at the expense of having slower bucket iterations. bucket_index strategy
# can be used in Compactor only and utilizes the existing bucket index to
# fetch block IDs to sync. This avoids iterating the bucket but can be
# impacted by delays of cleaner creating bucket index.
# CLI flag: -blocks-storage.bucket-store.block-discovery-strategy
[block_discovery_strategy: <string> | default = "concurrent"]

# Max size - in bytes - of a chunks pool, used to reduce memory allocations.
# The pool is shared across all tenants. 0 to disable the limit.
# CLI flag: -blocks-storage.bucket-store.max-chunk-pool-bytes
Expand Down
11 changes: 11 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -1850,6 +1850,17 @@ bucket_store:
# CLI flag: -blocks-storage.bucket-store.bucket-index.max-stale-period
[max_stale_period: <duration> | default = 1h]

# One of concurrent, recursive, bucket_index. When set to concurrent, stores
# will concurrently issue one call per directory to discover active blocks in
# the bucket. The recursive strategy iterates through all objects in the
# bucket, recursively traversing into each directory. This avoids N+1 calls at
# the expense of having slower bucket iterations. bucket_index strategy can be
# used in Compactor only and utilizes the existing bucket index to fetch block
# IDs to sync. This avoids iterating the bucket but can be impacted by delays
# of cleaner creating bucket index.
# CLI flag: -blocks-storage.bucket-store.block-discovery-strategy
[block_discovery_strategy: <string> | default = "concurrent"]

# Max size - in bytes - of a chunks pool, used to reduce memory allocations.
# The pool is shared across all tenants. 0 to disable the limit.
# CLI flag: -blocks-storage.bucket-store.max-chunk-pool-bytes
Expand Down
26 changes: 15 additions & 11 deletions pkg/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -827,22 +827,26 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error {
// out of order chunks or index file too big.
noCompactMarkerFilter := compact.NewGatherNoCompactionMarkFilter(ulogger, bucket, c.compactorCfg.MetaSyncConcurrency)

var blockIDsFetcher block.Lister
var fetcherULogger log.Logger
if c.storageCfg.BucketStore.BucketIndex.Enabled {
fetcherULogger = log.With(ulogger, "blockIdsFetcher", "BucketIndexBlockIDsFetcher")
blockIDsFetcher = bucketindex.NewBlockIDsFetcher(fetcherULogger, c.bucketClient, userID, c.limits)

} else {
fetcherULogger = log.With(ulogger, "blockIdsFetcher", "BaseBlockIDsFetcher")
blockIDsFetcher = block.NewRecursiveLister(fetcherULogger, bucket)
var blockLister block.Lister
switch cortex_tsdb.BlockDiscoveryStrategy(c.storageCfg.BucketStore.BlockDiscoveryStrategy) {
case cortex_tsdb.ConcurrentDiscovery:
blockLister = block.NewConcurrentLister(ulogger, bucket)
case cortex_tsdb.RecursiveDiscovery:
blockLister = block.NewRecursiveLister(ulogger, bucket)
case cortex_tsdb.BucketIndexDiscovery:
if !c.storageCfg.BucketStore.BucketIndex.Enabled {
return cortex_tsdb.ErrInvalidBucketIndexBlockDiscoveryStrategy
}
blockLister = bucketindex.NewBlockLister(ulogger, c.bucketClient, userID, c.limits)
default:
return cortex_tsdb.ErrBlockDiscoveryStrategy
}

fetcher, err := block.NewMetaFetcher(
fetcherULogger,
ulogger,
c.compactorCfg.MetaSyncConcurrency,
bucket,
blockIDsFetcher,
blockLister,
c.metaSyncDirForUser(userID),
reg,
// List of filters to apply (order matters).
Expand Down
1 change: 1 addition & 0 deletions pkg/compactor/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1673,6 +1673,7 @@ func prepareConfig() Config {
func prepare(t *testing.T, compactorCfg Config, bucketClient objstore.InstrumentedBucket, limits *validation.Limits) (*Compactor, *tsdbCompactorMock, *tsdbPlannerMock, *concurrency.SyncBuffer, prometheus.Gatherer) {
storageCfg := cortex_tsdb.BlocksStorageConfig{}
flagext.DefaultValues(&storageCfg)
storageCfg.BucketStore.BlockDiscoveryStrategy = string(cortex_tsdb.RecursiveDiscovery)

// Create a temporary directory for compactor data.
compactorCfg.DataDir = t.TempDir()
Expand Down
19 changes: 17 additions & 2 deletions pkg/querier/blocks_finder_bucket_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ type BucketScanBlocksFinderConfig struct {
ConsistencyDelay time.Duration
IgnoreDeletionMarksDelay time.Duration
IgnoreBlocksWithin time.Duration

BlockDiscoveryStrategy string
}

// BucketScanBlocksFinder is a BlocksFinder implementation periodically scanning the bucket to discover blocks.
Expand Down Expand Up @@ -384,12 +386,25 @@ func (d *BucketScanBlocksFinder) createMetaFetcher(userID string) (block.Metadat
filters = append(filters, storegateway.NewIgnoreNonQueryableBlocksFilter(d.logger, d.cfg.IgnoreBlocksWithin))
}

blockIdsFetcher := block.NewRecursiveLister(userLogger, userBucket)
var (
err error
blockLister block.Lister
)
switch cortex_tsdb.BlockDiscoveryStrategy(d.cfg.BlockDiscoveryStrategy) {
case cortex_tsdb.ConcurrentDiscovery:
blockLister = block.NewConcurrentLister(userLogger, userBucket)
case cortex_tsdb.RecursiveDiscovery:
blockLister = block.NewRecursiveLister(userLogger, userBucket)
case cortex_tsdb.BucketIndexDiscovery:
return nil, nil, nil, cortex_tsdb.ErrInvalidBucketIndexBlockDiscoveryStrategy
default:
return nil, nil, nil, cortex_tsdb.ErrBlockDiscoveryStrategy
}
f, err := block.NewMetaFetcher(
userLogger,
d.cfg.MetasConcurrency,
userBucket,
blockIdsFetcher,
blockLister,
// The fetcher stores cached metas in the "meta-syncer/" sub directory.
filepath.Join(d.cfg.CacheDir, userID),
userReg,
Expand Down
1 change: 1 addition & 0 deletions pkg/querier/blocks_finder_bucket_scan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,5 +521,6 @@ func prepareBucketScanBlocksFinderConfig() BucketScanBlocksFinderConfig {
MetasConcurrency: 10,
IgnoreDeletionMarksDelay: time.Hour,
IgnoreBlocksWithin: 10 * time.Hour, // All blocks created in the last 10 hour shouldn't be scanned.
BlockDiscoveryStrategy: string(cortex_tsdb.RecursiveDiscovery),
}
}
1 change: 1 addition & 0 deletions pkg/querier/blocks_store_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ func NewBlocksStoreQueryableFromConfig(querierCfg Config, gatewayCfg storegatewa
CacheDir: storageCfg.BucketStore.SyncDir,
IgnoreDeletionMarksDelay: storageCfg.BucketStore.IgnoreDeletionMarksDelay,
IgnoreBlocksWithin: storageCfg.BucketStore.IgnoreBlocksWithin,
BlockDiscoveryStrategy: storageCfg.BucketStore.BlockDiscoveryStrategy,
}, bucketClient, limits, logger, reg)
}

Expand Down
34 changes: 17 additions & 17 deletions pkg/storage/tsdb/bucketindex/block_ids_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,40 +13,40 @@ import (
"github.com/cortexproject/cortex/pkg/storage/bucket"
)

type BlockIDsFetcher struct {
logger log.Logger
bkt objstore.Bucket
userID string
cfgProvider bucket.TenantConfigProvider
baseBlockIDsFetcher block.Lister
type BlockLister struct {
logger log.Logger
bkt objstore.Bucket
userID string
cfgProvider bucket.TenantConfigProvider
baseLister block.Lister
}

func NewBlockIDsFetcher(logger log.Logger, bkt objstore.Bucket, userID string, cfgProvider bucket.TenantConfigProvider) *BlockIDsFetcher {
func NewBlockLister(logger log.Logger, bkt objstore.Bucket, userID string, cfgProvider bucket.TenantConfigProvider) *BlockLister {
userBkt := bucket.NewUserBucketClient(userID, bkt, cfgProvider)
baseBlockIDsFetcher := block.NewRecursiveLister(logger, userBkt)
return &BlockIDsFetcher{
logger: logger,
bkt: bkt,
userID: userID,
cfgProvider: cfgProvider,
baseBlockIDsFetcher: baseBlockIDsFetcher,
baseLister := block.NewConcurrentLister(logger, userBkt)
return &BlockLister{
logger: logger,
bkt: bkt,
userID: userID,
cfgProvider: cfgProvider,
baseLister: baseLister,
}
}

func (f *BlockIDsFetcher) GetActiveAndPartialBlockIDs(ctx context.Context, ch chan<- ulid.ULID) (partialBlocks map[ulid.ULID]bool, err error) {
func (f *BlockLister) GetActiveAndPartialBlockIDs(ctx context.Context, ch chan<- ulid.ULID) (partialBlocks map[ulid.ULID]bool, err error) {
// Fetch the bucket index.
idx, err := ReadIndex(ctx, f.bkt, f.userID, f.cfgProvider, f.logger)
if errors.Is(err, ErrIndexNotFound) {
// This is a legit case happening when the first blocks of a tenant have recently been uploaded by ingesters
// and their bucket index has not been created yet.
// Fallback to BaseBlockIDsFetcher.
return f.baseBlockIDsFetcher.GetActiveAndPartialBlockIDs(ctx, ch)
return f.baseLister.GetActiveAndPartialBlockIDs(ctx, ch)
}
if errors.Is(err, ErrIndexCorrupted) {
// In case a single tenant bucket index is corrupted, we want to return empty active blocks and parital blocks, so skipping this compaction cycle
level.Error(f.logger).Log("msg", "corrupted bucket index found", "user", f.userID, "err", err)
// Fallback to BaseBlockIDsFetcher.
return f.baseBlockIDsFetcher.GetActiveAndPartialBlockIDs(ctx, ch)
return f.baseLister.GetActiveAndPartialBlockIDs(ctx, ch)
}

if errors.Is(err, bucket.ErrCustomerManagedKeyAccessDenied) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/tsdb/bucketindex/block_ids_fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestBlockIDsFetcher_Fetch(t *testing.T) {
UpdatedAt: now.Unix(),
}))

blockIdsFetcher := NewBlockIDsFetcher(logger, bkt, userID, nil)
blockIdsFetcher := NewBlockLister(logger, bkt, userID, nil)
ch := make(chan ulid.ULID)
var wg sync.WaitGroup
var blockIds []ulid.ULID
Expand Down Expand Up @@ -94,7 +94,7 @@ func TestBlockIDsFetcherFetcher_Fetch_NoBucketIndex(t *testing.T) {
require.NoError(t, json.NewEncoder(&buf).Encode(mark))
require.NoError(t, bkt.Upload(ctx, path.Join(userID, mark.ID.String(), metadata.DeletionMarkFilename), &buf))
}
blockIdsFetcher := NewBlockIDsFetcher(logger, bkt, userID, nil)
blockIdsFetcher := NewBlockLister(logger, bkt, userID, nil)
ch := make(chan ulid.ULID)
var wg sync.WaitGroup
var blockIds []ulid.ULID
Expand Down
24 changes: 24 additions & 0 deletions pkg/storage/tsdb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/thanos-io/thanos/pkg/store"

"github.com/cortexproject/cortex/pkg/storage/bucket"
"github.com/cortexproject/cortex/pkg/util"
)

const (
Expand Down Expand Up @@ -48,6 +49,9 @@ var (
errInvalidStripeSize = errors.New("invalid TSDB stripe size")
errInvalidOutOfOrderCapMax = errors.New("invalid TSDB OOO chunks capacity (in samples)")
errEmptyBlockranges = errors.New("empty block ranges for TSDB")

ErrInvalidBucketIndexBlockDiscoveryStrategy = errors.New("bucket index block discovery strategy can only be enabled when bucket index is enabled")
ErrBlockDiscoveryStrategy = errors.New("invalid block discovery strategy")
)

// BlocksStorageConfig holds the config information for the blocks storage.
Expand Down Expand Up @@ -252,6 +256,7 @@ type BucketStoreConfig struct {
IgnoreDeletionMarksDelay time.Duration `yaml:"ignore_deletion_mark_delay"`
IgnoreBlocksWithin time.Duration `yaml:"ignore_blocks_within"`
BucketIndex BucketIndexConfig `yaml:"bucket_index"`
BlockDiscoveryStrategy string `yaml:"block_discovery_strategy"`

// Chunk pool.
MaxChunkPoolBytes uint64 `yaml:"max_chunk_pool_bytes"`
Expand Down Expand Up @@ -315,6 +320,7 @@ func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) {
f.Uint64Var(&cfg.EstimatedMaxChunkSizeBytes, "blocks-storage.bucket-store.estimated-max-chunk-size-bytes", store.EstimatedMaxChunkSize, "Estimated max chunk size in bytes. Setting a large value might result in over fetching data while a small value might result in data refetch. Default value is 16KiB.")
f.BoolVar(&cfg.LazyExpandedPostingsEnabled, "blocks-storage.bucket-store.lazy-expanded-postings-enabled", false, "If true, Store Gateway will estimate postings size and try to lazily expand postings if it downloads less data than expanding all postings.")
f.IntVar(&cfg.SeriesBatchSize, "blocks-storage.bucket-store.series-batch-size", store.SeriesBatchSize, "Controls how many series to fetch per batch in Store Gateway. Default value is 10000.")
f.StringVar(&cfg.BlockDiscoveryStrategy, "blocks-storage.bucket-store.block-discovery-strategy", string(ConcurrentDiscovery), "One of "+strings.Join(supportedBlockDiscoveryStrategies, ", ")+". When set to concurrent, stores will concurrently issue one call per directory to discover active blocks in the bucket. The recursive strategy iterates through all objects in the bucket, recursively traversing into each directory. This avoids N+1 calls at the expense of having slower bucket iterations. bucket_index strategy can be used in Compactor only and utilizes the existing bucket index to fetch block IDs to sync. This avoids iterating the bucket but can be impacted by delays of cleaner creating bucket index.")
}

// Validate the config.
Expand All @@ -331,6 +337,9 @@ func (cfg *BucketStoreConfig) Validate() error {
if err != nil {
return errors.Wrap(err, "metadata-cache configuration")
}
if !util.StringsContain(supportedBlockDiscoveryStrategies, cfg.BlockDiscoveryStrategy) {
return ErrInvalidBucketIndexBlockDiscoveryStrategy
}
return nil
}

Expand All @@ -347,3 +356,18 @@ func (cfg *BucketIndexConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix st
f.DurationVar(&cfg.IdleTimeout, prefix+"idle-timeout", time.Hour, "How long a unused bucket index should be cached. Once this timeout expires, the unused bucket index is removed from the in-memory cache. This option is used only by querier.")
f.DurationVar(&cfg.MaxStalePeriod, prefix+"max-stale-period", time.Hour, "The maximum allowed age of a bucket index (last updated) before queries start failing because the bucket index is too old. The bucket index is periodically updated by the compactor, while this check is enforced in the querier (at query time).")
}

// BlockDiscoveryStrategy configures how to list block IDs from object storage.
type BlockDiscoveryStrategy string

const (
ConcurrentDiscovery BlockDiscoveryStrategy = "concurrent"
RecursiveDiscovery BlockDiscoveryStrategy = "recursive"
BucketIndexDiscovery BlockDiscoveryStrategy = "bucket_index"
)

var supportedBlockDiscoveryStrategies = []string{
string(ConcurrentDiscovery),
string(RecursiveDiscovery),
string(BucketIndexDiscovery),
}
18 changes: 15 additions & 3 deletions pkg/storegateway/bucket_stores.go
Original file line number Diff line number Diff line change
Expand Up @@ -552,13 +552,25 @@ func (u *BucketStores) getOrCreateStore(userID string) (*store.BucketStore, erro
// BucketStore metrics are correctly updated.
fetcherBkt := NewShardingBucketReaderAdapter(userID, u.shardingStrategy, userBkt)

var err error
blockIdsFetcher := block.NewRecursiveLister(userLogger, fetcherBkt)
var (
err error
blockLister block.Lister
)
switch tsdb.BlockDiscoveryStrategy(u.cfg.BucketStore.BlockDiscoveryStrategy) {
case tsdb.ConcurrentDiscovery:
blockLister = block.NewConcurrentLister(userLogger, userBkt)
case tsdb.RecursiveDiscovery:
blockLister = block.NewRecursiveLister(userLogger, userBkt)
case tsdb.BucketIndexDiscovery:
return nil, tsdb.ErrInvalidBucketIndexBlockDiscoveryStrategy
default:
return nil, tsdb.ErrBlockDiscoveryStrategy
}
fetcher, err = block.NewMetaFetcher(
userLogger,
u.cfg.BucketStore.MetaSyncConcurrency,
fetcherBkt,
blockIdsFetcher,
blockLister,
u.syncDirForUser(userID), // The fetcher stores cached metas in the "meta-syncer/" sub directory
fetcherReg,
filters,
Expand Down
Loading