Skip to content

Commit

Permalink
fix: [cp25]Correct dropped segment num metrics
Browse files Browse the repository at this point in the history
See also: milvus-io#31891
pr: milvus-io#37410

Signed-off-by: yangxuan <[email protected]>
  • Loading branch information
XuanYang-cn committed Nov 6, 2024
1 parent 8ba0a4a commit 631125b
Show file tree
Hide file tree
Showing 2 changed files with 163 additions and 90 deletions.
10 changes: 5 additions & 5 deletions internal/datacoord/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -1551,14 +1551,13 @@ func (m *meta) completeMixCompactionMutation(t *datapb.CompactionTask, result *d
IsSorted: compactToSegment.GetIsSorted(),
})

// L1 segment with NumRows=0 will be discarded, so no need to change the metric
if compactToSegmentInfo.GetNumOfRows() > 0 {
// metrics mutation for compactTo segments
metricMutation.addNewSeg(compactToSegmentInfo.GetState(), compactToSegmentInfo.GetLevel(), compactToSegmentInfo.GetIsSorted(), compactToSegmentInfo.GetNumOfRows())
} else {
if compactToSegmentInfo.GetNumOfRows() == 0 {
compactToSegmentInfo.State = commonpb.SegmentState_Dropped
}

// metrics mutation for compactTo segments
metricMutation.addNewSeg(compactToSegmentInfo.GetState(), compactToSegmentInfo.GetLevel(), compactToSegmentInfo.GetIsSorted(), compactToSegmentInfo.GetNumOfRows())

log.Info("Add a new compactTo segment",
zap.Int64("compactTo", compactToSegmentInfo.GetID()),
zap.Int64("compactTo segment numRows", compactToSegmentInfo.GetNumOfRows()),
Expand All @@ -1582,6 +1581,7 @@ func (m *meta) completeMixCompactionMutation(t *datapb.CompactionTask, result *d
for _, seg := range compactToInfos {
binlogs = append(binlogs, metastore.BinlogsIncrement{Segment: seg})
}

// alter compactTo before compactFrom segments to avoid data lost if service crash during AlterSegments
if err := m.catalog.AlterSegments(m.ctx, compactToInfos, binlogs...); err != nil {
log.Warn("fail to alter compactTo segments", zap.Error(err))
Expand Down
243 changes: 158 additions & 85 deletions internal/datacoord/meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,103 +182,176 @@ func (suite *MetaBasicSuite) TestCollection() {
}

func (suite *MetaBasicSuite) TestCompleteCompactionMutation() {
latestSegments := NewSegmentsInfo()
for segID, segment := range map[UniqueID]*SegmentInfo{
1: {SegmentInfo: &datapb.SegmentInfo{
ID: 1,
CollectionID: 100,
PartitionID: 10,
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L1,
Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 10000, 10001)},
Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 20000, 20001)},
// latest segment has 2 deltalogs, one submit for compaction, one is appended before compaction done
Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 30000), getFieldBinlogIDs(0, 30001)},
NumOfRows: 2,
}},
2: {SegmentInfo: &datapb.SegmentInfo{
ID: 2,
CollectionID: 100,
PartitionID: 10,
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L1,
Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 11000)},
Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 21000)},
// latest segment has 2 deltalogs, one submit for compaction, one is appended before compaction done
Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 31000), getFieldBinlogIDs(0, 31001)},
NumOfRows: 2,
}},
} {
latestSegments.SetSegment(segID, segment)
getLatestSegments := func() *SegmentsInfo {
latestSegments := NewSegmentsInfo()
for segID, segment := range map[UniqueID]*SegmentInfo{
1: {SegmentInfo: &datapb.SegmentInfo{
ID: 1,
CollectionID: 100,
PartitionID: 10,
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L1,
Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 10000, 10001)},
Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 20000, 20001)},
// latest segment has 2 deltalogs, one submit for compaction, one is appended before compaction done
Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 30000), getFieldBinlogIDs(0, 30001)},
NumOfRows: 2,
}},
2: {SegmentInfo: &datapb.SegmentInfo{
ID: 2,
CollectionID: 100,
PartitionID: 10,
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L1,
Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 11000)},
Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 21000)},
// latest segment has 2 deltalogs, one submit for compaction, one is appended before compaction done
Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 31000), getFieldBinlogIDs(0, 31001)},
NumOfRows: 2,
}},
} {
latestSegments.SetSegment(segID, segment)
}

