diff --git a/core/src/execution/datafusion/planner.rs b/core/src/execution/datafusion/planner.rs index 88783c207..fae20915e 100644 --- a/core/src/execution/datafusion/planner.rs +++ b/core/src/execution/datafusion/planner.rs @@ -1131,8 +1131,9 @@ impl From for DataFusionError { } } -/// Returns true if given operator probably returns input array as output array without -/// modification. +/// Returns true if given operator can return input array as output array without +/// modification. This is used to determine if we need to copy the input batch to avoid +/// data corruption from reusing the input batch. fn op_reuse_array(op: &Arc) -> bool { op.as_any().downcast_ref::().is_some() || op.as_any().downcast_ref::().is_some() diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala index dd197c972..1e868401e 100644 --- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala +++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala @@ -224,13 +224,9 @@ class CometSparkSessionExtensions // spotless:on private def transform(plan: SparkPlan): SparkPlan = { def transform1(op: SparkPlan): Option[Operator] = { - val allNativeExec = op.children.map { - case childNativeOp: CometNativeExec => Some(childNativeOp.nativeOp) - case _ => None - } - - if (allNativeExec.forall(_.isDefined)) { - QueryPlanSerde.operator2Proto(op, allNativeExec.map(_.get): _*) + if (op.children.forall(_.isInstanceOf[CometNativeExec])) { + QueryPlanSerde.operator2Proto(op, + op.children.map(_.asInstanceOf[CometNativeExec].nativeOp): _*) } else { None }