Skip to content

Commit

Permalink
fix: Fix chunked segment can not warmup using mmap (#38492)
Browse files Browse the repository at this point in the history
issue: #38410

---------

Signed-off-by: sunby <[email protected]>
  • Loading branch information
sunby authored Dec 17, 2024
1 parent 33aecb0 commit dd4f33a
Show file tree
Hide file tree
Showing 5 changed files with 206 additions and 151 deletions.
4 changes: 2 additions & 2 deletions internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ ChunkedSegmentSealedImpl::WarmupChunkCache(const FieldId field_id,

auto cc = storage::MmapManager::GetInstance().GetChunkCache();
for (const auto& data_path : field_info.insert_files) {
auto column = cc->Read(data_path, mmap_descriptor_, field_meta);
auto column = cc->Read(data_path, field_meta, mmap_enabled, true);
}
}

Expand Down Expand Up @@ -948,7 +948,7 @@ std::tuple<
descriptor,
const FieldMeta&
field_meta) {
auto column = cc->Read(data_path, descriptor, field_meta);
auto column = cc->Read(data_path, field_meta, true);
cc->Prefetch(data_path);
return {data_path, std::dynamic_pointer_cast<ChunkedColumnBase>(column)};
}
Expand Down
67 changes: 63 additions & 4 deletions internal/core/src/storage/ChunkCache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <filesystem>
#include <future>
#include <memory>

#include "ChunkCache.h"
#include "boost/filesystem/operations.hpp"
#include "boost/filesystem/path.hpp"
#include "common/Chunk.h"
#include "common/ChunkWriter.h"
#include "common/FieldMeta.h"
#include "common/Types.h"
Expand All @@ -26,8 +30,9 @@
namespace milvus::storage {
std::shared_ptr<ColumnBase>
ChunkCache::Read(const std::string& filepath,
const MmapChunkDescriptorPtr& descriptor,
const FieldMeta& field_meta) {
const FieldMeta& field_meta,
bool mmap_enabled,
bool mmap_rss_not_need) {
// use rlock to get future
{
std::shared_lock lck(mutex_);
Expand Down Expand Up @@ -66,8 +71,28 @@ ChunkCache::Read(const std::string& filepath,
auto field_data =
DownloadAndDecodeRemoteFile(cm_.get(), filepath, false);

auto chunk = create_chunk(
field_meta, field_meta.get_dim(), field_data->GetReader()->reader);
std::shared_ptr<Chunk> chunk;
auto dim = IsSparseFloatVectorDataType(field_meta.get_data_type())
? 1
: field_meta.get_dim();
if (mmap_enabled) {
auto path = std::filesystem::path(CachePath(filepath));
auto dir = path.parent_path();
std::filesystem::create_directories(dir);

auto file = File::Open(path.string(), O_CREAT | O_TRUNC | O_RDWR);
chunk = create_chunk(
field_meta, dim, file, 0, field_data->GetReader()->reader);
// unlink
auto ok = unlink(path.c_str());
AssertInfo(ok == 0,
"failed to unlink mmap data file {}, err: {}",
path.c_str(),
strerror(errno));
} else {
chunk =
create_chunk(field_meta, dim, field_data->GetReader()->reader);
}

auto data_type = field_meta.get_data_type();
if (IsSparseFloatVectorDataType(data_type)) {
Expand All @@ -83,6 +108,22 @@ ChunkCache::Read(const std::string& filepath,
std::vector<std::shared_ptr<Chunk>> chunks{chunk};
column = std::make_shared<ChunkedColumn>(chunks);
}
if (mmap_enabled && mmap_rss_not_need) {
auto ok = madvise(reinterpret_cast<void*>(
const_cast<char*>(column->MmappedData())),
column->DataByteSize(),
ReadAheadPolicy_Map["dontneed"]);
if (ok != 0) {
LOG_WARN(
"failed to madvise to the data file {}, addr {}, size {}, "
"err: "
"{}",
filepath,
static_cast<const void*>(column->MmappedData()),
column->DataByteSize(),
strerror(errno));
}
}
} catch (const SegcoreError& e) {
err_code = e.get_error_code();
err_msg = fmt::format("failed to read for chunkCache, seg_core_err:{}",
Expand Down Expand Up @@ -261,4 +302,22 @@ ChunkCache::ConvertToColumn(const FieldDataPtr& field_data,
column->AppendBatch(field_data);
return column;
}

// TODO(sunby): use mmap chunk manager to create chunk
std::string
ChunkCache::CachePath(const std::string& filepath) {
auto path = std::filesystem::path(filepath);
auto prefix = std::filesystem::path(path_prefix_);

// Cache path shall not use absolute filepath direct, it shall always under path_prefix_
if (path.is_absolute()) {
return (prefix /
filepath.substr(path.root_directory().string().length(),
filepath.length()))
.string();
}

return (prefix / filepath).string();
}

} // namespace milvus::storage
12 changes: 8 additions & 4 deletions internal/core/src/storage/ChunkCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@ extern std::map<std::string, int> ReadAheadPolicy_Map;

class ChunkCache {
public:
explicit ChunkCache(const std::string& read_ahead_policy,
explicit ChunkCache(std::string& path_prefix,
const std::string& read_ahead_policy,
ChunkManagerPtr cm,
MmapChunkManagerPtr mcm)
: cm_(cm), mcm_(mcm) {
: path_prefix_(path_prefix), cm_(cm), mcm_(mcm) {
auto iter = ReadAheadPolicy_Map.find(read_ahead_policy);
AssertInfo(iter != ReadAheadPolicy_Map.end(),
"unrecognized read ahead policy: {}, "
Expand All @@ -47,8 +48,9 @@ class ChunkCache {
public:
std::shared_ptr<ColumnBase>
Read(const std::string& filepath,
const MmapChunkDescriptorPtr& descriptor,
const FieldMeta& field_meta);
const FieldMeta& field_meta,
bool mmap_enabled,
bool mmap_rss_not_need = false);

std::shared_ptr<ColumnBase>
Read(const std::string& filepath,
Expand Down Expand Up @@ -85,6 +87,8 @@ class ChunkCache {
ChunkManagerPtr cm_;
MmapChunkManagerPtr mcm_;
ColumnTable columns_;

std::string path_prefix_;
};

using ChunkCachePtr = std::shared_ptr<milvus::storage::ChunkCache>;
Expand Down
5 changes: 4 additions & 1 deletion internal/core/src/storage/MmapManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,10 @@ class MmapManager {
auto rcm = RemoteChunkManagerSingleton::GetInstance()
.GetRemoteChunkManager();
cc_ = std::make_shared<ChunkCache>(
std::move(mmap_config_.cache_read_ahead_policy), rcm, mcm_);
mmap_config_.mmap_path,
std::move(mmap_config_.cache_read_ahead_policy),
rcm,
mcm_);
}
LOG_INFO("Init MmapConfig with MmapConfig: {}",
mmap_config_.ToString());
Expand Down
Loading

0 comments on commit dd4f33a

Please sign in to comment.