diff --git a/internal/rootcoord/alter_alias_task.go b/internal/rootcoord/alter_alias_task.go index 61abe8437c4f8..ac2f79acbf8a5 100644 --- a/internal/rootcoord/alter_alias_task.go +++ b/internal/rootcoord/alter_alias_task.go @@ -43,3 +43,11 @@ func (t *alterAliasTask) Execute(ctx context.Context) error { // alter alias is atomic enough. return t.core.meta.AlterAlias(ctx, t.Req.GetDbName(), t.Req.GetAlias(), t.Req.GetCollectionName(), t.GetTs()) } + +func (t *alterAliasTask) GetLockerKey() LockerKey { + return NewLockerKeyChain( + NewClusterLockerKey(false), + NewDatabaseLockerKey(t.Req.GetDbName(), false), + NewCollectionLockerKey(t.Req.GetCollectionName(), true), + ) +} diff --git a/internal/rootcoord/alter_collection_task.go b/internal/rootcoord/alter_collection_task.go index 8fd05e11cb8a8..9d7381c7f5668 100644 --- a/internal/rootcoord/alter_collection_task.go +++ b/internal/rootcoord/alter_collection_task.go @@ -124,3 +124,12 @@ func (a *alterCollectionTask) Execute(ctx context.Context) error { return redoTask.Execute(ctx) } + +func (a *alterCollectionTask) GetLockerKey() LockerKey { + collectionName := a.core.getRealCollectionName(a.ctx, a.Req.GetDbName(), a.Req.GetCollectionName()) + return NewLockerKeyChain( + NewClusterLockerKey(false), + NewDatabaseLockerKey(a.Req.GetDbName(), false), + NewCollectionLockerKey(collectionName, true), + ) +} diff --git a/internal/rootcoord/alter_database_task.go b/internal/rootcoord/alter_database_task.go index 292ba8bf3c8db..f87ec79d177dc 100644 --- a/internal/rootcoord/alter_database_task.go +++ b/internal/rootcoord/alter_database_task.go @@ -118,6 +118,13 @@ func (a *alterDatabaseTask) Execute(ctx context.Context) error { return redoTask.Execute(ctx) } +func (a *alterDatabaseTask) GetLockerKey() LockerKey { + return NewLockerKeyChain( + NewClusterLockerKey(false), + NewDatabaseLockerKey(a.Req.GetDbName(), true), + ) +} + func MergeProperties(oldProps []*commonpb.KeyValuePair, updatedProps []*commonpb.KeyValuePair) []*commonpb.KeyValuePair { props := make(map[string]string) for _, prop := range oldProps { diff --git a/internal/rootcoord/create_alias_task.go b/internal/rootcoord/create_alias_task.go index 0f3327a022f57..5675704703255 100644 --- a/internal/rootcoord/create_alias_task.go +++ b/internal/rootcoord/create_alias_task.go @@ -39,3 +39,11 @@ func (t *createAliasTask) Execute(ctx context.Context) error { // create alias is atomic enough. return t.core.meta.CreateAlias(ctx, t.Req.GetDbName(), t.Req.GetAlias(), t.Req.GetCollectionName(), t.GetTs()) } + +func (t *createAliasTask) GetLockerKey() LockerKey { + return NewLockerKeyChain( + NewClusterLockerKey(false), + NewDatabaseLockerKey(t.Req.GetDbName(), false), + NewCollectionLockerKey(t.Req.GetCollectionName(), true), + ) +} diff --git a/internal/rootcoord/create_collection_task.go b/internal/rootcoord/create_collection_task.go index 2621b02c6433f..9c83bc7018f51 100644 --- a/internal/rootcoord/create_collection_task.go +++ b/internal/rootcoord/create_collection_task.go @@ -658,3 +658,10 @@ func (t *createCollectionTask) Execute(ctx context.Context) error { return undoTask.Execute(ctx) } + +func (t *createCollectionTask) GetLockerKey() LockerKey { + return NewLockerKeyChain( + NewClusterLockerKey(false), + NewDatabaseLockerKey(t.Req.GetDbName(), true), + ) +} diff --git a/internal/rootcoord/create_db_task.go b/internal/rootcoord/create_db_task.go index 31de0c5f5afc5..35f1b67ddcf50 100644 --- a/internal/rootcoord/create_db_task.go +++ b/internal/rootcoord/create_db_task.go @@ -53,3 +53,7 @@ func (t *createDatabaseTask) Execute(ctx context.Context) error { db := model.NewDatabase(t.dbID, t.Req.GetDbName(), etcdpb.DatabaseState_DatabaseCreated, t.Req.GetProperties()) return t.core.meta.CreateDatabase(ctx, db, t.GetTs()) } + +func (t *createDatabaseTask) GetLockerKey() LockerKey { + return NewLockerKeyChain(NewClusterLockerKey(true)) +} diff --git a/internal/rootcoord/create_partition_task.go b/internal/rootcoord/create_partition_task.go index 76a5c7a718359..7e3fe097aa935 100644 --- a/internal/rootcoord/create_partition_task.go +++ b/internal/rootcoord/create_partition_task.go @@ -128,3 +128,12 @@ func (t *createPartitionTask) Execute(ctx context.Context) error { return undoTask.Execute(ctx) } + +func (t *createPartitionTask) GetLockerKey() LockerKey { + collectionName := t.core.getRealCollectionName(t.ctx, t.Req.GetDbName(), t.Req.GetCollectionName()) + return NewLockerKeyChain( + NewClusterLockerKey(false), + NewDatabaseLockerKey(t.Req.GetDbName(), false), + NewCollectionLockerKey(collectionName, true), + ) +} diff --git a/internal/rootcoord/describe_collection_task.go b/internal/rootcoord/describe_collection_task.go index 8a9da97a9f1ee..cc49a6cef5b88 100644 --- a/internal/rootcoord/describe_collection_task.go +++ b/internal/rootcoord/describe_collection_task.go @@ -53,3 +53,12 @@ func (t *describeCollectionTask) Execute(ctx context.Context) (err error) { t.Rsp = convertModelToDesc(coll, aliases, db.Name) return nil } + +func (t *describeCollectionTask) GetLockerKey() LockerKey { + collectionName := t.core.getRealCollectionName(t.ctx, t.Req.GetDbName(), t.Req.GetCollectionName()) + return NewLockerKeyChain( + NewClusterLockerKey(false), + NewDatabaseLockerKey(t.Req.GetDbName(), false), + NewCollectionLockerKey(collectionName, false), + ) +} diff --git a/internal/rootcoord/describe_db_task.go b/internal/rootcoord/describe_db_task.go index 603d1a46be90b..d711316b95238 100644 --- a/internal/rootcoord/describe_db_task.go +++ b/internal/rootcoord/describe_db_task.go @@ -55,3 +55,10 @@ func (t *describeDBTask) Execute(ctx context.Context) (err error) { } return nil } + +func (t *describeDBTask) GetLockerKey() LockerKey { + return NewLockerKeyChain( + NewClusterLockerKey(false), + NewDatabaseLockerKey(t.Req.GetDbName(), false), + ) +} diff --git a/internal/rootcoord/drop_alias_task.go b/internal/rootcoord/drop_alias_task.go index 28caceafc9eca..4f9fcbfe644a9 100644 --- a/internal/rootcoord/drop_alias_task.go +++ b/internal/rootcoord/drop_alias_task.go @@ -43,3 +43,12 @@ func (t *dropAliasTask) Execute(ctx context.Context) error { } return t.core.meta.DropAlias(ctx, t.Req.GetDbName(), t.Req.GetAlias(), t.GetTs()) } + +func (t *dropAliasTask) GetLockerKey() LockerKey { + collectionName := t.core.getRealCollectionName(t.ctx, t.Req.GetDbName(), t.Req.GetAlias()) + return NewLockerKeyChain( + NewClusterLockerKey(false), + NewDatabaseLockerKey(t.Req.GetDbName(), false), + NewCollectionLockerKey(collectionName, true), + ) +} diff --git a/internal/rootcoord/drop_collection_task.go b/internal/rootcoord/drop_collection_task.go index ce458e5cff059..4d443a1ad3ad7 100644 --- a/internal/rootcoord/drop_collection_task.go +++ b/internal/rootcoord/drop_collection_task.go @@ -119,3 +119,7 @@ func (t *dropCollectionTask) Execute(ctx context.Context) error { return redoTask.Execute(ctx) } + +func (t *dropCollectionTask) GetLockerKey() LockerKey { + return NewLockerKeyChain(NewClusterLockerKey(false), NewDatabaseLockerKey(t.Req.GetDbName(), true)) +} diff --git a/internal/rootcoord/drop_db_task.go b/internal/rootcoord/drop_db_task.go index 15096c4e3057c..bdc1cc035db32 100644 --- a/internal/rootcoord/drop_db_task.go +++ b/internal/rootcoord/drop_db_task.go @@ -47,7 +47,7 @@ func (t *dropDatabaseTask) Execute(ctx context.Context) error { databaseName: dbName, ts: ts, }) - redoTask.AddAsyncStep(&expireCacheStep{ + redoTask.AddSyncStep(&expireCacheStep{ baseStep: baseStep{core: t.core}, dbName: dbName, ts: ts, @@ -60,3 +60,7 @@ func (t *dropDatabaseTask) Execute(ctx context.Context) error { }) return redoTask.Execute(ctx) } + +func (t *dropDatabaseTask) GetLockerKey() LockerKey { + return NewLockerKeyChain(NewClusterLockerKey(true)) +} diff --git a/internal/rootcoord/drop_partition_task.go b/internal/rootcoord/drop_partition_task.go index cc45422db7f7e..c65c91f9c1099 100644 --- a/internal/rootcoord/drop_partition_task.go +++ b/internal/rootcoord/drop_partition_task.go @@ -111,3 +111,12 @@ func (t *dropPartitionTask) Execute(ctx context.Context) error { return redoTask.Execute(ctx) } + +func (t *dropPartitionTask) GetLockerKey() LockerKey { + collectionName := t.core.getRealCollectionName(t.ctx, t.Req.GetDbName(), t.Req.GetCollectionName()) + return NewLockerKeyChain( + NewClusterLockerKey(false), + NewDatabaseLockerKey(t.Req.GetDbName(), false), + NewCollectionLockerKey(collectionName, true), + ) +} diff --git a/internal/rootcoord/has_collection_task.go b/internal/rootcoord/has_collection_task.go index d9258a8f19607..52a4625ef1012 100644 --- a/internal/rootcoord/has_collection_task.go +++ b/internal/rootcoord/has_collection_task.go @@ -47,3 +47,10 @@ func (t *hasCollectionTask) Execute(ctx context.Context) error { t.Rsp.Value = err == nil return nil } + +func (t *hasCollectionTask) GetLockerKey() LockerKey { + return NewLockerKeyChain( + NewClusterLockerKey(false), + NewDatabaseLockerKey(t.Req.GetDbName(), false), + ) +} diff --git a/internal/rootcoord/has_partition_task.go b/internal/rootcoord/has_partition_task.go index 77ef717b47c84..1e9f32da740b4 100644 --- a/internal/rootcoord/has_partition_task.go +++ b/internal/rootcoord/has_partition_task.go @@ -57,3 +57,12 @@ func (t *hasPartitionTask) Execute(ctx context.Context) error { } return nil } + +func (t *hasPartitionTask) GetLockerKey() LockerKey { + collectionName := t.core.getRealCollectionName(t.ctx, t.Req.GetDbName(), t.Req.GetCollectionName()) + return NewLockerKeyChain( + NewClusterLockerKey(false), + NewDatabaseLockerKey(t.Req.GetDbName(), false), + NewCollectionLockerKey(collectionName, false), + ) +} diff --git a/internal/rootcoord/list_db_task.go b/internal/rootcoord/list_db_task.go index 1b4e81a79519d..847b5928c5747 100644 --- a/internal/rootcoord/list_db_task.go +++ b/internal/rootcoord/list_db_task.go @@ -124,3 +124,7 @@ func (t *listDatabaseTask) Execute(ctx context.Context) error { t.Resp.CreatedTimestamp = createdTimes return nil } + +func (t *listDatabaseTask) GetLockerKey() LockerKey { + return NewLockerKeyChain(NewClusterLockerKey(false)) +} diff --git a/internal/rootcoord/rename_collection_task.go b/internal/rootcoord/rename_collection_task.go index 50bdc61713693..cdceaab79ba1b 100644 --- a/internal/rootcoord/rename_collection_task.go +++ b/internal/rootcoord/rename_collection_task.go @@ -42,3 +42,10 @@ func (t *renameCollectionTask) Execute(ctx context.Context) error { } return t.core.meta.RenameCollection(ctx, t.Req.GetDbName(), t.Req.GetOldName(), t.Req.GetNewDBName(), t.Req.GetNewName(), t.GetTs()) } + +func (t *renameCollectionTask) GetLockerKey() LockerKey { + return NewLockerKeyChain( + NewClusterLockerKey(false), + NewDatabaseLockerKey(t.Req.GetDbName(), true), + ) +} diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index ea267b577e590..ad6ef5fc597f2 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -1118,6 +1118,15 @@ func (c *Core) HasCollection(ctx context.Context, in *milvuspb.HasCollectionRequ return t.Rsp, nil } +// getRealCollectionName get origin collection name to avoid the alias name +func (c *Core) getRealCollectionName(ctx context.Context, db, collection string) string { + realName, err := c.meta.DescribeAlias(ctx, db, collection, 0) + if err != nil { + return collection + } + return realName +} + func (c *Core) describeCollection(ctx context.Context, in *milvuspb.DescribeCollectionRequest, allowUnavailable bool) (*model.Collection, error) { ts := getTravelTs(in) if in.GetCollectionName() != "" { diff --git a/internal/rootcoord/scheduler.go b/internal/rootcoord/scheduler.go index 1b3f8ab139e99..1f5e3d0e478ec 100644 --- a/internal/rootcoord/scheduler.go +++ b/internal/rootcoord/scheduler.go @@ -27,6 +27,7 @@ import ( "github.com/milvus-io/milvus/internal/allocator" "github.com/milvus-io/milvus/internal/tso" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/lock" ) type IScheduler interface { @@ -48,21 +49,34 @@ type scheduler struct { lock sync.Mutex - minDdlTs atomic.Uint64 + minDdlTs atomic.Uint64 + clusterLock *lock.KeyLock[string] + databaseLock *lock.KeyLock[string] + collectionLock *lock.KeyLock[string] + lockMapping map[LockLevel]*lock.KeyLock[string] } func newScheduler(ctx context.Context, idAllocator allocator.Interface, tsoAllocator tso.Allocator) *scheduler { ctx1, cancel := context.WithCancel(ctx) // TODO n := 1024 * 10 - return &scheduler{ - ctx: ctx1, - cancel: cancel, - idAllocator: idAllocator, - tsoAllocator: tsoAllocator, - taskChan: make(chan task, n), - minDdlTs: *atomic.NewUint64(0), + s := &scheduler{ + ctx: ctx1, + cancel: cancel, + idAllocator: idAllocator, + tsoAllocator: tsoAllocator, + taskChan: make(chan task, n), + minDdlTs: *atomic.NewUint64(0), + clusterLock: lock.NewKeyLock[string](), + databaseLock: lock.NewKeyLock[string](), + collectionLock: lock.NewKeyLock[string](), } + s.lockMapping = map[LockLevel]*lock.KeyLock[string]{ + ClusterLock: s.clusterLock, + DatabaseLock: s.databaseLock, + CollectionLock: s.collectionLock, + } + return s } func (s *scheduler) Start() { @@ -147,6 +161,13 @@ func (s *scheduler) enqueue(task task) { } func (s *scheduler) AddTask(task task) error { + if Params.RootCoordCfg.UseLockScheduler.GetAsBool() { + lockKey := task.GetLockerKey() + if lockKey != nil { + return s.executeTaskWithLock(task, lockKey) + } + } + // make sure that setting ts and enqueue is atomic. s.lock.Lock() defer s.lock.Unlock() @@ -168,3 +189,25 @@ func (s *scheduler) GetMinDdlTs() Timestamp { func (s *scheduler) setMinDdlTs(ts Timestamp) { s.minDdlTs.Store(ts) } + +func (s *scheduler) executeTaskWithLock(task task, lockerKey LockerKey) error { + if lockerKey == nil { + if err := s.setID(task); err != nil { + return err + } + if err := s.setTs(task); err != nil { + return err + } + s.execute(task) + return nil + } + taskLock := s.lockMapping[lockerKey.Level()] + if lockerKey.IsWLock() { + taskLock.Lock(lockerKey.LockKey()) + defer taskLock.Unlock(lockerKey.LockKey()) + } else { + taskLock.RLock(lockerKey.LockKey()) + defer taskLock.RUnlock(lockerKey.LockKey()) + } + return s.executeTaskWithLock(task, lockerKey.Next()) +} diff --git a/internal/rootcoord/show_collection_task.go b/internal/rootcoord/show_collection_task.go index 090d4ada5b561..ab853266bbc36 100644 --- a/internal/rootcoord/show_collection_task.go +++ b/internal/rootcoord/show_collection_task.go @@ -147,3 +147,7 @@ func (t *showCollectionTask) Execute(ctx context.Context) error { } return nil } + +func (t *showCollectionTask) GetLockerKey() LockerKey { + return NewLockerKeyChain(NewClusterLockerKey(false), NewDatabaseLockerKey(t.Req.GetDbName(), false)) +} diff --git a/internal/rootcoord/show_partition_task.go b/internal/rootcoord/show_partition_task.go index 5e1f8214b4824..f023c60f60016 100644 --- a/internal/rootcoord/show_partition_task.go +++ b/internal/rootcoord/show_partition_task.go @@ -67,3 +67,12 @@ func (t *showPartitionTask) Execute(ctx context.Context) error { return nil } + +func (t *showPartitionTask) GetLockerKey() LockerKey { + collectionName := t.core.getRealCollectionName(t.ctx, t.Req.GetDbName(), t.Req.GetCollectionName()) + return NewLockerKeyChain( + NewClusterLockerKey(false), + NewDatabaseLockerKey(t.Req.GetDbName(), false), + NewCollectionLockerKey(collectionName, false), + ) +} diff --git a/internal/rootcoord/task.go b/internal/rootcoord/task.go index bbabf12bb6f57..ba0ff8cd78fd8 100644 --- a/internal/rootcoord/task.go +++ b/internal/rootcoord/task.go @@ -20,9 +20,27 @@ import ( "context" "time" + "go.uber.org/zap" + + "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/timerecord" ) +type LockLevel int + +const ( + ClusterLock LockLevel = iota + DatabaseLock + CollectionLock +) + +type LockerKey interface { + LockKey() string + Level() LockLevel + IsWLock() bool + Next() LockerKey +} + type task interface { GetCtx() context.Context SetCtx(context.Context) @@ -35,6 +53,7 @@ type task interface { WaitToFinish() error NotifyDone(err error) SetInQueueDuration() + GetLockerKey() LockerKey } type baseTask struct { @@ -101,3 +120,73 @@ func (b *baseTask) NotifyDone(err error) { func (b *baseTask) SetInQueueDuration() { b.queueDur = b.tr.ElapseSpan() } + +func (b *baseTask) GetLockerKey() LockerKey { + return nil +} + +type taskLockerKey struct { + key string + rw bool + level LockLevel + next LockerKey +} + +func (t *taskLockerKey) LockKey() string { + return t.key +} + +func (t *taskLockerKey) Level() LockLevel { + return t.level +} + +func (t *taskLockerKey) IsWLock() bool { + return t.rw +} + +func (t *taskLockerKey) Next() LockerKey { + return t.next +} + +func NewClusterLockerKey(rw bool) LockerKey { + return &taskLockerKey{ + key: "$", + rw: true, + level: ClusterLock, + } +} + +func NewDatabaseLockerKey(db string, rw bool) LockerKey { + return &taskLockerKey{ + key: db, + rw: rw, + level: DatabaseLock, + } +} + +func NewCollectionLockerKey(collection string, rw bool) LockerKey { + return &taskLockerKey{ + key: collection, + rw: rw, + level: CollectionLock, + } +} + +func NewLockerKeyChain(lockerKeys ...LockerKey) LockerKey { + if len(lockerKeys) == 0 { + return nil + } + if lockerKeys[0].Level() != ClusterLock { + log.Warn("Invalid locker key chain", zap.Stack("stack")) + return nil + } + + for i := 0; i < len(lockerKeys)-1; i++ { + if lockerKeys[i].Level() >= lockerKeys[i+1].Level() { + log.Warn("Invalid locker key chain", zap.Stack("stack")) + return nil + } + lockerKeys[i].(*taskLockerKey).next = lockerKeys[i+1] + } + return lockerKeys[0] +} diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 9e0ff8bdc30a4..d7137f0061184 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -1117,6 +1117,7 @@ type rootCoordConfig struct { MaxDatabaseNum ParamItem `refreshable:"false"` MaxGeneralCapacity ParamItem `refreshable:"true"` GracefulStopTimeout ParamItem `refreshable:"true"` + UseLockScheduler ParamItem `refreshable:"true"` } func (p *rootCoordConfig) init(base *BaseTable) { @@ -1191,6 +1192,15 @@ Segments with smaller size than this parameter will not be indexed, and will be Export: true, } p.GracefulStopTimeout.Init(base.mgr) + + p.UseLockScheduler = ParamItem{ + Key: "rootCoord.useLockScheduler", + Version: "2.4.15", + DefaultValue: "false", + Doc: "use lock to schedule the task", + Export: false, + } + p.UseLockScheduler.Init(base.mgr) } // /////////////////////////////////////////////////////////////////////////////