Skip to content

Commit

Permalink
enhance: refactor mmap variable column to reduce memory cost
Browse files Browse the repository at this point in the history
Signed-off-by: luzhang <[email protected]>
  • Loading branch information
luzhang committed Jun 20, 2024
1 parent 2a13569 commit 79d267b
Show file tree
Hide file tree
Showing 17 changed files with 356 additions and 72 deletions.
8 changes: 8 additions & 0 deletions internal/core/src/common/Common.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,12 @@ SetCpuNum(const int core);
void
SetDefaultExecEvalExprBatchSize(int64_t val);

struct BufferView {
char* data_;
size_t size_;

BufferView(char* data_ptr, size_t size) : data_(data_ptr), size_(size) {
}
};

} // namespace milvus
1 change: 1 addition & 0 deletions internal/core/src/common/EasyAssert.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ enum ErrorCode {
MemAllocateFailed = 2034,
MemAllocateSizeNotMatch = 2035,
MmapError = 2036,
OutOfRange = 2037,
KnowhereError = 2100,

// timeout or cancel related.
Expand Down
17 changes: 15 additions & 2 deletions internal/core/src/common/File.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,19 +39,30 @@ class File {
"failed to create mmap file {}: {}",
filepath,
strerror(errno));
return File(fd);
return File(fd, std::string(filepath));
}

int
Descriptor() const {
return fd_;
}

std::string
Path() const {
return filepath_;
}

ssize_t
Write(const void* buf, size_t size) {
return write(fd_, buf, size);
}

template <typename T, std::enable_if_t<std::is_integral_v<T>, int> = 0>
ssize_t
WriteInt(T value) {
return write(fd_, &value, sizeof(value));
}

