diff --git a/core/src/errors.rs b/core/src/errors.rs index af4fd2697..493880c3e 100644 --- a/core/src/errors.rs +++ b/core/src/errors.rs @@ -152,9 +152,10 @@ pub enum CometError { #[error("{msg}")] Panic { msg: String }, - #[error(transparent)] + #[error("{msg}")] DataFusion { - #[from] + msg: String, + #[source] source: DataFusionError, }, @@ -185,10 +186,19 @@ impl convert::From> for CometError { } } +impl From for CometError { + fn from(value: DataFusionError) -> Self { + CometError::DataFusion { + msg: value.message().to_string(), + source: value, + } + } +} + impl From for DataFusionError { fn from(value: CometError) -> Self { match value { - CometError::DataFusion { source } => source, + CometError::DataFusion { msg: _, source } => source, _ => DataFusionError::Execution(value.to_string()), } } diff --git a/core/src/execution/datafusion/planner.rs b/core/src/execution/datafusion/planner.rs index fcef182a7..e51932154 100644 --- a/core/src/execution/datafusion/planner.rs +++ b/core/src/execution/datafusion/planner.rs @@ -1391,7 +1391,7 @@ impl PhysicalPlanner { impl From for ExecutionError { fn from(value: DataFusionError) -> Self { - ExecutionError::DataFusionError(value.to_string()) + ExecutionError::DataFusionError(value.message().to_string()) } } @@ -1563,6 +1563,7 @@ mod tests { spark_operator, }; + use crate::execution::operators::ExecutionError; use spark_expression::expr::ExprStruct::*; use spark_operator::{operator::OpStruct, Operator}; @@ -1752,6 +1753,14 @@ mod tests { assert!(output.is_empty()); } + #[tokio::test()] + async fn from_datafusion_error_to_comet() { + let err_msg = "exec error"; + let err = datafusion_common::DataFusionError::Execution(err_msg.to_string()); + let comet_err: ExecutionError = err.into(); + assert_eq!(comet_err.to_string(), "Error from DataFusion: exec error."); + } + // Creates a filter operator which takes an `Int32Array` and selects rows that are equal to // `value`. fn create_filter(child_op: spark_operator::Operator, value: i32) -> spark_operator::Operator { diff --git a/core/src/execution/operators/mod.rs b/core/src/execution/operators/mod.rs index 5d05fdb8d..13a0d9627 100644 --- a/core/src/execution/operators/mod.rs +++ b/core/src/execution/operators/mod.rs @@ -38,19 +38,19 @@ pub use copy::*; pub enum ExecutionError { /// Simple error #[allow(dead_code)] - #[error("General execution error with reason {0}.")] + #[error("General execution error with reason: {0}.")] GeneralError(String), /// Error when deserializing an operator. - #[error("Fail to deserialize to native operator with reason {0}.")] + #[error("Fail to deserialize to native operator with reason: {0}.")] DeserializeError(String), /// Error when processing Arrow array. - #[error("Fail to process Arrow array with reason {0}.")] + #[error("Fail to process Arrow array with reason: {0}.")] ArrowError(String), /// DataFusion error - #[error("Error from DataFusion {0}.")] + #[error("Error from DataFusion: {0}.")] DataFusionError(String), } diff --git a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala index 25343f933..71134e550 100644 --- a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala @@ -984,9 +984,7 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { // both systems threw an exception so we make sure they are the same val sparkMessage = if (sparkException.getCause != null) sparkException.getCause.getMessage else null - // We have to workaround https://github.com/apache/datafusion-comet/issues/293 here by - // removing the "Execution error: " error message prefix that is added by DataFusion - val cometMessage = cometException.getCause.getMessage.replace("Execution error: ", "") + val cometMessage = cometException.getCause.getMessage if (CometSparkSessionExtensions.isSpark40Plus) { // for Spark 4 we expect to sparkException carries the message assert(