return latestSegments
}

mockChMgr := mocks.NewChunkManager(suite.T())
m := &meta{
catalog: &datacoord.Catalog{MetaKv: NewMetaMemoryKV()},
segments: latestSegments,
chunkManager: mockChMgr,
}

compactToSeg := &datapb.CompactionSegment{
SegmentID: 3,
InsertLogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 50000)},
Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 50001)},
NumOfRows: 2,
}
suite.Run("test complete with compactTo 0 num of rows", func() {
latestSegments := getLatestSegments()
compactToSeg := &datapb.CompactionSegment{
SegmentID: 4,
InsertLogs: []*datapb.FieldBinlog{},
Field2StatslogPaths: []*datapb.FieldBinlog{},
NumOfRows: 0,
}

result := &datapb.CompactionPlanResult{
Segments: []*datapb.CompactionSegment{compactToSeg},
}
task := &datapb.CompactionTask{
InputSegments: []UniqueID{1, 2},
Type: datapb.CompactionType_MixCompaction,
}
result := &datapb.CompactionPlanResult{
Segments: []*datapb.CompactionSegment{compactToSeg},
}
task := &datapb.CompactionTask{
InputSegments: []UniqueID{1, 2},
Type: datapb.CompactionType_MixCompaction,
}
m := &meta{
catalog: &datacoord.Catalog{MetaKv: NewMetaMemoryKV()},
segments: latestSegments,
chunkManager: mockChMgr,
}

infos, mutation, err := m.CompleteCompactionMutation(task, result)
assert.NoError(suite.T(), err)
suite.Equal(1, len(infos))
info := infos[0]
suite.NoError(err)
suite.NotNil(info)
suite.NotNil(mutation)

// check newSegment
suite.EqualValues(3, info.GetID())
suite.Equal(datapb.SegmentLevel_L1, info.GetLevel())
suite.Equal(commonpb.SegmentState_Flushed, info.GetState())

binlogs := info.GetBinlogs()
for _, fbinlog := range binlogs {
for _, blog := range fbinlog.GetBinlogs() {
suite.Empty(blog.GetLogPath())
suite.EqualValues(50000, blog.GetLogID())
infos, mutation, err := m.CompleteCompactionMutation(task, result)
assert.NoError(suite.T(), err)
suite.Equal(1, len(infos))
info := infos[0]
suite.NoError(err)
suite.NotNil(info)
suite.NotNil(mutation)

// check compact to segments
suite.EqualValues(4, info.GetID())
suite.Equal(datapb.SegmentLevel_L1, info.GetLevel())
suite.Equal(commonpb.SegmentState_Dropped, info.GetState())

suite.Empty(info.GetBinlogs())
suite.Empty(info.GetStatslogs())

// check compactFrom segments
for _, segID := range []int64{1, 2} {
seg := m.GetSegment(segID)
suite.Equal(commonpb.SegmentState_Dropped, seg.GetState())
suite.NotEmpty(seg.GetDroppedAt())

suite.EqualValues(segID, seg.GetID())
suite.ElementsMatch(latestSegments.segments[segID].GetBinlogs(), seg.GetBinlogs())
suite.ElementsMatch(latestSegments.segments[segID].GetStatslogs(), seg.GetStatslogs())
suite.ElementsMatch(latestSegments.segments[segID].GetDeltalogs(), seg.GetDeltalogs())
}
}

statslogs := info.GetStatslogs()
for _, fbinlog := range statslogs {
for _, blog := range fbinlog.GetBinlogs() {
suite.Empty(blog.GetLogPath())
suite.EqualValues(50001, blog.GetLogID())
// check mutation metrics
suite.EqualValues(2, len(mutation.stateChange[datapb.SegmentLevel_L1.String()]))
suite.EqualValues(-4, mutation.rowCountChange)
suite.EqualValues(0, mutation.rowCountAccChange)
flushedUnsorted := mutation.stateChange[datapb.SegmentLevel_L1.String()][commonpb.SegmentState_Flushed.String()][getSortStatus(false)]
suite.EqualValues(-2, flushedUnsorted)

droppedUnsorted := mutation.stateChange[datapb.SegmentLevel_L1.String()][commonpb.SegmentState_Dropped.String()][getSortStatus(false)]
suite.EqualValues(3, droppedUnsorted)
})

suite.Run("test complete compaction mutation", func() {
latestSegments := getLatestSegments()
compactToSeg := &datapb.CompactionSegment{
SegmentID: 3,
InsertLogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 50000)},
Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 50001)},
NumOfRows: 2,
}
}

