Skip to content

Commit

Permalink
enhance: optimize some cache to reduce memory usage (milvus-io#33534)
Browse files Browse the repository at this point in the history
milvus-io#33533

Signed-off-by: luzhang <[email protected]>
Co-authored-by: luzhang <[email protected]>
  • Loading branch information
zhagnlu and luzhang authored Jun 4, 2024
1 parent 2422084 commit c6f8a73
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 81 deletions.
7 changes: 4 additions & 3 deletions internal/core/src/mmap/Column.h
Original file line number Diff line number Diff line change
Expand Up @@ -459,9 +459,7 @@ class VariableColumn : public ColumnBase {

// Returns the raw string payload at row `i` without copying.
// After ConstructViews() runs, `views_[i]` already stores the
// (pointer, length) view into the mmap'd column data, so we no longer
// need `indices_` to recompute the element length on every access
// (the previous implementation derived the length from neighboring
// offsets in `indices_`, which had to be kept resident in memory).
// NOTE(review): `i` is assumed to be a valid row offset within
// `views_` — no bounds check is performed here; confirm callers
// guarantee this.
std::string_view
RawAt(const int i) const {
    return std::string_view(views_[i]);
}

void
Expand Down Expand Up @@ -502,6 +500,9 @@ class VariableColumn : public ColumnBase {
}

ConstructViews();

// indices_ is no longer needed once the views have been constructed
indices_.clear();
}

protected:
Expand Down
13 changes: 4 additions & 9 deletions internal/core/src/segcore/InsertRecord.h
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,8 @@ class OffsetOrderedArray : public OffsetMap {
PanicInfo(Unsupported,
"OffsetOrderedArray could not insert after seal");
}
array_.push_back(std::make_pair(std::get<T>(pk), offset));
array_.push_back(
std::make_pair(std::get<T>(pk), static_cast<int32_t>(offset)));
}

void
Expand Down Expand Up @@ -285,13 +286,13 @@ class OffsetOrderedArray : public OffsetMap {

private:
bool is_sealed = false;
std::vector<std::pair<T, int64_t>> array_;
std::vector<std::pair<T, int32_t>> array_;
};

