diff --git a/core/src/execution/datafusion/planner.rs b/core/src/execution/datafusion/planner.rs index cbbe4877a..4d21a8324 100644 --- a/core/src/execution/datafusion/planner.rs +++ b/core/src/execution/datafusion/planner.rs @@ -1106,8 +1106,9 @@ impl From for DataFusionError { } } -/// Returns true if given operator probably returns input array as output array without -/// modification. +/// Returns true if given operator can return input array as output array without +/// modification. This is used to determine if we need to copy the input batch to avoid +/// data corruption from reusing the input batch. fn op_reuse_array(op: &Arc) -> bool { op.as_any().downcast_ref::().is_some() || op.as_any().downcast_ref::().is_some() diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala index a51acc988..3539e2cf7 100644 --- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala +++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala @@ -223,13 +223,9 @@ class CometSparkSessionExtensions // spotless:on private def transform(plan: SparkPlan): SparkPlan = { def transform1(op: SparkPlan): Option[Operator] = { - val allNativeExec = op.children.map { - case childNativeOp: CometNativeExec => Some(childNativeOp.nativeOp) - case _ => None - } - - if (allNativeExec.forall(_.isDefined)) { - QueryPlanSerde.operator2Proto(op, allNativeExec.map(_.get): _*) + if (op.children.forall(_.isInstanceOf[CometNativeExec])) { + QueryPlanSerde.operator2Proto(op, + op.children.map(_.asInstanceOf[CometNativeExec].nativeOp): _*) } else { None }