From 9b9f5ce9f73838bbdf040ab1466293dbe9849e56 Mon Sep 17 00:00:00 2001 From: wayblink Date: Mon, 24 Jun 2024 15:22:42 +0800 Subject: [PATCH] Add an option to enable/disable vector field clustering key Signed-off-by: wayblink --- configs/milvus.yaml | 2 + internal/proxy/task.go | 4 ++ internal/proxy/task_test.go | 35 +++++++++++++++ internal/util/clustering/clustering.go | 9 +++- pkg/util/merr/errors.go | 15 ++++--- pkg/util/merr/errors_test.go | 1 + pkg/util/merr/utils.go | 10 +++++ pkg/util/paramtable/component_param.go | 49 ++++++++++++--------- pkg/util/paramtable/component_param_test.go | 2 + 9 files changed, 99 insertions(+), 28 deletions(-) diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 9bb5f2963e359..38cdfe504504b 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -650,8 +650,10 @@ common: traceLogMode: 0 # trace request info bloomFilterSize: 100000 # bloom filter initial size maxBloomFalsePositive: 0.001 # max false positive rate for bloom filter + # clustering key/compaction related usePartitionKeyAsClusteringKey: false useVectorAsClusteringKey: false + enableVectorClusteringKey: false # QuotaConfig, configurations of Milvus quota and limits. # By default, we enable: diff --git a/internal/proxy/task.go b/internal/proxy/task.go index e2558030476d5..acf91ae9a63b2 100644 --- a/internal/proxy/task.go +++ b/internal/proxy/task.go @@ -248,6 +248,10 @@ func (t *createCollectionTask) validateClusteringKey() error { idx := -1 for i, field := range t.schema.Fields { if field.GetIsClusteringKey() { + if typeutil.IsVectorType(field.GetDataType()) && + !paramtable.Get().CommonCfg.EnableVectorClusteringKey.GetAsBool() { + return merr.WrapErrCollectionVectorClusteringKeyNotAllowed(t.CollectionName) + } if idx != -1 { return merr.WrapErrCollectionIllegalSchema(t.CollectionName, fmt.Sprintf("there are more than one clustering key, field name = %s, %s", t.schema.Fields[idx].Name, field.Name)) diff --git a/internal/proxy/task_test.go b/internal/proxy/task_test.go index 8d80338417a11..17644e99d98de 100644 --- a/internal/proxy/task_test.go +++ b/internal/proxy/task_test.go @@ -3568,6 +3568,41 @@ func TestClusteringKey(t *testing.T) { err = createCollectionTask.PreExecute(ctx) assert.Error(t, err) }) + + t.Run("create collection with vector clustering key", func(t *testing.T) { + fieldName2Type := make(map[string]schemapb.DataType) + fieldName2Type["int64_field"] = schemapb.DataType_Int64 + fieldName2Type["varChar_field"] = schemapb.DataType_VarChar + schema := constructCollectionSchemaByDataType(collectionName, fieldName2Type, "int64_field", false) + clusterKeyField := &schemapb.FieldSchema{ + Name: "vec_field", + DataType: schemapb.DataType_FloatVector, + IsClusteringKey: true, + } + schema.Fields = append(schema.Fields, clusterKeyField) + marshaledSchema, err := proto.Marshal(schema) + assert.NoError(t, err) + + createCollectionTask := &createCollectionTask{ + Condition: NewTaskCondition(ctx), + CreateCollectionRequest: &milvuspb.CreateCollectionRequest{ + Base: &commonpb.MsgBase{ + MsgID: UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt()), + Timestamp: Timestamp(time.Now().UnixNano()), + }, + DbName: "", + CollectionName: collectionName, + Schema: marshaledSchema, + ShardsNum: shardsNum, + }, + ctx: ctx, + rootCoord: rc, + result: nil, + schema: nil, + } + err = createCollectionTask.PreExecute(ctx) + assert.Error(t, err) + }) } func TestAlterCollectionCheckLoaded(t *testing.T) { diff --git a/internal/util/clustering/clustering.go b/internal/util/clustering/clustering.go index b9859922332d8..20b6636bca6aa 100644 --- a/internal/util/clustering/clustering.go +++ b/internal/util/clustering/clustering.go @@ -8,6 +8,7 @@ import ( "github.com/milvus-io/milvus/pkg/util/distance" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/typeutil" ) func CalcVectorDistance(dim int64, dataType schemapb.DataType, left []byte, right []float32, metric string) ([]float32, error) { @@ -70,10 +71,16 @@ func GetClusteringKeyField(collectionSchema *schemapb.CollectionSchema) *schemap // in some server mode, we regard partition key field or vector field as clustering key by default. // here is the priority: clusteringKey > partitionKey > vector field(only single vector) if clusteringKeyField != nil { + if typeutil.IsVectorType(clusteringKeyField.GetDataType()) && + !paramtable.Get().CommonCfg.EnableVectorClusteringKey.GetAsBool() { + return nil + } return clusteringKeyField } else if paramtable.Get().CommonCfg.UsePartitionKeyAsClusteringKey.GetAsBool() && partitionKeyField != nil { return partitionKeyField - } else if paramtable.Get().CommonCfg.UseVectorAsClusteringKey.GetAsBool() && len(vectorFields) == 1 { + } else if paramtable.Get().CommonCfg.EnableVectorClusteringKey.GetAsBool() && + paramtable.Get().CommonCfg.UseVectorAsClusteringKey.GetAsBool() && + len(vectorFields) == 1 { return vectorFields[0] } return nil diff --git a/pkg/util/merr/errors.go b/pkg/util/merr/errors.go index 6d6ef240cb101..3412f371cb1ce 100644 --- a/pkg/util/merr/errors.go +++ b/pkg/util/merr/errors.go @@ -46,13 +46,14 @@ var ( ErrServiceResourceInsufficient = newMilvusError("service resource insufficient", 12, true) // Collection related - ErrCollectionNotFound = newMilvusError("collection not found", 100, false) - ErrCollectionNotLoaded = newMilvusError("collection not loaded", 101, false) - ErrCollectionNumLimitExceeded = newMilvusError("exceeded the limit number of collections", 102, false) - ErrCollectionNotFullyLoaded = newMilvusError("collection not fully loaded", 103, true) - ErrCollectionLoaded = newMilvusError("collection already loaded", 104, false) - ErrCollectionIllegalSchema = newMilvusError("illegal collection schema", 105, false) - ErrCollectionOnRecovering = newMilvusError("collection on recovering", 106, true) + ErrCollectionNotFound = newMilvusError("collection not found", 100, false) + ErrCollectionNotLoaded = newMilvusError("collection not loaded", 101, false) + ErrCollectionNumLimitExceeded = newMilvusError("exceeded the limit number of collections", 102, false) + ErrCollectionNotFullyLoaded = newMilvusError("collection not fully loaded", 103, true) + ErrCollectionLoaded = newMilvusError("collection already loaded", 104, false) + ErrCollectionIllegalSchema = newMilvusError("illegal collection schema", 105, false) + ErrCollectionOnRecovering = newMilvusError("collection on recovering", 106, true) + ErrCollectionVectorClusteringKeyNotAllowed = newMilvusError("vector clustering key not allowed", 107, false) // Partition related ErrPartitionNotFound = newMilvusError("partition not found", 200, false) diff --git a/pkg/util/merr/errors_test.go b/pkg/util/merr/errors_test.go index 67782d1c507e1..125a2e72f91a2 100644 --- a/pkg/util/merr/errors_test.go +++ b/pkg/util/merr/errors_test.go @@ -87,6 +87,7 @@ func (s *ErrSuite) TestWrap() { s.ErrorIs(WrapErrCollectionNotFullyLoaded("test_collection", "failed to query"), ErrCollectionNotFullyLoaded) s.ErrorIs(WrapErrCollectionNotLoaded("test_collection", "failed to alter index %s", "hnsw"), ErrCollectionNotLoaded) s.ErrorIs(WrapErrCollectionOnRecovering("test_collection", "channel lost %s", "dev"), ErrCollectionOnRecovering) + s.ErrorIs(WrapErrCollectionVectorClusteringKeyNotAllowed("test_collection", "field"), ErrCollectionVectorClusteringKeyNotAllowed) // Partition related s.ErrorIs(WrapErrPartitionNotFound("test_partition", "failed to get partition"), ErrPartitionNotFound) diff --git a/pkg/util/merr/utils.go b/pkg/util/merr/utils.go index 89b6d795e673d..a598e8056a0d4 100644 --- a/pkg/util/merr/utils.go +++ b/pkg/util/merr/utils.go @@ -489,6 +489,16 @@ func WrapErrCollectionOnRecovering(collection any, msgAndArgs ...any) error { return err } +// WrapErrCollectionVectorClusteringKeyNotAllowed wraps ErrCollectionVectorClusteringKeyNotAllowed with collection +func WrapErrCollectionVectorClusteringKeyNotAllowed(collection any, msgAndArgs ...any) error { + err := wrapFields(ErrCollectionVectorClusteringKeyNotAllowed, value("collection", collection)) + if len(msgAndArgs) > 0 { + msg := msgAndArgs[0].(string) + err = errors.Wrapf(err, msg, msgAndArgs[1:]...) + } + return err +} + func WrapErrAliasNotFound(db any, alias any, msg ...string) error { err := wrapFields(ErrAliasNotFound, value("database", db), diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index f18510566c57e..c5b4c4ba4cd6d 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -252,6 +252,7 @@ type commonConfig struct { UsePartitionKeyAsClusteringKey ParamItem `refreshable:"true"` UseVectorAsClusteringKey ParamItem `refreshable:"true"` + EnableVectorClusteringKey ParamItem `refreshable:"true"` } func (p *commonConfig) init(base *BaseTable) { @@ -777,7 +778,7 @@ like the old password verification when updating the credential`, p.UsePartitionKeyAsClusteringKey = ParamItem{ Key: "common.usePartitionKeyAsClusteringKey", - Version: "2.4.2", + Version: "2.4.6", Doc: "if true, do clustering compaction and segment prune on partition key field", DefaultValue: "false", } @@ -785,11 +786,19 @@ like the old password verification when updating the credential`, p.UseVectorAsClusteringKey = ParamItem{ Key: "common.useVectorAsClusteringKey", - Version: "2.4.2", + Version: "2.4.6", Doc: "if true, do clustering compaction and segment prune on vector field", DefaultValue: "false", } p.UseVectorAsClusteringKey.Init(base.mgr) + + p.EnableVectorClusteringKey = ParamItem{ + Key: "common.enableVectorClusteringKey", + Version: "2.4.6", + Doc: "if true, enable vector clustering key and vector clustering compaction", + DefaultValue: "false", + } + p.EnableVectorClusteringKey.Init(base.mgr) } type gpuConfig struct { @@ -3289,7 +3298,7 @@ During compaction, the size of segment # of rows is able to exceed segment max # p.ClusteringCompactionEnable = ParamItem{ Key: "dataCoord.compaction.clustering.enable", - Version: "2.4.2", + Version: "2.4.6", DefaultValue: "false", Doc: "Enable clustering compaction", Export: true, @@ -3298,7 +3307,7 @@ During compaction, the size of segment # of rows is able to exceed segment max # p.ClusteringCompactionAutoEnable = ParamItem{ Key: "dataCoord.compaction.clustering.autoEnable", - Version: "2.4.2", + Version: "2.4.6", DefaultValue: "false", Doc: "Enable auto clustering compaction", Export: true, @@ -3307,28 +3316,28 @@ During compaction, the size of segment # of rows is able to exceed segment max # p.ClusteringCompactionTriggerInterval = ParamItem{ Key: "dataCoord.compaction.clustering.triggerInterval", - Version: "2.4.2", + Version: "2.4.6", DefaultValue: "600", } p.ClusteringCompactionTriggerInterval.Init(base.mgr) p.ClusteringCompactionStateCheckInterval = ParamItem{ Key: "dataCoord.compaction.clustering.stateCheckInterval", - Version: "2.4.2", + Version: "2.4.6", DefaultValue: "10", } p.ClusteringCompactionStateCheckInterval.Init(base.mgr) p.ClusteringCompactionGCInterval = ParamItem{ Key: "dataCoord.compaction.clustering.gcInterval", - Version: "2.4.2", + Version: "2.4.6", DefaultValue: "600", } p.ClusteringCompactionGCInterval.Init(base.mgr) p.ClusteringCompactionMinInterval = ParamItem{ Key: "dataCoord.compaction.clustering.minInterval", - Version: "2.4.2", + Version: "2.4.6", Doc: "The minimum interval between clustering compaction executions of one collection, to avoid redundant compaction", DefaultValue: "3600", } @@ -3336,7 +3345,7 @@ During compaction, the size of segment # of rows is able to exceed segment max # p.ClusteringCompactionMaxInterval = ParamItem{ Key: "dataCoord.compaction.clustering.maxInterval", - Version: "2.4.2", + Version: "2.4.6", Doc: "If a collection haven't been clustering compacted for longer than maxInterval, force compact", DefaultValue: "86400", } @@ -3344,7 +3353,7 @@ During compaction, the size of segment # of rows is able to exceed segment max # p.ClusteringCompactionNewDataSizeThreshold = ParamItem{ Key: "dataCoord.compaction.clustering.newDataSizeThreshold", - Version: "2.4.2", + Version: "2.4.6", Doc: "If new data size is large than newDataSizeThreshold, execute clustering compaction", DefaultValue: "512m", } @@ -3352,14 +3361,14 @@ During compaction, the size of segment # of rows is able to exceed segment max # p.ClusteringCompactionTimeoutInSeconds = ParamItem{ Key: "dataCoord.compaction.clustering.timeout", - Version: "2.4.2", + Version: "2.4.6", DefaultValue: "3600", } p.ClusteringCompactionTimeoutInSeconds.Init(base.mgr) p.ClusteringCompactionDropTolerance = ParamItem{ Key: "dataCoord.compaction.clustering.dropTolerance", - Version: "2.4.2", + Version: "2.4.6", Doc: "If clustering compaction job is finished for a long time, gc it", DefaultValue: "259200", } @@ -3367,7 +3376,7 @@ During compaction, the size of segment # of rows is able to exceed segment max # p.ClusteringCompactionPreferSegmentSize = ParamItem{ Key: "dataCoord.compaction.clustering.preferSegmentSize", - Version: "2.4.2", + Version: "2.4.6", DefaultValue: "512m", PanicIfEmpty: false, Export: true, @@ -3376,7 +3385,7 @@ During compaction, the size of segment # of rows is able to exceed segment max # p.ClusteringCompactionMaxSegmentSize = ParamItem{ Key: "dataCoord.compaction.clustering.maxSegmentSize", - Version: "2.4.2", + Version: "2.4.6", DefaultValue: "1024m", PanicIfEmpty: false, Export: true, @@ -3385,7 +3394,7 @@ During compaction, the size of segment # of rows is able to exceed segment max # p.ClusteringCompactionMaxTrainSizeRatio = ParamItem{ Key: "dataCoord.compaction.clustering.maxTrainSizeRatio", - Version: "2.4.2", + Version: "2.4.6", DefaultValue: "0.8", Doc: "max data size ratio in Kmeans train, if larger than it, will down sampling to meet this limit", Export: true, @@ -3394,7 +3403,7 @@ During compaction, the size of segment # of rows is able to exceed segment max # p.ClusteringCompactionMaxCentroidsNum = ParamItem{ Key: "dataCoord.compaction.clustering.maxCentroidsNum", - Version: "2.4.2", + Version: "2.4.6", DefaultValue: "10240", Doc: "maximum centroids number in Kmeans train", Export: true, @@ -3403,7 +3412,7 @@ During compaction, the size of segment # of rows is able to exceed segment max # p.ClusteringCompactionMinCentroidsNum = ParamItem{ Key: "dataCoord.compaction.clustering.minCentroidsNum", - Version: "2.4.2", + Version: "2.4.6", DefaultValue: "16", Doc: "minimum centroids number in Kmeans train", Export: true, @@ -3412,7 +3421,7 @@ During compaction, the size of segment # of rows is able to exceed segment max # p.ClusteringCompactionMinClusterSizeRatio = ParamItem{ Key: "dataCoord.compaction.clustering.minClusterSizeRatio", - Version: "2.4.2", + Version: "2.4.6", DefaultValue: "0.01", Doc: "minimum cluster size / avg size in Kmeans train", Export: true, @@ -3421,7 +3430,7 @@ During compaction, the size of segment # of rows is able to exceed segment max # p.ClusteringCompactionMaxClusterSizeRatio = ParamItem{ Key: "dataCoord.compaction.clustering.maxClusterSizeRatio", - Version: "2.4.2", + Version: "2.4.6", DefaultValue: "10", Doc: "maximum cluster size / avg size in Kmeans train", Export: true, @@ -3430,7 +3439,7 @@ During compaction, the size of segment # of rows is able to exceed segment max # p.ClusteringCompactionMaxClusterSize = ParamItem{ Key: "dataCoord.compaction.clustering.maxClusterSize", - Version: "2.4.2", + Version: "2.4.6", DefaultValue: "5g", Doc: "maximum cluster size in Kmeans train", Export: true, diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index 6b41f34cd989b..234b0e38558c6 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -543,6 +543,8 @@ func TestComponentParam(t *testing.T) { assert.Equal(t, true, Params.UsePartitionKeyAsClusteringKey.GetAsBool()) params.Save("common.useVectorAsClusteringKey", "true") assert.Equal(t, true, Params.UseVectorAsClusteringKey.GetAsBool()) + params.Save("common.enableVectorClusteringKey", "true") + assert.Equal(t, true, Params.EnableVectorClusteringKey.GetAsBool()) }) }