diff --git a/configs/milvus.yaml b/configs/milvus.yaml
index 659ce08e596eb..362ed1f24d340 100644
--- a/configs/milvus.yaml
+++ b/configs/milvus.yaml
@@ -669,7 +669,7 @@ dataNode:
   slot:
     slotCap: 16 # The maximum number of tasks(e.g. compaction, importing) allowed to run concurrently on a datanode
   clusteringCompaction:
-    memoryBufferRatio: 0.1 # The ratio of memory buffer of clustering compaction. Data larger than threshold will be flushed to storage.
+    memoryBufferRatio: 0.3 # The ratio of memory buffer of clustering compaction. Data larger than threshold will be flushed to storage.
     workPoolSize: 8 # worker pool size for one clustering compaction job.
   ip: # TCP/IP address of dataNode. If not specified, use the first unicastable address
   port: 21124 # TCP port of dataNode
diff --git a/internal/datanode/compaction/clustering_compactor.go b/internal/datanode/compaction/clustering_compactor.go
index 0758b55fcf1d7..1a8cf7101e0a0 100644
--- a/internal/datanode/compaction/clustering_compactor.go
+++ b/internal/datanode/compaction/clustering_compactor.go
@@ -59,6 +59,10 @@ import (
 	"github.com/milvus-io/milvus/pkg/util/typeutil"
 )
 
+const (
+	// expectedBinlogSize (16 MB) is the assumed size of a single binlog; it is
+	// used below to estimate how many cluster buffers fit the memory budget.
+	expectedBinlogSize = 16 * 1024 * 1024
+)
+
 var _ Compactor = (*clusteringCompactionTask)(nil)
 
 type clusteringCompactionTask struct {
@@ -321,6 +325,71 @@ func (t *clusteringCompactionTask) getScalarAnalyzeResult(ctx context.Context) e
 	return nil
 }
 
+// splitCentroids distributes the given centroid offsets across num groups in
+// round-robin order and returns the groups plus a centroid->group index map.
+func splitCentroids(centroids []int, num int) ([][]int, map[int]int) {
+	if num <= 0 {
+		return nil, nil
+	}
+
+	result := make([][]int, num)
+	resultIndex := make(map[int]int, len(centroids))
+	listLen := len(centroids)
+
+	for i := 0; i < listLen; i++ {
+		group := i % num
+		result[group] = append(result[group], centroids[i])
+		resultIndex[i] = group
+	}
+
+	return result, resultIndex
+}
+
+// generatedVectorPlan creates bufferNum cluster buffers, each owning one
+// round-robin group of centroids, and wires offsetToBufferFunc so that every
+// row is routed to the buffer of its nearest centroid's group.
+func (t *clusteringCompactionTask) generatedVectorPlan(bufferNum int, centroids []*schemapb.VectorField) error {
+	centroidsOffset := make([]int, len(centroids))
+	for i := 0; i < len(centroids); i++ {
+		centroidsOffset[i] = i
+	}
+	centroidGroups, groupIndex := splitCentroids(centroidsOffset, bufferNum)
+	for id, group := range centroidGroups {
+		fieldStats, err := storage.NewFieldStats(t.clusteringKeyField.FieldID, t.clusteringKeyField.DataType, 0)
+		if err != nil {
+			return err
+		}
+
+		centroidValues := make([]storage.VectorFieldValue, len(group))
+		for i, offset := range group {
+			centroidValues[i] = storage.NewVectorFieldValue(t.clusteringKeyField.DataType, centroids[offset])
+		}
+
+		fieldStats.SetVectorCentroids(centroidValues...)
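+		// one ClusterBuffer serves the whole centroid group; groupIndex lets
+		// offsetToBufferFunc below map a centroid id back to this buffer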
+		clusterBuffer := &ClusterBuffer{
+			id:                      id,
+			flushedRowNum:           map[typeutil.UniqueID]atomic.Int64{},
+			flushedBinlogs:          make(map[typeutil.UniqueID]map[typeutil.UniqueID]*datapb.FieldBinlog, 0),
+			uploadedSegments:        make([]*datapb.CompactionSegment, 0),
+			uploadedSegmentStats:    make(map[typeutil.UniqueID]storage.SegmentStats, 0),
+			clusteringKeyFieldStats: fieldStats,
+		}
+		if _, err = t.refreshBufferWriterWithPack(clusterBuffer); err != nil {
+			return err
+		}
+		t.clusterBuffers = append(t.clusterBuffers, clusterBuffer)
+	}
+	t.offsetToBufferFunc = func(offset int64, idMapping []uint32) *ClusterBuffer {
+		centroidGroupOffset := groupIndex[int(idMapping[offset])]
+		return t.clusterBuffers[centroidGroupOffset]
+	}
+	return nil
+}
+
+// switchPolicyForVectorPlan caps the one-buffer-per-centroid plan by the
+// memory budget: at most memoryBufferSize/expectedBinlogSize buffers are created.
+func (t *clusteringCompactionTask) switchPolicyForVectorPlan(centroids *clusteringpb.ClusteringCentroidsStats) error {
+	bufferNum := len(centroids.GetCentroids())
+	bufferNumByMemory := int(t.memoryBufferSize / expectedBinlogSize)
+	if bufferNumByMemory < bufferNum {
+		bufferNum = bufferNumByMemory
+	}
+	return t.generatedVectorPlan(bufferNum, centroids.GetCentroids())
+}
+
 func (t *clusteringCompactionTask) getVectorAnalyzeResult(ctx context.Context) error {
 	ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, fmt.Sprintf("getVectorAnalyzeResult-%d", t.GetPlanID()))
 	defer span.End()
@@ -345,28 +414,7 @@ func (t *clusteringCompactionTask) getVectorAnalyzeResult(ctx context.Context) e
 	log.Debug("read clustering centroids stats", zap.String("path", centroidFilePath),
 		zap.Int("centroidNum", len(centroids.GetCentroids())),
 		zap.Any("offsetMappingFiles", t.segmentIDOffsetMapping))
-
-	for id, centroid := range centroids.GetCentroids() {
-		fieldStats, err := storage.NewFieldStats(t.clusteringKeyField.FieldID, t.clusteringKeyField.DataType, 0)
-		if err != nil {
-			return err
-		}
-		fieldStats.SetVectorCentroids(storage.NewVectorFieldValue(t.clusteringKeyField.DataType, centroid))
-		clusterBuffer := &ClusterBuffer{
-			id:                      id,
-			flushedRowNum:           map[typeutil.UniqueID]atomic.Int64{},
-			flushedBinlogs:          make(map[typeutil.UniqueID]map[typeutil.UniqueID]*datapb.FieldBinlog, 0),
-			uploadedSegments:        make([]*datapb.CompactionSegment, 0),
-			uploadedSegmentStats:    make(map[typeutil.UniqueID]storage.SegmentStats, 0),
-			clusteringKeyFieldStats: fieldStats,
-		}
-		t.refreshBufferWriterWithPack(clusterBuffer)
-		t.clusterBuffers = append(t.clusterBuffers, clusterBuffer)
-	}
-	t.offsetToBufferFunc = func(offset int64, idMapping []uint32) *ClusterBuffer {
-		return t.clusterBuffers[idMapping[offset]]
-	}
-	return nil
+	return t.switchPolicyForVectorPlan(centroids)
 }
 
 // mapping read and split input segments into buffers
@@ -525,6 +573,7 @@ func (t *clusteringCompactionTask) mappingSegment(
 		fieldBinlogPaths = append(fieldBinlogPaths, ps)
 	}
 
+	var offset int64 = -1
 	for _, paths := range fieldBinlogPaths {
 		allValues, err := t.binlogIO.Download(ctx, paths)
 		if err != nil {
@@ -540,7 +589,6 @@
 			return err
 		}
 
-		var offset int64 = -1
 		for {
 			err := pkIter.Next()
 			if err != nil {
@@ -1032,6 +1080,7 @@ func (t *clusteringCompactionTask) scalarAnalyze(ctx context.Context) (map[inter
 		zap.Int64("collectionID", t.GetCollection()),
 		zap.Int64("partitionID", t.partitionID),
 		zap.Int("segments", len(inputSegments)),
+		zap.Int("clusteringNum", len(analyzeDict)),
 		zap.Duration("elapse", time.Since(analyzeStart)))
 	return analyzeDict, nil
 }
@@ -1146,19 +1195,10 @@ func (t *clusteringCompactionTask) scalarAnalyzeSegment(
 	return analyzeResult, nil
 }
 
-func (t *clusteringCompactionTask) scalarPlan(dict map[interface{}]int64) [][]interface{} {
-	keys := lo.MapToSlice(dict, func(k interface{}, _ int64) interface{} {
-		return k
-	})
-	sort.Slice(keys, func(i, j int) bool {
-		return storage.NewScalarFieldValue(t.clusteringKeyField.DataType, keys[i]).LE(storage.NewScalarFieldValue(t.clusteringKeyField.DataType, keys[j]))
-	})
-
+// generatedScalarPlan buckets the sorted clustering keys so that each bucket
+// holds at most maxRows rows and is sealed once it reaches preferRows.
+func (t *clusteringCompactionTask) generatedScalarPlan(maxRows, preferRows int64, keys []interface{}, dict map[interface{}]int64) [][]interface{} {
 	buckets := make([][]interface{}, 0)
 	currentBucket := make([]interface{}, 0)
 	var currentBucketSize int64 = 0
-	maxRows := t.plan.MaxSegmentRows
-	preferRows := t.plan.PreferSegmentRows
 	for _, key := range keys {
 		// todo can optimize
 		if dict[key] > preferRows {
@@ -1186,6 +1226,33 @@ func (t *clusteringCompactionTask) scalarPlan(dict map[interface{}]int64) [][]in
 	return buckets
 }
 
+// switchPolicyForScalarPlan keeps the configured segment-size limits when the
+// memory budget allows at least totalRows/MaxSegmentRows buffers; otherwise it
+// grows maxRows so the bucket count fits bufferNumByMemory. It assumes the
+// budget covers at least one expectedBinlogSize (bufferNumByMemory >= 1),
+// otherwise the division below would be by zero.
+func (t *clusteringCompactionTask) switchPolicyForScalarPlan(totalRows int64, keys []interface{}, dict map[interface{}]int64) [][]interface{} {
+	bufferNumBySegmentMaxRows := totalRows / t.plan.MaxSegmentRows
+	bufferNumByMemory := t.memoryBufferSize / expectedBinlogSize
+	log.Info("switchPolicyForScalarPlan", zap.Int64("totalRows", totalRows),
+		zap.Int64("bufferNumBySegmentMaxRows", bufferNumBySegmentMaxRows),
+		zap.Int64("bufferNumByMemory", bufferNumByMemory))
+	if bufferNumByMemory > bufferNumBySegmentMaxRows {
+		return t.generatedScalarPlan(t.plan.GetMaxSegmentRows(), t.plan.GetPreferSegmentRows(), keys, dict)
+	}
+
+	maxRows := totalRows / bufferNumByMemory
+	return t.generatedScalarPlan(maxRows, int64(float64(maxRows)*paramtable.Get().DataCoordCfg.ClusteringCompactionPreferSegmentSizeRatio.GetAsFloat()), keys, dict)
+}
+
+func (t *clusteringCompactionTask) scalarPlan(dict map[interface{}]int64) [][]interface{} {
+	totalRows := int64(0)
+	keys := lo.MapToSlice(dict, func(k interface{}, v int64) interface{} {
+		totalRows += v
+		return k
+	})
+	sort.Slice(keys, func(i, j int) bool {
+		return storage.NewScalarFieldValue(t.clusteringKeyField.DataType, keys[i]).LE(storage.NewScalarFieldValue(t.clusteringKeyField.DataType, keys[j]))
+	})
+
+	return t.switchPolicyForScalarPlan(totalRows, keys, dict)
+}
+
 func (t *clusteringCompactionTask) refreshBufferWriterWithPack(buffer *ClusterBuffer) (bool, error) {
 	var segmentID int64
 	var err error
diff --git a/internal/datanode/compaction/clustering_compactor_test.go b/internal/datanode/compaction/clustering_compactor_test.go
index 66d59e3ab6285..673a426e318ad 100644
--- a/internal/datanode/compaction/clustering_compactor_test.go
+++ b/internal/datanode/compaction/clustering_compactor_test.go
@@ -19,6 +19,7 @@ package compaction
 import (
 	"context"
 	"fmt"
+	"sync"
 	"testing"
 	"time"
 
@@ -232,6 +233,81 @@ func (s *ClusteringCompactionTaskSuite) TestScalarCompactionNormal() {
 	s.Equal(totalRowNum, statsRowNum)
 }
 
+func (s *ClusteringCompactionTaskSuite) TestScalarCompactionNormalByMemoryLimit() {
+	schema := genCollectionSchema()
+	var segmentID int64 = 1001
+	segWriter, err := NewSegmentWriter(schema, 1000, segmentID, PartitionID, CollectionID)
+	s.Require().NoError(err)
+	for i := 0; i < 10240; i++ {
+		v := storage.Value{
+			PK:        storage.NewInt64PrimaryKey(int64(i)),
+			Timestamp: int64(tsoutil.ComposeTSByTime(getMilvusBirthday(), 0)),
+			Value:     genRow(int64(i)),
+		}
+		err = segWriter.Write(&v)
+		s.Require().NoError(err)
+	}
+	segWriter.FlushAndIsFull()
+
+	kvs, fBinlogs, err := serializeWrite(context.TODO(), s.mockAlloc, segWriter)
+	s.NoError(err)
+	var one sync.Once
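+	// sync.Once so the memory-budget override below is applied exactly once,
+	// on the first Download call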
+	s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).RunAndReturn(
+		func(ctx context.Context, strings []string) ([][]byte, error) {
+			// shrink the budget to 32 MB: with expectedBinlogSize = 16 MB,
+			// only two cluster buffers can be generated
+			one.Do(func() {
+				s.task.memoryBufferSize = 32 * 1024 * 1024
+			})
+			return lo.Values(kvs), nil
+		})
+
+	s.plan.SegmentBinlogs = []*datapb.CompactionSegmentBinlogs{
+		{
+			SegmentID:    segmentID,
+			FieldBinlogs: lo.Values(fBinlogs),
+		},
+	}
+
+	s.task.plan.Schema = genCollectionSchema()
+	s.task.plan.ClusteringKeyField = 100
+	s.task.plan.PreferSegmentRows = 3000
+	s.task.plan.MaxSegmentRows = 3000
+
+	// each row is 8+8+8+4+7+4*4 = 51 bytes, so 1024 rows occupy 51*1024 = 52224 bytes;
+	// with BinLogMaxSize set just below that, the writer automatically flushes after 1024 rows.
+	paramtable.Get().Save(paramtable.Get().DataNodeCfg.BinLogMaxSize.Key, "52223")
+	defer paramtable.Get().Reset(paramtable.Get().DataNodeCfg.BinLogMaxSize.Key)
+	paramtable.Get().Save(paramtable.Get().DataCoordCfg.ClusteringCompactionPreferSegmentSizeRatio.Key, "1")
+	defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.ClusteringCompactionPreferSegmentSizeRatio.Key)
+
+	compactionResult, err := s.task.Compact()
+	s.Require().NoError(err)
+	s.Equal(2, len(s.task.clusterBuffers))
+	s.Equal(4, len(compactionResult.GetSegments()))
+	totalBinlogNum := 0
+	totalRowNum := int64(0)
+	for _, fb := range compactionResult.GetSegments()[0].GetInsertLogs() {
+		for _, b := range fb.GetBinlogs() {
+			totalBinlogNum++
+			if fb.GetFieldID() == 100 {
+				totalRowNum += b.GetEntriesNum()
+			}
+		}
+	}
+	statsBinlogNum := 0
+	statsRowNum := int64(0)
+	for _, sb := range compactionResult.GetSegments()[0].GetField2StatslogPaths() {
+		for _, b := range sb.GetBinlogs() {
+			statsBinlogNum++
+			statsRowNum += b.GetEntriesNum()
+		}
+	}
+	s.Equal(3, totalBinlogNum/len(schema.GetFields()))
+	s.Equal(1, statsBinlogNum)
+	s.Equal(totalRowNum, statsRowNum)
+}
+
 func (s *ClusteringCompactionTaskSuite) TestCheckBuffersAfterCompaction() {
 	s.Run("no leak", func() {
 		task := &clusteringCompactionTask{clusterBuffers: []*ClusterBuffer{{}}}
diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go
index 86467ac47488c..96373172bd5c5 100644
--- a/pkg/util/paramtable/component_param.go
+++ b/pkg/util/paramtable/component_param.go
@@ -4537,7 +4537,7 @@ if this parameter <= 0, will set it as 10`,
 		Key:          "dataNode.clusteringCompaction.memoryBufferRatio",
 		Version:      "2.4.6",
 		Doc:          "The ratio of memory buffer of clustering compaction. Data larger than threshold will be flushed to storage.",
-		DefaultValue: "0.1",
+		DefaultValue: "0.3",
 		PanicIfEmpty: false,
 		Export:       true,
 	}
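
For reference, a minimal, runnable sketch of the buffer-count policy this diff introduces. The helper names vectorBufferNum and scalarMaxRows are hypothetical paraphrases of switchPolicyForVectorPlan and switchPolicyForScalarPlan, and the sketch assumes memoryBufferSize >= expectedBinlogSize so that both divisions are non-zero.

package main

import "fmt"

const expectedBinlogSize = 16 * 1024 * 1024 // assumed binlog size, as in the diff

// vectorBufferNum caps "one buffer per centroid" by the memory budget.
func vectorBufferNum(centroidNum int, memoryBufferSize int64) int {
	byMemory := int(memoryBufferSize / expectedBinlogSize)
	if byMemory < centroidNum {
		return byMemory
	}
	return centroidNum
}

// scalarMaxRows returns the per-bucket row cap: the configured MaxSegmentRows
// when the memory budget allows at least totalRows/maxSegmentRows buffers,
// otherwise larger buckets so that the bucket count fits the memory budget.
func scalarMaxRows(totalRows, maxSegmentRows, memoryBufferSize int64) int64 {
	byMemory := memoryBufferSize / expectedBinlogSize
	if byMemory > totalRows/maxSegmentRows {
		return maxSegmentRows
	}
	return totalRows / byMemory
}

func main() {
	fmt.Println(vectorBufferNum(16, 64*1024*1024))        // 4: a 64 MB budget caps 16 centroids at 4 buffers
	fmt.Println(scalarMaxRows(10240, 3000, 32*1024*1024)) // 5120: the two-buffer case exercised by the test above
}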