diff --git a/internal/core/src/common/Common.h b/internal/core/src/common/Common.h index 49fcbcb7c8592..7be8ccf8543f3 100644 --- a/internal/core/src/common/Common.h +++ b/internal/core/src/common/Common.h @@ -30,6 +30,32 @@ extern int64_t LOW_PRIORITY_THREAD_CORE_COEFFICIENT; extern int CPU_NUM; extern int64_t EXEC_EVAL_EXPR_BATCH_SIZE; +enum class Role { QueryNode, IndexNode }; + +const std::unordered_map RoleToStringMap = { + {Role::QueryNode, "querynode"}, {Role::IndexNode, "indexnode"}}; + +// convert role to string +inline std::string +ToString(Role role) { + auto it = RoleToStringMap.find(role); + if (it != RoleToStringMap.end()) { + return it->second; + } + PanicInfo(UnexpectedError, "role {} not found", int(role)); +} + +// convert string to role +inline Role +FromString(const std::string& role_str) { + for (const auto& pair : RoleToStringMap) { + if (pair.second == role_str) { + return pair.first; + } + } + PanicInfo(UnexpectedError, "role {} not found", role_str); +} + void SetIndexSliceSize(const int64_t size); diff --git a/internal/core/src/index/VectorDiskIndex.cpp b/internal/core/src/index/VectorDiskIndex.cpp index e6360eb199159..abfebb37d19b6 100644 --- a/internal/core/src/index/VectorDiskIndex.cpp +++ b/internal/core/src/index/VectorDiskIndex.cpp @@ -47,7 +47,7 @@ VectorDiskAnnIndex::VectorDiskAnnIndex( std::make_shared(file_manager_context); AssertInfo(file_manager_ != nullptr, "create file manager failed!"); auto local_chunk_manager = - storage::LocalChunkManagerSingleton::GetInstance().GetChunkManager(); + storage::LocalChunkManagerFactory::GetInstance().GetChunkManager(); auto local_index_path_prefix = file_manager_->GetLocalIndexObjectPrefix(); // As we have guarded dup-load in QueryNode, @@ -135,7 +135,8 @@ template void VectorDiskAnnIndex::Build(const Config& config) { auto local_chunk_manager = - storage::LocalChunkManagerSingleton::GetInstance().GetChunkManager(); + storage::LocalChunkManagerFactory::GetInstance().GetChunkManager( + Role::IndexNode); knowhere::Json build_config; build_config.update(config); @@ -185,7 +186,8 @@ void VectorDiskAnnIndex::BuildWithDataset(const DatasetPtr& dataset, const Config& config) { auto local_chunk_manager = - storage::LocalChunkManagerSingleton::GetInstance().GetChunkManager(); + storage::LocalChunkManagerFactory::GetInstance().GetChunkManager( + Role::IndexNode); knowhere::Json build_config; build_config.update(config); // set data path @@ -376,7 +378,7 @@ template void VectorDiskAnnIndex::CleanLocalData() { auto local_chunk_manager = - storage::LocalChunkManagerSingleton::GetInstance().GetChunkManager(); + storage::LocalChunkManagerFactory::GetInstance().GetChunkManager(); local_chunk_manager->RemoveDir(file_manager_->GetLocalIndexObjectPrefix()); local_chunk_manager->RemoveDir( file_manager_->GetLocalRawDataObjectPrefix()); diff --git a/internal/core/src/segcore/load_index_c.cpp b/internal/core/src/segcore/load_index_c.cpp index 16d7bcdd89344..c75ba0964c236 100644 --- a/internal/core/src/segcore/load_index_c.cpp +++ b/internal/core/src/segcore/load_index_c.cpp @@ -15,6 +15,7 @@ #include "common/EasyAssert.h" #include "common/Types.h" #include "common/type_c.h" +#include "common/Common.h" #include "index/Index.h" #include "index/IndexFactory.h" #include "index/Meta.h" @@ -424,8 +425,8 @@ CleanLoadedIndex(CLoadIndexInfo c_load_index_info) { auto load_index_info = (milvus::segcore::LoadIndexInfo*)c_load_index_info; auto local_chunk_manager = - milvus::storage::LocalChunkManagerSingleton::GetInstance() - .GetChunkManager(); + milvus::storage::LocalChunkManagerFactory::GetInstance() + .GetChunkManager(Role::QueryNode); auto index_file_path_prefix = milvus::storage::GenIndexPathPrefix(local_chunk_manager, load_index_info->index_build_id, diff --git a/internal/core/src/storage/DiskFileManagerImpl.cpp b/internal/core/src/storage/DiskFileManagerImpl.cpp index e54fa6d748825..3d6a631f9423c 100644 --- a/internal/core/src/storage/DiskFileManagerImpl.cpp +++ b/internal/core/src/storage/DiskFileManagerImpl.cpp @@ -48,13 +48,18 @@ namespace milvus::storage { DiskFileManagerImpl::DiskFileManagerImpl( const FileManagerContext& fileManagerContext) : FileManagerImpl(fileManagerContext.fieldDataMeta, - fileManagerContext.indexMeta) { + fileManagerContext.indexMeta), + for_loading_index_(fileManagerContext.for_loading_index) { rcm_ = fileManagerContext.chunkManagerPtr; } DiskFileManagerImpl::~DiskFileManagerImpl() { auto local_chunk_manager = - LocalChunkManagerSingleton::GetInstance().GetChunkManager(); + for_loading_index_ + ? LocalChunkManagerFactory::GetInstance().GetChunkManager( + Role::QueryNode) + : LocalChunkManagerFactory::GetInstance().GetChunkManager( + Role::IndexNode); local_chunk_manager->RemoveDir(GetIndexPathPrefixWithBuildID( local_chunk_manager, index_meta_.build_id)); } @@ -82,7 +87,7 @@ DiskFileManagerImpl::GetRemoteTextLogPath(const std::string& file_name, bool DiskFileManagerImpl::AddFile(const std::string& file) noexcept { auto local_chunk_manager = - LocalChunkManagerSingleton::GetInstance().GetChunkManager(); + LocalChunkManagerFactory::GetInstance().GetChunkManager(); FILEMANAGER_TRY if (!local_chunk_manager->Exist(file)) { LOG_ERROR("local file {} not exists", file); @@ -134,7 +139,7 @@ DiskFileManagerImpl::AddFile(const std::string& file) noexcept { bool DiskFileManagerImpl::AddTextLog(const std::string& file) noexcept { auto local_chunk_manager = - LocalChunkManagerSingleton::GetInstance().GetChunkManager(); + LocalChunkManagerFactory::GetInstance().GetChunkManager(); FILEMANAGER_TRY if (!local_chunk_manager->Exist(file)) { LOG_ERROR("local file {} not exists", file); @@ -190,7 +195,7 @@ DiskFileManagerImpl::AddBatchIndexFiles( const std::vector& remote_files, const std::vector& remote_file_sizes) { auto local_chunk_manager = - LocalChunkManagerSingleton::GetInstance().GetChunkManager(); + LocalChunkManagerFactory::GetInstance().GetChunkManager(); auto& pool = ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::HIGH); std::vector>> futures; @@ -239,7 +244,7 @@ void DiskFileManagerImpl::CacheIndexToDisk( const std::vector& remote_files) { auto local_chunk_manager = - LocalChunkManagerSingleton::GetInstance().GetChunkManager(); + LocalChunkManagerFactory::GetInstance().GetChunkManager(); std::map> index_slices; for (auto& file_path : remote_files) { @@ -307,7 +312,7 @@ void DiskFileManagerImpl::CacheTextLogToDisk( const std::vector& remote_files) { auto local_chunk_manager = - LocalChunkManagerSingleton::GetInstance().GetChunkManager(); + LocalChunkManagerFactory::GetInstance().GetChunkManager(); std::map> index_slices; for (auto& file_path : remote_files) { @@ -376,7 +381,8 @@ DiskFileManagerImpl::CacheRawDataToDisk(std::vector remote_files) { auto field_id = GetFieldDataMeta().field_id; auto local_chunk_manager = - LocalChunkManagerSingleton::GetInstance().GetChunkManager(); + LocalChunkManagerFactory::GetInstance().GetChunkManager( + Role::IndexNode); std::string local_data_path; bool file_created = false; @@ -619,7 +625,8 @@ DiskFileManagerImpl::CacheOptFieldToDisk(OptFieldT& fields_map) { auto segment_id = GetFieldDataMeta().segment_id; auto vec_field_id = GetFieldDataMeta().field_id; auto local_chunk_manager = - LocalChunkManagerSingleton::GetInstance().GetChunkManager(); + LocalChunkManagerFactory::GetInstance().GetChunkManager( + Role::IndexNode); auto local_data_path = storage::GenFieldRawDataPathPrefix( local_chunk_manager, segment_id, vec_field_id) + std::string(VEC_OPT_FIELDS); @@ -696,7 +703,8 @@ DiskFileManagerImpl::GetFileName(const std::string& localfile) { std::string DiskFileManagerImpl::GetLocalIndexObjectPrefix() { auto local_chunk_manager = - LocalChunkManagerSingleton::GetInstance().GetChunkManager(); + LocalChunkManagerFactory::GetInstance().GetChunkManager( + Role::QueryNode); return GenIndexPathPrefix( local_chunk_manager, index_meta_.build_id, index_meta_.index_version); } @@ -704,7 +712,8 @@ DiskFileManagerImpl::GetLocalIndexObjectPrefix() { std::string DiskFileManagerImpl::GetLocalTextIndexPrefix() { auto local_chunk_manager = - LocalChunkManagerSingleton::GetInstance().GetChunkManager(); + LocalChunkManagerFactory::GetInstance().GetChunkManager( + Role::QueryNode); return GenTextIndexPathPrefix(local_chunk_manager, index_meta_.build_id, index_meta_.index_version, @@ -729,7 +738,8 @@ DiskFileManagerImpl::GetTextIndexIdentifier() { std::string DiskFileManagerImpl::GetLocalRawDataObjectPrefix() { auto local_chunk_manager = - LocalChunkManagerSingleton::GetInstance().GetChunkManager(); + LocalChunkManagerFactory::GetInstance().GetChunkManager( + Role::IndexNode); return GenFieldRawDataPathPrefix( local_chunk_manager, field_meta_.segment_id, field_meta_.field_id); } @@ -744,7 +754,7 @@ std::optional DiskFileManagerImpl::IsExisted(const std::string& file) noexcept { bool isExist = false; auto local_chunk_manager = - LocalChunkManagerSingleton::GetInstance().GetChunkManager(); + LocalChunkManagerFactory::GetInstance().GetChunkManager(); try { isExist = local_chunk_manager->Exist(file); } catch (std::exception& e) { diff --git a/internal/core/src/storage/DiskFileManagerImpl.h b/internal/core/src/storage/DiskFileManagerImpl.h index b2c87b1ff78db..35ed387515140 100644 --- a/internal/core/src/storage/DiskFileManagerImpl.h +++ b/internal/core/src/storage/DiskFileManagerImpl.h @@ -131,6 +131,9 @@ class DiskFileManagerImpl : public FileManagerImpl { // remote file path std::map remote_paths_to_size_; + + // indicate whether disk file manager is used for building index or load index + bool for_loading_index_; }; using DiskANNFileManagerImplPtr = std::shared_ptr; diff --git a/internal/core/src/storage/LocalChunkManagerSingleton.h b/internal/core/src/storage/LocalChunkManagerSingleton.h index 2715796d7b771..41959874b5b3a 100644 --- a/internal/core/src/storage/LocalChunkManagerSingleton.h +++ b/internal/core/src/storage/LocalChunkManagerSingleton.h @@ -20,41 +20,56 @@ #include #include +#include "common/Common.h" #include "storage/ChunkManager.h" #include "storage/LocalChunkManager.h" namespace milvus::storage { -class LocalChunkManagerSingleton { - private: - LocalChunkManagerSingleton() { - } - +class LocalChunkManagerFactory { public: - LocalChunkManagerSingleton(const LocalChunkManagerSingleton&) = delete; - LocalChunkManagerSingleton& - operator=(const LocalChunkManagerSingleton&) = delete; - - static LocalChunkManagerSingleton& + static LocalChunkManagerFactory& GetInstance() { - static LocalChunkManagerSingleton instance; + static LocalChunkManagerFactory instance; return instance; } void - Init(std::string root_path) { - if (lcm_ == nullptr) { - lcm_ = std::make_shared(root_path); + AddChunkManager(Role role, std::string root_path) { + std::unique_lock lock(mutex_); + if (chunk_managers_.find(role) != chunk_managers_.end()) { + PanicInfo(UnexpectedError, + "chunk manager for role {} already exists", + ToString(role)); + } + chunk_managers_[role] = std::make_shared(root_path); + } + + LocalChunkManagerSPtr + GetChunkManager(Role role) const { + std::shared_lock lock(mutex_); + auto it = chunk_managers_.find(role); + if (it == chunk_managers_.end()) { + PanicInfo(UnexpectedError, + "local chunk manager for role:{} not found", + ToString(role)); } + return it->second; } + // some situations not need to specify the role + // just randomly choose one chunk manager + // because local chunk manager no need root_path + // and interface use abs paths params LocalChunkManagerSPtr - GetChunkManager() { - return lcm_; + GetChunkManager() const { + std::shared_lock lock(mutex_); + Assert(chunk_managers_.size() != 0); + return chunk_managers_.begin()->second; } - private: - LocalChunkManagerSPtr lcm_ = nullptr; + mutable std::shared_mutex mutex_; + std::unordered_map chunk_managers_; }; } // namespace milvus::storage \ No newline at end of file diff --git a/internal/core/src/storage/MmapChunkManager.cpp b/internal/core/src/storage/MmapChunkManager.cpp index 935e9db2e8260..2bff3c0cc112c 100644 --- a/internal/core/src/storage/MmapChunkManager.cpp +++ b/internal/core/src/storage/MmapChunkManager.cpp @@ -228,7 +228,7 @@ MmapChunkManager::~MmapChunkManager() { } // clean the mmap dir auto cm = - storage::LocalChunkManagerSingleton::GetInstance().GetChunkManager(); + storage::LocalChunkManagerFactory::GetInstance().GetChunkManager(); if (cm->Exist(mmap_file_prefix_)) { cm->RemoveDir(mmap_file_prefix_); } @@ -310,7 +310,7 @@ MmapChunkManager::MmapChunkManager(std::string root_path, std::make_unique(disk_limit, file_size, root_path); mmap_file_prefix_ = root_path; auto cm = - storage::LocalChunkManagerSingleton::GetInstance().GetChunkManager(); + storage::LocalChunkManagerFactory::GetInstance().GetChunkManager(); AssertInfo(cm != nullptr, "Fail to get LocalChunkManager, LocalChunkManagerPtr is null"); if (cm->Exist(root_path)) { diff --git a/internal/core/src/storage/storage_c.cpp b/internal/core/src/storage/storage_c.cpp index d2305b9153bdd..00d278f3bd44f 100644 --- a/internal/core/src/storage/storage_c.cpp +++ b/internal/core/src/storage/storage_c.cpp @@ -15,6 +15,7 @@ // limitations under the License. #include "storage/storage_c.h" +#include "common/Common.h" #include "monitor/prometheus_client.h" #include "storage/RemoteChunkManagerSingleton.h" #include "storage/LocalChunkManagerSingleton.h" @@ -24,7 +25,7 @@ CStatus GetLocalUsedSize(const char* c_dir, int64_t* size) { try { auto local_chunk_manager = - milvus::storage::LocalChunkManagerSingleton::GetInstance() + milvus::storage::LocalChunkManagerFactory::GetInstance() .GetChunkManager(); std::string dir(c_dir); if (local_chunk_manager->DirExist(dir)) { @@ -39,10 +40,12 @@ GetLocalUsedSize(const char* c_dir, int64_t* size) { } CStatus -InitLocalChunkManagerSingleton(const char* c_path) { +InitLocalChunkManager(const char* c_role, const char* c_path) { try { + std::string role(c_role); std::string path(c_path); - milvus::storage::LocalChunkManagerSingleton::GetInstance().Init(path); + milvus::storage::LocalChunkManagerFactory::GetInstance() + .AddChunkManager(milvus::FromString(role), path); return milvus::SuccessCStatus(); } catch (std::exception& e) { diff --git a/internal/core/src/storage/storage_c.h b/internal/core/src/storage/storage_c.h index d6cac3b46fae9..748ad0c4a85ba 100644 --- a/internal/core/src/storage/storage_c.h +++ b/internal/core/src/storage/storage_c.h @@ -25,7 +25,7 @@ CStatus GetLocalUsedSize(const char* c_path, int64_t* size); CStatus -InitLocalChunkManagerSingleton(const char* path); +InitLocalChunkManager(const char* path); CStatus InitRemoteChunkManagerSingleton(CStorageConfig c_storage_config); diff --git a/internal/indexnode/indexnode.go b/internal/indexnode/indexnode.go index 31617d18276ed..b67faa5071a1f 100644 --- a/internal/indexnode/indexnode.go +++ b/internal/indexnode/indexnode.go @@ -186,7 +186,7 @@ func (i *IndexNode) initSegcore() { C.SegcoreSetKnowhereBuildThreadPoolNum(cKnowhereThreadPoolSize) localDataRootPath := filepath.Join(Params.LocalStorageCfg.Path.GetValue(), typeutil.IndexNodeRole) - initcore.InitLocalChunkManager(localDataRootPath) + initcore.InitLocalChunkManager(typeutil.IndexNodeRole, localDataRootPath) cGpuMemoryPoolInitSize := C.uint32_t(paramtable.Get().GpuConfig.InitSize.GetAsUint32()) cGpuMemoryPoolMaxSize := C.uint32_t(paramtable.Get().GpuConfig.MaxSize.GetAsUint32()) C.SegcoreSetKnowhereGpuMemoryPoolSize(cGpuMemoryPoolInitSize, cGpuMemoryPoolMaxSize) diff --git a/internal/indexnode/task_index.go b/internal/indexnode/task_index.go index 87e0f44be8684..a33b2f9e3b7fa 100644 --- a/internal/indexnode/task_index.go +++ b/internal/indexnode/task_index.go @@ -19,6 +19,7 @@ package indexnode import ( "context" "fmt" + "path/filepath" "strconv" "strings" "time" @@ -42,6 +43,7 @@ import ( "github.com/milvus-io/milvus/pkg/util/metautil" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/timerecord" + "github.com/milvus-io/milvus/pkg/util/typeutil" ) // IndexBuildTask is used to record the information of the index tasks. @@ -221,7 +223,8 @@ func (it *indexBuildTask) Execute(ctx context.Context) error { } // check load size and size of field data - localUsedSize, err := indexcgowrapper.GetLocalUsedSize(paramtable.Get().LocalStorageCfg.Path.GetValue()) + localInPath := filepath.Join(paramtable.Get().LocalStorageCfg.Path.GetValue(), typeutil.IndexNodeRole) + localUsedSize, err := indexcgowrapper.GetLocalUsedSize(localInPath) if err != nil { log.Warn("IndexNode get local used size failed") return err diff --git a/internal/querynodev2/segments/segment.go b/internal/querynodev2/segments/segment.go index 41188c1139bd5..85cf31d9b5c82 100644 --- a/internal/querynodev2/segments/segment.go +++ b/internal/querynodev2/segments/segment.go @@ -29,6 +29,7 @@ import "C" import ( "context" "fmt" + "path/filepath" "strings" "time" "unsafe" @@ -1274,7 +1275,8 @@ func (s *LocalSegment) Release(ctx context.Context, opts ...releaseOption) { GetDynamicPool().Submit(func() (any, error) { C.DeleteSegment(ptr) - localDiskUsage, err := segcore.GetLocalUsedSize(context.Background(), paramtable.Get().LocalStorageCfg.Path.GetValue()) + localQnPaths := filepath.Join(paramtable.Get().LocalStorageCfg.Path.GetValue(), typeutil.QueryNodeRole) + localDiskUsage, err := segcore.GetLocalUsedSize(context.Background(), localQnPaths) // ignore error here, shall not block releasing if err == nil { metrics.QueryNodeDiskUsedSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(float64(localDiskUsage) / 1024 / 1024) // in MB diff --git a/internal/querynodev2/segments/segment_loader.go b/internal/querynodev2/segments/segment_loader.go index 6e883ab30781a..d9722c2456acb 100644 --- a/internal/querynodev2/segments/segment_loader.go +++ b/internal/querynodev2/segments/segment_loader.go @@ -28,6 +28,7 @@ import ( "fmt" "io" "path" + "path/filepath" "runtime/debug" "strconv" "sync" @@ -453,7 +454,8 @@ func (loader *segmentLoader) requestResource(ctx context.Context, infos ...*quer memoryUsage := hardware.GetUsedMemoryCount() totalMemory := hardware.GetMemoryCount() - diskUsage, err := segcore.GetLocalUsedSize(ctx, paramtable.Get().LocalStorageCfg.Path.GetValue()) + localQnPath := filepath.Join(paramtable.Get().LocalStorageCfg.Path.GetValue(), typeutil.QueryNodeRole) + diskUsage, err := segcore.GetLocalUsedSize(ctx, localQnPath) if err != nil { return result, errors.Wrap(err, "get local used size failed") } @@ -1366,7 +1368,8 @@ func (loader *segmentLoader) checkSegmentSize(ctx context.Context, segmentLoadIn return 0, 0, errors.New("get memory failed when checkSegmentSize") } - localDiskUsage, err := segcore.GetLocalUsedSize(ctx, paramtable.Get().LocalStorageCfg.Path.GetValue()) + localQnPaths := filepath.Join(paramtable.Get().LocalStorageCfg.Path.GetValue(), typeutil.QueryNodeRole) + localDiskUsage, err := segcore.GetLocalUsedSize(ctx, localQnPaths) if err != nil { return 0, 0, errors.Wrap(err, "get local used size failed") } diff --git a/internal/querynodev2/server.go b/internal/querynodev2/server.go index fd331a6bdfa0b..b8bb818c9eedf 100644 --- a/internal/querynodev2/server.go +++ b/internal/querynodev2/server.go @@ -244,7 +244,7 @@ func (node *QueryNode) InitSegcore() error { C.SegcoreSetKnowhereGpuMemoryPoolSize(cGpuMemoryPoolInitSize, cGpuMemoryPoolMaxSize) localDataRootPath := filepath.Join(paramtable.Get().LocalStorageCfg.Path.GetValue(), typeutil.QueryNodeRole) - initcore.InitLocalChunkManager(localDataRootPath) + initcore.InitLocalChunkManager(typeutil.QueryNodeRole, localDataRootPath) err := initcore.InitRemoteChunkManager(paramtable.Get()) if err != nil { @@ -321,7 +321,7 @@ func (node *QueryNode) Init() error { node.factory.Init(paramtable.Get()) - localRootPath := paramtable.Get().LocalStorageCfg.Path.GetValue() + localRootPath := filepath.Join(paramtable.Get().LocalStorageCfg.Path.GetValue(), typeutil.QueryNodeRole) localUsedSize, err := segcore.GetLocalUsedSize(node.ctx, localRootPath) if err != nil { log.Warn("get local used size failed", zap.Error(err)) diff --git a/internal/util/initcore/init_core.go b/internal/util/initcore/init_core.go index 88a4f4ae5aa15..fcd5b0b08d990 100644 --- a/internal/util/initcore/init_core.go +++ b/internal/util/initcore/init_core.go @@ -40,10 +40,12 @@ import ( "github.com/milvus-io/milvus/pkg/util/paramtable" ) -func InitLocalChunkManager(path string) { +func InitLocalChunkManager(role string, path string) { + CRole := C.CString(role) + defer C.free(unsafe.Pointer(CRole)) CLocalRootPath := C.CString(path) defer C.free(unsafe.Pointer(CLocalRootPath)) - C.InitLocalChunkManagerSingleton(CLocalRootPath) + C.InitLocalChunkManager(CRole, CLocalRootPath) } func InitTraceConfig(params *paramtable.ComponentParam) {