Skip to content

Commit

Permalink
enhance: Alter compactTo segments before compactFrom to avoid data lo…
Browse files Browse the repository at this point in the history
…ss if crash (#34513)

#34512

Signed-off-by: wayblink <[email protected]>
  • Loading branch information
wayblink authored Jul 11, 2024
1 parent a08a0c8 commit 358e9a1
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 82 deletions.
21 changes: 4 additions & 17 deletions internal/datacoord/compaction_trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,18 +388,13 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) error {
}

if !signal.isForce && !t.isCollectionAutoCompactionEnabled(coll) {
log.RatedInfo(20, "collection auto compaction disabled",
zap.Int64("collectionID", group.collectionID),
)
log.RatedInfo(20, "collection auto compaction disabled")
return nil
}

ct, err := getCompactTime(tsoutil.ComposeTSByTime(time.Now(), 0), coll)
if err != nil {
log.Warn("get compact time failed, skip to handle compaction",
zap.Int64("collectionID", group.collectionID),
zap.Int64("partitionID", group.partitionID),
zap.String("channel", group.channelName))
log.Warn("get compact time failed, skip to handle compaction")
return err
}

Expand All @@ -412,9 +407,7 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) error {
totalRows := plan.A
segIDs := plan.B
if !signal.isForce && t.compactionHandler.isFull() {
log.Warn("compaction plan skipped due to handler full",
zap.Int64("collectionID", signal.collectionID),
zap.Int64s("segmentIDs", segIDs))
log.Warn("compaction plan skipped due to handler full", zap.Int64s("segmentIDs", segIDs))
break
}
start := time.Now()
Expand All @@ -429,7 +422,7 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) error {
TimeoutInSeconds: Params.DataCoordCfg.CompactionTimeoutInSeconds.GetAsInt32(),
Type: datapb.CompactionType_MixCompaction,
CollectionTtl: ct.collectionTTL.Nanoseconds(),
CollectionID: signal.collectionID,
CollectionID: group.collectionID,
PartitionID: group.partitionID,
Channel: group.channelName,
InputSegments: segIDs,
Expand All @@ -439,19 +432,13 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) error {
err := t.compactionHandler.enqueueCompaction(task)
if err != nil {
log.Warn("failed to execute compaction task",
zap.Int64("collectionID", signal.collectionID),
zap.Int64("planID", planID),
zap.Int64s("segmentIDs", segIDs),
zap.Error(err))
continue
}

log.Info("time cost of generating global compaction",
zap.Int64("planID", planID),
zap.Int64("time cost", time.Since(start).Milliseconds()),
zap.Int64("collectionID", signal.collectionID),
zap.String("channel", group.channelName),
zap.Int64("partitionID", group.partitionID),
zap.Int64s("segmentIDs", segIDs))
}
}
Expand Down
135 changes: 70 additions & 65 deletions internal/datacoord/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -789,6 +789,7 @@ func UpdateSegmentPartitionStatsVersionOperator(segmentID int64, version int64)
}
segment.LastPartitionStatsVersion = segment.PartitionStatsVersion
segment.PartitionStatsVersion = version
log.Debug("update segment version", zap.Int64("segmentID", segmentID), zap.Int64("PartitionStatsVersion", version), zap.Int64("LastPartitionStatsVersion", segment.LastPartitionStatsVersion))
return true
}
}
Expand All @@ -802,6 +803,7 @@ func RevertSegmentLevelOperator(segmentID int64) UpdateOperator {
return false
}
segment.Level = segment.LastLevel
log.Debug("revert segment level", zap.Int64("segmentID", segmentID), zap.String("LastLevel", segment.LastLevel.String()))
return true
}
}
Expand All @@ -815,6 +817,7 @@ func RevertSegmentPartitionStatsVersionOperator(segmentID int64) UpdateOperator
return false
}
segment.PartitionStatsVersion = segment.LastPartitionStatsVersion
log.Debug("revert segment partition stats version", zap.Int64("segmentID", segmentID), zap.Int64("LastPartitionStatsVersion", segment.LastPartitionStatsVersion))
return true
}
}
Expand Down Expand Up @@ -1350,12 +1353,30 @@ func (m *meta) SetSegmentLevel(segmentID UniqueID, level datapb.SegmentLevel) {
m.segments.SetLevel(segmentID, level)
}

