diff --git a/internal/datacoord/compaction.go b/internal/datacoord/compaction.go index 82103062598f0..7a4093b4b3065 100644 --- a/internal/datacoord/compaction.go +++ b/internal/datacoord/compaction.go @@ -248,7 +248,8 @@ func (c *compactionPlanHandler) schedule() []CompactionTask { switch t.GetTaskProto().GetType() { case datapb.CompactionType_Level0DeleteCompaction: - if mixChannelExcludes.Contain(t.GetTaskProto().GetChannel()) { + if mixChannelExcludes.Contain(t.GetTaskProto().GetChannel()) || + clusterChannelExcludes.Contain(t.GetTaskProto().GetChannel()) { excluded = append(excluded, t) continue } diff --git a/internal/datacoord/compaction_task_l0.go b/internal/datacoord/compaction_task_l0.go index 002b8a2635f05..1668bad1f0528 100644 --- a/internal/datacoord/compaction_task_l0.go +++ b/internal/datacoord/compaction_task_l0.go @@ -296,19 +296,11 @@ func (t *l0CompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, err })) if len(sealedSegments) == 0 { - // TO-DO fast finish l0 segment, just drop l0 segment + // TODO fast finish l0 segment, just drop l0 segment log.Info("l0Compaction available non-L0 Segments is empty ") return nil, errors.Errorf("Selected zero L1/L2 segments for the position=%v", taskProto.GetPos()) } - for _, segInfo := range sealedSegments { - // TODO should allow parallel executing of l0 compaction - if segInfo.isCompacting { - log.Warn("l0CompactionTask candidate segment is compacting", zap.Int64("segmentID", segInfo.GetID())) - return nil, merr.WrapErrCompactionPlanConflict(fmt.Sprintf("segment %d is compacting", segInfo.GetID())) - } - } - sealedSegBinlogs := lo.Map(sealedSegments, func(info *SegmentInfo, _ int) *datapb.CompactionSegmentBinlogs { return &datapb.CompactionSegmentBinlogs{ SegmentID: info.GetID(), diff --git a/internal/datacoord/compaction_task_l0_test.go b/internal/datacoord/compaction_task_l0_test.go index ede8ddd78e2cf..3d4724aaae425 100644 --- a/internal/datacoord/compaction_task_l0_test.go +++ b/internal/datacoord/compaction_task_l0_test.go @@ -221,74 +221,6 @@ func (s *L0CompactionTaskSuite) TestPorcessStateTrans() { s.EqualValues(NullNodeID, t.GetTaskProto().NodeID) }) - s.Run("test pipelining BuildCompactionRequest failed", func() { - s.mockAlloc.EXPECT().AllocN(mock.Anything).Return(100, 200, nil) - s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil) - t := s.generateTestL0Task(datapb.CompactionTaskState_pipelining) - t.updateAndSaveTaskMeta(setNodeID(100)) - channel := "ch-1" - deltaLogs := []*datapb.FieldBinlog{getFieldBinlogIDs(101, 3)} - - s.mockMeta.EXPECT().SelectSegments(mock.Anything, mock.Anything).Return( - []*SegmentInfo{ - {SegmentInfo: &datapb.SegmentInfo{ - ID: 200, - Level: datapb.SegmentLevel_L1, - InsertChannel: channel, - }, isCompacting: true}, - }, - ) - - s.mockMeta.EXPECT().GetHealthySegment(mock.Anything).RunAndReturn(func(segID int64) *SegmentInfo { - return &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{ - ID: segID, - Level: datapb.SegmentLevel_L0, - InsertChannel: channel, - State: commonpb.SegmentState_Flushed, - Deltalogs: deltaLogs, - }} - }).Twice() - s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, false).Return() - - s.mockSessMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil).Once() - - got := t.Process() - s.True(got) - s.Equal(datapb.CompactionTaskState_cleaned, t.GetTaskProto().State) - }) - s.Run("test pipelining saveTaskMeta failed", func() { - s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil).Once() - t := s.generateTestL0Task(datapb.CompactionTaskState_pipelining) - s.mockAlloc.EXPECT().AllocN(mock.Anything).Return(100, 200, nil) - t.updateAndSaveTaskMeta(setNodeID(100)) - channel := "ch-1" - deltaLogs := []*datapb.FieldBinlog{getFieldBinlogIDs(101, 3)} - - s.mockMeta.EXPECT().SelectSegments(mock.Anything, mock.Anything).Return( - []*SegmentInfo{ - {SegmentInfo: &datapb.SegmentInfo{ - ID: 200, - Level: datapb.SegmentLevel_L1, - InsertChannel: channel, - }, isCompacting: true}, - }, - ) - - s.mockMeta.EXPECT().GetHealthySegment(mock.Anything).RunAndReturn(func(segID int64) *SegmentInfo { - return &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{ - ID: segID, - Level: datapb.SegmentLevel_L0, - InsertChannel: channel, - State: commonpb.SegmentState_Flushed, - Deltalogs: deltaLogs, - }} - }).Twice() - s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(errors.New("mock error")).Once() - got := t.Process() - s.False(got) - s.Equal(datapb.CompactionTaskState_pipelining, t.GetTaskProto().State) - }) - s.Run("test pipelining Compaction failed", func() { s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil) s.mockAlloc.EXPECT().AllocN(mock.Anything).Return(100, 200, nil)