Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fix directory confusion between indexnode and querynode #38628

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions internal/core/src/common/Common.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,32 @@ extern int64_t LOW_PRIORITY_THREAD_CORE_COEFFICIENT;
extern int CPU_NUM;
extern int64_t EXEC_EVAL_EXPR_BATCH_SIZE;

enum class Role { QueryNode, IndexNode };

const std::unordered_map<Role, std::string> RoleToStringMap = {
{Role::QueryNode, "querynode"}, {Role::IndexNode, "indexnode"}};

// convert role to string
inline std::string
ToString(Role role) {
auto it = RoleToStringMap.find(role);
if (it != RoleToStringMap.end()) {
return it->second;
}
PanicInfo(UnexpectedError, "role {} not found", int(role));
}

// convert string to role
inline Role
FromString(const std::string& role_str) {
for (const auto& pair : RoleToStringMap) {
if (pair.second == role_str) {
return pair.first;
}
}
PanicInfo(UnexpectedError, "role {} not found", role_str);
}

void
SetIndexSliceSize(const int64_t size);

Expand Down
2 changes: 1 addition & 1 deletion internal/core/src/index/InvertedIndexTantivy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ InvertedIndexTantivy<T>::InvertedIndexTantivy(
template <typename T>
InvertedIndexTantivy<T>::~InvertedIndexTantivy() {
auto local_chunk_manager =
storage::LocalChunkManagerSingleton::GetInstance().GetChunkManager();
storage::LocalChunkManagerFactory::GetInstance().GetChunkManager();
auto prefix = path_;
LOG_INFO("inverted index remove path:{}", path_);
local_chunk_manager->RemoveDir(prefix);
Expand Down
10 changes: 6 additions & 4 deletions internal/core/src/index/VectorDiskIndex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ VectorDiskAnnIndex<T>::VectorDiskAnnIndex(
std::make_shared<storage::DiskFileManagerImpl>(file_manager_context);
AssertInfo(file_manager_ != nullptr, "create file manager failed!");
auto local_chunk_manager =
storage::LocalChunkManagerSingleton::GetInstance().GetChunkManager();
storage::LocalChunkManagerFactory::GetInstance().GetChunkManager();
auto local_index_path_prefix = file_manager_->GetLocalIndexObjectPrefix();

// As we have guarded dup-load in QueryNode,
Expand Down Expand Up @@ -135,7 +135,8 @@ template <typename T>
void
VectorDiskAnnIndex<T>::Build(const Config& config) {
auto local_chunk_manager =
storage::LocalChunkManagerSingleton::GetInstance().GetChunkManager();
storage::LocalChunkManagerFactory::GetInstance().GetChunkManager(
milvus::Role::IndexNode);
knowhere::Json build_config;
build_config.update(config);

Expand Down Expand Up @@ -185,7 +186,8 @@ void
VectorDiskAnnIndex<T>::BuildWithDataset(const DatasetPtr& dataset,
const Config& config) {
auto local_chunk_manager =
storage::LocalChunkManagerSingleton::GetInstance().GetChunkManager();
storage::LocalChunkManagerFactory::GetInstance().GetChunkManager(
milvus::Role::IndexNode);
knowhere::Json build_config;
build_config.update(config);
// set data path
Expand Down Expand Up @@ -376,7 +378,7 @@ template <typename T>
void
VectorDiskAnnIndex<T>::CleanLocalData() {
auto local_chunk_manager =
storage::LocalChunkManagerSingleton::GetInstance().GetChunkManager();
storage::LocalChunkManagerFactory::GetInstance().GetChunkManager();
local_chunk_manager->RemoveDir(file_manager_->GetLocalIndexObjectPrefix());
local_chunk_manager->RemoveDir(
file_manager_->GetLocalRawDataObjectPrefix());
Expand Down
5 changes: 3 additions & 2 deletions internal/core/src/segcore/load_index_c.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "common/EasyAssert.h"
#include "common/Types.h"
#include "common/type_c.h"
#include "common/Common.h"
#include "index/Index.h"
#include "index/IndexFactory.h"
#include "index/Meta.h"
Expand Down Expand Up @@ -424,8 +425,8 @@ CleanLoadedIndex(CLoadIndexInfo c_load_index_info) {
auto load_index_info =
(milvus::segcore::LoadIndexInfo*)c_load_index_info;
auto local_chunk_manager =
milvus::storage::LocalChunkManagerSingleton::GetInstance()
.GetChunkManager();
milvus::storage::LocalChunkManagerFactory::GetInstance()
.GetChunkManager(milvus::Role::QueryNode);
auto index_file_path_prefix =
milvus::storage::GenIndexPathPrefix(local_chunk_manager,
load_index_info->index_build_id,
Expand Down
36 changes: 23 additions & 13 deletions internal/core/src/storage/DiskFileManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,18 @@ namespace milvus::storage {
DiskFileManagerImpl::DiskFileManagerImpl(
const FileManagerContext& fileManagerContext)
: FileManagerImpl(fileManagerContext.fieldDataMeta,
fileManagerContext.indexMeta) {
fileManagerContext.indexMeta),
for_loading_index_(fileManagerContext.for_loading_index) {
rcm_ = fileManagerContext.chunkManagerPtr;
}

DiskFileManagerImpl::~DiskFileManagerImpl() {
auto local_chunk_manager =
LocalChunkManagerSingleton::GetInstance().GetChunkManager();
for_loading_index_
? LocalChunkManagerFactory::GetInstance().GetChunkManager(
milvus::Role::QueryNode)
: LocalChunkManagerFactory::GetInstance().GetChunkManager(
milvus::Role::IndexNode);
local_chunk_manager->RemoveDir(GetIndexPathPrefixWithBuildID(
local_chunk_manager, index_meta_.build_id));
}
Expand Down Expand Up @@ -82,7 +87,7 @@ DiskFileManagerImpl::GetRemoteTextLogPath(const std::string& file_name,
bool
DiskFileManagerImpl::AddFile(const std::string& file) noexcept {
auto local_chunk_manager =
LocalChunkManagerSingleton::GetInstance().GetChunkManager();
LocalChunkManagerFactory::GetInstance().GetChunkManager();
FILEMANAGER_TRY
if (!local_chunk_manager->Exist(file)) {
LOG_ERROR("local file {} not exists", file);
Expand Down Expand Up @@ -134,7 +139,7 @@ DiskFileManagerImpl::AddFile(const std::string& file) noexcept {
bool
DiskFileManagerImpl::AddTextLog(const std::string& file) noexcept {
auto local_chunk_manager =
LocalChunkManagerSingleton::GetInstance().GetChunkManager();
LocalChunkManagerFactory::GetInstance().GetChunkManager();
FILEMANAGER_TRY
if (!local_chunk_manager->Exist(file)) {
LOG_ERROR("local file {} not exists", file);
Expand Down Expand Up @@ -190,7 +195,7 @@ DiskFileManagerImpl::AddBatchIndexFiles(
const std::vector<std::string>& remote_files,
const std::vector<int64_t>& remote_file_sizes) {
auto local_chunk_manager =
LocalChunkManagerSingleton::GetInstance().GetChunkManager();
LocalChunkManagerFactory::GetInstance().GetChunkManager();
auto& pool = ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::HIGH);

std::vector<std::future<std::shared_ptr<uint8_t[]>>> futures;
Expand Down Expand Up @@ -239,7 +244,7 @@ void
DiskFileManagerImpl::CacheIndexToDisk(
const std::vector<std::string>& remote_files) {
auto local_chunk_manager =
LocalChunkManagerSingleton::GetInstance().GetChunkManager();
LocalChunkManagerFactory::GetInstance().GetChunkManager();

std::map<std::string, std::vector<int>> index_slices;
for (auto& file_path : remote_files) {
Expand Down Expand Up @@ -307,7 +312,7 @@ void
DiskFileManagerImpl::CacheTextLogToDisk(
const std::vector<std::string>& remote_files) {
auto local_chunk_manager =
LocalChunkManagerSingleton::GetInstance().GetChunkManager();
LocalChunkManagerFactory::GetInstance().GetChunkManager();

std::map<std::string, std::vector<int>> index_slices;
for (auto& file_path : remote_files) {
Expand Down Expand Up @@ -376,7 +381,8 @@ DiskFileManagerImpl::CacheRawDataToDisk(std::vector<std::string> remote_files) {
auto field_id = GetFieldDataMeta().field_id;

auto local_chunk_manager =
LocalChunkManagerSingleton::GetInstance().GetChunkManager();
LocalChunkManagerFactory::GetInstance().GetChunkManager(
milvus::Role::IndexNode);
std::string local_data_path;
bool file_created = false;

Expand Down Expand Up @@ -619,7 +625,8 @@ DiskFileManagerImpl::CacheOptFieldToDisk(OptFieldT& fields_map) {
auto segment_id = GetFieldDataMeta().segment_id;
auto vec_field_id = GetFieldDataMeta().field_id;
auto local_chunk_manager =
LocalChunkManagerSingleton::GetInstance().GetChunkManager();
LocalChunkManagerFactory::GetInstance().GetChunkManager(
milvus::Role::IndexNode);
auto local_data_path = storage::GenFieldRawDataPathPrefix(
local_chunk_manager, segment_id, vec_field_id) +
std::string(VEC_OPT_FIELDS);
Expand Down Expand Up @@ -696,15 +703,17 @@ DiskFileManagerImpl::GetFileName(const std::string& localfile) {
std::string
DiskFileManagerImpl::GetLocalIndexObjectPrefix() {
auto local_chunk_manager =
LocalChunkManagerSingleton::GetInstance().GetChunkManager();
LocalChunkManagerFactory::GetInstance().GetChunkManager(
milvus::Role::QueryNode);
return GenIndexPathPrefix(
local_chunk_manager, index_meta_.build_id, index_meta_.index_version);
}

std::string
DiskFileManagerImpl::GetLocalTextIndexPrefix() {
auto local_chunk_manager =
LocalChunkManagerSingleton::GetInstance().GetChunkManager();
LocalChunkManagerFactory::GetInstance().GetChunkManager(
milvus::Role::QueryNode);
return GenTextIndexPathPrefix(local_chunk_manager,
index_meta_.build_id,
index_meta_.index_version,
Expand All @@ -729,7 +738,8 @@ DiskFileManagerImpl::GetTextIndexIdentifier() {
std::string
DiskFileManagerImpl::GetLocalRawDataObjectPrefix() {
auto local_chunk_manager =
LocalChunkManagerSingleton::GetInstance().GetChunkManager();
LocalChunkManagerFactory::GetInstance().GetChunkManager(
milvus::Role::IndexNode);
return GenFieldRawDataPathPrefix(
local_chunk_manager, field_meta_.segment_id, field_meta_.field_id);
}
Expand All @@ -744,7 +754,7 @@ std::optional<bool>
DiskFileManagerImpl::IsExisted(const std::string& file) noexcept {
bool isExist = false;
auto local_chunk_manager =
LocalChunkManagerSingleton::GetInstance().GetChunkManager();
LocalChunkManagerFactory::GetInstance().GetChunkManager();
try {
isExist = local_chunk_manager->Exist(file);
} catch (std::exception& e) {
Expand Down
3 changes: 3 additions & 0 deletions internal/core/src/storage/DiskFileManagerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ class DiskFileManagerImpl : public FileManagerImpl {

// remote file path
std::map<std::string, int64_t> remote_paths_to_size_;

// indicate whether disk file manager is used for building index or load index
bool for_loading_index_;
};

using DiskANNFileManagerImplPtr = std::shared_ptr<DiskFileManagerImpl>;
Expand Down
51 changes: 33 additions & 18 deletions internal/core/src/storage/LocalChunkManagerSingleton.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,41 +20,56 @@
#include <mutex>
#include <shared_mutex>

#include "common/Common.h"
#include "storage/ChunkManager.h"
#include "storage/LocalChunkManager.h"

namespace milvus::storage {

class LocalChunkManagerSingleton {
private:
LocalChunkManagerSingleton() {
}

class LocalChunkManagerFactory {
public:
LocalChunkManagerSingleton(const LocalChunkManagerSingleton&) = delete;
LocalChunkManagerSingleton&
operator=(const LocalChunkManagerSingleton&) = delete;

static LocalChunkManagerSingleton&
static LocalChunkManagerFactory&
GetInstance() {
static LocalChunkManagerSingleton instance;
static LocalChunkManagerFactory instance;
return instance;
}

void
Init(std::string root_path) {
if (lcm_ == nullptr) {
lcm_ = std::make_shared<LocalChunkManager>(root_path);
AddChunkManager(Role role, std::string root_path) {
std::unique_lock<std::shared_mutex> lock(mutex_);
if (chunk_managers_.find(role) != chunk_managers_.end()) {
PanicInfo(UnexpectedError,
"chunk manager for role {} already exists",
ToString(role));
}
chunk_managers_[role] = std::make_shared<LocalChunkManager>(root_path);
}

LocalChunkManagerSPtr
GetChunkManager(Role role) const {
std::shared_lock<std::shared_mutex> lock(mutex_);
auto it = chunk_managers_.find(role);
if (it == chunk_managers_.end()) {
PanicInfo(UnexpectedError,
"local chunk manager for role:{} not found",
ToString(role));
}
return it->second;
}

// some situations not need to specify the role
// just randomly choose one chunk manager
// because local chunk manager no need root_path
// and interface use abs paths params
LocalChunkManagerSPtr
GetChunkManager() {
return lcm_;
GetChunkManager() const {
std::shared_lock<std::shared_mutex> lock(mutex_);
Assert(chunk_managers_.size() != 0);
return chunk_managers_.begin()->second;
}

private:
LocalChunkManagerSPtr lcm_ = nullptr;
mutable std::shared_mutex mutex_;
std::unordered_map<Role, LocalChunkManagerSPtr> chunk_managers_;
};

} // namespace milvus::storage
4 changes: 2 additions & 2 deletions internal/core/src/storage/MmapChunkManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ MmapChunkManager::~MmapChunkManager() {
}
// clean the mmap dir
auto cm =
storage::LocalChunkManagerSingleton::GetInstance().GetChunkManager();
storage::LocalChunkManagerFactory::GetInstance().GetChunkManager();
if (cm->Exist(mmap_file_prefix_)) {
cm->RemoveDir(mmap_file_prefix_);
}
Expand Down Expand Up @@ -310,7 +310,7 @@ MmapChunkManager::MmapChunkManager(std::string root_path,
std::make_unique<MmapBlocksHandler>(disk_limit, file_size, root_path);
mmap_file_prefix_ = root_path;
auto cm =
storage::LocalChunkManagerSingleton::GetInstance().GetChunkManager();
storage::LocalChunkManagerFactory::GetInstance().GetChunkManager();
AssertInfo(cm != nullptr,
"Fail to get LocalChunkManager, LocalChunkManagerPtr is null");
if (cm->Exist(root_path)) {
Expand Down
9 changes: 6 additions & 3 deletions internal/core/src/storage/storage_c.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// limitations under the License.

#include "storage/storage_c.h"
#include "common/Common.h"
#include "monitor/prometheus_client.h"
#include "storage/RemoteChunkManagerSingleton.h"
#include "storage/LocalChunkManagerSingleton.h"
Expand All @@ -24,7 +25,7 @@ CStatus
GetLocalUsedSize(const char* c_dir, int64_t* size) {
try {
auto local_chunk_manager =
milvus::storage::LocalChunkManagerSingleton::GetInstance()
milvus::storage::LocalChunkManagerFactory::GetInstance()
.GetChunkManager();
std::string dir(c_dir);
if (local_chunk_manager->DirExist(dir)) {
Expand All @@ -39,10 +40,12 @@ GetLocalUsedSize(const char* c_dir, int64_t* size) {
}

CStatus
InitLocalChunkManagerSingleton(const char* c_path) {
InitLocalChunkManager(const char* c_role, const char* c_path) {
try {
std::string role(c_role);
std::string path(c_path);
milvus::storage::LocalChunkManagerSingleton::GetInstance().Init(path);
milvus::storage::LocalChunkManagerFactory::GetInstance()
.AddChunkManager(milvus::FromString(role), path);

return milvus::SuccessCStatus();
} catch (std::exception& e) {
Expand Down
2 changes: 1 addition & 1 deletion internal/core/src/storage/storage_c.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ CStatus
GetLocalUsedSize(const char* c_path, int64_t* size);

CStatus
InitLocalChunkManagerSingleton(const char* path);
InitLocalChunkManager(const char* c_role, const char* path);

CStatus
InitRemoteChunkManagerSingleton(CStorageConfig c_storage_config);
Expand Down
6 changes: 4 additions & 2 deletions internal/core/unittest/init_gtest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
folly::Init follyInit(&argc, &argv, false);

milvus::storage::LocalChunkManagerSingleton::GetInstance().Init(
TestLocalPath);
milvus::storage::LocalChunkManagerFactory::GetInstance().AddChunkManager(
milvus::Role::QueryNode, TestLocalPath);
milvus::storage::LocalChunkManagerFactory::GetInstance().AddChunkManager(
milvus::Role::IndexNode, TestLocalPath);
milvus::storage::RemoteChunkManagerSingleton::GetInstance().Init(
get_default_local_storage_config());
milvus::storage::MmapManager::GetInstance().Init(get_default_mmap_config());
Expand Down
3 changes: 2 additions & 1 deletion internal/core/unittest/test_chunk_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ class ChunkCacheTest
sparse_metric_type,
false);

lcm = milvus::storage::LocalChunkManagerSingleton::GetInstance()
lcm = milvus::storage::LocalChunkManagerFactory()
.GetInstance()
.GetChunkManager();
dense_data = dataset.get_col<float>(fake_dense_vec_id);
sparse_data = dataset.get_col<knowhere::sparse::SparseRow<float>>(
Expand Down
Loading
Loading