Skip to content

Commit

Permalink
fix: fix directory confusion between indexnode and querynode
Browse files Browse the repository at this point in the history
Signed-off-by: luzhang <[email protected]>
  • Loading branch information
luzhang committed Dec 20, 2024
1 parent 3d360c0 commit 89b71db
Show file tree
Hide file tree
Showing 15 changed files with 122 additions and 52 deletions.
26 changes: 26 additions & 0 deletions internal/core/src/common/Common.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,32 @@ extern int64_t LOW_PRIORITY_THREAD_CORE_COEFFICIENT;
extern int CPU_NUM;
extern int64_t EXEC_EVAL_EXPR_BATCH_SIZE;

enum class Role { QueryNode, IndexNode };

const std::unordered_map<Role, std::string> RoleToStringMap = {
{Role::QueryNode, "querynode"}, {Role::IndexNode, "indexnode"}};

// convert role to string
inline std::string
ToString(Role role) {
auto it = RoleToStringMap.find(role);
if (it != RoleToStringMap.end()) {
return it->second;
}
PanicInfo(UnexpectedError, "role {} not found", int(role));
}

// convert string to role
inline Role
FromString(const std::string& role_str) {
for (const auto& pair : RoleToStringMap) {
if (pair.second == role_str) {
return pair.first;
}
}
PanicInfo(UnexpectedError, "role {} not found", role_str);
}

void
SetIndexSliceSize(const int64_t size);

