diff --git a/core/src/execution/datafusion/planner.rs b/core/src/execution/datafusion/planner.rs index 752828ad2..d19883730 100644 --- a/core/src/execution/datafusion/planner.rs +++ b/core/src/execution/datafusion/planner.rs @@ -965,13 +965,13 @@ impl PhysicalPlanner { // DataFusion `HashJoinExec` operator keeps the input batch internally. We need // to copy the input batch to avoid the data corruption from reusing the input // batch. - let left = if !is_op_do_copying(&left) { + let left = if op_reuse_array(&left) { Arc::new(CopyExec::new(left)) } else { left }; - let right = if !is_op_do_copying(&right) { + let right = if op_reuse_array(&right) { Arc::new(CopyExec::new(right)) } else { right @@ -1157,12 +1157,14 @@ impl From for DataFusionError { } } -/// Returns true if given operator copies input batch to avoid data corruption from reusing -/// input arrays. -fn is_op_do_copying(op: &Arc) -> bool { - op.as_any().downcast_ref::().is_some() - || op.as_any().downcast_ref::().is_some() - || op.as_any().downcast_ref::().is_some() +/// Returns true if given operator can return input array as output array without +/// modification. This is used to determine if we need to copy the input batch to avoid +/// data corruption from reusing the input batch. +fn op_reuse_array(op: &Arc) -> bool { + op.as_any().downcast_ref::().is_some() + || op.as_any().downcast_ref::().is_some() + || op.as_any().downcast_ref::().is_some() + || op.as_any().downcast_ref::().is_some() } /// Collects the indices of the columns in the input schema that are used in the expression