Merge pull request #1413 from jqnatividad/joinp_snappy_support
`joinp`: add snappy compression/decompression support
jqnatividad authored Nov 12, 2023
2 parents 87cded8 + be5986f commit 0b8aee4
Showing 4 changed files with 279 additions and 34 deletions.
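
In short, this merge teaches `joinp` to handle snappy-compressed files: inputs ending in `.sz` are decompressed into a per-run temporary directory before they reach Polars' `LazyCsvReader`, and an output path ending in `.sz` is snappy-compressed after the join is written, reusing a `compress_output_if_needed` helper factored out of `sqlp`. An invocation such as `qsv joinp id left.csv.sz id right.csv.sz --output joined.csv.sz` (hypothetical file names) should therefore work end to end. As a rough illustration of the input-side step, here is a minimal, self-contained sketch using the `snap` crate's frame format and `tempfile`; the real `util::decompress_snappy_file` helper is not part of this diff, so its exact signature and behavior are assumptions.

```rust
// Minimal sketch of the ".sz" detection and decompress-to-tempdir step.
// Assumes snappy *frame* format via the `snap` crate; the actual
// util::decompress_snappy_file used in the diff below is not shown there.
use std::{ffi::OsStr, fs::File, io, path::{Path, PathBuf}};

fn decompress_if_snappy(input: &Path, tmpdir: &tempfile::TempDir) -> io::Result<PathBuf> {
    if input.extension().and_then(OsStr::to_str) != Some("sz") {
        // not compressed, use the file as-is
        return Ok(input.to_path_buf());
    }
    // "left.csv.sz" -> "<tmpdir>/left.csv"
    let stem = input.file_stem().unwrap_or_else(|| OsStr::new("decompressed"));
    let out_path = tmpdir.path().join(stem);

    let mut reader = snap::read::FrameDecoder::new(File::open(input)?);
    let mut writer = File::create(&out_path)?;
    io::copy(&mut reader, &mut writer)?;
    Ok(out_path)
}
```

In the diff, the decompressed path simply replaces `arg_input1`/`arg_input2`, so everything downstream of the reader construction is untouched.
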
84 changes: 60 additions & 24 deletions src/cmd/joinp.rs
@@ -16,7 +16,7 @@ For examples, see https://github.com/jqnatividad/qsv/blob/master/tests/test_join
Usage:
qsv joinp [options] <columns1> <input1> <columns2> <input2>
qsv joinp --cross <input1> <input2>
qsv joinp --cross <input1> <input2> [--output <file>]
qsv joinp --help
joinp arguments:
@@ -185,8 +185,9 @@ use polars::{
};
use serde::Deserialize;
use smartstring;
use tempfile::tempdir;

use crate::{config::Delimiter, util, CliError, CliResult};
use crate::{cmd::sqlp::compress_output_if_needed, config::Delimiter, util, CliError, CliResult};

#[derive(Deserialize)]
struct Args {
@@ -240,11 +241,14 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
if args.flag_asof {
args.flag_try_parsedates = true;
}

let tmpdir = tempdir()?;
let join = args.new_join(
args.flag_try_parsedates,
args.flag_infer_len,
args.flag_low_memory,
args.flag_ignore_errors,
&tmpdir,
)?;

// safety: flag_validate is always is_some() as it has a default value
@@ -436,7 +440,7 @@ impl JoinStruct {

// no need to use buffered writer here, as CsvWriter already does that
let mut out_writer = match self.output {
Some(output_file) => {
Some(ref output_file) => {
let path = Path::new(&output_file);
Box::new(File::create(path).unwrap()) as Box<dyn Write>
},
@@ -456,17 +460,20 @@ impl JoinStruct {
.with_null_value(self.null_value)
.finish(&mut results_df)?;

compress_output_if_needed(self.output)?;

Ok(join_shape)
}
}

impl Args {
fn new_join(
&self,
&mut self,
try_parsedates: bool,
infer_len: usize,
low_memory: bool,
ignore_errors: bool,
tmpdir: &tempfile::TempDir,
) -> CliResult<JoinStruct> {
let delim = if let Some(delimiter) = self.flag_delimiter {
delimiter.as_byte()
@@ -486,32 +493,61 @@ impl Args {
Some(infer_len)
};

let mut left_lf = LazyCsvReader::new(&self.arg_input1)
.has_header(true)
.with_missing_is_null(self.flag_nulls)
.with_comment_char(comment_char)
.with_separator(delim)
.with_infer_schema_length(num_rows)
.with_try_parse_dates(try_parsedates)
.low_memory(low_memory)
.with_ignore_errors(ignore_errors)
.finish()?;
// check if the input files exist
let input1_path = Path::new(&self.arg_input1);
if !input1_path.exists() {
return fail_clierror!("Input file {} does not exist.", self.arg_input1);
}
let input2_path = Path::new(&self.arg_input2);
if !input2_path.exists() {
return fail_clierror!("Input file {} does not exist.", self.arg_input2);
}

let mut left_lf = {
// check if the left input file is snappy compressed
// if so, we need to decompress it first
if input1_path.extension().and_then(std::ffi::OsStr::to_str) == Some("sz") {
let decompressed_path =
util::decompress_snappy_file(&input1_path.to_path_buf(), tmpdir)?;
self.arg_input1 = decompressed_path;
}

LazyCsvReader::new(&self.arg_input1)
.has_header(true)
.with_missing_is_null(self.flag_nulls)
.with_comment_char(comment_char)
.with_separator(delim)
.with_infer_schema_length(num_rows)
.with_try_parse_dates(try_parsedates)
.low_memory(low_memory)
.with_ignore_errors(ignore_errors)
.finish()?
};

if let Some(filter_left) = &self.flag_filter_left {
let filter_left_expr = polars::sql::sql_expr(filter_left)?;
left_lf = left_lf.filter(filter_left_expr);
}

let mut right_lf = LazyCsvReader::new(&self.arg_input2)
.has_header(true)
.with_missing_is_null(self.flag_nulls)
.with_comment_char(comment_char)
.with_separator(delim)
.with_infer_schema_length(num_rows)
.with_try_parse_dates(try_parsedates)
.low_memory(low_memory)
.with_ignore_errors(ignore_errors)
.finish()?;
let mut right_lf = {
// check if the right input file is snappy compressed
if input2_path.extension().and_then(std::ffi::OsStr::to_str) == Some("sz") {
let decompressed_path =
util::decompress_snappy_file(&input2_path.to_path_buf(), tmpdir)?;
self.arg_input2 = decompressed_path;
}

LazyCsvReader::new(&self.arg_input2)
.has_header(true)
.with_missing_is_null(self.flag_nulls)
.with_comment_char(comment_char)
.with_separator(delim)
.with_infer_schema_length(num_rows)
.with_try_parse_dates(try_parsedates)
.low_memory(low_memory)
.with_ignore_errors(ignore_errors)
.finish()?
};

if let Some(filter_right) = &self.flag_filter_right {
let filter_right_exprt = polars::sql::sql_expr(filter_right)?;
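
Two smaller details in `JoinStruct::run` above are worth noting. First, the write path now ends with `compress_output_if_needed(self.output)?`, so a `.sz` output requested on the command line is compressed after the CSV is written. Second, the match arm changes from `Some(output_file)` to `Some(ref output_file)` precisely because of that new call: the output path has to survive the match so it can be handed to the compression helper afterwards. The sketch below (hypothetical function and file names, not qsv code) shows the same borrow-then-consume pattern in isolation.

```rust
use std::io::Write;

// Borrow the output path while choosing a writer, then consume it afterwards.
// Binding by value in the match would move the String out of the Option and
// make the later use impossible -- hence `Some(ref ...)` in the diff above.
fn write_then_maybe_compress(output: Option<String>) -> std::io::Result<()> {
    let mut writer: Box<dyn Write> = match output {
        Some(ref path) => Box::new(std::fs::File::create(path)?), // borrows
        None => Box::new(std::io::stdout()),
    };
    writer.write_all(b"a,b\n1,2\n")?;
    drop(writer); // close the file before any recompression step

    // `output` is still owned here and can be passed by value, mirroring
    // compress_output_if_needed(self.output)? in joinp.rs.
    if let Some(path) = output {
        if path.ends_with(".sz") {
            println!("would snappy-compress {path} in place");
        }
    }
    Ok(())
}
```
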
27 changes: 18 additions & 9 deletions src/cmd/sqlp.rs
@@ -197,7 +197,6 @@ use serde::Deserialize;
use tempfile;

use crate::{
cmd::snappy::compress,
config::{Delimiter, DEFAULT_WTR_BUFFER_CAPACITY},
util,
util::process_input,
@@ -549,8 +548,23 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
}
}

if let Some(output) = args.flag_output {
// if the output ends with ".sz", we snappy compress the output
compress_output_if_needed(args.flag_output)?;

if !args.flag_quiet {
eprintln!("{query_result_shape:?}");
}

Ok(())
}

/// if the output ends with ".sz", we snappy compress the output
/// and replace the original output with the compressed output
pub fn compress_output_if_needed(
output_file: Option<String>,
) -> Result<(), crate::clitypes::CliError> {
use crate::cmd::snappy::compress;

if let Some(output) = output_file {
if std::path::Path::new(&output)
.extension()
.map_or(false, |ext| ext.eq_ignore_ascii_case("sz"))
@@ -575,11 +589,6 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
DEFAULT_WTR_BUFFER_CAPACITY,
)?;
}
}

if !args.flag_quiet {
eprintln!("{query_result_shape:?}");
}

};
Ok(())
}
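
Factoring this block out of `run` into a public `compress_output_if_needed` is what lets `joinp` share it via the new `use crate::cmd::sqlp::compress_output_if_needed` import above, instead of `sqlp` importing `cmd::snappy::compress` at module level. The compression itself is delegated to `cmd::snappy::compress`, which is not shown in this diff; as a rough illustration only, a snappy frame compression of a finished file might look like the following (buffer sizes, error handling, and the temp-file/rename handling of the real helper are omitted).

```rust
use std::{fs::File, io::{self, Write}};

// Illustration of the kind of work compress_output_if_needed delegates to:
// stream a finished CSV through a snappy frame encoder into the ".sz" target.
// This is not the real cmd::snappy::compress.
fn snappy_compress_file(src: &str, dst_sz: &str) -> io::Result<()> {
    let mut reader = File::open(src)?;
    let mut writer = snap::write::FrameEncoder::new(File::create(dst_sz)?);
    io::copy(&mut reader, &mut writer)?;
    writer.flush()?; // finish the frame stream
    Ok(())
}
```
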
(2 more changed files not shown.)
