Skip to content

Commit

Permalink
Merge pull request #468 from caspark/no-typestate-pattern
Browse files Browse the repository at this point in the history
Refactor: Drop typestate pattern for building sockets
  • Loading branch information
johanhelsing authored Dec 16, 2024
2 parents eba5c4d + 9d74446 commit 06b62a4
Show file tree
Hide file tree
Showing 14 changed files with 140 additions and 325 deletions.
12 changes: 8 additions & 4 deletions bevy_matchbox/examples/hello.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
use bevy::{prelude::*, time::common_conditions::on_timer, utils::Duration};
use bevy_matchbox::prelude::*;

const CHANNEL_ID: usize = 0;

fn main() {
App::new()
.add_plugins(DefaultPlugins)
Expand All @@ -21,22 +23,24 @@ fn start_socket(mut commands: Commands) {
commands.insert_resource(socket);
}

fn send_message(mut socket: ResMut<MatchboxSocket<SingleChannel>>) {
fn send_message(mut socket: ResMut<MatchboxSocket>) {
let peers: Vec<_> = socket.connected_peers().collect();

for peer in peers {
let message = "Hello";
info!("Sending message: {message:?} to {peer}");
socket.send(message.as_bytes().into(), peer);
socket
.channel_mut(CHANNEL_ID)
.send(message.as_bytes().into(), peer);
}
}

fn receive_messages(mut socket: ResMut<MatchboxSocket<SingleChannel>>) {
fn receive_messages(mut socket: ResMut<MatchboxSocket>) {
for (peer, state) in socket.update_peers() {
info!("{peer}: {state:?}");
}

for (_id, message) in socket.receive() {
for (_id, message) in socket.channel_mut(CHANNEL_ID).receive() {
match std::str::from_utf8(&message) {
Ok(message) => info!("Received message: {message:?}"),
Err(e) => error!("Failed to convert message to string: {e}"),
Expand Down
8 changes: 4 additions & 4 deletions bevy_matchbox/examples/hello_host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,22 +60,22 @@ fn start_host_socket(mut commands: Commands) {
commands.insert_resource(socket);
}

fn send_message(mut socket: ResMut<MatchboxSocket<SingleChannel>>) {
fn send_message(mut socket: ResMut<MatchboxSocket>) {
let peers: Vec<_> = socket.connected_peers().collect();

for peer in peers {
let message = "Hello, I'm the host";
info!("Sending message: {message:?} to {peer}");
socket.send(message.as_bytes().into(), peer);
socket.channel_mut(0).send(message.as_bytes().into(), peer);
}
}

fn receive_messages(mut socket: ResMut<MatchboxSocket<SingleChannel>>) {
fn receive_messages(mut socket: ResMut<MatchboxSocket>) {
for (peer, state) in socket.update_peers() {
info!("{peer}: {state:?}");
}

for (_id, message) in socket.receive() {
for (_id, message) in socket.channel_mut(0).receive() {
match std::str::from_utf8(&message) {
Ok(message) => info!("Received message: {message:?}"),
Err(e) => error!("Failed to convert message to string: {e}"),
Expand Down
5 changes: 1 addition & 4 deletions bevy_matchbox/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@ cfg_if! {
pub mod prelude {
pub use crate::{CloseSocketExt, MatchboxSocket, OpenSocketExt};
use cfg_if::cfg_if;
pub use matchbox_socket::{
BuildablePlurality, ChannelConfig, MultipleChannels, PeerId, PeerState, SingleChannel,
WebRtcSocketBuilder,
};
pub use matchbox_socket::{ChannelConfig, PeerId, PeerState, WebRtcSocketBuilder};

cfg_if! {
if #[cfg(all(not(target_arch = "wasm32"), feature = "signaling"))] {
Expand Down
6 changes: 4 additions & 2 deletions bevy_matchbox/src/signaling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,10 @@ impl MatchboxServer {

#[cfg(test)]
mod tests {
use crate::matchbox_signaling::topologies::client_server::{ClientServer, ClientServerState};
use crate::prelude::*;
use crate::{
matchbox_signaling::topologies::client_server::{ClientServer, ClientServerState},
prelude::*,
};
use bevy::prelude::*;
use std::net::Ipv4Addr;

Expand Down
76 changes: 28 additions & 48 deletions bevy_matchbox/src/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,9 @@ use bevy::{
tasks::IoTaskPool,
};
pub use matchbox_socket;
use matchbox_socket::{
BuildablePlurality, MessageLoopFuture, SingleChannel, WebRtcSocket, WebRtcSocketBuilder,
};
use matchbox_socket::{MessageLoopFuture, WebRtcSocket, WebRtcSocketBuilder};
use std::{
fmt::Debug,
marker::PhantomData,
ops::{Deref, DerefMut},
};

Expand All @@ -28,7 +25,7 @@ use std::{
///
/// fn close_socket_system(
/// mut commands: Commands,
/// socket: Query<Entity, With<MatchboxSocket<SingleChannel>>>
/// socket: Query<Entity, With<MatchboxSocket>>
/// ) {
/// let socket = socket.single();
/// commands.entity(socket).despawn();
Expand All @@ -46,7 +43,7 @@ use std::{
/// }
///
/// fn close_socket_system(mut commands: Commands) {
/// commands.close_socket::<SingleChannel>();
/// commands.close_socket();
/// }
/// ```
///
Expand All @@ -58,93 +55,93 @@ use std::{
/// fn open_socket_system(mut commands: Commands) {
/// let room_url = "wss://matchbox.example.com";
///
/// let socket: MatchboxSocket<SingleChannel> = WebRtcSocketBuilder::new(room_url)
/// let socket: MatchboxSocket = WebRtcSocketBuilder::new(room_url)
/// .add_channel(ChannelConfig::reliable())
/// .into();
///
/// commands.insert_resource(socket);
/// }
///
/// fn close_socket_system(mut commands: Commands) {
/// commands.remove_resource::<MatchboxSocket<SingleChannel>>();
/// commands.remove_resource::<MatchboxSocket>();
/// }
/// ```
#[derive(Resource, Component, Debug)]
#[allow(dead_code)] // keep the task alive so it doesn't drop before the socket
pub struct MatchboxSocket<C: BuildablePlurality>(WebRtcSocket<C>, Box<dyn Debug + Send + Sync>);
pub struct MatchboxSocket(WebRtcSocket, Box<dyn Debug + Send + Sync>);

impl<C: BuildablePlurality> Deref for MatchboxSocket<C> {
type Target = WebRtcSocket<C>;
impl Deref for MatchboxSocket {
type Target = WebRtcSocket;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl<C: BuildablePlurality> DerefMut for MatchboxSocket<C> {
impl DerefMut for MatchboxSocket {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}

impl<C: BuildablePlurality> From<WebRtcSocketBuilder<C>> for MatchboxSocket<C> {
fn from(builder: WebRtcSocketBuilder<C>) -> Self {
impl From<WebRtcSocketBuilder> for MatchboxSocket {
fn from(builder: WebRtcSocketBuilder) -> Self {
Self::from(builder.build())
}
}

impl<C: BuildablePlurality> From<(WebRtcSocket<C>, MessageLoopFuture)> for MatchboxSocket<C> {
fn from((socket, message_loop_fut): (WebRtcSocket<C>, MessageLoopFuture)) -> Self {
impl From<(WebRtcSocket, MessageLoopFuture)> for MatchboxSocket {
fn from((socket, message_loop_fut): (WebRtcSocket, MessageLoopFuture)) -> Self {
let task_pool = IoTaskPool::get();
let task = task_pool.spawn(message_loop_fut);
MatchboxSocket(socket, Box::new(task))
}
}

/// A [`Command`] used to open a [`MatchboxSocket`] and allocate it as a resource.
struct OpenSocket<C: BuildablePlurality>(WebRtcSocketBuilder<C>);
struct OpenSocket(WebRtcSocketBuilder);

impl<C: BuildablePlurality + 'static> Command for OpenSocket<C> {
impl Command for OpenSocket {
fn apply(self, world: &mut World) {
world.insert_resource(MatchboxSocket::from(self.0));
}
}

/// A [`Commands`] extension used to open a [`MatchboxSocket`] and allocate it as a resource.
pub trait OpenSocketExt<C: BuildablePlurality> {
pub trait OpenSocketExt {
/// Opens a [`MatchboxSocket`] and allocates it as a resource.
fn open_socket(&mut self, socket_builder: WebRtcSocketBuilder<C>);
fn open_socket(&mut self, socket_builder: WebRtcSocketBuilder);
}

impl<C: BuildablePlurality + 'static> OpenSocketExt<C> for Commands<'_, '_> {
fn open_socket(&mut self, socket_builder: WebRtcSocketBuilder<C>) {
impl OpenSocketExt for Commands<'_, '_> {
fn open_socket(&mut self, socket_builder: WebRtcSocketBuilder) {
self.add(OpenSocket(socket_builder))
}
}

/// A [`Command`] used to close a [`WebRtcSocket`], deleting the [`MatchboxSocket`] resource.
struct CloseSocket<C: BuildablePlurality>(PhantomData<C>);
struct CloseSocket;

impl<C: BuildablePlurality + 'static> Command for CloseSocket<C> {
impl Command for CloseSocket {
fn apply(self, world: &mut World) {
world.remove_resource::<MatchboxSocket<C>>();
world.remove_resource::<MatchboxSocket>();
}
}

/// A [`Commands`] extension used to close a [`WebRtcSocket`], deleting the [`MatchboxSocket`]
/// resource.
pub trait CloseSocketExt {
/// Delete the [`MatchboxSocket`] resource.
fn close_socket<C: BuildablePlurality + 'static>(&mut self);
fn close_socket(&mut self);
}

impl CloseSocketExt for Commands<'_, '_> {
fn close_socket<C: BuildablePlurality + 'static>(&mut self) {
self.add(CloseSocket::<C>(PhantomData))
fn close_socket(&mut self) {
self.add(CloseSocket)
}
}

impl MatchboxSocket<SingleChannel> {
impl MatchboxSocket {
/// Create a new socket with a single unreliable channel
///
/// ```rust
Expand All @@ -157,7 +154,7 @@ impl MatchboxSocket<SingleChannel> {
/// commands.spawn(socket);
/// }
/// ```
pub fn new_unreliable(room_url: impl Into<String>) -> MatchboxSocket<SingleChannel> {
pub fn new_unreliable(room_url: impl Into<String>) -> MatchboxSocket {
Self::from(WebRtcSocket::new_unreliable(room_url))
}

Expand All @@ -173,24 +170,7 @@ impl MatchboxSocket<SingleChannel> {
/// commands.spawn(socket);
/// }
/// ```
pub fn new_reliable(room_url: impl Into<String>) -> MatchboxSocket<SingleChannel> {
pub fn new_reliable(room_url: impl Into<String>) -> MatchboxSocket {
Self::from(WebRtcSocket::new_reliable(room_url))
}

/// Create a new socket with a single ggrs-compatible channel
///
/// ```rust
/// use bevy_matchbox::prelude::*;
/// use bevy::prelude::*;
///
/// fn open_channel_system(mut commands: Commands) {
/// let room_url = "wss://matchbox.example.com";
/// let socket = MatchboxSocket::new_ggrs(room_url);
/// commands.spawn(socket);
/// }
/// ```
#[cfg(feature = "ggrs")]
pub fn new_ggrs(room_url: impl Into<String>) -> MatchboxSocket<SingleChannel> {
Self::from(WebRtcSocket::new_ggrs(room_url))
}
}
15 changes: 9 additions & 6 deletions examples/bevy_ggrs/src/box_game.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ const FRICTION: f32 = 0.0018;
const PLANE_SIZE: f32 = 5.0;
const CUBE_SIZE: f32 = 0.2;

// You need to define a config struct to bundle all the generics of GGRS. bevy_ggrs provides a sensible default in `GgrsConfig`.
// (optional) You can define a type here for brevity.
// You need to define a config struct to bundle all the generics of GGRS. bevy_ggrs provides a
// sensible default in `GgrsConfig`. (optional) You can define a type here for brevity.
pub type BoxConfig = GgrsConfig<BoxInput, PeerId>;

#[repr(C)]
Expand Down Expand Up @@ -58,7 +58,8 @@ pub struct FrameCount {
pub frame: u32,
}

/// Collects player inputs during [`ReadInputs`](`bevy_ggrs::ReadInputs`) and creates a [`LocalInputs`] resource.
/// Collects player inputs during [`ReadInputs`](`bevy_ggrs::ReadInputs`) and creates a
/// [`LocalInputs`] resource.
pub fn read_local_inputs(
mut commands: Commands,
keyboard_input: Res<ButtonInput<KeyCode>>,
Expand Down Expand Up @@ -154,8 +155,9 @@ pub fn setup_scene(
}

// Example system, manipulating a resource, will be added to the rollback schedule.
// Increases the frame count by 1 every update step. If loading and saving resources works correctly,
// you should see this resource rolling back, counting back up and finally increasing by 1 every update step
// Increases the frame count by 1 every update step. If loading and saving resources works
// correctly, you should see this resource rolling back, counting back up and finally increasing by
// 1 every update step
#[allow(dead_code)]
pub fn increase_frame_system(mut frame_count: ResMut<FrameCount>) {
frame_count.frame += 1;
Expand All @@ -167,7 +169,8 @@ pub fn increase_frame_system(mut frame_count: ResMut<FrameCount>) {
#[allow(dead_code)]
pub fn move_cube_system(
mut query: Query<(&mut Transform, &mut Velocity, &Player), With<Rollback>>,
// ^------^ Added by `add_rollback` earlier
// ^------^ Added by
// `add_rollback` earlier
inputs: Res<PlayerInputs<BoxConfig>>,
// Thanks to RollbackTimePlugin, this is rollback safe
time: Res<Time>,
Expand Down
10 changes: 6 additions & 4 deletions examples/bevy_ggrs/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@ fn main() {
.set_rollback_schedule_fps(FPS)
.add_systems(ReadInputs, read_local_inputs)
// Rollback behavior can be customized using a variety of extension methods and plugins:
// The FrameCount resource implements Copy, we can use that to have minimal overhead rollback
// The FrameCount resource implements Copy, we can use that to have minimal overhead
// rollback
.rollback_resource_with_copy::<FrameCount>()
// Transform and Velocity components only implement Clone, so instead we'll use that to snapshot and rollback with
// Transform and Velocity components only implement Clone, so instead we'll use that to
// snapshot and rollback with
.rollback_component_with_clone::<Transform>()
.rollback_component_with_clone::<Velocity>()
.insert_resource(ClearColor(SKY_COLOR))
Expand Down Expand Up @@ -66,7 +68,7 @@ fn start_matchbox_socket(mut commands: Commands, args: Res<Args>) {
let room_url = format!("{}/{}", &args.matchbox, room_id);
info!("connecting to matchbox server: {room_url:?}");

commands.insert_resource(MatchboxSocket::new_ggrs(room_url));
commands.insert_resource(MatchboxSocket::new_unreliable(room_url));
}

// Marker components for UI
Expand Down Expand Up @@ -124,7 +126,7 @@ fn lobby_cleanup(query: Query<Entity, With<LobbyUI>>, mut commands: Commands) {
fn lobby_system(
mut app_state: ResMut<NextState<AppState>>,
args: Res<Args>,
mut socket: ResMut<MatchboxSocket<SingleChannel>>,
mut socket: ResMut<MatchboxSocket>,
mut commands: Commands,
mut query: Query<&mut Text, With<LobbyText>>,
) {
Expand Down
8 changes: 5 additions & 3 deletions examples/error_handling/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ use log::{info, warn};
use matchbox_socket::{Error as SocketError, PeerId, PeerState, WebRtcSocket};
use std::time::Duration;

const CHANNEL_ID: usize = 0;

#[cfg(target_arch = "wasm32")]
fn main() {
// Setup logging
Expand Down Expand Up @@ -65,7 +67,7 @@ async fn async_main() {
PeerState::Connected => {
info!("Peer joined: {peer}");
let packet = "hello friend!".as_bytes().to_vec().into_boxed_slice();
socket.send(packet, peer);
socket.channel_mut(CHANNEL_ID).send(packet, peer);
}
PeerState::Disconnected => {
info!("Peer left: {peer}");
Expand All @@ -74,7 +76,7 @@ async fn async_main() {
}

// Accept any messages incoming
for (peer, packet) in socket.receive() {
for (peer, packet) in socket.channel_mut(CHANNEL_ID).receive() {
let message = String::from_utf8_lossy(&packet);
info!("Message from {peer}: {message:?}");
}
Expand All @@ -85,7 +87,7 @@ async fn async_main() {
let peers: Vec<PeerId> = socket.connected_peers().collect();
for peer in peers {
let packet = "ping!".as_bytes().to_vec().into_boxed_slice();
socket.send(packet, peer);
socket.channel_mut(CHANNEL_ID).send(packet, peer);
}
timeout.reset(Duration::from_millis(10));
}
Expand Down
Loading

0 comments on commit 06b62a4

Please sign in to comment.