diff --git a/internal/datacoord/compaction_task_l0.go b/internal/datacoord/compaction_task_l0.go index cf531eeb87e31..f29f057b106cb 100644 --- a/internal/datacoord/compaction_task_l0.go +++ b/internal/datacoord/compaction_task_l0.go @@ -139,52 +139,69 @@ func (t *l0CompactionTask) processExecuting() bool { return false } -func (t *l0CompactionTask) GetSpan() trace.Span { - return t.span +func (t *l0CompactionTask) processMetaSaved() bool { + err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_completed)) + if err != nil { + log.Warn("l0CompactionTask unable to processMetaSaved", zap.Int64("planID", t.GetPlanID()), zap.Error(err)) + return false + } + return t.processCompleted() +} + +func (t *l0CompactionTask) processCompleted() bool { + if t.hasAssignedWorker() { + err := t.sessions.DropCompactionPlan(t.GetNodeID(), &datapb.DropCompactionPlanRequest{ + PlanID: t.GetPlanID(), + }) + if err != nil { + log.Warn("l0CompactionTask unable to drop compaction plan", zap.Int64("planID", t.GetPlanID()), zap.Error(err)) + } + } + + t.resetSegmentCompacting() + UpdateCompactionSegmentSizeMetrics(t.result.GetSegments()) + log.Info("l0CompactionTask processCompleted done", zap.Int64("planID", t.GetPlanID())) + return true +} + +func (t *l0CompactionTask) processTimeout() bool { + t.resetSegmentCompacting() + return true +} + +func (t *l0CompactionTask) processFailed() bool { + if t.hasAssignedWorker() { + err := t.sessions.DropCompactionPlan(t.GetNodeID(), &datapb.DropCompactionPlanRequest{ + PlanID: t.GetPlanID(), + }) + if err != nil { + log.Warn("l0CompactionTask processFailed unable to drop compaction plan", zap.Int64("planID", t.GetPlanID()), zap.Error(err)) + } + } + + t.resetSegmentCompacting() + log.Info("l0CompactionTask processFailed done", zap.Int64("taskID", t.GetTriggerID()), zap.Int64("planID", t.GetPlanID())) + return true } func (t *l0CompactionTask) GetResult() *datapb.CompactionPlanResult { return t.result } -func (t *l0CompactionTask) SetTask(task *datapb.CompactionTask) { - t.CompactionTask = task +func (t *l0CompactionTask) SetResult(result *datapb.CompactionPlanResult) { + t.result = result } -func (t *l0CompactionTask) SetSpan(span trace.Span) { - t.span = span +func (t *l0CompactionTask) SetTask(task *datapb.CompactionTask) { + t.CompactionTask = task } -func (t *l0CompactionTask) SetPlan(plan *datapb.CompactionPlan) { - t.plan = plan +func (t *l0CompactionTask) GetSpan() trace.Span { + return t.span } -func (t *l0CompactionTask) ShadowClone(opts ...compactionTaskOpt) *datapb.CompactionTask { - taskClone := &datapb.CompactionTask{ - PlanID: t.GetPlanID(), - TriggerID: t.GetTriggerID(), - State: t.GetState(), - StartTime: t.GetStartTime(), - EndTime: t.GetEndTime(), - TimeoutInSeconds: t.GetTimeoutInSeconds(), - Type: t.GetType(), - CollectionTtl: t.CollectionTtl, - CollectionID: t.GetCollectionID(), - PartitionID: t.GetPartitionID(), - Channel: t.GetChannel(), - InputSegments: t.GetInputSegments(), - ResultSegments: t.GetResultSegments(), - TotalRows: t.TotalRows, - Schema: t.Schema, - NodeID: t.GetNodeID(), - FailReason: t.GetFailReason(), - RetryTimes: t.GetRetryTimes(), - Pos: t.GetPos(), - } - for _, opt := range opts { - opt(taskClone) - } - return taskClone +func (t *l0CompactionTask) SetSpan(span trace.Span) { + t.span = span } func (t *l0CompactionTask) EndSpan() { @@ -193,20 +210,24 @@ func (t *l0CompactionTask) EndSpan() { } } -func (t *l0CompactionTask) GetLabel() string { - return fmt.Sprintf("%d-%s", t.PartitionID, t.GetChannel()) +func (t *l0CompactionTask) SetPlan(plan *datapb.CompactionPlan) { + t.plan = plan } func (t *l0CompactionTask) GetPlan() *datapb.CompactionPlan { return t.plan } -func (t *l0CompactionTask) NeedReAssignNodeID() bool { - return t.GetState() == datapb.CompactionTaskState_pipelining && (t.GetNodeID() == 0 || t.GetNodeID() == NullNodeID) +func (t *l0CompactionTask) SetStartTime(startTime int64) { + t.StartTime = startTime } -func (t *l0CompactionTask) SetResult(result *datapb.CompactionPlanResult) { - t.result = result +func (t *l0CompactionTask) GetLabel() string { + return fmt.Sprintf("%d-%s", t.PartitionID, t.GetChannel()) +} + +func (t *l0CompactionTask) NeedReAssignNodeID() bool { + return t.GetState() == datapb.CompactionTaskState_pipelining && (!t.hasAssignedWorker()) } func (t *l0CompactionTask) CleanLogPath() { @@ -229,6 +250,34 @@ func (t *l0CompactionTask) CleanLogPath() { } } +func (t *l0CompactionTask) ShadowClone(opts ...compactionTaskOpt) *datapb.CompactionTask { + taskClone := &datapb.CompactionTask{ + PlanID: t.GetPlanID(), + TriggerID: t.GetTriggerID(), + State: t.GetState(), + StartTime: t.GetStartTime(), + EndTime: t.GetEndTime(), + TimeoutInSeconds: t.GetTimeoutInSeconds(), + Type: t.GetType(), + CollectionTtl: t.CollectionTtl, + CollectionID: t.GetCollectionID(), + PartitionID: t.GetPartitionID(), + Channel: t.GetChannel(), + InputSegments: t.GetInputSegments(), + ResultSegments: t.GetResultSegments(), + TotalRows: t.TotalRows, + Schema: t.Schema, + NodeID: t.GetNodeID(), + FailReason: t.GetFailReason(), + RetryTimes: t.GetRetryTimes(), + Pos: t.GetPos(), + } + for _, opt := range opts { + opt(taskClone) + } + return taskClone +} + func (t *l0CompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, error) { beginLogID, _, err := t.allocator.allocN(1) if err != nil { @@ -306,53 +355,12 @@ func (t *l0CompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, err return plan, nil } -func (t *l0CompactionTask) processMetaSaved() bool { - err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_completed)) - if err != nil { - log.Warn("l0CompactionTask unable to processMetaSaved", zap.Int64("planID", t.GetPlanID()), zap.Error(err)) - return false - } - return t.processCompleted() -} - -func (t *l0CompactionTask) processCompleted() bool { - if t.GetNodeID() != 0 && t.GetNodeID() != NullNodeID { - err := t.sessions.DropCompactionPlan(t.GetNodeID(), &datapb.DropCompactionPlanRequest{ - PlanID: t.GetPlanID(), - }) - if err != nil { - log.Warn("l0CompactionTask unable to drop compaction plan", zap.Int64("planID", t.GetPlanID()), zap.Error(err)) - } - } - - t.resetSegmentCompacting() - UpdateCompactionSegmentSizeMetrics(t.result.GetSegments()) - log.Info("l0CompactionTask processCompleted done", zap.Int64("planID", t.GetPlanID())) - return true -} - func (t *l0CompactionTask) resetSegmentCompacting() { t.meta.SetSegmentsCompacting(t.GetInputSegments(), false) } -func (t *l0CompactionTask) processTimeout() bool { - t.resetSegmentCompacting() - return true -} - -func (t *l0CompactionTask) processFailed() bool { - if t.GetNodeID() != 0 && t.GetNodeID() != NullNodeID { - err := t.sessions.DropCompactionPlan(t.GetNodeID(), &datapb.DropCompactionPlanRequest{ - PlanID: t.GetPlanID(), - }) - if err != nil { - log.Warn("l0CompactionTask processFailed unable to drop compaction plan", zap.Int64("planID", t.GetPlanID()), zap.Error(err)) - } - } - - t.resetSegmentCompacting() - log.Info("l0CompactionTask processFailed done", zap.Int64("taskID", t.GetTriggerID()), zap.Int64("planID", t.GetPlanID())) - return true +func (t *l0CompactionTask) hasAssignedWorker() bool { + return t.GetNodeID() != 0 && t.GetNodeID() != NullNodeID } func (t *l0CompactionTask) checkTimeout() bool { @@ -373,6 +381,14 @@ func (t *l0CompactionTask) checkTimeout() bool { return false } +func (t *l0CompactionTask) SetNodeID(id UniqueID) error { + return t.updateAndSaveTaskMeta(setNodeID(id)) +} + +func (t *l0CompactionTask) SaveTaskMeta() error { + return t.saveTaskMeta(t.CompactionTask) +} + func (t *l0CompactionTask) updateAndSaveTaskMeta(opts ...compactionTaskOpt) error { task := t.ShadowClone(opts...) err := t.saveTaskMeta(task) @@ -383,18 +399,10 @@ func (t *l0CompactionTask) updateAndSaveTaskMeta(opts ...compactionTaskOpt) erro return nil } -func (t *l0CompactionTask) SetNodeID(id UniqueID) error { - return t.updateAndSaveTaskMeta(setNodeID(id)) -} - func (t *l0CompactionTask) saveTaskMeta(task *datapb.CompactionTask) error { return t.meta.SaveCompactionTask(task) } -func (t *l0CompactionTask) SaveTaskMeta() error { - return t.saveTaskMeta(t.CompactionTask) -} - func (t *l0CompactionTask) saveSegmentMeta() error { result := t.result var operators []UpdateOperator diff --git a/internal/datacoord/compaction_task_l0_test.go b/internal/datacoord/compaction_task_l0_test.go index 3730c8e43c079..3a994b6136238 100644 --- a/internal/datacoord/compaction_task_l0_test.go +++ b/internal/datacoord/compaction_task_l0_test.go @@ -25,6 +25,7 @@ import ( "github.com/samber/lo" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" + "go.opentelemetry.io/otel/trace" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus/internal/proto/datapb" @@ -212,6 +213,7 @@ func (s *L0CompactionTaskSuite) generateTestL0Task(state datapb.CompactionTaskSt Type: datapb.CompactionType_Level0DeleteCompaction, NodeID: NullNodeID, State: state, + Channel: "ch-1", InputSegments: []int64{100, 101}, }, meta: s.mockMeta, @@ -265,6 +267,37 @@ func (s *L0CompactionTaskSuite) TestPorcessStateTrans() { s.True(got) s.Equal(datapb.CompactionTaskState_failed, t.State) }) + s.Run("test pipelining saveTaskMeta failed", func() { + t := s.generateTestL0Task(datapb.CompactionTaskState_pipelining) + s.mockAlloc.EXPECT().allocN(mock.Anything).Return(100, 200, nil) + t.NodeID = 100 + channel := "ch-1" + deltaLogs := []*datapb.FieldBinlog{getFieldBinlogIDs(101, 3)} + + s.mockMeta.EXPECT().SelectSegments(mock.Anything, mock.Anything).Return( + []*SegmentInfo{ + {SegmentInfo: &datapb.SegmentInfo{ + ID: 200, + Level: datapb.SegmentLevel_L1, + InsertChannel: channel, + }, isCompacting: true}, + }, + ) + + s.mockMeta.EXPECT().GetHealthySegment(mock.Anything).RunAndReturn(func(segID int64) *SegmentInfo { + return &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{ + ID: segID, + Level: datapb.SegmentLevel_L0, + InsertChannel: channel, + State: commonpb.SegmentState_Flushed, + Deltalogs: deltaLogs, + }} + }).Twice() + s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(errors.New("mock error")).Once() + got := t.Process() + s.False(got) + s.Equal(datapb.CompactionTaskState_pipelining, t.State) + }) s.Run("test pipelining Compaction failed", func() { s.mockAlloc.EXPECT().allocN(mock.Anything).Return(100, 200, nil) @@ -515,4 +548,192 @@ func (s *L0CompactionTaskSuite) TestPorcessStateTrans() { s.False(got) s.Equal(datapb.CompactionTaskState_executing, t.GetState()) }) + + s.Run("test timeout", func() { + t := s.generateTestL0Task(datapb.CompactionTaskState_timeout) + t.NodeID = 100 + s.Require().True(t.GetNodeID() > 0) + + s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, false).RunAndReturn(func(segIDs []int64, isCompacting bool) { + s.Require().False(isCompacting) + s.ElementsMatch(segIDs, t.GetInputSegments()) + }).Once() + + got := t.Process() + s.True(got) + }) + + s.Run("test metaSaved success", func() { + t := s.generateTestL0Task(datapb.CompactionTaskState_meta_saved) + t.NodeID = 100 + s.Require().True(t.GetNodeID() > 0) + t.result = &datapb.CompactionPlanResult{} + + s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil).Once() + s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, false).RunAndReturn(func(segIDs []int64, isCompacting bool) { + s.ElementsMatch(segIDs, t.GetInputSegments()) + }).Once() + s.mockSessMgr.EXPECT().DropCompactionPlan(t.GetNodeID(), mock.Anything).Return(nil).Once() + + got := t.Process() + s.True(got) + s.Equal(datapb.CompactionTaskState_completed, t.GetState()) + }) + + s.Run("test metaSaved failed", func() { + t := s.generateTestL0Task(datapb.CompactionTaskState_meta_saved) + t.NodeID = 100 + s.Require().True(t.GetNodeID() > 0) + t.result = &datapb.CompactionPlanResult{} + + s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(errors.New("mock error")).Once() + + got := t.Process() + s.False(got) + s.Equal(datapb.CompactionTaskState_meta_saved, t.GetState()) + }) + + s.Run("test complete drop failed", func() { + t := s.generateTestL0Task(datapb.CompactionTaskState_completed) + t.NodeID = 100 + s.Require().True(t.GetNodeID() > 0) + t.result = &datapb.CompactionPlanResult{} + s.mockSessMgr.EXPECT().DropCompactionPlan(t.GetNodeID(), mock.Anything).Return(errors.New("mock error")).Once() + s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, false).RunAndReturn(func(segIDs []int64, isCompacting bool) { + s.ElementsMatch(segIDs, t.GetInputSegments()) + }).Once() + + got := t.Process() + s.True(got) + s.Equal(datapb.CompactionTaskState_completed, t.GetState()) + }) + + s.Run("test complete success", func() { + t := s.generateTestL0Task(datapb.CompactionTaskState_completed) + t.NodeID = 100 + s.Require().True(t.GetNodeID() > 0) + t.result = &datapb.CompactionPlanResult{} + s.mockSessMgr.EXPECT().DropCompactionPlan(t.GetNodeID(), mock.Anything).Return(nil).Once() + s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, false).RunAndReturn(func(segIDs []int64, isCompacting bool) { + s.ElementsMatch(segIDs, t.GetInputSegments()) + }).Once() + + got := t.Process() + s.True(got) + s.Equal(datapb.CompactionTaskState_completed, t.GetState()) + }) + + s.Run("test process failed success", func() { + t := s.generateTestL0Task(datapb.CompactionTaskState_failed) + t.NodeID = 100 + s.Require().True(t.GetNodeID() > 0) + s.mockSessMgr.EXPECT().DropCompactionPlan(t.GetNodeID(), mock.Anything).Return(nil).Once() + s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, false).RunAndReturn(func(segIDs []int64, isCompacting bool) { + s.ElementsMatch(segIDs, t.GetInputSegments()) + }).Once() + + got := t.Process() + s.True(got) + s.Equal(datapb.CompactionTaskState_failed, t.GetState()) + }) + s.Run("test process failed failed", func() { + t := s.generateTestL0Task(datapb.CompactionTaskState_failed) + t.NodeID = 100 + s.Require().True(t.GetNodeID() > 0) + s.mockSessMgr.EXPECT().DropCompactionPlan(t.GetNodeID(), mock.Anything).Return(errors.New("mock error")).Once() + s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, false).RunAndReturn(func(segIDs []int64, isCompacting bool) { + s.ElementsMatch(segIDs, t.GetInputSegments()) + }).Once() + + got := t.Process() + s.True(got) + s.Equal(datapb.CompactionTaskState_failed, t.GetState()) + }) + + s.Run("test unkonwn task", func() { + t := s.generateTestL0Task(datapb.CompactionTaskState_unknown) + + got := t.Process() + s.True(got) + }) +} + +func (s *L0CompactionTaskSuite) TestSetterGetter() { + t := s.generateTestL0Task(datapb.CompactionTaskState_pipelining) + + span := t.GetSpan() + s.Nil(span) + s.NotPanics(t.EndSpan) + + t.SetSpan(trace.SpanFromContext(context.TODO())) + s.NotPanics(t.EndSpan) + + rst := t.GetResult() + s.Nil(rst) + t.SetResult(&datapb.CompactionPlanResult{PlanID: 19530}) + s.NotNil(t.GetResult()) + + label := t.GetLabel() + s.Equal("10-ch-1", label) + + t.SetStartTime(100) + s.EqualValues(100, t.GetStartTime()) + + t.SetTask(nil) + t.SetPlan(&datapb.CompactionPlan{PlanID: 19530}) + s.NotNil(t.GetPlan()) + + s.Run("set NodeID", func() { + t := s.generateTestL0Task(datapb.CompactionTaskState_pipelining) + + s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil) + t.SetNodeID(1000) + s.EqualValues(1000, t.GetNodeID()) + }) +} + +func (s *L0CompactionTaskSuite) TestCleanLogPath() { + s.Run("plan nil", func() { + t := s.generateTestL0Task(datapb.CompactionTaskState_pipelining) + t.CleanLogPath() + }) + + s.Run("clear path", func() { + t := s.generateTestL0Task(datapb.CompactionTaskState_pipelining) + t.SetPlan(&datapb.CompactionPlan{ + Channel: "ch-1", + Type: datapb.CompactionType_MixCompaction, + SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{ + { + SegmentID: 100, + FieldBinlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(101, 4)}, + Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogIDs(101, 5)}, + Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(101, 6)}, + }, + }, + PlanID: 19530, + }) + + t.SetResult(&datapb.CompactionPlanResult{ + Segments: []*datapb.CompactionSegment{ + { + SegmentID: 100, + InsertLogs: []*datapb.FieldBinlog{getFieldBinlogIDs(101, 4)}, + Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogIDs(101, 5)}, + Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(101, 6)}, + }, + }, + PlanID: 19530, + }) + + t.CleanLogPath() + + s.Empty(t.GetPlan().GetSegmentBinlogs()[0].GetFieldBinlogs()) + s.Empty(t.GetPlan().GetSegmentBinlogs()[0].GetField2StatslogPaths()) + s.Empty(t.GetPlan().GetSegmentBinlogs()[0].GetDeltalogs()) + + s.Empty(t.GetResult().GetSegments()[0].GetInsertLogs()) + s.Empty(t.GetResult().GetSegments()[0].GetField2StatslogPaths()) + s.Empty(t.GetResult().GetSegments()[0].GetDeltalogs()) + }) }