Skip to content

Commit

Permalink
fix: Add version to the proxy cache to resolve concurrency issues (#3…
Browse files Browse the repository at this point in the history
…8067)

issue: #36989

---------

Signed-off-by: Cai Zhang <[email protected]>
  • Loading branch information
xiaocai2333 authored Dec 4, 2024
1 parent a65d395 commit 73aa95f
Show file tree
Hide file tree
Showing 19 changed files with 287 additions and 87 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/klauspost/compress v1.17.9
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20241129033252-5d0b09587056
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20241202112430-822be0295910
github.com/minio/minio-go/v7 v7.0.73
github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81
github.com/prometheus/client_golang v1.14.0
Expand Down
8 changes: 2 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -630,12 +630,8 @@ github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119 h1:9VXijWu
github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20241120015424-93892e628c69 h1:Qt0Bv2Fum3EX3OlkuQYHJINBzeU4oEuHy2lXSfB/gZw=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20241120015424-93892e628c69/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20241129024423-3911e6ebd8a6 h1:TrGZtojfj84Rdd1XAaGULCWZqO3rJMiGS8vxFXHT7G4=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20241129024423-3911e6ebd8a6/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20241129033252-5d0b09587056 h1:o2uJgfwTOg8bu/E9n6TvmFT2XPrPm1v0XFhc6XXcFoE=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20241129033252-5d0b09587056/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20241202112430-822be0295910 h1:cFRrdFZwhFHv33pue1z8beYSvrXDYFSFsCuvXGX3DHE=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20241202112430-822be0295910/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/pulsar-client-go v0.12.1 h1:O2JZp1tsYiO7C0MQ4hrUY/aJXnn2Gry6hpm7UodghmE=
github.com/milvus-io/pulsar-client-go v0.12.1/go.mod h1:dkutuH4oS2pXiGm+Ti7fQZ4MRjrMPZ8IJeEGAWMeckk=
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs=
Expand Down
32 changes: 14 additions & 18 deletions internal/proxy/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,21 +129,21 @@ func (node *Proxy) InvalidateCollectionMetaCache(ctx context.Context, request *p
if globalMetaCache != nil {
switch msgType {
case commonpb.MsgType_DropCollection, commonpb.MsgType_RenameCollection, commonpb.MsgType_DropAlias, commonpb.MsgType_AlterAlias:
if collectionName != "" {
globalMetaCache.RemoveCollection(ctx, request.GetDbName(), collectionName) // no need to return error, though collection may be not cached
globalMetaCache.DeprecateShardCache(request.GetDbName(), collectionName)
}
if request.CollectionID != UniqueID(0) {
aliasName = globalMetaCache.RemoveCollectionsByID(ctx, collectionID)
aliasName = globalMetaCache.RemoveCollectionsByID(ctx, collectionID, request.GetBase().GetTimestamp(), msgType == commonpb.MsgType_DropCollection)
for _, name := range aliasName {
globalMetaCache.DeprecateShardCache(request.GetDbName(), name)
}
}
if collectionName != "" {
globalMetaCache.RemoveCollection(ctx, request.GetDbName(), collectionName) // no need to return error, though collection may be not cached
globalMetaCache.DeprecateShardCache(request.GetDbName(), collectionName)
}
log.Info("complete to invalidate collection meta cache with collection name", zap.String("type", request.GetBase().GetMsgType().String()))
case commonpb.MsgType_LoadCollection, commonpb.MsgType_ReleaseCollection:
// All the request from query use collectionID
if request.CollectionID != UniqueID(0) {
aliasName = globalMetaCache.RemoveCollectionsByID(ctx, collectionID)
aliasName = globalMetaCache.RemoveCollectionsByID(ctx, collectionID, 0, false)
for _, name := range aliasName {
globalMetaCache.DeprecateShardCache(request.GetDbName(), name)
}
Expand All @@ -154,31 +154,27 @@ func (node *Proxy) InvalidateCollectionMetaCache(ctx context.Context, request *p
log.Warn("invalidate collection meta cache failed. partitionName is empty")
return &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError}, nil
}
// no need to deprecate shard cache because shard won't change when create or drop partition
globalMetaCache.RemoveCollection(ctx, request.GetDbName(), collectionName)
// drop all the alias as well
if request.CollectionID != UniqueID(0) {
aliasName = globalMetaCache.RemoveCollectionsByID(ctx, collectionID)
for _, name := range aliasName {
globalMetaCache.DeprecateShardCache(request.GetDbName(), name)
}
aliasName = globalMetaCache.RemoveCollectionsByID(ctx, collectionID, request.GetBase().GetTimestamp(), false)
}
globalMetaCache.RemoveCollection(ctx, request.GetDbName(), collectionName)
log.Info("complete to invalidate collection meta cache", zap.String("type", request.GetBase().GetMsgType().String()))
case commonpb.MsgType_DropDatabase:
globalMetaCache.RemoveDatabase(ctx, request.GetDbName())
default:
log.Warn("receive unexpected msgType of invalidate collection meta cache", zap.String("msgType", request.GetBase().GetMsgType().String()))

if collectionName != "" {
globalMetaCache.RemoveCollection(ctx, request.GetDbName(), collectionName) // no need to return error, though collection may be not cached
globalMetaCache.DeprecateShardCache(request.GetDbName(), collectionName)
}
if request.CollectionID != UniqueID(0) {
aliasName = globalMetaCache.RemoveCollectionsByID(ctx, collectionID)
aliasName = globalMetaCache.RemoveCollectionsByID(ctx, collectionID, request.GetBase().GetTimestamp(), false)
for _, name := range aliasName {
globalMetaCache.DeprecateShardCache(request.GetDbName(), name)
}
}

if collectionName != "" {
globalMetaCache.RemoveCollection(ctx, request.GetDbName(), collectionName) // no need to return error, though collection may be not cached
globalMetaCache.DeprecateShardCache(request.GetDbName(), collectionName)
}
}
}

Expand Down
80 changes: 58 additions & 22 deletions internal/proxy/meta_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ type Cache interface {
InvalidateShardLeaderCache(collections []int64)
ListShardLocation() map[int64]nodeInfo
RemoveCollection(ctx context.Context, database, collectionName string)
RemoveCollectionsByID(ctx context.Context, collectionID UniqueID) []string
RemoveCollectionsByID(ctx context.Context, collectionID UniqueID, version uint64, removeVersion bool) []string

// GetCredentialInfo operate credential cache
GetCredentialInfo(ctx context.Context, username string) (*internalpb.CredentialInfo, error)
Expand Down Expand Up @@ -340,6 +340,8 @@ type MetaCache struct {
IDCount int64
IDIndex int64
IDLock sync.RWMutex

collectionCacheVersion map[UniqueID]uint64 // collectionID -> cacheVersion
}

// globalMetaCache is singleton instance of Cache
Expand Down Expand Up @@ -368,15 +370,16 @@ func InitMetaCache(ctx context.Context, rootCoord types.RootCoordClient, queryCo
// NewMetaCache creates a MetaCache with provided RootCoord and QueryNode
func NewMetaCache(rootCoord types.RootCoordClient, queryCoord types.QueryCoordClient, shardMgr shardClientMgr) (*MetaCache, error) {
return &MetaCache{
rootCoord: rootCoord,
queryCoord: queryCoord,
dbInfo: map[string]*databaseInfo{},
collInfo: map[string]map[string]*collectionInfo{},
collLeader: map[string]map[string]*shardLeaders{},
credMap: map[string]*internalpb.CredentialInfo{},
shardMgr: shardMgr,
privilegeInfos: map[string]struct{}{},
userToRoles: map[string]map[string]struct{}{},
rootCoord: rootCoord,
queryCoord: queryCoord,
dbInfo: map[string]*databaseInfo{},
collInfo: map[string]map[string]*collectionInfo{},
collLeader: map[string]map[string]*shardLeaders{},
credMap: map[string]*internalpb.CredentialInfo{},
shardMgr: shardMgr,
privilegeInfos: map[string]struct{}{},
userToRoles: map[string]map[string]struct{}{},
collectionCacheVersion: make(map[UniqueID]uint64),
}, nil
}

Expand Down Expand Up @@ -445,19 +448,36 @@ func (m *MetaCache) update(ctx context.Context, database, collectionName string,
if database == "" {
log.Warn("database is empty, use default database name", zap.String("collectionName", collectionName), zap.Stack("stack"))
}
isolation, err := common.IsPartitionKeyIsolationKvEnabled(collection.Properties...)
if err != nil {
return nil, err
}

schemaInfo := newSchemaInfoWithLoadFields(collection.Schema, loadFields)

m.mu.Lock()
defer m.mu.Unlock()
curVersion := m.collectionCacheVersion[collection.GetCollectionID()]
// Compatibility logic: if the rootcoord version is lower(requestTime = 0), update the cache directly.
if collection.GetRequestTime() < curVersion && collection.GetRequestTime() != 0 {
log.Debug("describe collection timestamp less than version, don't update cache",
zap.String("collectionName", collectionName),
zap.Uint64("version", collection.GetRequestTime()), zap.Uint64("cache version", curVersion))
return &collectionInfo{
collID: collection.CollectionID,
schema: schemaInfo,
partInfo: parsePartitionsInfo(infos, schemaInfo.hasPartitionKeyField),
createdTimestamp: collection.CreatedTimestamp,
createdUtcTimestamp: collection.CreatedUtcTimestamp,
consistencyLevel: collection.ConsistencyLevel,
partitionKeyIsolation: isolation,
}, nil
}
_, dbOk := m.collInfo[database]
if !dbOk {
m.collInfo[database] = make(map[string]*collectionInfo)
}

isolation, err := common.IsPartitionKeyIsolationKvEnabled(collection.Properties...)
if err != nil {
return nil, err
}

schemaInfo := newSchemaInfoWithLoadFields(collection.Schema, loadFields)
m.collInfo[database][collectionName] = &collectionInfo{
collID: collection.CollectionID,
schema: schemaInfo,
Expand All @@ -470,9 +490,14 @@ func (m *MetaCache) update(ctx context.Context, database, collectionName string,

log.Ctx(ctx).Info("meta update success", zap.String("database", database), zap.String("collectionName", collectionName),
zap.String("actual collection Name", collection.Schema.GetName()), zap.Int64("collectionID", collection.CollectionID),
zap.Strings("partition", partitions.PartitionNames),
zap.Strings("partition", partitions.PartitionNames), zap.Uint64("currentVersion", curVersion),
zap.Uint64("version", collection.GetRequestTime()),
)
return m.collInfo[database][collectionName], nil

m.collectionCacheVersion[collection.GetCollectionID()] = collection.GetRequestTime()
collInfo := m.collInfo[database][collectionName]

return collInfo, nil
}

func buildSfKeyByName(database, collectionName string) string {
Expand Down Expand Up @@ -822,19 +847,30 @@ func (m *MetaCache) RemoveCollection(ctx context.Context, database, collectionNa
log.Debug("remove collection", zap.String("db", database), zap.String("collection", collectionName))
}

func (m *MetaCache) RemoveCollectionsByID(ctx context.Context, collectionID UniqueID) []string {
func (m *MetaCache) RemoveCollectionsByID(ctx context.Context, collectionID UniqueID, version uint64, removeVersion bool) []string {
m.mu.Lock()
defer m.mu.Unlock()

curVersion := m.collectionCacheVersion[collectionID]
var collNames []string
for database, db := range m.collInfo {
for k, v := range db {
if v.collID == collectionID {
delete(m.collInfo[database], k)
collNames = append(collNames, k)
if version == 0 || curVersion <= version {
delete(m.collInfo[database], k)
collNames = append(collNames, k)
}
}
}
}
log.Debug("remove collection by id", zap.Int64("id", collectionID), zap.Strings("collection", collNames))
if removeVersion {
delete(m.collectionCacheVersion, collectionID)
} else if version != 0 {
m.collectionCacheVersion[collectionID] = version
}
log.Debug("remove collection by id", zap.Int64("id", collectionID),
zap.Strings("collection", collNames), zap.Uint64("currentVersion", curVersion),
zap.Uint64("version", version), zap.Bool("removeVersion", removeVersion))
return collNames
}

Expand Down
70 changes: 65 additions & 5 deletions internal/proxy/meta_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,8 @@ func (m *MockRootCoordClientInterface) DescribeCollection(ctx context.Context, i
AutoID: true,
Name: "collection1",
},
DbName: dbName,
DbName: dbName,
RequestTime: 100,
}, nil
}
if in.CollectionName == "collection2" || in.CollectionID == 2 {
Expand All @@ -151,7 +152,8 @@ func (m *MockRootCoordClientInterface) DescribeCollection(ctx context.Context, i
AutoID: true,
Name: "collection2",
},
DbName: dbName,
DbName: dbName,
RequestTime: 100,
}, nil
}
if in.CollectionName == "errorCollection" {
Expand All @@ -161,7 +163,8 @@ func (m *MockRootCoordClientInterface) DescribeCollection(ctx context.Context, i
Schema: &schemapb.CollectionSchema{
AutoID: true,
},
DbName: dbName,
DbName: dbName,
RequestTime: 100,
}, nil
}

Expand Down Expand Up @@ -791,14 +794,14 @@ func TestMetaCache_RemoveCollection(t *testing.T) {
// shouldn't access RootCoord again
assert.Equal(t, rootCoord.GetAccessCount(), 2)

globalMetaCache.RemoveCollectionsByID(ctx, UniqueID(1))
globalMetaCache.RemoveCollectionsByID(ctx, UniqueID(1), 100, false)
// no collectionInfo of collection2, should access RootCoord
_, err = globalMetaCache.GetCollectionInfo(ctx, dbName, "collection1", 1)
assert.NoError(t, err)
// shouldn't access RootCoord again
assert.Equal(t, rootCoord.GetAccessCount(), 3)

globalMetaCache.RemoveCollectionsByID(ctx, UniqueID(1))
globalMetaCache.RemoveCollectionsByID(ctx, UniqueID(1), 100, false)
// no collectionInfo of collection2, should access RootCoord
_, err = globalMetaCache.GetCollectionInfo(ctx, dbName, "collection1", 1)
assert.NoError(t, err)
Expand Down Expand Up @@ -1259,3 +1262,60 @@ func TestSchemaInfo_GetLoadFieldIDs(t *testing.T) {
})
}
}

func TestMetaCache_Parallel(t *testing.T) {
ctx := context.Background()
rootCoord := mocks.NewMockRootCoordClient(t)
queryCoord := mocks.NewMockQueryCoordClient(t)
queryCoord.EXPECT().ShowCollections(mock.Anything, mock.Anything).Return(&querypb.ShowCollectionsResponse{}, nil).Maybe()
rootCoord.EXPECT().ShowPartitions(mock.Anything, mock.Anything).Return(&milvuspb.ShowPartitionsResponse{
Status: merr.Success(),
}, nil).Maybe()
mgr := newShardClientMgr()
cache, err := NewMetaCache(rootCoord, queryCoord, mgr)
assert.NoError(t, err)

cacheVersion := uint64(100)
// clean cache
cache.RemoveCollectionsByID(ctx, 111, cacheVersion+2, false)

// update cache, but version is smaller
rootCoord.EXPECT().DescribeCollection(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, request *milvuspb.DescribeCollectionRequest, option ...grpc.CallOption) (*milvuspb.DescribeCollectionResponse, error) {
return &milvuspb.DescribeCollectionResponse{
Status: merr.Success(),
Schema: &schemapb.CollectionSchema{
Name: "collection1",
},
CollectionID: 111,
DbName: dbName,
RequestTime: cacheVersion,
}, nil
}).Once()

collInfo, err := cache.update(ctx, dbName, "collection1", 111)
assert.NoError(t, err)
assert.Equal(t, "collection1", collInfo.schema.Name)
assert.Equal(t, int64(111), collInfo.collID)
_, ok := cache.collInfo[dbName]["collection1"]
assert.False(t, ok)

rootCoord.EXPECT().DescribeCollection(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, request *milvuspb.DescribeCollectionRequest, option ...grpc.CallOption) (*milvuspb.DescribeCollectionResponse, error) {
cacheVersion++
return &milvuspb.DescribeCollectionResponse{
Status: merr.Success(),
Schema: &schemapb.CollectionSchema{
Name: "collection1",
},
CollectionID: 111,
DbName: dbName,
RequestTime: cacheVersion + 5,
}, nil
}).Once()

collInfo, err = cache.update(ctx, dbName, "collection1", 111)
assert.NoError(t, err)
assert.Equal(t, "collection1", collInfo.schema.Name)
assert.Equal(t, int64(111), collInfo.collID)
_, ok = cache.collInfo[dbName]["collection1"]
assert.True(t, ok)
}
22 changes: 12 additions & 10 deletions internal/proxy/mock_cache.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 73aa95f

Please sign in to comment.