fix: [2.4] Use singleton delete pool and avoid goroutine leakage (#37225)

Cherry-pick from master
pr: #37220
Related to #36887

Previously, a new pool was created for each request, which could leak goroutines.
This PR changes that behavior by using a singleton delete pool. The change could
also provide better concurrency control over delete memory usage.

Signed-off-by: Congqi Xia <[email protected]>
congqixia authored Oct 29, 2024
1 parent 0b284cc commit 3d1e81f
Showing 2 changed files with 16 additions and 2 deletions.
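
Before the diff, a minimal self-contained sketch of the failure mode the commit message describes: a pool created per request whose workers are never released. The stand-in pool type, handleRequest, and the goroutine count below are illustration only, not the milvus conc.Pool.

package main

import (
    "fmt"
    "runtime"
    "time"
)

// pool is a stand-in worker pool, not the milvus conc.Pool.
type pool struct{ tasks chan func() }

func newPool(workers int) *pool {
    p := &pool{tasks: make(chan func())}
    for i := 0; i < workers; i++ {
        go func() {
            // Workers block here forever once the pool is abandoned,
            // because nothing ever closes p.tasks.
            for task := range p.tasks {
                task()
            }
        }()
    }
    return p
}

// handleRequest mimics the old behavior: a fresh pool per request that is
// never shut down when the request finishes.
func handleRequest() {
    p := newPool(runtime.GOMAXPROCS(0))
    done := make(chan struct{})
    p.tasks <- func() { close(done) }
    <-done
    // p goes out of scope without releasing its workers, so they leak.
}

func main() {
    for i := 0; i < 100; i++ {
        handleRequest()
    }
    time.Sleep(100 * time.Millisecond)
    // Roughly 100 * GOMAXPROCS leaked workers are still alive here.
    fmt.Println("goroutines still alive:", runtime.NumGoroutine())
}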
15 changes: 15 additions & 0 deletions internal/querynodev2/segments/pool.go
@@ -46,6 +46,9 @@ var (
     warmupPool atomic.Pointer[conc.Pool[any]]
     warmupOnce sync.Once
 
+    deletePool     atomic.Pointer[conc.Pool[struct{}]]
+    deletePoolOnce sync.Once
+
     bfPool      atomic.Pointer[conc.Pool[any]]
     bfApplyOnce sync.Once
 )
@@ -131,6 +134,13 @@ func initBFApplyPool() {
     })
 }
 
+func initDeletePool() {
+    deletePoolOnce.Do(func() {
+        pool := conc.NewPool[struct{}](runtime.GOMAXPROCS(0))
+        deletePool.Store(pool)
+    })
+}
+
 // GetSQPool returns the singleton pool instance for search/query operations.
 func GetSQPool() *conc.Pool[any] {
     initSQPool()
@@ -158,6 +168,11 @@ func GetBFApplyPool() *conc.Pool[any] {
     return bfPool.Load()
 }
 
+func GetDeletePool() *conc.Pool[struct{}] {
+    initDeletePool()
+    return deletePool.Load()
+}
+
 func ResizeSQPool(evt *config.Event) {
     if evt.HasUpdated {
         pt := paramtable.Get()
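
A hedged usage sketch for the new accessor. The package name, the runDelete wrapper, and the Submit/Await calls are assumptions written from how conc.Pool is used elsewhere in the repository; they are not part of this diff.

package example // hypothetical package, for illustration only

import (
    "github.com/milvus-io/milvus/internal/querynodev2/segments"
)

// runDelete is a hypothetical wrapper: every caller gets the same singleton
// pool, so at most GOMAXPROCS delete tasks run at once across the process.
func runDelete(task func() (struct{}, error)) error {
    pool := segments.GetDeletePool()
    future := pool.Submit(task) // Submit/Await assumed from the conc package's usage elsewhere
    _, err := future.Await()
    return err
}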
3 changes: 1 addition & 2 deletions internal/querynodev2/services.go
@@ -19,7 +19,6 @@ package querynodev2
 import (
     "context"
     "fmt"
-    "runtime"
     "strconv"
     "sync"
     "time"
@@ -1475,7 +1474,7 @@ func (node *QueryNode) DeleteBatch(ctx context.Context, req *querypb.DeleteBatch
 
     // control the execution batch parallel with P number
     // maybe it shall be lower in case of heavy CPU usage may impacting search/query
-    pool := conc.NewPool[struct{}](runtime.GOMAXPROCS(0))
+    pool := segments.GetDeletePool()
     futures := make([]*conc.Future[struct{}], 0, len(segs))
     errSet := typeutil.NewConcurrentSet[int64]()
 
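
For completeness, a hedged sketch of the fan-out pattern DeleteBatch follows with the shared pool: one future per segment, then wait on all of them. Everything except segments.GetDeletePool is an assumption here (the example package, deleteOnSegments, the work callback, and conc.AwaitAll are written from how the surrounding code uses the conc package).

package example // hypothetical, for illustration only

import (
    "github.com/milvus-io/milvus/internal/querynodev2/segments"
    "github.com/milvus-io/milvus/pkg/util/conc"
)

// deleteOnSegments fans one task per segment out onto the shared delete pool
// and waits for all of them. "work" stands in for the real per-segment
// delete call.
func deleteOnSegments(segmentIDs []int64, work func(segmentID int64) error) error {
    pool := segments.GetDeletePool()
    futures := make([]*conc.Future[struct{}], 0, len(segmentIDs))
    for _, id := range segmentIDs {
        id := id // capture the loop variable for the closure
        futures = append(futures, pool.Submit(func() (struct{}, error) {
            return struct{}{}, work(id)
        }))
    }
    // Blocks until every submitted task has finished; the worker count
    // (GOMAXPROCS) bounds how many run concurrently.
    return conc.AwaitAll(futures...)
}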
