Skip to content

Commit

Permalink
Fix cleaner not discovering deleted users from global dir (#5691)
Browse files Browse the repository at this point in the history
* Fix cleaner not discovering deleted users from global dir

Signed-off-by: 🌲 Harry 🌊 John 🏔 <[email protected]>

* Fix tests

Signed-off-by: 🌲 Harry 🌊 John 🏔 <[email protected]>

* Remove the prefix from entry

Signed-off-by: 🌲 Harry 🌊 John 🏔 <[email protected]>

* fix tests

Signed-off-by: 🌲 Harry 🌊 John 🏔 <[email protected]>

* Fix tests

Signed-off-by: 🌲 Harry 🌊 John 🏔 <[email protected]>

* Address comments

Signed-off-by: 🌲 Harry 🌊 John 🏔 <[email protected]>

* Update comments

Signed-off-by: 🌲 Harry 🌊 John 🏔 <[email protected]>

---------

Signed-off-by: 🌲 Harry 🌊 John 🏔 <[email protected]>
  • Loading branch information
harry671003 authored Dec 6, 2023
1 parent 61fe286 commit e8a101c
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 22 deletions.
13 changes: 13 additions & 0 deletions pkg/compactor/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ func TestCompactor_SkipCompactionWhenCmkError(t *testing.T) {
// No user blocks stored in the bucket.
bucketClient := &bucket.ClientMock{}
bucketClient.MockIter("", []string{userID}, nil)
bucketClient.MockIter("__markers__", []string{}, nil)
bucketClient.MockIter(userID+"/", []string{}, nil)
bucketClient.MockIter(userID+"/markers/", nil, nil)
bucketClient.MockGet(userID+"/bucket-index-sync-status.json", string(content), nil)
Expand Down Expand Up @@ -198,6 +199,7 @@ func TestCompactor_ShouldDoNothingOnNoUserBlocks(t *testing.T) {
// No user blocks stored in the bucket.
bucketClient := &bucket.ClientMock{}
bucketClient.MockIter("", []string{}, nil)
bucketClient.MockIter("__markers__", []string{}, nil)
cfg := prepareConfig()
c, _, _, logs, registry := prepare(t, cfg, bucketClient, nil)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))
Expand Down Expand Up @@ -348,6 +350,7 @@ func TestCompactor_ShouldRetryCompactionOnFailureWhileDiscoveringUsersFromBucket

// Fail to iterate over the bucket while discovering users.
bucketClient := &bucket.ClientMock{}
bucketClient.MockIter("__markers__", nil, errors.New("failed to iterate the bucket"))
bucketClient.MockIter("", nil, errors.New("failed to iterate the bucket"))

c, _, _, logs, registry := prepare(t, prepareConfig(), bucketClient, nil)
Expand Down Expand Up @@ -501,6 +504,7 @@ func TestCompactor_ShouldIncrementCompactionErrorIfFailedToCompactASingleTenant(
userID := "test-user"
bucketClient := &bucket.ClientMock{}
bucketClient.MockIter("", []string{userID}, nil)
bucketClient.MockIter("__markers__", []string{}, nil)
bucketClient.MockIter(userID+"/", []string{userID + "/01DTVP434PA9VFXSW2JKB3392D/meta.json", userID + "/01FN6CDF3PNEWWRY5MPGJPE3EX/meta.json"}, nil)
bucketClient.MockIter(userID+"/markers/", nil, nil)
bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath(userID), false, nil)
Expand Down Expand Up @@ -553,6 +557,7 @@ func TestCompactor_ShouldIncrementCompactionErrorIfFailedToCompactASingleTenant(
func TestCompactor_ShouldCompactAndRemoveUserFolder(t *testing.T) {
bucketClient := &bucket.ClientMock{}
bucketClient.MockIter("", []string{"user-1"}, nil)
bucketClient.MockIter("__markers__", []string{}, nil)
bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-1"), false, nil)
bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath("user-1"), false, nil)
bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", "user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/meta.json"}, nil)
Expand Down Expand Up @@ -598,6 +603,7 @@ func TestCompactor_ShouldIterateOverUsersAndRunCompaction(t *testing.T) {
// Mock the bucket to contain two users, each one with one block.
bucketClient := &bucket.ClientMock{}
bucketClient.MockIter("", []string{"user-1", "user-2"}, nil)
bucketClient.MockIter("__markers__", []string{}, nil)
bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-1"), false, nil)
bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath("user-1"), false, nil)
bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-2"), false, nil)
Expand Down Expand Up @@ -741,6 +747,7 @@ func TestCompactor_ShouldNotCompactBlocksMarkedForDeletion(t *testing.T) {
// Mock the bucket to contain two users, each one with one block.
bucketClient := &bucket.ClientMock{}
bucketClient.MockIter("", []string{"user-1"}, nil)
bucketClient.MockIter("__markers__", []string{}, nil)
bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D", "user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ"}, nil)
bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-1"), false, nil)
bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath("user-1"), false, nil)
Expand Down Expand Up @@ -866,6 +873,7 @@ func TestCompactor_ShouldNotCompactBlocksMarkedForSkipCompact(t *testing.T) {
// Mock the bucket to contain two users, each one with one block.
bucketClient := &bucket.ClientMock{}
bucketClient.MockIter("", []string{"user-1", "user-2"}, nil)
bucketClient.MockIter("__markers__", []string{}, nil)
bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-1"), false, nil)
bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath("user-1"), false, nil)
bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-2"), false, nil)
Expand Down Expand Up @@ -944,6 +952,7 @@ func TestCompactor_ShouldNotCompactBlocksForUsersMarkedForDeletion(t *testing.T)
// Mock the bucket to contain two users, each one with one block.
bucketClient := &bucket.ClientMock{}
bucketClient.MockIter("", []string{"user-1"}, nil)
bucketClient.MockIter("__markers__", []string{"__markers__/user-1/"}, nil)
bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D"}, nil)
bucketClient.MockGet(cortex_tsdb.GetGlobalDeletionMarkPath("user-1"), `{"deletion_time": 1}`, nil)
bucketClient.MockUpload(cortex_tsdb.GetGlobalDeletionMarkPath("user-1"), nil)
Expand Down Expand Up @@ -1107,6 +1116,7 @@ func TestCompactor_ShouldCompactAllUsersOnShardingEnabledButOnlyOneInstanceRunni
// Mock the bucket to contain two users, each one with one block.
bucketClient := &bucket.ClientMock{}
bucketClient.MockIter("", []string{"user-1", "user-2"}, nil)
bucketClient.MockIter("__markers__", []string{}, nil)
bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-1"), false, nil)
bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath("user-1"), false, nil)
bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-2"), false, nil)
Expand Down Expand Up @@ -1215,6 +1225,7 @@ func TestCompactor_ShouldCompactOnlyUsersOwnedByTheInstanceOnShardingEnabledAndM
// Mock the bucket to contain all users, each one with one block.
bucketClient := &bucket.ClientMock{}
bucketClient.MockIter("", userIDs, nil)
bucketClient.MockIter("__markers__", []string{}, nil)
for _, userID := range userIDs {
bucketClient.MockIter(userID+"/", []string{userID + "/01DTVP434PA9VFXSW2JKB3392D"}, nil)
bucketClient.MockIter(userID+"/markers/", nil, nil)
Expand Down Expand Up @@ -1321,6 +1332,7 @@ func TestCompactor_ShouldCompactOnlyShardsOwnedByTheInstanceOnShardingEnabledWit
// Mock the bucket to contain all users, each one with five blocks, 2 sets of overlapping blocks and 1 separate block.
bucketClient := &bucket.ClientMock{}
bucketClient.MockIter("", userIDs, nil)
bucketClient.MockIter("__markers__", []string{}, nil)

// Keys with a value greater than 1 will be groups that should be compacted
groupHashes := make(map[uint32]int)
Expand Down Expand Up @@ -1927,6 +1939,7 @@ func TestCompactor_ShouldFailCompactionOnTimeout(t *testing.T) {
// Mock the bucket
bucketClient := &bucket.ClientMock{}
bucketClient.MockIter("", []string{}, nil)
bucketClient.MockIter("__markers__", []string{}, nil)

ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil)
t.Cleanup(func() { assert.NoError(t, closer.Close()) })
Expand Down
3 changes: 3 additions & 0 deletions pkg/querier/blocks_finder_bucket_scan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ func TestBucketScanBlocksFinder_InitialScanFailure(t *testing.T) {

// Mock the storage to simulate a failure when reading objects.
bucket.MockIter("", []string{"user-1"}, nil)
bucket.MockIter("__markers__", []string{}, nil)
bucket.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json"}, nil)
bucket.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-1"), false, nil)
bucket.MockExists(cortex_tsdb.GetLocalDeletionMarkPath("user-1"), false, nil)
Expand Down Expand Up @@ -139,6 +140,7 @@ func TestBucketScanBlocksFinder_StopWhileRunningTheInitialScanOnManyTenants(t *t
// Mock the bucket to introduce a 1s sleep while iterating each tenant in the bucket.
bucket := &bucket.ClientMock{}
bucket.MockIter("", tenantIDs, nil)
bucket.MockIter("__markers__", []string{}, nil)
for _, tenantID := range tenantIDs {
bucket.MockIterWithCallback(tenantID+"/", []string{}, nil, func() {
time.Sleep(time.Second)
Expand Down Expand Up @@ -177,6 +179,7 @@ func TestBucketScanBlocksFinder_StopWhileRunningTheInitialScanOnManyBlocks(t *te
// Mock the bucket to introduce a 1s sleep while syncing each block in the bucket.
bucket := &bucket.ClientMock{}
bucket.MockIter("", []string{"user-1"}, nil)
bucket.MockIter("__markers__", []string{}, nil)
bucket.MockIter("user-1/", blockPaths, nil)
bucket.On("Exists", mock.Anything, mock.Anything).Return(false, nil).Run(func(args mock.Arguments) {
// We return the meta.json doesn't exist, but introduce a 1s delay for each call.
Expand Down
36 changes: 23 additions & 13 deletions pkg/storage/tsdb/users_scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,38 +38,48 @@ func NewUsersScanner(bucketClient objstore.Bucket, isOwned func(userID string) (
//
// If sharding is enabled, returned lists contains only the users owned by this instance.
func (s *UsersScanner) ScanUsers(ctx context.Context) (users, markedForDeletion []string, err error) {
scannedUsers := make(map[string]struct{})

// Scan users in the bucket.
err = s.bucketClient.Iter(ctx, "", func(entry string) error {
users = append(users, strings.TrimSuffix(entry, "/"))
userID := strings.TrimSuffix(entry, "/")
scannedUsers[userID] = struct{}{}
return nil
})
if err != nil {
return nil, nil, err
}

// Check users for being owned by instance, and split users into non-deleted and deleted.
// We do these checks after listing all users, to improve cacheability of Iter (result is only cached at the end of Iter call).
for ix := 0; ix < len(users); {
userID := users[ix]
// Scan users from the __markers__ directory.
err = s.bucketClient.Iter(ctx, util.GlobalMarkersDir, func(entry string) error {
// entry will be of the form __markers__/<user>/
parts := strings.Split(entry, objstore.DirDelim)
userID := parts[1]
scannedUsers[userID] = struct{}{}
return nil
})
if err != nil {
return nil, nil, err
}

// Check if it's owned by this instance.
owned, err := s.isOwned(userID)
if err != nil {
for userID := range scannedUsers {
// Filter out users not owned by this instance.
if owned, err := s.isOwned(userID); err != nil {
level.Warn(s.logger).Log("msg", "unable to check if user is owned by this shard", "user", userID, "err", err)
} else if !owned {
users = append(users[:ix], users[ix+1:]...)
continue
}

deletionMarkExists, err := TenantDeletionMarkExists(ctx, s.bucketClient, userID)
if err != nil {
// Filter users marked for deletion
if deletionMarkExists, err := TenantDeletionMarkExists(ctx, s.bucketClient, userID); err != nil {
level.Warn(s.logger).Log("msg", "unable to check if user is marked for deletion", "user", userID, "err", err)
} else if deletionMarkExists {
users = append(users[:ix], users[ix+1:]...)
markedForDeletion = append(markedForDeletion, userID)
continue
}

ix++
// The remaining are the active users owned by this instance.
users = append(users, userID)
}

return users, markedForDeletion, nil
Expand Down
15 changes: 10 additions & 5 deletions pkg/storage/tsdb/users_scanner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,35 +8,40 @@ import (
"github.com/go-kit/log"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/exp/slices"

"github.com/cortexproject/cortex/pkg/storage/bucket"
)

func TestUsersScanner_ScanUsers_ShouldReturnedOwnedUsersOnly(t *testing.T) {
bucketClient := &bucket.ClientMock{}
bucketClient.MockIter("", []string{"user-1", "user-2", "user-3", "user-4"}, nil)
bucketClient.MockIter("", []string{"user-1/", "user-2/", "user-3/", "user-4/"}, nil)
bucketClient.MockIter("__markers__", []string{"__markers__/user-5/", "__markers__/user-6/", "__markers__/user-7/"}, nil)
bucketClient.MockExists(GetGlobalDeletionMarkPath("user-1"), false, nil)
bucketClient.MockExists(GetLocalDeletionMarkPath("user-1"), false, nil)
bucketClient.MockExists(GetGlobalDeletionMarkPath("user-3"), true, nil)
bucketClient.MockExists(GetLocalDeletionMarkPath("user-3"), true, nil)
bucketClient.MockExists(GetLocalDeletionMarkPath("user-3"), false, nil)
bucketClient.MockExists(GetGlobalDeletionMarkPath("user-7"), false, nil)
bucketClient.MockExists(GetLocalDeletionMarkPath("user-7"), true, nil)

isOwned := func(userID string) (bool, error) {
return userID == "user-1" || userID == "user-3", nil
return userID == "user-1" || userID == "user-3" || userID == "user-7", nil
}

s := NewUsersScanner(bucketClient, isOwned, log.NewNopLogger())
actual, deleted, err := s.ScanUsers(context.Background())
require.NoError(t, err)
assert.Equal(t, []string{"user-1"}, actual)
assert.Equal(t, []string{"user-3"}, deleted)

slices.Sort(deleted)
assert.Equal(t, []string{"user-3", "user-7"}, deleted)
}

func TestUsersScanner_ScanUsers_ShouldReturnUsersForWhichOwnerCheckOrTenantDeletionCheckFailed(t *testing.T) {
expected := []string{"user-1", "user-2"}

bucketClient := &bucket.ClientMock{}
bucketClient.MockIter("", expected, nil)
bucketClient.MockIter("__markers__", []string{}, nil)
bucketClient.MockExists(GetGlobalDeletionMarkPath("user-1"), false, nil)
bucketClient.MockExists(GetLocalDeletionMarkPath("user-1"), false, nil)

Expand Down
16 changes: 12 additions & 4 deletions tools/thanosconvert/thanosconvert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,22 @@ func TestThanosBlockConverter(t *testing.T) {
assertions func(*testing.T, *bucket.ClientMock, Results, error)
}{
{
name: "empty bucket is a noop",
bucketData: fakeBucket{},
name: "empty bucket is a noop",
bucketData: fakeBucket{
"__markers__": map[string]metadata.Meta{},
},
assertions: func(t *testing.T, bkt *bucket.ClientMock, results Results, err error) {
bkt.AssertNotCalled(t, "Get", mock.Anything, mock.Anything)
bkt.AssertNotCalled(t, "Upload", mock.Anything, mock.Anything, mock.Anything)
assert.Len(t, results, 0, "expected no users in results")
},
},
{
name: "user with no blocks is a noop",
bucketData: fakeBucket{"user1": map[string]metadata.Meta{}},
name: "user with no blocks is a noop",
bucketData: fakeBucket{
"user1": map[string]metadata.Meta{},
"__markers__": map[string]metadata.Meta{},
},
assertions: func(t *testing.T, bkt *bucket.ClientMock, results Results, err error) {
bkt.AssertNotCalled(t, "Get", mock.Anything, mock.Anything)
bkt.AssertNotCalled(t, "Upload", mock.Anything, mock.Anything, mock.Anything)
Expand All @@ -80,6 +85,7 @@ func TestThanosBlockConverter(t *testing.T) {
"user3": map[string]metadata.Meta{
block1: cortexMeta("user3"),
},
"__markers__": map[string]metadata.Meta{},
},
assertions: func(t *testing.T, bkt *bucket.ClientMock, results Results, err error) {
bkt.AssertNotCalled(t, "Upload", mock.Anything, mock.Anything, mock.Anything)
Expand Down Expand Up @@ -114,6 +120,7 @@ func TestThanosBlockConverter(t *testing.T) {
"user3": map[string]metadata.Meta{
block1: thanosMeta(),
},
"__markers__": map[string]metadata.Meta{},
},
assertions: func(t *testing.T, bkt *bucket.ClientMock, results Results, err error) {
assert.Len(t, results, 3, "expected users in results")
Expand Down Expand Up @@ -149,6 +156,7 @@ func TestThanosBlockConverter(t *testing.T) {
blockWithUploadFailure: thanosMeta(),
blockWithMalformedMeta: thanosMeta(),
},
"__markers__": map[string]metadata.Meta{},
},
assertions: func(t *testing.T, bkt *bucket.ClientMock, results Results, err error) {
assert.Len(t, results["user1"].FailedBlocks, 1)
Expand Down

0 comments on commit e8a101c

Please sign in to comment.