Multi level cache: check context, add latency histogram #5596

Merged: 1 commit, Oct 24, 2023
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -70,6 +70,7 @@
* [ENHANCEMENT] Querier: Check context before notifying scheduler and frontend. #5565
* [ENHANCEMENT] QueryFrontend: Add metric for number of series requests. #5373
* [ENHANCEMENT] Store Gateway: Add histogram metrics for total time spent fetching series and chunks per request. #5573
+* [ENHANCEMENT] Store Gateway: Check context in multi level cache. Add `cortex_store_multilevel_index_cache_fetch_duration_seconds` and `cortex_store_multilevel_index_cache_backfill_duration_seconds` to measure fetch and backfill latency. #5596
* [BUGFIX] Ruler: Validate if rule group can be safely converted back to rule group yaml from protobuf message #5265
* [BUGFIX] Querier: Convert gRPC `ResourceExhausted` status code from store gateway to 422 limit error. #5286
* [BUGFIX] Alertmanager: Route web-ui requests to the alertmanager distributor when sharding is enabled. #5293
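
The `[ENHANCEMENT]` entry above names the two histograms this PR introduces. As a minimal, self-contained sketch of the `prometheus.NewTimer` idiom the diffs below rely on (the standalone registry, the shortened bucket list, and the sleep are illustrative stand-ins, not PR code):

```go
package main

import (
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

func main() {
	reg := prometheus.NewRegistry()

	// Same shape as the PR's fetch histogram: one series per item_type.
	fetchLatency := promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
		Name:    "cortex_store_multilevel_index_cache_fetch_duration_seconds",
		Help:    "Histogram to track latency to fetch items from multi level index cache",
		Buckets: []float64{0.01, 0.1, 0.3, 0.6, 1, 3, 6, 10},
	}, []string{"item_type"})

	// NewTimer records the elapsed time into the selected series when
	// ObserveDuration runs, which is why the PR can simply defer it.
	timer := prometheus.NewTimer(fetchLatency.WithLabelValues("Postings"))
	time.Sleep(20 * time.Millisecond) // stand-in for a cache fetch
	timer.ObserveDuration()

	mfs, _ := reg.Gather()
	fmt.Printf("%d metric family registered\n", len(mfs))
}
```

Each `WithLabelValues` call selects one labelled series, so fetch and backfill latency can be compared per item type (`Postings`, `ExpandedPostings`, `Series`).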
2 changes: 1 addition & 1 deletion pkg/storage/tsdb/index_cache.go
@@ -205,7 +205,7 @@ func NewIndexCache(cfg IndexCacheConfig, logger log.Logger, registerer prometheus.Registerer) (storecache.IndexCache, error) {
        }
    }

-    return newMultiLevelCache(caches...), nil
+    return newMultiLevelCache(registerer, caches...), nil
}

func newInMemoryIndexCache(cfg InMemoryIndexCacheConfig, logger log.Logger, registerer prometheus.Registerer) (storecache.IndexCache, error) {
53 changes: 52 additions & 1 deletion pkg/storage/tsdb/multilevel_cache.go
@@ -5,13 +5,24 @@ import (
    "sync"

    "github.com/oklog/ulid"
+    "github.com/prometheus/client_golang/prometheus"
+    "github.com/prometheus/client_golang/prometheus/promauto"
    "github.com/prometheus/prometheus/model/labels"
    "github.com/prometheus/prometheus/storage"
    storecache "github.com/thanos-io/thanos/pkg/store/cache"
)

+const (
+    cacheTypePostings         string = "Postings"
+    cacheTypeExpandedPostings string = "ExpandedPostings"
+    cacheTypeSeries           string = "Series"
+)
+
type multiLevelCache struct {
    caches []storecache.IndexCache
+
+    fetchLatency    *prometheus.HistogramVec
+    backFillLatency *prometheus.HistogramVec
}

func (m *multiLevelCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, tenant string) {
@@ -28,11 +39,17 @@ func (m *multiLevelCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, tenant string) {
}

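// FetchMultiPostings (below) probes the cache levels in order, asking each
// level only for the keys every earlier level missed; lower-level hits are
// queued in backfillMap for write-back into the faster levels, and ctx is
// checked before each level and before each backfill write so a cancelled
// request stops early.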
func (m *multiLevelCache) FetchMultiPostings(ctx context.Context, blockID ulid.ULID, keys []labels.Label, tenant string) (hits map[labels.Label][]byte, misses []labels.Label) {
+    timer := prometheus.NewTimer(m.fetchLatency.WithLabelValues(cacheTypePostings))
+    defer timer.ObserveDuration()
+
    misses = keys
    hits = map[labels.Label][]byte{}
    backfillMap := map[storecache.IndexCache][]map[labels.Label][]byte{}
    for i, c := range m.caches {
        backfillMap[c] = []map[labels.Label][]byte{}
+        if ctx.Err() != nil {
+            return
+        }
        h, mi := c.FetchMultiPostings(ctx, blockID, misses, tenant)
        misses = mi

@@ -50,9 +67,14 @@
    }

    defer func() {
+        backFillTimer := prometheus.NewTimer(m.backFillLatency.WithLabelValues(cacheTypePostings))
+        defer backFillTimer.ObserveDuration()
        for cache, hit := range backfillMap {
            for _, values := range hit {
                for l, b := range values {
+                    if ctx.Err() != nil {
+                        return
+                    }
                    cache.StorePostings(blockID, l, b, tenant)
                }
            }
@@ -76,10 +98,18 @@ func (m *multiLevelCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte, tenant string) {
}

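// FetchExpandedPostings (below) returns on the first hit; a hit at level i is
// backfilled only into the level directly above it (i-1), and that single
// write is timed by the backfill histogram.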
func (m *multiLevelCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers []*labels.Matcher, tenant string) ([]byte, bool) {
+    timer := prometheus.NewTimer(m.fetchLatency.WithLabelValues(cacheTypeExpandedPostings))
+    defer timer.ObserveDuration()
+
    for i, c := range m.caches {
+        if ctx.Err() != nil {
+            return nil, false
+        }
        if d, h := c.FetchExpandedPostings(ctx, blockID, matchers, tenant); h {
            if i > 0 {
+                backFillTimer := prometheus.NewTimer(m.backFillLatency.WithLabelValues(cacheTypeExpandedPostings))
                m.caches[i-1].StoreExpandedPostings(blockID, matchers, d, tenant)
+                backFillTimer.ObserveDuration()
            }
            return d, h
        }
@@ -102,12 +132,18 @@ func (m *multiLevelCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte, tenant string) {
}

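// FetchMultiSeries (below) mirrors FetchMultiPostings for series refs: probe
// each level with the remaining misses, collect hits for backfill, and bail
// out once the request context is cancelled.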
func (m *multiLevelCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULID, ids []storage.SeriesRef, tenant string) (hits map[storage.SeriesRef][]byte, misses []storage.SeriesRef) {
+    timer := prometheus.NewTimer(m.fetchLatency.WithLabelValues(cacheTypeSeries))
+    defer timer.ObserveDuration()
+
    misses = ids
    hits = map[storage.SeriesRef][]byte{}
    backfillMap := map[storecache.IndexCache][]map[storage.SeriesRef][]byte{}

    for i, c := range m.caches {
        backfillMap[c] = []map[storage.SeriesRef][]byte{}
+        if ctx.Err() != nil {
+            return
+        }
        h, miss := c.FetchMultiSeries(ctx, blockID, misses, tenant)
        misses = miss

@@ -125,9 +161,14 @@
    }

    defer func() {
+        backFillTimer := prometheus.NewTimer(m.backFillLatency.WithLabelValues(cacheTypeSeries))
+        defer backFillTimer.ObserveDuration()
        for cache, hit := range backfillMap {
            for _, values := range hit {
                for m, b := range values {
+                    if ctx.Err() != nil {
+                        return
+                    }
                    cache.StoreSeries(blockID, m, b, tenant)
                }
            }
@@ -137,11 +178,21 @@
    return hits, misses
}

-func newMultiLevelCache(c ...storecache.IndexCache) storecache.IndexCache {
+func newMultiLevelCache(reg prometheus.Registerer, c ...storecache.IndexCache) storecache.IndexCache {
    if len(c) == 1 {
        return c[0]
    }
    return &multiLevelCache{
        caches: c,
+        fetchLatency: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
+            Name:    "cortex_store_multilevel_index_cache_fetch_duration_seconds",
+            Help:    "Histogram to track latency to fetch items from multi level index cache",
+            Buckets: []float64{0.01, 0.1, 0.3, 0.6, 1, 3, 6, 10, 15, 20, 25, 30, 40, 50, 60, 90},
+        }, []string{"item_type"}),
+        backFillLatency: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
+            Name:    "cortex_store_multilevel_index_cache_backfill_duration_seconds",
+            Help:    "Histogram to track latency to backfill items from multi level index cache",
+            Buckets: []float64{0.01, 0.1, 0.3, 0.6, 1, 3, 6, 10, 15, 20, 25, 30, 40, 50, 60, 90},
+        }, []string{"item_type"}),
    }
}
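
For orientation, here is a hedged sketch of how the new wiring could be exercised in a package-local test. The `noopCache` stub and the test itself are hypothetical; only `newMultiLevelCache`, the interface shape, and the metric name come from the diff above. The PR's actual test changes follow below.

```go
package tsdb

import (
	"context"
	"testing"

	"github.com/oklog/ulid"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/storage"
	"github.com/stretchr/testify/require"
)

// noopCache satisfies the index cache interface with misses only (hypothetical).
type noopCache struct{}

func (noopCache) StorePostings(ulid.ULID, labels.Label, []byte, string) {}
func (noopCache) FetchMultiPostings(_ context.Context, _ ulid.ULID, keys []labels.Label, _ string) (map[labels.Label][]byte, []labels.Label) {
	return map[labels.Label][]byte{}, keys
}
func (noopCache) StoreExpandedPostings(ulid.ULID, []*labels.Matcher, []byte, string) {}
func (noopCache) FetchExpandedPostings(context.Context, ulid.ULID, []*labels.Matcher, string) ([]byte, bool) {
	return nil, false
}
func (noopCache) StoreSeries(ulid.ULID, storage.SeriesRef, []byte, string) {}
func (noopCache) FetchMultiSeries(_ context.Context, _ ulid.ULID, ids []storage.SeriesRef, _ string) (map[storage.SeriesRef][]byte, []storage.SeriesRef) {
	return map[storage.SeriesRef][]byte{}, ids
}

func TestMultiLevelCacheMetricsSketch(t *testing.T) {
	reg := prometheus.NewRegistry()
	c := newMultiLevelCache(reg, noopCache{}, noopCache{})

	// One fetch observes one sample in the fetch histogram ("Postings" series).
	c.FetchMultiPostings(context.Background(), ulid.ULID{}, []labels.Label{{Name: "a", Value: "b"}}, "tenant")

	n, err := testutil.GatherAndCount(reg, "cortex_store_multilevel_index_cache_fetch_duration_seconds")
	require.NoError(t, err)
	require.Equal(t, 1, n)
}
```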
7 changes: 4 additions & 3 deletions pkg/storage/tsdb/multilevel_cache_test.go
@@ -43,7 +43,7 @@ func Test_MultiIndexCacheInstantiation(t *testing.T) {
                    },
                },
            },
-            expectedType: newMultiLevelCache(),
+            expectedType: &multiLevelCache{},
        },
        "instantiate multiples backends - inmemory/memcached": {
            cfg: IndexCacheConfig{
@@ -55,7 +55,7 @@
                    },
                },
            },
-            expectedType: newMultiLevelCache(),
+            expectedType: &multiLevelCache{},
        },
        "should not allow duplicate backends": {
            cfg: IndexCacheConfig{
@@ -256,7 +256,8 @@ func Test_MultiLevelCache(t *testing.T) {
        t.Run(name, func(t *testing.T) {
            m1 := newMockIndexCache(tc.m1MockedCalls)
            m2 := newMockIndexCache(tc.m2MockedCalls)
-            c := newMultiLevelCache(m1, m2)
+            reg := prometheus.NewRegistry()
+            c := newMultiLevelCache(reg, m1, m2)
            tc.call(c)
            require.Equal(t, tc.m1ExpectedCalls, m1.calls)
            require.Equal(t, tc.m2ExpectedCalls, m2.calls)