remove dead code (#1155)
andygrove authored Dec 10, 2024
1 parent 73f1405 commit 5c45fdc
Showing 24 changed files with 27 additions and 281 deletions.
2 changes: 0 additions & 2 deletions native/core/benches/bloom_filter_agg.rs
@@ -61,10 +61,8 @@ fn criterion_benchmark(c: &mut Criterion) {
         group.bench_function(agg_mode.0, |b| {
             let comet_bloom_filter_agg =
                 Arc::new(AggregateUDF::new_from_impl(BloomFilterAgg::new(
-                    Arc::clone(&c0),
                     Arc::clone(&num_items),
                     Arc::clone(&num_bits),
-                    "bloom_filter_agg",
                     DataType::Binary,
                 )));
             b.to_async(&rt).iter(|| {
5 changes: 2 additions & 3 deletions native/core/benches/parquet_read.rs
@@ -44,7 +44,7 @@ fn bench(c: &mut Criterion) {
     let mut group = c.benchmark_group("comet_parquet_read");
     let schema = build_test_schema();
 
-    let pages = build_plain_int32_pages(schema.clone(), schema.column(0), 0.0);
+    let pages = build_plain_int32_pages(schema.column(0), 0.0);
     group.bench_function("INT/PLAIN/NOT_NULL", |b| {
         let t = TypePtr::new(
             PrimitiveTypeBuilder::new("f", PhysicalType::INT32)
@@ -107,7 +107,6 @@ const VALUES_PER_PAGE: usize = 10_000;
 const BATCH_SIZE: usize = 4096;
 
 fn build_plain_int32_pages(
-    schema: SchemaDescPtr,
     column_desc: ColumnDescPtr,
     null_density: f32,
 ) -> impl PageIterator + Clone {
@@ -143,7 +142,7 @@ fn build_plain_int32_pages(
 
     // Since `InMemoryPageReader` is not exposed from parquet crate, here we use
    // `InMemoryPageIterator` instead which is a Iter<Iter<Page>>.
-    InMemoryPageIterator::new(schema, column_desc, vec![pages])
+    InMemoryPageIterator::new(vec![pages])
 }
 
 struct TestColumnReader {
44 changes: 1 addition & 43 deletions native/core/src/errors.rs
@@ -485,23 +485,6 @@ where
     || f(t)
 }
 
-// This is a duplicate of `try_unwrap_or_throw`, which is used to work around Arrow's lack of
-// `UnwindSafe` handling.
-pub fn try_assert_unwind_safe_or_throw<T, F>(env: &JNIEnv, f: F) -> T
-where
-    T: JNIDefault,
-    F: FnOnce(JNIEnv) -> Result<T, CometError>,
-{
-    let mut env1 = unsafe { JNIEnv::from_raw(env.get_raw()).unwrap() };
-    let env2 = unsafe { JNIEnv::from_raw(env.get_raw()).unwrap() };
-    unwrap_or_throw_default(
-        &mut env1,
-        flatten(
-            catch_unwind(std::panic::AssertUnwindSafe(curry(f, env2))).map_err(CometError::from),
-        ),
-    )
-}
-
 // It is currently undefined behavior to unwind from Rust code into foreign code, so we can wrap
 // our JNI functions and turn these panics into a `RuntimeException`.
 pub fn try_unwrap_or_throw<T, F>(env: &JNIEnv, f: F) -> T
@@ -534,10 +517,7 @@ mod tests {
         AttachGuard, InitArgsBuilder, JNIEnv, JNIVersion, JavaVM,
     };
 
-    use assertables::{
-        assert_contains, assert_contains_as_result, assert_starts_with,
-        assert_starts_with_as_result,
-    };
+    use assertables::{assert_starts_with, assert_starts_with_as_result};
 
     pub fn jvm() -> &'static Arc<JavaVM> {
         static mut JVM: Option<Arc<JavaVM>> = None;
@@ -890,26 +870,4 @@ mod tests {
         // first line.
         assert_starts_with!(msg_rust, expected_message);
     }
-
-    // Asserts that exception's message matches `expected_message`.
-    fn assert_exception_message_with_stacktrace(
-        env: &mut JNIEnv,
-        exception: JThrowable,
-        expected_message: &str,
-        stacktrace_contains: &str,
-    ) {
-        let message = env
-            .call_method(exception, "getMessage", "()Ljava/lang/String;", &[])
-            .unwrap()
-            .l()
-            .unwrap();
-        let message_string = message.into();
-        let msg_rust: String = env.get_string(&message_string).unwrap().into();
-        // Since panics result in multi-line messages which include the backtrace, just use the
-        // first line.
-        assert_starts_with!(msg_rust, expected_message);
-
-        // Check that the stacktrace is included by checking for a specific element
-        assert_contains!(msg_rust, stacktrace_contains);
-    }
 }
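The deleted try_assert_unwind_safe_or_throw existed because some closures capture types (such as Arrow structures) that do not implement UnwindSafe, and wrapping them in std::panic::AssertUnwindSafe lets catch_unwind accept them anyway. The surviving try_unwrap_or_throw keeps the essential job: stopping a Rust panic before it unwinds across the JNI boundary, which is undefined behavior. A self-contained sketch of that pattern, with a hypothetical run_guarded standing in for the real function and eprintln! standing in for throwing a Java RuntimeException:

use std::panic::{catch_unwind, AssertUnwindSafe};

// Hypothetical sketch: run a fallible closure, convert both errors and panics
// into a printed message plus a default value, instead of letting the panic
// unwind into foreign (JVM) stack frames.
fn run_guarded<T: Default>(f: impl FnOnce() -> Result<T, String>) -> T {
    match catch_unwind(AssertUnwindSafe(f)) {
        Ok(Ok(value)) => value,
        Ok(Err(msg)) => {
            eprintln!("error: {msg}"); // real code would throw a RuntimeException here
            T::default()
        }
        Err(payload) => {
            // A panic payload is usually a &str or a String; recover what we can.
            let msg = payload
                .downcast_ref::<&str>()
                .map(|s| s.to_string())
                .or_else(|| payload.downcast_ref::<String>().cloned())
                .unwrap_or_else(|| "unknown panic".to_string());
            eprintln!("panic: {msg}"); // real code would throw a RuntimeException here
            T::default()
        }
    }
}

fn main() {
    let v: i32 = run_guarded(|| panic!("boom"));
    assert_eq!(v, 0); // the panic was caught and converted to a default value
}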
@@ -34,9 +34,7 @@ use datafusion_physical_expr::expressions::Literal;
 
 #[derive(Debug, Clone)]
 pub struct BloomFilterAgg {
-    name: String,
     signature: Signature,
-    expr: Arc<dyn PhysicalExpr>,
     num_items: i32,
     num_bits: i32,
 }
@@ -53,15 +51,12 @@ fn extract_i32_from_literal(expr: Arc<dyn PhysicalExpr>) -> i32 {
 
 impl BloomFilterAgg {
     pub fn new(
-        expr: Arc<dyn PhysicalExpr>,
         num_items: Arc<dyn PhysicalExpr>,
         num_bits: Arc<dyn PhysicalExpr>,
-        name: impl Into<String>,
         data_type: DataType,
     ) -> Self {
         assert!(matches!(data_type, DataType::Binary));
         Self {
-            name: name.into(),
             signature: Signature::uniform(
                 1,
                 vec![
@@ -73,7 +68,6 @@ impl BloomFilterAgg {
                 ],
                 Volatility::Immutable,
             ),
-            expr,
             num_items: extract_i32_from_literal(num_items),
             num_bits: extract_i32_from_literal(num_bits),
         }
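With expr and name gone, the struct keeps only the two constants it actually evaluates, and extract_i32_from_literal (still present above) is what folds the literal arguments down to plain i32 fields. A sketch of that downcast, using a hypothetical literal_to_i32 with Option-based handling rather than whatever error strategy the real helper uses:

use std::sync::Arc;

use datafusion_common::ScalarValue;
use datafusion_physical_expr::{expressions::Literal, PhysicalExpr};

// Hypothetical helper sketching the pattern behind `extract_i32_from_literal`:
// a constant argument arrives as a `Literal` physical expression, and the i32
// payload is pulled out of its `ScalarValue`.
fn literal_to_i32(expr: &Arc<dyn PhysicalExpr>) -> Option<i32> {
    let literal = expr.as_any().downcast_ref::<Literal>()?;
    match literal.value() {
        ScalarValue::Int32(v) => *v, // `Int32` wraps an Option<i32>; None means NULL
        _ => None,
    }
}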
3 changes: 0 additions & 3 deletions native/core/src/execution/datafusion/planner.rs
@@ -115,7 +115,6 @@ use std::cmp::max;
 use std::{collections::HashMap, sync::Arc};
 
 // For clippy error on type_complexity.
-type ExecResult<T> = Result<T, ExecutionError>;
 type PhyAggResult = Result<Vec<AggregateFunctionExpr>, ExecutionError>;
 type PhyExprResult = Result<Vec<(Arc<dyn PhysicalExpr>, String)>, ExecutionError>;
 type PartitionPhyExprResult = Result<Vec<Arc<dyn PhysicalExpr>>, ExecutionError>;
@@ -1758,10 +1757,8 @@ impl PhysicalPlanner {
             self.create_expr(expr.num_bits.as_ref().unwrap(), Arc::clone(&schema))?;
         let datatype = to_arrow_datatype(expr.datatype.as_ref().unwrap());
         let func = AggregateUDF::new_from_impl(BloomFilterAgg::new(
-            Arc::clone(&child),
             Arc::clone(&num_items),
             Arc::clone(&num_bits),
-            "bloom_filter_agg",
             datatype,
         ));
         Self::create_aggr_func_expr("bloom_filter_agg", schema, vec![child], func)
@@ -70,6 +70,7 @@ impl SparkBitArray {
         self.data.len()
     }
 
+    #[allow(dead_code)] // this is only called from tests
     pub fn cardinality(&self) -> usize {
         self.bit_count
     }
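#[allow(dead_code)] keeps cardinality compiled into every build while silencing the lint; the usual alternative for test-only helpers is #[cfg(test)], which removes the code from non-test builds entirely. A toy comparison of the two options (the BitArray type here is hypothetical, not Comet's SparkBitArray):

// Minimal sketch of the two ways to keep a helper that only tests call.
struct BitArray {
    bit_count: usize,
}

impl BitArray {
    // The option this commit takes: keep the method in all builds, silence the lint.
    #[allow(dead_code)]
    fn cardinality(&self) -> usize {
        self.bit_count
    }

    // A common alternative: compile the helper only under `cargo test`.
    #[cfg(test)]
    fn cardinality_test_only(&self) -> usize {
        self.bit_count
    }
}

fn main() {
    let bits = BitArray { bit_count: 3 };
    println!("{}", bits.cardinality());
}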
8 changes: 0 additions & 8 deletions native/core/src/execution/jni_api.rs
@@ -207,14 +207,6 @@ fn prepare_datafusion_session_context(
     Ok(session_ctx)
 }
 
-fn parse_bool(conf: &HashMap<String, String>, name: &str) -> CometResult<bool> {
-    conf.get(name)
-        .map(String::as_str)
-        .unwrap_or("false")
-        .parse::<bool>()
-        .map_err(|e| CometError::Config(format!("Failed to parse boolean config {name}: {e}")))
-}
-
 /// Prepares arrow arrays for output.
 fn prepare_output(
     env: &mut JNIEnv,
117 changes: 1 addition & 116 deletions native/core/src/execution/kernels/strings.rs
@@ -21,7 +21,7 @@ use std::sync::Arc;
 
 use arrow::{
     array::*,
-    buffer::{Buffer, MutableBuffer},
+    buffer::MutableBuffer,
     compute::kernels::substring::{substring as arrow_substring, substring_by_char},
     datatypes::{DataType, Int32Type},
 };
@@ -87,43 +87,6 @@ pub fn substring(array: &dyn Array, start: i64, length: u64) -> Result<ArrayRef,
     }
 }
 
-/// Returns an ArrayRef with a substring starting from `start` and length.
-///
-/// # Preconditions
-///
-/// - `start` can be negative, in which case the start counts from the end of the string.
-/// - `array` must be either [`StringArray`] or [`LargeStringArray`].
-///
-/// Note: this is different from arrow-rs `substring` kernel in that both `start` and `length` are
-/// `Int32Array` here.
-pub fn substring_with_array(
-    array: &dyn Array,
-    start: &Int32Array,
-    length: &Int32Array,
-) -> ArrayRef {
-    match array.data_type() {
-        DataType::LargeUtf8 => generic_substring(
-            array
-                .as_any()
-                .downcast_ref::<LargeStringArray>()
-                .expect("A large string is expected"),
-            start,
-            length,
-            |i| i as i64,
-        ),
-        DataType::Utf8 => generic_substring(
-            array
-                .as_any()
-                .downcast_ref::<StringArray>()
-                .expect("A string is expected"),
-            start,
-            length,
-            |i| i,
-        ),
-        _ => panic!("substring does not support type {:?}", array.data_type()),
-    }
-}
-
 fn generic_string_space<OffsetSize: OffsetSizeTrait>(length: &Int32Array) -> ArrayRef {
     let array_len = length.len();
     let mut offsets = MutableBuffer::new((array_len + 1) * std::mem::size_of::<OffsetSize>());
@@ -163,81 +126,3 @@ fn generic_string_space<OffsetSize: OffsetSizeTrait>(length: &Int32Array) -> Arr
     };
     make_array(data)
 }
-
-fn generic_substring<OffsetSize: OffsetSizeTrait, F>(
-    array: &GenericStringArray<OffsetSize>,
-    start: &Int32Array,
-    length: &Int32Array,
-    f: F,
-) -> ArrayRef
-where
-    F: Fn(i32) -> OffsetSize,
-{
-    assert_eq!(array.len(), start.len());
-    assert_eq!(array.len(), length.len());
-
-    // compute current offsets
-    let offsets = array.to_data().buffers()[0].clone();
-    let offsets: &[OffsetSize] = offsets.typed_data::<OffsetSize>();
-
-    // compute null bitmap (copy)
-    let null_bit_buffer = array.to_data().nulls().map(|b| b.buffer().clone());
-
-    // Gets slices of start and length arrays to access them directly for performance.
-    let start_data = start.to_data();
-    let length_data = length.to_data();
-    let starts = start_data.buffers()[0].typed_data::<i32>();
-    let lengths = length_data.buffers()[0].typed_data::<i32>();
-
-    // compute values
-    let array_data = array.to_data();
-    let values = &array_data.buffers()[1];
-    let data = values.as_slice();
-
-    // we have no way to estimate how much this will be.
-    let mut new_values = MutableBuffer::new(0);
-    let mut new_offsets: Vec<OffsetSize> = Vec::with_capacity(array.len() + 1);
-
-    let mut length_so_far = OffsetSize::zero();
-    new_offsets.push(length_so_far);
-    (0..array.len()).for_each(|i| {
-        // the length of this entry
-        let length_i: OffsetSize = offsets[i + 1] - offsets[i];
-        // compute where we should start slicing this entry
-        let start_pos: OffsetSize = f(starts[i]);
-
-        let start = offsets[i]
-            + if start_pos >= OffsetSize::zero() {
-                start_pos
-            } else {
-                length_i + start_pos
-            };
-
-        let start = start.clamp(offsets[i], offsets[i + 1]);
-        // compute the length of the slice
-        let slice_length: OffsetSize = f(lengths[i].max(0)).min(offsets[i + 1] - start);
-
-        length_so_far += slice_length;
-
-        new_offsets.push(length_so_far);
-
-        // we need usize for ranges
-        let start = start.to_usize().unwrap();
-        let slice_length = slice_length.to_usize().unwrap();
-
-        new_values.extend_from_slice(&data[start..start + slice_length]);
-    });
-
-    let data = unsafe {
-        ArrayData::new_unchecked(
-            GenericStringArray::<OffsetSize>::DATA_TYPE,
-            array.len(),
-            None,
-            null_bit_buffer,
-            0,
-            vec![Buffer::from_slice_ref(&new_offsets), new_values.into()],
-            vec![],
-        )
-    };
-    make_array(data)
-}
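The deleted substring_with_array/generic_substring pair had no callers left; the retained substring wrapper delegates to arrow-rs (imported as arrow_substring above). A usage sketch of that arrow kernel, which also treats a negative start as counting from the end of each string (this example is illustrative, not code from this commit):

use std::sync::Arc;

use arrow::array::{ArrayRef, StringArray};
use arrow::compute::kernels::substring::substring;

fn main() {
    let input: ArrayRef = Arc::new(StringArray::from(vec!["hello", "world"]));
    // start = -3 begins 3 characters from the end; take at most 2 of them.
    let out = substring(&input, -3, Some(2)).unwrap();
    let out = out.as_any().downcast_ref::<StringArray>().unwrap();
    assert_eq!(out.value(0), "ll"); // last 3 chars of "hello" are "llo"
    assert_eq!(out.value(1), "rl"); // last 3 chars of "world" are "rld"
}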
8 changes: 0 additions & 8 deletions native/core/src/execution/operators/scan.rs
@@ -525,12 +525,4 @@ impl InputBatch {
 
         InputBatch::Batch(columns, num_rows)
     }
-
-    /// Get the number of rows in this batch
-    fn num_rows(&self) -> usize {
-        match self {
-            Self::EOF => 0,
-            Self::Batch(_, num_rows) => *num_rows,
-        }
-    }
 }
4 changes: 1 addition & 3 deletions native/core/src/execution/shuffle/list.rs
@@ -28,7 +28,6 @@ use arrow_schema::{DataType, TimeUnit};
 
 pub struct SparkUnsafeArray {
     row_addr: i64,
-    row_size: i32,
     num_elements: usize,
     element_offset: i64,
 }
@@ -45,7 +44,7 @@ impl SparkUnsafeObject for SparkUnsafeArray {
 
 impl SparkUnsafeArray {
     /// Creates a `SparkUnsafeArray` which points to the given address and size in bytes.
-    pub fn new(addr: i64, size: i32) -> Self {
+    pub fn new(addr: i64) -> Self {
         // Read the number of elements from the first 8 bytes.
         let slice: &[u8] = unsafe { std::slice::from_raw_parts(addr as *const u8, 8) };
         let num_elements = i64::from_le_bytes(slice.try_into().unwrap());
@@ -60,7 +59,6 @@ impl SparkUnsafeArray {
 
         Self {
             row_addr: addr,
-            row_size: size,
             num_elements: num_elements as usize,
             element_offset: addr + Self::get_header_portion_in_bytes(num_elements),
         }
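SparkUnsafeArray::new can drop its size argument because the UnsafeArrayData buffer is self-describing: as the retained code above shows, its first 8 bytes hold the element count as a little-endian i64. A safe, self-contained sketch of that header read (the buffer and function names here are hypothetical):

// Hypothetical safe reconstruction of the header read in `SparkUnsafeArray::new`.
fn read_num_elements(buf: &[u8]) -> usize {
    let header: [u8; 8] = buf[..8].try_into().expect("need at least 8 bytes");
    i64::from_le_bytes(header) as usize
}

fn main() {
    // A 3-element header with no body, which is enough for this demo.
    let buf = 3i64.to_le_bytes();
    assert_eq!(read_num_elements(&buf), 3);
}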
17 changes: 3 additions & 14 deletions native/core/src/execution/shuffle/map.rs
@@ -30,8 +30,6 @@ use arrow_array::builder::{
 use arrow_schema::{DataType, FieldRef, Fields, TimeUnit};
 
 pub struct SparkUnsafeMap {
-    row_addr: i64,
-    row_size: i32,
     pub(crate) keys: SparkUnsafeArray,
     pub(crate) values: SparkUnsafeArray,
 }
@@ -59,8 +57,8 @@ impl SparkUnsafeMap {
             panic!("Negative value size in bytes of map: {}", value_array_size);
         }
 
-        let keys = SparkUnsafeArray::new(addr + 8, key_array_size as i32);
-        let values = SparkUnsafeArray::new(addr + 8 + key_array_size, value_array_size);
+        let keys = SparkUnsafeArray::new(addr + 8);
+        let values = SparkUnsafeArray::new(addr + 8 + key_array_size);
 
         if keys.get_num_elements() != values.get_num_elements() {
             panic!(
@@ -70,16 +68,7 @@
             );
         }
 
-        Self {
-            row_addr: addr,
-            row_size: size,
-            keys,
-            values,
-        }
-    }
-
-    pub(crate) fn get_num_elements(&self) -> usize {
-        self.keys.get_num_elements()
+        Self { keys, values }
     }
 }