Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use caching bucket for compactor, except cleaner #5682

Merged
merged 6 commits into from
Dec 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/blocks-storage/compactor.md
Original file line number Diff line number Diff line change
Expand Up @@ -298,4 +298,9 @@ compactor:
# When enabled, index verification will ignore out of order label names.
# CLI flag: -compactor.accept-malformed-index
[accept_malformed_index: <boolean> | default = false]

# When enabled, caching bucket will be used for compactor, except cleaner
# service, which serves as the source of truth for block status
# CLI flag: -compactor.caching-bucket-enabled
[caching_bucket_enabled: <boolean> | default = false]
```
5 changes: 5 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -2021,6 +2021,11 @@ sharding_ring:
# When enabled, index verification will ignore out of order label names.
# CLI flag: -compactor.accept-malformed-index
[accept_malformed_index: <boolean> | default = false]

# When enabled, caching bucket will be used for compactor, except cleaner
# service, which serves as the source of truth for block status
# CLI flag: -compactor.caching-bucket-enabled
[caching_bucket_enabled: <boolean> | default = false]
```

### `configs_config`
Expand Down
14 changes: 14 additions & 0 deletions pkg/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact"
"github.com/thanos-io/thanos/pkg/compact/downsample"
"github.com/thanos-io/thanos/pkg/extprom"

"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/storage/bucket"
Expand Down Expand Up @@ -209,6 +210,7 @@ type Config struct {
BlockVisitMarkerFileUpdateInterval time.Duration `yaml:"block_visit_marker_file_update_interval"`

AcceptMalformedIndex bool `yaml:"accept_malformed_index"`
CachingBucketEnabled bool `yaml:"caching_bucket_enabled"`
}

// RegisterFlags registers the Compactor flags.
Expand Down Expand Up @@ -247,6 +249,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.BlockVisitMarkerFileUpdateInterval, "compactor.block-visit-marker-file-update-interval", 1*time.Minute, "How frequently block visit marker file should be updated duration compaction.")

f.BoolVar(&cfg.AcceptMalformedIndex, "compactor.accept-malformed-index", false, "When enabled, index verification will ignore out of order label names.")
f.BoolVar(&cfg.CachingBucketEnabled, "compactor.caching-bucket-enabled", false, "When enabled, caching bucket will be used for compactor, except cleaner service, which serves as the source of truth for block status")
}

