From 306e5e68988586374d31317c71aeecb5331c2ba3 Mon Sep 17 00:00:00 2001
From: "cai.zhang"
Date: Thu, 19 Dec 2024 12:36:47 +0800
Subject: [PATCH] enhance: clean compaction task in compactionHandler (#38170)

issue: #35711

---------

Signed-off-by: wayblink
Signed-off-by: Cai Zhang
Co-authored-by: wayblink
---
 internal/datacoord/compaction.go              |  47 +++-
 internal/datacoord/compaction_task.go         |  10 +
 .../datacoord/compaction_task_clustering.go   | 226 ++++++++++--------
 .../compaction_task_clustering_test.go        |  51 +++-
 internal/datacoord/compaction_task_l0.go      |  21 +-
 internal/datacoord/compaction_task_l0_test.go |  17 +-
 internal/datacoord/compaction_task_mix.go     |  40 +++-
 internal/datacoord/compaction_test.go         | 177 ++++++++++++++
 internal/datacoord/meta.go                    |   6 +
 internal/datacoord/partition_stats_meta.go    |  32 ++-
 .../datacoord/partition_stats_meta_test.go    |  58 +++++
 pkg/util/merr/errors.go                       |   1 +
 pkg/util/merr/utils.go                        |   8 +
 13 files changed, 548 insertions(+), 146 deletions(-)

diff --git a/internal/datacoord/compaction.go b/internal/datacoord/compaction.go
index ea56f29349dac..c1f58b94d414d 100644
--- a/internal/datacoord/compaction.go
+++ b/internal/datacoord/compaction.go
@@ -84,6 +84,9 @@ type compactionPlanHandler struct {
 	executingGuard lock.RWMutex
 	executingTasks map[int64]CompactionTask // planID -> task

+	cleaningGuard lock.RWMutex
+	cleaningTasks map[int64]CompactionTask // planID -> task
+
 	meta      CompactionMeta
 	allocator allocator.Allocator
 	sessions  session.DataNodeManager
@@ -193,6 +196,7 @@ func newCompactionPlanHandler(cluster Cluster, sessions session.DataNodeManager,
 		stopCh:           make(chan struct{}),
 		cluster:          cluster,
 		executingTasks:   make(map[int64]CompactionTask),
+		cleaningTasks:    make(map[int64]CompactionTask),
 		analyzeScheduler: analyzeScheduler,
 		handler:          handler,
 	}
@@ -416,6 +420,7 @@ func (c *compactionPlanHandler) loopCheck() {
 			if err != nil {
 				log.Info("fail to update compaction", zap.Error(err))
 			}
+			c.cleanFailedTasks()
 		}
 	}
 }
@@ -447,7 +452,7 @@ func (c *compactionPlanHandler) cleanCompactionTaskMeta() {
 	triggers := c.meta.GetCompactionTasks(context.TODO())
 	for _, tasks := range triggers {
 		for _, task := range tasks {
-			if task.State == datapb.CompactionTaskState_completed || task.State == datapb.CompactionTaskState_cleaned {
+			if task.State == datapb.CompactionTaskState_cleaned {
 				duration := time.Since(time.Unix(task.StartTime, 0)).Seconds()
 				if duration > float64(Params.DataCoordCfg.CompactionDropToleranceInSeconds.GetAsDuration(time.Second).Seconds()) {
 					// try best to delete meta
@@ -668,6 +673,11 @@ func assignNodeID(slots map[int64]int64, t CompactionTask) int64 {
 	return nodeID
 }

+// checkCompaction retrieves executing tasks and calls each task's Process() method
+// to evaluate its state and progress through the state machine.
+// Completed tasks are removed from executingTasks.
+// Tasks that fail or time out are moved from executingTasks to cleaningTasks,
+// where task-specific clean logic is performed asynchronously.
 func (c *compactionPlanHandler) checkCompaction() error {
 	// Get executing executingTasks before GetCompactionState from DataNode to prevent false failure,
 	//  for DC might add new task while GetCompactionState.
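For readers of this hunk, here is a minimal, self-contained sketch of the executing-to-cleaning handoff that the fields above introduce. The Task type, the state constants, and the two maps are simplified stand-ins for CompactionTask, datapb.CompactionTaskState, and the handler fields, not the real Milvus API; it only illustrates the two-lock, two-map pattern.

package main

import (
	"fmt"
	"sync"
)

type State int

const (
	Executing State = iota
	Completed
	Failed
	Timeout
	Cleaned
)

type Task struct {
	PlanID int64
	State  State
}

// Clean stands in for CompactionTask.Clean: it must be retryable, and it
// reports whether the task actually reached the cleaned state.
func (t *Task) Clean() bool {
	t.State = Cleaned
	return true
}

type handler struct {
	executingGuard sync.RWMutex
	executingTasks map[int64]*Task

	cleaningGuard sync.RWMutex
	cleaningTasks map[int64]*Task
}

// checkTasks mirrors checkCompaction: finished tasks leave the executing set,
// and failed/timed-out ones are parked in the cleaning set for later clean-up,
// so the scheduling path never blocks on clean-up work.
func (h *handler) checkTasks() {
	finished := make([]*Task, 0)
	h.executingGuard.Lock()
	for id, t := range h.executingTasks {
		if t.State != Executing {
			finished = append(finished, t)
			delete(h.executingTasks, id)
		}
	}
	h.executingGuard.Unlock()

	h.cleaningGuard.Lock()
	for _, t := range finished {
		if t.State == Failed || t.State == Timeout {
			h.cleaningTasks[t.PlanID] = t
		}
	}
	h.cleaningGuard.Unlock()
}

// cleanFailedTasks mirrors the loop added below: a task stays in the cleaning
// set until Clean() succeeds, so transient errors are retried on the next tick.
func (h *handler) cleanFailedTasks() {
	cleaned := make([]int64, 0)
	h.cleaningGuard.RLock()
	for id, t := range h.cleaningTasks {
		if t.Clean() {
			cleaned = append(cleaned, id)
		}
	}
	h.cleaningGuard.RUnlock()

	h.cleaningGuard.Lock()
	for _, id := range cleaned {
		delete(h.cleaningTasks, id)
	}
	h.cleaningGuard.Unlock()
}

func main() {
	h := &handler{
		executingTasks: map[int64]*Task{1: {PlanID: 1, State: Failed}},
		cleaningTasks:  map[int64]*Task{},
	}
	h.checkTasks()
	h.cleanFailedTasks()
	fmt.Println(len(h.executingTasks), len(h.cleaningTasks)) // 0 0
}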
@@ -709,9 +719,44 @@ func (c *compactionPlanHandler) checkCompaction() error {
 		metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", t.GetTaskProto().GetNodeID()), t.GetTaskProto().GetType().String(), metrics.Done).Inc()
 	}
 	c.executingGuard.Unlock()
+
+	// collect the finished tasks that need cleaning
+	c.cleaningGuard.Lock()
+	for _, t := range finishedTasks {
+		if t.GetTaskProto().GetState() == datapb.CompactionTaskState_failed ||
+			t.GetTaskProto().GetState() == datapb.CompactionTaskState_timeout ||
+			t.GetTaskProto().GetState() == datapb.CompactionTaskState_completed {
+			log.Ctx(context.TODO()).Info("task needs cleaning",
+				zap.Int64("collectionID", t.GetTaskProto().GetCollectionID()),
+				zap.Int64("planID", t.GetTaskProto().GetPlanID()),
+				zap.String("state", t.GetTaskProto().GetState().String()))
+			c.cleaningTasks[t.GetTaskProto().GetPlanID()] = t
+		}
+	}
+	c.cleaningGuard.Unlock()
+
 	return nil
 }

+// cleanFailedTasks runs the Clean logic defined by each task,
+// while compactionPlanHandler.Clean does the garbage collection for already-cleaned tasks
+func (c *compactionPlanHandler) cleanFailedTasks() {
+	c.cleaningGuard.RLock()
+	cleanedTasks := make([]CompactionTask, 0)
+	for _, t := range c.cleaningTasks {
+		clean := t.Clean()
+		if clean {
+			cleanedTasks = append(cleanedTasks, t)
+		}
+	}
+	c.cleaningGuard.RUnlock()
+	c.cleaningGuard.Lock()
+	for _, t := range cleanedTasks {
+		delete(c.cleaningTasks, t.GetTaskProto().GetPlanID())
+	}
+	c.cleaningGuard.Unlock()
+}
+
 func pickAnyNode(nodeSlots map[int64]int64, task CompactionTask) (nodeID int64, useSlot int64) {
 	nodeID = NullNodeID
 	var maxSlots int64 = -1
diff --git a/internal/datacoord/compaction_task.go b/internal/datacoord/compaction_task.go
index f9cd27e972da6..4f0bd375a180b 100644
--- a/internal/datacoord/compaction_task.go
+++ b/internal/datacoord/compaction_task.go
@@ -23,7 +23,17 @@ import (
 )

 type CompactionTask interface {
+	// Process drives the task's state machine.
+	//
+	// Returns:
+	//   - bool: whether the task's state machine ends.
+	//
+	// Notes:
+	//
+	// `end` doesn't mean the task succeeded; its final state may be completed, failed, or timeout.
 	Process() bool
+	// Clean performs the clean-up logic for a failed or timed-out task
+	Clean() bool
 	BuildCompactionRequest() (*datapb.CompactionPlan, error)
 	GetSlotUsage() int64
 	GetLabel() string
diff --git a/internal/datacoord/compaction_task_clustering.go b/internal/datacoord/compaction_task_clustering.go
index 34416ba40df00..53a5fbd97e3a6 100644
--- a/internal/datacoord/compaction_task_clustering.go
+++ b/internal/datacoord/compaction_task_clustering.go
@@ -85,10 +85,13 @@ func newClusteringCompactionTask(t *datapb.CompactionTask, allocator allocator.A
 	return task
 }

+// Note: return True means exit this state machine.
+// ONLY return True for Completed, Failed or Timeout func (t *clusteringCompactionTask) Process() bool { - log := log.With(zap.Int64("triggerID", t.GetTaskProto().GetTriggerID()), zap.Int64("PlanID", t.GetTaskProto().GetPlanID()), zap.Int64("collectionID", t.GetTaskProto().GetCollectionID())) + ctx := context.TODO() + log := log.Ctx(ctx).With(zap.Int64("triggerID", t.GetTaskProto().GetTriggerID()), zap.Int64("PlanID", t.GetTaskProto().GetPlanID()), zap.Int64("collectionID", t.GetTaskProto().GetCollectionID())) lastState := t.GetTaskProto().GetState().String() - err := t.retryableProcess() + err := t.retryableProcess(ctx) if err != nil { log.Warn("fail in process task", zap.Error(err)) if merr.IsRetryableErr(err) && t.GetTaskProto().RetryTimes < t.maxRetryTimes { @@ -125,19 +128,26 @@ func (t *clusteringCompactionTask) Process() bool { if err != nil { log.Warn("Failed to updateAndSaveTaskMeta", zap.Error(err)) } + log.Info("clustering compaction task state changed", zap.String("lastState", lastState), zap.String("currentState", currentState), zap.Int64("elapse seconds", lastStateDuration)) } log.Debug("process clustering task", zap.String("lastState", lastState), zap.String("currentState", currentState)) - return t.GetTaskProto().State == datapb.CompactionTaskState_completed || t.GetTaskProto().State == datapb.CompactionTaskState_cleaned + return t.GetTaskProto().State == datapb.CompactionTaskState_completed || + t.GetTaskProto().State == datapb.CompactionTaskState_cleaned || + t.GetTaskProto().State == datapb.CompactionTaskState_failed || + t.GetTaskProto().State == datapb.CompactionTaskState_timeout } // retryableProcess process task's state transfer, return error if not work as expected // the outer Process will set state and retry times according to the error type(retryable or not-retryable) -func (t *clusteringCompactionTask) retryableProcess() error { - if t.GetTaskProto().State == datapb.CompactionTaskState_completed || t.GetTaskProto().State == datapb.CompactionTaskState_cleaned { +func (t *clusteringCompactionTask) retryableProcess(ctx context.Context) error { + if t.GetTaskProto().State == datapb.CompactionTaskState_completed || + t.GetTaskProto().State == datapb.CompactionTaskState_cleaned || + t.GetTaskProto().State == datapb.CompactionTaskState_failed || + t.GetTaskProto().State == datapb.CompactionTaskState_timeout { return nil } - coll, err := t.handler.GetCollection(context.Background(), t.GetTaskProto().GetCollectionID()) + coll, err := t.handler.GetCollection(ctx, t.GetTaskProto().GetCollectionID()) if err != nil { // retryable log.Warn("fail to get collection", zap.Int64("collectionID", t.GetTaskProto().GetCollectionID()), zap.Error(err)) @@ -162,15 +172,15 @@ func (t *clusteringCompactionTask) retryableProcess() error { return t.processIndexing() case datapb.CompactionTaskState_statistic: return t.processStats() - - case datapb.CompactionTaskState_timeout: - return t.processFailedOrTimeout() - case datapb.CompactionTaskState_failed: - return t.processFailedOrTimeout() } return nil } +func (t *clusteringCompactionTask) Clean() bool { + log.Ctx(context.TODO()).Info("clean task", zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.String("type", t.GetTaskProto().GetType().String())) + return t.doClean() == nil +} + func (t *clusteringCompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, error) { beginLogID, _, err := t.allocator.AllocN(1) if err != nil { @@ -219,7 +229,9 @@ func (t *clusteringCompactionTask) BuildCompactionRequest() (*datapb.CompactionP } 
 func (t *clusteringCompactionTask) processPipelining() error {
-	log := log.With(zap.Int64("triggerID", t.GetTaskProto().TriggerID), zap.Int64("collectionID", t.GetTaskProto().GetCollectionID()), zap.Int64("planID", t.GetTaskProto().GetPlanID()))
+	log := log.Ctx(context.TODO()).With(zap.Int64("triggerID", t.GetTaskProto().TriggerID),
+		zap.Int64("collectionID", t.GetTaskProto().GetCollectionID()),
+		zap.Int64("planID", t.GetTaskProto().GetPlanID()))
 	if t.NeedReAssignNodeID() {
 		log.Debug("wait for the node to be assigned before proceeding with the subsequent steps")
 		return nil
@@ -244,7 +256,7 @@ func (t *clusteringCompactionTask) processPipelining() error {
 }

 func (t *clusteringCompactionTask) processExecuting() error {
-	log := log.With(zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.String("type", t.GetTaskProto().GetType().String()))
+	log := log.Ctx(context.TODO()).With(zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.String("type", t.GetTaskProto().GetType().String()))
 	result, err := t.sessions.GetCompactionPlanResult(t.GetTaskProto().GetNodeID(), t.GetTaskProto().GetPlanID())
 	if err != nil || result == nil {
 		log.Warn("processExecuting clustering compaction", zap.Error(err))
@@ -282,12 +294,7 @@ func (t *clusteringCompactionTask) processExecuting() error {
 		return t.processMetaSaved()
 	case datapb.CompactionTaskState_executing:
 		if t.checkTimeout() {
-			err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_timeout))
-			if err == nil {
-				return t.processFailedOrTimeout()
-			} else {
-				return err
-			}
+			return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_timeout))
 		}
 		return nil
 	case datapb.CompactionTaskState_failed:
@@ -299,21 +306,23 @@ func (t *clusteringCompactionTask) processExecuting() error {
 }

 func (t *clusteringCompactionTask) processMetaSaved() error {
+	log := log.Ctx(context.TODO()).With(zap.Int64("planID", t.GetTaskProto().GetPlanID()))
 	if err := t.sessions.DropCompactionPlan(t.GetTaskProto().GetNodeID(), &datapb.DropCompactionPlanRequest{
 		PlanID: t.GetTaskProto().GetPlanID(),
 	}); err != nil {
-		log.Warn("clusteringCompactionTask processFailedOrTimeout unable to drop compaction plan", zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.Error(err))
+		log.Warn("clusteringCompactionTask processMetaSaved unable to drop compaction plan", zap.Error(err))
 	}
 	// to ensure compatibility, if a task upgraded from version 2.4 has a status of MetaSave,
 	// its TmpSegments will be empty, so skip the stats task, to build index.
 	if len(t.GetTaskProto().GetTmpSegments()) == 0 {
-		log.Info("tmp segments is nil, skip stats task", zap.Int64("planID", t.GetTaskProto().GetPlanID()))
+		log.Info("tmp segments is nil, skip stats task")
 		return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_indexing))
 	}
 	return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_statistic))
 }

 func (t *clusteringCompactionTask) processStats() error {
+	log := log.Ctx(context.TODO()).With(zap.Int64("planID", t.GetTaskProto().GetPlanID()))
 	// just the memory step, if it crashes at this step, the state after recovery is CompactionTaskState_statistic.
 	resultSegments := make([]int64, 0, len(t.GetTaskProto().GetTmpSegments()))
 	if Params.DataCoordCfg.EnableStatsTask.GetAsBool() {
@@ -338,15 +347,15 @@ func (t *clusteringCompactionTask) processStats() error {
 		}

 		if err := t.regeneratePartitionStats(tmpToResultSegments); err != nil {
-			log.Warn("regenerate partition stats failed, wait for retry", zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.Error(err))
+			log.Warn("regenerate partition stats failed, wait for retry", zap.Error(err))
 			return merr.WrapErrClusteringCompactionMetaError("regeneratePartitionStats", err)
 		}
 	} else {
-		log.Info("stats task is not enable, set tmp segments to result segments", zap.Int64("planID", t.GetTaskProto().GetPlanID()))
+		log.Info("stats task is not enabled, set tmp segments to result segments")
 		resultSegments = t.GetTaskProto().GetTmpSegments()
 	}

-	log.Info("clustering compaction stats task finished", zap.Int64("planID", t.GetTaskProto().GetPlanID()),
+	log.Info("clustering compaction stats task finished",
 		zap.Int64s("tmp segments", t.GetTaskProto().GetTmpSegments()),
 		zap.Int64s("result segments", resultSegments))

@@ -405,7 +414,7 @@ func (t *clusteringCompactionTask) regeneratePartitionStats(tmpToResultSegments
 }

 func (t *clusteringCompactionTask) processIndexing() error {
-	log := log.Ctx(context.TODO())
+	log := log.Ctx(context.TODO()).With(zap.Int64("planID", t.GetTaskProto().GetPlanID()))
 	// wait for segment indexed
 	collectionIndexes := t.meta.GetIndexMeta().GetIndexesForCollection(t.GetTaskProto().GetCollectionID(), "")
 	if len(collectionIndexes) == 0 {
@@ -424,7 +433,8 @@ func (t *clusteringCompactionTask) processIndexing() error {
 		}
 		return true
 	}()
-	log.Debug("check compaction result segments index states", zap.Bool("indexed", indexed), zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.Int64s("segments", t.GetTaskProto().ResultSegments))
+	log.Debug("check compaction result segments index states",
+		zap.Bool("indexed", indexed), zap.Int64s("segments", t.GetTaskProto().ResultSegments))
 	if indexed {
 		return t.completeTask()
 	}
@@ -463,6 +473,7 @@ func (t *clusteringCompactionTask) markInputSegmentsDropped() error {
 // indexed is the final state of a clustering compaction task
 // one task should only run this once
 func (t *clusteringCompactionTask) completeTask() error {
+	log := log.Ctx(context.TODO()).With(zap.Int64("planID", t.GetTaskProto().GetPlanID()))
 	var err error
 	// first mark result segments visible
 	if err = t.markResultSegmentsVisible(); err != nil {
 		return err
@@ -482,29 +493,34 @@ func (t *clusteringCompactionTask) completeTask() error {
 		return merr.WrapErrClusteringCompactionMetaError("SavePartitionStatsInfo", err)
 	}

-	// mark input segments as dropped
-	// now, the segment view only includes the result segments.
-	if err = t.markInputSegmentsDropped(); err != nil {
-		return err
-	}
-
 	err = t.meta.GetPartitionStatsMeta().SaveCurrentPartitionStatsVersion(t.GetTaskProto().GetCollectionID(),
 		t.GetTaskProto().GetPartitionID(), t.GetTaskProto().GetChannel(), t.GetTaskProto().GetPlanID())
 	if err != nil {
 		return merr.WrapErrClusteringCompactionMetaError("SaveCurrentPartitionStatsVersion", err)
 	}

-	return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_completed))
+	if err = t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_completed)); err != nil {
+		log.Warn("completeTask update task state to completed failed", zap.Error(err))
+		return err
+	}
+
+	// mark input segments as dropped
+	// now, the segment view only includes the result segments.
+	if err = t.markInputSegmentsDropped(); err != nil {
+		log.Warn("failed to mark input segments as dropped, skip it and wait for retry")
+	}
+
+	return nil
 }

 func (t *clusteringCompactionTask) processAnalyzing() error {
-	log := log.Ctx(context.TODO())
+	log := log.Ctx(context.TODO()).With(zap.Int64("planID", t.GetTaskProto().GetPlanID()))
 	analyzeTask := t.meta.GetAnalyzeMeta().GetTask(t.GetTaskProto().GetAnalyzeTaskID())
 	if analyzeTask == nil {
 		log.Warn("analyzeTask not found", zap.Int64("id", t.GetTaskProto().GetAnalyzeTaskID()))
 		return merr.WrapErrAnalyzeTaskNotFound(t.GetTaskProto().GetAnalyzeTaskID()) // retryable
 	}
-	log.Info("check analyze task state", zap.Int64("id", t.GetTaskProto().GetAnalyzeTaskID()), zap.Int64("version", analyzeTask.GetVersion()), zap.String("state", analyzeTask.State.String()))
+	log.Info("check analyze task state", zap.Int64("id", t.GetTaskProto().GetAnalyzeTaskID()),
+		zap.Int64("version", analyzeTask.GetVersion()), zap.String("state", analyzeTask.State.String()))
 	switch analyzeTask.State {
 	case indexpb.JobState_JobStateFinished:
 		if analyzeTask.GetCentroidsFile() == "" {
@@ -526,84 +542,100 @@ func (t *clusteringCompactionTask) resetSegmentCompacting() {
 	t.meta.SetSegmentsCompacting(context.TODO(), t.GetTaskProto().GetInputSegments(), false)
 }

-func (t *clusteringCompactionTask) processFailedOrTimeout() error {
-	log := log.Ctx(context.TODO())
-	log.Info("clean task", zap.Int64("triggerID", t.GetTaskProto().GetTriggerID()), zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.String("state", t.GetTaskProto().GetState().String()))
+func (t *clusteringCompactionTask) doClean() error {
+	log := log.Ctx(context.TODO()).With(zap.Int64("planID", t.GetTaskProto().GetPlanID()))
+	log.Info("clean task", zap.Int64("triggerID", t.GetTaskProto().GetTriggerID()),
+		zap.String("state", t.GetTaskProto().GetState().String()))
 	if err := t.sessions.DropCompactionPlan(t.GetTaskProto().GetNodeID(), &datapb.DropCompactionPlanRequest{
 		PlanID: t.GetTaskProto().GetPlanID(),
 	}); err != nil {
-		log.Warn("clusteringCompactionTask processFailedOrTimeout unable to drop compaction plan", zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.Error(err))
+		log.Warn("clusteringCompactionTask unable to drop compaction plan", zap.Error(err))
 	}

-	isInputDropped := false
-	for _, segID := range t.GetTaskProto().GetInputSegments() {
-		if t.meta.GetHealthySegment(context.TODO(), segID) == nil {
-			isInputDropped = true
-			break
+	if t.GetTaskProto().GetState() == datapb.CompactionTaskState_completed {
+		if err := t.markInputSegmentsDropped(); err != nil {
+			return err
 		}
-	}
-	if isInputDropped {
-		log.Info("input segments dropped, doing for compatibility",
-			zap.Int64("triggerID", t.GetTaskProto().GetTriggerID()), zap.Int64("planID", t.GetTaskProto().GetPlanID()))
-		// this task must be generated by v2.4, just for compatibility
-		// revert segments meta
-		var operators []UpdateOperator
-		// revert level of input segments
-		// L1 : L1 ->(processPipelining)-> L2 ->(processFailedOrTimeout)-> L1
-		// L2 : L2 ->(processPipelining)-> L2 ->(processFailedOrTimeout)-> L2
+	} else {
+		isInputDropped := false
 		for _, segID := range t.GetTaskProto().GetInputSegments() {
-			operators = append(operators, RevertSegmentLevelOperator(segID))
-		}
-		// if result segments are generated but task fail in the other steps, mark them as L1 segments without partitions stats
-		for _, segID := range t.GetTaskProto().GetResultSegments() {
-			operators = append(operators, UpdateSegmentLevelOperator(segID, datapb.SegmentLevel_L1))
-			operators = append(operators, UpdateSegmentPartitionStatsVersionOperator(segID, 0))
-		}
-		for _, segID := range t.GetTaskProto().GetTmpSegments() {
-			// maybe no necessary, there will be no `TmpSegments` that task was generated by v2.4
-			operators = append(operators, UpdateSegmentLevelOperator(segID, datapb.SegmentLevel_L1))
-			operators = append(operators, UpdateSegmentPartitionStatsVersionOperator(segID, 0))
-		}
-		err := t.meta.UpdateSegmentsInfo(context.TODO(), operators...)
-		if err != nil {
-			log.Warn("UpdateSegmentsInfo fail", zap.Error(err))
-			return merr.WrapErrClusteringCompactionMetaError("UpdateSegmentsInfo", err)
+			if t.meta.GetHealthySegment(context.TODO(), segID) == nil {
+				isInputDropped = true
+				break
+			}
 		}
-	} else {
-		// after v2.5.0, mark the results segment as dropped
-		var operators []UpdateOperator
-		for _, segID := range t.GetTaskProto().GetResultSegments() {
-			// Don't worry about them being loaded; they are all invisible.
-			operators = append(operators, UpdateStatusOperator(segID, commonpb.SegmentState_Dropped))
+		if isInputDropped {
+			log.Info("input segments dropped, doing for compatibility",
+				zap.Int64("triggerID", t.GetTaskProto().GetTriggerID()), zap.Int64("planID", t.GetTaskProto().GetPlanID()))
+			// this task must be generated by v2.4, just for compatibility
+			// revert segments meta
+			var operators []UpdateOperator
+			// revert level of input segments
+			// L1 : L1 ->(process)-> L2 ->(clean)-> L1
+			// L2 : L2 ->(process)-> L2 ->(clean)-> L2
+			for _, segID := range t.GetTaskProto().GetInputSegments() {
+				operators = append(operators, RevertSegmentLevelOperator(segID))
+			}
+			// if result segments are generated but task fail in the other steps, mark them as L1 segments without partitions stats
+			for _, segID := range t.GetTaskProto().GetResultSegments() {
+				operators = append(operators, UpdateSegmentLevelOperator(segID, datapb.SegmentLevel_L1))
+				operators = append(operators, UpdateSegmentPartitionStatsVersionOperator(segID, 0))
+			}
+			for _, segID := range t.GetTaskProto().GetTmpSegments() {
+				// maybe unnecessary: a task generated by v2.4 will have no `TmpSegments`
+				operators = append(operators, UpdateSegmentLevelOperator(segID, datapb.SegmentLevel_L1))
+				operators = append(operators, UpdateSegmentPartitionStatsVersionOperator(segID, 0))
+			}
+			err := t.meta.UpdateSegmentsInfo(context.TODO(), operators...)
+			if err != nil {
+				log.Warn("UpdateSegmentsInfo fail", zap.Error(err))
+				return merr.WrapErrClusteringCompactionMetaError("UpdateSegmentsInfo", err)
+			}
+		} else {
+			// after v2.5.0, mark the results segment as dropped
+			var operators []UpdateOperator
+			for _, segID := range t.GetTaskProto().GetResultSegments() {
+				// Don't worry about them being loaded; they are all invisible.
+				operators = append(operators, UpdateStatusOperator(segID, commonpb.SegmentState_Dropped))
+			}
+			for _, segID := range t.GetTaskProto().GetTmpSegments() {
+				// Don't worry about them being loaded; they are all invisible.
-			// tmpSegment is always invisible
-			operators = append(operators, UpdateStatusOperator(segID, commonpb.SegmentState_Dropped))
+				// tmpSegment is always invisible
+				operators = append(operators, UpdateStatusOperator(segID, commonpb.SegmentState_Dropped))
+			}
+			err := t.meta.UpdateSegmentsInfo(context.TODO(), operators...)
+			if err != nil {
+				log.Warn("UpdateSegmentsInfo fail", zap.Error(err))
+				return merr.WrapErrClusteringCompactionMetaError("UpdateSegmentsInfo", err)
+			}
 		}
-		err := t.meta.UpdateSegmentsInfo(context.TODO(), operators...)
+
+		// drop partition stats if uploaded
+		partitionStatsInfo := &datapb.PartitionStatsInfo{
+			CollectionID: t.GetTaskProto().GetCollectionID(),
+			PartitionID:  t.GetTaskProto().GetPartitionID(),
+			VChannel:     t.GetTaskProto().GetChannel(),
+			Version:      t.GetTaskProto().GetPlanID(),
+			SegmentIDs:   t.GetTaskProto().GetResultSegments(),
+		}
+		err := t.meta.CleanPartitionStatsInfo(context.TODO(), partitionStatsInfo)
 		if err != nil {
-			log.Warn("UpdateSegmentsInfo fail", zap.Error(err))
-			return merr.WrapErrClusteringCompactionMetaError("UpdateSegmentsInfo", err)
+			log.Warn("gcPartitionStatsInfo fail", zap.Error(err))
+			return merr.WrapErrCleanPartitionStatsFail(fmt.Sprintf("%d-%d-%s-%d", t.GetTaskProto().GetCollectionID(), t.GetTaskProto().GetPartitionID(), t.GetTaskProto().GetChannel(), t.GetTaskProto().GetPlanID()))
 		}
 	}
-	t.resetSegmentCompacting()

-	// drop partition stats if uploaded
-	partitionStatsInfo := &datapb.PartitionStatsInfo{
-		CollectionID: t.GetTaskProto().GetCollectionID(),
-		PartitionID:  t.GetTaskProto().GetPartitionID(),
-		VChannel:     t.GetTaskProto().GetChannel(),
-		Version:      t.GetTaskProto().GetPlanID(),
-		SegmentIDs:   t.GetTaskProto().GetResultSegments(),
-	}
-	err := t.meta.CleanPartitionStatsInfo(context.TODO(), partitionStatsInfo)
+	err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_cleaned))
 	if err != nil {
-		log.Warn("gcPartitionStatsInfo fail", zap.Error(err))
+		log.Warn("clusteringCompactionTask failed to updateAndSaveTaskMeta", zap.Error(err))
+		return err
 	}

-	return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_cleaned))
+	// resetSegmentCompacting must be the last step of Clean, to make sure resetSegmentCompacting is called only once;
+	// otherwise, it may unlock segments locked by other compaction tasks
+	t.resetSegmentCompacting()
+	log.Info("clusteringCompactionTask clean done")
+	return nil
 }

 func (t *clusteringCompactionTask) doAnalyze() error {
@@ -691,7 +723,7 @@ func (t *clusteringCompactionTask) updateAndSaveTaskMeta(opts ...compactionTaskO
 		return merr.WrapErrClusteringCompactionMetaError("updateAndSaveTaskMeta", err) // retryable
 	}
 	t.SetTask(task)
-	log.Info("updateAndSaveTaskMeta success", zap.String("task state", t.GetTaskProto().GetState().String()))
+	log.Ctx(context.TODO()).Info("updateAndSaveTaskMeta success", zap.String("task state", t.GetTaskProto().GetState().String()))
 	return nil
 }

diff --git a/internal/datacoord/compaction_task_clustering_test.go b/internal/datacoord/compaction_task_clustering_test.go
index 9c580e0fc2ab1..4fdc8cb83e09a 100644
--- a/internal/datacoord/compaction_task_clustering_test.go
+++ b/internal/datacoord/compaction_task_clustering_test.go
@@ -109,7 +109,8 @@ func (s *ClusteringCompactionTaskSuite) TestClusteringCompactionSegmentMetaChang
 	s.mockSessionMgr.EXPECT().Compaction(mock.Anything, mock.Anything, mock.Anything).Return(nil)

 	task := s.generateBasicTask(false)
-	task.processPipelining()
+	err := task.processPipelining()
+	s.NoError(err)

 	seg11 := s.meta.GetSegment(context.TODO(), 101)
 	s.Equal(datapb.SegmentLevel_L1, seg11.Level)
@@ -117,6 +118,34 @@ func (s *ClusteringCompactionTaskSuite) TestClusteringCompactionSegmentMetaChang
 	s.Equal(datapb.SegmentLevel_L2, seg21.Level)
 	s.Equal(int64(10000), seg21.PartitionStatsVersion)

+	task.updateAndSaveTaskMeta(setResultSegments([]int64{103, 104}))
+	// fake some compaction result segment
+	s.meta.AddSegment(context.TODO(), &SegmentInfo{
+		SegmentInfo: &datapb.SegmentInfo{
+			ID:                    103,
+			State:                 commonpb.SegmentState_Flushed,
+			Level:                 datapb.SegmentLevel_L2,
+			LastLevel:             datapb.SegmentLevel_L1,
+			CreatedByCompaction:   true,
+			PartitionStatsVersion: 10001,
+		},
+	})
+	s.meta.AddSegment(context.TODO(), &SegmentInfo{
+		SegmentInfo: &datapb.SegmentInfo{
+			ID:                    104,
+			State:                 commonpb.SegmentState_Flushed,
+			Level:                 datapb.SegmentLevel_L2,
+			LastLevel:             datapb.SegmentLevel_L1,
+			CreatedByCompaction:   true,
+			PartitionStatsVersion: 10001,
+		},
+	})
+
+	s.mockSessionMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil)
+
+	err = task.doClean()
+	s.NoError(err)
+
 	s.Run("v2.4.x", func() {
 		// fake some compaction result segment
 		s.meta.AddSegment(context.TODO(), &SegmentInfo{
@@ -162,8 +191,7 @@ func (s *ClusteringCompactionTaskSuite) TestClusteringCompactionSegmentMetaChang
 		s.mockSessionMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil)
 		task.GetTaskProto().InputSegments = []int64{101, 102}
 		task.GetTaskProto().ResultSegments = []int64{103, 104}
-
-		task.processFailedOrTimeout()
+		task.Clean()

 		seg12 := s.meta.GetSegment(context.TODO(), 101)
 		s.Equal(datapb.SegmentLevel_L1, seg12.Level)
@@ -252,7 +280,7 @@ func (s *ClusteringCompactionTaskSuite) TestClusteringCompactionSegmentMetaChang
 		task.GetTaskProto().TmpSegments = []int64{103, 104}
 		task.GetTaskProto().ResultSegments = []int64{105, 106}

-		task.processFailedOrTimeout()
+		task.Clean()

 		seg12 := s.meta.GetSegment(context.TODO(), 101)
 		s.Equal(datapb.SegmentLevel_L1, seg12.Level)
@@ -336,7 +364,7 @@ func (s *ClusteringCompactionTaskSuite) TestProcessRetryLogic() {
 	s.Equal(false, task.Process())
 	s.Equal(int32(3), task.GetTaskProto().RetryTimes)
 	s.Equal(datapb.CompactionTaskState_pipelining, task.GetTaskProto().GetState())
-	s.Equal(false, task.Process())
+	s.True(task.Process())
 	s.Equal(int32(0), task.GetTaskProto().RetryTimes)
 	s.Equal(datapb.CompactionTaskState_failed, task.GetTaskProto().GetState())
 }
@@ -345,7 +373,7 @@ func (s *ClusteringCompactionTaskSuite) TestProcessPipelining() {
 	s.Run("process pipelining fail, segment not found", func() {
 		task := s.generateBasicTask(false)
 		task.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining))
-		s.Equal(false, task.Process())
+		s.True(task.Process())
 		s.Equal(datapb.CompactionTaskState_failed, task.GetTaskProto().GetState())
 	})

@@ -570,11 +598,10 @@ func (s *ClusteringCompactionTaskSuite) TestProcessExecuting() {
 				},
 			},
 		}, nil).Once()
-		s.mockSessionMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil).Once()
 		time.Sleep(time.Second * 1)

-		s.Equal(true, task.Process())
-		s.Equal(datapb.CompactionTaskState_cleaned, task.GetTaskProto().GetState())
+		s.True(task.Process())
+		s.Equal(datapb.CompactionTaskState_timeout, task.GetTaskProto().GetState())
 	})
 }
@@ -675,7 +702,7 @@ func (s *ClusteringCompactionTaskSuite) TestProcessAnalyzingState() {
 	s.Run("analyze task not found", func() {
 		task := s.generateBasicTask(false)
 		task.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_analyzing))
-		s.False(task.Process())
+		s.True(task.Process())
 		s.Equal(datapb.CompactionTaskState_failed, task.GetTaskProto().GetState())
 	})

@@ -691,7 +718,7 @@ func (s *ClusteringCompactionTaskSuite) TestProcessAnalyzingState() {
 			State: indexpb.JobState_JobStateFailed,
 		}
 		s.meta.analyzeMeta.AddAnalyzeTask(t)
-		s.False(task.Process())
+		s.True(task.Process())
 		s.Equal(datapb.CompactionTaskState_failed, task.GetTaskProto().GetState())
 	})
@@ -708,7 +735,7 @@ func (s *ClusteringCompactionTaskSuite) TestProcessAnalyzingState() {
 			CentroidsFile: "",
 		}
 		s.meta.analyzeMeta.AddAnalyzeTask(t)
-		s.False(task.Process())
+		s.True(task.Process())
 		s.Equal(datapb.CompactionTaskState_failed, task.GetTaskProto().GetState())
 	})

diff --git a/internal/datacoord/compaction_task_l0.go b/internal/datacoord/compaction_task_l0.go
index b3ec55c8907e0..e7535bca8e4b5 100644
--- a/internal/datacoord/compaction_task_l0.go
+++ b/internal/datacoord/compaction_task_l0.go
@@ -73,7 +73,7 @@ func newL0CompactionTask(t *datapb.CompactionTask, allocator allocator.Allocator
 }

 // Note: return True means exit this state machine.
-// ONLY return True for processCompleted or processFailed
+// ONLY return True for Completed or Failed
 func (t *l0CompactionTask) Process() bool {
 	switch t.GetTaskProto().GetState() {
 	case datapb.CompactionTaskState_pipelining:
@@ -188,6 +188,11 @@ func (t *l0CompactionTask) processCompleted() bool {
 }

 func (t *l0CompactionTask) processFailed() bool {
+	return true
+}
+
+func (t *l0CompactionTask) doClean() error {
+	log := log.With(zap.Int64("planID", t.GetTaskProto().GetPlanID()))
 	if t.hasAssignedWorker() {
 		err := t.sessions.DropCompactionPlan(t.GetTaskProto().GetNodeID(), &datapb.DropCompactionPlanRequest{
 			PlanID: t.GetTaskProto().GetPlanID(),
@@ -197,15 +202,21 @@ func (t *l0CompactionTask) processFailed() bool {
 		}
 	}

-	t.resetSegmentCompacting()
 	err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_cleaned))
 	if err != nil {
 		log.Warn("l0CompactionTask failed to updateAndSaveTaskMeta", zap.Error(err))
-		return false
+		return err
 	}

-	log.Info("l0CompactionTask processFailed done", zap.Int64("taskID", t.GetTaskProto().GetTriggerID()), zap.Int64("planID", t.GetTaskProto().GetPlanID()))
-	return true
+	// resetSegmentCompacting must be the last step of Clean, to make sure resetSegmentCompacting is called only once;
+	// otherwise, it may unlock segments locked by other compaction tasks
+	t.resetSegmentCompacting()
+	log.Info("l0CompactionTask clean done")
+	return nil
+}
+
+func (t *l0CompactionTask) Clean() bool {
+	return t.doClean() == nil
 }

 func (t *l0CompactionTask) GetResult() *datapb.CompactionPlanResult {
diff --git a/internal/datacoord/compaction_task_l0_test.go b/internal/datacoord/compaction_task_l0_test.go
index ee7a44ca3f2ab..f6eb442c3edaf 100644
--- a/internal/datacoord/compaction_task_l0_test.go
+++ b/internal/datacoord/compaction_task_l0_test.go
@@ -335,7 +335,6 @@ func (s *L0CompactionTaskSuite) TestPorcessStateTrans() {
 		t := s.generateTestL0Task(datapb.CompactionTaskState_executing)
 		t.updateAndSaveTaskMeta(setNodeID(100))
 		s.Require().True(t.GetTaskProto().GetNodeID() > 0)
-
 		s.mockSessMgr.EXPECT().GetCompactionPlanResult(t.GetTaskProto().NodeID, mock.Anything).
Return(&datapb.CompactionPlanResult{ PlanID: t.GetTaskProto().GetPlanID(), @@ -417,12 +416,10 @@ func (s *L0CompactionTaskSuite) TestPorcessStateTrans() { PlanID: t.GetTaskProto().GetPlanID(), State: datapb.CompactionTaskState_failed, }, nil).Once() - s.mockSessMgr.EXPECT().DropCompactionPlan(t.GetTaskProto().GetNodeID(), mock.Anything).Return(nil) - s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, mock.Anything, false).Return().Once() got := t.Process() s.True(got) - s.Equal(datapb.CompactionTaskState_cleaned, t.GetTaskProto().GetState()) + s.Equal(datapb.CompactionTaskState_failed, t.GetTaskProto().GetState()) }) s.Run("test executing with result failed save compaction meta failed", func() { s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything, mock.Anything).Return(nil).Once() @@ -510,14 +507,10 @@ func (s *L0CompactionTaskSuite) TestPorcessStateTrans() { t := s.generateTestL0Task(datapb.CompactionTaskState_failed) t.updateAndSaveTaskMeta(setNodeID(100)) s.Require().True(t.GetTaskProto().GetNodeID() > 0) - s.mockSessMgr.EXPECT().DropCompactionPlan(t.GetTaskProto().GetNodeID(), mock.Anything).Return(nil).Once() - s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, mock.Anything, false).RunAndReturn(func(ctx context.Context, segIDs []int64, isCompacting bool) { - s.ElementsMatch(segIDs, t.GetTaskProto().GetInputSegments()) - }).Once() got := t.Process() s.True(got) - s.Equal(datapb.CompactionTaskState_cleaned, t.GetTaskProto().GetState()) + s.Equal(datapb.CompactionTaskState_failed, t.GetTaskProto().GetState()) }) s.Run("test process failed failed", func() { @@ -525,14 +518,10 @@ func (s *L0CompactionTaskSuite) TestPorcessStateTrans() { t := s.generateTestL0Task(datapb.CompactionTaskState_failed) t.updateAndSaveTaskMeta(setNodeID(100)) s.Require().True(t.GetTaskProto().GetNodeID() > 0) - s.mockSessMgr.EXPECT().DropCompactionPlan(t.GetTaskProto().GetNodeID(), mock.Anything).Return(errors.New("mock error")).Once() - s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, mock.Anything, false).RunAndReturn(func(ctx context.Context, segIDs []int64, isCompacting bool) { - s.ElementsMatch(segIDs, t.GetTaskProto().GetInputSegments()) - }).Once() got := t.Process() s.True(got) - s.Equal(datapb.CompactionTaskState_cleaned, t.GetTaskProto().GetState()) + s.Equal(datapb.CompactionTaskState_failed, t.GetTaskProto().GetState()) }) s.Run("test unknown task", func() { diff --git a/internal/datacoord/compaction_task_mix.go b/internal/datacoord/compaction_task_mix.go index 97337bfab2ad1..c61756949574a 100644 --- a/internal/datacoord/compaction_task_mix.go +++ b/internal/datacoord/compaction_task_mix.go @@ -70,7 +70,7 @@ func (t *mixCompactionTask) processPipelining() bool { log.Warn("mixCompactionTask failed to updateAndSaveTaskMeta", zap.Error(err)) return false } - return t.processFailed() + return true } err = t.sessions.Compaction(context.TODO(), t.GetTaskProto().GetNodeID(), t.GetPlan()) @@ -78,15 +78,18 @@ func (t *mixCompactionTask) processPipelining() bool { // Compaction tasks may be refused by DataNode because of slot limit. In this case, the node id is reset // to enable a retry in compaction.checkCompaction(). // This is tricky, we should remove the reassignment here. 
- log.Warn("mixCompactionTask failed to notify compaction tasks to DataNode", zap.Error(err)) - t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID)) + log.Warn("mixCompactionTask failed to notify compaction tasks to DataNode", zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.Error(err)) + err = t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID)) + if err != nil { + log.Warn("mixCompactionTask failed to updateAndSaveTaskMeta", zap.Error(err)) + } return false } log.Info("mixCompactionTask notify compaction tasks to DataNode") err = t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_executing)) if err != nil { - log.Warn("mixCompactionTask update task state failed", zap.Error(err)) + log.Warn("mixCompactionTask failed to updateAndSaveTaskMeta", zap.Error(err)) return false } return false @@ -129,7 +132,7 @@ func (t *mixCompactionTask) processExecuting() bool { log.Warn("mixCompactionTask failed to setState failed", zap.Error(err)) return false } - return t.processFailed() + return true } if err := t.saveSegmentMeta(); err != nil { log.Warn("mixCompactionTask failed to save segment meta", zap.Error(err)) @@ -139,7 +142,7 @@ func (t *mixCompactionTask) processExecuting() bool { log.Warn("mixCompactionTask failed to setState failed", zap.Error(err)) return false } - return t.processFailed() + return true } return false } @@ -155,7 +158,7 @@ func (t *mixCompactionTask) processExecuting() bool { if err != nil { log.Warn("fail to updateAndSaveTaskMeta") } - return false + return true } return false } @@ -183,7 +186,7 @@ func (t *mixCompactionTask) saveSegmentMeta() error { } // Note: return True means exit this state machine. -// ONLY return True for processCompleted or processFailed +// ONLY return True for Completed, Failed or Timeout func (t *mixCompactionTask) Process() bool { log := log.With(zap.Int64("triggerID", t.GetTaskProto().GetTriggerID()), zap.Int64("PlanID", t.GetTaskProto().GetPlanID()), zap.Int64("collectionID", t.GetTaskProto().GetCollectionID())) lastState := t.GetTaskProto().GetState().String() @@ -257,21 +260,32 @@ func (t *mixCompactionTask) ShadowClone(opts ...compactionTaskOpt) *datapb.Compa } func (t *mixCompactionTask) processFailed() bool { + return true +} + +func (t *mixCompactionTask) Clean() bool { + return t.doClean() == nil +} + +func (t *mixCompactionTask) doClean() error { log := log.With(zap.Int64("triggerID", t.GetTaskProto().GetTriggerID()), zap.Int64("PlanID", t.GetTaskProto().GetPlanID()), zap.Int64("collectionID", t.GetTaskProto().GetCollectionID())) if err := t.sessions.DropCompactionPlan(t.GetTaskProto().GetNodeID(), &datapb.DropCompactionPlanRequest{ PlanID: t.GetTaskProto().GetPlanID(), }); err != nil { log.Warn("mixCompactionTask processFailed unable to drop compaction plan", zap.Error(err)) + return err } - log.Info("mixCompactionTask processFailed done") - t.resetSegmentCompacting() err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_cleaned)) if err != nil { - log.Warn("mixCompactionTask failed to updateAndSaveTaskMeta", zap.Error(err)) - return false + log.Warn("mixCompactionTask fail to updateAndSaveTaskMeta", zap.Error(err)) + return err } - return true + // resetSegmentCompacting must be the last step of Clean, to make sure resetSegmentCompacting only called once + // otherwise, it may unlock segments locked by other compaction tasks + t.resetSegmentCompacting() + log.Info("mixCompactionTask clean done") + return nil } func (t 
 func (t *mixCompactionTask) updateAndSaveTaskMeta(opts ...compactionTaskOpt) error {
diff --git a/internal/datacoord/compaction_test.go b/internal/datacoord/compaction_test.go
index 38a7c70739877..d5053eb7f7327 100644
--- a/internal/datacoord/compaction_test.go
+++ b/internal/datacoord/compaction_test.go
@@ -26,6 +26,7 @@ import (
 	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/suite"

+	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
 	"github.com/milvus-io/milvus/internal/datacoord/allocator"
 	"github.com/milvus-io/milvus/internal/datacoord/session"
 	"github.com/milvus-io/milvus/internal/metastore/kv/binlog"
@@ -48,6 +49,7 @@ type CompactionPlanHandlerSuite struct {
 	mockCm      *MockChannelManager
 	mockSessMgr *session.MockDataNodeManager
 	handler     *compactionPlanHandler
+	mockHandler *NMockHandler
 	cluster     *MockCluster
 }
@@ -59,6 +61,8 @@ func (s *CompactionPlanHandlerSuite) SetupTest() {
 	s.mockSessMgr = session.NewMockDataNodeManager(s.T())
 	s.cluster = NewMockCluster(s.T())
 	s.handler = newCompactionPlanHandler(s.cluster, s.mockSessMgr, s.mockMeta, s.mockAlloc, nil, nil)
+	s.mockHandler = NewNMockHandler(s.T())
+	s.mockHandler.EXPECT().GetCollection(mock.Anything, mock.Anything).Return(&collectionInfo{}, nil).Maybe()
 }

 func (s *CompactionPlanHandlerSuite) TestScheduleEmpty() {
@@ -939,6 +943,179 @@ func (s *CompactionPlanHandlerSuite) TestProcessCompleteCompaction() {
 	s.NoError(err)
 }

+func (s *CompactionPlanHandlerSuite) TestCleanCompaction() {
+	s.SetupTest()
+
+	tests := []struct {
+		task CompactionTask
+	}{
+		{
+			newMixCompactionTask(
+				&datapb.CompactionTask{
+					PlanID:        1,
+					TriggerID:     1,
+					Type:          datapb.CompactionType_MixCompaction,
+					State:         datapb.CompactionTaskState_failed,
+					NodeID:        1,
+					InputSegments: []UniqueID{1, 2},
+				},
+				nil, s.mockMeta, s.mockSessMgr),
+		},
+		{
+			newL0CompactionTask(&datapb.CompactionTask{
+				PlanID:        1,
+				TriggerID:     1,
+				Type:          datapb.CompactionType_Level0DeleteCompaction,
+				State:         datapb.CompactionTaskState_failed,
+				NodeID:        1,
+				InputSegments: []UniqueID{1, 2},
+			},
+				nil, s.mockMeta, s.mockSessMgr),
+		},
+	}
+	for _, test := range tests {
+		task := test.task
+		s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, mock.Anything, mock.Anything).Return().Once()
+		s.mockSessMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil)
+		s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything, mock.Anything).Return(nil)
+
+		s.handler.executingTasks[1] = task
+		s.Equal(1, len(s.handler.executingTasks))
+		err := s.handler.checkCompaction()
+		s.NoError(err)
+		s.Equal(0, len(s.handler.executingTasks))
+		s.Equal(1, len(s.handler.cleaningTasks))
+		s.handler.cleanFailedTasks()
+		s.Equal(0, len(s.handler.cleaningTasks))
+	}
+}
+
+func (s *CompactionPlanHandlerSuite) TestCleanClusteringCompaction() {
+	s.SetupTest()
+
+	task := newClusteringCompactionTask(
+		&datapb.CompactionTask{
+			PlanID:        1,
+			TriggerID:     1,
+			CollectionID:  1001,
+			Type:          datapb.CompactionType_ClusteringCompaction,
+			State:         datapb.CompactionTaskState_failed,
+			NodeID:        1,
+			InputSegments: []UniqueID{1, 2},
+		},
+		nil, s.mockMeta, s.mockSessMgr, s.mockHandler, nil)
+	s.mockMeta.EXPECT().GetHealthySegment(mock.Anything, mock.Anything).Return(nil)
+	s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, mock.Anything, mock.Anything).Return().Once()
+	s.mockMeta.EXPECT().UpdateSegmentsInfo(mock.Anything, mock.Anything, mock.Anything).Return(nil)
+	s.mockMeta.EXPECT().CleanPartitionStatsInfo(mock.Anything, mock.Anything).Return(nil)
+	s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything, mock.Anything).Return(nil)
+	s.mockSessMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil)
+
+	s.handler.executingTasks[1] = task
+	s.Equal(1, len(s.handler.executingTasks))
+	s.handler.checkCompaction()
+	s.Equal(0, len(s.handler.executingTasks))
+	s.Equal(1, len(s.handler.cleaningTasks))
+	s.handler.cleanFailedTasks()
+	s.Equal(0, len(s.handler.cleaningTasks))
+}
+
+func (s *CompactionPlanHandlerSuite) TestCleanClusteringCompactionCommitFail() {
+	s.SetupTest()
+
+	task := newClusteringCompactionTask(&datapb.CompactionTask{
+		PlanID:        1,
+		TriggerID:     1,
+		CollectionID:  1001,
+		Channel:       "ch-1",
+		Type:          datapb.CompactionType_ClusteringCompaction,
+		State:         datapb.CompactionTaskState_executing,
+		NodeID:        1,
+		InputSegments: []UniqueID{1, 2},
+		ClusteringKeyField: &schemapb.FieldSchema{
+			FieldID:         100,
+			Name:            Int64Field,
+			IsPrimaryKey:    true,
+			DataType:        schemapb.DataType_Int64,
+			AutoID:          true,
+			IsClusteringKey: true,
+		},
+	},
+		nil, s.mockMeta, s.mockSessMgr, s.mockHandler, nil)
+
+	s.mockMeta.EXPECT().GetHealthySegment(mock.Anything, mock.Anything).Return(nil)
+	s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything, mock.Anything).Return(nil)
+	s.mockSessMgr.EXPECT().GetCompactionPlanResult(UniqueID(1), int64(1)).Return(
+		&datapb.CompactionPlanResult{
+			PlanID: 1,
+			State:  datapb.CompactionTaskState_completed,
+			Segments: []*datapb.CompactionSegment{
+				{
+					PlanID:    1,
+					SegmentID: 101,
+				},
+			},
+		}, nil).Once()
+	s.mockMeta.EXPECT().CompleteCompactionMutation(mock.Anything, mock.Anything, mock.Anything).Return(nil, nil, errors.New("mock error"))
+	s.mockSessMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil)
+	s.handler.executingTasks[1] = task
+	s.Equal(1, len(s.handler.executingTasks))
+	s.handler.checkCompaction()
+	s.Equal(0, len(task.GetTaskProto().GetResultSegments()))
+
+	s.Equal(datapb.CompactionTaskState_failed, task.GetTaskProto().GetState())
+	s.Equal(0, len(s.handler.executingTasks))
+	s.Equal(1, len(s.handler.cleaningTasks))
+
+	s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, mock.Anything, mock.Anything).Return().Once()
+	s.mockMeta.EXPECT().UpdateSegmentsInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
+	s.mockMeta.EXPECT().CleanPartitionStatsInfo(mock.Anything, mock.Anything).Return(nil)
+	s.handler.cleanFailedTasks()
+	s.Equal(0, len(s.handler.cleaningTasks))
+}
+
+// test that compactionHandler keeps cleaning a failed task until it becomes cleaned
+func (s *CompactionPlanHandlerSuite) TestKeepClean() {
+	s.SetupTest()
+
+	tests := []struct {
+		task CompactionTask
+	}{
+		{
+			newClusteringCompactionTask(&datapb.CompactionTask{
+				PlanID:        1,
+				TriggerID:     1,
+				Type:          datapb.CompactionType_ClusteringCompaction,
+				State:         datapb.CompactionTaskState_failed,
+				NodeID:        1,
+				InputSegments: []UniqueID{1, 2},
+			},
+				nil, s.mockMeta, s.mockSessMgr, s.mockHandler, nil),
+		},
+	}
+	for _, test := range tests {
+		task := test.task
+		s.mockMeta.EXPECT().GetHealthySegment(mock.Anything, mock.Anything).Return(nil)
+		s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, mock.Anything, mock.Anything).Return()
+		s.mockMeta.EXPECT().UpdateSegmentsInfo(mock.Anything, mock.Anything, mock.Anything).Return(nil)
+		s.mockMeta.EXPECT().CleanPartitionStatsInfo(mock.Anything, mock.Anything).Return(errors.New("mock error")).Once()
+		s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything, mock.Anything).Return(nil)
+		s.mockSessMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil)
+
+		s.handler.executingTasks[1] = task
+		s.Equal(1, len(s.handler.executingTasks))
+		s.handler.checkCompaction()
+		s.Equal(0, len(s.handler.executingTasks))
+		s.Equal(1, len(s.handler.cleaningTasks))
+		s.handler.cleanFailedTasks()
+		s.Equal(1, len(s.handler.cleaningTasks))
+		s.mockMeta.EXPECT().CleanPartitionStatsInfo(mock.Anything, mock.Anything).Return(nil).Once()
+		s.handler.cleanFailedTasks()
+		s.Equal(0, len(s.handler.cleaningTasks))
+	}
+}
+
 func getFieldBinlogIDs(fieldID int64, logIDs ...int64) *datapb.FieldBinlog {
 	l := &datapb.FieldBinlog{
 		FieldID: fieldID,
diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go
index d1b321ff78ae2..aab1f9a41e78e 100644
--- a/internal/datacoord/meta.go
+++ b/internal/datacoord/meta.go
@@ -791,6 +791,12 @@ func UpdateStatusOperator(segmentID int64, status commonpb.SegmentState) UpdateO
 			return false
 		}

+		if segment.GetState() == status {
+			log.Ctx(context.TODO()).Info("meta update: segment state is already the target state",
+				zap.Int64("segmentID", segmentID), zap.String("status", status.String()))
+			return false
+		}
+
 		updateSegStateAndPrepareMetrics(segment, status, modPack.metricMutation)
 		if status == commonpb.SegmentState_Dropped {
 			segment.DroppedAt = uint64(time.Now().UnixNano())
diff --git a/internal/datacoord/partition_stats_meta.go b/internal/datacoord/partition_stats_meta.go
index 1429106e719b9..97db8227ecd07 100644
--- a/internal/datacoord/partition_stats_meta.go
+++ b/internal/datacoord/partition_stats_meta.go
@@ -107,8 +107,8 @@ func (psm *partitionStatsMeta) ListPartitionStatsInfos(collectionID int64, parti
 func (psm *partitionStatsMeta) SavePartitionStatsInfo(info *datapb.PartitionStatsInfo) error {
 	psm.Lock()
 	defer psm.Unlock()
-	if err := psm.catalog.SavePartitionStatsInfo(psm.ctx, info); err != nil {
-		log.Error("meta update: update PartitionStatsInfo info fail", zap.Error(err))
+	if err := psm.catalog.SavePartitionStatsInfo(context.TODO(), info); err != nil {
+		log.Ctx(context.TODO()).Error("meta update: update PartitionStatsInfo info fail", zap.Error(err))
 		return err
 	}
 	if _, ok := psm.partitionStatsInfos[info.GetVChannel()]; !ok {
@@ -127,8 +127,26 @@ func (psm *partitionStatsMeta) SavePartitionStatsInfo(info *datapb.PartitionStat
 func (psm *partitionStatsMeta) DropPartitionStatsInfo(ctx context.Context, info *datapb.PartitionStatsInfo) error {
 	psm.Lock()
 	defer psm.Unlock()
+	// if the dropping partitionStats is the current version, we should update the current version
+	currentVersion := psm.innerGetCurrentPartitionStatsVersion(info.GetCollectionID(), info.GetPartitionID(), info.GetVChannel())
+	if currentVersion == info.GetVersion() && currentVersion != emptyPartitionStatsVersion {
+		infos := psm.partitionStatsInfos[info.GetVChannel()][info.GetPartitionID()].infos
+		if len(infos) > 0 {
+			var maxVersion int64 = 0
+			for version := range infos {
+				if version > maxVersion && version < currentVersion {
+					maxVersion = version
+				}
+			}
+			err := psm.innerSaveCurrentPartitionStatsVersion(info.GetCollectionID(), info.GetPartitionID(), info.GetVChannel(), maxVersion)
+			if err != nil {
+				return err
+			}
+		}
+	}
+
 	if err := psm.catalog.DropPartitionStatsInfo(ctx, info); err != nil {
-		log.Error("meta update: drop PartitionStatsInfo info fail",
+		log.Ctx(ctx).Error("meta update: drop PartitionStatsInfo info fail",
 			zap.Int64("collectionID", info.GetCollectionID()),
 			zap.Int64("partitionID", info.GetPartitionID()),
 			zap.String("vchannel", info.GetVChannel()),
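A quick aside on the hunk above: DropPartitionStatsInfo now demotes the current version before the catalog delete, so readers never observe a current version that points at a dropped stats file. The following is a minimal, runnable sketch of just that selection step; the plain map of versions and the emptyPartitionStatsVersion sentinel (assumed to be 0 here) are simplified stand-ins for partitionStatsInfos and the real constant.

package main

import "fmt"

// assumed sentinel for illustration; the real constant lives in datacoord
const emptyPartitionStatsVersion = int64(0)

// nextCurrentVersion picks the largest remaining version strictly below the
// one being dropped, mirroring the maxVersion loop in DropPartitionStatsInfo.
// If no smaller version exists, it falls back to the empty sentinel.
func nextCurrentVersion(infos map[int64]struct{}, dropping int64) int64 {
	maxVersion := emptyPartitionStatsVersion
	for version := range infos {
		if version > maxVersion && version < dropping {
			maxVersion = version
		}
	}
	return maxVersion
}

func main() {
	infos := map[int64]struct{}{100: {}, 101: {}, 102: {}}
	// dropping the current version 102 should demote to 101,
	// matching the assertion in TestDropPartitionStats below
	fmt.Println(nextCurrentVersion(infos, 102)) // 101
}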
@@ -155,8 +173,11 @@ func (psm *partitionStatsMeta) DropPartitionStatsInfo(ctx context.Context, info func (psm *partitionStatsMeta) SaveCurrentPartitionStatsVersion(collectionID, partitionID int64, vChannel string, currentPartitionStatsVersion int64) error { psm.Lock() defer psm.Unlock() + return psm.innerSaveCurrentPartitionStatsVersion(collectionID, partitionID, vChannel, currentPartitionStatsVersion) +} - log.Info("update current partition stats version", zap.Int64("collectionID", collectionID), +func (psm *partitionStatsMeta) innerSaveCurrentPartitionStatsVersion(collectionID, partitionID int64, vChannel string, currentPartitionStatsVersion int64) error { + log.Ctx(context.TODO()).Info("update current partition stats version", zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID), zap.String("vChannel", vChannel), zap.Int64("currentPartitionStatsVersion", currentPartitionStatsVersion)) @@ -180,7 +201,10 @@ func (psm *partitionStatsMeta) SaveCurrentPartitionStatsVersion(collectionID, pa func (psm *partitionStatsMeta) GetCurrentPartitionStatsVersion(collectionID, partitionID int64, vChannel string) int64 { psm.RLock() defer psm.RUnlock() + return psm.innerGetCurrentPartitionStatsVersion(collectionID, partitionID, vChannel) +} +func (psm *partitionStatsMeta) innerGetCurrentPartitionStatsVersion(collectionID, partitionID int64, vChannel string) int64 { if _, ok := psm.partitionStatsInfos[vChannel]; !ok { return emptyPartitionStatsVersion } diff --git a/internal/datacoord/partition_stats_meta_test.go b/internal/datacoord/partition_stats_meta_test.go index 0c67f2d4424b9..1b27a7bde770f 100644 --- a/internal/datacoord/partition_stats_meta_test.go +++ b/internal/datacoord/partition_stats_meta_test.go @@ -91,3 +91,61 @@ func (s *PartitionStatsMetaSuite) TestGetPartitionStats() { currentVersion4 := partitionStatsMeta.GetCurrentPartitionStatsVersion(1, 2, "ch-1") s.Equal(int64(100), currentVersion4) } + +func (s *PartitionStatsMetaSuite) TestDropPartitionStats() { + ctx := context.Background() + partitionStatsMeta, err := newPartitionStatsMeta(ctx, s.catalog) + s.NoError(err) + collectionID := int64(1) + partitionID := int64(2) + channel := "ch-1" + s.catalog.EXPECT().DropPartitionStatsInfo(mock.Anything, mock.Anything).Return(nil) + s.catalog.EXPECT().SaveCurrentPartitionStatsVersion(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) + + partitionStats := []*datapb.PartitionStatsInfo{ + { + CollectionID: collectionID, + PartitionID: partitionID, + VChannel: channel, + SegmentIDs: []int64{100000}, + Version: 100, + }, + { + CollectionID: collectionID, + PartitionID: partitionID, + VChannel: channel, + SegmentIDs: []int64{100000}, + Version: 101, + }, + { + CollectionID: collectionID, + PartitionID: partitionID, + VChannel: channel, + SegmentIDs: []int64{100000}, + Version: 102, + }, + } + for _, partitionStats := range partitionStats { + partitionStatsMeta.SavePartitionStatsInfo(partitionStats) + } + partitionStatsMeta.SaveCurrentPartitionStatsVersion(collectionID, partitionID, channel, 102) + version := partitionStatsMeta.GetCurrentPartitionStatsVersion(collectionID, partitionID, channel) + s.Equal(int64(102), version) + + err = partitionStatsMeta.DropPartitionStatsInfo(context.Background(), partitionStats[2]) + s.NoError(err) + s.Equal(2, len(partitionStatsMeta.partitionStatsInfos[channel][partitionID].infos)) + version2 := partitionStatsMeta.GetCurrentPartitionStatsVersion(collectionID, partitionID, channel) + s.Equal(int64(101), 
version2)
+
+	err = partitionStatsMeta.DropPartitionStatsInfo(context.Background(), partitionStats[1])
+	s.NoError(err)
+	s.Equal(1, len(partitionStatsMeta.partitionStatsInfos[channel][partitionID].infos))
+	version3 := partitionStatsMeta.GetCurrentPartitionStatsVersion(collectionID, partitionID, channel)
+	s.Equal(int64(100), version3)
+
+	err = partitionStatsMeta.DropPartitionStatsInfo(context.Background(), partitionStats[0])
+	s.NoError(err)
+	s.Nil(partitionStatsMeta.partitionStatsInfos[channel][partitionID])
+	version4 := partitionStatsMeta.GetCurrentPartitionStatsVersion(collectionID, partitionID, channel)
+	s.Equal(emptyPartitionStatsVersion, version4)
+}
diff --git a/pkg/util/merr/errors.go b/pkg/util/merr/errors.go
index 8de8203461eaa..b36d7230b93b8 100644
--- a/pkg/util/merr/errors.go
+++ b/pkg/util/merr/errors.go
@@ -215,6 +215,7 @@ var (
 	ErrGetCompactionPlanResultFail = newMilvusError("fail to get compaction plan", 2313, true)
 	ErrCompactionResult            = newMilvusError("illegal compaction results", 2314, false)
 	ErrDuplicatedCompactionTask    = newMilvusError("duplicated compaction task", 2315, false)
+	ErrCleanPartitionStatsFail     = newMilvusError("fail to clean partition stats", 2316, true)

 	ErrDataNodeSlotExhausted = newMilvusError("datanode slot exhausted", 2401, false)

diff --git a/pkg/util/merr/utils.go b/pkg/util/merr/utils.go
index aef0bed7a344b..d29df8199fc69 100644
--- a/pkg/util/merr/utils.go
+++ b/pkg/util/merr/utils.go
@@ -1189,6 +1189,14 @@ func WrapErrClusteringCompactionMetaError(operation string, err error) error {
 	return wrapFieldsWithDesc(ErrClusteringCompactionMetaError, err.Error(), value("operation", operation))
 }

+func WrapErrCleanPartitionStatsFail(msg ...string) error {
+	err := error(ErrCleanPartitionStatsFail)
+	if len(msg) > 0 {
+		err = errors.Wrap(err, strings.Join(msg, "->"))
+	}
+	return err
+}
+
 func WrapErrAnalyzeTaskNotFound(id int64) error {
 	return wrapFields(ErrAnalyzeTaskNotFound, value("analyzeId", id))
 }
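To close, a minimal sketch of the error-wrapping convention behind WrapErrCleanPartitionStatsFail, rebuilt with only the standard library. The sentinel variable and helper name here are illustrative stand-ins; the real merr package also carries error codes and the retryable flag registered above, which are deliberately left out, so treat this as the shape of the pattern rather than the actual implementation.

package main

import (
	"errors"
	"fmt"
	"strings"
)

// errCleanPartitionStatsFail stands in for merr.ErrCleanPartitionStatsFail.
var errCleanPartitionStatsFail = errors.New("fail to clean partition stats")

// wrapErrCleanPartitionStatsFail mirrors the helper added above: optional
// detail strings are joined with "->" and attached to the sentinel, and
// errors.Is still matches the sentinel after wrapping.
func wrapErrCleanPartitionStatsFail(msg ...string) error {
	err := errCleanPartitionStatsFail
	if len(msg) > 0 {
		err = fmt.Errorf("%s: %w", strings.Join(msg, "->"), err)
	}
	return err
}

func main() {
	// the detail string follows the collectionID-partitionID-channel-planID
	// format used by doClean in compaction_task_clustering.go
	err := wrapErrCleanPartitionStatsFail(fmt.Sprintf("%d-%d-%s-%d", 1001, 2, "ch-1", 19530))
	fmt.Println(err)                                        // 1001-2-ch-1-19530: fail to clean partition stats
	fmt.Println(errors.Is(err, errCleanPartitionStatsFail)) // true
}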