Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Separate L0 and Mix trigger interval #37190

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 19 additions & 8 deletions configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ etcd:
password: # password for etcd authentication

metastore:
type: etcd # Default value: etcd, Valid values: [etcd, tikv]
type: etcd # Default value: etcd, Valid values: [etcd, tikv]
snapshot:
ttl: 86400 # snapshot ttl in seconds
reserveTime: 3600 # snapshot reserve time in seconds
Expand Down Expand Up @@ -564,6 +564,23 @@ dataCoord:
maxParallelTaskNum: 10
dropTolerance: 86400 # Compaction task will be cleaned after finish longer than this time(in seconds)
gcInterval: 1800 # The time interval in seconds for compaction gc
mix:
triggerInterval: 60 # The time interval in seconds to trigger mix compaction
levelzero:
triggerInterval: 10 # The time interval in seconds for trigger L0 compaction
forceTrigger:
minSize: 8388608 # The minmum size in bytes to force trigger a LevelZero Compaction, default as 8MB
maxSize: 67108864 # The maxmum size in bytes to force trigger a LevelZero Compaction, default as 64MB
deltalogMinNum: 10 # The minimum number of deltalog files to force trigger a LevelZero Compaction
deltalogMaxNum: 30 # The maxmum number of deltalog files to force trigger a LevelZero Compaction, default as 30
single:
ratio:
threshold: 0.2 # The ratio threshold of a segment to trigger a single compaction, default as 0.2
deltalog:
maxsize: 16777216 # The deltalog size of a segment to trigger a single compaction, default as 16MB
maxnum: 200 # The deltalog count of a segment to trigger a compaction, default as 200
expiredlog:
maxsize: 10485760 # The expired log size of a segment to trigger a compaction, default as 10MB
clustering:
enable: true # Enable clustering compaction
autoEnable: false # Enable auto clustering compaction
Expand All @@ -579,12 +596,6 @@ dataCoord:
minClusterSizeRatio: 0.01 # minimum cluster size / avg size in Kmeans train
maxClusterSizeRatio: 10 # maximum cluster size / avg size in Kmeans train
maxClusterSize: 5g # maximum cluster size in Kmeans train
levelzero:
forceTrigger:
minSize: 8388608 # The minmum size in bytes to force trigger a LevelZero Compaction, default as 8MB
maxSize: 67108864 # The maxmum size in bytes to force trigger a LevelZero Compaction, default as 64MB
deltalogMinNum: 10 # The minimum number of deltalog files to force trigger a LevelZero Compaction
deltalogMaxNum: 30 # The maxmum number of deltalog files to force trigger a LevelZero Compaction, default as 30
syncSegmentsInterval: 300 # The time interval for regularly syncing segments
enableGarbageCollection: true # Switch value to control if to enable garbage collection to clear the discarded data in MinIO or S3 service.
gc:
Expand Down Expand Up @@ -1077,4 +1088,4 @@ knowhere:
pq_code_budget_gb_ratio: 0.125 # Size limit on the PQ code (compared with raw data)
search_cache_budget_gb_ratio: 0.1 # Ratio of cached node numbers to raw data
search: # Diskann search params
beam_width_ratio: 4.0 # Ratio between the maximum number of IO requests per search iteration and CPU number.
beam_width_ratio: 4.0 # Ratio between the maximum number of IO requests per search iteration and CPU number
4 changes: 1 addition & 3 deletions internal/datacoord/compaction_trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func newCompactionTrigger(
}

