Skip to content

Commit

Permalink
Make read ahead policy in ChunkCache configurable
Browse files Browse the repository at this point in the history
Signed-off-by: bigsheeper <[email protected]>
  • Loading branch information
bigsheeper committed Sep 28, 2023
1 parent 8c59dba commit e52dff3
Show file tree
Hide file tree
Showing 13 changed files with 63 additions and 18 deletions.
5 changes: 3 additions & 2 deletions configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -269,8 +269,9 @@ queryNode:
enableDisk: false # enable querynode load disk index, and search on disk index
maxDiskUsagePercentage: 95
cache:
enabled: true
memoryLimit: 2147483648 # 2 GB, 2 * 1024 *1024 *1024
enabled: true # deprecated, TODO: remove it
memoryLimit: 2147483648 # 2 GB, 2 * 1024 *1024 *1024 # deprecated, TODO: remove it
readAheadPolicy: willneed # The read ahead policy of chunk cache, options: `normal, random, sequential, willneed, dontneed`
grouping:
enabled: true
maxNQ: 1000
Expand Down
2 changes: 1 addition & 1 deletion internal/core/src/storage/ChunkCache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ ChunkCache::Prefetch(const std::string& filepath) {
auto ok =
madvise(reinterpret_cast<void*>(const_cast<char*>(column->Data())),
column->ByteSize(),
MADV_WILLNEED); // TODO: sheep, make it configurable
read_ahead_policy_);
AssertInfo(ok == 0,
fmt::format("failed to madvise to the data file {}, err: {}",
path.c_str(),
Expand Down
19 changes: 16 additions & 3 deletions internal/core/src/storage/ChunkCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,23 @@

namespace milvus::storage {

extern std::map<std::string, int> ReadAheadPolicy_Map;

class ChunkCache {
public:
explicit ChunkCache(const std::string& path, ChunkManagerPtr cm)
: path_prefix_(path), cm_(cm) {
LOG_SEGCORE_INFO_ << "Init ChunkCache with prefix: " << path_prefix_;
explicit ChunkCache(std::string path,
const std::string& read_ahead_policy,
ChunkManagerPtr cm)
: path_prefix_(std::move(path)), cm_(cm) {
auto iter = ReadAheadPolicy_Map.find(read_ahead_policy);
AssertInfo(iter != ReadAheadPolicy_Map.end(),
fmt::format("unrecognized read ahead policy: {}, "
"should be one of `normal, random, sequential, "
"willneed, dontneed`",
read_ahead_policy));
read_ahead_policy_ = iter->second;
LOG_SEGCORE_INFO_ << "Init ChunkCache with prefix: " << path_prefix_
<< ", read_ahead_policy: " << read_ahead_policy;
}

~ChunkCache() = default;
Expand All @@ -51,6 +63,7 @@ class ChunkCache {

private:
mutable std::mutex mutex_;
int read_ahead_policy_;
std::string path_prefix_;
ChunkManagerPtr cm_;
ColumnTable columns_;
Expand Down
5 changes: 3 additions & 2 deletions internal/core/src/storage/ChunkCacheSingleton.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,12 @@ class ChunkCacheSingleton {
}

void
Init(const std::string& root_path) {
Init(std::string root_path, std::string read_ahead_policy) {
if (cc_ == nullptr) {
auto rcm = RemoteChunkManagerSingleton::GetInstance()
.GetRemoteChunkManager();
cc_ = std::make_shared<ChunkCache>(root_path, rcm);
cc_ = std::make_shared<ChunkCache>(
std::move(root_path), std::move(read_ahead_policy), rcm);
}
}

Expand Down
7 changes: 7 additions & 0 deletions internal/core/src/storage/Util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,13 @@ std::map<std::string, CloudProviderType> CloudProviderType_Map = {
{"aliyun", CloudProviderType::ALIYUN},
{"azure", CloudProviderType::AZURE}};

std::map<std::string, int> ReadAheadPolicy_Map = {
{"normal", MADV_NORMAL},
{"random", MADV_RANDOM},
{"sequential", MADV_SEQUENTIAL},
{"willneed", MADV_WILLNEED},
{"dontneed", MADV_DONTNEED}};

StorageType
ReadMediumType(BinlogReaderPtr reader) {
AssertInfo(reader->Tell() == 0,
Expand Down
5 changes: 3 additions & 2 deletions internal/core/src/storage/storage_c.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,10 @@ InitRemoteChunkManagerSingleton(CStorageConfig c_storage_config) {
}

CStatus
InitChunkCacheSingleton(const char* c_dir_path) {
InitChunkCacheSingleton(const char* c_dir_path, const char* read_ahead_policy) {
try {
milvus::storage::ChunkCacheSingleton::GetInstance().Init(c_dir_path);
milvus::storage::ChunkCacheSingleton::GetInstance().Init(
c_dir_path, read_ahead_policy);
return milvus::SuccessCStatus();
} catch (std::exception& e) {
return milvus::FailureCStatus(&e);
Expand Down
2 changes: 1 addition & 1 deletion internal/core/src/storage/storage_c.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ CStatus
InitRemoteChunkManagerSingleton(CStorageConfig c_storage_config);

CStatus
InitChunkCacheSingleton(const char* c_dir_path);
InitChunkCacheSingleton(const char* c_dir_path, const char* read_ahead_policy);

void
CleanRemoteChunkManagerSingleton();
Expand Down
8 changes: 6 additions & 2 deletions internal/core/unittest/test_chunk_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
#include "storage/ChunkCache.h"
#include "storage/LocalChunkManagerSingleton.h"

#define DEFAULT_READ_AHEAD_POLICY "willneed"

TEST(ChunkCacheTest, Read) {
auto N = 10000;
auto dim = 128;
Expand Down Expand Up @@ -67,7 +69,8 @@ TEST(ChunkCacheTest, Read) {
field_data_meta,
field_meta);

auto cc = std::make_shared<milvus::storage::ChunkCache>(mmap_dir, lcm);
auto cc = std::make_shared<milvus::storage::ChunkCache>(
mmap_dir, DEFAULT_READ_AHEAD_POLICY, lcm);
const auto& column = cc->Read(file_name);
Assert(column->ByteSize() == dim * N * 4);

Expand Down Expand Up @@ -129,7 +132,8 @@ TEST(ChunkCacheTest, TestMultithreads) {
field_data_meta,
field_meta);

auto cc = std::make_shared<milvus::storage::ChunkCache>(mmap_dir, lcm);
auto cc = std::make_shared<milvus::storage::ChunkCache>(
mmap_dir, DEFAULT_READ_AHEAD_POLICY, lcm);

constexpr int threads = 16;
std::vector<int64_t> total_counts(threads);
Expand Down
3 changes: 2 additions & 1 deletion internal/core/unittest/test_sealed.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1182,7 +1182,8 @@ TEST(Sealed, GetVectorFromChunkCache) {
milvus::storage::RemoteChunkManagerSingleton::GetInstance().Init(sc);
auto mcm = std::make_unique<milvus::storage::MinioChunkManager>(sc);
mcm->CreateBucket(sc.bucket_name);
milvus::storage::ChunkCacheSingleton::GetInstance().Init(mmap_dir);
milvus::storage::ChunkCacheSingleton::GetInstance().Init(mmap_dir,
"willneed");

auto schema = std::make_shared<Schema>();
auto fakevec_id = schema->AddDebugField(
Expand Down
5 changes: 3 additions & 2 deletions internal/querynodev2/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,11 +237,12 @@ func (node *QueryNode) InitSegcore() error {
mmapDirPath = paramtable.Get().LocalStorageCfg.Path.GetValue()
}
mmapDirPath += "/chunk_cache"
err = initcore.InitChunkCache(mmapDirPath)
policy := paramtable.Get().QueryNodeCfg.ReadAheadPolicy.GetValue()
err = initcore.InitChunkCache(mmapDirPath, policy)
if err != nil {
return err
}
log.Info("InitChunkCache done", zap.String("dir", mmapDirPath))
log.Info("InitChunkCache done", zap.String("dir", mmapDirPath), zap.String("policy", policy))

initcore.InitTraceConfig(paramtable.Get())
return nil
Expand Down
6 changes: 4 additions & 2 deletions internal/util/initcore/init_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,12 @@ func InitRemoteChunkManager(params *paramtable.ComponentParam) error {
return HandleCStatus(&status, "InitRemoteChunkManagerSingleton failed")
}

func InitChunkCache(mmapDirPath string) error {
func InitChunkCache(mmapDirPath string, readAheadPolicy string) error {
cMmapDirPath := C.CString(mmapDirPath)
defer C.free(unsafe.Pointer(cMmapDirPath))
status := C.InitChunkCacheSingleton(cMmapDirPath)
cReadAheadPolicy := C.CString(readAheadPolicy)
defer C.free(unsafe.Pointer(cReadAheadPolicy))
status := C.InitChunkCacheSingleton(cMmapDirPath, cReadAheadPolicy)
return HandleCStatus(&status, "InitChunkCacheSingleton failed")
}

Expand Down
11 changes: 11 additions & 0 deletions pkg/util/paramtable/component_param.go
Original file line number Diff line number Diff line change
Expand Up @@ -1529,6 +1529,9 @@ type queryNodeConfig struct {
CacheMemoryLimit ParamItem `refreshable:"false"`
MmapDirPath ParamItem `refreshable:"false"`

// chunk cache
ReadAheadPolicy ParamItem `refreshable:"false"`

GroupEnabled ParamItem `refreshable:"true"`
MaxReceiveChanSize ParamItem `refreshable:"false"`
MaxUnsolvedQueueSize ParamItem `refreshable:"true"`
Expand Down Expand Up @@ -1715,6 +1718,14 @@ func (p *queryNodeConfig) init(base *BaseTable) {
}
p.MmapDirPath.Init(base.mgr)

p.ReadAheadPolicy = ParamItem{
Key: "queryNode.cache.readAheadPolicy",
Version: "2.3.2",
DefaultValue: "willneed",
Doc: "The read ahead policy of chunk cache, options: `normal, random, sequential, willneed, dontneed`",
}
p.ReadAheadPolicy.Init(base.mgr)

p.GroupEnabled = ParamItem{
Key: "queryNode.grouping.enabled",
Version: "2.0.0",
Expand Down
3 changes: 3 additions & 0 deletions pkg/util/paramtable/component_param_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,9 @@ func TestComponentParam(t *testing.T) {
assert.Equal(t, 10.0, Params.CPURatio.GetAsFloat())
assert.Equal(t, uint32(runtime.GOMAXPROCS(0)), Params.KnowhereThreadPoolSize.GetAsUint32())

// chunk cache
assert.Equal(t, "willneed", Params.ReadAheadPolicy.GetValue())

// test small indexNlist/NProbe default
params.Remove("queryNode.segcore.smallIndex.nlist")
params.Remove("queryNode.segcore.smallIndex.nprobe")
Expand Down

0 comments on commit e52dff3

Please sign in to comment.