From 2f4a13a7ae84d1d12994593e7fb09f680d33b658 Mon Sep 17 00:00:00 2001 From: "zhenshan.cao" Date: Sat, 24 Feb 2024 09:07:46 +0800 Subject: [PATCH] enhance: Revert (#30197 #30690 #30415) (#30795) Revert "enhance: reduce many I/O operations while loading disk index (#30189) (#30690)" This reverts commit d4c4bf946b15bc537acd170dfd1d938bea237c7a. Revert "enhance: limit the max pool size to 16 (#30371) (#30415)" This reverts commit 52ac0718f059d4aa45c5908ec8507e6045b24e1f. Revert "enhance: convert the `GetObject` util to async (#30166) (#30197)" This reverts commit 4b7c5baab773366aa8084762e7321130c4f894b7. Signed-off-by: zhenshan.cao --- .../core/src/storage/DiskFileManagerImpl.cpp | 81 ++++++++++--------- .../core/src/storage/MemFileManagerImpl.cpp | 5 +- internal/core/src/storage/ThreadPool.h | 11 +-- internal/core/src/storage/Util.cpp | 51 +++++++++++- internal/core/src/storage/Util.h | 3 +- internal/querynodev2/segments/pool.go | 7 +- 6 files changed, 98 insertions(+), 60 deletions(-) diff --git a/internal/core/src/storage/DiskFileManagerImpl.cpp b/internal/core/src/storage/DiskFileManagerImpl.cpp index 61d090bcc71d9..5364267061c1e 100644 --- a/internal/core/src/storage/DiskFileManagerImpl.cpp +++ b/internal/core/src/storage/DiskFileManagerImpl.cpp @@ -14,27 +14,20 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include #include #include #include #include #include "common/Common.h" -#include "common/Consts.h" -#include "common/EasyAssert.h" -#include "common/File.h" #include "common/Slice.h" #include "log/Log.h" #include "storage/DiskFileManagerImpl.h" -#include "storage/FieldData.h" -#include "storage/FieldDataInterface.h" -#include "storage/FileManager.h" -#include "storage/IndexData.h" #include "storage/LocalChunkManagerSingleton.h" -#include "storage/ThreadPools.h" +#include "storage/IndexData.h" #include "storage/Util.h" +#include "storage/ThreadPools.h" namespace milvus::storage { @@ -123,27 +116,28 @@ DiskFileManagerImpl::AddBatchIndexFiles( const std::vector& remote_file_sizes) { auto local_chunk_manager = LocalChunkManagerSingleton::GetInstance().GetChunkManager(); - auto& pool = ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::HIGH); + auto& pool = ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::MIDDLE); + + auto LoadIndexFromDisk = [&]( + const std::string& file, + const int64_t offset, + const int64_t data_size) -> std::shared_ptr { + auto buf = std::shared_ptr(new uint8_t[data_size]); + local_chunk_manager->Read(file, offset, buf.get(), data_size); + return buf; + }; std::vector>> futures; - futures.reserve(remote_file_sizes.size()); AssertInfo(local_file_offsets.size() == remote_files.size(), "inconsistent size of offset slices with file slices"); AssertInfo(remote_files.size() == remote_file_sizes.size(), "inconsistent size of file slices with size slices"); for (int64_t i = 0; i < remote_files.size(); ++i) { - futures.push_back(pool.Submit( - [&](const std::string& file, - const int64_t offset, - const int64_t data_size) -> std::shared_ptr { - auto buf = std::shared_ptr(new uint8_t[data_size]); - local_chunk_manager->Read(file, offset, buf.get(), data_size); - return buf; - }, - local_file_name, - local_file_offsets[i], - remote_file_sizes[i])); + futures.push_back(pool.Submit(LoadIndexFromDisk, + local_file_name, + local_file_offsets[i], + remote_file_sizes[i])); } // hold index data util upload index file done @@ -161,8 +155,8 @@ DiskFileManagerImpl::AddBatchIndexFiles( remote_files, field_meta_, index_meta_); - for (auto& re : res) { - remote_paths_to_size_[re.first] = re.second; + for (auto iter = res.begin(); iter != res.end(); ++iter) { + remote_paths_to_size_[iter->first] = iter->second; } } @@ -183,30 +177,39 @@ DiskFileManagerImpl::CacheIndexToDisk( std::sort(slices.second.begin(), slices.second.end()); } + auto EstimateParallelDegree = [&](const std::string& file) -> uint64_t { + auto fileSize = rcm_->Size(file); + return uint64_t(DEFAULT_FIELD_MAX_MEMORY_LIMIT / fileSize); + }; + for (auto& slices : index_slices) { auto prefix = slices.first; auto local_index_file_name = GetLocalIndexObjectPrefix() + prefix.substr(prefix.find_last_of('/') + 1); local_chunk_manager->CreateFile(local_index_file_name); - auto file = - File::Open(local_index_file_name, O_CREAT | O_RDWR | O_TRUNC); - - // Get the remote files + int64_t offset = 0; std::vector batch_remote_files; - batch_remote_files.reserve(slices.second.size()); + uint64_t max_parallel_degree = INT_MAX; for (int& iter : slices.second) { + if (batch_remote_files.size() == max_parallel_degree) { + auto next_offset = CacheBatchIndexFilesToDisk( + batch_remote_files, local_index_file_name, offset); + offset = next_offset; + batch_remote_files.clear(); + } auto origin_file = prefix + "_" + std::to_string(iter); + if (batch_remote_files.size() == 0) { + // Use first file size as average size to estimate + max_parallel_degree = EstimateParallelDegree(origin_file); + } batch_remote_files.push_back(origin_file); } - - auto index_chunks = GetObjectData(rcm_.get(), batch_remote_files); - for (auto& chunk : index_chunks) { - auto index_data = chunk.get()->GetFieldData(); - auto index_size = index_data->Size(); - auto chunk_data = reinterpret_cast( - const_cast(index_data->Data())); - file.Write(chunk_data, index_size); + if (batch_remote_files.size() > 0) { + auto next_offset = CacheBatchIndexFilesToDisk( + batch_remote_files, local_index_file_name, offset); + offset = next_offset; + batch_remote_files.clear(); } local_paths_.emplace_back(local_index_file_name); } @@ -226,7 +229,7 @@ DiskFileManagerImpl::CacheBatchIndexFilesToDisk( uint64_t offset = local_file_init_offfset; for (int i = 0; i < batch_size; ++i) { - auto index_data = index_datas[i].get()->GetFieldData(); + auto index_data = index_datas[i]; auto index_size = index_data->Size(); auto uint8_data = reinterpret_cast(const_cast(index_data->Data())); @@ -270,7 +273,7 @@ DiskFileManagerImpl::CacheRawDataToDisk(std::vector remote_files) { auto field_datas = GetObjectData(rcm_.get(), batch_files); int batch_size = batch_files.size(); for (int i = 0; i < batch_size; ++i) { - auto field_data = field_datas[i].get()->GetFieldData(); + auto field_data = field_datas[i]; num_rows += uint32_t(field_data->get_num_rows()); AssertInfo(dim == 0 || dim == field_data->get_dim(), "inconsistent dim value in multi binlogs!"); diff --git a/internal/core/src/storage/MemFileManagerImpl.cpp b/internal/core/src/storage/MemFileManagerImpl.cpp index e51ae4a5ffbcf..e7c0c49428ecb 100644 --- a/internal/core/src/storage/MemFileManagerImpl.cpp +++ b/internal/core/src/storage/MemFileManagerImpl.cpp @@ -98,8 +98,7 @@ MemFileManagerImpl::LoadIndexToMemory( for (size_t idx = 0; idx < batch_files.size(); ++idx) { auto file_name = batch_files[idx].substr(batch_files[idx].find_last_of('/') + 1); - file_to_index_data[file_name] = - index_datas[idx].get()->GetFieldData(); + file_to_index_data[file_name] = index_datas[idx]; } }; @@ -138,7 +137,7 @@ MemFileManagerImpl::CacheRawDataToMemory( auto FetchRawData = [&]() { auto raw_datas = GetObjectData(rcm_.get(), batch_files); for (auto& data : raw_datas) { - field_datas.emplace_back(data.get()->GetFieldData()); + field_datas.emplace_back(data); } }; diff --git a/internal/core/src/storage/ThreadPool.h b/internal/core/src/storage/ThreadPool.h index 9478968ca3825..dd6098ed9df88 100644 --- a/internal/core/src/storage/ThreadPool.h +++ b/internal/core/src/storage/ThreadPool.h @@ -41,13 +41,10 @@ class ThreadPool { max_threads_size_ = CPU_NUM * thread_core_coefficient; // only IO pool will set large limit, but the CPU helps nothing to IO operations, - // we need to limit the max thread num, each thread will download 16~64 MiB data, - // according to our benchmark, 16 threads is enough to saturate the network bandwidth. - if (min_threads_size_ > 16) { - min_threads_size_ = 16; - } - if (max_threads_size_ > 16) { - max_threads_size_ = 16; + // we need to limit the max thread num, each thread will download 16 MiB data, + // it should be not greater than 256 (4GiB data) to avoid OOM and send too many requests to object storage + if (max_threads_size_ > 256) { + max_threads_size_ = 256; } LOG_SEGCORE_INFO_ << "Init thread pool:" << name_ << " with min worker num:" << min_threads_size_ diff --git a/internal/core/src/storage/Util.cpp b/internal/core/src/storage/Util.cpp index 764ba7e2aada6..b3cde65ab38c1 100644 --- a/internal/core/src/storage/Util.cpp +++ b/internal/core/src/storage/Util.cpp @@ -447,17 +447,62 @@ EncodeAndUploadFieldSlice(ChunkManager* chunk_manager, return std::make_pair(std::move(object_key), serialized_index_size); } -std::vector>> +// /** +// * Returns the current resident set size (physical memory use) measured +// * in bytes, or zero if the value cannot be determined on this OS. +// */ +// size_t +// getCurrentRSS() { +// #if defined(_WIN32) +// /* Windows -------------------------------------------------- */ +// PROCESS_MEMORY_COUNTERS info; +// GetProcessMemoryInfo(GetCurrentProcess(), &info, sizeof(info)); +// return (size_t)info.WorkingSetSize; + +// #elif defined(__APPLE__) && defined(__MACH__) +// /* OSX ------------------------------------------------------ */ +// struct mach_task_basic_info info; +// mach_msg_type_number_t infoCount = MACH_TASK_BASIC_INFO_COUNT; +// if (task_info(mach_task_self(), MACH_TASK_BASIC_INFO, (task_info_t)&info, &infoCount) != KERN_SUCCESS) +// return (size_t)0L; /* Can't access? */ +// return (size_t)info.resident_size; + +// #elif defined(__linux__) || defined(__linux) || defined(linux) || defined(__gnu_linux__) +// /* Linux ---------------------------------------------------- */ +// long rss = 0L; +// FILE* fp = NULL; +// if ((fp = fopen("/proc/self/statm", "r")) == NULL) +// return (size_t)0L; /* Can't open? */ +// if (fscanf(fp, "%*s%ld", &rss) != 1) { +// fclose(fp); +// return (size_t)0L; /* Can't read? */ +// } +// fclose(fp); +// return (size_t)rss * (size_t)sysconf(_SC_PAGESIZE); + +// #else +// /* AIX, BSD, Solaris, and Unknown OS ------------------------ */ +// return (size_t)0L; /* Unsupported. */ +// #endif +// } + +std::vector GetObjectData(ChunkManager* remote_chunk_manager, const std::vector& remote_files) { auto& pool = ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::HIGH); std::vector>> futures; - futures.reserve(remote_files.size()); for (auto& file : remote_files) { futures.emplace_back(pool.Submit( DownloadAndDecodeRemoteFile, remote_chunk_manager, file)); } - return futures; + + std::vector datas; + for (int i = 0; i < futures.size(); ++i) { + auto res = futures[i].get(); + datas.emplace_back(res->GetFieldData()); + } + ReleaseArrowUnused(); + return datas; } std::map diff --git a/internal/core/src/storage/Util.h b/internal/core/src/storage/Util.h index b264f915c1117..e732371a1cd14 100644 --- a/internal/core/src/storage/Util.h +++ b/internal/core/src/storage/Util.h @@ -19,7 +19,6 @@ #include #include #include -#include #include "storage/FieldData.h" #include "storage/PayloadStream.h" @@ -103,7 +102,7 @@ EncodeAndUploadFieldSlice(ChunkManager* chunk_manager, const FieldMeta& field_meta, std::string object_key); -std::vector>> +std::vector GetObjectData(ChunkManager* remote_chunk_manager, const std::vector& remote_files); diff --git a/internal/querynodev2/segments/pool.go b/internal/querynodev2/segments/pool.go index a55990c6a1a67..29c6c65e56bb2 100644 --- a/internal/querynodev2/segments/pool.go +++ b/internal/querynodev2/segments/pool.go @@ -71,13 +71,8 @@ func initDynamicPool() { func initLoadPool() { loadOnce.Do(func() { - pt := paramtable.Get() - poolSize := hardware.GetCPUNum() * pt.CommonCfg.MiddlePriorityThreadCoreCoefficient.GetAsInt() - if poolSize > 16 { - poolSize = 16 - } pool := conc.NewPool[any]( - poolSize, + hardware.GetCPUNum()*paramtable.Get().CommonCfg.MiddlePriorityThreadCoreCoefficient.GetAsInt(), conc.WithPreAlloc(false), conc.WithDisablePurge(false), conc.WithPreHandler(runtime.LockOSThread), // lock os thread for cgo thread disposal