Skip to content

Commit

Permalink
fix: [cp24]Saperate L0 and Mix trigger interval (#37319)
Browse files Browse the repository at this point in the history
See also: #37108
pr: #37190

- Add MixCompactionTriggerInterval, default 60s
- Add L0CompactionTriggerInterval, default 10s
- Export Single related compaction configs
- Raise SingleCompactionDeltaLogMaxSize from 2MB to 16MB

---------

Signed-off-by: yangxuan <[email protected]>
  • Loading branch information
XuanYang-cn authored Nov 6, 2024
1 parent af5e32d commit 20534a3
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 31 deletions.
23 changes: 17 additions & 6 deletions configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,23 @@ dataCoord:
taskQueueCapacity: 256 # compaction task queue size
rpcTimeout: 10
maxParallelTaskNum: 10
mix:
triggerInterval: 60 # The time interval in seconds to trigger mix compaction
levelzero:
triggerInterval: 10 # The time interval in seconds for trigger L0 compaction
forceTrigger:
minSize: 8388608 # The minmum size in bytes to force trigger a LevelZero Compaction, default as 8MB
maxSize: 67108864 # The maxmum size in bytes to force trigger a LevelZero Compaction, default as 64MB
deltalogMinNum: 10 # The minimum number of deltalog files to force trigger a LevelZero Compaction
deltalogMaxNum: 30 # The maxmum number of deltalog files to force trigger a LevelZero Compaction, default as 30
single:
ratio:
threshold: 0.2 # The ratio threshold of a segment to trigger a single compaction, default as 0.2
deltalog:
maxsize: 16777216 # The deltalog size of a segment to trigger a single compaction, default as 16MB
maxnum: 200 # The deltalog count of a segment to trigger a compaction, default as 200
expiredlog:
maxsize: 10485760 # The expired log size of a segment to trigger a compaction, default as 10MB
clustering:
enable: true # Enable clustering compaction
autoEnable: false # Enable auto clustering compaction
Expand All @@ -568,12 +585,6 @@ dataCoord:
minClusterSizeRatio: 0.01 # minimum cluster size / avg size in Kmeans train
maxClusterSizeRatio: 10 # maximum cluster size / avg size in Kmeans train
maxClusterSize: 5g # maximum cluster size in Kmeans train
levelzero:
forceTrigger:
minSize: 8388608 # The minmum size in bytes to force trigger a LevelZero Compaction, default as 8MB
maxSize: 67108864 # The maxmum size in bytes to force trigger a LevelZero Compaction, default as 64MB
deltalogMinNum: 10 # The minimum number of deltalog files to force trigger a LevelZero Compaction
deltalogMaxNum: 30 # The maxmum number of deltalog files to force trigger a LevelZero Compaction, default as 30
syncSegmentsInterval: 300 # The time interval for regularly syncing segments
enableGarbageCollection: true # Switch value to control if to enable garbage collection to clear the discarded data in MinIO or S3 service.
gc:
Expand Down
4 changes: 1 addition & 3 deletions internal/datacoord/compaction_trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func newCompactionTrigger(
}

func (t *compactionTrigger) start() {
t.globalTrigger = time.NewTicker(Params.DataCoordCfg.GlobalCompactionInterval.GetAsDuration(time.Second))
t.globalTrigger = time.NewTicker(Params.DataCoordCfg.MixCompactionTriggerInterval.GetAsDuration(time.Second))
t.closeWaiter.Add(2)
go func() {
defer logutil.LogPanic()
Expand All @@ -131,8 +131,6 @@ func (t *compactionTrigger) start() {
default:
// no need to handle err in handleSignal
t.handleSignal(signal)
// shouldn't reset, otherwise a frequent flushed collection will affect other collections
// t.globalTrigger.Reset(Params.DataCoordCfg.GlobalCompactionInterval)
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions internal/datacoord/compaction_trigger_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,11 @@ func (m *CompactionTriggerManager) startLoop() {
defer logutil.LogPanic()
defer m.closeWg.Done()

l0Ticker := time.NewTicker(Params.DataCoordCfg.GlobalCompactionInterval.GetAsDuration(time.Second))
l0Ticker := time.NewTicker(Params.DataCoordCfg.L0CompactionTriggerInterval.GetAsDuration(time.Second))
defer l0Ticker.Stop()
clusteringTicker := time.NewTicker(Params.DataCoordCfg.ClusteringCompactionTriggerInterval.GetAsDuration(time.Second))
defer clusteringTicker.Stop()
singleTicker := time.NewTicker(Params.DataCoordCfg.GlobalCompactionInterval.GetAsDuration(time.Second))
singleTicker := time.NewTicker(Params.DataCoordCfg.MixCompactionTriggerInterval.GetAsDuration(time.Second))
defer singleTicker.Stop()
log.Info("Compaction trigger manager start")
for {
Expand Down
63 changes: 47 additions & 16 deletions pkg/util/paramtable/component_param.go
Original file line number Diff line number Diff line change
Expand Up @@ -3156,25 +3156,29 @@ type dataCoordConfig struct {
CompactionTaskPrioritizer ParamItem `refreshable:"true"`
CompactionTaskQueueCapacity ParamItem `refreshable:"false"`

CompactionRPCTimeout ParamItem `refreshable:"true"`
CompactionMaxParallelTasks ParamItem `refreshable:"true"`
CompactionWorkerParalleTasks ParamItem `refreshable:"true"`
MinSegmentToMerge ParamItem `refreshable:"true"`
MaxSegmentToMerge ParamItem `refreshable:"true"`
SegmentSmallProportion ParamItem `refreshable:"true"`
SegmentCompactableProportion ParamItem `refreshable:"true"`
SegmentExpansionRate ParamItem `refreshable:"true"`
CompactionTimeoutInSeconds ParamItem `refreshable:"true"`
CompactionDropToleranceInSeconds ParamItem `refreshable:"true"`
CompactionGCIntervalInSeconds ParamItem `refreshable:"true"`
CompactionCheckIntervalInSeconds ParamItem `refreshable:"false"`
CompactionRPCTimeout ParamItem `refreshable:"true"`
CompactionMaxParallelTasks ParamItem `refreshable:"true"`
CompactionWorkerParalleTasks ParamItem `refreshable:"true"`
MinSegmentToMerge ParamItem `refreshable:"true"`
MaxSegmentToMerge ParamItem `refreshable:"true"`
SegmentSmallProportion ParamItem `refreshable:"true"`
SegmentCompactableProportion ParamItem `refreshable:"true"`
SegmentExpansionRate ParamItem `refreshable:"true"`
CompactionTimeoutInSeconds ParamItem `refreshable:"true"`
CompactionDropToleranceInSeconds ParamItem `refreshable:"true"`
CompactionGCIntervalInSeconds ParamItem `refreshable:"true"`
CompactionCheckIntervalInSeconds ParamItem `refreshable:"false"`
MixCompactionTriggerInterval ParamItem `refreshable:"false"`
L0CompactionTriggerInterval ParamItem `refreshable:"false"`
GlobalCompactionInterval ParamItem `refreshable:"false"`

SingleCompactionRatioThreshold ParamItem `refreshable:"true"`
SingleCompactionDeltaLogMaxSize ParamItem `refreshable:"true"`
SingleCompactionExpiredLogMaxSize ParamItem `refreshable:"true"`
SingleCompactionDeltalogMaxNum ParamItem `refreshable:"true"`
GlobalCompactionInterval ParamItem `refreshable:"false"`
ChannelCheckpointMaxLag ParamItem `refreshable:"true"`
SyncSegmentsInterval ParamItem `refreshable:"false"`

ChannelCheckpointMaxLag ParamItem `refreshable:"true"`
SyncSegmentsInterval ParamItem `refreshable:"false"`

// Clustering Compaction
ClusteringCompactionEnable ParamItem `refreshable:"true"`
Expand Down Expand Up @@ -3541,37 +3545,64 @@ During compaction, the size of segment # of rows is able to exceed segment max #
Key: "dataCoord.compaction.single.ratio.threshold",
Version: "2.0.0",
DefaultValue: "0.2",
Doc: "The ratio threshold of a segment to trigger a single compaction, default as 0.2",
Export: true,
}
p.SingleCompactionRatioThreshold.Init(base.mgr)

p.SingleCompactionDeltaLogMaxSize = ParamItem{
Key: "dataCoord.compaction.single.deltalog.maxsize",
Version: "2.0.0",
DefaultValue: strconv.Itoa(2 * 1024 * 1024),
DefaultValue: "16777216",
Doc: "The deltalog size of a segment to trigger a single compaction, default as 16MB",
Export: true,
}
p.SingleCompactionDeltaLogMaxSize.Init(base.mgr)

p.SingleCompactionExpiredLogMaxSize = ParamItem{
Key: "dataCoord.compaction.single.expiredlog.maxsize",
Version: "2.0.0",
DefaultValue: "10485760",
Doc: "The expired log size of a segment to trigger a compaction, default as 10MB",
Export: true,
}
p.SingleCompactionExpiredLogMaxSize.Init(base.mgr)

p.SingleCompactionDeltalogMaxNum = ParamItem{
Key: "dataCoord.compaction.single.deltalog.maxnum",
Version: "2.0.0",
DefaultValue: "200",
Doc: "The deltalog count of a segment to trigger a compaction, default as 200",
Export: true,
}
p.SingleCompactionDeltalogMaxNum.Init(base.mgr)

p.GlobalCompactionInterval = ParamItem{
Key: "dataCoord.compaction.global.interval",
Version: "2.0.0",
DefaultValue: "60",
Doc: "deprecated",
}
p.GlobalCompactionInterval.Init(base.mgr)

p.MixCompactionTriggerInterval = ParamItem{
Key: "dataCoord.compaction.mix.triggerInterval",
Version: "2.4.15",
Doc: "The time interval in seconds to trigger mix compaction",
DefaultValue: "60",
Export: true,
}
p.MixCompactionTriggerInterval.Init(base.mgr)

p.L0CompactionTriggerInterval = ParamItem{
Key: "dataCoord.compaction.levelzero.triggerInterval",
Version: "2.4.15",
Doc: "The time interval in seconds for trigger L0 compaction",
DefaultValue: "10",
Export: true,
}
p.L0CompactionTriggerInterval.Init(base.mgr)

p.ChannelCheckpointMaxLag = ParamItem{
Key: "dataCoord.compaction.channelMaxCPLag",
Version: "2.4.0",
Expand Down
5 changes: 3 additions & 2 deletions tests/integration/compaction/compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,14 @@ func (s *CompactionSuite) SetupSuite() {
s.MiniClusterSuite.SetupSuite()

paramtable.Init()
paramtable.Get().Save(paramtable.Get().DataCoordCfg.GlobalCompactionInterval.Key, "1")
paramtable.Get().Save(paramtable.Get().DataCoordCfg.MixCompactionTriggerInterval.Key, "1")
paramtable.Get().Save(paramtable.Get().DataCoordCfg.L0CompactionTriggerInterval.Key, "1")
}

func (s *CompactionSuite) TearDownSuite() {
s.MiniClusterSuite.TearDownSuite()

paramtable.Get().Reset(paramtable.Get().DataCoordCfg.GlobalCompactionInterval.Key)
paramtable.Get().Reset(paramtable.Get().DataCoordCfg.L0CompactionTriggerInterval.Key)
}

func TestCompaction(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions tests/integration/compaction/l2_single_compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,9 +275,9 @@ func (s *L2SingleCompactionSuite) TestL2SingleCompaction() {
func TestL2SingleCompaction(t *testing.T) {
paramtable.Init()
// to speed up the test
paramtable.Get().Save(paramtable.Get().DataCoordCfg.GlobalCompactionInterval.Key, "10")
paramtable.Get().Save(paramtable.Get().DataCoordCfg.MixCompactionTriggerInterval.Key, "10")
paramtable.Get().Save(paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerDeltalogMinNum.Key, "0")
defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.GlobalCompactionInterval.Key)
defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.MixCompactionTriggerInterval.Key)
defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerDeltalogMinNum.Key)

suite.Run(t, new(L2SingleCompactionSuite))
Expand Down

0 comments on commit 20534a3

Please sign in to comment.