Improve async processor handling enabled items, optimize code further #5686

Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -7,6 +7,7 @@
* [FEATURE] Ingester: Add per-tenant new metric `cortex_ingester_tsdb_data_replay_duration_seconds`. #5477
* [ENHANCEMENT] Store Gateway: Added `-store-gateway.enabled-tenants` and `-store-gateway.disabled-tenants` to explicitly enable or disable store-gateway for specific tenants. #5638
* [ENHANCEMENT] Upgraded Docker base images to `alpine:3.18`. #5684
* [ENHANCEMENT] Index Cache: Multi level cache adds config `max_backfill_items` to cap max items to backfill per async operation. #5686

## 1.16.0 2023-11-20

4 changes: 4 additions & 0 deletions docs/blocks-storage/querier.md
@@ -719,6 +719,10 @@ blocks_storage:
# CLI flag: -blocks-storage.bucket-store.index-cache.multilevel.max-async-buffer-size
[max_async_buffer_size: <int> | default = 10000]

# The maximum number of items to backfill per asynchronous operation.
# CLI flag: -blocks-storage.bucket-store.index-cache.multilevel.max-backfill-items
[max_backfill_items: <int> | default = 10000]

chunks_cache:
# Backend for chunks cache, if not empty. Supported values: memcached.
# CLI flag: -blocks-storage.bucket-store.chunks-cache.backend
4 changes: 4 additions & 0 deletions docs/blocks-storage/store-gateway.md
@@ -834,6 +834,10 @@ blocks_storage:
# CLI flag: -blocks-storage.bucket-store.index-cache.multilevel.max-async-buffer-size
[max_async_buffer_size: <int> | default = 10000]

# The maximum number of items to backfill per asynchronous operation.
# CLI flag: -blocks-storage.bucket-store.index-cache.multilevel.max-backfill-items
[max_backfill_items: <int> | default = 10000]

chunks_cache:
# Backend for chunks cache, if not empty. Supported values: memcached.
# CLI flag: -blocks-storage.bucket-store.chunks-cache.backend
4 changes: 4 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -1268,6 +1268,10 @@ bucket_store:
# CLI flag: -blocks-storage.bucket-store.index-cache.multilevel.max-async-buffer-size
[max_async_buffer_size: <int> | default = 10000]

# The maximum number of items to backfill per asynchronous operation.
# CLI flag: -blocks-storage.bucket-store.index-cache.multilevel.max-backfill-items
[max_backfill_items: <int> | default = 10000]

chunks_cache:
# Backend for chunks cache, if not empty. Supported values: memcached.
# CLI flag: -blocks-storage.bucket-store.chunks-cache.backend
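For orientation, here is a minimal YAML sketch of how the new setting sits next to the existing multi-level options. The values and the two-level `inmemory,memcached` backend are illustrative, and the key layout is assumed to follow the reference above:

```yaml
blocks_storage:
  bucket_store:
    index_cache:
      backend: "inmemory,memcached"   # comma-separated backends form the cache levels
      multilevel:
        max_async_concurrency: 50     # concurrent asynchronous backfill operations
        max_async_buffer_size: 10000  # enqueued asynchronous backfill operations
        max_backfill_items: 10000     # new: max items written per backfill operation
```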
2 changes: 1 addition & 1 deletion go.mod
@@ -80,6 +80,7 @@ require (
github.com/VictoriaMetrics/fastcache v1.12.1
github.com/cespare/xxhash/v2 v2.2.0
github.com/google/go-cmp v0.6.0
golang.org/x/exp v0.0.0-20231006140011-7918f672742d
google.golang.org/protobuf v1.31.0
)

@@ -217,7 +218,6 @@ require (
go4.org/intern v0.0.0-20230525184215-6c62f75575cb // indirect
go4.org/unsafe/assume-no-moving-gc v0.0.0-20230525183740-e7c30c78aeb2 // indirect
golang.org/x/crypto v0.15.0 // indirect
golang.org/x/exp v0.0.0-20231006140011-7918f672742d // indirect
golang.org/x/mod v0.13.0 // indirect
golang.org/x/oauth2 v0.14.0 // indirect
golang.org/x/sys v0.14.0 // indirect
21 changes: 11 additions & 10 deletions pkg/storage/tsdb/index_cache.go
@@ -44,6 +44,7 @@ var (
errNoIndexCacheAddresses = errors.New("no index cache backend addresses")
errInvalidMaxAsyncConcurrency = errors.New("invalid max_async_concurrency, must greater than 0")
errInvalidMaxAsyncBufferSize = errors.New("invalid max_async_buffer_size, must greater than 0")
errInvalidMaxBackfillItems = errors.New("invalid max_backfill_items, must greater than 0")
)

type IndexCacheConfig struct {
@@ -114,6 +115,7 @@ func (cfg *IndexCacheConfig) Validate() error {
type MultiLevelIndexCacheConfig struct {
MaxAsyncConcurrency int `yaml:"max_async_concurrency"`
MaxAsyncBufferSize int `yaml:"max_async_buffer_size"`
MaxBackfillItems int `yaml:"max_backfill_items"`
}

func (cfg *MultiLevelIndexCacheConfig) Validate() error {
@@ -123,12 +125,16 @@ func (cfg *MultiLevelIndexCacheConfig) Validate() error {
if cfg.MaxAsyncConcurrency <= 0 {
return errInvalidMaxAsyncConcurrency
}
if cfg.MaxBackfillItems <= 0 {
return errInvalidMaxBackfillItems
}
return nil
}

func (cfg *MultiLevelIndexCacheConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) {
f.IntVar(&cfg.MaxAsyncConcurrency, prefix+"max-async-concurrency", 50, "The maximum number of concurrent asynchronous operations can occur when backfilling cache items.")
f.IntVar(&cfg.MaxAsyncBufferSize, prefix+"max-async-buffer-size", 10000, "The maximum number of enqueued asynchronous operations allowed when backfilling cache items.")
f.IntVar(&cfg.MaxBackfillItems, prefix+"max-backfill-items", 10000, "The maximum number of items to backfill per asynchronous operation.")
}
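
As a quick illustrative snippet (not part of the PR) of the new validation rule, a non-positive `max_backfill_items` now fails `Validate()`:

```go
cfg := MultiLevelIndexCacheConfig{
	MaxAsyncConcurrency: 50,
	MaxAsyncBufferSize:  10000,
	MaxBackfillItems:    0, // invalid: must be greater than 0
}
if err := cfg.Validate(); err != nil {
	// err is errInvalidMaxBackfillItems
}
```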

type InMemoryIndexCacheConfig struct {
@@ -187,7 +193,7 @@ func NewIndexCache(cfg IndexCacheConfig, logger log.Logger, registerer prometheu
splitBackends := strings.Split(cfg.Backend, ",")
var (
caches []storecache.IndexCache
enabledItems []string
enabledItems [][]string
)

for i, backend := range splitBackends {
@@ -205,7 +211,7 @@ func NewIndexCache(cfg IndexCacheConfig, logger log.Logger, registerer prometheu
return c, err
}
caches = append(caches, c)
enabledItems = cfg.InMemory.EnabledItems
enabledItems = append(enabledItems, cfg.InMemory.EnabledItems)
case IndexCacheBackendMemcached:
c, err := newMemcachedIndexCacheClient(cfg.Memcached.ClientConfig, logger, registerer)
if err != nil {
@@ -217,7 +223,7 @@ func NewIndexCache(cfg IndexCacheConfig, logger log.Logger, registerer prometheu
return nil, err
}
caches = append(caches, cache)
enabledItems = cfg.Memcached.EnabledItems
enabledItems = append(enabledItems, cfg.Memcached.EnabledItems)
case IndexCacheBackendRedis:
c, err := newRedisIndexCacheClient(cfg.Redis.ClientConfig, logger, iReg)
if err != nil {
@@ -229,18 +235,13 @@ func NewIndexCache(cfg IndexCacheConfig, logger log.Logger, registerer prometheu
return nil, err
}
caches = append(caches, cache)
enabledItems = cfg.Redis.EnabledItems
enabledItems = append(enabledItems, cfg.Redis.EnabledItems)
default:
return nil, errUnsupportedIndexCacheBackend
}
if len(enabledItems) > 0 {
latestCache := caches[len(caches)-1]
cache := storecache.NewFilteredIndexCache(latestCache, enabledItems)
caches[len(caches)-1] = cache
}
}

return newMultiLevelCache(registerer, cfg.MultiLevel, caches...), nil
return newMultiLevelCache(registerer, cfg.MultiLevel, enabledItems, caches...), nil
}

func newInMemoryIndexCache(cfg InMemoryIndexCacheConfig, logger log.Logger, registerer prometheus.Registerer) (storecache.IndexCache, error) {
126 changes: 84 additions & 42 deletions pkg/storage/tsdb/multilevel_cache.go
@@ -12,6 +12,7 @@ import (
"github.com/prometheus/prometheus/storage"
"github.com/thanos-io/thanos/pkg/cacheutil"
storecache "github.com/thanos-io/thanos/pkg/store/cache"
"golang.org/x/exp/slices"
)

const (
@@ -21,18 +22,22 @@ const (
)

type multiLevelCache struct {
caches []storecache.IndexCache
postingsCaches, seriesCaches, expandedPostingCaches []storecache.IndexCache

fetchLatency *prometheus.HistogramVec
backFillLatency *prometheus.HistogramVec
backfillProcessor *cacheutil.AsyncOperationProcessor
backfillDroppedItems *prometheus.CounterVec
fetchLatency *prometheus.HistogramVec
backFillLatency *prometheus.HistogramVec
backfillProcessor *cacheutil.AsyncOperationProcessor
backfillDroppedPostings prometheus.Counter
backfillDroppedSeries prometheus.Counter
backfillDroppedExpandedPostings prometheus.Counter

maxBackfillItems int
}

func (m *multiLevelCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, tenant string) {
wg := sync.WaitGroup{}
wg.Add(len(m.caches))
for _, c := range m.caches {
wg.Add(len(m.postingsCaches))
for _, c := range m.postingsCaches {
cache := c
go func() {
defer wg.Done()
@@ -48,9 +53,9 @@ func (m *multiLevelCache) FetchMultiPostings(ctx context.Context, blockID ulid.U

misses = keys
hits = map[labels.Label][]byte{}
backfillItems := make([]map[labels.Label][]byte, len(m.caches)-1)
for i, c := range m.caches {
if i < len(m.caches)-1 {
backfillItems := make([]map[labels.Label][]byte, len(m.postingsCaches)-1)
for i, c := range m.postingsCaches {
if i < len(m.postingsCaches)-1 {
backfillItems[i] = map[labels.Label][]byte{}
}
if ctx.Err() != nil {
Expand All @@ -76,14 +81,23 @@ func (m *multiLevelCache) FetchMultiPostings(ctx context.Context, blockID ulid.U
backFillTimer := prometheus.NewTimer(m.backFillLatency.WithLabelValues(cacheTypePostings))
defer backFillTimer.ObserveDuration()
for i, values := range backfillItems {
Review comment (Member): Should we put this also on the "async task"?

for lbl, b := range values {
lbl := lbl
b := b
if err := m.backfillProcessor.EnqueueAsync(func() {
m.caches[i].StorePostings(blockID, lbl, b, tenant)
}); errors.Is(err, cacheutil.ErrAsyncBufferFull) {
m.backfillDroppedItems.WithLabelValues(cacheTypePostings).Inc()
i := i
values := values
if len(values) == 0 {
continue
}
if err := m.backfillProcessor.EnqueueAsync(func() {
cnt := 0
for lbl, b := range values {
m.postingsCaches[i].StorePostings(blockID, lbl, b, tenant)
cnt++
if cnt == m.maxBackfillItems {
m.backfillDroppedPostings.Add(float64(len(values) - cnt))
return
}
}
}); errors.Is(err, cacheutil.ErrAsyncBufferFull) {
m.backfillDroppedPostings.Add(float64(len(values)))
}
}
}()
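
To make the new backfill path above concrete, here is a standalone sketch (hypothetical names; the real code uses Thanos' `cacheutil.AsyncOperationProcessor` and Prometheus counters) of the pattern: one asynchronous operation is enqueued per lower cache level, writes stop once `max_backfill_items` is reached, and anything not written is counted as dropped.

```go
package main

import (
	"errors"
	"fmt"
)

// errAsyncBufferFull stands in for cacheutil.ErrAsyncBufferFull.
var errAsyncBufferFull = errors.New("the async buffer is full")

// enqueue stands in for AsyncOperationProcessor.EnqueueAsync. Here it runs the
// operation inline; the real processor hands it to a worker goroutine and may
// return errAsyncBufferFull when the queue is saturated.
func enqueue(op func()) error {
	op()
	return nil
}

// backfillLevel writes at most maxItems entries from values into store and
// reports how many entries were dropped, mirroring the loop added to
// FetchMultiPostings and FetchMultiSeries.
func backfillLevel(values map[string][]byte, maxItems int, store func(key string, val []byte)) (dropped int) {
	if len(values) == 0 {
		return 0 // nothing to backfill for this level
	}
	err := enqueue(func() {
		cnt := 0
		for k, v := range values {
			store(k, v)
			cnt++
			if cnt == maxItems {
				dropped = len(values) - cnt // cap reached: remaining items are dropped
				return
			}
		}
	})
	if errors.Is(err, errAsyncBufferFull) {
		dropped = len(values) // queue full: the whole batch is dropped
	}
	return dropped
}

func main() {
	level0 := map[string][]byte{}
	values := map[string][]byte{"a": []byte("1"), "b": []byte("2"), "c": []byte("3")}
	dropped := backfillLevel(values, 2, func(k string, v []byte) { level0[k] = v })
	fmt.Printf("stored=%d dropped=%d\n", len(level0), dropped) // stored=2 dropped=1
}
```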
@@ -93,8 +107,8 @@ func (m *multiLevelCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte, tenant string) {

func (m *multiLevelCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte, tenant string) {
wg := sync.WaitGroup{}
wg.Add(len(m.caches))
for _, c := range m.caches {
wg.Add(len(m.expandedPostingCaches))
for _, c := range m.expandedPostingCaches {
cache := c
go func() {
defer wg.Done()
@@ -108,17 +122,17 @@ func (m *multiLevelCache) FetchExpandedPostings(ctx context.Context, blockID uli
timer := prometheus.NewTimer(m.fetchLatency.WithLabelValues(cacheTypeExpandedPostings))
defer timer.ObserveDuration()

for i, c := range m.caches {
for i, c := range m.expandedPostingCaches {
if ctx.Err() != nil {
return nil, false
}
if d, h := c.FetchExpandedPostings(ctx, blockID, matchers, tenant); h {
if i > 0 {
backFillTimer := prometheus.NewTimer(m.backFillLatency.WithLabelValues(cacheTypeExpandedPostings))
if err := m.backfillProcessor.EnqueueAsync(func() {
m.caches[i-1].StoreExpandedPostings(blockID, matchers, d, tenant)
m.expandedPostingCaches[i-1].StoreExpandedPostings(blockID, matchers, d, tenant)
}); errors.Is(err, cacheutil.ErrAsyncBufferFull) {
m.backfillDroppedItems.WithLabelValues(cacheTypeExpandedPostings).Inc()
m.backfillDroppedExpandedPostings.Inc()
}
backFillTimer.ObserveDuration()
}
@@ -131,8 +145,8 @@ func (m *multiLevelCache) FetchExpandedPostings(ctx context.Context, blockID uli

func (m *multiLevelCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte, tenant string) {
wg := sync.WaitGroup{}
wg.Add(len(m.caches))
for _, c := range m.caches {
wg.Add(len(m.seriesCaches))
for _, c := range m.seriesCaches {
cache := c
go func() {
defer wg.Done()
@@ -148,10 +162,10 @@ func (m *multiLevelCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULI

misses = ids
hits = map[storage.SeriesRef][]byte{}
backfillItems := make([]map[storage.SeriesRef][]byte, len(m.caches)-1)
backfillItems := make([]map[storage.SeriesRef][]byte, len(m.seriesCaches)-1)

for i, c := range m.caches {
if i < len(m.caches)-1 {
for i, c := range m.seriesCaches {
if i < len(m.seriesCaches)-1 {
backfillItems[i] = map[storage.SeriesRef][]byte{}
}
if ctx.Err() != nil {
@@ -177,29 +191,57 @@ func (m *multiLevelCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULI
backFillTimer := prometheus.NewTimer(m.backFillLatency.WithLabelValues(cacheTypeSeries))
defer backFillTimer.ObserveDuration()
for i, values := range backfillItems {
for ref, b := range values {
ref := ref
b := b
if err := m.backfillProcessor.EnqueueAsync(func() {
m.caches[i].StoreSeries(blockID, ref, b, tenant)
}); errors.Is(err, cacheutil.ErrAsyncBufferFull) {
m.backfillDroppedItems.WithLabelValues(cacheTypeSeries).Inc()
i := i
values := values
if len(values) == 0 {
continue
}
if err := m.backfillProcessor.EnqueueAsync(func() {
cnt := 0
for ref, b := range values {
m.seriesCaches[i].StoreSeries(blockID, ref, b, tenant)
cnt++
if cnt == m.maxBackfillItems {
m.backfillDroppedSeries.Add(float64(len(values) - cnt))
return
}
}
}); errors.Is(err, cacheutil.ErrAsyncBufferFull) {
m.backfillDroppedSeries.Add(float64(len(values)))
}
}
}()

return hits, misses
}

func newMultiLevelCache(reg prometheus.Registerer, cfg MultiLevelIndexCacheConfig, c ...storecache.IndexCache) storecache.IndexCache {
func filterCachesByItem(enabledItems [][]string, cachedItem string, c ...storecache.IndexCache) []storecache.IndexCache {
filteredCaches := make([]storecache.IndexCache, 0, len(c))
for i := range enabledItems {
if len(enabledItems[i]) == 0 || slices.Contains(enabledItems[i], cachedItem) {
filteredCaches = append(filteredCaches, c[i])
}
}
return filteredCaches
}

func newMultiLevelCache(reg prometheus.Registerer, cfg MultiLevelIndexCacheConfig, enabledItems [][]string, c ...storecache.IndexCache) storecache.IndexCache {
if len(c) == 1 {
return c[0]
if len(enabledItems[0]) == 0 {
return c[0]
}
return storecache.NewFilteredIndexCache(c[0], enabledItems[0])
Review comment (Member): Do we still need this type? Can we not just use a multi-level cache with one level?

}

backfillDroppedItems := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_store_multilevel_index_cache_backfill_dropped_items_total",
Help: "Total number of items dropped due to async buffer full when backfilling multilevel cache ",
}, []string{"item_type"})
return &multiLevelCache{
caches: c,
backfillProcessor: cacheutil.NewAsyncOperationProcessor(cfg.MaxAsyncBufferSize, cfg.MaxAsyncConcurrency),
postingsCaches: filterCachesByItem(enabledItems, cacheTypePostings, c...),
seriesCaches: filterCachesByItem(enabledItems, cacheTypeSeries, c...),
expandedPostingCaches: filterCachesByItem(enabledItems, cacheTypeExpandedPostings, c...),
backfillProcessor: cacheutil.NewAsyncOperationProcessor(cfg.MaxAsyncBufferSize, cfg.MaxAsyncConcurrency),
fetchLatency: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
Name: "cortex_store_multilevel_index_cache_fetch_duration_seconds",
Help: "Histogram to track latency to fetch items from multi level index cache",
Expand All @@ -210,9 +252,9 @@ func newMultiLevelCache(reg prometheus.Registerer, cfg MultiLevelIndexCacheConfi
Help: "Histogram to track latency to backfill items from multi level index cache",
Buckets: []float64{0.01, 0.1, 0.3, 0.6, 1, 3, 6, 10, 15, 20, 25, 30, 40, 50, 60, 90},
}, []string{"item_type"}),
backfillDroppedItems: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_store_multilevel_index_cache_backfill_dropped_items_total",
Help: "Total number of items dropped due to async buffer full when backfilling multilevel cache ",
}, []string{"item_type"}),
backfillDroppedPostings: backfillDroppedItems.WithLabelValues(cacheTypePostings),
backfillDroppedSeries: backfillDroppedItems.WithLabelValues(cacheTypeSeries),
backfillDroppedExpandedPostings: backfillDroppedItems.WithLabelValues(cacheTypeExpandedPostings),
maxBackfillItems: cfg.MaxBackfillItems,
}
}
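
A small usage sketch of the selection rule behind `filterCachesByItem`: a level is kept for an item type when its enabled-items list is empty (everything enabled) or when it explicitly lists that type. Cache names stand in for `storecache.IndexCache` values so the example stays self-contained, and the item-type strings are assumed values for the `cacheType*` constants whose definitions are collapsed above.

```go
package main

import (
	"fmt"

	"golang.org/x/exp/slices"
)

const (
	cacheTypePostings         = "Postings"
	cacheTypeSeries           = "Series"
	cacheTypeExpandedPostings = "ExpandedPostings"
)

// filterByItem mirrors filterCachesByItem from the PR, operating on cache
// names instead of storecache.IndexCache values.
func filterByItem(enabledItems [][]string, cachedItem string, caches ...string) []string {
	filtered := make([]string, 0, len(caches))
	for i := range enabledItems {
		if len(enabledItems[i]) == 0 || slices.Contains(enabledItems[i], cachedItem) {
			filtered = append(filtered, caches[i])
		}
	}
	return filtered
}

func main() {
	// Level 0 (inmemory) has no restriction; level 1 (memcached) only caches postings.
	enabled := [][]string{{}, {cacheTypePostings}}
	caches := []string{"inmemory", "memcached"}

	fmt.Println(filterByItem(enabled, cacheTypePostings, caches...))         // [inmemory memcached]
	fmt.Println(filterByItem(enabled, cacheTypeSeries, caches...))           // [inmemory]
	fmt.Println(filterByItem(enabled, cacheTypeExpandedPostings, caches...)) // [inmemory]
}
```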