Skip to content

Commit

Permalink
Merge pull request #355 from johanhelsing/fix-bevy-matchbox-signaling…
Browse files Browse the repository at this point in the history
…-take-2

Fix bevy matchbox signaling panic (take 2)
  • Loading branch information
johanhelsing authored Oct 31, 2023
2 parents fe95d91 + 6703d38 commit 6791f92
Show file tree
Hide file tree
Showing 9 changed files with 117 additions and 90 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion bevy_matchbox/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ readme = "../README.md"

[features]
ggrs = ["matchbox_socket/ggrs"]
signaling = ["matchbox_signaling"]
signaling = ["dep:matchbox_signaling", "dep:async-compat"]

[dependencies]
bevy = { version = "0.11", default-features = false }
Expand All @@ -30,3 +30,4 @@ cfg-if = "1.0"

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
matchbox_signaling = { version = "0.7", path = "../matchbox_signaling", optional = true }
async-compat = { version = "0.2", optional = true }
9 changes: 3 additions & 6 deletions bevy_matchbox/src/signaling.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::net::SocketAddr;

use async_compat::CompatExt;
use bevy::{
ecs::system::Command,
prelude::{Commands, Resource},
Expand All @@ -14,6 +13,7 @@ use matchbox_signaling::{
},
Error, SignalingCallbacks, SignalingServer, SignalingServerBuilder, SignalingState,
};
use std::net::SocketAddr;

/// A [`SignalingServer`] as a [`Resource`].
///
Expand Down Expand Up @@ -80,7 +80,7 @@ where
impl From<SignalingServer> for MatchboxServer {
fn from(server: SignalingServer) -> Self {
let task_pool = IoTaskPool::get();
let task = task_pool.spawn(server.serve());
let task = task_pool.spawn(server.serve().compat());
MatchboxServer(task)
}
}
Expand Down Expand Up @@ -179,7 +179,6 @@ mod tests {
}

