From c95659c0f581abef680fd17e2845d958670d95d9 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 13 Mar 2024 22:28:43 -0700 Subject: [PATCH] Use consistent function with sort merge join --- core/src/execution/datafusion/planner.rs | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/core/src/execution/datafusion/planner.rs b/core/src/execution/datafusion/planner.rs index 752828ad2..d19883730 100644 --- a/core/src/execution/datafusion/planner.rs +++ b/core/src/execution/datafusion/planner.rs @@ -965,13 +965,13 @@ impl PhysicalPlanner { // DataFusion `HashJoinExec` operator keeps the input batch internally. We need // to copy the input batch to avoid the data corruption from reusing the input // batch. - let left = if !is_op_do_copying(&left) { + let left = if op_reuse_array(&left) { Arc::new(CopyExec::new(left)) } else { left }; - let right = if !is_op_do_copying(&right) { + let right = if op_reuse_array(&right) { Arc::new(CopyExec::new(right)) } else { right @@ -1157,12 +1157,14 @@ impl From for DataFusionError { } } -/// Returns true if given operator copies input batch to avoid data corruption from reusing -/// input arrays. -fn is_op_do_copying(op: &Arc) -> bool { - op.as_any().downcast_ref::().is_some() - || op.as_any().downcast_ref::().is_some() - || op.as_any().downcast_ref::().is_some() +/// Returns true if given operator can return input array as output array without +/// modification. This is used to determine if we need to copy the input batch to avoid +/// data corruption from reusing the input batch. +fn op_reuse_array(op: &Arc) -> bool { + op.as_any().downcast_ref::().is_some() + || op.as_any().downcast_ref::().is_some() + || op.as_any().downcast_ref::().is_some() + || op.as_any().downcast_ref::().is_some() } /// Collects the indices of the columns in the input schema that are used in the expression