check context in multi level cache and add histogram to measure latency
Signed-off-by: Ben Ye <[email protected]>
yeya24 committed Oct 10, 2023
1 parent f16bb49 commit 7a8cbd6
Showing 3 changed files with 45 additions and 5 deletions.
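
In short, the change threads a prometheus.Registerer into newMultiLevelCache, wraps each Fetch* call in a histogram timer labelled by item type, and checks the request context between cache levels and during backfill so that cancelled requests stop early. A minimal standalone sketch of that pattern, with illustrative names (fetchAcrossLevels and the level functions are not part of this diff):

package main

import (
	"context"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// fetchAcrossLevels shows the two ideas in this commit: time the whole
// multi-level fetch with a histogram labelled by item type, and stop walking
// the cache levels once the request context has been cancelled.
func fetchAcrossLevels(ctx context.Context, latency *prometheus.HistogramVec, levels []func(context.Context) bool) bool {
	timer := prometheus.NewTimer(latency.WithLabelValues("Postings"))
	defer timer.ObserveDuration()

	for _, fetch := range levels {
		if ctx.Err() != nil { // caller cancelled or timed out; skip the remaining levels
			return false
		}
		if fetch(ctx) {
			return true
		}
	}
	return false
}

func main() {
	// A throwaway registry stands in for the registerer Cortex passes down in the real code.
	latency := promauto.With(prometheus.NewRegistry()).NewHistogramVec(prometheus.HistogramOpts{
		Name:    "example_fetch_duration_seconds",
		Help:    "Example histogram mirroring the one added in this commit.",
		Buckets: prometheus.DefBuckets,
	}, []string{"item_type"})
	fetchAcrossLevels(context.Background(), latency, nil)
}
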
2 changes: 1 addition & 1 deletion pkg/storage/tsdb/index_cache.go
@@ -205,7 +205,7 @@ func NewIndexCache(cfg IndexCacheConfig, logger log.Logger, registerer prometheus.Registerer) (storecache.IndexCache, error) {
}
}

return newMultiLevelCache(caches...), nil
return newMultiLevelCache(registerer, caches...), nil
}

func newInMemoryIndexCache(cfg InMemoryIndexCacheConfig, logger log.Logger, registerer prometheus.Registerer) (storecache.IndexCache, error) {
41 changes: 40 additions & 1 deletion pkg/storage/tsdb/multilevel_cache.go
@@ -5,13 +5,23 @@ import (
"sync"

"github.com/oklog/ulid"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
storecache "github.com/thanos-io/thanos/pkg/store/cache"
)

const (
cacheTypePostings string = "Postings"
cacheTypeExpandedPostings string = "ExpandedPostings"
cacheTypeSeries string = "Series"
)

type multiLevelCache struct {
caches []storecache.IndexCache

fetchLatency *prometheus.HistogramVec
}

func (m *multiLevelCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, tenant string) {
@@ -28,11 +38,17 @@ func (m *multiLevelCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, tenant string) {
}

func (m *multiLevelCache) FetchMultiPostings(ctx context.Context, blockID ulid.ULID, keys []labels.Label, tenant string) (hits map[labels.Label][]byte, misses []labels.Label) {
timer := prometheus.NewTimer(m.fetchLatency.WithLabelValues(cacheTypePostings))
defer timer.ObserveDuration()

misses = keys
hits = map[labels.Label][]byte{}
backfillMap := map[storecache.IndexCache][]map[labels.Label][]byte{}
for i, c := range m.caches {
backfillMap[c] = []map[labels.Label][]byte{}
if ctx.Err() != nil {
return
}
h, mi := c.FetchMultiPostings(ctx, blockID, misses, tenant)
misses = mi

@@ -53,6 +69,9 @@ func (m *multiLevelCache) FetchMultiPostings(ctx context.Context, blockID ulid.ULID, keys []labels.Label, tenant string) (hits map[labels.Label][]byte, misses []labels.Label) {
for cache, hit := range backfillMap {
for _, values := range hit {
for l, b := range values {
if ctx.Err() != nil {
return
}
cache.StorePostings(blockID, l, b, tenant)
}
}
@@ -76,7 +95,13 @@ func (m *multiLevelCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte, tenant string) {
}

func (m *multiLevelCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers []*labels.Matcher, tenant string) ([]byte, bool) {
timer := prometheus.NewTimer(m.fetchLatency.WithLabelValues(cacheTypeExpandedPostings))
defer timer.ObserveDuration()

for i, c := range m.caches {
if ctx.Err() != nil {
return nil, false
}
if d, h := c.FetchExpandedPostings(ctx, blockID, matchers, tenant); h {
if i > 0 {
m.caches[i-1].StoreExpandedPostings(blockID, matchers, d, tenant)
@@ -102,12 +127,18 @@ func (m *multiLevelCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte, tenant string) {
}

func (m *multiLevelCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULID, ids []storage.SeriesRef, tenant string) (hits map[storage.SeriesRef][]byte, misses []storage.SeriesRef) {
timer := prometheus.NewTimer(m.fetchLatency.WithLabelValues(cacheTypeSeries))
defer timer.ObserveDuration()

misses = ids
hits = map[storage.SeriesRef][]byte{}
backfillMap := map[storecache.IndexCache][]map[storage.SeriesRef][]byte{}

for i, c := range m.caches {
backfillMap[c] = []map[storage.SeriesRef][]byte{}
if ctx.Err() != nil {
return
}
h, miss := c.FetchMultiSeries(ctx, blockID, misses, tenant)
misses = miss

@@ -128,6 +159,9 @@ func (m *multiLevelCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULID, ids []storage.SeriesRef, tenant string) (hits map[storage.SeriesRef][]byte, misses []storage.SeriesRef) {
for cache, hit := range backfillMap {
for _, values := range hit {
for m, b := range values {
if ctx.Err() != nil {
return
}
cache.StoreSeries(blockID, m, b, tenant)
}
}
@@ -137,11 +171,16 @@
return hits, misses
}

func newMultiLevelCache(c ...storecache.IndexCache) storecache.IndexCache {
func newMultiLevelCache(reg prometheus.Registerer, c ...storecache.IndexCache) storecache.IndexCache {
if len(c) == 1 {
return c[0]
}
return &multiLevelCache{
caches: c,
fetchLatency: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
Name: "cortex_store_multilevel_index_cache_fetch_duration_seconds",
Help: "Histogram to track latency to fetch items from multi level index cache",
Buckets: []float64{0.01, 0.1, 0.3, 0.6, 1, 3, 6, 10, 15, 20, 25, 30, 40, 50, 60, 90},
}, []string{"item_type"}),
}
}
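
Note that when only one backend is configured, newMultiLevelCache still returns that cache directly, so the registerer goes unused and no histogram is registered; the metric only exists for a true multi-level setup. Once exported, the new cortex_store_multilevel_index_cache_fetch_duration_seconds histogram can be broken down by item_type; an illustrative PromQL query (not part of this change) would be histogram_quantile(0.99, sum by (item_type, le) (rate(cortex_store_multilevel_index_cache_fetch_duration_seconds_bucket[5m]))).
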
7 changes: 4 additions & 3 deletions pkg/storage/tsdb/multilevel_cache_test.go
@@ -43,7 +43,7 @@ func Test_MultiIndexCacheInstantiation(t *testing.T) {
},
},
},
expectedType: newMultiLevelCache(),
expectedType: &multiLevelCache{},
},
"instantiate multiples backends - inmemory/memcached": {
cfg: IndexCacheConfig{
@@ -55,7 +55,7 @@ func Test_MultiIndexCacheInstantiation(t *testing.T) {
},
},
},
expectedType: newMultiLevelCache(),
expectedType: &multiLevelCache{},
},
"should not allow duplicate backends": {
cfg: IndexCacheConfig{
@@ -256,7 +256,8 @@ func Test_MultiLevelCache(t *testing.T) {
t.Run(name, func(t *testing.T) {
m1 := newMockIndexCache(tc.m1MockedCalls)
m2 := newMockIndexCache(tc.m2MockedCalls)
c := newMultiLevelCache(m1, m2)
reg := prometheus.NewRegistry()
c := newMultiLevelCache(reg, m1, m2)
tc.call(c)
require.Equal(t, tc.m1ExpectedCalls, m1.calls)
require.Equal(t, tc.m2ExpectedCalls, m2.calls)
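The test change above only swaps the constructor call; it does not assert on the new histogram. A possible extra check (not part of this commit), assuming the test case exercises one of the Fetch* methods and that github.com/prometheus/client_golang/prometheus/testutil is imported, could be placed after tc.call(c):

// Hypothetical assertion: after a Fetch* call, the fetch-latency histogram
// should expose at least one labelled series on reg.
count, err := testutil.GatherAndCount(reg, "cortex_store_multilevel_index_cache_fetch_duration_seconds")
require.NoError(t, err)
require.GreaterOrEqual(t, count, 1)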
