Skip to content

Commit

Permalink
Use consistent function with sort merge join
Browse files Browse the repository at this point in the history
  • Loading branch information
viirya committed Mar 14, 2024
1 parent cbd87cf commit c95659c
Showing 1 changed file with 10 additions and 8 deletions.
18 changes: 10 additions & 8 deletions core/src/execution/datafusion/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -965,13 +965,13 @@ impl PhysicalPlanner {
// DataFusion `HashJoinExec` operator keeps the input batch internally. We need
// to copy the input batch to avoid the data corruption from reusing the input
// batch.
let left = if !is_op_do_copying(&left) {
let left = if op_reuse_array(&left) {
Arc::new(CopyExec::new(left))
} else {
left
};

let right = if !is_op_do_copying(&right) {
let right = if op_reuse_array(&right) {
Arc::new(CopyExec::new(right))
} else {
right
Expand Down Expand Up @@ -1157,12 +1157,14 @@ impl From<ExpressionError> for DataFusionError {
}
}

/// Returns true if given operator copies input batch to avoid data corruption from reusing
/// input arrays.
fn is_op_do_copying(op: &Arc<dyn ExecutionPlan>) -> bool {
op.as_any().downcast_ref::<CopyExec>().is_some()
|| op.as_any().downcast_ref::<CometExpandExec>().is_some()
|| op.as_any().downcast_ref::<SortExec>().is_some()
/// Returns true if given operator can return input array as output array without
/// modification. This is used to determine if we need to copy the input batch to avoid
/// data corruption from reusing the input batch.
fn op_reuse_array(op: &Arc<dyn ExecutionPlan>) -> bool {
op.as_any().downcast_ref::<ScanExec>().is_some()
|| op.as_any().downcast_ref::<LocalLimitExec>().is_some()
|| op.as_any().downcast_ref::<ProjectionExec>().is_some()
|| op.as_any().downcast_ref::<FilterExec>().is_some()
}

/// Collects the indices of the columns in the input schema that are used in the expression
Expand Down

0 comments on commit c95659c

Please sign in to comment.