Merge pull request #1338 from jqnatividad/tojsonl-rayon
`tojsonl`: parallelize with rayon
jqnatividad authored Oct 1, 2023
2 parents c71cd16 + bde2eb7 commit 1f34466
Showing 2 changed files with 104 additions and 57 deletions.
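
The heart of the change is a batched read-process-write loop: rows are read serially from the CSV into a batch, each batch is transformed in parallel with rayon, and the results are written out serially in their original order. A minimal, self-contained sketch of that pattern under simplified assumptions (plain strings instead of csv::StringRecords; process_row and the row contents are illustrative, not from the commit):

    use rayon::prelude::*;

    // Illustrative stand-in for the per-row JSONL construction in the commit.
    fn process_row(row: &String) -> String {
        format!("{{\"row\":\"{row}\"}}")
    }

    fn main() {
        let rows: Vec<String> = (0..120_000).map(|i| format!("r{i}")).collect();
        let mut source = rows.iter();

        const BATCH_SIZE: usize = 50_000; // the commit's --batch default
        let mut batch: Vec<String> = Vec::with_capacity(BATCH_SIZE);
        let mut batch_results: Vec<String> = Vec::with_capacity(BATCH_SIZE);
        let mut out: Vec<String> = Vec::new();

        loop {
            // 1) fill the batch serially, as the commit does from the CSV reader
            for _ in 0..BATCH_SIZE {
                match source.next() {
                    Some(row) => batch.push(row.clone()),
                    None => break, // nothing left for this batch
                }
            }
            if batch.is_empty() {
                break; // EOF: the last batch came up empty
            }
            // 2) transform the batch in parallel; collect_into_vec reuses the
            //    results buffer and preserves input order
            batch.par_iter().map(process_row).collect_into_vec(&mut batch_results);
            // 3) drain results serially, in order (the commit writes CSV records here)
            out.extend(batch_results.drain(..));
            batch.clear();
        }
        assert_eq!(out.len(), 120_000);
        assert_eq!(out[0], "{\"row\":\"r0\"}");
    }

Reading and writing stay single-threaded; only the CPU-bound per-row conversion fans out across threads.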
2 changes: 1 addition & 1 deletion README.md
@@ -79,7 +79,7 @@
| [stats](/src/cmd/stats.rs#L2)<br>📇🤯🏎️ | Compute [summary statistics](https://en.wikipedia.org/wiki/Summary_statistics) (sum, min/max/range, min/max length, mean, stddev, variance, nullcount, sparsity, quartiles, IQR, lower/upper fences, skewness, median, mode/s, antimode/s & cardinality) & make GUARANTEED data type inferences (Null, String, Float, Integer, Date, DateTime, Boolean) for each column in a CSV.<br>Uses multithreading to go faster if an index is present (with an index, can compile "streaming" stats on NYC's 311 data (15gb, 28m rows) in less than 20 seconds). |
| [table](/src/cmd/table.rs#L2)<br>🤯 | Show aligned output of a CSV using [elastic tabstops](https://github.com/BurntSushi/tabwriter). To interactively view CSV files, qsv pairs well with [csvlens](https://github.com/YS-L/csvlens#csvlens). |
| [to](/src/cmd/to.rs#L2)<br>✨🚀 | Convert CSV files to [PostgreSQL](https://www.postgresql.org), [SQLite](https://www.sqlite.org/index.html), XLSX, [Parquet](https://parquet.apache.org) and [Data Package](https://datahub.io/docs/data-packages/tabular). |
-| [tojsonl](/src/cmd/tojsonl.rs#L3)<br>📇😣🏎️ | Smartly converts CSV to a newline-delimited JSON ([JSONL](https://jsonlines.org/)/[NDJSON](http://ndjson.org/)). By scanning the CSV first, it "smartly" infers the appropriate JSON data type for each column. See `jsonl` command to convert JSONL to CSV. Uses multithreading to go faster if an index is present. |
+| [tojsonl](/src/cmd/tojsonl.rs#L3)<br>📇😣🚀 | Smartly converts CSV to a newline-delimited JSON ([JSONL](https://jsonlines.org/)/[NDJSON](http://ndjson.org/)). By scanning the CSV first, it "smartly" infers the appropriate JSON data type for each column. See `jsonl` command to convert JSONL to CSV. Uses multithreading to go faster if an index is present. |
| [transpose](/src/cmd/transpose.rs#L2)<br>🤯 | Transpose rows/columns of a CSV. |
| [validate](/src/cmd/validate.rs#L2)<br>📇🚀🌐 | Validate CSV data blazingly-fast using [JSON Schema Validation](https://json-schema.org/draft/2020-12/json-schema-validation.html) & put invalid records into a separate file with an accompanying detailed validation error report file (e.g. *up to 350,000 rows/second* using [NYC's 311 schema](https://github.com/jqnatividad/qsv/blob/master/resources/test/311_Service_Requests_from_2010_to_Present-2022-03-04.csv.schema.json) generated by the `schema` command).<br>If no JSON schema file is provided, validates if a CSV conforms to the [RFC 4180 standard](#rfc-4180-csv-standard) and is UTF-8 encoded. |
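
To illustrate the tojsonl row above: given a hypothetical CSV like

    id,name,price,active
    1,Widget,9.99,true
    2,"Gadget, deluxe",19.99,false

the command would emit one JSON object per line, with id and price inferred as numbers, active as a boolean, and name as an escaped string:

    {"id":1,"name":"Widget","price":9.99,"active":true}
    {"id":2,"name":"Gadget, deluxe","price":19.99,"active":false}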

159 changes: 103 additions & 56 deletions src/cmd/tojsonl.rs
@@ -19,6 +19,8 @@ Tojsonl options:
-j, --jobs <arg> The number of jobs to run in parallel.
When not set, the number of jobs is set to the
number of CPUs detected.
+    -b, --batch <size>     The number of rows per batch to load into memory,
+                           before running in parallel. [default: 50000]
Common options:
-h, --help Display this message
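
The new --batch flag complements the existing --jobs flag: --jobs sets how many threads rayon uses, while --batch sets how many rows are buffered before each parallel pass (bigger batches trade memory for fewer parallel dispatches). An illustrative invocation, with hypothetical file names:

    qsv tojsonl data.csv --jobs 8 --batch 100000 --output data.jsonl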
@@ -31,6 +33,10 @@ Common options:

use std::{fmt::Write, path::PathBuf, str::FromStr};

+use rayon::{
+    iter::{IndexedParallelIterator, ParallelIterator},
+    prelude::IntoParallelRefIterator,
+};
use serde::Deserialize;
use serde_json::{Map, Value};
use strum_macros::EnumString;
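
The three rayon imports added above are exactly what the new batch loop needs: IntoParallelRefIterator supplies .par_iter(), ParallelIterator supplies .map(), and IndexedParallelIterator supplies .collect_into_vec(), which fills a reusable Vec with results in the original input order. A tiny standalone sketch of that ordering guarantee (not from the commit):

    use rayon::{
        iter::{IndexedParallelIterator, ParallelIterator},
        prelude::IntoParallelRefIterator,
    };

    fn main() {
        let input: Vec<u32> = (0..1_000).collect();
        let mut output: Vec<u32> = Vec::new();
        // map in parallel; results land in input order, not completion order
        input.par_iter().map(|n| n * 2).collect_into_vec(&mut output);
        assert!(output.iter().enumerate().all(|(i, &v)| v == 2 * i as u32));
    }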
@@ -45,6 +51,7 @@ use crate::{
struct Args {
arg_input: Option<String>,
flag_jobs: Option<usize>,
+    flag_batch: u32,
flag_delimiter: Option<Delimiter>,
flag_output: Option<String>,
flag_memcheck: bool,
@@ -217,65 +224,105 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
);
}

-    // amortize allocs
-    let mut record = csv::StringRecord::new();
-
-    let mut temp_string = String::with_capacity(100);
-    let mut temp_string2 = String::with_capacity(50);
-
-    let mut header_key = Value::String(String::with_capacity(50));
-    let mut temp_val = Value::String(String::with_capacity(50));
-
-    // TODO: see if its worth it to do rayon here after benchmarking
-    // with large files. We have --jobs option, but we only pass it
-    // thru to stats/frequency to infer data types & enum constraints.
-
-    // now that we have type mappings, iterate thru input csv
-    // and write jsonl file
-    while rdr.read_record(&mut record)? {
-        temp_string.clear();
-        record.trim();
-        write!(temp_string, "{{")?;
-        for (idx, field) in record.iter().enumerate() {
-            let field_val = if let Some(field_type) = field_type_vec.get(idx) {
-                match field_type {
-                    JsonlType::String => {
-                        if field.is_empty() {
-                            "null"
-                        } else {
-                            // we round-trip thru serde_json to escape the str
-                            // per json spec (https://www.json.org/json-en.html)
-                            temp_val = field.into();
-                            temp_string2 = temp_val.to_string();
-                            &temp_string2
-                        }
-                    },
-                    JsonlType::Null => "null",
-                    JsonlType::Integer | JsonlType::Number => field,
-                    JsonlType::Boolean => {
-                        if let 't' | 'y' | '1' = boolcheck_first_lower_char(field) {
-                            "true"
-                        } else {
-                            "false"
-                        }
-                    },
-                }
-            } else {
-                "null"
-            };
-            header_key = headers[idx].into();
-            if field_val.is_empty() {
-                write!(temp_string, r#"{header_key}:null,"#)?;
-            } else {
-                write!(temp_string, r#"{header_key}:{field_val},"#)?;
-            }
-        }
-        temp_string.pop(); // remove last comma
-        temp_string.push('}');
-        record.clear();
-        record.push_field(&temp_string);
-        wtr.write_record(&record)?;
-    }
+    // amortize memory allocation by reusing record
+    #[allow(unused_assignments)]
+    let mut batch_record = csv::StringRecord::new();
+
+    // reuse batch buffers
+    let batchsize: usize = args.flag_batch as usize;
+    let mut batch = Vec::with_capacity(batchsize);
+    let mut batch_results = Vec::with_capacity(batchsize);
+
+    // set RAYON_NUM_THREADS
+    util::njobs(args.flag_jobs);
+
+    // main loop to read CSV and construct batches for parallel processing.
+    // each batch is processed via Rayon parallel iterator.
+    // loop exits when batch is empty.
+    'batch_loop: loop {
+        for _ in 0..batchsize {
+            match rdr.read_record(&mut batch_record) {
+                Ok(has_data) => {
+                    if has_data {
+                        batch.push(batch_record.clone());
+                    } else {
+                        // nothing else to add to batch
+                        break;
+                    }
+                },
+                Err(e) => {
+                    return fail_clierror!("Error reading file: {e}");
+                },
+            }
+        }
+
+        if batch.is_empty() {
+            // break out of infinite loop when at EOF
+            break 'batch_loop;
+        }
+
+        // process batch in parallel
+        batch
+            .par_iter()
+            .map(|record_item| {
+                let mut record = record_item.clone();
+                let mut temp_string = String::new();
+                let mut temp_string2: String;
+
+                let mut header_key = Value::String(String::new());
+                let mut temp_val = Value::String(String::new());
+
+                record.trim();
+                write!(temp_string, "{{").unwrap();
+                for (idx, field) in record.iter().enumerate() {
+                    let field_val = if let Some(field_type) = field_type_vec.get(idx) {
+                        match field_type {
+                            JsonlType::String => {
+                                if field.is_empty() {
+                                    "null"
+                                } else {
+                                    // we round-trip thru serde_json to escape the str
+                                    // per json spec (https://www.json.org/json-en.html)
+                                    temp_val = field.into();
+                                    temp_string2 = temp_val.to_string();
+                                    &temp_string2
+                                }
+                            },
+                            JsonlType::Null => "null",
+                            JsonlType::Integer | JsonlType::Number => field,
+                            JsonlType::Boolean => {
+                                if let 't' | 'y' | '1' = boolcheck_first_lower_char(field) {
+                                    "true"
+                                } else {
+                                    "false"
+                                }
+                            },
+                        }
+                    } else {
+                        "null"
+                    };
+                    header_key = headers[idx].into();
+                    if field_val.is_empty() {
+                        write!(temp_string, r#"{header_key}:null,"#).unwrap();
+                    } else {
+                        write!(temp_string, r#"{header_key}:{field_val},"#).unwrap();
+                    }
+                }
+                temp_string.pop(); // remove last comma
+                temp_string.push('}');
+                record.clear();
+                record.push_field(&temp_string);
+                record
+            })
+            .collect_into_vec(&mut batch_results);
+
+        // rayon collect() guarantees original order, so we can just append results each batch
+        for result_record in &batch_results {
+            wtr.write_record(result_record)?;
+        }
+
+        batch.clear();
+    } // end of batch loop

Ok(wtr.flush()?)
}
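
Two details of the per-field conversion above are worth unpacking. String fields are escaped by round-tripping through serde_json::Value, so quotes, backslashes, and control characters come out JSON-safe; and boolean detection looks only at the first character of the lowercased field. A sketch of both (boolcheck_first_lower_char's body is not shown in this diff, so the version below is an assumption reconstructed from its call site):

    use serde_json::Value;

    // Assumed shape of qsv's helper, inferred from the call site
    // `if let 't' | 'y' | '1' = boolcheck_first_lower_char(field)`:
    // return the lowercased first character of the field.
    fn boolcheck_first_lower_char(field: &str) -> char {
        field.chars().next().unwrap_or('_').to_ascii_lowercase()
    }

    fn main() {
        // round-trip thru serde_json to escape per the JSON spec:
        // Value::String's Display impl emits a quoted, escaped JSON string
        let raw = r#"say "hi"\now"#;
        let escaped = Value::String(raw.to_string()).to_string();
        assert_eq!(escaped, r#""say \"hi\"\\now""#);

        assert_eq!(boolcheck_first_lower_char("True"), 't');
        assert_eq!(boolcheck_first_lower_char("YES"), 'y');
        assert_eq!(boolcheck_first_lower_char("0"), '0');
    }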