Skip to content

Commit

Permalink
feat(ewebsock): Integrate async-mutex and futures for non-blocking We…
Browse files Browse the repository at this point in the history
…bSocket communication
  • Loading branch information
S0c5 committed Mar 6, 2024
1 parent da4d5bf commit 2d0fbad
Show file tree
Hide file tree
Showing 7 changed files with 110 additions and 45 deletions.
22 changes: 17 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions ewebsock/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,11 @@ tokio = [


[dependencies]
async-mutex = "1.4.0"
document-features = "0.2"
futures-channel = "0.3.30"
log = "0.4"
futures-util = { version = "0.3.21", default-features = false, features = ["sink"] }

# native:
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
Expand All @@ -60,6 +63,7 @@ tokio-tungstenite = { version = ">=0.17, <=0.21", optional = true }
wasm-bindgen = "0.2"
js-sys = "0.3"
wasm-bindgen-futures = "0.4"
futures = "0.3"

[dependencies.web-sys]
version = "0.3"
Expand Down
27 changes: 19 additions & 8 deletions ewebsock/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
//! ## Feature flags
#![doc = document_features::document_features!()]
//!
//!

#![warn(missing_docs)] // let's keep ewebsock well-documented

#[cfg(not(target_arch = "wasm32"))]
#[cfg(not(feature = "tokio"))]
mod native_tungstenite;
Expand All @@ -41,6 +41,10 @@ mod web;
#[cfg(target_arch = "wasm32")]
pub use web::*;

use async_mutex::Mutex;
use futures_channel::mpsc::{channel, Receiver, Sender};

Check failure on line 45 in ewebsock/src/lib.rs

View workflow job for this annotation

GitHub Actions / Rust format, cranky, check, test, doc

unused import: `Sender`

Check warning on line 45 in ewebsock/src/lib.rs

View workflow job for this annotation

GitHub Actions / Check wasm32

unused import: `Sender`
use futures_util::stream::StreamExt;

// ----------------------------------------------------------------------------

/// A web-socket message.
Expand Down Expand Up @@ -81,7 +85,7 @@ pub enum WsEvent {

/// Receiver for incoming [`WsEvent`]s.
pub struct WsReceiver {
rx: std::sync::mpsc::Receiver<WsEvent>,
pub rx: Mutex<Receiver<WsEvent>>,

Check failure on line 88 in ewebsock/src/lib.rs

View workflow job for this annotation

GitHub Actions / Rust format, cranky, check, test, doc

missing documentation for a struct field

Check warning on line 88 in ewebsock/src/lib.rs

View workflow job for this annotation

GitHub Actions / Check wasm32

missing documentation for a struct field
}

impl WsReceiver {
Expand All @@ -94,22 +98,29 @@ impl WsReceiver {
///
/// This can be used to wake up the UI thread.
pub fn new_with_callback(wake_up: impl Fn() + Send + Sync + 'static) -> (Self, EventHandler) {
let (tx, rx) = std::sync::mpsc::channel();
let (mut tx, rx) = channel(0);

let on_event = Box::new(move |event| {
wake_up(); // wake up UI thread
if tx.send(event).is_ok() {
if tx.try_send(event).is_ok() {
std::ops::ControlFlow::Continue(())
} else {
std::ops::ControlFlow::Break(())
}
});
let ws_receiver = WsReceiver { rx };

let ws_receiver = WsReceiver { rx: Mutex::new(rx) };
(ws_receiver, on_event)
}

/// Try receiving a new event without blocking.
pub fn try_recv(&self) -> Option<WsEvent> {
self.rx.try_recv().ok()
pub async fn try_recv(&self) -> Option<WsEvent> {
self.rx.lock().await.try_next().ok().flatten()
}

/// Get next message
pub async fn next(&mut self) -> std::option::Option<WsEvent> {
self.rx.lock().await.next().await
}
}

Expand All @@ -119,7 +130,7 @@ pub type Error = String;
/// Short for `Result<T, ewebsock::Error>`.
pub type Result<T> = std::result::Result<T, Error>;

pub(crate) type EventHandler = Box<dyn Send + Fn(WsEvent) -> std::ops::ControlFlow<()>>;
pub(crate) type EventHandler = Box<dyn Send + FnMut(WsEvent) -> std::ops::ControlFlow<()>>;

/// Options for a connection.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
Expand Down
20 changes: 14 additions & 6 deletions ewebsock/src/native_tungstenite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,15 @@ impl WsSender {
}
}

pub(crate) fn ws_receive_impl(url: String, options: Options, on_event: EventHandler) -> Result<()> {
pub(crate) fn ws_receive_impl(
url: String,
options: Options,
mut on_event: EventHandler,
) -> Result<()> {
std::thread::Builder::new()
.name("ewebsock".to_owned())
.spawn(move || {
if let Err(err) = ws_receiver_blocking(&url, options, &on_event) {
if let Err(err) = ws_receiver_blocking(&url, options, &mut on_event) {
on_event(WsEvent::Error(err));
} else {
log::debug!("WebSocket connection closed.");
Expand All @@ -68,7 +72,11 @@ pub(crate) fn ws_receive_impl(url: String, options: Options, on_event: EventHand
///
/// # Errors
/// All errors are returned to the caller, and NOT reported via `on_event`.
pub fn ws_receiver_blocking(url: &str, options: Options, on_event: &EventHandler) -> Result<()> {
pub fn ws_receiver_blocking(
url: &str,
options: Options,
on_event: &mut EventHandler,
) -> Result<()> {
let config = tungstenite::protocol::WebSocketConfig::from(options);
let max_redirects = 3; // tungstenite default

Expand Down Expand Up @@ -122,14 +130,14 @@ pub fn ws_receiver_blocking(url: &str, options: Options, on_event: &EventHandler
pub(crate) fn ws_connect_impl(
url: String,
options: Options,
on_event: EventHandler,
mut on_event: EventHandler,
) -> Result<WsSender> {
let (tx, rx) = std::sync::mpsc::channel();

std::thread::Builder::new()
.name("ewebsock".to_owned())
.spawn(move || {
if let Err(err) = ws_connect_blocking(&url, options, &on_event, &rx) {
if let Err(err) = ws_connect_blocking(&url, options, &mut on_event, &rx) {
on_event(WsEvent::Error(err));
} else {
log::debug!("WebSocket connection closed.");
Expand All @@ -149,7 +157,7 @@ pub(crate) fn ws_connect_impl(
pub fn ws_connect_blocking(
url: &str,
options: Options,
on_event: &EventHandler,
on_event: &mut EventHandler,
rx: &Receiver<WsMessage>,
) -> Result<()> {
let config = tungstenite::protocol::WebSocketConfig::from(options);
Expand Down
6 changes: 3 additions & 3 deletions ewebsock/src/native_tungstenite_tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ async fn ws_connect_async(
url: String,
options: Options,
outgoing_messages_stream: impl futures::Stream<Item = WsMessage>,
on_event: EventHandler,
mut on_event: EventHandler,
) {
use futures::StreamExt as _;

Expand Down Expand Up @@ -118,13 +118,13 @@ async fn ws_connect_async(
pub(crate) fn ws_connect_impl(
url: String,
options: Options,
on_event: EventHandler,
mut on_event: EventHandler,

Check failure on line 121 in ewebsock/src/native_tungstenite_tokio.rs

View workflow job for this annotation

GitHub Actions / Rust format, cranky, check, test, doc

variable does not need to be mutable
) -> Result<WsSender> {
Ok(ws_connect_native(url, options, on_event))
}

/// Like [`ws_connect`], but cannot fail. Only available on native builds.
fn ws_connect_native(url: String, options: Options, on_event: EventHandler) -> WsSender {
fn ws_connect_native(url: String, options: Options, mut on_event: EventHandler) -> WsSender {

Check failure on line 127 in ewebsock/src/native_tungstenite_tokio.rs

View workflow job for this annotation

GitHub Actions / Rust format, cranky, check, test, doc

variable does not need to be mutable
let (tx, mut rx) = tokio::sync::mpsc::channel(1000);

let outgoing_messages_stream = async_stream::stream! {
Expand Down
72 changes: 51 additions & 21 deletions ewebsock/src/web.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use crate::{EventHandler, Options, Result, WsEvent, WsMessage};

use async_mutex::Mutex;
use wasm_bindgen_futures::spawn_local;

#[allow(clippy::needless_pass_by_value)]
fn string_from_js_value(s: wasm_bindgen::JsValue) -> String {
s.as_string().unwrap_or(format!("{:#?}", s))
Expand Down Expand Up @@ -63,14 +66,18 @@ impl WsSender {
}
}

pub(crate) fn ws_receive_impl(url: String, options: Options, on_event: EventHandler) -> Result<()> {
pub(crate) fn ws_receive_impl(
url: String,
options: Options,
mut on_event: EventHandler,

Check warning on line 72 in ewebsock/src/web.rs

View workflow job for this annotation

GitHub Actions / Check wasm32

variable does not need to be mutable
) -> Result<()> {
ws_connect_impl(url, options, on_event).map(|sender| sender.forget())
}

pub(crate) fn ws_connect_impl(
url: String,
_ignored_options: Options,
on_event: EventHandler,
mut on_event: EventHandler,

Check warning on line 80 in ewebsock/src/web.rs

View workflow job for this annotation

GitHub Actions / Check wasm32

variable does not need to be mutable
) -> Result<WsSender> {
// Based on https://rustwasm.github.io/wasm-bindgen/examples/websockets.html

Expand All @@ -84,42 +91,52 @@ pub(crate) fn ws_connect_impl(
ws.set_binary_type(web_sys::BinaryType::Arraybuffer);

// Allow it to be shared by the different callbacks:
let on_event: std::rc::Rc<dyn Send + Fn(WsEvent) -> std::ops::ControlFlow<()>> =
on_event.into();
let on_event = std::rc::Rc::new(Mutex::new(on_event));

// onmessage callback
{
let on_event = on_event.clone();
let onmessage_callback = Closure::wrap(Box::new(move |e: web_sys::MessageEvent| {
let on_event = on_event.clone();
// Handle difference Text/Binary,...
if let Ok(abuf) = e.data().dyn_into::<js_sys::ArrayBuffer>() {
let array = js_sys::Uint8Array::new(&abuf);
on_event(WsEvent::Message(WsMessage::Binary(array.to_vec())));
spawn_local(async move {
on_event.lock().await(WsEvent::Message(WsMessage::Binary(array.to_vec())));
});
} else if let Ok(blob) = e.data().dyn_into::<web_sys::Blob>() {
// better alternative to juggling with FileReader is to use https://crates.io/crates/gloo-file
let file_reader = web_sys::FileReader::new().expect("Failed to create FileReader");
let file_reader_clone = file_reader.clone();
// create onLoadEnd callback
let on_event = on_event.clone();

let onloadend_cb = Closure::wrap(Box::new(move |_e: web_sys::ProgressEvent| {
let array = js_sys::Uint8Array::new(&file_reader_clone.result().unwrap());
on_event(WsEvent::Message(WsMessage::Binary(array.to_vec())));
let on_event = on_event.clone();
spawn_local(async move {
on_event.lock().await(WsEvent::Message(WsMessage::Binary(array.to_vec())));
});
})
as Box<dyn FnMut(web_sys::ProgressEvent)>);

file_reader.set_onloadend(Some(onloadend_cb.as_ref().unchecked_ref()));
file_reader
.read_as_array_buffer(&blob)
.expect("blob not readable");
onloadend_cb.forget();
} else if let Ok(txt) = e.data().dyn_into::<js_sys::JsString>() {
on_event(WsEvent::Message(WsMessage::Text(string_from_js_string(
txt,
))));
spawn_local(async move {
on_event.lock().await(WsEvent::Message(WsMessage::Text(
string_from_js_string(txt),
)));
});
} else {
log::debug!("Unknown websocket message received: {:?}", e.data());
on_event(WsEvent::Message(WsMessage::Unknown(string_from_js_value(
e.data(),
))));
spawn_local(async move {
on_event.lock().await(WsEvent::Message(WsMessage::Unknown(
string_from_js_value(e.data()),
)));
});
}
}) as Box<dyn FnMut(web_sys::MessageEvent)>);

Expand All @@ -131,32 +148,45 @@ pub(crate) fn ws_connect_impl(
}

{
// let on_event_cb = &on_event.clone();
let on_event = on_event.clone();
let onerror_callback = Closure::wrap(Box::new(move |error_event: web_sys::ErrorEvent| {
log::error!(
"error event: {}: {:?}",
error_event.message(),
error_event.error()
);
on_event(WsEvent::Error(error_event.message()));
let on_event = on_event.clone();
spawn_local(async move {
log::error!(
"error event: {}: {:?}",
error_event.message(),
error_event.error()
);
on_event.clone().lock().await(WsEvent::Error(error_event.message()));
});
}) as Box<dyn FnMut(web_sys::ErrorEvent)>);

ws.set_onerror(Some(onerror_callback.as_ref().unchecked_ref()));
onerror_callback.forget();
}

{
let on_event = on_event.clone();
let onopen_callback = Closure::wrap(Box::new(move |_| {
on_event(WsEvent::Opened);
let on_event = on_event.clone();
spawn_local(async move {
on_event.lock().await(WsEvent::Opened);
});
}) as Box<dyn FnMut(wasm_bindgen::JsValue)>);
ws.set_onopen(Some(onopen_callback.as_ref().unchecked_ref()));
onopen_callback.forget();
}

{
let on_event = on_event.clone();
let onclose_callback = Closure::wrap(Box::new(move |_| {
on_event(WsEvent::Closed);
let on_event = on_event.clone();
spawn_local(async move {
on_event.lock().await(WsEvent::Closed);
});
}) as Box<dyn FnMut(wasm_bindgen::JsValue)>);

ws.set_onclose(Some(onclose_callback.as_ref().unchecked_ref()));
onclose_callback.forget();
}
Expand Down
4 changes: 2 additions & 2 deletions example_app/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ impl FrontEnd {
}
}

fn ui(&mut self, ctx: &egui::Context) {
while let Some(event) = self.ws_receiver.try_recv() {
async fn ui(&mut self, ctx: &egui::Context) {
while let Some(event) = self.ws_receiver.try_recv().await {
self.events.push(event);
}

Expand Down

0 comments on commit 2d0fbad

Please sign in to comment.