diff --git a/src/cmd/split.rs b/src/cmd/split.rs index d945aa644..5cf6befd3 100644 --- a/src/cmd/split.rs +++ b/src/cmd/split.rs @@ -1,6 +1,8 @@ static USAGE: &str = r#" Splits the given CSV data into chunks. +Uses multithreading to go faster if the given CSV data has an index. + The files are written to the directory given with the name '{start}.csv', where {start} is the index of the first record of the chunk (starting at 0). @@ -11,12 +13,16 @@ Examples: qsv split . -s 100 input.csv - cat in.csv | qsv split outdir -s 1000 + cat in.csv | qsv split outdir -s 1000 + + qsv split outdir --chunks 10 input.csv + + qsv split outdir -c 10 -j 4 input.csv For more examples, see https://github.com/jqnatividad/qsv/blob/master/tests/test_split.rs. Usage: - qsv split [options] <outdir> [<input>] + qsv split [options] (--size <arg> | --chunks <arg>) <outdir> [<input>] qsv split --help split arguments: @@ -28,6 +34,14 @@ split arguments: split options: -s, --size <arg> The number of records to write into each chunk. [default: 500] + -c, --chunks <arg> The number of chunks to split the data into. + This option is mutually exclusive with --size. + The number of rows in each chunk is determined by + the number of records in the CSV data and the number + of desired chunks. If the number of records is not evenly + divisible by the number of chunks, the last chunk will + have fewer records. + -j, --jobs <arg> The number of splitting jobs to run in parallel. This only works when the given CSV data has an index already created. 
Note that a file handle @@ -69,6 +83,7 @@ struct Args { arg_input: Option<String>, arg_outdir: String, flag_size: usize, + flag_chunks: Option<usize>, flag_jobs: Option<usize>, flag_filename: FilenameTemplate, flag_pad: usize, @@ -101,11 +116,22 @@ impl Args { let mut rdr = rconfig.reader()?; let headers = rdr.byte_headers()?.clone(); + let chunk_size = if self.flag_chunks.is_some() { + let count = util::count_rows(&rconfig)?; + let chunk = self.flag_chunks.unwrap(); + if chunk == 0 { + return fail_incorrectusage_clierror!("--chunks must be greater than 0."); + } + (count as f64 / chunk as f64).ceil() as usize + } else { + self.flag_size + }; + let mut wtr = self.new_writer(&headers, 0, self.flag_pad)?; let mut i = 0; let mut row = csv::ByteRecord::new(); while rdr.read_byte_record(&mut row)? { - if i > 0 && i % self.flag_size == 0 { + if i > 0 && i % chunk_size == 0 { wtr.flush()?; wtr = self.new_writer(&headers, i, self.flag_pad)?; } @@ -117,13 +143,21 @@ impl Args { } fn parallel_split(&self, idx: &Indexed<fs::File, fs::File>) -> CliResult<()> { - let nchunks = util::num_of_chunks(idx.count() as usize, self.flag_size); + let args = self.clone(); + let chunk_size; + let nchunks = if let Some(flag_chunks) = args.flag_chunks { + chunk_size = (idx.count() as usize).div_ceil(flag_chunks.max(1)); + util::num_of_chunks(idx.count() as usize, chunk_size) + } else { + chunk_size = args.flag_size; + util::num_of_chunks(idx.count() as usize, self.flag_size) + }; if nchunks == 1 { // there's only one chunk, we can just do a sequential split // which has less overhead and better error handling return self.sequential_split(); } - let args = self.clone(); + util::njobs(args.flag_jobs); // safety: we cannot use ? 
here because we're in a closure @@ -138,13 +172,13 @@ impl Args { let mut wtr = args // safety: the only way this can fail is if we cannot create a file - .new_writer(headers, i * args.flag_size, args.flag_pad) + .new_writer(headers, i * chunk_size, args.flag_pad) .unwrap(); // safety: we know that there is more than one chunk, so we can safely // seek to the start of the chunk - idx.seek((i * args.flag_size) as u64).unwrap(); - for row in idx.byte_records().take(args.flag_size) { + idx.seek((i * chunk_size) as u64).unwrap(); + for row in idx.byte_records().take(chunk_size) { let row = row.unwrap(); wtr.write_byte_record(&row).unwrap(); } diff --git a/tests/test_split.rs b/tests/test_split.rs index deaf73e46..b0d21f55b 100644 --- a/tests/test_split.rs +++ b/tests/test_split.rs @@ -77,6 +77,47 @@ k,l assert!(!wrk.path("6.csv").exists()); } +#[test] +fn split_chunks() { + let wrk = Workdir::new("split_chunks"); + wrk.create("in.csv", data(true)); + + let mut cmd = wrk.command("split"); + cmd.args(["--chunks", "3"]) + .arg(&wrk.path(".")) + .arg("in.csv"); + wrk.run(&mut cmd); + + split_eq!( + wrk, + "0.csv", + "\ +h1,h2 +a,b +c,d +" + ); + split_eq!( + wrk, + "2.csv", + "\ +h1,h2 +e,f +g,h +" + ); + split_eq!( + wrk, + "4.csv", + "\ +h1,h2 +i,j +k,l +" + ); + assert!(!wrk.path("6.csv").exists()); +} + #[test] fn split_a_lot() { let wrk = Workdir::new("split_a_lot"); @@ -174,6 +215,49 @@ k,l assert!(!wrk.path("0006.csv").exists()); } +#[test] +fn split_chunks_padding() { + let wrk = Workdir::new("split_chunks_padding"); + wrk.create("in.csv", data(true)); + + let mut cmd = wrk.command("split"); + cmd.args(["--chunks", "3"]) + .arg("--pad") + .arg("4") + .arg(&wrk.path(".")) + .arg("in.csv"); + wrk.run(&mut cmd); + + split_eq!( + wrk, + "0000.csv", + "\ +h1,h2 +a,b +c,d +" + ); + split_eq!( + wrk, + "0002.csv", + "\ +h1,h2 +e,f +g,h +" + ); + split_eq!( + wrk, + "0004.csv", + "\ +h1,h2 +i,j +k,l +" + ); + assert!(!wrk.path("0006.csv").exists()); +} + #[test] fn 
split_idx() { let wrk = Workdir::new("split_idx"); @@ -213,6 +297,47 @@ k,l assert!(!wrk.path("6.csv").exists()); } +#[test] +fn split_chunks_idx() { + let wrk = Workdir::new("split_chunks_idx"); + wrk.create_indexed("in.csv", data(true)); + + let mut cmd = wrk.command("split"); + cmd.args(["--chunks", "3"]) + .arg(&wrk.path(".")) + .arg("in.csv"); + wrk.run(&mut cmd); + + split_eq!( + wrk, + "0.csv", + "\ +h1,h2 +a,b +c,d +" + ); + split_eq!( + wrk, + "2.csv", + "\ +h1,h2 +e,f +g,h +" + ); + split_eq!( + wrk, + "4.csv", + "\ +h1,h2 +i,j +k,l +" + ); + assert!(!wrk.path("6.csv").exists()); +} + #[test] fn split_no_headers() { let wrk = Workdir::new("split_no_headers"); @@ -250,6 +375,43 @@ k,l ); } +#[test] +fn split_chunks_no_headers() { + let wrk = Workdir::new("split_chunks_no_headers"); + wrk.create("in.csv", data(false)); + + let mut cmd = wrk.command("split"); + cmd.args(["--no-headers", "--chunks", "3"]) + .arg(&wrk.path(".")) + .arg("in.csv"); + wrk.run(&mut cmd); + + split_eq!( + wrk, + "0.csv", + "\ +a,b +c,d +" + ); + split_eq!( + wrk, + "2.csv", + "\ +e,f +g,h +" + ); + split_eq!( + wrk, + "4.csv", + "\ +i,j +k,l +" + ); +} + #[test] fn split_no_headers_idx() { let wrk = Workdir::new("split_no_headers_idx"); @@ -287,6 +449,43 @@ k,l ); } +#[test] +fn split_chunks_no_headers_idx() { + let wrk = Workdir::new("split_chunks_no_headers_idx"); + wrk.create_indexed("in.csv", data(false)); + + let mut cmd = wrk.command("split"); + cmd.args(["--no-headers", "--chunks", "3"]) + .arg(&wrk.path(".")) + .arg("in.csv"); + wrk.run(&mut cmd); + + split_eq!( + wrk, + "0.csv", + "\ +a,b +c,d +" + ); + split_eq!( + wrk, + "2.csv", + "\ +e,f +g,h +" + ); + split_eq!( + wrk, + "4.csv", + "\ +i,j +k,l +" + ); +} + #[test] fn split_one() { let wrk = Workdir::new("split_one"); @@ -436,6 +635,68 @@ k,l ); } +#[test] +fn split_chunks_a_lot() { + let wrk = Workdir::new("split_chunks_a_lot"); + wrk.create("in.csv", data(true)); + + let mut cmd = wrk.command("split"); + 
cmd.args(["--chunks", "10"]) + .arg(&wrk.path(".")) + .arg("in.csv"); + wrk.run(&mut cmd); + + split_eq!( + wrk, + "0.csv", + "\ +h1,h2 +a,b +" + ); + split_eq!( + wrk, + "1.csv", + "\ +h1,h2 +c,d +" + ); + split_eq!( + wrk, + "2.csv", + "\ +h1,h2 +e,f +" + ); + split_eq!( + wrk, + "3.csv", + "\ +h1,h2 +g,h +" + ); + split_eq!( + wrk, + "4.csv", + "\ +h1,h2 +i,j +" + ); + split_eq!( + wrk, + "5.csv", + "\ +h1,h2 +k,l +" + ); + assert!(!wrk.path("6.csv").exists()); +} + #[test] fn split_uneven_idx() { let wrk = Workdir::new("split_uneven_idx");