Skip to content

Commit

Permalink
enhance: refactor delete mvcc function
Browse files Browse the repository at this point in the history
Signed-off-by: luzhang <[email protected]>
  • Loading branch information
luzhang committed Nov 28, 2024
1 parent e247ff9 commit 9d2606d
Show file tree
Hide file tree
Showing 17 changed files with 671 additions and 316 deletions.
37 changes: 5 additions & 32 deletions internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -700,7 +700,7 @@ ChunkedSegmentSealedImpl::LoadDeletedRecord(const LoadDeletedRecordInfo& info) {
sort_pks[i] = pk;
}

deleted_record_.push(sort_pks, sort_timestamps.data());
deleted_record_.LoadPush(sort_pks, sort_timestamps.data());
}

void
Expand Down Expand Up @@ -858,35 +858,7 @@ void
ChunkedSegmentSealedImpl::mask_with_delete(BitsetTypeView& bitset,
int64_t ins_barrier,
Timestamp timestamp) const {
auto del_barrier = get_barrier(get_deleted_record(), timestamp);
if (del_barrier == 0) {
return;
}

auto bitmap_holder = std::shared_ptr<DeletedRecord::TmpBitmap>();

auto search_fn = [this](const PkType& pk, int64_t barrier) {
return this->search_pk(pk, barrier);
};
bitmap_holder = get_deleted_bitmap(del_barrier,
ins_barrier,
deleted_record_,
insert_record_,
timestamp,
is_sorted_by_pk_,
search_fn);

if (!bitmap_holder || !bitmap_holder->bitmap_ptr) {
return;
}
auto& delete_bitset = *bitmap_holder->bitmap_ptr;
AssertInfo(
delete_bitset.size() == bitset.size(),
fmt::format(
"Deleted bitmap size:{} not equal to filtered bitmap size:{}",
delete_bitset.size(),
bitset.size()));
bitset |= delete_bitset;
deleted_record_.Query(bitset, ins_barrier, timestamp);
}