Expand Down
10 changes: 6 additions & 4 deletions internal/core/src/index/VectorDiskIndex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ VectorDiskAnnIndex<T>::VectorDiskAnnIndex(
std::make_shared<storage::DiskFileManagerImpl>(file_manager_context);
AssertInfo(file_manager_ != nullptr, "create file manager failed!");
auto local_chunk_manager =
storage::LocalChunkManagerSingleton::GetInstance().GetChunkManager();
storage::LocalChunkManagerFactory::GetInstance().GetChunkManager();
auto local_index_path_prefix = file_manager_->GetLocalIndexObjectPrefix();

// As we have guarded dup-load in QueryNode,
Expand Down Expand Up @@ -135,7 +135,8 @@ template <typename T>
void
VectorDiskAnnIndex<T>::Build(const Config& config) {
auto local_chunk_manager =
storage::LocalChunkManagerSingleton::GetInstance().GetChunkManager();
storage::LocalChunkManagerFactory::GetInstance().GetChunkManager(
Role::IndexNode);
knowhere::Json build_config;
build_config.update(config);

Expand Down Expand Up @@ -185,7 +186,8 @@ void
VectorDiskAnnIndex<T>::BuildWithDataset(const DatasetPtr& dataset,
const Config& config) {
auto local_chunk_manager =
storage::LocalChunkManagerSingleton::GetInstance().GetChunkManager();
storage::LocalChunkManagerFactory::GetInstance().GetChunkManager(
Role::IndexNode);
knowhere::Json build_config;
build_config.update(config);
// set data path
Expand Down Expand Up @@ -376,7 +378,7 @@ template <typename T>
void
VectorDiskAnnIndex<T>::CleanLocalData() {
auto local_chunk_manager =
storage::LocalChunkManagerSingleton::GetInstance().GetChunkManager();
storage::LocalChunkManagerFactory::GetInstance().GetChunkManager();
local_chunk_manager->RemoveDir(file_manager_->GetLocalIndexObjectPrefix());
local_chunk_manager->RemoveDir(
file_manager_->GetLocalRawDataObjectPrefix());
Expand Down
5 changes: 3 additions & 2 deletions internal/core/src/segcore/load_index_c.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "common/EasyAssert.h"
#include "common/Types.h"
#include "common/type_c.h"
#include "common/Common.h"
#include "index/Index.h"
#include "index/IndexFactory.h"
#include "index/Meta.h"
Expand Down Expand Up @@ -424,8 +425,8 @@ CleanLoadedIndex(CLoadIndexInfo c_load_index_info) {
auto load_index_info =
(milvus::segcore::LoadIndexInfo*)c_load_index_info;
auto local_chunk_manager =
milvus::storage::LocalChunkManagerSingleton::GetInstance()
.GetChunkManager();
milvus::storage::LocalChunkManagerFactory::GetInstance()
.GetChunkManager(Role::QueryNode);
auto index_file_path_prefix =
milvus::storage::GenIndexPathPrefix(local_chunk_manager,
load_index_info->index_build_id,
Expand Down
36 changes: 23 additions & 13 deletions internal/core/src/storage/DiskFileManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,18 @@ namespace milvus::storage {
DiskFileManagerImpl::DiskFileManagerImpl(
const FileManagerContext& fileManagerContext)
: FileManagerImpl(fileManagerContext.fieldDataMeta,
fileManagerContext.indexMeta) {
fileManagerContext.indexMeta),
for_loading_index_(fileManagerContext.for_loading_index) {
rcm_ = fileManagerContext.chunkManagerPtr;
}

DiskFileManagerImpl::~DiskFileManagerImpl() {
auto local_chunk_manager =
LocalChunkManagerSingleton::GetInstance().GetChunkManager();
for_loading_index_
? LocalChunkManagerFactory::GetInstance().GetChunkManager(
Role::QueryNode)
: LocalChunkManagerFactory::GetInstance().GetChunkManager(
Role::IndexNode);
local_chunk_manager->RemoveDir(GetIndexPathPrefixWithBuildID(
local_chunk_manager, index_meta_.build_id));
}
Expand Down Expand Up @@ -82,7 +87,7 @@ DiskFileManagerImpl::GetRemoteTextLogPath(const std::string& file_name,
bool
DiskFileManagerImpl::AddFile(const std::string& file) noexcept {
auto local_chunk_manager =
LocalChunkManagerSingleton::GetInstance().GetChunkManager();
LocalChunkManagerFactory::GetInstance().GetChunkManager();
FILEMANAGER_TRY
if (!local_chunk_manager->Exist(file)) {
LOG_ERROR("local file {} not exists", file);
Expand Down Expand Up @@ -134,7 +139,7 @@ DiskFileManagerImpl::AddFile(const std::string& file) noexcept {
bool
DiskFileManagerImpl::AddTextLog(const std::string& file) noexcept {
auto local_chunk_manager =
LocalChunkManagerSingleton::GetInstance().GetChunkManager();
LocalChunkManagerFactory::GetInstance().GetChunkManager();
FILEMANAGER_TRY
if (!local_chunk_manager->Exist(file)) {
LOG_ERROR("local file {} not exists", file);
Expand Down Expand Up @@ -190,7 +195,7 @@ DiskFileManagerImpl::AddBatchIndexFiles(
const std::vector<std::string>& remote_files,
const std::vector<int64_t>& remote_file_sizes) {
auto local_chunk_manager =
LocalChunkManagerSingleton::GetInstance().GetChunkManager();
LocalChunkManagerFactory::GetInstance().GetChunkManager();
auto& pool = ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::HIGH);

std::vector<std::future<std::shared_ptr<uint8_t[]>>> futures;
Expand Down Expand Up @@ -239,7 +244,7 @@ void
DiskFileManagerImpl::CacheIndexToDisk(
const std::vector<std::string>& remote_files) {
auto local_chunk_manager =
LocalChunkManagerSingleton::GetInstance().GetChunkManager();
LocalChunkManagerFactory::GetInstance().GetChunkManager();

std::map<std::string, std::vector<int>> index_slices;
for (auto& file_path : remote_files) {
Expand Down Expand Up @@ -307,7 +312,7 @@ void
DiskFileManagerImpl::CacheTextLogToDisk(
const std::vector<std::string>& remote_files) {
auto local_chunk_manager =
LocalChunkManagerSingleton::GetInstance().GetChunkManager();
LocalChunkManagerFactory::GetInstance().GetChunkManager();

std::map<std::string, std::vector<int>> index_slices;
for (auto& file_path : remote_files) {
Expand Down Expand Up @@ -376,7 +381,8 @@ DiskFileManagerImpl::CacheRawDataToDisk(std::vector<std::string> remote_files) {
auto field_id = GetFieldDataMeta().field_id;

auto local_chunk_manager =
LocalChunkManagerSingleton::GetInstance().GetChunkManager();
LocalChunkManagerFactory::GetInstance().GetChunkManager(
Role::IndexNode);
std::string local_data_path;
bool file_created = false;

Expand Down Expand Up @@ -619,7 +625,8 @@ DiskFileManagerImpl::CacheOptFieldToDisk(OptFieldT& fields_map) {
auto segment_id = GetFieldDataMeta().segment_id;
auto vec_field_id = GetFieldDataMeta().field_id;
auto local_chunk_manager =
LocalChunkManagerSingleton::GetInstance().GetChunkManager();
LocalChunkManagerFactory::GetInstance().GetChunkManager(
Role::IndexNode);
auto local_data_path = storage::GenFieldRawDataPathPrefix(
local_chunk_manager, segment_id, vec_field_id) +
std::string(VEC_OPT_FIELDS);
Expand Down Expand Up @@ -696,15 +703,17 @@ DiskFileManagerImpl::GetFileName(const std::string& localfile) {
std::string
DiskFileManagerImpl::GetLocalIndexObjectPrefix() {
auto local_chunk_manager =
LocalChunkManagerSingleton::GetInstance().GetChunkManager();
LocalChunkManagerFactory::GetInstance().GetChunkManager(
Role::QueryNode);
return GenIndexPathPrefix(
local_chunk_manager, index_meta_.build_id, index_meta_.index_version);
}

std::string
DiskFileManagerImpl::GetLocalTextIndexPrefix() {
auto local_chunk_manager =
LocalChunkManagerSingleton::GetInstance().GetChunkManager();
LocalChunkManagerFactory::GetInstance().GetChunkManager(
Role::QueryNode);
return GenTextIndexPathPrefix(local_chunk_manager,
index_meta_.build_id,
index_meta_.index_version,
Expand All @@ -729,7 +738,8 @@ DiskFileManagerImpl::GetTextIndexIdentifier() {
std::string
DiskFileManagerImpl::GetLocalRawDataObjectPrefix() {
auto local_chunk_manager =
LocalChunkManagerSingleton::GetInstance().GetChunkManager();
LocalChunkManagerFactory::GetInstance().GetChunkManager(
Role::IndexNode);
return GenFieldRawDataPathPrefix(
local_chunk_manager, field_meta_.segment_id, field_meta_.field_id);
}
Expand All @@ -744,7 +754,7 @@ std::optional<bool>
DiskFileManagerImpl::IsExisted(const std::string& file) noexcept {
bool isExist = false;
auto local_chunk_manager =
LocalChunkManagerSingleton::GetInstance().GetChunkManager();
LocalChunkManagerFactory::GetInstance().GetChunkManager();
try {
isExist = local_chunk_manager->Exist(file);
} catch (std::exception& e) {
Expand Down
3 changes: 3 additions & 0 deletions internal/core/src/storage/DiskFileManagerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ class DiskFileManagerImpl : public FileManagerImpl {

// remote file path
std::map<std::string, int64_t> remote_paths_to_size_;

// indicate whether disk file manager is used for building index or load index
bool for_loading_index_;
};

using DiskANNFileManagerImplPtr = std::shared_ptr<DiskFileManagerImpl>;
Expand Down
51 changes: 33 additions & 18 deletions internal/core/src/storage/LocalChunkManagerSingleton.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,41 +20,56 @@
#include <mutex>
#include <shared_mutex>

#include "common/Common.h"
#include "storage/ChunkManager.h"
#include "storage/LocalChunkManager.h"

namespace milvus::storage {

class LocalChunkManagerSingleton {
private:
LocalChunkManagerSingleton() {
}

class LocalChunkManagerFactory {
public:
LocalChunkManagerSingleton(const LocalChunkManagerSingleton&) = delete;
LocalChunkManagerSingleton&
operator=(const LocalChunkManagerSingleton&) = delete;

static LocalChunkManagerSingleton&
static LocalChunkManagerFactory&
GetInstance() {
static LocalChunkManagerSingleton instance;
static LocalChunkManagerFactory instance;
return instance;
}

void
Init(std::string root_path) {
if (lcm_ == nullptr) {
lcm_ = std::make_shared<LocalChunkManager>(root_path);
AddChunkManager(Role role, std::string root_path) {
std::unique_lock<std::shared_mutex> lock(mutex_);
if (chunk_managers_.find(role) != chunk_managers_.end()) {
PanicInfo(UnexpectedError,
"chunk manager for role {} already exists",
ToString(role));
}
chunk_managers_[role] = std::make_shared<LocalChunkManager>(root_path);
}

LocalChunkManagerSPtr
GetChunkManager(Role role) const {
std::shared_lock<std::shared_mutex> lock(mutex_);
auto it = chunk_managers_.find(role);
if (it == chunk_managers_.end()) {
PanicInfo(UnexpectedError,
"local chunk manager for role:{} not found",
ToString(role));
}
return it->second;
}

// some situations not need to specify the role
// just randomly choose one chunk manager
// because local chunk manager no need root_path
// and interface use abs paths params
LocalChunkManagerSPtr
GetChunkManager() {
return lcm_;
GetChunkManager() const {
std::shared_lock<std::shared_mutex> lock(mutex_);
Assert(chunk_managers_.size() != 0);
return chunk_managers_.begin()->second;
}

private:
LocalChunkManagerSPtr lcm_ = nullptr;
mutable std::shared_mutex mutex_;
std::unordered_map<Role, LocalChunkManagerSPtr> chunk_managers_;
};

} // namespace milvus::storage
4 changes: 2 additions & 2 deletions internal/core/src/storage/MmapChunkManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ MmapChunkManager::~MmapChunkManager() {
}
// clean the mmap dir
auto cm =
storage::LocalChunkManagerSingleton::GetInstance().GetChunkManager();
storage::LocalChunkManagerFactory::GetInstance().GetChunkManager();
if (cm->Exist(mmap_file_prefix_)) {
cm->RemoveDir(mmap_file_prefix_);
}
Expand Down Expand Up @@ -310,7 +310,7 @@ MmapChunkManager::MmapChunkManager(std::string root_path,
std::make_unique<MmapBlocksHandler>(disk_limit, file_size, root_path);
mmap_file_prefix_ = root_path;
auto cm =
storage::LocalChunkManagerSingleton::GetInstance().GetChunkManager();
storage::LocalChunkManagerFactory::GetInstance().GetChunkManager();
AssertInfo(cm != nullptr,
"Fail to get LocalChunkManager, LocalChunkManagerPtr is null");
if (cm->Exist(root_path)) {
Expand Down
9 changes: 6 additions & 3 deletions internal/core/src/storage/storage_c.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// limitations under the License.

#include "storage/storage_c.h"
#include "common/Common.h"
#include "monitor/prometheus_client.h"
#include "storage/RemoteChunkManagerSingleton.h"
#include "storage/LocalChunkManagerSingleton.h"
Expand All @@ -24,7 +25,7 @@ CStatus
GetLocalUsedSize(const char* c_dir, int64_t* size) {
try {
auto local_chunk_manager =
milvus::storage::LocalChunkManagerSingleton::GetInstance()
milvus::storage::LocalChunkManagerFactory::GetInstance()
.GetChunkManager();
std::string dir(c_dir);
if (local_chunk_manager->DirExist(dir)) {
Expand All @@ -39,10 +40,12 @@ GetLocalUsedSize(const char* c_dir, int64_t* size) {
}

CStatus
InitLocalChunkManagerSingleton(const char* c_path) {
InitLocalChunkManager(const char* c_role, const char* c_path) {
try {
std::string role(c_role);
std::string path(c_path);
milvus::storage::LocalChunkManagerSingleton::GetInstance().Init(path);
milvus::storage::LocalChunkManagerFactory::GetInstance()
.AddChunkManager(milvus::FromString(role), path);

return milvus::SuccessCStatus();
} catch (std::exception& e) {
Expand Down
2 changes: 1 addition & 1 deletion internal/core/src/storage/storage_c.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ CStatus
GetLocalUsedSize(const char* c_path, int64_t* size);

CStatus
InitLocalChunkManagerSingleton(const char* path);
InitLocalChunkManager(const char* path);

CStatus
InitRemoteChunkManagerSingleton(CStorageConfig c_storage_config);
Expand Down
2 changes: 1 addition & 1 deletion internal/indexnode/indexnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func (i *IndexNode) initSegcore() {
C.SegcoreSetKnowhereBuildThreadPoolNum(cKnowhereThreadPoolSize)

localDataRootPath := filepath.Join(Params.LocalStorageCfg.Path.GetValue(), typeutil.IndexNodeRole)
initcore.InitLocalChunkManager(localDataRootPath)
initcore.InitLocalChunkManager(typeutil.IndexNodeRole, localDataRootPath)
cGpuMemoryPoolInitSize := C.uint32_t(paramtable.Get().GpuConfig.InitSize.GetAsUint32())
cGpuMemoryPoolMaxSize := C.uint32_t(paramtable.Get().GpuConfig.MaxSize.GetAsUint32())
C.SegcoreSetKnowhereGpuMemoryPoolSize(cGpuMemoryPoolInitSize, cGpuMemoryPoolMaxSize)
Expand Down
5 changes: 4 additions & 1 deletion internal/indexnode/task_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package indexnode
import (
"context"
"fmt"
"path/filepath"
"strconv"
"strings"
"time"
Expand All @@ -42,6 +43,7 @@ import (
"github.com/milvus-io/milvus/pkg/util/metautil"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/timerecord"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

// IndexBuildTask is used to record the information of the index tasks.
Expand Down Expand Up @@ -221,7 +223,8 @@ func (it *indexBuildTask) Execute(ctx context.Context) error {
}

// check load size and size of field data
localUsedSize, err := indexcgowrapper.GetLocalUsedSize(paramtable.Get().LocalStorageCfg.Path.GetValue())
localInPath := filepath.Join(paramtable.Get().LocalStorageCfg.Path.GetValue(), typeutil.IndexNodeRole)
localUsedSize, err := indexcgowrapper.GetLocalUsedSize(localInPath)
if err != nil {
log.Warn("IndexNode get local used size failed")
return err
Expand Down
Loading

0 comments on commit 89b71db

Please sign in to comment.