Skip to content

Commit

Permalink
Correlated references as a property
Browse files Browse the repository at this point in the history
Cache correlated input refs.
  • Loading branch information
asenac committed Jan 3, 2024
1 parent 75f1e34 commit 83bac46
Show file tree
Hide file tree
Showing 5 changed files with 320 additions and 3 deletions.
201 changes: 201 additions & 0 deletions src/query_graph/properties/correlated_input_refs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
use std::{
any::TypeId,
collections::{BTreeSet, HashMap},
rc::Rc,
};

use itertools::Itertools;

use crate::{
query_graph::{visitor::QueryGraphPrePostVisitor, CorrelationId, NodeId, QueryGraph},
scalar_expr::{visitor::visit_expr_pre, ScalarExpr},
visitor_utils::PreOrderVisitationResult,
};

use super::subqueries;

/// Marker type that is never instantiated in this file: its `TypeId` is the
/// key under which `node_correlated_input_refs` stores its result in the
/// per-node property cache.
struct CorrelatedInputRefsTag;

/// Returns the correlated input refs the given node contains, if any, as a
/// map from correlation ID to the set of referenced indices.
///
/// The result covers the refs found in the node's own scalar expressions plus
/// the ones escaping from the subqueries the node may contain. It is memoized
/// in the property cache of the graph, so later calls for the same node return
/// a clone of the shared `Rc`.
///
/// # Panics
///
/// Panics if the entry cached for this property is not of the expected type.
pub fn node_correlated_input_refs(
    query_graph: &QueryGraph,
    node_id: NodeId,
) -> Rc<HashMap<CorrelationId, BTreeSet<usize>>> {
    let cache_key = TypeId::of::<CorrelatedInputRefsTag>();

    // Fast path: the property was already computed for this node. The cache
    // borrow ends with this statement, before anything else touches the cache.
    let cached = query_graph
        .property_cache
        .borrow_mut()
        .single_node_properties(node_id)
        .get(&cache_key)
        .map(|entry| {
            entry
                .downcast_ref::<Rc<HashMap<CorrelationId, BTreeSet<usize>>>>()
                .unwrap()
                .clone()
        });
    if let Some(hit) = cached {
        return hit;
    }

    // Collect the correlated input refs in the scalar expressions of the node.
    let mut refs_by_correlation: HashMap<CorrelationId, BTreeSet<usize>> = HashMap::new();
    let query_node = query_graph.node(node_id);
    query_node.visit_scalar_expr(&mut |expr| {
        visit_expr_pre(expr, &mut |current| {
            if let ScalarExpr::CorrelatedInputRef {
                correlation_id,
                index,
                ..
            } = current.as_ref()
            {
                refs_by_correlation
                    .entry(*correlation_id)
                    .or_default()
                    .insert(*index);
            }
            PreOrderVisitationResult::VisitInputs
        });
    });

    // Add the correlated input refs escaping from the subqueries of the node.
    let subquery_roots = subqueries(query_graph, node_id);
    for subquery_root in subquery_roots.iter() {
        let escaping = subgraph_correlated_input_refs(query_graph, *subquery_root);
        merge_correlated_maps(&escaping, &mut refs_by_correlation);
    }

    // Memoize the result before handing it out.
    let shared = Rc::new(refs_by_correlation);
    query_graph
        .property_cache
        .borrow_mut()
        .single_node_properties(node_id)
        .insert(cache_key, Box::new(Rc::clone(&shared)));
    shared
}

/// Returns the correlated input refs in the given subplan that escape the
/// context of the subplan, as a map from correlation ID to the set of
/// referenced indices.
///
/// Thin public entry point that delegates to
/// `SubgraphCorrelatedInputRefs::subgraph_correlated_input_refs`.
pub fn subgraph_correlated_input_refs(
query_graph: &QueryGraph,
node_id: NodeId,
) -> Rc<HashMap<CorrelationId, BTreeSet<usize>>> {
SubgraphCorrelatedInputRefs::subgraph_correlated_input_refs(query_graph, node_id)
}

