Skip to content

Commit

Permalink
check context in multi level cache and add histogram to measure latency
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Ye <[email protected]>

add backfill histogram

Signed-off-by: Ben Ye <[email protected]>

update changelog

Signed-off-by: Ben Ye <[email protected]>

lint

Signed-off-by: Ben Ye <[email protected]>
  • Loading branch information
yeya24 committed Oct 12, 2023
1 parent f16bb49 commit d8230cf
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 5 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
* [ENHANCEMENT] Querier: Check context before notifying scheduler and frontend. #5565
* [ENHANCEMENT] QueryFrontend: Add metric for number of series requests. #5373
* [ENHANCEMENT] Store Gateway: Add histogram metrics for total time spent fetching series and chunks per request. #5573
* [ENHANCEMENT] Store Gateway: Check context in multi level cache. Add `cortex_store_multilevel_index_cache_fetch_duration_seconds` and `cortex_store_multilevel_index_cache_backfill_duration_seconds` to measure fetch and backfill latency. #5596
* [BUGFIX] Ruler: Validate if rule group can be safely converted back to rule group yaml from protobuf message #5265
* [BUGFIX] Querier: Convert gRPC `ResourceExhausted` status code from store gateway to 422 limit error. #5286
* [BUGFIX] Alertmanager: Route web-ui requests to the alertmanager distributor when sharding is enabled. #5293
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/tsdb/index_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func NewIndexCache(cfg IndexCacheConfig, logger log.Logger, registerer prometheu
}
}

return newMultiLevelCache(caches...), nil
return newMultiLevelCache(registerer, caches...), nil
}

