Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(interactive): Support Index predicate for dynamic params in IR-Core #3282

Merged
merged 8 commits into from
Oct 19, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
244 changes: 111 additions & 133 deletions flex/codegen/src/hqps/hqps_scan_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ static constexpr const char* SCAN_OP_TEMPLATE_NO_EXPR_STR =
/// 4. vertex label
/// 5. oid
static constexpr const char* SCAN_OP_WITH_OID_TEMPLATE_STR =
"auto %1% = Engine::template ScanVertex<%2%>(%3%, %4%, %5%));\n";
"auto %1% = Engine::template ScanVertexWithOid<%2%>(%3%, %4%, %5%);\n";

/**
* @brief When building scanOp, we ignore the data type provided in the pb.
Expand Down Expand Up @@ -85,16 +85,67 @@ class ScanOpBuilder {
if (!query_params.has_predicate()) {
VLOG(10) << "No expr in params";
}
query_params_ = query_params;
CHECK(labels_ids_.empty()) << "label ids should be empty";
if (!try_to_get_label_id_from_query_params(query_params, labels_ids_)) {
LOG(FATAL) << "fail to label id from expr";
}

// the user provide oid can be a const or a param const
if (query_params.has_predicate()) {
auto& predicate = query_params.predicate();
VLOG(10) << "predicate: " << predicate.DebugString();
// We first scan the predicate to find whether there is conditions on
// labels.
std::vector<int32_t> expr_label_ids;
if (try_to_get_label_ids_from_expr(predicate, expr_label_ids)) {
// join expr_label_ids with table_lable_ids;
VLOG(10) << "Found label ids in expr: "
<< gs::to_string(expr_label_ids);
intersection(labels_ids_, expr_label_ids);
}

auto expr_builder = ExprBuilder(ctx_);
expr_builder.set_return_type(common::DataType::BOOLEAN);
expr_builder.AddAllExprOpr(query_params.predicate().operators());

std::string expr_code;
std::vector<codegen::ParamConst> func_call_param_const;
std::vector<std::pair<int32_t, std::string>> expr_tag_props;
std::vector<common::DataType> unused_expr_ret_type;
std::tie(expr_func_name_, func_call_param_const, expr_tag_props,
expr_code, unused_expr_ret_type) = expr_builder.Build();
VLOG(10) << "Found expr in scan: " << expr_func_name_;
// generate code.
ctx_.AddExprCode(expr_code);
expr_var_name_ = ctx_.GetNextExprVarName();
{
std::stringstream ss;
for (auto i = 0; i < func_call_param_const.size(); ++i) {
ss << func_call_param_const[i].var_name;
if (i != func_call_param_const.size() - 1) {
ss << ",";
}
}
expr_construct_params_ = ss.str();
}
{
std::stringstream ss;
if (expr_tag_props.size() > 0) {
ss << ",";
for (auto i = 0; i + 1 < expr_tag_props.size(); ++i) {
ss << expr_tag_props[i].second << ", ";
}
ss << expr_tag_props[expr_tag_props.size() - 1].second;
}
selectors_str_ = ss.str();
}
}
return *this;
}

ScanOpBuilder& idx_predicate(const algebra::IndexPredicate& predicate) {
// check query_params not has predicate.
if (query_params_.has_predicate()) {
VLOG(10) << "query params already has predicate";
return *this;
}

// Currently we only support one predicate.
if (predicate.or_predicates_size() < 1) {
VLOG(10) << "No predicate in index predicate";
Expand All @@ -104,150 +155,71 @@ class ScanOpBuilder {
throw std::runtime_error(
std::string("Currently only support one predicate"));
}
CHECK(expr_func_name_.empty()) << "Predicate is already given by expr";
auto or_predicate = predicate.or_predicates(0);
if (or_predicate.predicates_size() != 1) {
throw std::runtime_error(
std::string("Currently only support one and predicate"));
}
auto triplet = or_predicate.predicates(0);
// add index predicate to query params
auto* new_predicate = query_params_.mutable_predicate();
{
auto first_op = new_predicate->add_operators();
common::Variable variable;
auto& property = triplet.key();
*(variable.mutable_property()) = property;
variable.mutable_node_type()->set_data_type(common::DataType::INT64);
*(first_op->mutable_var()) = variable;
}
{
auto second = new_predicate->add_operators();
second->set_logical(common::Logical::EQ);
second->mutable_node_type()->set_data_type(common::DataType::BOOLEAN);
}
{
auto third = new_predicate->add_operators();
auto& value = triplet.value();
third->mutable_node_type()->set_data_type(common::DataType::INT64);
*(third->mutable_const_()) = value;
auto& property = triplet.key();
if (triplet.value_case() == algebra::IndexPredicate::Triplet::kConst) {
// FUTURE: check property is really the primary key.
auto const_value = triplet.const_();
switch (const_value.item_case()) {
case common::Value::kI32:
oid_ = std::to_string(const_value.i32());
break;
case common::Value::kI64:
oid_ = std::to_string(const_value.i64());
break;
default:
LOG(FATAL) << "Currently only support int, long as primary key";
}
VLOG(1) << "Found oid: " << oid_ << " in index scan";
} else {
// dynamic param
auto dyn_param_pb = triplet.param();
auto param_const = param_const_pb_to_param_const(dyn_param_pb);
VLOG(10) << "receive param const in index predicate: "
<< dyn_param_pb.DebugString();
ctx_.AddParameterVar(param_const);
}
VLOG(10) << "Add index predicate to query params: "
<< query_params_.DebugString();

return *this;
}

std::string Build() const {
std::string label_name;
std::vector<int32_t> labels_ids;
if (!try_to_get_label_name_from_query_params(query_params_, label_name)) {
LOG(WARNING) << "fail to label name from expr";
if (!try_to_get_label_id_from_query_params(query_params_, labels_ids)) {
LOG(FATAL) << "fail to label id from expr";
}
}

// the user provide oid can be a const or a param const
if (query_params_.has_predicate()) {
auto& predicate = query_params_.predicate();
VLOG(10) << "predicate: " << predicate.DebugString();
// We first scan the predicate to find whether there is conditions on
// labels.
std::vector<int32_t> expr_label_ids;
if (try_to_get_label_ids_from_expr(predicate, expr_label_ids)) {
// join expr_label_ids with table_lable_ids;
VLOG(10) << "Found label ids in expr: "
<< gs::to_string(expr_label_ids);
intersection(labels_ids, expr_label_ids);
}
}
// CHECK(labels_ids.size() == 1) << "only support one label in scan";

#ifdef FAST_SCAN
gs::codegen::oid_t oid;
gs::codegen::ParamConst oid_param;
if (try_to_get_oid_from_expr(predicate, oid)) {
VLOG(10) << "Parse oid: " << oid << "from expr";
return scan_with_oid(label_name, label_id, oid);
} else if (try_to_get_oid_param_from_expr(predicate, oid_param)) {
VLOG(10) << "Parse oid param: " << oid_param.var_name << "from expr";
return scan_with_oid(label_name, label_id, oid_param.var_name);
// 1. If common expression predicate presents, scan with expression
if (!expr_func_name_.empty()) {
VLOG(1) << "Scan with expression";
return scan_with_expr(labels_ids_, expr_var_name_, expr_func_name_,
expr_construct_params_, selectors_str_);
} else {
VLOG(10) << "Fail to parse oid from expr";
{
#endif
if (query_params_.has_predicate()) {
auto expr_builder = ExprBuilder(ctx_);
expr_builder.set_return_type(common::DataType::BOOLEAN);
expr_builder.AddAllExprOpr(query_params_.predicate().operators());

std::string expr_func_name, expr_code;
std::vector<codegen::ParamConst> func_call_param_const;
std::vector<std::pair<int32_t, std::string>> expr_tag_props;
std::vector<common::DataType> unused_expr_ret_type;
std::tie(expr_func_name, func_call_param_const, expr_tag_props,
expr_code, unused_expr_ret_type) = expr_builder.Build();
VLOG(10) << "Found expr in scan: " << expr_func_name;
// generate code.
ctx_.AddExprCode(expr_code);
std::string expr_var_name = ctx_.GetNextExprVarName();
std::string
expr_construct_params; // function construction params and
std::string selectors_str; // selectors str, concatenated
{
std::stringstream ss;
for (auto i = 0; i < func_call_param_const.size(); ++i) {
ss << func_call_param_const[i].var_name;
if (i != func_call_param_const.size() - 1) {
ss << ",";
}
}
expr_construct_params = ss.str();
}
{
std::stringstream ss;
if (expr_tag_props.size() > 0) {
ss << ",";
for (auto i = 0; i + 1 < expr_tag_props.size(); ++i) {
ss << expr_tag_props[i].second << ", ";
}
ss << expr_tag_props[expr_tag_props.size() - 1].second;
}
selectors_str = ss.str();
}

// use expression to filter.
return scan_with_expr(labels_ids, expr_var_name, expr_func_name,
expr_construct_params, selectors_str);
} else {
return scan_without_expr(labels_ids);
}

#ifdef FAST_SCAN
// If oid_ not empty, scan with oid
if (!oid_.empty()) {
VLOG(1) << "Scan with oid: " << oid_;
return scan_with_oid(labels_ids_, oid_);
} else {
// If no oid, scan without expression
VLOG(1) << "Scan without expression";
return scan_without_expr(labels_ids_);
}
}
#endif
}

private:
std::string scan_with_oid(const std::string& label_name,
const int32_t& label_id, codegen::oid_t oid) const {
VLOG(10) << "Scan with fixed oid" << oid;
std::string next_ctx_name = ctx_.GetCurCtxName();
auto append_opt = res_alias_to_append_opt(res_alias_);

boost::format formater(SCAN_OP_WITH_OID_TEMPLATE_STR);
formater % next_ctx_name % append_opt % ctx_.GraphVar() % label_id % oid;
return formater.str();
}
std::string scan_with_oid(const std::string& label_name,
const int32_t& label_id,
std::string scan_with_oid(const std::vector<int32_t>& label_ids,
const std::string& oid) const {
VLOG(10) << "Scan with dynamic param oid";
VLOG(10) << "Scan with oid: " << oid;
CHECK(label_ids.size() == 1)
<< "Currently only support one label for index scan";
std::string next_ctx_name = ctx_.GetCurCtxName();
auto append_opt = res_alias_to_append_opt(res_alias_);

boost::format formater(SCAN_OP_WITH_OID_TEMPLATE_STR);
formater % next_ctx_name % append_opt % ctx_.GraphVar() % label_id % oid;
formater % next_ctx_name % append_opt % ctx_.GraphVar() % label_ids[0] %
oid;
return formater.str();
}

Expand Down Expand Up @@ -303,9 +275,13 @@ class ScanOpBuilder {
ctx_.GraphVar() % label_ids_str;
return formater.str();
}

BuildingContext& ctx_;
physical::Scan::ScanOpt scan_opt_;
algebra::QueryParams query_params_;
std::vector<int32_t> labels_ids_;
std::string expr_var_name_, expr_func_name_, expr_construct_params_,
selectors_str_; // The expression decode from params.
std::string oid_; // the oid decode from idx predicate, or param name.
int res_alias_;
};

Expand All @@ -322,11 +298,13 @@ static std::string BuildScanOp(
} else {
builder.resAlias(-1);
}
return builder.queryParams(scan_pb.params())
.idx_predicate(scan_pb.idx_predicate())
.Build();
builder.queryParams(scan_pb.params());
if (scan_pb.has_idx_predicate()) {
builder.idx_predicate(scan_pb.idx_predicate());
}
return builder.Build();
}

} // namespace gs

#endif // CODEGEN_SRC_HQPS_HQPS_SCAN_BUILDER_H_
#endif // CODEGEN_SRC_HQPS_HQPS_SCAN_BUILDER_H_
Loading
Loading