Skip to content

Commit

Permalink
For review
Browse files Browse the repository at this point in the history
  • Loading branch information
viirya committed Mar 18, 2024
1 parent 68418ac commit 08db759
Showing 1 changed file with 5 additions and 3 deletions.
8 changes: 5 additions & 3 deletions core/src/execution/datafusion/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -924,13 +924,13 @@ impl PhysicalPlanner {
// DataFusion `SortMergeJoinExec` operator keeps the input batch internally. We need
// to copy the input batch to avoid the data corruption from reusing the input
// batch.
let left = if op_reuse_array(&left) {
let left = if can_reuse_input_batch(&left) {
Arc::new(CopyExec::new(left))
} else {
left
};

let right = if op_reuse_array(&right) {
let right = if can_reuse_input_batch(&right) {
Arc::new(CopyExec::new(right))
} else {
right
Expand All @@ -943,6 +943,8 @@ impl PhysicalPlanner {
None,
join_type,
sort_options,
// null doesn't equal to null in Spark join key. If the join key is
// `EqualNullSafe`, Spark will rewrite it during planning.
false,
)?);

Expand Down Expand Up @@ -1134,7 +1136,7 @@ impl From<ExpressionError> for DataFusionError {
/// Returns true if given operator can return input array as output array without
/// modification. This is used to determine if we need to copy the input batch to avoid
/// data corruption from reusing the input batch.
fn op_reuse_array(op: &Arc<dyn ExecutionPlan>) -> bool {
fn can_reuse_input_batch(op: &Arc<dyn ExecutionPlan>) -> bool {
op.as_any().downcast_ref::<ScanExec>().is_some()
|| op.as_any().downcast_ref::<LocalLimitExec>().is_some()
|| op.as_any().downcast_ref::<ProjectionExec>().is_some()
Expand Down

0 comments on commit 08db759

Please sign in to comment.