From e8a101c6237b76cf52e8f753d84c18fe1711c75d Mon Sep 17 00:00:00 2001 From: Harry John Date: Wed, 6 Dec 2023 00:44:14 -0800 Subject: [PATCH] Fix cleaner not discovering deleted users from global dir (#5691) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Fix cleaner not discovering deleted users from global dir Signed-off-by: 🌲 Harry 🌊 John 🏔 * Fix tests Signed-off-by: 🌲 Harry 🌊 John 🏔 * Remove the prefix from entry Signed-off-by: 🌲 Harry 🌊 John 🏔 * fix tests Signed-off-by: 🌲 Harry 🌊 John 🏔 * Fix tests Signed-off-by: 🌲 Harry 🌊 John 🏔 * Address comments Signed-off-by: 🌲 Harry 🌊 John 🏔 * Update comments Signed-off-by: 🌲 Harry 🌊 John 🏔 --------- Signed-off-by: 🌲 Harry 🌊 John 🏔 --- pkg/compactor/compactor_test.go | 13 +++++++ pkg/querier/blocks_finder_bucket_scan_test.go | 3 ++ pkg/storage/tsdb/users_scanner.go | 36 ++++++++++++------- pkg/storage/tsdb/users_scanner_test.go | 15 +++++--- tools/thanosconvert/thanosconvert_test.go | 16 ++++++--- 5 files changed, 61 insertions(+), 22 deletions(-) diff --git a/pkg/compactor/compactor_test.go b/pkg/compactor/compactor_test.go index 8998fa4fae..3a22981379 100644 --- a/pkg/compactor/compactor_test.go +++ b/pkg/compactor/compactor_test.go @@ -170,6 +170,7 @@ func TestCompactor_SkipCompactionWhenCmkError(t *testing.T) { // No user blocks stored in the bucket. bucketClient := &bucket.ClientMock{} bucketClient.MockIter("", []string{userID}, nil) + bucketClient.MockIter("__markers__", []string{}, nil) bucketClient.MockIter(userID+"/", []string{}, nil) bucketClient.MockIter(userID+"/markers/", nil, nil) bucketClient.MockGet(userID+"/bucket-index-sync-status.json", string(content), nil) @@ -198,6 +199,7 @@ func TestCompactor_ShouldDoNothingOnNoUserBlocks(t *testing.T) { // No user blocks stored in the bucket. bucketClient := &bucket.ClientMock{} bucketClient.MockIter("", []string{}, nil) + bucketClient.MockIter("__markers__", []string{}, nil) cfg := prepareConfig() c, _, _, logs, registry := prepare(t, cfg, bucketClient, nil) require.NoError(t, services.StartAndAwaitRunning(context.Background(), c)) @@ -348,6 +350,7 @@ func TestCompactor_ShouldRetryCompactionOnFailureWhileDiscoveringUsersFromBucket // Fail to iterate over the bucket while discovering users. bucketClient := &bucket.ClientMock{} + bucketClient.MockIter("__markers__", nil, errors.New("failed to iterate the bucket")) bucketClient.MockIter("", nil, errors.New("failed to iterate the bucket")) c, _, _, logs, registry := prepare(t, prepareConfig(), bucketClient, nil) @@ -501,6 +504,7 @@ func TestCompactor_ShouldIncrementCompactionErrorIfFailedToCompactASingleTenant( userID := "test-user" bucketClient := &bucket.ClientMock{} bucketClient.MockIter("", []string{userID}, nil) + bucketClient.MockIter("__markers__", []string{}, nil) bucketClient.MockIter(userID+"/", []string{userID + "/01DTVP434PA9VFXSW2JKB3392D/meta.json", userID + "/01FN6CDF3PNEWWRY5MPGJPE3EX/meta.json"}, nil) bucketClient.MockIter(userID+"/markers/", nil, nil) bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath(userID), false, nil) @@ -553,6 +557,7 @@ func TestCompactor_ShouldIncrementCompactionErrorIfFailedToCompactASingleTenant( func TestCompactor_ShouldCompactAndRemoveUserFolder(t *testing.T) { bucketClient := &bucket.ClientMock{} bucketClient.MockIter("", []string{"user-1"}, nil) + bucketClient.MockIter("__markers__", []string{}, nil) bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-1"), false, nil) bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath("user-1"), false, nil) bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", "user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/meta.json"}, nil) @@ -598,6 +603,7 @@ func TestCompactor_ShouldIterateOverUsersAndRunCompaction(t *testing.T) { // Mock the bucket to contain two users, each one with one block. bucketClient := &bucket.ClientMock{} bucketClient.MockIter("", []string{"user-1", "user-2"}, nil) + bucketClient.MockIter("__markers__", []string{}, nil) bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-1"), false, nil) bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath("user-1"), false, nil) bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-2"), false, nil) @@ -741,6 +747,7 @@ func TestCompactor_ShouldNotCompactBlocksMarkedForDeletion(t *testing.T) { // Mock the bucket to contain two users, each one with one block. bucketClient := &bucket.ClientMock{} bucketClient.MockIter("", []string{"user-1"}, nil) + bucketClient.MockIter("__markers__", []string{}, nil) bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D", "user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ"}, nil) bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-1"), false, nil) bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath("user-1"), false, nil) @@ -866,6 +873,7 @@ func TestCompactor_ShouldNotCompactBlocksMarkedForSkipCompact(t *testing.T) { // Mock the bucket to contain two users, each one with one block. bucketClient := &bucket.ClientMock{} bucketClient.MockIter("", []string{"user-1", "user-2"}, nil) + bucketClient.MockIter("__markers__", []string{}, nil) bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-1"), false, nil) bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath("user-1"), false, nil) bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-2"), false, nil) @@ -944,6 +952,7 @@ func TestCompactor_ShouldNotCompactBlocksForUsersMarkedForDeletion(t *testing.T) // Mock the bucket to contain two users, each one with one block. bucketClient := &bucket.ClientMock{} bucketClient.MockIter("", []string{"user-1"}, nil) + bucketClient.MockIter("__markers__", []string{"__markers__/user-1/"}, nil) bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D"}, nil) bucketClient.MockGet(cortex_tsdb.GetGlobalDeletionMarkPath("user-1"), `{"deletion_time": 1}`, nil) bucketClient.MockUpload(cortex_tsdb.GetGlobalDeletionMarkPath("user-1"), nil) @@ -1107,6 +1116,7 @@ func TestCompactor_ShouldCompactAllUsersOnShardingEnabledButOnlyOneInstanceRunni // Mock the bucket to contain two users, each one with one block. bucketClient := &bucket.ClientMock{} bucketClient.MockIter("", []string{"user-1", "user-2"}, nil) + bucketClient.MockIter("__markers__", []string{}, nil) bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-1"), false, nil) bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath("user-1"), false, nil) bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-2"), false, nil) @@ -1215,6 +1225,7 @@ func TestCompactor_ShouldCompactOnlyUsersOwnedByTheInstanceOnShardingEnabledAndM // Mock the bucket to contain all users, each one with one block. bucketClient := &bucket.ClientMock{} bucketClient.MockIter("", userIDs, nil) + bucketClient.MockIter("__markers__", []string{}, nil) for _, userID := range userIDs { bucketClient.MockIter(userID+"/", []string{userID + "/01DTVP434PA9VFXSW2JKB3392D"}, nil) bucketClient.MockIter(userID+"/markers/", nil, nil) @@ -1321,6 +1332,7 @@ func TestCompactor_ShouldCompactOnlyShardsOwnedByTheInstanceOnShardingEnabledWit // Mock the bucket to contain all users, each one with five blocks, 2 sets of overlapping blocks and 1 separate block. bucketClient := &bucket.ClientMock{} bucketClient.MockIter("", userIDs, nil) + bucketClient.MockIter("__markers__", []string{}, nil) // Keys with a value greater than 1 will be groups that should be compacted groupHashes := make(map[uint32]int) @@ -1927,6 +1939,7 @@ func TestCompactor_ShouldFailCompactionOnTimeout(t *testing.T) { // Mock the bucket bucketClient := &bucket.ClientMock{} bucketClient.MockIter("", []string{}, nil) + bucketClient.MockIter("__markers__", []string{}, nil) ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil) t.Cleanup(func() { assert.NoError(t, closer.Close()) }) diff --git a/pkg/querier/blocks_finder_bucket_scan_test.go b/pkg/querier/blocks_finder_bucket_scan_test.go index 21abdee834..b502c8eed2 100644 --- a/pkg/querier/blocks_finder_bucket_scan_test.go +++ b/pkg/querier/blocks_finder_bucket_scan_test.go @@ -94,6 +94,7 @@ func TestBucketScanBlocksFinder_InitialScanFailure(t *testing.T) { // Mock the storage to simulate a failure when reading objects. bucket.MockIter("", []string{"user-1"}, nil) + bucket.MockIter("__markers__", []string{}, nil) bucket.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json"}, nil) bucket.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-1"), false, nil) bucket.MockExists(cortex_tsdb.GetLocalDeletionMarkPath("user-1"), false, nil) @@ -139,6 +140,7 @@ func TestBucketScanBlocksFinder_StopWhileRunningTheInitialScanOnManyTenants(t *t // Mock the bucket to introduce a 1s sleep while iterating each tenant in the bucket. bucket := &bucket.ClientMock{} bucket.MockIter("", tenantIDs, nil) + bucket.MockIter("__markers__", []string{}, nil) for _, tenantID := range tenantIDs { bucket.MockIterWithCallback(tenantID+"/", []string{}, nil, func() { time.Sleep(time.Second) @@ -177,6 +179,7 @@ func TestBucketScanBlocksFinder_StopWhileRunningTheInitialScanOnManyBlocks(t *te // Mock the bucket to introduce a 1s sleep while syncing each block in the bucket. bucket := &bucket.ClientMock{} bucket.MockIter("", []string{"user-1"}, nil) + bucket.MockIter("__markers__", []string{}, nil) bucket.MockIter("user-1/", blockPaths, nil) bucket.On("Exists", mock.Anything, mock.Anything).Return(false, nil).Run(func(args mock.Arguments) { // We return the meta.json doesn't exist, but introduce a 1s delay for each call. diff --git a/pkg/storage/tsdb/users_scanner.go b/pkg/storage/tsdb/users_scanner.go index 54f40bcc9d..2c0d463d25 100644 --- a/pkg/storage/tsdb/users_scanner.go +++ b/pkg/storage/tsdb/users_scanner.go @@ -38,38 +38,48 @@ func NewUsersScanner(bucketClient objstore.Bucket, isOwned func(userID string) ( // // If sharding is enabled, returned lists contains only the users owned by this instance. func (s *UsersScanner) ScanUsers(ctx context.Context) (users, markedForDeletion []string, err error) { + scannedUsers := make(map[string]struct{}) + + // Scan users in the bucket. err = s.bucketClient.Iter(ctx, "", func(entry string) error { - users = append(users, strings.TrimSuffix(entry, "/")) + userID := strings.TrimSuffix(entry, "/") + scannedUsers[userID] = struct{}{} return nil }) if err != nil { return nil, nil, err } - // Check users for being owned by instance, and split users into non-deleted and deleted. - // We do these checks after listing all users, to improve cacheability of Iter (result is only cached at the end of Iter call). - for ix := 0; ix < len(users); { - userID := users[ix] + // Scan users from the __markers__ directory. + err = s.bucketClient.Iter(ctx, util.GlobalMarkersDir, func(entry string) error { + // entry will be of the form __markers__// + parts := strings.Split(entry, objstore.DirDelim) + userID := parts[1] + scannedUsers[userID] = struct{}{} + return nil + }) + if err != nil { + return nil, nil, err + } - // Check if it's owned by this instance. - owned, err := s.isOwned(userID) - if err != nil { + for userID := range scannedUsers { + // Filter out users not owned by this instance. + if owned, err := s.isOwned(userID); err != nil { level.Warn(s.logger).Log("msg", "unable to check if user is owned by this shard", "user", userID, "err", err) } else if !owned { - users = append(users[:ix], users[ix+1:]...) continue } - deletionMarkExists, err := TenantDeletionMarkExists(ctx, s.bucketClient, userID) - if err != nil { + // Filter users marked for deletion + if deletionMarkExists, err := TenantDeletionMarkExists(ctx, s.bucketClient, userID); err != nil { level.Warn(s.logger).Log("msg", "unable to check if user is marked for deletion", "user", userID, "err", err) } else if deletionMarkExists { - users = append(users[:ix], users[ix+1:]...) markedForDeletion = append(markedForDeletion, userID) continue } - ix++ + // The remaining are the active users owned by this instance. + users = append(users, userID) } return users, markedForDeletion, nil diff --git a/pkg/storage/tsdb/users_scanner_test.go b/pkg/storage/tsdb/users_scanner_test.go index 72503865aa..18c044d49c 100644 --- a/pkg/storage/tsdb/users_scanner_test.go +++ b/pkg/storage/tsdb/users_scanner_test.go @@ -8,28 +8,32 @@ import ( "github.com/go-kit/log" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "golang.org/x/exp/slices" "github.com/cortexproject/cortex/pkg/storage/bucket" ) func TestUsersScanner_ScanUsers_ShouldReturnedOwnedUsersOnly(t *testing.T) { bucketClient := &bucket.ClientMock{} - bucketClient.MockIter("", []string{"user-1", "user-2", "user-3", "user-4"}, nil) + bucketClient.MockIter("", []string{"user-1/", "user-2/", "user-3/", "user-4/"}, nil) + bucketClient.MockIter("__markers__", []string{"__markers__/user-5/", "__markers__/user-6/", "__markers__/user-7/"}, nil) bucketClient.MockExists(GetGlobalDeletionMarkPath("user-1"), false, nil) bucketClient.MockExists(GetLocalDeletionMarkPath("user-1"), false, nil) bucketClient.MockExists(GetGlobalDeletionMarkPath("user-3"), true, nil) - bucketClient.MockExists(GetLocalDeletionMarkPath("user-3"), true, nil) + bucketClient.MockExists(GetLocalDeletionMarkPath("user-3"), false, nil) + bucketClient.MockExists(GetGlobalDeletionMarkPath("user-7"), false, nil) + bucketClient.MockExists(GetLocalDeletionMarkPath("user-7"), true, nil) isOwned := func(userID string) (bool, error) { - return userID == "user-1" || userID == "user-3", nil + return userID == "user-1" || userID == "user-3" || userID == "user-7", nil } s := NewUsersScanner(bucketClient, isOwned, log.NewNopLogger()) actual, deleted, err := s.ScanUsers(context.Background()) require.NoError(t, err) assert.Equal(t, []string{"user-1"}, actual) - assert.Equal(t, []string{"user-3"}, deleted) - + slices.Sort(deleted) + assert.Equal(t, []string{"user-3", "user-7"}, deleted) } func TestUsersScanner_ScanUsers_ShouldReturnUsersForWhichOwnerCheckOrTenantDeletionCheckFailed(t *testing.T) { @@ -37,6 +41,7 @@ func TestUsersScanner_ScanUsers_ShouldReturnUsersForWhichOwnerCheckOrTenantDelet bucketClient := &bucket.ClientMock{} bucketClient.MockIter("", expected, nil) + bucketClient.MockIter("__markers__", []string{}, nil) bucketClient.MockExists(GetGlobalDeletionMarkPath("user-1"), false, nil) bucketClient.MockExists(GetLocalDeletionMarkPath("user-1"), false, nil) diff --git a/tools/thanosconvert/thanosconvert_test.go b/tools/thanosconvert/thanosconvert_test.go index f59dd9da4f..169196a105 100644 --- a/tools/thanosconvert/thanosconvert_test.go +++ b/tools/thanosconvert/thanosconvert_test.go @@ -45,8 +45,10 @@ func TestThanosBlockConverter(t *testing.T) { assertions func(*testing.T, *bucket.ClientMock, Results, error) }{ { - name: "empty bucket is a noop", - bucketData: fakeBucket{}, + name: "empty bucket is a noop", + bucketData: fakeBucket{ + "__markers__": map[string]metadata.Meta{}, + }, assertions: func(t *testing.T, bkt *bucket.ClientMock, results Results, err error) { bkt.AssertNotCalled(t, "Get", mock.Anything, mock.Anything) bkt.AssertNotCalled(t, "Upload", mock.Anything, mock.Anything, mock.Anything) @@ -54,8 +56,11 @@ func TestThanosBlockConverter(t *testing.T) { }, }, { - name: "user with no blocks is a noop", - bucketData: fakeBucket{"user1": map[string]metadata.Meta{}}, + name: "user with no blocks is a noop", + bucketData: fakeBucket{ + "user1": map[string]metadata.Meta{}, + "__markers__": map[string]metadata.Meta{}, + }, assertions: func(t *testing.T, bkt *bucket.ClientMock, results Results, err error) { bkt.AssertNotCalled(t, "Get", mock.Anything, mock.Anything) bkt.AssertNotCalled(t, "Upload", mock.Anything, mock.Anything, mock.Anything) @@ -80,6 +85,7 @@ func TestThanosBlockConverter(t *testing.T) { "user3": map[string]metadata.Meta{ block1: cortexMeta("user3"), }, + "__markers__": map[string]metadata.Meta{}, }, assertions: func(t *testing.T, bkt *bucket.ClientMock, results Results, err error) { bkt.AssertNotCalled(t, "Upload", mock.Anything, mock.Anything, mock.Anything) @@ -114,6 +120,7 @@ func TestThanosBlockConverter(t *testing.T) { "user3": map[string]metadata.Meta{ block1: thanosMeta(), }, + "__markers__": map[string]metadata.Meta{}, }, assertions: func(t *testing.T, bkt *bucket.ClientMock, results Results, err error) { assert.Len(t, results, 3, "expected users in results") @@ -149,6 +156,7 @@ func TestThanosBlockConverter(t *testing.T) { blockWithUploadFailure: thanosMeta(), blockWithMalformedMeta: thanosMeta(), }, + "__markers__": map[string]metadata.Meta{}, }, assertions: func(t *testing.T, bkt *bucket.ClientMock, results Results, err error) { assert.Len(t, results["user1"].FailedBlocks, 1)