#[test]
#[ignore]
// https://github.com/johanhelsing/matchbox/issues/350
fn start_signaling_without_panics() {
let mut app = App::new();
Expand All @@ -188,7 +187,5 @@ mod tests {
.add_systems(Startup, start_signaling);

app.update();

assert_eq!(0, 1);
}
}
36 changes: 18 additions & 18 deletions matchbox_server/src/topology.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,8 @@ mod tests {

#[tokio::test]
async fn ws_connect() {
let server = app();
let addr = server.local_addr();
let mut server = app();
let addr = server.bind().unwrap();
tokio::spawn(server.serve());

tokio_tungstenite::connect_async(format!("ws://{addr}/room_a"))
Expand All @@ -185,8 +185,8 @@ mod tests {

#[tokio::test]
async fn uuid_assigned() {
let server = app();
let addr = server.local_addr();
let mut server = app();
let addr = server.bind().unwrap();
tokio::spawn(server.serve());

let (mut client, _response) =
Expand All @@ -201,8 +201,8 @@ mod tests {

#[tokio::test]
async fn new_peer() {
let server = app();
let addr = server.local_addr();
let mut server = app();
let addr = server.bind().unwrap();
tokio::spawn(server.serve());

let (mut client_a, _response) =
Expand All @@ -226,8 +226,8 @@ mod tests {

#[tokio::test]
async fn disconnect_peer() {
let server = app();
let addr = server.local_addr();
let mut server = app();
let addr = server.bind().unwrap();
tokio::spawn(server.serve());

let (mut client_a, _response) =
Expand Down Expand Up @@ -257,8 +257,8 @@ mod tests {

#[tokio::test]
async fn signal() {
let server = app();
let addr = server.local_addr();
let mut server = app();
let addr = server.bind().unwrap();
tokio::spawn(server.serve());

let (mut client_a, _response) =
Expand Down Expand Up @@ -299,8 +299,8 @@ mod tests {

#[tokio::test]
async fn match_pairs() {
let server = app();
let addr = server.local_addr();
let mut server = app();
let addr = server.bind().unwrap();
tokio::spawn(server.serve());

let (mut client_a, _response) =
Expand Down Expand Up @@ -350,8 +350,8 @@ mod tests {
}
#[tokio::test]
async fn match_pair_and_other_alone_room_without_next() {
let server = app();
let addr = server.local_addr();
let mut server = app();
let addr = server.bind().unwrap();
tokio::spawn(server.serve());

let (mut client_a, _response) =
Expand Down Expand Up @@ -392,8 +392,8 @@ mod tests {

#[tokio::test]
async fn match_different_id_same_next() {
let server = app();
let addr = server.local_addr();
let mut server = app();
let addr = server.bind().unwrap();
tokio::spawn(server.serve());

let (mut client_a, _response) =
Expand Down Expand Up @@ -443,8 +443,8 @@ mod tests {
}
#[tokio::test]
async fn match_same_id_different_next() {
let server = app();
let addr = server.local_addr();
let mut server = app();
let addr = server.bind().unwrap();
tokio::spawn(server.serve());

let (mut client_a, _response) =
Expand Down
4 changes: 4 additions & 0 deletions matchbox_signaling/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,8 @@ pub enum Error {
/// An error occurring from hyper
#[error("Hyper error: {0}")]
Hyper(#[from] hyper::Error),

/// Couldn't bind to socket
#[error("Bind error: {0}")]
Bind(hyper::Error),
}
19 changes: 9 additions & 10 deletions matchbox_signaling/src/signaling_server/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,6 @@ where
}

/// Create a [`SignalingServer`].
///
/// # Panics
/// This method will panic if the socket address requested cannot be bound.
pub fn build(mut self) -> SignalingServer {
// Insert topology
let state_machine: SignalingStateMachine<Cb, S> =
Expand All @@ -130,14 +127,16 @@ where
.layer(Extension(self.shared_callbacks))
.layer(Extension(self.callbacks))
.layer(Extension(self.state));
let server = axum::Server::bind(&self.socket_addr).serve(
self.router
.into_make_service_with_connect_info::<SocketAddr>(),
);
let socket_addr = server.local_addr();

let info = self
.router
.into_make_service_with_connect_info::<SocketAddr>();

let socket_addr = self.socket_addr;
SignalingServer {
server,
socket_addr,
info,
requested_addr: socket_addr,
server: None,
}
}
}
47 changes: 36 additions & 11 deletions matchbox_signaling/src/signaling_server/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,22 @@ use crate::{
full_mesh::{FullMesh, FullMeshCallbacks, FullMeshState},
},
};
use axum::{extract::connect_info::IntoMakeServiceWithConnectInfo, Router, Server};
use hyper::server::conn::AddrIncoming;
use axum::{extract::connect_info::IntoMakeServiceWithConnectInfo, Router};
use hyper::{server::conn::AddrIncoming, Server};
use std::net::SocketAddr;

/// Contains the interface end of a signaling server
#[derive(Debug)]
pub struct SignalingServer {
/// The socket address bound for this server
pub(crate) socket_addr: SocketAddr,
/// The socket configured for this server
pub(crate) requested_addr: SocketAddr,

/// The low-level axum server
pub(crate) server: Server<AddrIncoming, IntoMakeServiceWithConnectInfo<Router, SocketAddr>>,
/// Low-level info for how to build an axum server
pub(crate) info: IntoMakeServiceWithConnectInfo<Router, SocketAddr>,

/// Low-level info for how to build an axum server
pub(crate) server:
Option<Server<AddrIncoming, IntoMakeServiceWithConnectInfo<Router, SocketAddr>>>,
}

/// Common methods
Expand All @@ -36,14 +40,35 @@ impl SignalingServer {
}

/// Returns the local address this server is bound to
pub fn local_addr(&self) -> SocketAddr {
self.socket_addr
///
/// The server needs to [`bind`] first
pub fn local_addr(&self) -> Option<SocketAddr> {
self.server.as_ref().map(|s| s.local_addr())
}

/// Binds the server to a socket
///
/// Optional: Will happen automatically on [`serve`]
pub fn bind(&mut self) -> Result<SocketAddr, crate::Error> {
let server = axum::Server::try_bind(&self.requested_addr)
.map_err(crate::Error::Bind)?
.serve(self.info.clone());

let addr = server.local_addr();
self.server = Some(server);
Ok(addr)
}

/// Serve the signaling server
pub async fn serve(self) -> Result<(), crate::Error> {
// TODO: Shouldn't this return Result<!, crate::Error>?
match self.server.await {
///
/// Will bind if not already bound
pub async fn serve(mut self) -> Result<(), crate::Error> {
if self.server.is_none() {
self.bind()?;
assert!(self.server.is_some());
}

match self.server.expect("no server, this is a bug").await {
Ok(()) => Ok(()),
Err(e) => Err(crate::Error::from(e)),
}
Expand Down
Loading

0 comments on commit 6791f92

Please sign in to comment.