diff --git a/src/cmd/applydp.rs b/src/cmd/applydp.rs index 410c9fd20..40fbfb760 100644 --- a/src/cmd/applydp.rs +++ b/src/cmd/applydp.rs @@ -171,7 +171,7 @@ applydp options: -j, --jobs The number of jobs to run in parallel. When not set, the number of jobs is set to the number of CPUs detected. -b, --batch The number of rows per batch to load into memory, before running in parallel. - [default: 50000] + Set to 0 to load all rows at once. [default: 50000] Common options: -h, --help Display this message @@ -241,7 +241,7 @@ struct Args { flag_comparand: String, flag_replacement: String, flag_formatstr: String, - flag_batch: u32, + flag_batch: usize, flag_jobs: Option, flag_new_column: Option, flag_output: Option, @@ -377,7 +377,11 @@ pub fn run(argv: &[&str]) -> CliResult<()> { let mut batch_record = csv::StringRecord::new(); // reuse batch buffers - let batchsize: usize = args.flag_batch as usize; + let batchsize: usize = if args.flag_batch == 0 { + util::count_rows(&rconfig)? as usize + } else { + args.flag_batch + }; let mut batch = Vec::with_capacity(batchsize); let mut batch_results = Vec::with_capacity(batchsize); diff --git a/src/cmd/datefmt.rs b/src/cmd/datefmt.rs index 70d9501c1..b846a3c43 100644 --- a/src/cmd/datefmt.rs +++ b/src/cmd/datefmt.rs @@ -84,6 +84,7 @@ datefmt options: -j, --jobs The number of jobs to run in parallel. When not set, the number of jobs is set to the number of CPUs detected. -b, --batch The number of rows per batch to load into memory, before running in parallel. + Set to 0 to load all rows at once. [default: 50000] Common options: @@ -132,7 +133,7 @@ struct Args { flag_default_tz: Option, flag_utc: bool, flag_zulu: bool, - flag_batch: u32, + flag_batch: usize, flag_jobs: Option, flag_new_column: Option, flag_output: Option, @@ -252,7 +253,11 @@ pub fn run(argv: &[&str]) -> CliResult<()> { let mut batch_record = csv::StringRecord::new(); // reuse batch buffers - let batchsize: usize = args.flag_batch as usize; + let batchsize: usize = if args.flag_batch == 0 { + util::count_rows(&rconfig)? as usize + } else { + args.flag_batch + }; let mut batch = Vec::with_capacity(batchsize); let mut batch_results = Vec::with_capacity(batchsize); diff --git a/src/cmd/geocode.rs b/src/cmd/geocode.rs index 0b6a2c901..35be846fc 100644 --- a/src/cmd/geocode.rs +++ b/src/cmd/geocode.rs @@ -322,6 +322,7 @@ geocode options: -j, --jobs The number of jobs to run in parallel. When not set, the number of jobs is set to the number of CPUs detected. -b, --batch The number of rows per batch to load into memory, before running in parallel. + Set to 0 to load all rows at once. [default: 50000] --timeout Timeout for downloading Geonames cities index. [default: 120] @@ -424,7 +425,7 @@ struct Args { flag_formatstr: String, flag_language: String, flag_invalid_result: Option, - flag_batch: u32, + flag_batch: usize, flag_timeout: u16, flag_cache_dir: String, flag_languages: String, @@ -1102,7 +1103,11 @@ async fn geocode_main(args: Args) -> CliResult<()> { let mut batch_record = csv::StringRecord::new(); // reuse batch buffers - let batchsize: usize = args.flag_batch as usize; + let batchsize: usize = if args.flag_batch == 0 { + util::count_rows(&rconfig)? as usize + } else { + args.flag_batch + }; let mut batch = Vec::with_capacity(batchsize); let mut batch_results = Vec::with_capacity(batchsize); diff --git a/src/cmd/jsonl.rs b/src/cmd/jsonl.rs index 167c2ecfe..3c5ecc539 100644 --- a/src/cmd/jsonl.rs +++ b/src/cmd/jsonl.rs @@ -20,7 +20,8 @@ jsonl options: When not set, the number of jobs is set to the number of CPUs detected. -b, --batch The number of rows per batch to load into memory, - before running in parallel. [default: 50000] + before running in parallel. Set to 0 to load all + rows at once. [default: 50000] Common options: -h, --help Display this message @@ -53,7 +54,7 @@ struct Args { flag_delimiter: Option, flag_ignore_errors: bool, flag_jobs: Option, - flag_batch: u32, + flag_batch: usize, } fn recurse_to_infer_headers(value: &Value, headers: &mut Vec>, path: &[String]) { @@ -152,8 +153,12 @@ pub fn run(argv: &[&str]) -> CliResult<()> { .delimiter(args.flag_delimiter) .writer()?; + let mut is_stdin = false; let mut rdr: Box = match args.arg_input { - None => Box::new(BufReader::new(io::stdin())), + None => { + is_stdin = true; + Box::new(BufReader::new(io::stdin())) + }, Some(p) => Box::new(BufReader::with_capacity( DEFAULT_RDR_BUFFER_CAPACITY, fs::File::open(p)?, @@ -167,7 +172,18 @@ pub fn run(argv: &[&str]) -> CliResult<()> { let mut batch_line = String::new(); // reuse batch buffers - let batchsize: usize = args.flag_batch as usize; + let batchsize: usize = if args.flag_batch == 0 { + if is_stdin { + // if stdin, we don't know how many lines there are + // so just make a reasonably big batch size + 1_000_000 + } else { + // safety: we know flag_output is Some coz of the std_in check above + util::count_lines_in_file(&args.flag_output.unwrap())? as usize + } + } else { + args.flag_batch + }; let mut batch = Vec::with_capacity(batchsize); let mut batch_results = Vec::with_capacity(batchsize); diff --git a/src/cmd/tojsonl.rs b/src/cmd/tojsonl.rs index b4d2c7848..32f2e42af 100644 --- a/src/cmd/tojsonl.rs +++ b/src/cmd/tojsonl.rs @@ -27,7 +27,8 @@ Tojsonl options: When not set, the number of jobs is set to the number of CPUs detected. -b, --batch The number of rows per batch to load into memory, - before running in parallel. [default: 50000] + before running in parallel. Set to 0 to load all + rows at once. [default: 50000] Common options: -h, --help Display this message @@ -60,7 +61,7 @@ struct Args { flag_trim: bool, flag_no_boolean: bool, flag_jobs: Option, - flag_batch: u32, + flag_batch: usize, flag_delimiter: Option, flag_output: Option, flag_memcheck: bool, @@ -255,7 +256,11 @@ pub fn run(argv: &[&str]) -> CliResult<()> { let mut batch_record = csv::StringRecord::new(); // reuse batch buffers - let batchsize: usize = args.flag_batch as usize; + let batchsize: usize = if args.flag_batch == 0 { + record_count as usize + } else { + args.flag_batch + }; let mut batch = Vec::with_capacity(batchsize); let mut batch_results = Vec::with_capacity(batchsize); diff --git a/src/cmd/validate.rs b/src/cmd/validate.rs index c8cea46f7..dba8b99c9 100644 --- a/src/cmd/validate.rs +++ b/src/cmd/validate.rs @@ -89,7 +89,7 @@ Validate options: When not set, the number of jobs is set to the number of CPUs detected. -b, --batch The number of rows per batch to load into memory, - before running in parallel. + before running in parallel. Set to 0 to load all rows at once. [default: 50000] --timeout Timeout for downloading json-schemas on URLs. [default: 30] @@ -158,7 +158,7 @@ struct Args { flag_pretty_json: bool, flag_valid_output: Option, flag_jobs: Option, - flag_batch: u32, + flag_batch: usize, flag_no_headers: bool, flag_delimiter: Option, flag_progressbar: bool, @@ -534,7 +534,11 @@ pub fn run(argv: &[&str]) -> CliResult<()> { // amortize memory allocation by reusing record let mut record = csv::ByteRecord::new(); // reuse batch buffer - let batch_size = args.flag_batch as usize; + let batch_size = if args.flag_batch == 0 { + util::count_rows(&rconfig)? as usize + } else { + args.flag_batch + }; let mut batch = Vec::with_capacity(batch_size); let mut validation_results = Vec::with_capacity(batch_size); let mut valid_flags: Vec = Vec::with_capacity(batch_size); diff --git a/src/util.rs b/src/util.rs index a2e48e0b7..6650a80af 100644 --- a/src/util.rs +++ b/src/util.rs @@ -4,7 +4,7 @@ use std::{ cmp::min, env, fs, fs::File, - io::{BufReader, BufWriter, Read, Write}, + io::{BufRead, BufReader, BufWriter, Read, Write}, path::{Path, PathBuf}, str, sync::OnceLock, @@ -372,6 +372,14 @@ pub fn count_rows_regular(conf: &Config) -> Result { } } +pub fn count_lines_in_file(file: &str) -> Result { + let file = File::open(file)?; + let reader = BufReader::new(file); + + let line_count = reader.lines().count() as u64; + Ok(line_count) +} + #[cfg(any(feature = "feature_capable", feature = "lite"))] pub fn prep_progress(progress: &ProgressBar, record_count: u64) { progress.set_style(