From 1002beb096c76cd0673b9f04491e0454144953ef Mon Sep 17 00:00:00 2001 From: SimFG Date: Fri, 1 Nov 2024 12:31:29 +0800 Subject: [PATCH] enhance: improve rootcoord task scheduling policy Signed-off-by: SimFG --- go.sum | 2 - internal/rootcoord/alter_alias_task.go | 8 + internal/rootcoord/alter_collection_task.go | 9 + internal/rootcoord/alter_database_task.go | 7 + internal/rootcoord/create_alias_task.go | 8 + internal/rootcoord/create_collection_task.go | 7 + internal/rootcoord/create_db_task.go | 4 + internal/rootcoord/create_partition_task.go | 9 + .../rootcoord/describe_collection_task.go | 9 + internal/rootcoord/describe_db_task.go | 7 + internal/rootcoord/drop_alias_task.go | 9 + internal/rootcoord/drop_collection_task.go | 4 + internal/rootcoord/drop_db_task.go | 6 +- internal/rootcoord/drop_partition_task.go | 9 + internal/rootcoord/has_collection_task.go | 7 + internal/rootcoord/has_partition_task.go | 9 + internal/rootcoord/list_db_task.go | 4 + internal/rootcoord/rename_collection_task.go | 6 + internal/rootcoord/root_coord.go | 9 + internal/rootcoord/scheduler.go | 59 ++- internal/rootcoord/scheduler_test.go | 93 +++++ internal/rootcoord/show_collection_task.go | 4 + internal/rootcoord/show_partition_task.go | 9 + internal/rootcoord/task.go | 90 +++++ internal/rootcoord/task_test.go | 345 ++++++++++++++++++ pkg/util/paramtable/component_param.go | 10 + 26 files changed, 732 insertions(+), 11 deletions(-) create mode 100644 internal/rootcoord/task_test.go diff --git a/go.sum b/go.sum index 66de8bb5875c1..98ffb403f2456 100644 --- a/go.sum +++ b/go.sum @@ -627,8 +627,6 @@ github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119 h1:9VXijWu github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg= github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8= github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4= -github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240930043709-0c23514e4c34 h1:Fwxpg98128gfWRbQ1A3PMP9o2IfYZk7RSEy8rcoCWDA= -github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240930043709-0c23514e4c34/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20241025031121-4d5c88b00cf7 h1:HwAitQk+V59QdYUwwVVYHTujd4QZrebg2Cc2hmcjhAg= github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20241025031121-4d5c88b00cf7/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= github.com/milvus-io/pulsar-client-go v0.12.1 h1:O2JZp1tsYiO7C0MQ4hrUY/aJXnn2Gry6hpm7UodghmE= diff --git a/internal/rootcoord/alter_alias_task.go b/internal/rootcoord/alter_alias_task.go index 61abe8437c4f8..ac2f79acbf8a5 100644 --- a/internal/rootcoord/alter_alias_task.go +++ b/internal/rootcoord/alter_alias_task.go @@ -43,3 +43,11 @@ func (t *alterAliasTask) Execute(ctx context.Context) error { // alter alias is atomic enough. return t.core.meta.AlterAlias(ctx, t.Req.GetDbName(), t.Req.GetAlias(), t.Req.GetCollectionName(), t.GetTs()) } + +func (t *alterAliasTask) GetLockerKey() LockerKey { + return NewLockerKeyChain( + NewClusterLockerKey(false), + NewDatabaseLockerKey(t.Req.GetDbName(), false), + NewCollectionLockerKey(t.Req.GetCollectionName(), true), + ) +} diff --git a/internal/rootcoord/alter_collection_task.go b/internal/rootcoord/alter_collection_task.go index 8fd05e11cb8a8..9d7381c7f5668 100644 --- a/internal/rootcoord/alter_collection_task.go +++ b/internal/rootcoord/alter_collection_task.go @@ -124,3 +124,12 @@ func (a *alterCollectionTask) Execute(ctx context.Context) error { return redoTask.Execute(ctx) } + +func (a *alterCollectionTask) GetLockerKey() LockerKey { + collectionName := a.core.getRealCollectionName(a.ctx, a.Req.GetDbName(), a.Req.GetCollectionName()) + return NewLockerKeyChain( + NewClusterLockerKey(false), + NewDatabaseLockerKey(a.Req.GetDbName(), false), + NewCollectionLockerKey(collectionName, true), + ) +} diff --git a/internal/rootcoord/alter_database_task.go b/internal/rootcoord/alter_database_task.go index 292ba8bf3c8db..f87ec79d177dc 100644 --- a/internal/rootcoord/alter_database_task.go +++ b/internal/rootcoord/alter_database_task.go @@ -118,6 +118,13 @@ func (a *alterDatabaseTask) Execute(ctx context.Context) error { return redoTask.Execute(ctx) } +func (a *alterDatabaseTask) GetLockerKey() LockerKey { + return NewLockerKeyChain( + NewClusterLockerKey(false), + NewDatabaseLockerKey(a.Req.GetDbName(), true), + ) +} + func MergeProperties(oldProps []*commonpb.KeyValuePair, updatedProps []*commonpb.KeyValuePair) []*commonpb.KeyValuePair { props := make(map[string]string) for _, prop := range oldProps { diff --git a/internal/rootcoord/create_alias_task.go b/internal/rootcoord/create_alias_task.go index 0f3327a022f57..5675704703255 100644 --- a/internal/rootcoord/create_alias_task.go +++ b/internal/rootcoord/create_alias_task.go @@ -39,3 +39,11 @@ func (t *createAliasTask) Execute(ctx context.Context) error { // create alias is atomic enough. return t.core.meta.CreateAlias(ctx, t.Req.GetDbName(), t.Req.GetAlias(), t.Req.GetCollectionName(), t.GetTs()) } + +func (t *createAliasTask) GetLockerKey() LockerKey { + return NewLockerKeyChain( + NewClusterLockerKey(false), + NewDatabaseLockerKey(t.Req.GetDbName(), false), + NewCollectionLockerKey(t.Req.GetCollectionName(), true), + ) +} diff --git a/internal/rootcoord/create_collection_task.go b/internal/rootcoord/create_collection_task.go index 2621b02c6433f..9c83bc7018f51 100644 --- a/internal/rootcoord/create_collection_task.go +++ b/internal/rootcoord/create_collection_task.go @@ -658,3 +658,10 @@ func (t *createCollectionTask) Execute(ctx context.Context) error { return undoTask.Execute(ctx) } + +func (t *createCollectionTask) GetLockerKey() LockerKey { + return NewLockerKeyChain( + NewClusterLockerKey(false), + NewDatabaseLockerKey(t.Req.GetDbName(), true), + ) +} diff --git a/internal/rootcoord/create_db_task.go b/internal/rootcoord/create_db_task.go index 31de0c5f5afc5..35f1b67ddcf50 100644 --- a/internal/rootcoord/create_db_task.go +++ b/internal/rootcoord/create_db_task.go @@ -53,3 +53,7 @@ func (t *createDatabaseTask) Execute(ctx context.Context) error { db := model.NewDatabase(t.dbID, t.Req.GetDbName(), etcdpb.DatabaseState_DatabaseCreated, t.Req.GetProperties()) return t.core.meta.CreateDatabase(ctx, db, t.GetTs()) } + +func (t *createDatabaseTask) GetLockerKey() LockerKey { + return NewLockerKeyChain(NewClusterLockerKey(true)) +} diff --git a/internal/rootcoord/create_partition_task.go b/internal/rootcoord/create_partition_task.go index 76a5c7a718359..7e3fe097aa935 100644 --- a/internal/rootcoord/create_partition_task.go +++ b/internal/rootcoord/create_partition_task.go @@ -128,3 +128,12 @@ func (t *createPartitionTask) Execute(ctx context.Context) error { return undoTask.Execute(ctx) } + +func (t *createPartitionTask) GetLockerKey() LockerKey { + collectionName := t.core.getRealCollectionName(t.ctx, t.Req.GetDbName(), t.Req.GetCollectionName()) + return NewLockerKeyChain( + NewClusterLockerKey(false), + NewDatabaseLockerKey(t.Req.GetDbName(), false), + NewCollectionLockerKey(collectionName, true), + ) +} diff --git a/internal/rootcoord/describe_collection_task.go b/internal/rootcoord/describe_collection_task.go index 8a9da97a9f1ee..cc49a6cef5b88 100644 --- a/internal/rootcoord/describe_collection_task.go +++ b/internal/rootcoord/describe_collection_task.go @@ -53,3 +53,12 @@ func (t *describeCollectionTask) Execute(ctx context.Context) (err error) { t.Rsp = convertModelToDesc(coll, aliases, db.Name) return nil } + +func (t *describeCollectionTask) GetLockerKey() LockerKey { + collectionName := t.core.getRealCollectionName(t.ctx, t.Req.GetDbName(), t.Req.GetCollectionName()) + return NewLockerKeyChain( + NewClusterLockerKey(false), + NewDatabaseLockerKey(t.Req.GetDbName(), false), + NewCollectionLockerKey(collectionName, false), + ) +} diff --git a/internal/rootcoord/describe_db_task.go b/internal/rootcoord/describe_db_task.go index 603d1a46be90b..d711316b95238 100644 --- a/internal/rootcoord/describe_db_task.go +++ b/internal/rootcoord/describe_db_task.go @@ -55,3 +55,10 @@ func (t *describeDBTask) Execute(ctx context.Context) (err error) { } return nil } + +func (t *describeDBTask) GetLockerKey() LockerKey { + return NewLockerKeyChain( + NewClusterLockerKey(false), + NewDatabaseLockerKey(t.Req.GetDbName(), false), + ) +} diff --git a/internal/rootcoord/drop_alias_task.go b/internal/rootcoord/drop_alias_task.go index 28caceafc9eca..4f9fcbfe644a9 100644 --- a/internal/rootcoord/drop_alias_task.go +++ b/internal/rootcoord/drop_alias_task.go @@ -43,3 +43,12 @@ func (t *dropAliasTask) Execute(ctx context.Context) error { } return t.core.meta.DropAlias(ctx, t.Req.GetDbName(), t.Req.GetAlias(), t.GetTs()) } + +func (t *dropAliasTask) GetLockerKey() LockerKey { + collectionName := t.core.getRealCollectionName(t.ctx, t.Req.GetDbName(), t.Req.GetAlias()) + return NewLockerKeyChain( + NewClusterLockerKey(false), + NewDatabaseLockerKey(t.Req.GetDbName(), false), + NewCollectionLockerKey(collectionName, true), + ) +} diff --git a/internal/rootcoord/drop_collection_task.go b/internal/rootcoord/drop_collection_task.go index ce458e5cff059..4d443a1ad3ad7 100644 --- a/internal/rootcoord/drop_collection_task.go +++ b/internal/rootcoord/drop_collection_task.go @@ -119,3 +119,7 @@ func (t *dropCollectionTask) Execute(ctx context.Context) error { return redoTask.Execute(ctx) } + +func (t *dropCollectionTask) GetLockerKey() LockerKey { + return NewLockerKeyChain(NewClusterLockerKey(false), NewDatabaseLockerKey(t.Req.GetDbName(), true)) +} diff --git a/internal/rootcoord/drop_db_task.go b/internal/rootcoord/drop_db_task.go index 15096c4e3057c..bdc1cc035db32 100644 --- a/internal/rootcoord/drop_db_task.go +++ b/internal/rootcoord/drop_db_task.go @@ -47,7 +47,7 @@ func (t *dropDatabaseTask) Execute(ctx context.Context) error { databaseName: dbName, ts: ts, }) - redoTask.AddAsyncStep(&expireCacheStep{ + redoTask.AddSyncStep(&expireCacheStep{ baseStep: baseStep{core: t.core}, dbName: dbName, ts: ts, @@ -60,3 +60,7 @@ func (t *dropDatabaseTask) Execute(ctx context.Context) error { }) return redoTask.Execute(ctx) } + +func (t *dropDatabaseTask) GetLockerKey() LockerKey { + return NewLockerKeyChain(NewClusterLockerKey(true)) +} diff --git a/internal/rootcoord/drop_partition_task.go b/internal/rootcoord/drop_partition_task.go index cc45422db7f7e..c65c91f9c1099 100644 --- a/internal/rootcoord/drop_partition_task.go +++ b/internal/rootcoord/drop_partition_task.go @@ -111,3 +111,12 @@ func (t *dropPartitionTask) Execute(ctx context.Context) error { return redoTask.Execute(ctx) } + +func (t *dropPartitionTask) GetLockerKey() LockerKey { + collectionName := t.core.getRealCollectionName(t.ctx, t.Req.GetDbName(), t.Req.GetCollectionName()) + return NewLockerKeyChain( + NewClusterLockerKey(false), + NewDatabaseLockerKey(t.Req.GetDbName(), false), + NewCollectionLockerKey(collectionName, true), + ) +} diff --git a/internal/rootcoord/has_collection_task.go b/internal/rootcoord/has_collection_task.go index d9258a8f19607..52a4625ef1012 100644 --- a/internal/rootcoord/has_collection_task.go +++ b/internal/rootcoord/has_collection_task.go @@ -47,3 +47,10 @@ func (t *hasCollectionTask) Execute(ctx context.Context) error { t.Rsp.Value = err == nil return nil } + +func (t *hasCollectionTask) GetLockerKey() LockerKey { + return NewLockerKeyChain( + NewClusterLockerKey(false), + NewDatabaseLockerKey(t.Req.GetDbName(), false), + ) +} diff --git a/internal/rootcoord/has_partition_task.go b/internal/rootcoord/has_partition_task.go index 77ef717b47c84..1e9f32da740b4 100644 --- a/internal/rootcoord/has_partition_task.go +++ b/internal/rootcoord/has_partition_task.go @@ -57,3 +57,12 @@ func (t *hasPartitionTask) Execute(ctx context.Context) error { } return nil } + +func (t *hasPartitionTask) GetLockerKey() LockerKey { + collectionName := t.core.getRealCollectionName(t.ctx, t.Req.GetDbName(), t.Req.GetCollectionName()) + return NewLockerKeyChain( + NewClusterLockerKey(false), + NewDatabaseLockerKey(t.Req.GetDbName(), false), + NewCollectionLockerKey(collectionName, false), + ) +} diff --git a/internal/rootcoord/list_db_task.go b/internal/rootcoord/list_db_task.go index 1b4e81a79519d..847b5928c5747 100644 --- a/internal/rootcoord/list_db_task.go +++ b/internal/rootcoord/list_db_task.go @@ -124,3 +124,7 @@ func (t *listDatabaseTask) Execute(ctx context.Context) error { t.Resp.CreatedTimestamp = createdTimes return nil } + +func (t *listDatabaseTask) GetLockerKey() LockerKey { + return NewLockerKeyChain(NewClusterLockerKey(false)) +} diff --git a/internal/rootcoord/rename_collection_task.go b/internal/rootcoord/rename_collection_task.go index 50bdc61713693..fc915302cc912 100644 --- a/internal/rootcoord/rename_collection_task.go +++ b/internal/rootcoord/rename_collection_task.go @@ -42,3 +42,9 @@ func (t *renameCollectionTask) Execute(ctx context.Context) error { } return t.core.meta.RenameCollection(ctx, t.Req.GetDbName(), t.Req.GetOldName(), t.Req.GetNewDBName(), t.Req.GetNewName(), t.GetTs()) } + +func (t *renameCollectionTask) GetLockerKey() LockerKey { + return NewLockerKeyChain( + NewClusterLockerKey(true), + ) +} diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index ea267b577e590..ad6ef5fc597f2 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -1118,6 +1118,15 @@ func (c *Core) HasCollection(ctx context.Context, in *milvuspb.HasCollectionRequ return t.Rsp, nil } +// getRealCollectionName get origin collection name to avoid the alias name +func (c *Core) getRealCollectionName(ctx context.Context, db, collection string) string { + realName, err := c.meta.DescribeAlias(ctx, db, collection, 0) + if err != nil { + return collection + } + return realName +} + func (c *Core) describeCollection(ctx context.Context, in *milvuspb.DescribeCollectionRequest, allowUnavailable bool) (*model.Collection, error) { ts := getTravelTs(in) if in.GetCollectionName() != "" { diff --git a/internal/rootcoord/scheduler.go b/internal/rootcoord/scheduler.go index 1b3f8ab139e99..1f5e3d0e478ec 100644 --- a/internal/rootcoord/scheduler.go +++ b/internal/rootcoord/scheduler.go @@ -27,6 +27,7 @@ import ( "github.com/milvus-io/milvus/internal/allocator" "github.com/milvus-io/milvus/internal/tso" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/lock" ) type IScheduler interface { @@ -48,21 +49,34 @@ type scheduler struct { lock sync.Mutex - minDdlTs atomic.Uint64 + minDdlTs atomic.Uint64 + clusterLock *lock.KeyLock[string] + databaseLock *lock.KeyLock[string] + collectionLock *lock.KeyLock[string] + lockMapping map[LockLevel]*lock.KeyLock[string] } func newScheduler(ctx context.Context, idAllocator allocator.Interface, tsoAllocator tso.Allocator) *scheduler { ctx1, cancel := context.WithCancel(ctx) // TODO n := 1024 * 10 - return &scheduler{ - ctx: ctx1, - cancel: cancel, - idAllocator: idAllocator, - tsoAllocator: tsoAllocator, - taskChan: make(chan task, n), - minDdlTs: *atomic.NewUint64(0), + s := &scheduler{ + ctx: ctx1, + cancel: cancel, + idAllocator: idAllocator, + tsoAllocator: tsoAllocator, + taskChan: make(chan task, n), + minDdlTs: *atomic.NewUint64(0), + clusterLock: lock.NewKeyLock[string](), + databaseLock: lock.NewKeyLock[string](), + collectionLock: lock.NewKeyLock[string](), } + s.lockMapping = map[LockLevel]*lock.KeyLock[string]{ + ClusterLock: s.clusterLock, + DatabaseLock: s.databaseLock, + CollectionLock: s.collectionLock, + } + return s } func (s *scheduler) Start() { @@ -147,6 +161,13 @@ func (s *scheduler) enqueue(task task) { } func (s *scheduler) AddTask(task task) error { + if Params.RootCoordCfg.UseLockScheduler.GetAsBool() { + lockKey := task.GetLockerKey() + if lockKey != nil { + return s.executeTaskWithLock(task, lockKey) + } + } + // make sure that setting ts and enqueue is atomic. s.lock.Lock() defer s.lock.Unlock() @@ -168,3 +189,25 @@ func (s *scheduler) GetMinDdlTs() Timestamp { func (s *scheduler) setMinDdlTs(ts Timestamp) { s.minDdlTs.Store(ts) } + +func (s *scheduler) executeTaskWithLock(task task, lockerKey LockerKey) error { + if lockerKey == nil { + if err := s.setID(task); err != nil { + return err + } + if err := s.setTs(task); err != nil { + return err + } + s.execute(task) + return nil + } + taskLock := s.lockMapping[lockerKey.Level()] + if lockerKey.IsWLock() { + taskLock.Lock(lockerKey.LockKey()) + defer taskLock.Unlock(lockerKey.LockKey()) + } else { + taskLock.RLock(lockerKey.LockKey()) + defer taskLock.RUnlock(lockerKey.LockKey()) + } + return s.executeTaskWithLock(task, lockerKey.Next()) +} diff --git a/internal/rootcoord/scheduler_test.go b/internal/rootcoord/scheduler_test.go index b48c4c8963bdb..99c0806baf008 100644 --- a/internal/rootcoord/scheduler_test.go +++ b/internal/rootcoord/scheduler_test.go @@ -20,13 +20,19 @@ import ( "context" "fmt" "math/rand" + "sync" "testing" "time" "github.com/cockroachdb/errors" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "go.uber.org/atomic" + "go.uber.org/zap" + "github.com/milvus-io/milvus/internal/allocator" + mocktso "github.com/milvus-io/milvus/internal/tso/mocks" + "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/paramtable" ) @@ -242,3 +248,90 @@ func Test_scheduler_updateDdlMinTsLoop(t *testing.T) { s.Stop() }) } + +type WithLockKeyTask struct { + baseTask + lockKey LockerKey + workDuration time.Duration + newTime time.Time + name string +} + +func NewWithLockKeyTask(lockKey LockerKey, duration time.Duration, name string) *WithLockKeyTask { + task := &WithLockKeyTask{ + baseTask: newBaseTask(context.Background(), nil), + lockKey: lockKey, + workDuration: duration, + newTime: time.Now(), + name: name, + } + return task +} + +func (t *WithLockKeyTask) GetLockerKey() LockerKey { + return t.lockKey +} + +func (t *WithLockKeyTask) Execute(ctx context.Context) error { + log.Info("execute task", zap.String("name", t.name), zap.Duration("duration", time.Since(t.newTime))) + time.Sleep(t.workDuration) + return nil +} + +func TestExecuteTaskWithLock(t *testing.T) { + paramtable.Init() + Params.Save(Params.RootCoordCfg.UseLockScheduler.Key, "true") + defer Params.Reset(Params.RootCoordCfg.UseLockScheduler.Key) + idMock := allocator.NewMockAllocator(t) + tsMock := mocktso.NewAllocator(t) + idMock.EXPECT().AllocOne().Return(1000, nil) + tsMock.EXPECT().GenerateTSO(mock.Anything).Return(10000, nil) + s := newScheduler(context.Background(), idMock, tsMock) + w := &sync.WaitGroup{} + w.Add(4) + { + go func() { + defer w.Done() + time.Sleep(1500 * time.Millisecond) + lockKey := NewLockerKeyChain(NewClusterLockerKey(false), NewDatabaseLockerKey("test", false)) + t1 := NewWithLockKeyTask(lockKey, time.Second*2, "t1-1") + err := s.AddTask(t1) + assert.NoError(t, err) + }() + } + { + go func() { + defer w.Done() + time.Sleep(1500 * time.Millisecond) + lockKey := NewLockerKeyChain(NewClusterLockerKey(false), NewDatabaseLockerKey("test", false)) + t1 := NewWithLockKeyTask(lockKey, time.Second*3, "t1-2") + err := s.AddTask(t1) + assert.NoError(t, err) + }() + } + { + go func() { + defer w.Done() + time.Sleep(500 * time.Millisecond) + lockKey := NewLockerKeyChain(NewClusterLockerKey(false), NewDatabaseLockerKey("test", true)) + t2 := NewWithLockKeyTask(lockKey, time.Second*2, "t2") + err := s.AddTask(t2) + assert.NoError(t, err) + }() + } + { + go func() { + defer w.Done() + lockKey := NewLockerKeyChain(NewClusterLockerKey(true)) + t3 := NewWithLockKeyTask(lockKey, time.Second, "t3") + err := s.AddTask(t3) + assert.NoError(t, err) + }() + } + + startTime := time.Now() + w.Wait() + delta := time.Since(startTime) + assert.True(t, delta > 6*time.Second) + assert.True(t, delta < 8*time.Second) +} diff --git a/internal/rootcoord/show_collection_task.go b/internal/rootcoord/show_collection_task.go index 090d4ada5b561..ab853266bbc36 100644 --- a/internal/rootcoord/show_collection_task.go +++ b/internal/rootcoord/show_collection_task.go @@ -147,3 +147,7 @@ func (t *showCollectionTask) Execute(ctx context.Context) error { } return nil } + +func (t *showCollectionTask) GetLockerKey() LockerKey { + return NewLockerKeyChain(NewClusterLockerKey(false), NewDatabaseLockerKey(t.Req.GetDbName(), false)) +} diff --git a/internal/rootcoord/show_partition_task.go b/internal/rootcoord/show_partition_task.go index 5e1f8214b4824..f023c60f60016 100644 --- a/internal/rootcoord/show_partition_task.go +++ b/internal/rootcoord/show_partition_task.go @@ -67,3 +67,12 @@ func (t *showPartitionTask) Execute(ctx context.Context) error { return nil } + +func (t *showPartitionTask) GetLockerKey() LockerKey { + collectionName := t.core.getRealCollectionName(t.ctx, t.Req.GetDbName(), t.Req.GetCollectionName()) + return NewLockerKeyChain( + NewClusterLockerKey(false), + NewDatabaseLockerKey(t.Req.GetDbName(), false), + NewCollectionLockerKey(collectionName, false), + ) +} diff --git a/internal/rootcoord/task.go b/internal/rootcoord/task.go index bbabf12bb6f57..4b0927b3bf77f 100644 --- a/internal/rootcoord/task.go +++ b/internal/rootcoord/task.go @@ -20,9 +20,27 @@ import ( "context" "time" + "go.uber.org/zap" + + "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/timerecord" ) +type LockLevel int + +const ( + ClusterLock LockLevel = iota + DatabaseLock + CollectionLock +) + +type LockerKey interface { + LockKey() string + Level() LockLevel + IsWLock() bool + Next() LockerKey +} + type task interface { GetCtx() context.Context SetCtx(context.Context) @@ -35,6 +53,7 @@ type task interface { WaitToFinish() error NotifyDone(err error) SetInQueueDuration() + GetLockerKey() LockerKey } type baseTask struct { @@ -101,3 +120,74 @@ func (b *baseTask) NotifyDone(err error) { func (b *baseTask) SetInQueueDuration() { b.queueDur = b.tr.ElapseSpan() } + +func (b *baseTask) GetLockerKey() LockerKey { + return nil +} + +type taskLockerKey struct { + key string + rw bool + level LockLevel + next LockerKey +} + +func (t *taskLockerKey) LockKey() string { + return t.key +} + +func (t *taskLockerKey) Level() LockLevel { + return t.level +} + +func (t *taskLockerKey) IsWLock() bool { + return t.rw +} + +func (t *taskLockerKey) Next() LockerKey { + return t.next +} + +func NewClusterLockerKey(rw bool) LockerKey { + return &taskLockerKey{ + key: "$", + rw: rw, + level: ClusterLock, + } +} + +func NewDatabaseLockerKey(db string, rw bool) LockerKey { + return &taskLockerKey{ + key: db, + rw: rw, + level: DatabaseLock, + } +} + +func NewCollectionLockerKey(collection string, rw bool) LockerKey { + return &taskLockerKey{ + key: collection, + rw: rw, + level: CollectionLock, + } +} + +func NewLockerKeyChain(lockerKeys ...LockerKey) LockerKey { + log.Info("NewLockerKeyChain", zap.Any("lockerKeys", len(lockerKeys))) + if len(lockerKeys) == 0 { + return nil + } + if lockerKeys[0] == nil || lockerKeys[0].Level() != ClusterLock { + log.Warn("Invalid locker key chain", zap.Stack("stack")) + return nil + } + + for i := 0; i < len(lockerKeys)-1; i++ { + if lockerKeys[i] == nil || lockerKeys[i].Level() >= lockerKeys[i+1].Level() { + log.Warn("Invalid locker key chain", zap.Stack("stack")) + return nil + } + lockerKeys[i].(*taskLockerKey).next = lockerKeys[i+1] + } + return lockerKeys[0] +} diff --git a/internal/rootcoord/task_test.go b/internal/rootcoord/task_test.go new file mode 100644 index 0000000000000..aaf866051c53e --- /dev/null +++ b/internal/rootcoord/task_test.go @@ -0,0 +1,345 @@ +/* + * Licensed to the LF AI & Data foundation under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rootcoord + +import ( + "context" + "fmt" + "testing" + + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + + "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" + "github.com/milvus-io/milvus/internal/proto/rootcoordpb" + mockrootcoord "github.com/milvus-io/milvus/internal/rootcoord/mocks" +) + +func TestLockerKey(t *testing.T) { + clusterLock := NewClusterLockerKey(true) + assert.Equal(t, clusterLock.IsWLock(), true) + assert.Equal(t, clusterLock.Level(), ClusterLock) + assert.Equal(t, clusterLock.LockKey(), "$") + + dbLock := NewDatabaseLockerKey("foo", true) + assert.Equal(t, dbLock.IsWLock(), true) + assert.Equal(t, dbLock.Level(), DatabaseLock) + assert.Equal(t, dbLock.LockKey(), "foo") + + collectionLock := NewCollectionLockerKey("foo", true) + assert.Equal(t, collectionLock.IsWLock(), true) + assert.Equal(t, collectionLock.Level(), CollectionLock) + assert.Equal(t, collectionLock.LockKey(), "foo") + + { + lockerChain := NewLockerKeyChain(nil) + assert.Nil(t, lockerChain) + } + + { + lockerChain := NewLockerKeyChain(dbLock) + assert.Nil(t, lockerChain) + } + + { + lockerChain := NewLockerKeyChain(clusterLock, collectionLock, dbLock) + assert.Nil(t, lockerChain) + } + + { + lockerChain := NewLockerKeyChain(clusterLock, dbLock, collectionLock) + assert.NotNil(t, lockerChain) + assert.Equal(t, lockerChain.Next(), dbLock) + assert.Equal(t, lockerChain.Next().Next(), collectionLock) + } +} + +func GetLockerKeyString(k LockerKey) string { + key := k.LockKey() + level := k.Level() + wLock := k.IsWLock() + if k.Next() == nil { + return fmt.Sprintf("%s-%d-%t", key, level, wLock) + } + return fmt.Sprintf("%s-%d-%t|%s", key, level, wLock, GetLockerKeyString(k.Next())) +} + +func TestGetLockerKey(t *testing.T) { + t.Run("alter alias task locker key", func(t *testing.T) { + tt := &alterAliasTask{ + Req: &milvuspb.AlterAliasRequest{ + DbName: "foo", + CollectionName: "bar", + }, + } + key := tt.GetLockerKey() + assert.Equal(t, GetLockerKeyString(key), "$-0-false|foo-1-false|bar-2-true") + }) + t.Run("alter collection task locker key", func(t *testing.T) { + metaMock := mockrootcoord.NewIMetaTable(t) + metaMock.EXPECT().DescribeAlias(mock.Anything, mock.Anything, mock.Anything, mock.Anything). + RunAndReturn(func(ctx context.Context, s string, s2 string, u uint64) (string, error) { + return s2, nil + }) + c := &Core{ + meta: metaMock, + } + tt := &alterCollectionTask{ + baseTask: baseTask{ + core: c, + }, + Req: &milvuspb.AlterCollectionRequest{ + DbName: "foo", + CollectionName: "bar", + }, + } + key := tt.GetLockerKey() + assert.Equal(t, GetLockerKeyString(key), "$-0-false|foo-1-false|bar-2-true") + }) + t.Run("alter database task locker key", func(t *testing.T) { + tt := &alterDatabaseTask{ + Req: &rootcoordpb.AlterDatabaseRequest{ + DbName: "foo", + }, + } + key := tt.GetLockerKey() + assert.Equal(t, GetLockerKeyString(key), "$-0-false|foo-1-true") + }) + t.Run("create alias task locker key", func(t *testing.T) { + metaMock := mockrootcoord.NewIMetaTable(t) + metaMock.EXPECT().DescribeAlias(mock.Anything, mock.Anything, mock.Anything, mock.Anything). + RunAndReturn(func(ctx context.Context, s string, s2 string, u uint64) (string, error) { + return "", errors.New("not found") + }) + c := &Core{ + meta: metaMock, + } + tt := &createAliasTask{ + baseTask: baseTask{ + core: c, + }, + Req: &milvuspb.CreateAliasRequest{ + DbName: "foo", + CollectionName: "bar", + }, + } + key := tt.GetLockerKey() + assert.Equal(t, GetLockerKeyString(key), "$-0-false|foo-1-false|bar-2-true") + }) + t.Run("create collection task locker key", func(t *testing.T) { + tt := &createCollectionTask{ + Req: &milvuspb.CreateCollectionRequest{ + DbName: "foo", + CollectionName: "bar", + }, + } + key := tt.GetLockerKey() + assert.Equal(t, GetLockerKeyString(key), "$-0-false|foo-1-true") + }) + t.Run("create database task locker key", func(t *testing.T) { + tt := &createDatabaseTask{ + Req: &milvuspb.CreateDatabaseRequest{ + DbName: "foo", + }, + } + key := tt.GetLockerKey() + assert.Equal(t, GetLockerKeyString(key), "$-0-true") + }) + t.Run("create partition task locker key", func(t *testing.T) { + metaMock := mockrootcoord.NewIMetaTable(t) + metaMock.EXPECT().DescribeAlias(mock.Anything, mock.Anything, mock.Anything, mock.Anything). + RunAndReturn(func(ctx context.Context, s string, s2 string, u uint64) (string, error) { + return "real" + s2, nil + }) + c := &Core{ + meta: metaMock, + } + tt := &createPartitionTask{ + baseTask: baseTask{core: c}, + Req: &milvuspb.CreatePartitionRequest{ + DbName: "foo", + CollectionName: "bar", + PartitionName: "baz", + }, + } + key := tt.GetLockerKey() + assert.Equal(t, GetLockerKeyString(key), "$-0-false|foo-1-false|realbar-2-true") + }) + t.Run("describe collection task locker key", func(t *testing.T) { + metaMock := mockrootcoord.NewIMetaTable(t) + metaMock.EXPECT().DescribeAlias(mock.Anything, mock.Anything, mock.Anything, mock.Anything). + RunAndReturn(func(ctx context.Context, s string, s2 string, u uint64) (string, error) { + return "", errors.New("not found") + }) + c := &Core{ + meta: metaMock, + } + tt := &describeCollectionTask{ + baseTask: baseTask{core: c}, + Req: &milvuspb.DescribeCollectionRequest{ + DbName: "foo", + CollectionName: "bar", + }, + } + key := tt.GetLockerKey() + assert.Equal(t, GetLockerKeyString(key), "$-0-false|foo-1-false|bar-2-false") + }) + t.Run("describe database task locker key", func(t *testing.T) { + tt := &describeDBTask{ + Req: &rootcoordpb.DescribeDatabaseRequest{ + DbName: "foo", + }, + } + key := tt.GetLockerKey() + assert.Equal(t, GetLockerKeyString(key), "$-0-false|foo-1-false") + }) + t.Run("drop alias task locker key", func(t *testing.T) { + metaMock := mockrootcoord.NewIMetaTable(t) + metaMock.EXPECT().DescribeAlias(mock.Anything, mock.Anything, mock.Anything, mock.Anything). + RunAndReturn(func(ctx context.Context, s string, s2 string, u uint64) (string, error) { + return "real" + s2, nil + }) + c := &Core{ + meta: metaMock, + } + tt := &dropAliasTask{ + baseTask: baseTask{core: c}, + Req: &milvuspb.DropAliasRequest{ + DbName: "foo", + Alias: "bar", + }, + } + key := tt.GetLockerKey() + assert.Equal(t, GetLockerKeyString(key), "$-0-false|foo-1-false|realbar-2-true") + }) + t.Run("drop collection task locker key", func(t *testing.T) { + tt := &dropCollectionTask{ + Req: &milvuspb.DropCollectionRequest{ + DbName: "foo", + CollectionName: "bar", + }, + } + key := tt.GetLockerKey() + assert.Equal(t, GetLockerKeyString(key), "$-0-false|foo-1-true") + }) + t.Run("drop database task locker key", func(t *testing.T) { + tt := &dropDatabaseTask{ + Req: &milvuspb.DropDatabaseRequest{ + DbName: "foo", + }, + } + key := tt.GetLockerKey() + assert.Equal(t, GetLockerKeyString(key), "$-0-true") + }) + t.Run("drop partition task locker key", func(t *testing.T) { + metaMock := mockrootcoord.NewIMetaTable(t) + metaMock.EXPECT().DescribeAlias(mock.Anything, mock.Anything, mock.Anything, mock.Anything). + RunAndReturn(func(ctx context.Context, s string, s2 string, u uint64) (string, error) { + return "real" + s2, nil + }) + c := &Core{ + meta: metaMock, + } + tt := &dropPartitionTask{ + baseTask: baseTask{core: c}, + Req: &milvuspb.DropPartitionRequest{ + DbName: "foo", + CollectionName: "bar", + PartitionName: "baz", + }, + } + key := tt.GetLockerKey() + assert.Equal(t, GetLockerKeyString(key), "$-0-false|foo-1-false|realbar-2-true") + }) + t.Run("has collection task locker key", func(t *testing.T) { + tt := &hasCollectionTask{ + Req: &milvuspb.HasCollectionRequest{ + DbName: "foo", + CollectionName: "bar", + }, + } + key := tt.GetLockerKey() + assert.Equal(t, GetLockerKeyString(key), "$-0-false|foo-1-false") + }) + t.Run("has partition task locker key", func(t *testing.T) { + metaMock := mockrootcoord.NewIMetaTable(t) + metaMock.EXPECT().DescribeAlias(mock.Anything, mock.Anything, mock.Anything, mock.Anything). + RunAndReturn(func(ctx context.Context, s string, s2 string, u uint64) (string, error) { + return "real" + s2, nil + }) + c := &Core{ + meta: metaMock, + } + tt := &hasPartitionTask{ + baseTask: baseTask{core: c}, + Req: &milvuspb.HasPartitionRequest{ + DbName: "foo", + CollectionName: "bar", + PartitionName: "baz", + }, + } + key := tt.GetLockerKey() + assert.Equal(t, GetLockerKeyString(key), "$-0-false|foo-1-false|realbar-2-false") + }) + t.Run("list db task locker key", func(t *testing.T) { + tt := &listDatabaseTask{} + key := tt.GetLockerKey() + assert.Equal(t, GetLockerKeyString(key), "$-0-false") + }) + t.Run("rename collection task locker key", func(t *testing.T) { + tt := &renameCollectionTask{ + Req: &milvuspb.RenameCollectionRequest{ + DbName: "foo", + OldName: "bar", + NewName: "baz", + }, + } + key := tt.GetLockerKey() + assert.Equal(t, GetLockerKeyString(key), "$-0-true") + }) + t.Run("show collection task locker key", func(t *testing.T) { + tt := &showCollectionTask{ + Req: &milvuspb.ShowCollectionsRequest{ + DbName: "foo", + }, + } + key := tt.GetLockerKey() + assert.Equal(t, GetLockerKeyString(key), "$-0-false|foo-1-false") + }) + t.Run("show partition task locker key", func(t *testing.T) { + metaMock := mockrootcoord.NewIMetaTable(t) + metaMock.EXPECT().DescribeAlias(mock.Anything, mock.Anything, mock.Anything, mock.Anything). + RunAndReturn(func(ctx context.Context, s string, s2 string, u uint64) (string, error) { + return "real" + s2, nil + }) + c := &Core{ + meta: metaMock, + } + tt := &showPartitionTask{ + baseTask: baseTask{core: c}, + Req: &milvuspb.ShowPartitionsRequest{ + DbName: "foo", + CollectionName: "bar", + }, + } + key := tt.GetLockerKey() + assert.Equal(t, GetLockerKeyString(key), "$-0-false|foo-1-false|realbar-2-false") + }) +} diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 9e0ff8bdc30a4..d7137f0061184 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -1117,6 +1117,7 @@ type rootCoordConfig struct { MaxDatabaseNum ParamItem `refreshable:"false"` MaxGeneralCapacity ParamItem `refreshable:"true"` GracefulStopTimeout ParamItem `refreshable:"true"` + UseLockScheduler ParamItem `refreshable:"true"` } func (p *rootCoordConfig) init(base *BaseTable) { @@ -1191,6 +1192,15 @@ Segments with smaller size than this parameter will not be indexed, and will be Export: true, } p.GracefulStopTimeout.Init(base.mgr) + + p.UseLockScheduler = ParamItem{ + Key: "rootCoord.useLockScheduler", + Version: "2.4.15", + DefaultValue: "false", + Doc: "use lock to schedule the task", + Export: false, + } + p.UseLockScheduler.Init(base.mgr) } // /////////////////////////////////////////////////////////////////////////////