Skip to content

Commit

Permalink
fix: Check clustering key skip load behavior (milvus-io#35865)
Browse files Browse the repository at this point in the history
feature issue: milvus-io#35415
See also milvus-io#35861

Signed-off-by: Congqi Xia <[email protected]>
  • Loading branch information
congqixia authored Sep 2, 2024
1 parent 576ac2b commit 9d80137
Show file tree
Hide file tree
Showing 6 changed files with 216 additions and 15 deletions.
9 changes: 8 additions & 1 deletion internal/proxy/meta_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,9 @@ func (s *schemaInfo) GetLoadFieldIDs(loadFields []string, skipDynamicField bool)
func (s *schemaInfo) validateLoadFields(names []string, fields []*schemapb.FieldSchema) error {
// ignore error if not found
partitionKeyField, _ := s.schemaHelper.GetPartitionKeyField()
clusteringKeyField, _ := s.schemaHelper.GetClusteringKeyField()

var hasPrimaryKey, hasPartitionKey, hasVector bool
var hasPrimaryKey, hasPartitionKey, hasClusteringKey, hasVector bool
for _, field := range fields {
if field.GetFieldID() == s.pkField.GetFieldID() {
hasPrimaryKey = true
Expand All @@ -230,6 +231,9 @@ func (s *schemaInfo) validateLoadFields(names []string, fields []*schemapb.Field
if field.IsPartitionKey {
hasPartitionKey = true
}
if field.IsClusteringKey {
hasClusteringKey = true
}
}

if !hasPrimaryKey {
Expand All @@ -241,6 +245,9 @@ func (s *schemaInfo) validateLoadFields(names []string, fields []*schemapb.Field
if partitionKeyField != nil && !hasPartitionKey {
return merr.WrapErrParameterInvalidMsg("load field list %v does not contain partition key field %s", names, partitionKeyField.GetName())
}
if clusteringKeyField != nil && !hasClusteringKey {
return merr.WrapErrParameterInvalidMsg("load field list %v does not contain clsutering key field %s", names, clusteringKeyField.GetName())
}
return nil
}

Expand Down
31 changes: 28 additions & 3 deletions internal/proxy/meta_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1196,6 +1196,12 @@ func TestSchemaInfo_GetLoadFieldIDs(t *testing.T) {
DataType: schemapb.DataType_JSON,
IsDynamic: true,
}
clusteringKeyField := &schemapb.FieldSchema{
FieldID: common.StartOfUserFieldID + 5,
Name: "clustering_key",
DataType: schemapb.DataType_Int32,
IsClusteringKey: true,
}

testCases := []testCase{
{
Expand Down Expand Up @@ -1229,11 +1235,12 @@ func TestSchemaInfo_GetLoadFieldIDs(t *testing.T) {
partitionKeyField,
vectorField,
dynamicField,
clusteringKeyField,
},
},
loadFields: nil,
skipDynamicField: false,
expectResult: []int64{common.StartOfUserFieldID, common.StartOfUserFieldID + 2, common.StartOfUserFieldID + 3, common.StartOfUserFieldID + 4},
expectResult: []int64{common.StartOfUserFieldID, common.StartOfUserFieldID + 2, common.StartOfUserFieldID + 3, common.StartOfUserFieldID + 4, common.StartOfUserFieldID + 5},
expectErr: false,
},
{
Expand All @@ -1248,11 +1255,12 @@ func TestSchemaInfo_GetLoadFieldIDs(t *testing.T) {
partitionKeyField,
vectorField,
dynamicField,
clusteringKeyField,
},
},
loadFields: []string{"pk", "part_key", "vector"},
loadFields: []string{"pk", "part_key", "vector", "clustering_key"},
skipDynamicField: false,
expectResult: []int64{common.StartOfUserFieldID, common.StartOfUserFieldID + 2, common.StartOfUserFieldID + 3, common.StartOfUserFieldID + 4},
expectResult: []int64{common.StartOfUserFieldID, common.StartOfUserFieldID + 2, common.StartOfUserFieldID + 3, common.StartOfUserFieldID + 4, common.StartOfUserFieldID + 5},
expectErr: false,
},
{
Expand Down Expand Up @@ -1328,6 +1336,23 @@ func TestSchemaInfo_GetLoadFieldIDs(t *testing.T) {
skipDynamicField: true,
expectErr: true,
},
{
tag: "clustering_key_not_loaded",
schema: &schemapb.CollectionSchema{
EnableDynamicField: true,
Fields: []*schemapb.FieldSchema{
rowIDField,
timestampField,
pkField,
scalarField,
partitionKeyField,
vectorField,
clusteringKeyField,
},
},
loadFields: []string{"pk", "part_key", "vector"},
expectErr: true,
},
}

for _, tc := range testCases {
Expand Down
7 changes: 4 additions & 3 deletions internal/proxy/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -634,9 +634,6 @@ func validateMultipleVectorFields(schema *schemapb.CollectionSchema) error {
}

func validateLoadFieldsList(schema *schemapb.CollectionSchema) error {
// ignore error if not found
// partitionKeyField, _ := s.schemaHelper.GetPartitionKeyField()

var vectorCnt int
for _, field := range schema.Fields {
shouldLoad, err := common.ShouldFieldBeLoaded(field.GetTypeParams())
Expand All @@ -658,6 +655,10 @@ func validateLoadFieldsList(schema *schemapb.CollectionSchema) error {
if field.IsPartitionKey {
return merr.WrapErrParameterInvalidMsg("Partition Key field %s cannot skip loading", field.GetName())
}

if field.IsClusteringKey {
return merr.WrapErrParameterInvalidMsg("Clustering Key field %s cannot skip loading", field.GetName())
}
}

if vectorCnt == 0 {
Expand Down
24 changes: 24 additions & 0 deletions internal/proxy/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2521,6 +2521,12 @@ func TestValidateLoadFieldsList(t *testing.T) {
DataType: schemapb.DataType_JSON,
IsDynamic: true,
}
clusteringKeyField := &schemapb.FieldSchema{
FieldID: common.StartOfUserFieldID + 5,
Name: common.MetaFieldName,
DataType: schemapb.DataType_Int32,
IsClusteringKey: true,
}

addSkipLoadAttr := func(f *schemapb.FieldSchema, flag bool) *schemapb.FieldSchema {
result := typeutil.Clone(f)
Expand All @@ -2544,6 +2550,7 @@ func TestValidateLoadFieldsList(t *testing.T) {
partitionKeyField,
vectorField,
dynamicField,
clusteringKeyField,
},
},
expectErr: false,
Expand Down Expand Up @@ -2596,6 +2603,23 @@ func TestValidateLoadFieldsList(t *testing.T) {
},
expectErr: true,
},
{
tag: "clustering_key_not_loaded",
schema: &schemapb.CollectionSchema{
EnableDynamicField: true,
Fields: []*schemapb.FieldSchema{
rowIDField,
timestampField,
pkField,
scalarField,
partitionKeyField,
vectorField,
dynamicField,
addSkipLoadAttr(clusteringKeyField, true),
},
},
expectErr: true,
},
}

for _, tc := range testCases {
Expand Down
42 changes: 34 additions & 8 deletions pkg/util/typeutil/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,20 +251,30 @@ func EstimateEntitySize(fieldsData []*schemapb.FieldData, rowOffset int) (int, e

// SchemaHelper provides methods to get the schema of fields
type SchemaHelper struct {
schema *schemapb.CollectionSchema
nameOffset map[string]int
idOffset map[int64]int
primaryKeyOffset int
partitionKeyOffset int
dynamicFieldOffset int
loadFields Set[int64]
schema *schemapb.CollectionSchema
nameOffset map[string]int
idOffset map[int64]int
primaryKeyOffset int
partitionKeyOffset int
clusteringKeyOffset int
dynamicFieldOffset int
loadFields Set[int64]
}

func CreateSchemaHelperWithLoadFields(schema *schemapb.CollectionSchema, loadFields []int64) (*SchemaHelper, error) {
if schema == nil {
return nil, errors.New("schema is nil")
}
schemaHelper := SchemaHelper{schema: schema, nameOffset: make(map[string]int), idOffset: make(map[int64]int), primaryKeyOffset: -1, partitionKeyOffset: -1, dynamicFieldOffset: -1, loadFields: NewSet(loadFields...)}
schemaHelper := SchemaHelper{
schema: schema,
nameOffset: make(map[string]int),
idOffset: make(map[int64]int),
primaryKeyOffset: -1,
partitionKeyOffset: -1,
clusteringKeyOffset: -1,
dynamicFieldOffset: -1,
loadFields: NewSet(loadFields...),
}
for offset, field := range schema.Fields {
if _, ok := schemaHelper.nameOffset[field.Name]; ok {
return nil, fmt.Errorf("duplicated fieldName: %s", field.Name)
Expand All @@ -288,6 +298,13 @@ func CreateSchemaHelperWithLoadFields(schema *schemapb.CollectionSchema, loadFie
schemaHelper.partitionKeyOffset = offset
}

if field.IsClusteringKey {
if schemaHelper.clusteringKeyOffset != -1 {
return nil, errors.New("clustering key is not unique")
}
schemaHelper.clusteringKeyOffset = offset
}

if field.IsDynamic {
if schemaHelper.dynamicFieldOffset != -1 {
return nil, errors.New("dynamic field is not unique")
Expand Down Expand Up @@ -319,6 +336,15 @@ func (helper *SchemaHelper) GetPartitionKeyField() (*schemapb.FieldSchema, error
return helper.schema.Fields[helper.partitionKeyOffset], nil
}

// GetClusteringKeyField returns the schema of the clustering key.
// If not found, an error shall be returned.
func (helper *SchemaHelper) GetClusteringKeyField() (*schemapb.FieldSchema, error) {
if helper.clusteringKeyOffset == -1 {
return nil, fmt.Errorf("failed to get clustering key field: not clustering key in schema")
}
return helper.schema.Fields[helper.clusteringKeyOffset], nil
}

// GetDynamicField returns the field schema of dynamic field if exists.
// if there is no dynamic field defined in schema, error will be returned.
func (helper *SchemaHelper) GetDynamicField() (*schemapb.FieldSchema, error) {
Expand Down
118 changes: 118 additions & 0 deletions pkg/util/typeutil/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,124 @@ func TestSchemaHelper_GetDynamicField(t *testing.T) {
})
}

func TestSchemaHelper_GetClusteringKeyField(t *testing.T) {
t.Run("with_clustering_key", func(t *testing.T) {
sch := &schemapb.CollectionSchema{
Name: "testColl",
Description: "",
AutoID: false,
Fields: []*schemapb.FieldSchema{
{
FieldID: 100,
Name: "field_int64",
IsPrimaryKey: true,
DataType: schemapb.DataType_Int64,
},
{
FieldID: 101,
Name: "field_float_vector",
DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: "128",
},
},
},
{
FieldID: 102,
Name: "group",
DataType: schemapb.DataType_Int64,
IsClusteringKey: true,
},
},
}

helper, err := CreateSchemaHelper(sch)
require.NoError(t, err)

f, err := helper.GetClusteringKeyField()
assert.NoError(t, err)
assert.NotNil(t, f)
assert.EqualValues(t, 102, f.FieldID)
})

t.Run("without_clusteriny_key_schema", func(t *testing.T) {
sch := &schemapb.CollectionSchema{
Name: "testColl",
Description: "",
AutoID: false,
Fields: []*schemapb.FieldSchema{
{
FieldID: 100,
Name: "field_int64",
IsPrimaryKey: true,
DataType: schemapb.DataType_Int64,
},
{
FieldID: 101,
Name: "field_float_vector",
DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: "128",
},
},
},
},
}

helper, err := CreateSchemaHelper(sch)
require.NoError(t, err)

_, err = helper.GetClusteringKeyField()
assert.Error(t, err)
})

t.Run("multiple_dynamic_fields", func(t *testing.T) {
sch := &schemapb.CollectionSchema{
Name: "testColl",
Description: "",
AutoID: false,
Fields: []*schemapb.FieldSchema{
{
FieldID: 100,
Name: "field_int64",
IsPrimaryKey: true,
DataType: schemapb.DataType_Int64,
},
{
FieldID: 101,
Name: "field_float_vector",
DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: "128",
},
},
},
{
FieldID: 102,
Name: "group",
DataType: schemapb.DataType_Int64,
IsClusteringKey: true,
},
{
FieldID: 103,
Name: "batch",
DataType: schemapb.DataType_VarChar,
IsClusteringKey: true,
},
},
}

_, err := CreateSchemaHelper(sch)
assert.Error(t, err)
})
}

func TestSchema_invalid(t *testing.T) {
t.Run("Duplicate field name", func(t *testing.T) {
schema := &schemapb.CollectionSchema{
Expand Down

0 comments on commit 9d80137

Please sign in to comment.