From 5e152767a39924b5adfd240292bd18c007912af7 Mon Sep 17 00:00:00 2001 From: "cai.zhang" Date: Fri, 29 Nov 2024 11:10:36 +0800 Subject: [PATCH] fix: Handle the error of the compaction queue being full (#37989) issue: #37988 Signed-off-by: Cai Zhang --- internal/datacoord/compaction.go | 25 +++++++++++++++--- internal/datacoord/compaction_test.go | 38 +++++++++++++++++++++++++++ 2 files changed, 59 insertions(+), 4 deletions(-) diff --git a/internal/datacoord/compaction.go b/internal/datacoord/compaction.go index 1cbfc0fda34c5..51b29fc7c8c0a 100644 --- a/internal/datacoord/compaction.go +++ b/internal/datacoord/compaction.go @@ -350,7 +350,17 @@ func (c *compactionPlanHandler) loadMeta() { continue } if t.NeedReAssignNodeID() { - c.submitTask(t) + if err = c.submitTask(t); err != nil { + log.Info("compactionPlanHandler loadMeta submit task failed, try to clean it", + zap.Int64("planID", task.GetPlanID()), + zap.String("type", task.GetType().String()), + zap.String("state", task.GetState().String()), + zap.Error(err), + ) + // ignore the drop error + c.meta.DropCompactionTask(task) + continue + } log.Info("compactionPlanHandler loadMeta submitTask", zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.Int64("triggerID", t.GetTaskProto().GetTriggerID()), @@ -541,11 +551,14 @@ func (c *compactionPlanHandler) removeTasksByChannel(channel string) { c.executingGuard.Unlock() } -func (c *compactionPlanHandler) submitTask(t CompactionTask) { +func (c *compactionPlanHandler) submitTask(t CompactionTask) error { _, span := otel.Tracer(typeutil.DataCoordRole).Start(context.Background(), fmt.Sprintf("Compaction-%s", t.GetTaskProto().GetType())) t.SetSpan(span) - c.queueTasks.Enqueue(t) + if err := c.queueTasks.Enqueue(t); err != nil { + return err + } metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetTaskProto().GetType().String(), metrics.Pending).Inc() + return nil } // restoreTask used to restore Task from etcd @@ -596,7 +609,11 @@ func (c *compactionPlanHandler) enqueueCompaction(task *datapb.CompactionTask) e log.Warn("Failed to enqueue compaction task, unable to save task meta", zap.Error(err)) return err } - c.submitTask(t) + if err = c.submitTask(t); err != nil { + log.Warn("submit compaction task failed", zap.Error(err)) + c.meta.SetSegmentsCompacting(t.GetTaskProto().GetInputSegments(), false) + return err + } log.Info("Compaction plan submitted") return nil } diff --git a/internal/datacoord/compaction_test.go b/internal/datacoord/compaction_test.go index 5ca8ed2be8150..38a7c70739877 100644 --- a/internal/datacoord/compaction_test.go +++ b/internal/datacoord/compaction_test.go @@ -627,6 +627,44 @@ func (s *CompactionPlanHandlerSuite) TestGetCompactionTask() { s.Equal(1, info.failedCnt) } +func (s *CompactionPlanHandlerSuite) TestCompactionQueueFull() { + s.SetupTest() + paramtable.Get().Save("dataCoord.compaction.taskQueueCapacity", "1") + defer paramtable.Get().Reset("dataCoord.compaction.taskQueueCapacity") + + s.handler = newCompactionPlanHandler(s.cluster, s.mockSessMgr, s.mockMeta, s.mockAlloc, nil, nil) + + t1 := newMixCompactionTask(&datapb.CompactionTask{ + TriggerID: 1, + PlanID: 1, + Type: datapb.CompactionType_MixCompaction, + Channel: "ch-01", + State: datapb.CompactionTaskState_executing, + }, nil, s.mockMeta, s.mockSessMgr) + t1.plan = &datapb.CompactionPlan{ + PlanID: 1, + Type: datapb.CompactionType_MixCompaction, + Channel: "ch-01", + } + + s.NoError(s.handler.submitTask(t1)) + + t2 := newMixCompactionTask(&datapb.CompactionTask{ + TriggerID: 1, + PlanID: 2, + Type: datapb.CompactionType_MixCompaction, + Channel: "ch-01", + State: datapb.CompactionTaskState_completed, + }, nil, s.mockMeta, s.mockSessMgr) + t2.plan = &datapb.CompactionPlan{ + PlanID: 2, + Type: datapb.CompactionType_MixCompaction, + Channel: "ch-01", + } + + s.Error(s.handler.submitTask(t2)) +} + func (s *CompactionPlanHandlerSuite) TestExecCompactionPlan() { s.SetupTest() s.mockMeta.EXPECT().CheckAndSetSegmentsCompacting(mock.Anything, mock.Anything).Return(true, true).Maybe()