// getMinPosition returns the entry of positions that has the smallest
// timestamp. Nil entries are ignored. When positions is empty or holds only
// nil entries the result is nil. On equal timestamps the earliest entry in
// slice order is kept.
func getMinPosition(positions []*msgpb.MsgPosition) *msgpb.MsgPosition {
	var earliest *msgpb.MsgPosition
	for _, candidate := range positions {
		// A nil entry can never become the minimum; skip it.
		if candidate == nil {
			continue
		}
		if earliest == nil || candidate.GetTimestamp() < earliest.GetTimestamp() {
			earliest = candidate
		}
	}
	return earliest
}

func (m *meta) completeClusterCompactionMutation(t *datapb.CompactionTask, result *datapb.CompactionPlanResult) ([]*SegmentInfo, *segMetricMutation, error) {
log := log.With(zap.Int64("planID", t.GetPlanID()), zap.String("type", t.GetType().String()))
log := log.With(zap.Int64("planID", t.GetPlanID()),
zap.String("type", t.GetType().String()),
zap.Int64("collectionID", t.CollectionID),
zap.Int64("partitionID", t.PartitionID),
zap.String("channel", t.GetChannel()))

metricMutation := &segMetricMutation{stateChange: make(map[string]map[string]int)}
var compactFromSegIDs []int64
var latestCompactFromSegments []*SegmentInfo
compactFromSegIDs := make([]int64, 0)
compactToSegIDs := make([]int64, 0)
compactFromSegInfos := make([]*SegmentInfo, 0)
compactToSegInfos := make([]*SegmentInfo, 0)

for _, segmentID := range t.GetInputSegments() {
segment := m.segments.GetSegment(segmentID)
if segment == nil {
Expand All @@ -1366,92 +1387,85 @@ func (m *meta) completeClusterCompactionMutation(t *datapb.CompactionTask, resul
cloned.DroppedAt = uint64(time.Now().UnixNano())
cloned.Compacted = true

latestCompactFromSegments = append(latestCompactFromSegments, cloned)
compactFromSegInfos = append(compactFromSegInfos, cloned)
compactFromSegIDs = append(compactFromSegIDs, cloned.GetID())

// metrics mutation for compaction from segments
updateSegStateAndPrepareMetrics(cloned, commonpb.SegmentState_Dropped, metricMutation)
}

getMinPosition := func(positions []*msgpb.MsgPosition) *msgpb.MsgPosition {
var minPos *msgpb.MsgPosition
for _, pos := range positions {
if minPos == nil ||
pos != nil && pos.GetTimestamp() < minPos.GetTimestamp() {
minPos = pos
}
}
return minPos
}
newSegments := make([]*SegmentInfo, 0)
for _, seg := range result.GetSegments() {
segmentInfo := &datapb.SegmentInfo{
ID: seg.GetSegmentID(),
CollectionID: latestCompactFromSegments[0].CollectionID,
PartitionID: latestCompactFromSegments[0].PartitionID,
CollectionID: compactFromSegInfos[0].CollectionID,
PartitionID: compactFromSegInfos[0].PartitionID,
InsertChannel: t.GetChannel(),
NumOfRows: seg.NumOfRows,
State: commonpb.SegmentState_Flushed,
MaxRowNum: latestCompactFromSegments[0].MaxRowNum,
MaxRowNum: compactFromSegInfos[0].MaxRowNum,
Binlogs: seg.GetInsertLogs(),
Statslogs: seg.GetField2StatslogPaths(),
CreatedByCompaction: true,
CompactionFrom: compactFromSegIDs,
LastExpireTime: tsoutil.ComposeTSByTime(time.Unix(t.GetStartTime(), 0), 0),
Level: datapb.SegmentLevel_L2,
StartPosition: getMinPosition(lo.Map(latestCompactFromSegments, func(info *SegmentInfo, _ int) *msgpb.MsgPosition {
StartPosition: getMinPosition(lo.Map(compactFromSegInfos, func(info *SegmentInfo, _ int) *msgpb.MsgPosition {
return info.GetStartPosition()
})),
DmlPosition: getMinPosition(lo.Map(latestCompactFromSegments, func(info *SegmentInfo, _ int) *msgpb.MsgPosition {
DmlPosition: getMinPosition(lo.Map(compactFromSegInfos, func(info *SegmentInfo, _ int) *msgpb.MsgPosition {
return info.GetDmlPosition()
})),
}
segment := NewSegmentInfo(segmentInfo)
newSegments = append(newSegments, segment)
compactToSegInfos = append(compactToSegInfos, segment)
compactToSegIDs = append(compactToSegIDs, segment.GetID())
metricMutation.addNewSeg(segment.GetState(), segment.GetLevel(), segment.GetNumOfRows())
}
compactionTo := make([]UniqueID, 0, len(newSegments))
for _, s := range newSegments {
compactionTo = append(compactionTo, s.GetID())
}

log.Info("meta update: prepare for complete compaction mutation - complete",
zap.Int64("collectionID", latestCompactFromSegments[0].CollectionID),
zap.Int64("partitionID", latestCompactFromSegments[0].PartitionID),
zap.Any("compacted from", compactFromSegIDs),
zap.Any("compacted to", compactionTo))
log = log.With(zap.Int64s("compact from", compactFromSegIDs), zap.Int64s("compact to", compactToSegIDs))
log.Debug("meta update: prepare for meta mutation - complete")

compactFromInfos := lo.Map(latestCompactFromSegments, func(info *SegmentInfo, _ int) *datapb.SegmentInfo {
compactFromInfos := lo.Map(compactFromSegInfos, func(info *SegmentInfo, _ int) *datapb.SegmentInfo {
return info.SegmentInfo
})

newSegmentInfos := lo.Map(newSegments, func(info *SegmentInfo, _ int) *datapb.SegmentInfo {
compactToInfos := lo.Map(compactToSegInfos, func(info *SegmentInfo, _ int) *datapb.SegmentInfo {
return info.SegmentInfo
})

binlogs := make([]metastore.BinlogsIncrement, 0)
for _, seg := range newSegmentInfos {
for _, seg := range compactToInfos {
binlogs = append(binlogs, metastore.BinlogsIncrement{Segment: seg})
}
if err := m.catalog.AlterSegments(m.ctx, append(compactFromInfos, newSegmentInfos...), binlogs...); err != nil {
log.Warn("fail to alter segments and new segment", zap.Error(err))
// alter compactTo before compactFrom segments to avoid data loss if the service crashes during AlterSegments
if err := m.catalog.AlterSegments(m.ctx, compactToInfos, binlogs...); err != nil {
log.Warn("fail to alter compactTo segments", zap.Error(err))
return nil, nil, err
}
if err := m.catalog.AlterSegments(m.ctx, compactFromInfos); err != nil {
log.Warn("fail to alter compactFrom segments", zap.Error(err))
return nil, nil, err
}
lo.ForEach(latestCompactFromSegments, func(info *SegmentInfo, _ int) {
lo.ForEach(compactFromSegInfos, func(info *SegmentInfo, _ int) {
m.segments.SetSegment(info.GetID(), info)
})
lo.ForEach(newSegments, func(info *SegmentInfo, _ int) {
lo.ForEach(compactToSegInfos, func(info *SegmentInfo, _ int) {
m.segments.SetSegment(info.GetID(), info)
})
return newSegments, metricMutation, nil
log.Info("meta update: alter in memory meta after compaction - complete")
return compactToSegInfos, metricMutation, nil
}

func (m *meta) completeMixCompactionMutation(t *datapb.CompactionTask, result *datapb.CompactionPlanResult) ([]*SegmentInfo, *segMetricMutation, error) {
log := log.With(zap.Int64("planID", t.GetPlanID()), zap.String("type", t.GetType().String()))
log := log.With(zap.Int64("planID", t.GetPlanID()),
zap.String("type", t.GetType().String()),
zap.Int64("collectionID", t.CollectionID),
zap.Int64("partitionID", t.PartitionID),
zap.String("channel", t.GetChannel()))

metricMutation := &segMetricMutation{stateChange: make(map[string]map[string]int)}
var compactFromSegIDs []int64
var latestCompactFromSegments []*SegmentInfo
var compactFromSegInfos []*SegmentInfo
for _, segmentID := range t.GetInputSegments() {
segment := m.segments.GetSegment(segmentID)
if segment == nil {
Expand All @@ -1462,36 +1476,25 @@ func (m *meta) completeMixCompactionMutation(t *datapb.CompactionTask, result *d
cloned.DroppedAt = uint64(time.Now().UnixNano())
cloned.Compacted = true

latestCompactFromSegments = append(latestCompactFromSegments, cloned)
compactFromSegInfos = append(compactFromSegInfos, cloned)
compactFromSegIDs = append(compactFromSegIDs, cloned.GetID())

// metrics mutation for compaction from segments
updateSegStateAndPrepareMetrics(cloned, commonpb.SegmentState_Dropped, metricMutation)
}

getMinPosition := func(positions []*msgpb.MsgPosition) *msgpb.MsgPosition {
var minPos *msgpb.MsgPosition
for _, pos := range positions {
if minPos == nil ||
pos != nil && pos.GetTimestamp() < minPos.GetTimestamp() {
minPos = pos
}
}
return minPos
}

// MixCompaction / MergeCompaction will generate one and only one segment
compactToSegment := result.GetSegments()[0]

compactToSegmentInfo := NewSegmentInfo(
&datapb.SegmentInfo{
ID: compactToSegment.GetSegmentID(),
CollectionID: latestCompactFromSegments[0].CollectionID,
PartitionID: latestCompactFromSegments[0].PartitionID,
CollectionID: compactFromSegInfos[0].CollectionID,
PartitionID: compactFromSegInfos[0].PartitionID,
InsertChannel: t.GetChannel(),
NumOfRows: compactToSegment.NumOfRows,
State: commonpb.SegmentState_Flushed,
MaxRowNum: latestCompactFromSegments[0].MaxRowNum,
MaxRowNum: compactFromSegInfos[0].MaxRowNum,
Binlogs: compactToSegment.GetInsertLogs(),
Statslogs: compactToSegment.GetField2StatslogPaths(),
Deltalogs: compactToSegment.GetDeltalogs(),
Expand All @@ -1501,10 +1504,10 @@ func (m *meta) completeMixCompactionMutation(t *datapb.CompactionTask, result *d
LastExpireTime: tsoutil.ComposeTSByTime(time.Unix(t.GetStartTime(), 0), 0),
Level: datapb.SegmentLevel_L1,

StartPosition: getMinPosition(lo.Map(latestCompactFromSegments, func(info *SegmentInfo, _ int) *msgpb.MsgPosition {
StartPosition: getMinPosition(lo.Map(compactFromSegInfos, func(info *SegmentInfo, _ int) *msgpb.MsgPosition {
return info.GetStartPosition()
})),
DmlPosition: getMinPosition(lo.Map(latestCompactFromSegments, func(info *SegmentInfo, _ int) *msgpb.MsgPosition {
DmlPosition: getMinPosition(lo.Map(compactFromSegInfos, func(info *SegmentInfo, _ int) *msgpb.MsgPosition {
return info.GetDmlPosition()
})),
})
Expand All @@ -1518,15 +1521,13 @@ func (m *meta) completeMixCompactionMutation(t *datapb.CompactionTask, result *d
}

log = log.With(
zap.String("channel", t.GetChannel()),
zap.Int64("partitionID", compactToSegmentInfo.GetPartitionID()),
zap.Int64("compactTo segmentID", compactToSegmentInfo.GetID()),
zap.Int64s("compactFrom", compactFromSegIDs),
zap.Int64("compactTo", compactToSegmentInfo.GetID()),
zap.Int64("compactTo segment numRows", compactToSegmentInfo.GetNumOfRows()),
zap.Any("compactFrom segments(to be updated as dropped)", compactFromSegIDs),
)

log.Debug("meta update: prepare for meta mutation - complete")
compactFromInfos := lo.Map(latestCompactFromSegments, func(info *SegmentInfo, _ int) *datapb.SegmentInfo {
compactFromInfos := lo.Map(compactFromSegInfos, func(info *SegmentInfo, _ int) *datapb.SegmentInfo {
return info.SegmentInfo
})

Expand All @@ -1535,14 +1536,18 @@ func (m *meta) completeMixCompactionMutation(t *datapb.CompactionTask, result *d
zap.Int("statslog count", len(compactToSegmentInfo.GetStatslogs())),
zap.Int("deltalog count", len(compactToSegmentInfo.GetDeltalogs())),
)
if err := m.catalog.AlterSegments(m.ctx, append(compactFromInfos, compactToSegmentInfo.SegmentInfo),
if err := m.catalog.AlterSegments(m.ctx, []*datapb.SegmentInfo{compactToSegmentInfo.SegmentInfo},
metastore.BinlogsIncrement{Segment: compactToSegmentInfo.SegmentInfo},
); err != nil {
log.Warn("fail to alter segments and new segment", zap.Error(err))
log.Warn("fail to alter compactTo segments", zap.Error(err))
return nil, nil, err
}
if err := m.catalog.AlterSegments(m.ctx, compactFromInfos); err != nil {
log.Warn("fail to alter compactFrom segments", zap.Error(err))
return nil, nil, err
}

lo.ForEach(latestCompactFromSegments, func(info *SegmentInfo, _ int) {
lo.ForEach(compactFromSegInfos, func(info *SegmentInfo, _ int) {
m.segments.SetSegment(info.GetID(), info)
})
m.segments.SetSegment(compactToSegmentInfo.GetID(), compactToSegmentInfo)
Expand Down

0 comments on commit 358e9a1

Please sign in to comment.