diff --git a/configs/milvus.yaml b/configs/milvus.yaml index f5b153f0f4b0b..c1159138b1114 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -458,18 +458,17 @@ dataCoord: rpcTimeout: 10 maxParallelTaskNum: 10 workerMaxParallelTaskNum: 2 + gcInterval: 1800 # The time interval in seconds for compaction gc + dropTolerance: 86400 # Compaction task will be cleaned after finish longer than this time(in seconds) clustering: enable: true autoEnable: false triggerInterval: 600 - stateCheckInterval: 10 - gcInterval: 600 minInterval: 3600 maxInterval: 259200 newDataRatioThreshold: 0.2 newDataSizeThreshold: 512m timeout: 7200 - dropTolerance: 86400 # clustering compaction will try best to distribute data into segments with size range in [preferSegmentSize, maxSegmentSize]. # data will be clustered by preferSegmentSize, if a cluster is larger than maxSegmentSize, will spilt it into multi segment # buffer between (preferSegmentSize, maxSegmentSize) is left for new data in the same cluster(range), to avoid globally redistribute too often diff --git a/internal/datacoord/compaction.go b/internal/datacoord/compaction.go index aef155577beb9..1412e5d11c875 100644 --- a/internal/datacoord/compaction.go +++ b/internal/datacoord/compaction.go @@ -376,8 +376,10 @@ func (c *compactionPlanHandler) loopCheck() { } func (c *compactionPlanHandler) loopClean() { + interval := Params.DataCoordCfg.CompactionGCIntervalInSeconds.GetAsDuration(time.Second) + log.Info("compactionPlanHandler start clean check loop", zap.Any("gc interval", interval)) defer c.stopWg.Done() - cleanTicker := time.NewTicker(30 * time.Minute) + cleanTicker := time.NewTicker(interval) defer cleanTicker.Stop() for { select { @@ -401,10 +403,11 @@ func (c *compactionPlanHandler) cleanCompactionTaskMeta() { for _, tasks := range triggers { for _, task := range tasks { if task.State == datapb.CompactionTaskState_completed || task.State == datapb.CompactionTaskState_cleaned { - duration := time.Since(time.Unix(task.StartTime, 0)).Seconds() - if duration > float64(Params.DataCoordCfg.CompactionDropToleranceInSeconds.GetAsDuration(time.Second)) { + duration := time.Since(time.UnixMilli(task.StartTime)).Seconds() + if duration > float64(Params.DataCoordCfg.CompactionDropToleranceInSeconds.GetAsDuration(time.Second).Seconds()) { // try best to delete meta err := c.meta.DropCompactionTask(task) + log.Debug("drop compaction task meta", zap.Int64("planID", task.PlanID)) if err != nil { log.Warn("fail to drop task", zap.Int64("planID", task.PlanID), zap.Error(err)) } diff --git a/internal/datacoord/compaction_test.go b/internal/datacoord/compaction_test.go index 12baa0f34fb89..c93ce94a722d5 100644 --- a/internal/datacoord/compaction_test.go +++ b/internal/datacoord/compaction_test.go @@ -17,7 +17,9 @@ package datacoord import ( + "context" "testing" + "time" "github.com/cockroachdb/errors" "github.com/samber/lo" @@ -25,6 +27,7 @@ import ( "github.com/stretchr/testify/suite" "github.com/milvus-io/milvus/internal/metastore/kv/binlog" + "github.com/milvus-io/milvus/internal/metastore/kv/datacoord" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/util/metautil" "github.com/milvus-io/milvus/pkg/util/typeutil" @@ -703,6 +706,43 @@ func (s *CompactionPlanHandlerSuite) TestCheckCompaction() { s.Equal(datapb.CompactionTaskState_executing, t.GetState()) } +func (s *CompactionPlanHandlerSuite) TestCompactionGC() { + s.SetupTest() + inTasks := []*datapb.CompactionTask{ + { + PlanID: 1, + Type: datapb.CompactionType_MixCompaction, + State: datapb.CompactionTaskState_completed, + StartTime: time.Now().UnixMilli() - (time.Second * 100000).Milliseconds(), + }, + { + PlanID: 2, + Type: datapb.CompactionType_MixCompaction, + State: datapb.CompactionTaskState_cleaned, + StartTime: time.Now().UnixMilli() - (time.Second * 100000).Milliseconds(), + }, + { + PlanID: 3, + Type: datapb.CompactionType_MixCompaction, + State: datapb.CompactionTaskState_cleaned, + StartTime: time.Now().UnixMilli(), + }, + } + + catalog := &datacoord.Catalog{MetaKv: NewMetaMemoryKV()} + compactionTaskMeta, err := newCompactionTaskMeta(context.TODO(), catalog) + s.NoError(err) + s.handler.meta = &meta{compactionTaskMeta: compactionTaskMeta} + for _, t := range inTasks { + s.handler.meta.SaveCompactionTask(t) + } + + s.handler.cleanCompactionTaskMeta() + // two task should be cleaned, one remains + tasks := s.handler.meta.GetCompactionTaskMeta().GetCompactionTasks() + s.Equal(1, len(tasks)) +} + func (s *CompactionPlanHandlerSuite) TestProcessCompleteCompaction() { s.SetupTest() diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 09d717b1fcb15..a320934273e37 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -2872,6 +2872,7 @@ type dataCoordConfig struct { SegmentExpansionRate ParamItem `refreshable:"true"` CompactionTimeoutInSeconds ParamItem `refreshable:"true"` CompactionDropToleranceInSeconds ParamItem `refreshable:"true"` + CompactionGCIntervalInSeconds ParamItem `refreshable:"true"` CompactionCheckIntervalInSeconds ParamItem `refreshable:"false"` SingleCompactionRatioThreshold ParamItem `refreshable:"true"` SingleCompactionDeltaLogMaxSize ParamItem `refreshable:"true"` @@ -2885,12 +2886,9 @@ type dataCoordConfig struct { ClusteringCompactionEnable ParamItem `refreshable:"true"` ClusteringCompactionAutoEnable ParamItem `refreshable:"true"` ClusteringCompactionTriggerInterval ParamItem `refreshable:"true"` - ClusteringCompactionStateCheckInterval ParamItem `refreshable:"true"` - ClusteringCompactionGCInterval ParamItem `refreshable:"true"` ClusteringCompactionMinInterval ParamItem `refreshable:"true"` ClusteringCompactionMaxInterval ParamItem `refreshable:"true"` ClusteringCompactionNewDataSizeThreshold ParamItem `refreshable:"true"` - ClusteringCompactionDropTolerance ParamItem `refreshable:"true"` ClusteringCompactionPreferSegmentSize ParamItem `refreshable:"true"` ClusteringCompactionMaxSegmentSize ParamItem `refreshable:"true"` ClusteringCompactionMaxTrainSizeRatio ParamItem `refreshable:"true"` @@ -3193,11 +3191,19 @@ During compaction, the size of segment # of rows is able to exceed segment max # p.CompactionDropToleranceInSeconds = ParamItem{ Key: "dataCoord.compaction.dropTolerance", Version: "2.4.2", - Doc: "If compaction job is finished for a long time, gc it", + Doc: "Compaction task will be cleaned after finish longer than this time(in seconds)", DefaultValue: "86400", } p.CompactionDropToleranceInSeconds.Init(base.mgr) + p.CompactionGCIntervalInSeconds = ParamItem{ + Key: "dataCoord.compaction.gcInterval", + Version: "2.4.7", + Doc: "The time interval in seconds for compaction gc", + DefaultValue: "1800", + } + p.CompactionGCIntervalInSeconds.Init(base.mgr) + p.CompactionCheckIntervalInSeconds = ParamItem{ Key: "dataCoord.compaction.check.interval", Version: "2.0.0", @@ -3326,20 +3332,6 @@ During compaction, the size of segment # of rows is able to exceed segment max # } p.ClusteringCompactionTriggerInterval.Init(base.mgr) - p.ClusteringCompactionStateCheckInterval = ParamItem{ - Key: "dataCoord.compaction.clustering.stateCheckInterval", - Version: "2.4.6", - DefaultValue: "10", - } - p.ClusteringCompactionStateCheckInterval.Init(base.mgr) - - p.ClusteringCompactionGCInterval = ParamItem{ - Key: "dataCoord.compaction.clustering.gcInterval", - Version: "2.4.6", - DefaultValue: "600", - } - p.ClusteringCompactionGCInterval.Init(base.mgr) - p.ClusteringCompactionMinInterval = ParamItem{ Key: "dataCoord.compaction.clustering.minInterval", Version: "2.4.6", @@ -3371,14 +3363,6 @@ During compaction, the size of segment # of rows is able to exceed segment max # } p.ClusteringCompactionTimeoutInSeconds.Init(base.mgr) - p.ClusteringCompactionDropTolerance = ParamItem{ - Key: "dataCoord.compaction.clustering.dropTolerance", - Version: "2.4.6", - Doc: "If clustering compaction job is finished for a long time, gc it", - DefaultValue: "259200", - } - p.ClusteringCompactionDropTolerance.Init(base.mgr) - p.ClusteringCompactionPreferSegmentSize = ParamItem{ Key: "dataCoord.compaction.clustering.preferSegmentSize", Version: "2.4.6", diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index 5489a9f620c3d..0f695eeae967a 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -446,6 +446,11 @@ func TestComponentParam(t *testing.T) { params.Save("datacoord.gracefulStopTimeout", "100") assert.Equal(t, 100*time.Second, Params.GracefulStopTimeout.GetAsDuration(time.Second)) + params.Save("dataCoord.compaction.gcInterval", "100") + assert.Equal(t, float64(100), Params.CompactionGCIntervalInSeconds.GetAsDuration(time.Second).Seconds()) + params.Save("dataCoord.compaction.dropTolerance", "100") + assert.Equal(t, float64(100), Params.CompactionDropToleranceInSeconds.GetAsDuration(time.Second).Seconds()) + params.Save("dataCoord.compaction.clustering.enable", "true") assert.Equal(t, true, Params.ClusteringCompactionEnable.GetAsBool()) params.Save("dataCoord.compaction.clustering.newDataSizeThreshold", "10") @@ -456,8 +461,6 @@ func TestComponentParam(t *testing.T) { assert.Equal(t, int64(10*1024*1024), Params.ClusteringCompactionNewDataSizeThreshold.GetAsSize()) params.Save("dataCoord.compaction.clustering.newDataSizeThreshold", "10g") assert.Equal(t, int64(10*1024*1024*1024), Params.ClusteringCompactionNewDataSizeThreshold.GetAsSize()) - params.Save("dataCoord.compaction.clustering.dropTolerance", "86400") - assert.Equal(t, int64(86400), Params.ClusteringCompactionDropTolerance.GetAsInt64()) params.Save("dataCoord.compaction.clustering.maxSegmentSize", "100m") assert.Equal(t, int64(100*1024*1024), Params.ClusteringCompactionMaxSegmentSize.GetAsSize()) params.Save("dataCoord.compaction.clustering.preferSegmentSize", "10m")