From 6b5eef7364166f1bc8cf5a037090ca8d75d9f911 Mon Sep 17 00:00:00 2001
From: Wen Xu
Date: Tue, 28 Nov 2023 16:00:20 +0000
Subject: [PATCH] add unit test for bucketindex block ids fetcher

Signed-off-by: Wen Xu
---
 pkg/compactor/compactor.go                    |   3 +-
 .../tsdb/bucketindex/block_ids_fetcher.go}    |  31 +++--
 .../bucketindex/block_ids_fetcher_test.go     | 111 ++++++++++++++++++
 3 files changed, 131 insertions(+), 14 deletions(-)
 rename pkg/{compactor/bucket-index-block-ids-fetcher.go => storage/tsdb/bucketindex/block_ids_fetcher.go} (59%)
 create mode 100644 pkg/storage/tsdb/bucketindex/block_ids_fetcher_test.go

diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go
index f73f039dee..83b20142fe 100644
--- a/pkg/compactor/compactor.go
+++ b/pkg/compactor/compactor.go
@@ -793,8 +793,9 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error {
 	noCompactMarkerFilter := compact.NewGatherNoCompactionMarkFilter(ulogger, bucket, c.compactorCfg.MetaSyncConcurrency)
 
 	var blockIDsFetcher block.BlockIDsFetcher
+	// TODO: remove the BucketIndexBlockIDsFetcherEnabled flag once it is proved stable
 	if c.storageCfg.BucketStore.BucketIndex.Enabled && c.compactorCfg.BucketIndexBlockIDsFetcherEnalbed {
-		blockIDsFetcher = NewBucketIndexBlockIDsFetcher(ulogger, c.bucketClient, userID, c.limits)
+		blockIDsFetcher = bucketindex.NewBlockIDsFetcher(ulogger, c.bucketClient, userID, c.limits)
 	} else {
 		blockIDsFetcher = block.NewBaseBlockIDsFetcher(ulogger, bucket)
 	}
diff --git a/pkg/compactor/bucket-index-block-ids-fetcher.go b/pkg/storage/tsdb/bucketindex/block_ids_fetcher.go
similarity index 59%
rename from pkg/compactor/bucket-index-block-ids-fetcher.go
rename to pkg/storage/tsdb/bucketindex/block_ids_fetcher.go
index 4ddac0e98e..d7b030f1aa 100644
--- a/pkg/compactor/bucket-index-block-ids-fetcher.go
+++ b/pkg/storage/tsdb/bucketindex/block_ids_fetcher.go
@@ -1,26 +1,27 @@
-package compactor
+package bucketindex
 
 import (
 	"context"
+	"github.com/thanos-io/thanos/pkg/block"
 
-	"github.com/cortexproject/cortex/pkg/storage/bucket"
-	"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
 	"github.com/go-kit/log"
 	"github.com/go-kit/log/level"
 	"github.com/oklog/ulid"
 	"github.com/pkg/errors"
 	"github.com/thanos-io/objstore"
+
+	"github.com/cortexproject/cortex/pkg/storage/bucket"
 )
 
-type BucketIndexBlockIDsFetcher struct {
+type BlockIDsFetcher struct {
 	logger      log.Logger
 	bkt         objstore.Bucket
 	userID      string
 	cfgProvider bucket.TenantConfigProvider
 }
 
-func NewBucketIndexBlockIDsFetcher(logger log.Logger, bkt objstore.Bucket, userID string, cfgProvider bucket.TenantConfigProvider) *BucketIndexBlockIDsFetcher {
-	return &BucketIndexBlockIDsFetcher{
+func NewBlockIDsFetcher(logger log.Logger, bkt objstore.Bucket, userID string, cfgProvider bucket.TenantConfigProvider) *BlockIDsFetcher {
+	return &BlockIDsFetcher{
 		logger:      logger,
 		bkt:         bkt,
 		userID:      userID,
@@ -28,18 +29,22 @@ func NewBucketIndexBlockIDsFetcher(logger log.Logger, bkt objstore.Bucket, userI
 	}
 }
 
-func (f *BucketIndexBlockIDsFetcher) GetActiveAndPartialBlockIDs(ctx context.Context, ch chan<- ulid.ULID) (partialBlocks map[ulid.ULID]bool, err error) {
+func (f *BlockIDsFetcher) GetActiveAndPartialBlockIDs(ctx context.Context, ch chan<- ulid.ULID) (partialBlocks map[ulid.ULID]bool, err error) {
 	// Fetch the bucket index.
-	idx, err := bucketindex.ReadIndex(ctx, f.bkt, f.userID, f.cfgProvider, f.logger)
-	if errors.Is(err, bucketindex.ErrIndexNotFound) {
+	idx, err := ReadIndex(ctx, f.bkt, f.userID, f.cfgProvider, f.logger)
+	userBkt := bucket.NewUserBucketClient(f.userID, f.bkt, f.cfgProvider)
+	baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(f.logger, userBkt)
+	if errors.Is(err, ErrIndexNotFound) {
 		// This is a legit case happening when the first blocks of a tenant have recently been uploaded by ingesters
 		// and their bucket index has not been created yet.
-		return nil, nil
+		// Fallback to BaseBlockIDsFetcher.
+		return baseBlockIDsFetcher.GetActiveAndPartialBlockIDs(ctx, ch)
 	}
-	if errors.Is(err, bucketindex.ErrIndexCorrupted) {
+	if errors.Is(err, ErrIndexCorrupted) {
 		// In case a single tenant bucket index is corrupted, we want to return empty active blocks and parital blocks, so skipping this compaction cycle
 		level.Error(f.logger).Log("msg", "corrupted bucket index found", "user", f.userID, "err", err)
-		return nil, nil
+		// Fallback to BaseBlockIDsFetcher.
+		return baseBlockIDsFetcher.GetActiveAndPartialBlockIDs(ctx, ch)
 	}
 
 	if errors.Is(err, bucket.ErrCustomerManagedKeyAccessDenied) {
@@ -50,7 +55,7 @@ func (f *BucketIndexBlockIDsFetcher) GetActiveAndPartialBlockIDs(ctx context.Con
 	}
 
 	if err != nil {
-		return nil, errors.Wrapf(err, "read bucket index")
+		return nil, err
 	}
 
 	// Sent the active block ids
diff --git a/pkg/storage/tsdb/bucketindex/block_ids_fetcher_test.go b/pkg/storage/tsdb/bucketindex/block_ids_fetcher_test.go
new file mode 100644
index 0000000000..5422513a0f
--- /dev/null
+++ b/pkg/storage/tsdb/bucketindex/block_ids_fetcher_test.go
@@ -0,0 +1,111 @@
+package bucketindex
+
+import (
+	"bytes"
+	"context"
+	"encoding/json"
+	"path"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/go-kit/log"
+	"github.com/oklog/ulid"
+	"github.com/stretchr/testify/require"
+	"github.com/thanos-io/thanos/pkg/block/metadata"
+
+	cortex_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil"
+	"github.com/cortexproject/cortex/pkg/util/concurrency"
+)
+
+func TestBlockIDsFetcher_Fetch(t *testing.T) {
+	t.Parallel()
+	const userID = "user-1"
+
+	bkt, _ := cortex_testutil.PrepareFilesystemBucket(t)
+	ctx := context.Background()
+	now := time.Now()
+	logs := &concurrency.SyncBuffer{}
+	logger := log.NewLogfmtLogger(logs)
+
+	// Create a bucket index.
+	block1 := &Block{ID: ulid.MustNew(1, nil)}
+	block2 := &Block{ID: ulid.MustNew(2, nil)}
+	block3 := &Block{ID: ulid.MustNew(3, nil)}
+	mark1 := &BlockDeletionMark{ID: block1.ID, DeletionTime: now.Add(-time.Hour).Unix()}     // Below the ignore delay threshold.
+	mark2 := &BlockDeletionMark{ID: block2.ID, DeletionTime: now.Add(-3 * time.Hour).Unix()} // Above the ignore delay threshold.
+
+	require.NoError(t, WriteIndex(ctx, bkt, userID, nil, &Index{
+		Version:            IndexVersion1,
+		Blocks:             Blocks{block1, block2, block3},
+		BlockDeletionMarks: BlockDeletionMarks{mark1, mark2},
+		UpdatedAt:          now.Unix(),
+	}))
+
+	blockIDsFetcher := NewBlockIDsFetcher(logger, bkt, userID, nil)
+	ch := make(chan ulid.ULID)
+	var wg sync.WaitGroup
+	var blockIDs []ulid.ULID
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		for id := range ch {
+			blockIDs = append(blockIDs, id)
+		}
+	}()
+	blockIDsFetcher.GetActiveAndPartialBlockIDs(ctx, ch)
+	close(ch)
+	wg.Wait()
+	require.Equal(t, []ulid.ULID{block1.ID, block2.ID, block3.ID}, blockIDs)
+}
+
+func TestBlockIDsFetcher_Fetch_NoBucketIndex(t *testing.T) {
+	t.Parallel()
+	const userID = "user-1"
+
+	bkt, _ := cortex_testutil.PrepareFilesystemBucket(t)
+	ctx := context.Background()
+	now := time.Now()
+	logs := &concurrency.SyncBuffer{}
+	logger := log.NewLogfmtLogger(logs)
+
+	//prepare tenant bucket
+	var meta1, meta2, meta3 metadata.Meta
+	block1 := &Block{ID: ulid.MustNew(1, nil)}
+	meta1.Version = 1
+	meta1.ULID = block1.ID
+	block2 := &Block{ID: ulid.MustNew(2, nil)}
+	meta2.Version = 1
+	meta2.ULID = block2.ID
+	block3 := &Block{ID: ulid.MustNew(3, nil)}
+	meta3.Version = 1
+	meta3.ULID = block3.ID
+	metas := []metadata.Meta{meta1, meta2, meta3}
+	mark1 := &BlockDeletionMark{ID: block1.ID, DeletionTime: now.Add(-time.Hour).Unix()}     // Below the ignore delay threshold.
+	mark2 := &BlockDeletionMark{ID: block2.ID, DeletionTime: now.Add(-3 * time.Hour).Unix()} // Above the ignore delay threshold.
+	marks := []*BlockDeletionMark{mark1, mark2}
+	var buf bytes.Buffer
+	for _, meta := range metas {
+		require.NoError(t, json.NewEncoder(&buf).Encode(&meta))
+		require.NoError(t, bkt.Upload(ctx, path.Join(userID, meta.ULID.String(), metadata.MetaFilename), &buf))
+	}
+	for _, mark := range marks {
+		require.NoError(t, json.NewEncoder(&buf).Encode(mark))
+		require.NoError(t, bkt.Upload(ctx, path.Join(userID, mark.ID.String(), metadata.DeletionMarkFilename), &buf))
+	}
+	blockIDsFetcher := NewBlockIDsFetcher(logger, bkt, userID, nil)
+	ch := make(chan ulid.ULID)
+	var wg sync.WaitGroup
+	var blockIDs []ulid.ULID
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		for id := range ch {
+			blockIDs = append(blockIDs, id)
+		}
+	}()
+	blockIDsFetcher.GetActiveAndPartialBlockIDs(ctx, ch)
+	close(ch)
+	wg.Wait()
+	require.Equal(t, []ulid.ULID{block1.ID, block2.ID, block3.ID}, blockIDs)
+}