Skip to content

Commit

Permalink
enhance: mark duplicated pk as deleted
Browse files Browse the repository at this point in the history
Signed-off-by: luzhang <[email protected]>
  • Loading branch information
luzhang committed Jul 15, 2024
1 parent d8e68cb commit 6b49755
Show file tree
Hide file tree
Showing 22 changed files with 560 additions and 374 deletions.
164 changes: 70 additions & 94 deletions internal/core/src/segcore/DeletedRecord.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,106 +17,84 @@
#include <tuple>
#include <utility>
#include <vector>
#include <folly/ConcurrentSkipList.h>

#include "AckResponder.h"
#include "common/Schema.h"
#include "common/Types.h"
#include "segcore/Record.h"
#include "segcore/InsertRecord.h"
#include "ConcurrentVector.h"

namespace milvus::segcore {

struct DeletedRecord {
struct TmpBitmap {
// Just for query
int64_t del_barrier = 0;
BitsetTypePtr bitmap_ptr;

std::shared_ptr<TmpBitmap>
clone(int64_t capacity);
};
static constexpr int64_t deprecated_size_per_chunk = 32 * 1024;
DeletedRecord()
: lru_(std::make_shared<TmpBitmap>()),
timestamps_(deprecated_size_per_chunk),
pks_(deprecated_size_per_chunk) {
lru_->bitmap_ptr = std::make_shared<BitsetType>();
}

auto
get_lru_entry() {
std::shared_lock lck(shared_mutex_);
return lru_;
struct Comparator {
bool
operator()(const std::pair<Timestamp, std::set<int64_t>>& left,
const std::pair<Timestamp, std::set<int64_t>>& right) const {
return left.first < right.first;
}
};

std::shared_ptr<TmpBitmap>
clone_lru_entry(int64_t insert_barrier,
int64_t del_barrier,
int64_t& old_del_barrier,
bool& hit_cache) {
std::shared_lock lck(shared_mutex_);
auto res = lru_->clone(insert_barrier);
old_del_barrier = lru_->del_barrier;

if (lru_->bitmap_ptr->size() == insert_barrier &&
lru_->del_barrier == del_barrier) {
hit_cache = true;
} else {
res->del_barrier = del_barrier;
}
using TSkipList =
folly::ConcurrentSkipList<std::pair<Timestamp, std::set<int64_t>>,
Comparator>;

return res;
template <bool is_sealed = false>
class DeletedRecord {
public:
DeletedRecord(InsertRecord<is_sealed>* insert_record)
: insert_record_(insert_record),
deleted_pairs_(TSkipList::createInstance()) {
}

DeletedRecord(DeletedRecord<is_sealed>&& delete_record) = delete;
DeletedRecord<is_sealed>&
operator=(DeletedRecord<is_sealed>&& delete_record) = delete;

void
insert_lru_entry(std::shared_ptr<TmpBitmap> new_entry, bool force = false) {
std::lock_guard lck(shared_mutex_);
if (new_entry->del_barrier <= lru_->del_barrier) {
if (!force ||
new_entry->bitmap_ptr->size() <= lru_->bitmap_ptr->size()) {
// DO NOTHING
return;
Push(const std::vector<PkType>& pks, const Timestamp* timestamps) {
std::unique_lock<std::shared_mutex> lck(mutex_);
int64_t removed_num = 0;
int64_t mem_add = 0;
for (size_t i = 0; i < pks.size(); ++i) {
auto offsets = insert_record_->search_pk(pks[i], timestamps[i]);
for (auto offset : offsets) {
int64_t insert_row_offset = offset.get();
// Assert(insert_record->timestamps_.size() >= insert_row_offset);
if (insert_record_->timestamps_[insert_row_offset] <
timestamps[i]) {
InsertIntoInnerPairs(timestamps[i], {insert_row_offset});
removed_num++;
mem_add += sizeof(Timestamp) + sizeof(int64_t);
}
}
}
lru_ = std::move(new_entry);
n_.fetch_add(removed_num);
mem_size_.fetch_add(mem_add);
}

void
push(const std::vector<PkType>& pks, const Timestamp* timestamps) {
std::lock_guard lck(buffer_mutex_);

auto size = pks.size();
ssize_t divide_point = 0;
auto n = n_.load();
// Truncate the overlapping prefix
if (n > 0) {
auto last = timestamps_[n - 1];
divide_point =
std::lower_bound(timestamps, timestamps + size, last + 1) -
timestamps;
}

// All these delete records have been applied
if (divide_point == size) {
Query(BitsetType& bitset, int64_t insert_barrier, Timestamp timestamp) {
Assert(bitset.size() == insert_barrier);
// TODO: add cache to bitset
if (deleted_pairs_.size() == 0) {
return;
}
auto end = deleted_pairs_.lower_bound(
std::make_pair(timestamp, std::set<int64_t>{}));
for (auto it = deleted_pairs_.begin(); it != end; it++) {
for (auto& v : it->second) {
bitset.set(v);
}
}

size -= divide_point;
pks_.set_data_raw(n, pks.data() + divide_point, size);
timestamps_.set_data_raw(n, timestamps + divide_point, size);
n_ += size;
mem_size_ += sizeof(Timestamp) * size +
CalcPksSize(pks.data() + divide_point, size);
}

const ConcurrentVector<Timestamp>&
timestamps() const {
return timestamps_;
}

const ConcurrentVector<PkType>&
pks() const {
return pks_;
// handle the case where end points to an element with the same timestamp
if (end != deleted_pairs_.end() && end->first == timestamp) {
for (auto& v : end->second) {
bitset.set(v);
}
}
}

int64_t
Expand All @@ -130,26 +108,24 @@ struct DeletedRecord {
}

private:
std::shared_ptr<TmpBitmap> lru_;
std::shared_mutex shared_mutex_;
void
InsertIntoInnerPairs(Timestamp ts, std::set<int64_t> offsets) {
auto it = deleted_pairs_.find(std::make_pair(ts, std::set<int64_t>{}));
if (it == deleted_pairs_.end()) {
deleted_pairs_.insert(std::make_pair(ts, offsets));
} else {
for (auto& val : offsets) {
it->second.insert(val);
}
}
}

std::shared_mutex buffer_mutex_;
private:
std::shared_mutex mutex_;
std::atomic<int64_t> n_ = 0;
std::atomic<int64_t> mem_size_ = 0;
ConcurrentVector<Timestamp> timestamps_;
ConcurrentVector<PkType> pks_;
InsertRecord<is_sealed>* insert_record_;
TSkipList::Accessor deleted_pairs_;
};

inline auto
DeletedRecord::TmpBitmap::clone(int64_t capacity)
-> std::shared_ptr<TmpBitmap> {
auto res = std::make_shared<TmpBitmap>();
res->del_barrier = this->del_barrier;
// res->bitmap_ptr = std::make_shared<BitsetType>();
// *(res->bitmap_ptr) = *(this->bitmap_ptr);
res->bitmap_ptr = std::make_shared<BitsetType>(this->bitmap_ptr->clone());
res->bitmap_ptr->resize(capacity, false);
return res;
}

} // namespace milvus::segcore
134 changes: 134 additions & 0 deletions internal/core/src/segcore/InsertRecord.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@ class OffsetMap {

virtual void
clear() = 0;

virtual std::pair<std::vector<PkType>, std::vector<Timestamp>>
get_need_removed_pks(const ConcurrentVector<Timestamp>& timestamps) = 0;

virtual void
remove_duplicate_pks(const ConcurrentVector<Timestamp>& timestamps) = 0;
};

template <typename T>
Expand Down Expand Up @@ -96,6 +102,57 @@ class OffsetOrderedMap : public OffsetMap {
map_[std::get<T>(pk)].emplace_back(offset);
}

std::pair<std::vector<PkType>, std::vector<Timestamp>>
get_need_removed_pks(const ConcurrentVector<Timestamp>& timestamps) {
std::shared_lock<std::shared_mutex> lck(mtx_);
std::vector<PkType> remove_pks;
std::vector<Timestamp> remove_timestamps;

for (auto& [pk, offsets] : map_) {
if (offsets.size() > 1) {
// find max timestamp offset
int64_t max_timestamp_offset = 0;
for (auto& offset : offsets) {
if (timestamps[offset] > timestamps[max_timestamp_offset]) {
max_timestamp_offset = offset;
}
}

remove_pks.push_back(pk);
remove_timestamps.push_back(timestamps[max_timestamp_offset]);
}
}

return std::make_pair(remove_pks, remove_timestamps);
}

void
remove_duplicate_pks(
const ConcurrentVector<Timestamp>& timestamps) override {
std::unique_lock<std::shared_mutex> lck(mtx_);

for (auto& [pk, offsets] : map_) {
if (offsets.size() > 1) {
// find max timestamp offset
int64_t max_timestamp_offset = 0;
for (auto& offset : offsets) {
if (timestamps[offset] > timestamps[max_timestamp_offset]) {
max_timestamp_offset = offset;
}
}

// remove other offsets from pk index
offsets.erase(
std::remove_if(offsets.begin(),
offsets.end(),
[max_timestamp_offset](int64_t val) {
return val != max_timestamp_offset;
}),
offsets.end());
}
}
}

void
seal() override {
PanicInfo(
Expand Down Expand Up @@ -219,6 +276,63 @@ class OffsetOrderedArray : public OffsetMap {
std::make_pair(std::get<T>(pk), static_cast<int32_t>(offset)));
}

std::pair<std::vector<PkType>, std::vector<Timestamp>>
get_need_removed_pks(const ConcurrentVector<Timestamp>& timestamps) {
std::vector<PkType> remove_pks;
std::vector<Timestamp> remove_timestamps;

// cached pks(key, max_timestamp_offset)
std::unordered_map<T, int64_t> pks;
std::unordered_set<T> need_removed_pks;
for (auto it = array_.begin(); it != array_.end(); ++it) {
const T& key = it->first;
if (pks.find(key) == pks.end()) {
pks.insert({key, it->second});
} else {
need_removed_pks.insert(key);
if (timestamps[it->second] > timestamps[pks[key]]) {
pks[key] = it->second;
}
}
}

// return max_timestamps that removed pks
for (auto& pk : need_removed_pks) {
remove_pks.push_back(pk);
remove_timestamps.push_back(timestamps[pks[pk]]);
}
return std::make_pair(remove_pks, remove_timestamps);
}

void
remove_duplicate_pks(const ConcurrentVector<Timestamp>& timestamps) {
// cached pks(key, max_timestamp_offset)
std::unordered_map<T, int64_t> pks;
std::unordered_set<T> need_removed_pks;
for (auto it = array_.begin(); it != array_.end(); ++it) {
const T& key = it->first;
if (pks.find(key) == pks.end()) {
pks.insert({key, it->second});
} else {
need_removed_pks.insert(key);
if (timestamps[it->second] > timestamps[pks[key]]) {
pks[key] = it->second;
}
}
}

// remove duplicate pks
for (auto it = array_.begin(); it != array_.end();) {
const T& key = it->first;
auto max_offset = pks[key];
if (max_offset != it->second) {
it = array_.erase(it);
} else {
it++;
}
}
}

void
seal() override {
sort(array_.begin(), array_.end());
Expand Down Expand Up @@ -520,6 +634,26 @@ struct InsertRecord {
pk2offset_->insert(pk, offset);
}

bool
insert_with_check_existence(const PkType& pk, int64_t offset) {
std::lock_guard lck(shared_mutex_);
auto exist = pk2offset_->contain(pk);
pk2offset_->insert(pk, offset);
return exist;
}

std::pair<std::vector<PkType>, std::vector<Timestamp>>
get_need_removed_pks() {
std::lock_guard lck(shared_mutex_);
return pk2offset_->get_need_removed_pks(timestamps_);
}

void
remove_duplicate_pks() {
std::lock_guard lck(shared_mutex_);
pk2offset_->remove_duplicate_pks(timestamps_);
}

bool
empty_pks() const {
std::shared_lock lck(shared_mutex_);
Expand Down
2 changes: 2 additions & 0 deletions internal/core/src/segcore/SegmentGrowing.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ class SegmentGrowing : public SegmentInternalInterface {
return SegmentType::Growing;
}

virtual std::vector<SegOffset>
SearchPk(const PkType& pk, Timestamp ts) const = 0;
// virtual int64_t
// PreDelete(int64_t size) = 0;

Expand Down
Loading

0 comments on commit 6b49755

Please sign in to comment.