// check compactFrom segments
for _, segID := range []int64{1, 2} {
seg := m.GetSegment(segID)
suite.Equal(commonpb.SegmentState_Dropped, seg.GetState())
suite.NotEmpty(seg.GetDroppedAt())
result := &datapb.CompactionPlanResult{
Segments: []*datapb.CompactionSegment{compactToSeg},
}
task := &datapb.CompactionTask{
InputSegments: []UniqueID{1, 2},
Type: datapb.CompactionType_MixCompaction,
}
m := &meta{
catalog: &datacoord.Catalog{MetaKv: NewMetaMemoryKV()},
segments: latestSegments,
chunkManager: mockChMgr,
}

suite.EqualValues(segID, seg.GetID())
suite.ElementsMatch(latestSegments.segments[segID].GetBinlogs(), seg.GetBinlogs())
suite.ElementsMatch(latestSegments.segments[segID].GetStatslogs(), seg.GetStatslogs())
suite.ElementsMatch(latestSegments.segments[segID].GetDeltalogs(), seg.GetDeltalogs())
}
infos, mutation, err := m.CompleteCompactionMutation(task, result)
assert.NoError(suite.T(), err)
suite.Equal(1, len(infos))
info := infos[0]
suite.NoError(err)
suite.NotNil(info)
suite.NotNil(mutation)

// check newSegment
suite.EqualValues(3, info.GetID())
suite.Equal(datapb.SegmentLevel_L1, info.GetLevel())
suite.Equal(commonpb.SegmentState_Flushed, info.GetState())

binlogs := info.GetBinlogs()
for _, fbinlog := range binlogs {
for _, blog := range fbinlog.GetBinlogs() {
suite.Empty(blog.GetLogPath())
suite.EqualValues(50000, blog.GetLogID())
}
}

statslogs := info.GetStatslogs()
for _, fbinlog := range statslogs {
for _, blog := range fbinlog.GetBinlogs() {
suite.Empty(blog.GetLogPath())
suite.EqualValues(50001, blog.GetLogID())
}
}

// check compactFrom segments
for _, segID := range []int64{1, 2} {
seg := m.GetSegment(segID)
suite.Equal(commonpb.SegmentState_Dropped, seg.GetState())
suite.NotEmpty(seg.GetDroppedAt())

// check mutation metrics
suite.Equal(2, len(mutation.stateChange[datapb.SegmentLevel_L1.String()]))
suite.EqualValues(-2, mutation.rowCountChange)
suite.EqualValues(2, mutation.rowCountAccChange)
suite.EqualValues(segID, seg.GetID())
suite.ElementsMatch(latestSegments.segments[segID].GetBinlogs(), seg.GetBinlogs())
suite.ElementsMatch(latestSegments.segments[segID].GetStatslogs(), seg.GetStatslogs())
suite.ElementsMatch(latestSegments.segments[segID].GetDeltalogs(), seg.GetDeltalogs())
}

// check mutation metrics
suite.EqualValues(2, len(mutation.stateChange[datapb.SegmentLevel_L1.String()]))
suite.EqualValues(-2, mutation.rowCountChange)
suite.EqualValues(2, mutation.rowCountAccChange)
flushedCount := mutation.stateChange[datapb.SegmentLevel_L1.String()][commonpb.SegmentState_Flushed.String()][getSortStatus(false)]
suite.EqualValues(-1, flushedCount)

droppedCount := mutation.stateChange[datapb.SegmentLevel_L1.String()][commonpb.SegmentState_Dropped.String()][getSortStatus(false)]
suite.EqualValues(2, droppedCount)
})
}

func (suite *MetaBasicSuite) TestSetSegment() {
Expand Down

0 comments on commit 631125b

Please sign in to comment.