From daec19191c48b0a75917cc7945e383fcbab574cd Mon Sep 17 00:00:00 2001 From: Joel Natividad <1980690+jqnatividad@users.noreply.github.com> Date: Sun, 12 Nov 2023 09:52:10 -0500 Subject: [PATCH 1/4] `sqlp`: create compress_output_if_needed() helper fn we can use in joinp --- src/cmd/sqlp.rs | 27 ++++++++++++++++++--------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/src/cmd/sqlp.rs b/src/cmd/sqlp.rs index 027e799cd..569421ad3 100644 --- a/src/cmd/sqlp.rs +++ b/src/cmd/sqlp.rs @@ -197,7 +197,6 @@ use serde::Deserialize; use tempfile; use crate::{ - cmd::snappy::compress, config::{Delimiter, DEFAULT_WTR_BUFFER_CAPACITY}, util, util::process_input, @@ -549,8 +548,23 @@ pub fn run(argv: &[&str]) -> CliResult<()> { } } - if let Some(output) = args.flag_output { - // if the output ends with ".sz", we snappy compress the output + compress_output_if_needed(args.flag_output)?; + + if !args.flag_quiet { + eprintln!("{query_result_shape:?}"); + } + + Ok(()) +} + +/// if the output ends with ".sz", we snappy compress the output +/// and replace the original output with the compressed output +pub fn compress_output_if_needed( + output_file: Option, +) -> Result<(), crate::clitypes::CliError> { + use crate::cmd::snappy::compress; + + if let Some(output) = output_file { if std::path::Path::new(&output) .extension() .map_or(false, |ext| ext.eq_ignore_ascii_case("sz")) @@ -575,11 +589,6 @@ pub fn run(argv: &[&str]) -> CliResult<()> { DEFAULT_WTR_BUFFER_CAPACITY, )?; } - } - - if !args.flag_quiet { - eprintln!("{query_result_shape:?}"); - } - + }; Ok(()) } From 86be06ca9820c924a37a9f29fa0b41889f2d2f17 Mon Sep 17 00:00:00 2001 From: Joel Natividad <1980690+jqnatividad@users.noreply.github.com> Date: Sun, 12 Nov 2023 09:53:14 -0500 Subject: [PATCH 2/4] `joinp`: add snappy input and output support also fix joinp cross so it supports --output option --- src/cmd/joinp.rs | 84 ++++++++++++++++++++++++++++++++++-------------- 1 file changed, 60 insertions(+), 24 deletions(-) diff --git a/src/cmd/joinp.rs b/src/cmd/joinp.rs index d25dd0a41..634366039 100644 --- a/src/cmd/joinp.rs +++ b/src/cmd/joinp.rs @@ -16,7 +16,7 @@ For examples, see https://github.com/jqnatividad/qsv/blob/master/tests/test_join Usage: qsv joinp [options] - qsv joinp --cross + qsv joinp --cross [--output ] qsv joinp --help joinp arguments: @@ -185,8 +185,9 @@ use polars::{ }; use serde::Deserialize; use smartstring; +use tempfile::tempdir; -use crate::{config::Delimiter, util, CliError, CliResult}; +use crate::{cmd::sqlp::compress_output_if_needed, config::Delimiter, util, CliError, CliResult}; #[derive(Deserialize)] struct Args { @@ -240,11 +241,14 @@ pub fn run(argv: &[&str]) -> CliResult<()> { if args.flag_asof { args.flag_try_parsedates = true; } + + let tmpdir = tempdir()?; let join = args.new_join( args.flag_try_parsedates, args.flag_infer_len, args.flag_low_memory, args.flag_ignore_errors, + &tmpdir, )?; // safety: flag_validate is always is_some() as it has a default value @@ -436,7 +440,7 @@ impl JoinStruct { // no need to use buffered writer here, as CsvWriter already does that let mut out_writer = match self.output { - Some(output_file) => { + Some(ref output_file) => { let path = Path::new(&output_file); Box::new(File::create(path).unwrap()) as Box }, @@ -456,17 +460,20 @@ impl JoinStruct { .with_null_value(self.null_value) .finish(&mut results_df)?; + compress_output_if_needed(self.output)?; + Ok(join_shape) } } impl Args { fn new_join( - &self, + &mut self, try_parsedates: bool, infer_len: usize, low_memory: bool, ignore_errors: bool, + tmpdir: &tempfile::TempDir, ) -> CliResult { let delim = if let Some(delimiter) = self.flag_delimiter { delimiter.as_byte() @@ -486,32 +493,61 @@ impl Args { Some(infer_len) }; - let mut left_lf = LazyCsvReader::new(&self.arg_input1) - .has_header(true) - .with_missing_is_null(self.flag_nulls) - .with_comment_char(comment_char) - .with_separator(delim) - .with_infer_schema_length(num_rows) - .with_try_parse_dates(try_parsedates) - .low_memory(low_memory) - .with_ignore_errors(ignore_errors) - .finish()?; + // check if the input files exist + let input1_path = Path::new(&self.arg_input1); + if !input1_path.exists() { + return fail_clierror!("Input file {} does not exist.", self.arg_input1); + } + let input2_path = Path::new(&self.arg_input2); + if !input2_path.exists() { + return fail_clierror!("Input file {} does not exist.", self.arg_input2); + } + + let mut left_lf = { + // check if the left input file is snappy compressed + // if so, we need to decompress it first + if input1_path.extension().and_then(std::ffi::OsStr::to_str) == Some("sz") { + let decompressed_path = + util::decompress_snappy_file(&input1_path.to_path_buf(), &tmpdir)?; + self.arg_input1 = decompressed_path; + } + + LazyCsvReader::new(&self.arg_input1) + .has_header(true) + .with_missing_is_null(self.flag_nulls) + .with_comment_char(comment_char) + .with_separator(delim) + .with_infer_schema_length(num_rows) + .with_try_parse_dates(try_parsedates) + .low_memory(low_memory) + .with_ignore_errors(ignore_errors) + .finish()? + }; if let Some(filter_left) = &self.flag_filter_left { let filter_left_expr = polars::sql::sql_expr(filter_left)?; left_lf = left_lf.filter(filter_left_expr); } - let mut right_lf = LazyCsvReader::new(&self.arg_input2) - .has_header(true) - .with_missing_is_null(self.flag_nulls) - .with_comment_char(comment_char) - .with_separator(delim) - .with_infer_schema_length(num_rows) - .with_try_parse_dates(try_parsedates) - .low_memory(low_memory) - .with_ignore_errors(ignore_errors) - .finish()?; + let mut right_lf = { + // check if the right input file is snappy compressed + if input2_path.extension().and_then(std::ffi::OsStr::to_str) == Some("sz") { + let decompressed_path = + util::decompress_snappy_file(&input2_path.to_path_buf(), &tmpdir)?; + self.arg_input2 = decompressed_path; + } + + LazyCsvReader::new(&self.arg_input2) + .has_header(true) + .with_missing_is_null(self.flag_nulls) + .with_comment_char(comment_char) + .with_separator(delim) + .with_infer_schema_length(num_rows) + .with_try_parse_dates(try_parsedates) + .low_memory(low_memory) + .with_ignore_errors(ignore_errors) + .finish()? + }; if let Some(filter_right) = &self.flag_filter_right { let filter_right_exprt = polars::sql::sql_expr(filter_right)?; From cc177992abbf09a31c058677b07803e6872b8a28 Mon Sep 17 00:00:00 2001 From: Joel Natividad <1980690+jqnatividad@users.noreply.github.com> Date: Sun, 12 Nov 2023 09:53:51 -0500 Subject: [PATCH 3/4] add snappy tests for sqlp and joinp --- tests/test_joinp.rs | 165 ++++++++++++++++++++++++++++++++++++++++++++ tests/test_sqlp.rs | 37 +++++++++- 2 files changed, 201 insertions(+), 1 deletion(-) diff --git a/tests/test_joinp.rs b/tests/test_joinp.rs index 4f6cf3919..e5d03f7cf 100644 --- a/tests/test_joinp.rs +++ b/tests/test_joinp.rs @@ -41,6 +41,26 @@ macro_rules! joinp_test_comments { }; } +macro_rules! joinp_test_compresssed { + ($name3:ident, $fun:expr) => { + mod $name3 { + use std::process; + + #[allow(unused_imports)] + use super::{make_rows, setup}; + use crate::workdir::Workdir; + + #[test] + fn headers() { + let wrk = setup(stringify!($name3)); + let mut cmd = wrk.command("joinp"); + cmd.args(&["city", "cities.csv.sz", "city", "places.csv.sz"]); + $fun(wrk, cmd); + } + } + }; +} + fn setup(name: &str) -> Workdir { let cities = vec![ svec!["city", "state"], @@ -70,6 +90,22 @@ fn setup(name: &str) -> Workdir { wrk.create("cities.csv", cities); wrk.create("cities_comments.csv", cities_comments); wrk.create("places.csv", places); + + // create snappy compressed versions + let out_file = wrk.path("cities.csv.sz").to_string_lossy().to_string(); + let mut cmd = wrk.command("snappy"); + cmd.arg("compress") + .arg("cities.csv") + .args(["--output", &out_file]); + wrk.assert_success(&mut cmd); + + let out_file = wrk.path("places.csv.sz").to_string_lossy().to_string(); + let mut cmd = wrk.command("snappy"); + cmd.arg("compress") + .arg("places.csv") + .args(["--output", &out_file]); + wrk.assert_success(&mut cmd); + wrk } @@ -113,6 +149,22 @@ joinp_test_comments!( } ); +joinp_test_compresssed!( + joinp_inner_compressed, + |wrk: Workdir, mut cmd: process::Command| { + let got: Vec> = wrk.read_stdout(&mut cmd); + let expected = make_rows( + false, + vec![ + svec!["Boston", "MA", "Logan Airport"], + svec!["Boston", "MA", "Boston Garden"], + svec!["Buffalo", "NY", "Ralph Wilson Stadium"], + ], + ); + assert_eq!(got, expected); + } +); + joinp_test!( joinp_outer_left, |wrk: Workdir, mut cmd: process::Command| { @@ -283,6 +335,37 @@ joinp_test!(joinp_full, |wrk: Workdir, mut cmd: process::Command| { assert!(got == expected1 || got == expected2); }); +joinp_test_compresssed!( + joinp_full_compressed, + |wrk: Workdir, mut cmd: process::Command| { + cmd.arg("--full"); + let got: Vec> = wrk.read_stdout(&mut cmd); + let expected1 = make_rows( + false, + vec![ + svec!["Boston", "MA", "Logan Airport"], + svec!["Boston", "MA", "Boston Garden"], + svec!["Buffalo", "NY", "Ralph Wilson Stadium"], + svec!["Orlando", "", "Disney World"], + svec!["San Francisco", "CA", ""], + svec!["New York", "NY", ""], + ], + ); + let expected2 = make_rows( + false, + vec![ + svec!["Boston", "MA", "Logan Airport"], + svec!["Boston", "MA", "Boston Garden"], + svec!["Buffalo", "NY", "Ralph Wilson Stadium"], + svec!["Orlando", "", "Disney World"], + svec!["New York", "NY", ""], + svec!["San Francisco", "CA", ""], + ], + ); + assert!(got == expected1 || got == expected2); + } +); + joinp_test_comments!( joinp_full_comments, |wrk: Workdir, mut cmd: process::Command| { @@ -386,6 +469,41 @@ fn joinp_cross() { assert_eq!(got, expected); } +#[test] +fn joinp_cross_compress() { + let wrk = Workdir::new("join_cross_compress"); + wrk.create( + "letters.csv", + vec![svec!["h1", "h2"], svec!["a", "b"], svec!["c", "d"]], + ); + wrk.create( + "numbers.csv", + vec![svec!["h3", "h4"], svec!["1", "2"], svec!["3", "4"]], + ); + + let out_file = wrk.path("out.csv.sz").to_string_lossy().to_string(); + + let mut cmd = wrk.command("joinp"); + cmd.arg("--cross") + .args(["letters.csv", "numbers.csv"]) + .args(["--output", &out_file]); + + wrk.assert_success(&mut cmd); + + let mut cmd2 = wrk.command("snappy"); + cmd2.arg("decompress").arg(&out_file); + + let got: Vec> = wrk.read_stdout(&mut cmd2); + let expected = vec![ + svec!["h1", "h2", "h3", "h4"], + svec!["a", "b", "1", "2"], + svec!["a", "b", "3", "4"], + svec!["c", "d", "1", "2"], + svec!["c", "d", "3", "4"], + ]; + assert_eq!(got, expected); +} + #[test] fn joinp_asof_date() { let wrk = Workdir::new("join_asof_date"); @@ -425,6 +543,53 @@ fn joinp_asof_date() { assert_eq!(got, expected); } +#[test] +fn joinp_asof_date_compress() { + let wrk = Workdir::new("join_asof_date_compress"); + wrk.create( + "gdp.csv", + vec![ + svec!["date", "gdp"], + svec!["2016-01-01", "4164"], + svec!["2017-01-01", "4411"], + svec!["2018-01-01", "4566"], + svec!["2019-01-01", "4696"], + ], + ); + wrk.create( + "population.csv", + vec![ + svec!["date", "population"], + svec!["2016-05-12", "82.19"], + svec!["2017-05-12", "82.66"], + svec!["2018-05-12", "83.12"], + svec!["2019-05-12", "83.52"], + ], + ); + + let out_file = wrk.path("out.csv.sz").to_string_lossy().to_string(); + + let mut cmd = wrk.command("joinp"); + cmd.arg("--asof") + .args(["date", "population.csv", "date", "gdp.csv"]) + .args(["--output", &out_file]); + + wrk.assert_success(&mut cmd); + + let mut cmd2 = wrk.command("snappy"); + cmd2.arg("decompress").arg(&out_file); + + let got: Vec> = wrk.read_stdout(&mut cmd2); + let expected = vec![ + svec!["date", "population", "gdp"], + svec!["2016-05-12", "82.19", "4164"], + svec!["2017-05-12", "82.66", "4411"], + svec!["2018-05-12", "83.12", "4566"], + svec!["2019-05-12", "83.52", "4696"], + ]; + assert_eq!(got, expected); +} + #[test] fn joinp_asof_date_comments() { let wrk = Workdir::new("join_asof_date_comments"); diff --git a/tests/test_sqlp.rs b/tests/test_sqlp.rs index a5d8c4b76..7b874ff81 100644 --- a/tests/test_sqlp.rs +++ b/tests/test_sqlp.rs @@ -726,7 +726,6 @@ fn sqlp_boston311_try_parsedates_format() { #[test] fn sqlp_comments() { let wrk = Workdir::new("sqlp_comments"); - // let test_file = wrk.load_test_file("inputcommenttest.csv"); wrk.create( "comments.csv", vec![ @@ -759,6 +758,42 @@ fn sqlp_comments() { assert_eq!(got, expected); } +#[test] +fn sqlp_compress() { + let wrk = Workdir::new("sqlp_compress"); + wrk.create( + "data.csv", + vec![ + svec!["column1", "column2"], + svec!["a", "1"], + svec!["c", "3"], + svec!["e", "5"], + ], + ); + + let out_file = wrk.path("out.csv.sz").to_string_lossy().to_string(); + + let mut cmd = wrk.command("sqlp"); + cmd.arg("data.csv") + .arg("select column1, column2 from data order by column2 desc") + .args(["-o", &out_file]); + + wrk.assert_success(&mut cmd); + + let mut cmd2 = wrk.command("snappy"); + cmd2.arg("decompress").arg(out_file); + + let got: Vec> = wrk.read_stdout(&mut cmd2); + let expected = vec![ + svec!["column1", "column2"], + svec!["e", "5"], + svec!["c", "3"], + svec!["a", "1"], + ]; + + assert_eq!(got, expected); +} + #[test] fn sqlp_boston311_explain() { let wrk = Workdir::new("sqlp_boston311_explain"); From be5986f2a5680a52f32a988c59b16df3ab784bbb Mon Sep 17 00:00:00 2001 From: Joel Natividad <1980690+jqnatividad@users.noreply.github.com> Date: Sun, 12 Nov 2023 10:01:50 -0500 Subject: [PATCH 4/4] clippy:needless_borrow warning: this expression creates a reference which is immediately dereferenced by the compiler --> src/cmd/joinp.rs:511:78 | 511 | util::decompress_snappy_file(&input1_path.to_path_buf(), &tmpdir)?; | ^^^^^^^ help: change this to: `tmpdir` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#needless_borrow = note: `#[warn(clippy::needless_borrow)]` on by default warning: this expression creates a reference which is immediately dereferenced by the compiler --> src/cmd/joinp.rs:536:78 | 536 | util::decompress_snappy_file(&input2_path.to_path_buf(), &tmpdir)?; | ^^^^^^^ help: change this to: `tmpdir` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#needless_borrow --- src/cmd/joinp.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/cmd/joinp.rs b/src/cmd/joinp.rs index 634366039..3b2124d27 100644 --- a/src/cmd/joinp.rs +++ b/src/cmd/joinp.rs @@ -508,7 +508,7 @@ impl Args { // if so, we need to decompress it first if input1_path.extension().and_then(std::ffi::OsStr::to_str) == Some("sz") { let decompressed_path = - util::decompress_snappy_file(&input1_path.to_path_buf(), &tmpdir)?; + util::decompress_snappy_file(&input1_path.to_path_buf(), tmpdir)?; self.arg_input1 = decompressed_path; } @@ -533,7 +533,7 @@ impl Args { // check if the right input file is snappy compressed if input2_path.extension().and_then(std::ffi::OsStr::to_str) == Some("sz") { let decompressed_path = - util::decompress_snappy_file(&input2_path.to_path_buf(), &tmpdir)?; + util::decompress_snappy_file(&input2_path.to_path_buf(), tmpdir)?; self.arg_input2 = decompressed_path; }