Skip to content

Commit

Permalink
fix: Separate L0 and Mix trigger interval (#37190)
Browse files Browse the repository at this point in the history
See also: #37108

- Add MixCompactionTriggerInterval, default 60s
- Add L0CompactionTriggerInterval, default 10s
- Export Single related compaction configs
- Raise SingleCompactionDeltaLogMaxSize from 2MB to 16MB

---------

Signed-off-by: yangxuan <[email protected]>
  • Loading branch information
XuanYang-cn authored Nov 12, 2024
1 parent 2a4c00d commit a45a288
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 34 deletions.
27 changes: 19 additions & 8 deletions configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ etcd:
password: # password for etcd authentication

metastore:
type: etcd # Default value: etcd, Valid values: [etcd, tikv]
type: etcd # Default value: etcd, Valid values: [etcd, tikv]
snapshot:
ttl: 86400 # snapshot ttl in seconds
reserveTime: 3600 # snapshot reserve time in seconds
Expand Down Expand Up @@ -564,6 +564,23 @@ dataCoord:
maxParallelTaskNum: 10
dropTolerance: 86400 # Compaction task will be cleaned after finish longer than this time(in seconds)
gcInterval: 1800 # The time interval in seconds for compaction gc
mix:
triggerInterval: 60 # The time interval in seconds to trigger mix compaction
levelzero:
triggerInterval: 10 # The time interval in seconds for trigger L0 compaction
forceTrigger:
minSize: 8388608 # The minmum size in bytes to force trigger a LevelZero Compaction, default as 8MB
maxSize: 67108864 # The maxmum size in bytes to force trigger a LevelZero Compaction, default as 64MB
deltalogMinNum: 10 # The minimum number of deltalog files to force trigger a LevelZero Compaction
deltalogMaxNum: 30 # The maxmum number of deltalog files to force trigger a LevelZero Compaction, default as 30
single:
ratio:
threshold: 0.2 # The ratio threshold of a segment to trigger a single compaction, default as 0.2
deltalog:
maxsize: 16777216 # The deltalog size of a segment to trigger a single compaction, default as 16MB
maxnum: 200 # The deltalog count of a segment to trigger a compaction, default as 200
expiredlog:
maxsize: 10485760 # The expired log size of a segment to trigger a compaction, default as 10MB
clustering:
enable: true # Enable clustering compaction
autoEnable: false # Enable auto clustering compaction
Expand All @@ -579,12 +596,6 @@ dataCoord:
minClusterSizeRatio: 0.01 # minimum cluster size / avg size in Kmeans train
maxClusterSizeRatio: 10 # maximum cluster size / avg size in Kmeans train
maxClusterSize: 5g # maximum cluster size in Kmeans train
levelzero:
forceTrigger:
minSize: 8388608 # The minmum size in bytes to force trigger a LevelZero Compaction, default as 8MB
maxSize: 67108864 # The maxmum size in bytes to force trigger a LevelZero Compaction, default as 64MB
deltalogMinNum: 10 # The minimum number of deltalog files to force trigger a LevelZero Compaction
deltalogMaxNum: 30 # The maxmum number of deltalog files to force trigger a LevelZero Compaction, default as 30
syncSegmentsInterval: 300 # The time interval for regularly syncing segments
enableGarbageCollection: true # Switch value to control if to enable garbage collection to clear the discarded data in MinIO or S3 service.
gc:
Expand Down Expand Up @@ -1077,4 +1088,4 @@ knowhere:
pq_code_budget_gb_ratio: 0.125 # Size limit on the PQ code (compared with raw data)
search_cache_budget_gb_ratio: 0.1 # Ratio of cached node numbers to raw data
search: # Diskann search params
beam_width_ratio: 4.0 # Ratio between the maximum number of IO requests per search iteration and CPU number.
beam_width_ratio: 4.0 # Ratio between the maximum number of IO requests per search iteration and CPU number
4 changes: 1 addition & 3 deletions internal/datacoord/compaction_trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func newCompactionTrigger(
}

func (t *compactionTrigger) start() {
t.globalTrigger = time.NewTicker(Params.DataCoordCfg.GlobalCompactionInterval.GetAsDuration(time.Second))
t.globalTrigger = time.NewTicker(Params.DataCoordCfg.MixCompactionTriggerInterval.GetAsDuration(time.Second))
t.closeWaiter.Add(2)
go func() {
defer logutil.LogPanic()
Expand All @@ -132,8 +132,6 @@ func (t *compactionTrigger) start() {
default:
// no need to handle err in handleSignal
t.handleSignal(signal)
// shouldn't reset, otherwise a frequent flushed collection will affect other collections
// t.globalTrigger.Reset(Params.DataCoordCfg.GlobalCompactionInterval)
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions internal/datacoord/compaction_trigger_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,11 @@ func (m *CompactionTriggerManager) startLoop() {
defer logutil.LogPanic()
defer m.closeWg.Done()

l0Ticker := time.NewTicker(Params.DataCoordCfg.GlobalCompactionInterval.GetAsDuration(time.Second))
l0Ticker := time.NewTicker(Params.DataCoordCfg.L0CompactionTriggerInterval.GetAsDuration(time.Second))
defer l0Ticker.Stop()
clusteringTicker := time.NewTicker(Params.DataCoordCfg.ClusteringCompactionTriggerInterval.GetAsDuration(time.Second))
defer clusteringTicker.Stop()
singleTicker := time.NewTicker(Params.DataCoordCfg.GlobalCompactionInterval.GetAsDuration(time.Second))
singleTicker := time.NewTicker(Params.DataCoordCfg.MixCompactionTriggerInterval.GetAsDuration(time.Second))
defer singleTicker.Stop()
log.Info("Compaction trigger manager start")
for {
Expand Down
63 changes: 47 additions & 16 deletions pkg/util/paramtable/component_param.go
Original file line number Diff line number Diff line change
Expand Up @@ -3202,25 +3202,29 @@ type dataCoordConfig struct {
CompactionTaskPrioritizer ParamItem `refreshable:"true"`
CompactionTaskQueueCapacity ParamItem `refreshable:"false"`

CompactionRPCTimeout ParamItem `refreshable:"true"`
CompactionMaxParallelTasks ParamItem `refreshable:"true"`
CompactionWorkerParalleTasks ParamItem `refreshable:"true"`
MinSegmentToMerge ParamItem `refreshable:"true"`
MaxSegmentToMerge ParamItem `refreshable:"true"`
SegmentSmallProportion ParamItem `refreshable:"true"`
SegmentCompactableProportion ParamItem `refreshable:"true"`
SegmentExpansionRate ParamItem `refreshable:"true"`
CompactionTimeoutInSeconds ParamItem `refreshable:"true"`
CompactionDropToleranceInSeconds ParamItem `refreshable:"true"`
CompactionGCIntervalInSeconds ParamItem `refreshable:"true"`
CompactionCheckIntervalInSeconds ParamItem `refreshable:"false"`
CompactionRPCTimeout ParamItem `refreshable:"true"`
CompactionMaxParallelTasks ParamItem `refreshable:"true"`
CompactionWorkerParalleTasks ParamItem `refreshable:"true"`
MinSegmentToMerge ParamItem `refreshable:"true"`
MaxSegmentToMerge ParamItem `refreshable:"true"`
SegmentSmallProportion ParamItem `refreshable:"true"`
SegmentCompactableProportion ParamItem `refreshable:"true"`
SegmentExpansionRate ParamItem `refreshable:"true"`
CompactionTimeoutInSeconds ParamItem `refreshable:"true"`
CompactionDropToleranceInSeconds ParamItem `refreshable:"true"`
CompactionGCIntervalInSeconds ParamItem `refreshable:"true"`
CompactionCheckIntervalInSeconds ParamItem `refreshable:"false"`
MixCompactionTriggerInterval ParamItem `refreshable:"false"`
L0CompactionTriggerInterval ParamItem `refreshable:"false"`
GlobalCompactionInterval ParamItem `refreshable:"false"`

SingleCompactionRatioThreshold ParamItem `refreshable:"true"`
SingleCompactionDeltaLogMaxSize ParamItem `refreshable:"true"`
SingleCompactionExpiredLogMaxSize ParamItem `refreshable:"true"`
SingleCompactionDeltalogMaxNum ParamItem `refreshable:"true"`
GlobalCompactionInterval ParamItem `refreshable:"false"`
ChannelCheckpointMaxLag ParamItem `refreshable:"true"`
SyncSegmentsInterval ParamItem `refreshable:"false"`

ChannelCheckpointMaxLag ParamItem `refreshable:"true"`
SyncSegmentsInterval ParamItem `refreshable:"false"`

// Clustering Compaction
ClusteringCompactionEnable ParamItem `refreshable:"true"`
Expand Down Expand Up @@ -3592,37 +3596,64 @@ During compaction, the size of segment # of rows is able to exceed segment max #
Key: "dataCoord.compaction.single.ratio.threshold",
Version: "2.0.0",
DefaultValue: "0.2",
Doc: "The ratio threshold of a segment to trigger a single compaction, default as 0.2",
Export: true,
}
p.SingleCompactionRatioThreshold.Init(base.mgr)

p.SingleCompactionDeltaLogMaxSize = ParamItem{
Key: "dataCoord.compaction.single.deltalog.maxsize",
Version: "2.0.0",
DefaultValue: strconv.Itoa(2 * 1024 * 1024),
DefaultValue: "16777216",
Doc: "The deltalog size of a segment to trigger a single compaction, default as 16MB",
Export: true,
}
p.SingleCompactionDeltaLogMaxSize.Init(base.mgr)

p.SingleCompactionExpiredLogMaxSize = ParamItem{
Key: "dataCoord.compaction.single.expiredlog.maxsize",
Version: "2.0.0",
DefaultValue: "10485760",
Doc: "The expired log size of a segment to trigger a compaction, default as 10MB",
Export: true,
}
p.SingleCompactionExpiredLogMaxSize.Init(base.mgr)

p.SingleCompactionDeltalogMaxNum = ParamItem{
Key: "dataCoord.compaction.single.deltalog.maxnum",
Version: "2.0.0",
DefaultValue: "200",
Doc: "The deltalog count of a segment to trigger a compaction, default as 200",
Export: true,
}
p.SingleCompactionDeltalogMaxNum.Init(base.mgr)

p.GlobalCompactionInterval = ParamItem{
Key: "dataCoord.compaction.global.interval",
Version: "2.0.0",
DefaultValue: "60",
Doc: "deprecated",
}
p.GlobalCompactionInterval.Init(base.mgr)

p.MixCompactionTriggerInterval = ParamItem{
Key: "dataCoord.compaction.mix.triggerInterval",
Version: "2.4.15",
Doc: "The time interval in seconds to trigger mix compaction",
DefaultValue: "60",
Export: true,
}
p.MixCompactionTriggerInterval.Init(base.mgr)

p.L0CompactionTriggerInterval = ParamItem{
Key: "dataCoord.compaction.levelzero.triggerInterval",
Version: "2.4.15",
Doc: "The time interval in seconds for trigger L0 compaction",
DefaultValue: "10",
Export: true,
}
p.L0CompactionTriggerInterval.Init(base.mgr)

p.ChannelCheckpointMaxLag = ParamItem{
Key: "dataCoord.compaction.channelMaxCPLag",
Version: "2.4.0",
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/paramtable/service_param.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ func (p *MetaStoreConfig) Init(base *BaseTable) {
Key: "metastore.type",
Version: "2.2.0",
DefaultValue: util.MetaStoreTypeEtcd,
Doc: `Default value: etcd, Valid values: [etcd, tikv] `,
Doc: `Default value: etcd, Valid values: [etcd, tikv]`,
Export: true,
}
p.MetaStoreType.Init(base.mgr)
Expand Down
6 changes: 4 additions & 2 deletions tests/integration/compaction/compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,15 @@ func (s *CompactionSuite) SetupSuite() {
s.MiniClusterSuite.SetupSuite()

paramtable.Init()
paramtable.Get().Save(paramtable.Get().DataCoordCfg.GlobalCompactionInterval.Key, "1")
paramtable.Get().Save(paramtable.Get().DataCoordCfg.MixCompactionTriggerInterval.Key, "1")
paramtable.Get().Save(paramtable.Get().DataCoordCfg.L0CompactionTriggerInterval.Key, "1")
}

func (s *CompactionSuite) TearDownSuite() {
s.MiniClusterSuite.TearDownSuite()

paramtable.Get().Reset(paramtable.Get().DataCoordCfg.GlobalCompactionInterval.Key)
paramtable.Get().Reset(paramtable.Get().DataCoordCfg.L0CompactionTriggerInterval.Key)
paramtable.Get().Reset(paramtable.Get().DataCoordCfg.MixCompactionTriggerInterval.Key)
}

func TestCompaction(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions tests/integration/compaction/l2_single_compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,9 +273,9 @@ func (s *L2SingleCompactionSuite) TestL2SingleCompaction() {
func TestL2SingleCompaction(t *testing.T) {
paramtable.Init()
// to speed up the test
paramtable.Get().Save(paramtable.Get().DataCoordCfg.GlobalCompactionInterval.Key, "10")
paramtable.Get().Save(paramtable.Get().DataCoordCfg.MixCompactionTriggerInterval.Key, "10")
paramtable.Get().Save(paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerDeltalogMinNum.Key, "0")
defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.GlobalCompactionInterval.Key)
defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.MixCompactionTriggerInterval.Key)
defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerDeltalogMinNum.Key)

suite.Run(t, new(L2SingleCompactionSuite))
Expand Down

0 comments on commit a45a288

Please sign in to comment.