Skip to content

Commit

Permalink
enhance: remove timestamp_filter after retrieve
Browse files Browse the repository at this point in the history
Signed-off-by: luzhang <[email protected]>
  • Loading branch information
luzhang committed Aug 2, 2024
1 parent b4d0f4d commit 2939a4d
Show file tree
Hide file tree
Showing 7 changed files with 25 additions and 81 deletions.
11 changes: 4 additions & 7 deletions internal/core/src/exec/expression/TermExpr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ PhyTermFilterExpr::CanSkipSegment() {
if (segment_->type() == SegmentType::Sealed &&
skip_index.CanSkipBinaryRange<T>(field_id_, 0, min, max, true, true)) {
cached_bits_.resize(active_count_, false);
cached_offsets_ = std::make_shared<ColumnVector>(DataType::INT64, 0);
cached_offsets_inited_ = true;
return true;
}
Expand Down Expand Up @@ -178,14 +177,9 @@ PhyTermFilterExpr::InitPkCacheOffset() {
auto [uids, seg_offsets] =
segment_->search_ids(*id_array, query_timestamp_);
cached_bits_.resize(active_count_, false);
cached_offsets_ =
std::make_shared<ColumnVector>(DataType::INT64, seg_offsets.size());
int64_t* cached_offsets_ptr = (int64_t*)cached_offsets_->GetRawData();
int i = 0;
for (const auto& offset : seg_offsets) {
auto _offset = (int64_t)offset.get();
cached_bits_[_offset] = true;
cached_offsets_ptr[i++] = _offset;
}
cached_offsets_inited_ = true;
}
Expand Down Expand Up @@ -214,7 +208,10 @@ PhyTermFilterExpr::ExecPkTermImpl() {
}

if (use_cache_offsets_) {
std::vector<VectorPtr> vecs{res_vec, cached_offsets_};
auto cache_bits_copy = cached_bits_.clone();
std::vector<VectorPtr> vecs{
res_vec,
std::make_shared<ColumnVector>(std::move(cache_bits_copy))};
return std::make_shared<RowVector>(vecs);
} else {
return res_vec;
Expand Down
36 changes: 1 addition & 35 deletions internal/core/src/query/generated/ExecPlanNodeVisitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,21 +78,6 @@ class ExecPlanNodeVisitor : public PlanNodeVisitor {
return ret;
}

void
SetExprCacheOffsets(std::vector<int64_t>&& offsets) {
expr_cached_pk_id_offsets_ = std::move(offsets);
}

void
AddExprCacheOffset(int64_t offset) {
expr_cached_pk_id_offsets_.push_back(offset);
}

const std::vector<int64_t>&
GetExprCacheOffsets() {
return expr_cached_pk_id_offsets_;
}

void
SetExprUsePkIndex(bool use_pk_index) {
expr_use_pk_index_ = use_pk_index;
Expand All @@ -103,29 +88,11 @@ class ExecPlanNodeVisitor : public PlanNodeVisitor {
return expr_use_pk_index_;
}

void
ExecuteExprNodeInternal(
const std::shared_ptr<milvus::plan::PlanNode>& plannode,
const milvus::segcore::SegmentInternalInterface* segment,
int64_t active_count,
BitsetType& result,
bool& cache_offset_getted,
std::vector<int64_t>& cache_offset);

void
ExecuteExprNode(const std::shared_ptr<milvus::plan::PlanNode>& plannode,
const milvus::segcore::SegmentInternalInterface* segment,
int64_t active_count,
BitsetType& result) {
bool get_cache_offset;
std::vector<int64_t> cache_offsets;
ExecuteExprNodeInternal(plannode,
segment,
active_count,
result,
get_cache_offset,
cache_offsets);
}
BitsetType& result);

private:
template <typename VectorType>
Expand All @@ -140,6 +107,5 @@ class ExecPlanNodeVisitor : public PlanNodeVisitor {
SearchResultOpt search_result_opt_;
RetrieveResultOpt retrieve_result_opt_;
bool expr_use_pk_index_ = false;
std::vector<int64_t> expr_cached_pk_id_offsets_;
};
} // namespace milvus::query
9 changes: 4 additions & 5 deletions internal/core/src/query/visitors/ExecExprVisitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ class ExecExprVisitor : ExprVisitor {

template <typename CmpFunc>
auto
ExecCompareExprDispatcher(CompareExpr& expr, CmpFunc cmp_func)
-> BitsetType;
ExecCompareExprDispatcher(CompareExpr& expr,
CmpFunc cmp_func) -> BitsetType;

private:
const segcore::SegmentInternalInterface& segment_;
Expand Down Expand Up @@ -2151,8 +2151,8 @@ ExecExprVisitor::ExecCompareExprDispatcherForNonIndexedSegment(

template <typename Op>
auto
ExecExprVisitor::ExecCompareExprDispatcher(CompareExpr& expr, Op op)
-> BitsetType {
ExecExprVisitor::ExecCompareExprDispatcher(CompareExpr& expr,
Op op) -> BitsetType {
using number = boost::variant<bool,
int8_t,
int16_t,
Expand Down Expand Up @@ -2547,7 +2547,6 @@ ExecExprVisitor::ExecTermVisitorImpl(TermExpr& expr_raw) -> BitsetType {
// If enable plan_visitor pk index cache, pass offsets_ to it
if (plan_visitor_ != nullptr) {
plan_visitor_->SetExprUsePkIndex(true);
plan_visitor_->SetExprCacheOffsets(std::move(cached_offsets));
}
AssertInfo(bitset.size() == row_count_,
"[ExecExprVisitor]Size of results not equal row count");
Expand Down
44 changes: 13 additions & 31 deletions internal/core/src/query/visitors/ExecPlanNodeVisitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,11 @@ empty_search_result(int64_t num_queries, SearchInfo& search_info) {
}

void
ExecPlanNodeVisitor::ExecuteExprNodeInternal(
ExecPlanNodeVisitor::ExecuteExprNode(
const std::shared_ptr<milvus::plan::PlanNode>& plannode,
const milvus::segcore::SegmentInternalInterface* segment,
int64_t active_count,
BitsetType& bitset_holder,
bool& cache_offset_getted,
std::vector<int64_t>& cache_offset) {
BitsetType& bitset_holder) {
bitset_holder.clear();
LOG_DEBUG("plannode: {}, active_count: {}, timestamp: {}",
plannode->ToString(),
Expand All @@ -94,6 +92,7 @@ ExecPlanNodeVisitor::ExecuteExprNodeInternal(

auto task =
milvus::exec::Task::Create(DEFAULT_TASK_ID, plan, 0, query_context);
bool cache_offset_getted = false;
for (;;) {
auto result = task->Next();
if (!result) {
Expand All @@ -115,20 +114,17 @@ ExecPlanNodeVisitor::ExecuteExprNodeInternal(

if (!cache_offset_getted) {
// offset cache only get once because not support iterator batch
auto cache_offset_vec =
auto cache_bits_vec =
std::dynamic_pointer_cast<ColumnVector>(row->child(1));
// If get empty cached offsets. mean no record hits in this segment
TargetBitmapView view(cache_bits_vec->GetRawData(),
cache_bits_vec->size());
// If get empty cached bits. mean no record hits in this segment
// no need to get next batch.
if (cache_offset_vec->size() == 0) {
if (view.count() == 0) {
bitset_holder.resize(active_count);
task->RequestCancel();
break;
}
auto cache_offset_vec_ptr =
(int64_t*)(cache_offset_vec->GetRawData());
for (size_t i = 0; i < cache_offset_vec->size(); ++i) {
cache_offset.push_back(cache_offset_vec_ptr[i]);
}
cache_offset_getted = true;
}
} else {
Expand Down Expand Up @@ -281,17 +277,12 @@ ExecPlanNodeVisitor::visit(RetrievePlanNode& node) {
bitset_holder.resize(active_count);
}

// This flag used to indicate whether to get offset from expr module that
// speeds up mvcc filter in the next interface: "timestamp_filter"
bool get_cache_offset = false;
std::vector<int64_t> cache_offsets;
if (node.filter_plannode_.has_value()) {
ExecuteExprNodeInternal(node.filter_plannode_.value(),
segment,
active_count,
bitset_holder,
get_cache_offset,
cache_offsets);
ExecuteExprNode(node.filter_plannode_.value(),
segment,
active_count,
bitset_holder);
bitset_holder.flip();
}

Expand All @@ -313,16 +304,7 @@ ExecPlanNodeVisitor::visit(RetrievePlanNode& node) {
}

retrieve_result.total_data_cnt_ = bitset_holder.size();
bool false_filtered_out = false;
if (get_cache_offset) {
segment->timestamp_filter(bitset_holder, cache_offsets, timestamp_);
} else {
bitset_holder.flip();
false_filtered_out = true;
segment->timestamp_filter(bitset_holder, timestamp_);
}
auto results_pair =
segment->find_first(node.limit_, bitset_holder, false_filtered_out);
auto results_pair = segment->find_first(node.limit_, bitset_holder);
retrieve_result.result_offsets_ = std::move(results_pair.first);
retrieve_result.has_more_result = results_pair.second;
retrieve_result_opt_ = std::move(retrieve_result);
Expand Down
2 changes: 1 addition & 1 deletion internal/core/src/segcore/SegmentGrowingImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ class SegmentGrowingImpl : public SegmentGrowing {
std::pair<std::vector<OffsetMap::OffsetType>, bool>
find_first(int64_t limit,
const BitsetType& bitset,
bool false_filtered_out) const override {
bool false_filtered_out = false) const override {
return insert_record_.pk2offset_->find_first(
limit, bitset, false_filtered_out);
}
Expand Down
2 changes: 1 addition & 1 deletion internal/core/src/segcore/SegmentInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ class SegmentInternalInterface : public SegmentInterface {
virtual std::pair<std::vector<OffsetMap::OffsetType>, bool>
find_first(int64_t limit,
const BitsetType& bitset,
bool false_filtered_out) const = 0;
bool false_filtered_out = false) const = 0;

void
FillTargetEntry(
Expand Down
2 changes: 1 addition & 1 deletion internal/core/src/segcore/SegmentSealedImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ class SegmentSealedImpl : public SegmentSealed {
std::pair<std::vector<OffsetMap::OffsetType>, bool>
find_first(int64_t limit,
const BitsetType& bitset,
bool false_filtered_out) const override {
bool false_filtered_out = false) const override {
return insert_record_.pk2offset_->find_first(
limit, bitset, false_filtered_out);
}
Expand Down

0 comments on commit 2939a4d

Please sign in to comment.