diff --git a/configs/milvus.yaml b/configs/milvus.yaml index f417774c1eb60..de101742d6d5a 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -553,6 +553,23 @@ dataCoord: taskQueueCapacity: 256 # compaction task queue size rpcTimeout: 10 maxParallelTaskNum: 10 + mix: + triggerInterval: 60 # The time interval in seconds to trigger mix compaction + levelzero: + triggerInterval: 10 # The time interval in seconds for trigger L0 compaction + forceTrigger: + minSize: 8388608 # The minmum size in bytes to force trigger a LevelZero Compaction, default as 8MB + maxSize: 67108864 # The maxmum size in bytes to force trigger a LevelZero Compaction, default as 64MB + deltalogMinNum: 10 # The minimum number of deltalog files to force trigger a LevelZero Compaction + deltalogMaxNum: 30 # The maxmum number of deltalog files to force trigger a LevelZero Compaction, default as 30 + single: + ratio: + threshold: 0.2 # The ratio threshold of a segment to trigger a single compaction, default as 0.2 + deltalog: + maxsize: 16777216 # The deltalog size of a segment to trigger a single compaction, default as 16MB + maxnum: 200 # The deltalog count of a segment to trigger a compaction, default as 200 + expiredlog: + maxsize: 10485760 # The expired log size of a segment to trigger a compaction, default as 10MB clustering: enable: true # Enable clustering compaction autoEnable: false # Enable auto clustering compaction @@ -568,12 +585,6 @@ dataCoord: minClusterSizeRatio: 0.01 # minimum cluster size / avg size in Kmeans train maxClusterSizeRatio: 10 # maximum cluster size / avg size in Kmeans train maxClusterSize: 5g # maximum cluster size in Kmeans train - levelzero: - forceTrigger: - minSize: 8388608 # The minmum size in bytes to force trigger a LevelZero Compaction, default as 8MB - maxSize: 67108864 # The maxmum size in bytes to force trigger a LevelZero Compaction, default as 64MB - deltalogMinNum: 10 # The minimum number of deltalog files to force trigger a LevelZero Compaction - deltalogMaxNum: 30 # The maxmum number of deltalog files to force trigger a LevelZero Compaction, default as 30 syncSegmentsInterval: 300 # The time interval for regularly syncing segments enableGarbageCollection: true # Switch value to control if to enable garbage collection to clear the discarded data in MinIO or S3 service. gc: diff --git a/internal/datacoord/compaction_trigger.go b/internal/datacoord/compaction_trigger.go index 6e98e632f9fcc..c92e3730b0dee 100644 --- a/internal/datacoord/compaction_trigger.go +++ b/internal/datacoord/compaction_trigger.go @@ -108,7 +108,7 @@ func newCompactionTrigger( } func (t *compactionTrigger) start() { - t.globalTrigger = time.NewTicker(Params.DataCoordCfg.GlobalCompactionInterval.GetAsDuration(time.Second)) + t.globalTrigger = time.NewTicker(Params.DataCoordCfg.MixCompactionTriggerInterval.GetAsDuration(time.Second)) t.closeWaiter.Add(2) go func() { defer logutil.LogPanic() @@ -131,8 +131,6 @@ func (t *compactionTrigger) start() { default: // no need to handle err in handleSignal t.handleSignal(signal) - // shouldn't reset, otherwise a frequent flushed collection will affect other collections - // t.globalTrigger.Reset(Params.DataCoordCfg.GlobalCompactionInterval) } } } diff --git a/internal/datacoord/compaction_trigger_v2.go b/internal/datacoord/compaction_trigger_v2.go index b18a5fa034b92..481a62de2976e 100644 --- a/internal/datacoord/compaction_trigger_v2.go +++ b/internal/datacoord/compaction_trigger_v2.go @@ -105,11 +105,11 @@ func (m *CompactionTriggerManager) startLoop() { defer logutil.LogPanic() defer m.closeWg.Done() - l0Ticker := time.NewTicker(Params.DataCoordCfg.GlobalCompactionInterval.GetAsDuration(time.Second)) + l0Ticker := time.NewTicker(Params.DataCoordCfg.L0CompactionTriggerInterval.GetAsDuration(time.Second)) defer l0Ticker.Stop() clusteringTicker := time.NewTicker(Params.DataCoordCfg.ClusteringCompactionTriggerInterval.GetAsDuration(time.Second)) defer clusteringTicker.Stop() - singleTicker := time.NewTicker(Params.DataCoordCfg.GlobalCompactionInterval.GetAsDuration(time.Second)) + singleTicker := time.NewTicker(Params.DataCoordCfg.MixCompactionTriggerInterval.GetAsDuration(time.Second)) defer singleTicker.Stop() log.Info("Compaction trigger manager start") for { diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index f0863a8ae62d2..fa99de454df4b 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -3156,25 +3156,29 @@ type dataCoordConfig struct { CompactionTaskPrioritizer ParamItem `refreshable:"true"` CompactionTaskQueueCapacity ParamItem `refreshable:"false"` - CompactionRPCTimeout ParamItem `refreshable:"true"` - CompactionMaxParallelTasks ParamItem `refreshable:"true"` - CompactionWorkerParalleTasks ParamItem `refreshable:"true"` - MinSegmentToMerge ParamItem `refreshable:"true"` - MaxSegmentToMerge ParamItem `refreshable:"true"` - SegmentSmallProportion ParamItem `refreshable:"true"` - SegmentCompactableProportion ParamItem `refreshable:"true"` - SegmentExpansionRate ParamItem `refreshable:"true"` - CompactionTimeoutInSeconds ParamItem `refreshable:"true"` - CompactionDropToleranceInSeconds ParamItem `refreshable:"true"` - CompactionGCIntervalInSeconds ParamItem `refreshable:"true"` - CompactionCheckIntervalInSeconds ParamItem `refreshable:"false"` + CompactionRPCTimeout ParamItem `refreshable:"true"` + CompactionMaxParallelTasks ParamItem `refreshable:"true"` + CompactionWorkerParalleTasks ParamItem `refreshable:"true"` + MinSegmentToMerge ParamItem `refreshable:"true"` + MaxSegmentToMerge ParamItem `refreshable:"true"` + SegmentSmallProportion ParamItem `refreshable:"true"` + SegmentCompactableProportion ParamItem `refreshable:"true"` + SegmentExpansionRate ParamItem `refreshable:"true"` + CompactionTimeoutInSeconds ParamItem `refreshable:"true"` + CompactionDropToleranceInSeconds ParamItem `refreshable:"true"` + CompactionGCIntervalInSeconds ParamItem `refreshable:"true"` + CompactionCheckIntervalInSeconds ParamItem `refreshable:"false"` + MixCompactionTriggerInterval ParamItem `refreshable:"false"` + L0CompactionTriggerInterval ParamItem `refreshable:"false"` + GlobalCompactionInterval ParamItem `refreshable:"false"` + SingleCompactionRatioThreshold ParamItem `refreshable:"true"` SingleCompactionDeltaLogMaxSize ParamItem `refreshable:"true"` SingleCompactionExpiredLogMaxSize ParamItem `refreshable:"true"` SingleCompactionDeltalogMaxNum ParamItem `refreshable:"true"` - GlobalCompactionInterval ParamItem `refreshable:"false"` - ChannelCheckpointMaxLag ParamItem `refreshable:"true"` - SyncSegmentsInterval ParamItem `refreshable:"false"` + + ChannelCheckpointMaxLag ParamItem `refreshable:"true"` + SyncSegmentsInterval ParamItem `refreshable:"false"` // Clustering Compaction ClusteringCompactionEnable ParamItem `refreshable:"true"` @@ -3541,13 +3545,17 @@ During compaction, the size of segment # of rows is able to exceed segment max # Key: "dataCoord.compaction.single.ratio.threshold", Version: "2.0.0", DefaultValue: "0.2", + Doc: "The ratio threshold of a segment to trigger a single compaction, default as 0.2", + Export: true, } p.SingleCompactionRatioThreshold.Init(base.mgr) p.SingleCompactionDeltaLogMaxSize = ParamItem{ Key: "dataCoord.compaction.single.deltalog.maxsize", Version: "2.0.0", - DefaultValue: strconv.Itoa(2 * 1024 * 1024), + DefaultValue: "16777216", + Doc: "The deltalog size of a segment to trigger a single compaction, default as 16MB", + Export: true, } p.SingleCompactionDeltaLogMaxSize.Init(base.mgr) @@ -3555,6 +3563,8 @@ During compaction, the size of segment # of rows is able to exceed segment max # Key: "dataCoord.compaction.single.expiredlog.maxsize", Version: "2.0.0", DefaultValue: "10485760", + Doc: "The expired log size of a segment to trigger a compaction, default as 10MB", + Export: true, } p.SingleCompactionExpiredLogMaxSize.Init(base.mgr) @@ -3562,6 +3572,8 @@ During compaction, the size of segment # of rows is able to exceed segment max # Key: "dataCoord.compaction.single.deltalog.maxnum", Version: "2.0.0", DefaultValue: "200", + Doc: "The deltalog count of a segment to trigger a compaction, default as 200", + Export: true, } p.SingleCompactionDeltalogMaxNum.Init(base.mgr) @@ -3569,9 +3581,28 @@ During compaction, the size of segment # of rows is able to exceed segment max # Key: "dataCoord.compaction.global.interval", Version: "2.0.0", DefaultValue: "60", + Doc: "deprecated", } p.GlobalCompactionInterval.Init(base.mgr) + p.MixCompactionTriggerInterval = ParamItem{ + Key: "dataCoord.compaction.mix.triggerInterval", + Version: "2.4.15", + Doc: "The time interval in seconds to trigger mix compaction", + DefaultValue: "60", + Export: true, + } + p.MixCompactionTriggerInterval.Init(base.mgr) + + p.L0CompactionTriggerInterval = ParamItem{ + Key: "dataCoord.compaction.levelzero.triggerInterval", + Version: "2.4.15", + Doc: "The time interval in seconds for trigger L0 compaction", + DefaultValue: "10", + Export: true, + } + p.L0CompactionTriggerInterval.Init(base.mgr) + p.ChannelCheckpointMaxLag = ParamItem{ Key: "dataCoord.compaction.channelMaxCPLag", Version: "2.4.0", diff --git a/tests/integration/compaction/compaction_test.go b/tests/integration/compaction/compaction_test.go index 2e738e00fb6c8..cda5a6d1ff6fa 100644 --- a/tests/integration/compaction/compaction_test.go +++ b/tests/integration/compaction/compaction_test.go @@ -33,13 +33,14 @@ func (s *CompactionSuite) SetupSuite() { s.MiniClusterSuite.SetupSuite() paramtable.Init() - paramtable.Get().Save(paramtable.Get().DataCoordCfg.GlobalCompactionInterval.Key, "1") + paramtable.Get().Save(paramtable.Get().DataCoordCfg.MixCompactionTriggerInterval.Key, "1") + paramtable.Get().Save(paramtable.Get().DataCoordCfg.L0CompactionTriggerInterval.Key, "1") } func (s *CompactionSuite) TearDownSuite() { s.MiniClusterSuite.TearDownSuite() - paramtable.Get().Reset(paramtable.Get().DataCoordCfg.GlobalCompactionInterval.Key) + paramtable.Get().Reset(paramtable.Get().DataCoordCfg.L0CompactionTriggerInterval.Key) } func TestCompaction(t *testing.T) { diff --git a/tests/integration/compaction/l2_single_compaction_test.go b/tests/integration/compaction/l2_single_compaction_test.go index 90c9daf4e1def..fa97ea5c09969 100644 --- a/tests/integration/compaction/l2_single_compaction_test.go +++ b/tests/integration/compaction/l2_single_compaction_test.go @@ -275,9 +275,9 @@ func (s *L2SingleCompactionSuite) TestL2SingleCompaction() { func TestL2SingleCompaction(t *testing.T) { paramtable.Init() // to speed up the test - paramtable.Get().Save(paramtable.Get().DataCoordCfg.GlobalCompactionInterval.Key, "10") + paramtable.Get().Save(paramtable.Get().DataCoordCfg.MixCompactionTriggerInterval.Key, "10") paramtable.Get().Save(paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerDeltalogMinNum.Key, "0") - defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.GlobalCompactionInterval.Key) + defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.MixCompactionTriggerInterval.Key) defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerDeltalogMinNum.Key) suite.Run(t, new(L2SingleCompactionSuite))