Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cross native and wasm, ice trickle and data channel creation #54

Merged
merged 4 commits into from
Dec 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion matchbox_socket/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ web-sys = { version = "0.3.22", default-features = false, features = [
"MessageEvent",
"RtcPeerConnection",
"RtcSdpType", "RtcSessionDescription", "RtcSessionDescriptionInit",
"RtcIceGatheringState",
"RtcIceGatheringState", "RtcIceCandidate", "RtcIceCandidateInit",
"RtcConfiguration", "RtcDataChannel", "RtcDataChannelInit", "RtcDataChannelType",
] }
serde-wasm-bindgen = { version = "0.4" }
Expand Down
1 change: 1 addition & 0 deletions matchbox_socket/src/webrtc_socket/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ mod messages;
mod signal_peer;

const KEEP_ALIVE_INTERVAL: u64 = 10_000;
const DATA_CHANNEL_ID: u16 = 124;

// TODO: maybe use cfg-if to make this slightly tidier
#[cfg(not(target_arch = "wasm32"))]
Expand Down
63 changes: 16 additions & 47 deletions matchbox_socket/src/webrtc_socket/native/message_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use webrtc::{
use crate::webrtc_socket::{
messages::{PeerEvent, PeerId, PeerRequest, PeerSignal},
signal_peer::SignalPeer,
Packet, WebRtcSocketConfig, KEEP_ALIVE_INTERVAL,
Packet, WebRtcSocketConfig, DATA_CHANNEL_ID, KEEP_ALIVE_INTERVAL,
};

pub async fn message_loop(
Expand Down Expand Up @@ -317,6 +317,16 @@ async fn handshake_accept(
debug!("handshake_accept");
let (connection, trickle) = create_rtc_peer_connection(signal_peer.clone(), config).await?;

let (channel_ready_tx, mut channel_ready_rx) = futures_channel::mpsc::channel(1);
let data_channel = create_data_channel(
&connection,
channel_ready_tx,
signal_peer.id.clone(),
new_peer_tx,
from_peer_message_tx,
)
.await;
johanhelsing marked this conversation as resolved.
Show resolved Hide resolved

let offer;
loop {
match signal_receiver.next().await.ok_or("error")? {
Expand Down Expand Up @@ -347,23 +357,15 @@ async fn handshake_accept(
.fuse(),
);

let data_channel_fut = wait_for_data_channel(
&connection,
signal_peer.id.clone(),
new_peer_tx,
from_peer_message_tx,
)
.fuse();
pin_mut!(data_channel_fut);

let data_channel = loop {
let mut channel_ready_fut = channel_ready_rx.next();
loop {
select! {
data_channel = data_channel_fut => break data_channel,
_ = channel_ready_fut => break,
// TODO: this means that the signalling is down, should return an
// error
_ = trickle_fut => continue,
};
};
}

Ok((signal_peer.id, data_channel, trickle_fut))
}
Expand Down Expand Up @@ -424,6 +426,7 @@ async fn create_data_channel(
let config = RTCDataChannelInit {
ordered: Some(false),
max_retransmits: Some(0),
negotiated: Some(DATA_CHANNEL_ID),
..Default::default()
};

Expand All @@ -446,40 +449,6 @@ async fn create_data_channel(
channel
}

async fn wait_for_data_channel(
connection: &RTCPeerConnection,
peer_id: PeerId,
new_peer_tx: UnboundedSender<PeerId>,
from_peer_message_tx: UnboundedSender<(PeerId, Packet)>,
) -> Arc<RTCDataChannel> {
let (channel_tx, mut channel_rx) = futures_channel::mpsc::channel(1);

connection.on_data_channel(Box::new(move |channel| {
debug!("new data channel");
let peer_id = peer_id.clone();
let mut new_peer_tx = new_peer_tx.clone();
let from_peer_message_tx = from_peer_message_tx.clone();
let mut channel_tx = channel_tx.clone();
Box::pin(async move {
let peer_id2 = peer_id.clone();
let channel2 = Arc::clone(&channel);

// TODO: register close & error callbacks
channel.on_open(Box::new(move || {
debug!("Data channel ready");
Box::pin(async move {
new_peer_tx.send(peer_id2).await.unwrap();
channel_tx.try_send(channel2).unwrap();
})
}));

setup_data_channel(&channel, peer_id, from_peer_message_tx).await;
})
}));

channel_rx.next().await.unwrap()
}

async fn setup_data_channel(
data_channel: &RTCDataChannel,
peer_id: PeerId,
Expand Down
Loading