Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor: Drop typestate pattern for building sockets #468

Merged
merged 4 commits into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions bevy_matchbox/examples/hello.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
use bevy::{prelude::*, time::common_conditions::on_timer, utils::Duration};
use bevy_matchbox::prelude::*;

const CHANNEL_ID: usize = 0;

fn main() {
App::new()
.add_plugins(DefaultPlugins)
Expand All @@ -21,22 +23,24 @@ fn start_socket(mut commands: Commands) {
commands.insert_resource(socket);
}

fn send_message(mut socket: ResMut<MatchboxSocket<SingleChannel>>) {
fn send_message(mut socket: ResMut<MatchboxSocket>) {
let peers: Vec<_> = socket.connected_peers().collect();

for peer in peers {
let message = "Hello";
info!("Sending message: {message:?} to {peer}");
socket.send(message.as_bytes().into(), peer);
socket
.channel_mut(CHANNEL_ID)
.send(message.as_bytes().into(), peer);
}
}

fn receive_messages(mut socket: ResMut<MatchboxSocket<SingleChannel>>) {
fn receive_messages(mut socket: ResMut<MatchboxSocket>) {
for (peer, state) in socket.update_peers() {
info!("{peer}: {state:?}");
}

for (_id, message) in socket.receive() {
for (_id, message) in socket.channel_mut(CHANNEL_ID).receive() {
match std::str::from_utf8(&message) {
Ok(message) => info!("Received message: {message:?}"),
Err(e) => error!("Failed to convert message to string: {e}"),
Expand Down
8 changes: 4 additions & 4 deletions bevy_matchbox/examples/hello_host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,22 +60,22 @@ fn start_host_socket(mut commands: Commands) {
commands.insert_resource(socket);
}

fn send_message(mut socket: ResMut<MatchboxSocket<SingleChannel>>) {
fn send_message(mut socket: ResMut<MatchboxSocket>) {
let peers: Vec<_> = socket.connected_peers().collect();

for peer in peers {
let message = "Hello, I'm the host";
info!("Sending message: {message:?} to {peer}");
socket.send(message.as_bytes().into(), peer);
socket.channel_mut(0).send(message.as_bytes().into(), peer);
}
}

fn receive_messages(mut socket: ResMut<MatchboxSocket<SingleChannel>>) {
fn receive_messages(mut socket: ResMut<MatchboxSocket>) {
for (peer, state) in socket.update_peers() {
info!("{peer}: {state:?}");
}

for (_id, message) in socket.receive() {
for (_id, message) in socket.channel_mut(0).receive() {
match std::str::from_utf8(&message) {
Ok(message) => info!("Received message: {message:?}"),
Err(e) => error!("Failed to convert message to string: {e}"),
Expand Down
5 changes: 1 addition & 4 deletions bevy_matchbox/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@ cfg_if! {
pub mod prelude {
pub use crate::{CloseSocketExt, MatchboxSocket, OpenSocketExt};
use cfg_if::cfg_if;
pub use matchbox_socket::{
BuildablePlurality, ChannelConfig, MultipleChannels, PeerId, PeerState, SingleChannel,
WebRtcSocketBuilder,
};
pub use matchbox_socket::{ChannelConfig, PeerId, PeerState, WebRtcSocketBuilder};

cfg_if! {
if #[cfg(all(not(target_arch = "wasm32"), feature = "signaling"))] {
Expand Down
6 changes: 4 additions & 2 deletions bevy_matchbox/src/signaling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,10 @@ impl MatchboxServer {

#[cfg(test)]
mod tests {
use crate::matchbox_signaling::topologies::client_server::{ClientServer, ClientServerState};
use crate::prelude::*;
use crate::{
matchbox_signaling::topologies::client_server::{ClientServer, ClientServerState},
prelude::*,
};
use bevy::prelude::*;
use std::net::Ipv4Addr;

Expand Down
76 changes: 28 additions & 48 deletions bevy_matchbox/src/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,9 @@ use bevy::{
tasks::IoTaskPool,
};
pub use matchbox_socket;
use matchbox_socket::{
BuildablePlurality, MessageLoopFuture, SingleChannel, WebRtcSocket, WebRtcSocketBuilder,
};
use matchbox_socket::{MessageLoopFuture, WebRtcSocket, WebRtcSocketBuilder};
use std::{
fmt::Debug,
marker::PhantomData,
ops::{Deref, DerefMut},
};

Expand All @@ -28,7 +25,7 @@ use std::{
///
/// fn close_socket_system(
/// mut commands: Commands,
/// socket: Query<Entity, With<MatchboxSocket<SingleChannel>>>
/// socket: Query<Entity, With<MatchboxSocket>>
/// ) {
/// let socket = socket.single();
/// commands.entity(socket).despawn();
Expand All @@ -46,7 +43,7 @@ use std::{
/// }
///
/// fn close_socket_system(mut commands: Commands) {
/// commands.close_socket::<SingleChannel>();
/// commands.close_socket();
/// }
/// ```
///
Expand All @@ -58,93 +55,93 @@ use std::{
/// fn open_socket_system(mut commands: Commands) {
/// let room_url = "wss://matchbox.example.com";
///
/// let socket: MatchboxSocket<SingleChannel> = WebRtcSocketBuilder::new(room_url)
/// let socket: MatchboxSocket = WebRtcSocketBuilder::new(room_url)
/// .add_channel(ChannelConfig::reliable())
/// .into();
///
/// commands.insert_resource(socket);
/// }
///
/// fn close_socket_system(mut commands: Commands) {
/// commands.remove_resource::<MatchboxSocket<SingleChannel>>();
/// commands.remove_resource::<MatchboxSocket>();
/// }
/// ```
#[derive(Resource, Component, Debug)]
#[allow(dead_code)] // keep the task alive so it doesn't drop before the socket
pub struct MatchboxSocket<C: BuildablePlurality>(WebRtcSocket<C>, Box<dyn Debug + Send + Sync>);
pub struct MatchboxSocket(WebRtcSocket, Box<dyn Debug + Send + Sync>);

impl<C: BuildablePlurality> Deref for MatchboxSocket<C> {
type Target = WebRtcSocket<C>;
impl Deref for MatchboxSocket {
type Target = WebRtcSocket;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl<C: BuildablePlurality> DerefMut for MatchboxSocket<C> {
impl DerefMut for MatchboxSocket {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}

impl<C: BuildablePlurality> From<WebRtcSocketBuilder<C>> for MatchboxSocket<C> {
fn from(builder: WebRtcSocketBuilder<C>) -> Self {
impl From<WebRtcSocketBuilder> for MatchboxSocket {
fn from(builder: WebRtcSocketBuilder) -> Self {
Self::from(builder.build())
}
}

impl<C: BuildablePlurality> From<(WebRtcSocket<C>, MessageLoopFuture)> for MatchboxSocket<C> {
fn from((socket, message_loop_fut): (WebRtcSocket<C>, MessageLoopFuture)) -> Self {
impl From<(WebRtcSocket, MessageLoopFuture)> for MatchboxSocket {
fn from((socket, message_loop_fut): (WebRtcSocket, MessageLoopFuture)) -> Self {
let task_pool = IoTaskPool::get();
let task = task_pool.spawn(message_loop_fut);
MatchboxSocket(socket, Box::new(task))
}
}

/// A [`Command`] used to open a [`MatchboxSocket`] and allocate it as a resource.
struct OpenSocket<C: BuildablePlurality>(WebRtcSocketBuilder<C>);
struct OpenSocket(WebRtcSocketBuilder);

impl<C: BuildablePlurality + 'static> Command for OpenSocket<C> {
impl Command for OpenSocket {
fn apply(self, world: &mut World) {
world.insert_resource(MatchboxSocket::from(self.0));
}
}

/// A [`Commands`] extension used to open a [`MatchboxSocket`] and allocate it as a resource.
pub trait OpenSocketExt<C: BuildablePlurality> {
pub trait OpenSocketExt {
/// Opens a [`MatchboxSocket`] and allocates it as a resource.
fn open_socket(&mut self, socket_builder: WebRtcSocketBuilder<C>);
fn open_socket(&mut self, socket_builder: WebRtcSocketBuilder);
}

impl<C: BuildablePlurality + 'static> OpenSocketExt<C> for Commands<'_, '_> {
fn open_socket(&mut self, socket_builder: WebRtcSocketBuilder<C>) {
impl OpenSocketExt for Commands<'_, '_> {
fn open_socket(&mut self, socket_builder: WebRtcSocketBuilder) {
self.add(OpenSocket(socket_builder))
}
}

/// A [`Command`] used to close a [`WebRtcSocket`], deleting the [`MatchboxSocket`] resource.
struct CloseSocket<C: BuildablePlurality>(PhantomData<C>);
struct CloseSocket;

impl<C: BuildablePlurality + 'static> Command for CloseSocket<C> {
impl Command for CloseSocket {
fn apply(self, world: &mut World) {
world.remove_resource::<MatchboxSocket<C>>();
world.remove_resource::<MatchboxSocket>();
}
}

/// A [`Commands`] extension used to close a [`WebRtcSocket`], deleting the [`MatchboxSocket`]
/// resource.
pub trait CloseSocketExt {
/// Delete the [`MatchboxSocket`] resource.
fn close_socket<C: BuildablePlurality + 'static>(&mut self);
fn close_socket(&mut self);
}

impl CloseSocketExt for Commands<'_, '_> {
fn close_socket<C: BuildablePlurality + 'static>(&mut self) {
self.add(CloseSocket::<C>(PhantomData))
fn close_socket(&mut self) {
self.add(CloseSocket)
}
}

impl MatchboxSocket<SingleChannel> {
impl MatchboxSocket {
/// Create a new socket with a single unreliable channel
///
/// ```rust
Expand All @@ -157,7 +154,7 @@ impl MatchboxSocket<SingleChannel> {
/// commands.spawn(socket);
/// }
/// ```
pub fn new_unreliable(room_url: impl Into<String>) -> MatchboxSocket<SingleChannel> {
pub fn new_unreliable(room_url: impl Into<String>) -> MatchboxSocket {
Self::from(WebRtcSocket::new_unreliable(room_url))
}

Expand All @@ -173,24 +170,7 @@ impl MatchboxSocket<SingleChannel> {
/// commands.spawn(socket);
/// }
/// ```
pub fn new_reliable(room_url: impl Into<String>) -> MatchboxSocket<SingleChannel> {
pub fn new_reliable(room_url: impl Into<String>) -> MatchboxSocket {
Self::from(WebRtcSocket::new_reliable(room_url))
}

/// Create a new socket with a single ggrs-compatible channel
///
/// ```rust
/// use bevy_matchbox::prelude::*;
/// use bevy::prelude::*;
///
/// fn open_channel_system(mut commands: Commands) {
/// let room_url = "wss://matchbox.example.com";
/// let socket = MatchboxSocket::new_ggrs(room_url);
/// commands.spawn(socket);
/// }
/// ```
#[cfg(feature = "ggrs")]
pub fn new_ggrs(room_url: impl Into<String>) -> MatchboxSocket<SingleChannel> {
Self::from(WebRtcSocket::new_ggrs(room_url))
}
}
15 changes: 9 additions & 6 deletions examples/bevy_ggrs/src/box_game.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ const FRICTION: f32 = 0.0018;
const PLANE_SIZE: f32 = 5.0;
const CUBE_SIZE: f32 = 0.2;

// You need to define a config struct to bundle all the generics of GGRS. bevy_ggrs provides a sensible default in `GgrsConfig`.
// (optional) You can define a type here for brevity.
// You need to define a config struct to bundle all the generics of GGRS. bevy_ggrs provides a
// sensible default in `GgrsConfig`. (optional) You can define a type here for brevity.
pub type BoxConfig = GgrsConfig<BoxInput, PeerId>;

#[repr(C)]
Expand Down Expand Up @@ -58,7 +58,8 @@ pub struct FrameCount {
pub frame: u32,
}

/// Collects player inputs during [`ReadInputs`](`bevy_ggrs::ReadInputs`) and creates a [`LocalInputs`] resource.
/// Collects player inputs during [`ReadInputs`](`bevy_ggrs::ReadInputs`) and creates a
/// [`LocalInputs`] resource.
pub fn read_local_inputs(
mut commands: Commands,
keyboard_input: Res<ButtonInput<KeyCode>>,
Expand Down Expand Up @@ -154,8 +155,9 @@ pub fn setup_scene(
}

// Example system, manipulating a resource, will be added to the rollback schedule.
// Increases the frame count by 1 every update step. If loading and saving resources works correctly,
// you should see this resource rolling back, counting back up and finally increasing by 1 every update step
// Increases the frame count by 1 every update step. If loading and saving resources works
// correctly, you should see this resource rolling back, counting back up and finally increasing by
// 1 every update step
#[allow(dead_code)]
pub fn increase_frame_system(mut frame_count: ResMut<FrameCount>) {
frame_count.frame += 1;
Expand All @@ -167,7 +169,8 @@ pub fn increase_frame_system(mut frame_count: ResMut<FrameCount>) {
#[allow(dead_code)]
pub fn move_cube_system(
mut query: Query<(&mut Transform, &mut Velocity, &Player), With<Rollback>>,
// ^------^ Added by `add_rollback` earlier
// ^------^ Added by
// `add_rollback` earlier
inputs: Res<PlayerInputs<BoxConfig>>,
// Thanks to RollbackTimePlugin, this is rollback safe
time: Res<Time>,
Expand Down
10 changes: 6 additions & 4 deletions examples/bevy_ggrs/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@ fn main() {
.set_rollback_schedule_fps(FPS)
.add_systems(ReadInputs, read_local_inputs)
// Rollback behavior can be customized using a variety of extension methods and plugins:
// The FrameCount resource implements Copy, we can use that to have minimal overhead rollback
// The FrameCount resource implements Copy, we can use that to have minimal overhead
// rollback
.rollback_resource_with_copy::<FrameCount>()
// Transform and Velocity components only implement Clone, so instead we'll use that to snapshot and rollback with
// Transform and Velocity components only implement Clone, so instead we'll use that to
// snapshot and rollback with
.rollback_component_with_clone::<Transform>()
.rollback_component_with_clone::<Velocity>()
.insert_resource(ClearColor(SKY_COLOR))
Expand Down Expand Up @@ -66,7 +68,7 @@ fn start_matchbox_socket(mut commands: Commands, args: Res<Args>) {
let room_url = format!("{}/{}", &args.matchbox, room_id);
info!("connecting to matchbox server: {room_url:?}");

commands.insert_resource(MatchboxSocket::new_ggrs(room_url));
commands.insert_resource(MatchboxSocket::new_unreliable(room_url));
}

// Marker components for UI
Expand Down Expand Up @@ -124,7 +126,7 @@ fn lobby_cleanup(query: Query<Entity, With<LobbyUI>>, mut commands: Commands) {
fn lobby_system(
mut app_state: ResMut<NextState<AppState>>,
args: Res<Args>,
mut socket: ResMut<MatchboxSocket<SingleChannel>>,
mut socket: ResMut<MatchboxSocket>,
mut commands: Commands,
mut query: Query<&mut Text, With<LobbyText>>,
) {
Expand Down
8 changes: 5 additions & 3 deletions examples/error_handling/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ use log::{info, warn};
use matchbox_socket::{Error as SocketError, PeerId, PeerState, WebRtcSocket};
use std::time::Duration;

const CHANNEL_ID: usize = 0;

#[cfg(target_arch = "wasm32")]
fn main() {
// Setup logging
Expand Down Expand Up @@ -65,7 +67,7 @@ async fn async_main() {
PeerState::Connected => {
info!("Peer joined: {peer}");
let packet = "hello friend!".as_bytes().to_vec().into_boxed_slice();
socket.send(packet, peer);
socket.channel_mut(CHANNEL_ID).send(packet, peer);
}
PeerState::Disconnected => {
info!("Peer left: {peer}");
Expand All @@ -74,7 +76,7 @@ async fn async_main() {
}

// Accept any messages incoming
for (peer, packet) in socket.receive() {
for (peer, packet) in socket.channel_mut(CHANNEL_ID).receive() {
let message = String::from_utf8_lossy(&packet);
info!("Message from {peer}: {message:?}");
}
Expand All @@ -85,7 +87,7 @@ async fn async_main() {
let peers: Vec<PeerId> = socket.connected_peers().collect();
for peer in peers {
let packet = "ping!".as_bytes().to_vec().into_boxed_slice();
socket.send(packet, peer);
socket.channel_mut(CHANNEL_ID).send(packet, peer);
}
timeout.reset(Duration::from_millis(10));
}
Expand Down
Loading
Loading