Skip to content

Commit

Permalink
Add support for blocking handlers (#65)
Browse files Browse the repository at this point in the history
* Add support for blocking handlers

* Use tokio::time::sleep in test_jobs_with_blocking_handlers test

* Bubble up panic in handler and communicate to Faktory server

* Update docs for WorkerBuilder::register_blocking_fn
  • Loading branch information
rustworthy authored Jul 14, 2024
1 parent 7ee772b commit 0df8659
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 6 deletions.
31 changes: 30 additions & 1 deletion src/worker/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::{
Error, Job, JobRunner, WorkerId,
};
use std::future::Future;
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWrite, BufStream};
use tokio::net::TcpStream as TokioStream;

Expand Down Expand Up @@ -106,6 +107,33 @@ impl<E: 'static> WorkerBuilder<E> {
self.register(kind, Closure(handler))
}

/// Register a _blocking_ (synchronous) handler function for the given job type (`kind`).
///
/// This is an analogue of [`register_fn`](WorkerBuilder::register_fn) for compute heavy tasks.
/// Internally, `tokio`'s `spawn_blocking` is used when a job arrives whose type matches `kind`,
/// and so the `handler` is executed in a dedicated pool for blocking operations. See `Tokio`'s
/// [docs](https://docs.rs/tokio/latest/tokio/index.html#cpu-bound-tasks-and-blocking-code) for
/// how to set the upper limit on the number of threads in the mentioned pool and other details.
///
/// You can mix and match async and blocking handlers in a single `Worker`. However, note that
/// there is no active management of the blocking tasks in `tokio`, and so if you end up with more
/// CPU-intensive blocking handlers executing at the same time than you have cores, the asynchronous
/// handler tasks (and indeed, all tasks) will suffer as a result. If you have a lot of blocking tasks,
/// consider using the standard async job handler (which you can register with [`WorkerBuilder::register`]
/// or [`WorkerBuilder::register_fn`]) and add explicit code to manage the blocking tasks appropriately.
///
/// Also note that only one single handler per job kind is supported. Registering another handler
/// for the same job kind will silently override the handler registered previously.
pub fn register_blocking_fn<K, H>(mut self, kind: K, handler: H) -> Self
where
K: Into<String>,
H: Fn(Job) -> Result<(), E> + Send + Sync + 'static,
{
self.callbacks
.insert(kind.into(), super::Callback::Sync(Arc::new(handler)));
self
}

/// Register a handler for the given job type (`kind`).
///
/// Whenever a job whose type matches `kind` is fetched from the Faktory, the given handler
Expand All @@ -118,7 +146,8 @@ impl<E: 'static> WorkerBuilder<E> {
K: Into<String>,
H: JobRunner<Error = E> + 'static,
{
self.callbacks.insert(kind.into(), Box::new(runner));
self.callbacks
.insert(kind.into(), super::Callback::Async(Box::new(runner)));
self
}

Expand Down
26 changes: 21 additions & 5 deletions src/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::sync::{atomic, Arc};
use std::{error::Error as StdError, sync::atomic::AtomicUsize};
use tokio::io::{AsyncBufRead, AsyncWrite};
use tokio::net::TcpStream;
use tokio::task::{AbortHandle, JoinSet};
use tokio::task::{spawn_blocking, AbortHandle, JoinError, JoinSet};

mod builder;
mod health;
Expand All @@ -20,7 +20,12 @@ pub(crate) const STATUS_RUNNING: usize = 0;
pub(crate) const STATUS_QUIET: usize = 1;
pub(crate) const STATUS_TERMINATING: usize = 2;

type CallbacksRegistry<E> = FnvHashMap<String, runner::BoxedJobRunner<E>>;
pub(crate) enum Callback<E> {
Async(runner::BoxedJobRunner<E>),
Sync(Arc<dyn Fn(Job) -> Result<(), E> + Sync + Send + 'static>),
}

type CallbacksRegistry<E> = FnvHashMap<String, Callback<E>>;

/// `Worker` is used to run a worker that processes jobs provided by Faktory.
///
Expand Down Expand Up @@ -176,18 +181,28 @@ impl<S: AsyncWrite + Send + Unpin, E> Worker<S, E> {
}
}

