Skip to content

Commit

Permalink
store-gateway: use bucket index instead of scanning the bucket (#6808)
Browse files Browse the repository at this point in the history
* store-gateway: use bucket index instead of scanning the bucket

This is a follow-up to 6673, similar change to 6779

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Fix TestStoreGateway_InitialSyncFailure

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Fix TestStoreGateway_InitialSyncWithDefaultShardingEnabled

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Fix TestBucketStores_SyncBlocks

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Fix TestBucketStores_InitialSyncShouldRetryOnFailure

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Fix TestBucketStores_InitialSync

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Fix TestBucketStores_Series_ShouldCorrectlyQuerySeriesSpanningMultipleChunks

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Fix TestBucketStore_Series_ShouldQueryBlockWithOutOfOrderChunks

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Fix TestBucketStores_deleteLocalFilesForExcludedTenants

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Fix TestStoreGateway_SyncShouldKeepPreviousBlocksIfInstanceIsUnhealthyInTheRing

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Fix TestStoreGateway_SeriesQueryingShouldEnforceMaxChunksPerQueryLimit

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Fix read-write mode integration tests

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Clean up users in integration tests more often

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Update CHANGELOG.md

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Fix TestGettingStartedWithGossipedRing

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Fix Test_MaxSeriesAndChunksPerQueryLimitHit

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Remove debug option

Signed-off-by: Dimitar Dimitrov <[email protected]>

---------

Signed-off-by: Dimitar Dimitrov <[email protected]>
(cherry picked from commit 1a459bb)
  • Loading branch information
dimitarvdimitrov authored and grafanabot committed Dec 4, 2023
1 parent bcd1f2b commit bb6b9c1
Show file tree
Hide file tree
Showing 9 changed files with 54 additions and 41 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

### Grafana Mimir

* [CHANGE] The following deprecated configurations have been removed: #6673
* [CHANGE] The following deprecated configurations have been removed: #6673 #6779 #6808
* `-querier.iterators`
* `-querier.batch-iterators`
* `-blocks-storage.bucket-store.max-chunk-pool-bytes`
Expand Down
1 change: 1 addition & 0 deletions integration/configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ var (
"-blocks-storage.tsdb.ship-interval": "1m",
"-blocks-storage.tsdb.head-compaction-interval": "1s",
"-querier.query-store-after": "0",
"-compactor.cleanup-interval": "2s",
}
}

Expand Down
1 change: 1 addition & 0 deletions integration/getting_started_with_gossiped_ring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func TestGettingStartedWithGossipedRing(t *testing.T) {
// decrease timeouts to make test faster. should still be fine with two instances only
"-ingester.ring.observe-period": "5s", // to avoid conflicts in tokens
"-blocks-storage.bucket-store.sync-interval": "1s", // sync continuously
"-compactor.cleanup-interval": "1s", // update bucket index continuously
"-blocks-storage.bucket-store.ignore-blocks-within": "0",
"-blocks-storage.backend": "s3",
"-blocks-storage.s3.bucket-name": blocksBucketName,
Expand Down
2 changes: 1 addition & 1 deletion integration/read_write_mode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,6 @@ func TestReadWriteModeCompaction(t *testing.T) {
// Frequently cleanup old blocks.
// While this doesn't test the compaction functionality of the compactor, it does verify that the compactor
// is correctly configured and able to interact with storage, which is the intention of this test.
"-compactor.cleanup-interval": "2s",
"-compactor.blocks-retention-period": "5s",
})

