Add support for blocking handlers #65

Merged
merged 5 commits into from
Jul 14, 2024
28 changes: 27 additions & 1 deletion src/worker/builder.rs
@@ -4,6 +4,7 @@ use crate::{
Error, Job, JobRunner, WorkerId,
};
use std::future::Future;
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWrite, BufStream};
use tokio::net::TcpStream as TokioStream;

@@ -106,6 +107,30 @@ impl<E: 'static> WorkerBuilder<E> {
self.register(kind, Closure(handler))
}

/// Register a _blocking_ (synchronous) handler function for the given job type (`kind`).
///
/// This is an analogue of [`register_fn`](WorkerBuilder::register_fn) for compute heavy tasks.
/// Internally, `tokio`'s `spawn_blocking` is used when a job arrives whose type matches `kind`,
/// and so the `handler` is executed in a dedicated pool for blocking operations. See `Tokio`'s
/// [docs](https://docs.rs/tokio/latest/tokio/index.html#cpu-bound-tasks-and-blocking-code) for
/// how to set the upper limit on the number of threads in the mentioned pool and other details.
///
/// Note that it is not recommended to mix blocking and non-blocking tasks and so if many of your
/// handlers are blocking, you will want to launch a dedicated worker process (at least a separate
/// Tokio Runtime) where only blocking handlers will be used.
Owner

I'm not sure I expect this to be necessary. As long as we're using spawn_blocking, it should be fine to have some async and some sync handlers.

Collaborator Author

@rustworthy rustworthy May 26, 2024

This is my reading of that section of the Tokio docs, namely this passage:


If your code is CPU-bound and you wish to limit the number of threads used to run it, you should use a separate thread pool dedicated to CPU bound tasks. For example, you could consider using the rayon library for CPU-bound tasks. It is also possible to create an extra Tokio runtime dedicated to CPU-bound tasks, but if you do this, you should be careful that the extra runtime runs only CPU-bound tasks, as IO-bound tasks on that runtime will behave poorly.


So "if many handlers are blocking", users can employ a single runtime and mix blocking handlers with async ones, since we are using spawn_blocking (and so are protecting the cooperative scheduling for async tasks), but we are encouraging them to consider launching a dedicated runtime for their blocking tasks. Or do you think this note is redundant?

Owner

I think giving a nudge in this direction is fine, although I think the nudge is currently too strong. I wonder if we just want to say something like:

You can mix and match async and blocking handlers in a single Worker. However, note that there is no active management of the blocking tasks in tokio, and so if you end up with more CPU-intensive blocking handlers executing at the same time than you have cores, the asynchronous handler tasks (and indeed, all tasks) will suffer as a result. If you have a lot of blocking tasks, consider using the standard async job handler and add explicit code to manage the blocking tasks appropriately.

Collaborator Author

updated
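
The "explicit code to manage the blocking tasks" suggested in this thread could look roughly like the sketch below. It is a std-only illustration, not code from this PR: `BlockingGate`, `Permit`, and `run_bounded` are hypothetical names, and plain threads stand in for `spawn_blocking`. In a Tokio-based handler the same idea would more likely be expressed with a `tokio::sync::Semaphore` acquired before calling `spawn_blocking`.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Hypothetical helper (not part of `faktory` or `tokio`): a counting gate that
/// lets at most `limit` closures run at the same time.
pub struct BlockingGate {
    available: Mutex<usize>,
    freed: Condvar,
}

impl BlockingGate {
    pub fn new(limit: usize) -> Self {
        Self { available: Mutex::new(limit), freed: Condvar::new() }
    }

    /// Waits until a permit is free, then runs `work` while holding it.
    pub fn run<T>(&self, work: impl FnOnce() -> T) -> T {
        {
            let mut available = self.available.lock().unwrap();
            while *available == 0 {
                available = self.freed.wait(available).unwrap();
            }
            *available -= 1;
        }
        // The permit is returned on drop, even if `work` panics.
        let _permit = Permit(self);
        work()
    }
}

struct Permit<'a>(&'a BlockingGate);

impl Drop for Permit<'_> {
    fn drop(&mut self) {
        *self.0.available.lock().unwrap() += 1;
        self.0.freed.notify_one();
    }
}

/// Pushes `jobs` simulated CPU-heavy handlers through a gate of size `limit`.
/// Returns (jobs completed, peak number of handlers running at once).
pub fn run_bounded(jobs: usize, limit: usize) -> (usize, usize) {
    let gate = Arc::new(BlockingGate::new(limit));
    let running = Arc::new(AtomicUsize::new(0));
    let peak = Arc::new(AtomicUsize::new(0));
    let done = Arc::new(AtomicUsize::new(0));

    let handles: Vec<_> = (0..jobs)
        .map(|_| {
            let (gate, running, peak, done) =
                (gate.clone(), running.clone(), peak.clone(), done.clone());
            thread::spawn(move || {
                gate.run(|| {
                    let now = running.fetch_add(1, Ordering::SeqCst) + 1;
                    peak.fetch_max(now, Ordering::SeqCst);
                    thread::sleep(Duration::from_millis(20)); // stand-in for real work
                    running.fetch_sub(1, Ordering::SeqCst);
                    done.fetch_add(1, Ordering::SeqCst);
                })
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }
    (done.load(Ordering::SeqCst), peak.load(Ordering::SeqCst))
}

fn main() {
    // Cap the number of simultaneous heavy jobs at the number of cores (at most 2 here).
    let cores = thread::available_parallelism().map(|n| n.get()).unwrap_or(1);
    let (done, peak) = run_bounded(8, cores.min(2));
    println!("completed {done} jobs, at most {peak} at once");
}
```

Tying the limit to the core count is one reasonable default; the point is only that the cap is chosen by the application rather than left to the size of the blocking pool.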

///
/// Also note that only one single handler per job kind is supported. Registering another handler
/// for the same job kind will silently override the handler registered previously.
pub fn register_blocking_fn<K, H>(mut self, kind: K, handler: H) -> Self
where
K: Into<String>,
H: Fn(Job) -> Result<(), E> + Send + Sync + 'static,
{
self.callbacks
.insert(kind.into(), super::Callback::Sync(Arc::new(handler)));
self
}

/// Register a handler for the given job type (`kind`).
///
/// Whenever a job whose type matches `kind` is fetched from the Faktory, the given handler
@@ -118,7 +143,8 @@ impl<E: 'static> WorkerBuilder<E> {
K: Into<String>,
H: JobRunner<Error = E> + 'static,
{
self.callbacks.insert(kind.into(), Box::new(runner));
self.callbacks
.insert(kind.into(), super::Callback::Async(Box::new(runner)));
self
}

20 changes: 17 additions & 3 deletions src/worker/mod.rs
@@ -6,7 +6,7 @@ use std::sync::{atomic, Arc};
use std::{error::Error as StdError, sync::atomic::AtomicUsize};
use tokio::io::{AsyncBufRead, AsyncWrite};
use tokio::net::TcpStream;
use tokio::task::{AbortHandle, JoinSet};
use tokio::task::{spawn_blocking, AbortHandle, JoinSet};

mod builder;
mod health;
@@ -20,7 +20,12 @@ pub(crate) const STATUS_RUNNING: usize = 0;
pub(crate) const STATUS_QUIET: usize = 1;
pub(crate) const STATUS_TERMINATING: usize = 2;

type CallbacksRegistry<E> = FnvHashMap<String, runner::BoxedJobRunner<E>>;
pub(crate) enum Callback<E> {
Async(runner::BoxedJobRunner<E>),
Sync(Arc<dyn Fn(Job) -> Result<(), E> + Sync + Send + 'static>),
}

type CallbacksRegistry<E> = FnvHashMap<String, Callback<E>>;

/// `Worker` is used to run a worker that processes jobs provided by Faktory.
///
@@ -187,7 +192,16 @@ impl<S: AsyncBufRead + AsyncWrite + Send + Unpin, E: StdError + 'static + Send>
.callbacks
.get(job.kind())
.ok_or(Failed::BadJobType(job.kind().to_string()))?;
handler.run(job).await.map_err(Failed::Application)
match handler {
Callback::Async(cb) => cb.run(job).await.map_err(Failed::Application),
Collaborator Author

I am worried that this is not the case for async handlers: a panic in there is not intercepted the way it now is for register_blocking_fn. I wonder if we might need to use futures::future::FutureExt::catch_unwind for those futures, not sure.

Owner

Let's do that in a follow-up (maybe the tech debt one?)

Collaborator Author

pinned in there
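
For reference, the idea raised in this thread (intercepting a panic raised while an async handler is being polled) can be sketched with the standard library alone. This is not the follow-up implementation and not the `futures` crate's `FutureExt::catch_unwind`; `CatchUnwind`, `block_on`, `run_guarded`, and `panicking_handler` are hypothetical names, and the tiny busy-polling executor exists only so the sketch runs without a runtime. It also assumes the default `panic = "unwind"` strategy.

```rust
use std::any::Any;
use std::future::Future;
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical wrapper: polls the inner future inside `catch_unwind`, so a
/// panic becomes an `Err` carrying the panic payload.
struct CatchUnwind<F> {
    inner: Pin<Box<F>>,
}

impl<F: Future> Future for CatchUnwind<F> {
    type Output = Result<F::Output, Box<dyn Any + Send>>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let inner = self.inner.as_mut();
        match catch_unwind(AssertUnwindSafe(|| inner.poll(cx))) {
            Ok(Poll::Ready(value)) => Poll::Ready(Ok(value)),
            Ok(Poll::Pending) => Poll::Pending,
            Err(payload) => Poll::Ready(Err(payload)),
        }
    }
}

/// Waker that does nothing; enough for the busy-polling executor below.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Minimal executor for demonstration only: polls in a loop until ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
        std::thread::yield_now();
    }
}

/// Runs `fut` to completion, turning a panic into `Err(message)`.
pub fn run_guarded<F: Future>(fut: F) -> Result<F::Output, String> {
    block_on(CatchUnwind { inner: Box::pin(fut) }).map_err(|payload| {
        payload
            .downcast_ref::<&str>()
            .map(|s| s.to_string())
            .or_else(|| payload.downcast_ref::<String>().cloned())
            .unwrap_or_else(|| "unknown panic".to_string())
    })
}

/// Stand-in for an async job handler that panics instead of returning an error.
pub async fn panicking_handler() -> Result<(), String> {
    panic!("boom");
}

fn main() {
    println!("{:?}", run_guarded(panicking_handler()));
    println!("{:?}", run_guarded(async { Ok::<(), String>(()) }));
}
```

`AssertUnwindSafe` is a deliberate shortcut here: the future is dropped after a panic and never polled again, so no state that was left half-updated is observed afterwards.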

Callback::Sync(cb) => {
let cb = Arc::clone(cb);
spawn_blocking(move || cb(job))
.await
.expect("joined ok")
.map_err(Failed::Application)
}
}
}
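
The `Sync` arm above clones the `Arc` because the closure handed to `spawn_blocking` must be `'static`, and it then waits for the result. A std-only sketch of that shape follows, with `std::thread::spawn` standing in for `spawn_blocking` and every name (`Registry`, `Failed`, `SyncHandler`, `build_registry`) hypothetical. Only the blocking variant is modelled, since the async one needs a runtime. Unlike the `.expect("joined ok")` in the diff, the sketch maps a panicking handler to an error value, which is one possible choice and not necessarily what the crate does.

```rust
use std::collections::HashMap;
use std::sync::Arc;
use std::thread;

/// Hypothetical handler type: takes a job payload, returns an application result.
type SyncHandler = Arc<dyn Fn(&str) -> Result<(), String> + Send + Sync + 'static>;

#[derive(Debug, PartialEq)]
pub enum Failed {
    BadJobType(String),
    Application(String),
    Panicked,
}

/// Hypothetical registry mapping a job kind to its blocking handler.
pub struct Registry {
    callbacks: HashMap<String, SyncHandler>,
}

impl Registry {
    pub fn new() -> Self {
        Self { callbacks: HashMap::new() }
    }

    /// Registering a second handler for the same kind replaces the first one.
    pub fn register<H>(mut self, kind: &str, handler: H) -> Self
    where
        H: Fn(&str) -> Result<(), String> + Send + Sync + 'static,
    {
        self.callbacks.insert(kind.to_string(), Arc::new(handler));
        self
    }

    pub fn dispatch(&self, kind: &str, payload: &str) -> Result<(), Failed> {
        let cb = self
            .callbacks
            .get(kind)
            .ok_or_else(|| Failed::BadJobType(kind.to_string()))?;
        // Clone the Arc so the spawned closure owns everything it touches.
        let cb = Arc::clone(cb);
        let payload = payload.to_string();
        match thread::spawn(move || cb(&payload)).join() {
            Ok(result) => result.map_err(Failed::Application),
            // `join` returns Err when the handler panicked.
            Err(_) => Err(Failed::Panicked),
        }
    }
}

/// Example registry with one handler per outcome.
pub fn build_registry() -> Registry {
    Registry::new()
        .register("ok", |_payload| Ok(()))
        .register("bad", |_payload| Err("nope".to_string()))
        .register("panics", |_payload| -> Result<(), String> { panic!("boom") })
}

fn main() {
    let registry = build_registry();
    for kind in ["ok", "bad", "panics", "missing"] {
        println!("{kind}: {:?}", registry.dispatch(kind, "{}"));
    }
}
```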

async fn report_on_all_workers(&mut self) -> Result<(), Error> {
45 changes: 45 additions & 0 deletions tests/real/community.rs
@@ -1,6 +1,7 @@
use crate::skip_check;
use faktory::{Client, Job, JobBuilder, JobId, Worker, WorkerBuilder, WorkerId};
use serde_json::Value;
use std::time::Duration;
use std::{io, sync};

#[tokio::test(flavor = "multi_thread")]
@@ -340,3 +341,47 @@ async fn test_jobs_created_with_builder() {
.unwrap();
assert!(had_job);
}

// It is generally not ok to mix blocking and non-blocking tasks;
// we are doing so in this test simply to demonstrate that it is _possible_.
#[tokio::test(flavor = "multi_thread")]
async fn test_jobs_with_blocking_handlers() {
skip_check!();

let local = "test_jobs_with_blocking_handlers";

let mut w = Worker::builder()
.register_blocking_fn("cpu_intensive", |_j| {
// Imagine some compute-heavy operations: serializing, sorting, matrix multiplication, etc.
std::thread::sleep(Duration::from_millis(1000));
Ok::<(), io::Error>(())
})
.register_fn("io_intensive", |_j| async move {
// Imagine fetching data for this user from various origins,
// updating an entry on them in the database, and then sending them
// an email and pushing a follow-up task on the Faktory queue
Ok::<(), io::Error>(())
})
.register_fn(
"general_workload",
|_j| async move { Ok::<(), io::Error>(()) },
)
.connect(None)
.await
.unwrap();

Client::connect(None)
.await
.unwrap()
.enqueue_many([
Job::builder("cpu_intensive").queue(local).build(),
Job::builder("io_intensive").queue(local).build(),
Job::builder("general_workload").queue(local).build(),
])
.await
.unwrap();

for _ in 0..2 {
assert!(w.run_one(0, &[local]).await.unwrap());
}
}