Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhance: mark duplicated pk as deleted #34586

Merged
merged 1 commit into from
Jul 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 70 additions & 94 deletions internal/core/src/segcore/DeletedRecord.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,106 +17,84 @@
#include <tuple>
#include <utility>
#include <vector>
#include <folly/ConcurrentSkipList.h>

#include "AckResponder.h"
#include "common/Schema.h"
#include "common/Types.h"
#include "segcore/Record.h"
#include "segcore/InsertRecord.h"
#include "ConcurrentVector.h"

namespace milvus::segcore {

struct DeletedRecord {
struct TmpBitmap {
// Just for query
int64_t del_barrier = 0;
BitsetTypePtr bitmap_ptr;

std::shared_ptr<TmpBitmap>
clone(int64_t capacity);
};
static constexpr int64_t deprecated_size_per_chunk = 32 * 1024;
DeletedRecord()
: lru_(std::make_shared<TmpBitmap>()),
timestamps_(deprecated_size_per_chunk),
pks_(deprecated_size_per_chunk) {
lru_->bitmap_ptr = std::make_shared<BitsetType>();
}

auto
get_lru_entry() {
std::shared_lock lck(shared_mutex_);
return lru_;
struct Comparator {
bool
operator()(const std::pair<Timestamp, std::set<int64_t>>& left,
const std::pair<Timestamp, std::set<int64_t>>& right) const {
return left.first < right.first;
}
};

std::shared_ptr<TmpBitmap>
clone_lru_entry(int64_t insert_barrier,
int64_t del_barrier,
int64_t& old_del_barrier,
bool& hit_cache) {
std::shared_lock lck(shared_mutex_);
auto res = lru_->clone(insert_barrier);
old_del_barrier = lru_->del_barrier;

if (lru_->bitmap_ptr->size() == insert_barrier &&
lru_->del_barrier == del_barrier) {
hit_cache = true;
} else {
res->del_barrier = del_barrier;
}
using TSkipList =
folly::ConcurrentSkipList<std::pair<Timestamp, std::set<int64_t>>,
Comparator>;

return res;
template <bool is_sealed = false>
class DeletedRecord {
public:
DeletedRecord(InsertRecord<is_sealed>* insert_record)
: insert_record_(insert_record),
deleted_pairs_(TSkipList::createInstance()) {
}

DeletedRecord(DeletedRecord<is_sealed>&& delete_record) = delete;
DeletedRecord<is_sealed>&
operator=(DeletedRecord<is_sealed>&& delete_record) = delete;

void
insert_lru_entry(std::shared_ptr<TmpBitmap> new_entry, bool force = false) {
std::lock_guard lck(shared_mutex_);
if (new_entry->del_barrier <= lru_->del_barrier) {
if (!force ||
new_entry->bitmap_ptr->size() <= lru_->bitmap_ptr->size()) {
// DO NOTHING
return;
Push(const std::vector<PkType>& pks, const Timestamp* timestamps) {
std::unique_lock<std::shared_mutex> lck(mutex_);
int64_t removed_num = 0;
int64_t mem_add = 0;
for (size_t i = 0; i < pks.size(); ++i) {
auto offsets = insert_record_->search_pk(pks[i], timestamps[i]);
for (auto offset : offsets) {
int64_t insert_row_offset = offset.get();
// Assert(insert_record->timestamps_.size() >= insert_row_offset);
if (insert_record_->timestamps_[insert_row_offset] <
timestamps[i]) {
InsertIntoInnerPairs(timestamps[i], {insert_row_offset});
removed_num++;
mem_add += sizeof(Timestamp) + sizeof(int64_t);
}
}
}
lru_ = std::move(new_entry);
n_.fetch_add(removed_num);
mem_size_.fetch_add(mem_add);
}

void
push(const std::vector<PkType>& pks, const Timestamp* timestamps) {
std::lock_guard lck(buffer_mutex_);

auto size = pks.size();
ssize_t divide_point = 0;
auto n = n_.load();
// Truncate the overlapping prefix
if (n > 0) {
auto last = timestamps_[n - 1];
divide_point =
std::lower_bound(timestamps, timestamps + size, last + 1) -
timestamps;
}

// All these delete records have been applied
if (divide_point == size) {
Query(BitsetType& bitset, int64_t insert_barrier, Timestamp timestamp) {
Assert(bitset.size() == insert_barrier);
// TODO: add cache to bitset
if (deleted_pairs_.size() == 0) {
return;
}
auto end = deleted_pairs_.lower_bound(
std::make_pair(timestamp, std::set<int64_t>{}));
for (auto it = deleted_pairs_.begin(); it != end; it++) {
for (auto& v : it->second) {
bitset.set(v);
}
}

size -= divide_point;
pks_.set_data_raw(n, pks.data() + divide_point, size);
timestamps_.set_data_raw(n, timestamps + divide_point, size);
n_ += size;
mem_size_ += sizeof(Timestamp) * size +
CalcPksSize(pks.data() + divide_point, size);
}

const ConcurrentVector<Timestamp>&
timestamps() const {
return timestamps_;
}

const ConcurrentVector<PkType>&
pks() const {
return pks_;
// handle the case where end points to an element with the same timestamp
if (end != deleted_pairs_.end() && end->first == timestamp) {
for (auto& v : end->second) {
bitset.set(v);
}
}
}

int64_t
Expand All @@ -130,26 +108,24 @@ struct DeletedRecord {
}

private:
std::shared_ptr<TmpBitmap> lru_;
std::shared_mutex shared_mutex_;
void
InsertIntoInnerPairs(Timestamp ts, std::set<int64_t> offsets) {
auto it = deleted_pairs_.find(std::make_pair(ts, std::set<int64_t>{}));
if (it == deleted_pairs_.end()) {
deleted_pairs_.insert(std::make_pair(ts, offsets));
} else {
for (auto& val : offsets) {
it->second.insert(val);
}
}
}

std::shared_mutex buffer_mutex_;
private:
std::shared_mutex mutex_;
std::atomic<int64_t> n_ = 0;
std::atomic<int64_t> mem_size_ = 0;
ConcurrentVector<Timestamp> timestamps_;
ConcurrentVector<PkType> pks_;
InsertRecord<is_sealed>* insert_record_;
TSkipList::Accessor deleted_pairs_;
};

inline auto
DeletedRecord::TmpBitmap::clone(int64_t capacity)
-> std::shared_ptr<TmpBitmap> {
auto res = std::make_shared<TmpBitmap>();
res->del_barrier = this->del_barrier;
// res->bitmap_ptr = std::make_shared<BitsetType>();
// *(res->bitmap_ptr) = *(this->bitmap_ptr);
res->bitmap_ptr = std::make_shared<BitsetType>(this->bitmap_ptr->clone());
res->bitmap_ptr->resize(capacity, false);
return res;
}

} // namespace milvus::segcore
134 changes: 134 additions & 0 deletions internal/core/src/segcore/InsertRecord.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@ class OffsetMap {

virtual void
clear() = 0;

virtual std::pair<std::vector<PkType>, std::vector<Timestamp>>
get_need_removed_pks(const ConcurrentVector<Timestamp>& timestamps) = 0;

virtual void
remove_duplicate_pks(const ConcurrentVector<Timestamp>& timestamps) = 0;
};

template <typename T>
Expand Down Expand Up @@ -96,6 +102,57 @@ class OffsetOrderedMap : public OffsetMap {
map_[std::get<T>(pk)].emplace_back(offset);
}

std::pair<std::vector<PkType>, std::vector<Timestamp>>
get_need_removed_pks(const ConcurrentVector<Timestamp>& timestamps) {
std::shared_lock<std::shared_mutex> lck(mtx_);
std::vector<PkType> remove_pks;
std::vector<Timestamp> remove_timestamps;

for (auto& [pk, offsets] : map_) {
if (offsets.size() > 1) {
// find max timestamp offset
int64_t max_timestamp_offset = 0;
for (auto& offset : offsets) {
if (timestamps[offset] > timestamps[max_timestamp_offset]) {
max_timestamp_offset = offset;
}
}

remove_pks.push_back(pk);
remove_timestamps.push_back(timestamps[max_timestamp_offset]);
}
}

return std::make_pair(remove_pks, remove_timestamps);
}

void
remove_duplicate_pks(
const ConcurrentVector<Timestamp>& timestamps) override {
std::unique_lock<std::shared_mutex> lck(mtx_);

for (auto& [pk, offsets] : map_) {
if (offsets.size() > 1) {
// find max timestamp offset
int64_t max_timestamp_offset = 0;
for (auto& offset : offsets) {
if (timestamps[offset] > timestamps[max_timestamp_offset]) {
max_timestamp_offset = offset;
}
}

// remove other offsets from pk index
offsets.erase(
std::remove_if(offsets.begin(),
offsets.end(),
[max_timestamp_offset](int64_t val) {
return val != max_timestamp_offset;
}),
offsets.end());
}
}
}

void
seal() override {
PanicInfo(
Expand Down Expand Up @@ -219,6 +276,63 @@ class OffsetOrderedArray : public OffsetMap {
std::make_pair(std::get<T>(pk), static_cast<int32_t>(offset)));
}

std::pair<std::vector<PkType>, std::vector<Timestamp>>
get_need_removed_pks(const ConcurrentVector<Timestamp>& timestamps) {
std::vector<PkType> remove_pks;
std::vector<Timestamp> remove_timestamps;

// cached pks(key, max_timestamp_offset)
std::unordered_map<T, int64_t> pks;
std::unordered_set<T> need_removed_pks;
for (auto it = array_.begin(); it != array_.end(); ++it) {
const T& key = it->first;
if (pks.find(key) == pks.end()) {
pks.insert({key, it->second});
} else {
need_removed_pks.insert(key);
if (timestamps[it->second] > timestamps[pks[key]]) {
pks[key] = it->second;
}
}
}

// return max_timestamps that removed pks
for (auto& pk : need_removed_pks) {
remove_pks.push_back(pk);
remove_timestamps.push_back(timestamps[pks[pk]]);
}
return std::make_pair(remove_pks, remove_timestamps);
}

void
remove_duplicate_pks(const ConcurrentVector<Timestamp>& timestamps) {
// cached pks(key, max_timestamp_offset)
std::unordered_map<T, int64_t> pks;
std::unordered_set<T> need_removed_pks;
for (auto it = array_.begin(); it != array_.end(); ++it) {
const T& key = it->first;
if (pks.find(key) == pks.end()) {
pks.insert({key, it->second});
} else {
need_removed_pks.insert(key);
if (timestamps[it->second] > timestamps[pks[key]]) {
pks[key] = it->second;
}
}
}

// remove duplicate pks
for (auto it = array_.begin(); it != array_.end();) {
const T& key = it->first;
auto max_offset = pks[key];
if (max_offset != it->second) {
it = array_.erase(it);
} else {
it++;
}
}
}

void
seal() override {
sort(array_.begin(), array_.end());
Expand Down Expand Up @@ -520,6 +634,26 @@ struct InsertRecord {
pk2offset_->insert(pk, offset);
}

bool
insert_with_check_existence(const PkType& pk, int64_t offset) {
std::lock_guard lck(shared_mutex_);
auto exist = pk2offset_->contain(pk);
pk2offset_->insert(pk, offset);
return exist;
}

std::pair<std::vector<PkType>, std::vector<Timestamp>>
get_need_removed_pks() {
std::lock_guard lck(shared_mutex_);
return pk2offset_->get_need_removed_pks(timestamps_);
}

void
remove_duplicate_pks() {
std::lock_guard lck(shared_mutex_);
pk2offset_->remove_duplicate_pks(timestamps_);
}

bool
empty_pks() const {
std::shared_lock lck(shared_mutex_);
Expand Down
2 changes: 2 additions & 0 deletions internal/core/src/segcore/SegmentGrowing.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ class SegmentGrowing : public SegmentInternalInterface {
return SegmentType::Growing;
}

virtual std::vector<SegOffset>
SearchPk(const PkType& pk, Timestamp ts) const = 0;
// virtual int64_t
// PreDelete(int64_t size) = 0;

Expand Down
Loading
Loading