Skip to content

Commit

Permalink
fix(interactive): fix bugs when try to get labels or properties from …
Browse files Browse the repository at this point in the history
…an None entry (#4341)

<!--
Thanks for your contribution! please review
https://github.com/alibaba/GraphScope/blob/main/CONTRIBUTING.md before
opening an issue.
-->

## What do these changes do?

<!-- Please give a short brief about these changes. -->

As titled.


## Related issue number

<!-- Are there any issues opened that will be resolved by merging this
change? -->

Fixes #4340
  • Loading branch information
BingqingLyu authored Dec 12, 2024
1 parent 939e4a7 commit 086d7b1
Show file tree
Hide file tree
Showing 9 changed files with 103 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -219,4 +219,15 @@ public static QueryContext get_simple_match_query_17_test() {
List<String> expected = Arrays.asList("Record<{$f0: 851}>");
return new QueryContext(query, expected);
}

public static QueryContext get_simple_match_query_18_test() {
String query =
"MATCH (country:PLACE {name:"
+ " \"India\"})<-[:ISPARTOF]-(:PLACE)<-[:ISLOCATEDIN]-(zombie:PERSON)\n"
+ "OPTIONAL MATCH (zombie)<-[:HASCREATOR]-(message)\n"
+ "WHERE message.length < 100\n"
+ " Return count(country);";
List<String> expected = Arrays.asList("Record<{$f0: 39783}>");
return new QueryContext(query, expected);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,13 @@ public void run_simple_match_17_test() {
Assert.assertEquals(testQuery.getExpectedResult().toString(), result.list().toString());
}

@Test
public void run_simple_match_18_test() {
QueryContext testQuery = SimpleMatchQueries.get_simple_match_query_18_test();
Result result = session.run(testQuery.getQuery());
Assert.assertEquals(testQuery.getExpectedResult().toString(), result.list().toString());
}

@AfterClass
public static void afterClass() {
if (session != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ use crate::utils::expr::eval_pred::PEvaluator;

pub mod element;
pub type ID = i64;
// a special id for Null graph elements.
pub const NULL_ID: ID = ID::MAX;

pub fn read_id<R: ReadExt>(reader: &mut R) -> io::Result<ID> {
reader.read_i64()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -833,7 +833,7 @@ impl InnerOpr {
mod tests {
use ahash::HashMap;
use dyn_type::DateTimeFormats;
use ir_common::{expr_parse::str_to_expr_pb, generated::physical::physical_opr::operator};
use ir_common::expr_parse::str_to_expr_pb;

use super::*;
use crate::apis::{DynDetails, Vertex};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ mod common;
mod test {
use std::sync::Arc;

use dyn_type::Object;
use graph_proxy::apis::{register_graph, GraphElement};
use graph_proxy::create_exp_store;
use graph_store::ldbc::LDBCVertexParser;
Expand Down Expand Up @@ -96,8 +95,7 @@ mod test {
while let Some(Ok(record)) = result.next() {
if let Some(element) = record.get(None).unwrap().as_vertex() {
result_ids.push(element.id() as usize)
} else if let Some(obj) = record.get(None).unwrap().as_object() {
assert_eq!(obj, &Object::None);
} else if record.get(None).unwrap().is_none() {
none_cnt += 1;
}
}
Expand Down Expand Up @@ -131,8 +129,7 @@ mod test {
println!("record: {:?}", record);
if let Some(element) = record.get(None).unwrap().as_vertex() {
result_ids.push(element.id() as usize)
} else if let Some(obj) = record.get(None).unwrap().as_object() {
assert_eq!(obj, &Object::None);
} else if record.get(None).unwrap().is_none() {
none_cnt += 1;
}
}
Expand Down Expand Up @@ -168,8 +165,7 @@ mod test {
while let Some(Ok(record)) = result.next() {
if let Some(e) = record.get(None).unwrap().as_edge() {
result_edges.push((e.src_id as usize, e.dst_id as usize));
} else if let Some(obj) = record.get(None).unwrap().as_object() {
assert_eq!(obj, &Object::None);
} else if record.get(None).unwrap().is_none() {
none_cnt += 1;
}
}
Expand Down Expand Up @@ -268,11 +264,8 @@ mod test {
while let Some(Ok(record)) = result.next() {
if let Some(element) = record.get(None).unwrap().as_vertex() {
result_ids.push(element.id() as usize);
} else if let Some(obj) = record.get(None).unwrap().as_object() {
assert_eq!(obj, &Object::None);
} else if record.get(None).unwrap().is_none() {
none_cnt += 1;
} else {
unreachable!()
}
}
result_ids.sort();
Expand Down
63 changes: 59 additions & 4 deletions interactive_engine/executor/ir/runtime/src/process/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use std::sync::Arc;

use ahash::HashMap;
use dyn_type::{BorrowObject, Object};
use graph_proxy::apis::graph::NULL_ID;
use graph_proxy::apis::VertexOrEdge;
use graph_proxy::apis::{Edge, Element, GraphElement, GraphPath, PropertyValue, Vertex, ID};
use ir_common::error::ParsePbError;
Expand Down Expand Up @@ -51,6 +52,8 @@ pub enum EntryType {
Intersection,
/// Type of collection consisting of entries
Collection,
/// A Null graph element entry type
Null,
}

pub trait Entry: Debug + Send + Sync + AsAny + Element {
Expand Down Expand Up @@ -104,6 +107,7 @@ impl DynEntry {
.as_object()
.map(|obj| obj.eq(&Object::None))
.unwrap_or(false),
EntryType::Null => true,
_ => false,
}
}
Expand Down Expand Up @@ -184,6 +188,9 @@ impl Encode for DynEntry {
.unwrap()
.write_to(writer)?;
}
EntryType::Null => {
writer.write_u8(9)?;
}
}
Ok(())
}
Expand Down Expand Up @@ -225,6 +232,7 @@ impl Decode for DynEntry {
let general_intersect = GeneralIntersectionEntry::read_from(reader)?;
Ok(DynEntry::new(general_intersect))
}
9 => Ok(DynEntry::new(NullEntry)),
_ => unreachable!(),
}
}
Expand All @@ -247,7 +255,7 @@ impl Element for DynEntry {
impl GraphElement for DynEntry {
fn id(&self) -> ID {
match self.get_type() {
EntryType::Vertex | EntryType::Edge | EntryType::Path => {
EntryType::Vertex | EntryType::Edge | EntryType::Path | EntryType::Null => {
self.inner.as_graph_element().unwrap().id()
}
_ => unreachable!(),
Expand All @@ -256,7 +264,7 @@ impl GraphElement for DynEntry {

fn label(&self) -> Option<i32> {
match self.get_type() {
EntryType::Vertex | EntryType::Edge | EntryType::Path => {
EntryType::Vertex | EntryType::Edge | EntryType::Path | EntryType::Null => {
self.inner.as_graph_element().unwrap().label()
}
_ => unreachable!(),
Expand All @@ -265,7 +273,7 @@ impl GraphElement for DynEntry {

fn get_property(&self, key: &NameOrId) -> Option<PropertyValue> {
match self.get_type() {
EntryType::Vertex | EntryType::Edge | EntryType::Path => self
EntryType::Vertex | EntryType::Edge | EntryType::Path | EntryType::Null => self
.inner
.as_graph_element()
.unwrap()
Expand All @@ -276,7 +284,7 @@ impl GraphElement for DynEntry {

fn get_all_properties(&self) -> Option<HashMap<NameOrId, Object>> {
match self.get_type() {
EntryType::Vertex | EntryType::Edge | EntryType::Path => self
EntryType::Vertex | EntryType::Edge | EntryType::Path | EntryType::Null => self
.inner
.as_graph_element()
.unwrap()
Expand Down Expand Up @@ -306,6 +314,7 @@ impl Hash for DynEntry {
.as_any_ref()
.downcast_ref::<PairEntry>()
.hash(state),
EntryType::Null => self.hash(state),
}
}
}
Expand Down Expand Up @@ -335,6 +344,7 @@ impl PartialEq for DynEntry {
.as_any_ref()
.downcast_ref::<PairEntry>()
.eq(&other.as_any_ref().downcast_ref::<PairEntry>()),
EntryType::Null => other.get_type() == EntryType::Null,
}
} else {
false
Expand Down Expand Up @@ -373,6 +383,7 @@ impl PartialOrd for DynEntry {
.as_any_ref()
.downcast_ref::<PairEntry>()
.partial_cmp(&other.as_any_ref().downcast_ref::<PairEntry>()),
EntryType::Null => None,
}
} else {
None
Expand Down Expand Up @@ -548,6 +559,50 @@ impl Decode for CollectionEntry {
}
}

// NullEntry represents a null graph element, e.g., a null vertex generated by optional edge_expand.
#[derive(Debug, Clone, Default, PartialEq, PartialOrd, Eq, Hash)]
pub struct NullEntry;

impl_as_any!(NullEntry);

impl Entry for NullEntry {
fn get_type(&self) -> EntryType {
EntryType::Null
}
}

impl Element for NullEntry {
fn as_graph_element(&self) -> Option<&dyn GraphElement> {
Some(self)
}

fn len(&self) -> usize {
0
}

fn as_borrow_object(&self) -> BorrowObject {
BorrowObject::None
}
}

impl GraphElement for NullEntry {
fn id(&self) -> ID {
NULL_ID
}

fn label(&self) -> Option<i32> {
None
}

fn get_property(&self, _key: &NameOrId) -> Option<PropertyValue> {
None
}

fn get_all_properties(&self) -> Option<HashMap<NameOrId, Object>> {
None
}
}

impl TryFrom<result_pb::Element> for DynEntry {
type Error = ParsePbError;
fn try_from(e: result_pb::Element) -> Result<Self, Self::Error> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use ir_common::KeyId;
use pegasus::api::function::{DynIter, FlatMapFunction, FnResult};

use crate::error::{FnExecError, FnGenError, FnGenResult};
use crate::process::entry::{Entry, EntryType};
use crate::process::entry::{Entry, EntryType, NullEntry};
use crate::process::operator::flatmap::FlatMapFuncGen;
use crate::process::record::{Record, RecordExpandIter, RecordPathExpandIter};

Expand All @@ -50,7 +50,7 @@ impl<E: Entry + 'static> FlatMapFunction<Record, Record> for EdgeExpandOperator<
// the case of expand edge, and get end vertex;
ExpandOpt::Vertex => {
if self.is_optional && iter.peek().is_none() {
input.append(Object::None, self.alias);
input.append(NullEntry, self.alias);
Ok(Box::new(vec![input].into_iter()))
} else {
let neighbors_iter = iter.map(|e| {
Expand All @@ -74,7 +74,7 @@ impl<E: Entry + 'static> FlatMapFunction<Record, Record> for EdgeExpandOperator<
// the case of expand neighbors, including edges/vertices
ExpandOpt::Edge => {
if self.is_optional && iter.peek().is_none() {
input.append(Object::None, self.alias);
input.append(NullEntry, self.alias);
Ok(Box::new(vec![input].into_iter()))
} else {
Ok(Box::new(RecordExpandIter::new(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
use std::convert::TryInto;

use dyn_type::Object;
use graph_proxy::apis::GraphElement;
use graph_proxy::apis::{get_graph, DynDetails, GraphPath, QueryParams, Vertex};
use graph_proxy::utils::expr::eval_pred::EvalPred;
Expand All @@ -26,7 +25,7 @@ use ir_common::{KeyId, LabelId};
use pegasus::api::function::{FilterMapFunction, FnResult};

use crate::error::{FnExecError, FnExecResult, FnGenError, FnGenResult};
use crate::process::entry::{DynEntry, Entry};
use crate::process::entry::{DynEntry, Entry, NullEntry};
use crate::process::operator::map::FilterMapFuncGen;
use crate::process::record::Record;

Expand Down Expand Up @@ -118,16 +117,9 @@ impl FilterMapFunction<Record, Record> for GetVertexOperator {
} else {
Err(FnExecError::unexpected_data_error("unreachable path end entry in GetV"))?
}
} else if let Some(obj) = entry.as_object() {
if Object::None.eq(obj) {
input.append(Object::None, self.alias);
Ok(Some(input))
} else {
Err(FnExecError::unexpected_data_error(&format!(
"Can only apply `GetV` on an object that is not None. The entry is {:?}",
entry
)))?
}
} else if entry.is_none() {
input.append(NullEntry, self.alias);
Ok(Some(input))
} else {
Err(FnExecError::unexpected_data_error( &format!(
"Can only apply `GetV` (`Auxilia` instead) on an edge or path entry, while the entry is {:?}", entry
Expand Down Expand Up @@ -251,27 +243,20 @@ impl FilterMapFunction<Record, Record> for AuxiliaOperator {
} else {
return Ok(None);
}
} else if let Some(obj) = entry.as_object() {
if Object::None.eq(obj) {
if let Some(predicate) = &self.query_params.filter {
let res = predicate
.eval_bool(Some(&input))
.map_err(|e| FnExecError::from(e))?;
if res {
input.append(Object::None, self.alias);
return Ok(Some(input));
} else {
return Ok(None);
}
} else {
input.append(Object::None, self.alias);
} else if entry.is_none() {
if let Some(predicate) = &self.query_params.filter {
let res = predicate
.eval_bool(Some(&input))
.map_err(|e| FnExecError::from(e))?;
if res {
input.append(NullEntry, self.alias);
return Ok(Some(input));
} else {
return Ok(None);
}
} else {
Err(FnExecError::unexpected_data_error(&format!(
"neither Vertex nor Edge entry is accessed in `Auxilia` operator, the entry is {:?}",
entry
)))?
input.append(NullEntry, self.alias);
return Ok(Some(input));
}
} else {
Err(FnExecError::unexpected_data_error(&format!(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ impl RecordSinkEncoder {
EntryType::Pair => {
unreachable!()
}
EntryType::Null => Some(result_pb::element::Inner::Object(Object::None.into())),
};
result_pb::Element { inner }
}
Expand Down

0 comments on commit 086d7b1

Please sign in to comment.