Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

No-blocking operations for web based clients #30

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 17 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions ewebsock/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,11 @@ tokio = [


[dependencies]
async-mutex = "1.4.0"
document-features = "0.2"
futures-channel = "0.3.30"
log = "0.4"
futures-util = { version = "0.3.21", default-features = false, features = ["sink"] }

# native:
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
Expand All @@ -60,6 +63,7 @@ tokio-tungstenite = { version = ">=0.17, <=0.21", optional = true }
wasm-bindgen = "0.2"
js-sys = "0.3"
wasm-bindgen-futures = "0.4"
futures = "0.3"

[dependencies.web-sys]
version = "0.3"
Expand Down
27 changes: 19 additions & 8 deletions ewebsock/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
//! ## Feature flags
#![doc = document_features::document_features!()]
//!
//!

#![warn(missing_docs)] // let's keep ewebsock well-documented

#[cfg(not(target_arch = "wasm32"))]
#[cfg(not(feature = "tokio"))]
mod native_tungstenite;
Expand All @@ -41,6 +41,10 @@
#[cfg(target_arch = "wasm32")]
pub use web::*;

use async_mutex::Mutex;
use futures_channel::mpsc::{channel, Receiver, Sender};

Check failure on line 45 in ewebsock/src/lib.rs

View workflow job for this annotation

GitHub Actions / Rust format, cranky, check, test, doc

unused import: `Sender`

Check warning on line 45 in ewebsock/src/lib.rs

View workflow job for this annotation

GitHub Actions / Check wasm32

unused import: `Sender`
use futures_util::stream::StreamExt;

// ----------------------------------------------------------------------------

/// A web-socket message.
Expand Down Expand Up @@ -81,7 +85,7 @@

/// Receiver for incoming [`WsEvent`]s.
pub struct WsReceiver {
rx: std::sync::mpsc::Receiver<WsEvent>,
pub rx: Mutex<Receiver<WsEvent>>,

Check failure on line 88 in ewebsock/src/lib.rs

View workflow job for this annotation

GitHub Actions / Rust format, cranky, check, test, doc

missing documentation for a struct field

Check warning on line 88 in ewebsock/src/lib.rs

View workflow job for this annotation

GitHub Actions / Check wasm32

missing documentation for a struct field
}

impl WsReceiver {
Expand All @@ -94,22 +98,29 @@
///
/// This can be used to wake up the UI thread.
pub fn new_with_callback(wake_up: impl Fn() + Send + Sync + 'static) -> (Self, EventHandler) {
let (tx, rx) = std::sync::mpsc::channel();
let (mut tx, rx) = channel(0);

let on_event = Box::new(move |event| {
wake_up(); // wake up UI thread
if tx.send(event).is_ok() {
if tx.try_send(event).is_ok() {
std::ops::ControlFlow::Continue(())
} else {
std::ops::ControlFlow::Break(())
}
});
let ws_receiver = WsReceiver { rx };

let ws_receiver = WsReceiver { rx: Mutex::new(rx) };
(ws_receiver, on_event)
}

/// Try receiving a new event without blocking.
pub fn try_recv(&self) -> Option<WsEvent> {
self.rx.try_recv().ok()
pub async fn try_recv(&self) -> Option<WsEvent> {
self.rx.lock().await.try_next().ok().flatten()
}

/// Get next message
pub async fn next(&mut self) -> std::option::Option<WsEvent> {
self.rx.lock().await.next().await
}
}

Expand All @@ -119,7 +130,7 @@
/// Short for `Result<T, ewebsock::Error>`.
pub type Result<T> = std::result::Result<T, Error>;

pub(crate) type EventHandler = Box<dyn Send + Fn(WsEvent) -> std::ops::ControlFlow<()>>;
pub(crate) type EventHandler = Box<dyn Send + FnMut(WsEvent) -> std::ops::ControlFlow<()>>;

/// Options for a connection.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
Expand Down
20 changes: 14 additions & 6 deletions ewebsock/src/native_tungstenite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,15 @@ impl WsSender {
}
}

pub(crate) fn ws_receive_impl(url: String, options: Options, on_event: EventHandler) -> Result<()> {
pub(crate) fn ws_receive_impl(
url: String,
options: Options,
mut on_event: EventHandler,
) -> Result<()> {
std::thread::Builder::new()
.name("ewebsock".to_owned())
.spawn(move || {
if let Err(err) = ws_receiver_blocking(&url, options, &on_event) {
if let Err(err) = ws_receiver_blocking(&url, options, &mut on_event) {
on_event(WsEvent::Error(err));
} else {
log::debug!("WebSocket connection closed.");
Expand All @@ -68,7 +72,11 @@ pub(crate) fn ws_receive_impl(url: String, options: Options, on_event: EventHand
///
/// # Errors
/// All errors are returned to the caller, and NOT reported via `on_event`.
pub fn ws_receiver_blocking(url: &str, options: Options, on_event: &EventHandler) -> Result<()> {
pub fn ws_receiver_blocking(
url: &str,
options: Options,
on_event: &mut EventHandler,
) -> Result<()> {
let config = tungstenite::protocol::WebSocketConfig::from(options);
let max_redirects = 3; // tungstenite default

Expand Down Expand Up @@ -122,14 +130,14 @@ pub fn ws_receiver_blocking(url: &str, options: Options, on_event: &EventHandler
pub(crate) fn ws_connect_impl(
url: String,
options: Options,
on_event: EventHandler,
mut on_event: EventHandler,
) -> Result<WsSender> {
let (tx, rx) = std::sync::mpsc::channel();

std::thread::Builder::new()
.name("ewebsock".to_owned())
.spawn(move || {
if let Err(err) = ws_connect_blocking(&url, options, &on_event, &rx) {
if let Err(err) = ws_connect_blocking(&url, options, &mut on_event, &rx) {
on_event(WsEvent::Error(err));
} else {
log::debug!("WebSocket connection closed.");
Expand All @@ -149,7 +157,7 @@ pub(crate) fn ws_connect_impl(
pub fn ws_connect_blocking(
url: &str,
options: Options,
on_event: &EventHandler,
on_event: &mut EventHandler,
rx: &Receiver<WsMessage>,
) -> Result<()> {
let config = tungstenite::protocol::WebSocketConfig::from(options);
Expand Down
6 changes: 3 additions & 3 deletions ewebsock/src/native_tungstenite_tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
url: String,
options: Options,
outgoing_messages_stream: impl futures::Stream<Item = WsMessage>,
on_event: EventHandler,
mut on_event: EventHandler,
) {
use futures::StreamExt as _;

Expand Down Expand Up @@ -118,13 +118,13 @@
pub(crate) fn ws_connect_impl(
url: String,
options: Options,
on_event: EventHandler,
mut on_event: EventHandler,

Check failure on line 121 in ewebsock/src/native_tungstenite_tokio.rs

View workflow job for this annotation

GitHub Actions / Rust format, cranky, check, test, doc

variable does not need to be mutable
) -> Result<WsSender> {
Ok(ws_connect_native(url, options, on_event))
}

/// Like [`ws_connect`], but cannot fail. Only available on native builds.
fn ws_connect_native(url: String, options: Options, on_event: EventHandler) -> WsSender {
fn ws_connect_native(url: String, options: Options, mut on_event: EventHandler) -> WsSender {

Check failure on line 127 in ewebsock/src/native_tungstenite_tokio.rs

View workflow job for this annotation

GitHub Actions / Rust format, cranky, check, test, doc

variable does not need to be mutable
let (tx, mut rx) = tokio::sync::mpsc::channel(1000);

let outgoing_messages_stream = async_stream::stream! {
Expand Down
72 changes: 51 additions & 21 deletions ewebsock/src/web.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use crate::{EventHandler, Options, Result, WsEvent, WsMessage};

use async_mutex::Mutex;
use wasm_bindgen_futures::spawn_local;

#[allow(clippy::needless_pass_by_value)]
fn string_from_js_value(s: wasm_bindgen::JsValue) -> String {
s.as_string().unwrap_or(format!("{:#?}", s))
Expand Down Expand Up @@ -63,14 +66,18 @@
}
}

pub(crate) fn ws_receive_impl(url: String, options: Options, on_event: EventHandler) -> Result<()> {
pub(crate) fn ws_receive_impl(
url: String,
options: Options,
mut on_event: EventHandler,

Check warning on line 72 in ewebsock/src/web.rs

View workflow job for this annotation

GitHub Actions / Check wasm32

variable does not need to be mutable
) -> Result<()> {
ws_connect_impl(url, options, on_event).map(|sender| sender.forget())
}

pub(crate) fn ws_connect_impl(
url: String,
_ignored_options: Options,
on_event: EventHandler,
mut on_event: EventHandler,

Check warning on line 80 in ewebsock/src/web.rs

View workflow job for this annotation

GitHub Actions / Check wasm32

variable does not need to be mutable
) -> Result<WsSender> {
// Based on https://rustwasm.github.io/wasm-bindgen/examples/websockets.html

Expand All @@ -84,42 +91,52 @@
ws.set_binary_type(web_sys::BinaryType::Arraybuffer);

// Allow it to be shared by the different callbacks:
let on_event: std::rc::Rc<dyn Send + Fn(WsEvent) -> std::ops::ControlFlow<()>> =
on_event.into();
let on_event = std::rc::Rc::new(Mutex::new(on_event));

// onmessage callback
{
let on_event = on_event.clone();
let onmessage_callback = Closure::wrap(Box::new(move |e: web_sys::MessageEvent| {
let on_event = on_event.clone();
// Handle difference Text/Binary,...
if let Ok(abuf) = e.data().dyn_into::<js_sys::ArrayBuffer>() {
let array = js_sys::Uint8Array::new(&abuf);
on_event(WsEvent::Message(WsMessage::Binary(array.to_vec())));
spawn_local(async move {
on_event.lock().await(WsEvent::Message(WsMessage::Binary(array.to_vec())));
});
} else if let Ok(blob) = e.data().dyn_into::<web_sys::Blob>() {
// better alternative to juggling with FileReader is to use https://crates.io/crates/gloo-file
let file_reader = web_sys::FileReader::new().expect("Failed to create FileReader");
let file_reader_clone = file_reader.clone();
// create onLoadEnd callback
let on_event = on_event.clone();

let onloadend_cb = Closure::wrap(Box::new(move |_e: web_sys::ProgressEvent| {
let array = js_sys::Uint8Array::new(&file_reader_clone.result().unwrap());
on_event(WsEvent::Message(WsMessage::Binary(array.to_vec())));
let on_event = on_event.clone();
spawn_local(async move {
on_event.lock().await(WsEvent::Message(WsMessage::Binary(array.to_vec())));
});
})
as Box<dyn FnMut(web_sys::ProgressEvent)>);

file_reader.set_onloadend(Some(onloadend_cb.as_ref().unchecked_ref()));
file_reader
.read_as_array_buffer(&blob)
.expect("blob not readable");
onloadend_cb.forget();
} else if let Ok(txt) = e.data().dyn_into::<js_sys::JsString>() {
on_event(WsEvent::Message(WsMessage::Text(string_from_js_string(
txt,
))));
spawn_local(async move {
on_event.lock().await(WsEvent::Message(WsMessage::Text(
string_from_js_string(txt),
)));
});
} else {
log::debug!("Unknown websocket message received: {:?}", e.data());
on_event(WsEvent::Message(WsMessage::Unknown(string_from_js_value(
e.data(),
))));
spawn_local(async move {
on_event.lock().await(WsEvent::Message(WsMessage::Unknown(
string_from_js_value(e.data()),
)));
});
}
}) as Box<dyn FnMut(web_sys::MessageEvent)>);

Expand All @@ -131,32 +148,45 @@
}

{
// let on_event_cb = &on_event.clone();
let on_event = on_event.clone();
let onerror_callback = Closure::wrap(Box::new(move |error_event: web_sys::ErrorEvent| {
log::error!(
"error event: {}: {:?}",
error_event.message(),
error_event.error()
);
on_event(WsEvent::Error(error_event.message()));
let on_event = on_event.clone();
spawn_local(async move {
log::error!(
"error event: {}: {:?}",
error_event.message(),
error_event.error()
);
on_event.clone().lock().await(WsEvent::Error(error_event.message()));
});
}) as Box<dyn FnMut(web_sys::ErrorEvent)>);

ws.set_onerror(Some(onerror_callback.as_ref().unchecked_ref()));
onerror_callback.forget();
}

{
let on_event = on_event.clone();
let onopen_callback = Closure::wrap(Box::new(move |_| {
on_event(WsEvent::Opened);
let on_event = on_event.clone();
spawn_local(async move {
on_event.lock().await(WsEvent::Opened);
});
}) as Box<dyn FnMut(wasm_bindgen::JsValue)>);
ws.set_onopen(Some(onopen_callback.as_ref().unchecked_ref()));
onopen_callback.forget();
}

{
let on_event = on_event.clone();
let onclose_callback = Closure::wrap(Box::new(move |_| {
on_event(WsEvent::Closed);
let on_event = on_event.clone();
spawn_local(async move {
on_event.lock().await(WsEvent::Closed);
});
}) as Box<dyn FnMut(wasm_bindgen::JsValue)>);

ws.set_onclose(Some(onclose_callback.as_ref().unchecked_ref()));
onclose_callback.forget();
}
Expand Down
4 changes: 2 additions & 2 deletions example_app/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
}

if let Some(frontend) = &mut self.frontend {
frontend.ui(ctx);

Check warning on line 56 in example_app/src/app.rs

View workflow job for this annotation

GitHub Actions / Check wasm32

unused implementer of `Future` that must be used
}
}
}
Expand Down Expand Up @@ -93,8 +93,8 @@
}
}

fn ui(&mut self, ctx: &egui::Context) {
while let Some(event) = self.ws_receiver.try_recv() {
async fn ui(&mut self, ctx: &egui::Context) {
while let Some(event) = self.ws_receiver.try_recv().await {
self.events.push(event);
}

Expand Down
Loading