Skip to content

Commit

Permalink
Add QueryRoot node
Browse files Browse the repository at this point in the history
  • Loading branch information
asenac committed Jan 6, 2024
1 parent 91e1b17 commit 01f6958
Show file tree
Hide file tree
Showing 39 changed files with 3,248 additions and 2,509 deletions.
4 changes: 3 additions & 1 deletion src/query_graph/cloner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ where
input,
} => *input = inputs[0],
QueryNode::Union { inputs: inputs_ref } => *inputs_ref = inputs.to_vec(),
QueryNode::SubqueryRoot { input } => *input = inputs[0],
QueryNode::Apply {
correlation: _,
left,
Expand All @@ -90,6 +89,9 @@ where
*left = inputs[0];
*right = inputs[1];
}
QueryNode::QueryRoot { .. } | QueryNode::SubqueryRoot { .. } => {
panic!("Root nodes cannot be cloned")
}
}
self.stack.truncate(self.stack.len() - num_inputs);
query_graph.add_node(cloned_node)
Expand Down
5 changes: 4 additions & 1 deletion src/query_graph/explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ impl<'a> Explainer<'a> {
query_graph,
leaves: HashSet::new(),
annotators: Vec::new(),
entry_point: query_graph.entry_node,
entry_point: QueryGraph::ROOT_NODE_ID,
}
}

