Merge pull request #2256 from jqnatividad/2225-sqlp-derive-polars-schema-from-stats

`sqlp`: derive polars schema from stats cache
jqnatividad authored Oct 27, 2024
2 parents 86fe22e + 382d4c7 commit 2d70a22
Showing 3 changed files with 141 additions and 44 deletions.
4 changes: 2 additions & 2 deletions README.md
@@ -56,7 +56,7 @@
| [index](/src/cmd/index.rs#L2) | Create an index (📇) for a CSV. This is very quick (even the 15gb, 28m row NYC 311 dataset takes all of 14 seconds to index) & provides constant time indexing/random access into the CSV. With an index, `count`, `sample` & `slice` work instantaneously; random access mode is enabled in `luau`; and multithreading (🏎️) is enabled for the `frequency`, `split`, `stats`, `schema` & `tojsonl` commands. |
| [input](/src/cmd/input.rs#L2) | Read CSV data with special commenting, quoting, trimming, line-skipping & non-UTF8 encoding handling rules. Typically used to "normalize" a CSV for further processing with other qsv commands. |
| [join](/src/cmd/join.rs#L2)<br>👆 | Inner, outer, right, cross, anti & semi joins. Automatically creates a simple, in-memory hash index to make it fast. |
| [joinp](/src/cmd/joinp.rs#L2)<br>✨🚀🐻‍❄️ | Inner, outer, right, cross, anti, semi & asof joins using the [Pola.rs](https://www.pola.rs) engine. Unlike the `join` command, `joinp` can process files larger than RAM, is multithreaded, has join key validation and pre-join filtering, and supports [asof joins](https://pola-rs.github.io/polars/py-polars/html/reference/dataframe/api/polars.DataFrame.join_asof.html) (which are [particularly useful for time series data](https://github.com/jqnatividad/qsv/blob/30cc920d0812a854fcbfedc5db81788a0600c92b/tests/test_joinp.rs#L509-L983)) & its output columns can be coalesced. However, `joinp` doesn't have an --ignore-case option. |
| [joinp](/src/cmd/joinp.rs#L2)<br>✨🚀🐻‍❄️🪄 | Inner, outer, right, cross, anti, semi & asof joins using the [Pola.rs](https://www.pola.rs) engine. Unlike the `join` command, `joinp` can process files larger than RAM, is multithreaded, has join key validation and pre-join filtering, and supports [asof joins](https://pola-rs.github.io/polars/py-polars/html/reference/dataframe/api/polars.DataFrame.join_asof.html) (which are [particularly useful for time series data](https://github.com/jqnatividad/qsv/blob/30cc920d0812a854fcbfedc5db81788a0600c92b/tests/test_joinp.rs#L509-L983)) & its output columns can be coalesced. However, `joinp` doesn't have an --ignore-case option. |
| [json](/src/cmd/json.rs#L2)<br>👆 | Convert JSON to CSV.
| [jsonl](/src/cmd/jsonl.rs#L2)<br>🚀🔣 | Convert newline-delimited JSON ([JSONL](https://jsonlines.org/)/[NDJSON](http://ndjson.org/)) to CSV. See `tojsonl` command to convert CSV to JSONL.
| [lens](/src/cmd/lens.rs#L2) | Interactively view, search & filter a CSV using the [csvlens](https://github.com/YS-L/csvlens#csvlens) engine.
@@ -81,7 +81,7 @@
| [sort](/src/cmd/sort.rs#L2)<br>🚀🤯👆 | Sorts CSV data in alphabetical (with case-insensitive option), numerical, reverse, unique or random (with optional seed) order (See also `extsort` & `sortcheck` commands). |
| [sortcheck](/src/cmd/sortcheck.rs#L2)<br>📇👆 | Check if a CSV is sorted. With the --json options, also retrieve record count, sort breaks & duplicate count. |
| [split](/src/cmd/split.rs#L2)<br>📇🏎️ | Split one CSV file into many CSV files. It can split by number of rows, number of chunks or file size. Uses multithreading to go faster if an index is present when splitting by rows or chunks. |
| [sqlp](/src/cmd/sqlp.rs#L2)<br>✨🚀🐻‍❄️🗄️ | Run [Polars](https://pola.rs) SQL queries against several CSVs, converting queries to blazing-fast [LazyFrame](https://docs.pola.rs/user-guide/lazy/using/) expressions that can process larger-than-memory CSV files. Query results can be saved in CSV, JSON, JSONL, Parquet, Apache Arrow IPC and Apache Avro formats. |
| [sqlp](/src/cmd/sqlp.rs#L2)<br>✨🚀🐻‍❄️🗄️🪄 | Run [Polars](https://pola.rs) SQL queries against several CSVs, converting queries to blazing-fast [LazyFrame](https://docs.pola.rs/user-guide/lazy/using/) expressions that can process larger-than-memory CSV files. Query results can be saved in CSV, JSON, JSONL, Parquet, Apache Arrow IPC and Apache Avro formats. |
| [stats](/src/cmd/stats.rs#L2)<br>📇🤯🏎️👆🪄 | Compute [summary statistics](https://en.wikipedia.org/wiki/Summary_statistics) (sum, min/max/range, sort order, min/max/sum/avg length, mean, standard error of the mean (SEM), stddev, variance, Coefficient of Variation (CV), nullcount, max precision, sparsity, quartiles, Interquartile Range (IQR), lower/upper fences, skewness, median, mode/s, antimode/s & cardinality) & make GUARANTEED data type inferences (Null, String, Float, Integer, Date, DateTime, Boolean) for each column in a CSV ([more info](https://github.com/jqnatividad/qsv/wiki/Supplemental#stats-command-output-explanation)).<br>Uses multithreading to go faster if an index is present (with an index, can compile "streaming" stats on NYC's 311 data (15gb, 28m rows) in less than 7.3 seconds!). |
| [table](/src/cmd/table.rs#L2)<br>🤯 | Show aligned output of a CSV using [elastic tabstops](https://github.com/BurntSushi/tabwriter). To interactively view a CSV, use the `lens` command. |
| [to](/src/cmd/to.rs#L2)<br>✨🚀🗄️ | Convert CSV files to [PostgreSQL](https://www.postgresql.org), [SQLite](https://www.sqlite.org/index.html), XLSX and [Data Package](https://datahub.io/docs/data-packages/tabular). |
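
The 🪄 marker added to the `joinp` and `sqlp` rows flags commands that can leverage the stats cache, which `stats` itself maintains. As an illustration (file name hypothetical; `sqlp` registers each input CSV as a table named after its file stem): running `qsv stats data.csv --stats-jsonl` first leaves a stats cache behind, after which `qsv sqlp data.csv "SELECT * FROM data" --cache-schema` can derive and save `data.pschema.json` from that cache instead of sampling rows with `--infer-len`.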
99 changes: 88 additions & 11 deletions src/cmd/sqlp.rs
@@ -188,9 +188,12 @@ sqlp options:
Set to 0 to do a full table scan (warning: can be slow).
[default: 10000]
--cache-schema Create and cache Polars schema JSON files.
If specified and the schema file/s do not exist, it will save the
inferred schemas in JSON format. Each schema file will have the same
file stem as the corresponding input file, with the extension ".pschema.json"
If specified and the schema file/s do not exist, it will check if a
stats cache is available. If so, it will use it to derive a Polars schema
and save it. If there's no stats cache, it will infer the schema
using --infer-len and save the inferred schemas.
Each schema file will have the same file stem as the corresponding
input file, with the extension ".pschema.json"
(data.csv's Polars schema file will be data.pschema.json)
If the file/s exists, it will load the schema instead of inferring it
(ignoring --infer-len) and attempt to use it for each corresponding
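
As a rough sketch of the lookup rule this help text describes (a hedged illustration, not the actual qsv code; `data.csv` is a hypothetical input):

```rust
use std::{io, path::Path};

// Sketch: data.csv pairs with data.pschema.json, and the cached schema
// is only trusted when it is newer than the CSV it describes.
fn cached_schema_is_fresh(table: &Path) -> io::Result<bool> {
    let schema_file = table.with_extension("pschema.json");
    Ok(schema_file.exists()
        && schema_file.metadata()?.modified()? > table.metadata()?.modified()?)
}

fn main() -> io::Result<()> {
    println!("{}", cached_schema_is_fresh(Path::new("data.csv"))?);
    Ok(())
}
```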
@@ -288,7 +291,7 @@ use crate::{
cmd::joinp::tsvssv_delim,
config::{Config, Delimiter, DEFAULT_WTR_BUFFER_CAPACITY},
util,
util::process_input,
util::{get_stats_records, process_input},
CliResult,
};

@@ -770,13 +773,86 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
.with_decimal_comma(args.flag_decimal_comma)
.with_low_memory(args.flag_low_memory);

// --cache-schema is enabled, check if a valid pschema.json file exists for this
// table
let schema_file = table.canonicalize()?.with_extension("pschema.json");
if schema_file.exists()
&& schema_file.metadata()?.modified()? > table.metadata()?.modified()?
{
// We have a valid pschema.json file - it exists and is newer than the table

// check if the pschema.json file exists and is newer than the table file
let mut valid_schema_exists = schema_file.exists()
&& schema_file.metadata()?.modified()? > table.metadata()?.modified()?;

if !valid_schema_exists {
// we don't have a valid pschema.json file,
// check if we have stats, as we can derive pschema.json file from it
let schema_args = util::SchemaArgs {
flag_enum_threshold: 0,
flag_ignore_case: false,
flag_strict_dates: false,
// we still get all the stats columns so we can use the stats cache
flag_pattern_columns: crate::select::SelectColumns::parse("").unwrap(),
flag_dates_whitelist: String::new(),
flag_prefer_dmy: false,
flag_force: false,
flag_stdout: false,
flag_jobs: Some(util::njobs(None)),
flag_no_headers: false,
flag_delimiter: args.flag_delimiter,
arg_input: Some(table.to_string_lossy().into_owned()),
flag_memcheck: false,
};
let (csv_fields, csv_stats) =
get_stats_records(&schema_args, util::StatsMode::PolarsSchema)?;

let mut schema = Schema::with_capacity(csv_stats.len());
for (idx, stat) in csv_stats.iter().enumerate() {
schema.insert(
PlSmallStr::from_str(
simdutf8::basic::from_utf8(csv_fields.get(idx).unwrap()).unwrap(),
),
{
let datatype = &stat.r#type;
#[allow(clippy::match_same_arms)]
match datatype.as_str() {
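// Map the stats-inferred type to a Polars dtype, preferring the
// narrower 32-bit numeric types whenever the column's min/max fit.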
"String" => polars::datatypes::DataType::String,
"Integer" => {
let min = stat.min.as_ref().unwrap();
let max = stat.max.as_ref().unwrap();
if min.parse::<i32>().is_ok() && max.parse::<i32>().is_ok()
{
polars::datatypes::DataType::Int32
} else {
polars::datatypes::DataType::Int64
}
},
"Float" => {
let min = stat.min.as_ref().unwrap();
let max = stat.max.as_ref().unwrap();
if min.parse::<f32>().is_ok() && max.parse::<f32>().is_ok()
{
polars::datatypes::DataType::Float32
} else {
polars::datatypes::DataType::Float64
}
},
"Boolean" => polars::datatypes::DataType::Boolean,
"Date" => polars::datatypes::DataType::Date,
_ => polars::datatypes::DataType::String,
}
},
);
}
let stats_schema = Arc::new(schema);
let stats_schema_json = serde_json::to_string_pretty(&stats_schema)?;

let mut file = BufWriter::new(File::create(&schema_file)?);
file.write_all(stats_schema_json.as_bytes())?;
file.flush()?;
if debuglog_flag {
log::debug!("Saved stats_schema to file: {}", schema_file.display());
}
valid_schema_exists = true;
}

if valid_schema_exists {
// We have a valid pschema.json file!
// load the schema and deserialize it and use it with the lazy frame
let file = File::open(&schema_file)?;
let mut buf_reader = BufReader::new(file);
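
(Note the convergence here: once a schema has been derived from the stats cache and written out, `valid_schema_exists` is set to `true`, so the same load-and-deserialize branch handles both preexisting and freshly derived `.pschema.json` files.)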
@@ -813,10 +889,11 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
ctx.register(table_name, lf.clone().with_optimizations(optflags));

// the lazy frame's schema has been updated and --cache-schema is enabled
// update the pschema.json file
// update the pschema.json file, if necessary
if create_schema {
let schema = lf.collect_schema()?;
let schema_json = serde_json::to_string_pretty(&schema)?;

let schema_file = table.canonicalize()?.with_extension("pschema.json");
let mut file = BufWriter::new(File::create(&schema_file)?);
file.write_all(schema_json.as_bytes())?;
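
(When `create_schema` is set, the schema collected from the LazyFrame is serialized back to the same `.pschema.json` path after the table is registered, keeping the cached file in sync with what Polars actually resolved.)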
Expand Down
82 changes: 51 additions & 31 deletions src/util.rs
@@ -58,6 +58,7 @@ pub enum StatsMode {
Schema,
Frequency,
FrequencyForceStats,
PolarsSchema,
None,
}
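
(`PolarsSchema` is the variant the new `sqlp` code path passes to `get_stats_records`, selecting the `--infer-boolean` stats invocation added further down in this file.)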

@@ -1995,6 +1996,7 @@ pub fn get_stats_records(
stats_data_loaded = true;
}

// otherwise, run stats command to generate stats.csv.data.jsonl file
if !stats_data_loaded {
let stats_args = crate::cmd::stats::Args {
arg_input: args.arg_input.clone(),
@@ -2022,7 +2024,6 @@
flag_memcheck: args.flag_memcheck,
};

// otherwise, run stats command to generate stats.csv.data.jsonl file
let tempfile = tempfile::Builder::new()
.suffix(".stats.csv")
.tempfile()
@@ -2031,36 +2032,55 @@

let statsdatajson_path = canonical_input_path.with_extension("stats.csv.data.jsonl");

let mut stats_args_str = if mode == StatsMode::Schema {
// mode is GetStatsMode::Schema
// we're generating schema, so we need cardinality and to infer-dates
format!(
"stats {input} --infer-dates --dates-whitelist {dates_whitelist} --round 4 \
--cardinality --stats-jsonl --force --output {output}",
input = {
if let Some(arg_input) = stats_args.arg_input.clone() {
arg_input
} else {
"-".to_string()
}
},
dates_whitelist = stats_args.flag_dates_whitelist,
output = tempfile_path,
)
} else {
// mode is GetStatsMode::Frequency or GetStatsMode::FrequencyForceStats
// we're doing frequency, so we just need cardinality
format!(
"stats {input} --cardinality --stats-jsonl --output {output}",
input = {
if let Some(arg_input) = stats_args.arg_input.clone() {
arg_input
} else {
"-".to_string()
}
},
output = tempfile_path,
)
let mut stats_args_str = match mode {
StatsMode::Schema => {
// mode is StatsMode::Schema
// we're generating schema, so we need cardinality and to infer-dates
format!(
"stats {input} --infer-dates --dates-whitelist {dates_whitelist} --round 4 \
--cardinality --stats-jsonl --force --output {output}",
input = {
if let Some(arg_input) = stats_args.arg_input.clone() {
arg_input
} else {
"-".to_string()
}
},
dates_whitelist = stats_args.flag_dates_whitelist,
output = tempfile_path,
)
},
StatsMode::Frequency | StatsMode::FrequencyForceStats => {
// StatsMode::Frequency or StatsMode::FrequencyForceStats
// we're doing frequency, so we just need cardinality
format!(
"stats {input} --cardinality --stats-jsonl --output {output}",
input = {
if let Some(arg_input) = stats_args.arg_input.clone() {
arg_input
} else {
"-".to_string()
}
},
output = tempfile_path,
)
},
StatsMode::PolarsSchema => {
// StatsMode::PolarsSchema
// we need data types and ranges
format!(
"stats {input} --infer-boolean --stats-jsonl --output {output}",
input = {
if let Some(arg_input) = stats_args.arg_input.clone() {
arg_input
} else {
"-".to_string()
}
},
output = tempfile_path,
)
},
StatsMode::None => unreachable!(), // we returned early on None earlier
};
if args.flag_prefer_dmy {
stats_args_str = format!("{stats_args_str} --prefer-dmy");