template <bool is_sealed = false>
struct InsertRecord {
InsertRecord(const Schema& schema, int64_t size_per_chunk)
: row_ids_(size_per_chunk), timestamps_(size_per_chunk) {
: timestamps_(size_per_chunk) {
std::optional<FieldId> pk_field_id = schema.get_primary_field_id();

for (auto& field : schema) {
Expand Down Expand Up @@ -590,10 +591,8 @@ struct InsertRecord {
// Resets the insert record to an empty state, releasing per-row data.
// `row_ids_` and `timestamp_index_` are no longer members of this
// struct (removed in this change to cut memory usage), so they are
// intentionally not cleared here.
void
clear() {
    timestamps_.clear();
    // Rewind the preInsert reservation counter for growing segments.
    reserved = 0;
    ack_responder_.clear();
    // Drop the pk -> row-offset mapping.
    pk2offset_->clear();
    fields_data_.clear();
}
Expand All @@ -605,15 +604,11 @@ struct InsertRecord {

public:
ConcurrentVector<Timestamp> timestamps_;
ConcurrentVector<idx_t> row_ids_;

// used for preInsert of growing segment
std::atomic<int64_t> reserved = 0;
AckResponder ack_responder_;

// used for timestamps index of sealed segment
TimestampIndex timestamp_index_;

// pks to row offset
std::unique_ptr<OffsetMap> pk2offset_;

Expand Down
9 changes: 2 additions & 7 deletions internal/core/src/segcore/SegmentGrowingImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ SegmentGrowingImpl::Insert(int64_t reserved_offset,
// step 3: fill into Segment.ConcurrentVector
insert_record_.timestamps_.set_data_raw(
reserved_offset, timestamps_raw, num_rows);
insert_record_.row_ids_.set_data_raw(reserved_offset, row_ids, num_rows);

// update the mem size of timestamps and row IDs
stats_.mem_size += num_rows * (sizeof(Timestamp) + sizeof(idx_t));
Expand Down Expand Up @@ -224,7 +223,6 @@ SegmentGrowingImpl::LoadFieldData(const LoadFieldDataInfo& infos) {
}

if (field_id == RowFieldID) {
insert_record_.row_ids_.set_data_raw(reserved_offset, field_data);
continue;
}

Expand Down Expand Up @@ -313,7 +311,6 @@ SegmentGrowingImpl::LoadFieldDataV2(const LoadFieldDataInfo& infos) {
}

if (field_id == RowFieldID) {
insert_record_.row_ids_.set_data_raw(reserved_offset, field_data);
continue;
}

Expand Down Expand Up @@ -766,10 +763,8 @@ SegmentGrowingImpl::bulk_subscript(SystemFieldType system_type,
static_cast<Timestamp*>(output));
break;
case SystemFieldType::RowId:
bulk_subscript_impl<int64_t>(&this->insert_record_.row_ids_,
seg_offsets,
count,
static_cast<int64_t*>(output));
PanicInfo(ErrorCode::Unsupported,
"RowId retrieve is not supported");
break;
default:
PanicInfo(DataTypeInvalid, "unknown subscript fields");
Expand Down
1 change: 1 addition & 0 deletions internal/core/src/segcore/SegmentInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ class SegmentInternalInterface : public SegmentInterface {
virtual int64_t
num_chunk_data(FieldId field_id) const = 0;

// In the output bitset, a bit value of 1 means the row is filtered out (not hit); 0 means the row is a hit.
virtual void
mask_with_timestamps(BitsetType& bitset_chunk,
Timestamp timestamp) const = 0;
Expand Down
75 changes: 13 additions & 62 deletions internal/core/src/segcore/SegmentSealedImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <string_view>
#include <unordered_map>
#include <vector>
#include <boost/iterator/counting_iterator.hpp>

#include "Utils.h"
#include "Types.h"
Expand Down Expand Up @@ -348,35 +349,15 @@ SegmentSealedImpl::LoadFieldData(FieldId field_id, FieldDataInfo& data) {
offset += row_count;
}

TimestampIndex index;
auto min_slice_length = num_rows < 4096 ? 1 : 4096;
auto meta = GenerateFakeSlices(
timestamps.data(), num_rows, min_slice_length);
index.set_length_meta(std::move(meta));
// todo ::opt to avoid copy timestamps from field data
index.build_with(timestamps.data(), num_rows);

// use special index
std::unique_lock lck(mutex_);
AssertInfo(insert_record_.timestamps_.empty(), "already exists");
insert_record_.timestamps_.fill_chunk_data(field_data);
insert_record_.timestamp_index_ = std::move(index);
AssertInfo(insert_record_.timestamps_.num_chunk() == 1,
"num chunk not equal to 1 for sealed segment");
stats_.mem_size += sizeof(Timestamp) * data.row_count;
} else {
AssertInfo(system_field_type == SystemFieldType::RowId,
"System field type of id column is not RowId");

auto field_data = storage::CollectFieldDataChannel(data.channel);

// write data under lock
std::unique_lock lck(mutex_);
AssertInfo(insert_record_.row_ids_.empty(), "already exists");
insert_record_.row_ids_.fill_chunk_data(field_data);
AssertInfo(insert_record_.row_ids_.num_chunk() == 1,
"num chunk not equal to 1 for sealed segment");
stats_.mem_size += sizeof(idx_t) * data.row_count;
}
++system_ready_count_;
} else {
Expand Down Expand Up @@ -925,9 +906,7 @@ SegmentSealedImpl::DropFieldData(const FieldId field_id) {

std::unique_lock lck(mutex_);
--system_ready_count_;
if (system_field_type == SystemFieldType::RowId) {
insert_record_.row_ids_.clear();
} else if (system_field_type == SystemFieldType::Timestamp) {
if (system_field_type == SystemFieldType::Timestamp) {
insert_record_.timestamps_.clear();
}
lck.unlock();
Expand Down Expand Up @@ -1042,13 +1021,7 @@ SegmentSealedImpl::bulk_subscript(SystemFieldType system_type,
static_cast<Timestamp*>(output));
break;
case SystemFieldType::RowId:
AssertInfo(insert_record_.row_ids_.num_chunk() == 1,
"num chunk of rowID not equal to 1 for sealed segment");
bulk_subscript_impl<int64_t>(
this->insert_record_.row_ids_.get_chunk_data(0),
seg_offsets,
count,
static_cast<int64_t*>(output));
PanicInfo(ErrorCode::Unsupported, "RowId retrieve not supported");
break;
default:
PanicInfo(DataTypeInvalid,
Expand Down Expand Up @@ -1512,12 +1485,6 @@ SegmentSealedImpl::debug() const {
// Previously this populated timestamp_index_ from the segment meta's
// per-slice row counts; that member has been removed, so loading
// segment meta is now explicitly unsupported.
// @throws PanicInfo(NotImplemented) unconditionally.
void
SegmentSealedImpl::LoadSegmentMeta(
    const proto::segcore::LoadSegmentMeta& segment_meta) {
    PanicInfo(NotImplemented, "unimplemented");
}

Expand All @@ -1529,33 +1496,17 @@ SegmentSealedImpl::get_active_count(Timestamp ts) const {

// Marks as filtered-out (bit = 1) every row whose timestamp is
// strictly greater than `ts`; rows with timestamp <= `ts` are left
// untouched (bit stays 0, i.e. "hit").
//
// Timestamps in a sealed segment are stored in ascending order, so a
// binary search over row indices (via counting iterators) finds the
// first row whose timestamp exceeds `ts`; every row from there to the
// end is masked. This replaces the former TimestampIndex-based path,
// letting the index structure be dropped to save memory.
//
// @param bitset_chunk  per-row filter bitset sized to the segment's
//                      row count; only bits for filtered rows are set.
// @param ts            query timestamp upper bound (inclusive).
void
SegmentSealedImpl::mask_with_timestamps(BitsetType& bitset_chunk,
                                        Timestamp ts) const {
    auto row_count = this->get_row_count();
    auto& ts_vec = this->insert_record_.timestamps_;
    // upper_bound over [0, row_count) row indices: the comparator
    // treats ts_vec[index] as the ordered key, so *iter is the first
    // row with timestamp > ts (== row_count when none exceeds ts).
    auto iter = std::upper_bound(
        boost::make_counting_iterator(static_cast<int64_t>(0)),
        boost::make_counting_iterator(row_count),
        ts,
        [&](Timestamp ts, int64_t index) { return ts < ts_vec[index]; });
    // Everything at or after the boundary is newer than `ts`: mask it.
    for (int64_t i = *iter; i < row_count; ++i) {
        bitset_chunk.set(i);
    }
}

bool
Expand Down

0 comments on commit c6f8a73

Please sign in to comment.