Skip to content

Commit

Permalink
Add method to terminate a worker
Browse files Browse the repository at this point in the history
  • Loading branch information
JonasAlaif committed Dec 7, 2023
1 parent 17b01da commit ef42783
Show file tree
Hide file tree
Showing 2 changed files with 134 additions and 65 deletions.
146 changes: 127 additions & 19 deletions crates/worker/src/actor/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,101 @@ use std::rc::Weak;
use serde::{Deserialize, Serialize};

use super::handler_id::HandlerId;
use super::messages::FromWorker;
use super::messages::ToWorker;
use super::native_worker::DedicatedWorker;
use super::native_worker::NativeWorkerExt;
use super::traits::Worker;
use super::{Callback, Shared};
use super::Callback;
use crate::codec::Codec;

pub(crate) type ToWorkerQueue<W> = Vec<ToWorker<W>>;
pub(crate) type CallbackMap<W> = HashMap<HandlerId, Weak<dyn Fn(<W as Worker>::Output)>>;

struct WorkerBridgeInner<W>
pub(crate) struct WorkerBridgeInner<W>
where
W: Worker,
{
// When worker is loaded, queue becomes None.
pending_queue: Shared<Option<ToWorkerQueue<W>>>,
callbacks: Shared<CallbackMap<W>>,
post_msg: Rc<dyn Fn(ToWorker<W>)>,
pending_queue: RefCell<Option<ToWorkerQueue<W>>>,
callbacks: RefCell<CallbackMap<W>>,
native_worker: RefCell<Option<DedicatedWorker>>,
post_msg: Box<dyn Fn(&DedicatedWorker, ToWorker<W>)>,
}

impl<W> WorkerBridgeInner<W>
where
W: Worker + 'static,
{
pub(crate) fn new<CODEC>(native_worker: DedicatedWorker, callbacks: CallbackMap<W>) -> Rc<Self>
where
CODEC: Codec,
W::Input: Serialize + for<'de> Deserialize<'de>,
W::Output: Serialize + for<'de> Deserialize<'de>,
{
let worker = native_worker.clone();

let pending_queue = RefCell::new(Some(Vec::new()));
let callbacks = RefCell::new(callbacks);
let native_worker = RefCell::new(Some(native_worker));
let post_msg = move |worker: &DedicatedWorker, msg: ToWorker<W>| {
worker.post_packed_message::<_, CODEC>(msg)
};

let self_ = Self {
pending_queue,
callbacks,
native_worker,
post_msg: Box::new(post_msg),
};
let self_ = Rc::new(self_);

let handler = {
let bridge_inner = Rc::downgrade(&self_);
// If all bridges are dropped then `self_` is dropped and `upgrade` returns `None`.
move |msg: FromWorker<W>| {
if let Some(bridge_inner) = Weak::upgrade(&bridge_inner) {
match msg {
FromWorker::WorkerLoaded => {
// Set pending queue to `None`. Unless `WorkerLoaded` is
// sent twice, this will always be `Some`.
if let Some(pending_queue) = bridge_inner.take_queue() {
// Will be `None` if the worker has been terminated.
if let Some(worker) =
bridge_inner.native_worker.borrow_mut().as_ref()
{
// Send all pending messages.
for to_worker in pending_queue.into_iter() {
(bridge_inner.post_msg)(worker, to_worker);
}
}
}
}
FromWorker::ProcessOutput(id, output) => {
let mut callbacks = bridge_inner.callbacks.borrow_mut();

if let Some(m) = callbacks.get(&id) {
if let Some(m) = Weak::upgrade(m) {
m(output);
} else {
// The bridge has been dropped.
callbacks.remove(&id);
}
}
}
}
}
}
};

worker.set_on_packed_message::<_, CODEC, _>(handler);

self_
}

fn take_queue(&self) -> Option<ToWorkerQueue<W>> {
self.pending_queue.borrow_mut().take()
}
}

impl<W> fmt::Debug for WorkerBridgeInner<W>
Expand All @@ -49,10 +127,24 @@ where
m.push(msg);
}
None => {
(self.post_msg)(msg);
if let Some(worker) = self.native_worker.borrow().as_ref() {
(self.post_msg)(worker, msg);
}
}
}
}

/// Terminate the worker, no more messages can be sent after this.
fn terminate(&self) {
if let Some(worker) = self.native_worker.borrow_mut().take() {
worker.terminate();
}
}

/// Returns true if the worker is terminated.
fn is_terminated(&self) -> bool {
self.native_worker.borrow().is_none()
}
}

impl<W> Drop for WorkerBridgeInner<W>
Expand All @@ -66,6 +158,15 @@ where
}

