add unit test for bucketindex block ids fetcher
Signed-off-by: Wen Xu <[email protected]>
wenxu1024 committed Nov 28, 2023
1 parent a3e382e commit 6b5eef7
Showing 3 changed files with 131 additions and 14 deletions.
pkg/compactor/compactor.go (3 changes: 2 additions & 1 deletion)
@@ -793,8 +793,9 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error {
	noCompactMarkerFilter := compact.NewGatherNoCompactionMarkFilter(ulogger, bucket, c.compactorCfg.MetaSyncConcurrency)

	var blockIDsFetcher block.BlockIDsFetcher
+	// TODO: remove the BucketIndexBlockIDsFetcherEnabled flag once it is proved stable
	if c.storageCfg.BucketStore.BucketIndex.Enabled && c.compactorCfg.BucketIndexBlockIDsFetcherEnalbed {
-		blockIDsFetcher = NewBucketIndexBlockIDsFetcher(ulogger, c.bucketClient, userID, c.limits)
+		blockIDsFetcher = bucketindex.NewBlockIDsFetcher(ulogger, c.bucketClient, userID, c.limits)
	} else {
		blockIDsFetcher = block.NewBaseBlockIDsFetcher(ulogger, bucket)
	}
@@ -1,45 +1,50 @@
-package compactor
+package bucketindex

import (
	"context"
+	"github.com/thanos-io/thanos/pkg/block"

-	"github.com/cortexproject/cortex/pkg/storage/bucket"
-	"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/oklog/ulid"
	"github.com/pkg/errors"
	"github.com/thanos-io/objstore"
+
+	"github.com/cortexproject/cortex/pkg/storage/bucket"
)

-type BucketIndexBlockIDsFetcher struct {
+type BlockIDsFetcher struct {
	logger      log.Logger
	bkt         objstore.Bucket
	userID      string
	cfgProvider bucket.TenantConfigProvider
}

-func NewBucketIndexBlockIDsFetcher(logger log.Logger, bkt objstore.Bucket, userID string, cfgProvider bucket.TenantConfigProvider) *BucketIndexBlockIDsFetcher {
-	return &BucketIndexBlockIDsFetcher{
+func NewBlockIDsFetcher(logger log.Logger, bkt objstore.Bucket, userID string, cfgProvider bucket.TenantConfigProvider) *BlockIDsFetcher {
+	return &BlockIDsFetcher{
		logger:      logger,
		bkt:         bkt,
		userID:      userID,
		cfgProvider: cfgProvider,
	}
}

-func (f *BucketIndexBlockIDsFetcher) GetActiveAndPartialBlockIDs(ctx context.Context, ch chan<- ulid.ULID) (partialBlocks map[ulid.ULID]bool, err error) {
+func (f *BlockIDsFetcher) GetActiveAndPartialBlockIDs(ctx context.Context, ch chan<- ulid.ULID) (partialBlocks map[ulid.ULID]bool, err error) {
	// Fetch the bucket index.
-	idx, err := bucketindex.ReadIndex(ctx, f.bkt, f.userID, f.cfgProvider, f.logger)
-	if errors.Is(err, bucketindex.ErrIndexNotFound) {
+	idx, err := ReadIndex(ctx, f.bkt, f.userID, f.cfgProvider, f.logger)
+	userBkt := bucket.NewUserBucketClient(f.userID, f.bkt, f.cfgProvider)
+	baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(f.logger, userBkt)
+	if errors.Is(err, ErrIndexNotFound) {
		// This is a legit case happening when the first blocks of a tenant have recently been uploaded by ingesters
		// and their bucket index has not been created yet.
-		return nil, nil
+		// Fallback to BaseBlockIDsFetcher.
+		return baseBlockIDsFetcher.GetActiveAndPartialBlockIDs(ctx, ch)
	}
-	if errors.Is(err, bucketindex.ErrIndexCorrupted) {
+	if errors.Is(err, ErrIndexCorrupted) {
		// In case a single tenant bucket index is corrupted, we want to return empty active blocks and partial blocks, so skipping this compaction cycle
		level.Error(f.logger).Log("msg", "corrupted bucket index found", "user", f.userID, "err", err)
-		return nil, nil
+		// Fallback to BaseBlockIDsFetcher.
+		return baseBlockIDsFetcher.GetActiveAndPartialBlockIDs(ctx, ch)
	}

	if errors.Is(err, bucket.ErrCustomerManagedKeyAccessDenied) {
@@ -50,7 +55,7 @@ func (f *BucketIndexBlockIDsFetcher) GetActiveAndPartialBlockIDs(ctx context.Con
	}

	if err != nil {
-		return nil, errors.Wrapf(err, "read bucket index")
+		return nil, err
	}

	// Send the active block ids
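For orientation, here is a minimal, illustrative sketch (not part of the commit) of how a caller might drive the renamed fetcher; it mirrors the channel-and-goroutine pattern used by the unit test added below. The listBlocks helper, its package name, and its parameters are hypothetical; only bucketindex.NewBlockIDsFetcher and GetActiveAndPartialBlockIDs come from this change.

package example

import (
	"context"
	"sync"

	"github.com/go-kit/log"
	"github.com/oklog/ulid"
	"github.com/thanos-io/objstore"

	"github.com/cortexproject/cortex/pkg/storage/bucket"
	"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
)

// listBlocks collects the active block IDs streamed over the channel and
// returns them together with the partial-blocks map reported by the fetcher.
// Sketch only; error handling is kept minimal.
func listBlocks(ctx context.Context, logger log.Logger, bkt objstore.Bucket, userID string, cfgProvider bucket.TenantConfigProvider) ([]ulid.ULID, map[ulid.ULID]bool, error) {
	fetcher := bucketindex.NewBlockIDsFetcher(logger, bkt, userID, cfgProvider)

	ch := make(chan ulid.ULID)
	var (
		wg     sync.WaitGroup
		active []ulid.ULID
	)
	wg.Add(1)
	go func() {
		defer wg.Done()
		// Drain the channel while the fetcher is producing IDs.
		for id := range ch {
			active = append(active, id)
		}
	}()

	partialBlocks, err := fetcher.GetActiveAndPartialBlockIDs(ctx, ch)
	close(ch)
	wg.Wait()
	if err != nil {
		return nil, nil, err
	}
	return active, partialBlocks, nil
}

Streaming IDs over a channel lets the consumer process blocks as they are discovered instead of waiting for the full listing; the fallback to block.NewBaseBlockIDsFetcher keeps this contract even when no bucket index exists yet.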
pkg/storage/tsdb/bucketindex/block_ids_fetcher_test.go (111 changes: 111 additions & 0 deletions)
@@ -0,0 +1,111 @@
package bucketindex

import (
	"bytes"
	"context"
	"encoding/json"
	"path"
	"sync"
	"testing"
	"time"

	"github.com/go-kit/log"
	"github.com/oklog/ulid"
	"github.com/stretchr/testify/require"
	"github.com/thanos-io/thanos/pkg/block/metadata"

	cortex_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil"
	"github.com/cortexproject/cortex/pkg/util/concurrency"
)

func TestBlockIDsFetcher_Fetch(t *testing.T) {
	t.Parallel()
	const userID = "user-1"

	bkt, _ := cortex_testutil.PrepareFilesystemBucket(t)
	ctx := context.Background()
	now := time.Now()
	logs := &concurrency.SyncBuffer{}
	logger := log.NewLogfmtLogger(logs)

	// Create a bucket index.
	block1 := &Block{ID: ulid.MustNew(1, nil)}
	block2 := &Block{ID: ulid.MustNew(2, nil)}
	block3 := &Block{ID: ulid.MustNew(3, nil)}
	mark1 := &BlockDeletionMark{ID: block1.ID, DeletionTime: now.Add(-time.Hour).Unix()}     // Below the ignore delay threshold.
	mark2 := &BlockDeletionMark{ID: block2.ID, DeletionTime: now.Add(-3 * time.Hour).Unix()} // Above the ignore delay threshold.

	require.NoError(t, WriteIndex(ctx, bkt, userID, nil, &Index{
		Version:            IndexVersion1,
		Blocks:             Blocks{block1, block2, block3},
		BlockDeletionMarks: BlockDeletionMarks{mark1, mark2},
		UpdatedAt:          now.Unix(),
	}))

	blockIdsFetcher := NewBlockIDsFetcher(logger, bkt, userID, nil)
	ch := make(chan ulid.ULID)
	var wg sync.WaitGroup
	var blockIds []ulid.ULID
	wg.Add(1)
	go func() {
		defer wg.Done()
		for id := range ch {
			blockIds = append(blockIds, id)
		}
	}()
	blockIdsFetcher.GetActiveAndPartialBlockIDs(ctx, ch)
	close(ch)
	wg.Wait()
	require.Equal(t, []ulid.ULID{block1.ID, block2.ID, block3.ID}, blockIds)
}

func TestBlockIDsFetcher_Fetch_NoBucketIndex(t *testing.T) {
	t.Parallel()
	const userID = "user-1"

	bkt, _ := cortex_testutil.PrepareFilesystemBucket(t)
	ctx := context.Background()
	now := time.Now()
	logs := &concurrency.SyncBuffer{}
	logger := log.NewLogfmtLogger(logs)

	// Prepare the tenant bucket.
	var meta1, meta2, meta3 metadata.Meta
	block1 := &Block{ID: ulid.MustNew(1, nil)}
	meta1.Version = 1
	meta1.ULID = block1.ID
	block2 := &Block{ID: ulid.MustNew(2, nil)}
	meta2.Version = 1
	meta2.ULID = block2.ID
	block3 := &Block{ID: ulid.MustNew(3, nil)}
	meta3.Version = 1
	meta3.ULID = block3.ID
	metas := []metadata.Meta{meta1, meta2, meta3}
	mark1 := &BlockDeletionMark{ID: block1.ID, DeletionTime: now.Add(-time.Hour).Unix()}     // Below the ignore delay threshold.
	mark2 := &BlockDeletionMark{ID: block2.ID, DeletionTime: now.Add(-3 * time.Hour).Unix()} // Above the ignore delay threshold.
	marks := []*BlockDeletionMark{mark1, mark2}
	var buf bytes.Buffer
	for _, meta := range metas {
		require.NoError(t, json.NewEncoder(&buf).Encode(&meta))
		require.NoError(t, bkt.Upload(ctx, path.Join(userID, meta.ULID.String(), metadata.MetaFilename), &buf))
	}
	for _, mark := range marks {
		require.NoError(t, json.NewEncoder(&buf).Encode(mark))
		require.NoError(t, bkt.Upload(ctx, path.Join(userID, mark.ID.String(), metadata.DeletionMarkFilename), &buf))
	}
	blockIdsFetcher := NewBlockIDsFetcher(logger, bkt, userID, nil)
	ch := make(chan ulid.ULID)
	var wg sync.WaitGroup
	var blockIds []ulid.ULID
	wg.Add(1)
	go func() {
		defer wg.Done()
		for id := range ch {
			blockIds = append(blockIds, id)
		}
	}()
	blockIdsFetcher.GetActiveAndPartialBlockIDs(ctx, ch)
	close(ch)
	wg.Wait()
	require.Equal(t, []ulid.ULID{block1.ID, block2.ID, block3.ID}, blockIds)
}
