Skip to content

Commit

Permalink
Refactor TreeNode and cleanup some implementations
Browse files Browse the repository at this point in the history
  • Loading branch information
viirya committed Dec 28, 2023
1 parent 6403222 commit 2da06d5
Show file tree
Hide file tree
Showing 10 changed files with 104 additions and 168 deletions.
26 changes: 21 additions & 5 deletions datafusion/common/src/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ use crate::Result;
/// [`LogicalPlan`]: https://docs.rs/datafusion-expr/latest/datafusion_expr/logical_plan/enum.LogicalPlan.html
/// [`Expr`]: https://docs.rs/datafusion-expr/latest/datafusion_expr/expr/enum.Expr.html
pub trait TreeNode: Sized {
/// Returns all children of the TreeNode
fn children_nodes(&self) -> Vec<&Self>;

/// Use preorder to iterate the node on the tree so that we can
/// stop fast for some cases.
///
Expand Down Expand Up @@ -211,7 +214,17 @@ pub trait TreeNode: Sized {
/// Apply the closure `F` to the node's children
fn apply_children<F>(&self, op: &mut F) -> Result<VisitRecursion>
where
F: FnMut(&Self) -> Result<VisitRecursion>;
F: FnMut(&Self) -> Result<VisitRecursion>,
{
for child in self.children_nodes() {
match op(child)? {
VisitRecursion::Continue => {}
VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
}
}
Ok(VisitRecursion::Continue)
}

/// Apply transform `F` to the node's children, the transform `F` might have a direction(Preorder or Postorder)
fn map_children<F>(self, transform: F) -> Result<Self>
Expand Down Expand Up @@ -342,18 +355,21 @@ pub trait DynTreeNode {
/// Blanket implementation for Arc for any tye that implements
/// [`DynTreeNode`] (such as [`Arc<dyn PhysicalExpr>`])
impl<T: DynTreeNode + ?Sized> TreeNode for Arc<T> {
fn children_nodes(&self) -> Vec<&Arc<T>> {
unimplemented!("Call arc_children instead")
}

fn apply_children<F>(&self, op: &mut F) -> Result<VisitRecursion>
where
F: FnMut(&Self) -> Result<VisitRecursion>,
{
for child in self.arc_children() {
match op(&child)? {
for child in &self.arc_children() {
match op(child)? {
VisitRecursion::Continue => {}
VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
}
}

Ok(VisitRecursion::Continue)
}

Expand All @@ -368,7 +384,7 @@ impl<T: DynTreeNode + ?Sized> TreeNode for Arc<T> {
let arc_self = Arc::clone(&self);
self.with_new_arc_children(arc_self, new_children?)
} else {
Ok(self)
Ok(self.clone())
}
}
}
31 changes: 5 additions & 26 deletions datafusion/core/src/physical_optimizer/enforce_distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ use crate::physical_plan::{
};

use arrow::compute::SortOptions;
use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_expr::logical_plan::JoinType;
use datafusion_physical_expr::expressions::{Column, NoOp};
use datafusion_physical_expr::utils::map_columns_before_projection;
Expand Down Expand Up @@ -1409,18 +1409,8 @@ impl DistributionContext {
}

impl TreeNode for DistributionContext {
fn apply_children<F>(&self, op: &mut F) -> Result<VisitRecursion>
where
F: FnMut(&Self) -> Result<VisitRecursion>,
{
for child in &self.children_nodes {
match op(child)? {
VisitRecursion::Continue => {}
VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
}
}
Ok(VisitRecursion::Continue)
fn children_nodes(&self) -> Vec<&Self> {
self.children_nodes.iter().collect()
}

fn map_children<F>(mut self, transform: F) -> Result<Self>
Expand Down Expand Up @@ -1483,19 +1473,8 @@ impl PlanWithKeyRequirements {
}

impl TreeNode for PlanWithKeyRequirements {
fn apply_children<F>(&self, op: &mut F) -> Result<VisitRecursion>
where
F: FnMut(&Self) -> Result<VisitRecursion>,
{
for child in &self.children {
match op(child)? {
VisitRecursion::Continue => {}
VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
}
}

Ok(VisitRecursion::Continue)
fn children_nodes(&self) -> Vec<&Self> {
self.children.iter().collect()
}

fn map_children<F>(mut self, transform: F) -> Result<Self>
Expand Down
21 changes: 7 additions & 14 deletions datafusion/core/src/physical_optimizer/enforce_sorting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,15 @@ impl PlanWithCorrespondingSort {
}

impl TreeNode for PlanWithCorrespondingSort {
fn children_nodes(&self) -> Vec<&Self> {
self.children_nodes.iter().collect()
}

fn apply_children<F>(&self, op: &mut F) -> Result<VisitRecursion>
where
F: FnMut(&Self) -> Result<VisitRecursion>,
{
for child in &self.children_nodes {
for child in self.children_nodes() {
match op(child)? {
VisitRecursion::Continue => {}
VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
Expand Down Expand Up @@ -237,19 +241,8 @@ impl PlanWithCorrespondingCoalescePartitions {
}

impl TreeNode for PlanWithCorrespondingCoalescePartitions {
fn apply_children<F>(&self, op: &mut F) -> Result<VisitRecursion>
where
F: FnMut(&Self) -> Result<VisitRecursion>,
{
for child in &self.children_nodes {
match op(child)? {
VisitRecursion::Continue => {}
VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
}
}

Ok(VisitRecursion::Continue)
fn children_nodes(&self) -> Vec<&Self> {
self.children_nodes.iter().collect()
}

fn map_children<F>(mut self, transform: F) -> Result<Self>
Expand Down
17 changes: 3 additions & 14 deletions datafusion/core/src/physical_optimizer/pipeline_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::physical_optimizer::PhysicalOptimizerRule;
use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};

use datafusion_common::config::OptimizerOptions;
use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{plan_err, DataFusionError};
use datafusion_physical_expr::intervals::utils::{check_support, is_datatype_supported};
use datafusion_physical_plan::joins::SymmetricHashJoinExec;
Expand Down Expand Up @@ -91,19 +91,8 @@ impl PipelineStatePropagator {
}

impl TreeNode for PipelineStatePropagator {
fn apply_children<F>(&self, op: &mut F) -> Result<VisitRecursion>
where
F: FnMut(&Self) -> Result<VisitRecursion>,
{
for child in &self.children {
match op(child)? {
VisitRecursion::Continue => {}
VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
}
}

Ok(VisitRecursion::Continue)
fn children_nodes(&self) -> Vec<&Self> {
self.children.iter().collect()
}

fn map_children<F>(mut self, transform: F) -> Result<Self>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};

use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_physical_plan::unbounded_output;

/// For a given `plan`, this object carries the information one needs from its
Expand Down Expand Up @@ -104,18 +104,8 @@ impl OrderPreservationContext {
}

impl TreeNode for OrderPreservationContext {
fn apply_children<F>(&self, op: &mut F) -> Result<VisitRecursion>
where
F: FnMut(&Self) -> Result<VisitRecursion>,
{
for child in &self.children_nodes {
match op(child)? {
VisitRecursion::Continue => {}
VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
}
}
Ok(VisitRecursion::Continue)
fn children_nodes(&self) -> Vec<&Self> {
self.children_nodes.iter().collect()
}

fn map_children<F>(mut self, transform: F) -> Result<Self>
Expand Down
18 changes: 4 additions & 14 deletions datafusion/core/src/physical_optimizer/sort_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use crate::physical_plan::repartition::RepartitionExec;
use crate::physical_plan::sorts::sort::SortExec;
use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};

use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{plan_err, DataFusionError, JoinSide, Result};
use datafusion_expr::JoinType;
use datafusion_physical_expr::expressions::Column;
Expand Down Expand Up @@ -71,20 +71,10 @@ impl SortPushDown {
}

impl TreeNode for SortPushDown {
fn apply_children<F>(&self, op: &mut F) -> Result<VisitRecursion>
where
F: FnMut(&Self) -> Result<VisitRecursion>,
{
for child in &self.children_nodes {
match op(child)? {
VisitRecursion::Continue => {}
VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
}
}

Ok(VisitRecursion::Continue)
fn children_nodes(&self) -> Vec<&Self> {
self.children_nodes.iter().collect()
}

fn map_children<F>(mut self, transform: F) -> Result<Self>
where
F: FnMut(Self) -> Result<Self>,
Expand Down
Loading

0 comments on commit 2da06d5

Please sign in to comment.