diff --git a/src/worker/builder.rs b/src/worker/builder.rs index f5768216..2a8d5eed 100644 --- a/src/worker/builder.rs +++ b/src/worker/builder.rs @@ -4,6 +4,7 @@ use crate::{ Error, Job, JobRunner, WorkerId, }; use std::future::Future; +use std::sync::Arc; use tokio::io::{AsyncRead, AsyncWrite, BufStream}; use tokio::net::TcpStream as TokioStream; @@ -106,6 +107,33 @@ impl WorkerBuilder { self.register(kind, Closure(handler)) } + /// Register a _blocking_ (synchronous) handler function for the given job type (`kind`). + /// + /// This is an analogue of [`register_fn`](WorkerBuilder::register_fn) for compute heavy tasks. + /// Internally, `tokio`'s `spawn_blocking` is used when a job arrives whose type matches `kind`, + /// and so the `handler` is executed in a dedicated pool for blocking operations. See `Tokio`'s + /// [docs](https://docs.rs/tokio/latest/tokio/index.html#cpu-bound-tasks-and-blocking-code) for + /// how to set the upper limit on the number of threads in the mentioned pool and other details. + /// + /// You can mix and match async and blocking handlers in a single `Worker`. However, note that + /// there is no active management of the blocking tasks in `tokio`, and so if you end up with more + /// CPU-intensive blocking handlers executing at the same time than you have cores, the asynchronous + /// handler tasks (and indeed, all tasks) will suffer as a result. If you have a lot of blocking tasks, + /// consider using the standard async job handler (which you can register with [`WorkerBuilder::register`] + /// or [`WorkerBuilder::register_fn`]) and add explicit code to manage the blocking tasks appropriately. + /// + /// Also note that only one single handler per job kind is supported. Registering another handler + /// for the same job kind will silently override the handler registered previously. + pub fn register_blocking_fn(mut self, kind: K, handler: H) -> Self + where + K: Into, + H: Fn(Job) -> Result<(), E> + Send + Sync + 'static, + { + self.callbacks + .insert(kind.into(), super::Callback::Sync(Arc::new(handler))); + self + } + /// Register a handler for the given job type (`kind`). /// /// Whenever a job whose type matches `kind` is fetched from the Faktory, the given handler @@ -118,7 +146,8 @@ impl WorkerBuilder { K: Into, H: JobRunner + 'static, { - self.callbacks.insert(kind.into(), Box::new(runner)); + self.callbacks + .insert(kind.into(), super::Callback::Async(Box::new(runner))); self } diff --git a/src/worker/mod.rs b/src/worker/mod.rs index cc106e30..76a7da26 100644 --- a/src/worker/mod.rs +++ b/src/worker/mod.rs @@ -6,7 +6,7 @@ use std::sync::{atomic, Arc}; use std::{error::Error as StdError, sync::atomic::AtomicUsize}; use tokio::io::{AsyncBufRead, AsyncWrite}; use tokio::net::TcpStream; -use tokio::task::{AbortHandle, JoinSet}; +use tokio::task::{spawn_blocking, AbortHandle, JoinError, JoinSet}; mod builder; mod health; @@ -20,7 +20,12 @@ pub(crate) const STATUS_RUNNING: usize = 0; pub(crate) const STATUS_QUIET: usize = 1; pub(crate) const STATUS_TERMINATING: usize = 2; -type CallbacksRegistry = FnvHashMap>; +pub(crate) enum Callback { + Async(runner::BoxedJobRunner), + Sync(Arc Result<(), E> + Sync + Send + 'static>), +} + +type CallbacksRegistry = FnvHashMap>; /// `Worker` is used to run a worker that processes jobs provided by Faktory. /// @@ -176,18 +181,28 @@ impl Worker { } } -enum Failed { +enum Failed { Application(E), + HandlerPanic(JE), BadJobType(String), } impl Worker { - async fn run_job(&mut self, job: Job) -> Result<(), Failed> { + async fn run_job(&mut self, job: Job) -> Result<(), Failed> { let handler = self .callbacks .get(job.kind()) .ok_or(Failed::BadJobType(job.kind().to_string()))?; - handler.run(job).await.map_err(Failed::Application) + match handler { + Callback::Async(cb) => cb.run(job).await.map_err(Failed::Application), + Callback::Sync(cb) => { + let cb = Arc::clone(cb); + match spawn_blocking(move || cb(job)).await { + Err(join_error) => Err(Failed::HandlerPanic(join_error)), + Ok(processing_result) => processing_result.map_err(Failed::Application), + } + } + } } async fn report_on_all_workers(&mut self) -> Result<(), Error> { @@ -274,6 +289,7 @@ impl let fail = match e { Failed::BadJobType(jt) => Fail::generic(jid, format!("No handler for {}", jt)), Failed::Application(e) => Fail::generic_with_backtrace(jid, e), + Failed::HandlerPanic(e) => Fail::generic_with_backtrace(jid, e), }; self.worker_states.register_failure(worker, fail.clone()); self.c.issue(&fail).await?.read_ok().await?; diff --git a/tests/real/community.rs b/tests/real/community.rs index 66755421..2e149199 100644 --- a/tests/real/community.rs +++ b/tests/real/community.rs @@ -594,3 +594,75 @@ async fn test_jobs_created_with_builder() { .unwrap(); assert!(had_job); } + +// It is generally not ok to mix blocking and not blocking tasks, +// we are doing so in this test simply to demonstrate it is _possible_. +#[tokio::test(flavor = "multi_thread")] +async fn test_jobs_with_blocking_handlers() { + skip_check!(); + + let local = "test_jobs_with_blocking_handlers"; + + let mut w = Worker::builder() + .register_blocking_fn("cpu_intensive", |_j| { + // Imagine some compute heavy operations:serializing, sorting, matrix multiplication, etc. + std::thread::sleep(Duration::from_millis(1000)); + Ok::<(), io::Error>(()) + }) + .register_fn("io_intensive", |_j| async move { + // Imagine fetching data for this user from various origins, + // updating an entry on them in the database, and then sending them + // an email and pushing a follow-up task on the Faktory queue + tokio::time::sleep(Duration::from_millis(100)).await; + Ok::<(), io::Error>(()) + }) + .register_fn( + "general_workload", + |_j| async move { Ok::<(), io::Error>(()) }, + ) + .connect(None) + .await + .unwrap(); + + Client::connect(None) + .await + .unwrap() + .enqueue_many([ + Job::builder("cpu_intensive").queue(local).build(), + Job::builder("io_intensive").queue(local).build(), + Job::builder("general_workload").queue(local).build(), + ]) + .await + .unwrap(); + + for _ in 0..2 { + assert!(w.run_one(0, &[local]).await.unwrap()); + } +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_panic_in_handler() { + skip_check!(); + + let local = "test_panic_in_handler"; + + let mut w = Worker::builder::() + .register_blocking_fn("panic", |_j| { + panic!("Panic inside the handler..."); + }) + .connect(None) + .await + .unwrap(); + + Client::connect(None) + .await + .unwrap() + .enqueue(Job::builder("panic").queue(local).build()) + .await + .unwrap(); + + // we _did_ consume and process the job, the processing result itself though + // was a failure; however, a panic in the handler was "intercepted" and communicated + // to the Faktory server via the FAIL command + assert!(w.run_one(0, &[local]).await.unwrap()); +}