enum Failed<E: StdError> {
enum Failed<E: StdError, JE: StdError> {
Application(E),
HandlerPanic(JE),
BadJobType(String),
}

impl<S: AsyncBufRead + AsyncWrite + Send + Unpin, E: StdError + 'static + Send> Worker<S, E> {
async fn run_job(&mut self, job: Job) -> Result<(), Failed<E>> {
async fn run_job(&mut self, job: Job) -> Result<(), Failed<E, JoinError>> {
let handler = self
.callbacks
.get(job.kind())
.ok_or(Failed::BadJobType(job.kind().to_string()))?;
handler.run(job).await.map_err(Failed::Application)
match handler {
Callback::Async(cb) => cb.run(job).await.map_err(Failed::Application),
Callback::Sync(cb) => {
let cb = Arc::clone(cb);
match spawn_blocking(move || cb(job)).await {
Err(join_error) => Err(Failed::HandlerPanic(join_error)),
Ok(processing_result) => processing_result.map_err(Failed::Application),
}
}
}
}

async fn report_on_all_workers(&mut self) -> Result<(), Error> {
Expand Down Expand Up @@ -274,6 +289,7 @@ impl<S: AsyncBufRead + AsyncWrite + Send + Unpin, E: StdError + 'static + Send>
let fail = match e {
Failed::BadJobType(jt) => Fail::generic(jid, format!("No handler for {}", jt)),
Failed::Application(e) => Fail::generic_with_backtrace(jid, e),
Failed::HandlerPanic(e) => Fail::generic_with_backtrace(jid, e),
};
self.worker_states.register_failure(worker, fail.clone());
self.c.issue(&fail).await?.read_ok().await?;
Expand Down
72 changes: 72 additions & 0 deletions tests/real/community.rs
Original file line number Diff line number Diff line change
Expand Up @@ -602,3 +602,75 @@ async fn test_jobs_created_with_builder() {
.unwrap();
assert!(had_job);
}

// It is generally not ok to mix blocking and not blocking tasks,
// we are doing so in this test simply to demonstrate it is _possible_.
#[tokio::test(flavor = "multi_thread")]
async fn test_jobs_with_blocking_handlers() {
skip_check!();

let local = "test_jobs_with_blocking_handlers";

let mut w = Worker::builder()
.register_blocking_fn("cpu_intensive", |_j| {
// Imagine some compute heavy operations:serializing, sorting, matrix multiplication, etc.
std::thread::sleep(Duration::from_millis(1000));
Ok::<(), io::Error>(())
})
.register_fn("io_intensive", |_j| async move {
// Imagine fetching data for this user from various origins,
// updating an entry on them in the database, and then sending them
// an email and pushing a follow-up task on the Faktory queue
tokio::time::sleep(Duration::from_millis(100)).await;
Ok::<(), io::Error>(())
})
.register_fn(
"general_workload",
|_j| async move { Ok::<(), io::Error>(()) },
)
.connect(None)
.await
.unwrap();

Client::connect(None)
.await
.unwrap()
.enqueue_many([
Job::builder("cpu_intensive").queue(local).build(),
Job::builder("io_intensive").queue(local).build(),
Job::builder("general_workload").queue(local).build(),
])
.await
.unwrap();

for _ in 0..2 {
assert!(w.run_one(0, &[local]).await.unwrap());
}
}

#[tokio::test(flavor = "multi_thread")]
async fn test_panic_in_handler() {
skip_check!();

let local = "test_panic_in_handler";

let mut w = Worker::builder::<io::Error>()
.register_blocking_fn("panic", |_j| {
panic!("Panic inside the handler...");
})
.connect(None)
.await
.unwrap();

Client::connect(None)
.await
.unwrap()
.enqueue(Job::builder("panic").queue(local).build())
.await
.unwrap();

// we _did_ consume and process the job, the processing result itself though
// was a failure; however, a panic in the handler was "intercepted" and communicated
// to the Faktory server via the FAIL command
assert!(w.run_one(0, &[local]).await.unwrap());
}

0 comments on commit 0df8659

Please sign in to comment.