Skip to content

Commit

Permalink
Alter compactTo segments before compactFrom to avoid data loss if crash
Browse files Browse the repository at this point in the history
Signed-off-by: wayblink <[email protected]>
  • Loading branch information
wayblink committed Jul 9, 2024
1 parent 737bd7c commit 5f95aae
Showing 1 changed file with 73 additions and 73 deletions.
146 changes: 73 additions & 73 deletions internal/datacoord/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -790,6 +790,7 @@ func UpdateSegmentPartitionStatsVersionOperator(segmentID int64, version int64)
}
segment.LastPartitionStatsVersion = segment.PartitionStatsVersion
segment.PartitionStatsVersion = version
log.Debug("update segment version", zap.Int64("segmentID", segmentID), zap.Int64("PartitionStatsVersion", version), zap.Int64("LastPartitionStatsVersion", segment.LastPartitionStatsVersion))
return true
}
}
Expand All @@ -803,6 +804,7 @@ func RevertSegmentLevelOperator(segmentID int64) UpdateOperator {
return false
}
segment.Level = segment.LastLevel
log.Debug("revert segment level", zap.Int64("segmentID", segmentID), zap.String("LastLevel", segment.LastLevel.String()))

Check warning on line 807 in internal/datacoord/meta.go

View check run for this annotation

Codecov / codecov/patch

internal/datacoord/meta.go#L807

Added line #L807 was not covered by tests
return true
}
}
Expand All @@ -816,6 +818,7 @@ func RevertSegmentPartitionStatsVersionOperator(segmentID int64) UpdateOperator
return false
}
segment.PartitionStatsVersion = segment.LastPartitionStatsVersion
log.Debug("revert segment partition stats version", zap.Int64("segmentID", segmentID), zap.Int64("LastPartitionStatsVersion", segment.LastPartitionStatsVersion))

Check warning on line 821 in internal/datacoord/meta.go

View check run for this annotation

Codecov / codecov/patch

internal/datacoord/meta.go#L821

Added line #L821 was not covered by tests
return true
}
}
Expand Down Expand Up @@ -1351,12 +1354,33 @@ func (m *meta) SetSegmentLevel(segmentID UniqueID, level datapb.SegmentLevel) {
m.segments.SetLevel(segmentID, level)
}

// getMinPosition returns the start position with the smallest timestamp
// among the given segments. It returns nil when segments is empty or when
// every segment's start position is nil.
//
// NOTE(review): call sites also use this result as a DmlPosition, yet only
// GetStartPosition is inspected here — confirm that is intentional.
func getMinPosition(segments []*SegmentInfo) *msgpb.MsgPosition {
	var earliest *msgpb.MsgPosition
	for _, seg := range segments {
		pos := seg.GetStartPosition()
		if pos == nil {
			continue
		}
		if earliest == nil || pos.GetTimestamp() < earliest.GetTimestamp() {
			earliest = pos
		}
	}
	return earliest
}

