Skip to content

Commit

Permalink
Add support for blocking handlers
Browse files Browse the repository at this point in the history
  • Loading branch information
rustworthy committed Apr 30, 2024
1 parent 70d40ad commit 3088d07
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 5 deletions.
28 changes: 27 additions & 1 deletion src/worker/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::{
Error, Job, JobRunner, WorkerId,
};
use std::future::Future;
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWrite, BufStream};
use tokio::net::TcpStream as TokioStream;

Expand Down Expand Up @@ -106,6 +107,30 @@ impl<E: 'static> WorkerBuilder<E> {
self.register(kind, Closure(handler))
}

/// Register a _blocking_ (synchronous) handler function for the given job type (`kind`).
///
/// This is an analogue of [`register_fn`](WorkerBuilder::register_fn) for compute heavy tasks.
/// Internally, `tokio`'s `spawn_blocking` is used when a job arrives whose type matches `kind`,
/// and so the `handler` is executed in a dedicated pool for blocking operations. See `Tokio`'s
/// [docs](https://docs.rs/tokio/latest/tokio/index.html#cpu-bound-tasks-and-blocking-code) for
/// how to set the upper limit on the number of threads in the mentioned pool and other details.
///
/// Note that it is not recommended to mix blocking and non-blocking tasks and so if many of your
/// handlers are blocking, you will want to launch a dedicated worker process (at least a separate
/// Tokio Runtime) where only blocking handlers will be used.
///
/// Also note that only one single handler per job kind is supported. Registering another handler
/// for the same job kind will silently override the handler registered previously.
pub fn register_blocking_fn<K, H>(mut self, kind: K, handler: H) -> Self
where
K: Into<String>,
H: Fn(Job) -> Result<(), E> + Send + Sync + 'static,
{
self.callbacks
.insert(kind.into(), super::Callback::Sync(Arc::new(handler)));
self
}

/// Register a handler for the given job type (`kind`).
///
/// Whenever a job whose type matches `kind` is fetched from the Faktory, the given handler
Expand All @@ -118,7 +143,8 @@ impl<E: 'static> WorkerBuilder<E> {
K: Into<String>,
H: JobRunner<Error = E> + 'static,
{
self.callbacks.insert(kind.into(), Box::new(runner));
self.callbacks
.insert(kind.into(), super::Callback::Async(Box::new(runner)));
self
}

Expand Down
20 changes: 17 additions & 3 deletions src/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use fnv::FnvHashMap;
use std::sync::{atomic, Arc};
use std::{error::Error as StdError, sync::atomic::AtomicUsize};
use tokio::io::{AsyncBufRead, AsyncWrite};
use tokio::task::{AbortHandle, JoinSet};
use tokio::task::{spawn_blocking, AbortHandle, JoinSet};

mod builder;
mod health;
Expand All @@ -19,7 +19,12 @@ pub(crate) const STATUS_RUNNING: usize = 0;
pub(crate) const STATUS_QUIET: usize = 1;
pub(crate) const STATUS_TERMINATING: usize = 2;

type CallbacksRegistry<E> = FnvHashMap<String, runner::BoxedJobRunner<E>>;
pub(crate) enum Callback<E> {
Async(runner::BoxedJobRunner<E>),
Sync(Arc<dyn Fn(Job) -> Result<(), E> + Sync + Send + 'static>),
}

type CallbacksRegistry<E> = FnvHashMap<String, Callback<E>>;

/// `Worker` is used to run a worker that processes jobs provided by Faktory.
///
Expand Down Expand Up @@ -177,7 +182,16 @@ impl<S: AsyncBufRead + AsyncWrite + Send + Unpin, E: StdError + 'static + Send>
.callbacks
.get(job.kind())
.ok_or(Failed::BadJobType(job.kind().to_string()))?;
handler.run(job).await.map_err(Failed::Application)
match handler {
Callback::Async(cb) => cb.run(job).await.map_err(Failed::Application),
Callback::Sync(cb) => {
let cb = Arc::clone(cb);
spawn_blocking(move || cb(job))
.await
.expect("joined ok")
.map_err(Failed::Application)
}
}
}

async fn report_on_all_workers(&mut self) -> Result<(), Error> {
Expand Down
46 changes: 45 additions & 1 deletion tests/real/community.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::skip_check;
use faktory::{Client, Job, JobBuilder, JobId, WorkerBuilder, WorkerId};
use serde_json::Value;
use std::{io, sync};
use std::{io, sync, time::Duration};

#[tokio::test(flavor = "multi_thread")]
async fn hello_client() {
Expand Down Expand Up @@ -340,3 +340,47 @@ async fn test_jobs_created_with_builder() {
.unwrap();
assert!(had_job);
}

// It is generally not ok to mix blocking and not blocking tasks,
// we are doing so in this test simply to demonstrate it is _possible_.
#[tokio::test(flavor = "multi_thread")]
async fn test_jobs_with_blocking_handlers() {
skip_check!();

let local = "test_jobs_with_blocking_handlers";

let mut w = WorkerBuilder::default()
.register_blocking_fn("cpu_intensive", |_j| {
// Imaging some compute heavy operations:serializing, sorting, matrix multiplication, etc.
std::thread::sleep(Duration::from_millis(1000));
Ok::<(), io::Error>(())
})
.register_fn("io_intensive", |_j| async move {
// Imagine fetching data for this user from various origins,
// updating an entry on them in the database, and then sending them
// an email and pushing a follow-up task on the Faktory queue
Ok::<(), io::Error>(())
})
.register_fn(
"general_workload",
|_j| async move { Ok::<(), io::Error>(()) },
)
.connect(None)
.await
.unwrap();

Client::connect(None)
.await
.unwrap()
.enqueue_many([
Job::builder("cpu_intensive").queue(local).build(),
Job::builder("io_intensive").queue(local).build(),
Job::builder("general_workload").queue(local).build(),
])
.await
.unwrap();

for _ in 0..2 {
assert!(w.run_one(0, &[local]).await.unwrap());
}
}

0 comments on commit 3088d07

Please sign in to comment.