diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index e510d11c4f7f..e6e3f83fd7e8 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -45,12 +45,12 @@ use datafusion_common::{ use datafusion_expr::interval_arithmetic::Interval; use datafusion_physical_expr::equivalence::add_offset_to_expr; use datafusion_physical_expr::expressions::Column; -use datafusion_physical_expr::utils::merge_vectors; +use datafusion_physical_expr::utils::{collect_columns, merge_vectors}; use datafusion_physical_expr::{ LexOrdering, LexOrderingRef, PhysicalExpr, PhysicalExprRef, PhysicalSortExpr, }; -use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion}; +use datafusion_common::tree_node::{Transformed, TreeNode}; use futures::future::{BoxFuture, Shared}; use futures::{ready, FutureExt}; use hashbrown::raw::RawTable; @@ -410,42 +410,13 @@ fn check_join_set_is_valid( ) -> Result<()> { let on_left = &on .iter() - .flat_map(|on| { - let left = on.0.clone(); - - let mut columns = vec![]; - left.apply(&mut |expr| { - Ok({ - if let Some(column) = expr.as_any().downcast_ref::() { - columns.push(column.clone()); - } - VisitRecursion::Continue - }) - }) - .unwrap(); - columns - }) + .flat_map(|on| collect_columns(&on.0)) .collect::>(); let left_missing = on_left.difference(left).collect::>(); let on_right = &on .iter() - .flat_map(|on| { - let right = on.1.clone(); - - let mut columns = vec![]; - right - .apply(&mut |expr| { - Ok({ - if let Some(column) = expr.as_any().downcast_ref::() { - columns.push(column.clone()); - } - VisitRecursion::Continue - }) - }) - .unwrap(); - columns - }) + .flat_map(|on| collect_columns(&on.1)) .collect::>(); let right_missing = on_right.difference(right).collect::>();