Skip to content

Commit

Permalink
frequency: performance refactor
Browse files Browse the repository at this point in the history
parallel_ftables()
- compute idx.count() once and store in var
- compute njobs once and store in var

ftables()
- rename some vars for readability
- use unsafe vec index access to avoid unnecessary bounds checking
- remove unnecessary Result wrap

- add safety comments
  • Loading branch information
jqnatividad committed Jan 29, 2024
1 parent ed111b6 commit 1a3a4b4
Showing 1 changed file with 40 additions and 23 deletions.
63 changes: 40 additions & 23 deletions src/cmd/frequency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ use crate::{
CliResult,
};

#[allow(clippy::unsafe_derive_deserialize)]
#[derive(Clone, Deserialize)]
pub struct Args {
pub arg_input: Option<String>,
Expand Down Expand Up @@ -153,7 +154,7 @@ impl Args {
pub fn sequential_ftables(&self) -> CliResult<(Headers, FTables)> {
let mut rdr = self.rconfig().reader()?;
let (headers, sel) = self.sel_headers(&mut rdr)?;
Ok((headers, self.ftables(&sel, rdr.byte_records())?))
Ok((headers, self.ftables(&sel, rdr.byte_records())))
}

pub fn parallel_ftables(
Expand All @@ -163,84 +164,100 @@ impl Args {
let mut rdr = self.rconfig().reader()?;
let (headers, sel) = self.sel_headers(&mut rdr)?;

if idx.count() == 0 {
let idx_count = idx.count() as usize;
if idx_count == 0 {
return Ok((headers, vec![]));
}

let chunk_size = util::chunk_size(idx.count() as usize, util::njobs(self.flag_jobs));
let nchunks = util::num_of_chunks(idx.count() as usize, chunk_size);
let njobs = util::njobs(self.flag_jobs);
let chunk_size = util::chunk_size(idx_count, njobs);
let nchunks = util::num_of_chunks(idx_count, chunk_size);

let pool = ThreadPool::new(util::njobs(self.flag_jobs));
let pool = ThreadPool::new(njobs);
let (send, recv) = channel::bounded(0);
for i in 0..nchunks {
let (send, args, sel) = (send.clone(), self.clone(), sel.clone());
pool.execute(move || {
// safety: we know the file is indexed and seekable
let mut idx = args.rconfig().indexed().unwrap().unwrap();
idx.seek((i * chunk_size) as u64).unwrap();
let it = idx.byte_records().take(chunk_size);
send.send(args.ftables(&sel, it).unwrap()).unwrap();
send.send(args.ftables(&sel, it)).unwrap();
});
}
drop(send);
Ok((headers, merge_all(recv.iter()).unwrap()))
}

#[inline]
fn ftables<I>(&self, sel: &Selection, it: I) -> CliResult<FTables>
fn ftables<I>(&self, sel: &Selection, it: I) -> FTables
where
I: Iterator<Item = csv::Result<csv::ByteRecord>>,
{
let null = &b""[..].to_vec();
let nsel = sel.normal();
let nsel_len = nsel.len();
let mut tabs: Vec<_> = (0..nsel_len).map(|_| Frequencies::new()).collect();
let mut freq_tables: Vec<_> = (0..nsel_len).map(|_| Frequencies::new()).collect();

#[allow(unused_assignments)]
// amortize allocations
let mut field_work: Vec<u8> = Vec::with_capacity(nsel_len);
let mut row_work: csv::ByteRecord = csv::ByteRecord::with_capacity(200, nsel_len);
let mut field_buffer: Vec<u8> = Vec::with_capacity(nsel_len);
let mut row_buffer: csv::ByteRecord = csv::ByteRecord::with_capacity(200, nsel_len);

let flag_no_nulls = self.flag_no_nulls;
if self.flag_ignore_case {
let mut buf = String::new();
// safety: we do get_unchecked_mut on freq_tables
// as we know that nsel_len is the same as freq_tables.len()
// so we can skip the bounds check
for row in it {
row_work.clone_from(&row?);
for (i, field) in nsel.select(row_work.into_iter()).enumerate() {
field_work = {
// safety: we know the row is not empty
row_buffer.clone_from(&row.unwrap());
for (i, field) in nsel.select(row_buffer.into_iter()).enumerate() {
field_buffer = {
if let Ok(s) = simdutf8::basic::from_utf8(field) {
util::to_lowercase_into(s.trim(), &mut buf);
buf.as_bytes().to_vec()
} else {
field.to_vec()
}
};
if !field_work.is_empty() {
tabs[i].add(field_work);
if !field_buffer.is_empty() {
unsafe {
freq_tables.get_unchecked_mut(i).add(field_buffer);
}
} else if !flag_no_nulls {
tabs[i].add(null.clone());
unsafe {
freq_tables.get_unchecked_mut(i).add(null.clone());
}
}
}
}
} else {
for row in it {
row_work.clone_from(&row?);
for (i, field) in nsel.select(row_work.into_iter()).enumerate() {
field_work = {
// safety: we know the row is not empty
row_buffer.clone_from(&row.unwrap());
for (i, field) in nsel.select(row_buffer.into_iter()).enumerate() {
field_buffer = {
if let Ok(s) = simdutf8::basic::from_utf8(field) {
s.trim().as_bytes().to_vec()
} else {
field.to_vec()
}
};
if !field_work.is_empty() {
tabs[i].add(field_work);
if !field_buffer.is_empty() {
unsafe {
freq_tables.get_unchecked_mut(i).add(field_buffer);
}
} else if !flag_no_nulls {
tabs[i].add(null.clone());
unsafe {
freq_tables.get_unchecked_mut(i).add(null.clone());
}
}
}
}
}
Ok(tabs)
freq_tables
}

fn sel_headers<R: io::Read>(
Expand Down

0 comments on commit 1a3a4b4

Please sign in to comment.