use caching bucket for compactor, except cleaner (#5682)
* use caching bucket for compactor, except cleaner

Signed-off-by: Wen Xu <[email protected]>

* disable chunks cache for compactor, do not cache block deletion marker and tenant deletion marker for compactor

Signed-off-by: Wen Xu <[email protected]>

* turn on cachingBucketEnabled in compactor test and add more description to the feature flag

Signed-off-by: Wen Xu <[email protected]>

* remove cachingBucketEnabled flag in unit test

Signed-off-by: Wen Xu <[email protected]>

* properly disable chunks cache

Signed-off-by: Wen Xu <[email protected]>

* fix lint

Signed-off-by: Wen Xu <[email protected]>

---------

Signed-off-by: Wen Xu <[email protected]>
wenxu1024 authored Dec 7, 2023
1 parent a85f3c1 commit ab3ca0a
Showing 6 changed files with 112 additions and 12 deletions.
5 changes: 5 additions & 0 deletions docs/blocks-storage/compactor.md
@@ -298,4 +298,9 @@ compactor:
# When enabled, index verification will ignore out of order label names.
# CLI flag: -compactor.accept-malformed-index
[accept_malformed_index: <boolean> | default = false]
+
+# When enabled, caching bucket will be used for compactor, except cleaner
+# service, which serves as the source of truth for block status
+# CLI flag: -compactor.caching-bucket-enabled
+[caching_bucket_enabled: <boolean> | default = false]
```
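For reference, here is a minimal configuration sketch enabling the feature (equivalently, pass `-compactor.caching-bucket-enabled=true` on the command line). As the compactor code below shows, the compactor reuses the bucket-store metadata cache settings; the memcached backend and address here are illustrative placeholders, not part of this change:

```yaml
# Illustrative only: back the compactor's caching bucket with the
# bucket-store metadata cache (placeholder memcached address).
blocks_storage:
  bucket_store:
    metadata_cache:
      backend: memcached
      memcached:
        addresses: dns+memcached-metadata.example.svc.cluster.local:11211

compactor:
  caching_bucket_enabled: true
```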
5 changes: 5 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -2021,6 +2021,11 @@ sharding_ring:
# When enabled, index verification will ignore out of order label names.
# CLI flag: -compactor.accept-malformed-index
[accept_malformed_index: <boolean> | default = false]
+
+# When enabled, caching bucket will be used for compactor, except cleaner
+# service, which serves as the source of truth for block status
+# CLI flag: -compactor.caching-bucket-enabled
+[caching_bucket_enabled: <boolean> | default = false]
```

### `configs_config`
14 changes: 14 additions & 0 deletions pkg/compactor/compactor.go
@@ -22,6 +22,7 @@ import (
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact"
"github.com/thanos-io/thanos/pkg/compact/downsample"
"github.com/thanos-io/thanos/pkg/extprom"

"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/storage/bucket"
@@ -209,6 +210,7 @@ type Config struct {
	BlockVisitMarkerFileUpdateInterval time.Duration `yaml:"block_visit_marker_file_update_interval"`

	AcceptMalformedIndex bool `yaml:"accept_malformed_index"`
+	CachingBucketEnabled bool `yaml:"caching_bucket_enabled"`
}

// RegisterFlags registers the Compactor flags.
@@ -247,6 +249,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
	f.DurationVar(&cfg.BlockVisitMarkerFileUpdateInterval, "compactor.block-visit-marker-file-update-interval", 1*time.Minute, "How frequently block visit marker file should be updated during compaction.")

	f.BoolVar(&cfg.AcceptMalformedIndex, "compactor.accept-malformed-index", false, "When enabled, index verification will ignore out of order label names.")
+	f.BoolVar(&cfg.CachingBucketEnabled, "compactor.caching-bucket-enabled", false, "When enabled, caching bucket will be used for compactor, except cleaner service, which serves as the source of truth for block status")
}

func (cfg *Config) Validate(limits validation.Limits) error {
@@ -588,6 +591,17 @@ func (c *Compactor) starting(ctx context.Context) error {
return errors.Wrap(err, "failed to start the blocks cleaner")
}

if c.compactorCfg.CachingBucketEnabled {
matchers := cortex_tsdb.NewMatchers()
// Do not cache tenant deletion marker and block deletion marker for compactor
matchers.SetMetaFileMatcher(func(name string) bool {
return strings.HasSuffix(name, "/"+metadata.MetaFilename)
})
c.bucketClient, err = cortex_tsdb.CreateCachingBucket(cortex_tsdb.ChunksCacheConfig{}, c.storageCfg.BucketStore.MetadataCache, matchers, c.bucketClient, c.logger, extprom.WrapRegistererWith(prometheus.Labels{"component": "compactor"}, c.registerer))
if err != nil {
return errors.Wrap(err, "create caching bucket")
}
}
return nil
}
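To make the override concrete, here is an illustrative, self-contained sketch (made-up object keys; `metadata.MetaFilename` is `"meta.json"` in Thanos) showing which keys the compactor's metafile matcher accepts: only `<block>/meta.json` is cached, while block and tenant deletion markers fall through to the bucket. Chunks caching is disabled entirely by the empty `ChunksCacheConfig{}` passed above.

```go
package main

import (
	"fmt"
	"strings"
)

// metaFilename mirrors metadata.MetaFilename from Thanos.
const metaFilename = "meta.json"

// compactorMetaMatcher replicates the override installed in starting():
// cache only objects whose key ends in "/meta.json".
func compactorMetaMatcher(name string) bool {
	return strings.HasSuffix(name, "/"+metaFilename)
}

func main() {
	// Hypothetical object keys shaped like Cortex block-storage paths.
	for _, key := range []string{
		"user-1/01BKGV7JBM69T2G1BGBGM6KB12/meta.json",          // cached
		"user-1/01BKGV7JBM69T2G1BGBGM6KB12/deletion-mark.json", // not cached
		"user-1/tenant-deletion-mark.json",                     // not cached
	} {
		fmt.Printf("cache %q: %t\n", key, compactorMetaMatcher(key))
	}
}
```

Note the ordering in `starting()`: the blocks cleaner is started before `c.bucketClient` is wrapped, so the cleaner keeps reading the raw bucket and remains the source of truth for block status, as the flag description says.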

3 changes: 2 additions & 1 deletion pkg/querier/blocks_store_queryable.go
@@ -183,7 +183,8 @@ func NewBlocksStoreQueryableFromConfig(querierCfg Config, gatewayCfg storegatewa
	}

	// Blocks finder doesn't use chunks, but we pass config for consistency.
-	cachingBucket, err := cortex_tsdb.CreateCachingBucket(storageCfg.BucketStore.ChunksCache, storageCfg.BucketStore.MetadataCache, bucketClient, logger, extprom.WrapRegistererWith(prometheus.Labels{"component": "querier"}, reg))
+	matchers := cortex_tsdb.NewMatchers()
+	cachingBucket, err := cortex_tsdb.CreateCachingBucket(storageCfg.BucketStore.ChunksCache, storageCfg.BucketStore.MetadataCache, matchers, bucketClient, logger, extprom.WrapRegistererWith(prometheus.Labels{"component": "querier"}, reg))
	if err != nil {
		return nil, errors.Wrap(err, "create caching bucket")
	}
94 changes: 84 additions & 10 deletions pkg/storage/tsdb/caching_bucket.go
@@ -110,7 +110,7 @@ func (cfg *MetadataCacheConfig) Validate() error {
	return cfg.CacheBackend.Validate()
}

-func CreateCachingBucket(chunksConfig ChunksCacheConfig, metadataConfig MetadataCacheConfig, bkt objstore.InstrumentedBucket, logger log.Logger, reg prometheus.Registerer) (objstore.InstrumentedBucket, error) {
+func CreateCachingBucket(chunksConfig ChunksCacheConfig, metadataConfig MetadataCacheConfig, matchers Matchers, bkt objstore.InstrumentedBucket, logger log.Logger, reg prometheus.Registerer) (objstore.InstrumentedBucket, error) {
	cfg := cache.NewCachingBucketConfig()
	cachingConfigured := false

@@ -121,7 +121,7 @@ func CreateCachingBucket(chunksConfig ChunksCacheConfig, metadataConfig Metadata
	if chunksCache != nil {
		cachingConfigured = true
		chunksCache = cache.NewTracingCache(chunksCache)
-		cfg.CacheGetRange("chunks", chunksCache, isTSDBChunkFile, chunksConfig.SubrangeSize, chunksConfig.AttributesTTL, chunksConfig.SubrangeTTL, chunksConfig.MaxGetRangeRequests)
+		cfg.CacheGetRange("chunks", chunksCache, matchers.GetChunksMatcher(), chunksConfig.SubrangeSize, chunksConfig.AttributesTTL, chunksConfig.SubrangeTTL, chunksConfig.MaxGetRangeRequests)
	}

	metadataCache, err := createCache("metadata-cache", &metadataConfig.CacheBackend, logger, reg)
@@ -132,16 +132,16 @@ func CreateCachingBucket(chunksConfig ChunksCacheConfig, metadataConfig Metadata
		cachingConfigured = true
		metadataCache = cache.NewTracingCache(metadataCache)

-		cfg.CacheExists("metafile", metadataCache, isMetaFile, metadataConfig.MetafileExistsTTL, metadataConfig.MetafileDoesntExistTTL)
-		cfg.CacheGet("metafile", metadataCache, isMetaFile, metadataConfig.MetafileMaxSize, metadataConfig.MetafileContentTTL, metadataConfig.MetafileExistsTTL, metadataConfig.MetafileDoesntExistTTL)
-		cfg.CacheAttributes("metafile", metadataCache, isMetaFile, metadataConfig.MetafileAttributesTTL)
-		cfg.CacheAttributes("block-index", metadataCache, isBlockIndexFile, metadataConfig.BlockIndexAttributesTTL)
-		cfg.CacheGet("bucket-index", metadataCache, isBucketIndexFiles, metadataConfig.BucketIndexMaxSize, metadataConfig.BucketIndexContentTTL /* do not cache exist / not exist: */, 0, 0)
+		cfg.CacheExists("metafile", metadataCache, matchers.GetMetafileMatcher(), metadataConfig.MetafileExistsTTL, metadataConfig.MetafileDoesntExistTTL)
+		cfg.CacheGet("metafile", metadataCache, matchers.GetMetafileMatcher(), metadataConfig.MetafileMaxSize, metadataConfig.MetafileContentTTL, metadataConfig.MetafileExistsTTL, metadataConfig.MetafileDoesntExistTTL)
+		cfg.CacheAttributes("metafile", metadataCache, matchers.GetMetafileMatcher(), metadataConfig.MetafileAttributesTTL)
+		cfg.CacheAttributes("block-index", metadataCache, matchers.GetBlockIndexMatcher(), metadataConfig.BlockIndexAttributesTTL)
+		cfg.CacheGet("bucket-index", metadataCache, matchers.GetBucketIndexMatcher(), metadataConfig.BucketIndexMaxSize, metadataConfig.BucketIndexContentTTL /* do not cache exist / not exist: */, 0, 0)

		codec := snappyIterCodec{storecache.JSONIterCodec{}}
-		cfg.CacheIter("tenants-iter", metadataCache, isTenantsDir, metadataConfig.TenantsListTTL, codec)
-		cfg.CacheIter("tenant-blocks-iter", metadataCache, isTenantBlocksDir, metadataConfig.TenantBlocksListTTL, codec)
-		cfg.CacheIter("chunks-iter", metadataCache, isChunksDir, metadataConfig.ChunksListTTL, codec)
+		cfg.CacheIter("tenants-iter", metadataCache, matchers.GetTenantsIterMatcher(), metadataConfig.TenantsListTTL, codec)
+		cfg.CacheIter("tenant-blocks-iter", metadataCache, matchers.GetTenantBlocksIterMatcher(), metadataConfig.TenantBlocksListTTL, codec)
+		cfg.CacheIter("chunks-iter", metadataCache, matchers.GetChunksIterMatcher(), metadataConfig.ChunksListTTL, codec)
	}

	if !cachingConfigured {
@@ -178,6 +178,80 @@ func createCache(cacheName string, cacheBackend *CacheBackend, logger log.Logger
	}
}

+type Matchers struct {
+	matcherMap map[string]func(string) bool
+}
+
+func NewMatchers() Matchers {
+	matcherMap := make(map[string]func(string) bool)
+	matcherMap["chunks"] = isTSDBChunkFile
+	matcherMap["metafile"] = isMetaFile
+	matcherMap["block-index"] = isBlockIndexFile
+	matcherMap["bucket-index"] = isBucketIndexFiles
+	matcherMap["tenants-iter"] = isTenantsDir
+	matcherMap["tenant-blocks-iter"] = isTenantBlocksDir
+	matcherMap["chunks-iter"] = isChunksDir
+	return Matchers{
+		matcherMap: matcherMap,
+	}
+}
+
+func (m *Matchers) SetMetaFileMatcher(f func(string) bool) {
+	m.matcherMap["metafile"] = f
+}
+
+func (m *Matchers) SetChunksMatcher(f func(string) bool) {
+	m.matcherMap["chunks"] = f
+}
+
+func (m *Matchers) SetBlockIndexMatcher(f func(string) bool) {
+	m.matcherMap["block-index"] = f
+}
+
+func (m *Matchers) SetBucketIndexMatcher(f func(string) bool) {
+	m.matcherMap["bucket-index"] = f
+}
+
+func (m *Matchers) SetTenantsIterMatcher(f func(string) bool) {
+	m.matcherMap["tenants-iter"] = f
+}
+
+func (m *Matchers) SetTenantBlocksIterMatcher(f func(string) bool) {
+	m.matcherMap["tenant-blocks-iter"] = f
+}
+
+func (m *Matchers) SetChunksIterMatcher(f func(string) bool) {
+	m.matcherMap["chunks-iter"] = f
+}
+
+func (m *Matchers) GetChunksMatcher() func(string) bool {
+	return m.matcherMap["chunks"]
+}
+
+func (m *Matchers) GetMetafileMatcher() func(string) bool {
+	return m.matcherMap["metafile"]
+}
+
+func (m *Matchers) GetBlockIndexMatcher() func(string) bool {
+	return m.matcherMap["block-index"]
+}
+
+func (m *Matchers) GetBucketIndexMatcher() func(string) bool {
+	return m.matcherMap["bucket-index"]
+}
+
+func (m *Matchers) GetTenantsIterMatcher() func(string) bool {
+	return m.matcherMap["tenants-iter"]
+}
+
+func (m *Matchers) GetTenantBlocksIterMatcher() func(string) bool {
+	return m.matcherMap["tenant-blocks-iter"]
+}
+
+func (m *Matchers) GetChunksIterMatcher() func(string) bool {
+	return m.matcherMap["chunks-iter"]
+}

var chunksMatcher = regexp.MustCompile(`^.*/chunks/\d+$`)

func isTSDBChunkFile(name string) bool { return chunksMatcher.MatchString(name) }
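The new `Matchers` type is a string-keyed registry of filename predicates with one getter/setter pair per cache category; `NewMatchers()` seeds it with the previously hard-coded `is*` functions, so callers that pass it through unchanged (querier, store-gateway) keep the old behavior, while the compactor narrows a single category. A hedged usage sketch, assuming the package is imported from `github.com/cortexproject/cortex/pkg/storage/tsdb` and using made-up paths:

```go
package main

import (
	"fmt"
	"strings"

	"github.com/cortexproject/cortex/pkg/storage/tsdb"
)

func main() {
	// Defaults reproduce the pre-change matchers, so handing this
	// straight to CreateCachingBucket changes nothing for callers.
	matchers := tsdb.NewMatchers()

	// Narrow one category, mirroring the compactor's override:
	// cache meta.json only, never deletion markers.
	matchers.SetMetaFileMatcher(func(name string) bool {
		return strings.HasSuffix(name, "/meta.json")
	})

	meta := matchers.GetMetafileMatcher()
	fmt.Println(meta("user-1/block/meta.json"))          // true
	fmt.Println(meta("user-1/block/deletion-mark.json")) // false
}
```

Backing the registry with a map keyed by the cache-config name keeps the accessor boilerplate uniform; the trade-off versus a struct field per category is that lookups are stringly typed, so an unknown key silently yields a nil matcher.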
3 changes: 2 additions & 1 deletion pkg/storegateway/bucket_stores.go
@@ -88,7 +88,8 @@ var ErrTooManyInflightRequests = status.Error(codes.ResourceExhausted, "too many

// NewBucketStores makes a new BucketStores.
func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStrategy, bucketClient objstore.InstrumentedBucket, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer) (*BucketStores, error) {
-	cachingBucket, err := tsdb.CreateCachingBucket(cfg.BucketStore.ChunksCache, cfg.BucketStore.MetadataCache, bucketClient, logger, reg)
+	matchers := tsdb.NewMatchers()
+	cachingBucket, err := tsdb.CreateCachingBucket(cfg.BucketStore.ChunksCache, cfg.BucketStore.MetadataCache, matchers, bucketClient, logger, reg)
	if err != nil {
		return nil, errors.Wrapf(err, "create caching bucket")
	}
