diff --git a/configs/milvus.yaml b/configs/milvus.yaml index eec18547040fe..0b78999c5776a 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -269,8 +269,9 @@ queryNode: enableDisk: false # enable querynode load disk index, and search on disk index maxDiskUsagePercentage: 95 cache: - enabled: true - memoryLimit: 2147483648 # 2 GB, 2 * 1024 *1024 *1024 + enabled: true # deprecated, TODO: remove it + memoryLimit: 2147483648 # 2 GB, 2 * 1024 *1024 *1024 # deprecated, TODO: remove it + readAheadPolicy: willneed # The read ahead policy of chunk cache, options: `normal, random, sequential, willneed, dontneed` grouping: enabled: true maxNQ: 1000 diff --git a/internal/core/src/storage/ChunkCache.cpp b/internal/core/src/storage/ChunkCache.cpp index 8cee77b570daa..0a0454c4830ff 100644 --- a/internal/core/src/storage/ChunkCache.cpp +++ b/internal/core/src/storage/ChunkCache.cpp @@ -55,7 +55,7 @@ ChunkCache::Prefetch(const std::string& filepath) { auto ok = madvise(reinterpret_cast(const_cast(column->Data())), column->ByteSize(), - MADV_WILLNEED); // TODO: sheep, make it configurable + read_ahead_policy_); AssertInfo(ok == 0, fmt::format("failed to madvise to the data file {}, err: {}", path.c_str(), diff --git a/internal/core/src/storage/ChunkCache.h b/internal/core/src/storage/ChunkCache.h index 5cbc331c93433..9d842b8e556ec 100644 --- a/internal/core/src/storage/ChunkCache.h +++ b/internal/core/src/storage/ChunkCache.h @@ -21,11 +21,23 @@ namespace milvus::storage { +extern std::map ReadAheadPolicy_Map; + class ChunkCache { public: - explicit ChunkCache(const std::string& path, ChunkManagerPtr cm) - : path_prefix_(path), cm_(cm) { - LOG_SEGCORE_INFO_ << "Init ChunkCache with prefix: " << path_prefix_; + explicit ChunkCache(std::string path, + const std::string& read_ahead_policy, + ChunkManagerPtr cm) + : path_prefix_(std::move(path)), cm_(cm) { + auto iter = ReadAheadPolicy_Map.find(read_ahead_policy); + AssertInfo(iter != ReadAheadPolicy_Map.end(), + fmt::format("unrecognized read ahead policy: {}, " + "should be one of `normal, random, sequential, " + "willneed, dontneed`", + read_ahead_policy)); + read_ahead_policy_ = iter->second; + LOG_SEGCORE_INFO_ << "Init ChunkCache with prefix: " << path_prefix_ + << ", read_ahead_policy: " << read_ahead_policy; } ~ChunkCache() = default; @@ -51,6 +63,7 @@ class ChunkCache { private: mutable std::mutex mutex_; + int read_ahead_policy_; std::string path_prefix_; ChunkManagerPtr cm_; ColumnTable columns_; diff --git a/internal/core/src/storage/ChunkCacheSingleton.h b/internal/core/src/storage/ChunkCacheSingleton.h index bc84461249447..c1abfb7379621 100644 --- a/internal/core/src/storage/ChunkCacheSingleton.h +++ b/internal/core/src/storage/ChunkCacheSingleton.h @@ -39,11 +39,12 @@ class ChunkCacheSingleton { } void - Init(const std::string& root_path) { + Init(std::string root_path, std::string read_ahead_policy) { if (cc_ == nullptr) { auto rcm = RemoteChunkManagerSingleton::GetInstance() .GetRemoteChunkManager(); - cc_ = std::make_shared(root_path, rcm); + cc_ = std::make_shared( + std::move(root_path), std::move(read_ahead_policy), rcm); } } diff --git a/internal/core/src/storage/Util.cpp b/internal/core/src/storage/Util.cpp index 8a1a7a3e3a5a0..d5f41f8abb209 100644 --- a/internal/core/src/storage/Util.cpp +++ b/internal/core/src/storage/Util.cpp @@ -55,6 +55,13 @@ std::map CloudProviderType_Map = { {"aliyun", CloudProviderType::ALIYUN}, {"azure", CloudProviderType::AZURE}}; +std::map ReadAheadPolicy_Map = { + {"normal", MADV_NORMAL}, + {"random", MADV_RANDOM}, + {"sequential", MADV_SEQUENTIAL}, + {"willneed", MADV_WILLNEED}, + {"dontneed", MADV_DONTNEED}}; + StorageType ReadMediumType(BinlogReaderPtr reader) { AssertInfo(reader->Tell() == 0, diff --git a/internal/core/src/storage/storage_c.cpp b/internal/core/src/storage/storage_c.cpp index 2dcea75c204cc..e5c83a3496ec4 100644 --- a/internal/core/src/storage/storage_c.cpp +++ b/internal/core/src/storage/storage_c.cpp @@ -81,9 +81,10 @@ InitRemoteChunkManagerSingleton(CStorageConfig c_storage_config) { } CStatus -InitChunkCacheSingleton(const char* c_dir_path) { +InitChunkCacheSingleton(const char* c_dir_path, const char* read_ahead_policy) { try { - milvus::storage::ChunkCacheSingleton::GetInstance().Init(c_dir_path); + milvus::storage::ChunkCacheSingleton::GetInstance().Init( + c_dir_path, read_ahead_policy); return milvus::SuccessCStatus(); } catch (std::exception& e) { return milvus::FailureCStatus(&e); diff --git a/internal/core/src/storage/storage_c.h b/internal/core/src/storage/storage_c.h index d0fe400fadcea..a10b38c3c21a7 100644 --- a/internal/core/src/storage/storage_c.h +++ b/internal/core/src/storage/storage_c.h @@ -31,7 +31,7 @@ CStatus InitRemoteChunkManagerSingleton(CStorageConfig c_storage_config); CStatus -InitChunkCacheSingleton(const char* c_dir_path); +InitChunkCacheSingleton(const char* c_dir_path, const char* read_ahead_policy); void CleanRemoteChunkManagerSingleton(); diff --git a/internal/core/unittest/test_chunk_cache.cpp b/internal/core/unittest/test_chunk_cache.cpp index bc4753b169335..5386b745f00a8 100644 --- a/internal/core/unittest/test_chunk_cache.cpp +++ b/internal/core/unittest/test_chunk_cache.cpp @@ -26,6 +26,8 @@ #include "storage/ChunkCache.h" #include "storage/LocalChunkManagerSingleton.h" +#define DEFAULT_READ_AHEAD_POLICY "willneed" + TEST(ChunkCacheTest, Read) { auto N = 10000; auto dim = 128; @@ -67,7 +69,8 @@ TEST(ChunkCacheTest, Read) { field_data_meta, field_meta); - auto cc = std::make_shared(mmap_dir, lcm); + auto cc = std::make_shared( + mmap_dir, DEFAULT_READ_AHEAD_POLICY, lcm); const auto& column = cc->Read(file_name); Assert(column->ByteSize() == dim * N * 4); @@ -129,7 +132,8 @@ TEST(ChunkCacheTest, TestMultithreads) { field_data_meta, field_meta); - auto cc = std::make_shared(mmap_dir, lcm); + auto cc = std::make_shared( + mmap_dir, DEFAULT_READ_AHEAD_POLICY, lcm); constexpr int threads = 16; std::vector total_counts(threads); diff --git a/internal/core/unittest/test_sealed.cpp b/internal/core/unittest/test_sealed.cpp index a065e4a706a1d..f4b507d9553e2 100644 --- a/internal/core/unittest/test_sealed.cpp +++ b/internal/core/unittest/test_sealed.cpp @@ -1182,7 +1182,8 @@ TEST(Sealed, GetVectorFromChunkCache) { milvus::storage::RemoteChunkManagerSingleton::GetInstance().Init(sc); auto mcm = std::make_unique(sc); mcm->CreateBucket(sc.bucket_name); - milvus::storage::ChunkCacheSingleton::GetInstance().Init(mmap_dir); + milvus::storage::ChunkCacheSingleton::GetInstance().Init(mmap_dir, + "willneed"); auto schema = std::make_shared(); auto fakevec_id = schema->AddDebugField( diff --git a/internal/querynodev2/server.go b/internal/querynodev2/server.go index af5e762d03efd..bbc831e87ac04 100644 --- a/internal/querynodev2/server.go +++ b/internal/querynodev2/server.go @@ -237,11 +237,12 @@ func (node *QueryNode) InitSegcore() error { mmapDirPath = paramtable.Get().LocalStorageCfg.Path.GetValue() } mmapDirPath += "/chunk_cache" - err = initcore.InitChunkCache(mmapDirPath) + policy := paramtable.Get().QueryNodeCfg.ReadAheadPolicy.GetValue() + err = initcore.InitChunkCache(mmapDirPath, policy) if err != nil { return err } - log.Info("InitChunkCache done", zap.String("dir", mmapDirPath)) + log.Info("InitChunkCache done", zap.String("dir", mmapDirPath), zap.String("policy", policy)) initcore.InitTraceConfig(paramtable.Get()) return nil diff --git a/internal/util/initcore/init_core.go b/internal/util/initcore/init_core.go index 296557f9ceeb8..ae96482786418 100644 --- a/internal/util/initcore/init_core.go +++ b/internal/util/initcore/init_core.go @@ -96,10 +96,12 @@ func InitRemoteChunkManager(params *paramtable.ComponentParam) error { return HandleCStatus(&status, "InitRemoteChunkManagerSingleton failed") } -func InitChunkCache(mmapDirPath string) error { +func InitChunkCache(mmapDirPath string, readAheadPolicy string) error { cMmapDirPath := C.CString(mmapDirPath) defer C.free(unsafe.Pointer(cMmapDirPath)) - status := C.InitChunkCacheSingleton(cMmapDirPath) + cReadAheadPolicy := C.CString(readAheadPolicy) + defer C.free(unsafe.Pointer(cReadAheadPolicy)) + status := C.InitChunkCacheSingleton(cMmapDirPath, cReadAheadPolicy) return HandleCStatus(&status, "InitChunkCacheSingleton failed") } diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 004872faadbe2..475b5f1929fe5 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -1529,6 +1529,9 @@ type queryNodeConfig struct { CacheMemoryLimit ParamItem `refreshable:"false"` MmapDirPath ParamItem `refreshable:"false"` + // chunk cache + ReadAheadPolicy ParamItem `refreshable:"false"` + GroupEnabled ParamItem `refreshable:"true"` MaxReceiveChanSize ParamItem `refreshable:"false"` MaxUnsolvedQueueSize ParamItem `refreshable:"true"` @@ -1715,6 +1718,14 @@ func (p *queryNodeConfig) init(base *BaseTable) { } p.MmapDirPath.Init(base.mgr) + p.ReadAheadPolicy = ParamItem{ + Key: "queryNode.cache.readAheadPolicy", + Version: "2.3.2", + DefaultValue: "willneed", + Doc: "The read ahead policy of chunk cache, options: `normal, random, sequential, willneed, dontneed`", + } + p.ReadAheadPolicy.Init(base.mgr) + p.GroupEnabled = ParamItem{ Key: "queryNode.grouping.enabled", Version: "2.0.0", diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index 02d62c0babf90..43ab4a77ff693 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -309,6 +309,9 @@ func TestComponentParam(t *testing.T) { assert.Equal(t, 10.0, Params.CPURatio.GetAsFloat()) assert.Equal(t, uint32(runtime.GOMAXPROCS(0)), Params.KnowhereThreadPoolSize.GetAsUint32()) + // chunk cache + assert.Equal(t, "willneed", Params.ReadAheadPolicy.GetValue()) + // test small indexNlist/NProbe default params.Remove("queryNode.segcore.smallIndex.nlist") params.Remove("queryNode.segcore.smallIndex.nprobe")