diff --git a/crates/worker/src/actor/bridge.rs b/crates/worker/src/actor/bridge.rs index 72366659..ec539a53 100644 --- a/crates/worker/src/actor/bridge.rs +++ b/crates/worker/src/actor/bridge.rs @@ -8,23 +8,101 @@ use std::rc::Weak; use serde::{Deserialize, Serialize}; use super::handler_id::HandlerId; +use super::messages::FromWorker; use super::messages::ToWorker; +use super::native_worker::DedicatedWorker; use super::native_worker::NativeWorkerExt; use super::traits::Worker; -use super::{Callback, Shared}; +use super::Callback; use crate::codec::Codec; pub(crate) type ToWorkerQueue = Vec>; pub(crate) type CallbackMap = HashMap::Output)>>; -struct WorkerBridgeInner +pub(crate) struct WorkerBridgeInner where W: Worker, { // When worker is loaded, queue becomes None. - pending_queue: Shared>>, - callbacks: Shared>, - post_msg: Rc)>, + pending_queue: RefCell>>, + callbacks: RefCell>, + native_worker: RefCell>, + post_msg: Box)>, +} + +impl WorkerBridgeInner +where + W: Worker + 'static, +{ + pub(crate) fn new(native_worker: DedicatedWorker, callbacks: CallbackMap) -> Rc + where + CODEC: Codec, + W::Input: Serialize + for<'de> Deserialize<'de>, + W::Output: Serialize + for<'de> Deserialize<'de>, + { + let worker = native_worker.clone(); + + let pending_queue = RefCell::new(Some(Vec::new())); + let callbacks = RefCell::new(callbacks); + let native_worker = RefCell::new(Some(native_worker)); + let post_msg = move |worker: &DedicatedWorker, msg: ToWorker| { + worker.post_packed_message::<_, CODEC>(msg) + }; + + let self_ = Self { + pending_queue, + callbacks, + native_worker, + post_msg: Box::new(post_msg), + }; + let self_ = Rc::new(self_); + + let handler = { + let bridge_inner = Rc::downgrade(&self_); + // If all bridges are dropped then `self_` is dropped and `upgrade` returns `None`. + move |msg: FromWorker| { + if let Some(bridge_inner) = Weak::upgrade(&bridge_inner) { + match msg { + FromWorker::WorkerLoaded => { + // Set pending queue to `None`. Unless `WorkerLoaded` is + // sent twice, this will always be `Some`. + if let Some(pending_queue) = bridge_inner.take_queue() { + // Will be `None` if the worker has been terminated. + if let Some(worker) = + bridge_inner.native_worker.borrow_mut().as_ref() + { + // Send all pending messages. + for to_worker in pending_queue.into_iter() { + (bridge_inner.post_msg)(worker, to_worker); + } + } + } + } + FromWorker::ProcessOutput(id, output) => { + let mut callbacks = bridge_inner.callbacks.borrow_mut(); + + if let Some(m) = callbacks.get(&id) { + if let Some(m) = Weak::upgrade(m) { + m(output); + } else { + // The bridge has been dropped. + callbacks.remove(&id); + } + } + } + } + } + } + }; + + worker.set_on_packed_message::<_, CODEC, _>(handler); + + self_ + } + + fn take_queue(&self) -> Option> { + self.pending_queue.borrow_mut().take() + } } impl fmt::Debug for WorkerBridgeInner @@ -49,10 +127,24 @@ where m.push(msg); } None => { - (self.post_msg)(msg); + if let Some(worker) = self.native_worker.borrow().as_ref() { + (self.post_msg)(worker, msg); + } } } } + + /// Terminate the worker, no more messages can be sent after this. + fn terminate(&self) { + if let Some(worker) = self.native_worker.borrow_mut().take() { + worker.terminate(); + } + } + + /// Returns true if the worker is terminated. + fn is_terminated(&self) -> bool { + self.native_worker.borrow().is_none() + } } impl Drop for WorkerBridgeInner @@ -66,6 +158,15 @@ where } /// A connection manager for components interaction with workers. +/// +/// Dropping this object will send a disconnect message to the worker and drop +/// the callback if set, but will have no effect on forked bridges. Note that +/// the worker will still receive and process any messages sent over the bridge +/// up to that point, but the reply will not trigger a callback. If all forked +/// bridges for a worker are dropped, the worker will be sent a destroy message. +/// +/// To terminate the worker and stop execution immediately, use +/// [`terminate`](#method.terminate). pub struct WorkerBridge where W: Worker, @@ -84,26 +185,16 @@ where self.inner.send_message(ToWorker::Connected(self.id)); } - pub(crate) fn new( + pub(crate) fn new( id: HandlerId, - native_worker: web_sys::Worker, - pending_queue: Rc>>>, - callbacks: Rc>>, + inner: Rc>, callback: Option>, ) -> Self where - CODEC: Codec, W::Input: Serialize + for<'de> Deserialize<'de>, { - let post_msg = move |msg: ToWorker| native_worker.post_packed_message::<_, CODEC>(msg); - let self_ = Self { - inner: WorkerBridgeInner { - pending_queue, - callbacks, - post_msg: Rc::new(post_msg), - } - .into(), + inner, id, _worker: PhantomData, _cb: callback, @@ -146,6 +237,23 @@ where self_ } + + /// Immediately terminates the worker and stops any execution in progress, + /// for this and all forked bridges. All messages will be dropped without + /// the worker receiving them. No disconnect or destroy message is sent. Any + /// messages sent after this point are dropped (from this bridge or any + /// forks). + /// + /// For more details see + /// [`web_sys::Worker::terminate`](https://rustwasm.github.io/wasm-bindgen/api/web_sys/struct.Worker.html#method.terminate). + pub fn terminate(&self) { + self.inner.terminate() + } + + /// Returns true if the worker is terminated. + pub fn is_terminated(&self) -> bool { + self.inner.is_terminated() + } } impl Drop for WorkerBridge diff --git a/crates/worker/src/actor/spawner.rs b/crates/worker/src/actor/spawner.rs index fbe0c64f..45558802 100644 --- a/crates/worker/src/actor/spawner.rs +++ b/crates/worker/src/actor/spawner.rs @@ -1,8 +1,7 @@ -use std::cell::RefCell; use std::collections::HashMap; use std::fmt; use std::marker::PhantomData; -use std::rc::{Rc, Weak}; +use std::rc::Rc; use gloo_utils::window; use js_sys::Array; @@ -10,12 +9,11 @@ use serde::de::Deserialize; use serde::ser::Serialize; use web_sys::{Blob, BlobPropertyBag, Url}; -use super::bridge::{CallbackMap, WorkerBridge}; +use super::bridge::{WorkerBridge, WorkerBridgeInner}; use super::handler_id::HandlerId; -use super::messages::FromWorker; -use super::native_worker::{DedicatedWorker, NativeWorkerExt}; +use super::native_worker::DedicatedWorker; use super::traits::Worker; -use super::{Callback, Shared}; +use super::Callback; use crate::codec::{Bincode, Codec}; fn create_worker(path: &str) -> DedicatedWorker { @@ -110,7 +108,6 @@ where W::Input: Serialize + for<'de> Deserialize<'de>, W::Output: Serialize + for<'de> Deserialize<'de>, { - let pending_queue = Rc::new(RefCell::new(Some(Vec::new()))); let handler_id = HandlerId::new(); let mut callbacks = HashMap::new(); @@ -118,45 +115,9 @@ where callbacks.insert(handler_id, m); } - let callbacks: Shared> = Rc::new(RefCell::new(callbacks)); - - let handler = { - let pending_queue = pending_queue.clone(); - let callbacks = callbacks.clone(); - - let worker = worker.clone(); - - move |msg: FromWorker| match msg { - FromWorker::WorkerLoaded => { - if let Some(pending_queue) = pending_queue.borrow_mut().take() { - for to_worker in pending_queue.into_iter() { - worker.post_packed_message::<_, CODEC>(to_worker); - } - } - } - FromWorker::ProcessOutput(id, output) => { - let mut callbacks = callbacks.borrow_mut(); - - if let Some(m) = callbacks.get(&id) { - if let Some(m) = Weak::upgrade(m) { - m(output); - } else { - callbacks.remove(&id); - } - } - } - } - }; - - worker.set_on_packed_message::<_, CODEC, _>(handler); - - WorkerBridge::::new::( - handler_id, - worker, - pending_queue, - callbacks, - self.callback.clone(), - ) + let inner = WorkerBridgeInner::::new::(worker, callbacks); + + WorkerBridge::::new(handler_id, inner, self.callback.clone()) } /// Spawns a Worker.