From ea114f2ea0cec2b09c182521a9e31fc79fd47079 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 11 Nov 2024 09:11:57 -0700 Subject: [PATCH 01/16] implement basic native code for casting struct to struct --- native/spark-expr/src/cast.rs | 73 ++++++++++++++++++++++++++++++++++- 1 file changed, 71 insertions(+), 2 deletions(-) diff --git a/native/spark-expr/src/cast.rs b/native/spark-expr/src/cast.rs index 0a2f7fef6..bff658aa6 100644 --- a/native/spark-expr/src/cast.rs +++ b/native/spark-expr/src/cast.rs @@ -34,7 +34,7 @@ use arrow::{ }; use arrow_array::builder::StringBuilder; use arrow_array::{DictionaryArray, StringArray, StructArray}; -use arrow_schema::{DataType, Schema}; +use arrow_schema::{DataType, Field, Schema}; use datafusion_common::{ cast::as_generic_string_array, internal_err, Result as DataFusionResult, ScalarValue, }; @@ -714,6 +714,14 @@ fn cast_array( (DataType::Struct(_), DataType::Utf8) => { Ok(casts_struct_to_string(array.as_struct(), &timezone)?) } + (DataType::Struct(_), DataType::Struct(_)) => Ok(cast_struct_to_struct( + array.as_struct(), + from_type, + to_type, + eval_mode, + &timezone, + allow_incompat, + )?), _ if is_datafusion_spark_compatible(from_type, to_type, allow_incompat) => { // use DataFusion cast only when we know that it is compatible with Spark Ok(cast_with_options(&array, to_type, &CAST_OPTIONS)?) @@ -811,6 +819,34 @@ fn is_datafusion_spark_compatible( } } +fn cast_struct_to_struct( + array: &StructArray, + from_type: &DataType, + to_type: &DataType, + eval_mode: EvalMode, + timezone: &str, + allow_incompat: bool, +) -> DataFusionResult { + match (from_type, to_type) { + (DataType::Struct(f), DataType::Struct(t)) => { + assert!(t.len() <= f.len()); + let mut foo: Vec<(Arc, ArrayRef)> = vec![]; + for i in 0..t.len() { + let x = spark_cast( + ColumnarValue::Array(array.column(i).clone()), + &t[i].data_type(), + eval_mode, + timezone, + allow_incompat, + )?; + foo.push((t[i].clone(), x.into_array(array.len())?)); + } + Ok(Arc::new(StructArray::from(foo))) + } + _ => unreachable!(), + } +} + fn casts_struct_to_string(array: &StructArray, timezone: &str) -> DataFusionResult { // cast each field to a string let string_arrays: Vec = array @@ -1929,7 +1965,7 @@ fn trim_end(s: &str) -> &str { mod tests { use arrow::datatypes::TimestampMicrosecondType; use arrow_array::StringArray; - use arrow_schema::{Field, TimeUnit}; + use arrow_schema::{Field, Fields, TimeUnit}; use std::str::FromStr; use super::*; @@ -2336,4 +2372,37 @@ mod tests { assert_eq!(r#"{4, d}"#, string_array.value(3)); assert_eq!(r#"{5, e}"#, string_array.value(4)); } + + #[test] + fn test_cast_struct_to_struct() { + let a: ArrayRef = Arc::new(Int32Array::from(vec![ + Some(1), + Some(2), + None, + Some(4), + Some(5), + ])); + let b: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"])); + let c: ArrayRef = Arc::new(StructArray::from(vec![ + (Arc::new(Field::new("a", DataType::Int32, true)), a), + (Arc::new(Field::new("b", DataType::Utf8, true)), b), + ])); + let fields = Fields::from(vec![ + Field::new("a", DataType::Utf8, true), + Field::new("b", DataType::Utf8, true), + ]); + let x = spark_cast( + ColumnarValue::Array(c), + &DataType::Struct(fields), + EvalMode::Legacy, + "UTC", + false, + ) + .unwrap(); + if let ColumnarValue::Array(cast_array) = x { + assert_eq!(5, cast_array.len()); + } else { + unreachable!() + } + } } From 60f7d9096dc418cf920b8749bf20a8d048b18a4e Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 11 Nov 2024 09:15:02 -0700 Subject: [PATCH 02/16] add 
another test --- native/spark-expr/src/cast.rs | 42 +++++++++++++++++++++++++++++++++-- 1 file changed, 40 insertions(+), 2 deletions(-) diff --git a/native/spark-expr/src/cast.rs b/native/spark-expr/src/cast.rs index bff658aa6..a425ccc7e 100644 --- a/native/spark-expr/src/cast.rs +++ b/native/spark-expr/src/cast.rs @@ -2387,11 +2387,12 @@ mod tests { (Arc::new(Field::new("a", DataType::Int32, true)), a), (Arc::new(Field::new("b", DataType::Utf8, true)), b), ])); + // change type of "a" from Int32 to Utf8 let fields = Fields::from(vec![ Field::new("a", DataType::Utf8, true), Field::new("b", DataType::Utf8, true), ]); - let x = spark_cast( + let cast_array = spark_cast( ColumnarValue::Array(c), &DataType::Struct(fields), EvalMode::Legacy, @@ -2399,8 +2400,45 @@ mod tests { false, ) .unwrap(); - if let ColumnarValue::Array(cast_array) = x { + if let ColumnarValue::Array(cast_array) = cast_array { assert_eq!(5, cast_array.len()); + let a = cast_array.as_struct().column(0).as_string::(); + assert_eq!("1", a.value(0)); + } else { + unreachable!() + } + } + + #[test] + fn test_cast_struct_to_struct_drop_column() { + let a: ArrayRef = Arc::new(Int32Array::from(vec![ + Some(1), + Some(2), + None, + Some(4), + Some(5), + ])); + let b: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"])); + let c: ArrayRef = Arc::new(StructArray::from(vec![ + (Arc::new(Field::new("a", DataType::Int32, true)), a), + (Arc::new(Field::new("b", DataType::Utf8, true)), b), + ])); + // change type of "a" from Int32 to Utf8 and drop "b" + let fields = Fields::from(vec![Field::new("a", DataType::Utf8, true)]); + let cast_array = spark_cast( + ColumnarValue::Array(c), + &DataType::Struct(fields), + EvalMode::Legacy, + "UTC", + false, + ) + .unwrap(); + if let ColumnarValue::Array(cast_array) = cast_array { + assert_eq!(5, cast_array.len()); + let struct_array = cast_array.as_struct(); + assert_eq!(1, struct_array.columns().len()); + let a = struct_array.column(0).as_string::(); + assert_eq!("1", a.value(0)); } else { unreachable!() } From 0223fb91f6ad8405f0171af493cf004be182f152 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 11 Nov 2024 09:17:23 -0700 Subject: [PATCH 03/16] rustdoc --- native/spark-expr/src/cast.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/native/spark-expr/src/cast.rs b/native/spark-expr/src/cast.rs index a425ccc7e..6314f1032 100644 --- a/native/spark-expr/src/cast.rs +++ b/native/spark-expr/src/cast.rs @@ -819,6 +819,12 @@ fn is_datafusion_spark_compatible( } } +/// Cast between struct types based on logic in +/// `org.apache.spark.sql.catalyst.expressions.Cast#castStruct`. +/// +/// This can change the types of fields within the struct as well as drop struct fields. The +/// `from_type` and `to_type` do not need to have the same number of fields, but the `from_type` +/// must have at least as many fields as the `to_type`. 
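To make the behaviour described in that rustdoc concrete, here is a self-contained sketch of the same positional, field-by-field struct cast. It uses arrow's stock `cast` kernel in place of `spark_cast`, so Spark's legacy/ANSI semantics are not modelled, and every name in it is illustrative rather than part of the patch:

use std::sync::Arc;
use arrow::array::{Array, ArrayRef, Int32Array, StringArray, StructArray};
use arrow::compute::cast;
use arrow::datatypes::{DataType, Field, Fields};
use arrow::error::ArrowError;

fn main() -> Result<(), ArrowError> {
    // Source: struct<a: int32, b: utf8>
    let a: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(3)]));
    let b: ArrayRef = Arc::new(StringArray::from(vec!["x", "y", "z"]));
    let source = StructArray::from(vec![
        (Arc::new(Field::new("a", DataType::Int32, true)), a),
        (Arc::new(Field::new("b", DataType::Utf8, true)), b),
    ]);

    // Target: struct<a: utf8> -- changes the type of "a" and drops "b".
    let target = Fields::from(vec![Field::new("a", DataType::Utf8, true)]);
    assert!(target.len() <= source.columns().len());

    // Fields are matched by position, mirroring Spark's Cast#castStruct.
    let mut cast_columns: Vec<(Arc<Field>, ArrayRef)> = Vec::with_capacity(target.len());
    for (i, field) in target.iter().enumerate() {
        let cast_col = cast(source.column(i), field.data_type())?;
        cast_columns.push((Arc::clone(field), cast_col));
    }
    let result = StructArray::from(cast_columns);

    assert_eq!(1, result.columns().len());
    assert_eq!(3, result.len());
    Ok(())
}

The `cast_struct_to_struct` function introduced below follows the same shape, except that each child column is routed through the Spark-compatible cast path and the field lists come from the `from_type`/`to_type` arguments.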
fn cast_struct_to_struct( array: &StructArray, from_type: &DataType, From 250f95678975c1d9f14d35db6c8430a6f0ac6043 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 11 Nov 2024 09:23:47 -0700 Subject: [PATCH 04/16] add scala side --- .../org/apache/comet/expressions/CometCast.scala | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala b/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala index eb9800b8d..afd1929c1 100644 --- a/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala +++ b/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala @@ -95,6 +95,17 @@ object CometCast { canCastFromFloat(toType) case (DataTypes.DoubleType, _) => canCastFromDouble(toType) + case (from_struct: StructType, to_struct: StructType) => + from_struct.fields.zip(to_struct.fields).foreach { case (a, b) => + isSupported(a.dataType, b.dataType, timeZoneId, evalMode) match { + case Compatible(_) => + // all good + case other => + return other + } + Compatible() + } + Compatible() case _ => Unsupported } } From ec4a0ed28027e0abb0c47525c1aa5f792073fdbe Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 11 Nov 2024 09:27:41 -0700 Subject: [PATCH 05/16] code cleanup --- native/spark-expr/src/cast.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/native/spark-expr/src/cast.rs b/native/spark-expr/src/cast.rs index 6314f1032..65dc6e465 100644 --- a/native/spark-expr/src/cast.rs +++ b/native/spark-expr/src/cast.rs @@ -834,20 +834,20 @@ fn cast_struct_to_struct( allow_incompat: bool, ) -> DataFusionResult { match (from_type, to_type) { - (DataType::Struct(f), DataType::Struct(t)) => { - assert!(t.len() <= f.len()); - let mut foo: Vec<(Arc, ArrayRef)> = vec![]; - for i in 0..t.len() { - let x = spark_cast( + (DataType::Struct(from_fields), DataType::Struct(to_fields)) => { + assert!(to_fields.len() <= from_fields.len()); + let mut cast_fields: Vec<(Arc, ArrayRef)> = vec![]; + for i in 0..to_fields.len() { + let cast_field = spark_cast( ColumnarValue::Array(array.column(i).clone()), - &t[i].data_type(), + &to_fields[i].data_type(), eval_mode, timezone, allow_incompat, )?; - foo.push((t[i].clone(), x.into_array(array.len())?)); + cast_fields.push((to_fields[i].clone(), cast_field.into_array(array.len())?)); } - Ok(Arc::new(StructArray::from(foo))) + Ok(Arc::new(StructArray::from(cast_fields))) } _ => unreachable!(), } From 41497f59028d1c38ef19e4d3a7cbb7462750ae19 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 11 Nov 2024 09:46:01 -0700 Subject: [PATCH 06/16] clippy --- native/spark-expr/src/cast.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/native/spark-expr/src/cast.rs b/native/spark-expr/src/cast.rs index 65dc6e465..0bae8f74e 100644 --- a/native/spark-expr/src/cast.rs +++ b/native/spark-expr/src/cast.rs @@ -719,7 +719,7 @@ fn cast_array( from_type, to_type, eval_mode, - &timezone, + timezone, allow_incompat, )?), _ if is_datafusion_spark_compatible(from_type, to_type, allow_incompat) => { @@ -830,7 +830,7 @@ fn cast_struct_to_struct( from_type: &DataType, to_type: &DataType, eval_mode: EvalMode, - timezone: &str, + timezone: String, allow_incompat: bool, ) -> DataFusionResult { match (from_type, to_type) { @@ -838,14 +838,14 @@ fn cast_struct_to_struct( assert!(to_fields.len() <= from_fields.len()); let mut cast_fields: Vec<(Arc, ArrayRef)> = vec![]; for i in 0..to_fields.len() { - let cast_field = spark_cast( - 
ColumnarValue::Array(array.column(i).clone()), - &to_fields[i].data_type(), + let cast_field = cast_array( + Arc::clone(array.column(i)), + to_fields[i].data_type(), eval_mode, - timezone, + timezone.clone(), allow_incompat, )?; - cast_fields.push((to_fields[i].clone(), cast_field.into_array(array.len())?)); + cast_fields.push((Arc::clone(&to_fields[i]), cast_field)); } Ok(Arc::new(StructArray::from(cast_fields))) } From 823350b7dbfe270829a0950a2b8532215a0d5e11 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 11 Nov 2024 09:46:35 -0700 Subject: [PATCH 07/16] clippy --- native/spark-expr/src/cast.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/native/spark-expr/src/cast.rs b/native/spark-expr/src/cast.rs index 0bae8f74e..19fffb17d 100644 --- a/native/spark-expr/src/cast.rs +++ b/native/spark-expr/src/cast.rs @@ -836,7 +836,7 @@ fn cast_struct_to_struct( match (from_type, to_type) { (DataType::Struct(from_fields), DataType::Struct(to_fields)) => { assert!(to_fields.len() <= from_fields.len()); - let mut cast_fields: Vec<(Arc, ArrayRef)> = vec![]; + let mut cast_fields: Vec<(Arc, ArrayRef)> = Vec::with_capacity(to_fields.len()); for i in 0..to_fields.len() { let cast_field = cast_array( Arc::clone(array.column(i)), From 9d4885d9ef81a15a18d4fda8eaad19d1f443a79c Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 11 Nov 2024 09:53:33 -0700 Subject: [PATCH 08/16] add scala test --- .../org/apache/comet/expressions/CometCast.scala | 1 - .../scala/org/apache/comet/CometCastSuite.scala | 14 ++++++++++++++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala b/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala index afd1929c1..11d6d049f 100644 --- a/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala +++ b/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala @@ -103,7 +103,6 @@ object CometCast { case other => return other } - Compatible() } Compatible() case _ => Unsupported diff --git a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala index 817545c5d..0b56b351f 100644 --- a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala @@ -881,6 +881,20 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { } } + test("cast StructType to StructType") { + Seq(true, false).foreach { dictionaryEnabled => + withTempDir { dir => + val path = new Path(dir.toURI.toString, "test.parquet") + makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 10000) + withParquetTable(path.toString, "tbl") { + checkSparkAnswerAndOperator( + "SELECT CAST(struct(_1, _2) as " + + "struct<_1:string, _2:string>) FROM tbl") + } + } + } + } + private def generateFloats(): DataFrame = { withNulls(gen.generateFloats(dataSize)).toDF("a") } From ca06f05753ca646e7a828a771ff019b4c38eb0a2 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 11 Nov 2024 10:02:23 -0700 Subject: [PATCH 09/16] improve test --- spark/src/test/scala/org/apache/comet/CometCastSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala index 0b56b351f..db9a870dc 100644 --- a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala @@ -888,8 +888,8 
@@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 10000) withParquetTable(path.toString, "tbl") { checkSparkAnswerAndOperator( - "SELECT CAST(struct(_1, _2) as " + - "struct<_1:string, _2:string>) FROM tbl") + "SELECT CAST(CASE WHEN _1 THEN struct(_1, _2, _3, _4) ELSE null END as " + + "struct<_1:string, _2:string, _3:string, _4:string>) FROM tbl") } } } From a8b2e7596cc72d0a4ea9ec953f373ec975dec1e2 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 11 Nov 2024 12:25:30 -0700 Subject: [PATCH 10/16] simple struct case passes --- .../core/src/execution/datafusion/planner.rs | 1 + .../comet/CometSparkSessionExtensions.scala | 3 +- .../org/apache/comet/DataTypeSupport.scala | 1 + .../apache/comet/serde/QueryPlanSerde.scala | 2 +- .../apache/comet/CometExpressionSuite.scala | 67 +++++++++++++++++++ 5 files changed, 72 insertions(+), 2 deletions(-) diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index 48a653add..e9bf99da6 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -968,6 +968,7 @@ impl PhysicalPlanner { ); println!("data_schema_arrow: {:?}", data_schema_arrow); + let required_schema_descriptor = parquet::schema::types::SchemaDescriptor::new(Arc::new(required_schema)); let required_schema_arrow = Arc::new( diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala index 952ef39e9..03f3dc1d5 100644 --- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala +++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala @@ -954,7 +954,8 @@ class CometSparkSessionExtensions if (CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.get()) { val info = new ExtendedExplainInfo() if (info.extensionInfo(newPlan).nonEmpty) { - logWarning( + // scalastyle:off println + println( "Comet cannot execute some parts of this plan natively " + s"(set ${CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.key}=false " + "to disable this logging):\n" + diff --git a/spark/src/main/scala/org/apache/comet/DataTypeSupport.scala b/spark/src/main/scala/org/apache/comet/DataTypeSupport.scala index c49a2c465..09c062b8b 100644 --- a/spark/src/main/scala/org/apache/comet/DataTypeSupport.scala +++ b/spark/src/main/scala/org/apache/comet/DataTypeSupport.scala @@ -39,6 +39,7 @@ trait DataTypeSupport { BinaryType | StringType | _: DecimalType | DateType | TimestampType => true case t: DataType if t.typeName == "timestamp_ntz" => true + case _: StructType => true case _ => false } diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index b5517f40f..ad13f7c78 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -60,7 +60,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim logWarning(s"Comet native execution is disabled due to: $reason") } - def supportedDataType(dt: DataType, allowStruct: Boolean = false): Boolean = dt match { + def supportedDataType(dt: DataType, allowStruct: Boolean = true): Boolean = dt match { case _: ByteType | _: ShortType | _: IntegerType | _: LongType | _: FloatType | _: DoubleType | _: StringType | _: BinaryType | _: TimestampType | _: DecimalType 
| _: DateType | _: BooleanType | _: NullType => diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index 0d00867d1..2602e91a3 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -2200,6 +2200,73 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } } + test("get_struct_field with DataFusion ParquetExec - simple case") { + withTempPath { dir => + // create input file with Comet disabled + withSQLConf(CometConf.COMET_ENABLED.key -> "false") { + val df = spark + .range(5) + // Add both a null struct and null inner value + .select( + when( + col("id") > 1, + struct( + when(col("id") > 2, col("id")).alias("id"))) + .alias("nested1")) + + df.write.parquet(dir.toString()) + } + + Seq("parquet").foreach { v1List => + withSQLConf( + SQLConf.USE_V1_SOURCE_LIST.key -> v1List, + CometConf.COMET_ENABLED.key -> "true", + CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.key -> "true") { + + val df = spark.read.parquet(dir.toString()) + checkSparkAnswerAndOperator(df.select("nested1.id")) + } + } + } + } + + // TODO need more work to get this one passing + ignore("get_struct_field with DataFusion ParquetExec - select subset of struct") { + withTempPath { dir => + // create input file with Comet disabled + withSQLConf(CometConf.COMET_ENABLED.key -> "false") { + val df = spark + .range(5) + // Add both a null struct and null inner value + .select( + when( + col("id") > 1, + struct( + when(col("id") > 2, col("id")).alias("id"), + when(col("id") > 2, struct(when(col("id") > 3, col("id")).alias("id"))) + .as("nested2"))) + .alias("nested1")) + + df.write.parquet(dir.toString()) + } + + Seq("parquet").foreach { v1List => + withSQLConf( + SQLConf.USE_V1_SOURCE_LIST.key -> v1List, + CometConf.COMET_ENABLED.key -> "true", + CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.key -> "true") { + + val df = spark.read.parquet(dir.toString()) + checkSparkAnswerAndOperator(df.select("nested1")) + + +// checkSparkAnswerAndOperator(df.select("nested1.id")) +// checkSparkAnswerAndOperator(df.select("nested1.nested2.id")) + } + } + } + } + test("CreateArray") { Seq(true, false).foreach { dictionaryEnabled => withTempDir { dir => From 2bdb9156df35f73cb8868a2fd65d532fa3b75911 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 11 Nov 2024 16:54:15 -0700 Subject: [PATCH 11/16] save progress --- .../core/src/execution/datafusion/planner.rs | 1 - .../apache/comet/serde/QueryPlanSerde.scala | 3 ++ .../apache/comet/CometExpressionSuite.scala | 48 ++++++++++++++++--- 3 files changed, 44 insertions(+), 8 deletions(-) diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index e9bf99da6..48a653add 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -968,7 +968,6 @@ impl PhysicalPlanner { ); println!("data_schema_arrow: {:?}", data_schema_arrow); - let required_schema_descriptor = parquet::schema::types::SchemaDescriptor::new(Arc::new(required_schema)); let required_schema_arrow = Arc::new( diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 4f8374104..6a22486de 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ 
-2873,6 +2873,9 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim withInfo(join, "SortMergeJoin is not enabled") None + case _: DeserializeToObjectExec => + None + case op if isCometSink(op) && op.output.forall(a => supportedDataType(a.dataType, true)) => // These operators are source of Comet native execution chain val scanBuilder = OperatorOuterClass.Scan.newBuilder() diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index 2602e91a3..b5fc99546 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -2201,6 +2201,32 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } test("get_struct_field with DataFusion ParquetExec - simple case") { + withTempPath { dir => + // create input file with Comet disabled + withSQLConf(CometConf.COMET_ENABLED.key -> "false") { + val df = spark + .range(5) + // Add both a null struct and null inner value + .select(when(col("id") > 1, struct(when(col("id") > 2, col("id")).alias("id"))) + .alias("nested1")) + + df.write.parquet(dir.toString()) + } + + Seq("parquet").foreach { v1List => + withSQLConf( + SQLConf.USE_V1_SOURCE_LIST.key -> v1List, + CometConf.COMET_ENABLED.key -> "true", + CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.key -> "true") { + + val df = spark.read.parquet(dir.toString()) + checkSparkAnswerAndOperator(df.select("nested1.id")) + } + } + } + } + + test("get_struct_field with DataFusion ParquetExec - select subset of struct") { withTempPath { dir => // create input file with Comet disabled withSQLConf(CometConf.COMET_ENABLED.key -> "false") { @@ -2211,7 +2237,9 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { when( col("id") > 1, struct( - when(col("id") > 2, col("id")).alias("id"))) + when(col("id") > 2, col("id")).alias("id"), + when(col("id") > 2, struct(when(col("id") > 3, col("id")).alias("id"))) + .as("nested2"))) .alias("nested1")) df.write.parquet(dir.toString()) @@ -2224,14 +2252,24 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.key -> "true") { val df = spark.read.parquet(dir.toString()) + + // TODO this currently fails with the following error from DataFusion + // Cannot cast file schema field nested1 of type + // Struct([Field { name: "id", ... }, Field { name: "nested2", data_type: Struct([Field { name: "id", ... }]), ... }]) + // to table schema field of type + // Struct([Field { name: "id", ... 
}]) checkSparkAnswerAndOperator(df.select("nested1.id")) + + + // TODO +// checkSparkAnswerAndOperator(df.select("nested1.nested2.id")) } } } } - // TODO need more work to get this one passing - ignore("get_struct_field with DataFusion ParquetExec - select subset of struct") { + // TODO this is not using DataFusion's ParquetExec + ignore("get_struct_field with DataFusion ParquetExec - read entire struct") { withTempPath { dir => // create input file with Comet disabled withSQLConf(CometConf.COMET_ENABLED.key -> "false") { @@ -2258,10 +2296,6 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { val df = spark.read.parquet(dir.toString()) checkSparkAnswerAndOperator(df.select("nested1")) - - -// checkSparkAnswerAndOperator(df.select("nested1.id")) -// checkSparkAnswerAndOperator(df.select("nested1.nested2.id")) } } } From e8521bbb7e3f62f011fa0f060cf590a829571f71 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 12 Nov 2024 08:45:26 -0700 Subject: [PATCH 12/16] copy schema adapter code from DataFusion --- native/core/src/execution/datafusion/mod.rs | 1 + .../core/src/execution/datafusion/planner.rs | 6 +- .../execution/datafusion/schema_adapter.rs | 256 ++++++++++++++++++ .../apache/comet/CometExpressionSuite.scala | 7 +- 4 files changed, 267 insertions(+), 3 deletions(-) create mode 100644 native/core/src/execution/datafusion/schema_adapter.rs diff --git a/native/core/src/execution/datafusion/mod.rs b/native/core/src/execution/datafusion/mod.rs index 6f81ee918..fb9c8829c 100644 --- a/native/core/src/execution/datafusion/mod.rs +++ b/native/core/src/execution/datafusion/mod.rs @@ -20,5 +20,6 @@ pub mod expressions; mod operators; pub mod planner; +mod schema_adapter; pub mod shuffle_writer; mod util; diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index 48a653add..379691a12 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -84,6 +84,7 @@ use datafusion::{ }; use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr}; +use crate::execution::datafusion::schema_adapter::CometSchemaAdapterFactory; use datafusion::datasource::listing::PartitionedFile; use datafusion::datasource::physical_plan::parquet::ParquetExecBuilder; use datafusion::datasource::physical_plan::FileScanConfig; @@ -1050,7 +1051,10 @@ impl PhysicalPlanner { table_parquet_options.global.reorder_filters = true; let mut builder = ParquetExecBuilder::new(file_scan_config) - .with_table_parquet_options(table_parquet_options); + .with_table_parquet_options(table_parquet_options) + .with_schema_adapter_factory( + Arc::new(CometSchemaAdapterFactory::default()), + ); if let Some(filter) = test_data_filters { builder = builder.with_predicate(filter); diff --git a/native/core/src/execution/datafusion/schema_adapter.rs b/native/core/src/execution/datafusion/schema_adapter.rs new file mode 100644 index 000000000..a54242cd8 --- /dev/null +++ b/native/core/src/execution/datafusion/schema_adapter.rs @@ -0,0 +1,256 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Custom schema adapter that uses Spark-compatible casts + +use arrow_schema::{Schema, SchemaRef}; +use datafusion::datasource::schema_adapter::{SchemaAdapter, SchemaAdapterFactory, SchemaMapper}; +use std::sync::Arc; +use arrow::compute::{can_cast_types, cast}; +use arrow_array::{new_null_array, RecordBatch, RecordBatchOptions}; +use datafusion_common::plan_err; + +#[derive(Clone, Debug, Default)] +pub struct CometSchemaAdapterFactory {} + +impl SchemaAdapterFactory for CometSchemaAdapterFactory { + /// Create a new factory for mapping batches from a file schema to a table + /// schema. + /// + /// This is a convenience for [`DefaultSchemaAdapterFactory::create`] with + /// the same schema for both the projected table schema and the table + /// schema. + fn create( + &self, + projected_table_schema: SchemaRef, + table_schema: SchemaRef, + ) -> Box { + Box::new(CometSchemaAdapter { + projected_table_schema, + table_schema, + }) + } +} + +/// This SchemaAdapter requires both the table schema and the projected table +/// schema. See [`SchemaMapping`] for more details +#[derive(Clone, Debug)] +pub struct CometSchemaAdapter { + /// The schema for the table, projected to include only the fields being output (projected) by the + /// associated ParquetExec + projected_table_schema: SchemaRef, + /// The entire table schema for the table we're using this to adapt. + /// + /// This is used to evaluate any filters pushed down into the scan + /// which may refer to columns that are not referred to anywhere + /// else in the plan. + table_schema: SchemaRef, +} + +impl SchemaAdapter for CometSchemaAdapter { + /// Map a column index in the table schema to a column index in a particular + /// file schema + /// + /// Panics if index is not in range for the table schema + fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option { + let field = self.projected_table_schema.field(index); + Some(file_schema.fields.find(field.name())?.0) + } + + /// Creates a `SchemaMapping` for casting or mapping the columns from the + /// file schema to the table schema. + /// + /// If the provided `file_schema` contains columns of a different type to + /// the expected `table_schema`, the method will attempt to cast the array + /// data from the file schema to the table schema where possible. 
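Before the implementation, it may help to see the index bookkeeping in isolation. The helper below computes the same two structures that `map_schema` builds, a per-table-field mapping and the list of file columns to project, but outside the DataFusion `SchemaAdapter` trait and without the cast-compatibility check; the function name and the schemas are invented for the example:

use arrow::datatypes::{DataType, Field, Schema};

// For each field in `table`, record which position in the projected file
// output it will come from (None when the file lacks the column), and collect
// the file column indices to project, in file order.
fn map_schema_indices(file: &Schema, table: &Schema) -> (Vec<Option<usize>>, Vec<usize>) {
    let mut field_mappings: Vec<Option<usize>> = vec![None; table.fields().len()];
    let mut projection = Vec::with_capacity(file.fields().len());
    for (file_idx, file_field) in file.fields().iter().enumerate() {
        if let Some((table_idx, _)) = table.fields().find(file_field.name()) {
            field_mappings[table_idx] = Some(projection.len());
            projection.push(file_idx);
        }
    }
    (field_mappings, projection)
}

fn main() {
    // The file on disk has columns (b, a, c); the query needs (a, d).
    let file = Schema::new(vec![
        Field::new("b", DataType::Utf8, true),
        Field::new("a", DataType::Int32, true),
        Field::new("c", DataType::Int64, true),
    ]);
    let table = Schema::new(vec![
        Field::new("a", DataType::Int64, true),
        Field::new("d", DataType::Utf8, true),
    ]);
    let (field_mappings, projection) = map_schema_indices(&file, &table);
    // Only file column 1 ("a") is read; table field "a" maps to projected
    // column 0, and table field "d" is missing so it must be null-filled.
    assert_eq!(vec![1_usize], projection);
    assert_eq!(vec![Some(0_usize), None], field_mappings);
}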
+ /// + /// Returns a [`SchemaMapping`] that can be applied to the output batch + /// along with an ordered list of columns to project from the file + fn map_schema( + &self, + file_schema: &Schema, + ) -> datafusion_common::Result<(Arc, Vec)> { + let mut projection = Vec::with_capacity(file_schema.fields().len()); + let mut field_mappings = vec![None; self.projected_table_schema.fields().len()]; + + for (file_idx, file_field) in file_schema.fields.iter().enumerate() { + if let Some((table_idx, table_field)) = + self.projected_table_schema.fields().find(file_field.name()) + { + match can_cast_types(file_field.data_type(), table_field.data_type()) { + true => { + field_mappings[table_idx] = Some(projection.len()); + projection.push(file_idx); + } + false => { + return plan_err!( + "Cannot cast file schema field {} of type {:?} to table schema field of type {:?}", + file_field.name(), + file_field.data_type(), + table_field.data_type() + ) + } + } + } + } + + Ok(( + Arc::new(SchemaMapping { + projected_table_schema: self.projected_table_schema.clone(), + field_mappings, + table_schema: self.table_schema.clone(), + }), + projection, + )) + } + +} + +// TODO SchemaMapping is public in DataFusion, but its fields are not. It would be +// good to fix that so we don't have to duplicate the code here. + +/// The SchemaMapping struct holds a mapping from the file schema to the table +/// schema and any necessary type conversions. +/// +/// Note, because `map_batch` and `map_partial_batch` functions have different +/// needs, this struct holds two schemas: +/// +/// 1. The projected **table** schema +/// 2. The full table schema +/// +/// [`map_batch`] is used by the ParquetOpener to produce a RecordBatch which +/// has the projected schema, since that's the schema which is supposed to come +/// out of the execution of this query. Thus `map_batch` uses +/// `projected_table_schema` as it can only operate on the projected fields. +/// +/// [`map_partial_batch`] is used to create a RecordBatch with a schema that +/// can be used for Parquet predicate pushdown, meaning that it may contain +/// fields which are not in the projected schema (as the fields that parquet +/// pushdown filters operate can be completely distinct from the fields that are +/// projected (output) out of the ParquetExec). `map_partial_batch` thus uses +/// `table_schema` to create the resulting RecordBatch (as it could be operating +/// on any fields in the schema). +/// +/// [`map_batch`]: Self::map_batch +/// [`map_partial_batch`]: Self::map_partial_batch +#[derive(Debug)] +pub struct SchemaMapping { + /// The schema of the table. This is the expected schema after conversion + /// and it should match the schema of the query result. + projected_table_schema: SchemaRef, + /// Mapping from field index in `projected_table_schema` to index in + /// projected file_schema. + /// + /// They are Options instead of just plain `usize`s because the table could + /// have fields that don't exist in the file. + field_mappings: Vec>, + /// The entire table schema, as opposed to the projected_table_schema (which + /// only contains the columns that we are projecting out of this query). + /// This contains all fields in the table, regardless of if they will be + /// projected out or not. + table_schema: SchemaRef, +} + +impl SchemaMapper for SchemaMapping { + /// Adapts a `RecordBatch` to match the `projected_table_schema` using the stored mapping and + /// conversions. 
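The effect of that mapping on an actual batch can be shown without the surrounding trait machinery: columns present in the projected file batch are cast to the table type, and columns missing from the file are filled with nulls. The schemas, arrays, and `field_mappings` values below are invented, and plain `arrow::compute::cast` stands in for the Spark-compatible cast that a later commit in this series switches to:

use std::sync::Arc;
use arrow::array::{new_null_array, Array, ArrayRef, Int32Array};
use arrow::compute::cast;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

fn main() -> Result<(), ArrowError> {
    // A projected file batch that only contains column "a" as Int32.
    let file_schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
    let a: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(3)]));
    let batch = RecordBatch::try_new(file_schema, vec![a])?;

    // The projected table schema expects (a: int64, d: utf8); "d" does not
    // exist in the file, so its mapping entry is None.
    let table_schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int64, true),
        Field::new("d", DataType::Utf8, true),
    ]));
    let field_mappings: Vec<Option<usize>> = vec![Some(0), None];

    let columns = table_schema
        .fields()
        .iter()
        .zip(&field_mappings)
        .map(|(field, mapping)| match mapping {
            // Present in the file: cast the projected column to the table type.
            Some(idx) => cast(batch.column(*idx), field.data_type()),
            // Absent from the file: fill with nulls of the table type.
            None => Ok(new_null_array(field.data_type(), batch.num_rows())),
        })
        .collect::<Result<Vec<_>, _>>()?;

    let mapped = RecordBatch::try_new(table_schema, columns)?;
    assert_eq!(2, mapped.num_columns());
    assert!(mapped.column(1).is_null(0)); // "d" is entirely null
    Ok(())
}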
The produced RecordBatch has a schema that contains only the projected + /// columns, so if one needs a RecordBatch with a schema that references columns which are not + /// in the projected, it would be better to use `map_partial_batch` + fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result { + let batch_rows = batch.num_rows(); + let batch_cols = batch.columns().to_vec(); + + let cols = self + .projected_table_schema + // go through each field in the projected schema + .fields() + .iter() + // and zip it with the index that maps fields from the projected table schema to the + // projected file schema in `batch` + .zip(&self.field_mappings) + // and for each one... + .map(|(field, file_idx)| { + file_idx.map_or_else( + // If this field only exists in the table, and not in the file, then we know + // that it's null, so just return that. + || Ok(new_null_array(field.data_type(), batch_rows)), + // However, if it does exist in both, then try to cast it to the correct output + // type + |batch_idx| cast(&batch_cols[batch_idx], field.data_type()), + ) + }) + .collect::, _>>()?; + + // Necessary to handle empty batches + let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows())); + + let schema = self.projected_table_schema.clone(); + let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?; + Ok(record_batch) + } + + /// Adapts a [`RecordBatch`]'s schema into one that has all the correct output types and only + /// contains the fields that exist in both the file schema and table schema. + /// + /// Unlike `map_batch` this method also preserves the columns that + /// may not appear in the final output (`projected_table_schema`) but may + /// appear in push down predicates + fn map_partial_batch( + &self, + batch: RecordBatch, + ) -> datafusion_common::Result { + let batch_cols = batch.columns().to_vec(); + let schema = batch.schema(); + + // for each field in the batch's schema (which is based on a file, not a table)... + let (cols, fields) = schema + .fields() + .iter() + .zip(batch_cols.iter()) + .flat_map(|(field, batch_col)| { + self.table_schema + // try to get the same field from the table schema that we have stored in self + .field_with_name(field.name()) + // and if we don't have it, that's fine, ignore it. This may occur when we've + // created an external table whose fields are a subset of the fields in this + // file, then tried to read data from the file into this table. If that is the + // case here, it's fine to ignore because we don't care about this field + // anyways + .ok() + // but if we do have it, + .map(|table_field| { + // try to cast it into the correct output type. we don't want to ignore this + // error, though, so it's propagated. + cast(batch_col, table_field.data_type()) + // and if that works, return the field and column. + .map(|new_col| (new_col, table_field.clone())) + }) + }) + .collect::, _>>()? 
+ .into_iter() + .unzip::<_, _, Vec<_>, Vec<_>>(); + + // Necessary to handle empty batches + let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows())); + + let schema = + Arc::new(Schema::new_with_metadata(fields, schema.metadata().clone())); + let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?; + Ok(record_batch) + } +} + diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index b5fc99546..a0fa72c93 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -2258,11 +2258,14 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { // Struct([Field { name: "id", ... }, Field { name: "nested2", data_type: Struct([Field { name: "id", ... }]), ... }]) // to table schema field of type // Struct([Field { name: "id", ... }]) - checkSparkAnswerAndOperator(df.select("nested1.id")) + //checkSparkAnswerAndOperator(df.select("nested1.id")) + + + checkSparkAnswerAndOperator(df.select("nested1.id", "nested1.nested2.id")) // TODO -// checkSparkAnswerAndOperator(df.select("nested1.nested2.id")) + checkSparkAnswerAndOperator(df.select("nested1.nested2.id")) } } } From c1c8cfddbd6874d204cb9aa47d4a0675a56fe73d Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 12 Nov 2024 08:54:14 -0700 Subject: [PATCH 13/16] more tests pass --- .../execution/datafusion/schema_adapter.rs | 42 ++++++++++--------- .../apache/comet/CometExpressionSuite.scala | 12 +----- 2 files changed, 24 insertions(+), 30 deletions(-) diff --git a/native/core/src/execution/datafusion/schema_adapter.rs b/native/core/src/execution/datafusion/schema_adapter.rs index a54242cd8..e3751602e 100644 --- a/native/core/src/execution/datafusion/schema_adapter.rs +++ b/native/core/src/execution/datafusion/schema_adapter.rs @@ -17,12 +17,12 @@ //! 
Custom schema adapter that uses Spark-compatible casts -use arrow_schema::{Schema, SchemaRef}; -use datafusion::datasource::schema_adapter::{SchemaAdapter, SchemaAdapterFactory, SchemaMapper}; -use std::sync::Arc; use arrow::compute::{can_cast_types, cast}; use arrow_array::{new_null_array, RecordBatch, RecordBatchOptions}; +use arrow_schema::{DataType, Schema, SchemaRef}; +use datafusion::datasource::schema_adapter::{SchemaAdapter, SchemaAdapterFactory, SchemaMapper}; use datafusion_common::plan_err; +use std::sync::Arc; #[derive(Clone, Debug, Default)] pub struct CometSchemaAdapterFactory {} @@ -91,18 +91,26 @@ impl SchemaAdapter for CometSchemaAdapter { if let Some((table_idx, table_field)) = self.projected_table_schema.fields().find(file_field.name()) { - match can_cast_types(file_field.data_type(), table_field.data_type()) { - true => { + // workaround for struct casting + match (file_field.data_type(), table_field.data_type()) { + // TODO need to use Comet cast logic to determine which casts are supported, + // but for now just add a hack to support casting between struct types + (DataType::Struct(_), DataType::Struct(_)) => { field_mappings[table_idx] = Some(projection.len()); projection.push(file_idx); } - false => { - return plan_err!( - "Cannot cast file schema field {} of type {:?} to table schema field of type {:?}", - file_field.name(), - file_field.data_type(), - table_field.data_type() - ) + _ => { + if can_cast_types(file_field.data_type(), table_field.data_type()) { + field_mappings[table_idx] = Some(projection.len()); + projection.push(file_idx); + } else { + return plan_err!( + "Cannot cast file schema field {} of type {:?} to table schema field of type {:?}", + file_field.name(), + file_field.data_type(), + table_field.data_type() + ); + } } } } @@ -117,7 +125,6 @@ impl SchemaAdapter for CometSchemaAdapter { projection, )) } - } // TODO SchemaMapping is public in DataFusion, but its fields are not. It would be @@ -209,10 +216,7 @@ impl SchemaMapper for SchemaMapping { /// Unlike `map_batch` this method also preserves the columns that /// may not appear in the final output (`projected_table_schema`) but may /// appear in push down predicates - fn map_partial_batch( - &self, - batch: RecordBatch, - ) -> datafusion_common::Result { + fn map_partial_batch(&self, batch: RecordBatch) -> datafusion_common::Result { let batch_cols = batch.columns().to_vec(); let schema = batch.schema(); @@ -247,10 +251,8 @@ impl SchemaMapper for SchemaMapping { // Necessary to handle empty batches let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows())); - let schema = - Arc::new(Schema::new_with_metadata(fields, schema.metadata().clone())); + let schema = Arc::new(Schema::new_with_metadata(fields, schema.metadata().clone())); let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?; Ok(record_batch) } } - diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index a0fa72c93..551c6daa5 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -2253,19 +2253,11 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { val df = spark.read.parquet(dir.toString()) - // TODO this currently fails with the following error from DataFusion - // Cannot cast file schema field nested1 of type - // Struct([Field { name: "id", ... 
}, Field { name: "nested2", data_type: Struct([Field { name: "id", ... }]), ... }]) - // to table schema field of type - // Struct([Field { name: "id", ... }]) - //checkSparkAnswerAndOperator(df.select("nested1.id")) - + checkSparkAnswerAndOperator(df.select("nested1.id")) checkSparkAnswerAndOperator(df.select("nested1.id", "nested1.nested2.id")) - - // TODO - checkSparkAnswerAndOperator(df.select("nested1.nested2.id")) + //checkSparkAnswerAndOperator(df.select("nested1.nested2.id")) } } } From 98296e4ed1125243548d813f4d5ea3923d69ca8f Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 12 Nov 2024 13:01:40 -0700 Subject: [PATCH 14/16] save progress --- .../execution/datafusion/schema_adapter.rs | 42 +++++++++++++++---- native/spark-expr/src/cast.rs | 3 ++ .../apache/comet/serde/QueryPlanSerde.scala | 3 -- .../apache/comet/CometExpressionSuite.scala | 5 ++- 4 files changed, 40 insertions(+), 13 deletions(-) diff --git a/native/core/src/execution/datafusion/schema_adapter.rs b/native/core/src/execution/datafusion/schema_adapter.rs index e3751602e..ea114a87e 100644 --- a/native/core/src/execution/datafusion/schema_adapter.rs +++ b/native/core/src/execution/datafusion/schema_adapter.rs @@ -17,11 +17,13 @@ //! Custom schema adapter that uses Spark-compatible casts -use arrow::compute::{can_cast_types, cast}; -use arrow_array::{new_null_array, RecordBatch, RecordBatchOptions}; +use arrow::compute::can_cast_types; +use arrow_array::{new_null_array, Array, RecordBatch, RecordBatchOptions}; use arrow_schema::{DataType, Schema, SchemaRef}; use datafusion::datasource::schema_adapter::{SchemaAdapter, SchemaAdapterFactory, SchemaMapper}; +use datafusion_comet_spark_expr::{spark_cast, EvalMode}; use datafusion_common::plan_err; +use datafusion_expr::ColumnarValue; use std::sync::Arc; #[derive(Clone, Debug, Default)] @@ -91,6 +93,12 @@ impl SchemaAdapter for CometSchemaAdapter { if let Some((table_idx, table_field)) = self.projected_table_schema.fields().find(file_field.name()) { + println!( + "SchemaAdapter cast from {} to {}", + file_field.data_type(), + table_field.data_type() + ); + // workaround for struct casting match (file_field.data_type(), table_field.data_type()) { // TODO need to use Comet cast logic to determine which casts are supported, @@ -127,8 +135,9 @@ impl SchemaAdapter for CometSchemaAdapter { } } -// TODO SchemaMapping is public in DataFusion, but its fields are not. It would be -// good to fix that so we don't have to duplicate the code here. +// TODO SchemaMapping is mostly copied from DataFusion but calls spark_cast +// instead of arrow cast - can we reduce the amount of code copied here and make +// the DataFusion version more extensible? /// The SchemaMapping struct holds a mapping from the file schema to the table /// schema and any necessary type conversions. @@ -197,7 +206,17 @@ impl SchemaMapper for SchemaMapping { || Ok(new_null_array(field.data_type(), batch_rows)), // However, if it does exist in both, then try to cast it to the correct output // type - |batch_idx| cast(&batch_cols[batch_idx], field.data_type()), + |batch_idx| { + spark_cast( + ColumnarValue::Array(Arc::clone(&batch_cols[batch_idx])), + field.data_type(), + // TODO need to pass in configs here + EvalMode::Legacy, + "UTC", + false, + )? + .into_array(batch_rows) + }, ) }) .collect::, _>>()?; @@ -239,9 +258,16 @@ impl SchemaMapper for SchemaMapping { .map(|table_field| { // try to cast it into the correct output type. we don't want to ignore this // error, though, so it's propagated. 
- cast(batch_col, table_field.data_type()) - // and if that works, return the field and column. - .map(|new_col| (new_col, table_field.clone())) + spark_cast( + ColumnarValue::Array(Arc::clone(batch_col)), + table_field.data_type(), + // TODO need to pass in configs here + EvalMode::Legacy, + "UTC", + false, + )?.into_array(batch_col.len()) + // and if that works, return the field and column. + .map(|new_col| (new_col, table_field.clone())) }) }) .collect::, _>>()? diff --git a/native/spark-expr/src/cast.rs b/native/spark-expr/src/cast.rs index 13263a595..df0b7251b 100644 --- a/native/spark-expr/src/cast.rs +++ b/native/spark-expr/src/cast.rs @@ -623,6 +623,9 @@ fn cast_array( ) -> DataFusionResult { let array = array_with_timezone(array, timezone.clone(), Some(to_type))?; let from_type = array.data_type().clone(); + + println!("Comet cast_array from {from_type} to {to_type}"); + let array = match &from_type { DataType::Dictionary(key_type, value_type) if key_type.as_ref() == &DataType::Int32 diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 6a22486de..4f8374104 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -2873,9 +2873,6 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim withInfo(join, "SortMergeJoin is not enabled") None - case _: DeserializeToObjectExec => - None - case op if isCometSink(op) && op.output.forall(a => supportedDataType(a.dataType, true)) => // These operators are source of Comet native execution chain val scanBuilder = OperatorOuterClass.Scan.newBuilder() diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index 551c6daa5..85ac6138b 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -2257,13 +2257,14 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { checkSparkAnswerAndOperator(df.select("nested1.id", "nested1.nested2.id")) - //checkSparkAnswerAndOperator(df.select("nested1.nested2.id")) + // unsupported cast from Int64 to Struct([Field { name: "id", data_type: Int64, ... 
+ // checkSparkAnswerAndOperator(df.select("nested1.nested2.id")) } } } } - // TODO this is not using DataFusion's ParquetExec + // TODO this is not using DataFusion's ParquetExec for some reason ignore("get_struct_field with DataFusion ParquetExec - read entire struct") { withTempPath { dir => // create input file with Comet disabled From de6c675d713c5e9bd647ee77e92c4d60b2cfda78 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 4 Dec 2024 12:06:42 -0700 Subject: [PATCH 15/16] remove debug println --- native/spark-expr/src/cast.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/native/spark-expr/src/cast.rs b/native/spark-expr/src/cast.rs index df0b7251b..8ef9ca291 100644 --- a/native/spark-expr/src/cast.rs +++ b/native/spark-expr/src/cast.rs @@ -624,8 +624,6 @@ fn cast_array( let array = array_with_timezone(array, timezone.clone(), Some(to_type))?; let from_type = array.data_type().clone(); - println!("Comet cast_array from {from_type} to {to_type}"); - let array = match &from_type { DataType::Dictionary(key_type, value_type) if key_type.as_ref() == &DataType::Int32 From 45f1c9dae7fe0a1a2976ce98de9aab07845da433 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 4 Dec 2024 12:10:22 -0700 Subject: [PATCH 16/16] remove debug println --- native/core/src/execution/datafusion/schema_adapter.rs | 6 ------ 1 file changed, 6 deletions(-) diff --git a/native/core/src/execution/datafusion/schema_adapter.rs b/native/core/src/execution/datafusion/schema_adapter.rs index ea114a87e..16d4b9d67 100644 --- a/native/core/src/execution/datafusion/schema_adapter.rs +++ b/native/core/src/execution/datafusion/schema_adapter.rs @@ -93,12 +93,6 @@ impl SchemaAdapter for CometSchemaAdapter { if let Some((table_idx, table_field)) = self.projected_table_schema.fields().find(file_field.name()) { - println!( - "SchemaAdapter cast from {} to {}", - file_field.data_type(), - table_field.data_type() - ); - // workaround for struct casting match (file_field.data_type(), table_field.data_type()) { // TODO need to use Comet cast logic to determine which casts are supported,
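The series ends with the struct-cast workaround in the schema adapter still guarded by a TODO about reusing Comet's own cast-support rules rather than arrow's `can_cast_types`. One way that check could be shaped is a recursive, positional walk over struct fields; the sketch below uses arrow's `can_cast_types` as a stand-in for the leaf-level rule and is not the logic Comet ships, just an illustration of the recursion:

use arrow::compute::can_cast_types;
use arrow::datatypes::{DataType, Field, Fields};

// Recursively checks cast support, descending into struct fields (matched by
// position, like Spark's Cast#castStruct) instead of asking arrow about the
// struct type as a whole.
fn struct_aware_can_cast(from: &DataType, to: &DataType) -> bool {
    match (from, to) {
        (DataType::Struct(from_fields), DataType::Struct(to_fields)) => {
            to_fields.len() <= from_fields.len()
                && to_fields
                    .iter()
                    .zip(from_fields.iter())
                    .all(|(t, f)| struct_aware_can_cast(f.data_type(), t.data_type()))
        }
        _ => can_cast_types(from, to),
    }
}

fn main() {
    let from = DataType::Struct(Fields::from(vec![
        Field::new("a", DataType::Int32, true),
        Field::new("b", DataType::Utf8, true),
    ]));
    // Change "a" to utf8 and drop "b".
    let to = DataType::Struct(Fields::from(vec![Field::new("a", DataType::Utf8, true)]));
    assert!(struct_aware_can_cast(&from, &to));
}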