Skip to content

Commit

Permalink
enhance: [2.4] pick some master improvements to 2.4 branch (#38128)
Browse files Browse the repository at this point in the history
- issue: #38127

master pr list:
- #37759
- #37835
- #37845
- #37874
- #37894
- #37969
- #37983
- #38005
- #38035

---------

Signed-off-by: SimFG <[email protected]>
  • Loading branch information
SimFG authored Dec 13, 2024
1 parent 85ade98 commit df73f93
Show file tree
Hide file tree
Showing 39 changed files with 489 additions and 138 deletions.
3 changes: 3 additions & 0 deletions cmd/tools/config/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ func collectRecursive(params *paramtable.ComponentParam, data *[]DocContent, val
item := subVal.Interface().(paramtable.ParamItem) //nolint:govet
refreshable := tag.Get("refreshable")
defaultValue := params.GetWithDefault(item.Key, item.DefaultValue)
if strings.HasPrefix(item.DefaultValue, "\"") && strings.HasSuffix(item.DefaultValue, "\"") {
defaultValue = fmt.Sprintf("\"%s\"", defaultValue)
}
log.Debug("got key", zap.String("key", item.Key), zap.Any("value", defaultValue), zap.String("variable", val.Type().Field(j).Name))
*data = append(*data, DocContent{item.Key, defaultValue, item.Version, refreshable, item.Export, item.Doc})
} else if t == "paramtable.ParamGroup" {
Expand Down
2 changes: 1 addition & 1 deletion configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -796,7 +796,7 @@ common:
# The superusers will ignore some system check processes,
# like the old password verification when updating the credential
superUsers:
defaultRootPassword: Milvus # default password for root user
defaultRootPassword: "Milvus" # default password for root user. The maximum length is 72 characters, and double quotes are required.
rbac:
overrideBuiltInPrivilgeGroups:
enabled: false # Whether to override build-in privilege groups
Expand Down
2 changes: 1 addition & 1 deletion internal/http/static/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ <h2>Result</h2>
if (xhr.status === 200) {
document.getElementById('resultText').textContent = JSON.stringify(JSON.parse(xhr.responseText), null, 2);
} else {
document.getElementById('resultText').textContent = 'Error: ' + xhr.status;
document.getElementById('resultText').textContent = `Error: ${xhr.status}, detail: ${xhr.responseText}`;
}
};
xhr.send();
Expand Down
4 changes: 2 additions & 2 deletions internal/metastore/model/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ func NewDatabase(id int64, name string, state pb.DatabaseState, properties []*co
}
}

