Skip to content

Commit

Permalink
build: Enable spark-4.0 Spark tests
Browse files Browse the repository at this point in the history
  • Loading branch information
kazuyukitanimura committed Jun 5, 2024
1 parent 35b4e6a commit b0d6d04
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 15 deletions.
37 changes: 30 additions & 7 deletions core/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use arrow::error::ArrowError;
use datafusion_common::DataFusionError;
use jni::errors::{Exception, ToException};
use regex::Regex;

use std::{
any::Any,
convert,
Expand All @@ -37,8 +38,8 @@ use std::{
use jni::sys::{jboolean, jbyte, jchar, jdouble, jfloat, jint, jlong, jobject, jshort};

use crate::execution::operators::ExecutionError;
use jni::JNIEnv;
use jni::objects::{GlobalRef, JThrowable};
use jni::JNIEnv;
use lazy_static::lazy_static;
use parquet::errors::ParquetError;
use thiserror::Error;
Expand Down Expand Up @@ -160,7 +161,11 @@ pub enum CometError {
},

#[error("{class}: {msg}")]
JavaException { class: String, msg: String, throwable: GlobalRef },
JavaException {
class: String,
msg: String,
throwable: GlobalRef,
},
}

pub fn init() {
Expand Down Expand Up @@ -199,6 +204,15 @@ impl From<CometError> for ExecutionError {
fn from(value: CometError) -> Self {
match value {
CometError::Execution { source } => source,
CometError::JavaException {
class,
msg,
throwable,
} => ExecutionError::JavaException {
class,
msg,
throwable,
},
_ => ExecutionError::GeneralError(value.to_string()),
}
}
Expand Down Expand Up @@ -375,10 +389,19 @@ fn throw_exception(env: &mut JNIEnv, error: &CometError, backtrace: Option<Strin
if env.exception_check().is_ok() {
// ... then throw new exception
match error {
CometError::JavaException { class: _, msg: _, throwable } => {
let obj = env.new_local_ref(throwable).unwrap();
env.throw(JThrowable::from(obj))
}
CometError::JavaException {
class: _,
msg: _,
throwable,
} => env.throw(<&JThrowable>::from(throwable.as_obj())),
CometError::Execution {
source:
ExecutionError::JavaException {
class: _,
msg: _,
throwable,
},
} => env.throw(<&JThrowable>::from(throwable.as_obj())),
_ => {
let exception = error.to_exception();
match backtrace {
Expand All @@ -390,7 +413,7 @@ fn throw_exception(env: &mut JNIEnv, error: &CometError, backtrace: Option<Strin
}
}
}
.expect("Thrown exception")
.expect("Thrown exception")
}
}

Expand Down
8 changes: 8 additions & 0 deletions core/src/execution/operators/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use arrow::{

use arrow::compute::{cast_with_options, CastOptions};
use arrow_schema::ArrowError;
use jni::objects::GlobalRef;
use std::{fmt::Debug, sync::Arc};

mod scan;
Expand Down Expand Up @@ -52,6 +53,13 @@ pub enum ExecutionError {
/// DataFusion error
#[error("Error from DataFusion {0}.")]
DataFusionError(String),

#[error("{class}: {msg}")]
JavaException {
class: String,
msg: String,
throwable: GlobalRef,
},
}

/// Copy an Arrow Array
Expand Down
3 changes: 0 additions & 3 deletions core/src/jvm_bridge/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,9 +382,6 @@ pub(crate) fn convert_exception(
let exception_class_name_str = get_throwable_class_name(env, cache, throwable)?;
let message_str = get_throwable_message(env, cache, throwable)?;

// let obj = env.new_local_ref(throwable).unwrap();
// let obj = env.alloc_object(JClass::from(obj)).unwrap();

Ok(CometError::JavaException {
class: exception_class_name_str,
msg: message_str,
Expand Down
10 changes: 5 additions & 5 deletions dev/diffs/4.0.0.diff
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
diff --git a/pom.xml b/pom.xml
index 36d12b9b528..4a459825eb2 100644
index bd384e42b0e..a8a8988378b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -147,6 +147,8 @@
Expand All @@ -11,7 +11,7 @@ index 36d12b9b528..4a459825eb2 100644
<!--
If you change codahale.metrics.version, you also need to change
the link to metrics.dropwizard.io in docs/monitoring.md.
@@ -2848,6 +2850,25 @@
@@ -2843,6 +2845,25 @@
<artifactId>arpack</artifactId>
<version>${netlib.ludovic.dev.version}</version>
</dependency>
Expand Down Expand Up @@ -311,7 +311,7 @@ index 760ee802608..db4dc90475e 100644
sql(
"""
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 16a493b5290..3f0b70e2d59 100644
index 10d6f045db3..03a63fd2860 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -42,7 +42,7 @@ import org.apache.spark.sql.catalyst.trees.DataFrameQueryContext
Expand All @@ -323,7 +323,7 @@ index 16a493b5290..3f0b70e2d59 100644
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions._
@@ -2360,7 +2360,7 @@ class DatasetSuite extends QueryTest
@@ -2375,7 +2375,7 @@ class DatasetSuite extends QueryTest

// Assert that no extra shuffle introduced by cogroup.
val exchanges = collect(df3.queryExecution.executedPlan) {
Expand Down Expand Up @@ -1083,7 +1083,7 @@ index 3aaf61ffba4..93752e2a535 100644
val df = spark.read.parquet(path).selectExpr(projection: _*)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index a7efd0aa75e..fa65bda2051 100644
index 4e1e171c8a8..3582bd17c82 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -28,11 +28,13 @@ import org.apache.spark.SparkException
Expand Down

0 comments on commit b0d6d04

Please sign in to comment.