Merge branch 'db_main' into pull-latest-main
Signed-off-by: Yi Jin <[email protected]>
jnyi authored Apr 4, 2024
2 parents fa9882c + 7175992 commit 127e32d
Showing 4 changed files with 90 additions and 121 deletions.
1 change: 0 additions & 1 deletion pkg/compact/compact.go
@@ -1127,7 +1127,6 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
if err := compactionLifecycleCallback.PreCompactionCallback(ctx, cg.logger, cg, toCompact); err != nil {
return false, ulid.ULID{}, errors.Wrapf(err, "failed to run pre compaction callback for plan: %s", fmt.Sprintf("%v", toCompact))
}
toCompact = FilterRemovedBlocks(toCompact)
level.Info(cg.logger).Log("msg", "finished running pre compaction callback; downloading blocks", "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds(), "plan", fmt.Sprintf("%v", toCompact))

begin = time.Now()
2 changes: 1 addition & 1 deletion pkg/compact/downsample/streamed_block_writer.go
@@ -113,7 +113,7 @@ func (w *streamedBlockWriter) WriteSeries(lset labels.Labels, chunks []chunks.Me
}

if len(chunks) == 0 {
level.Warn(w.logger).Log("msg", "empty chunks happened, skip series", "series", strings.ReplaceAll(lset.String(), "\"", "'"))
level.Debug(w.logger).Log("msg", "empty chunks happened, skip series", "series", strings.ReplaceAll(lset.String(), "\"", "'"))
return nil
}

90 changes: 47 additions & 43 deletions pkg/compact/overlapping.go
@@ -14,21 +14,32 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/tsdb"
"github.com/thanos-io/objstore"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
)

const overlappingReason = "blocks-overlapping"

type OverlappingCompactionLifecycleCallback struct {
overlappingBlocks prometheus.Counter
noCompaction prometheus.Counter
noDownsampling prometheus.Counter
}

func NewOverlappingCompactionLifecycleCallback(reg *prometheus.Registry, enabled bool) CompactionLifecycleCallback {
if enabled {
return OverlappingCompactionLifecycleCallback{
overlappingBlocks: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_compact_group_overlapping_blocks_total",
Help: "Total number of blocks that are overlapping and to be deleted.",
Help: "Total number of blocks that are overlapping.",
}),
noCompaction: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_compact_group_overlapping_blocks_no_compaction_total",
Help: "Total number of blocks that are overlapping and mark no compaction.",
}),
noDownsampling: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_compact_group_overlapping_blocks_no_downsampling_total",
Help: "Total number of blocks that are overlapping and mark no downsampling.",
}),
}
}
@@ -42,6 +53,7 @@ func (o OverlappingCompactionLifecycleCallback) PreCompactionCallback(ctx contex
return nil
}
prev := 0
var reason error
for curr, currB := range toCompact {
prevB := toCompact[prev]
if curr == 0 || currB.Thanos.Source == metadata.ReceiveSource || prevB.MaxTime <= currB.MinTime {
@@ -51,15 +63,21 @@ func (o OverlappingCompactionLifecycleCallback) PreCompactionCallback(ctx contex
} else if currB.MinTime < prevB.MinTime {
// halt when the assumption is broken, the input toCompact isn't sorted by minTime, need manual investigation
return halt(errors.Errorf("later blocks has smaller minTime than previous block: %s -- %s", prevB.String(), currB.String()))
} else if prevB.MaxTime < currB.MaxTime && prevB.MinTime != currB.MinTime {
err := errors.Errorf("found partially overlapping block: %s -- %s", prevB.String(), currB.String())
if cg.enableVerticalCompaction {
level.Error(logger).Log("msg", "best effort to vertical compact", "err", err)
prev = curr
continue
} else {
return halt(err)
}
// prev min <= curr min < prev max
o.overlappingBlocks.Inc()
if prevB.MaxTime < currB.MaxTime && prevB.MinTime != currB.MinTime {
o.noCompaction.Inc()
reason = fmt.Errorf("found partially overlapping block: %s -- %s", prevB.String(), currB.String())
if err := block.MarkForNoCompact(ctx, logger, cg.bkt, prevB.ULID, overlappingReason,
reason.Error(), o.noCompaction); err != nil {
return retry(err)
}
if err := block.MarkForNoCompact(ctx, logger, cg.bkt, currB.ULID, overlappingReason,
reason.Error(), o.noCompaction); err != nil {
return retry(err)
}
return retry(reason)
} else if prevB.MinTime == currB.MinTime && prevB.MaxTime == currB.MaxTime {
if prevB.Stats.NumSeries != currB.Stats.NumSeries || prevB.Stats.NumSamples != currB.Stats.NumSamples {
level.Warn(logger).Log("msg", "found same time range but different stats, keep both blocks",
@@ -70,25 +88,31 @@ func (o OverlappingCompactionLifecycleCallback) PreCompactionCallback(ctx contex
continue
}
}
// prev min <= curr min < prev max
toDelete := -1
var outer, inner *metadata.Meta
if prevB.MaxTime >= currB.MaxTime {
toDelete = curr
level.Warn(logger).Log("msg", "found overlapping block in plan, keep previous block",
"toKeep", prevB.String(), "toDelete", currB.String())
inner = currB
outer = prevB
} else if prevB.MaxTime < currB.MaxTime {
toDelete = prev
inner = prevB
outer = currB
prev = curr
level.Warn(logger).Log("msg", "found overlapping block in plan, keep current block",
"toKeep", currB.String(), "toDelete", prevB.String())
}
o.overlappingBlocks.Inc()
if err := DeleteBlockNow(ctx, logger, cg.bkt, toCompact[toDelete]); err != nil {
return retry(err)
if outer.Thanos.Source == metadata.ReceiveSource {
level.Warn(logger).Log("msg", "bypass if larger blocks are from receive",
"outer", outer.String(), "inner", inner.String())
continue
}
reason = retry(fmt.Errorf("found full overlapping block: %s > %s", outer.String(), inner.String()))
if err := block.MarkForNoCompact(ctx, logger, cg.bkt, inner.ULID, overlappingReason, reason.Error(),
o.noCompaction); err != nil {
return err
}
if err := block.MarkForNoDownsample(ctx, logger, cg.bkt, inner.ULID, overlappingReason, reason.Error(),
o.noDownsampling); err != nil {
return err
}
toCompact[toDelete] = nil
}
return nil
return reason
}

func (o OverlappingCompactionLifecycleCallback) PostCompactionCallback(_ context.Context, _ log.Logger, _ *Group, _ ulid.ULID) error {
@@ -98,23 +122,3 @@ func (o OverlappingCompactionLifecycleCallback) PostCompactionCallback(_ context
func (o OverlappingCompactionLifecycleCallback) GetBlockPopulator(_ context.Context, _ log.Logger, _ *Group) (tsdb.BlockPopulator, error) {
return tsdb.DefaultBlockPopulator{}, nil
}

func FilterRemovedBlocks(blocks []*metadata.Meta) (res []*metadata.Meta) {
for _, b := range blocks {
if b != nil {
res = append(res, b)
}
}
return res
}

func DeleteBlockNow(ctx context.Context, logger log.Logger, bkt objstore.Bucket, m *metadata.Meta) error {
level.Warn(logger).Log("msg", "delete polluted block immediately", "block", m.String(),
"level", m.Compaction.Level, "parents", fmt.Sprintf("%v", m.Compaction.Parents),
"resolution", m.Thanos.Downsample.Resolution, "source", m.Thanos.Source, "labels", m.Thanos.GetLabels(),
"series", m.Stats.NumSeries, "samples", m.Stats.NumSamples, "chunks", m.Stats.NumChunks)
if err := block.Delete(ctx, logger, bkt, m.ULID); err != nil {
return errors.Wrapf(err, "delete overlapping block %s", m.String())
}
return nil
}
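
For reference, a minimal standalone sketch of the interval checks the new PreCompactionCallback performs on consecutive blocks of a minTime-sorted plan; the span type and classifyOverlap helper below are illustrative names only, not Thanos code or part of this commit. In the change above, a partially overlapping pair gets no-compact marks on both blocks and a retry error, while a block fully contained in another is marked for no compaction and no downsampling unless the containing block came from receive.

// overlap_sketch.go -- assumed names, for illustration only.
package main

import "fmt"

type span struct{ minT, maxT int64 }

// classifyOverlap assumes prev.minT <= curr.minT, which the callback enforces
// with its out-of-order halt check before reaching the overlap branches.
func classifyOverlap(prev, curr span) string {
	switch {
	case prev.maxT <= curr.minT:
		return "disjoint" // nothing to do, move on to the next pair
	case prev.maxT < curr.maxT && prev.minT != curr.minT:
		return "partial overlap" // neither range contains the other
	default:
		return "full overlap" // one range contains the other (including identical ranges)
	}
}

func main() {
	fmt.Println(classifyOverlap(span{minT: 1, maxT: 10}, span{minT: 3, maxT: 6})) // full overlap
	fmt.Println(classifyOverlap(span{minT: 2, maxT: 4}, span{minT: 3, maxT: 6}))  // partial overlap
	fmt.Println(classifyOverlap(span{minT: 1, maxT: 7}, span{minT: 1, maxT: 7}))  // full overlap
}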
118 changes: 42 additions & 76 deletions pkg/compact/overlapping_test.go
@@ -5,6 +5,8 @@ package compact

import (
"context"
"fmt"
"path"
"testing"

"github.com/efficientgo/core/testutil"
@@ -16,37 +18,15 @@ import (
"github.com/thanos-io/thanos/pkg/compact/downsample"
)

func TestFilterNilCompact(t *testing.T) {
blocks := []*metadata.Meta{nil, nil}
filtered := FilterRemovedBlocks(blocks)
testutil.Equals(t, 0, len(filtered))

meta := []*metadata.Meta{
createCustomBlockMeta(6, 1, 3, metadata.CompactorSource, 1),
nil,
createCustomBlockMeta(7, 3, 5, metadata.CompactorSource, 2),
createCustomBlockMeta(8, 5, 10, metadata.CompactorSource, 3),
nil,
}
testutil.Equals(t, 3, len(FilterRemovedBlocks(meta)))
}

func TestPreCompactionCallback(t *testing.T) {
reg := prometheus.NewRegistry()
logger := log.NewNopLogger()
bkt := objstore.NewInMemBucket()
group := &Group{
logger: log.NewNopLogger(),
bkt: bkt,
}
callback := NewOverlappingCompactionLifecycleCallback(reg, true)
for _, tcase := range []struct {
testName string
input []*metadata.Meta
enableVerticalCompaction bool
expectedSize int
expectedBlocks []*metadata.Meta
err error
testName string
input []*metadata.Meta
expectedMarks map[int]int
expectedErr error
}{
{
testName: "empty blocks",
@@ -58,7 +38,6 @@ func TestPreCompactionCallback(t *testing.T) {
createCustomBlockMeta(7, 3, 5, metadata.CompactorSource, 1),
createCustomBlockMeta(8, 5, 10, metadata.CompactorSource, 1),
},
expectedSize: 3,
},
{
testName: "duplicated blocks",
@@ -67,10 +46,7 @@ func TestPreCompactionCallback(t *testing.T) {
createCustomBlockMeta(7, 1, 7, metadata.CompactorSource, 1),
createCustomBlockMeta(8, 1, 7, metadata.CompactorSource, 1),
},
expectedSize: 1,
expectedBlocks: []*metadata.Meta{
createCustomBlockMeta(6, 1, 7, metadata.CompactorSource, 1),
},
expectedMarks: map[int]int{7: 2, 8: 2},
},
{
testName: "overlap non dup blocks",
@@ -79,11 +55,7 @@ func TestPreCompactionCallback(t *testing.T) {
createCustomBlockMeta(7, 1, 7, metadata.CompactorSource, 2),
createCustomBlockMeta(8, 1, 7, metadata.CompactorSource, 2),
},
expectedSize: 2,
expectedBlocks: []*metadata.Meta{
createCustomBlockMeta(6, 1, 7, metadata.CompactorSource, 1),
createCustomBlockMeta(7, 1, 7, metadata.CompactorSource, 2),
},
expectedMarks: map[int]int{8: 2},
},
{
testName: "receive blocks",
@@ -92,7 +64,6 @@ func TestPreCompactionCallback(t *testing.T) {
createCustomBlockMeta(7, 1, 7, metadata.ReceiveSource, 2),
createCustomBlockMeta(8, 1, 7, metadata.ReceiveSource, 3),
},
expectedSize: 3,
},
{
testName: "receive + compactor blocks",
@@ -101,11 +72,6 @@ func TestPreCompactionCallback(t *testing.T) {
createCustomBlockMeta(7, 2, 7, metadata.CompactorSource, 1),
createCustomBlockMeta(8, 2, 8, metadata.ReceiveSource, 1),
},
expectedSize: 2,
expectedBlocks: []*metadata.Meta{
createCustomBlockMeta(6, 1, 7, metadata.ReceiveSource, 1),
createCustomBlockMeta(8, 2, 8, metadata.ReceiveSource, 1),
},
},
{
testName: "full overlapping blocks",
@@ -114,10 +80,7 @@ func TestPreCompactionCallback(t *testing.T) {
createCustomBlockMeta(7, 3, 6, metadata.CompactorSource, 1),
createCustomBlockMeta(8, 5, 8, metadata.CompactorSource, 1),
},
expectedSize: 1,
expectedBlocks: []*metadata.Meta{
createCustomBlockMeta(6, 1, 10, metadata.CompactorSource, 1),
},
expectedMarks: map[int]int{7: 2, 8: 2},
},
{
testName: "part overlapping blocks",
@@ -126,11 +89,7 @@ func TestPreCompactionCallback(t *testing.T) {
createCustomBlockMeta(2, 1, 6, metadata.CompactorSource, 1),
createCustomBlockMeta(3, 6, 8, metadata.CompactorSource, 1),
},
expectedSize: 2,
expectedBlocks: []*metadata.Meta{
createCustomBlockMeta(2, 1, 6, metadata.CompactorSource, 1),
createCustomBlockMeta(3, 6, 8, metadata.CompactorSource, 1),
},
expectedMarks: map[int]int{1: 2},
},
{
testName: "out of order blocks",
@@ -139,44 +98,47 @@ func TestPreCompactionCallback(t *testing.T) {
createCustomBlockMeta(7, 0, 5, metadata.CompactorSource, 1),
createCustomBlockMeta(8, 5, 8, metadata.CompactorSource, 1),
},
err: halt(errors.Errorf("expect halt error")),
expectedErr: halt(errors.Errorf("some errors")),
},
{
testName: "partially overlapping blocks with vertical compaction off",
testName: "partially overlapping blocks",
input: []*metadata.Meta{
createCustomBlockMeta(6, 2, 4, metadata.CompactorSource, 1),
createCustomBlockMeta(7, 3, 6, metadata.CompactorSource, 1),
createCustomBlockMeta(8, 5, 8, metadata.CompactorSource, 1),
},
err: halt(errors.Errorf("expect halt error")),
},
{
testName: "partially overlapping blocks with vertical compaction on",
input: []*metadata.Meta{
createCustomBlockMeta(6, 2, 4, metadata.CompactorSource, 1),
createCustomBlockMeta(7, 3, 6, metadata.CompactorSource, 1),
createCustomBlockMeta(8, 5, 8, metadata.CompactorSource, 1),
},
enableVerticalCompaction: true,
expectedSize: 3,
expectedMarks: map[int]int{6: 1, 7: 1},
},
} {
if ok := t.Run(tcase.testName, func(t *testing.T) {
group.enableVerticalCompaction = tcase.enableVerticalCompaction
err := callback.PreCompactionCallback(context.Background(), logger, group, tcase.input)
if tcase.err != nil {
ctx := context.Background()
bkt := objstore.NewInMemBucket()
group := &Group{logger: log.NewNopLogger(), bkt: bkt}
err := callback.PreCompactionCallback(ctx, logger, group, tcase.input)
if len(tcase.expectedMarks) != 0 {
testutil.NotOk(t, err)
if IsHaltError(tcase.err) {
testutil.Assert(t, IsHaltError(err), "expected halt error")
} else if IsRetryError(tcase.err) {
testutil.Assert(t, IsRetryError(err), "expected retry error")
}
return
testutil.Assert(t, IsRetryError(err))
} else if tcase.expectedErr != nil {
testutil.NotOk(t, err)
testutil.Assert(t, IsHaltError(err))
} else {
testutil.Ok(t, err)
testutil.Assert(t, err == nil)
}
testutil.Equals(t, tcase.expectedSize, len(FilterRemovedBlocks(tcase.input)))
if tcase.expectedSize != len(tcase.input) {
testutil.Equals(t, tcase.expectedBlocks, FilterRemovedBlocks(tcase.input))
objs := bkt.Objects()
expectedSize := 0
for id, file := range tcase.expectedMarks {
expectedSize += file
_, noCompaction := objs[getFile(id, metadata.NoCompactMarkFilename)]
_, noDownsampling := objs[getFile(id, metadata.NoDownsampleMarkFilename)]
if file <= 2 {
testutil.Assert(t, noCompaction, fmt.Sprintf("expect %d has no compaction", id))
}
if file == 2 {
testutil.Assert(t, noDownsampling, fmt.Sprintf("expect %d has no downsampling", id))
}
}
testutil.Equals(t, expectedSize, len(objs))
}); !ok {
return
}
@@ -190,3 +152,7 @@ func createCustomBlockMeta(id uint64, minTime, maxTime int64, source metadata.So
m.Stats.NumSeries = numSeries
return m
}

func getFile(id int, mark string) string {
return path.Join(fmt.Sprintf("%010d", id)+fmt.Sprintf("%016d", 0), mark)
}
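
The same marker lookup that getFile enables in the test above can also be written directly against a real ULID; the helper below is an illustrative sketch, not part of this commit, and assumes the test file's existing imports plus the oklog ulid package used elsewhere in the compact package.

// Illustrative helper (not in the commit): markers land under
// "<block ULID>/<marker filename>" in the bucket, which is what getFile
// reconstructs for the zero-entropy test ULIDs.
func hasNoCompactMark(bkt *objstore.InMemBucket, id ulid.ULID) bool {
	_, ok := bkt.Objects()[path.Join(id.String(), metadata.NoCompactMarkFilename)]
	return ok
}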
