diff --git a/CHANGELOG.md b/CHANGELOG.md index 00ecd511bf..efcf471713 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ## master / unreleased * [CHANGE] Azure Storage: Upgraded objstore dependency and support Azure Workload Identity Authentication. Added `connection_string` to support authenticating via SAS token. Marked `msi_resource` config as deprecating. #5645 * [CHANGE] Store Gateway: Add a new fastcache based inmemory index cache. #5619 +* [CHANGE] Index Cache: Multi level cache backfilling operation becomes async. Added `-blocks-storage.bucket-store.index-cache.multilevel.max-async-concurrency` and `-blocks-storage.bucket-store.index-cache.multilevel.max-async-buffer-size` configs and metric `cortex_store_multilevel_index_cache_backfill_dropped_items_total` for number of dropped items. #5661 * [FEATURE] Ingester: Add per-tenant new metric `cortex_ingester_tsdb_data_replay_duration_seconds`. #5477 * [ENHANCEMENT] Store Gateway: Added `-store-gateway.enabled-tenants` and `-store-gateway.disabled-tenants` to explicitly enable or disable store-gateway for specific tenants. #5638 diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md index b1bdd3a90c..524a2699fa 100644 --- a/docs/blocks-storage/querier.md +++ b/docs/blocks-storage/querier.md @@ -708,6 +708,17 @@ blocks_storage: # CLI flag: -blocks-storage.bucket-store.index-cache.redis.enabled-items [enabled_items: | default = []] + multilevel: + # The maximum number of concurrent asynchronous operations can occur + # when backfilling cache items. + # CLI flag: -blocks-storage.bucket-store.index-cache.multilevel.max-async-concurrency + [max_async_concurrency: | default = 50] + + # The maximum number of enqueued asynchronous operations allowed when + # backfilling cache items. + # CLI flag: -blocks-storage.bucket-store.index-cache.multilevel.max-async-buffer-size + [max_async_buffer_size: | default = 10000] + chunks_cache: # Backend for chunks cache, if not empty. Supported values: memcached. # CLI flag: -blocks-storage.bucket-store.chunks-cache.backend diff --git a/docs/blocks-storage/store-gateway.md b/docs/blocks-storage/store-gateway.md index 1006712d86..5a83eecbb3 100644 --- a/docs/blocks-storage/store-gateway.md +++ b/docs/blocks-storage/store-gateway.md @@ -823,6 +823,17 @@ blocks_storage: # CLI flag: -blocks-storage.bucket-store.index-cache.redis.enabled-items [enabled_items: | default = []] + multilevel: + # The maximum number of concurrent asynchronous operations can occur + # when backfilling cache items. + # CLI flag: -blocks-storage.bucket-store.index-cache.multilevel.max-async-concurrency + [max_async_concurrency: | default = 50] + + # The maximum number of enqueued asynchronous operations allowed when + # backfilling cache items. + # CLI flag: -blocks-storage.bucket-store.index-cache.multilevel.max-async-buffer-size + [max_async_buffer_size: | default = 10000] + chunks_cache: # Backend for chunks cache, if not empty. Supported values: memcached. 
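The two knobs documented above are plain integer flags with defaults of 50 and 10000. As a quick illustration only (the actual Cortex registration lives in `MultiLevelIndexCacheConfig.RegisterFlagsWithPrefix` later in this patch), here is a minimal sketch of how the flag names and defaults line up:

```go
package main

import (
	"flag"
	"fmt"
)

func main() {
	fs := flag.NewFlagSet("store-gateway", flag.ContinueOnError)
	prefix := "blocks-storage.bucket-store.index-cache.multilevel."
	concurrency := fs.Int(prefix+"max-async-concurrency", 50, "Maximum number of concurrent asynchronous operations that can occur when backfilling cache items.")
	bufferSize := fs.Int(prefix+"max-async-buffer-size", 10000, "Maximum number of enqueued asynchronous operations allowed when backfilling cache items.")
	_ = fs.Parse([]string{})               // no overrides: defaults apply
	fmt.Println(*concurrency, *bufferSize) // 50 10000
}
```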
# CLI flag: -blocks-storage.bucket-store.chunks-cache.backend diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index e0f5871201..c2c07da5e4 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -1257,6 +1257,17 @@ bucket_store: # CLI flag: -blocks-storage.bucket-store.index-cache.redis.enabled-items [enabled_items: | default = []] + multilevel: + # The maximum number of concurrent asynchronous operations can occur when + # backfilling cache items. + # CLI flag: -blocks-storage.bucket-store.index-cache.multilevel.max-async-concurrency + [max_async_concurrency: | default = 50] + + # The maximum number of enqueued asynchronous operations allowed when + # backfilling cache items. + # CLI flag: -blocks-storage.bucket-store.index-cache.multilevel.max-async-buffer-size + [max_async_buffer_size: | default = 10000] + chunks_cache: # Backend for chunks cache, if not empty. Supported values: memcached. # CLI flag: -blocks-storage.bucket-store.chunks-cache.backend diff --git a/go.mod b/go.mod index 6425537f04..1d506a0e08 100644 --- a/go.mod +++ b/go.mod @@ -53,7 +53,7 @@ require ( github.com/stretchr/testify v1.8.4 github.com/thanos-io/objstore v0.0.0-20231112185854-37752ee64d98 github.com/thanos-io/promql-engine v0.0.0-20231013104847-4517c0d5f591 - github.com/thanos-io/thanos v0.32.5-0.20231103115946-463a6ce8b53c + github.com/thanos-io/thanos v0.32.5-0.20231120163350-0a4f5ae3310e github.com/uber/jaeger-client-go v2.30.0+incompatible github.com/weaveworks/common v0.0.0-20221201103051-7c2720a9024d go.etcd.io/etcd/api/v3 v3.5.10 diff --git a/go.sum b/go.sum index 8b3fa220a8..e189527e9a 100644 --- a/go.sum +++ b/go.sum @@ -1519,8 +1519,8 @@ github.com/thanos-io/objstore v0.0.0-20231112185854-37752ee64d98 h1:gx2MTto1UQRu github.com/thanos-io/objstore v0.0.0-20231112185854-37752ee64d98/go.mod h1:JauBAcJ61tRSv9widgISVmA6akQXDeUMXBrVmWW4xog= github.com/thanos-io/promql-engine v0.0.0-20231013104847-4517c0d5f591 h1:6bZbFM+Mvy2kL8BeL8TJ5+5pV3sUR2PSLaZyw911rtQ= github.com/thanos-io/promql-engine v0.0.0-20231013104847-4517c0d5f591/go.mod h1:vfXJv1JXNdLfHnjsHsLLJl5tyI7KblF76Wo5lZ9YC4Q= -github.com/thanos-io/thanos v0.32.5-0.20231103115946-463a6ce8b53c h1:hMpXd1ybZB/vnR3+zex93va42rQ++2E0qi2wVSf3AwY= -github.com/thanos-io/thanos v0.32.5-0.20231103115946-463a6ce8b53c/go.mod h1:q+0MQPBugkBKZBFSOec4WV4EcuKJU6tgMI0i4M2znpY= +github.com/thanos-io/thanos v0.32.5-0.20231120163350-0a4f5ae3310e h1:ej5fKlojY+r8qty//Q4b7nyNA4QEkJ5uWms77Itf75E= +github.com/thanos-io/thanos v0.32.5-0.20231120163350-0a4f5ae3310e/go.mod h1:qeDC74QOf5hWzTlvIrLT8WlNGg67nORFON0T2VF4qgg= github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab h1:7ZR3hmisBWw77ZpO1/o86g+JV3VKlk3d48jopJxzTjU= github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab/go.mod h1:eheTFp954zcWZXCU8d0AT76ftsQOTo4DTqkN/h3k1MY= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index f0394b814e..8e5ca5f76a 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -256,7 +256,8 @@ type userTSDB struct { lastUpdate atomic.Int64 // Thanos shipper used to ship blocks to the storage. - shipper Shipper + shipper Shipper + shipperMetadataFilePath string // When deletion marker is found for the tenant (checked before shipping), // shipping stops and TSDB is closed before reaching idle timeout time (if enabled). 
@@ -435,7 +436,7 @@ func (u *userTSDB) blocksToDelete(blocks []*tsdb.Block) map[ulid.ULID]struct{} { // updateCachedShipperBlocks reads the shipper meta file and updates the cached shipped blocks. func (u *userTSDB) updateCachedShippedBlocks() error { - shipperMeta, err := shipper.ReadMetaFile(u.db.Dir()) + shipperMeta, err := shipper.ReadMetaFile(u.shipperMetadataFilePath) if os.IsNotExist(err) || os.IsNotExist(errors.Cause(err)) { // If the meta file doesn't exist it means the shipper hasn't run yet. shipperMeta = &shipper.Meta{} @@ -606,7 +607,7 @@ func newTSDBState(bucketClient objstore.Bucket, registerer prometheus.Registerer } } -// NewV2 returns a new Ingester that uses Cortex block storage instead of chunks storage. +// New returns a new Ingester that uses Cortex block storage instead of chunks storage. func New(cfg Config, limits *validation.Overrides, registerer prometheus.Registerer, logger log.Logger) (*Ingester, error) { defaultInstanceLimits = &cfg.DefaultLimits if cfg.ingesterClientFactory == nil { @@ -2050,7 +2051,9 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) { }, true, // Allow out of order uploads. It's fine in Cortex's context. metadata.NoneFunc, + "", ) + userDB.shipperMetadataFilePath = filepath.Join(userDB.db.Dir(), filepath.Clean(shipper.DefaultMetaFilename)) // Initialise the shipper blocks cache. if err := userDB.updateCachedShippedBlocks(); err != nil { diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index 132003f4ff..78dd53f7b9 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -2821,7 +2821,7 @@ func TestIngester_sholdUpdateCacheShippedBlocks(t *testing.T) { require.Equal(t, len(db.getCachedShippedBlocks()), 0) shippedBlock, _ := ulid.Parse("01D78XZ44G0000000000000000") - require.NoError(t, shipper.WriteMetaFile(log.NewNopLogger(), db.db.Dir(), &shipper.Meta{ + require.NoError(t, shipper.WriteMetaFile(log.NewNopLogger(), db.shipperMetadataFilePath, &shipper.Meta{ Version: shipper.MetaVersion1, Uploaded: []ulid.ULID{shippedBlock}, })) @@ -2858,7 +2858,7 @@ func TestIngester_closeAndDeleteUserTSDBIfIdle_shouldNotCloseTSDBIfShippingIsInP // Mock the shipper meta (no blocks). db := i.getTSDB(userID) - require.NoError(t, shipper.WriteMetaFile(log.NewNopLogger(), db.db.Dir(), &shipper.Meta{ + require.NoError(t, shipper.WriteMetaFile(log.NewNopLogger(), db.shipperMetadataFilePath, &shipper.Meta{ Version: shipper.MetaVersion1, })) @@ -3788,7 +3788,7 @@ func TestIngesterNotDeleteUnshippedBlocks(t *testing.T) { `, oldBlocks[0].Meta().ULID.Time()/1000)), "cortex_ingester_oldest_unshipped_block_timestamp_seconds")) // Saying that we have shipped the second block, so only that should get deleted. - require.Nil(t, shipper.WriteMetaFile(nil, db.db.Dir(), &shipper.Meta{ + require.Nil(t, shipper.WriteMetaFile(nil, db.shipperMetadataFilePath, &shipper.Meta{ Version: shipper.MetaVersion1, Uploaded: []ulid.ULID{oldBlocks[1].Meta().ULID}, })) @@ -3816,7 +3816,7 @@ func TestIngesterNotDeleteUnshippedBlocks(t *testing.T) { `, newBlocks[0].Meta().ULID.Time()/1000)), "cortex_ingester_oldest_unshipped_block_timestamp_seconds")) // Shipping 2 more blocks, hence all the blocks from first round. 
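The ingester change above replaces every `db.db.Dir()` call site with a precomputed `shipperMetadataFilePath`. A small stand-alone sketch of how that path is derived (the tenant directory below is a made-up example):

```go
package main

import (
	"fmt"
	"path/filepath"
)

// defaultMetaFilename mirrors shipper.DefaultMetaFilename introduced by this patch.
const defaultMetaFilename = "thanos.shipper.json"

// shipperMetaPath reproduces the filepath.Join(dir, filepath.Clean(name)) expression
// used when createTSDB fills in userDB.shipperMetadataFilePath.
func shipperMetaPath(tsdbDir string) string {
	return filepath.Join(tsdbDir, filepath.Clean(defaultMetaFilename))
}

func main() {
	fmt.Println(shipperMetaPath("/data/tsdb/user-1")) // /data/tsdb/user-1/thanos.shipper.json
}
```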
- require.Nil(t, shipper.WriteMetaFile(nil, db.db.Dir(), &shipper.Meta{ + require.Nil(t, shipper.WriteMetaFile(nil, db.shipperMetadataFilePath, &shipper.Meta{ Version: shipper.MetaVersion1, Uploaded: []ulid.ULID{oldBlocks[1].Meta().ULID, newBlocks[0].Meta().ULID, newBlocks[1].Meta().ULID}, })) diff --git a/pkg/storage/tsdb/index_cache.go b/pkg/storage/tsdb/index_cache.go index 6668d81787..b52532bce6 100644 --- a/pkg/storage/tsdb/index_cache.go +++ b/pkg/storage/tsdb/index_cache.go @@ -42,13 +42,16 @@ var ( errUnsupportedIndexCacheBackend = errors.New("unsupported index cache backend") errDuplicatedIndexCacheBackend = errors.New("duplicated index cache backend") errNoIndexCacheAddresses = errors.New("no index cache backend addresses") + errInvalidMaxAsyncConcurrency = errors.New("invalid max_async_concurrency, must greater than 0") + errInvalidMaxAsyncBufferSize = errors.New("invalid max_async_buffer_size, must greater than 0") ) type IndexCacheConfig struct { - Backend string `yaml:"backend"` - InMemory InMemoryIndexCacheConfig `yaml:"inmemory"` - Memcached MemcachedIndexCacheConfig `yaml:"memcached"` - Redis RedisIndexCacheConfig `yaml:"redis"` + Backend string `yaml:"backend"` + InMemory InMemoryIndexCacheConfig `yaml:"inmemory"` + Memcached MemcachedIndexCacheConfig `yaml:"memcached"` + Redis RedisIndexCacheConfig `yaml:"redis"` + MultiLevel MultiLevelIndexCacheConfig `yaml:"multilevel"` } func (cfg *IndexCacheConfig) RegisterFlags(f *flag.FlagSet) { @@ -64,6 +67,7 @@ func (cfg *IndexCacheConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix str cfg.InMemory.RegisterFlagsWithPrefix(f, prefix+"inmemory.") cfg.Memcached.RegisterFlagsWithPrefix(f, prefix+"memcached.") cfg.Redis.RegisterFlagsWithPrefix(f, prefix+"redis.") + cfg.MultiLevel.RegisterFlagsWithPrefix(f, prefix+"multilevel.") } // Validate the config. 
@@ -72,6 +76,12 @@ func (cfg *IndexCacheConfig) Validate() error { splitBackends := strings.Split(cfg.Backend, ",") configuredBackends := map[string]struct{}{} + if len(splitBackends) > 1 { + if err := cfg.MultiLevel.Validate(); err != nil { + return err + } + } + for _, backend := range splitBackends { if !util.StringsContain(supportedIndexCacheBackends, backend) { return errUnsupportedIndexCacheBackend @@ -101,6 +111,26 @@ func (cfg *IndexCacheConfig) Validate() error { return nil } +type MultiLevelIndexCacheConfig struct { + MaxAsyncConcurrency int `yaml:"max_async_concurrency"` + MaxAsyncBufferSize int `yaml:"max_async_buffer_size"` +} + +func (cfg *MultiLevelIndexCacheConfig) Validate() error { + if cfg.MaxAsyncBufferSize <= 0 { + return errInvalidMaxAsyncBufferSize + } + if cfg.MaxAsyncConcurrency <= 0 { + return errInvalidMaxAsyncConcurrency + } + return nil +} + +func (cfg *MultiLevelIndexCacheConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) { + f.IntVar(&cfg.MaxAsyncConcurrency, prefix+"max-async-concurrency", 50, "The maximum number of concurrent asynchronous operations can occur when backfilling cache items.") + f.IntVar(&cfg.MaxAsyncBufferSize, prefix+"max-async-buffer-size", 10000, "The maximum number of enqueued asynchronous operations allowed when backfilling cache items.") +} + type InMemoryIndexCacheConfig struct { MaxSizeBytes uint64 `yaml:"max_size_bytes"` EnabledItems []string `yaml:"enabled_items"` @@ -210,7 +240,7 @@ func NewIndexCache(cfg IndexCacheConfig, logger log.Logger, registerer prometheu } } - return newMultiLevelCache(registerer, caches...), nil + return newMultiLevelCache(registerer, cfg.MultiLevel, caches...), nil } func newInMemoryIndexCache(cfg InMemoryIndexCacheConfig, logger log.Logger, registerer prometheus.Registerer) (storecache.IndexCache, error) { diff --git a/pkg/storage/tsdb/multilevel_cache.go b/pkg/storage/tsdb/multilevel_cache.go index 76d9c4c63e..59365ad9e0 100644 --- a/pkg/storage/tsdb/multilevel_cache.go +++ b/pkg/storage/tsdb/multilevel_cache.go @@ -2,6 +2,7 @@ package tsdb import ( "context" + "errors" "sync" "github.com/oklog/ulid" @@ -9,6 +10,7 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" + "github.com/thanos-io/thanos/pkg/cacheutil" storecache "github.com/thanos-io/thanos/pkg/store/cache" ) @@ -21,8 +23,10 @@ const ( type multiLevelCache struct { caches []storecache.IndexCache - fetchLatency *prometheus.HistogramVec - backFillLatency *prometheus.HistogramVec + fetchLatency *prometheus.HistogramVec + backFillLatency *prometheus.HistogramVec + backfillProcessor *cacheutil.AsyncOperationProcessor + backfillDroppedItems *prometheus.CounterVec } func (m *multiLevelCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, tenant string) { @@ -44,9 +48,11 @@ func (m *multiLevelCache) FetchMultiPostings(ctx context.Context, blockID ulid.U misses = keys hits = map[labels.Label][]byte{} - backfillMap := map[storecache.IndexCache][]map[labels.Label][]byte{} + backfillItems := make([][]map[labels.Label][]byte, len(m.caches)-1) for i, c := range m.caches { - backfillMap[c] = []map[labels.Label][]byte{} + if i < len(m.caches)-1 { + backfillItems[i] = []map[labels.Label][]byte{} + } if ctx.Err() != nil { return } @@ -58,7 +64,7 @@ func (m *multiLevelCache) FetchMultiPostings(ctx context.Context, blockID ulid.U } if i > 0 { - backfillMap[m.caches[i-1]] = append(backfillMap[m.caches[i-1]], h) + backfillItems[i-1] = 
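The validation hunk above only exercises `MultiLevel.Validate()` when the comma-separated backend list has more than one entry, so single-level deployments are unaffected by the new required-positive checks. A simplified stand-alone sketch of that shape (error text paraphrased, names trimmed):

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

var errInvalidMaxAsyncConcurrency = errors.New("invalid max_async_concurrency, must be greater than 0")

// validate mimics the shape of the added check: multilevel settings are only
// validated when more than one backend is configured.
func validate(backend string, maxAsyncConcurrency int) error {
	if len(strings.Split(backend, ",")) > 1 && maxAsyncConcurrency <= 0 {
		return errInvalidMaxAsyncConcurrency
	}
	return nil
}

func main() {
	fmt.Println(validate("inmemory", 0))           // <nil>: single level, multilevel config is ignored
	fmt.Println(validate("inmemory,memcached", 0)) // invalid max_async_concurrency, must be greater than 0
}
```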
append(backfillItems[i-1], h) } if len(misses) == 0 { @@ -69,13 +75,14 @@ func (m *multiLevelCache) FetchMultiPostings(ctx context.Context, blockID ulid.U defer func() { backFillTimer := prometheus.NewTimer(m.backFillLatency.WithLabelValues(cacheTypePostings)) defer backFillTimer.ObserveDuration() - for cache, hit := range backfillMap { + for i, hit := range backfillItems { for _, values := range hit { - for l, b := range values { - if ctx.Err() != nil { - return + for lbl, b := range values { + if err := m.backfillProcessor.EnqueueAsync(func() { + m.caches[i].StorePostings(blockID, lbl, b, tenant) + }); errors.Is(err, cacheutil.ErrAsyncBufferFull) { + m.backfillDroppedItems.WithLabelValues(cacheTypePostings).Inc() } - cache.StorePostings(blockID, l, b, tenant) } } } @@ -108,7 +115,11 @@ func (m *multiLevelCache) FetchExpandedPostings(ctx context.Context, blockID uli if d, h := c.FetchExpandedPostings(ctx, blockID, matchers, tenant); h { if i > 0 { backFillTimer := prometheus.NewTimer(m.backFillLatency.WithLabelValues(cacheTypeExpandedPostings)) - m.caches[i-1].StoreExpandedPostings(blockID, matchers, d, tenant) + if err := m.backfillProcessor.EnqueueAsync(func() { + m.caches[i-1].StoreExpandedPostings(blockID, matchers, d, tenant) + }); errors.Is(err, cacheutil.ErrAsyncBufferFull) { + m.backfillDroppedItems.WithLabelValues(cacheTypeExpandedPostings).Inc() + } backFillTimer.ObserveDuration() } return d, h @@ -137,10 +148,12 @@ func (m *multiLevelCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULI misses = ids hits = map[storage.SeriesRef][]byte{} - backfillMap := map[storecache.IndexCache][]map[storage.SeriesRef][]byte{} + backfillItems := make([][]map[storage.SeriesRef][]byte, len(m.caches)-1) for i, c := range m.caches { - backfillMap[c] = []map[storage.SeriesRef][]byte{} + if i < len(m.caches)-1 { + backfillItems[i] = []map[storage.SeriesRef][]byte{} + } if ctx.Err() != nil { return } @@ -152,7 +165,7 @@ func (m *multiLevelCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULI } if i > 0 && len(h) > 0 { - backfillMap[m.caches[i-1]] = append(backfillMap[m.caches[i-1]], h) + backfillItems[i-1] = append(backfillItems[i-1], h) } if len(misses) == 0 { @@ -163,13 +176,14 @@ func (m *multiLevelCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULI defer func() { backFillTimer := prometheus.NewTimer(m.backFillLatency.WithLabelValues(cacheTypeSeries)) defer backFillTimer.ObserveDuration() - for cache, hit := range backfillMap { + for i, hit := range backfillItems { for _, values := range hit { - for m, b := range values { - if ctx.Err() != nil { - return + for ref, b := range values { + if err := m.backfillProcessor.EnqueueAsync(func() { + m.caches[i].StoreSeries(blockID, ref, b, tenant) + }); errors.Is(err, cacheutil.ErrAsyncBufferFull) { + m.backfillDroppedItems.WithLabelValues(cacheTypeSeries).Inc() } - cache.StoreSeries(blockID, m, b, tenant) } } } @@ -178,12 +192,14 @@ func (m *multiLevelCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULI return hits, misses } -func newMultiLevelCache(reg prometheus.Registerer, c ...storecache.IndexCache) storecache.IndexCache { +func newMultiLevelCache(reg prometheus.Registerer, cfg MultiLevelIndexCacheConfig, c ...storecache.IndexCache) storecache.IndexCache { if len(c) == 1 { return c[0] } + return &multiLevelCache{ - caches: c, + caches: c, + backfillProcessor: cacheutil.NewAsyncOperationProcessor(cfg.MaxAsyncBufferSize, cfg.MaxAsyncConcurrency), fetchLatency: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ 
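Note the bookkeeping change in the fetch paths: instead of a map keyed by the cache instance, backfill candidates are now collected into a slice with one bucket per lower level, indexed by `i-1`. A toy illustration of that indexing (cache names are placeholders):

```go
package main

import "fmt"

func main() {
	// Assumed three-level setup purely for illustration.
	caches := []string{"inmemory", "memcached", "redis"}

	// One bucket per lower level: a hit served by level i is queued for level i-1.
	backfillItems := make([][]string, len(caches)-1)
	for i, c := range caches {
		if i > 0 {
			backfillItems[i-1] = append(backfillItems[i-1], "hit served by "+c)
		}
	}
	fmt.Println(backfillItems) // [[hit served by memcached] [hit served by redis]]
}
```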
Name: "cortex_store_multilevel_index_cache_fetch_duration_seconds", Help: "Histogram to track latency to fetch items from multi level index cache", @@ -194,5 +210,9 @@ func newMultiLevelCache(reg prometheus.Registerer, c ...storecache.IndexCache) s Help: "Histogram to track latency to backfill items from multi level index cache", Buckets: []float64{0.01, 0.1, 0.3, 0.6, 1, 3, 6, 10, 15, 20, 25, 30, 40, 50, 60, 90}, }, []string{"item_type"}), + backfillDroppedItems: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_store_multilevel_index_cache_backfill_dropped_items_total", + Help: "Total number of items dropped due to async buffer full when backfilling multilevel cache ", + }, []string{"item_type"}), } } diff --git a/pkg/storage/tsdb/multilevel_cache_test.go b/pkg/storage/tsdb/multilevel_cache_test.go index c37e05391c..9ae5e92972 100644 --- a/pkg/storage/tsdb/multilevel_cache_test.go +++ b/pkg/storage/tsdb/multilevel_cache_test.go @@ -17,6 +17,10 @@ import ( ) func Test_MultiIndexCacheInstantiation(t *testing.T) { + multiLevelCfg := MultiLevelIndexCacheConfig{ + MaxAsyncBufferSize: 1, + MaxAsyncConcurrency: 1, + } s, err := miniredis.Run() if err != nil { testutil.Ok(t, err) @@ -42,6 +46,7 @@ func Test_MultiIndexCacheInstantiation(t *testing.T) { Addresses: s.Addr(), }, }, + MultiLevel: multiLevelCfg, }, expectedType: &multiLevelCache{}, }, @@ -54,12 +59,14 @@ func Test_MultiIndexCacheInstantiation(t *testing.T) { MaxAsyncConcurrency: 1000, }, }, + MultiLevel: multiLevelCfg, }, expectedType: &multiLevelCache{}, }, "should not allow duplicate backends": { cfg: IndexCacheConfig{ - Backend: "inmemory,inmemory", + Backend: "inmemory,inmemory", + MultiLevel: multiLevelCfg, }, expectedType: &storecache.InMemoryIndexCache{}, expectedValidationError: errDuplicatedIndexCacheBackend, @@ -84,6 +91,10 @@ func Test_MultiIndexCacheInstantiation(t *testing.T) { } func Test_MultiLevelCache(t *testing.T) { + cfg := MultiLevelIndexCacheConfig{ + MaxAsyncConcurrency: 10, + MaxAsyncBufferSize: 100000, + } bID, _ := ulid.Parse("01D78XZ44G0000000000000000") ctx := context.Background() l1 := labels.Label{ @@ -257,8 +268,11 @@ func Test_MultiLevelCache(t *testing.T) { m1 := newMockIndexCache(tc.m1MockedCalls) m2 := newMockIndexCache(tc.m2MockedCalls) reg := prometheus.NewRegistry() - c := newMultiLevelCache(reg, m1, m2) + c := newMultiLevelCache(reg, cfg, m1, m2) tc.call(c) + mlc := c.(*multiLevelCache) + // Wait until async operation finishes. + mlc.backfillProcessor.Stop() require.Equal(t, tc.m1ExpectedCalls, m1.calls) require.Equal(t, tc.m2ExpectedCalls, m2.calls) }) diff --git a/vendor/github.com/thanos-io/thanos/pkg/cacheutil/async_op.go b/vendor/github.com/thanos-io/thanos/pkg/cacheutil/async_op.go index 524d2fdba2..f03f1d0827 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/cacheutil/async_op.go +++ b/vendor/github.com/thanos-io/thanos/pkg/cacheutil/async_op.go @@ -5,9 +5,15 @@ package cacheutil import ( "sync" + + "github.com/pkg/errors" +) + +var ( + ErrAsyncBufferFull = errors.New("the async buffer is full") ) -type asyncOperationProcessor struct { +type AsyncOperationProcessor struct { // Channel used to notify internal goroutines when they should quit. stop chan struct{} @@ -18,8 +24,9 @@ type asyncOperationProcessor struct { workers sync.WaitGroup } -func newAsyncOperationProcessor(bufferSize, concurrency int) *asyncOperationProcessor { - p := &asyncOperationProcessor{ +// NewAsyncOperationProcessor creates an async processor with given bufferSize and concurrency. 
+func NewAsyncOperationProcessor(bufferSize, concurrency int) *AsyncOperationProcessor { + p := &AsyncOperationProcessor{ stop: make(chan struct{}, 1), asyncQueue: make(chan func(), bufferSize), } @@ -32,14 +39,14 @@ func newAsyncOperationProcessor(bufferSize, concurrency int) *asyncOperationProc return p } -func (p *asyncOperationProcessor) Stop() { +func (p *AsyncOperationProcessor) Stop() { close(p.stop) // Wait until all workers have terminated. p.workers.Wait() } -func (p *asyncOperationProcessor) asyncQueueProcessLoop() { +func (p *AsyncOperationProcessor) asyncQueueProcessLoop() { defer p.workers.Done() for { @@ -52,11 +59,12 @@ func (p *asyncOperationProcessor) asyncQueueProcessLoop() { } } -func (p *asyncOperationProcessor) enqueueAsync(op func()) error { +// EnqueueAsync enqueues op to async queue. If enqueue failed, ErrAsyncBufferFull is returned. +func (p *AsyncOperationProcessor) EnqueueAsync(op func()) error { select { case p.asyncQueue <- op: return nil default: - return errMemcachedAsyncBufferFull + return ErrAsyncBufferFull } } diff --git a/vendor/github.com/thanos-io/thanos/pkg/cacheutil/memcached_client.go b/vendor/github.com/thanos-io/thanos/pkg/cacheutil/memcached_client.go index 55d07d9d5a..29caaed02b 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/cacheutil/memcached_client.go +++ b/vendor/github.com/thanos-io/thanos/pkg/cacheutil/memcached_client.go @@ -40,7 +40,6 @@ const ( ) var ( - errMemcachedAsyncBufferFull = errors.New("the async buffer is full") errMemcachedConfigNoAddrs = errors.New("no memcached addresses provided") errMemcachedDNSUpdateIntervalNotPositive = errors.New("DNS provider update interval must be positive") errMemcachedMaxAsyncConcurrencyNotPositive = errors.New("max async concurrency must be positive") @@ -195,7 +194,7 @@ type memcachedClient struct { duration *prometheus.HistogramVec dataSize *prometheus.HistogramVec - p *asyncOperationProcessor + p *AsyncOperationProcessor } // AddressProvider performs node address resolution given a list of clusters. 
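The exported `AsyncOperationProcessor` above is the piece that makes backfilling asynchronous: store operations are enqueued and, when the buffer is full, dropped and counted rather than executed on the query path. A self-contained sketch of that pattern with simplified shutdown semantics (the real processor uses a dedicated stop channel; this sketch just closes the queue):

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

var errBufferFull = errors.New("the async buffer is full")

// asyncProcessor is a stripped-down stand-in for cacheutil.AsyncOperationProcessor:
// a bounded queue of closures drained by a fixed pool of workers.
type asyncProcessor struct {
	queue   chan func()
	workers sync.WaitGroup
}

func newAsyncProcessor(bufferSize, concurrency int) *asyncProcessor {
	p := &asyncProcessor{queue: make(chan func(), bufferSize)}
	p.workers.Add(concurrency)
	for i := 0; i < concurrency; i++ {
		go func() {
			defer p.workers.Done()
			for op := range p.queue {
				op()
			}
		}()
	}
	return p
}

// enqueue never blocks: a full buffer returns an error so the caller can count a drop,
// which is what the multilevel cache does with its new dropped-items counter.
func (p *asyncProcessor) enqueue(op func()) error {
	select {
	case p.queue <- op:
		return nil
	default:
		return errBufferFull
	}
}

// stop drains the queue and waits for the workers to finish.
func (p *asyncProcessor) stop() {
	close(p.queue)
	p.workers.Wait()
}

func main() {
	var stored, dropped atomic.Int64
	p := newAsyncProcessor(2, 1)
	for i := 0; i < 100; i++ {
		if err := p.enqueue(func() { stored.Add(1) }); errors.Is(err, errBufferFull) {
			dropped.Add(1)
		}
	}
	p.stop()
	fmt.Println(stored.Load()+dropped.Load() == 100) // true: every item is either stored or counted as dropped
}
```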
@@ -278,7 +277,7 @@ func newMemcachedClient( config.MaxGetMultiConcurrency, gate.Gets, ), - p: newAsyncOperationProcessor(config.MaxAsyncBufferSize, config.MaxAsyncConcurrency), + p: NewAsyncOperationProcessor(config.MaxAsyncBufferSize, config.MaxAsyncConcurrency), } c.clientInfo = promauto.With(reg).NewGaugeFunc(prometheus.GaugeOpts{ @@ -372,7 +371,7 @@ func (c *memcachedClient) SetAsync(key string, value []byte, ttl time.Duration) return nil } - err := c.p.enqueueAsync(func() { + err := c.p.EnqueueAsync(func() { start := time.Now() c.operations.WithLabelValues(opSet).Inc() @@ -400,7 +399,7 @@ func (c *memcachedClient) SetAsync(key string, value []byte, ttl time.Duration) c.duration.WithLabelValues(opSet).Observe(time.Since(start).Seconds()) }) - if errors.Is(err, errMemcachedAsyncBufferFull) { + if errors.Is(err, ErrAsyncBufferFull) { c.skipped.WithLabelValues(opSet, reasonAsyncBufferFull).Inc() level.Debug(c.logger).Log("msg", "failed to store item to memcached because the async buffer is full", "err", err, "size", len(c.p.asyncQueue)) return nil diff --git a/vendor/github.com/thanos-io/thanos/pkg/cacheutil/redis_client.go b/vendor/github.com/thanos-io/thanos/pkg/cacheutil/redis_client.go index 1ef3755ca7..032ed2942d 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/cacheutil/redis_client.go +++ b/vendor/github.com/thanos-io/thanos/pkg/cacheutil/redis_client.go @@ -149,7 +149,7 @@ type RedisClient struct { durationSetMulti prometheus.Observer durationGetMulti prometheus.Observer - p *asyncOperationProcessor + p *AsyncOperationProcessor } // NewRedisClient makes a new RedisClient. @@ -221,7 +221,7 @@ func NewRedisClientWithConfig(logger log.Logger, name string, config RedisClient client: client, config: config, logger: logger, - p: newAsyncOperationProcessor(config.MaxAsyncBufferSize, config.MaxAsyncConcurrency), + p: NewAsyncOperationProcessor(config.MaxAsyncBufferSize, config.MaxAsyncConcurrency), getMultiGate: gate.New( extprom.WrapRegistererWithPrefix("thanos_redis_getmulti_", reg), config.MaxGetMultiConcurrency, @@ -247,7 +247,7 @@ func NewRedisClientWithConfig(logger log.Logger, name string, config RedisClient // SetAsync implement RemoteCacheClient. func (c *RedisClient) SetAsync(key string, value []byte, ttl time.Duration) error { - return c.p.enqueueAsync(func() { + return c.p.EnqueueAsync(func() { start := time.Now() if err := c.client.Do(context.Background(), c.client.B().Set().Key(key).Value(rueidis.BinaryString(value)).ExSeconds(int64(ttl.Seconds())).Build()).Error(); err != nil { level.Warn(c.logger).Log("msg", "failed to set item into redis", "err", err, "key", key, "value_size", len(value)) diff --git a/vendor/github.com/thanos-io/thanos/pkg/compact/downsample/downsample.go b/vendor/github.com/thanos-io/thanos/pkg/compact/downsample/downsample.go index 829cc8d456..14963602e6 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/compact/downsample/downsample.go +++ b/vendor/github.com/thanos-io/thanos/pkg/compact/downsample/downsample.go @@ -808,11 +808,11 @@ type GatherNoDownsampleMarkFilter struct { } // NewGatherNoDownsampleMarkFilter creates GatherNoDownsampleMarkFilter. 
-func NewGatherNoDownsampleMarkFilter(logger log.Logger, bkt objstore.InstrumentedBucketReader) *GatherNoDownsampleMarkFilter { +func NewGatherNoDownsampleMarkFilter(logger log.Logger, bkt objstore.InstrumentedBucketReader, concurrency int) *GatherNoDownsampleMarkFilter { return &GatherNoDownsampleMarkFilter{ logger: logger, bkt: bkt, - concurrency: 1, + concurrency: concurrency, } } diff --git a/vendor/github.com/thanos-io/thanos/pkg/extkingpin/path_content_reloader.go b/vendor/github.com/thanos-io/thanos/pkg/extkingpin/path_content_reloader.go index e0d8fdf74b..e96b0ddb34 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/extkingpin/path_content_reloader.go +++ b/vendor/github.com/thanos-io/thanos/pkg/extkingpin/path_content_reloader.go @@ -8,6 +8,7 @@ import ( "crypto/sha256" "os" "path/filepath" + "sync" "time" "github.com/go-kit/log" @@ -79,15 +80,22 @@ func (p *pollingEngine) start(ctx context.Context) error { } type staticPathContent struct { - content []byte - path string + contentMtx sync.Mutex + content []byte + path string } var _ fileContent = (*staticPathContent)(nil) // Content returns the cached content. func (t *staticPathContent) Content() ([]byte, error) { - return t.content, nil + t.contentMtx.Lock() + defer t.contentMtx.Unlock() + + c := make([]byte, 0, len(t.content)) + c = append(c, t.content...) + + return c, nil } // Path returns the path to the file that contains the content. @@ -102,12 +110,15 @@ func NewStaticPathContent(fromPath string) (*staticPathContent, error) { if err != nil { return nil, errors.Wrapf(err, "could not load test content: %s", fromPath) } - return &staticPathContent{content, fromPath}, nil + return &staticPathContent{content: content, path: fromPath}, nil } // Rewrite rewrites the file backing this staticPathContent and swaps the local content cache. The file writing // is needed to trigger the file system monitor. func (t *staticPathContent) Rewrite(newContent []byte) error { + t.contentMtx.Lock() + defer t.contentMtx.Unlock() + t.content = newContent // Write the file to ensure possible file watcher reloaders get triggered. return os.WriteFile(t.path, newContent, 0666) diff --git a/vendor/github.com/thanos-io/thanos/pkg/shipper/shipper.go b/vendor/github.com/thanos-io/thanos/pkg/shipper/shipper.go index 69e84b963b..a3b520d15e 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/shipper/shipper.go +++ b/vendor/github.com/thanos-io/thanos/pkg/shipper/shipper.go @@ -69,11 +69,12 @@ func newMetrics(reg prometheus.Registerer) *metrics { // Shipper watches a directory for matching files and directories and uploads // them to a remote data store. 
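The `staticPathContent` change above is an ordinary read/write race fix: both `Content()` and `Rewrite()` now take a mutex, and `Content()` returns a copy so callers never share the underlying slice with a later rewrite. A minimal stand-alone version of that idea:

```go
package main

import (
	"fmt"
	"sync"
)

type staticContent struct {
	mtx     sync.Mutex
	content []byte
}

// Content hands back a copy taken under the lock, so a concurrent Rewrite
// cannot mutate bytes a reader is still holding.
func (s *staticContent) Content() []byte {
	s.mtx.Lock()
	defer s.mtx.Unlock()
	c := make([]byte, len(s.content))
	copy(c, s.content)
	return c
}

func (s *staticContent) Rewrite(b []byte) {
	s.mtx.Lock()
	defer s.mtx.Unlock()
	s.content = b
}

func main() {
	s := &staticContent{content: []byte("v1")}
	old := s.Content()
	s.Rewrite([]byte("v2"))
	fmt.Println(string(old), string(s.Content())) // v1 v2
}
```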
type Shipper struct { - logger log.Logger - dir string - metrics *metrics - bucket objstore.Bucket - source metadata.SourceType + logger log.Logger + dir string + metrics *metrics + bucket objstore.Bucket + source metadata.SourceType + metadataFilePath string uploadCompactedFunc func() bool allowOutOfOrderUploads bool @@ -96,6 +97,7 @@ func New( uploadCompactedFunc func() bool, allowOutOfOrderUploads bool, hashFunc metadata.HashFunc, + metaFileName string, ) *Shipper { if logger == nil { logger = log.NewNopLogger() @@ -104,6 +106,10 @@ func New( lbls = func() labels.Labels { return labels.EmptyLabels() } } + if metaFileName == "" { + metaFileName = DefaultMetaFilename + } + if uploadCompactedFunc == nil { uploadCompactedFunc = func() bool { return false @@ -119,6 +125,7 @@ func New( allowOutOfOrderUploads: allowOutOfOrderUploads, uploadCompactedFunc: uploadCompactedFunc, hashFunc: hashFunc, + metadataFilePath: filepath.Join(dir, filepath.Clean(metaFileName)), } } @@ -139,7 +146,7 @@ func (s *Shipper) getLabels() labels.Labels { // Timestamps returns the minimum timestamp for which data is available and the highest timestamp // of blocks that were successfully uploaded. func (s *Shipper) Timestamps() (minTime, maxSyncTime int64, err error) { - meta, err := ReadMetaFile(s.dir) + meta, err := ReadMetaFile(s.metadataFilePath) if err != nil { return 0, 0, errors.Wrap(err, "read shipper meta file") } @@ -247,7 +254,7 @@ func (c *lazyOverlapChecker) IsOverlapping(ctx context.Context, newMeta tsdb.Blo // // It is not concurrency-safe, however it is compactor-safe (running concurrently with compactor is ok). func (s *Shipper) Sync(ctx context.Context) (uploaded int, err error) { - meta, err := ReadMetaFile(s.dir) + meta, err := ReadMetaFile(s.metadataFilePath) if err != nil { // If we encounter any error, proceed with an empty meta file and overwrite it later. // The meta file is only used to avoid unnecessary bucket.Exists call, @@ -330,7 +337,7 @@ func (s *Shipper) Sync(ctx context.Context) (uploaded int, err error) { uploaded++ s.metrics.uploads.Inc() } - if err := WriteMetaFile(s.logger, s.dir, meta); err != nil { + if err := WriteMetaFile(s.logger, s.metadataFilePath, meta); err != nil { level.Warn(s.logger).Log("msg", "updating meta file failed", "err", err) } @@ -457,17 +464,16 @@ type Meta struct { } const ( - // MetaFilename is the known JSON filename for meta information. - MetaFilename = "thanos.shipper.json" + // DefaultMetaFilename is the default JSON filename for meta information. + DefaultMetaFilename = "thanos.shipper.json" // MetaVersion1 represents 1 version of meta. MetaVersion1 = 1 ) // WriteMetaFile writes the given meta into /thanos.shipper.json. -func WriteMetaFile(logger log.Logger, dir string, meta *Meta) error { +func WriteMetaFile(logger log.Logger, path string, meta *Meta) error { // Make any changes to the file appear atomic. - path := filepath.Join(dir, MetaFilename) tmp := path + ".tmp" f, err := os.Create(tmp) @@ -489,16 +495,15 @@ func WriteMetaFile(logger log.Logger, dir string, meta *Meta) error { } // ReadMetaFile reads the given meta from /thanos.shipper.json. 
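With `WriteMetaFile`/`ReadMetaFile` now taking a full file path rather than a directory, callers such as the ingester can point them at `shipperMetadataFilePath` directly. A simplified round-trip sketch (trimmed struct, basic error handling, and a plain write-then-rename in place of the library's own encoder helpers):

```go
package main

import (
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
)

type meta struct {
	Version  int      `json:"version"`
	Uploaded []string `json:"uploaded"`
}

func writeMetaFile(path string, m *meta) error {
	tmp := path + ".tmp"
	b, err := json.Marshal(m)
	if err != nil {
		return err
	}
	if err := os.WriteFile(tmp, b, 0o600); err != nil {
		return err
	}
	// Rename makes the update appear atomic to concurrent readers.
	return os.Rename(tmp, path)
}

func readMetaFile(path string) (*meta, error) {
	b, err := os.ReadFile(path)
	if err != nil {
		return nil, err
	}
	var m meta
	return &m, json.Unmarshal(b, &m)
}

func main() {
	path := filepath.Join(os.TempDir(), "thanos.shipper.json")
	_ = writeMetaFile(path, &meta{Version: 1, Uploaded: []string{"01D78XZ44G0000000000000000"}})
	m, _ := readMetaFile(path)
	fmt.Println(m.Version, m.Uploaded) // 1 [01D78XZ44G0000000000000000]
}
```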
-func ReadMetaFile(dir string) (*Meta, error) { - fpath := filepath.Join(dir, filepath.Clean(MetaFilename)) - b, err := os.ReadFile(fpath) +func ReadMetaFile(path string) (*Meta, error) { + b, err := os.ReadFile(path) if err != nil { - return nil, errors.Wrapf(err, "failed to read %s", fpath) + return nil, errors.Wrapf(err, "failed to read %s", path) } var m Meta if err := json.Unmarshal(b, &m); err != nil { - return nil, errors.Wrapf(err, "failed to parse %s as JSON: %q", fpath, string(b)) + return nil, errors.Wrapf(err, "failed to parse %s as JSON: %q", path, string(b)) } if m.Version != MetaVersion1 { return nil, errors.Errorf("unexpected meta file version %d", m.Version) diff --git a/vendor/github.com/thanos-io/thanos/pkg/store/prometheus.go b/vendor/github.com/thanos-io/thanos/pkg/store/prometheus.go index f9e836aca9..05af3b5d93 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/store/prometheus.go +++ b/vendor/github.com/thanos-io/thanos/pkg/store/prometheus.go @@ -189,15 +189,17 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Sto if err != nil { return err } + var b labels.Builder for _, lbm := range labelMaps { - lset := make([]labelpb.ZLabel, 0, len(lbm)+finalExtLset.Len()) + b.Reset(labels.EmptyLabels()) for k, v := range lbm { - lset = append(lset, labelpb.ZLabel{Name: k, Value: v}) + b.Set(k, v) } - lset = append(lset, labelpb.ZLabelsFromPromLabels(finalExtLset)...) - sort.Slice(lset, func(i, j int) bool { - return lset[i].Name < lset[j].Name + // external labels should take precedence + finalExtLset.Range(func(l labels.Label) { + b.Set(l.Name, l.Value) }) + lset := labelpb.ZLabelsFromPromLabels(b.Labels()) if err = s.Send(storepb.NewSeriesResponse(&storepb.Series{Labels: lset})); err != nil { return err } diff --git a/vendor/modules.txt b/vendor/modules.txt index 91b3e4a24c..c513f2a6d7 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -906,7 +906,7 @@ github.com/thanos-io/promql-engine/extlabels github.com/thanos-io/promql-engine/logicalplan github.com/thanos-io/promql-engine/query github.com/thanos-io/promql-engine/worker -# github.com/thanos-io/thanos v0.32.5-0.20231103115946-463a6ce8b53c +# github.com/thanos-io/thanos v0.32.5-0.20231120163350-0a4f5ae3310e ## explicit; go 1.21 github.com/thanos-io/thanos/pkg/block github.com/thanos-io/thanos/pkg/block/indexheader
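Finally, the `PrometheusStore.Series` change swaps manual slice building and sorting for `labels.Builder`, applying the external labels last so they win on name collisions and the result comes out already sorted. A stdlib-only sketch of that precedence rule (helper names invented here for illustration):

```go
package main

import (
	"fmt"
	"sort"
)

// mergeLabels applies external labels after the series labels,
// so an external label overrides a conflicting series label.
func mergeLabels(series, external map[string]string) []string {
	merged := map[string]string{}
	for k, v := range series {
		merged[k] = v
	}
	for k, v := range external { // external labels take precedence
		merged[k] = v
	}
	out := make([]string, 0, len(merged))
	for k, v := range merged {
		out = append(out, k+"="+v)
	}
	sort.Strings(out)
	return out
}

func main() {
	fmt.Println(mergeLabels(
		map[string]string{"job": "node", "cluster": "local"},
		map[string]string{"cluster": "prod"},
	)) // [cluster=prod job=node]
}
```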