Skip to content

Commit

Permalink
fix: Saperate L0 and Mix trigger interval
Browse files Browse the repository at this point in the history
See also: milvus-io#37108
pr: milvus-io#37190

- Add MixCompactionTriggerInterval, default 60s
- Add L0CompactionTriggerInterval, default 10s
- Export Single related compaction configs
- Raise SingleCompactionDeltaLogMaxSize from 2MB to 16MB

Signed-off-by: yangxuan <[email protected]>
  • Loading branch information
XuanYang-cn committed Oct 31, 2024
1 parent 6109e9d commit 889e2d3
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 25 deletions.
10 changes: 10 additions & 0 deletions configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,16 @@ dataCoord:
taskQueueCapacity: 256 # compaction task queue size
rpcTimeout: 10
maxParallelTaskNum: 10
dropTolerance: 86400 # Compaction task will be cleaned after finish longer than this time(in seconds)
gcInterval: 1800 # The time interval in seconds for compaction gc
single:
ratio:
threshold: 0.2 # The ratio threshold of a segment to trigger a single compaction, default as 0.2
deltalog:
maxsize: 16777216 # The deltalog size of a segment to trigger a single compaction, default as 16MB
maxnum: 200 # The deltalog count of a segment to trigger a compaction, default as 200
expiredlog:
maxsize: 10485760 # The expired log size of a segment to trigger a compaction, default as 10MB
clustering:
enable: true # Enable clustering compaction
autoEnable: false # Enable auto clustering compaction
Expand Down
4 changes: 1 addition & 3 deletions internal/datacoord/compaction_trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func newCompactionTrigger(
}

