Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Partitioning compaction for Cortex #5465

Closed
Closed
Changes from 1 commit
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
ea5dc28
Partitioning compactor with upstreamed Thanos and Prometheus changes
alexqyle Jul 18, 2023
b2eebc1
Updated CHANGELOG
alexqyle Jul 18, 2023
0fa228b
Fixed issue that block cannot be compacted if block was compacted wit…
alexqyle Jul 25, 2023
f8f130c
fix lint
alexqyle Jul 25, 2023
6fda3f1
make unit test clear
alexqyle Jul 26, 2023
4a05910
make visit mark file suffix backward compatible
alexqyle Jul 28, 2023
2cb3d62
fix invalid cortex extension being set in meta
alexqyle Jul 31, 2023
0036a8e
fix lint
alexqyle Jul 31, 2023
044165d
Merge branch 'cortexproject:master' into partitioning-compactor-v3
alexqyle Aug 1, 2023
2a06555
Merge branch 'master' into partitioning-compactor-v3
alexqyle Aug 16, 2023
d902b8b
Make partitioning compactor configuration to be set per tenant
alexqyle Aug 16, 2023
5b892a3
fixed remaining plan metric and added block generation test util
alexqyle Aug 25, 2023
236fd6b
Merge commit 'b91a24d917f5a268b3982b0ddc6bf5686a9719a8' into partitio…
alexqyle Aug 25, 2023
dc3d3af
Merge commit '1a097a924b203c66646df5d02f3f09078ec77531' into partitio…
alexqyle Sep 8, 2023
4f7d9c0
improved performance going through all possible compaction groups
alexqyle Sep 8, 2023
c7060a4
nit fix
alexqyle Sep 8, 2023
146417b
nit fix
alexqyle Sep 8, 2023
20ff2ca
merge from master
alexqyle Sep 13, 2023
392cffc
add extra logic in compaction complete checker to avoid source blocks…
alexqyle Sep 14, 2023
4daba3e
move sample diff validation to post compaction callback
alexqyle Sep 18, 2023
ff3c705
fix lint
alexqyle Sep 18, 2023
7f0bc54
Merge commit '4e162a011a5e23f90c0c934c3d0dee6398d349fd' into partitio…
alexqyle Sep 18, 2023
45749f2
accurately getting result block
alexqyle Sep 19, 2023
da38fbe
Randomly iterating through partitions in grouper. More timing logs in…
alexqyle Sep 28, 2023
e0e2ec4
Introduce partition visit marker to avoid possible deadlock caused by…
alexqyle Sep 29, 2023
4ef90ac
clean up local dir in pre compaction callback
alexqyle Oct 11, 2023
2bf3571
Merge branch 'master' into partitioning-compactor-v3
alexqyle Oct 11, 2023
798a23a
fix compile error after prometheus update
alexqyle Oct 11, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
clean up local dir in pre compaction callback
Signed-off-by: Alex Le <leqiyue@amazon.com>
alexqyle committed Oct 11, 2023
commit 4ef90ac21d203a627d447fe014f3f82e466fe9e4
1 change: 1 addition & 0 deletions pkg/compactor/compactor.go
Original file line number Diff line number Diff line change
@@ -873,6 +873,7 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error {
userBucket: bucket,
logger: ulogger,
metaSyncConcurrency: c.compactorCfg.MetaSyncConcurrency,
compactDir: c.compactDirForUser(userID),
partitionedGroupInfoReadFailed: c.PartitionedGroupInfoReadFailed,
}
compactor, err := compact.NewBucketCompactorWithCheckerAndCallback(
16 changes: 13 additions & 3 deletions pkg/compactor/sharded_compaction_lifecycle_callback.go
Original file line number Diff line number Diff line change
@@ -3,21 +3,22 @@ package compactor
import (
"context"
"path"
"path/filepath"
"sort"
"strings"
"sync"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/oklog/ulid"
"golang.org/x/sync/errgroup"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/tsdb"
"github.com/thanos-io/objstore"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact"
"github.com/thanos-io/thanos/pkg/runutil"
"golang.org/x/sync/errgroup"

"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
)
@@ -27,10 +28,19 @@ type ShardedCompactionLifecycleCallback struct {
userBucket objstore.InstrumentedBucket
logger log.Logger
metaSyncConcurrency int
compactDir string
partitionedGroupInfoReadFailed prometheus.Counter
}

func (c ShardedCompactionLifecycleCallback) PreCompactionCallback(_ context.Context, _ log.Logger, _ *compact.Group, _ []*metadata.Meta) error {
// PreCompactionCallback cleans the local compaction directory before a group is
// compacted. Everything under c.compactDir is handed to runutil.DeleteAll except
// the "<group key>/<block ULID>" paths of the blocks listed in meta, which are
// passed as directories to ignore.
//
// The cleanup is best effort: a failure is logged as a warning on logger and the
// callback still returns nil, so it never aborts the compaction.
func (c ShardedCompactionLifecycleCallback) PreCompactionCallback(_ context.Context, logger log.Logger, g *compact.Group, meta []*metadata.Meta) error {
	// One ignore entry per block of the current group.
	keep := make([]string, 0, len(meta))
	for i := range meta {
		blockDir := filepath.Join(g.Key(), meta[i].ULID.String())
		keep = append(keep, blockDir)
	}

	if cleanupErr := runutil.DeleteAll(c.compactDir, keep...); cleanupErr != nil {
		level.Warn(logger).Log("msg", "failed deleting non-current compaction group files, disk space usage might have leaked.", "err", cleanupErr, "dir", c.compactDir)
	}
	return nil
}

96 changes: 96 additions & 0 deletions pkg/compactor/sharded_compaction_lifecycle_callback_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package compactor

import (
"context"
"os"
"path/filepath"
"testing"
"time"

"github.com/go-kit/log"
"github.com/oklog/ulid"
"github.com/prometheus/prometheus/tsdb"
"github.com/stretchr/testify/require"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact"
)

func TestPreCompactionCallback(t *testing.T) {
compactDir, err := os.MkdirTemp(os.TempDir(), "compact")
require.NoError(t, err)

t.Cleanup(func() {
require.NoError(t, os.RemoveAll(compactDir))
})

lifecycleCallback := ShardedCompactionLifecycleCallback{
compactDir: compactDir,
}

block1 := ulid.MustNew(1, nil)
block2 := ulid.MustNew(2, nil)
block3 := ulid.MustNew(3, nil)
meta := []*metadata.Meta{
{
BlockMeta: tsdb.BlockMeta{ULID: block1, MinTime: 1 * time.Hour.Milliseconds(), MaxTime: 2 * time.Hour.Milliseconds()},
},
{
BlockMeta: tsdb.BlockMeta{ULID: block2, MinTime: 1 * time.Hour.Milliseconds(), MaxTime: 2 * time.Hour.Milliseconds()},
},
{
BlockMeta: tsdb.BlockMeta{ULID: block3, MinTime: 2 * time.Hour.Milliseconds(), MaxTime: 3 * time.Hour.Milliseconds()},
},
}
testGroupKey := "test_group_key"
testGroup, _ := compact.NewGroup(
log.NewNopLogger(),
nil,
testGroupKey,
nil,
0,
true,
true,
nil,
nil,
nil,
nil,
nil,
nil,
nil,
nil,
metadata.NoneFunc,
1,
1,
)
for _, m := range meta {
err := testGroup.AppendMeta(m)
require.NoError(t, err)
}

dummyGroupID1 := "dummy_dir_1"
dummyGroupID2 := "dummy_dir_2"
err = os.MkdirAll(filepath.Join(compactDir, testGroupKey), 0750)
require.NoError(t, err)
err = os.MkdirAll(filepath.Join(compactDir, testGroupKey, block1.String()), 0750)
require.NoError(t, err)
err = os.MkdirAll(filepath.Join(compactDir, dummyGroupID1), 0750)
require.NoError(t, err)
err = os.MkdirAll(filepath.Join(compactDir, dummyGroupID2), 0750)
require.NoError(t, err)

err = lifecycleCallback.PreCompactionCallback(context.Background(), log.NewNopLogger(), testGroup, meta)
require.NoError(t, err)

info, err := os.Stat(filepath.Join(compactDir, testGroupKey))
require.NoError(t, err)
require.True(t, info.IsDir())
info, err = os.Stat(filepath.Join(compactDir, testGroupKey, block1.String()))
require.NoError(t, err)
require.True(t, info.IsDir())
_, err = os.Stat(filepath.Join(compactDir, dummyGroupID1))
require.Error(t, err)
require.True(t, os.IsNotExist(err))
_, err = os.Stat(filepath.Join(compactDir, dummyGroupID2))
require.Error(t, err)
require.True(t, os.IsNotExist(err))
}