From ce6aeba798bdd359c01ba2ea67b45894b97e7103 Mon Sep 17 00:00:00 2001 From: Filip Petkovski Date: Sat, 24 Feb 2024 20:52:24 +0100 Subject: [PATCH] Allow using different listing strategies (#7134) * Allow using different listing strategies Signed-off-by: Filip Petkovski * Expose flags for block list strategy Signed-off-by: Filip Petkovski * Run make docs Signed-off-by: Filip Petkovski * Fix whitespace Signed-off-by: Filip Petkovski * Add CHANGELOG entry Signed-off-by: Filip Petkovski --------- Signed-off-by: Filip Petkovski --- CHANGELOG.md | 1 + cmd/thanos/compact.go | 16 ++++- cmd/thanos/downsample.go | 2 +- cmd/thanos/main_test.go | 5 +- cmd/thanos/store.go | 25 ++++++- cmd/thanos/tools_bucket.go | 12 ++-- docs/components/compact.md | 9 +++ docs/components/store.md | 9 +++ pkg/block/fetcher.go | 117 +++++++++++++++++++++++++------- pkg/block/fetcher_test.go | 3 +- pkg/compact/clean_test.go | 3 +- pkg/compact/compact_e2e_test.go | 7 +- pkg/compact/retention_test.go | 3 +- pkg/replicate/replicator.go | 2 +- pkg/store/acceptance_test.go | 3 +- pkg/store/bucket_e2e_test.go | 2 +- pkg/store/bucket_test.go | 18 ++--- 17 files changed, 182 insertions(+), 55 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b36e94674e..e9ca7af64e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#7083](https://github.com/thanos-io/thanos/pull/7083) Store Gateway: Fix lazy expanded postings with 0 length failed to be cached. - [#7080](https://github.com/thanos-io/thanos/pull/7080) Receive: race condition in handler Close() when stopped early - [#7132](https://github.com/thanos-io/thanos/pull/7132) Documentation: fix broken helm installation instruction +- [#7134](https://github.com/thanos-io/thanos/pull/7134) Store, Compact: Revert the recursive block listing mechanism introduced in https://github.com/thanos-io/thanos/pull/6474 and use the same strategy as in 0.31. Introduce a `--block-discovery-strategy` flag to control the listing strategy so that a recursive lister can still be used if the tradeoff of slower but cheaper discovery is preferred. ### Added - [#7105](https://github.com/thanos-io/thanos/pull/7105) Rule: add flag `--query.enable-x-functions` to allow usage of extended promql functions (xrate, xincrease, xdelta) in loaded rules diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index 90a12850a9..a6364a3bee 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -239,8 +239,16 @@ func runCompact( consistencyDelayMetaFilter := block.NewConsistencyDelayMetaFilter(logger, conf.consistencyDelay, extprom.WrapRegistererWithPrefix("thanos_", reg)) timePartitionMetaFilter := block.NewTimePartitionMetaFilter(conf.filterConf.MinTime, conf.filterConf.MaxTime) - baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt) - baseMetaFetcher, err := block.NewBaseFetcher(logger, conf.blockMetaFetchConcurrency, insBkt, baseBlockIDsFetcher, conf.dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg)) + var blockLister block.Lister + switch syncStrategy(conf.blockListStrategy) { + case concurrentDiscovery: + blockLister = block.NewConcurrentLister(logger, insBkt) + case recursiveDiscovery: + blockLister = block.NewRecursiveLister(logger, insBkt) + default: + return errors.Errorf("unknown sync strategy %s", conf.blockListStrategy) + } + baseMetaFetcher, err := block.NewBaseFetcher(logger, conf.blockMetaFetchConcurrency, insBkt, blockLister, conf.dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg)) if err != nil { return errors.Wrap(err, "create meta fetcher") } @@ -695,6 +703,7 @@ type compactConfig struct { wait bool waitInterval time.Duration disableDownsampling bool + blockListStrategy string blockMetaFetchConcurrency int blockFilesConcurrency int blockViewerSyncBlockInterval time.Duration @@ -757,6 +766,9 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) { "as querying long time ranges without non-downsampled data is not efficient and useful e.g it is not possible to render all samples for a human eye anyway"). Default("false").BoolVar(&cc.disableDownsampling) + strategies := strings.Join([]string{string(concurrentDiscovery), string(recursiveDiscovery)}, ", ") + cmd.Flag("block-discovery-strategy", "One of "+strategies+". When set to concurrent, stores will concurrently issue one call per directory to discover active blocks in the bucket. The recursive strategy iterates through all objects in the bucket, recursively traversing into each directory. This avoids N+1 calls at the expense of having slower bucket iterations."). + Default(string(concurrentDiscovery)).StringVar(&cc.blockListStrategy) cmd.Flag("block-meta-fetch-concurrency", "Number of goroutines to use when fetching block metadata from object storage."). Default("32").IntVar(&cc.blockMetaFetchConcurrency) cmd.Flag("block-files-concurrency", "Number of goroutines to use when fetching/uploading block files from object storage."). diff --git a/cmd/thanos/downsample.go b/cmd/thanos/downsample.go index 6045bb29c1..3d6b129261 100644 --- a/cmd/thanos/downsample.go +++ b/cmd/thanos/downsample.go @@ -90,7 +90,7 @@ func RunDownsample( insBkt := objstoretracing.WrapWithTraces(objstore.WrapWithMetrics(bkt, extprom.WrapRegistererWithPrefix("thanos_", reg), bkt.Name())) // While fetching blocks, filter out blocks that were marked for no downsample. - baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt) + baseBlockIDsFetcher := block.NewConcurrentLister(logger, insBkt) metaFetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, insBkt, baseBlockIDsFetcher, "", extprom.WrapRegistererWithPrefix("thanos_", reg), []block.MetadataFilter{ block.NewDeduplicateFilter(block.FetcherConcurrency), downsample.NewGatherNoDownsampleMarkFilter(logger, insBkt, block.FetcherConcurrency), diff --git a/cmd/thanos/main_test.go b/cmd/thanos/main_test.go index d8d1fffef0..1ced04637b 100644 --- a/cmd/thanos/main_test.go +++ b/cmd/thanos/main_test.go @@ -21,6 +21,7 @@ import ( "github.com/thanos-io/objstore" "github.com/efficientgo/core/testutil" + "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/compact/downsample" @@ -157,7 +158,7 @@ func TestRegression4960_Deadlock(t *testing.T) { metrics := newDownsampleMetrics(prometheus.NewRegistry()) testutil.Equals(t, 0.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(meta.Thanos.GroupKey()))) - baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, bkt) + baseBlockIDsFetcher := block.NewConcurrentLister(logger, bkt) metaFetcher, err := block.NewMetaFetcher(nil, block.FetcherConcurrency, bkt, baseBlockIDsFetcher, "", nil, nil) testutil.Ok(t, err) @@ -197,7 +198,7 @@ func TestCleanupDownsampleCacheFolder(t *testing.T) { metrics := newDownsampleMetrics(prometheus.NewRegistry()) testutil.Equals(t, 0.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(meta.Thanos.GroupKey()))) - baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, bkt) + baseBlockIDsFetcher := block.NewConcurrentLister(logger, bkt) metaFetcher, err := block.NewMetaFetcher(nil, block.FetcherConcurrency, bkt, baseBlockIDsFetcher, "", nil, nil) testutil.Ok(t, err) diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index cb0160445e..3b92341745 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -7,6 +7,7 @@ import ( "context" "fmt" "strconv" + "strings" "time" "github.com/alecthomas/units" @@ -56,6 +57,13 @@ const ( retryIntervalDuration = 10 ) +type syncStrategy string + +const ( + concurrentDiscovery syncStrategy = "concurrent" + recursiveDiscovery syncStrategy = "recursive" +) + type storeConfig struct { indexCacheConfigs extflag.PathOrContent objStoreConfig extflag.PathOrContent @@ -74,6 +82,7 @@ type storeConfig struct { component component.StoreAPI debugLogging bool syncInterval time.Duration + blockListStrategy string blockSyncConcurrency int blockMetaFetchConcurrency int filterConf *store.FilterConfig @@ -137,6 +146,10 @@ func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) { cmd.Flag("sync-block-duration", "Repeat interval for syncing the blocks between local and remote view."). Default("15m").DurationVar(&sc.syncInterval) + strategies := strings.Join([]string{string(concurrentDiscovery), string(recursiveDiscovery)}, ", ") + cmd.Flag("block-discovery-strategy", "One of "+strategies+". When set to concurrent, stores will concurrently issue one call per directory to discover active blocks in the bucket. The recursive strategy iterates through all objects in the bucket, recursively traversing into each directory. This avoids N+1 calls at the expense of having slower bucket iterations."). + Default(string(concurrentDiscovery)).StringVar(&sc.blockListStrategy) + cmd.Flag("block-sync-concurrency", "Number of goroutines to use when constructing index-cache.json blocks from object storage. Must be equal or greater than 1."). Default("20").IntVar(&sc.blockSyncConcurrency) @@ -345,9 +358,17 @@ func runStore( return errors.Wrap(err, "create index cache") } + var blockLister block.Lister + switch syncStrategy(conf.blockListStrategy) { + case concurrentDiscovery: + blockLister = block.NewConcurrentLister(logger, insBkt) + case recursiveDiscovery: + blockLister = block.NewRecursiveLister(logger, insBkt) + default: + return errors.Errorf("unknown sync strategy %s", conf.blockListStrategy) + } ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, insBkt, time.Duration(conf.ignoreDeletionMarksDelay), conf.blockMetaFetchConcurrency) - baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt) - metaFetcher, err := block.NewMetaFetcher(logger, conf.blockMetaFetchConcurrency, insBkt, baseBlockIDsFetcher, dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg), + metaFetcher, err := block.NewMetaFetcher(logger, conf.blockMetaFetchConcurrency, insBkt, blockLister, dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg), []block.MetadataFilter{ block.NewTimePartitionMetaFilter(conf.filterConf.MinTime, conf.filterConf.MaxTime), block.NewLabelShardedMetaFilter(relabelConfig), diff --git a/cmd/thanos/tools_bucket.go b/cmd/thanos/tools_bucket.go index a266cb381f..326e4b09eb 100644 --- a/cmd/thanos/tools_bucket.go +++ b/cmd/thanos/tools_bucket.go @@ -365,7 +365,7 @@ func registerBucketVerify(app extkingpin.AppClause, objStoreConfig *extflag.Path // We ignore any block that has the deletion marker file. filters := []block.MetadataFilter{block.NewIgnoreDeletionMarkFilter(logger, insBkt, 0, block.FetcherConcurrency)} - baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt) + baseBlockIDsFetcher := block.NewConcurrentLister(logger, insBkt) fetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, insBkt, baseBlockIDsFetcher, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), filters) if err != nil { return err @@ -423,7 +423,7 @@ func registerBucketLs(app extkingpin.AppClause, objStoreConfig *extflag.PathOrCo ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, insBkt, 0, block.FetcherConcurrency) filters = append(filters, ignoreDeletionMarkFilter) } - baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt) + baseBlockIDsFetcher := block.NewConcurrentLister(logger, insBkt) fetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, insBkt, baseBlockIDsFetcher, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), filters) if err != nil { return err @@ -525,7 +525,7 @@ func registerBucketInspect(app extkingpin.AppClause, objStoreConfig *extflag.Pat } insBkt := objstoretracing.WrapWithTraces(objstore.WrapWithMetrics(bkt, extprom.WrapRegistererWithPrefix("thanos_", reg), bkt.Name())) - baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt) + baseBlockIDsFetcher := block.NewConcurrentLister(logger, insBkt) fetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, insBkt, baseBlockIDsFetcher, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil) if err != nil { return err @@ -669,7 +669,7 @@ func registerBucketWeb(app extkingpin.AppClause, objStoreConfig *extflag.PathOrC return err } // TODO(bwplotka): Allow Bucket UI to visualize the state of block as well. - baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt) + baseBlockIDsFetcher := block.NewConcurrentLister(logger, insBkt) fetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, insBkt, baseBlockIDsFetcher, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), []block.MetadataFilter{ block.NewTimePartitionMetaFilter(filterConf.MinTime, filterConf.MaxTime), @@ -848,7 +848,7 @@ func registerBucketCleanup(app extkingpin.AppClause, objStoreConfig *extflag.Pat var sy *compact.Syncer { - baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt) + baseBlockIDsFetcher := block.NewConcurrentLister(logger, insBkt) baseMetaFetcher, err := block.NewBaseFetcher(logger, tbc.blockSyncConcurrency, insBkt, baseBlockIDsFetcher, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg)) if err != nil { return errors.Wrap(err, "create meta fetcher") @@ -1391,7 +1391,7 @@ func registerBucketRetention(app extkingpin.AppClause, objStoreConfig *extflag.P var sy *compact.Syncer { - baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt) + baseBlockIDsFetcher := block.NewConcurrentLister(logger, insBkt) baseMetaFetcher, err := block.NewBaseFetcher(logger, tbc.blockSyncConcurrency, insBkt, baseBlockIDsFetcher, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg)) if err != nil { return errors.Wrap(err, "create meta fetcher") diff --git a/docs/components/compact.md b/docs/components/compact.md index d210dd55a8..91e6fd04c6 100644 --- a/docs/components/compact.md +++ b/docs/components/compact.md @@ -279,6 +279,15 @@ usage: thanos compact [] Continuously compacts blocks in an object store bucket. Flags: + --block-discovery-strategy="concurrent" + One of concurrent, recursive. When set to + concurrent, stores will concurrently issue + one call per directory to discover active + blocks in the bucket. The recursive strategy + iterates through all objects in the bucket, + recursively traversing into each directory. + This avoids N+1 calls at the expense of having + slower bucket iterations. --block-files-concurrency=1 Number of goroutines to use when fetching/uploading block files from object diff --git a/docs/components/store.md b/docs/components/store.md index 8ecc53d68f..cf96bfdf4c 100644 --- a/docs/components/store.md +++ b/docs/components/store.md @@ -29,6 +29,15 @@ Store node giving access to blocks in a bucket provider. Now supported GCS, S3, Azure, Swift, Tencent COS and Aliyun OSS. Flags: + --block-discovery-strategy="concurrent" + One of concurrent, recursive. When set to + concurrent, stores will concurrently issue + one call per directory to discover active + blocks in the bucket. The recursive strategy + iterates through all objects in the bucket, + recursively traversing into each directory. + This avoids N+1 calls at the expense of having + slower bucket iterations. --block-meta-fetch-concurrency=32 Number of goroutines to use when fetching block metadata from object storage. diff --git a/pkg/block/fetcher.go b/pkg/block/fetcher.go index 875909e182..fe9fc6c244 100644 --- a/pkg/block/fetcher.go +++ b/pkg/block/fetcher.go @@ -209,26 +209,27 @@ func DefaultModifiedLabelValues() [][]string { } } -// Fetcher interface to retieve blockId information from a bucket. -type BlockIDsFetcher interface { - // GetActiveBlocksIDs returning it via channel (streaming) and response. +// Lister lists block IDs from a bucket. +type Lister interface { + // GetActiveAndPartialBlockIDs GetActiveBlocksIDs returning it via channel (streaming) and response. // Active blocks are blocks which contain meta.json, while partial blocks are blocks without meta.json GetActiveAndPartialBlockIDs(ctx context.Context, ch chan<- ulid.ULID) (partialBlocks map[ulid.ULID]bool, err error) } -type BaseBlockIDsFetcher struct { +// RecursiveLister lists block IDs by recursively iterating through a bucket. +type RecursiveLister struct { logger log.Logger bkt objstore.InstrumentedBucketReader } -func NewBaseBlockIDsFetcher(logger log.Logger, bkt objstore.InstrumentedBucketReader) *BaseBlockIDsFetcher { - return &BaseBlockIDsFetcher{ +func NewRecursiveLister(logger log.Logger, bkt objstore.InstrumentedBucketReader) *RecursiveLister { + return &RecursiveLister{ logger: logger, bkt: bkt, } } -func (f *BaseBlockIDsFetcher) GetActiveAndPartialBlockIDs(ctx context.Context, ch chan<- ulid.ULID) (partialBlocks map[ulid.ULID]bool, err error) { +func (f *RecursiveLister) GetActiveAndPartialBlockIDs(ctx context.Context, ch chan<- ulid.ULID) (partialBlocks map[ulid.ULID]bool, err error) { partialBlocks = make(map[ulid.ULID]bool) err = f.bkt.Iter(ctx, "", func(name string) error { parts := strings.Split(name, "/") @@ -255,6 +256,74 @@ func (f *BaseBlockIDsFetcher) GetActiveAndPartialBlockIDs(ctx context.Context, c return partialBlocks, err } +// ConcurrentLister lists block IDs by doing a top level iteration of the bucket +// followed by one Exists call for each discovered block to detect partial blocks. +type ConcurrentLister struct { + logger log.Logger + bkt objstore.InstrumentedBucketReader +} + +func NewConcurrentLister(logger log.Logger, bkt objstore.InstrumentedBucketReader) *ConcurrentLister { + return &ConcurrentLister{ + logger: logger, + bkt: bkt, + } +} + +func (f *ConcurrentLister) GetActiveAndPartialBlockIDs(ctx context.Context, ch chan<- ulid.ULID) (partialBlocks map[ulid.ULID]bool, err error) { + const concurrency = 64 + + partialBlocks = make(map[ulid.ULID]bool) + var ( + metaChan = make(chan ulid.ULID, concurrency) + eg, gCtx = errgroup.WithContext(ctx) + mu sync.Mutex + ) + for i := 0; i < concurrency; i++ { + eg.Go(func() error { + for uid := range metaChan { + // TODO(bwplotka): If that causes problems (obj store rate limits), add longer ttl to cached items. + // For 1y and 100 block sources this generates ~1.5-3k HEAD RPM. AWS handles 330k RPM per prefix. + // TODO(bwplotka): Consider filtering by consistency delay here (can't do until compactor healthyOverride work). + metaFile := path.Join(uid.String(), MetaFilename) + ok, err := f.bkt.Exists(gCtx, metaFile) + if err != nil { + return errors.Wrapf(err, "meta.json file exists: %v", uid) + } + if !ok { + mu.Lock() + partialBlocks[uid] = true + mu.Unlock() + continue + } + ch <- uid + } + return nil + }) + } + + if err = f.bkt.Iter(ctx, "", func(name string) error { + id, ok := IsBlockDir(name) + if !ok { + return nil + } + select { + case <-ctx.Done(): + return ctx.Err() + case metaChan <- id: + } + return nil + }); err != nil { + return nil, err + } + close(metaChan) + + if err := eg.Wait(); err != nil { + return nil, err + } + return partialBlocks, nil +} + type MetadataFetcher interface { Fetch(ctx context.Context) (metas map[ulid.ULID]*metadata.Meta, partial map[ulid.ULID]error, err error) UpdateOnChange(func([]metadata.Meta, error)) @@ -273,10 +342,10 @@ type MetadataFilter interface { // BaseFetcher is a struct that synchronizes filtered metadata of all block in the object storage with the local state. // Go-routine safe. type BaseFetcher struct { - logger log.Logger - concurrency int - bkt objstore.InstrumentedBucketReader - blockIDsFetcher BlockIDsFetcher + logger log.Logger + concurrency int + bkt objstore.InstrumentedBucketReader + blockIDsLister Lister // Optional local directory to cache meta.json files. cacheDir string @@ -289,12 +358,12 @@ type BaseFetcher struct { } // NewBaseFetcher constructs BaseFetcher. -func NewBaseFetcher(logger log.Logger, concurrency int, bkt objstore.InstrumentedBucketReader, blockIDsFetcher BlockIDsFetcher, dir string, reg prometheus.Registerer) (*BaseFetcher, error) { +func NewBaseFetcher(logger log.Logger, concurrency int, bkt objstore.InstrumentedBucketReader, blockIDsFetcher Lister, dir string, reg prometheus.Registerer) (*BaseFetcher, error) { return NewBaseFetcherWithMetrics(logger, concurrency, bkt, blockIDsFetcher, dir, NewBaseFetcherMetrics(reg)) } // NewBaseFetcherWithMetrics constructs BaseFetcher. -func NewBaseFetcherWithMetrics(logger log.Logger, concurrency int, bkt objstore.InstrumentedBucketReader, blockIDsFetcher BlockIDsFetcher, dir string, metrics *BaseFetcherMetrics) (*BaseFetcher, error) { +func NewBaseFetcherWithMetrics(logger log.Logger, concurrency int, bkt objstore.InstrumentedBucketReader, blockIDsLister Lister, dir string, metrics *BaseFetcherMetrics) (*BaseFetcher, error) { if logger == nil { logger = log.NewNopLogger() } @@ -308,24 +377,24 @@ func NewBaseFetcherWithMetrics(logger log.Logger, concurrency int, bkt objstore. } return &BaseFetcher{ - logger: log.With(logger, "component", "block.BaseFetcher"), - concurrency: concurrency, - bkt: bkt, - blockIDsFetcher: blockIDsFetcher, - cacheDir: cacheDir, - cached: map[ulid.ULID]*metadata.Meta{}, - metrics: metrics, + logger: log.With(logger, "component", "block.BaseFetcher"), + concurrency: concurrency, + bkt: bkt, + blockIDsLister: blockIDsLister, + cacheDir: cacheDir, + cached: map[ulid.ULID]*metadata.Meta{}, + metrics: metrics, }, nil } // NewRawMetaFetcher returns basic meta fetcher without proper handling for eventual consistent backends or partial uploads. // NOTE: Not suitable to use in production. -func NewRawMetaFetcher(logger log.Logger, bkt objstore.InstrumentedBucketReader, blockIDsFetcher BlockIDsFetcher) (*MetaFetcher, error) { +func NewRawMetaFetcher(logger log.Logger, bkt objstore.InstrumentedBucketReader, blockIDsFetcher Lister) (*MetaFetcher, error) { return NewMetaFetcher(logger, 1, bkt, blockIDsFetcher, "", nil, nil) } // NewMetaFetcher returns meta fetcher. -func NewMetaFetcher(logger log.Logger, concurrency int, bkt objstore.InstrumentedBucketReader, blockIDsFetcher BlockIDsFetcher, dir string, reg prometheus.Registerer, filters []MetadataFilter) (*MetaFetcher, error) { +func NewMetaFetcher(logger log.Logger, concurrency int, bkt objstore.InstrumentedBucketReader, blockIDsFetcher Lister, dir string, reg prometheus.Registerer, filters []MetadataFilter) (*MetaFetcher, error) { b, err := NewBaseFetcher(logger, concurrency, bkt, blockIDsFetcher, dir, reg) if err != nil { return nil, err @@ -334,7 +403,7 @@ func NewMetaFetcher(logger log.Logger, concurrency int, bkt objstore.Instrumente } // NewMetaFetcherWithMetrics returns meta fetcher. -func NewMetaFetcherWithMetrics(logger log.Logger, concurrency int, bkt objstore.InstrumentedBucketReader, blockIDsFetcher BlockIDsFetcher, dir string, baseFetcherMetrics *BaseFetcherMetrics, fetcherMetrics *FetcherMetrics, filters []MetadataFilter) (*MetaFetcher, error) { +func NewMetaFetcherWithMetrics(logger log.Logger, concurrency int, bkt objstore.InstrumentedBucketReader, blockIDsFetcher Lister, dir string, baseFetcherMetrics *BaseFetcherMetrics, fetcherMetrics *FetcherMetrics, filters []MetadataFilter) (*MetaFetcher, error) { b, err := NewBaseFetcherWithMetrics(logger, concurrency, bkt, blockIDsFetcher, dir, baseFetcherMetrics) if err != nil { return nil, err @@ -497,7 +566,7 @@ func (f *BaseFetcher) fetchMetadata(ctx context.Context) (interface{}, error) { // Workers scheduled, distribute blocks. eg.Go(func() error { defer close(ch) - partialBlocks, err = f.blockIDsFetcher.GetActiveAndPartialBlockIDs(ctx, ch) + partialBlocks, err = f.blockIDsLister.GetActiveAndPartialBlockIDs(ctx, ch) return err }) diff --git a/pkg/block/fetcher_test.go b/pkg/block/fetcher_test.go index 4737b09972..c9d787547c 100644 --- a/pkg/block/fetcher_test.go +++ b/pkg/block/fetcher_test.go @@ -27,6 +27,7 @@ import ( "github.com/thanos-io/objstore/objtesting" "github.com/efficientgo/core/testutil" + "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/extprom" "github.com/thanos-io/thanos/pkg/model" @@ -73,7 +74,7 @@ func TestMetaFetcher_Fetch(t *testing.T) { r := prometheus.NewRegistry() noopLogger := log.NewNopLogger() insBkt := objstore.WithNoopInstr(bkt) - baseBlockIDsFetcher := NewBaseBlockIDsFetcher(noopLogger, insBkt) + baseBlockIDsFetcher := NewConcurrentLister(noopLogger, insBkt) baseFetcher, err := NewBaseFetcher(noopLogger, 20, insBkt, baseBlockIDsFetcher, dir, r) testutil.Ok(t, err) diff --git a/pkg/compact/clean_test.go b/pkg/compact/clean_test.go index b66ca9f018..cd6135b702 100644 --- a/pkg/compact/clean_test.go +++ b/pkg/compact/clean_test.go @@ -19,6 +19,7 @@ import ( "github.com/thanos-io/objstore" "github.com/efficientgo/core/testutil" + "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" ) @@ -30,7 +31,7 @@ func TestBestEffortCleanAbortedPartialUploads(t *testing.T) { bkt := objstore.WithNoopInstr(objstore.NewInMemBucket()) logger := log.NewNopLogger() - baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, bkt) + baseBlockIDsFetcher := block.NewConcurrentLister(logger, bkt) metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, baseBlockIDsFetcher, "", nil, nil) testutil.Ok(t, err) diff --git a/pkg/compact/compact_e2e_test.go b/pkg/compact/compact_e2e_test.go index f1e01ec4f4..12ef896154 100644 --- a/pkg/compact/compact_e2e_test.go +++ b/pkg/compact/compact_e2e_test.go @@ -27,6 +27,7 @@ import ( "github.com/thanos-io/objstore/objtesting" "github.com/efficientgo/core/testutil" + "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/dedup" @@ -95,7 +96,7 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) { duplicateBlocksFilter := block.NewDeduplicateFilter(fetcherConcurrency) insBkt := objstore.WithNoopInstr(bkt) - baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(nil, insBkt) + baseBlockIDsFetcher := block.NewConcurrentLister(nil, insBkt) metaFetcher, err := block.NewMetaFetcher(nil, 32, insBkt, baseBlockIDsFetcher, "", nil, []block.MetadataFilter{ duplicateBlocksFilter, }) @@ -197,7 +198,7 @@ func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMerg duplicateBlocksFilter := block.NewDeduplicateFilter(fetcherConcurrency) noCompactMarkerFilter := NewGatherNoCompactionMarkFilter(logger, objstore.WithNoopInstr(bkt), 2) insBkt := objstore.WithNoopInstr(bkt) - baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt) + baseBlockIDsFetcher := block.NewConcurrentLister(logger, insBkt) metaFetcher, err := block.NewMetaFetcher(nil, 32, insBkt, baseBlockIDsFetcher, "", nil, []block.MetadataFilter{ ignoreDeletionMarkFilter, duplicateBlocksFilter, @@ -509,7 +510,7 @@ func TestGarbageCollectDoesntCreateEmptyBlocksWithDeletionMarksOnly(t *testing.T duplicateBlocksFilter := block.NewDeduplicateFilter(fetcherConcurrency) insBkt := objstore.WithNoopInstr(bkt) - baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt) + baseBlockIDsFetcher := block.NewConcurrentLister(logger, insBkt) metaFetcher, err := block.NewMetaFetcher(nil, 32, insBkt, baseBlockIDsFetcher, "", nil, []block.MetadataFilter{ ignoreDeletionMarkFilter, duplicateBlocksFilter, diff --git a/pkg/compact/retention_test.go b/pkg/compact/retention_test.go index c1936f095a..d80895617c 100644 --- a/pkg/compact/retention_test.go +++ b/pkg/compact/retention_test.go @@ -21,6 +21,7 @@ import ( "github.com/thanos-io/objstore" "github.com/efficientgo/core/testutil" + "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/compact" @@ -245,7 +246,7 @@ func TestApplyRetentionPolicyByResolution(t *testing.T) { uploadMockBlock(t, bkt, b.id, b.minTime, b.maxTime, int64(b.resolution)) } - baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, bkt) + baseBlockIDsFetcher := block.NewConcurrentLister(logger, bkt) metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, baseBlockIDsFetcher, "", nil, nil) testutil.Ok(t, err) diff --git a/pkg/replicate/replicator.go b/pkg/replicate/replicator.go index 1f8cdef2e4..668d64afce 100644 --- a/pkg/replicate/replicator.go +++ b/pkg/replicate/replicator.go @@ -244,7 +244,7 @@ func newMetaFetcher( if ignoreMarkedForDeletion { filters = append(filters, thanosblock.NewIgnoreDeletionMarkFilter(logger, fromBkt, 0, concurrency)) } - baseBlockIDsFetcher := thanosblock.NewBaseBlockIDsFetcher(logger, fromBkt) + baseBlockIDsFetcher := thanosblock.NewConcurrentLister(logger, fromBkt) return thanosblock.NewMetaFetcher( logger, concurrency, diff --git a/pkg/store/acceptance_test.go b/pkg/store/acceptance_test.go index be1a1179f1..46cbe9490a 100644 --- a/pkg/store/acceptance_test.go +++ b/pkg/store/acceptance_test.go @@ -26,6 +26,7 @@ import ( "github.com/thanos-io/objstore" "github.com/thanos-io/objstore/providers/filesystem" + "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/component" @@ -901,7 +902,7 @@ func TestBucketStore_Acceptance(t *testing.T) { testutil.Ok(tt, err) insBkt := objstore.WithNoopInstr(bkt) - baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt) + baseBlockIDsFetcher := block.NewConcurrentLister(logger, insBkt) metaFetcher, err := block.NewMetaFetcher(logger, 20, insBkt, baseBlockIDsFetcher, metaDir, nil, []block.MetadataFilter{ block.NewTimePartitionMetaFilter(allowAllFilterConf.MinTime, allowAllFilterConf.MaxTime), }) diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go index 02c182cd0b..c91fb4096d 100644 --- a/pkg/store/bucket_e2e_test.go +++ b/pkg/store/bucket_e2e_test.go @@ -154,7 +154,7 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m } insBkt := objstore.WithNoopInstr(bkt) - baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(s.logger, insBkt) + baseBlockIDsFetcher := block.NewConcurrentLister(s.logger, insBkt) metaFetcher, err := block.NewMetaFetcher(s.logger, 20, insBkt, baseBlockIDsFetcher, dir, nil, []block.MetadataFilter{ block.NewTimePartitionMetaFilter(filterConf.MinTime, filterConf.MaxTime), block.NewLabelShardedMetaFilter(relabelConfig), diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index d38e14587d..5a665fb1e0 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -883,7 +883,7 @@ func testSharding(t *testing.T, reuseDisk string, bkt objstore.Bucket, all ...ul rec := &recorder{Bucket: bkt} insBkt := objstore.WithNoopInstr(bkt) - baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt) + baseBlockIDsFetcher := block.NewConcurrentLister(logger, insBkt) metaFetcher, err := block.NewMetaFetcher(logger, 20, insBkt, baseBlockIDsFetcher, dir, nil, []block.MetadataFilter{ block.NewTimePartitionMetaFilter(allowAllFilterConf.MinTime, allowAllFilterConf.MaxTime), block.NewLabelShardedMetaFilter(relabelConf), @@ -1441,7 +1441,7 @@ func benchBucketSeries(t testutil.TB, sampleType chunkenc.ValueType, skipChunk, } ibkt := objstore.WithNoopInstr(bkt) - baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, ibkt) + baseBlockIDsFetcher := block.NewConcurrentLister(logger, ibkt) f, err := block.NewRawMetaFetcher(logger, ibkt, baseBlockIDsFetcher) testutil.Ok(t, err) @@ -1891,7 +1891,7 @@ func TestSeries_ErrorUnmarshallingRequestHints(t *testing.T) { ) // Instance a real bucket store we'll use to query the series. - baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, instrBkt) + baseBlockIDsFetcher := block.NewConcurrentLister(logger, instrBkt) fetcher, err := block.NewMetaFetcher(logger, 10, instrBkt, baseBlockIDsFetcher, tmpDir, nil, nil) testutil.Ok(tb, err) @@ -1983,7 +1983,7 @@ func TestSeries_BlockWithMultipleChunks(t *testing.T) { testutil.Ok(t, block.Upload(context.Background(), logger, bkt, filepath.Join(headOpts.ChunkDirRoot, blk.String()), metadata.NoneFunc)) // Instance a real bucket store we'll use to query the series. - baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, instrBkt) + baseBlockIDsFetcher := block.NewConcurrentLister(logger, instrBkt) fetcher, err := block.NewMetaFetcher(logger, 10, instrBkt, baseBlockIDsFetcher, tmpDir, nil, nil) testutil.Ok(tb, err) @@ -2142,7 +2142,7 @@ func TestSeries_SeriesSortedWithoutReplicaLabels(t *testing.T) { } // Instance a real bucket store we'll use to query the series. - baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, instrBkt) + baseBlockIDsFetcher := block.NewConcurrentLister(logger, instrBkt) fetcher, err := block.NewMetaFetcher(logger, 10, instrBkt, baseBlockIDsFetcher, tmpDir, nil, nil) testutil.Ok(tb, err) @@ -2329,7 +2329,7 @@ func setupStoreForHintsTest(t *testing.T) (testutil.TB, *BucketStore, []*storepb } // Instance a real bucket store we'll use to query back the series. - baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, instrBkt) + baseBlockIDsFetcher := block.NewConcurrentLister(logger, instrBkt) fetcher, err := block.NewMetaFetcher(logger, 10, instrBkt, baseBlockIDsFetcher, tmpDir, nil, nil) testutil.Ok(tb, err) @@ -2546,7 +2546,7 @@ func TestSeries_ChunksHaveHashRepresentation(t *testing.T) { testutil.Ok(t, block.Upload(context.Background(), logger, bkt, filepath.Join(headOpts.ChunkDirRoot, blk.String()), metadata.NoneFunc)) // Instance a real bucket store we'll use to query the series. - baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, instrBkt) + baseBlockIDsFetcher := block.NewConcurrentLister(logger, instrBkt) fetcher, err := block.NewMetaFetcher(logger, 10, instrBkt, baseBlockIDsFetcher, tmpDir, nil, nil) testutil.Ok(tb, err) @@ -3522,7 +3522,7 @@ func TestBucketStoreDedupOnBlockSeriesSet(t *testing.T) { testutil.Ok(t, err) insBkt := objstore.WithNoopInstr(bkt) - baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt) + baseBlockIDsFetcher := block.NewConcurrentLister(logger, insBkt) metaFetcher, err := block.NewMetaFetcher(logger, 20, insBkt, baseBlockIDsFetcher, metaDir, nil, []block.MetadataFilter{ block.NewTimePartitionMetaFilter(allowAllFilterConf.MinTime, allowAllFilterConf.MaxTime), }) @@ -3738,7 +3738,7 @@ func TestBucketStoreStreamingSeriesLimit(t *testing.T) { testutil.Ok(t, err) insBkt := objstore.WithNoopInstr(bkt) - baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt) + baseBlockIDsFetcher := block.NewConcurrentLister(logger, insBkt) metaFetcher, err := block.NewMetaFetcher(logger, 20, insBkt, baseBlockIDsFetcher, metaDir, nil, []block.MetadataFilter{ block.NewTimePartitionMetaFilter(allowAllFilterConf.MinTime, allowAllFilterConf.MaxTime), })