From 0e95a25b4dae9e1ba0713e84978644309f424ec0 Mon Sep 17 00:00:00 2001 From: Wen Xu Date: Tue, 28 Nov 2023 21:13:16 +0000 Subject: [PATCH 1/6] use caching bucket for compactor, except cleaner Signed-off-by: Wen Xu --- docs/blocks-storage/compactor.md | 5 +++++ docs/configuration/config-file-reference.md | 5 +++++ pkg/compactor/compactor.go | 9 +++++++++ 3 files changed, 19 insertions(+) diff --git a/docs/blocks-storage/compactor.md b/docs/blocks-storage/compactor.md index 374c192bd7..bd648bdacd 100644 --- a/docs/blocks-storage/compactor.md +++ b/docs/blocks-storage/compactor.md @@ -298,4 +298,9 @@ compactor: # When enabled, index verification will ignore out of order label names. # CLI flag: -compactor.accept-malformed-index [accept_malformed_index: <boolean> | default = false] + + # When enabled, caching bucket will be used for compactor, except cleaner + # service. + # CLI flag: -compactor.caching-bucket-enabled + [caching_bucket_enabled: <boolean> | default = false] ``` diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index c409a276ec..f8b91fdf6c 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -2021,6 +2021,11 @@ sharding_ring: # When enabled, index verification will ignore out of order label names. # CLI flag: -compactor.accept-malformed-index [accept_malformed_index: <boolean> | default = false] + +# When enabled, caching bucket will be used for compactor, except cleaner +# service. 
+# CLI flag: -compactor.caching-bucket-enabled +[caching_bucket_enabled: <boolean> | default = false] ``` ### `configs_config` diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index a7e5cf429d..820ffe94e7 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -22,6 +22,7 @@ import ( "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/compact" "github.com/thanos-io/thanos/pkg/compact/downsample" + "github.com/thanos-io/thanos/pkg/extprom" "github.com/cortexproject/cortex/pkg/ring" "github.com/cortexproject/cortex/pkg/storage/bucket" @@ -209,6 +210,7 @@ type Config struct { BlockVisitMarkerFileUpdateInterval time.Duration `yaml:"block_visit_marker_file_update_interval"` AcceptMalformedIndex bool `yaml:"accept_malformed_index"` + CachingBucketEnabled bool `yaml:"caching_bucket_enabled"` } // RegisterFlags registers the Compactor flags. @@ -247,6 +249,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.BlockVisitMarkerFileUpdateInterval, "compactor.block-visit-marker-file-update-interval", 1*time.Minute, "How frequently block visit marker file should be updated duration compaction.") f.BoolVar(&cfg.AcceptMalformedIndex, "compactor.accept-malformed-index", false, "When enabled, index verification will ignore out of order label names.") + f.BoolVar(&cfg.CachingBucketEnabled, "compactor.caching-bucket-enabled", false, "When enabled, caching bucket will be used for compactor, except cleaner service.") } func (cfg *Config) Validate(limits validation.Limits) error { @@ -588,6 +591,12 @@ func (c *Compactor) starting(ctx context.Context) error { return errors.Wrap(err, "failed to start the blocks cleaner") } + if c.compactorCfg.CachingBucketEnabled { + c.bucketClient, err = cortex_tsdb.CreateCachingBucket(c.storageCfg.BucketStore.ChunksCache, c.storageCfg.BucketStore.MetadataCache, c.bucketClient, c.logger, extprom.WrapRegistererWith(prometheus.Labels{"component": "compactor"}, c.registerer)) 
+ if err != nil { + return errors.Wrap(err, "create caching bucket") + } + } return nil } From 8634a2180a55d359f6b2103edded946e2016846c Mon Sep 17 00:00:00 2001 From: Wen Xu Date: Tue, 5 Dec 2023 22:24:40 +0000 Subject: [PATCH 2/6] disable chunks cache for compactor, do not cache block deletion marker and tenant deletion marker for compactor Signed-off-by: Wen Xu --- pkg/compactor/compactor.go | 9 ++- pkg/querier/blocks_store_queryable.go | 3 +- pkg/storage/tsdb/caching_bucket.go | 94 ++++++++++++++++++++++++--- pkg/storegateway/bucket_stores.go | 3 +- 4 files changed, 96 insertions(+), 13 deletions(-) diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index 820ffe94e7..03b55f1e0d 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -592,7 +592,14 @@ func (c *Compactor) starting(ctx context.Context) error { } if c.compactorCfg.CachingBucketEnabled { - c.bucketClient, err = cortex_tsdb.CreateCachingBucket(c.storageCfg.BucketStore.ChunksCache, c.storageCfg.BucketStore.MetadataCache, c.bucketClient, c.logger, extprom.WrapRegistererWith(prometheus.Labels{"component": "compactor"}, c.registerer)) + // Turn off chunksCache since compactor is using get to download chunks, while store-gateway and querier is using getrange. 
+ c.storageCfg.BucketStore.ChunksCache.Backend = "" + matchers := cortex_tsdb.NewMatchers() + // Do not cache tenant deletion marker and block deletion marker for compactor + matchers.SetMetaFileMatcher(func(name string) bool { + return strings.HasSuffix(name, "/"+metadata.MetaFilename) + }) + c.bucketClient, err = cortex_tsdb.CreateCachingBucket(c.storageCfg.BucketStore.ChunksCache, c.storageCfg.BucketStore.MetadataCache, matchers, c.bucketClient, c.logger, extprom.WrapRegistererWith(prometheus.Labels{"component": "compactor"}, c.registerer)) if err != nil { return errors.Wrap(err, "create caching bucket") } diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go index 2ab1698e43..c4adf5bdab 100644 --- a/pkg/querier/blocks_store_queryable.go +++ b/pkg/querier/blocks_store_queryable.go @@ -183,7 +183,8 @@ func NewBlocksStoreQueryableFromConfig(querierCfg Config, gatewayCfg storegatewa } // Blocks finder doesn't use chunks, but we pass config for consistency. 
- cachingBucket, err := cortex_tsdb.CreateCachingBucket(storageCfg.BucketStore.ChunksCache, storageCfg.BucketStore.MetadataCache, bucketClient, logger, extprom.WrapRegistererWith(prometheus.Labels{"component": "querier"}, reg)) + matchers := cortex_tsdb.NewMatchers() + cachingBucket, err := cortex_tsdb.CreateCachingBucket(storageCfg.BucketStore.ChunksCache, storageCfg.BucketStore.MetadataCache, matchers, bucketClient, logger, extprom.WrapRegistererWith(prometheus.Labels{"component": "querier"}, reg)) if err != nil { return nil, errors.Wrap(err, "create caching bucket") } diff --git a/pkg/storage/tsdb/caching_bucket.go b/pkg/storage/tsdb/caching_bucket.go index 990f8119e0..4c904de6e6 100644 --- a/pkg/storage/tsdb/caching_bucket.go +++ b/pkg/storage/tsdb/caching_bucket.go @@ -110,7 +110,7 @@ func (cfg *MetadataCacheConfig) Validate() error { return cfg.CacheBackend.Validate() } -func CreateCachingBucket(chunksConfig ChunksCacheConfig, metadataConfig MetadataCacheConfig, bkt objstore.InstrumentedBucket, logger log.Logger, reg prometheus.Registerer) (objstore.InstrumentedBucket, error) { +func CreateCachingBucket(chunksConfig ChunksCacheConfig, metadataConfig MetadataCacheConfig, matchers Matchers, bkt objstore.InstrumentedBucket, logger log.Logger, reg prometheus.Registerer) (objstore.InstrumentedBucket, error) { cfg := cache.NewCachingBucketConfig() cachingConfigured := false @@ -121,7 +121,7 @@ func CreateCachingBucket(chunksConfig ChunksCacheConfig, metadataConfig Metadata if chunksCache != nil { cachingConfigured = true chunksCache = cache.NewTracingCache(chunksCache) - cfg.CacheGetRange("chunks", chunksCache, isTSDBChunkFile, chunksConfig.SubrangeSize, chunksConfig.AttributesTTL, chunksConfig.SubrangeTTL, chunksConfig.MaxGetRangeRequests) + cfg.CacheGetRange("chunks", chunksCache, matchers.GetChunksMatcher(), chunksConfig.SubrangeSize, chunksConfig.AttributesTTL, chunksConfig.SubrangeTTL, chunksConfig.MaxGetRangeRequests) } metadataCache, err := 
createCache("metadata-cache", &metadataConfig.CacheBackend, logger, reg) @@ -132,16 +132,16 @@ func CreateCachingBucket(chunksConfig ChunksCacheConfig, metadataConfig Metadata cachingConfigured = true metadataCache = cache.NewTracingCache(metadataCache) - cfg.CacheExists("metafile", metadataCache, isMetaFile, metadataConfig.MetafileExistsTTL, metadataConfig.MetafileDoesntExistTTL) - cfg.CacheGet("metafile", metadataCache, isMetaFile, metadataConfig.MetafileMaxSize, metadataConfig.MetafileContentTTL, metadataConfig.MetafileExistsTTL, metadataConfig.MetafileDoesntExistTTL) - cfg.CacheAttributes("metafile", metadataCache, isMetaFile, metadataConfig.MetafileAttributesTTL) - cfg.CacheAttributes("block-index", metadataCache, isBlockIndexFile, metadataConfig.BlockIndexAttributesTTL) - cfg.CacheGet("bucket-index", metadataCache, isBucketIndexFiles, metadataConfig.BucketIndexMaxSize, metadataConfig.BucketIndexContentTTL /* do not cache exist / not exist: */, 0, 0) + cfg.CacheExists("metafile", metadataCache, matchers.GetMetafileMatcher(), metadataConfig.MetafileExistsTTL, metadataConfig.MetafileDoesntExistTTL) + cfg.CacheGet("metafile", metadataCache, matchers.GetMetafileMatcher(), metadataConfig.MetafileMaxSize, metadataConfig.MetafileContentTTL, metadataConfig.MetafileExistsTTL, metadataConfig.MetafileDoesntExistTTL) + cfg.CacheAttributes("metafile", metadataCache, matchers.GetMetafileMatcher(), metadataConfig.MetafileAttributesTTL) + cfg.CacheAttributes("block-index", metadataCache, matchers.GetBlockIndexMatcher(), metadataConfig.BlockIndexAttributesTTL) + cfg.CacheGet("bucket-index", metadataCache, matchers.GetBucketIndexMatcher(), metadataConfig.BucketIndexMaxSize, metadataConfig.BucketIndexContentTTL /* do not cache exist / not exist: */, 0, 0) codec := snappyIterCodec{storecache.JSONIterCodec{}} - cfg.CacheIter("tenants-iter", metadataCache, isTenantsDir, metadataConfig.TenantsListTTL, codec) - cfg.CacheIter("tenant-blocks-iter", metadataCache, isTenantBlocksDir, 
metadataConfig.TenantBlocksListTTL, codec) - cfg.CacheIter("chunks-iter", metadataCache, isChunksDir, metadataConfig.ChunksListTTL, codec) + cfg.CacheIter("tenants-iter", metadataCache, matchers.GetTenantsIterMatcher(), metadataConfig.TenantsListTTL, codec) + cfg.CacheIter("tenant-blocks-iter", metadataCache, matchers.GetTenantBlocksIterMatcher(), metadataConfig.TenantBlocksListTTL, codec) + cfg.CacheIter("chunks-iter", metadataCache, matchers.GetChunksIterMatcher(), metadataConfig.ChunksListTTL, codec) } if !cachingConfigured { @@ -178,6 +178,80 @@ func createCache(cacheName string, cacheBackend *CacheBackend, logger log.Logger } } +type Matchers struct { + matcherMap map[string]func(string) bool +} + +func NewMatchers() Matchers { + matcherMap := make(map[string]func(string) bool) + matcherMap["chunks"] = isTSDBChunkFile + matcherMap["metafile"] = isMetaFile + matcherMap["block-index"] = isBlockIndexFile + matcherMap["bucket-index"] = isBucketIndexFiles + matcherMap["tenants-iter"] = isTenantsDir + matcherMap["tenant-blocks-iter"] = isTenantBlocksDir + matcherMap["chunks-iter"] = isChunksDir + return Matchers{ + matcherMap: matcherMap, + } +} + +func (m *Matchers) SetMetaFileMatcher(f func(string) bool) { + m.matcherMap["metafile"] = f +} + +func (m *Matchers) SetChunksMatcher(f func(string) bool) { + m.matcherMap["chunks"] = f +} + +func (m *Matchers) SetBlockIndexMatcher(f func(string) bool) { + m.matcherMap["block-index"] = f +} + +func (m *Matchers) SetBucketIndexMatcher(f func(string) bool) { + m.matcherMap["bucket-index"] = f +} + +func (m *Matchers) SetTenantsIterMatcher(f func(string) bool) { + m.matcherMap["tenants-iter"] = f +} + +func (m *Matchers) SetTenantBlocksIterMatcher(f func(string) bool) { + m.matcherMap["tenant-blocks-iter"] = f +} + +func (m *Matchers) SetChunksIterMatcher(f func(string) bool) { + m.matcherMap["chunks-iter"] = f +} + +func (m *Matchers) GetChunksMatcher() func(string) bool { + return m.matcherMap["chunks"] +} + +func (m 
*Matchers) GetMetafileMatcher() func(string) bool { + return m.matcherMap["metafile"] +} + +func (m *Matchers) GetBlockIndexMatcher() func(string) bool { + return m.matcherMap["block-index"] +} + +func (m *Matchers) GetBucketIndexMatcher() func(string) bool { + return m.matcherMap["bucket-index"] +} + +func (m *Matchers) GetTenantsIterMatcher() func(string) bool { + return m.matcherMap["tenants-iter"] +} + +func (m *Matchers) GetTenantBlocksIterMatcher() func(string) bool { + return m.matcherMap["tenant-blocks-iter"] +} + +func (m *Matchers) GetChunksIterMatcher() func(string) bool { + return m.matcherMap["chunks-iter"] +} + var chunksMatcher = regexp.MustCompile(`^.*/chunks/\d+$`) func isTSDBChunkFile(name string) bool { return chunksMatcher.MatchString(name) } diff --git a/pkg/storegateway/bucket_stores.go b/pkg/storegateway/bucket_stores.go index dd2b5ad633..318d4c8f39 100644 --- a/pkg/storegateway/bucket_stores.go +++ b/pkg/storegateway/bucket_stores.go @@ -88,7 +88,8 @@ var ErrTooManyInflightRequests = status.Error(codes.ResourceExhausted, "too many // NewBucketStores makes a new BucketStores. 
func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStrategy, bucketClient objstore.InstrumentedBucket, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer) (*BucketStores, error) { - cachingBucket, err := tsdb.CreateCachingBucket(cfg.BucketStore.ChunksCache, cfg.BucketStore.MetadataCache, bucketClient, logger, reg) + matchers := tsdb.NewMatchers() + cachingBucket, err := tsdb.CreateCachingBucket(cfg.BucketStore.ChunksCache, cfg.BucketStore.MetadataCache, matchers, bucketClient, logger, reg) if err != nil { return nil, errors.Wrapf(err, "create caching bucket") } From 2210bea14759a808246c32bb79736d001b606c44 Mon Sep 17 00:00:00 2001 From: Wen Xu Date: Wed, 6 Dec 2023 01:16:35 +0000 Subject: [PATCH 3/6] turn on cachingbucketenabled in compactor test and add more descript to the feature flag Signed-off-by: Wen Xu --- pkg/compactor/compactor.go | 2 +- pkg/compactor/compactor_test.go | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index 03b55f1e0d..fd8386a737 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -249,7 +249,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.BlockVisitMarkerFileUpdateInterval, "compactor.block-visit-marker-file-update-interval", 1*time.Minute, "How frequently block visit marker file should be updated duration compaction.") f.BoolVar(&cfg.AcceptMalformedIndex, "compactor.accept-malformed-index", false, "When enabled, index verification will ignore out of order label names.") - f.BoolVar(&cfg.CachingBucketEnabled, "compactor.caching-bucket-enabled", false, "When enabled, caching bucket will be used for compactor, except cleaner service.") + f.BoolVar(&cfg.CachingBucketEnabled, "compactor.caching-bucket-enabled", false, "When enabled, caching bucket will be used for compactor, except cleaner service, which serves as the source of truth for block 
status") } func (cfg *Config) Validate(limits validation.Limits) error { diff --git a/pkg/compactor/compactor_test.go b/pkg/compactor/compactor_test.go index 3a22981379..dbdc28f5ad 100644 --- a/pkg/compactor/compactor_test.go +++ b/pkg/compactor/compactor_test.go @@ -1656,6 +1656,9 @@ func prepareConfig() Config { // Set lower timeout for waiting on compactor to become ACTIVE in the ring for unit tests compactorCfg.ShardingRing.WaitActiveInstanceTimeout = 5 * time.Second + // Set CachingBucketEnabled to true + compactorCfg.CachingBucketEnabled = true + return compactorCfg } From 589b6f421107f23c62b70d22e34e9e99be3667e9 Mon Sep 17 00:00:00 2001 From: Wen Xu Date: Wed, 6 Dec 2023 21:47:01 -0500 Subject: [PATCH 4/6] remove cachingBucketEnabled flag in unit test Signed-off-by: Wen Xu --- pkg/compactor/compactor_test.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/pkg/compactor/compactor_test.go b/pkg/compactor/compactor_test.go index dbdc28f5ad..3a22981379 100644 --- a/pkg/compactor/compactor_test.go +++ b/pkg/compactor/compactor_test.go @@ -1656,9 +1656,6 @@ func prepareConfig() Config { // Set lower timeout for waiting on compactor to become ACTIVE in the ring for unit tests compactorCfg.ShardingRing.WaitActiveInstanceTimeout = 5 * time.Second - // Set CachingBucketEnabled to true - compactorCfg.CachingBucketEnabled = true - return compactorCfg } From 3c58f66cfec98124670ddc2347af671033e2c93d Mon Sep 17 00:00:00 2001 From: Wen Xu Date: Wed, 6 Dec 2023 21:51:02 -0500 Subject: [PATCH 5/6] properly disable chunkscache Signed-off-by: Wen Xu --- pkg/compactor/compactor.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index fd8386a737..77713ded32 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -592,14 +592,12 @@ func (c *Compactor) starting(ctx context.Context) error { } if c.compactorCfg.CachingBucketEnabled { - // Turn off chunksCache since compactor is using 
get to download chunks, while store-gateway and querier is using getrange. - c.storageCfg.BucketStore.ChunksCache.Backend = "" matchers := cortex_tsdb.NewMatchers() // Do not cache tenant deletion marker and block deletion marker for compactor matchers.SetMetaFileMatcher(func(name string) bool { return strings.HasSuffix(name, "/"+metadata.MetaFilename) }) - c.bucketClient, err = cortex_tsdb.CreateCachingBucket(c.storageCfg.BucketStore.ChunksCache, c.storageCfg.BucketStore.MetadataCache, matchers, c.bucketClient, c.logger, extprom.WrapRegistererWith(prometheus.Labels{"component": "compactor"}, c.registerer)) + c.bucketClient, err = cortex_tsdb.CreateCachingBucket(cortex_tsdb.ChunksCacheConfig{}, c.storageCfg.BucketStore.MetadataCache, matchers, c.bucketClient, c.logger, extprom.WrapRegistererWith(prometheus.Labels{"component": "compactor"}, c.registerer)) if err != nil { return errors.Wrap(err, "create caching bucket") } From 769a5c061b58987796f1854f1eab28b13ca1bce9 Mon Sep 17 00:00:00 2001 From: Wen Xu Date: Thu, 7 Dec 2023 02:56:21 +0000 Subject: [PATCH 6/6] fix lint Signed-off-by: Wen Xu --- docs/blocks-storage/compactor.md | 2 +- docs/configuration/config-file-reference.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/blocks-storage/compactor.md b/docs/blocks-storage/compactor.md index bd648bdacd..5fb44208ec 100644 --- a/docs/blocks-storage/compactor.md +++ b/docs/blocks-storage/compactor.md @@ -300,7 +300,7 @@ compactor: [accept_malformed_index: | default = false] # When enabled, caching bucket will be used for compactor, except cleaner - # service. 
+ # service, which serves as the source of truth for block status # CLI flag: -compactor.caching-bucket-enabled [caching_bucket_enabled: <boolean> | default = false] ``` diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index f8b91fdf6c..8282c15c9a 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -2023,7 +2023,7 @@ sharding_ring: [accept_malformed_index: <boolean> | default = false] # When enabled, caching bucket will be used for compactor, except cleaner -# service. +# service, which serves as the source of truth for block status # CLI flag: -compactor.caching-bucket-enabled [caching_bucket_enabled: <boolean> | default = false] ```