Skip to content

Commit

Permalink
fix: query iterator lack results(#33137) (#33422)
Browse files Browse the repository at this point in the history
related: #33137 
Add a has_more_result tag to the reduce step at each level in order to
correct the behavior of reduce_stop_for_best.

Signed-off-by: MrPresent-Han <[email protected]>
  • Loading branch information
MrPresent-Han authored May 30, 2024
1 parent 7763718 commit 416a2cf
Show file tree
Hide file tree
Showing 17 changed files with 197 additions and 103 deletions.
1 change: 1 addition & 0 deletions internal/core/src/common/QueryResult.h
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ struct RetrieveResult {
void* segment_;
std::vector<int64_t> result_offsets_;
std::vector<DataArray> field_data_;
bool has_more_result = true;
};

using RetrieveResultPtr = std::shared_ptr<RetrieveResult>;
Expand Down
4 changes: 3 additions & 1 deletion internal/core/src/query/visitors/ExecPlanNodeVisitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -291,8 +291,10 @@ ExecPlanNodeVisitor::visit(RetrievePlanNode& node) {
false_filtered_out = true;
segment->timestamp_filter(bitset_holder, timestamp_);
}
retrieve_result.result_offsets_ =
auto results_pair =
segment->find_first(node.limit_, bitset_holder, false_filtered_out);
retrieve_result.result_offsets_ = std::move(results_pair.first);
retrieve_result.has_more_result = results_pair.second;
retrieve_result_opt_ = std::move(retrieve_result);
}

