Skip to content

Commit

Permalink
enhance: [2.4] improve rootcoord task scheduling policy (#37523)
Browse files Browse the repository at this point in the history
- issue: #30301
- pr: #37352

Signed-off-by: SimFG <[email protected]>
  • Loading branch information
SimFG authored Nov 8, 2024
1 parent a3c1fc1 commit 5c166a2
Show file tree
Hide file tree
Showing 25 changed files with 728 additions and 9 deletions.
8 changes: 8 additions & 0 deletions internal/rootcoord/alter_alias_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,11 @@ func (t *alterAliasTask) Execute(ctx context.Context) error {
// alter alias is atomic enough.
return t.core.meta.AlterAlias(ctx, t.Req.GetDbName(), t.Req.GetAlias(), t.Req.GetCollectionName(), t.GetTs())
}

func (t *alterAliasTask) GetLockerKey() LockerKey {
return NewLockerKeyChain(
NewClusterLockerKey(false),
NewDatabaseLockerKey(t.Req.GetDbName(), false),
NewCollectionLockerKey(t.Req.GetCollectionName(), true),
)
}
9 changes: 9 additions & 0 deletions internal/rootcoord/alter_collection_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,3 +124,12 @@ func (a *alterCollectionTask) Execute(ctx context.Context) error {

return redoTask.Execute(ctx)
}

func (a *alterCollectionTask) GetLockerKey() LockerKey {
collectionName := a.core.getRealCollectionName(a.ctx, a.Req.GetDbName(), a.Req.GetCollectionName())
return NewLockerKeyChain(
NewClusterLockerKey(false),
NewDatabaseLockerKey(a.Req.GetDbName(), false),
NewCollectionLockerKey(collectionName, true),
)
}
7 changes: 7 additions & 0 deletions internal/rootcoord/alter_database_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,13 @@ func (a *alterDatabaseTask) Execute(ctx context.Context) error {
return redoTask.Execute(ctx)
}

func (a *alterDatabaseTask) GetLockerKey() LockerKey {
return NewLockerKeyChain(
NewClusterLockerKey(false),
NewDatabaseLockerKey(a.Req.GetDbName(), true),
)
}

func MergeProperties(oldProps []*commonpb.KeyValuePair, updatedProps []*commonpb.KeyValuePair) []*commonpb.KeyValuePair {
props := make(map[string]string)
for _, prop := range oldProps {
Expand Down
8 changes: 8 additions & 0 deletions internal/rootcoord/create_alias_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,11 @@ func (t *createAliasTask) Execute(ctx context.Context) error {
// create alias is atomic enough.
return t.core.meta.CreateAlias(ctx, t.Req.GetDbName(), t.Req.GetAlias(), t.Req.GetCollectionName(), t.GetTs())
}

func (t *createAliasTask) GetLockerKey() LockerKey {
return NewLockerKeyChain(
NewClusterLockerKey(false),
NewDatabaseLockerKey(t.Req.GetDbName(), false),
NewCollectionLockerKey(t.Req.GetCollectionName(), true),
)
}
7 changes: 7 additions & 0 deletions internal/rootcoord/create_collection_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,3 +563,10 @@ func (t *createCollectionTask) Execute(ctx context.Context) error {

return undoTask.Execute(ctx)
}

func (t *createCollectionTask) GetLockerKey() LockerKey {
return NewLockerKeyChain(
NewClusterLockerKey(false),
NewDatabaseLockerKey(t.Req.GetDbName(), true),
)
}
4 changes: 4 additions & 0 deletions internal/rootcoord/create_db_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,7 @@ func (t *createDatabaseTask) Execute(ctx context.Context) error {
db := model.NewDatabase(t.dbID, t.Req.GetDbName(), etcdpb.DatabaseState_DatabaseCreated, t.Req.GetProperties())
return t.core.meta.CreateDatabase(ctx, db, t.GetTs())
}

func (t *createDatabaseTask) GetLockerKey() LockerKey {
return NewLockerKeyChain(NewClusterLockerKey(true))
}
9 changes: 9 additions & 0 deletions internal/rootcoord/create_partition_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,3 +118,12 @@ func (t *createPartitionTask) Execute(ctx context.Context) error {

return undoTask.Execute(ctx)
}

func (t *createPartitionTask) GetLockerKey() LockerKey {
collectionName := t.core.getRealCollectionName(t.ctx, t.Req.GetDbName(), t.Req.GetCollectionName())
return NewLockerKeyChain(
NewClusterLockerKey(false),
NewDatabaseLockerKey(t.Req.GetDbName(), false),
NewCollectionLockerKey(collectionName, true),
)
}
9 changes: 9 additions & 0 deletions internal/rootcoord/describe_collection_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,12 @@ func (t *describeCollectionTask) Execute(ctx context.Context) (err error) {
t.Rsp = convertModelToDesc(coll, aliases, db.Name)
return nil
}

func (t *describeCollectionTask) GetLockerKey() LockerKey {
collectionName := t.core.getRealCollectionName(t.ctx, t.Req.GetDbName(), t.Req.GetCollectionName())
return NewLockerKeyChain(
NewClusterLockerKey(false),
NewDatabaseLockerKey(t.Req.GetDbName(), false),
NewCollectionLockerKey(collectionName, false),
)
}
7 changes: 7 additions & 0 deletions internal/rootcoord/describe_db_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,10 @@ func (t *describeDBTask) Execute(ctx context.Context) (err error) {
}
return nil
}

func (t *describeDBTask) GetLockerKey() LockerKey {
return NewLockerKeyChain(
NewClusterLockerKey(false),
NewDatabaseLockerKey(t.Req.GetDbName(), false),
)
}
9 changes: 9 additions & 0 deletions internal/rootcoord/drop_alias_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,12 @@ func (t *dropAliasTask) Execute(ctx context.Context) error {
}
return t.core.meta.DropAlias(ctx, t.Req.GetDbName(), t.Req.GetAlias(), t.GetTs())
}

func (t *dropAliasTask) GetLockerKey() LockerKey {
collectionName := t.core.getRealCollectionName(t.ctx, t.Req.GetDbName(), t.Req.GetAlias())
return NewLockerKeyChain(
NewClusterLockerKey(false),
NewDatabaseLockerKey(t.Req.GetDbName(), false),
NewCollectionLockerKey(collectionName, true),
)
}
4 changes: 4 additions & 0 deletions internal/rootcoord/drop_collection_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,3 +119,7 @@ func (t *dropCollectionTask) Execute(ctx context.Context) error {

return redoTask.Execute(ctx)
}

func (t *dropCollectionTask) GetLockerKey() LockerKey {
return NewLockerKeyChain(NewClusterLockerKey(false), NewDatabaseLockerKey(t.Req.GetDbName(), true))
}
6 changes: 5 additions & 1 deletion internal/rootcoord/drop_db_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (t *dropDatabaseTask) Execute(ctx context.Context) error {
databaseName: dbName,
ts: ts,
})
redoTask.AddAsyncStep(&expireCacheStep{
redoTask.AddSyncStep(&expireCacheStep{
baseStep: baseStep{core: t.core},
dbName: dbName,
ts: ts,
Expand All @@ -60,3 +60,7 @@ func (t *dropDatabaseTask) Execute(ctx context.Context) error {
})
return redoTask.Execute(ctx)
}

func (t *dropDatabaseTask) GetLockerKey() LockerKey {
return NewLockerKeyChain(NewClusterLockerKey(true))
}
9 changes: 9 additions & 0 deletions internal/rootcoord/drop_partition_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,3 +110,12 @@ func (t *dropPartitionTask) Execute(ctx context.Context) error {

return redoTask.Execute(ctx)
}

func (t *dropPartitionTask) GetLockerKey() LockerKey {
collectionName := t.core.getRealCollectionName(t.ctx, t.Req.GetDbName(), t.Req.GetCollectionName())
return NewLockerKeyChain(
NewClusterLockerKey(false),
NewDatabaseLockerKey(t.Req.GetDbName(), false),
NewCollectionLockerKey(collectionName, true),
)
}
7 changes: 7 additions & 0 deletions internal/rootcoord/has_collection_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,10 @@ func (t *hasCollectionTask) Execute(ctx context.Context) error {
t.Rsp.Value = err == nil
return nil
}

func (t *hasCollectionTask) GetLockerKey() LockerKey {
return NewLockerKeyChain(
NewClusterLockerKey(false),
NewDatabaseLockerKey(t.Req.GetDbName(), false),
)
}
9 changes: 9 additions & 0 deletions internal/rootcoord/has_partition_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,12 @@ func (t *hasPartitionTask) Execute(ctx context.Context) error {
}
return nil
}

func (t *hasPartitionTask) GetLockerKey() LockerKey {
collectionName := t.core.getRealCollectionName(t.ctx, t.Req.GetDbName(), t.Req.GetCollectionName())
return NewLockerKeyChain(
NewClusterLockerKey(false),
NewDatabaseLockerKey(t.Req.GetDbName(), false),
NewCollectionLockerKey(collectionName, false),
)
}
4 changes: 4 additions & 0 deletions internal/rootcoord/list_db_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,3 +124,7 @@ func (t *listDatabaseTask) Execute(ctx context.Context) error {
t.Resp.CreatedTimestamp = createdTimes
return nil
}

func (t *listDatabaseTask) GetLockerKey() LockerKey {
return NewLockerKeyChain(NewClusterLockerKey(false))
}
6 changes: 6 additions & 0 deletions internal/rootcoord/rename_collection_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,9 @@ func (t *renameCollectionTask) Execute(ctx context.Context) error {
}
return t.core.meta.RenameCollection(ctx, t.Req.GetDbName(), t.Req.GetOldName(), t.Req.GetNewDBName(), t.Req.GetNewName(), t.GetTs())
}

