Skip to content

Commit

Permalink
Delete only if primary keys exist (#25292)
Browse files Browse the repository at this point in the history
Signed-off-by: yah01 <[email protected]>
  • Loading branch information
yah01 authored Sep 20, 2023
1 parent 16b35e0 commit 93e2eb7
Show file tree
Hide file tree
Showing 11 changed files with 82 additions and 5 deletions.
26 changes: 26 additions & 0 deletions internal/core/src/segcore/InsertRecord.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ class OffsetMap {
public:
virtual ~OffsetMap() = default;

virtual bool
contain(const PkType& pk) const = 0;

virtual std::vector<int64_t>
find(const PkType& pk) const = 0;

Expand All @@ -65,6 +68,11 @@ class OffsetMap {
template <typename T>
class OffsetOrderedMap : public OffsetMap {
public:
bool
contain(const PkType& pk) const override {
return map_.find(std::get<T>(pk)) != map_.end();
}

std::vector<int64_t>
find(const PkType& pk) const override {
auto offset_vector = map_.find(std::get<T>(pk));
Expand Down Expand Up @@ -138,6 +146,19 @@ class OffsetOrderedMap : public OffsetMap {
template <typename T>
class OffsetOrderedArray : public OffsetMap {
public:
bool
contain(const PkType& pk) const override {
const T& target = std::get<T>(pk);
auto it =
std::lower_bound(array_.begin(),
array_.end(),
target,
[](const std::pair<T, int64_t>& elem,
const T& value) { return elem.first < value; });

return it != array_.end();
}

std::vector<int64_t>
find(const PkType& pk) const override {
check_search();
Expand Down Expand Up @@ -355,6 +376,11 @@ struct InsertRecord {
}
}

bool
contain(const PkType& pk) const {
return pk2offset_->contain(pk);
}

std::vector<SegOffset>
search_pk(const PkType& pk, Timestamp timestamp) const {
std::shared_lock lck(shared_mutex_);
Expand Down
9 changes: 9 additions & 0 deletions internal/core/src/segcore/SegmentGrowingImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,15 @@ SegmentGrowingImpl::Delete(int64_t reserved_begin,
std::vector<PkType> pks(size);
ParsePksFromIDs(pks, field_meta.get_data_type(), *ids);

// filter out the deletions that the primary key not exists
auto end = std::remove_if(pks.begin(), pks.end(), [&](const PkType& pk) {
return !insert_record_.contain(pk);
});
size = end - pks.begin();
if (size == 0) {
return SegcoreError::success();
}

// step 1: sort timestamp
std::vector<std::tuple<Timestamp, PkType>> ordering(size);
for (int i = 0; i < size; i++) {
Expand Down
7 changes: 6 additions & 1 deletion internal/core/src/segcore/SegmentGrowingImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
#include "InsertRecord.h"
#include "SealedIndexingRecord.h"
#include "SegmentGrowing.h"

#include "common/Types.h"
#include "common/EasyAssert.h"
#include "query/PlanNode.h"
#include "common/IndexMeta.h"
Expand All @@ -47,6 +47,11 @@ class SegmentGrowingImpl : public SegmentGrowing {
const Timestamp* timestamps,
const InsertData* insert_data) override;

bool
Contain(const PkType& pk) const override {
return insert_record_.contain(pk);
}

// TODO: add id into delete log, possibly bitmap
SegcoreError
Delete(int64_t reserved_offset,
Expand Down
3 changes: 3 additions & 0 deletions internal/core/src/segcore/SegmentInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ class SegmentInterface {
virtual void
FillTargetEntry(const query::Plan* plan, SearchResult& results) const = 0;

virtual bool
Contain(const PkType& pk) const = 0;

virtual std::unique_ptr<SearchResult>
Search(const query::Plan* Plan,
const query::PlaceholderGroup* placeholder_group) const = 0;
Expand Down
10 changes: 10 additions & 0 deletions internal/core/src/segcore/SegmentSealedImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include <fcntl.h>
#include <fmt/core.h>

#include <algorithm>
#include <cstdint>
#include <filesystem>
#include <memory>
Expand Down Expand Up @@ -1136,6 +1137,15 @@ SegmentSealedImpl::Delete(int64_t reserved_offset, // deprecated
std::vector<PkType> pks(size);
ParsePksFromIDs(pks, field_meta.get_data_type(), *ids);

// filter out the deletions that the primary key not exists
auto end = std::remove_if(pks.begin(), pks.end(), [&](const PkType& pk) {
return !insert_record_.contain(pk);
});
size = end - pks.begin();
if (size == 0) {
return SegcoreError::success();
}

// step 1: sort timestamp
std::vector<std::tuple<Timestamp, PkType>> ordering(size);
for (int i = 0; i < size; i++) {
Expand Down
5 changes: 5 additions & 0 deletions internal/core/src/segcore/SegmentSealedImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ class SegmentSealedImpl : public SegmentSealed {
bool
HasFieldData(FieldId field_id) const override;

bool
Contain(const PkType& pk) const override {
return insert_record_.contain(pk);
}

void
LoadFieldData(FieldId field_id, FieldDataInfo& data) override;
void
Expand Down
1 change: 1 addition & 0 deletions internal/core/src/segcore/segment_c.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "segcore/Collection.h"
#include "segcore/SegmentGrowingImpl.h"
#include "segcore/SegmentSealedImpl.h"
#include "segcore/Utils.h"
#include "storage/FieldData.h"
#include "storage/Util.h"
#include "mmap/Types.h"
Expand Down
6 changes: 6 additions & 0 deletions internal/core/src/segcore/segment_c.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,12 @@ AddFieldDataInfoForSealed(CSegmentInterface c_segment,
CLoadFieldDataInfo c_load_field_data_info);

////////////////////////////// interfaces for SegmentInterface //////////////////////////////
CStatus
ExistPk(CSegmentInterface c_segment,
const uint8_t* raw_ids,
const uint64_t size,
bool* results);

CStatus
Delete(CSegmentInterface c_segment,
int64_t reserved_offset,
Expand Down
6 changes: 5 additions & 1 deletion internal/core/unittest/test_c_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,26 @@
#include <google/protobuf/text_format.h>
#include <gtest/gtest.h>

#include <array>
#include <boost/format.hpp>
#include <chrono>
#include <iostream>
#include <random>
#include <string>
#include <unordered_set>

#include "boost/container/vector.hpp"
#include "common/LoadInfo.h"
#include "common/Types.h"
#include "common/type_c.h"
#include "index/IndexFactory.h"
#include "knowhere/comp/index_param.h"
#include "pb/plan.pb.h"
#include "query/ExprImpl.h"
#include "segcore/Collection.h"
#include "segcore/Reduce.h"
#include "segcore/reduce_c.h"
#include "segcore/segment_c.h"
#include "test_utils/DataGen.h"
#include "test_utils/PbHelper.h"
#include "test_utils/indexbuilder_test_utils.h"
Expand Down Expand Up @@ -1151,7 +1155,7 @@ TEST(CApiTest, GetDeletedCountTest) {

// TODO: assert(deleted_count == len(delete_row_ids))
auto deleted_count = GetDeletedCount(segment);
ASSERT_EQ(deleted_count, delete_row_ids.size());
ASSERT_EQ(deleted_count, 0);

DeleteCollection(collection);
DeleteSegment(segment);
Expand Down
12 changes: 10 additions & 2 deletions internal/core/unittest/test_growing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,18 @@ TEST(Growing, DeleteCount) {
int64_t c = 10;
auto offset = 0;

auto dataset = DataGen(schema, c);
auto pks = dataset.get_col<int64_t>(pk);
segment->Insert(offset,
c,
dataset.row_ids_.data(),
dataset.timestamps_.data(),
dataset.raw_);

Timestamp begin_ts = 100;
auto tss = GenTss(c, begin_ts);
auto pks = GenPKs(c, 0);
auto status = segment->Delete(offset, c, pks.get(), tss.data());
auto del_pks = GenPKs(pks.begin(), pks.end());
auto status = segment->Delete(offset, c, del_pks.get(), tss.data());
ASSERT_TRUE(status.ok());

auto cnt = segment->get_deleted_count();
Expand Down
2 changes: 1 addition & 1 deletion internal/core/unittest/test_sealed.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1061,7 +1061,7 @@ TEST(Sealed, DeleteCount) {
ASSERT_TRUE(status.ok());

auto cnt = segment->get_deleted_count();
ASSERT_EQ(cnt, c);
ASSERT_EQ(cnt, 0);
}

TEST(Sealed, RealCount) {
Expand Down

0 comments on commit 93e2eb7

Please sign in to comment.