diff --git a/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp b/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp index 56ba3fd074927..1f6a1aa1e409e 100644 --- a/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp +++ b/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp @@ -157,7 +157,7 @@ ChunkedSegmentSealedImpl::WarmupChunkCache(const FieldId field_id, auto cc = storage::MmapManager::GetInstance().GetChunkCache(); for (const auto& data_path : field_info.insert_files) { - auto column = cc->Read(data_path, mmap_descriptor_, field_meta); + auto column = cc->Read(data_path, field_meta, mmap_enabled, true); } } @@ -948,7 +948,7 @@ std::tuple< descriptor, const FieldMeta& field_meta) { - auto column = cc->Read(data_path, descriptor, field_meta); + auto column = cc->Read(data_path, field_meta, true); cc->Prefetch(data_path); return {data_path, std::dynamic_pointer_cast(column)}; } diff --git a/internal/core/src/storage/ChunkCache.cpp b/internal/core/src/storage/ChunkCache.cpp index 3fd68868d6ddb..b2ddb14d63aa2 100644 --- a/internal/core/src/storage/ChunkCache.cpp +++ b/internal/core/src/storage/ChunkCache.cpp @@ -14,10 +14,14 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include #include "ChunkCache.h" +#include "boost/filesystem/operations.hpp" +#include "boost/filesystem/path.hpp" +#include "common/Chunk.h" #include "common/ChunkWriter.h" #include "common/FieldMeta.h" #include "common/Types.h" @@ -26,8 +30,9 @@ namespace milvus::storage { std::shared_ptr ChunkCache::Read(const std::string& filepath, - const MmapChunkDescriptorPtr& descriptor, - const FieldMeta& field_meta) { + const FieldMeta& field_meta, + bool mmap_enabled, + bool mmap_rss_not_need) { // use rlock to get future { std::shared_lock lck(mutex_); @@ -66,8 +71,28 @@ ChunkCache::Read(const std::string& filepath, auto field_data = DownloadAndDecodeRemoteFile(cm_.get(), filepath, false); - auto chunk = create_chunk( - field_meta, field_meta.get_dim(), field_data->GetReader()->reader); + std::shared_ptr chunk; + auto dim = IsSparseFloatVectorDataType(field_meta.get_data_type()) + ? 1 + : field_meta.get_dim(); + if (mmap_enabled) { + auto path = std::filesystem::path(CachePath(filepath)); + auto dir = path.parent_path(); + std::filesystem::create_directories(dir); + + auto file = File::Open(path.string(), O_CREAT | O_TRUNC | O_RDWR); + chunk = create_chunk( + field_meta, dim, file, 0, field_data->GetReader()->reader); + // unlink + auto ok = unlink(path.c_str()); + AssertInfo(ok == 0, + "failed to unlink mmap data file {}, err: {}", + path.c_str(), + strerror(errno)); + } else { + chunk = + create_chunk(field_meta, dim, field_data->GetReader()->reader); + } auto data_type = field_meta.get_data_type(); if (IsSparseFloatVectorDataType(data_type)) { @@ -83,6 +108,22 @@ ChunkCache::Read(const std::string& filepath, std::vector> chunks{chunk}; column = std::make_shared(chunks); } + if (mmap_enabled && mmap_rss_not_need) { + auto ok = madvise(reinterpret_cast( + const_cast(column->MmappedData())), + column->DataByteSize(), + ReadAheadPolicy_Map["dontneed"]); + if (ok != 0) { + LOG_WARN( + "failed to madvise to the data file {}, addr {}, size {}, " + "err: " + "{}", + filepath, + static_cast(column->MmappedData()), + column->DataByteSize(), + strerror(errno)); + } + } } catch (const SegcoreError& e) { err_code = e.get_error_code(); err_msg = fmt::format("failed to read for chunkCache, seg_core_err:{}", @@ -261,4 +302,22 @@ ChunkCache::ConvertToColumn(const FieldDataPtr& field_data, column->AppendBatch(field_data); return column; } + +// TODO(sunby): use mmap chunk manager to create chunk +std::string +ChunkCache::CachePath(const std::string& filepath) { + auto path = std::filesystem::path(filepath); + auto prefix = std::filesystem::path(path_prefix_); + + // Cache path shall not use absolute filepath direct, it shall always under path_prefix_ + if (path.is_absolute()) { + return (prefix / + filepath.substr(path.root_directory().string().length(), + filepath.length())) + .string(); + } + + return (prefix / filepath).string(); +} + } // namespace milvus::storage diff --git a/internal/core/src/storage/ChunkCache.h b/internal/core/src/storage/ChunkCache.h index fecb8e5bac58c..040b0fbf8f0a3 100644 --- a/internal/core/src/storage/ChunkCache.h +++ b/internal/core/src/storage/ChunkCache.h @@ -27,10 +27,11 @@ extern std::map ReadAheadPolicy_Map; class ChunkCache { public: - explicit ChunkCache(const std::string& read_ahead_policy, + explicit ChunkCache(std::string& path_prefix, + const std::string& read_ahead_policy, ChunkManagerPtr cm, MmapChunkManagerPtr mcm) - : cm_(cm), mcm_(mcm) { + : path_prefix_(path_prefix), cm_(cm), mcm_(mcm) { auto iter = ReadAheadPolicy_Map.find(read_ahead_policy); AssertInfo(iter != ReadAheadPolicy_Map.end(), "unrecognized read ahead policy: {}, " @@ -47,8 +48,9 @@ class ChunkCache { public: std::shared_ptr Read(const std::string& filepath, - const MmapChunkDescriptorPtr& descriptor, - const FieldMeta& field_meta); + const FieldMeta& field_meta, + bool mmap_enabled, + bool mmap_rss_not_need = false); std::shared_ptr Read(const std::string& filepath, @@ -85,6 +87,8 @@ class ChunkCache { ChunkManagerPtr cm_; MmapChunkManagerPtr mcm_; ColumnTable columns_; + + std::string path_prefix_; }; using ChunkCachePtr = std::shared_ptr; diff --git a/internal/core/src/storage/MmapManager.h b/internal/core/src/storage/MmapManager.h index f2e32d56c6f0e..8043474aed01c 100644 --- a/internal/core/src/storage/MmapManager.h +++ b/internal/core/src/storage/MmapManager.h @@ -66,7 +66,10 @@ class MmapManager { auto rcm = RemoteChunkManagerSingleton::GetInstance() .GetRemoteChunkManager(); cc_ = std::make_shared( - std::move(mmap_config_.cache_read_ahead_policy), rcm, mcm_); + mmap_config_.mmap_path, + std::move(mmap_config_.cache_read_ahead_policy), + rcm, + mcm_); } LOG_INFO("Init MmapConfig with MmapConfig: {}", mmap_config_.ToString()); diff --git a/internal/core/unittest/test_chunk_cache.cpp b/internal/core/unittest/test_chunk_cache.cpp index f1483d3caa299..9888097e59ac8 100644 --- a/internal/core/unittest/test_chunk_cache.cpp +++ b/internal/core/unittest/test_chunk_cache.cpp @@ -16,23 +16,96 @@ #include +#include #include +#include #include +#include "common/Consts.h" +#include "common/FieldMeta.h" +#include "common/Types.h" #include "fmt/format.h" #include "common/Schema.h" +#include "gtest/gtest.h" +#include "knowhere/sparse_utils.h" +#include "mmap/Column.h" #include "test_utils/DataGen.h" #include "test_utils/storage_test_utils.h" #include "storage/ChunkCache.h" #include "storage/LocalChunkManagerSingleton.h" #define DEFAULT_READ_AHEAD_POLICY "willneed" -class ChunkCacheTest : public testing::TestWithParam { - public: +class ChunkCacheTest + : public testing::TestWithParam< + /*mmap enabled, is chunked*/ std::tuple> { + protected: void SetUp() override { mcm = milvus::storage::MmapManager::GetInstance().GetMmapChunkManager(); mcm->Register(descriptor); + + N = 10000; + dim = 128; + auto dense_metric_type = knowhere::metric::L2; + auto sparse_metric_type = knowhere::metric::IP; + + auto schema = std::make_shared(); + auto fake_dense_vec_id = schema->AddDebugField( + "fakevec", milvus::DataType::VECTOR_FLOAT, dim, dense_metric_type); + auto i64_fid = + schema->AddDebugField("counter", milvus::DataType::INT64); + auto fake_sparse_vec_id = + schema->AddDebugField("fakevec_sparse", + milvus::DataType::VECTOR_SPARSE_FLOAT, + dim, + sparse_metric_type); + schema->set_primary_field_id(i64_fid); + + auto dataset = milvus::segcore::DataGen(schema, N); + + auto dense_field_data_meta = + milvus::storage::FieldDataMeta{1, 2, 3, fake_dense_vec_id.get()}; + auto sparse_field_data_meta = + milvus::storage::FieldDataMeta{1, 2, 3, fake_sparse_vec_id.get()}; + dense_field_meta = milvus::FieldMeta(milvus::FieldName("fakevec"), + fake_dense_vec_id, + milvus::DataType::VECTOR_FLOAT, + dim, + dense_metric_type, + false); + sparse_field_meta = + milvus::FieldMeta(milvus::FieldName("fakevec_sparse"), + fake_sparse_vec_id, + milvus::DataType::VECTOR_SPARSE_FLOAT, + dim, + sparse_metric_type, + false); + + lcm = milvus::storage::LocalChunkManagerSingleton::GetInstance() + .GetChunkManager(); + dense_data = dataset.get_col(fake_dense_vec_id); + sparse_data = dataset.get_col>( + fake_sparse_vec_id); + + auto data_slices = std::vector{dense_data.data()}; + auto slice_sizes = std::vector{static_cast(N)}; + auto slice_names = std::vector{dense_file_name}; + PutFieldData(lcm.get(), + data_slices, + slice_sizes, + slice_names, + dense_field_data_meta, + dense_field_meta); + + data_slices = std::vector{sparse_data.data()}; + slice_sizes = std::vector{static_cast(N)}; + slice_names = std::vector{sparse_file_name}; + PutFieldData(lcm.get(), + data_slices, + slice_sizes, + slice_names, + sparse_field_data_meta, + sparse_field_meta); } void TearDown() override { @@ -46,81 +119,37 @@ class ChunkCacheTest : public testing::TestWithParam { std::shared_ptr( new milvus::storage::MmapChunkDescriptor( {101, SegmentType::Sealed})); -}; + int N; + int dim; + milvus::FieldMeta dense_field_meta = milvus::FieldMeta::RowIdMeta; + milvus::FixedVector dense_data; + milvus::FieldMeta sparse_field_meta = milvus::FieldMeta::RowIdMeta; + milvus::FixedVector> sparse_data; + std::shared_ptr lcm; +}; INSTANTIATE_TEST_SUITE_P(ChunkCacheTestSuite, ChunkCacheTest, - testing::Values(true, false)); + testing::Combine(testing::Bool(), testing::Bool())); TEST_P(ChunkCacheTest, Read) { - auto N = 10000; - auto dim = 128; - auto dense_metric_type = knowhere::metric::L2; - auto sparse_metric_type = knowhere::metric::IP; - - auto schema = std::make_shared(); - auto fake_dense_vec_id = schema->AddDebugField( - "fakevec", milvus::DataType::VECTOR_FLOAT, dim, dense_metric_type); - auto i64_fid = schema->AddDebugField("counter", milvus::DataType::INT64); - auto fake_sparse_vec_id = - schema->AddDebugField("fakevec_sparse", - milvus::DataType::VECTOR_SPARSE_FLOAT, - dim, - sparse_metric_type); - schema->set_primary_field_id(i64_fid); - - auto dataset = milvus::segcore::DataGen(schema, N); - - auto dense_field_data_meta = - milvus::storage::FieldDataMeta{1, 2, 3, fake_dense_vec_id.get()}; - auto sparse_field_data_meta = - milvus::storage::FieldDataMeta{1, 2, 3, fake_sparse_vec_id.get()}; - auto dense_field_meta = milvus::FieldMeta(milvus::FieldName("fakevec"), - fake_dense_vec_id, - milvus::DataType::VECTOR_FLOAT, - dim, - dense_metric_type, - false); - auto sparse_field_meta = - milvus::FieldMeta(milvus::FieldName("fakevec_sparse"), - fake_sparse_vec_id, - milvus::DataType::VECTOR_SPARSE_FLOAT, - dim, - sparse_metric_type, - false); - - auto lcm = milvus::storage::LocalChunkManagerSingleton::GetInstance() - .GetChunkManager(); - auto dense_data = dataset.get_col(fake_dense_vec_id); - auto sparse_data = - dataset.get_col>(fake_sparse_vec_id); - - auto data_slices = std::vector{dense_data.data()}; - auto slice_sizes = std::vector{static_cast(N)}; - auto slice_names = std::vector{dense_file_name}; - PutFieldData(lcm.get(), - data_slices, - slice_sizes, - slice_names, - dense_field_data_meta, - dense_field_meta); - - data_slices = std::vector{sparse_data.data()}; - slice_sizes = std::vector{static_cast(N)}; - slice_names = std::vector{sparse_file_name}; - PutFieldData(lcm.get(), - data_slices, - slice_sizes, - slice_names, - sparse_field_data_meta, - sparse_field_meta); - auto cc = milvus::storage::MmapManager::GetInstance().GetChunkCache(); // validate dense data - const auto& dense_column = - cc->Read(dense_file_name, descriptor, dense_field_meta, GetParam()); - Assert(dense_column->DataByteSize() == dim * N * 4); + std::shared_ptr dense_column; + + auto p = GetParam(); + auto mmap_enabled = std::get<0>(p); + auto is_test_chunked = std::get<1>(p); + + if (is_test_chunked) { + dense_column = + cc->Read(dense_file_name, dense_field_meta, mmap_enabled); + } else { + dense_column = cc->Read( + dense_file_name, descriptor, dense_field_meta, mmap_enabled); + Assert(dense_column->DataByteSize() == dim * N * 4); + } auto actual_dense = (const float*)(dense_column->Data()); for (auto i = 0; i < N * dim; i++) { AssertInfo(dense_data[i] == actual_dense[i], @@ -129,8 +158,14 @@ TEST_P(ChunkCacheTest, Read) { } // validate sparse data - const auto& sparse_column = - cc->Read(sparse_file_name, descriptor, sparse_field_meta, GetParam()); + std::shared_ptr sparse_column; + if (is_test_chunked) { + sparse_column = + cc->Read(sparse_file_name, sparse_field_meta, mmap_enabled); + } else { + sparse_column = cc->Read( + sparse_file_name, descriptor, sparse_field_meta, mmap_enabled); + } auto expected_sparse_size = 0; auto actual_sparse = (const knowhere::sparse::SparseRow*)(sparse_column->Data()); @@ -151,8 +186,9 @@ TEST_P(ChunkCacheTest, Read) { actual_sparse_row.data())); expected_sparse_size += bytes; } - - ASSERT_EQ(sparse_column->DataByteSize(), expected_sparse_size); + if (!is_test_chunked) { + Assert(sparse_column->DataByteSize() == expected_sparse_size); + } cc->Remove(dense_file_name); cc->Remove(sparse_file_name); @@ -161,76 +197,23 @@ TEST_P(ChunkCacheTest, Read) { } TEST_P(ChunkCacheTest, TestMultithreads) { - auto N = 1000; - auto dim = 128; - auto dense_metric_type = knowhere::metric::L2; - auto sparse_metric_type = knowhere::metric::IP; - - auto schema = std::make_shared(); - auto fake_dense_vec_id = schema->AddDebugField( - "fakevec", milvus::DataType::VECTOR_FLOAT, dim, dense_metric_type); - auto fake_sparse_vec_id = - schema->AddDebugField("fakevec_sparse", - milvus::DataType::VECTOR_SPARSE_FLOAT, - dim, - sparse_metric_type); - auto i64_fid = schema->AddDebugField("counter", milvus::DataType::INT64); - schema->set_primary_field_id(i64_fid); - - auto dataset = milvus::segcore::DataGen(schema, N); - - auto dense_field_data_meta = - milvus::storage::FieldDataMeta{1, 2, 3, fake_dense_vec_id.get()}; - auto sparse_field_data_meta = - milvus::storage::FieldDataMeta{1, 2, 3, fake_sparse_vec_id.get()}; - auto dense_field_meta = milvus::FieldMeta(milvus::FieldName("fakevec"), - fake_dense_vec_id, - milvus::DataType::VECTOR_FLOAT, - dim, - dense_metric_type, - false); - auto sparse_field_meta = - milvus::FieldMeta(milvus::FieldName("fakevec_sparse"), - fake_sparse_vec_id, - milvus::DataType::VECTOR_SPARSE_FLOAT, - dim, - sparse_metric_type, - false); - - auto lcm = milvus::storage::LocalChunkManagerSingleton::GetInstance() - .GetChunkManager(); - auto dense_data = dataset.get_col(fake_dense_vec_id); - auto sparse_data = - dataset.get_col>(fake_sparse_vec_id); - - auto dense_data_slices = std::vector{dense_data.data()}; - auto sparse_data_slices = std::vector{sparse_data.data()}; - auto slice_sizes = std::vector{static_cast(N)}; - auto dense_slice_names = std::vector{dense_file_name}; - auto sparse_slice_names = std::vector{sparse_file_name}; - - PutFieldData(lcm.get(), - dense_data_slices, - slice_sizes, - dense_slice_names, - dense_field_data_meta, - dense_field_meta); - - PutFieldData(lcm.get(), - sparse_data_slices, - slice_sizes, - sparse_slice_names, - sparse_field_data_meta, - sparse_field_meta); - auto cc = milvus::storage::MmapManager::GetInstance().GetChunkCache(); constexpr int threads = 16; std::vector total_counts(threads); + auto p = GetParam(); + auto mmap_enabled = std::get<0>(p); + auto is_test_chunked = std::get<1>(p); auto executor = [&](int thread_id) { - const auto& dense_column = - cc->Read(dense_file_name, descriptor, dense_field_meta, GetParam()); - Assert(dense_column->DataByteSize() == dim * N * 4); + std::shared_ptr dense_column; + if (is_test_chunked) { + dense_column = + cc->Read(dense_file_name, dense_field_meta, mmap_enabled); + } else { + dense_column = cc->Read( + dense_file_name, descriptor, dense_field_meta, mmap_enabled); + Assert(dense_column->DataByteSize() == dim * N * 4); + } auto actual_dense = (const float*)dense_column->Data(); for (auto i = 0; i < N * dim; i++) { @@ -240,8 +223,14 @@ TEST_P(ChunkCacheTest, TestMultithreads) { "expect {}, actual {}", dense_data[i], actual_dense[i])); } - const auto& sparse_column = cc->Read( - sparse_file_name, descriptor, sparse_field_meta, GetParam()); + std::shared_ptr sparse_column; + if (is_test_chunked) { + sparse_column = + cc->Read(sparse_file_name, sparse_field_meta, mmap_enabled); + } else { + sparse_column = cc->Read( + sparse_file_name, descriptor, sparse_field_meta, mmap_enabled); + } auto actual_sparse = (const knowhere::sparse::SparseRow*)sparse_column->Data(); for (auto i = 0; i < N; i++) {