diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 13a4827ed580f..42555652f26ca 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -550,7 +550,7 @@ dataCoord: # level is prioritized by level: L0 compactions first, then mix compactions, then clustering compactions. # mix is prioritized by level: mix compactions first, then L0 compactions, then clustering compactions. taskPrioritizer: default - taskQueueCapacity: 256 # compaction task queue size + taskQueueCapacity: 100000 # compaction task queue size rpcTimeout: 10 maxParallelTaskNum: 10 mix: diff --git a/internal/datacoord/compaction.go b/internal/datacoord/compaction.go index 45f187bfee005..a25d45a501625 100644 --- a/internal/datacoord/compaction.go +++ b/internal/datacoord/compaction.go @@ -335,15 +335,16 @@ func (c *compactionPlanHandler) loadMeta() { zap.String("state", task.GetState().String())) continue } else { - // TODO: how to deal with the create failed tasks, leave it in meta forever? t, err := c.createCompactTask(task) if err != nil { - log.Warn("compactionPlanHandler loadMeta create compactionTask failed", + log.Info("compactionPlanHandler loadMeta create compactionTask failed, try to clean it", zap.Int64("planID", task.GetPlanID()), zap.String("type", task.GetType().String()), zap.String("state", task.GetState().String()), zap.Error(err), ) + // ignore the drop error + c.meta.DropCompactionTask(task) continue } if t.NeedReAssignNodeID() { diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index db608297839f1..af858cc0f4bba 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -3457,7 +3457,7 @@ mix is prioritized by level: mix compactions first, then L0 compactions, then cl p.CompactionTaskQueueCapacity = ParamItem{ Key: "dataCoord.compaction.taskQueueCapacity", Version: "2.5.0", - DefaultValue: "256", + DefaultValue: "100000", Doc: `compaction task queue size`, Export: true, }