Skip to content

Commit

Permalink
enhance: improve mix compaction performance by removing max segment limitations (#38344)
Browse files Browse the repository at this point in the history

See #37234

---------

Signed-off-by: Ted Xu <[email protected]>
  • Loading branch information
tedxu authored Dec 11, 2024
1 parent e279ccf commit dc85d8e
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 39 deletions.
1 change: 1 addition & 0 deletions configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,7 @@ dataNode:
levelZeroBatchMemoryRatio: 0.5 # The minimal memory ratio of free memory for level zero compaction executing in batch mode
levelZeroMaxBatchSize: -1 # Max batch size refers to the max number of L1/L2 segments in a batch when executing L0 compaction. Default to -1, any value that is less than 1 means no limit. Valid range: >= 1.
useMergeSort: false # Whether to enable mergeSort mode when performing mixCompaction.
maxSegmentMergeSort: 30 # The maximum number of segments to be merged in mergeSort mode.
gracefulStopTimeout: 1800 # seconds. force stop node without graceful stop
slot:
slotCap: 16 # The maximum number of tasks(e.g. compaction, importing) allowed to run concurrently on a datanode
Expand Down
23 changes: 18 additions & 5 deletions internal/datacoord/compaction_trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,31 +535,34 @@ func (t *compactionTrigger) generatePlans(segments []*SegmentInfo, signal *compa
toMerge := newSegmentPacker("merge", smallCandidates)
toPack := newSegmentPacker("pack", nonPlannedSegments)

maxSegs := Params.DataCoordCfg.MaxSegmentToMerge.GetAsInt64()
maxSegs := int64(4096) // Deprecate the max segment limit since it is irrelevant in simple compactions.
minSegs := Params.DataCoordCfg.MinSegmentToMerge.GetAsInt64()
compactableProportion := Params.DataCoordCfg.SegmentCompactableProportion.GetAsFloat()
satisfiedSize := int64(float64(expectedSize) * compactableProportion)
expantionRate := Params.DataCoordCfg.SegmentExpansionRate.GetAsFloat()
maxLeftSize := expectedSize - satisfiedSize
expectedExpandedSize := int64(float64(expectedSize) * expantionRate)
maxExpandedLeftSize := expectedExpandedSize - satisfiedSize
reasons := make([]string, 0)
// 1. Merge small segments if they can make a full bucket
for {
pack, _ := toMerge.pack(expectedSize, maxLeftSize, minSegs, maxSegs)
pack, left := toMerge.pack(expectedSize, maxLeftSize, minSegs, maxSegs)
if len(pack) == 0 {
break
}
reasons = append(reasons, fmt.Sprintf("merging %d small segments with left size %d", len(pack), left))
buckets = append(buckets, pack)
}
// 2. greedy pick from large segment to small, the goal is to fill each segment to reach 512M
// we must ensure all prioritized candidates is in a plan

// 2. Pack prioritized candidates with small segments
// TODO the compaction selection policy should consider if compaction workload is high
for {
// No limit on the remaining size because we want to pack all prioritized candidates
pack, _ := toUpdate.packWith(expectedSize, math.MaxInt64, 0, maxSegs, toMerge)
if len(pack) == 0 {
break
}
reasons = append(reasons, fmt.Sprintf("packing %d prioritized segments", len(pack)))
buckets = append(buckets, pack)
}

Expand All @@ -570,6 +573,7 @@ func (t *compactionTrigger) generatePlans(segments []*SegmentInfo, signal *compa
if len(pack) == 0 {
break
}
reasons = append(reasons, fmt.Sprintf("packing all %d small segments", len(pack)))
buckets = append(buckets, pack)
}
remaining := t.squeezeSmallSegmentsToBuckets(toMerge.candidates, buckets, expectedSize)
Expand All @@ -581,6 +585,7 @@ func (t *compactionTrigger) generatePlans(segments []*SegmentInfo, signal *compa
if len(pack) == 0 {
break
}
reasons = append(reasons, fmt.Sprintf("packing %d small segments and non-planned segments", len(pack)))
buckets = append(buckets, pack)
}

Expand All @@ -595,7 +600,15 @@ func (t *compactionTrigger) generatePlans(segments []*SegmentInfo, signal *compa
pair := typeutil.NewPair(totalRows, segmentIDs)
tasks[i] = &pair
}
log.Info("generatePlans", zap.Int64("collectionID", signal.collectionID), zap.Int("plan_num", len(tasks)))

if len(tasks) > 0 {
log.Info("generated nontrivial compaction tasks",
zap.Int64("collectionID", signal.collectionID),
zap.Int("prioritizedCandidates", len(prioritizedCandidates)),
zap.Int("smallCandidates", len(smallCandidates)),
zap.Int("nonPlannedSegments", len(nonPlannedSegments)),
zap.Strings("reasons", reasons))
}
return tasks
}

Expand Down
20 changes: 9 additions & 11 deletions internal/datacoord/compaction_trigger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -751,7 +751,8 @@ func Test_compactionTrigger_force_maxSegmentLimit(t *testing.T) {
},
}

