Skip to content

Commit

Permalink
enhance: refine array view to optimize memory usage(#38736)
Browse files Browse the repository at this point in the history
Signed-off-by: MrPresent-Han <[email protected]>
  • Loading branch information
MrPresent-Han committed Dec 31, 2024
1 parent 18a3bc7 commit 76efe5b
Show file tree
Hide file tree
Showing 12 changed files with 149 additions and 198 deletions.
143 changes: 71 additions & 72 deletions internal/core/src/common/Array.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,27 @@ class Array {

~Array() {
delete[] data_;
if (offsets_ptr_) {
// only deallocate offsets for string type array
delete[] offsets_ptr_;
}
}

Array(char* data,
int len,
size_t size,
DataType element_type,
const uint32_t* offsets_ptr)
: size_(size), length_(len), element_type_(element_type) {
data_ = new char[size];
std::copy(data, data + size, data_);
if (IsVariableDataType(element_type)) {
AssertInfo(offsets_ptr != nullptr,
"For variable type elements in array, offsets_ptr must "
"be non-null");
offsets_ptr_ = new uint32_t[len];
std::copy(offsets_ptr, offsets_ptr + len, offsets_ptr_);
}
}

explicit Array(const ScalarArray& field_data) {
Expand Down Expand Up @@ -97,17 +118,19 @@ class Array {
case ScalarArray::kStringData: {
element_type_ = DataType::STRING;
length_ = field_data.string_data().data().size();
offsets_.reserve(length_);
offsets_ptr_ = new uint32_t[length_];
for (int i = 0; i < length_; ++i) {
offsets_.push_back(size_);
size_ += field_data.string_data().data(i).size();
offsets_ptr_[i] = size_;
size_ +=
field_data.string_data()
.data(i)
.size(); //type risk here between uint32_t vs size_t
}

data_ = new char[size_];
for (int i = 0; i < length_; ++i) {
std::copy_n(field_data.string_data().data(i).data(),
field_data.string_data().data(i).size(),
data_ + offsets_[i]);
data_ + offsets_ptr_[i]);
}
break;
}
Expand All @@ -117,49 +140,33 @@ class Array {
}
}

Array(char* data,
size_t size,
DataType element_type,
std::vector<uint64_t>&& element_offsets)
: size_(size),
offsets_(std::move(element_offsets)),
element_type_(element_type) {
delete[] data_;
data_ = new char[size];
std::copy(data, data + size, data_);
if (IsVariableDataType(element_type_)) {
length_ = offsets_.size();
} else {
// int8, int16, int32 are all promoted to int32
if (element_type_ == DataType::INT8 ||
element_type_ == DataType::INT16) {
length_ = size / sizeof(int32_t);
} else {
length_ = size / GetDataTypeSize(element_type_);
}
}
}

Array(const Array& array) noexcept
: length_{array.length_},
size_{array.size_},
element_type_{array.element_type_} {
delete[] data_;
data_ = new char[array.size_];
std::copy(array.data_, array.data_ + array.size_, data_);
offsets_ = array.offsets_;
if (IsVariableDataType(array.element_type_)) {
offsets_ptr_ = new uint32_t[length_];
std::copy_n(array.get_offsets_data(), array.length(), offsets_ptr_);
}
}

Array&
operator=(const Array& array) {
delete[] data_;

data_ = new char[array.size_];
std::copy(array.data_, array.data_ + array.size_, data_);
if (offsets_ptr_) {
delete[] offsets_ptr_;
}
length_ = array.length_;
size_ = array.size_;
offsets_ = array.offsets_;
element_type_ = array.element_type_;
data_ = new char[array.size_];
std::copy(array.data_, array.data_ + array.size_, data_);
if (IsVariableDataType(array.get_element_type())) {
offsets_ptr_ = new uint32_t[length_];
std::copy_n(array.get_offsets_data(), array.length(), offsets_ptr_);
}
return *this;
}

Expand Down Expand Up @@ -241,10 +248,11 @@ class Array {
length_);
if constexpr (std::is_same_v<T, std::string> ||
std::is_same_v<T, std::string_view>) {
size_t element_length = (index == length_ - 1)
? size_ - offsets_.back()
: offsets_[index + 1] - offsets_[index];
return T(data_ + offsets_[index], element_length);
size_t element_length =
(index == length_ - 1)
? size_ - offsets_ptr_[length_ - 1]
: offsets_ptr_[index + 1] - offsets_ptr_[index];
return T(data_ + offsets_ptr_[index], element_length);
}
if constexpr (std::is_same_v<T, int> || std::is_same_v<T, int64_t> ||
std::is_same_v<T, int8_t> || std::is_same_v<T, int16_t> ||
Expand Down Expand Up @@ -272,14 +280,9 @@ class Array {
return reinterpret_cast<T*>(data_)[index];
}

const std::vector<uint64_t>&
get_offsets() const {
return offsets_;
}

std::vector<uint64_t>
get_offsets_in_copy() const {
return offsets_;
const uint32_t*
get_offsets_data() const {
return offsets_ptr_;
}

ScalarArray
Expand Down Expand Up @@ -435,33 +438,31 @@ class Array {
private:
char* data_{nullptr};
int length_ = 0;
int size_ = 0;
std::vector<uint64_t> offsets_{};
uint32_t size_ = 0;
DataType element_type_ = DataType::NONE;
uint32_t* offsets_ptr_{nullptr};
};

class ArrayView {
public:
ArrayView() = default;

ArrayView(char* data,
int len,
size_t size,
DataType element_type,
std::vector<uint64_t>&& element_offsets)
: size_(size),
offsets_(std::move(element_offsets)),
element_type_(element_type) {
data_ = data;
const uint32_t* offsets_ptr)
: data_(data),
length_(len),
size_(size),
element_type_(element_type),
offsets_ptr_(offsets_ptr) {
AssertInfo(data != nullptr,
"data pointer for ArrayView cannot be nullptr");
if (IsVariableDataType(element_type_)) {
length_ = offsets_.size();
} else {
// int8, int16, int32 are all promoted to int32
if (element_type_ == DataType::INT8 ||
element_type_ == DataType::INT16) {
length_ = size / sizeof(int32_t);
} else {
length_ = size / GetDataTypeSize(element_type_);
}
AssertInfo(offsets_ptr != nullptr,
"for variable data type, offsets_ptr for array view "
"must not be nullptr");
}
}

Expand All @@ -475,10 +476,11 @@ class ArrayView {

if constexpr (std::is_same_v<T, std::string> ||
std::is_same_v<T, std::string_view>) {
size_t element_length = (index == length_ - 1)
? size_ - offsets_.back()
: offsets_[index + 1] - offsets_[index];
return T(data_ + offsets_[index], element_length);
size_t element_length =
(index == length_ - 1)
? size_ - offsets_ptr_[length_ - 1]
: offsets_ptr_[index + 1] - offsets_ptr_[index];
return T(data_ + offsets_ptr_[index], element_length);
}
if constexpr (std::is_same_v<T, int> || std::is_same_v<T, int64_t> ||
std::is_same_v<T, float> || std::is_same_v<T, double>) {
Expand Down Expand Up @@ -580,11 +582,6 @@ class ArrayView {
data() const {
return data_;
}
// copy to result
std::vector<uint64_t>
get_offsets_in_copy() const {
return offsets_;
}

bool
is_same_array(const proto::plan::Array& arr2) const {
Expand Down Expand Up @@ -661,8 +658,10 @@ class ArrayView {
char* data_{nullptr};
int length_ = 0;
int size_ = 0;
std::vector<uint64_t> offsets_{};
DataType element_type_ = DataType::NONE;

//offsets ptr
const uint32_t* offsets_ptr_;
};

} // namespace milvus
19 changes: 8 additions & 11 deletions internal/core/src/common/Chunk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,21 +51,18 @@ ArrayChunk::ConstructViews() {
int offset = offsets_lens_[2 * i];
int next_offset = offsets_lens_[2 * (i + 1)];
int len = offsets_lens_[2 * i + 1];

auto data_ptr = data_ + offset;
auto offsets_len = 0;
std::vector<uint64_t> element_indices = {};
auto offsets_bytes_len = 0;
uint32_t* offsets_ptr = nullptr;
if (IsStringDataType(element_type_)) {
offsets_len = len * sizeof(uint64_t);
std::vector<uint64_t> tmp(
reinterpret_cast<uint64_t*>(data_ptr),
reinterpret_cast<uint64_t*>(data_ptr + offsets_len));
element_indices = std::move(tmp);
offsets_bytes_len = len * sizeof(uint32_t);
offsets_ptr = reinterpret_cast<uint32_t*>(data_ptr);
}
views_.emplace_back(data_ptr + offsets_len,
next_offset - offset - offsets_len,
views_.emplace_back(data_ptr + offsets_bytes_len,
len,
next_offset - offset - offsets_bytes_len,
element_type_,
std::move(element_indices));
offsets_ptr);
}
}

Expand Down
28 changes: 14 additions & 14 deletions internal/core/src/common/ChunkWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -205,32 +205,32 @@ ArrayChunkWriter::write(std::shared_ptr<arrow::RecordBatchReader> data) {

int offsets_num = row_nums_ + 1;
int len_num = row_nums_;
int offset_start_pos =
target_->tell() + sizeof(uint64_t) * (offsets_num + len_num);
std::vector<uint64_t> offsets;
std::vector<uint64_t> lens;
for (auto& arr : arrays) {
offsets.push_back(offset_start_pos);
lens.push_back(arr.length());
offset_start_pos +=
is_string ? sizeof(uint64_t) * arr.get_offsets().size() : 0;
uint32_t offset_start_pos =
target_->tell() + sizeof(uint32_t) * (offsets_num + len_num);
std::vector<uint32_t> offsets(offsets_num);
std::vector<uint32_t> lens(len_num);
for (auto i = 0; i < arrays.size(); i++) {
auto& arr = arrays[i];
offsets[i] = offset_start_pos;
lens[i] = arr.length();
offset_start_pos += is_string ? sizeof(uint32_t) * lens[i] : 0;
offset_start_pos += arr.byte_size();
}
offsets.push_back(offset_start_pos);

for (int i = 0; i < offsets.size(); i++) {
if (i == offsets.size() - 1) {
target_->write(&offsets[i], sizeof(uint64_t));
target_->write(&offsets[i], sizeof(uint32_t));
break;
}
target_->write(&offsets[i], sizeof(uint64_t));
target_->write(&lens[i], sizeof(uint64_t));
target_->write(&offsets[i], sizeof(uint32_t));
target_->write(&lens[i], sizeof(uint32_t));
}

for (auto& arr : arrays) {
if (is_string) {
target_->write(arr.get_offsets().data(),
arr.get_offsets().size() * sizeof(uint64_t));
target_->write(arr.get_offsets_data(),
arr.length() * sizeof(uint32_t));
}
target_->write(arr.data(), arr.byte_size());
}
Expand Down
3 changes: 2 additions & 1 deletion internal/core/src/mmap/ChunkData.h
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,10 @@ VariableLengthChunk<Array>::set(const Array* src,
char* data_ptr = buf + offset;
std::copy(src[i].data(), src[i].data() + src[i].byte_size(), data_ptr);
data_[i + begin] = ArrayView(data_ptr,
src[i].length(),
data_size,
src[i].get_element_type(),
src[i].get_offsets_in_copy());
src[i].get_offsets_data());
offset += data_size;
}
}
Expand Down
3 changes: 2 additions & 1 deletion internal/core/src/mmap/ChunkVector.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,10 @@ class ThreadSafeChunkVector : public ChunkVectorBase<Type> {
} else if constexpr (std::is_same_v<Array, Type>) {
auto& src = chunk[chunk_offset];
return ArrayView(const_cast<char*>(src.data()),
src.length(),
src.byte_size(),
src.get_element_type(),
src.get_offsets_in_copy());
src.get_offsets_data());
} else {
return chunk[chunk_offset];
}
Expand Down
15 changes: 10 additions & 5 deletions internal/core/src/mmap/Column.h
Original file line number Diff line number Diff line change
Expand Up @@ -918,7 +918,10 @@ class SingleChunkArrayColumn : public SingleChunkColumnBase {
void
Append(const Array& array, bool valid_data = false) {
indices_.emplace_back(data_size_);
element_indices_.emplace_back(array.get_offsets());
element_indices_.emplace_back(
array.get_offsets_data(),
array.get_offsets_data() + array.length());
// have to copy element offsets from external array
if (nullable_) {
return SingleChunkColumnBase::Append(
static_cast<const char*>(array.data()),
Expand All @@ -931,7 +934,7 @@ class SingleChunkArrayColumn : public SingleChunkColumnBase {

void
Seal(std::vector<uint64_t>&& indices = {},
std::vector<std::vector<uint64_t>>&& element_indices = {}) {
std::vector<std::vector<uint32_t>>&& element_indices = {}) {
if (!indices.empty()) {
indices_ = std::move(indices);
element_indices_ = std::move(element_indices);
Expand All @@ -946,20 +949,22 @@ class SingleChunkArrayColumn : public SingleChunkColumnBase {
views_.reserve(indices_.size());
for (size_t i = 0; i < indices_.size() - 1; i++) {
views_.emplace_back(data_ + indices_[i],
element_indices_[i].size(),
indices_[i + 1] - indices_[i],
element_type_,
std::move(element_indices_[i]));
element_indices_[i].data());
}
views_.emplace_back(data_ + indices_.back(),
element_indices_[indices_.size() - 1].size(),
data_size_ - indices_.back(),
element_type_,
std::move(element_indices_[indices_.size() - 1]));
element_indices_[indices_.size() - 1].data());
element_indices_.clear();
}

private:
std::vector<uint64_t> indices_{};
std::vector<std::vector<uint64_t>> element_indices_{};
std::vector<std::vector<uint32_t>> element_indices_{};
// Compatible with current Span type
std::vector<ArrayView> views_{};
DataType element_type_;
Expand Down
Loading

0 comments on commit 76efe5b

Please sign in to comment.