From 25c0fe51da79e856af5357a452ec80a18d0879e6 Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Sun, 18 Aug 2024 16:29:33 +0400 Subject: [PATCH] Avoid double-arcing --- src/worker/builder.rs | 3 +-- src/worker/mod.rs | 40 +++++++++++++++++++++++----------------- 2 files changed, 24 insertions(+), 19 deletions(-) diff --git a/src/worker/builder.rs b/src/worker/builder.rs index a947eb03..169c1199 100644 --- a/src/worker/builder.rs +++ b/src/worker/builder.rs @@ -4,7 +4,6 @@ use crate::{ Error, Job, JobRunner, Reconnect, WorkerId, }; use std::future::Future; -use std::sync::Arc; use std::time::Duration; use tokio::io::{AsyncBufRead, AsyncRead, AsyncWrite, BufStream}; use tokio::net::TcpStream as TokioStream; @@ -225,7 +224,7 @@ impl WorkerBuilder { H: Fn(Job) -> Result<(), E> + Send + Sync + 'static, { self.callbacks - .insert(kind.into(), super::Callback::Sync(Arc::new(handler))); + .insert(kind.into(), super::Callback::Sync(Box::new(handler))); self } diff --git a/src/worker/mod.rs b/src/worker/mod.rs index cee2f63d..0f734792 100644 --- a/src/worker/mod.rs +++ b/src/worker/mod.rs @@ -8,7 +8,7 @@ use std::process; use std::sync::{atomic, Arc}; use std::time::Duration; use std::{error::Error as StdError, sync::atomic::AtomicUsize}; -use tokio::task::{spawn_blocking, AbortHandle, JoinError, JoinSet}; +use tokio::task::{spawn, spawn_blocking, AbortHandle, JoinError, JoinSet}; use tokio::time::sleep as tokio_sleep; mod builder; @@ -29,7 +29,7 @@ type ShutdownSignal = Pin + 'static + Send>>; pub(crate) enum Callback { Async(runner::BoxedJobRunner), - Sync(Arc Result<(), E> + Sync + Send + 'static>), + Sync(Box Result<(), E> + Sync + Send + 'static>), } type CallbacksRegistry = FnvHashMap>; @@ -222,31 +222,37 @@ impl Worker { async fn run_job(&mut self, job: Job) -> Result<(), Failed> { let handler = self .callbacks - .get(job.kind()) + .get(&job.kind) .ok_or(Failed::BadJobType(job.kind().to_string()))?; - match handler { + let spawning_result = match handler { Callback::Async(_) => { let callbacks = self.callbacks.clone(); - let process = async move { - let cb = callbacks.get(job.kind()).unwrap(); - if let Callback::Async(cb) = cb { + let processing_task = async move { + let callback = callbacks.get(&job.kind).unwrap(); + if let Callback::Async(cb) = callback { cb.run(job).await } else { unreachable!() } }; - match tokio::spawn(process).await { - Err(join_error) => Err(Failed::HandlerPanic(join_error)), - Ok(processing_result) => processing_result.map_err(Failed::Application), - } + spawn(processing_task).await } - Callback::Sync(cb) => { - let cb = Arc::clone(cb); - match spawn_blocking(move || cb(job)).await { - Err(join_error) => Err(Failed::HandlerPanic(join_error)), - Ok(processing_result) => processing_result.map_err(Failed::Application), - } + Callback::Sync(_) => { + let callbacks = self.callbacks.clone(); + let processing_task = move || { + let callback = callbacks.get(&job.kind).unwrap(); + if let Callback::Sync(cb) = callback { + cb(job) + } else { + unreachable!() + } + }; + spawn_blocking(processing_task).await } + }; + match spawning_result { + Err(join_error) => Err(Failed::HandlerPanic(join_error)), + Ok(processing_result) => processing_result.map_err(Failed::Application), } }