Skip to content

Commit

Permalink
Get rid of CorrelationId, use context offset instead
Browse files Browse the repository at this point in the history
  • Loading branch information
asenac committed Jan 5, 2024
1 parent 6388902 commit e7a1312
Show file tree
Hide file tree
Showing 10 changed files with 262 additions and 342 deletions.
3 changes: 1 addition & 2 deletions src/query_graph/explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,10 +170,9 @@ impl<'a> QueryGraphPrePostVisitor for ExplainVisitor<'a> {
..
} => {
format!(
"{}{} Apply correlation_id: {}, parameters: [{}]\n",
"{}{} Apply parameters: [{}]\n",
prefix,
apply_type,
correlation.correlation_id.0,
explain_scalar_expr_vec(&correlation.parameters),
)
}
Expand Down
3 changes: 1 addition & 2 deletions src/query_graph/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,9 @@ impl<'a> QueryGraphPrePostVisitor for JsonSerializer<'a> {
..
} => {
format!(
"{}{} Apply correlation_id: {}, parameters: [{}]",
"{}{} Apply parameters: [{}]",
prefix,
apply_type,
correlation.correlation_id.0,
explain_scalar_expr_vec(&correlation.parameters),
)
}
Expand Down
13 changes: 0 additions & 13 deletions src/query_graph/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,6 @@ pub mod visitor;

pub type NodeId = usize;

#[derive(Clone, PartialEq, Eq, Copy, Hash, PartialOrd, Ord, Debug)]
pub struct CorrelationId(pub usize);

