diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md index b7283df8aa..2e9ec58de4 100644 --- a/docs/blocks-storage/querier.md +++ b/docs/blocks-storage/querier.md @@ -536,6 +536,11 @@ blocks_storage: # CLI flag: -blocks-storage.bucket-store.index-cache.inmemory.max-size-bytes [max_size_bytes: | default = 1073741824] + # Selectively cache index item types. Supported values are Postings, + # ExpandedPostings and Series + # CLI flag: -blocks-storage.bucket-store.index-cache.inmemory.enabled-items + [enabled_items: | default = []] + memcached: # Comma separated list of memcached addresses. Supported prefixes are: # dns+ (looked up as an A/AAAA query), dnssrv+ (looked up as a SRV @@ -583,6 +588,11 @@ blocks_storage: # CLI flag: -blocks-storage.bucket-store.index-cache.memcached.auto-discovery [auto_discovery: | default = false] + # Selectively cache index item types. Supported values are Postings, + # ExpandedPostings and Series + # CLI flag: -blocks-storage.bucket-store.index-cache.memcached.enabled-items + [enabled_items: | default = []] + redis: # Comma separated list of redis addresses. Supported prefixes are: dns+ # (looked up as an A/AAAA query), dnssrv+ (looked up as a SRV query, @@ -679,6 +689,11 @@ blocks_storage: # CLI flag: -blocks-storage.bucket-store.index-cache.redis.cache-size [cache_size: | default = 0] + # Selectively cache index item types. Supported values are Postings, + # ExpandedPostings and Series + # CLI flag: -blocks-storage.bucket-store.index-cache.redis.enabled-items + [enabled_items: | default = []] + chunks_cache: # Backend for chunks cache, if not empty. Supported values: memcached. # CLI flag: -blocks-storage.bucket-store.chunks-cache.backend diff --git a/docs/blocks-storage/store-gateway.md b/docs/blocks-storage/store-gateway.md index 82c30a121e..7162d34b93 100644 --- a/docs/blocks-storage/store-gateway.md +++ b/docs/blocks-storage/store-gateway.md @@ -643,6 +643,11 @@ blocks_storage: # CLI flag: -blocks-storage.bucket-store.index-cache.inmemory.max-size-bytes [max_size_bytes: | default = 1073741824] + # Selectively cache index item types. Supported values are Postings, + # ExpandedPostings and Series + # CLI flag: -blocks-storage.bucket-store.index-cache.inmemory.enabled-items + [enabled_items: | default = []] + memcached: # Comma separated list of memcached addresses. Supported prefixes are: # dns+ (looked up as an A/AAAA query), dnssrv+ (looked up as a SRV @@ -690,6 +695,11 @@ blocks_storage: # CLI flag: -blocks-storage.bucket-store.index-cache.memcached.auto-discovery [auto_discovery: | default = false] + # Selectively cache index item types. Supported values are Postings, + # ExpandedPostings and Series + # CLI flag: -blocks-storage.bucket-store.index-cache.memcached.enabled-items + [enabled_items: | default = []] + redis: # Comma separated list of redis addresses. Supported prefixes are: dns+ # (looked up as an A/AAAA query), dnssrv+ (looked up as a SRV query, @@ -786,6 +796,11 @@ blocks_storage: # CLI flag: -blocks-storage.bucket-store.index-cache.redis.cache-size [cache_size: | default = 0] + # Selectively cache index item types. Supported values are Postings, + # ExpandedPostings and Series + # CLI flag: -blocks-storage.bucket-store.index-cache.redis.enabled-items + [enabled_items: | default = []] + chunks_cache: # Backend for chunks cache, if not empty. Supported values: memcached. 
# CLI flag: -blocks-storage.bucket-store.chunks-cache.backend diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 71fce349af..5103df9a38 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -1083,6 +1083,11 @@ bucket_store: # CLI flag: -blocks-storage.bucket-store.index-cache.inmemory.max-size-bytes [max_size_bytes: | default = 1073741824] + # Selectively cache index item types. Supported values are Postings, + # ExpandedPostings and Series + # CLI flag: -blocks-storage.bucket-store.index-cache.inmemory.enabled-items + [enabled_items: | default = []] + memcached: # Comma separated list of memcached addresses. Supported prefixes are: # dns+ (looked up as an A/AAAA query), dnssrv+ (looked up as a SRV query, @@ -1130,6 +1135,11 @@ bucket_store: # CLI flag: -blocks-storage.bucket-store.index-cache.memcached.auto-discovery [auto_discovery: | default = false] + # Selectively cache index item types. Supported values are Postings, + # ExpandedPostings and Series + # CLI flag: -blocks-storage.bucket-store.index-cache.memcached.enabled-items + [enabled_items: | default = []] + redis: # Comma separated list of redis addresses. Supported prefixes are: dns+ # (looked up as an A/AAAA query), dnssrv+ (looked up as a SRV query, @@ -1226,6 +1236,11 @@ bucket_store: # CLI flag: -blocks-storage.bucket-store.index-cache.redis.cache-size [cache_size: | default = 0] + # Selectively cache index item types. Supported values are Postings, + # ExpandedPostings and Series + # CLI flag: -blocks-storage.bucket-store.index-cache.redis.enabled-items + [enabled_items: | default = []] + chunks_cache: # Backend for chunks cache, if not empty. Supported values: memcached. 
# CLI flag: -blocks-storage.bucket-store.chunks-cache.backend diff --git a/go.mod b/go.mod index 9716fb4769..0136803094 100644 --- a/go.mod +++ b/go.mod @@ -53,7 +53,7 @@ require ( github.com/stretchr/testify v1.8.4 github.com/thanos-io/objstore v0.0.0-20230921130928-63a603e651ed github.com/thanos-io/promql-engine v0.0.0-20230821193351-e1ae4275b96e - github.com/thanos-io/thanos v0.32.4-0.20231001083734-531cdb1e8ec3 + github.com/thanos-io/thanos v0.32.5-0.20231006043659-79bbf34b4275 github.com/uber/jaeger-client-go v2.30.0+incompatible github.com/weaveworks/common v0.0.0-20221201103051-7c2720a9024d go.etcd.io/etcd/api/v3 v3.5.9 diff --git a/go.sum b/go.sum index d901a4ad3a..caf94052e2 100644 --- a/go.sum +++ b/go.sum @@ -1212,8 +1212,8 @@ github.com/thanos-io/objstore v0.0.0-20230921130928-63a603e651ed h1:iWQdY3S6DpWj github.com/thanos-io/objstore v0.0.0-20230921130928-63a603e651ed/go.mod h1:oJ82xgcBDzGJrEgUsjlTj6n01+ZWUMMUR8BlZzX5xDE= github.com/thanos-io/promql-engine v0.0.0-20230821193351-e1ae4275b96e h1:kwsFCU8eSkZehbrAN3nXPw5RdMHi/Bok/y8l2C4M+gk= github.com/thanos-io/promql-engine v0.0.0-20230821193351-e1ae4275b96e/go.mod h1:+T/ZYNCGybT6eTsGGvVtGb63nT1cvUmH6MjqRrcQoKw= -github.com/thanos-io/thanos v0.32.4-0.20231001083734-531cdb1e8ec3 h1:ekD3P1XF0Hlg/u7rSNqdyLhwYE4W4RGfkMDudtepRL8= -github.com/thanos-io/thanos v0.32.4-0.20231001083734-531cdb1e8ec3/go.mod h1:Px5Boq60s+2WwR+V4v4oxgmxfw9WHrwMwjRou6pkUNw= +github.com/thanos-io/thanos v0.32.5-0.20231006043659-79bbf34b4275 h1:y2YPqM1XiBw7EhLg45F6A1g8bgt4yYxkaRAeQaNLWYk= +github.com/thanos-io/thanos v0.32.5-0.20231006043659-79bbf34b4275/go.mod h1:HwiHn7u6GeES403BTACOYib/JKAJknf8dByU/uJiEr0= github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab h1:7ZR3hmisBWw77ZpO1/o86g+JV3VKlk3d48jopJxzTjU= github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab/go.mod h1:eheTFp954zcWZXCU8d0AT76ftsQOTo4DTqkN/h3k1MY= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= diff --git a/pkg/storage/tsdb/index_cache.go b/pkg/storage/tsdb/index_cache.go index f9a51580b3..796f5a291a 100644 --- a/pkg/storage/tsdb/index_cache.go +++ b/pkg/storage/tsdb/index_cache.go @@ -14,6 +14,7 @@ import ( storecache "github.com/thanos-io/thanos/pkg/store/cache" "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/flagext" ) const ( @@ -41,10 +42,10 @@ var ( ) type IndexCacheConfig struct { - Backend string `yaml:"backend"` - InMemory InMemoryIndexCacheConfig `yaml:"inmemory"` - Memcached MemcachedClientConfig `yaml:"memcached"` - Redis RedisClientConfig `yaml:"redis"` + Backend string `yaml:"backend"` + InMemory InMemoryIndexCacheConfig `yaml:"inmemory"` + Memcached MemcachedIndexCacheConfig `yaml:"memcached"` + Redis RedisIndexCacheConfig `yaml:"redis"` } func (cfg *IndexCacheConfig) RegisterFlags(f *flag.FlagSet) { @@ -85,6 +86,10 @@ func (cfg *IndexCacheConfig) Validate() error { if err := cfg.Redis.Validate(); err != nil { return err } + } else { + if err := cfg.InMemory.Validate(); err != nil { + return err + } } configuredBackends[backend] = struct{}{} @@ -94,17 +99,63 @@ func (cfg *IndexCacheConfig) Validate() error { } type InMemoryIndexCacheConfig struct { - MaxSizeBytes uint64 `yaml:"max_size_bytes"` + MaxSizeBytes uint64 `yaml:"max_size_bytes"` + EnabledItems []string `yaml:"enabled_items"` +} + +func (cfg *InMemoryIndexCacheConfig) Validate() error { + if err := storecache.ValidateEnabledItems(cfg.EnabledItems); err != nil { + return err + } + return nil } func (cfg 
*InMemoryIndexCacheConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) { f.Uint64Var(&cfg.MaxSizeBytes, prefix+"max-size-bytes", uint64(1*units.Gibibyte), "Maximum size in bytes of in-memory index cache used to speed up blocks index lookups (shared between all tenants).") + f.Var((*flagext.StringSlice)(&cfg.EnabledItems), prefix+"enabled-items", "Selectively cache index item types. Supported values are Postings, ExpandedPostings and Series") +} + +type MemcachedIndexCacheConfig struct { + ClientConfig MemcachedClientConfig `yaml:",inline"` + EnabledItems []string `yaml:"enabled_items"` +} + +func (cfg *MemcachedIndexCacheConfig) Validate() error { + if err := cfg.ClientConfig.Validate(); err != nil { + return err + } + return storecache.ValidateEnabledItems(cfg.EnabledItems) +} + +func (cfg *MemcachedIndexCacheConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) { + cfg.ClientConfig.RegisterFlagsWithPrefix(f, prefix) + f.Var((*flagext.StringSlice)(&cfg.EnabledItems), prefix+"enabled-items", "Selectively cache index item types. Supported values are Postings, ExpandedPostings and Series") +} + +type RedisIndexCacheConfig struct { + ClientConfig RedisClientConfig `yaml:",inline"` + EnabledItems []string `yaml:"enabled_items"` +} + +func (cfg *RedisIndexCacheConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) { + cfg.ClientConfig.RegisterFlagsWithPrefix(f, prefix) + f.Var((*flagext.StringSlice)(&cfg.EnabledItems), prefix+"enabled-items", "Selectively cache index item types. Supported values are Postings, ExpandedPostings and Series") +} + +func (cfg *RedisIndexCacheConfig) Validate() error { + if err := cfg.ClientConfig.Validate(); err != nil { + return err + } + return storecache.ValidateEnabledItems(cfg.EnabledItems) } // NewIndexCache creates a new index cache based on the input configuration. 
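Note: the sketch below is an illustration rather than part of this change. It shows how the new per-backend enabled_items option fits together, using only identifiers that appear in the diff (IndexCacheConfig, MemcachedIndexCacheConfig.ClientConfig, EnabledItems, Validate, NewIndexCache); the memcached address and the particular item types chosen are placeholder assumptions.

package main

import (
	"flag"
	"fmt"

	"github.com/go-kit/log"
	"github.com/prometheus/client_golang/prometheus"

	"github.com/cortexproject/cortex/pkg/storage/tsdb"
)

func main() {
	var cfg tsdb.IndexCacheConfig
	// Registering the flags also applies the library defaults to cfg.
	cfg.RegisterFlags(flag.NewFlagSet("example", flag.PanicOnError))

	// Two cache levels: keep only expanded postings in memory,
	// postings and series in memcached (address is a placeholder).
	cfg.Backend = "inmemory,memcached"
	cfg.InMemory.EnabledItems = []string{"ExpandedPostings"}
	cfg.Memcached.ClientConfig.Addresses = "dns+memcached.example.svc:11211"
	cfg.Memcached.EnabledItems = []string{"Postings", "Series"}

	// Validate rejects unsupported item types (e.g. "foo"), as exercised in index_cache_test.go.
	if err := cfg.Validate(); err != nil {
		panic(err)
	}

	cache, err := tsdb.NewIndexCache(cfg, log.NewNopLogger(), prometheus.NewRegistry())
	if err != nil {
		panic(err)
	}
	fmt.Printf("multi-level index cache ready: %T\n", cache)
}

With a non-empty EnabledItems, NewIndexCache wraps that level in storecache.NewFilteredIndexCache, so stores and fetches for the other item types bypass that backend.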
func NewIndexCache(cfg IndexCacheConfig, logger log.Logger, registerer prometheus.Registerer) (storecache.IndexCache, error) { splitBackends := strings.Split(cfg.Backend, ",") - var caches []storecache.IndexCache + var ( + caches []storecache.IndexCache + enabledItems []string + ) for i, backend := range splitBackends { iReg := registerer @@ -121,8 +172,9 @@ func NewIndexCache(cfg IndexCacheConfig, logger log.Logger, registerer prometheu return c, err } caches = append(caches, c) + enabledItems = cfg.InMemory.EnabledItems case IndexCacheBackendMemcached: - c, err := newMemcachedIndexCacheClient(cfg.Memcached, logger, registerer) + c, err := newMemcachedIndexCacheClient(cfg.Memcached.ClientConfig, logger, registerer) if err != nil { return nil, err } @@ -131,8 +183,9 @@ func NewIndexCache(cfg IndexCacheConfig, logger log.Logger, registerer prometheu return nil, err } caches = append(caches, cache) + enabledItems = cfg.Memcached.EnabledItems case IndexCacheBackendRedis: - c, err := newRedisIndexCacheClient(cfg.Redis, logger, iReg) + c, err := newRedisIndexCacheClient(cfg.Redis.ClientConfig, logger, iReg) if err != nil { return nil, err } @@ -141,9 +194,15 @@ func NewIndexCache(cfg IndexCacheConfig, logger log.Logger, registerer prometheu return nil, err } caches = append(caches, cache) + enabledItems = cfg.Redis.EnabledItems default: return nil, errUnsupportedIndexCacheBackend } + if len(enabledItems) > 0 { + latestCache := caches[len(caches)-1] + cache := storecache.NewFilteredIndexCache(latestCache, enabledItems) + caches[len(caches)-1] = cache + } } return newMultiLevelCache(caches...), nil diff --git a/pkg/storage/tsdb/index_cache_test.go b/pkg/storage/tsdb/index_cache_test.go index c5617f04d9..b0112c9c18 100644 --- a/pkg/storage/tsdb/index_cache_test.go +++ b/pkg/storage/tsdb/index_cache_test.go @@ -1,6 +1,7 @@ package tsdb import ( + "fmt" "testing" "github.com/stretchr/testify/assert" @@ -35,11 +36,46 @@ func TestIndexCacheConfig_Validate(t *testing.T) { "one memcached address should pass": { cfg: IndexCacheConfig{ Backend: "memcached", - Memcached: MemcachedClientConfig{ - Addresses: "dns+localhost:11211", + Memcached: MemcachedIndexCacheConfig{ + ClientConfig: MemcachedClientConfig{ + Addresses: "dns+localhost:11211", + }, }, }, }, + "invalid enabled items memcached": { + cfg: IndexCacheConfig{ + Backend: "memcached", + Memcached: MemcachedIndexCacheConfig{ + ClientConfig: MemcachedClientConfig{ + Addresses: "dns+localhost:11211", + }, + EnabledItems: []string{"foo", "bar"}, + }, + }, + expected: fmt.Errorf("unsupported item type foo"), + }, + "invalid enabled items inmemory": { + cfg: IndexCacheConfig{ + Backend: "inmemory", + InMemory: InMemoryIndexCacheConfig{ + EnabledItems: []string{"foo", "bar"}, + }, + }, + expected: fmt.Errorf("unsupported item type foo"), + }, + "invalid enabled items redis": { + cfg: IndexCacheConfig{ + Backend: "redis", + Redis: RedisIndexCacheConfig{ + ClientConfig: RedisClientConfig{ + Addresses: "test", + }, + EnabledItems: []string{"foo", "bar"}, + }, + }, + expected: fmt.Errorf("unsupported item type foo"), + }, } for testName, testData := range tests { diff --git a/pkg/storage/tsdb/multilevel_cache.go b/pkg/storage/tsdb/multilevel_cache.go index 5283eedd66..032ae745a8 100644 --- a/pkg/storage/tsdb/multilevel_cache.go +++ b/pkg/storage/tsdb/multilevel_cache.go @@ -14,26 +14,26 @@ type multiLevelCache struct { caches []storecache.IndexCache } -func (m *multiLevelCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte) { +func (m 
*multiLevelCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, tenant string) { wg := sync.WaitGroup{} wg.Add(len(m.caches)) for _, c := range m.caches { cache := c go func() { defer wg.Done() - cache.StorePostings(blockID, l, v) + cache.StorePostings(blockID, l, v, tenant) }() } wg.Wait() } -func (m *multiLevelCache) FetchMultiPostings(ctx context.Context, blockID ulid.ULID, keys []labels.Label) (hits map[labels.Label][]byte, misses []labels.Label) { +func (m *multiLevelCache) FetchMultiPostings(ctx context.Context, blockID ulid.ULID, keys []labels.Label, tenant string) (hits map[labels.Label][]byte, misses []labels.Label) { misses = keys hits = map[labels.Label][]byte{} backfillMap := map[storecache.IndexCache][]map[labels.Label][]byte{} for i, c := range m.caches { backfillMap[c] = []map[labels.Label][]byte{} - h, mi := c.FetchMultiPostings(ctx, blockID, misses) + h, mi := c.FetchMultiPostings(ctx, blockID, misses, tenant) misses = mi for label, bytes := range h { @@ -53,7 +53,7 @@ func (m *multiLevelCache) FetchMultiPostings(ctx context.Context, blockID ulid.U for cache, hit := range backfillMap { for _, values := range hit { for l, b := range values { - cache.StorePostings(blockID, l, b) + cache.StorePostings(blockID, l, b, tenant) } } } @@ -62,24 +62,24 @@ func (m *multiLevelCache) FetchMultiPostings(ctx context.Context, blockID ulid.U return hits, misses } -func (m *multiLevelCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte) { +func (m *multiLevelCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte, tenant string) { wg := sync.WaitGroup{} wg.Add(len(m.caches)) for _, c := range m.caches { cache := c go func() { defer wg.Done() - cache.StoreExpandedPostings(blockID, matchers, v) + cache.StoreExpandedPostings(blockID, matchers, v, tenant) }() } wg.Wait() } -func (m *multiLevelCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers []*labels.Matcher) ([]byte, bool) { +func (m *multiLevelCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers []*labels.Matcher, tenant string) ([]byte, bool) { for i, c := range m.caches { - if d, h := c.FetchExpandedPostings(ctx, blockID, matchers); h { + if d, h := c.FetchExpandedPostings(ctx, blockID, matchers, tenant); h { if i > 0 { - m.caches[i-1].StoreExpandedPostings(blockID, matchers, d) + m.caches[i-1].StoreExpandedPostings(blockID, matchers, d, tenant) } return d, h } @@ -88,27 +88,27 @@ func (m *multiLevelCache) FetchExpandedPostings(ctx context.Context, blockID uli return []byte{}, false } -func (m *multiLevelCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte) { +func (m *multiLevelCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte, tenant string) { wg := sync.WaitGroup{} wg.Add(len(m.caches)) for _, c := range m.caches { cache := c go func() { defer wg.Done() - cache.StoreSeries(blockID, id, v) + cache.StoreSeries(blockID, id, v, tenant) }() } wg.Wait() } -func (m *multiLevelCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULID, ids []storage.SeriesRef) (hits map[storage.SeriesRef][]byte, misses []storage.SeriesRef) { +func (m *multiLevelCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULID, ids []storage.SeriesRef, tenant string) (hits map[storage.SeriesRef][]byte, misses []storage.SeriesRef) { misses = ids hits = map[storage.SeriesRef][]byte{} backfillMap := map[storecache.IndexCache][]map[storage.SeriesRef][]byte{} for i, c := range m.caches { backfillMap[c] = 
[]map[storage.SeriesRef][]byte{} - h, miss := c.FetchMultiSeries(ctx, blockID, misses) + h, miss := c.FetchMultiSeries(ctx, blockID, misses, tenant) misses = miss for label, bytes := range h { @@ -128,7 +128,7 @@ func (m *multiLevelCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULI for cache, hit := range backfillMap { for _, values := range hit { for m, b := range values { - cache.StoreSeries(blockID, m, b) + cache.StoreSeries(blockID, m, b, tenant) } } } diff --git a/pkg/storage/tsdb/multilevel_cache_test.go b/pkg/storage/tsdb/multilevel_cache_test.go index 749da9da5b..93956d8063 100644 --- a/pkg/storage/tsdb/multilevel_cache_test.go +++ b/pkg/storage/tsdb/multilevel_cache_test.go @@ -37,8 +37,10 @@ func Test_MultiIndexCacheInstantiation(t *testing.T) { "instantiate multiples backends - inmemory/redis": { cfg: IndexCacheConfig{ Backend: "inmemory,redis", - Redis: RedisClientConfig{ - Addresses: s.Addr(), + Redis: RedisIndexCacheConfig{ + ClientConfig: RedisClientConfig{ + Addresses: s.Addr(), + }, }, }, expectedType: newMultiLevelCache(), @@ -46,9 +48,11 @@ func Test_MultiIndexCacheInstantiation(t *testing.T) { "instantiate multiples backends - inmemory/memcached": { cfg: IndexCacheConfig{ Backend: "inmemory,memcached", - Memcached: MemcachedClientConfig{ - Addresses: s.Addr(), - MaxAsyncConcurrency: 1000, + Memcached: MemcachedIndexCacheConfig{ + ClientConfig: MemcachedClientConfig{ + Addresses: s.Addr(), + MaxAsyncConcurrency: 1000, + }, }, }, expectedType: newMultiLevelCache(), @@ -112,7 +116,7 @@ func Test_MultiLevelCache(t *testing.T) { "StorePostings": {{bID, l1, v}}, }, call: func(cache storecache.IndexCache) { - cache.StorePostings(bID, l1, v) + cache.StorePostings(bID, l1, v, "") }, }, "[StoreSeries] Should store on all caches": { @@ -123,7 +127,7 @@ func Test_MultiLevelCache(t *testing.T) { "StoreSeries": {{bID, storage.SeriesRef(1), v}}, }, call: func(cache storecache.IndexCache) { - cache.StoreSeries(bID, 1, v) + cache.StoreSeries(bID, 1, v, "") }, }, "[StoreExpandedPostings] Should store on all caches": { @@ -134,7 +138,7 @@ func Test_MultiLevelCache(t *testing.T) { "StoreExpandedPostings": {{bID, []*labels.Matcher{matcher}, v}}, }, call: func(cache storecache.IndexCache) { - cache.StoreExpandedPostings(bID, []*labels.Matcher{matcher}, v) + cache.StoreExpandedPostings(bID, []*labels.Matcher{matcher}, v, "") }, }, "[FetchMultiPostings] Should fallback when all misses": { @@ -145,7 +149,7 @@ func Test_MultiLevelCache(t *testing.T) { "FetchMultiPostings": {{bID, []labels.Label{l1, l2}}}, }, call: func(cache storecache.IndexCache) { - cache.FetchMultiPostings(ctx, bID, []labels.Label{l1, l2}) + cache.FetchMultiPostings(ctx, bID, []labels.Label{l1, l2}, "") }, }, "[FetchMultiPostings] should fallback and backfill only the missing keys on l1": { @@ -163,7 +167,7 @@ func Test_MultiLevelCache(t *testing.T) { "FetchMultiPostings": {map[labels.Label][]byte{l2: v}, []labels.Label{}}, }, call: func(cache storecache.IndexCache) { - cache.FetchMultiPostings(ctx, bID, []labels.Label{l1, l2}) + cache.FetchMultiPostings(ctx, bID, []labels.Label{l1, l2}, "") }, }, "[FetchMultiPostings] should not fallback when all hit on l1": { @@ -175,7 +179,7 @@ func Test_MultiLevelCache(t *testing.T) { "FetchMultiPostings": {map[labels.Label][]byte{l1: make([]byte, 1), l2: make([]byte, 1)}, []labels.Label{}}, }, call: func(cache storecache.IndexCache) { - cache.FetchMultiPostings(ctx, bID, []labels.Label{l1, l2}) + cache.FetchMultiPostings(ctx, bID, []labels.Label{l1, l2}, "") }, }, 
"[FetchMultiSeries] Should fallback when all misses": { @@ -186,7 +190,7 @@ func Test_MultiLevelCache(t *testing.T) { "FetchMultiSeries": {{bID, []storage.SeriesRef{1, 2}}}, }, call: func(cache storecache.IndexCache) { - cache.FetchMultiSeries(ctx, bID, []storage.SeriesRef{1, 2}) + cache.FetchMultiSeries(ctx, bID, []storage.SeriesRef{1, 2}, "") }, }, "[FetchMultiSeries] should fallback and backfill only the missing keys on l1": { @@ -204,7 +208,7 @@ func Test_MultiLevelCache(t *testing.T) { "FetchMultiSeries": {map[storage.SeriesRef][]byte{2: v}, []storage.SeriesRef{2}}, }, call: func(cache storecache.IndexCache) { - cache.FetchMultiSeries(ctx, bID, []storage.SeriesRef{1, 2}) + cache.FetchMultiSeries(ctx, bID, []storage.SeriesRef{1, 2}, "") }, }, "[FetchMultiSeries] should not fallback when all hit on l1": { @@ -216,7 +220,7 @@ func Test_MultiLevelCache(t *testing.T) { "FetchMultiSeries": {map[storage.SeriesRef][]byte{1: make([]byte, 1), 2: make([]byte, 1)}, []storage.SeriesRef{}}, }, call: func(cache storecache.IndexCache) { - cache.FetchMultiSeries(ctx, bID, []storage.SeriesRef{1, 2}) + cache.FetchMultiSeries(ctx, bID, []storage.SeriesRef{1, 2}, "") }, }, "[FetchExpandedPostings] Should fallback and backfill when miss": { @@ -231,7 +235,7 @@ func Test_MultiLevelCache(t *testing.T) { "FetchExpandedPostings": {v, true}, }, call: func(cache storecache.IndexCache) { - cache.FetchExpandedPostings(ctx, bID, []*labels.Matcher{matcher}) + cache.FetchExpandedPostings(ctx, bID, []*labels.Matcher{matcher}, "") }, }, "[FetchExpandedPostings] should not fallback when all hit on l1": { @@ -243,7 +247,7 @@ func Test_MultiLevelCache(t *testing.T) { "FetchExpandedPostings": {[]byte{}, true}, }, call: func(cache storecache.IndexCache) { - cache.FetchExpandedPostings(ctx, bID, []*labels.Matcher{matcher}) + cache.FetchExpandedPostings(ctx, bID, []*labels.Matcher{matcher}, "") }, }, } @@ -272,11 +276,11 @@ type mockIndexCache struct { mockedCalls map[string][]interface{} } -func (m *mockIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte) { +func (m *mockIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, tenant string) { m.calls["StorePostings"] = append(m.calls["StorePostings"], []interface{}{blockID, l, v}) } -func (m *mockIndexCache) FetchMultiPostings(_ context.Context, blockID ulid.ULID, keys []labels.Label) (hits map[labels.Label][]byte, misses []labels.Label) { +func (m *mockIndexCache) FetchMultiPostings(_ context.Context, blockID ulid.ULID, keys []labels.Label, tenant string) (hits map[labels.Label][]byte, misses []labels.Label) { m.calls["FetchMultiPostings"] = append(m.calls["FetchMultiPostings"], []interface{}{blockID, keys}) if m, ok := m.mockedCalls["FetchMultiPostings"]; ok { return m[0].(map[labels.Label][]byte), m[1].([]labels.Label) @@ -285,11 +289,11 @@ func (m *mockIndexCache) FetchMultiPostings(_ context.Context, blockID ulid.ULID return map[labels.Label][]byte{}, keys } -func (m *mockIndexCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte) { +func (m *mockIndexCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte, tenant string) { m.calls["StoreExpandedPostings"] = append(m.calls["StoreExpandedPostings"], []interface{}{blockID, matchers, v}) } -func (m *mockIndexCache) FetchExpandedPostings(_ context.Context, blockID ulid.ULID, matchers []*labels.Matcher) ([]byte, bool) { +func (m *mockIndexCache) FetchExpandedPostings(_ context.Context, blockID ulid.ULID, matchers []*labels.Matcher, 
tenant string) ([]byte, bool) { m.calls["FetchExpandedPostings"] = append(m.calls["FetchExpandedPostings"], []interface{}{blockID, matchers}) if m, ok := m.mockedCalls["FetchExpandedPostings"]; ok { return m[0].([]byte), m[1].(bool) @@ -298,11 +302,11 @@ func (m *mockIndexCache) FetchExpandedPostings(_ context.Context, blockID ulid.U return []byte{}, false } -func (m *mockIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte) { +func (m *mockIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte, tenant string) { m.calls["StoreSeries"] = append(m.calls["StoreSeries"], []interface{}{blockID, id, v}) } -func (m *mockIndexCache) FetchMultiSeries(_ context.Context, blockID ulid.ULID, ids []storage.SeriesRef) (hits map[storage.SeriesRef][]byte, misses []storage.SeriesRef) { +func (m *mockIndexCache) FetchMultiSeries(_ context.Context, blockID ulid.ULID, ids []storage.SeriesRef, tenant string) (hits map[storage.SeriesRef][]byte, misses []storage.SeriesRef) { m.calls["FetchMultiSeries"] = append(m.calls["FetchMultiSeries"], []interface{}{blockID, ids}) if m, ok := m.mockedCalls["FetchMultiSeries"]; ok { return m[0].(map[storage.SeriesRef][]byte), m[1].([]storage.SeriesRef) diff --git a/vendor/github.com/thanos-io/thanos/pkg/block/indexheader/binary_reader.go b/vendor/github.com/thanos-io/thanos/pkg/block/indexheader/binary_reader.go index 7dbed1bec2..02c062337e 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/block/indexheader/binary_reader.go +++ b/vendor/github.com/thanos-io/thanos/pkg/block/indexheader/binary_reader.go @@ -74,7 +74,12 @@ type BinaryTOC struct { // WriteBinary build index header from the pieces of index in object storage, and cached in file if necessary. func WriteBinary(ctx context.Context, bkt objstore.BucketReader, id ulid.ULID, filename string) ([]byte, error) { - ir, indexVersion, err := newChunkedIndexReader(ctx, bkt, id) + var tmpDir = "" + if filename != "" { + tmpDir = filepath.Dir(filename) + } + parallelBucket := WrapWithParallel(bkt, tmpDir) + ir, indexVersion, err := newChunkedIndexReader(ctx, parallelBucket, id) if err != nil { return nil, errors.Wrap(err, "new index reader") } diff --git a/vendor/github.com/thanos-io/thanos/pkg/block/indexheader/parallel_bucket.go b/vendor/github.com/thanos-io/thanos/pkg/block/indexheader/parallel_bucket.go new file mode 100644 index 0000000000..6e52022d72 --- /dev/null +++ b/vendor/github.com/thanos-io/thanos/pkg/block/indexheader/parallel_bucket.go @@ -0,0 +1,231 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package indexheader + +import ( + "bufio" + "bytes" + "context" + "fmt" + "io" + "os" + "path/filepath" + + "github.com/google/uuid" + "github.com/pkg/errors" + "github.com/prometheus/prometheus/tsdb/fileutil" + "github.com/thanos-io/objstore" + "github.com/thanos-io/thanos/pkg/runutil" + "golang.org/x/sync/errgroup" +) + +// partitionSize is used for splitting range reads. +const partitionSize = 16 * 1024 * 1024 // 16 MiB + +type parallelBucketReader struct { + objstore.BucketReader + tmpDir string + partitionSize int64 +} + +func WrapWithParallel(b objstore.BucketReader, tmpDir string) objstore.BucketReader { + return ¶llelBucketReader{ + BucketReader: b, + tmpDir: tmpDir, + partitionSize: partitionSize, + } +} + +// GetRange reads the range in parallel. 
+func (b *parallelBucketReader) GetRange(ctx context.Context, name string, off int64, length int64) (io.ReadCloser, error) { + partFilePrefix := uuid.New().String() + g, gctx := errgroup.WithContext(ctx) + + numParts := length / b.partitionSize + if length%b.partitionSize > 0 { + // A partial partition is remaining + numParts += 1 + } + + parts := make([]Part, 0, numParts) + + partId := 0 + for o := off; o < off+length; o += b.partitionSize { + l := b.partitionSize + if o+l > off+length { + // Partial partition + l = length - (int64(partId) * b.partitionSize) + } + + partOff := o + partLength := l + part, err := b.createPart(partFilePrefix, partId, int(partLength)) + if err != nil { + return nil, err + } + parts = append(parts, part) + + g.Go(func() error { + rc, err := b.BucketReader.GetRange(gctx, name, partOff, partLength) + defer runutil.CloseWithErrCapture(&err, rc, "close object") + if err != nil { + return errors.Wrap(err, fmt.Sprintf("get range part %v", partId)) + } + if _, err := io.Copy(part, rc); err != nil { + return errors.Wrap(err, fmt.Sprintf("get range part %v", partId)) + } + return part.Flush() + }) + partId += 1 + } + + if err := g.Wait(); err != nil { + return nil, err + } + return newPartMerger(parts), nil +} + +func (b *parallelBucketReader) createPart(partFilePrefix string, partId int, size int) (Part, error) { + if b.tmpDir == "" { + // Parts stored in memory + return newPartBuffer(size), nil + } + + partName := fmt.Sprintf("%s.part-%d", partFilePrefix, partId) + filename := filepath.Join(b.tmpDir, partName) + return newPartFile(filename) +} + +type partMerger struct { + closers []io.Closer + multiReader io.Reader +} + +func newPartMerger(parts []Part) *partMerger { + readers := make([]io.Reader, 0, len(parts)) + closers := make([]io.Closer, 0, len(parts)) + for _, p := range parts { + readers = append(readers, p.(io.Reader)) + closers = append(closers, p.(io.Closer)) + } + return &partMerger{ + closers: closers, + multiReader: io.MultiReader(readers...), + } +} + +func (m *partMerger) Read(b []byte) (n int, err error) { + n, err = m.multiReader.Read(b) + return +} + +func (m *partMerger) Close() (err error) { + var firstErr error = nil + for _, c := range m.closers { + if err := c.Close(); err != nil { + if firstErr == nil { + firstErr = err + } + } + } + return firstErr +} + +type Part interface { + Read(buf []byte) (int, error) + Write(buf []byte) (int, error) + Flush() error +} + +// partFile stores parts in temporary files. 
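Note: the offset/length bookkeeping in GetRange above is easiest to see with concrete numbers. The self-contained sketch below reproduces just that partitioning loop (same 16 MiB partitionSize; the starting offset and total length are made-up values) and is not part of this change.

package main

import "fmt"

const partitionSize = int64(16 * 1024 * 1024) // 16 MiB, as in parallel_bucket.go

// partitions mirrors the loop in parallelBucketReader.GetRange: full-size parts,
// with a shorter final part when length is not a multiple of partitionSize.
func partitions(off, length int64) [][2]int64 {
	var parts [][2]int64 // {offset, length} pairs
	partID := int64(0)
	for o := off; o < off+length; o += partitionSize {
		l := partitionSize
		if o+l > off+length {
			// Last, possibly partial, partition.
			l = length - partID*partitionSize
		}
		parts = append(parts, [2]int64{o, l})
		partID++
	}
	return parts
}

func main() {
	// 40 MiB starting at offset 100 -> three parts: 16 MiB, 16 MiB, 8 MiB.
	for i, p := range partitions(100, 40*1024*1024) {
		fmt.Printf("part %d: off=%d len=%d\n", i, p[0], p[1])
	}
}

Each part is fetched concurrently via errgroup, written to a part file (or in-memory buffer when tmpDir is empty), and the parts are stitched back together in order by partMerger's io.MultiReader.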
+type partFile struct { + file *os.File + fileWriter *bufio.Writer + fileReader *bufio.Reader +} + +func newPartFile(filename string) (*partFile, error) { + dir := filepath.Dir(filename) + df, err := fileutil.OpenDir(dir) + if os.IsNotExist(err) { + if err := os.MkdirAll(dir, os.ModePerm); err != nil { + return nil, errors.Wrap(err, "create temp dir") + } + df, err = fileutil.OpenDir(dir) + } + if err != nil { + return nil, errors.Wrap(err, "open temp dir") + } + + if err := df.Sync(); err != nil { + return nil, errors.Wrap(err, "sync dir") + } + + if err := os.RemoveAll(filename); err != nil { + return nil, errors.Wrap(err, "remove existing file") + } + f, err := os.OpenFile(filepath.Clean(filename), os.O_CREATE|os.O_RDWR, 0600) + if err != nil { + return nil, errors.Wrap(err, "open temp file") + } + return &partFile{ + file: f, + fileWriter: bufio.NewWriterSize(f, 32*1024), + fileReader: bufio.NewReaderSize(f, 32*1024), + }, nil +} + +func (p *partFile) Close() error { + if err := p.file.Close(); err != nil { + return err + } + return os.Remove(p.file.Name()) +} + +func (p *partFile) Flush() error { + if err := p.fileWriter.Flush(); err != nil { + return err + } + if err := p.file.Sync(); err != nil { + return err + } + // Seek is necessary because the part was just written to. + _, err := p.file.Seek(0, io.SeekStart) + return err +} + +func (p *partFile) Read(buf []byte) (int, error) { + return p.fileReader.Read(buf) +} + +func (p *partFile) Write(buf []byte) (int, error) { + return p.fileWriter.Write(buf) +} + +// partBuffer stores parts in memory. +type partBuffer struct { + buf *bytes.Buffer +} + +func newPartBuffer(size int) *partBuffer { + return &partBuffer{ + buf: bytes.NewBuffer(make([]byte, 0, size)), + } +} + +func (p *partBuffer) Close() error { + return nil +} + +func (p *partBuffer) Read(b []byte) (int, error) { + return p.buf.Read(b) +} + +func (p *partBuffer) Write(b []byte) (int, error) { + return p.buf.Write(b) +} + +func (p *partBuffer) Flush() error { + return nil +} diff --git a/vendor/github.com/thanos-io/thanos/pkg/promclient/promclient.go b/vendor/github.com/thanos-io/thanos/pkg/promclient/promclient.go index f564fade72..f06e9ce54e 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/promclient/promclient.go +++ b/vendor/github.com/thanos-io/thanos/pkg/promclient/promclient.go @@ -105,7 +105,7 @@ func NewWithTracingClient(logger log.Logger, httpClient *http.Client, userAgent // req2xx sends a request to the given url.URL. If method is http.MethodPost then // the raw query is encoded in the body and the appropriate Content-Type is set. 
-func (c *Client) req2xx(ctx context.Context, u *url.URL, method string) (_ []byte, _ int, err error) { +func (c *Client) req2xx(ctx context.Context, u *url.URL, method string, headers http.Header) (_ []byte, _ int, err error) { var b io.Reader if method == http.MethodPost { rq := u.RawQuery @@ -117,6 +117,10 @@ func (c *Client) req2xx(ctx context.Context, u *url.URL, method string) (_ []byt if err != nil { return nil, 0, errors.Wrapf(err, "create %s request", method) } + if headers != nil { + req.Header = headers + } + if c.userAgent != "" { req.Header.Set("User-Agent", c.userAgent) } @@ -166,7 +170,7 @@ func (c *Client) ExternalLabels(ctx context.Context, base *url.URL) (labels.Labe span, ctx := tracing.StartSpan(ctx, "/prom_config HTTP[client]") defer span.Finish() - body, _, err := c.req2xx(ctx, &u, http.MethodGet) + body, _, err := c.req2xx(ctx, &u, http.MethodGet, nil) if err != nil { return nil, err } @@ -363,6 +367,7 @@ type QueryOptions struct { MaxSourceResolution string Engine string Explain bool + HTTPHeaders http.Header } func (p *QueryOptions) AddTo(values url.Values) error { @@ -423,7 +428,7 @@ func (c *Client) QueryInstant(ctx context.Context, base *url.URL, query string, method = http.MethodGet } - body, _, err := c.req2xx(ctx, &u, method) + body, _, err := c.req2xx(ctx, &u, method, opts.HTTPHeaders) if err != nil { return nil, nil, nil, errors.Wrap(err, "read query instant response") } @@ -529,7 +534,7 @@ func (c *Client) QueryRange(ctx context.Context, base *url.URL, query string, st span, ctx := tracing.StartSpan(ctx, "/prom_query_range HTTP[client]") defer span.Finish() - body, _, err := c.req2xx(ctx, &u, http.MethodGet) + body, _, err := c.req2xx(ctx, &u, http.MethodGet, opts.HTTPHeaders) if err != nil { return nil, nil, nil, errors.Wrap(err, "read query range response") } @@ -612,7 +617,7 @@ func (c *Client) AlertmanagerAlerts(ctx context.Context, base *url.URL) ([]*mode span, ctx := tracing.StartSpan(ctx, "/alertmanager_alerts HTTP[client]") defer span.Finish() - body, _, err := c.req2xx(ctx, &u, http.MethodGet) + body, _, err := c.req2xx(ctx, &u, http.MethodGet, nil) if err != nil { return nil, err } @@ -643,7 +648,7 @@ func (c *Client) BuildVersion(ctx context.Context, base *url.URL) (string, error defer span.Finish() // We get status code 404 or 405 for prometheus versions lower than 2.14.0 - body, code, err := c.req2xx(ctx, &u, http.MethodGet) + body, code, err := c.req2xx(ctx, &u, http.MethodGet, nil) if err != nil { if code == http.StatusNotFound { return "0", nil @@ -675,7 +680,7 @@ func (c *Client) get2xxResultWithGRPCErrors(ctx context.Context, spanName string span, ctx := tracing.StartSpan(ctx, spanName) defer span.Finish() - body, code, err := c.req2xx(ctx, u, http.MethodGet) + body, code, err := c.req2xx(ctx, u, http.MethodGet, nil) if err != nil { if code, exists := statusToCode[code]; exists && code != 0 { return status.Error(code, err.Error()) diff --git a/vendor/github.com/thanos-io/thanos/pkg/store/bucket.go b/vendor/github.com/thanos-io/thanos/pkg/store/bucket.go index 2e40d010a3..01f32f4366 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/store/bucket.go +++ b/vendor/github.com/thanos-io/thanos/pkg/store/bucket.go @@ -127,16 +127,16 @@ type bucketStoreMetrics struct { seriesDataFetched *prometheus.HistogramVec seriesDataSizeTouched *prometheus.HistogramVec seriesDataSizeFetched *prometheus.HistogramVec - seriesBlocksQueried prometheus.Histogram - seriesGetAllDuration prometheus.Histogram - seriesMergeDuration prometheus.Histogram - 
resultSeriesCount prometheus.Histogram - chunkSizeBytes prometheus.Histogram - postingsSizeBytes prometheus.Histogram + seriesBlocksQueried *prometheus.HistogramVec + seriesGetAllDuration *prometheus.HistogramVec + seriesMergeDuration *prometheus.HistogramVec + resultSeriesCount *prometheus.HistogramVec + chunkSizeBytes *prometheus.HistogramVec + postingsSizeBytes *prometheus.HistogramVec queriesDropped *prometheus.CounterVec - seriesRefetches prometheus.Counter - chunkRefetches prometheus.Counter - emptyPostingCount prometheus.Counter + seriesRefetches *prometheus.CounterVec + chunkRefetches *prometheus.CounterVec + emptyPostingCount *prometheus.CounterVec lazyExpandedPostingsCount prometheus.Counter lazyExpandedPostingSizeBytes prometheus.Counter @@ -145,18 +145,18 @@ type bucketStoreMetrics struct { cachedPostingsCompressions *prometheus.CounterVec cachedPostingsCompressionErrors *prometheus.CounterVec cachedPostingsCompressionTimeSeconds *prometheus.CounterVec - cachedPostingsOriginalSizeBytes prometheus.Counter - cachedPostingsCompressedSizeBytes prometheus.Counter + cachedPostingsOriginalSizeBytes *prometheus.CounterVec + cachedPostingsCompressedSizeBytes *prometheus.CounterVec - seriesFetchDuration prometheus.Histogram + seriesFetchDuration *prometheus.HistogramVec // Counts time for fetching series across all batches. - seriesFetchDurationSum prometheus.Histogram - postingsFetchDuration prometheus.Histogram + seriesFetchDurationSum *prometheus.HistogramVec + postingsFetchDuration *prometheus.HistogramVec // chunkFetchDuration counts total time loading chunks, but since we spawn // multiple goroutines the actual latency is usually much lower than it. - chunkFetchDuration prometheus.Histogram + chunkFetchDuration *prometheus.HistogramVec // Actual absolute total time for loading chunks. 
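Note: the pattern applied throughout these bucket store metrics is the standard Prometheus one: each plain Histogram or Counter becomes a *Vec partitioned by the tenant label (tenancy.MetricLabel in the diff), and every Observe/Add call gains a WithLabelValues(tenant) lookup. A minimal standalone sketch, not part of this change, using a literal "tenant" label name and value in place of the tenancy constants:

package main

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

func main() {
	reg := prometheus.NewRegistry()

	// Previously: promauto.With(reg).NewHistogram(opts) and h.Observe(seconds).
	// Now one child time series per tenant:
	getAllDuration := promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
		Name:    "thanos_bucket_store_series_get_all_duration_seconds",
		Help:    "Time it takes until all per-block prepares and loads for a query are finished.",
		Buckets: []float64{0.001, 0.01, 0.1, 1, 10, 60},
	}, []string{"tenant"}) // the diff uses tenancy.MetricLabel for this label name

	tenant := "team-a" // in bucket.go this is resolved from gRPC metadata
	getAllDuration.WithLabelValues(tenant).Observe(1.5)
}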
- chunkFetchDurationSum prometheus.Histogram + chunkFetchDurationSum *prometheus.HistogramVec } func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics { @@ -196,138 +196,138 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics { Name: "thanos_bucket_store_series_data_touched", Help: "Number of items of a data type touched to fulfill a single Store API series request.", Buckets: prometheus.ExponentialBuckets(200, 2, 15), - }, []string{"data_type"}) + }, []string{"data_type", tenancy.MetricLabel}) m.seriesDataFetched = promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ Name: "thanos_bucket_store_series_data_fetched", Help: "Number of items of a data type retrieved to fulfill a single Store API series request.", Buckets: prometheus.ExponentialBuckets(200, 2, 15), - }, []string{"data_type"}) + }, []string{"data_type", tenancy.MetricLabel}) m.seriesDataSizeTouched = promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ Name: "thanos_bucket_store_series_data_size_touched_bytes", Help: "Total size of items of a data type touched to fulfill a single Store API series request in Bytes.", Buckets: prometheus.ExponentialBuckets(1024, 2, 15), - }, []string{"data_type"}) + }, []string{"data_type", tenancy.MetricLabel}) m.seriesDataSizeFetched = promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ Name: "thanos_bucket_store_series_data_size_fetched_bytes", Help: "Total size of items of a data type fetched to fulfill a single Store API series request in Bytes.", Buckets: prometheus.ExponentialBuckets(1024, 2, 15), - }, []string{"data_type"}) + }, []string{"data_type", tenancy.MetricLabel}) - m.seriesBlocksQueried = promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ + m.seriesBlocksQueried = promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ Name: "thanos_bucket_store_series_blocks_queried", Help: "Number of blocks in a bucket store that were touched to satisfy a query.", Buckets: prometheus.ExponentialBuckets(1, 2, 10), - }) - m.seriesGetAllDuration = promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ + }, []string{tenancy.MetricLabel}) + m.seriesGetAllDuration = promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ Name: "thanos_bucket_store_series_get_all_duration_seconds", Help: "Time it takes until all per-block prepares and loads for a query are finished.", Buckets: []float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120}, - }) - m.seriesMergeDuration = promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ + }, []string{tenancy.MetricLabel}) + m.seriesMergeDuration = promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ Name: "thanos_bucket_store_series_merge_duration_seconds", Help: "Time it takes to merge sub-results from all queried blocks into a single result.", Buckets: []float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120}, - }) - m.resultSeriesCount = promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ + }, []string{tenancy.MetricLabel}) + m.resultSeriesCount = promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ Name: "thanos_bucket_store_series_result_series", Help: "Number of series observed in the final result of a query.", Buckets: prometheus.ExponentialBuckets(1, 2, 15), - }) + }, []string{tenancy.MetricLabel}) - m.chunkSizeBytes = promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ + m.chunkSizeBytes = promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ Name: "thanos_bucket_store_sent_chunk_size_bytes", Help: "Size in 
bytes of the chunks for the single series, which is adequate to the gRPC message size sent to querier.", Buckets: []float64{ 32, 256, 512, 1024, 32 * 1024, 256 * 1024, 512 * 1024, 1024 * 1024, 32 * 1024 * 1024, 256 * 1024 * 1024, 512 * 1024 * 1024, }, - }) + }, []string{tenancy.MetricLabel}) - m.postingsSizeBytes = promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ + m.postingsSizeBytes = promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ Name: "thanos_bucket_store_postings_size_bytes", Help: "Size in bytes of the postings for a single series call.", Buckets: []float64{ 32, 256, 512, 1024, 32 * 1024, 256 * 1024, 512 * 1024, 1024 * 1024, 32 * 1024 * 1024, 128 * 1024 * 1024, 256 * 1024 * 1024, 512 * 1024 * 1024, 768 * 1024 * 1024, 1024 * 1024 * 1024, }, - }) + }, []string{tenancy.MetricLabel}) m.queriesDropped = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "thanos_bucket_store_queries_dropped_total", Help: "Number of queries that were dropped due to the limit.", - }, []string{"reason"}) - m.seriesRefetches = promauto.With(reg).NewCounter(prometheus.CounterOpts{ + }, []string{"reason", tenancy.MetricLabel}) + m.seriesRefetches = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "thanos_bucket_store_series_refetches_total", Help: "Total number of cases where configured estimated series bytes was not enough was to fetch series from index, resulting in refetch.", - }) - m.chunkRefetches = promauto.With(reg).NewCounter(prometheus.CounterOpts{ + }, []string{tenancy.MetricLabel}) + m.chunkRefetches = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "thanos_bucket_store_chunk_refetches_total", Help: "Total number of cases where configured estimated chunk bytes was not enough was to fetch chunks from object store, resulting in refetch.", - }) + }, []string{tenancy.MetricLabel}) m.cachedPostingsCompressions = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "thanos_bucket_store_cached_postings_compressions_total", Help: "Number of postings compressions before storing to index cache.", - }, []string{"op"}) - m.cachedPostingsCompressions.WithLabelValues(labelEncode) - m.cachedPostingsCompressions.WithLabelValues(labelDecode) + }, []string{"op", tenancy.MetricLabel}) + m.cachedPostingsCompressions.WithLabelValues(labelEncode, tenancy.DefaultTenant) + m.cachedPostingsCompressions.WithLabelValues(labelDecode, tenancy.DefaultTenant) m.cachedPostingsCompressionErrors = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "thanos_bucket_store_cached_postings_compression_errors_total", Help: "Number of postings compression errors.", - }, []string{"op"}) - m.cachedPostingsCompressionErrors.WithLabelValues(labelEncode) - m.cachedPostingsCompressionErrors.WithLabelValues(labelDecode) + }, []string{"op", tenancy.MetricLabel}) + m.cachedPostingsCompressionErrors.WithLabelValues(labelEncode, tenancy.DefaultTenant) + m.cachedPostingsCompressionErrors.WithLabelValues(labelDecode, tenancy.DefaultTenant) m.cachedPostingsCompressionTimeSeconds = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "thanos_bucket_store_cached_postings_compression_time_seconds_total", Help: "Time spent compressing postings before storing them into postings cache.", - }, []string{"op"}) - m.cachedPostingsCompressionTimeSeconds.WithLabelValues(labelEncode) - m.cachedPostingsCompressionTimeSeconds.WithLabelValues(labelDecode) + }, []string{"op", tenancy.MetricLabel}) + m.cachedPostingsCompressionTimeSeconds.WithLabelValues(labelEncode, 
tenancy.DefaultTenant) + m.cachedPostingsCompressionTimeSeconds.WithLabelValues(labelDecode, tenancy.DefaultTenant) - m.cachedPostingsOriginalSizeBytes = promauto.With(reg).NewCounter(prometheus.CounterOpts{ + m.cachedPostingsOriginalSizeBytes = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "thanos_bucket_store_cached_postings_original_size_bytes_total", Help: "Original size of postings stored into cache.", - }) - m.cachedPostingsCompressedSizeBytes = promauto.With(reg).NewCounter(prometheus.CounterOpts{ + }, []string{tenancy.MetricLabel}) + m.cachedPostingsCompressedSizeBytes = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "thanos_bucket_store_cached_postings_compressed_size_bytes_total", Help: "Compressed size of postings stored into cache.", - }) + }, []string{tenancy.MetricLabel}) - m.seriesFetchDuration = promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ + m.seriesFetchDuration = promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ Name: "thanos_bucket_store_series_fetch_duration_seconds", Help: "The time it takes to fetch series to respond to a request sent to a store gateway. It includes both the time to fetch it from the cache and from storage in case of cache misses.", Buckets: []float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120}, - }) + }, []string{tenancy.MetricLabel}) - m.seriesFetchDurationSum = promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ + m.seriesFetchDurationSum = promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ Name: "thanos_bucket_store_series_fetch_duration_sum_seconds", Help: "The total time it takes to fetch series to respond to a request sent to a store gateway across all series batches. It includes both the time to fetch it from the cache and from storage in case of cache misses.", Buckets: []float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120}, - }) + }, []string{tenancy.MetricLabel}) - m.postingsFetchDuration = promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ + m.postingsFetchDuration = promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ Name: "thanos_bucket_store_postings_fetch_duration_seconds", Help: "The time it takes to fetch postings to respond to a request sent to a store gateway. 
It includes both the time to fetch it from the cache and from storage in case of cache misses.", Buckets: []float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120}, - }) + }, []string{tenancy.MetricLabel}) - m.chunkFetchDuration = promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ + m.chunkFetchDuration = promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ Name: "thanos_bucket_store_chunks_fetch_duration_seconds", Help: "The total time spent fetching chunks within a single request for one block.", Buckets: []float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120}, - }) + }, []string{tenancy.MetricLabel}) - m.chunkFetchDurationSum = promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ + m.chunkFetchDurationSum = promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ Name: "thanos_bucket_store_chunks_fetch_duration_sum_seconds", Help: "The total absolute time spent fetching chunks within a single request for one block.", Buckets: []float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120}, - }) + }, []string{tenancy.MetricLabel}) - m.emptyPostingCount = promauto.With(reg).NewCounter(prometheus.CounterOpts{ + m.emptyPostingCount = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "thanos_bucket_store_empty_postings_total", Help: "Total number of empty postings when fetching block series.", - }) + }, []string{tenancy.MetricLabel}) m.lazyExpandedPostingsCount = promauto.With(reg).NewCounter(prometheus.CounterOpts{ Name: "thanos_bucket_store_lazy_expanded_postings_total", @@ -423,18 +423,18 @@ func (s *BucketStore) validate() error { type noopCache struct{} -func (noopCache) StorePostings(ulid.ULID, labels.Label, []byte) {} -func (noopCache) FetchMultiPostings(_ context.Context, _ ulid.ULID, keys []labels.Label) (map[labels.Label][]byte, []labels.Label) { +func (noopCache) StorePostings(ulid.ULID, labels.Label, []byte, string) {} +func (noopCache) FetchMultiPostings(_ context.Context, _ ulid.ULID, keys []labels.Label, tenant string) (map[labels.Label][]byte, []labels.Label) { return map[labels.Label][]byte{}, keys } -func (noopCache) StoreExpandedPostings(_ ulid.ULID, _ []*labels.Matcher, _ []byte) {} -func (noopCache) FetchExpandedPostings(_ context.Context, _ ulid.ULID, _ []*labels.Matcher) ([]byte, bool) { +func (noopCache) StoreExpandedPostings(_ ulid.ULID, _ []*labels.Matcher, _ []byte, tenant string) {} +func (noopCache) FetchExpandedPostings(_ context.Context, _ ulid.ULID, _ []*labels.Matcher, tenant string) ([]byte, bool) { return []byte{}, false } -func (noopCache) StoreSeries(ulid.ULID, storage.SeriesRef, []byte) {} -func (noopCache) FetchMultiSeries(_ context.Context, _ ulid.ULID, ids []storage.SeriesRef) (map[storage.SeriesRef][]byte, []storage.SeriesRef) { +func (noopCache) StoreSeries(ulid.ULID, storage.SeriesRef, []byte, string) {} +func (noopCache) FetchMultiSeries(_ context.Context, _ ulid.ULID, ids []storage.SeriesRef, tenant string) (map[storage.SeriesRef][]byte, []storage.SeriesRef) { return map[storage.SeriesRef][]byte{}, ids } @@ -955,9 +955,10 @@ type blockSeriesClient struct { shardMatcher *storepb.ShardMatcher blockMatchers []*labels.Matcher calculateChunkHash bool - seriesFetchDurationSum prometheus.Histogram - chunkFetchDuration prometheus.Histogram - chunkFetchDurationSum prometheus.Histogram + seriesFetchDurationSum *prometheus.HistogramVec + chunkFetchDuration *prometheus.HistogramVec + chunkFetchDurationSum *prometheus.HistogramVec + tenant string // Internal state. 
i uint64 @@ -982,14 +983,15 @@ func newBlockSeriesClient( shardMatcher *storepb.ShardMatcher, calculateChunkHash bool, batchSize int, - seriesFetchDurationSum prometheus.Histogram, - chunkFetchDuration prometheus.Histogram, - chunkFetchDurationSum prometheus.Histogram, + seriesFetchDurationSum *prometheus.HistogramVec, + chunkFetchDuration *prometheus.HistogramVec, + chunkFetchDurationSum *prometheus.HistogramVec, extLsetToRemove map[string]struct{}, lazyExpandedPostingEnabled bool, lazyExpandedPostingsCount prometheus.Counter, lazyExpandedPostingSizeBytes prometheus.Counter, lazyExpandedPostingSeriesOverfetchedSizeBytes prometheus.Counter, + tenant string, ) *blockSeriesClient { var chunkr *bucketChunkReader if !req.SkipChunks { @@ -1029,6 +1031,7 @@ func newBlockSeriesClient( calculateChunkHash: calculateChunkHash, hasMorePostings: true, batchSize: batchSize, + tenant: tenant, } } @@ -1068,7 +1071,7 @@ func (b *blockSeriesClient) ExpandPostings( matchers sortedMatchers, seriesLimiter SeriesLimiter, ) error { - ps, err := b.indexr.ExpandedPostings(b.ctx, matchers, b.bytesLimiter, b.lazyExpandedPostingEnabled, b.lazyExpandedPostingSizeBytes) + ps, err := b.indexr.ExpandedPostings(b.ctx, matchers, b.bytesLimiter, b.lazyExpandedPostingEnabled, b.lazyExpandedPostingSizeBytes, b.tenant) if err != nil { return errors.Wrap(err, "expanded matching posting") } @@ -1099,16 +1102,16 @@ func (b *blockSeriesClient) ExpandPostings( func (b *blockSeriesClient) Recv() (*storepb.SeriesResponse, error) { for len(b.entries) == 0 && b.hasMorePostings { - if err := b.nextBatch(); err != nil { + if err := b.nextBatch(b.tenant); err != nil { return nil, err } } if len(b.entries) == 0 { - b.seriesFetchDurationSum.Observe(b.indexr.stats.SeriesDownloadLatencySum.Seconds()) + b.seriesFetchDurationSum.WithLabelValues(b.tenant).Observe(b.indexr.stats.SeriesDownloadLatencySum.Seconds()) if b.chunkr != nil { - b.chunkFetchDuration.Observe(b.chunkr.stats.ChunksFetchDurationSum.Seconds()) - b.chunkFetchDurationSum.Observe(b.chunkr.stats.ChunksDownloadLatencySum.Seconds()) + b.chunkFetchDuration.WithLabelValues(b.tenant).Observe(b.chunkr.stats.ChunksFetchDurationSum.Seconds()) + b.chunkFetchDurationSum.WithLabelValues(b.tenant).Observe(b.chunkr.stats.ChunksDownloadLatencySum.Seconds()) } return nil, io.EOF } @@ -1122,7 +1125,7 @@ func (b *blockSeriesClient) Recv() (*storepb.SeriesResponse, error) { }), nil } -func (b *blockSeriesClient) nextBatch() error { +func (b *blockSeriesClient) nextBatch(tenant string) error { start := b.i end := start + uint64(b.batchSize) if end > uint64(len(b.lazyPostings.postings)) { @@ -1143,7 +1146,7 @@ func (b *blockSeriesClient) nextBatch() error { b.expandedPostings[i] = b.expandedPostings[i] / 16 } } - b.indexr.storeExpandedPostingsToCache(b.blockMatchers, index.NewListPostings(b.expandedPostings), len(b.expandedPostings)) + b.indexr.storeExpandedPostingsToCache(b.blockMatchers, index.NewListPostings(b.expandedPostings), len(b.expandedPostings), tenant) } return nil } @@ -1153,7 +1156,7 @@ func (b *blockSeriesClient) nextBatch() error { b.chunkr.reset() } - if err := b.indexr.PreloadSeries(b.ctx, postingsBatch, b.bytesLimiter); err != nil { + if err := b.indexr.PreloadSeries(b.ctx, postingsBatch, b.bytesLimiter, b.tenant); err != nil { return errors.Wrap(err, "preload series") } @@ -1227,7 +1230,7 @@ OUTER: } if !b.skipChunks { - if err := b.chunkr.load(b.ctx, b.entries, b.loadAggregates, b.calculateChunkHash, b.bytesLimiter); err != nil { + if err := b.chunkr.load(b.ctx, b.entries, 
b.loadAggregates, b.calculateChunkHash, b.bytesLimiter, b.tenant); err != nil { return errors.Wrap(err, "load chunks") } } @@ -1376,7 +1379,6 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store } tenant, _ := tenancy.GetTenantFromGRPCMetadata(srv.Context()) - level.Debug(s.logger).Log("msg", "Tenant for Series request", "tenant", tenant) matchers, err := storepb.MatchersToPromMatchers(req.Matchers...) if err != nil { @@ -1386,7 +1388,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store req.MaxTime = s.limitMaxTime(req.MaxTime) var ( - bytesLimiter = s.bytesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("bytes")) + bytesLimiter = s.bytesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("bytes", tenant)) ctx = srv.Context() stats = &queryStats{} respSets []respSet @@ -1394,8 +1396,8 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store g, gctx = errgroup.WithContext(ctx) resHints = &hintspb.SeriesResponseHints{} reqBlockMatchers []*labels.Matcher - chunksLimiter = s.chunksLimiterFactory(s.metrics.queriesDropped.WithLabelValues("chunks")) - seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series")) + chunksLimiter = s.chunksLimiterFactory(s.metrics.queriesDropped.WithLabelValues("chunks", tenant)) + seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series", tenant)) queryStatsEnabled = false ) @@ -1467,6 +1469,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store s.metrics.lazyExpandedPostingsCount, s.metrics.lazyExpandedPostingSizeBytes, s.metrics.lazyExpandedPostingSeriesOverfetchedSizeBytes, + tenant, ) defer blockClient.Close() @@ -1505,7 +1508,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store blockClient, shardMatcher, false, - s.metrics.emptyPostingCount, + s.metrics.emptyPostingCount.WithLabelValues(tenant), nil, ) @@ -1521,28 +1524,28 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store s.mtx.RUnlock() defer func() { - s.metrics.seriesDataTouched.WithLabelValues("postings").Observe(float64(stats.postingsTouched)) - s.metrics.seriesDataFetched.WithLabelValues("postings").Observe(float64(stats.postingsFetched)) - s.metrics.seriesDataSizeTouched.WithLabelValues("postings").Observe(float64(stats.PostingsTouchedSizeSum)) - s.metrics.seriesDataSizeFetched.WithLabelValues("postings").Observe(float64(stats.PostingsFetchedSizeSum)) - s.metrics.seriesDataTouched.WithLabelValues("series").Observe(float64(stats.seriesTouched)) - s.metrics.seriesDataFetched.WithLabelValues("series").Observe(float64(stats.seriesFetched)) - s.metrics.seriesDataSizeTouched.WithLabelValues("series").Observe(float64(stats.SeriesTouchedSizeSum)) - s.metrics.seriesDataSizeFetched.WithLabelValues("series").Observe(float64(stats.SeriesFetchedSizeSum)) - s.metrics.seriesDataTouched.WithLabelValues("chunks").Observe(float64(stats.chunksTouched)) - s.metrics.seriesDataFetched.WithLabelValues("chunks").Observe(float64(stats.chunksFetched)) - s.metrics.seriesDataSizeTouched.WithLabelValues("chunks").Observe(float64(stats.ChunksTouchedSizeSum)) - s.metrics.seriesDataSizeFetched.WithLabelValues("chunks").Observe(float64(stats.ChunksFetchedSizeSum)) - s.metrics.resultSeriesCount.Observe(float64(stats.mergedSeriesCount)) - s.metrics.cachedPostingsCompressions.WithLabelValues(labelEncode).Add(float64(stats.cachedPostingsCompressions)) - 
s.metrics.cachedPostingsCompressions.WithLabelValues(labelDecode).Add(float64(stats.cachedPostingsDecompressions)) - s.metrics.cachedPostingsCompressionErrors.WithLabelValues(labelEncode).Add(float64(stats.cachedPostingsCompressionErrors)) - s.metrics.cachedPostingsCompressionErrors.WithLabelValues(labelDecode).Add(float64(stats.cachedPostingsDecompressionErrors)) - s.metrics.cachedPostingsCompressionTimeSeconds.WithLabelValues(labelEncode).Add(stats.CachedPostingsCompressionTimeSum.Seconds()) - s.metrics.cachedPostingsCompressionTimeSeconds.WithLabelValues(labelDecode).Add(stats.CachedPostingsDecompressionTimeSum.Seconds()) - s.metrics.cachedPostingsOriginalSizeBytes.Add(float64(stats.CachedPostingsOriginalSizeSum)) - s.metrics.cachedPostingsCompressedSizeBytes.Add(float64(stats.CachedPostingsCompressedSizeSum)) - s.metrics.postingsSizeBytes.Observe(float64(int(stats.PostingsFetchedSizeSum) + int(stats.PostingsTouchedSizeSum))) + s.metrics.seriesDataTouched.WithLabelValues("postings", tenant).Observe(float64(stats.postingsTouched)) + s.metrics.seriesDataFetched.WithLabelValues("postings", tenant).Observe(float64(stats.postingsFetched)) + s.metrics.seriesDataSizeTouched.WithLabelValues("postings", tenant).Observe(float64(stats.PostingsTouchedSizeSum)) + s.metrics.seriesDataSizeFetched.WithLabelValues("postings", tenant).Observe(float64(stats.PostingsFetchedSizeSum)) + s.metrics.seriesDataTouched.WithLabelValues("series", tenant).Observe(float64(stats.seriesTouched)) + s.metrics.seriesDataFetched.WithLabelValues("series", tenant).Observe(float64(stats.seriesFetched)) + s.metrics.seriesDataSizeTouched.WithLabelValues("series", tenant).Observe(float64(stats.SeriesTouchedSizeSum)) + s.metrics.seriesDataSizeFetched.WithLabelValues("series", tenant).Observe(float64(stats.SeriesFetchedSizeSum)) + s.metrics.seriesDataTouched.WithLabelValues("chunks", tenant).Observe(float64(stats.chunksTouched)) + s.metrics.seriesDataFetched.WithLabelValues("chunks", tenant).Observe(float64(stats.chunksFetched)) + s.metrics.seriesDataSizeTouched.WithLabelValues("chunks", tenant).Observe(float64(stats.ChunksTouchedSizeSum)) + s.metrics.seriesDataSizeFetched.WithLabelValues("chunks", tenant).Observe(float64(stats.ChunksFetchedSizeSum)) + s.metrics.resultSeriesCount.WithLabelValues(tenant).Observe(float64(stats.mergedSeriesCount)) + s.metrics.cachedPostingsCompressions.WithLabelValues(labelEncode, tenant).Add(float64(stats.cachedPostingsCompressions)) + s.metrics.cachedPostingsCompressions.WithLabelValues(labelDecode, tenant).Add(float64(stats.cachedPostingsDecompressions)) + s.metrics.cachedPostingsCompressionErrors.WithLabelValues(labelEncode, tenant).Add(float64(stats.cachedPostingsCompressionErrors)) + s.metrics.cachedPostingsCompressionErrors.WithLabelValues(labelDecode, tenant).Add(float64(stats.cachedPostingsDecompressionErrors)) + s.metrics.cachedPostingsCompressionTimeSeconds.WithLabelValues(labelEncode, tenant).Add(stats.CachedPostingsCompressionTimeSum.Seconds()) + s.metrics.cachedPostingsCompressionTimeSeconds.WithLabelValues(labelDecode, tenant).Add(stats.CachedPostingsDecompressionTimeSum.Seconds()) + s.metrics.cachedPostingsOriginalSizeBytes.WithLabelValues(tenant).Add(float64(stats.CachedPostingsOriginalSizeSum)) + s.metrics.cachedPostingsCompressedSizeBytes.WithLabelValues(tenant).Add(float64(stats.CachedPostingsCompressedSizeSum)) + s.metrics.postingsSizeBytes.WithLabelValues(tenant).Observe(float64(int(stats.PostingsFetchedSizeSum) + int(stats.PostingsTouchedSizeSum))) 
level.Debug(s.logger).Log("msg", "stats query processed", "request", req, @@ -1564,8 +1567,8 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store } stats.blocksQueried = len(respSets) stats.GetAllDuration = time.Since(begin) - s.metrics.seriesGetAllDuration.Observe(stats.GetAllDuration.Seconds()) - s.metrics.seriesBlocksQueried.Observe(float64(stats.blocksQueried)) + s.metrics.seriesGetAllDuration.WithLabelValues(tenant).Observe(stats.GetAllDuration.Seconds()) + s.metrics.seriesBlocksQueried.WithLabelValues(tenant).Observe(float64(stats.blocksQueried)) } // Merge the sub-results from each selected block. @@ -1593,7 +1596,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store stats.mergedSeriesCount++ if !req.SkipChunks { stats.mergedChunksCount += len(series.Chunks) - s.metrics.chunkSizeBytes.Observe(float64(chunksSize(series.Chunks))) + s.metrics.chunkSizeBytes.WithLabelValues(tenant).Observe(float64(chunksSize(series.Chunks))) } } if err = srv.Send(at); err != nil { @@ -1602,7 +1605,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store } } stats.MergeDuration = time.Since(begin) - s.metrics.seriesMergeDuration.Observe(stats.MergeDuration.Seconds()) + s.metrics.seriesMergeDuration.WithLabelValues(tenant).Observe(stats.MergeDuration.Seconds()) err = nil }) @@ -1648,7 +1651,6 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq } tenant, _ := tenancy.GetTenantFromGRPCMetadata(ctx) - level.Debug(s.logger).Log("msg", "Tenant for LabelNames request", "tenant", tenant) resHints := &hintspb.LabelNamesResponseHints{} @@ -1672,8 +1674,8 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq var mtx sync.Mutex var sets [][]string - var seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series")) - var bytesLimiter = s.bytesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("bytes")) + var seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series", tenant)) + var bytesLimiter = s.bytesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("bytes", tenant)) for _, b := range s.blocks { b := b @@ -1750,6 +1752,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq s.metrics.lazyExpandedPostingsCount, s.metrics.lazyExpandedPostingSizeBytes, s.metrics.lazyExpandedPostingSeriesOverfetchedSizeBytes, + tenant, ) defer blockClient.Close() @@ -1848,7 +1851,6 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR } tenant, _ := tenancy.GetTenantFromGRPCMetadata(ctx) - level.Debug(s.logger).Log("msg", "Tenant for LabelValues request", "tenant", tenant) resHints := &hintspb.LabelValuesResponseHints{} @@ -1872,8 +1874,8 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR var mtx sync.Mutex var sets [][]string - var seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series")) - var bytesLimiter = s.bytesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("bytes")) + var seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series", tenant)) + var bytesLimiter = s.bytesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("bytes", tenant)) for _, b := range s.blocks { b := b @@ -1953,6 +1955,7 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR s.metrics.lazyExpandedPostingsCount, 
s.metrics.lazyExpandedPostingSizeBytes, s.metrics.lazyExpandedPostingSeriesOverfetchedSizeBytes, + tenant, ) defer blockClient.Close() @@ -2394,14 +2397,14 @@ func (r *bucketIndexReader) reset() { // Reminder: A posting is a reference (represented as a uint64) to a series reference, which in turn points to the first // chunk where the series contains the matching label-value pair for a given block of data. Postings can be fetched by // single label name=value. -func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms sortedMatchers, bytesLimiter BytesLimiter, lazyExpandedPostingEnabled bool, lazyExpandedPostingSizeBytes prometheus.Counter) (*lazyExpandedPostings, error) { +func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms sortedMatchers, bytesLimiter BytesLimiter, lazyExpandedPostingEnabled bool, lazyExpandedPostingSizeBytes prometheus.Counter, tenant string) (*lazyExpandedPostings, error) { // Shortcut the case of `len(postingGroups) == 0`. It will only happen when no // matchers specified, and we don't need to fetch expanded postings from cache. if len(ms) == 0 { return nil, nil } - hit, postings, err := r.fetchExpandedPostingsFromCache(ctx, ms, bytesLimiter) + hit, postings, err := r.fetchExpandedPostingsFromCache(ctx, ms, bytesLimiter, tenant) if err != nil { return nil, err } @@ -2418,7 +2421,7 @@ func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms sortedMatch return nil, errors.Wrap(err, "matchersToPostingGroups") } if postingGroups == nil { - r.storeExpandedPostingsToCache(ms, index.EmptyPostings(), 0) + r.storeExpandedPostingsToCache(ms, index.EmptyPostings(), 0, tenant) return nil, nil } i := 0 @@ -2446,13 +2449,13 @@ func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms sortedMatch postingGroups = append(postingGroups, newPostingGroup(true, name, []string{value}, nil)) } - ps, err := fetchLazyExpandedPostings(ctx, postingGroups, r, bytesLimiter, addAllPostings, lazyExpandedPostingEnabled, lazyExpandedPostingSizeBytes) + ps, err := fetchLazyExpandedPostings(ctx, postingGroups, r, bytesLimiter, addAllPostings, lazyExpandedPostingEnabled, lazyExpandedPostingSizeBytes, tenant) if err != nil { return nil, errors.Wrap(err, "fetch and expand postings") } // If postings still have matchers to be applied lazily, cache expanded postings after filtering series so skip here. 
if !ps.lazyExpanded() { - r.storeExpandedPostingsToCache(ms, index.NewListPostings(ps.postings), len(ps.postings)) + r.storeExpandedPostingsToCache(ms, index.NewListPostings(ps.postings), len(ps.postings), tenant) } if len(ps.postings) > 0 { @@ -2743,8 +2746,8 @@ type postingPtr struct { ptr index.Range } -func (r *bucketIndexReader) fetchExpandedPostingsFromCache(ctx context.Context, ms []*labels.Matcher, bytesLimiter BytesLimiter) (bool, []storage.SeriesRef, error) { - dataFromCache, hit := r.block.indexCache.FetchExpandedPostings(ctx, r.block.meta.ULID, ms) +func (r *bucketIndexReader) fetchExpandedPostingsFromCache(ctx context.Context, ms []*labels.Matcher, bytesLimiter BytesLimiter, tenant string) (bool, []storage.SeriesRef, error) { + dataFromCache, hit := r.block.indexCache.FetchExpandedPostings(ctx, r.block.meta.ULID, ms, tenant) if !hit { return false, nil, nil } @@ -2788,7 +2791,7 @@ func (r *bucketIndexReader) fetchExpandedPostingsFromCache(ctx context.Context, return true, ps, nil } -func (r *bucketIndexReader) storeExpandedPostingsToCache(ms []*labels.Matcher, ps index.Postings, length int) { +func (r *bucketIndexReader) storeExpandedPostingsToCache(ms []*labels.Matcher, ps index.Postings, length int, tenant string) { // Encode postings to cache. We compress and cache postings before adding // 16 bytes padding in order to make compressed size smaller. dataToCache, compressionDuration, compressionErrors, compressedSize := r.encodePostingsToCache(ps, length) @@ -2797,7 +2800,7 @@ func (r *bucketIndexReader) storeExpandedPostingsToCache(ms []*labels.Matcher, p r.stats.CachedPostingsCompressionTimeSum += compressionDuration r.stats.CachedPostingsCompressedSizeSum += units.Base2Bytes(compressedSize) r.stats.CachedPostingsOriginalSizeSum += units.Base2Bytes(length * 4) // Estimate the posting list size. - r.block.indexCache.StoreExpandedPostings(r.block.meta.ULID, ms, dataToCache) + r.block.indexCache.StoreExpandedPostings(r.block.meta.ULID, ms, dataToCache, tenant) } var bufioReaderPool = sync.Pool{ @@ -2809,10 +2812,10 @@ var bufioReaderPool = sync.Pool{ // fetchPostings fill postings requested by posting groups. // It returns one posting for each key, in the same order. // If postings for given key is not fetched, entry at given index will be nil. -func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Label, bytesLimiter BytesLimiter) ([]index.Postings, []func(), error) { +func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Label, bytesLimiter BytesLimiter, tenant string) ([]index.Postings, []func(), error) { var closeFns []func() - timer := prometheus.NewTimer(r.block.metrics.postingsFetchDuration) + timer := prometheus.NewTimer(r.block.metrics.postingsFetchDuration.WithLabelValues(tenant)) defer timer.ObserveDuration() var ptrs []postingPtr @@ -2820,7 +2823,7 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab output := make([]index.Postings, len(keys)) // Fetch postings from the cache with a single call. 
- fromCache, _ := r.block.indexCache.FetchMultiPostings(ctx, r.block.meta.ULID, keys) + fromCache, _ := r.block.indexCache.FetchMultiPostings(ctx, r.block.meta.ULID, keys, tenant) for _, dataFromCache := range fromCache { if err := bytesLimiter.Reserve(uint64(len(dataFromCache))); err != nil { return nil, closeFns, httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while loading postings from index cache: %s", err) @@ -2938,7 +2941,7 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab r.stats.CachedPostingsCompressionTimeSum += time.Since(startCompression) r.mtx.Unlock() - r.block.indexCache.StorePostings(r.block.meta.ULID, keys[keyID], dataToCache) + r.block.indexCache.StorePostings(r.block.meta.ULID, keys[keyID], dataToCache, tenant) } r.mtx.Lock() @@ -3049,8 +3052,8 @@ func (it *bigEndianPostings) length() int { return len(it.list) / 4 } -func (r *bucketIndexReader) PreloadSeries(ctx context.Context, ids []storage.SeriesRef, bytesLimiter BytesLimiter) error { - timer := prometheus.NewTimer(r.block.metrics.seriesFetchDuration) +func (r *bucketIndexReader) PreloadSeries(ctx context.Context, ids []storage.SeriesRef, bytesLimiter BytesLimiter, tenant string) error { + timer := prometheus.NewTimer(r.block.metrics.seriesFetchDuration.WithLabelValues(tenant)) defer func() { d := timer.ObserveDuration() r.stats.SeriesDownloadLatencySum += d @@ -3058,7 +3061,7 @@ func (r *bucketIndexReader) PreloadSeries(ctx context.Context, ids []storage.Ser // Load series from cache, overwriting the list of ids to preload // with the missing ones. - fromCache, ids := r.block.indexCache.FetchMultiSeries(ctx, r.block.meta.ULID, ids) + fromCache, ids := r.block.indexCache.FetchMultiSeries(ctx, r.block.meta.ULID, ids, tenant) for id, b := range fromCache { r.loadedSeries[id] = b if err := bytesLimiter.Reserve(uint64(len(b))); err != nil { @@ -3077,13 +3080,13 @@ func (r *bucketIndexReader) PreloadSeries(ctx context.Context, ids []storage.Ser i, j := p.ElemRng[0], p.ElemRng[1] g.Go(func() error { - return r.loadSeries(ctx, ids[i:j], false, s, e, bytesLimiter) + return r.loadSeries(ctx, ids[i:j], false, s, e, bytesLimiter, tenant) }) } return g.Wait() } -func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []storage.SeriesRef, refetch bool, start, end uint64, bytesLimiter BytesLimiter) error { +func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []storage.SeriesRef, refetch bool, start, end uint64, bytesLimiter BytesLimiter, tenant string) error { begin := time.Now() if bytesLimiter != nil { @@ -3120,16 +3123,16 @@ func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []storage.Series } // Inefficient, but should be rare. - r.block.metrics.seriesRefetches.Inc() + r.block.metrics.seriesRefetches.WithLabelValues(tenant).Inc() level.Warn(r.block.logger).Log("msg", "series size exceeded expected size; refetching", "id", id, "series length", n+int(l), "maxSeriesSize", r.block.estimatedMaxSeriesSize) // Fetch plus to get the size of next one if exists. 
- return r.loadSeries(ctx, ids[i:], true, uint64(id), uint64(id)+uint64(n+int(l)+1), bytesLimiter) + return r.loadSeries(ctx, ids[i:], true, uint64(id), uint64(id)+uint64(n+int(l)+1), bytesLimiter, tenant) } c = c[n : n+int(l)] r.mtx.Lock() r.loadedSeries[id] = c - r.block.indexCache.StoreSeries(r.block.meta.ULID, id, c) + r.block.indexCache.StoreSeries(r.block.meta.ULID, id, c, tenant) r.mtx.Unlock() } return nil @@ -3367,7 +3370,7 @@ func (r *bucketChunkReader) addLoad(id chunks.ChunkRef, seriesEntry, chunk int) } // load loads all added chunks and saves resulting aggrs to refs. -func (r *bucketChunkReader) load(ctx context.Context, res []seriesEntry, aggrs []storepb.Aggr, calculateChunkChecksum bool, bytesLimiter BytesLimiter) error { +func (r *bucketChunkReader) load(ctx context.Context, res []seriesEntry, aggrs []storepb.Aggr, calculateChunkChecksum bool, bytesLimiter BytesLimiter, tenant string) error { r.loadingChunksMtx.Lock() r.loadingChunks = true r.loadingChunksMtx.Unlock() @@ -3405,7 +3408,7 @@ func (r *bucketChunkReader) load(ctx context.Context, res []seriesEntry, aggrs [ p := p indices := pIdxs[p.ElemRng[0]:p.ElemRng[1]] g.Go(func() error { - return r.loadChunks(ctx, res, aggrs, seq, p, indices, calculateChunkChecksum, bytesLimiter) + return r.loadChunks(ctx, res, aggrs, seq, p, indices, calculateChunkChecksum, bytesLimiter, tenant) }) } } @@ -3414,7 +3417,7 @@ func (r *bucketChunkReader) load(ctx context.Context, res []seriesEntry, aggrs [ // loadChunks will read range [start, end] from the segment file with sequence number seq. // This data range covers chunks starting at supplied offsets. -func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, aggrs []storepb.Aggr, seq int, part Part, pIdxs []loadIdx, calculateChunkChecksum bool, bytesLimiter BytesLimiter) error { +func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, aggrs []storepb.Aggr, seq int, part Part, pIdxs []loadIdx, calculateChunkChecksum bool, bytesLimiter BytesLimiter, tenant string) error { var locked bool fetchBegin := time.Now() defer func() { @@ -3503,7 +3506,7 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a continue } - r.block.metrics.chunkRefetches.Inc() + r.block.metrics.chunkRefetches.WithLabelValues(tenant).Inc() // If we didn't fetch enough data for the chunk, fetch more. fetchBegin = time.Now() // Read entire chunk into new buffer. diff --git a/vendor/github.com/thanos-io/thanos/pkg/store/cache/cache.go b/vendor/github.com/thanos-io/thanos/pkg/store/cache/cache.go index 360cdd67e5..0811d89cc0 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/store/cache/cache.go +++ b/vendor/github.com/thanos-io/thanos/pkg/store/cache/cache.go @@ -15,6 +15,8 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" "golang.org/x/crypto/blake2b" + + "github.com/thanos-io/thanos/pkg/tenancy" ) const ( @@ -36,24 +38,24 @@ var ( // (potentially with a deadline) as in the original user's request. type IndexCache interface { // StorePostings stores postings for a single series. - StorePostings(blockID ulid.ULID, l labels.Label, v []byte) + StorePostings(blockID ulid.ULID, l labels.Label, v []byte, tenant string) // FetchMultiPostings fetches multiple postings - each identified by a label - // and returns a map containing cache hits, along with a list of missing keys. 
- FetchMultiPostings(ctx context.Context, blockID ulid.ULID, keys []labels.Label) (hits map[labels.Label][]byte, misses []labels.Label) + FetchMultiPostings(ctx context.Context, blockID ulid.ULID, keys []labels.Label, tenant string) (hits map[labels.Label][]byte, misses []labels.Label) // StoreExpandedPostings stores expanded postings for a set of label matchers. - StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte) + StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte, tenant string) // FetchExpandedPostings fetches expanded postings and returns cached data and a boolean value representing whether it is a cache hit or not. - FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers []*labels.Matcher) ([]byte, bool) + FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers []*labels.Matcher, tenant string) ([]byte, bool) // StoreSeries stores a single series. - StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte) + StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte, tenant string) // FetchMultiSeries fetches multiple series - each identified by ID - from the cache // and returns a map containing cache hits, along with a list of missing IDs. - FetchMultiSeries(ctx context.Context, blockID ulid.ULID, ids []storage.SeriesRef) (hits map[storage.SeriesRef][]byte, misses []storage.SeriesRef) + FetchMultiSeries(ctx context.Context, blockID ulid.ULID, ids []storage.SeriesRef, tenant string) (hits map[storage.SeriesRef][]byte, misses []storage.SeriesRef) } // Common metrics that should be used by all cache implementations. @@ -69,23 +71,23 @@ func newCommonMetrics(reg prometheus.Registerer) *commonMetrics { requestTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "thanos_store_index_cache_requests_total", Help: "Total number of items requests to the cache.", - }, []string{"item_type"}), + }, []string{"item_type", tenancy.MetricLabel}), hitsTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "thanos_store_index_cache_hits_total", Help: "Total number of items requests to the cache that were a hit.", - }, []string{"item_type"}), + }, []string{"item_type", tenancy.MetricLabel}), dataSizeBytes: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ Name: "thanos_store_index_cache_stored_data_size_bytes", Help: "Histogram to track item data size stored in index cache", Buckets: []float64{ 32, 256, 512, 1024, 32 * 1024, 256 * 1024, 512 * 1024, 1024 * 1024, 32 * 1024 * 1024, 64 * 1024 * 1024, 128 * 1024 * 1024, 256 * 1024 * 1024, 512 * 1024 * 1024, }, - }, []string{"item_type"}), + }, []string{"item_type", tenancy.MetricLabel}), fetchLatency: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ Name: "thanos_store_index_cache_fetch_duration_seconds", Help: "Histogram to track latency to fetch items from index cache", Buckets: []float64{0.01, 0.1, 0.3, 0.6, 1, 3, 6, 10, 15, 20, 30, 45, 60, 90, 120}, - }, []string{"item_type"}), + }, []string{"item_type", tenancy.MetricLabel}), } } diff --git a/vendor/github.com/thanos-io/thanos/pkg/store/cache/factory.go b/vendor/github.com/thanos-io/thanos/pkg/store/cache/factory.go index d4e9f0c5cd..4204c92f22 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/store/cache/factory.go +++ b/vendor/github.com/thanos-io/thanos/pkg/store/cache/factory.go @@ -28,6 +28,9 @@ const ( type IndexCacheConfig struct { Type IndexCacheProvider `yaml:"type"` Config interface{} `yaml:"config"` + + // Available item types are Postings, 
Series and ExpandedPostings. + EnabledItems []string `yaml:"enabled_items"` } // NewIndexCache initializes and returns new index cache. @@ -66,5 +69,13 @@ func NewIndexCache(logger log.Logger, confContentYaml []byte, reg prometheus.Reg if err != nil { return nil, errors.Wrap(err, fmt.Sprintf("create %s index cache", cacheConfig.Type)) } + + if len(cacheConfig.EnabledItems) > 0 { + if err = ValidateEnabledItems(cacheConfig.EnabledItems); err != nil { + return nil, err + } + cache = NewFilteredIndexCache(cache, cacheConfig.EnabledItems) + } + return cache, nil } diff --git a/vendor/github.com/thanos-io/thanos/pkg/store/cache/filter_cache.go b/vendor/github.com/thanos-io/thanos/pkg/store/cache/filter_cache.go new file mode 100644 index 0000000000..193f7363a2 --- /dev/null +++ b/vendor/github.com/thanos-io/thanos/pkg/store/cache/filter_cache.go @@ -0,0 +1,88 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package storecache + +import ( + "context" + "fmt" + + "github.com/oklog/ulid" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/storage" + "golang.org/x/exp/slices" +) + +type FilteredIndexCache struct { + cache IndexCache + enabledItems []string +} + +// NewFilteredIndexCache creates a filtered index cache based on enabled items. +func NewFilteredIndexCache(cache IndexCache, enabledItems []string) *FilteredIndexCache { + return &FilteredIndexCache{ + cache: cache, + enabledItems: enabledItems, + } +} + +// StorePostings sets the postings identified by the ulid and label to the value v, +// if the postings already exists in the cache it is not mutated. +func (c *FilteredIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, tenant string) { + if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, cacheTypePostings) { + c.cache.StorePostings(blockID, l, v, tenant) + } +} + +// FetchMultiPostings fetches multiple postings - each identified by a label - +// and returns a map containing cache hits, along with a list of missing keys. +func (c *FilteredIndexCache) FetchMultiPostings(ctx context.Context, blockID ulid.ULID, keys []labels.Label, tenant string) (hits map[labels.Label][]byte, misses []labels.Label) { + if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, cacheTypePostings) { + return c.cache.FetchMultiPostings(ctx, blockID, keys, tenant) + } + return nil, keys +} + +// StoreExpandedPostings stores expanded postings for a set of label matchers. +func (c *FilteredIndexCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte, tenant string) { + if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, cacheTypeExpandedPostings) { + c.cache.StoreExpandedPostings(blockID, matchers, v, tenant) + } +} + +// FetchExpandedPostings fetches expanded postings and returns cached data and a boolean value representing whether it is a cache hit or not. +func (c *FilteredIndexCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers []*labels.Matcher, tenant string) ([]byte, bool) { + if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, cacheTypeExpandedPostings) { + return c.cache.FetchExpandedPostings(ctx, blockID, matchers, tenant) + } + return nil, false +} + +// StoreSeries sets the series identified by the ulid and id to the value v, +// if the series already exists in the cache it is not mutated. 
+func (c *FilteredIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte, tenant string) { + if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, cacheTypeSeries) { + c.cache.StoreSeries(blockID, id, v, tenant) + } +} + +// FetchMultiSeries fetches multiple series - each identified by ID - from the cache +// and returns a map containing cache hits, along with a list of missing IDs. +func (c *FilteredIndexCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULID, ids []storage.SeriesRef, tenant string) (hits map[storage.SeriesRef][]byte, misses []storage.SeriesRef) { + if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, cacheTypeSeries) { + return c.cache.FetchMultiSeries(ctx, blockID, ids, tenant) + } + return nil, ids +} + +func ValidateEnabledItems(enabledItems []string) error { + for _, item := range enabledItems { + switch item { + // valid + case cacheTypePostings, cacheTypeExpandedPostings, cacheTypeSeries: + default: + return fmt.Errorf("unsupported item type %s", item) + } + } + return nil +} diff --git a/vendor/github.com/thanos-io/thanos/pkg/store/cache/inmemory.go b/vendor/github.com/thanos-io/thanos/pkg/store/cache/inmemory.go index e0077acc35..3312c5faa0 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/store/cache/inmemory.go +++ b/vendor/github.com/thanos-io/thanos/pkg/store/cache/inmemory.go @@ -21,6 +21,7 @@ import ( "gopkg.in/yaml.v2" "github.com/thanos-io/thanos/pkg/model" + "github.com/thanos-io/thanos/pkg/tenancy" ) var ( @@ -115,9 +116,9 @@ func NewInMemoryIndexCacheWithConfig(logger log.Logger, commonMetrics *commonMet c.added.WithLabelValues(cacheTypeSeries) c.added.WithLabelValues(cacheTypeExpandedPostings) - c.commonMetrics.requestTotal.WithLabelValues(cacheTypePostings) - c.commonMetrics.requestTotal.WithLabelValues(cacheTypeSeries) - c.commonMetrics.requestTotal.WithLabelValues(cacheTypeExpandedPostings) + c.commonMetrics.requestTotal.WithLabelValues(cacheTypePostings, tenancy.DefaultTenant) + c.commonMetrics.requestTotal.WithLabelValues(cacheTypeSeries, tenancy.DefaultTenant) + c.commonMetrics.requestTotal.WithLabelValues(cacheTypeExpandedPostings, tenancy.DefaultTenant) c.overflow = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "thanos_store_index_cache_items_overflowed_total", @@ -127,9 +128,9 @@ func NewInMemoryIndexCacheWithConfig(logger log.Logger, commonMetrics *commonMet c.overflow.WithLabelValues(cacheTypeSeries) c.overflow.WithLabelValues(cacheTypeExpandedPostings) - c.commonMetrics.hitsTotal.WithLabelValues(cacheTypePostings) - c.commonMetrics.hitsTotal.WithLabelValues(cacheTypeSeries) - c.commonMetrics.hitsTotal.WithLabelValues(cacheTypeExpandedPostings) + c.commonMetrics.hitsTotal.WithLabelValues(cacheTypePostings, tenancy.DefaultTenant) + c.commonMetrics.hitsTotal.WithLabelValues(cacheTypeSeries, tenancy.DefaultTenant) + c.commonMetrics.hitsTotal.WithLabelValues(cacheTypeExpandedPostings, tenancy.DefaultTenant) c.current = promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ Name: "thanos_store_index_cache_items", @@ -197,8 +198,8 @@ func (c *InMemoryIndexCache) onEvict(key, val interface{}) { c.curSize -= entrySize } -func (c *InMemoryIndexCache) get(typ string, key cacheKey) ([]byte, bool) { - c.commonMetrics.requestTotal.WithLabelValues(typ).Inc() +func (c *InMemoryIndexCache) get(typ string, key cacheKey, tenant string) ([]byte, bool) { + c.commonMetrics.requestTotal.WithLabelValues(typ, tenant).Inc() c.mtx.Lock() defer c.mtx.Unlock() @@ -207,7 +208,7 @@ func (c 
*InMemoryIndexCache) get(typ string, key cacheKey) ([]byte, bool) { if !ok { return nil, false } - c.commonMetrics.hitsTotal.WithLabelValues(typ).Inc() + c.commonMetrics.hitsTotal.WithLabelValues(typ, tenant).Inc() return v.([]byte), true } @@ -294,22 +295,22 @@ func copyToKey(l labels.Label) cacheKeyPostings { // StorePostings sets the postings identified by the ulid and label to the value v, // if the postings already exists in the cache it is not mutated. -func (c *InMemoryIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte) { - c.commonMetrics.dataSizeBytes.WithLabelValues(cacheTypePostings).Observe(float64(len(v))) +func (c *InMemoryIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, tenant string) { + c.commonMetrics.dataSizeBytes.WithLabelValues(cacheTypePostings, tenant).Observe(float64(len(v))) c.set(cacheTypePostings, cacheKey{block: blockID.String(), key: copyToKey(l)}, v) } // FetchMultiPostings fetches multiple postings - each identified by a label - // and returns a map containing cache hits, along with a list of missing keys. -func (c *InMemoryIndexCache) FetchMultiPostings(_ context.Context, blockID ulid.ULID, keys []labels.Label) (hits map[labels.Label][]byte, misses []labels.Label) { - timer := prometheus.NewTimer(c.commonMetrics.fetchLatency.WithLabelValues(cacheTypePostings)) +func (c *InMemoryIndexCache) FetchMultiPostings(_ context.Context, blockID ulid.ULID, keys []labels.Label, tenant string) (hits map[labels.Label][]byte, misses []labels.Label) { + timer := prometheus.NewTimer(c.commonMetrics.fetchLatency.WithLabelValues(cacheTypePostings, tenant)) defer timer.ObserveDuration() hits = map[labels.Label][]byte{} blockIDKey := blockID.String() for _, key := range keys { - if b, ok := c.get(cacheTypePostings, cacheKey{blockIDKey, cacheKeyPostings(key), ""}); ok { + if b, ok := c.get(cacheTypePostings, cacheKey{blockIDKey, cacheKeyPostings(key), ""}, tenant); ok { hits[key] = b continue } @@ -321,17 +322,17 @@ func (c *InMemoryIndexCache) FetchMultiPostings(_ context.Context, blockID ulid. } // StoreExpandedPostings stores expanded postings for a set of label matchers. -func (c *InMemoryIndexCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte) { - c.commonMetrics.dataSizeBytes.WithLabelValues(cacheTypeExpandedPostings).Observe(float64(len(v))) +func (c *InMemoryIndexCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte, tenant string) { + c.commonMetrics.dataSizeBytes.WithLabelValues(cacheTypeExpandedPostings, tenant).Observe(float64(len(v))) c.set(cacheTypeExpandedPostings, cacheKey{block: blockID.String(), key: cacheKeyExpandedPostings(labelMatchersToString(matchers))}, v) } // FetchExpandedPostings fetches expanded postings and returns cached data and a boolean value representing whether it is a cache hit or not. 
-func (c *InMemoryIndexCache) FetchExpandedPostings(_ context.Context, blockID ulid.ULID, matchers []*labels.Matcher) ([]byte, bool) { - timer := prometheus.NewTimer(c.commonMetrics.fetchLatency.WithLabelValues(cacheTypeExpandedPostings)) +func (c *InMemoryIndexCache) FetchExpandedPostings(_ context.Context, blockID ulid.ULID, matchers []*labels.Matcher, tenant string) ([]byte, bool) { + timer := prometheus.NewTimer(c.commonMetrics.fetchLatency.WithLabelValues(cacheTypeExpandedPostings, tenant)) defer timer.ObserveDuration() - if b, ok := c.get(cacheTypeExpandedPostings, cacheKey{blockID.String(), cacheKeyExpandedPostings(labelMatchersToString(matchers)), ""}); ok { + if b, ok := c.get(cacheTypeExpandedPostings, cacheKey{blockID.String(), cacheKeyExpandedPostings(labelMatchersToString(matchers)), ""}, tenant); ok { return b, true } return nil, false @@ -339,22 +340,22 @@ func (c *InMemoryIndexCache) FetchExpandedPostings(_ context.Context, blockID ul // StoreSeries sets the series identified by the ulid and id to the value v, // if the series already exists in the cache it is not mutated. -func (c *InMemoryIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte) { - c.commonMetrics.dataSizeBytes.WithLabelValues(cacheTypeSeries).Observe(float64(len(v))) +func (c *InMemoryIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte, tenant string) { + c.commonMetrics.dataSizeBytes.WithLabelValues(cacheTypeSeries, tenant).Observe(float64(len(v))) c.set(cacheTypeSeries, cacheKey{blockID.String(), cacheKeySeries(id), ""}, v) } // FetchMultiSeries fetches multiple series - each identified by ID - from the cache // and returns a map containing cache hits, along with a list of missing IDs. -func (c *InMemoryIndexCache) FetchMultiSeries(_ context.Context, blockID ulid.ULID, ids []storage.SeriesRef) (hits map[storage.SeriesRef][]byte, misses []storage.SeriesRef) { - timer := prometheus.NewTimer(c.commonMetrics.fetchLatency.WithLabelValues(cacheTypeSeries)) +func (c *InMemoryIndexCache) FetchMultiSeries(_ context.Context, blockID ulid.ULID, ids []storage.SeriesRef, tenant string) (hits map[storage.SeriesRef][]byte, misses []storage.SeriesRef) { + timer := prometheus.NewTimer(c.commonMetrics.fetchLatency.WithLabelValues(cacheTypeSeries, tenant)) defer timer.ObserveDuration() hits = map[storage.SeriesRef][]byte{} blockIDKey := blockID.String() for _, id := range ids { - if b, ok := c.get(cacheTypeSeries, cacheKey{blockIDKey, cacheKeySeries(id), ""}); ok { + if b, ok := c.get(cacheTypeSeries, cacheKey{blockIDKey, cacheKeySeries(id), ""}, tenant); ok { hits[id] = b continue } diff --git a/vendor/github.com/thanos-io/thanos/pkg/store/cache/memcached.go b/vendor/github.com/thanos-io/thanos/pkg/store/cache/memcached.go index 104b936e8c..bc8bb5b52c 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/store/cache/memcached.go +++ b/vendor/github.com/thanos-io/thanos/pkg/store/cache/memcached.go @@ -15,6 +15,7 @@ import ( "github.com/prometheus/prometheus/storage" "github.com/thanos-io/thanos/pkg/cacheutil" + "github.com/thanos-io/thanos/pkg/tenancy" ) const ( @@ -33,18 +34,10 @@ type RemoteIndexCache struct { compressionScheme string // Metrics. 
- postingRequests prometheus.Counter - seriesRequests prometheus.Counter - expandedPostingRequests prometheus.Counter - postingHits prometheus.Counter - seriesHits prometheus.Counter - expandedPostingHits prometheus.Counter - postingDataSizeBytes prometheus.Observer - expandedPostingDataSizeBytes prometheus.Observer - seriesDataSizeBytes prometheus.Observer - postingsFetchDuration prometheus.Observer - expandedPostingsFetchDuration prometheus.Observer - seriesFetchDuration prometheus.Observer + requestTotal *prometheus.CounterVec + hitsTotal *prometheus.CounterVec + dataSizeBytes *prometheus.HistogramVec + fetchLatency *prometheus.HistogramVec } // NewRemoteIndexCache makes a new RemoteIndexCache. @@ -59,21 +52,23 @@ func NewRemoteIndexCache(logger log.Logger, cacheClient cacheutil.RemoteCacheCli commonMetrics = newCommonMetrics(reg) } - c.postingRequests = commonMetrics.requestTotal.WithLabelValues(cacheTypePostings) - c.seriesRequests = commonMetrics.requestTotal.WithLabelValues(cacheTypeSeries) - c.expandedPostingRequests = commonMetrics.requestTotal.WithLabelValues(cacheTypeExpandedPostings) + c.requestTotal = commonMetrics.requestTotal + c.hitsTotal = commonMetrics.hitsTotal + c.dataSizeBytes = commonMetrics.dataSizeBytes + c.fetchLatency = commonMetrics.fetchLatency - c.postingHits = commonMetrics.hitsTotal.WithLabelValues(cacheTypePostings) - c.seriesHits = commonMetrics.hitsTotal.WithLabelValues(cacheTypeSeries) - c.expandedPostingHits = commonMetrics.hitsTotal.WithLabelValues(cacheTypeExpandedPostings) + // Init requestTotal and hitsTotal with default tenant + c.requestTotal.WithLabelValues(cacheTypePostings, tenancy.DefaultTenant) + c.requestTotal.WithLabelValues(cacheTypeSeries, tenancy.DefaultTenant) + c.requestTotal.WithLabelValues(cacheTypeExpandedPostings, tenancy.DefaultTenant) - c.postingDataSizeBytes = commonMetrics.dataSizeBytes.WithLabelValues(cacheTypePostings) - c.seriesDataSizeBytes = commonMetrics.dataSizeBytes.WithLabelValues(cacheTypeSeries) - c.expandedPostingDataSizeBytes = commonMetrics.dataSizeBytes.WithLabelValues(cacheTypeExpandedPostings) + c.hitsTotal.WithLabelValues(cacheTypePostings, tenancy.DefaultTenant) + c.hitsTotal.WithLabelValues(cacheTypeSeries, tenancy.DefaultTenant) + c.hitsTotal.WithLabelValues(cacheTypeExpandedPostings, tenancy.DefaultTenant) - c.postingsFetchDuration = commonMetrics.fetchLatency.WithLabelValues(cacheTypePostings) - c.seriesFetchDuration = commonMetrics.fetchLatency.WithLabelValues(cacheTypeSeries) - c.expandedPostingsFetchDuration = commonMetrics.fetchLatency.WithLabelValues(cacheTypeExpandedPostings) + c.fetchLatency.WithLabelValues(cacheTypePostings, tenancy.DefaultTenant) + c.fetchLatency.WithLabelValues(cacheTypeSeries, tenancy.DefaultTenant) + c.fetchLatency.WithLabelValues(cacheTypeExpandedPostings, tenancy.DefaultTenant) level.Info(logger).Log("msg", "created index cache") @@ -83,8 +78,8 @@ func NewRemoteIndexCache(logger log.Logger, cacheClient cacheutil.RemoteCacheCli // StorePostings sets the postings identified by the ulid and label to the value v. // The function enqueues the request and returns immediately: the entry will be // asynchronously stored in the cache. 
-func (c *RemoteIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte) { - c.postingDataSizeBytes.Observe(float64(len(v))) +func (c *RemoteIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, tenant string) { + c.dataSizeBytes.WithLabelValues(cacheTypePostings, tenant).Observe(float64(len(v))) key := cacheKey{blockID.String(), cacheKeyPostings(l), c.compressionScheme}.string() if err := c.memcached.SetAsync(key, v, memcachedDefaultTTL); err != nil { level.Error(c.logger).Log("msg", "failed to cache postings in memcached", "err", err) @@ -94,8 +89,8 @@ func (c *RemoteIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v [] // FetchMultiPostings fetches multiple postings - each identified by a label - // and returns a map containing cache hits, along with a list of missing keys. // In case of error, it logs and return an empty cache hits map. -func (c *RemoteIndexCache) FetchMultiPostings(ctx context.Context, blockID ulid.ULID, lbls []labels.Label) (hits map[labels.Label][]byte, misses []labels.Label) { - timer := prometheus.NewTimer(c.postingsFetchDuration) +func (c *RemoteIndexCache) FetchMultiPostings(ctx context.Context, blockID ulid.ULID, lbls []labels.Label, tenant string) (hits map[labels.Label][]byte, misses []labels.Label) { + timer := prometheus.NewTimer(c.fetchLatency.WithLabelValues(cacheTypePostings, tenant)) defer timer.ObserveDuration() keys := make([]string, 0, len(lbls)) @@ -107,7 +102,8 @@ func (c *RemoteIndexCache) FetchMultiPostings(ctx context.Context, blockID ulid. } // Fetch the keys from memcached in a single request. - c.postingRequests.Add(float64(len(keys))) + c.requestTotal.WithLabelValues(cacheTypePostings, tenant).Add(float64(len(keys))) + results := c.memcached.GetMulti(ctx, keys) if len(results) == 0 { return nil, lbls @@ -127,16 +123,15 @@ func (c *RemoteIndexCache) FetchMultiPostings(ctx context.Context, blockID ulid. hits[lbl] = value } - - c.postingHits.Add(float64(len(hits))) + c.hitsTotal.WithLabelValues(cacheTypePostings, tenant).Add(float64(len(hits))) return hits, misses } // StoreExpandedPostings sets the postings identified by the ulid and label to the value v. // The function enqueues the request and returns immediately: the entry will be // asynchronously stored in the cache. -func (c *RemoteIndexCache) StoreExpandedPostings(blockID ulid.ULID, keys []*labels.Matcher, v []byte) { - c.expandedPostingDataSizeBytes.Observe(float64(len(v))) +func (c *RemoteIndexCache) StoreExpandedPostings(blockID ulid.ULID, keys []*labels.Matcher, v []byte, tenant string) { + c.dataSizeBytes.WithLabelValues(cacheTypeExpandedPostings, tenant).Observe(float64(len(v))) key := cacheKey{blockID.String(), cacheKeyExpandedPostings(labelMatchersToString(keys)), c.compressionScheme}.string() if err := c.memcached.SetAsync(key, v, memcachedDefaultTTL); err != nil { @@ -147,20 +142,20 @@ func (c *RemoteIndexCache) StoreExpandedPostings(blockID ulid.ULID, keys []*labe // FetchExpandedPostings fetches multiple postings - each identified by a label - // and returns a map containing cache hits, along with a list of missing keys. // In case of error, it logs and return an empty cache hits map. 
-func (c *RemoteIndexCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, lbls []*labels.Matcher) ([]byte, bool) { - timer := prometheus.NewTimer(c.expandedPostingsFetchDuration) +func (c *RemoteIndexCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, lbls []*labels.Matcher, tenant string) ([]byte, bool) { + timer := prometheus.NewTimer(c.fetchLatency.WithLabelValues(cacheTypeExpandedPostings, tenant)) defer timer.ObserveDuration() key := cacheKey{blockID.String(), cacheKeyExpandedPostings(labelMatchersToString(lbls)), c.compressionScheme}.string() // Fetch the keys from memcached in a single request. - c.expandedPostingRequests.Add(1) + c.requestTotal.WithLabelValues(cacheTypeExpandedPostings, tenant).Add(1) results := c.memcached.GetMulti(ctx, []string{key}) if len(results) == 0 { return nil, false } if res, ok := results[key]; ok { - c.expandedPostingHits.Add(1) + c.hitsTotal.WithLabelValues(cacheTypeExpandedPostings, tenant).Add(1) return res, true } return nil, false @@ -169,8 +164,8 @@ func (c *RemoteIndexCache) FetchExpandedPostings(ctx context.Context, blockID ul // StoreSeries sets the series identified by the ulid and id to the value v. // The function enqueues the request and returns immediately: the entry will be // asynchronously stored in the cache. -func (c *RemoteIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte) { - c.seriesDataSizeBytes.Observe(float64(len(v))) +func (c *RemoteIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte, tenant string) { + c.dataSizeBytes.WithLabelValues(cacheTypeSeries, tenant).Observe(float64(len(v))) key := cacheKey{blockID.String(), cacheKeySeries(id), ""}.string() if err := c.memcached.SetAsync(key, v, memcachedDefaultTTL); err != nil { @@ -181,8 +176,8 @@ func (c *RemoteIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, // FetchMultiSeries fetches multiple series - each identified by ID - from the cache // and returns a map containing cache hits, along with a list of missing IDs. // In case of error, it logs and return an empty cache hits map. -func (c *RemoteIndexCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULID, ids []storage.SeriesRef) (hits map[storage.SeriesRef][]byte, misses []storage.SeriesRef) { - timer := prometheus.NewTimer(c.seriesFetchDuration) +func (c *RemoteIndexCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULID, ids []storage.SeriesRef, tenant string) (hits map[storage.SeriesRef][]byte, misses []storage.SeriesRef) { + timer := prometheus.NewTimer(c.fetchLatency.WithLabelValues(cacheTypeSeries, tenant)) defer timer.ObserveDuration() keys := make([]string, 0, len(ids)) @@ -194,7 +189,7 @@ func (c *RemoteIndexCache) FetchMultiSeries(ctx context.Context, blockID ulid.UL } // Fetch the keys from memcached in a single request. 
- c.seriesRequests.Add(float64(len(ids))) + c.requestTotal.WithLabelValues(cacheTypeSeries, tenant).Add(float64(len(ids))) results := c.memcached.GetMulti(ctx, keys) if len(results) == 0 { return nil, ids @@ -214,8 +209,7 @@ func (c *RemoteIndexCache) FetchMultiSeries(ctx context.Context, blockID ulid.UL hits[id] = value } - - c.seriesHits.Add(float64(len(hits))) + c.hitsTotal.WithLabelValues(cacheTypeSeries, tenant).Add(float64(len(hits))) return hits, misses } diff --git a/vendor/github.com/thanos-io/thanos/pkg/store/lazy_postings.go b/vendor/github.com/thanos-io/thanos/pkg/store/lazy_postings.go index 2e02836c0c..4fb4a155f9 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/store/lazy_postings.go +++ b/vendor/github.com/thanos-io/thanos/pkg/store/lazy_postings.go @@ -148,6 +148,7 @@ func fetchLazyExpandedPostings( addAllPostings bool, lazyExpandedPostingEnabled bool, lazyExpandedPostingSizeBytes prometheus.Counter, + tenant string, ) (*lazyExpandedPostings, error) { var ( err error @@ -178,7 +179,7 @@ func fetchLazyExpandedPostings( } } - ps, matchers, err := fetchAndExpandPostingGroups(ctx, r, postingGroups, bytesLimiter) + ps, matchers, err := fetchAndExpandPostingGroups(ctx, r, postingGroups, bytesLimiter, tenant) if err != nil { return nil, err } @@ -220,9 +221,9 @@ func keysToFetchFromPostingGroups(postingGroups []*postingGroup) ([]labels.Label return keys, lazyMatchers } -func fetchAndExpandPostingGroups(ctx context.Context, r *bucketIndexReader, postingGroups []*postingGroup, bytesLimiter BytesLimiter) ([]storage.SeriesRef, []*labels.Matcher, error) { +func fetchAndExpandPostingGroups(ctx context.Context, r *bucketIndexReader, postingGroups []*postingGroup, bytesLimiter BytesLimiter, tenant string) ([]storage.SeriesRef, []*labels.Matcher, error) { keys, lazyMatchers := keysToFetchFromPostingGroups(postingGroups) - fetchedPostings, closeFns, err := r.fetchPostings(ctx, keys, bytesLimiter) + fetchedPostings, closeFns, err := r.fetchPostings(ctx, keys, bytesLimiter, tenant) defer func() { for _, closeFn := range closeFns { closeFn() diff --git a/vendor/github.com/thanos-io/thanos/pkg/tenancy/tenancy.go b/vendor/github.com/thanos-io/thanos/pkg/tenancy/tenancy.go index 13775cf6e1..4a874855fc 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/tenancy/tenancy.go +++ b/vendor/github.com/thanos-io/thanos/pkg/tenancy/tenancy.go @@ -24,6 +24,8 @@ const ( DefaultTenantLabel = "tenant_id" // This key is used to pass tenant information using Context. TenantKey contextKey = 0 + // MetricLabel is the label name used for adding tenant information to exported metrics. + MetricLabel = "tenant" ) // Allowed fields in client certificates. diff --git a/vendor/modules.txt b/vendor/modules.txt index b74cc59f1f..7f22de0558 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -902,7 +902,7 @@ github.com/thanos-io/promql-engine/logicalplan github.com/thanos-io/promql-engine/parser github.com/thanos-io/promql-engine/query github.com/thanos-io/promql-engine/worker -# github.com/thanos-io/thanos v0.32.4-0.20231001083734-531cdb1e8ec3 +# github.com/thanos-io/thanos v0.32.5-0.20231006043659-79bbf34b4275 ## explicit; go 1.18 github.com/thanos-io/thanos/pkg/block github.com/thanos-io/thanos/pkg/block/indexheader