diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 9d6de7e00cb2e..d006a0883d34c 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -564,6 +564,14 @@ dataCoord: workerMaxParallelTaskNum: 2 dropTolerance: 86400 # Compaction task will be cleaned after finish longer than this time(in seconds) gcInterval: 1800 # The time interval in seconds for compaction gc + single: + ratio: + threshold: 0.2 # The ratio threshold of a segment to trigger a single compaction, default as 0.2 + deltalog: + maxsize: 16777216 # The deltalog size of a segment to trigger a single compaction, default as 16MB + maxnum: 200 # The deltalog count of a segment to trigger a compaction, default as 200 + expiredlog: + maxsize: 10485760 # The expired log size of a segment to trigger a compaction, default as 10MB clustering: enable: true # Enable clustering compaction autoEnable: false # Enable auto clustering compaction diff --git a/internal/datacoord/compaction_trigger.go b/internal/datacoord/compaction_trigger.go index 6e27212d36e77..2ba88d107551e 100644 --- a/internal/datacoord/compaction_trigger.go +++ b/internal/datacoord/compaction_trigger.go @@ -109,7 +109,7 @@ func newCompactionTrigger( } func (t *compactionTrigger) start() { - t.globalTrigger = time.NewTicker(Params.DataCoordCfg.GlobalCompactionInterval.GetAsDuration(time.Second)) + t.globalTrigger = time.NewTicker(Params.DataCoordCfg.MixCompactionTriggerInterval.GetAsDuration(time.Second)) t.closeWaiter.Add(2) go func() { defer logutil.LogPanic() @@ -132,8 +132,6 @@ func (t *compactionTrigger) start() { default: // no need to handle err in handleSignal t.handleSignal(signal) - // shouldn't reset, otherwise a frequent flushed collection will affect other collections - // t.globalTrigger.Reset(Params.DataCoordCfg.GlobalCompactionInterval) } } } diff --git a/internal/datacoord/compaction_trigger_v2.go b/internal/datacoord/compaction_trigger_v2.go index 05474bba5402b..25ffae444b3b0 100644 --- a/internal/datacoord/compaction_trigger_v2.go +++ b/internal/datacoord/compaction_trigger_v2.go @@ -108,11 +108,11 @@ func (m *CompactionTriggerManager) startLoop() { defer logutil.LogPanic() defer m.closeWg.Done() - l0Ticker := time.NewTicker(Params.DataCoordCfg.GlobalCompactionInterval.GetAsDuration(time.Second)) + l0Ticker := time.NewTicker(Params.DataCoordCfg.L0CompactionTriggerInterval.GetAsDuration(time.Second)) defer l0Ticker.Stop() clusteringTicker := time.NewTicker(Params.DataCoordCfg.ClusteringCompactionTriggerInterval.GetAsDuration(time.Second)) defer clusteringTicker.Stop() - singleTicker := time.NewTicker(Params.DataCoordCfg.GlobalCompactionInterval.GetAsDuration(time.Second)) + singleTicker := time.NewTicker(Params.DataCoordCfg.MixCompactionTriggerInterval.GetAsDuration(time.Second)) defer singleTicker.Stop() log.Info("Compaction trigger manager start") for { diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index c8ffa4babd23c..038b767b96d2a 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -3189,25 +3189,29 @@ type dataCoordConfig struct { IndexBasedCompaction ParamItem `refreshable:"true"` CompactionTaskPrioritizer ParamItem `refreshable:"true"` - CompactionRPCTimeout ParamItem `refreshable:"true"` - CompactionMaxParallelTasks ParamItem `refreshable:"true"` - CompactionWorkerParalleTasks ParamItem `refreshable:"true"` - MinSegmentToMerge ParamItem `refreshable:"true"` - MaxSegmentToMerge ParamItem `refreshable:"true"` - SegmentSmallProportion ParamItem `refreshable:"true"` - SegmentCompactableProportion ParamItem `refreshable:"true"` - SegmentExpansionRate ParamItem `refreshable:"true"` - CompactionTimeoutInSeconds ParamItem `refreshable:"true"` - CompactionDropToleranceInSeconds ParamItem `refreshable:"true"` - CompactionGCIntervalInSeconds ParamItem `refreshable:"true"` - CompactionCheckIntervalInSeconds ParamItem `refreshable:"false"` + CompactionRPCTimeout ParamItem `refreshable:"true"` + CompactionMaxParallelTasks ParamItem `refreshable:"true"` + CompactionWorkerParalleTasks ParamItem `refreshable:"true"` + MinSegmentToMerge ParamItem `refreshable:"true"` + MaxSegmentToMerge ParamItem `refreshable:"true"` + SegmentSmallProportion ParamItem `refreshable:"true"` + SegmentCompactableProportion ParamItem `refreshable:"true"` + SegmentExpansionRate ParamItem `refreshable:"true"` + CompactionTimeoutInSeconds ParamItem `refreshable:"true"` + CompactionDropToleranceInSeconds ParamItem `refreshable:"true"` + CompactionGCIntervalInSeconds ParamItem `refreshable:"true"` + CompactionCheckIntervalInSeconds ParamItem `refreshable:"false"` + MixCompactionTriggerInterval ParamItem `refreshable:"false"` + L0CompactionTriggerInterval ParamItem `refreshable:"false"` + GlobalCompactionInterval ParamItem `refreshable:"false"` + SingleCompactionRatioThreshold ParamItem `refreshable:"true"` SingleCompactionDeltaLogMaxSize ParamItem `refreshable:"true"` SingleCompactionExpiredLogMaxSize ParamItem `refreshable:"true"` SingleCompactionDeltalogMaxNum ParamItem `refreshable:"true"` - GlobalCompactionInterval ParamItem `refreshable:"false"` - ChannelCheckpointMaxLag ParamItem `refreshable:"true"` - SyncSegmentsInterval ParamItem `refreshable:"false"` + + ChannelCheckpointMaxLag ParamItem `refreshable:"true"` + SyncSegmentsInterval ParamItem `refreshable:"false"` // Clustering Compaction ClusteringCompactionEnable ParamItem `refreshable:"true"` @@ -3578,13 +3582,17 @@ During compaction, the size of segment # of rows is able to exceed segment max # Key: "dataCoord.compaction.single.ratio.threshold", Version: "2.0.0", DefaultValue: "0.2", + Doc: "The ratio threshold of a segment to trigger a single compaction, default as 0.2", + Export: true, } p.SingleCompactionRatioThreshold.Init(base.mgr) p.SingleCompactionDeltaLogMaxSize = ParamItem{ Key: "dataCoord.compaction.single.deltalog.maxsize", Version: "2.0.0", - DefaultValue: strconv.Itoa(2 * 1024 * 1024), + DefaultValue: "16777216", + Doc: "The deltalog size of a segment to trigger a single compaction, default as 16MB", + Export: true, } p.SingleCompactionDeltaLogMaxSize.Init(base.mgr) @@ -3592,6 +3600,8 @@ During compaction, the size of segment # of rows is able to exceed segment max # Key: "dataCoord.compaction.single.expiredlog.maxsize", Version: "2.0.0", DefaultValue: "10485760", + Doc: "The expired log size of a segment to trigger a compaction, default as 10MB", + Export: true, } p.SingleCompactionExpiredLogMaxSize.Init(base.mgr) @@ -3599,6 +3609,8 @@ During compaction, the size of segment # of rows is able to exceed segment max # Key: "dataCoord.compaction.single.deltalog.maxnum", Version: "2.0.0", DefaultValue: "200", + Doc: "The deltalog count of a segment to trigger a compaction, default as 200", + Export: true, } p.SingleCompactionDeltalogMaxNum.Init(base.mgr) @@ -3609,6 +3621,22 @@ During compaction, the size of segment # of rows is able to exceed segment max # } p.GlobalCompactionInterval.Init(base.mgr) + p.MixCompactionTriggerInterval = ParamItem{ + Key: "dataCoord.compaction.mix.triggerInterval", + Version: "2.4.15", + Doc: "The time interval in seconds for trigger mix compaction, default as 60s", + DefaultValue: "60", + } + p.MixCompactionTriggerInterval.Init(base.mgr) + + p.L0CompactionTriggerInterval = ParamItem{ + Key: "dataCoord.compaction.levelzero.triggerInterval", + Version: "2.4.15", + Doc: "The time interval in seconds for trigger L0 compaction, default as 10s", + DefaultValue: "10", + } + p.L0CompactionTriggerInterval.Init(base.mgr) + p.ChannelCheckpointMaxLag = ParamItem{ Key: "dataCoord.compaction.channelMaxCPLag", Version: "2.4.0", diff --git a/tests/integration/compaction/compaction_test.go b/tests/integration/compaction/compaction_test.go index 2e738e00fb6c8..cda5a6d1ff6fa 100644 --- a/tests/integration/compaction/compaction_test.go +++ b/tests/integration/compaction/compaction_test.go @@ -33,13 +33,14 @@ func (s *CompactionSuite) SetupSuite() { s.MiniClusterSuite.SetupSuite() paramtable.Init() - paramtable.Get().Save(paramtable.Get().DataCoordCfg.GlobalCompactionInterval.Key, "1") + paramtable.Get().Save(paramtable.Get().DataCoordCfg.MixCompactionTriggerInterval.Key, "1") + paramtable.Get().Save(paramtable.Get().DataCoordCfg.L0CompactionTriggerInterval.Key, "1") } func (s *CompactionSuite) TearDownSuite() { s.MiniClusterSuite.TearDownSuite() - paramtable.Get().Reset(paramtable.Get().DataCoordCfg.GlobalCompactionInterval.Key) + paramtable.Get().Reset(paramtable.Get().DataCoordCfg.L0CompactionTriggerInterval.Key) } func TestCompaction(t *testing.T) { diff --git a/tests/integration/compaction/l2_single_compaction_test.go b/tests/integration/compaction/l2_single_compaction_test.go index 4c32236c94dc6..43a7bea351ff1 100644 --- a/tests/integration/compaction/l2_single_compaction_test.go +++ b/tests/integration/compaction/l2_single_compaction_test.go @@ -273,9 +273,9 @@ func (s *L2SingleCompactionSuite) TestL2SingleCompaction() { func TestL2SingleCompaction(t *testing.T) { paramtable.Init() // to speed up the test - paramtable.Get().Save(paramtable.Get().DataCoordCfg.GlobalCompactionInterval.Key, "10") + paramtable.Get().Save(paramtable.Get().DataCoordCfg.MixCompactionTriggerInterval.Key, "10") paramtable.Get().Save(paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerDeltalogMinNum.Key, "0") - defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.GlobalCompactionInterval.Key) + defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.MixCompactionTriggerInterval.Key) defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerDeltalogMinNum.Key) suite.Run(t, new(L2SingleCompactionSuite))