From df03371d585852ff9456b52d7d4bed9dd4a16d6f Mon Sep 17 00:00:00 2001
From: Kazuyuki Tanimura
Date: Mon, 4 Nov 2024 10:33:35 -0800
Subject: [PATCH] fix: Remove export

Instead of exporting Arrow arrays from the native side and retaining them in the
read Context, currentBatch now receives the addresses of JVM-allocated
ArrowArray/ArrowSchema structs and moves the batch into them, so the native side
no longer needs to keep exported arrays alive.
---
 .../comet/parquet/AbstractColumnReader.java   |  2 +-
 .../apache/comet/parquet/ColumnReader.java    | 96 ++++++++++---------
 .../comet/parquet/MetadataColumnReader.java   | 19 ++--
 .../java/org/apache/comet/parquet/Native.java |  6 +-
 .../org/apache/comet/vector/NativeUtil.scala  |  6 +-
 native/core/src/parquet/mod.rs                | 25 ++---
 6 files changed, 79 insertions(+), 75 deletions(-)

diff --git a/common/src/main/java/org/apache/comet/parquet/AbstractColumnReader.java b/common/src/main/java/org/apache/comet/parquet/AbstractColumnReader.java
index f8a72bc84..099c7b973 100644
--- a/common/src/main/java/org/apache/comet/parquet/AbstractColumnReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/AbstractColumnReader.java
@@ -99,7 +99,7 @@ public void setBatchSize(int batchSize) {
   public void close() {
     if (nativeHandle != 0) {
       LOG.debug("Closing the column reader");
-      // Native.closeColumnReader(nativeHandle);
+      Native.closeColumnReader(nativeHandle);
       nativeHandle = 0;
     }
   }
diff --git a/common/src/main/java/org/apache/comet/parquet/ColumnReader.java b/common/src/main/java/org/apache/comet/parquet/ColumnReader.java
index 2da6e19bf..d30befd74 100644
--- a/common/src/main/java/org/apache/comet/parquet/ColumnReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/ColumnReader.java
@@ -26,7 +26,8 @@
 import org.slf4j.LoggerFactory;
 
 import org.apache.arrow.c.*;
-import org.apache.arrow.c.NativeUtil;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
 import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.dictionary.Dictionary;
 import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
@@ -45,6 +46,7 @@ public class ColumnReader extends AbstractColumnReader {
   protected static final Logger LOG = LoggerFactory.getLogger(ColumnReader.class);
 
+  protected final BufferAllocator ALLOCATOR = new RootAllocator();
 
   /**
    * The current Comet vector holding all the values read by this column reader. Owned by this
@@ -84,6 +86,9 @@ public class ColumnReader extends AbstractColumnReader {
 
   private final boolean hasNativeOperations;
 
+  private ArrowArray array = null;
+  private ArrowSchema schema = null;
+
   public ColumnReader(
       DataType type,
       ColumnDescriptor descriptor,
@@ -169,12 +174,15 @@ public CometVector loadVector() {
     if (currentVector != null) {
       currentVector.close();
     }
-    long[] addrs = Native.currentBatch(nativeHandle);
-    ArrowArray array = ArrowArray.wrap(addrs[0]);
-    ArrowSchema schema = ArrowSchema.wrap(addrs[1]);
-    ArrowSchema.Snapshot snapshot = schema.snapshot();
-    String format = NativeUtil.toJavaString(snapshot.format);
-    currentVector = new CometNativeVector(null, useDecimal128, addrs[0], addrs[1]);
+
+    array = ArrowArray.allocateNew(ALLOCATOR);
+    schema = ArrowSchema.allocateNew(ALLOCATOR);
+
+    long arrayAddr = array.memoryAddress();
+    long schemaAddr = schema.memoryAddress();
+
+    Native.currentBatch(nativeHandle, arrayAddr, schemaAddr);
+    currentVector = new CometNativeVector(null, useDecimal128, arrayAddr, schemaAddr);
 
     return currentVector;
   }
@@ -213,53 +221,53 @@ public CometVector loadVector() {
     boolean isUuid =
         logicalTypeAnnotation instanceof LogicalTypeAnnotation.UUIDLogicalTypeAnnotation;
 
-    long[] addresses = Native.currentBatch(nativeHandle);
+    array = ArrowArray.allocateNew(ALLOCATOR);
+    schema = ArrowSchema.allocateNew(ALLOCATOR);
 
-    try (ArrowArray array = ArrowArray.wrap(addresses[0]);
-        ArrowSchema schema = ArrowSchema.wrap(addresses[1])) {
-      FieldVector vector = importer.importVector(array, schema);
+    long arrayAddr = array.memoryAddress();
+    long schemaAddr = schema.memoryAddress();
 
-      DictionaryEncoding dictionaryEncoding = vector.getField().getDictionary();
+    Native.currentBatch(nativeHandle, arrayAddr, schemaAddr);
 
-      CometPlainVector cometVector = new CometPlainVector(vector, useDecimal128);
+    FieldVector vector = importer.importVector(array, schema);
 
-      // Update whether the current vector contains any null values. This is used in the following
-      // batch(s) to determine whether we can skip loading the native vector.
-      hadNull = cometVector.hasNull();
+    DictionaryEncoding dictionaryEncoding = vector.getField().getDictionary();
 
-      if (dictionaryEncoding == null) {
-        if (dictionary != null) {
-          // This means the column was using dictionary encoding but now has fall-back to plain
-          // encoding, on the native side. Setting 'dictionary' to null here, so we can use it as
-          // a condition to check if we can re-use vector later.
-          dictionary = null;
-        }
-        // Either the column is not dictionary encoded, or it was using dictionary encoding but
-        // a new data page has switched back to use plain encoding. For both cases we should
-        // return plain vector.
-        currentVector = cometVector;
-        return currentVector;
-      }
+    CometPlainVector cometVector = new CometPlainVector(vector, useDecimal128);
+
+    // Update whether the current vector contains any null values. This is used in the following
+    // batch(s) to determine whether we can skip loading the native vector.
+    hadNull = cometVector.hasNull();
 
-      // We should already re-initiate `CometDictionary` here because `Data.importVector` API will
-      // release the previous dictionary vector and create a new one.
-      Dictionary arrowDictionary = importer.getProvider().lookup(dictionaryEncoding.getId());
-      CometPlainVector dictionaryVector =
-          new CometPlainVector(arrowDictionary.getVector(), useDecimal128, isUuid);
+    if (dictionaryEncoding == null) {
       if (dictionary != null) {
-        dictionary.setDictionaryVector(dictionaryVector);
-      } else {
-        dictionary = new CometDictionary(dictionaryVector);
+        // This means the column was using dictionary encoding but now has fall-back to plain
+        // encoding, on the native side. Setting 'dictionary' to null here, so we can use it as
+        // a condition to check if we can re-use vector later.
+        dictionary = null;
       }
-
-      currentVector =
-          new CometDictionaryVector(
-              cometVector, dictionary, importer.getProvider(), useDecimal128, false, isUuid);
-
-      currentVector =
-          new CometDictionaryVector(cometVector, dictionary, importer.getProvider(), useDecimal128);
+      // Either the column is not dictionary encoded, or it was using dictionary encoding but
+      // a new data page has switched back to use plain encoding. For both cases we should
+      // return plain vector.
+      currentVector = cometVector;
       return currentVector;
     }
+
+    // We should already re-initiate `CometDictionary` here because `Data.importVector` API will
+    // release the previous dictionary vector and create a new one.
+    Dictionary arrowDictionary = importer.getProvider().lookup(dictionaryEncoding.getId());
+    CometPlainVector dictionaryVector =
+        new CometPlainVector(arrowDictionary.getVector(), useDecimal128, isUuid);
+    if (dictionary != null) {
+      dictionary.setDictionaryVector(dictionaryVector);
+    } else {
+      dictionary = new CometDictionary(dictionaryVector);
+    }
+
+    currentVector =
+        new CometDictionaryVector(
+            cometVector, dictionary, importer.getProvider(), useDecimal128, false, isUuid);
+    return currentVector;
   }
 
   protected void readPage() {
diff --git a/common/src/main/java/org/apache/comet/parquet/MetadataColumnReader.java b/common/src/main/java/org/apache/comet/parquet/MetadataColumnReader.java
index b8722ca78..688970ae5 100644
--- a/common/src/main/java/org/apache/comet/parquet/MetadataColumnReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/MetadataColumnReader.java
@@ -34,8 +34,12 @@
 /** A metadata column reader that can be extended by {@link RowIndexColumnReader} etc. */
 public class MetadataColumnReader extends AbstractColumnReader {
   private final BufferAllocator allocator = new RootAllocator();
+
   private CometVector vector;
 
+  private ArrowArray array = null;
+  private ArrowSchema schema = null;
+
   public MetadataColumnReader(DataType type, ColumnDescriptor descriptor, boolean useDecimal128) {
     // TODO: should we handle legacy dates & timestamps for metadata columns?
     super(type, descriptor, useDecimal128, false);
@@ -50,12 +54,15 @@ public void setBatchSize(int batchSize) {
   @Override
   public void readBatch(int total) {
     if (vector == null) {
-      long[] addresses = Native.currentBatch(nativeHandle);
-      try (ArrowArray array = ArrowArray.wrap(addresses[0]);
-          ArrowSchema schema = ArrowSchema.wrap(addresses[1])) {
-        FieldVector fieldVector = Data.importVector(allocator, array, schema, null);
-        vector = new CometPlainVector(fieldVector, useDecimal128);
-      }
+      array = ArrowArray.allocateNew(allocator);
+      schema = ArrowSchema.allocateNew(allocator);
+
+      long arrayAddr = array.memoryAddress();
+      long schemaAddr = schema.memoryAddress();
+
+      Native.currentBatch(nativeHandle, arrayAddr, schemaAddr);
+      FieldVector fieldVector = Data.importVector(allocator, array, schema, null);
+      vector = new CometPlainVector(fieldVector, useDecimal128);
     }
     vector.setNumValues(total);
   }
diff --git a/common/src/main/java/org/apache/comet/parquet/Native.java b/common/src/main/java/org/apache/comet/parquet/Native.java
index f4820fedf..1e666652e 100644
--- a/common/src/main/java/org/apache/comet/parquet/Native.java
+++ b/common/src/main/java/org/apache/comet/parquet/Native.java
@@ -192,10 +192,10 @@ public static native void setPageV2(
    * Returns the current batch constructed via 'readBatch'
    *
    * @param handle the handle to the native Parquet column reader
-   * @return a long array with 2 elements, the first is the address to native Arrow array, and the
-   *     second is the address to the Arrow schema.
+   * @param arrayAddr the memory address of the ArrowArray struct
+   * @param schemaAddr the memory address of the ArrowSchema struct
    */
-  public static native long[] currentBatch(long handle);
+  public static native void currentBatch(long handle, long arrayAddr, long schemaAddr);
 
   /** Set methods to set a constant value for the reader, so it'll return constant vectors */
   public static native void setNull(long handle);
diff --git a/common/src/main/scala/org/apache/comet/vector/NativeUtil.scala b/common/src/main/scala/org/apache/comet/vector/NativeUtil.scala
index e38ec4b8d..f99b2c920 100644
--- a/common/src/main/scala/org/apache/comet/vector/NativeUtil.scala
+++ b/common/src/main/scala/org/apache/comet/vector/NativeUtil.scala
@@ -163,11 +163,11 @@ class NativeUtil {
       case -1 =>
         // EOF
         None
-      case numRows =>
+      case numRows if numRows >= 0 =>
         val cometVectors = importVector(arrays, schemas)
         Some(new ColumnarBatch(cometVectors.toArray, numRows.toInt))
-      // case flag =>
-      //   throw new IllegalStateException(s"Invalid native flag: $flag")
+      case flag =>
+        throw new IllegalStateException(s"Invalid native flag: $flag")
     }
   }
 
diff --git a/native/core/src/parquet/mod.rs b/native/core/src/parquet/mod.rs
index ef95c17c9..455f19929 100644
--- a/native/core/src/parquet/mod.rs
+++ b/native/core/src/parquet/mod.rs
@@ -27,7 +27,7 @@ use std::{boxed::Box, ptr::NonNull, sync::Arc};
 
 use crate::errors::{try_unwrap_or_throw, CometError};
 
-use arrow::ffi::{FFI_ArrowArray, FFI_ArrowSchema};
+use arrow::ffi::FFI_ArrowArray;
 
 /// JNI exposed methods
 use jni::JNIEnv;
@@ -52,7 +52,6 @@ const STR_CLASS_NAME: &str = "java/lang/String";
 
 /// Parquet read context maintained across multiple JNI calls.
 struct Context {
     pub column_reader: ColumnReader,
-    pub arrays: Vec<(Arc<FFI_ArrowArray>, Arc<FFI_ArrowSchema>)>,
     last_data_page: Option<GlobalRef>,
 }
 
@@ -110,7 +109,6 @@ pub extern "system" fn Java_org_apache_comet_parquet_Native_initColumnReader(
             use_decimal_128 != 0,
             use_legacy_date_timestamp != 0,
         ),
-        arrays: Vec::new(),
         last_data_page: None,
     };
     let res = Box::new(ctx);
@@ -539,24 +537,15 @@ pub extern "system" fn Java_org_apache_comet_parquet_Native_currentBatch(
     e: JNIEnv,
     _jclass: JClass,
     handle: jlong,
-) -> jlongArray {
-    try_unwrap_or_throw(&e, |env| {
+    array_addr: jlong,
+    schema_addr: jlong,
+) {
+    try_unwrap_or_throw(&e, |_env| {
         let ctx = get_context(handle)?;
         let reader = &mut ctx.column_reader;
         let data = reader.current_batch();
-        let (array, schema) = data.to_spark()?;
-
-        unsafe {
-            let arrow_array = Arc::from_raw(array as *const FFI_ArrowArray);
-            let arrow_schema = Arc::from_raw(schema as *const FFI_ArrowSchema);
-            ctx.arrays.push((arrow_array, arrow_schema));
-
-            let res = env.new_long_array(2)?;
-            let buf: [i64; 2] = [array, schema];
-            env.set_long_array_region(&res, 0, &buf)
-                .expect("set long array region failed");
-            Ok(res.into_raw())
-        }
+        data.move_to_spark(array_addr, schema_addr)
+            .map_err(|e| e.into())
    })
}
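
For context on the contract this patch introduces across ColumnReader,
MetadataColumnReader, and Native: it is the caller-allocated variant of the Arrow
C Data Interface. The sketch below is illustrative only; CurrentBatchSketch and
load are hypothetical names, the class is assumed to live in
org.apache.comet.parquet so that Native resolves, and it assumes a valid
nativeHandle for which a batch has already been decoded via Native.readBatch:

    import org.apache.arrow.c.ArrowArray;
    import org.apache.arrow.c.ArrowSchema;
    import org.apache.arrow.c.Data;
    import org.apache.arrow.memory.BufferAllocator;
    import org.apache.arrow.vector.FieldVector;

    class CurrentBatchSketch {
      static FieldVector load(long nativeHandle, BufferAllocator allocator) {
        // The JVM owns the FFI structs: allocate empty ArrowArray/ArrowSchema
        // structs and hand their raw addresses to native code.
        try (ArrowArray array = ArrowArray.allocateNew(allocator);
            ArrowSchema schema = ArrowSchema.allocateNew(allocator)) {
          // Native moves the current batch into the structs instead of exporting
          // Arcs and retaining them in its Context (the removed ctx.arrays).
          Native.currentBatch(nativeHandle, array.memoryAddress(), schema.memoryAddress());
          // Importing transfers buffer ownership to the JVM-side allocator.
          return Data.importVector(allocator, array, schema, null);
        }
      }
    }

This mirrors what ColumnReader.loadVector() and MetadataColumnReader.readBatch()
now do. The design point is ownership: the JVM allocates and frees the structs,
and the Rust side's move_to_spark writes the batch into the given addresses,
which is why currentBatch no longer returns a long[] and the native Context no
longer has to keep exported arrays alive.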