diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 9aaa85694ddec..70e2285c1d385 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -682,6 +682,7 @@ dataNode: levelZeroBatchMemoryRatio: 0.5 # The minimal memory ratio of free memory for level zero compaction executing in batch mode levelZeroMaxBatchSize: -1 # Max batch size refers to the max number of L1/L2 segments in a batch when executing L0 compaction. Default to -1, any value that is less than 1 means no limit. Valid range: >= 1. useMergeSort: false # Whether to enable mergeSort mode when performing mixCompaction. + maxSegmentMergeSort: 30 # The maximum number of segments to be merged in mergeSort mode. gracefulStopTimeout: 1800 # seconds. force stop node without graceful stop slot: slotCap: 16 # The maximum number of tasks(e.g. compaction, importing) allowed to run concurrently on a datanode diff --git a/internal/datacoord/compaction_trigger.go b/internal/datacoord/compaction_trigger.go index 103afc9570de0..218c176dac17e 100644 --- a/internal/datacoord/compaction_trigger.go +++ b/internal/datacoord/compaction_trigger.go @@ -535,7 +535,7 @@ func (t *compactionTrigger) generatePlans(segments []*SegmentInfo, signal *compa toMerge := newSegmentPacker("merge", smallCandidates) toPack := newSegmentPacker("pack", nonPlannedSegments) - maxSegs := Params.DataCoordCfg.MaxSegmentToMerge.GetAsInt64() + maxSegs := int64(4096) // Deprecate the max segment limit since it is irrelevant in simple compactions. 
minSegs := Params.DataCoordCfg.MinSegmentToMerge.GetAsInt64() compactableProportion := Params.DataCoordCfg.SegmentCompactableProportion.GetAsFloat() satisfiedSize := int64(float64(expectedSize) * compactableProportion) @@ -543,16 +543,18 @@ func (t *compactionTrigger) generatePlans(segments []*SegmentInfo, signal *compa maxLeftSize := expectedSize - satisfiedSize expectedExpandedSize := int64(float64(expectedSize) * expantionRate) maxExpandedLeftSize := expectedExpandedSize - satisfiedSize + reasons := make([]string, 0) // 1. Merge small segments if they can make a full bucket for { - pack, _ := toMerge.pack(expectedSize, maxLeftSize, minSegs, maxSegs) + pack, left := toMerge.pack(expectedSize, maxLeftSize, minSegs, maxSegs) if len(pack) == 0 { break } + reasons = append(reasons, fmt.Sprintf("merging %d small segments with left size %d", len(pack), left)) buckets = append(buckets, pack) } - // 2. greedy pick from large segment to small, the goal is to fill each segment to reach 512M - // we must ensure all prioritized candidates is in a plan + + // 2. 
Pack prioritized candidates with small segments // TODO the compaction selection policy should consider if compaction workload is high for { // No limit on the remaining size because we want to pack all prioritized candidates @@ -560,6 +562,7 @@ func (t *compactionTrigger) generatePlans(segments []*SegmentInfo, signal *compa if len(pack) == 0 { break } + reasons = append(reasons, fmt.Sprintf("packing %d prioritized segments", len(pack))) buckets = append(buckets, pack) } @@ -570,6 +573,7 @@ func (t *compactionTrigger) generatePlans(segments []*SegmentInfo, signal *compa if len(pack) == 0 { break } + reasons = append(reasons, fmt.Sprintf("packing all %d small segments", len(pack))) buckets = append(buckets, pack) } remaining := t.squeezeSmallSegmentsToBuckets(toMerge.candidates, buckets, expectedSize) @@ -581,6 +585,7 @@ func (t *compactionTrigger) generatePlans(segments []*SegmentInfo, signal *compa if len(pack) == 0 { break } + reasons = append(reasons, fmt.Sprintf("packing %d small segments and non-planned segments", len(pack))) buckets = append(buckets, pack) } @@ -595,7 +600,15 @@ func (t *compactionTrigger) generatePlans(segments []*SegmentInfo, signal *compa pair := typeutil.NewPair(totalRows, segmentIDs) tasks[i] = &pair } - log.Info("generatePlans", zap.Int64("collectionID", signal.collectionID), zap.Int("plan_num", len(tasks))) + + if len(tasks) > 0 { + log.Info("generated nontrivial compaction tasks", + zap.Int64("collectionID", signal.collectionID), + zap.Int("prioritizedCandidates", len(prioritizedCandidates)), + zap.Int("smallCandidates", len(smallCandidates)), + zap.Int("nonPlannedSegments", len(nonPlannedSegments)), + zap.Strings("reasons", reasons)) + } return tasks } diff --git a/internal/datacoord/compaction_trigger_test.go b/internal/datacoord/compaction_trigger_test.go index a5d1df1e318a2..b1b912be218ef 100644 --- a/internal/datacoord/compaction_trigger_test.go +++ b/internal/datacoord/compaction_trigger_test.go @@ -751,7 +751,8 @@ func 
Test_compactionTrigger_force_maxSegmentLimit(t *testing.T) { }, } - for i := UniqueID(0); i < 50; i++ { + nSegments := 50 + for i := UniqueID(0); i < UniqueID(nSegments); i++ { info := &SegmentInfo{ SegmentInfo: &datapb.SegmentInfo{ ID: i, @@ -913,16 +914,13 @@ func Test_compactionTrigger_force_maxSegmentLimit(t *testing.T) { assert.Equal(t, tt.wantErr, err != nil) spy := (tt.fields.compactionHandler).(*spyCompactionHandler) - // should be split into two plans - plan := <-spy.spyChan - assert.NotEmpty(t, plan) - - // TODO CZS - // assert.Equal(t, len(plan.SegmentBinlogs), 30) - plan = <-spy.spyChan - assert.NotEmpty(t, plan) - // TODO CZS - // assert.Equal(t, len(plan.SegmentBinlogs), 20) + select { + case plan := <-spy.spyChan: + assert.NotEmpty(t, plan) + assert.Equal(t, len(plan.SegmentBinlogs), nSegments) + case <-time.After(2 * time.Second): + assert.Fail(t, "timeout") + } }) } } diff --git a/internal/datacoord/knapsack.go b/internal/datacoord/knapsack.go index 4c69e234df4a3..4134861e8dd4f 100644 --- a/internal/datacoord/knapsack.go +++ b/internal/datacoord/knapsack.go @@ -21,9 +21,6 @@ import ( "sort" "github.com/bits-and-blooms/bitset" - "go.uber.org/zap" - - "github.com/milvus-io/milvus/pkg/log" ) type Sizable interface { @@ -65,11 +62,6 @@ func (c *Knapsack[T]) tryPack(size, maxLeftSize, minSegs, maxSegs int64) (bitset nSelections := selection.Count() if left > maxLeftSize || nSelections < uint(minSegs) { - log.Debug("tryPack failed", - zap.String("name", c.name), - zap.Int64("left", left), zap.Int64("maxLeftSize", maxLeftSize), - zap.Int64("minSegs", minSegs), - zap.Uint("nselections", nSelections)) selection.ClearAll() left = size } diff --git a/internal/datanode/compaction/mix_compactor.go b/internal/datanode/compaction/mix_compactor.go index 6441e32736c9f..271cf1ca5478c 100644 --- a/internal/datanode/compaction/mix_compactor.go +++ b/internal/datanode/compaction/mix_compactor.go @@ -329,17 +329,23 @@ func (t *mixCompactionTask) Compact() 
(*datapb.CompactionPlanResult, error) { return nil, errors.New("illegal compaction plan") } - allSorted := true - for _, segment := range t.plan.GetSegmentBinlogs() { - if !segment.GetIsSorted() { - allSorted = false - break + sortMergeApplicable := paramtable.Get().DataNodeCfg.UseMergeSort.GetAsBool() + if sortMergeApplicable { + for _, segment := range t.plan.GetSegmentBinlogs() { + if !segment.GetIsSorted() { + sortMergeApplicable = false + break + } + } + if len(insertPaths) <= 1 || len(insertPaths) > paramtable.Get().DataNodeCfg.MaxSegmentMergeSort.GetAsInt() { + // sort merge is not applicable if there is only one segment or too many segments + sortMergeApplicable = false } } var res []*datapb.CompactionSegment - if paramtable.Get().DataNodeCfg.UseMergeSort.GetAsBool() && allSorted && len(t.plan.GetSegmentBinlogs()) > 1 { - log.Info("all segments are sorted, use merge sort") + if sortMergeApplicable { + log.Info("compact by merge sort") res, err = mergeSortMultipleSegments(ctxTimeout, t.plan, t.collectionID, t.partitionID, t.maxRows, t.binlogIO, t.plan.GetSegmentBinlogs(), t.tr, t.currentTs, t.plan.GetCollectionTtl(), t.bm25FieldIDs) if err != nil { diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index b125ec380d54a..0950233569af4 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -3284,7 +3284,6 @@ type dataCoordConfig struct { CompactionMaxParallelTasks ParamItem `refreshable:"true"` CompactionWorkerParalleTasks ParamItem `refreshable:"true"` MinSegmentToMerge ParamItem `refreshable:"true"` - MaxSegmentToMerge ParamItem `refreshable:"true"` SegmentSmallProportion ParamItem `refreshable:"true"` SegmentCompactableProportion ParamItem `refreshable:"true"` SegmentExpansionRate ParamItem `refreshable:"true"` @@ -3610,13 +3609,6 @@ mix is prioritized by level: mix compactions first, then L0 compactions, then cl } p.MinSegmentToMerge.Init(base.mgr) - p.MaxSegmentToMerge = 
ParamItem{ - Key: "dataCoord.compaction.max.segment", - Version: "2.0.0", - DefaultValue: "30", - } - p.MaxSegmentToMerge.Init(base.mgr) - p.SegmentSmallProportion = ParamItem{ Key: "dataCoord.segment.smallProportion", Version: "2.0.0", @@ -4303,6 +4295,7 @@ type dataNodeConfig struct { L0BatchMemoryRatio ParamItem `refreshable:"true"` L0CompactionMaxBatchSize ParamItem `refreshable:"true"` UseMergeSort ParamItem `refreshable:"true"` + MaxSegmentMergeSort ParamItem `refreshable:"true"` GracefulStopTimeout ParamItem `refreshable:"true"` @@ -4643,6 +4636,15 @@ if this parameter <= 0, will set it as 10`, } p.UseMergeSort.Init(base.mgr) + p.MaxSegmentMergeSort = ParamItem{ + Key: "dataNode.compaction.maxSegmentMergeSort", + Version: "2.5.0", + Doc: "The maximum number of segments to be merged in mergeSort mode.", + DefaultValue: "30", + Export: true, + } + p.MaxSegmentMergeSort.Init(base.mgr) + p.GracefulStopTimeout = ParamItem{ Key: "dataNode.gracefulStopTimeout", Version: "2.3.7",