Skip to content

Commit

Permalink
Search for pk using raw data to reduce the overhead caused by views
Browse files Browse the repository at this point in the history
Signed-off-by: Cai Zhang <[email protected]>
  • Loading branch information
xiaocai2333 committed Nov 2, 2024
1 parent 116bf50 commit 2380a49
Show file tree
Hide file tree
Showing 7 changed files with 129 additions and 209 deletions.
37 changes: 35 additions & 2 deletions internal/core/src/common/Chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,13 +133,46 @@ class StringChunk : public Chunk {
offsets_ = reinterpret_cast<uint64_t*>(data + null_bitmap_bytes_num);
}

std::string_view
operator[](const int i) const {
if (i < 0 || i > row_nums_) {
PanicInfo(ErrorCode::OutOfRange, "index out of range");

Check warning on line 139 in internal/core/src/common/Chunk.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/common/Chunk.h#L137-L139

Added lines #L137 - L139 were not covered by tests
}

return {data_ + offsets_[i], offsets_[i + 1] - offsets_[i]};

Check warning on line 142 in internal/core/src/common/Chunk.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/common/Chunk.h#L142

Added line #L142 was not covered by tests
}

std::pair<std::vector<std::string_view>, FixedVector<bool>>
StringViews();

int
findStringPosition(std::string_view target) {

Check warning on line 149 in internal/core/src/common/Chunk.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/common/Chunk.h#L149

Added line #L149 was not covered by tests
// only supported sorted pk
int left = 0;
int right = row_nums_ - 1; // `right` should be num_rows_ - 1
int result =

Check warning on line 153 in internal/core/src/common/Chunk.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/common/Chunk.h#L151-L153

Added lines #L151 - L153 were not covered by tests
-1; // Initialize result to store the first occurrence index

while (left <= right) {
int mid = left + (right - left) / 2;
std::string_view midString = (*this)[mid];
if (midString == target) {
result = mid; // Store the index of match
right = mid - 1; // Continue searching in the left half
} else if (midString < target) {

Check warning on line 162 in internal/core/src/common/Chunk.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/common/Chunk.h#L156-L162

Added lines #L156 - L162 were not covered by tests
// midString < target
left = mid + 1;

Check warning on line 164 in internal/core/src/common/Chunk.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/common/Chunk.h#L164

Added line #L164 was not covered by tests
} else {
// midString > target
right = mid - 1;

Check warning on line 167 in internal/core/src/common/Chunk.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/common/Chunk.h#L167

Added line #L167 was not covered by tests
}
}
return result;

Check warning on line 170 in internal/core/src/common/Chunk.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/common/Chunk.h#L170

Added line #L170 was not covered by tests
}

const char*
ValueAt(int64_t idx) const override {
PanicInfo(ErrorCode::Unsupported,
"StringChunk::ValueAt is not supported");
return (*this)[idx].data();

Check warning on line 175 in internal/core/src/common/Chunk.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/common/Chunk.h#L175

Added line #L175 was not covered by tests
}

uint64_t*
Expand Down
5 changes: 5 additions & 0 deletions internal/core/src/mmap/ChunkedColumn.h
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,11 @@ class ChunkedVariableColumn : public ChunkedColumnBase {
->StringViews();
}

std::shared_ptr<Chunk>
GetChunk(int64_t chunk_id) const {
return chunks_[chunk_id];

Check warning on line 316 in internal/core/src/mmap/ChunkedColumn.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/mmap/ChunkedColumn.h#L316

Added line #L316 was not covered by tests
}

