Skip to content

Commit

Permalink
fix: Exlude L0 compaction when clustering is executing (#37141)
Browse files Browse the repository at this point in the history
Also remove conflit check when executing L0. The exclusive is already
guarenteed in scheduler

See also: #37140

---------

Signed-off-by: yangxuan <[email protected]>
  • Loading branch information
XuanYang-cn authored Oct 28, 2024
1 parent 1e75a42 commit 26028f4
Show file tree
Hide file tree
Showing 3 changed files with 3 additions and 78 deletions.
3 changes: 2 additions & 1 deletion internal/datacoord/compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,8 @@ func (c *compactionPlanHandler) schedule() []CompactionTask {

switch t.GetTaskProto().GetType() {
case datapb.CompactionType_Level0DeleteCompaction:
if mixChannelExcludes.Contain(t.GetTaskProto().GetChannel()) {
if mixChannelExcludes.Contain(t.GetTaskProto().GetChannel()) ||
clusterChannelExcludes.Contain(t.GetTaskProto().GetChannel()) {
excluded = append(excluded, t)
continue
}
Expand Down
10 changes: 1 addition & 9 deletions internal/datacoord/compaction_task_l0.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,19 +296,11 @@ func (t *l0CompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, err
}))

if len(sealedSegments) == 0 {
// TO-DO fast finish l0 segment, just drop l0 segment
// TODO fast finish l0 segment, just drop l0 segment
log.Info("l0Compaction available non-L0 Segments is empty ")
return nil, errors.Errorf("Selected zero L1/L2 segments for the position=%v", taskProto.GetPos())
}

for _, segInfo := range sealedSegments {
// TODO should allow parallel executing of l0 compaction
if segInfo.isCompacting {
log.Warn("l0CompactionTask candidate segment is compacting", zap.Int64("segmentID", segInfo.GetID()))
return nil, merr.WrapErrCompactionPlanConflict(fmt.Sprintf("segment %d is compacting", segInfo.GetID()))
}
}

sealedSegBinlogs := lo.Map(sealedSegments, func(info *SegmentInfo, _ int) *datapb.CompactionSegmentBinlogs {
return &datapb.CompactionSegmentBinlogs{
SegmentID: info.GetID(),
Expand Down
68 changes: 0 additions & 68 deletions internal/datacoord/compaction_task_l0_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,74 +221,6 @@ func (s *L0CompactionTaskSuite) TestPorcessStateTrans() {
s.EqualValues(NullNodeID, t.GetTaskProto().NodeID)
})

s.Run("test pipelining BuildCompactionRequest failed", func() {
s.mockAlloc.EXPECT().AllocN(mock.Anything).Return(100, 200, nil)
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil)
t := s.generateTestL0Task(datapb.CompactionTaskState_pipelining)
t.updateAndSaveTaskMeta(setNodeID(100))
channel := "ch-1"
deltaLogs := []*datapb.FieldBinlog{getFieldBinlogIDs(101, 3)}

s.mockMeta.EXPECT().SelectSegments(mock.Anything, mock.Anything).Return(
[]*SegmentInfo{
{SegmentInfo: &datapb.SegmentInfo{
ID: 200,
Level: datapb.SegmentLevel_L1,
InsertChannel: channel,
}, isCompacting: true},
},
)

s.mockMeta.EXPECT().GetHealthySegment(mock.Anything).RunAndReturn(func(segID int64) *SegmentInfo {
return &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{
ID: segID,
Level: datapb.SegmentLevel_L0,
InsertChannel: channel,
State: commonpb.SegmentState_Flushed,
Deltalogs: deltaLogs,
}}
}).Twice()
s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, false).Return()

s.mockSessMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil).Once()

got := t.Process()
s.True(got)
s.Equal(datapb.CompactionTaskState_cleaned, t.GetTaskProto().State)
})
s.Run("test pipelining saveTaskMeta failed", func() {
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil).Once()
t := s.generateTestL0Task(datapb.CompactionTaskState_pipelining)
s.mockAlloc.EXPECT().AllocN(mock.Anything).Return(100, 200, nil)
t.updateAndSaveTaskMeta(setNodeID(100))
channel := "ch-1"
deltaLogs := []*datapb.FieldBinlog{getFieldBinlogIDs(101, 3)}

s.mockMeta.EXPECT().SelectSegments(mock.Anything, mock.Anything).Return(
[]*SegmentInfo{
{SegmentInfo: &datapb.SegmentInfo{
ID: 200,
Level: datapb.SegmentLevel_L1,
InsertChannel: channel,
}, isCompacting: true},
},
)

s.mockMeta.EXPECT().GetHealthySegment(mock.Anything).RunAndReturn(func(segID int64) *SegmentInfo {
return &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{
ID: segID,
Level: datapb.SegmentLevel_L0,
InsertChannel: channel,
State: commonpb.SegmentState_Flushed,
Deltalogs: deltaLogs,
}}
}).Twice()
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(errors.New("mock error")).Once()
got := t.Process()
s.False(got)
s.Equal(datapb.CompactionTaskState_pipelining, t.GetTaskProto().State)
})

s.Run("test pipelining Compaction failed", func() {
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil)
s.mockAlloc.EXPECT().AllocN(mock.Anything).Return(100, 200, nil)
Expand Down

0 comments on commit 26028f4

Please sign in to comment.