diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 6d0807e9ef817..f67fbb16be318 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -550,6 +550,7 @@ dataCoord: # This configuration takes effect only when dataCoord.enableCompaction is set as true. enableAutoCompaction: true indexBasedCompaction: true + taskPrioritizer: default # compaction task prioritizer, options: [default, level]. Default is FIFO, level is prioritized by level: L0 compactions first, then mix compactions, then major compactions. rpcTimeout: 10 maxParallelTaskNum: 10 workerMaxParallelTaskNum: 2 diff --git a/internal/datacoord/compaction.go b/internal/datacoord/compaction.go index ae49fa066dd2b..375fd0d8a4c69 100644 --- a/internal/datacoord/compaction.go +++ b/internal/datacoord/compaction.go @@ -24,9 +24,7 @@ import ( "time" "github.com/cockroachdb/errors" - "github.com/samber/lo" "go.opentelemetry.io/otel" - "go.uber.org/atomic" "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" @@ -64,7 +62,6 @@ type compactionPlanContext interface { var ( errChannelNotWatched = errors.New("channel is not watched") errChannelInBuffer = errors.New("channel is in buffer") - errCompactionBusy = errors.New("compaction task queue is full") ) var _ compactionPlanContext = (*compactionPlanHandler)(nil) @@ -79,8 +76,7 @@ type compactionInfo struct { } type compactionPlanHandler struct { - queueGuard lock.RWMutex - queueTasks map[int64]CompactionTask // planID -> task + queueTasks CompactionQueue executingGuard lock.RWMutex executingTasks map[int64]CompactionTask // planID -> task @@ -96,8 +92,6 @@ type compactionPlanHandler struct { stopCh chan struct{} stopOnce sync.Once stopWg sync.WaitGroup - - taskNumber *atomic.Int32 } func (c *compactionPlanHandler) getCompactionInfo(triggerID int64) *compactionInfo { @@ -168,13 +162,11 @@ func summaryCompactionState(tasks []*datapb.CompactionTask) *compactionInfo { func (c *compactionPlanHandler) getCompactionTasksNumBySignalID(triggerID int64) int { cnt := 0 - c.queueGuard.RLock() - for _, t := range c.queueTasks { - if t.GetTriggerID() == triggerID { + c.queueTasks.ForEach(func(ct CompactionTask) { + if ct.GetTriggerID() == triggerID { cnt += 1 } - } - c.queueGuard.RUnlock() + }) c.executingGuard.RLock() for _, t := range c.executingTasks { if t.GetTriggerID() == triggerID { @@ -185,10 +177,11 @@ func (c *compactionPlanHandler) getCompactionTasksNumBySignalID(triggerID int64) return cnt } -func newCompactionPlanHandler(cluster Cluster, sessions session.DataNodeManager, cm ChannelManager, meta CompactionMeta, allocator allocator.Allocator, analyzeScheduler *taskScheduler, handler Handler, +func newCompactionPlanHandler(cluster Cluster, sessions session.DataNodeManager, cm ChannelManager, meta CompactionMeta, + allocator allocator.Allocator, analyzeScheduler *taskScheduler, handler Handler, ) *compactionPlanHandler { return &compactionPlanHandler{ - queueTasks: make(map[int64]CompactionTask), + queueTasks: *NewCompactionQueue(256, getPrioritizer()), // Higher capacity will have better ordering in priority, but consumes more memory. chManager: cm, meta: meta, sessions: sessions, @@ -196,20 +189,12 @@ func newCompactionPlanHandler(cluster Cluster, sessions session.DataNodeManager, stopCh: make(chan struct{}), cluster: cluster, executingTasks: make(map[int64]CompactionTask), - taskNumber: atomic.NewInt32(0), analyzeScheduler: analyzeScheduler, handler: handler, } } func (c *compactionPlanHandler) schedule() []CompactionTask { - c.queueGuard.RLock() - if len(c.queueTasks) == 0 { - c.queueGuard.RUnlock() - return nil - } - c.queueGuard.RUnlock() - l0ChannelExcludes := typeutil.NewSet[string]() mixChannelExcludes := typeutil.NewSet[string]() clusterChannelExcludes := typeutil.NewSet[string]() @@ -231,42 +216,66 @@ func (c *compactionPlanHandler) schedule() []CompactionTask { } c.executingGuard.RUnlock() - var picked []CompactionTask - c.queueGuard.RLock() - defer c.queueGuard.RUnlock() - keys := lo.Keys(c.queueTasks) - sort.SliceStable(keys, func(i, j int) bool { - return keys[i] < keys[j] - }) - for _, planID := range keys { - t := c.queueTasks[planID] + excluded := make([]CompactionTask, 0) + defer func() { + // Add back the excluded tasks + for _, t := range excluded { + c.queueTasks.Enqueue(t) + } + }() + selected := make([]CompactionTask, 0) + + p := getPrioritizer() + if &c.queueTasks.prioritizer != &p { + c.queueTasks.UpdatePrioritizer(p) + } + + c.executingGuard.Lock() + tasksToGo := Params.DataCoordCfg.CompactionMaxParallelTasks.GetAsInt() - len(c.executingTasks) + c.executingGuard.Unlock() + for len(selected) < tasksToGo && c.queueTasks.Len() > 0 { + t, err := c.queueTasks.Dequeue() + if err != nil { + // Will never go here + return selected + } + switch t.GetType() { case datapb.CompactionType_Level0DeleteCompaction: if l0ChannelExcludes.Contain(t.GetChannel()) || mixChannelExcludes.Contain(t.GetChannel()) { + excluded = append(excluded, t) continue } - picked = append(picked, t) l0ChannelExcludes.Insert(t.GetChannel()) + selected = append(selected, t) case datapb.CompactionType_MixCompaction: if l0ChannelExcludes.Contain(t.GetChannel()) { + excluded = append(excluded, t) continue } - picked = append(picked, t) mixChannelExcludes.Insert(t.GetChannel()) mixLabelExcludes.Insert(t.GetLabel()) + selected = append(selected, t) case datapb.CompactionType_ClusteringCompaction: if l0ChannelExcludes.Contain(t.GetChannel()) || mixLabelExcludes.Contain(t.GetLabel()) || clusterLabelExcludes.Contain(t.GetLabel()) { + excluded = append(excluded, t) continue } - picked = append(picked, t) clusterChannelExcludes.Insert(t.GetChannel()) clusterLabelExcludes.Insert(t.GetLabel()) + selected = append(selected, t) } + + c.executingGuard.Lock() + c.executingTasks[t.GetPlanID()] = t + c.executingGuard.Unlock() + metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetType().String(), metrics.Pending).Dec() + metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetType().String(), metrics.Executing).Inc() } - return picked + return selected } func (c *compactionPlanHandler) start() { @@ -325,26 +334,6 @@ func (c *compactionPlanHandler) loadMeta() { } } -func (c *compactionPlanHandler) doSchedule() { - picked := c.schedule() - if len(picked) > 0 { - c.executingGuard.Lock() - for _, t := range picked { - c.executingTasks[t.GetPlanID()] = t - } - c.executingGuard.Unlock() - - c.queueGuard.Lock() - for _, t := range picked { - delete(c.queueTasks, t.GetPlanID()) - metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetType().String(), metrics.Pending).Dec() - metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetType().String(), metrics.Executing).Inc() - } - c.queueGuard.Unlock() - - } -} - func (c *compactionPlanHandler) loopSchedule() { log.Info("compactionPlanHandler start loop schedule") defer c.stopWg.Done() @@ -358,7 +347,7 @@ func (c *compactionPlanHandler) loopSchedule() { return case <-scheduleTicker.C: - c.doSchedule() + c.schedule() } } } @@ -484,22 +473,20 @@ func (c *compactionPlanHandler) stop() { } func (c *compactionPlanHandler) removeTasksByChannel(channel string) { - c.queueGuard.Lock() - for id, task := range c.queueTasks { - log.Info("Compaction handler removing tasks by channel", - zap.String("channel", channel), zap.Any("id", id), zap.Any("task_channel", task.GetChannel())) + log.Info("removing tasks by channel", zap.String("channel", channel)) + c.queueTasks.RemoveAll(func(task CompactionTask) bool { if task.GetChannel() == channel { log.Info("Compaction handler removing tasks by channel", zap.String("channel", channel), zap.Int64("planID", task.GetPlanID()), zap.Int64("node", task.GetNodeID()), ) - delete(c.queueTasks, id) - c.taskNumber.Dec() metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", task.GetNodeID()), task.GetType().String(), metrics.Pending).Dec() + return true } - } - c.queueGuard.Unlock() + return false + }) + c.executingGuard.Lock() for id, task := range c.executingTasks { log.Info("Compaction handler removing tasks by channel", @@ -511,7 +498,6 @@ func (c *compactionPlanHandler) removeTasksByChannel(channel string) { zap.Int64("node", task.GetNodeID()), ) delete(c.executingTasks, id) - c.taskNumber.Dec() metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", task.GetNodeID()), task.GetType().String(), metrics.Executing).Dec() } } @@ -521,10 +507,7 @@ func (c *compactionPlanHandler) removeTasksByChannel(channel string) { func (c *compactionPlanHandler) submitTask(t CompactionTask) { _, span := otel.Tracer(typeutil.DataCoordRole).Start(context.Background(), fmt.Sprintf("Compaction-%s", t.GetType())) t.SetSpan(span) - c.queueGuard.Lock() - c.queueTasks[t.GetPlanID()] = t - c.queueGuard.Unlock() - c.taskNumber.Add(1) + c.queueTasks.Enqueue(t) metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetType().String(), metrics.Pending).Inc() } @@ -535,26 +518,24 @@ func (c *compactionPlanHandler) restoreTask(t CompactionTask) { c.executingGuard.Lock() c.executingTasks[t.GetPlanID()] = t c.executingGuard.Unlock() - c.taskNumber.Add(1) metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", t.GetNodeID()), t.GetType().String(), metrics.Executing).Inc() } // getCompactionTask return compaction func (c *compactionPlanHandler) getCompactionTask(planID int64) CompactionTask { - c.queueGuard.RLock() - t, ok := c.queueTasks[planID] - if ok { - c.queueGuard.RUnlock() + var t CompactionTask = nil + c.queueTasks.ForEach(func(task CompactionTask) { + if task.GetPlanID() == planID { + t = task + } + }) + if t != nil { return t } - c.queueGuard.RUnlock() + c.executingGuard.RLock() - t, ok = c.executingTasks[planID] - if ok { - c.executingGuard.RUnlock() - return t - } - c.executingGuard.RUnlock() + defer c.executingGuard.RUnlock() + t = c.executingTasks[planID] return t } @@ -669,7 +650,6 @@ func (c *compactionPlanHandler) checkCompaction() error { metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", t.GetNodeID()), t.GetType().String(), metrics.Done).Inc() } c.executingGuard.Unlock() - c.taskNumber.Sub(int32(len(finishedTasks))) return nil } @@ -708,23 +688,7 @@ func (c *compactionPlanHandler) pickShardNode(nodeSlots map[int64]int64, t Compa // isFull return true if the task pool is full func (c *compactionPlanHandler) isFull() bool { - return c.getTaskCount() >= Params.DataCoordCfg.CompactionMaxParallelTasks.GetAsInt() -} - -func (c *compactionPlanHandler) getTaskCount() int { - return int(c.taskNumber.Load()) -} - -func (c *compactionPlanHandler) getTasksByState(state datapb.CompactionTaskState) []CompactionTask { - c.queueGuard.RLock() - defer c.queueGuard.RUnlock() - tasks := make([]CompactionTask, 0, len(c.queueTasks)) - for _, t := range c.queueTasks { - if t.GetState() == state { - tasks = append(tasks, t) - } - } - return tasks + return c.queueTasks.Len() >= c.queueTasks.capacity } func (c *compactionPlanHandler) checkDelay(t CompactionTask) { diff --git a/internal/datacoord/compaction_queue.go b/internal/datacoord/compaction_queue.go new file mode 100644 index 0000000000000..c9b71017c4b66 --- /dev/null +++ b/internal/datacoord/compaction_queue.go @@ -0,0 +1,187 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package datacoord + +import ( + "container/heap" + + "github.com/cockroachdb/errors" + "github.com/samber/lo" + + "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/pkg/util/lock" +) + +type Item[T any] struct { + value T + priority int // The priority of the item in the queue. + // The index is needed by update and is maintained by the heap.Interface methods. + index int // The index of the item in the heap. +} + +// A PriorityQueue implements heap.Interface and holds Items. +type PriorityQueue[T any] []*Item[T] + +var _ heap.Interface = (*PriorityQueue[any])(nil) + +func (pq PriorityQueue[T]) Len() int { return len(pq) } + +func (pq PriorityQueue[T]) Less(i, j int) bool { + return pq[i].priority < pq[j].priority +} + +func (pq PriorityQueue[T]) Swap(i, j int) { + pq[i], pq[j] = pq[j], pq[i] + pq[i].index = i + pq[j].index = j +} + +func (pq *PriorityQueue[T]) Push(x any) { + n := len(*pq) + item := x.(*Item[T]) + item.index = n + *pq = append(*pq, item) +} + +func (pq *PriorityQueue[T]) Pop() any { + old := *pq + n := len(old) + item := old[n-1] + old[n-1] = nil // don't stop the GC from reclaiming the item eventually + item.index = -1 // for safety + *pq = old[0 : n-1] + return item +} + +// update modifies the priority and value of an Item in the queue. +func (pq *PriorityQueue[T]) Update(item *Item[T], value T, priority int) { + item.value = value + item.priority = priority + heap.Fix(pq, item.index) +} + +var ( + ErrFull = errors.New("compaction queue is full") + ErrNoSuchElement = errors.New("compaction queue has no element") +) + +type Prioritizer func(t CompactionTask) int + +type CompactionQueue struct { + pq PriorityQueue[CompactionTask] + lock lock.RWMutex + prioritizer Prioritizer + capacity int +} + +func NewCompactionQueue(capacity int, prioritizer Prioritizer) *CompactionQueue { + return &CompactionQueue{ + pq: make(PriorityQueue[CompactionTask], 0), + lock: lock.RWMutex{}, + prioritizer: prioritizer, + capacity: capacity, + } +} + +func (q *CompactionQueue) Enqueue(t CompactionTask) error { + q.lock.Lock() + defer q.lock.Unlock() + if q.capacity > 0 && len(q.pq) >= q.capacity { + return ErrFull + } + + heap.Push(&q.pq, &Item[CompactionTask]{value: t, priority: q.prioritizer(t)}) + return nil +} + +func (q *CompactionQueue) Dequeue() (CompactionTask, error) { + q.lock.Lock() + defer q.lock.Unlock() + + if len(q.pq) == 0 { + return nil, ErrNoSuchElement + } + + item := heap.Pop(&q.pq).(*Item[CompactionTask]) + return item.value, nil +} + +func (q *CompactionQueue) UpdatePrioritizer(prioritizer Prioritizer) { + q.prioritizer = prioritizer + q.lock.Lock() + defer q.lock.Unlock() + for i := range q.pq { + q.pq[i].priority = q.prioritizer(q.pq[i].value) + } + heap.Init(&q.pq) +} + +func (q *CompactionQueue) RemoveAll(predicate func(CompactionTask) bool) { + q.lock.Lock() + defer q.lock.Unlock() + f := lo.Filter[*Item[CompactionTask]](q.pq, func(i1 *Item[CompactionTask], _ int) bool { + return !predicate(i1.value) + }) + q.pq = f + heap.Init(&q.pq) +} + +// ForEach calls f on each item in the queue. +func (q *CompactionQueue) ForEach(f func(CompactionTask)) { + q.lock.RLock() + defer q.lock.RUnlock() + lo.ForEach[*Item[CompactionTask]](q.pq, func(i *Item[CompactionTask], _ int) { + f(i.value) + }) +} + +func (q *CompactionQueue) Len() int { + q.lock.RLock() + defer q.lock.RUnlock() + return len(q.pq) +} + +var ( + DefaultPrioritizer Prioritizer = func(task CompactionTask) int { + return int(task.GetPlanID()) + } + + LevelPrioritizer Prioritizer = func(task CompactionTask) int { + switch task.GetType() { + case datapb.CompactionType_Level0DeleteCompaction: + return 1 + case datapb.CompactionType_MixCompaction: + return 10 + case datapb.CompactionType_ClusteringCompaction: + return 100 + case datapb.CompactionType_MinorCompaction: + case datapb.CompactionType_MajorCompaction: + return 1000 + } + return 0xffff + } +) + +func getPrioritizer() Prioritizer { + p := Params.DataCoordCfg.CompactionTaskPrioritizer.GetValue() + switch p { + case "level": + return LevelPrioritizer + default: + return DefaultPrioritizer + } +} diff --git a/internal/datacoord/compaction_queue_test.go b/internal/datacoord/compaction_queue_test.go new file mode 100644 index 0000000000000..794520d047673 --- /dev/null +++ b/internal/datacoord/compaction_queue_test.go @@ -0,0 +1,151 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package datacoord + +import ( + "sync" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/milvus-io/milvus/internal/proto/datapb" +) + +func TestCompactionQueue(t *testing.T) { + t1 := &mixCompactionTask{ + CompactionTask: &datapb.CompactionTask{ + PlanID: 3, + Type: datapb.CompactionType_MixCompaction, + }, + } + + t2 := &l0CompactionTask{ + CompactionTask: &datapb.CompactionTask{ + PlanID: 1, + Type: datapb.CompactionType_Level0DeleteCompaction, + }, + } + + t3 := &clusteringCompactionTask{ + CompactionTask: &datapb.CompactionTask{ + PlanID: 2, + Type: datapb.CompactionType_MajorCompaction, + }, + } + + t.Run("default prioritizer", func(t *testing.T) { + cq := NewCompactionQueue(3, DefaultPrioritizer) + err := cq.Enqueue(t1) + assert.NoError(t, err) + err = cq.Enqueue(t2) + assert.NoError(t, err) + err = cq.Enqueue(t3) + assert.NoError(t, err) + err = cq.Enqueue(&mixCompactionTask{}) + assert.Error(t, err) + + task, err := cq.Dequeue() + assert.NoError(t, err) + assert.Equal(t, int64(1), task.GetPlanID()) + task, err = cq.Dequeue() + assert.NoError(t, err) + assert.Equal(t, int64(2), task.GetPlanID()) + task, err = cq.Dequeue() + assert.NoError(t, err) + assert.Equal(t, int64(3), task.GetPlanID()) + }) + + t.Run("level prioritizer", func(t *testing.T) { + cq := NewCompactionQueue(3, LevelPrioritizer) + err := cq.Enqueue(t1) + assert.NoError(t, err) + err = cq.Enqueue(t2) + assert.NoError(t, err) + err = cq.Enqueue(t3) + assert.NoError(t, err) + err = cq.Enqueue(&mixCompactionTask{}) + assert.Error(t, err) + + task, err := cq.Dequeue() + assert.NoError(t, err) + assert.Equal(t, datapb.CompactionType_Level0DeleteCompaction, task.GetType()) + task, err = cq.Dequeue() + assert.NoError(t, err) + assert.Equal(t, datapb.CompactionType_MixCompaction, task.GetType()) + task, err = cq.Dequeue() + assert.NoError(t, err) + assert.Equal(t, datapb.CompactionType_MajorCompaction, task.GetType()) + }) + + t.Run("update prioritizer", func(t *testing.T) { + cq := NewCompactionQueue(3, LevelPrioritizer) + err := cq.Enqueue(t1) + assert.NoError(t, err) + err = cq.Enqueue(t2) + assert.NoError(t, err) + err = cq.Enqueue(t3) + assert.NoError(t, err) + err = cq.Enqueue(&mixCompactionTask{}) + assert.Error(t, err) + + task, err := cq.Dequeue() + assert.NoError(t, err) + assert.Equal(t, datapb.CompactionType_Level0DeleteCompaction, task.GetType()) + + cq.UpdatePrioritizer(DefaultPrioritizer) + task, err = cq.Dequeue() + assert.NoError(t, err) + assert.Equal(t, int64(2), task.GetPlanID()) + task, err = cq.Dequeue() + assert.NoError(t, err) + assert.Equal(t, int64(3), task.GetPlanID()) + }) +} + +func TestConcurrency(t *testing.T) { + c := 10 + + cq := NewCompactionQueue(c, LevelPrioritizer) + + wg := sync.WaitGroup{} + wg.Add(c) + for i := 0; i < c; i++ { + t1 := &mixCompactionTask{ + CompactionTask: &datapb.CompactionTask{ + PlanID: int64(i), + Type: datapb.CompactionType_MixCompaction, + }, + } + go func() { + err := cq.Enqueue(t1) + assert.NoError(t, err) + wg.Done() + }() + } + + wg.Wait() + + wg.Add(c) + for i := 0; i < c; i++ { + go func() { + _, err := cq.Dequeue() + assert.NoError(t, err) + wg.Done() + }() + } + wg.Wait() +} diff --git a/internal/datacoord/compaction_test.go b/internal/datacoord/compaction_test.go index 6b9f711c0a75d..2357dd36fc3cb 100644 --- a/internal/datacoord/compaction_test.go +++ b/internal/datacoord/compaction_test.go @@ -214,19 +214,15 @@ func (s *CompactionPlanHandlerSuite) TestScheduleNodeWith1ParallelTask() { s.Run(test.description, func() { s.SetupTest() s.generateInitTasksForSchedule() - s.Require().Equal(4, s.handler.getTaskCount()) // submit the testing tasks for _, t := range test.tasks { s.handler.submitTask(t) } - s.Equal(4+len(test.tasks), s.handler.getTaskCount()) gotTasks := s.handler.schedule() s.Equal(test.expectedOut, lo.Map(gotTasks, func(t CompactionTask, _ int) int64 { return t.GetPlanID() })) - - s.Equal(4+len(test.tasks), s.handler.getTaskCount()) }) } } @@ -332,13 +328,11 @@ func (s *CompactionPlanHandlerSuite) TestScheduleNodeWithL0Executing() { for _, test := range tests { s.Run(test.description, func() { s.SetupTest() - s.Require().Equal(0, s.handler.getTaskCount()) // submit the testing tasks for _, t := range test.tasks { s.handler.submitTask(t) } - s.Equal(len(test.tasks), s.handler.getTaskCount()) gotTasks := s.handler.schedule() s.Equal(test.expectedOut, lo.Map(gotTasks, func(t CompactionTask, _ int) int64 { @@ -531,7 +525,6 @@ func (s *CompactionPlanHandlerSuite) TestRemoveTasksByChannel() { s.handler.submitTask(t1) s.handler.restoreTask(t2) s.handler.removeTasksByChannel(ch) - s.Equal(0, s.handler.getTaskCount()) } func (s *CompactionPlanHandlerSuite) TestGetCompactionTask() { @@ -601,9 +594,7 @@ func (s *CompactionPlanHandlerSuite) TestGetCompactionTask() { s.handler.submitTask(t) } - s.Equal(3, s.handler.getTaskCount()) - s.handler.doSchedule() - s.Equal(3, s.handler.getTaskCount()) + s.handler.schedule() info := s.handler.getCompactionInfo(1) s.Equal(1, info.completedCnt) @@ -627,7 +618,6 @@ func (s *CompactionPlanHandlerSuite) TestExecCompactionPlan() { s.NoError(err) t := handler.getCompactionTask(1) s.NotNil(t) - s.handler.taskNumber.Add(1000) task.PlanID = 2 err = s.handler.enqueueCompaction(task) s.NoError(err) @@ -759,10 +749,7 @@ func (s *CompactionPlanHandlerSuite) TestCheckCompaction() { s.handler.submitTask(t) } - picked := s.handler.schedule() - s.NotEmpty(picked) - - s.handler.doSchedule() + s.handler.schedule() // time.Sleep(2 * time.Second) s.handler.checkCompaction() @@ -903,11 +890,9 @@ func (s *CompactionPlanHandlerSuite) TestProcessCompleteCompaction() { s.mockSessMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil) s.handler.submitTask(task) - s.handler.doSchedule() - s.Equal(1, s.handler.getTaskCount()) + s.handler.schedule() err := s.handler.checkCompaction() s.NoError(err) - s.Equal(0, len(s.handler.getTasksByState(datapb.CompactionTaskState_completed))) } func getFieldBinlogIDs(fieldID int64, logIDs ...int64) *datapb.FieldBinlog { diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index d263c7bb9216f..28ee053737acf 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -3205,9 +3205,10 @@ type dataCoordConfig struct { SegmentFlushInterval ParamItem `refreshable:"true"` // compaction - EnableCompaction ParamItem `refreshable:"false"` - EnableAutoCompaction ParamItem `refreshable:"true"` - IndexBasedCompaction ParamItem `refreshable:"true"` + EnableCompaction ParamItem `refreshable:"false"` + EnableAutoCompaction ParamItem `refreshable:"true"` + IndexBasedCompaction ParamItem `refreshable:"true"` + CompactionTaskPrioritizer ParamItem `refreshable:"true"` CompactionRPCTimeout ParamItem `refreshable:"true"` CompactionMaxParallelTasks ParamItem `refreshable:"true"` @@ -3482,6 +3483,15 @@ This configuration takes effect only when dataCoord.enableCompaction is set as t } p.IndexBasedCompaction.Init(base.mgr) + p.CompactionTaskPrioritizer = ParamItem{ + Key: "dataCoord.compaction.taskPrioritizer", + Version: "2.5.0", + DefaultValue: "default", + Doc: "compaction task prioritizer, options: [default, level]. Default is FIFO, level is prioritized by level: L0 compactions first, then mix compactions, then major compactions.", + Export: true, + } + p.CompactionTaskPrioritizer.Init(base.mgr) + p.CompactionRPCTimeout = ParamItem{ Key: "dataCoord.compaction.rpcTimeout", Version: "2.2.12",