enhance: Revert (#30197 #30690 #30415) (#30795)
Revert "enhance: reduce many I/O operations while loading disk index
(#30189) (#30690)" This reverts commit
d4c4bf9.

Revert "enhance: limit the max pool size to 16 (#30371) (#30415)" This
reverts commit 52ac071.

Revert "enhance: convert the `GetObject` util to async (#30166)
(#30197)" This reverts commit 4b7c5ba.

Signed-off-by: zhenshan.cao <[email protected]>
czs007 authored Feb 24, 2024
1 parent 2896f5e commit 2f4a13a
Showing 6 changed files with 98 additions and 60 deletions.
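Context for the diffs that follow: the third revert changes `GetObjectData` back from returning `std::vector<std::future<std::unique_ptr<DataCodec>>>` to returning decoded `std::vector<FieldDataPtr>`, which is why every caller below drops its `.get()->GetFieldData()` chain. A minimal caller-side sketch of that difference; the types here are simplified stand-ins, not the actual Milvus declarations:

```cpp
#include <future>
#include <memory>
#include <string>
#include <vector>

// Simplified stand-ins for the Milvus types touched by this revert.
struct FieldData { size_t Size() const { return 0; } };
using FieldDataPtr = std::shared_ptr<FieldData>;
struct DataCodec { FieldDataPtr GetFieldData() { return std::make_shared<FieldData>(); } };

// Before the revert (async variant): callers received futures and unwrapped each one.
void ConsumeAsync(std::vector<std::future<std::unique_ptr<DataCodec>>>& futures) {
    for (auto& f : futures) {
        FieldDataPtr data = f.get()->GetFieldData();  // block, then unwrap the codec
        (void)data->Size();
    }
}

// After the revert (sync variant): GetObjectData blocks internally and returns data.
void ConsumeSync(const std::vector<FieldDataPtr>& datas) {
    for (const auto& data : datas) {
        (void)data->Size();  // already decoded field data
    }
}
```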
81 changes: 42 additions & 39 deletions internal/core/src/storage/DiskFileManagerImpl.cpp
@@ -14,27 +14,20 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <sys/fcntl.h>
#include <algorithm>
#include <boost/filesystem.hpp>
#include <mutex>
#include <utility>

#include "common/Common.h"
#include "common/Consts.h"
#include "common/EasyAssert.h"
#include "common/File.h"
#include "common/Slice.h"
#include "log/Log.h"

#include "storage/DiskFileManagerImpl.h"
#include "storage/FieldData.h"
#include "storage/FieldDataInterface.h"
#include "storage/FileManager.h"
#include "storage/IndexData.h"
#include "storage/LocalChunkManagerSingleton.h"
#include "storage/ThreadPools.h"
#include "storage/IndexData.h"
#include "storage/Util.h"
#include "storage/ThreadPools.h"

namespace milvus::storage {

@@ -123,27 +116,28 @@ DiskFileManagerImpl::AddBatchIndexFiles(
const std::vector<int64_t>& remote_file_sizes) {
auto local_chunk_manager =
LocalChunkManagerSingleton::GetInstance().GetChunkManager();
auto& pool = ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::HIGH);
auto& pool = ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::MIDDLE);

auto LoadIndexFromDisk = [&](
const std::string& file,
const int64_t offset,
const int64_t data_size) -> std::shared_ptr<uint8_t[]> {
auto buf = std::shared_ptr<uint8_t[]>(new uint8_t[data_size]);
local_chunk_manager->Read(file, offset, buf.get(), data_size);
return buf;
};

std::vector<std::future<std::shared_ptr<uint8_t[]>>> futures;
futures.reserve(remote_file_sizes.size());
AssertInfo(local_file_offsets.size() == remote_files.size(),
"inconsistent size of offset slices with file slices");
AssertInfo(remote_files.size() == remote_file_sizes.size(),
"inconsistent size of file slices with size slices");

for (int64_t i = 0; i < remote_files.size(); ++i) {
futures.push_back(pool.Submit(
[&](const std::string& file,
const int64_t offset,
const int64_t data_size) -> std::shared_ptr<uint8_t[]> {
auto buf = std::shared_ptr<uint8_t[]>(new uint8_t[data_size]);
local_chunk_manager->Read(file, offset, buf.get(), data_size);
return buf;
},
local_file_name,
local_file_offsets[i],
remote_file_sizes[i]));
futures.push_back(pool.Submit(LoadIndexFromDisk,
local_file_name,
local_file_offsets[i],
remote_file_sizes[i]));
}

// hold index data util upload index file done
@@ -161,8 +155,8 @@ DiskFileManagerImpl::AddBatchIndexFiles(
remote_files,
field_meta_,
index_meta_);
for (auto& re : res) {
remote_paths_to_size_[re.first] = re.second;
for (auto iter = res.begin(); iter != res.end(); ++iter) {
remote_paths_to_size_[iter->first] = iter->second;
}
}

@@ -183,30 +177,39 @@ DiskFileManagerImpl::CacheIndexToDisk(
std::sort(slices.second.begin(), slices.second.end());
}

auto EstimateParallelDegree = [&](const std::string& file) -> uint64_t {
auto fileSize = rcm_->Size(file);
return uint64_t(DEFAULT_FIELD_MAX_MEMORY_LIMIT / fileSize);
};

for (auto& slices : index_slices) {
auto prefix = slices.first;
auto local_index_file_name =
GetLocalIndexObjectPrefix() +
prefix.substr(prefix.find_last_of('/') + 1);
local_chunk_manager->CreateFile(local_index_file_name);
auto file =
File::Open(local_index_file_name, O_CREAT | O_RDWR | O_TRUNC);

// Get the remote files
int64_t offset = 0;
std::vector<std::string> batch_remote_files;
batch_remote_files.reserve(slices.second.size());
uint64_t max_parallel_degree = INT_MAX;
for (int& iter : slices.second) {
if (batch_remote_files.size() == max_parallel_degree) {
auto next_offset = CacheBatchIndexFilesToDisk(
batch_remote_files, local_index_file_name, offset);
offset = next_offset;
batch_remote_files.clear();
}
auto origin_file = prefix + "_" + std::to_string(iter);
if (batch_remote_files.size() == 0) {
// Use first file size as average size to estimate
max_parallel_degree = EstimateParallelDegree(origin_file);
}
batch_remote_files.push_back(origin_file);
}

auto index_chunks = GetObjectData(rcm_.get(), batch_remote_files);
for (auto& chunk : index_chunks) {
auto index_data = chunk.get()->GetFieldData();
auto index_size = index_data->Size();
auto chunk_data = reinterpret_cast<uint8_t*>(
const_cast<void*>(index_data->Data()));
file.Write(chunk_data, index_size);
if (batch_remote_files.size() > 0) {
auto next_offset = CacheBatchIndexFilesToDisk(
batch_remote_files, local_index_file_name, offset);
offset = next_offset;
batch_remote_files.clear();
}
local_paths_.emplace_back(local_index_file_name);
}
@@ -226,7 +229,7 @@ DiskFileManagerImpl::CacheBatchIndexFilesToDisk(

uint64_t offset = local_file_init_offfset;
for (int i = 0; i < batch_size; ++i) {
auto index_data = index_datas[i].get()->GetFieldData();
auto index_data = index_datas[i];
auto index_size = index_data->Size();
auto uint8_data =
reinterpret_cast<uint8_t*>(const_cast<void*>(index_data->Data()));
@@ -270,7 +273,7 @@ DiskFileManagerImpl::CacheRawDataToDisk(std::vector<std::string> remote_files) {
auto field_datas = GetObjectData(rcm_.get(), batch_files);
int batch_size = batch_files.size();
for (int i = 0; i < batch_size; ++i) {
auto field_data = field_datas[i].get()->GetFieldData();
auto field_data = field_datas[i];
num_rows += uint32_t(field_data->get_num_rows());
AssertInfo(dim == 0 || dim == field_data->get_dim(),
"inconsistent dim value in multi binlogs!");
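For orientation, a condensed sketch of the batching pattern restored in `CacheIndexToDisk` above: the parallel degree is estimated from the first slice in each batch against a per-field memory budget, and accumulated slices are flushed to the local index file once that degree is reached. The constant and callbacks below are illustrative placeholders rather than the real Milvus definitions:

```cpp
#include <algorithm>
#include <cstdint>
#include <functional>
#include <string>
#include <vector>

// Hypothetical per-field memory budget; the real code uses DEFAULT_FIELD_MAX_MEMORY_LIMIT.
constexpr uint64_t kFieldMemoryLimit = 64ULL * 1024 * 1024;

// Writes one batch of remote slices into the local index file starting at `offset`
// and returns the next write offset; stands in for CacheBatchIndexFilesToDisk.
using FlushBatchFn =
    std::function<uint64_t(const std::vector<std::string>&, uint64_t)>;

uint64_t
CacheSlicesInBatches(const std::vector<std::string>& slice_files,
                     const std::function<uint64_t(const std::string&)>& remote_size,
                     const FlushBatchFn& flush_batch) {
    uint64_t offset = 0;
    uint64_t max_parallel_degree = UINT64_MAX;
    std::vector<std::string> batch;
    for (const auto& file : slice_files) {
        if (batch.size() == max_parallel_degree) {
            offset = flush_batch(batch, offset);
            batch.clear();
        }
        if (batch.empty()) {
            // Size the batch from its first slice, treating that size as the average;
            // the max(1, ...) guards are added in this sketch so a slice larger than
            // the budget still forms a batch of one.
            max_parallel_degree = std::max<uint64_t>(
                1, kFieldMemoryLimit / std::max<uint64_t>(1, remote_size(file)));
        }
        batch.push_back(file);
    }
    if (!batch.empty()) {
        offset = flush_batch(batch, offset);
    }
    return offset;
}
```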
5 changes: 2 additions & 3 deletions internal/core/src/storage/MemFileManagerImpl.cpp
@@ -98,8 +98,7 @@ MemFileManagerImpl::LoadIndexToMemory(
for (size_t idx = 0; idx < batch_files.size(); ++idx) {
auto file_name =
batch_files[idx].substr(batch_files[idx].find_last_of('/') + 1);
file_to_index_data[file_name] =
index_datas[idx].get()->GetFieldData();
file_to_index_data[file_name] = index_datas[idx];
}
};

@@ -138,7 +137,7 @@ MemFileManagerImpl::CacheRawDataToMemory(
auto FetchRawData = [&]() {
auto raw_datas = GetObjectData(rcm_.get(), batch_files);
for (auto& data : raw_datas) {
field_datas.emplace_back(data.get()->GetFieldData());
field_datas.emplace_back(data);
}
};

11 changes: 4 additions & 7 deletions internal/core/src/storage/ThreadPool.h
@@ -41,13 +41,10 @@ class ThreadPool {
max_threads_size_ = CPU_NUM * thread_core_coefficient;

// only IO pool will set large limit, but the CPU helps nothing to IO operations,
// we need to limit the max thread num, each thread will download 16~64 MiB data,
// according to our benchmark, 16 threads is enough to saturate the network bandwidth.
if (min_threads_size_ > 16) {
min_threads_size_ = 16;
}
if (max_threads_size_ > 16) {
max_threads_size_ = 16;
// we need to limit the max thread num, each thread will download 16 MiB data,
// it should be not greater than 256 (4GiB data) to avoid OOM and send too many requests to object storage
if (max_threads_size_ > 256) {
max_threads_size_ = 256;
}
LOG_SEGCORE_INFO_ << "Init thread pool:" << name_
<< " with min worker num:" << min_threads_size_
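The restored bound matches the arithmetic in the comment above: at roughly 16 MiB per in-flight download, 256 worker threads buffer about 4 GiB in the worst case. A small self-contained check (the constants mirror the comment; `ClampIoThreads` is an illustrative helper, not a Milvus API):

```cpp
#include <algorithm>
#include <cstdint>

constexpr uint64_t kMiB = 1024ULL * 1024;
constexpr uint64_t kPerThreadDownload = 16 * kMiB;  // per the comment: ~16 MiB per thread
constexpr uint64_t kMaxIoThreads = 256;             // restored upper bound

// Worst-case memory held by in-flight downloads.
constexpr uint64_t kWorstCaseBuffered = kMaxIoThreads * kPerThreadDownload;
static_assert(kWorstCaseBuffered == 4ULL * 1024 * kMiB, "256 threads * 16 MiB == 4 GiB");

// How the pool size is clamped: CPU count times a coefficient, capped at 256.
constexpr uint64_t
ClampIoThreads(uint64_t cpu_num, uint64_t coefficient) {
    return std::min<uint64_t>(cpu_num * coefficient, kMaxIoThreads);
}
static_assert(ClampIoThreads(32, 10) == 256, "320 would exceed the cap");
static_assert(ClampIoThreads(8, 4) == 32, "small machines stay below the cap");
```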
51 changes: 48 additions & 3 deletions internal/core/src/storage/Util.cpp
@@ -447,17 +447,62 @@ EncodeAndUploadFieldSlice(ChunkManager* chunk_manager,
return std::make_pair(std::move(object_key), serialized_index_size);
}

std::vector<std::future<std::unique_ptr<DataCodec>>>
// /**
// * Returns the current resident set size (physical memory use) measured
// * in bytes, or zero if the value cannot be determined on this OS.
// */
// size_t
// getCurrentRSS() {
// #if defined(_WIN32)
// /* Windows -------------------------------------------------- */
// PROCESS_MEMORY_COUNTERS info;
// GetProcessMemoryInfo(GetCurrentProcess(), &info, sizeof(info));
// return (size_t)info.WorkingSetSize;

// #elif defined(__APPLE__) && defined(__MACH__)
// /* OSX ------------------------------------------------------ */
// struct mach_task_basic_info info;
// mach_msg_type_number_t infoCount = MACH_TASK_BASIC_INFO_COUNT;
// if (task_info(mach_task_self(), MACH_TASK_BASIC_INFO, (task_info_t)&info, &infoCount) != KERN_SUCCESS)
// return (size_t)0L; /* Can't access? */
// return (size_t)info.resident_size;

// #elif defined(__linux__) || defined(__linux) || defined(linux) || defined(__gnu_linux__)
// /* Linux ---------------------------------------------------- */
// long rss = 0L;
// FILE* fp = NULL;
// if ((fp = fopen("/proc/self/statm", "r")) == NULL)
// return (size_t)0L; /* Can't open? */
// if (fscanf(fp, "%*s%ld", &rss) != 1) {
// fclose(fp);
// return (size_t)0L; /* Can't read? */
// }
// fclose(fp);
// return (size_t)rss * (size_t)sysconf(_SC_PAGESIZE);

// #else
// /* AIX, BSD, Solaris, and Unknown OS ------------------------ */
// return (size_t)0L; /* Unsupported. */
// #endif
// }

std::vector<FieldDataPtr>
GetObjectData(ChunkManager* remote_chunk_manager,
const std::vector<std::string>& remote_files) {
auto& pool = ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::HIGH);
std::vector<std::future<std::unique_ptr<DataCodec>>> futures;
futures.reserve(remote_files.size());
for (auto& file : remote_files) {
futures.emplace_back(pool.Submit(
DownloadAndDecodeRemoteFile, remote_chunk_manager, file));
}
return futures;

std::vector<FieldDataPtr> datas;
for (int i = 0; i < futures.size(); ++i) {
auto res = futures[i].get();
datas.emplace_back(res->GetFieldData());
}
ReleaseArrowUnused();
return datas;
}

std::map<std::string, int64_t>
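The restored `GetObjectData` is a synchronous wrapper: it fans one download-and-decode task per file out to the high-priority pool, drains every future inside the function, and hands plain field data back to callers. A self-contained sketch of that shape, using `std::async` in place of the Milvus thread pool and a placeholder payload type:

```cpp
#include <future>
#include <memory>
#include <string>
#include <vector>

// Placeholder for the decoded payload type; the real code returns FieldDataPtr.
using Payload = std::shared_ptr<std::string>;

// Stand-in for DownloadAndDecodeRemoteFile: fetch one remote object and decode it.
Payload
DownloadAndDecode(const std::string& remote_file) {
    return std::make_shared<std::string>("decoded:" + remote_file);
}

// Synchronous wrapper mirroring the restored GetObjectData: fan out one task per
// file, then block on every future before returning, so callers never see futures.
std::vector<Payload>
FetchObjects(const std::vector<std::string>& remote_files) {
    std::vector<std::future<Payload>> futures;
    futures.reserve(remote_files.size());
    for (const auto& file : remote_files) {
        // std::async stands in for ThreadPools::GetThreadPool(...).Submit(...).
        futures.emplace_back(std::async(std::launch::async, DownloadAndDecode, file));
    }
    std::vector<Payload> results;
    results.reserve(futures.size());
    for (auto& f : futures) {
        results.emplace_back(f.get());  // blocks until the download/decode finishes
    }
    return results;
}
```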
3 changes: 1 addition & 2 deletions internal/core/src/storage/Util.h
@@ -19,7 +19,6 @@
#include <memory>
#include <string>
#include <vector>
#include <future>

#include "storage/FieldData.h"
#include "storage/PayloadStream.h"
@@ -103,7 +102,7 @@ EncodeAndUploadFieldSlice(ChunkManager* chunk_manager,
const FieldMeta& field_meta,
std::string object_key);

std::vector<std::future<std::unique_ptr<DataCodec>>>
std::vector<FieldDataPtr>
GetObjectData(ChunkManager* remote_chunk_manager,
const std::vector<std::string>& remote_files);

7 changes: 1 addition & 6 deletions internal/querynodev2/segments/pool.go
@@ -71,13 +71,8 @@ func initDynamicPool() {

func initLoadPool() {
loadOnce.Do(func() {
pt := paramtable.Get()
poolSize := hardware.GetCPUNum() * pt.CommonCfg.MiddlePriorityThreadCoreCoefficient.GetAsInt()
if poolSize > 16 {
poolSize = 16
}
pool := conc.NewPool[any](
poolSize,
hardware.GetCPUNum()*paramtable.Get().CommonCfg.MiddlePriorityThreadCoreCoefficient.GetAsInt(),
conc.WithPreAlloc(false),
conc.WithDisablePurge(false),
conc.WithPreHandler(runtime.LockOSThread), // lock os thread for cgo thread disposal