func (t *compactionTrigger) start() {
t.globalTrigger = time.NewTicker(Params.DataCoordCfg.GlobalCompactionInterval.GetAsDuration(time.Second))
t.globalTrigger = time.NewTicker(Params.DataCoordCfg.MixCompactionTriggerInterval.GetAsDuration(time.Second))
t.closeWaiter.Add(2)
go func() {
defer logutil.LogPanic()
Expand All @@ -132,8 +132,6 @@ func (t *compactionTrigger) start() {
default:
// no need to handle err in handleSignal
t.handleSignal(signal)
// shouldn't reset, otherwise a frequent flushed collection will affect other collections
// t.globalTrigger.Reset(Params.DataCoordCfg.GlobalCompactionInterval)
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions internal/datacoord/compaction_trigger_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,11 @@ func (m *CompactionTriggerManager) startLoop() {
defer logutil.LogPanic()
defer m.closeWg.Done()

l0Ticker := time.NewTicker(Params.DataCoordCfg.GlobalCompactionInterval.GetAsDuration(time.Second))
l0Ticker := time.NewTicker(Params.DataCoordCfg.L0CompactionTriggerInterval.GetAsDuration(time.Second))
defer l0Ticker.Stop()
clusteringTicker := time.NewTicker(Params.DataCoordCfg.ClusteringCompactionTriggerInterval.GetAsDuration(time.Second))
defer clusteringTicker.Stop()
singleTicker := time.NewTicker(Params.DataCoordCfg.GlobalCompactionInterval.GetAsDuration(time.Second))
singleTicker := time.NewTicker(Params.DataCoordCfg.MixCompactionTriggerInterval.GetAsDuration(time.Second))
defer singleTicker.Stop()
log.Info("Compaction trigger manager start")
for {
Expand Down
63 changes: 47 additions & 16 deletions pkg/util/paramtable/component_param.go
Original file line number Diff line number Diff line change
Expand Up @@ -3202,25 +3202,29 @@ type dataCoordConfig struct {
CompactionTaskPrioritizer ParamItem `refreshable:"true"`
CompactionTaskQueueCapacity ParamItem `refreshable:"false"`

CompactionRPCTimeout ParamItem `refreshable:"true"`
CompactionMaxParallelTasks ParamItem `refreshable:"true"`
CompactionWorkerParalleTasks ParamItem `refreshable:"true"`
MinSegmentToMerge ParamItem `refreshable:"true"`
MaxSegmentToMerge ParamItem `refreshable:"true"`
SegmentSmallProportion ParamItem `refreshable:"true"`
SegmentCompactableProportion ParamItem `refreshable:"true"`
SegmentExpansionRate ParamItem `refreshable:"true"`
CompactionTimeoutInSeconds ParamItem `refreshable:"true"`
CompactionDropToleranceInSeconds ParamItem `refreshable:"true"`
CompactionGCIntervalInSeconds ParamItem `refreshable:"true"`
CompactionCheckIntervalInSeconds ParamItem `refreshable:"false"`
CompactionRPCTimeout ParamItem `refreshable:"true"`
CompactionMaxParallelTasks ParamItem `refreshable:"true"`
CompactionWorkerParalleTasks ParamItem `refreshable:"true"`
MinSegmentToMerge ParamItem `refreshable:"true"`
MaxSegmentToMerge ParamItem `refreshable:"true"`
SegmentSmallProportion ParamItem `refreshable:"true"`
SegmentCompactableProportion ParamItem `refreshable:"true"`
SegmentExpansionRate ParamItem `refreshable:"true"`
CompactionTimeoutInSeconds ParamItem `refreshable:"true"`
CompactionDropToleranceInSeconds ParamItem `refreshable:"true"`
CompactionGCIntervalInSeconds ParamItem `refreshable:"true"`
CompactionCheckIntervalInSeconds ParamItem `refreshable:"false"`
MixCompactionTriggerInterval ParamItem `refreshable:"false"`
L0CompactionTriggerInterval ParamItem `refreshable:"false"`
GlobalCompactionInterval ParamItem `refreshable:"false"`

SingleCompactionRatioThreshold ParamItem `refreshable:"true"`
SingleCompactionDeltaLogMaxSize ParamItem `refreshable:"true"`
SingleCompactionExpiredLogMaxSize ParamItem `refreshable:"true"`
SingleCompactionDeltalogMaxNum ParamItem `refreshable:"true"`
GlobalCompactionInterval ParamItem `refreshable:"false"`
ChannelCheckpointMaxLag ParamItem `refreshable:"true"`
SyncSegmentsInterval ParamItem `refreshable:"false"`

ChannelCheckpointMaxLag ParamItem `refreshable:"true"`
SyncSegmentsInterval ParamItem `refreshable:"false"`

// Clustering Compaction
ClusteringCompactionEnable ParamItem `refreshable:"true"`
Expand Down Expand Up @@ -3592,37 +3596,64 @@ During compaction, the size of segment # of rows is able to exceed segment max #
Key: "dataCoord.compaction.single.ratio.threshold",
Version: "2.0.0",
DefaultValue: "0.2",
Doc: "The ratio threshold of a segment to trigger a single compaction, default as 0.2",
Export: true,
}
p.SingleCompactionRatioThreshold.Init(base.mgr)

p.SingleCompactionDeltaLogMaxSize = ParamItem{
Key: "dataCoord.compaction.single.deltalog.maxsize",
Version: "2.0.0",
DefaultValue: strconv.Itoa(2 * 1024 * 1024),
DefaultValue: "16777216",
Doc: "The deltalog size of a segment to trigger a single compaction, default as 16MB",
Export: true,
}
p.SingleCompactionDeltaLogMaxSize.Init(base.mgr)

p.SingleCompactionExpiredLogMaxSize = ParamItem{
Key: "dataCoord.compaction.single.expiredlog.maxsize",
Version: "2.0.0",
DefaultValue: "10485760",
Doc: "The expired log size of a segment to trigger a compaction, default as 10MB",
Export: true,
}
p.SingleCompactionExpiredLogMaxSize.Init(base.mgr)

p.SingleCompactionDeltalogMaxNum = ParamItem{
Key: "dataCoord.compaction.single.deltalog.maxnum",
Version: "2.0.0",
DefaultValue: "200",
Doc: "The deltalog count of a segment to trigger a compaction, default as 200",
Export: true,
}
p.SingleCompactionDeltalogMaxNum.Init(base.mgr)

p.GlobalCompactionInterval = ParamItem{
Key: "dataCoord.compaction.global.interval",
Version: "2.0.0",
DefaultValue: "60",
Doc: "deprecated",
}
p.GlobalCompactionInterval.Init(base.mgr)

p.MixCompactionTriggerInterval = ParamItem{
Key: "dataCoord.compaction.mix.triggerInterval",
Version: "2.4.15",
Doc: "The time interval in seconds to trigger mix compaction",
DefaultValue: "60",
Export: true,
}
p.MixCompactionTriggerInterval.Init(base.mgr)

p.L0CompactionTriggerInterval = ParamItem{
Key: "dataCoord.compaction.levelzero.triggerInterval",
Version: "2.4.15",
Doc: "The time interval in seconds for trigger L0 compaction",
DefaultValue: "10",
Export: true,
}
p.L0CompactionTriggerInterval.Init(base.mgr)

p.ChannelCheckpointMaxLag = ParamItem{
Key: "dataCoord.compaction.channelMaxCPLag",
Version: "2.4.0",
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/paramtable/service_param.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ func (p *MetaStoreConfig) Init(base *BaseTable) {
Key: "metastore.type",
Version: "2.2.0",
DefaultValue: util.MetaStoreTypeEtcd,
Doc: `Default value: etcd, Valid values: [etcd, tikv] `,
Doc: `Default value: etcd, Valid values: [etcd, tikv]`,
Export: true,
}
p.MetaStoreType.Init(base.mgr)
Expand Down
6 changes: 4 additions & 2 deletions tests/integration/compaction/compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,15 @@ func (s *CompactionSuite) SetupSuite() {
s.MiniClusterSuite.SetupSuite()

paramtable.Init()
paramtable.Get().Save(paramtable.Get().DataCoordCfg.GlobalCompactionInterval.Key, "1")
paramtable.Get().Save(paramtable.Get().DataCoordCfg.MixCompactionTriggerInterval.Key, "1")
paramtable.Get().Save(paramtable.Get().DataCoordCfg.L0CompactionTriggerInterval.Key, "1")
}

func (s *CompactionSuite) TearDownSuite() {
s.MiniClusterSuite.TearDownSuite()

paramtable.Get().Reset(paramtable.Get().DataCoordCfg.GlobalCompactionInterval.Key)
paramtable.Get().Reset(paramtable.Get().DataCoordCfg.L0CompactionTriggerInterval.Key)
congqixia marked this conversation as resolved.
Show resolved Hide resolved
paramtable.Get().Reset(paramtable.Get().DataCoordCfg.MixCompactionTriggerInterval.Key)
}

func TestCompaction(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions tests/integration/compaction/l2_single_compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,9 +273,9 @@ func (s *L2SingleCompactionSuite) TestL2SingleCompaction() {
func TestL2SingleCompaction(t *testing.T) {
paramtable.Init()
// to speed up the test
paramtable.Get().Save(paramtable.Get().DataCoordCfg.GlobalCompactionInterval.Key, "10")
paramtable.Get().Save(paramtable.Get().DataCoordCfg.MixCompactionTriggerInterval.Key, "10")
paramtable.Get().Save(paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerDeltalogMinNum.Key, "0")
defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.GlobalCompactionInterval.Key)
defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.MixCompactionTriggerInterval.Key)
defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerDeltalogMinNum.Key)

suite.Run(t, new(L2SingleCompactionSuite))
Expand Down
Loading