From 1f70986e6fe0e7aa6625e8b2792b205cbb593eee Mon Sep 17 00:00:00 2001 From: Yi Jin Date: Sat, 23 Mar 2024 23:50:26 -0700 Subject: [PATCH 1/4] [PLAT-104290] mark no compaction if blocks are partially overlapping Signed-off-by: Yi Jin --- pkg/compact/overlapping.go | 40 +++++++++++++++++++++++---------- pkg/compact/overlapping_test.go | 40 +++++++++++++-------------------- 2 files changed, 43 insertions(+), 37 deletions(-) diff --git a/pkg/compact/overlapping.go b/pkg/compact/overlapping.go index adc363ba63e..46af74d38a0 100644 --- a/pkg/compact/overlapping.go +++ b/pkg/compact/overlapping.go @@ -19,8 +19,12 @@ import ( "github.com/thanos-io/thanos/pkg/block/metadata" ) +const overlappingNoCompactionReason = "blocks-overlapping" + type OverlappingCompactionLifecycleCallback struct { - overlappingBlocks prometheus.Counter + overlappingBlocks prometheus.Counter + noCompactionBlocks prometheus.Counter + deletedBlocks prometheus.Counter } func NewOverlappingCompactionLifecycleCallback(reg *prometheus.Registry, enabled bool) CompactionLifecycleCallback { @@ -28,7 +32,15 @@ func NewOverlappingCompactionLifecycleCallback(reg *prometheus.Registry, enabled return OverlappingCompactionLifecycleCallback{ overlappingBlocks: promauto.With(reg).NewCounter(prometheus.CounterOpts{ Name: "thanos_compact_group_overlapping_blocks_total", - Help: "Total number of blocks that are overlapping and to be deleted.", + Help: "Total number of blocks that are overlapping.", + }), + noCompactionBlocks: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "thanos_compact_group_overlapping_blocks_no_compaction_total", + Help: "Total number of blocks that are overlapping and mark no compaction.", + }), + deletedBlocks: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "thanos_compact_group_overlapping_blocks_deleted_total", + Help: "Total number of blocks that are overlapping and deleted.", }), } } @@ -51,15 +63,20 @@ func (o OverlappingCompactionLifecycleCallback) PreCompactionCallback(ctx contex } else if currB.MinTime < prevB.MinTime { // halt when the assumption is broken, the input toCompact isn't sorted by minTime, need manual investigation return halt(errors.Errorf("later blocks has smaller minTime than previous block: %s -- %s", prevB.String(), currB.String())) - } else if prevB.MaxTime < currB.MaxTime && prevB.MinTime != currB.MinTime { - err := errors.Errorf("found partially overlapping block: %s -- %s", prevB.String(), currB.String()) - if cg.enableVerticalCompaction { - level.Error(logger).Log("msg", "best effort to vertical compact", "err", err) - prev = curr - continue - } else { - return halt(err) + } + // prev min <= curr min < prev max + o.overlappingBlocks.Inc() + if prevB.MaxTime < currB.MaxTime && prevB.MinTime != currB.MinTime { + reason := fmt.Errorf("found partially overlapping block: %s -- %s", prevB.String(), currB.String()) + if err := block.MarkForNoCompact(ctx, logger, cg.bkt, prevB.ULID, overlappingNoCompactionReason, + reason.Error(), o.noCompactionBlocks); err != nil { + return retry(err) + } + if err := block.MarkForNoCompact(ctx, logger, cg.bkt, currB.ULID, overlappingNoCompactionReason, + reason.Error(), o.noCompactionBlocks); err != nil { + return retry(err) } + return retry(reason) } else if prevB.MinTime == currB.MinTime && prevB.MaxTime == currB.MaxTime { if prevB.Stats.NumSeries != currB.Stats.NumSeries || prevB.Stats.NumSamples != currB.Stats.NumSamples { level.Warn(logger).Log("msg", "found same time range but different stats, keep both blocks", @@ -70,7 +87,6 @@ func (o OverlappingCompactionLifecycleCallback) PreCompactionCallback(ctx contex continue } } - // prev min <= curr min < prev max toDelete := -1 if prevB.MaxTime >= currB.MaxTime { toDelete = curr @@ -82,7 +98,7 @@ func (o OverlappingCompactionLifecycleCallback) PreCompactionCallback(ctx contex level.Warn(logger).Log("msg", "found overlapping block in plan, keep current block", "toKeep", currB.String(), "toDelete", prevB.String()) } - o.overlappingBlocks.Inc() + o.deletedBlocks.Inc() if err := DeleteBlockNow(ctx, logger, cg.bkt, toCompact[toDelete]); err != nil { return retry(err) } diff --git a/pkg/compact/overlapping_test.go b/pkg/compact/overlapping_test.go index ab6d1051bb3..b1146199044 100644 --- a/pkg/compact/overlapping_test.go +++ b/pkg/compact/overlapping_test.go @@ -41,12 +41,11 @@ func TestPreCompactionCallback(t *testing.T) { } callback := NewOverlappingCompactionLifecycleCallback(reg, true) for _, tcase := range []struct { - testName string - input []*metadata.Meta - enableVerticalCompaction bool - expectedSize int - expectedBlocks []*metadata.Meta - err error + testName string + input []*metadata.Meta + expectedSize int + expectedBlocks []*metadata.Meta + err error }{ { testName: "empty blocks", @@ -139,38 +138,29 @@ func TestPreCompactionCallback(t *testing.T) { createCustomBlockMeta(7, 0, 5, metadata.CompactorSource, 1), createCustomBlockMeta(8, 5, 8, metadata.CompactorSource, 1), }, - err: halt(errors.Errorf("expect halt error")), + err: halt(errors.Errorf("later blocks has smaller minTime than previous block: %s -- %s", + createCustomBlockMeta(6, 2, 3, metadata.CompactorSource, 1).String(), + createCustomBlockMeta(7, 0, 5, metadata.CompactorSource, 1).String(), + )), }, { - testName: "partially overlapping blocks with vertical compaction off", + testName: "partially overlapping blocks", input: []*metadata.Meta{ createCustomBlockMeta(6, 2, 4, metadata.CompactorSource, 1), createCustomBlockMeta(7, 3, 6, metadata.CompactorSource, 1), createCustomBlockMeta(8, 5, 8, metadata.CompactorSource, 1), }, - err: halt(errors.Errorf("expect halt error")), - }, - { - testName: "partially overlapping blocks with vertical compaction on", - input: []*metadata.Meta{ - createCustomBlockMeta(6, 2, 4, metadata.CompactorSource, 1), - createCustomBlockMeta(7, 3, 6, metadata.CompactorSource, 1), - createCustomBlockMeta(8, 5, 8, metadata.CompactorSource, 1), - }, - enableVerticalCompaction: true, - expectedSize: 3, + err: retry(errors.Errorf("found partially overlapping block: %s -- %s", + createCustomBlockMeta(6, 2, 4, metadata.CompactorSource, 1).String(), + createCustomBlockMeta(7, 3, 6, metadata.CompactorSource, 1).String(), + )), }, } { if ok := t.Run(tcase.testName, func(t *testing.T) { - group.enableVerticalCompaction = tcase.enableVerticalCompaction err := callback.PreCompactionCallback(context.Background(), logger, group, tcase.input) if tcase.err != nil { testutil.NotOk(t, err) - if IsHaltError(tcase.err) { - testutil.Assert(t, IsHaltError(err), "expected halt error") - } else if IsRetryError(tcase.err) { - testutil.Assert(t, IsRetryError(err), "expected retry error") - } + testutil.Equals(t, tcase.err.Error(), err.Error()) return } testutil.Equals(t, tcase.expectedSize, len(FilterRemovedBlocks(tcase.input))) From 24fde69c257e12568dfc349c0e27e0715473f362 Mon Sep 17 00:00:00 2001 From: Yi Jin Date: Mon, 25 Mar 2024 18:29:56 -0700 Subject: [PATCH 2/4] [PLAT-104290] mark no compaction or no downsampling instead of deletion Signed-off-by: Yi Jin --- pkg/compact/compact.go | 1 - .../downsample/streamed_block_writer.go | 2 +- pkg/compact/overlapping.go | 73 +++++-------- pkg/compact/overlapping_test.go | 102 +++++++----------- 4 files changed, 66 insertions(+), 112 deletions(-) diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index 52133ec2eac..3fe1b5eeaf7 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -1115,7 +1115,6 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp if err := compactionLifecycleCallback.PreCompactionCallback(ctx, cg.logger, cg, toCompact); err != nil { return false, ulid.ULID{}, errors.Wrapf(err, "failed to run pre compaction callback for plan: %s", fmt.Sprintf("%v", toCompact)) } - toCompact = FilterRemovedBlocks(toCompact) level.Info(cg.logger).Log("msg", "finished running pre compaction callback; downloading blocks", "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds(), "plan", fmt.Sprintf("%v", toCompact)) begin = time.Now() diff --git a/pkg/compact/downsample/streamed_block_writer.go b/pkg/compact/downsample/streamed_block_writer.go index 1db0496376d..6bb9f1445a0 100644 --- a/pkg/compact/downsample/streamed_block_writer.go +++ b/pkg/compact/downsample/streamed_block_writer.go @@ -113,7 +113,7 @@ func (w *streamedBlockWriter) WriteSeries(lset labels.Labels, chunks []chunks.Me } if len(chunks) == 0 { - level.Warn(w.logger).Log("msg", "empty chunks happened, skip series", "series", strings.ReplaceAll(lset.String(), "\"", "'")) + level.Debug(w.logger).Log("msg", "empty chunks happened, skip series", "series", strings.ReplaceAll(lset.String(), "\"", "'")) return nil } diff --git a/pkg/compact/overlapping.go b/pkg/compact/overlapping.go index 46af74d38a0..8db34ca3985 100644 --- a/pkg/compact/overlapping.go +++ b/pkg/compact/overlapping.go @@ -14,17 +14,16 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/prometheus/tsdb" - "github.com/thanos-io/objstore" "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" ) -const overlappingNoCompactionReason = "blocks-overlapping" +const overlappingReason = "blocks-overlapping" type OverlappingCompactionLifecycleCallback struct { - overlappingBlocks prometheus.Counter - noCompactionBlocks prometheus.Counter - deletedBlocks prometheus.Counter + overlappingBlocks prometheus.Counter + noCompaction prometheus.Counter + noDownsampling prometheus.Counter } func NewOverlappingCompactionLifecycleCallback(reg *prometheus.Registry, enabled bool) CompactionLifecycleCallback { @@ -34,13 +33,13 @@ func NewOverlappingCompactionLifecycleCallback(reg *prometheus.Registry, enabled Name: "thanos_compact_group_overlapping_blocks_total", Help: "Total number of blocks that are overlapping.", }), - noCompactionBlocks: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + noCompaction: promauto.With(reg).NewCounter(prometheus.CounterOpts{ Name: "thanos_compact_group_overlapping_blocks_no_compaction_total", Help: "Total number of blocks that are overlapping and mark no compaction.", }), - deletedBlocks: promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "thanos_compact_group_overlapping_blocks_deleted_total", - Help: "Total number of blocks that are overlapping and deleted.", + noDownsampling: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "thanos_compact_group_overlapping_blocks_no_downsampling_total", + Help: "Total number of blocks that are overlapping and mark no downsampling.", }), } } @@ -54,6 +53,7 @@ func (o OverlappingCompactionLifecycleCallback) PreCompactionCallback(ctx contex return nil } prev := 0 + var reason error for curr, currB := range toCompact { prevB := toCompact[prev] if curr == 0 || currB.Thanos.Source == metadata.ReceiveSource || prevB.MaxTime <= currB.MinTime { @@ -67,13 +67,14 @@ func (o OverlappingCompactionLifecycleCallback) PreCompactionCallback(ctx contex // prev min <= curr min < prev max o.overlappingBlocks.Inc() if prevB.MaxTime < currB.MaxTime && prevB.MinTime != currB.MinTime { - reason := fmt.Errorf("found partially overlapping block: %s -- %s", prevB.String(), currB.String()) - if err := block.MarkForNoCompact(ctx, logger, cg.bkt, prevB.ULID, overlappingNoCompactionReason, - reason.Error(), o.noCompactionBlocks); err != nil { + o.noCompaction.Inc() + reason = fmt.Errorf("found partially overlapping block: %s -- %s", prevB.String(), currB.String()) + if err := block.MarkForNoCompact(ctx, logger, cg.bkt, prevB.ULID, overlappingReason, + reason.Error(), o.noCompaction); err != nil { return retry(err) } - if err := block.MarkForNoCompact(ctx, logger, cg.bkt, currB.ULID, overlappingNoCompactionReason, - reason.Error(), o.noCompactionBlocks); err != nil { + if err := block.MarkForNoCompact(ctx, logger, cg.bkt, currB.ULID, overlappingReason, + reason.Error(), o.noCompaction); err != nil { return retry(err) } return retry(reason) @@ -87,24 +88,26 @@ func (o OverlappingCompactionLifecycleCallback) PreCompactionCallback(ctx contex continue } } - toDelete := -1 + var outer, inner *metadata.Meta if prevB.MaxTime >= currB.MaxTime { - toDelete = curr - level.Warn(logger).Log("msg", "found overlapping block in plan, keep previous block", - "toKeep", prevB.String(), "toDelete", currB.String()) + inner = currB + outer = prevB } else if prevB.MaxTime < currB.MaxTime { - toDelete = prev + inner = prevB + outer = currB prev = curr - level.Warn(logger).Log("msg", "found overlapping block in plan, keep current block", - "toKeep", currB.String(), "toDelete", prevB.String()) } - o.deletedBlocks.Inc() - if err := DeleteBlockNow(ctx, logger, cg.bkt, toCompact[toDelete]); err != nil { + reason = fmt.Errorf("found full overlapping block: %s > %s", outer.String(), inner.String()) + if err := block.MarkForNoCompact(ctx, logger, cg.bkt, inner.ULID, overlappingReason, reason.Error(), + o.noCompaction); err != nil { + return retry(err) + } + if err := block.MarkForNoDownsample(ctx, logger, cg.bkt, inner.ULID, overlappingReason, reason.Error(), + o.noDownsampling); err != nil { return retry(err) } - toCompact[toDelete] = nil } - return nil + return reason } func (o OverlappingCompactionLifecycleCallback) PostCompactionCallback(_ context.Context, _ log.Logger, _ *Group, _ ulid.ULID) error { @@ -114,23 +117,3 @@ func (o OverlappingCompactionLifecycleCallback) PostCompactionCallback(_ context func (o OverlappingCompactionLifecycleCallback) GetBlockPopulator(_ context.Context, _ log.Logger, _ *Group) (tsdb.BlockPopulator, error) { return tsdb.DefaultBlockPopulator{}, nil } - -func FilterRemovedBlocks(blocks []*metadata.Meta) (res []*metadata.Meta) { - for _, b := range blocks { - if b != nil { - res = append(res, b) - } - } - return res -} - -func DeleteBlockNow(ctx context.Context, logger log.Logger, bkt objstore.Bucket, m *metadata.Meta) error { - level.Warn(logger).Log("msg", "delete polluted block immediately", "block", m.String(), - "level", m.Compaction.Level, "parents", fmt.Sprintf("%v", m.Compaction.Parents), - "resolution", m.Thanos.Downsample.Resolution, "source", m.Thanos.Source, "labels", m.Thanos.GetLabels(), - "series", m.Stats.NumSeries, "samples", m.Stats.NumSamples, "chunks", m.Stats.NumChunks) - if err := block.Delete(ctx, logger, bkt, m.ULID); err != nil { - return errors.Wrapf(err, "delete overlapping block %s", m.String()) - } - return nil -} diff --git a/pkg/compact/overlapping_test.go b/pkg/compact/overlapping_test.go index b1146199044..a595b109d6a 100644 --- a/pkg/compact/overlapping_test.go +++ b/pkg/compact/overlapping_test.go @@ -5,6 +5,8 @@ package compact import ( "context" + "fmt" + "path" "testing" "github.com/efficientgo/core/testutil" @@ -16,36 +18,15 @@ import ( "github.com/thanos-io/thanos/pkg/compact/downsample" ) -func TestFilterNilCompact(t *testing.T) { - blocks := []*metadata.Meta{nil, nil} - filtered := FilterRemovedBlocks(blocks) - testutil.Equals(t, 0, len(filtered)) - - meta := []*metadata.Meta{ - createCustomBlockMeta(6, 1, 3, metadata.CompactorSource, 1), - nil, - createCustomBlockMeta(7, 3, 5, metadata.CompactorSource, 2), - createCustomBlockMeta(8, 5, 10, metadata.CompactorSource, 3), - nil, - } - testutil.Equals(t, 3, len(FilterRemovedBlocks(meta))) -} - func TestPreCompactionCallback(t *testing.T) { reg := prometheus.NewRegistry() logger := log.NewNopLogger() - bkt := objstore.NewInMemBucket() - group := &Group{ - logger: log.NewNopLogger(), - bkt: bkt, - } callback := NewOverlappingCompactionLifecycleCallback(reg, true) for _, tcase := range []struct { - testName string - input []*metadata.Meta - expectedSize int - expectedBlocks []*metadata.Meta - err error + testName string + input []*metadata.Meta + expectedMarks map[int]int + expectedErr error }{ { testName: "empty blocks", @@ -57,7 +38,6 @@ func TestPreCompactionCallback(t *testing.T) { createCustomBlockMeta(7, 3, 5, metadata.CompactorSource, 1), createCustomBlockMeta(8, 5, 10, metadata.CompactorSource, 1), }, - expectedSize: 3, }, { testName: "duplicated blocks", @@ -66,10 +46,7 @@ func TestPreCompactionCallback(t *testing.T) { createCustomBlockMeta(7, 1, 7, metadata.CompactorSource, 1), createCustomBlockMeta(8, 1, 7, metadata.CompactorSource, 1), }, - expectedSize: 1, - expectedBlocks: []*metadata.Meta{ - createCustomBlockMeta(6, 1, 7, metadata.CompactorSource, 1), - }, + expectedMarks: map[int]int{7: 2, 8: 2}, }, { testName: "overlap non dup blocks", @@ -78,11 +55,7 @@ func TestPreCompactionCallback(t *testing.T) { createCustomBlockMeta(7, 1, 7, metadata.CompactorSource, 2), createCustomBlockMeta(8, 1, 7, metadata.CompactorSource, 2), }, - expectedSize: 2, - expectedBlocks: []*metadata.Meta{ - createCustomBlockMeta(6, 1, 7, metadata.CompactorSource, 1), - createCustomBlockMeta(7, 1, 7, metadata.CompactorSource, 2), - }, + expectedMarks: map[int]int{8: 2}, }, { testName: "receive blocks", @@ -91,7 +64,6 @@ func TestPreCompactionCallback(t *testing.T) { createCustomBlockMeta(7, 1, 7, metadata.ReceiveSource, 2), createCustomBlockMeta(8, 1, 7, metadata.ReceiveSource, 3), }, - expectedSize: 3, }, { testName: "receive + compactor blocks", @@ -100,11 +72,7 @@ func TestPreCompactionCallback(t *testing.T) { createCustomBlockMeta(7, 2, 7, metadata.CompactorSource, 1), createCustomBlockMeta(8, 2, 8, metadata.ReceiveSource, 1), }, - expectedSize: 2, - expectedBlocks: []*metadata.Meta{ - createCustomBlockMeta(6, 1, 7, metadata.ReceiveSource, 1), - createCustomBlockMeta(8, 2, 8, metadata.ReceiveSource, 1), - }, + expectedMarks: map[int]int{7: 2}, }, { testName: "full overlapping blocks", @@ -113,10 +81,7 @@ func TestPreCompactionCallback(t *testing.T) { createCustomBlockMeta(7, 3, 6, metadata.CompactorSource, 1), createCustomBlockMeta(8, 5, 8, metadata.CompactorSource, 1), }, - expectedSize: 1, - expectedBlocks: []*metadata.Meta{ - createCustomBlockMeta(6, 1, 10, metadata.CompactorSource, 1), - }, + expectedMarks: map[int]int{7: 2, 8: 2}, }, { testName: "part overlapping blocks", @@ -125,11 +90,7 @@ func TestPreCompactionCallback(t *testing.T) { createCustomBlockMeta(2, 1, 6, metadata.CompactorSource, 1), createCustomBlockMeta(3, 6, 8, metadata.CompactorSource, 1), }, - expectedSize: 2, - expectedBlocks: []*metadata.Meta{ - createCustomBlockMeta(2, 1, 6, metadata.CompactorSource, 1), - createCustomBlockMeta(3, 6, 8, metadata.CompactorSource, 1), - }, + expectedMarks: map[int]int{1: 2}, }, { testName: "out of order blocks", @@ -138,10 +99,7 @@ func TestPreCompactionCallback(t *testing.T) { createCustomBlockMeta(7, 0, 5, metadata.CompactorSource, 1), createCustomBlockMeta(8, 5, 8, metadata.CompactorSource, 1), }, - err: halt(errors.Errorf("later blocks has smaller minTime than previous block: %s -- %s", - createCustomBlockMeta(6, 2, 3, metadata.CompactorSource, 1).String(), - createCustomBlockMeta(7, 0, 5, metadata.CompactorSource, 1).String(), - )), + expectedErr: halt(errors.Errorf("some errors")), }, { testName: "partially overlapping blocks", @@ -150,23 +108,33 @@ func TestPreCompactionCallback(t *testing.T) { createCustomBlockMeta(7, 3, 6, metadata.CompactorSource, 1), createCustomBlockMeta(8, 5, 8, metadata.CompactorSource, 1), }, - err: retry(errors.Errorf("found partially overlapping block: %s -- %s", - createCustomBlockMeta(6, 2, 4, metadata.CompactorSource, 1).String(), - createCustomBlockMeta(7, 3, 6, metadata.CompactorSource, 1).String(), - )), + expectedMarks: map[int]int{6: 1, 7: 1}, }, } { if ok := t.Run(tcase.testName, func(t *testing.T) { - err := callback.PreCompactionCallback(context.Background(), logger, group, tcase.input) - if tcase.err != nil { + ctx := context.Background() + bkt := objstore.NewInMemBucket() + group := &Group{logger: log.NewNopLogger(), bkt: bkt} + err := callback.PreCompactionCallback(ctx, logger, group, tcase.input) + if tcase.expectedErr != nil || len(tcase.expectedMarks) != 0 { testutil.NotOk(t, err) - testutil.Equals(t, tcase.err.Error(), err.Error()) - return + } else { + testutil.Ok(t, err) } - testutil.Equals(t, tcase.expectedSize, len(FilterRemovedBlocks(tcase.input))) - if tcase.expectedSize != len(tcase.input) { - testutil.Equals(t, tcase.expectedBlocks, FilterRemovedBlocks(tcase.input)) + objs := bkt.Objects() + expectedSize := 0 + for id, file := range tcase.expectedMarks { + expectedSize += file + _, noCompaction := objs[getFile(id, metadata.NoCompactMarkFilename)] + _, noDownsampling := objs[getFile(id, metadata.NoDownsampleMarkFilename)] + if file <= 2 { + testutil.Assert(t, noCompaction, fmt.Sprintf("expect %d has no compaction", id)) + } + if file == 2 { + testutil.Assert(t, noDownsampling, fmt.Sprintf("expect %d has no downsampling", id)) + } } + testutil.Equals(t, expectedSize, len(objs)) }); !ok { return } @@ -180,3 +148,7 @@ func createCustomBlockMeta(id uint64, minTime, maxTime int64, source metadata.So m.Stats.NumSeries = numSeries return m } + +func getFile(id int, mark string) string { + return path.Join(fmt.Sprintf("%010d", id)+fmt.Sprintf("%016d", 0), mark) +} From 7d4ac34b26960c825bbaff4604758630b5e3366c Mon Sep 17 00:00:00 2001 From: Yi Jin Date: Tue, 26 Mar 2024 22:15:48 -0700 Subject: [PATCH 3/4] [PLAT-104290] must wrap error as retryable to avoid compactor error exit Signed-off-by: Yi Jin --- pkg/compact/overlapping.go | 11 ++++++++--- pkg/compact/overlapping_test.go | 8 ++++++-- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/pkg/compact/overlapping.go b/pkg/compact/overlapping.go index 8db34ca3985..32badf425ad 100644 --- a/pkg/compact/overlapping.go +++ b/pkg/compact/overlapping.go @@ -97,14 +97,19 @@ func (o OverlappingCompactionLifecycleCallback) PreCompactionCallback(ctx contex outer = currB prev = curr } - reason = fmt.Errorf("found full overlapping block: %s > %s", outer.String(), inner.String()) + if outer.Thanos.Source == metadata.ReceiveSource { + level.Warn(logger).Log("msg", "bypass if larger blocks are from receive", + "outer", outer.String(), "inner", inner.String()) + continue + } + reason = retry(fmt.Errorf("found full overlapping block: %s > %s", outer.String(), inner.String())) if err := block.MarkForNoCompact(ctx, logger, cg.bkt, inner.ULID, overlappingReason, reason.Error(), o.noCompaction); err != nil { - return retry(err) + return err } if err := block.MarkForNoDownsample(ctx, logger, cg.bkt, inner.ULID, overlappingReason, reason.Error(), o.noDownsampling); err != nil { - return retry(err) + return err } } return reason diff --git a/pkg/compact/overlapping_test.go b/pkg/compact/overlapping_test.go index a595b109d6a..279222264e6 100644 --- a/pkg/compact/overlapping_test.go +++ b/pkg/compact/overlapping_test.go @@ -72,7 +72,6 @@ func TestPreCompactionCallback(t *testing.T) { createCustomBlockMeta(7, 2, 7, metadata.CompactorSource, 1), createCustomBlockMeta(8, 2, 8, metadata.ReceiveSource, 1), }, - expectedMarks: map[int]int{7: 2}, }, { testName: "full overlapping blocks", @@ -116,10 +115,15 @@ func TestPreCompactionCallback(t *testing.T) { bkt := objstore.NewInMemBucket() group := &Group{logger: log.NewNopLogger(), bkt: bkt} err := callback.PreCompactionCallback(ctx, logger, group, tcase.input) - if tcase.expectedErr != nil || len(tcase.expectedMarks) != 0 { + if len(tcase.expectedMarks) != 0 { testutil.NotOk(t, err) + testutil.Assert(t, IsRetryError(err)) + } else if tcase.expectedErr != nil { + testutil.NotOk(t, err) + testutil.Assert(t, IsHaltError(err)) } else { testutil.Ok(t, err) + testutil.Assert(t, err == nil) } objs := bkt.Objects() expectedSize := 0 From 64f2964027c9ac5ef488f86ed5d295cc5d2e938a Mon Sep 17 00:00:00 2001 From: Christopher Li Date: Tue, 2 Apr 2024 18:08:38 -0700 Subject: [PATCH 4/4] exclude 409 error --- pkg/receive/handler.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index c0c0fea21fe..09a54e2c1c9 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -798,6 +798,9 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e Replica: int64(writeTarget.replica + 1), }) }) + if isConflict(err) { + err = nil + } if err != nil { // Check if peer connection is unavailable, don't attempt to send requests constantly. if st, ok := status.FromError(err); ok {