From 4c875f45254c567ee97c80a2dbc4573c28dbb7b5 Mon Sep 17 00:00:00 2001 From: Caspar Krieger Date: Mon, 9 Dec 2024 22:09:35 +0800 Subject: [PATCH 1/4] refactor: drop typestate pattern for building sockets The typestate pattern caused a lot of boilerplate w.r.t. both the implementation and the usage of matchbox socket. There appeared to be only three actual uses of the typestate pattern: * Preventing client code from calling build() without adding a channel. * Some convenience methods to read/write from the first channel in the case where where was only one channel. * Making sure that ggrs::NonBlockingSocket is only implemented for WebRtcSocket in the case where it is a single channel socket, to avoid the case where a reliable socket is accidentally used as a GGRS socket (or a unreliable socket is used for GGRS + some other data). The first and (mostly) the last points can be accomplished at runtime via asserts, and owing to the "set it up and use it from one place" nature of this library I think it's highly unlikely that having a runtime assertion is problematic. This is a breaking change owing to the removal of the SingleChannel-variant convenience methods, but it should be trivially easy to migrate by looking at modified example code. --- bevy_matchbox/examples/hello.rs | 12 +- bevy_matchbox/src/lib.rs | 5 +- bevy_matchbox/src/socket.rs | 61 ++++---- examples/bevy_ggrs/src/main.rs | 2 +- examples/error_handling/src/main.rs | 8 +- examples/simple/src/main.rs | 6 +- matchbox_socket/src/ggrs_socket.rs | 61 +++----- matchbox_socket/src/lib.rs | 5 +- matchbox_socket/src/webrtc_socket/mod.rs | 3 +- matchbox_socket/src/webrtc_socket/socket.rs | 165 +++----------------- 10 files changed, 93 insertions(+), 235 deletions(-) diff --git a/bevy_matchbox/examples/hello.rs b/bevy_matchbox/examples/hello.rs index a27f6c8e..8757e6cc 100644 --- a/bevy_matchbox/examples/hello.rs +++ b/bevy_matchbox/examples/hello.rs @@ -4,6 +4,8 @@ use bevy::{prelude::*, time::common_conditions::on_timer, utils::Duration}; use bevy_matchbox::prelude::*; +const CHANNEL_ID: usize = 0; + fn main() { App::new() .add_plugins(DefaultPlugins) @@ -21,22 +23,24 @@ fn start_socket(mut commands: Commands) { commands.insert_resource(socket); } -fn send_message(mut socket: ResMut>) { +fn send_message(mut socket: ResMut) { let peers: Vec<_> = socket.connected_peers().collect(); for peer in peers { let message = "Hello"; info!("Sending message: {message:?} to {peer}"); - socket.send(message.as_bytes().into(), peer); + socket + .channel_mut(CHANNEL_ID) + .send(message.as_bytes().into(), peer); } } -fn receive_messages(mut socket: ResMut>) { +fn receive_messages(mut socket: ResMut) { for (peer, state) in socket.update_peers() { info!("{peer}: {state:?}"); } - for (_id, message) in socket.receive() { + for (_id, message) in socket.channel_mut(CHANNEL_ID).receive() { match std::str::from_utf8(&message) { Ok(message) => info!("Received message: {message:?}"), Err(e) => error!("Failed to convert message to string: {e}"), diff --git a/bevy_matchbox/src/lib.rs b/bevy_matchbox/src/lib.rs index a12dc796..4d64faf0 100644 --- a/bevy_matchbox/src/lib.rs +++ b/bevy_matchbox/src/lib.rs @@ -18,10 +18,7 @@ cfg_if! { pub mod prelude { pub use crate::{CloseSocketExt, MatchboxSocket, OpenSocketExt}; use cfg_if::cfg_if; - pub use matchbox_socket::{ - BuildablePlurality, ChannelConfig, MultipleChannels, PeerId, PeerState, SingleChannel, - WebRtcSocketBuilder, - }; + pub use matchbox_socket::{ChannelConfig, PeerId, PeerState, WebRtcSocketBuilder}; cfg_if! { if #[cfg(all(not(target_arch = "wasm32"), feature = "signaling"))] { diff --git a/bevy_matchbox/src/socket.rs b/bevy_matchbox/src/socket.rs index d285f56f..4469b154 100644 --- a/bevy_matchbox/src/socket.rs +++ b/bevy_matchbox/src/socket.rs @@ -4,12 +4,9 @@ use bevy::{ tasks::IoTaskPool, }; pub use matchbox_socket; -use matchbox_socket::{ - BuildablePlurality, MessageLoopFuture, SingleChannel, WebRtcSocket, WebRtcSocketBuilder, -}; +use matchbox_socket::{MessageLoopFuture, WebRtcSocket, WebRtcSocketBuilder}; use std::{ fmt::Debug, - marker::PhantomData, ops::{Deref, DerefMut}, }; @@ -28,7 +25,7 @@ use std::{ /// /// fn close_socket_system( /// mut commands: Commands, -/// socket: Query>> +/// socket: Query> /// ) { /// let socket = socket.single(); /// commands.entity(socket).despawn(); @@ -46,7 +43,7 @@ use std::{ /// } /// /// fn close_socket_system(mut commands: Commands) { -/// commands.close_socket::(); +/// commands.close_socket(); /// } /// ``` /// @@ -58,7 +55,7 @@ use std::{ /// fn open_socket_system(mut commands: Commands) { /// let room_url = "wss://matchbox.example.com"; /// -/// let socket: MatchboxSocket = WebRtcSocketBuilder::new(room_url) +/// let socket: MatchboxSocket = WebRtcSocketBuilder::new(room_url) /// .add_channel(ChannelConfig::reliable()) /// .into(); /// @@ -66,35 +63,35 @@ use std::{ /// } /// /// fn close_socket_system(mut commands: Commands) { -/// commands.remove_resource::>(); +/// commands.remove_resource::(); /// } /// ``` #[derive(Resource, Component, Debug)] #[allow(dead_code)] // keep the task alive so it doesn't drop before the socket -pub struct MatchboxSocket(WebRtcSocket, Box); +pub struct MatchboxSocket(WebRtcSocket, Box); -impl Deref for MatchboxSocket { - type Target = WebRtcSocket; +impl Deref for MatchboxSocket { + type Target = WebRtcSocket; fn deref(&self) -> &Self::Target { &self.0 } } -impl DerefMut for MatchboxSocket { +impl DerefMut for MatchboxSocket { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.0 } } -impl From> for MatchboxSocket { - fn from(builder: WebRtcSocketBuilder) -> Self { +impl From for MatchboxSocket { + fn from(builder: WebRtcSocketBuilder) -> Self { Self::from(builder.build()) } } -impl From<(WebRtcSocket, MessageLoopFuture)> for MatchboxSocket { - fn from((socket, message_loop_fut): (WebRtcSocket, MessageLoopFuture)) -> Self { +impl From<(WebRtcSocket, MessageLoopFuture)> for MatchboxSocket { + fn from((socket, message_loop_fut): (WebRtcSocket, MessageLoopFuture)) -> Self { let task_pool = IoTaskPool::get(); let task = task_pool.spawn(message_loop_fut); MatchboxSocket(socket, Box::new(task)) @@ -102,32 +99,32 @@ impl From<(WebRtcSocket, MessageLoopFuture)> for Match } /// A [`Command`] used to open a [`MatchboxSocket`] and allocate it as a resource. -struct OpenSocket(WebRtcSocketBuilder); +struct OpenSocket(WebRtcSocketBuilder); -impl Command for OpenSocket { +impl Command for OpenSocket { fn apply(self, world: &mut World) { world.insert_resource(MatchboxSocket::from(self.0)); } } /// A [`Commands`] extension used to open a [`MatchboxSocket`] and allocate it as a resource. -pub trait OpenSocketExt { +pub trait OpenSocketExt { /// Opens a [`MatchboxSocket`] and allocates it as a resource. - fn open_socket(&mut self, socket_builder: WebRtcSocketBuilder); + fn open_socket(&mut self, socket_builder: WebRtcSocketBuilder); } -impl OpenSocketExt for Commands<'_, '_> { - fn open_socket(&mut self, socket_builder: WebRtcSocketBuilder) { +impl OpenSocketExt for Commands<'_, '_> { + fn open_socket(&mut self, socket_builder: WebRtcSocketBuilder) { self.add(OpenSocket(socket_builder)) } } /// A [`Command`] used to close a [`WebRtcSocket`], deleting the [`MatchboxSocket`] resource. -struct CloseSocket(PhantomData); +struct CloseSocket; -impl Command for CloseSocket { +impl Command for CloseSocket { fn apply(self, world: &mut World) { - world.remove_resource::>(); + world.remove_resource::(); } } @@ -135,16 +132,16 @@ impl Command for CloseSocket { /// resource. pub trait CloseSocketExt { /// Delete the [`MatchboxSocket`] resource. - fn close_socket(&mut self); + fn close_socket(&mut self); } impl CloseSocketExt for Commands<'_, '_> { - fn close_socket(&mut self) { - self.add(CloseSocket::(PhantomData)) + fn close_socket(&mut self) { + self.add(CloseSocket) } } -impl MatchboxSocket { +impl MatchboxSocket { /// Create a new socket with a single unreliable channel /// /// ```rust @@ -157,7 +154,7 @@ impl MatchboxSocket { /// commands.spawn(socket); /// } /// ``` - pub fn new_unreliable(room_url: impl Into) -> MatchboxSocket { + pub fn new_unreliable(room_url: impl Into) -> MatchboxSocket { Self::from(WebRtcSocket::new_unreliable(room_url)) } @@ -173,7 +170,7 @@ impl MatchboxSocket { /// commands.spawn(socket); /// } /// ``` - pub fn new_reliable(room_url: impl Into) -> MatchboxSocket { + pub fn new_reliable(room_url: impl Into) -> MatchboxSocket { Self::from(WebRtcSocket::new_reliable(room_url)) } @@ -190,7 +187,7 @@ impl MatchboxSocket { /// } /// ``` #[cfg(feature = "ggrs")] - pub fn new_ggrs(room_url: impl Into) -> MatchboxSocket { + pub fn new_ggrs(room_url: impl Into) -> MatchboxSocket { Self::from(WebRtcSocket::new_ggrs(room_url)) } } diff --git a/examples/bevy_ggrs/src/main.rs b/examples/bevy_ggrs/src/main.rs index 4202cfdf..1b945193 100644 --- a/examples/bevy_ggrs/src/main.rs +++ b/examples/bevy_ggrs/src/main.rs @@ -124,7 +124,7 @@ fn lobby_cleanup(query: Query>, mut commands: Commands) { fn lobby_system( mut app_state: ResMut>, args: Res, - mut socket: ResMut>, + mut socket: ResMut, mut commands: Commands, mut query: Query<&mut Text, With>, ) { diff --git a/examples/error_handling/src/main.rs b/examples/error_handling/src/main.rs index 3bb84fa0..7aa201a7 100644 --- a/examples/error_handling/src/main.rs +++ b/examples/error_handling/src/main.rs @@ -4,6 +4,8 @@ use log::{info, warn}; use matchbox_socket::{Error as SocketError, PeerId, PeerState, WebRtcSocket}; use std::time::Duration; +const CHANNEL_ID: usize = 0; + #[cfg(target_arch = "wasm32")] fn main() { // Setup logging @@ -65,7 +67,7 @@ async fn async_main() { PeerState::Connected => { info!("Peer joined: {peer}"); let packet = "hello friend!".as_bytes().to_vec().into_boxed_slice(); - socket.send(packet, peer); + socket.channel_mut(CHANNEL_ID).send(packet, peer); } PeerState::Disconnected => { info!("Peer left: {peer}"); @@ -74,7 +76,7 @@ async fn async_main() { } // Accept any messages incoming - for (peer, packet) in socket.receive() { + for (peer, packet) in socket.channel_mut(CHANNEL_ID).receive() { let message = String::from_utf8_lossy(&packet); info!("Message from {peer}: {message:?}"); } @@ -85,7 +87,7 @@ async fn async_main() { let peers: Vec = socket.connected_peers().collect(); for peer in peers { let packet = "ping!".as_bytes().to_vec().into_boxed_slice(); - socket.send(packet, peer); + socket.channel_mut(CHANNEL_ID).send(packet, peer); } timeout.reset(Duration::from_millis(10)); } diff --git a/examples/simple/src/main.rs b/examples/simple/src/main.rs index e4f63a3c..318138d2 100644 --- a/examples/simple/src/main.rs +++ b/examples/simple/src/main.rs @@ -4,6 +4,8 @@ use log::info; use matchbox_socket::{PeerState, WebRtcSocket}; use std::time::Duration; +const CHANNEL_ID: usize = 0; + #[cfg(target_arch = "wasm32")] fn main() { // Setup logging @@ -46,7 +48,7 @@ async fn async_main() { PeerState::Connected => { info!("Peer joined: {peer}"); let packet = "hello friend!".as_bytes().to_vec().into_boxed_slice(); - socket.send(packet, peer); + socket.channel_mut(CHANNEL_ID).send(packet, peer); } PeerState::Disconnected => { info!("Peer left: {peer}"); @@ -55,7 +57,7 @@ async fn async_main() { } // Accept any messages incoming - for (peer, packet) in socket.receive() { + for (peer, packet) in socket.channel_mut(CHANNEL_ID).receive() { let message = String::from_utf8_lossy(&packet); info!("Message from {peer}: {message:?}"); } diff --git a/matchbox_socket/src/ggrs_socket.rs b/matchbox_socket/src/ggrs_socket.rs index 650a680a..9abcbc88 100644 --- a/matchbox_socket/src/ggrs_socket.rs +++ b/matchbox_socket/src/ggrs_socket.rs @@ -1,13 +1,12 @@ -use std::marker::PhantomData; - use ggrs::{Message, PlayerType}; use matchbox_protocol::PeerId; use crate::{ - ChannelConfig, ChannelPlurality, MessageLoopFuture, MultipleChannels, NoChannels, Packet, - SingleChannel, WebRtcChannel, WebRtcSocket, WebRtcSocketBuilder, + ChannelConfig, MessageLoopFuture, Packet, WebRtcChannel, WebRtcSocket, WebRtcSocketBuilder, }; +pub const GGRS_CHANNEL_ID: usize = 0; + impl ChannelConfig { /// Creates a [`ChannelConfig`] suitable for use with GGRS. pub fn ggrs() -> Self { @@ -15,35 +14,18 @@ impl ChannelConfig { } } -impl WebRtcSocketBuilder { - /// Adds a new channel suitable for use with GGRS to the [`WebRtcSocket`] configuration. - pub fn add_ggrs_channel(mut self) -> WebRtcSocketBuilder { - self.config.channels.push(ChannelConfig::ggrs()); - WebRtcSocketBuilder { - config: self.config, - channel_plurality: PhantomData, - } - } -} - -impl WebRtcSocketBuilder { - /// Adds a new channel suitable for use with GGRS to the [`WebRtcSocket`] configuration. - pub fn add_ggrs_channel(mut self) -> WebRtcSocketBuilder { - self.config.channels.push(ChannelConfig::ggrs()); - WebRtcSocketBuilder { - config: self.config, - channel_plurality: PhantomData, - } - } -} -impl WebRtcSocketBuilder { +impl WebRtcSocketBuilder { /// Adds a new channel suitable for use with GGRS to the [`WebRtcSocket`] configuration. - pub fn add_ggrs_channel(mut self) -> WebRtcSocketBuilder { + /// + /// This must be called as the first channel. + pub fn add_ggrs_channel(mut self) -> WebRtcSocketBuilder { + assert_eq!( + self.config.channels.len(), + GGRS_CHANNEL_ID, + "ggrs channel is expected to be the first channel added" + ); self.config.channels.push(ChannelConfig::ggrs()); - WebRtcSocketBuilder { - config: self.config, - channel_plurality: PhantomData, - } + self } } @@ -55,16 +37,14 @@ impl WebRtcSocket { /// be sent and received. /// /// Please use the [`WebRtcSocketBuilder`] to create non-trivial sockets. - pub fn new_ggrs( - room_url: impl Into, - ) -> (WebRtcSocket, MessageLoopFuture) { + pub fn new_ggrs(room_url: impl Into) -> (WebRtcSocket, MessageLoopFuture) { WebRtcSocketBuilder::new(room_url) .add_channel(ChannelConfig::ggrs()) .build() } } -impl WebRtcSocket { +impl WebRtcSocket { /// Returns a Vec of connected peers as [`ggrs::PlayerType`] pub fn players(&mut self) -> Vec> { let Some(our_id) = self.id() else { @@ -100,12 +80,17 @@ fn deserialize_packet(message: (PeerId, Packet)) -> (PeerId, Message) { (message.0, bincode::deserialize(&message.1).unwrap()) } -impl ggrs::NonBlockingSocket for WebRtcSocket { +impl ggrs::NonBlockingSocket for WebRtcSocket { fn send_to(&mut self, msg: &Message, addr: &PeerId) { - self.send(build_packet(msg), *addr); + self.channel_mut(GGRS_CHANNEL_ID) + .send(build_packet(msg), *addr); } fn receive_all_messages(&mut self) -> Vec<(PeerId, Message)> { - self.receive().into_iter().map(deserialize_packet).collect() + self.channel_mut(GGRS_CHANNEL_ID) + .receive() + .into_iter() + .map(deserialize_packet) + .collect() } } diff --git a/matchbox_socket/src/lib.rs b/matchbox_socket/src/lib.rs index b5e07ceb..8bbeae21 100644 --- a/matchbox_socket/src/lib.rs +++ b/matchbox_socket/src/lib.rs @@ -10,7 +10,8 @@ mod webrtc_socket; pub use error::Error; pub use matchbox_protocol::PeerId; pub use webrtc_socket::{ - error::ChannelError, BuildablePlurality, ChannelConfig, ChannelPlurality, MessageLoopFuture, - MultipleChannels, NoChannels, Packet, PeerState, RtcIceServerConfig, SingleChannel, + error::ChannelError, ChannelConfig, MessageLoopFuture, Packet, PeerState, RtcIceServerConfig, WebRtcChannel, WebRtcSocket, WebRtcSocketBuilder, }; +#[cfg(feature = "ggrs")] +pub use ggrs_socket::GGRS_CHANNEL_ID; diff --git a/matchbox_socket/src/webrtc_socket/mod.rs b/matchbox_socket/src/webrtc_socket/mod.rs index 6b8e4041..7b8ce617 100644 --- a/matchbox_socket/src/webrtc_socket/mod.rs +++ b/matchbox_socket/src/webrtc_socket/mod.rs @@ -16,8 +16,7 @@ use matchbox_protocol::PeerId; use messages::*; pub(crate) use socket::MessageLoopChannels; pub use socket::{ - BuildablePlurality, ChannelConfig, ChannelPlurality, MultipleChannels, NoChannels, PeerState, - RtcIceServerConfig, SingleChannel, WebRtcChannel, WebRtcSocket, WebRtcSocketBuilder, + ChannelConfig, PeerState, RtcIceServerConfig, WebRtcChannel, WebRtcSocket, WebRtcSocketBuilder, }; use std::{collections::HashMap, pin::Pin, time::Duration}; diff --git a/matchbox_socket/src/webrtc_socket/socket.rs b/matchbox_socket/src/webrtc_socket/socket.rs index 11fbe0d0..79c96e5b 100644 --- a/matchbox_socket/src/webrtc_socket/socket.rs +++ b/matchbox_socket/src/webrtc_socket/socket.rs @@ -10,7 +10,7 @@ use futures::{future::Fuse, select, Future, FutureExt, StreamExt}; use futures_channel::mpsc::{SendError, TrySendError, UnboundedReceiver, UnboundedSender}; use log::{debug, error}; use matchbox_protocol::PeerId; -use std::{collections::HashMap, marker::PhantomData, pin::Pin, time::Duration}; +use std::{collections::HashMap, pin::Pin, time::Duration}; /// Configuration options for an ICE server connection. /// See also: @@ -73,31 +73,6 @@ impl Default for RtcIceServerConfig { } } -/// Tags types which are used to indicate the number of [`WebRtcChannel`]s or -/// [`ChannelConfig`]s in a [`WebRtcSocket`] or [`WebRtcSocketBuilder`] respectively. -pub trait ChannelPlurality: Send + Sync {} - -/// Tags types which are used to indicate a quantity of [`ChannelConfig`]s which can be -/// used to build a [`WebRtcSocket`]. -pub trait BuildablePlurality: ChannelPlurality {} - -/// Indicates that the type has no [`WebRtcChannel`]s or [`ChannelConfig`]s. -#[derive(Debug)] -pub struct NoChannels; -impl ChannelPlurality for NoChannels {} - -/// Indicates that the type has exactly one [`WebRtcChannel`] or [`ChannelConfig`]. -#[derive(Debug)] -pub struct SingleChannel; -impl ChannelPlurality for SingleChannel {} -impl BuildablePlurality for SingleChannel {} - -/// Indicates that the type has more than one [`WebRtcChannel`]s or [`ChannelConfig`]s. -#[derive(Debug)] -pub struct MultipleChannels; -impl ChannelPlurality for MultipleChannels {} -impl BuildablePlurality for MultipleChannels {} - #[derive(Debug, Clone)] pub(crate) struct SocketConfig { /// The url for the room to connect to @@ -128,9 +103,8 @@ pub(crate) struct SocketConfig { /// [`WebRtcSocketBuilder::add_channel`] before calling /// [`WebRtcSocketBuilder::build`] to produce the desired [`WebRtcSocket`]. #[derive(Debug, Clone)] -pub struct WebRtcSocketBuilder { +pub struct WebRtcSocketBuilder { pub(crate) config: SocketConfig, - pub(crate) channel_plurality: PhantomData, } impl WebRtcSocketBuilder { @@ -148,7 +122,6 @@ impl WebRtcSocketBuilder { attempts: Some(3), keep_alive_interval: Some(Duration::from_secs(10)), }, - channel_plurality: PhantomData, } } @@ -179,101 +152,35 @@ impl WebRtcSocketBuilder { self.config.keep_alive_interval = interval; self } -} -impl WebRtcSocketBuilder { /// Adds a new channel to the [`WebRtcSocket`] configuration according to a [`ChannelConfig`]. - pub fn add_channel(mut self, config: ChannelConfig) -> WebRtcSocketBuilder { - self.config.channels.push(config); - WebRtcSocketBuilder { - config: self.config, - channel_plurality: PhantomData, - } - } - - /// Adds a new unreliable channel to the [`WebRtcSocket`] configuration. - pub fn add_unreliable_channel(mut self) -> WebRtcSocketBuilder { - self.config.channels.push(ChannelConfig::unreliable()); - WebRtcSocketBuilder { - config: self.config, - channel_plurality: PhantomData, - } - } - - /// Adds a new reliable channel to the [`WebRtcSocket`] configuration. - pub fn add_reliable_channel(mut self) -> WebRtcSocketBuilder { - self.config.channels.push(ChannelConfig::reliable()); - WebRtcSocketBuilder { - config: self.config, - channel_plurality: PhantomData, - } - } -} - -impl WebRtcSocketBuilder { - /// Adds a new channel to the [`WebRtcSocket`] configuration according to a [`ChannelConfig`]. - pub fn add_channel(mut self, config: ChannelConfig) -> WebRtcSocketBuilder { - self.config.channels.push(config); - WebRtcSocketBuilder { - config: self.config, - channel_plurality: PhantomData, - } - } - - /// Adds a new unreliable channel to the [`WebRtcSocket`] configuration. - pub fn add_unreliable_channel(mut self) -> WebRtcSocketBuilder { - self.config.channels.push(ChannelConfig::unreliable()); - WebRtcSocketBuilder { - config: self.config, - channel_plurality: PhantomData, - } - } - - /// Adds a new reliable channel to the [`WebRtcSocket`] configuration. - pub fn add_reliable_channel(mut self) -> WebRtcSocketBuilder { - self.config.channels.push(ChannelConfig::reliable()); - WebRtcSocketBuilder { - config: self.config, - channel_plurality: PhantomData, - } - } -} -impl WebRtcSocketBuilder { - /// Adds a new channel to the [`WebRtcSocket`] configuration according to a [`ChannelConfig`]. - pub fn add_channel(mut self, config: ChannelConfig) -> WebRtcSocketBuilder { + pub fn add_channel(mut self, config: ChannelConfig) -> WebRtcSocketBuilder { self.config.channels.push(config); self } /// Adds a new unreliable channel to the [`WebRtcSocket`] configuration. - pub fn add_unreliable_channel(mut self) -> WebRtcSocketBuilder { + pub fn add_unreliable_channel(mut self) -> WebRtcSocketBuilder { self.config.channels.push(ChannelConfig::unreliable()); - WebRtcSocketBuilder { - config: self.config, - channel_plurality: PhantomData, - } + self } /// Adds a new reliable channel to the [`WebRtcSocket`] configuration. - pub fn add_reliable_channel(mut self) -> WebRtcSocketBuilder { + pub fn add_reliable_channel(mut self) -> WebRtcSocketBuilder { self.config.channels.push(ChannelConfig::reliable()); - WebRtcSocketBuilder { - config: self.config, - channel_plurality: PhantomData, - } + self } -} -impl WebRtcSocketBuilder { /// Creates a [`WebRtcSocket`] and the corresponding [`MessageLoopFuture`] according to the /// configuration supplied. /// /// The returned [`MessageLoopFuture`] should be awaited in order for messages to be sent and /// received. - pub fn build(self) -> (WebRtcSocket, MessageLoopFuture) { - if self.config.channels.is_empty() { - unreachable!(); - } + pub fn build(self) -> (WebRtcSocket, MessageLoopFuture) { + assert!( + !self.config.channels.is_empty(), + "Must have added at least one channel" + ); let (peer_state_tx, peer_state_rx) = futures_channel::mpsc::unbounded(); @@ -315,7 +222,6 @@ impl WebRtcSocketBuilder { peer_state_rx, peers: Default::default(), channels, - channel_plurality: PhantomData, }, Box::pin(socket_fut), ) @@ -393,13 +299,12 @@ impl WebRtcChannel { /// Contains a set of [`WebRtcChannel`]s and connection metadata. #[derive(Debug)] -pub struct WebRtcSocket { +pub struct WebRtcSocket { id: once_cell::race::OnceBox, id_rx: futures_channel::oneshot::Receiver, peer_state_rx: futures_channel::mpsc::UnboundedReceiver<(PeerId, PeerState)>, peers: HashMap, channels: Vec>, - channel_plurality: PhantomData, } impl WebRtcSocket { @@ -419,9 +324,7 @@ impl WebRtcSocket { /// be sent and received. /// /// Please use the [`WebRtcSocketBuilder`] to create non-trivial sockets. - pub fn new_unreliable( - room_url: impl Into, - ) -> (WebRtcSocket, MessageLoopFuture) { + pub fn new_unreliable(room_url: impl Into) -> (WebRtcSocket, MessageLoopFuture) { WebRtcSocketBuilder::new(room_url) .add_channel(ChannelConfig::unreliable()) .build() @@ -434,16 +337,14 @@ impl WebRtcSocket { /// be sent and received. /// /// Please use the [`WebRtcSocketBuilder`] to create non-trivial sockets. - pub fn new_reliable( - room_url: impl Into, - ) -> (WebRtcSocket, MessageLoopFuture) { + pub fn new_reliable(room_url: impl Into) -> (WebRtcSocket, MessageLoopFuture) { WebRtcSocketBuilder::new(room_url) .add_channel(ChannelConfig::reliable()) .build() } } -impl WebRtcSocket { +impl WebRtcSocket { // Todo: Disconnect from the peer, severing all communication channels. // pub fn disconnect(&mut self, peer: PeerId) {} @@ -651,39 +552,9 @@ impl WebRtcSocket { .take() .ok_or(ChannelError::Taken) } -} - -impl WebRtcSocket { - /// Call this where you want to handle new received messages. - /// - /// Messages are removed from the socket when called. - pub fn receive(&mut self) -> Vec<(PeerId, Packet)> { - self.channel_mut(0).receive() - } - - /// Try to send a packet to the given peer. An error is propagated if the socket future - /// is dropped. `Ok` is not a guarantee of delivery. - pub fn try_send(&mut self, packet: Packet, peer: PeerId) -> Result<(), SendError> { - self.channel_mut(0).try_send(packet, peer) - } - - /// Send a packet to the given peer. There is no guarantee of delivery. - /// - /// # Panics - /// Panics if socket future is dropped. - pub fn send(&mut self, packet: Packet, peer: PeerId) { - self.try_send(packet, peer).expect("Send failed"); - } - - /// Returns whether the socket channel is closed - pub fn is_closed(&self) -> bool { - self.channel(0).is_closed() - } -} -impl WebRtcSocket { /// Returns whether any socket channel is closed - pub fn any_closed(&self) -> bool { + pub fn any_channel_closed(&self) -> bool { self.channels .iter() .filter_map(Option::as_ref) @@ -691,7 +562,7 @@ impl WebRtcSocket { } /// Returns whether all socket channels are closed - pub fn all_closed(&self) -> bool { + pub fn all_channels_closed(&self) -> bool { self.channels .iter() .filter_map(Option::as_ref) From 469e6940ef68d712f0fc022adc705d3efce7fb7b Mon Sep 17 00:00:00 2001 From: Caspar Krieger Date: Sat, 14 Dec 2024 14:29:43 +0800 Subject: [PATCH 2/4] Remove GGRS-specific builders and ggrs impl for WebRtcSocket To create GGRS sockets, users can still create new sockets by specifying ChannelConfig::unreliable() and using WebRtcSocket::take_channel() to detach the channel so it can be owned by GGRS. This makes it clearer that there's nothing special about GGRS sockets. --- bevy_matchbox/src/signaling.rs | 6 +- bevy_matchbox/src/socket.rs | 17 ----- examples/bevy_ggrs/src/box_game.rs | 15 +++-- examples/bevy_ggrs/src/main.rs | 8 ++- matchbox_socket/src/ggrs_socket.rs | 71 +++++---------------- matchbox_socket/src/lib.rs | 2 - matchbox_socket/src/webrtc_socket/socket.rs | 39 +++++++---- 7 files changed, 59 insertions(+), 99 deletions(-) diff --git a/bevy_matchbox/src/signaling.rs b/bevy_matchbox/src/signaling.rs index f1ab5796..9c1e2a11 100644 --- a/bevy_matchbox/src/signaling.rs +++ b/bevy_matchbox/src/signaling.rs @@ -163,8 +163,10 @@ impl MatchboxServer { #[cfg(test)] mod tests { - use crate::matchbox_signaling::topologies::client_server::{ClientServer, ClientServerState}; - use crate::prelude::*; + use crate::{ + matchbox_signaling::topologies::client_server::{ClientServer, ClientServerState}, + prelude::*, + }; use bevy::prelude::*; use std::net::Ipv4Addr; diff --git a/bevy_matchbox/src/socket.rs b/bevy_matchbox/src/socket.rs index 4469b154..e4e1c457 100644 --- a/bevy_matchbox/src/socket.rs +++ b/bevy_matchbox/src/socket.rs @@ -173,21 +173,4 @@ impl MatchboxSocket { pub fn new_reliable(room_url: impl Into) -> MatchboxSocket { Self::from(WebRtcSocket::new_reliable(room_url)) } - - /// Create a new socket with a single ggrs-compatible channel - /// - /// ```rust - /// use bevy_matchbox::prelude::*; - /// use bevy::prelude::*; - /// - /// fn open_channel_system(mut commands: Commands) { - /// let room_url = "wss://matchbox.example.com"; - /// let socket = MatchboxSocket::new_ggrs(room_url); - /// commands.spawn(socket); - /// } - /// ``` - #[cfg(feature = "ggrs")] - pub fn new_ggrs(room_url: impl Into) -> MatchboxSocket { - Self::from(WebRtcSocket::new_ggrs(room_url)) - } } diff --git a/examples/bevy_ggrs/src/box_game.rs b/examples/bevy_ggrs/src/box_game.rs index 3de7ca71..5aa5090c 100644 --- a/examples/bevy_ggrs/src/box_game.rs +++ b/examples/bevy_ggrs/src/box_game.rs @@ -24,8 +24,8 @@ const FRICTION: f32 = 0.0018; const PLANE_SIZE: f32 = 5.0; const CUBE_SIZE: f32 = 0.2; -// You need to define a config struct to bundle all the generics of GGRS. bevy_ggrs provides a sensible default in `GgrsConfig`. -// (optional) You can define a type here for brevity. +// You need to define a config struct to bundle all the generics of GGRS. bevy_ggrs provides a +// sensible default in `GgrsConfig`. (optional) You can define a type here for brevity. pub type BoxConfig = GgrsConfig; #[repr(C)] @@ -58,7 +58,8 @@ pub struct FrameCount { pub frame: u32, } -/// Collects player inputs during [`ReadInputs`](`bevy_ggrs::ReadInputs`) and creates a [`LocalInputs`] resource. +/// Collects player inputs during [`ReadInputs`](`bevy_ggrs::ReadInputs`) and creates a +/// [`LocalInputs`] resource. pub fn read_local_inputs( mut commands: Commands, keyboard_input: Res>, @@ -154,8 +155,9 @@ pub fn setup_scene( } // Example system, manipulating a resource, will be added to the rollback schedule. -// Increases the frame count by 1 every update step. If loading and saving resources works correctly, -// you should see this resource rolling back, counting back up and finally increasing by 1 every update step +// Increases the frame count by 1 every update step. If loading and saving resources works +// correctly, you should see this resource rolling back, counting back up and finally increasing by +// 1 every update step #[allow(dead_code)] pub fn increase_frame_system(mut frame_count: ResMut) { frame_count.frame += 1; @@ -167,7 +169,8 @@ pub fn increase_frame_system(mut frame_count: ResMut) { #[allow(dead_code)] pub fn move_cube_system( mut query: Query<(&mut Transform, &mut Velocity, &Player), With>, - // ^------^ Added by `add_rollback` earlier + // ^------^ Added by + // `add_rollback` earlier inputs: Res>, // Thanks to RollbackTimePlugin, this is rollback safe time: Res