Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add method to terminate a worker #420

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -360,10 +360,11 @@

## `worker`

### Version 0.4.1
### Version 0.5.0

- Remove Cloning on WorkerBridge (#388)
- Add an example of processing transferrable types with worker (#371)
- Add method to terminate a worker (#420)

### Version 0.4.0

Expand Down
147 changes: 128 additions & 19 deletions crates/worker/src/actor/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,102 @@ use std::rc::Weak;
use serde::{Deserialize, Serialize};

use super::handler_id::HandlerId;
use super::messages::FromWorker;
use super::messages::ToWorker;
use super::native_worker::DedicatedWorker;
use super::native_worker::NativeWorkerExt;
use super::traits::Worker;
use super::{Callback, Shared};
use super::Callback;
use crate::codec::Codec;

pub(crate) type ToWorkerQueue<W> = Vec<ToWorker<W>>;
pub(crate) type CallbackMap<W> = HashMap<HandlerId, Weak<dyn Fn(<W as Worker>::Output)>>;
type PostMsg<W> = Box<dyn Fn(&DedicatedWorker, ToWorker<W>)>;

struct WorkerBridgeInner<W>
pub(crate) struct WorkerBridgeInner<W>
where
W: Worker,
{
// When worker is loaded, queue becomes None.
pending_queue: Shared<Option<ToWorkerQueue<W>>>,
callbacks: Shared<CallbackMap<W>>,
post_msg: Rc<dyn Fn(ToWorker<W>)>,
pending_queue: RefCell<Option<ToWorkerQueue<W>>>,
callbacks: RefCell<CallbackMap<W>>,
native_worker: RefCell<Option<DedicatedWorker>>,
post_msg: PostMsg<W>,
}

impl<W> WorkerBridgeInner<W>
where
W: Worker + 'static,
{
pub(crate) fn new<CODEC>(native_worker: DedicatedWorker, callbacks: CallbackMap<W>) -> Rc<Self>
where
CODEC: Codec,
W::Input: Serialize + for<'de> Deserialize<'de>,
W::Output: Serialize + for<'de> Deserialize<'de>,
{
let worker = native_worker.clone();

let pending_queue = RefCell::new(Some(Vec::new()));
let callbacks = RefCell::new(callbacks);
let native_worker = RefCell::new(Some(native_worker));
let post_msg = move |worker: &DedicatedWorker, msg: ToWorker<W>| {
worker.post_packed_message::<_, CODEC>(msg)
};

let self_ = Self {
pending_queue,
callbacks,
native_worker,
post_msg: Box::new(post_msg),
};
let self_ = Rc::new(self_);

let handler = {
let bridge_inner = Rc::downgrade(&self_);
// If all bridges are dropped then `self_` is dropped and `upgrade` returns `None`.
move |msg: FromWorker<W>| {
if let Some(bridge_inner) = Weak::upgrade(&bridge_inner) {
match msg {
FromWorker::WorkerLoaded => {
// Set pending queue to `None`. Unless `WorkerLoaded` is
// sent twice, this will always be `Some`.
if let Some(pending_queue) = bridge_inner.take_queue() {
// Will be `None` if the worker has been terminated.
if let Some(worker) =
bridge_inner.native_worker.borrow_mut().as_ref()
{
// Send all pending messages.
for to_worker in pending_queue.into_iter() {
(bridge_inner.post_msg)(worker, to_worker);
}
}
}
}
FromWorker::ProcessOutput(id, output) => {
let mut callbacks = bridge_inner.callbacks.borrow_mut();

if let Some(m) = callbacks.get(&id) {
if let Some(m) = Weak::upgrade(m) {
m(output);
} else {
// The bridge has been dropped.
callbacks.remove(&id);
}
}
}
}
}
}
};

worker.set_on_packed_message::<_, CODEC, _>(handler);

self_
}

fn take_queue(&self) -> Option<ToWorkerQueue<W>> {
self.pending_queue.borrow_mut().take()
}
}

impl<W> fmt::Debug for WorkerBridgeInner<W>
Expand All @@ -49,10 +128,24 @@ where
m.push(msg);
}
None => {
(self.post_msg)(msg);
if let Some(worker) = self.native_worker.borrow().as_ref() {
(self.post_msg)(worker, msg);
}
}
}
}

/// Terminate the worker, no more messages can be sent after this.
fn terminate(&self) {
if let Some(worker) = self.native_worker.borrow_mut().take() {
worker.terminate();
}
}

/// Returns true if the worker is terminated.
fn is_terminated(&self) -> bool {
self.native_worker.borrow().is_none()
}
}

impl<W> Drop for WorkerBridgeInner<W>
Expand All @@ -66,6 +159,15 @@ where
}

