Skip to content

Commit

Permalink
enhance: refine array view to optimize memory usage(#38736)
Browse files Browse the repository at this point in the history
Signed-off-by: MrPresent-Han <[email protected]>
  • Loading branch information
MrPresent-Han committed Jan 1, 2025
1 parent 18a3bc7 commit 1843325
Show file tree
Hide file tree
Showing 11 changed files with 182 additions and 141 deletions.
147 changes: 76 additions & 71 deletions internal/core/src/common/Array.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,27 @@ class Array {

~Array() {
delete[] data_;
if (offsets_ptr_) {
// only deallocate offsets for string type array
delete[] offsets_ptr_;
}
}

Array(char* data,
int len,
size_t size,
DataType element_type,
const uint32_t* offsets_ptr)
: size_(size), length_(len), element_type_(element_type) {
data_ = new char[size];
std::copy(data, data + size, data_);
if (IsVariableDataType(element_type)) {
AssertInfo(offsets_ptr != nullptr,
"For variable type elements in array, offsets_ptr must "
"be non-null");
offsets_ptr_ = new uint32_t[len];
std::copy(offsets_ptr, offsets_ptr + len, offsets_ptr_);
}
}

explicit Array(const ScalarArray& field_data) {
Expand Down Expand Up @@ -97,17 +118,19 @@ class Array {
case ScalarArray::kStringData: {
element_type_ = DataType::STRING;
length_ = field_data.string_data().data().size();
offsets_.reserve(length_);
offsets_ptr_ = new uint32_t[length_];
for (int i = 0; i < length_; ++i) {
offsets_.push_back(size_);
size_ += field_data.string_data().data(i).size();
offsets_ptr_[i] = size_;
size_ +=
field_data.string_data()
.data(i)
.size(); //type risk here between uint32_t vs size_t
}

data_ = new char[size_];
for (int i = 0; i < length_; ++i) {
std::copy_n(field_data.string_data().data(i).data(),
field_data.string_data().data(i).size(),
data_ + offsets_[i]);
data_ + offsets_ptr_[i]);
}
break;
}
Expand All @@ -117,49 +140,39 @@ class Array {
}
}

Array(char* data,
size_t size,
DataType element_type,
std::vector<uint64_t>&& element_offsets)
: size_(size),
offsets_(std::move(element_offsets)),
element_type_(element_type) {
delete[] data_;
data_ = new char[size];
std::copy(data, data + size, data_);
if (IsVariableDataType(element_type_)) {
length_ = offsets_.size();
} else {
// int8, int16, int32 are all promoted to int32
if (element_type_ == DataType::INT8 ||
element_type_ == DataType::INT16) {
length_ = size / sizeof(int32_t);
} else {
length_ = size / GetDataTypeSize(element_type_);
}
}
}

Array(const Array& array) noexcept
: length_{array.length_},
size_{array.size_},
element_type_{array.element_type_} {
delete[] data_;
data_ = new char[array.size_];
std::copy(array.data_, array.data_ + array.size_, data_);
offsets_ = array.offsets_;
if (IsVariableDataType(array.element_type_)) {
AssertInfo(array.get_offsets_data() != nullptr,
"for array with variable length elements, offsets_ptr"
"must not be nullptr");
offsets_ptr_ = new uint32_t[length_];
std::copy_n(array.get_offsets_data(), array.length(), offsets_ptr_);
}
}

Array&
operator=(const Array& array) {
delete[] data_;

data_ = new char[array.size_];
std::copy(array.data_, array.data_ + array.size_, data_);
if (offsets_ptr_) {
delete[] offsets_ptr_;
}
length_ = array.length_;
size_ = array.size_;
offsets_ = array.offsets_;
element_type_ = array.element_type_;
data_ = new char[array.size_];
std::copy(array.data_, array.data_ + array.size_, data_);
if (IsVariableDataType(array.get_element_type())) {
AssertInfo(array.get_offsets_data() != nullptr,
"for array with variable length elements, offsets_ptr"
"must not be nullptr");
offsets_ptr_ = new uint32_t[length_];
std::copy_n(array.get_offsets_data(), array.length(), offsets_ptr_);
}
return *this;
}

Expand Down Expand Up @@ -241,10 +254,11 @@ class Array {
length_);
if constexpr (std::is_same_v<T, std::string> ||
std::is_same_v<T, std::string_view>) {
size_t element_length = (index == length_ - 1)
? size_ - offsets_.back()
: offsets_[index + 1] - offsets_[index];
return T(data_ + offsets_[index], element_length);
size_t element_length =
(index == length_ - 1)
? size_ - offsets_ptr_[length_ - 1]
: offsets_ptr_[index + 1] - offsets_ptr_[index];
return T(data_ + offsets_ptr_[index], element_length);
}
if constexpr (std::is_same_v<T, int> || std::is_same_v<T, int64_t> ||
std::is_same_v<T, int8_t> || std::is_same_v<T, int16_t> ||
Expand Down Expand Up @@ -272,14 +286,9 @@ class Array {
return reinterpret_cast<T*>(data_)[index];
}

const std::vector<uint64_t>&
get_offsets() const {
return offsets_;
}

std::vector<uint64_t>
get_offsets_in_copy() const {
return offsets_;
uint32_t*
get_offsets_data() const {
return offsets_ptr_;
}

ScalarArray
Expand Down Expand Up @@ -436,32 +445,30 @@ class Array {
char* data_{nullptr};
int length_ = 0;
int size_ = 0;
std::vector<uint64_t> offsets_{};
DataType element_type_ = DataType::NONE;
uint32_t* offsets_ptr_{nullptr};
};

class ArrayView {
public:
ArrayView() = default;

ArrayView(char* data,
int len,
size_t size,
DataType element_type,
std::vector<uint64_t>&& element_offsets)
: size_(size),
offsets_(std::move(element_offsets)),
element_type_(element_type) {
data_ = data;
uint32_t* offsets_ptr)
: data_(data),
length_(len),
size_(size),
element_type_(element_type),
offsets_ptr_(offsets_ptr) {
AssertInfo(data != nullptr,
"data pointer for ArrayView cannot be nullptr");
if (IsVariableDataType(element_type_)) {
length_ = offsets_.size();
} else {
// int8, int16, int32 are all promoted to int32
if (element_type_ == DataType::INT8 ||
element_type_ == DataType::INT16) {
length_ = size / sizeof(int32_t);
} else {
length_ = size / GetDataTypeSize(element_type_);
}
AssertInfo(offsets_ptr != nullptr,
"for array with variable length elements, offsets_ptr "
"must not be nullptr");
}
}

Expand All @@ -475,10 +482,11 @@ class ArrayView {

if constexpr (std::is_same_v<T, std::string> ||
std::is_same_v<T, std::string_view>) {
size_t element_length = (index == length_ - 1)
? size_ - offsets_.back()
: offsets_[index + 1] - offsets_[index];
return T(data_ + offsets_[index], element_length);
size_t element_length =
(index == length_ - 1)
? size_ - offsets_ptr_[length_ - 1]
: offsets_ptr_[index + 1] - offsets_ptr_[index];
return T(data_ + offsets_ptr_[index], element_length);
}
if constexpr (std::is_same_v<T, int> || std::is_same_v<T, int64_t> ||
std::is_same_v<T, float> || std::is_same_v<T, double>) {
Expand Down Expand Up @@ -580,11 +588,6 @@ class ArrayView {
data() const {
return data_;
}
// copy to result
std::vector<uint64_t>
get_offsets_in_copy() const {
return offsets_;
}

bool
is_same_array(const proto::plan::Array& arr2) const {
Expand Down Expand Up @@ -661,8 +664,10 @@ class ArrayView {
char* data_{nullptr};
int length_ = 0;
int size_ = 0;
std::vector<uint64_t> offsets_{};
DataType element_type_ = DataType::NONE;

//offsets ptr
uint32_t* offsets_ptr_{nullptr};
};

} // namespace milvus
25 changes: 14 additions & 11 deletions internal/core/src/common/Chunk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "common/Span.h"
#include "common/Types.h"
#include "common/Chunk.h"
#include "log/Log.h"

namespace milvus {

Expand Down Expand Up @@ -51,22 +52,24 @@ ArrayChunk::ConstructViews() {
int offset = offsets_lens_[2 * i];
int next_offset = offsets_lens_[2 * (i + 1)];
int len = offsets_lens_[2 * i + 1];

auto data_ptr = data_ + offset;
auto offsets_len = 0;
std::vector<uint64_t> element_indices = {};
auto offsets_bytes_len = 0;
uint32_t* offsets_ptr = nullptr;
if (IsStringDataType(element_type_)) {
offsets_len = len * sizeof(uint64_t);
std::vector<uint64_t> tmp(
reinterpret_cast<uint64_t*>(data_ptr),
reinterpret_cast<uint64_t*>(data_ptr + offsets_len));
element_indices = std::move(tmp);
offsets_bytes_len = len * sizeof(uint32_t);
offsets_ptr = reinterpret_cast<uint32_t*>(data_ptr);
}
views_.emplace_back(data_ptr + offsets_len,
next_offset - offset - offsets_len,
views_.emplace_back(data_ptr + offsets_bytes_len,
len,
next_offset - offset - offsets_bytes_len,
element_type_,
std::move(element_indices));
offsets_ptr);
}
uint32_t view_size = 0;
for (auto& view : views_) {
view_size += sizeof(view);
}
LOG_INFO("hc===chunk_view_size:{}, row_num:{}", view_size, row_nums_);
}

SpanBase
Expand Down
28 changes: 14 additions & 14 deletions internal/core/src/common/ChunkWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -205,32 +205,32 @@ ArrayChunkWriter::write(std::shared_ptr<arrow::RecordBatchReader> data) {

int offsets_num = row_nums_ + 1;
int len_num = row_nums_;
int offset_start_pos =
target_->tell() + sizeof(uint64_t) * (offsets_num + len_num);
std::vector<uint64_t> offsets;
std::vector<uint64_t> lens;
for (auto& arr : arrays) {
offsets.push_back(offset_start_pos);
lens.push_back(arr.length());
offset_start_pos +=
is_string ? sizeof(uint64_t) * arr.get_offsets().size() : 0;
uint32_t offset_start_pos =
target_->tell() + sizeof(uint32_t) * (offsets_num + len_num);
std::vector<uint32_t> offsets(offsets_num);
std::vector<uint32_t> lens(len_num);
for (auto i = 0; i < arrays.size(); i++) {
auto& arr = arrays[i];
offsets[i] = offset_start_pos;
lens[i] = arr.length();
offset_start_pos += is_string ? sizeof(uint32_t) * lens[i] : 0;
offset_start_pos += arr.byte_size();
}
offsets.push_back(offset_start_pos);

for (int i = 0; i < offsets.size(); i++) {
if (i == offsets.size() - 1) {
target_->write(&offsets[i], sizeof(uint64_t));
target_->write(&offsets[i], sizeof(uint32_t));
break;
}
target_->write(&offsets[i], sizeof(uint64_t));
target_->write(&lens[i], sizeof(uint64_t));
target_->write(&offsets[i], sizeof(uint32_t));
target_->write(&lens[i], sizeof(uint32_t));
}

for (auto& arr : arrays) {
if (is_string) {
target_->write(arr.get_offsets().data(),
arr.get_offsets().size() * sizeof(uint64_t));
target_->write(arr.get_offsets_data(),
arr.length() * sizeof(uint32_t));
}
target_->write(arr.data(), arr.byte_size());
}
Expand Down
30 changes: 19 additions & 11 deletions internal/core/src/mmap/ChunkData.h
Original file line number Diff line number Diff line change
Expand Up @@ -191,21 +191,29 @@ VariableLengthChunk<Array>::set(const Array* src,
begin,
size_);
size_t total_size = 0;
size_t padding_size = 0;
for (auto i = 0; i < length; i++) {
total_size += src[i].byte_size() + padding_size;
total_size += src[i].byte_size();
}
auto buf = (char*)mcm->Allocate(mmap_descriptor_, total_size);
AssertInfo(buf != nullptr, "failed to allocate memory from mmap_manager.");
for (auto i = 0, offset = 0; i < length; i++) {
auto data_size = src[i].byte_size() + padding_size;
char* data_ptr = buf + offset;
std::copy(src[i].data(), src[i].data() + src[i].byte_size(), data_ptr);
data_[i + begin] = ArrayView(data_ptr,
data_size,
src[i].get_element_type(),
src[i].get_offsets_in_copy());
offset += data_size;
char* data_ptr = buf;
for (auto i = 0; i < length; i++) {
int length = src[i].length();
uint32_t* offsets_ptr = nullptr;
auto element_type = src[i].get_element_type();
// need copy offsets for variable types
if (IsVariableDataType(element_type)) {
std::copy(src[i].get_offsets_data(),
src[i].get_offsets_data() + length,
data_ptr);
offsets_ptr = reinterpret_cast<uint32_t*>(data_ptr);
data_ptr += length * sizeof(uint32_t);
}
auto data_size = src[i].byte_size();
std::copy(src[i].data(), src[i].data() + data_size, data_ptr);
data_[i + begin] =
ArrayView(data_ptr, length, data_size, element_type, offsets_ptr);
data_ptr += data_size;
}
}

Expand Down
3 changes: 2 additions & 1 deletion internal/core/src/mmap/ChunkVector.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,10 @@ class ThreadSafeChunkVector : public ChunkVectorBase<Type> {
} else if constexpr (std::is_same_v<Array, Type>) {
auto& src = chunk[chunk_offset];
return ArrayView(const_cast<char*>(src.data()),
src.length(),
src.byte_size(),
src.get_element_type(),
src.get_offsets_in_copy());
src.get_offsets_data());
} else {
return chunk[chunk_offset];
}
Expand Down
Loading

0 comments on commit 1843325

Please sign in to comment.