move cast to spark-expr crate
andygrove committed Jul 11, 2024
1 parent 1e4eb74 commit 25dc0df
Showing 8 changed files with 34 additions and 40 deletions.
5 changes: 5 additions & 0 deletions native/Cargo.lock

(Generated lock file; diff not rendered.)

2 changes: 2 additions & 0 deletions native/Cargo.toml
@@ -50,6 +50,8 @@ datafusion-comet-spark-expr = { path = "spark-expr", version = "0.1.0" }
 datafusion-comet-utils = { path = "utils", version = "0.1.0" }
 chrono = { version = "0.4", default-features = false, features = ["clock"] }
 chrono-tz = { version = "0.8" }
+num = "0.4"
+regex = "1.9.6"
 
 [profile.release]
 debug = true
4 changes: 2 additions & 2 deletions native/core/Cargo.toml
@@ -59,7 +59,7 @@ flate2 = "1.0"
 lz4 = "1.24"
 zstd = "0.11"
 rand = "0.8"
-num = "0.4"
+num = { workspace = true }
 bytes = "1.5.0"
 tempfile = "3.8.0"
 ahash = { version = "0.8", default-features = false }
@@ -74,7 +74,7 @@ datafusion-physical-expr-common = { workspace = true }
 datafusion-physical-expr = { workspace = true }
 unicode-segmentation = "^1.10.1"
 once_cell = "1.18.0"
-regex = "1.9.6"
+regex = { workspace = true }
 crc32fast = "1.3.2"
 simd-adler32 = "0.3.7"
 datafusion-comet-spark-expr = { workspace = true }
31 changes: 5 additions & 26 deletions native/core/src/errors.rs
@@ -38,6 +38,7 @@ use std::{
 use jni::sys::{jboolean, jbyte, jchar, jdouble, jfloat, jint, jlong, jobject, jshort};
 
 use crate::execution::operators::ExecutionError;
+use datafusion_comet_spark_expr::SparkError;
 use jni::objects::{GlobalRef, JThrowable};
 use jni::JNIEnv;
 use lazy_static::lazy_static;
@@ -62,34 +63,13 @@ pub enum CometError {
     #[error("Comet Internal Error: {0}")]
     Internal(String),
 
-    // Note that this message format is based on Spark 3.4 and is more detailed than the message
-    // returned by Spark 3.3
-    #[error("[CAST_INVALID_INPUT] The value '{value}' of the type \"{from_type}\" cannot be cast to \"{to_type}\" \
-        because it is malformed. Correct the value as per the syntax, or change its target type. \
-        Use `try_cast` to tolerate malformed input and return NULL instead. If necessary \
-        set \"spark.sql.ansi.enabled\" to \"false\" to bypass this error.")]
-    CastInvalidValue {
-        value: String,
-        from_type: String,
-        to_type: String,
-    },
-
-    #[error("[NUMERIC_VALUE_OUT_OF_RANGE] {value} cannot be represented as Decimal({precision}, {scale}). If necessary set \"spark.sql.ansi.enabled\" to \"false\" to bypass this error, and return NULL instead.")]
-    NumericValueOutOfRange {
-        value: String,
-        precision: u8,
-        scale: i8,
-    },
-
-    #[error("[CAST_OVERFLOW] The value {value} of the type \"{from_type}\" cannot be cast to \"{to_type}\" \
-        due to an overflow. Use `try_cast` to tolerate overflow and return NULL instead. If necessary \
-        set \"spark.sql.ansi.enabled\" to \"false\" to bypass this error.")]
-    CastOverFlow {
-        value: String,
-        from_type: String,
-        to_type: String,
-    },
-
     #[error("[ARITHMETIC_OVERFLOW] {from_type} overflow. If necessary set \"spark.sql.ansi.enabled\" to \"false\" to bypass this error.")]
     ArithmeticOverflow { from_type: String },
 
@@ -167,6 +147,9 @@ pub enum CometError {
         msg: String,
         throwable: GlobalRef,
     },
+
+    #[error(transparent)]
+    Spark(SparkError),
 }
 
 pub fn init() {
@@ -239,11 +222,7 @@ impl jni::errors::ToException for CometError {
                 class: "java/lang/NullPointerException".to_string(),
                 msg: self.to_string(),
             },
-            CometError::CastInvalidValue { .. } => Exception {
-                class: "org/apache/spark/SparkException".to_string(),
-                msg: self.to_string(),
-            },
-            CometError::NumericValueOutOfRange { .. } => Exception {
+            CometError::Spark(_) => Exception {
                 class: "org/apache/spark/SparkException".to_string(),
                 msg: self.to_string(),
            },
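For readers unfamiliar with the pattern: thiserror's `#[error(transparent)]` forwards both Display and source() to the wrapped error, so the single new Spark variant reproduces whichever SparkError message applies, replacing the per-error variants deleted above. A minimal sketch of the idea with simplified stand-in types, not the real Comet definitions (the `#[from]` is a convenience for this sketch; the commit's variant wraps the error without it):

use thiserror::Error;

// Simplified stand-ins for the real Comet types; illustration only.
#[derive(Error, Debug)]
enum SparkError {
    #[error("[ARITHMETIC_OVERFLOW] {0} overflow.")]
    ArithmeticOverflow(String),
}

#[derive(Error, Debug)]
enum CometError {
    // Display is delegated to the inner SparkError, so no message
    // text needs to be duplicated here.
    #[error(transparent)]
    Spark(#[from] SparkError),
}

fn main() {
    let err: CometError = SparkError::ArithmeticOverflow("integer".into()).into();
    println!("{err}"); // prints "[ARITHMETIC_OVERFLOW] integer overflow."
}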
2 changes: 1 addition & 1 deletion native/core/src/execution/datafusion/expressions/mod.rs
@@ -18,7 +18,7 @@
 //! Native DataFusion expressions
 
 pub mod bitwise_not;
-pub mod cast;
+pub use datafusion_comet_spark_expr::cast;
 pub mod checkoverflow;
 mod normalize_nan;
 pub mod scalar_funcs;
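Swapping `pub mod cast;` for a re-export keeps the old import path compiling, so call sites in the core crate need no changes. A hypothetical call site (not part of this diff):

// Resolves to the spark-expr crate's cast module via the re-export above.
use crate::execution::datafusion::expressions::cast::Cast;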
5 changes: 5 additions & 0 deletions native/spark-expr/Cargo.toml
@@ -28,12 +28,17 @@ edition = { workspace = true }
 
 [dependencies]
 arrow = { workspace = true }
+arrow-array = { workspace = true }
+arrow-schema = { workspace = true }
 chrono = { workspace = true }
 chrono-tz = { workspace = true }
 datafusion = { workspace = true }
+datafusion-common = { workspace = true }
 datafusion-functions = { workspace = true }
 datafusion-physical-expr = { workspace = true }
 datafusion-comet-utils = { workspace = true }
+num = { workspace = true }
+regex = { workspace = true }
 
 [lib]
 name = "datafusion_comet_spark_expr"
11 changes: 3 additions & 8 deletions native/core/src/execution/datafusion/expressions/cast.rs → native/spark-expr/src/cast.rs
@@ -24,6 +24,7 @@ use std::{
 };
 
 use super::EvalMode;
+use crate::{SparkError, SparkResult};
 use arrow::{
     compute::{cast_with_options, unary, CastOptions},
     datatypes::{
@@ -43,11 +44,9 @@ use arrow_array::{
 use arrow_schema::{DataType, Schema};
 use chrono::{NaiveDate, NaiveDateTime, TimeZone, Timelike};
 use datafusion::logical_expr::ColumnarValue;
-use datafusion_comet_spark_expr::{SparkError, SparkResult};
 use datafusion_comet_utils::{array_with_timezone, down_cast_any_ref};
 use datafusion_common::{
-    cast::as_generic_string_array, internal_err, DataFusionError, Result as DataFusionResult,
-    ScalarValue,
+    cast::as_generic_string_array, internal_err, Result as DataFusionResult, ScalarValue,
 };
 use datafusion_physical_expr::PhysicalExpr;
 use num::{
@@ -605,11 +604,7 @@ impl Cast {
                 )))
             }
         };
-        Ok(spark_cast(
-            cast_result.map_err(|e| <SparkError as Into<DataFusionError>>::into(e))?,
-            from_type,
-            to_type,
-        ))
+        Ok(spark_cast(cast_result?, from_type, to_type))
     }
 
     /// Determines if DataFusion supports the given cast in a way that is
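The tidied return line compiles because the ? operator applies From::from to the error before propagating it, which makes the old explicit map_err through Into redundant. A conversion along these lines is assumed to already exist in the spark-expr crate (the body shown is illustrative, not the actual implementation):

use datafusion_common::DataFusionError;

// Inside the spark-expr crate, where SparkError is defined. With this
// in scope, `cast_result?` converts a SparkError into a DataFusionError
// automatically.
impl From<SparkError> for DataFusionError {
    fn from(e: SparkError) -> Self {
        // Wrapping as an external error preserves the Spark-style message.
        DataFusionError::External(Box::new(e))
    }
}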
14 changes: 11 additions & 3 deletions native/spark-expr/src/lib.rs
@@ -21,6 +21,7 @@ use std::error::Error;
 use std::fmt::{Display, Formatter};
 
 mod abs;
+pub mod cast;
 mod if_expr;
 
 pub use abs::Abs;
@@ -88,9 +89,16 @@ impl Display for SparkError {
         match self {
             Self::ArithmeticOverflow(data_type) =>
                 write!(f, "[ARITHMETIC_OVERFLOW] {} overflow. If necessary set \"spark.sql.ansi.enabled\" to \"false\" to bypass this error.", data_type),
-            Self::CastOverFlow { .. } => todo!(),
-            Self::Internal(_) => todo!(),
-            _ => todo!()
+            Self::CastOverFlow { value, from_type, to_type } => write!(f, "[CAST_OVERFLOW] The value {value} of the type \"{from_type}\" cannot be cast to \"{to_type}\" \
+                due to an overflow. Use `try_cast` to tolerate overflow and return NULL instead. If necessary \
+                set \"spark.sql.ansi.enabled\" to \"false\" to bypass this error."),
+            Self::CastInvalidValue { value, from_type, to_type } => write!(f, "[CAST_INVALID_INPUT] The value '{value}' of the type \"{from_type}\" cannot be cast to \"{to_type}\" \
+                because it is malformed. Correct the value as per the syntax, or change its target type. \
+                Use `try_cast` to tolerate malformed input and return NULL instead. If necessary \
+                set \"spark.sql.ansi.enabled\" to \"false\" to bypass this error."),
+            Self::NumericValueOutOfRange { value, precision, scale } => write!(f, "[NUMERIC_VALUE_OUT_OF_RANGE] {value} cannot be represented as Decimal({precision}, {scale}). If necessary set \"spark.sql.ansi.enabled\" to \"false\" to bypass this error, and return NULL instead."),
+            Self::Arrow(e) => write!(f, "ArrowError: {e}"),
+            Self::Internal(e) => write!(f, "{e}"),
         }
     }
 }
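With the todo!() arms filled in, every SparkError variant now formats a Spark-compatible message. A quick hypothetical check of the new formatting (field values are made up):

use datafusion_comet_spark_expr::SparkError;

fn main() {
    let err = SparkError::CastOverFlow {
        value: "2147483648".to_string(),
        from_type: "BIGINT".to_string(),
        to_type: "INT".to_string(),
    };
    // The Spark error class appears in brackets at the start of the message.
    assert!(err.to_string().starts_with("[CAST_OVERFLOW]"));
}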
