From 7384bfe3f80b1cdd306af99b847593d4223de650 Mon Sep 17 00:00:00 2001
From: "yihao.dai"
Date: Tue, 28 May 2024 19:27:43 +0800
Subject: [PATCH] fix: use separate warmup pool and disable warmup by default (#33348) (#33349)

1. use a small warmup pool to reduce the impact of warmup
2. change the warmup pool to nonblocking mode
3. disable warmup by default
4. remove the maximum size limit of 16 for the load pool

issue: https://github.com/milvus-io/milvus/issues/32772
pr: https://github.com/milvus-io/milvus/pull/33348

---------

Signed-off-by: bigsheeper
Co-authored-by: xiaofanluan
---
 configs/milvus.yaml                         |  8 ++--
 internal/querynodev2/segments/pool.go       | 47 +++++++++++++++++----
 internal/querynodev2/segments/pool_test.go  | 21 +++++++++
 internal/querynodev2/segments/segment.go    |  4 +-
 pkg/util/paramtable/component_param.go      |  8 ++--
 pkg/util/paramtable/component_param_test.go |  2 +-
 6 files changed, 70 insertions(+), 20 deletions(-)

diff --git a/configs/milvus.yaml b/configs/milvus.yaml
index 35b361e6ac131..6cf6df7071355 100644
--- a/configs/milvus.yaml
+++ b/configs/milvus.yaml
@@ -329,13 +329,13 @@ queryNode:
     enabled: true
     memoryLimit: 2147483648 # 2 GB, 2 * 1024 *1024 *1024
     readAheadPolicy: willneed # The read ahead policy of chunk cache, options: `normal, random, sequential, willneed, dontneed`
-    # options: async, sync, off.
+    # options: async, sync, disable.
     # Specifies the necessity for warming up the chunk cache.
-    # 1. If set to "sync" or "async," the original vector data will be synchronously/asynchronously loaded into the
+    # 1. If set to "sync" or "async", the original vector data will be synchronously/asynchronously loaded into the
     # chunk cache during the load process. This approach has the potential to substantially reduce query/search latency
     # for a specific duration post-load, albeit accompanied by a concurrent increase in disk usage;
-    # 2. If set to "off," original vector data will only be loaded into the chunk cache during search/query.
-    warmup: async
+    # 2. If set to "disable", original vector data will only be loaded into the chunk cache during search/query.
+    warmup: disable
   mmap:
     mmapEnabled: false # Enable mmap for loading data
   lazyload:
diff --git a/internal/querynodev2/segments/pool.go b/internal/querynodev2/segments/pool.go
index bbf21b91a746e..cb025d8c749a3 100644
--- a/internal/querynodev2/segments/pool.go
+++ b/internal/querynodev2/segments/pool.go
@@ -37,12 +37,14 @@ var (
     // and other operations (insert/delete/statistics/etc.)
     // since in concurrent situation, there operation may block each other in high payload
 
-    sqp      atomic.Pointer[conc.Pool[any]]
-    sqOnce   sync.Once
-    dp       atomic.Pointer[conc.Pool[any]]
-    dynOnce  sync.Once
-    loadPool atomic.Pointer[conc.Pool[any]]
-    loadOnce sync.Once
+    sqp        atomic.Pointer[conc.Pool[any]]
+    sqOnce     sync.Once
+    dp         atomic.Pointer[conc.Pool[any]]
+    dynOnce    sync.Once
+    loadPool   atomic.Pointer[conc.Pool[any]]
+    loadOnce   sync.Once
+    warmupPool atomic.Pointer[conc.Pool[any]]
+    warmupOnce sync.Once
 )
 
 // initSQPool initialize
@@ -80,9 +82,6 @@ func initLoadPool() {
     loadOnce.Do(func() {
         pt := paramtable.Get()
         poolSize := hardware.GetCPUNum() * pt.CommonCfg.MiddlePriorityThreadCoreCoefficient.GetAsInt()
-        if poolSize > 16 {
-            poolSize = 16
-        }
         pool := conc.NewPool[any](
             poolSize,
             conc.WithPreAlloc(false),
@@ -96,6 +95,23 @@ func initLoadPool() {
     })
 }
 
+func initWarmupPool() {
+    warmupOnce.Do(func() {
+        pt := paramtable.Get()
+        poolSize := hardware.GetCPUNum() * pt.CommonCfg.LowPriorityThreadCoreCoefficient.GetAsInt()
+        pool := conc.NewPool[any](
+            poolSize,
+            conc.WithPreAlloc(false),
+            conc.WithDisablePurge(false),
+            conc.WithPreHandler(runtime.LockOSThread), // lock os thread for cgo thread disposal
+            conc.WithNonBlocking(true), // make warming up non blocking
+        )
+
+        warmupPool.Store(pool)
+        pt.Watch(pt.CommonCfg.LowPriorityThreadCoreCoefficient.Key, config.NewHandler("qn.warmpool.lowpriority", ResizeWarmupPool))
+    })
+}
+
 // GetSQPool returns the singleton pool instance for search/query operations.
 func GetSQPool() *conc.Pool[any] {
     initSQPool()
@@ -113,6 +129,11 @@ func GetLoadPool() *conc.Pool[any] {
     return loadPool.Load()
 }
 
+func GetWarmupPool() *conc.Pool[any] {
+    initWarmupPool()
+    return warmupPool.Load()
+}
+
 func ResizeSQPool(evt *config.Event) {
     if evt.HasUpdated {
         pt := paramtable.Get()
@@ -131,6 +152,14 @@ func ResizeLoadPool(evt *config.Event) {
     }
 }
 
+func ResizeWarmupPool(evt *config.Event) {
+    if evt.HasUpdated {
+        pt := paramtable.Get()
+        newSize := hardware.GetCPUNum() * pt.CommonCfg.LowPriorityThreadCoreCoefficient.GetAsInt()
+        resizePool(GetWarmupPool(), newSize, "WarmupPool")
+    }
+}
+
 func resizePool(pool *conc.Pool[any], newSize int, tag string) {
     log := log.Ctx(context.Background()).
         With(
diff --git a/internal/querynodev2/segments/pool_test.go b/internal/querynodev2/segments/pool_test.go
index 6c817bdb1eb9a..868bce4186236 100644
--- a/internal/querynodev2/segments/pool_test.go
+++ b/internal/querynodev2/segments/pool_test.go
@@ -82,6 +82,27 @@ func TestResizePools(t *testing.T) {
         assert.Equal(t, expectedCap, GetLoadPool().Cap())
     })
 
+    t.Run("WarmupPool", func(t *testing.T) {
+        expectedCap := hardware.GetCPUNum() * pt.CommonCfg.LowPriorityThreadCoreCoefficient.GetAsInt()
+
+        ResizeWarmupPool(&config.Event{
+            HasUpdated: true,
+        })
+        assert.Equal(t, expectedCap, GetWarmupPool().Cap())
+
+        pt.Save(pt.CommonCfg.LowPriorityThreadCoreCoefficient.Key, strconv.FormatFloat(pt.CommonCfg.LowPriorityThreadCoreCoefficient.GetAsFloat()*2, 'f', 10, 64))
+        ResizeWarmupPool(&config.Event{
+            HasUpdated: true,
+        })
+        assert.Equal(t, expectedCap, GetWarmupPool().Cap())
+
+        pt.Save(pt.CommonCfg.LowPriorityThreadCoreCoefficient.Key, "0")
+        ResizeWarmupPool(&config.Event{
+            HasUpdated: true,
+        })
+        assert.Equal(t, expectedCap, GetWarmupPool().Cap())
+    })
+
     t.Run("error_pool", func(*testing.T) {
         pool := conc.NewDefaultPool[any]()
         c := pool.Cap()
diff --git a/internal/querynodev2/segments/segment.go b/internal/querynodev2/segments/segment.go
index 0f79a5c1ec291..614f3102213c1 100644
--- a/internal/querynodev2/segments/segment.go
+++ b/internal/querynodev2/segments/segment.go
@@ -1367,7 +1367,7 @@ func (s *LocalSegment) WarmupChunkCache(ctx context.Context, fieldID int64) {
     warmingUp := strings.ToLower(paramtable.Get().QueryNodeCfg.ChunkCacheWarmingUp.GetValue())
     switch warmingUp {
     case "sync":
-        GetLoadPool().Submit(func() (any, error) {
+        GetWarmupPool().Submit(func() (any, error) {
             cFieldID := C.int64_t(fieldID)
             status = C.WarmupChunkCache(s.ptr, cFieldID)
             if err := HandleCStatus(ctx, &status, "warming up chunk cache failed"); err != nil {
@@ -1378,7 +1378,7 @@ func (s *LocalSegment) WarmupChunkCache(ctx context.Context, fieldID int64) {
             return nil, nil
         }).Await()
     case "async":
-        GetLoadPool().Submit(func() (any, error) {
+        GetWarmupPool().Submit(func() (any, error) {
             if !s.ptrLock.RLockIf(state.IsNotReleased) {
                 return nil, nil
             }
diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go
index 04296f296ed87..b6fcd5f763922 100644
--- a/pkg/util/paramtable/component_param.go
+++ b/pkg/util/paramtable/component_param.go
@@ -2347,13 +2347,13 @@ func (p *queryNodeConfig) init(base *BaseTable) {
     p.ChunkCacheWarmingUp = ParamItem{
         Key:          "queryNode.cache.warmup",
         Version:      "2.3.6",
-        DefaultValue: "async",
-        Doc: `options: async, sync, off.
+        DefaultValue: "disable",
+        Doc: `options: async, sync, disable.
 Specifies the necessity for warming up the chunk cache.
-1. If set to "sync" or "async," the original vector data will be synchronously/asynchronously loaded into the
+1. If set to "sync" or "async", the original vector data will be synchronously/asynchronously loaded into the
 chunk cache during the load process. This approach has the potential to substantially reduce query/search latency
 for a specific duration post-load, albeit accompanied by a concurrent increase in disk usage;
-2. If set to "off," original vector data will only be loaded into the chunk cache during search/query.`,
+2. If set to "disable", original vector data will only be loaded into the chunk cache during search/query.`,
         Export: true,
     }
     p.ChunkCacheWarmingUp.Init(base.mgr)
diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go
index c7aea9fb820d0..f1417089eed30 100644
--- a/pkg/util/paramtable/component_param_test.go
+++ b/pkg/util/paramtable/component_param_test.go
@@ -339,7 +339,7 @@ func TestComponentParam(t *testing.T) {
 
         // chunk cache
         assert.Equal(t, "willneed", Params.ReadAheadPolicy.GetValue())
-        assert.Equal(t, "async", Params.ChunkCacheWarmingUp.GetValue())
+        assert.Equal(t, "disable", Params.ChunkCacheWarmingUp.GetValue())
 
         // test small indexNlist/NProbe default
         params.Remove("queryNode.segcore.smallIndex.nlist")