From 85d832285af8726b99ef25ec98c342cdee99d8da Mon Sep 17 00:00:00 2001 From: BingqingLyu Date: Tue, 10 Oct 2023 19:11:25 +0800 Subject: [PATCH 1/6] [Proto] enable pk value to be a DynParam in IndexPredicate --- interactive_engine/executor/ir/proto/algebra.proto | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/interactive_engine/executor/ir/proto/algebra.proto b/interactive_engine/executor/ir/proto/algebra.proto index 533d9dff4e56..639e0fccc894 100644 --- a/interactive_engine/executor/ir/proto/algebra.proto +++ b/interactive_engine/executor/ir/proto/algebra.proto @@ -191,10 +191,16 @@ message Limit { // where the values referred by k1, k2, ... are indexed and hence the // predicate can be efficiently verified by leveraging the index. message IndexPredicate { + message PkValue { + oneof item { + common.Value value = 1; + common.DynamicParam dyn_param = 2; + } + } // A triplet defines that a key must be **equal** to a given constant value. message Triplet { common.Property key = 1; - common.Value value = 2; + PkValue value = 2; // TODO(longbin) More comparators (gt, ge, lt, le, ne) other than equivalence (eq) may be required common.None cmp = 3; } From b5a5fb2f5c8f6b527e2a7c6293c91503bea242f2 Mon Sep 17 00:00:00 2001 From: BingqingLyu Date: Tue, 10 Oct 2023 19:27:48 +0800 Subject: [PATCH 2/6] [IR Core] support pk value to be a DynParam in IndexPredicate --- .../executor/ir/common/src/utils.rs | 87 +++++++++++++++---- .../executor/ir/core/src/plan/ffi.rs | 5 +- .../executor/ir/core/src/plan/logical.rs | 45 +++++++--- .../graph_proxy/src/utils/expr/eval_pred.rs | 16 +++- 4 files changed, 118 insertions(+), 35 deletions(-) diff --git a/interactive_engine/executor/ir/common/src/utils.rs b/interactive_engine/executor/ir/common/src/utils.rs index b6e807331a7f..487f776711d7 100644 --- a/interactive_engine/executor/ir/common/src/utils.rs +++ b/interactive_engine/executor/ir/common/src/utils.rs @@ -254,6 +254,39 @@ impl From for common_pb::Variable { } } +impl From for pb::index_predicate::PkValue { + fn from(value: i32) -> Self { + let val: common_pb::Value = value.into(); + val.into() + } +} + +impl From for pb::index_predicate::PkValue { + fn from(value: i64) -> Self { + let val: common_pb::Value = value.into(); + val.into() + } +} + +impl From for pb::index_predicate::PkValue { + fn from(value: String) -> Self { + let val: common_pb::Value = value.into(); + val.into() + } +} + +impl From for pb::index_predicate::PkValue { + fn from(value: common_pb::Value) -> Self { + pb::index_predicate::PkValue { item: Some(pb::index_predicate::pk_value::Item::Value(value)) } + } +} + +impl From for pb::index_predicate::PkValue { + fn from(param: common_pb::DynamicParam) -> Self { + pb::index_predicate::PkValue { item: Some(pb::index_predicate::pk_value::Item::DynParam(param)) } + } +} + impl From for pb::index_predicate::AndPredicate { fn from(id: i64) -> Self { pb::index_predicate::AndPredicate { @@ -350,22 +383,35 @@ impl TryFrom for Vec { let (key, value) = (predicate.key.as_ref(), predicate.value.as_ref()); let key = key.ok_or("key is empty in kv_pair in indexed_scan")?; if let Some(common_pb::property::Item::Id(_id_key)) = key.item.as_ref() { - let value = value.ok_or("value is empty in kv_pair in indexed_scan")?; - - match &value.item { - Some(common_pb::value::Item::I64(v)) => { - global_ids.push(*v); - } - Some(common_pb::value::Item::I64Array(arr)) => { - global_ids.extend(arr.item.iter().cloned()) - } - Some(common_pb::value::Item::I32(v)) => { - global_ids.push(*v as i64); - } - Some(common_pb::value::Item::I32Array(arr)) => { - global_ids.extend(arr.item.iter().map(|i| *i as i64)); - } - _ => Err(ParsePbError::Unsupported( + let value_item = value + .ok_or(ParsePbError::EmptyFieldError( + "`Value` is empty in kv_pair in indexed_scan".to_string(), + ))? + .item + .as_ref() + .ok_or(ParsePbError::EmptyFieldError( + "`Value.item` is emtpy in kv_pair in indexed_scan".to_string(), + ))?; + + match value_item { + pb::index_predicate::pk_value::Item::Value(value) => match value.item.as_ref() { + Some(common_pb::value::Item::I64(v)) => { + global_ids.push(*v); + } + Some(common_pb::value::Item::I64Array(arr)) => { + global_ids.extend(arr.item.iter().cloned()) + } + Some(common_pb::value::Item::I32(v)) => { + global_ids.push(*v as i64); + } + Some(common_pb::value::Item::I32Array(arr)) => { + global_ids.extend(arr.item.iter().map(|i| *i as i64)); + } + _ => Err(ParsePbError::Unsupported( + "indexed value other than integer (I32, I64) and integer array".to_string(), + ))?, + }, + pb::index_predicate::pk_value::Item::DynParam(_) => Err(ParsePbError::Unsupported( "indexed value other than integer (I32, I64) and integer array".to_string(), ))?, } @@ -390,7 +436,7 @@ impl TryFrom for Vec<(NameOrId, Object)> { .key .clone() .ok_or("key is empty in kv_pair in indexed_scan")?; - let value = predicate + let value_pb = predicate .value .clone() .ok_or("value is empty in kv_pair in indexed_scan")?; @@ -400,6 +446,13 @@ impl TryFrom for Vec<(NameOrId, Object)> { "Other keys rather than property key in kv_pair in indexed_scan".to_string(), ))?, }; + let value = match value_pb.item { + Some(pb::index_predicate::pk_value::Item::Value(value)) => value, + _ => Err(ParsePbError::Unsupported(format!( + "unsupported indexed predicate value {:?}", + value_pb + )))?, + }; let obj_val = Object::try_from(value)?; primary_key_values.push((key, obj_val)); } diff --git a/interactive_engine/executor/ir/core/src/plan/ffi.rs b/interactive_engine/executor/ir/core/src/plan/ffi.rs index 1b1de7938d62..c505ebb9c080 100644 --- a/interactive_engine/executor/ir/core/src/plan/ffi.rs +++ b/interactive_engine/executor/ir/core/src/plan/ffi.rs @@ -1755,7 +1755,10 @@ mod scan { fn parse_equiv_predicate( key: FfiProperty, value: FfiConst, ) -> Result { - Ok(pb::index_predicate::Triplet { key: key.try_into()?, value: Some(value.try_into()?), cmp: None }) + let pk_value = pb::index_predicate::PkValue { + item: Some(pb::index_predicate::pk_value::Item::Value(value.try_into()?)), + }; + Ok(pb::index_predicate::Triplet { key: key.try_into()?, value: Some(pk_value), cmp: None }) } #[no_mangle] diff --git a/interactive_engine/executor/ir/core/src/plan/logical.rs b/interactive_engine/executor/ir/core/src/plan/logical.rs index 9a72fc98f5a9..887645a5ddff 100644 --- a/interactive_engine/executor/ir/core/src/plan/logical.rs +++ b/interactive_engine/executor/ir/core/src/plan/logical.rs @@ -847,7 +847,6 @@ fn triplet_to_index_predicate( let schema = meta.schema.as_ref().unwrap(); let mut key = None; let mut is_eq = false; - let mut value = None; if let Some(item) = &operators.get(0).unwrap().item { match item { common_pb::expr_opr::Item::Var(var) => { @@ -900,22 +899,35 @@ fn triplet_to_index_predicate( if let Some(item) = &operators.get(2).unwrap().item { match item { common_pb::expr_opr::Item::Const(c) => { - value = Some(c.clone()); + let idx_pred = pb::IndexPredicate { + or_predicates: vec![pb::index_predicate::AndPredicate { + predicates: vec![pb::index_predicate::Triplet { + key, + value: Some(c.clone().into()), + cmp: None, + }], + }], + }; + return Ok(Some(idx_pred)); + } + common_pb::expr_opr::Item::Param(param) => { + let idx_pred = pb::IndexPredicate { + or_predicates: vec![pb::index_predicate::AndPredicate { + predicates: vec![pb::index_predicate::Triplet { + key, + value: Some(param.clone().into()), + cmp: None, + }], + }], + }; + + return Ok(Some(idx_pred)); } _ => { /*do nothing*/ } } - }; - if value.is_none() { - return Ok(None); } - let idx_pred = pb::IndexPredicate { - or_predicates: vec![pb::index_predicate::AndPredicate { - predicates: vec![pb::index_predicate::Triplet { key, value, cmp: None }], - }], - }; - - Ok(Some(idx_pred)) + Ok(None) } fn get_table_id_from_pb(schema: &Schema, name: &common_pb::NameOrId) -> Option { @@ -1392,7 +1404,14 @@ impl AsLogical for pb::IndexPredicate { } common_pb::property::Item::Label(_) => { if let Some(val) = pred.value.as_mut() { - preprocess_label(val, meta, plan_meta)?; + if let Some(item) = val.item.as_mut() { + match item { + pb::index_predicate::pk_value::Item::Value(val) => { + preprocess_label(val, meta, plan_meta)? + } + pb::index_predicate::pk_value::Item::DynParam(_) => {} + } + } } } _ => {} diff --git a/interactive_engine/executor/ir/graph_proxy/src/utils/expr/eval_pred.rs b/interactive_engine/executor/ir/graph_proxy/src/utils/expr/eval_pred.rs index 59d10fc542fa..159e729e617d 100644 --- a/interactive_engine/executor/ir/graph_proxy/src/utils/expr/eval_pred.rs +++ b/interactive_engine/executor/ir/graph_proxy/src/utils/expr/eval_pred.rs @@ -168,16 +168,24 @@ impl TryFrom for Predicates { type Error = ParsePbError; fn try_from(triplet: pb::index_predicate::Triplet) -> Result { + let value = if let Some(value) = &triplet.value { + match &value.item { + Some(pb::index_predicate::pk_value::Item::Value(v)) => Some(v.clone()), + _ => Err(ParsePbError::Unsupported(format!( + "unsupported indexed predicate value {:?}", + value + )))?, + } + } else { + None + }; let partial = Partial::SingleItem { left: triplet .key .map(|var| var.try_into()) .transpose()?, cmp: Some(common_pb::Logical::Eq), - right: triplet - .value - .map(|val| val.try_into()) - .transpose()?, + right: value.map(|val| val.try_into()).transpose()?, }; Option::::from(partial) From 0f3b27bc36b9ac1b0cd24061adcacffb7f634511 Mon Sep 17 00:00:00 2001 From: BingqingLyu Date: Wed, 11 Oct 2023 14:10:38 +0800 Subject: [PATCH 3/6] [CI Tests] add ci for pkscan with DynParam --- .../executor/ir/core/src/plan/logical.rs | 56 +++++++++++++++++++ 1 file changed, 56 insertions(+) diff --git a/interactive_engine/executor/ir/core/src/plan/logical.rs b/interactive_engine/executor/ir/core/src/plan/logical.rs index 887645a5ddff..94d2251c8bd4 100644 --- a/interactive_engine/executor/ir/core/src/plan/logical.rs +++ b/interactive_engine/executor/ir/core/src/plan/logical.rs @@ -2240,6 +2240,62 @@ mod test { ); } + #[test] + fn scan_pred_to_idx_pred_with_dyn_param() { + let mut plan_meta = PlanMeta::default(); + plan_meta.set_curr_node(0); + plan_meta.curr_node_meta_mut(); + plan_meta.refer_to_nodes(0, vec![0]); + let meta = StoreMeta { + schema: Some( + Schema::from_json(std::fs::File::open("resource/modern_schema_pk.json").unwrap()).unwrap(), + ), + }; + // predicate: @.name == $person_name + let dyn_param = + common_pb::DynamicParam { name: "person_name".to_string(), index: 0, data_type: None }; + let dyn_param_opr = common_pb::ExprOpr { + node_type: None, + item: Some(common_pb::expr_opr::Item::Param(dyn_param.clone())), + }; + let mut predicate = str_to_expr_pb("@.name == ".to_string()).unwrap(); + predicate.operators.push(dyn_param_opr); + + let mut scan = pb::Scan { + scan_opt: 0, + alias: None, + params: Some(pb::QueryParams { + tables: vec!["person".into()], + columns: vec![], + is_all_columns: false, + limit: None, + predicate: Some(predicate), + sample_ratio: 1.0, + extra: HashMap::new(), + }), + idx_predicate: None, + is_count_only: false, + meta_data: None, + }; + + scan.preprocess(&meta, &mut plan_meta).unwrap(); + assert!(scan.params.unwrap().predicate.is_none()); + assert_eq!( + scan.idx_predicate.unwrap(), + pb::IndexPredicate { + or_predicates: vec![pb::index_predicate::AndPredicate { + predicates: vec![pb::index_predicate::Triplet { + key: Some(common_pb::Property { + item: Some(common_pb::property::Item::Key("name".into())), + }), + value: Some(dyn_param.into()), + cmp: None, + }] + }] + } + ); + } + #[test] fn column_maintain_case1() { let mut plan = LogicalPlan::with_root(); From 1d177987fae1a3aa9d90c916cc576b3557e00518 Mon Sep 17 00:00:00 2001 From: "bingqing.lbq" Date: Mon, 16 Oct 2023 07:01:51 +0000 Subject: [PATCH 4/6] refine the codes Committed-by: bingqing.lbq from Dev container --- .../executor/ir/common/src/utils.rs | 44 ++++++++----------- .../executor/ir/core/src/plan/ffi.rs | 9 ++-- .../executor/ir/core/src/plan/logical.rs | 10 ++--- .../graph_proxy/src/utils/expr/eval_pred.rs | 4 +- .../executor/ir/proto/algebra.proto | 16 +++---- 5 files changed, 37 insertions(+), 46 deletions(-) diff --git a/interactive_engine/executor/ir/common/src/utils.rs b/interactive_engine/executor/ir/common/src/utils.rs index 487f776711d7..97a972b68824 100644 --- a/interactive_engine/executor/ir/common/src/utils.rs +++ b/interactive_engine/executor/ir/common/src/utils.rs @@ -254,36 +254,36 @@ impl From for common_pb::Variable { } } -impl From for pb::index_predicate::PkValue { +impl From for pb::index_predicate::triplet::Value { fn from(value: i32) -> Self { let val: common_pb::Value = value.into(); val.into() } } -impl From for pb::index_predicate::PkValue { +impl From for pb::index_predicate::triplet::Value { fn from(value: i64) -> Self { let val: common_pb::Value = value.into(); val.into() } } -impl From for pb::index_predicate::PkValue { +impl From for pb::index_predicate::triplet::Value { fn from(value: String) -> Self { let val: common_pb::Value = value.into(); val.into() } } -impl From for pb::index_predicate::PkValue { +impl From for pb::index_predicate::triplet::Value { fn from(value: common_pb::Value) -> Self { - pb::index_predicate::PkValue { item: Some(pb::index_predicate::pk_value::Item::Value(value)) } + pb::index_predicate::triplet::Value::Const(value) } } -impl From for pb::index_predicate::PkValue { +impl From for pb::index_predicate::triplet::Value { fn from(param: common_pb::DynamicParam) -> Self { - pb::index_predicate::PkValue { item: Some(pb::index_predicate::pk_value::Item::DynParam(param)) } + pb::index_predicate::triplet::Value::Param(param) } } @@ -383,18 +383,12 @@ impl TryFrom for Vec { let (key, value) = (predicate.key.as_ref(), predicate.value.as_ref()); let key = key.ok_or("key is empty in kv_pair in indexed_scan")?; if let Some(common_pb::property::Item::Id(_id_key)) = key.item.as_ref() { - let value_item = value - .ok_or(ParsePbError::EmptyFieldError( - "`Value` is empty in kv_pair in indexed_scan".to_string(), - ))? - .item - .as_ref() - .ok_or(ParsePbError::EmptyFieldError( - "`Value.item` is emtpy in kv_pair in indexed_scan".to_string(), - ))?; + let value_item = value.ok_or(ParsePbError::EmptyFieldError( + "`Value` is empty in kv_pair in indexed_scan".to_string(), + ))?; match value_item { - pb::index_predicate::pk_value::Item::Value(value) => match value.item.as_ref() { + pb::index_predicate::triplet::Value::Const(value) => match value.item.as_ref() { Some(common_pb::value::Item::I64(v)) => { global_ids.push(*v); } @@ -411,7 +405,7 @@ impl TryFrom for Vec { "indexed value other than integer (I32, I64) and integer array".to_string(), ))?, }, - pb::index_predicate::pk_value::Item::DynParam(_) => Err(ParsePbError::Unsupported( + pb::index_predicate::triplet::Value::Param(_) => Err(ParsePbError::Unsupported( "indexed value other than integer (I32, I64) and integer array".to_string(), ))?, } @@ -446,15 +440,15 @@ impl TryFrom for Vec<(NameOrId, Object)> { "Other keys rather than property key in kv_pair in indexed_scan".to_string(), ))?, }; - let value = match value_pb.item { - Some(pb::index_predicate::pk_value::Item::Value(value)) => value, - _ => Err(ParsePbError::Unsupported(format!( + if let pb::index_predicate::triplet::Value::Const(value) = value_pb { + let obj_val = Object::try_from(value)?; + primary_key_values.push((key, obj_val)); + } else { + Err(ParsePbError::Unsupported(format!( "unsupported indexed predicate value {:?}", value_pb - )))?, - }; - let obj_val = Object::try_from(value)?; - primary_key_values.push((key, obj_val)); + )))? + } } Ok(primary_key_values) } diff --git a/interactive_engine/executor/ir/core/src/plan/ffi.rs b/interactive_engine/executor/ir/core/src/plan/ffi.rs index c505ebb9c080..6139ec234875 100644 --- a/interactive_engine/executor/ir/core/src/plan/ffi.rs +++ b/interactive_engine/executor/ir/core/src/plan/ffi.rs @@ -1755,10 +1755,11 @@ mod scan { fn parse_equiv_predicate( key: FfiProperty, value: FfiConst, ) -> Result { - let pk_value = pb::index_predicate::PkValue { - item: Some(pb::index_predicate::pk_value::Item::Value(value.try_into()?)), - }; - Ok(pb::index_predicate::Triplet { key: key.try_into()?, value: Some(pk_value), cmp: None }) + Ok(pb::index_predicate::Triplet { + key: key.try_into()?, + value: Some(pb::index_predicate::triplet::Value::Const(value.try_into()?)), + cmp: None, + }) } #[no_mangle] diff --git a/interactive_engine/executor/ir/core/src/plan/logical.rs b/interactive_engine/executor/ir/core/src/plan/logical.rs index 94d2251c8bd4..4739d7244ae1 100644 --- a/interactive_engine/executor/ir/core/src/plan/logical.rs +++ b/interactive_engine/executor/ir/core/src/plan/logical.rs @@ -1404,13 +1404,11 @@ impl AsLogical for pb::IndexPredicate { } common_pb::property::Item::Label(_) => { if let Some(val) = pred.value.as_mut() { - if let Some(item) = val.item.as_mut() { - match item { - pb::index_predicate::pk_value::Item::Value(val) => { - preprocess_label(val, meta, plan_meta)? - } - pb::index_predicate::pk_value::Item::DynParam(_) => {} + match val { + pb::index_predicate::triplet::Value::Const(val) => { + preprocess_label(val, meta, plan_meta)? } + pb::index_predicate::triplet::Value::Param(_) => {} } } } diff --git a/interactive_engine/executor/ir/graph_proxy/src/utils/expr/eval_pred.rs b/interactive_engine/executor/ir/graph_proxy/src/utils/expr/eval_pred.rs index 159e729e617d..450ec4eaedf8 100644 --- a/interactive_engine/executor/ir/graph_proxy/src/utils/expr/eval_pred.rs +++ b/interactive_engine/executor/ir/graph_proxy/src/utils/expr/eval_pred.rs @@ -169,8 +169,8 @@ impl TryFrom for Predicates { fn try_from(triplet: pb::index_predicate::Triplet) -> Result { let value = if let Some(value) = &triplet.value { - match &value.item { - Some(pb::index_predicate::pk_value::Item::Value(v)) => Some(v.clone()), + match &value { + pb::index_predicate::triplet::Value::Const(v) => Some(v.clone()), _ => Err(ParsePbError::Unsupported(format!( "unsupported indexed predicate value {:?}", value diff --git a/interactive_engine/executor/ir/proto/algebra.proto b/interactive_engine/executor/ir/proto/algebra.proto index 639e0fccc894..a0559c43efc1 100644 --- a/interactive_engine/executor/ir/proto/algebra.proto +++ b/interactive_engine/executor/ir/proto/algebra.proto @@ -191,18 +191,16 @@ message Limit { // where the values referred by k1, k2, ... are indexed and hence the // predicate can be efficiently verified by leveraging the index. message IndexPredicate { - message PkValue { - oneof item { - common.Value value = 1; - common.DynamicParam dyn_param = 2; - } - } - // A triplet defines that a key must be **equal** to a given constant value. + // A triplet defines that a key must be **equal** to a given value. + // The value can be a constant value, or a dynamic parameter. message Triplet { common.Property key = 1; - PkValue value = 2; + oneof value { + common.Value const = 2; + common.DynamicParam param = 3; + } // TODO(longbin) More comparators (gt, ge, lt, le, ne) other than equivalence (eq) may be required - common.None cmp = 3; + common.None cmp = 4; } // A collection of `Triplet` that forms a logical **AND** of all `Predicate`s. message AndPredicate { From 4479e4427dd5b5a0d5a30d89a66e0d49dedb0377 Mon Sep 17 00:00:00 2001 From: zhanglei1949 Date: Tue, 17 Oct 2023 15:52:38 +0800 Subject: [PATCH 5/6] fix CI --- flex/codegen/src/hqps/hqps_scan_builder.h | 244 ++++++++++------------ 1 file changed, 111 insertions(+), 133 deletions(-) diff --git a/flex/codegen/src/hqps/hqps_scan_builder.h b/flex/codegen/src/hqps/hqps_scan_builder.h index 9f24d9740b51..348e5fd7ecf2 100644 --- a/flex/codegen/src/hqps/hqps_scan_builder.h +++ b/flex/codegen/src/hqps/hqps_scan_builder.h @@ -55,7 +55,7 @@ static constexpr const char* SCAN_OP_TEMPLATE_NO_EXPR_STR = /// 4. vertex label /// 5. oid static constexpr const char* SCAN_OP_WITH_OID_TEMPLATE_STR = - "auto %1% = Engine::template ScanVertex<%2%>(%3%, %4%, %5%));\n"; + "auto %1% = Engine::template ScanVertexWithOid<%2%>(%3%, %4%, %5%);\n"; /** * @brief When building scanOp, we ignore the data type provided in the pb. @@ -85,16 +85,67 @@ class ScanOpBuilder { if (!query_params.has_predicate()) { VLOG(10) << "No expr in params"; } - query_params_ = query_params; + CHECK(labels_ids_.empty()) << "label ids should be empty"; + if (!try_to_get_label_id_from_query_params(query_params, labels_ids_)) { + LOG(FATAL) << "fail to label id from expr"; + } + + // the user provide oid can be a const or a param const + if (query_params.has_predicate()) { + auto& predicate = query_params.predicate(); + VLOG(10) << "predicate: " << predicate.DebugString(); + // We first scan the predicate to find whether there is conditions on + // labels. + std::vector expr_label_ids; + if (try_to_get_label_ids_from_expr(predicate, expr_label_ids)) { + // join expr_label_ids with table_lable_ids; + VLOG(10) << "Found label ids in expr: " + << gs::to_string(expr_label_ids); + intersection(labels_ids_, expr_label_ids); + } + + auto expr_builder = ExprBuilder(ctx_); + expr_builder.set_return_type(common::DataType::BOOLEAN); + expr_builder.AddAllExprOpr(query_params.predicate().operators()); + + std::string expr_code; + std::vector func_call_param_const; + std::vector> expr_tag_props; + std::vector unused_expr_ret_type; + std::tie(expr_func_name_, func_call_param_const, expr_tag_props, + expr_code, unused_expr_ret_type) = expr_builder.Build(); + VLOG(10) << "Found expr in scan: " << expr_func_name_; + // generate code. + ctx_.AddExprCode(expr_code); + expr_var_name_ = ctx_.GetNextExprVarName(); + { + std::stringstream ss; + for (auto i = 0; i < func_call_param_const.size(); ++i) { + ss << func_call_param_const[i].var_name; + if (i != func_call_param_const.size() - 1) { + ss << ","; + } + } + expr_construct_params_ = ss.str(); + } + { + std::stringstream ss; + if (expr_tag_props.size() > 0) { + ss << ","; + for (auto i = 0; i + 1 < expr_tag_props.size(); ++i) { + ss << expr_tag_props[i].second << ", "; + } + ss << expr_tag_props[expr_tag_props.size() - 1].second; + } + selectors_str_ = ss.str(); + } + } return *this; } ScanOpBuilder& idx_predicate(const algebra::IndexPredicate& predicate) { // check query_params not has predicate. - if (query_params_.has_predicate()) { - VLOG(10) << "query params already has predicate"; - return *this; - } + // Currently we only support one predicate. if (predicate.or_predicates_size() < 1) { VLOG(10) << "No predicate in index predicate"; @@ -104,150 +155,71 @@ class ScanOpBuilder { throw std::runtime_error( std::string("Currently only support one predicate")); } + CHECK(expr_func_name_.empty()) << "Predicate is already given by expr"; auto or_predicate = predicate.or_predicates(0); if (or_predicate.predicates_size() != 1) { throw std::runtime_error( std::string("Currently only support one and predicate")); } auto triplet = or_predicate.predicates(0); - // add index predicate to query params - auto* new_predicate = query_params_.mutable_predicate(); - { - auto first_op = new_predicate->add_operators(); - common::Variable variable; - auto& property = triplet.key(); - *(variable.mutable_property()) = property; - variable.mutable_node_type()->set_data_type(common::DataType::INT64); - *(first_op->mutable_var()) = variable; - } - { - auto second = new_predicate->add_operators(); - second->set_logical(common::Logical::EQ); - second->mutable_node_type()->set_data_type(common::DataType::BOOLEAN); - } - { - auto third = new_predicate->add_operators(); - auto& value = triplet.value(); - third->mutable_node_type()->set_data_type(common::DataType::INT64); - *(third->mutable_const_()) = value; + auto& property = triplet.key(); + if (triplet.value_case() == algebra::IndexPredicate::Triplet::kConst) { + // FUTURE: check property is really the primary key. + auto const_value = triplet.const_(); + switch (const_value.item_case()) { + case common::Value::kI32: + oid_ = std::to_string(const_value.i32()); + break; + case common::Value::kI64: + oid_ = std::to_string(const_value.i64()); + break; + default: + LOG(FATAL) << "Currently only support int, long as primary key"; + } + VLOG(1) << "Found oid: " << oid_ << " in index scan"; + } else { + // dynamic param + auto dyn_param_pb = triplet.param(); + auto param_const = param_const_pb_to_param_const(dyn_param_pb); + VLOG(10) << "receive param const in index predicate: " + << dyn_param_pb.DebugString(); + ctx_.AddParameterVar(param_const); } - VLOG(10) << "Add index predicate to query params: " - << query_params_.DebugString(); + return *this; } std::string Build() const { - std::string label_name; - std::vector labels_ids; - if (!try_to_get_label_name_from_query_params(query_params_, label_name)) { - LOG(WARNING) << "fail to label name from expr"; - if (!try_to_get_label_id_from_query_params(query_params_, labels_ids)) { - LOG(FATAL) << "fail to label id from expr"; - } - } - - // the user provide oid can be a const or a param const - if (query_params_.has_predicate()) { - auto& predicate = query_params_.predicate(); - VLOG(10) << "predicate: " << predicate.DebugString(); - // We first scan the predicate to find whether there is conditions on - // labels. - std::vector expr_label_ids; - if (try_to_get_label_ids_from_expr(predicate, expr_label_ids)) { - // join expr_label_ids with table_lable_ids; - VLOG(10) << "Found label ids in expr: " - << gs::to_string(expr_label_ids); - intersection(labels_ids, expr_label_ids); - } - } - // CHECK(labels_ids.size() == 1) << "only support one label in scan"; - -#ifdef FAST_SCAN - gs::codegen::oid_t oid; - gs::codegen::ParamConst oid_param; - if (try_to_get_oid_from_expr(predicate, oid)) { - VLOG(10) << "Parse oid: " << oid << "from expr"; - return scan_with_oid(label_name, label_id, oid); - } else if (try_to_get_oid_param_from_expr(predicate, oid_param)) { - VLOG(10) << "Parse oid param: " << oid_param.var_name << "from expr"; - return scan_with_oid(label_name, label_id, oid_param.var_name); + // 1. If common expression predicate presents, scan with expression + if (!expr_func_name_.empty()) { + VLOG(1) << "Scan with expression"; + return scan_with_expr(labels_ids_, expr_var_name_, expr_func_name_, + expr_construct_params_, selectors_str_); } else { - VLOG(10) << "Fail to parse oid from expr"; - { -#endif - if (query_params_.has_predicate()) { - auto expr_builder = ExprBuilder(ctx_); - expr_builder.set_return_type(common::DataType::BOOLEAN); - expr_builder.AddAllExprOpr(query_params_.predicate().operators()); - - std::string expr_func_name, expr_code; - std::vector func_call_param_const; - std::vector> expr_tag_props; - std::vector unused_expr_ret_type; - std::tie(expr_func_name, func_call_param_const, expr_tag_props, - expr_code, unused_expr_ret_type) = expr_builder.Build(); - VLOG(10) << "Found expr in scan: " << expr_func_name; - // generate code. - ctx_.AddExprCode(expr_code); - std::string expr_var_name = ctx_.GetNextExprVarName(); - std::string - expr_construct_params; // function construction params and - std::string selectors_str; // selectors str, concatenated - { - std::stringstream ss; - for (auto i = 0; i < func_call_param_const.size(); ++i) { - ss << func_call_param_const[i].var_name; - if (i != func_call_param_const.size() - 1) { - ss << ","; - } - } - expr_construct_params = ss.str(); - } - { - std::stringstream ss; - if (expr_tag_props.size() > 0) { - ss << ","; - for (auto i = 0; i + 1 < expr_tag_props.size(); ++i) { - ss << expr_tag_props[i].second << ", "; - } - ss << expr_tag_props[expr_tag_props.size() - 1].second; - } - selectors_str = ss.str(); - } - - // use expression to filter. - return scan_with_expr(labels_ids, expr_var_name, expr_func_name, - expr_construct_params, selectors_str); - } else { - return scan_without_expr(labels_ids); - } - -#ifdef FAST_SCAN + // If oid_ not empty, scan with oid + if (!oid_.empty()) { + VLOG(1) << "Scan with oid: " << oid_; + return scan_with_oid(labels_ids_, oid_); + } else { + // If no oid, scan without expression + VLOG(1) << "Scan without expression"; + return scan_without_expr(labels_ids_); } } -#endif } private: - std::string scan_with_oid(const std::string& label_name, - const int32_t& label_id, codegen::oid_t oid) const { - VLOG(10) << "Scan with fixed oid" << oid; - std::string next_ctx_name = ctx_.GetCurCtxName(); - auto append_opt = res_alias_to_append_opt(res_alias_); - - boost::format formater(SCAN_OP_WITH_OID_TEMPLATE_STR); - formater % next_ctx_name % append_opt % ctx_.GraphVar() % label_id % oid; - return formater.str(); - } - std::string scan_with_oid(const std::string& label_name, - const int32_t& label_id, + std::string scan_with_oid(const std::vector& label_ids, const std::string& oid) const { - VLOG(10) << "Scan with dynamic param oid"; + VLOG(10) << "Scan with oid: " << oid; + CHECK(label_ids.size() == 1) + << "Currently only support one label for index scan"; std::string next_ctx_name = ctx_.GetCurCtxName(); auto append_opt = res_alias_to_append_opt(res_alias_); boost::format formater(SCAN_OP_WITH_OID_TEMPLATE_STR); - formater % next_ctx_name % append_opt % ctx_.GraphVar() % label_id % oid; + formater % next_ctx_name % append_opt % ctx_.GraphVar() % label_ids[0] % + oid; return formater.str(); } @@ -303,9 +275,13 @@ class ScanOpBuilder { ctx_.GraphVar() % label_ids_str; return formater.str(); } + BuildingContext& ctx_; physical::Scan::ScanOpt scan_opt_; - algebra::QueryParams query_params_; + std::vector labels_ids_; + std::string expr_var_name_, expr_func_name_, expr_construct_params_, + selectors_str_; // The expression decode from params. + std::string oid_; // the oid decode from idx predicate, or param name. int res_alias_; }; @@ -322,11 +298,13 @@ static std::string BuildScanOp( } else { builder.resAlias(-1); } - return builder.queryParams(scan_pb.params()) - .idx_predicate(scan_pb.idx_predicate()) - .Build(); + builder.queryParams(scan_pb.params()); + if (scan_pb.has_idx_predicate()) { + builder.idx_predicate(scan_pb.idx_predicate()); + } + return builder.Build(); } } // namespace gs -#endif // CODEGEN_SRC_HQPS_HQPS_SCAN_BUILDER_H_ \ No newline at end of file +#endif // CODEGEN_SRC_HQPS_HQPS_SCAN_BUILDER_H_ From 850bd254d111dc34221449f2a066f6ddef171337 Mon Sep 17 00:00:00 2001 From: "xiaolei.zl" Date: Wed, 18 Oct 2023 20:37:03 +0800 Subject: [PATCH 6/6] fix CI --- flex/codegen/src/hqps/hqps_scan_builder.h | 34 +++++++++++++++---- flex/engines/hqps_db/core/operator/scan.h | 34 ++++++++++++++++++- flex/engines/hqps_db/core/sync_engine.h | 26 ++++++++++++++ .../hqps_db/database/mutable_csr_interface.h | 16 ++++----- 4 files changed, 92 insertions(+), 18 deletions(-) diff --git a/flex/codegen/src/hqps/hqps_scan_builder.h b/flex/codegen/src/hqps/hqps_scan_builder.h index 348e5fd7ecf2..6756b9aec909 100644 --- a/flex/codegen/src/hqps/hqps_scan_builder.h +++ b/flex/codegen/src/hqps/hqps_scan_builder.h @@ -54,9 +54,19 @@ static constexpr const char* SCAN_OP_TEMPLATE_NO_EXPR_STR = /// 3. graph name /// 4. vertex label /// 5. oid -static constexpr const char* SCAN_OP_WITH_OID_TEMPLATE_STR = +static constexpr const char* SCAN_OP_WITH_OID_ONE_LABEL_TEMPLATE_STR = "auto %1% = Engine::template ScanVertexWithOid<%2%>(%3%, %4%, %5%);\n"; +/// Args +/// 1. res_ctx_name +/// 2. AppendOpt, +/// 3. graph name +/// 4. vertex label +/// 5. oid +static constexpr const char* SCAN_OP_WITH_OID_MUL_LABEL_TEMPLATE_STR = + "auto %1% = Engine::template ScanVertexWithOid<%2%>(%3%, " + "std::array{%5%}, %6%);\n"; + /** * @brief When building scanOp, we ignore the data type provided in the pb. * @@ -212,15 +222,25 @@ class ScanOpBuilder { std::string scan_with_oid(const std::vector& label_ids, const std::string& oid) const { VLOG(10) << "Scan with oid: " << oid; - CHECK(label_ids.size() == 1) - << "Currently only support one label for index scan"; std::string next_ctx_name = ctx_.GetCurCtxName(); auto append_opt = res_alias_to_append_opt(res_alias_); - boost::format formater(SCAN_OP_WITH_OID_TEMPLATE_STR); - formater % next_ctx_name % append_opt % ctx_.GraphVar() % label_ids[0] % - oid; - return formater.str(); + if (label_ids.size() == 1) { + boost::format formater(SCAN_OP_WITH_OID_ONE_LABEL_TEMPLATE_STR); + formater % next_ctx_name % append_opt % ctx_.GraphVar() % label_ids[0] % + oid; + return formater.str(); + } else { + boost::format formater(SCAN_OP_WITH_OID_MUL_LABEL_TEMPLATE_STR); + std::stringstream ss; + for (auto i = 0; i + 1 < label_ids.size(); ++i) { + ss << std::to_string(label_ids[i]) << ", "; + } + ss << std::to_string(label_ids[label_ids.size() - 1]); + formater % next_ctx_name % append_opt % ctx_.GraphVar() % + label_ids.size() % ss.str() % oid; + return formater.str(); + } } std::string scan_without_expr(const std::vector& label_ids) const { diff --git a/flex/engines/hqps_db/core/operator/scan.h b/flex/engines/hqps_db/core/operator/scan.h index 6061a47101d9..7caf9e4bb686 100644 --- a/flex/engines/hqps_db/core/operator/scan.h +++ b/flex/engines/hqps_db/core/operator/scan.h @@ -100,10 +100,42 @@ class Scan { const label_id_t& v_label_id, int64_t oid) { std::vector gids; - gids.emplace_back(graph.ScanVerticesWithOid(v_label_id, oid)); + vertex_id_t vid; + if (graph.ScanVerticesWithOid(v_label_id, oid, vid)) { + gids.emplace_back(vid); + } return make_default_row_vertex_set(std::move(gids), v_label_id); } + /// @brief Scan vertex with oid + /// @param graph + /// @param v_label_ids + /// @param oid + /// @return + template + static auto ScanVertexWithOid( + const GRAPH_INTERFACE& graph, + const std::array& v_label_ids, int64_t oid) { + std::vector gids; + std::vector labels_vec; + std::vector bitsets; + vertex_id_t vid; + for (auto i = 0; i < num_labels; ++i) { + if (graph.ScanVerticesWithOid(v_label_ids[i], oid, vid)) { + labels_vec.emplace_back(v_label_ids[i]); + gids.emplace_back(vid); + } + } + bitsets.resize(labels_vec.size()); + for (auto i = 0; i < bitsets.size(); ++i) { + bitsets[i].init(gids.size()); + bitsets[i].set_bit(i); + } + + return make_general_set(std::move(gids), std::move(labels_vec), + std::move(bitsets)); + } + private: template static GeneralVertexSet diff --git a/flex/engines/hqps_db/core/sync_engine.h b/flex/engines/hqps_db/core/sync_engine.h index 8975a9b7174b..ce91a5ed30ba 100644 --- a/flex/engines/hqps_db/core/sync_engine.h +++ b/flex/engines/hqps_db/core/sync_engine.h @@ -185,6 +185,32 @@ class SyncEngine : public BaseEngine { return Context(std::move(v_set_tuple)); } + template ::type* = + nullptr, + typename COL_T = GeneralVertexSet> + static Context ScanVertexWithOid( + const GRAPH_INTERFACE& graph, std::array v_labels, + int64_t oid) { + auto v_set_tuple = + Scan::ScanVertexWithOid(graph, v_labels, oid); + + return Context(std::move(v_set_tuple)); + } + + template < + AppendOpt append_opt, typename LabelT, size_t num_labels, + typename std::enable_if<(append_opt == AppendOpt::Temp)>::type* = nullptr, + typename COL_T = GeneralVertexSet> + static Context ScanVertexWithOid( + const GRAPH_INTERFACE& graph, std::array v_labels, + int64_t oid) { + auto v_set_tuple = + Scan::ScanVertexWithOid(graph, v_labels, oid); + + return Context(std::move(v_set_tuple)); + } + //////////////////////////EdgeExpand//////////////////////////// ////Edge ExpandE with multiple edge label triplets. (src, dst, edge) diff --git a/flex/engines/hqps_db/database/mutable_csr_interface.h b/flex/engines/hqps_db/database/mutable_csr_interface.h index 0f1b9f3c89fa..4153f8a00d8d 100644 --- a/flex/engines/hqps_db/database/mutable_csr_interface.h +++ b/flex/engines/hqps_db/database/mutable_csr_interface.h @@ -182,12 +182,10 @@ class MutableCSRInterface { * @param label * @param oid */ - vertex_id_t ScanVerticesWithOid(const std::string& label, - outer_vertex_id_t oid) const { + bool ScanVerticesWithOid(const std::string& label, outer_vertex_id_t oid, + vertex_id_t& vid) const { auto label_id = db_session_.schema().get_vertex_label_id(label); - vertex_id_t vid; - CHECK(db_session_.graph().get_lid(label_id, oid, vid)); - return vid; + return db_session_.graph().get_lid(label_id, oid, vid); } /** @@ -196,11 +194,9 @@ class MutableCSRInterface { * @param label_id * @param oid */ - vertex_id_t ScanVerticesWithOid(const label_id_t& label_id, - outer_vertex_id_t oid) const { - vertex_id_t vid; - CHECK(db_session_.graph().get_lid(label_id, oid, vid)); - return vid; + bool ScanVerticesWithOid(const label_id_t& label_id, outer_vertex_id_t oid, + vertex_id_t& vid) const { + return db_session_.graph().get_lid(label_id, oid, vid); } /**