Skip to content

Commit

Permalink
Add multi-level chunk cache (cortexproject#6249)
Browse files Browse the repository at this point in the history
Signed-off-by: SungJin1212 <[email protected]>
  • Loading branch information
SungJin1212 authored Oct 9, 2024
1 parent e6e9fea commit d08f93b
Show file tree
Hide file tree
Showing 9 changed files with 580 additions and 44 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
* [FEATURE] Ruler: Experimental: Add `ruler.frontend-address` to allow query to query frontends instead of ingesters. #6151
* [FEATURE] Ruler: Minimize chances of missed rule group evaluations that can occur due to OOM kills, bad underlying nodes, or due to an unhealthy ruler that appears in the ring as healthy. This feature is enabled via `-ruler.enable-ha-evaluation` flag. #6129
* [FEATURE] Store Gateway: Add an in-memory chunk cache. #6245
* [FEATURE] Chunk Cache: Support multi level cache and add metrics. #6249
* [ENHANCEMENT] Ingester: Add `blocks-storage.tsdb.wal-compression-type` to support zstd wal compression type. #6232
* [ENHANCEMENT] Query Frontend: Add info field to query response. #6207
* [ENHANCEMENT] Query Frontend: Add peakSample in query stats response. #6188
Expand Down
21 changes: 19 additions & 2 deletions docs/blocks-storage/querier.md
Original file line number Diff line number Diff line change
Expand Up @@ -806,8 +806,10 @@ blocks_storage:
[max_backfill_items: <int> | default = 10000]

chunks_cache:
# Backend for chunks cache, if not empty. Supported values: memcached,
# redis, inmemory, and '' (disable).
# The chunks cache backend type. Single or Multiple cache backend can be
# provided. Supported values in single cache: memcached, redis, inmemory,
# and '' (disable). Supported values in multi level cache: a
# comma-separated list of (inmemory, memcached, redis)
# CLI flag: -blocks-storage.bucket-store.chunks-cache.backend
[backend: <string> | default = ""]

Expand Down Expand Up @@ -1018,6 +1020,21 @@ blocks_storage:
# CLI flag: -blocks-storage.bucket-store.chunks-cache.redis.set-async.circuit-breaker.failure-percent
[failure_percent: <float> | default = 0.05]

multilevel:
# The maximum number of concurrent asynchronous operations can occur
# when backfilling cache items.
# CLI flag: -blocks-storage.bucket-store.chunks-cache.multilevel.max-async-concurrency
[max_async_concurrency: <int> | default = 3]

# The maximum number of enqueued asynchronous operations allowed when
# backfilling cache items.
# CLI flag: -blocks-storage.bucket-store.chunks-cache.multilevel.max-async-buffer-size
[max_async_buffer_size: <int> | default = 10000]

# The maximum number of items to backfill per asynchronous operation.
# CLI flag: -blocks-storage.bucket-store.chunks-cache.multilevel.max-backfill-items
[max_backfill_items: <int> | default = 10000]

# Size of each subrange that bucket object is split into for better
# caching.
# CLI flag: -blocks-storage.bucket-store.chunks-cache.subrange-size
Expand Down
21 changes: 19 additions & 2 deletions docs/blocks-storage/store-gateway.md
Original file line number Diff line number Diff line change
Expand Up @@ -903,8 +903,10 @@ blocks_storage:
[max_backfill_items: <int> | default = 10000]

chunks_cache:
# Backend for chunks cache, if not empty. Supported values: memcached,
# redis, inmemory, and '' (disable).
# The chunks cache backend type. Single or Multiple cache backend can be
# provided. Supported values in single cache: memcached, redis, inmemory,
# and '' (disable). Supported values in multi level cache: a
# comma-separated list of (inmemory, memcached, redis)
# CLI flag: -blocks-storage.bucket-store.chunks-cache.backend
[backend: <string> | default = ""]

Expand Down Expand Up @@ -1115,6 +1117,21 @@ blocks_storage:
# CLI flag: -blocks-storage.bucket-store.chunks-cache.redis.set-async.circuit-breaker.failure-percent
[failure_percent: <float> | default = 0.05]

multilevel:
# The maximum number of concurrent asynchronous operations can occur
# when backfilling cache items.
# CLI flag: -blocks-storage.bucket-store.chunks-cache.multilevel.max-async-concurrency
[max_async_concurrency: <int> | default = 3]

# The maximum number of enqueued asynchronous operations allowed when
# backfilling cache items.
# CLI flag: -blocks-storage.bucket-store.chunks-cache.multilevel.max-async-buffer-size
[max_async_buffer_size: <int> | default = 10000]

# The maximum number of items to backfill per asynchronous operation.
# CLI flag: -blocks-storage.bucket-store.chunks-cache.multilevel.max-backfill-items
[max_backfill_items: <int> | default = 10000]

# Size of each subrange that bucket object is split into for better
# caching.
# CLI flag: -blocks-storage.bucket-store.chunks-cache.subrange-size
Expand Down
21 changes: 19 additions & 2 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -1339,8 +1339,10 @@ bucket_store:
[max_backfill_items: <int> | default = 10000]

chunks_cache:
# Backend for chunks cache, if not empty. Supported values: memcached,
# redis, inmemory, and '' (disable).
# The chunks cache backend type. Single or Multiple cache backend can be
# provided. Supported values in single cache: memcached, redis, inmemory,
# and '' (disable). Supported values in multi level cache: a comma-separated
# list of (inmemory, memcached, redis)
# CLI flag: -blocks-storage.bucket-store.chunks-cache.backend
[backend: <string> | default = ""]

Expand Down Expand Up @@ -1549,6 +1551,21 @@ bucket_store:
# CLI flag: -blocks-storage.bucket-store.chunks-cache.redis.set-async.circuit-breaker.failure-percent
[failure_percent: <float> | default = 0.05]

multilevel:
# The maximum number of concurrent asynchronous operations can occur when
# backfilling cache items.
# CLI flag: -blocks-storage.bucket-store.chunks-cache.multilevel.max-async-concurrency
[max_async_concurrency: <int> | default = 3]

# The maximum number of enqueued asynchronous operations allowed when
# backfilling cache items.
# CLI flag: -blocks-storage.bucket-store.chunks-cache.multilevel.max-async-buffer-size
[max_async_buffer_size: <int> | default = 10000]

# The maximum number of items to backfill per asynchronous operation.
# CLI flag: -blocks-storage.bucket-store.chunks-cache.multilevel.max-backfill-items
[max_backfill_items: <int> | default = 10000]

# Size of each subrange that bucket object is split into for better caching.
# CLI flag: -blocks-storage.bucket-store.chunks-cache.subrange-size
[subrange_size: <int> | default = 16000]
Expand Down
30 changes: 28 additions & 2 deletions integration/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,12 @@ func TestQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T) {
chunkCacheBackend: tsdb.CacheBackendRedis,
bucketIndexEnabled: true,
},
"blocks sharding disabled, in-memory chunk cache": {
blocksShardingStrategy: "",
indexCacheBackend: tsdb.IndexCacheBackendRedis,
chunkCacheBackend: tsdb.CacheBackendInMemory,
bucketIndexEnabled: true,
},
"blocks default sharding, in-memory chunk cache": {
blocksShardingStrategy: "default",
indexCacheBackend: tsdb.IndexCacheBackendRedis,
Expand All @@ -110,6 +116,25 @@ func TestQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T) {
chunkCacheBackend: tsdb.CacheBackendInMemory,
bucketIndexEnabled: true,
},
"block sharding disabled, multi-level chunk cache": {
blocksShardingStrategy: "",
indexCacheBackend: tsdb.IndexCacheBackendRedis,
chunkCacheBackend: fmt.Sprintf("%v,%v,%v", tsdb.CacheBackendInMemory, tsdb.CacheBackendMemcached, tsdb.CacheBackendRedis),
bucketIndexEnabled: true,
},
"block default sharding, multi-level chunk cache": {
blocksShardingStrategy: "default",
indexCacheBackend: tsdb.IndexCacheBackendRedis,
chunkCacheBackend: fmt.Sprintf("%v,%v,%v", tsdb.CacheBackendInMemory, tsdb.CacheBackendMemcached, tsdb.CacheBackendRedis),
bucketIndexEnabled: true,
},
"block shuffle sharding, multi-level chunk cache": {
blocksShardingStrategy: "shuffle-sharding",
tenantShardSize: 1,
indexCacheBackend: tsdb.IndexCacheBackendRedis,
chunkCacheBackend: fmt.Sprintf("%v,%v,%v", tsdb.CacheBackendInMemory, tsdb.CacheBackendMemcached, tsdb.CacheBackendRedis),
bucketIndexEnabled: true,
},
}

