Skip to content

Commit

Permalink
fix: fix directory confusion between indexnode and querynode
Browse files Browse the repository at this point in the history
Signed-off-by: luzhang <[email protected]>
  • Loading branch information
luzhang committed Dec 25, 2024
1 parent 3d360c0 commit ac55af4
Show file tree
Hide file tree
Showing 22 changed files with 139 additions and 71 deletions.
26 changes: 26 additions & 0 deletions internal/core/src/common/Common.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,32 @@ extern int64_t LOW_PRIORITY_THREAD_CORE_COEFFICIENT;
extern int CPU_NUM;
extern int64_t EXEC_EVAL_EXPR_BATCH_SIZE;

enum class Role { QueryNode, IndexNode };

const std::unordered_map<Role, std::string> RoleToStringMap = {
{Role::QueryNode, "querynode"}, {Role::IndexNode, "indexnode"}};

// convert role to string
inline std::string
ToString(Role role) {
auto it = RoleToStringMap.find(role);
if (it != RoleToStringMap.end()) {
return it->second;

Check warning on line 43 in internal/core/src/common/Common.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/common/Common.h#L40-L43

Added lines #L40 - L43 were not covered by tests
}
PanicInfo(UnexpectedError, "role {} not found", int(role));

Check warning on line 45 in internal/core/src/common/Common.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/common/Common.h#L45

Added line #L45 was not covered by tests
}

// convert string to role
inline Role
FromString(const std::string& role_str) {
for (const auto& pair : RoleToStringMap) {
if (pair.second == role_str) {
return pair.first;

Check warning on line 53 in internal/core/src/common/Common.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/common/Common.h#L50-L53

Added lines #L50 - L53 were not covered by tests
}
}
PanicInfo(UnexpectedError, "role {} not found", role_str);

Check warning on line 56 in internal/core/src/common/Common.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/common/Common.h#L56

Added line #L56 was not covered by tests
}

void
SetIndexSliceSize(const int64_t size);

