Skip to content

Commit

Permalink
split: minor parallel_split optimizations; wordsmith output summary
Browse files Browse the repository at this point in the history
parallel_split()
- remove the unnecessary `args` variable, which cloned `self`
- amortize the `write_row` ByteRecord allocation in the hot write loop

The output summary now accounts for the case of a single chunk.
  • Loading branch information
jqnatividad committed Feb 6, 2024
1 parent 212140d commit f3d3861
Showing 1 changed file with 12 additions and 12 deletions.
24 changes: 12 additions & 12 deletions src/cmd/split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ impl Args {

if !self.flag_quiet {
eprintln!(
"Wrote {} chunks to '{}'. Rows/chunk: {} Num records: {}",
"Wrote {} chunk/s to '{}'. Rows/chunk: {} Num records: {}",
nchunks + 1,
Path::new(&self.arg_outdir).canonicalize()?.display(),
chunk_size,
Expand All @@ -159,16 +159,15 @@ impl Args {
}

fn parallel_split(&self, idx: &Indexed<fs::File, fs::File>) -> CliResult<()> {
let args = self.clone();
let chunk_size;
let idx_count = idx.count();

#[allow(clippy::cast_precision_loss)]
let nchunks = if let Some(flag_chunks) = args.flag_chunks {
let nchunks = if let Some(flag_chunks) = self.flag_chunks {
chunk_size = (idx_count as f64 / flag_chunks as f64).ceil() as usize;
flag_chunks
} else {
chunk_size = args.flag_size;
chunk_size = self.flag_size;
util::num_of_chunks(idx_count as usize, self.flag_size)
};
if nchunks == 1 {
Expand All @@ -177,38 +176,39 @@ impl Args {
return self.sequential_split();
}

util::njobs(args.flag_jobs);
util::njobs(self.flag_jobs);

// safety: we cannot use ? here because we're in a closure
(0..nchunks).into_par_iter().for_each(|i| {
let conf = args.rconfig();
let conf = self.rconfig();
// safety: safe to unwrap because we know the file is indexed
let mut idx = conf.indexed().unwrap().unwrap();
// safety: the only way this can fail is if the file first row of the chunk
// is not a valid CSV record, which is impossible because we're reading
// from a file with a valid index
let headers = idx.byte_headers().unwrap();

let mut wtr = args
let mut wtr = self
// safety: the only way this can fail is if we cannot create a file
.new_writer(headers, i * chunk_size, args.flag_pad)
.new_writer(headers, i * chunk_size, self.flag_pad)
.unwrap();

// safety: we know that there is more than one chunk, so we can safely
// seek to the start of the chunk
idx.seek((i * chunk_size) as u64).unwrap();
let mut write_row;
for row in idx.byte_records().take(chunk_size) {
let row = row.unwrap();
wtr.write_byte_record(&row).unwrap();
write_row = row.unwrap();
wtr.write_byte_record(&write_row).unwrap();
}
// safety: safe to unwrap because we know the writer is a file
// the only way this can fail is if we cannot write to the file
wtr.flush().unwrap();
});

if !args.flag_quiet {
if !self.flag_quiet {
eprintln!(
"Wrote {} chunks to '{}'. Rows/chunk: {} Num records: {}",
"Wrote {} chunk/s to '{}'. Rows/chunk: {} Num records: {}",
nchunks,
Path::new(&self.arg_outdir).canonicalize()?.display(),
chunk_size,
Expand Down

0 comments on commit f3d3861

Please sign in to comment.