Expand Down Expand Up @@ -341,6 +340,7 @@ func startReadWriteModeCluster(t *testing.T, s *e2e.Scenario, extraFlags ...map[

flagSets := []map[string]string{
CommonStorageBackendFlags(),
BlocksStorageFlags(),
{
"-memberlist.join": "mimir-backend-1",
},
Expand Down
1 change: 0 additions & 1 deletion integration/single_binary_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ func TestMimirShouldStartInSingleBinaryModeWithAllMemcachedConfigured(t *testing
// Compactor.
"-compactor.ring.store": "consul",
"-compactor.ring.consul.hostname": consul.NetworkHTTPEndpoint(),
"-compactor.cleanup-interval": "2s", // Update bucket index often.
})

// Ensure Mimir successfully starts.
Expand Down
21 changes: 12 additions & 9 deletions integration/store_gateway_limits_hit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,15 @@ func Test_MaxSeriesAndChunksPerQueryLimitHit(t *testing.T) {

for testName, testData := range tests {
t.Run(testName, func(t *testing.T) {
// Start Mimir read components and wait until ready. The querier and store-gateway will be ready after
// they discovered the blocks in the storage.
// Start Mimir read components and wait until ready.
// Compactor needs to start before store-gateway so that the bucket index is updated.
compactor := e2emimir.NewCompactor("compactor", consul.NetworkHTTPEndpoint(), flags)
require.NoError(t, scenario.StartAndWaitReady(compactor))

// The querier and store-gateway will be ready after they discovered the blocks in the storage.
querier := e2emimir.NewQuerier("querier", consul.NetworkHTTPEndpoint(), mergeFlags(flags, testData.additionalQuerierFlags))
storeGateway := e2emimir.NewStoreGateway("store-gateway", consul.NetworkHTTPEndpoint(), mergeFlags(flags, testData.additionalStoreGatewayFlags))
compactor := e2emimir.NewCompactor("compactor", consul.NetworkHTTPEndpoint(), flags)
require.NoError(t, scenario.StartAndWaitReady(querier, storeGateway, compactor))
require.NoError(t, scenario.StartAndWaitReady(querier, storeGateway))
t.Cleanup(func() {
require.NoError(t, scenario.Stop(querier, storeGateway, compactor))
})
Expand All @@ -116,15 +119,15 @@ func Test_MaxSeriesAndChunksPerQueryLimitHit(t *testing.T) {
require.NoError(t, err)

// Verify we can successfully query timeseries between timeStamp1 and timeStamp2 (excluded)
rangeResultResponse, _, err := client.QueryRangeRaw("{__name__=~\"series_.+\"}", timeStamp1, timeStamp1.Add(time.Second), time.Second)
rangeResultResponse, rangeResultBody, err := client.QueryRangeRaw("{__name__=~\"series_.+\"}", timeStamp1, timeStamp1.Add(time.Second), time.Second)
require.NoError(t, err)
require.Equal(t, http.StatusOK, rangeResultResponse.StatusCode)
require.Equal(t, http.StatusOK, rangeResultResponse.StatusCode, string(rangeResultBody))

// Verify we cannot successfully query timeseries between timeSeries1 and timeSeries2 (included) because the limit is hit, and the status code 422 is returned
rangeResultResponse, rangeResultBody, err := client.QueryRangeRaw("{__name__=~\"series_.+\"}", timeStamp1, timeStamp2.Add(time.Second), time.Second)
rangeResultResponse, rangeResultBody, err = client.QueryRangeRaw("{__name__=~\"series_.+\"}", timeStamp1, timeStamp2.Add(time.Second), time.Second)
require.NoError(t, err)
require.Equal(t, http.StatusUnprocessableEntity, rangeResultResponse.StatusCode)
require.True(t, strings.Contains(string(rangeResultBody), testData.expectedErrorKey))
require.Equal(t, http.StatusUnprocessableEntity, rangeResultResponse.StatusCode, string(rangeResultBody))
require.True(t, strings.Contains(string(rangeResultBody), testData.expectedErrorKey), string(rangeResultBody))
})
}
}
Expand Down
22 changes: 6 additions & 16 deletions pkg/storegateway/bucket_stores.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,32 +446,22 @@ func (u *BucketStores) getOrCreateStore(userID string) (*BucketStore, error) {
// but if the store-gateway removes redundant blocks before the querier discovers them, the
// consistency check on the querier will fail.
}

// Instantiate a different blocks metadata fetcher based on whether bucket index is enabled or not.
var (
fetcher block.MetadataFetcher
err error
)
fetcher, err = block.NewMetaFetcher(
userLogger,
u.cfg.BucketStore.MetaSyncConcurrency,
userBkt,
u.syncDirForUser(userID), // The fetcher stores cached metas in the "meta-syncer/" sub directory
fetcher := NewBucketIndexMetadataFetcher(
userID,
u.bucket,
u.limits,
u.logger,
fetcherReg,
filters,
)
if err != nil {
return nil, err
}

bucketStoreOpts := []BucketStoreOption{
WithLogger(userLogger),
WithIndexCache(u.indexCache),
WithQueryGate(u.queryGate),
WithLazyLoadingGate(u.lazyLoadingGate),
}

bs, err = NewBucketStore(
bs, err := NewBucketStore(
userID,
userBkt,
fetcher,
Expand Down
18 changes: 14 additions & 4 deletions pkg/storegateway/bucket_stores_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ func TestBucketStores_InitialSync(t *testing.T) {
assert.Empty(t, warnings)
assert.Empty(t, seriesSet)
}

for userID := range userToMetric {
createBucketIndex(t, bucket, userID)
}
require.NoError(t, stores.InitialSync(ctx))

// Query series after the initial sync.
Expand Down Expand Up @@ -135,16 +137,17 @@ func TestBucketStores_InitialSync(t *testing.T) {
func TestBucketStores_InitialSyncShouldRetryOnFailure(t *testing.T) {
test.VerifyNoLeak(t)

const tenantID = "user-1"
ctx := context.Background()
cfg := prepareStorageConfig(t)

storageDir := t.TempDir()

// Generate a block for the user in the storage.
generateStorageBlock(t, storageDir, "user-1", "series_1", 10, 100, 15)

generateStorageBlock(t, storageDir, tenantID, "series_1", 10, 100, 15)
bucket, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir})
require.NoError(t, err)
createBucketIndex(t, bucket, tenantID)

// Wrap the bucket to fail the 1st Get() request.
bucket = &failFirstGetBucket{Bucket: bucket}
Expand All @@ -157,7 +160,7 @@ func TestBucketStores_InitialSyncShouldRetryOnFailure(t *testing.T) {
require.NoError(t, stores.InitialSync(ctx))

// Query series after the initial sync.
seriesSet, warnings, err := querySeries(t, stores, "user-1", "series_1", 20, 40)
seriesSet, warnings, err := querySeries(t, stores, tenantID, "series_1", 20, 40)
require.NoError(t, err)
assert.Empty(t, warnings)
require.Len(t, seriesSet, 1)
Expand Down Expand Up @@ -216,6 +219,7 @@ func TestBucketStores_SyncBlocks(t *testing.T) {

// Run an initial sync to discover 1 block.
generateStorageBlock(t, storageDir, userID, metricName, 10, 100, 15)
createBucketIndex(t, bucket, userID)
require.NoError(t, stores.InitialSync(ctx))

// Query a range for which we have no samples.
Expand All @@ -226,6 +230,7 @@ func TestBucketStores_SyncBlocks(t *testing.T) {

// Generate another block and sync blocks again.
generateStorageBlock(t, storageDir, userID, metricName, 100, 200, 15)
createBucketIndex(t, bucket, userID)
require.NoError(t, stores.SyncBlocks(ctx))

seriesSet, warnings, err = querySeries(t, stores, userID, metricName, 150, 180)
Expand Down Expand Up @@ -423,6 +428,7 @@ func testBucketStoresSeriesShouldCorrectlyQuerySeriesSpanningMultipleChunks(t *t
stores, err := NewBucketStores(cfg, newNoShardingStrategy(), bucket, defaultLimitsOverrides(t), log.NewNopLogger(), reg)
require.NoError(t, err)

createBucketIndex(t, bucket, userID)
require.NoError(t, stores.InitialSync(ctx))

tests := map[string]struct {
Expand Down Expand Up @@ -501,6 +507,7 @@ func TestBucketStore_Series_ShouldQueryBlockWithOutOfOrderChunks(t *testing.T) {
reg := prometheus.NewPedanticRegistry()
stores, err := NewBucketStores(cfg, newNoShardingStrategy(), bucket, defaultLimitsOverrides(t), log.NewNopLogger(), reg)
require.NoError(t, err)
createBucketIndex(t, bucket, userID)
require.NoError(t, stores.InitialSync(ctx))

tests := map[string]struct {
Expand Down Expand Up @@ -653,6 +660,9 @@ func TestBucketStores_deleteLocalFilesForExcludedTenants(t *testing.T) {

bucket, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir})
require.NoError(t, err)
for userID := range userToMetric {
createBucketIndex(t, bucket, userID)
}

sharding := userShardingStrategy{}

Expand Down
27 changes: 18 additions & 9 deletions pkg/storegateway/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,14 @@ func TestStoreGateway_InitialSyncWithDefaultShardingEnabled(t *testing.T) {
ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil)
t.Cleanup(func() { assert.NoError(t, closer.Close()) })

bucketClient := &bucket.ClientMock{}
onBucketIndexGet := func() {}

bucketClient := &bucket.ErrorInjectedBucketClient{Bucket: objstore.NewInMemBucket(), Injector: func(op bucket.Operation, name string) error {
if op == bucket.OpGet && strings.HasSuffix(name, bucketindex.IndexCompressedFilename) {
onBucketIndexGet()
}
return nil
}}

// Setup the initial instance state in the ring.
if testData.initialExists {
Expand All @@ -149,16 +156,18 @@ func TestStoreGateway_InitialSyncWithDefaultShardingEnabled(t *testing.T) {
t.Cleanup(func() { assert.NoError(t, services.StopAndAwaitTerminated(ctx, g)) })
assert.False(t, g.ringLifecycler.IsRegistered())

bucketClient.MockIterWithCallback("", []string{"user-1", "user-2"}, nil, func() {
for _, userID := range []string{"user-1", "user-2"} {
createBucketIndex(t, bucketClient, userID)
}

onBucketIndexGet = func() {
// During the initial sync, we expect the instance to always be in the JOINING
// state within the ring.
assert.True(t, g.ringLifecycler.IsRegistered())
assert.Equal(t, ring.JOINING, g.ringLifecycler.GetState())
assert.Equal(t, ringNumTokensDefault, len(g.ringLifecycler.GetTokens()))
assert.Subset(t, g.ringLifecycler.GetTokens(), testData.initialTokens)
})
bucketClient.MockIter("user-1/", []string{}, nil)
bucketClient.MockIter("user-2/", []string{}, nil)
}

// Once successfully started, the instance should be ACTIVE in the ring.
require.NoError(t, services.StartAndAwaitRunning(ctx, g))
Expand All @@ -184,13 +193,11 @@ func TestStoreGateway_InitialSyncFailure(t *testing.T) {
ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil)
t.Cleanup(func() { assert.NoError(t, closer.Close()) })

bucketClient := &bucket.ClientMock{}
bucketClient := &bucket.ErrorInjectedBucketClient{Injector: func(operation bucket.Operation, s string) error { return assert.AnError }}

g, err := newStoreGateway(gatewayCfg, storageCfg, bucketClient, ringStore, defaultLimitsOverrides(t), log.NewNopLogger(), nil, nil)
g, err := newStoreGateway(gatewayCfg, storageCfg, bucketClient, ringStore, defaultLimitsOverrides(t), log.NewLogfmtLogger(os.Stdout), nil, nil)
require.NoError(t, err)

bucketClient.MockIter("", []string{}, errors.New("network error"))

require.NoError(t, g.StartAsync(ctx))
err = g.AwaitRunning(ctx)
assert.Error(t, err)
Expand Down Expand Up @@ -768,6 +775,7 @@ func TestStoreGateway_SyncShouldKeepPreviousBlocksIfInstanceIsUnhealthyInTheRing
bucket, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir})
require.NoError(t, err)
generateStorageBlock(t, storageDir, userID, metricName, 10, 100, 15)
createBucketIndex(t, bucket, userID)

g, err := newStoreGateway(gatewayCfg, storageCfg, bucket, ringStore, defaultLimitsOverrides(t), log.NewNopLogger(), reg, nil)
require.NoError(t, err)
Expand Down Expand Up @@ -1431,6 +1439,7 @@ func TestStoreGateway_SeriesQueryingShouldEnforceMaxChunksPerQueryLimit(t *testi

bucketClient, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir})
require.NoError(t, err)
createBucketIndex(t, bucketClient, userID)

// Prepare the request to query back all series (1 chunk per series in this test).
req := &storepb.SeriesRequest{
Expand Down

0 comments on commit bb6b9c1

Please sign in to comment.