From 3a7a8c7944b05de41098c6dfb47fe45156ca01be Mon Sep 17 00:00:00 2001 From: Ted Xu Date: Mon, 2 Dec 2024 16:12:38 +0800 Subject: [PATCH] enhance: try compact small segments first if they may compose a full segment (#37709) See #37234 --------- Signed-off-by: Ted Xu --- internal/datacoord/compaction_trigger.go | 188 ++++---------- internal/datacoord/compaction_trigger_test.go | 240 +++++------------- internal/datacoord/knapsack.go | 130 ++++++++++ internal/datacoord/knapsack_test.go | 119 +++++++++ 4 files changed, 360 insertions(+), 317 deletions(-) create mode 100644 internal/datacoord/knapsack.go create mode 100644 internal/datacoord/knapsack_test.go diff --git a/internal/datacoord/compaction_trigger.go b/internal/datacoord/compaction_trigger.go index e3259a7a8b57f..bb2aa10d94225 100644 --- a/internal/datacoord/compaction_trigger.go +++ b/internal/datacoord/compaction_trigger.go @@ -19,7 +19,7 @@ package datacoord import ( "context" "fmt" - "sort" + "math" "sync" "time" @@ -283,23 +283,6 @@ func (t *compactionTrigger) allocSignalID() (UniqueID, error) { return t.allocator.AllocID(ctx) } -func (t *compactionTrigger) getExpectedSegmentSize(collectionID int64) int64 { - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - defer cancel() - collMeta, err := t.handler.GetCollection(ctx, collectionID) - if err != nil { - log.Warn("failed to get collection", zap.Int64("collectionID", collectionID), zap.Error(err)) - return Params.DataCoordCfg.SegmentMaxSize.GetAsInt64() * 1024 * 1024 - } - allDiskIndex := t.meta.indexMeta.AreAllDiskIndex(collectionID, collMeta.Schema) - if allDiskIndex { - // Only if all vector fields index type are DiskANN, recalc segment max size here. - return Params.DataCoordCfg.DiskSegmentMaxSize.GetAsInt64() * 1024 * 1024 - } - // If some vector fields index type are not DiskANN, recalc segment max size using default policy. - return Params.DataCoordCfg.SegmentMaxSize.GetAsInt64() * 1024 * 1024 -} - func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) error { t.forceMu.Lock() defer t.forceMu.Unlock() @@ -548,107 +531,57 @@ func (t *compactionTrigger) generatePlans(segments []*SegmentInfo, signal *compa } buckets := [][]*SegmentInfo{} - // sort segment from large to small - sort.Slice(prioritizedCandidates, func(i, j int) bool { - if prioritizedCandidates[i].getSegmentSize() != prioritizedCandidates[j].getSegmentSize() { - return prioritizedCandidates[i].getSegmentSize() > prioritizedCandidates[j].getSegmentSize() - } - return prioritizedCandidates[i].GetID() < prioritizedCandidates[j].GetID() - }) + toUpdate := newSegmentPacker("update", prioritizedCandidates) + toMerge := newSegmentPacker("merge", smallCandidates) + toPack := newSegmentPacker("pack", nonPlannedSegments) - sort.Slice(smallCandidates, func(i, j int) bool { - if smallCandidates[i].getSegmentSize() != smallCandidates[j].getSegmentSize() { - return smallCandidates[i].getSegmentSize() > smallCandidates[j].getSegmentSize() - } - return smallCandidates[i].GetID() < smallCandidates[j].GetID() - }) - - // Sort non-planned from small to large. - sort.Slice(nonPlannedSegments, func(i, j int) bool { - if nonPlannedSegments[i].getSegmentSize() != nonPlannedSegments[j].getSegmentSize() { - return nonPlannedSegments[i].getSegmentSize() < nonPlannedSegments[j].getSegmentSize() + maxSegs := Params.DataCoordCfg.MaxSegmentToMerge.GetAsInt64() + minSegs := Params.DataCoordCfg.MinSegmentToMerge.GetAsInt64() + compactableProportion := Params.DataCoordCfg.SegmentCompactableProportion.GetAsFloat() + satisfiedSize := int64(float64(expectedSize) * compactableProportion) + expantionRate := Params.DataCoordCfg.SegmentExpansionRate.GetAsFloat() + maxLeftSize := expectedSize - satisfiedSize + expectedExpandedSize := int64(float64(expectedSize) * expantionRate) + maxExpandedLeftSize := expectedExpandedSize - satisfiedSize + // 1. Merge small segments if they can make a full bucket + for { + pack, _ := toMerge.pack(expectedSize, maxLeftSize, minSegs, maxSegs) + if len(pack) == 0 { + break } - return nonPlannedSegments[i].GetID() > nonPlannedSegments[j].GetID() - }) - - // greedy pick from large segment to small, the goal is to fill each segment to reach 512M + buckets = append(buckets, pack) + } + // 2. greedy pick from large segment to small, the goal is to fill each segment to reach 512M // we must ensure all prioritized candidates is in a plan // TODO the compaction selection policy should consider if compaction workload is high - for len(prioritizedCandidates) > 0 { - var bucket []*SegmentInfo - // pop out the first element - segment := prioritizedCandidates[0] - bucket = append(bucket, segment) - prioritizedCandidates = prioritizedCandidates[1:] - - // only do single file compaction if segment is already large enough - if segment.getSegmentSize() < expectedSize { - var result []*SegmentInfo - free := expectedSize - segment.getSegmentSize() - maxNum := Params.DataCoordCfg.MaxSegmentToMerge.GetAsInt() - 1 - prioritizedCandidates, result, free = greedySelect(prioritizedCandidates, free, maxNum) - bucket = append(bucket, result...) - maxNum -= len(result) - if maxNum > 0 { - smallCandidates, result, _ = greedySelect(smallCandidates, free, maxNum) - bucket = append(bucket, result...) - } - } - // since this is priority compaction, we will execute even if there is only segment - log.Info("pick priority candidate for compaction", - zap.Int64("prioritized segmentID", segment.GetID()), - zap.Int64s("picked segmentIDs", lo.Map(bucket, func(s *SegmentInfo, _ int) int64 { return s.GetID() })), - zap.Int64("target size", lo.SumBy(bucket, func(s *SegmentInfo) int64 { return s.getSegmentSize() })), - zap.Int64("target count", lo.SumBy(bucket, func(s *SegmentInfo) int64 { return s.GetNumOfRows() })), - ) - buckets = append(buckets, bucket) - } - - var remainingSmallSegs []*SegmentInfo - // check if there are small candidates left can be merged into large segments - for len(smallCandidates) > 0 { - var bucket []*SegmentInfo - // pop out the first element - segment := smallCandidates[0] - bucket = append(bucket, segment) - smallCandidates = smallCandidates[1:] - - var result []*SegmentInfo - free := expectedSize - segment.getSegmentSize() - // for small segment merge, we pick one largest segment and merge as much as small segment together with it - // Why reverse? try to merge as many segments as expected. - // for instance, if a 255M and 255M is the largest small candidates, they will never be merged because of the MinSegmentToMerge limit. - smallCandidates, result, _ = reverseGreedySelect(smallCandidates, free, Params.DataCoordCfg.MaxSegmentToMerge.GetAsInt()-1) - bucket = append(bucket, result...) - - // only merge if candidate number is large than MinSegmentToMerge or if target size is large enough - targetSize := lo.SumBy(bucket, func(s *SegmentInfo) int64 { return s.getSegmentSize() }) - if len(bucket) >= Params.DataCoordCfg.MinSegmentToMerge.GetAsInt() || - len(bucket) > 1 && t.isCompactableSegment(targetSize, expectedSize) { - buckets = append(buckets, bucket) - } else { - remainingSmallSegs = append(remainingSmallSegs, bucket...) + for { + // No limit on the remaining size because we want to pack all prioritized candidates + pack, _ := toUpdate.packWith(expectedSize, math.MaxInt64, 0, maxSegs, toMerge) + if len(pack) == 0 { + break } + buckets = append(buckets, pack) } - remainingSmallSegs = t.squeezeSmallSegmentsToBuckets(remainingSmallSegs, buckets, expectedSize) - - // If there are still remaining small segments, try adding them to non-planned segments. - for _, npSeg := range nonPlannedSegments { - bucket := []*SegmentInfo{npSeg} - targetSize := npSeg.getSegmentSize() - for i := len(remainingSmallSegs) - 1; i >= 0; i-- { - // Note: could also simply use MaxRowNum as limit. - if targetSize+remainingSmallSegs[i].getSegmentSize() <= - int64(Params.DataCoordCfg.SegmentExpansionRate.GetAsFloat()*float64(expectedSize)) { - bucket = append(bucket, remainingSmallSegs[i]) - targetSize += remainingSmallSegs[i].getSegmentSize() - remainingSmallSegs = append(remainingSmallSegs[:i], remainingSmallSegs[i+1:]...) - } + // 2.+ legacy: squeeze small segments + // Try merge all small segments, and then squeeze + for { + pack, _ := toMerge.pack(expectedSize, math.MaxInt64, minSegs, maxSegs) + if len(pack) == 0 { + break } - if len(bucket) > 1 { - buckets = append(buckets, bucket) + buckets = append(buckets, pack) + } + remaining := t.squeezeSmallSegmentsToBuckets(toMerge.candidates, buckets, expectedSize) + toMerge = newSegmentPacker("merge", remaining) + + // 3. pack remaining small segments with non-planned segments + for { + pack, _ := toMerge.packWith(expectedExpandedSize, maxExpandedLeftSize, minSegs, maxSegs, toPack) + if len(pack) == 0 { + break } + buckets = append(buckets, pack) } tasks := make([]*typeutil.Pair[int64, []int64], len(buckets)) @@ -666,37 +599,6 @@ func (t *compactionTrigger) generatePlans(segments []*SegmentInfo, signal *compa return tasks } -func greedySelect(candidates []*SegmentInfo, free int64, maxSegment int) ([]*SegmentInfo, []*SegmentInfo, int64) { - var result []*SegmentInfo - - for i := 0; i < len(candidates); { - candidate := candidates[i] - if len(result) < maxSegment && candidate.getSegmentSize() < free { - result = append(result, candidate) - free -= candidate.getSegmentSize() - candidates = append(candidates[:i], candidates[i+1:]...) - } else { - i++ - } - } - - return candidates, result, free -} - -func reverseGreedySelect(candidates []*SegmentInfo, free int64, maxSegment int) ([]*SegmentInfo, []*SegmentInfo, int64) { - var result []*SegmentInfo - - for i := len(candidates) - 1; i >= 0; i-- { - candidate := candidates[i] - if (len(result) < maxSegment) && (candidate.getSegmentSize() < free) { - result = append(result, candidate) - free -= candidate.getSegmentSize() - candidates = append(candidates[:i], candidates[i+1:]...) - } - } - return candidates, result, free -} - func (t *compactionTrigger) getCandidateSegments(channel string, partitionID UniqueID) []*SegmentInfo { segments := t.meta.GetSegmentsByChannel(channel) if Params.DataCoordCfg.IndexBasedCompaction.GetAsBool() { @@ -743,10 +645,6 @@ func isExpandableSmallSegment(segment *SegmentInfo, expectedSize int64) bool { func isDeltalogTooManySegment(segment *SegmentInfo) bool { deltaLogCount := GetBinlogCount(segment.GetDeltalogs()) - log.Debug("isDeltalogTooManySegment", - zap.Int64("collectionID", segment.CollectionID), - zap.Int64("segmentID", segment.ID), - zap.Int("deltaLogCount", deltaLogCount)) return deltaLogCount > Params.DataCoordCfg.SingleCompactionDeltalogMaxNum.GetAsInt() } diff --git a/internal/datacoord/compaction_trigger_test.go b/internal/datacoord/compaction_trigger_test.go index 1fb62c62a6d16..a5d1df1e318a2 100644 --- a/internal/datacoord/compaction_trigger_test.go +++ b/internal/datacoord/compaction_trigger_test.go @@ -72,11 +72,6 @@ func (h *spyCompactionHandler) enqueueCompaction(task *datapb.CompactionTask) er return err } -// completeCompaction record the result of a compaction -func (h *spyCompactionHandler) completeCompaction(result *datapb.CompactionPlanResult) error { - return nil -} - // isFull return true if the task pool is full func (h *spyCompactionHandler) isFull() bool { return false @@ -607,10 +602,16 @@ func Test_compactionTrigger_force(t *testing.T) { _, err := tr.triggerManualCompaction(tt.collectionID) assert.Equal(t, tt.wantErr, err != nil) spy := (tt.fields.compactionHandler).(*spyCompactionHandler) - plan := <-spy.spyChan - plan.StartTime = 0 - sortPlanCompactionBinlogs(plan) - assert.EqualValues(t, tt.wantPlans[0], plan) + select { + case plan := <-spy.spyChan: + plan.StartTime = 0 + sortPlanCompactionBinlogs(plan) + assert.EqualValues(t, tt.wantPlans[0], plan) + return + case <-time.After(3 * time.Second): + assert.Fail(t, "timeout") + return + } }) t.Run(tt.name+" with DiskANN index", func(t *testing.T) { @@ -636,7 +637,14 @@ func Test_compactionTrigger_force(t *testing.T) { // expect max row num = 2048*1024*1024/(128*4) = 4194304 // assert.EqualValues(t, 4194304, tt.fields.meta.segments.GetSegments()[0].MaxRowNum) spy := (tt.fields.compactionHandler).(*spyCompactionHandler) - <-spy.spyChan + select { + case plan := <-spy.spyChan: + assert.NotNil(t, plan) + return + case <-time.After(3 * time.Second): + assert.Fail(t, "timeout") + return + } }) t.Run(tt.name+" with getCompact error", func(t *testing.T) { @@ -938,7 +946,8 @@ func Test_compactionTrigger_noplan(t *testing.T) { collectionID int64 compactTime *compactTime } - Params.DataCoordCfg.MinSegmentToMerge.DefaultValue = "4" + Params.Save(Params.DataCoordCfg.MinSegmentToMerge.Key, "4") + defer Params.Save(Params.DataCoordCfg.MinSegmentToMerge.Key, Params.DataCoordCfg.MinSegmentToMerge.DefaultValue) vecFieldID := int64(201) mock0Allocator := newMockAllocator(t) im := newSegmentIndexMeta(nil) @@ -1073,6 +1082,48 @@ func Test_compactionTrigger_noplan(t *testing.T) { } } +func mockSegment(segID, rows, deleteRows, sizeInMB int64) *datapb.SegmentInfo { + return &datapb.SegmentInfo{ + ID: segID, + CollectionID: 2, + PartitionID: 1, + LastExpireTime: 100, + NumOfRows: sizeInMB, + MaxRowNum: 150, + InsertChannel: "ch1", + State: commonpb.SegmentState_Flushed, + Binlogs: []*datapb.FieldBinlog{ + { + Binlogs: []*datapb.Binlog{ + {EntriesNum: rows, LogPath: "log1", LogSize: 100 * 1024 * 1024, MemorySize: sizeInMB * 1024 * 1024}, + }, + }, + }, + Deltalogs: []*datapb.FieldBinlog{ + { + Binlogs: []*datapb.Binlog{ + {EntriesNum: deleteRows, LogPath: "deltalog1"}, + }, + }, + }, + IsSorted: true, + } +} + +func mockSegmentsInfo(sizeInMB ...int64) *SegmentsInfo { + segments := make(map[int64]*SegmentInfo, len(sizeInMB)) + for i, size := range sizeInMB { + segId := int64(i + 1) + segments[segId] = &SegmentInfo{ + SegmentInfo: mockSegment(segId, size, 1, size), + lastFlushTime: time.Now().Add(-100 * time.Minute), + } + } + return &SegmentsInfo{ + segments: segments, + } +} + // Test compaction with prioritized candi func Test_compactionTrigger_PrioritizedCandi(t *testing.T) { type fields struct { @@ -1084,33 +1135,6 @@ func Test_compactionTrigger_PrioritizedCandi(t *testing.T) { } vecFieldID := int64(201) - genSeg := func(segID, numRows int64) *datapb.SegmentInfo { - return &datapb.SegmentInfo{ - ID: segID, - CollectionID: 2, - PartitionID: 1, - LastExpireTime: 100, - NumOfRows: numRows, - MaxRowNum: 150, - InsertChannel: "ch1", - State: commonpb.SegmentState_Flushed, - Binlogs: []*datapb.FieldBinlog{ - { - Binlogs: []*datapb.Binlog{ - {EntriesNum: numRows, LogPath: "log1", LogSize: 100, MemorySize: 100}, - }, - }, - }, - Deltalogs: []*datapb.FieldBinlog{ - { - Binlogs: []*datapb.Binlog{ - {EntriesNum: 5, LogPath: "deltalog1"}, - }, - }, - }, - IsSorted: true, - } - } mock0Allocator := newMockAllocator(t) genSegIndex := func(segID, indexID UniqueID, numRows int64) map[UniqueID]*model.SegmentIndex { @@ -1141,34 +1165,7 @@ func Test_compactionTrigger_PrioritizedCandi(t *testing.T) { // 8 small segments channelCPs: newChannelCps(), - segments: &SegmentsInfo{ - segments: map[int64]*SegmentInfo{ - 1: { - SegmentInfo: genSeg(1, 20), - lastFlushTime: time.Now().Add(-100 * time.Minute), - }, - 2: { - SegmentInfo: genSeg(2, 20), - lastFlushTime: time.Now(), - }, - 3: { - SegmentInfo: genSeg(3, 20), - lastFlushTime: time.Now(), - }, - 4: { - SegmentInfo: genSeg(4, 20), - lastFlushTime: time.Now(), - }, - 5: { - SegmentInfo: genSeg(5, 20), - lastFlushTime: time.Now(), - }, - 6: { - SegmentInfo: genSeg(6, 20), - lastFlushTime: time.Now(), - }, - }, - }, + segments: mockSegmentsInfo(20, 20, 20, 20, 20, 20), indexMeta: &indexMeta{ segmentIndexes: map[UniqueID]map[UniqueID]*model.SegmentIndex{ 1: genSegIndex(1, indexID, 20), @@ -1281,27 +1278,6 @@ func Test_compactionTrigger_SmallCandi(t *testing.T) { vecFieldID := int64(201) mock0Allocator := newMockAllocator(t) - genSeg := func(segID, numRows int64) *datapb.SegmentInfo { - return &datapb.SegmentInfo{ - ID: segID, - CollectionID: 2, - PartitionID: 1, - LastExpireTime: 100, - NumOfRows: numRows, - MaxRowNum: 110, - InsertChannel: "ch1", - State: commonpb.SegmentState_Flushed, - Binlogs: []*datapb.FieldBinlog{ - { - Binlogs: []*datapb.Binlog{ - {EntriesNum: 5, LogPath: "log1", LogSize: numRows * 1024 * 1024, MemorySize: numRows * 1024 * 1024}, - }, - }, - }, - IsSorted: true, - } - } - genSegIndex := func(segID, indexID UniqueID, numRows int64) map[UniqueID]*model.SegmentIndex { return map[UniqueID]*model.SegmentIndex{ indexID: { @@ -1328,41 +1304,10 @@ func Test_compactionTrigger_SmallCandi(t *testing.T) { "test small segment", fields{ &meta{ - // 4 small segments channelCPs: newChannelCps(), - - segments: &SegmentsInfo{ - segments: map[int64]*SegmentInfo{ - 1: { - SegmentInfo: genSeg(1, 200), - lastFlushTime: time.Now().Add(-100 * time.Minute), - }, - 2: { - SegmentInfo: genSeg(2, 200), - lastFlushTime: time.Now(), - }, - 3: { - SegmentInfo: genSeg(3, 200), - lastFlushTime: time.Now(), - }, - 4: { - SegmentInfo: genSeg(4, 200), - lastFlushTime: time.Now(), - }, - 5: { - SegmentInfo: genSeg(5, 200), - lastFlushTime: time.Now(), - }, - 6: { - SegmentInfo: genSeg(6, 200), - lastFlushTime: time.Now(), - }, - 7: { - SegmentInfo: genSeg(7, 200), - lastFlushTime: time.Now(), - }, - }, - }, + // 7 segments with 200MB each, the compaction is expected to be triggered + // as the first 5 being merged, and 1 plus being squeezed. + segments: mockSegmentsInfo(200, 200, 200, 200, 200, 200, 200), indexMeta: &indexMeta{ segmentIndexes: map[UniqueID]map[UniqueID]*model.SegmentIndex{ 1: genSegIndex(1, indexID, 20), @@ -1462,7 +1407,6 @@ func Test_compactionTrigger_SmallCandi(t *testing.T) { } } -// Test compaction with small candi func Test_compactionTrigger_SqueezeNonPlannedSegs(t *testing.T) { type fields struct { meta *meta @@ -1477,27 +1421,6 @@ func Test_compactionTrigger_SqueezeNonPlannedSegs(t *testing.T) { } vecFieldID := int64(201) - genSeg := func(segID, numRows int64) *datapb.SegmentInfo { - return &datapb.SegmentInfo{ - ID: segID, - CollectionID: 2, - PartitionID: 1, - LastExpireTime: 100, - NumOfRows: numRows, - MaxRowNum: 110, - InsertChannel: "ch1", - State: commonpb.SegmentState_Flushed, - Binlogs: []*datapb.FieldBinlog{ - { - Binlogs: []*datapb.Binlog{ - {EntriesNum: 5, LogPath: "log1", LogSize: numRows * 1024 * 1024, MemorySize: numRows * 1024 * 1024}, - }, - }, - }, - IsSorted: true, - } - } - genSegIndex := func(segID, indexID UniqueID, numRows int64) map[UniqueID]*model.SegmentIndex { return map[UniqueID]*model.SegmentIndex{ indexID: { @@ -1527,35 +1450,7 @@ func Test_compactionTrigger_SqueezeNonPlannedSegs(t *testing.T) { &meta{ channelCPs: newChannelCps(), - // 4 small segments - segments: &SegmentsInfo{ - segments: map[int64]*SegmentInfo{ - 1: { - SegmentInfo: genSeg(1, 600), - lastFlushTime: time.Now().Add(-100 * time.Minute), - }, - 2: { - SegmentInfo: genSeg(2, 600), - lastFlushTime: time.Now(), - }, - 3: { - SegmentInfo: genSeg(3, 600), - lastFlushTime: time.Now(), - }, - 4: { - SegmentInfo: genSeg(4, 600), - lastFlushTime: time.Now(), - }, - 5: { - SegmentInfo: genSeg(5, 260), - lastFlushTime: time.Now(), - }, - 6: { - SegmentInfo: genSeg(6, 260), - lastFlushTime: time.Now(), - }, - }, - }, + segments: mockSegmentsInfo(600, 600, 600, 600, 260, 260), indexMeta: &indexMeta{ segmentIndexes: map[UniqueID]map[UniqueID]*model.SegmentIndex{ 1: genSegIndex(1, indexID, 20), @@ -2040,6 +1935,7 @@ func Test_compactionTrigger_shouldDoSingleCompaction(t *testing.T) { // expire time < Timestamp To, but index engine version is 2 which is larger than CurrentIndexVersion in segmentIndex Params.Save(Params.DataCoordCfg.AutoUpgradeSegmentIndex.Key, "true") + defer Params.Save(Params.DataCoordCfg.AutoUpgradeSegmentIndex.Key, "false") couldDo = trigger.ShouldDoSingleCompaction(info4, &compactTime{expireTime: 300}) assert.True(t, couldDo) diff --git a/internal/datacoord/knapsack.go b/internal/datacoord/knapsack.go new file mode 100644 index 0000000000000..c40a586ebb2a3 --- /dev/null +++ b/internal/datacoord/knapsack.go @@ -0,0 +1,130 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package datacoord + +import ( + "math" + "sort" + + "github.com/bits-and-blooms/bitset" + "github.com/milvus-io/milvus/pkg/log" + "go.uber.org/zap" +) + +type Sizable interface { + getSegmentSize() int64 + GetID() int64 +} + +type Knapsack[T Sizable] struct { + name string + candidates []T +} + +func newKnapsack[T Sizable](name string, candidates []T) Knapsack[T] { + sort.Slice(candidates, func(i, j int) bool { + if candidates[i].getSegmentSize() != candidates[j].getSegmentSize() { + return candidates[i].getSegmentSize() > candidates[j].getSegmentSize() + } + return candidates[i].GetID() < candidates[j].GetID() + }) + return Knapsack[T]{ + name: name, + candidates: candidates, + } +} + +func (c *Knapsack[T]) tryPack(size, maxLeftSize, minSegs, maxSegs int64) (bitset.BitSet, int64) { + selection := bitset.New(uint(len(c.candidates))) + left := size + for i, segment := range c.candidates { + if maxSegs == 0 { + break + } + if segment.getSegmentSize() <= left { + selection.Set(uint(i)) + left -= segment.getSegmentSize() + maxSegs-- + } + } + + nSelections := selection.Count() + if left > maxLeftSize || nSelections < uint(minSegs) { + log.Debug("tryPack failed", + zap.String("name", c.name), + zap.Int64("left", left), zap.Int64("maxLeftSize", maxLeftSize), + zap.Int64("minSegs", minSegs), + zap.Uint("nselections", nSelections)) + selection.ClearAll() + left = size + } + return *selection, left +} + +func (c *Knapsack[T]) commit(selection bitset.BitSet) []T { + var ( + candidates = make([]T, 0, len(c.candidates)-int(selection.Count())) + returns = make([]T, 0, int(selection.Count())) + ) + + for i, candidate := range c.candidates { + if selection.Test(uint(i)) { + returns = append(returns, candidate) + } else { + candidates = append(candidates, candidate) + } + } + c.candidates = candidates + return returns +} + +// pack packs up to maxSegs segments into a single segment to match the total size given by size. +// If the remaining size is greater than maxLeftSize, or the number of segments is less than minSegs, return nil. +// returns the packed segments and the remaining size +func (c *Knapsack[T]) pack(size, maxLeftSize, minSegs, maxSegs int64) ([]T, int64) { + selection, left := c.tryPack(size, maxLeftSize, minSegs, maxSegs) + if selection.Count() == 0 { + return nil, size + } + segs := c.commit(selection) + return segs, left +} + +func (c *Knapsack[T]) packWith(size, maxLeftSize, minSegs, maxSegs int64, other Knapsack[T]) ([]T, int64) { + selection, left := c.tryPack(size, math.MaxInt64, 0, maxSegs) + if selection.Count() == 0 { + return nil, size + } + + numPacked := int64(selection.Count()) + otherSelection, left := other.tryPack(left, maxLeftSize, minSegs-numPacked, maxSegs-numPacked) + + if otherSelection.Count() == 0 { + // If the original selection already satisfied the requirements, return immediately + if left < maxLeftSize && selection.Count() >= uint(minSegs) { + return c.commit(selection), left + } + return nil, size + } + segs := c.commit(selection) + otherSegs := other.commit(otherSelection) + return append(segs, otherSegs...), left +} + +func newSegmentPacker(name string, candidates []*SegmentInfo) Knapsack[*SegmentInfo] { + return newKnapsack[*SegmentInfo](name, candidates) +} diff --git a/internal/datacoord/knapsack_test.go b/internal/datacoord/knapsack_test.go new file mode 100644 index 0000000000000..0bcae99d25dd2 --- /dev/null +++ b/internal/datacoord/knapsack_test.go @@ -0,0 +1,119 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package datacoord + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +type element struct { + size int64 + id int64 +} + +var _ Sizable = (*element)(nil) + +func (e *element) getSegmentSize() int64 { + return e.size +} + +func (e *element) GetID() int64 { + return e.id +} + +func mockElements(size ...int64) []*element { + var candidates []*element + for _, s := range size { + candidates = append(candidates, &element{size: s, id: s}) + } + return candidates +} + +func Test_pack(t *testing.T) { + type args struct { + size int64 + leftSize int64 + minSegs int64 + maxSegs int64 + } + tests := []struct { + name string + candidates []*element + args args + want []*element + wantLeft int64 + }{ + { + name: "all", + candidates: mockElements(1, 2, 3, 4, 5), + args: args{ + size: 15, + leftSize: 0, + minSegs: 2, + maxSegs: 5, + }, + want: mockElements(5, 4, 3, 2, 1), + wantLeft: 0, + }, + { + name: "failed by left size", + candidates: mockElements(1, 2, 3, 4, 5), + args: args{ + size: 20, + leftSize: 4, + minSegs: 2, + maxSegs: 5, + }, + want: mockElements(), + wantLeft: 20, + }, + { + name: "failed by min segs", + candidates: mockElements(10, 10), + args: args{ + size: 20, + leftSize: 5, + minSegs: 3, + maxSegs: 5, + }, + want: mockElements(), + wantLeft: 20, + }, + { + name: "failed by max segs", + candidates: mockElements(5, 5, 5, 5), + args: args{ + size: 20, + leftSize: 4, + minSegs: 2, + maxSegs: 3, + }, + want: mockElements(), + wantLeft: 20, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + p := newKnapsack[*element](tt.name, tt.candidates) + got, left := p.pack(tt.args.size, tt.args.leftSize, tt.args.minSegs, tt.args.maxSegs) + assert.Equal(t, tt.want, got) + assert.Equal(t, tt.wantLeft, left) + }) + } +}