From 61b6d565bf2a075014cf4d930f2cfdc019d136ed Mon Sep 17 00:00:00 2001 From: Kyle Machulis Date: Mon, 20 Nov 2023 18:06:03 -0800 Subject: [PATCH] chore: move to tokio-tungstenite --- Cargo.toml | 2 +- src/frontend/websocket_frontend.rs | 29 ++++++++++++++--------------- 2 files changed, 15 insertions(+), 16 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index fcdd9d2..19c89f5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,7 +44,6 @@ serde = "1.0.192" serde_json = "1.0.108" # sentry = { version = "0.27.0", optional = true } # sentry-tracing = { version = "0.27.0", optional = true } -async-tungstenite = { version = "0.23.0", features = ["tokio-runtime"] } thiserror = "1.0.50" getset = "0.1.2" async-trait = "0.1.74" @@ -54,6 +53,7 @@ console-subscriber = { version="0.2.0", optional = true } mdns-sd = "0.8.1" local-ip-address = "0.5.6" rand = "0.8.5" +tokio-tungstenite = "0.20.1" [build-dependencies] vergen = "7.5.1" diff --git a/src/frontend/websocket_frontend.rs b/src/frontend/websocket_frontend.rs index c226d3d..2423ae0 100644 --- a/src/frontend/websocket_frontend.rs +++ b/src/frontend/websocket_frontend.rs @@ -6,7 +6,7 @@ use crate::error::IntifaceError; use async_trait::async_trait; use futures::FutureExt; -use futures::{AsyncRead, AsyncWrite, SinkExt, StreamExt}; +use futures::{SinkExt, StreamExt}; use std::{ sync::{ atomic::{AtomicBool, Ordering}, @@ -16,20 +16,19 @@ use std::{ }; use tokio::{ self, - net::TcpListener, + net::{TcpListener, TcpStream}, select, sync::{broadcast, mpsc, Notify, OnceCell}, }; use tokio_util::sync::CancellationToken; -async fn run_connection_loop( - ws_stream: async_tungstenite::WebSocketStream, +async fn run_connection_loop( + ws_stream: tokio_tungstenite::WebSocketStream, mut request_receiver: mpsc::Receiver, response_sender: broadcast::Sender, disconnect_notifier: Arc, cancellation_token: Arc, -) where - S: AsyncRead + AsyncWrite + Unpin, +) { info!("Starting websocket server connection event loop."); @@ -54,7 +53,7 @@ async fn run_connection_loop( } pong_count = 0; if websocket_server_sender - .send(async_tungstenite::tungstenite::Message::Ping(vec!(0))) + .send(tokio_tungstenite::tungstenite::Message::Ping(vec!(0))) .await .is_err() { warn!("Cannot send ping to client, considering connection closed."); @@ -64,7 +63,7 @@ async fn run_connection_loop( serialized_msg = request_receiver.recv() => { if let Some(serialized_msg) = serialized_msg { if websocket_server_sender - .send(async_tungstenite::tungstenite::Message::Text(serde_json::to_string(&serialized_msg).unwrap())) + .send(tokio_tungstenite::tungstenite::Message::Text(serde_json::to_string(&serialized_msg).unwrap())) .await .is_err() { warn!("Cannot send text value to server, considering connection closed."); @@ -84,14 +83,14 @@ async fn run_connection_loop( match ws_data { Ok(msg) => { match msg { - async_tungstenite::tungstenite::Message::Text(text_msg) => { + tokio_tungstenite::tungstenite::Message::Text(text_msg) => { trace!("Got text: {}", text_msg); if response_sender.receiver_count() == 0 || response_sender.send(serde_json::from_str(&text_msg).unwrap()).is_err() { warn!("Connector that owns transport no longer available, exiting."); break; } } - async_tungstenite::tungstenite::Message::Close(_) => { + tokio_tungstenite::tungstenite::Message::Close(_) => { info!("Closing websocket"); cancellation_token.cancel(); if websocket_server_sender.close().await.is_err() { @@ -101,20 +100,20 @@ async fn run_connection_loop( //let _ = response_sender.send(ButtplugTransportIncomingMessage::Close("Websocket server closed".to_owned())).await; break; } - async_tungstenite::tungstenite::Message::Ping(_) => { + tokio_tungstenite::tungstenite::Message::Ping(_) => { // noop continue; } - async_tungstenite::tungstenite::Message::Frame(_) => { + tokio_tungstenite::tungstenite::Message::Frame(_) => { // noop continue; } - async_tungstenite::tungstenite::Message::Pong(_) => { + tokio_tungstenite::tungstenite::Message::Pong(_) => { // noop pong_count += 1; continue; } - async_tungstenite::tungstenite::Message::Binary(_) => { + tokio_tungstenite::tungstenite::Message::Binary(_) => { error!("Don't know how to handle binary message types!"); } } @@ -203,7 +202,7 @@ impl Frontend for WebsocketFrontend { debug!("Websocket: Listening on: {}", addr); if let Ok((stream, _)) = listener.accept().await { info!("Websocket: Got connection"); - let ws_fut = async_tungstenite::tokio::accept_async(stream); + let ws_fut = tokio_tungstenite::accept_async(stream); let ws_stream = ws_fut.await.map_err(|err| { error!("Websocket server accept error: {:?}", err); IntifaceError::new(&format!("Websocket server accept error: {:?}", err))