diff --git a/configs/milvus.yaml b/configs/milvus.yaml
index e154a5d7199ab..13571edd7ba03 100644
--- a/configs/milvus.yaml
+++ b/configs/milvus.yaml
@@ -384,9 +384,9 @@ dataCoord:
     balanceSilentDuration: 300 # The duration before the channelBalancer on datacoord to run
     balanceInterval: 360 #The interval for the channelBalancer on datacoord to check balance status
   segment:
-    maxSize: 512 # Maximum size of a segment in MB
+    maxSize: 1024 # Maximum size of a segment in MB
     diskSegmentMaxSize: 2048 # Maximum size of a segment in MB for collection which has Disk index
-    sealProportion: 0.23
+    sealProportion: 0.12
     # The time of the assignment expiration in ms
     # Warning! this parameter is an expert variable and closely related to data integrity. Without specific
     # target and solid understanding of the scenarios, it should not be changed. If it's necessary to alter
diff --git a/internal/datacoord/compaction_trigger.go b/internal/datacoord/compaction_trigger.go
index 46c2f7ba63a5a..a8f143d792665 100644
--- a/internal/datacoord/compaction_trigger.go
+++ b/internal/datacoord/compaction_trigger.go
@@ -28,11 +28,14 @@ import (

     "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
     "github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
+    "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
+    "github.com/milvus-io/milvus/internal/metastore/model"
     "github.com/milvus-io/milvus/internal/proto/datapb"
     "github.com/milvus-io/milvus/pkg/log"
     "github.com/milvus-io/milvus/pkg/util/indexparamcheck"
     "github.com/milvus-io/milvus/pkg/util/logutil"
     "github.com/milvus-io/milvus/pkg/util/tsoutil"
+    "github.com/milvus-io/milvus/pkg/util/typeutil"
 )

 type compactTime struct {
@@ -315,31 +318,44 @@ func (t *compactionTrigger) updateSegmentMaxSize(segments []*SegmentInfo) (bool,
     collectionID := segments[0].GetCollectionID()
     indexInfos := t.meta.GetIndexesForCollection(segments[0].GetCollectionID(), "")

-    isDiskANN := false
-    for _, indexInfo := range indexInfos {
-        indexType := getIndexType(indexInfo.IndexParams)
-        if indexType == indexparamcheck.IndexDISKANN {
-            // If index type is DiskANN, recalc segment max size here.
-            isDiskANN = true
-            newMaxRows, err := t.reCalcSegmentMaxNumOfRows(collectionID, true)
-            if err != nil {
-                return false, err
-            }
-            if len(segments) > 0 && int64(newMaxRows) != segments[0].GetMaxRowNum() {
-                log.Info("segment max rows recalculated for DiskANN collection",
-                    zap.Int64("old max rows", segments[0].GetMaxRowNum()),
-                    zap.Int64("new max rows", int64(newMaxRows)))
-                for _, segment := range segments {
-                    segment.MaxRowNum = int64(newMaxRows)
-                }
+    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+    defer cancel()
+    collMeta, err := t.handler.GetCollection(ctx, collectionID)
+    if err != nil {
+        return false, fmt.Errorf("failed to get collection %d", collectionID)
+    }
+    vectorFields := typeutil.GetVectorFieldSchemas(collMeta.Schema)
+    fieldIndexTypes := lo.SliceToMap(indexInfos, func(t *model.Index) (int64, indexparamcheck.IndexType) {
+        return t.FieldID, getIndexType(t.IndexParams)
+    })
+    vectorFieldsWithDiskIndex := lo.Filter(vectorFields, func(field *schemapb.FieldSchema, _ int) bool {
+        if indexType, ok := fieldIndexTypes[field.FieldID]; ok {
+            return indexparamcheck.IsDiskIndex(indexType)
+        }
+        return false
+    })
+
+    allDiskIndex := len(vectorFields) == len(vectorFieldsWithDiskIndex)
+    if allDiskIndex {
+        // Only if all vector fields' index types are DiskANN, recalc segment max size here.
+        newMaxRows, err := t.reCalcSegmentMaxNumOfRows(collectionID, true)
+        if err != nil {
+            return false, err
+        }
+        if len(segments) > 0 && int64(newMaxRows) != segments[0].GetMaxRowNum() {
+            log.Info("segment max rows recalculated for DiskANN collection",
+                zap.Int64("old max rows", segments[0].GetMaxRowNum()),
+                zap.Int64("new max rows", int64(newMaxRows)))
+            for _, segment := range segments {
+                segment.MaxRowNum = int64(newMaxRows)
             }
         }
     }
-    // If index type is not DiskANN, recalc segment max size using default policy.
-    if !isDiskANN && !t.testingOnly {
+    // If some vector fields' index types are not DiskANN, recalc segment max size using the default policy.
+    if !allDiskIndex && !t.testingOnly {
         newMaxRows, err := t.reCalcSegmentMaxNumOfRows(collectionID, false)
         if err != nil {
-            return isDiskANN, err
+            return allDiskIndex, err
         }
         if len(segments) > 0 && int64(newMaxRows) != segments[0].GetMaxRowNum() {
             log.Info("segment max rows recalculated for non-DiskANN collection",
@@ -350,7 +366,7 @@ func (t *compactionTrigger) updateSegmentMaxSize(segments []*SegmentInfo) (bool,
             }
         }
     }
-    return isDiskANN, nil
+    return allDiskIndex, nil
 }

 func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) error {
diff --git a/internal/datacoord/compaction_trigger_test.go b/internal/datacoord/compaction_trigger_test.go
index 4d14186f83059..86da5cfacb4d9 100644
--- a/internal/datacoord/compaction_trigger_test.go
+++ b/internal/datacoord/compaction_trigger_test.go
@@ -24,6 +24,7 @@ import (
     "time"

     "github.com/cockroachdb/errors"
+    "github.com/samber/lo"
     "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/mock"
     "github.com/stretchr/testify/suite"
@@ -33,6 +34,7 @@ import (
     "github.com/milvus-io/milvus/internal/metastore/model"
     "github.com/milvus-io/milvus/internal/proto/datapb"
     "github.com/milvus-io/milvus/pkg/common"
+    "github.com/milvus-io/milvus/pkg/util/indexparamcheck"
     "github.com/milvus-io/milvus/pkg/util/tsoutil"
 )

@@ -1745,7 +1747,7 @@ func Test_compactionTrigger_shouldDoSingleCompaction(t *testing.T) {
     assert.True(t, couldDo)

     // if only 10 bin logs, then disk index won't trigger compaction
-    info.Statslogs = binlogs[0:20]
+    info.Statslogs = binlogs[0:40]
     couldDo = trigger.ShouldDoSingleCompaction(info, false, &compactTime{})
     assert.True(t, couldDo)

@@ -2224,7 +2226,7 @@ func (s *CompactionTriggerSuite) TestHandleSignal() {
         defer s.SetupTest()
         tr := s.tr
         s.compactionHandler.EXPECT().isFull().Return(false)
-        s.allocator.EXPECT().allocTimestamp(mock.Anything).Return(10000, nil)
+        // s.allocator.EXPECT().allocTimestamp(mock.Anything).Return(10000, nil)
         s.handler.EXPECT().GetCollection(mock.Anything, int64(100)).Return(nil, errors.New("mocked"))
         tr.handleSignal(&compactionSignal{
             segmentID: 1,
@@ -2246,6 +2248,14 @@ func (s *CompactionTriggerSuite) TestHandleSignal() {
             Properties: map[string]string{
                 common.CollectionAutoCompactionKey: "bad_value",
             },
+            Schema: &schemapb.CollectionSchema{
+                Fields: []*schemapb.FieldSchema{
+                    {
+                        FieldID: s.vecFieldID,
+                        DataType: schemapb.DataType_FloatVector,
+                    },
+                },
+            },
         }, nil)
         tr.handleSignal(&compactionSignal{
             segmentID: 1,
@@ -2319,6 +2329,30 @@ func (s *CompactionTriggerSuite) TestHandleSignal() {
 }

 func (s *CompactionTriggerSuite) TestHandleGlobalSignal() {
+    schema := &schemapb.CollectionSchema{
+        Fields: []*schemapb.FieldSchema{
+            {
+                FieldID: common.StartOfUserFieldID,
+                DataType: schemapb.DataType_FloatVector,
+                TypeParams: []*commonpb.KeyValuePair{
+                    {
+                        Key: common.DimKey,
+                        Value: "128",
+                    },
+                },
+            },
+            {
+                FieldID: common.StartOfUserFieldID + 1,
+                DataType: schemapb.DataType_FloatVector,
+                TypeParams: []*commonpb.KeyValuePair{
+                    {
+                        Key: common.DimKey,
+                        Value: "128",
+                    },
+                },
+            },
+        },
+    }
     s.Run("getCompaction_failed", func() {
         defer s.SetupTest()
         tr := s.tr
@@ -2342,6 +2376,7 @@ func (s *CompactionTriggerSuite) TestHandleGlobalSignal() {
         s.compactionHandler.EXPECT().isFull().Return(false)
         s.allocator.EXPECT().allocTimestamp(mock.Anything).Return(10000, nil)
         s.handler.EXPECT().GetCollection(mock.Anything, int64(100)).Return(&collectionInfo{
+            Schema: schema,
             Properties: map[string]string{
                 common.CollectionAutoCompactionKey: "bad_value",
             },
@@ -2363,6 +2398,7 @@ func (s *CompactionTriggerSuite) TestHandleGlobalSignal() {
         s.compactionHandler.EXPECT().isFull().Return(false)
         s.allocator.EXPECT().allocTimestamp(mock.Anything).Return(10000, nil)
         s.handler.EXPECT().GetCollection(mock.Anything, int64(100)).Return(&collectionInfo{
+            Schema: schema,
             Properties: map[string]string{
                 common.CollectionAutoCompactionKey: "false",
             },
@@ -2385,6 +2421,7 @@ func (s *CompactionTriggerSuite) TestHandleGlobalSignal() {
         s.allocator.EXPECT().allocTimestamp(mock.Anything).Return(10000, nil)
         s.allocator.EXPECT().allocID(mock.Anything).Return(20000, nil)
         s.handler.EXPECT().GetCollection(mock.Anything, int64(100)).Return(&collectionInfo{
+            Schema: schema,
             Properties: map[string]string{
                 common.CollectionAutoCompactionKey: "false",
             },
@@ -2400,6 +2437,285 @@ func (s *CompactionTriggerSuite) TestHandleGlobalSignal() {
     })
 }

+// test updateSegmentMaxSize
+func Test_compactionTrigger_updateSegmentMaxSize(t *testing.T) {
+    type fields struct {
+        meta *meta
+        allocator allocator
+        signals chan *compactionSignal
+        compactionHandler compactionPlanContext
+        globalTrigger *time.Ticker
+    }
+    type args struct {
+        collectionID int64
+        compactTime *compactTime
+    }
+    collectionID := int64(2)
+    vecFieldID1 := int64(201)
+    vecFieldID2 := int64(202)
+    segmentInfos := make([]*SegmentInfo, 0)
+    for i := UniqueID(0); i < 50; i++ {
+        info := &SegmentInfo{
+            SegmentInfo: &datapb.SegmentInfo{
+                ID: i,
+                CollectionID: collectionID,
+            },
+            segmentIndexes: map[UniqueID]*model.SegmentIndex{
+                indexID: {
+                    SegmentID: i,
+                    CollectionID: collectionID,
+                    PartitionID: 1,
+                    NumRows: 100,
+                    IndexID: indexID,
+                    BuildID: i,
+                    NodeID: 0,
+                    IndexVersion: 1,
+                    IndexState: commonpb.IndexState_Finished,
+                },
+            },
+        }
+        segmentInfos = append(segmentInfos, info)
+    }
+    segmentsInfo := &SegmentsInfo{
+        segments: lo.SliceToMap(segmentInfos, func(t *SegmentInfo) (UniqueID, *SegmentInfo) {
+            return t.ID, t
+        }),
+    }
+    info := &collectionInfo{
+        ID: collectionID,
+        Schema: &schemapb.CollectionSchema{
+            Fields: []*schemapb.FieldSchema{
+                {
+                    FieldID: vecFieldID1,
+                    DataType: schemapb.DataType_FloatVector,
+                    TypeParams: []*commonpb.KeyValuePair{
+                        {
+                            Key: common.DimKey,
+                            Value: "128",
+                        },
+                    },
+                },
+                {
+                    FieldID: vecFieldID2,
+                    DataType: schemapb.DataType_FloatVector,
+                    TypeParams: []*commonpb.KeyValuePair{
+                        {
+                            Key: common.DimKey,
+                            Value: "128",
+                        },
+                    },
+                },
+            },
+        },
+    }
+
+    tests := []struct {
+        name string
+        fields fields
+        args args
+        isDiskANN bool
+    }{
+        {
+            "all mem index",
+            fields{
+                &meta{
+                    segments: segmentsInfo,
+                    collections: map[int64]*collectionInfo{
+                        collectionID: info,
+                    },
+                    indexes: map[UniqueID]map[UniqueID]*model.Index{
+                        collectionID: {
+                            indexID: {
+                                TenantID: "",
+                                CollectionID: 2,
+                                FieldID: vecFieldID1,
+                                IndexID: indexID,
+                                IndexName: "_default_idx_1",
+                                IsDeleted: false,
+                                CreateTime: 0,
+                                TypeParams: nil,
+                                IndexParams: []*commonpb.KeyValuePair{
+                                    {
+                                        Key: common.IndexTypeKey,
+                                        Value: "HNSW",
+                                    },
+                                },
+                                IsAutoIndex: false,
+                                UserIndexParams: nil,
+                            },
+                            indexID + 1: {
+                                TenantID: "",
+                                CollectionID: 2,
+                                FieldID: vecFieldID2,
+                                IndexID: indexID + 1,
+                                IndexName: "_default_idx_2",
+                                IsDeleted: false,
+                                CreateTime: 0,
+                                TypeParams: nil,
+                                IndexParams: []*commonpb.KeyValuePair{
+                                    {
+                                        Key: common.IndexTypeKey,
+                                        Value: "HNSW",
+                                    },
+                                },
+                                IsAutoIndex: false,
+                                UserIndexParams: nil,
+                            },
+                        },
+                    },
+                },
+                newMockAllocator(),
+                nil,
+                &spyCompactionHandler{spyChan: make(chan *datapb.CompactionPlan, 2)},
+                nil,
+            },
+            args{
+                collectionID,
+                &compactTime{},
+            },
+            false,
+        },
+        {
+            "all disk index",
+            fields{
+                &meta{
+                    segments: segmentsInfo,
+                    collections: map[int64]*collectionInfo{
+                        collectionID: info,
+                    },
+                    indexes: map[UniqueID]map[UniqueID]*model.Index{
+                        collectionID: {
+                            indexID: {
+                                TenantID: "",
+                                CollectionID: 2,
+                                FieldID: vecFieldID1,
+                                IndexID: indexID,
+                                IndexName: "_default_idx_1",
+                                IsDeleted: false,
+                                CreateTime: 0,
+                                TypeParams: nil,
+                                IndexParams: []*commonpb.KeyValuePair{
+                                    {
+                                        Key: common.IndexTypeKey,
+                                        Value: indexparamcheck.IndexDISKANN,
+                                    },
+                                },
+                                IsAutoIndex: false,
+                                UserIndexParams: nil,
+                            },
+                            indexID + 1: {
+                                TenantID: "",
+                                CollectionID: 2,
+                                FieldID: vecFieldID2,
+                                IndexID: indexID + 1,
+                                IndexName: "_default_idx_2",
+                                IsDeleted: false,
+                                CreateTime: 0,
+                                TypeParams: nil,
+                                IndexParams: []*commonpb.KeyValuePair{
+                                    {
+                                        Key: common.IndexTypeKey,
+                                        Value: indexparamcheck.IndexDISKANN,
+                                    },
+                                },
+                                IsAutoIndex: false,
+                                UserIndexParams: nil,
+                            },
+                        },
+                    },
+                },
+                newMockAllocator(),
+                nil,
+                &spyCompactionHandler{spyChan: make(chan *datapb.CompactionPlan, 2)},
+                nil,
+            },
+            args{
+                collectionID,
+                &compactTime{},
+            },
+            true,
+        },
+        {
+            "some mem index",
+            fields{
+                &meta{
+                    segments: segmentsInfo,
+                    collections: map[int64]*collectionInfo{
+                        collectionID: info,
+                    },
+                    indexes: map[UniqueID]map[UniqueID]*model.Index{
+                        collectionID: {
+                            indexID: {
+                                TenantID: "",
+                                CollectionID: 2,
+                                FieldID: vecFieldID1,
+                                IndexID: indexID,
+                                IndexName: "_default_idx_1",
+                                IsDeleted: false,
+                                CreateTime: 0,
+                                TypeParams: nil,
+                                IndexParams: []*commonpb.KeyValuePair{
+                                    {
+                                        Key: common.IndexTypeKey,
+                                        Value: indexparamcheck.IndexDISKANN,
+                                    },
+                                },
+                                IsAutoIndex: false,
+                                UserIndexParams: nil,
+                            },
+                            indexID + 1: {
+                                TenantID: "",
+                                CollectionID: 2,
+                                FieldID: vecFieldID2,
+                                IndexID: indexID + 1,
+                                IndexName: "_default_idx_2",
+                                IsDeleted: false,
+                                CreateTime: 0,
+                                TypeParams: nil,
+                                IndexParams: []*commonpb.KeyValuePair{
+                                    {
+                                        Key: common.IndexTypeKey,
+                                        Value: indexparamcheck.IndexHNSW,
+                                    },
+                                },
+                                IsAutoIndex: false,
+                                UserIndexParams: nil,
+                            },
+                        },
+                    },
+                },
+                newMockAllocator(),
+                nil,
+                &spyCompactionHandler{spyChan: make(chan *datapb.CompactionPlan, 2)},
+                nil,
+            },
+            args{
+                collectionID,
+                &compactTime{},
+            },
+            false,
+        },
+    }
+    for _, tt := range tests {
+        t.Run(tt.name, func(t *testing.T) {
+            tr := &compactionTrigger{
+                meta: tt.fields.meta,
+                handler: newMockHandlerWithMeta(tt.fields.meta),
+                allocator: tt.fields.allocator,
+                signals: tt.fields.signals,
+                compactionHandler: tt.fields.compactionHandler,
+                globalTrigger: tt.fields.globalTrigger,
+                estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex,
+                estimateNonDiskSegmentPolicy: calBySchemaPolicy,
+                testingOnly: true,
+            }
+            res, err := tr.updateSegmentMaxSize(segmentInfos)
+            assert.NoError(t, err)
+            assert.Equal(t, tt.isDiskANN, res)
+        })
+    }
+}
+
 func TestCompactionTriggerSuite(t *testing.T) {
     suite.Run(t, new(CompactionTriggerSuite))
 }
diff --git a/pkg/util/indexparamcheck/index_type.go b/pkg/util/indexparamcheck/index_type.go
index b6fb43049e72d..5bf0ad25d0100 100644
--- a/pkg/util/indexparamcheck/index_type.go
+++ b/pkg/util/indexparamcheck/index_type.go
@@ -48,3 +48,7 @@ func IsMmapSupported(indexType IndexType) bool {
         indexType == IndexFaissBinIvfFlat ||
         indexType == IndexHNSW
 }
+
+func IsDiskIndex(indexType IndexType) bool {
+    return indexType == IndexDISKANN
+}
diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go
index 869a1ffc63497..21fdec1662eca 100644
--- a/pkg/util/paramtable/component_param.go
+++ b/pkg/util/paramtable/component_param.go
@@ -2387,7 +2387,7 @@ func (p *dataCoordConfig) init(base *BaseTable) {
     p.SegmentMaxSize = ParamItem{
         Key: "dataCoord.segment.maxSize",
         Version: "2.0.0",
-        DefaultValue: "512",
+        DefaultValue: "1024",
         Doc: "Maximum size of a segment in MB",
         Export: true,
     }
@@ -2396,7 +2396,7 @@ func (p *dataCoordConfig) init(base *BaseTable) {
     p.DiskSegmentMaxSize = ParamItem{
         Key: "dataCoord.segment.diskSegmentMaxSize",
         Version: "2.0.0",
-        DefaultValue: "512",
+        DefaultValue: "2048",
         Doc: "Maximum size of a segment in MB for collection which has Disk index",
         Export: true,
     }
@@ -2405,7 +2405,7 @@ func (p *dataCoordConfig) init(base *BaseTable) {
     p.SegmentSealProportion = ParamItem{
         Key: "dataCoord.segment.sealProportion",
         Version: "2.0.0",
-        DefaultValue: "0.23",
+        DefaultValue: "0.12",
         Export: true,
     }
     p.SegmentSealProportion.Init(base.mgr)
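
A note for readers of this patch (not part of the diff): the behavioral core of the change is that a collection now only gets the larger disk-index segment size when *every* vector field carries a disk-based index (DiskANN); previously a single DiskANN index on any field was enough. On the config side, `maxSize` moves from 512 MB to 1024 MB while `sealProportion` moves from 0.23 to 0.12, which appears intended to keep the absolute seal threshold roughly unchanged (0.23 × 512 ≈ 118 MB vs. 0.12 × 1024 ≈ 123 MB), and `diskSegmentMaxSize` now defaults to 2048 MB.

Below is a minimal, self-contained Go sketch of the new all-disk-index decision. The types and helper names here are simplified stand-ins, not the real API: the production code walks `*schemapb.FieldSchema` and `*model.Index` values via `typeutil.GetVectorFieldSchemas`, `lo.SliceToMap`, and `lo.Filter`.

```go
package main

import "fmt"

// IndexType stands in for indexparamcheck.IndexType (a string-based type).
type IndexType = string

const (
	IndexDISKANN IndexType = "DISKANN"
	IndexHNSW    IndexType = "HNSW"
)

// isDiskIndex mirrors the new indexparamcheck.IsDiskIndex helper.
func isDiskIndex(t IndexType) bool {
	return t == IndexDISKANN
}

// allVectorFieldsDiskIndexed mirrors the decision in updateSegmentMaxSize:
// count the vector fields whose index is disk-based, and only treat the
// collection as disk-indexed when that count covers every vector field.
func allVectorFieldsDiskIndexed(vectorFieldIDs []int64, fieldIndexTypes map[int64]IndexType) bool {
	diskIndexed := 0
	for _, fieldID := range vectorFieldIDs {
		if t, ok := fieldIndexTypes[fieldID]; ok && isDiskIndex(t) {
			diskIndexed++
		}
	}
	return diskIndexed == len(vectorFieldIDs)
}

func main() {
	fields := []int64{201, 202} // hypothetical vector field IDs, as in the new test

	// Every vector field uses DiskANN: segments may grow to diskSegmentMaxSize (2048 MB).
	fmt.Println(allVectorFieldsDiskIndexed(fields,
		map[int64]IndexType{201: IndexDISKANN, 202: IndexDISKANN})) // true

	// Mixed indexes: fall back to the default policy and maxSize (1024 MB).
	fmt.Println(allVectorFieldsDiskIndexed(fields,
		map[int64]IndexType{201: IndexDISKANN, 202: IndexHNSW})) // false

	// A vector field with no index yet also disqualifies the disk-size path.
	fmt.Println(allVectorFieldsDiskIndexed(fields,
		map[int64]IndexType{201: IndexDISKANN})) // false
}
```

Like the production code, the sketch treats an unindexed vector field as non-disk, so a collection falls back to the default segment size until disk indexes exist on all of its vector fields.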