for i := UniqueID(0); i < 50; i++ {
nSegments := 50
for i := UniqueID(0); i < UniqueID(nSegments); i++ {
info := &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: i,
Expand Down Expand Up @@ -913,16 +914,13 @@ func Test_compactionTrigger_force_maxSegmentLimit(t *testing.T) {
assert.Equal(t, tt.wantErr, err != nil)
spy := (tt.fields.compactionHandler).(*spyCompactionHandler)

// should be split into two plans
plan := <-spy.spyChan
assert.NotEmpty(t, plan)

// TODO CZS
// assert.Equal(t, len(plan.SegmentBinlogs), 30)
plan = <-spy.spyChan
assert.NotEmpty(t, plan)
// TODO CZS
// assert.Equal(t, len(plan.SegmentBinlogs), 20)
select {
case plan := <-spy.spyChan:
assert.NotEmpty(t, plan)
assert.Equal(t, len(plan.SegmentBinlogs), nSegments)
case <-time.After(2 * time.Second):
assert.Fail(t, "timeout")
}
})
}
}
Expand Down
8 changes: 0 additions & 8 deletions internal/datacoord/knapsack.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,6 @@ import (
"sort"

"github.com/bits-and-blooms/bitset"
"go.uber.org/zap"

"github.com/milvus-io/milvus/pkg/log"
)

type Sizable interface {
Expand Down Expand Up @@ -65,11 +62,6 @@ func (c *Knapsack[T]) tryPack(size, maxLeftSize, minSegs, maxSegs int64) (bitset

nSelections := selection.Count()
if left > maxLeftSize || nSelections < uint(minSegs) {
log.Debug("tryPack failed",
zap.String("name", c.name),
zap.Int64("left", left), zap.Int64("maxLeftSize", maxLeftSize),
zap.Int64("minSegs", minSegs),
zap.Uint("nselections", nSelections))
selection.ClearAll()
left = size
}
Expand Down
20 changes: 13 additions & 7 deletions internal/datanode/compaction/mix_compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,17 +329,23 @@ func (t *mixCompactionTask) Compact() (*datapb.CompactionPlanResult, error) {
return nil, errors.New("illegal compaction plan")
}

allSorted := true
for _, segment := range t.plan.GetSegmentBinlogs() {
if !segment.GetIsSorted() {
allSorted = false
break
sortMergeAppicable := paramtable.Get().DataNodeCfg.UseMergeSort.GetAsBool()
if sortMergeAppicable {
for _, segment := range t.plan.GetSegmentBinlogs() {
if !segment.GetIsSorted() {
sortMergeAppicable = false
break
}
}
if len(insertPaths) <= 1 || len(insertPaths) > paramtable.Get().DataNodeCfg.MaxSegmentMergeSort.GetAsInt() {
// sort merge is not applicable if there is only one segment or too many segments
sortMergeAppicable = false
}
}

var res []*datapb.CompactionSegment
if paramtable.Get().DataNodeCfg.UseMergeSort.GetAsBool() && allSorted && len(t.plan.GetSegmentBinlogs()) > 1 {
log.Info("all segments are sorted, use merge sort")
if sortMergeAppicable {
log.Info("compact by merge sort")
res, err = mergeSortMultipleSegments(ctxTimeout, t.plan, t.collectionID, t.partitionID, t.maxRows, t.binlogIO,
t.plan.GetSegmentBinlogs(), t.tr, t.currentTs, t.plan.GetCollectionTtl(), t.bm25FieldIDs)
if err != nil {
Expand Down
18 changes: 10 additions & 8 deletions pkg/util/paramtable/component_param.go
Original file line number Diff line number Diff line change
Expand Up @@ -3284,7 +3284,6 @@ type dataCoordConfig struct {
CompactionMaxParallelTasks ParamItem `refreshable:"true"`
CompactionWorkerParalleTasks ParamItem `refreshable:"true"`
MinSegmentToMerge ParamItem `refreshable:"true"`
MaxSegmentToMerge ParamItem `refreshable:"true"`
SegmentSmallProportion ParamItem `refreshable:"true"`
SegmentCompactableProportion ParamItem `refreshable:"true"`
SegmentExpansionRate ParamItem `refreshable:"true"`
Expand Down Expand Up @@ -3610,13 +3609,6 @@ mix is prioritized by level: mix compactions first, then L0 compactions, then cl
}
p.MinSegmentToMerge.Init(base.mgr)

p.MaxSegmentToMerge = ParamItem{
Key: "dataCoord.compaction.max.segment",
Version: "2.0.0",
DefaultValue: "30",
}
p.MaxSegmentToMerge.Init(base.mgr)

p.SegmentSmallProportion = ParamItem{
Key: "dataCoord.segment.smallProportion",
Version: "2.0.0",
Expand Down Expand Up @@ -4303,6 +4295,7 @@ type dataNodeConfig struct {
L0BatchMemoryRatio ParamItem `refreshable:"true"`
L0CompactionMaxBatchSize ParamItem `refreshable:"true"`
UseMergeSort ParamItem `refreshable:"true"`
MaxSegmentMergeSort ParamItem `refreshable:"true"`

GracefulStopTimeout ParamItem `refreshable:"true"`

Expand Down Expand Up @@ -4643,6 +4636,15 @@ if this parameter <= 0, will set it as 10`,
}
p.UseMergeSort.Init(base.mgr)

p.MaxSegmentMergeSort = ParamItem{
Key: "dataNode.compaction.maxSegmentMergeSort",
Version: "2.5.0",
Doc: "The maximum number of segments to be merged in mergeSort mode.",
DefaultValue: "30",
Export: true,
}
p.MaxSegmentMergeSort.Init(base.mgr)

p.GracefulStopTimeout = ParamItem{
Key: "dataNode.gracefulStopTimeout",
Version: "2.3.7",
Expand Down

0 comments on commit dc85d8e

Please sign in to comment.