Skip to content

Commit

Permalink
correlation_utils.rs
Browse files Browse the repository at this point in the history
  • Loading branch information
asenac committed Jan 4, 2024
1 parent a118e1b commit 005c02c
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 74 deletions.
76 changes: 76 additions & 0 deletions src/query_graph/optimizer/correlation_utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
use std::collections::HashMap;

use itertools::Itertools;

use crate::{
query_graph::{
cloner::deep_clone,
properties::{subgraph_correlated_input_refs, subgraph_subqueries, subqueries},
CorrelationId, NodeId, QueryGraph,
},
scalar_expr::{rewrite::rewrite_expr_post, rewrite_utils::apply_subquery_map, ScalarExprRef},
};

/// Clones the subquery plans hanging from `node_id` that are correlated wrt
/// `correlation_id`, rewriting the expressions of every clone on the way.
///
/// Each expression visited by the post-order rewrite is first offered to
/// `update_func`. When it returns `None`, the expression is handed to
/// `apply_subquery_map` together with the clones created so far (presumably
/// so that references to already-cloned nested subqueries end up pointing at
/// their clones; confirm against `apply_subquery_map`).
///
/// The collected subqueries are processed in reverse collection order, so a
/// subquery discovered inside another one is cloned before the subquery it
/// was discovered from.
///
/// Returns a map from each original subquery root to the subquery root
/// registered for its rewritten clone.
pub(crate) fn update_correlated_references_in_subqueries<F>(
    query_graph: &mut QueryGraph,
    node_id: NodeId,
    correlation_id: CorrelationId,
    update_func: F,
) -> HashMap<NodeId, NodeId>
where
    F: Fn(&ScalarExprRef) -> Option<ScalarExprRef>,
{
    let mut cloned_roots = HashMap::new();
    let roots = node_subqueries_in_dependency_order(query_graph, node_id, correlation_id);
    for old_root_id in roots.into_iter().rev() {
        // The plan to clone is the first input of the subquery root; the
        // root node itself is not cloned.
        let old_plan = query_graph.node(old_root_id).get_input(0);
        let cloned_plan = deep_clone(query_graph, old_plan, &|_, _| false, &mut |expr| {
            rewrite_expr_post(
                &mut |candidate: &ScalarExprRef| {
                    // The caller-provided rewrite takes precedence over the
                    // subquery remapping.
                    update_func(candidate)
                        .or_else(|| apply_subquery_map(candidate, &cloned_roots))
                },
                expr,
            )
        });
        let new_root_id = query_graph.add_subquery(cloned_plan);
        cloned_roots.insert(old_root_id, new_root_id);
    }
    cloned_roots
}

/// Gathers, transitively, the subqueries reachable from `node_id` that are
/// correlated wrt `correlation_id`.
///
/// The result starts with the correlated subqueries referenced directly by
/// the node, followed by the correlated subqueries found in the subgraph of
/// every entry already collected. A subquery therefore always appears after
/// the entry it was discovered from. The same subquery may be listed more
/// than once when it is reachable through several paths.
fn node_subqueries_in_dependency_order(
    query_graph: &mut QueryGraph,
    node_id: NodeId,
    correlation_id: CorrelationId,
) -> Vec<NodeId> {
    // TODO(asenac) remove duplicates
    // Seed the worklist with the correlated subqueries of the node itself.
    let mut worklist = subqueries(query_graph, node_id)
        .iter()
        .filter(|&&candidate| {
            subgraph_correlated_input_refs(query_graph, candidate).contains_key(&correlation_id)
        })
        .copied()
        .collect_vec();

    // The worklist grows while it is being scanned: every entry contributes
    // the correlated subqueries found in its own subgraph.
    let mut cursor = 0;
    while let Some(&current) = worklist.get(cursor) {
        let nested = subgraph_subqueries(query_graph, current);
        worklist.extend(
            nested
                .iter()
                .filter(|&&candidate| {
                    subgraph_correlated_input_refs(query_graph, candidate)
                        .contains_key(&correlation_id)
                })
                .copied(),
        );
        cursor += 1;
    }
    worklist
}
1 change: 1 addition & 0 deletions src/query_graph/optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::visitor_utils::{PostOrderVisitationResult, PreOrderVisitationResult};

use super::{visitor::QueryGraphPrePostVisitorMut, NodeId, QueryGraph};

pub(crate) mod correlation_utils;
pub mod rules;
pub(crate) mod utils;

Expand Down
84 changes: 10 additions & 74 deletions src/query_graph/optimizer/rules/filter_merge.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,14 @@
use std::collections::HashMap;

use itertools::Itertools;

