From a45a288a25ef3eef8e96837936cde16ce40594c1 Mon Sep 17 00:00:00 2001 From: XuanYang-cn Date: Tue, 12 Nov 2024 10:56:37 +0800 Subject: [PATCH] fix: Separate L0 and Mix trigger interval (#37190) See also: #37108 - Add MixCompactionTriggerInterval, default 60s - Add L0CompactionTriggerInterval, default 10s - Export Single related compaction configs - Raise SingleCompactionDeltaLogMaxSize from 2MB to 16MB --------- Signed-off-by: yangxuan --- configs/milvus.yaml | 27 +++++--- internal/datacoord/compaction_trigger.go | 4 +- internal/datacoord/compaction_trigger_v2.go | 4 +- pkg/util/paramtable/component_param.go | 63 ++++++++++++++----- pkg/util/paramtable/service_param.go | 2 +- .../integration/compaction/compaction_test.go | 6 +- .../compaction/l2_single_compaction_test.go | 4 +- 7 files changed, 76 insertions(+), 34 deletions(-) diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 511353fdddb55..8a21b53744b67 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -63,7 +63,7 @@ etcd: password: # password for etcd authentication metastore: - type: etcd # Default value: etcd, Valid values: [etcd, tikv] + type: etcd # Default value: etcd, Valid values: [etcd, tikv] snapshot: ttl: 86400 # snapshot ttl in seconds reserveTime: 3600 # snapshot reserve time in seconds @@ -564,6 +564,23 @@ dataCoord: maxParallelTaskNum: 10 dropTolerance: 86400 # Compaction task will be cleaned after finish longer than this time(in seconds) gcInterval: 1800 # The time interval in seconds for compaction gc + mix: + triggerInterval: 60 # The time interval in seconds to trigger mix compaction + levelzero: + triggerInterval: 10 # The time interval in seconds for trigger L0 compaction + forceTrigger: + minSize: 8388608 # The minmum size in bytes to force trigger a LevelZero Compaction, default as 8MB + maxSize: 67108864 # The maxmum size in bytes to force trigger a LevelZero Compaction, default as 64MB + deltalogMinNum: 10 # The minimum number of deltalog files to force trigger a LevelZero Compaction + deltalogMaxNum: 30 # The maxmum number of deltalog files to force trigger a LevelZero Compaction, default as 30 + single: + ratio: + threshold: 0.2 # The ratio threshold of a segment to trigger a single compaction, default as 0.2 + deltalog: + maxsize: 16777216 # The deltalog size of a segment to trigger a single compaction, default as 16MB + maxnum: 200 # The deltalog count of a segment to trigger a compaction, default as 200 + expiredlog: + maxsize: 10485760 # The expired log size of a segment to trigger a compaction, default as 10MB clustering: enable: true # Enable clustering compaction autoEnable: false # Enable auto clustering compaction @@ -579,12 +596,6 @@ dataCoord: minClusterSizeRatio: 0.01 # minimum cluster size / avg size in Kmeans train maxClusterSizeRatio: 10 # maximum cluster size / avg size in Kmeans train maxClusterSize: 5g # maximum cluster size in Kmeans train - levelzero: - forceTrigger: - minSize: 8388608 # The minmum size in bytes to force trigger a LevelZero Compaction, default as 8MB - maxSize: 67108864 # The maxmum size in bytes to force trigger a LevelZero Compaction, default as 64MB - deltalogMinNum: 10 # The minimum number of deltalog files to force trigger a LevelZero Compaction - deltalogMaxNum: 30 # The maxmum number of deltalog files to force trigger a LevelZero Compaction, default as 30 syncSegmentsInterval: 300 # The time interval for regularly syncing segments enableGarbageCollection: true # Switch value to control if to enable garbage collection to clear the discarded data in MinIO or S3 service. gc: @@ -1077,4 +1088,4 @@ knowhere: pq_code_budget_gb_ratio: 0.125 # Size limit on the PQ code (compared with raw data) search_cache_budget_gb_ratio: 0.1 # Ratio of cached node numbers to raw data search: # Diskann search params - beam_width_ratio: 4.0 # Ratio between the maximum number of IO requests per search iteration and CPU number. \ No newline at end of file + beam_width_ratio: 4.0 # Ratio between the maximum number of IO requests per search iteration and CPU number diff --git a/internal/datacoord/compaction_trigger.go b/internal/datacoord/compaction_trigger.go index 6e27212d36e77..2ba88d107551e 100644 --- a/internal/datacoord/compaction_trigger.go +++ b/internal/datacoord/compaction_trigger.go @@ -109,7 +109,7 @@ func newCompactionTrigger( } func (t *compactionTrigger) start() { - t.globalTrigger = time.NewTicker(Params.DataCoordCfg.GlobalCompactionInterval.GetAsDuration(time.Second)) + t.globalTrigger = time.NewTicker(Params.DataCoordCfg.MixCompactionTriggerInterval.GetAsDuration(time.Second)) t.closeWaiter.Add(2) go func() { defer logutil.LogPanic() @@ -132,8 +132,6 @@ func (t *compactionTrigger) start() { default: // no need to handle err in handleSignal t.handleSignal(signal) - // shouldn't reset, otherwise a frequent flushed collection will affect other collections - // t.globalTrigger.Reset(Params.DataCoordCfg.GlobalCompactionInterval) } } } diff --git a/internal/datacoord/compaction_trigger_v2.go b/internal/datacoord/compaction_trigger_v2.go index 05474bba5402b..25ffae444b3b0 100644 --- a/internal/datacoord/compaction_trigger_v2.go +++ b/internal/datacoord/compaction_trigger_v2.go @@ -108,11 +108,11 @@ func (m *CompactionTriggerManager) startLoop() { defer logutil.LogPanic() defer m.closeWg.Done() - l0Ticker := time.NewTicker(Params.DataCoordCfg.GlobalCompactionInterval.GetAsDuration(time.Second)) + l0Ticker := time.NewTicker(Params.DataCoordCfg.L0CompactionTriggerInterval.GetAsDuration(time.Second)) defer l0Ticker.Stop() clusteringTicker := time.NewTicker(Params.DataCoordCfg.ClusteringCompactionTriggerInterval.GetAsDuration(time.Second)) defer clusteringTicker.Stop() - singleTicker := time.NewTicker(Params.DataCoordCfg.GlobalCompactionInterval.GetAsDuration(time.Second)) + singleTicker := time.NewTicker(Params.DataCoordCfg.MixCompactionTriggerInterval.GetAsDuration(time.Second)) defer singleTicker.Stop() log.Info("Compaction trigger manager start") for { diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 2bc5a3eeeee4f..88d42806229ea 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -3202,25 +3202,29 @@ type dataCoordConfig struct { CompactionTaskPrioritizer ParamItem `refreshable:"true"` CompactionTaskQueueCapacity ParamItem `refreshable:"false"` - CompactionRPCTimeout ParamItem `refreshable:"true"` - CompactionMaxParallelTasks ParamItem `refreshable:"true"` - CompactionWorkerParalleTasks ParamItem `refreshable:"true"` - MinSegmentToMerge ParamItem `refreshable:"true"` - MaxSegmentToMerge ParamItem `refreshable:"true"` - SegmentSmallProportion ParamItem `refreshable:"true"` - SegmentCompactableProportion ParamItem `refreshable:"true"` - SegmentExpansionRate ParamItem `refreshable:"true"` - CompactionTimeoutInSeconds ParamItem `refreshable:"true"` - CompactionDropToleranceInSeconds ParamItem `refreshable:"true"` - CompactionGCIntervalInSeconds ParamItem `refreshable:"true"` - CompactionCheckIntervalInSeconds ParamItem `refreshable:"false"` + CompactionRPCTimeout ParamItem `refreshable:"true"` + CompactionMaxParallelTasks ParamItem `refreshable:"true"` + CompactionWorkerParalleTasks ParamItem `refreshable:"true"` + MinSegmentToMerge ParamItem `refreshable:"true"` + MaxSegmentToMerge ParamItem `refreshable:"true"` + SegmentSmallProportion ParamItem `refreshable:"true"` + SegmentCompactableProportion ParamItem `refreshable:"true"` + SegmentExpansionRate ParamItem `refreshable:"true"` + CompactionTimeoutInSeconds ParamItem `refreshable:"true"` + CompactionDropToleranceInSeconds ParamItem `refreshable:"true"` + CompactionGCIntervalInSeconds ParamItem `refreshable:"true"` + CompactionCheckIntervalInSeconds ParamItem `refreshable:"false"` + MixCompactionTriggerInterval ParamItem `refreshable:"false"` + L0CompactionTriggerInterval ParamItem `refreshable:"false"` + GlobalCompactionInterval ParamItem `refreshable:"false"` + SingleCompactionRatioThreshold ParamItem `refreshable:"true"` SingleCompactionDeltaLogMaxSize ParamItem `refreshable:"true"` SingleCompactionExpiredLogMaxSize ParamItem `refreshable:"true"` SingleCompactionDeltalogMaxNum ParamItem `refreshable:"true"` - GlobalCompactionInterval ParamItem `refreshable:"false"` - ChannelCheckpointMaxLag ParamItem `refreshable:"true"` - SyncSegmentsInterval ParamItem `refreshable:"false"` + + ChannelCheckpointMaxLag ParamItem `refreshable:"true"` + SyncSegmentsInterval ParamItem `refreshable:"false"` // Clustering Compaction ClusteringCompactionEnable ParamItem `refreshable:"true"` @@ -3592,13 +3596,17 @@ During compaction, the size of segment # of rows is able to exceed segment max # Key: "dataCoord.compaction.single.ratio.threshold", Version: "2.0.0", DefaultValue: "0.2", + Doc: "The ratio threshold of a segment to trigger a single compaction, default as 0.2", + Export: true, } p.SingleCompactionRatioThreshold.Init(base.mgr) p.SingleCompactionDeltaLogMaxSize = ParamItem{ Key: "dataCoord.compaction.single.deltalog.maxsize", Version: "2.0.0", - DefaultValue: strconv.Itoa(2 * 1024 * 1024), + DefaultValue: "16777216", + Doc: "The deltalog size of a segment to trigger a single compaction, default as 16MB", + Export: true, } p.SingleCompactionDeltaLogMaxSize.Init(base.mgr) @@ -3606,6 +3614,8 @@ During compaction, the size of segment # of rows is able to exceed segment max # Key: "dataCoord.compaction.single.expiredlog.maxsize", Version: "2.0.0", DefaultValue: "10485760", + Doc: "The expired log size of a segment to trigger a compaction, default as 10MB", + Export: true, } p.SingleCompactionExpiredLogMaxSize.Init(base.mgr) @@ -3613,6 +3623,8 @@ During compaction, the size of segment # of rows is able to exceed segment max # Key: "dataCoord.compaction.single.deltalog.maxnum", Version: "2.0.0", DefaultValue: "200", + Doc: "The deltalog count of a segment to trigger a compaction, default as 200", + Export: true, } p.SingleCompactionDeltalogMaxNum.Init(base.mgr) @@ -3620,9 +3632,28 @@ During compaction, the size of segment # of rows is able to exceed segment max # Key: "dataCoord.compaction.global.interval", Version: "2.0.0", DefaultValue: "60", + Doc: "deprecated", } p.GlobalCompactionInterval.Init(base.mgr) + p.MixCompactionTriggerInterval = ParamItem{ + Key: "dataCoord.compaction.mix.triggerInterval", + Version: "2.4.15", + Doc: "The time interval in seconds to trigger mix compaction", + DefaultValue: "60", + Export: true, + } + p.MixCompactionTriggerInterval.Init(base.mgr) + + p.L0CompactionTriggerInterval = ParamItem{ + Key: "dataCoord.compaction.levelzero.triggerInterval", + Version: "2.4.15", + Doc: "The time interval in seconds for trigger L0 compaction", + DefaultValue: "10", + Export: true, + } + p.L0CompactionTriggerInterval.Init(base.mgr) + p.ChannelCheckpointMaxLag = ParamItem{ Key: "dataCoord.compaction.channelMaxCPLag", Version: "2.4.0", diff --git a/pkg/util/paramtable/service_param.go b/pkg/util/paramtable/service_param.go index e5f6c6108cb81..0a6091a8c87a6 100644 --- a/pkg/util/paramtable/service_param.go +++ b/pkg/util/paramtable/service_param.go @@ -465,7 +465,7 @@ func (p *MetaStoreConfig) Init(base *BaseTable) { Key: "metastore.type", Version: "2.2.0", DefaultValue: util.MetaStoreTypeEtcd, - Doc: `Default value: etcd, Valid values: [etcd, tikv] `, + Doc: `Default value: etcd, Valid values: [etcd, tikv]`, Export: true, } p.MetaStoreType.Init(base.mgr) diff --git a/tests/integration/compaction/compaction_test.go b/tests/integration/compaction/compaction_test.go index 2e738e00fb6c8..f38d926bbea88 100644 --- a/tests/integration/compaction/compaction_test.go +++ b/tests/integration/compaction/compaction_test.go @@ -33,13 +33,15 @@ func (s *CompactionSuite) SetupSuite() { s.MiniClusterSuite.SetupSuite() paramtable.Init() - paramtable.Get().Save(paramtable.Get().DataCoordCfg.GlobalCompactionInterval.Key, "1") + paramtable.Get().Save(paramtable.Get().DataCoordCfg.MixCompactionTriggerInterval.Key, "1") + paramtable.Get().Save(paramtable.Get().DataCoordCfg.L0CompactionTriggerInterval.Key, "1") } func (s *CompactionSuite) TearDownSuite() { s.MiniClusterSuite.TearDownSuite() - paramtable.Get().Reset(paramtable.Get().DataCoordCfg.GlobalCompactionInterval.Key) + paramtable.Get().Reset(paramtable.Get().DataCoordCfg.L0CompactionTriggerInterval.Key) + paramtable.Get().Reset(paramtable.Get().DataCoordCfg.MixCompactionTriggerInterval.Key) } func TestCompaction(t *testing.T) { diff --git a/tests/integration/compaction/l2_single_compaction_test.go b/tests/integration/compaction/l2_single_compaction_test.go index 4c32236c94dc6..43a7bea351ff1 100644 --- a/tests/integration/compaction/l2_single_compaction_test.go +++ b/tests/integration/compaction/l2_single_compaction_test.go @@ -273,9 +273,9 @@ func (s *L2SingleCompactionSuite) TestL2SingleCompaction() { func TestL2SingleCompaction(t *testing.T) { paramtable.Init() // to speed up the test - paramtable.Get().Save(paramtable.Get().DataCoordCfg.GlobalCompactionInterval.Key, "10") + paramtable.Get().Save(paramtable.Get().DataCoordCfg.MixCompactionTriggerInterval.Key, "10") paramtable.Get().Save(paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerDeltalogMinNum.Key, "0") - defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.GlobalCompactionInterval.Key) + defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.MixCompactionTriggerInterval.Key) defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerDeltalogMinNum.Key) suite.Run(t, new(L2SingleCompactionSuite))