From 5c45fdc9e311da07a0f3510d9b8aec673081861a Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 9 Dec 2024 17:45:27 -0700 Subject: [PATCH] remove dead code (#1155) --- native/core/benches/bloom_filter_agg.rs | 2 - native/core/benches/parquet_read.rs | 5 +- native/core/src/errors.rs | 44 +------ .../expressions/bloom_filter_agg.rs | 6 - .../core/src/execution/datafusion/planner.rs | 3 - .../datafusion/util/spark_bit_array.rs | 1 + native/core/src/execution/jni_api.rs | 8 -- native/core/src/execution/kernels/strings.rs | 117 +----------------- native/core/src/execution/operators/scan.rs | 8 -- native/core/src/execution/shuffle/list.rs | 4 +- native/core/src/execution/shuffle/map.rs | 17 +-- native/core/src/execution/shuffle/row.rs | 5 +- native/core/src/execution/utils.rs | 18 --- native/core/src/jvm_bridge/batch_iterator.rs | 1 + .../core/src/jvm_bridge/comet_metric_node.rs | 1 + .../jvm_bridge/comet_task_memory_manager.rs | 1 + native/core/src/jvm_bridge/mod.rs | 1 + native/core/src/lib.rs | 1 - native/core/src/parquet/mod.rs | 2 - native/core/src/parquet/mutable_vector.rs | 8 -- native/core/src/parquet/read/column.rs | 14 +-- native/core/src/parquet/read/mod.rs | 3 - native/core/src/parquet/read/values.rs | 26 +--- .../src/parquet/util/test_common/page_util.rs | 12 +- 24 files changed, 27 insertions(+), 281 deletions(-) diff --git a/native/core/benches/bloom_filter_agg.rs b/native/core/benches/bloom_filter_agg.rs index af3eb919e..25d27d174 100644 --- a/native/core/benches/bloom_filter_agg.rs +++ b/native/core/benches/bloom_filter_agg.rs @@ -61,10 +61,8 @@ fn criterion_benchmark(c: &mut Criterion) { group.bench_function(agg_mode.0, |b| { let comet_bloom_filter_agg = Arc::new(AggregateUDF::new_from_impl(BloomFilterAgg::new( - Arc::clone(&c0), Arc::clone(&num_items), Arc::clone(&num_bits), - "bloom_filter_agg", DataType::Binary, ))); b.to_async(&rt).iter(|| { diff --git a/native/core/benches/parquet_read.rs b/native/core/benches/parquet_read.rs index 1f8178cd2..ae511ade5 100644 --- a/native/core/benches/parquet_read.rs +++ b/native/core/benches/parquet_read.rs @@ -44,7 +44,7 @@ fn bench(c: &mut Criterion) { let mut group = c.benchmark_group("comet_parquet_read"); let schema = build_test_schema(); - let pages = build_plain_int32_pages(schema.clone(), schema.column(0), 0.0); + let pages = build_plain_int32_pages(schema.column(0), 0.0); group.bench_function("INT/PLAIN/NOT_NULL", |b| { let t = TypePtr::new( PrimitiveTypeBuilder::new("f", PhysicalType::INT32) @@ -107,7 +107,6 @@ const VALUES_PER_PAGE: usize = 10_000; const BATCH_SIZE: usize = 4096; fn build_plain_int32_pages( - schema: SchemaDescPtr, column_desc: ColumnDescPtr, null_density: f32, ) -> impl PageIterator + Clone { @@ -143,7 +142,7 @@ fn build_plain_int32_pages( // Since `InMemoryPageReader` is not exposed from parquet crate, here we use // `InMemoryPageIterator` instead which is a Iter>. - InMemoryPageIterator::new(schema, column_desc, vec![pages]) + InMemoryPageIterator::new(vec![pages]) } struct TestColumnReader { diff --git a/native/core/src/errors.rs b/native/core/src/errors.rs index 92799bcf6..4d623d976 100644 --- a/native/core/src/errors.rs +++ b/native/core/src/errors.rs @@ -485,23 +485,6 @@ where || f(t) } -// This is a duplicate of `try_unwrap_or_throw`, which is used to work around Arrow's lack of -// `UnwindSafe` handling. 
-pub fn try_assert_unwind_safe_or_throw<T, F>(env: &JNIEnv, f: F) -> T
-where
-    T: JNIDefault,
-    F: FnOnce(JNIEnv) -> Result<T, CometError>,
-{
-    let mut env1 = unsafe { JNIEnv::from_raw(env.get_raw()).unwrap() };
-    let env2 = unsafe { JNIEnv::from_raw(env.get_raw()).unwrap() };
-    unwrap_or_throw_default(
-        &mut env1,
-        flatten(
-            catch_unwind(std::panic::AssertUnwindSafe(curry(f, env2))).map_err(CometError::from),
-        ),
-    )
-}
-
 // It is currently undefined behavior to unwind from Rust code into foreign code, so we can wrap
 // our JNI functions and turn these panics into a `RuntimeException`.
 pub fn try_unwrap_or_throw<T, F>(env: &JNIEnv, f: F) -> T
@@ -534,10 +517,7 @@ mod tests {
         AttachGuard, InitArgsBuilder, JNIEnv, JNIVersion, JavaVM,
     };
 
-    use assertables::{
-        assert_contains, assert_contains_as_result, assert_starts_with,
-        assert_starts_with_as_result,
-    };
+    use assertables::{assert_starts_with, assert_starts_with_as_result};
 
     pub fn jvm() -> &'static Arc<JavaVM> {
         static mut JVM: Option<Arc<JavaVM>> = None;
@@ -890,26 +870,4 @@ mod tests {
         // first line.
         assert_starts_with!(msg_rust, expected_message);
     }
-
-    // Asserts that exception's message matches `expected_message`.
-    fn assert_exception_message_with_stacktrace(
-        env: &mut JNIEnv,
-        exception: JThrowable,
-        expected_message: &str,
-        stacktrace_contains: &str,
-    ) {
-        let message = env
-            .call_method(exception, "getMessage", "()Ljava/lang/String;", &[])
-            .unwrap()
-            .l()
-            .unwrap();
-        let message_string = message.into();
-        let msg_rust: String = env.get_string(&message_string).unwrap().into();
-        // Since panics result in multi-line messages which include the backtrace, just use the
-        // first line.
-        assert_starts_with!(msg_rust, expected_message);
-
-        // Check that the stacktrace is included by checking for a specific element
-        assert_contains!(msg_rust, stacktrace_contains);
-    }
 }
diff --git a/native/core/src/execution/datafusion/expressions/bloom_filter_agg.rs b/native/core/src/execution/datafusion/expressions/bloom_filter_agg.rs
index e6528a563..1300e08c2 100644
--- a/native/core/src/execution/datafusion/expressions/bloom_filter_agg.rs
+++ b/native/core/src/execution/datafusion/expressions/bloom_filter_agg.rs
@@ -34,9 +34,7 @@ use datafusion_physical_expr::expressions::Literal;
 
 #[derive(Debug, Clone)]
 pub struct BloomFilterAgg {
-    name: String,
     signature: Signature,
-    expr: Arc<dyn PhysicalExpr>,
     num_items: i32,
     num_bits: i32,
 }
@@ -53,15 +51,12 @@ fn extract_i32_from_literal(expr: Arc<dyn PhysicalExpr>) -> i32 {
 
 impl BloomFilterAgg {
     pub fn new(
-        expr: Arc<dyn PhysicalExpr>,
         num_items: Arc<dyn PhysicalExpr>,
         num_bits: Arc<dyn PhysicalExpr>,
-        name: impl Into<String>,
         data_type: DataType,
     ) -> Self {
         assert!(matches!(data_type, DataType::Binary));
         Self {
-            name: name.into(),
             signature: Signature::uniform(
                 1,
                 vec![
@@ -73,7 +68,6 @@ impl BloomFilterAgg {
                 ],
                 Volatility::Immutable,
             ),
-            expr,
             num_items: extract_i32_from_literal(num_items),
             num_bits: extract_i32_from_literal(num_bits),
         }
diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs
index a83dba5d6..5e77b3f65 100644
--- a/native/core/src/execution/datafusion/planner.rs
+++ b/native/core/src/execution/datafusion/planner.rs
@@ -115,7 +115,6 @@ use std::cmp::max;
 use std::{collections::HashMap, sync::Arc};
 
 // For clippy error on type_complexity.
-type ExecResult<T> = Result<T, ExecutionError>;
 type PhyAggResult = Result<Vec<AggregateFunctionExpr>, ExecutionError>;
 type PhyExprResult = Result<Vec<(Arc<dyn PhysicalExpr>, String)>, ExecutionError>;
 type PartitionPhyExprResult = Result<Vec<Arc<dyn PhysicalExpr>>, ExecutionError>;
@@ -1758,10 +1757,8 @@ impl PhysicalPlanner {
                     self.create_expr(expr.num_bits.as_ref().unwrap(), Arc::clone(&schema))?;
                 let datatype = to_arrow_datatype(expr.datatype.as_ref().unwrap());
                 let func = AggregateUDF::new_from_impl(BloomFilterAgg::new(
-                    Arc::clone(&child),
                     Arc::clone(&num_items),
                     Arc::clone(&num_bits),
-                    "bloom_filter_agg",
                     datatype,
                 ));
                 Self::create_aggr_func_expr("bloom_filter_agg", schema, vec![child], func)
diff --git a/native/core/src/execution/datafusion/util/spark_bit_array.rs b/native/core/src/execution/datafusion/util/spark_bit_array.rs
index 68b97d660..6cfecc1bf 100644
--- a/native/core/src/execution/datafusion/util/spark_bit_array.rs
+++ b/native/core/src/execution/datafusion/util/spark_bit_array.rs
@@ -70,6 +70,7 @@ impl SparkBitArray {
         self.data.len()
     }
 
+    #[allow(dead_code)] // this is only called from tests
     pub fn cardinality(&self) -> usize {
         self.bit_count
     }
diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs
index 8afe134cd..5103f5ce4 100644
--- a/native/core/src/execution/jni_api.rs
+++ b/native/core/src/execution/jni_api.rs
@@ -207,14 +207,6 @@ fn prepare_datafusion_session_context(
     Ok(session_ctx)
 }
 
-fn parse_bool(conf: &HashMap<String, String>, name: &str) -> CometResult<bool> {
-    conf.get(name)
-        .map(String::as_str)
-        .unwrap_or("false")
-        .parse::<bool>()
-        .map_err(|e| CometError::Config(format!("Failed to parse boolean config {name}: {e}")))
-}
-
 /// Prepares arrow arrays for output.
 fn prepare_output(
     env: &mut JNIEnv,
diff --git a/native/core/src/execution/kernels/strings.rs b/native/core/src/execution/kernels/strings.rs
index 2e5e67b67..d63b2c477 100644
--- a/native/core/src/execution/kernels/strings.rs
+++ b/native/core/src/execution/kernels/strings.rs
@@ -21,7 +21,7 @@ use std::sync::Arc;
 
 use arrow::{
     array::*,
-    buffer::{Buffer, MutableBuffer},
+    buffer::MutableBuffer,
     compute::kernels::substring::{substring as arrow_substring, substring_by_char},
     datatypes::{DataType, Int32Type},
 };
@@ -87,43 +87,6 @@ pub fn substring(array: &dyn Array, start: i64, length: u64) -> Result ArrayRef {
-    match array.data_type() {
-        DataType::LargeUtf8 => generic_substring(
-            array
-                .as_any()
-                .downcast_ref::<LargeStringArray>()
-                .expect("A large string is expected"),
-            start,
-            length,
-            |i| i as i64,
-        ),
-        DataType::Utf8 => generic_substring(
-            array
-                .as_any()
-                .downcast_ref::<StringArray>()
-                .expect("A string is expected"),
-            start,
-            length,
-            |i| i,
-        ),
-        _ => panic!("substring does not support type {:?}", array.data_type()),
-    }
-}
-
 fn generic_string_space<OffsetSize: OffsetSizeTrait>(length: &Int32Array) -> ArrayRef {
     let array_len = length.len();
     let mut offsets = MutableBuffer::new((array_len + 1) * std::mem::size_of::<OffsetSize>());
@@ -163,81 +126,3 @@ fn generic_string_space<OffsetSize: OffsetSizeTrait>(length: &Int32Array) -> Arr
     };
     make_array(data)
 }
-
-fn generic_substring<OffsetSize: OffsetSizeTrait, F>(
-    array: &GenericStringArray<OffsetSize>,
-    start: &Int32Array,
-    length: &Int32Array,
-    f: F,
-) -> ArrayRef
-where
-    F: Fn(i32) -> OffsetSize,
-{
-    assert_eq!(array.len(), start.len());
-    assert_eq!(array.len(), length.len());
-
-    // compute current offsets
-    let offsets = array.to_data().buffers()[0].clone();
-    let offsets: &[OffsetSize] = offsets.typed_data::<OffsetSize>();
-
-    // compute null bitmap (copy)
-    let null_bit_buffer = array.to_data().nulls().map(|b| b.buffer().clone());
-
-    // Gets slices of start and length arrays to access them directly for performance.
-    let start_data = start.to_data();
-    let length_data = length.to_data();
-    let starts = start_data.buffers()[0].typed_data::<i32>();
-    let lengths = length_data.buffers()[0].typed_data::<i32>();
-
-    // compute values
-    let array_data = array.to_data();
-    let values = &array_data.buffers()[1];
-    let data = values.as_slice();
-
-    // we have no way to estimate how much this will be.
-    let mut new_values = MutableBuffer::new(0);
-    let mut new_offsets: Vec<OffsetSize> = Vec::with_capacity(array.len() + 1);
-
-    let mut length_so_far = OffsetSize::zero();
-    new_offsets.push(length_so_far);
-    (0..array.len()).for_each(|i| {
-        // the length of this entry
-        let length_i: OffsetSize = offsets[i + 1] - offsets[i];
-        // compute where we should start slicing this entry
-        let start_pos: OffsetSize = f(starts[i]);
-
-        let start = offsets[i]
-            + if start_pos >= OffsetSize::zero() {
-                start_pos
-            } else {
-                length_i + start_pos
-            };
-
-        let start = start.clamp(offsets[i], offsets[i + 1]);
-        // compute the length of the slice
-        let slice_length: OffsetSize = f(lengths[i].max(0)).min(offsets[i + 1] - start);
-
-        length_so_far += slice_length;
-
-        new_offsets.push(length_so_far);
-
-        // we need usize for ranges
-        let start = start.to_usize().unwrap();
-        let slice_length = slice_length.to_usize().unwrap();
-
-        new_values.extend_from_slice(&data[start..start + slice_length]);
-    });
-
-    let data = unsafe {
-        ArrayData::new_unchecked(
-            GenericStringArray::<OffsetSize>::DATA_TYPE,
-            array.len(),
-            None,
-            null_bit_buffer,
-            0,
-            vec![Buffer::from_slice_ref(&new_offsets), new_values.into()],
-            vec![],
-        )
-    };
-    make_array(data)
-}
diff --git a/native/core/src/execution/operators/scan.rs b/native/core/src/execution/operators/scan.rs
index a97caf0db..0d35859df 100644
--- a/native/core/src/execution/operators/scan.rs
+++ b/native/core/src/execution/operators/scan.rs
@@ -525,12 +525,4 @@ impl InputBatch {
 
         InputBatch::Batch(columns, num_rows)
     }
-
-    /// Get the number of rows in this batch
-    fn num_rows(&self) -> usize {
-        match self {
-            Self::EOF => 0,
-            Self::Batch(_, num_rows) => *num_rows,
-        }
-    }
 }
diff --git a/native/core/src/execution/shuffle/list.rs b/native/core/src/execution/shuffle/list.rs
index d8bdcb197..0f7f3e8cb 100644
--- a/native/core/src/execution/shuffle/list.rs
+++ b/native/core/src/execution/shuffle/list.rs
@@ -28,7 +28,6 @@ use arrow_schema::{DataType, TimeUnit};
 
 pub struct SparkUnsafeArray {
     row_addr: i64,
-    row_size: i32,
     num_elements: usize,
     element_offset: i64,
 }
@@ -45,7 +44,7 @@ impl SparkUnsafeObject for SparkUnsafeArray {
 
 impl SparkUnsafeArray {
     /// Creates a `SparkUnsafeArray` which points to the given address and size in bytes.
-    pub fn new(addr: i64, size: i32) -> Self {
+    pub fn new(addr: i64) -> Self {
         // Read the number of elements from the first 8 bytes.
let slice: &[u8] = unsafe { std::slice::from_raw_parts(addr as *const u8, 8) }; let num_elements = i64::from_le_bytes(slice.try_into().unwrap()); @@ -60,7 +59,6 @@ impl SparkUnsafeArray { Self { row_addr: addr, - row_size: size, num_elements: num_elements as usize, element_offset: addr + Self::get_header_portion_in_bytes(num_elements), } diff --git a/native/core/src/execution/shuffle/map.rs b/native/core/src/execution/shuffle/map.rs index 014695293..0969168f8 100644 --- a/native/core/src/execution/shuffle/map.rs +++ b/native/core/src/execution/shuffle/map.rs @@ -30,8 +30,6 @@ use arrow_array::builder::{ use arrow_schema::{DataType, FieldRef, Fields, TimeUnit}; pub struct SparkUnsafeMap { - row_addr: i64, - row_size: i32, pub(crate) keys: SparkUnsafeArray, pub(crate) values: SparkUnsafeArray, } @@ -59,8 +57,8 @@ impl SparkUnsafeMap { panic!("Negative value size in bytes of map: {}", value_array_size); } - let keys = SparkUnsafeArray::new(addr + 8, key_array_size as i32); - let values = SparkUnsafeArray::new(addr + 8 + key_array_size, value_array_size); + let keys = SparkUnsafeArray::new(addr + 8); + let values = SparkUnsafeArray::new(addr + 8 + key_array_size); if keys.get_num_elements() != values.get_num_elements() { panic!( @@ -70,16 +68,7 @@ impl SparkUnsafeMap { ); } - Self { - row_addr: addr, - row_size: size, - keys, - values, - } - } - - pub(crate) fn get_num_elements(&self) -> usize { - self.keys.get_num_elements() + Self { keys, values } } } diff --git a/native/core/src/execution/shuffle/row.rs b/native/core/src/execution/shuffle/row.rs index 2aeb48815..17b180e9d 100644 --- a/native/core/src/execution/shuffle/row.rs +++ b/native/core/src/execution/shuffle/row.rs @@ -48,7 +48,6 @@ use std::{ sync::Arc, }; -const WORD_SIZE: i64 = 8; const MAX_LONG_DIGITS: u8 = 18; const NESTED_TYPE_BUILDER_CAPACITY: usize = 100; @@ -170,8 +169,8 @@ pub trait SparkUnsafeObject { /// Returns array value at the given index of the object. fn get_array(&self, index: usize) -> SparkUnsafeArray { - let (offset, len) = self.get_offset_and_len(index); - SparkUnsafeArray::new(self.get_row_addr() + offset as i64, len) + let (offset, _) = self.get_offset_and_len(index); + SparkUnsafeArray::new(self.get_row_addr() + offset as i64) } fn get_map(&self, index: usize) -> SparkUnsafeMap { diff --git a/native/core/src/execution/utils.rs b/native/core/src/execution/utils.rs index 553d42606..4992b7ba9 100644 --- a/native/core/src/execution/utils.rs +++ b/native/core/src/execution/utils.rs @@ -15,8 +15,6 @@ // specific language governing permissions and limitations // under the License. -use std::sync::Arc; - use arrow::{ array::ArrayData, error::ArrowError, @@ -52,10 +50,6 @@ pub trait SparkArrowConvert { where Self: Sized; - /// Convert Arrow Arrays to C data interface. - /// It returns a tuple (ArrowArray address, ArrowSchema address). - fn to_spark(&self) -> Result<(i64, i64), ExecutionError>; - /// Move Arrow Arrays to C data interface. fn move_to_spark(&self, array: i64, schema: i64) -> Result<(), ExecutionError>; } @@ -88,18 +82,6 @@ impl SparkArrowConvert for ArrayData { Ok(ffi_array) } - /// Converts this ArrowData to pointers of Arrow C data interface. - /// Returned pointers are Arc-ed and should be freed manually. 
- #[allow(clippy::arc_with_non_send_sync)] - fn to_spark(&self) -> Result<(i64, i64), ExecutionError> { - let arrow_array = Arc::new(FFI_ArrowArray::new(self)); - let arrow_schema = Arc::new(FFI_ArrowSchema::try_from(self.data_type())?); - - let (array, schema) = (Arc::into_raw(arrow_array), Arc::into_raw(arrow_schema)); - - Ok((array as i64, schema as i64)) - } - /// Move this ArrowData to pointers of Arrow C data interface. fn move_to_spark(&self, array: i64, schema: i64) -> Result<(), ExecutionError> { let array_ptr = array as *mut FFI_ArrowArray; diff --git a/native/core/src/jvm_bridge/batch_iterator.rs b/native/core/src/jvm_bridge/batch_iterator.rs index 45b10cf20..998e540c7 100644 --- a/native/core/src/jvm_bridge/batch_iterator.rs +++ b/native/core/src/jvm_bridge/batch_iterator.rs @@ -24,6 +24,7 @@ use jni::{ }; /// A struct that holds all the JNI methods and fields for JVM `CometBatchIterator` class. +#[allow(dead_code)] // we need to keep references to Java items to prevent GC pub struct CometBatchIterator<'a> { pub class: JClass<'a>, pub method_has_next: JMethodID, diff --git a/native/core/src/jvm_bridge/comet_metric_node.rs b/native/core/src/jvm_bridge/comet_metric_node.rs index 8647e071a..85386d9b0 100644 --- a/native/core/src/jvm_bridge/comet_metric_node.rs +++ b/native/core/src/jvm_bridge/comet_metric_node.rs @@ -23,6 +23,7 @@ use jni::{ }; /// A struct that holds all the JNI methods and fields for JVM CometMetricNode class. +#[allow(dead_code)] // we need to keep references to Java items to prevent GC pub struct CometMetricNode<'a> { pub class: JClass<'a>, pub method_get_child_node: JMethodID, diff --git a/native/core/src/jvm_bridge/comet_task_memory_manager.rs b/native/core/src/jvm_bridge/comet_task_memory_manager.rs index 97d1bf3a7..22c3332c6 100644 --- a/native/core/src/jvm_bridge/comet_task_memory_manager.rs +++ b/native/core/src/jvm_bridge/comet_task_memory_manager.rs @@ -25,6 +25,7 @@ use jni::{ /// A wrapper which delegate acquire/release memory calls to the /// JVM side `CometTaskMemoryManager`. #[derive(Debug)] +#[allow(dead_code)] // we need to keep references to Java items to prevent GC pub struct CometTaskMemoryManager<'a> { pub class: JClass<'a>, pub method_acquire_memory: JMethodID, diff --git a/native/core/src/jvm_bridge/mod.rs b/native/core/src/jvm_bridge/mod.rs index 4936b1c5b..5fc0a55e3 100644 --- a/native/core/src/jvm_bridge/mod.rs +++ b/native/core/src/jvm_bridge/mod.rs @@ -189,6 +189,7 @@ pub use comet_metric_node::*; pub use comet_task_memory_manager::*; /// The JVM classes that are used in the JNI calls. 
+#[allow(dead_code)] // we need to keep references to Java items to prevent GC pub struct JVMClasses<'a> { /// Cached JClass for "java.lang.Object" java_lang_object: JClass<'a>, diff --git a/native/core/src/lib.rs b/native/core/src/lib.rs index 68c8ae729..cab511faf 100644 --- a/native/core/src/lib.rs +++ b/native/core/src/lib.rs @@ -17,7 +17,6 @@ #![allow(incomplete_features)] #![allow(non_camel_case_types)] -#![allow(dead_code)] #![allow(clippy::upper_case_acronyms)] // For prost generated struct #![allow(clippy::derive_partial_eq_without_eq)] diff --git a/native/core/src/parquet/mod.rs b/native/core/src/parquet/mod.rs index 455f19929..d2a6f4804 100644 --- a/native/core/src/parquet/mod.rs +++ b/native/core/src/parquet/mod.rs @@ -47,8 +47,6 @@ use util::jni::{convert_column_descriptor, convert_encoding}; use self::util::jni::TypePromotionInfo; -const STR_CLASS_NAME: &str = "java/lang/String"; - /// Parquet read context maintained across multiple JNI calls. struct Context { pub column_reader: ColumnReader, diff --git a/native/core/src/parquet/mutable_vector.rs b/native/core/src/parquet/mutable_vector.rs index 7f30d7d87..d19ea32fa 100644 --- a/native/core/src/parquet/mutable_vector.rs +++ b/native/core/src/parquet/mutable_vector.rs @@ -40,12 +40,6 @@ pub struct ParquetMutableVector { /// The number of null elements in this vector, must <= `num_values`. pub(crate) num_nulls: usize, - /// The capacity of the vector - pub(crate) capacity: usize, - - /// How many bits are required to store a single value - pub(crate) bit_width: usize, - /// The validity buffer of this Arrow vector. A bit set at position `i` indicates the `i`th /// element is not null. Otherwise, an unset bit at position `i` indicates the `i`th element is /// null. @@ -109,8 +103,6 @@ impl ParquetMutableVector { arrow_type, num_values: 0, num_nulls: 0, - capacity, - bit_width, validity_buffer, value_buffer, children, diff --git a/native/core/src/parquet/read/column.rs b/native/core/src/parquet/read/column.rs index 73f8df956..05a0bf7b5 100644 --- a/native/core/src/parquet/read/column.rs +++ b/native/core/src/parquet/read/column.rs @@ -770,7 +770,7 @@ impl TypedColumnReader { // Create a new vector for dictionary values let mut value_vector = ParquetMutableVector::new(page_value_count, &self.arrow_type); - let mut dictionary = self.get_decoder(page_data, page_value_count, encoding); + let mut dictionary = self.get_decoder(page_data, encoding); dictionary.read_batch(&mut value_vector, page_value_count); value_vector.num_values = page_value_count; @@ -812,7 +812,7 @@ impl TypedColumnReader { self.def_level_decoder = Some(dl_decoder); page_buffer = page_buffer.slice(offset); - let value_decoder = self.get_decoder(page_buffer, page_value_count, encoding); + let value_decoder = self.get_decoder(page_buffer, encoding); self.value_decoder = Some(value_decoder); } @@ -838,7 +838,7 @@ impl TypedColumnReader { dl_decoder.set_data(page_value_count, &def_level_data); self.def_level_decoder = Some(dl_decoder); - let value_decoder = self.get_decoder(value_data, page_value_count, encoding); + let value_decoder = self.get_decoder(value_data, encoding); self.value_decoder = Some(value_decoder); } @@ -977,15 +977,9 @@ impl TypedColumnReader { } } - fn get_decoder( - &self, - value_data: Buffer, - page_value_count: usize, - encoding: Encoding, - ) -> Box { + fn get_decoder(&self, value_data: Buffer, encoding: Encoding) -> Box { get_decoder::( value_data, - page_value_count, encoding, Arc::clone(&self.desc), self.read_options, diff --git 
a/native/core/src/parquet/read/mod.rs b/native/core/src/parquet/read/mod.rs index 4d057a06c..5a55f2117 100644 --- a/native/core/src/parquet/read/mod.rs +++ b/native/core/src/parquet/read/mod.rs @@ -44,9 +44,6 @@ pub struct PlainDecoderInner { /// The current offset in `data`, in bytes. offset: usize, - /// The number of total values in `data` - value_count: usize, - /// Reads `data` bit by bit, used if `T` is [`BoolType`]. bit_reader: BitReader, diff --git a/native/core/src/parquet/read/values.rs b/native/core/src/parquet/read/values.rs index 71cd035d2..e28d695ec 100644 --- a/native/core/src/parquet/read/values.rs +++ b/native/core/src/parquet/read/values.rs @@ -34,20 +34,16 @@ use datafusion_comet_spark_expr::utils::unlikely; pub fn get_decoder( value_data: Buffer, - num_values: usize, encoding: Encoding, desc: ColumnDescPtr, read_options: ReadOptions, ) -> Box { let decoder: Box = match encoding { - Encoding::PLAIN | Encoding::PLAIN_DICTIONARY => Box::new(PlainDecoder::::new( - value_data, - num_values, - desc, - read_options, - )), + Encoding::PLAIN | Encoding::PLAIN_DICTIONARY => { + Box::new(PlainDecoder::::new(value_data, desc, read_options)) + } // This is for dictionary indices - Encoding::RLE_DICTIONARY => Box::new(DictDecoder::new(value_data, num_values)), + Encoding::RLE_DICTIONARY => Box::new(DictDecoder::new(value_data)), _ => panic!("Unsupported encoding: {}", encoding), }; decoder @@ -108,17 +104,11 @@ pub struct PlainDecoder { } impl PlainDecoder { - pub fn new( - value_data: Buffer, - num_values: usize, - desc: ColumnDescPtr, - read_options: ReadOptions, - ) -> Self { + pub fn new(value_data: Buffer, desc: ColumnDescPtr, read_options: ReadOptions) -> Self { let len = value_data.len(); let inner = PlainDecoderInner { data: value_data.clone(), offset: 0, - value_count: num_values, bit_reader: BitReader::new(value_data, len), read_options, desc, @@ -938,9 +928,6 @@ pub struct DictDecoder { /// Number of bits used to represent dictionary indices. Must be between `[0, 64]`. bit_width: usize, - /// The number of total values in `data` - value_count: usize, - /// Bit reader bit_reader: BitReader, @@ -955,12 +942,11 @@ pub struct DictDecoder { } impl DictDecoder { - pub fn new(buf: Buffer, num_values: usize) -> Self { + pub fn new(buf: Buffer) -> Self { let bit_width = buf.as_bytes()[0] as usize; Self { bit_width, - value_count: num_values, bit_reader: BitReader::new_all(buf.slice(1)), rle_left: 0, bit_packed_left: 0, diff --git a/native/core/src/parquet/util/test_common/page_util.rs b/native/core/src/parquet/util/test_common/page_util.rs index e20cc30cf..333298bc3 100644 --- a/native/core/src/parquet/util/test_common/page_util.rs +++ b/native/core/src/parquet/util/test_common/page_util.rs @@ -28,7 +28,7 @@ use parquet::{ levels::{max_buffer_size, LevelEncoder}, }, errors::Result, - schema::types::{ColumnDescPtr, SchemaDescPtr}, + schema::types::ColumnDescPtr, }; use super::random_numbers_range; @@ -201,20 +201,12 @@ impl + Send> Iterator for InMemoryPageReader

{
 /// A utility page iterator which stores page readers in memory, used for tests.
 #[derive(Clone)]
 pub struct InMemoryPageIterator<I: Iterator<Item = Vec<Page>>> {
-    schema: SchemaDescPtr,
-    column_desc: ColumnDescPtr,
     page_reader_iter: I,
 }
 
 impl<I: Iterator<Item = Vec<Page>>> InMemoryPageIterator<I> {
-    pub fn new(
-        schema: SchemaDescPtr,
-        column_desc: ColumnDescPtr,
-        pages: impl IntoIterator<Item = Vec<Page>, IntoIter = I>,
-    ) -> Self {
+    pub fn new(pages: impl IntoIterator<Item = Vec<Page>, IntoIter = I>) -> Self {
         Self {
-            schema,
-            column_desc,
             page_reader_iter: pages.into_iter(),
         }
     }
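
Note for reviewers: a minimal sketch (not part of the patch itself) of what the trimmed-down constructors look like at a call site, based only on the signatures in the hunks above; bindings such as `num_items`, `num_bits`, `datatype`, `addr`, and `pages` are illustrative placeholders.

    // BloomFilterAgg::new no longer takes the child expression or a UDF name.
    let func = AggregateUDF::new_from_impl(BloomFilterAgg::new(
        Arc::clone(&num_items),
        Arc::clone(&num_bits),
        datatype,
    ));

    // SparkUnsafeArray::new and InMemoryPageIterator::new drop their unused size/schema arguments.
    let keys = SparkUnsafeArray::new(addr + 8);
    let page_iter = InMemoryPageIterator::new(vec![pages]);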