Skip to content

Commit

Permalink
fix(interactive): vertex with string oid is not support (#4171)
Browse files Browse the repository at this point in the history
  • Loading branch information
liulx20 authored Aug 22, 2024
1 parent c2d15dd commit 890b7ec
Show file tree
Hide file tree
Showing 6 changed files with 288 additions and 67 deletions.
226 changes: 183 additions & 43 deletions flex/engines/graph_db/runtime/adhoc/operators/scan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ static bool is_find_vertex(const physical::Scan& scan_opr,
} else if (triplet.const_().item_case() == common::Value::kI64) {
vertex_id = triplet.const_().i64();
} else {
LOG(FATAL) << "unexpected value case" << triplet.const_().item_case();
return false;
}
} break;
case algebra::IndexPredicate_Triplet::ValueCase::kParam: {
Expand All @@ -91,7 +91,7 @@ static bool is_find_vertex(const physical::Scan& scan_opr,
vertex_id = std::stoll(value);
} break;
default: {
LOG(FATAL) << "unexpected value case";
return false;
} break;
}

Expand All @@ -101,9 +101,11 @@ static bool is_find_vertex(const physical::Scan& scan_opr,
bool parse_idx_predicate(const algebra::IndexPredicate& predicate,
const std::map<std::string, std::string>& params,
std::vector<int64_t>& oids, bool& scan_oid) {
// todo unsupported cases.
if (predicate.or_predicates_size() != 1) {
return false;
}
// todo unsupported cases.
if (predicate.or_predicates(0).predicates_size() != 1) {
return false;
}
Expand All @@ -120,7 +122,6 @@ bool parse_idx_predicate(const algebra::IndexPredicate& predicate,
} else {
LOG(FATAL) << "unexpected key case";
}
// const common::Property& key = triplet.key();
if (triplet.cmp() != common::Logical::EQ && triplet.cmp() != common::WITHIN) {
return false;
}
Expand All @@ -138,7 +139,7 @@ bool parse_idx_predicate(const algebra::IndexPredicate& predicate,
}

} else {
LOG(FATAL) << "unexpected value case" << triplet.const_().item_case();
return false;
}
} else if (triplet.value_case() ==
algebra::IndexPredicate_Triplet::ValueCase::kParam) {
Expand All @@ -150,6 +151,85 @@ bool parse_idx_predicate(const algebra::IndexPredicate& predicate,
return true;
}

bool parse_idx_predicate(const algebra::IndexPredicate& predicate,
const std::map<std::string, std::string>& params,
std::vector<Any>& oids, bool& scan_oid) {
// todo unsupported cases.
if (predicate.or_predicates_size() != 1) {
return false;
}
// todo unsupported cases.
if (predicate.or_predicates(0).predicates_size() != 1) {
return false;
}
const algebra::IndexPredicate_Triplet& triplet =
predicate.or_predicates(0).predicates(0);
if (!triplet.has_key()) {
return false;
}
auto key = triplet.key();
if (key.has_key()) {
scan_oid = true;
} else if (key.has_id()) {
scan_oid = false;
} else {
LOG(FATAL) << "unexpected key case";
}
if (triplet.cmp() != common::Logical::EQ && triplet.cmp() != common::WITHIN) {
return false;
}

if (triplet.value_case() ==
algebra::IndexPredicate_Triplet::ValueCase::kConst) {
if (triplet.const_().item_case() == common::Value::kI32) {
oids.emplace_back(triplet.const_().i32());
} else if (triplet.const_().item_case() == common::Value::kI64) {
oids.emplace_back(triplet.const_().i64());
} else if (triplet.const_().item_case() == common::Value::kI64Array) {
const auto& arr = triplet.const_().i64_array();
for (int i = 0; i < arr.item_size(); ++i) {
oids.emplace_back(arr.item(i));
}

} else if (triplet.const_().item_case() == common::Value::kStr) {
std::string value = triplet.const_().str();
oids.emplace_back(Any::From(value));
} else if (triplet.const_().item_case() == common::Value::kStrArray) {
const auto& arr = triplet.const_().str_array();
for (int i = 0; i < arr.item_size(); ++i) {
oids.emplace_back(Any::From(arr.item(i)));
}
} else {
return false;
}
} else if (triplet.value_case() ==
algebra::IndexPredicate_Triplet::ValueCase::kParam) {
const common::DynamicParam& p = triplet.param();
if (p.data_type().type_case() == common::IrDataType::TypeCase::kDataType) {
auto dt = p.data_type().data_type();
if (dt == common::DataType::INT64) {
std::string name = p.name();
std::string value = params.at(name);
int64_t v = std::stoll(value);
oids.emplace_back(v);
} else if (dt == common::DataType::STRING) {
std::string name = p.name();
std::string value = params.at(name);
oids.emplace_back(Any::From(value));
} else if (dt == common::DataType::INT32) {
std::string name = p.name();
std::string value = params.at(name);
int32_t v = std::stoi(value);
oids.emplace_back(v);
} else {
LOG(FATAL) << "unsupported primary key type" << dt;
return false;
}
}
}
return true;
}

Context eval_scan(const physical::Scan& scan_opr, const ReadTransaction& txn,
const std::map<std::string, std::string>& params) {
label_t label;
Expand All @@ -158,7 +238,7 @@ Context eval_scan(const physical::Scan& scan_opr, const ReadTransaction& txn,

bool scan_oid;
if (is_find_vertex(scan_opr, params, label, vertex_id, alias, scan_oid)) {
return Scan::find_vertex(txn, label, vertex_id, alias, scan_oid);
return Scan::find_vertex_with_id(txn, label, vertex_id, alias, scan_oid);
}

const auto& opt = scan_opr.scan_opt();
Expand All @@ -170,51 +250,110 @@ Context eval_scan(const physical::Scan& scan_opr, const ReadTransaction& txn,
scan_params.alias = -1;
}
CHECK(scan_opr.has_params());
bool has_other_type_oid = false;
const auto& scan_opr_params = scan_opr.params();
for (const auto& table : scan_opr_params.tables()) {
// exclude invalid vertex label id
if (txn.schema().vertex_label_num() <= table.id()) {
continue;
}
scan_params.tables.push_back(table.id());
const auto& pks = txn.schema().get_vertex_primary_key(table.id());
if (pks.size() > 1) {
LOG(FATAL) << "only support one primary key";
}
auto [type, _, __] = pks[0];
if (type != PropertyType::kInt64) {
has_other_type_oid = true;
}
}

if (scan_opr.has_idx_predicate() && scan_opr_params.has_predicate()) {
Context ctx;
auto expr = parse_expression(
txn, ctx, params, scan_opr_params.predicate(), VarType::kVertexVar);
std::vector<int64_t> oids{};
CHECK(parse_idx_predicate(scan_opr.idx_predicate(), params, oids,
scan_oid));
if (scan_oid) {
return Scan::scan_vertex(
txn, scan_params, [&expr, &txn, oids](label_t label, vid_t vid) {
return std::find(oids.begin(), oids.end(),
txn.GetVertexId(label, vid).AsInt64()) !=
oids.end() &&
expr->eval_vertex(label, vid, 0).as_bool();
});
} else {
return Scan::scan_gid_vertex(
txn, scan_params,
[&expr, oids](label_t label, vid_t vid) {
return expr->eval_vertex(label, vid, 0).as_bool();
},
oids);
// implicit type conversion will happen when oid is int64_t
if (!has_other_type_oid && scan_opr.has_idx_predicate()) {
if (scan_opr.has_idx_predicate() && scan_opr_params.has_predicate()) {
Context ctx;
auto expr = parse_expression(
txn, ctx, params, scan_opr_params.predicate(), VarType::kVertexVar);
std::vector<int64_t> oids{};
CHECK(parse_idx_predicate(scan_opr.idx_predicate(), params, oids,
scan_oid));
if (scan_oid) {
return Scan::filter_oids(
txn, scan_params,
[&expr, oids](label_t label, vid_t vid) {
return expr->eval_vertex(label, vid, 0).as_bool();
},
oids);
} else {
return Scan::filter_gids(
txn, scan_params,
[&expr, oids](label_t label, vid_t vid) {
return expr->eval_vertex(label, vid, 0).as_bool();
},
oids);
}
}
}

if (scan_opr.has_idx_predicate()) {
std::vector<int64_t> oids{};
CHECK(parse_idx_predicate(scan_opr.idx_predicate(), params, oids,
scan_oid));
if (scan_opr.has_idx_predicate()) {
std::vector<int64_t> oids{};
CHECK(parse_idx_predicate(scan_opr.idx_predicate(), params, oids,
scan_oid));

if (scan_oid) {
return Scan::scan_vertex(
txn, scan_params, [&txn, oids](label_t label, vid_t vid) {
return std::find(oids.begin(), oids.end(),
txn.GetVertexId(label, vid).AsInt64()) !=
oids.end();
});
} else {
return Scan::scan_gid_vertex(
txn, scan_params, [](label_t, vid_t) { return true; }, oids);
if (scan_oid) {
return Scan::filter_oids(
txn, scan_params, [](label_t label, vid_t vid) { return true; },
oids);
} else {
return Scan::filter_gids(
txn, scan_params, [](label_t, vid_t) { return true; }, oids);
}
}
} else if (scan_opr.has_idx_predicate()) {
if (scan_opr.has_idx_predicate() && scan_opr_params.has_predicate()) {
Context ctx;
auto expr = parse_expression(
txn, ctx, params, scan_opr_params.predicate(), VarType::kVertexVar);
std::vector<Any> oids{};
CHECK(parse_idx_predicate(scan_opr.idx_predicate(), params, oids,
scan_oid));
if (scan_oid) {
return Scan::filter_oids(
txn, scan_params,
[&expr, oids](label_t label, vid_t vid) {
return expr->eval_vertex(label, vid, 0).as_bool();
},
oids);
} else {
std::vector<int64_t> gids;
for (size_t i = 0; i < oids.size(); i++) {
gids.push_back(oids[i].AsInt64());
}
return Scan::filter_gids(
txn, scan_params,
[&expr, gids](label_t label, vid_t vid) {
return expr->eval_vertex(label, vid, 0).as_bool();
},
gids);
}
}

if (scan_opr.has_idx_predicate()) {
std::vector<Any> oids{};
CHECK(parse_idx_predicate(scan_opr.idx_predicate(), params, oids,
scan_oid));

if (scan_oid) {
return Scan::filter_oids(
txn, scan_params, [](label_t label, vid_t vid) { return true; },
oids);
} else {
std::vector<int64_t> gids;
for (size_t i = 0; i < oids.size(); i++) {
gids.push_back(oids[i].AsInt64());
}
return Scan::filter_gids(
txn, scan_params, [](label_t, vid_t) { return true; }, gids);
}
}
}

Expand All @@ -240,7 +379,8 @@ Context eval_scan(const physical::Scan& scan_opr, const ReadTransaction& txn,
[](label_t, vid_t) { return true; });
}
}
LOG(FATAL) << "AAAAA";
LOG(FATAL) << "unsupport scan option " << scan_opr.DebugString()
<< " we only support scan vertex currently";
return Context();
}

Expand Down
28 changes: 26 additions & 2 deletions flex/engines/graph_db/runtime/adhoc/var.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,20 @@ Var::Var(const ReadTransaction& txn, const Context& ctx,
getter_ = std::make_shared<VertexGIdPathAccessor>(ctx, tag);
} else if (pt.has_key()) {
if (pt.key().name() == "id") {
getter_ = std::make_shared<VertexIdPathAccessor>(txn, ctx, tag);
if (type_ == RTAnyType::kStringValue) {
getter_ =
std::make_shared<VertexIdPathAccessor<std::string_view>>(
txn, ctx, tag);
} else if (type_ == RTAnyType::kI32Value) {
getter_ = std::make_shared<VertexIdPathAccessor<int32_t>>(
txn, ctx, tag);
} else if (type_ == RTAnyType::kI64Value) {
getter_ = std::make_shared<VertexIdPathAccessor<int64_t>>(
txn, ctx, tag);
} else {
LOG(FATAL) << "not support for "
<< static_cast<int>(type_.type_enum_);
}
} else {
getter_ = create_vertex_property_path_accessor(txn, ctx, tag, type_,
pt.key().name());
Expand Down Expand Up @@ -111,7 +124,18 @@ Var::Var(const ReadTransaction& txn, const Context& ctx,
getter_ = std::make_shared<VertexGIdVertexAccessor>();
} else if (pt.has_key()) {
if (pt.key().name() == "id") {
getter_ = std::make_shared<VertexIdVertexAccessor>(txn);
if (type_ == RTAnyType::kStringValue) {
getter_ =
std::make_shared<VertexIdVertexAccessor<std::string_view>>(
txn);
} else if (type_ == RTAnyType::kI32Value) {
getter_ = std::make_shared<VertexIdVertexAccessor<int32_t>>(txn);
} else if (type_ == RTAnyType::kI64Value) {
getter_ = std::make_shared<VertexIdVertexAccessor<int64_t>>(txn);
} else {
LOG(FATAL) << "not support for "
<< static_cast<int>(type_.type_enum_);
}
} else {
getter_ = create_vertex_property_vertex_accessor(txn, type_,
pt.key().name());
Expand Down
14 changes: 8 additions & 6 deletions flex/engines/graph_db/runtime/common/accessors.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,10 @@ class VertexPathAccessor : public IAccessor {
const IVertexColumn& vertex_col_;
};

template <typename KEY_T>
class VertexIdPathAccessor : public IAccessor {
public:
using elem_t = int64_t;
using elem_t = KEY_T;
VertexIdPathAccessor(const ReadTransaction& txn, const Context& ctx, int tag)
: txn_(txn),
vertex_col_(*std::dynamic_pointer_cast<IVertexColumn>(ctx.get(tag))) {}
Expand All @@ -104,11 +105,11 @@ class VertexIdPathAccessor : public IAccessor {

elem_t typed_eval_path(size_t idx) const {
const auto& v = vertex_col_.get_vertex(idx);
return txn_.GetVertexId(v.first, v.second).AsInt64();
return AnyConverter<KEY_T>::from_any(txn_.GetVertexId(v.first, v.second));
}

RTAny eval_path(size_t idx) const override {
return RTAny::from_int64(typed_eval_path(idx));
return RTAny(typed_eval_path(idx));
}

std::shared_ptr<IContextColumnBuilder> builder() const override {
Expand Down Expand Up @@ -271,13 +272,14 @@ class ContextValueAccessor : public IAccessor {
const IValueColumn<elem_t>& col_;
};

template <typename KEY_T>
class VertexIdVertexAccessor : public IAccessor {
public:
using elem_t = int64_t;
using elem_t = KEY_T;
VertexIdVertexAccessor(const ReadTransaction& txn) : txn_(txn) {}

elem_t typed_eval_vertex(label_t label, vid_t v, size_t idx) const {
return txn_.GetVertexId(label, v).AsInt64();
return AnyConverter<KEY_T>::from_any(txn_.GetVertexId(label, v));
}

RTAny eval_path(size_t idx) const override {
Expand All @@ -286,7 +288,7 @@ class VertexIdVertexAccessor : public IAccessor {
}

RTAny eval_vertex(label_t label, vid_t v, size_t idx) const override {
return RTAny::from_int64(typed_eval_vertex(label, v, idx));
return RTAny(Any(typed_eval_vertex(label, v, idx)));
}

private:
Expand Down
Loading

0 comments on commit 890b7ec

Please sign in to comment.