Skip to content

Commit

Permalink
Fix
Browse files Browse the repository at this point in the history
  • Loading branch information
viirya committed Jan 25, 2024
1 parent 87cd11b commit 04fbe55
Showing 1 changed file with 25 additions and 9 deletions.
34 changes: 25 additions & 9 deletions datafusion/physical-plan/src/joins/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ use datafusion_physical_expr::{
LexOrdering, LexOrderingRef, PhysicalExpr, PhysicalExprRef, PhysicalSortExpr,
};

use datafusion_common::tree_node::{TreeNode, VisitRecursion};
use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
use futures::future::{BoxFuture, Shared};
use futures::{ready, FutureExt};
use hashbrown::raw::RawTable;
Expand Down Expand Up @@ -504,14 +504,20 @@ pub fn adjust_right_output_partitioning(
fn replace_on_columns_of_right_ordering(
on_columns: &[(PhysicalExprRef, PhysicalExprRef)],
right_ordering: &mut [PhysicalSortExpr],
) {
) -> Result<()> {
for (left_col, right_col) in on_columns {
for item in right_ordering.iter_mut() {
if item.expr.eq(right_col) {
item.expr = left_col.clone();
}
let new_expr = item.expr.clone().transform(&|e| {
if e.eq(right_col) {
Ok(Transformed::Yes(left_col.clone()))
} else {
Ok(Transformed::No(e))
}
})?;
item.expr = new_expr;
}
}
Ok(())
}

fn offset_ordering(
Expand All @@ -520,6 +526,8 @@ fn offset_ordering(
offset: usize,
) -> Vec<PhysicalSortExpr> {
match join_type {
// In the case below, right ordering should be offseted with the left
// side length, since we append the right table to the left table.
JoinType::Inner | JoinType::Left | JoinType::Full | JoinType::Right => ordering
.iter()
.map(|sort_expr| PhysicalSortExpr {
Expand Down Expand Up @@ -548,8 +556,12 @@ pub fn calculate_join_output_ordering(
replace_on_columns_of_right_ordering(
on_columns,
&mut right_ordering.to_vec(),
);
merge_vectors(left_ordering, right_ordering)
)
.ok()?;
merge_vectors(
left_ordering,
&offset_ordering(right_ordering, &join_type, left_columns_len),
)
} else {
left_ordering.to_vec()
}
Expand All @@ -560,8 +572,12 @@ pub fn calculate_join_output_ordering(
replace_on_columns_of_right_ordering(
on_columns,
&mut right_ordering.to_vec(),
);
merge_vectors(right_ordering, left_ordering)
)
.ok()?;
merge_vectors(
right_ordering,
&offset_ordering(right_ordering, &join_type, left_columns_len),
)
} else {
offset_ordering(right_ordering, &join_type, left_columns_len)
}
Expand Down

0 comments on commit 04fbe55

Please sign in to comment.