Expand Down Expand Up @@ -119,6 +119,9 @@ impl<'a> QueryGraphPrePostVisitor for ExplainVisitor<'a> {
}
let prefix = format!("{}[{}] ", line_prefix, node_id);
let node = match query_graph.node(node_id) {
QueryNode::QueryRoot { .. } => {
format!("{}QueryRoot\n", prefix)
}
QueryNode::Project { outputs, .. } => {
format!("{}Project [{}]\n", prefix, explain_scalar_expr_vec(outputs))
}
Expand Down
3 changes: 3 additions & 0 deletions src/query_graph/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ impl<'a> QueryGraphPrePostVisitor for JsonSerializer<'a> {
}
let prefix = format!("[{}] ", node_id);
let label = match query_graph.node(node_id) {
QueryNode::QueryRoot { .. } => {
format!("{}QueryRoot", prefix)
}
QueryNode::Project { outputs, .. } => {
format!("{}Project [{}]", prefix, explain_scalar_expr_vec(outputs))
}
Expand Down
64 changes: 45 additions & 19 deletions src/query_graph/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ pub struct CorrelationContext<E: VisitableExpr + RewritableExpr> {

#[derive(Clone, PartialEq, Eq)]
pub enum QueryNode {
QueryRoot {
input: Option<NodeId>,
},
Project {
outputs: Vec<ScalarExprRef>,
input: NodeId,
Expand Down Expand Up @@ -93,8 +96,6 @@ pub struct QueryGraph {
/// All the nodes in the query graph. May contain nodes not attached to the plan, i.e.
/// not reachable from the entry node.
nodes: HashMap<NodeId, QueryNode>,
/// The top-level root node of the query graph.
pub entry_node: NodeId,
/// The ID that will be given to the next node added to the query graph.
next_node_id: usize,
/// For each node, it contains a set with the nodes pointing to it through any of their
Expand All @@ -111,6 +112,7 @@ impl QueryNode {
/// Returns the number of inputs of this node.
pub fn num_inputs(&self) -> usize {
match self {
Self::QueryRoot { input } => input.map(|_| 1).unwrap_or(0),
Self::Project { .. } | Self::Filter { .. } | Self::Aggregate { .. } => 1,
Self::TableScan { .. } => 0,
Self::Join { .. } => 2,
Expand All @@ -125,6 +127,7 @@ impl QueryNode {
assert!(input_idx < self.num_inputs());

match self {
Self::QueryRoot { input } => input.unwrap(),
Self::Project { input, .. }
| Self::Filter { input, .. }
| Self::Aggregate { input, .. }
Expand All @@ -147,6 +150,7 @@ impl QueryNode {
assert!(input_idx < self.num_inputs());

match self {
Self::QueryRoot { input } => *input = Some(node_id),
Self::Project { input, .. }
| Self::Filter { input, .. }
| Self::Aggregate { input, .. }
Expand Down Expand Up @@ -180,7 +184,8 @@ impl QueryNode {
visitor(expr);
}
}
QueryNode::TableScan { .. }
QueryNode::QueryRoot { .. }
| QueryNode::TableScan { .. }
| QueryNode::Aggregate { .. }
| QueryNode::Union { .. }
| QueryNode::SubqueryRoot { .. } => {}
Expand Down Expand Up @@ -220,11 +225,12 @@ impl QueryNode {
}

impl QueryGraph {
pub const ROOT_NODE_ID: NodeId = 0;

pub fn new() -> QueryGraph {
Self {
nodes: HashMap::new(),
entry_node: 0,
next_node_id: 0,
nodes: HashMap::from([(Self::ROOT_NODE_ID, QueryNode::QueryRoot { input: None })]),
next_node_id: Self::ROOT_NODE_ID + 1,
gen_number: 0,
parents: HashMap::new(),
subqueries: Vec::new(),
Expand All @@ -233,7 +239,28 @@ impl QueryGraph {
}

/// Makes `entry_node` the input of the `QueryRoot` node stored under `ROOT_NODE_ID`.
///
/// The `parents` map is kept in sync: the root is removed from the parent set of the
/// previous entry node (if there was one) and added to the parent set of the new one.
///
/// # Panics
///
/// Panics if there is no node under `ROOT_NODE_ID`, if that node is not a `QueryRoot`,
/// or if the previous entry node has no entry in `parents`.
pub fn set_entry_node(&mut self, entry_node: NodeId) {
// NOTE(review): this page is a rendered diff and the `entry_node` field appears to be
// removed by the same change — confirm whether this assignment is a leftover of the
// pre-change body.
self.entry_node = entry_node;
match self.nodes.get_mut(&Self::ROOT_NODE_ID).unwrap() {
QueryNode::QueryRoot { input } => {
// Remember the previous input so it can be handled once the new one is wired in.
let old_entry_node = input.clone();
if let Some(old_entry_node) = old_entry_node {
// The root stops being a parent of the previous entry node.
self.parents
.get_mut(&old_entry_node)
.unwrap()
.remove(&Self::ROOT_NODE_ID);
}

// Point the root at the new entry node and register the root as its parent.
*input = Some(entry_node);
self.parents
.entry(entry_node)
.or_insert_with(|| BTreeSet::new())
.insert(Self::ROOT_NODE_ID);

// Runs only after the new entry node is attached.
// presumably this drops the previous sub-graph when nothing else references it —
// TODO confirm against `remove_detached_nodes`.
if let Some(old_entry_node) = old_entry_node {
self.remove_detached_nodes(old_entry_node);
}
}
_ => panic!("Unexpected root node"),
}
}

/// Returns a reference to the node under the given ID. The provided ID must
Expand Down Expand Up @@ -325,6 +352,7 @@ impl QueryGraph {
/// Invalidates the cached metadata for the nodes that are no longer part of the
/// query graph.
fn replace_node(&mut self, node_id: NodeId, new_node_id: NodeId) {
assert!(self.can_be_replaced(node_id));
self.invalidate_properties_upwards(node_id);

// All the parents of the old node are now parents of the new one
Expand Down Expand Up @@ -352,17 +380,20 @@ impl QueryGraph {
self.parents.insert(new_node_id, parents);
}
}
// Replace the reference to the entry node as well
if self.entry_node == node_id {
self.entry_node = new_node_id;
}

self.remove_detached_nodes(node_id);
}

/// Returns whether the node under the given ID may be replaced by another node.
///
/// Root nodes (`QueryRoot` and `SubqueryRoot`) are the only ones that cannot be
/// replaced; every other kind of node can.
pub fn can_be_replaced(&self, node_id: NodeId) -> bool {
    !matches!(
        self.node(node_id),
        QueryNode::QueryRoot { .. } | QueryNode::SubqueryRoot { .. }
    )
}

pub fn garbage_collect(&mut self) {
let mut visited_nodes = HashSet::new();
let mut stack = vec![self.entry_node];
let mut stack = vec![Self::ROOT_NODE_ID];
while !stack.is_empty() {
let current = stack.pop().unwrap();
if visited_nodes.insert(current) {
Expand Down Expand Up @@ -404,7 +435,7 @@ impl QueryGraph {
/// hanging from the entry node.
fn collect_referenced_subqueries(&self) -> HashSet<NodeId> {
let mut referenced_subqueries = HashSet::new();
let mut stack = subgraph_subqueries(self, self.entry_node)
let mut stack = subgraph_subqueries(self, Self::ROOT_NODE_ID)
.iter()
.cloned()
.collect_vec();
Expand All @@ -422,7 +453,7 @@ impl QueryGraph {
#[allow(dead_code)]
fn collect_attached_nodes(&self) -> HashSet<NodeId> {
let mut attached_nodes = HashSet::new();
let mut queue = VecDeque::from([self.entry_node]);
let mut queue = VecDeque::from([Self::ROOT_NODE_ID]);
while let Some(node_id) = queue.pop_front() {
self.visit_subgraph_pre(
&mut |query_graph, node_id| {
Expand Down Expand Up @@ -509,7 +540,6 @@ impl Clone for QueryGraph {
fn clone(&self) -> Self {
Self {
nodes: self.nodes.clone(),
entry_node: self.entry_node,
next_node_id: self.next_node_id,
gen_number: self.gen_number,
parents: self.parents.clone(),
Expand Down Expand Up @@ -553,10 +583,6 @@ impl QueryGraph {
if !self.parents.contains_key(&node_id) {
let mut stack = vec![node_id];
while let Some(current_id) = stack.pop() {
// TODO(asenac) the root node should be an non-replaceable node!
if current_id == self.entry_node {
continue;
}
self.invalidate_bottom_up_properties(current_id);
self.invalidate_single_node_properties(current_id);

Expand Down
9 changes: 6 additions & 3 deletions src/query_graph/optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,9 @@ impl Optimizer {

/// Optimize the given query graph by applying the rules in this optimizer instance.
pub fn optimize(&self, context: &mut OptimizerContext, query_graph: &mut QueryGraph) {
self.optimization_loop(context, query_graph, |query_graph| query_graph.entry_node);
self.optimization_loop(context, query_graph, true, |query_graph| {
query_graph.node(QueryGraph::ROOT_NODE_ID).get_input(0)
});

// Optimize the subqueries in the query graph
// Note: optimizing a subquery may result in some other subquery being removed.
Expand All @@ -139,7 +141,7 @@ impl Optimizer {
.find(|i| last_subquery.is_none() || **i > last_subquery.unwrap())
.cloned()
{
self.optimization_loop(context, query_graph, |query_graph| {
self.optimization_loop(context, query_graph, false, |query_graph| {
query_graph.node(next_subquery).get_input(0)
});
last_subquery = Some(next_subquery);
Expand All @@ -150,6 +152,7 @@ impl Optimizer {
&self,
context: &mut OptimizerContext,
query_graph: &mut QueryGraph,
is_query_root: bool,
get_node_id: F,
) where
F: Fn(&QueryGraph) -> NodeId,
Expand All @@ -159,7 +162,7 @@ impl Optimizer {
let last_gen_number = query_graph.gen_number;

let mut node_id = get_node_id(query_graph);
if node_id == query_graph.entry_node {
if is_query_root {
// TODO(asenac) RootOnly vs. AnyRoot rules
self.apply_root_only_rules(context, query_graph, &mut node_id);
}
Expand Down
7 changes: 7 additions & 0 deletions src/query_graph/optimizer/rules/remove_passthrough_project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@ impl SingleReplacementRule for RemovePassthroughProjectRule {
}

fn apply(&self, query_graph: &mut QueryGraph, node_id: NodeId) -> Option<NodeId> {
if query_graph
.get_parents(node_id)
.map(|parents| parents.contains(&QueryGraph::ROOT_NODE_ID))
.unwrap_or(false)
{
return None;
}
if let QueryNode::Project { outputs, input } = query_graph.node(node_id) {
if query_graph.num_parents(node_id) > 0
&& outputs.len() == num_columns(query_graph, *input)
Expand Down
5 changes: 5 additions & 0 deletions src/query_graph/properties/input_dependencies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ pub fn input_dependencies(query_graph: &QueryGraph, node_id: NodeId) -> Rc<HashS
}
let mut dependencies = HashSet::new();
match query_graph.node(node_id) {
QueryNode::QueryRoot { input } => {
if let Some(input) = input {
dependencies.extend(0..num_columns(query_graph, *input));
}
}
QueryNode::Project { outputs: exprs, .. }
| QueryNode::Join {
conditions: exprs, ..
Expand Down
5 changes: 5 additions & 0 deletions src/query_graph/properties/keys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,11 @@ impl Keys {
) -> Rc<Vec<KeyBounds>> {
let mut keys = Vec::new();
match query_graph.node(node_id) {
QueryNode::QueryRoot { input } => {
if let Some(input) = input {
keys.extend(self.keys_unchecked(query_graph, *input).iter().cloned());
}
}
QueryNode::Project { outputs, input } => {
let input_keys = self.keys_unchecked(query_graph, *input);
// Lift the input keys through the projection expressions.
Expand Down
7 changes: 7 additions & 0 deletions src/query_graph/properties/num_columns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@ impl NumColumns {

fn compute_num_columns_for_node(&self, query_graph: &QueryGraph, node_id: NodeId) -> usize {
match query_graph.node(node_id) {
QueryNode::QueryRoot { input } => {
if let Some(input) = input {
self.num_columns_unchecked(query_graph, *input)
} else {
0
}
}
QueryNode::Project { outputs, .. } => outputs.len(),
QueryNode::Filter { input, .. } | QueryNode::SubqueryRoot { input } => {
self.num_columns_unchecked(query_graph, *input)
Expand Down
9 changes: 9 additions & 0 deletions src/query_graph/properties/pulled_up_predicates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,15 @@ impl PulledUpPredicates {
) -> Rc<Vec<ScalarExprRef>> {
let mut predicates = Vec::new();
match query_graph.node(node_id) {
QueryNode::QueryRoot { input } => {
if let Some(input) = input {
predicates.extend(
self.predicates_unchecked(query_graph, *input)
.iter()
.cloned(),
);
}
}
QueryNode::Project { outputs, input } => {
predicates.extend(
self.predicates_unchecked(query_graph, *input)
Expand Down
7 changes: 7 additions & 0 deletions src/query_graph/properties/row_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,13 @@ impl RowType {
node_id: NodeId,
) -> Rc<Vec<DataType>> {
match query_graph.node(node_id) {
QueryNode::QueryRoot { input } => {
if let Some(input) = input {
self.row_type_unchecked(query_graph, *input)
} else {
Default::default()
}
}
QueryNode::Project { outputs, input } => {
let input_row_type = self.row_type_unchecked(query_graph, *input);
outputs
Expand Down
13 changes: 9 additions & 4 deletions src/query_graph/visitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ impl QueryGraph {
where
V: QueryGraphPrePostVisitor,
{
self.visit_subgraph(visitor, self.entry_node);
self.visit_subgraph(visitor, QueryGraph::ROOT_NODE_ID);
}

/// Visits the sub-graph under the given node.
Expand Down Expand Up @@ -145,7 +145,7 @@ impl QueryGraph {
where
V: QueryGraphPrePostVisitorMut,
{
self.visit_subgraph_mut(visitor, self.entry_node)
self.visit_subgraph_mut(visitor, QueryGraph::ROOT_NODE_ID)
}

pub fn visit_subgraph_mut<V>(&mut self, visitor: &mut V, node_id: NodeId)
Expand Down Expand Up @@ -192,7 +192,7 @@ impl QueryGraph {
where
F: FnMut(&QueryGraph, NodeId) -> PreOrderVisitationResult,
{
self.visit_subgraph_pre(visitor, self.entry_node)
self.visit_subgraph_pre(visitor, QueryGraph::ROOT_NODE_ID)
}

pub fn visit_subgraph_pre<F>(&self, visitor: &mut F, node_id: NodeId)
Expand Down Expand Up @@ -292,7 +292,12 @@ mod tests {
ordered
);

let all_nodes = query_graph.nodes.keys().cloned().collect::<HashSet<_>>();
let all_nodes = query_graph
.nodes
.keys()
.filter(|node_id| **node_id != QueryGraph::ROOT_NODE_ID)
.cloned()
.collect::<HashSet<_>>();
assert_eq!(query_graph.collect_nodes_above(table_scan), all_nodes);
assert_eq!(
query_graph.collect_nodes_above(union_),
Expand Down
Loading

0 comments on commit 01f6958

Please sign in to comment.