Expand Down
24 changes: 12 additions & 12 deletions internal/core/src/segcore/InsertRecord.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class OffsetMap {

using OffsetType = int64_t;
// TODO: in fact, we can retrieve the pk here. Not sure which way is more efficient.
virtual std::vector<OffsetType>
virtual std::pair<std::vector<OffsetMap::OffsetType>, bool>
find_first(int64_t limit,
const BitsetType& bitset,
bool false_filtered_out) const = 0;
Expand Down Expand Up @@ -109,7 +109,7 @@ class OffsetOrderedMap : public OffsetMap {
return map_.empty();
}

std::vector<OffsetType>
std::pair<std::vector<OffsetMap::OffsetType>, bool>
find_first(int64_t limit,
const BitsetType& bitset,
bool false_filtered_out) const override {
Expand All @@ -131,7 +131,7 @@ class OffsetOrderedMap : public OffsetMap {
}

private:
std::vector<OffsetType>
std::pair<std::vector<OffsetMap::OffsetType>, bool>
find_first_by_index(int64_t limit,
const BitsetType& bitset,
bool false_filtered_out) const {
Expand All @@ -144,8 +144,8 @@ class OffsetOrderedMap : public OffsetMap {
limit = std::min(limit, cnt);
std::vector<int64_t> seg_offsets;
seg_offsets.reserve(limit);
for (auto it = map_.begin(); hit_num < limit && it != map_.end();
it++) {
auto it = map_.begin();
for (; hit_num < limit && it != map_.end(); it++) {
for (auto seg_offset : it->second) {
if (seg_offset >= size) {
// Frequently concurrent insert/query will cause this case.
Expand All @@ -161,7 +161,7 @@ class OffsetOrderedMap : public OffsetMap {
}
}
}
return seg_offsets;
return {seg_offsets, it != map_.end()};
}

private:
Expand Down Expand Up @@ -226,7 +226,7 @@ class OffsetOrderedArray : public OffsetMap {
return array_.empty();
}

std::vector<OffsetType>
std::pair<std::vector<OffsetMap::OffsetType>, bool>
find_first(int64_t limit,
const BitsetType& bitset,
bool false_filtered_out) const override {
Expand All @@ -248,7 +248,7 @@ class OffsetOrderedArray : public OffsetMap {
}

private:
std::vector<OffsetType>
std::pair<std::vector<OffsetMap::OffsetType>, bool>
find_first_by_index(int64_t limit,
const BitsetType& bitset,
bool false_filtered_out) const {
Expand All @@ -261,11 +261,11 @@ class OffsetOrderedArray : public OffsetMap {
limit = std::min(limit, cnt);
std::vector<int64_t> seg_offsets;
seg_offsets.reserve(limit);
for (auto it = array_.begin(); hit_num < limit && it != array_.end();
it++) {
auto it = array_.begin();
for (; hit_num < limit && it != array_.end(); it++) {
auto seg_offset = it->second;
if (seg_offset >= size) {
// In fact, this case won't happend on sealed segments.
// In fact, this case won't happen on sealed segments.
continue;
}

Expand All @@ -274,7 +274,7 @@ class OffsetOrderedArray : public OffsetMap {
hit_num++;
}
}
return seg_offsets;
return {seg_offsets, it != array_.end()};
}

void
Expand Down
2 changes: 1 addition & 1 deletion internal/core/src/segcore/SegmentGrowingImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ class SegmentGrowingImpl : public SegmentGrowing {
return true;
}

std::vector<OffsetMap::OffsetType>
std::pair<std::vector<OffsetMap::OffsetType>, bool>
find_first(int64_t limit,
const BitsetType& bitset,
bool false_filtered_out) const override {
Expand Down
2 changes: 1 addition & 1 deletion internal/core/src/segcore/SegmentInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ SegmentInternalInterface::Retrieve(tracer::TraceContext* trace_ctx,
query::ExecPlanNodeVisitor visitor(*this, timestamp);
auto retrieve_results = visitor.get_retrieve_result(*plan->plan_node_);
retrieve_results.segment_ = (void*)this;
results->set_has_more_result(retrieve_results.has_more_result);

auto result_rows = retrieve_results.result_offsets_.size();
int64_t output_data_size = 0;
Expand Down Expand Up @@ -120,7 +121,6 @@ SegmentInternalInterface::Retrieve(tracer::TraceContext* trace_ctx,
retrieve_results.result_offsets_.size(),
ignore_non_pk,
true);

return results;
}

Expand Down
2 changes: 1 addition & 1 deletion internal/core/src/segcore/SegmentInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ class SegmentInternalInterface : public SegmentInterface {
* @param false_filtered_out
* @return A pair: the candidate offsets, and a flag telling whether more results may remain beyond those returned.
*/
virtual std::vector<OffsetMap::OffsetType>
virtual std::pair<std::vector<OffsetMap::OffsetType>, bool>
find_first(int64_t limit,
const BitsetType& bitset,
bool false_filtered_out) const = 0;
Expand Down
2 changes: 1 addition & 1 deletion internal/core/src/segcore/SegmentSealedImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ class SegmentSealedImpl : public SegmentSealed {
const IdArray* pks,
const Timestamp* timestamps) override;

std::vector<OffsetMap::OffsetType>
std::pair<std::vector<OffsetMap::OffsetType>, bool>
find_first(int64_t limit,
const BitsetType& bitset,
bool false_filtered_out) const override {
Expand Down
86 changes: 53 additions & 33 deletions internal/core/unittest/test_offset_ordered_array.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,6 @@ using TypeOfPks = testing::Types<int64_t, std::string>;
TYPED_TEST_SUITE_P(TypedOffsetOrderedArrayTest);

TYPED_TEST_P(TypedOffsetOrderedArrayTest, find_first) {
std::vector<int64_t> offsets;

// not sealed.
ASSERT_ANY_THROW(this->map_.find_first(Unlimited, {}, true));

Expand All @@ -81,40 +79,62 @@ TYPED_TEST_P(TypedOffsetOrderedArrayTest, find_first) {
this->seal();

// all is satisfied.
BitsetType all(num);
all.set();
offsets = this->map_.find_first(num / 2, all, true);
ASSERT_EQ(num / 2, offsets.size());
for (int i = 1; i < offsets.size(); i++) {
ASSERT_TRUE(data[offsets[i - 1]] <= data[offsets[i]]);
}
offsets = this->map_.find_first(Unlimited, all, true);
ASSERT_EQ(num, offsets.size());
for (int i = 1; i < offsets.size(); i++) {
ASSERT_TRUE(data[offsets[i - 1]] <= data[offsets[i]]);
{
BitsetType all(num);
all.set();
{
auto [offsets, has_more_res] =
this->map_.find_first(num / 2, all, true);
ASSERT_EQ(num / 2, offsets.size());
ASSERT_TRUE(has_more_res);
for (int i = 1; i < offsets.size(); i++) {
ASSERT_TRUE(data[offsets[i - 1]] <= data[offsets[i]]);
}
}
{
auto [offsets, has_more_res] =
this->map_.find_first(Unlimited, all, true);
ASSERT_EQ(num, offsets.size());
ASSERT_FALSE(has_more_res);
for (int i = 1; i < offsets.size(); i++) {
ASSERT_TRUE(data[offsets[i - 1]] <= data[offsets[i]]);
}
}
}

// corner case, segment offset exceeds the size of bitset.
BitsetType all_minus_1(num - 1);
all_minus_1.set();
offsets = this->map_.find_first(num / 2, all_minus_1, true);
ASSERT_EQ(num / 2, offsets.size());
for (int i = 1; i < offsets.size(); i++) {
ASSERT_TRUE(data[offsets[i - 1]] <= data[offsets[i]]);
{
// corner case, segment offset exceeds the size of bitset.
BitsetType all_minus_1(num - 1);
all_minus_1.set();
{
auto [offsets, has_more_res] =
this->map_.find_first(num / 2, all_minus_1, true);
ASSERT_EQ(num / 2, offsets.size());
ASSERT_TRUE(has_more_res);
for (int i = 1; i < offsets.size(); i++) {
ASSERT_TRUE(data[offsets[i - 1]] <= data[offsets[i]]);
}
}
{
auto [offsets, has_more_res] =
this->map_.find_first(Unlimited, all_minus_1, true);
ASSERT_EQ(all_minus_1.size(), offsets.size());
ASSERT_FALSE(has_more_res);
for (int i = 1; i < offsets.size(); i++) {
ASSERT_TRUE(data[offsets[i - 1]] <= data[offsets[i]]);
}
}
}
offsets = this->map_.find_first(Unlimited, all_minus_1, true);
ASSERT_EQ(all_minus_1.size(), offsets.size());
for (int i = 1; i < offsets.size(); i++) {
ASSERT_TRUE(data[offsets[i - 1]] <= data[offsets[i]]);
{
// none is satisfied.
BitsetType none(num);
none.reset();
auto result_pair = this->map_.find_first(num / 2, none, true);
ASSERT_EQ(0, result_pair.first.size());
ASSERT_TRUE(result_pair.second);
result_pair = this->map_.find_first(NoLimit, none, true);
ASSERT_EQ(0, result_pair.first.size());
ASSERT_TRUE(result_pair.second);
}

// none is satisfied.
BitsetType none(num);
none.reset();
offsets = this->map_.find_first(num / 2, none, true);
ASSERT_EQ(0, offsets.size());
offsets = this->map_.find_first(NoLimit, none, true);
ASSERT_EQ(0, offsets.size());
}

REGISTER_TYPED_TEST_SUITE_P(TypedOffsetOrderedArrayTest, find_first);
Expand Down
76 changes: 51 additions & 25 deletions internal/core/unittest/test_offset_ordered_map.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,13 @@ using TypeOfPks = testing::Types<int64_t, std::string>;
TYPED_TEST_SUITE_P(TypedOffsetOrderedMapTest);

TYPED_TEST_P(TypedOffsetOrderedMapTest, find_first) {
std::vector<int64_t> offsets;

// no data.
offsets = this->map_.find_first(Unlimited, {}, true);
ASSERT_EQ(0, offsets.size());

{
auto [offsets, has_more_res] =
this->map_.find_first(Unlimited, {}, true);
ASSERT_EQ(0, offsets.size());
ASSERT_FALSE(has_more_res);
}
// insert 10 entities.
int num = 10;
auto data = this->random_generate(num);
Expand All @@ -76,38 +77,63 @@ TYPED_TEST_P(TypedOffsetOrderedMapTest, find_first) {
// all is satisfied.
BitsetType all(num);
all.set();
offsets = this->map_.find_first(num / 2, all, true);
ASSERT_EQ(num / 2, offsets.size());
for (int i = 1; i < offsets.size(); i++) {
ASSERT_TRUE(data[offsets[i - 1]] <= data[offsets[i]]);

{
auto [offsets, has_more_res] =
this->map_.find_first(num / 2, all, true);
ASSERT_EQ(num / 2, offsets.size());
ASSERT_TRUE(has_more_res);
for (int i = 1; i < offsets.size(); i++) {
ASSERT_TRUE(data[offsets[i - 1]] <= data[offsets[i]]);
}
}
offsets = this->map_.find_first(Unlimited, all, true);
ASSERT_EQ(num, offsets.size());
for (int i = 1; i < offsets.size(); i++) {
ASSERT_TRUE(data[offsets[i - 1]] <= data[offsets[i]]);
{
auto [offsets, has_more_res] =
this->map_.find_first(Unlimited, all, true);
ASSERT_EQ(num, offsets.size());
ASSERT_FALSE(has_more_res);
for (int i = 1; i < offsets.size(); i++) {
ASSERT_TRUE(data[offsets[i - 1]] <= data[offsets[i]]);
}
}

// corner case, segment offset exceeds the size of bitset.
BitsetType all_minus_1(num - 1);
all_minus_1.set();
offsets = this->map_.find_first(num / 2, all_minus_1, true);
ASSERT_EQ(num / 2, offsets.size());
for (int i = 1; i < offsets.size(); i++) {
ASSERT_TRUE(data[offsets[i - 1]] <= data[offsets[i]]);
{
auto [offsets, has_more_res] =
this->map_.find_first(num / 2, all_minus_1, true);
ASSERT_EQ(num / 2, offsets.size());
ASSERT_TRUE(has_more_res);
for (int i = 1; i < offsets.size(); i++) {
ASSERT_TRUE(data[offsets[i - 1]] <= data[offsets[i]]);
}
}
offsets = this->map_.find_first(Unlimited, all_minus_1, true);
ASSERT_EQ(all_minus_1.size(), offsets.size());
for (int i = 1; i < offsets.size(); i++) {
ASSERT_TRUE(data[offsets[i - 1]] <= data[offsets[i]]);
{
auto [offsets, has_more_res] =
this->map_.find_first(Unlimited, all_minus_1, true);
ASSERT_EQ(all_minus_1.size(), offsets.size());
ASSERT_FALSE(has_more_res);
for (int i = 1; i < offsets.size(); i++) {
ASSERT_TRUE(data[offsets[i - 1]] <= data[offsets[i]]);
}
}

// none is satisfied.
BitsetType none(num);
none.reset();
offsets = this->map_.find_first(num / 2, none, true);
ASSERT_EQ(0, offsets.size());
offsets = this->map_.find_first(NoLimit, none, true);
ASSERT_EQ(0, offsets.size());
{
auto [offsets, has_more_res] =
this->map_.find_first(num / 2, none, true);
ASSERT_TRUE(has_more_res);
ASSERT_EQ(0, offsets.size());
}
{
auto [offsets, has_more_res] =
this->map_.find_first(NoLimit, none, true);
ASSERT_TRUE(has_more_res);
ASSERT_EQ(0, offsets.size());
}
}

REGISTER_TYPED_TEST_SUITE_P(TypedOffsetOrderedMapTest, find_first);
Expand Down
1 change: 1 addition & 0 deletions internal/proto/internal.proto
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ message RetrieveResults {
// query request cost
CostAggregation costAggregation = 13;
int64 all_retrieve_count = 14;
bool has_more_result = 15;
}

message LoadIndex {
Expand Down
1 change: 1 addition & 0 deletions internal/proto/segcore.proto
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ message RetrieveResults {
repeated int64 offset = 2;
repeated schema.FieldData fields_data = 3;
int64 all_retrieve_count = 4;
bool has_more_result = 5;
}

message LoadFieldMeta {
Expand Down
Loading

0 comments on commit 416a2cf

Please sign in to comment.