Skip to content

Commit

Permalink
feat: Fix Comet error message (apache#544)
Browse files Browse the repository at this point in the history
* fix error message


Co-authored-by: Andy Grove <[email protected]>

---------

Co-authored-by: Andy Grove <[email protected]>
  • Loading branch information
2 people authored and kazuyukitanimura committed Jul 1, 2024
1 parent 7080d3c commit 4b76943
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 11 deletions.
16 changes: 13 additions & 3 deletions core/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,10 @@ pub enum CometError {
#[error("{msg}")]
Panic { msg: String },

#[error(transparent)]
#[error("{msg}")]
DataFusion {
#[from]
msg: String,
#[source]
source: DataFusionError,
},

Expand Down Expand Up @@ -185,10 +186,19 @@ impl convert::From<Box<dyn Any + Send>> for CometError {
}
}

impl From<DataFusionError> for CometError {
fn from(value: DataFusionError) -> Self {
CometError::DataFusion {
msg: value.message().to_string(),
source: value,
}
}
}

impl From<CometError> for DataFusionError {
fn from(value: CometError) -> Self {
match value {
CometError::DataFusion { source } => source,
CometError::DataFusion { msg: _, source } => source,
_ => DataFusionError::Execution(value.to_string()),
}
}
Expand Down
11 changes: 10 additions & 1 deletion core/src/execution/datafusion/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1391,7 +1391,7 @@ impl PhysicalPlanner {

impl From<DataFusionError> for ExecutionError {
fn from(value: DataFusionError) -> Self {
ExecutionError::DataFusionError(value.to_string())
ExecutionError::DataFusionError(value.message().to_string())
}
}

Expand Down Expand Up @@ -1563,6 +1563,7 @@ mod tests {
spark_operator,
};

use crate::execution::operators::ExecutionError;
use spark_expression::expr::ExprStruct::*;
use spark_operator::{operator::OpStruct, Operator};

Expand Down Expand Up @@ -1752,6 +1753,14 @@ mod tests {
assert!(output.is_empty());
}

#[tokio::test()]
async fn from_datafusion_error_to_comet() {
let err_msg = "exec error";
let err = datafusion_common::DataFusionError::Execution(err_msg.to_string());
let comet_err: ExecutionError = err.into();
assert_eq!(comet_err.to_string(), "Error from DataFusion: exec error.");
}

// Creates a filter operator which takes an `Int32Array` and selects rows that are equal to
// `value`.
fn create_filter(child_op: spark_operator::Operator, value: i32) -> spark_operator::Operator {
Expand Down
8 changes: 4 additions & 4 deletions core/src/execution/operators/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,19 @@ pub use copy::*;
pub enum ExecutionError {
/// Simple error
#[allow(dead_code)]
#[error("General execution error with reason {0}.")]
#[error("General execution error with reason: {0}.")]
GeneralError(String),

/// Error when deserializing an operator.
#[error("Fail to deserialize to native operator with reason {0}.")]
#[error("Fail to deserialize to native operator with reason: {0}.")]
DeserializeError(String),

/// Error when processing Arrow array.
#[error("Fail to process Arrow array with reason {0}.")]
#[error("Fail to process Arrow array with reason: {0}.")]
ArrowError(String),

/// DataFusion error
#[error("Error from DataFusion {0}.")]
#[error("Error from DataFusion: {0}.")]
DataFusionError(String),
}

Expand Down
4 changes: 1 addition & 3 deletions spark/src/test/scala/org/apache/comet/CometCastSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -984,9 +984,7 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
// both systems threw an exception so we make sure they are the same
val sparkMessage =
if (sparkException.getCause != null) sparkException.getCause.getMessage else null
// We have to workaround https://github.com/apache/datafusion-comet/issues/293 here by
// removing the "Execution error: " error message prefix that is added by DataFusion
val cometMessage = cometException.getCause.getMessage.replace("Execution error: ", "")
val cometMessage = cometException.getCause.getMessage
if (CometSparkSessionExtensions.isSpark40Plus) {
// for Spark 4 we expect to sparkException carries the message
assert(
Expand Down

0 comments on commit 4b76943

Please sign in to comment.