Skip to content

Commit

Permalink
fix: compaction task not be cleaned correctly
Browse files Browse the repository at this point in the history
1.fix compaction task not be cleaned correctly
2.add a new parameter to control compaction gc loop interval
3.remove some useless configs of clustering compaction

Signed-off-by: wayblink <[email protected]>
  • Loading branch information
wayblink committed Jul 17, 2024
1 parent ca758c3 commit 17db5ff
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 34 deletions.
5 changes: 2 additions & 3 deletions configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -460,18 +460,17 @@ dataCoord:
rpcTimeout: 10
maxParallelTaskNum: 10
workerMaxParallelTaskNum: 2
gcInterval: 1800 # The time interval in seconds for compaction gc
dropTolerance: 86400 # Compaction task will be cleaned after finish longer than this time(in seconds)
clustering:
enable: true
autoEnable: false
triggerInterval: 600
stateCheckInterval: 10
gcInterval: 600
minInterval: 3600
maxInterval: 259200
newDataRatioThreshold: 0.2
newDataSizeThreshold: 512m
timeout: 7200
dropTolerance: 86400
# clustering compaction will try best to distribute data into segments with size range in [preferSegmentSize, maxSegmentSize].
# data will be clustered by preferSegmentSize, if a cluster is larger than maxSegmentSize, will spilt it into multi segment
# buffer between (preferSegmentSize, maxSegmentSize) is left for new data in the same cluster(range), to avoid globally redistribute too often
Expand Down
9 changes: 6 additions & 3 deletions internal/datacoord/compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,8 +375,10 @@ func (c *compactionPlanHandler) loopCheck() {
}