/// Annotator that renders the correlated input refs escaping the subgraph
/// rooted at the given node.
///
/// Entries are emitted in sorted order as `cor_<correlation id>.ref_<index>`,
/// separated by commas and prefixed with `Correlated References: `. Returns
/// `None` when there is nothing to render.
pub fn subgraph_correlated_input_refs_annotator(
    query_graph: &QueryGraph,
    node_id: NodeId,
) -> Option<String> {
    let escaping = subgraph_correlated_input_refs(query_graph, node_id);
    let rendered = escaping
        .iter()
        // The underlying map is a `HashMap`: sort to get a stable output.
        .sorted()
        .flat_map(|(correlation_id, columns)| {
            columns
                .iter()
                .map(move |column| format!("cor_{}.ref_{}", correlation_id.0, column))
        })
        .join(", ");
    (!rendered.is_empty()).then(|| format!("Correlated References: {}", rendered))
}

/// Stateless visitor that computes, for every node of a subgraph, the
/// correlated input refs that escape the subgraph rooted at that node, and
/// stores them in the bottom-up property cache of the graph.
struct SubgraphCorrelatedInputRefs {}

impl SubgraphCorrelatedInputRefs {
    /// Makes sure the property is computed for the subgraph rooted at
    /// `node_id`, and returns the value cached for `node_id`.
    fn subgraph_correlated_input_refs(
        query_graph: &QueryGraph,
        node_id: NodeId,
    ) -> Rc<HashMap<CorrelationId, BTreeSet<usize>>> {
        let mut visitor = Self {};
        query_graph.visit_subgraph(&mut visitor, node_id);
        visitor.subgraph_correlated_input_refs_unchecked(query_graph, node_id)
    }

    /// Reads the property for `node_id` straight from the cache.
    ///
    /// Panics if the property has not been stored for the node yet, or if the
    /// cached entry is not of the expected type.
    fn subgraph_correlated_input_refs_unchecked(
        &self,
        query_graph: &QueryGraph,
        node_id: NodeId,
    ) -> Rc<HashMap<CorrelationId, BTreeSet<usize>>> {
        Rc::clone(
            query_graph
                .property_cache
                .borrow_mut()
                .node_bottom_up_properties(node_id)
                .get(&Self::metadata_type_id())
                .unwrap()
                .downcast_ref::<Rc<HashMap<CorrelationId, BTreeSet<usize>>>>()
                .unwrap(),
        )
    }

    /// Key under which this property is stored in the bottom-up property cache.
    fn metadata_type_id() -> TypeId {
        TypeId::of::<Self>()
    }

    /// Computes the property for `node_id`, reading the values of its inputs
    /// from the cache (they must have been stored there already).
    fn compute_property_for_node(
        &self,
        query_graph: &QueryGraph,
        node_id: NodeId,
    ) -> Rc<HashMap<CorrelationId, BTreeSet<usize>>> {
        // Start from a private copy of the refs in the node itself...
        let own_refs = node_correlated_input_refs(query_graph, node_id);
        let mut escaping: HashMap<CorrelationId, BTreeSet<usize>> = (*own_refs).clone();

        // ... add the ones escaping from each of its input subgraphs, ...
        let query_node = query_graph.node(node_id);
        for input_idx in 0..query_node.num_inputs() {
            let input_id = query_node.get_input(input_idx);
            let from_input = self.subgraph_correlated_input_refs_unchecked(query_graph, input_id);
            merge_correlated_maps(&from_input, &mut escaping);
        }

        // ... and drop the ones in the correlation scope the node defines,
        // since those do not escape this subgraph.
        if let Some(own_scope) = query_node.correlation_id() {
            escaping.remove(&own_scope);
        }

        Rc::new(escaping)
    }
}

impl QueryGraphPrePostVisitor for SubgraphCorrelatedInputRefs {
    /// Skips the inputs of nodes whose property is already in the cache.
    fn visit_pre(&mut self, query_graph: &QueryGraph, node_id: NodeId) -> PreOrderVisitationResult {
        let already_cached = query_graph
            .property_cache
            .borrow_mut()
            .node_bottom_up_properties(node_id)
            .contains_key(&Self::metadata_type_id());

        if already_cached {
            PreOrderVisitationResult::DoNotVisitInputs
        } else {
            PreOrderVisitationResult::VisitInputs
        }
    }

