Skip to content

Commit

Permalink
fix: Remove export
Browse files Browse the repository at this point in the history
  • Loading branch information
kazuyukitanimura committed Nov 8, 2024
1 parent 5ac4408 commit 10ffc64
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ class NativeUtil {
provider,
arrowArray,
arrowSchema)
builder += arrowArray.memoryAddress()
builder += arrowSchema.memoryAddress()
case c =>
throw new SparkException(
"Comet execution only takes Arrow Arrays, but got " +
Expand Down
36 changes: 32 additions & 4 deletions native/core/src/execution/operators/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,8 @@ use datafusion::{
physical_plan::{ExecutionPlan, *},
};
use datafusion_common::{arrow_datafusion_err, DataFusionError, Result as DataFusionResult};
use jni::objects::{GlobalRef, JObject};
use jni::objects::{JLongArray, ReleaseMode};
use jni::sys::jlongArray;
use jni::objects::{GlobalRef, JLongArray, JObject, JValueGen, ReleaseMode};
use jni::sys::{jlongArray, jsize};

/// ScanExec reads batches of data from Spark via JNI. The source of the scan could be a file
/// scan or the result of reading a broadcast or shuffle exchange.
Expand Down Expand Up @@ -183,9 +182,38 @@ impl ScanExec {

let mut env = JVMClasses::get_env()?;

let num_cols = data_types.len();
let mut array_addrs = Vec::with_capacity(num_cols);
let mut schema_addrs = Vec::with_capacity(num_cols);

for _ in 0..num_cols {
let arrow_array = Rc::new(FFI_ArrowArray::empty());
let arrow_schema = Rc::new(FFI_ArrowSchema::empty());
let (array_ptr, schema_ptr) = (
Rc::into_raw(arrow_array) as i64,
Rc::into_raw(arrow_schema) as i64,
);

array_addrs.push(array_ptr);
schema_addrs.push(schema_ptr);
}

// Prepare the java array parameters
let long_array_addrs = env.new_long_array(num_cols as jsize)?;
let long_schema_addrs = env.new_long_array(num_cols as jsize)?;

env.set_long_array_region(&long_array_addrs, 0, &array_addrs)?;
env.set_long_array_region(&long_schema_addrs, 0, &schema_addrs)?;

let array_obj = JObject::from(long_array_addrs);
let schema_obj = JObject::from(long_schema_addrs);

let array_obj = JValueGen::Object(array_obj.as_ref());
let schema_obj = JValueGen::Object(schema_obj.as_ref());

let batch_object: JObject = unsafe {
jni_call!(&mut env,
comet_batch_iterator(iter).next() -> JObject)?
comet_batch_iterator(iter).next(array_obj, schema_obj) -> JObject)?
};

if batch_object.is_null() {
Expand Down
2 changes: 1 addition & 1 deletion native/core/src/jvm_bridge/batch_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ impl<'a> CometBatchIterator<'a> {

Ok(CometBatchIterator {
class,
method_next: env.get_method_id(Self::JVM_CLASS, "next", "([J[J)I")?,
method_next: env.get_method_id(Self::JVM_CLASS, "next", "([J[J)[J")?,
method_next_ret: ReturnType::Array,
})
}
Expand Down

0 comments on commit 10ffc64

Please sign in to comment.