Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update thanos and add block_ids_fetcher to bucketindex #5681

Merged
merged 8 commits into from
Nov 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
46 changes: 23 additions & 23 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ require (
github.com/sony/gobreaker v0.5.0
github.com/spf13/afero v1.9.5
github.com/stretchr/testify v1.8.4
github.com/thanos-io/objstore v0.0.0-20231112185854-37752ee64d98
github.com/thanos-io/promql-engine v0.0.0-20231013104847-4517c0d5f591
github.com/thanos-io/thanos v0.32.5-0.20231120163350-0a4f5ae3310e
github.com/thanos-io/objstore v0.0.0-20231123170144-bffedaa58acb
github.com/thanos-io/promql-engine v0.0.0-20231127105941-257543af55e8
github.com/thanos-io/thanos v0.32.5-0.20231127170340-8ffb9da1383e
github.com/uber/jaeger-client-go v2.30.0+incompatible
github.com/weaveworks/common v0.0.0-20221201103051-7c2720a9024d
go.etcd.io/etcd/api/v3 v3.5.10
Expand All @@ -67,10 +67,10 @@ require (
go.opentelemetry.io/otel/sdk v1.19.0
go.opentelemetry.io/otel/trace v1.19.0
go.uber.org/atomic v1.11.0
golang.org/x/net v0.17.0
golang.org/x/sync v0.4.0
golang.org/x/time v0.3.0
google.golang.org/grpc v1.58.3
golang.org/x/net v0.18.0
golang.org/x/sync v0.5.0
golang.org/x/time v0.4.0
google.golang.org/grpc v1.59.0
gopkg.in/yaml.v2 v2.4.0
gopkg.in/yaml.v3 v3.0.1
sigs.k8s.io/yaml v1.3.0
Expand All @@ -84,11 +84,11 @@ require (
)

require (
cloud.google.com/go v0.110.8 // indirect
cloud.google.com/go/compute v1.23.0 // indirect
cloud.google.com/go v0.110.10 // indirect
cloud.google.com/go/compute v1.23.3 // indirect
cloud.google.com/go/compute/metadata v0.2.3 // indirect
cloud.google.com/go/iam v1.1.2 // indirect
cloud.google.com/go/storage v1.30.1 // indirect
cloud.google.com/go/iam v1.1.5 // indirect
cloud.google.com/go/storage v1.35.1 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.8.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.4.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.3.0 // indirect
Expand Down Expand Up @@ -143,8 +143,8 @@ require (
github.com/google/btree v1.0.1 // indirect
github.com/google/pprof v0.0.0-20230926050212-f7f687d19a98 // indirect
github.com/google/s2a-go v0.1.7 // indirect
github.com/google/uuid v1.3.1 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.1 // indirect
github.com/google/uuid v1.4.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
github.com/googleapis/gax-go/v2 v2.12.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.0-rc.2.0.20201207153454-9f6bf00c00a7 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 // indirect
Expand Down Expand Up @@ -216,20 +216,20 @@ require (
go.uber.org/zap v1.21.0 // indirect
go4.org/intern v0.0.0-20230525184215-6c62f75575cb // indirect
go4.org/unsafe/assume-no-moving-gc v0.0.0-20230525183740-e7c30c78aeb2 // indirect
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/crypto v0.15.0 // indirect
golang.org/x/exp v0.0.0-20231006140011-7918f672742d // indirect
golang.org/x/mod v0.13.0 // indirect
golang.org/x/oauth2 v0.13.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect
golang.org/x/oauth2 v0.14.0 // indirect
golang.org/x/sys v0.14.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/tools v0.14.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect
gonum.org/v1/gonum v0.12.0 // indirect
google.golang.org/api v0.147.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20231002182017-d307bd883b97 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20231012201019-e917dd12ba7a // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231009173412-8bfb1ae86b6c // indirect
google.golang.org/api v0.150.0 // indirect
google.golang.org/appengine v1.6.8 // indirect
google.golang.org/genproto v0.0.0-20231106174013-bbf56f31fb17 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20231106174013-bbf56f31fb17 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17 // indirect
gopkg.in/alecthomas/kingpin.v2 v2.2.6 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/telebot.v3 v3.1.3 // indirect
Expand Down
89 changes: 46 additions & 43 deletions go.sum

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions pkg/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -790,10 +790,18 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error {
// out of order chunks or index file too big.
noCompactMarkerFilter := compact.NewGatherNoCompactionMarkFilter(ulogger, bucket, c.compactorCfg.MetaSyncConcurrency)

var blockIDsFetcher block.BlockIDsFetcher
if c.storageCfg.BucketStore.BucketIndex.Enabled {
blockIDsFetcher = bucketindex.NewBlockIDsFetcher(ulogger, c.bucketClient, userID, c.limits)
} else {
blockIDsFetcher = block.NewBaseBlockIDsFetcher(ulogger, bucket)
}

fetcher, err := block.NewMetaFetcher(
ulogger,
c.compactorCfg.MetaSyncConcurrency,
bucket,
blockIDsFetcher,
c.metaSyncDirForUser(userID),
reg,
// List of filters to apply (order matters).
Expand Down
2 changes: 2 additions & 0 deletions pkg/querier/blocks_finder_bucket_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,10 +384,12 @@ func (d *BucketScanBlocksFinder) createMetaFetcher(userID string) (block.Metadat
filters = append(filters, storegateway.NewIgnoreNonQueryableBlocksFilter(d.logger, d.cfg.IgnoreBlocksWithin))
}

blockIdsFetcher := block.NewBaseBlockIDsFetcher(userLogger, userBucket)
f, err := block.NewMetaFetcher(
userLogger,
d.cfg.MetasConcurrency,
userBucket,
blockIdsFetcher,
// The fetcher stores cached metas in the "meta-syncer/" sub directory.
filepath.Join(d.cfg.CacheDir, userID),
userReg,
Expand Down
72 changes: 72 additions & 0 deletions pkg/storage/tsdb/bucketindex/block_ids_fetcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package bucketindex

import (
"context"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/thanos-io/objstore"
"github.com/thanos-io/thanos/pkg/block"

"github.com/cortexproject/cortex/pkg/storage/bucket"
)

type BlockIDsFetcher struct {
logger log.Logger
bkt objstore.Bucket
userID string
cfgProvider bucket.TenantConfigProvider
baseBlockIDsFetcher block.BlockIDsFetcher
}

func NewBlockIDsFetcher(logger log.Logger, bkt objstore.Bucket, userID string, cfgProvider bucket.TenantConfigProvider) *BlockIDsFetcher {
userBkt := bucket.NewUserBucketClient(userID, bkt, cfgProvider)
baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, userBkt)
return &BlockIDsFetcher{
logger: logger,
bkt: bkt,
userID: userID,
cfgProvider: cfgProvider,
baseBlockIDsFetcher: baseBlockIDsFetcher,
}
}

func (f *BlockIDsFetcher) GetActiveAndPartialBlockIDs(ctx context.Context, ch chan<- ulid.ULID) (partialBlocks map[ulid.ULID]bool, err error) {
// Fetch the bucket index.
idx, err := ReadIndex(ctx, f.bkt, f.userID, f.cfgProvider, f.logger)
if errors.Is(err, ErrIndexNotFound) {
// This is a legit case happening when the first blocks of a tenant have recently been uploaded by ingesters
// and their bucket index has not been created yet.
// Fallback to BaseBlockIDsFetcher.
return f.baseBlockIDsFetcher.GetActiveAndPartialBlockIDs(ctx, ch)
}
if errors.Is(err, ErrIndexCorrupted) {
// In case a single tenant bucket index is corrupted, we want to return empty active blocks and parital blocks, so skipping this compaction cycle
level.Error(f.logger).Log("msg", "corrupted bucket index found", "user", f.userID, "err", err)
// Fallback to BaseBlockIDsFetcher.
return f.baseBlockIDsFetcher.GetActiveAndPartialBlockIDs(ctx, ch)
}

if errors.Is(err, bucket.ErrCustomerManagedKeyAccessDenied) {
// stop the job and return the error
// this error should be used to return Access Denied to the caller
level.Error(f.logger).Log("msg", "bucket index key permission revoked", "user", f.userID, "err", err)
return nil, err
}

if err != nil {
return nil, err
}

// Sent the active block ids
for _, b := range idx.Blocks {
select {
case <-ctx.Done():
return nil, ctx.Err()
case ch <- b.ID:
}
}
return nil, nil
}
111 changes: 111 additions & 0 deletions pkg/storage/tsdb/bucketindex/block_ids_fetcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package bucketindex

import (
"bytes"
"context"
"encoding/json"
"path"
"sync"
"testing"
"time"

"github.com/go-kit/log"
"github.com/oklog/ulid"
"github.com/stretchr/testify/require"
"github.com/thanos-io/thanos/pkg/block/metadata"

cortex_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil"
"github.com/cortexproject/cortex/pkg/util/concurrency"
)

func TestBlockIDsFetcher_Fetch(t *testing.T) {
t.Parallel()
const userID = "user-1"

bkt, _ := cortex_testutil.PrepareFilesystemBucket(t)
ctx := context.Background()
now := time.Now()
logs := &concurrency.SyncBuffer{}
logger := log.NewLogfmtLogger(logs)

// Create a bucket index.
block1 := &Block{ID: ulid.MustNew(1, nil)}
block2 := &Block{ID: ulid.MustNew(2, nil)}
block3 := &Block{ID: ulid.MustNew(3, nil)}
mark1 := &BlockDeletionMark{ID: block1.ID, DeletionTime: now.Add(-time.Hour).Unix()} // Below the ignore delay threshold.
mark2 := &BlockDeletionMark{ID: block2.ID, DeletionTime: now.Add(-3 * time.Hour).Unix()} // Above the ignore delay threshold.

require.NoError(t, WriteIndex(ctx, bkt, userID, nil, &Index{
Version: IndexVersion1,
Blocks: Blocks{block1, block2, block3},
BlockDeletionMarks: BlockDeletionMarks{mark1, mark2},
UpdatedAt: now.Unix(),
}))

blockIdsFetcher := NewBlockIDsFetcher(logger, bkt, userID, nil)
ch := make(chan ulid.ULID)
var wg sync.WaitGroup
var blockIds []ulid.ULID
wg.Add(1)
go func() {
defer wg.Done()
for id := range ch {
blockIds = append(blockIds, id)
}
}()
blockIdsFetcher.GetActiveAndPartialBlockIDs(ctx, ch)
close(ch)
wg.Wait()
require.Equal(t, []ulid.ULID{block1.ID, block2.ID, block3.ID}, blockIds)
}

func TestBlockIDsFetcherFetcher_Fetch_NoBucketIndex(t *testing.T) {
t.Parallel()
const userID = "user-1"

bkt, _ := cortex_testutil.PrepareFilesystemBucket(t)
ctx := context.Background()
now := time.Now()
logs := &concurrency.SyncBuffer{}
logger := log.NewLogfmtLogger(logs)

//prepare tenant bucket
var meta1, meta2, meta3 metadata.Meta
block1 := &Block{ID: ulid.MustNew(1, nil)}
meta1.Version = 1
meta1.ULID = block1.ID
block2 := &Block{ID: ulid.MustNew(2, nil)}
meta2.Version = 1
meta2.ULID = block2.ID
block3 := &Block{ID: ulid.MustNew(3, nil)}
meta3.Version = 1
meta3.ULID = block3.ID
metas := []metadata.Meta{meta1, meta2, meta3}
mark1 := &BlockDeletionMark{ID: block1.ID, DeletionTime: now.Add(-time.Hour).Unix()} // Below the ignore delay threshold.
mark2 := &BlockDeletionMark{ID: block2.ID, DeletionTime: now.Add(-3 * time.Hour).Unix()} // Above the ignore delay threshold.
marks := []*BlockDeletionMark{mark1, mark2}
var buf bytes.Buffer
for _, meta := range metas {
require.NoError(t, json.NewEncoder(&buf).Encode(&meta))
require.NoError(t, bkt.Upload(ctx, path.Join(userID, meta.ULID.String(), metadata.MetaFilename), &buf))
}
for _, mark := range marks {
require.NoError(t, json.NewEncoder(&buf).Encode(mark))
require.NoError(t, bkt.Upload(ctx, path.Join(userID, mark.ID.String(), metadata.DeletionMarkFilename), &buf))
}
blockIdsFetcher := NewBlockIDsFetcher(logger, bkt, userID, nil)
ch := make(chan ulid.ULID)
var wg sync.WaitGroup
var blockIds []ulid.ULID
wg.Add(1)
go func() {
defer wg.Done()
for id := range ch {
blockIds = append(blockIds, id)
}
}()
blockIdsFetcher.GetActiveAndPartialBlockIDs(ctx, ch)
close(ch)
wg.Wait()
require.Equal(t, []ulid.ULID{block1.ID, block2.ID, block3.ID}, blockIds)
}
2 changes: 2 additions & 0 deletions pkg/storegateway/bucket_stores.go
Original file line number Diff line number Diff line change
Expand Up @@ -552,10 +552,12 @@ func (u *BucketStores) getOrCreateStore(userID string) (*store.BucketStore, erro
fetcherBkt := NewShardingBucketReaderAdapter(userID, u.shardingStrategy, userBkt)

var err error
blockIdsFetcher := block.NewBaseBlockIDsFetcher(userLogger, fetcherBkt)
fetcher, err = block.NewMetaFetcher(
userLogger,
u.cfg.BucketStore.MetaSyncConcurrency,
fetcherBkt,
blockIdsFetcher,
u.syncDirForUser(userID), // The fetcher stores cached metas in the "meta-syncer/" sub directory
fetcherReg,
filters,
Expand Down
2 changes: 1 addition & 1 deletion vendor/cloud.google.com/go/compute/internal/version.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 21 additions & 0 deletions vendor/cloud.google.com/go/iam/CHANGES.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion vendor/cloud.google.com/go/iam/apiv1/iampb/options.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion vendor/cloud.google.com/go/iam/apiv1/iampb/policy.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading