Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhance: improve rootcoord task scheduling policy #37352

Merged
merged 1 commit into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -627,8 +627,6 @@ github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119 h1:9VXijWu
github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240930043709-0c23514e4c34 h1:Fwxpg98128gfWRbQ1A3PMP9o2IfYZk7RSEy8rcoCWDA=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240930043709-0c23514e4c34/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20241025031121-4d5c88b00cf7 h1:HwAitQk+V59QdYUwwVVYHTujd4QZrebg2Cc2hmcjhAg=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20241025031121-4d5c88b00cf7/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/pulsar-client-go v0.12.1 h1:O2JZp1tsYiO7C0MQ4hrUY/aJXnn2Gry6hpm7UodghmE=
Expand Down
8 changes: 8 additions & 0 deletions internal/rootcoord/alter_alias_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,11 @@ func (t *alterAliasTask) Execute(ctx context.Context) error {
// alter alias is atomic enough.
return t.core.meta.AlterAlias(ctx, t.Req.GetDbName(), t.Req.GetAlias(), t.Req.GetCollectionName(), t.GetTs())
}

// GetLockerKey builds the lock-key chain for an alter-alias task from, in
// order, a cluster key, a database key for the request's database, and a
// collection key for the request's collection name.
//
// NOTE(review): the boolean passed to each constructor presumably selects a
// write (true) versus read (false) lock — confirm against the LockerKey
// constructors.
func (t *alterAliasTask) GetLockerKey() LockerKey {
	clusterKey := NewClusterLockerKey(false)
	databaseKey := NewDatabaseLockerKey(t.Req.GetDbName(), false)
	// The collection name is used exactly as supplied in the request; no
	// alias resolution happens here.
	collectionKey := NewCollectionLockerKey(t.Req.GetCollectionName(), true)
	return NewLockerKeyChain(clusterKey, databaseKey, collectionKey)
}
9 changes: 9 additions & 0 deletions internal/rootcoord/alter_collection_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,3 +124,12 @@ func (a *alterCollectionTask) Execute(ctx context.Context) error {

return redoTask.Execute(ctx)
}

// GetLockerKey builds the lock-key chain for an alter-collection task from,
// in order, a cluster key, a database key, and a collection key.
//
// NOTE(review): the boolean passed to each constructor presumably selects a
// write (true) versus read (false) lock — confirm against the LockerKey
// constructors.
func (a *alterCollectionTask) GetLockerKey() LockerKey {
	// Resolve the requested name through getRealCollectionName first so that
	// the collection key is built from the name it returns rather than from
	// the raw request value.
	realName := a.core.getRealCollectionName(a.ctx, a.Req.GetDbName(), a.Req.GetCollectionName())

	clusterKey := NewClusterLockerKey(false)
	databaseKey := NewDatabaseLockerKey(a.Req.GetDbName(), false)
	collectionKey := NewCollectionLockerKey(realName, true)
	return NewLockerKeyChain(clusterKey, databaseKey, collectionKey)
}
7 changes: 7 additions & 0 deletions internal/rootcoord/alter_database_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,13 @@ func (a *alterDatabaseTask) Execute(ctx context.Context) error {
return redoTask.Execute(ctx)
}

// GetLockerKey builds the lock-key chain for an alter-database task from a
// cluster key followed by a database key for the request's database. No
// collection-level key is included.
//
// NOTE(review): the boolean passed to each constructor presumably selects a
// write (true) versus read (false) lock — confirm against the LockerKey
// constructors.
func (a *alterDatabaseTask) GetLockerKey() LockerKey {
	clusterKey := NewClusterLockerKey(false)
	databaseKey := NewDatabaseLockerKey(a.Req.GetDbName(), true)
	return NewLockerKeyChain(clusterKey, databaseKey)
}

func MergeProperties(oldProps []*commonpb.KeyValuePair, updatedProps []*commonpb.KeyValuePair) []*commonpb.KeyValuePair {
props := make(map[string]string)
for _, prop := range oldProps {
Expand Down
8 changes: 8 additions & 0 deletions internal/rootcoord/create_alias_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,11 @@ func (t *createAliasTask) Execute(ctx context.Context) error {
// create alias is atomic enough.
return t.core.meta.CreateAlias(ctx, t.Req.GetDbName(), t.Req.GetAlias(), t.Req.GetCollectionName(), t.GetTs())
}

// GetLockerKey builds the lock-key chain for a create-alias task from, in
// order, a cluster key, a database key for the request's database, and a
// collection key for the request's collection name.
//
// NOTE(review): the boolean passed to each constructor presumably selects a
// write (true) versus read (false) lock — confirm against the LockerKey
// constructors.
func (t *createAliasTask) GetLockerKey() LockerKey {
	clusterKey := NewClusterLockerKey(false)
	databaseKey := NewDatabaseLockerKey(t.Req.GetDbName(), false)
	// The collection name is used exactly as supplied in the request; no
	// alias resolution happens here.
	collectionKey := NewCollectionLockerKey(t.Req.GetCollectionName(), true)
	return NewLockerKeyChain(clusterKey, databaseKey, collectionKey)
}
7 changes: 7 additions & 0 deletions internal/rootcoord/create_collection_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -658,3 +658,10 @@ func (t *createCollectionTask) Execute(ctx context.Context) error {

return undoTask.Execute(ctx)
}

func (t *createCollectionTask) GetLockerKey() LockerKey {
return NewLockerKeyChain(
NewClusterLockerKey(false),
NewDatabaseLockerKey(t.Req.GetDbName(), true),
SimFG marked this conversation as resolved.
Show resolved Hide resolved
)
}
4 changes: 4 additions & 0 deletions internal/rootcoord/create_db_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,7 @@ func (t *createDatabaseTask) Execute(ctx context.Context) error {
db := model.NewDatabase(t.dbID, t.Req.GetDbName(), etcdpb.DatabaseState_DatabaseCreated, t.Req.GetProperties())
return t.core.meta.CreateDatabase(ctx, db, t.GetTs())
}

// GetLockerKey builds the lock-key chain for a create-database task. The
// chain consists of a single cluster key constructed with true.
//
// NOTE(review): true presumably requests a write lock on the cluster level —
// confirm against NewClusterLockerKey.
func (t *createDatabaseTask) GetLockerKey() LockerKey {
	clusterKey := NewClusterLockerKey(true)
	return NewLockerKeyChain(clusterKey)
}
9 changes: 9 additions & 0 deletions internal/rootcoord/create_partition_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,3 +128,12 @@ func (t *createPartitionTask) Execute(ctx context.Context) error {

return undoTask.Execute(ctx)
}

// GetLockerKey builds the lock-key chain for a create-partition task from,
// in order, a cluster key, a database key, and a collection key.
//
// NOTE(review): the boolean passed to each constructor presumably selects a
// write (true) versus read (false) lock — confirm against the LockerKey
// constructors.
func (t *createPartitionTask) GetLockerKey() LockerKey {
	// Resolve the requested name through getRealCollectionName first so that
	// the collection key is built from the name it returns rather than from
	// the raw request value.
	realName := t.core.getRealCollectionName(t.ctx, t.Req.GetDbName(), t.Req.GetCollectionName())

	clusterKey := NewClusterLockerKey(false)
	databaseKey := NewDatabaseLockerKey(t.Req.GetDbName(), false)
	collectionKey := NewCollectionLockerKey(realName, true)
	return NewLockerKeyChain(clusterKey, databaseKey, collectionKey)
}
9 changes: 9 additions & 0 deletions internal/rootcoord/describe_collection_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,12 @@ func (t *describeCollectionTask) Execute(ctx context.Context) (err error) {
t.Rsp = convertModelToDesc(coll, aliases, db.Name)
return nil
}

// GetLockerKey builds the lock-key chain for a describe-collection task
// from, in order, a cluster key, a database key, and a collection key. Every
// key is constructed with false.
//
// NOTE(review): false presumably requests a read lock at each level —
// confirm against the LockerKey constructors.
func (t *describeCollectionTask) GetLockerKey() LockerKey {
	// Resolve the requested name through getRealCollectionName first so that
	// the collection key is built from the name it returns rather than from
	// the raw request value.
	realName := t.core.getRealCollectionName(t.ctx, t.Req.GetDbName(), t.Req.GetCollectionName())

	clusterKey := NewClusterLockerKey(false)
	databaseKey := NewDatabaseLockerKey(t.Req.GetDbName(), false)
	collectionKey := NewCollectionLockerKey(realName, false)
	return NewLockerKeyChain(clusterKey, databaseKey, collectionKey)
}
7 changes: 7 additions & 0 deletions internal/rootcoord/describe_db_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,10 @@ func (t *describeDBTask) Execute(ctx context.Context) (err error) {
}
return nil
}

// GetLockerKey builds the lock-key chain for a describe-database task from a
// cluster key followed by a database key for the request's database. Both
// keys are constructed with false.
//
// NOTE(review): false presumably requests a read lock at each level —
// confirm against the LockerKey constructors.
func (t *describeDBTask) GetLockerKey() LockerKey {
	clusterKey := NewClusterLockerKey(false)
	databaseKey := NewDatabaseLockerKey(t.Req.GetDbName(), false)
	return NewLockerKeyChain(clusterKey, databaseKey)
}
9 changes: 9 additions & 0 deletions internal/rootcoord/drop_alias_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,12 @@ func (t *dropAliasTask) Execute(ctx context.Context) error {
}
return t.core.meta.DropAlias(ctx, t.Req.GetDbName(), t.Req.GetAlias(), t.GetTs())
}

// GetLockerKey builds the lock-key chain for a drop-alias task from, in
// order, a cluster key, a database key, and a collection key.
//
// NOTE(review): the boolean passed to each constructor presumably selects a
// write (true) versus read (false) lock — confirm against the LockerKey
// constructors.
func (t *dropAliasTask) GetLockerKey() LockerKey {
	// The request carries the alias being dropped, so that alias (not a
	// collection name) is what gets passed to getRealCollectionName; the
	// collection key is built from whatever name it returns.
	realName := t.core.getRealCollectionName(t.ctx, t.Req.GetDbName(), t.Req.GetAlias())

	clusterKey := NewClusterLockerKey(false)
	databaseKey := NewDatabaseLockerKey(t.Req.GetDbName(), false)
	collectionKey := NewCollectionLockerKey(realName, true)
	return NewLockerKeyChain(clusterKey, databaseKey, collectionKey)
}
4 changes: 4 additions & 0 deletions internal/rootcoord/drop_collection_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,3 +119,7 @@ func (t *dropCollectionTask) Execute(ctx context.Context) error {

return redoTask.Execute(ctx)
}

// GetLockerKey builds the lock-key chain for a drop-collection task from a
// cluster key followed by a database key for the request's database. No
// collection-level key is included.
//
// NOTE(review): the boolean passed to each constructor presumably selects a
// write (true) versus read (false) lock — confirm against the LockerKey
// constructors.
func (t *dropCollectionTask) GetLockerKey() LockerKey {
	clusterKey := NewClusterLockerKey(false)
	databaseKey := NewDatabaseLockerKey(t.Req.GetDbName(), true)
	return NewLockerKeyChain(clusterKey, databaseKey)
}
6 changes: 5 additions & 1 deletion internal/rootcoord/drop_db_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (t *dropDatabaseTask) Execute(ctx context.Context) error {
databaseName: dbName,
ts: ts,
})
redoTask.AddAsyncStep(&expireCacheStep{
redoTask.AddSyncStep(&expireCacheStep{
baseStep: baseStep{core: t.core},
dbName: dbName,
ts: ts,
Expand All @@ -60,3 +60,7 @@ func (t *dropDatabaseTask) Execute(ctx context.Context) error {
})
return redoTask.Execute(ctx)
}

// GetLockerKey builds the lock-key chain for a drop-database task. The chain
// consists of a single cluster key constructed with true.
//
// NOTE(review): true presumably requests a write lock on the cluster level —
// confirm against NewClusterLockerKey.
func (t *dropDatabaseTask) GetLockerKey() LockerKey {
	clusterKey := NewClusterLockerKey(true)
	return NewLockerKeyChain(clusterKey)
}
9 changes: 9 additions & 0 deletions internal/rootcoord/drop_partition_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,3 +111,12 @@ func (t *dropPartitionTask) Execute(ctx context.Context) error {

return redoTask.Execute(ctx)
}

// GetLockerKey builds the lock-key chain for a drop-partition task from, in
// order, a cluster key, a database key, and a collection key.
//
// NOTE(review): the boolean passed to each constructor presumably selects a
// write (true) versus read (false) lock — confirm against the LockerKey
// constructors.
func (t *dropPartitionTask) GetLockerKey() LockerKey {
	// Resolve the requested name through getRealCollectionName first so that
	// the collection key is built from the name it returns rather than from
	// the raw request value.
	realName := t.core.getRealCollectionName(t.ctx, t.Req.GetDbName(), t.Req.GetCollectionName())

	clusterKey := NewClusterLockerKey(false)
	databaseKey := NewDatabaseLockerKey(t.Req.GetDbName(), false)
	collectionKey := NewCollectionLockerKey(realName, true)
	return NewLockerKeyChain(clusterKey, databaseKey, collectionKey)
}
7 changes: 7 additions & 0 deletions internal/rootcoord/has_collection_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,10 @@ func (t *hasCollectionTask) Execute(ctx context.Context) error {
t.Rsp.Value = err == nil
return nil
}

// GetLockerKey builds the lock-key chain for a has-collection task from a
// cluster key followed by a database key for the request's database. Both
// keys are constructed with false, and no collection-level key is included.
//
// NOTE(review): false presumably requests a read lock at each level —
// confirm against the LockerKey constructors.
func (t *hasCollectionTask) GetLockerKey() LockerKey {
	clusterKey := NewClusterLockerKey(false)
	databaseKey := NewDatabaseLockerKey(t.Req.GetDbName(), false)
	return NewLockerKeyChain(clusterKey, databaseKey)
}
9 changes: 9 additions & 0 deletions internal/rootcoord/has_partition_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,12 @@ func (t *hasPartitionTask) Execute(ctx context.Context) error {
}
return nil
}

// GetLockerKey builds the lock-key chain for a has-partition task from, in
// order, a cluster key, a database key, and a collection key. Every key is
// constructed with false.
//
// NOTE(review): false presumably requests a read lock at each level —
// confirm against the LockerKey constructors.
func (t *hasPartitionTask) GetLockerKey() LockerKey {
	// Resolve the requested name through getRealCollectionName first so that
	// the collection key is built from the name it returns rather than from
	// the raw request value.
	realName := t.core.getRealCollectionName(t.ctx, t.Req.GetDbName(), t.Req.GetCollectionName())

	clusterKey := NewClusterLockerKey(false)
	databaseKey := NewDatabaseLockerKey(t.Req.GetDbName(), false)
	collectionKey := NewCollectionLockerKey(realName, false)
	return NewLockerKeyChain(clusterKey, databaseKey, collectionKey)
}
4 changes: 4 additions & 0 deletions internal/rootcoord/list_db_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,3 +124,7 @@ func (t *listDatabaseTask) Execute(ctx context.Context) error {
t.Resp.CreatedTimestamp = createdTimes
return nil
}

// GetLockerKey builds the lock-key chain for a list-databases task. The
// chain consists of a single cluster key constructed with false.
//
// NOTE(review): false presumably requests a read lock on the cluster level —
// confirm against NewClusterLockerKey.
func (t *listDatabaseTask) GetLockerKey() LockerKey {
	clusterKey := NewClusterLockerKey(false)
	return NewLockerKeyChain(clusterKey)
}
6 changes: 6 additions & 0 deletions internal/rootcoord/rename_collection_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,9 @@ func (t *renameCollectionTask) Execute(ctx context.Context) error {
}
return t.core.meta.RenameCollection(ctx, t.Req.GetDbName(), t.Req.GetOldName(), t.Req.GetNewDBName(), t.Req.GetNewName(), t.GetTs())
}

