Skip to content

Commit

Permalink
Avoid double-arcing
Browse files Browse the repository at this point in the history
  • Loading branch information
rustworthy committed Aug 18, 2024
1 parent 286e1b3 commit 25c0fe5
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 19 deletions.
3 changes: 1 addition & 2 deletions src/worker/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use crate::{
Error, Job, JobRunner, Reconnect, WorkerId,
};
use std::future::Future;
use std::sync::Arc;
use std::time::Duration;
use tokio::io::{AsyncBufRead, AsyncRead, AsyncWrite, BufStream};
use tokio::net::TcpStream as TokioStream;
Expand Down Expand Up @@ -225,7 +224,7 @@ impl<E: 'static> WorkerBuilder<E> {
H: Fn(Job) -> Result<(), E> + Send + Sync + 'static,
{
self.callbacks
.insert(kind.into(), super::Callback::Sync(Arc::new(handler)));
.insert(kind.into(), super::Callback::Sync(Box::new(handler)));
self
}

Expand Down
40 changes: 23 additions & 17 deletions src/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::process;
use std::sync::{atomic, Arc};
use std::time::Duration;
use std::{error::Error as StdError, sync::atomic::AtomicUsize};
use tokio::task::{spawn_blocking, AbortHandle, JoinError, JoinSet};
use tokio::task::{spawn, spawn_blocking, AbortHandle, JoinError, JoinSet};
use tokio::time::sleep as tokio_sleep;

mod builder;
Expand All @@ -29,7 +29,7 @@ type ShutdownSignal = Pin<Box<dyn Future<Output = ()> + 'static + Send>>;

pub(crate) enum Callback<E> {
Async(runner::BoxedJobRunner<E>),
Sync(Arc<dyn Fn(Job) -> Result<(), E> + Sync + Send + 'static>),
Sync(Box<dyn Fn(Job) -> Result<(), E> + Sync + Send + 'static>),
}

type CallbacksRegistry<E> = FnvHashMap<String, Callback<E>>;
Expand Down Expand Up @@ -222,31 +222,37 @@ impl<E: StdError + 'static + Send> Worker<E> {
async fn run_job(&mut self, job: Job) -> Result<(), Failed<E, JoinError>> {
let handler = self
.callbacks
.get(job.kind())
.get(&job.kind)
.ok_or(Failed::BadJobType(job.kind().to_string()))?;
match handler {
let spawning_result = match handler {
Callback::Async(_) => {
let callbacks = self.callbacks.clone();
let process = async move {
let cb = callbacks.get(job.kind()).unwrap();
if let Callback::Async(cb) = cb {
let processing_task = async move {
let callback = callbacks.get(&job.kind).unwrap();
if let Callback::Async(cb) = callback {
cb.run(job).await
} else {
unreachable!()

Check warning on line 235 in src/worker/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/worker/mod.rs#L235

Added line #L235 was not covered by tests
}
};
match tokio::spawn(process).await {
Err(join_error) => Err(Failed::HandlerPanic(join_error)),
Ok(processing_result) => processing_result.map_err(Failed::Application),
}
spawn(processing_task).await
}
Callback::Sync(cb) => {
let cb = Arc::clone(cb);
match spawn_blocking(move || cb(job)).await {
Err(join_error) => Err(Failed::HandlerPanic(join_error)),
Ok(processing_result) => processing_result.map_err(Failed::Application),
}
Callback::Sync(_) => {
let callbacks = self.callbacks.clone();
let processing_task = move || {
let callback = callbacks.get(&job.kind).unwrap();
if let Callback::Sync(cb) = callback {
cb(job)
} else {
unreachable!()

Check warning on line 247 in src/worker/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/worker/mod.rs#L247

Added line #L247 was not covered by tests
}
};
spawn_blocking(processing_task).await
}
};
match spawning_result {
Err(join_error) => Err(Failed::HandlerPanic(join_error)),
Ok(processing_result) => processing_result.map_err(Failed::Application),
}
}

Expand Down

0 comments on commit 25c0fe5

Please sign in to comment.