Skip to content

Commit

Permalink
feat: support chunked column for sealed segment (milvus-io#35764)
Browse files Browse the repository at this point in the history
This PR splits sealed segment to chunked data to avoid unnecessary
memory copy and save memory usage when loading segments so that loading
can be accelerated.

To support rollback to previous version, we add an option
`multipleChunkedEnable` which is false by default.

Signed-off-by: sunby <[email protected]>
  • Loading branch information
sunby authored Oct 12, 2024
1 parent 5713620 commit a75bb85
Show file tree
Hide file tree
Showing 54 changed files with 4,912 additions and 484 deletions.
1 change: 1 addition & 0 deletions configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,7 @@ queryNode:
nprobe: 16 # nprobe to search small index, based on your accuracy requirement, must smaller than nlist
memExpansionRate: 1.15 # extra memory needed by building interim index
buildParallelRate: 0.5 # the ratio of building interim index parallel matched with cpu num
multipleChunkedEnable: false # Enable multiple chunked search
knowhereScoreConsistency: false # Enable knowhere strong consistency score computation logic
loadMemoryUsageFactor: 1 # The multiply factor of calculating the memory usage while loading segments
enableDisk: false # enable querynode load disk index, and search on disk index
Expand Down
1 change: 1 addition & 0 deletions internal/core/src/bitset/detail/element_wise.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "ctz.h"
#include "popcount.h"

#include "bitset/common.h"
namespace milvus {
namespace bitset {
namespace detail {
Expand Down
29 changes: 16 additions & 13 deletions internal/core/src/common/Chunk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,44 +18,47 @@

namespace milvus {

std::vector<std::string_view>
StringChunk::StringViews() const {
std::pair<std::vector<std::string_view>, FixedVector<bool>>
StringChunk::StringViews() {
std::vector<std::string_view> ret;
for (int i = 0; i < row_nums_ - 1; i++) {
for (int i = 0; i < row_nums_; i++) {
ret.emplace_back(data_ + offsets_[i], offsets_[i + 1] - offsets_[i]);
}
ret.emplace_back(data_ + offsets_[row_nums_ - 1],
size_ - MMAP_STRING_PADDING - offsets_[row_nums_ - 1]);
return ret;
return {ret, valid_};
}

void
ArrayChunk::ConstructViews() {
views_.reserve(row_nums_);

for (int i = 0; i < row_nums_; ++i) {
auto data_ptr = data_ + offsets_[i];
auto next_data_ptr = i == row_nums_ - 1
? data_ + size_ - MMAP_ARRAY_PADDING
: data_ + offsets_[i + 1];
auto offsets_len = lens_[i] * sizeof(uint64_t);
int offset = offsets_lens_[2 * i];
int next_offset = offsets_lens_[2 * (i + 1)];
int len = offsets_lens_[2 * i + 1];

auto data_ptr = data_ + offset;
auto offsets_len = 0;
std::vector<uint64_t> element_indices = {};
if (IsStringDataType(element_type_)) {
offsets_len = len * sizeof(uint64_t);
std::vector<uint64_t> tmp(
reinterpret_cast<uint64_t*>(data_ptr),
reinterpret_cast<uint64_t*>(data_ptr + offsets_len));
element_indices = std::move(tmp);
}
views_.emplace_back(data_ptr + offsets_len,
next_data_ptr - data_ptr - offsets_len,
next_offset - offset - offsets_len,
element_type_,
std::move(element_indices));
}
}

SpanBase
ArrayChunk::Span() const {
return SpanBase(views_.data(), views_.size(), sizeof(ArrayView));
return SpanBase(views_.data(),
nullable_ ? valid_.data() : nullptr,
views_.size(),
sizeof(ArrayView));
}

} // namespace milvus
156 changes: 121 additions & 35 deletions internal/core/src/common/Chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,60 +21,126 @@
#include "arrow/record_batch.h"
#include "common/Array.h"
#include "common/ChunkTarget.h"
#include "common/EasyAssert.h"
#include "common/FieldDataInterface.h"
#include "common/Json.h"
#include "common/Span.h"
#include "knowhere/sparse_utils.h"
#include "simdjson/common_defs.h"
#include "sys/mman.h"
#include "common/Types.h"
namespace milvus {
constexpr size_t MMAP_STRING_PADDING = 1;
constexpr size_t MMAP_ARRAY_PADDING = 1;
constexpr uint64_t MMAP_STRING_PADDING = 1;
constexpr uint64_t MMAP_ARRAY_PADDING = 1;
class Chunk {
public:
Chunk() = default;
Chunk(int64_t row_nums, char* data, size_t size)
: row_nums_(row_nums), data_(data), size_(size) {
Chunk(int64_t row_nums, char* data, uint64_t size, bool nullable)
: row_nums_(row_nums), data_(data), size_(size), nullable_(nullable) {
if (nullable) {
valid_.reserve(row_nums);
for (int i = 0; i < row_nums; i++) {
valid_.push_back((data[i >> 3] >> (i & 0x07)) & 1);
}
}
}
virtual ~Chunk() {
munmap(data_, size_);
}

uint64_t
Size() const {
return size_;
}

int64_t
RowNums() const {
return row_nums_;
}

virtual const char*
ValueAt(int64_t idx) const = 0;

virtual const char*
Data() const {
return data_;
}

virtual bool
isValid(int offset) {
return valid_[offset];
};

protected:
char* data_;
int64_t row_nums_;
size_t size_;
uint64_t size_;
bool nullable_;
FixedVector<bool>
valid_; // parse null bitmap to valid_ to be compatible with SpanBase
};

// for fixed size data, includes fixed size array
template <typename T>
class FixedWidthChunk : public Chunk {
public:
FixedWidthChunk(int32_t row_nums, int32_t dim, char* data, size_t size)
: Chunk(row_nums, data, size), dim_(dim){};
FixedWidthChunk(int32_t row_nums,
int32_t dim,
char* data,
uint64_t size,
uint64_t element_size,
bool nullable)
: Chunk(row_nums, data, size, nullable),
dim_(dim),
element_size_(element_size){};

milvus::SpanBase
Span() const {
auto null_bitmap_bytes_num = (row_nums_ + 7) / 8;
return milvus::SpanBase(
data_ + null_bitmap_bytes_num, row_nums_, sizeof(T) * dim_);
return milvus::SpanBase(data_ + null_bitmap_bytes_num,
nullable_ ? valid_.data() : nullptr,
row_nums_,
element_size_ * dim_);
}

const char*
ValueAt(int64_t idx) const override {
auto null_bitmap_bytes_num = (row_nums_ + 7) / 8;
return data_ + null_bitmap_bytes_num + idx * element_size_ * dim_;
}

const char*
Data() const override {
auto null_bitmap_bytes_num = (row_nums_ + 7) / 8;
return data_ + null_bitmap_bytes_num;
}

private:
int dim_;
int element_size_;
};

class StringChunk : public Chunk {
public:
StringChunk() = default;
StringChunk(int32_t row_nums, char* data, size_t size)
: Chunk(row_nums, data, size) {
StringChunk(int32_t row_nums, char* data, uint64_t size, bool nullable)
: Chunk(row_nums, data, size, nullable) {
auto null_bitmap_bytes_num = (row_nums + 7) / 8;
offsets_ = reinterpret_cast<uint64_t*>(data + null_bitmap_bytes_num);
}

std::vector<std::string_view>
StringViews() const;
std::pair<std::vector<std::string_view>, FixedVector<bool>>
StringViews();

const char*
ValueAt(int64_t idx) const override {
PanicInfo(ErrorCode::Unsupported,
"StringChunk::ValueAt is not supported");
}

uint64_t*
Offsets() {
return offsets_;
}

protected:
uint64_t* offsets_;
Expand All @@ -86,63 +152,83 @@ class ArrayChunk : public Chunk {
public:
ArrayChunk(int32_t row_nums,
char* data,
size_t size,
milvus::DataType element_type)
: Chunk(row_nums, data, size), element_type_(element_type) {
uint64_t size,
milvus::DataType element_type,
bool nullable)
: Chunk(row_nums, data, size, nullable), element_type_(element_type) {
auto null_bitmap_bytes_num = (row_nums + 7) / 8;
offsets_ = reinterpret_cast<uint64_t*>(data + null_bitmap_bytes_num);
lens_ = offsets_ + row_nums;
offsets_lens_ =
reinterpret_cast<uint64_t*>(data + null_bitmap_bytes_num);
ConstructViews();
}

SpanBase
Span() const;

ArrayView
View(int64_t idx) const {
return views_[idx];
}

void
ConstructViews();

const char*
ValueAt(int64_t idx) const override {
PanicInfo(ErrorCode::Unsupported,
"ArrayChunk::ValueAt is not supported");
}

private:
milvus::DataType element_type_;
uint64_t* offsets_;
uint64_t* lens_;
uint64_t* offsets_lens_;
std::vector<ArrayView> views_;
};

class SparseFloatVectorChunk : public Chunk {
public:
SparseFloatVectorChunk(int32_t row_nums, char* data, size_t size)
: Chunk(row_nums, data, size) {
SparseFloatVectorChunk(int32_t row_nums,
char* data,
uint64_t size,
bool nullable)
: Chunk(row_nums, data, size, nullable) {
vec_.resize(row_nums);
auto null_bitmap_bytes_num = (row_nums + 7) / 8;
auto offsets_ptr =
reinterpret_cast<uint64_t*>(data + null_bitmap_bytes_num);
for (int i = 0; i < row_nums; i++) {
int vec_size = 0;
if (i == row_nums - 1) {
vec_size = size - offsets_ptr[i];
} else {
vec_size = offsets_ptr[i + 1] - offsets_ptr[i];
}

vec_[i] = {
vec_size / knowhere::sparse::SparseRow<float>::element_size(),
(uint8_t*)(data + offsets_ptr[i]),
false};
vec_[i] = {(offsets_ptr[i + 1] - offsets_ptr[i]) /
knowhere::sparse::SparseRow<float>::element_size(),
(uint8_t*)(data + offsets_ptr[i]),
false};
dim_ = std::max(dim_, vec_[i].dim());
}
}

const char*
Data() const {
Data() const override {
return static_cast<const char*>(static_cast<const void*>(vec_.data()));
}

const char*
ValueAt(int64_t i) const override {
return static_cast<const char*>(
static_cast<const void*>(vec_.data() + i));
}

// only for test
std::vector<knowhere::sparse::SparseRow<float>>&
Vec() {
return vec_;
}

int64_t
Dim() {
return dim_;
}

private:
int64_t dim_ = 0;
std::vector<knowhere::sparse::SparseRow<float>> vec_;
};
} // namespace milvus
Loading

0 comments on commit a75bb85

Please sign in to comment.