diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index 6df8f5e75268..2224aecfafaa 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -50,7 +50,7 @@ use datafusion_physical_expr::{ LexOrdering, LexOrderingRef, PhysicalExpr, PhysicalExprRef, PhysicalSortExpr, }; -use datafusion_common::tree_node::{TreeNode, VisitRecursion}; +use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion}; use futures::future::{BoxFuture, Shared}; use futures::{ready, FutureExt}; use hashbrown::raw::RawTable; @@ -504,14 +504,20 @@ pub fn adjust_right_output_partitioning( fn replace_on_columns_of_right_ordering( on_columns: &[(PhysicalExprRef, PhysicalExprRef)], right_ordering: &mut [PhysicalSortExpr], -) { +) -> Result<()> { for (left_col, right_col) in on_columns { for item in right_ordering.iter_mut() { - if item.expr.eq(right_col) { - item.expr = left_col.clone(); - } + let new_expr = item.expr.clone().transform(&|e| { + if e.eq(right_col) { + Ok(Transformed::Yes(left_col.clone())) + } else { + Ok(Transformed::No(e)) + } + })?; + item.expr = new_expr; } } + Ok(()) } fn offset_ordering( @@ -520,6 +526,8 @@ fn offset_ordering( offset: usize, ) -> Vec { match join_type { + // In the case below, right ordering should be offseted with the left + // side length, since we append the right table to the left table. JoinType::Inner | JoinType::Left | JoinType::Full | JoinType::Right => ordering .iter() .map(|sort_expr| PhysicalSortExpr { @@ -548,8 +556,12 @@ pub fn calculate_join_output_ordering( replace_on_columns_of_right_ordering( on_columns, &mut right_ordering.to_vec(), - ); - merge_vectors(left_ordering, right_ordering) + ) + .ok()?; + merge_vectors( + left_ordering, + &offset_ordering(right_ordering, &join_type, left_columns_len), + ) } else { left_ordering.to_vec() } @@ -560,8 +572,12 @@ pub fn calculate_join_output_ordering( replace_on_columns_of_right_ordering( on_columns, &mut right_ordering.to_vec(), - ); - merge_vectors(right_ordering, left_ordering) + ) + .ok()?; + merge_vectors( + right_ordering, + &offset_ordering(right_ordering, &join_type, left_columns_len), + ) } else { offset_ordering(right_ordering, &join_type, left_columns_len) }