func (cfg *Config) Validate(limits validation.Limits) error {
Expand Down Expand Up @@ -588,6 +591,17 @@ func (c *Compactor) starting(ctx context.Context) error {
return errors.Wrap(err, "failed to start the blocks cleaner")
}

if c.compactorCfg.CachingBucketEnabled {
matchers := cortex_tsdb.NewMatchers()
// Do not cache tenant deletion marker and block deletion marker for compactor
matchers.SetMetaFileMatcher(func(name string) bool {
return strings.HasSuffix(name, "/"+metadata.MetaFilename)
})
c.bucketClient, err = cortex_tsdb.CreateCachingBucket(cortex_tsdb.ChunksCacheConfig{}, c.storageCfg.BucketStore.MetadataCache, matchers, c.bucketClient, c.logger, extprom.WrapRegistererWith(prometheus.Labels{"component": "compactor"}, c.registerer))
if err != nil {
return errors.Wrap(err, "create caching bucket")
}
}
return nil
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/querier/blocks_store_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,8 @@ func NewBlocksStoreQueryableFromConfig(querierCfg Config, gatewayCfg storegatewa
}

// Blocks finder doesn't use chunks, but we pass config for consistency.
cachingBucket, err := cortex_tsdb.CreateCachingBucket(storageCfg.BucketStore.ChunksCache, storageCfg.BucketStore.MetadataCache, bucketClient, logger, extprom.WrapRegistererWith(prometheus.Labels{"component": "querier"}, reg))
matchers := cortex_tsdb.NewMatchers()
cachingBucket, err := cortex_tsdb.CreateCachingBucket(storageCfg.BucketStore.ChunksCache, storageCfg.BucketStore.MetadataCache, matchers, bucketClient, logger, extprom.WrapRegistererWith(prometheus.Labels{"component": "querier"}, reg))
if err != nil {
return nil, errors.Wrap(err, "create caching bucket")
}
Expand Down
94 changes: 84 additions & 10 deletions pkg/storage/tsdb/caching_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (cfg *MetadataCacheConfig) Validate() error {
return cfg.CacheBackend.Validate()
}

func CreateCachingBucket(chunksConfig ChunksCacheConfig, metadataConfig MetadataCacheConfig, bkt objstore.InstrumentedBucket, logger log.Logger, reg prometheus.Registerer) (objstore.InstrumentedBucket, error) {
func CreateCachingBucket(chunksConfig ChunksCacheConfig, metadataConfig MetadataCacheConfig, matchers Matchers, bkt objstore.InstrumentedBucket, logger log.Logger, reg prometheus.Registerer) (objstore.InstrumentedBucket, error) {
cfg := cache.NewCachingBucketConfig()
cachingConfigured := false

Expand All @@ -121,7 +121,7 @@ func CreateCachingBucket(chunksConfig ChunksCacheConfig, metadataConfig Metadata
if chunksCache != nil {
cachingConfigured = true
chunksCache = cache.NewTracingCache(chunksCache)
cfg.CacheGetRange("chunks", chunksCache, isTSDBChunkFile, chunksConfig.SubrangeSize, chunksConfig.AttributesTTL, chunksConfig.SubrangeTTL, chunksConfig.MaxGetRangeRequests)
cfg.CacheGetRange("chunks", chunksCache, matchers.GetChunksMatcher(), chunksConfig.SubrangeSize, chunksConfig.AttributesTTL, chunksConfig.SubrangeTTL, chunksConfig.MaxGetRangeRequests)
}

metadataCache, err := createCache("metadata-cache", &metadataConfig.CacheBackend, logger, reg)
Expand All @@ -132,16 +132,16 @@ func CreateCachingBucket(chunksConfig ChunksCacheConfig, metadataConfig Metadata
cachingConfigured = true
metadataCache = cache.NewTracingCache(metadataCache)

cfg.CacheExists("metafile", metadataCache, isMetaFile, metadataConfig.MetafileExistsTTL, metadataConfig.MetafileDoesntExistTTL)
cfg.CacheGet("metafile", metadataCache, isMetaFile, metadataConfig.MetafileMaxSize, metadataConfig.MetafileContentTTL, metadataConfig.MetafileExistsTTL, metadataConfig.MetafileDoesntExistTTL)
cfg.CacheAttributes("metafile", metadataCache, isMetaFile, metadataConfig.MetafileAttributesTTL)
cfg.CacheAttributes("block-index", metadataCache, isBlockIndexFile, metadataConfig.BlockIndexAttributesTTL)
cfg.CacheGet("bucket-index", metadataCache, isBucketIndexFiles, metadataConfig.BucketIndexMaxSize, metadataConfig.BucketIndexContentTTL /* do not cache exist / not exist: */, 0, 0)
cfg.CacheExists("metafile", metadataCache, matchers.GetMetafileMatcher(), metadataConfig.MetafileExistsTTL, metadataConfig.MetafileDoesntExistTTL)
cfg.CacheGet("metafile", metadataCache, matchers.GetMetafileMatcher(), metadataConfig.MetafileMaxSize, metadataConfig.MetafileContentTTL, metadataConfig.MetafileExistsTTL, metadataConfig.MetafileDoesntExistTTL)
cfg.CacheAttributes("metafile", metadataCache, matchers.GetMetafileMatcher(), metadataConfig.MetafileAttributesTTL)
cfg.CacheAttributes("block-index", metadataCache, matchers.GetBlockIndexMatcher(), metadataConfig.BlockIndexAttributesTTL)
cfg.CacheGet("bucket-index", metadataCache, matchers.GetBucketIndexMatcher(), metadataConfig.BucketIndexMaxSize, metadataConfig.BucketIndexContentTTL /* do not cache exist / not exist: */, 0, 0)

codec := snappyIterCodec{storecache.JSONIterCodec{}}
cfg.CacheIter("tenants-iter", metadataCache, isTenantsDir, metadataConfig.TenantsListTTL, codec)
cfg.CacheIter("tenant-blocks-iter", metadataCache, isTenantBlocksDir, metadataConfig.TenantBlocksListTTL, codec)
cfg.CacheIter("chunks-iter", metadataCache, isChunksDir, metadataConfig.ChunksListTTL, codec)
cfg.CacheIter("tenants-iter", metadataCache, matchers.GetTenantsIterMatcher(), metadataConfig.TenantsListTTL, codec)
cfg.CacheIter("tenant-blocks-iter", metadataCache, matchers.GetTenantBlocksIterMatcher(), metadataConfig.TenantBlocksListTTL, codec)
cfg.CacheIter("chunks-iter", metadataCache, matchers.GetChunksIterMatcher(), metadataConfig.ChunksListTTL, codec)
}

if !cachingConfigured {
Expand Down Expand Up @@ -178,6 +178,80 @@ func createCache(cacheName string, cacheBackend *CacheBackend, logger log.Logger
}
}

type Matchers struct {
matcherMap map[string]func(string) bool
}

func NewMatchers() Matchers {
matcherMap := make(map[string]func(string) bool)
matcherMap["chunks"] = isTSDBChunkFile
matcherMap["metafile"] = isMetaFile
matcherMap["block-index"] = isBlockIndexFile
matcherMap["bucket-index"] = isBucketIndexFiles
matcherMap["tenants-iter"] = isTenantsDir
matcherMap["tenant-blocks-iter"] = isTenantBlocksDir
matcherMap["chunks-iter"] = isChunksDir
return Matchers{
matcherMap: matcherMap,
}
}

func (m *Matchers) SetMetaFileMatcher(f func(string) bool) {
m.matcherMap["metafile"] = f
}

func (m *Matchers) SetChunksMatcher(f func(string) bool) {
m.matcherMap["chunks"] = f
}

func (m *Matchers) SetBlockIndexMatcher(f func(string) bool) {
m.matcherMap["block-index"] = f
}

func (m *Matchers) SetBucketIndexMatcher(f func(string) bool) {
m.matcherMap["bucket-index"] = f
}

func (m *Matchers) SetTenantsIterMatcher(f func(string) bool) {
m.matcherMap["tenants-iter"] = f
}

func (m *Matchers) SetTenantBlocksIterMatcher(f func(string) bool) {
m.matcherMap["tenant-blocks-iter"] = f
}

func (m *Matchers) SetChunksIterMatcher(f func(string) bool) {
m.matcherMap["chunks-iter"] = f
}

func (m *Matchers) GetChunksMatcher() func(string) bool {
return m.matcherMap["chunks"]
}

func (m *Matchers) GetMetafileMatcher() func(string) bool {
return m.matcherMap["metafile"]
}

func (m *Matchers) GetBlockIndexMatcher() func(string) bool {
return m.matcherMap["block-index"]
}

func (m *Matchers) GetBucketIndexMatcher() func(string) bool {
return m.matcherMap["bucket-index"]
}

func (m *Matchers) GetTenantsIterMatcher() func(string) bool {
return m.matcherMap["tenants-iter"]
}

func (m *Matchers) GetTenantBlocksIterMatcher() func(string) bool {
return m.matcherMap["tenant-blocks-iter"]
}

func (m *Matchers) GetChunksIterMatcher() func(string) bool {
return m.matcherMap["chunks-iter"]
}

var chunksMatcher = regexp.MustCompile(`^.*/chunks/\d+$`)

func isTSDBChunkFile(name string) bool { return chunksMatcher.MatchString(name) }
Expand Down
3 changes: 2 additions & 1 deletion pkg/storegateway/bucket_stores.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ var ErrTooManyInflightRequests = status.Error(codes.ResourceExhausted, "too many

// NewBucketStores makes a new BucketStores.
func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStrategy, bucketClient objstore.InstrumentedBucket, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer) (*BucketStores, error) {
cachingBucket, err := tsdb.CreateCachingBucket(cfg.BucketStore.ChunksCache, cfg.BucketStore.MetadataCache, bucketClient, logger, reg)
matchers := tsdb.NewMatchers()
cachingBucket, err := tsdb.CreateCachingBucket(cfg.BucketStore.ChunksCache, cfg.BucketStore.MetadataCache, matchers, bucketClient, logger, reg)
if err != nil {
return nil, errors.Wrapf(err, "create caching bucket")
}
Expand Down
Loading