From 7a8cbd67f4c83636bd9cbbc1b177a6394ce78cad Mon Sep 17 00:00:00 2001
From: Ben Ye
Date: Tue, 10 Oct 2023 00:20:23 -0700
Subject: [PATCH] check context in multi level cache and add histogram to
 measure latency

Signed-off-by: Ben Ye
---
 pkg/storage/tsdb/index_cache.go           |  2 +-
 pkg/storage/tsdb/multilevel_cache.go      | 41 ++++++++++++++++++++++-
 pkg/storage/tsdb/multilevel_cache_test.go |  7 ++--
 3 files changed, 45 insertions(+), 5 deletions(-)

diff --git a/pkg/storage/tsdb/index_cache.go b/pkg/storage/tsdb/index_cache.go
index 796f5a291a2..3ccf719e552 100644
--- a/pkg/storage/tsdb/index_cache.go
+++ b/pkg/storage/tsdb/index_cache.go
@@ -205,7 +205,7 @@ func NewIndexCache(cfg IndexCacheConfig, logger log.Logger, registerer prometheu
 		}
 	}
 
-	return newMultiLevelCache(caches...), nil
+	return newMultiLevelCache(registerer, caches...), nil
 }
 
 func newInMemoryIndexCache(cfg InMemoryIndexCacheConfig, logger log.Logger, registerer prometheus.Registerer) (storecache.IndexCache, error) {
diff --git a/pkg/storage/tsdb/multilevel_cache.go b/pkg/storage/tsdb/multilevel_cache.go
index 032ae745a82..11f5f5287ac 100644
--- a/pkg/storage/tsdb/multilevel_cache.go
+++ b/pkg/storage/tsdb/multilevel_cache.go
@@ -5,13 +5,23 @@ import (
 	"sync"
 
 	"github.com/oklog/ulid"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/storage"
 	storecache "github.com/thanos-io/thanos/pkg/store/cache"
 )
 
+const (
+	cacheTypePostings         string = "Postings"
+	cacheTypeExpandedPostings string = "ExpandedPostings"
+	cacheTypeSeries           string = "Series"
+)
+
 type multiLevelCache struct {
 	caches []storecache.IndexCache
+
+	fetchLatency *prometheus.HistogramVec
 }
 
 func (m *multiLevelCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, tenant string) {
@@ -28,11 +38,17 @@ func (m *multiLevelCache) StorePostings(blockID ulid.ULID, l labels.Label, v []b
 }
 
 func (m *multiLevelCache) FetchMultiPostings(ctx context.Context, blockID ulid.ULID, keys []labels.Label, tenant string) (hits map[labels.Label][]byte, misses []labels.Label) {
+	timer := prometheus.NewTimer(m.fetchLatency.WithLabelValues(cacheTypePostings))
+	defer timer.ObserveDuration()
+
 	misses = keys
 	hits = map[labels.Label][]byte{}
 	backfillMap := map[storecache.IndexCache][]map[labels.Label][]byte{}
 	for i, c := range m.caches {
 		backfillMap[c] = []map[labels.Label][]byte{}
+		if ctx.Err() != nil {
+			return
+		}
 		h, mi := c.FetchMultiPostings(ctx, blockID, misses, tenant)
 		misses = mi
 
@@ -53,6 +69,9 @@ func (m *multiLevelCache) FetchMultiPostings(ctx context.Context, blockID ulid.U
 	for cache, hit := range backfillMap {
 		for _, values := range hit {
 			for l, b := range values {
+				if ctx.Err() != nil {
+					return
+				}
 				cache.StorePostings(blockID, l, b, tenant)
 			}
 		}
@@ -76,7 +95,13 @@ func (m *multiLevelCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*l
 }
 
 func (m *multiLevelCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers []*labels.Matcher, tenant string) ([]byte, bool) {
+	timer := prometheus.NewTimer(m.fetchLatency.WithLabelValues(cacheTypeExpandedPostings))
+	defer timer.ObserveDuration()
+
 	for i, c := range m.caches {
+		if ctx.Err() != nil {
+			return nil, false
+		}
 		if d, h := c.FetchExpandedPostings(ctx, blockID, matchers, tenant); h {
 			if i > 0 {
 				m.caches[i-1].StoreExpandedPostings(blockID, matchers, d, tenant)
@@ -102,12 +127,18 @@ func (m *multiLevelCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v
 }
 
 func (m *multiLevelCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULID, ids []storage.SeriesRef, tenant string) (hits map[storage.SeriesRef][]byte, misses []storage.SeriesRef) {
+	timer := prometheus.NewTimer(m.fetchLatency.WithLabelValues(cacheTypeSeries))
+	defer timer.ObserveDuration()
+
 	misses = ids
 	hits = map[storage.SeriesRef][]byte{}
 	backfillMap := map[storecache.IndexCache][]map[storage.SeriesRef][]byte{}
 
 	for i, c := range m.caches {
 		backfillMap[c] = []map[storage.SeriesRef][]byte{}
+		if ctx.Err() != nil {
+			return
+		}
 		h, miss := c.FetchMultiSeries(ctx, blockID, misses, tenant)
 		misses = miss
 
@@ -128,6 +159,9 @@ func (m *multiLevelCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULI
 	for cache, hit := range backfillMap {
 		for _, values := range hit {
 			for m, b := range values {
+				if ctx.Err() != nil {
+					return
+				}
 				cache.StoreSeries(blockID, m, b, tenant)
 			}
 		}
@@ -137,11 +171,16 @@ func (m *multiLevelCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULI
 	return hits, misses
 }
 
-func newMultiLevelCache(c ...storecache.IndexCache) storecache.IndexCache {
+func newMultiLevelCache(reg prometheus.Registerer, c ...storecache.IndexCache) storecache.IndexCache {
 	if len(c) == 1 {
 		return c[0]
 	}
 	return &multiLevelCache{
 		caches: c,
+		fetchLatency: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
+			Name:    "cortex_store_multilevel_index_cache_fetch_duration_seconds",
+			Help:    "Histogram to track latency to fetch items from multi level index cache",
+			Buckets: []float64{0.01, 0.1, 0.3, 0.6, 1, 3, 6, 10, 15, 20, 25, 30, 40, 50, 60, 90},
+		}, []string{"item_type"}),
 	}
 }
diff --git a/pkg/storage/tsdb/multilevel_cache_test.go b/pkg/storage/tsdb/multilevel_cache_test.go
index 93956d80637..4434fb493c8 100644
--- a/pkg/storage/tsdb/multilevel_cache_test.go
+++ b/pkg/storage/tsdb/multilevel_cache_test.go
@@ -43,7 +43,7 @@ func Test_MultiIndexCacheInstantiation(t *testing.T) {
 				},
 			},
 		},
-			expectedType: newMultiLevelCache(),
+			expectedType: &multiLevelCache{},
 		},
 		"instantiate multiples backends - inmemory/memcached": {
 			cfg: IndexCacheConfig{
@@ -55,7 +55,7 @@ func Test_MultiIndexCacheInstantiation(t *testing.T) {
 				},
 			},
 		},
-			expectedType: newMultiLevelCache(),
+			expectedType: &multiLevelCache{},
 		},
 		"should not allow duplicate backends": {
 			cfg: IndexCacheConfig{
@@ -256,7 +256,8 @@ func Test_MultiLevelCache(t *testing.T) {
 		t.Run(name, func(t *testing.T) {
 			m1 := newMockIndexCache(tc.m1MockedCalls)
 			m2 := newMockIndexCache(tc.m2MockedCalls)
-			c := newMultiLevelCache(m1, m2)
+			reg := prometheus.NewRegistry()
+			c := newMultiLevelCache(reg, m1, m2)
 			tc.call(c)
 			require.Equal(t, tc.m1ExpectedCalls, m1.calls)
 			require.Equal(t, tc.m2ExpectedCalls, m2.calls)
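
The fetch paths in this patch all apply the same early-return pattern: before consulting the next cache level, and again before each backfill store, they check ctx.Err() and bail out with whatever hits and misses have accumulated so far, so a canceled or timed-out query stops paying for the slower lower tiers. Below is a minimal, self-contained sketch of that pattern only; the level type and fetchMultiLevel function are hypothetical stand-ins for storecache.IndexCache and the patched fetch methods.

// sketch.go - illustrative only, not code from the patch.
package main

import (
	"context"
	"fmt"
)

// level stands in for one cache tier (a storecache.IndexCache in the patch).
type level func(keys []string) (hits map[string]string, misses []string)

func fetchMultiLevel(ctx context.Context, levels []level, keys []string) (hits map[string]string, misses []string) {
	misses = keys
	hits = map[string]string{}
	for _, l := range levels {
		// Canceled or timed-out request: stop descending into slower tiers
		// and return what was gathered so far, as the patched code does.
		if ctx.Err() != nil {
			return hits, misses
		}
		h, mi := l(misses)
		misses = mi
		for k, v := range h {
			hits[k] = v
		}
		if len(misses) == 0 {
			return hits, misses
		}
	}
	return hits, misses
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // simulate a request canceled before the fetch starts

	l1 := level(func(keys []string) (map[string]string, []string) {
		return map[string]string{"a": "1"}, nil
	})
	hits, misses := fetchMultiLevel(ctx, []level{l1}, []string{"a"})
	fmt.Println(hits, misses) // map[] [a] - no tier was consulted
}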
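The new histogram can likewise be sanity-checked against a scratch registry. The metric name, help text, buckets, and the item_type label below are taken from the patch; the surrounding harness is an illustrative sketch, not code from the repository.

// metrics_sketch.go - illustrative only.
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

func main() {
	reg := prometheus.NewRegistry()
	fetchLatency := promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
		Name:    "cortex_store_multilevel_index_cache_fetch_duration_seconds",
		Help:    "Histogram to track latency to fetch items from multi level index cache",
		Buckets: []float64{0.01, 0.1, 0.3, 0.6, 1, 3, 6, 10, 15, 20, 25, 30, 40, 50, 60, 90},
	}, []string{"item_type"})

	// Same pattern the patched fetch paths use: time the call and observe
	// the duration under the item type being fetched.
	timer := prometheus.NewTimer(fetchLatency.WithLabelValues("Postings"))
	timer.ObserveDuration()

	// Gather from the registry and print the recorded sample count.
	mfs, err := reg.Gather()
	if err != nil {
		panic(err)
	}
	for _, mf := range mfs {
		fmt.Printf("%s: %d sample(s)\n", mf.GetName(), mf.GetMetric()[0].GetHistogram().GetSampleCount())
	}
}

The item_type label mirrors the three cache item kinds the patch defines (Postings, ExpandedPostings, Series), so per-kind fetch latency can be compared from a single metric family.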