func NewDefaultDatabase() *Database {
return NewDatabase(util.DefaultDBID, util.DefaultDBName, pb.DatabaseState_DatabaseCreated, nil)
func NewDefaultDatabase(prop []*commonpb.KeyValuePair) *Database {
return NewDatabase(util.DefaultDBID, util.DefaultDBName, pb.DatabaseState_DatabaseCreated, prop)
}

func (c *Database) Available() bool {
Expand Down
2 changes: 1 addition & 1 deletion internal/metastore/model/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,5 +59,5 @@ func TestDatabaseCloneAndEqual(t *testing.T) {

func TestDatabaseAvailable(t *testing.T) {
assert.True(t, dbModel.Available())
assert.True(t, NewDefaultDatabase().Available())
assert.True(t, NewDefaultDatabase(nil).Available())
}
66 changes: 21 additions & 45 deletions internal/proxy/meta_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/milvus-io/milvus/pkg/util"
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/expr"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
Expand All @@ -58,7 +59,7 @@ type Cache interface {
// GetCollectionName get collection's name and database by id
GetCollectionName(ctx context.Context, database string, collectionID int64) (string, error)
// GetCollectionInfo get collection's information by name or collection id, such as schema, and etc.
GetCollectionInfo(ctx context.Context, database, collectionName string, collectionID int64) (*collectionBasicInfo, error)
GetCollectionInfo(ctx context.Context, database, collectionName string, collectionID int64) (*collectionInfo, error)
// GetPartitionID get partition's identifier of specific collection.
GetPartitionID(ctx context.Context, database, collectionName string, partitionName string) (typeutil.UniqueID, error)
// GetPartitions get all partitions' id of specific collection.
Expand Down Expand Up @@ -93,13 +94,6 @@ type Cache interface {
// AllocID is only using on requests that need to skip timestamp allocation, don't overuse it.
AllocID(ctx context.Context) (int64, error)
}
type collectionBasicInfo struct {
collID typeutil.UniqueID
createdTimestamp uint64
createdUtcTimestamp uint64
consistencyLevel commonpb.ConsistencyLevel
partitionKeyIsolation bool
}

type collectionInfo struct {
collID typeutil.UniqueID
Expand Down Expand Up @@ -267,20 +261,7 @@ type partitionInfo struct {
partitionID typeutil.UniqueID
createdTimestamp uint64
createdUtcTimestamp uint64
}

// getBasicInfo get a basic info by deep copy.
func (info *collectionInfo) getBasicInfo() *collectionBasicInfo {
// Do a deep copy for all fields.
basicInfo := &collectionBasicInfo{
collID: info.collID,
createdTimestamp: info.createdTimestamp,
createdUtcTimestamp: info.createdUtcTimestamp,
consistencyLevel: info.consistencyLevel,
partitionKeyIsolation: info.partitionKeyIsolation,
}

return basicInfo
isDefault bool
}

func (info *collectionInfo) isCollectionCached() bool {
Expand Down Expand Up @@ -368,6 +349,7 @@ func InitMetaCache(ctx context.Context, rootCoord types.RootCoordClient, queryCo
if err != nil {
return err
}
expr.Register("cache", globalMetaCache)

// The privilege info is a little more. And to get this info, the query operation of involving multiple table queries is required.
resp, err := rootCoord.ListPolicy(ctx, &internalpb.ListPolicyRequest{})
Expand Down Expand Up @@ -458,12 +440,14 @@ func (m *MetaCache) update(ctx context.Context, database, collectionName string,
return nil, merr.WrapErrParameterInvalidMsg("partition names and timestamps number is not aligned, response: %s", partitions.String())
}

defaultPartitionName := Params.CommonCfg.DefaultPartitionName.GetValue()
infos := lo.Map(partitions.GetPartitionIDs(), func(partitionID int64, idx int) *partitionInfo {
return &partitionInfo{
name: partitions.PartitionNames[idx],
partitionID: partitions.PartitionIDs[idx],
createdTimestamp: partitions.CreatedTimestamps[idx],
createdUtcTimestamp: partitions.CreatedUtcTimestamps[idx],
isDefault: partitions.PartitionNames[idx] == defaultPartitionName,
}
})

Expand Down Expand Up @@ -568,7 +552,7 @@ func (m *MetaCache) GetCollectionName(ctx context.Context, database string, coll
return collInfo.schema.Name, nil
}

func (m *MetaCache) GetCollectionInfo(ctx context.Context, database string, collectionName string, collectionID int64) (*collectionBasicInfo, error) {
func (m *MetaCache) GetCollectionInfo(ctx context.Context, database string, collectionName string, collectionID int64) (*collectionInfo, error) {
collInfo, ok := m.getCollection(database, collectionName, 0)

method := "GetCollectionInfo"
Expand All @@ -583,11 +567,11 @@ func (m *MetaCache) GetCollectionInfo(ctx context.Context, database string, coll
return nil, err
}
metrics.ProxyUpdateCacheLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
return collInfo.getBasicInfo(), nil
return collInfo, nil
}

metrics.ProxyCacheStatsCounter.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), method, metrics.CacheHitLabel).Inc()
return collInfo.getBasicInfo(), nil
return collInfo, nil
}

// GetCollectionInfo returns the collection information related to provided collection name
Expand Down Expand Up @@ -661,6 +645,14 @@ func (m *MetaCache) GetPartitionInfo(ctx context.Context, database, collectionNa
return nil, err
}

if partitionName == "" {
for _, info := range partitions.partitionInfos {
if info.isDefault {
return info, nil
}
}
}

info, ok := partitions.name2Info[partitionName]
if !ok {
return nil, merr.WrapErrPartitionNotFound(partitionName)
Expand Down Expand Up @@ -718,30 +710,14 @@ func (m *MetaCache) describeCollection(ctx context.Context, database, collection
if err != nil {
return nil, err
}
resp := &milvuspb.DescribeCollectionResponse{
Status: coll.Status,
Schema: &schemapb.CollectionSchema{
Name: coll.Schema.Name,
Description: coll.Schema.Description,
AutoID: coll.Schema.AutoID,
Fields: make([]*schemapb.FieldSchema, 0),
EnableDynamicField: coll.Schema.EnableDynamicField,
},
CollectionID: coll.CollectionID,
VirtualChannelNames: coll.VirtualChannelNames,
PhysicalChannelNames: coll.PhysicalChannelNames,
CreatedTimestamp: coll.CreatedTimestamp,
CreatedUtcTimestamp: coll.CreatedUtcTimestamp,
ConsistencyLevel: coll.ConsistencyLevel,
DbName: coll.GetDbName(),
Properties: coll.Properties,
}
userFields := make([]*schemapb.FieldSchema, 0)
for _, field := range coll.Schema.Fields {
if field.FieldID >= common.StartOfUserFieldID {
resp.Schema.Fields = append(resp.Schema.Fields, field)
userFields = append(userFields, field)
}
}
return resp, nil
coll.Schema.Fields = userFields
return coll, nil
}

func (m *MetaCache) showPartitions(ctx context.Context, dbName string, collectionName string, collectionID UniqueID) (*milvuspb.ShowPartitionsResponse, error) {
Expand Down
33 changes: 26 additions & 7 deletions internal/proxy/meta_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,25 @@ type MockRootCoordClientInterface struct {
listPolicy func(ctx context.Context, in *internalpb.ListPolicyRequest) (*internalpb.ListPolicyResponse, error)
}

func EqualSchema(t *testing.T, expect, actual *schemapb.CollectionSchema) {
assert.Equal(t, expect.AutoID, actual.AutoID)
assert.Equal(t, expect.Description, actual.Description)
assert.Equal(t, expect.Name, actual.Name)
assert.Equal(t, expect.EnableDynamicField, actual.EnableDynamicField)
assert.Equal(t, len(expect.Fields), len(actual.Fields))
for i := range expect.Fields {
assert.Equal(t, expect.Fields[i], actual.Fields[i])
}
// assert.Equal(t, len(expect.Functions), len(actual.Functions))
// for i := range expect.Functions {
// assert.Equal(t, expect.Functions[i], actual.Functions[i])
// }
assert.Equal(t, len(expect.Properties), len(actual.Properties))
for i := range expect.Properties {
assert.Equal(t, expect.Properties[i], actual.Properties[i])
}
}

func (m *MockRootCoordClientInterface) IncAccessCount() {
atomic.AddInt32(&m.AccessCount, 1)
}
Expand Down Expand Up @@ -212,7 +231,7 @@ func TestMetaCache_GetCollection(t *testing.T) {
schema, err := globalMetaCache.GetCollectionSchema(ctx, dbName, "collection1")
assert.Equal(t, rootCoord.GetAccessCount(), 1)
assert.NoError(t, err)
assert.Equal(t, schema.CollectionSchema, &schemapb.CollectionSchema{
EqualSchema(t, schema.CollectionSchema, &schemapb.CollectionSchema{
AutoID: true,
Fields: []*schemapb.FieldSchema{},
Name: "collection1",
Expand All @@ -224,7 +243,7 @@ func TestMetaCache_GetCollection(t *testing.T) {
schema, err = globalMetaCache.GetCollectionSchema(ctx, dbName, "collection2")
assert.Equal(t, rootCoord.GetAccessCount(), 2)
assert.NoError(t, err)
assert.Equal(t, schema.CollectionSchema, &schemapb.CollectionSchema{
EqualSchema(t, schema.CollectionSchema, &schemapb.CollectionSchema{
AutoID: true,
Fields: []*schemapb.FieldSchema{},
Name: "collection2",
Expand All @@ -238,7 +257,7 @@ func TestMetaCache_GetCollection(t *testing.T) {
schema, err = globalMetaCache.GetCollectionSchema(ctx, dbName, "collection1")
assert.Equal(t, rootCoord.GetAccessCount(), 2)
assert.NoError(t, err)
assert.Equal(t, schema.CollectionSchema, &schemapb.CollectionSchema{
EqualSchema(t, schema.CollectionSchema, &schemapb.CollectionSchema{
AutoID: true,
Fields: []*schemapb.FieldSchema{},
Name: "collection1",
Expand Down Expand Up @@ -323,7 +342,7 @@ func TestMetaCache_GetCollectionName(t *testing.T) {
schema, err = globalMetaCache.GetCollectionSchema(ctx, dbName, "collection1")
assert.Equal(t, rootCoord.GetAccessCount(), 2)
assert.NoError(t, err)
assert.Equal(t, schema.CollectionSchema, &schemapb.CollectionSchema{
EqualSchema(t, schema.CollectionSchema, &schemapb.CollectionSchema{
AutoID: true,
Fields: []*schemapb.FieldSchema{},
Name: "collection1",
Expand All @@ -348,7 +367,7 @@ func TestMetaCache_GetCollectionFailure(t *testing.T) {

schema, err = globalMetaCache.GetCollectionSchema(ctx, dbName, "collection1")
assert.NoError(t, err)
assert.Equal(t, schema.CollectionSchema, &schemapb.CollectionSchema{
EqualSchema(t, schema.CollectionSchema, &schemapb.CollectionSchema{
AutoID: true,
Fields: []*schemapb.FieldSchema{},
Name: "collection1",
Expand All @@ -357,7 +376,7 @@ func TestMetaCache_GetCollectionFailure(t *testing.T) {
rootCoord.Error = true
// should be cached with no error
assert.NoError(t, err)
assert.Equal(t, schema.CollectionSchema, &schemapb.CollectionSchema{
EqualSchema(t, schema.CollectionSchema, &schemapb.CollectionSchema{
AutoID: true,
Fields: []*schemapb.FieldSchema{},
Name: "collection1",
Expand Down Expand Up @@ -421,7 +440,7 @@ func TestMetaCache_ConcurrentTest1(t *testing.T) {
// GetCollectionSchema will never fail
schema, err := globalMetaCache.GetCollectionSchema(ctx, dbName, "collection1")
assert.NoError(t, err)
assert.Equal(t, schema.CollectionSchema, &schemapb.CollectionSchema{
EqualSchema(t, schema.CollectionSchema, &schemapb.CollectionSchema{
AutoID: true,
Fields: []*schemapb.FieldSchema{},
Name: "collection1",
Expand Down
18 changes: 11 additions & 7 deletions internal/proxy/mock_cache.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions internal/proxy/msg_pack.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func genInsertMsgsByPartition(ctx context.Context,
),
CollectionID: insertMsg.CollectionID,
PartitionID: partitionID,
DbName: insertMsg.DbName,
CollectionName: insertMsg.CollectionName,
PartitionName: partitionName,
SegmentID: segmentID,
Expand Down
8 changes: 7 additions & 1 deletion internal/proxy/rate_limit_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,13 @@ func getCollectionAndPartitionID(ctx context.Context, r reqPartName) (int64, map
return 0, nil, err
}
if r.GetPartitionName() == "" {
return db.dbID, map[int64][]int64{collectionID: {}}, nil
collectionSchema, err := globalMetaCache.GetCollectionSchema(ctx, r.GetDbName(), r.GetCollectionName())
if err != nil {
return 0, nil, err
}
if collectionSchema.IsPartitionKeyCollection() {
return db.dbID, map[int64][]int64{collectionID: {}}, nil
}
}
part, err := globalMetaCache.GetPartitionInfo(ctx, r.GetDbName(), r.GetCollectionName(), r.GetPartitionName())
if err != nil {
Expand Down
Loading

0 comments on commit df73f93

Please sign in to comment.