From 3d1e81fb316bfe256c4be33f74fadd10abbdab29 Mon Sep 17 00:00:00 2001 From: congqixia Date: Tue, 29 Oct 2024 14:44:23 +0800 Subject: [PATCH] fix: [2.4] Use singleton delete pool and avoid goroutine leakage (#37225) Cherry-pick from master pr: #37220 Related to #36887 Previously using newly create pool per request shall cause goroutine leakage. This PR change this behavior by using singleton delete pool. This change could also provide better concurrency control over delete memory usage. Signed-off-by: Congqi Xia --- internal/querynodev2/segments/pool.go | 15 +++++++++++++++ internal/querynodev2/services.go | 3 +-- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/internal/querynodev2/segments/pool.go b/internal/querynodev2/segments/pool.go index 7557c853dc628..7bddca6169e83 100644 --- a/internal/querynodev2/segments/pool.go +++ b/internal/querynodev2/segments/pool.go @@ -46,6 +46,9 @@ var ( warmupPool atomic.Pointer[conc.Pool[any]] warmupOnce sync.Once + deletePool atomic.Pointer[conc.Pool[struct{}]] + deletePoolOnce sync.Once + bfPool atomic.Pointer[conc.Pool[any]] bfApplyOnce sync.Once ) @@ -131,6 +134,13 @@ func initBFApplyPool() { }) } +func initDeletePool() { + deletePoolOnce.Do(func() { + pool := conc.NewPool[struct{}](runtime.GOMAXPROCS(0)) + deletePool.Store(pool) + }) +} + // GetSQPool returns the singleton pool instance for search/query operations. func GetSQPool() *conc.Pool[any] { initSQPool() @@ -158,6 +168,11 @@ func GetBFApplyPool() *conc.Pool[any] { return bfPool.Load() } +func GetDeletePool() *conc.Pool[struct{}] { + initDeletePool() + return deletePool.Load() +} + func ResizeSQPool(evt *config.Event) { if evt.HasUpdated { pt := paramtable.Get() diff --git a/internal/querynodev2/services.go b/internal/querynodev2/services.go index 9a9e47c29706b..ca68ab95d5f53 100644 --- a/internal/querynodev2/services.go +++ b/internal/querynodev2/services.go @@ -19,7 +19,6 @@ package querynodev2 import ( "context" "fmt" - "runtime" "strconv" "sync" "time" @@ -1475,7 +1474,7 @@ func (node *QueryNode) DeleteBatch(ctx context.Context, req *querypb.DeleteBatch // control the execution batch parallel with P number // maybe it shall be lower in case of heavy CPU usage may impacting search/query - pool := conc.NewPool[struct{}](runtime.GOMAXPROCS(0)) + pool := segments.GetDeletePool() futures := make([]*conc.Future[struct{}], 0, len(segs)) errSet := typeutil.NewConcurrentSet[int64]()