diff --git a/configs/milvus.yaml b/configs/milvus.yaml index aee48838345a3..d48dba7425e16 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -465,16 +465,15 @@ dataCoord: rpcTimeout: 10 maxParallelTaskNum: 10 workerMaxParallelTaskNum: 2 + dropTolerance: 86400 # A compaction task will be cleaned up after it has been finished for longer than this time (in seconds) + gcInterval: 1800 # The time interval in seconds for compaction gc clustering: enable: true # Enable clustering compaction autoEnable: false # Enable auto clustering compaction triggerInterval: 600 # clustering compaction trigger interval in seconds - stateCheckInterval: 10 - gcInterval: 600 minInterval: 3600 # The minimum interval between clustering compaction executions of one collection, to avoid redundant compaction maxInterval: 259200 # If a collection haven't been clustering compacted for longer than maxInterval, force compact newDataSizeThreshold: 512m # If new data size is large than newDataSizeThreshold, execute clustering compaction - dropTolerance: 86400 # If clustering compaction job is finished for a long time, gc it preferSegmentSize: 512m maxSegmentSize: 1024m maxTrainSizeRatio: 0.8 # max data size ratio in Kmeans train, if larger than it, will down sampling to meet this limit @@ -862,4 +861,4 @@ trace: #maxMemSize will the whole available GPU memory. 
gpu: initMemSize: # Gpu Memory Pool init size - maxMemSize: # Gpu Memory Pool Max size \ No newline at end of file + maxMemSize: # Gpu Memory Pool Max size diff --git a/internal/datacoord/compaction.go b/internal/datacoord/compaction.go index 69a0357e6c6f7..118222253f40d 100644 --- a/internal/datacoord/compaction.go +++ b/internal/datacoord/compaction.go @@ -375,8 +375,10 @@ func (c *compactionPlanHandler) loopCheck() { } func (c *compactionPlanHandler) loopClean() { + interval := Params.DataCoordCfg.CompactionGCIntervalInSeconds.GetAsDuration(time.Second) + log.Info("compactionPlanHandler start clean check loop", zap.Any("gc interval", interval)) defer c.stopWg.Done() - cleanTicker := time.NewTicker(30 * time.Minute) + cleanTicker := time.NewTicker(interval) defer cleanTicker.Stop() for { select { @@ -400,10 +402,11 @@ func (c *compactionPlanHandler) cleanCompactionTaskMeta() { for _, tasks := range triggers { for _, task := range tasks { if task.State == datapb.CompactionTaskState_completed || task.State == datapb.CompactionTaskState_cleaned { - duration := time.Since(time.Unix(task.StartTime, 0)).Seconds() - if duration > float64(Params.DataCoordCfg.CompactionDropToleranceInSeconds.GetAsDuration(time.Second)) { + duration := time.Since(time.UnixMilli(task.StartTime)).Seconds() + if duration > float64(Params.DataCoordCfg.CompactionDropToleranceInSeconds.GetAsDuration(time.Second).Seconds()) { // try best to delete meta err := c.meta.DropCompactionTask(task) + log.Debug("drop compaction task meta", zap.Int64("planID", task.PlanID)) if err != nil { log.Warn("fail to drop task", zap.Int64("planID", task.PlanID), zap.Error(err)) } diff --git a/internal/datacoord/compaction_test.go b/internal/datacoord/compaction_test.go index c2d2da70d2122..00829953f7d72 100644 --- a/internal/datacoord/compaction_test.go +++ b/internal/datacoord/compaction_test.go @@ -17,7 +17,9 @@ package datacoord import ( + "context" "testing" + "time" "github.com/cockroachdb/errors" 
"github.com/samber/lo" @@ -25,6 +27,7 @@ import ( "github.com/stretchr/testify/suite" "github.com/milvus-io/milvus/internal/metastore/kv/binlog" + "github.com/milvus-io/milvus/internal/metastore/kv/datacoord" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/util/metautil" "github.com/milvus-io/milvus/pkg/util/typeutil" @@ -759,6 +762,43 @@ func (s *CompactionPlanHandlerSuite) TestCheckCompaction() { s.Equal(datapb.CompactionTaskState_executing, t.GetState()) } +func (s *CompactionPlanHandlerSuite) TestCompactionGC() { + s.SetupTest() + inTasks := []*datapb.CompactionTask{ + { + PlanID: 1, + Type: datapb.CompactionType_MixCompaction, + State: datapb.CompactionTaskState_completed, + StartTime: time.Now().UnixMilli() - (time.Second * 100000).Milliseconds(), + }, + { + PlanID: 2, + Type: datapb.CompactionType_MixCompaction, + State: datapb.CompactionTaskState_cleaned, + StartTime: time.Now().UnixMilli() - (time.Second * 100000).Milliseconds(), + }, + { + PlanID: 3, + Type: datapb.CompactionType_MixCompaction, + State: datapb.CompactionTaskState_cleaned, + StartTime: time.Now().UnixMilli(), + }, + } + + catalog := &datacoord.Catalog{MetaKv: NewMetaMemoryKV()} + compactionTaskMeta, err := newCompactionTaskMeta(context.TODO(), catalog) + s.NoError(err) + s.handler.meta = &meta{compactionTaskMeta: compactionTaskMeta} + for _, t := range inTasks { + s.handler.meta.SaveCompactionTask(t) + } + + s.handler.cleanCompactionTaskMeta() + // two task should be cleaned, one remains + tasks := s.handler.meta.GetCompactionTaskMeta().GetCompactionTasks() + s.Equal(1, len(tasks)) +} + func (s *CompactionPlanHandlerSuite) TestProcessCompleteCompaction() { s.SetupTest() diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 4f597a12f1e16..0161eb27700f6 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -2966,6 +2966,7 @@ type dataCoordConfig struct { 
SegmentExpansionRate ParamItem `refreshable:"true"` CompactionTimeoutInSeconds ParamItem `refreshable:"true"` CompactionDropToleranceInSeconds ParamItem `refreshable:"true"` + CompactionGCIntervalInSeconds ParamItem `refreshable:"true"` CompactionCheckIntervalInSeconds ParamItem `refreshable:"false"` SingleCompactionRatioThreshold ParamItem `refreshable:"true"` SingleCompactionDeltaLogMaxSize ParamItem `refreshable:"true"` @@ -2979,12 +2980,9 @@ type dataCoordConfig struct { ClusteringCompactionEnable ParamItem `refreshable:"true"` ClusteringCompactionAutoEnable ParamItem `refreshable:"true"` ClusteringCompactionTriggerInterval ParamItem `refreshable:"true"` - ClusteringCompactionStateCheckInterval ParamItem `refreshable:"true"` - ClusteringCompactionGCInterval ParamItem `refreshable:"true"` ClusteringCompactionMinInterval ParamItem `refreshable:"true"` ClusteringCompactionMaxInterval ParamItem `refreshable:"true"` ClusteringCompactionNewDataSizeThreshold ParamItem `refreshable:"true"` - ClusteringCompactionDropTolerance ParamItem `refreshable:"true"` ClusteringCompactionPreferSegmentSize ParamItem `refreshable:"true"` ClusteringCompactionMaxSegmentSize ParamItem `refreshable:"true"` ClusteringCompactionMaxTrainSizeRatio ParamItem `refreshable:"true"` @@ -3301,11 +3299,21 @@ During compaction, the size of segment # of rows is able to exceed segment max # p.CompactionDropToleranceInSeconds = ParamItem{ Key: "dataCoord.compaction.dropTolerance", Version: "2.4.2", - Doc: "If compaction job is finished for a long time, gc it", + Doc: "A compaction task will be cleaned up after it has been finished for longer than this time (in seconds)", DefaultValue: "86400", + Export: true, } p.CompactionDropToleranceInSeconds.Init(base.mgr) + p.CompactionGCIntervalInSeconds = ParamItem{ + Key: "dataCoord.compaction.gcInterval", + Version: "2.4.7", + Doc: "The time interval in seconds for compaction gc", + DefaultValue: "1800", + Export: true, + } + p.CompactionGCIntervalInSeconds.Init(base.mgr) + 
p.CompactionCheckIntervalInSeconds = ParamItem{ Key: "dataCoord.compaction.check.interval", Version: "2.0.0", @@ -3437,22 +3445,6 @@ During compaction, the size of segment # of rows is able to exceed segment max # } p.ClusteringCompactionTriggerInterval.Init(base.mgr) - p.ClusteringCompactionStateCheckInterval = ParamItem{ - Key: "dataCoord.compaction.clustering.stateCheckInterval", - Version: "2.4.6", - DefaultValue: "10", - Export: true, - } - p.ClusteringCompactionStateCheckInterval.Init(base.mgr) - - p.ClusteringCompactionGCInterval = ParamItem{ - Key: "dataCoord.compaction.clustering.gcInterval", - Version: "2.4.6", - DefaultValue: "600", - Export: true, - } - p.ClusteringCompactionGCInterval.Init(base.mgr) - p.ClusteringCompactionMinInterval = ParamItem{ Key: "dataCoord.compaction.clustering.minInterval", Version: "2.4.6", @@ -3487,15 +3479,6 @@ During compaction, the size of segment # of rows is able to exceed segment max # } p.ClusteringCompactionTimeoutInSeconds.Init(base.mgr) - p.ClusteringCompactionDropTolerance = ParamItem{ - Key: "dataCoord.compaction.clustering.dropTolerance", - Version: "2.4.6", - Doc: "If clustering compaction job is finished for a long time, gc it", - DefaultValue: "259200", - Export: true, - } - p.ClusteringCompactionDropTolerance.Init(base.mgr) - p.ClusteringCompactionPreferSegmentSize = ParamItem{ Key: "dataCoord.compaction.clustering.preferSegmentSize", Version: "2.4.6", diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index 263791a16157d..b3bacdf3e041c 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -468,6 +468,11 @@ func TestComponentParam(t *testing.T) { params.Save("datacoord.gracefulStopTimeout", "100") assert.Equal(t, 100*time.Second, Params.GracefulStopTimeout.GetAsDuration(time.Second)) + params.Save("dataCoord.compaction.gcInterval", "100") + assert.Equal(t, float64(100), 
Params.CompactionGCIntervalInSeconds.GetAsDuration(time.Second).Seconds()) + params.Save("dataCoord.compaction.dropTolerance", "100") + assert.Equal(t, float64(100), Params.CompactionDropToleranceInSeconds.GetAsDuration(time.Second).Seconds()) + params.Save("dataCoord.compaction.clustering.enable", "true") assert.Equal(t, true, Params.ClusteringCompactionEnable.GetAsBool()) params.Save("dataCoord.compaction.clustering.newDataSizeThreshold", "10") @@ -478,8 +483,6 @@ func TestComponentParam(t *testing.T) { assert.Equal(t, int64(10*1024*1024), Params.ClusteringCompactionNewDataSizeThreshold.GetAsSize()) params.Save("dataCoord.compaction.clustering.newDataSizeThreshold", "10g") assert.Equal(t, int64(10*1024*1024*1024), Params.ClusteringCompactionNewDataSizeThreshold.GetAsSize()) - params.Save("dataCoord.compaction.clustering.dropTolerance", "86400") - assert.Equal(t, int64(86400), Params.ClusteringCompactionDropTolerance.GetAsInt64()) params.Save("dataCoord.compaction.clustering.maxSegmentSize", "100m") assert.Equal(t, int64(100*1024*1024), Params.ClusteringCompactionMaxSegmentSize.GetAsSize()) params.Save("dataCoord.compaction.clustering.preferSegmentSize", "10m")