Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: [cherry-pick] compaction task not be cleaned correctly #34766

Merged
merged 1 commit into from
Jul 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -458,18 +458,17 @@ dataCoord:
rpcTimeout: 10
maxParallelTaskNum: 10
workerMaxParallelTaskNum: 2
gcInterval: 1800 # The time interval in seconds for compaction gc
dropTolerance: 86400 # Compaction task will be cleaned after it has been finished for longer than this time (in seconds)
clustering:
enable: true
autoEnable: false
triggerInterval: 600
stateCheckInterval: 10
gcInterval: 600
minInterval: 3600
maxInterval: 259200
newDataRatioThreshold: 0.2
newDataSizeThreshold: 512m
timeout: 7200
dropTolerance: 86400
# clustering compaction will try best to distribute data into segments with size range in [preferSegmentSize, maxSegmentSize].
# data will be clustered by preferSegmentSize; if a cluster is larger than maxSegmentSize, it will be split into multiple segments
# buffer between (preferSegmentSize, maxSegmentSize) is left for new data in the same cluster (range), to avoid redistributing globally too often
Expand Down
9 changes: 6 additions & 3 deletions internal/datacoord/compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,8 +376,10 @@ func (c *compactionPlanHandler) loopCheck() {
}

func (c *compactionPlanHandler) loopClean() {
interval := Params.DataCoordCfg.CompactionGCIntervalInSeconds.GetAsDuration(time.Second)
log.Info("compactionPlanHandler start clean check loop", zap.Any("gc interval", interval))
defer c.stopWg.Done()
cleanTicker := time.NewTicker(30 * time.Minute)
cleanTicker := time.NewTicker(interval)
defer cleanTicker.Stop()
for {
select {
Expand All @@ -401,10 +403,11 @@ func (c *compactionPlanHandler) cleanCompactionTaskMeta() {
for _, tasks := range triggers {
for _, task := range tasks {
if task.State == datapb.CompactionTaskState_completed || task.State == datapb.CompactionTaskState_cleaned {
duration := time.Since(time.Unix(task.StartTime, 0)).Seconds()
if duration > float64(Params.DataCoordCfg.CompactionDropToleranceInSeconds.GetAsDuration(time.Second)) {
duration := time.Since(time.UnixMilli(task.StartTime)).Seconds()
if duration > float64(Params.DataCoordCfg.CompactionDropToleranceInSeconds.GetAsDuration(time.Second).Seconds()) {
// try best to delete meta
err := c.meta.DropCompactionTask(task)
log.Debug("drop compaction task meta", zap.Int64("planID", task.PlanID))
if err != nil {
log.Warn("fail to drop task", zap.Int64("planID", task.PlanID), zap.Error(err))
}
Expand Down
40 changes: 40 additions & 0 deletions internal/datacoord/compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,17 @@
package datacoord

import (
"context"
"testing"
"time"

"github.com/cockroachdb/errors"
"github.com/samber/lo"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"

"github.com/milvus-io/milvus/internal/metastore/kv/binlog"
"github.com/milvus-io/milvus/internal/metastore/kv/datacoord"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/util/metautil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
Expand Down Expand Up @@ -703,6 +706,43 @@ func (s *CompactionPlanHandlerSuite) TestCheckCompaction() {
s.Equal(datapb.CompactionTaskState_executing, t.GetState())
}

func (s *CompactionPlanHandlerSuite) TestCompactionGC() {
s.SetupTest()
inTasks := []*datapb.CompactionTask{
{
PlanID: 1,
Type: datapb.CompactionType_MixCompaction,
State: datapb.CompactionTaskState_completed,
StartTime: time.Now().UnixMilli() - (time.Second * 100000).Milliseconds(),
},
{
PlanID: 2,
Type: datapb.CompactionType_MixCompaction,
State: datapb.CompactionTaskState_cleaned,
StartTime: time.Now().UnixMilli() - (time.Second * 100000).Milliseconds(),
},
{
PlanID: 3,
Type: datapb.CompactionType_MixCompaction,
State: datapb.CompactionTaskState_cleaned,
StartTime: time.Now().UnixMilli(),
},
}

catalog := &datacoord.Catalog{MetaKv: NewMetaMemoryKV()}
compactionTaskMeta, err := newCompactionTaskMeta(context.TODO(), catalog)
s.NoError(err)
s.handler.meta = &meta{compactionTaskMeta: compactionTaskMeta}
for _, t := range inTasks {
s.handler.meta.SaveCompactionTask(t)
}

s.handler.cleanCompactionTaskMeta()
// two task should be cleaned, one remains
tasks := s.handler.meta.GetCompactionTaskMeta().GetCompactionTasks()
s.Equal(1, len(tasks))
}

func (s *CompactionPlanHandlerSuite) TestProcessCompleteCompaction() {
s.SetupTest()

Expand Down
36 changes: 10 additions & 26 deletions pkg/util/paramtable/component_param.go
Original file line number Diff line number Diff line change
Expand Up @@ -2872,6 +2872,7 @@ type dataCoordConfig struct {
SegmentExpansionRate ParamItem `refreshable:"true"`
CompactionTimeoutInSeconds ParamItem `refreshable:"true"`
CompactionDropToleranceInSeconds ParamItem `refreshable:"true"`
CompactionGCIntervalInSeconds ParamItem `refreshable:"true"`
CompactionCheckIntervalInSeconds ParamItem `refreshable:"false"`
SingleCompactionRatioThreshold ParamItem `refreshable:"true"`
SingleCompactionDeltaLogMaxSize ParamItem `refreshable:"true"`
Expand All @@ -2885,12 +2886,9 @@ type dataCoordConfig struct {
ClusteringCompactionEnable ParamItem `refreshable:"true"`
ClusteringCompactionAutoEnable ParamItem `refreshable:"true"`
ClusteringCompactionTriggerInterval ParamItem `refreshable:"true"`
ClusteringCompactionStateCheckInterval ParamItem `refreshable:"true"`
ClusteringCompactionGCInterval ParamItem `refreshable:"true"`
ClusteringCompactionMinInterval ParamItem `refreshable:"true"`
ClusteringCompactionMaxInterval ParamItem `refreshable:"true"`
ClusteringCompactionNewDataSizeThreshold ParamItem `refreshable:"true"`
ClusteringCompactionDropTolerance ParamItem `refreshable:"true"`
ClusteringCompactionPreferSegmentSize ParamItem `refreshable:"true"`
ClusteringCompactionMaxSegmentSize ParamItem `refreshable:"true"`
ClusteringCompactionMaxTrainSizeRatio ParamItem `refreshable:"true"`
Expand Down Expand Up @@ -3193,11 +3191,19 @@ During compaction, the size of segment # of rows is able to exceed segment max #
p.CompactionDropToleranceInSeconds = ParamItem{
Key: "dataCoord.compaction.dropTolerance",
Version: "2.4.2",
Doc: "If compaction job is finished for a long time, gc it",
Doc: "Compaction task will be cleaned after finish longer than this time(in seconds)",
DefaultValue: "86400",
}
p.CompactionDropToleranceInSeconds.Init(base.mgr)

p.CompactionGCIntervalInSeconds = ParamItem{
Key: "dataCoord.compaction.gcInterval",
Version: "2.4.7",
Doc: "The time interval in seconds for compaction gc",
DefaultValue: "1800",
}
p.CompactionGCIntervalInSeconds.Init(base.mgr)

p.CompactionCheckIntervalInSeconds = ParamItem{
Key: "dataCoord.compaction.check.interval",
Version: "2.0.0",
Expand Down Expand Up @@ -3326,20 +3332,6 @@ During compaction, the size of segment # of rows is able to exceed segment max #
}
p.ClusteringCompactionTriggerInterval.Init(base.mgr)

p.ClusteringCompactionStateCheckInterval = ParamItem{
Key: "dataCoord.compaction.clustering.stateCheckInterval",
Version: "2.4.6",
DefaultValue: "10",
}
p.ClusteringCompactionStateCheckInterval.Init(base.mgr)

p.ClusteringCompactionGCInterval = ParamItem{
Key: "dataCoord.compaction.clustering.gcInterval",
Version: "2.4.6",
DefaultValue: "600",
}
p.ClusteringCompactionGCInterval.Init(base.mgr)

p.ClusteringCompactionMinInterval = ParamItem{
Key: "dataCoord.compaction.clustering.minInterval",
Version: "2.4.6",
Expand Down Expand Up @@ -3371,14 +3363,6 @@ During compaction, the size of segment # of rows is able to exceed segment max #
}
p.ClusteringCompactionTimeoutInSeconds.Init(base.mgr)

p.ClusteringCompactionDropTolerance = ParamItem{
Key: "dataCoord.compaction.clustering.dropTolerance",
Version: "2.4.6",
Doc: "If clustering compaction job is finished for a long time, gc it",
DefaultValue: "259200",
}
p.ClusteringCompactionDropTolerance.Init(base.mgr)

p.ClusteringCompactionPreferSegmentSize = ParamItem{
Key: "dataCoord.compaction.clustering.preferSegmentSize",
Version: "2.4.6",
Expand Down
7 changes: 5 additions & 2 deletions pkg/util/paramtable/component_param_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,11 @@ func TestComponentParam(t *testing.T) {
params.Save("datacoord.gracefulStopTimeout", "100")
assert.Equal(t, 100*time.Second, Params.GracefulStopTimeout.GetAsDuration(time.Second))

params.Save("dataCoord.compaction.gcInterval", "100")
assert.Equal(t, float64(100), Params.CompactionGCIntervalInSeconds.GetAsDuration(time.Second).Seconds())
params.Save("dataCoord.compaction.dropTolerance", "100")
assert.Equal(t, float64(100), Params.CompactionDropToleranceInSeconds.GetAsDuration(time.Second).Seconds())

params.Save("dataCoord.compaction.clustering.enable", "true")
assert.Equal(t, true, Params.ClusteringCompactionEnable.GetAsBool())
params.Save("dataCoord.compaction.clustering.newDataSizeThreshold", "10")
Expand All @@ -456,8 +461,6 @@ func TestComponentParam(t *testing.T) {
assert.Equal(t, int64(10*1024*1024), Params.ClusteringCompactionNewDataSizeThreshold.GetAsSize())
params.Save("dataCoord.compaction.clustering.newDataSizeThreshold", "10g")
assert.Equal(t, int64(10*1024*1024*1024), Params.ClusteringCompactionNewDataSizeThreshold.GetAsSize())
params.Save("dataCoord.compaction.clustering.dropTolerance", "86400")
assert.Equal(t, int64(86400), Params.ClusteringCompactionDropTolerance.GetAsInt64())
params.Save("dataCoord.compaction.clustering.maxSegmentSize", "100m")
assert.Equal(t, int64(100*1024*1024), Params.ClusteringCompactionMaxSegmentSize.GetAsSize())
params.Save("dataCoord.compaction.clustering.preferSegmentSize", "10m")
Expand Down
Loading