diff --git a/CHANGELOG.md b/CHANGELOG.md
index f08d1671950..a7143298edb 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -25,6 +25,7 @@
 * [BUGFIX] Ingester: don't retain blocks if they finish exactly on the boundary of the retention window. #7656
 * [BUGFIX] Bug-fixes and improvements to experimental native histograms. #7744
 * [BUGFIX] Querier: return an error when a query uses `label_join` with an invalid destination label name. #7744
+* [BUGFIX] Compactor: correct outstanding job estimation in metrics and `compaction-planner` tool when block labels differ. #7745
 
 ### Mixin
 
diff --git a/pkg/compactor/blocks_cleaner.go b/pkg/compactor/blocks_cleaner.go
index 1439182f934..e46dfee5102 100644
--- a/pkg/compactor/blocks_cleaner.go
+++ b/pkg/compactor/blocks_cleaner.go
@@ -683,18 +683,18 @@ func stalePartialBlockLastModifiedTime(ctx context.Context, blockID ulid.ULID, u
 }
 
 func estimateCompactionJobsFromBucketIndex(ctx context.Context, userID string, userBucket objstore.InstrumentedBucket, idx *bucketindex.Index, compactionBlockRanges mimir_tsdb.DurationList, mergeShards int, splitGroups int) ([]*Job, error) {
-	metas := convertBucketIndexToMetasForCompactionJobPlanning(idx)
+	metas := ConvertBucketIndexToMetasForCompactionJobPlanning(idx)
 
 	// We need to pass this metric to MetadataFilters, but we don't need to report this value from BlocksCleaner.
 	synced := newNoopGaugeVec()
 
 	for _, f := range []block.MetadataFilter{
+		NewLabelRemoverFilter(compactionIgnoredLabels),
 		// We don't include ShardAwareDeduplicateFilter, because it relies on list of compaction sources, which are not present in the BucketIndex.
 		// We do include NoCompactionMarkFilter to avoid computing jobs from blocks that are marked for no-compaction.
 		NewNoCompactionMarkFilter(userBucket),
 	} {
-		err := f.Filter(ctx, metas, synced)
-		if err != nil {
+		if err := f.Filter(ctx, metas, synced); err != nil {
 			return nil, err
 		}
 	}
@@ -705,23 +705,31 @@ func estimateCompactionJobsFromBucketIndex(ctx context.Context, userID string, u
 }
 
 // Convert index into map of block Metas, but ignore blocks marked for deletion.
-func convertBucketIndexToMetasForCompactionJobPlanning(idx *bucketindex.Index) map[ulid.ULID]*block.Meta {
+func ConvertBucketIndexToMetasForCompactionJobPlanning(idx *bucketindex.Index) map[ulid.ULID]*block.Meta {
 	deletedULIDs := idx.BlockDeletionMarks.GetULIDs()
-	deleted := make(map[ulid.ULID]bool, len(deletedULIDs))
+	deleted := make(map[ulid.ULID]struct{}, len(deletedULIDs))
 	for _, id := range deletedULIDs {
-		deleted[id] = true
+		deleted[id] = struct{}{}
 	}
 
 	metas := map[ulid.ULID]*block.Meta{}
 	for _, b := range idx.Blocks {
-		if deleted[b.ID] {
+		if _, del := deleted[b.ID]; del {
 			continue
 		}
 		metas[b.ID] = b.ThanosMeta()
 		if metas[b.ID].Thanos.Labels == nil {
 			metas[b.ID].Thanos.Labels = map[string]string{}
 		}
-		metas[b.ID].Thanos.Labels[mimir_tsdb.CompactorShardIDExternalLabel] = b.CompactorShardID // Needed for correct planning.
+
+		// Correct planning depends on external labels being present. We didn't
+		// always persist labels into the bucket index, but we may have tracked
+		// the shard ID label, so copy that back over if it isn't there.
+		if b.CompactorShardID != "" {
+			if _, found := metas[b.ID].Thanos.Labels[mimir_tsdb.CompactorShardIDExternalLabel]; !found {
+				metas[b.ID].Thanos.Labels[mimir_tsdb.CompactorShardIDExternalLabel] = b.CompactorShardID
+			}
+		}
 	}
 	return metas
 }
diff --git a/pkg/compactor/blocks_cleaner_test.go b/pkg/compactor/blocks_cleaner_test.go
index 2e883184f5c..b625fa05949 100644
--- a/pkg/compactor/blocks_cleaner_test.go
+++ b/pkg/compactor/blocks_cleaner_test.go
@@ -24,6 +24,7 @@ import (
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
 	"github.com/prometheus/client_golang/prometheus/testutil"
+	prom_tsdb "github.com/prometheus/prometheus/tsdb"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"github.com/thanos-io/objstore"
@@ -1044,41 +1045,198 @@ func TestComputeCompactionJobs(t *testing.T) {
 	}
 
 	const user = "test"
-
 	twoHoursMS := 2 * time.Hour.Milliseconds()
 	dayMS := 24 * time.Hour.Milliseconds()
 
+	userBucket := bucket.NewUserBucketClient(user, bucketClient, nil)
+
+	// Mark block for no-compaction.
 	blockMarkedForNoCompact := ulid.MustNew(ulid.Now(), rand.Reader)
+	require.NoError(t, block.MarkForNoCompact(context.Background(), log.NewNopLogger(), userBucket, blockMarkedForNoCompact, block.CriticalNoCompactReason, "testing", promauto.With(nil).NewCounter(prometheus.CounterOpts{})))
 
-	index := bucketindex.Index{}
-	index.Blocks = bucketindex.Blocks{
-		// Some 2h blocks that should be compacted together and split.
-		&bucketindex.Block{ID: ulid.MustNew(ulid.Now(), rand.Reader), MinTime: 0, MaxTime: twoHoursMS},
-		&bucketindex.Block{ID: ulid.MustNew(ulid.Now(), rand.Reader), MinTime: 0, MaxTime: twoHoursMS},
-		&bucketindex.Block{ID: ulid.MustNew(ulid.Now(), rand.Reader), MinTime: 0, MaxTime: twoHoursMS},
+	cases := map[string]struct {
+		blocks         bucketindex.Blocks
+		expectedSplits int
+		expectedMerges int
+	}{
+		"standard": {
+			blocks: bucketindex.Blocks{
+				// Some 2h blocks that should be compacted together and split.
+				&bucketindex.Block{ID: ulid.MustNew(ulid.Now(), rand.Reader), MinTime: 0, MaxTime: twoHoursMS},
+				&bucketindex.Block{ID: ulid.MustNew(ulid.Now(), rand.Reader), MinTime: 0, MaxTime: twoHoursMS},
+				&bucketindex.Block{ID: ulid.MustNew(ulid.Now(), rand.Reader), MinTime: 0, MaxTime: twoHoursMS},
+
+				// Some merge jobs.
+				&bucketindex.Block{ID: ulid.MustNew(ulid.Now(), rand.Reader), MinTime: dayMS, MaxTime: 2 * dayMS, CompactorShardID: "1_of_3"},
+				&bucketindex.Block{ID: ulid.MustNew(ulid.Now(), rand.Reader), MinTime: dayMS, MaxTime: 2 * dayMS, CompactorShardID: "1_of_3"},
+
+				&bucketindex.Block{ID: ulid.MustNew(ulid.Now(), rand.Reader), MinTime: dayMS, MaxTime: 2 * dayMS, CompactorShardID: "2_of_3"},
+				&bucketindex.Block{ID: ulid.MustNew(ulid.Now(), rand.Reader), MinTime: dayMS, MaxTime: 2 * dayMS, CompactorShardID: "2_of_3"},
+
+				// This merge job is skipped, as block is marked for no-compaction.
+				&bucketindex.Block{ID: ulid.MustNew(ulid.Now(), rand.Reader), MinTime: dayMS, MaxTime: 2 * dayMS, CompactorShardID: "3_of_3"},
+				&bucketindex.Block{ID: blockMarkedForNoCompact, MinTime: dayMS, MaxTime: 2 * dayMS, CompactorShardID: "3_of_3"},
+			},
+			expectedSplits: 1,
+			expectedMerges: 2,
+		},
+		"labels don't match": {
+			blocks: bucketindex.Blocks{
+				// Compactor wouldn't produce a job for this pair as their external labels differ:
+				&bucketindex.Block{ID: ulid.MustNew(ulid.Now(), rand.Reader), MinTime: 5 * dayMS, MaxTime: 6 * dayMS,
+					Labels: map[string]string{
+						tsdb.OutOfOrderExternalLabel: tsdb.OutOfOrderExternalLabelValue,
+					},
+				},
+				&bucketindex.Block{ID: ulid.MustNew(ulid.Now(), rand.Reader), MinTime: 5 * dayMS, MaxTime: 6 * dayMS,
+					Labels: map[string]string{
+						"another_label": "-1",
+					},
+				},
+			},
+			expectedSplits: 0,
+			expectedMerges: 0,
+		},
+		"ignore deprecated labels": {
+			blocks: bucketindex.Blocks{
+				// Compactor will ignore deprecated labels when computing jobs. Estimation should do the same.
+				&bucketindex.Block{ID: ulid.MustNew(ulid.Now(), rand.Reader), MinTime: 5 * dayMS, MaxTime: 6 * dayMS,
+					Labels: map[string]string{
+						"honored_label": "12345",
+						tsdb.DeprecatedTenantIDExternalLabel: "tenant1",
+						tsdb.DeprecatedIngesterIDExternalLabel: "ingester1",
+					},
+				},
+				&bucketindex.Block{ID: ulid.MustNew(ulid.Now(), rand.Reader), MinTime: 5 * dayMS, MaxTime: 6 * dayMS,
+					Labels: map[string]string{
+						"honored_label": "12345",
+						tsdb.DeprecatedTenantIDExternalLabel: "tenant2",
+						tsdb.DeprecatedIngesterIDExternalLabel: "ingester2",
+					},
+				},
+			},
+			expectedSplits: 0,
+			expectedMerges: 1,
+		},
+	}
 
-		// Some merge jobs.
-		&bucketindex.Block{ID: ulid.MustNew(ulid.Now(), rand.Reader), MinTime: dayMS, MaxTime: 2 * dayMS, CompactorShardID: "1_of_3"},
-		&bucketindex.Block{ID: ulid.MustNew(ulid.Now(), rand.Reader), MinTime: dayMS, MaxTime: 2 * dayMS, CompactorShardID: "1_of_3"},
+	for name, c := range cases {
+		t.Run(name, func(t *testing.T) {
+			index := &bucketindex.Index{Blocks: c.blocks}
+			jobs, err := estimateCompactionJobsFromBucketIndex(context.Background(), user, userBucket, index, cfg.CompactionBlockRanges, 3, 0)
+			require.NoError(t, err)
+			split, merge := computeSplitAndMergeJobs(jobs)
+			require.Equal(t, c.expectedSplits, split)
+			require.Equal(t, c.expectedMerges, merge)
+		})
+	}
+}
 
-		&bucketindex.Block{ID: ulid.MustNew(ulid.Now(), rand.Reader), MinTime: dayMS, MaxTime: 2 * dayMS, CompactorShardID: "2_of_3"},
-		&bucketindex.Block{ID: ulid.MustNew(ulid.Now(), rand.Reader), MinTime: dayMS, MaxTime: 2 * dayMS, CompactorShardID: "2_of_3"},
+func TestConvertBucketIndexToMetasForCompactionJobPlanning(t *testing.T) {
+	twoHoursMS := 2 * time.Hour.Milliseconds()
 
-		// This merge job is skipped, as block is marked for no-compaction.
-		&bucketindex.Block{ID: ulid.MustNew(ulid.Now(), rand.Reader), MinTime: dayMS, MaxTime: 2 * dayMS, CompactorShardID: "3_of_3"},
-		&bucketindex.Block{ID: blockMarkedForNoCompact, MinTime: dayMS, MaxTime: 2 * dayMS, CompactorShardID: "3_of_3"},
+	makeUlid := func(n byte) ulid.ULID {
+		return ulid.ULID{n}
 	}
 
-	userBucket := bucket.NewUserBucketClient(user, bucketClient, nil)
-	// Mark block for no-compaction.
-	require.NoError(t, block.MarkForNoCompact(context.Background(), log.NewNopLogger(), userBucket, blockMarkedForNoCompact, block.CriticalNoCompactReason, "testing", promauto.With(nil).NewCounter(prometheus.CounterOpts{})))
+	makeMeta := func(id ulid.ULID, labels map[string]string) *block.Meta {
+		return &block.Meta{
+			BlockMeta: prom_tsdb.BlockMeta{
+				ULID: id,
+				MinTime: 0,
+				MaxTime: twoHoursMS,
+				Version: block.TSDBVersion1,
+			},
+			Thanos: block.ThanosMeta{
+				Version: block.ThanosVersion1,
+				Labels: labels,
+			},
+		}
+	}
 
-	// No grouping of jobs for split-compaction. All jobs will be in single split compaction.
-	jobs, err := estimateCompactionJobsFromBucketIndex(context.Background(), user, userBucket, &index, cfg.CompactionBlockRanges, 3, 0)
-	require.NoError(t, err)
-	split, merge := computeSplitAndMergeJobs(jobs)
-	require.Equal(t, 1, split)
-	require.Equal(t, 2, merge)
+	cases := map[string]struct {
+		index         *bucketindex.Index
+		expectedMetas map[ulid.ULID]*block.Meta
+	}{
+		"empty": {
+			index:         &bucketindex.Index{Blocks: bucketindex.Blocks{}},
+			expectedMetas: map[ulid.ULID]*block.Meta{},
+		},
+		"basic": {
+			index: &bucketindex.Index{
+				Blocks: bucketindex.Blocks{
+					&bucketindex.Block{ID: makeUlid(1), MinTime: 0, MaxTime: twoHoursMS},
+				},
+			},
+			expectedMetas: map[ulid.ULID]*block.Meta{
+				makeUlid(1): makeMeta(makeUlid(1), map[string]string{}),
+			},
+		},
+		"adopt shard ID": {
+			index: &bucketindex.Index{
+				Blocks: bucketindex.Blocks{
+					&bucketindex.Block{ID: makeUlid(1), MinTime: 0, MaxTime: twoHoursMS, CompactorShardID: "78"},
+				},
+			},
+			expectedMetas: map[ulid.ULID]*block.Meta{
+				makeUlid(1): makeMeta(makeUlid(1), map[string]string{tsdb.CompactorShardIDExternalLabel: "78"}),
+			},
+		},
+		"use labeled shard ID": {
+			index: &bucketindex.Index{
+				Blocks: bucketindex.Blocks{
+					&bucketindex.Block{ID: makeUlid(1), MinTime: 0, MaxTime: twoHoursMS,
+						Labels: map[string]string{tsdb.CompactorShardIDExternalLabel: "3"}},
+				},
+			},
+			expectedMetas: map[ulid.ULID]*block.Meta{
+				makeUlid(1): makeMeta(makeUlid(1), map[string]string{tsdb.CompactorShardIDExternalLabel: "3"}),
+			},
+		},
+		"don't overwrite labeled shard ID": {
+			index: &bucketindex.Index{
+				Blocks: bucketindex.Blocks{
+					&bucketindex.Block{ID: makeUlid(1), MinTime: 0, MaxTime: twoHoursMS, CompactorShardID: "78",
+						Labels: map[string]string{tsdb.CompactorShardIDExternalLabel: "3"}},
+				},
+			},
+			expectedMetas: map[ulid.ULID]*block.Meta{
+				makeUlid(1): makeMeta(makeUlid(1), map[string]string{tsdb.CompactorShardIDExternalLabel: "3"}),
+			},
+		},
+		"honor deletion marks": {
+			index: &bucketindex.Index{
+				BlockDeletionMarks: bucketindex.BlockDeletionMarks{
+					&bucketindex.BlockDeletionMark{ID: makeUlid(14)},
+				},
+				Blocks: bucketindex.Blocks{
+					&bucketindex.Block{ID: makeUlid(14), MinTime: 0, MaxTime: twoHoursMS},
+				},
+			},
+			expectedMetas: map[ulid.ULID]*block.Meta{},
+		},
+		"excess deletes": {
+			index: &bucketindex.Index{
+				BlockDeletionMarks: bucketindex.BlockDeletionMarks{
+					&bucketindex.BlockDeletionMark{ID: makeUlid(15)},
+					&bucketindex.BlockDeletionMark{ID: makeUlid(16)},
+				},
+				Blocks: bucketindex.Blocks{
+					&bucketindex.Block{ID: makeUlid(14), MinTime: 0, MaxTime: twoHoursMS},
+				},
+			},
+			expectedMetas: map[ulid.ULID]*block.Meta{
+				makeUlid(14): makeMeta(makeUlid(14), map[string]string{}),
+			},
+		},
+	}
+
+	for name, c := range cases {
+		t.Run(name, func(t *testing.T) {
+			m := ConvertBucketIndexToMetasForCompactionJobPlanning(c.index)
+			require.Equal(t, c.expectedMetas, m)
+		})
+	}
 }
 
 type mockBucketFailure struct {
diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go
index 5fead72d4a5..76b136baa9f 100644
--- a/pkg/compactor/compactor.go
+++ b/pkg/compactor/compactor.go
@@ -60,6 +60,14 @@ var (
 	errInvalidSymbolFlushersConcurrency = fmt.Errorf("invalid symbols-flushers-concurrency value, must be positive")
 	errInvalidMaxBlockUploadValidationConcurrency = fmt.Errorf("invalid max-block-upload-validation-concurrency value, can't be negative")
 	RingOp = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, nil)
+
+	// compactionIgnoredLabels defines the external labels that compactor will
+	// drop/ignore when planning jobs so that they don't keep blocks from
+	// compacting together.
+	compactionIgnoredLabels = []string{
+		mimir_tsdb.DeprecatedIngesterIDExternalLabel,
+		mimir_tsdb.DeprecatedTenantIDExternalLabel,
+	}
 )
 
 // BlocksGrouperFactory builds and returns the grouper to use to compact a tenant's blocks.
@@ -733,14 +741,7 @@ func (c *MultitenantCompactor) compactUser(ctx context.Context, userID string) e
 
 	// List of filters to apply (order matters).
 	fetcherFilters := []block.MetadataFilter{
-		// Remove the ingester ID because we don't shard blocks anymore, while still
-		// honoring the shard ID if sharding was done in the past.
-		// Remove TenantID external label to make sure that we compact blocks with and without the label
-		// together.
-		NewLabelRemoverFilter([]string{
-			mimir_tsdb.DeprecatedTenantIDExternalLabel,
-			mimir_tsdb.DeprecatedIngesterIDExternalLabel,
-		}),
+		NewLabelRemoverFilter(compactionIgnoredLabels),
 		deduplicateBlocksFilter,
 		// removes blocks that should not be compacted due to being marked so.
 		NewNoCompactionMarkFilter(userBucket),
diff --git a/pkg/storage/tsdb/bucketindex/index.go b/pkg/storage/tsdb/bucketindex/index.go
index 16807d39c65..b864f731df1 100644
--- a/pkg/storage/tsdb/bucketindex/index.go
+++ b/pkg/storage/tsdb/bucketindex/index.go
@@ -7,6 +7,7 @@ package bucketindex
 import (
 	"fmt"
+	"maps"
 	"path/filepath"
 	"strings"
 	"time"
 
@@ -97,6 +98,9 @@ type Block struct {
 
 	// Whether the block was from out of order samples
 	OutOfOrder bool `json:"out_of_order,omitempty"`
+
+	// Labels contains the external labels from the block's metadata.
+	Labels map[string]string `json:"labels,omitempty"`
 }
 
 // Within returns whether the block contains samples within the provided range.
@@ -118,6 +122,7 @@ func (m *Block) ThanosMeta() *block.Meta {
 	if m.OutOfOrder {
 		compactionHints = []string{tsdb.CompactionHintFromOutOfOrder}
 	}
+
 	return &block.Meta{
 		BlockMeta: tsdb.BlockMeta{
 			ULID: m.ID,
@@ -133,6 +138,7 @@
 			Version: block.ThanosVersion1,
 			SegmentFiles: m.thanosMetaSegmentFiles(),
 			Source: block.SourceType(m.Source),
+			Labels: maps.Clone(m.Labels),
 		},
 	}
 }
@@ -172,6 +178,7 @@ func BlockFromThanosMeta(meta block.Meta) *Block {
 		Source: string(meta.Thanos.Source),
 		CompactionLevel: meta.Compaction.Level,
 		OutOfOrder: meta.Compaction.FromOutOfOrder(),
+		Labels: maps.Clone(meta.Thanos.Labels),
 	}
 }
 
diff --git a/pkg/storage/tsdb/bucketindex/index_test.go b/pkg/storage/tsdb/bucketindex/index_test.go
index 7d23ce93a29..82883a60f74 100644
--- a/pkg/storage/tsdb/bucketindex/index_test.go
+++ b/pkg/storage/tsdb/bucketindex/index_test.go
@@ -241,6 +241,10 @@ func TestBlockFromThanosMeta(t *testing.T) {
 				ID: blockID,
 				MinTime: 10,
 				MaxTime: 20,
+				Labels: map[string]string{
+					"a": "b",
+					"c": "d",
+				},
 			},
 		},
 		"meta.json with external labels, with compactor shard ID": {
@@ -263,6 +267,11 @@
 				MinTime: 10,
 				MaxTime: 20,
 				CompactorShardID: "10_of_20",
+				Labels: map[string]string{
+					"a": "b",
+					"c": "d",
+					mimir_tsdb.CompactorShardIDExternalLabel: "10_of_20",
+				},
 			},
 		},
 		"meta.json with external labels, with invalid shard ID": {
@@ -285,6 +294,11 @@
 				MinTime: 10,
 				MaxTime: 20,
 				CompactorShardID: "some weird value",
+				Labels: map[string]string{
+					"a": "b",
+					"c": "d",
+					mimir_tsdb.CompactorShardIDExternalLabel: "some weird value",
+				},
 			},
 		},
 	}
@@ -401,6 +415,28 @@ func TestBlock_ThanosMeta(t *testing.T) {
 				},
 			},
 		},
