Skip to content

Commit

Permalink
Catch unwind panic in async handler
Browse files Browse the repository at this point in the history
  • Loading branch information
rustworthy committed Aug 9, 2024
1 parent 6212ccd commit 011b847
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 10 deletions.
93 changes: 93 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ tokio-rustls = { version = "0.25.0", optional = true }
tracing = "0.1"
url = "2"
semver = { version = "1.0.23", features = ["serde"] }
futures = "0.3.30"

[dev-dependencies]
rustls-pki-types = "1.0.1"
Expand Down
24 changes: 20 additions & 4 deletions src/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ use super::proto::Client;
use crate::error::Error;
use crate::proto::{Ack, Fail, Job};
use fnv::FnvHashMap;
use futures::FutureExt;
use std::future::Future;
use std::panic::AssertUnwindSafe;
use std::pin::Pin;
use std::process;
use std::sync::{atomic, Arc};
Expand Down Expand Up @@ -214,7 +216,8 @@ impl<E> Worker<E> {

enum Failed<E: StdError, JE: StdError> {
Application(E),
HandlerPanic(JE),
HandlerPanic(String),
SyncHandlerPanic(JE),
BadJobType(String),
}

Expand All @@ -225,11 +228,23 @@ impl<E: StdError + 'static + Send> Worker<E> {
.get(job.kind())
.ok_or(Failed::BadJobType(job.kind().to_string()))?;
match handler {
Callback::Async(cb) => cb.run(job).await.map_err(Failed::Application),
Callback::Async(cb) => AssertUnwindSafe(cb.run(job))
.catch_unwind()
.await
.map_err(|any| {
if any.is::<String>() {
Failed::HandlerPanic(*any.downcast::<String>().unwrap())
} else if any.is::<&str>() {
Failed::HandlerPanic((*any.downcast::<&str>().unwrap()).to_string())

Check warning on line 238 in src/worker/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/worker/mod.rs#L237-L238

Added lines #L237 - L238 were not covered by tests
} else {
Failed::HandlerPanic("Panic in handler".into())

Check warning on line 240 in src/worker/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/worker/mod.rs#L240

Added line #L240 was not covered by tests
}
})?
.map_err(Failed::Application),
Callback::Sync(cb) => {
let cb = Arc::clone(cb);
match spawn_blocking(move || cb(job)).await {
Err(join_error) => Err(Failed::HandlerPanic(join_error)),
Err(join_error) => Err(Failed::SyncHandlerPanic(join_error)),
Ok(processing_result) => processing_result.map_err(Failed::Application),
}
}
Expand Down Expand Up @@ -330,7 +345,8 @@ impl<E: StdError + 'static + Send> Worker<E> {
let fail = match e {
Failed::BadJobType(jt) => Fail::generic(jid, format!("No handler for {}", jt)),
Failed::Application(e) => Fail::generic_with_backtrace(jid, e),
Failed::HandlerPanic(e) => Fail::generic_with_backtrace(jid, e),
Failed::HandlerPanic(e) => Fail::generic(jid, e),
Failed::SyncHandlerPanic(e) => Fail::generic_with_backtrace(jid, e),
};
self.worker_states.register_failure(worker, fail.clone());
self.c.issue(&fail).await?.read_ok().await?;
Expand Down
28 changes: 22 additions & 6 deletions tests/real/community.rs
Original file line number Diff line number Diff line change
Expand Up @@ -729,22 +729,38 @@ async fn test_panic_in_handler() {
let local = "test_panic_in_handler";

let mut w = Worker::builder::<io::Error>()
.register_blocking_fn("panic", |_j| {
.register_blocking_fn("panic_SYNC_handler", |_j| {
panic!("Panic inside the handler...");
})
.register_fn("panic_ASYNC_handler", |j| async move {
// potentially going out of bounds
let arg1 = &j.args()[0];
let arg2 = &j.args()[1];
let _ = arg1.as_i64().unwrap() + arg2.as_i64().unwrap();
Ok::<(), io::Error>(())
})
.connect(None)
.await
.unwrap();

Client::connect(None)
.await
.unwrap()
.enqueue(Job::builder("panic").queue(local).build())
let mut c = Client::connect(None).await.unwrap();

// note how we are not specifying any args for this job,
// so indexing into `job.args()` will panic
c.enqueue(Job::builder("panic_SYNC_handler").queue(local).build())
.await
.unwrap();

// we _did_ consume and process the job, the processing result itself though
// was a failure; however, a panic in the handler was "intercepted" and communicated
// to the Faktory server via the FAIL command
// to the Faktory server via the FAIL command;
// note how the test run is not interrupted with a panic
assert!(w.run_one(0, &[local]).await.unwrap());

c.enqueue(Job::builder("panic_ASYNC_handler").queue(local).build())
.await
.unwrap();

// same for async handler, note how the test run is not interrupted with a panic
assert!(w.run_one(0, &[local]).await.unwrap());
}

0 comments on commit 011b847

Please sign in to comment.