clippy (#1140)
parthchandra authored Dec 5, 2024
1 parent e0d8077 commit bf5a2c6
Showing 2 changed files with 53 additions and 47 deletions.
native/core/src/execution/datafusion/schema_adapter.rs (3 additions, 3 deletions)
@@ -120,9 +120,9 @@ impl SchemaAdapter for CometSchemaAdapter {

         Ok((
             Arc::new(SchemaMapping {
-                projected_table_schema: self.projected_table_schema.clone(),
+                projected_table_schema: Arc::<Schema>::clone(&self.projected_table_schema),
                 field_mappings,
-                table_schema: self.table_schema.clone(),
+                table_schema: Arc::<Schema>::clone(&self.table_schema),
             }),
             projection,
         ))
@@ -218,7 +218,7 @@ impl SchemaMapper for SchemaMapping {
         // Necessary to handle empty batches
         let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));

-        let schema = self.projected_table_schema.clone();
+        let schema = Arc::<Schema>::clone(&self.projected_table_schema);
         let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?;
         Ok(record_batch)
     }
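The three `.clone()` → `Arc::<Schema>::clone(&...)` rewrites in this file are the standard response to clippy's `clone_on_ref_ptr` lint: method-call `.clone()` on an `Arc` is easy to misread as a deep copy of the pointee. A minimal, self-contained sketch of the idiom, using a stand-in `Schema` struct rather than the Arrow type:

use std::sync::Arc;

// Stand-in for the Arrow `Schema`; any `T` behind an `Arc` behaves the same.
struct Schema {
    fields: Vec<String>,
}

fn main() {
    let schema = Arc::new(Schema {
        fields: vec!["a".to_string(), "b".to_string()],
    });

    // Flagged by `clippy::clone_on_ref_ptr`: reads like a deep copy.
    // let copy = schema.clone();

    // Preferred: the fully qualified form makes it explicit that only the
    // reference count is bumped and the underlying `Schema` is shared.
    let copy = Arc::<Schema>::clone(&schema);

    assert_eq!(copy.fields.len(), 2);
    assert_eq!(Arc::strong_count(&schema), 2);
}

Both spellings compile to the same reference-count increment; the change is purely about readability at the call site.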
native/core/src/parquet/mod.rs (50 additions, 44 deletions)
@@ -621,8 +621,10 @@ fn get_batch_reader<'a>(handle: jlong) -> Result<&'a mut ParquetRecordBatchReader>
     Ok(&mut get_batch_context(handle)?.batch_reader)
 }

+/// # Safety
+/// This function is inherently unsafe since it deals with raw pointers passed from JNI.
 #[no_mangle]
-pub extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBatchReader(
+pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBatchReader(
     e: JNIEnv,
     _jclass: JClass,
     file_path: jstring,
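The hunk above makes the JNI entry point an `unsafe fn` and documents why, which is the shape clippy's `missing_safety_doc` lint asks for on public unsafe functions. A hypothetical sketch of the same raw-handle pattern this file uses (`Box::into_raw(...) as i64` appears at the end of the next hunk); the `from_handle` helper is illustrative, not from this codebase:

/// Reinterprets `handle` as a mutable reference to a live `T`.
///
/// # Safety
/// `handle` must come from `Box::into_raw(Box::new(t)) as i64`, must not have
/// been freed yet, and no other live reference to the same `T` may exist.
pub unsafe fn from_handle<'a, T>(handle: i64) -> &'a mut T {
    // SAFETY: upheld by the caller per the contract documented above.
    unsafe { &mut *(handle as *mut T) }
}

fn main() {
    let handle = Box::into_raw(Box::new(41_i32)) as i64;

    // SAFETY: `handle` was just produced by `Box::into_raw` and is unique.
    let value = unsafe { from_handle::<i32>(handle) };
    *value += 1;
    assert_eq!(*value, 42);

    // SAFETY: reclaim ownership so the allocation is freed exactly once.
    unsafe { drop(Box::from_raw(handle as *mut i32)) };
}

Without the `# Safety` section, a `pub unsafe fn` like this is exactly what `missing_safety_doc` warns about.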
@@ -646,62 +648,66 @@ pub extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBatchReader(
         .unwrap()
         .with_batch_size(8192); // TODO: (ARROW NATIVE) Use batch size configured in JVM

-    let mut total_rows: i64 = 0;
-    //TODO: (ARROW NATIVE) if we can get the ParquetMetadata serialized, we need not do this.
-    let metadata = builder.metadata().clone();
-
-    let mut columns_to_read: Vec<usize> = Vec::new();
-    let columns_to_read_array = JObjectArray::from_raw(required_columns);
-    let array_len = env.get_array_length(&columns_to_read_array)?;
-    let mut required_columns: Vec<String> = Vec::new();
-    for i in 0..array_len {
-        let p: JString = env
-            .get_object_array_element(&columns_to_read_array, i)?
-            .into();
-        required_columns.push(env.get_string(&p)?.into());
-    }
-    for (i, col) in metadata
-        .file_metadata()
-        .schema_descr()
-        .columns()
-        .iter()
-        .enumerate()
-    {
-        for (_, required) in required_columns.iter().enumerate() {
-            if col.name().to_uppercase().eq(&required.to_uppercase()) {
-                columns_to_read.push(i);
-                break;
-            }
-        }
-    }
-    //TODO: (ARROW NATIVE) make this work for complex types (especially deeply nested structs)
-    let mask = ProjectionMask::leaves(metadata.file_metadata().schema_descr(), columns_to_read);
-    // Set projection mask to read only root columns 1 and 2.
-    builder = builder.with_projection(mask);
-
-    let mut row_groups_to_read: Vec<usize> = Vec::new();
-    let mut total_rows: i64 = 0;
-    // get row groups -
-    for (i, rg) in metadata.row_groups().into_iter().enumerate() {
-        let rg_start = rg.file_offset().unwrap();
-        let rg_end = rg_start + rg.compressed_size();
-        if rg_start >= start && rg_end <= start + length {
-            row_groups_to_read.push(i);
-            total_rows += rg.num_rows();
-        }
-    }
+    let num_row_groups;
+    let mut total_rows: i64 = 0;
+    {
+        //TODO: (ARROW NATIVE) if we can get the ParquetMetadata serialized, we need not do this.
+        let metadata = builder.metadata();
+
+        let mut columns_to_read: Vec<usize> = Vec::new();
+        let columns_to_read_array = JObjectArray::from_raw(required_columns);
+        let array_len = env.get_array_length(&columns_to_read_array)?;
+        let mut required_columns: Vec<String> = Vec::new();
+        for i in 0..array_len {
+            let p: JString = env
+                .get_object_array_element(&columns_to_read_array, i)?
+                .into();
+            required_columns.push(env.get_string(&p)?.into());
+        }
+        for (i, col) in metadata
+            .file_metadata()
+            .schema_descr()
+            .columns()
+            .iter()
+            .enumerate()
+        {
+            for required in required_columns.iter() {
+                if col.name().to_uppercase().eq(&required.to_uppercase()) {
+                    columns_to_read.push(i);
+                    break;
+                }
+            }
+        }
+        //TODO: (ARROW NATIVE) make this work for complex types (especially deeply nested structs)
+        let mask =
+            ProjectionMask::leaves(metadata.file_metadata().schema_descr(), columns_to_read);
+        // Set projection mask to read only root columns 1 and 2.
+
+        let mut row_groups_to_read: Vec<usize> = Vec::new();
+        // get row groups -
+        for (i, rg) in metadata.row_groups().iter().enumerate() {
+            let rg_start = rg.file_offset().unwrap();
+            let rg_end = rg_start + rg.compressed_size();
+            if rg_start >= start && rg_end <= start + length {
+                row_groups_to_read.push(i);
+                total_rows += rg.num_rows();
+            }
+        }
+        num_row_groups = row_groups_to_read.len();
+        builder = builder
+            .with_projection(mask)
+            .with_row_groups(row_groups_to_read.clone())
+    }

     // Build a sync parquet reader.
-    let batch_reader = builder
-        .with_row_groups(row_groups_to_read.clone())
-        .build()
-        .unwrap();
+    let batch_reader = builder.build().unwrap();

     let ctx = BatchContext {
         batch_reader,
         current_batch: None,
         reader_state: ParquetReaderState::Init,
-        num_row_groups: row_groups_to_read.len() as i32,
-        total_rows: total_rows,
+        num_row_groups: num_row_groups as i32,
+        total_rows,
     };
     let res = Box::new(ctx);
     Ok(Box::into_raw(res) as i64)
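The commit message doesn't name the lints, but the reshuffle in the hunk above matches several well-known clippy suggestions: the dropped `.clone()` on `builder.metadata()` (likely `redundant_clone`), `for (_, required) in ...enumerate()` reduced to plain iteration (`unused_enumerate_index`), `.into_iter()` on the borrowed `metadata.row_groups()` replaced by `.iter()` (`into_iter_on_ref`), and `total_rows: total_rows` shortened to field-init shorthand (`redundant_field_names`). The new scoped block confines the borrow of `builder` taken by `metadata()`, so `builder` can be consumed by `with_projection`/`with_row_groups` afterwards, with `num_row_groups` and `total_rows` hoisted out to outlive the borrow. A self-contained sketch of that borrow-scoping move, with hypothetical names:

struct Builder {
    meta: String,
}

impl Builder {
    // Borrows `self`, like `builder.metadata()` in the diff.
    fn metadata(&self) -> &str {
        &self.meta
    }

    // Consumes `self`, like `with_projection` / `with_row_groups`.
    fn with_tag(mut self, tag: &str) -> Self {
        self.meta.push_str(tag);
        self
    }
}

fn main() {
    let mut builder = Builder { meta: "m".to_string() };

    let tag;
    {
        // The borrow of `builder` lives only inside this block, so the
        // metadata no longer has to be cloned before `builder` is moved.
        let metadata = builder.metadata();
        tag = format!("+{}", metadata.len());
    }
    builder = builder.with_tag(&tag);

    assert_eq!(builder.meta, "m+1");
}

Hoisting `tag` out of the block plays the same role as `num_row_groups` in the commit: the computed value outlives the borrow it was derived from.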