Skip to content

Commit

Permalink
Merge pull request #54 from VertexStudio/cross-native-wasm
Browse files Browse the repository at this point in the history
cross native and wasm, ice trickle and data channel creation
  • Loading branch information
johanhelsing authored Dec 17, 2022
2 parents c41f219 + 079cd51 commit 2858ed5
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 100 deletions.
2 changes: 1 addition & 1 deletion matchbox_socket/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ web-sys = { version = "0.3.22", default-features = false, features = [
"MessageEvent",
"RtcPeerConnection",
"RtcSdpType", "RtcSessionDescription", "RtcSessionDescriptionInit",
"RtcIceGatheringState",
"RtcIceGatheringState", "RtcIceCandidate", "RtcIceCandidateInit",
"RtcConfiguration", "RtcDataChannel", "RtcDataChannelInit", "RtcDataChannelType",
] }
serde-wasm-bindgen = { version = "0.4" }
Expand Down
1 change: 1 addition & 0 deletions matchbox_socket/src/webrtc_socket/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ mod messages;
mod signal_peer;

const KEEP_ALIVE_INTERVAL: u64 = 10_000;
const DATA_CHANNEL_ID: u16 = 124;

// TODO: maybe use cfg-if to make this slightly tidier
#[cfg(not(target_arch = "wasm32"))]
Expand Down
63 changes: 16 additions & 47 deletions matchbox_socket/src/webrtc_socket/native/message_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use webrtc::{
use crate::webrtc_socket::{
messages::{PeerEvent, PeerId, PeerRequest, PeerSignal},
signal_peer::SignalPeer,
Packet, WebRtcSocketConfig, KEEP_ALIVE_INTERVAL,
Packet, WebRtcSocketConfig, DATA_CHANNEL_ID, KEEP_ALIVE_INTERVAL,
};

pub async fn message_loop(
Expand Down Expand Up @@ -317,6 +317,16 @@ async fn handshake_accept(
debug!("handshake_accept");
let (connection, trickle) = create_rtc_peer_connection(signal_peer.clone(), config).await?;

let (channel_ready_tx, mut channel_ready_rx) = futures_channel::mpsc::channel(1);
let data_channel = create_data_channel(
&connection,
channel_ready_tx,
signal_peer.id.clone(),
new_peer_tx,
from_peer_message_tx,
)
.await;

let offer;
loop {
match signal_receiver.next().await.ok_or("error")? {
Expand Down Expand Up @@ -347,23 +357,15 @@ async fn handshake_accept(
.fuse(),
);

let data_channel_fut = wait_for_data_channel(
&connection,
signal_peer.id.clone(),
new_peer_tx,
from_peer_message_tx,
)
.fuse();
pin_mut!(data_channel_fut);

let data_channel = loop {
let mut channel_ready_fut = channel_ready_rx.next();
loop {
select! {
data_channel = data_channel_fut => break data_channel,
_ = channel_ready_fut => break,
// TODO: this means that the signalling is down, should return an
// error
_ = trickle_fut => continue,
};
};
}

Ok((signal_peer.id, data_channel, trickle_fut))
}
Expand Down Expand Up @@ -424,6 +426,7 @@ async fn create_data_channel(
let config = RTCDataChannelInit {
ordered: Some(false),
max_retransmits: Some(0),
negotiated: Some(DATA_CHANNEL_ID),
..Default::default()
};

Expand All @@ -446,40 +449,6 @@ async fn create_data_channel(
channel
}

async fn wait_for_data_channel(
connection: &RTCPeerConnection,
peer_id: PeerId,
new_peer_tx: UnboundedSender<PeerId>,
from_peer_message_tx: UnboundedSender<(PeerId, Packet)>,
) -> Arc<RTCDataChannel> {
let (channel_tx, mut channel_rx) = futures_channel::mpsc::channel(1);

connection.on_data_channel(Box::new(move |channel| {
debug!("new data channel");
let peer_id = peer_id.clone();
let mut new_peer_tx = new_peer_tx.clone();
let from_peer_message_tx = from_peer_message_tx.clone();
let mut channel_tx = channel_tx.clone();
Box::pin(async move {
let peer_id2 = peer_id.clone();
let channel2 = Arc::clone(&channel);

// TODO: register close & error callbacks
channel.on_open(Box::new(move || {
debug!("Data channel ready");
Box::pin(async move {
new_peer_tx.send(peer_id2).await.unwrap();
channel_tx.try_send(channel2).unwrap();
})
}));

setup_data_channel(&channel, peer_id, from_peer_message_tx).await;
})
}));

channel_rx.next().await.unwrap()
}

async fn setup_data_channel(
data_channel: &RTCDataChannel,
peer_id: PeerId,
Expand Down
Loading

0 comments on commit 2858ed5

Please sign in to comment.