BufferView
GetBatchBuffer(int64_t start_offset, int64_t length) override {
if (start_offset < 0 || start_offset > num_rows_ ||
Expand Down
24 changes: 24 additions & 0 deletions internal/core/src/mmap/Column.h
Original file line number Diff line number Diff line change
Expand Up @@ -749,6 +749,30 @@ class SingleChunkVariableColumn : public SingleChunkColumnBase {
return ViewType(pos + sizeof(uint32_t), size);
}

int
findStringPosition(std::string_view target) {

Check warning on line 753 in internal/core/src/mmap/Column.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/mmap/Column.h#L753

Added line #L753 was not covered by tests
int left = 0;
int right = num_rows_ - 1; // `right` should be num_rows_ - 1

Check warning on line 755 in internal/core/src/mmap/Column.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/mmap/Column.h#L755

Added line #L755 was not covered by tests
int result =
-1; // Initialize result to store the first occurrence index

while (left <= right) {
int mid = left + (right - left) / 2;
std::string_view midString = this->RawAt(mid);

Check warning on line 761 in internal/core/src/mmap/Column.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/mmap/Column.h#L759-L761

Added lines #L759 - L761 were not covered by tests
if (midString == target) {
result = mid; // Store the index of match
right = mid - 1; // Continue searching in the left half
} else if (midString < target) {

Check warning on line 765 in internal/core/src/mmap/Column.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/mmap/Column.h#L764-L765

Added lines #L764 - L765 were not covered by tests
// midString < target
left = mid + 1;

Check warning on line 767 in internal/core/src/mmap/Column.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/mmap/Column.h#L767

Added line #L767 was not covered by tests
} else {
// midString > target
right = mid - 1;

Check warning on line 770 in internal/core/src/mmap/Column.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/mmap/Column.h#L770

Added line #L770 was not covered by tests
}
}
return result;

Check warning on line 773 in internal/core/src/mmap/Column.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/mmap/Column.h#L773

Added line #L773 was not covered by tests
}

std::string_view
RawAt(const int i) const {
return std::string_view((*this)[i]);
Expand Down
177 changes: 34 additions & 143 deletions internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -863,16 +863,17 @@ ChunkedSegmentSealedImpl::mask_with_delete(BitsetTypeView& bitset,

auto bitmap_holder = std::shared_ptr<DeletedRecord::TmpBitmap>();

if (!is_sorted_by_pk_) {
bitmap_holder = get_deleted_bitmap(del_barrier,
ins_barrier,
deleted_record_,
insert_record_,
timestamp);
} else {
bitmap_holder = get_deleted_bitmap_s(
del_barrier, ins_barrier, deleted_record_, timestamp);
}
auto search_fn = [this](const PkType& pk, int64_t barrier) {
return this->search_pk(pk, barrier);
};
bitmap_holder = get_deleted_bitmap(del_barrier,

Check warning on line 869 in internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp#L866-L869

Added lines #L866 - L869 were not covered by tests
ins_barrier,
deleted_record_,
insert_record_,

Check warning on line 872 in internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp#L872

Added line #L872 was not covered by tests
timestamp,
is_sorted_by_pk_,

Check warning on line 874 in internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp#L874

Added line #L874 was not covered by tests
search_fn);

if (!bitmap_holder || !bitmap_holder->bitmap_ptr) {
return;
}
Expand Down Expand Up @@ -1182,72 +1183,28 @@ ChunkedSegmentSealedImpl::check_search(const query::Plan* plan) const {
std::vector<SegOffset>
ChunkedSegmentSealedImpl::search_pk(const PkType& pk,
Timestamp timestamp) const {
auto pk_field_id = schema_->get_primary_field_id().value_or(FieldId(-1));
AssertInfo(pk_field_id.get() != -1, "Primary key is -1");
auto pk_column = fields_.at(pk_field_id);
std::vector<SegOffset> pk_offsets;
switch (schema_->get_fields().at(pk_field_id).get_data_type()) {
case DataType::INT64: {
auto target = std::get<int64_t>(pk);
// get int64 pks
auto num_chunk = pk_column->num_chunks();
for (int i = 0; i < num_chunk; ++i) {
auto src = reinterpret_cast<const int64_t*>(pk_column->Data(i));
auto chunk_row_num = pk_column->chunk_row_nums(i);
auto it = std::lower_bound(
src,
src + chunk_row_num,
target,
[](const int64_t& elem, const int64_t& value) {
return elem < value;
});
for (; it != src + chunk_row_num && *it == target; it++) {
auto offset = it - src;
if (insert_record_.timestamps_[offset] <= timestamp) {
pk_offsets.emplace_back(offset);
}
}
}
break;
}
case DataType::VARCHAR: {
auto target = std::get<std::string>(pk);
// get varchar pks
auto var_column =
std::dynamic_pointer_cast<ChunkedVariableColumn<std::string>>(
pk_column);
auto num_chunk = var_column->num_chunks();
for (int i = 0; i < num_chunk; ++i) {
auto views = var_column->StringViews(i).first;
auto it = std::lower_bound(views.begin(), views.end(), target);
for (; it != views.end() && *it == target; it++) {
auto offset = std::distance(views.begin(), it);
if (insert_record_.timestamps_[offset] <= timestamp) {
pk_offsets.emplace_back(offset);
}
}
}
break;
}
default: {
PanicInfo(
DataTypeInvalid,
fmt::format(
"unsupported type {}",
schema_->get_fields().at(pk_field_id).get_data_type()));
}
}

return pk_offsets;
return search_pk_internal(pk, [this, timestamp](int64_t offset) {
return insert_record_.timestamps_[offset] <= timestamp;
});

Check warning on line 1188 in internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp#L1186-L1188

Added lines #L1186 - L1188 were not covered by tests
}

std::vector<SegOffset>
ChunkedSegmentSealedImpl::search_pk(const PkType& pk,
int64_t insert_barrier) const {
return search_pk_internal(pk, [insert_barrier](int64_t offset) {
return offset < insert_barrier;
});

Check warning on line 1196 in internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp#L1194-L1196

Added lines #L1194 - L1196 were not covered by tests
}

template <typename Condition>
std::vector<SegOffset>
ChunkedSegmentSealedImpl::search_pk_internal(const PkType& pk,

Check warning on line 1201 in internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp#L1201

Added line #L1201 was not covered by tests
Condition condition) const {
auto pk_field_id = schema_->get_primary_field_id().value_or(FieldId(-1));
AssertInfo(pk_field_id.get() != -1, "Primary key is -1");
auto pk_column = fields_.at(pk_field_id);
std::vector<SegOffset> pk_offsets;

switch (schema_->get_fields().at(pk_field_id).get_data_type()) {
case DataType::INT64: {
auto target = std::get<int64_t>(pk);
Expand All @@ -1264,9 +1221,10 @@ ChunkedSegmentSealedImpl::search_pk(const PkType& pk,
[](const int64_t& elem, const int64_t& value) {
return elem < value;
});
for (; it != src + chunk_row_num && *it == target; it++) {
for (; it != src + pk_column->NumRows() && *it == target;

Check warning on line 1224 in internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp#L1224

Added line #L1224 was not covered by tests
++it) {
auto offset = it - src;
if (offset < insert_barrier) {
if (condition(offset)) {

Check warning on line 1227 in internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp#L1227

Added line #L1227 was not covered by tests
pk_offsets.emplace_back(offset);
}
}
Expand All @@ -1283,11 +1241,13 @@ ChunkedSegmentSealedImpl::search_pk(const PkType& pk,

auto num_chunk = var_column->num_chunks();
for (int i = 0; i < num_chunk; ++i) {
auto views = var_column->StringViews(i).first;
auto it = std::lower_bound(views.begin(), views.end(), target);
for (; it != views.end() && *it == target; it++) {
auto offset = std::distance(views.begin(), it);
if (offset < insert_barrier) {
// TODO @xiaocai2333, @sunby: chunk need to record the min/max.
auto string_chunk = std::dynamic_pointer_cast<StringChunk>(var_column->GetChunk(i));
auto offset = string_chunk->findStringPosition(target);
for (; offset != -1 && offset < var_column->NumRows() &&
var_column->RawAt(offset) == target;

Check warning on line 1248 in internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp#L1245-L1248

Added lines #L1245 - L1248 were not covered by tests
++offset) {
if (condition(offset)) {

Check warning on line 1250 in internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp#L1250

Added line #L1250 was not covered by tests
pk_offsets.emplace_back(offset);
}
}
Expand All @@ -1306,75 +1266,6 @@ ChunkedSegmentSealedImpl::search_pk(const PkType& pk,
return pk_offsets;
}

std::shared_ptr<DeletedRecord::TmpBitmap>
ChunkedSegmentSealedImpl::get_deleted_bitmap_s(
int64_t del_barrier,
int64_t insert_barrier,
DeletedRecord& delete_record,
Timestamp query_timestamp) const {
// if insert_barrier and del_barrier have not changed, use cache data directly
bool hit_cache = false;
int64_t old_del_barrier = 0;
auto current = delete_record.clone_lru_entry(
insert_barrier, del_barrier, old_del_barrier, hit_cache);
if (hit_cache) {
return current;
}

auto bitmap = current->bitmap_ptr;

int64_t start, end;
if (del_barrier < old_del_barrier) {
// in this case, ts of delete record[current_del_barrier : old_del_barrier] > query_timestamp
// so these deletion records do not take effect in query/search
// so bitmap corresponding to those pks in delete record[current_del_barrier:old_del_barrier] will be reset to 0
// for example, current_del_barrier = 2, query_time = 120, the bitmap will be reset to [0, 1, 1, 0, 0, 0, 0, 0]
start = del_barrier;
end = old_del_barrier;
} else {
// the cache is not enough, so update bitmap using new pks in delete record[old_del_barrier:current_del_barrier]
// for example, current_del_barrier = 4, query_time = 300, bitmap will be updated to [0, 1, 1, 0, 1, 1, 0, 0]
start = old_del_barrier;
end = del_barrier;
}

// Avoid invalid calculations when there are a lot of repeated delete pks
std::unordered_map<PkType, Timestamp> delete_timestamps;
for (auto del_index = start; del_index < end; ++del_index) {
auto pk = delete_record.pks()[del_index];
auto timestamp = delete_record.timestamps()[del_index];

delete_timestamps[pk] = timestamp > delete_timestamps[pk]
? timestamp
: delete_timestamps[pk];
}

for (auto& [pk, timestamp] : delete_timestamps) {
auto segOffsets = search_pk(pk, insert_barrier);
for (auto offset : segOffsets) {
int64_t insert_row_offset = offset.get();

// The deletion record do not take effect in search/query,
// and reset bitmap to 0
if (timestamp > query_timestamp) {
bitmap->reset(insert_row_offset);
continue;
}
// Insert after delete with same pk, delete will not task effect on this insert record,
// and reset bitmap to 0
if (insert_record_.timestamps_[offset.get()] >= timestamp) {
bitmap->reset(insert_row_offset);
continue;
}
// insert data corresponding to the insert_row_offset will be ignored in search/query
bitmap->set(insert_row_offset);
}
}

delete_record.insert_lru_entry(current);
return current;
}

std::pair<std::vector<OffsetMap::OffsetType>, bool>
ChunkedSegmentSealedImpl::find_first(int64_t limit,
const BitsetType& bitset) const {
Expand Down
8 changes: 3 additions & 5 deletions internal/core/src/segcore/ChunkedSegmentSealedImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,9 @@ class ChunkedSegmentSealedImpl : public SegmentSealed {
std::vector<SegOffset>
search_pk(const PkType& pk, int64_t insert_barrier) const;

std::shared_ptr<DeletedRecord::TmpBitmap>
get_deleted_bitmap_s(int64_t del_barrier,
int64_t insert_barrier,
DeletedRecord& delete_record,
Timestamp query_timestamp) const;
template <typename Condition>
std::vector<SegOffset>
search_pk_internal(const PkType& pk, Condition condition) const;

std::unique_ptr<DataArray>
get_vector(FieldId field_id,
Expand Down
Loading

0 comments on commit 2380a49

Please sign in to comment.