+		"block with labels": {
+			block: Block{
+				ID: blockID,
+				MinTime: 10,
+				MaxTime: 20,
+				SegmentsFormat: SegmentsFormatUnknown,
+				SegmentsNum: 0,
+				Labels: map[string]string{"my_key": "0x8413"},
+			},
+			expected: &block.Meta{
+				BlockMeta: tsdb.BlockMeta{
+					ULID: blockID,
+					MinTime: 10,
+					MaxTime: 20,
+					Version: block.TSDBVersion1,
+				},
+				Thanos: block.ThanosMeta{
+					Version: block.ThanosVersion1,
+					Labels: map[string]string{"my_key": "0x8413"},
+				},
+			},
+		},
 	}
 
 	for testName, testData := range tests {
diff --git a/pkg/storage/tsdb/bucketindex/updater_test.go b/pkg/storage/tsdb/bucketindex/updater_test.go
index 44a88e83af7..b1bfa39b5cb 100644
--- a/pkg/storage/tsdb/bucketindex/updater_test.go
+++ b/pkg/storage/tsdb/bucketindex/updater_test.go
@@ -8,6 +8,7 @@ import (
 	"bytes"
 	"context"
+	"maps"
 	"path"
 	"testing"
 	"time"
 
@@ -188,11 +189,14 @@ func TestUpdater_UpdateIndexFromVersion1ToVersion2(t *testing.T) {
 	block1 := block.MockStorageBlockWithExtLabels(t, bkt, userID, 10, 20, map[string]string{mimir_tsdb.CompactorShardIDExternalLabel: "1_of_4"})
 	block2 := block.MockStorageBlockWithExtLabels(t, bkt, userID, 20, 30, map[string]string{mimir_tsdb.CompactorShardIDExternalLabel: "3_of_4"})
 
+	// Make copies of blocks without the compactor shard ID label.
 	block1WithoutCompactorShardID := block1
-	block1WithoutCompactorShardID.Thanos.Labels = nil
+	block1WithoutCompactorShardID.Thanos.Labels = maps.Clone(block1.Thanos.Labels)
+	delete(block1WithoutCompactorShardID.Thanos.Labels, mimir_tsdb.CompactorShardIDExternalLabel)
 
 	block2WithoutCompactorShardID := block2
-	block2WithoutCompactorShardID.Thanos.Labels = nil
+	block2WithoutCompactorShardID.Thanos.Labels = maps.Clone(block2.Thanos.Labels)
+	delete(block2WithoutCompactorShardID.Thanos.Labels, mimir_tsdb.CompactorShardIDExternalLabel)
 
 	// Double check that original block1 and block2 still have compactor shards set.
 	require.Equal(t, "1_of_4", block1.Thanos.Labels[mimir_tsdb.CompactorShardIDExternalLabel])
@@ -209,6 +213,7 @@
 	// Now remove Compactor Shard ID from index.
 	for _, b := range returnedIdx.Blocks {
 		b.CompactorShardID = ""
+		delete(b.Labels, mimir_tsdb.CompactorShardIDExternalLabel)
 	}
 
 	// Try to update existing index. Since we didn't change the version, updater will reuse the index, and not update CompactorShardID field.
@@ -253,6 +258,7 @@
 			Source: "test",
 			CompactionLevel: 1,
 			OutOfOrder: false,
+			Labels: b.Thanos.Labels,
 		})
 	}
 