func (m *meta) completeClusterCompactionMutation(t *datapb.CompactionTask, result *datapb.CompactionPlanResult) ([]*SegmentInfo, *segMetricMutation, error) {
log := log.With(zap.Int64("planID", t.GetPlanID()), zap.String("type", t.GetType().String()))
log := log.With(zap.Int64("planID", t.GetPlanID()),
zap.String("type", t.GetType().String()),
zap.Int64("collectionID", t.CollectionID),
zap.Int64("partitionID", t.PartitionID),
zap.String("channel", t.GetChannel()))

metricMutation := &segMetricMutation{stateChange: make(map[string]map[string]int)}
var compactFromSegIDs []int64
var latestCompactFromSegments []*SegmentInfo
compactFromSegIDs := make([]int64, 0)
compactToSegIDs := make([]int64, 0)
compactFromSegInfos := make([]*SegmentInfo, 0)
compactToSegInfos := make([]*SegmentInfo, 0)

for _, segmentID := range t.GetInputSegments() {
segment := m.segments.GetSegment(segmentID)
if segment == nil {
Expand All @@ -1367,92 +1391,81 @@ func (m *meta) completeClusterCompactionMutation(t *datapb.CompactionTask, resul
cloned.DroppedAt = uint64(time.Now().UnixNano())
cloned.Compacted = true

latestCompactFromSegments = append(latestCompactFromSegments, cloned)
compactFromSegInfos = append(compactFromSegInfos, cloned)
compactFromSegIDs = append(compactFromSegIDs, cloned.GetID())

// metrics mutation for compaction from segments
updateSegStateAndPrepareMetrics(cloned, commonpb.SegmentState_Dropped, metricMutation)
}

getMinPosition := func(positions []*msgpb.MsgPosition) *msgpb.MsgPosition {
var minPos *msgpb.MsgPosition
for _, pos := range positions {
if minPos == nil ||
pos != nil && pos.GetTimestamp() < minPos.GetTimestamp() {
minPos = pos
}
}
return minPos
}
newSegments := make([]*SegmentInfo, 0)
for _, seg := range result.GetSegments() {
segmentInfo := &datapb.SegmentInfo{
ID: seg.GetSegmentID(),
CollectionID: latestCompactFromSegments[0].CollectionID,
PartitionID: latestCompactFromSegments[0].PartitionID,
CollectionID: t.CollectionID,
PartitionID: t.PartitionID,
InsertChannel: t.GetChannel(),
NumOfRows: seg.NumOfRows,
State: commonpb.SegmentState_Flushed,
MaxRowNum: latestCompactFromSegments[0].MaxRowNum,
MaxRowNum: compactFromSegInfos[0].MaxRowNum,
Binlogs: seg.GetInsertLogs(),
Statslogs: seg.GetField2StatslogPaths(),
CreatedByCompaction: true,
CompactionFrom: compactFromSegIDs,
LastExpireTime: tsoutil.ComposeTSByTime(time.Unix(t.GetStartTime(), 0), 0),
Level: datapb.SegmentLevel_L2,
StartPosition: getMinPosition(lo.Map(latestCompactFromSegments, func(info *SegmentInfo, _ int) *msgpb.MsgPosition {
return info.GetStartPosition()
})),
DmlPosition: getMinPosition(lo.Map(latestCompactFromSegments, func(info *SegmentInfo, _ int) *msgpb.MsgPosition {
return info.GetDmlPosition()
})),
StartPosition: getMinPosition(compactFromSegInfos),
DmlPosition: getMinPosition(compactFromSegInfos),
}
segment := NewSegmentInfo(segmentInfo)
newSegments = append(newSegments, segment)
compactToSegInfos = append(compactToSegInfos, segment)
compactToSegIDs = append(compactFromSegIDs, segment.GetID())
metricMutation.addNewSeg(segment.GetState(), segment.GetLevel(), segment.GetNumOfRows())
}
compactionTo := make([]UniqueID, 0, len(newSegments))
for _, s := range newSegments {
compactionTo = append(compactionTo, s.GetID())
}

log.Info("meta update: prepare for complete compaction mutation - complete",
zap.Int64("collectionID", latestCompactFromSegments[0].CollectionID),
zap.Int64("partitionID", latestCompactFromSegments[0].PartitionID),
zap.Any("compacted from", compactFromSegIDs),
zap.Any("compacted to", compactionTo))
log = log.With(zap.Int64s("compact from", compactFromSegIDs), zap.Int64s("compact to", compactToSegIDs))
log.Debug("meta update: prepare for meta mutation - complete")

compactFromInfos := lo.Map(latestCompactFromSegments, func(info *SegmentInfo, _ int) *datapb.SegmentInfo {
compactFromInfos := lo.Map(compactFromSegInfos, func(info *SegmentInfo, _ int) *datapb.SegmentInfo {
return info.SegmentInfo
})

newSegmentInfos := lo.Map(newSegments, func(info *SegmentInfo, _ int) *datapb.SegmentInfo {
compactToInfos := lo.Map(compactToSegInfos, func(info *SegmentInfo, _ int) *datapb.SegmentInfo {
return info.SegmentInfo
})

binlogs := make([]metastore.BinlogsIncrement, 0)
for _, seg := range newSegmentInfos {
for _, seg := range compactToInfos {
binlogs = append(binlogs, metastore.BinlogsIncrement{Segment: seg})
}
if err := m.catalog.AlterSegments(m.ctx, append(compactFromInfos, newSegmentInfos...), binlogs...); err != nil {
log.Warn("fail to alter segments and new segment", zap.Error(err))
// alter compactTo before compactFrom segments to avoid data loss if the service crashes during AlterSegments
if err := m.catalog.AlterSegments(m.ctx, compactToInfos, binlogs...); err != nil {
log.Warn("fail to alter compactTo segments", zap.Error(err))
return nil, nil, err

Check warning on line 1443 in internal/datacoord/meta.go

View check run for this annotation

Codecov / codecov/patch

internal/datacoord/meta.go#L1442-L1443

Added lines #L1442 - L1443 were not covered by tests
}
if err := m.catalog.AlterSegments(m.ctx, compactFromInfos); err != nil {
log.Warn("fail to alter compactFrom segments", zap.Error(err))

Check warning on line 1446 in internal/datacoord/meta.go

View check run for this annotation

Codecov / codecov/patch

internal/datacoord/meta.go#L1446

Added line #L1446 was not covered by tests
return nil, nil, err
}
lo.ForEach(latestCompactFromSegments, func(info *SegmentInfo, _ int) {
lo.ForEach(compactFromSegInfos, func(info *SegmentInfo, _ int) {
m.segments.SetSegment(info.GetID(), info)
})
lo.ForEach(newSegments, func(info *SegmentInfo, _ int) {
lo.ForEach(compactToSegInfos, func(info *SegmentInfo, _ int) {
m.segments.SetSegment(info.GetID(), info)
})
return newSegments, metricMutation, nil
log.Info("meta update: alter in memory meta after compaction - complete")
return compactToSegInfos, metricMutation, nil
}

func (m *meta) completeMixCompactionMutation(t *datapb.CompactionTask, result *datapb.CompactionPlanResult) ([]*SegmentInfo, *segMetricMutation, error) {
log := log.With(zap.Int64("planID", t.GetPlanID()), zap.String("type", t.GetType().String()))
log := log.With(zap.Int64("planID", t.GetPlanID()),
zap.String("type", t.GetType().String()),
zap.Int64("collectionID", t.CollectionID),
zap.Int64("partitionID", t.PartitionID),
zap.String("channel", t.GetChannel()))

metricMutation := &segMetricMutation{stateChange: make(map[string]map[string]int)}
var compactFromSegIDs []int64
var latestCompactFromSegments []*SegmentInfo
var compactFromSegInfos []*SegmentInfo
for _, segmentID := range t.GetInputSegments() {
segment := m.segments.GetSegment(segmentID)
if segment == nil {
Expand All @@ -1463,36 +1476,25 @@ func (m *meta) completeMixCompactionMutation(t *datapb.CompactionTask, result *d
cloned.DroppedAt = uint64(time.Now().UnixNano())
cloned.Compacted = true

latestCompactFromSegments = append(latestCompactFromSegments, cloned)
compactFromSegInfos = append(compactFromSegInfos, cloned)
compactFromSegIDs = append(compactFromSegIDs, cloned.GetID())

// metrics mutation for compaction from segments
updateSegStateAndPrepareMetrics(cloned, commonpb.SegmentState_Dropped, metricMutation)
}

getMinPosition := func(positions []*msgpb.MsgPosition) *msgpb.MsgPosition {
var minPos *msgpb.MsgPosition
for _, pos := range positions {
if minPos == nil ||
pos != nil && pos.GetTimestamp() < minPos.GetTimestamp() {
minPos = pos
}
}
return minPos
}

// MixCompaction / MergeCompaction generates one and only one segment
compactToSegment := result.GetSegments()[0]

compactToSegmentInfo := NewSegmentInfo(
&datapb.SegmentInfo{
ID: compactToSegment.GetSegmentID(),
CollectionID: latestCompactFromSegments[0].CollectionID,
PartitionID: latestCompactFromSegments[0].PartitionID,
CollectionID: t.CollectionID,
PartitionID: t.PartitionID,
InsertChannel: t.GetChannel(),
NumOfRows: compactToSegment.NumOfRows,
State: commonpb.SegmentState_Flushed,
MaxRowNum: latestCompactFromSegments[0].MaxRowNum,
MaxRowNum: compactFromSegInfos[0].MaxRowNum,
Binlogs: compactToSegment.GetInsertLogs(),
Statslogs: compactToSegment.GetField2StatslogPaths(),
Deltalogs: compactToSegment.GetDeltalogs(),
Expand All @@ -1502,12 +1504,8 @@ func (m *meta) completeMixCompactionMutation(t *datapb.CompactionTask, result *d
LastExpireTime: tsoutil.ComposeTSByTime(time.Unix(t.GetStartTime(), 0), 0),
Level: datapb.SegmentLevel_L1,

StartPosition: getMinPosition(lo.Map(latestCompactFromSegments, func(info *SegmentInfo, _ int) *msgpb.MsgPosition {
return info.GetStartPosition()
})),
DmlPosition: getMinPosition(lo.Map(latestCompactFromSegments, func(info *SegmentInfo, _ int) *msgpb.MsgPosition {
return info.GetDmlPosition()
})),
StartPosition: getMinPosition(compactFromSegInfos),
DmlPosition: getMinPosition(compactFromSegInfos),
})

// L1 segment with NumRows=0 will be discarded, so no need to change the metric
Expand All @@ -1519,15 +1517,13 @@ func (m *meta) completeMixCompactionMutation(t *datapb.CompactionTask, result *d
}

log = log.With(
zap.String("channel", t.GetChannel()),
zap.Int64("partitionID", compactToSegmentInfo.GetPartitionID()),
zap.Int64("compactTo segmentID", compactToSegmentInfo.GetID()),
zap.Int64s("compactFrom", compactFromSegIDs),
zap.Int64("compactTo", compactToSegmentInfo.GetID()),
zap.Int64("compactTo segment numRows", compactToSegmentInfo.GetNumOfRows()),
zap.Any("compactFrom segments(to be updated as dropped)", compactFromSegIDs),
)

log.Debug("meta update: prepare for meta mutation - complete")
compactFromInfos := lo.Map(latestCompactFromSegments, func(info *SegmentInfo, _ int) *datapb.SegmentInfo {
compactFromInfos := lo.Map(compactFromSegInfos, func(info *SegmentInfo, _ int) *datapb.SegmentInfo {
return info.SegmentInfo
})

Expand All @@ -1536,14 +1532,18 @@ func (m *meta) completeMixCompactionMutation(t *datapb.CompactionTask, result *d
zap.Int("statslog count", len(compactToSegmentInfo.GetStatslogs())),
zap.Int("deltalog count", len(compactToSegmentInfo.GetDeltalogs())),
)
if err := m.catalog.AlterSegments(m.ctx, append(compactFromInfos, compactToSegmentInfo.SegmentInfo),
if err := m.catalog.AlterSegments(m.ctx, []*datapb.SegmentInfo{compactToSegmentInfo.SegmentInfo},
metastore.BinlogsIncrement{Segment: compactToSegmentInfo.SegmentInfo},
); err != nil {
log.Warn("fail to alter segments and new segment", zap.Error(err))
log.Warn("fail to alter compactTo segments", zap.Error(err))
return nil, nil, err

Check warning on line 1539 in internal/datacoord/meta.go

View check run for this annotation

Codecov / codecov/patch

internal/datacoord/meta.go#L1538-L1539

Added lines #L1538 - L1539 were not covered by tests
}
if err := m.catalog.AlterSegments(m.ctx, compactFromInfos); err != nil {
log.Warn("fail to alter compactFrom segments", zap.Error(err))

Check warning on line 1542 in internal/datacoord/meta.go

View check run for this annotation

Codecov / codecov/patch

internal/datacoord/meta.go#L1542

Added line #L1542 was not covered by tests
return nil, nil, err
}

lo.ForEach(latestCompactFromSegments, func(info *SegmentInfo, _ int) {
lo.ForEach(compactFromSegInfos, func(info *SegmentInfo, _ int) {
m.segments.SetSegment(info.GetID(), info)
})
m.segments.SetSegment(compactToSegmentInfo.GetID(), compactToSegmentInfo)
Expand Down

0 comments on commit 5f95aae

Please sign in to comment.