func (t *compactionTrigger) start() {
t.globalTrigger = time.NewTicker(Params.DataCoordCfg.GlobalCompactionInterval.GetAsDuration(time.Second))
t.globalTrigger = time.NewTicker(Params.DataCoordCfg.MixCompactionTriggerInterval.GetAsDuration(time.Second))
t.closeWaiter.Add(2)
go func() {
defer logutil.LogPanic()
Expand All @@ -131,8 +131,6 @@ func (t *compactionTrigger) start() {
default:
// no need to handle err in handleSignal
t.handleSignal(signal)
// shouldn't reset, otherwise a frequent flushed collection will affect other collections
// t.globalTrigger.Reset(Params.DataCoordCfg.GlobalCompactionInterval)
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions internal/datacoord/compaction_trigger_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,11 @@ func (m *CompactionTriggerManager) startLoop() {
defer logutil.LogPanic()
defer m.closeWg.Done()

l0Ticker := time.NewTicker(Params.DataCoordCfg.GlobalCompactionInterval.GetAsDuration(time.Second))
l0Ticker := time.NewTicker(Params.DataCoordCfg.L0CompactionTriggerInterval.GetAsDuration(time.Second))
defer l0Ticker.Stop()
clusteringTicker := time.NewTicker(Params.DataCoordCfg.ClusteringCompactionTriggerInterval.GetAsDuration(time.Second))
defer clusteringTicker.Stop()
singleTicker := time.NewTicker(Params.DataCoordCfg.GlobalCompactionInterval.GetAsDuration(time.Second))
singleTicker := time.NewTicker(Params.DataCoordCfg.MixCompactionTriggerInterval.GetAsDuration(time.Second))
defer singleTicker.Stop()
log.Info("Compaction trigger manager start")
for {
Expand Down
60 changes: 44 additions & 16 deletions pkg/util/paramtable/component_param.go
Original file line number Diff line number Diff line change
Expand Up @@ -3156,25 +3156,29 @@ type dataCoordConfig struct {
CompactionTaskPrioritizer ParamItem `refreshable:"true"`
CompactionTaskQueueCapacity ParamItem `refreshable:"false"`

CompactionRPCTimeout ParamItem `refreshable:"true"`
CompactionMaxParallelTasks ParamItem `refreshable:"true"`
CompactionWorkerParalleTasks ParamItem `refreshable:"true"`
MinSegmentToMerge ParamItem `refreshable:"true"`
MaxSegmentToMerge ParamItem `refreshable:"true"`
SegmentSmallProportion ParamItem `refreshable:"true"`
SegmentCompactableProportion ParamItem `refreshable:"true"`
SegmentExpansionRate ParamItem `refreshable:"true"`
CompactionTimeoutInSeconds ParamItem `refreshable:"true"`
CompactionDropToleranceInSeconds ParamItem `refreshable:"true"`
CompactionGCIntervalInSeconds ParamItem `refreshable:"true"`
CompactionCheckIntervalInSeconds ParamItem `refreshable:"false"`
CompactionRPCTimeout ParamItem `refreshable:"true"`
CompactionMaxParallelTasks ParamItem `refreshable:"true"`
CompactionWorkerParalleTasks ParamItem `refreshable:"true"`
MinSegmentToMerge ParamItem `refreshable:"true"`
MaxSegmentToMerge ParamItem `refreshable:"true"`
SegmentSmallProportion ParamItem `refreshable:"true"`
SegmentCompactableProportion ParamItem `refreshable:"true"`
SegmentExpansionRate ParamItem `refreshable:"true"`
CompactionTimeoutInSeconds ParamItem `refreshable:"true"`
CompactionDropToleranceInSeconds ParamItem `refreshable:"true"`
CompactionGCIntervalInSeconds ParamItem `refreshable:"true"`
CompactionCheckIntervalInSeconds ParamItem `refreshable:"false"`
MixCompactionTriggerInterval ParamItem `refreshable:"false"`
L0CompactionTriggerInterval ParamItem `refreshable:"false"`
GlobalCompactionInterval ParamItem `refreshable:"false"`

SingleCompactionRatioThreshold ParamItem `refreshable:"true"`
SingleCompactionDeltaLogMaxSize ParamItem `refreshable:"true"`
SingleCompactionExpiredLogMaxSize ParamItem `refreshable:"true"`
SingleCompactionDeltalogMaxNum ParamItem `refreshable:"true"`
GlobalCompactionInterval ParamItem `refreshable:"false"`
ChannelCheckpointMaxLag ParamItem `refreshable:"true"`
SyncSegmentsInterval ParamItem `refreshable:"false"`

ChannelCheckpointMaxLag ParamItem `refreshable:"true"`
SyncSegmentsInterval ParamItem `refreshable:"false"`

// Clustering Compaction
ClusteringCompactionEnable ParamItem `refreshable:"true"`
Expand Down Expand Up @@ -3541,27 +3545,35 @@ During compaction, the size of segment # of rows is able to exceed segment max #
Key: "dataCoord.compaction.single.ratio.threshold",
Version: "2.0.0",
DefaultValue: "0.2",
Doc: "The ratio threshold of a segment to trigger a single compaction, default as 0.2",
Export: true,
}
p.SingleCompactionRatioThreshold.Init(base.mgr)

p.SingleCompactionDeltaLogMaxSize = ParamItem{
Key: "dataCoord.compaction.single.deltalog.maxsize",
Version: "2.0.0",
DefaultValue: strconv.Itoa(2 * 1024 * 1024),
DefaultValue: "16777216",
Doc: "The deltalog size of a segment to trigger a single compaction, default as 16MB",
Export: true,
}
p.SingleCompactionDeltaLogMaxSize.Init(base.mgr)

p.SingleCompactionExpiredLogMaxSize = ParamItem{
Key: "dataCoord.compaction.single.expiredlog.maxsize",
Version: "2.0.0",
DefaultValue: "10485760",
Doc: "The expired log size of a segment to trigger a compaction, default as 10MB",
Export: true,
}
p.SingleCompactionExpiredLogMaxSize.Init(base.mgr)

p.SingleCompactionDeltalogMaxNum = ParamItem{
Key: "dataCoord.compaction.single.deltalog.maxnum",
Version: "2.0.0",
DefaultValue: "200",
Doc: "The deltalog count of a segment to trigger a compaction, default as 200",
Export: true,
}
p.SingleCompactionDeltalogMaxNum.Init(base.mgr)

Expand All @@ -3572,6 +3584,22 @@ During compaction, the size of segment # of rows is able to exceed segment max #
}
p.GlobalCompactionInterval.Init(base.mgr)

p.MixCompactionTriggerInterval = ParamItem{
Key: "dataCoord.compaction.mix.triggerInterval",
Version: "2.5.0",
Doc: "The time interval in seconds for trigger mix compaction, default as 60s",
DefaultValue: "60",
}
p.MixCompactionTriggerInterval.Init(base.mgr)

p.L0CompactionTriggerInterval = ParamItem{
Key: "dataCoord.compaction.levelzero.triggerInterval",
Version: "2.5.0",
Doc: "The time interval in seconds for trigger L0 compaction, default as 10s",
DefaultValue: "10",
}
p.L0CompactionTriggerInterval.Init(base.mgr)

p.ChannelCheckpointMaxLag = ParamItem{
Key: "dataCoord.compaction.channelMaxCPLag",
Version: "2.4.0",
Expand Down
5 changes: 3 additions & 2 deletions tests/integration/compaction/compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,14 @@ func (s *CompactionSuite) SetupSuite() {
s.MiniClusterSuite.SetupSuite()

paramtable.Init()
paramtable.Get().Save(paramtable.Get().DataCoordCfg.GlobalCompactionInterval.Key, "1")
paramtable.Get().Save(paramtable.Get().DataCoordCfg.MixCompactionTriggerInterval.Key, "1")
paramtable.Get().Save(paramtable.Get().DataCoordCfg.L0CompactionTriggerInterval.Key, "1")
}

func (s *CompactionSuite) TearDownSuite() {
s.MiniClusterSuite.TearDownSuite()

paramtable.Get().Reset(paramtable.Get().DataCoordCfg.GlobalCompactionInterval.Key)
paramtable.Get().Reset(paramtable.Get().DataCoordCfg.L0CompactionTriggerInterval.Key)
}

func TestCompaction(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions tests/integration/compaction/l2_single_compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,9 +275,9 @@ func (s *L2SingleCompactionSuite) TestL2SingleCompaction() {
func TestL2SingleCompaction(t *testing.T) {
paramtable.Init()
// to speed up the test
paramtable.Get().Save(paramtable.Get().DataCoordCfg.GlobalCompactionInterval.Key, "10")
paramtable.Get().Save(paramtable.Get().DataCoordCfg.MixCompactionTriggerInterval.Key, "10")
paramtable.Get().Save(paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerDeltalogMinNum.Key, "0")
defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.GlobalCompactionInterval.Key)
defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.MixCompactionTriggerInterval.Key)
defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerDeltalogMinNum.Key)

suite.Run(t, new(L2SingleCompactionSuite))
Expand Down

0 comments on commit 889e2d3

Please sign in to comment.