offset_t
Seek(offset_t offset, int whence) {
return lseek(fd_, offset, whence);
Expand All @@ -64,8 +75,10 @@ class File {
}

private:
explicit File(int fd) : fd_(fd) {
explicit File(int fd, const std::string& filepath)
: fd_(fd), filepath_(filepath) {
}
int fd_{-1};
std::string filepath_;
};
} // namespace milvus
2 changes: 1 addition & 1 deletion internal/core/src/exec/expression/CompareExpr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ PhyCompareFilterExpr::GetChunkData<std::string>(FieldId field_id,
return [chunk_data](int i) -> const number { return chunk_data[i]; };
} else {
auto chunk_data =
segment_->chunk_data<std::string_view>(field_id, chunk_id).data();
segment_->chunk_view<std::string_view>(field_id, chunk_id).data();
return [chunk_data](int i) -> const number {
return std::string(chunk_data[i]);
};
Expand Down
2 changes: 1 addition & 1 deletion internal/core/src/exec/expression/ExistsExpr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ PhyExistsFilterExpr::Eval(EvalCtx& context, VectorPtr& result) {
case DataType::JSON: {
if (is_index_mode_) {
PanicInfo(ExprInvalid,
"exists expr for json index mode not supportted");
"exists expr for json index mode not supported");
}
result = EvalJsonExistsForDataSegment();
break;
Expand Down
37 changes: 36 additions & 1 deletion internal/core/src/exec/expression/Expr.h
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,32 @@ class SegmentExpr : public Expr {
: batch_size_;
}

// used for processing raw data expr for sealed segments.
// now only used for std::string_view && json
// TODO: support more types
template <typename T, typename FUNC, typename... ValTypes>
int64_t
ProcessChunkForSealedSeg(
FUNC func,
std::function<bool(const milvus::SkipIndex&, FieldId, int)> skip_func,
TargetBitmapView res,
ValTypes... values) {
// For sealed segment, only single chunk
Assert(num_data_chunk_ == 1);
auto need_size =
std::min(active_count_ - current_data_chunk_pos_, batch_size_);

auto& skip_index = segment_->GetSkipIndex();
if (!skip_func || !skip_func(skip_index, field_id_, 0)) {
auto data_vec = segment_->get_batch_views<T>(
field_id_, 0, current_data_chunk_pos_, need_size);

func(data_vec.data(), need_size, res, values...);
}
current_data_chunk_pos_ += need_size;
return need_size;
}

template <typename T, typename FUNC, typename... ValTypes>
int64_t
ProcessDataChunks(
Expand All @@ -191,6 +217,15 @@ class SegmentExpr : public Expr {
TargetBitmapView res,
ValTypes... values) {
int64_t processed_size = 0;

if constexpr (std::is_same_v<T, std::string_view> ||
std::is_same_v<T, Json>) {
if (segment_->type() == SegmentType::Sealed) {
return ProcessChunkForSealedSeg<T>(
func, skip_func, res, values...);
}
}

for (size_t i = current_data_chunk_; i < num_data_chunk_; i++) {
auto data_pos =
(i == current_data_chunk_) ? current_data_chunk_pos_ : 0;
Expand Down Expand Up @@ -422,4 +457,4 @@ class ExprSet {
};

} //namespace exec
} // namespace milvus
} // namespace milvus
2 changes: 1 addition & 1 deletion internal/core/src/exec/expression/JsonContainsExpr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ PhyJsonContainsFilterExpr::Eval(EvalCtx& context, VectorPtr& result) {
if (is_index_mode_) {
PanicInfo(
ExprInvalid,
"exists expr for json or array index mode not supportted");
"exists expr for json or array index mode not supported");
}
result = EvalJsonContainsForDataSegment();
break;
Expand Down
124 changes: 98 additions & 26 deletions internal/core/src/mmap/Column.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <vector>

#include "common/Array.h"
#include "common/Common.h"
#include "common/EasyAssert.h"
#include "common/File.h"
#include "common/FieldMeta.h"
Expand All @@ -52,6 +53,8 @@ namespace milvus {
constexpr size_t STRING_PADDING = 1;
constexpr size_t ARRAY_PADDING = 1;

constexpr size_t BLOCK_SIZE = 8192;

class ColumnBase {
public:
enum MappingType {
Expand Down Expand Up @@ -223,6 +226,19 @@ class ColumnBase {
virtual SpanBase
Span() const = 0;

// used for sequential access for search
virtual BufferView
GetBatchBuffer(int64_t start_offset, int64_t length) {
PanicInfo(ErrorCode::Unsupported,
"GetBatchBuffer only supported for VariableColumn");
}

virtual std::vector<std::string_view>
StringViews() const {
PanicInfo(ErrorCode::Unsupported,
"StringViews only supported for VariableColumn");
}

virtual void
AppendBatch(const FieldDataPtr data) {
size_t required_size = size_ + data->Size();
Expand Down Expand Up @@ -548,40 +564,94 @@ class VariableColumn : public ColumnBase {
}

VariableColumn(VariableColumn&& column) noexcept
: ColumnBase(std::move(column)),
indices_(std::move(column.indices_)),
views_(std::move(column.views_)) {
: ColumnBase(std::move(column)), indices_(std::move(column.indices_)) {
}

~VariableColumn() override = default;

SpanBase
Span() const override {
return SpanBase(views_.data(), views_.size(), sizeof(ViewType));
PanicInfo(ErrorCode::NotImplemented,
"span() interface is not implemented for variable column");
}

std::vector<std::string_view>
StringViews() const override {
std::vector<std::string_view> res;
char* pos = data_;
for (size_t i = 0; i < num_rows_; ++i) {
uint32_t size;
size = *reinterpret_cast<uint32_t*>(pos);
pos += sizeof(uint32_t);
res.emplace_back(std::string_view(pos, size));
pos += size;
}
return res;
}

[[nodiscard]] const std::vector<ViewType>&
[[nodiscard]] std::vector<ViewType>
Views() const {
return views_;
std::vector<ViewType> res;
char* pos = data_;
for (size_t i = 0; i < num_rows_; ++i) {
uint32_t size;
size = *reinterpret_cast<uint32_t*>(pos);
pos += sizeof(uint32_t);
res.emplace_back(ViewType(pos, size));
pos += size;
}
return res;
}

BufferView
GetBatchBuffer(int64_t start_offset, int64_t length) override {
if (start_offset < 0 || start_offset > num_rows_ ||
start_offset + length > num_rows_) {
PanicInfo(ErrorCode::OutOfRange, "index out of range");
}

char* pos = data_ + indices_[start_offset / BLOCK_SIZE];
for (size_t j = 0; j < start_offset % BLOCK_SIZE; j++) {
uint32_t size;
size = *reinterpret_cast<uint32_t*>(pos);
pos += sizeof(uint32_t) + size;
}

return BufferView{pos, size_ - (pos - data_)};
}

ViewType
operator[](const int i) const {
return views_[i];
if (i < 0 || i > num_rows_) {
PanicInfo(ErrorCode::OutOfRange, "index out of range");
}
size_t batch_id = i / BLOCK_SIZE;
size_t offset = i % BLOCK_SIZE;

// located in batch start location
char* pos = data_ + indices_[batch_id];
for (size_t j = 0; j < offset; j++) {
uint32_t size;
size = *reinterpret_cast<uint32_t*>(pos);
pos += sizeof(uint32_t) + size;
}

uint32_t size;
size = *reinterpret_cast<uint32_t*>(pos);
return ViewType(pos + sizeof(uint32_t), size);
}

std::string_view
RawAt(const int i) const {
return std::string_view(views_[i]);
return std::string_view((*this)[i]);
}

void
Append(FieldDataPtr chunk) {
for (auto i = 0; i < chunk->get_num_rows(); i++) {
auto data = static_cast<const T*>(chunk->RawValue(i));

indices_.emplace_back(size_);
size_ += data->size();
auto data = static_cast<const T*>(chunk->RawValue(i));
size_ += sizeof(uint32_t) + data->size();
}
load_buf_.emplace(std::move(chunk));
}
Expand All @@ -604,40 +674,42 @@ class VariableColumn : public ColumnBase {
auto chunk = std::move(load_buf_.front());
load_buf_.pop();

// data_ as: |size|data|size|data......
for (auto i = 0; i < chunk->get_num_rows(); i++) {
auto current_size = (uint32_t)chunk->Size(i);
std::memcpy(data_ + size_, &current_size, sizeof(uint32_t));
size_ += sizeof(uint32_t);
auto data = static_cast<const T*>(chunk->RawValue(i));
std::copy_n(data->c_str(), data->size(), data_ + size_);
std::memcpy(data_ + size_, data->c_str(), data->size());
size_ += data->size();
}
}
}

ConstructViews();

// Not need indices_ after
indices_.clear();
std::vector<uint64_t>().swap(indices_);
shrink_indice();
}

protected:
void
ConstructViews() {
views_.reserve(indices_.size());
for (size_t i = 0; i < indices_.size() - 1; i++) {
views_.emplace_back(data_ + indices_[i],
indices_[i + 1] - indices_[i]);
shrink_indice() {
std::vector<uint64_t> tmp_indices;
tmp_indices.reserve((indices_.size() + BLOCK_SIZE - 1) / BLOCK_SIZE);

for (size_t i = 0; i < indices_.size();) {
tmp_indices.push_back(indices_[i]);
i += BLOCK_SIZE;
}
views_.emplace_back(data_ + indices_.back(), size_ - indices_.back());

indices_.swap(tmp_indices);
}

private:
// loading states
std::queue<FieldDataPtr> load_buf_{};

// raw data index, record indices located 0, interval, 2 * interval, 3 * interval
// ... just like page index, interval set to 8192 that matches search engine's batch size
std::vector<uint64_t> indices_{};

// Compatible with current Span type
std::vector<ViewType> views_{};
};

class ArrayColumn : public ColumnBase {
Expand Down
Loading

0 comments on commit 79d267b

Please sign in to comment.