tojsonl: parallelize with rayon #1338

Merged · 1 commit · Oct 1, 2023
2 changes: 1 addition & 1 deletion README.md
@@ -79,7 +79,7 @@
| [stats](/src/cmd/stats.rs#L2)<br>📇🤯🏎️ | Compute [summary statistics](https://en.wikipedia.org/wiki/Summary_statistics) (sum, min/max/range, min/max length, mean, stddev, variance, nullcount, sparsity, quartiles, IQR, lower/upper fences, skewness, median, mode/s, antimode/s & cardinality) & make GUARANTEED data type inferences (Null, String, Float, Integer, Date, DateTime, Boolean) for each column in a CSV.<br>Uses multithreading to go faster if an index is present (with an index, can compile "streaming" stats on NYC's 311 data (15gb, 28m rows) in less than 20 seconds). |
| [table](/src/cmd/table.rs#L2)<br>🤯 | Show aligned output of a CSV using [elastic tabstops](https://github.com/BurntSushi/tabwriter). To interactively view CSV files, qsv pairs well with [csvlens](https://github.com/YS-L/csvlens#csvlens). |
| [to](/src/cmd/to.rs#L2)<br>✨🚀 | Convert CSV files to [PostgreSQL](https://www.postgresql.org), [SQLite](https://www.sqlite.org/index.html), XLSX, [Parquet](https://parquet.apache.org) and [Data Package](https://datahub.io/docs/data-packages/tabular). |
| [tojsonl](/src/cmd/tojsonl.rs#L3)<br>📇😣🏎️ | Smartly converts CSV to a newline-delimited JSON ([JSONL](https://jsonlines.org/)/[NDJSON](http://ndjson.org/)). By scanning the CSV first, it "smartly" infers the appropriate JSON data type for each column. See `jsonl` command to convert JSONL to CSV. Uses multithreading to go faster if an index is present. |
| [tojsonl](/src/cmd/tojsonl.rs#L3)<br>📇😣🚀 | Smartly converts CSV to a newline-delimited JSON ([JSONL](https://jsonlines.org/)/[NDJSON](http://ndjson.org/)). By scanning the CSV first, it "smartly" infers the appropriate JSON data type for each column. See `jsonl` command to convert JSONL to CSV. Uses multithreading to go faster if an index is present. |
| [transpose](/src/cmd/transpose.rs#L2)<br>🤯 | Transpose rows/columns of a CSV. |
| [validate](/src/cmd/validate.rs#L2)<br>📇🚀🌐 | Validate CSV data blazingly-fast using [JSON Schema Validation](https://json-schema.org/draft/2020-12/json-schema-validation.html) & put invalid records into a separate file with an accompanying detailed validation error report file (e.g. *up to 350,000 rows/second* using [NYC's 311 schema](https://github.com/jqnatividad/qsv/blob/master/resources/test/311_Service_Requests_from_2010_to_Present-2022-03-04.csv.schema.json) generated by the `schema` command).<br>If no JSON schema file is provided, validates if a CSV conforms to the [RFC 4180 standard](#rfc-4180-csv-standard) and is UTF-8 encoded. |
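As a quick illustration of the "smart" inference described in the tojsonl row above (hypothetical data, and assuming the stats/frequency pass infers the third column as Boolean): a CSV with header `name,age,active` and row `Alice,42,yes` would be emitted as the JSONL line `{"name":"Alice","age":42,"active":true}` — strings are quoted and JSON-escaped, integers stay bare, boolean-ish values map to `true`/`false`, and empty fields become `null`.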

159 changes: 103 additions & 56 deletions src/cmd/tojsonl.rs
@@ -19,6 +19,8 @@ Tojsonl options:
-j, --jobs <arg> The number of jobs to run in parallel.
When not set, the number of jobs is set to the
number of CPUs detected.
-b, --batch <size> The number of rows per batch to load into memory,
before running in parallel. [default: 50000]

Common options:
-h, --help Display this message
@@ -31,6 +33,10 @@ Common options:

use std::{fmt::Write, path::PathBuf, str::FromStr};

use rayon::{
iter::{IndexedParallelIterator, ParallelIterator},
prelude::IntoParallelRefIterator,
};
use serde::Deserialize;
use serde_json::{Map, Value};
use strum_macros::EnumString;
@@ -45,6 +51,7 @@ use crate::{
struct Args {
arg_input: Option<String>,
flag_jobs: Option<usize>,
flag_batch: u32,
flag_delimiter: Option<Delimiter>,
flag_output: Option<String>,
flag_memcheck: bool,
@@ -217,65 +224,105 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
);
}

// amortize allocs
let mut record = csv::StringRecord::new();

let mut temp_string = String::with_capacity(100);
let mut temp_string2 = String::with_capacity(50);

let mut header_key = Value::String(String::with_capacity(50));
let mut temp_val = Value::String(String::with_capacity(50));

    // TODO: see if it's worth it to do rayon here after benchmarking
// with large files. We have --jobs option, but we only pass it
// thru to stats/frequency to infer data types & enum constraints.

// now that we have type mappings, iterate thru input csv
// and write jsonl file
while rdr.read_record(&mut record)? {
temp_string.clear();
record.trim();
write!(temp_string, "{{")?;
for (idx, field) in record.iter().enumerate() {
let field_val = if let Some(field_type) = field_type_vec.get(idx) {
match field_type {
JsonlType::String => {
if field.is_empty() {
"null"
} else {
// we round-trip thru serde_json to escape the str
// per json spec (https://www.json.org/json-en.html)
temp_val = field.into();
temp_string2 = temp_val.to_string();
&temp_string2
}
},
JsonlType::Null => "null",
JsonlType::Integer | JsonlType::Number => field,
JsonlType::Boolean => {
if let 't' | 'y' | '1' = boolcheck_first_lower_char(field) {
"true"
} else {
"false"
// amortize memory allocation by reusing record
#[allow(unused_assignments)]
let mut batch_record = csv::StringRecord::new();

// reuse batch buffers
let batchsize: usize = args.flag_batch as usize;
let mut batch = Vec::with_capacity(batchsize);
let mut batch_results = Vec::with_capacity(batchsize);

// set RAYON_NUM_THREADS
util::njobs(args.flag_jobs);

// main loop to read CSV and construct batches for parallel processing.
// each batch is processed via Rayon parallel iterator.
// loop exits when batch is empty.
'batch_loop: loop {
for _ in 0..batchsize {
match rdr.read_record(&mut batch_record) {
Ok(has_data) => {
if has_data {
batch.push(batch_record.clone());
} else {
// nothing else to add to batch
break;
}
},
Err(e) => {
return fail_clierror!("Error reading file: {e}");
},
}
}

if batch.is_empty() {
// break out of infinite loop when at EOF
break 'batch_loop;
}

// process batch in parallel
batch
.par_iter()
.map(|record_item| {
let mut record = record_item.clone();
let mut temp_string = String::new();
let mut temp_string2: String;

let mut header_key = Value::String(String::new());
let mut temp_val = Value::String(String::new());

record.trim();
write!(temp_string, "{{").unwrap();
for (idx, field) in record.iter().enumerate() {
let field_val = if let Some(field_type) = field_type_vec.get(idx) {
match field_type {
JsonlType::String => {
if field.is_empty() {
"null"
} else {
// we round-trip thru serde_json to escape the str
// per json spec (https://www.json.org/json-en.html)
temp_val = field.into();
temp_string2 = temp_val.to_string();
&temp_string2
}
},
JsonlType::Null => "null",
JsonlType::Integer | JsonlType::Number => field,
JsonlType::Boolean => {
if let 't' | 'y' | '1' = boolcheck_first_lower_char(field) {
"true"
} else {
"false"
}
},
}
},
} else {
"null"
};
header_key = headers[idx].into();
if field_val.is_empty() {
write!(temp_string, r#"{header_key}:null,"#).unwrap();
} else {
write!(temp_string, r#"{header_key}:{field_val},"#).unwrap();
}
}
} else {
"null"
};
header_key = headers[idx].into();
if field_val.is_empty() {
write!(temp_string, r#"{header_key}:null,"#)?;
} else {
write!(temp_string, r#"{header_key}:{field_val},"#)?;
}
temp_string.pop(); // remove last comma
temp_string.push('}');
record.clear();
record.push_field(&temp_string);
record
})
.collect_into_vec(&mut batch_results);

        // rayon collect() guarantees original order, so we can just append each batch's results
for result_record in &batch_results {
wtr.write_record(result_record)?;
}
temp_string.pop(); // remove last comma
temp_string.push('}');
record.clear();
record.push_field(&temp_string);
wtr.write_record(&record)?;
}

batch.clear();
} // end of batch loop

Ok(wtr.flush()?)
}
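
To make the new control flow easier to follow outside of diff order, below is a minimal, self-contained sketch of the same batch-then-parallelize pattern (assuming rayon and serde_json as dependencies): fill a batch of rows, map each row to a JSONL line in parallel with `par_iter()`, collect in input order with `collect_into_vec()`, then write sequentially. The `rows`, `headers`, and data here are hypothetical placeholders, not qsv's code — the real implementation above also trims records, uses the per-column types inferred up front, and reuses buffers.

```rust
use std::fmt::Write;

use rayon::prelude::*;

fn main() {
    // Stand-in for rows pulled from a CSV reader; in qsv these are
    // csv::StringRecords accumulated up to --batch rows at a time.
    let rows: Vec<Vec<String>> = vec![
        vec!["Alice".into(), "42".into()],
        vec!["Bob".into(), String::new()],
    ];
    let headers = ["name", "age"];

    let batchsize = 50_000; // mirrors the --batch default
    let mut batch_results: Vec<String> = Vec::with_capacity(batchsize);

    for batch in rows.chunks(batchsize) {
        // par_iter() yields an indexed parallel iterator, and
        // collect_into_vec() preserves input order, so output lines
        // stay aligned with input rows.
        batch
            .par_iter()
            .map(|row| {
                let mut line = String::from("{");
                for (idx, field) in row.iter().enumerate() {
                    // Round-trip through serde_json::Value so keys and string
                    // values come out quoted and JSON-escaped per the spec.
                    let key = serde_json::Value::String(headers[idx].to_string());
                    let val = if field.is_empty() {
                        "null".to_string()
                    } else {
                        serde_json::Value::String(field.clone()).to_string()
                    };
                    // Simplified: the real code also emits bare integers/numbers
                    // and true/false for columns inferred as those types.
                    write!(line, "{key}:{val},").unwrap();
                }
                line.pop(); // drop the trailing comma
                line.push('}');
                line
            })
            .collect_into_vec(&mut batch_results);

        // Write on the main thread so output stays deterministic.
        for line in &batch_results {
            println!("{line}");
        }
    }
}
```

The `--batch` size is the memory/parallelism knob in this design: each batch (plus a same-length results vector) is held in memory while rayon fans the map out across the thread pool (`util::njobs` maps `--jobs` to `RAYON_NUM_THREADS`, per the comment in the diff), so a larger batch gives the pool more work per dispatch at the cost of a bigger resident set.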