func (t *renameCollectionTask) GetLockerKey() LockerKey {
return NewLockerKeyChain(
NewClusterLockerKey(true),
)
}
9 changes: 9 additions & 0 deletions internal/rootcoord/root_coord.go
Original file line number Diff line number Diff line change
Expand Up @@ -1101,6 +1101,15 @@ func (c *Core) HasCollection(ctx context.Context, in *milvuspb.HasCollectionRequ
return t.Rsp, nil
}

// getRealCollectionName get origin collection name to avoid the alias name
func (c *Core) getRealCollectionName(ctx context.Context, db, collection string) string {
realName, err := c.meta.DescribeAlias(ctx, db, collection, 0)
if err != nil {
return collection
}
return realName
}

func (c *Core) describeCollection(ctx context.Context, in *milvuspb.DescribeCollectionRequest, allowUnavailable bool) (*model.Collection, error) {
ts := getTravelTs(in)
if in.GetCollectionName() != "" {
Expand Down
59 changes: 51 additions & 8 deletions internal/rootcoord/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/tso"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/lock"
)

type IScheduler interface {
Expand All @@ -48,21 +49,34 @@ type scheduler struct {

lock sync.Mutex

minDdlTs atomic.Uint64
minDdlTs atomic.Uint64
clusterLock *lock.KeyLock[string]
databaseLock *lock.KeyLock[string]
collectionLock *lock.KeyLock[string]
lockMapping map[LockLevel]*lock.KeyLock[string]
}

func newScheduler(ctx context.Context, idAllocator allocator.Interface, tsoAllocator tso.Allocator) *scheduler {
ctx1, cancel := context.WithCancel(ctx)
// TODO
n := 1024 * 10
return &scheduler{
ctx: ctx1,
cancel: cancel,
idAllocator: idAllocator,
tsoAllocator: tsoAllocator,
taskChan: make(chan task, n),
minDdlTs: *atomic.NewUint64(0),
s := &scheduler{
ctx: ctx1,
cancel: cancel,
idAllocator: idAllocator,
tsoAllocator: tsoAllocator,
taskChan: make(chan task, n),
minDdlTs: *atomic.NewUint64(0),
clusterLock: lock.NewKeyLock[string](),
databaseLock: lock.NewKeyLock[string](),
collectionLock: lock.NewKeyLock[string](),
}
s.lockMapping = map[LockLevel]*lock.KeyLock[string]{
ClusterLock: s.clusterLock,
DatabaseLock: s.databaseLock,
CollectionLock: s.collectionLock,
}
return s
}

func (s *scheduler) Start() {
Expand Down Expand Up @@ -147,6 +161,13 @@ func (s *scheduler) enqueue(task task) {
}

func (s *scheduler) AddTask(task task) error {
if Params.RootCoordCfg.UseLockScheduler.GetAsBool() {
lockKey := task.GetLockerKey()
if lockKey != nil {
return s.executeTaskWithLock(task, lockKey)
}
}

// make sure that setting ts and enqueue is atomic.
s.lock.Lock()
defer s.lock.Unlock()
Expand All @@ -168,3 +189,25 @@ func (s *scheduler) GetMinDdlTs() Timestamp {
func (s *scheduler) setMinDdlTs(ts Timestamp) {
s.minDdlTs.Store(ts)
}

func (s *scheduler) executeTaskWithLock(task task, lockerKey LockerKey) error {
if lockerKey == nil {
if err := s.setID(task); err != nil {
return err
}
if err := s.setTs(task); err != nil {
return err
}
s.execute(task)
return nil
}
taskLock := s.lockMapping[lockerKey.Level()]
if lockerKey.IsWLock() {
taskLock.Lock(lockerKey.LockKey())
defer taskLock.Unlock(lockerKey.LockKey())
} else {
taskLock.RLock(lockerKey.LockKey())
defer taskLock.RUnlock(lockerKey.LockKey())
}
return s.executeTaskWithLock(task, lockerKey.Next())
}
Loading

0 comments on commit 5c166a2

Please sign in to comment.