split: added --chunks option
Apart from specifying how many rows per chunk with `--size`, you can now also use the mutually exclusive `--chunks` option to specify the number of chunks, and `split` will determine the chunk size.
jqnatividad committed Feb 4, 2024
1 parent ff769b7 commit 0cda137
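To make the rounding rule in the commit message concrete, here is a small standalone Rust sketch (the row count and chunk count below are made-up illustration values, not taken from this commit):

// Illustrative sketch of the --chunks rounding rule: the chunk size is the row
// count divided by the requested number of chunks, rounded up, so only the
// last chunk can come up short.
fn chunk_size(row_count: usize, chunks: usize) -> usize {
    (row_count as f64 / chunks as f64).ceil() as usize
}

fn main() {
    let rows = 1_000; // hypothetical row count
    let chunks = 3;   // as if invoked with --chunks 3
    let size = chunk_size(rows, chunks);
    // size == 334, so with the default '{start}.csv' naming the output would be
    // 0.csv, 334.csv and 668.csv holding 334, 334 and 332 rows respectively.
    println!("chunk size = {size}");
}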
Showing 2 changed files with 303 additions and 8 deletions.
50 changes: 42 additions & 8 deletions src/cmd/split.rs
@@ -1,6 +1,8 @@
static USAGE: &str = r#"
Splits the given CSV data into chunks.
Uses multithreading to go faster if the given CSV data has an index.
The files are written to the directory given with the name '{start}.csv',
where {start} is the index of the first record of the chunk (starting at 0).
@@ -11,12 +13,16 @@ Examples:
qsv split . -s 100 input.csv
cat in.csv | qsv split outdir -s 1000
qsv split outdir --chunks 10 input.csv
qsv split outdir -c 10 -j 4 input.csv
For more examples, see https://github.com/jqnatividad/qsv/blob/master/tests/test_split.rs.
Usage:
qsv split [options] <outdir> [<input>]
qsv split [options] (--size <arg> | --chunks <arg>) <outdir> [<input>]
qsv split --help
split arguments:
@@ -28,6 +34,14 @@ split options:
split options:
-s, --size <arg> The number of records to write into each chunk.
[default: 500]
-c, --chunks <arg> The number of chunks to split the data into.
This option is mutually exclusive with --size.
The number of rows in each chunk is determined by
the number of records in the CSV data and the number
of desired chunks. If the number of records is not evenly
divisible by the number of chunks, the last chunk will
have fewer records.
-j, --jobs <arg> The number of splitting jobs to run in parallel.
This only works when the given CSV data has
an index already created. Note that a file handle
@@ -69,6 +83,7 @@ struct Args {
arg_input: Option<String>,
arg_outdir: String,
flag_size: usize,
flag_chunks: Option<usize>,
flag_jobs: Option<usize>,
flag_filename: FilenameTemplate,
flag_pad: usize,
@@ -101,11 +116,22 @@ impl Args {
let mut rdr = rconfig.reader()?;
let headers = rdr.byte_headers()?.clone();

let chunk_size = if self.flag_chunks.is_some() {
let count = util::count_rows(&rconfig)?;
let chunk = self.flag_chunks.unwrap();
if chunk == 0 {
return fail_incorrectusage_clierror!("--chunks must be greater than 0.");
}
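// ceiling division so every row is covered; only the last chunk can be smaller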
(count as f64 / chunk as f64).ceil() as usize
} else {
self.flag_size
};

let mut wtr = self.new_writer(&headers, 0, self.flag_pad)?;
let mut i = 0;
let mut row = csv::ByteRecord::new();
while rdr.read_byte_record(&mut row)? {
if i > 0 && i % self.flag_size == 0 {
if i > 0 && i % chunk_size == 0 {
wtr.flush()?;
wtr = self.new_writer(&headers, i, self.flag_pad)?;
}
@@ -117,13 +143,21 @@ }
}

fn parallel_split(&self, idx: &Indexed<fs::File, fs::File>) -> CliResult<()> {
let nchunks = util::num_of_chunks(idx.count() as usize, self.flag_size);
let args = self.clone();
let chunk_size;
let nchunks = if let Some(flag_chunks) = args.flag_chunks {
// use ceiling division (as in the sequential path) so remainder rows are not dropped
chunk_size = (idx.count() as f64 / flag_chunks as f64).ceil() as usize;
flag_chunks
} else {
chunk_size = args.flag_size;
util::num_of_chunks(idx.count() as usize, self.flag_size)
};
if nchunks == 1 {
// there's only one chunk, we can just do a sequential split
// which has less overhead and better error handling
return self.sequential_split();
}
let args = self.clone();

util::njobs(args.flag_jobs);

// safety: we cannot use ? here because we're in a closure
@@ -138,13 +172,13 @@

let mut wtr = args
// safety: the only way this can fail is if we cannot create a file
.new_writer(headers, i * args.flag_size, args.flag_pad)
.new_writer(headers, i * chunk_size, args.flag_pad)
.unwrap();

// safety: we know that there is more than one chunk, so we can safely
// seek to the start of the chunk
idx.seek((i * args.flag_size) as u64).unwrap();
for row in idx.byte_records().take(args.flag_size) {
idx.seek((i * chunk_size) as u64).unwrap();
for row in idx.byte_records().take(chunk_size) {
let row = row.unwrap();
wtr.write_byte_record(&row).unwrap();
}
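As a rough illustration of the per-job arithmetic in the indexed parallel path above, here is a self-contained Rust sketch (the helper and variable names are invented for illustration and are not qsv APIs):

// Hypothetical helper: compute the starting record of each chunk, mirroring
// the idx.seek(i * chunk_size) calls in parallel_split above.
fn chunk_starts(row_count: usize, chunk_size: usize) -> Vec<usize> {
    (0..row_count).step_by(chunk_size).collect()
}

fn main() {
    let row_count = 10;
    let chunk_size = 4; // e.g. ceil(10 / 3) when invoked with --chunks 3
    for (job, start) in chunk_starts(row_count, chunk_size).into_iter().enumerate() {
        let rows_in_chunk = chunk_size.min(row_count - start);
        println!("job {job}: seek to record {start}, write {rows_in_chunk} rows to {start}.csv");
    }
}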