From 3088d07fd43d35c5a1e1031814d2a44986252aa2 Mon Sep 17 00:00:00 2001 From: --show-origin Date: Wed, 1 May 2024 00:43:52 +0500 Subject: [PATCH] Add support for blocking handlers --- src/worker/builder.rs | 28 ++++++++++++++++++++++++- src/worker/mod.rs | 20 +++++++++++++++--- tests/real/community.rs | 46 ++++++++++++++++++++++++++++++++++++++++- 3 files changed, 89 insertions(+), 5 deletions(-) diff --git a/src/worker/builder.rs b/src/worker/builder.rs index f5768216..0c1ee483 100644 --- a/src/worker/builder.rs +++ b/src/worker/builder.rs @@ -4,6 +4,7 @@ use crate::{ Error, Job, JobRunner, WorkerId, }; use std::future::Future; +use std::sync::Arc; use tokio::io::{AsyncRead, AsyncWrite, BufStream}; use tokio::net::TcpStream as TokioStream; @@ -106,6 +107,30 @@ impl WorkerBuilder { self.register(kind, Closure(handler)) } + /// Register a _blocking_ (synchronous) handler function for the given job type (`kind`). + /// + /// This is an analogue of [`register_fn`](WorkerBuilder::register_fn) for compute heavy tasks. + /// Internally, `tokio`'s `spawn_blocking` is used when a job arrives whose type matches `kind`, + /// and so the `handler` is executed in a dedicated pool for blocking operations. See `Tokio`'s + /// [docs](https://docs.rs/tokio/latest/tokio/index.html#cpu-bound-tasks-and-blocking-code) for + /// how to set the upper limit on the number of threads in the mentioned pool and other details. + /// + /// Note that it is not recommended to mix blocking and non-blocking tasks and so if many of your + /// handlers are blocking, you will want to launch a dedicated worker process (at least a separate + /// Tokio Runtime) where only blocking handlers will be used. + /// + /// Also note that only one single handler per job kind is supported. Registering another handler + /// for the same job kind will silently override the handler registered previously. + pub fn register_blocking_fn(mut self, kind: K, handler: H) -> Self + where + K: Into, + H: Fn(Job) -> Result<(), E> + Send + Sync + 'static, + { + self.callbacks + .insert(kind.into(), super::Callback::Sync(Arc::new(handler))); + self + } + /// Register a handler for the given job type (`kind`). /// /// Whenever a job whose type matches `kind` is fetched from the Faktory, the given handler @@ -118,7 +143,8 @@ impl WorkerBuilder { K: Into, H: JobRunner + 'static, { - self.callbacks.insert(kind.into(), Box::new(runner)); + self.callbacks + .insert(kind.into(), super::Callback::Async(Box::new(runner))); self } diff --git a/src/worker/mod.rs b/src/worker/mod.rs index ba93c449..87cd0410 100644 --- a/src/worker/mod.rs +++ b/src/worker/mod.rs @@ -5,7 +5,7 @@ use fnv::FnvHashMap; use std::sync::{atomic, Arc}; use std::{error::Error as StdError, sync::atomic::AtomicUsize}; use tokio::io::{AsyncBufRead, AsyncWrite}; -use tokio::task::{AbortHandle, JoinSet}; +use tokio::task::{spawn_blocking, AbortHandle, JoinSet}; mod builder; mod health; @@ -19,7 +19,12 @@ pub(crate) const STATUS_RUNNING: usize = 0; pub(crate) const STATUS_QUIET: usize = 1; pub(crate) const STATUS_TERMINATING: usize = 2; -type CallbacksRegistry = FnvHashMap>; +pub(crate) enum Callback { + Async(runner::BoxedJobRunner), + Sync(Arc Result<(), E> + Sync + Send + 'static>), +} + +type CallbacksRegistry = FnvHashMap>; /// `Worker` is used to run a worker that processes jobs provided by Faktory. /// @@ -177,7 +182,16 @@ impl .callbacks .get(job.kind()) .ok_or(Failed::BadJobType(job.kind().to_string()))?; - handler.run(job).await.map_err(Failed::Application) + match handler { + Callback::Async(cb) => cb.run(job).await.map_err(Failed::Application), + Callback::Sync(cb) => { + let cb = Arc::clone(cb); + spawn_blocking(move || cb(job)) + .await + .expect("joined ok") + .map_err(Failed::Application) + } + } } async fn report_on_all_workers(&mut self) -> Result<(), Error> { diff --git a/tests/real/community.rs b/tests/real/community.rs index 44100cb4..aaa62af9 100644 --- a/tests/real/community.rs +++ b/tests/real/community.rs @@ -1,7 +1,7 @@ use crate::skip_check; use faktory::{Client, Job, JobBuilder, JobId, WorkerBuilder, WorkerId}; use serde_json::Value; -use std::{io, sync}; +use std::{io, sync, time::Duration}; #[tokio::test(flavor = "multi_thread")] async fn hello_client() { @@ -340,3 +340,47 @@ async fn test_jobs_created_with_builder() { .unwrap(); assert!(had_job); } + +// It is generally not ok to mix blocking and not blocking tasks, +// we are doing so in this test simply to demonstrate it is _possible_. +#[tokio::test(flavor = "multi_thread")] +async fn test_jobs_with_blocking_handlers() { + skip_check!(); + + let local = "test_jobs_with_blocking_handlers"; + + let mut w = WorkerBuilder::default() + .register_blocking_fn("cpu_intensive", |_j| { + // Imaging some compute heavy operations:serializing, sorting, matrix multiplication, etc. + std::thread::sleep(Duration::from_millis(1000)); + Ok::<(), io::Error>(()) + }) + .register_fn("io_intensive", |_j| async move { + // Imagine fetching data for this user from various origins, + // updating an entry on them in the database, and then sending them + // an email and pushing a follow-up task on the Faktory queue + Ok::<(), io::Error>(()) + }) + .register_fn( + "general_workload", + |_j| async move { Ok::<(), io::Error>(()) }, + ) + .connect(None) + .await + .unwrap(); + + Client::connect(None) + .await + .unwrap() + .enqueue_many([ + Job::builder("cpu_intensive").queue(local).build(), + Job::builder("io_intensive").queue(local).build(), + Job::builder("general_workload").queue(local).build(), + ]) + .await + .unwrap(); + + for _ in 0..2 { + assert!(w.run_one(0, &[local]).await.unwrap()); + } +}