/// A connection manager for components interaction with workers.
///
/// Dropping this object will send a disconnect message to the worker and drop
/// the callback if set, but will have no effect on forked bridges. Note that
/// the worker will still receive and process any messages sent over the bridge
/// up to that point, but the reply will not trigger a callback. If all forked
/// bridges for a worker are dropped, the worker will be sent a destroy message.
///
/// To terminate the worker and stop execution immediately, use
/// [`terminate`](#method.terminate).
pub struct WorkerBridge<W>
where
W: Worker,
Expand All @@ -84,26 +186,16 @@ where
self.inner.send_message(ToWorker::Connected(self.id));
}

pub(crate) fn new<CODEC>(
pub(crate) fn new(
id: HandlerId,
native_worker: web_sys::Worker,
pending_queue: Rc<RefCell<Option<ToWorkerQueue<W>>>>,
callbacks: Rc<RefCell<CallbackMap<W>>>,
inner: Rc<WorkerBridgeInner<W>>,
callback: Option<Callback<W::Output>>,
) -> Self
where
CODEC: Codec,
W::Input: Serialize + for<'de> Deserialize<'de>,
{
let post_msg = move |msg: ToWorker<W>| native_worker.post_packed_message::<_, CODEC>(msg);

let self_ = Self {
inner: WorkerBridgeInner {
pending_queue,
callbacks,
post_msg: Rc::new(post_msg),
}
.into(),
inner,
id,
_worker: PhantomData,
_cb: callback,
Expand Down Expand Up @@ -146,6 +238,23 @@ where

self_
}

/// Immediately terminates the worker and stops any execution in progress,
/// for this and all forked bridges. All messages will be dropped without
/// the worker receiving them. No disconnect or destroy message is sent. Any
/// messages sent after this point are dropped (from this bridge or any
/// forks).
///
/// For more details see
/// [`web_sys::Worker::terminate`](https://rustwasm.github.io/wasm-bindgen/api/web_sys/struct.Worker.html#method.terminate).
pub fn terminate(&self) {
self.inner.terminate()
}

/// Returns true if the worker is terminated.
pub fn is_terminated(&self) -> bool {
self.inner.is_terminated()
}
}

impl<W> Drop for WorkerBridge<W>
Expand Down
53 changes: 7 additions & 46 deletions crates/worker/src/actor/spawner.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
use std::cell::RefCell;
use std::collections::HashMap;
use std::fmt;
use std::marker::PhantomData;
use std::rc::{Rc, Weak};
use std::rc::Rc;

use gloo_utils::window;
use js_sys::Array;
use serde::de::Deserialize;
use serde::ser::Serialize;
use web_sys::{Blob, BlobPropertyBag, Url};

use super::bridge::{CallbackMap, WorkerBridge};
use super::bridge::{WorkerBridge, WorkerBridgeInner};
use super::handler_id::HandlerId;
use super::messages::FromWorker;
use super::native_worker::{DedicatedWorker, NativeWorkerExt};
use super::native_worker::DedicatedWorker;
use super::traits::Worker;
use super::{Callback, Shared};
use super::Callback;
use crate::codec::{Bincode, Codec};

fn create_worker(path: &str) -> DedicatedWorker {
Expand Down Expand Up @@ -110,53 +108,16 @@ where
W::Input: Serialize + for<'de> Deserialize<'de>,
W::Output: Serialize + for<'de> Deserialize<'de>,
{
let pending_queue = Rc::new(RefCell::new(Some(Vec::new())));
let handler_id = HandlerId::new();
let mut callbacks = HashMap::new();

if let Some(m) = self.callback.as_ref().map(Rc::downgrade) {
callbacks.insert(handler_id, m);
}

let callbacks: Shared<CallbackMap<W>> = Rc::new(RefCell::new(callbacks));

let handler = {
let pending_queue = pending_queue.clone();
let callbacks = callbacks.clone();

let worker = worker.clone();

move |msg: FromWorker<W>| match msg {
FromWorker::WorkerLoaded => {
if let Some(pending_queue) = pending_queue.borrow_mut().take() {
for to_worker in pending_queue.into_iter() {
worker.post_packed_message::<_, CODEC>(to_worker);
}
}
}
FromWorker::ProcessOutput(id, output) => {
let mut callbacks = callbacks.borrow_mut();

if let Some(m) = callbacks.get(&id) {
if let Some(m) = Weak::upgrade(m) {
m(output);
} else {
callbacks.remove(&id);
}
}
}
}
};

worker.set_on_packed_message::<_, CODEC, _>(handler);

WorkerBridge::<W>::new::<CODEC>(
handler_id,
worker,
pending_queue,
callbacks,
self.callback.clone(),
)
let inner = WorkerBridgeInner::<W>::new::<CODEC>(worker, callbacks);

WorkerBridge::<W>::new(handler_id, inner, self.callback.clone())
}

/// Spawns a Worker.
Expand Down