Skip to content

Commit

Permalink
enhance: Set segment.maxSize param to 1024M (milvus-io#30139)
Browse files Browse the repository at this point in the history
issue: milvus-io#25639 
/kind improvement

When the number of vector columns increases, the number of rows per
segment will decrease. In order to reduce the impact on vector indexing
performance, it is necessary to increase the segment max limit.

If a collection has multiple vector fields with memory and disk indices
on different vector fields, the size limit after segment compaction is
the minimum of segment.maxSize and segment.diskSegmentMaxSize.

Signed-off-by: xige-16 <[email protected]>

---------

Signed-off-by: xige-16 <[email protected]>
  • Loading branch information
xige-16 authored Jan 29, 2024
1 parent 927d310 commit 033eae9
Show file tree
Hide file tree
Showing 5 changed files with 364 additions and 28 deletions.
4 changes: 2 additions & 2 deletions configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -384,9 +384,9 @@ dataCoord:
balanceSilentDuration: 300 # The duration before the channelBalancer on datacoord to run
balanceInterval: 360 #The interval for the channelBalancer on datacoord to check balance status
segment:
maxSize: 512 # Maximum size of a segment in MB
maxSize: 1024 # Maximum size of a segment in MB
diskSegmentMaxSize: 2048 # Maximum size of a segment in MB for collection which has Disk index
sealProportion: 0.23
sealProportion: 0.12
# The time of the assignment expiration in ms
# Warning! this parameter is an expert variable and closely related to data integrity. Without specific
# target and solid understanding of the scenarios, it should not be changed. If it's necessary to alter
Expand Down
58 changes: 37 additions & 21 deletions internal/datacoord/compaction_trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,14 @@ import (

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
"github.com/milvus-io/milvus/pkg/util/logutil"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

type compactTime struct {
Expand Down Expand Up @@ -315,31 +318,44 @@ func (t *compactionTrigger) updateSegmentMaxSize(segments []*SegmentInfo) (bool,
collectionID := segments[0].GetCollectionID()
indexInfos := t.meta.GetIndexesForCollection(segments[0].GetCollectionID(), "")

isDiskANN := false
for _, indexInfo := range indexInfos {
indexType := getIndexType(indexInfo.IndexParams)
if indexType == indexparamcheck.IndexDISKANN {
// If index type is DiskANN, recalc segment max size here.
isDiskANN = true
newMaxRows, err := t.reCalcSegmentMaxNumOfRows(collectionID, true)
if err != nil {
return false, err
}
if len(segments) > 0 && int64(newMaxRows) != segments[0].GetMaxRowNum() {
log.Info("segment max rows recalculated for DiskANN collection",
zap.Int64("old max rows", segments[0].GetMaxRowNum()),
zap.Int64("new max rows", int64(newMaxRows)))
for _, segment := range segments {
segment.MaxRowNum = int64(newMaxRows)
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
collMeta, err := t.handler.GetCollection(ctx, collectionID)
if err != nil {
return false, fmt.Errorf("failed to get collection %d", collectionID)
}
vectorFields := typeutil.GetVectorFieldSchemas(collMeta.Schema)
fieldIndexTypes := lo.SliceToMap(indexInfos, func(t *model.Index) (int64, indexparamcheck.IndexType) {
return t.FieldID, getIndexType(t.IndexParams)
})
vectorFieldsWithDiskIndex := lo.Filter(vectorFields, func(field *schemapb.FieldSchema, _ int) bool {
if indexType, ok := fieldIndexTypes[field.FieldID]; ok {
return indexparamcheck.IsDiskIndex(indexType)
}
return false
})

allDiskIndex := len(vectorFields) == len(vectorFieldsWithDiskIndex)
if allDiskIndex {
// Only if all vector fields index type are DiskANN, recalc segment max size here.
newMaxRows, err := t.reCalcSegmentMaxNumOfRows(collectionID, true)
if err != nil {
return false, err
}
if len(segments) > 0 && int64(newMaxRows) != segments[0].GetMaxRowNum() {
log.Info("segment max rows recalculated for DiskANN collection",
zap.Int64("old max rows", segments[0].GetMaxRowNum()),
zap.Int64("new max rows", int64(newMaxRows)))
for _, segment := range segments {
segment.MaxRowNum = int64(newMaxRows)
}
}
}
// If index type is not DiskANN, recalc segment max size using default policy.
if !isDiskANN && !t.testingOnly {
// If some vector fields index type are not DiskANN, recalc segment max size using default policy.
if !allDiskIndex && !t.testingOnly {
newMaxRows, err := t.reCalcSegmentMaxNumOfRows(collectionID, false)
if err != nil {
return isDiskANN, err
return allDiskIndex, err
}
if len(segments) > 0 && int64(newMaxRows) != segments[0].GetMaxRowNum() {
log.Info("segment max rows recalculated for non-DiskANN collection",
Expand All @@ -350,7 +366,7 @@ func (t *compactionTrigger) updateSegmentMaxSize(segments []*SegmentInfo) (bool,
}
}
}
return isDiskANN, nil
return allDiskIndex, nil
}

func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) error {
Expand Down
Loading

0 comments on commit 033eae9

Please sign in to comment.