Skip to content

Commit

Permalink
validate: expose --batch size as an option
Browse files Browse the repository at this point in the history
- also tweaked performance of to_json_instance fn which is called repeatedly in a hot loop
to match on u8 (i.e. first char) rather than str which should be a tad faster.
  • Loading branch information
jqnatividad committed Dec 22, 2022
1 parent 392fd67 commit 5657b15
Showing 1 changed file with 30 additions and 25 deletions.
55 changes: 30 additions & 25 deletions src/cmd/validate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ Example output files from `mydata.csv`. If piped from stdin, then filename is `s
JSON Schema can be a local file or a URL.
Returns exitcode 0 when the CSV file is valid, exitcode 1 otherwise.
Returns exitcode 0 when the CSV file is valid, exitcode > 0 otherwise.
If all records are valid, no output files are produced.
For examples, see https://github.com/jqnatividad/qsv/blob/master/tests/test_validate.rs.
Expand All @@ -29,7 +29,9 @@ Validate options:
-j, --jobs <arg> The number of jobs to run in parallel.
When not set, the number of jobs is set to the
number of CPUs detected.
-b, --batch <size> The number of rows per batch to load into memory,
before running in parallel.
[default: 50000]
Common options:
-h, --help Display this message
Expand Down Expand Up @@ -66,9 +68,6 @@ use crate::{
util, CliResult,
};

// number of CSV rows to process in a batch
const BATCH_SIZE: usize = 24_000;

// to save on repeated init/allocs
static NULL_TYPE: once_cell::sync::OnceCell<Value> = OnceCell::new();

Expand All @@ -81,6 +80,7 @@ struct Args {
flag_json: bool,
flag_pretty_json: bool,
flag_jobs: Option<usize>,
flag_batch: u32,
flag_no_headers: bool,
flag_delimiter: Option<Delimiter>,
flag_progressbar: bool,
Expand Down Expand Up @@ -264,21 +264,23 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
// amortize memory allocation by reusing record
let mut record = csv::ByteRecord::new();
// reuse batch buffer
let mut batch = Vec::with_capacity(BATCH_SIZE);
let mut validation_results = Vec::with_capacity(BATCH_SIZE);
let mut valid_flags: Vec<bool> = Vec::with_capacity(BATCH_SIZE);
let batch_size = args.flag_batch as usize;
let mut batch = Vec::with_capacity(batch_size);
let mut validation_results = Vec::with_capacity(batch_size);
let mut valid_flags: Vec<bool> = Vec::with_capacity(batch_size);
let mut validation_error_messages: Vec<String> = Vec::with_capacity(50);

// set RAYON_NUM_THREADS
util::njobs(args.flag_jobs);

// set this once, as this is used repeatedly in a hot loop
NULL_TYPE.set(Value::String("null".to_string())).unwrap();

// main loop to read CSV and construct batches for parallel processing.
// each batch is processed via Rayon parallel iterator.
// loop exits when batch is empty.
'batch_loop: loop {
for _ in 0..BATCH_SIZE {
for _ in 0..batch_size {
match rdr.read_byte_record(&mut record) {
Ok(has_data) => {
if has_data {
Expand Down Expand Up @@ -549,11 +551,12 @@ fn to_json_instance(

let field_type_def: &Value = field_def.get("type").unwrap_or(&Value::Null);

let json_type = match field_type_def {
Value::String(s) => s,
// we do json_type as a u8 so match will be a tad faster below
let json_type: u8 = match field_type_def {
Value::String(s) => s.as_bytes()[0],
Value::Array(vec) => {
// if can't find usable type info, defaults to "string"
let mut return_val = "string";
let mut return_val = b's';

// grab the first entry that's not a "null", since it just means value is optional
for val in vec {
Expand All @@ -562,7 +565,7 @@ fn to_json_instance(
continue;
}
return_val = if let Some(s) = val.as_str() {
s
s.as_bytes()[0]
} else {
return fail!("type info should be a JSON string");
};
Expand All @@ -572,17 +575,20 @@ fn to_json_instance(
}
_ => {
// default to JSON String
"string"
b's'
}
};

// dbg!(i, &header_string, &value_string, &json_type);

// matching against a u8, rather than an str should a tad faster
match json_type {
"string" => {
b's' => {
// string
json_object_map.insert(header_string, Value::String(value_string));
}
"number" => {
b'n' => {
// number
if let Ok(float) = value_string.parse::<f64>() {
json_object_map.insert(
header_string,
Expand All @@ -591,35 +597,34 @@ fn to_json_instance(
} else {
return fail_format!(
"Can't cast into Float. header: {header_string}, value: {value_string}, \
json type: {json_type}"
json type: number"
);
}
}
"integer" => {
b'i' => {
// integer
if let Ok(int) = value_string.parse::<i64>() {
json_object_map.insert(header_string, Value::Number(Number::from(int)));
} else {
return fail_format!(
"Can't cast into Integer. header: {header_string}, value: {value_string}, \
json type: {json_type}"
json type: integer"
);
}
}
"boolean" => {
b'b' => {
// boolean
if let Ok(boolean) = value_string.parse::<bool>() {
json_object_map.insert(header_string, Value::Bool(boolean));
} else {
return fail_format!(
"Can't cast into Boolean. header: {header_string}, value: {value_string}, \
json type: {json_type}"
json type: boolean"
);
}
}
_ => {
return fail_format!(
"Unsupported JSON type. header: {header_string}, value: {value_string}, json \
type: {json_type}"
);
unreachable!("we should never get an unknown json type");
}
}
}
Expand Down

0 comments on commit 5657b15

Please sign in to comment.