Merge pull request #1928 from jqnatividad/batch-size_zero
setting `--batch` to 0 loads all rows at once before parallel processing
jqnatividad authored Jun 29, 2024
2 parents 99b1715 + d8a22f6 commit 827ea3f
Showing 7 changed files with 65 additions and 18 deletions.
10 changes: 7 additions & 3 deletions src/cmd/applydp.rs
@@ -171,7 +171,7 @@ applydp options:
-j, --jobs <arg> The number of jobs to run in parallel.
When not set, the number of jobs is set to the number of CPUs detected.
-b, --batch <size> The number of rows per batch to load into memory, before running in parallel.
-[default: 50000]
+Set to 0 to load all rows at once. [default: 50000]
Common options:
-h, --help Display this message
@@ -241,7 +241,7 @@ struct Args {
flag_comparand: String,
flag_replacement: String,
flag_formatstr: String,
-flag_batch: u32,
+flag_batch: usize,
flag_jobs: Option<usize>,
flag_new_column: Option<String>,
flag_output: Option<String>,
@@ -377,7 +377,11 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
let mut batch_record = csv::StringRecord::new();

// reuse batch buffers
-let batchsize: usize = args.flag_batch as usize;
+let batchsize: usize = if args.flag_batch == 0 {
+    util::count_rows(&rconfig)? as usize
+} else {
+    args.flag_batch
+};
let mut batch = Vec::with_capacity(batchsize);
let mut batch_results = Vec::with_capacity(batchsize);

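The remaining commands below (datefmt, geocode, jsonl, tojsonl, validate) repeat the same sizing change, so the surrounding mechanism is only sketched once here: a minimal sketch, assuming rayon for the parallel step, of the read-a-batch-then-process-in-parallel loop that the `batch` and `batch_results` buffers feed. `run_batched`, `process_row`, and the error type are illustrative stand-ins, not qsv's actual code.

use rayon::prelude::*;

// Illustrative stand-in for the real per-command transform (apply a date
// format, geocode a column, validate a record, ...).
fn process_row(record: &csv::StringRecord) -> csv::StringRecord {
    record.clone()
}

// Hypothetical driver showing how the reusable `batch`/`batch_results`
// buffers are used once `batchsize` has been decided.
fn run_batched<R: std::io::Read, W: std::io::Write>(
    rdr: &mut csv::Reader<R>,
    wtr: &mut csv::Writer<W>,
    batchsize: usize,
) -> Result<(), Box<dyn std::error::Error>> {
    let mut batch_record = csv::StringRecord::new();
    let mut batch = Vec::with_capacity(batchsize);
    let mut batch_results = Vec::with_capacity(batchsize);
    loop {
        // fill the batch buffer; with --batch 0, batchsize equals the row
        // count, so this single pass pulls in every row
        while batch.len() < batchsize && rdr.read_record(&mut batch_record)? {
            batch.push(batch_record.clone());
        }
        if batch.is_empty() {
            break; // no rows left
        }
        // process the whole batch in parallel, preserving row order
        batch.par_iter().map(process_row).collect_into_vec(&mut batch_results);
        for result in &batch_results {
            wtr.write_record(result)?;
        }
        batch.clear();
    }
    Ok(wtr.flush()?)
}
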
9 changes: 7 additions & 2 deletions src/cmd/datefmt.rs
@@ -84,6 +84,7 @@ datefmt options:
-j, --jobs <arg> The number of jobs to run in parallel.
When not set, the number of jobs is set to the number of CPUs detected.
-b, --batch <size> The number of rows per batch to load into memory, before running in parallel.
+Set to 0 to load all rows at once.
[default: 50000]
Common options:
@@ -132,7 +133,7 @@ struct Args {
flag_default_tz: Option<String>,
flag_utc: bool,
flag_zulu: bool,
-flag_batch: u32,
+flag_batch: usize,
flag_jobs: Option<usize>,
flag_new_column: Option<String>,
flag_output: Option<String>,
@@ -252,7 +253,11 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
let mut batch_record = csv::StringRecord::new();

// reuse batch buffers
-let batchsize: usize = args.flag_batch as usize;
+let batchsize: usize = if args.flag_batch == 0 {
+    util::count_rows(&rconfig)? as usize
+} else {
+    args.flag_batch
+};
let mut batch = Vec::with_capacity(batchsize);
let mut batch_results = Vec::with_capacity(batchsize);

9 changes: 7 additions & 2 deletions src/cmd/geocode.rs
@@ -322,6 +322,7 @@ geocode options:
-j, --jobs <arg> The number of jobs to run in parallel.
When not set, the number of jobs is set to the number of CPUs detected.
-b, --batch <size> The number of rows per batch to load into memory, before running in parallel.
+Set to 0 to load all rows at once.
[default: 50000]
--timeout <seconds> Timeout for downloading Geonames cities index.
[default: 120]
@@ -424,7 +425,7 @@ struct Args {
flag_formatstr: String,
flag_language: String,
flag_invalid_result: Option<String>,
-flag_batch: u32,
+flag_batch: usize,
flag_timeout: u16,
flag_cache_dir: String,
flag_languages: String,
@@ -1102,7 +1103,11 @@ async fn geocode_main(args: Args) -> CliResult<()> {
let mut batch_record = csv::StringRecord::new();

// reuse batch buffers
-let batchsize: usize = args.flag_batch as usize;
+let batchsize: usize = if args.flag_batch == 0 {
+    util::count_rows(&rconfig)? as usize
+} else {
+    args.flag_batch
+};
let mut batch = Vec::with_capacity(batchsize);
let mut batch_results = Vec::with_capacity(batchsize);

24 changes: 20 additions & 4 deletions src/cmd/jsonl.rs
@@ -20,7 +20,8 @@ jsonl options:
When not set, the number of jobs is set to the
number of CPUs detected.
-b, --batch <size> The number of rows per batch to load into memory,
-before running in parallel. [default: 50000]
+before running in parallel. Set to 0 to load all
+rows at once. [default: 50000]
Common options:
-h, --help Display this message
@@ -53,7 +54,7 @@ struct Args {
flag_delimiter: Option<Delimiter>,
flag_ignore_errors: bool,
flag_jobs: Option<usize>,
-flag_batch: u32,
+flag_batch: usize,
}

fn recurse_to_infer_headers(value: &Value, headers: &mut Vec<Vec<String>>, path: &[String]) {
@@ -152,8 +153,12 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
.delimiter(args.flag_delimiter)
.writer()?;

+let mut is_stdin = false;
let mut rdr: Box<dyn BufRead> = match args.arg_input {
-None => Box::new(BufReader::new(io::stdin())),
+None => {
+    is_stdin = true;
+    Box::new(BufReader::new(io::stdin()))
+},
Some(p) => Box::new(BufReader::with_capacity(
DEFAULT_RDR_BUFFER_CAPACITY,
fs::File::open(p)?,
@@ -167,7 +172,18 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
let mut batch_line = String::new();

// reuse batch buffers
-let batchsize: usize = args.flag_batch as usize;
+let batchsize: usize = if args.flag_batch == 0 {
+    if is_stdin {
+        // if stdin, we don't know how many lines there are,
+        // so just use a reasonably big batch size
+        1_000_000
+    } else {
+        // safety: we know flag_output is Some because of the stdin check above
+        util::count_lines_in_file(&args.flag_output.unwrap())? as usize
+    }
+} else {
+    args.flag_batch
+};
let mut batch = Vec::with_capacity(batchsize);
let mut batch_results = Vec::with_capacity(batchsize);

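The `is_stdin` branch above exists because a pipe can only be read once: a file on disk can be pre-scanned to size the batch exactly, but piped input cannot, so the code falls back to a fixed 1,000,000-row batch. A small hypothetical helper (not part of this commit) that captures that distinction:

fn input_line_count(input: Option<&str>) -> std::io::Result<Option<u64>> {
    use std::io::BufRead;
    match input {
        // stdin: length unknowable up front; the caller picks a fallback
        None => Ok(None),
        // file on disk: pre-scan it once to get an exact line count
        Some(path) => {
            let reader = std::io::BufReader::new(std::fs::File::open(path)?);
            Ok(Some(reader.lines().count() as u64))
        },
    }
}
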
11 changes: 8 additions & 3 deletions src/cmd/tojsonl.rs
@@ -27,7 +27,8 @@ Tojsonl options:
When not set, the number of jobs is set to the
number of CPUs detected.
-b, --batch <size> The number of rows per batch to load into memory,
-before running in parallel. [default: 50000]
+before running in parallel. Set to 0 to load all
+rows at once. [default: 50000]
Common options:
-h, --help Display this message
@@ -60,7 +61,7 @@ struct Args {
flag_trim: bool,
flag_no_boolean: bool,
flag_jobs: Option<usize>,
-flag_batch: u32,
+flag_batch: usize,
flag_delimiter: Option<Delimiter>,
flag_output: Option<String>,
flag_memcheck: bool,
@@ -255,7 +256,11 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
let mut batch_record = csv::StringRecord::new();

// reuse batch buffers
-let batchsize: usize = args.flag_batch as usize;
+let batchsize: usize = if args.flag_batch == 0 {
+    record_count as usize
+} else {
+    args.flag_batch
+};
let mut batch = Vec::with_capacity(batchsize);
let mut batch_results = Vec::with_capacity(batchsize);

10 changes: 7 additions & 3 deletions src/cmd/validate.rs
@@ -89,7 +89,7 @@ Validate options:
When not set, the number of jobs is set to the
number of CPUs detected.
-b, --batch <size> The number of rows per batch to load into memory,
-before running in parallel.
+before running in parallel. Set to 0 to load all rows at once.
[default: 50000]
--timeout <seconds> Timeout for downloading json-schemas on URLs.
[default: 30]
@@ -158,7 +158,7 @@ struct Args {
flag_pretty_json: bool,
flag_valid_output: Option<String>,
flag_jobs: Option<usize>,
-flag_batch: u32,
+flag_batch: usize,
flag_no_headers: bool,
flag_delimiter: Option<Delimiter>,
flag_progressbar: bool,
@@ -534,7 +534,11 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
// amortize memory allocation by reusing record
let mut record = csv::ByteRecord::new();
// reuse batch buffer
-let batch_size = args.flag_batch as usize;
+let batch_size = if args.flag_batch == 0 {
+    util::count_rows(&rconfig)? as usize
+} else {
+    args.flag_batch
+};
let mut batch = Vec::with_capacity(batch_size);
let mut validation_results = Vec::with_capacity(batch_size);
let mut valid_flags: Vec<bool> = Vec::with_capacity(batch_size);
10 changes: 9 additions & 1 deletion src/util.rs
@@ -4,7 +4,7 @@ use std::{
cmp::min,
env, fs,
fs::File,
-io::{BufReader, BufWriter, Read, Write},
+io::{BufRead, BufReader, BufWriter, Read, Write},
path::{Path, PathBuf},
str,
sync::OnceLock,
@@ -372,6 +372,14 @@ pub fn count_rows_regular(conf: &Config) -> Result<u64, CliError> {
}
}

+pub fn count_lines_in_file(file: &str) -> Result<u64, CliError> {
+    let file = File::open(file)?;
+    let reader = BufReader::new(file);
+
+    let line_count = reader.lines().count() as u64;
+    Ok(line_count)
+}

#[cfg(any(feature = "feature_capable", feature = "lite"))]
pub fn prep_progress(progress: &ProgressBar, record_count: u64) {
progress.set_style(
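A note on the new helper: `reader.lines().count()` allocates a fresh String for every line just to throw it away. That is fine for a one-off pre-scan, but if it ever showed up in profiles, a byte-scanning variant would avoid the allocations. A possible alternative sketch, not part of this commit (it undercounts by one if the file lacks a trailing newline):

use std::fs::File;
use std::io::{BufRead, BufReader};

pub fn count_lines_by_bytes(path: &str) -> std::io::Result<u64> {
    let mut reader = BufReader::new(File::open(path)?);
    let mut count: u64 = 0;
    loop {
        // borrow the reader's internal buffer instead of copying each line
        let buf = reader.fill_buf()?;
        if buf.is_empty() {
            break; // EOF
        }
        count += buf.iter().filter(|&&b| b == b'\n').count() as u64;
        let len = buf.len();
        reader.consume(len);
    }
    Ok(count)
}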
