Skip to content

Commit

Permalink
Improve compactor job estimation by tracking external labels in bucke…
Browse files Browse the repository at this point in the history
…t index (#7745)

* reduce scope leakage of temp err

* Replace bool value with empty struct for map used just for membership checks.

* Add Labels as bucketindex Block field, copy it over when updating index from blocks.

* Copy bucket index block labels into block meta labels so estimates can match compaction.

* Update bucket index tests to reflect that bucket index now includes block labels.

* Update Block.Labels doc.

* Special insertion of __compactor_shard_id__ label no longer necessary as it exists in block labels.

* Add changelog entry.

* Continue to restore shard ID label from block.CompactorShardID. Make compaction-planner and estimator agree.

* Recover shard ID into labels whenever possible. Remove some code duplication.

* Clarify scope of change.

* Tests to cover ConvertBucketIndexToMetasForCompactionJobPlanning.

* Don't let BlockFromThanosMeta, block.ThanosMeta return nil label maps.

* Undo trying to guarantee non-nil maps, as json.Decoder doesn't do that in ReadIndex.

* I forgot to put the nil check back.

* Link to PR, not issue.

* Make estimator match compactor w/r/t ignoring deprecated labels.

* tabularize tests.

* test case names -> map keys

* Remove info from doc comment that is redundant with docs on those constants.
  • Loading branch information
seizethedave authored Apr 3, 2024
1 parent fee60b8 commit 9215d7a
Show file tree
Hide file tree
Showing 8 changed files with 260 additions and 61 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
* [BUGFIX] Ingester: don't retain blocks if they finish exactly on the boundary of the retention window. #7656
* [BUGFIX] Bug-fixes and improvements to experimental native histograms. #7744
* [BUGFIX] Querier: return an error when a query uses `label_join` with an invalid destination label name. #7744
* [BUGFIX] Compactor: correct outstanding job estimation in metrics and `compaction-planner` tool when block labels differ. #7745

### Mixin

Expand Down
24 changes: 16 additions & 8 deletions pkg/compactor/blocks_cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -683,18 +683,18 @@ func stalePartialBlockLastModifiedTime(ctx context.Context, blockID ulid.ULID, u
}

func estimateCompactionJobsFromBucketIndex(ctx context.Context, userID string, userBucket objstore.InstrumentedBucket, idx *bucketindex.Index, compactionBlockRanges mimir_tsdb.DurationList, mergeShards int, splitGroups int) ([]*Job, error) {
metas := convertBucketIndexToMetasForCompactionJobPlanning(idx)
metas := ConvertBucketIndexToMetasForCompactionJobPlanning(idx)

// We need to pass this metric to MetadataFilters, but we don't need to report this value from BlocksCleaner.
synced := newNoopGaugeVec()

for _, f := range []block.MetadataFilter{
NewLabelRemoverFilter(compactionIgnoredLabels),
// We don't include ShardAwareDeduplicateFilter, because it relies on list of compaction sources, which are not present in the BucketIndex.
// We do include NoCompactionMarkFilter to avoid computing jobs from blocks that are marked for no-compaction.
NewNoCompactionMarkFilter(userBucket),
} {
err := f.Filter(ctx, metas, synced)
if err != nil {
if err := f.Filter(ctx, metas, synced); err != nil {
return nil, err
}
}
Expand All @@ -705,23 +705,31 @@ func estimateCompactionJobsFromBucketIndex(ctx context.Context, userID string, u
}

// Convert index into map of block Metas, but ignore blocks marked for deletion.
func convertBucketIndexToMetasForCompactionJobPlanning(idx *bucketindex.Index) map[ulid.ULID]*block.Meta {
func ConvertBucketIndexToMetasForCompactionJobPlanning(idx *bucketindex.Index) map[ulid.ULID]*block.Meta {
deletedULIDs := idx.BlockDeletionMarks.GetULIDs()
deleted := make(map[ulid.ULID]bool, len(deletedULIDs))
deleted := make(map[ulid.ULID]struct{}, len(deletedULIDs))
for _, id := range deletedULIDs {
deleted[id] = true
deleted[id] = struct{}{}
}

metas := map[ulid.ULID]*block.Meta{}
for _, b := range idx.Blocks {
if deleted[b.ID] {
if _, del := deleted[b.ID]; del {
continue
}
metas[b.ID] = b.ThanosMeta()
if metas[b.ID].Thanos.Labels == nil {
metas[b.ID].Thanos.Labels = map[string]string{}
}
metas[b.ID].Thanos.Labels[mimir_tsdb.CompactorShardIDExternalLabel] = b.CompactorShardID // Needed for correct planning.

// Correct planning depends on external labels being present. We didn't
// always persist labels into the bucket index, but we may have tracked
// the shard ID label, so copy that back over if it isn't there.
if b.CompactorShardID != "" {
if _, found := metas[b.ID].Thanos.Labels[mimir_tsdb.CompactorShardIDExternalLabel]; !found {
metas[b.ID].Thanos.Labels[mimir_tsdb.CompactorShardIDExternalLabel] = b.CompactorShardID
}
}
}
return metas
}
Expand Down
206 changes: 182 additions & 24 deletions pkg/compactor/blocks_cleaner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/testutil"
prom_tsdb "github.com/prometheus/prometheus/tsdb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/thanos-io/objstore"
Expand Down Expand Up @@ -1044,41 +1045,198 @@ func TestComputeCompactionJobs(t *testing.T) {
}

const user = "test"

twoHoursMS := 2 * time.Hour.Milliseconds()
dayMS := 24 * time.Hour.Milliseconds()

userBucket := bucket.NewUserBucketClient(user, bucketClient, nil)

// Mark block for no-compaction.
blockMarkedForNoCompact := ulid.MustNew(ulid.Now(), rand.Reader)
require.NoError(t, block.MarkForNoCompact(context.Background(), log.NewNopLogger(), userBucket, blockMarkedForNoCompact, block.CriticalNoCompactReason, "testing", promauto.With(nil).NewCounter(prometheus.CounterOpts{})))

index := bucketindex.Index{}
index.Blocks = bucketindex.Blocks{
// Some 2h blocks that should be compacted together and split.
&bucketindex.Block{ID: ulid.MustNew(ulid.Now(), rand.Reader), MinTime: 0, MaxTime: twoHoursMS},
&bucketindex.Block{ID: ulid.MustNew(ulid.Now(), rand.Reader), MinTime: 0, MaxTime: twoHoursMS},
&bucketindex.Block{ID: ulid.MustNew(ulid.Now(), rand.Reader), MinTime: 0, MaxTime: twoHoursMS},
cases := map[string]struct {
blocks bucketindex.Blocks
expectedSplits int
expectedMerges int
}{
"standard": {
blocks: bucketindex.Blocks{
// Some 2h blocks that should be compacted together and split.
&bucketindex.Block{ID: ulid.MustNew(ulid.Now(), rand.Reader), MinTime: 0, MaxTime: twoHoursMS},
&bucketindex.Block{ID: ulid.MustNew(ulid.Now(), rand.Reader), MinTime: 0, MaxTime: twoHoursMS},
&bucketindex.Block{ID: ulid.MustNew(ulid.Now(), rand.Reader), MinTime: 0, MaxTime: twoHoursMS},

// Some merge jobs.
&bucketindex.Block{ID: ulid.MustNew(ulid.Now(), rand.Reader), MinTime: dayMS, MaxTime: 2 * dayMS, CompactorShardID: "1_of_3"},
&bucketindex.Block{ID: ulid.MustNew(ulid.Now(), rand.Reader), MinTime: dayMS, MaxTime: 2 * dayMS, CompactorShardID: "1_of_3"},

&bucketindex.Block{ID: ulid.MustNew(ulid.Now(), rand.Reader), MinTime: dayMS, MaxTime: 2 * dayMS, CompactorShardID: "2_of_3"},
&bucketindex.Block{ID: ulid.MustNew(ulid.Now(), rand.Reader), MinTime: dayMS, MaxTime: 2 * dayMS, CompactorShardID: "2_of_3"},

// This merge job is skipped, as block is marked for no-compaction.
&bucketindex.Block{ID: ulid.MustNew(ulid.Now(), rand.Reader), MinTime: dayMS, MaxTime: 2 * dayMS, CompactorShardID: "3_of_3"},
&bucketindex.Block{ID: blockMarkedForNoCompact, MinTime: dayMS, MaxTime: 2 * dayMS, CompactorShardID: "3_of_3"},
},
expectedSplits: 1,
expectedMerges: 2,
},
"labels don't match": {
blocks: bucketindex.Blocks{
// Compactor wouldn't produce a job for this pair as their external labels differ:
&bucketindex.Block{ID: ulid.MustNew(ulid.Now(), rand.Reader), MinTime: 5 * dayMS, MaxTime: 6 * dayMS,
Labels: map[string]string{
tsdb.OutOfOrderExternalLabel: tsdb.OutOfOrderExternalLabelValue,
},
},
&bucketindex.Block{ID: ulid.MustNew(ulid.Now(), rand.Reader), MinTime: 5 * dayMS, MaxTime: 6 * dayMS,
Labels: map[string]string{
"another_label": "-1",
},
},
},
expectedSplits: 0,
expectedMerges: 0,
},
"ignore deprecated labels": {
blocks: bucketindex.Blocks{
// Compactor will ignore deprecated labels when computing jobs. Estimation should do the same.
&bucketindex.Block{ID: ulid.MustNew(ulid.Now(), rand.Reader), MinTime: 5 * dayMS, MaxTime: 6 * dayMS,
Labels: map[string]string{
"honored_label": "12345",
tsdb.DeprecatedTenantIDExternalLabel: "tenant1",
tsdb.DeprecatedIngesterIDExternalLabel: "ingester1",
},
},
&bucketindex.Block{ID: ulid.MustNew(ulid.Now(), rand.Reader), MinTime: 5 * dayMS, MaxTime: 6 * dayMS,
Labels: map[string]string{
"honored_label": "12345",
tsdb.DeprecatedTenantIDExternalLabel: "tenant2",
tsdb.DeprecatedIngesterIDExternalLabel: "ingester2",
},
},
},
expectedSplits: 0,
expectedMerges: 1,
},
}

// Some merge jobs.
&bucketindex.Block{ID: ulid.MustNew(ulid.Now(), rand.Reader), MinTime: dayMS, MaxTime: 2 * dayMS, CompactorShardID: "1_of_3"},
&bucketindex.Block{ID: ulid.MustNew(ulid.Now(), rand.Reader), MinTime: dayMS, MaxTime: 2 * dayMS, CompactorShardID: "1_of_3"},
for name, c := range cases {
t.Run(name, func(t *testing.T) {
index := &bucketindex.Index{Blocks: c.blocks}
jobs, err := estimateCompactionJobsFromBucketIndex(context.Background(), user, userBucket, index, cfg.CompactionBlockRanges, 3, 0)
require.NoError(t, err)
split, merge := computeSplitAndMergeJobs(jobs)
require.Equal(t, c.expectedSplits, split)
require.Equal(t, c.expectedMerges, merge)
})
}
}

&bucketindex.Block{ID: ulid.MustNew(ulid.Now(), rand.Reader), MinTime: dayMS, MaxTime: 2 * dayMS, CompactorShardID: "2_of_3"},
&bucketindex.Block{ID: ulid.MustNew(ulid.Now(), rand.Reader), MinTime: dayMS, MaxTime: 2 * dayMS, CompactorShardID: "2_of_3"},
func TestConvertBucketIndexToMetasForCompactionJobPlanning(t *testing.T) {
twoHoursMS := 2 * time.Hour.Milliseconds()

// This merge job is skipped, as block is marked for no-compaction.
&bucketindex.Block{ID: ulid.MustNew(ulid.Now(), rand.Reader), MinTime: dayMS, MaxTime: 2 * dayMS, CompactorShardID: "3_of_3"},
&bucketindex.Block{ID: blockMarkedForNoCompact, MinTime: dayMS, MaxTime: 2 * dayMS, CompactorShardID: "3_of_3"},
makeUlid := func(n byte) ulid.ULID {
return ulid.ULID{n}
}

userBucket := bucket.NewUserBucketClient(user, bucketClient, nil)
// Mark block for no-compaction.
require.NoError(t, block.MarkForNoCompact(context.Background(), log.NewNopLogger(), userBucket, blockMarkedForNoCompact, block.CriticalNoCompactReason, "testing", promauto.With(nil).NewCounter(prometheus.CounterOpts{})))
makeMeta := func(id ulid.ULID, labels map[string]string) *block.Meta {
return &block.Meta{
BlockMeta: prom_tsdb.BlockMeta{
ULID: id,
MinTime: 0,
MaxTime: twoHoursMS,
Version: block.TSDBVersion1,
},
Thanos: block.ThanosMeta{
Version: block.ThanosVersion1,
Labels: labels,
},
}
}

// No grouping of jobs for split-compaction. All jobs will be in single split compaction.
jobs, err := estimateCompactionJobsFromBucketIndex(context.Background(), user, userBucket, &index, cfg.CompactionBlockRanges, 3, 0)
require.NoError(t, err)
split, merge := computeSplitAndMergeJobs(jobs)
require.Equal(t, 1, split)
require.Equal(t, 2, merge)
cases := map[string]struct {
index *bucketindex.Index
expectedMetas map[ulid.ULID]*block.Meta
}{
"empty": {
index: &bucketindex.Index{Blocks: bucketindex.Blocks{}},
expectedMetas: map[ulid.ULID]*block.Meta{},
},
"basic": {
index: &bucketindex.Index{
Blocks: bucketindex.Blocks{
&bucketindex.Block{ID: makeUlid(1), MinTime: 0, MaxTime: twoHoursMS},
},
},
expectedMetas: map[ulid.ULID]*block.Meta{
makeUlid(1): makeMeta(makeUlid(1), map[string]string{}),
},
},
"adopt shard ID": {
index: &bucketindex.Index{
Blocks: bucketindex.Blocks{
&bucketindex.Block{ID: makeUlid(1), MinTime: 0, MaxTime: twoHoursMS, CompactorShardID: "78"},
},
},
expectedMetas: map[ulid.ULID]*block.Meta{
makeUlid(1): makeMeta(makeUlid(1), map[string]string{tsdb.CompactorShardIDExternalLabel: "78"}),
},
},
"use labeled shard ID": {
index: &bucketindex.Index{
Blocks: bucketindex.Blocks{
&bucketindex.Block{ID: makeUlid(1), MinTime: 0, MaxTime: twoHoursMS,
Labels: map[string]string{tsdb.CompactorShardIDExternalLabel: "3"}},
},
},
expectedMetas: map[ulid.ULID]*block.Meta{
makeUlid(1): makeMeta(makeUlid(1), map[string]string{tsdb.CompactorShardIDExternalLabel: "3"}),
},
},
"don't overwrite labeled shard ID": {
index: &bucketindex.Index{
Blocks: bucketindex.Blocks{
&bucketindex.Block{ID: makeUlid(1), MinTime: 0, MaxTime: twoHoursMS, CompactorShardID: "78",
Labels: map[string]string{tsdb.CompactorShardIDExternalLabel: "3"}},
},
},
expectedMetas: map[ulid.ULID]*block.Meta{
makeUlid(1): makeMeta(makeUlid(1), map[string]string{tsdb.CompactorShardIDExternalLabel: "3"}),
},
},
"honor deletion marks": {
index: &bucketindex.Index{
BlockDeletionMarks: bucketindex.BlockDeletionMarks{
&bucketindex.BlockDeletionMark{ID: makeUlid(14)},
},
Blocks: bucketindex.Blocks{
&bucketindex.Block{ID: makeUlid(14), MinTime: 0, MaxTime: twoHoursMS},
},
},
expectedMetas: map[ulid.ULID]*block.Meta{},
},
"excess deletes": {
index: &bucketindex.Index{
BlockDeletionMarks: bucketindex.BlockDeletionMarks{
&bucketindex.BlockDeletionMark{ID: makeUlid(15)},
&bucketindex.BlockDeletionMark{ID: makeUlid(16)},
},
Blocks: bucketindex.Blocks{
&bucketindex.Block{ID: makeUlid(14), MinTime: 0, MaxTime: twoHoursMS},
},
},
expectedMetas: map[ulid.ULID]*block.Meta{
makeUlid(14): makeMeta(makeUlid(14), map[string]string{}),
},
},
}

for name, c := range cases {
t.Run(name, func(t *testing.T) {
m := ConvertBucketIndexToMetasForCompactionJobPlanning(c.index)
require.Equal(t, c.expectedMetas, m)
})
}
}

type mockBucketFailure struct {
Expand Down
17 changes: 9 additions & 8 deletions pkg/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,14 @@ var (
errInvalidSymbolFlushersConcurrency = fmt.Errorf("invalid symbols-flushers-concurrency value, must be positive")
errInvalidMaxBlockUploadValidationConcurrency = fmt.Errorf("invalid max-block-upload-validation-concurrency value, can't be negative")
RingOp = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, nil)

// compactionIgnoredLabels defines the external labels that compactor will
// drop/ignore when planning jobs so that they don't keep blocks from
// compacting together.
compactionIgnoredLabels = []string{
mimir_tsdb.DeprecatedIngesterIDExternalLabel,
mimir_tsdb.DeprecatedTenantIDExternalLabel,
}
)

// BlocksGrouperFactory builds and returns the grouper to use to compact a tenant's blocks.
Expand Down Expand Up @@ -733,14 +741,7 @@ func (c *MultitenantCompactor) compactUser(ctx context.Context, userID string) e

// List of filters to apply (order matters).
fetcherFilters := []block.MetadataFilter{
// Remove the ingester ID because we don't shard blocks anymore, while still
// honoring the shard ID if sharding was done in the past.
// Remove TenantID external label to make sure that we compact blocks with and without the label
// together.
NewLabelRemoverFilter([]string{
mimir_tsdb.DeprecatedTenantIDExternalLabel,
mimir_tsdb.DeprecatedIngesterIDExternalLabel,
}),
NewLabelRemoverFilter(compactionIgnoredLabels),
deduplicateBlocksFilter,
// removes blocks that should not be compacted due to being marked so.
NewNoCompactionMarkFilter(userBucket),
Expand Down
7 changes: 7 additions & 0 deletions pkg/storage/tsdb/bucketindex/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package bucketindex

import (
"fmt"
"maps"
"path/filepath"
"strings"
"time"
Expand Down Expand Up @@ -97,6 +98,9 @@ type Block struct {

// Whether the block was from out of order samples
OutOfOrder bool `json:"out_of_order,omitempty"`

// Labels contains the external labels from the block's metadata.
Labels map[string]string `json:"labels,omitempty"`
}

// Within returns whether the block contains samples within the provided range.
Expand All @@ -118,6 +122,7 @@ func (m *Block) ThanosMeta() *block.Meta {
if m.OutOfOrder {
compactionHints = []string{tsdb.CompactionHintFromOutOfOrder}
}

return &block.Meta{
BlockMeta: tsdb.BlockMeta{
ULID: m.ID,
Expand All @@ -133,6 +138,7 @@ func (m *Block) ThanosMeta() *block.Meta {
Version: block.ThanosVersion1,
SegmentFiles: m.thanosMetaSegmentFiles(),
Source: block.SourceType(m.Source),
Labels: maps.Clone(m.Labels),
},
}
}
Expand Down Expand Up @@ -172,6 +178,7 @@ func BlockFromThanosMeta(meta block.Meta) *Block {
Source: string(meta.Thanos.Source),
CompactionLevel: meta.Compaction.Level,
OutOfOrder: meta.Compaction.FromOutOfOrder(),
Labels: maps.Clone(meta.Thanos.Labels),
}
}

Expand Down
Loading

0 comments on commit 9215d7a

Please sign in to comment.