From 901471ab3a648dc5ac6985f68795168a059d5187 Mon Sep 17 00:00:00 2001 From: Justin Lei Date: Thu, 30 Nov 2023 14:44:02 -0800 Subject: [PATCH] Use BucketIndexBlocksFinder instead of BucketScanBlocksFinder (#6779) * Use BucketIndexBlocksFinder instead of BucketScanBlocksFinder The use of BucketScanBlocksFinder was supposed to be removed in https://github.com/grafana/mimir/pull/6673 but its counterpart BucketIndexBlocksFinder was accidentally removed instead. This reverses that. * Remove BlocksFinderBucketScan * Update docs wrt. bucket scanning option * Remove alert and runbook for bucket scanning (cherry picked from commit ff8a70aee6d187113d0f2b18b75144a4bee5091d) --- .../mimir/manage/mimir-runbooks/_index.md | 8 - .../architecture/bucket-index/index.md | 3 - .../architecture/components/querier.md | 20 +- integration/querier_test.go | 21 +- integration/store_gateway_limits_hit_test.go | 5 +- .../metamonitoring/mixin-alerts.yaml | 13 - .../alerts.yaml | 13 - operations/mimir-mixin-compiled/alerts.yaml | 13 - .../mimir-mixin/alerts/blocks.libsonnet | 16 - pkg/querier/blocks_finder_bucket_index.go | 1 + pkg/querier/blocks_finder_bucket_scan.go | 433 --------------- pkg/querier/blocks_finder_bucket_scan_test.go | 497 ------------------ pkg/querier/blocks_store_queryable.go | 13 +- 13 files changed, 25 insertions(+), 1031 deletions(-) delete mode 100644 pkg/querier/blocks_finder_bucket_scan.go delete mode 100644 pkg/querier/blocks_finder_bucket_scan_test.go diff --git a/docs/sources/mimir/manage/mimir-runbooks/_index.md b/docs/sources/mimir/manage/mimir-runbooks/_index.md index 3fb0aa79325..f1adc4d9a9c 100644 --- a/docs/sources/mimir/manage/mimir-runbooks/_index.md +++ b/docs/sources/mimir/manage/mimir-runbooks/_index.md @@ -522,14 +522,6 @@ How to **fix** it: - Set the shard size of one or more tenants to `0`; this will shard the given tenant's rule groups across all ingesters. - Decrease the total number of ruler replicas by the number of idle replicas. -### MimirQuerierHasNotScanTheBucket - -This alert fires when a Mimir querier is not successfully scanning blocks in the storage (bucket). A querier is expected to periodically iterate the bucket to find new and deleted blocks (defaults to every 5m) and if it's not successfully synching the bucket since a long time, it may end up querying only a subset of blocks, thus leading to potentially partial results. - -How to **investigate**: - -- Look for any scan error in the querier logs (ie. networking or rate limiting issues) - ### MimirStoreGatewayHasNotSyncTheBucket This alert fires when a Mimir store-gateway is not successfully scanning blocks in the storage (bucket). A store-gateway is expected to periodically iterate the bucket to find new and deleted blocks (defaults to every 5m) and if it's not successfully synching the bucket for a long time, it may end up querying only a subset of blocks, thus leading to potentially partial results. diff --git a/docs/sources/mimir/references/architecture/bucket-index/index.md b/docs/sources/mimir/references/architecture/bucket-index/index.md index 2b8ca51b3ca..acc0dd48e31 100644 --- a/docs/sources/mimir/references/architecture/bucket-index/index.md +++ b/docs/sources/mimir/references/architecture/bucket-index/index.md @@ -11,9 +11,6 @@ weight: 50 The bucket index is a per-tenant file that contains the list of blocks and block deletion marks in the storage. 
The bucket index is stored in the backend object storage, is periodically updated by the compactor, and used by queriers, store-gateways, and rulers (in [internal]({{< relref "../components/ruler#internal" >}}) operational mode) to discover blocks in the storage. -The bucket index is enabled by default, but is optional. It can be disabled via `-blocks-storage.bucket-store.bucket-index.enabled=false` (or its respective YAML configuration option). -Disabling the bucket index is not recommended. - ## Benefits The [querier]({{< relref "../components/querier" >}}), [store-gateway]({{< relref "../components/store-gateway" >}}) and [ruler]({{< relref "../components/ruler" >}}) must have an almost[^1] up-to-date view of the storage bucket, in order to find the right blocks to look up at query time (querier) and to load a block's [index-header]({{< relref "../binary-index-header" >}}) (store-gateway). diff --git a/docs/sources/mimir/references/architecture/components/querier.md b/docs/sources/mimir/references/architecture/components/querier.md index 34653512f29..e2e1f7bcc0a 100644 --- a/docs/sources/mimir/references/architecture/components/querier.md +++ b/docs/sources/mimir/references/architecture/components/querier.md @@ -16,28 +16,10 @@ The querier uses the [store-gateway]({{< relref "./store-gateway" >}}) component ## How it works -To find the correct blocks to look up at query time, the querier requires an almost up-to-date view of the bucket in long-term storage. The querier performs one of the following actions to ensure that the bucket view is updated: - -1. Periodically download the [bucket index]({{< relref "../bucket-index" >}}) (default) -2. Periodically scan the bucket - -Queriers do not need any content from blocks except their metadata, which includes the minimum and maximum timestamp of samples within the block. - -### Bucket index enabled (default) - -Queriers lazily download the bucket index when they receive the first query for a given tenant. The querier caches the bucket index in memory and periodically keeps it up-to-date. +To find the correct blocks to look up at query time, queriers lazily download the bucket index when they receive the first query for a given tenant. The querier caches the bucket index in memory and periodically keeps it up-to-date. The bucket index contains a list of blocks and block deletion marks of a tenant. The querier later uses the list of blocks and block deletion marks to locate the set of blocks that need to be queried for the given query. -When the querier runs with the bucket index enabled, the querier startup time and the volume of API calls to object storage are reduced. -We recommend that you keep the bucket index enabled. - -### Bucket index disabled - -When [bucket index]({{< relref "../bucket-index" >}}) is disabled, queriers iterate over the storage bucket to discover blocks for all tenants and download the `meta.json` of each block. During this initial bucket scanning phase, a querier cannot process incoming queries and its `/ready` readiness probe endpoint will not return the HTTP status code `200`. - -When running, queriers periodically iterate over the storage bucket to discover new tenants and recently uploaded blocks. 
- ### Anatomy of a query request When a querier receives a query range request, the request contains the following parameters: diff --git a/integration/querier_test.go b/integration/querier_test.go index c90bdffab1b..ffb16c6d906 100644 --- a/integration/querier_test.go +++ b/integration/querier_test.go @@ -139,7 +139,6 @@ func testQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T, stream require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(2), "cortex_ingester_memory_series_removed_total")) // Start the compactor to have the bucket index created before querying. - // This is only required for tests using the bucket index, but doesn't hurt doing it for all of them. compactor := e2emimir.NewCompactor("compactor", consul.NetworkHTTPEndpoint(), commonFlags) require.NoError(t, s.StartAndWaitReady(compactor)) @@ -187,9 +186,6 @@ func testQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T, stream // the store-gateway ring if blocks sharding is enabled. require.NoError(t, querier.WaitSumMetrics(e2e.Equals(float64(512+(512*storeGateways.NumInstances()))), "cortex_ring_tokens_total")) - // Wait until the querier has discovered the uploaded blocks. - require.NoError(t, querier.WaitSumMetrics(e2e.Equals(2), "cortex_blocks_meta_synced")) - // Wait until the store-gateway has synched the new uploaded blocks. When sharding is enabled // we don't known which store-gateway instance will synch the blocks, so we need to wait on // metrics extracted from all instances. @@ -235,6 +231,9 @@ func testQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T, stream // thanos_store_index_cache_requests_total: ExpandedPostings: 5, Postings: 2, Series: 2 instantQueriesCount++ + // Make sure the querier is using the bucket index blocks finder. + require.NoError(t, querier.WaitSumMetrics(e2e.Greater(0), "cortex_bucket_index_loads_total")) + comparingFunction := e2e.Equals if streamingEnabled { // Some metrics can be higher when streaming is enabled. The exact number is not deterministic in every case. @@ -439,15 +438,15 @@ func TestQuerierWithBlocksStorageRunningInSingleBinaryMode(t *testing.T) { require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(3*cluster.NumInstances())), "cortex_ingester_memory_series_created_total")) require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(2*cluster.NumInstances())), "cortex_ingester_memory_series_removed_total")) - // Wait until the querier has discovered the uploaded blocks (discovered both by the querier and store-gateway). - require.NoError(t, cluster.WaitSumMetricsWithOptions(e2e.Equals(float64(2*cluster.NumInstances()*2)), []string{"cortex_blocks_meta_synced"}, e2e.WithLabelMatchers( - labels.MustNewMatcher(labels.MatchEqual, "component", "querier")))) - // Wait until the store-gateway has synched the new uploaded blocks. The number of blocks loaded // may be greater than expected if the compactor is running (there may have been compacted). const shippedBlocks = 2 require.NoError(t, cluster.WaitSumMetrics(e2e.GreaterOrEqual(float64(shippedBlocks*seriesReplicationFactor)), "cortex_bucket_store_blocks_loaded")) + // Start the compactor to have the bucket index created before querying. + compactor := e2emimir.NewCompactor("compactor", consul.NetworkHTTPEndpoint(), flags) + require.NoError(t, s.StartAndWaitReady(compactor)) + var expectedCacheRequests int // Query back the series (1 only in the storage, 1 only in the ingesters, 1 on both). 
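The querier.md change above leaves only one discovery mode: a querier lazily downloads a tenant's bucket index on the first query it receives for that tenant, caches it in memory, and periodically refreshes it. As a minimal, self-contained illustration of that lazy-download-and-cache pattern — all type and function names below (`tenantIndex`, `lazyLoader`, `download`, `staleAfter`) are invented for the sketch; Mimir's real implementation is the loader in `pkg/storage/tsdb/bucketindex`, configured via `bucketindex.LoaderConfig` in the last hunk of this patch:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// tenantIndex stands in for a tenant's bucket index: the list of blocks and
// block deletion marks. Simplified for the sketch.
type tenantIndex struct {
	blocks    []string
	updatedAt time.Time
}

// lazyLoader fetches an index only when a tenant is first queried, serves
// later lookups from an in-memory cache, and re-downloads once the cached
// copy is older than staleAfter.
type lazyLoader struct {
	mu         sync.Mutex
	cache      map[string]*tenantIndex
	staleAfter time.Duration
	download   func(tenant string) (*tenantIndex, error)
}

func (l *lazyLoader) getIndex(tenant string) (*tenantIndex, error) {
	l.mu.Lock()
	defer l.mu.Unlock()

	if idx, ok := l.cache[tenant]; ok && time.Since(idx.updatedAt) < l.staleAfter {
		return idx, nil // cache hit: no object storage call
	}

	// First query for this tenant, or the cached copy went stale.
	idx, err := l.download(tenant)
	if err != nil {
		return nil, err
	}
	l.cache[tenant] = idx
	return idx, nil
}

func main() {
	loader := &lazyLoader{
		cache:      map[string]*tenantIndex{},
		staleAfter: time.Minute,
		download: func(tenant string) (*tenantIndex, error) {
			fmt.Println("downloading bucket index for", tenant)
			return &tenantIndex{blocks: []string{"01ABC", "01DEF"}, updatedAt: time.Now()}, nil
		},
	}

	// Only the first call hits object storage; the second is served from memory.
	_, _ = loader.getIndex("user-1")
	_, _ = loader.getIndex("user-1")
}
```

This is also why the integration-test hunks in this patch start a compactor before querying: with the scan-based finder gone, a querier has nothing to download until the compactor has written the bucket index, which is what the new `cortex_bucket_index_loads_total` assertion above verifies.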
@@ -822,9 +821,13 @@ func TestQuerierWithBlocksStorageOnMissingBlocksFromStorage(t *testing.T) { require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512*2), "cortex_ring_tokens_total")) require.NoError(t, storeGateway.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total")) + // Start the compactor to have the bucket index created before querying. + compactor := e2emimir.NewCompactor("compactor", consul.NetworkHTTPEndpoint(), flags) + require.NoError(t, s.StartAndWaitReady(compactor)) + // Wait until the blocks are old enough for consistency check // 1 sync on startup, 3 to go over the consistency check limit explained above - require.NoError(t, querier.WaitSumMetrics(e2e.GreaterOrEqual(1+3), "cortex_blocks_meta_syncs_total")) + require.NoError(t, storeGateway.WaitSumMetrics(e2e.GreaterOrEqual(1+3), "cortex_blocks_meta_syncs_total")) // Query back the series. c, err = e2emimir.NewClient("", querier.HTTPEndpoint(), "", "", "user-1") diff --git a/integration/store_gateway_limits_hit_test.go b/integration/store_gateway_limits_hit_test.go index 6bc97f87991..334fb9bc74d 100644 --- a/integration/store_gateway_limits_hit_test.go +++ b/integration/store_gateway_limits_hit_test.go @@ -106,9 +106,10 @@ func Test_MaxSeriesAndChunksPerQueryLimitHit(t *testing.T) { // they discovered the blocks in the storage. querier := e2emimir.NewQuerier("querier", consul.NetworkHTTPEndpoint(), mergeFlags(flags, testData.additionalQuerierFlags)) storeGateway := e2emimir.NewStoreGateway("store-gateway", consul.NetworkHTTPEndpoint(), mergeFlags(flags, testData.additionalStoreGatewayFlags)) - require.NoError(t, scenario.StartAndWaitReady(querier, storeGateway)) + compactor := e2emimir.NewCompactor("compactor", consul.NetworkHTTPEndpoint(), flags) + require.NoError(t, scenario.StartAndWaitReady(querier, storeGateway, compactor)) t.Cleanup(func() { - require.NoError(t, scenario.Stop(querier, storeGateway)) + require.NoError(t, scenario.Stop(querier, storeGateway, compactor)) }) client, err = e2emimir.NewClient("", querier.HTTPEndpoint(), "", "", "test") diff --git a/operations/helm/tests/metamonitoring-values-generated/mimir-distributed/templates/metamonitoring/mixin-alerts.yaml b/operations/helm/tests/metamonitoring-values-generated/mimir-distributed/templates/metamonitoring/mixin-alerts.yaml index e5e9427e361..68946124401 100644 --- a/operations/helm/tests/metamonitoring-values-generated/mimir-distributed/templates/metamonitoring/mixin-alerts.yaml +++ b/operations/helm/tests/metamonitoring-values-generated/mimir-distributed/templates/metamonitoring/mixin-alerts.yaml @@ -772,19 +772,6 @@ spec: for: 3m labels: severity: critical - - alert: MimirQuerierHasNotScanTheBucket - annotations: - message: Mimir Querier {{ $labels.pod }} in {{ $labels.cluster }}/{{ $labels.namespace - }} has not successfully scanned the bucket since {{ $value | humanizeDuration - }}. 
- runbook_url: https://grafana.com/docs/mimir/latest/operators-guide/mimir-runbooks/#mimirquerierhasnotscanthebucket - expr: | - (time() - cortex_querier_blocks_last_successful_scan_timestamp_seconds > 60 * 30) - and - cortex_querier_blocks_last_successful_scan_timestamp_seconds > 0 - for: 5m - labels: - severity: critical - alert: MimirStoreGatewayHasNotSyncTheBucket annotations: message: Mimir store-gateway {{ $labels.pod }} in {{ $labels.cluster }}/{{ $labels.namespace diff --git a/operations/mimir-mixin-compiled-baremetal/alerts.yaml b/operations/mimir-mixin-compiled-baremetal/alerts.yaml index ccd02c0e933..8ea4ba6b16b 100644 --- a/operations/mimir-mixin-compiled-baremetal/alerts.yaml +++ b/operations/mimir-mixin-compiled-baremetal/alerts.yaml @@ -746,19 +746,6 @@ groups: for: 3m labels: severity: critical - - alert: MimirQuerierHasNotScanTheBucket - annotations: - message: Mimir Querier {{ $labels.instance }} in {{ $labels.cluster }}/{{ $labels.namespace - }} has not successfully scanned the bucket since {{ $value | humanizeDuration - }}. - runbook_url: https://grafana.com/docs/mimir/latest/operators-guide/mimir-runbooks/#mimirquerierhasnotscanthebucket - expr: | - (time() - cortex_querier_blocks_last_successful_scan_timestamp_seconds > 60 * 30) - and - cortex_querier_blocks_last_successful_scan_timestamp_seconds > 0 - for: 5m - labels: - severity: critical - alert: MimirStoreGatewayHasNotSyncTheBucket annotations: message: Mimir store-gateway {{ $labels.instance }} in {{ $labels.cluster }}/{{ diff --git a/operations/mimir-mixin-compiled/alerts.yaml b/operations/mimir-mixin-compiled/alerts.yaml index 39ed7bfa2b9..ff5084878cd 100644 --- a/operations/mimir-mixin-compiled/alerts.yaml +++ b/operations/mimir-mixin-compiled/alerts.yaml @@ -760,19 +760,6 @@ groups: for: 3m labels: severity: critical - - alert: MimirQuerierHasNotScanTheBucket - annotations: - message: Mimir Querier {{ $labels.pod }} in {{ $labels.cluster }}/{{ $labels.namespace - }} has not successfully scanned the bucket since {{ $value | humanizeDuration - }}. - runbook_url: https://grafana.com/docs/mimir/latest/operators-guide/mimir-runbooks/#mimirquerierhasnotscanthebucket - expr: | - (time() - cortex_querier_blocks_last_successful_scan_timestamp_seconds > 60 * 30) - and - cortex_querier_blocks_last_successful_scan_timestamp_seconds > 0 - for: 5m - labels: - severity: critical - alert: MimirStoreGatewayHasNotSyncTheBucket annotations: message: Mimir store-gateway {{ $labels.pod }} in {{ $labels.cluster }}/{{ $labels.namespace diff --git a/operations/mimir-mixin/alerts/blocks.libsonnet b/operations/mimir-mixin/alerts/blocks.libsonnet index d2404f637e4..2c969bf24d8 100644 --- a/operations/mimir-mixin/alerts/blocks.libsonnet +++ b/operations/mimir-mixin/alerts/blocks.libsonnet @@ -183,22 +183,6 @@ message: '%(product)s Ingester %(alert_instance_variable)s in %(alert_aggregation_variables)s is failing to write to TSDB WAL.' % $._config, }, }, - { - // Alert if the querier is not successfully scanning the bucket. - alert: $.alertName('QuerierHasNotScanTheBucket'), - 'for': '5m', - expr: ||| - (time() - cortex_querier_blocks_last_successful_scan_timestamp_seconds > 60 * 30) - and - cortex_querier_blocks_last_successful_scan_timestamp_seconds > 0 - |||, - labels: { - severity: 'critical', - }, - annotations: { - message: '%(product)s Querier %(alert_instance_variable)s in %(alert_aggregation_variables)s has not successfully scanned the bucket since {{ $value | humanizeDuration }}.' 
% $._config, - }, - }, { // Alert if the store-gateway is not successfully synching the bucket. alert: $.alertName('StoreGatewayHasNotSyncTheBucket'), diff --git a/pkg/querier/blocks_finder_bucket_index.go b/pkg/querier/blocks_finder_bucket_index.go index db47cd32574..5db9e8fd0c6 100644 --- a/pkg/querier/blocks_finder_bucket_index.go +++ b/pkg/querier/blocks_finder_bucket_index.go @@ -24,6 +24,7 @@ import ( var ( errBucketIndexBlocksFinderNotRunning = errors.New("bucket index blocks finder is not running") + errInvalidBlocksRange = errors.New("invalid blocks time range") ) type BucketIndexBlocksFinderConfig struct { diff --git a/pkg/querier/blocks_finder_bucket_scan.go b/pkg/querier/blocks_finder_bucket_scan.go deleted file mode 100644 index 6660ec5dda5..00000000000 --- a/pkg/querier/blocks_finder_bucket_scan.go +++ /dev/null @@ -1,433 +0,0 @@ -// SPDX-License-Identifier: AGPL-3.0-only -// Provenance-includes-location: https://github.com/cortexproject/cortex/blob/master/pkg/querier/blocks_finder_bucket_scan.go -// Provenance-includes-license: Apache-2.0 -// Provenance-includes-copyright: The Cortex Authors. - -package querier - -import ( - "context" - "path" - "path/filepath" - "sort" - "strings" - "sync" - "time" - - "github.com/go-kit/log" - "github.com/go-kit/log/level" - "github.com/grafana/dskit/backoff" - "github.com/grafana/dskit/services" - "github.com/oklog/ulid" - "github.com/pkg/errors" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" - tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" - "github.com/thanos-io/objstore" - - "github.com/grafana/mimir/pkg/storage/bucket" - mimir_tsdb "github.com/grafana/mimir/pkg/storage/tsdb" - "github.com/grafana/mimir/pkg/storage/tsdb/block" - "github.com/grafana/mimir/pkg/storage/tsdb/bucketindex" - "github.com/grafana/mimir/pkg/storegateway" - "github.com/grafana/mimir/pkg/util" - util_log "github.com/grafana/mimir/pkg/util/log" -) - -var ( - errBucketScanBlocksFinderNotRunning = errors.New("bucket scan blocks finder is not running") - errInvalidBlocksRange = errors.New("invalid blocks time range") -) - -type BucketScanBlocksFinderConfig struct { - ScanInterval time.Duration - TenantsConcurrency int - MetasConcurrency int - CacheDir string - IgnoreDeletionMarksDelay time.Duration -} - -// BucketScanBlocksFinder is a BlocksFinder implementation periodically scanning the bucket to discover blocks. -type BucketScanBlocksFinder struct { - services.Service - - cfg BucketScanBlocksFinderConfig - cfgProvider bucket.TenantConfigProvider - logger log.Logger - bucketClient objstore.Bucket - fetchersMetrics *storegateway.MetadataFetcherMetrics - usersScanner *mimir_tsdb.UsersScanner - - // We reuse the metadata fetcher instance for a given tenant both because of performance - // reasons (the fetcher keeps a in-memory cache) and being able to collect and group metrics. - fetchersMx sync.Mutex - fetchers map[string]userFetcher - - // Keep the per-tenant/user metas found during the last run. 
- userMx sync.RWMutex - userMetas map[string]bucketindex.Blocks - userMetasLookup map[string]map[ulid.ULID]*bucketindex.Block - userDeletionMarks map[string]map[ulid.ULID]*bucketindex.BlockDeletionMark - - scanDuration prometheus.Histogram - scanLastSuccess prometheus.Gauge -} - -func NewBucketScanBlocksFinder(cfg BucketScanBlocksFinderConfig, bucketClient objstore.Bucket, cfgProvider bucket.TenantConfigProvider, logger log.Logger, reg prometheus.Registerer) *BucketScanBlocksFinder { - d := &BucketScanBlocksFinder{ - cfg: cfg, - cfgProvider: cfgProvider, - logger: logger, - bucketClient: bucketClient, - fetchers: make(map[string]userFetcher), - usersScanner: mimir_tsdb.NewUsersScanner(bucketClient, mimir_tsdb.AllUsers, logger), - userMetas: make(map[string]bucketindex.Blocks), - userMetasLookup: make(map[string]map[ulid.ULID]*bucketindex.Block), - userDeletionMarks: map[string]map[ulid.ULID]*bucketindex.BlockDeletionMark{}, - fetchersMetrics: storegateway.NewMetadataFetcherMetrics(), - scanDuration: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ - Name: "cortex_querier_blocks_scan_duration_seconds", - Help: "The total time it takes to run a full blocks scan across the storage.", - Buckets: []float64{1, 10, 20, 30, 60, 120, 180, 240, 300, 600}, - }), - scanLastSuccess: promauto.With(reg).NewGauge(prometheus.GaugeOpts{ - Name: "cortex_querier_blocks_last_successful_scan_timestamp_seconds", - Help: "Unix timestamp of the last successful blocks scan.", - }), - } - - if reg != nil { - prometheus.WrapRegistererWith(prometheus.Labels{"component": "querier"}, reg).MustRegister(d.fetchersMetrics) - } - - // Apply a jitter to the sync frequency in order to increase the probability - // of hitting the shared cache (if any). - scanInterval := util.DurationWithJitter(cfg.ScanInterval, 0.2) - d.Service = services.NewTimerService(scanInterval, d.starting, d.scan, nil) - - return d -} - -// GetBlocks returns known blocks for userID containing samples within the range minT -// and maxT (milliseconds, both included). Returned blocks are sorted by MaxTime descending. -func (d *BucketScanBlocksFinder) GetBlocks(_ context.Context, userID string, minT, maxT int64) (bucketindex.Blocks, map[ulid.ULID]*bucketindex.BlockDeletionMark, error) { - // We need to ensure the initial full bucket scan succeeded. - if d.State() != services.Running { - return nil, nil, errBucketScanBlocksFinderNotRunning - } - if maxT < minT { - return nil, nil, errInvalidBlocksRange - } - - d.userMx.RLock() - defer d.userMx.RUnlock() - - userMetas, ok := d.userMetas[userID] - if !ok { - return nil, nil, nil - } - - // Given we do expect the large majority of queries to have a time range close - // to "now", we're going to find matching blocks iterating the list in reverse order. - var matchingMetas bucketindex.Blocks - for i := len(userMetas) - 1; i >= 0; i-- { - if userMetas[i].Within(minT, maxT) { - matchingMetas = append(matchingMetas, userMetas[i]) - } - - // We can safely break the loop because metas are sorted by MaxTime. - if userMetas[i].MaxTime <= minT { - break - } - } - - // Filter deletion marks by matching blocks only. 
- matchingDeletionMarks := map[ulid.ULID]*bucketindex.BlockDeletionMark{} - if userDeletionMarks, ok := d.userDeletionMarks[userID]; ok { - for _, m := range matchingMetas { - if d := userDeletionMarks[m.ID]; d != nil { - matchingDeletionMarks[m.ID] = d - } - } - } - - return matchingMetas, matchingDeletionMarks, nil -} - -func (d *BucketScanBlocksFinder) starting(ctx context.Context) error { - // Before the service is in the running state it must have successfully - // complete the initial scan. - if err := d.scanBucket(ctx); err != nil { - level.Error(d.logger).Log("msg", "unable to run the initial blocks scan", "err", err) - return err - } - - return nil -} - -func (d *BucketScanBlocksFinder) scan(ctx context.Context) error { - if err := d.scanBucket(ctx); err != nil { - level.Error(d.logger).Log("msg", "failed to scan bucket storage to find blocks", "err", err) - } - - // Never return error, otherwise the service terminates. - return nil -} - -func (d *BucketScanBlocksFinder) scanBucket(ctx context.Context) (returnErr error) { - defer func(start time.Time) { - d.scanDuration.Observe(time.Since(start).Seconds()) - if returnErr == nil { - d.scanLastSuccess.SetToCurrentTime() - } - }(time.Now()) - - // Discover all users first. This helps cacheability of the object store call. - userIDs, _, err := d.usersScanner.ScanUsers(ctx) - if err != nil { - return err - } - - jobsChan := make(chan string) - resMx := sync.Mutex{} - resMetas := map[string]bucketindex.Blocks{} - resMetasLookup := map[string]map[ulid.ULID]*bucketindex.Block{} - resDeletionMarks := map[string]map[ulid.ULID]*bucketindex.BlockDeletionMark{} - resErrs := tsdb_errors.NewMulti() - - // Create a pool of workers which will synchronize metas. The pool size - // is limited in order to avoid to concurrently sync a lot of tenants in - // a large cluster. - wg := &sync.WaitGroup{} - wg.Add(d.cfg.TenantsConcurrency) - - for i := 0; i < d.cfg.TenantsConcurrency; i++ { - go func() { - defer wg.Done() - - for userID := range jobsChan { - metas, deletionMarks, err := d.scanUserBlocksWithRetries(ctx, userID) - - // Build the lookup map. - lookup := map[ulid.ULID]*bucketindex.Block{} - for _, m := range metas { - lookup[m.ID] = m - } - - resMx.Lock() - if err != nil { - resErrs.Add(err) - } else { - resMetas[userID] = metas - resMetasLookup[userID] = lookup - resDeletionMarks[userID] = deletionMarks - } - resMx.Unlock() - } - }() - } - - // Push a job for each user whose blocks need to be discovered. -pushJobsLoop: - for _, userID := range userIDs { - select { - case jobsChan <- userID: - // Nothing to do. - case <-ctx.Done(): - resMx.Lock() - resErrs.Add(ctx.Err()) - resMx.Unlock() - break pushJobsLoop - } - } - - // Wait until all workers completed. - close(jobsChan) - wg.Wait() - - d.userMx.Lock() - if len(resErrs) == 0 { - // Replace the map, so that we discard tenants fully deleted from storage. - d.userMetas = resMetas - d.userMetasLookup = resMetasLookup - d.userDeletionMarks = resDeletionMarks - } else { - // If an error occurred, we prefer to partially update the metas map instead of - // not updating it at all. At least we'll update blocks for the successful tenants. 
- for userID, metas := range resMetas { - d.userMetas[userID] = metas - } - - for userID, metas := range resMetasLookup { - d.userMetasLookup[userID] = metas - } - - for userID, deletionMarks := range resDeletionMarks { - d.userDeletionMarks[userID] = deletionMarks - } - } - d.userMx.Unlock() - - return resErrs.Err() -} - -// scanUserBlocksWithRetries runs scanUserBlocks() retrying multiple times -// in case of error. -func (d *BucketScanBlocksFinder) scanUserBlocksWithRetries(ctx context.Context, userID string) (metas bucketindex.Blocks, deletionMarks map[ulid.ULID]*bucketindex.BlockDeletionMark, err error) { - retries := backoff.New(ctx, backoff.Config{ - MinBackoff: time.Second, - MaxBackoff: 30 * time.Second, - MaxRetries: 3, - }) - - for retries.Ongoing() { - metas, deletionMarks, err = d.scanUserBlocks(ctx, userID) - if err == nil { - return - } - - retries.Wait() - } - - return -} - -func (d *BucketScanBlocksFinder) scanUserBlocks(ctx context.Context, userID string) (bucketindex.Blocks, map[ulid.ULID]*bucketindex.BlockDeletionMark, error) { - fetcher, userBucket, deletionMarkFilter, err := d.getOrCreateMetaFetcher(userID) - if err != nil { - return nil, nil, errors.Wrapf(err, "create meta fetcher for user %s", userID) - } - - metas, partials, err := fetcher.Fetch(ctx) - if err != nil { - return nil, nil, errors.Wrapf(err, "scan blocks for user %s", userID) - } - - // In case we've found any partial block we log about it but continue cause we don't want - // to break the scanner just because there's a spurious block. - if len(partials) > 0 { - logPartialBlocks(userID, partials, d.logger) - } - - res := make(bucketindex.Blocks, 0, len(metas)) - for _, m := range metas { - blockMeta := bucketindex.BlockFromThanosMeta(*m) - - // If the block is already known, we can get the remaining attributes from there - // because a block is immutable. - prevMeta := d.getBlockMeta(userID, m.ULID) - if prevMeta != nil { - blockMeta.UploadedAt = prevMeta.UploadedAt - } else { - attrs, err := userBucket.Attributes(ctx, path.Join(m.ULID.String(), block.MetaFilename)) - if err != nil { - return nil, nil, errors.Wrapf(err, "read %s attributes of block %s for user %s", block.MetaFilename, m.ULID.String(), userID) - } - - // Since the meta.json file is the last file of a block being uploaded and it's immutable - // we can safely assume that the last modified timestamp of the meta.json is the time when - // the block has completed to be uploaded. - blockMeta.UploadedAt = attrs.LastModified.Unix() - } - - res = append(res, blockMeta) - } - - // The blocks scanner expects all blocks to be sorted by max time. - sortBlocksByMaxTime(res) - - // Convert deletion marks to our own data type. 
- marks := map[ulid.ULID]*bucketindex.BlockDeletionMark{} - for id, m := range deletionMarkFilter.DeletionMarkBlocks() { - marks[id] = bucketindex.BlockDeletionMarkFromThanosMarker(m) - } - - return res, marks, nil -} - -func (d *BucketScanBlocksFinder) getOrCreateMetaFetcher(userID string) (block.MetadataFetcher, objstore.Bucket, *block.IgnoreDeletionMarkFilter, error) { - d.fetchersMx.Lock() - defer d.fetchersMx.Unlock() - - if f, ok := d.fetchers[userID]; ok { - return f.metadataFetcher, f.userBucket, f.deletionMarkFilter, nil - } - - fetcher, userBucket, deletionMarkFilter, err := d.createMetaFetcher(userID) - if err != nil { - return nil, nil, nil, err - } - - d.fetchers[userID] = userFetcher{ - metadataFetcher: fetcher, - deletionMarkFilter: deletionMarkFilter, - userBucket: userBucket, - } - - return fetcher, userBucket, deletionMarkFilter, nil -} - -func (d *BucketScanBlocksFinder) createMetaFetcher(userID string) (block.MetadataFetcher, objstore.Bucket, *block.IgnoreDeletionMarkFilter, error) { - userLogger := util_log.WithUserID(userID, d.logger) - userBucket := bucket.NewUserBucketClient(userID, d.bucketClient, d.cfgProvider) - userReg := prometheus.NewRegistry() - - // The following filters have been intentionally omitted: - // - Deduplicate filter: omitted because it could cause troubles with the consistency check if - // we "hide" source blocks because recently compacted by the compactor before the store-gateway instances - // discover and load the compacted ones. - deletionMarkFilter := block.NewIgnoreDeletionMarkFilter(userLogger, userBucket, d.cfg.IgnoreDeletionMarksDelay, d.cfg.MetasConcurrency) - filters := []block.MetadataFilter{deletionMarkFilter} - - f, err := block.NewMetaFetcher( - userLogger, - d.cfg.MetasConcurrency, - userBucket, - // The fetcher stores cached metas in the "meta-syncer/" sub directory. 
- filepath.Join(d.cfg.CacheDir, userID), - userReg, - filters, - ) - if err != nil { - return nil, nil, nil, err - } - - d.fetchersMetrics.AddUserRegistry(userID, userReg) - return f, userBucket, deletionMarkFilter, nil -} - -func (d *BucketScanBlocksFinder) getBlockMeta(userID string, blockID ulid.ULID) *bucketindex.Block { - d.userMx.RLock() - defer d.userMx.RUnlock() - - metas, ok := d.userMetasLookup[userID] - if !ok { - return nil - } - - return metas[blockID] -} - -func sortBlocksByMaxTime(blocks bucketindex.Blocks) { - sort.Slice(blocks, func(i, j int) bool { - return blocks[i].MaxTime < blocks[j].MaxTime - }) -} - -func logPartialBlocks(userID string, partials map[ulid.ULID]error, logger log.Logger) { - ids := make([]string, 0, len(partials)) - errs := make([]string, 0, len(partials)) - - for id, err := range partials { - ids = append(ids, id.String()) - errs = append(errs, err.Error()) - } - - level.Warn(logger).Log("msg", "found partial blocks", "user", userID, "blocks", strings.Join(ids, ","), "err", strings.Join(errs, ",")) -} - -type userFetcher struct { - metadataFetcher block.MetadataFetcher - deletionMarkFilter *block.IgnoreDeletionMarkFilter - userBucket objstore.Bucket -} diff --git a/pkg/querier/blocks_finder_bucket_scan_test.go b/pkg/querier/blocks_finder_bucket_scan_test.go deleted file mode 100644 index 6279f26257d..00000000000 --- a/pkg/querier/blocks_finder_bucket_scan_test.go +++ /dev/null @@ -1,497 +0,0 @@ -// SPDX-License-Identifier: AGPL-3.0-only -// Provenance-includes-location: https://github.com/cortexproject/cortex/blob/master/pkg/querier/blocks_finder_bucket_scan_test.go -// Provenance-includes-license: Apache-2.0 -// Provenance-includes-copyright: The Cortex Authors. - -package querier - -import ( - "context" - "fmt" - "os" - "path" - "strings" - "testing" - "time" - - "github.com/go-kit/log" - "github.com/grafana/dskit/services" - "github.com/oklog/ulid" - "github.com/pkg/errors" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/testutil" - "github.com/prometheus/prometheus/tsdb" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" - "github.com/thanos-io/objstore" - - "github.com/grafana/mimir/pkg/storage/bucket" - mimir_tsdb "github.com/grafana/mimir/pkg/storage/tsdb" - "github.com/grafana/mimir/pkg/storage/tsdb/block" - "github.com/grafana/mimir/pkg/storage/tsdb/bucketindex" - mimir_testutil "github.com/grafana/mimir/pkg/storage/tsdb/testutil" -) - -func TestBucketScanBlocksFinder_InitialScan(t *testing.T) { - ctx := context.Background() - s, bucket, _, reg := prepareBucketScanBlocksFinder(t, prepareBucketScanBlocksFinderConfig()) - - user1Block1 := block.MockStorageBlock(t, bucket, "user-1", 10, 20) - user1Block2 := block.MockStorageBlock(t, bucket, "user-1", 20, 30) - user2Block1 := block.MockStorageBlock(t, bucket, "user-2", 10, 20) - user2Mark1 := bucketindex.BlockDeletionMarkFromThanosMarker(block.MockStorageDeletionMark(t, bucket, "user-2", user2Block1)) - - require.NoError(t, services.StartAndAwaitRunning(ctx, s)) - - blocks, deletionMarks, err := s.GetBlocks(ctx, "user-1", 0, 30) - require.NoError(t, err) - require.Equal(t, 2, len(blocks)) - assert.Equal(t, user1Block2.ULID, blocks[0].ID) - assert.Equal(t, user1Block1.ULID, blocks[1].ID) - assert.WithinDuration(t, time.Now(), blocks[0].GetUploadedAt(), 5*time.Second) - assert.WithinDuration(t, time.Now(), blocks[1].GetUploadedAt(), 5*time.Second) - assert.Empty(t, deletionMarks) - - 
blocks, deletionMarks, err = s.GetBlocks(ctx, "user-2", 0, 30) - require.NoError(t, err) - require.Equal(t, 1, len(blocks)) - assert.Equal(t, user2Block1.ULID, blocks[0].ID) - assert.WithinDuration(t, time.Now(), blocks[0].GetUploadedAt(), 5*time.Second) - assert.Equal(t, map[ulid.ULID]*bucketindex.BlockDeletionMark{ - user2Block1.ULID: user2Mark1, - }, deletionMarks) - - assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` - # HELP cortex_blocks_meta_syncs_total Total blocks metadata synchronization attempts - # TYPE cortex_blocks_meta_syncs_total counter - cortex_blocks_meta_syncs_total{component="querier"} 2 - - # HELP cortex_blocks_meta_sync_failures_total Total blocks metadata synchronization failures - # TYPE cortex_blocks_meta_sync_failures_total counter - cortex_blocks_meta_sync_failures_total{component="querier"} 0 - `), - "cortex_blocks_meta_syncs_total", - "cortex_blocks_meta_sync_failures_total", - )) - - assert.Greater(t, testutil.ToFloat64(s.scanLastSuccess), float64(0)) -} - -func TestBucketScanBlocksFinder_InitialScanFailure(t *testing.T) { - cacheDir := t.TempDir() - - ctx := context.Background() - bucket := &bucket.ClientMock{} - reg := prometheus.NewPedanticRegistry() - - cfg := prepareBucketScanBlocksFinderConfig() - cfg.CacheDir = cacheDir - - s := NewBucketScanBlocksFinder(cfg, bucket, nil, log.NewNopLogger(), reg) - defer func() { - s.StopAsync() - s.AwaitTerminated(context.Background()) //nolint: errcheck - }() - - // Mock the storage to simulate a failure when reading objects. - bucket.MockIter("", []string{"user-1"}, nil) - bucket.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D"}, nil) - bucket.MockExists(path.Join("user-1", mimir_tsdb.TenantDeletionMarkPath), false, nil) - bucket.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", "invalid", errors.New("mocked error")) - - require.NoError(t, s.StartAsync(ctx)) - require.Error(t, s.AwaitRunning(ctx)) - - blocks, deletionMarks, err := s.GetBlocks(ctx, "user-1", 0, 30) - assert.Equal(t, errBucketScanBlocksFinderNotRunning, err) - assert.Nil(t, blocks) - assert.Nil(t, deletionMarks) - - assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` - # HELP cortex_blocks_meta_syncs_total Total blocks metadata synchronization attempts - # TYPE cortex_blocks_meta_syncs_total counter - cortex_blocks_meta_syncs_total{component="querier"} 3 - - # HELP cortex_blocks_meta_sync_failures_total Total blocks metadata synchronization failures - # TYPE cortex_blocks_meta_sync_failures_total counter - cortex_blocks_meta_sync_failures_total{component="querier"} 3 - - # HELP cortex_querier_blocks_last_successful_scan_timestamp_seconds Unix timestamp of the last successful blocks scan. - # TYPE cortex_querier_blocks_last_successful_scan_timestamp_seconds gauge - cortex_querier_blocks_last_successful_scan_timestamp_seconds 0 - `), - "cortex_blocks_meta_syncs_total", - "cortex_blocks_meta_sync_failures_total", - "cortex_querier_blocks_last_successful_scan_timestamp_seconds", - )) -} - -func TestBucketScanBlocksFinder_StopWhileRunningTheInitialScanOnManyTenants(t *testing.T) { - tenantIDs := []string{"1", "2", "3", "4", "5", "6", "7", "8", "9", "10"} - - // Mock the bucket to introduce a 1s sleep while iterating each tenant in the bucket. 
- bucket := &bucket.ClientMock{} - bucket.MockIter("", tenantIDs, nil) - for _, tenantID := range tenantIDs { - bucket.MockIterWithCallback(tenantID+"/", []string{}, nil, func() { - time.Sleep(time.Second) - }) - bucket.MockExists(path.Join(tenantID, mimir_tsdb.TenantDeletionMarkPath), false, nil) - } - - cacheDir := t.TempDir() - - cfg := prepareBucketScanBlocksFinderConfig() - cfg.CacheDir = cacheDir - cfg.MetasConcurrency = 1 - cfg.TenantsConcurrency = 1 - - s := NewBucketScanBlocksFinder(cfg, bucket, nil, log.NewLogfmtLogger(os.Stdout), nil) - - // Start the scanner, let it run for 1s and then issue a stop. - require.NoError(t, s.StartAsync(context.Background())) - time.Sleep(time.Second) - - stopTime := time.Now() - _ = services.StopAndAwaitTerminated(context.Background(), s) - - // Expect to stop before having completed the full sync (which is expected to take - // 1s for each tenant due to the delay introduced in the mock). - assert.Less(t, time.Since(stopTime).Nanoseconds(), (3 * time.Second).Nanoseconds()) -} - -func TestBucketScanBlocksFinder_StopWhileRunningTheInitialScanOnManyBlocks(t *testing.T) { - var blockPaths []string - for i := 1; i <= 10; i++ { - blockPaths = append(blockPaths, "user-1/"+ulid.MustNew(uint64(i), nil).String()) - } - - // Mock the bucket to introduce a 1s sleep while syncing each block in the bucket. - bkt := &bucket.ClientMock{} - bkt.MockIter("", []string{"user-1"}, nil) - bkt.MockIter("user-1/", blockPaths, nil) - - // We return that all files don't exist, but introduce a 1s delay for each call. - sleep := func(_ mock.Arguments) { - time.Sleep(time.Second) - } - bkt.On("Exists", mock.Anything, mock.Anything).Return(false, nil).Run(sleep) - bkt.On("Get", mock.Anything, mock.Anything).Return(nil, bucket.ErrObjectDoesNotExist).Run(sleep) - - cfg := prepareBucketScanBlocksFinderConfig() - cfg.CacheDir = t.TempDir() - cfg.MetasConcurrency = 1 - cfg.TenantsConcurrency = 1 - - s := NewBucketScanBlocksFinder(cfg, bkt, nil, log.NewLogfmtLogger(os.Stdout), nil) - - // Start the scanner, let it run for 1s and then issue a stop. - require.NoError(t, s.StartAsync(context.Background())) - time.Sleep(time.Second) - - stopTime := time.Now() - _ = services.StopAndAwaitTerminated(context.Background(), s) - - // Expect to stop before having completed the full sync (which is expected to take - // 1s for each block due to the delay introduced in the mock). 
- assert.Less(t, time.Since(stopTime).Nanoseconds(), (3 * time.Second).Nanoseconds()) -} - -func TestBucketScanBlocksFinder_PeriodicScanFindsNewUser(t *testing.T) { - ctx := context.Background() - s, bucket, _, _ := prepareBucketScanBlocksFinder(t, prepareBucketScanBlocksFinderConfig()) - - require.NoError(t, services.StartAndAwaitRunning(ctx, s)) - - blocks, deletionMarks, err := s.GetBlocks(ctx, "user-1", 0, 30) - require.NoError(t, err) - require.Equal(t, 0, len(blocks)) - assert.Empty(t, deletionMarks) - - block1 := block.MockStorageBlock(t, bucket, "user-1", 10, 20) - block2 := block.MockStorageBlock(t, bucket, "user-1", 20, 30) - mark2 := bucketindex.BlockDeletionMarkFromThanosMarker(block.MockStorageDeletionMark(t, bucket, "user-1", block2)) - - // Trigger a periodic sync - require.NoError(t, s.scan(ctx)) - - blocks, deletionMarks, err = s.GetBlocks(ctx, "user-1", 0, 30) - require.NoError(t, err) - require.Equal(t, 2, len(blocks)) - assert.Equal(t, block2.ULID, blocks[0].ID) - assert.Equal(t, block1.ULID, blocks[1].ID) - assert.WithinDuration(t, time.Now(), blocks[0].GetUploadedAt(), 5*time.Second) - assert.WithinDuration(t, time.Now(), blocks[1].GetUploadedAt(), 5*time.Second) - assert.Equal(t, map[ulid.ULID]*bucketindex.BlockDeletionMark{ - block2.ULID: mark2, - }, deletionMarks) -} - -func TestBucketScanBlocksFinder_PeriodicScanFindsNewBlock(t *testing.T) { - ctx := context.Background() - s, bucket, _, _ := prepareBucketScanBlocksFinder(t, prepareBucketScanBlocksFinderConfig()) - - block1 := block.MockStorageBlock(t, bucket, "user-1", 10, 20) - - require.NoError(t, services.StartAndAwaitRunning(ctx, s)) - - blocks, deletionMarks, err := s.GetBlocks(ctx, "user-1", 0, 30) - require.NoError(t, err) - require.Equal(t, 1, len(blocks)) - assert.Equal(t, block1.ULID, blocks[0].ID) - assert.WithinDuration(t, time.Now(), blocks[0].GetUploadedAt(), 5*time.Second) - assert.Empty(t, deletionMarks) - - block2 := block.MockStorageBlock(t, bucket, "user-1", 20, 30) - - // Trigger a periodic sync - require.NoError(t, s.scan(ctx)) - - blocks, deletionMarks, err = s.GetBlocks(ctx, "user-1", 0, 30) - require.NoError(t, err) - require.Equal(t, 2, len(blocks)) - assert.Equal(t, block2.ULID, blocks[0].ID) - assert.Equal(t, block1.ULID, blocks[1].ID) - assert.WithinDuration(t, time.Now(), blocks[0].GetUploadedAt(), 5*time.Second) - assert.WithinDuration(t, time.Now(), blocks[1].GetUploadedAt(), 5*time.Second) - assert.Empty(t, deletionMarks) -} - -func TestBucketScanBlocksFinder_PeriodicScanFindsBlockMarkedForDeletion(t *testing.T) { - ctx := context.Background() - s, bucket, _, _ := prepareBucketScanBlocksFinder(t, prepareBucketScanBlocksFinderConfig()) - - block1 := block.MockStorageBlock(t, bucket, "user-1", 10, 20) - block2 := block.MockStorageBlock(t, bucket, "user-1", 20, 30) - - require.NoError(t, services.StartAndAwaitRunning(ctx, s)) - - blocks, deletionMarks, err := s.GetBlocks(ctx, "user-1", 0, 30) - require.NoError(t, err) - require.Equal(t, 2, len(blocks)) - assert.Equal(t, block2.ULID, blocks[0].ID) - assert.Equal(t, block1.ULID, blocks[1].ID) - assert.Empty(t, deletionMarks) - - mark1 := bucketindex.BlockDeletionMarkFromThanosMarker(block.MockStorageDeletionMark(t, bucket, "user-1", block1)) - - // Trigger a periodic sync - require.NoError(t, s.scan(ctx)) - - blocks, deletionMarks, err = s.GetBlocks(ctx, "user-1", 0, 30) - require.NoError(t, err) - require.Equal(t, 2, len(blocks)) - assert.Equal(t, block2.ULID, blocks[0].ID) - assert.Equal(t, block1.ULID, blocks[1].ID) - assert.Equal(t, 
map[ulid.ULID]*bucketindex.BlockDeletionMark{ - block1.ULID: mark1, - }, deletionMarks) -} - -func TestBucketScanBlocksFinder_PeriodicScanFindsDeletedBlock(t *testing.T) { - ctx := context.Background() - s, bucket, _, _ := prepareBucketScanBlocksFinder(t, prepareBucketScanBlocksFinderConfig()) - - block1 := block.MockStorageBlock(t, bucket, "user-1", 10, 20) - block2 := block.MockStorageBlock(t, bucket, "user-1", 20, 30) - - require.NoError(t, services.StartAndAwaitRunning(ctx, s)) - - blocks, deletionMarks, err := s.GetBlocks(ctx, "user-1", 0, 30) - require.NoError(t, err) - require.Equal(t, 2, len(blocks)) - assert.Equal(t, block2.ULID, blocks[0].ID) - assert.Equal(t, block1.ULID, blocks[1].ID) - assert.Empty(t, deletionMarks) - - require.NoError(t, bucket.Delete(ctx, fmt.Sprintf("%s/%s", "user-1", block1.ULID.String()))) - - // Trigger a periodic sync - require.NoError(t, s.scan(ctx)) - - blocks, deletionMarks, err = s.GetBlocks(ctx, "user-1", 0, 30) - require.NoError(t, err) - require.Equal(t, 1, len(blocks)) - assert.Equal(t, block2.ULID, blocks[0].ID) - assert.Empty(t, deletionMarks) -} - -func TestBucketScanBlocksFinder_PeriodicScanFindsDeletedUser(t *testing.T) { - ctx := context.Background() - s, bucket, _, _ := prepareBucketScanBlocksFinder(t, prepareBucketScanBlocksFinderConfig()) - - block1 := block.MockStorageBlock(t, bucket, "user-1", 10, 20) - block2 := block.MockStorageBlock(t, bucket, "user-1", 20, 30) - - require.NoError(t, services.StartAndAwaitRunning(ctx, s)) - - blocks, deletionMarks, err := s.GetBlocks(ctx, "user-1", 0, 30) - require.NoError(t, err) - require.Equal(t, 2, len(blocks)) - assert.Equal(t, block2.ULID, blocks[0].ID) - assert.Equal(t, block1.ULID, blocks[1].ID) - assert.Empty(t, deletionMarks) - - require.NoError(t, bucket.Delete(ctx, "user-1")) - - // Trigger a periodic sync - require.NoError(t, s.scan(ctx)) - - blocks, deletionMarks, err = s.GetBlocks(ctx, "user-1", 0, 30) - require.NoError(t, err) - require.Equal(t, 0, len(blocks)) - assert.Empty(t, deletionMarks) -} - -func TestBucketScanBlocksFinder_PeriodicScanFindsUserWhichWasPreviouslyDeleted(t *testing.T) { - ctx := context.Background() - s, bucket, _, _ := prepareBucketScanBlocksFinder(t, prepareBucketScanBlocksFinderConfig()) - - block1 := block.MockStorageBlock(t, bucket, "user-1", 10, 20) - block2 := block.MockStorageBlock(t, bucket, "user-1", 20, 30) - - require.NoError(t, services.StartAndAwaitRunning(ctx, s)) - - blocks, deletionMarks, err := s.GetBlocks(ctx, "user-1", 0, 40) - require.NoError(t, err) - require.Equal(t, 2, len(blocks)) - assert.Equal(t, block2.ULID, blocks[0].ID) - assert.Equal(t, block1.ULID, blocks[1].ID) - assert.Empty(t, deletionMarks) - - require.NoError(t, bucket.Delete(ctx, "user-1")) - - // Trigger a periodic sync - require.NoError(t, s.scan(ctx)) - - blocks, deletionMarks, err = s.GetBlocks(ctx, "user-1", 0, 40) - require.NoError(t, err) - require.Equal(t, 0, len(blocks)) - assert.Empty(t, deletionMarks) - - block3 := block.MockStorageBlock(t, bucket, "user-1", 30, 40) - - // Trigger a periodic sync - require.NoError(t, s.scan(ctx)) - - blocks, deletionMarks, err = s.GetBlocks(ctx, "user-1", 0, 40) - require.NoError(t, err) - require.Equal(t, 1, len(blocks)) - assert.Equal(t, block3.ULID, blocks[0].ID) - assert.Empty(t, deletionMarks) -} - -func TestBucketScanBlocksFinder_GetBlocks(t *testing.T) { - ctx := context.Background() - s, bucket, _, _ := prepareBucketScanBlocksFinder(t, prepareBucketScanBlocksFinderConfig()) - - block1 := block.MockStorageBlock(t, bucket, 
"user-1", 10, 15) - block2 := block.MockStorageBlock(t, bucket, "user-1", 12, 20) - block3 := block.MockStorageBlock(t, bucket, "user-1", 20, 30) - block4 := block.MockStorageBlock(t, bucket, "user-1", 30, 40) - mark3 := bucketindex.BlockDeletionMarkFromThanosMarker(block.MockStorageDeletionMark(t, bucket, "user-1", block3)) - - require.NoError(t, services.StartAndAwaitRunning(ctx, s)) - - tests := map[string]struct { - minT int64 - maxT int64 - expectedMetas []tsdb.BlockMeta - expectedMarks map[ulid.ULID]*bucketindex.BlockDeletionMark - }{ - "no matching block because the range is too low": { - minT: 0, - maxT: 5, - expectedMarks: map[ulid.ULID]*bucketindex.BlockDeletionMark{}, - }, - "no matching block because the range is too high": { - minT: 50, - maxT: 60, - expectedMarks: map[ulid.ULID]*bucketindex.BlockDeletionMark{}, - }, - "matching all blocks": { - minT: 0, - maxT: 60, - expectedMetas: []tsdb.BlockMeta{block4, block3, block2, block1}, - expectedMarks: map[ulid.ULID]*bucketindex.BlockDeletionMark{ - block3.ULID: mark3, - }, - }, - "query range starting at a block maxT": { - minT: block3.MaxTime, - maxT: 60, - expectedMetas: []tsdb.BlockMeta{block4}, - expectedMarks: map[ulid.ULID]*bucketindex.BlockDeletionMark{}, - }, - "query range ending at a block minT": { - minT: block3.MinTime, - maxT: block4.MinTime, - expectedMetas: []tsdb.BlockMeta{block4, block3}, - expectedMarks: map[ulid.ULID]*bucketindex.BlockDeletionMark{ - block3.ULID: mark3, - }, - }, - "query range within a single block": { - minT: block3.MinTime + 2, - maxT: block3.MaxTime - 2, - expectedMetas: []tsdb.BlockMeta{block3}, - expectedMarks: map[ulid.ULID]*bucketindex.BlockDeletionMark{ - block3.ULID: mark3, - }, - }, - "query range within multiple blocks": { - minT: 13, - maxT: 16, - expectedMetas: []tsdb.BlockMeta{block2, block1}, - expectedMarks: map[ulid.ULID]*bucketindex.BlockDeletionMark{}, - }, - "query range matching exactly a single block": { - minT: block3.MinTime, - maxT: block3.MaxTime - 1, - expectedMetas: []tsdb.BlockMeta{block3}, - expectedMarks: map[ulid.ULID]*bucketindex.BlockDeletionMark{ - block3.ULID: mark3, - }, - }, - } - - for testName, testData := range tests { - t.Run(testName, func(t *testing.T) { - metas, deletionMarks, err := s.GetBlocks(ctx, "user-1", testData.minT, testData.maxT) - require.NoError(t, err) - require.Equal(t, len(testData.expectedMetas), len(metas)) - require.Equal(t, testData.expectedMarks, deletionMarks) - - for i, expectedBlock := range testData.expectedMetas { - assert.Equal(t, expectedBlock.ULID, metas[i].ID) - } - }) - } -} - -func prepareBucketScanBlocksFinder(t *testing.T, cfg BucketScanBlocksFinderConfig) (*BucketScanBlocksFinder, objstore.Bucket, string, *prometheus.Registry) { - cacheDir := t.TempDir() - - bkt, storageDir := mimir_testutil.PrepareFilesystemBucket(t) - - reg := prometheus.NewPedanticRegistry() - cfg.CacheDir = cacheDir - s := NewBucketScanBlocksFinder(cfg, bkt, nil, log.NewNopLogger(), reg) - - t.Cleanup(func() { - s.StopAsync() - require.NoError(t, s.AwaitTerminated(context.Background())) - }) - - return s, bkt, storageDir, reg -} - -func prepareBucketScanBlocksFinderConfig() BucketScanBlocksFinderConfig { - return BucketScanBlocksFinderConfig{ - ScanInterval: time.Minute, - TenantsConcurrency: 10, - MetasConcurrency: 10, - IgnoreDeletionMarksDelay: time.Hour, - } -} diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go index 8eed5dc1d71..b1b704fbb20 100644 --- a/pkg/querier/blocks_store_queryable.go +++ 
b/pkg/querier/blocks_store_queryable.go @@ -209,11 +209,14 @@ func NewBlocksStoreQueryableFromConfig(querierCfg Config, gatewayCfg storegatewa bucketClient = cachingBucket // Create the blocks finder. - finder := NewBucketScanBlocksFinder(BucketScanBlocksFinderConfig{ - ScanInterval: storageCfg.BucketStore.SyncInterval, - TenantsConcurrency: storageCfg.BucketStore.TenantSyncConcurrency, - MetasConcurrency: storageCfg.BucketStore.MetaSyncConcurrency, - CacheDir: storageCfg.BucketStore.SyncDir, + finder := NewBucketIndexBlocksFinder(BucketIndexBlocksFinderConfig{ + IndexLoader: bucketindex.LoaderConfig{ + CheckInterval: time.Minute, + UpdateOnStaleInterval: storageCfg.BucketStore.SyncInterval, + UpdateOnErrorInterval: storageCfg.BucketStore.BucketIndex.UpdateOnErrorInterval, + IdleTimeout: storageCfg.BucketStore.BucketIndex.IdleTimeout, + }, + MaxStalePeriod: storageCfg.BucketStore.BucketIndex.MaxStalePeriod, IgnoreDeletionMarksDelay: storageCfg.BucketStore.IgnoreDeletionMarksDelay, }, bucketClient, limits, logger, reg)
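Whichever finder produces the block list, the time-range filtering performed by `GetBlocks` is the part the deleted scanner documents most clearly: blocks are kept sorted by `MaxTime` and walked in reverse, because most queries are close to "now", with an early break once `MaxTime <= minT`. Below is a self-contained sketch of that matching step using stand-in types rather than `bucketindex.Block` (`blockMeta`, `within`, and `matchingBlocks` are names invented for the sketch), and assuming — per the deleted `TestBucketScanBlocksFinder_GetBlocks` cases above — that a block's `MaxTime` is exclusive:

```go
package main

import "fmt"

// blockMeta is a simplified stand-in for bucketindex.Block.
type blockMeta struct {
	ID      string
	MinTime int64 // milliseconds, inclusive
	MaxTime int64 // milliseconds, exclusive (assumption based on the deleted tests)
}

// within mirrors the Within(minT, maxT) overlap check used by GetBlocks.
func (b blockMeta) within(minT, maxT int64) bool {
	return b.MinTime <= maxT && minT < b.MaxTime
}

// matchingBlocks walks blocks (sorted by MaxTime ascending) in reverse and
// stops as soon as a block's MaxTime falls at or before the query start:
// every earlier block in the slice can only be older. Results therefore come
// back sorted by MaxTime descending, matching the GetBlocks contract.
func matchingBlocks(blocks []blockMeta, minT, maxT int64) []blockMeta {
	var out []blockMeta
	for i := len(blocks) - 1; i >= 0; i-- {
		if blocks[i].within(minT, maxT) {
			out = append(out, blocks[i])
		}
		if blocks[i].MaxTime <= minT {
			break
		}
	}
	return out
}

func main() {
	blocks := []blockMeta{
		{ID: "block-1", MinTime: 10, MaxTime: 15},
		{ID: "block-2", MinTime: 12, MaxTime: 20},
		{ID: "block-3", MinTime: 20, MaxTime: 30},
		{ID: "block-4", MinTime: 30, MaxTime: 40},
	}

	// Matches block-2 then block-1 (MaxTime descending), as in the deleted
	// "query range within multiple blocks" test case.
	fmt.Println(matchingBlocks(blocks, 13, 16))

	// A range starting exactly at block-3's MaxTime matches only block-4,
	// as in the deleted "query range starting at a block maxT" test case.
	fmt.Println(matchingBlocks(blocks, 30, 60))
}
```

The early break is safe only because the slice is sorted by `MaxTime`, which is why the scanner called `sortBlocksByMaxTime` after every fetch; the bucket-index-based finder receives its block list from the compactor-written index instead, but the same overlap semantics apply.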