From b76d5e49007d0afefa14353da95e6d7104c0fe18 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=F0=9F=8C=B2=20Harry=20=F0=9F=8C=8A=20John=20=F0=9F=8F=94?= Date: Thu, 23 Nov 2023 16:55:50 -0800 Subject: [PATCH 1/4] Move tenant-deletion-mark to a global dir MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 🌲 Harry 🌊 John 🏔 --- pkg/compactor/blocks_cleaner.go | 5 ++ pkg/compactor/blocks_cleaner_test.go | 17 ++++-- pkg/compactor/compactor_test.go | 40 +++++++++----- pkg/purger/tenant_deletion_api_test.go | 6 +- pkg/querier/blocks_finder_bucket_scan_test.go | 7 ++- pkg/storage/tsdb/tenant_deletion_mark.go | 48 +++++++++++++--- pkg/storage/tsdb/tenant_deletion_mark_test.go | 55 ++++++++++++++++++- pkg/storage/tsdb/users_scanner.go | 6 +- pkg/storage/tsdb/users_scanner_test.go | 13 +++-- pkg/util/allowed_tenants.go | 7 +++ 10 files changed, 165 insertions(+), 39 deletions(-) diff --git a/pkg/compactor/blocks_cleaner.go b/pkg/compactor/blocks_cleaner.go index 071c91d559..29334c9b7d 100644 --- a/pkg/compactor/blocks_cleaner.go +++ b/pkg/compactor/blocks_cleaner.go @@ -300,6 +300,11 @@ func (c *BlocksCleaner) deleteUserMarkedForDeletion(ctx context.Context, userID level.Info(userLogger).Log("msg", "deleted marker files for tenant marked for deletion", "count", deleted) } + // Deleting global markers for the user + if err := c.bucketClient.Delete(ctx, cortex_tsdb.GetGlobalDeletionMarkPath(userID)); err != nil { + return errors.Wrap(err, "failed to delete global marker file") + } + return nil } diff --git a/pkg/compactor/blocks_cleaner_test.go b/pkg/compactor/blocks_cleaner_test.go index 14e496d17f..d7546286e6 100644 --- a/pkg/compactor/blocks_cleaner_test.go +++ b/pkg/compactor/blocks_cleaner_test.go @@ -209,10 +209,6 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions {path: path.Join("user-3", block9.String(), "index"), expectedExists: false}, {path: path.Join("user-3", block10.String(), metadata.MetaFilename), expectedExists: false}, {path: path.Join("user-3", block10.String(), "index"), expectedExists: false}, - // Tenant deletion mark is not removed. - {path: path.Join("user-3", tsdb.TenantDeletionMarkPath), expectedExists: true}, - // User-4 is removed fully. - {path: path.Join("user-4", tsdb.TenantDeletionMarkPath), expectedExists: options.user4FilesExist}, {path: path.Join("user-4", block.DebugMetas, "meta.json"), expectedExists: options.user4FilesExist}, } { exists, err := bucketClient.Exists(ctx, tc.path) @@ -220,6 +216,19 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions assert.Equal(t, tc.expectedExists, exists, tc.path) } + // Check if tenant deletion mark exists + for _, tc := range []struct { + user string + expectedExists bool + }{ + {"user-3", true}, + {"user-4", options.user4FilesExist}, + } { + exists, err := tsdb.TenantDeletionMarkExists(ctx, bucketClient, tc.user) + require.NoError(t, err) + assert.Equal(t, tc.expectedExists, exists, tc.user) + } + assert.Equal(t, float64(1), testutil.ToFloat64(cleaner.runsStarted)) assert.Equal(t, float64(1), testutil.ToFloat64(cleaner.runsCompleted)) assert.Equal(t, float64(0), testutil.ToFloat64(cleaner.runsFailed)) diff --git a/pkg/compactor/compactor_test.go b/pkg/compactor/compactor_test.go index 36ae05bda9..c2cf41df82 100644 --- a/pkg/compactor/compactor_test.go +++ b/pkg/compactor/compactor_test.go @@ -176,7 +176,8 @@ func TestCompactor_SkipCompactionWhenCmkError(t *testing.T) { bucketClient.MockGet(userID+"/bucket-index.json.gz", "", nil) bucketClient.MockUpload(userID+"/bucket-index-sync-status.json", nil) bucketClient.MockUpload(userID+"/bucket-index.json.gz", nil) - bucketClient.MockExists(path.Join(userID, cortex_tsdb.TenantDeletionMarkPath), false, nil) + bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath(userID), false, nil) + bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath(userID), false, nil) cfg := prepareConfig() c, _, _, logs, _ := prepare(t, cfg, bucketClient, nil) @@ -500,7 +501,8 @@ func TestCompactor_ShouldIncrementCompactionErrorIfFailedToCompactASingleTenant( bucketClient.MockIter("", []string{userID}, nil) bucketClient.MockIter(userID+"/", []string{userID + "/01DTVP434PA9VFXSW2JKB3392D/meta.json", userID + "/01FN6CDF3PNEWWRY5MPGJPE3EX/meta.json"}, nil) bucketClient.MockIter(userID+"/markers/", nil, nil) - bucketClient.MockExists(path.Join(userID, cortex_tsdb.TenantDeletionMarkPath), false, nil) + bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath(userID), false, nil) + bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath(userID), false, nil) bucketClient.MockGet(userID+"/01DTVP434PA9VFXSW2JKB3392D/meta.json", mockBlockMetaJSON("01DTVP434PA9VFXSW2JKB3392D"), nil) bucketClient.MockGet(userID+"/01DTVP434PA9VFXSW2JKB3392D/no-compact-mark.json", "", nil) bucketClient.MockGet(userID+"/01DTVP434PA9VFXSW2JKB3392D/deletion-mark.json", "", nil) @@ -549,7 +551,8 @@ func TestCompactor_ShouldIncrementCompactionErrorIfFailedToCompactASingleTenant( func TestCompactor_ShouldCompactAndRemoveUserFolder(t *testing.T) { bucketClient := &bucket.ClientMock{} bucketClient.MockIter("", []string{"user-1"}, nil) - bucketClient.MockExists(path.Join("user-1", cortex_tsdb.TenantDeletionMarkPath), false, nil) + bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-1"), false, nil) + bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath("user-1"), false, nil) bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", "user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/meta.json"}, nil) bucketClient.MockIter("user-1/markers/", nil, nil) bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", mockBlockMetaJSON("01DTVP434PA9VFXSW2JKB3392D"), nil) @@ -593,8 +596,10 @@ func TestCompactor_ShouldIterateOverUsersAndRunCompaction(t *testing.T) { // Mock the bucket to contain two users, each one with one block. bucketClient := &bucket.ClientMock{} bucketClient.MockIter("", []string{"user-1", "user-2"}, nil) - bucketClient.MockExists(path.Join("user-1", cortex_tsdb.TenantDeletionMarkPath), false, nil) - bucketClient.MockExists(path.Join("user-2", cortex_tsdb.TenantDeletionMarkPath), false, nil) + bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-1"), false, nil) + bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath("user-1"), false, nil) + bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-2"), false, nil) + bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath("user-2"), false, nil) bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", "user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/meta.json"}, nil) bucketClient.MockIter("user-2/", []string{"user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json", "user-2/01FN3V83ABR9992RF8WRJZ76ZQ/meta.json"}, nil) bucketClient.MockIter("user-1/markers/", nil, nil) @@ -734,7 +739,8 @@ func TestCompactor_ShouldNotCompactBlocksMarkedForDeletion(t *testing.T) { bucketClient := &bucket.ClientMock{} bucketClient.MockIter("", []string{"user-1"}, nil) bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D", "user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ"}, nil) - bucketClient.MockExists(path.Join("user-1", cortex_tsdb.TenantDeletionMarkPath), false, nil) + bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-1"), false, nil) + bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath("user-1"), false, nil) // Block that has just been marked for deletion. It will not be deleted just yet, and it also will not be compacted. bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", mockBlockMetaJSON("01DTVP434PA9VFXSW2JKB3392D"), nil) @@ -856,8 +862,10 @@ func TestCompactor_ShouldNotCompactBlocksMarkedForSkipCompact(t *testing.T) { // Mock the bucket to contain two users, each one with one block. bucketClient := &bucket.ClientMock{} bucketClient.MockIter("", []string{"user-1", "user-2"}, nil) - bucketClient.MockExists(path.Join("user-1", cortex_tsdb.TenantDeletionMarkPath), false, nil) - bucketClient.MockExists(path.Join("user-2", cortex_tsdb.TenantDeletionMarkPath), false, nil) + bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-1"), false, nil) + bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath("user-1"), false, nil) + bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-2"), false, nil) + bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath("user-2"), false, nil) bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", "user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/meta.json"}, nil) bucketClient.MockIter("user-2/", []string{"user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json", "user-2/01FN3V83ABR9992RF8WRJZ76ZQ/meta.json"}, nil) bucketClient.MockIter("user-1/markers/", nil, nil) @@ -933,8 +941,8 @@ func TestCompactor_ShouldNotCompactBlocksForUsersMarkedForDeletion(t *testing.T) bucketClient := &bucket.ClientMock{} bucketClient.MockIter("", []string{"user-1"}, nil) bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D"}, nil) - bucketClient.MockGet(path.Join("user-1", cortex_tsdb.TenantDeletionMarkPath), `{"deletion_time": 1}`, nil) - bucketClient.MockUpload(path.Join("user-1", cortex_tsdb.TenantDeletionMarkPath), nil) + bucketClient.MockGet(cortex_tsdb.GetGlobalDeletionMarkPath("user-1"), `{"deletion_time": 1}`, nil) + bucketClient.MockUpload(cortex_tsdb.GetGlobalDeletionMarkPath("user-1"), nil) bucketClient.MockIter("user-1/01DTVP434PA9VFXSW2JKB3392D", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", "user-1/01DTVP434PA9VFXSW2JKB3392D/index"}, nil) bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", mockBlockMetaJSON("01DTVP434PA9VFXSW2JKB3392D"), nil) @@ -1094,8 +1102,10 @@ func TestCompactor_ShouldCompactAllUsersOnShardingEnabledButOnlyOneInstanceRunni // Mock the bucket to contain two users, each one with one block. bucketClient := &bucket.ClientMock{} bucketClient.MockIter("", []string{"user-1", "user-2"}, nil) - bucketClient.MockExists(path.Join("user-1", cortex_tsdb.TenantDeletionMarkPath), false, nil) - bucketClient.MockExists(path.Join("user-2", cortex_tsdb.TenantDeletionMarkPath), false, nil) + bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-1"), false, nil) + bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath("user-1"), false, nil) + bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-2"), false, nil) + bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath("user-2"), false, nil) bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", "user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/meta.json"}, nil) bucketClient.MockIter("user-2/", []string{"user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json", "user-2/01FN3V83ABR9992RF8WRJZ76ZQ/meta.json"}, nil) bucketClient.MockIter("user-1/markers/", nil, nil) @@ -1202,7 +1212,8 @@ func TestCompactor_ShouldCompactOnlyUsersOwnedByTheInstanceOnShardingEnabledAndM for _, userID := range userIDs { bucketClient.MockIter(userID+"/", []string{userID + "/01DTVP434PA9VFXSW2JKB3392D"}, nil) bucketClient.MockIter(userID+"/markers/", nil, nil) - bucketClient.MockExists(path.Join(userID, cortex_tsdb.TenantDeletionMarkPath), false, nil) + bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath(userID), false, nil) + bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath(userID), false, nil) bucketClient.MockGet(userID+"/01DTVP434PA9VFXSW2JKB3392D/meta.json", mockBlockMetaJSON("01DTVP434PA9VFXSW2JKB3392D"), nil) bucketClient.MockGet(userID+"/01DTVP434PA9VFXSW2JKB3392D/deletion-mark.json", "", nil) bucketClient.MockGet(userID+"/01DTVP434PA9VFXSW2JKB3392D/no-compact-mark.json", "", nil) @@ -1334,7 +1345,8 @@ func TestCompactor_ShouldCompactOnlyShardsOwnedByTheInstanceOnShardingEnabledWit bucketClient.MockIter(userID+"/", blockFiles, nil) bucketClient.MockIter(userID+"/markers/", nil, nil) - bucketClient.MockExists(path.Join(userID, cortex_tsdb.TenantDeletionMarkPath), false, nil) + bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath(userID), false, nil) + bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath(userID), false, nil) bucketClient.MockGet(userID+"/bucket-index.json.gz", "", nil) bucketClient.MockUpload(userID+"/bucket-index.json.gz", nil) bucketClient.MockUpload(userID+"/bucket-index-sync-status.json", nil) diff --git a/pkg/purger/tenant_deletion_api_test.go b/pkg/purger/tenant_deletion_api_test.go index 631af9bf5d..0d8129f80b 100644 --- a/pkg/purger/tenant_deletion_api_test.go +++ b/pkg/purger/tenant_deletion_api_test.go @@ -5,7 +5,6 @@ import ( "context" "net/http" "net/http/httptest" - "path" "testing" "github.com/go-kit/log" @@ -35,8 +34,9 @@ func TestDeleteTenant(t *testing.T) { api.DeleteTenant(resp, req.WithContext(ctx)) require.Equal(t, http.StatusOK, resp.Code) - objs := bkt.Objects() - require.NotNil(t, objs[path.Join("fake", tsdb.TenantDeletionMarkPath)]) + exists, err := tsdb.TenantDeletionMarkExists(ctx, bkt, "fake") + require.NoError(t, err) + require.True(t, exists) } } diff --git a/pkg/querier/blocks_finder_bucket_scan_test.go b/pkg/querier/blocks_finder_bucket_scan_test.go index dd8a6df38a..21abdee834 100644 --- a/pkg/querier/blocks_finder_bucket_scan_test.go +++ b/pkg/querier/blocks_finder_bucket_scan_test.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "os" - "path" "strings" "testing" "time" @@ -96,7 +95,8 @@ func TestBucketScanBlocksFinder_InitialScanFailure(t *testing.T) { // Mock the storage to simulate a failure when reading objects. bucket.MockIter("", []string{"user-1"}, nil) bucket.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json"}, nil) - bucket.MockExists(path.Join("user-1", cortex_tsdb.TenantDeletionMarkPath), false, nil) + bucket.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-1"), false, nil) + bucket.MockExists(cortex_tsdb.GetLocalDeletionMarkPath("user-1"), false, nil) bucket.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", "invalid", errors.New("mocked error")) require.NoError(t, s.StartAsync(ctx)) @@ -143,7 +143,8 @@ func TestBucketScanBlocksFinder_StopWhileRunningTheInitialScanOnManyTenants(t *t bucket.MockIterWithCallback(tenantID+"/", []string{}, nil, func() { time.Sleep(time.Second) }) - bucket.MockExists(path.Join(tenantID, cortex_tsdb.TenantDeletionMarkPath), false, nil) + bucket.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath(tenantID), false, nil) + bucket.MockExists(cortex_tsdb.GetLocalDeletionMarkPath(tenantID), false, nil) } cfg := prepareBucketScanBlocksFinderConfig() diff --git a/pkg/storage/tsdb/tenant_deletion_mark.go b/pkg/storage/tsdb/tenant_deletion_mark.go index b4b8022ed0..a6a5844b36 100644 --- a/pkg/storage/tsdb/tenant_deletion_mark.go +++ b/pkg/storage/tsdb/tenant_deletion_mark.go @@ -11,11 +11,12 @@ import ( "github.com/pkg/errors" "github.com/thanos-io/objstore" + "github.com/cortexproject/cortex/pkg/util" util_log "github.com/cortexproject/cortex/pkg/util/log" ) // Relative to user-specific prefix. -const TenantDeletionMarkPath = "markers/tenant-deletion-mark.json" +const TenantDeletionMarkPath = "tenant-deletion-mark.json" type TenantDeletionMark struct { // Unix timestamp when deletion marker was created. @@ -31,15 +32,49 @@ func NewTenantDeletionMark(deletionTime time.Time) *TenantDeletionMark { // Checks for deletion mark for tenant. Errors other than "object not found" are returned. func TenantDeletionMarkExists(ctx context.Context, bkt objstore.BucketReader, userID string) (bool, error) { - markerFile := path.Join(userID, TenantDeletionMarkPath) + markerFile := GetGlobalDeletionMarkPath(userID) + if globalExists, err := exists(ctx, bkt, markerFile); err != nil { + return false, err + } else if globalExists { + return true, nil + } - return bkt.Exists(ctx, markerFile) + markerFile = GetLocalDeletionMarkPath(userID) + return exists(ctx, bkt, markerFile) } // Uploads deletion mark to the tenant location in the bucket. func WriteTenantDeletionMark(ctx context.Context, bkt objstore.Bucket, userID string, mark *TenantDeletionMark) error { - markerFile := path.Join(userID, TenantDeletionMarkPath) + markerFile := GetGlobalDeletionMarkPath(userID) + return write(ctx, bkt, markerFile, mark) +} + +// Returns tenant deletion mark for given user, if it exists. If it doesn't exist, returns nil mark, and no error. +func ReadTenantDeletionMark(ctx context.Context, bkt objstore.BucketReader, userID string) (*TenantDeletionMark, error) { + markerFile := GetGlobalDeletionMarkPath(userID) + if mark, err := read(ctx, bkt, markerFile); err != nil { + return nil, err + } else if mark != nil { + return mark, nil + } + markerFile = GetLocalDeletionMarkPath(userID) + return read(ctx, bkt, markerFile) +} + +func GetLocalDeletionMarkPath(userID string) string { + return path.Join(userID, "markers", TenantDeletionMarkPath) +} + +func GetGlobalDeletionMarkPath(userID string) string { + return path.Join(util.GlobalMarkersDir, userID, TenantDeletionMarkPath) +} + +func exists(ctx context.Context, bkt objstore.BucketReader, markerFile string) (bool, error) { + return bkt.Exists(ctx, markerFile) +} + +func write(ctx context.Context, bkt objstore.Bucket, markerFile string, mark *TenantDeletionMark) error { data, err := json.Marshal(mark) if err != nil { return errors.Wrap(err, "serialize tenant deletion mark") @@ -48,10 +83,7 @@ func WriteTenantDeletionMark(ctx context.Context, bkt objstore.Bucket, userID st return errors.Wrap(bkt.Upload(ctx, markerFile, bytes.NewReader(data)), "upload tenant deletion mark") } -// Returns tenant deletion mark for given user, if it exists. If it doesn't exist, returns nil mark, and no error. -func ReadTenantDeletionMark(ctx context.Context, bkt objstore.BucketReader, userID string) (*TenantDeletionMark, error) { - markerFile := path.Join(userID, TenantDeletionMarkPath) - +func read(ctx context.Context, bkt objstore.BucketReader, markerFile string) (*TenantDeletionMark, error) { r, err := bkt.Get(ctx, markerFile) if err != nil { if bkt.IsObjNotFoundErr(err) { diff --git a/pkg/storage/tsdb/tenant_deletion_mark_test.go b/pkg/storage/tsdb/tenant_deletion_mark_test.go index 1d3ced41a4..6ae53a1168 100644 --- a/pkg/storage/tsdb/tenant_deletion_mark_test.go +++ b/pkg/storage/tsdb/tenant_deletion_mark_test.go @@ -9,7 +9,7 @@ import ( "github.com/thanos-io/objstore" ) -func TestTenantDeletionMarkExists(t *testing.T) { +func TestTenantLocalDeletionMarkExists(t *testing.T) { const username = "user" for name, tc := range map[string]struct { @@ -61,3 +61,56 @@ func TestTenantDeletionMarkExists(t *testing.T) { }) } } + +func TestTenantGlobalDeletionMarkExists(t *testing.T) { + const username = "user" + + for name, tc := range map[string]struct { + objects map[string][]byte + exists bool + deletedUsers []string + }{ + "empty": { + objects: nil, + exists: false, + }, + + "mark doesn't exist": { + objects: map[string][]byte{ + "user/01EQK4QKFHVSZYVJ908Y7HH9E0/meta.json": []byte("data"), + }, + exists: false, + }, + + "mark exists": { + objects: map[string][]byte{ + "user/01EQK4QKFHVSZYVJ908Y7HH9E0/meta.json": []byte("data"), + "markers/user/" + TenantDeletionMarkPath: []byte("data"), + }, + exists: true, + }, + "mark exists - upload via WriteTenantDeletionMark": { + objects: map[string][]byte{ + "user/01EQK4QKFHVSZYVJ908Y7HH9E0/meta.json": []byte("data"), + }, + deletedUsers: []string{"user"}, + exists: true, + }, + } { + t.Run(name, func(t *testing.T) { + bkt := objstore.NewInMemBucket() + // "upload" objects + for objName, data := range tc.objects { + require.NoError(t, bkt.Upload(context.Background(), objName, bytes.NewReader(data))) + } + + for _, user := range tc.deletedUsers { + require.NoError(t, WriteTenantDeletionMark(context.Background(), bkt, user, &TenantDeletionMark{})) + } + + res, err := TenantDeletionMarkExists(context.Background(), bkt, username) + require.NoError(t, err) + require.Equal(t, tc.exists, res) + }) + } +} diff --git a/pkg/storage/tsdb/users_scanner.go b/pkg/storage/tsdb/users_scanner.go index 956c5f429c..54f40bcc9d 100644 --- a/pkg/storage/tsdb/users_scanner.go +++ b/pkg/storage/tsdb/users_scanner.go @@ -4,6 +4,7 @@ import ( "context" "strings" + "github.com/cortexproject/cortex/pkg/util" "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/thanos-io/objstore" @@ -11,7 +12,10 @@ import ( // AllUsers returns true to each call and should be used whenever the UsersScanner should not filter out // any user due to sharding. -func AllUsers(_ string) (bool, error) { +func AllUsers(user string) (bool, error) { + if user == util.GlobalMarkersDir { + return false, nil + } return true, nil } diff --git a/pkg/storage/tsdb/users_scanner_test.go b/pkg/storage/tsdb/users_scanner_test.go index 90c132fd3c..72503865aa 100644 --- a/pkg/storage/tsdb/users_scanner_test.go +++ b/pkg/storage/tsdb/users_scanner_test.go @@ -3,7 +3,6 @@ package tsdb import ( "context" "errors" - "path" "testing" "github.com/go-kit/log" @@ -16,8 +15,10 @@ import ( func TestUsersScanner_ScanUsers_ShouldReturnedOwnedUsersOnly(t *testing.T) { bucketClient := &bucket.ClientMock{} bucketClient.MockIter("", []string{"user-1", "user-2", "user-3", "user-4"}, nil) - bucketClient.MockExists(path.Join("user-1", TenantDeletionMarkPath), false, nil) - bucketClient.MockExists(path.Join("user-3", TenantDeletionMarkPath), true, nil) + bucketClient.MockExists(GetGlobalDeletionMarkPath("user-1"), false, nil) + bucketClient.MockExists(GetLocalDeletionMarkPath("user-1"), false, nil) + bucketClient.MockExists(GetGlobalDeletionMarkPath("user-3"), true, nil) + bucketClient.MockExists(GetLocalDeletionMarkPath("user-3"), true, nil) isOwned := func(userID string) (bool, error) { return userID == "user-1" || userID == "user-3", nil @@ -36,8 +37,10 @@ func TestUsersScanner_ScanUsers_ShouldReturnUsersForWhichOwnerCheckOrTenantDelet bucketClient := &bucket.ClientMock{} bucketClient.MockIter("", expected, nil) - bucketClient.MockExists(path.Join("user-1", TenantDeletionMarkPath), false, nil) - bucketClient.MockExists(path.Join("user-2", TenantDeletionMarkPath), false, errors.New("fail")) + bucketClient.MockExists(GetGlobalDeletionMarkPath("user-1"), false, nil) + bucketClient.MockExists(GetLocalDeletionMarkPath("user-1"), false, nil) + + bucketClient.MockExists(GetGlobalDeletionMarkPath("user-2"), false, errors.New("fail")) isOwned := func(userID string) (bool, error) { return false, errors.New("failed to check if user is owned") diff --git a/pkg/util/allowed_tenants.go b/pkg/util/allowed_tenants.go index 88c7a6333b..2e1807689c 100644 --- a/pkg/util/allowed_tenants.go +++ b/pkg/util/allowed_tenants.go @@ -1,5 +1,7 @@ package util +const GlobalMarkersDir = "__markers__" + // AllowedTenants that can answer whether tenant is allowed or not based on configuration. // Default value (nil) allows all tenants. type AllowedTenants struct { @@ -34,6 +36,11 @@ func NewAllowedTenants(enabled []string, disabled []string) *AllowedTenants { } func (a *AllowedTenants) IsAllowed(tenantID string) bool { + if tenantID == GlobalMarkersDir { + // This is reserverd for global markers + return false + } + if a == nil { return true } From aa10248a267bba43601a40bd29267959bd6d9b5c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=F0=9F=8C=B2=20Harry=20=F0=9F=8C=8A=20John=20=F0=9F=8F=94?= Date: Thu, 23 Nov 2023 20:32:56 -0800 Subject: [PATCH 2/4] Fix tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 🌲 Harry 🌊 John 🏔 --- pkg/storage/tsdb/tenant_deletion_mark_test.go | 56 ++----------------- pkg/util/allowed_tenants.go | 2 +- 2 files changed, 6 insertions(+), 52 deletions(-) diff --git a/pkg/storage/tsdb/tenant_deletion_mark_test.go b/pkg/storage/tsdb/tenant_deletion_mark_test.go index 6ae53a1168..22a9ade6b2 100644 --- a/pkg/storage/tsdb/tenant_deletion_mark_test.go +++ b/pkg/storage/tsdb/tenant_deletion_mark_test.go @@ -9,7 +9,7 @@ import ( "github.com/thanos-io/objstore" ) -func TestTenantLocalDeletionMarkExists(t *testing.T) { +func TestTenantDeletionMarkExists(t *testing.T) { const username = "user" for name, tc := range map[string]struct { @@ -29,63 +29,17 @@ func TestTenantLocalDeletionMarkExists(t *testing.T) { exists: false, }, - "mark exists": { + "local mark exists": { objects: map[string][]byte{ "user/01EQK4QKFHVSZYVJ908Y7HH9E0/meta.json": []byte("data"), - "user/" + TenantDeletionMarkPath: []byte("data"), + GetLocalDeletionMarkPath("user"): []byte("data"), }, exists: true, }, - "mark exists - upload via WriteTenantDeletionMark": { - objects: map[string][]byte{ - "user/01EQK4QKFHVSZYVJ908Y7HH9E0/meta.json": []byte("data"), - }, - deletedUsers: []string{"user"}, - exists: true, - }, - } { - t.Run(name, func(t *testing.T) { - bkt := objstore.NewInMemBucket() - // "upload" objects - for objName, data := range tc.objects { - require.NoError(t, bkt.Upload(context.Background(), objName, bytes.NewReader(data))) - } - - for _, user := range tc.deletedUsers { - require.NoError(t, WriteTenantDeletionMark(context.Background(), bkt, user, &TenantDeletionMark{})) - } - - res, err := TenantDeletionMarkExists(context.Background(), bkt, username) - require.NoError(t, err) - require.Equal(t, tc.exists, res) - }) - } -} - -func TestTenantGlobalDeletionMarkExists(t *testing.T) { - const username = "user" - - for name, tc := range map[string]struct { - objects map[string][]byte - exists bool - deletedUsers []string - }{ - "empty": { - objects: nil, - exists: false, - }, - - "mark doesn't exist": { - objects: map[string][]byte{ - "user/01EQK4QKFHVSZYVJ908Y7HH9E0/meta.json": []byte("data"), - }, - exists: false, - }, - - "mark exists": { + "global mark exists": { objects: map[string][]byte{ "user/01EQK4QKFHVSZYVJ908Y7HH9E0/meta.json": []byte("data"), - "markers/user/" + TenantDeletionMarkPath: []byte("data"), + GetGlobalDeletionMarkPath("user"): []byte("data"), }, exists: true, }, diff --git a/pkg/util/allowed_tenants.go b/pkg/util/allowed_tenants.go index 2e1807689c..d5b7af773e 100644 --- a/pkg/util/allowed_tenants.go +++ b/pkg/util/allowed_tenants.go @@ -37,7 +37,7 @@ func NewAllowedTenants(enabled []string, disabled []string) *AllowedTenants { func (a *AllowedTenants) IsAllowed(tenantID string) bool { if tenantID == GlobalMarkersDir { - // This is reserverd for global markers + // __markers__ is reserved for global markers and no tenant should be allowed to have that name. return false } From c3503fd3762b6819df1d2f481908b471688adcd1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=F0=9F=8C=B2=20Harry=20=F0=9F=8C=8A=20John=20=F0=9F=8F=94?= Date: Wed, 29 Nov 2023 10:10:58 -0800 Subject: [PATCH 3/4] Add CHANGELOG MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 🌲 Harry 🌊 John 🏔 --- CHANGELOG.md | 1 + pkg/storage/tsdb/caching_bucket.go | 2 +- pkg/storage/tsdb/tenant_deletion_mark.go | 7 +++---- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index efcf471713..730438fe5c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -89,6 +89,7 @@ * [ENHANCEMENT] Ingester: Added new ingester TSDB metrics `cortex_ingester_tsdb_head_samples_appended_total`, `cortex_ingester_tsdb_head_out_of_order_samples_appended_total`, `cortex_ingester_tsdb_snapshot_replay_error_total`, `cortex_ingester_tsdb_sample_ooo_delta` and `cortex_ingester_tsdb_mmap_chunks_total`. #5624 * [ENHANCEMENT] Query Frontend: Handle context error before decoding and merging responses. #5499 * [ENHANCEMENT] Store-Gateway and AlertManager: Add a `wait_instance_time_out` to context to avoid waiting forever. #5581 +* [ENHANCEMENT] Store-Gateway: Move the `tenant-deletion-mark.json`` to a global dir. #5676 * [BUGFIX] Compactor: Fix possible division by zero during compactor config validation. #5535 * [BUGFIX] Ruler: Validate if rule group can be safely converted back to rule group yaml from protobuf message #5265 * [BUGFIX] Querier: Convert gRPC `ResourceExhausted` status code from store gateway to 422 limit error. #5286 diff --git a/pkg/storage/tsdb/caching_bucket.go b/pkg/storage/tsdb/caching_bucket.go index 293fe92dc4..535562b703 100644 --- a/pkg/storage/tsdb/caching_bucket.go +++ b/pkg/storage/tsdb/caching_bucket.go @@ -183,7 +183,7 @@ var chunksMatcher = regexp.MustCompile(`^.*/chunks/\d+$`) func isTSDBChunkFile(name string) bool { return chunksMatcher.MatchString(name) } func isMetaFile(name string) bool { - return strings.HasSuffix(name, "/"+metadata.MetaFilename) || strings.HasSuffix(name, "/"+metadata.DeletionMarkFilename) || strings.HasSuffix(name, "/"+TenantDeletionMarkPath) + return strings.HasSuffix(name, "/"+metadata.MetaFilename) || strings.HasSuffix(name, "/"+metadata.DeletionMarkFilename) || strings.HasSuffix(name, "/"+TenantDeletionMarkFile) } func isBlockIndexFile(name string) bool { diff --git a/pkg/storage/tsdb/tenant_deletion_mark.go b/pkg/storage/tsdb/tenant_deletion_mark.go index a6a5844b36..72b913f2a7 100644 --- a/pkg/storage/tsdb/tenant_deletion_mark.go +++ b/pkg/storage/tsdb/tenant_deletion_mark.go @@ -15,8 +15,7 @@ import ( util_log "github.com/cortexproject/cortex/pkg/util/log" ) -// Relative to user-specific prefix. -const TenantDeletionMarkPath = "tenant-deletion-mark.json" +const TenantDeletionMarkFile = "tenant-deletion-mark.json" type TenantDeletionMark struct { // Unix timestamp when deletion marker was created. @@ -63,11 +62,11 @@ func ReadTenantDeletionMark(ctx context.Context, bkt objstore.BucketReader, user } func GetLocalDeletionMarkPath(userID string) string { - return path.Join(userID, "markers", TenantDeletionMarkPath) + return path.Join(userID, "markers", TenantDeletionMarkFile) } func GetGlobalDeletionMarkPath(userID string) string { - return path.Join(util.GlobalMarkersDir, userID, TenantDeletionMarkPath) + return path.Join(util.GlobalMarkersDir, userID, TenantDeletionMarkFile) } func exists(ctx context.Context, bkt objstore.BucketReader, markerFile string) (bool, error) { From 74f28f9bf6a7a85e03297ce3d4e3d6a527f51448 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=F0=9F=8C=B2=20Harry=20=F0=9F=8C=8A=20John=20=F0=9F=8F=94?= Date: Wed, 29 Nov 2023 10:48:29 -0800 Subject: [PATCH 4/4] Address comments MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 🌲 Harry 🌊 John 🏔 --- CHANGELOG.md | 2 +- pkg/compactor/blocks_cleaner.go | 6 ++---- pkg/storage/tsdb/tenant_deletion_mark.go | 11 +++++++++++ 3 files changed, 14 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 730438fe5c..1ffa8ee6dc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -89,7 +89,7 @@ * [ENHANCEMENT] Ingester: Added new ingester TSDB metrics `cortex_ingester_tsdb_head_samples_appended_total`, `cortex_ingester_tsdb_head_out_of_order_samples_appended_total`, `cortex_ingester_tsdb_snapshot_replay_error_total`, `cortex_ingester_tsdb_sample_ooo_delta` and `cortex_ingester_tsdb_mmap_chunks_total`. #5624 * [ENHANCEMENT] Query Frontend: Handle context error before decoding and merging responses. #5499 * [ENHANCEMENT] Store-Gateway and AlertManager: Add a `wait_instance_time_out` to context to avoid waiting forever. #5581 -* [ENHANCEMENT] Store-Gateway: Move the `tenant-deletion-mark.json`` to a global dir. #5676 +* [ENHANCEMENT] Blocks storage: Move the tenant deletion mark from `/markers/tenant-deletion-mark.json` to `__markers__//tenant-deletion-mark.json`. #5676 * [BUGFIX] Compactor: Fix possible division by zero during compactor config validation. #5535 * [BUGFIX] Ruler: Validate if rule group can be safely converted back to rule group yaml from protobuf message #5265 * [BUGFIX] Querier: Convert gRPC `ResourceExhausted` status code from store gateway to 422 limit error. #5286 diff --git a/pkg/compactor/blocks_cleaner.go b/pkg/compactor/blocks_cleaner.go index 29334c9b7d..5cdda9d764 100644 --- a/pkg/compactor/blocks_cleaner.go +++ b/pkg/compactor/blocks_cleaner.go @@ -293,16 +293,14 @@ func (c *BlocksCleaner) deleteUserMarkedForDeletion(ctx context.Context, userID level.Info(userLogger).Log("msg", "deleted files under "+block.DebugMetas+" for tenant marked for deletion", "count", deleted) } - // Tenant deletion mark file is inside Markers as well. if deleted, err := bucket.DeletePrefix(ctx, userBucket, bucketindex.MarkersPathname, userLogger); err != nil { return errors.Wrap(err, "failed to delete marker files") } else if deleted > 0 { level.Info(userLogger).Log("msg", "deleted marker files for tenant marked for deletion", "count", deleted) } - // Deleting global markers for the user - if err := c.bucketClient.Delete(ctx, cortex_tsdb.GetGlobalDeletionMarkPath(userID)); err != nil { - return errors.Wrap(err, "failed to delete global marker file") + if err := cortex_tsdb.DeleteTenantDeletionMark(ctx, c.bucketClient, userID); err != nil { + return errors.Wrap(err, "failed to delete tenant deletion mark") } return nil diff --git a/pkg/storage/tsdb/tenant_deletion_mark.go b/pkg/storage/tsdb/tenant_deletion_mark.go index 72b913f2a7..8b07013d2c 100644 --- a/pkg/storage/tsdb/tenant_deletion_mark.go +++ b/pkg/storage/tsdb/tenant_deletion_mark.go @@ -61,6 +61,17 @@ func ReadTenantDeletionMark(ctx context.Context, bkt objstore.BucketReader, user return read(ctx, bkt, markerFile) } +// Deletes the tenant deletion mark for given user if it exists. +func DeleteTenantDeletionMark(ctx context.Context, bkt objstore.Bucket, userID string) error { + if err := bkt.Delete(ctx, GetGlobalDeletionMarkPath(userID)); err != nil { + return err + } + if err := bkt.Delete(ctx, GetLocalDeletionMarkPath(userID)); err != nil { + return err + } + return nil +} + func GetLocalDeletionMarkPath(userID string) string { return path.Join(userID, "markers", TenantDeletionMarkFile) }