// GetLockerKey builds the lock-key chain for a rename-collection task. The
// chain consists of a single cluster key constructed with true; no database
// or collection keys are added.
//
// NOTE(review): true presumably requests a write lock on the cluster level —
// confirm against NewClusterLockerKey.
func (t *renameCollectionTask) GetLockerKey() LockerKey {
	clusterKey := NewClusterLockerKey(true)
	return NewLockerKeyChain(clusterKey)
}
9 changes: 9 additions & 0 deletions internal/rootcoord/root_coord.go
Original file line number Diff line number Diff line change
Expand Up @@ -1118,6 +1118,15 @@ func (c *Core) HasCollection(ctx context.Context, in *milvuspb.HasCollectionRequ
return t.Rsp, nil
}

// getRealCollectionName looks the given name up via meta.DescribeAlias and
// returns the name that call reports, so callers work with the original
// collection name rather than an alias.
//
// If DescribeAlias returns an error, the error is deliberately dropped and
// the input name is returned unchanged.
//
// NOTE(review): the trailing 0 passed to DescribeAlias is presumably a
// timestamp — confirm against the meta interface.
func (c *Core) getRealCollectionName(ctx context.Context, db, collection string) string {
	if resolved, err := c.meta.DescribeAlias(ctx, db, collection, 0); err == nil {
		return resolved
	}
	return collection
}

