diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs
index c5147d772..a2a75bf98 100644
--- a/native/core/src/execution/datafusion/planner.rs
+++ b/native/core/src/execution/datafusion/planner.rs
@@ -1095,11 +1095,9 @@ impl PhysicalPlanner {
                 table_parquet_options.global.pushdown_filters = true;
                 table_parquet_options.global.reorder_filters = true;

-                let mut builder = ParquetExecBuilder::new(file_scan_config)
-                    .with_table_parquet_options(table_parquet_options)
-                    .with_schema_adapter_factory(
-                        Arc::new(CometSchemaAdapterFactory::default()),
-                    );
+                let mut builder = ParquetExecBuilder::new(file_scan_config)
+                    .with_table_parquet_options(table_parquet_options)
+                    .with_schema_adapter_factory(Arc::new(CometSchemaAdapterFactory::default()));

                 if let Some(filter) = test_data_filters {
                     builder = builder.with_predicate(filter);
diff --git a/native/core/src/execution/datafusion/schema_adapter.rs b/native/core/src/execution/datafusion/schema_adapter.rs
index 16d4b9d67..4573ba348 100644
--- a/native/core/src/execution/datafusion/schema_adapter.rs
+++ b/native/core/src/execution/datafusion/schema_adapter.rs
@@ -19,7 +19,7 @@

 use arrow::compute::can_cast_types;
 use arrow_array::{new_null_array, Array, RecordBatch, RecordBatchOptions};
-use arrow_schema::{DataType, Schema, SchemaRef};
+use arrow_schema::{DataType, Schema, SchemaRef, TimeUnit};
 use datafusion::datasource::schema_adapter::{SchemaAdapter, SchemaAdapterFactory, SchemaMapper};
 use datafusion_comet_spark_expr::{spark_cast, EvalMode};
 use datafusion_common::plan_err;
@@ -38,11 +38,11 @@ impl SchemaAdapterFactory for CometSchemaAdapterFactory {
     /// schema.
     fn create(
         &self,
-        projected_table_schema: SchemaRef,
+        required_schema: SchemaRef,
         table_schema: SchemaRef,
     ) -> Box<dyn SchemaAdapter> {
         Box::new(CometSchemaAdapter {
-            projected_table_schema,
+            required_schema,
             table_schema,
         })
     }
@@ -54,7 +54,7 @@ impl SchemaAdapterFactory for CometSchemaAdapterFactory {
 pub struct CometSchemaAdapter {
     /// The schema for the table, projected to include only the fields being output (projected) by the
     /// associated ParquetExec
-    projected_table_schema: SchemaRef,
+    required_schema: SchemaRef,
     /// The entire table schema for the table we're using this to adapt.
     ///
     /// This is used to evaluate any filters pushed down into the scan
@@ -69,7 +69,7 @@ impl SchemaAdapter for CometSchemaAdapter {
     ///
    /// Panics if index is not in range for the table schema
     fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
-        let field = self.projected_table_schema.field(index);
+        let field = self.required_schema.field(index);
         Some(file_schema.fields.find(field.name())?.0)
     }

@@ -87,40 +87,29 @@ impl SchemaAdapter for CometSchemaAdapter {
         file_schema: &Schema,
     ) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
         let mut projection = Vec::with_capacity(file_schema.fields().len());
-        let mut field_mappings = vec![None; self.projected_table_schema.fields().len()];
+        let mut field_mappings = vec![None; self.required_schema.fields().len()];

         for (file_idx, file_field) in file_schema.fields.iter().enumerate() {
             if let Some((table_idx, table_field)) =
-                self.projected_table_schema.fields().find(file_field.name())
+                self.required_schema.fields().find(file_field.name())
             {
-                // workaround for struct casting
-                match (file_field.data_type(), table_field.data_type()) {
-                    // TODO need to use Comet cast logic to determine which casts are supported,
-                    // but for now just add a hack to support casting between struct types
-                    (DataType::Struct(_), DataType::Struct(_)) => {
-                        field_mappings[table_idx] = Some(projection.len());
-                        projection.push(file_idx);
-                    }
-                    _ => {
-                        if can_cast_types(file_field.data_type(), table_field.data_type()) {
-                            field_mappings[table_idx] = Some(projection.len());
-                            projection.push(file_idx);
-                        } else {
-                            return plan_err!(
-                                "Cannot cast file schema field {} of type {:?} to table schema field of type {:?}",
-                                file_field.name(),
-                                file_field.data_type(),
-                                table_field.data_type()
-                            );
-                        }
-                    }
-                }
+                if spark_can_cast_types(file_field.data_type(), table_field.data_type()) {
+                    field_mappings[table_idx] = Some(projection.len());
+                    projection.push(file_idx);
+                } else {
+                    return plan_err!(
+                        "Cannot cast file schema field {} of type {:?} to required schema field of type {:?}",
+                        file_field.name(),
+                        file_field.data_type(),
+                        table_field.data_type()
+                    );
+                }
             }
         }

         Ok((
             Arc::new(SchemaMapping {
-                projected_table_schema: self.projected_table_schema.clone(),
+                projected_table_schema: self.required_schema.clone(),
                 field_mappings,
                 table_schema: self.table_schema.clone(),
             }),
@@ -129,6 +118,19 @@ impl SchemaAdapter for CometSchemaAdapter {
     }
 }

+pub fn spark_can_cast_types(from_type: &DataType, to_type: &DataType) -> bool {
+    // TODO add all Spark cast rules (they are currently implemented in
+    // org.apache.comet.expressions.CometCast#isSupported in JVM side)
+    match (from_type, to_type) {
+        (DataType::Struct(_), DataType::Struct(_)) => {
+            // workaround for struct casting
+            true
+        }
+        (_, DataType::Timestamp(TimeUnit::Nanosecond, _)) => false,
+        _ => can_cast_types(from_type, to_type),
+    }
+}
+
 // TODO SchemaMapping is mostly copied from DataFusion but calls spark_cast
 // instead of arrow cast - can we reduce the amount of code copied here and make
 // the DataFusion version more extensible?
@@ -259,7 +261,8 @@ impl SchemaMapper for SchemaMapping {
                     EvalMode::Legacy,
                     "UTC",
                     false,
-                )?.into_array(batch_col.len())
+                )?
+                .into_array(batch_col.len())
                 // and if that works, return the field and column.
                 .map(|new_col| (new_col, table_field.clone()))
             })
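Not part of the patch: a minimal sketch of how the new spark_can_cast_types check behaves, assuming the function above is in scope (the exact use path depends on how native/core exports the schema_adapter module).

    use arrow_schema::{DataType, Field, Fields, TimeUnit};

    fn main() {
        // Struct-to-struct is always accepted by the workaround in the patch.
        let from = DataType::Struct(Fields::from(vec![Field::new("a", DataType::Int32, true)]));
        let to = DataType::Struct(Fields::from(vec![Field::new("a", DataType::Int64, true)]));
        assert!(spark_can_cast_types(&from, &to));

        // Casts targeting nanosecond timestamps are rejected explicitly.
        assert!(!spark_can_cast_types(
            &DataType::Int64,
            &DataType::Timestamp(TimeUnit::Nanosecond, None)
        ));

        // Everything else falls back to arrow's can_cast_types.
        assert!(spark_can_cast_types(&DataType::Int32, &DataType::Utf8));
    }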