From 81fa7dd52c91be207d28e7fbf90debea5b79eaf0 Mon Sep 17 00:00:00 2001 From: Zhen Ye Date: Fri, 15 Nov 2024 15:04:31 +0800 Subject: [PATCH] fix: add ddl and dcl concurrency to avoid competition (#37672) issue: #37166 Signed-off-by: chyezh --- configs/milvus.yaml | 2 ++ internal/proxy/task_scheduler.go | 14 ++++++++++++-- pkg/util/paramtable/component_param.go | 20 ++++++++++++++++++++ pkg/util/paramtable/component_param_test.go | 3 +++ 4 files changed, 37 insertions(+), 2 deletions(-) diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 88817e4de03a6..3eb6a7d339dbb 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -286,6 +286,8 @@ proxy: ginLogging: true ginLogSkipPaths: / # skip url path for gin log maxTaskNum: 1024 # The maximum number of tasks in the task queue of the proxy. + ddlConcurrency: 16 # The concurrent execution number of DDL at proxy. + dclConcurrency: 16 # The concurrent execution number of DCL at proxy. mustUsePartitionKey: false # switch for whether proxy must use partition key for the collection accessLog: enable: false # Whether to enable the access log feature. diff --git a/internal/proxy/task_scheduler.go b/internal/proxy/task_scheduler.go index 05a8199844f94..878363c94e161 100644 --- a/internal/proxy/task_scheduler.go +++ b/internal/proxy/task_scheduler.go @@ -493,6 +493,8 @@ func (sched *taskScheduler) processTask(t task, q taskQueue) { // definitionLoop schedules the ddl tasks. func (sched *taskScheduler) definitionLoop() { defer sched.wg.Done() + + pool := conc.NewPool[struct{}](paramtable.Get().ProxyCfg.DDLConcurrency.GetAsInt(), conc.WithExpiryDuration(time.Minute)) for { select { case <-sched.ctx.Done(): @@ -500,7 +502,10 @@ func (sched *taskScheduler) definitionLoop() { case <-sched.ddQueue.utChan(): if !sched.ddQueue.utEmpty() { t := sched.scheduleDdTask() - sched.processTask(t, sched.ddQueue) + pool.Submit(func() (struct{}, error) { + sched.processTask(t, sched.ddQueue) + return struct{}{}, nil + }) } } } @@ -509,6 +514,8 @@ func (sched *taskScheduler) definitionLoop() { // controlLoop schedule the data control operation, such as flush func (sched *taskScheduler) controlLoop() { defer sched.wg.Done() + + pool := conc.NewPool[struct{}](paramtable.Get().ProxyCfg.DCLConcurrency.GetAsInt(), conc.WithExpiryDuration(time.Minute)) for { select { case <-sched.ctx.Done(): @@ -516,7 +523,10 @@ func (sched *taskScheduler) controlLoop() { case <-sched.dcQueue.utChan(): if !sched.dcQueue.utEmpty() { t := sched.scheduleDcTask() - sched.processTask(t, sched.dcQueue) + pool.Submit(func() (struct{}, error) { + sched.processTask(t, sched.dcQueue) + return struct{}{}, nil + }) } } } diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index a1f0ed6897642..9f3acdcbd8f04 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -1241,6 +1241,8 @@ type proxyConfig struct { MaxUserNum ParamItem `refreshable:"true"` MaxRoleNum ParamItem `refreshable:"true"` MaxTaskNum ParamItem `refreshable:"false"` + DDLConcurrency ParamItem `refreshable:"true"` + DCLConcurrency ParamItem `refreshable:"true"` ShardLeaderCacheInterval ParamItem `refreshable:"false"` ReplicaSelectionPolicy ParamItem `refreshable:"false"` CheckQueryNodeHealthInterval ParamItem `refreshable:"false"` @@ -1387,6 +1389,24 @@ func (p *proxyConfig) init(base *BaseTable) { } p.MaxTaskNum.Init(base.mgr) + p.DDLConcurrency = ParamItem{ + Key: "proxy.ddlConcurrency", + Version: "2.5.0", + DefaultValue: "16", + Doc: "The concurrent execution number of DDL at proxy.", + Export: true, + } + p.DDLConcurrency.Init(base.mgr) + + p.DCLConcurrency = ParamItem{ + Key: "proxy.dclConcurrency", + Version: "2.5.0", + DefaultValue: "16", + Doc: "The concurrent execution number of DCL at proxy.", + Export: true, + } + p.DCLConcurrency.Init(base.mgr) + p.GinLogging = ParamItem{ Key: "proxy.ginLogging", Version: "2.2.0", diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index 22bccfe3e5ae8..0136e4813644e 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -211,6 +211,9 @@ func TestComponentParam(t *testing.T) { assert.Equal(t, int64(10), Params.CheckWorkloadRequestNum.GetAsInt64()) assert.Equal(t, float64(0.1), Params.WorkloadToleranceFactor.GetAsFloat()) + + assert.Equal(t, int64(16), Params.DDLConcurrency.GetAsInt64()) + assert.Equal(t, int64(16), Params.DCLConcurrency.GetAsInt64()) }) // t.Run("test proxyConfig panic", func(t *testing.T) {