Skip to content

Commit

Permalink
enhance: revert remove duplicated pk function
Browse files Browse the repository at this point in the history
Revert "fix: fix query count(*) concurrently (milvus-io#35007)"

This reverts commit 86322e0.

Revert "enhance: mark duplicated pk as deleted (milvus-io#34586)"

This reverts commit 804dd54.
  • Loading branch information
luzhang committed Jul 30, 2024
1 parent cabb200 commit 6b35d89
Show file tree
Hide file tree
Showing 27 changed files with 314 additions and 556 deletions.
187 changes: 94 additions & 93 deletions internal/core/src/segcore/DeletedRecord.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,107 +17,106 @@
#include <tuple>
#include <utility>
#include <vector>
#include <folly/ConcurrentSkipList.h>

#include "AckResponder.h"
#include "common/Schema.h"
#include "common/Types.h"
#include "segcore/Record.h"
#include "segcore/InsertRecord.h"
#include "ConcurrentVector.h"

namespace milvus::segcore {

struct Comparator {
bool
operator()(const std::pair<Timestamp, std::set<int64_t>>& left,
const std::pair<Timestamp, std::set<int64_t>>& right) const {
return left.first < right.first;
struct DeletedRecord {
// Cache entry that pairs a bitmap with the delete barrier stored next to it.
struct TmpBitmap {
// Just for query
int64_t del_barrier = 0;
// Shared handle to the bitmap held by this entry.
BitsetTypePtr bitmap_ptr;

// Builds a new entry carrying the same del_barrier and a clone of the
// bitmap, resized to `capacity`.
std::shared_ptr<TmpBitmap>
clone(int64_t capacity);
};
static constexpr int64_t deprecated_size_per_chunk = 32 * 1024;
// Default constructor: allocates the cached entry (del_barrier starts at 0),
// gives it a default-constructed bitmap, and constructs both buffers with
// deprecated_size_per_chunk.
DeletedRecord()
: lru_(std::make_shared<TmpBitmap>()),
timestamps_(deprecated_size_per_chunk),
pks_(deprecated_size_per_chunk) {
lru_->bitmap_ptr = std::make_shared<BitsetType>();
}
};

using TSkipList =
folly::ConcurrentSkipList<std::pair<Timestamp, std::set<int64_t>>,
Comparator>;

template <bool is_sealed = false>
class DeletedRecord {
public:
DeletedRecord(InsertRecord<is_sealed>* insert_record)
: insert_record_(insert_record),
deleted_pairs_(TSkipList::createInstance()) {
// Returns a copy of the shared pointer to the currently cached entry.
// The read happens while a shared lock on shared_mutex_ is held.
auto
get_lru_entry() {
std::shared_lock lck(shared_mutex_);
return lru_;
}

DeletedRecord(DeletedRecord<is_sealed>&& delete_record) = delete;
DeletedRecord<is_sealed>&
operator=(DeletedRecord<is_sealed>&& delete_record) = delete;
// Clones the cached entry under a shared lock, resizing the cloned bitmap to
// `insert_barrier`.
//
// insert_barrier:  size the cloned bitmap is resized to.
// del_barrier:     delete barrier the caller wants the result to carry.
// old_del_barrier: out; receives the cached entry's del_barrier.
// hit_cache:       out; true when the cached bitmap already has size
//                  `insert_barrier` and the cached del_barrier equals
//                  `del_barrier`, false otherwise.
//
// Returns the clone. On a miss its del_barrier is overwritten with
// `del_barrier`; on a hit it keeps the cached value (which is equal).
std::shared_ptr<TmpBitmap>
clone_lru_entry(int64_t insert_barrier,
                int64_t del_barrier,
                int64_t& old_del_barrier,
                bool& hit_cache) {
    std::shared_lock lck(shared_mutex_);
    auto res = lru_->clone(insert_barrier);
    old_del_barrier = lru_->del_barrier;

    // Assign the out-parameter on both paths so a caller that did not
    // pre-initialize it never observes a stale or indeterminate value.
    hit_cache = lru_->bitmap_ptr->size() == insert_barrier &&
                lru_->del_barrier == del_barrier;
    if (!hit_cache) {
        res->del_barrier = del_barrier;
    }

    return res;
}

void
Push(const std::vector<PkType>& pks, const Timestamp* timestamps) {
std::unique_lock<std::shared_mutex> lck(mutex_);
int64_t removed_num = 0;
int64_t mem_add = 0;
for (size_t i = 0; i < pks.size(); ++i) {
auto delete_pk = pks[i];
auto delete_timestamp = timestamps[i];
auto offsets =
insert_record_->search_pk(delete_pk, delete_timestamp);
bool has_duplicate_pk_timestamps = false;
for (auto offset : offsets) {
int64_t row_offset = offset.get();
auto row_timestamp = insert_record_->timestamps_[row_offset];
// Assert(insert_record->timestamps_.size() >= row_offset);
if (row_timestamp < delete_timestamp) {
InsertIntoInnerPairs(delete_timestamp, {row_offset});
removed_num++;
mem_add += sizeof(Timestamp) + sizeof(int64_t);
} else if (row_timestamp == delete_timestamp) {
// if insert record have multi same (pk, timestamp) pairs,
// need to remove the next pairs, just keep first
if (!has_duplicate_pk_timestamps) {
has_duplicate_pk_timestamps = true;
} else {
InsertIntoInnerPairs(delete_timestamp, {row_offset});
removed_num++;
mem_add += sizeof(Timestamp) + sizeof(int64_t);
}
}
insert_lru_entry(std::shared_ptr<TmpBitmap> new_entry, bool force = false) {
std::lock_guard lck(shared_mutex_);
if (new_entry->del_barrier <= lru_->del_barrier) {
if (!force ||
new_entry->bitmap_ptr->size() <= lru_->bitmap_ptr->size()) {
// DO NOTHING
return;
}
}
n_.fetch_add(removed_num);
mem_size_.fetch_add(mem_add);
lru_ = std::move(new_entry);
}

void
Query(BitsetType& bitset, int64_t insert_barrier, Timestamp timestamp) {
Assert(bitset.size() == insert_barrier);
// TODO: add cache to bitset
if (deleted_pairs_.size() == 0) {
return;
}
auto end = deleted_pairs_.lower_bound(
std::make_pair(timestamp, std::set<int64_t>{}));
for (auto it = deleted_pairs_.begin(); it != end; it++) {
// this may happen if lower_bound end is deleted_pairs_ end and
// other threads insert node to deleted_pairs_ concurrently
if (it->first > timestamp) {
break;
}
for (auto& v : it->second) {
if (v < insert_barrier) {
bitset.set(v);
}
}
push(const std::vector<PkType>& pks, const Timestamp* timestamps) {
std::lock_guard lck(buffer_mutex_);

auto size = pks.size();
ssize_t divide_point = 0;
auto n = n_.load();
// Truncate the overlapping prefix
if (n > 0) {
auto last = timestamps_[n - 1];
divide_point =
std::lower_bound(timestamps, timestamps + size, last + 1) -
timestamps;
}

// handle the case where end points to an element with the same timestamp
if (end != deleted_pairs_.end() && end->first == timestamp) {
for (auto& v : end->second) {
if (v < insert_barrier) {
bitset.set(v);
}
}
// All these delete records have been applied
if (divide_point == size) {
return;
}

size -= divide_point;
pks_.set_data_raw(n, pks.data() + divide_point, size);
timestamps_.set_data_raw(n, timestamps + divide_point, size);
n_ += size;
mem_size_ += sizeof(Timestamp) * size +
CalcPksSize(pks.data() + divide_point, size);
}

// Read-only reference to the timestamps_ member.
const ConcurrentVector<Timestamp>&
timestamps() const {
return timestamps_;
}

// Read-only reference to the pks_ member.
const ConcurrentVector<PkType>&
pks() const {
return pks_;
}

int64_t
Expand All @@ -131,24 +130,26 @@ class DeletedRecord {
}

private:
// Adds `offsets` to the entry keyed by `ts` in deleted_pairs_. If an entry
// for `ts` already exists the offsets are merged into its set; otherwise a
// new (ts, offsets) pair is inserted.
void
InsertIntoInnerPairs(Timestamp ts, std::set<int64_t> offsets) {
    auto existing =
        deleted_pairs_.find(std::make_pair(ts, std::set<int64_t>{}));
    if (existing != deleted_pairs_.end()) {
        // Entry already present: fold the new offsets into it.
        existing->second.insert(offsets.begin(), offsets.end());
        return;
    }
    deleted_pairs_.insert(std::make_pair(ts, offsets));
}
std::shared_ptr<TmpBitmap> lru_;
std::shared_mutex shared_mutex_;

private:
std::shared_mutex mutex_;
std::shared_mutex buffer_mutex_;
std::atomic<int64_t> n_ = 0;
std::atomic<int64_t> mem_size_ = 0;
InsertRecord<is_sealed>* insert_record_;
TSkipList::Accessor deleted_pairs_;
ConcurrentVector<Timestamp> timestamps_;
ConcurrentVector<PkType> pks_;
};

// Returns a freshly allocated TmpBitmap that carries this entry's
// del_barrier and a copy of its bitmap (made through the bitmap's own
// clone()), resized to `capacity`.
// NOTE(review): the `false` passed to resize() is presumably the fill value
// for newly added bits -- confirm against BitsetType.
inline auto
DeletedRecord::TmpBitmap::clone(int64_t capacity)
    -> std::shared_ptr<TmpBitmap> {
    auto copy = std::make_shared<TmpBitmap>();
    copy->del_barrier = del_barrier;
    copy->bitmap_ptr = std::make_shared<BitsetType>(bitmap_ptr->clone());
    copy->bitmap_ptr->resize(capacity, false);
    return copy;
}

} // namespace milvus::segcore
134 changes: 0 additions & 134 deletions internal/core/src/segcore/InsertRecord.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,6 @@ class OffsetMap {

virtual void
clear() = 0;

virtual std::pair<std::vector<PkType>, std::vector<Timestamp>>
get_need_removed_pks(const ConcurrentVector<Timestamp>& timestamps) = 0;

virtual void
remove_duplicate_pks(const ConcurrentVector<Timestamp>& timestamps) = 0;
};

template <typename T>
Expand Down Expand Up @@ -103,57 +97,6 @@ class OffsetOrderedMap : public OffsetMap {
map_[std::get<T>(pk)].emplace_back(offset);
}

// Collects every key in map_ that has more than one offset, together with
// the largest timestamp found among that key's own offsets.
//
// timestamps: per-row timestamps, indexed by offset.
// Returns (keys, timestamps) as two parallel vectors; entry i of the second
// vector is the maximum timestamp of entry i of the first.
// Holds a shared lock on mtx_ for the duration of the scan.
std::pair<std::vector<PkType>, std::vector<Timestamp>>
get_need_removed_pks(const ConcurrentVector<Timestamp>& timestamps) {
    std::shared_lock<std::shared_mutex> lck(mtx_);
    std::vector<PkType> remove_pks;
    std::vector<Timestamp> remove_timestamps;

    for (const auto& [pk, offsets] : map_) {
        if (offsets.size() <= 1) {
            continue;
        }

        // Seed the search with one of this key's own offsets. Seeding with
        // the literal 0 would compare against row 0, which may belong to a
        // different key and could be returned as the "maximum".
        int64_t max_timestamp_offset = offsets.front();
        for (const auto offset : offsets) {
            if (timestamps[offset] > timestamps[max_timestamp_offset]) {
                max_timestamp_offset = offset;
            }
        }

        remove_pks.push_back(pk);
        remove_timestamps.push_back(timestamps[max_timestamp_offset]);
    }

    return std::make_pair(std::move(remove_pks),
                          std::move(remove_timestamps));
}

// For every key in map_ that has more than one offset, keeps only the offset
// whose row has the largest timestamp and erases the others.
//
// timestamps: per-row timestamps, indexed by offset.
// Holds an exclusive lock on mtx_ while map_ is modified.
void
remove_duplicate_pks(
    const ConcurrentVector<Timestamp>& timestamps) override {
    std::unique_lock<std::shared_mutex> lck(mtx_);

    for (auto& [pk, offsets] : map_) {
        if (offsets.size() <= 1) {
            continue;
        }

        // Seed with one of this key's own offsets. Seeding with the literal
        // 0 could leave the "maximum" pointing at row 0 of another key, and
        // the erase below would then drop every offset of this key.
        int64_t max_timestamp_offset = offsets.front();
        for (const auto offset : offsets) {
            if (timestamps[offset] > timestamps[max_timestamp_offset]) {
                max_timestamp_offset = offset;
            }
        }

        // remove other offsets from pk index
        offsets.erase(std::remove_if(offsets.begin(),
                                     offsets.end(),
                                     [max_timestamp_offset](int64_t val) {
                                         return val != max_timestamp_offset;
                                     }),
                      offsets.end());
    }
}

void
seal() override {
PanicInfo(
Expand Down Expand Up @@ -277,63 +220,6 @@ class OffsetOrderedArray : public OffsetMap {
std::make_pair(std::get<T>(pk), static_cast<int32_t>(offset)));
}

std::pair<std::vector<PkType>, std::vector<Timestamp>>
get_need_removed_pks(const ConcurrentVector<Timestamp>& timestamps) {
std::vector<PkType> remove_pks;
std::vector<Timestamp> remove_timestamps;

// cached pks(key, max_timestamp_offset)
std::unordered_map<T, int64_t> pks;
std::unordered_set<T> need_removed_pks;
for (auto it = array_.begin(); it != array_.end(); ++it) {
const T& key = it->first;
if (pks.find(key) == pks.end()) {
pks.insert({key, it->second});
} else {
need_removed_pks.insert(key);
if (timestamps[it->second] > timestamps[pks[key]]) {
pks[key] = it->second;
}
}
}

// return max_timestamps that removed pks
for (auto& pk : need_removed_pks) {
remove_pks.push_back(pk);
remove_timestamps.push_back(timestamps[pks[pk]]);
}
return std::make_pair(remove_pks, remove_timestamps);
}

void
remove_duplicate_pks(const ConcurrentVector<Timestamp>& timestamps) {
// cached pks(key, max_timestamp_offset)
std::unordered_map<T, int64_t> pks;
std::unordered_set<T> need_removed_pks;
for (auto it = array_.begin(); it != array_.end(); ++it) {
const T& key = it->first;
if (pks.find(key) == pks.end()) {
pks.insert({key, it->second});
} else {
need_removed_pks.insert(key);
if (timestamps[it->second] > timestamps[pks[key]]) {
pks[key] = it->second;
}
}
}

// remove duplicate pks
for (auto it = array_.begin(); it != array_.end();) {
const T& key = it->first;
auto max_offset = pks[key];
if (max_offset != it->second) {
it = array_.erase(it);
} else {
it++;
}
}
}

void
seal() override {
sort(array_.begin(), array_.end());
Expand Down Expand Up @@ -696,26 +582,6 @@ struct InsertRecord {
pk2offset_->insert(pk, offset);
}

// Inserts (pk, offset) into pk2offset_ and reports whether `pk` was already
// contained before this insertion. The lookup and the insert both run under
// a single std::lock_guard on shared_mutex_.
bool
insert_with_check_existence(const PkType& pk, int64_t offset) {
    std::lock_guard lck(shared_mutex_);
    const auto was_present = pk2offset_->contain(pk);
    pk2offset_->insert(pk, offset);
    return was_present;
}

// Thin wrapper: forwards to pk2offset_->get_need_removed_pks(timestamps_)
// while holding shared_mutex_ through a std::lock_guard, and returns the
// resulting (pks, timestamps) pair unchanged.
std::pair<std::vector<PkType>, std::vector<Timestamp>>
get_need_removed_pks() {
std::lock_guard lck(shared_mutex_);
return pk2offset_->get_need_removed_pks(timestamps_);
}

// Thin wrapper: forwards to pk2offset_->remove_duplicate_pks(timestamps_)
// while holding shared_mutex_ through a std::lock_guard.
void
remove_duplicate_pks() {
std::lock_guard lck(shared_mutex_);
pk2offset_->remove_duplicate_pks(timestamps_);
}

bool
empty_pks() const {
std::shared_lock lck(shared_mutex_);
Expand Down
2 changes: 0 additions & 2 deletions internal/core/src/segcore/SegmentGrowing.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@ class SegmentGrowing : public SegmentInternalInterface {
return SegmentType::Growing;
}

virtual std::vector<SegOffset>
SearchPk(const PkType& pk, Timestamp ts) const = 0;
// virtual int64_t
// PreDelete(int64_t size) = 0;

Expand Down
Loading

0 comments on commit 6b35d89

Please sign in to comment.