From 53556fe81caed4ef772c0614ea7d3d39d8cfa434 Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Thu, 3 Oct 2024 07:07:35 +0900 Subject: [PATCH] Add in-memory chunk cache (#6245) Signed-off-by: SungJin1212 --- CHANGELOG.md | 3 +- docs/blocks-storage/querier.md | 9 +- docs/blocks-storage/store-gateway.md | 9 +- docs/configuration/config-file-reference.md | 9 +- integration/querier_test.go | 13 +++ pkg/storage/tsdb/caching_bucket.go | 102 ++++++++++++++++++-- pkg/storage/tsdb/caching_bucket_test.go | 51 ++++++++++ 7 files changed, 182 insertions(+), 14 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4305f4ed27..a86e11d57d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,10 +2,11 @@ ## master / unreleased -* [FEATURE] Query Frontend/Querier: Add protobuf codec `-api.querier-default-codec` and the option to choose response compression type `-querier.response-compression`. #5527 * [CHANGE] Enable Compactor and Alertmanager in target all. #6204 +* [FEATURE] Query Frontend/Querier: Add protobuf codec `-api.querier-default-codec` and the option to choose response compression type `-querier.response-compression`. #5527 * [FEATURE] Ruler: Experimental: Add `ruler.frontend-address` to allow query to query frontends instead of ingesters. #6151 * [FEATURE] Ruler: Minimize chances of missed rule group evaluations that can occur due to OOM kills, bad underlying nodes, or due to an unhealthy ruler that appears in the ring as healthy. This feature is enabled via `-ruler.enable-ha-evaluation` flag. #6129 +* [FEATURE] Store Gateway: Add an in-memory chunk cache. #6245 * [ENHANCEMENT] Ingester: Add `blocks-storage.tsdb.wal-compression-type` to support zstd wal compression type. #6232 * [ENHANCEMENT] Query Frontend: Add info field to query response. #6207 * [ENHANCEMENT] Query Frontend: Add peakSample in query stats response. 
#6188 diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md index 00605f60e4..ba6caa34e3 100644 --- a/docs/blocks-storage/querier.md +++ b/docs/blocks-storage/querier.md @@ -788,10 +788,17 @@ blocks_storage: [max_backfill_items: <int> | default = 10000] chunks_cache: - # Backend for chunks cache, if not empty. Supported values: memcached. + # Backend for chunks cache, if not empty. Supported values: memcached, + # redis, inmemory, and '' (disable). # CLI flag: -blocks-storage.bucket-store.chunks-cache.backend [backend: <string> | default = ""] + inmemory: + # Maximum size in bytes of in-memory chunk cache used to speed up chunk + # lookups (shared between all tenants). + # CLI flag: -blocks-storage.bucket-store.chunks-cache.inmemory.max-size-bytes + [max_size_bytes: <int> | default = 1073741824] + memcached: # Comma separated list of memcached addresses. Supported prefixes are: # dns+ (looked up as an A/AAAA query), dnssrv+ (looked up as a SRV diff --git a/docs/blocks-storage/store-gateway.md b/docs/blocks-storage/store-gateway.md index 3303ec9cb9..5279be4867 100644 --- a/docs/blocks-storage/store-gateway.md +++ b/docs/blocks-storage/store-gateway.md @@ -903,10 +903,17 @@ blocks_storage: [max_backfill_items: <int> | default = 10000] chunks_cache: - # Backend for chunks cache, if not empty. Supported values: memcached. + # Backend for chunks cache, if not empty. Supported values: memcached, + # redis, inmemory, and '' (disable). # CLI flag: -blocks-storage.bucket-store.chunks-cache.backend [backend: <string> | default = ""] + inmemory: + # Maximum size in bytes of in-memory chunk cache used to speed up chunk + # lookups (shared between all tenants). + # CLI flag: -blocks-storage.bucket-store.chunks-cache.inmemory.max-size-bytes + [max_size_bytes: <int> | default = 1073741824] + memcached: # Comma separated list of memcached addresses. 
Supported prefixes are: # dns+ (looked up as an A/AAAA query), dnssrv+ (looked up as a SRV diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 3146f68c3c..ceda75705b 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -1339,10 +1339,17 @@ bucket_store: [max_backfill_items: <int> | default = 10000] chunks_cache: - # Backend for chunks cache, if not empty. Supported values: memcached. + # Backend for chunks cache, if not empty. Supported values: memcached, + # redis, inmemory, and '' (disable). # CLI flag: -blocks-storage.bucket-store.chunks-cache.backend [backend: <string> | default = ""] + inmemory: + # Maximum size in bytes of in-memory chunk cache used to speed up chunk + # lookups (shared between all tenants). + # CLI flag: -blocks-storage.bucket-store.chunks-cache.inmemory.max-size-bytes + [max_size_bytes: <int> | default = 1073741824] + memcached: # Comma separated list of memcached addresses. 
Supported prefixes are: # dns+ (looked up as an A/AAAA query), dnssrv+ (looked up as a SRV query, diff --git a/integration/querier_test.go b/integration/querier_test.go index a261a6d2c6..ae442bcb16 100644 --- a/integration/querier_test.go +++ b/integration/querier_test.go @@ -97,6 +97,19 @@ func TestQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T) { chunkCacheBackend: tsdb.CacheBackendRedis, bucketIndexEnabled: true, }, + "blocks default sharding, in-memory chunk cache": { + blocksShardingStrategy: "default", + indexCacheBackend: tsdb.IndexCacheBackendRedis, + chunkCacheBackend: tsdb.CacheBackendInMemory, + bucketIndexEnabled: true, + }, + "blocks shuffle sharding, in-memory chunk cache": { + blocksShardingStrategy: "shuffle-sharding", + tenantShardSize: 1, + indexCacheBackend: tsdb.IndexCacheBackendRedis, + chunkCacheBackend: tsdb.CacheBackendInMemory, + bucketIndexEnabled: true, + }, } for testName, testCfg := range tests { diff --git a/pkg/storage/tsdb/caching_bucket.go b/pkg/storage/tsdb/caching_bucket.go index cbd9efee59..4946e3f036 100644 --- a/pkg/storage/tsdb/caching_bucket.go +++ b/pkg/storage/tsdb/caching_bucket.go @@ -8,6 +8,7 @@ import ( "strings" "time" + "github.com/alecthomas/units" "github.com/go-kit/log" "github.com/golang/snappy" "github.com/oklog/ulid" @@ -18,22 +19,28 @@ import ( "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/cache" "github.com/thanos-io/thanos/pkg/cacheutil" + "github.com/thanos-io/thanos/pkg/model" storecache "github.com/thanos-io/thanos/pkg/store/cache" ) +var ( + errUnsupportedChunkCacheBackend = errors.New("unsupported chunk cache backend") +) + const ( CacheBackendMemcached = "memcached" CacheBackendRedis = "redis" + CacheBackendInMemory = "inmemory" ) -type CacheBackend struct { +type MetadataCacheBackend struct { Backend string `yaml:"backend"` Memcached MemcachedClientConfig `yaml:"memcached"` Redis RedisClientConfig `yaml:"redis"` } // Validate the config. 
-func (cfg *CacheBackend) Validate() error { +func (cfg *MetadataCacheBackend) Validate() error { switch cfg.Backend { case CacheBackendMemcached: return cfg.Memcached.Validate() @@ -46,8 +53,29 @@ func (cfg *CacheBackend) Validate() error { return nil } +type ChunkCacheBackend struct { + Backend string `yaml:"backend"` + InMemory InMemoryChunkCacheConfig `yaml:"inmemory"` + Memcached MemcachedClientConfig `yaml:"memcached"` + Redis RedisClientConfig `yaml:"redis"` +} + +// Validate the config. +func (cfg *ChunkCacheBackend) Validate() error { + switch cfg.Backend { + case CacheBackendMemcached: + return cfg.Memcached.Validate() + case CacheBackendRedis: + return cfg.Redis.Validate() + case CacheBackendInMemory, "": + default: + return errUnsupportedChunkCacheBackend + } + return nil +} + type ChunksCacheConfig struct { - CacheBackend `yaml:",inline"` + ChunkCacheBackend `yaml:",inline"` SubrangeSize int64 `yaml:"subrange_size"` MaxGetRangeRequests int `yaml:"max_get_range_requests"` @@ -56,10 +84,11 @@ type ChunksCacheConfig struct { } func (cfg *ChunksCacheConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) { - f.StringVar(&cfg.Backend, prefix+"backend", "", fmt.Sprintf("Backend for chunks cache, if not empty. Supported values: %s.", CacheBackendMemcached)) + f.StringVar(&cfg.Backend, prefix+"backend", "", fmt.Sprintf("Backend for chunks cache, if not empty. 
Supported values: %s, %s, %s, and '' (disable).", CacheBackendMemcached, CacheBackendRedis, CacheBackendInMemory)) cfg.Memcached.RegisterFlagsWithPrefix(f, prefix+"memcached.") cfg.Redis.RegisterFlagsWithPrefix(f, prefix+"redis.") + cfg.InMemory.RegisterFlagsWithPrefix(f, prefix+"inmemory.") f.Int64Var(&cfg.SubrangeSize, prefix+"subrange-size", 16000, "Size of each subrange that bucket object is split into for better caching.") f.IntVar(&cfg.MaxGetRangeRequests, prefix+"max-get-range-requests", 3, "Maximum number of sub-GetRange requests that a single GetRange request can be split into when fetching chunks. Zero or negative value = unlimited number of sub-requests.") @@ -68,11 +97,34 @@ func (cfg *ChunksCacheConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix st } func (cfg *ChunksCacheConfig) Validate() error { - return cfg.CacheBackend.Validate() + return cfg.ChunkCacheBackend.Validate() +} + +type InMemoryChunkCacheConfig struct { + MaxSizeBytes uint64 `yaml:"max_size_bytes"` +} + +func (cfg *InMemoryChunkCacheConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) { + f.Uint64Var(&cfg.MaxSizeBytes, prefix+"max-size-bytes", uint64(1*units.Gibibyte), "Maximum size in bytes of in-memory chunk cache used to speed up chunk lookups (shared between all tenants).") +} + +func (cfg *InMemoryChunkCacheConfig) toInMemoryChunkCacheConfig() cache.InMemoryCacheConfig { + maxCacheSize := model.Bytes(cfg.MaxSizeBytes) + + // Calculate the max item size. 
+ maxItemSize := defaultMaxItemSize + if maxItemSize > maxCacheSize { + maxItemSize = maxCacheSize + } + + return cache.InMemoryCacheConfig{ + MaxSize: maxCacheSize, + MaxItemSize: maxItemSize, + } } type MetadataCacheConfig struct { - CacheBackend `yaml:",inline"` + MetadataCacheBackend `yaml:",inline"` TenantsListTTL time.Duration `yaml:"tenants_list_ttl"` TenantBlocksListTTL time.Duration `yaml:"tenant_blocks_list_ttl"` @@ -107,14 +159,14 @@ func (cfg *MetadataCacheConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix } func (cfg *MetadataCacheConfig) Validate() error { - return cfg.CacheBackend.Validate() + return cfg.MetadataCacheBackend.Validate() } func CreateCachingBucket(chunksConfig ChunksCacheConfig, metadataConfig MetadataCacheConfig, matchers Matchers, bkt objstore.InstrumentedBucket, logger log.Logger, reg prometheus.Registerer) (objstore.InstrumentedBucket, error) { cfg := cache.NewCachingBucketConfig() cachingConfigured := false - chunksCache, err := createCache("chunks-cache", &chunksConfig.CacheBackend, logger, reg) + chunksCache, err := createChunkCache("chunks-cache", &chunksConfig.ChunkCacheBackend, logger, reg) if err != nil { return nil, errors.Wrapf(err, "chunks-cache") } @@ -124,7 +176,7 @@ func CreateCachingBucket(chunksConfig ChunksCacheConfig, metadataConfig Metadata cfg.CacheGetRange("chunks", chunksCache, matchers.GetChunksMatcher(), chunksConfig.SubrangeSize, chunksConfig.AttributesTTL, chunksConfig.SubrangeTTL, chunksConfig.MaxGetRangeRequests) } - metadataCache, err := createCache("metadata-cache", &metadataConfig.CacheBackend, logger, reg) + metadataCache, err := createMetadataCache("metadata-cache", &metadataConfig.MetadataCacheBackend, logger, reg) if err != nil { return nil, errors.Wrapf(err, "metadata-cache") } @@ -152,12 +204,42 @@ func CreateCachingBucket(chunksConfig ChunksCacheConfig, metadataConfig Metadata return storecache.NewCachingBucket(bkt, cfg, logger, reg) } -func createCache(cacheName string, cacheBackend 
*CacheBackend, logger log.Logger, reg prometheus.Registerer) (cache.Cache, error) { +func createMetadataCache(cacheName string, cacheBackend *MetadataCacheBackend, logger log.Logger, reg prometheus.Registerer) (cache.Cache, error) { switch cacheBackend.Backend { case "": // No caching. return nil, nil + case CacheBackendMemcached: + var client cacheutil.MemcachedClient + client, err := cacheutil.NewMemcachedClientWithConfig(logger, cacheName, cacheBackend.Memcached.ToMemcachedClientConfig(), reg) + if err != nil { + return nil, errors.Wrapf(err, "failed to create memcached client") + } + return cache.NewMemcachedCache(cacheName, logger, client, reg), nil + case CacheBackendRedis: + redisCache, err := cacheutil.NewRedisClientWithConfig(logger, cacheName, cacheBackend.Redis.ToRedisClientConfig(), reg) + if err != nil { + return nil, errors.Wrapf(err, "failed to create redis client") + } + return cache.NewRedisCache(cacheName, logger, redisCache, reg), nil + + default: + return nil, errors.Errorf("unsupported cache type for cache %s: %s", cacheName, cacheBackend.Backend) + } +} + +func createChunkCache(cacheName string, cacheBackend *ChunkCacheBackend, logger log.Logger, reg prometheus.Registerer) (cache.Cache, error) { + switch cacheBackend.Backend { + case "": + // No caching. 
+ return nil, nil + case CacheBackendInMemory: + inMemoryCache, err := cache.NewInMemoryCacheWithConfig(cacheName, logger, reg, cacheBackend.InMemory.toInMemoryChunkCacheConfig()) + if err != nil { + return nil, errors.Wrapf(err, "failed to create in-memory chunk cache") + } + return inMemoryCache, nil case CacheBackendMemcached: var client cacheutil.MemcachedClient client, err := cacheutil.NewMemcachedClientWithConfig(logger, cacheName, cacheBackend.Memcached.ToMemcachedClientConfig(), reg) diff --git a/pkg/storage/tsdb/caching_bucket_test.go b/pkg/storage/tsdb/caching_bucket_test.go index e2fdc395a6..78ad1fb9b9 100644 --- a/pkg/storage/tsdb/caching_bucket_test.go +++ b/pkg/storage/tsdb/caching_bucket_test.go @@ -8,6 +8,57 @@ import ( "github.com/stretchr/testify/assert" ) +func Test_ChunkCacheBackendValidation(t *testing.T) { + tests := map[string]struct { + cfg ChunkCacheBackend + expectedErr error + }{ + "valid chunk cache type ('')": { + cfg: ChunkCacheBackend{ + Backend: "", + }, + expectedErr: nil, + }, + "valid chunk cache type (in-memory)": { + cfg: ChunkCacheBackend{ + Backend: CacheBackendInMemory, + }, + expectedErr: nil, + }, + "valid chunk cache type (memcached)": { + cfg: ChunkCacheBackend{ + Backend: CacheBackendMemcached, + Memcached: MemcachedClientConfig{ + Addresses: "dns+localhost:11211", + }, + }, + expectedErr: nil, + }, + "valid chunk cache type (redis)": { + cfg: ChunkCacheBackend{ + Backend: CacheBackendRedis, + Redis: RedisClientConfig{ + Addresses: "localhost:6379", + }, + }, + expectedErr: nil, + }, + "invalid chunk cache type": { + cfg: ChunkCacheBackend{ + Backend: "dummy", + }, + expectedErr: errUnsupportedChunkCacheBackend, + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + err := tc.cfg.Validate() + assert.Equal(t, tc.expectedErr, err) + }) + } +} + func TestIsTenantDir(t *testing.T) { assert.False(t, isTenantBlocksDir("")) assert.True(t, isTenantBlocksDir("test"))