[comet-parquet-exec] Add unit test for reading a struct field from Parquet (#1075)

* implement basic native code for casting struct to struct

* add another test

* rustdoc

* add scala side

* code cleanup

* clippy

* clippy

* add scala test

* improve test

* simple struct case passes

* save progress

* copy schema adapter code from DataFusion

* more tests pass

* save progress

* remove debug println

* remove debug println
andygrove authored Dec 4, 2024
1 parent ab09337 commit e3672f7
Showing 8 changed files with 387 additions and 4 deletions.
1 change: 1 addition & 0 deletions native/core/src/execution/datafusion/mod.rs
@@ -20,5 +20,6 @@
pub mod expressions;
mod operators;
pub mod planner;
mod schema_adapter;
pub mod shuffle_writer;
mod util;
8 changes: 6 additions & 2 deletions native/core/src/execution/datafusion/planner.rs
@@ -84,6 +84,7 @@ use datafusion::{
};
use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};

use crate::execution::datafusion::schema_adapter::CometSchemaAdapterFactory;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::parquet::ParquetExecBuilder;
use datafusion::datasource::physical_plan::FileScanConfig;
@@ -1094,8 +1095,11 @@ impl PhysicalPlanner {
table_parquet_options.global.pushdown_filters = true;
table_parquet_options.global.reorder_filters = true;

- let mut builder = ParquetExecBuilder::new(file_scan_config)
- .with_table_parquet_options(table_parquet_options);
+ let mut builder = ParquetExecBuilder::new(file_scan_config)
+ .with_table_parquet_options(table_parquet_options)
+ .with_schema_adapter_factory(
+ Arc::new(CometSchemaAdapterFactory::default()),
+ );

if let Some(filter) = test_data_filters {
builder = builder.with_predicate(filter);
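
The effect of this change is that the scan no longer uses DataFusion's default schema adapter (which casts mismatched columns with Arrow's cast kernel) but the Comet adapter introduced in schema_adapter.rs below. The following is a condensed sketch of the resulting builder chain, reusing the variables already in scope in this planner function; the final `build()` call and the `scan` binding are illustrative completions, not code copied from this commit.

let mut builder = ParquetExecBuilder::new(file_scan_config)
    .with_table_parquet_options(table_parquet_options)
    // Route every decoded batch through CometSchemaAdapter, which applies
    // spark_cast rather than the default Arrow cast.
    .with_schema_adapter_factory(Arc::new(CometSchemaAdapterFactory::default()));

// Pushed-down data filters are attached exactly as before.
if let Some(filter) = test_data_filters {
    builder = builder.with_predicate(filter);
}

// Illustrative completion of the chain: produce the ParquetExec operator.
let scan = Arc::new(builder.build());
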
278 changes: 278 additions & 0 deletions native/core/src/execution/datafusion/schema_adapter.rs
@@ -0,0 +1,278 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Custom schema adapter that uses Spark-compatible casts
use arrow::compute::can_cast_types;
use arrow_array::{new_null_array, Array, RecordBatch, RecordBatchOptions};
use arrow_schema::{DataType, Schema, SchemaRef};
use datafusion::datasource::schema_adapter::{SchemaAdapter, SchemaAdapterFactory, SchemaMapper};
use datafusion_comet_spark_expr::{spark_cast, EvalMode};
use datafusion_common::plan_err;
use datafusion_expr::ColumnarValue;
use std::sync::Arc;

#[derive(Clone, Debug, Default)]
pub struct CometSchemaAdapterFactory {}

impl SchemaAdapterFactory for CometSchemaAdapterFactory {
/// Create a schema adapter that maps batches from a file schema to the
/// table schema, using Spark-compatible casts where the types differ.
///
/// `projected_table_schema` contains only the fields output (projected) by
/// the associated ParquetExec, while `table_schema` is the full table schema,
/// which may also be needed to evaluate filters pushed down into the scan.
fn create(
&self,
projected_table_schema: SchemaRef,
table_schema: SchemaRef,
) -> Box<dyn SchemaAdapter> {
Box::new(CometSchemaAdapter {
projected_table_schema,
table_schema,
})
}
}

/// This SchemaAdapter requires both the table schema and the projected table
/// schema. See [`SchemaMapping`] for more details
#[derive(Clone, Debug)]
pub struct CometSchemaAdapter {
/// The schema for the table, projected to include only the fields being output (projected) by the
/// associated ParquetExec
projected_table_schema: SchemaRef,
/// The entire table schema for the table we're using this to adapt.
///
/// This is used to evaluate any filters pushed down into the scan
/// which may refer to columns that are not referred to anywhere
/// else in the plan.
table_schema: SchemaRef,
}

impl SchemaAdapter for CometSchemaAdapter {
/// Map a column index in the table schema to a column index in a particular
/// file schema
///
/// Panics if index is not in range for the table schema
fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
let field = self.projected_table_schema.field(index);
Some(file_schema.fields.find(field.name())?.0)
}

/// Creates a `SchemaMapping` for casting or mapping the columns from the
/// file schema to the table schema.
///
/// If the provided `file_schema` contains columns of a different type to
/// the expected `table_schema`, the method will attempt to cast the array
/// data from the file schema to the table schema where possible.
///
/// Returns a [`SchemaMapping`] that can be applied to the output batch
/// along with an ordered list of columns to project from the file
fn map_schema(
&self,
file_schema: &Schema,
) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
let mut projection = Vec::with_capacity(file_schema.fields().len());
let mut field_mappings = vec![None; self.projected_table_schema.fields().len()];

for (file_idx, file_field) in file_schema.fields.iter().enumerate() {
if let Some((table_idx, table_field)) =
self.projected_table_schema.fields().find(file_field.name())
{
// workaround for struct casting
match (file_field.data_type(), table_field.data_type()) {
// TODO need to use Comet cast logic to determine which casts are supported,
// but for now just add a hack to support casting between struct types
(DataType::Struct(_), DataType::Struct(_)) => {
field_mappings[table_idx] = Some(projection.len());
projection.push(file_idx);
}
_ => {
if can_cast_types(file_field.data_type(), table_field.data_type()) {
field_mappings[table_idx] = Some(projection.len());
projection.push(file_idx);
} else {
return plan_err!(
"Cannot cast file schema field {} of type {:?} to table schema field of type {:?}",
file_field.name(),
file_field.data_type(),
table_field.data_type()
);
}
}
}
}
}

Ok((
Arc::new(SchemaMapping {
projected_table_schema: self.projected_table_schema.clone(),
field_mappings,
table_schema: self.table_schema.clone(),
}),
projection,
))
}
}

// TODO SchemaMapping is mostly copied from DataFusion but calls spark_cast
// instead of arrow cast - can we reduce the amount of code copied here and make
// the DataFusion version more extensible?

/// The SchemaMapping struct holds a mapping from the file schema to the table
/// schema and any necessary type conversions.
///
/// Note, because `map_batch` and `map_partial_batch` functions have different
/// needs, this struct holds two schemas:
///
/// 1. The projected **table** schema
/// 2. The full table schema
///
/// [`map_batch`] is used by the ParquetOpener to produce a RecordBatch which
/// has the projected schema, since that's the schema which is supposed to come
/// out of the execution of this query. Thus `map_batch` uses
/// `projected_table_schema` as it can only operate on the projected fields.
///
/// [`map_partial_batch`] is used to create a RecordBatch with a schema that
/// can be used for Parquet predicate pushdown, meaning that it may contain
/// fields which are not in the projected schema (as the fields that parquet
/// pushdown filters operate on can be completely distinct from the fields that are
/// projected (output) out of the ParquetExec). `map_partial_batch` thus uses
/// `table_schema` to create the resulting RecordBatch (as it could be operating
/// on any fields in the schema).
///
/// [`map_batch`]: Self::map_batch
/// [`map_partial_batch`]: Self::map_partial_batch
#[derive(Debug)]
pub struct SchemaMapping {
/// The schema of the table. This is the expected schema after conversion
/// and it should match the schema of the query result.
projected_table_schema: SchemaRef,
/// Mapping from field index in `projected_table_schema` to index in
/// projected file_schema.
///
/// They are Options instead of just plain `usize`s because the table could
/// have fields that don't exist in the file.
field_mappings: Vec<Option<usize>>,
/// The entire table schema, as opposed to the projected_table_schema (which
/// only contains the columns that we are projecting out of this query).
/// This contains all fields in the table, regardless of if they will be
/// projected out or not.
table_schema: SchemaRef,
}

impl SchemaMapper for SchemaMapping {
/// Adapts a `RecordBatch` to match the `projected_table_schema` using the stored mapping and
/// conversions. The produced RecordBatch has a schema that contains only the projected
/// columns, so if one needs a RecordBatch with a schema that references columns which are not
/// in the projected schema, it would be better to use `map_partial_batch`.
fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch> {
let batch_rows = batch.num_rows();
let batch_cols = batch.columns().to_vec();

let cols = self
.projected_table_schema
// go through each field in the projected schema
.fields()
.iter()
// and zip it with the index that maps fields from the projected table schema to the
// projected file schema in `batch`
.zip(&self.field_mappings)
// and for each one...
.map(|(field, file_idx)| {
file_idx.map_or_else(
// If this field only exists in the table, and not in the file, then we know
// that it's null, so just return that.
|| Ok(new_null_array(field.data_type(), batch_rows)),
// However, if it does exist in both, then try to cast it to the correct output
// type
|batch_idx| {
spark_cast(
ColumnarValue::Array(Arc::clone(&batch_cols[batch_idx])),
field.data_type(),
// TODO need to pass in configs here
EvalMode::Legacy,
"UTC",
false,
)?
.into_array(batch_rows)
},
)
})
.collect::<datafusion_common::Result<Vec<_>, _>>()?;

// Necessary to handle empty batches
let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));

let schema = self.projected_table_schema.clone();
let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?;
Ok(record_batch)
}

/// Adapts a [`RecordBatch`]'s schema into one that has all the correct output types and only
/// contains the fields that exist in both the file schema and table schema.
///
/// Unlike `map_batch` this method also preserves the columns that
/// may not appear in the final output (`projected_table_schema`) but may
/// appear in push down predicates
fn map_partial_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch> {
let batch_cols = batch.columns().to_vec();
let schema = batch.schema();

// for each field in the batch's schema (which is based on a file, not a table)...
let (cols, fields) = schema
.fields()
.iter()
.zip(batch_cols.iter())
.flat_map(|(field, batch_col)| {
self.table_schema
// try to get the same field from the table schema that we have stored in self
.field_with_name(field.name())
// and if we don't have it, that's fine, ignore it. This may occur when we've
// created an external table whose fields are a subset of the fields in this
// file, then tried to read data from the file into this table. If that is the
// case here, it's fine to ignore because we don't care about this field
// anyways
.ok()
// but if we do have it,
.map(|table_field| {
// try to cast it into the correct output type. we don't want to ignore this
// error, though, so it's propagated.
spark_cast(
ColumnarValue::Array(Arc::clone(batch_col)),
table_field.data_type(),
// TODO need to pass in configs here
EvalMode::Legacy,
"UTC",
false,
)?.into_array(batch_col.len())
// and if that works, return the field and column.
.map(|new_col| (new_col, table_field.clone()))
})
})
.collect::<Result<Vec<_>, _>>()?
.into_iter()
.unzip::<_, _, Vec<_>, Vec<_>>();

// Necessary to handle empty batches
let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));

let schema = Arc::new(Schema::new_with_metadata(fields, schema.metadata().clone()));
let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?;
Ok(record_batch)
}
}
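
To make the adapter's flow concrete, here is a minimal sketch written as a hypothetical `#[cfg(test)]` module at the bottom of schema_adapter.rs; it is not part of this commit. The test name, the column name `a`, and the Int32-to-Int64 example are illustrative assumptions; the calls simply exercise the `SchemaAdapterFactory`, `SchemaAdapter`, and `SchemaMapper` implementations above, the same way ParquetExec drives them.

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_array::{Array, Int32Array, RecordBatch};
    use arrow_schema::{DataType, Field, Schema};
    use datafusion::datasource::schema_adapter::{SchemaAdapter, SchemaAdapterFactory, SchemaMapper};

    use super::CometSchemaAdapterFactory;

    #[test]
    fn adapts_int32_file_column_to_int64_table_column() -> datafusion_common::Result<()> {
        // The (projected) table schema declares column `a` as Int64.
        let table_schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));
        // The Parquet file was written with `a` as Int32.
        let file_schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);

        // ParquetExec normally drives these calls; here we invoke the traits directly.
        let adapter = CometSchemaAdapterFactory::default()
            .create(Arc::clone(&table_schema), Arc::clone(&table_schema));
        let (mapper, projection) = adapter.map_schema(&file_schema)?;
        // Column 0 of the file is projected into the output batch.
        assert_eq!(projection, vec![0]);

        // A batch as it would come out of the Parquet reader.
        let batch = RecordBatch::try_new(
            Arc::new(file_schema),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
        )?;

        // map_batch applies spark_cast, so the result matches the table schema.
        let mapped = mapper.map_batch(batch)?;
        assert_eq!(mapped.schema(), table_schema);
        assert_eq!(mapped.column(0).data_type(), &DataType::Int64);
        Ok(())
    }
}

Because `map_batch` funnels every column through `spark_cast`, any conversion that Comet's cast supports (including the struct-to-struct case special-cased in `map_schema`) follows this same path.
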
1 change: 1 addition & 0 deletions native/spark-expr/src/cast.rs
@@ -623,6 +623,7 @@ fn cast_array(
) -> DataFusionResult<ArrayRef> {
let array = array_with_timezone(array, timezone.clone(), Some(to_type))?;
let from_type = array.data_type().clone();

let array = match &from_type {
DataType::Dictionary(key_type, value_type)
if key_type.as_ref() == &DataType::Int32
@@ -977,7 +977,8 @@ class CometSparkSessionExtensions
if (CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.get()) {
val info = new ExtendedExplainInfo()
if (info.extensionInfo(newPlan).nonEmpty) {
- logWarning(
+ // scalastyle:off println
+ println(
"Comet cannot execute some parts of this plan natively " +
s"(set ${CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.key}=false " +
"to disable this logging):\n" +
@@ -39,6 +39,7 @@ trait DataTypeSupport {
BinaryType | StringType | _: DecimalType | DateType | TimestampType =>
true
case t: DataType if t.typeName == "timestamp_ntz" => true
case _: StructType => true
case _ => false
}

@@ -62,7 +62,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
logWarning(s"Comet native execution is disabled due to: $reason")
}

- def supportedDataType(dt: DataType, allowStruct: Boolean = false): Boolean = dt match {
+ def supportedDataType(dt: DataType, allowStruct: Boolean = true): Boolean = dt match {
case _: ByteType | _: ShortType | _: IntegerType | _: LongType | _: FloatType |
_: DoubleType | _: StringType | _: BinaryType | _: TimestampType | _: DecimalType |
_: DateType | _: BooleanType | _: NullType =>