diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index cafb6b0929cd0..aa1d1fcc9e820 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -1540,14 +1540,13 @@ func (m *meta) completeMixCompactionMutation(t *datapb.CompactionTask, result *d })), }) - // L1 segment with NumRows=0 will be discarded, so no need to change the metric - if compactToSegmentInfo.GetNumOfRows() > 0 { - // metrics mutation for compactTo segments - metricMutation.addNewSeg(compactToSegmentInfo.GetState(), compactToSegmentInfo.GetLevel(), compactToSegmentInfo.GetNumOfRows()) - } else { + if compactToSegmentInfo.GetNumOfRows() == 0 { compactToSegmentInfo.State = commonpb.SegmentState_Dropped } + // metrics mutation for compactTo segments + metricMutation.addNewSeg(compactToSegmentInfo.GetState(), compactToSegmentInfo.GetLevel(), compactToSegmentInfo.GetNumOfRows()) + log.Info("Add a new compactTo segment", zap.Int64("compactTo", compactToSegmentInfo.GetID()), zap.Int64("compactTo segment numRows", compactToSegmentInfo.GetNumOfRows()), @@ -1571,6 +1570,7 @@ func (m *meta) completeMixCompactionMutation(t *datapb.CompactionTask, result *d for _, seg := range compactToInfos { binlogs = append(binlogs, metastore.BinlogsIncrement{Segment: seg}) } + // alter compactTo before compactFrom segments to avoid data lost if service crash during AlterSegments if err := m.catalog.AlterSegments(m.ctx, compactToInfos, binlogs...); err != nil { log.Warn("fail to alter compactTo segments", zap.Error(err)) diff --git a/internal/datacoord/meta_test.go b/internal/datacoord/meta_test.go index 3dee653ee104d..4545e5d31bb7a 100644 --- a/internal/datacoord/meta_test.go +++ b/internal/datacoord/meta_test.go @@ -179,103 +179,176 @@ func (suite *MetaBasicSuite) TestCollection() { } func (suite *MetaBasicSuite) TestCompleteCompactionMutation() { - latestSegments := NewSegmentsInfo() - for segID, segment := range map[UniqueID]*SegmentInfo{ - 1: {SegmentInfo: &datapb.SegmentInfo{ - ID: 1, - CollectionID: 100, - PartitionID: 10, - State: commonpb.SegmentState_Flushed, - Level: datapb.SegmentLevel_L1, - Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 10000, 10001)}, - Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 20000, 20001)}, - // latest segment has 2 deltalogs, one submit for compaction, one is appended before compaction done - Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 30000), getFieldBinlogIDs(0, 30001)}, - NumOfRows: 2, - }}, - 2: {SegmentInfo: &datapb.SegmentInfo{ - ID: 2, - CollectionID: 100, - PartitionID: 10, - State: commonpb.SegmentState_Flushed, - Level: datapb.SegmentLevel_L1, - Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 11000)}, - Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 21000)}, - // latest segment has 2 deltalogs, one submit for compaction, one is appended before compaction done - Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 31000), getFieldBinlogIDs(0, 31001)}, - NumOfRows: 2, - }}, - } { - latestSegments.SetSegment(segID, segment) + getLatestSegments := func() *SegmentsInfo { + latestSegments := NewSegmentsInfo() + for segID, segment := range map[UniqueID]*SegmentInfo{ + 1: {SegmentInfo: &datapb.SegmentInfo{ + ID: 1, + CollectionID: 100, + PartitionID: 10, + State: commonpb.SegmentState_Flushed, + Level: datapb.SegmentLevel_L1, + Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 10000, 10001)}, + Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 20000, 20001)}, + // latest segment has 2 deltalogs, one submit for compaction, one is appended before compaction done + Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 30000), getFieldBinlogIDs(0, 30001)}, + NumOfRows: 2, + }}, + 2: {SegmentInfo: &datapb.SegmentInfo{ + ID: 2, + CollectionID: 100, + PartitionID: 10, + State: commonpb.SegmentState_Flushed, + Level: datapb.SegmentLevel_L1, + Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 11000)}, + Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 21000)}, + // latest segment has 2 deltalogs, one submit for compaction, one is appended before compaction done + Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 31000), getFieldBinlogIDs(0, 31001)}, + NumOfRows: 2, + }}, + } { + latestSegments.SetSegment(segID, segment) + } + + return latestSegments } mockChMgr := mocks.NewChunkManager(suite.T()) - m := &meta{ - catalog: &datacoord.Catalog{MetaKv: NewMetaMemoryKV()}, - segments: latestSegments, - chunkManager: mockChMgr, - } - compactToSeg := &datapb.CompactionSegment{ - SegmentID: 3, - InsertLogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 50000)}, - Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 50001)}, - NumOfRows: 2, - } + suite.Run("test complete with compactTo 0 num of rows", func() { + latestSegments := getLatestSegments() + compactToSeg := &datapb.CompactionSegment{ + SegmentID: 4, + InsertLogs: []*datapb.FieldBinlog{}, + Field2StatslogPaths: []*datapb.FieldBinlog{}, + NumOfRows: 0, + } - result := &datapb.CompactionPlanResult{ - Segments: []*datapb.CompactionSegment{compactToSeg}, - } - task := &datapb.CompactionTask{ - InputSegments: []UniqueID{1, 2}, - Type: datapb.CompactionType_MixCompaction, - } + result := &datapb.CompactionPlanResult{ + Segments: []*datapb.CompactionSegment{compactToSeg}, + } + task := &datapb.CompactionTask{ + InputSegments: []UniqueID{1, 2}, + Type: datapb.CompactionType_MixCompaction, + } + m := &meta{ + catalog: &datacoord.Catalog{MetaKv: NewMetaMemoryKV()}, + segments: latestSegments, + chunkManager: mockChMgr, + } - infos, mutation, err := m.CompleteCompactionMutation(task, result) - assert.NoError(suite.T(), err) - suite.Equal(1, len(infos)) - info := infos[0] - suite.NoError(err) - suite.NotNil(info) - suite.NotNil(mutation) - - // check newSegment - suite.EqualValues(3, info.GetID()) - suite.Equal(datapb.SegmentLevel_L1, info.GetLevel()) - suite.Equal(commonpb.SegmentState_Flushed, info.GetState()) - - binlogs := info.GetBinlogs() - for _, fbinlog := range binlogs { - for _, blog := range fbinlog.GetBinlogs() { - suite.Empty(blog.GetLogPath()) - suite.EqualValues(50000, blog.GetLogID()) + infos, mutation, err := m.CompleteCompactionMutation(task, result) + assert.NoError(suite.T(), err) + suite.Equal(1, len(infos)) + info := infos[0] + suite.NoError(err) + suite.NotNil(info) + suite.NotNil(mutation) + + // check compact to segments + suite.EqualValues(4, info.GetID()) + suite.Equal(datapb.SegmentLevel_L1, info.GetLevel()) + suite.Equal(commonpb.SegmentState_Dropped, info.GetState()) + + suite.Empty(info.GetBinlogs()) + suite.Empty(info.GetStatslogs()) + + // check compactFrom segments + for _, segID := range []int64{1, 2} { + seg := m.GetSegment(segID) + suite.Equal(commonpb.SegmentState_Dropped, seg.GetState()) + suite.NotEmpty(seg.GetDroppedAt()) + + suite.EqualValues(segID, seg.GetID()) + suite.ElementsMatch(latestSegments.segments[segID].GetBinlogs(), seg.GetBinlogs()) + suite.ElementsMatch(latestSegments.segments[segID].GetStatslogs(), seg.GetStatslogs()) + suite.ElementsMatch(latestSegments.segments[segID].GetDeltalogs(), seg.GetDeltalogs()) } - } - statslogs := info.GetStatslogs() - for _, fbinlog := range statslogs { - for _, blog := range fbinlog.GetBinlogs() { - suite.Empty(blog.GetLogPath()) - suite.EqualValues(50001, blog.GetLogID()) + // check mutation metrics + suite.EqualValues(2, len(mutation.stateChange[datapb.SegmentLevel_L1.String()])) + suite.EqualValues(-4, mutation.rowCountChange) + suite.EqualValues(0, mutation.rowCountAccChange) + flushedUnsorted := mutation.stateChange[datapb.SegmentLevel_L1.String()][commonpb.SegmentState_Flushed.String()] + suite.EqualValues(-2, flushedUnsorted) + + droppedUnsorted := mutation.stateChange[datapb.SegmentLevel_L1.String()][commonpb.SegmentState_Dropped.String()] + suite.EqualValues(3, droppedUnsorted) + }) + + suite.Run("test complete compaction mutation", func() { + latestSegments := getLatestSegments() + compactToSeg := &datapb.CompactionSegment{ + SegmentID: 3, + InsertLogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 50000)}, + Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 50001)}, + NumOfRows: 2, } - } - // check compactFrom segments - for _, segID := range []int64{1, 2} { - seg := m.GetSegment(segID) - suite.Equal(commonpb.SegmentState_Dropped, seg.GetState()) - suite.NotEmpty(seg.GetDroppedAt()) + result := &datapb.CompactionPlanResult{ + Segments: []*datapb.CompactionSegment{compactToSeg}, + } + task := &datapb.CompactionTask{ + InputSegments: []UniqueID{1, 2}, + Type: datapb.CompactionType_MixCompaction, + } + m := &meta{ + catalog: &datacoord.Catalog{MetaKv: NewMetaMemoryKV()}, + segments: latestSegments, + chunkManager: mockChMgr, + } - suite.EqualValues(segID, seg.GetID()) - suite.ElementsMatch(latestSegments.segments[segID].GetBinlogs(), seg.GetBinlogs()) - suite.ElementsMatch(latestSegments.segments[segID].GetStatslogs(), seg.GetStatslogs()) - suite.ElementsMatch(latestSegments.segments[segID].GetDeltalogs(), seg.GetDeltalogs()) - } + infos, mutation, err := m.CompleteCompactionMutation(task, result) + assert.NoError(suite.T(), err) + suite.Equal(1, len(infos)) + info := infos[0] + suite.NoError(err) + suite.NotNil(info) + suite.NotNil(mutation) + + // check newSegment + suite.EqualValues(3, info.GetID()) + suite.Equal(datapb.SegmentLevel_L1, info.GetLevel()) + suite.Equal(commonpb.SegmentState_Flushed, info.GetState()) + + binlogs := info.GetBinlogs() + for _, fbinlog := range binlogs { + for _, blog := range fbinlog.GetBinlogs() { + suite.Empty(blog.GetLogPath()) + suite.EqualValues(50000, blog.GetLogID()) + } + } + + statslogs := info.GetStatslogs() + for _, fbinlog := range statslogs { + for _, blog := range fbinlog.GetBinlogs() { + suite.Empty(blog.GetLogPath()) + suite.EqualValues(50001, blog.GetLogID()) + } + } + + // check compactFrom segments + for _, segID := range []int64{1, 2} { + seg := m.GetSegment(segID) + suite.Equal(commonpb.SegmentState_Dropped, seg.GetState()) + suite.NotEmpty(seg.GetDroppedAt()) - // check mutation metrics - suite.Equal(2, len(mutation.stateChange[datapb.SegmentLevel_L1.String()])) - suite.EqualValues(-2, mutation.rowCountChange) - suite.EqualValues(2, mutation.rowCountAccChange) + suite.EqualValues(segID, seg.GetID()) + suite.ElementsMatch(latestSegments.segments[segID].GetBinlogs(), seg.GetBinlogs()) + suite.ElementsMatch(latestSegments.segments[segID].GetStatslogs(), seg.GetStatslogs()) + suite.ElementsMatch(latestSegments.segments[segID].GetDeltalogs(), seg.GetDeltalogs()) + } + + // check mutation metrics + suite.EqualValues(2, len(mutation.stateChange[datapb.SegmentLevel_L1.String()])) + suite.EqualValues(-2, mutation.rowCountChange) + suite.EqualValues(2, mutation.rowCountAccChange) + flushedCount := mutation.stateChange[datapb.SegmentLevel_L1.String()][commonpb.SegmentState_Flushed.String()] + suite.EqualValues(-1, flushedCount) + + droppedCount := mutation.stateChange[datapb.SegmentLevel_L1.String()][commonpb.SegmentState_Dropped.String()] + suite.EqualValues(2, droppedCount) + }) } // fix https://github.com/milvus-io/milvus/issues/35003