feat: Support stats task to sort segment by PK (milvus-io#35054)
issue: milvus-io#33744 

This PR includes the following changes:
1. Added a new task type to the task scheduler in datacoord: stats task,
which sorts segments by primary key (a sketch of the lookup this enables follows this list).
2. Implemented segment sorting in indexnode.
3. Added a new field `FieldStatsLog` to SegmentInfo to store token index
information.
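
Background for reviewers: once a sealed segment is sorted by its primary key, a point lookup no longer needs the pk→offset map that unsorted segments maintain; it can binary-search the PK column directly, which is what the new `SegmentSealedImpl::search_pk` overloads below do. A minimal, self-contained sketch of that idea (illustrative names, not code from this PR):

```cpp
#include <algorithm>
#include <cstdint>
#include <iostream>
#include <vector>

// Hypothetical helper: return every row offset whose primary key equals
// `target`, assuming the segment's PK column is sorted ascending.
std::vector<int64_t> LookupSortedPk(const std::vector<int64_t>& pks,
                                    int64_t target) {
    std::vector<int64_t> offsets;
    // O(log n) to locate the run of equal keys, instead of probing a
    // separately built and stored pk -> offset index.
    auto range = std::equal_range(pks.begin(), pks.end(), target);
    for (auto it = range.first; it != range.second; ++it) {
        offsets.push_back(static_cast<int64_t>(it - pks.begin()));
    }
    return offsets;
}

int main() {
    std::vector<int64_t> pk_column = {3, 7, 7, 9, 12};  // sorted by PK
    for (auto off : LookupSortedPk(pk_column, 7)) {
        std::cout << "offset " << off << "\n";  // prints offsets 1 and 2
    }
}
```

This is also why the loading paths in the diff skip `insert_record_.insert_pks(...)` when `is_sorted_by_pk_` is set: the index being skipped is exactly the map the binary search replaces.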

---------

Signed-off-by: Cai Zhang <[email protected]>
xiaocai2333 authored Sep 2, 2024
1 parent 9d80137 commit 2c9bb4d
Showing 110 changed files with 5,616 additions and 1,437 deletions.
254 changes: 243 additions & 11 deletions internal/core/src/segcore/SegmentSealedImpl.cpp
@@ -189,7 +189,8 @@ SegmentSealedImpl::LoadScalarIndex(const LoadIndexInfo& info) {
case DataType::INT64: {
auto int64_index = dynamic_cast<index::ScalarIndex<int64_t>*>(
scalar_indexings_[field_id].get());
-if (insert_record_.empty_pks() && int64_index->HasRawData()) {
+if (!is_sorted_by_pk_ && insert_record_.empty_pks() &&
+    int64_index->HasRawData()) {
for (int i = 0; i < row_count; ++i) {
insert_record_.insert_pk(int64_index->Reverse_Lookup(i),
i);
@@ -202,7 +203,8 @@ SegmentSealedImpl::LoadScalarIndex(const LoadIndexInfo& info) {
auto string_index =
dynamic_cast<index::ScalarIndex<std::string>*>(
scalar_indexings_[field_id].get());
-if (insert_record_.empty_pks() && string_index->HasRawData()) {
+if (!is_sorted_by_pk_ && insert_record_.empty_pks() &&
+    string_index->HasRawData()) {
for (int i = 0; i < row_count; ++i) {
insert_record_.insert_pk(
string_index->Reverse_Lookup(i), i);
@@ -445,7 +447,9 @@ SegmentSealedImpl::LoadFieldData(FieldId field_id, FieldDataInfo& data) {
}

// set pks to offset
-if (schema_->get_primary_field_id() == field_id) {
+// if the segment is already sorted by pk, there is no need to build a pk offset index;
+// a binary search can be performed directly on the pk column.
+if (schema_->get_primary_field_id() == field_id && !is_sorted_by_pk_) {
AssertInfo(field_id.get() != -1, "Primary key is -1");
AssertInfo(insert_record_.empty_pks(), "already exists");
insert_record_.insert_pks(data_type, column);
@@ -571,7 +575,8 @@ SegmentSealedImpl::MapFieldData(const FieldId field_id, FieldDataInfo& data) {
strerror(errno)));

// set pks to offset
-if (schema_->get_primary_field_id() == field_id) {
+// no need to build the pk offset index if the segment is sorted by pk
+if (schema_->get_primary_field_id() == field_id && !is_sorted_by_pk_) {
AssertInfo(field_id.get() != -1, "Primary key is -1");
AssertInfo(insert_record_.empty_pks(), "already exists");
insert_record_.insert_pks(data_type, column);
@@ -721,6 +726,182 @@ SegmentSealedImpl::get_schema() const {
return *schema_;
}

std::vector<SegOffset>
SegmentSealedImpl::search_pk(const PkType& pk, Timestamp timestamp) const {
auto pk_field_id = schema_->get_primary_field_id().value_or(FieldId(-1));
AssertInfo(pk_field_id.get() != -1, "Primary key is -1");
auto pk_column = fields_.at(pk_field_id);
std::vector<SegOffset> pk_offsets;
switch (schema_->get_fields().at(pk_field_id).get_data_type()) {
case DataType::INT64: {
auto target = std::get<int64_t>(pk);
// get int64 pks
auto src = reinterpret_cast<const int64_t*>(pk_column->Data());
auto it =
std::lower_bound(src,
src + pk_column->NumRows(),
target,
[](const int64_t& elem, const int64_t& value) {
return elem < value;
});
for (; it != src + pk_column->NumRows() && *it == target; it++) {
auto offset = it - src;
if (insert_record_.timestamps_[offset] <= timestamp) {
pk_offsets.emplace_back(it - src);
}
}
break;
}
case DataType::VARCHAR: {
auto target = std::get<std::string>(pk);
// get varchar pks
auto var_column =
std::dynamic_pointer_cast<VariableColumn<std::string>>(
pk_column);
auto views = var_column->Views();
auto it = std::lower_bound(views.begin(), views.end(), target);
for (; it != views.end() && *it == target; it++) {
auto offset = std::distance(views.begin(), it);
if (insert_record_.timestamps_[offset] <= timestamp) {
pk_offsets.emplace_back(offset);
}
}
break;
}
default: {
PanicInfo(
DataTypeInvalid,
fmt::format(
"unsupported type {}",
schema_->get_fields().at(pk_field_id).get_data_type()));
}
}

return pk_offsets;
}

std::vector<SegOffset>
SegmentSealedImpl::search_pk(const PkType& pk, int64_t insert_barrier) const {
auto pk_field_id = schema_->get_primary_field_id().value_or(FieldId(-1));
AssertInfo(pk_field_id.get() != -1, "Primary key is -1");
auto pk_column = fields_.at(pk_field_id);
std::vector<SegOffset> pk_offsets;
switch (schema_->get_fields().at(pk_field_id).get_data_type()) {
case DataType::INT64: {
auto target = std::get<int64_t>(pk);
// get int64 pks
auto src = reinterpret_cast<const int64_t*>(pk_column->Data());
auto it =
std::lower_bound(src,
src + pk_column->NumRows(),
target,
[](const int64_t& elem, const int64_t& value) {
return elem < value;
});
for (; it != src + pk_column->NumRows() && *it == target; it++) {
if (it - src < insert_barrier) {
pk_offsets.emplace_back(it - src);
}
}
break;
}
case DataType::VARCHAR: {
auto target = std::get<std::string>(pk);
// get varchar pks
auto var_column =
std::dynamic_pointer_cast<VariableColumn<std::string>>(
pk_column);
auto views = var_column->Views();
auto it = std::lower_bound(views.begin(), views.end(), target);
while (it != views.end() && *it == target) {
auto offset = std::distance(views.begin(), it);
if (offset < insert_barrier) {
pk_offsets.emplace_back(offset);
}
++it;
}
break;
}
default: {
PanicInfo(
DataTypeInvalid,
fmt::format(
"unsupported type {}",
schema_->get_fields().at(pk_field_id).get_data_type()));
}
}

return pk_offsets;
}

std::shared_ptr<DeletedRecord::TmpBitmap>
SegmentSealedImpl::get_deleted_bitmap_s(int64_t del_barrier,
int64_t insert_barrier,
DeletedRecord& delete_record,
Timestamp query_timestamp) const {
// if insert_barrier and del_barrier have not changed, use the cached data directly
bool hit_cache = false;
int64_t old_del_barrier = 0;
auto current = delete_record.clone_lru_entry(
insert_barrier, del_barrier, old_del_barrier, hit_cache);
if (hit_cache) {
return current;
}

auto bitmap = current->bitmap_ptr;

int64_t start, end;
if (del_barrier < old_del_barrier) {
// in this case, ts of delete record[current_del_barrier : old_del_barrier] > query_timestamp
// so these deletion records do not take effect in query/search
// so bitmap corresponding to those pks in delete record[current_del_barrier:old_del_barrier] will be reset to 0
// for example, current_del_barrier = 2, query_time = 120, the bitmap will be reset to [0, 1, 1, 0, 0, 0, 0, 0]
start = del_barrier;
end = old_del_barrier;
} else {
// the cache is not enough, so update bitmap using new pks in delete record[old_del_barrier:current_del_barrier]
// for example, current_del_barrier = 4, query_time = 300, bitmap will be updated to [0, 1, 1, 0, 1, 1, 0, 0]
start = old_del_barrier;
end = del_barrier;
}

// Avoid invalid calculations when there are a lot of repeated delete pks
std::unordered_map<PkType, Timestamp> delete_timestamps;
for (auto del_index = start; del_index < end; ++del_index) {
auto pk = delete_record.pks()[del_index];
auto timestamp = delete_record.timestamps()[del_index];

delete_timestamps[pk] = timestamp > delete_timestamps[pk]
? timestamp
: delete_timestamps[pk];
}

for (auto& [pk, timestamp] : delete_timestamps) {
auto segOffsets = search_pk(pk, insert_barrier);
for (auto offset : segOffsets) {
int64_t insert_row_offset = offset.get();

// The deletion record does not take effect in search/query,
// so reset the bitmap to 0
if (timestamp > query_timestamp) {
bitmap->reset(insert_row_offset);
continue;
}
// Insert after delete with the same pk: the delete will not take effect on this insert record,
// so reset the bitmap to 0
if (insert_record_.timestamps_[offset.get()] >= timestamp) {
bitmap->reset(insert_row_offset);
continue;
}
// insert data corresponding to the insert_row_offset will be ignored in search/query
bitmap->set(insert_row_offset);
}
}

delete_record.insert_lru_entry(current);
return current;
}

void
SegmentSealedImpl::mask_with_delete(BitsetType& bitset,
int64_t ins_barrier,
@@ -730,8 +911,19 @@ SegmentSealedImpl::mask_with_delete(BitsetType& bitset,
return;
}

-auto bitmap_holder = get_deleted_bitmap(
-    del_barrier, ins_barrier, deleted_record_, insert_record_, timestamp);
+auto bitmap_holder = std::shared_ptr<DeletedRecord::TmpBitmap>();
+
+if (!is_sorted_by_pk_) {
+    bitmap_holder = get_deleted_bitmap(del_barrier,
+                                       ins_barrier,
+                                       deleted_record_,
+                                       insert_record_,
+                                       timestamp);
+} else {
+    bitmap_holder = get_deleted_bitmap_s(
+        del_barrier, ins_barrier, deleted_record_, timestamp);
+}

if (!bitmap_holder || !bitmap_holder->bitmap_ptr) {
return;
}
@@ -1037,7 +1229,8 @@ SegmentSealedImpl::SegmentSealedImpl(SchemaPtr schema,
IndexMetaPtr index_meta,
const SegcoreConfig& segcore_config,
int64_t segment_id,
-bool TEST_skip_index_for_retrieve)
+bool TEST_skip_index_for_retrieve,
+bool is_sorted_by_pk)
: segcore_config_(segcore_config),
field_data_ready_bitset_(schema->size()),
index_ready_bitset_(schema->size()),
@@ -1047,7 +1240,8 @@ SegmentSealedImpl::SegmentSealedImpl(SchemaPtr schema,
schema_(schema),
id_(segment_id),
col_index_meta_(index_meta),
-TEST_skip_index_for_retrieve_(TEST_skip_index_for_retrieve) {
+TEST_skip_index_for_retrieve_(TEST_skip_index_for_retrieve),
+is_sorted_by_pk_(is_sorted_by_pk) {
mmap_descriptor_ = std::shared_ptr<storage::MmapChunkDescriptor>(
new storage::MmapChunkDescriptor({segment_id, SegmentType::Sealed}));
auto mcm = storage::MmapManager::GetInstance().GetMmapChunkManager();
@@ -1506,13 +1700,18 @@ SegmentSealedImpl::search_ids(const IdArray& id_array,
auto ids_size = GetSizeOfIdArray(id_array);
std::vector<PkType> pks(ids_size);
ParsePksFromIDs(pks, data_type, id_array);

auto res_id_arr = std::make_unique<IdArray>();
std::vector<SegOffset> res_offsets;
res_offsets.reserve(pks.size());

for (auto& pk : pks) {
-auto segOffsets = insert_record_.search_pk(pk, timestamp);
-for (auto offset : segOffsets) {
+std::vector<SegOffset> pk_offsets;
+if (!is_sorted_by_pk_) {
+    pk_offsets = insert_record_.search_pk(pk, timestamp);
+} else {
+    pk_offsets = search_pk(pk, timestamp);
+}
+for (auto offset : pk_offsets) {
switch (data_type) {
case DataType::INT64: {
res_id_arr->mutable_int_id()->add_data(
@@ -1535,6 +1734,39 @@ SegmentSealedImpl::search_ids(const IdArray& id_array,
return {std::move(res_id_arr), std::move(res_offsets)};
}

std::pair<std::vector<OffsetMap::OffsetType>, bool>
SegmentSealedImpl::find_first(int64_t limit, const BitsetType& bitset) const {
if (!is_sorted_by_pk_) {
return insert_record_.pk2offset_->find_first(limit, bitset);
}
if (limit == Unlimited || limit == NoLimit) {
limit = num_rows_.value();
}

int64_t hit_num = 0; // avoid counting the number every time.
auto size = bitset.size();
int64_t cnt = size - bitset.count();
auto more_hit_than_limit = cnt > limit;
limit = std::min(limit, cnt);
std::vector<int64_t> seg_offsets;
seg_offsets.reserve(limit);

int64_t offset = 0;
for (; hit_num < limit && offset < num_rows_.value(); offset++) {
if (offset >= size) {
// In fact, this case won't happen on sealed segments.
continue;
}

if (!bitset[offset]) {
seg_offsets.push_back(offset);
hit_num++;
}
}

return {seg_offsets, more_hit_than_limit && offset != num_rows_.value()};
}

SegcoreError
SegmentSealedImpl::Delete(int64_t reserved_offset, // deprecated
int64_t size,
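The row-visibility rule that `get_deleted_bitmap_s` applies above reduces to two timestamp comparisons per matching row. A standalone restatement of that rule as read from the diff (assumed semantics; names are illustrative, not from the PR):

```cpp
#include <cassert>
#include <cstdint>

using Timestamp = uint64_t;

// A delete masks a row only if it is visible at the query snapshot
// AND the row was inserted strictly before the delete.
bool RowIsDeleted(Timestamp insert_ts, Timestamp delete_ts,
                  Timestamp query_ts) {
    if (delete_ts > query_ts) {
        return false;  // delete is newer than the query snapshot
    }
    if (insert_ts >= delete_ts) {
        return false;  // same-pk row (re)inserted at/after the delete survives
    }
    return true;  // visible delete that follows the insert masks the row
}

int main() {
    assert(RowIsDeleted(/*insert_ts=*/100, /*delete_ts=*/200, /*query_ts=*/300));
    assert(!RowIsDeleted(100, 200, 150));  // delete not visible yet
    assert(!RowIsDeleted(250, 200, 300));  // insert happened after the delete
}
```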
28 changes: 22 additions & 6 deletions internal/core/src/segcore/SegmentSealedImpl.h
@@ -43,7 +43,8 @@ class SegmentSealedImpl : public SegmentSealed {
IndexMetaPtr index_meta,
const SegcoreConfig& segcore_config,
int64_t segment_id,
-bool TEST_skip_index_for_retrieve = false);
+bool TEST_skip_index_for_retrieve = false,
+bool is_sorted_by_pk = false);
~SegmentSealedImpl() override;
void
LoadIndex(const LoadIndexInfo& info) override;
@@ -105,6 +106,18 @@ class SegmentSealedImpl : public SegmentSealed {
const Schema&
get_schema() const override;

std::vector<SegOffset>
search_pk(const PkType& pk, Timestamp timestamp) const;

std::vector<SegOffset>
search_pk(const PkType& pk, int64_t insert_barrier) const;

std::shared_ptr<DeletedRecord::TmpBitmap>
get_deleted_bitmap_s(int64_t del_barrier,
int64_t insert_barrier,
DeletedRecord& delete_record,
Timestamp query_timestamp) const;

std::unique_ptr<DataArray>
get_vector(FieldId field_id, const int64_t* ids, int64_t count) const;

@@ -142,9 +155,7 @@ class SegmentSealedImpl : public SegmentSealed {
const Timestamp* timestamps) override;

std::pair<std::vector<OffsetMap::OffsetType>, bool>
-find_first(int64_t limit, const BitsetType& bitset) const override {
-    return insert_record_.pk2offset_->find_first(limit, bitset);
-}
+find_first(int64_t limit, const BitsetType& bitset) const override;

// Calculate: output[i] = Vec[seg_offset[i]]
// where Vec is determined from field_offset
@@ -343,6 +354,9 @@ class SegmentSealedImpl : public SegmentSealed {
// for sparse vector unit test only! Once a type of sparse index that
// doesn't has raw data is added, this should be removed.
bool TEST_skip_index_for_retrieve_ = false;

// whether the segment is sorted by the primary key
bool is_sorted_by_pk_ = false;
};

inline SegmentSealedUPtr
@@ -351,12 +365,14 @@ CreateSealedSegment(
IndexMetaPtr index_meta = nullptr,
int64_t segment_id = -1,
const SegcoreConfig& segcore_config = SegcoreConfig::default_config(),
-bool TEST_skip_index_for_retrieve = false) {
+bool TEST_skip_index_for_retrieve = false,
+bool is_sorted_by_pk = false) {
return std::make_unique<SegmentSealedImpl>(schema,
index_meta,
segcore_config,
segment_id,
-TEST_skip_index_for_retrieve);
+TEST_skip_index_for_retrieve,
+is_sorted_by_pk);
}

} // namespace milvus::segcore
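
For the sorted-segment `find_first` path declared above (and implemented in the .cpp diff), there is no pk2offset index to delegate to, so the implementation scans the bitset for up to `limit` unset bits in offset order. A standalone sketch of that scan, with `std::vector<bool>` standing in for milvus' `BitsetType` (an assumption for illustration only):

```cpp
#include <cstdint>
#include <iostream>
#include <utility>
#include <vector>

// Return up to `limit` row offsets whose bit is unset (set == filtered out),
// plus a flag indicating whether unscanned rows remain.
std::pair<std::vector<int64_t>, bool> FindFirstUnset(
    const std::vector<bool>& bitset, int64_t limit) {
    std::vector<int64_t> offsets;
    const int64_t size = static_cast<int64_t>(bitset.size());
    int64_t offset = 0;
    for (; static_cast<int64_t>(offsets.size()) < limit && offset < size;
         ++offset) {
        if (!bitset[offset]) {
            offsets.push_back(offset);  // row survives the filter
        }
    }
    return {offsets, offset != size};  // true -> caller may ask for more
}

int main() {
    std::vector<bool> filtered = {true, false, false, true, false};
    auto [offsets, more] = FindFirstUnset(filtered, 2);
    for (auto o : offsets) {
        std::cout << o << ' ';  // prints "1 2"
    }
    std::cout << (more ? "(more remain)" : "(done)") << '\n';
}
```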