Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Fix Comet error message #544

Merged
merged 4 commits into from
Jun 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions core/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,10 @@ pub enum CometError {
#[error("{msg}")]
Panic { msg: String },

#[error(transparent)]
#[error("{msg}")]
DataFusion {
#[from]
msg: String,
#[source]
source: DataFusionError,
},

Expand Down Expand Up @@ -185,10 +186,19 @@ impl convert::From<Box<dyn Any + Send>> for CometError {
}
}

impl From<DataFusionError> for CometError {
fn from(value: DataFusionError) -> Self {
CometError::DataFusion {
msg: value.message().to_string(),
source: value,
}
}
}
comphead marked this conversation as resolved.
Show resolved Hide resolved

impl From<CometError> for DataFusionError {
fn from(value: CometError) -> Self {
match value {
CometError::DataFusion { source } => source,
CometError::DataFusion { msg: _, source } => source,
_ => DataFusionError::Execution(value.to_string()),
}
}
Expand Down
11 changes: 10 additions & 1 deletion core/src/execution/datafusion/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1382,7 +1382,7 @@ impl PhysicalPlanner {

impl From<DataFusionError> for ExecutionError {
fn from(value: DataFusionError) -> Self {
ExecutionError::DataFusionError(value.to_string())
ExecutionError::DataFusionError(value.message().to_string())
}
}

Expand Down Expand Up @@ -1554,6 +1554,7 @@ mod tests {
spark_operator,
};

use crate::execution::operators::ExecutionError;
use spark_expression::expr::ExprStruct::*;
use spark_operator::{operator::OpStruct, Operator};

Expand Down Expand Up @@ -1743,6 +1744,14 @@ mod tests {
assert!(output.is_empty());
}

#[tokio::test()]
async fn from_datafusion_error_to_comet() {
let err_msg = "exec error";
let err = datafusion_common::DataFusionError::Execution(err_msg.to_string());
let comet_err: ExecutionError = err.into();
assert_eq!(comet_err.to_string(), "Error from DataFusion: exec error.");
}

// Creates a filter operator which takes an `Int32Array` and selects rows that are equal to
// `value`.
fn create_filter(child_op: spark_operator::Operator, value: i32) -> spark_operator::Operator {
Expand Down
8 changes: 4 additions & 4 deletions core/src/execution/operators/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,19 @@ pub use copy::*;
pub enum ExecutionError {
/// Simple error
#[allow(dead_code)]
#[error("General execution error with reason {0}.")]
#[error("General execution error with reason: {0}.")]
GeneralError(String),

/// Error when deserializing an operator.
#[error("Fail to deserialize to native operator with reason {0}.")]
#[error("Fail to deserialize to native operator with reason: {0}.")]
DeserializeError(String),

/// Error when processing Arrow array.
#[error("Fail to process Arrow array with reason {0}.")]
#[error("Fail to process Arrow array with reason: {0}.")]
ArrowError(String),

/// DataFusion error
#[error("Error from DataFusion {0}.")]
#[error("Error from DataFusion: {0}.")]
DataFusionError(String),
}

Expand Down
4 changes: 1 addition & 3 deletions spark/src/test/scala/org/apache/comet/CometCastSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -984,9 +984,7 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
// both systems threw an exception so we make sure they are the same
val sparkMessage =
if (sparkException.getCause != null) sparkException.getCause.getMessage else null
// We have to workaround https://github.com/apache/datafusion-comet/issues/293 here by
// removing the "Execution error: " error message prefix that is added by DataFusion
val cometMessage = cometException.getCause.getMessage.replace("Execution error: ", "")
val cometMessage = cometException.getCause.getMessage
if (CometSparkSessionExtensions.isSpark40Plus) {
// for Spark 4 we expect to sparkException carries the message
assert(
Expand Down
Loading