From 8e864f121d616fdd4307819e058133ff959f7e3f Mon Sep 17 00:00:00 2001 From: Garry O'Donnell Date: Fri, 23 Jun 2023 14:32:28 +0100 Subject: [PATCH] Add signalling utils to `bevy_matchbox` (#215) * Move socket extensions to own module * Tidy socket docs * Make matchbox_signaling topologies public * Add bevy_matchbox signaling feature * Move task handles to components / resources * Feature gate bevy signaling server --- Cargo.lock | 2 + bevy_matchbox/Cargo.toml | 5 + bevy_matchbox/src/lib.rs | 178 ++---------------- bevy_matchbox/src/signaling.rs | 161 ++++++++++++++++ bevy_matchbox/src/socket.rs | 176 +++++++++++++++++ matchbox_signaling/src/lib.rs | 3 +- .../src/topologies/client_server.rs | 2 + .../src/topologies/full_mesh.rs | 1 + matchbox_signaling/src/topologies/mod.rs | 9 +- 9 files changed, 368 insertions(+), 169 deletions(-) create mode 100644 bevy_matchbox/src/signaling.rs create mode 100644 bevy_matchbox/src/socket.rs diff --git a/Cargo.lock b/Cargo.lock index 20f194fb..635ea744 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -982,6 +982,8 @@ name = "bevy_matchbox" version = "0.6.0" dependencies = [ "bevy", + "cfg-if", + "matchbox_signaling", "matchbox_socket", ] diff --git a/bevy_matchbox/Cargo.toml b/bevy_matchbox/Cargo.toml index 6fc10f37..ee4ebbdd 100644 --- a/bevy_matchbox/Cargo.toml +++ b/bevy_matchbox/Cargo.toml @@ -21,7 +21,12 @@ readme = "../README.md" [features] ggrs = ["matchbox_socket/ggrs"] +signaling = ["matchbox_signaling"] [dependencies] bevy = { version = "0.10", default-features = false } matchbox_socket = { path = "../matchbox_socket", version = "0.6" } +cfg-if = "1.0" + +[target.'cfg(not(target_arch = "wasm32"))'.dependencies] +matchbox_signaling = { path = "../matchbox_signaling", version = "0.6", optional = true } diff --git a/bevy_matchbox/src/lib.rs b/bevy_matchbox/src/lib.rs index ee65ef52..a12dc796 100644 --- a/bevy_matchbox/src/lib.rs +++ b/bevy_matchbox/src/lib.rs @@ -2,183 +2,31 @@ #![doc = include_str!("../README.md")] #![forbid(unsafe_code)] -use bevy::{ - ecs::system::Command, - prelude::{Commands, Component, Deref, DerefMut, Resource, World}, - tasks::IoTaskPool, -}; -pub use matchbox_socket; -use matchbox_socket::{ - BuildablePlurality, MessageLoopFuture, SingleChannel, WebRtcSocket, WebRtcSocketBuilder, -}; -use std::marker::PhantomData; +use cfg_if::cfg_if; -/// A [`WebRtcSocket`] as a [`Component`] or [`Resource`]. -/// -/// As a [`Component`], directly -/// ``` -/// use bevy_matchbox::prelude::*; -/// use bevy::prelude::*; -/// -/// fn open_socket_system(mut commands: Commands) { -/// let room_url = "wss://matchbox.example.com"; -/// let builder = WebRtcSocketBuilder::new(room_url).add_channel(ChannelConfig::reliable()); -/// commands.spawn(MatchboxSocket::from(builder)); -/// } -/// -/// fn close_socket_system( -/// mut commands: Commands, -/// socket: Query>> -/// ) { -/// let socket = socket.single(); -/// commands.entity(socket).despawn(); -/// } -/// ``` -/// -/// As a [`Resource`], with [`Commands`] -/// ``` -/// # use bevy_matchbox::prelude::*; -/// # use bevy::prelude::*; -/// fn open_socket_system(mut commands: Commands) { -/// let room_url = "wss://matchbox.example.com"; -/// commands.open_socket(WebRtcSocketBuilder::new(room_url).add_channel(ChannelConfig::reliable())); -/// } -/// -/// fn close_socket_system(mut commands: Commands) { -/// commands.close_socket::(); -/// } -/// ``` -/// -/// As a [`Resource`], directly -/// -/// ``` -/// # use bevy_matchbox::prelude::*; -/// # use bevy::prelude::*; -/// fn open_socket_system(mut commands: Commands) { -/// let room_url = "wss://matchbox.example.com"; -/// -/// let socket: MatchboxSocket = MatchboxSocket::new_reliable(room_url); -/// -/// commands.insert_resource(socket); -/// } -/// -/// fn close_socket_system(mut commands: Commands) { -/// commands.remove_resource::>(); -/// } -/// ``` -#[derive(Resource, Component, Debug, Deref, DerefMut)] -pub struct MatchboxSocket(WebRtcSocket); +mod socket; +pub use socket::*; -impl From> for MatchboxSocket { - fn from(builder: WebRtcSocketBuilder) -> Self { - Self::from(builder.build()) - } -} - -impl From<(WebRtcSocket, MessageLoopFuture)> for MatchboxSocket { - fn from((socket, message_loop_fut): (WebRtcSocket, MessageLoopFuture)) -> Self { - let task_pool = IoTaskPool::get(); - task_pool.spawn(message_loop_fut).detach(); - MatchboxSocket(socket) - } -} - -/// A [`Command`] used to open a [`MatchboxSocket`] and allocate it as a resource. -struct OpenSocket(WebRtcSocketBuilder); - -impl Command for OpenSocket { - fn write(self, world: &mut World) { - world.insert_resource(MatchboxSocket::from(self.0)); - } -} - -/// A [`Commands`] extension used to open a [`MatchboxSocket`] and allocate it as a resource. -pub trait OpenSocketExt { - /// Opens a [`MatchboxSocket`] and allocates it as a resource. - fn open_socket(&mut self, socket_builder: WebRtcSocketBuilder); -} - -impl<'w, 's, C: BuildablePlurality + 'static> OpenSocketExt for Commands<'w, 's> { - fn open_socket(&mut self, socket_builder: WebRtcSocketBuilder) { - self.add(OpenSocket(socket_builder)) - } -} - -/// A [`Command`] used to close a [`WebRtcSocket`], deleting the [`MatchboxSocket`] resource. -struct CloseSocket(PhantomData); - -impl Command for CloseSocket { - fn write(self, world: &mut World) { - world.remove_resource::>(); - } -} - -/// A [`Commands`] extension used to close a [`WebRtcSocket`], deleting the [`MatchboxSocket`] -/// resource. -pub trait CloseSocketExt { - /// Delete the [`MatchboxSocket`] resource. - fn close_socket(&mut self); -} - -impl<'w, 's> CloseSocketExt for Commands<'w, 's> { - fn close_socket(&mut self) { - self.add(CloseSocket::(PhantomData::default())) +cfg_if! { + if #[cfg(all(not(target_arch = "wasm32"), feature = "signaling"))] { + mod signaling; + pub use signaling::*; } } /// use `bevy_matchbox::prelude::*;` to import common resources and commands pub mod prelude { pub use crate::{CloseSocketExt, MatchboxSocket, OpenSocketExt}; + use cfg_if::cfg_if; pub use matchbox_socket::{ BuildablePlurality, ChannelConfig, MultipleChannels, PeerId, PeerState, SingleChannel, WebRtcSocketBuilder, }; -} - -impl MatchboxSocket { - /// Create a new socket with a single unreliable channel - /// - /// ```rust - /// # use bevy_matchbox::prelude::*; - /// # use bevy::prelude::*; - /// # fn open_channel_system(mut commands: Commands) { - /// let room_url = "wss://matchbox.example.com"; - /// let socket = MatchboxSocket::new_unreliable(room_url); - /// commands.spawn(socket); - /// # } - /// ``` - pub fn new_unreliable(room_url: impl Into) -> MatchboxSocket { - Self::from(WebRtcSocket::new_unreliable(room_url)) - } - - /// Create a new socket with a single reliable channel - /// - /// ```rust - /// # use bevy_matchbox::prelude::*; - /// # use bevy::prelude::*; - /// # fn open_channel_system(mut commands: Commands) { - /// let room_url = "wss://matchbox.example.com"; - /// let socket = MatchboxSocket::new_reliable(room_url); - /// commands.spawn(socket); - /// # } - /// ``` - pub fn new_reliable(room_url: impl Into) -> MatchboxSocket { - Self::from(WebRtcSocket::new_reliable(room_url)) - } - /// Create a new socket with a single ggrs-compatible channel - /// - /// ```rust - /// # use bevy_matchbox::prelude::*; - /// # use bevy::prelude::*; - /// # fn open_channel_system(mut commands: Commands) { - /// let room_url = "wss://matchbox.example.com"; - /// let socket = MatchboxSocket::new_ggrs(room_url); - /// commands.spawn(socket); - /// # } - /// ``` - #[cfg(feature = "ggrs")] - pub fn new_ggrs(room_url: impl Into) -> MatchboxSocket { - Self::from(WebRtcSocket::new_ggrs(room_url)) + cfg_if! { + if #[cfg(all(not(target_arch = "wasm32"), feature = "signaling"))] { + pub use crate::signaling::{MatchboxServer, StartServerExt, StopServerExt}; + pub use matchbox_signaling::SignalingServerBuilder; + } } } diff --git a/bevy_matchbox/src/signaling.rs b/bevy_matchbox/src/signaling.rs new file mode 100644 index 00000000..dd9e1684 --- /dev/null +++ b/bevy_matchbox/src/signaling.rs @@ -0,0 +1,161 @@ +use std::net::SocketAddr; + +use bevy::{ + ecs::system::Command, + prelude::{Commands, Resource}, + tasks::{IoTaskPool, Task}, +}; +pub use matchbox_signaling; +use matchbox_signaling::{ + topologies::{ + client_server::{ClientServer, ClientServerCallbacks, ClientServerState}, + full_mesh::{FullMesh, FullMeshCallbacks, FullMeshState}, + SignalingTopology, + }, + Error, SignalingCallbacks, SignalingServer, SignalingServerBuilder, SignalingState, +}; + +/// A [`SignalingServer`] as a [`Resource`]. +/// +/// As a [`Resource`], with [`Commands`] +/// ``` +/// use std::net::Ipv4Addr; +/// use bevy_matchbox::{ +/// prelude::*, +/// matchbox_signaling::topologies::full_mesh::{FullMesh, FullMeshState} +/// }; +/// use bevy::prelude::*; +/// +/// fn start_server_system(mut commands: Commands) { +/// let builder = SignalingServerBuilder::new( +/// (Ipv4Addr::UNSPECIFIED, 3536), +/// FullMesh, +/// FullMeshState::default(), +/// ); +/// commands.start_server(builder); +/// } +/// +/// fn stop_server_system(mut commands: Commands) { +/// commands.stop_server(); +/// } +/// ``` +/// +/// As a [`Resource`], directly +/// ``` +/// use std::net::Ipv4Addr; +/// use bevy_matchbox::{ +/// prelude::*, +/// matchbox_signaling::topologies::full_mesh::{FullMesh, FullMeshState} +/// }; +/// use bevy::prelude::*; +/// +/// fn start_server_system(mut commands: Commands) { +/// let server: MatchboxServer = SignalingServerBuilder::new( +/// (Ipv4Addr::UNSPECIFIED, 3536), +/// FullMesh, +/// FullMeshState::default(), +/// ).into(); +/// +/// commands.insert_resource(MatchboxServer::from(server)); +/// } +/// +/// fn stop_server_system(mut commands: Commands) { +/// commands.remove_resource::(); +/// } +/// ``` +#[derive(Debug, Resource)] +pub struct MatchboxServer(Task>); + +impl From> for MatchboxServer +where + Topology: SignalingTopology, + Cb: SignalingCallbacks, + S: SignalingState, +{ + fn from(value: SignalingServerBuilder) -> Self { + MatchboxServer::from(value.build()) + } +} + +impl From for MatchboxServer { + fn from(server: SignalingServer) -> Self { + let task_pool = IoTaskPool::get(); + let task = task_pool.spawn(server.serve()); + MatchboxServer(task) + } +} + +struct StartServer(SignalingServerBuilder) +where + Topology: SignalingTopology, + Cb: SignalingCallbacks, + S: SignalingState; + +impl Command for StartServer +where + Topology: SignalingTopology + Send + 'static, + Cb: SignalingCallbacks, + S: SignalingState, +{ + fn write(self, world: &mut bevy::prelude::World) { + world.insert_resource(MatchboxServer::from(self.0)) + } +} + +/// A [`Commands`] extension used to start a [`MatchboxServer`]. +pub trait StartServerExt< + Topology: SignalingTopology, + Cb: SignalingCallbacks, + S: SignalingState, +> +{ + /// Starts a [`MatchboxServer`] and allocates it as a resource. + fn start_server(&mut self, builder: SignalingServerBuilder); +} + +impl<'w, 's, Topology, Cb, S> StartServerExt for Commands<'w, 's> +where + Topology: SignalingTopology + Send + 'static, + Cb: SignalingCallbacks, + S: SignalingState, +{ + fn start_server(&mut self, builder: SignalingServerBuilder) { + self.add(StartServer(builder)) + } +} + +struct StopServer; + +impl Command for StopServer { + fn write(self, world: &mut bevy::prelude::World) { + world.remove_resource::(); + } +} + +/// A [`Commands`] extension used to stop a [`MatchboxServer`]. +pub trait StopServerExt { + /// Delete the [`MatchboxServer`] resource. + fn stop_server(&mut self); +} + +impl<'w, 's> StopServerExt for Commands<'w, 's> { + fn stop_server(&mut self) { + self.add(StopServer) + } +} + +impl MatchboxServer { + /// Creates a new builder for a [`SignalingServer`] with full-mesh topology. + pub fn full_mesh_builder( + socket_addr: impl Into, + ) -> SignalingServerBuilder { + SignalingServer::full_mesh_builder(socket_addr) + } + + /// Creates a new builder for a [`SignalingServer`] with client-server topology. + pub fn client_server_builder( + socket_addr: impl Into, + ) -> SignalingServerBuilder { + SignalingServer::client_server_builder(socket_addr) + } +} diff --git a/bevy_matchbox/src/socket.rs b/bevy_matchbox/src/socket.rs new file mode 100644 index 00000000..75e091ca --- /dev/null +++ b/bevy_matchbox/src/socket.rs @@ -0,0 +1,176 @@ +use bevy::{ + ecs::system::Command, + prelude::{Commands, Component, Deref, DerefMut, Resource, World}, + tasks::IoTaskPool, +}; +pub use matchbox_socket; +use matchbox_socket::{ + BuildablePlurality, MessageLoopFuture, SingleChannel, WebRtcSocket, WebRtcSocketBuilder, +}; +use std::marker::PhantomData; + +/// A [`WebRtcSocket`] as a [`Component`] or [`Resource`]. +/// +/// As a [`Component`], directly +/// ``` +/// use bevy_matchbox::prelude::*; +/// use bevy::prelude::*; +/// +/// fn open_socket_system(mut commands: Commands) { +/// let room_url = "wss://matchbox.example.com"; +/// let builder = WebRtcSocketBuilder::new(room_url).add_channel(ChannelConfig::reliable()); +/// commands.spawn(MatchboxSocket::from(builder)); +/// } +/// +/// fn close_socket_system( +/// mut commands: Commands, +/// socket: Query>> +/// ) { +/// let socket = socket.single(); +/// commands.entity(socket).despawn(); +/// } +/// ``` +/// +/// As a [`Resource`], with [`Commands`] +/// ``` +/// use bevy_matchbox::prelude::*; +/// use bevy::prelude::*; +/// +/// fn open_socket_system(mut commands: Commands) { +/// let room_url = "wss://matchbox.example.com"; +/// commands.open_socket(WebRtcSocketBuilder::new(room_url).add_channel(ChannelConfig::reliable())); +/// } +/// +/// fn close_socket_system(mut commands: Commands) { +/// commands.close_socket::(); +/// } +/// ``` +/// +/// As a [`Resource`], directly +/// ``` +/// use bevy_matchbox::prelude::*; +/// use bevy::prelude::*; +/// +/// fn open_socket_system(mut commands: Commands) { +/// let room_url = "wss://matchbox.example.com"; +/// +/// let socket: MatchboxSocket = WebRtcSocketBuilder::new(room_url) +/// .add_channel(ChannelConfig::reliable()) +/// .into(); +/// +/// commands.insert_resource(socket); +/// } +/// +/// fn close_socket_system(mut commands: Commands) { +/// commands.remove_resource::>(); +/// } +/// ``` +#[derive(Resource, Component, Debug, Deref, DerefMut)] +pub struct MatchboxSocket(WebRtcSocket); + +impl From> for MatchboxSocket { + fn from(builder: WebRtcSocketBuilder) -> Self { + Self::from(builder.build()) + } +} + +impl From<(WebRtcSocket, MessageLoopFuture)> for MatchboxSocket { + fn from((socket, message_loop_fut): (WebRtcSocket, MessageLoopFuture)) -> Self { + let task_pool = IoTaskPool::get(); + task_pool.spawn(message_loop_fut).detach(); + MatchboxSocket(socket) + } +} + +/// A [`Command`] used to open a [`MatchboxSocket`] and allocate it as a resource. +struct OpenSocket(WebRtcSocketBuilder); + +impl Command for OpenSocket { + fn write(self, world: &mut World) { + world.insert_resource(MatchboxSocket::from(self.0)); + } +} + +/// A [`Commands`] extension used to open a [`MatchboxSocket`] and allocate it as a resource. +pub trait OpenSocketExt { + /// Opens a [`MatchboxSocket`] and allocates it as a resource. + fn open_socket(&mut self, socket_builder: WebRtcSocketBuilder); +} + +impl<'w, 's, C: BuildablePlurality + 'static> OpenSocketExt for Commands<'w, 's> { + fn open_socket(&mut self, socket_builder: WebRtcSocketBuilder) { + self.add(OpenSocket(socket_builder)) + } +} + +/// A [`Command`] used to close a [`WebRtcSocket`], deleting the [`MatchboxSocket`] resource. +struct CloseSocket(PhantomData); + +impl Command for CloseSocket { + fn write(self, world: &mut World) { + world.remove_resource::>(); + } +} + +/// A [`Commands`] extension used to close a [`WebRtcSocket`], deleting the [`MatchboxSocket`] resource. +pub trait CloseSocketExt { + /// Delete the [`MatchboxSocket`] resource. + fn close_socket(&mut self); +} + +impl<'w, 's> CloseSocketExt for Commands<'w, 's> { + fn close_socket(&mut self) { + self.add(CloseSocket::(PhantomData::default())) + } +} + +impl MatchboxSocket { + /// Create a new socket with a single unreliable channel + /// + /// ```rust + /// use bevy_matchbox::prelude::*; + /// use bevy::prelude::*; + /// + /// fn open_channel_system(mut commands: Commands) { + /// let room_url = "wss://matchbox.example.com"; + /// let socket = MatchboxSocket::new_unreliable(room_url); + /// commands.spawn(socket); + /// } + /// ``` + pub fn new_unreliable(room_url: impl Into) -> MatchboxSocket { + Self::from(WebRtcSocket::new_unreliable(room_url)) + } + + /// Create a new socket with a single reliable channel + /// + /// ```rust + /// use bevy_matchbox::prelude::*; + /// use bevy::prelude::*; + /// + /// fn open_channel_system(mut commands: Commands) { + /// let room_url = "wss://matchbox.example.com"; + /// let socket = MatchboxSocket::new_reliable(room_url); + /// commands.spawn(socket); + /// } + /// ``` + pub fn new_reliable(room_url: impl Into) -> MatchboxSocket { + Self::from(WebRtcSocket::new_reliable(room_url)) + } + + /// Create a new socket with a single ggrs-compatible channel + /// + /// ```rust + /// use bevy_matchbox::prelude::*; + /// use bevy::prelude::*; + /// + /// fn open_channel_system(mut commands: Commands) { + /// let room_url = "wss://matchbox.example.com"; + /// let socket = MatchboxSocket::new_ggrs(room_url); + /// commands.spawn(socket); + /// } + /// ``` + #[cfg(feature = "ggrs")] + pub fn new_ggrs(room_url: impl Into) -> MatchboxSocket { + Self::from(WebRtcSocket::new_ggrs(room_url)) + } +} diff --git a/matchbox_signaling/src/lib.rs b/matchbox_signaling/src/lib.rs index 597d06cd..e90688a4 100644 --- a/matchbox_signaling/src/lib.rs +++ b/matchbox_signaling/src/lib.rs @@ -3,7 +3,8 @@ #![forbid(unsafe_code)] mod error; mod signaling_server; -mod topologies; +/// Network topologies to be created by the [`SignalingServer`] +pub mod topologies; pub use error::Error; pub use signaling_server::{ diff --git a/matchbox_signaling/src/topologies/client_server.rs b/matchbox_signaling/src/topologies/client_server.rs index e483715c..5d484b14 100644 --- a/matchbox_signaling/src/topologies/client_server.rs +++ b/matchbox_signaling/src/topologies/client_server.rs @@ -17,6 +17,7 @@ use matchbox_protocol::{JsonPeerEvent, PeerId, PeerRequest}; use std::collections::HashMap; use tracing::{error, info, warn}; +/// A client server network topology #[derive(Debug, Default)] pub struct ClientServer; @@ -243,6 +244,7 @@ impl ClientServerState { .and_then(|(_id, sender)| try_send(sender, message)) } + /// Inform all clients that the host has disconnected. pub fn reset(&mut self) { // Safety: Lock must be scoped/dropped to ensure no deadlock with next section let host_id = { diff --git a/matchbox_signaling/src/topologies/full_mesh.rs b/matchbox_signaling/src/topologies/full_mesh.rs index 1b70b5cb..ff023ccf 100644 --- a/matchbox_signaling/src/topologies/full_mesh.rs +++ b/matchbox_signaling/src/topologies/full_mesh.rs @@ -17,6 +17,7 @@ use matchbox_protocol::{JsonPeerEvent, PeerId, PeerRequest}; use std::collections::HashMap; use tracing::{error, info, warn}; +/// A full mesh network topolgoy #[derive(Debug, Default)] pub struct FullMesh; diff --git a/matchbox_signaling/src/topologies/mod.rs b/matchbox_signaling/src/topologies/mod.rs index 44d04b31..0b13fe18 100644 --- a/matchbox_signaling/src/topologies/mod.rs +++ b/matchbox_signaling/src/topologies/mod.rs @@ -5,11 +5,13 @@ use async_trait::async_trait; use futures::{future::BoxFuture, Future}; use std::sync::Arc; +/// An implementation of a client server topolgy pub mod client_server; +/// An implementation of a full mesh topology pub mod full_mesh; #[derive(Clone)] -pub struct SignalingStateMachine( +pub(crate) struct SignalingStateMachine( #[allow(clippy::type_complexity)] pub Arc) -> BoxFuture<'static, ()> + Send + Sync>>, ); @@ -19,14 +21,14 @@ where Cb: SignalingCallbacks, S: SignalingState, { - pub fn from_topology(_: Topology) -> Self + pub(crate) fn from_topology(_: Topology) -> Self where Topology: SignalingTopology, { Self::new(|ws| >::state_machine(ws)) } - pub fn new(callback: F) -> Self + pub(crate) fn new(callback: F) -> Self where F: Fn(WsStateMeta) -> Fut + 'static + Send + Sync, Fut: Future + 'static + Send, @@ -35,6 +37,7 @@ where } } +/// Topology produced by the signaling server #[async_trait] pub trait SignalingTopology where