Skip to content

Commit

Permalink
fix: Handle the error of the compaction queue being full (milvus-io#3…
Browse files Browse the repository at this point in the history
…7989)

issue: milvus-io#37988

Signed-off-by: Cai Zhang <[email protected]>
  • Loading branch information
xiaocai2333 authored Nov 29, 2024
1 parent fd94f12 commit 5e15276
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 4 deletions.
25 changes: 21 additions & 4 deletions internal/datacoord/compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,17 @@ func (c *compactionPlanHandler) loadMeta() {
continue
}
if t.NeedReAssignNodeID() {
c.submitTask(t)
if err = c.submitTask(t); err != nil {
log.Info("compactionPlanHandler loadMeta submit task failed, try to clean it",
zap.Int64("planID", task.GetPlanID()),
zap.String("type", task.GetType().String()),
zap.String("state", task.GetState().String()),
zap.Error(err),
)
// ignore the drop error
c.meta.DropCompactionTask(task)
continue
}
log.Info("compactionPlanHandler loadMeta submitTask",
zap.Int64("planID", t.GetTaskProto().GetPlanID()),
zap.Int64("triggerID", t.GetTaskProto().GetTriggerID()),
Expand Down Expand Up @@ -541,11 +551,14 @@ func (c *compactionPlanHandler) removeTasksByChannel(channel string) {
c.executingGuard.Unlock()
}

func (c *compactionPlanHandler) submitTask(t CompactionTask) {
func (c *compactionPlanHandler) submitTask(t CompactionTask) error {
_, span := otel.Tracer(typeutil.DataCoordRole).Start(context.Background(), fmt.Sprintf("Compaction-%s", t.GetTaskProto().GetType()))
t.SetSpan(span)
c.queueTasks.Enqueue(t)
if err := c.queueTasks.Enqueue(t); err != nil {
return err
}
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetTaskProto().GetType().String(), metrics.Pending).Inc()
return nil
}

// restoreTask used to restore Task from etcd
Expand Down Expand Up @@ -596,7 +609,11 @@ func (c *compactionPlanHandler) enqueueCompaction(task *datapb.CompactionTask) e
log.Warn("Failed to enqueue compaction task, unable to save task meta", zap.Error(err))
return err
}
c.submitTask(t)
if err = c.submitTask(t); err != nil {
log.Warn("submit compaction task failed", zap.Error(err))
c.meta.SetSegmentsCompacting(t.GetTaskProto().GetInputSegments(), false)
return err
}
log.Info("Compaction plan submitted")
return nil
}
Expand Down
38 changes: 38 additions & 0 deletions internal/datacoord/compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,44 @@ func (s *CompactionPlanHandlerSuite) TestGetCompactionTask() {
s.Equal(1, info.failedCnt)
}

func (s *CompactionPlanHandlerSuite) TestCompactionQueueFull() {
s.SetupTest()
paramtable.Get().Save("dataCoord.compaction.taskQueueCapacity", "1")
defer paramtable.Get().Reset("dataCoord.compaction.taskQueueCapacity")

s.handler = newCompactionPlanHandler(s.cluster, s.mockSessMgr, s.mockMeta, s.mockAlloc, nil, nil)

t1 := newMixCompactionTask(&datapb.CompactionTask{
TriggerID: 1,
PlanID: 1,
Type: datapb.CompactionType_MixCompaction,
Channel: "ch-01",
State: datapb.CompactionTaskState_executing,
}, nil, s.mockMeta, s.mockSessMgr)
t1.plan = &datapb.CompactionPlan{
PlanID: 1,
Type: datapb.CompactionType_MixCompaction,
Channel: "ch-01",
}

s.NoError(s.handler.submitTask(t1))

t2 := newMixCompactionTask(&datapb.CompactionTask{
TriggerID: 1,
PlanID: 2,
Type: datapb.CompactionType_MixCompaction,
Channel: "ch-01",
State: datapb.CompactionTaskState_completed,
}, nil, s.mockMeta, s.mockSessMgr)
t2.plan = &datapb.CompactionPlan{
PlanID: 2,
Type: datapb.CompactionType_MixCompaction,
Channel: "ch-01",
}

s.Error(s.handler.submitTask(t2))
}

func (s *CompactionPlanHandlerSuite) TestExecCompactionPlan() {
s.SetupTest()
s.mockMeta.EXPECT().CheckAndSetSegmentsCompacting(mock.Anything, mock.Anything).Return(true, true).Maybe()
Expand Down

0 comments on commit 5e15276

Please sign in to comment.