Skip to content

Commit

Permalink
fix: add ddl and dcl concurrency to avoid competition (milvus-io#37672)
Browse files Browse the repository at this point in the history
issue: milvus-io#37166

Signed-off-by: chyezh <[email protected]>
  • Loading branch information
chyezh authored Nov 15, 2024
1 parent 65d3c66 commit 81fa7dd
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 2 deletions.
2 changes: 2 additions & 0 deletions configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,8 @@ proxy:
ginLogging: true
ginLogSkipPaths: / # skip url path for gin log
maxTaskNum: 1024 # The maximum number of tasks in the task queue of the proxy.
ddlConcurrency: 16 # The concurrent execution number of DDL at proxy.
dclConcurrency: 16 # The concurrent execution number of DCL at proxy.
mustUsePartitionKey: false # switch for whether proxy must use partition key for the collection
accessLog:
enable: false # Whether to enable the access log feature.
Expand Down
14 changes: 12 additions & 2 deletions internal/proxy/task_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,14 +493,19 @@ func (sched *taskScheduler) processTask(t task, q taskQueue) {
// definitionLoop schedules the ddl tasks.
func (sched *taskScheduler) definitionLoop() {
defer sched.wg.Done()

pool := conc.NewPool[struct{}](paramtable.Get().ProxyCfg.DDLConcurrency.GetAsInt(), conc.WithExpiryDuration(time.Minute))
for {
select {
case <-sched.ctx.Done():
return
case <-sched.ddQueue.utChan():
if !sched.ddQueue.utEmpty() {
t := sched.scheduleDdTask()
sched.processTask(t, sched.ddQueue)
pool.Submit(func() (struct{}, error) {
sched.processTask(t, sched.ddQueue)
return struct{}{}, nil
})
}
}
}
Expand All @@ -509,14 +514,19 @@ func (sched *taskScheduler) definitionLoop() {
// controlLoop schedule the data control operation, such as flush
func (sched *taskScheduler) controlLoop() {
defer sched.wg.Done()

pool := conc.NewPool[struct{}](paramtable.Get().ProxyCfg.DCLConcurrency.GetAsInt(), conc.WithExpiryDuration(time.Minute))
for {
select {
case <-sched.ctx.Done():
return
case <-sched.dcQueue.utChan():
if !sched.dcQueue.utEmpty() {
t := sched.scheduleDcTask()
sched.processTask(t, sched.dcQueue)
pool.Submit(func() (struct{}, error) {
sched.processTask(t, sched.dcQueue)
return struct{}{}, nil
})
}
}
}
Expand Down
20 changes: 20 additions & 0 deletions pkg/util/paramtable/component_param.go
Original file line number Diff line number Diff line change
Expand Up @@ -1241,6 +1241,8 @@ type proxyConfig struct {
MaxUserNum ParamItem `refreshable:"true"`
MaxRoleNum ParamItem `refreshable:"true"`
MaxTaskNum ParamItem `refreshable:"false"`
DDLConcurrency ParamItem `refreshable:"true"`
DCLConcurrency ParamItem `refreshable:"true"`
ShardLeaderCacheInterval ParamItem `refreshable:"false"`
ReplicaSelectionPolicy ParamItem `refreshable:"false"`
CheckQueryNodeHealthInterval ParamItem `refreshable:"false"`
Expand Down Expand Up @@ -1387,6 +1389,24 @@ func (p *proxyConfig) init(base *BaseTable) {
}
p.MaxTaskNum.Init(base.mgr)

p.DDLConcurrency = ParamItem{
Key: "proxy.ddlConcurrency",
Version: "2.5.0",
DefaultValue: "16",
Doc: "The concurrent execution number of DDL at proxy.",
Export: true,
}
p.DDLConcurrency.Init(base.mgr)

p.DCLConcurrency = ParamItem{
Key: "proxy.dclConcurrency",
Version: "2.5.0",
DefaultValue: "16",
Doc: "The concurrent execution number of DCL at proxy.",
Export: true,
}
p.DCLConcurrency.Init(base.mgr)

p.GinLogging = ParamItem{
Key: "proxy.ginLogging",
Version: "2.2.0",
Expand Down
3 changes: 3 additions & 0 deletions pkg/util/paramtable/component_param_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,9 @@ func TestComponentParam(t *testing.T) {

assert.Equal(t, int64(10), Params.CheckWorkloadRequestNum.GetAsInt64())
assert.Equal(t, float64(0.1), Params.WorkloadToleranceFactor.GetAsFloat())

assert.Equal(t, int64(16), Params.DDLConcurrency.GetAsInt64())
assert.Equal(t, int64(16), Params.DCLConcurrency.GetAsInt64())
})

// t.Run("test proxyConfig panic", func(t *testing.T) {
Expand Down

0 comments on commit 81fa7dd

Please sign in to comment.