use crate::{
query_graph::{
cloner::deep_clone,
optimizer::{OptRuleType, SingleReplacementRule},
properties::{subgraph_correlated_input_refs, subgraph_subqueries, subqueries},
CorrelationId, NodeId, QueryGraph, QueryNode,
optimizer::{
correlation_utils::update_correlated_references_in_subqueries, OptRuleType,
SingleReplacementRule,
},
NodeId, QueryGraph, QueryNode,
},
scalar_expr::{
rewrite::rewrite_expr_post,
rewrite_utils::{apply_subquery_map, update_correlation_id},
ScalarExprRef,
},
};

Expand Down Expand Up @@ -50,11 +46,13 @@ impl SingleReplacementRule for FilterMergeRule {
// the outer filter node are rewritten to make them refer to the correlation
// ID of the inner filter node.
if correlation_id.is_some() && child_correlation_id.is_some() {
let subquery_map = update_correlation_id_in_subqueries(
let correlation_id = correlation_id.unwrap();
let new_correlation_id = new_correlation_id.unwrap();
let subquery_map = update_correlated_references_in_subqueries(
query_graph,
node_id,
correlation_id.unwrap(),
new_correlation_id.unwrap(),
correlation_id,
|e| update_correlation_id(e, correlation_id, new_correlation_id),
);
conditions
.iter_mut()
Expand All @@ -74,68 +72,6 @@ impl SingleReplacementRule for FilterMergeRule {
}
}

/// Clones the subquery plans hanging from `node_id` that are correlated wrt
/// `old_correlation_id`, rewriting their correlated references so that they
/// point to `new_correlation_id` instead.
///
/// The collected subqueries are processed in reverse collection order, so a
/// subquery discovered inside another one is cloned before the subquery it
/// was discovered from. Expressions left untouched by `update_correlation_id`
/// are handed to `apply_subquery_map` with the clones created so far.
///
/// Returns a map from each original subquery root to the subquery root
/// registered for its rewritten clone.
fn update_correlation_id_in_subqueries(
    query_graph: &mut QueryGraph,
    node_id: NodeId,
    old_correlation_id: CorrelationId,
    new_correlation_id: CorrelationId,
) -> HashMap<NodeId, NodeId> {
    let mut cloned_roots = HashMap::new();
    let roots = subqueries_in_dependency_order(query_graph, node_id, old_correlation_id);
    for old_root_id in roots.into_iter().rev() {
        // The plan to clone is the first input of the subquery root; the
        // root node itself is not cloned.
        let old_plan = query_graph.node(old_root_id).get_input(0);
        let cloned_plan = deep_clone(query_graph, old_plan, &|_, _| false, &mut |expr| {
            rewrite_expr_post(
                &mut |candidate: &ScalarExprRef| {
                    // Retargeting the correlation ID takes precedence over
                    // the subquery remapping.
                    update_correlation_id(candidate, old_correlation_id, new_correlation_id)
                        .or_else(|| apply_subquery_map(candidate, &cloned_roots))
                },
                expr,
            )
        });
        let new_root_id = query_graph.add_subquery(cloned_plan);
        cloned_roots.insert(old_root_id, new_root_id);
    }
    cloned_roots
}

/// Gathers, transitively, the subqueries reachable from `node_id` that are
/// correlated wrt `correlation_id`.
///
/// The result starts with the correlated subqueries referenced directly by
/// the node, followed by the correlated subqueries found in the subgraph of
/// every entry already collected. A subquery therefore always appears after
/// the entry it was discovered from. The same subquery may be listed more
/// than once when it is reachable through several paths.
fn subqueries_in_dependency_order(
    query_graph: &mut QueryGraph,
    node_id: NodeId,
    correlation_id: CorrelationId,
) -> Vec<NodeId> {
    // TODO(asenac) remove duplicates
    // Start from the correlated subqueries of the node itself.
    let mut pending = subqueries(query_graph, node_id)
        .iter()
        .filter(|&&candidate| {
            subgraph_correlated_input_refs(query_graph, candidate).contains_key(&correlation_id)
        })
        .copied()
        .collect_vec();

    // `pending` keeps growing during the scan, hence the explicit index.
    let mut next = 0;
    while next < pending.len() {
        let current = pending[next];
        next += 1;
        let nested = subgraph_subqueries(query_graph, current);
        pending.extend(
            nested
                .iter()
                .filter(|&&candidate| {
                    subgraph_correlated_input_refs(query_graph, candidate)
                        .contains_key(&correlation_id)
                })
                .copied(),
        );
    }
    pending
}

#[cfg(test)]
mod tests {
use crate::{
Expand Down

0 comments on commit 005c02c

Please sign in to comment.