Skip to content

Commit

Permalink
chore: move to tokio-tungstenite
Browse files Browse the repository at this point in the history
  • Loading branch information
qdot committed Nov 21, 2023
1 parent 95eb440 commit 61b6d56
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 16 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ serde = "1.0.192"
serde_json = "1.0.108"
# sentry = { version = "0.27.0", optional = true }
# sentry-tracing = { version = "0.27.0", optional = true }
async-tungstenite = { version = "0.23.0", features = ["tokio-runtime"] }
thiserror = "1.0.50"
getset = "0.1.2"
async-trait = "0.1.74"
Expand All @@ -54,6 +53,7 @@ console-subscriber = { version="0.2.0", optional = true }
mdns-sd = "0.8.1"
local-ip-address = "0.5.6"
rand = "0.8.5"
tokio-tungstenite = "0.20.1"

[build-dependencies]
vergen = "7.5.1"
Expand Down
29 changes: 14 additions & 15 deletions src/frontend/websocket_frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::error::IntifaceError;

use async_trait::async_trait;
use futures::FutureExt;
use futures::{AsyncRead, AsyncWrite, SinkExt, StreamExt};
use futures::{SinkExt, StreamExt};
use std::{
sync::{
atomic::{AtomicBool, Ordering},
Expand All @@ -16,20 +16,19 @@ use std::{
};
use tokio::{
self,
net::TcpListener,
net::{TcpListener, TcpStream},
select,
sync::{broadcast, mpsc, Notify, OnceCell},
};
use tokio_util::sync::CancellationToken;

async fn run_connection_loop<S>(
ws_stream: async_tungstenite::WebSocketStream<S>,
async fn run_connection_loop(
ws_stream: tokio_tungstenite::WebSocketStream<TcpStream>,
mut request_receiver: mpsc::Receiver<EngineMessage>,
response_sender: broadcast::Sender<IntifaceMessage>,
disconnect_notifier: Arc<Notify>,
cancellation_token: Arc<CancellationToken>,
) where
S: AsyncRead + AsyncWrite + Unpin,
)
{
info!("Starting websocket server connection event loop.");

Expand All @@ -54,7 +53,7 @@ async fn run_connection_loop<S>(
}
pong_count = 0;
if websocket_server_sender
.send(async_tungstenite::tungstenite::Message::Ping(vec!(0)))
.send(tokio_tungstenite::tungstenite::Message::Ping(vec!(0)))
.await
.is_err() {
warn!("Cannot send ping to client, considering connection closed.");
Expand All @@ -64,7 +63,7 @@ async fn run_connection_loop<S>(
serialized_msg = request_receiver.recv() => {
if let Some(serialized_msg) = serialized_msg {
if websocket_server_sender
.send(async_tungstenite::tungstenite::Message::Text(serde_json::to_string(&serialized_msg).unwrap()))
.send(tokio_tungstenite::tungstenite::Message::Text(serde_json::to_string(&serialized_msg).unwrap()))
.await
.is_err() {
warn!("Cannot send text value to server, considering connection closed.");
Expand All @@ -84,14 +83,14 @@ async fn run_connection_loop<S>(
match ws_data {
Ok(msg) => {
match msg {
async_tungstenite::tungstenite::Message::Text(text_msg) => {
tokio_tungstenite::tungstenite::Message::Text(text_msg) => {
trace!("Got text: {}", text_msg);
if response_sender.receiver_count() == 0 || response_sender.send(serde_json::from_str(&text_msg).unwrap()).is_err() {
warn!("Connector that owns transport no longer available, exiting.");
break;
}
}
async_tungstenite::tungstenite::Message::Close(_) => {
tokio_tungstenite::tungstenite::Message::Close(_) => {
info!("Closing websocket");
cancellation_token.cancel();
if websocket_server_sender.close().await.is_err() {
Expand All @@ -101,20 +100,20 @@ async fn run_connection_loop<S>(
//let _ = response_sender.send(ButtplugTransportIncomingMessage::Close("Websocket server closed".to_owned())).await;
break;
}
async_tungstenite::tungstenite::Message::Ping(_) => {
tokio_tungstenite::tungstenite::Message::Ping(_) => {
// noop
continue;
}
async_tungstenite::tungstenite::Message::Frame(_) => {
tokio_tungstenite::tungstenite::Message::Frame(_) => {
// noop
continue;
}
async_tungstenite::tungstenite::Message::Pong(_) => {
tokio_tungstenite::tungstenite::Message::Pong(_) => {
// noop
pong_count += 1;
continue;
}
async_tungstenite::tungstenite::Message::Binary(_) => {
tokio_tungstenite::tungstenite::Message::Binary(_) => {
error!("Don't know how to handle binary message types!");
}
}
Expand Down Expand Up @@ -203,7 +202,7 @@ impl Frontend for WebsocketFrontend {
debug!("Websocket: Listening on: {}", addr);
if let Ok((stream, _)) = listener.accept().await {
info!("Websocket: Got connection");
let ws_fut = async_tungstenite::tokio::accept_async(stream);
let ws_fut = tokio_tungstenite::accept_async(stream);
let ws_stream = ws_fut.await.map_err(|err| {
error!("Websocket server accept error: {:?}", err);
IntifaceError::new(&format!("Websocket server accept error: {:?}", err))
Expand Down

0 comments on commit 61b6d56

Please sign in to comment.