for testName, testCfg := range tests {
Expand Down Expand Up @@ -154,9 +179,10 @@ func TestQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T) {
if strings.Contains(testCfg.indexCacheBackend, tsdb.IndexCacheBackendRedis) {
flags["-blocks-storage.bucket-store.index-cache.redis.addresses"] = redis.NetworkEndpoint(e2ecache.RedisPort)
}
if testCfg.chunkCacheBackend == tsdb.CacheBackendMemcached {
if strings.Contains(testCfg.chunkCacheBackend, tsdb.CacheBackendMemcached) {
flags["-blocks-storage.bucket-store.chunks-cache.memcached.addresses"] = "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort)
} else if testCfg.chunkCacheBackend == tsdb.CacheBackendRedis {
}
if strings.Contains(testCfg.chunkCacheBackend, tsdb.CacheBackendRedis) {
flags["-blocks-storage.bucket-store.chunks-cache.redis.addresses"] = redis.NetworkEndpoint(e2ecache.RedisPort)
}

Expand Down
118 changes: 82 additions & 36 deletions pkg/storage/tsdb/caching_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,16 @@ import (
"github.com/thanos-io/thanos/pkg/cacheutil"
"github.com/thanos-io/thanos/pkg/model"
storecache "github.com/thanos-io/thanos/pkg/store/cache"

"github.com/cortexproject/cortex/pkg/util"
)

var (
supportedChunkCacheBackends = []string{CacheBackendInMemory, CacheBackendMemcached, CacheBackendRedis}
supportedMetadataCacheBackends = []string{CacheBackendMemcached, CacheBackendRedis}

errUnsupportedChunkCacheBackend = errors.New("unsupported chunk cache backend")
errDuplicatedChunkCacheBackend = errors.New("duplicated chunk cache backend")
)

const (
Expand Down Expand Up @@ -56,23 +60,52 @@ func (cfg *MetadataCacheBackend) Validate() error {
}

type ChunkCacheBackend struct {
Backend string `yaml:"backend"`
InMemory InMemoryChunkCacheConfig `yaml:"inmemory"`
Memcached MemcachedClientConfig `yaml:"memcached"`
Redis RedisClientConfig `yaml:"redis"`
Backend string `yaml:"backend"`
InMemory InMemoryChunkCacheConfig `yaml:"inmemory"`
Memcached MemcachedClientConfig `yaml:"memcached"`
Redis RedisClientConfig `yaml:"redis"`
MultiLevel MultiLevelChunkCacheConfig `yaml:"multilevel"`
}

// Validate the config.
func (cfg *ChunkCacheBackend) Validate() error {
switch cfg.Backend {
case CacheBackendMemcached:
return cfg.Memcached.Validate()
case CacheBackendRedis:
return cfg.Redis.Validate()
case CacheBackendInMemory, "":
default:
return errUnsupportedChunkCacheBackend
if cfg.Backend == "" {
return nil
}

splitBackends := strings.Split(cfg.Backend, ",")
configuredBackends := map[string]struct{}{}

if len(splitBackends) > 1 {
if err := cfg.MultiLevel.Validate(); err != nil {
return err
}
}

for _, backend := range splitBackends {
if !util.StringsContain(supportedChunkCacheBackends, backend) {
return errUnsupportedChunkCacheBackend
}

if _, ok := configuredBackends[backend]; ok {
return errDuplicatedChunkCacheBackend
}

switch backend {
case CacheBackendMemcached:
if err := cfg.Memcached.Validate(); err != nil {
return err
}
case CacheBackendRedis:
if err := cfg.Redis.Validate(); err != nil {
return err
}
case CacheBackendInMemory:
}

configuredBackends[backend] = struct{}{}
}

return nil
}

Expand All @@ -86,16 +119,22 @@ type ChunksCacheConfig struct {
}

func (cfg *ChunksCacheConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) {
f.StringVar(&cfg.Backend, prefix+"backend", "", fmt.Sprintf("Backend for chunks cache, if not empty. Supported values: %s, %s, %s, and '' (disable).", CacheBackendMemcached, CacheBackendRedis, CacheBackendInMemory))
f.StringVar(&cfg.Backend, prefix+"backend", "", fmt.Sprintf("The chunks cache backend type. Single or Multiple cache backend can be provided. "+
"Supported values in single cache: %s, %s, %s, and '' (disable). "+
"Supported values in multi level cache: a comma-separated list of (%s)", CacheBackendMemcached, CacheBackendRedis, CacheBackendInMemory, strings.Join(supportedChunkCacheBackends, ", ")))

cfg.Memcached.RegisterFlagsWithPrefix(f, prefix+"memcached.")
cfg.Redis.RegisterFlagsWithPrefix(f, prefix+"redis.")
cfg.InMemory.RegisterFlagsWithPrefix(f, prefix+"inmemory.")
cfg.MultiLevel.RegisterFlagsWithPrefix(f, prefix+"multilevel.")

f.Int64Var(&cfg.SubrangeSize, prefix+"subrange-size", 16000, "Size of each subrange that bucket object is split into for better caching.")
f.IntVar(&cfg.MaxGetRangeRequests, prefix+"max-get-range-requests", 3, "Maximum number of sub-GetRange requests that a single GetRange request can be split into when fetching chunks. Zero or negative value = unlimited number of sub-requests.")
f.DurationVar(&cfg.AttributesTTL, prefix+"attributes-ttl", 168*time.Hour, "TTL for caching object attributes for chunks.")
f.DurationVar(&cfg.SubrangeTTL, prefix+"subrange-ttl", 24*time.Hour, "TTL for caching individual chunks subranges.")

// In the multi level chunk cache, backfill TTL follows subrange TTL
cfg.ChunkCacheBackend.MultiLevel.BackFillTTL = cfg.SubrangeTTL
}

func (cfg *ChunksCacheConfig) Validate() error {
Expand Down Expand Up @@ -232,34 +271,41 @@ func createMetadataCache(cacheName string, cacheBackend *MetadataCacheBackend, l
}

func createChunkCache(cacheName string, cacheBackend *ChunkCacheBackend, logger log.Logger, reg prometheus.Registerer) (cache.Cache, error) {
switch cacheBackend.Backend {
case "":
if cacheBackend.Backend == "" {
// No caching.
return nil, nil
case CacheBackendInMemory:
inMemoryCache, err := cache.NewInMemoryCacheWithConfig(cacheName, logger, reg, cacheBackend.InMemory.toInMemoryChunkCacheConfig())
if err != nil {
return nil, errors.Wrapf(err, "failed to create in-memory chunk cache")
}
return inMemoryCache, nil
case CacheBackendMemcached:
var client cacheutil.MemcachedClient
client, err := cacheutil.NewMemcachedClientWithConfig(logger, cacheName, cacheBackend.Memcached.ToMemcachedClientConfig(), reg)
if err != nil {
return nil, errors.Wrapf(err, "failed to create memcached client")
}
return cache.NewMemcachedCache(cacheName, logger, client, reg), nil
}

case CacheBackendRedis:
redisCache, err := cacheutil.NewRedisClientWithConfig(logger, cacheName, cacheBackend.Redis.ToRedisClientConfig(), reg)
if err != nil {
return nil, errors.Wrapf(err, "failed to create redis client")
splitBackends := strings.Split(cacheBackend.Backend, ",")
var (
caches []cache.Cache
)

for _, backend := range splitBackends {
switch backend {
case CacheBackendInMemory:
inMemoryCache, err := cache.NewInMemoryCacheWithConfig(cacheName, logger, reg, cacheBackend.InMemory.toInMemoryChunkCacheConfig())
if err != nil {
return nil, errors.Wrapf(err, "failed to create in-memory chunk cache")
}
caches = append(caches, inMemoryCache)
case CacheBackendMemcached:
var client cacheutil.MemcachedClient
client, err := cacheutil.NewMemcachedClientWithConfig(logger, cacheName, cacheBackend.Memcached.ToMemcachedClientConfig(), reg)
if err != nil {
return nil, errors.Wrapf(err, "failed to create memcached client")
}
caches = append(caches, cache.NewMemcachedCache(cacheName, logger, client, reg))
case CacheBackendRedis:
redisCache, err := cacheutil.NewRedisClientWithConfig(logger, cacheName, cacheBackend.Redis.ToRedisClientConfig(), reg)
if err != nil {
return nil, errors.Wrapf(err, "failed to create redis client")
}
caches = append(caches, cache.NewRedisCache(cacheName, logger, redisCache, reg))
}
return cache.NewRedisCache(cacheName, logger, redisCache, reg), nil

default:
return nil, errors.Errorf("unsupported cache type for cache %s: %s", cacheName, cacheBackend.Backend)
}

return newMultiLevelChunkCache(cacheName, cacheBackend.MultiLevel, reg, caches...), nil
}

type Matchers struct {
Expand Down
Loading

0 comments on commit d08f93b

Please sign in to comment.