enhance: try compact small segments first if they may compose a full segment (#37709) #38203

Closed · wants to merge 1 commit
188 changes: 43 additions & 145 deletions internal/datacoord/compaction_trigger.go
@@ -19,7 +19,7 @@ package datacoord
 import (
 	"context"
 	"fmt"
-	"sort"
+	"math"
 	"sync"
 	"time"

@@ -283,23 +283,6 @@ func (t *compactionTrigger) allocSignalID() (UniqueID, error) {
 	return t.allocator.AllocID(ctx)
 }
 
-func (t *compactionTrigger) getExpectedSegmentSize(collectionID int64) int64 {
-	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
-	defer cancel()
-	collMeta, err := t.handler.GetCollection(ctx, collectionID)
-	if err != nil {
-		log.Warn("failed to get collection", zap.Int64("collectionID", collectionID), zap.Error(err))
-		return Params.DataCoordCfg.SegmentMaxSize.GetAsInt64() * 1024 * 1024
-	}
-	allDiskIndex := t.meta.indexMeta.AreAllDiskIndex(collectionID, collMeta.Schema)
-	if allDiskIndex {
-		// Only if all vector fields index type are DiskANN, recalc segment max size here.
-		return Params.DataCoordCfg.DiskSegmentMaxSize.GetAsInt64() * 1024 * 1024
-	}
-	// If some vector fields index type are not DiskANN, recalc segment max size using default policy.
-	return Params.DataCoordCfg.SegmentMaxSize.GetAsInt64() * 1024 * 1024
-}
-
 func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) error {
 	t.forceMu.Lock()
 	defer t.forceMu.Unlock()
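For reference, the helper deleted above resolved the compaction target size in bytes from MB-denominated config values, allowing a larger cap when every vector field of the collection is indexed with DiskANN. A minimal standalone sketch of that rule (the function name, parameters, and the 1024/2048 example values are illustrative, not confirmed Milvus defaults):

// expectedSegmentSize restates the deleted helper as a pure function.
// allDiskANN stands in for indexMeta.AreAllDiskIndex; inputs are in MB.
func expectedSegmentSize(allDiskANN bool, segmentMaxSizeMB, diskSegmentMaxSizeMB int64) int64 {
	if allDiskANN {
		// Only when all vector fields use DiskANN does the larger cap apply.
		return diskSegmentMaxSizeMB * 1024 * 1024
	}
	return segmentMaxSizeMB * 1024 * 1024
}

// e.g. expectedSegmentSize(false, 1024, 2048) returns 1 GiB in bytes;
// the DiskANN branch would return 2 GiB for the same (illustrative) inputs.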
@@ -548,107 +531,57 @@ func (t *compactionTrigger) generatePlans(segments []*SegmentInfo, signal *compa
 	}
 
 	buckets := [][]*SegmentInfo{}
-	// sort segment from large to small
-	sort.Slice(prioritizedCandidates, func(i, j int) bool {
-		if prioritizedCandidates[i].getSegmentSize() != prioritizedCandidates[j].getSegmentSize() {
-			return prioritizedCandidates[i].getSegmentSize() > prioritizedCandidates[j].getSegmentSize()
-		}
-		return prioritizedCandidates[i].GetID() < prioritizedCandidates[j].GetID()
-	})
-
-	sort.Slice(smallCandidates, func(i, j int) bool {
-		if smallCandidates[i].getSegmentSize() != smallCandidates[j].getSegmentSize() {
-			return smallCandidates[i].getSegmentSize() > smallCandidates[j].getSegmentSize()
-		}
-		return smallCandidates[i].GetID() < smallCandidates[j].GetID()
-	})
-
-	// Sort non-planned from small to large.
-	sort.Slice(nonPlannedSegments, func(i, j int) bool {
-		if nonPlannedSegments[i].getSegmentSize() != nonPlannedSegments[j].getSegmentSize() {
-			return nonPlannedSegments[i].getSegmentSize() < nonPlannedSegments[j].getSegmentSize()
-		}
-		return nonPlannedSegments[i].GetID() > nonPlannedSegments[j].GetID()
-	})
-
-	// greedy pick from large segment to small, the goal is to fill each segment to reach 512M
+	toUpdate := newSegmentPacker("update", prioritizedCandidates)
+	toMerge := newSegmentPacker("merge", smallCandidates)
+	toPack := newSegmentPacker("pack", nonPlannedSegments)
+
+	maxSegs := Params.DataCoordCfg.MaxSegmentToMerge.GetAsInt64()
+	minSegs := Params.DataCoordCfg.MinSegmentToMerge.GetAsInt64()
+	compactableProportion := Params.DataCoordCfg.SegmentCompactableProportion.GetAsFloat()
+	satisfiedSize := int64(float64(expectedSize) * compactableProportion)
+	expantionRate := Params.DataCoordCfg.SegmentExpansionRate.GetAsFloat()
+	maxLeftSize := expectedSize - satisfiedSize
+	expectedExpandedSize := int64(float64(expectedSize) * expantionRate)
+	maxExpandedLeftSize := expectedExpandedSize - satisfiedSize
+	// 1. Merge small segments if they can make a full bucket
+	for {
+		pack, _ := toMerge.pack(expectedSize, maxLeftSize, minSegs, maxSegs)
+		if len(pack) == 0 {
+			break
+		}
+		buckets = append(buckets, pack)
+	}
+	// 2. greedy pick from large segments to small; the goal is to fill each segment to reach 512M
 	// we must ensure all prioritized candidates are in a plan
 	// TODO: the compaction selection policy should consider whether the compaction workload is high
-	for len(prioritizedCandidates) > 0 {
-		var bucket []*SegmentInfo
-		// pop out the first element
-		segment := prioritizedCandidates[0]
-		bucket = append(bucket, segment)
-		prioritizedCandidates = prioritizedCandidates[1:]
-
-		// only do single-file compaction if the segment is already large enough
-		if segment.getSegmentSize() < expectedSize {
-			var result []*SegmentInfo
-			free := expectedSize - segment.getSegmentSize()
-			maxNum := Params.DataCoordCfg.MaxSegmentToMerge.GetAsInt() - 1
-			prioritizedCandidates, result, free = greedySelect(prioritizedCandidates, free, maxNum)
-			bucket = append(bucket, result...)
-			maxNum -= len(result)
-			if maxNum > 0 {
-				smallCandidates, result, _ = greedySelect(smallCandidates, free, maxNum)
-				bucket = append(bucket, result...)
-			}
-		}
-		// since this is priority compaction, we will execute even if there is only one segment
-		log.Info("pick priority candidate for compaction",
-			zap.Int64("prioritized segmentID", segment.GetID()),
-			zap.Int64s("picked segmentIDs", lo.Map(bucket, func(s *SegmentInfo, _ int) int64 { return s.GetID() })),
-			zap.Int64("target size", lo.SumBy(bucket, func(s *SegmentInfo) int64 { return s.getSegmentSize() })),
-			zap.Int64("target count", lo.SumBy(bucket, func(s *SegmentInfo) int64 { return s.GetNumOfRows() })),
-		)
-		buckets = append(buckets, bucket)
-	}
-
-	var remainingSmallSegs []*SegmentInfo
-	// check if there are small candidates left that can be merged into large segments
-	for len(smallCandidates) > 0 {
-		var bucket []*SegmentInfo
-		// pop out the first element
-		segment := smallCandidates[0]
-		bucket = append(bucket, segment)
-		smallCandidates = smallCandidates[1:]
-
-		var result []*SegmentInfo
-		free := expectedSize - segment.getSegmentSize()
-		// for small segment merge, we pick the largest segment and merge as many small segments with it as possible
-		// Why reverse? try to merge as many segments as expected.
-		// for instance, if 255M and 255M are the largest small candidates, they will never be merged because of the MinSegmentToMerge limit.
-		smallCandidates, result, _ = reverseGreedySelect(smallCandidates, free, Params.DataCoordCfg.MaxSegmentToMerge.GetAsInt()-1)
-		bucket = append(bucket, result...)
-
-		// only merge if the candidate count reaches MinSegmentToMerge or the target size is large enough
-		targetSize := lo.SumBy(bucket, func(s *SegmentInfo) int64 { return s.getSegmentSize() })
-		if len(bucket) >= Params.DataCoordCfg.MinSegmentToMerge.GetAsInt() ||
-			len(bucket) > 1 && t.isCompactableSegment(targetSize, expectedSize) {
-			buckets = append(buckets, bucket)
-		} else {
-			remainingSmallSegs = append(remainingSmallSegs, bucket...)
-		}
-	}
-
-	remainingSmallSegs = t.squeezeSmallSegmentsToBuckets(remainingSmallSegs, buckets, expectedSize)
-
-	// If there are still remaining small segments, try adding them to non-planned segments.
-	for _, npSeg := range nonPlannedSegments {
-		bucket := []*SegmentInfo{npSeg}
-		targetSize := npSeg.getSegmentSize()
-		for i := len(remainingSmallSegs) - 1; i >= 0; i-- {
-			// Note: could also simply use MaxRowNum as limit.
-			if targetSize+remainingSmallSegs[i].getSegmentSize() <=
-				int64(Params.DataCoordCfg.SegmentExpansionRate.GetAsFloat()*float64(expectedSize)) {
-				bucket = append(bucket, remainingSmallSegs[i])
-				targetSize += remainingSmallSegs[i].getSegmentSize()
-				remainingSmallSegs = append(remainingSmallSegs[:i], remainingSmallSegs[i+1:]...)
-			}
-		}
-		if len(bucket) > 1 {
-			buckets = append(buckets, bucket)
-		}
-	}
+	for {
+		// No limit on the remaining size because we want to pack all prioritized candidates
+		pack, _ := toUpdate.packWith(expectedSize, math.MaxInt64, 0, maxSegs, toMerge)
+		if len(pack) == 0 {
+			break
+		}
+		buckets = append(buckets, pack)
+	}
+	// 2.+ legacy: squeeze small segments
+	// Try to merge all small segments, and then squeeze
+	for {
+		pack, _ := toMerge.pack(expectedSize, math.MaxInt64, minSegs, maxSegs)
+		if len(pack) == 0 {
+			break
+		}
+		buckets = append(buckets, pack)
+	}
+	remaining := t.squeezeSmallSegmentsToBuckets(toMerge.candidates, buckets, expectedSize)
+	toMerge = newSegmentPacker("merge", remaining)
+
+	// 3. pack remaining small segments with non-planned segments
+	for {
+		pack, _ := toMerge.packWith(expectedExpandedSize, maxExpandedLeftSize, minSegs, maxSegs, toPack)
+		if len(pack) == 0 {
+			break
+		}
+		buckets = append(buckets, pack)
+	}
 
 	tasks := make([]*typeutil.Pair[int64, []int64], len(buckets))
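The rewritten flow above delegates all grouping to a segmentPacker, which this PR adds in another file (not shown in this diff). Reading the call sites: step 1 packs small segments into a bucket only when they nearly fill a segment — with expectedSize = 512 MB and compactableProportion = 0.85 (illustrative values), satisfiedSize ≈ 435 MB, so at most maxLeftSize ≈ 77 MB may stay unfilled; step 2 passes math.MaxInt64 and a minimum of 0 so every prioritized segment ends up in some plan; step 3 lets a bucket grow to expectedExpandedSize = expantionRate × expectedSize (640 MB at a rate of 1.25) before mixing leftovers with non-planned segments. packWith appears to behave like pack while also drawing candidates from the extra packer passed last (toMerge in step 2, toPack in step 3). A hedged sketch of the packing contract these call sites imply — the real implementation may differ:

package main

import (
	"fmt"
	"sort"
)

// sizedSeg is a stand-in for *SegmentInfo; only its size matters here.
type sizedSeg struct {
	id   int64
	size int64
}

// packer is an illustrative stand-in for the PR's segmentPacker.
type packer struct {
	name       string
	candidates []sizedSeg
}

// pack greedily fills one bucket largest-first up to targetSize and succeeds
// only if at least minSegs candidates were picked and the unfilled remainder
// (targetSize - total) is at most maxLeftSize; picked segments are consumed.
func (p *packer) pack(targetSize, maxLeftSize, minSegs, maxSegs int64) []sizedSeg {
	sort.Slice(p.candidates, func(i, j int) bool { return p.candidates[i].size > p.candidates[j].size })
	var picked, rest []sizedSeg
	var total int64
	for _, c := range p.candidates {
		if int64(len(picked)) < maxSegs && total+c.size <= targetSize {
			picked = append(picked, c)
			total += c.size
		} else {
			rest = append(rest, c)
		}
	}
	if int64(len(picked)) < minSegs || targetSize-total > maxLeftSize {
		return nil // bucket not full enough; leave candidates untouched
	}
	p.candidates = rest
	return picked
}

func main() {
	p := &packer{name: "merge", candidates: []sizedSeg{{1, 200}, {2, 180}, {3, 100}, {4, 20}}}
	// targetSize 512, maxLeftSize 77 (512 - 435): 200+180+100+20 = 500 fits,
	// leftover 12 <= 77 and count 4 >= 3, so one full bucket is produced.
	fmt.Println(p.pack(512, 77, 3, 30))
}

A second call on the drained packer returns nil, which is what terminates the for { ... break } loops above.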
@@ -666,37 +599,6 @@ func (t *compactionTrigger) generatePlans(segments []*SegmentInfo, signal *compa
 	return tasks
 }
 
-func greedySelect(candidates []*SegmentInfo, free int64, maxSegment int) ([]*SegmentInfo, []*SegmentInfo, int64) {
-	var result []*SegmentInfo
-
-	for i := 0; i < len(candidates); {
-		candidate := candidates[i]
-		if len(result) < maxSegment && candidate.getSegmentSize() < free {
-			result = append(result, candidate)
-			free -= candidate.getSegmentSize()
-			candidates = append(candidates[:i], candidates[i+1:]...)
-		} else {
-			i++
-		}
-	}
-
-	return candidates, result, free
-}
-
-func reverseGreedySelect(candidates []*SegmentInfo, free int64, maxSegment int) ([]*SegmentInfo, []*SegmentInfo, int64) {
-	var result []*SegmentInfo
-
-	for i := len(candidates) - 1; i >= 0; i-- {
-		candidate := candidates[i]
-		if (len(result) < maxSegment) && (candidate.getSegmentSize() < free) {
-			result = append(result, candidate)
-			free -= candidate.getSegmentSize()
-			candidates = append(candidates[:i], candidates[i+1:]...)
-		}
-	}
-	return candidates, result, free
-}
-
 func (t *compactionTrigger) getCandidateSegments(channel string, partitionID UniqueID) []*SegmentInfo {
 	segments := t.meta.GetSegmentsByChannel(channel)
 	if Params.DataCoordCfg.IndexBasedCompaction.GetAsBool() {
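The two helpers removed above differ only in pick direction, and the old "Why reverse?" comment is about bucket cardinality: walking from the small end of a large-to-small sorted list fits more segments into the same free budget, which helps a bucket reach MinSegmentToMerge. A toy comparison on bare sizes (the maxSegment cap is omitted for brevity; numbers are illustrative):

// pickLargestFirst mimics the removed greedySelect on plain sizes,
// scanning a large-to-small list front to back with the same strict '<' check.
func pickLargestFirst(sizes []int64, free int64) []int64 {
	var out []int64
	for _, s := range sizes {
		if s < free {
			out = append(out, s)
			free -= s
		}
	}
	return out
}

// pickSmallestFirst mimics the removed reverseGreedySelect: same budget,
// but consuming the list from its small end.
func pickSmallestFirst(sizes []int64, free int64) []int64 {
	var out []int64
	for i := len(sizes) - 1; i >= 0; i-- {
		if sizes[i] < free {
			out = append(out, sizes[i])
			free -= sizes[i]
		}
	}
	return out
}

// With sizes = [40, 35, 30, 10, 10, 5] and free = 60:
//   pickLargestFirst  -> [40, 10, 5]      (3 segments)
//   pickSmallestFirst -> [5, 10, 10, 30]  (4 segments)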
@@ -743,10 +645,6 @@ func isExpandableSmallSegment(segment *SegmentInfo, expectedSize int64) bool {

 func isDeltalogTooManySegment(segment *SegmentInfo) bool {
 	deltaLogCount := GetBinlogCount(segment.GetDeltalogs())
-	log.Debug("isDeltalogTooManySegment",
-		zap.Int64("collectionID", segment.CollectionID),
-		zap.Int64("segmentID", segment.ID),
-		zap.Int("deltaLogCount", deltaLogCount))
 	return deltaLogCount > Params.DataCoordCfg.SingleCompactionDeltalogMaxNum.GetAsInt()
 }