void
Expand Down Expand Up @@ -1337,7 +1309,8 @@ ChunkedSegmentSealedImpl::ChunkedSegmentSealedImpl(
id_(segment_id),
col_index_meta_(index_meta),
TEST_skip_index_for_retrieve_(TEST_skip_index_for_retrieve),
is_sorted_by_pk_(is_sorted_by_pk) {
is_sorted_by_pk_(is_sorted_by_pk),
deleted_record_(&insert_record_, this) {
mmap_descriptor_ = std::shared_ptr<storage::MmapChunkDescriptor>(
new storage::MmapChunkDescriptor({segment_id, SegmentType::Sealed}));
auto mcm = storage::MmapManager::GetInstance().GetMmapChunkManager();
Expand Down Expand Up @@ -1974,7 +1947,7 @@ ChunkedSegmentSealedImpl::Delete(int64_t reserved_offset, // deprecated
sort_pks[i] = pk;
}

deleted_record_.push(sort_pks, sort_timestamps.data());
deleted_record_.StreamPush(sort_pks, sort_timestamps.data());
return SegcoreError::success();
}

Expand Down
13 changes: 7 additions & 6 deletions internal/core/src/segcore/ChunkedSegmentSealedImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@ class ChunkedSegmentSealedImpl : public SegmentSealed {
return stats_.mem_size.load() + deleted_record_.mem_size();
}

InsertRecord<true>&
get_insert_record() override {
return insert_record_;
}

int64_t
get_row_count() const override;

Expand Down Expand Up @@ -293,6 +298,7 @@ class ChunkedSegmentSealedImpl : public SegmentSealed {
// } else {
num_rows_ = row_count;
// }
deleted_record_.set_sealed_row_count(row_count);
}

void
Expand All @@ -317,11 +323,6 @@ class ChunkedSegmentSealedImpl : public SegmentSealed {
return system_ready_count_ == 2;
}

const DeletedRecord&
get_deleted_record() const {
return deleted_record_;
}

std::pair<std::unique_ptr<IdArray>, std::vector<SegOffset>>
search_ids(const IdArray& id_array, Timestamp timestamp) const override;

Expand Down Expand Up @@ -362,7 +363,7 @@ class ChunkedSegmentSealedImpl : public SegmentSealed {
InsertRecord<true> insert_record_;

// deleted pks
mutable DeletedRecord deleted_record_;
mutable DeletedRecord<true> deleted_record_;

LoadFieldDataInfo field_data_info_;

Expand Down
220 changes: 220 additions & 0 deletions internal/core/src/segcore/DeletedRecord.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License

#include "segcore/DeletedRecord.h"
#include "segcore/SegmentInterface.h"

namespace milvus::segcore {

// Explicit instantiation definitions for the two specializations of
// DeletedRecord selected by the `is_sealed` template flag, so that the member
// functions defined in this translation unit can be linked from other files.
// NOTE(review): these instantiations appear BEFORE the member function
// definitions below. The C++ standard only requires an explicit instantiation
// definition to instantiate members already defined at that point, so some
// compilers may not emit LoadPush/StreamPush/InternalPush/DumpSnapshot/Query
// from here -- confirm on every supported toolchain, or move these two lines
// to the end of the file.
template class DeletedRecord<true>;
template class DeletedRecord<false>;

// Pushes a batch of delete records through InternalPush and then remembers
// the timestamp of the last entry of the sorted delete list in
// max_load_timestamp_ (StreamPush compares against it before dumping
// snapshots).
//
// @param pks         primary keys to delete
// @param timestamps  delete timestamps; timestamps[i] belongs to pks[i]
//
// InternalPush may legitimately insert nothing (empty batch, no row matches a
// pk, or every match is skipped), in which case the delete list can still be
// empty. That is not an error, so max_load_timestamp_ is simply left
// untouched instead of asserting.
template <bool is_sealed>
void
DeletedRecord<is_sealed>::LoadPush(const std::vector<PkType>& pks,
                                   const Timestamp* timestamps) {
    InternalPush(pks, timestamps);

    SortedDeleteList::Accessor accessor(deleted_lists_);
    auto* last = accessor.last();
    if (last == nullptr) {
        // nothing has been recorded yet, keep the previous value
        return;
    }
    max_load_timestamp_ = last->first;
}

// Pushes a batch of delete records through InternalPush and, when the first
// timestamp of the batch is not older than max_load_timestamp_, triggers
// DumpSnapshot().
//
// @param pks         primary keys to delete
// @param timestamps  delete timestamps; timestamps[i] belongs to pks[i]
//
// An empty batch is a no-op: there is nothing to insert and timestamps[0]
// must not be read because the array may have no elements.
template <bool is_sealed>
void
DeletedRecord<is_sealed>::StreamPush(const std::vector<PkType>& pks,
                                     const Timestamp* timestamps) {
    if (pks.empty()) {
        return;
    }

    InternalPush(pks, timestamps);

    const bool can_dump = timestamps[0] >= max_load_timestamp_;
    if (can_dump) {
        DumpSnapshot();
    }
}

// Shared implementation behind LoadPush/StreamPush.
//
// For every (pk, timestamp) pair the matching row offsets are looked up
// (through the owning segment when one is attached, otherwise directly in the
// insert record). Each matching row that is not yet marked in deleted_mask_
// and whose insert timestamp differs from the delete timestamp is added to
// the sorted delete list and marked in deleted_mask_. The counters n_ and
// mem_size_ are bumped once at the end.
//
// @param pks         primary keys to delete
// @param timestamps  delete timestamps; timestamps[i] belongs to pks[i]
template <bool is_sealed>
void
DeletedRecord<is_sealed>::InternalPush(const std::vector<PkType>& pks,
                                       const Timestamp* timestamps) {
    int64_t inserted_rows = 0;
    int64_t added_bytes = 0;

    SortedDeleteList::Accessor accessor(deleted_lists_);
    const size_t pk_count = pks.size();
    for (size_t idx = 0; idx != pk_count; ++idx) {
        auto pk = pks[idx];
        auto ts = timestamps[idx];

        std::vector<SegOffset> matched;
        if (!segment_) {
            // only for testing
            matched = std::move(insert_record_->search_pk(pk, ts));
        } else {
            matched = std::move(segment_->search_pk(pk, ts));
        }

        for (auto& seg_offset : matched) {
            auto row_id = seg_offset.get();

            // a row that is already deleted needs no additional record
            const bool already_masked =
                deleted_mask_.size() > row_id && deleted_mask_[row_id];
            // a delete carrying the same timestamp as the insert does not
            // take effect on that row
            if (already_masked || ts == insert_record_->timestamps_[row_id]) {
                continue;
            }

            accessor.insert(std::make_pair(ts, row_id));

            if constexpr (!is_sealed) {
                // growing segment: extend the mask to the current insert
                // record size before setting the bit
                deleted_mask_.resize(insert_record_->size());
            } else {
                Assert(deleted_mask_.size() > 0);
            }
            deleted_mask_.set(row_id);

            ++inserted_rows;
            added_bytes += DELETE_PAIR_SIZE;
        }
    }

    n_.fetch_add(inserted_rows);
    mem_size_.fetch_add(added_bytes);
}

// Materializes cumulative delete bitmaps ("snapshots") for every full batch
// of DUMP_BATCH_SIZE records in the sorted delete list that has not been
// dumped yet. Query() uses a snapshot to skip iterating over the records it
// already covers.
//
// Each snapshot is stored as (timestamp of the last record in the batch,
// bitmap) in snapshots_, and the iterator pointing to the first record after
// the batch is stored at the same index in snap_next_iter_. When a batch ends
// on the same timestamp as the previous snapshot, that snapshot is updated in
// place instead of appending a new one.
//
// Writes to snapshots_/snap_next_iter_ are done under an exclusive
// snap_lock_.
template <bool is_sealed>
void
DeletedRecord<is_sealed>::DumpSnapshot() {
    SortedDeleteList::Accessor accessor(deleted_lists_);
    auto total_size = accessor.size();
    auto dumped_size =
        snapshots_.empty() ? 0 : snapshots_.size() * DUMP_BATCH_SIZE;

    while (total_size - dumped_size > DUMP_BATCH_SIZE) {
        int32_t bitsize = 0;
        if constexpr (is_sealed) {
            bitsize = sealed_row_count_;
        } else {
            bitsize = insert_record_->size();
        }
        BitsetType bitmap(bitsize, false);

        auto it = accessor.begin();
        Timestamp last_dump_ts = 0;
        if (!snapshots_.empty()) {
            it = snap_next_iter_.back();
            last_dump_ts = snapshots_.back().first;
            // Snapshots are cumulative: start from the bits of the previous
            // snapshot. This has to be an OR -- the fresh bitmap is all
            // zeros, so an AND would always leave it empty and drop every
            // delete recorded by earlier snapshots.
            bitmap.inplace_or_with_count(snapshots_.back().second,
                                         snapshots_.back().second.size());
        }

        while (total_size - dumped_size > DUMP_BATCH_SIZE &&
               it != accessor.end()) {
            Timestamp dump_ts = 0;

            // Consume one batch; dump_ts ends up as the timestamp of the last
            // record of the batch.
            int batched = 0;
            for (; batched < DUMP_BATCH_SIZE && it != accessor.end();
                 ++it, ++batched) {
                bitmap.set(it->second);
                dump_ts = it->first;
            }
            if (batched < DUMP_BATCH_SIZE) {
                // Ran out of records before completing a batch (the size
                // estimate above can lag behind the stored cursor). Never
                // walk past the end of the list; leave the remainder for a
                // later call.
                return;
            }

            {
                std::unique_lock<std::shared_mutex> lock(snap_lock_);
                if (dump_ts == last_dump_ts) {
                    // same timestamp as the previous snapshot: only update
                    snapshots_.back().second = bitmap.clone();
                    snap_next_iter_.back() = it;
                } else {
                    // add new snapshot
                    snapshots_.push_back(
                        std::make_pair(dump_ts, bitmap.clone()));
                    Assert(it != accessor.end() && it.good());
                    snap_next_iter_.push_back(it);
                }

                LOG_INFO(
                    "dump delete record snapshot at ts: {}, cursor: {}, "
                    "current snapshot size: {}",
                    dump_ts,
                    dumped_size + DUMP_BATCH_SIZE,
                    snapshots_.size());
                last_dump_ts = dump_ts;
            }

            dumped_size += DUMP_BATCH_SIZE;
        }
    }
}

// Marks in `bitset` every row that has a delete record with a timestamp not
// newer than `query_timestamp`.
//
// @param bitset           result bitmap; a set bit means the row is filtered
//                         out. Its size must equal `insert_barrier`.
// @param insert_barrier   number of rows covered by `bitset`
// @param query_timestamp  only delete records with timestamp <=
//                         query_timestamp are applied
//
// The newest snapshot whose timestamp is <= query_timestamp (if any) is
// merged into `bitset` first; the remaining records are then applied by
// walking the sorted delete list from the position stored with that snapshot.
template <bool is_sealed>
void
DeletedRecord<is_sealed>::Query(BitsetTypeView& bitset,
                                int64_t insert_barrier,
                                Timestamp query_timestamp) {
    Assert(bitset.size() == insert_barrier);

    SortedDeleteList::Accessor accessor(deleted_lists_);
    if (accessor.size() == 0) {
        return;
    }

    // try use snapshot to skip iterations
    bool hit_snapshot = false;
    SortedDeleteList::iterator next_iter;
    {
        // snapshots_ is written under snap_lock_ in DumpSnapshot, so every
        // read of it (including size()) happens under the shared lock.
        std::shared_lock<std::shared_mutex> lock(snap_lock_);
        int loc = static_cast<int>(snapshots_.size()) - 1;
        // find the last snapshot that is not newer than the query; check the
        // index before using it so that loc == -1 is never dereferenced
        while (loc >= 0 && snapshots_[loc].first > query_timestamp) {
            loc--;
        }
        if (loc >= 0) {
            next_iter = snap_next_iter_[loc];
            Assert(snapshots_[loc].second.size() >= bitset.size());
            // Deleted rows are ADDED to the rows already filtered out, so the
            // snapshot is merged with OR. An AND would clear existing filter
            // bits and never mark the deleted rows.
            bitset.inplace_or_with_count(snapshots_[loc].second,
                                         bitset.size());
            hit_snapshot = true;
        }
    }

    // The delete list is ordered by timestamp, so walking while
    // timestamp <= query_timestamp covers both the records strictly older
    // than the query and those with exactly the query timestamp. Bounding the
    // walk by timestamp (instead of by a lower_bound iterator) also stays
    // correct when the snapshot cursor already lies past that iterator, which
    // would otherwise run to the end of the list and apply newer deletes.
    auto it = hit_snapshot ? next_iter : accessor.begin();
    while (it != accessor.end() && it->first <= query_timestamp) {
        // bitset has exactly insert_barrier bits, so valid rows are
        // [0, insert_barrier)
        AssertInfo(it->second < insert_barrier,
                   "delete record beyond insert barrier, {} : {}",
                   it->second,
                   insert_barrier);
        bitset.set(it->second);
        ++it;
    }
}

} // namespace milvus::segcore
Loading

0 comments on commit 9d2606d

Please sign in to comment.