Skip to content

Commit

Permalink
enhance: support iterative filter execution (#37363)
Browse files Browse the repository at this point in the history
issue: #37360

---------

Signed-off-by: chasingegg <[email protected]>
  • Loading branch information
chasingegg authored Dec 11, 2024
1 parent a118ca1 commit 994fc54
Show file tree
Hide file tree
Showing 67 changed files with 6,401 additions and 1,563 deletions.
16 changes: 16 additions & 0 deletions internal/core/src/common/Chunk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,28 @@ namespace milvus {
// Builds one std::string_view per row of this chunk and returns the views
// together with a copy of the chunk's validity vector.
//
// Row i covers the bytes [offsets_[i], offsets_[i + 1]) of data_. The views
// only point into data_; they do not own the characters.
std::pair<std::vector<std::string_view>, FixedVector<bool>>
StringChunk::StringViews() {
    std::vector<std::string_view> ret;
    ret.reserve(row_nums_);
    for (int i = 0; i < row_nums_; i++) {
        ret.emplace_back(data_ + offsets_[i], offsets_[i + 1] - offsets_[i]);
    }
    // Move the local vector into the returned pair. Naming it directly in the
    // braced return would copy every view a second time.
    return {std::move(ret), valid_};
}

// Builds string views for a caller-selected subset of rows.
//
// @param offsets  Row indices into this chunk, in the order the caller wants
//                 the results. No bounds check is performed here, so every
//                 entry must be a valid row index for this chunk.
// @return  A pair of (views, validity). Entry k of both vectors corresponds to
//          row offsets[k]; validity is obtained through isValid(). The views
//          only point into data_; they do not own the characters.
std::pair<std::vector<std::string_view>, FixedVector<bool>>
StringChunk::ViewsByOffsets(const FixedVector<int32_t>& offsets) {
    const size_t size = offsets.size();
    std::vector<std::string_view> ret;
    FixedVector<bool> valid_res;
    ret.reserve(size);
    valid_res.reserve(size);
    // Index with size_t so the loop counter and the bound share a type.
    for (size_t i = 0; i < size; ++i) {
        const int32_t row = offsets[i];
        const auto start = offsets_[row];
        ret.emplace_back(data_ + start, offsets_[row + 1] - start);
        valid_res.emplace_back(isValid(row));
    }
    // Hand both locals over to the pair instead of copying them.
    return {std::move(ret), std::move(valid_res)};
}

void
ArrayChunk::ConstructViews() {
views_.reserve(row_nums_);
Expand Down
8 changes: 7 additions & 1 deletion internal/core/src/common/Chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,10 @@ class Chunk {

// Reports whether the row at `offset` holds a value.
// The validity vector is consulted only when the chunk is nullable; for a
// non-nullable chunk every row is reported as valid.
virtual bool
isValid(int offset) {
    if (nullable_) {
        return valid_[offset];
    }
    return true;
}

protected:
Expand Down Expand Up @@ -170,6 +173,9 @@ class StringChunk : public Chunk {
return result;
}

std::pair<std::vector<std::string_view>, FixedVector<bool>>
ViewsByOffsets(const FixedVector<int32_t>& offsets);

const char*
ValueAt(int64_t idx) const override {
return (*this)[idx].data();
Expand Down
2 changes: 2 additions & 0 deletions internal/core/src/common/Consts.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ const char KMEANS_CLUSTER[] = "KMEANS";
// String constants used as names/keys elsewhere in the project.
const char VEC_OPT_FIELDS[] = "opt_fields";
const char PAGE_RETAIN_ORDER[] = "page_retain_order";
const char TEXT_LOG_ROOT_PATH[] = "text_log";
// NOTE(review): presumably the hint value that requests iterative filter
// execution, looked up under the HINTS key — confirm against the code that
// parses search parameters.
const char ITERATIVE_FILTER[] = "iterative_filter";
const char HINTS[] = "hints";

// Default identifiers; both are the string "0".
const char DEFAULT_PLANNODE_ID[] = "0";
// NOTE(review): "DEAFULT" looks like a misspelling of "DEFAULT". Renaming it
// would require updating every user of this constant, so it is left as is.
const char DEAFULT_QUERY_ID[] = "0";
Expand Down
1 change: 1 addition & 0 deletions internal/core/src/common/QueryInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ struct SearchInfo {
std::optional<FieldId> group_by_field_id_;
tracer::TraceContext trace_ctx_;
bool materialized_view_involved = false;
bool iterative_filter_execution = false;
};

using SearchInfoPtr = std::shared_ptr<SearchInfo>;
Expand Down
12 changes: 9 additions & 3 deletions internal/core/src/exec/Driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "exec/operator/CallbackSink.h"
#include "exec/operator/CountNode.h"
#include "exec/operator/FilterBitsNode.h"
#include "exec/operator/IterativeFilterNode.h"
#include "exec/operator/MvccNode.h"
#include "exec/operator/Operator.h"
#include "exec/operator/VectorSearchNode.h"
Expand Down Expand Up @@ -52,11 +53,16 @@ DriverFactory::CreateDriver(std::unique_ptr<DriverContext> ctx,
for (size_t i = 0; i < plannodes_.size(); ++i) {
auto id = operators.size();
auto plannode = plannodes_[i];
if (auto filternode =
if (auto filterbitsnode =
std::dynamic_pointer_cast<const plan::FilterBitsNode>(
plannode)) {
operators.push_back(
std::make_unique<PhyFilterBitsNode>(id, ctx.get(), filternode));
operators.push_back(std::make_unique<PhyFilterBitsNode>(
id, ctx.get(), filterbitsnode));
} else if (auto filternode =
std::dynamic_pointer_cast<const plan::FilterNode>(
plannode)) {
operators.push_back(std::make_unique<PhyIterativeFilterNode>(
id, ctx.get(), filternode));
} else if (auto mvccnode =
std::dynamic_pointer_cast<const plan::MvccNode>(
plannode)) {
Expand Down
5 changes: 5 additions & 0 deletions internal/core/src/exec/QueryContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,11 @@ class QueryContext : public Context {
return search_info_;
}

// Convenience accessor: returns the metric_type_ field of the search info
// held by this context.
knowhere::MetricType
get_metric_type() {
    const auto& info = search_info_;
    return info.metric_type_;
}

const query::PlaceholderGroup*
get_placeholder_group() {
return placeholder_group_;
Expand Down
10 changes: 7 additions & 3 deletions internal/core/src/exec/expression/AlwaysTrueExpr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,13 @@ namespace exec {

void
PhyAlwaysTrueExpr::Eval(EvalCtx& context, VectorPtr& result) {
int64_t real_batch_size = current_pos_ + batch_size_ >= active_count_
? active_count_ - current_pos_
: batch_size_;
auto input = context.get_offset_input();
has_offset_input_ = (input != nullptr);
int64_t real_batch_size = (has_offset_input_)
? input->size()
: (current_pos_ + batch_size_ >= active_count_
? active_count_ - current_pos_
: batch_size_);

// always true no need to skip null
if (real_batch_size == 0) {
Expand Down
11 changes: 7 additions & 4 deletions internal/core/src/exec/expression/AlwaysTrueExpr.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,14 @@ class PhyAlwaysTrueExpr : public Expr {

// Advances the sequential batch cursor by one batch.
// The step is batch_size_, clamped so that current_pos_ never moves past
// active_count_. When has_offset_input_ is set (the evaluation was driven by
// an explicit offset list), the sequential position is left untouched.
void
MoveCursor() override {
    if (has_offset_input_) {
        return;
    }
    const int64_t real_batch_size =
        current_pos_ + batch_size_ >= active_count_
            ? active_count_ - current_pos_
            : batch_size_;
    current_pos_ += real_batch_size;
}

private:
Expand Down
Loading

0 comments on commit 994fc54

Please sign in to comment.