Skip to content

Commit

Permalink
feat: Add HashJoin support for BuildRight (#437)
Browse files Browse the repository at this point in the history
* feat: Add HashJoin support for BuildRight

* Enable test

* Update plan stability

* More

* Update plan stability

* Refine

* Fix

* Update diffs to fix Spark tests

* Update diff

* Update Spark 3.4.3 diff

* Use BuildSide enum

* Update diffs

* Update plan stability for Spark 4.0

* Update q5a plan
  • Loading branch information
viirya authored Jun 8, 2024
1 parent 311e13e commit 32c61f5
Show file tree
Hide file tree
Showing 479 changed files with 56,791 additions and 58,646 deletions.
76 changes: 35 additions & 41 deletions core/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 12 additions & 3 deletions core/src/execution/datafusion/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use datafusion::{
},
AggregateExpr, PhysicalExpr, PhysicalSortExpr, ScalarFunctionExpr,
},
physical_optimizer::join_selection::swap_hash_join,
physical_plan::{
aggregates::{AggregateMode as DFAggregateMode, PhysicalGroupBy},
filter::FilterExec,
Expand Down Expand Up @@ -91,7 +92,7 @@ use crate::{
agg_expr::ExprStruct as AggExprStruct, expr::ExprStruct, literal::Value, AggExpr, Expr,
ScalarFunc,
},
spark_operator::{operator::OpStruct, JoinType, Operator},
spark_operator::{operator::OpStruct, BuildSide, JoinType, Operator},
spark_partitioning::{partitioning::PartitioningStruct, Partitioning as SparkPartitioning},
},
};
Expand Down Expand Up @@ -965,7 +966,7 @@ impl PhysicalPlanner {
join.join_type,
&join.condition,
)?;
let join = Arc::new(HashJoinExec::try_new(
let hash_join = Arc::new(HashJoinExec::try_new(
join_params.left,
join_params.right,
join_params.join_on,
Expand All @@ -977,7 +978,15 @@ impl PhysicalPlanner {
// `EqualNullSafe`, Spark will rewrite it during planning.
false,
)?);
Ok((scans, join))

// If the hash join is build right, we need to swap the left and right
let hash_join = if join.build_side == BuildSide::BuildLeft as i32 {
hash_join
} else {
swap_hash_join(hash_join.as_ref(), PartitionMode::Partitioned)?
};

Ok((scans, hash_join))
}
}
}
Expand Down
6 changes: 6 additions & 0 deletions core/src/execution/proto/operator.proto
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ message HashJoin {
repeated spark.spark_expression.Expr right_join_keys = 2;
JoinType join_type = 3;
optional spark.spark_expression.Expr condition = 4;
BuildSide build_side = 5;
}

message SortMergeJoin {
Expand All @@ -114,3 +115,8 @@ enum JoinType {
LeftAnti = 6;
RightAnti = 7;
}

enum BuildSide {
BuildLeft = 0;
BuildRight = 1;
}
Loading

0 comments on commit 32c61f5

Please sign in to comment.