/// A connection manager for components interaction with workers.
///
/// Dropping this object will send a disconnect message to the worker and drop
/// the callback if set, but will have no effect on forked bridges. Note that
/// the worker will still receive and process any messages sent over the bridge
/// up to that point, but the reply will not trigger a callback. If all forked
/// bridges for a worker are dropped, the worker will be sent a destroy message.
///
/// To terminate the worker and stop execution immediately, use
/// [`terminate`](#method.terminate).
pub struct WorkerBridge<W>
where
W: Worker,
Expand All @@ -84,26 +185,16 @@ where
self.inner.send_message(ToWorker::Connected(self.id));
}

pub(crate) fn new<CODEC>(
pub(crate) fn new(
id: HandlerId,
native_worker: web_sys::Worker,
pending_queue: Rc<RefCell<Option<ToWorkerQueue<W>>>>,
callbacks: Rc<RefCell<CallbackMap<W>>>,
inner: Rc<WorkerBridgeInner<W>>,
callback: Option<Callback<W::Output>>,
) -> Self
where
CODEC: Codec,
W::Input: Serialize + for<'de> Deserialize<'de>,
{
let post_msg = move |msg: ToWorker<W>| native_worker.post_packed_message::<_, CODEC>(msg);

let self_ = Self {
inner: WorkerBridgeInner {
pending_queue,
callbacks,
post_msg: Rc::new(post_msg),
}
.into(),
inner,
id,
_worker: PhantomData,
_cb: callback,
Expand Down Expand Up @@ -146,6 +237,23 @@ where

self_
}

/// Immediately terminates the worker and stops any execution in progress,
/// for this and all forked bridges. All messages will be dropped without
/// the worker receiving them. No disconnect or destroy message is sent. Any
/// messages sent after this point are dropped (from this bridge or any
/// forks).
///
/// For more details see
/// [`web_sys::Worker::terminate`](https://rustwasm.github.io/wasm-bindgen/api/web_sys/struct.Worker.html#method.terminate).
pub fn terminate(&self) {
self.inner.terminate()
}

/// Returns true if the worker is terminated.
pub fn is_terminated(&self) -> bool {
self.inner.is_terminated()
}
}

impl<W> Drop for WorkerBridge<W>
Expand Down
53 changes: 7 additions & 46 deletions crates/worker/src/actor/spawner.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
use std::cell::RefCell;
use std::collections::HashMap;
use std::fmt;
use std::marker::PhantomData;
use std::rc::{Rc, Weak};
use std::rc::Rc;

use gloo_utils::window;
use js_sys::Array;
use serde::de::Deserialize;
use serde::ser::Serialize;
use web_sys::{Blob, BlobPropertyBag, Url};

use super::bridge::{CallbackMap, WorkerBridge};
use super::bridge::{WorkerBridge, WorkerBridgeInner};
use super::handler_id::HandlerId;
use super::messages::FromWorker;
use super::native_worker::{DedicatedWorker, NativeWorkerExt};
use super::native_worker::DedicatedWorker;
use super::traits::Worker;
use super::{Callback, Shared};
use super::Callback;
use crate::codec::{Bincode, Codec};

fn create_worker(path: &str) -> DedicatedWorker {
Expand Down Expand Up @@ -110,53 +108,16 @@ where
W::Input: Serialize + for<'de> Deserialize<'de>,
W::Output: Serialize + for<'de> Deserialize<'de>,
{
let pending_queue = Rc::new(RefCell::new(Some(Vec::new())));
let handler_id = HandlerId::new();
let mut callbacks = HashMap::new();

if let Some(m) = self.callback.as_ref().map(Rc::downgrade) {
callbacks.insert(handler_id, m);
}

let callbacks: Shared<CallbackMap<W>> = Rc::new(RefCell::new(callbacks));

let handler = {
let pending_queue = pending_queue.clone();
let callbacks = callbacks.clone();

let worker = worker.clone();

move |msg: FromWorker<W>| match msg {
FromWorker::WorkerLoaded => {
if let Some(pending_queue) = pending_queue.borrow_mut().take() {
for to_worker in pending_queue.into_iter() {
worker.post_packed_message::<_, CODEC>(to_worker);
}
}
}
FromWorker::ProcessOutput(id, output) => {
let mut callbacks = callbacks.borrow_mut();

if let Some(m) = callbacks.get(&id) {
if let Some(m) = Weak::upgrade(m) {
m(output);
} else {
callbacks.remove(&id);
}
}
}
}
};

worker.set_on_packed_message::<_, CODEC, _>(handler);

WorkerBridge::<W>::new::<CODEC>(
handler_id,
worker,
pending_queue,
callbacks,
self.callback.clone(),
)
let inner = WorkerBridgeInner::<W>::new::<CODEC>(worker, callbacks);

WorkerBridge::<W>::new(handler_id, inner, self.callback.clone())
}

/// Spawns a Worker.
Expand Down

0 comments on commit ef42783

Please sign in to comment.