diff --git a/README.md b/README.md index a9005c3d6..4bb157a3a 100644 --- a/README.md +++ b/README.md @@ -84,7 +84,7 @@ | [sqlp](/src/cmd/sqlp.rs#L2)
✨🚀🐻‍❄️🗄️🪄 | Run [Polars](https://pola.rs) SQL queries against several CSVs - converting queries to blazing-fast [LazyFrame](https://docs.pola.rs/user-guide/lazy/using/) expressions, processing larger than memory CSV files. Query results can be saved in CSV, JSON, JSONL, Parquet, Apache Arrow IPC and Apache Avro formats. | | [stats](/src/cmd/stats.rs#L2)
📇🤯🏎️👆🪄 | Compute [summary statistics](https://en.wikipedia.org/wiki/Summary_statistics) (sum, min/max/range, sort order, min/max/sum/avg length, mean, standard error of the mean (SEM), stddev, variance, Coefficient of Variation (CV), nullcount, max precision, sparsity, quartiles, Interquartile Range (IQR), lower/upper fences, skewness, median, mode/s, antimode/s & cardinality) & make GUARANTEED data type inferences (Null, String, Float, Integer, Date, DateTime, Boolean) for each column in a CSV ([more info](https://github.com/jqnatividad/qsv/wiki/Supplemental#stats-command-output-explanation)).
Uses multithreading to go faster if an index is present (with an index, can compile "streaming" stats on NYC's 311 data (15gb, 28m rows) in less than 7.3 seconds!). | | [table](/src/cmd/table.rs#L2)
🤯 | Show aligned output of a CSV using [elastic tabstops](https://github.com/BurntSushi/tabwriter). To interactively view a CSV, use the `lens` command. | -| [template](/src/cmd/template.rs#L2)
| Renders a template using CSV data with the [MiniJinja](https://docs.rs/minijinja/latest/minijinja/) template engine. | +| [template](/src/cmd/template.rs#L2)
🚀 | Renders a template using CSV data with the [MiniJinja](https://docs.rs/minijinja/latest/minijinja/) template engine. | | [to](/src/cmd/to.rs#L2)
✨🚀🗄️ | Convert CSV files to [PostgreSQL](https://www.postgresql.org), [SQLite](https://www.sqlite.org/index.html), XLSX and [Data Package](https://datahub.io/docs/data-packages/tabular). | | [tojsonl](/src/cmd/tojsonl.rs#L3)
📇😣🚀🔣🪄 | Smartly converts CSV to a newline-delimited JSON ([JSONL](https://jsonlines.org/)/[NDJSON](http://ndjson.org/)). By scanning the CSV first, it "smartly" infers the appropriate JSON data type for each column. See `jsonl` command to convert JSONL to CSV. | | [transpose](/src/cmd/transpose.rs#L2)
🤯 | Transpose rows/columns of a CSV. | diff --git a/src/cmd/template.rs b/src/cmd/template.rs index 6e647ed0f..47b94b3d7 100644 --- a/src/cmd/template.rs +++ b/src/cmd/template.rs @@ -33,9 +33,14 @@ template options: Note that the QSV_ROWNO variable is also available in the context if you want to use it in the filename template. [default: QSV_ROWNO] - --customfilter-error The value to return when a custom filter returns an error. + --customfilter-error The value to return when a custom filter returns an error. Use "" to return an empty string. [default: ] + -j, --jobs The number of jobs to run in parallel. + When not set, the number of jobs is set to the number of CPUs detected. + -b, --batch The number of rows per batch to load into memory, before running in parallel. + Set to 0 to load all rows in one batch. + [default: 50000] Common options: -h, --help Display this message @@ -53,6 +58,10 @@ use std::{ }; use minijinja::Environment; +use rayon::{ + iter::{IndexedParallelIterator, ParallelIterator}, + prelude::IntoParallelRefIterator, +}; use serde::Deserialize; use crate::{ @@ -71,6 +80,8 @@ struct Args { flag_output: Option, flag_outfilename: String, flag_customfilter_error: String, + flag_jobs: Option, + flag_batch: usize, flag_delimiter: Option, flag_no_headers: bool, } @@ -90,7 +101,11 @@ pub fn run(argv: &[&str]) -> CliResult<()> { let template_content = match (args.flag_template_file, args.flag_template) { (Some(path), None) => fs::read_to_string(path)?, (None, Some(template)) => template, - _ => return fail_clierror!("Must provide either --template or --template-string"), + _ => { + return fail_incorrectusage_clierror!( + "Must provide either --template or --template-file" + ) + }, }; // Initialize FILTER_ERROR from args.flag_customfilter_error @@ -126,13 +141,14 @@ pub fn run(argv: &[&str]) -> CliResult<()> { let rconfig = Config::new(args.arg_input.as_ref()) .delimiter(args.flag_delimiter) .no_headers(args.flag_no_headers); - let mut rdr = rconfig.reader()?; + + // read headers let headers = if args.flag_no_headers { csv::StringRecord::new() } else { let headers = rdr.headers()?.clone(); - let sanitized_headers: Vec = headers + let mut sanitized_headers: Vec = headers .iter() .map(|h| { h.chars() @@ -140,12 +156,14 @@ pub fn run(argv: &[&str]) -> CliResult<()> { .collect() }) .collect(); + // add a column named QSV_ROWNO at the end + sanitized_headers.push(QSV_ROWNO.to_owned()); csv::StringRecord::from(sanitized_headers) }; // Set up output handling let output_to_dir = args.arg_outdir.is_some(); - let mut row_number = 0_u64; + let mut row_no = 0_u64; let mut rowcount = 0; // Create filename environment once if needed @@ -158,6 +176,7 @@ pub fn run(argv: &[&str]) -> CliResult<()> { None }; // Get width of rowcount for padding leading zeroes + // when rendering --outfilename let width = rowcount.to_string().len(); if output_to_dir { @@ -179,69 +198,126 @@ pub fn run(argv: &[&str]) -> CliResult<()> { }) }; - // amortize allocations - let mut curr_record = csv::StringRecord::new(); - #[allow(unused_assignments)] - let mut rendered = String::new(); + let num_jobs = util::njobs(args.flag_jobs); + let batchsize = util::optimal_batch_size(&rconfig, args.flag_batch, num_jobs); + + // reuse batch buffers #[allow(unused_assignments)] - let mut outfilename = String::new(); - let mut context = simd_json::owned::Object::default(); - - // Process each record - for record in rdr.records() { - row_number += 1; - curr_record.clone_from(&record?); - - if args.flag_no_headers { - // Use numeric, column 1-based indices (e.g. _c1, _c2, etc.) - for (i, field) in curr_record.iter().enumerate() { - context.insert( - format!("_c{}", i + 1), - simd_json::OwnedValue::String(field.to_owned()), - ); - } - } else { - // Use header names - for (header, field) in headers.iter().zip(curr_record.iter()) { - context.insert( - header.to_string(), - simd_json::OwnedValue::String(field.to_owned()), - ); + let mut batch_record = csv::StringRecord::new(); + let mut batch = Vec::with_capacity(batchsize); + let mut batch_results = Vec::with_capacity(batchsize); + + let no_headers = args.flag_no_headers; + + // main loop to read CSV and construct batches for parallel processing. + // each batch is processed via Rayon parallel iterator. + // loop exits when batch is empty. + 'batch_loop: loop { + for _ in 0..batchsize { + match rdr.read_record(&mut batch_record) { + Ok(has_data) => { + if has_data { + row_no += 1; + batch_record.push_field(itoa::Buffer::new().format(row_no)); + batch.push(std::mem::take(&mut batch_record)); + } else { + // nothing else to add to batch + break; + } + }, + Err(e) => { + return fail_clierror!("Error reading file: {e}"); + }, } } - // Always add row number to context - context.insert( - QSV_ROWNO.to_string(), - simd_json::OwnedValue::from(row_number), - ); - - // Render template with record data - rendered = template.render(&context)?; - - if output_to_dir { - outfilename = if args.flag_outfilename == QSV_ROWNO { - // Pad row number with required number of leading zeroes - format!("{row_number:0width$}.txt") - } else { - filename_env - .as_ref() - .unwrap() - .get_template("filename")? - .render(&context)? - }; - let outpath = std::path::Path::new(args.arg_outdir.as_ref().unwrap()).join(outfilename); - let mut writer = BufWriter::new(fs::File::create(outpath)?); - write!(writer, "{rendered}")?; - writer.flush()?; - } else if let Some(ref mut w) = wtr { - w.write_all(rendered.as_bytes())?; + + if batch.is_empty() { + // break out of infinite loop when at EOF + break 'batch_loop; } - context.clear(); - } - if let Some(mut w) = wtr { - w.flush()?; - } + // do actual template rendering via Rayon parallel iterator + batch + .par_iter() + .with_min_len(1024) + .map(|record| { + let curr_record = record; + + let mut context = simd_json::owned::Object::default(); + let mut row_number = 0_u64; + + if no_headers { + // Use numeric, column 1-based indices (e.g. _c1, _c2, etc.) + let headers_len = curr_record.len(); + + for (i, field) in curr_record.iter().enumerate() { + if i == headers_len - 1 { + // set the last field to QSV_ROWNO + row_number = atoi_simd::parse::(field.as_bytes()).unwrap(); + context.insert( + QSV_ROWNO.to_owned(), + simd_json::OwnedValue::String(field.to_owned()), + ); + } else { + context.insert( + format!("_c{}", i + 1), + simd_json::OwnedValue::String(field.to_owned()), + ); + } + } + } else { + // Use header names + for (header, field) in headers.iter().zip(curr_record.iter()) { + context.insert( + header.to_string(), + simd_json::OwnedValue::String(field.to_owned()), + ); + // when headers are defined, the last one is QSV_ROWNO + if header == QSV_ROWNO { + row_number = atoi_simd::parse::(field.as_bytes()).unwrap(); + } + } + } + + // Render template with record data + let rendered = template + .render(&context) + .unwrap_or_else(|_| "RENDERING ERROR".to_owned()); + + if output_to_dir { + let outfilename = if args.flag_outfilename == QSV_ROWNO { + // Pad row number with required number of leading zeroes + format!("{row_number:0width$}.txt") + } else { + filename_env + .as_ref() + .unwrap() + .get_template("filename") + .unwrap() + .render(&context) + .unwrap_or_else(|_| "FILENAME RENDERING ERROR".to_owned()) + }; + (outfilename, rendered) + } else { + (String::new(), rendered) + } + }) + .collect_into_vec(&mut batch_results); + + for result_record in &batch_results { + if output_to_dir { + let outpath = std::path::Path::new(args.arg_outdir.as_ref().unwrap()) + .join(result_record.0.clone()); + let mut writer = BufWriter::new(fs::File::create(outpath)?); + write!(writer, "{}", result_record.1)?; + writer.flush()?; + } else if let Some(ref mut w) = wtr { + w.write_all(result_record.1.as_bytes())?; + } + } + + batch.clear(); + } // end batch loop Ok(()) } diff --git a/tests/test_template.rs b/tests/test_template.rs index c3a14c831..3fc1cbeee 100644 --- a/tests/test_template.rs +++ b/tests/test_template.rs @@ -428,3 +428,75 @@ Alice is a minor. Bob is an adult."; assert_eq!(got, expected); } + +#[test] +fn template_render_error() { + let wrk = Workdir::new("template_render_error"); + wrk.create( + "data.csv", + vec![ + svec!["name", "age"], + svec!["Alice", "25"], + svec!["Bob", "30"], + ], + ); + + // Test invalid template syntax with default error message + let mut cmd = wrk.command("template"); + cmd.arg("--template") + .arg("Hello {{name}, invalid syntax!") + .arg("data.csv"); + + wrk.assert_err(&mut *&mut cmd); + let got: String = wrk.output_stderr(&mut cmd); + let expected = + "syntax error: unexpected `}}`, expected end of variable block (in template:1)\n"; + assert_eq!(got, expected); +} + +#[test] +fn template_filter_error() { + let wrk = Workdir::new("template_filter_error"); + wrk.create( + "data.csv", + vec![ + svec!["name", "amount"], + svec!["Alice", "not_a_number"], + svec!["Bob", "123.45"], + ], + ); + + // Test filter error with default error message + let mut cmd = wrk.command("template"); + cmd.arg("--template") + .arg("{{name}}: {{amount|format_float(2)}}\n\n") + .arg("data.csv"); + + let got: String = wrk.stdout(&mut cmd); + let expected = "Alice: \nBob: 123.45"; + assert_eq!(got, expected); + + // Test custom filter error message + let mut cmd = wrk.command("template"); + cmd.arg("--template") + .arg("{{name}}: {{amount|format_float(2)}}\n\n") + .arg("--customfilter-error") + .arg("INVALID NUMBER") + .arg("data.csv"); + + let got: String = wrk.stdout(&mut cmd); + let expected = "Alice: INVALID NUMBER\nBob: 123.45"; + assert_eq!(got, expected); + + // Test empty string as filter error + let mut cmd = wrk.command("template"); + cmd.arg("--template") + .arg("{{name}}: {{amount|format_float(2)}}\n\n") + .arg("--customfilter-error") + .arg("") + .arg("data.csv"); + + let got: String = wrk.stdout(&mut cmd); + let expected = "Alice: \nBob: 123.45"; + assert_eq!(got, expected); +}