diff --git a/docs/blocks-storage/compactor.md b/docs/blocks-storage/compactor.md index 374c192bd7..5fb44208ec 100644 --- a/docs/blocks-storage/compactor.md +++ b/docs/blocks-storage/compactor.md @@ -298,4 +298,9 @@ compactor: # When enabled, index verification will ignore out of order label names. # CLI flag: -compactor.accept-malformed-index [accept_malformed_index: | default = false] + + # When enabled, caching bucket will be used for compactor, except cleaner + # service, which serves as the source of truth for block status + # CLI flag: -compactor.caching-bucket-enabled + [caching_bucket_enabled: | default = false] ``` diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index c409a276ec..8282c15c9a 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -2021,6 +2021,11 @@ sharding_ring: # When enabled, index verification will ignore out of order label names. # CLI flag: -compactor.accept-malformed-index [accept_malformed_index: | default = false] + +# When enabled, caching bucket will be used for compactor, except cleaner +# service, which serves as the source of truth for block status +# CLI flag: -compactor.caching-bucket-enabled +[caching_bucket_enabled: | default = false] ``` ### `configs_config` diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index a7e5cf429d..77713ded32 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -22,6 +22,7 @@ import ( "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/compact" "github.com/thanos-io/thanos/pkg/compact/downsample" + "github.com/thanos-io/thanos/pkg/extprom" "github.com/cortexproject/cortex/pkg/ring" "github.com/cortexproject/cortex/pkg/storage/bucket" @@ -209,6 +210,7 @@ type Config struct { BlockVisitMarkerFileUpdateInterval time.Duration `yaml:"block_visit_marker_file_update_interval"` AcceptMalformedIndex bool `yaml:"accept_malformed_index"` + CachingBucketEnabled bool `yaml:"caching_bucket_enabled"` } // RegisterFlags registers the Compactor flags. @@ -247,6 +249,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.BlockVisitMarkerFileUpdateInterval, "compactor.block-visit-marker-file-update-interval", 1*time.Minute, "How frequently block visit marker file should be updated duration compaction.") f.BoolVar(&cfg.AcceptMalformedIndex, "compactor.accept-malformed-index", false, "When enabled, index verification will ignore out of order label names.") + f.BoolVar(&cfg.CachingBucketEnabled, "compactor.caching-bucket-enabled", false, "When enabled, caching bucket will be used for compactor, except cleaner service, which serves as the source of truth for block status") } func (cfg *Config) Validate(limits validation.Limits) error { @@ -588,6 +591,17 @@ func (c *Compactor) starting(ctx context.Context) error { return errors.Wrap(err, "failed to start the blocks cleaner") } + if c.compactorCfg.CachingBucketEnabled { + matchers := cortex_tsdb.NewMatchers() + // Do not cache tenant deletion marker and block deletion marker for compactor + matchers.SetMetaFileMatcher(func(name string) bool { + return strings.HasSuffix(name, "/"+metadata.MetaFilename) + }) + c.bucketClient, err = cortex_tsdb.CreateCachingBucket(cortex_tsdb.ChunksCacheConfig{}, c.storageCfg.BucketStore.MetadataCache, matchers, c.bucketClient, c.logger, extprom.WrapRegistererWith(prometheus.Labels{"component": "compactor"}, c.registerer)) + if err != nil { + return errors.Wrap(err, "create caching bucket") + } + } return nil } diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go index 2ab1698e43..c4adf5bdab 100644 --- a/pkg/querier/blocks_store_queryable.go +++ b/pkg/querier/blocks_store_queryable.go @@ -183,7 +183,8 @@ func NewBlocksStoreQueryableFromConfig(querierCfg Config, gatewayCfg storegatewa } // Blocks finder doesn't use chunks, but we pass config for consistency. - cachingBucket, err := cortex_tsdb.CreateCachingBucket(storageCfg.BucketStore.ChunksCache, storageCfg.BucketStore.MetadataCache, bucketClient, logger, extprom.WrapRegistererWith(prometheus.Labels{"component": "querier"}, reg)) + matchers := cortex_tsdb.NewMatchers() + cachingBucket, err := cortex_tsdb.CreateCachingBucket(storageCfg.BucketStore.ChunksCache, storageCfg.BucketStore.MetadataCache, matchers, bucketClient, logger, extprom.WrapRegistererWith(prometheus.Labels{"component": "querier"}, reg)) if err != nil { return nil, errors.Wrap(err, "create caching bucket") } diff --git a/pkg/storage/tsdb/caching_bucket.go b/pkg/storage/tsdb/caching_bucket.go index 990f8119e0..4c904de6e6 100644 --- a/pkg/storage/tsdb/caching_bucket.go +++ b/pkg/storage/tsdb/caching_bucket.go @@ -110,7 +110,7 @@ func (cfg *MetadataCacheConfig) Validate() error { return cfg.CacheBackend.Validate() } -func CreateCachingBucket(chunksConfig ChunksCacheConfig, metadataConfig MetadataCacheConfig, bkt objstore.InstrumentedBucket, logger log.Logger, reg prometheus.Registerer) (objstore.InstrumentedBucket, error) { +func CreateCachingBucket(chunksConfig ChunksCacheConfig, metadataConfig MetadataCacheConfig, matchers Matchers, bkt objstore.InstrumentedBucket, logger log.Logger, reg prometheus.Registerer) (objstore.InstrumentedBucket, error) { cfg := cache.NewCachingBucketConfig() cachingConfigured := false @@ -121,7 +121,7 @@ func CreateCachingBucket(chunksConfig ChunksCacheConfig, metadataConfig Metadata if chunksCache != nil { cachingConfigured = true chunksCache = cache.NewTracingCache(chunksCache) - cfg.CacheGetRange("chunks", chunksCache, isTSDBChunkFile, chunksConfig.SubrangeSize, chunksConfig.AttributesTTL, chunksConfig.SubrangeTTL, chunksConfig.MaxGetRangeRequests) + cfg.CacheGetRange("chunks", chunksCache, matchers.GetChunksMatcher(), chunksConfig.SubrangeSize, chunksConfig.AttributesTTL, chunksConfig.SubrangeTTL, chunksConfig.MaxGetRangeRequests) } metadataCache, err := createCache("metadata-cache", &metadataConfig.CacheBackend, logger, reg) @@ -132,16 +132,16 @@ func CreateCachingBucket(chunksConfig ChunksCacheConfig, metadataConfig Metadata cachingConfigured = true metadataCache = cache.NewTracingCache(metadataCache) - cfg.CacheExists("metafile", metadataCache, isMetaFile, metadataConfig.MetafileExistsTTL, metadataConfig.MetafileDoesntExistTTL) - cfg.CacheGet("metafile", metadataCache, isMetaFile, metadataConfig.MetafileMaxSize, metadataConfig.MetafileContentTTL, metadataConfig.MetafileExistsTTL, metadataConfig.MetafileDoesntExistTTL) - cfg.CacheAttributes("metafile", metadataCache, isMetaFile, metadataConfig.MetafileAttributesTTL) - cfg.CacheAttributes("block-index", metadataCache, isBlockIndexFile, metadataConfig.BlockIndexAttributesTTL) - cfg.CacheGet("bucket-index", metadataCache, isBucketIndexFiles, metadataConfig.BucketIndexMaxSize, metadataConfig.BucketIndexContentTTL /* do not cache exist / not exist: */, 0, 0) + cfg.CacheExists("metafile", metadataCache, matchers.GetMetafileMatcher(), metadataConfig.MetafileExistsTTL, metadataConfig.MetafileDoesntExistTTL) + cfg.CacheGet("metafile", metadataCache, matchers.GetMetafileMatcher(), metadataConfig.MetafileMaxSize, metadataConfig.MetafileContentTTL, metadataConfig.MetafileExistsTTL, metadataConfig.MetafileDoesntExistTTL) + cfg.CacheAttributes("metafile", metadataCache, matchers.GetMetafileMatcher(), metadataConfig.MetafileAttributesTTL) + cfg.CacheAttributes("block-index", metadataCache, matchers.GetBlockIndexMatcher(), metadataConfig.BlockIndexAttributesTTL) + cfg.CacheGet("bucket-index", metadataCache, matchers.GetBucketIndexMatcher(), metadataConfig.BucketIndexMaxSize, metadataConfig.BucketIndexContentTTL /* do not cache exist / not exist: */, 0, 0) codec := snappyIterCodec{storecache.JSONIterCodec{}} - cfg.CacheIter("tenants-iter", metadataCache, isTenantsDir, metadataConfig.TenantsListTTL, codec) - cfg.CacheIter("tenant-blocks-iter", metadataCache, isTenantBlocksDir, metadataConfig.TenantBlocksListTTL, codec) - cfg.CacheIter("chunks-iter", metadataCache, isChunksDir, metadataConfig.ChunksListTTL, codec) + cfg.CacheIter("tenants-iter", metadataCache, matchers.GetTenantsIterMatcher(), metadataConfig.TenantsListTTL, codec) + cfg.CacheIter("tenant-blocks-iter", metadataCache, matchers.GetTenantBlocksIterMatcher(), metadataConfig.TenantBlocksListTTL, codec) + cfg.CacheIter("chunks-iter", metadataCache, matchers.GetChunksIterMatcher(), metadataConfig.ChunksListTTL, codec) } if !cachingConfigured { @@ -178,6 +178,80 @@ func createCache(cacheName string, cacheBackend *CacheBackend, logger log.Logger } } +type Matchers struct { + matcherMap map[string]func(string) bool +} + +func NewMatchers() Matchers { + matcherMap := make(map[string]func(string) bool) + matcherMap["chunks"] = isTSDBChunkFile + matcherMap["metafile"] = isMetaFile + matcherMap["block-index"] = isBlockIndexFile + matcherMap["bucket-index"] = isBucketIndexFiles + matcherMap["tenants-iter"] = isTenantsDir + matcherMap["tenant-blocks-iter"] = isTenantBlocksDir + matcherMap["chunks-iter"] = isChunksDir + return Matchers{ + matcherMap: matcherMap, + } +} + +func (m *Matchers) SetMetaFileMatcher(f func(string) bool) { + m.matcherMap["metafile"] = f +} + +func (m *Matchers) SetChunksMatcher(f func(string) bool) { + m.matcherMap["chunks"] = f +} + +func (m *Matchers) SetBlockIndexMatcher(f func(string) bool) { + m.matcherMap["block-index"] = f +} + +func (m *Matchers) SetBucketIndexMatcher(f func(string) bool) { + m.matcherMap["bucket-index"] = f +} + +func (m *Matchers) SetTenantsIterMatcher(f func(string) bool) { + m.matcherMap["tenants-iter"] = f +} + +func (m *Matchers) SetTenantBlocksIterMatcher(f func(string) bool) { + m.matcherMap["tenant-blocks-iter"] = f +} + +func (m *Matchers) SetChunksIterMatcher(f func(string) bool) { + m.matcherMap["chunks-iter"] = f +} + +func (m *Matchers) GetChunksMatcher() func(string) bool { + return m.matcherMap["chunks"] +} + +func (m *Matchers) GetMetafileMatcher() func(string) bool { + return m.matcherMap["metafile"] +} + +func (m *Matchers) GetBlockIndexMatcher() func(string) bool { + return m.matcherMap["block-index"] +} + +func (m *Matchers) GetBucketIndexMatcher() func(string) bool { + return m.matcherMap["bucket-index"] +} + +func (m *Matchers) GetTenantsIterMatcher() func(string) bool { + return m.matcherMap["tenants-iter"] +} + +func (m *Matchers) GetTenantBlocksIterMatcher() func(string) bool { + return m.matcherMap["tenant-blocks-iter"] +} + +func (m *Matchers) GetChunksIterMatcher() func(string) bool { + return m.matcherMap["chunks-iter"] +} + var chunksMatcher = regexp.MustCompile(`^.*/chunks/\d+$`) func isTSDBChunkFile(name string) bool { return chunksMatcher.MatchString(name) } diff --git a/pkg/storegateway/bucket_stores.go b/pkg/storegateway/bucket_stores.go index dd2b5ad633..318d4c8f39 100644 --- a/pkg/storegateway/bucket_stores.go +++ b/pkg/storegateway/bucket_stores.go @@ -88,7 +88,8 @@ var ErrTooManyInflightRequests = status.Error(codes.ResourceExhausted, "too many // NewBucketStores makes a new BucketStores. func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStrategy, bucketClient objstore.InstrumentedBucket, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer) (*BucketStores, error) { - cachingBucket, err := tsdb.CreateCachingBucket(cfg.BucketStore.ChunksCache, cfg.BucketStore.MetadataCache, bucketClient, logger, reg) + matchers := tsdb.NewMatchers() + cachingBucket, err := tsdb.CreateCachingBucket(cfg.BucketStore.ChunksCache, cfg.BucketStore.MetadataCache, matchers, bucketClient, logger, reg) if err != nil { return nil, errors.Wrapf(err, "create caching bucket") }