diff --git a/Cargo.lock b/Cargo.lock index 16e127f8..e729ed08 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -429,6 +429,7 @@ dependencies = [ "clap", "derive_builder", "fnv", + "futures", "hostname", "native-tls", "num-bigint", @@ -490,18 +491,95 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" +dependencies = [ + "futures-core", + "futures-sink", +] + [[package]] name = "futures-core" version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" +[[package]] +name = "futures-executor" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" + +[[package]] +name = "futures-macro" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.71", +] + [[package]] name = "futures-sink" version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5" +[[package]] +name = "futures-task" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" + +[[package]] +name = "futures-util" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" +dependencies = [ + "futures-channel", + "futures-core", + "futures-io", + "futures-macro", + "futures-sink", + "futures-task", + "memchr", + "pin-project-lite", + "pin-utils", + "slab", +] + [[package]] name = "generic-array" version = "0.14.7" @@ -855,6 +933,12 @@ version = "0.2.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bda66fc9667c18cb2758a2ac84d1167245054bcf85d5d1aaa6923f45801bdd02" +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + [[package]] name = "pkg-config" version = "0.3.30" @@ -1093,6 +1177,15 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "slab" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67" +dependencies = [ + "autocfg", +] + [[package]] name = "smallvec" version = "1.13.2" diff --git a/Cargo.toml b/Cargo.toml index cd9a88c7..a26163ee 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,6 +50,7 @@ tokio-rustls = { version = "0.25.0", optional = true } tracing = "0.1" url = "2" semver = { version = "1.0.23", features = ["serde"] } +futures = "0.3.30" [dev-dependencies] rustls-pki-types = "1.0.1" diff --git a/src/worker/mod.rs b/src/worker/mod.rs index b019c508..3ad37638 100644 --- a/src/worker/mod.rs +++ b/src/worker/mod.rs @@ -2,7 +2,9 @@ use super::proto::Client; use crate::error::Error; use crate::proto::{Ack, Fail, Job}; use fnv::FnvHashMap; +use futures::FutureExt; use std::future::Future; +use std::panic::AssertUnwindSafe; use std::pin::Pin; use std::process; use std::sync::{atomic, Arc}; @@ -214,7 +216,8 @@ impl Worker { enum Failed { Application(E), - HandlerPanic(JE), + HandlerPanic(String), + SyncHandlerPanic(JE), BadJobType(String), } @@ -225,11 +228,23 @@ impl Worker { .get(job.kind()) .ok_or(Failed::BadJobType(job.kind().to_string()))?; match handler { - Callback::Async(cb) => cb.run(job).await.map_err(Failed::Application), + Callback::Async(cb) => AssertUnwindSafe(cb.run(job)) + .catch_unwind() + .await + .map_err(|any| { + if any.is::() { + Failed::HandlerPanic(*any.downcast::().unwrap()) + } else if any.is::<&str>() { + Failed::HandlerPanic((*any.downcast::<&str>().unwrap()).to_string()) + } else { + Failed::HandlerPanic("Panic in handler".into()) + } + })? + .map_err(Failed::Application), Callback::Sync(cb) => { let cb = Arc::clone(cb); match spawn_blocking(move || cb(job)).await { - Err(join_error) => Err(Failed::HandlerPanic(join_error)), + Err(join_error) => Err(Failed::SyncHandlerPanic(join_error)), Ok(processing_result) => processing_result.map_err(Failed::Application), } } @@ -330,7 +345,8 @@ impl Worker { let fail = match e { Failed::BadJobType(jt) => Fail::generic(jid, format!("No handler for {}", jt)), Failed::Application(e) => Fail::generic_with_backtrace(jid, e), - Failed::HandlerPanic(e) => Fail::generic_with_backtrace(jid, e), + Failed::HandlerPanic(e) => Fail::generic(jid, e), + Failed::SyncHandlerPanic(e) => Fail::generic_with_backtrace(jid, e), }; self.worker_states.register_failure(worker, fail.clone()); self.c.issue(&fail).await?.read_ok().await?; diff --git a/tests/real/community.rs b/tests/real/community.rs index e1d14457..ceef348c 100644 --- a/tests/real/community.rs +++ b/tests/real/community.rs @@ -729,22 +729,38 @@ async fn test_panic_in_handler() { let local = "test_panic_in_handler"; let mut w = Worker::builder::() - .register_blocking_fn("panic", |_j| { + .register_blocking_fn("panic_SYNC_handler", |_j| { panic!("Panic inside the handler..."); }) + .register_fn("panic_ASYNC_handler", |j| async move { + // potentially going out of bounds + let arg1 = &j.args()[0]; + let arg2 = &j.args()[1]; + let _ = arg1.as_i64().unwrap() + arg2.as_i64().unwrap(); + Ok::<(), io::Error>(()) + }) .connect(None) .await .unwrap(); - Client::connect(None) - .await - .unwrap() - .enqueue(Job::builder("panic").queue(local).build()) + let mut c = Client::connect(None).await.unwrap(); + + // note how we are not specifying any args for this job, + // so indexing into `job.args()` will panic + c.enqueue(Job::builder("panic_SYNC_handler").queue(local).build()) .await .unwrap(); // we _did_ consume and process the job, the processing result itself though // was a failure; however, a panic in the handler was "intercepted" and communicated - // to the Faktory server via the FAIL command + // to the Faktory server via the FAIL command; + // note how the test run is not interrupted with a panic + assert!(w.run_one(0, &[local]).await.unwrap()); + + c.enqueue(Job::builder("panic_ASYNC_handler").queue(local).build()) + .await + .unwrap(); + + // same for async handler, note how the test run is not interrupted with a panic assert!(w.run_one(0, &[local]).await.unwrap()); }