From d9bc2a53a14761bf8c48035937c09e060e7d67f2 Mon Sep 17 00:00:00 2001 From: Joel Natividad <1980690+jqnatividad@users.noreply.github.com> Date: Mon, 18 Nov 2024 00:24:30 -0500 Subject: [PATCH 01/11] feat: add dataset-level stats resolves #2288 --- src/cmd/stats.rs | 83 ++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 81 insertions(+), 2 deletions(-) diff --git a/src/cmd/stats.rs b/src/cmd/stats.rs index b95aed857..f1b3e59fe 100644 --- a/src/cmd/stats.rs +++ b/src/cmd/stats.rs @@ -242,7 +242,9 @@ with ~500 tests. use std::{ default::Default, - fmt, fs, io, + fmt, fs, + hash::BuildHasher, + io, io::Write, iter::repeat, path::{Path, PathBuf}, @@ -769,6 +771,8 @@ pub fn run(argv: &[&str]) -> CliResult<()> { }?; let stats_sr_vec = args.stats_to_records(stats); + let mut work_br; + let mut stats_br_vec: Vec<csv::ByteRecord> = Vec::new(); let stats_headers_sr = args.stat_headers(); wtr.write_record(&stats_headers_sr)?; @@ -780,9 +784,77 @@ pub fn run(argv: &[&str]) -> CliResult<()> { header.to_vec() }; let stat = stat.iter().map(str::as_bytes); - wtr.write_record(vec![&*header].into_iter().chain(stat))?; + // work_var = vec![&*header].into_iter().chain(stat); + work_br = csv::ByteRecord::from_iter(vec![&*header].into_iter().chain(stat)); + wtr.write_record(&work_br)?; + stats_br_vec.push(work_br); } + // add the dataset-level stats + let num_stats_fields = stats_headers_sr.len(); + let mut dataset_stats_br = csv::ByteRecord::with_capacity(128, num_stats_fields); + dataset_stats_br.push_field(b"_qsv_rowcount"); + for _ in 2..num_stats_fields { + dataset_stats_br.push_field(b""); + } + dataset_stats_br.push_field(itoa::Buffer::new().format(*record_count).as_bytes()); + wtr.write_record(&dataset_stats_br)?; + stats_br_vec.push(dataset_stats_br.to_owned()); + + dataset_stats_br.clear(); + dataset_stats_br.push_field(b"_qsv_columncount"); + for _ in 2..num_stats_fields { + dataset_stats_br.push_field(b""); + } + dataset_stats_br.push_field(itoa::Buffer::new().format(headers.len()).as_bytes()); + wtr.write_record(&dataset_stats_br)?; + stats_br_vec.push(dataset_stats_br.to_owned()); + + dataset_stats_br.clear(); + dataset_stats_br.push_field(b"_qsv_filesize_bytes"); + for _ in 2..num_stats_fields { + dataset_stats_br.push_field(b""); + } + dataset_stats_br.push_field( + itoa::Buffer::new() + .format(fs::metadata(&path)?.len()) + .as_bytes(), + ); + wtr.write_record(&dataset_stats_br)?; + stats_br_vec.push(dataset_stats_br.to_owned()); + + // compute the hash using stats, instead of scanning the entire file + // so the performance is constant regardless of file size + let stats_hash = { + #[allow(deprecated)] + // we use "deprecated" SipHasher explicitly instead of DefaultHasher, + // even though, it is the current DefaultHasher since Rust 1.7.0 + // as we want the hash to be deterministic and stable across Rust versions + // DefaultHasher may change in future Rust versions + let mut hasher = + std::hash::BuildHasherDefault::<std::hash::SipHasher>::default().build_hasher(); + for record in &stats_br_vec { + for (i, field) in record.iter().enumerate() { + // we only do the first 20 stats columns to compute the hash as those + // columns are always the same, even if other stats --options are used + if i >= 20 { + break; + } + std::hash::Hash::hash(field, &mut hasher); + } + } + std::hash::Hasher::finish(&hasher) + }; + + dataset_stats_br.clear(); + dataset_stats_br.push_field(b"_qsv_hash"); + for _ in 2..num_stats_fields { + dataset_stats_br.push_field(b""); + } + 
dataset_stats_br.push_field(itoa::Buffer::new().format(stats_hash).as_bytes()); + wtr.write_record(&dataset_stats_br)?; + stats_br_vec.push(dataset_stats_br); + // update the stats args json metadata current_stats_args.compute_duration_ms = start_time.elapsed().as_millis() as u64; @@ -1144,6 +1216,10 @@ impl Args { "antimode_occurrences", ]); } + + // we add the _qsv_value field at the end for dataset-level stats + fields.push("_qsv_value"); + csv::StringRecord::from(fields) } } @@ -1791,6 +1867,9 @@ impl Stats { // append it here to preserve legacy ordering of columns pieces.extend_from_slice(&mc_pieces); + // add an empty field for _qsv_value + pieces.push(empty()); + csv::StringRecord::from(pieces) } } From 6ca0100106cfa9ff5d4b7d9e04112ee50ca912e0 Mon Sep 17 00:00:00 2001 From: Joel Natividad <1980690+jqnatividad@users.noreply.github.com> Date: Mon, 18 Nov 2024 00:31:07 -0500 Subject: [PATCH 02/11] chore: prealloc stats_br_vec; remove old commented code --- src/cmd/stats.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/cmd/stats.rs b/src/cmd/stats.rs index f1b3e59fe..7eecfdf73 100644 --- a/src/cmd/stats.rs +++ b/src/cmd/stats.rs @@ -772,7 +772,9 @@ pub fn run(argv: &[&str]) -> CliResult<()> { let stats_sr_vec = args.stats_to_records(stats); let mut work_br; - let mut stats_br_vec: Vec<csv::ByteRecord> = Vec::new(); + + // prealloc. we add 4 for the 4 addl dataset-level stats + let mut stats_br_vec: Vec<csv::ByteRecord> = Vec::with_capacity(stats_sr_vec.len() + 4); let stats_headers_sr = args.stat_headers(); wtr.write_record(&stats_headers_sr)?; @@ -784,7 +786,6 @@ pub fn run(argv: &[&str]) -> CliResult<()> { header.to_vec() }; let stat = stat.iter().map(str::as_bytes); - // work_var = vec![&*header].into_iter().chain(stat); work_br = csv::ByteRecord::from_iter(vec![&*header].into_iter().chain(stat)); wtr.write_record(&work_br)?; stats_br_vec.push(work_br); } From 8fae48fb5410867e584ea52da15cab41ec65667f Mon Sep 17 00:00:00 2001 From: Joel Natividad <1980690+jqnatividad@users.noreply.github.com> Date: Mon, 18 Nov 2024 00:44:40 -0500 Subject: [PATCH 03/11] chore: apply clippy suggestions warning: usage of `FromIterator::from_iter` --> src/cmd/stats.rs:789:27 | 789 | work_br = csv::ByteRecord::from_iter(vec![&*header].into_iter().chain(stat)); | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ help: use `.collect()` instead of `::from_iter()`: `vec![&*header].into_iter().chain(stat).collect::<csv::ByteRecord>()` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#from_iter_instead_of_collect = note: `-W clippy::from-iter-instead-of-collect` implied by `-W clippy::pedantic` = help: to override `-W clippy::pedantic` add `#[allow(clippy::from_iter_instead_of_collect)]` warning: implicitly cloning a `ByteRecord` by calling `to_owned` on its dereferenced type --> src/cmd/stats.rs:803:31 | 803 | stats_br_vec.push(dataset_stats_br.to_owned()); | ^^^^^^^^^^^^^^^^^^^^^^^^^^^ help: consider using: `dataset_stats_br.clone()` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#implicit_clone = note: `-W clippy::implicit-clone` implied by `-W clippy::pedantic` = help: to override `-W clippy::pedantic` add `#[allow(clippy::implicit_clone)]` warning: implicitly cloning a `ByteRecord` by calling `to_owned` on its dereferenced type --> src/cmd/stats.rs:812:31 | 812 | stats_br_vec.push(dataset_stats_br.to_owned()); | ^^^^^^^^^^^^^^^^^^^^^^^^^^^ help: consider using: `dataset_stats_br.clone()` | = help: for further 
information visit https://rust-lang.github.io/rust-clippy/master/index.html#implicit_clone warning: implicitly cloning a `ByteRecord` by calling `to_owned` on its dereferenced type --> src/cmd/stats.rs:825:31 | 825 | stats_br_vec.push(dataset_stats_br.to_owned()); | ^^^^^^^^^^^^^^^^^^^^^^^^^^^ help: consider using: `dataset_stats_br.clone()` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#implicit_clone --- src/cmd/stats.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/cmd/stats.rs b/src/cmd/stats.rs index 7eecfdf73..a84e84712 100644 --- a/src/cmd/stats.rs +++ b/src/cmd/stats.rs @@ -786,7 +786,10 @@ pub fn run(argv: &[&str]) -> CliResult<()> { header.to_vec() }; let stat = stat.iter().map(str::as_bytes); - work_br = csv::ByteRecord::from_iter(vec![&*header].into_iter().chain(stat)); + work_br = vec![&*header] + .into_iter() + .chain(stat) + .collect::<csv::ByteRecord>(); wtr.write_record(&work_br)?; stats_br_vec.push(work_br); } @@ -800,7 +803,7 @@ pub fn run(argv: &[&str]) -> CliResult<()> { } dataset_stats_br.push_field(itoa::Buffer::new().format(*record_count).as_bytes()); wtr.write_record(&dataset_stats_br)?; - stats_br_vec.push(dataset_stats_br.to_owned()); + stats_br_vec.push(dataset_stats_br.clone()); dataset_stats_br.clear(); dataset_stats_br.push_field(b"_qsv_columncount"); @@ -809,7 +812,7 @@ pub fn run(argv: &[&str]) -> CliResult<()> { } dataset_stats_br.push_field(itoa::Buffer::new().format(headers.len()).as_bytes()); wtr.write_record(&dataset_stats_br)?; - stats_br_vec.push(dataset_stats_br.to_owned()); + stats_br_vec.push(dataset_stats_br.clone()); dataset_stats_br.clear(); dataset_stats_br.push_field(b"_qsv_filesize_bytes"); @@ -822,7 +825,7 @@ pub fn run(argv: &[&str]) -> CliResult<()> { .as_bytes(), ); wtr.write_record(&dataset_stats_br)?; - stats_br_vec.push(dataset_stats_br.to_owned()); + stats_br_vec.push(dataset_stats_br.clone()); // compute the hash using stats, instead of scanning the entire file // so the performance is constant regardless of file size From 68f3830c168d83b55087e1890dcb05bf1760ece3 Mon Sep 17 00:00:00 2001 From: Joel Natividad <1980690+jqnatividad@users.noreply.github.com> Date: Mon, 18 Nov 2024 15:46:44 -0500 Subject: [PATCH 04/11] `refactor`: how `stats` computes dataset-level stats --- src/cmd/stats.rs | 30 +++++++++++++++++++----------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/src/cmd/stats.rs b/src/cmd/stats.rs index a84e84712..7d8dc53d3 100644 --- a/src/cmd/stats.rs +++ b/src/cmd/stats.rs @@ -801,31 +801,36 @@ pub fn run(argv: &[&str]) -> CliResult<()> { for _ in 2..num_stats_fields { dataset_stats_br.push_field(b""); } - dataset_stats_br.push_field(itoa::Buffer::new().format(*record_count).as_bytes()); + let ds_record_count = itoa::Buffer::new() + .format(*record_count) + .as_bytes() + .to_owned(); + dataset_stats_br.push_field(&ds_record_count); wtr.write_record(&dataset_stats_br)?; - stats_br_vec.push(dataset_stats_br.clone()); dataset_stats_br.clear(); dataset_stats_br.push_field(b"_qsv_columncount"); for _ in 2..num_stats_fields { dataset_stats_br.push_field(b""); } - dataset_stats_br.push_field(itoa::Buffer::new().format(headers.len()).as_bytes()); + let ds_column_count = itoa::Buffer::new() + .format(headers.len()) + .as_bytes() + .to_owned(); + dataset_stats_br.push_field(&ds_column_count); wtr.write_record(&dataset_stats_br)?; - stats_br_vec.push(dataset_stats_br.clone()); dataset_stats_br.clear(); 
dataset_stats_br.push_field(b"_qsv_filesize_bytes"); for _ in 2..num_stats_fields { dataset_stats_br.push_field(b""); } - dataset_stats_br.push_field( - itoa::Buffer::new() - .format(fs::metadata(&path)?.len()) - .as_bytes(), - ); + let ds_filesize_bytes = itoa::Buffer::new() + .format(fs::metadata(&path)?.len()) + .as_bytes() + .to_owned(); + dataset_stats_br.push_field(&ds_filesize_bytes); wtr.write_record(&dataset_stats_br)?; - stats_br_vec.push(dataset_stats_br.clone()); // compute the hash using stats, instead of scanning the entire file // so the performance is constant regardless of file size @@ -847,6 +852,10 @@ pub fn run(argv: &[&str]) -> CliResult<()> { std::hash::Hash::hash(field, &mut hasher); } } + // we also add the dataset level stats to the hash + std::hash::Hash::hash(&ds_record_count, &mut hasher); + std::hash::Hash::hash(&ds_column_count, &mut hasher); + std::hash::Hash::hash(&ds_filesize_bytes, &mut hasher); std::hash::Hasher::finish(&hasher) }; @@ -857,7 +866,6 @@ pub fn run(argv: &[&str]) -> CliResult<()> { } dataset_stats_br.push_field(itoa::Buffer::new().format(stats_hash).as_bytes()); wtr.write_record(&dataset_stats_br)?; - stats_br_vec.push(dataset_stats_br); // update the stats args json metadata current_stats_args.compute_duration_ms = start_time.elapsed().as_millis() as u64; From 3bb256d5eb4e7ae4d553cd3e3a0e290bb5045880 Mon Sep 17 00:00:00 2001 From: Joel Natividad <1980690+jqnatividad@users.noreply.github.com> Date: Mon, 18 Nov 2024 16:18:49 -0500 Subject: [PATCH 05/11] refactor: improve dataset-level stats calculation, use a helper closure write_dataset_stat --- src/cmd/stats.rs | 99 ++++++++++++++++++++++++++++-------------------- 1 file changed, 57 insertions(+), 42 deletions(-) diff --git a/src/cmd/stats.rs b/src/cmd/stats.rs index 7d8dc53d3..ae098be99 100644 --- a/src/cmd/stats.rs +++ b/src/cmd/stats.rs @@ -20,7 +20,7 @@ The following additional "non-streaming" statistics require loading the entire f cardinality, mode/antimode, median, MAD, quartiles and its related measures (IQR, lower/upper fences & skewness). -When computing “non-streaming” statistics, an Out-Of-Memory (OOM) heuristic check is done. +When computing "non-streaming" statistics, an Out-Of-Memory (OOM) heuristic check is done. If the file is larger than the available memory minus a headroom buffer of 20% (which can be adjusted using the QSV_FREEMEMORY_HEADROOM_PCT environment variable), processing will be preemptively prevented. 
@@ -794,46 +794,64 @@ pub fn run(argv: &[&str]) -> CliResult<()> { stats_br_vec.push(work_br); } - // add the dataset-level stats + // Add dataset-level stats as additional rows ==================== let num_stats_fields = stats_headers_sr.len(); let mut dataset_stats_br = csv::ByteRecord::with_capacity(128, num_stats_fields); - dataset_stats_br.push_field(b"_qsv_rowcount"); - for _ in 2..num_stats_fields { - dataset_stats_br.push_field(b""); - } + + // Helper closure to write a dataset stat row + let write_dataset_stat = |name: &[u8], + value: &[u8], + br: &mut csv::ByteRecord, + wtr: &mut csv::Writer<_>| + -> CliResult<()> { + br.clear(); + br.push_field(name); + // Fill middle columns with empty strings + for _ in 2..num_stats_fields { + br.push_field(b""); + } + // write _qsv_value as last column + br.push_field(value); + wtr.write_byte_record(br).map_err(|e| e.into()) + }; + + // Write _qsv_rowcount let ds_record_count = itoa::Buffer::new() .format(*record_count) .as_bytes() .to_owned(); - dataset_stats_br.push_field(&ds_record_count); - wtr.write_record(&dataset_stats_br)?; + write_dataset_stat( + b"_qsv_rowcount", + &ds_record_count, + &mut dataset_stats_br, + &mut wtr, + )?; - dataset_stats_br.clear(); - dataset_stats_br.push_field(b"_qsv_columncount"); - for _ in 2..num_stats_fields { - dataset_stats_br.push_field(b""); - } + // Write _qsv_columncount let ds_column_count = itoa::Buffer::new() .format(headers.len()) .as_bytes() .to_owned(); - dataset_stats_br.push_field(&ds_column_count); - wtr.write_record(&dataset_stats_br)?; + write_dataset_stat( + b"_qsv_columncount", + &ds_column_count, + &mut dataset_stats_br, + &mut wtr, + )?; - dataset_stats_br.clear(); dataset_stats_br.push_field(b"_qsv_filesize_bytes"); - for _ in 2..num_stats_fields { - dataset_stats_br.push_field(b""); - } + // Write _qsv_filesize_bytes let ds_filesize_bytes = itoa::Buffer::new() .format(fs::metadata(&path)?.len()) .as_bytes() .to_owned(); - dataset_stats_br.push_field(&ds_filesize_bytes); - wtr.write_record(&dataset_stats_br)?; + write_dataset_stat( + b"_qsv_filesize_bytes", + &ds_filesize_bytes, + &mut dataset_stats_br, + &mut wtr, + )?; - // compute the hash using stats, instead of scanning the entire file - // so the performance is constant regardless of file size + // Compute hash of stats for data fingerprinting let stats_hash = { #[allow(deprecated)] // we use "deprecated" SipHasher explicitly instead of DefaultHasher, @@ -842,32 +860,29 @@ pub fn run(argv: &[&str]) -> CliResult<()> { // DefaultHasher may change in future Rust versions let mut hasher = std::hash::BuildHasherDefault::<std::hash::SipHasher>::default().build_hasher(); + + // Hash the first 20 columns of each stats record + // we only do the first 20 stats columns to compute the hash as those + // columns are always the same, even if other stats --options are used for record in &stats_br_vec { - for (i, field) in record.iter().enumerate() { - // we only do the first 20 stats columns to compute the hash as those - // columns are always the same, even if other stats --options are used - if i >= 20 { - break; - } + for field in record.iter().take(20) { std::hash::Hash::hash(field, &mut hasher); } } - // we also add the dataset level stats to the hash - std::hash::Hash::hash(&ds_record_count, &mut hasher); - std::hash::Hash::hash(&ds_column_count, &mut hasher); - std::hash::Hash::hash(&ds_filesize_bytes, &mut hasher); + + // Include dataset-level stats in hash + for stat in [&ds_record_count, &ds_column_count, &ds_filesize_bytes] { 
std::hash::Hash::hash(stat, &mut hasher); + } + std::hash::Hasher::finish(&hasher) }; - dataset_stats_br.clear(); - dataset_stats_br.push_field(b"_qsv_hash"); - for _ in 2..num_stats_fields { - dataset_stats_br.push_field(b""); - } - dataset_stats_br.push_field(itoa::Buffer::new().format(stats_hash).as_bytes()); - wtr.write_record(&dataset_stats_br)?; + // Write _qsv_hash + let hash_bytes = itoa::Buffer::new().format(stats_hash).as_bytes().to_owned(); + write_dataset_stat(b"_qsv_hash", &hash_bytes, &mut dataset_stats_br, &mut wtr)?; - // update the stats args json metadata + // update the stats args json metadata =============== current_stats_args.compute_duration_ms = start_time.elapsed().as_millis() as u64; if create_cache From 85c330b98bac389cfd3898c0e90037d8a8798b79 Mon Sep 17 00:00:00 2001 From: Joel Natividad <1980690+jqnatividad@users.noreply.github.com> Date: Mon, 18 Nov 2024 17:13:32 -0500 Subject: [PATCH 06/11] refactor: simplify write_dataset_stat helper closure --- src/cmd/stats.rs | 54 ++++++++++++++++-------------------------------- 1 file changed, 18 insertions(+), 36 deletions(-) diff --git a/src/cmd/stats.rs b/src/cmd/stats.rs index ae098be99..39f6a0512 100644 --- a/src/cmd/stats.rs +++ b/src/cmd/stats.rs @@ -773,8 +773,8 @@ pub fn run(argv: &[&str]) -> CliResult<()> { let stats_sr_vec = args.stats_to_records(stats); let mut work_br; - // prealloc. we add 4 for the 4 addl dataset-level stats - let mut stats_br_vec: Vec<csv::ByteRecord> = Vec::with_capacity(stats_sr_vec.len() + 4); + // vec we use to compute dataset-level fingerprint hash + let mut stats_br_vec: Vec<csv::ByteRecord> = Vec::with_capacity(stats_sr_vec.len()); let stats_headers_sr = args.stat_headers(); wtr.write_record(&stats_headers_sr)?; @@ -799,57 +799,39 @@ pub fn run(argv: &[&str]) -> CliResult<()> { let num_stats_fields = stats_headers_sr.len(); let mut dataset_stats_br = csv::ByteRecord::with_capacity(128, num_stats_fields); // Helper closure to write a dataset stat row - let write_dataset_stat = |name: &[u8], - value: &[u8], - br: &mut csv::ByteRecord, - wtr: &mut csv::Writer<_>| - -> CliResult<()> { - br.clear(); - br.push_field(name); + let mut write_dataset_stat = |name: &[u8], value: &[u8]| -> CliResult<()> { + dataset_stats_br.clear(); + dataset_stats_br.push_field(name); // Fill middle columns with empty strings for _ in 2..num_stats_fields { - br.push_field(b""); + dataset_stats_br.push_field(b""); } // write _qsv_value as last column - br.push_field(value); - wtr.write_byte_record(br).map_err(|e| e.into()) + dataset_stats_br.push_field(value); + wtr.write_byte_record(&dataset_stats_br) + .map_err(std::convert::Into::into) }; // Write _qsv_rowcount let ds_record_count = itoa::Buffer::new() .format(*record_count) .as_bytes() - .to_owned(); - write_dataset_stat( - b"_qsv_rowcount", - &ds_record_count, - &mut dataset_stats_br, - &mut wtr, - )?; + .to_vec(); + write_dataset_stat(b"_qsv_rowcount", &ds_record_count)?; // Write _qsv_columncount let ds_column_count = itoa::Buffer::new() .format(headers.len()) .as_bytes() - .to_owned(); - write_dataset_stat( - b"_qsv_columncount", - &ds_column_count, - &mut dataset_stats_br, - &mut wtr, - )?; + .to_vec(); + write_dataset_stat(b"_qsv_columncount", &ds_column_count)?; // Write _qsv_filesize_bytes let ds_filesize_bytes = itoa::Buffer::new() .format(fs::metadata(&path)?.len()) .as_bytes() - .to_owned(); - write_dataset_stat( - b"_qsv_filesize_bytes", - &ds_filesize_bytes, - &mut dataset_stats_br, - &mut wtr, - )?; + .to_vec(); + write_dataset_stat(b"_qsv_filesize_bytes", &ds_filesize_bytes)?; // Compute hash of stats for data 
fingerprinting let stats_hash = { @@ -878,9 +860,9 @@ pub fn run(argv: &[&str]) -> CliResult<()> { std::hash::Hasher::finish(&hasher) }; - // Write _qsv_hash - let hash_bytes = itoa::Buffer::new().format(stats_hash).as_bytes().to_owned(); - write_dataset_stat(b"_qsv_hash", &hash_bytes, &mut dataset_stats_br, &mut wtr)?; + // Write _qsv_hash dataset fingerprint + let hash_bytes = itoa::Buffer::new().format(stats_hash).as_bytes().to_vec(); + write_dataset_stat(b"_qsv_hash", &hash_bytes)?; // update the stats args json metadata =============== current_stats_args.compute_duration_ms = start_time.elapsed().as_millis() as u64; From 12bdfff675ce9c4854844c3e8419e686fb984081 Mon Sep 17 00:00:00 2001 From: Joel Natividad <1980690+jqnatividad@users.noreply.github.com> Date: Mon, 18 Nov 2024 22:27:15 -0500 Subject: [PATCH 07/11] `refactor`: get_stats_records helper to ignore dataset-level stats and use simd_json instead of serde_json --- src/util.rs | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/src/util.rs b/src/util.rs index 35bf70d8f..436ead86b 100644 --- a/src/util.rs +++ b/src/util.rs @@ -1968,10 +1968,17 @@ pub fn get_stats_records( let statsdata_lines = statsdata_reader.lines(); let mut line: String; + let mut s_slice: Vec<u8>; for curr_line in statsdata_lines { line = curr_line?; - let stats_record: StatsData = serde_json::from_str(&line)?; - csv_stats.push(stats_record); + // do not load dataset-level stats into csv_stats + if !line.starts_with(r#"{"field":"_qsv_"#) { + s_slice = line.as_bytes().to_vec(); + match simd_json::serde::from_slice(&mut **&mut s_slice) { + Ok(stats) => csv_stats.push(stats), + Err(_) => continue, + } + } } stats_data_loaded = true; } From 642fd74df4005b805b9cc29020ce1a5fba84c563 Mon Sep 17 00:00:00 2001 From: Joel Natividad <1980690+jqnatividad@users.noreply.github.com> Date: Tue, 19 Nov 2024 07:47:42 -0500 Subject: [PATCH 08/11] refactor: get_stats_records - reduce cloning; align stats.jsonl loading approach also refactor csv_to_jsonl to pass output_jsonl by reference instead of by value --- src/util.rs | 87 ++++++++++++++++++++++++++--------------------------- 1 file changed, 42 insertions(+), 45 deletions(-) diff --git a/src/util.rs b/src/util.rs index 436ead86b..44c821c10 100644 --- a/src/util.rs +++ b/src/util.rs @@ -1921,6 +1921,8 @@ pub fn get_stats_records( args: &SchemaArgs, mode: StatsMode, ) -> CliResult<(ByteRecord, Vec<StatsData>)> { + const DATASET_STATS_PREFIX: &str = r#"{"field":"_qsv_"#; + if mode == StatsMode::None || args.arg_input.is_none() || args.arg_input.as_ref() == Some(&"-".to_string()) { // we do not support stdin input for stats return Ok((ByteRecord::new(), Vec::new())); }; - let canonical_input_path = Path::new(&args.arg_input.clone().unwrap()).canonicalize()?; + let canonical_input_path = Path::new(args.arg_input.as_ref().unwrap()).canonicalize()?; let statsdata_path = canonical_input_path.with_extension("stats.csv.data.jsonl"); let stats_data_current = if statsdata_path.exists() { let statsdata_metadata = std::fs::metadata(&statsdata_path)?; - let input_metadata = std::fs::metadata(args.arg_input.clone().unwrap())?; + let input_metadata = std::fs::metadata(args.arg_input.as_ref().unwrap())?; let statsdata_mtime = FileTime::from_last_modification_time(&statsdata_metadata); let input_mtime = FileTime::from_last_modification_time(&input_metadata); ... return Ok((ByteRecord::new(), Vec::new())); } + // get the headers from the input file + let mut rdr = 
csv::Reader::from_path(args.arg_input.as_ref().ok_or("No input provided")?)?; + let csv_fields = rdr.byte_headers()?.clone(); + drop(rdr); + let mut stats_data_loaded = false; - let mut csv_stats: Vec<StatsData> = Vec::new(); + let mut csv_stats: Vec<StatsData> = Vec::with_capacity(csv_fields.len()); // if stats_data file exists and is current, use it if stats_data_current && !args.flag_force { - let statsdata_file = std::fs::File::open(&statsdata_path)?; - let statsdata_reader = std::io::BufReader::new(statsdata_file); - let statsdata_lines = statsdata_reader.lines(); + let statsdatajson_rdr = + BufReader::with_capacity(DEFAULT_RDR_BUFFER_CAPACITY, File::open(statsdata_path)?); - let mut line: String; + let mut curr_line: String; let mut s_slice: Vec<u8>; - for curr_line in statsdata_lines { - line = curr_line?; - // do not load dataset-level stats into csv_stats - if !line.starts_with(r#"{"field":"_qsv_"#) { - s_slice = line.as_bytes().to_vec(); - match simd_json::serde::from_slice(&mut **&mut s_slice) { - Ok(stats) => csv_stats.push(stats), - Err(_) => continue, - } + for line in statsdatajson_rdr.lines() { + curr_line = line?; + if curr_line.starts_with(DATASET_STATS_PREFIX) { + break; + } + s_slice = curr_line.as_bytes().to_vec(); + match simd_json::serde::from_slice(&mut **&mut s_slice) { + Ok(stats) => csv_stats.push(stats), + Err(_) => continue, } } - stats_data_loaded = true; + stats_data_loaded = !csv_stats.is_empty(); } // otherwise, run stats command to generate stats.csv.data.jsonl file if !stats_data_loaded { let stats_args = crate::cmd::stats::Args { - arg_input: args.arg_input.clone(), + arg_input: args.arg_input.as_ref().map(String::from), flag_select: crate::select::SelectColumns::parse("").unwrap(), flag_everything: false, flag_typesonly: false, @@ -2017,13 +2023,10 @@ pub fn get_stats_records( .unwrap(); let tempfile_path = tempfile.path().to_str().unwrap().to_string(); - let statsdatajson_path = canonical_input_path.with_extension("stats.csv.data.jsonl"); + let statsdatajson_path = &canonical_input_path.with_extension("stats.csv.data.jsonl"); + + let input = stats_args.arg_input.unwrap_or_else(|| "-".to_string()); - let input = if let Some(arg_input) = stats_args.arg_input { - arg_input - } else { - "-".to_string() - }; // we do rustfmt::skip here as it was breaking the stats cmdline along strange // boundaries, causing CI errors. 
// This is because we're using tab characters (/t) to separate args to fix #2294, // as some of the args include commas. @@ -2048,8 +2051,7 @@ pub fn get_stats_records( // StatsMode::FrequencyForceStats // we're doing frequency, so we need cardinality from a --forced stats run format!( - "stats\t{input}\t--cardinality\t--stats-jsonl\t--force\ \t--output\t{tempfile_path}" + "stats\t{input}\t--cardinality\t--stats-jsonl\t--force\t--output\t{tempfile_path}" ) }, #[cfg(feature = "polars")] @@ -2110,31 +2112,26 @@ pub fn get_stats_records( } // create a statsdatajon from the output of the stats command - csv_to_jsonl( - &tempfile_path, - &get_stats_data_types(), - statsdatajson_path.clone(), - )?; + csv_to_jsonl(&tempfile_path, &get_stats_data_types(), &statsdatajson_path)?; + + let statsdatajson_rdr = + BufReader::with_capacity(DEFAULT_RDR_BUFFER_CAPACITY, File::open(statsdatajson_path)?); - let statsdatajson_rdr = BufReader::with_capacity( - DEFAULT_RDR_BUFFER_CAPACITY * 2, - File::open(statsdatajson_path)?, - ); - let mut statsrecord: StatsData; let mut curr_line: String; + let mut s_slice: Vec<u8>; for line in statsdatajson_rdr.lines() { curr_line = line?; - statsrecord = serde_json::from_str(&curr_line)?; - csv_stats.push(statsrecord); + if curr_line.starts_with(DATASET_STATS_PREFIX) { + break; + } + s_slice = curr_line.as_bytes().to_vec(); + match simd_json::serde::from_slice(&mut **&mut s_slice) { + Ok(stats) => csv_stats.push(stats), + Err(_) => continue, + } } }; - // get the headers from the input file - let mut rdr = csv::Reader::from_path(args.arg_input.clone().unwrap()).unwrap(); - let csv_fields = rdr.byte_headers()?.clone(); - drop(rdr); - Ok((csv_fields, csv_stats)) } @@ -2143,7 +2140,7 @@ pub fn csv_to_jsonl( pub fn csv_to_jsonl( input_csv: &str, csv_types: &[JsonTypes], - output_jsonl: PathBuf, + output_jsonl: &PathBuf, ) -> CliResult<()> { let file = File::open(input_csv)?; let mut rdr = csv::ReaderBuilder::new() From 0b072c8f6310d02f46602564f592c761698c04cf Mon Sep 17 00:00:00 2001 From: Joel Natividad <1980690+jqnatividad@users.noreply.github.com> Date: Tue, 19 Nov 2024 07:48:37 -0500 Subject: [PATCH 09/11] refactor: `stats` - csv_to_jsonl - pass stats_pathbuf by reference instead of by value --- src/cmd/stats.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cmd/stats.rs b/src/cmd/stats.rs index 39f6a0512..7d1b3c845 100644 --- a/src/cmd/stats.rs +++ b/src/cmd/stats.rs @@ -972,7 +972,7 @@ pub fn run(argv: &[&str]) -> CliResult<()> { // save the stats data to ".stats.csv.data.jsonl" if write_stats_jsonl { stats_pathbuf.set_extension("data.jsonl"); - util::csv_to_jsonl(&currstats_filename, &get_stats_data_types(), stats_pathbuf)?; + util::csv_to_jsonl(&currstats_filename, &get_stats_data_types(), &stats_pathbuf)?; } } } From b05c57640e4e922551d5a7c8d4118df55916ee89 Mon Sep 17 00:00:00 2001 From: Joel Natividad <1980690+jqnatividad@users.noreply.github.com> Date: Tue, 19 Nov 2024 09:26:30 -0500 Subject: [PATCH 10/11] refactor: use `qsv__` instead of `_qsv_` as prefix for qsv dataset-level objects, so as not to trigger select, where names starting with _ are a sentinel for the last column --- src/cmd/stats.rs | 24 ++++++++++++------------ src/util.rs | 2 +- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/src/cmd/stats.rs b/src/cmd/stats.rs index 7d1b3c845..a5ae3bfcd 100644 --- a/src/cmd/stats.rs +++ b/src/cmd/stats.rs @@ -806,32 +806,32 @@ pub fn run(argv: &[&str]) -> CliResult<()> { for _ in 2..num_stats_fields { dataset_stats_br.push_field(b""); } - // write _qsv_value as 
last column + // write qsv__value as last column dataset_stats_br.push_field(value); wtr.write_byte_record(&dataset_stats_br) .map_err(std::convert::Into::into) }; - // Write _qsv_rowcount + // Write qsv__rowcount let ds_record_count = itoa::Buffer::new() .format(*record_count) .as_bytes() .to_vec(); - write_dataset_stat(b"_qsv_rowcount", &ds_record_count)?; + write_dataset_stat(b"qsv__rowcount", &ds_record_count)?; - // Write _qsv_columncount + // Write qsv__columncount let ds_column_count = itoa::Buffer::new() .format(headers.len()) .as_bytes() .to_vec(); - write_dataset_stat(b"_qsv_columncount", &ds_column_count)?; + write_dataset_stat(b"qsv__columncount", &ds_column_count)?; - // Write _qsv_filesize_bytes + // Write qsv__filesize_bytes let ds_filesize_bytes = itoa::Buffer::new() .format(fs::metadata(&path)?.len()) .as_bytes() .to_vec(); - write_dataset_stat(b"_qsv_filesize_bytes", &ds_filesize_bytes)?; + write_dataset_stat(b"qsv__filesize_bytes", &ds_filesize_bytes)?; // Compute hash of stats for data fingerprinting let stats_hash = { @@ -860,9 +860,9 @@ pub fn run(argv: &[&str]) -> CliResult<()> { std::hash::Hasher::finish(&hasher) }; - // Write _qsv_hash dataset fingerprint + // Write qsv__fingerprint_hash dataset let hash_bytes = itoa::Buffer::new().format(stats_hash).as_bytes().to_vec(); - write_dataset_stat(b"_qsv_hash", &hash_bytes)?; + write_dataset_stat(b"qsv__fingerprint_hash", &hash_bytes)?; // update the stats args json metadata =============== current_stats_args.compute_duration_ms = start_time.elapsed().as_millis() as u64; @@ -1226,8 +1226,8 @@ impl Args { ]); } - // we add the _qsv_value field at the end for dataset-level stats - fields.push("_qsv_value"); + // we add the qsv__value field at the end for dataset-level stats + fields.push("qsv__value"); csv::StringRecord::from(fields) } @@ -1876,7 +1876,7 @@ impl Stats { // append it here to preserve legacy ordering of columns pieces.extend_from_slice(&mc_pieces); - // add an empty field for _qsv_value + // add an empty field for qsv__value pieces.push(empty()); csv::StringRecord::from(pieces) } } diff --git a/src/util.rs b/src/util.rs index 44c821c10..eb3b8fe2c 100644 --- a/src/util.rs +++ b/src/util.rs @@ -1921,7 +1921,7 @@ pub fn get_stats_records( args: &SchemaArgs, mode: StatsMode, ) -> CliResult<(ByteRecord, Vec<StatsData>)> { - const DATASET_STATS_PREFIX: &str = r#"{"field":"_qsv_"#; + const DATASET_STATS_PREFIX: &str = r#"{"field":"qsv__"#; From 672796e29ee16a9feca4552d74ee09ada71cfe06 Mon Sep 17 00:00:00 2001 From: Joel Natividad <1980690+jqnatividad@users.noreply.github.com> Date: Tue, 19 Nov 2024 09:27:51 -0500 Subject: [PATCH 11/11] tests: adjust `index` and `json` tests to account for new dataset-level stats also add assert_success to select frequency and tojsonl tests to help in debugging --- tests/test_frequency.rs | 2 + tests/test_index.rs | 101 ++++++++++++++++++++++++++++++++++++++-- tests/test_json.rs | 2 +- tests/test_tojsonl.rs | 2 + 4 files changed, 103 insertions(+), 4 deletions(-) diff --git a/tests/test_frequency.rs b/tests/test_frequency.rs index 84048f506..97e3ac6bb 100644 --- a/tests/test_frequency.rs +++ b/tests/test_frequency.rs @@ -535,6 +535,8 @@ fn frequency_all_unique_force_stats_cache() { .args(["--stats-mode", "force"]) .arg(testdata); + wrk.assert_success(&mut cmd); + let got: Vec<Vec<String>> = wrk.read_stdout(&mut cmd); let expected = vec![ svec!["field", "value", "count", "percentage"], diff --git a/tests/test_index.rs b/tests/test_index.rs index 
ab70f35eb..a36a621ea 100644 --- a/tests/test_index.rs +++ b/tests/test_index.rs @@ -84,7 +84,8 @@ fn index_outdated_stats() { "cv", "nullcount", "max_precision", - "sparsity" + "sparsity", + "qsv__value", ], svec![ "letter", @@ -106,7 +107,8 @@ fn index_outdated_stats() { "", "0", "", - "0" + "0", + "", ], svec![ "number", @@ -128,7 +130,100 @@ fn index_outdated_stats() { "40.8248", "0", "", - "0" + "0", + "", + ], + svec![ + "qsv__rowcount", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "3" + ], + svec![ + "qsv__columncount", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "2" + ], + svec![ + "qsv__filesize_bytes", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "26" + ], + svec![ + "qsv__fingerprint_hash", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "7405440055837468253" ], ]; diff --git a/tests/test_json.rs b/tests/test_json.rs index 5f6f6bb8f..b7125e6c3 100644 --- a/tests/test_json.rs +++ b/tests/test_json.rs @@ -192,7 +192,7 @@ fn json_fruits_stats_slice_json() { // qsv stats fruits.csv let mut stats_cmd = wrk.command("stats"); - stats_cmd.arg(test_file); + stats_cmd.arg(test_file).arg("--force"); wrk.assert_success(&mut stats_cmd); diff --git a/tests/test_tojsonl.rs b/tests/test_tojsonl.rs index de3e79807..8afd6788a 100644 --- a/tests/test_tojsonl.rs +++ b/tests/test_tojsonl.rs @@ -20,6 +20,8 @@ fn tojsonl_simple() { let mut cmd = wrk.command("tojsonl"); cmd.arg("in.csv"); + wrk.assert_success(&mut cmd); + let got: String = wrk.stdout(&mut cmd); let expected = r#"{"id":1,"father":"Mark","mother":"Charlotte","oldest_child":"Tom","boy":true,"weight":150.2} {"id":2,"father":"John","mother":"Ann","oldest_child":"Jessika","boy":false,"weight":175.5}
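Note: distilled from the series above, the dataset fingerprinting scheme can be expressed as a small standalone sketch. This is hedged and illustrative, not the exact qsv code — the real implementation also feeds the dataset-level rowcount, columncount, and filesize values into the hasher before finishing:

use std::hash::{BuildHasher, BuildHasherDefault, Hash, Hasher};

// The "deprecated" SipHasher is chosen deliberately: its algorithm is frozen,
// while DefaultHasher's is explicitly unspecified and may change between Rust
// releases, which would silently invalidate previously cached fingerprints.
#[allow(deprecated)]
fn stats_fingerprint(stats_records: &[csv::ByteRecord]) -> u64 {
    let mut hasher =
        BuildHasherDefault::<std::hash::SipHasher>::default().build_hasher();
    for record in stats_records {
        // only the first 20 stats columns participate, so optional columns
        // added by extra stats --options do not perturb the fingerprint
        for field in record.iter().take(20) {
            field.hash(&mut hasher);
        }
    }
    hasher.finish()
}

Because the hash is computed over the small stats table rather than by scanning the data file, producing qsv__fingerprint_hash costs roughly the same regardless of input size, which is what makes it cheap enough to serve as a cache-validity check.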