From 10ffc64c9bbdfd69648fec7609d1674c49a1448b Mon Sep 17 00:00:00 2001 From: Kazuyuki Tanimura Date: Fri, 8 Nov 2024 00:29:08 -0800 Subject: [PATCH] fix: Remove export --- .../org/apache/comet/vector/NativeUtil.scala | 2 ++ native/core/src/execution/operators/scan.rs | 36 ++++++++++++++++--- native/core/src/jvm_bridge/batch_iterator.rs | 2 +- 3 files changed, 35 insertions(+), 5 deletions(-) diff --git a/common/src/main/scala/org/apache/comet/vector/NativeUtil.scala b/common/src/main/scala/org/apache/comet/vector/NativeUtil.scala index a12595965..9cceefd43 100644 --- a/common/src/main/scala/org/apache/comet/vector/NativeUtil.scala +++ b/common/src/main/scala/org/apache/comet/vector/NativeUtil.scala @@ -122,6 +122,8 @@ class NativeUtil { provider, arrowArray, arrowSchema) + builder += arrowArray.memoryAddress() + builder += arrowSchema.memoryAddress() case c => throw new SparkException( "Comet execution only takes Arrow Arrays, but got " + diff --git a/native/core/src/execution/operators/scan.rs b/native/core/src/execution/operators/scan.rs index 361d29f32..cdfa4429e 100644 --- a/native/core/src/execution/operators/scan.rs +++ b/native/core/src/execution/operators/scan.rs @@ -48,9 +48,8 @@ use datafusion::{ physical_plan::{ExecutionPlan, *}, }; use datafusion_common::{arrow_datafusion_err, DataFusionError, Result as DataFusionResult}; -use jni::objects::{GlobalRef, JObject}; -use jni::objects::{JLongArray, ReleaseMode}; -use jni::sys::jlongArray; +use jni::objects::{GlobalRef, JLongArray, JObject, JValueGen, ReleaseMode}; +use jni::sys::{jlongArray, jsize}; /// ScanExec reads batches of data from Spark via JNI. The source of the scan could be a file /// scan or the result of reading a broadcast or shuffle exchange. @@ -183,9 +182,38 @@ impl ScanExec { let mut env = JVMClasses::get_env()?; + let num_cols = data_types.len(); + let mut array_addrs = Vec::with_capacity(num_cols); + let mut schema_addrs = Vec::with_capacity(num_cols); + + for _ in 0..num_cols { + let arrow_array = Rc::new(FFI_ArrowArray::empty()); + let arrow_schema = Rc::new(FFI_ArrowSchema::empty()); + let (array_ptr, schema_ptr) = ( + Rc::into_raw(arrow_array) as i64, + Rc::into_raw(arrow_schema) as i64, + ); + + array_addrs.push(array_ptr); + schema_addrs.push(schema_ptr); + } + + // Prepare the java array parameters + let long_array_addrs = env.new_long_array(num_cols as jsize)?; + let long_schema_addrs = env.new_long_array(num_cols as jsize)?; + + env.set_long_array_region(&long_array_addrs, 0, &array_addrs)?; + env.set_long_array_region(&long_schema_addrs, 0, &schema_addrs)?; + + let array_obj = JObject::from(long_array_addrs); + let schema_obj = JObject::from(long_schema_addrs); + + let array_obj = JValueGen::Object(array_obj.as_ref()); + let schema_obj = JValueGen::Object(schema_obj.as_ref()); + let batch_object: JObject = unsafe { jni_call!(&mut env, - comet_batch_iterator(iter).next() -> JObject)? + comet_batch_iterator(iter).next(array_obj, schema_obj) -> JObject)? }; if batch_object.is_null() { diff --git a/native/core/src/jvm_bridge/batch_iterator.rs b/native/core/src/jvm_bridge/batch_iterator.rs index f4e06c1de..011fc5cd4 100644 --- a/native/core/src/jvm_bridge/batch_iterator.rs +++ b/native/core/src/jvm_bridge/batch_iterator.rs @@ -37,7 +37,7 @@ impl<'a> CometBatchIterator<'a> { Ok(CometBatchIterator { class, - method_next: env.get_method_id(Self::JVM_CLASS, "next", "([J[J)I")?, + method_next: env.get_method_id(Self::JVM_CLASS, "next", "([J[J)[J")?, method_next_ret: ReturnType::Array, }) }