replace in-memory index cache with fastcache-based implementation
Signed-off-by: Ben Ye <[email protected]>
yeya24 committed Oct 25, 2023
1 parent f608df2 commit 46ead63
Showing 21 changed files with 1,777 additions and 171 deletions.
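For background, the new in-memory cache delegates storage and eviction to VictoriaMetrics' fastcache. Below is a minimal, illustrative sketch of the fastcache calls this commit relies on (New, SetBig, GetBig); it is not code from this commit, and the key and sizes are made up:

package main

import (
	"fmt"

	"github.com/VictoriaMetrics/fastcache"
)

func main() {
	// New takes a byte budget; fastcache evicts older entries as the
	// budget fills, keeping total memory usage roughly bounded.
	c := fastcache.New(32 * 1024 * 1024) // 32 MiB

	// SetBig and GetBig handle values above fastcache's 64 KiB per-entry
	// limit by splitting them into chunks; plain Set would reject them.
	c.SetBig([]byte("example-key"), make([]byte, 128*1024))

	v := c.GetBig(nil, []byte("example-key"))
	fmt.Println(len(v)) // 131072
}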
3 changes: 2 additions & 1 deletion go.mod
@@ -53,7 +53,7 @@ require (
github.com/stretchr/testify v1.8.4
github.com/thanos-io/objstore v0.0.0-20230921130928-63a603e651ed
github.com/thanos-io/promql-engine v0.0.0-20231013104847-4517c0d5f591
-github.com/thanos-io/thanos v0.32.5-0.20231023172853-513272e70874
+github.com/thanos-io/thanos v0.32.5-0.20231025032614-da62b6bf84b8
github.com/uber/jaeger-client-go v2.30.0+incompatible
github.com/weaveworks/common v0.0.0-20221201103051-7c2720a9024d
go.etcd.io/etcd/api/v3 v3.5.9
@@ -77,6 +77,7 @@ require (
)

require (
+github.com/VictoriaMetrics/fastcache v1.12.1
github.com/cespare/xxhash/v2 v2.2.0
github.com/google/go-cmp v0.5.9
google.golang.org/protobuf v1.31.0
8 changes: 6 additions & 2 deletions go.sum
@@ -437,6 +437,8 @@ github.com/Microsoft/go-winio v0.6.1/go.mod h1:LRdKpFKfdobln8UmuiYcKPot9D2v6svN5
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/OneOfOne/xxhash v1.2.6 h1:U68crOE3y3MPttCMQGywZOLrTeF5HHJ3/vDBCJn9/bA=
github.com/OneOfOne/xxhash v1.2.6/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q=
+github.com/VictoriaMetrics/fastcache v1.12.1 h1:i0mICQuojGDL3KblA7wUNlY5lOK6a4bwt3uRKnkZU40=
+github.com/VictoriaMetrics/fastcache v1.12.1/go.mod h1:tX04vaqcNoQeGLD+ra5pU5sWkuxnzWhEzLwhP9w653o=
github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw=
github.com/alecthomas/kingpin v1.3.8-0.20210301060133-17f40c25f497 h1:aDITxVUQ/3KBhpVWX57Vo9ntGTxoRw1F0T6/x/tRzNU=
github.com/alecthomas/kingpin v1.3.8-0.20210301060133-17f40c25f497/go.mod h1:b6br6/pDFSfMkBgC96TbpOji05q5pa+v5rIlS0Y6XtI=
@@ -455,6 +457,8 @@ github.com/alicebob/miniredis/v2 v2.30.4 h1:8S4/o1/KoUArAGbGwPxcwf0krlzceva2XVOS
github.com/alicebob/miniredis/v2 v2.30.4/go.mod h1:b25qWj4fCEsBeAAR2mlb0ufImGC6uH3VlUfb/HS5zKg=
github.com/aliyun/aliyun-oss-go-sdk v2.2.2+incompatible h1:9gWa46nstkJ9miBReJcN8Gq34cBFbzSpQZVVT9N09TM=
github.com/aliyun/aliyun-oss-go-sdk v2.2.2+incompatible/go.mod h1:T/Aws4fEfogEE9v+HPhhw+CntffsBHJ8nXQCwKr0/g8=
+github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 h1:eMwmnE/GDgah4HI848JfFxHt+iPb26b4zyfspmqY0/8=
+github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
@@ -1277,8 +1281,8 @@ github.com/thanos-io/objstore v0.0.0-20230921130928-63a603e651ed h1:iWQdY3S6DpWj
github.com/thanos-io/objstore v0.0.0-20230921130928-63a603e651ed/go.mod h1:oJ82xgcBDzGJrEgUsjlTj6n01+ZWUMMUR8BlZzX5xDE=
github.com/thanos-io/promql-engine v0.0.0-20231013104847-4517c0d5f591 h1:6bZbFM+Mvy2kL8BeL8TJ5+5pV3sUR2PSLaZyw911rtQ=
github.com/thanos-io/promql-engine v0.0.0-20231013104847-4517c0d5f591/go.mod h1:vfXJv1JXNdLfHnjsHsLLJl5tyI7KblF76Wo5lZ9YC4Q=
-github.com/thanos-io/thanos v0.32.5-0.20231023172853-513272e70874 h1:7/j60inmFvV9uFvbxDdghob5DR92M7mvmn/Tw+CKK3o=
-github.com/thanos-io/thanos v0.32.5-0.20231023172853-513272e70874/go.mod h1:eVFfte7jP1aTcTkQcZEj5/P9rCeMFHllEqfNZqirLLA=
+github.com/thanos-io/thanos v0.32.5-0.20231025032614-da62b6bf84b8 h1:mWlY64XMYTFeCk4WziW33xerKsp+BWOck6g77cz9ZgA=
+github.com/thanos-io/thanos v0.32.5-0.20231025032614-da62b6bf84b8/go.mod h1:eVFfte7jP1aTcTkQcZEj5/P9rCeMFHllEqfNZqirLLA=
github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab h1:7ZR3hmisBWw77ZpO1/o86g+JV3VKlk3d48jopJxzTjU=
github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab/go.mod h1:eheTFp954zcWZXCU8d0AT76ftsQOTo4DTqkN/h3k1MY=
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
19 changes: 3 additions & 16 deletions integration/querier_test.go
@@ -257,10 +257,7 @@ func TestQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T) {
require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(float64((5+5+2)*numberOfCacheBackends)), "thanos_store_index_cache_requests_total"))
require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(0), "thanos_store_index_cache_hits_total")) // no cache hit cause the cache was empty

-if testCfg.indexCacheBackend == tsdb.IndexCacheBackendInMemory {
-	require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(9), "thanos_store_index_cache_items"))
-	require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(9), "thanos_store_index_cache_items_added_total"))
-} else if testCfg.indexCacheBackend == tsdb.IndexCacheBackendMemcached {
+if testCfg.indexCacheBackend == tsdb.IndexCacheBackendMemcached {
require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(21), "thanos_memcached_operations_total")) // 14 gets + 7 sets
}

@@ -297,10 +294,6 @@ func TestQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T) {
}
require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(2), "thanos_store_index_cache_hits_total")) // this time has used the index cache

-if testCfg.indexCacheBackend == tsdb.IndexCacheBackendInMemory {
-	require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(9), "thanos_store_index_cache_items"))       // as before
-	require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(9), "thanos_store_index_cache_items_added_total")) // as before
-}
if testCfg.indexCacheBackend == tsdb.IndexCacheBackendMemcached {
require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(23-l0CacheHits), "thanos_memcached_operations_total")) // as before + 2 gets - cache hits
}
@@ -516,10 +509,7 @@ func TestQuerierWithBlocksStorageRunningInSingleBinaryMode(t *testing.T) {
require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64((5+5+2)*seriesReplicationFactor)), "thanos_store_index_cache_requests_total")) // 5 for expanded postings and postings, 2 for series
require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(0), "thanos_store_index_cache_hits_total")) // no cache hit cause the cache was empty

-if testCfg.indexCacheBackend == tsdb.IndexCacheBackendInMemory {
-	require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(9*seriesReplicationFactor)), "thanos_store_index_cache_items"))
-	require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(9*seriesReplicationFactor)), "thanos_store_index_cache_items_added_total"))
-} else if testCfg.indexCacheBackend == tsdb.IndexCacheBackendMemcached {
+if testCfg.indexCacheBackend == tsdb.IndexCacheBackendMemcached {
require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(21*seriesReplicationFactor)), "thanos_memcached_operations_total")) // 14 gets + 7 sets
}

@@ -532,10 +522,7 @@
require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64((12+2)*seriesReplicationFactor)), "thanos_store_index_cache_requests_total"))
require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(2*seriesReplicationFactor)), "thanos_store_index_cache_hits_total")) // this time has used the index cache

-if testCfg.indexCacheBackend == tsdb.IndexCacheBackendInMemory {
-	require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(9*seriesReplicationFactor)), "thanos_store_index_cache_items"))       // as before
-	require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(9*seriesReplicationFactor)), "thanos_store_index_cache_items_added_total")) // as before
-} else if testCfg.indexCacheBackend == tsdb.IndexCacheBackendMemcached {
+if testCfg.indexCacheBackend == tsdb.IndexCacheBackendMemcached {
require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64((21+2)*seriesReplicationFactor)), "thanos_memcached_operations_total")) // as before + 2 gets
}

2 changes: 1 addition & 1 deletion pkg/storage/tsdb/index_cache.go
@@ -222,7 +222,7 @@ func newInMemoryIndexCache(cfg InMemoryIndexCacheConfig, logger log.Logger, regi
maxItemSize = maxCacheSize
}

-return storecache.NewInMemoryIndexCacheWithConfig(logger, nil, registerer, storecache.InMemoryIndexCacheConfig{
+return NewInMemoryIndexCacheWithConfig(logger, nil, registerer, storecache.InMemoryIndexCacheConfig{
MaxSize: maxCacheSize,
MaxItemSize: maxItemSize,
})
236 changes: 236 additions & 0 deletions pkg/storage/tsdb/inmemory_index_cache.go
@@ -0,0 +1,236 @@
package tsdb

import (
"context"
"reflect"
"unsafe"

"github.com/VictoriaMetrics/fastcache"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"

storecache "github.com/thanos-io/thanos/pkg/store/cache"
"github.com/thanos-io/thanos/pkg/tenancy"
)

type InMemoryIndexCache struct {
logger log.Logger
cache *fastcache.Cache
maxItemSizeBytes uint64

added *prometheus.CounterVec
overflow *prometheus.CounterVec

commonMetrics *storecache.CommonMetrics
}

// NewInMemoryIndexCacheWithConfig creates a new thread-safe cache for index entries. It relies on the cache library
// (fastcache) to ensure that the total cache size stays approximately within maxBytes.
func NewInMemoryIndexCacheWithConfig(logger log.Logger, commonMetrics *storecache.CommonMetrics, reg prometheus.Registerer, config storecache.InMemoryIndexCacheConfig) (*InMemoryIndexCache, error) {
if config.MaxItemSize > config.MaxSize {
return nil, errors.Errorf("max item size (%v) cannot be bigger than overall cache size (%v)", config.MaxItemSize, config.MaxSize)
}

// fastcache will panic if MaxSize <= 0.
if config.MaxSize <= 0 {
config.MaxSize = storecache.DefaultInMemoryIndexCacheConfig.MaxSize
}

if commonMetrics == nil {
commonMetrics = storecache.NewCommonMetrics(reg)
}

c := &InMemoryIndexCache{
logger: logger,
maxItemSizeBytes: uint64(config.MaxItemSize),
commonMetrics: commonMetrics,
}

c.added = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "thanos_store_index_cache_items_added_total",
Help: "Total number of items that were added to the index cache.",
}, []string{"item_type"})
c.added.WithLabelValues(cacheTypePostings)
c.added.WithLabelValues(cacheTypeSeries)
c.added.WithLabelValues(cacheTypeExpandedPostings)

c.commonMetrics.RequestTotal.WithLabelValues(cacheTypePostings, tenancy.DefaultTenant)
c.commonMetrics.RequestTotal.WithLabelValues(cacheTypeSeries, tenancy.DefaultTenant)
c.commonMetrics.RequestTotal.WithLabelValues(cacheTypeExpandedPostings, tenancy.DefaultTenant)

c.overflow = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "thanos_store_index_cache_items_overflowed_total",
Help: "Total number of items that could not be added to the cache due to being too big.",
}, []string{"item_type"})
c.overflow.WithLabelValues(cacheTypePostings)
c.overflow.WithLabelValues(cacheTypeSeries)
c.overflow.WithLabelValues(cacheTypeExpandedPostings)

c.commonMetrics.HitsTotal.WithLabelValues(cacheTypePostings, tenancy.DefaultTenant)
c.commonMetrics.HitsTotal.WithLabelValues(cacheTypeSeries, tenancy.DefaultTenant)
c.commonMetrics.HitsTotal.WithLabelValues(cacheTypeExpandedPostings, tenancy.DefaultTenant)

c.cache = fastcache.New(int(config.MaxSize))
level.Info(logger).Log(
"msg", "created in-memory index cache",
"maxItemSizeBytes", c.maxItemSizeBytes,
"maxSizeBytes", config.MaxSize,
)
return c, nil
}

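// get looks up key via fastcache's GetBig, which reassembles values that
// SetBig stored in chunks. Note that a stored zero-length value is
// indistinguishable from a cache miss here.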
func (c *InMemoryIndexCache) get(key storecache.CacheKey) ([]byte, bool) {
k := yoloBuf(key.String())
resp := c.cache.GetBig(nil, k)
if len(resp) == 0 {
return nil, false
}
return resp, true
}

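// set stores val under key unless an entry already exists. The existence
// check and SetBig are not atomic, but a racing duplicate write is expected
// to be benign, since callers store identical values for a given key.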
func (c *InMemoryIndexCache) set(typ string, key storecache.CacheKey, val []byte) {
k := yoloBuf(key.String())
r := c.cache.GetBig(nil, k)
// item exists, no need to set it again.
if r != nil {
return
}

size := uint64(len(k) + len(val))
if size > c.maxItemSizeBytes {
level.Info(c.logger).Log(
"msg", "item bigger than maxItemSizeBytes. Ignoring..",
"maxItemSizeBytes", c.maxItemSizeBytes,
"cacheType", typ,
)
c.overflow.WithLabelValues(typ).Inc()
return
}

c.cache.SetBig(k, val)
c.added.WithLabelValues(typ).Inc()
}

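// yoloBuf reinterprets s as a []byte without allocating. The result must be
// treated as read-only and never appended to, since the slice's capacity
// field is not meaningful after this cast.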
func yoloBuf(s string) []byte {
return *(*[]byte)(unsafe.Pointer(&s))
}

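// copyString returns a copy of s backed by freshly allocated memory, so the
// result does not alias s (which may point into a memory-mapped index file).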
func copyString(s string) string {
var b []byte
h := (*reflect.SliceHeader)(unsafe.Pointer(&b))
h.Data = (*reflect.StringHeader)(unsafe.Pointer(&s)).Data
h.Len = len(s)
h.Cap = len(s)
return string(b)
}

// copyToKey copies the label before use as a cache key, since the underlying strings might be memory-mapped.
func copyToKey(l labels.Label) storecache.CacheKeyPostings {
return storecache.CacheKeyPostings(labels.Label{Value: copyString(l.Value), Name: copyString(l.Name)})
}

// StorePostings sets the postings identified by the ULID and label to the value v.
// If the postings already exist in the cache, they are not mutated.
func (c *InMemoryIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, tenant string) {
c.commonMetrics.DataSizeBytes.WithLabelValues(cacheTypePostings, tenant).Observe(float64(len(v)))
c.set(cacheTypePostings, storecache.CacheKey{Block: blockID.String(), Key: copyToKey(l)}, v)
}

// FetchMultiPostings fetches multiple postings - each identified by a label -
// and returns a map containing cache hits, along with a list of missing keys.
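// If ctx is canceled before all keys are processed, the remaining keys are reported neither as hits nor as misses.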
func (c *InMemoryIndexCache) FetchMultiPostings(ctx context.Context, blockID ulid.ULID, keys []labels.Label, tenant string) (hits map[labels.Label][]byte, misses []labels.Label) {
timer := prometheus.NewTimer(c.commonMetrics.FetchLatency.WithLabelValues(cacheTypePostings, tenant))
defer timer.ObserveDuration()

hits = map[labels.Label][]byte{}

blockIDKey := blockID.String()
requests := 0
hit := 0
for _, key := range keys {
if ctx.Err() != nil {
c.commonMetrics.RequestTotal.WithLabelValues(cacheTypePostings, tenant).Add(float64(requests))
c.commonMetrics.HitsTotal.WithLabelValues(cacheTypePostings, tenant).Add(float64(hit))
return hits, misses
}
requests++
if b, ok := c.get(storecache.CacheKey{Block: blockIDKey, Key: storecache.CacheKeyPostings(key)}); ok {
hit++
hits[key] = b
continue
}

misses = append(misses, key)
}
c.commonMetrics.RequestTotal.WithLabelValues(cacheTypePostings, tenant).Add(float64(requests))
c.commonMetrics.HitsTotal.WithLabelValues(cacheTypePostings, tenant).Add(float64(hit))

return hits, misses
}

// StoreExpandedPostings stores expanded postings for a set of label matchers.
func (c *InMemoryIndexCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte, tenant string) {
c.commonMetrics.DataSizeBytes.WithLabelValues(cacheTypeExpandedPostings, tenant).Observe(float64(len(v)))
c.set(cacheTypeExpandedPostings, storecache.CacheKey{Block: blockID.String(), Key: storecache.CacheKeyExpandedPostings(storecache.LabelMatchersToString(matchers))}, v)
}

// FetchExpandedPostings fetches expanded postings and returns the cached data along with a boolean indicating whether it was a cache hit.
func (c *InMemoryIndexCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers []*labels.Matcher, tenant string) ([]byte, bool) {
timer := prometheus.NewTimer(c.commonMetrics.FetchLatency.WithLabelValues(cacheTypeExpandedPostings, tenant))
defer timer.ObserveDuration()

if ctx.Err() != nil {
return nil, false
}
c.commonMetrics.RequestTotal.WithLabelValues(cacheTypeExpandedPostings, tenant).Inc()
if b, ok := c.get(storecache.CacheKey{Block: blockID.String(), Key: storecache.CacheKeyExpandedPostings(storecache.LabelMatchersToString(matchers))}); ok {
c.commonMetrics.HitsTotal.WithLabelValues(cacheTypeExpandedPostings, tenant).Inc()
return b, true
}
return nil, false
}

// StoreSeries sets the series identified by the ULID and series ID to the value v.
// If the series already exists in the cache, it is not mutated.
func (c *InMemoryIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte, tenant string) {
c.commonMetrics.DataSizeBytes.WithLabelValues(cacheTypeSeries, tenant).Observe(float64(len(v)))
c.set(cacheTypeSeries, storecache.CacheKey{Block: blockID.String(), Key: storecache.CacheKeySeries(id)}, v)
}

// FetchMultiSeries fetches multiple series - each identified by ID - from the cache
// and returns a map containing cache hits, along with a list of missing IDs.
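// If ctx is canceled before all IDs are processed, the remaining IDs are reported neither as hits nor as misses.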
func (c *InMemoryIndexCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULID, ids []storage.SeriesRef, tenant string) (hits map[storage.SeriesRef][]byte, misses []storage.SeriesRef) {
timer := prometheus.NewTimer(c.commonMetrics.FetchLatency.WithLabelValues(cacheTypeSeries, tenant))
defer timer.ObserveDuration()

hits = map[storage.SeriesRef][]byte{}

blockIDKey := blockID.String()
requests := 0
hit := 0
for _, id := range ids {
if ctx.Err() != nil {
c.commonMetrics.RequestTotal.WithLabelValues(cacheTypeSeries, tenant).Add(float64(requests))
c.commonMetrics.HitsTotal.WithLabelValues(cacheTypeSeries, tenant).Add(float64(hit))
return hits, misses
}
requests++
if b, ok := c.get(storecache.CacheKey{Block: blockIDKey, Key: storecache.CacheKeySeries(id)}); ok {
hit++
hits[id] = b
continue
}

misses = append(misses, id)
}
c.commonMetrics.RequestTotal.WithLabelValues(cacheTypeSeries, tenant).Add(float64(requests))
c.commonMetrics.HitsTotal.WithLabelValues(cacheTypeSeries, tenant).Add(float64(hit))

return hits, misses
}
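For illustration, here is a hypothetical round-trip through the new cache, written as if it lived alongside this file in pkg/storage/tsdb. The logger, registry, sizes, block ID, and label are placeholders, not code from this commit:

import (
	"context"
	"crypto/rand"
	"fmt"

	"github.com/go-kit/log"
	"github.com/oklog/ulid"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/prometheus/model/labels"
	storecache "github.com/thanos-io/thanos/pkg/store/cache"
	"github.com/thanos-io/thanos/pkg/tenancy"
)

func ExampleInMemoryIndexCache() {
	// 64 MiB total budget, 1 MiB per-item cap; both values are illustrative.
	c, err := NewInMemoryIndexCacheWithConfig(log.NewNopLogger(), nil, prometheus.NewRegistry(),
		storecache.InMemoryIndexCacheConfig{MaxSize: 64 << 20, MaxItemSize: 1 << 20})
	if err != nil {
		panic(err)
	}

	blockID := ulid.MustNew(ulid.Now(), rand.Reader)
	lbl := labels.Label{Name: "job", Value: "api"}

	// Store postings bytes, then fetch them back for the default tenant.
	c.StorePostings(blockID, lbl, []byte("serialized postings"), tenancy.DefaultTenant)
	hits, misses := c.FetchMultiPostings(context.Background(), blockID, []labels.Label{lbl}, tenancy.DefaultTenant)

	fmt.Println(len(hits), len(misses))
	// Output: 1 0
}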