func (c *Core) describeCollection(ctx context.Context, in *milvuspb.DescribeCollectionRequest, allowUnavailable bool) (*model.Collection, error) {
ts := getTravelTs(in)
if in.GetCollectionName() != "" {
Expand Down
59 changes: 51 additions & 8 deletions internal/rootcoord/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
"github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/tso"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/lock"
)

type IScheduler interface {
Expand All @@ -48,21 +49,34 @@

lock sync.Mutex

minDdlTs atomic.Uint64
minDdlTs atomic.Uint64
clusterLock *lock.KeyLock[string]
databaseLock *lock.KeyLock[string]
collectionLock *lock.KeyLock[string]
lockMapping map[LockLevel]*lock.KeyLock[string]
}

func newScheduler(ctx context.Context, idAllocator allocator.Interface, tsoAllocator tso.Allocator) *scheduler {
ctx1, cancel := context.WithCancel(ctx)
// TODO
n := 1024 * 10
return &scheduler{
ctx: ctx1,
cancel: cancel,
idAllocator: idAllocator,
tsoAllocator: tsoAllocator,
taskChan: make(chan task, n),
minDdlTs: *atomic.NewUint64(0),
s := &scheduler{
ctx: ctx1,
cancel: cancel,
idAllocator: idAllocator,
tsoAllocator: tsoAllocator,
taskChan: make(chan task, n),
minDdlTs: *atomic.NewUint64(0),
clusterLock: lock.NewKeyLock[string](),
databaseLock: lock.NewKeyLock[string](),
collectionLock: lock.NewKeyLock[string](),
}
s.lockMapping = map[LockLevel]*lock.KeyLock[string]{
ClusterLock: s.clusterLock,
DatabaseLock: s.databaseLock,
CollectionLock: s.collectionLock,
}
return s
}

func (s *scheduler) Start() {
Expand Down Expand Up @@ -147,6 +161,13 @@
}

func (s *scheduler) AddTask(task task) error {
if Params.RootCoordCfg.UseLockScheduler.GetAsBool() {
lockKey := task.GetLockerKey()
if lockKey != nil {
return s.executeTaskWithLock(task, lockKey)
}
}

// make sure that setting ts and enqueue is atomic.
s.lock.Lock()
defer s.lock.Unlock()
Expand All @@ -168,3 +189,25 @@
// setMinDdlTs stores ts into the scheduler's atomic minDdlTs field.
func (s *scheduler) setMinDdlTs(ts Timestamp) {
s.minDdlTs.Store(ts)
}

func (s *scheduler) executeTaskWithLock(task task, lockerKey LockerKey) error {
if lockerKey == nil {
if err := s.setID(task); err != nil {
return err
}

Check warning on line 197 in internal/rootcoord/scheduler.go

View check run for this annotation

Codecov / codecov/patch

internal/rootcoord/scheduler.go#L196-L197

Added lines #L196 - L197 were not covered by tests
if err := s.setTs(task); err != nil {
return err
}

Check warning on line 200 in internal/rootcoord/scheduler.go

View check run for this annotation

Codecov / codecov/patch

internal/rootcoord/scheduler.go#L199-L200

Added lines #L199 - L200 were not covered by tests
s.execute(task)
return nil
}
taskLock := s.lockMapping[lockerKey.Level()]
if lockerKey.IsWLock() {
taskLock.Lock(lockerKey.LockKey())
defer taskLock.Unlock(lockerKey.LockKey())
} else {
taskLock.RLock(lockerKey.LockKey())
defer taskLock.RUnlock(lockerKey.LockKey())
}
return s.executeTaskWithLock(task, lockerKey.Next())
}
Loading
Loading