Skip to content

Commit

Permalink
py: use latest Bound API, replacing deprecated gil-refs methods
Browse files Browse the repository at this point in the history
- also added a special value (0) to --batch to process the entire file in one batch
  • Loading branch information
jqnatividad committed Jun 29, 2024
1 parent 9bbb98b commit 23edde9
Showing 1 changed file with 23 additions and 15 deletions.
38 changes: 23 additions & 15 deletions src/cmd/python.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ py options:
-b, --batch <size> The number of rows per batch to process before
releasing memory and acquiring a new GILpool.
See https://pyo3.rs/v0.17.1/memory.html#gil-bound-memory
Set to 0 to process the entire file in one batch.
See https://pyo3.rs/v0.22.0/memory.html#gil-bound-memory
for more info. [default: 30000]
Common options:
Expand All @@ -120,6 +121,7 @@ use indicatif::{ProgressBar, ProgressDrawTarget};
use log::{error, log_enabled, Level::Debug};
use pyo3::{
intern,
prelude::*,
types::{PyDict, PyModule},
PyErr, PyResult, Python,
};
Expand Down Expand Up @@ -164,7 +166,7 @@ struct Args {
cmd_filter: bool,
arg_new_column: Option<String>,
arg_script: String,
flag_batch: u32,
flag_batch: usize,
flag_helper: Option<String>,
arg_input: Option<String>,
flag_output: Option<String>,
Expand Down Expand Up @@ -243,16 +245,22 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
let mut batch_record = csv::StringRecord::new();
let mut error_count = 0_usize;

let batch_size = if args.flag_batch == 0 {
util::count_rows(&rconfig)? as usize
} else {
args.flag_batch
};

// reuse batch buffers
let mut batch = Vec::with_capacity(args.flag_batch as usize);
let mut batch = Vec::with_capacity(batch_size);

// main loop to read CSV and construct batches.
// we batch python operations so that the GILPool does not get very large
// as we release the pool after each batch
// loop exits when batch is empty.
// see https://pyo3.rs/latest/memory.html#gil-bound-memory for more info.
'batch_loop: loop {
for _ in 0..args.flag_batch {
for _ in 0..batch_size {
match rdr.read_record(&mut batch_record) {
Ok(has_data) => {
if has_data {
Expand All @@ -275,19 +283,19 @@ pub fn run(argv: &[&str]) -> CliResult<()> {

Python::with_gil(|py| -> PyResult<()> {
let curr_batch = batch.clone();
let helpers = PyModule::from_code(py, HELPERS, "qsv_helpers.py", "qsv_helpers")?;
let batch_globals = PyDict::new(py);
let batch_locals = PyDict::new(py);
let helpers = PyModule::from_code_bound(py, HELPERS, "qsv_helpers.py", "qsv_helpers")?;
let batch_globals = PyDict::new_bound(py);
let batch_locals = PyDict::new_bound(py);

let user_helpers =
PyModule::from_code(py, &helper_text, "qsv_user_helpers.py", "qsv_uh")?;
PyModule::from_code_bound(py, &helper_text, "qsv_user_helpers.py", "qsv_uh")?;
batch_globals.set_item(intern!(py, "qsv_uh"), user_helpers)?;

// Global imports
let builtins = PyModule::import(py, "builtins")?;
let math_module = PyModule::import(py, "math")?;
let random_module = PyModule::import(py, "random")?;
let datetime_module = PyModule::import(py, "datetime")?;
let builtins = PyModule::import_bound(py, "builtins")?;
let math_module = PyModule::import_bound(py, "math")?;
let random_module = PyModule::import_bound(py, "random")?;
let datetime_module = PyModule::import_bound(py, "datetime")?;

batch_globals.set_item("__builtins__", builtins)?;
batch_globals.set_item("math", math_module)?;
Expand All @@ -298,7 +306,7 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
.getattr("QSVRow")?
.call1((headers.iter().collect::<Vec<&str>>(),))?;

batch_locals.set_item("col", py_row)?;
batch_locals.set_item("col", py_row.clone())?;

let error_result = intern!(py, "<ERROR>");

Expand All @@ -323,7 +331,7 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
py_row.call_method1(intern!(py, "_update_underlying_data"), (row_data,))?;

let result = py
.eval(&args.arg_script, Some(batch_globals), Some(batch_locals))
.eval_bound(&args.arg_script, Some(&batch_globals), Some(&batch_locals))
.map_err(|e| {
e.print_and_set_sys_last_vars(py);
error_count += 1;
Expand All @@ -332,7 +340,7 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
}
"Evaluation of given expression failed with the above error!"
})
.unwrap_or_else(|_| error_result.as_gil_ref());
.unwrap_or_else(|_| error_result.clone().into_any());

if args.cmd_map {
let result = helpers
Expand Down

0 comments on commit 23edde9

Please sign in to comment.