diff --git a/internal/core/src/storage/ThreadPool.h b/internal/core/src/storage/ThreadPool.h
index 6b5e4cfa9fb4c..dd6098ed9df88 100644
--- a/internal/core/src/storage/ThreadPool.h
+++ b/internal/core/src/storage/ThreadPool.h
@@ -39,6 +39,13 @@ class ThreadPool {
         current_threads_size_ = 0;
         min_threads_size_ = CPU_NUM;
         max_threads_size_ = CPU_NUM * thread_core_coefficient;
+
+        // Only the IO pool sets a large limit, and extra threads do not speed up IO-bound work.
+        // Each thread downloads up to 16 MiB, so cap the pool at 256 threads (~4 GiB in flight)
+        // to avoid OOM and to keep the request rate to object storage under control.
+        if (max_threads_size_ > 256) {
+            max_threads_size_ = 256;
+        }
         LOG_SEGCORE_INFO_ << "Init thread pool:" << name_
                           << " with min worker num:" << min_threads_size_
                           << " and max worker num:" << max_threads_size_;
diff --git a/internal/querynodev2/segments/segment_loader.go b/internal/querynodev2/segments/segment_loader.go
index 23f0a071b507c..283946f8d34a6 100644
--- a/internal/querynodev2/segments/segment_loader.go
+++ b/internal/querynodev2/segments/segment_loader.go
@@ -367,6 +367,9 @@ func (loader *segmentLoader) requestResource(ctx context.Context, infos ...*quer
 	diskCap := paramtable.Get().QueryNodeCfg.DiskCapacityLimit.GetAsUint64()
 
 	poolCap := runtime.NumCPU() * paramtable.Get().CommonCfg.HighPriorityThreadCoreCoefficient.GetAsInt()
+	if poolCap > 256 {
+		poolCap = 256
+	}
 	if loader.committedResource.WorkNum >= poolCap {
 		return resource, 0, merr.WrapErrServiceRequestLimitExceeded(int32(poolCap))
 	} else if loader.committedResource.MemorySize+memoryUsage >= totalMemory {
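
Below is a minimal, standalone Go sketch (not part of the patch) of the capping logic both hunks apply: the pool size scales with CPU cores times a coefficient but is clamped to 256 workers, since at roughly 16 MiB per in-flight download that bounds buffered data to about 4 GiB. The names cappedPoolSize and maxIOWorkers are illustrative and do not exist in the Milvus codebase.

package main

import (
	"fmt"
	"runtime"
)

// maxIOWorkers caps the IO pool: 256 workers * 16 MiB per in-flight download ≈ 4 GiB.
const maxIOWorkers = 256

// cappedPoolSize mirrors the clamp applied in both hunks above: the pool scales
// with CPU cores times a coefficient, but never grows beyond maxIOWorkers.
func cappedPoolSize(coefficient int) int {
	size := runtime.NumCPU() * coefficient
	if size > maxIOWorkers {
		size = maxIOWorkers
	}
	return size
}

func main() {
	// On a 32-core host with coefficient 10 this prints 256, not 320.
	fmt.Println(cappedPoolSize(10))
}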