Skip to content

Commit

Permalink
Merge pull request #132 from peter-toth/refactor_treenode2
Browse files Browse the repository at this point in the history
Avoid cloning in `TreeNode.children_nodes()` implementations where possible using `Cow`
  • Loading branch information
viirya authored Dec 31, 2023
2 parents d4eab3a + ee4c9df commit 3b8a7ad
Show file tree
Hide file tree
Showing 10 changed files with 59 additions and 51 deletions.
9 changes: 5 additions & 4 deletions datafusion/common/src/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
//! This module provides common traits for visiting or rewriting tree
//! data structures easily.
use std::borrow::Cow;
use std::sync::Arc;

use crate::Result;
Expand All @@ -32,9 +33,9 @@ use crate::Result;
/// [`PhysicalExpr`]: https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.PhysicalExpr.html
/// [`LogicalPlan`]: https://docs.rs/datafusion-expr/latest/datafusion_expr/logical_plan/enum.LogicalPlan.html
/// [`Expr`]: https://docs.rs/datafusion-expr/latest/datafusion_expr/expr/enum.Expr.html
pub trait TreeNode: Sized {
pub trait TreeNode: Sized + Clone {
/// Returns all children of the TreeNode
fn children_nodes(&self) -> Vec<Self>;
fn children_nodes(&self) -> Vec<Cow<Self>>;

/// Use preorder to iterate the node on the tree so that we can
/// stop fast for some cases.
Expand Down Expand Up @@ -355,8 +356,8 @@ pub trait DynTreeNode {
/// Blanket implementation for Arc for any tye that implements
/// [`DynTreeNode`] (such as [`Arc<dyn PhysicalExpr>`])
impl<T: DynTreeNode + ?Sized> TreeNode for Arc<T> {
fn children_nodes(&self) -> Vec<Arc<T>> {
self.arc_children()
fn children_nodes(&self) -> Vec<Cow<Self>> {
self.arc_children().into_iter().map(Cow::Owned).collect()
}

fn map_children<F>(self, transform: F) -> Result<Self>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
//! according to the configuration), this rule increases partition counts in
//! the physical plan.
use std::borrow::Cow;
use std::fmt;
use std::fmt::Formatter;
use std::sync::Arc;
Expand Down Expand Up @@ -1409,8 +1410,8 @@ impl DistributionContext {
}

impl TreeNode for DistributionContext {
fn children_nodes(&self) -> Vec<Self> {
self.children_nodes.to_vec()
fn children_nodes(&self) -> Vec<Cow<Self>> {
self.children_nodes.iter().map(Cow::Borrowed).collect()
}

fn map_children<F>(mut self, transform: F) -> Result<Self>
Expand Down Expand Up @@ -1473,8 +1474,8 @@ impl PlanWithKeyRequirements {
}

impl TreeNode for PlanWithKeyRequirements {
fn children_nodes(&self) -> Vec<Self> {
self.children.to_vec()
fn children_nodes(&self) -> Vec<Cow<Self>> {
self.children.iter().map(Cow::Borrowed).collect()
}

fn map_children<F>(mut self, transform: F) -> Result<Self>
Expand Down
9 changes: 5 additions & 4 deletions datafusion/core/src/physical_optimizer/enforce_sorting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
//! in the physical plan. The first sort is unnecessary since its result is overwritten
//! by another [`SortExec`]. Therefore, this rule removes it from the physical plan.
use std::borrow::Cow;
use std::sync::Arc;

use crate::config::ConfigOptions;
Expand Down Expand Up @@ -145,8 +146,8 @@ impl PlanWithCorrespondingSort {
}

impl TreeNode for PlanWithCorrespondingSort {
fn children_nodes(&self) -> Vec<Self> {
self.children_nodes.to_vec()
fn children_nodes(&self) -> Vec<Cow<Self>> {
self.children_nodes.iter().map(Cow::Borrowed).collect()
}

fn map_children<F>(mut self, transform: F) -> Result<Self>
Expand Down Expand Up @@ -226,8 +227,8 @@ impl PlanWithCorrespondingCoalescePartitions {
}

impl TreeNode for PlanWithCorrespondingCoalescePartitions {
fn children_nodes(&self) -> Vec<Self> {
self.children_nodes.to_vec()
fn children_nodes(&self) -> Vec<Cow<Self>> {
self.children_nodes.iter().map(Cow::Borrowed).collect()
}

fn map_children<F>(mut self, transform: F) -> Result<Self>
Expand Down
5 changes: 3 additions & 2 deletions datafusion/core/src/physical_optimizer/pipeline_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
//! infinite sources, if there are any. It will reject non-runnable query plans
//! that use pipeline-breaking operators on infinite input(s).
use std::borrow::Cow;
use std::sync::Arc;

use crate::config::ConfigOptions;
Expand Down Expand Up @@ -91,8 +92,8 @@ impl PipelineStatePropagator {
}

impl TreeNode for PipelineStatePropagator {
fn children_nodes(&self) -> Vec<Self> {
self.children.to_vec()
fn children_nodes(&self) -> Vec<Cow<Self>> {
self.children.iter().map(Cow::Borrowed).collect()
}

fn map_children<F>(mut self, transform: F) -> Result<Self>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
//! order-preserving variants when it is helpful; either in terms of
//! performance or to accommodate unbounded streams by fixing the pipeline.
use std::borrow::Cow;
use std::sync::Arc;

use super::utils::is_repartition;
Expand Down Expand Up @@ -104,8 +105,8 @@ impl OrderPreservationContext {
}

impl TreeNode for OrderPreservationContext {
fn children_nodes(&self) -> Vec<Self> {
self.children_nodes.to_vec()
fn children_nodes(&self) -> Vec<Cow<Self>> {
self.children_nodes.iter().map(Cow::Borrowed).collect()
}

fn map_children<F>(mut self, transform: F) -> Result<Self>
Expand Down
5 changes: 3 additions & 2 deletions datafusion/core/src/physical_optimizer/sort_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use std::borrow::Cow;
use std::sync::Arc;

use crate::physical_optimizer::utils::{
Expand Down Expand Up @@ -71,8 +72,8 @@ impl SortPushDown {
}

impl TreeNode for SortPushDown {
fn children_nodes(&self) -> Vec<Self> {
self.children_nodes.to_vec()
fn children_nodes(&self) -> Vec<Cow<Self>> {
self.children_nodes.iter().map(Cow::Borrowed).collect()
}

fn map_children<F>(mut self, transform: F) -> Result<Self>
Expand Down
52 changes: 26 additions & 26 deletions datafusion/expr/src/tree_node/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@ use crate::expr::{
ScalarFunction, ScalarFunctionDefinition, Sort, TryCast, WindowFunction,
};
use crate::{Expr, GetFieldAccess};
use std::borrow::Cow;

use datafusion_common::tree_node::TreeNode;
use datafusion_common::{internal_err, DataFusionError, Result};

impl TreeNode for Expr {
fn children_nodes(&self) -> Vec<Self> {
fn children_nodes(&self) -> Vec<Cow<Self>> {
match self {
Expr::Alias(Alias { expr, .. })
| Expr::Not(expr)
Expand All @@ -44,26 +45,26 @@ impl TreeNode for Expr {
| Expr::Cast(Cast { expr, .. })
| Expr::TryCast(TryCast { expr, .. })
| Expr::Sort(Sort { expr, .. })
| Expr::InSubquery(InSubquery { expr, .. }) => vec![expr.as_ref().clone()],
| Expr::InSubquery(InSubquery { expr, .. }) => vec![Cow::Borrowed(expr)],
Expr::GetIndexedField(GetIndexedField { expr, field }) => {
let expr = expr.as_ref().clone();
let expr = Cow::Borrowed(expr.as_ref());
match field {
GetFieldAccess::ListIndex { key } => {
vec![key.as_ref().clone(), expr]
vec![Cow::Borrowed(key.as_ref()), expr]
}
GetFieldAccess::ListRange { start, stop } => {
vec![start.as_ref().clone(), stop.as_ref().clone(), expr]
vec![Cow::Borrowed(start), Cow::Borrowed(stop), expr]
}
GetFieldAccess::NamedStructField { name: _name } => {
vec![expr]
}
}
}
Expr::GroupingSet(GroupingSet::Rollup(exprs))
| Expr::GroupingSet(GroupingSet::Cube(exprs)) => exprs.clone(),
Expr::ScalarFunction(ScalarFunction { args, .. }) => args.clone(),
| Expr::GroupingSet(GroupingSet::Cube(exprs)) => exprs.into_iter().map(Cow::Borrowed).collect(),
Expr::ScalarFunction(ScalarFunction { args, .. }) => args.into_iter().map(Cow::Borrowed).collect(),
Expr::GroupingSet(GroupingSet::GroupingSets(lists_of_exprs)) => {
lists_of_exprs.clone().into_iter().flatten().collect()
lists_of_exprs.into_iter().flatten().map(Cow::Borrowed).collect()
}
Expr::Column(_)
// Treat OuterReferenceColumn as a leaf expression
Expand All @@ -75,30 +76,30 @@ impl TreeNode for Expr {
| Expr::Wildcard { .. }
| Expr::Placeholder(_) => vec![],
Expr::BinaryExpr(BinaryExpr { left, right, .. }) => {
vec![left.as_ref().clone(), right.as_ref().clone()]
vec![Cow::Borrowed(left), Cow::Borrowed(right)]
}
Expr::Like(Like { expr, pattern, .. })
| Expr::SimilarTo(Like { expr, pattern, .. }) => {
vec![expr.as_ref().clone(), pattern.as_ref().clone()]
vec![Cow::Borrowed(expr), Cow::Borrowed(pattern)]
}
Expr::Between(Between {
expr, low, high, ..
}) => vec![
expr.as_ref().clone(),
low.as_ref().clone(),
high.as_ref().clone(),
Cow::Borrowed(expr),
Cow::Borrowed(low),
Cow::Borrowed(high),
],
Expr::Case(case) => {
let mut expr_vec = vec![];
if let Some(expr) = case.expr.as_ref() {
expr_vec.push(expr.as_ref().clone());
expr_vec.push(Cow::Borrowed(expr.as_ref()));
};
for (when, then) in case.when_then_expr.iter() {
expr_vec.push(when.as_ref().clone());
expr_vec.push(then.as_ref().clone());
expr_vec.push(Cow::Borrowed(when));
expr_vec.push(Cow::Borrowed(then));
}
if let Some(else_expr) = case.else_expr.as_ref() {
expr_vec.push(else_expr.as_ref().clone());
expr_vec.push(Cow::Borrowed(else_expr));
}
expr_vec
}
Expand All @@ -108,13 +109,13 @@ impl TreeNode for Expr {
order_by,
..
}) => {
let mut expr_vec = args.clone();
let mut expr_vec: Vec<_> = args.into_iter().map(Cow::Borrowed).collect();

if let Some(f) = filter {
expr_vec.push(f.as_ref().clone());
expr_vec.push(Cow::Borrowed(f));
}
if let Some(o) = order_by {
expr_vec.extend(o.clone());
expr_vec.extend(o.into_iter().map(Cow::Borrowed).collect::<Vec<_>>());
}

expr_vec
Expand All @@ -125,15 +126,14 @@ impl TreeNode for Expr {
order_by,
..
}) => {
let mut expr_vec = args.clone();
expr_vec.extend(partition_by.clone());
expr_vec.extend(order_by.clone());
let mut expr_vec: Vec<_> = args.into_iter().map(Cow::Borrowed).collect();
expr_vec.extend(partition_by.into_iter().map(Cow::Borrowed).collect::<Vec<_>>());
expr_vec.extend(order_by.into_iter().map(Cow::Borrowed).collect::<Vec<_>>());
expr_vec
}
Expr::InList(InList { expr, list, .. }) => {
let mut expr_vec = vec![];
expr_vec.push(expr.as_ref().clone());
expr_vec.extend(list.clone());
let mut expr_vec = vec![Cow::Borrowed(expr.as_ref())];
expr_vec.extend(list.into_iter().map(Cow::Borrowed).collect::<Vec<_>>());
expr_vec
}
}
Expand Down
5 changes: 3 additions & 2 deletions datafusion/expr/src/tree_node/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@
use crate::LogicalPlan;
use datafusion_common::tree_node::{TreeNodeVisitor, VisitRecursion};
use datafusion_common::{tree_node::TreeNode, Result};
use std::borrow::Cow;

impl TreeNode for LogicalPlan {
fn children_nodes(&self) -> Vec<Self> {
self.inputs().into_iter().cloned().collect()
fn children_nodes(&self) -> Vec<Cow<Self>> {
self.inputs().into_iter().map(Cow::Borrowed).collect()
}

fn apply<F>(&self, op: &mut F) -> Result<VisitRecursion>
Expand Down
5 changes: 3 additions & 2 deletions datafusion/physical-expr/src/sort_properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use std::borrow::Cow;
use std::{ops::Neg, sync::Arc};

use arrow_schema::SortOptions;
Expand Down Expand Up @@ -173,8 +174,8 @@ impl ExprOrdering {
}

impl TreeNode for ExprOrdering {
fn children_nodes(&self) -> Vec<Self> {
self.children.to_vec()
fn children_nodes(&self) -> Vec<Cow<Self>> {
self.children.iter().map(Cow::Borrowed).collect()
}

fn map_children<F>(mut self, transform: F) -> Result<Self>
Expand Down
6 changes: 3 additions & 3 deletions datafusion/physical-expr/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
mod guarantee;
pub use guarantee::{Guarantee, LiteralGuarantee};

use std::borrow::Borrow;
use std::borrow::{Borrow, Cow};
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

Expand Down Expand Up @@ -154,8 +154,8 @@ impl<T> ExprTreeNode<T> {
}

impl<T: Clone> TreeNode for ExprTreeNode<T> {
fn children_nodes(&self) -> Vec<Self> {
self.children().to_vec()
fn children_nodes(&self) -> Vec<Cow<Self>> {
self.children().into_iter().map(Cow::Borrowed).collect()
}

fn map_children<F>(mut self, transform: F) -> Result<Self>
Expand Down

0 comments on commit 3b8a7ad

Please sign in to comment.