Skip to content

Commit

Permalink
enhance: [2.4]Determine the number of buffers based on the resource l…
Browse files Browse the repository at this point in the history
…imits of the DataNode (#38210)

issue: #28410 

master pr: #38209

---------

Signed-off-by: Cai Zhang <[email protected]>
  • Loading branch information
xiaocai2333 authored Dec 8, 2024
1 parent d2c20ed commit ddc40a7
Show file tree
Hide file tree
Showing 4 changed files with 178 additions and 35 deletions.
2 changes: 1 addition & 1 deletion configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -669,7 +669,7 @@ dataNode:
slot:
slotCap: 16 # The maximum number of tasks(e.g. compaction, importing) allowed to run concurrently on a datanode
clusteringCompaction:
memoryBufferRatio: 0.1 # The ratio of memory buffer of clustering compaction. Data larger than threshold will be flushed to storage.
memoryBufferRatio: 0.3 # The ratio of memory buffer of clustering compaction. Data larger than threshold will be flushed to storage.
workPoolSize: 8 # worker pool size for one clustering compaction job.
ip: # TCP/IP address of dataNode. If not specified, use the first unicastable address
port: 21124 # TCP port of dataNode
Expand Down
133 changes: 100 additions & 33 deletions internal/datanode/compaction/clustering_compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ import (
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

const (
// expectedBinlogSize (16 MiB) is the per-buffer size estimate used by the
// plan-selection helpers in this file: the task's memoryBufferSize is divided
// by it to bound how many cluster buffers may be created.
expectedBinlogSize = 16 * 1024 * 1024
)

// Compile-time assertion that *clusteringCompactionTask implements Compactor.
var _ Compactor = (*clusteringCompactionTask)(nil)

type clusteringCompactionTask struct {
Expand Down Expand Up @@ -321,6 +325,71 @@ func (t *clusteringCompactionTask) getScalarAnalyzeResult(ctx context.Context) e
return nil
}

// splitCentroids distributes centroids round-robin across num groups.
//
// It returns the groups themselves plus a lookup table that maps the position
// of each element in the input slice (not its value) to the index of the group
// it was placed in. When num is not positive, both results are nil. When num
// exceeds len(centroids), the trailing groups are left empty.
func splitCentroids(centroids []int, num int) ([][]int, map[int]int) {
	if num <= 0 {
		return nil, nil
	}

	groups := make([][]int, num)
	positionToGroup := make(map[int]int, len(centroids))
	for pos, centroid := range centroids {
		target := pos % num
		groups[target] = append(groups[target], centroid)
		positionToGroup[pos] = target
	}
	return groups, positionToGroup
}

// generatedVectorPlan creates the cluster buffers for a vector clustering key.
//
// The centroid positions are split round-robin into bufferNum groups by
// splitCentroids. For every group a ClusterBuffer is created whose clustering
// key field stats carry that group's centroids, the buffer is initialized via
// refreshBufferWriterWithPack, and it is appended to t.clusterBuffers. Finally
// t.offsetToBufferFunc is installed; it resolves a row offset to a centroid id
// through idMapping and returns the buffer of the group owning that centroid.
//
// NOTE(review): the routing function indexes t.clusterBuffers by group index,
// so it presumably relies on t.clusterBuffers being empty on entry — confirm.
//
// It returns the first error reported by storage.NewFieldStats or
// refreshBufferWriterWithPack.
func (t *clusteringCompactionTask) generatedVectorPlan(bufferNum int, centroids []*schemapb.VectorField) error {
	// Group centroid positions rather than the centroids themselves.
	offsets := make([]int, 0, len(centroids))
	for idx := range centroids {
		offsets = append(offsets, idx)
	}

	groups, offsetToGroup := splitCentroids(offsets, bufferNum)
	for groupID, members := range groups {
		stats, err := storage.NewFieldStats(t.clusteringKeyField.FieldID, t.clusteringKeyField.DataType, 0)
		if err != nil {
			return err
		}

		values := make([]storage.VectorFieldValue, 0, len(members))
		for _, member := range members {
			values = append(values, storage.NewVectorFieldValue(t.clusteringKeyField.DataType, centroids[member]))
		}
		stats.SetVectorCentroids(values...)

		buffer := &ClusterBuffer{
			id:                      groupID,
			flushedRowNum:           map[typeutil.UniqueID]atomic.Int64{},
			flushedBinlogs:          make(map[typeutil.UniqueID]map[typeutil.UniqueID]*datapb.FieldBinlog, 0),
			uploadedSegments:        make([]*datapb.CompactionSegment, 0),
			uploadedSegmentStats:    make(map[typeutil.UniqueID]storage.SegmentStats, 0),
			clusteringKeyFieldStats: stats,
		}
		if _, err := t.refreshBufferWriterWithPack(buffer); err != nil {
			return err
		}
		t.clusterBuffers = append(t.clusterBuffers, buffer)
	}

	t.offsetToBufferFunc = func(offset int64, idMapping []uint32) *ClusterBuffer {
		centroidID := int(idMapping[offset])
		return t.clusterBuffers[offsetToGroup[centroidID]]
	}
	return nil
}

// switchPolicyForVectorPlan decides how many cluster buffers to create for a
// vector clustering key and delegates to generatedVectorPlan.
//
// The buffer count is the number of centroids, capped by how many buffers of
// expectedBinlogSize fit into t.memoryBufferSize. The memory-derived cap is
// clamped to at least one: with a memory budget smaller than
// expectedBinlogSize the cap would otherwise be zero, no buffers would be
// created, and the routing function installed by generatedVectorPlan would
// index an empty t.clusterBuffers slice.
func (t *clusteringCompactionTask) switchPolicyForVectorPlan(centroids *clusteringpb.ClusteringCentroidsStats) error {
	bufferNum := len(centroids.GetCentroids())

	bufferNumByMemory := int(t.memoryBufferSize / expectedBinlogSize)
	if bufferNumByMemory < 1 {
		// Always allow a single buffer, even under a tiny memory budget.
		bufferNumByMemory = 1
	}
	if bufferNumByMemory < bufferNum {
		bufferNum = bufferNumByMemory
	}
	return t.generatedVectorPlan(bufferNum, centroids.GetCentroids())
}

func (t *clusteringCompactionTask) getVectorAnalyzeResult(ctx context.Context) error {
ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, fmt.Sprintf("getVectorAnalyzeResult-%d", t.GetPlanID()))
defer span.End()
Expand All @@ -345,28 +414,7 @@ func (t *clusteringCompactionTask) getVectorAnalyzeResult(ctx context.Context) e
log.Debug("read clustering centroids stats", zap.String("path", centroidFilePath),
zap.Int("centroidNum", len(centroids.GetCentroids())),
zap.Any("offsetMappingFiles", t.segmentIDOffsetMapping))

for id, centroid := range centroids.GetCentroids() {
fieldStats, err := storage.NewFieldStats(t.clusteringKeyField.FieldID, t.clusteringKeyField.DataType, 0)
if err != nil {
return err
}
fieldStats.SetVectorCentroids(storage.NewVectorFieldValue(t.clusteringKeyField.DataType, centroid))
clusterBuffer := &ClusterBuffer{
id: id,
flushedRowNum: map[typeutil.UniqueID]atomic.Int64{},
flushedBinlogs: make(map[typeutil.UniqueID]map[typeutil.UniqueID]*datapb.FieldBinlog, 0),
uploadedSegments: make([]*datapb.CompactionSegment, 0),
uploadedSegmentStats: make(map[typeutil.UniqueID]storage.SegmentStats, 0),
clusteringKeyFieldStats: fieldStats,
}
t.refreshBufferWriterWithPack(clusterBuffer)
t.clusterBuffers = append(t.clusterBuffers, clusterBuffer)
}
t.offsetToBufferFunc = func(offset int64, idMapping []uint32) *ClusterBuffer {
return t.clusterBuffers[idMapping[offset]]
}
return nil
return t.switchPolicyForVectorPlan(centroids)
}

// mapping read and split input segments into buffers
Expand Down Expand Up @@ -525,6 +573,7 @@ func (t *clusteringCompactionTask) mappingSegment(
fieldBinlogPaths = append(fieldBinlogPaths, ps)
}

var offset int64 = -1
for _, paths := range fieldBinlogPaths {
allValues, err := t.binlogIO.Download(ctx, paths)
if err != nil {
Expand All @@ -540,7 +589,6 @@ func (t *clusteringCompactionTask) mappingSegment(
return err
}

var offset int64 = -1
for {
err := pkIter.Next()
if err != nil {
Expand Down Expand Up @@ -1032,6 +1080,7 @@ func (t *clusteringCompactionTask) scalarAnalyze(ctx context.Context) (map[inter
zap.Int64("collectionID", t.GetCollection()),
zap.Int64("partitionID", t.partitionID),
zap.Int("segments", len(inputSegments)),
zap.Int("clustering num", len(analyzeDict)),
zap.Duration("elapse", time.Since(analyzeStart)))
return analyzeDict, nil
}
Expand Down Expand Up @@ -1146,19 +1195,10 @@ func (t *clusteringCompactionTask) scalarAnalyzeSegment(
return analyzeResult, nil
}

func (t *clusteringCompactionTask) scalarPlan(dict map[interface{}]int64) [][]interface{} {
keys := lo.MapToSlice(dict, func(k interface{}, _ int64) interface{} {
return k
})
sort.Slice(keys, func(i, j int) bool {
return storage.NewScalarFieldValue(t.clusteringKeyField.DataType, keys[i]).LE(storage.NewScalarFieldValue(t.clusteringKeyField.DataType, keys[j]))
})

func (t *clusteringCompactionTask) generatedScalarPlan(maxRows, preferRows int64, keys []interface{}, dict map[interface{}]int64) [][]interface{} {
buckets := make([][]interface{}, 0)
currentBucket := make([]interface{}, 0)
var currentBucketSize int64 = 0
maxRows := t.plan.MaxSegmentRows
preferRows := t.plan.PreferSegmentRows
for _, key := range keys {
// todo can optimize
if dict[key] > preferRows {
Expand Down Expand Up @@ -1186,6 +1226,33 @@ func (t *clusteringCompactionTask) scalarPlan(dict map[interface{}]int64) [][]in
return buckets
}

// switchPolicyForScalarPlan picks the row limits used to bucket scalar
// clustering keys and delegates to generatedScalarPlan.
//
// Two buffer counts are compared: totalRows divided by the plan's
// MaxSegmentRows, and t.memoryBufferSize divided by expectedBinlogSize. When
// memory allows more buffers than the row-based count, the plan's own
// MaxSegmentRows/PreferSegmentRows are used. Otherwise the rows are spread over
// the memory-derived buffer count, and the preferred row count is that maximum
// scaled by DataCoordCfg.ClusteringCompactionPreferSegmentSizeRatio.
//
// Both divisions are guarded: the memory-derived count is clamped to at least
// one, and a non-positive MaxSegmentRows falls back to the plan limits instead
// of dividing by zero.
func (t *clusteringCompactionTask) switchPolicyForScalarPlan(totalRows int64, keys []interface{}, dict map[interface{}]int64) [][]interface{} {
	maxSegmentRows := t.plan.GetMaxSegmentRows()
	preferSegmentRows := t.plan.GetPreferSegmentRows()

	bufferNumByMemory := t.memoryBufferSize / expectedBinlogSize
	if bufferNumByMemory < 1 {
		// A memory budget below expectedBinlogSize still gets one buffer.
		bufferNumByMemory = 1
	}

	var bufferNumBySegmentMaxRows int64
	if maxSegmentRows > 0 {
		bufferNumBySegmentMaxRows = totalRows / maxSegmentRows
	}

	log.Info("switchPolicyForScalarPlan", zap.Int64("totalRows", totalRows),
		zap.Int64("bufferNumBySegmentMaxRows", bufferNumBySegmentMaxRows),
		zap.Int64("bufferNumByMemory", bufferNumByMemory))

	if maxSegmentRows <= 0 || bufferNumByMemory > bufferNumBySegmentMaxRows {
		return t.generatedScalarPlan(maxSegmentRows, preferSegmentRows, keys, dict)
	}

	// Memory is the tighter constraint: derive the row limits from it.
	maxRows := totalRows / bufferNumByMemory
	preferRows := int64(float64(maxRows) * paramtable.Get().DataCoordCfg.ClusteringCompactionPreferSegmentSizeRatio.GetAsFloat())
	return t.generatedScalarPlan(maxRows, preferRows, keys, dict)
}

// scalarPlan turns the analyze result (clustering key value -> row count) into
// buckets of keys. It sums the row counts, sorts the distinct keys using the
// scalar field value comparison for the clustering key's data type, and lets
// switchPolicyForScalarPlan choose the row limits and build the buckets.
func (t *clusteringCompactionTask) scalarPlan(dict map[interface{}]int64) [][]interface{} {
	var totalRows int64
	for _, rows := range dict {
		totalRows += rows
	}

	keys := lo.MapToSlice(dict, func(key interface{}, _ int64) interface{} {
		return key
	})

	dataType := t.clusteringKeyField.DataType
	sort.Slice(keys, func(a, b int) bool {
		left := storage.NewScalarFieldValue(dataType, keys[a])
		right := storage.NewScalarFieldValue(dataType, keys[b])
		return left.LE(right)
	})

	return t.switchPolicyForScalarPlan(totalRows, keys, dict)
}

func (t *clusteringCompactionTask) refreshBufferWriterWithPack(buffer *ClusterBuffer) (bool, error) {
var segmentID int64
var err error
Expand Down
76 changes: 76 additions & 0 deletions internal/datanode/compaction/clustering_compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package compaction
import (
"context"
"fmt"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -232,6 +233,81 @@ func (s *ClusteringCompactionTaskSuite) TestScalarCompactionNormal() {
s.Equal(totalRowNum, statsRowNum)
}

// TestScalarCompactionNormalByMemoryLimit runs a scalar clustering compaction
// over a single 10240-row segment with the task's memory buffer limited to
// 32 MiB, and expects the memory limit (32 MiB / 16 MiB expectedBinlogSize = 2)
// rather than MaxSegmentRows (10240 / 3000 = 3) to determine the number of
// cluster buffers.
func (s *ClusteringCompactionTaskSuite) TestScalarCompactionNormalByMemoryLimit() {
schema := genCollectionSchema()
var segmentID int64 = 1001
segWriter, err := NewSegmentWriter(schema, 1000, segmentID, PartitionID, CollectionID)
s.Require().NoError(err)
// Fill the input segment with 10240 rows keyed by consecutive int64 PKs.
for i := 0; i < 10240; i++ {
v := storage.Value{
PK: storage.NewInt64PrimaryKey(int64(i)),
Timestamp: int64(tsoutil.ComposeTSByTime(getMilvusBirthday(), 0)),
Value: genRow(int64(i)),
}
err = segWriter.Write(&v)
s.Require().NoError(err)
}
segWriter.FlushAndIsFull()

kvs, fBinlogs, err := serializeWrite(context.TODO(), s.mockAlloc, segWriter)
s.NoError(err)
// The memory limit is applied once, on the first Download call.
// NOTE(review): presumably this is done inside the mock because Compact()
// re-initializes memoryBufferSize before downloading — confirm.
var one sync.Once
s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).RunAndReturn(
func(ctx context.Context, strings []string) ([][]byte, error) {
// 32m, only two buffers can be generated
one.Do(func() {
s.task.memoryBufferSize = 32 * 1024 * 1024
})
return lo.Values(kvs), nil
})

s.plan.SegmentBinlogs = []*datapb.CompactionSegmentBinlogs{
{
SegmentID: segmentID,
FieldBinlogs: lo.Values(fBinlogs),
},
}

s.task.plan.Schema = genCollectionSchema()
s.task.plan.ClusteringKeyField = 100
s.task.plan.PreferSegmentRows = 3000
s.task.plan.MaxSegmentRows = 3000

// 8+8+8+4+7+4*4=51
// 51*1024 = 52224
// writer will automatically flush after 1024 rows.
paramtable.Get().Save(paramtable.Get().DataNodeCfg.BinLogMaxSize.Key, "52223")
defer paramtable.Get().Reset(paramtable.Get().DataNodeCfg.BinLogMaxSize.Key)
// A ratio of 1 makes the preferred row count equal to the memory-derived maximum.
paramtable.Get().Save(paramtable.Get().DataCoordCfg.ClusteringCompactionPreferSegmentSizeRatio.Key, "1")
defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.ClusteringCompactionPreferSegmentSizeRatio.Key)

compactionResult, err := s.task.Compact()
s.Require().NoError(err)
s.Equal(2, len(s.task.clusterBuffers))
s.Equal(4, len(compactionResult.GetSegments()))
// Only the first result segment is inspected below: count its insert binlogs
// and the rows recorded for the clustering key field (field ID 100).
totalBinlogNum := 0
totalRowNum := int64(0)
for _, fb := range compactionResult.GetSegments()[0].GetInsertLogs() {
for _, b := range fb.GetBinlogs() {
totalBinlogNum++
if fb.GetFieldID() == 100 {
totalRowNum += b.GetEntriesNum()
}
}
}
// Count the stats logs of the same segment and the rows they cover.
statsBinlogNum := 0
statsRowNum := int64(0)
for _, sb := range compactionResult.GetSegments()[0].GetField2StatslogPaths() {
for _, b := range sb.GetBinlogs() {
statsBinlogNum++
statsRowNum += b.GetEntriesNum()
}
}
// Insert binlogs are counted across all fields, so divide by the field count.
s.Equal(3, totalBinlogNum/len(schema.GetFields()))
s.Equal(1, statsBinlogNum)
s.Equal(totalRowNum, statsRowNum)
}

func (s *ClusteringCompactionTaskSuite) TestCheckBuffersAfterCompaction() {
s.Run("no leak", func() {
task := &clusteringCompactionTask{clusterBuffers: []*ClusterBuffer{{}}}
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/paramtable/component_param.go
Original file line number Diff line number Diff line change
Expand Up @@ -4537,7 +4537,7 @@ if this parameter <= 0, will set it as 10`,
Key: "dataNode.clusteringCompaction.memoryBufferRatio",
Version: "2.4.6",
Doc: "The ratio of memory buffer of clustering compaction. Data larger than threshold will be flushed to storage.",
DefaultValue: "0.1",
DefaultValue: "0.3",
PanicIfEmpty: false,
Export: true,
}
Expand Down

0 comments on commit ddc40a7

Please sign in to comment.