Skip to content

Commit

Permalink
fix: [2.4] Handle the error of the compaction queue being full (#37990)
Browse files Browse the repository at this point in the history
issue: #37988

master pr: #37989

Signed-off-by: Cai Zhang <[email protected]>
  • Loading branch information
xiaocai2333 authored Nov 29, 2024
1 parent 88b731d commit 045cf56
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 4 deletions.
25 changes: 21 additions & 4 deletions internal/datacoord/compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,17 @@ func (c *compactionPlanHandler) loadMeta() {
continue
}
if t.NeedReAssignNodeID() {
c.submitTask(t)
if err = c.submitTask(t); err != nil {
log.Info("compactionPlanHandler loadMeta submit task failed, try to clean it",
zap.Int64("planID", task.GetPlanID()),
zap.String("type", task.GetType().String()),
zap.String("state", task.GetState().String()),
zap.Error(err),
)
// ignore the drop error
c.meta.DropCompactionTask(task)
continue
}
log.Info("compactionPlanHandler loadMeta submitTask",
zap.Int64("planID", t.GetPlanID()),
zap.Int64("triggerID", t.GetTriggerID()),
Expand Down Expand Up @@ -537,11 +547,14 @@ func (c *compactionPlanHandler) removeTasksByChannel(channel string) {
c.executingGuard.Unlock()
}

func (c *compactionPlanHandler) submitTask(t CompactionTask) {
func (c *compactionPlanHandler) submitTask(t CompactionTask) error {
_, span := otel.Tracer(typeutil.DataCoordRole).Start(context.Background(), fmt.Sprintf("Compaction-%s", t.GetType()))
t.SetSpan(span)
c.queueTasks.Enqueue(t)
if err := c.queueTasks.Enqueue(t); err != nil {
return err
}
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetType().String(), metrics.Pending).Inc()
return nil
}

// restoreTask used to restore Task from etcd
Expand Down Expand Up @@ -592,7 +605,11 @@ func (c *compactionPlanHandler) enqueueCompaction(task *datapb.CompactionTask) e
log.Warn("Failed to enqueue compaction task, unable to save task meta", zap.Error(err))
return err
}
c.submitTask(t)
if err = c.submitTask(t); err != nil {
log.Warn("submit compaction task failed", zap.Error(err))
c.meta.SetSegmentsCompacting(t.GetInputSegments(), false)
return err
}
log.Info("Compaction plan submitted")
return nil
}
Expand Down
47 changes: 47 additions & 0 deletions internal/datacoord/compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/milvus-io/milvus/internal/metastore/kv/datacoord"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/util/metautil"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

Expand Down Expand Up @@ -709,6 +710,52 @@ func (s *CompactionPlanHandlerSuite) TestGetCompactionTask() {
s.Equal(1, info.failedCnt)
}

func (s *CompactionPlanHandlerSuite) TestCompactionQueueFull() {
s.SetupTest()
paramtable.Get().Save("dataCoord.compaction.taskQueueCapacity", "1")
defer paramtable.Get().Reset("dataCoord.compaction.taskQueueCapacity")

s.handler = newCompactionPlanHandler(s.cluster, s.mockSessMgr, s.mockCm, s.mockMeta, s.mockAlloc, nil, nil)

t1 := &mixCompactionTask{
CompactionTask: &datapb.CompactionTask{
TriggerID: 1,
PlanID: 1,
Type: datapb.CompactionType_MixCompaction,
Channel: "ch-01",
State: datapb.CompactionTaskState_executing,
},
meta: s.mockMeta,
sessions: s.mockSessMgr,
}
t1.plan = &datapb.CompactionPlan{
PlanID: 1,
Type: datapb.CompactionType_MixCompaction,
Channel: "ch-01",
}

s.NoError(s.handler.submitTask(t1))

t2 := &mixCompactionTask{
CompactionTask: &datapb.CompactionTask{
TriggerID: 1,
PlanID: 2,
Type: datapb.CompactionType_MixCompaction,
Channel: "ch-01",
State: datapb.CompactionTaskState_completed,
},
meta: s.mockMeta,
sessions: s.mockSessMgr,
}
t2.plan = &datapb.CompactionPlan{
PlanID: 2,
Type: datapb.CompactionType_MixCompaction,
Channel: "ch-01",
}

s.Error(s.handler.submitTask(t2))
}

func (s *CompactionPlanHandlerSuite) TestExecCompactionPlan() {
s.SetupTest()
s.mockMeta.EXPECT().CheckAndSetSegmentsCompacting(mock.Anything).Return(true, true).Maybe()
Expand Down

0 comments on commit 045cf56

Please sign in to comment.