Commit

improve error handling
andygrove committed Dec 4, 2024
1 parent bf6b4d4 commit d4d71bc
Showing 2 changed files with 36 additions and 35 deletions.
native/core/src/execution/datafusion/planner.rs (8 changes: 3 additions & 5 deletions)
@@ -1095,11 +1095,9 @@ impl PhysicalPlanner {
table_parquet_options.global.pushdown_filters = true;
table_parquet_options.global.reorder_filters = true;

-let mut builder = ParquetExecBuilder::new(file_scan_config)
-.with_table_parquet_options(table_parquet_options)
-.with_schema_adapter_factory(
-Arc::new(CometSchemaAdapterFactory::default()),
-);
+let mut builder = ParquetExecBuilder::new(file_scan_config)
+.with_table_parquet_options(table_parquet_options)
+.with_schema_adapter_factory(Arc::new(CometSchemaAdapterFactory::default()));

if let Some(filter) = test_data_filters {
builder = builder.with_predicate(filter);
native/core/src/execution/datafusion/schema_adapter.rs (63 changes: 33 additions & 30 deletions)
@@ -19,7 +19,7 @@
use arrow::compute::can_cast_types;
use arrow_array::{new_null_array, Array, RecordBatch, RecordBatchOptions};
-use arrow_schema::{DataType, Schema, SchemaRef};
+use arrow_schema::{DataType, Schema, SchemaRef, TimeUnit};
use datafusion::datasource::schema_adapter::{SchemaAdapter, SchemaAdapterFactory, SchemaMapper};
use datafusion_comet_spark_expr::{spark_cast, EvalMode};
use datafusion_common::plan_err;
@@ -38,11 +38,11 @@ impl SchemaAdapterFactory for CometSchemaAdapterFactory {
/// schema.
fn create(
&self,
-projected_table_schema: SchemaRef,
+required_schema: SchemaRef,
table_schema: SchemaRef,
) -> Box<dyn SchemaAdapter> {
Box::new(CometSchemaAdapter {
-projected_table_schema,
+required_schema,
table_schema,
})
}
@@ -54,7 +54,7 @@ impl SchemaAdapterFactory for CometSchemaAdapterFactory {
pub struct CometSchemaAdapter {
/// The schema for the table, projected to include only the fields being output (projected) by the
/// associated ParquetExec
-projected_table_schema: SchemaRef,
+required_schema: SchemaRef,
/// The entire table schema for the table we're using this to adapt.
///
/// This is used to evaluate any filters pushed down into the scan
@@ -69,7 +69,7 @@ impl SchemaAdapter for CometSchemaAdapter {
///
/// Panics if index is not in range for the table schema
fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
-let field = self.projected_table_schema.field(index);
+let field = self.required_schema.field(index);
Some(file_schema.fields.find(field.name())?.0)
}

@@ -87,40 +87,29 @@ impl SchemaAdapter for CometSchemaAdapter {
file_schema: &Schema,
) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
let mut projection = Vec::with_capacity(file_schema.fields().len());
-let mut field_mappings = vec![None; self.projected_table_schema.fields().len()];
+let mut field_mappings = vec![None; self.required_schema.fields().len()];

for (file_idx, file_field) in file_schema.fields.iter().enumerate() {
if let Some((table_idx, table_field)) =
-self.projected_table_schema.fields().find(file_field.name())
+self.required_schema.fields().find(file_field.name())
{
-// workaround for struct casting
-match (file_field.data_type(), table_field.data_type()) {
-// TODO need to use Comet cast logic to determine which casts are supported,
-// but for now just add a hack to support casting between struct types
-(DataType::Struct(_), DataType::Struct(_)) => {
-field_mappings[table_idx] = Some(projection.len());
-projection.push(file_idx);
-}
-_ => {
-if can_cast_types(file_field.data_type(), table_field.data_type()) {
-field_mappings[table_idx] = Some(projection.len());
-projection.push(file_idx);
-} else {
-return plan_err!(
-"Cannot cast file schema field {} of type {:?} to table schema field of type {:?}",
-file_field.name(),
-file_field.data_type(),
-table_field.data_type()
-);
-}
-}
+if spark_can_cast_types(file_field.data_type(), table_field.data_type()) {
+field_mappings[table_idx] = Some(projection.len());
+projection.push(file_idx);
+} else {
+return plan_err!(
+"Cannot cast file schema field {} of type {:?} to required schema field of type {:?}",
+file_field.name(),
+file_field.data_type(),
+table_field.data_type()
+);
}
}
}

Ok((
Arc::new(SchemaMapping {
-projected_table_schema: self.projected_table_schema.clone(),
+projected_table_schema: self.required_schema.clone(),
field_mappings,
table_schema: self.table_schema.clone(),
}),
@@ -129,6 +118,19 @@ impl SchemaAdapter for CometSchemaAdapter {
}
}

+pub fn spark_can_cast_types(from_type: &DataType, to_type: &DataType) -> bool {
+// TODO add all Spark cast rules (they are currently implemented in
+// org.apache.comet.expressions.CometCast#isSupported in JVM side)
+match (from_type, to_type) {
+(DataType::Struct(_), DataType::Struct(_)) => {
+// workaround for struct casting
+true
+}
+(_, DataType::Timestamp(TimeUnit::Nanosecond, _)) => false,
+_ => can_cast_types(from_type, to_type),
+}
+}
+
// TODO SchemaMapping is mostly copied from DataFusion but calls spark_cast
// instead of arrow cast - can we reduce the amount of code copied here and make
// the DataFusion version more extensible?
@@ -259,7 +261,8 @@ impl SchemaMapper for SchemaMapping {
EvalMode::Legacy,
"UTC",
false,
-)?.into_array(batch_col.len())
+)?
+.into_array(batch_col.len())
// and if that works, return the field and column.
.map(|new_col| (new_col, table_field.clone()))
})
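For illustration only (not part of this commit): a minimal sketch of a unit test that would pin down the behaviour of the new spark_can_cast_types helper shown above. The tests module placement, the imports, the test names, and the field names are assumptions; the expected results follow directly from the match arms in the function.

#[cfg(test)]
mod tests {
    use super::spark_can_cast_types;
    use arrow_schema::{DataType, Field, Fields, TimeUnit};

    #[test]
    fn struct_to_struct_is_allowed() {
        // The struct-casting workaround arm returns true without inspecting the fields.
        let from = DataType::Struct(Fields::from(vec![Field::new("a", DataType::Int32, true)]));
        let to = DataType::Struct(Fields::from(vec![Field::new("a", DataType::Int64, true)]));
        assert!(spark_can_cast_types(&from, &to));
    }

    #[test]
    fn casts_to_nanosecond_timestamps_are_rejected() {
        // Any target of Timestamp(Nanosecond, _) is rejected explicitly.
        let to = DataType::Timestamp(TimeUnit::Nanosecond, None);
        assert!(!spark_can_cast_types(&DataType::Int64, &to));
    }

    #[test]
    fn other_casts_fall_back_to_arrow_rules() {
        // Everything else defers to arrow's can_cast_types.
        assert!(spark_can_cast_types(&DataType::Int32, &DataType::Utf8));
    }
}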