func newInMemoryIndexCache(cfg InMemoryIndexCacheConfig, logger log.Logger, registerer prometheus.Registerer) (storecache.IndexCache, error) {
Expand Down
53 changes: 52 additions & 1 deletion pkg/storage/tsdb/multilevel_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,24 @@ import (
"sync"

"github.com/oklog/ulid"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
storecache "github.com/thanos-io/thanos/pkg/store/cache"
)

const (
	// Values used for the "item_type" label on the multi level cache
	// fetch/backfill latency histograms, one per IndexCache item kind.
	cacheTypePostings         string = "Postings"
	cacheTypeExpandedPostings string = "ExpandedPostings"
	cacheTypeSeries           string = "Series"
)

// multiLevelCache layers several IndexCache implementations: fetches consult
// the caches in order, and items found only in a later cache are stored back
// ("backfilled") into the earlier ones.
type multiLevelCache struct {
	// caches holds the cache levels in the order they are consulted.
	caches []storecache.IndexCache

	// fetchLatency tracks the total duration of each Fetch* call,
	// labeled by item_type (Postings/ExpandedPostings/Series).
	fetchLatency *prometheus.HistogramVec
	// backFillLatency tracks time spent writing hits back into
	// earlier cache levels after a fetch, labeled by item_type.
	backFillLatency *prometheus.HistogramVec
}

func (m *multiLevelCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, tenant string) {
Expand All @@ -28,11 +39,17 @@ func (m *multiLevelCache) StorePostings(blockID ulid.ULID, l labels.Label, v []b
}

func (m *multiLevelCache) FetchMultiPostings(ctx context.Context, blockID ulid.ULID, keys []labels.Label, tenant string) (hits map[labels.Label][]byte, misses []labels.Label) {
timer := prometheus.NewTimer(m.fetchLatency.WithLabelValues(cacheTypePostings))
defer timer.ObserveDuration()

misses = keys
hits = map[labels.Label][]byte{}
backfillMap := map[storecache.IndexCache][]map[labels.Label][]byte{}
for i, c := range m.caches {
backfillMap[c] = []map[labels.Label][]byte{}
if ctx.Err() != nil {
return
}
h, mi := c.FetchMultiPostings(ctx, blockID, misses, tenant)
misses = mi

Expand All @@ -50,9 +67,14 @@ func (m *multiLevelCache) FetchMultiPostings(ctx context.Context, blockID ulid.U
}

defer func() {
backFillTimer := prometheus.NewTimer(m.backFillLatency.WithLabelValues(cacheTypePostings))
defer backFillTimer.ObserveDuration()
for cache, hit := range backfillMap {
for _, values := range hit {
for l, b := range values {
if ctx.Err() != nil {
return
}
cache.StorePostings(blockID, l, b, tenant)
}
}
Expand All @@ -76,10 +98,18 @@ func (m *multiLevelCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*l
}

func (m *multiLevelCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers []*labels.Matcher, tenant string) ([]byte, bool) {
timer := prometheus.NewTimer(m.fetchLatency.WithLabelValues(cacheTypeExpandedPostings))
defer timer.ObserveDuration()

for i, c := range m.caches {
if ctx.Err() != nil {
return nil, false
}
if d, h := c.FetchExpandedPostings(ctx, blockID, matchers, tenant); h {
if i > 0 {
backFillTimer := prometheus.NewTimer(m.backFillLatency.WithLabelValues(cacheTypeExpandedPostings))
m.caches[i-1].StoreExpandedPostings(blockID, matchers, d, tenant)
backFillTimer.ObserveDuration()
}
return d, h
}
Expand All @@ -102,12 +132,18 @@ func (m *multiLevelCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v
}

func (m *multiLevelCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULID, ids []storage.SeriesRef, tenant string) (hits map[storage.SeriesRef][]byte, misses []storage.SeriesRef) {
timer := prometheus.NewTimer(m.fetchLatency.WithLabelValues(cacheTypeSeries))
defer timer.ObserveDuration()

misses = ids
hits = map[storage.SeriesRef][]byte{}
backfillMap := map[storecache.IndexCache][]map[storage.SeriesRef][]byte{}

for i, c := range m.caches {
backfillMap[c] = []map[storage.SeriesRef][]byte{}
if ctx.Err() != nil {
return
}
h, miss := c.FetchMultiSeries(ctx, blockID, misses, tenant)
misses = miss

Expand All @@ -125,9 +161,14 @@ func (m *multiLevelCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULI
}

defer func() {
backFillTimer := prometheus.NewTimer(m.backFillLatency.WithLabelValues(cacheTypeSeries))
defer backFillTimer.ObserveDuration()
for cache, hit := range backfillMap {
for _, values := range hit {
for m, b := range values {
if ctx.Err() != nil {
return
}
cache.StoreSeries(blockID, m, b, tenant)
}
}
Expand All @@ -137,11 +178,21 @@ func (m *multiLevelCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULI
return hits, misses
}

func newMultiLevelCache(c ...storecache.IndexCache) storecache.IndexCache {
// newMultiLevelCache combines the given caches into a multiLevelCache that
// consults them in order and backfills earlier levels on a hit in a later
// one. When exactly one cache is supplied it is returned as-is, so the
// single-level case pays no wrapping overhead and registers no extra metrics.
func newMultiLevelCache(reg prometheus.Registerer, c ...storecache.IndexCache) storecache.IndexCache {
	if len(c) == 1 {
		return c[0]
	}

	// Single bucket layout shared by the fetch and backfill histograms,
	// defined once so the two metrics cannot drift apart.
	durationBuckets := []float64{0.01, 0.1, 0.3, 0.6, 1, 3, 6, 10, 15, 20, 25, 30, 40, 50, 60, 90}

	return &multiLevelCache{
		caches: c,
		fetchLatency: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
			Name:    "cortex_store_multilevel_index_cache_fetch_duration_seconds",
			Help:    "Histogram to track latency to fetch items from multi level index cache",
			Buckets: durationBuckets,
		}, []string{"item_type"}),
		backFillLatency: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
			Name:    "cortex_store_multilevel_index_cache_backfill_duration_seconds",
			Help:    "Histogram to track latency to backfill items from multi level index cache",
			Buckets: durationBuckets,
		}, []string{"item_type"}),
	}
}
7 changes: 4 additions & 3 deletions pkg/storage/tsdb/multilevel_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func Test_MultiIndexCacheInstantiation(t *testing.T) {
},
},
},
expectedType: newMultiLevelCache(),
expectedType: &multiLevelCache{},
},
"instantiate multiples backends - inmemory/memcached": {
cfg: IndexCacheConfig{
Expand All @@ -55,7 +55,7 @@ func Test_MultiIndexCacheInstantiation(t *testing.T) {
},
},
},
expectedType: newMultiLevelCache(),
expectedType: &multiLevelCache{},
},
"should not allow duplicate backends": {
cfg: IndexCacheConfig{
Expand Down Expand Up @@ -256,7 +256,8 @@ func Test_MultiLevelCache(t *testing.T) {
t.Run(name, func(t *testing.T) {
m1 := newMockIndexCache(tc.m1MockedCalls)
m2 := newMockIndexCache(tc.m2MockedCalls)
c := newMultiLevelCache(m1, m2)
reg := prometheus.NewRegistry()
c := newMultiLevelCache(reg, m1, m2)
tc.call(c)
require.Equal(t, tc.m1ExpectedCalls, m1.calls)
require.Equal(t, tc.m2ExpectedCalls, m2.calls)
Expand Down

0 comments on commit d8230cf

Please sign in to comment.