diff --git a/CHANGELOG.md b/CHANGELOG.md
index 5233a07d3e..e0d2ad5db0 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -70,6 +70,7 @@
 * [ENHANCEMENT] Querier: Check context before notifying scheduler and frontend. #5565
 * [ENHANCEMENT] QueryFrontend: Add metric for number of series requests. #5373
 * [ENHANCEMENT] Store Gateway: Add histogram metrics for total time spent fetching series and chunks per request. #5573
+* [ENHANCEMENT] Store Gateway: Check context in multi level cache. Add `cortex_store_multilevel_index_cache_fetch_duration_seconds` and `cortex_store_multilevel_index_cache_backfill_duration_seconds` to measure fetch and backfill latency. #5596
 * [BUGFIX] Ruler: Validate if rule group can be safely converted back to rule group yaml from protobuf message #5265
 * [BUGFIX] Querier: Convert gRPC `ResourceExhausted` status code from store gateway to 422 limit error. #5286
 * [BUGFIX] Alertmanager: Route web-ui requests to the alertmanager distributor when sharding is enabled. #5293
diff --git a/pkg/storage/tsdb/index_cache.go b/pkg/storage/tsdb/index_cache.go
index 796f5a291a..3ccf719e55 100644
--- a/pkg/storage/tsdb/index_cache.go
+++ b/pkg/storage/tsdb/index_cache.go
@@ -205,7 +205,7 @@ func NewIndexCache(cfg IndexCacheConfig, logger log.Logger, registerer prometheu
 		}
 	}
 
-	return newMultiLevelCache(caches...), nil
+	return newMultiLevelCache(registerer, caches...), nil
 }
 
 func newInMemoryIndexCache(cfg InMemoryIndexCacheConfig, logger log.Logger, registerer prometheus.Registerer) (storecache.IndexCache, error) {
diff --git a/pkg/storage/tsdb/multilevel_cache.go b/pkg/storage/tsdb/multilevel_cache.go
index 032ae745a8..76d9c4c63e 100644
--- a/pkg/storage/tsdb/multilevel_cache.go
+++ b/pkg/storage/tsdb/multilevel_cache.go
@@ -5,13 +5,24 @@ import (
 	"sync"
 
 	"github.com/oklog/ulid"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/storage"
 	storecache "github.com/thanos-io/thanos/pkg/store/cache"
 )
 
+const (
+	cacheTypePostings         string = "Postings"
+	cacheTypeExpandedPostings string = "ExpandedPostings"
+	cacheTypeSeries           string = "Series"
+)
+
 type multiLevelCache struct {
 	caches []storecache.IndexCache
+
+	fetchLatency    *prometheus.HistogramVec
+	backFillLatency *prometheus.HistogramVec
 }
 
 func (m *multiLevelCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, tenant string) {
@@ -28,11 +39,17 @@ func (m *multiLevelCache) StorePostings(blockID ulid.ULID, l labels.Label, v []b
 }
 
 func (m *multiLevelCache) FetchMultiPostings(ctx context.Context, blockID ulid.ULID, keys []labels.Label, tenant string) (hits map[labels.Label][]byte, misses []labels.Label) {
+	timer := prometheus.NewTimer(m.fetchLatency.WithLabelValues(cacheTypePostings))
+	defer timer.ObserveDuration()
+
 	misses = keys
 	hits = map[labels.Label][]byte{}
 	backfillMap := map[storecache.IndexCache][]map[labels.Label][]byte{}
 	for i, c := range m.caches {
 		backfillMap[c] = []map[labels.Label][]byte{}
+		if ctx.Err() != nil {
+			return
+		}
 		h, mi := c.FetchMultiPostings(ctx, blockID, misses, tenant)
 		misses = mi
 
@@ -50,9 +67,14 @@
 	}
 
 	defer func() {
+		backFillTimer := prometheus.NewTimer(m.backFillLatency.WithLabelValues(cacheTypePostings))
+		defer backFillTimer.ObserveDuration()
 		for cache, hit := range backfillMap {
 			for _, values := range hit {
 				for l, b := range values {
+					if ctx.Err() != nil {
+						return
+					}
 					cache.StorePostings(blockID, l, b, tenant)
 				}
 			}
@@ -76,10 +98,18 @@ func (m *multiLevelCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*l
 }
 
 func (m *multiLevelCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers []*labels.Matcher, tenant string) ([]byte, bool) {
+	timer := prometheus.NewTimer(m.fetchLatency.WithLabelValues(cacheTypeExpandedPostings))
+	defer timer.ObserveDuration()
+
 	for i, c := range m.caches {
+		if ctx.Err() != nil {
+			return nil, false
+		}
 		if d, h := c.FetchExpandedPostings(ctx, blockID, matchers, tenant); h {
 			if i > 0 {
+				backFillTimer := prometheus.NewTimer(m.backFillLatency.WithLabelValues(cacheTypeExpandedPostings))
 				m.caches[i-1].StoreExpandedPostings(blockID, matchers, d, tenant)
+				backFillTimer.ObserveDuration()
 			}
 			return d, h
 		}
@@ -102,12 +132,18 @@ func (m *multiLevelCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v
 }
 
 func (m *multiLevelCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULID, ids []storage.SeriesRef, tenant string) (hits map[storage.SeriesRef][]byte, misses []storage.SeriesRef) {
+	timer := prometheus.NewTimer(m.fetchLatency.WithLabelValues(cacheTypeSeries))
+	defer timer.ObserveDuration()
+
 	misses = ids
 	hits = map[storage.SeriesRef][]byte{}
 	backfillMap := map[storecache.IndexCache][]map[storage.SeriesRef][]byte{}
 	for i, c := range m.caches {
 		backfillMap[c] = []map[storage.SeriesRef][]byte{}
+		if ctx.Err() != nil {
+			return
+		}
 		h, miss := c.FetchMultiSeries(ctx, blockID, misses, tenant)
 		misses = miss
 
@@ -125,9 +161,14 @@ func (m *multiLevelCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULI
 	}
 
 	defer func() {
+		backFillTimer := prometheus.NewTimer(m.backFillLatency.WithLabelValues(cacheTypeSeries))
+		defer backFillTimer.ObserveDuration()
 		for cache, hit := range backfillMap {
 			for _, values := range hit {
 				for m, b := range values {
+					if ctx.Err() != nil {
+						return
+					}
 					cache.StoreSeries(blockID, m, b, tenant)
 				}
 			}
@@ -137,11 +178,21 @@ func (m *multiLevelCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULI
 	return hits, misses
 }
 
-func newMultiLevelCache(c ...storecache.IndexCache) storecache.IndexCache {
+func newMultiLevelCache(reg prometheus.Registerer, c ...storecache.IndexCache) storecache.IndexCache {
 	if len(c) == 1 {
 		return c[0]
 	}
 	return &multiLevelCache{
 		caches: c,
+		fetchLatency: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
+			Name:    "cortex_store_multilevel_index_cache_fetch_duration_seconds",
+			Help:    "Histogram to track latency to fetch items from multi level index cache",
+			Buckets: []float64{0.01, 0.1, 0.3, 0.6, 1, 3, 6, 10, 15, 20, 25, 30, 40, 50, 60, 90},
+		}, []string{"item_type"}),
+		backFillLatency: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
+			Name:    "cortex_store_multilevel_index_cache_backfill_duration_seconds",
+			Help:    "Histogram to track latency to backfill items from multi level index cache",
+			Buckets: []float64{0.01, 0.1, 0.3, 0.6, 1, 3, 6, 10, 15, 20, 25, 30, 40, 50, 60, 90},
+		}, []string{"item_type"}),
 	}
 }
diff --git a/pkg/storage/tsdb/multilevel_cache_test.go b/pkg/storage/tsdb/multilevel_cache_test.go
index 93956d8063..4434fb493c 100644
--- a/pkg/storage/tsdb/multilevel_cache_test.go
+++ b/pkg/storage/tsdb/multilevel_cache_test.go
@@ -43,7 +43,7 @@ func Test_MultiIndexCacheInstantiation(t *testing.T) {
 					},
 				},
 			},
-			expectedType: newMultiLevelCache(),
+			expectedType: &multiLevelCache{},
 		},
 		"instantiate multiples backends - inmemory/memcached": {
 			cfg: IndexCacheConfig{
@@ -55,7 +55,7 @@ func Test_MultiIndexCacheInstantiation(t *testing.T) {
 					},
 				},
 			},
-			expectedType: newMultiLevelCache(),
+			expectedType: &multiLevelCache{},
 		},
 		"should not allow duplicate backends": {
 			cfg: IndexCacheConfig{
@@ -256,7 +256,8 @@ func Test_MultiLevelCache(t *testing.T) {
 		t.Run(name, func(t *testing.T) {
 			m1 := newMockIndexCache(tc.m1MockedCalls)
 			m2 := newMockIndexCache(tc.m2MockedCalls)
-			c := newMultiLevelCache(m1, m2)
+			reg := prometheus.NewRegistry()
+			c := newMultiLevelCache(reg, m1, m2)
 			tc.call(c)
 			require.Equal(t, tc.m1ExpectedCalls, m1.calls)
 			require.Equal(t, tc.m2ExpectedCalls, m2.calls)
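
Reviewer note (not part of the patch): each fetch path above follows the same two-part pattern — wrap the whole lookup in a `prometheus.Timer` observing the `item_type`-labelled histogram, and check `ctx.Err()` before consulting the next cache level so a cancelled query stops issuing cache round trips. The following is a minimal, self-contained sketch of that pattern; `fetchFromLevels` and its `func() bool` levels are hypothetical stand-ins for the real `storecache.IndexCache` chain.

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// fetchLatency mirrors the histogram added by the patch: one series per
// cached item type ("Postings", "ExpandedPostings", "Series").
var fetchLatency = promauto.With(prometheus.NewRegistry()).NewHistogramVec(prometheus.HistogramOpts{
	Name:    "cortex_store_multilevel_index_cache_fetch_duration_seconds",
	Help:    "Histogram to track latency to fetch items from multi level index cache",
	Buckets: []float64{0.01, 0.1, 0.3, 0.6, 1, 3, 6, 10, 15, 20, 25, 30, 40, 50, 60, 90},
}, []string{"item_type"})

// fetchFromLevels is a hypothetical stand-in for FetchMultiPostings and
// friends: time the whole lookup, and stop walking the cache levels as soon
// as the request context is done, mirroring the ctx.Err() checks above.
func fetchFromLevels(ctx context.Context, levels []func() bool) bool {
	timer := prometheus.NewTimer(fetchLatency.WithLabelValues("Postings"))
	defer timer.ObserveDuration()

	for _, fetch := range levels {
		if ctx.Err() != nil {
			return false // caller cancelled; skip the remaining levels
		}
		if fetch() {
			return true
		}
	}
	return false
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	slowMiss := func() bool { time.Sleep(30 * time.Millisecond); return false }
	hit := func() bool { return true }

	// The third level is never consulted: by the time the loop reaches it,
	// the 50ms deadline has expired, so the lookup returns false.
	fmt.Println(fetchFromLevels(ctx, []func() bool{slowMiss, slowMiss, hit}))
}
```

The per-level `ctx.Err()` check matters because the lower levels are typically remote (memcached/redis): once the query is cancelled, every further level would be a wasted network round trip that the fetch histogram would also count against the cache.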