From 54a5f59db7fe96f2e286b738c57b59aabe0aca9b Mon Sep 17 00:00:00 2001 From: "ganesh.maddula" Date: Sun, 28 Apr 2024 01:54:45 +0530 Subject: [PATCH 01/11] handled cast for long to short --- core/src/errors.rs | 10 ++++ .../execution/datafusion/expressions/cast.rs | 48 ++++++++++++++++++- .../org/apache/comet/CometCastSuite.scala | 31 ++++++++++-- 3 files changed, 83 insertions(+), 6 deletions(-) diff --git a/core/src/errors.rs b/core/src/errors.rs index f02bd1969..5a410e9f1 100644 --- a/core/src/errors.rs +++ b/core/src/errors.rs @@ -71,6 +71,16 @@ pub enum CometError { from_type: String, to_type: String, }, + // Note that this message format is based on Spark 3.4 and is more detailed than the message + // returned by Spark 3.2 or 3.3 + #[error("[CAST_OVERFLOW] The value {value} of the type \"{from_type}\" cannot be cast to \"{to_type}\" \ + due to an overflow. Use `try_cast` to tolerate overflow and return NULL instead. If necessary \ + set \"spark.sql.ansi.enabled\" to \"false\" to bypass this error.")] + CastOverFlow { + value: String, + from_type: String, + to_type: String, + }, #[error(transparent)] Arrow { diff --git a/core/src/execution/datafusion/expressions/cast.rs b/core/src/execution/datafusion/expressions/cast.rs index 10079855d..4c9a50ad8 100644 --- a/core/src/execution/datafusion/expressions/cast.rs +++ b/core/src/execution/datafusion/expressions/cast.rs @@ -28,7 +28,7 @@ use arrow::{ record_batch::RecordBatch, util::display::FormatOptions, }; -use arrow_array::{Array, ArrayRef, BooleanArray, GenericStringArray, OffsetSizeTrait}; +use arrow_array::{Array, ArrayRef, BooleanArray, GenericStringArray, Int16Array, Int64Array, OffsetSizeTrait}; use arrow_schema::{DataType, Schema}; use datafusion::logical_expr::ColumnarValue; use datafusion_common::{internal_err, Result as DataFusionResult, ScalarValue}; @@ -103,11 +103,57 @@ impl Cast { (DataType::LargeUtf8, DataType::Boolean) => { Self::spark_cast_utf8_to_boolean::(&array, self.eval_mode)? } + (DataType::Int64, DataType::Int16) if self.eval_mode != EvalMode::Try => { + // (DataType::Int64, DataType::Int16) => { + Self::spark_cast_int64_to_int16(&array, self.eval_mode)? + } _ => cast_with_options(&array, to_type, &CAST_OPTIONS)?, }; let result = spark_cast(cast_result, from_type, to_type); Ok(result) } + fn spark_cast_int64_to_int16( + from: &dyn Array, + eval_mode: EvalMode, + ) -> CometResult + { + let array = from + .as_any() + .downcast_ref::() + .unwrap(); + + let output_array = match eval_mode { + EvalMode::Legacy => { + array.iter() + .map(|value| match value{ + Some(value) => Ok::, CometError>(Some(value as i16)), + _ => Ok(None) + }) + .collect::>()? + }, + _ => { + array.iter() + .map(|value| match value{ + Some(value) => { + let res = i16::try_from(value); + if res.is_err() { + Err(CometError::CastOverFlow{ + value: value.to_string() + "L", + from_type: "BIGINT".to_string(), + to_type: "SMALLINT".to_string(), + }) + }else{ + Ok::, CometError>(Some(i16::try_from(value).unwrap())) + } + + }, + _ => Ok(None) + }) + .collect::>()? + } + }; + Ok(Arc::new(output_array)) + } fn spark_cast_utf8_to_boolean( from: &dyn Array, diff --git a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala index 8abd24598..2f42640a5 100644 --- a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala @@ -43,10 +43,10 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { private val datePattern = "0123456789/" + whitespaceChars private val timestampPattern = "0123456789/:T" + whitespaceChars - ignore("cast long to short") { - castTest(generateLongs, DataTypes.ShortType) - } - +// ignore("cast long to short") { +// castTest(generateLongs, DataTypes.ShortType) +// } +// ignore("cast float to bool") { castTest(generateFloats, DataTypes.BooleanType) } @@ -106,6 +106,27 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { castTest(values.toDF("a"), DataTypes.DoubleType) } + // spotless:off + test("cast short to int"){ + + } + test("cast short to long"){ + + } + test("cast int to short"){ + + } + test("cast int to long"){ + + } + test("cast long to short"){ + castTest(generateLongs, DataTypes.ShortType) + } + test("cast long to int"){ + + } + // spotless:on + private def generateFloats(): DataFrame = { val r = new Random(0) Range(0, dataSize).map(_ => r.nextFloat()).toDF("a") @@ -113,7 +134,7 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { private def generateLongs(): DataFrame = { val r = new Random(0) - Range(0, dataSize).map(_ => r.nextLong()).toDF("a") + (Range(0, dataSize).map(_ => r.nextLong()) ++ Seq(Long.MaxValue, Long.MinValue)).toDF("a") } private def generateString(r: Random, chars: String, maxLen: Int): String = { From 8b296417b331cfb2de64f2cffac653d8bac1b51a Mon Sep 17 00:00:00 2001 From: "ganesh.maddula" Date: Mon, 29 Apr 2024 01:57:59 +0530 Subject: [PATCH 02/11] handled cast for all overflow cases --- core/src/errors.rs | 3 +- .../execution/datafusion/expressions/cast.rs | 131 ++++++++++++------ .../org/apache/comet/CometCastSuite.scala | 37 +++-- 3 files changed, 114 insertions(+), 57 deletions(-) diff --git a/core/src/errors.rs b/core/src/errors.rs index 5a410e9f1..a06c613ad 100644 --- a/core/src/errors.rs +++ b/core/src/errors.rs @@ -71,8 +71,7 @@ pub enum CometError { from_type: String, to_type: String, }, - // Note that this message format is based on Spark 3.4 and is more detailed than the message - // returned by Spark 3.2 or 3.3 + #[error("[CAST_OVERFLOW] The value {value} of the type \"{from_type}\" cannot be cast to \"{to_type}\" \ due to an overflow. Use `try_cast` to tolerate overflow and return NULL instead. If necessary \ set \"spark.sql.ansi.enabled\" to \"false\" to bypass this error.")] diff --git a/core/src/execution/datafusion/expressions/cast.rs b/core/src/execution/datafusion/expressions/cast.rs index 4c9a50ad8..f7cbb5ee2 100644 --- a/core/src/execution/datafusion/expressions/cast.rs +++ b/core/src/execution/datafusion/expressions/cast.rs @@ -28,7 +28,8 @@ use arrow::{ record_batch::RecordBatch, util::display::FormatOptions, }; -use arrow_array::{Array, ArrayRef, BooleanArray, GenericStringArray, Int16Array, Int64Array, OffsetSizeTrait}; +use arrow_array::{Array, ArrayRef, BooleanArray, GenericStringArray, OffsetSizeTrait, PrimitiveArray}; +use arrow_array::types::{Int16Type, Int32Type, Int64Type, Int8Type}; use arrow_schema::{DataType, Schema}; use datafusion::logical_expr::ColumnarValue; use datafusion_common::{internal_err, Result as DataFusionResult, ScalarValue}; @@ -64,6 +65,62 @@ pub struct Cast { pub timezone: String, } +macro_rules! cast_int_to_int_macro{ + ( + $array: expr, + $eval_mode:expr, + $from_arrow_primitive_type: ty, + $to_arrow_primitive_type: ty, + $from_data_type: expr, + $to_native_type: ty, + $spark_from_data_type_name: expr, + $spark_to_data_type_name: expr + ) => {{ + let cast_array = $array + .as_any() + .downcast_ref::>() + .unwrap(); + let spark_int_literal_suffix = match $from_data_type { + &DataType::Int64 => "L", + &DataType::Int16 => "S", + &DataType::Int8 => "T", + _ => "" + }; + + let output_array = match $eval_mode { + EvalMode::Legacy => + cast_array.iter() + .map(|value| match value { + Some(value) => Ok::, CometError>(Some(value as $to_native_type)), + _ => Ok(None) + }) + .collect::, _>>(), + _ => { + cast_array.iter() + .map(|value| match value{ + Some(value) => { + let res = <$to_native_type>::try_from(value); + if res.is_err() { + Err(CometError::CastOverFlow{ + value: value.to_string() + spark_int_literal_suffix, + from_type: $spark_from_data_type_name.to_string(), + to_type: $spark_to_data_type_name.to_string(), + }) + }else{ + Ok::, CometError>(Some(res.unwrap())) + } + + }, + _ => Ok(None) + }) + .collect::, _>>() + } + }?; + let result: CometResult = Ok(Arc::new(output_array) as ArrayRef); + result + }}; +} + impl Cast { pub fn new( child: Arc, @@ -103,56 +160,46 @@ impl Cast { (DataType::LargeUtf8, DataType::Boolean) => { Self::spark_cast_utf8_to_boolean::(&array, self.eval_mode)? } - (DataType::Int64, DataType::Int16) if self.eval_mode != EvalMode::Try => { - // (DataType::Int64, DataType::Int16) => { - Self::spark_cast_int64_to_int16(&array, self.eval_mode)? + (DataType::Int64, DataType::Int32) + | (DataType::Int64, DataType::Int16) + | (DataType::Int64, DataType::Int8) + | (DataType::Int32, DataType::Int16) + | (DataType::Int32, DataType::Int8) + | (DataType::Int16, DataType::Int8) + if self.eval_mode != EvalMode::Try => { + Self::spark_cast_int_to_int(&array, self.eval_mode, from_type, to_type)? } _ => cast_with_options(&array, to_type, &CAST_OPTIONS)?, }; let result = spark_cast(cast_result, from_type, to_type); Ok(result) } - fn spark_cast_int64_to_int16( - from: &dyn Array, + + fn spark_cast_int_to_int( + array: &dyn Array, eval_mode: EvalMode, + from_type: &DataType, + to_type: &DataType, ) -> CometResult { - let array = from - .as_any() - .downcast_ref::() - .unwrap(); - - let output_array = match eval_mode { - EvalMode::Legacy => { - array.iter() - .map(|value| match value{ - Some(value) => Ok::, CometError>(Some(value as i16)), - _ => Ok(None) - }) - .collect::>()? - }, - _ => { - array.iter() - .map(|value| match value{ - Some(value) => { - let res = i16::try_from(value); - if res.is_err() { - Err(CometError::CastOverFlow{ - value: value.to_string() + "L", - from_type: "BIGINT".to_string(), - to_type: "SMALLINT".to_string(), - }) - }else{ - Ok::, CometError>(Some(i16::try_from(value).unwrap())) - } - - }, - _ => Ok(None) - }) - .collect::>()? - } - }; - Ok(Arc::new(output_array)) + match (from_type, to_type) { + (DataType::Int64, DataType::Int32) => + cast_int_to_int_macro!(array, eval_mode, Int64Type, Int32Type, from_type, i32, "BIGINT", "INT"), + (DataType::Int64, DataType::Int16) => + cast_int_to_int_macro!(array, eval_mode, Int64Type, Int16Type, from_type, i16, "BIGINT", "SMALLINT"), + (DataType::Int64, DataType::Int8) => + cast_int_to_int_macro!(array, eval_mode, Int64Type, Int8Type, from_type, i8, "BIGINT", "TINYINT"), + (DataType::Int32, DataType::Int16) => + cast_int_to_int_macro!(array, eval_mode, Int32Type, Int16Type, from_type, i16, "INT", "SMALLINT"), + (DataType::Int32, DataType::Int8) => + cast_int_to_int_macro!(array, eval_mode, Int32Type, Int8Type, from_type, i8, "INT", "TINYINT"), + (DataType::Int16, DataType::Int8) => + cast_int_to_int_macro!(array, eval_mode, Int16Type, Int8Type, from_type, i8, "SMALLINT", "TINYINT"), + _ => unreachable!( + "{}", + format!("invalid integer type {to_type} in cast from {from_type}") + ), + } } fn spark_cast_utf8_to_boolean( diff --git a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala index 2f42640a5..54b08ae0e 100644 --- a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala @@ -43,10 +43,6 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { private val datePattern = "0123456789/" + whitespaceChars private val timestampPattern = "0123456789/:T" + whitespaceChars -// ignore("cast long to short") { -// castTest(generateLongs, DataTypes.ShortType) -// } -// ignore("cast float to bool") { castTest(generateFloats, DataTypes.BooleanType) } @@ -106,26 +102,29 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { castTest(values.toDF("a"), DataTypes.DoubleType) } - // spotless:off - test("cast short to int"){ - + test("cast short to byte") { + castTest(generateShorts, DataTypes.ByteType) } - test("cast short to long"){ + test("cast int to byte") { + castTest(generateInts, DataTypes.ByteType) } - test("cast int to short"){ + test("cast int to short") { + castTest(generateInts, DataTypes.ShortType) } - test("cast int to long"){ + test("cast long to byte") { + castTest(generateLongs, DataTypes.ByteType) } - test("cast long to short"){ + + test("cast long to short") { castTest(generateLongs, DataTypes.ShortType) } - test("cast long to int"){ + test("cast long to int") { + castTest(generateLongs, DataTypes.IntegerType) } - // spotless:on private def generateFloats(): DataFrame = { val r = new Random(0) @@ -137,6 +136,18 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { (Range(0, dataSize).map(_ => r.nextLong()) ++ Seq(Long.MaxValue, Long.MinValue)).toDF("a") } + private def generateInts(): DataFrame = { + val r = new Random(0) + (Range(0, dataSize).map(_ => r.nextInt()) ++ Seq(Int.MaxValue, Int.MinValue)).toDF("a") + } + + private def generateShorts(): DataFrame = { + val r = new Random(0) + (Range(0, dataSize).map(_ => r.nextInt(Short.MaxValue).toShort) ++ Seq( + Short.MaxValue, + Short.MinValue)).toDF("a") + } + private def generateString(r: Random, chars: String, maxLen: Int): String = { val len = r.nextInt(maxLen) Range(0, len).map(_ => chars.charAt(r.nextInt(chars.length))).mkString From a0895527a8c40878f1f1606ca18064de19f9883e Mon Sep 17 00:00:00 2001 From: "ganesh.maddula" Date: Mon, 29 Apr 2024 02:16:04 +0530 Subject: [PATCH 03/11] ran make format --- .../execution/datafusion/expressions/cast.rs | 102 ++++++++++-------- 1 file changed, 55 insertions(+), 47 deletions(-) diff --git a/core/src/execution/datafusion/expressions/cast.rs b/core/src/execution/datafusion/expressions/cast.rs index f7cbb5ee2..7b021eb57 100644 --- a/core/src/execution/datafusion/expressions/cast.rs +++ b/core/src/execution/datafusion/expressions/cast.rs @@ -28,8 +28,10 @@ use arrow::{ record_batch::RecordBatch, util::display::FormatOptions, }; -use arrow_array::{Array, ArrayRef, BooleanArray, GenericStringArray, OffsetSizeTrait, PrimitiveArray}; -use arrow_array::types::{Int16Type, Int32Type, Int64Type, Int8Type}; +use arrow_array::{ + types::{Int16Type, Int32Type, Int64Type, Int8Type}, + Array, ArrayRef, BooleanArray, GenericStringArray, OffsetSizeTrait, PrimitiveArray, +}; use arrow_schema::{DataType, Schema}; use datafusion::logical_expr::ColumnarValue; use datafusion_common::{internal_err, Result as DataFusionResult, ScalarValue}; @@ -65,7 +67,7 @@ pub struct Cast { pub timezone: String, } -macro_rules! cast_int_to_int_macro{ +macro_rules! cast_int_to_int_macro { ( $array: expr, $eval_mode:expr, @@ -78,43 +80,43 @@ macro_rules! cast_int_to_int_macro{ ) => {{ let cast_array = $array .as_any() - .downcast_ref::>() + .downcast_ref::>() .unwrap(); let spark_int_literal_suffix = match $from_data_type { &DataType::Int64 => "L", &DataType::Int16 => "S", &DataType::Int8 => "T", - _ => "" + _ => "", }; let output_array = match $eval_mode { - EvalMode::Legacy => - cast_array.iter() - .map(|value| match value { - Some(value) => Ok::, CometError>(Some(value as $to_native_type)), - _ => Ok(None) - }) - .collect::, _>>(), - _ => { - cast_array.iter() - .map(|value| match value{ - Some(value) => { - let res = <$to_native_type>::try_from(value); - if res.is_err() { - Err(CometError::CastOverFlow{ - value: value.to_string() + spark_int_literal_suffix, - from_type: $spark_from_data_type_name.to_string(), - to_type: $spark_to_data_type_name.to_string(), - }) - }else{ - Ok::, CometError>(Some(res.unwrap())) - } - - }, - _ => Ok(None) - }) - .collect::, _>>() - } + EvalMode::Legacy => cast_array + .iter() + .map(|value| match value { + Some(value) => { + Ok::, CometError>(Some(value as $to_native_type)) + } + _ => Ok(None), + }) + .collect::, _>>(), + _ => cast_array + .iter() + .map(|value| match value { + Some(value) => { + let res = <$to_native_type>::try_from(value); + if res.is_err() { + Err(CometError::CastOverFlow { + value: value.to_string() + spark_int_literal_suffix, + from_type: $spark_from_data_type_name.to_string(), + to_type: $spark_to_data_type_name.to_string(), + }) + } else { + Ok::, CometError>(Some(res.unwrap())) + } + } + _ => Ok(None), + }) + .collect::, _>>(), }?; let result: CometResult = Ok(Arc::new(output_array) as ArrayRef); result @@ -166,7 +168,8 @@ impl Cast { | (DataType::Int32, DataType::Int16) | (DataType::Int32, DataType::Int8) | (DataType::Int16, DataType::Int8) - if self.eval_mode != EvalMode::Try => { + if self.eval_mode != EvalMode::Try => + { Self::spark_cast_int_to_int(&array, self.eval_mode, from_type, to_type)? } _ => cast_with_options(&array, to_type, &CAST_OPTIONS)?, @@ -180,21 +183,26 @@ impl Cast { eval_mode: EvalMode, from_type: &DataType, to_type: &DataType, - ) -> CometResult - { + ) -> CometResult { match (from_type, to_type) { - (DataType::Int64, DataType::Int32) => - cast_int_to_int_macro!(array, eval_mode, Int64Type, Int32Type, from_type, i32, "BIGINT", "INT"), - (DataType::Int64, DataType::Int16) => - cast_int_to_int_macro!(array, eval_mode, Int64Type, Int16Type, from_type, i16, "BIGINT", "SMALLINT"), - (DataType::Int64, DataType::Int8) => - cast_int_to_int_macro!(array, eval_mode, Int64Type, Int8Type, from_type, i8, "BIGINT", "TINYINT"), - (DataType::Int32, DataType::Int16) => - cast_int_to_int_macro!(array, eval_mode, Int32Type, Int16Type, from_type, i16, "INT", "SMALLINT"), - (DataType::Int32, DataType::Int8) => - cast_int_to_int_macro!(array, eval_mode, Int32Type, Int8Type, from_type, i8, "INT", "TINYINT"), - (DataType::Int16, DataType::Int8) => - cast_int_to_int_macro!(array, eval_mode, Int16Type, Int8Type, from_type, i8, "SMALLINT", "TINYINT"), + (DataType::Int64, DataType::Int32) => cast_int_to_int_macro!( + array, eval_mode, Int64Type, Int32Type, from_type, i32, "BIGINT", "INT" + ), + (DataType::Int64, DataType::Int16) => cast_int_to_int_macro!( + array, eval_mode, Int64Type, Int16Type, from_type, i16, "BIGINT", "SMALLINT" + ), + (DataType::Int64, DataType::Int8) => cast_int_to_int_macro!( + array, eval_mode, Int64Type, Int8Type, from_type, i8, "BIGINT", "TINYINT" + ), + (DataType::Int32, DataType::Int16) => cast_int_to_int_macro!( + array, eval_mode, Int32Type, Int16Type, from_type, i16, "INT", "SMALLINT" + ), + (DataType::Int32, DataType::Int8) => cast_int_to_int_macro!( + array, eval_mode, Int32Type, Int8Type, from_type, i8, "INT", "TINYINT" + ), + (DataType::Int16, DataType::Int8) => cast_int_to_int_macro!( + array, eval_mode, Int16Type, Int8Type, from_type, i8, "SMALLINT", "TINYINT" + ), _ => unreachable!( "{}", format!("invalid integer type {to_type} in cast from {from_type}") From 1ab192a6f4ea46c70bb0d57fb99c1fdfa7c112b9 Mon Sep 17 00:00:00 2001 From: "ganesh.maddula" Date: Wed, 1 May 2024 01:57:58 +0530 Subject: [PATCH 04/11] added check for overflow exception for 3.4 below. --- spark/src/test/scala/org/apache/comet/CometCastSuite.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala index 88cfde590..dff67c355 100644 --- a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala @@ -746,7 +746,8 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { // Comet message is in format `The value 'VALUE' of the type FROM_TYPE cannot be cast to TO_TYPE` // We just check that the comet message contains the same invalid value as the Spark message val sparkInvalidValue = sparkMessage.substring(sparkMessage.indexOf(':') + 2) - assert(cometMessage.contains(sparkInvalidValue)) + assert( + cometMessage.contains(sparkInvalidValue) || cometMessage.contains("overflow")) } } From a3ef6a36aa676adda887ae0917969e07359653d0 Mon Sep 17 00:00:00 2001 From: "ganesh.maddula" Date: Wed, 1 May 2024 23:17:57 +0530 Subject: [PATCH 05/11] added comments to on why we do overflow check. added a check before we fetch the sparkInvalidValue --- .../main/scala/org/apache/comet/Constants.scala | 1 + .../scala/org/apache/comet/CometCastSuite.scala | 16 +++++++++++++--- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/common/src/main/scala/org/apache/comet/Constants.scala b/common/src/main/scala/org/apache/comet/Constants.scala index 83b570fc3..b6893a1e0 100644 --- a/common/src/main/scala/org/apache/comet/Constants.scala +++ b/common/src/main/scala/org/apache/comet/Constants.scala @@ -22,4 +22,5 @@ package org.apache.comet object Constants { val LOG_CONF_PATH = "comet.log.file.path" val LOG_CONF_NAME = "log4rs.yaml" + val EMPTY_STRING = "" } diff --git a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala index 412f2f66e..f4426e657 100644 --- a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala @@ -19,10 +19,10 @@ package org.apache.comet -import java.io.File +import org.apache.comet.Constants.EMPTY_STRING +import java.io.File import scala.util.Random - import org.apache.spark.sql.{CometTestBase, DataFrame, SaveMode} import org.apache.spark.sql.catalyst.expressions.Cast import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper @@ -831,10 +831,20 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { } else { // Spark 3.2 and 3.3 have a different error message format so we can't do a direct // comparison between Spark and Comet. + // In the case of CAST_INVALID_INPUT // Spark message is in format `invalid input syntax for type TYPE: VALUE` // Comet message is in format `The value 'VALUE' of the type FROM_TYPE cannot be cast to TO_TYPE` // We just check that the comet message contains the same invalid value as the Spark message - val sparkInvalidValue = sparkMessage.substring(sparkMessage.indexOf(':') + 2) + // In the case of CAST_OVERFLOW + // Spark message is in format `Casting VALUE to TO_TYPE causes overflow` + // Comet message is in format `The value 'VALUE' of the type FROM_TYPE cannot be cast to TO_TYPE + // due to an overflow` + // We check if the comet message contains 'overflow'. + val sparkInvalidValue = if(sparkMessage.indexOf(':') == 0){ + EMPTY_STRING + } else{ + sparkMessage.substring(sparkMessage.indexOf(':') + 2) + } assert( cometMessage.contains(sparkInvalidValue) || cometMessage.contains("overflow")) } From 046985ea2102ef0adb904db1f3ffad941c40dc43 Mon Sep 17 00:00:00 2001 From: "ganesh.maddula" Date: Wed, 1 May 2024 23:20:10 +0530 Subject: [PATCH 06/11] -1 instead of 0, -1 indicates the provided character is not present --- spark/src/test/scala/org/apache/comet/CometCastSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala index f4426e657..a9f281942 100644 --- a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala @@ -840,7 +840,7 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { // Comet message is in format `The value 'VALUE' of the type FROM_TYPE cannot be cast to TO_TYPE // due to an overflow` // We check if the comet message contains 'overflow'. - val sparkInvalidValue = if(sparkMessage.indexOf(':') == 0){ + val sparkInvalidValue = if(sparkMessage.indexOf(':') == -1){ EMPTY_STRING } else{ sparkMessage.substring(sparkMessage.indexOf(':') + 2) From 6f2306aa4c60dd6c0e7ffa6c29b3f90f1c79003f Mon Sep 17 00:00:00 2001 From: "ganesh.maddula" Date: Wed, 1 May 2024 23:24:44 +0530 Subject: [PATCH 07/11] ran mvn spotless:apply --- .../test/scala/org/apache/comet/CometCastSuite.scala | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala index a9f281942..9defaaef2 100644 --- a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala @@ -19,10 +19,10 @@ package org.apache.comet -import org.apache.comet.Constants.EMPTY_STRING - import java.io.File + import scala.util.Random + import org.apache.spark.sql.{CometTestBase, DataFrame, SaveMode} import org.apache.spark.sql.catalyst.expressions.Cast import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper @@ -30,6 +30,8 @@ import org.apache.spark.sql.functions.col import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{DataType, DataTypes} +import org.apache.comet.Constants.EMPTY_STRING + class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { import testImplicits._ @@ -840,9 +842,9 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { // Comet message is in format `The value 'VALUE' of the type FROM_TYPE cannot be cast to TO_TYPE // due to an overflow` // We check if the comet message contains 'overflow'. - val sparkInvalidValue = if(sparkMessage.indexOf(':') == -1){ + val sparkInvalidValue = if (sparkMessage.indexOf(':') == -1) { EMPTY_STRING - } else{ + } else { sparkMessage.substring(sparkMessage.indexOf(':') + 2) } assert( From 7b9698d9509f9dec8ebae19acb6131c7f6984eda Mon Sep 17 00:00:00 2001 From: "ganesh.maddula" Date: Fri, 3 May 2024 23:15:38 +0530 Subject: [PATCH 08/11] check for presence of ':' and have asserts accordingly --- .../src/main/scala/org/apache/comet/Constants.scala | 1 - .../test/scala/org/apache/comet/CometCastSuite.scala | 11 ++++------- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/common/src/main/scala/org/apache/comet/Constants.scala b/common/src/main/scala/org/apache/comet/Constants.scala index b6893a1e0..83b570fc3 100644 --- a/common/src/main/scala/org/apache/comet/Constants.scala +++ b/common/src/main/scala/org/apache/comet/Constants.scala @@ -22,5 +22,4 @@ package org.apache.comet object Constants { val LOG_CONF_PATH = "comet.log.file.path" val LOG_CONF_NAME = "log4rs.yaml" - val EMPTY_STRING = "" } diff --git a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala index fb76949e8..b0a225d13 100644 --- a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala @@ -30,8 +30,6 @@ import org.apache.spark.sql.functions.col import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{DataType, DataTypes} -import org.apache.comet.Constants.EMPTY_STRING - class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { import testImplicits._ @@ -903,13 +901,12 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { // Comet message is in format `The value 'VALUE' of the type FROM_TYPE cannot be cast to TO_TYPE // due to an overflow` // We check if the comet message contains 'overflow'. - val sparkInvalidValue = if (sparkMessage.indexOf(':') == -1) { - EMPTY_STRING + if (sparkMessage.indexOf(':') == -1) { + assert(cometMessage.contains("overflow")) } else { - sparkMessage.substring(sparkMessage.indexOf(':') + 2) + assert( + cometMessage.contains(sparkMessage.substring(sparkMessage.indexOf(':') + 2))) } - assert( - cometMessage.contains(sparkInvalidValue) || cometMessage.contains("overflow")) } } From eaca87df85a039cf5475ed3d1b381f09d4dc9e77 Mon Sep 17 00:00:00 2001 From: "ganesh.maddula" Date: Fri, 3 May 2024 23:53:35 +0530 Subject: [PATCH 09/11] reusing exising test functions --- .../org/apache/comet/CometCastSuite.scala | 36 ++++--------------- 1 file changed, 6 insertions(+), 30 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala index b0a225d13..cd6213128 100644 --- a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala @@ -175,7 +175,7 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { castTest(generateShorts(), DataTypes.BooleanType) } - ignore("cast ShortType to ByteType") { + test("cast ShortType to ByteType") { // https://github.com/apache/datafusion-comet/issues/311 castTest(generateShorts(), DataTypes.ByteType) } @@ -215,12 +215,12 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { castTest(generateInts(), DataTypes.BooleanType) } - ignore("cast IntegerType to ByteType") { + test("cast IntegerType to ByteType") { // https://github.com/apache/datafusion-comet/issues/311 castTest(generateInts(), DataTypes.ByteType) } - ignore("cast IntegerType to ShortType") { + test("cast IntegerType to ShortType") { // https://github.com/apache/datafusion-comet/issues/311 castTest(generateInts(), DataTypes.ShortType) } @@ -257,17 +257,17 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { castTest(generateLongs(), DataTypes.BooleanType) } - ignore("cast LongType to ByteType") { + test("cast LongType to ByteType") { // https://github.com/apache/datafusion-comet/issues/311 castTest(generateLongs(), DataTypes.ByteType) } - ignore("cast LongType to ShortType") { + test("cast LongType to ShortType") { // https://github.com/apache/datafusion-comet/issues/311 castTest(generateLongs(), DataTypes.ShortType) } - ignore("cast LongType to IntegerType") { + test("cast LongType to IntegerType") { // https://github.com/apache/datafusion-comet/issues/311 castTest(generateLongs(), DataTypes.IntegerType) } @@ -665,30 +665,6 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { castTest(generateTimestamps(), DataTypes.DateType) } - test("cast short to byte") { - castTest(generateShorts, DataTypes.ByteType) - } - - test("cast int to byte") { - castTest(generateInts, DataTypes.ByteType) - } - - test("cast int to short") { - castTest(generateInts, DataTypes.ShortType) - } - - test("cast long to byte") { - castTest(generateLongs, DataTypes.ByteType) - } - - test("cast long to short") { - castTest(generateLongs, DataTypes.ShortType) - } - - test("cast long to int") { - castTest(generateLongs, DataTypes.IntegerType) - } - private def generateFloats(): DataFrame = { val r = new Random(0) val values = Seq( From 2da677c25e0f960c8ea45c0658d1d5cf276ca448 Mon Sep 17 00:00:00 2001 From: "ganesh.maddula" Date: Sat, 4 May 2024 01:16:03 +0530 Subject: [PATCH 10/11] added one more check in assert when ':' is not present --- spark/src/test/scala/org/apache/comet/CometCastSuite.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala index 8a1e59d28..f5f5e1082 100644 --- a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala @@ -935,7 +935,9 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { // due to an overflow` // We check if the comet message contains 'overflow'. if (sparkMessage.indexOf(':') == -1) { - assert(cometMessage.contains("overflow")) + assert( + cometMessage.contains("overflow") || cometMessage.contains( + "CAST_INVALID_INPUT")) } else { assert( cometMessage.contains(sparkMessage.substring(sparkMessage.indexOf(':') + 2))) From 4ee196398a51d70192f96e9301b8f61d0d026035 Mon Sep 17 00:00:00 2001 From: "ganesh.maddula" Date: Sat, 4 May 2024 02:34:39 +0530 Subject: [PATCH 11/11] redo the compare logic as per andy's suggestions. --- .../org/apache/comet/CometCastSuite.scala | 33 +++++++++---------- 1 file changed, 16 insertions(+), 17 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala index f5f5e1082..483301e02 100644 --- a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala @@ -921,26 +921,25 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { val cometMessage = cometException.getCause.getMessage .replace("Execution error: ", "") if (CometSparkSessionExtensions.isSpark34Plus) { + // for Spark 3.4 we expect to reproduce the error message exactly assert(cometMessage == sparkMessage) + } else if (CometSparkSessionExtensions.isSpark33Plus) { + // for Spark 3.3 we just need to strip the prefix from the Comet message + // before comparing + val cometMessageModified = cometMessage + .replace("[CAST_INVALID_INPUT] ", "") + .replace("[CAST_OVERFLOW] ", "") + assert(cometMessageModified == sparkMessage) } else { - // Spark 3.2 and 3.3 have a different error message format so we can't do a direct - // comparison between Spark and Comet. - // In the case of CAST_INVALID_INPUT - // Spark message is in format `invalid input syntax for type TYPE: VALUE` - // Comet message is in format `The value 'VALUE' of the type FROM_TYPE cannot be cast to TO_TYPE` - // We just check that the comet message contains the same invalid value as the Spark message - // In the case of CAST_OVERFLOW - // Spark message is in format `Casting VALUE to TO_TYPE causes overflow` - // Comet message is in format `The value 'VALUE' of the type FROM_TYPE cannot be cast to TO_TYPE - // due to an overflow` - // We check if the comet message contains 'overflow'. - if (sparkMessage.indexOf(':') == -1) { - assert( - cometMessage.contains("overflow") || cometMessage.contains( - "CAST_INVALID_INPUT")) + // for Spark 3.2 we just make sure we are seeing a similar type of error + if (sparkMessage.contains("causes overflow")) { + assert(cometMessage.contains("due to an overflow")) } else { - assert( - cometMessage.contains(sparkMessage.substring(sparkMessage.indexOf(':') + 2))) + // assume that this is an invalid input message in the form: + // `invalid input syntax for type numeric: -9223372036854775809` + // we just check that the Comet message contains the same literal value + val sparkInvalidValue = sparkMessage.substring(sparkMessage.indexOf(':') + 2) + assert(cometMessage.contains(sparkInvalidValue)) } } }