From f023274ae06817f3a650128abc8801d49ebadcf5 Mon Sep 17 00:00:00 2001 From: yangxuan Date: Mon, 28 Oct 2024 14:50:12 +0800 Subject: [PATCH] fix: Separate L0 and Mix trigger interval See also: #37108 pr: #37190 - Add MixCompactionTriggerInterval, default 60s - Add L0CompactionTriggerInterval, default 10s - Export Single related compaction configs - Raise SingleCompactionDeltaLogMaxSize from 2MB to 16MB Signed-off-by: yangxuan --- configs/milvus.yaml | 10 ++++ internal/datacoord/compaction_trigger.go | 4 +- internal/datacoord/compaction_trigger_v2.go | 4 +- pkg/util/paramtable/component_param.go | 60 ++++++++++++++----- .../integration/compaction/compaction_test.go | 5 +- .../compaction/l2_single_compaction_test.go | 4 +- 6 files changed, 62 insertions(+), 25 deletions(-) diff --git a/configs/milvus.yaml b/configs/milvus.yaml index f417774c1eb60..6b1487de9a90d 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -553,6 +553,16 @@ dataCoord: taskQueueCapacity: 256 # compaction task queue size rpcTimeout: 10 maxParallelTaskNum: 10 + dropTolerance: 86400 # Compaction task will be cleaned after finish longer than this time(in seconds) + gcInterval: 1800 # The time interval in seconds for compaction gc + single: + ratio: + threshold: 0.2 # The ratio threshold of a segment to trigger a single compaction, default as 0.2 + deltalog: + maxsize: 16777216 # The deltalog size of a segment to trigger a single compaction, default as 16MB + maxnum: 200 # The deltalog count of a segment to trigger a compaction, default as 200 + expiredlog: + maxsize: 10485760 # The expired log size of a segment to trigger a compaction, default as 10MB clustering: enable: true # Enable clustering compaction autoEnable: false # Enable auto clustering compaction diff --git a/internal/datacoord/compaction_trigger.go b/internal/datacoord/compaction_trigger.go index 6e98e632f9fcc..c92e3730b0dee 100644 --- a/internal/datacoord/compaction_trigger.go +++ b/internal/datacoord/compaction_trigger.go 
@@ -108,7 +108,7 @@ func newCompactionTrigger( } func (t *compactionTrigger) start() { - t.globalTrigger = time.NewTicker(Params.DataCoordCfg.GlobalCompactionInterval.GetAsDuration(time.Second)) + t.globalTrigger = time.NewTicker(Params.DataCoordCfg.MixCompactionTriggerInterval.GetAsDuration(time.Second)) t.closeWaiter.Add(2) go func() { defer logutil.LogPanic() @@ -131,8 +131,6 @@ func (t *compactionTrigger) start() { default: // no need to handle err in handleSignal t.handleSignal(signal) - // shouldn't reset, otherwise a frequent flushed collection will affect other collections - // t.globalTrigger.Reset(Params.DataCoordCfg.GlobalCompactionInterval) } } } diff --git a/internal/datacoord/compaction_trigger_v2.go b/internal/datacoord/compaction_trigger_v2.go index b18a5fa034b92..481a62de2976e 100644 --- a/internal/datacoord/compaction_trigger_v2.go +++ b/internal/datacoord/compaction_trigger_v2.go @@ -105,11 +105,11 @@ func (m *CompactionTriggerManager) startLoop() { defer logutil.LogPanic() defer m.closeWg.Done() - l0Ticker := time.NewTicker(Params.DataCoordCfg.GlobalCompactionInterval.GetAsDuration(time.Second)) + l0Ticker := time.NewTicker(Params.DataCoordCfg.L0CompactionTriggerInterval.GetAsDuration(time.Second)) defer l0Ticker.Stop() clusteringTicker := time.NewTicker(Params.DataCoordCfg.ClusteringCompactionTriggerInterval.GetAsDuration(time.Second)) defer clusteringTicker.Stop() - singleTicker := time.NewTicker(Params.DataCoordCfg.GlobalCompactionInterval.GetAsDuration(time.Second)) + singleTicker := time.NewTicker(Params.DataCoordCfg.MixCompactionTriggerInterval.GetAsDuration(time.Second)) defer singleTicker.Stop() log.Info("Compaction trigger manager start") for { diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index f0863a8ae62d2..05f9802836b8a 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -3156,25 +3156,29 @@ type dataCoordConfig struct { 
CompactionTaskPrioritizer ParamItem `refreshable:"true"` CompactionTaskQueueCapacity ParamItem `refreshable:"false"` - CompactionRPCTimeout ParamItem `refreshable:"true"` - CompactionMaxParallelTasks ParamItem `refreshable:"true"` - CompactionWorkerParalleTasks ParamItem `refreshable:"true"` - MinSegmentToMerge ParamItem `refreshable:"true"` - MaxSegmentToMerge ParamItem `refreshable:"true"` - SegmentSmallProportion ParamItem `refreshable:"true"` - SegmentCompactableProportion ParamItem `refreshable:"true"` - SegmentExpansionRate ParamItem `refreshable:"true"` - CompactionTimeoutInSeconds ParamItem `refreshable:"true"` - CompactionDropToleranceInSeconds ParamItem `refreshable:"true"` - CompactionGCIntervalInSeconds ParamItem `refreshable:"true"` - CompactionCheckIntervalInSeconds ParamItem `refreshable:"false"` + CompactionRPCTimeout ParamItem `refreshable:"true"` + CompactionMaxParallelTasks ParamItem `refreshable:"true"` + CompactionWorkerParalleTasks ParamItem `refreshable:"true"` + MinSegmentToMerge ParamItem `refreshable:"true"` + MaxSegmentToMerge ParamItem `refreshable:"true"` + SegmentSmallProportion ParamItem `refreshable:"true"` + SegmentCompactableProportion ParamItem `refreshable:"true"` + SegmentExpansionRate ParamItem `refreshable:"true"` + CompactionTimeoutInSeconds ParamItem `refreshable:"true"` + CompactionDropToleranceInSeconds ParamItem `refreshable:"true"` + CompactionGCIntervalInSeconds ParamItem `refreshable:"true"` + CompactionCheckIntervalInSeconds ParamItem `refreshable:"false"` + MixCompactionTriggerInterval ParamItem `refreshable:"false"` + L0CompactionTriggerInterval ParamItem `refreshable:"false"` + GlobalCompactionInterval ParamItem `refreshable:"false"` + SingleCompactionRatioThreshold ParamItem `refreshable:"true"` SingleCompactionDeltaLogMaxSize ParamItem `refreshable:"true"` SingleCompactionExpiredLogMaxSize ParamItem `refreshable:"true"` SingleCompactionDeltalogMaxNum ParamItem `refreshable:"true"` - GlobalCompactionInterval 
ParamItem `refreshable:"false"` - ChannelCheckpointMaxLag ParamItem `refreshable:"true"` - SyncSegmentsInterval ParamItem `refreshable:"false"` + + ChannelCheckpointMaxLag ParamItem `refreshable:"true"` + SyncSegmentsInterval ParamItem `refreshable:"false"` // Clustering Compaction ClusteringCompactionEnable ParamItem `refreshable:"true"` @@ -3541,13 +3545,17 @@ During compaction, the size of segment # of rows is able to exceed segment max # Key: "dataCoord.compaction.single.ratio.threshold", Version: "2.0.0", DefaultValue: "0.2", + Doc: "The ratio threshold of a segment to trigger a single compaction, default as 0.2", + Export: true, } p.SingleCompactionRatioThreshold.Init(base.mgr) p.SingleCompactionDeltaLogMaxSize = ParamItem{ Key: "dataCoord.compaction.single.deltalog.maxsize", Version: "2.0.0", - DefaultValue: strconv.Itoa(2 * 1024 * 1024), + DefaultValue: "16777216", + Doc: "The deltalog size of a segment to trigger a single compaction, default as 16MB", + Export: true, } p.SingleCompactionDeltaLogMaxSize.Init(base.mgr) @@ -3555,6 +3563,8 @@ During compaction, the size of segment # of rows is able to exceed segment max # Key: "dataCoord.compaction.single.expiredlog.maxsize", Version: "2.0.0", DefaultValue: "10485760", + Doc: "The expired log size of a segment to trigger a compaction, default as 10MB", + Export: true, } p.SingleCompactionExpiredLogMaxSize.Init(base.mgr) @@ -3562,6 +3572,8 @@ During compaction, the size of segment # of rows is able to exceed segment max # Key: "dataCoord.compaction.single.deltalog.maxnum", Version: "2.0.0", DefaultValue: "200", + Doc: "The deltalog count of a segment to trigger a compaction, default as 200", + Export: true, } p.SingleCompactionDeltalogMaxNum.Init(base.mgr) @@ -3572,6 +3584,22 @@ During compaction, the size of segment # of rows is able to exceed segment max # } p.GlobalCompactionInterval.Init(base.mgr) + p.MixCompactionTriggerInterval = ParamItem{ + Key: "dataCoord.compaction.mix.triggerInterval", + Version: 
"2.4.15", + Doc: "The time interval in seconds for trigger mix compaction, default as 60s", + DefaultValue: "60", + } + p.MixCompactionTriggerInterval.Init(base.mgr) + + p.L0CompactionTriggerInterval = ParamItem{ + Key: "dataCoord.compaction.levelzero.triggerInterval", + Version: "2.4.15", + Doc: "The time interval in seconds for trigger L0 compaction, default as 10s", + DefaultValue: "10", + } + p.L0CompactionTriggerInterval.Init(base.mgr) + p.ChannelCheckpointMaxLag = ParamItem{ Key: "dataCoord.compaction.channelMaxCPLag", Version: "2.4.0", diff --git a/tests/integration/compaction/compaction_test.go b/tests/integration/compaction/compaction_test.go index 2e738e00fb6c8..cda5a6d1ff6fa 100644 --- a/tests/integration/compaction/compaction_test.go +++ b/tests/integration/compaction/compaction_test.go @@ -33,13 +33,14 @@ func (s *CompactionSuite) SetupSuite() { s.MiniClusterSuite.SetupSuite() paramtable.Init() - paramtable.Get().Save(paramtable.Get().DataCoordCfg.GlobalCompactionInterval.Key, "1") + paramtable.Get().Save(paramtable.Get().DataCoordCfg.MixCompactionTriggerInterval.Key, "1") + paramtable.Get().Save(paramtable.Get().DataCoordCfg.L0CompactionTriggerInterval.Key, "1") } func (s *CompactionSuite) TearDownSuite() { s.MiniClusterSuite.TearDownSuite() - paramtable.Get().Reset(paramtable.Get().DataCoordCfg.GlobalCompactionInterval.Key) + paramtable.Get().Reset(paramtable.Get().DataCoordCfg.L0CompactionTriggerInterval.Key) } func TestCompaction(t *testing.T) { diff --git a/tests/integration/compaction/l2_single_compaction_test.go b/tests/integration/compaction/l2_single_compaction_test.go index 90c9daf4e1def..fa97ea5c09969 100644 --- a/tests/integration/compaction/l2_single_compaction_test.go +++ b/tests/integration/compaction/l2_single_compaction_test.go @@ -275,9 +275,9 @@ func (s *L2SingleCompactionSuite) TestL2SingleCompaction() { func TestL2SingleCompaction(t *testing.T) { paramtable.Init() // to speed up the test - 
paramtable.Get().Save(paramtable.Get().DataCoordCfg.GlobalCompactionInterval.Key, "10") + paramtable.Get().Save(paramtable.Get().DataCoordCfg.MixCompactionTriggerInterval.Key, "10") paramtable.Get().Save(paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerDeltalogMinNum.Key, "0") - defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.GlobalCompactionInterval.Key) + defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.MixCompactionTriggerInterval.Key) defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerDeltalogMinNum.Key) suite.Run(t, new(L2SingleCompactionSuite))