diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index 40fe4515c..c40780c58 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -17,9 +17,10 @@ //! Converts Spark physical plan to DataFusion physical plan +use datafusion::physical_plan::filter::FilterExec; use super::expressions::EvalMode; use crate::execution::datafusion::expressions::comet_scalar_funcs::create_comet_physical_fun; -use crate::execution::operators::{CopyMode, FilterExec}; +use crate::execution::operators::{CopyMode, FilterExec as CometFilterExec}; use crate::{ errors::ExpressionError, execution::{ @@ -851,7 +852,11 @@ impl PhysicalPlanner { let predicate = self.create_expr(filter.predicate.as_ref().unwrap(), child.schema())?; - Ok((scans, Arc::new(FilterExec::try_new(predicate, child)?))) + if can_reuse_input_batch(&child) { + Ok((scans, Arc::new(CometFilterExec::try_new(predicate, child)?))) + } else { + Ok((scans, Arc::new(FilterExec::try_new(predicate, child)?))) + } } OpStruct::HashAgg(agg) => { assert!(children.len() == 1); diff --git a/native/core/src/execution/operators/filter.rs b/native/core/src/execution/operators/filter.rs index d9a54712d..18a094602 100644 --- a/native/core/src/execution/operators/filter.rs +++ b/native/core/src/execution/operators/filter.rs @@ -227,7 +227,7 @@ impl DisplayAs for FilterExec { impl ExecutionPlan for FilterExec { fn name(&self) -> &'static str { - "FilterExec" + "CometFilterExec" } /// Return a reference to Any that can be used for downcasting