Skip to content

Commit

Permalink
Support for correlated filters
Browse files Browse the repository at this point in the history
input_dependencies is now a cached property

A Filter node with a correlation_id may contain conditions that are
correlated wrt the input of the filter.
  • Loading branch information
asenac committed Jan 3, 2024
1 parent 83bac46 commit efae0eb
Show file tree
Hide file tree
Showing 15 changed files with 275 additions and 52 deletions.
22 changes: 17 additions & 5 deletions src/query_graph/explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,23 @@ impl<'a> QueryGraphPrePostVisitor for ExplainVisitor<'a> {
QueryNode::Project { outputs, .. } => {
format!("{}Project [{}]\n", prefix, explain_scalar_expr_vec(outputs),)
}
QueryNode::Filter { conditions, .. } => format!(
"{}Filter [{}]\n",
prefix,
explain_scalar_expr_vec(conditions),
),
QueryNode::Filter {
conditions,
correlation_id,
..
} => {
let correlation = if let Some(correlation_id) = correlation_id {
format!(" [CorrelationId: {}]", correlation_id.0,)
} else {
String::new()
};
format!(
"{}Filter{} [{}]\n",
prefix,
correlation,
explain_scalar_expr_vec(conditions),
)
}
QueryNode::TableScan { table_id, .. } => {
format!("{}TableScan id: {}\n", prefix, table_id)
}
Expand Down
18 changes: 16 additions & 2 deletions src/query_graph/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,22 @@ impl<'a> QueryGraphPrePostVisitor for JsonSerializer<'a> {
QueryNode::Project { outputs, .. } => {
format!("{}Project [{}]", prefix, explain_scalar_expr_vec(outputs),)
}
QueryNode::Filter { conditions, .. } => {
format!("{}Filter [{}]", prefix, explain_scalar_expr_vec(conditions),)
QueryNode::Filter {
conditions,
correlation_id,
..
} => {
let correlation = if let Some(correlation_id) = correlation_id {
format!(" [CorrelationId: {}]", correlation_id.0,)
} else {
String::new()
};
format!(
"{}Filter{} [{}]",
prefix,
correlation,
explain_scalar_expr_vec(conditions),
)
}
QueryNode::TableScan { table_id, .. } => {
format!("{}TableScan id: {}", prefix, table_id)
Expand Down
18 changes: 16 additions & 2 deletions src/query_graph/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ pub enum QueryNode {
Filter {
conditions: Vec<ScalarExprRef>,
input: NodeId,
correlation_id: Option<CorrelationId>,
},
TableScan {
table_id: usize,
Expand Down Expand Up @@ -161,12 +162,12 @@ impl QueryNode {
pub fn correlation_id(&self) -> Option<CorrelationId> {
match self {
QueryNode::Project { .. }
| QueryNode::Filter { .. }
| QueryNode::TableScan { .. }
| QueryNode::Join { .. }
| QueryNode::Aggregate { .. }
| QueryNode::Union { .. }
| QueryNode::SubqueryRoot { .. } => None,
QueryNode::Filter { correlation_id, .. } => *correlation_id,
QueryNode::Apply { correlation_id, .. } => Some(*correlation_id),
}
}
Expand Down Expand Up @@ -398,10 +399,23 @@ impl QueryGraph {
}

pub fn filter(&mut self, input: NodeId, conditions: Vec<ScalarExprRef>) -> NodeId {
self.possibly_correlated_filter(input, conditions, None)
}

pub fn possibly_correlated_filter(
&mut self,
input: NodeId,
conditions: Vec<ScalarExprRef>,
correlation_id: Option<CorrelationId>,
) -> NodeId {
if conditions.is_empty() {
input
} else {
self.add_node(QueryNode::Filter { conditions, input })
self.add_node(QueryNode::Filter {
conditions,
input,
correlation_id,
})
}
}

Expand Down
9 changes: 7 additions & 2 deletions src/query_graph/optimizer/rules/expression_reduction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,19 @@ impl SingleReplacementRule for ExpressionReductionRule {
.collect_vec(),
)
}
QueryNode::Filter { conditions, input } => {
QueryNode::Filter {
conditions,
input,
correlation_id,
} => {
let row_type = row_type(query_graph, *input);
query_graph.filter(
query_graph.possibly_correlated_filter(
*input,
conditions
.iter()
.map(|e| reduce_expr_recursively(e, &query_graph, &row_type))
.collect_vec(),
*correlation_id,
)
}
QueryNode::Join {
Expand Down
40 changes: 28 additions & 12 deletions src/query_graph/optimizer/rules/filter_merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,35 @@ impl SingleReplacementRule for FilterMergeRule {
}

fn apply(&self, query_graph: &mut QueryGraph, node_id: NodeId) -> Option<NodeId> {
if let QueryNode::Filter { conditions, input } = query_graph.node(node_id) {
if let QueryNode::Filter {
conditions,
input,
correlation_id,
} = query_graph.node(node_id)
{
if let QueryNode::Filter {
conditions: child_conditions,
input: child_input,
correlation_id: child_correlation_id,
} = query_graph.node(*input)
{
return Some(
query_graph.filter(
*child_input,
conditions
.clone()
.into_iter()
.chain(child_conditions.clone().into_iter())
.collect(),
),
);
// TODO(asenac) for merging two filters with correlated subqueries
// we need to rewrite the expressions for remapping the correlated
// references recusively.
if correlation_id.is_none() || child_correlation_id.is_none() {
let correlation_id = correlation_id.or(*child_correlation_id);
return Some(
query_graph.possibly_correlated_filter(
*child_input,
conditions
.clone()
.into_iter()
.chain(child_conditions.clone().into_iter())
.collect(),
correlation_id,
),
);
}
}
}
None
Expand Down Expand Up @@ -76,7 +89,10 @@ mod tests {
let merged_filter_id = filter_merge_rule
.apply(&mut query_graph, filter_id_2)
.unwrap();
if let QueryNode::Filter { input, conditions } = query_graph.node(merged_filter_id) {
if let QueryNode::Filter {
input, conditions, ..
} = query_graph.node(merged_filter_id)
{
assert_eq!(*input, project_id);
assert_eq!(*conditions, vec![filter_2, filter_1]);
} else {
Expand Down
13 changes: 11 additions & 2 deletions src/query_graph/optimizer/rules/filter_normalization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,12 @@ impl SingleReplacementRule for FilterNormalizationRule {
}

fn apply(&self, query_graph: &mut QueryGraph, node_id: NodeId) -> Option<NodeId> {
if let QueryNode::Filter { conditions, input } = query_graph.node(node_id) {
if let QueryNode::Filter {
conditions,
input,
correlation_id,
} = query_graph.node(node_id)
{
let classes = equivalence_classes(query_graph, *input);
let predicates = pulled_up_predicates(query_graph, *input);
let mut replacement_map = to_replacement_map(&classes);
Expand Down Expand Up @@ -66,7 +71,11 @@ impl SingleReplacementRule for FilterNormalizationRule {
.collect_vec();

if new_conditions != *conditions {
return Some(query_graph.filter(*input, new_conditions));
return Some(query_graph.possibly_correlated_filter(
*input,
new_conditions,
*correlation_id,
));
}
}
None
Expand Down
35 changes: 22 additions & 13 deletions src/query_graph/optimizer/rules/filter_project_transpose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,28 @@ impl SingleReplacementRule for FilterProjectTransposeRule {
}

fn apply(&self, query_graph: &mut QueryGraph, node_id: NodeId) -> Option<NodeId> {
if let QueryNode::Filter { conditions, input } = query_graph.node(node_id) {
if let QueryNode::Project {
outputs,
input: proj_input,
} = query_graph.node(*input)
{
let new_conditions = conditions
.iter()
.map(|c| dereference_scalar_expr(c, outputs))
.collect::<Vec<_>>();
let outputs = outputs.clone();
let new_filter = query_graph.filter(*proj_input, new_conditions);
return Some(query_graph.project(new_filter, outputs));
if let QueryNode::Filter {
conditions,
input,
correlation_id,
} = query_graph.node(node_id)
{
// TODO(asenac) We would need to dereference the correlated references in
// the subqueries in the conditions, recursively.
if correlation_id.is_none() {
if let QueryNode::Project {
outputs,
input: proj_input,
} = query_graph.node(*input)
{
let new_conditions = conditions
.iter()
.map(|c| dereference_scalar_expr(c, outputs))
.collect::<Vec<_>>();
let outputs = outputs.clone();
let new_filter = query_graph.filter(*proj_input, new_conditions);
return Some(query_graph.project(new_filter, outputs));
}
}
}
None
Expand Down
6 changes: 5 additions & 1 deletion src/query_graph/optimizer/rules/outer_to_inner_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,11 @@ fn do_all_parents_reject_null_from_non_preserving(
query_graph.visit_subgraph_upwards_pre(
&mut |query_graph, node_id| match query_graph.node(node_id) {
QueryNode::Project { .. } => PreOrderVisitationResult::VisitInputs,
QueryNode::Filter { conditions, input } => {
QueryNode::Filter {
conditions,
input,
correlation_id: _,
} => {
// For filters:
// 1. find the provenance information of input from the non_preserving_node_id
// 2. rewrite the column expressions replacing all input refs with nulls
Expand Down
24 changes: 18 additions & 6 deletions src/query_graph/optimizer/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,18 @@ pub(crate) fn common_parent_filters(
parents
.iter()
.map(|parent| {
if let QueryNode::Filter { conditions, .. } = query_graph.node(*parent) {
conditions.clone()
} else {
Vec::new()
if let QueryNode::Filter {
conditions,
correlation_id,
..
} = query_graph.node(*parent)
{
// TODO(asenac) extract the predicates that are not correlated
if correlation_id.is_none() {
return conditions.clone();
}
}
Vec::new()
})
.fold(None, |acc: Option<HashSet<ScalarExprRef>>, predicates| {
let set: HashSet<ScalarExprRef> = predicates.iter().cloned().collect();
Expand Down Expand Up @@ -235,13 +242,18 @@ pub(crate) fn apply_map_to_parents_and_replace_input(
);
new_proj
}
QueryNode::Filter { conditions, input } => {
QueryNode::Filter {
conditions,
input,
correlation_id,
} => {
let new_input = *replacements.get(input).unwrap();
let new_filter = query_graph.filter(
let new_filter = query_graph.possibly_correlated_filter(
new_input,
rewrite_expr_vec(conditions, &mut |e| {
apply_column_map(e, column_map).unwrap()
}),
*correlation_id,
);
new_filter
}
Expand Down
1 change: 1 addition & 0 deletions src/query_graph/properties/column_provenance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ impl ColumnProvenance {
QueryNode::Filter {
input,
conditions: _,
correlation_id: _,
} => {
let input_prov = self.column_provenance_unchecked(query_graph, *input);
prov.extend(input_prov.iter().map(|prov_info| {
Expand Down
38 changes: 33 additions & 5 deletions src/query_graph/properties/input_dependencies.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::HashSet;
use std::{any::TypeId, collections::HashSet, rc::Rc};

use crate::{
query_graph::{NodeId, QueryGraph, QueryNode},
Expand All @@ -7,13 +7,21 @@ use crate::{

use super::num_columns;

pub fn input_dependencies(query_graph: &QueryGraph, node_id: NodeId) -> HashSet<usize> {
struct InputDependenciesTag;

pub fn input_dependencies(query_graph: &QueryGraph, node_id: NodeId) -> Rc<HashSet<usize>> {
let type_id = TypeId::of::<InputDependenciesTag>();
if let Some(cached) = query_graph
.property_cache
.borrow_mut()
.single_node_properties(node_id)
.get(&type_id)
{
return cached.downcast_ref::<Rc<HashSet<usize>>>().unwrap().clone();
}
let mut dependencies = HashSet::new();
match query_graph.node(node_id) {
QueryNode::Project { outputs: exprs, .. }
| QueryNode::Filter {
conditions: exprs, ..
}
| QueryNode::Join {
conditions: exprs, ..
} => exprs
Expand All @@ -30,9 +38,29 @@ pub fn input_dependencies(query_graph: &QueryGraph, node_id: NodeId) -> HashSet<
dependencies.extend(aggregate.operands.iter());
}
}
QueryNode::Filter {
conditions: exprs,
correlation_id,
input,
} => {
// TODO(asenac) use `subgraph_correlated_input_refs`for correlated filters
if correlation_id.is_some() {
dependencies.extend(0..num_columns(query_graph, *input))
} else {
exprs
.iter()
.for_each(|e| store_input_dependencies(e, &mut dependencies))
}
}
QueryNode::Union { .. } | QueryNode::SubqueryRoot { .. } | QueryNode::Apply { .. } => {
dependencies.extend(0..num_columns(query_graph, node_id))
}
}
let dependencies = Rc::new(dependencies);
query_graph
.property_cache
.borrow_mut()
.single_node_properties(node_id)
.insert(type_id, Box::new(dependencies.clone()));
dependencies
}
6 changes: 5 additions & 1 deletion src/query_graph/properties/keys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,11 @@ impl Keys {
}
}));
}
QueryNode::Filter { input, conditions } => {
QueryNode::Filter {
input,
conditions,
correlation_id: _,
} => {
// FALSE/NULL predicate -> empty relation
if has_false_or_null_predicate(conditions) {
keys.push(KeyBounds {
Expand Down
Loading

0 comments on commit efae0eb

Please sign in to comment.