Expand Down
2 changes: 1 addition & 1 deletion internal/core/src/index/InvertedIndexTantivy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ InvertedIndexTantivy<T>::InvertedIndexTantivy(
template <typename T>
InvertedIndexTantivy<T>::~InvertedIndexTantivy() {
auto local_chunk_manager =
storage::LocalChunkManagerSingleton::GetInstance().GetChunkManager();
storage::LocalChunkManagerFactory::GetInstance().GetChunkManager();
auto prefix = path_;
LOG_INFO("inverted index remove path:{}", path_);
local_chunk_manager->RemoveDir(prefix);
Expand Down
10 changes: 6 additions & 4 deletions internal/core/src/index/VectorDiskIndex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ VectorDiskAnnIndex<T>::VectorDiskAnnIndex(
std::make_shared<storage::DiskFileManagerImpl>(file_manager_context);
AssertInfo(file_manager_ != nullptr, "create file manager failed!");
auto local_chunk_manager =
storage::LocalChunkManagerSingleton::GetInstance().GetChunkManager();
storage::LocalChunkManagerFactory::GetInstance().GetChunkManager();
auto local_index_path_prefix = file_manager_->GetLocalIndexObjectPrefix();

// As we have guarded dup-load in QueryNode,
Expand Down Expand Up @@ -135,7 +135,8 @@ template <typename T>
void
VectorDiskAnnIndex<T>::Build(const Config& config) {
auto local_chunk_manager =
storage::LocalChunkManagerSingleton::GetInstance().GetChunkManager();
storage::LocalChunkManagerFactory::GetInstance().GetChunkManager(

Check warning on line 138 in internal/core/src/index/VectorDiskIndex.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/index/VectorDiskIndex.cpp#L138

Added line #L138 was not covered by tests
milvus::Role::IndexNode);
knowhere::Json build_config;
build_config.update(config);

Expand Down Expand Up @@ -185,7 +186,8 @@ void
VectorDiskAnnIndex<T>::BuildWithDataset(const DatasetPtr& dataset,
const Config& config) {
auto local_chunk_manager =
storage::LocalChunkManagerSingleton::GetInstance().GetChunkManager();
storage::LocalChunkManagerFactory::GetInstance().GetChunkManager(
milvus::Role::IndexNode);
knowhere::Json build_config;
build_config.update(config);
// set data path
Expand Down Expand Up @@ -376,7 +378,7 @@ template <typename T>
void
VectorDiskAnnIndex<T>::CleanLocalData() {
auto local_chunk_manager =
storage::LocalChunkManagerSingleton::GetInstance().GetChunkManager();
storage::LocalChunkManagerFactory::GetInstance().GetChunkManager();

Check warning on line 381 in internal/core/src/index/VectorDiskIndex.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/index/VectorDiskIndex.cpp#L381

Added line #L381 was not covered by tests
local_chunk_manager->RemoveDir(file_manager_->GetLocalIndexObjectPrefix());
local_chunk_manager->RemoveDir(
file_manager_->GetLocalRawDataObjectPrefix());
Expand Down
5 changes: 3 additions & 2 deletions internal/core/src/segcore/load_index_c.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "common/EasyAssert.h"
#include "common/Types.h"
#include "common/type_c.h"
#include "common/Common.h"
#include "index/Index.h"
#include "index/IndexFactory.h"
#include "index/Meta.h"
Expand Down Expand Up @@ -424,8 +425,8 @@ CleanLoadedIndex(CLoadIndexInfo c_load_index_info) {
auto load_index_info =
(milvus::segcore::LoadIndexInfo*)c_load_index_info;
auto local_chunk_manager =
milvus::storage::LocalChunkManagerSingleton::GetInstance()
.GetChunkManager();
milvus::storage::LocalChunkManagerFactory::GetInstance()
.GetChunkManager(milvus::Role::QueryNode);

Check warning on line 429 in internal/core/src/segcore/load_index_c.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/load_index_c.cpp#L428-L429

Added lines #L428 - L429 were not covered by tests
auto index_file_path_prefix =
milvus::storage::GenIndexPathPrefix(local_chunk_manager,
load_index_info->index_build_id,
Expand Down
36 changes: 23 additions & 13 deletions internal/core/src/storage/DiskFileManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,18 @@ namespace milvus::storage {
DiskFileManagerImpl::DiskFileManagerImpl(
const FileManagerContext& fileManagerContext)
: FileManagerImpl(fileManagerContext.fieldDataMeta,
fileManagerContext.indexMeta) {
fileManagerContext.indexMeta),
for_loading_index_(fileManagerContext.for_loading_index) {
rcm_ = fileManagerContext.chunkManagerPtr;
}

DiskFileManagerImpl::~DiskFileManagerImpl() {
auto local_chunk_manager =
LocalChunkManagerSingleton::GetInstance().GetChunkManager();
for_loading_index_
? LocalChunkManagerFactory::GetInstance().GetChunkManager(
milvus::Role::QueryNode)
: LocalChunkManagerFactory::GetInstance().GetChunkManager(
milvus::Role::IndexNode);
local_chunk_manager->RemoveDir(GetIndexPathPrefixWithBuildID(
local_chunk_manager, index_meta_.build_id));
}
Expand Down Expand Up @@ -82,7 +87,7 @@ DiskFileManagerImpl::GetRemoteTextLogPath(const std::string& file_name,
bool
DiskFileManagerImpl::AddFile(const std::string& file) noexcept {
auto local_chunk_manager =
LocalChunkManagerSingleton::GetInstance().GetChunkManager();
LocalChunkManagerFactory::GetInstance().GetChunkManager();
FILEMANAGER_TRY
if (!local_chunk_manager->Exist(file)) {
LOG_ERROR("local file {} not exists", file);
Expand Down Expand Up @@ -134,7 +139,7 @@ DiskFileManagerImpl::AddFile(const std::string& file) noexcept {
bool
DiskFileManagerImpl::AddTextLog(const std::string& file) noexcept {
auto local_chunk_manager =
LocalChunkManagerSingleton::GetInstance().GetChunkManager();
LocalChunkManagerFactory::GetInstance().GetChunkManager();

Check warning on line 142 in internal/core/src/storage/DiskFileManagerImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/storage/DiskFileManagerImpl.cpp#L142

Added line #L142 was not covered by tests
FILEMANAGER_TRY
if (!local_chunk_manager->Exist(file)) {
LOG_ERROR("local file {} not exists", file);
Expand Down Expand Up @@ -190,7 +195,7 @@ DiskFileManagerImpl::AddBatchIndexFiles(
const std::vector<std::string>& remote_files,
const std::vector<int64_t>& remote_file_sizes) {
auto local_chunk_manager =
LocalChunkManagerSingleton::GetInstance().GetChunkManager();
LocalChunkManagerFactory::GetInstance().GetChunkManager();
auto& pool = ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::HIGH);

std::vector<std::future<std::shared_ptr<uint8_t[]>>> futures;
Expand Down Expand Up @@ -239,7 +244,7 @@ void
DiskFileManagerImpl::CacheIndexToDisk(
const std::vector<std::string>& remote_files) {
auto local_chunk_manager =
LocalChunkManagerSingleton::GetInstance().GetChunkManager();
LocalChunkManagerFactory::GetInstance().GetChunkManager();

std::map<std::string, std::vector<int>> index_slices;
for (auto& file_path : remote_files) {
Expand Down Expand Up @@ -307,7 +312,7 @@ void
DiskFileManagerImpl::CacheTextLogToDisk(
const std::vector<std::string>& remote_files) {
auto local_chunk_manager =
LocalChunkManagerSingleton::GetInstance().GetChunkManager();
LocalChunkManagerFactory::GetInstance().GetChunkManager();

Check warning on line 315 in internal/core/src/storage/DiskFileManagerImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/storage/DiskFileManagerImpl.cpp#L315

Added line #L315 was not covered by tests

std::map<std::string, std::vector<int>> index_slices;
for (auto& file_path : remote_files) {
Expand Down Expand Up @@ -376,7 +381,8 @@ DiskFileManagerImpl::CacheRawDataToDisk(std::vector<std::string> remote_files) {
auto field_id = GetFieldDataMeta().field_id;

auto local_chunk_manager =
LocalChunkManagerSingleton::GetInstance().GetChunkManager();
LocalChunkManagerFactory::GetInstance().GetChunkManager(

Check warning on line 384 in internal/core/src/storage/DiskFileManagerImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/storage/DiskFileManagerImpl.cpp#L384

Added line #L384 was not covered by tests
milvus::Role::IndexNode);
std::string local_data_path;
bool file_created = false;

Expand Down Expand Up @@ -619,7 +625,8 @@ DiskFileManagerImpl::CacheOptFieldToDisk(OptFieldT& fields_map) {
auto segment_id = GetFieldDataMeta().segment_id;
auto vec_field_id = GetFieldDataMeta().field_id;
auto local_chunk_manager =
LocalChunkManagerSingleton::GetInstance().GetChunkManager();
LocalChunkManagerFactory::GetInstance().GetChunkManager(
milvus::Role::IndexNode);
auto local_data_path = storage::GenFieldRawDataPathPrefix(
local_chunk_manager, segment_id, vec_field_id) +
std::string(VEC_OPT_FIELDS);
Expand Down Expand Up @@ -696,15 +703,17 @@ DiskFileManagerImpl::GetFileName(const std::string& localfile) {
std::string
DiskFileManagerImpl::GetLocalIndexObjectPrefix() {
auto local_chunk_manager =
LocalChunkManagerSingleton::GetInstance().GetChunkManager();
LocalChunkManagerFactory::GetInstance().GetChunkManager(
milvus::Role::QueryNode);
return GenIndexPathPrefix(
local_chunk_manager, index_meta_.build_id, index_meta_.index_version);
}

std::string
DiskFileManagerImpl::GetLocalTextIndexPrefix() {
auto local_chunk_manager =
LocalChunkManagerSingleton::GetInstance().GetChunkManager();
LocalChunkManagerFactory::GetInstance().GetChunkManager(
milvus::Role::QueryNode);

Check warning on line 716 in internal/core/src/storage/DiskFileManagerImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/storage/DiskFileManagerImpl.cpp#L715-L716

Added lines #L715 - L716 were not covered by tests
return GenTextIndexPathPrefix(local_chunk_manager,
index_meta_.build_id,
index_meta_.index_version,
Expand All @@ -729,7 +738,8 @@ DiskFileManagerImpl::GetTextIndexIdentifier() {
std::string
DiskFileManagerImpl::GetLocalRawDataObjectPrefix() {
auto local_chunk_manager =
LocalChunkManagerSingleton::GetInstance().GetChunkManager();
LocalChunkManagerFactory::GetInstance().GetChunkManager(
milvus::Role::IndexNode);

Check warning on line 742 in internal/core/src/storage/DiskFileManagerImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/storage/DiskFileManagerImpl.cpp#L741-L742

Added lines #L741 - L742 were not covered by tests
return GenFieldRawDataPathPrefix(
local_chunk_manager, field_meta_.segment_id, field_meta_.field_id);
}
Expand All @@ -744,7 +754,7 @@ std::optional<bool>
DiskFileManagerImpl::IsExisted(const std::string& file) noexcept {
bool isExist = false;
auto local_chunk_manager =
LocalChunkManagerSingleton::GetInstance().GetChunkManager();
LocalChunkManagerFactory::GetInstance().GetChunkManager();
try {
isExist = local_chunk_manager->Exist(file);
} catch (std::exception& e) {
Expand Down
3 changes: 3 additions & 0 deletions internal/core/src/storage/DiskFileManagerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ class DiskFileManagerImpl : public FileManagerImpl {

// remote file path
std::map<std::string, int64_t> remote_paths_to_size_;

// indicate whether disk file manager is used for building index or load index
bool for_loading_index_;
};

using DiskANNFileManagerImplPtr = std::shared_ptr<DiskFileManagerImpl>;
Expand Down
51 changes: 33 additions & 18 deletions internal/core/src/storage/LocalChunkManagerSingleton.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,41 +20,56 @@
#include <mutex>
#include <shared_mutex>

#include "common/Common.h"
#include "storage/ChunkManager.h"
#include "storage/LocalChunkManager.h"

namespace milvus::storage {

class LocalChunkManagerSingleton {
private:
LocalChunkManagerSingleton() {
}

class LocalChunkManagerFactory {
public:
LocalChunkManagerSingleton(const LocalChunkManagerSingleton&) = delete;
LocalChunkManagerSingleton&
operator=(const LocalChunkManagerSingleton&) = delete;

static LocalChunkManagerSingleton&
static LocalChunkManagerFactory&
GetInstance() {
static LocalChunkManagerSingleton instance;
static LocalChunkManagerFactory instance;
return instance;
}

void
Init(std::string root_path) {
if (lcm_ == nullptr) {
lcm_ = std::make_shared<LocalChunkManager>(root_path);
AddChunkManager(Role role, std::string root_path) {
std::unique_lock<std::shared_mutex> lock(mutex_);
if (chunk_managers_.find(role) != chunk_managers_.end()) {
PanicInfo(UnexpectedError,

Check warning on line 41 in internal/core/src/storage/LocalChunkManagerSingleton.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/storage/LocalChunkManagerSingleton.h#L41

Added line #L41 was not covered by tests
"chunk manager for role {} already exists",
ToString(role));
}
chunk_managers_[role] = std::make_shared<LocalChunkManager>(root_path);
}

LocalChunkManagerSPtr
GetChunkManager(Role role) const {
std::shared_lock<std::shared_mutex> lock(mutex_);
auto it = chunk_managers_.find(role);
if (it == chunk_managers_.end()) {
PanicInfo(UnexpectedError,

Check warning on line 53 in internal/core/src/storage/LocalChunkManagerSingleton.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/storage/LocalChunkManagerSingleton.h#L53

Added line #L53 was not covered by tests
"local chunk manager for role:{} not found",
ToString(role));
}
return it->second;
}

// some situations not need to specify the role
// just randomly choose one chunk manager
// because local chunk manager no need root_path
// and interface use abs paths params
LocalChunkManagerSPtr
GetChunkManager() {
return lcm_;
GetChunkManager() const {
std::shared_lock<std::shared_mutex> lock(mutex_);
Assert(chunk_managers_.size() != 0);
return chunk_managers_.begin()->second;
}

private:
LocalChunkManagerSPtr lcm_ = nullptr;
mutable std::shared_mutex mutex_;
std::unordered_map<Role, LocalChunkManagerSPtr> chunk_managers_;
};

} // namespace milvus::storage
4 changes: 2 additions & 2 deletions internal/core/src/storage/MmapChunkManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ MmapChunkManager::~MmapChunkManager() {
}
// clean the mmap dir
auto cm =
storage::LocalChunkManagerSingleton::GetInstance().GetChunkManager();
storage::LocalChunkManagerFactory::GetInstance().GetChunkManager();
if (cm->Exist(mmap_file_prefix_)) {
cm->RemoveDir(mmap_file_prefix_);
}
Expand Down Expand Up @@ -310,7 +310,7 @@ MmapChunkManager::MmapChunkManager(std::string root_path,
std::make_unique<MmapBlocksHandler>(disk_limit, file_size, root_path);
mmap_file_prefix_ = root_path;
auto cm =
storage::LocalChunkManagerSingleton::GetInstance().GetChunkManager();
storage::LocalChunkManagerFactory::GetInstance().GetChunkManager();
AssertInfo(cm != nullptr,
"Fail to get LocalChunkManager, LocalChunkManagerPtr is null");
if (cm->Exist(root_path)) {
Expand Down
9 changes: 6 additions & 3 deletions internal/core/src/storage/storage_c.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// limitations under the License.

#include "storage/storage_c.h"
#include "common/Common.h"
#include "monitor/prometheus_client.h"
#include "storage/RemoteChunkManagerSingleton.h"
#include "storage/LocalChunkManagerSingleton.h"
Expand All @@ -24,7 +25,7 @@ CStatus
GetLocalUsedSize(const char* c_dir, int64_t* size) {
try {
auto local_chunk_manager =
milvus::storage::LocalChunkManagerSingleton::GetInstance()
milvus::storage::LocalChunkManagerFactory::GetInstance()
.GetChunkManager();
std::string dir(c_dir);
if (local_chunk_manager->DirExist(dir)) {
Expand All @@ -39,10 +40,12 @@ GetLocalUsedSize(const char* c_dir, int64_t* size) {
}

CStatus
InitLocalChunkManagerSingleton(const char* c_path) {
InitLocalChunkManager(const char* c_role, const char* c_path) {

Check warning on line 43 in internal/core/src/storage/storage_c.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/storage/storage_c.cpp#L43

Added line #L43 was not covered by tests
try {
std::string role(c_role);

Check warning on line 45 in internal/core/src/storage/storage_c.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/storage/storage_c.cpp#L45

Added line #L45 was not covered by tests
std::string path(c_path);
milvus::storage::LocalChunkManagerSingleton::GetInstance().Init(path);
milvus::storage::LocalChunkManagerFactory::GetInstance()
.AddChunkManager(milvus::FromString(role), path);

Check warning on line 48 in internal/core/src/storage/storage_c.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/storage/storage_c.cpp#L47-L48

Added lines #L47 - L48 were not covered by tests

return milvus::SuccessCStatus();
} catch (std::exception& e) {
Expand Down
2 changes: 1 addition & 1 deletion internal/core/src/storage/storage_c.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ CStatus
GetLocalUsedSize(const char* c_path, int64_t* size);

CStatus
InitLocalChunkManagerSingleton(const char* path);
InitLocalChunkManager(const char* c_role, const char* path);

CStatus
InitRemoteChunkManagerSingleton(CStorageConfig c_storage_config);
Expand Down
6 changes: 4 additions & 2 deletions internal/core/unittest/init_gtest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
folly::Init follyInit(&argc, &argv, false);

milvus::storage::LocalChunkManagerSingleton::GetInstance().Init(
TestLocalPath);
milvus::storage::LocalChunkManagerFactory::GetInstance().AddChunkManager(
milvus::Role::QueryNode, TestLocalPath);
milvus::storage::LocalChunkManagerFactory::GetInstance().AddChunkManager(
milvus::Role::IndexNode, TestLocalPath);
milvus::storage::RemoteChunkManagerSingleton::GetInstance().Init(
get_default_local_storage_config());
milvus::storage::MmapManager::GetInstance().Init(get_default_mmap_config());
Expand Down
3 changes: 2 additions & 1 deletion internal/core/unittest/test_chunk_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ class ChunkCacheTest
sparse_metric_type,
false);

lcm = milvus::storage::LocalChunkManagerSingleton::GetInstance()
lcm = milvus::storage::LocalChunkManagerFactory()
.GetInstance()
.GetChunkManager();
dense_data = dataset.get_col<float>(fake_dense_vec_id);
sparse_data = dataset.get_col<knowhere::sparse::SparseRow<float>>(
Expand Down
Loading

0 comments on commit ac55af4

Please sign in to comment.