Commit 7384bfe

fix: use separate warmup pool and disable warmup by default (#33348) (#33349)

1. use a small, separate warmup pool to reduce the impact of warmup on other operations
2. change the warmup pool to non-blocking mode (sketched below)
3. disable warmup by default
4. remove the maximum size limit of 16 for the load pool
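The gist of points 1 and 2, as a self-contained Go toy rather than the real implementation (tinyPool and TrySubmit are invented names; the actual pool is the conc.Pool wired up in pool.go below): a small bounded pool whose submit path never blocks, so a saturated warmup pool sheds work instead of stalling callers.

```go
package main

import "fmt"

// tinyPool is an invented stand-in for this sketch; it is not milvus's conc.Pool.
type tinyPool struct {
	tasks chan func()
}

func newTinyPool(workers, queue int) *tinyPool {
	p := &tinyPool{tasks: make(chan func(), queue)}
	for i := 0; i < workers; i++ {
		go func() {
			for task := range p.tasks {
				task()
			}
		}()
	}
	return p
}

// TrySubmit never blocks: when the queue is full the task is simply rejected,
// which is the property the warmup pool gains from non-blocking mode.
func (p *tinyPool) TrySubmit(task func()) bool {
	select {
	case p.tasks <- task:
		return true
	default:
		return false
	}
}

func main() {
	p := newTinyPool(2, 4) // small and separate, so warmup cannot starve other pools
	done := make(chan struct{})
	if p.TrySubmit(func() { fmt.Println("warmup task ran"); close(done) }) {
		<-done
	} else {
		fmt.Println("pool saturated, task dropped")
	}
}
```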

issue: #32772

pr: #33348

---------

Signed-off-by: bigsheeper <[email protected]>
Co-authored-by: xiaofanluan <[email protected]>
bigsheeper and xiaofan-luan authored May 28, 2024
1 parent 9c0076c commit 7384bfe
Showing 6 changed files with 70 additions and 20 deletions.
8 changes: 4 additions & 4 deletions configs/milvus.yaml
@@ -329,13 +329,13 @@ queryNode:
     enabled: true
     memoryLimit: 2147483648 # 2 GB, 2 * 1024 *1024 *1024
     readAheadPolicy: willneed # The read ahead policy of chunk cache, options: `normal, random, sequential, willneed, dontneed`
-    # options: async, sync, off.
+    # options: async, sync, disable.
     # Specifies the necessity for warming up the chunk cache.
-    # 1. If set to "sync" or "async," the original vector data will be synchronously/asynchronously loaded into the
+    # 1. If set to "sync" or "async" the original vector data will be synchronously/asynchronously loaded into the
     # chunk cache during the load process. This approach has the potential to substantially reduce query/search latency
     # for a specific duration post-load, albeit accompanied by a concurrent increase in disk usage;
-    # 2. If set to "off," original vector data will only be loaded into the chunk cache during search/query.
-    warmup: async
+    # 2. If set to "disable" original vector data will only be loaded into the chunk cache during search/query.
+    warmup: disable
   mmap:
     mmapEnabled: false # Enable mmap for loading data
   lazyload:
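For orientation, the three values of queryNode.cache.warmup differ only in when original vector data reaches the chunk cache. The helper below is invented for illustration; the real dispatch is the switch in LocalSegment.WarmupChunkCache shown further down.

```go
package main

import (
	"fmt"
	"strings"
)

// warmupMode summarizes each configured value; invented for this sketch.
func warmupMode(mode string) string {
	switch strings.ToLower(mode) {
	case "sync":
		return "load waits until original vectors are in the chunk cache"
	case "async":
		return "load returns immediately; vectors are cached in the background"
	default: // "disable", the new default
		return "vectors enter the chunk cache lazily, on first search/query"
	}
}

func main() {
	fmt.Println(warmupMode("disable"))
}
```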
47 changes: 38 additions & 9 deletions internal/querynodev2/segments/pool.go
@@ -37,12 +37,14 @@ var (
 	// and other operations (insert/delete/statistics/etc.)
 	// since in concurrent situation, there operation may block each other in high payload

-	sqp      atomic.Pointer[conc.Pool[any]]
-	sqOnce   sync.Once
-	dp       atomic.Pointer[conc.Pool[any]]
-	dynOnce  sync.Once
-	loadPool atomic.Pointer[conc.Pool[any]]
-	loadOnce sync.Once
+	sqp        atomic.Pointer[conc.Pool[any]]
+	sqOnce     sync.Once
+	dp         atomic.Pointer[conc.Pool[any]]
+	dynOnce    sync.Once
+	loadPool   atomic.Pointer[conc.Pool[any]]
+	loadOnce   sync.Once
+	warmupPool atomic.Pointer[conc.Pool[any]]
+	warmupOnce sync.Once
 )

 // initSQPool initialize
@@ -80,9 +82,6 @@ func initLoadPool() {
 	loadOnce.Do(func() {
 		pt := paramtable.Get()
 		poolSize := hardware.GetCPUNum() * pt.CommonCfg.MiddlePriorityThreadCoreCoefficient.GetAsInt()
-		if poolSize > 16 {
-			poolSize = 16
-		}
 		pool := conc.NewPool[any](
 			poolSize,
 			conc.WithPreAlloc(false),
@@ -96,6 +95,23 @@
 	})
 }

+func initWarmupPool() {
+	warmupOnce.Do(func() {
+		pt := paramtable.Get()
+		poolSize := hardware.GetCPUNum() * pt.CommonCfg.LowPriorityThreadCoreCoefficient.GetAsInt()
+		pool := conc.NewPool[any](
+			poolSize,
+			conc.WithPreAlloc(false),
+			conc.WithDisablePurge(false),
+			conc.WithPreHandler(runtime.LockOSThread), // lock os thread for cgo thread disposal
+			conc.WithNonBlocking(true),                // make warming up non blocking
+		)
+
+		warmupPool.Store(pool)
+		pt.Watch(pt.CommonCfg.LowPriorityThreadCoreCoefficient.Key, config.NewHandler("qn.warmpool.lowpriority", ResizeWarmupPool))
+	})
+}
+
 // GetSQPool returns the singleton pool instance for search/query operations.
 func GetSQPool() *conc.Pool[any] {
 	initSQPool()
@@ -113,6 +129,11 @@ func GetLoadPool() *conc.Pool[any] {
 	return loadPool.Load()
 }

+func GetWarmupPool() *conc.Pool[any] {
+	initWarmupPool()
+	return warmupPool.Load()
+}
+
 func ResizeSQPool(evt *config.Event) {
 	if evt.HasUpdated {
 		pt := paramtable.Get()
@@ -131,6 +152,14 @@ func ResizeLoadPool(evt *config.Event) {
 	}
 }

+func ResizeWarmupPool(evt *config.Event) {
+	if evt.HasUpdated {
+		pt := paramtable.Get()
+		newSize := hardware.GetCPUNum() * pt.CommonCfg.LowPriorityThreadCoreCoefficient.GetAsInt()
+		resizePool(GetWarmupPool(), newSize, "WarmupPool")
+	}
+}
+
 func resizePool(pool *conc.Pool[any], newSize int, tag string) {
 	log := log.Ctx(context.Background()).
 		With(
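ResizeWarmupPool closes the loop that initWarmupPool opens with pt.Watch: when the low-priority coefficient changes, the handler re-derives the size and resizes the live pool in place. A toy version of that flow, with invented stand-ins for config.Event, the handler registry, the key name, and the CPU count:

```go
package main

import "fmt"

// pool and event are toy stand-ins for conc.Pool and config.Event.
type pool struct{ cap int }

func (p *pool) Resize(n int) error {
	if n <= 0 {
		return fmt.Errorf("invalid pool size %d", n)
	}
	p.cap = n
	return nil
}

type event struct{ HasUpdated bool }

func main() {
	warmup := &pool{cap: 4}
	handlers := map[string]func(event){}

	// Mirrors initWarmupPool: register a handler under the coefficient key so a
	// config update resizes the live pool instead of rebuilding it.
	coefficient := 1
	handlers["lowPriorityCoefficientKey"] = func(evt event) {
		if evt.HasUpdated {
			newSize := 4 * coefficient // stand-in for hardware.GetCPUNum() * coefficient
			if err := warmup.Resize(newSize); err != nil {
				fmt.Println("resize skipped:", err)
			}
		}
	}

	coefficient = 2 // simulate an updated LowPriorityThreadCoreCoefficient
	handlers["lowPriorityCoefficientKey"](event{HasUpdated: true})
	fmt.Println("warmup pool cap:", warmup.cap) // 8
}
```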
21 changes: 21 additions & 0 deletions internal/querynodev2/segments/pool_test.go
@@ -82,6 +82,27 @@ func TestResizePools(t *testing.T) {
 		assert.Equal(t, expectedCap, GetLoadPool().Cap())
 	})

+	t.Run("WarmupPool", func(t *testing.T) {
+		expectedCap := hardware.GetCPUNum() * pt.CommonCfg.LowPriorityThreadCoreCoefficient.GetAsInt()
+
+		ResizeWarmupPool(&config.Event{
+			HasUpdated: true,
+		})
+		assert.Equal(t, expectedCap, GetWarmupPool().Cap())
+
+		pt.Save(pt.CommonCfg.LowPriorityThreadCoreCoefficient.Key, strconv.FormatFloat(pt.CommonCfg.LowPriorityThreadCoreCoefficient.GetAsFloat()*2, 'f', 10, 64))
+		ResizeWarmupPool(&config.Event{
+			HasUpdated: true,
+		})
+		assert.Equal(t, expectedCap, GetWarmupPool().Cap())
+
+		pt.Save(pt.CommonCfg.LowPriorityThreadCoreCoefficient.Key, "0")
+		ResizeWarmupPool(&config.Event{
+			HasUpdated: true,
+		})
+		assert.Equal(t, expectedCap, GetWarmupPool().Cap())
+	})
+
 	t.Run("error_pool", func(*testing.T) {
 		pool := conc.NewDefaultPool[any]()
 		c := pool.Cap()
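The final step saves "0" and still expects the original capacity, which implies that resizing rejects non-positive sizes and leaves the pool untouched. A minimal sketch of that guard, under that assumption:

```go
package main

import "fmt"

// resizeOrKeep models the guard the "0" case exercises: a non-positive
// request is rejected and the previous capacity survives.
func resizeOrKeep(current, requested int) int {
	if requested <= 0 {
		return current // invalid request: capacity left untouched
	}
	return requested
}

func main() {
	capacity := 8
	capacity = resizeOrKeep(capacity, 0) // the test saves "0" and expects no change
	fmt.Println(capacity)                // still 8
}
```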
4 changes: 2 additions & 2 deletions internal/querynodev2/segments/segment.go
@@ -1367,7 +1367,7 @@ func (s *LocalSegment) WarmupChunkCache(ctx context.Context, fieldID int64) {
 	warmingUp := strings.ToLower(paramtable.Get().QueryNodeCfg.ChunkCacheWarmingUp.GetValue())
 	switch warmingUp {
 	case "sync":
-		GetLoadPool().Submit(func() (any, error) {
+		GetWarmupPool().Submit(func() (any, error) {
 			cFieldID := C.int64_t(fieldID)
 			status = C.WarmupChunkCache(s.ptr, cFieldID)
 			if err := HandleCStatus(ctx, &status, "warming up chunk cache failed"); err != nil {
@@ -1378,7 +1378,7 @@ func (s *LocalSegment) WarmupChunkCache(ctx context.Context, fieldID int64) {
 			return nil, nil
 		}).Await()
 	case "async":
-		GetLoadPool().Submit(func() (any, error) {
+		GetWarmupPool().Submit(func() (any, error) {
 			if !s.ptrLock.RLockIf(state.IsNotReleased) {
 				return nil, nil
 			}
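Both cases hand the same cgo warmup closure to the pool; the only difference is whether the caller waits on the returned future. A self-contained sketch of that distinction, with a goroutine and a one-shot channel standing in for conc.Pool's Submit and Await:

```go
package main

import "fmt"

// submit is a stand-in for conc.Pool.Submit: run the task in a goroutine and
// hand back a buffered channel playing the role of the returned future.
func submit(task func() error) <-chan error {
	done := make(chan error, 1)
	go func() { done <- task() }()
	return done
}

func main() {
	// "async": fire and forget; schedule the warmup and move on.
	_ = submit(func() error { return nil })

	// "sync": block on the result, like }).Await() in segment.go.
	if err := <-submit(func() error { fmt.Println("warmup"); return nil }); err != nil {
		fmt.Println("warming up chunk cache failed:", err)
	}
}
```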
8 changes: 4 additions & 4 deletions pkg/util/paramtable/component_param.go
@@ -2347,13 +2347,13 @@ func (p *queryNodeConfig) init(base *BaseTable) {
 	p.ChunkCacheWarmingUp = ParamItem{
 		Key:          "queryNode.cache.warmup",
 		Version:      "2.3.6",
-		DefaultValue: "async",
-		Doc: `options: async, sync, off.
+		DefaultValue: "disable",
+		Doc: `options: async, sync, disable.
 Specifies the necessity for warming up the chunk cache.
-1. If set to "sync" or "async," the original vector data will be synchronously/asynchronously loaded into the
+1. If set to "sync" or "async" the original vector data will be synchronously/asynchronously loaded into the
 chunk cache during the load process. This approach has the potential to substantially reduce query/search latency
 for a specific duration post-load, albeit accompanied by a concurrent increase in disk usage;
-2. If set to "off," original vector data will only be loaded into the chunk cache during search/query.`,
+2. If set to "disable" original vector data will only be loaded into the chunk cache during search/query.`,
 		Export: true,
 	}
 	p.ChunkCacheWarmingUp.Init(base.mgr)
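ParamItem boils down to a keyed setting whose default applies wherever the key is not overridden, which is why flipping DefaultValue to "disable" changes out-of-the-box behavior everywhere the option is unset. A stripped-down, invented stand-in:

```go
package main

import "fmt"

// paramItem is a toy model of paramtable's ParamItem, not the real type.
type paramItem struct {
	key          string
	defaultValue string
	overrides    map[string]string
}

func (p paramItem) GetValue() string {
	if v, ok := p.overrides[p.key]; ok {
		return v
	}
	return p.defaultValue
}

func main() {
	warmup := paramItem{
		key:          "queryNode.cache.warmup",
		defaultValue: "disable", // the new default set by this commit
		overrides:    map[string]string{},
	}
	fmt.Println(warmup.GetValue()) // "disable" unless explicitly configured
}
```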
2 changes: 1 addition & 1 deletion pkg/util/paramtable/component_param_test.go
@@ -339,7 +339,7 @@ func TestComponentParam(t *testing.T) {

 		// chunk cache
 		assert.Equal(t, "willneed", Params.ReadAheadPolicy.GetValue())
-		assert.Equal(t, "async", Params.ChunkCacheWarmingUp.GetValue())
+		assert.Equal(t, "disable", Params.ChunkCacheWarmingUp.GetValue())

 		// test small indexNlist/NProbe default
 		params.Remove("queryNode.segcore.smallIndex.nlist")