diff --git a/tools/compaction-planner/main.go b/tools/compaction-planner/main.go
index 0e315f40346..becc240869e 100644
--- a/tools/compaction-planner/main.go
+++ b/tools/compaction-planner/main.go
@@ -16,7 +16,6 @@ import (
 
 	gokitlog "github.com/go-kit/log"
 	"github.com/grafana/dskit/flagext"
-	"github.com/oklog/ulid"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/prometheus/model/timestamp"
 
@@ -76,24 +75,7 @@ func main() {
 	log.Println("Using index from", time.Unix(idx.UpdatedAt, 0).UTC().Format(time.RFC3339))
 
-	// convert index to metas.
-	deleted := map[ulid.ULID]bool{}
-	for _, id := range idx.BlockDeletionMarks.GetULIDs() {
-		deleted[id] = true
-	}
-
-	metas := map[ulid.ULID]*block.Meta{}
-	for _, b := range idx.Blocks {
-		if deleted[b.ID] {
-			continue
-		}
-		metas[b.ID] = b.ThanosMeta()
-		if metas[b.ID].Thanos.Labels == nil {
-			metas[b.ID].Thanos.Labels = map[string]string{}
-		}
-		metas[b.ID].Thanos.Labels[mimir_tsdb.CompactorShardIDExternalLabel] = b.CompactorShardID // Needed for correct planning.
-	}
-
+	metas := compactor.ConvertBucketIndexToMetasForCompactionJobPlanning(idx)
 	synced := extprom.NewTxGaugeVec(nil, prometheus.GaugeOpts{Name: "synced", Help: "Number of block metadata synced"},
 		[]string{"state"},
 		[]string{block.MarkedForNoCompactionMeta},
 	)