#[derive(Clone, PartialEq, Eq, Copy)]
pub enum JoinType {
Inner,
Expand All @@ -49,7 +46,6 @@ pub enum ApplyType {

#[derive(Clone, Hash, PartialEq, Eq, PartialOrd, Ord, Debug)]
pub struct CorrelationContext<E: VisitableExpr + RewritableExpr> {
pub correlation_id: CorrelationId,
pub parameters: Vec<Rc<E>>,
}

Expand Down Expand Up @@ -106,7 +102,6 @@ pub struct QueryGraph {
parents: HashMap<NodeId, BTreeSet<NodeId>>,
/// Subqueries
subqueries: Vec<NodeId>,
next_correlation_id: CorrelationId,
/// Keeps track of the number of node replacements the query graph has gone through.
pub gen_number: usize,
pub property_cache: RefCell<PropertyCache>,
Expand Down Expand Up @@ -233,7 +228,6 @@ impl QueryGraph {
gen_number: 0,
parents: HashMap::new(),
subqueries: Vec::new(),
next_correlation_id: CorrelationId(0),
property_cache: RefCell::new(PropertyCache::new()),
}
}
Expand Down Expand Up @@ -281,12 +275,6 @@ impl QueryGraph {
self.subqueries.iter().map(|root_id| *root_id).collect_vec()
}

pub fn new_correlation_id(&mut self) -> CorrelationId {
let result = self.next_correlation_id;
self.next_correlation_id = CorrelationId(result.0 + 1);
result
}

/// Finds whether there is an existing node exactly like the given one.
fn find_node(&self, node: &QueryNode) -> Option<NodeId> {
self.nodes.iter().find_map(|(node_id, existing_node)| {
Expand Down Expand Up @@ -464,7 +452,6 @@ impl Clone for QueryGraph {
gen_number: self.gen_number,
parents: self.parents.clone(),
subqueries: self.subqueries.clone(),
next_correlation_id: self.next_correlation_id,
// Cached metadata is not cloned
property_cache: RefCell::new(PropertyCache::new()),
}
Expand Down
76 changes: 40 additions & 36 deletions src/query_graph/properties/correlated_input_refs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@ use std::{
use itertools::Itertools;

use crate::{
query_graph::{
visitor::QueryGraphPrePostVisitor, CorrelationId, NodeId, QueryGraph, QueryNode,
},
query_graph::{visitor::QueryGraphPrePostVisitor, NodeId, QueryGraph, QueryNode},
scalar_expr::{visitor::visit_expr_pre, ScalarExpr},
visitor_utils::PreOrderVisitationResult,
};
Expand All @@ -20,7 +18,7 @@ struct CorrelatedInputRefsTag;
pub fn node_correlated_input_refs(
query_graph: &QueryGraph,
node_id: NodeId,
) -> Rc<HashMap<CorrelationId, BTreeSet<usize>>> {
) -> Rc<HashMap<usize, BTreeSet<usize>>> {
let type_id = TypeId::of::<CorrelatedInputRefsTag>();
if let Some(cached) = query_graph
.property_cache
Expand All @@ -29,7 +27,7 @@ pub fn node_correlated_input_refs(
.get(&type_id)
{
return cached
.downcast_ref::<Rc<HashMap<CorrelationId, BTreeSet<usize>>>>()
.downcast_ref::<Rc<HashMap<usize, BTreeSet<usize>>>>()
.unwrap()
.clone();
}
Expand All @@ -39,12 +37,12 @@ pub fn node_correlated_input_refs(
visit_expr_pre(expr, &mut |curr_expr| {
match curr_expr.as_ref() {
ScalarExpr::CorrelatedInputRef {
correlation_id,
context_offset,
index,
..
} => {
correlated_cols
.entry(*correlation_id)
.entry(*context_offset)
.or_insert_with(|| BTreeSet::new())
.insert(*index);
}
Expand All @@ -53,21 +51,23 @@ pub fn node_correlated_input_refs(
| ScalarExpr::ScalarSubqueryCmp { subquery, .. } => {
let subquery_correlated_input_refs =
subgraph_correlated_input_refs(query_graph, subquery.root);
merge_correlated_maps(
subquery_correlated_input_refs
.iter()
// Remove the references that correspond to parameters of the subquery
.filter(|(correlation_id, _)| {
subquery
.correlation
.as_ref()
.map(|correlation| {
correlation.correlation_id != **correlation_id
})
.unwrap_or(false)
}),
&mut correlated_cols,
);
if subquery.correlation.is_some() {
let subquery_external_correlated_input_refs =
subquery_correlated_input_refs
.iter()
.filter(|(offset, _)| **offset > 0)
.map(|(offset, columns)| (offset - 1, columns.clone()))
.collect::<HashMap<usize, BTreeSet<usize>>>();
merge_correlated_maps(
subquery_external_correlated_input_refs.iter(),
&mut correlated_cols,
);
} else {
merge_correlated_maps(
subquery_correlated_input_refs.iter(),
&mut correlated_cols,
);
}
}
_ => (),
}
Expand All @@ -90,7 +90,7 @@ pub fn node_correlated_input_refs(
pub fn subgraph_correlated_input_refs(
query_graph: &QueryGraph,
node_id: NodeId,
) -> Rc<HashMap<CorrelationId, BTreeSet<usize>>> {
) -> Rc<HashMap<usize, BTreeSet<usize>>> {
SubgraphCorrelatedInputRefs::subgraph_correlated_input_refs(query_graph, node_id)
}

Expand All @@ -102,10 +102,10 @@ pub fn subgraph_correlated_input_refs_annotator(
let correlated_cols = correlated_cols
.iter()
.sorted()
.map(|(correlation_id, columns)| {
.map(|(offset, columns)| {
columns
.iter()
.map(|column| format!("cor_{}.ref_{}", correlation_id.0, column))
.map(|column| format!("ctx_{}.ref_{}", *offset, column))
})
.flatten()
.join(", ");
Expand All @@ -122,7 +122,7 @@ impl SubgraphCorrelatedInputRefs {
fn subgraph_correlated_input_refs(
query_graph: &QueryGraph,
node_id: NodeId,
) -> Rc<HashMap<CorrelationId, BTreeSet<usize>>> {
) -> Rc<HashMap<usize, BTreeSet<usize>>> {
let mut visitor = SubgraphCorrelatedInputRefs {};
query_graph.visit_subgraph(&mut visitor, node_id);
visitor.subgraph_correlated_input_refs_unchecked(query_graph, node_id)
Expand All @@ -132,14 +132,14 @@ impl SubgraphCorrelatedInputRefs {
&self,
query_graph: &QueryGraph,
node_id: NodeId,
) -> Rc<HashMap<CorrelationId, BTreeSet<usize>>> {
) -> Rc<HashMap<usize, BTreeSet<usize>>> {
query_graph
.property_cache
.borrow_mut()
.node_bottom_up_properties(node_id)
.get(&Self::metadata_type_id())
.unwrap()
.downcast_ref::<Rc<HashMap<CorrelationId, BTreeSet<usize>>>>()
.downcast_ref::<Rc<HashMap<usize, BTreeSet<usize>>>>()
.unwrap()
.clone()
}
Expand All @@ -152,9 +152,9 @@ impl SubgraphCorrelatedInputRefs {
&self,
query_graph: &QueryGraph,
node_id: NodeId,
) -> Rc<HashMap<CorrelationId, BTreeSet<usize>>> {
) -> Rc<HashMap<usize, BTreeSet<usize>>> {
// The correlated input refs in the node itself...
let mut correlated_cols: HashMap<CorrelationId, BTreeSet<usize>> =
let mut correlated_cols: HashMap<usize, BTreeSet<usize>> =
node_correlated_input_refs(query_graph, node_id)
.as_ref()
.clone();
Expand All @@ -166,8 +166,12 @@ impl SubgraphCorrelatedInputRefs {
merge_correlated_maps(input_correlated_cols.iter(), &mut correlated_cols);
}
//... but remove ones in the correlation scope the node defines.
if let QueryNode::Apply { correlation, .. } = &query_node {
correlated_cols.remove(&correlation.correlation_id);
if let QueryNode::Apply { .. } = &query_node {
correlated_cols = correlated_cols
.into_iter()
.filter(|(offset, _)| *offset > 0)
.map(|(offset, columns)| (offset - 1, columns))
.collect();
}
Rc::new(correlated_cols)
}
Expand Down Expand Up @@ -204,12 +208,12 @@ impl QueryGraphPrePostVisitor for SubgraphCorrelatedInputRefs {
}
}

fn merge_correlated_maps<'a, I>(src: I, dst: &mut HashMap<CorrelationId, BTreeSet<usize>>)
fn merge_correlated_maps<'a, I>(src: I, dst: &mut HashMap<usize, BTreeSet<usize>>)
where
I: Iterator<Item = (&'a CorrelationId, &'a BTreeSet<usize>)>,
I: Iterator<Item = (&'a usize, &'a BTreeSet<usize>)>,
{
for (correlation_id, columns) in src {
dst.entry(*correlation_id)
for (context_offset, columns) in src {
dst.entry(*context_offset)
.or_insert_with(|| BTreeSet::new())
.extend(columns.iter());
}
Expand Down
Loading

0 comments on commit e7a1312

Please sign in to comment.