diff --git a/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp b/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp index ce45f09333132..35e16935bfd9e 100644 --- a/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp +++ b/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp @@ -669,38 +669,7 @@ ChunkedSegmentSealedImpl::LoadDeletedRecord(const LoadDeletedRecordInfo& info) { ParsePksFromIDs(pks, field_meta.get_data_type(), *info.primary_keys); auto timestamps = reinterpret_cast(info.timestamps); - std::vector> ordering(size); - for (int i = 0; i < size; i++) { - ordering[i] = std::make_tuple(timestamps[i], pks[i]); - } - - if (!insert_record_.empty_pks()) { - auto end = std::remove_if( - ordering.begin(), - ordering.end(), - [&](const std::tuple& record) { - return !insert_record_.contain(std::get<1>(record)); - }); - size = end - ordering.begin(); - ordering.resize(size); - } - - // all record filtered - if (size == 0) { - return; - } - - std::sort(ordering.begin(), ordering.end()); - std::vector sort_pks(size); - std::vector sort_timestamps(size); - - for (int i = 0; i < size; i++) { - auto [t, pk] = ordering[i]; - sort_timestamps[i] = t; - sort_pks[i] = pk; - } - - deleted_record_.push(sort_pks, sort_timestamps.data()); + deleted_record_.LoadPush(pks, timestamps); } void @@ -858,35 +827,7 @@ void ChunkedSegmentSealedImpl::mask_with_delete(BitsetTypeView& bitset, int64_t ins_barrier, Timestamp timestamp) const { - auto del_barrier = get_barrier(get_deleted_record(), timestamp); - if (del_barrier == 0) { - return; - } - - auto bitmap_holder = std::shared_ptr(); - - auto search_fn = [this](const PkType& pk, int64_t barrier) { - return this->search_pk(pk, barrier); - }; - bitmap_holder = get_deleted_bitmap(del_barrier, - ins_barrier, - deleted_record_, - insert_record_, - timestamp, - is_sorted_by_pk_, - search_fn); - - if (!bitmap_holder || !bitmap_holder->bitmap_ptr) { - return; - } - auto& delete_bitset = *bitmap_holder->bitmap_ptr; - AssertInfo( - delete_bitset.size() == bitset.size(), - fmt::format( - "Deleted bitmap size:{} not equal to filtered bitmap size:{}", - delete_bitset.size(), - bitset.size())); - bitset |= delete_bitset; + deleted_record_.Query(bitset, ins_barrier, timestamp); } void @@ -1337,7 +1278,8 @@ ChunkedSegmentSealedImpl::ChunkedSegmentSealedImpl( id_(segment_id), col_index_meta_(index_meta), TEST_skip_index_for_retrieve_(TEST_skip_index_for_retrieve), - is_sorted_by_pk_(is_sorted_by_pk) { + is_sorted_by_pk_(is_sorted_by_pk), + deleted_record_(&insert_record_, this) { mmap_descriptor_ = std::shared_ptr( new storage::MmapChunkDescriptor({segment_id, SegmentType::Sealed})); auto mcm = storage::MmapManager::GetInstance().GetMmapChunkManager(); @@ -1974,7 +1916,7 @@ ChunkedSegmentSealedImpl::Delete(int64_t reserved_offset, // deprecated sort_pks[i] = pk; } - deleted_record_.push(sort_pks, sort_timestamps.data()); + deleted_record_.StreamPush(sort_pks, sort_timestamps.data()); return SegcoreError::success(); } diff --git a/internal/core/src/segcore/ChunkedSegmentSealedImpl.h b/internal/core/src/segcore/ChunkedSegmentSealedImpl.h index 60314f019ec26..8afeae8aef4ce 100644 --- a/internal/core/src/segcore/ChunkedSegmentSealedImpl.h +++ b/internal/core/src/segcore/ChunkedSegmentSealedImpl.h @@ -104,6 +104,11 @@ class ChunkedSegmentSealedImpl : public SegmentSealed { return stats_.mem_size.load() + deleted_record_.mem_size(); } + InsertRecord& + get_insert_record() override { + return insert_record_; + } + int64_t get_row_count() const override; @@ -293,6 +298,7 @@ class ChunkedSegmentSealedImpl : public SegmentSealed { // } else { num_rows_ = row_count; // } + deleted_record_.set_sealed_row_count(row_count); } void @@ -317,11 +323,6 @@ class ChunkedSegmentSealedImpl : public SegmentSealed { return system_ready_count_ == 2; } - const DeletedRecord& - get_deleted_record() const { - return deleted_record_; - } - std::pair, std::vector> search_ids(const IdArray& id_array, Timestamp timestamp) const override; @@ -362,7 +363,7 @@ class ChunkedSegmentSealedImpl : public SegmentSealed { InsertRecord insert_record_; // deleted pks - mutable DeletedRecord deleted_record_; + mutable DeletedRecord deleted_record_; LoadFieldDataInfo field_data_info_; diff --git a/internal/core/src/segcore/DeletedRecord.h b/internal/core/src/segcore/DeletedRecord.h index f2f0e2d8a0d0e..d7ebf3a4ab7b9 100644 --- a/internal/core/src/segcore/DeletedRecord.h +++ b/internal/core/src/segcore/DeletedRecord.h @@ -17,111 +17,262 @@ #include #include #include +#include #include "AckResponder.h" #include "common/Schema.h" #include "common/Types.h" #include "segcore/Record.h" +#include "segcore/InsertRecord.h" #include "ConcurrentVector.h" namespace milvus::segcore { -struct DeletedRecord { - struct TmpBitmap { - // Just for query - int64_t del_barrier = 0; - BitsetTypePtr bitmap_ptr; - - std::shared_ptr - clone(int64_t capacity); - }; - static constexpr int64_t deprecated_size_per_chunk = 32 * 1024; - DeletedRecord() - : lru_(std::make_shared()), - timestamps_(deprecated_size_per_chunk), - pks_(deprecated_size_per_chunk) { - lru_->bitmap_ptr = std::make_shared(); +using Offset = int32_t; + +struct Comparator { + bool + operator()(const std::pair& left, + const std::pair& right) const { + if (left.first == right.first) { + return left.second < right.second; + } + return left.first < right.first; } +}; - auto - get_lru_entry() { - std::shared_lock lck(shared_mutex_); - return lru_; +// a lock-free list for multi-thread insert && read +using SortedDeleteList = + folly::ConcurrentSkipList, Comparator>; + +static int32_t DUMP_BATCH_SIZE = 10000; +static int32_t DELETE_PAIR_SIZE = sizeof(std::pair); + +class SegmentInternalInterface; +template +class DeletedRecord { + public: + DeletedRecord(InsertRecord* insert_record, + SegmentInternalInterface* segment) + : insert_record_(insert_record), + segment_(segment), + deleted_lists_(SortedDeleteList::createInstance()) { } - std::shared_ptr - clone_lru_entry(int64_t insert_barrier, - int64_t del_barrier, - int64_t& old_del_barrier, - bool& hit_cache) { - std::shared_lock lck(shared_mutex_); - auto res = lru_->clone(insert_barrier); - old_del_barrier = lru_->del_barrier; - - if (lru_->bitmap_ptr->size() == insert_barrier && - lru_->del_barrier == del_barrier) { - hit_cache = true; - } else { - res->del_barrier = del_barrier; - } + // not binding segment, only for testing purposes + DeletedRecord(InsertRecord* insert_record) + : insert_record_(insert_record), + segment_(nullptr), + deleted_lists_(SortedDeleteList::createInstance()) { + } - return res; + ~DeletedRecord() { } + DeletedRecord(DeletedRecord&& delete_record) = delete; + + DeletedRecord& + operator=(DeletedRecord&& delete_record) = delete; + void - insert_lru_entry(std::shared_ptr new_entry, bool force = false) { - std::lock_guard lck(shared_mutex_); - if (new_entry->del_barrier <= lru_->del_barrier) { - if (!force || - new_entry->bitmap_ptr->size() <= lru_->bitmap_ptr->size()) { - // DO NOTHING - return; - } + LoadPush(const std::vector& pks, const Timestamp* timestamps) { + InternalPush(pks, timestamps); + + SortedDeleteList::Accessor accessor(deleted_lists_); + auto* last = accessor.last(); + Assert(last != nullptr); + max_load_timestamp_ = last->first; + } + + // stream push delete timestamps should be sorted outside of the interface + void + StreamPush(const std::vector& pks, const Timestamp* timestamps) { + InternalPush(pks, timestamps); + + bool can_dump = timestamps[0] >= max_load_timestamp_; + if (can_dump) { + DumpSnapshot(); } - lru_ = std::move(new_entry); } void - push(const std::vector& pks, const Timestamp* timestamps) { - std::lock_guard lck(buffer_mutex_); - - auto size = pks.size(); - ssize_t divide_point = 0; - auto n = n_.load(); - // Truncate the overlapping prefix - if (n > 0) { - auto last = timestamps_[n - 1]; - divide_point = - std::lower_bound(timestamps, timestamps + size, last + 1) - - timestamps; + InternalPush(const std::vector& pks, const Timestamp* timestamps) { + int64_t removed_num = 0; + int64_t mem_add = 0; + + SortedDeleteList::Accessor accessor(deleted_lists_); + for (size_t i = 0; i < pks.size(); ++i) { + auto deleted_pk = pks[i]; + auto deleted_ts = timestamps[i]; + std::vector offsets; + if (segment_) { + offsets = + std::move(segment_->search_pk(deleted_pk, deleted_ts)); + } else { + // only for testing + offsets = std::move( + insert_record_->search_pk(deleted_pk, deleted_ts)); + } + for (auto& offset : offsets) { + auto row_id = offset.get(); + // if alreay deleted, no need to add new record + if (deleted_mask_.size() > row_id && deleted_mask_[row_id]) { + continue; + } + // if insert record and delete record is same timestamp, + // delete not take effect on this record. + if (deleted_ts == insert_record_->timestamps_[row_id]) { + continue; + } + accessor.insert(std::make_pair(deleted_ts, row_id)); + if constexpr (is_sealed) { + Assert(deleted_mask_.size() > 0); + deleted_mask_.set(row_id); + } else { + // need to add mask size firstly for growing segment + deleted_mask_.resize(insert_record_->size()); + deleted_mask_.set(row_id); + } + removed_num++; + mem_add += DELETE_PAIR_SIZE; + } } - // All these delete records have been applied - if (divide_point == size) { + n_.fetch_add(removed_num); + mem_size_.fetch_add(mem_add); + } + + void + Query(BitsetTypeView& bitset, + int64_t insert_barrier, + Timestamp query_timestamp) { + Assert(bitset.size() == insert_barrier); + + SortedDeleteList::Accessor accessor(deleted_lists_); + if (accessor.size() == 0) { return; } - size -= divide_point; - pks_.set_data_raw(n, pks.data() + divide_point, size); - timestamps_.set_data_raw(n, timestamps + divide_point, size); - n_ += size; - mem_size_ += sizeof(Timestamp) * size + - CalcPksSize(pks.data() + divide_point, size); - } + // try use snapshot to skip iterations + bool hit_snapshot = false; + SortedDeleteList::iterator next_iter; + if (!snapshots_.empty()) { + int loc = snapshots_.size() - 1; + // find last meeted snapshot + { + std::shared_lock lock(snap_lock_); + while (snapshots_[loc].first > query_timestamp && loc >= 0) { + loc--; + } + if (loc >= 0) { + next_iter = snap_next_iter_[loc]; + Assert(snapshots_[loc].second.size() >= bitset.size()); + bitset.inplace_and_with_count(snapshots_[loc].second, + bitset.size()); + hit_snapshot = true; + } + } + } - const ConcurrentVector& - timestamps() const { - return timestamps_; + auto start_iter = hit_snapshot ? next_iter : accessor.begin(); + auto end_iter = + accessor.lower_bound(std::make_pair(query_timestamp, 0)); + + auto it = start_iter; + while (it != accessor.end() && it != end_iter) { + AssertInfo(it->second <= insert_barrier, + "delete record beyond insert barrier, {} : {}", + it->second, + insert_barrier); + bitset.set(it->second); + it++; + } + while (it != accessor.end() && it->first == query_timestamp) { + AssertInfo(it->second <= insert_barrier, + "delete record beyond insert barrier, {} : {}", + it->second, + insert_barrier); + bitset.set(it->second); + it++; + } } - const ConcurrentVector& - pks() const { - return pks_; + void + DumpSnapshot() { + SortedDeleteList::Accessor accessor(deleted_lists_); + auto total_size = accessor.size(); + auto dumped_size = + snapshots_.empty() ? 0 : snapshots_.size() * DUMP_BATCH_SIZE; + + while (total_size - dumped_size > DUMP_BATCH_SIZE) { + int32_t bitsize = 0; + if constexpr (is_sealed) { + bitsize = sealed_row_count_; + } else { + bitsize = insert_record_->size(); + } + BitsetType bitmap(bitsize, false); + + auto it = accessor.begin(); + Timestamp last_dump_ts = 0; + if (!snapshots_.empty()) { + it = snap_next_iter_.back(); + last_dump_ts = snapshots_.back().first; + bitmap.inplace_and_with_count(snapshots_.back().second, + snapshots_.back().second.size()); + } + + while (total_size - dumped_size > DUMP_BATCH_SIZE && + it != accessor.end()) { + Timestamp dump_ts = 0; + + for (auto size = 0; size < DUMP_BATCH_SIZE; ++it, ++size) { + bitmap.set(it->second); + if (size == DUMP_BATCH_SIZE - 1) { + dump_ts = it->first; + } + } + + { + std::unique_lock lock(snap_lock_); + if (dump_ts == last_dump_ts) { + // only update + snapshots_.back().second = std::move(bitmap.clone()); + snap_next_iter_.back() = it; + } else { + // add new snapshot + snapshots_.push_back( + std::make_pair(dump_ts, std::move(bitmap.clone()))); + Assert(it != accessor.end() && it.good()); + snap_next_iter_.push_back(it); + } + + LOG_INFO( + "dump delete record snapshot at ts: {}, cursor: {}, " + "current snapshot size: {}", + dump_ts, + dumped_size + DUMP_BATCH_SIZE, + snapshots_.size()); + last_dump_ts = dump_ts; + // std::cout + // << fmt::format( + // "dump delete record snapshot at ts: {}, cursor: {}, " + // "current snapshot size: {}", + // dump_ts, + // dumped_size + DUMP_BATCH_SIZE, + // snapshots_.size()) + // << std::endl; + } + + dumped_size += DUMP_BATCH_SIZE; + } + } } int64_t size() const { - return n_.load(); + SortedDeleteList::Accessor accessor(deleted_lists_); + return accessor.size(); } size_t @@ -129,27 +280,29 @@ struct DeletedRecord { return mem_size_.load(); } - private: - std::shared_ptr lru_; - std::shared_mutex shared_mutex_; + void + set_sealed_row_count(size_t row_count) { + sealed_row_count_ = row_count; + deleted_mask_.resize(row_count); + } - std::shared_mutex buffer_mutex_; + public: std::atomic n_ = 0; std::atomic mem_size_ = 0; - ConcurrentVector timestamps_; - ConcurrentVector pks_; -}; + InsertRecord* insert_record_; + SegmentInternalInterface* segment_; + std::shared_ptr deleted_lists_; + // max timestamp of deleted records which replayed in load process + Timestamp max_load_timestamp_{0}; + int32_t sealed_row_count_; + // used to remove duplicated deleted records for fast access + BitsetType deleted_mask_; -inline auto -DeletedRecord::TmpBitmap::clone(int64_t capacity) - -> std::shared_ptr { - auto res = std::make_shared(); - res->del_barrier = this->del_barrier; - // res->bitmap_ptr = std::make_shared(); - // *(res->bitmap_ptr) = *(this->bitmap_ptr); - res->bitmap_ptr = std::make_shared(this->bitmap_ptr->clone()); - res->bitmap_ptr->resize(capacity, false); - return res; -} + // dump snapshot low frequency + mutable std::shared_mutex snap_lock_; + std::vector> snapshots_; + // next delete record iterator that follows every snapshot + std::vector snap_next_iter_; +}; } // namespace milvus::segcore diff --git a/internal/core/src/segcore/SegmentGrowingImpl.cpp b/internal/core/src/segcore/SegmentGrowingImpl.cpp index b67191ddbb35b..b42c2b7316561 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.cpp +++ b/internal/core/src/segcore/SegmentGrowingImpl.cpp @@ -46,23 +46,7 @@ void SegmentGrowingImpl::mask_with_delete(BitsetTypeView& bitset, int64_t ins_barrier, Timestamp timestamp) const { - auto del_barrier = get_barrier(get_deleted_record(), timestamp); - if (del_barrier == 0) { - return; - } - auto bitmap_holder = get_deleted_bitmap( - del_barrier, ins_barrier, deleted_record_, insert_record_, timestamp); - if (!bitmap_holder || !bitmap_holder->bitmap_ptr) { - return; - } - auto& delete_bitset = *bitmap_holder->bitmap_ptr; - AssertInfo( - delete_bitset.size() == bitset.size(), - fmt::format( - "Deleted bitmap size:{} not equal to filtered bitmap size:{}", - delete_bitset.size(), - bitset.size())); - bitset |= delete_bitset; + deleted_record_.Query(bitset, ins_barrier, timestamp); } void @@ -344,7 +328,7 @@ SegmentGrowingImpl::Delete(int64_t reserved_begin, } // step 2: fill delete record - deleted_record_.push(sort_pks, sort_timestamps.data()); + deleted_record_.StreamPush(sort_pks, sort_timestamps.data()); return SegcoreError::success(); } @@ -364,38 +348,7 @@ SegmentGrowingImpl::LoadDeletedRecord(const LoadDeletedRecordInfo& info) { ParsePksFromIDs(pks, field_meta.get_data_type(), *info.primary_keys); auto timestamps = reinterpret_cast(info.timestamps); - std::vector> ordering(size); - for (int i = 0; i < size; i++) { - ordering[i] = std::make_tuple(timestamps[i], pks[i]); - } - - if (!insert_record_.empty_pks()) { - auto end = std::remove_if( - ordering.begin(), - ordering.end(), - [&](const std::tuple& record) { - return !insert_record_.contain(std::get<1>(record)); - }); - size = end - ordering.begin(); - ordering.resize(size); - } - - // all record filtered - if (size == 0) { - return; - } - - std::sort(ordering.begin(), ordering.end()); - std::vector sort_pks(size); - std::vector sort_timestamps(size); - - for (int i = 0; i < size; i++) { - auto [t, pk] = ordering[i]; - sort_timestamps[i] = t; - sort_pks[i] = pk; - } - - deleted_record_.push(sort_pks, sort_timestamps.data()); + deleted_record_.LoadPush(pks, timestamps); } SpanBase diff --git a/internal/core/src/segcore/SegmentGrowingImpl.h b/internal/core/src/segcore/SegmentGrowingImpl.h index 4d28fd8115e6e..80c1d05d2e0f9 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.h +++ b/internal/core/src/segcore/SegmentGrowingImpl.h @@ -95,11 +95,6 @@ class SegmentGrowingImpl : public SegmentGrowing { return indexing_record_; } - const DeletedRecord& - get_deleted_record() const { - return deleted_record_; - } - std::shared_mutex& get_chunk_mutex() const { return chunk_mutex_; @@ -254,7 +249,8 @@ class SegmentGrowingImpl : public SegmentGrowing { insert_record_( *schema_, segcore_config.get_chunk_rows(), mmap_descriptor_), indexing_record_(*schema_, index_meta_, segcore_config_), - id_(segment_id) { + id_(segment_id), + deleted_record_(&insert_record_, this) { if (mmap_descriptor_ != nullptr) { LOG_INFO("growing segment {} use mmap to hold raw data", this->get_segment_id()); @@ -334,6 +330,16 @@ class SegmentGrowingImpl : public SegmentGrowing { return false; } + std::vector + search_pk(const PkType& pk, Timestamp timestamp) const override { + return insert_record_.search_pk(pk, timestamp); + } + + std::vector + search_pk(const PkType& pk, int64_t insert_barrier) const override { + return insert_record_.search_pk(pk, insert_barrier); + } + protected: int64_t num_chunk(FieldId field_id) const override; @@ -390,7 +396,7 @@ class SegmentGrowingImpl : public SegmentGrowing { mutable std::shared_mutex chunk_mutex_; // deleted pks - mutable DeletedRecord deleted_record_; + mutable DeletedRecord deleted_record_; int64_t id_; diff --git a/internal/core/src/segcore/SegmentInterface.h b/internal/core/src/segcore/SegmentInterface.h index f835153bc8e9c..091d0798b8f3d 100644 --- a/internal/core/src/segcore/SegmentInterface.h +++ b/internal/core/src/segcore/SegmentInterface.h @@ -19,7 +19,6 @@ #include #include -#include "DeletedRecord.h" #include "FieldIndexing.h" #include "common/Common.h" #include "common/Schema.h" @@ -444,6 +443,12 @@ class SegmentInternalInterface : public SegmentInterface { int64_t count, const std::vector& dynamic_field_names) const = 0; + virtual std::vector + search_pk(const PkType& pk, Timestamp timestamp) const = 0; + + virtual std::vector + search_pk(const PkType& pk, int64_t insert_barrier) const = 0; + protected: mutable std::shared_mutex mutex_; // fieldID -> std::pair diff --git a/internal/core/src/segcore/SegmentSealed.h b/internal/core/src/segcore/SegmentSealed.h index b84b3b9b94d5c..5078fbc11a5c6 100644 --- a/internal/core/src/segcore/SegmentSealed.h +++ b/internal/core/src/segcore/SegmentSealed.h @@ -52,6 +52,9 @@ class SegmentSealed : public SegmentInternalInterface { LoadTextIndex(FieldId field_id, std::unique_ptr index) = 0; + virtual InsertRecord& + get_insert_record() = 0; + SegmentType type() const override { return SegmentType::Sealed; diff --git a/internal/core/src/segcore/SegmentSealedImpl.cpp b/internal/core/src/segcore/SegmentSealedImpl.cpp index bfd847df1f753..17ffb2a3d71c5 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.cpp +++ b/internal/core/src/segcore/SegmentSealedImpl.cpp @@ -667,38 +667,7 @@ SegmentSealedImpl::LoadDeletedRecord(const LoadDeletedRecordInfo& info) { ParsePksFromIDs(pks, field_meta.get_data_type(), *info.primary_keys); auto timestamps = reinterpret_cast(info.timestamps); - std::vector> ordering(size); - for (int i = 0; i < size; i++) { - ordering[i] = std::make_tuple(timestamps[i], pks[i]); - } - - if (!insert_record_.empty_pks()) { - auto end = std::remove_if( - ordering.begin(), - ordering.end(), - [&](const std::tuple& record) { - return !insert_record_.contain(std::get<1>(record)); - }); - size = end - ordering.begin(); - ordering.resize(size); - } - - // all record filtered - if (size == 0) { - return; - } - - std::sort(ordering.begin(), ordering.end()); - std::vector sort_pks(size); - std::vector sort_timestamps(size); - - for (int i = 0; i < size; i++) { - auto [t, pk] = ordering[i]; - sort_timestamps[i] = t; - sort_pks[i] = pk; - } - - deleted_record_.push(sort_pks, sort_timestamps.data()); + deleted_record_.LoadPush(pks, timestamps); } void @@ -903,35 +872,7 @@ void SegmentSealedImpl::mask_with_delete(BitsetTypeView& bitset, int64_t ins_barrier, Timestamp timestamp) const { - auto del_barrier = get_barrier(get_deleted_record(), timestamp); - if (del_barrier == 0) { - return; - } - - auto bitmap_holder = std::shared_ptr(); - - auto search_fn = [this](const PkType& pk, int64_t barrier) { - return this->search_pk(pk, barrier); - }; - bitmap_holder = get_deleted_bitmap(del_barrier, - ins_barrier, - deleted_record_, - insert_record_, - timestamp, - is_sorted_by_pk_, - search_fn); - - if (!bitmap_holder || !bitmap_holder->bitmap_ptr) { - return; - } - auto& delete_bitset = *bitmap_holder->bitmap_ptr; - AssertInfo( - delete_bitset.size() == bitset.size(), - fmt::format( - "Deleted bitmap size:{} not equal to filtered bitmap size:{}", - delete_bitset.size(), - bitset.size())); - bitset |= delete_bitset; + deleted_record_.Query(bitset, ins_barrier, timestamp); } void @@ -1255,7 +1196,8 @@ SegmentSealedImpl::SegmentSealedImpl(SchemaPtr schema, id_(segment_id), col_index_meta_(index_meta), TEST_skip_index_for_retrieve_(TEST_skip_index_for_retrieve), - is_sorted_by_pk_(is_sorted_by_pk) { + is_sorted_by_pk_(is_sorted_by_pk), + deleted_record_(&insert_record_, this) { mmap_descriptor_ = std::shared_ptr( new storage::MmapChunkDescriptor({segment_id, SegmentType::Sealed})); auto mcm = storage::MmapManager::GetInstance().GetMmapChunkManager(); @@ -1721,11 +1663,7 @@ SegmentSealedImpl::search_ids(const IdArray& id_array, for (auto& pk : pks) { std::vector pk_offsets; - if (!is_sorted_by_pk_) { - pk_offsets = insert_record_.search_pk(pk, timestamp); - } else { - pk_offsets = search_pk(pk, timestamp); - } + pk_offsets = search_pk(pk, timestamp); for (auto offset : pk_offsets) { switch (data_type) { case DataType::INT64: { @@ -1825,7 +1763,7 @@ SegmentSealedImpl::Delete(int64_t reserved_offset, // deprecated sort_pks[i] = pk; } - deleted_record_.push(sort_pks, sort_timestamps.data()); + deleted_record_.StreamPush(sort_pks, sort_timestamps.data()); return SegcoreError::success(); } diff --git a/internal/core/src/segcore/SegmentSealedImpl.h b/internal/core/src/segcore/SegmentSealedImpl.h index cc16f5568a831..3aac2d19ee42e 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.h +++ b/internal/core/src/segcore/SegmentSealedImpl.h @@ -139,6 +139,11 @@ class SegmentSealedImpl : public SegmentSealed { return it->second->IsNullable(); }; + InsertRecord& + get_insert_record() override { + return insert_record_; + } + public: int64_t num_chunk_index(FieldId field_id) const override; @@ -292,6 +297,7 @@ class SegmentSealedImpl : public SegmentSealed { // } else { num_rows_ = row_count; // } + deleted_record_.set_sealed_row_count(row_count); } void @@ -316,11 +322,6 @@ class SegmentSealedImpl : public SegmentSealed { return system_ready_count_ == 2; } - const DeletedRecord& - get_deleted_record() const { - return deleted_record_; - } - std::pair, std::vector> search_ids(const IdArray& id_array, Timestamp timestamp) const override; @@ -361,7 +362,7 @@ class SegmentSealedImpl : public SegmentSealed { InsertRecord insert_record_; // deleted pks - mutable DeletedRecord deleted_record_; + mutable DeletedRecord deleted_record_; LoadFieldDataInfo field_data_info_; diff --git a/internal/core/src/segcore/Utils.h b/internal/core/src/segcore/Utils.h index ed6336aa9d4c3..a946fae20bec2 100644 --- a/internal/core/src/segcore/Utils.h +++ b/internal/core/src/segcore/Utils.h @@ -108,82 +108,6 @@ std::unique_ptr MergeDataArray(std::vector& merge_bases, const FieldMeta& field_meta); -template -std::shared_ptr -get_deleted_bitmap( - int64_t del_barrier, - int64_t insert_barrier, - DeletedRecord& delete_record, - const InsertRecord& insert_record, - Timestamp query_timestamp, - bool is_sorted_by_pk = false, - const std::function(const PkType&, int64_t)>& - search_fn = nullptr) { - // if insert_barrier and del_barrier have not changed, use cache data directly - bool hit_cache = false; - int64_t old_del_barrier = 0; - auto current = delete_record.clone_lru_entry( - insert_barrier, del_barrier, old_del_barrier, hit_cache); - if (hit_cache) { - return current; - } - - auto bitmap = current->bitmap_ptr; - - int64_t start, end; - if (del_barrier < old_del_barrier) { - // in this case, ts of delete record[current_del_barrier : old_del_barrier] > query_timestamp - // so these deletion records do not take effect in query/search - // so bitmap corresponding to those pks in delete record[current_del_barrier:old_del_barrier] will be reset to 0 - // for example, current_del_barrier = 2, query_time = 120, the bitmap will be reset to [0, 1, 1, 0, 0, 0, 0, 0] - start = del_barrier; - end = old_del_barrier; - } else { - // the cache is not enough, so update bitmap using new pks in delete record[old_del_barrier:current_del_barrier] - // for example, current_del_barrier = 4, query_time = 300, bitmap will be updated to [0, 1, 1, 0, 1, 1, 0, 0] - start = old_del_barrier; - end = del_barrier; - } - - // Avoid invalid calculations when there are a lot of repeated delete pks - std::unordered_map delete_timestamps; - for (auto del_index = start; del_index < end; ++del_index) { - auto pk = delete_record.pks()[del_index]; - auto timestamp = delete_record.timestamps()[del_index]; - - delete_timestamps[pk] = timestamp > delete_timestamps[pk] - ? timestamp - : delete_timestamps[pk]; - } - - for (auto& [pk, timestamp] : delete_timestamps) { - auto segOffsets = is_sorted_by_pk - ? search_fn(pk, insert_barrier) - : insert_record.search_pk(pk, insert_barrier); - for (auto offset : segOffsets) { - int64_t insert_row_offset = offset.get(); - - // The deletion record do not take effect in search/query, - // and reset bitmap to 0 - if (timestamp > query_timestamp) { - bitmap->reset(insert_row_offset); - continue; - } - // Insert after delete with same pk, delete will not task effect on this insert record, - // and reset bitmap to 0 - if (insert_record.timestamps_[insert_row_offset] >= timestamp) { - bitmap->reset(insert_row_offset); - continue; - } - // insert data corresponding to the insert_row_offset will be ignored in search/query - bitmap->set(insert_row_offset); - } - } - - delete_record.insert_lru_entry(current); - return current; -} - std::unique_ptr ReverseDataFromIndex(const index::IndexBase* index, const int64_t* seg_offsets, diff --git a/internal/core/unittest/CMakeLists.txt b/internal/core/unittest/CMakeLists.txt index 56123c7ef06e6..89587e8a7dc4a 100644 --- a/internal/core/unittest/CMakeLists.txt +++ b/internal/core/unittest/CMakeLists.txt @@ -43,6 +43,7 @@ set(MILVUS_TEST_FILES test_c_tokenizer.cpp test_loading.cpp test_data_codec.cpp + test_delete_record.cpp test_disk_file_manager_test.cpp test_exec.cpp test_expr.cpp diff --git a/internal/core/unittest/test_c_api.cpp b/internal/core/unittest/test_c_api.cpp index 6cd11609038ef..61a2d86cde29e 100644 --- a/internal/core/unittest/test_c_api.cpp +++ b/internal/core/unittest/test_c_api.cpp @@ -1204,7 +1204,7 @@ TEST(CApiTest, InsertSamePkAfterDeleteOnGrowingSegment) { TEST(CApiTest, InsertSamePkAfterDeleteOnSealedSegment) { auto collection = NewCollection(get_default_schema_config()); CSegmentInterface segment; - auto status = NewSegment(collection, Sealed, -1, &segment, true); + auto status = NewSegment(collection, Sealed, -1, &segment, false); ASSERT_EQ(status.error_code, Success); auto col = (milvus::segcore::Collection*)collection; @@ -1215,6 +1215,7 @@ TEST(CApiTest, InsertSamePkAfterDeleteOnSealedSegment) { auto segment_interface = reinterpret_cast(segment); auto sealed_segment = dynamic_cast(segment_interface); SealedLoadFieldData(dataset, *sealed_segment); + sealed_segment->get_insert_record().seal_pks(); // delete data pks = {1, 2, 3}, timestamps = {4, 4, 4} std::vector delete_row_ids = {1, 2, 3}; @@ -3873,7 +3874,7 @@ TEST(CApiTest, Indexing_Expr_With_binary_Predicate_Term) { TEST(CApiTest, SealedSegmentTest) { auto collection = NewCollection(get_default_schema_config()); CSegmentInterface segment; - auto status = NewSegment(collection, Sealed, -1, &segment, true); + auto status = NewSegment(collection, Sealed, -1, &segment, false); ASSERT_EQ(status.error_code, Success); int N = 1000; diff --git a/internal/core/unittest/test_delete_record.cpp b/internal/core/unittest/test_delete_record.cpp new file mode 100644 index 0000000000000..e32697e0263f9 --- /dev/null +++ b/internal/core/unittest/test_delete_record.cpp @@ -0,0 +1,313 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "segcore/DeletedRecord.h" +#include "segcore/SegmentGrowingImpl.h" +#include "segcore/SegmentSealedImpl.h" +#include "segcore/SegmentGrowingImpl.h" +#include "test_utils/DataGen.h" + +using namespace milvus; +using namespace milvus::segcore; + +TEST(DeleteMVCC, common_case) { + auto schema = std::make_shared(); + auto pk = schema->AddDebugField("pk", DataType::INT64); + schema->set_primary_field_id(pk); + auto segment = CreateSealedSegment(schema); + ASSERT_EQ(0, segment->get_real_count()); + + // load insert: pk (0, 1, 2, 3, 4, 5, 6, 7, 8, 9) + // with timestamp ts (0, 1, 2, 3, 4, 5, 6, 7, 8, 9) + int64_t c = 10; + auto dataset = DataGen(schema, c); + auto pks = dataset.get_col(pk); + SealedLoadFieldData(dataset, *segment); + ASSERT_EQ(c, segment->get_real_count()); + auto& insert_record = segment->get_insert_record(); + DeletedRecord delete_record(&insert_record); + delete_record.set_sealed_row_count(c); + + // delete pk(1) at ts(10); + std::vector delete_ts = {10}; + std::vector delete_pk = {1}; + delete_record.StreamPush(delete_pk, delete_ts.data()); + ASSERT_EQ(1, delete_record.size()); + + { + BitsetType bitsets(c); + BitsetTypeView bitsets_view(bitsets); + auto insert_barrier = c; + // query at ts (10) + Timestamp query_timestamp = 10; + delete_record.Query(bitsets_view, insert_barrier, query_timestamp); + std::vector expected = {0, 1, 0, 0, 0, 0, 0, 0, 0, 0}; + for (int i = 0; i < c; i++) { + ASSERT_EQ(bitsets_view[i], expected[i]); + } + } + { + BitsetType bitsets(c); + BitsetTypeView bitsets_view(bitsets); + auto insert_barrier = c; + // query at ts (11) + Timestamp query_timestamp = 11; + // query at ts (11) + query_timestamp = 11; + delete_record.Query(bitsets_view, insert_barrier, query_timestamp); + std::vector expected = {0, 1, 0, 0, 0, 0, 0, 0, 0, 0}; + for (int i = 0; i < c; i++) { + ASSERT_EQ(bitsets_view[i], expected[i]); + } + } + + // delete pk(5) at ts(12) + delete_ts = {12}; + delete_pk = {5}; + delete_record.StreamPush(delete_pk, delete_ts.data()); + ASSERT_EQ(2, delete_record.size()); + + { + BitsetType bitsets(c); + BitsetTypeView bitsets_view(bitsets); + auto insert_barrier = c; + // query at ts (12) + Timestamp query_timestamp = 12; + delete_record.Query(bitsets_view, insert_barrier, query_timestamp); + std::vector expected = {0, 1, 0, 0, 0, 1, 0, 0, 0, 0}; + for (int i = 0; i < c; i++) { + ASSERT_EQ(bitsets_view[i], expected[i]); + } + } + + // delete at pk(1) at ts(13) again + delete_ts = {13}; + delete_pk = {1}; + delete_record.StreamPush(delete_pk, delete_ts.data()); + // not add new record, because already deleted. + ASSERT_EQ(2, delete_record.size()); + + { + BitsetType bitsets(c); + BitsetTypeView bitsets_view(bitsets); + auto insert_barrier = c; + // query at ts (14) + Timestamp query_timestamp = 14; + + delete_record.Query(bitsets_view, insert_barrier, query_timestamp); + std::vector expected = {0, 1, 0, 0, 0, 1, 0, 0, 0, 0}; + for (int i = 0; i < c; i++) { + ASSERT_EQ(bitsets_view[i], expected[i]); + } + } + + // delete pk(9) at ts(9) + delete_ts = {9}; + delete_pk = {9}; + delete_record.StreamPush(delete_pk, delete_ts.data()); + // not add new record, because insert also at ts(9) same as deleted + // delete not take effect. + ASSERT_EQ(2, delete_record.size()); + + { + BitsetType bitsets(c); + BitsetTypeView bitsets_view(bitsets); + auto insert_barrier = c; + // query at ts (14) + Timestamp query_timestamp = 14; + delete_record.Query(bitsets_view, insert_barrier, query_timestamp); + std::vector expected = {0, 1, 0, 0, 0, 1, 0, 0, 0, 0}; + for (int i = 0; i < c; i++) { + ASSERT_EQ(bitsets_view[i], expected[i]); + } + } +} + +TEST(DeleteMVCC, delete_exist_duplicate_pks) { + using namespace milvus; + using namespace milvus::query; + using namespace milvus::segcore; + + auto schema = std::make_shared(); + auto vec_fid = schema->AddDebugField( + "fakevec", DataType::VECTOR_FLOAT, 16, knowhere::metric::L2); + auto i64_fid = schema->AddDebugField("age", DataType::INT64); + schema->set_primary_field_id(i64_fid); + auto N = 10; + uint64_t seg_id = 101; + InsertRecord insert_record(*schema, N); + DeletedRecord delete_record(&insert_record); + + // insert pk: (0, 1, 1, 2, 2, 3, 4, 3, 2, 5) + // at ts: (0, 1, 2, 3, 4, 5, 6, 7, 8, 9) + std::vector age_data = {0, 1, 1, 2, 2, 3, 4, 3, 2, 5}; + std::vector tss(N); + for (int i = 0; i < N; ++i) { + tss[i] = i; + insert_record.insert_pk(age_data[i], i); + } + auto insert_offset = insert_record.reserved.fetch_add(N); + insert_record.timestamps_.set_data_raw(insert_offset, tss.data(), N); + auto field_data = insert_record.get_data_base(i64_fid); + field_data->set_data_raw(insert_offset, age_data.data(), N); + insert_record.ack_responder_.AddSegment(insert_offset, insert_offset + N); + + // delete pk(2) at ts(5) + std::vector delete_ts = {5}; + std::vector delete_pk = {2}; + delete_record.StreamPush(delete_pk, delete_ts.data()); + ASSERT_EQ(2, delete_record.size()); + + { + BitsetType bitsets(N); + BitsetTypeView bitsets_view(bitsets); + int64_t insert_barrier = N; + // query at ts (10) + Timestamp query_timestamp = 10; + delete_record.Query(bitsets_view, insert_barrier, query_timestamp); + std::vector expected = {0, 0, 0, 1, 1, 0, 0, 0, 0, 0}; + // two pk 2 at ts(3, 4) was deleted + for (int i = 0; i < N; i++) { + ASSERT_EQ(bitsets_view[i], expected[i]); + } + } + + // delete pk(3) at ts(6) + delete_ts = {6}; + delete_pk = {3}; + delete_record.StreamPush(delete_pk, delete_ts.data()); + ASSERT_EQ(3, delete_record.size()); + + { + BitsetType bitsets(N); + BitsetTypeView bitsets_view(bitsets); + int64_t insert_barrier = N; + // query at ts (10) + Timestamp query_timestamp = 10; + delete_record.Query(bitsets_view, insert_barrier, query_timestamp); + std::vector expected = {0, 0, 0, 1, 1, 1, 0, 0, 0, 0}; + // one pk 3 in ts(5) was deleted + for (int i = 0; i < N; i++) { + ASSERT_EQ(bitsets_view[i], expected[i]); + } + } + + // delete pk(3) at ts(9) again + delete_ts = {9}; + delete_pk = {3}; + delete_record.StreamPush(delete_pk, delete_ts.data()); + ASSERT_EQ(4, delete_record.size()); + + { + BitsetType bitsets(N); + BitsetTypeView bitsets_view(bitsets); + int64_t insert_barrier = N; + // query at ts (10) + Timestamp query_timestamp = 10; + delete_record.Query(bitsets_view, insert_barrier, query_timestamp); + std::vector expected = {0, 0, 0, 1, 1, 1, 0, 1, 0, 0}; + // pk 3 in ts(7) was deleted + for (int i = 0; i < N; i++) { + ASSERT_EQ(bitsets_view[i], expected[i]); + } + } + + // delete pk(2) at ts(9) again + delete_ts = {9}; + delete_pk = {2}; + delete_record.StreamPush(delete_pk, delete_ts.data()); + ASSERT_EQ(5, delete_record.size()); + + { + BitsetType bitsets(N); + BitsetTypeView bitsets_view(bitsets); + int64_t insert_barrier = N; + // query at ts (10) + Timestamp query_timestamp = 10; + delete_record.Query(bitsets_view, insert_barrier, query_timestamp); + std::vector expected = {0, 0, 0, 1, 1, 1, 0, 1, 1, 0}; + // pk 2 in ts(8) was deleted + for (int i = 0; i < N; i++) { + ASSERT_EQ(bitsets_view[i], expected[i]); + } + } +} + +TEST(DeleteMVCC, perform) { + using namespace milvus; + using namespace milvus::query; + using namespace milvus::segcore; + + auto schema = std::make_shared(); + auto vec_fid = schema->AddDebugField( + "fakevec", DataType::VECTOR_FLOAT, 16, knowhere::metric::L2); + auto i64_fid = schema->AddDebugField("age", DataType::INT64); + schema->set_primary_field_id(i64_fid); + auto N = 1000000; + uint64_t seg_id = 101; + InsertRecord insert_record(*schema, N); + DeletedRecord delete_record(&insert_record); + + std::vector age_data(N); + std::vector tss(N); + for (int i = 0; i < N; ++i) { + age_data[i] = i; + tss[i] = i; + insert_record.insert_pk(i, i); + } + auto insert_offset = insert_record.reserved.fetch_add(N); + insert_record.timestamps_.set_data_raw(insert_offset, tss.data(), N); + auto field_data = insert_record.get_data_base(i64_fid); + field_data->set_data_raw(insert_offset, age_data.data(), N); + insert_record.ack_responder_.AddSegment(insert_offset, insert_offset + N); + + auto DN = N / 2; + std::vector delete_ts(DN); + std::vector delete_pk(DN); + for (int i = 0; i < DN; ++i) { + delete_ts[i] = N + i; + delete_pk[i] = i; + } + auto start = std::chrono::steady_clock::now(); + delete_record.StreamPush(delete_pk, delete_ts.data()); + auto end = std::chrono::steady_clock::now(); + std::cout << "push cost:" + << std::chrono::duration_cast(end - + start) + .count() + << std::endl; + std::cout << delete_record.size() << std::endl; + + auto query_timestamp = delete_ts[DN - 1]; + auto insert_barrier = get_barrier(insert_record, query_timestamp); + BitsetType res_bitmap(insert_barrier); + BitsetTypeView res_view(res_bitmap); + start = std::chrono::steady_clock::now(); + delete_record.Query(res_view, insert_barrier, query_timestamp); + end = std::chrono::steady_clock::now(); + std::cout << "query cost:" + << std::chrono::duration_cast(end - + start) + .count() + << std::endl; +} diff --git a/internal/core/unittest/test_growing.cpp b/internal/core/unittest/test_growing.cpp index f53f214ba568d..898dad93f0c30 100644 --- a/internal/core/unittest/test_growing.cpp +++ b/internal/core/unittest/test_growing.cpp @@ -89,7 +89,7 @@ TEST(Growing, RealCount) { // delete all. auto del_offset3 = segment->get_deleted_count(); - ASSERT_EQ(del_offset3, half * 2); + ASSERT_EQ(del_offset3, half); auto del_ids3 = GenPKs(pks.begin(), pks.end()); auto del_tss3 = GenTss(c, c + half * 2); status = segment->Delete(del_offset3, c, del_ids3.get(), del_tss3.data()); diff --git a/internal/core/unittest/test_sealed.cpp b/internal/core/unittest/test_sealed.cpp index 5aaba895da676..15e7f50b8a90d 100644 --- a/internal/core/unittest/test_sealed.cpp +++ b/internal/core/unittest/test_sealed.cpp @@ -1294,6 +1294,7 @@ TEST(Sealed, DeleteCount) { auto pk = schema->AddDebugField("pk", DataType::INT64); schema->set_primary_field_id(pk); auto segment = CreateSealedSegment(schema); + segment->get_insert_record().seal_pks(); int64_t c = 10; auto offset = segment->get_deleted_count(); @@ -1305,9 +1306,8 @@ TEST(Sealed, DeleteCount) { auto status = segment->Delete(offset, c, pks.get(), tss.data()); ASSERT_TRUE(status.ok()); - // shouldn't be filtered for empty segment. auto cnt = segment->get_deleted_count(); - ASSERT_EQ(cnt, 10); + ASSERT_EQ(cnt, 0); } { auto schema = std::make_shared(); @@ -1374,7 +1374,7 @@ TEST(Sealed, RealCount) { // delete all. auto del_offset3 = segment->get_deleted_count(); - ASSERT_EQ(del_offset3, half * 2); + ASSERT_EQ(del_offset3, half); auto del_ids3 = GenPKs(pks.begin(), pks.end()); auto del_tss3 = GenTss(c, c + half * 2); status = segment->Delete(del_offset3, c, del_ids3.get(), del_tss3.data()); diff --git a/internal/core/unittest/test_utils.cpp b/internal/core/unittest/test_utils.cpp index 032ca1d1a0609..7861dd9baab58 100644 --- a/internal/core/unittest/test_utils.cpp +++ b/internal/core/unittest/test_utils.cpp @@ -64,7 +64,7 @@ TEST(Util, GetDeleteBitmap) { auto N = 10; uint64_t seg_id = 101; InsertRecord insert_record(*schema, N); - DeletedRecord delete_record; + DeletedRecord delete_record(&insert_record); // fill insert record, all insert records has same pk = 1, timestamps= {1 ... N} std::vector age_data(N); @@ -83,37 +83,14 @@ TEST(Util, GetDeleteBitmap) { // test case delete pk1(ts = 0) -> insert repeated pk1 (ts = {1 ... N}) -> query (ts = N) std::vector delete_ts = {0}; std::vector delete_pk = {1}; - delete_record.push(delete_pk, delete_ts.data()); + delete_record.StreamPush(delete_pk, delete_ts.data()); auto query_timestamp = tss[N - 1]; - auto del_barrier = get_barrier(delete_record, query_timestamp); auto insert_barrier = get_barrier(insert_record, query_timestamp); - auto res_bitmap = get_deleted_bitmap(del_barrier, - insert_barrier, - delete_record, - insert_record, - query_timestamp); - ASSERT_EQ(res_bitmap->bitmap_ptr->count(), 0); - - // test case insert repeated pk1 (ts = {1 ... N}) -> delete pk1 (ts = N) -> query (ts = N) - delete_ts = {uint64_t(N)}; - delete_pk = {1}; - delete_record.push(delete_pk, delete_ts.data()); - - del_barrier = get_barrier(delete_record, query_timestamp); - res_bitmap = get_deleted_bitmap(del_barrier, - insert_barrier, - delete_record, - insert_record, - query_timestamp); - ASSERT_EQ(res_bitmap->bitmap_ptr->count(), N - 1); - - // test case insert repeated pk1 (ts = {1 ... N}) -> delete pk1 (ts = N) -> query (ts = N/2) - query_timestamp = tss[N - 1] / 2; - del_barrier = get_barrier(delete_record, query_timestamp); - res_bitmap = get_deleted_bitmap( - del_barrier, N, delete_record, insert_record, query_timestamp); - ASSERT_EQ(res_bitmap->bitmap_ptr->count(), 0); + BitsetType res_bitmap(insert_barrier); + BitsetTypeView res_view(res_bitmap); + delete_record.Query(res_view, insert_barrier, query_timestamp); + ASSERT_EQ(res_view.count(), 0); } TEST(Util, OutOfRange) {