Skip to content

Commit

Permalink
Merge pull request #2273 from jqnatividad/parallelized_template
Browse files Browse the repository at this point in the history
Parallelized template
  • Loading branch information
jqnatividad authored Nov 7, 2024
2 parents 0cfddbb + 309804c commit 36ccc4c
Show file tree
Hide file tree
Showing 3 changed files with 212 additions and 64 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@
| [sqlp](/src/cmd/sqlp.rs#L2)<br>✨🚀🐻‍❄️🗄️🪄 | Run [Polars](https://pola.rs) SQL queries against several CSVs - converting queries to blazing-fast [LazyFrame](https://docs.pola.rs/user-guide/lazy/using/) expressions, processing larger than memory CSV files. Query results can be saved in CSV, JSON, JSONL, Parquet, Apache Arrow IPC and Apache Avro formats. |
| [stats](/src/cmd/stats.rs#L2)<br>📇🤯🏎️👆🪄 | Compute [summary statistics](https://en.wikipedia.org/wiki/Summary_statistics) (sum, min/max/range, sort order, min/max/sum/avg length, mean, standard error of the mean (SEM), stddev, variance, Coefficient of Variation (CV), nullcount, max precision, sparsity, quartiles, Interquartile Range (IQR), lower/upper fences, skewness, median, mode/s, antimode/s & cardinality) & make GUARANTEED data type inferences (Null, String, Float, Integer, Date, DateTime, Boolean) for each column in a CSV ([more info](https://github.com/jqnatividad/qsv/wiki/Supplemental#stats-command-output-explanation)).<br>Uses multithreading to go faster if an index is present (with an index, can compile "streaming" stats on NYC's 311 data (15gb, 28m rows) in less than 7.3 seconds!). |
| [table](/src/cmd/table.rs#L2)<br>🤯 | Show aligned output of a CSV using [elastic tabstops](https://github.com/BurntSushi/tabwriter). To interactively view a CSV, use the `lens` command. |
| [template](/src/cmd/template.rs#L2)<br> | Renders a template using CSV data with the [MiniJinja](https://docs.rs/minijinja/latest/minijinja/) template engine. |
| [template](/src/cmd/template.rs#L2)<br>🚀 | Renders a template using CSV data with the [MiniJinja](https://docs.rs/minijinja/latest/minijinja/) template engine. |
| [to](/src/cmd/to.rs#L2)<br>✨🚀🗄️ | Convert CSV files to [PostgreSQL](https://www.postgresql.org), [SQLite](https://www.sqlite.org/index.html), XLSX and [Data Package](https://datahub.io/docs/data-packages/tabular). |
| [tojsonl](/src/cmd/tojsonl.rs#L3)<br>📇😣🚀🔣🪄 | Smartly converts CSV to a newline-delimited JSON ([JSONL](https://jsonlines.org/)/[NDJSON](http://ndjson.org/)). By scanning the CSV first, it "smartly" infers the appropriate JSON data type for each column. See `jsonl` command to convert JSONL to CSV. |
| [transpose](/src/cmd/transpose.rs#L2)<br>🤯 | Transpose rows/columns of a CSV. |
Expand Down
202 changes: 139 additions & 63 deletions src/cmd/template.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,14 @@ template options:
Note that the QSV_ROWNO variable is also available in the context
if you want to use it in the filename template.
[default: QSV_ROWNO]
--customfilter-error <arg> The value to return when a custom filter returns an error.
--customfilter-error <msg> The value to return when a custom filter returns an error.
Use "<empty string>" to return an empty string.
[default: <FILTER_ERROR>]
-j, --jobs <arg> The number of jobs to run in parallel.
When not set, the number of jobs is set to the number of CPUs detected.
-b, --batch <size> The number of rows per batch to load into memory, before running in parallel.
Set to 0 to load all rows in one batch.
[default: 50000]
Common options:
-h, --help Display this message
Expand All @@ -53,6 +58,10 @@ use std::{
};

use minijinja::Environment;
use rayon::{
iter::{IndexedParallelIterator, ParallelIterator},
prelude::IntoParallelRefIterator,
};
use serde::Deserialize;

use crate::{
Expand All @@ -71,6 +80,8 @@ struct Args {
flag_output: Option<String>,
flag_outfilename: String,
flag_customfilter_error: String,
flag_jobs: Option<usize>,
flag_batch: usize,
flag_delimiter: Option<Delimiter>,
flag_no_headers: bool,
}
Expand All @@ -90,7 +101,11 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
let template_content = match (args.flag_template_file, args.flag_template) {
(Some(path), None) => fs::read_to_string(path)?,
(None, Some(template)) => template,
_ => return fail_clierror!("Must provide either --template or --template-string"),
_ => {
return fail_incorrectusage_clierror!(
"Must provide either --template or --template-file"
)
},
};

// Initialize FILTER_ERROR from args.flag_customfilter_error
Expand Down Expand Up @@ -126,26 +141,29 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
let rconfig = Config::new(args.arg_input.as_ref())
.delimiter(args.flag_delimiter)
.no_headers(args.flag_no_headers);

let mut rdr = rconfig.reader()?;

// read headers
let headers = if args.flag_no_headers {
csv::StringRecord::new()
} else {
let headers = rdr.headers()?.clone();
let sanitized_headers: Vec<String> = headers
let mut sanitized_headers: Vec<String> = headers
.iter()
.map(|h| {
h.chars()
.map(|c| if c.is_alphanumeric() { c } else { '_' })
.collect()
})
.collect();
// add a column named QSV_ROWNO at the end
sanitized_headers.push(QSV_ROWNO.to_owned());
csv::StringRecord::from(sanitized_headers)
};

// Set up output handling
let output_to_dir = args.arg_outdir.is_some();
let mut row_number = 0_u64;
let mut row_no = 0_u64;
let mut rowcount = 0;

// Create filename environment once if needed
Expand All @@ -158,6 +176,7 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
None
};
// Get width of rowcount for padding leading zeroes
// when rendering --outfilename
let width = rowcount.to_string().len();

if output_to_dir {
Expand All @@ -179,69 +198,126 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
})
};

// amortize allocations
let mut curr_record = csv::StringRecord::new();
#[allow(unused_assignments)]
let mut rendered = String::new();
let num_jobs = util::njobs(args.flag_jobs);
let batchsize = util::optimal_batch_size(&rconfig, args.flag_batch, num_jobs);

// reuse batch buffers
#[allow(unused_assignments)]
let mut outfilename = String::new();
let mut context = simd_json::owned::Object::default();

// Process each record
for record in rdr.records() {
row_number += 1;
curr_record.clone_from(&record?);

if args.flag_no_headers {
// Use numeric, column 1-based indices (e.g. _c1, _c2, etc.)
for (i, field) in curr_record.iter().enumerate() {
context.insert(
format!("_c{}", i + 1),
simd_json::OwnedValue::String(field.to_owned()),
);
}
} else {
// Use header names
for (header, field) in headers.iter().zip(curr_record.iter()) {
context.insert(
header.to_string(),
simd_json::OwnedValue::String(field.to_owned()),
);
let mut batch_record = csv::StringRecord::new();
let mut batch = Vec::with_capacity(batchsize);
let mut batch_results = Vec::with_capacity(batchsize);

let no_headers = args.flag_no_headers;

// main loop to read CSV and construct batches for parallel processing.
// each batch is processed via Rayon parallel iterator.
// loop exits when batch is empty.
'batch_loop: loop {
for _ in 0..batchsize {
match rdr.read_record(&mut batch_record) {
Ok(has_data) => {
if has_data {
row_no += 1;
batch_record.push_field(itoa::Buffer::new().format(row_no));
batch.push(std::mem::take(&mut batch_record));
} else {
// nothing else to add to batch
break;
}
},
Err(e) => {
return fail_clierror!("Error reading file: {e}");
},
}
}
// Always add row number to context
context.insert(
QSV_ROWNO.to_string(),
simd_json::OwnedValue::from(row_number),
);

// Render template with record data
rendered = template.render(&context)?;

if output_to_dir {
outfilename = if args.flag_outfilename == QSV_ROWNO {
// Pad row number with required number of leading zeroes
format!("{row_number:0width$}.txt")
} else {
filename_env
.as_ref()
.unwrap()
.get_template("filename")?
.render(&context)?
};
let outpath = std::path::Path::new(args.arg_outdir.as_ref().unwrap()).join(outfilename);
let mut writer = BufWriter::new(fs::File::create(outpath)?);
write!(writer, "{rendered}")?;
writer.flush()?;
} else if let Some(ref mut w) = wtr {
w.write_all(rendered.as_bytes())?;

if batch.is_empty() {
// break out of infinite loop when at EOF
break 'batch_loop;
}
context.clear();
}

if let Some(mut w) = wtr {
w.flush()?;
}
// do actual template rendering via Rayon parallel iterator
batch
.par_iter()
.with_min_len(1024)
.map(|record| {
let curr_record = record;

let mut context = simd_json::owned::Object::default();
let mut row_number = 0_u64;

if no_headers {
// Use numeric, column 1-based indices (e.g. _c1, _c2, etc.)
let headers_len = curr_record.len();

for (i, field) in curr_record.iter().enumerate() {
if i == headers_len - 1 {
// set the last field to QSV_ROWNO
row_number = atoi_simd::parse::<u64>(field.as_bytes()).unwrap();
context.insert(
QSV_ROWNO.to_owned(),
simd_json::OwnedValue::String(field.to_owned()),
);
} else {
context.insert(
format!("_c{}", i + 1),
simd_json::OwnedValue::String(field.to_owned()),
);
}
}
} else {
// Use header names
for (header, field) in headers.iter().zip(curr_record.iter()) {
context.insert(
header.to_string(),
simd_json::OwnedValue::String(field.to_owned()),
);
// when headers are defined, the last one is QSV_ROWNO
if header == QSV_ROWNO {
row_number = atoi_simd::parse::<u64>(field.as_bytes()).unwrap();
}
}
}

// Render template with record data
let rendered = template
.render(&context)
.unwrap_or_else(|_| "RENDERING ERROR".to_owned());

if output_to_dir {
let outfilename = if args.flag_outfilename == QSV_ROWNO {
// Pad row number with required number of leading zeroes
format!("{row_number:0width$}.txt")
} else {
filename_env
.as_ref()
.unwrap()
.get_template("filename")
.unwrap()
.render(&context)
.unwrap_or_else(|_| "FILENAME RENDERING ERROR".to_owned())
};
(outfilename, rendered)
} else {
(String::new(), rendered)
}
})
.collect_into_vec(&mut batch_results);

for result_record in &batch_results {
if output_to_dir {
let outpath = std::path::Path::new(args.arg_outdir.as_ref().unwrap())
.join(result_record.0.clone());
let mut writer = BufWriter::new(fs::File::create(outpath)?);
write!(writer, "{}", result_record.1)?;
writer.flush()?;
} else if let Some(ref mut w) = wtr {
w.write_all(result_record.1.as_bytes())?;
}
}

batch.clear();
} // end batch loop

Ok(())
}
Expand Down
72 changes: 72 additions & 0 deletions tests/test_template.rs
Original file line number Diff line number Diff line change
Expand Up @@ -428,3 +428,75 @@ Alice is a minor.
Bob is an adult.";
assert_eq!(got, expected);
}

#[test]
fn template_render_error() {
    // Verify that an invalid MiniJinja template aborts the command and
    // surfaces the engine's syntax error on stderr.
    let wrk = Workdir::new("template_render_error");
    wrk.create(
        "data.csv",
        vec![
            svec!["name", "age"],
            svec!["Alice", "25"],
            svec!["Bob", "30"],
        ],
    );

    // `{{name}` is missing a closing brace, so the template fails to parse.
    let mut cmd = wrk.command("template");
    cmd.arg("--template")
        .arg("Hello {{name}, invalid syntax!")
        .arg("data.csv");

    // fix: `&mut *&mut cmd` was a redundant reborrow — plain `&mut cmd` suffices
    wrk.assert_err(&mut cmd);
    let got: String = wrk.output_stderr(&mut cmd);
    let expected =
        "syntax error: unexpected `}}`, expected end of variable block (in template:1)\n";
    assert_eq!(got, expected);
}

#[test]
fn template_filter_error() {
    // Exercise the --customfilter-error flag: a failing filter
    // (format_float on a non-numeric value) should render the configured
    // placeholder instead of aborting the run.
    let wrk = Workdir::new("template_filter_error");
    wrk.create(
        "data.csv",
        vec![
            svec!["name", "amount"],
            svec!["Alice", "not_a_number"],
            svec!["Bob", "123.45"],
        ],
    );

    // Run the template command, optionally with a custom filter-error
    // value, and return captured stdout.
    let run_case = |custom_error: Option<&str>| -> String {
        let mut cmd = wrk.command("template");
        cmd.arg("--template")
            .arg("{{name}}: {{amount|format_float(2)}}\n\n");
        if let Some(msg) = custom_error {
            cmd.arg("--customfilter-error").arg(msg);
        }
        cmd.arg("data.csv");
        wrk.stdout(&mut cmd)
    };

    // Default placeholder when the filter errors out
    assert_eq!(run_case(None), "Alice: <FILTER_ERROR>\nBob: 123.45");

    // User-supplied placeholder
    assert_eq!(
        run_case(Some("INVALID NUMBER")),
        "Alice: INVALID NUMBER\nBob: 123.45"
    );

    // "<empty string>" sentinel maps to an empty placeholder
    assert_eq!(run_case(Some("<empty string>")), "Alice: \nBob: 123.45");
}

0 comments on commit 36ccc4c

Please sign in to comment.