func (c *compactionPlanHandler) loopClean() {
interval := Params.DataCoordCfg.CompactionGCIntervalInSeconds.GetAsDuration(time.Second)
log.Info("compactionPlanHandler start clean check loop", zap.Any("gc interval", interval))
defer c.stopWg.Done()
cleanTicker := time.NewTicker(30 * time.Minute)
cleanTicker := time.NewTicker(interval)
defer cleanTicker.Stop()
for {
select {
Expand All @@ -400,10 +402,11 @@ func (c *compactionPlanHandler) cleanCompactionTaskMeta() {
for _, tasks := range triggers {
for _, task := range tasks {
if task.State == datapb.CompactionTaskState_completed || task.State == datapb.CompactionTaskState_cleaned {
duration := time.Since(time.Unix(task.StartTime, 0)).Seconds()
if duration > float64(Params.DataCoordCfg.CompactionDropToleranceInSeconds.GetAsDuration(time.Second)) {
duration := time.Since(time.UnixMilli(task.StartTime)).Seconds()
if duration > float64(Params.DataCoordCfg.CompactionDropToleranceInSeconds.GetAsDuration(time.Second).Seconds()) {
// try best to delete meta
err := c.meta.DropCompactionTask(task)
log.Debug("drop compaction task meta", zap.Int64("planID", task.PlanID))
if err != nil {
log.Warn("fail to drop task", zap.Int64("planID", task.PlanID), zap.Error(err))
}
Expand Down
40 changes: 40 additions & 0 deletions internal/datacoord/compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,17 @@
package datacoord

import (
"context"
"testing"
"time"

"github.com/cockroachdb/errors"
"github.com/samber/lo"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"

"github.com/milvus-io/milvus/internal/metastore/kv/binlog"
"github.com/milvus-io/milvus/internal/metastore/kv/datacoord"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/util/metautil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
Expand Down Expand Up @@ -703,6 +706,43 @@ func (s *CompactionPlanHandlerSuite) TestCheckCompaction() {
s.Equal(datapb.CompactionTaskState_executing, t.GetState())
}

func (s *CompactionPlanHandlerSuite) TestCompactionGC() {
s.SetupTest()
inTasks := []*datapb.CompactionTask{
{
PlanID: 1,
Type: datapb.CompactionType_MixCompaction,
State: datapb.CompactionTaskState_completed,
StartTime: time.Now().UnixMilli() - (time.Second * 100000).Milliseconds(),
},
{
PlanID: 2,
Type: datapb.CompactionType_MixCompaction,
State: datapb.CompactionTaskState_cleaned,
StartTime: time.Now().UnixMilli() - (time.Second * 100000).Milliseconds(),
},
{
PlanID: 3,
Type: datapb.CompactionType_MixCompaction,
State: datapb.CompactionTaskState_cleaned,
StartTime: time.Now().UnixMilli(),
},
}

catalog := &datacoord.Catalog{MetaKv: NewMetaMemoryKV()}
compactionTaskMeta, err := newCompactionTaskMeta(context.TODO(), catalog)
s.NoError(err)
s.handler.meta = &meta{compactionTaskMeta: compactionTaskMeta}
for _, t := range inTasks {
s.handler.meta.SaveCompactionTask(t)
}

s.handler.cleanCompactionTaskMeta()
// two task should be cleaned, one remains
tasks := s.handler.meta.GetCompactionTaskMeta().GetCompactionTasks()
s.Equal(1, len(tasks))
}

func (s *CompactionPlanHandlerSuite) TestProcessCompleteCompaction() {
s.SetupTest()

Expand Down
36 changes: 10 additions & 26 deletions pkg/util/paramtable/component_param.go
Original file line number Diff line number Diff line change
Expand Up @@ -2889,6 +2889,7 @@ type dataCoordConfig struct {
SegmentExpansionRate ParamItem `refreshable:"true"`
CompactionTimeoutInSeconds ParamItem `refreshable:"true"`
CompactionDropToleranceInSeconds ParamItem `refreshable:"true"`
CompactionGCIntervalInSeconds ParamItem `refreshable:"true"`
CompactionCheckIntervalInSeconds ParamItem `refreshable:"false"`
SingleCompactionRatioThreshold ParamItem `refreshable:"true"`
SingleCompactionDeltaLogMaxSize ParamItem `refreshable:"true"`
Expand All @@ -2902,12 +2903,9 @@ type dataCoordConfig struct {
ClusteringCompactionEnable ParamItem `refreshable:"true"`
ClusteringCompactionAutoEnable ParamItem `refreshable:"true"`
ClusteringCompactionTriggerInterval ParamItem `refreshable:"true"`
ClusteringCompactionStateCheckInterval ParamItem `refreshable:"true"`
ClusteringCompactionGCInterval ParamItem `refreshable:"true"`
ClusteringCompactionMinInterval ParamItem `refreshable:"true"`
ClusteringCompactionMaxInterval ParamItem `refreshable:"true"`
ClusteringCompactionNewDataSizeThreshold ParamItem `refreshable:"true"`
ClusteringCompactionDropTolerance ParamItem `refreshable:"true"`
ClusteringCompactionPreferSegmentSize ParamItem `refreshable:"true"`
ClusteringCompactionMaxSegmentSize ParamItem `refreshable:"true"`
ClusteringCompactionMaxTrainSizeRatio ParamItem `refreshable:"true"`
Expand Down Expand Up @@ -3210,11 +3208,19 @@ During compaction, the size of segment # of rows is able to exceed segment max #
p.CompactionDropToleranceInSeconds = ParamItem{
Key: "dataCoord.compaction.dropTolerance",
Version: "2.4.2",
Doc: "If compaction job is finished for a long time, gc it",
Doc: "Compaction task will be cleaned after finish longer than this time(in seconds)",
DefaultValue: "86400",
}
p.CompactionDropToleranceInSeconds.Init(base.mgr)

p.CompactionGCIntervalInSeconds = ParamItem{
Key: "dataCoord.compaction.gcInterval",
Version: "2.4.7",
Doc: "The time interval in seconds for compaction gc",
DefaultValue: "1800",
}
p.CompactionGCIntervalInSeconds.Init(base.mgr)

p.CompactionCheckIntervalInSeconds = ParamItem{
Key: "dataCoord.compaction.check.interval",
Version: "2.0.0",
Expand Down Expand Up @@ -3343,20 +3349,6 @@ During compaction, the size of segment # of rows is able to exceed segment max #
}
p.ClusteringCompactionTriggerInterval.Init(base.mgr)

p.ClusteringCompactionStateCheckInterval = ParamItem{
Key: "dataCoord.compaction.clustering.stateCheckInterval",
Version: "2.4.6",
DefaultValue: "10",
}
p.ClusteringCompactionStateCheckInterval.Init(base.mgr)

p.ClusteringCompactionGCInterval = ParamItem{
Key: "dataCoord.compaction.clustering.gcInterval",
Version: "2.4.6",
DefaultValue: "600",
}
p.ClusteringCompactionGCInterval.Init(base.mgr)

p.ClusteringCompactionMinInterval = ParamItem{
Key: "dataCoord.compaction.clustering.minInterval",
Version: "2.4.6",
Expand Down Expand Up @@ -3388,14 +3380,6 @@ During compaction, the size of segment # of rows is able to exceed segment max #
}
p.ClusteringCompactionTimeoutInSeconds.Init(base.mgr)

p.ClusteringCompactionDropTolerance = ParamItem{
Key: "dataCoord.compaction.clustering.dropTolerance",
Version: "2.4.6",
Doc: "If clustering compaction job is finished for a long time, gc it",
DefaultValue: "259200",
}
p.ClusteringCompactionDropTolerance.Init(base.mgr)

p.ClusteringCompactionPreferSegmentSize = ParamItem{
Key: "dataCoord.compaction.clustering.preferSegmentSize",
Version: "2.4.6",
Expand Down
7 changes: 5 additions & 2 deletions pkg/util/paramtable/component_param_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,11 @@ func TestComponentParam(t *testing.T) {
params.Save("datacoord.gracefulStopTimeout", "100")
assert.Equal(t, 100*time.Second, Params.GracefulStopTimeout.GetAsDuration(time.Second))

params.Save("dataCoord.compaction.gcInterval", "100")
assert.Equal(t, float64(100), Params.CompactionGCIntervalInSeconds.GetAsDuration(time.Second).Seconds())
params.Save("dataCoord.compaction.dropTolerance", "100")
assert.Equal(t, float64(100), Params.CompactionDropToleranceInSeconds.GetAsDuration(time.Second).Seconds())

params.Save("dataCoord.compaction.clustering.enable", "true")
assert.Equal(t, true, Params.ClusteringCompactionEnable.GetAsBool())
params.Save("dataCoord.compaction.clustering.newDataSizeThreshold", "10")
Expand All @@ -461,8 +466,6 @@ func TestComponentParam(t *testing.T) {
assert.Equal(t, int64(10*1024*1024), Params.ClusteringCompactionNewDataSizeThreshold.GetAsSize())
params.Save("dataCoord.compaction.clustering.newDataSizeThreshold", "10g")
assert.Equal(t, int64(10*1024*1024*1024), Params.ClusteringCompactionNewDataSizeThreshold.GetAsSize())
params.Save("dataCoord.compaction.clustering.dropTolerance", "86400")
assert.Equal(t, int64(86400), Params.ClusteringCompactionDropTolerance.GetAsInt64())
params.Save("dataCoord.compaction.clustering.maxSegmentSize", "100m")
assert.Equal(t, int64(100*1024*1024), Params.ClusteringCompactionMaxSegmentSize.GetAsSize())
params.Save("dataCoord.compaction.clustering.preferSegmentSize", "10m")
Expand Down

0 comments on commit 17db5ff

Please sign in to comment.