From 46ead63cde54298b67c7daa9c78ceefc213499f9 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Tue, 24 Oct 2023 22:32:00 -0700 Subject: [PATCH] replace inmemory index cache to fastcache based implementation Signed-off-by: Ben Ye --- go.mod | 3 +- go.sum | 8 +- integration/querier_test.go | 19 +- pkg/storage/tsdb/index_cache.go | 2 +- pkg/storage/tsdb/inmemory_index_cache.go | 236 ++++++++++ pkg/storage/tsdb/inmemory_index_cache_test.go | 141 ++++++ pkg/storage/tsdb/multilevel_cache_test.go | 2 +- .../VictoriaMetrics/fastcache/LICENSE | 22 + .../VictoriaMetrics/fastcache/README.md | 116 +++++ .../VictoriaMetrics/fastcache/bigcache.go | 160 +++++++ .../VictoriaMetrics/fastcache/fastcache.go | 419 +++++++++++++++++ .../VictoriaMetrics/fastcache/file.go | 421 ++++++++++++++++++ .../VictoriaMetrics/fastcache/malloc_heap.go | 12 + .../VictoriaMetrics/fastcache/malloc_mmap.go | 54 +++ .../thanos-io/thanos/pkg/store/cache/cache.go | 105 ++--- .../thanos/pkg/store/cache/factory.go | 2 +- .../thanos/pkg/store/cache/filter_cache.go | 14 +- .../thanos/pkg/store/cache/inmemory.go | 114 ++--- .../thanos/pkg/store/cache/memcached.go | 66 +-- .../thanos-io/thanos/pkg/tenancy/tenancy.go | 27 +- vendor/modules.txt | 5 +- 21 files changed, 1777 insertions(+), 171 deletions(-) create mode 100644 pkg/storage/tsdb/inmemory_index_cache.go create mode 100644 pkg/storage/tsdb/inmemory_index_cache_test.go create mode 100644 vendor/github.com/VictoriaMetrics/fastcache/LICENSE create mode 100644 vendor/github.com/VictoriaMetrics/fastcache/README.md create mode 100644 vendor/github.com/VictoriaMetrics/fastcache/bigcache.go create mode 100644 vendor/github.com/VictoriaMetrics/fastcache/fastcache.go create mode 100644 vendor/github.com/VictoriaMetrics/fastcache/file.go create mode 100644 vendor/github.com/VictoriaMetrics/fastcache/malloc_heap.go create mode 100644 vendor/github.com/VictoriaMetrics/fastcache/malloc_mmap.go diff --git a/go.mod b/go.mod index 1186e5326c2..dc264812d06 100644 --- a/go.mod +++ b/go.mod @@ -53,7 +53,7 @@ require ( github.com/stretchr/testify v1.8.4 github.com/thanos-io/objstore v0.0.0-20230921130928-63a603e651ed github.com/thanos-io/promql-engine v0.0.0-20231013104847-4517c0d5f591 - github.com/thanos-io/thanos v0.32.5-0.20231023172853-513272e70874 + github.com/thanos-io/thanos v0.32.5-0.20231025032614-da62b6bf84b8 github.com/uber/jaeger-client-go v2.30.0+incompatible github.com/weaveworks/common v0.0.0-20221201103051-7c2720a9024d go.etcd.io/etcd/api/v3 v3.5.9 @@ -77,6 +77,7 @@ require ( ) require ( + github.com/VictoriaMetrics/fastcache v1.12.1 github.com/cespare/xxhash/v2 v2.2.0 github.com/google/go-cmp v0.5.9 google.golang.org/protobuf v1.31.0 diff --git a/go.sum b/go.sum index 4266ccf7399..bb8bceec40e 100644 --- a/go.sum +++ b/go.sum @@ -437,6 +437,8 @@ github.com/Microsoft/go-winio v0.6.1/go.mod h1:LRdKpFKfdobln8UmuiYcKPot9D2v6svN5 github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/OneOfOne/xxhash v1.2.6 h1:U68crOE3y3MPttCMQGywZOLrTeF5HHJ3/vDBCJn9/bA= github.com/OneOfOne/xxhash v1.2.6/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q= +github.com/VictoriaMetrics/fastcache v1.12.1 h1:i0mICQuojGDL3KblA7wUNlY5lOK6a4bwt3uRKnkZU40= +github.com/VictoriaMetrics/fastcache v1.12.1/go.mod h1:tX04vaqcNoQeGLD+ra5pU5sWkuxnzWhEzLwhP9w653o= github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw= github.com/alecthomas/kingpin v1.3.8-0.20210301060133-17f40c25f497 
h1:aDITxVUQ/3KBhpVWX57Vo9ntGTxoRw1F0T6/x/tRzNU= github.com/alecthomas/kingpin v1.3.8-0.20210301060133-17f40c25f497/go.mod h1:b6br6/pDFSfMkBgC96TbpOji05q5pa+v5rIlS0Y6XtI= @@ -455,6 +457,8 @@ github.com/alicebob/miniredis/v2 v2.30.4 h1:8S4/o1/KoUArAGbGwPxcwf0krlzceva2XVOS github.com/alicebob/miniredis/v2 v2.30.4/go.mod h1:b25qWj4fCEsBeAAR2mlb0ufImGC6uH3VlUfb/HS5zKg= github.com/aliyun/aliyun-oss-go-sdk v2.2.2+incompatible h1:9gWa46nstkJ9miBReJcN8Gq34cBFbzSpQZVVT9N09TM= github.com/aliyun/aliyun-oss-go-sdk v2.2.2+incompatible/go.mod h1:T/Aws4fEfogEE9v+HPhhw+CntffsBHJ8nXQCwKr0/g8= +github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 h1:eMwmnE/GDgah4HI848JfFxHt+iPb26b4zyfspmqY0/8= +github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY= @@ -1277,8 +1281,8 @@ github.com/thanos-io/objstore v0.0.0-20230921130928-63a603e651ed h1:iWQdY3S6DpWj github.com/thanos-io/objstore v0.0.0-20230921130928-63a603e651ed/go.mod h1:oJ82xgcBDzGJrEgUsjlTj6n01+ZWUMMUR8BlZzX5xDE= github.com/thanos-io/promql-engine v0.0.0-20231013104847-4517c0d5f591 h1:6bZbFM+Mvy2kL8BeL8TJ5+5pV3sUR2PSLaZyw911rtQ= github.com/thanos-io/promql-engine v0.0.0-20231013104847-4517c0d5f591/go.mod h1:vfXJv1JXNdLfHnjsHsLLJl5tyI7KblF76Wo5lZ9YC4Q= -github.com/thanos-io/thanos v0.32.5-0.20231023172853-513272e70874 h1:7/j60inmFvV9uFvbxDdghob5DR92M7mvmn/Tw+CKK3o= -github.com/thanos-io/thanos v0.32.5-0.20231023172853-513272e70874/go.mod h1:eVFfte7jP1aTcTkQcZEj5/P9rCeMFHllEqfNZqirLLA= +github.com/thanos-io/thanos v0.32.5-0.20231025032614-da62b6bf84b8 h1:mWlY64XMYTFeCk4WziW33xerKsp+BWOck6g77cz9ZgA= +github.com/thanos-io/thanos v0.32.5-0.20231025032614-da62b6bf84b8/go.mod h1:eVFfte7jP1aTcTkQcZEj5/P9rCeMFHllEqfNZqirLLA= github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab h1:7ZR3hmisBWw77ZpO1/o86g+JV3VKlk3d48jopJxzTjU= github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab/go.mod h1:eheTFp954zcWZXCU8d0AT76ftsQOTo4DTqkN/h3k1MY= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= diff --git a/integration/querier_test.go b/integration/querier_test.go index e7bf94356a1..8d650517133 100644 --- a/integration/querier_test.go +++ b/integration/querier_test.go @@ -257,10 +257,7 @@ func TestQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T) { require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(float64((5+5+2)*numberOfCacheBackends)), "thanos_store_index_cache_requests_total")) require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(0), "thanos_store_index_cache_hits_total")) // no cache hit cause the cache was empty - if testCfg.indexCacheBackend == tsdb.IndexCacheBackendInMemory { - require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(9), "thanos_store_index_cache_items")) - require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(9), "thanos_store_index_cache_items_added_total")) - } else if testCfg.indexCacheBackend == tsdb.IndexCacheBackendMemcached { + if testCfg.indexCacheBackend == tsdb.IndexCacheBackendMemcached { require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(21), "thanos_memcached_operations_total")) // 14 gets + 7 sets } @@ -297,10 +294,6 @@ func 
TestQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T) { } require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(2), "thanos_store_index_cache_hits_total")) // this time has used the index cache - if testCfg.indexCacheBackend == tsdb.IndexCacheBackendInMemory { - require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(9), "thanos_store_index_cache_items")) // as before - require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(9), "thanos_store_index_cache_items_added_total")) // as before - } if testCfg.indexCacheBackend == tsdb.IndexCacheBackendMemcached { require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(23-l0CacheHits), "thanos_memcached_operations_total")) // as before + 2 gets - cache hits } @@ -516,10 +509,7 @@ func TestQuerierWithBlocksStorageRunningInSingleBinaryMode(t *testing.T) { require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64((5+5+2)*seriesReplicationFactor)), "thanos_store_index_cache_requests_total")) // 5 for expanded postings and postings, 2 for series require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(0), "thanos_store_index_cache_hits_total")) // no cache hit cause the cache was empty - if testCfg.indexCacheBackend == tsdb.IndexCacheBackendInMemory { - require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(9*seriesReplicationFactor)), "thanos_store_index_cache_items")) - require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(9*seriesReplicationFactor)), "thanos_store_index_cache_items_added_total")) - } else if testCfg.indexCacheBackend == tsdb.IndexCacheBackendMemcached { + if testCfg.indexCacheBackend == tsdb.IndexCacheBackendMemcached { require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(21*seriesReplicationFactor)), "thanos_memcached_operations_total")) // 14 gets + 7 sets } @@ -532,10 +522,7 @@ func TestQuerierWithBlocksStorageRunningInSingleBinaryMode(t *testing.T) { require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64((12+2)*seriesReplicationFactor)), "thanos_store_index_cache_requests_total")) require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(2*seriesReplicationFactor)), "thanos_store_index_cache_hits_total")) // this time has used the index cache - if testCfg.indexCacheBackend == tsdb.IndexCacheBackendInMemory { - require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(9*seriesReplicationFactor)), "thanos_store_index_cache_items")) // as before - require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(9*seriesReplicationFactor)), "thanos_store_index_cache_items_added_total")) // as before - } else if testCfg.indexCacheBackend == tsdb.IndexCacheBackendMemcached { + if testCfg.indexCacheBackend == tsdb.IndexCacheBackendMemcached { require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64((21+2)*seriesReplicationFactor)), "thanos_memcached_operations_total")) // as before + 2 gets } diff --git a/pkg/storage/tsdb/index_cache.go b/pkg/storage/tsdb/index_cache.go index 195aa9a24cb..6668d817871 100644 --- a/pkg/storage/tsdb/index_cache.go +++ b/pkg/storage/tsdb/index_cache.go @@ -222,7 +222,7 @@ func newInMemoryIndexCache(cfg InMemoryIndexCacheConfig, logger log.Logger, regi maxItemSize = maxCacheSize } - return storecache.NewInMemoryIndexCacheWithConfig(logger, nil, registerer, storecache.InMemoryIndexCacheConfig{ + return NewInMemoryIndexCacheWithConfig(logger, nil, registerer, storecache.InMemoryIndexCacheConfig{ MaxSize: maxCacheSize, MaxItemSize: maxItemSize, }) diff --git a/pkg/storage/tsdb/inmemory_index_cache.go b/pkg/storage/tsdb/inmemory_index_cache.go new 
file mode 100644 index 00000000000..8afb06464cd --- /dev/null +++ b/pkg/storage/tsdb/inmemory_index_cache.go @@ -0,0 +1,236 @@ +package tsdb + +import ( + "context" + "reflect" + "unsafe" + + "github.com/VictoriaMetrics/fastcache" + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/oklog/ulid" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/storage" + + storecache "github.com/thanos-io/thanos/pkg/store/cache" + "github.com/thanos-io/thanos/pkg/tenancy" +) + +type InMemoryIndexCache struct { + logger log.Logger + cache *fastcache.Cache + maxItemSizeBytes uint64 + + added *prometheus.CounterVec + overflow *prometheus.CounterVec + + commonMetrics *storecache.CommonMetrics +} + +// NewInMemoryIndexCacheWithConfig creates a new thread-safe cache for index entries. It relies on the cache library +// (fastcache) to ensures the total cache size approximately does not exceed maxBytes. +func NewInMemoryIndexCacheWithConfig(logger log.Logger, commonMetrics *storecache.CommonMetrics, reg prometheus.Registerer, config storecache.InMemoryIndexCacheConfig) (*InMemoryIndexCache, error) { + if config.MaxItemSize > config.MaxSize { + return nil, errors.Errorf("max item size (%v) cannot be bigger than overall cache size (%v)", config.MaxItemSize, config.MaxSize) + } + + // fastcache will panic if MaxSize <= 0. + if config.MaxSize <= 0 { + config.MaxSize = storecache.DefaultInMemoryIndexCacheConfig.MaxSize + } + + if commonMetrics == nil { + commonMetrics = storecache.NewCommonMetrics(reg) + } + + c := &InMemoryIndexCache{ + logger: logger, + maxItemSizeBytes: uint64(config.MaxItemSize), + commonMetrics: commonMetrics, + } + + c.added = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "thanos_store_index_cache_items_added_total", + Help: "Total number of items that were added to the index cache.", + }, []string{"item_type"}) + c.added.WithLabelValues(cacheTypePostings) + c.added.WithLabelValues(cacheTypeSeries) + c.added.WithLabelValues(cacheTypeExpandedPostings) + + c.commonMetrics.RequestTotal.WithLabelValues(cacheTypePostings, tenancy.DefaultTenant) + c.commonMetrics.RequestTotal.WithLabelValues(cacheTypeSeries, tenancy.DefaultTenant) + c.commonMetrics.RequestTotal.WithLabelValues(cacheTypeExpandedPostings, tenancy.DefaultTenant) + + c.overflow = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "thanos_store_index_cache_items_overflowed_total", + Help: "Total number of items that could not be added to the cache due to being too big.", + }, []string{"item_type"}) + c.overflow.WithLabelValues(cacheTypePostings) + c.overflow.WithLabelValues(cacheTypeSeries) + c.overflow.WithLabelValues(cacheTypeExpandedPostings) + + c.commonMetrics.HitsTotal.WithLabelValues(cacheTypePostings, tenancy.DefaultTenant) + c.commonMetrics.HitsTotal.WithLabelValues(cacheTypeSeries, tenancy.DefaultTenant) + c.commonMetrics.HitsTotal.WithLabelValues(cacheTypeExpandedPostings, tenancy.DefaultTenant) + + c.cache = fastcache.New(int(config.MaxSize)) + level.Info(logger).Log( + "msg", "created in-memory index cache", + "maxItemSizeBytes", c.maxItemSizeBytes, + "maxSizeBytes", config.MaxSize, + ) + return c, nil +} + +func (c *InMemoryIndexCache) get(key storecache.CacheKey) ([]byte, bool) { + k := yoloBuf(key.String()) + resp := c.cache.GetBig(nil, k) + if len(resp) == 0 { + return nil, false + } + return resp, true 
+} + +func (c *InMemoryIndexCache) set(typ string, key storecache.CacheKey, val []byte) { + k := yoloBuf(key.String()) + r := c.cache.GetBig(nil, k) + // item exists, no need to set it again. + if r != nil { + return + } + + size := uint64(len(k) + len(val)) + if size > c.maxItemSizeBytes { + level.Info(c.logger).Log( + "msg", "item bigger than maxItemSizeBytes. Ignoring..", + "maxItemSizeBytes", c.maxItemSizeBytes, + "cacheType", typ, + ) + c.overflow.WithLabelValues(typ).Inc() + return + } + + c.cache.SetBig(k, val) + c.added.WithLabelValues(typ).Inc() +} + +func yoloBuf(s string) []byte { + return *(*[]byte)(unsafe.Pointer(&s)) +} + +func copyString(s string) string { + var b []byte + h := (*reflect.SliceHeader)(unsafe.Pointer(&b)) + h.Data = (*reflect.StringHeader)(unsafe.Pointer(&s)).Data + h.Len = len(s) + h.Cap = len(s) + return string(b) +} + +// copyToKey is required as underlying strings might be mmaped. +func copyToKey(l labels.Label) storecache.CacheKeyPostings { + return storecache.CacheKeyPostings(labels.Label{Value: copyString(l.Value), Name: copyString(l.Name)}) +} + +// StorePostings sets the postings identified by the ulid and label to the value v, +// if the postings already exists in the cache it is not mutated. +func (c *InMemoryIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, tenant string) { + c.commonMetrics.DataSizeBytes.WithLabelValues(cacheTypePostings, tenant).Observe(float64(len(v))) + c.set(cacheTypePostings, storecache.CacheKey{Block: blockID.String(), Key: copyToKey(l)}, v) +} + +// FetchMultiPostings fetches multiple postings - each identified by a label - +// and returns a map containing cache hits, along with a list of missing keys. +func (c *InMemoryIndexCache) FetchMultiPostings(ctx context.Context, blockID ulid.ULID, keys []labels.Label, tenant string) (hits map[labels.Label][]byte, misses []labels.Label) { + timer := prometheus.NewTimer(c.commonMetrics.FetchLatency.WithLabelValues(cacheTypePostings, tenant)) + defer timer.ObserveDuration() + + hits = map[labels.Label][]byte{} + + blockIDKey := blockID.String() + requests := 0 + hit := 0 + for _, key := range keys { + if ctx.Err() != nil { + c.commonMetrics.RequestTotal.WithLabelValues(cacheTypePostings, tenant).Add(float64(requests)) + c.commonMetrics.HitsTotal.WithLabelValues(cacheTypePostings, tenant).Add(float64(hit)) + return hits, misses + } + requests++ + if b, ok := c.get(storecache.CacheKey{Block: blockIDKey, Key: storecache.CacheKeyPostings(key)}); ok { + hit++ + hits[key] = b + continue + } + + misses = append(misses, key) + } + c.commonMetrics.RequestTotal.WithLabelValues(cacheTypePostings, tenant).Add(float64(requests)) + c.commonMetrics.HitsTotal.WithLabelValues(cacheTypePostings, tenant).Add(float64(hit)) + + return hits, misses +} + +// StoreExpandedPostings stores expanded postings for a set of label matchers. +func (c *InMemoryIndexCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte, tenant string) { + c.commonMetrics.DataSizeBytes.WithLabelValues(cacheTypeExpandedPostings, tenant).Observe(float64(len(v))) + c.set(cacheTypeExpandedPostings, storecache.CacheKey{Block: blockID.String(), Key: storecache.CacheKeyExpandedPostings(storecache.LabelMatchersToString(matchers))}, v) +} + +// FetchExpandedPostings fetches expanded postings and returns cached data and a boolean value representing whether it is a cache hit or not. 
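+// The lookup key combines the block ULID with the stringified matchers, so an identical matcher set always maps to the same cache entry.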
+func (c *InMemoryIndexCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers []*labels.Matcher, tenant string) ([]byte, bool) { + timer := prometheus.NewTimer(c.commonMetrics.FetchLatency.WithLabelValues(cacheTypeExpandedPostings, tenant)) + defer timer.ObserveDuration() + + if ctx.Err() != nil { + return nil, false + } + c.commonMetrics.RequestTotal.WithLabelValues(cacheTypeExpandedPostings, tenant).Inc() + if b, ok := c.get(storecache.CacheKey{Block: blockID.String(), Key: storecache.CacheKeyExpandedPostings(storecache.LabelMatchersToString(matchers))}); ok { + c.commonMetrics.HitsTotal.WithLabelValues(cacheTypeExpandedPostings, tenant).Inc() + return b, true + } + return nil, false +} + +// StoreSeries sets the series identified by the ulid and id to the value v, +// if the series already exists in the cache it is not mutated. +func (c *InMemoryIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte, tenant string) { + c.commonMetrics.DataSizeBytes.WithLabelValues(cacheTypeSeries, tenant).Observe(float64(len(v))) + c.set(cacheTypeSeries, storecache.CacheKey{Block: blockID.String(), Key: storecache.CacheKeySeries(id)}, v) +} + +// FetchMultiSeries fetches multiple series - each identified by ID - from the cache +// and returns a map containing cache hits, along with a list of missing IDs. +func (c *InMemoryIndexCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULID, ids []storage.SeriesRef, tenant string) (hits map[storage.SeriesRef][]byte, misses []storage.SeriesRef) { + timer := prometheus.NewTimer(c.commonMetrics.FetchLatency.WithLabelValues(cacheTypeSeries, tenant)) + defer timer.ObserveDuration() + + hits = map[storage.SeriesRef][]byte{} + + blockIDKey := blockID.String() + requests := 0 + hit := 0 + for _, id := range ids { + if ctx.Err() != nil { + c.commonMetrics.RequestTotal.WithLabelValues(cacheTypeSeries, tenant).Add(float64(requests)) + c.commonMetrics.HitsTotal.WithLabelValues(cacheTypeSeries, tenant).Add(float64(hit)) + return hits, misses + } + requests++ + if b, ok := c.get(storecache.CacheKey{Block: blockIDKey, Key: storecache.CacheKeySeries(id)}); ok { + hit++ + hits[id] = b + continue + } + + misses = append(misses, id) + } + c.commonMetrics.RequestTotal.WithLabelValues(cacheTypeSeries, tenant).Add(float64(requests)) + c.commonMetrics.HitsTotal.WithLabelValues(cacheTypeSeries, tenant).Add(float64(hit)) + + return hits, misses +} diff --git a/pkg/storage/tsdb/inmemory_index_cache_test.go b/pkg/storage/tsdb/inmemory_index_cache_test.go new file mode 100644 index 00000000000..f01896d1ea2 --- /dev/null +++ b/pkg/storage/tsdb/inmemory_index_cache_test.go @@ -0,0 +1,141 @@ +package tsdb + +import ( + "bytes" + "context" + "fmt" + "strconv" + "strings" + "testing" + + "github.com/efficientgo/core/testutil" + "github.com/go-kit/log" + "github.com/oklog/ulid" + "github.com/prometheus/client_golang/prometheus" + prom_testutil "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/storage" + storecache "github.com/thanos-io/thanos/pkg/store/cache" + "github.com/thanos-io/thanos/pkg/tenancy" +) + +func TestInMemoryIndexCache_UpdateItem(t *testing.T) { + var errorLogs []string + errorLogger := log.LoggerFunc(func(kvs ...interface{}) error { + var lvl string + for i := 0; i < len(kvs); i += 2 { + if kvs[i] == "level" { + lvl = fmt.Sprint(kvs[i+1]) + break + } + } + if lvl != "error" { + return nil + } + var buf bytes.Buffer + defer func() { 
errorLogs = append(errorLogs, buf.String()) }() + return log.NewLogfmtLogger(&buf).Log(kvs...) + }) + + metrics := prometheus.NewRegistry() + cache, err := NewInMemoryIndexCacheWithConfig(log.NewSyncLogger(errorLogger), nil, metrics, storecache.InMemoryIndexCacheConfig{ + MaxItemSize: 1024, + MaxSize: 1024, + }) + testutil.Ok(t, err) + + uid := func(id storage.SeriesRef) ulid.ULID { return ulid.MustNew(uint64(id), nil) } + lbl := labels.Label{Name: "foo", Value: "bar"} + matcher := labels.MustNewMatcher(labels.MatchEqual, "foo", "bar") + ctx := context.Background() + + for _, tt := range []struct { + typ string + set func(storage.SeriesRef, []byte) + get func(storage.SeriesRef) ([]byte, bool) + }{ + { + typ: cacheTypePostings, + set: func(id storage.SeriesRef, b []byte) { cache.StorePostings(uid(id), lbl, b, tenancy.DefaultTenant) }, + get: func(id storage.SeriesRef) ([]byte, bool) { + hits, _ := cache.FetchMultiPostings(ctx, uid(id), []labels.Label{lbl}, tenancy.DefaultTenant) + b, ok := hits[lbl] + + return b, ok + }, + }, + { + typ: cacheTypeSeries, + set: func(id storage.SeriesRef, b []byte) { cache.StoreSeries(uid(id), id, b, tenancy.DefaultTenant) }, + get: func(id storage.SeriesRef) ([]byte, bool) { + hits, _ := cache.FetchMultiSeries(ctx, uid(id), []storage.SeriesRef{id}, tenancy.DefaultTenant) + b, ok := hits[id] + + return b, ok + }, + }, + { + typ: cacheTypeExpandedPostings, + set: func(id storage.SeriesRef, b []byte) { + cache.StoreExpandedPostings(uid(id), []*labels.Matcher{matcher}, b, tenancy.DefaultTenant) + }, + get: func(id storage.SeriesRef) ([]byte, bool) { + return cache.FetchExpandedPostings(ctx, uid(id), []*labels.Matcher{matcher}, tenancy.DefaultTenant) + }, + }, + } { + t.Run(tt.typ, func(t *testing.T) { + defer func() { errorLogs = nil }() + + // Set value. + tt.set(0, []byte{0}) + buf, ok := tt.get(0) + testutil.Equals(t, true, ok) + testutil.Equals(t, []byte{0}, buf) + testutil.Equals(t, []string(nil), errorLogs) + + // Set the same value again. + tt.set(0, []byte{0}) + buf, ok = tt.get(0) + testutil.Equals(t, true, ok) + testutil.Equals(t, []byte{0}, buf) + testutil.Equals(t, []string(nil), errorLogs) + + // Set a larger value. + tt.set(1, []byte{0, 1}) + buf, ok = tt.get(1) + testutil.Equals(t, true, ok) + testutil.Equals(t, []byte{0, 1}, buf) + testutil.Equals(t, []string(nil), errorLogs) + + // Mutations to existing values will be ignored. + tt.set(1, []byte{1, 2}) + buf, ok = tt.get(1) + testutil.Equals(t, true, ok) + testutil.Equals(t, []byte{0, 1}, buf) + testutil.Equals(t, []string(nil), errorLogs) + }) + } +} + +func TestInMemoryIndexCacheSetOverflow(t *testing.T) { + config := storecache.InMemoryIndexCacheConfig{ + MaxSize: storecache.DefaultInMemoryIndexCacheConfig.MaxSize, + MaxItemSize: 100, + } + cache, err := NewInMemoryIndexCacheWithConfig(log.NewNopLogger(), nil, nil, config) + testutil.Ok(t, err) + counter := cache.overflow.WithLabelValues(cacheTypeSeries) + id := ulid.MustNew(ulid.Now(), nil) + // Insert a small value won't trigger item overflow. + cache.StoreSeries(id, 1, []byte("0"), tenancy.DefaultTenant) + testutil.Equals(t, float64(0), prom_testutil.ToFloat64(counter)) + + var sb strings.Builder + for i := 0; i < 100; i++ { + sb.WriteString(strconv.Itoa(i)) + } + // Trigger overflow with a large value. 
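+	// The 190 bytes of concatenated digits plus the key exceed MaxItemSize (100 bytes), so the entry is rejected and counted as an overflow.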
+ cache.StoreSeries(id, 2, []byte(sb.String()), tenancy.DefaultTenant) + testutil.Equals(t, float64(1), prom_testutil.ToFloat64(counter)) +} diff --git a/pkg/storage/tsdb/multilevel_cache_test.go b/pkg/storage/tsdb/multilevel_cache_test.go index 4434fb493c8..c37e05391cd 100644 --- a/pkg/storage/tsdb/multilevel_cache_test.go +++ b/pkg/storage/tsdb/multilevel_cache_test.go @@ -32,7 +32,7 @@ func Test_MultiIndexCacheInstantiation(t *testing.T) { cfg: IndexCacheConfig{ Backend: "inmemory", }, - expectedType: &storecache.InMemoryIndexCache{}, + expectedType: &InMemoryIndexCache{}, }, "instantiate multiples backends - inmemory/redis": { cfg: IndexCacheConfig{ diff --git a/vendor/github.com/VictoriaMetrics/fastcache/LICENSE b/vendor/github.com/VictoriaMetrics/fastcache/LICENSE new file mode 100644 index 00000000000..9a8145e5834 --- /dev/null +++ b/vendor/github.com/VictoriaMetrics/fastcache/LICENSE @@ -0,0 +1,22 @@ +The MIT License (MIT) + +Copyright (c) 2018 VictoriaMetrics + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. + diff --git a/vendor/github.com/VictoriaMetrics/fastcache/README.md b/vendor/github.com/VictoriaMetrics/fastcache/README.md new file mode 100644 index 00000000000..b353214af69 --- /dev/null +++ b/vendor/github.com/VictoriaMetrics/fastcache/README.md @@ -0,0 +1,116 @@ +[![Build Status](https://github.com/VictoriaMetrics/fastcache/workflows/main/badge.svg)](https://github.com/VictoriaMetrics/fastcache/actions) +[![GoDoc](https://godoc.org/github.com/VictoriaMetrics/fastcache?status.svg)](http://godoc.org/github.com/VictoriaMetrics/fastcache) +[![Go Report](https://goreportcard.com/badge/github.com/VictoriaMetrics/fastcache)](https://goreportcard.com/report/github.com/VictoriaMetrics/fastcache) +[![codecov](https://codecov.io/gh/VictoriaMetrics/fastcache/branch/master/graph/badge.svg)](https://codecov.io/gh/VictoriaMetrics/fastcache) + +# fastcache - fast thread-safe inmemory cache for big number of entries in Go + +### Features + +* Fast. Performance scales on multi-core CPUs. See benchmark results below. +* Thread-safe. Concurrent goroutines may read and write into a single + cache instance. +* The fastcache is designed for storing big number of entries without + [GC overhead](https://syslog.ravelin.com/further-dangers-of-large-heaps-in-go-7a267b57d487). +* Fastcache automatically evicts old entries when reaching the maximum cache size + set on its creation. +* [Simple API](http://godoc.org/github.com/VictoriaMetrics/fastcache). +* Simple source code. 
+* Cache may be [saved to file](https://godoc.org/github.com/VictoriaMetrics/fastcache#Cache.SaveToFile) + and [loaded from file](https://godoc.org/github.com/VictoriaMetrics/fastcache#LoadFromFile). +* Works on [Google AppEngine](https://cloud.google.com/appengine/docs/go/). + + +### Benchmarks + +`Fastcache` performance is compared with [BigCache](https://github.com/allegro/bigcache), standard Go map +and [sync.Map](https://golang.org/pkg/sync/#Map). + +``` +GOMAXPROCS=4 go test github.com/VictoriaMetrics/fastcache -bench='Set|Get' -benchtime=10s +goos: linux +goarch: amd64 +pkg: github.com/VictoriaMetrics/fastcache +BenchmarkBigCacheSet-4 2000 10566656 ns/op 6.20 MB/s 4660369 B/op 6 allocs/op +BenchmarkBigCacheGet-4 2000 6902694 ns/op 9.49 MB/s 684169 B/op 131076 allocs/op +BenchmarkBigCacheSetGet-4 1000 17579118 ns/op 7.46 MB/s 5046744 B/op 131083 allocs/op +BenchmarkCacheSet-4 5000 3808874 ns/op 17.21 MB/s 1142 B/op 2 allocs/op +BenchmarkCacheGet-4 5000 3293849 ns/op 19.90 MB/s 1140 B/op 2 allocs/op +BenchmarkCacheSetGet-4 2000 8456061 ns/op 15.50 MB/s 2857 B/op 5 allocs/op +BenchmarkStdMapSet-4 2000 10559382 ns/op 6.21 MB/s 268413 B/op 65537 allocs/op +BenchmarkStdMapGet-4 5000 2687404 ns/op 24.39 MB/s 2558 B/op 13 allocs/op +BenchmarkStdMapSetGet-4 100 154641257 ns/op 0.85 MB/s 387405 B/op 65558 allocs/op +BenchmarkSyncMapSet-4 500 24703219 ns/op 2.65 MB/s 3426543 B/op 262411 allocs/op +BenchmarkSyncMapGet-4 5000 2265892 ns/op 28.92 MB/s 2545 B/op 79 allocs/op +BenchmarkSyncMapSetGet-4 1000 14595535 ns/op 8.98 MB/s 3417190 B/op 262277 allocs/op +``` + +`MB/s` column here actually means `millions of operations per second`. +As you can see, `fastcache` is faster than the `BigCache` in all the cases. +`fastcache` is faster than the standard Go map and `sync.Map` on workloads +with inserts. + + +### Limitations + +* Keys and values must be byte slices. Other types must be marshaled before + storing them in the cache. +* Big entries with sizes exceeding 64KB must be stored via [distinct API](http://godoc.org/github.com/VictoriaMetrics/fastcache#Cache.SetBig). +* There is no cache expiration. Entries are evicted from the cache only + on cache size overflow. Entry deadline may be stored inside the value in order + to implement cache expiration. + + +### Architecture details + +The cache uses ideas from [BigCache](https://github.com/allegro/bigcache): + +* The cache consists of many buckets, each with its own lock. + This helps scaling the performance on multi-core CPUs, since multiple + CPUs may concurrently access distinct buckets. +* Each bucket consists of a `hash(key) -> (key, value) position` map + and 64KB-sized byte slices (chunks) holding encoded `(key, value)` entries. + Each bucket contains only `O(chunksCount)` pointers. For instance, 64GB cache + would contain ~1M pointers, while similarly-sized `map[string][]byte` + would contain ~1B pointers for short keys and values. This would lead to + [huge GC overhead](https://syslog.ravelin.com/further-dangers-of-large-heaps-in-go-7a267b57d487). + +64KB-sized chunks reduce memory fragmentation and the total memory usage comparing +to a single big chunk per bucket. +Chunks are allocated off-heap if possible. This reduces total memory usage because +GC collects unused memory more frequently without the need in `GOGC` tweaking. + + +### Users + +* `Fastcache` has been extracted from [VictoriaMetrics](https://github.com/VictoriaMetrics/VictoriaMetrics) sources. 
+ See [this article](https://medium.com/devopslinks/victoriametrics-creating-the-best-remote-storage-for-prometheus-5d92d66787ac) + for more info about `VictoriaMetrics`. + + +### FAQ + +#### What is the difference between `fastcache` and other similar caches like [BigCache](https://github.com/allegro/bigcache) or [FreeCache](https://github.com/coocood/freecache)? + +* `Fastcache` is faster. See benchmark results above. +* `Fastcache` uses less memory due to lower heap fragmentation. This allows + saving many GBs of memory on multi-GB caches. +* `Fastcache` API [is simpler](http://godoc.org/github.com/VictoriaMetrics/fastcache). + The API is designed to be used in zero-allocation mode. + + +#### Why `fastcache` doesn't support cache expiration? + +Because we don't need cache expiration in [VictoriaMetrics](https://github.com/VictoriaMetrics/VictoriaMetrics). +Cached entries inside `VictoriaMetrics` never expire. They are automatically evicted on cache size overflow. + +It is easy to implement cache expiration on top of `fastcache` by caching values +with marshaled deadlines and verifying deadlines after reading these values +from the cache. + + +#### Why `fastcache` doesn't support advanced features such as [thundering herd protection](https://en.wikipedia.org/wiki/Thundering_herd_problem) or callbacks on entries' eviction? + +Because these features would complicate the code and would make it slower. +`Fastcache` source code is simple - just copy-paste it and implement the feature you want +on top of it. diff --git a/vendor/github.com/VictoriaMetrics/fastcache/bigcache.go b/vendor/github.com/VictoriaMetrics/fastcache/bigcache.go new file mode 100644 index 00000000000..ea234b40d14 --- /dev/null +++ b/vendor/github.com/VictoriaMetrics/fastcache/bigcache.go @@ -0,0 +1,160 @@ +package fastcache + +import ( + "sync" + "sync/atomic" + + xxhash "github.com/cespare/xxhash/v2" +) + +// maxSubvalueLen is the maximum size of subvalue chunk. +// +// - 16 bytes are for subkey encoding +// - 4 bytes are for len(key)+len(value) encoding inside fastcache +// - 1 byte is implementation detail of fastcache +const maxSubvalueLen = chunkSize - 16 - 4 - 1 + +// maxKeyLen is the maximum size of key. +// +// - 16 bytes are for (hash + valueLen) +// - 4 bytes are for len(key)+len(subkey) +// - 1 byte is implementation detail of fastcache +const maxKeyLen = chunkSize - 16 - 4 - 1 + +// SetBig sets (k, v) to c where len(v) may exceed 64KB. +// +// GetBig must be used for reading stored values. +// +// The stored entry may be evicted at any time either due to cache +// overflow or due to unlikely hash collision. +// Pass higher maxBytes value to New if the added items disappear +// frequently. +// +// It is safe to store entries smaller than 64KB with SetBig. +// +// k and v contents may be modified after returning from SetBig. +func (c *Cache) SetBig(k, v []byte) { + atomic.AddUint64(&c.bigStats.SetBigCalls, 1) + if len(k) > maxKeyLen { + atomic.AddUint64(&c.bigStats.TooBigKeyErrors, 1) + return + } + valueLen := len(v) + valueHash := xxhash.Sum64(v) + + // Split v into chunks with up to 64Kb each. + subkey := getSubkeyBuf() + var i uint64 + for len(v) > 0 { + subkey.B = marshalUint64(subkey.B[:0], valueHash) + subkey.B = marshalUint64(subkey.B, uint64(i)) + i++ + subvalueLen := maxSubvalueLen + if len(v) < subvalueLen { + subvalueLen = len(v) + } + subvalue := v[:subvalueLen] + v = v[subvalueLen:] + c.Set(subkey.B, subvalue) + } + + // Write metavalue, which consists of valueHash and valueLen. 
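+	// GetBig later reads this 16-byte metavalue to know how many bytes of subvalues to collect and to verify the reassembled value against valueHash.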
+ subkey.B = marshalUint64(subkey.B[:0], valueHash) + subkey.B = marshalUint64(subkey.B, uint64(valueLen)) + c.Set(k, subkey.B) + putSubkeyBuf(subkey) +} + +// GetBig searches for the value for the given k, appends it to dst +// and returns the result. +// +// GetBig returns only values stored via SetBig. It doesn't work +// with values stored via other methods. +// +// k contents may be modified after returning from GetBig. +func (c *Cache) GetBig(dst, k []byte) (r []byte) { + atomic.AddUint64(&c.bigStats.GetBigCalls, 1) + subkey := getSubkeyBuf() + dstWasNil := dst == nil + defer func() { + putSubkeyBuf(subkey) + if len(r) == 0 && dstWasNil { + // Guarantee that if the caller provided nil and this is a cache miss that + // the caller can accurately test for a cache miss with `if r == nil`. + r = nil + } + }() + + // Read and parse metavalue + subkey.B = c.Get(subkey.B[:0], k) + if len(subkey.B) == 0 { + // Nothing found. + return dst + } + if len(subkey.B) != 16 { + atomic.AddUint64(&c.bigStats.InvalidMetavalueErrors, 1) + return dst + } + valueHash := unmarshalUint64(subkey.B) + valueLen := unmarshalUint64(subkey.B[8:]) + + // Collect result from chunks. + dstLen := len(dst) + if n := dstLen + int(valueLen) - cap(dst); n > 0 { + dst = append(dst[:cap(dst)], make([]byte, n)...) + } + dst = dst[:dstLen] + var i uint64 + for uint64(len(dst)-dstLen) < valueLen { + subkey.B = marshalUint64(subkey.B[:0], valueHash) + subkey.B = marshalUint64(subkey.B, uint64(i)) + i++ + dstNew := c.Get(dst, subkey.B) + if len(dstNew) == len(dst) { + // Cannot find subvalue + return dst[:dstLen] + } + dst = dstNew + } + + // Verify the obtained value. + v := dst[dstLen:] + if uint64(len(v)) != valueLen { + atomic.AddUint64(&c.bigStats.InvalidValueLenErrors, 1) + return dst[:dstLen] + } + h := xxhash.Sum64(v) + if h != valueHash { + atomic.AddUint64(&c.bigStats.InvalidValueHashErrors, 1) + return dst[:dstLen] + } + return dst +} + +func getSubkeyBuf() *bytesBuf { + v := subkeyPool.Get() + if v == nil { + return &bytesBuf{} + } + return v.(*bytesBuf) +} + +func putSubkeyBuf(bb *bytesBuf) { + bb.B = bb.B[:0] + subkeyPool.Put(bb) +} + +var subkeyPool sync.Pool + +type bytesBuf struct { + B []byte +} + +func marshalUint64(dst []byte, u uint64) []byte { + return append(dst, byte(u>>56), byte(u>>48), byte(u>>40), byte(u>>32), byte(u>>24), byte(u>>16), byte(u>>8), byte(u)) +} + +func unmarshalUint64(src []byte) uint64 { + _ = src[7] + return uint64(src[0])<<56 | uint64(src[1])<<48 | uint64(src[2])<<40 | uint64(src[3])<<32 | uint64(src[4])<<24 | uint64(src[5])<<16 | uint64(src[6])<<8 | uint64(src[7]) +} diff --git a/vendor/github.com/VictoriaMetrics/fastcache/fastcache.go b/vendor/github.com/VictoriaMetrics/fastcache/fastcache.go new file mode 100644 index 00000000000..092ba37193b --- /dev/null +++ b/vendor/github.com/VictoriaMetrics/fastcache/fastcache.go @@ -0,0 +1,419 @@ +// Package fastcache implements fast in-memory cache. 
+// +// The package has been extracted from https://victoriametrics.com/ +package fastcache + +import ( + "fmt" + "sync" + "sync/atomic" + + xxhash "github.com/cespare/xxhash/v2" +) + +const bucketsCount = 512 + +const chunkSize = 64 * 1024 + +const bucketSizeBits = 40 + +const genSizeBits = 64 - bucketSizeBits + +const maxGen = 1<= maxBucketSize { + panic(fmt.Errorf("too big maxBytes=%d; should be smaller than %d", maxBytes, maxBucketSize)) + } + maxChunks := (maxBytes + chunkSize - 1) / chunkSize + b.chunks = make([][]byte, maxChunks) + b.m = make(map[uint64]uint64) + b.Reset() +} + +func (b *bucket) Reset() { + b.mu.Lock() + chunks := b.chunks + for i := range chunks { + putChunk(chunks[i]) + chunks[i] = nil + } + b.m = make(map[uint64]uint64) + b.idx = 0 + b.gen = 1 + atomic.StoreUint64(&b.getCalls, 0) + atomic.StoreUint64(&b.setCalls, 0) + atomic.StoreUint64(&b.misses, 0) + atomic.StoreUint64(&b.collisions, 0) + atomic.StoreUint64(&b.corruptions, 0) + b.mu.Unlock() +} + +func (b *bucket) cleanLocked() { + bGen := b.gen & ((1 << genSizeBits) - 1) + bIdx := b.idx + bm := b.m + for k, v := range bm { + gen := v >> bucketSizeBits + idx := v & ((1 << bucketSizeBits) - 1) + if (gen+1 == bGen || gen == maxGen && bGen == 1) && idx >= bIdx || gen == bGen && idx < bIdx { + continue + } + delete(bm, k) + } +} + +func (b *bucket) UpdateStats(s *Stats) { + s.GetCalls += atomic.LoadUint64(&b.getCalls) + s.SetCalls += atomic.LoadUint64(&b.setCalls) + s.Misses += atomic.LoadUint64(&b.misses) + s.Collisions += atomic.LoadUint64(&b.collisions) + s.Corruptions += atomic.LoadUint64(&b.corruptions) + + b.mu.RLock() + s.EntriesCount += uint64(len(b.m)) + bytesSize := uint64(0) + for _, chunk := range b.chunks { + bytesSize += uint64(cap(chunk)) + } + s.BytesSize += bytesSize + s.MaxBytesSize += uint64(len(b.chunks)) * chunkSize + b.mu.RUnlock() +} + +func (b *bucket) Set(k, v []byte, h uint64) { + atomic.AddUint64(&b.setCalls, 1) + if len(k) >= (1<<16) || len(v) >= (1<<16) { + // Too big key or value - its length cannot be encoded + // with 2 bytes (see below). Skip the entry. + return + } + var kvLenBuf [4]byte + kvLenBuf[0] = byte(uint16(len(k)) >> 8) + kvLenBuf[1] = byte(len(k)) + kvLenBuf[2] = byte(uint16(len(v)) >> 8) + kvLenBuf[3] = byte(len(v)) + kvLen := uint64(len(kvLenBuf) + len(k) + len(v)) + if kvLen >= chunkSize { + // Do not store too big keys and values, since they do not + // fit a chunk. + return + } + + chunks := b.chunks + needClean := false + b.mu.Lock() + idx := b.idx + idxNew := idx + kvLen + chunkIdx := idx / chunkSize + chunkIdxNew := idxNew / chunkSize + if chunkIdxNew > chunkIdx { + if chunkIdxNew >= uint64(len(chunks)) { + idx = 0 + idxNew = kvLen + chunkIdx = 0 + b.gen++ + if b.gen&((1< 0 { + gen := v >> bucketSizeBits + idx := v & ((1 << bucketSizeBits) - 1) + if gen == bGen && idx < b.idx || gen+1 == bGen && idx >= b.idx || gen == maxGen && bGen == 1 && idx >= b.idx { + chunkIdx := idx / chunkSize + if chunkIdx >= uint64(len(chunks)) { + // Corrupted data during the load from file. Just skip it. + atomic.AddUint64(&b.corruptions, 1) + goto end + } + chunk := chunks[chunkIdx] + idx %= chunkSize + if idx+4 >= chunkSize { + // Corrupted data during the load from file. Just skip it. + atomic.AddUint64(&b.corruptions, 1) + goto end + } + kvLenBuf := chunk[idx : idx+4] + keyLen := (uint64(kvLenBuf[0]) << 8) | uint64(kvLenBuf[1]) + valLen := (uint64(kvLenBuf[2]) << 8) | uint64(kvLenBuf[3]) + idx += 4 + if idx+keyLen+valLen >= chunkSize { + // Corrupted data during the load from file. 
Just skip it. + atomic.AddUint64(&b.corruptions, 1) + goto end + } + if string(k) == string(chunk[idx:idx+keyLen]) { + idx += keyLen + if returnDst { + dst = append(dst, chunk[idx:idx+valLen]...) + } + found = true + } else { + atomic.AddUint64(&b.collisions, 1) + } + } + } +end: + b.mu.RUnlock() + if !found { + atomic.AddUint64(&b.misses, 1) + } + return dst, found +} + +func (b *bucket) Del(h uint64) { + b.mu.Lock() + delete(b.m, h) + b.mu.Unlock() +} diff --git a/vendor/github.com/VictoriaMetrics/fastcache/file.go b/vendor/github.com/VictoriaMetrics/fastcache/file.go new file mode 100644 index 00000000000..dfbc0701d93 --- /dev/null +++ b/vendor/github.com/VictoriaMetrics/fastcache/file.go @@ -0,0 +1,421 @@ +package fastcache + +import ( + "encoding/binary" + "fmt" + "io" + "io/ioutil" + "os" + "path/filepath" + "regexp" + "runtime" + + "github.com/golang/snappy" +) + +// SaveToFile atomically saves cache data to the given filePath using a single +// CPU core. +// +// SaveToFile may be called concurrently with other operations on the cache. +// +// The saved data may be loaded with LoadFromFile*. +// +// See also SaveToFileConcurrent for faster saving to file. +func (c *Cache) SaveToFile(filePath string) error { + return c.SaveToFileConcurrent(filePath, 1) +} + +// SaveToFileConcurrent saves cache data to the given filePath using concurrency +// CPU cores. +// +// SaveToFileConcurrent may be called concurrently with other operations +// on the cache. +// +// The saved data may be loaded with LoadFromFile*. +// +// See also SaveToFile. +func (c *Cache) SaveToFileConcurrent(filePath string, concurrency int) error { + // Create dir if it doesn't exist. + dir := filepath.Dir(filePath) + if _, err := os.Stat(dir); err != nil { + if !os.IsNotExist(err) { + return fmt.Errorf("cannot stat %q: %s", dir, err) + } + if err := os.MkdirAll(dir, 0755); err != nil { + return fmt.Errorf("cannot create dir %q: %s", dir, err) + } + } + + // Save cache data into a temporary directory. + tmpDir, err := ioutil.TempDir(dir, "fastcache.tmp.") + if err != nil { + return fmt.Errorf("cannot create temporary dir inside %q: %s", dir, err) + } + defer func() { + if tmpDir != "" { + _ = os.RemoveAll(tmpDir) + } + }() + gomaxprocs := runtime.GOMAXPROCS(-1) + if concurrency <= 0 || concurrency > gomaxprocs { + concurrency = gomaxprocs + } + if err := c.save(tmpDir, concurrency); err != nil { + return fmt.Errorf("cannot save cache data to temporary dir %q: %s", tmpDir, err) + } + + // Remove old filePath contents, since os.Rename may return + // error if filePath dir exists. + if err := os.RemoveAll(filePath); err != nil { + return fmt.Errorf("cannot remove old contents at %q: %s", filePath, err) + } + if err := os.Rename(tmpDir, filePath); err != nil { + return fmt.Errorf("cannot move temporary dir %q to %q: %s", tmpDir, filePath, err) + } + tmpDir = "" + return nil +} + +// LoadFromFile loads cache data from the given filePath. +// +// See SaveToFile* for saving cache data to file. +func LoadFromFile(filePath string) (*Cache, error) { + return load(filePath, 0) +} + +// LoadFromFileOrNew tries loading cache data from the given filePath. +// +// The function falls back to creating new cache with the given maxBytes +// capacity if error occurs during loading the cache from file. 
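+// This makes persistence best-effort: a missing or corrupted cache file yields an empty cache instead of a startup error.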
+func LoadFromFileOrNew(filePath string, maxBytes int) *Cache { + c, err := load(filePath, maxBytes) + if err == nil { + return c + } + return New(maxBytes) +} + +func (c *Cache) save(dir string, workersCount int) error { + if err := saveMetadata(c, dir); err != nil { + return err + } + + // Save buckets by workersCount concurrent workers. + workCh := make(chan int, workersCount) + results := make(chan error) + for i := 0; i < workersCount; i++ { + go func(workerNum int) { + results <- saveBuckets(c.buckets[:], workCh, dir, workerNum) + }(i) + } + // Feed workers with work + for i := range c.buckets[:] { + workCh <- i + } + close(workCh) + + // Read results. + var err error + for i := 0; i < workersCount; i++ { + result := <-results + if result != nil && err == nil { + err = result + } + } + return err +} + +func load(filePath string, maxBytes int) (*Cache, error) { + maxBucketChunks, err := loadMetadata(filePath) + if err != nil { + return nil, err + } + if maxBytes > 0 { + maxBucketBytes := uint64((maxBytes + bucketsCount - 1) / bucketsCount) + expectedBucketChunks := (maxBucketBytes + chunkSize - 1) / chunkSize + if maxBucketChunks != expectedBucketChunks { + return nil, fmt.Errorf("cache file %s contains maxBytes=%d; want %d", filePath, maxBytes, expectedBucketChunks*chunkSize*bucketsCount) + } + } + + // Read bucket files from filePath dir. + d, err := os.Open(filePath) + if err != nil { + return nil, fmt.Errorf("cannot open %q: %s", filePath, err) + } + defer func() { + _ = d.Close() + }() + fis, err := d.Readdir(-1) + if err != nil { + return nil, fmt.Errorf("cannot read files from %q: %s", filePath, err) + } + results := make(chan error) + workersCount := 0 + var c Cache + for _, fi := range fis { + fn := fi.Name() + if fi.IsDir() || !dataFileRegexp.MatchString(fn) { + continue + } + workersCount++ + go func(dataPath string) { + results <- loadBuckets(c.buckets[:], dataPath, maxBucketChunks) + }(filePath + "/" + fn) + } + err = nil + for i := 0; i < workersCount; i++ { + result := <-results + if result != nil && err == nil { + err = result + } + } + if err != nil { + return nil, err + } + // Initialize buckets, which could be missing due to incomplete or corrupted files in the cache. + // It is better initializing such buckets instead of returning error, since the rest of buckets + // contain valid data. 
+ for i := range c.buckets[:] { + b := &c.buckets[i] + if len(b.chunks) == 0 { + b.chunks = make([][]byte, maxBucketChunks) + b.m = make(map[uint64]uint64) + } + } + return &c, nil +} + +func saveMetadata(c *Cache, dir string) error { + metadataPath := dir + "/metadata.bin" + metadataFile, err := os.Create(metadataPath) + if err != nil { + return fmt.Errorf("cannot create %q: %s", metadataPath, err) + } + defer func() { + _ = metadataFile.Close() + }() + maxBucketChunks := uint64(cap(c.buckets[0].chunks)) + if err := writeUint64(metadataFile, maxBucketChunks); err != nil { + return fmt.Errorf("cannot write maxBucketChunks=%d to %q: %s", maxBucketChunks, metadataPath, err) + } + return nil +} + +func loadMetadata(dir string) (uint64, error) { + metadataPath := dir + "/metadata.bin" + metadataFile, err := os.Open(metadataPath) + if err != nil { + return 0, fmt.Errorf("cannot open %q: %s", metadataPath, err) + } + defer func() { + _ = metadataFile.Close() + }() + maxBucketChunks, err := readUint64(metadataFile) + if err != nil { + return 0, fmt.Errorf("cannot read maxBucketChunks from %q: %s", metadataPath, err) + } + if maxBucketChunks == 0 { + return 0, fmt.Errorf("invalid maxBucketChunks=0 read from %q", metadataPath) + } + return maxBucketChunks, nil +} + +var dataFileRegexp = regexp.MustCompile(`^data\.\d+\.bin$`) + +func saveBuckets(buckets []bucket, workCh <-chan int, dir string, workerNum int) error { + dataPath := fmt.Sprintf("%s/data.%d.bin", dir, workerNum) + dataFile, err := os.Create(dataPath) + if err != nil { + return fmt.Errorf("cannot create %q: %s", dataPath, err) + } + defer func() { + _ = dataFile.Close() + }() + zw := snappy.NewBufferedWriter(dataFile) + for bucketNum := range workCh { + if err := writeUint64(zw, uint64(bucketNum)); err != nil { + return fmt.Errorf("cannot write bucketNum=%d to %q: %s", bucketNum, dataPath, err) + } + if err := buckets[bucketNum].Save(zw); err != nil { + return fmt.Errorf("cannot save bucket[%d] to %q: %s", bucketNum, dataPath, err) + } + } + if err := zw.Close(); err != nil { + return fmt.Errorf("cannot close snappy.Writer for %q: %s", dataPath, err) + } + return nil +} + +func loadBuckets(buckets []bucket, dataPath string, maxChunks uint64) error { + dataFile, err := os.Open(dataPath) + if err != nil { + return fmt.Errorf("cannot open %q: %s", dataPath, err) + } + defer func() { + _ = dataFile.Close() + }() + zr := snappy.NewReader(dataFile) + for { + bucketNum, err := readUint64(zr) + if err == io.EOF { + // Reached the end of file. + return nil + } + if bucketNum >= uint64(len(buckets)) { + return fmt.Errorf("unexpected bucketNum read from %q: %d; must be smaller than %d", dataPath, bucketNum, len(buckets)) + } + if err := buckets[bucketNum].Load(zr, maxChunks); err != nil { + return fmt.Errorf("cannot load bucket[%d] from %q: %s", bucketNum, dataPath, err) + } + } +} + +func (b *bucket) Save(w io.Writer) error { + b.mu.Lock() + b.cleanLocked() + b.mu.Unlock() + + b.mu.RLock() + defer b.mu.RUnlock() + + // Store b.idx, b.gen and b.m to w. + + bIdx := b.idx + bGen := b.gen + chunksLen := 0 + for _, chunk := range b.chunks { + if chunk == nil { + break + } + chunksLen++ + } + kvs := make([]byte, 0, 2*8*len(b.m)) + var u64Buf [8]byte + for k, v := range b.m { + binary.LittleEndian.PutUint64(u64Buf[:], k) + kvs = append(kvs, u64Buf[:]...) + binary.LittleEndian.PutUint64(u64Buf[:], v) + kvs = append(kvs, u64Buf[:]...) 
+ } + + if err := writeUint64(w, bIdx); err != nil { + return fmt.Errorf("cannot write b.idx: %s", err) + } + if err := writeUint64(w, bGen); err != nil { + return fmt.Errorf("cannot write b.gen: %s", err) + } + if err := writeUint64(w, uint64(len(kvs))/2/8); err != nil { + return fmt.Errorf("cannot write len(b.m): %s", err) + } + if _, err := w.Write(kvs); err != nil { + return fmt.Errorf("cannot write b.m: %s", err) + } + + // Store b.chunks to w. + if err := writeUint64(w, uint64(chunksLen)); err != nil { + return fmt.Errorf("cannot write len(b.chunks): %s", err) + } + for chunkIdx := 0; chunkIdx < chunksLen; chunkIdx++ { + chunk := b.chunks[chunkIdx][:chunkSize] + if _, err := w.Write(chunk); err != nil { + return fmt.Errorf("cannot write b.chunks[%d]: %s", chunkIdx, err) + } + } + + return nil +} + +func (b *bucket) Load(r io.Reader, maxChunks uint64) error { + if maxChunks == 0 { + return fmt.Errorf("the number of chunks per bucket cannot be zero") + } + bIdx, err := readUint64(r) + if err != nil { + return fmt.Errorf("cannot read b.idx: %s", err) + } + bGen, err := readUint64(r) + if err != nil { + return fmt.Errorf("cannot read b.gen: %s", err) + } + kvsLen, err := readUint64(r) + if err != nil { + return fmt.Errorf("cannot read len(b.m): %s", err) + } + kvsLen *= 2 * 8 + kvs := make([]byte, kvsLen) + if _, err := io.ReadFull(r, kvs); err != nil { + return fmt.Errorf("cannot read b.m: %s", err) + } + m := make(map[uint64]uint64, kvsLen/2/8) + for len(kvs) > 0 { + k := binary.LittleEndian.Uint64(kvs) + kvs = kvs[8:] + v := binary.LittleEndian.Uint64(kvs) + kvs = kvs[8:] + m[k] = v + } + + maxBytes := maxChunks * chunkSize + if maxBytes >= maxBucketSize { + return fmt.Errorf("too big maxBytes=%d; should be smaller than %d", maxBytes, maxBucketSize) + } + chunks := make([][]byte, maxChunks) + chunksLen, err := readUint64(r) + if err != nil { + return fmt.Errorf("cannot read len(b.chunks): %s", err) + } + if chunksLen > uint64(maxChunks) { + return fmt.Errorf("chunksLen=%d cannot exceed maxChunks=%d", chunksLen, maxChunks) + } + currChunkIdx := bIdx / chunkSize + if currChunkIdx > 0 && currChunkIdx >= chunksLen { + return fmt.Errorf("too big bIdx=%d; should be smaller than %d", bIdx, chunksLen*chunkSize) + } + for chunkIdx := uint64(0); chunkIdx < chunksLen; chunkIdx++ { + chunk := getChunk() + chunks[chunkIdx] = chunk + if _, err := io.ReadFull(r, chunk); err != nil { + // Free up allocated chunks before returning the error. + for _, chunk := range chunks { + if chunk != nil { + putChunk(chunk) + } + } + return fmt.Errorf("cannot read b.chunks[%d]: %s", chunkIdx, err) + } + } + // Adjust len for the chunk pointed by currChunkIdx. 
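+	// Only the chunk at currChunkIdx is partially filled; bIdx%chunkSize is the number of bytes in use within it.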
+ if chunksLen > 0 { + chunkLen := bIdx % chunkSize + chunks[currChunkIdx] = chunks[currChunkIdx][:chunkLen] + } + + b.mu.Lock() + for _, chunk := range b.chunks { + putChunk(chunk) + } + b.chunks = chunks + b.m = m + b.idx = bIdx + b.gen = bGen + b.mu.Unlock() + + return nil +} + +func writeUint64(w io.Writer, u uint64) error { + var u64Buf [8]byte + binary.LittleEndian.PutUint64(u64Buf[:], u) + _, err := w.Write(u64Buf[:]) + return err +} + +func readUint64(r io.Reader) (uint64, error) { + var u64Buf [8]byte + if _, err := io.ReadFull(r, u64Buf[:]); err != nil { + return 0, err + } + u := binary.LittleEndian.Uint64(u64Buf[:]) + return u, nil +} diff --git a/vendor/github.com/VictoriaMetrics/fastcache/malloc_heap.go b/vendor/github.com/VictoriaMetrics/fastcache/malloc_heap.go new file mode 100644 index 00000000000..810d460b79e --- /dev/null +++ b/vendor/github.com/VictoriaMetrics/fastcache/malloc_heap.go @@ -0,0 +1,12 @@ +//go:build appengine || windows +// +build appengine windows + +package fastcache + +func getChunk() []byte { + return make([]byte, chunkSize) +} + +func putChunk(chunk []byte) { + // No-op. +} diff --git a/vendor/github.com/VictoriaMetrics/fastcache/malloc_mmap.go b/vendor/github.com/VictoriaMetrics/fastcache/malloc_mmap.go new file mode 100644 index 00000000000..e24d578bf75 --- /dev/null +++ b/vendor/github.com/VictoriaMetrics/fastcache/malloc_mmap.go @@ -0,0 +1,54 @@ +//go:build !appengine && !windows +// +build !appengine,!windows + +package fastcache + +import ( + "fmt" + "sync" + "unsafe" + + "golang.org/x/sys/unix" +) + +const chunksPerAlloc = 1024 + +var ( + freeChunks []*[chunkSize]byte + freeChunksLock sync.Mutex +) + +func getChunk() []byte { + freeChunksLock.Lock() + if len(freeChunks) == 0 { + // Allocate offheap memory, so GOGC won't take into account cache size. + // This should reduce free memory waste. + data, err := unix.Mmap(-1, 0, chunkSize*chunksPerAlloc, unix.PROT_READ|unix.PROT_WRITE, unix.MAP_ANON|unix.MAP_PRIVATE) + if err != nil { + panic(fmt.Errorf("cannot allocate %d bytes via mmap: %s", chunkSize*chunksPerAlloc, err)) + } + for len(data) > 0 { + p := (*[chunkSize]byte)(unsafe.Pointer(&data[0])) + freeChunks = append(freeChunks, p) + data = data[chunkSize:] + } + } + n := len(freeChunks) - 1 + p := freeChunks[n] + freeChunks[n] = nil + freeChunks = freeChunks[:n] + freeChunksLock.Unlock() + return p[:] +} + +func putChunk(chunk []byte) { + if chunk == nil { + return + } + chunk = chunk[:chunkSize] + p := (*[chunkSize]byte)(unsafe.Pointer(&chunk[0])) + + freeChunksLock.Lock() + freeChunks = append(freeChunks, p) + freeChunksLock.Unlock() +} diff --git a/vendor/github.com/thanos-io/thanos/pkg/store/cache/cache.go b/vendor/github.com/thanos-io/thanos/pkg/store/cache/cache.go index 0811d89cc03..c20a1f24598 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/store/cache/cache.go +++ b/vendor/github.com/thanos-io/thanos/pkg/store/cache/cache.go @@ -20,13 +20,17 @@ import ( ) const ( - cacheTypePostings string = "Postings" - cacheTypeExpandedPostings string = "ExpandedPostings" - cacheTypeSeries string = "Series" + CacheTypePostings string = "Postings" + CacheTypeExpandedPostings string = "ExpandedPostings" + CacheTypeSeries string = "Series" sliceHeaderSize = 16 ) +type CacheKeyPostings labels.Label +type CacheKeyExpandedPostings string // We don't use []*labels.Matcher because it is not a hashable type so fail at inmemory cache. 
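+// CacheKeySeries identifies a single series entry by its series reference within a block.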
+type CacheKeySeries uint64 + var ( ulidSize = uint64(len(ulid.ULID{})) ) @@ -59,31 +63,32 @@ type IndexCache interface { } // Common metrics that should be used by all cache implementations. -type commonMetrics struct { - requestTotal *prometheus.CounterVec - hitsTotal *prometheus.CounterVec - dataSizeBytes *prometheus.HistogramVec - fetchLatency *prometheus.HistogramVec +type CommonMetrics struct { + RequestTotal *prometheus.CounterVec + HitsTotal *prometheus.CounterVec + DataSizeBytes *prometheus.HistogramVec + FetchLatency *prometheus.HistogramVec } -func newCommonMetrics(reg prometheus.Registerer) *commonMetrics { - return &commonMetrics{ - requestTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ +// NewCommonMetrics initializes common metrics for index cache. +func NewCommonMetrics(reg prometheus.Registerer) *CommonMetrics { + return &CommonMetrics{ + RequestTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "thanos_store_index_cache_requests_total", Help: "Total number of items requests to the cache.", }, []string{"item_type", tenancy.MetricLabel}), - hitsTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + HitsTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "thanos_store_index_cache_hits_total", Help: "Total number of items requests to the cache that were a hit.", }, []string{"item_type", tenancy.MetricLabel}), - dataSizeBytes: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ + DataSizeBytes: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ Name: "thanos_store_index_cache_stored_data_size_bytes", Help: "Histogram to track item data size stored in index cache", Buckets: []float64{ 32, 256, 512, 1024, 32 * 1024, 256 * 1024, 512 * 1024, 1024 * 1024, 32 * 1024 * 1024, 64 * 1024 * 1024, 128 * 1024 * 1024, 256 * 1024 * 1024, 512 * 1024 * 1024, }, }, []string{"item_type", tenancy.MetricLabel}), - fetchLatency: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ + FetchLatency: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ Name: "thanos_store_index_cache_fetch_duration_seconds", Help: "Histogram to track latency to fetch items from index cache", Buckets: []float64{0.01, 0.1, 0.3, 0.6, 1, 3, 6, 10, 15, 20, 30, 45, 60, 90, 120}, @@ -91,68 +96,72 @@ func newCommonMetrics(reg prometheus.Registerer) *commonMetrics { } } -type cacheKey struct { - block string - key interface{} +// CacheKey defines cache key used in index cache. +type CacheKey struct { + Block string + Key interface{} - compression string + Compression string } -func (c cacheKey) keyType() string { - switch c.key.(type) { - case cacheKeyPostings: - return cacheTypePostings - case cacheKeySeries: - return cacheTypeSeries - case cacheKeyExpandedPostings: - return cacheTypeExpandedPostings +// KeyType returns cache key type. +func (c CacheKey) KeyType() string { + switch c.Key.(type) { + case CacheKeyPostings: + return CacheTypePostings + case CacheKeySeries: + return CacheTypeSeries + case CacheKeyExpandedPostings: + return CacheTypeExpandedPostings } return "" } -func (c cacheKey) size() uint64 { - switch k := c.key.(type) { - case cacheKeyPostings: +// Size returns the size bytes of the cache key. +func (c CacheKey) Size() uint64 { + switch k := c.Key.(type) { + case CacheKeyPostings: // ULID + 2 slice headers + number of chars in value and name. 
return ulidSize + 2*sliceHeaderSize + uint64(len(k.Value)+len(k.Name)) - case cacheKeyExpandedPostings: + case CacheKeyExpandedPostings: return ulidSize + sliceHeaderSize + uint64(len(k)) - case cacheKeySeries: + case CacheKeySeries: return ulidSize + 8 // ULID + uint64. } return 0 } -func (c cacheKey) string() string { - switch c.key.(type) { - case cacheKeyPostings: +func (c CacheKey) String() string { + switch c.Key.(type) { + case CacheKeyPostings: // Use cryptographically hash functions to avoid hash collisions // which would end up in wrong query results. - lbl := c.key.(cacheKeyPostings) + lbl := c.Key.(CacheKeyPostings) lblHash := blake2b.Sum256([]byte(lbl.Name + ":" + lbl.Value)) - key := "P:" + c.block + ":" + base64.RawURLEncoding.EncodeToString(lblHash[0:]) - if len(c.compression) > 0 { - key += ":" + c.compression + key := "P:" + c.Block + ":" + base64.RawURLEncoding.EncodeToString(lblHash[0:]) + if len(c.Compression) > 0 { + key += ":" + c.Compression } return key - case cacheKeyExpandedPostings: + case CacheKeyExpandedPostings: // Use cryptographically hash functions to avoid hash collisions // which would end up in wrong query results. - matchers := c.key.(cacheKeyExpandedPostings) + matchers := c.Key.(CacheKeyExpandedPostings) matchersHash := blake2b.Sum256([]byte(matchers)) - key := "EP:" + c.block + ":" + base64.RawURLEncoding.EncodeToString(matchersHash[0:]) - if len(c.compression) > 0 { - key += ":" + c.compression + key := "EP:" + c.Block + ":" + base64.RawURLEncoding.EncodeToString(matchersHash[0:]) + if len(c.Compression) > 0 { + key += ":" + c.Compression } return key - case cacheKeySeries: - return "S:" + c.block + ":" + strconv.FormatUint(uint64(c.key.(cacheKeySeries)), 10) + case CacheKeySeries: + return "S:" + c.Block + ":" + strconv.FormatUint(uint64(c.Key.(CacheKeySeries)), 10) default: return "" } } -func labelMatchersToString(matchers []*labels.Matcher) string { +// LabelMatchersToString converts the given label matchers to string format. +func LabelMatchersToString(matchers []*labels.Matcher) string { sb := strings.Builder{} for i, lbl := range matchers { sb.WriteString(lbl.String()) @@ -162,7 +171,3 @@ func labelMatchersToString(matchers []*labels.Matcher) string { } return sb.String() } - -type cacheKeyPostings labels.Label -type cacheKeyExpandedPostings string // We don't use []*labels.Matcher because it is not a hashable type so fail at inmemory cache. 
-type cacheKeySeries uint64 diff --git a/vendor/github.com/thanos-io/thanos/pkg/store/cache/factory.go b/vendor/github.com/thanos-io/thanos/pkg/store/cache/factory.go index c457ec41f5b..80ba850b4c7 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/store/cache/factory.go +++ b/vendor/github.com/thanos-io/thanos/pkg/store/cache/factory.go @@ -41,7 +41,7 @@ type IndexCacheConfig struct { func NewIndexCache(logger log.Logger, confContentYaml []byte, reg prometheus.Registerer) (IndexCache, error) { level.Info(logger).Log("msg", "loading index cache configuration") cacheConfig := &IndexCacheConfig{} - cacheMetrics := newCommonMetrics(reg) + cacheMetrics := NewCommonMetrics(reg) if err := yaml.UnmarshalStrict(confContentYaml, cacheConfig); err != nil { return nil, errors.Wrap(err, "parsing config YAML file") } diff --git a/vendor/github.com/thanos-io/thanos/pkg/store/cache/filter_cache.go b/vendor/github.com/thanos-io/thanos/pkg/store/cache/filter_cache.go index 193f7363a26..994491595f5 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/store/cache/filter_cache.go +++ b/vendor/github.com/thanos-io/thanos/pkg/store/cache/filter_cache.go @@ -29,7 +29,7 @@ func NewFilteredIndexCache(cache IndexCache, enabledItems []string) *FilteredInd // StorePostings sets the postings identified by the ulid and label to the value v, // if the postings already exists in the cache it is not mutated. func (c *FilteredIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, tenant string) { - if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, cacheTypePostings) { + if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, CacheTypePostings) { c.cache.StorePostings(blockID, l, v, tenant) } } @@ -37,7 +37,7 @@ func (c *FilteredIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v // FetchMultiPostings fetches multiple postings - each identified by a label - // and returns a map containing cache hits, along with a list of missing keys. func (c *FilteredIndexCache) FetchMultiPostings(ctx context.Context, blockID ulid.ULID, keys []labels.Label, tenant string) (hits map[labels.Label][]byte, misses []labels.Label) { - if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, cacheTypePostings) { + if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, CacheTypePostings) { return c.cache.FetchMultiPostings(ctx, blockID, keys, tenant) } return nil, keys @@ -45,14 +45,14 @@ func (c *FilteredIndexCache) FetchMultiPostings(ctx context.Context, blockID uli // StoreExpandedPostings stores expanded postings for a set of label matchers. func (c *FilteredIndexCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte, tenant string) { - if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, cacheTypeExpandedPostings) { + if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, CacheTypeExpandedPostings) { c.cache.StoreExpandedPostings(blockID, matchers, v, tenant) } } // FetchExpandedPostings fetches expanded postings and returns cached data and a boolean value representing whether it is a cache hit or not. 
func (c *FilteredIndexCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers []*labels.Matcher, tenant string) ([]byte, bool) { - if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, cacheTypeExpandedPostings) { + if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, CacheTypeExpandedPostings) { return c.cache.FetchExpandedPostings(ctx, blockID, matchers, tenant) } return nil, false @@ -61,7 +61,7 @@ func (c *FilteredIndexCache) FetchExpandedPostings(ctx context.Context, blockID // StoreSeries sets the series identified by the ulid and id to the value v, // if the series already exists in the cache it is not mutated. func (c *FilteredIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte, tenant string) { - if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, cacheTypeSeries) { + if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, CacheTypeSeries) { c.cache.StoreSeries(blockID, id, v, tenant) } } @@ -69,7 +69,7 @@ func (c *FilteredIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef // FetchMultiSeries fetches multiple series - each identified by ID - from the cache // and returns a map containing cache hits, along with a list of missing IDs. func (c *FilteredIndexCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULID, ids []storage.SeriesRef, tenant string) (hits map[storage.SeriesRef][]byte, misses []storage.SeriesRef) { - if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, cacheTypeSeries) { + if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, CacheTypeSeries) { return c.cache.FetchMultiSeries(ctx, blockID, ids, tenant) } return nil, ids @@ -79,7 +79,7 @@ func ValidateEnabledItems(enabledItems []string) error { for _, item := range enabledItems { switch item { // valid - case cacheTypePostings, cacheTypeExpandedPostings, cacheTypeSeries: + case CacheTypePostings, CacheTypeExpandedPostings, CacheTypeSeries: default: return fmt.Errorf("unsupported item type %s", item) } diff --git a/vendor/github.com/thanos-io/thanos/pkg/store/cache/inmemory.go b/vendor/github.com/thanos-io/thanos/pkg/store/cache/inmemory.go index 66f05d4177b..ca61f102afb 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/store/cache/inmemory.go +++ b/vendor/github.com/thanos-io/thanos/pkg/store/cache/inmemory.go @@ -50,7 +50,7 @@ type InMemoryIndexCache struct { totalCurrentSize *prometheus.GaugeVec overflow *prometheus.CounterVec - commonMetrics *commonMetrics + commonMetrics *CommonMetrics } // InMemoryIndexCacheConfig holds the in-memory index cache config. @@ -73,7 +73,7 @@ func parseInMemoryIndexCacheConfig(conf []byte) (InMemoryIndexCacheConfig, error // NewInMemoryIndexCache creates a new thread-safe LRU cache for index entries and ensures the total cache // size approximately does not exceed maxBytes. -func NewInMemoryIndexCache(logger log.Logger, commonMetrics *commonMetrics, reg prometheus.Registerer, conf []byte) (*InMemoryIndexCache, error) { +func NewInMemoryIndexCache(logger log.Logger, commonMetrics *CommonMetrics, reg prometheus.Registerer, conf []byte) (*InMemoryIndexCache, error) { config, err := parseInMemoryIndexCacheConfig(conf) if err != nil { return nil, err @@ -84,13 +84,13 @@ func NewInMemoryIndexCache(logger log.Logger, commonMetrics *commonMetrics, reg // NewInMemoryIndexCacheWithConfig creates a new thread-safe LRU cache for index entries and ensures the total cache // size approximately does not exceed maxBytes. 
-func NewInMemoryIndexCacheWithConfig(logger log.Logger, commonMetrics *commonMetrics, reg prometheus.Registerer, config InMemoryIndexCacheConfig) (*InMemoryIndexCache, error) { +func NewInMemoryIndexCacheWithConfig(logger log.Logger, commonMetrics *CommonMetrics, reg prometheus.Registerer, config InMemoryIndexCacheConfig) (*InMemoryIndexCache, error) { if config.MaxItemSize > config.MaxSize { return nil, errors.Errorf("max item size (%v) cannot be bigger than overall cache size (%v)", config.MaxItemSize, config.MaxSize) } if commonMetrics == nil { - commonMetrics = newCommonMetrics(reg) + commonMetrics = NewCommonMetrics(reg) } c := &InMemoryIndexCache{ @@ -104,57 +104,57 @@ func NewInMemoryIndexCacheWithConfig(logger log.Logger, commonMetrics *commonMet Name: "thanos_store_index_cache_items_evicted_total", Help: "Total number of items that were evicted from the index cache.", }, []string{"item_type"}) - c.evicted.WithLabelValues(cacheTypePostings) - c.evicted.WithLabelValues(cacheTypeSeries) - c.evicted.WithLabelValues(cacheTypeExpandedPostings) + c.evicted.WithLabelValues(CacheTypePostings) + c.evicted.WithLabelValues(CacheTypeSeries) + c.evicted.WithLabelValues(CacheTypeExpandedPostings) c.added = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "thanos_store_index_cache_items_added_total", Help: "Total number of items that were added to the index cache.", }, []string{"item_type"}) - c.added.WithLabelValues(cacheTypePostings) - c.added.WithLabelValues(cacheTypeSeries) - c.added.WithLabelValues(cacheTypeExpandedPostings) + c.added.WithLabelValues(CacheTypePostings) + c.added.WithLabelValues(CacheTypeSeries) + c.added.WithLabelValues(CacheTypeExpandedPostings) - c.commonMetrics.requestTotal.WithLabelValues(cacheTypePostings, tenancy.DefaultTenant) - c.commonMetrics.requestTotal.WithLabelValues(cacheTypeSeries, tenancy.DefaultTenant) - c.commonMetrics.requestTotal.WithLabelValues(cacheTypeExpandedPostings, tenancy.DefaultTenant) + c.commonMetrics.RequestTotal.WithLabelValues(CacheTypePostings, tenancy.DefaultTenant) + c.commonMetrics.RequestTotal.WithLabelValues(CacheTypeSeries, tenancy.DefaultTenant) + c.commonMetrics.RequestTotal.WithLabelValues(CacheTypeExpandedPostings, tenancy.DefaultTenant) c.overflow = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "thanos_store_index_cache_items_overflowed_total", Help: "Total number of items that could not be added to the cache due to being too big.", }, []string{"item_type"}) - c.overflow.WithLabelValues(cacheTypePostings) - c.overflow.WithLabelValues(cacheTypeSeries) - c.overflow.WithLabelValues(cacheTypeExpandedPostings) + c.overflow.WithLabelValues(CacheTypePostings) + c.overflow.WithLabelValues(CacheTypeSeries) + c.overflow.WithLabelValues(CacheTypeExpandedPostings) - c.commonMetrics.hitsTotal.WithLabelValues(cacheTypePostings, tenancy.DefaultTenant) - c.commonMetrics.hitsTotal.WithLabelValues(cacheTypeSeries, tenancy.DefaultTenant) - c.commonMetrics.hitsTotal.WithLabelValues(cacheTypeExpandedPostings, tenancy.DefaultTenant) + c.commonMetrics.HitsTotal.WithLabelValues(CacheTypePostings, tenancy.DefaultTenant) + c.commonMetrics.HitsTotal.WithLabelValues(CacheTypeSeries, tenancy.DefaultTenant) + c.commonMetrics.HitsTotal.WithLabelValues(CacheTypeExpandedPostings, tenancy.DefaultTenant) c.current = promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ Name: "thanos_store_index_cache_items", Help: "Current number of items in the index cache.", }, []string{"item_type"}) - c.current.WithLabelValues(cacheTypePostings) - 
c.current.WithLabelValues(cacheTypeSeries) - c.current.WithLabelValues(cacheTypeExpandedPostings) + c.current.WithLabelValues(CacheTypePostings) + c.current.WithLabelValues(CacheTypeSeries) + c.current.WithLabelValues(CacheTypeExpandedPostings) c.currentSize = promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ Name: "thanos_store_index_cache_items_size_bytes", Help: "Current byte size of items in the index cache.", }, []string{"item_type"}) - c.currentSize.WithLabelValues(cacheTypePostings) - c.currentSize.WithLabelValues(cacheTypeSeries) - c.currentSize.WithLabelValues(cacheTypeExpandedPostings) + c.currentSize.WithLabelValues(CacheTypePostings) + c.currentSize.WithLabelValues(CacheTypeSeries) + c.currentSize.WithLabelValues(CacheTypeExpandedPostings) c.totalCurrentSize = promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ Name: "thanos_store_index_cache_total_size_bytes", Help: "Current byte size of items (both value and key) in the index cache.", }, []string{"item_type"}) - c.totalCurrentSize.WithLabelValues(cacheTypePostings) - c.totalCurrentSize.WithLabelValues(cacheTypeSeries) - c.totalCurrentSize.WithLabelValues(cacheTypeExpandedPostings) + c.totalCurrentSize.WithLabelValues(CacheTypePostings) + c.totalCurrentSize.WithLabelValues(CacheTypeSeries) + c.totalCurrentSize.WithLabelValues(CacheTypeExpandedPostings) _ = promauto.With(reg).NewGaugeFunc(prometheus.GaugeOpts{ Name: "thanos_store_index_cache_max_size_bytes", @@ -187,18 +187,18 @@ func NewInMemoryIndexCacheWithConfig(logger log.Logger, commonMetrics *commonMet } func (c *InMemoryIndexCache) onEvict(key, val interface{}) { - k := key.(cacheKey).keyType() + k := key.(CacheKey).KeyType() entrySize := sliceHeaderSize + uint64(len(val.([]byte))) c.evicted.WithLabelValues(k).Inc() c.current.WithLabelValues(k).Dec() c.currentSize.WithLabelValues(k).Sub(float64(entrySize)) - c.totalCurrentSize.WithLabelValues(k).Sub(float64(entrySize + key.(cacheKey).size())) + c.totalCurrentSize.WithLabelValues(k).Sub(float64(entrySize + key.(CacheKey).Size())) c.curSize -= entrySize } -func (c *InMemoryIndexCache) get(key cacheKey) ([]byte, bool) { +func (c *InMemoryIndexCache) get(key CacheKey) ([]byte, bool) { c.mtx.Lock() defer c.mtx.Unlock() @@ -209,7 +209,7 @@ func (c *InMemoryIndexCache) get(key cacheKey) ([]byte, bool) { return v.([]byte), true } -func (c *InMemoryIndexCache) set(typ string, key cacheKey, val []byte) { +func (c *InMemoryIndexCache) set(typ string, key CacheKey, val []byte) { var size = sliceHeaderSize + uint64(len(val)) c.mtx.Lock() @@ -232,7 +232,7 @@ func (c *InMemoryIndexCache) set(typ string, key cacheKey, val []byte) { c.added.WithLabelValues(typ).Inc() c.currentSize.WithLabelValues(typ).Add(float64(size)) - c.totalCurrentSize.WithLabelValues(typ).Add(float64(size + key.size())) + c.totalCurrentSize.WithLabelValues(typ).Add(float64(size + key.Size())) c.current.WithLabelValues(typ).Inc() c.curSize += size } @@ -286,21 +286,21 @@ func copyString(s string) string { } // copyToKey is required as underlying strings might be mmaped. -func copyToKey(l labels.Label) cacheKeyPostings { - return cacheKeyPostings(labels.Label{Value: copyString(l.Value), Name: copyString(l.Name)}) +func copyToKey(l labels.Label) CacheKeyPostings { + return CacheKeyPostings(labels.Label{Value: copyString(l.Value), Name: copyString(l.Name)}) } // StorePostings sets the postings identified by the ulid and label to the value v, // if the postings already exists in the cache it is not mutated. 
func (c *InMemoryIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, tenant string) { - c.commonMetrics.dataSizeBytes.WithLabelValues(cacheTypePostings, tenant).Observe(float64(len(v))) - c.set(cacheTypePostings, cacheKey{block: blockID.String(), key: copyToKey(l)}, v) + c.commonMetrics.DataSizeBytes.WithLabelValues(CacheTypePostings, tenant).Observe(float64(len(v))) + c.set(CacheTypePostings, CacheKey{Block: blockID.String(), Key: copyToKey(l)}, v) } // FetchMultiPostings fetches multiple postings - each identified by a label - // and returns a map containing cache hits, along with a list of missing keys. func (c *InMemoryIndexCache) FetchMultiPostings(ctx context.Context, blockID ulid.ULID, keys []labels.Label, tenant string) (hits map[labels.Label][]byte, misses []labels.Label) { - timer := prometheus.NewTimer(c.commonMetrics.fetchLatency.WithLabelValues(cacheTypePostings, tenant)) + timer := prometheus.NewTimer(c.commonMetrics.FetchLatency.WithLabelValues(CacheTypePostings, tenant)) defer timer.ObserveDuration() hits = map[labels.Label][]byte{} @@ -310,12 +310,12 @@ func (c *InMemoryIndexCache) FetchMultiPostings(ctx context.Context, blockID uli hit := 0 for _, key := range keys { if ctx.Err() != nil { - c.commonMetrics.requestTotal.WithLabelValues(cacheTypePostings, tenant).Add(float64(requests)) - c.commonMetrics.hitsTotal.WithLabelValues(cacheTypePostings, tenant).Add(float64(hit)) + c.commonMetrics.RequestTotal.WithLabelValues(CacheTypePostings, tenant).Add(float64(requests)) + c.commonMetrics.HitsTotal.WithLabelValues(CacheTypePostings, tenant).Add(float64(hit)) return hits, misses } requests++ - if b, ok := c.get(cacheKey{blockIDKey, cacheKeyPostings(key), ""}); ok { + if b, ok := c.get(CacheKey{blockIDKey, CacheKeyPostings(key), ""}); ok { hit++ hits[key] = b continue @@ -323,29 +323,29 @@ func (c *InMemoryIndexCache) FetchMultiPostings(ctx context.Context, blockID uli misses = append(misses, key) } - c.commonMetrics.requestTotal.WithLabelValues(cacheTypePostings, tenant).Add(float64(requests)) - c.commonMetrics.hitsTotal.WithLabelValues(cacheTypePostings, tenant).Add(float64(hit)) + c.commonMetrics.RequestTotal.WithLabelValues(CacheTypePostings, tenant).Add(float64(requests)) + c.commonMetrics.HitsTotal.WithLabelValues(CacheTypePostings, tenant).Add(float64(hit)) return hits, misses } // StoreExpandedPostings stores expanded postings for a set of label matchers. func (c *InMemoryIndexCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte, tenant string) { - c.commonMetrics.dataSizeBytes.WithLabelValues(cacheTypeExpandedPostings, tenant).Observe(float64(len(v))) - c.set(cacheTypeExpandedPostings, cacheKey{block: blockID.String(), key: cacheKeyExpandedPostings(labelMatchersToString(matchers))}, v) + c.commonMetrics.DataSizeBytes.WithLabelValues(CacheTypeExpandedPostings, tenant).Observe(float64(len(v))) + c.set(CacheTypeExpandedPostings, CacheKey{Block: blockID.String(), Key: CacheKeyExpandedPostings(LabelMatchersToString(matchers))}, v) } // FetchExpandedPostings fetches expanded postings and returns cached data and a boolean value representing whether it is a cache hit or not. 
func (c *InMemoryIndexCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers []*labels.Matcher, tenant string) ([]byte, bool) { - timer := prometheus.NewTimer(c.commonMetrics.fetchLatency.WithLabelValues(cacheTypeExpandedPostings, tenant)) + timer := prometheus.NewTimer(c.commonMetrics.FetchLatency.WithLabelValues(CacheTypeExpandedPostings, tenant)) defer timer.ObserveDuration() if ctx.Err() != nil { return nil, false } - c.commonMetrics.requestTotal.WithLabelValues(cacheTypeExpandedPostings, tenant).Inc() - if b, ok := c.get(cacheKey{blockID.String(), cacheKeyExpandedPostings(labelMatchersToString(matchers)), ""}); ok { - c.commonMetrics.hitsTotal.WithLabelValues(cacheTypeExpandedPostings, tenant).Inc() + c.commonMetrics.RequestTotal.WithLabelValues(CacheTypeExpandedPostings, tenant).Inc() + if b, ok := c.get(CacheKey{blockID.String(), CacheKeyExpandedPostings(LabelMatchersToString(matchers)), ""}); ok { + c.commonMetrics.HitsTotal.WithLabelValues(CacheTypeExpandedPostings, tenant).Inc() return b, true } return nil, false @@ -354,14 +354,14 @@ func (c *InMemoryIndexCache) FetchExpandedPostings(ctx context.Context, blockID // StoreSeries sets the series identified by the ulid and id to the value v, // if the series already exists in the cache it is not mutated. func (c *InMemoryIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte, tenant string) { - c.commonMetrics.dataSizeBytes.WithLabelValues(cacheTypeSeries, tenant).Observe(float64(len(v))) - c.set(cacheTypeSeries, cacheKey{blockID.String(), cacheKeySeries(id), ""}, v) + c.commonMetrics.DataSizeBytes.WithLabelValues(CacheTypeSeries, tenant).Observe(float64(len(v))) + c.set(CacheTypeSeries, CacheKey{blockID.String(), CacheKeySeries(id), ""}, v) } // FetchMultiSeries fetches multiple series - each identified by ID - from the cache // and returns a map containing cache hits, along with a list of missing IDs. func (c *InMemoryIndexCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULID, ids []storage.SeriesRef, tenant string) (hits map[storage.SeriesRef][]byte, misses []storage.SeriesRef) { - timer := prometheus.NewTimer(c.commonMetrics.fetchLatency.WithLabelValues(cacheTypeSeries, tenant)) + timer := prometheus.NewTimer(c.commonMetrics.FetchLatency.WithLabelValues(CacheTypeSeries, tenant)) defer timer.ObserveDuration() hits = map[storage.SeriesRef][]byte{} @@ -371,12 +371,12 @@ func (c *InMemoryIndexCache) FetchMultiSeries(ctx context.Context, blockID ulid. hit := 0 for _, id := range ids { if ctx.Err() != nil { - c.commonMetrics.requestTotal.WithLabelValues(cacheTypeSeries, tenant).Add(float64(requests)) - c.commonMetrics.hitsTotal.WithLabelValues(cacheTypeSeries, tenant).Add(float64(hit)) + c.commonMetrics.RequestTotal.WithLabelValues(CacheTypeSeries, tenant).Add(float64(requests)) + c.commonMetrics.HitsTotal.WithLabelValues(CacheTypeSeries, tenant).Add(float64(hit)) return hits, misses } requests++ - if b, ok := c.get(cacheKey{blockIDKey, cacheKeySeries(id), ""}); ok { + if b, ok := c.get(CacheKey{blockIDKey, CacheKeySeries(id), ""}); ok { hit++ hits[id] = b continue @@ -384,8 +384,8 @@ func (c *InMemoryIndexCache) FetchMultiSeries(ctx context.Context, blockID ulid. 
misses = append(misses, id) } - c.commonMetrics.requestTotal.WithLabelValues(cacheTypeSeries, tenant).Add(float64(requests)) - c.commonMetrics.hitsTotal.WithLabelValues(cacheTypeSeries, tenant).Add(float64(hit)) + c.commonMetrics.RequestTotal.WithLabelValues(CacheTypeSeries, tenant).Add(float64(requests)) + c.commonMetrics.HitsTotal.WithLabelValues(CacheTypeSeries, tenant).Add(float64(hit)) return hits, misses } diff --git a/vendor/github.com/thanos-io/thanos/pkg/store/cache/memcached.go b/vendor/github.com/thanos-io/thanos/pkg/store/cache/memcached.go index a41c380a241..b99babfe035 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/store/cache/memcached.go +++ b/vendor/github.com/thanos-io/thanos/pkg/store/cache/memcached.go @@ -43,7 +43,7 @@ type RemoteIndexCache struct { } // NewRemoteIndexCache makes a new RemoteIndexCache. -func NewRemoteIndexCache(logger log.Logger, cacheClient cacheutil.RemoteCacheClient, commonMetrics *commonMetrics, reg prometheus.Registerer, ttl time.Duration) (*RemoteIndexCache, error) { +func NewRemoteIndexCache(logger log.Logger, cacheClient cacheutil.RemoteCacheClient, commonMetrics *CommonMetrics, reg prometheus.Registerer, ttl time.Duration) (*RemoteIndexCache, error) { c := &RemoteIndexCache{ ttl: ttl, logger: logger, @@ -52,26 +52,26 @@ func NewRemoteIndexCache(logger log.Logger, cacheClient cacheutil.RemoteCacheCli } if commonMetrics == nil { - commonMetrics = newCommonMetrics(reg) + commonMetrics = NewCommonMetrics(reg) } - c.requestTotal = commonMetrics.requestTotal - c.hitsTotal = commonMetrics.hitsTotal - c.dataSizeBytes = commonMetrics.dataSizeBytes - c.fetchLatency = commonMetrics.fetchLatency + c.requestTotal = commonMetrics.RequestTotal + c.hitsTotal = commonMetrics.HitsTotal + c.dataSizeBytes = commonMetrics.DataSizeBytes + c.fetchLatency = commonMetrics.FetchLatency // Init requestTtotal and hitsTotal with default tenant - c.requestTotal.WithLabelValues(cacheTypePostings, tenancy.DefaultTenant) - c.requestTotal.WithLabelValues(cacheTypeSeries, tenancy.DefaultTenant) - c.requestTotal.WithLabelValues(cacheTypeExpandedPostings, tenancy.DefaultTenant) + c.requestTotal.WithLabelValues(CacheTypePostings, tenancy.DefaultTenant) + c.requestTotal.WithLabelValues(CacheTypeSeries, tenancy.DefaultTenant) + c.requestTotal.WithLabelValues(CacheTypeExpandedPostings, tenancy.DefaultTenant) - c.hitsTotal.WithLabelValues(cacheTypePostings, tenancy.DefaultTenant) - c.hitsTotal.WithLabelValues(cacheTypeSeries, tenancy.DefaultTenant) - c.hitsTotal.WithLabelValues(cacheTypeExpandedPostings, tenancy.DefaultTenant) + c.hitsTotal.WithLabelValues(CacheTypePostings, tenancy.DefaultTenant) + c.hitsTotal.WithLabelValues(CacheTypeSeries, tenancy.DefaultTenant) + c.hitsTotal.WithLabelValues(CacheTypeExpandedPostings, tenancy.DefaultTenant) - c.fetchLatency.WithLabelValues(cacheTypePostings, tenancy.DefaultTenant) - c.fetchLatency.WithLabelValues(cacheTypeSeries, tenancy.DefaultTenant) - c.fetchLatency.WithLabelValues(cacheTypeExpandedPostings, tenancy.DefaultTenant) + c.fetchLatency.WithLabelValues(CacheTypePostings, tenancy.DefaultTenant) + c.fetchLatency.WithLabelValues(CacheTypeSeries, tenancy.DefaultTenant) + c.fetchLatency.WithLabelValues(CacheTypeExpandedPostings, tenancy.DefaultTenant) level.Info(logger).Log("msg", "created index cache") @@ -82,8 +82,8 @@ func NewRemoteIndexCache(logger log.Logger, cacheClient cacheutil.RemoteCacheCli // The function enqueues the request and returns immediately: the entry will be // asynchronously stored in the cache. 
func (c *RemoteIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, tenant string) { - c.dataSizeBytes.WithLabelValues(cacheTypePostings, tenant).Observe(float64(len(v))) - key := cacheKey{blockID.String(), cacheKeyPostings(l), c.compressionScheme}.string() + c.dataSizeBytes.WithLabelValues(CacheTypePostings, tenant).Observe(float64(len(v))) + key := CacheKey{blockID.String(), CacheKeyPostings(l), c.compressionScheme}.String() if err := c.memcached.SetAsync(key, v, c.ttl); err != nil { level.Error(c.logger).Log("msg", "failed to cache postings in memcached", "err", err) } @@ -93,19 +93,19 @@ func (c *RemoteIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v [] // and returns a map containing cache hits, along with a list of missing keys. // In case of error, it logs and return an empty cache hits map. func (c *RemoteIndexCache) FetchMultiPostings(ctx context.Context, blockID ulid.ULID, lbls []labels.Label, tenant string) (hits map[labels.Label][]byte, misses []labels.Label) { - timer := prometheus.NewTimer(c.fetchLatency.WithLabelValues(cacheTypePostings, tenant)) + timer := prometheus.NewTimer(c.fetchLatency.WithLabelValues(CacheTypePostings, tenant)) defer timer.ObserveDuration() keys := make([]string, 0, len(lbls)) blockIDKey := blockID.String() for _, lbl := range lbls { - key := cacheKey{blockIDKey, cacheKeyPostings(lbl), c.compressionScheme}.string() + key := CacheKey{blockIDKey, CacheKeyPostings(lbl), c.compressionScheme}.String() keys = append(keys, key) } // Fetch the keys from memcached in a single request. - c.requestTotal.WithLabelValues(cacheTypePostings, tenant).Add(float64(len(keys))) + c.requestTotal.WithLabelValues(CacheTypePostings, tenant).Add(float64(len(keys))) results := c.memcached.GetMulti(ctx, keys) if len(results) == 0 { @@ -126,7 +126,7 @@ func (c *RemoteIndexCache) FetchMultiPostings(ctx context.Context, blockID ulid. hits[lbl] = value } - c.hitsTotal.WithLabelValues(cacheTypePostings, tenant).Add(float64(len(hits))) + c.hitsTotal.WithLabelValues(CacheTypePostings, tenant).Add(float64(len(hits))) return hits, misses } @@ -134,8 +134,8 @@ func (c *RemoteIndexCache) FetchMultiPostings(ctx context.Context, blockID ulid. // The function enqueues the request and returns immediately: the entry will be // asynchronously stored in the cache. func (c *RemoteIndexCache) StoreExpandedPostings(blockID ulid.ULID, keys []*labels.Matcher, v []byte, tenant string) { - c.dataSizeBytes.WithLabelValues(cacheTypeExpandedPostings, tenant).Observe(float64(len(v))) - key := cacheKey{blockID.String(), cacheKeyExpandedPostings(labelMatchersToString(keys)), c.compressionScheme}.string() + c.dataSizeBytes.WithLabelValues(CacheTypeExpandedPostings, tenant).Observe(float64(len(v))) + key := CacheKey{blockID.String(), CacheKeyExpandedPostings(LabelMatchersToString(keys)), c.compressionScheme}.String() if err := c.memcached.SetAsync(key, v, c.ttl); err != nil { level.Error(c.logger).Log("msg", "failed to cache expanded postings in memcached", "err", err) @@ -146,19 +146,19 @@ func (c *RemoteIndexCache) StoreExpandedPostings(blockID ulid.ULID, keys []*labe // and returns a map containing cache hits, along with a list of missing keys. // In case of error, it logs and return an empty cache hits map. 
func (c *RemoteIndexCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, lbls []*labels.Matcher, tenant string) ([]byte, bool) { - timer := prometheus.NewTimer(c.fetchLatency.WithLabelValues(cacheTypeExpandedPostings, tenant)) + timer := prometheus.NewTimer(c.fetchLatency.WithLabelValues(CacheTypeExpandedPostings, tenant)) defer timer.ObserveDuration() - key := cacheKey{blockID.String(), cacheKeyExpandedPostings(labelMatchersToString(lbls)), c.compressionScheme}.string() + key := CacheKey{blockID.String(), CacheKeyExpandedPostings(LabelMatchersToString(lbls)), c.compressionScheme}.String() // Fetch the keys from memcached in a single request. - c.requestTotal.WithLabelValues(cacheTypeExpandedPostings, tenant).Add(1) + c.requestTotal.WithLabelValues(CacheTypeExpandedPostings, tenant).Add(1) results := c.memcached.GetMulti(ctx, []string{key}) if len(results) == 0 { return nil, false } if res, ok := results[key]; ok { - c.hitsTotal.WithLabelValues(cacheTypeExpandedPostings, tenant).Add(1) + c.hitsTotal.WithLabelValues(CacheTypeExpandedPostings, tenant).Add(1) return res, true } return nil, false @@ -168,8 +168,8 @@ func (c *RemoteIndexCache) FetchExpandedPostings(ctx context.Context, blockID ul // The function enqueues the request and returns immediately: the entry will be // asynchronously stored in the cache. func (c *RemoteIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte, tenant string) { - c.dataSizeBytes.WithLabelValues(cacheTypeSeries, tenant).Observe(float64(len(v))) - key := cacheKey{blockID.String(), cacheKeySeries(id), ""}.string() + c.dataSizeBytes.WithLabelValues(CacheTypeSeries, tenant).Observe(float64(len(v))) + key := CacheKey{blockID.String(), CacheKeySeries(id), ""}.String() if err := c.memcached.SetAsync(key, v, c.ttl); err != nil { level.Error(c.logger).Log("msg", "failed to cache series in memcached", "err", err) @@ -180,19 +180,19 @@ func (c *RemoteIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, // and returns a map containing cache hits, along with a list of missing IDs. // In case of error, it logs and return an empty cache hits map. func (c *RemoteIndexCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULID, ids []storage.SeriesRef, tenant string) (hits map[storage.SeriesRef][]byte, misses []storage.SeriesRef) { - timer := prometheus.NewTimer(c.fetchLatency.WithLabelValues(cacheTypeSeries, tenant)) + timer := prometheus.NewTimer(c.fetchLatency.WithLabelValues(CacheTypeSeries, tenant)) defer timer.ObserveDuration() keys := make([]string, 0, len(ids)) blockIDKey := blockID.String() for _, id := range ids { - key := cacheKey{blockIDKey, cacheKeySeries(id), ""}.string() + key := CacheKey{blockIDKey, CacheKeySeries(id), ""}.String() keys = append(keys, key) } // Fetch the keys from memcached in a single request. 
- c.requestTotal.WithLabelValues(cacheTypeSeries, tenant).Add(float64(len(ids))) + c.requestTotal.WithLabelValues(CacheTypeSeries, tenant).Add(float64(len(ids))) results := c.memcached.GetMulti(ctx, keys) if len(results) == 0 { return nil, ids @@ -212,7 +212,7 @@ func (c *RemoteIndexCache) FetchMultiSeries(ctx context.Context, blockID ulid.UL hits[id] = value } - c.hitsTotal.WithLabelValues(cacheTypeSeries, tenant).Add(float64(len(hits))) + c.hitsTotal.WithLabelValues(CacheTypeSeries, tenant).Add(float64(len(hits))) return hits, misses } diff --git a/vendor/github.com/thanos-io/thanos/pkg/tenancy/tenancy.go b/vendor/github.com/thanos-io/thanos/pkg/tenancy/tenancy.go index 4a874855fc1..f8b54bcc48d 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/tenancy/tenancy.go +++ b/vendor/github.com/thanos-io/thanos/pkg/tenancy/tenancy.go @@ -47,7 +47,10 @@ func GetTenantFromHTTP(r *http.Request, tenantHeader string, defaultTenantID str var err error tenant := r.Header.Get(tenantHeader) if tenant == "" { - tenant = defaultTenantID + tenant = r.Header.Get(DefaultTenantHeader) + if tenant == "" { + tenant = defaultTenantID + } } if certTenantField != "" { @@ -65,6 +68,28 @@ func GetTenantFromHTTP(r *http.Request, tenantHeader string, defaultTenantID str return tenant, nil } +type roundTripperFunc func(*http.Request) (*http.Response, error) + +func (r roundTripperFunc) RoundTrip(request *http.Request) (*http.Response, error) { + return r(request) +} + +// InternalTenancyConversionTripper is a tripperware that rewrites the configurable tenancy header in the request into +// the hardcoded tenancy header that is used for internal communication in Thanos components. If any custom tenant +// header is configured and present in the request, it will be stripped out. +func InternalTenancyConversionTripper(customTenantHeader, certTenantField string, next http.RoundTripper) http.RoundTripper { + return roundTripperFunc(func(r *http.Request) (*http.Response, error) { + tenant, _ := GetTenantFromHTTP(r, customTenantHeader, DefaultTenant, certTenantField) + r.Header.Set(DefaultTenantHeader, tenant) + // If the custom tenant header is not the same as the default internal header, we want to exclude the custom + // one from the request to keep things simple. + if customTenantHeader != DefaultTenantHeader { + r.Header.Del(customTenantHeader) + } + return next.RoundTrip(r) + }) +} + // getTenantFromCertificate extracts the tenant value from a client's presented certificate. The x509 field to use as // value can be configured with Options.TenantField. An error is returned when the extraction has not succeeded. 
func getTenantFromCertificate(r *http.Request, certTenantField string) (string, error) { diff --git a/vendor/modules.txt b/vendor/modules.txt index 243ac06c392..6ff653943e5 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -93,6 +93,9 @@ github.com/AzureAD/microsoft-authentication-library-for-go/apps/public # github.com/Masterminds/squirrel v1.5.4 ## explicit; go 1.14 github.com/Masterminds/squirrel +# github.com/VictoriaMetrics/fastcache v1.12.1 +## explicit; go 1.13 +github.com/VictoriaMetrics/fastcache # github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 ## explicit github.com/alecthomas/template @@ -903,7 +906,7 @@ github.com/thanos-io/promql-engine/extlabels github.com/thanos-io/promql-engine/logicalplan github.com/thanos-io/promql-engine/query github.com/thanos-io/promql-engine/worker -# github.com/thanos-io/thanos v0.32.5-0.20231023172853-513272e70874 +# github.com/thanos-io/thanos v0.32.5-0.20231025032614-da62b6bf84b8 ## explicit; go 1.21 github.com/thanos-io/thanos/pkg/block github.com/thanos-io/thanos/pkg/block/indexheader
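
The renames in pkg/store/cache/cache.go above export Thanos's index cache helpers (CacheKey, CacheKeyPostings/Series/ExpandedPostings, the CacheType* constants, NewCommonMetrics, LabelMatchersToString) so that the new fastcache-backed cache in pkg/storage/tsdb/inmemory_index_cache.go can reuse the same key hashing and metric names instead of duplicating them. The following is a minimal, self-contained sketch of that usage pattern, not the code added in this patch: the block ULID, byte budget, tenant, and payload are illustrative, and error handling is omitted.

package main

import (
	"fmt"

	"github.com/VictoriaMetrics/fastcache"
	"github.com/oklog/ulid"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/prometheus/model/labels"
	storecache "github.com/thanos-io/thanos/pkg/store/cache"
	"github.com/thanos-io/thanos/pkg/tenancy"
)

func main() {
	// Metrics shared by every IndexCache backend, now constructable from outside Thanos.
	metrics := storecache.NewCommonMetrics(prometheus.NewRegistry())

	// fastcache keeps entries in fixed-size off-heap chunks (see malloc_mmap.go above),
	// so cached postings/series bytes are not scanned by the Go garbage collector.
	cache := fastcache.New(256 * 1024 * 1024) // illustrative 256 MiB budget

	blockID := ulid.MustParse("01ARZ3NDEKTSV4RRFFQ69G5FAV") // illustrative block ULID
	lbl := labels.Label{Name: "job", Value: "api"}

	// CacheKey.String() yields the same key layout for every backend
	// ("P:<block>:<base64 of blake2b(name:value)>" for postings), so an external
	// in-memory implementation produces keys in the same format as the remote cache.
	ck := storecache.CacheKey{Block: blockID.String(), Key: storecache.CacheKeyPostings(lbl)}
	key := []byte(ck.String())

	metrics.RequestTotal.WithLabelValues(storecache.CacheTypePostings, tenancy.DefaultTenant).Inc()
	cache.Set(key, []byte("encoded postings")) // illustrative payload
	if v, ok := cache.HasGet(nil, key); ok {
		metrics.HitsTotal.WithLabelValues(storecache.CacheTypePostings, tenancy.DefaultTenant).Inc()
		fmt.Printf("%s hit, %d bytes\n", ck.KeyType(), len(v))
	}
}

Because fastcache allocates its 64 KiB chunks off-heap via mmap (with a heap fallback on appengine/windows, per malloc_heap.go), the cache size no longer inflates the heap that GOGC accounts for, which is presumably the motivation for replacing the LRU-based in-memory index cache with a fastcache-based one.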