diff --git a/native/core/src/execution/datafusion/mod.rs b/native/core/src/execution/datafusion/mod.rs index 6f81ee918..fb9c8829c 100644 --- a/native/core/src/execution/datafusion/mod.rs +++ b/native/core/src/execution/datafusion/mod.rs @@ -20,5 +20,6 @@ pub mod expressions; mod operators; pub mod planner; +mod schema_adapter; pub mod shuffle_writer; mod util; diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index babc68696..c5147d772 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -84,6 +84,7 @@ use datafusion::{ }; use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr}; +use crate::execution::datafusion::schema_adapter::CometSchemaAdapterFactory; use datafusion::datasource::listing::PartitionedFile; use datafusion::datasource::physical_plan::parquet::ParquetExecBuilder; use datafusion::datasource::physical_plan::FileScanConfig; @@ -1094,8 +1095,11 @@ impl PhysicalPlanner { table_parquet_options.global.pushdown_filters = true; table_parquet_options.global.reorder_filters = true; - let mut builder = ParquetExecBuilder::new(file_scan_config) - .with_table_parquet_options(table_parquet_options); + let mut builder = ParquetExecBuilder::new(file_scan_config) + .with_table_parquet_options(table_parquet_options) + .with_schema_adapter_factory( + Arc::new(CometSchemaAdapterFactory::default()), + ); if let Some(filter) = test_data_filters { builder = builder.with_predicate(filter); diff --git a/native/core/src/execution/datafusion/schema_adapter.rs b/native/core/src/execution/datafusion/schema_adapter.rs new file mode 100644 index 000000000..16d4b9d67 --- /dev/null +++ b/native/core/src/execution/datafusion/schema_adapter.rs @@ -0,0 +1,278 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Custom schema adapter that uses Spark-compatible casts + +use arrow::compute::can_cast_types; +use arrow_array::{new_null_array, Array, RecordBatch, RecordBatchOptions}; +use arrow_schema::{DataType, Schema, SchemaRef}; +use datafusion::datasource::schema_adapter::{SchemaAdapter, SchemaAdapterFactory, SchemaMapper}; +use datafusion_comet_spark_expr::{spark_cast, EvalMode}; +use datafusion_common::plan_err; +use datafusion_expr::ColumnarValue; +use std::sync::Arc; + +#[derive(Clone, Debug, Default)] +pub struct CometSchemaAdapterFactory {} + +impl SchemaAdapterFactory for CometSchemaAdapterFactory { + /// Create a new factory for mapping batches from a file schema to a table + /// schema. + /// + /// This is a convenience for [`DefaultSchemaAdapterFactory::create`] with + /// the same schema for both the projected table schema and the table + /// schema. 
+    fn create(
+        &self,
+        projected_table_schema: SchemaRef,
+        table_schema: SchemaRef,
+    ) -> Box<dyn SchemaAdapter> {
+        Box::new(CometSchemaAdapter {
+            projected_table_schema,
+            table_schema,
+        })
+    }
+}
+
+/// This SchemaAdapter requires both the table schema and the projected table
+/// schema. See [`SchemaMapping`] for more details
+#[derive(Clone, Debug)]
+pub struct CometSchemaAdapter {
+    /// The schema for the table, projected to include only the fields being output (projected) by the
+    /// associated ParquetExec
+    projected_table_schema: SchemaRef,
+    /// The entire table schema for the table we're using this to adapt.
+    ///
+    /// This is used to evaluate any filters pushed down into the scan
+    /// which may refer to columns that are not referred to anywhere
+    /// else in the plan.
+    table_schema: SchemaRef,
+}
+
+impl SchemaAdapter for CometSchemaAdapter {
+    /// Map a column index in the table schema to a column index in a particular
+    /// file schema
+    ///
+    /// Panics if index is not in range for the table schema
+    fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
+        let field = self.projected_table_schema.field(index);
+        Some(file_schema.fields.find(field.name())?.0)
+    }
+
+    /// Creates a `SchemaMapping` for casting or mapping the columns from the
+    /// file schema to the table schema.
+    ///
+    /// If the provided `file_schema` contains columns of a different type to
+    /// the expected `table_schema`, the method will attempt to cast the array
+    /// data from the file schema to the table schema where possible.
+    ///
+    /// Returns a [`SchemaMapping`] that can be applied to the output batch
+    /// along with an ordered list of columns to project from the file
+    fn map_schema(
+        &self,
+        file_schema: &Schema,
+    ) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
+        let mut projection = Vec::with_capacity(file_schema.fields().len());
+        let mut field_mappings = vec![None; self.projected_table_schema.fields().len()];
+
+        for (file_idx, file_field) in file_schema.fields.iter().enumerate() {
+            if let Some((table_idx, table_field)) =
+                self.projected_table_schema.fields().find(file_field.name())
+            {
+                // workaround for struct casting
+                match (file_field.data_type(), table_field.data_type()) {
+                    // TODO need to use Comet cast logic to determine which casts are supported,
+                    // but for now just add a hack to support casting between struct types
+                    (DataType::Struct(_), DataType::Struct(_)) => {
+                        field_mappings[table_idx] = Some(projection.len());
+                        projection.push(file_idx);
+                    }
+                    _ => {
+                        if can_cast_types(file_field.data_type(), table_field.data_type()) {
+                            field_mappings[table_idx] = Some(projection.len());
+                            projection.push(file_idx);
+                        } else {
+                            return plan_err!(
+                                "Cannot cast file schema field {} of type {:?} to table schema field of type {:?}",
+                                file_field.name(),
+                                file_field.data_type(),
+                                table_field.data_type()
+                            );
+                        }
+                    }
+                }
+            }
+        }
+
+        Ok((
+            Arc::new(SchemaMapping {
+                projected_table_schema: self.projected_table_schema.clone(),
+                field_mappings,
+                table_schema: self.table_schema.clone(),
+            }),
+            projection,
+        ))
+    }
+}
+
+// TODO SchemaMapping is mostly copied from DataFusion but calls spark_cast
+// instead of arrow cast - can we reduce the amount of code copied here and make
+// the DataFusion version more extensible?
+
+/// The SchemaMapping struct holds a mapping from the file schema to the table
+/// schema and any necessary type conversions.
+///
+/// Note, because `map_batch` and `map_partial_batch` functions have different
+/// needs, this struct holds two schemas:
+///
+/// 1. The projected **table** schema
+/// 2. The full table schema
+///
+/// [`map_batch`] is used by the ParquetOpener to produce a RecordBatch which
+/// has the projected schema, since that's the schema which is supposed to come
+/// out of the execution of this query. Thus `map_batch` uses
+/// `projected_table_schema` as it can only operate on the projected fields.
+///
+/// [`map_partial_batch`] is used to create a RecordBatch with a schema that
+/// can be used for Parquet predicate pushdown, meaning that it may contain
+/// fields which are not in the projected schema (as the fields that parquet
+/// pushdown filters operate can be completely distinct from the fields that are
+/// projected (output) out of the ParquetExec). `map_partial_batch` thus uses
+/// `table_schema` to create the resulting RecordBatch (as it could be operating
+/// on any fields in the schema).
+///
+/// [`map_batch`]: Self::map_batch
+/// [`map_partial_batch`]: Self::map_partial_batch
+#[derive(Debug)]
+pub struct SchemaMapping {
+    /// The schema of the table. This is the expected schema after conversion
+    /// and it should match the schema of the query result.
+    projected_table_schema: SchemaRef,
+    /// Mapping from field index in `projected_table_schema` to index in
+    /// projected file_schema.
+    ///
+    /// They are Options instead of just plain `usize`s because the table could
+    /// have fields that don't exist in the file.
+    field_mappings: Vec<Option<usize>>,
+    /// The entire table schema, as opposed to the projected_table_schema (which
+    /// only contains the columns that we are projecting out of this query).
+    /// This contains all fields in the table, regardless of if they will be
+    /// projected out or not.
+    table_schema: SchemaRef,
+}
+
+impl SchemaMapper for SchemaMapping {
+    /// Adapts a `RecordBatch` to match the `projected_table_schema` using the stored mapping and
+    /// conversions. The produced RecordBatch has a schema that contains only the projected
+    /// columns, so if one needs a RecordBatch with a schema that references columns which are not
+    /// in the projected, it would be better to use `map_partial_batch`
+    fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch> {
+        let batch_rows = batch.num_rows();
+        let batch_cols = batch.columns().to_vec();
+
+        let cols = self
+            .projected_table_schema
+            // go through each field in the projected schema
+            .fields()
+            .iter()
+            // and zip it with the index that maps fields from the projected table schema to the
+            // projected file schema in `batch`
+            .zip(&self.field_mappings)
+            // and for each one...
+            .map(|(field, file_idx)| {
+                file_idx.map_or_else(
+                    // If this field only exists in the table, and not in the file, then we know
+                    // that it's null, so just return that.
+                    || Ok(new_null_array(field.data_type(), batch_rows)),
+                    // However, if it does exist in both, then try to cast it to the correct output
+                    // type
+                    |batch_idx| {
+                        spark_cast(
+                            ColumnarValue::Array(Arc::clone(&batch_cols[batch_idx])),
+                            field.data_type(),
+                            // TODO need to pass in configs here
+                            EvalMode::Legacy,
+                            "UTC",
+                            false,
+                        )?
+                        .into_array(batch_rows)
+                    },
+                )
+            })
+            .collect::<Result<Vec<_>, _>>()?;
+
+        // Necessary to handle empty batches
+        let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
+
+        let schema = self.projected_table_schema.clone();
+        let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?;
+        Ok(record_batch)
+    }
+
+    /// Adapts a [`RecordBatch`]'s schema into one that has all the correct output types and only
+    /// contains the fields that exist in both the file schema and table schema.
+    ///
+    /// Unlike `map_batch` this method also preserves the columns that
+    /// may not appear in the final output (`projected_table_schema`) but may
+    /// appear in push down predicates
+    fn map_partial_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch> {
+        let batch_cols = batch.columns().to_vec();
+        let schema = batch.schema();
+
+        // for each field in the batch's schema (which is based on a file, not a table)...
+        let (cols, fields) = schema
+            .fields()
+            .iter()
+            .zip(batch_cols.iter())
+            .flat_map(|(field, batch_col)| {
+                self.table_schema
+                    // try to get the same field from the table schema that we have stored in self
+                    .field_with_name(field.name())
+                    // and if we don't have it, that's fine, ignore it. This may occur when we've
+                    // created an external table whose fields are a subset of the fields in this
+                    // file, then tried to read data from the file into this table. If that is the
+                    // case here, it's fine to ignore because we don't care about this field
+                    // anyways
+                    .ok()
+                    // but if we do have it,
+                    .map(|table_field| {
+                        // try to cast it into the correct output type. we don't want to ignore this
+                        // error, though, so it's propagated.
+                        spark_cast(
+                            ColumnarValue::Array(Arc::clone(batch_col)),
+                            table_field.data_type(),
+                            // TODO need to pass in configs here
+                            EvalMode::Legacy,
+                            "UTC",
+                            false,
+                        )?
+                        .into_array(batch_col.len())
+                        // and if that works, return the field and column.
+                        .map(|new_col| (new_col, table_field.clone()))
+                    })
+            })
+            .collect::<Result<Vec<_>, _>>()?
+ .into_iter() + .unzip::<_, _, Vec<_>, Vec<_>>(); + + // Necessary to handle empty batches + let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows())); + + let schema = Arc::new(Schema::new_with_metadata(fields, schema.metadata().clone())); + let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?; + Ok(record_batch) + } +} diff --git a/native/spark-expr/src/cast.rs b/native/spark-expr/src/cast.rs index 13263a595..8ef9ca291 100644 --- a/native/spark-expr/src/cast.rs +++ b/native/spark-expr/src/cast.rs @@ -623,6 +623,7 @@ fn cast_array( ) -> DataFusionResult { let array = array_with_timezone(array, timezone.clone(), Some(to_type))?; let from_type = array.data_type().clone(); + let array = match &from_type { DataType::Dictionary(key_type, value_type) if key_type.as_ref() == &DataType::Int32 diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala index 1028d0466..32668f0dd 100644 --- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala +++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala @@ -977,7 +977,8 @@ class CometSparkSessionExtensions if (CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.get()) { val info = new ExtendedExplainInfo() if (info.extensionInfo(newPlan).nonEmpty) { - logWarning( + // scalastyle:off println + println( "Comet cannot execute some parts of this plan natively " + s"(set ${CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.key}=false " + "to disable this logging):\n" + diff --git a/spark/src/main/scala/org/apache/comet/DataTypeSupport.scala b/spark/src/main/scala/org/apache/comet/DataTypeSupport.scala index c49a2c465..09c062b8b 100644 --- a/spark/src/main/scala/org/apache/comet/DataTypeSupport.scala +++ b/spark/src/main/scala/org/apache/comet/DataTypeSupport.scala @@ -39,6 +39,7 @@ trait DataTypeSupport { BinaryType | StringType | _: DecimalType | DateType | TimestampType => true case t: DataType if t.typeName == "timestamp_ntz" => true + case _: StructType => true case _ => false } diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 0cd4b3d9c..7473e9326 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -62,7 +62,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim logWarning(s"Comet native execution is disabled due to: $reason") } - def supportedDataType(dt: DataType, allowStruct: Boolean = false): Boolean = dt match { + def supportedDataType(dt: DataType, allowStruct: Boolean = true): Boolean = dt match { case _: ByteType | _: ShortType | _: IntegerType | _: LongType | _: FloatType | _: DoubleType | _: StringType | _: BinaryType | _: TimestampType | _: DecimalType | _: DateType | _: BooleanType | _: NullType => diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index 0d00867d1..85ac6138b 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -2200,6 +2200,103 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } } + test("get_struct_field with DataFusion ParquetExec - simple case") { + withTempPath { dir => + // create input file with Comet 
disabled + withSQLConf(CometConf.COMET_ENABLED.key -> "false") { + val df = spark + .range(5) + // Add both a null struct and null inner value + .select(when(col("id") > 1, struct(when(col("id") > 2, col("id")).alias("id"))) + .alias("nested1")) + + df.write.parquet(dir.toString()) + } + + Seq("parquet").foreach { v1List => + withSQLConf( + SQLConf.USE_V1_SOURCE_LIST.key -> v1List, + CometConf.COMET_ENABLED.key -> "true", + CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.key -> "true") { + + val df = spark.read.parquet(dir.toString()) + checkSparkAnswerAndOperator(df.select("nested1.id")) + } + } + } + } + + test("get_struct_field with DataFusion ParquetExec - select subset of struct") { + withTempPath { dir => + // create input file with Comet disabled + withSQLConf(CometConf.COMET_ENABLED.key -> "false") { + val df = spark + .range(5) + // Add both a null struct and null inner value + .select( + when( + col("id") > 1, + struct( + when(col("id") > 2, col("id")).alias("id"), + when(col("id") > 2, struct(when(col("id") > 3, col("id")).alias("id"))) + .as("nested2"))) + .alias("nested1")) + + df.write.parquet(dir.toString()) + } + + Seq("parquet").foreach { v1List => + withSQLConf( + SQLConf.USE_V1_SOURCE_LIST.key -> v1List, + CometConf.COMET_ENABLED.key -> "true", + CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.key -> "true") { + + val df = spark.read.parquet(dir.toString()) + + checkSparkAnswerAndOperator(df.select("nested1.id")) + + checkSparkAnswerAndOperator(df.select("nested1.id", "nested1.nested2.id")) + + // unsupported cast from Int64 to Struct([Field { name: "id", data_type: Int64, ... + // checkSparkAnswerAndOperator(df.select("nested1.nested2.id")) + } + } + } + } + + // TODO this is not using DataFusion's ParquetExec for some reason + ignore("get_struct_field with DataFusion ParquetExec - read entire struct") { + withTempPath { dir => + // create input file with Comet disabled + withSQLConf(CometConf.COMET_ENABLED.key -> "false") { + val df = spark + .range(5) + // Add both a null struct and null inner value + .select( + when( + col("id") > 1, + struct( + when(col("id") > 2, col("id")).alias("id"), + when(col("id") > 2, struct(when(col("id") > 3, col("id")).alias("id"))) + .as("nested2"))) + .alias("nested1")) + + df.write.parquet(dir.toString()) + } + + Seq("parquet").foreach { v1List => + withSQLConf( + SQLConf.USE_V1_SOURCE_LIST.key -> v1List, + CometConf.COMET_ENABLED.key -> "true", + CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.key -> "true") { + + val df = spark.read.parquet(dir.toString()) + checkSparkAnswerAndOperator(df.select("nested1")) + } + } + } + } + test("CreateArray") { Seq(true, false).foreach { dictionaryEnabled => withTempDir { dir =>
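
Note for reviewers (not part of the patch above): a minimal sketch of a unit test that could sit at the bottom of schema_adapter.rs to illustrate what the adapter does for a simple schema-evolution case, where the file wrote `id` as Int32 but the table declares `id` as Int64 plus a later-added `note` column. The module, test name, schemas, and column names are invented for illustration, and the sketch assumes spark_cast accepts the Int32 -> Int64 widening used here.

#[cfg(test)]
mod tests {
    use super::CometSchemaAdapterFactory;
    use arrow_array::{Int32Array, RecordBatch};
    use arrow_schema::{DataType, Field, Schema};
    use datafusion::datasource::schema_adapter::{
        SchemaAdapter, SchemaAdapterFactory, SchemaMapper,
    };
    use std::sync::Arc;

    #[test]
    fn adapts_file_batch_to_table_schema() -> datafusion_common::Result<()> {
        // Hypothetical schemas: the Parquet file only has `id: Int32`; the table
        // expects `id: Int64` plus a `note` column that the file never wrote.
        let file_schema = Schema::new(vec![Field::new("id", DataType::Int32, true)]);
        let table_schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, true),
            Field::new("note", DataType::Utf8, true),
        ]));

        // Use the same schema as both the projected table schema and the full
        // table schema (no filters on non-projected columns in this sketch).
        let adapter = CometSchemaAdapterFactory::default()
            .create(Arc::clone(&table_schema), Arc::clone(&table_schema));
        let (mapper, projection) = adapter.map_schema(&file_schema)?;
        // Only `id` needs to be read from the file.
        assert_eq!(projection, vec![0_usize]);

        let batch = RecordBatch::try_new(
            Arc::new(file_schema),
            vec![Arc::new(Int32Array::from(vec![Some(1), None]))],
        )?;

        // `id` is widened Int32 -> Int64 via spark_cast; `note` is filled with nulls.
        let adapted = mapper.map_batch(batch)?;
        assert_eq!(adapted.schema(), table_schema);
        assert_eq!(adapted.num_columns(), 2);
        Ok(())
    }
}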