    /// Computes and caches the property for the node, unless it is already
    /// in the cache.
    fn visit_post(&mut self, query_graph: &QueryGraph, node_id: NodeId) {
        // The cache borrow must end here: computing the property below
        // accesses the cache again.
        let already_cached = query_graph
            .property_cache
            .borrow_mut()
            .node_bottom_up_properties(node_id)
            .contains_key(&Self::metadata_type_id());
        if already_cached {
            return;
        }

        let computed = self.compute_property_for_node(query_graph, node_id);
        query_graph
            .property_cache
            .borrow_mut()
            .node_bottom_up_properties(node_id)
            .insert(Self::metadata_type_id(), Box::new(computed));
    }
}

/// Adds every correlated input ref in `src` to `dst`.
///
/// Correlation IDs missing from `dst` are inserted; for the ones already
/// present, the sets of indices are united. `src` is left untouched.
fn merge_correlated_maps(
    src: &HashMap<CorrelationId, BTreeSet<usize>>,
    dst: &mut HashMap<CorrelationId, BTreeSet<usize>>,
) {
    for (&correlation_id, columns) in src {
        dst.entry(correlation_id).or_default().extend(columns);
    }
}
5 changes: 5 additions & 0 deletions src/query_graph/properties/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use std::{
use crate::query_graph::NodeId;

mod column_provenance;
mod correlated_input_refs;
mod equivalence_classes;
mod input_dependencies;
mod keys;
Expand All @@ -21,6 +22,9 @@ mod subqueries;

pub use column_provenance::column_provenance;
pub use column_provenance::ColumnProvenanceInfo;
pub use correlated_input_refs::node_correlated_input_refs;
pub use correlated_input_refs::subgraph_correlated_input_refs;
pub use correlated_input_refs::subgraph_correlated_input_refs_annotator;
pub use equivalence_classes::equivalence_classes;
pub use input_dependencies::input_dependencies;
pub use keys::empty_key;
Expand All @@ -46,6 +50,7 @@ pub fn default_annotators() -> Vec<&'static dyn Fn(&QueryGraph, NodeId) -> Optio
&row_type_annotator,
&pulled_up_predicates_annotator,
&keys_annotator,
&subgraph_correlated_input_refs_annotator,
]
}

Expand Down
1 change: 1 addition & 0 deletions src/query_graph/properties/num_columns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ impl QueryGraphPrePostVisitor for NumColumns {
}
}
}

#[cfg(test)]
mod tests {
use crate::{
Expand Down
49 changes: 49 additions & 0 deletions tests/explain_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1687,6 +1687,55 @@ mod test_queries {
query_graph.set_entry_node(apply_1);
query_graph
});
queries.insert("nested_apply_1".to_string(), {
let mut query_graph = QueryGraph::new();
let table_scan_1 = query_graph.table_scan(1, 5);
let correlation_id_1 = query_graph.new_correlation_id();
let correlation_id_2 = query_graph.new_correlation_id();
let filter_1 = query_graph.filter(
table_scan_1,
vec![
ScalarExpr::input_ref(0)
.binary(
BinaryOp::Eq,
ScalarExpr::CorrelatedInputRef {
correlation_id: correlation_id_1,
index: 1,
data_type: DataType::String,
}
.into(),
)
.into(),
ScalarExpr::input_ref(1)
.binary(
BinaryOp::Eq,
ScalarExpr::CorrelatedInputRef {
correlation_id: correlation_id_2,
index: 3,
data_type: DataType::String,
}
.into(),
)
.into(),
],
);
let table_scan_2 = query_graph.table_scan(2, 5);
let apply_1 = query_graph.add_node(QueryNode::Apply {
correlation_id: correlation_id_1,
left: table_scan_2,
right: filter_1,
apply_type: ApplyType::LeftOuter,
});
let table_scan_3 = query_graph.table_scan(3, 5);
let apply_2 = query_graph.add_node(QueryNode::Apply {
correlation_id: correlation_id_2,
left: table_scan_3,
right: apply_1,
apply_type: ApplyType::Inner,
});
query_graph.set_entry_node(apply_2);
query_graph
});
}
}

Expand Down
Loading

0 comments on commit 83bac46

Please sign in to comment.