Commit 90ad777
Create feature flag to switch between current shuffle sharding group planner and partition compaction group planner (cortexproject#6141)

* Create feature flag to switch between current shuffle sharding group planner and partition compaction group planner

Signed-off-by: Alex Le <[email protected]>

* rename

Signed-off-by: Alex Le <[email protected]>

* update doc

Signed-off-by: Alex Le <[email protected]>

---------

Signed-off-by: Alex Le <[email protected]>
alexqyle authored Aug 20, 2024
1 parent a467830 commit 90ad777
Showing 6 changed files with 132 additions and 27 deletions.
4 changes: 4 additions & 0 deletions docs/blocks-storage/compactor.md
@@ -285,6 +285,10 @@ compactor:
# CLI flag: -compactor.ring.wait-active-instance-timeout
[wait_active_instance_timeout: <duration> | default = 10m]

+# The compaction strategy to use. Supported values are: default, partitioning.
+# CLI flag: -compactor.compaction-mode
+[compaction_mode: <string> | default = "default"]
+
# How long block visit marker file should be considered as expired and able to
# be picked up by compactor again.
# CLI flag: -compactor.block-visit-marker-timeout
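As a side note, here is a minimal, runnable sketch of what the new flag looks like from the command line, using only Go's standard flag package. The flag name, default value, and help text are taken from this diff; the FlagSet name and the simulated arguments are illustrative.

package main

import (
	"flag"
	"fmt"
)

func main() {
	// Mirror the registration this commit adds in RegisterFlags: a string
	// flag named compactor.compaction-mode that defaults to "default".
	fs := flag.NewFlagSet("compactor", flag.ContinueOnError)
	mode := fs.String("compactor.compaction-mode", "default",
		"The compaction strategy to use. Supported values are: default, partitioning.")

	// Simulate passing the flag to enable the new partitioning strategy.
	if err := fs.Parse([]string{"-compactor.compaction-mode", "partitioning"}); err != nil {
		fmt.Println("parse error:", err)
		return
	}
	fmt.Println(*mode) // partitioning
}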
4 changes: 4 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -2216,6 +2216,10 @@ sharding_ring:
# CLI flag: -compactor.ring.wait-active-instance-timeout
[wait_active_instance_timeout: <duration> | default = 10m]

+# The compaction strategy to use. Supported values are: default, partitioning.
+# CLI flag: -compactor.compaction-mode
+[compaction_mode: <string> | default = "default"]
+
# How long block visit marker file should be considered as expired and able to
# be picked up by compactor again.
# CLI flag: -compactor.block-visit-marker-timeout
78 changes: 51 additions & 27 deletions pkg/compactor/compactor.go
@@ -53,9 +53,12 @@ var (
errInvalidBlockRanges = "compactor block range periods should be divisible by the previous one, but %s is not divisible by %s"
RingOp = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, nil)

-	supportedShardingStrategies = []string{util.ShardingStrategyDefault, util.ShardingStrategyShuffle}
-	errInvalidShardingStrategy  = errors.New("invalid sharding strategy")
-	errInvalidTenantShardSize   = errors.New("invalid tenant shard size, the value must be greater than 0")
+	supportedShardingStrategies              = []string{util.ShardingStrategyDefault, util.ShardingStrategyShuffle}
+	errInvalidShardingStrategy               = errors.New("invalid sharding strategy")
+	errInvalidTenantShardSize                = errors.New("invalid tenant shard size, the value must be greater than 0")
+	supportedCompactionStrategies            = []string{util.CompactionStrategyDefault, util.CompactionStrategyPartitioning}
+	errInvalidCompactionStrategy             = errors.New("invalid compaction strategy")
+	errInvalidCompactionStrategyPartitioning = errors.New("compaction strategy partitioning can only be enabled when shuffle sharding is enabled")

DefaultBlocksGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.InstrumentedBucket, logger log.Logger, blocksMarkedForNoCompaction prometheus.Counter, _ prometheus.Counter, _ prometheus.Counter, syncerMetrics *compact.SyncerMetrics, compactorMetrics *compactorMetrics, _ *ring.Ring, _ *ring.Lifecycler, _ Limits, _ string, _ *compact.GatherNoCompactionMarkFilter) compact.Grouper {
return compact.NewDefaultGrouperWithMetrics(
@@ -77,29 +80,33 @@ var (
}

ShuffleShardingGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.InstrumentedBucket, logger log.Logger, blocksMarkedForNoCompaction prometheus.Counter, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter, syncerMetrics *compact.SyncerMetrics, compactorMetrics *compactorMetrics, ring *ring.Ring, ringLifecycle *ring.Lifecycler, limits Limits, userID string, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter) compact.Grouper {
-		return NewShuffleShardingGrouper(
-			ctx,
-			logger,
-			bkt,
-			cfg.AcceptMalformedIndex,
-			true, // Enable vertical compaction
-			blocksMarkedForNoCompaction,
-			metadata.NoneFunc,
-			syncerMetrics,
-			compactorMetrics,
-			cfg,
-			ring,
-			ringLifecycle.Addr,
-			ringLifecycle.ID,
-			limits,
-			userID,
-			cfg.BlockFilesConcurrency,
-			cfg.BlocksFetchConcurrency,
-			cfg.CompactionConcurrency,
-			cfg.BlockVisitMarkerTimeout,
-			blockVisitMarkerReadFailed,
-			blockVisitMarkerWriteFailed,
-			noCompactionMarkFilter.NoCompactMarkedBlocks)
+		if cfg.CompactionStrategy == util.CompactionStrategyPartitioning {
+			return NewPartitionCompactionGrouper(ctx, logger, bkt)
+		} else {
+			return NewShuffleShardingGrouper(
+				ctx,
+				logger,
+				bkt,
+				cfg.AcceptMalformedIndex,
+				true, // Enable vertical compaction
+				blocksMarkedForNoCompaction,
+				metadata.NoneFunc,
+				syncerMetrics,
+				compactorMetrics,
+				cfg,
+				ring,
+				ringLifecycle.Addr,
+				ringLifecycle.ID,
+				limits,
+				userID,
+				cfg.BlockFilesConcurrency,
+				cfg.BlocksFetchConcurrency,
+				cfg.CompactionConcurrency,
+				cfg.BlockVisitMarkerTimeout,
+				blockVisitMarkerReadFailed,
+				blockVisitMarkerWriteFailed,
+				noCompactionMarkFilter.NoCompactMarkedBlocks)
+		}
}

DefaultBlocksCompactorFactory = func(ctx context.Context, cfg Config, logger log.Logger, reg prometheus.Registerer) (compact.Compactor, PlannerFactory, error) {
@@ -123,7 +130,11 @@ var (

plannerFactory := func(ctx context.Context, bkt objstore.InstrumentedBucket, logger log.Logger, cfg Config, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter, ringLifecycle *ring.Lifecycler, userID string, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter, compactorMetrics *compactorMetrics) compact.Planner {

-		return NewShuffleShardingPlanner(ctx, bkt, logger, cfg.BlockRanges.ToMilliseconds(), noCompactionMarkFilter.NoCompactMarkedBlocks, ringLifecycle.ID, cfg.BlockVisitMarkerTimeout, cfg.BlockVisitMarkerFileUpdateInterval, blockVisitMarkerReadFailed, blockVisitMarkerWriteFailed)
+		if cfg.CompactionStrategy == util.CompactionStrategyPartitioning {
+			return NewPartitionCompactionPlanner(ctx, bkt, logger)
+		} else {
+			return NewShuffleShardingPlanner(ctx, bkt, logger, cfg.BlockRanges.ToMilliseconds(), noCompactionMarkFilter.NoCompactMarkedBlocks, ringLifecycle.ID, cfg.BlockVisitMarkerTimeout, cfg.BlockVisitMarkerFileUpdateInterval, blockVisitMarkerReadFailed, blockVisitMarkerWriteFailed)
+		}
}
return compactor, plannerFactory, nil
}
@@ -202,6 +213,9 @@ type Config struct {
ShardingStrategy string `yaml:"sharding_strategy"`
ShardingRing RingConfig `yaml:"sharding_ring"`

+	// Compaction mode.
+	CompactionStrategy string `yaml:"compaction_mode"`
+
// No need to add options to customize the retry backoff,
// given the defaults should be fine, but allow to override
// it in tests.
@@ -244,6 +258,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.CleanupConcurrency, "compactor.cleanup-concurrency", 20, "Max number of tenants for which blocks cleanup and maintenance should run concurrently.")
f.BoolVar(&cfg.ShardingEnabled, "compactor.sharding-enabled", false, "Shard tenants across multiple compactor instances. Sharding is required if you run multiple compactor instances, in order to coordinate compactions and avoid race conditions leading to the same tenant blocks simultaneously compacted by different instances.")
f.StringVar(&cfg.ShardingStrategy, "compactor.sharding-strategy", util.ShardingStrategyDefault, fmt.Sprintf("The sharding strategy to use. Supported values are: %s.", strings.Join(supportedShardingStrategies, ", ")))
+	f.StringVar(&cfg.CompactionStrategy, "compactor.compaction-mode", util.CompactionStrategyDefault, fmt.Sprintf("The compaction strategy to use. Supported values are: %s.", strings.Join(supportedCompactionStrategies, ", ")))
f.DurationVar(&cfg.DeletionDelay, "compactor.deletion-delay", 12*time.Hour, "Time before a block marked for deletion is deleted from bucket. "+
"If not 0, blocks will be marked for deletion and compactor component will permanently delete blocks marked for deletion from the bucket. "+
"If 0, blocks will be deleted straight away. Note that deleting blocks immediately can cause query failures.")
@@ -290,6 +305,15 @@ func (cfg *Config) Validate(limits validation.Limits) error {
}
}

+	// Make sure a valid compaction mode is being used
+	if !util.StringsContain(supportedCompactionStrategies, cfg.CompactionStrategy) {
+		return errInvalidCompactionStrategy
+	}
+
+	if !cfg.ShardingEnabled && cfg.CompactionStrategy == util.CompactionStrategyPartitioning {
+		return errInvalidCompactionStrategyPartitioning
+	}
+
return nil
}

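Taken together, the Validate changes above enforce two rules: the configured mode must be one of the supported strings, and partitioning additionally requires sharding to be enabled. Below is a self-contained sketch of that logic with the Cortex helpers inlined; the constants and error messages are copied from the diff, while util.StringsContain is replaced by a plain loop and the function name is illustrative.

package main

import (
	"errors"
	"fmt"
)

var (
	supportedCompactionStrategies            = []string{"default", "partitioning"}
	errInvalidCompactionStrategy             = errors.New("invalid compaction strategy")
	errInvalidCompactionStrategyPartitioning = errors.New("compaction strategy partitioning can only be enabled when shuffle sharding is enabled")
)

// validateCompactionConfig mirrors the two checks added to Config.Validate.
func validateCompactionConfig(shardingEnabled bool, compactionStrategy string) error {
	supported := false
	for _, s := range supportedCompactionStrategies {
		if s == compactionStrategy {
			supported = true
			break
		}
	}
	if !supported {
		return errInvalidCompactionStrategy
	}
	// Note the guard is ShardingEnabled; the diff does not separately check
	// that the sharding strategy is shuffle-sharding, despite the error text.
	if !shardingEnabled && compactionStrategy == "partitioning" {
		return errInvalidCompactionStrategyPartitioning
	}
	return nil
}

func main() {
	fmt.Println(validateCompactionConfig(false, "partitioning")) // rejected
	fmt.Println(validateCompactionConfig(true, "partitioning"))  // <nil>
	fmt.Println(validateCompactionConfig(true, "bogus"))         // rejected
}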
38 changes: 38 additions & 0 deletions pkg/compactor/partition_compaction_grouper.go
@@ -0,0 +1,38 @@
package compactor

import (
"context"

"github.com/go-kit/log"
"github.com/oklog/ulid"
"github.com/thanos-io/objstore"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact"
)

type PartitionCompactionGrouper struct {
ctx context.Context
logger log.Logger
bkt objstore.InstrumentedBucket
}

func NewPartitionCompactionGrouper(
ctx context.Context,
logger log.Logger,
bkt objstore.InstrumentedBucket,
) *PartitionCompactionGrouper {
if logger == nil {
logger = log.NewNopLogger()
}

return &PartitionCompactionGrouper{
ctx: ctx,
logger: logger,
bkt: bkt,
}
}

// Groups function modified from https://github.com/cortexproject/cortex/pull/2616
func (g *PartitionCompactionGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (res []*compact.Group, err error) {
panic("PartitionCompactionGrouper not implemented")
}
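The new grouper is deliberately a stub: Groups panics until the partitioning logic lands in a follow-up. For orientation, a grouper only has to satisfy the single-method Thanos compact.Grouper interface that the factory above returns, so a throwaway no-op implementation would look like the following sketch (assuming the same Thanos modules this file already imports; the type name is made up):

package compactor

import (
	"github.com/oklog/ulid"
	"github.com/thanos-io/thanos/pkg/block/metadata"
	"github.com/thanos-io/thanos/pkg/compact"
)

// noopGrouper has the same Groups signature as PartitionCompactionGrouper
// but schedules no compaction work at all.
type noopGrouper struct{}

func (noopGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) ([]*compact.Group, error) {
	return nil, nil
}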
31 changes: 31 additions & 0 deletions pkg/compactor/partition_compaction_planner.go
@@ -0,0 +1,31 @@
package compactor

import (
"context"

"github.com/go-kit/log"
"github.com/thanos-io/objstore"
"github.com/thanos-io/thanos/pkg/block/metadata"
)

type PartitionCompactionPlanner struct {
ctx context.Context
bkt objstore.InstrumentedBucket
logger log.Logger
}

func NewPartitionCompactionPlanner(
ctx context.Context,
bkt objstore.InstrumentedBucket,
logger log.Logger,
) *PartitionCompactionPlanner {
return &PartitionCompactionPlanner{
ctx: ctx,
bkt: bkt,
logger: logger,
}
}

func (p *PartitionCompactionPlanner) Plan(ctx context.Context, metasByMinTime []*metadata.Meta, errChan chan error, extensions any) ([]*metadata.Meta, error) {
panic("PartitionCompactionPlanner not implemented")
}
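Like the grouper, this planner is scaffolding behind the flag: Plan panics until a real implementation arrives. The seam the commit actually adds is the strategy dispatch inside the factories, sketched standalone below with illustrative stand-ins for the Cortex and Thanos types.

package main

import "fmt"

// planner stands in for Thanos's compact.Planner interface.
type planner interface{ name() string }

type shuffleShardingPlanner struct{}

func (shuffleShardingPlanner) name() string { return "shuffle sharding planner" }

type partitionCompactionPlanner struct{}

func (partitionCompactionPlanner) name() string { return "partition compaction planner" }

// newPlanner mirrors the branch added to plannerFactory: the partitioning
// strategy selects the new planner; anything else keeps the existing one.
func newPlanner(compactionStrategy string) planner {
	if compactionStrategy == "partitioning" {
		return partitionCompactionPlanner{}
	}
	return shuffleShardingPlanner{}
}

func main() {
	fmt.Println(newPlanner("default").name())      // shuffle sharding planner
	fmt.Println(newPlanner("partitioning").name()) // partition compaction planner
}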
4 changes: 4 additions & 0 deletions pkg/util/shard.go
@@ -10,6 +10,10 @@ const (
// Sharding strategies.
ShardingStrategyDefault = "default"
ShardingStrategyShuffle = "shuffle-sharding"

+	// Compaction mode
+	CompactionStrategyDefault      = "default"
+	CompactionStrategyPartitioning = "partitioning"
)

var (
