diff --git a/matchbox_socket/Cargo.toml b/matchbox_socket/Cargo.toml index fcb54b42..07ba4bef 100644 --- a/matchbox_socket/Cargo.toml +++ b/matchbox_socket/Cargo.toml @@ -39,7 +39,7 @@ web-sys = { version = "0.3.22", default-features = false, features = [ "MessageEvent", "RtcPeerConnection", "RtcSdpType", "RtcSessionDescription", "RtcSessionDescriptionInit", - "RtcIceGatheringState", + "RtcIceGatheringState", "RtcIceCandidate", "RtcIceCandidateInit", "RtcConfiguration", "RtcDataChannel", "RtcDataChannelInit", "RtcDataChannelType", ] } serde-wasm-bindgen = { version = "0.4" } diff --git a/matchbox_socket/src/webrtc_socket/mod.rs b/matchbox_socket/src/webrtc_socket/mod.rs index 52fa2ec6..eb2c8976 100644 --- a/matchbox_socket/src/webrtc_socket/mod.rs +++ b/matchbox_socket/src/webrtc_socket/mod.rs @@ -8,6 +8,7 @@ mod messages; mod signal_peer; const KEEP_ALIVE_INTERVAL: u64 = 10_000; +const DATA_CHANNEL_ID: u16 = 124; // TODO: maybe use cfg-if to make this slightly tidier #[cfg(not(target_arch = "wasm32"))] diff --git a/matchbox_socket/src/webrtc_socket/native/message_loop.rs b/matchbox_socket/src/webrtc_socket/native/message_loop.rs index 1f97ba94..196dc427 100644 --- a/matchbox_socket/src/webrtc_socket/native/message_loop.rs +++ b/matchbox_socket/src/webrtc_socket/native/message_loop.rs @@ -26,7 +26,7 @@ use webrtc::{ use crate::webrtc_socket::{ messages::{PeerEvent, PeerId, PeerRequest, PeerSignal}, signal_peer::SignalPeer, - Packet, WebRtcSocketConfig, KEEP_ALIVE_INTERVAL, + Packet, WebRtcSocketConfig, DATA_CHANNEL_ID, KEEP_ALIVE_INTERVAL, }; pub async fn message_loop( @@ -317,6 +317,16 @@ async fn handshake_accept( debug!("handshake_accept"); let (connection, trickle) = create_rtc_peer_connection(signal_peer.clone(), config).await?; + let (channel_ready_tx, mut channel_ready_rx) = futures_channel::mpsc::channel(1); + let data_channel = create_data_channel( + &connection, + channel_ready_tx, + signal_peer.id.clone(), + new_peer_tx, + from_peer_message_tx, + ) + .await; + let offer; loop { match signal_receiver.next().await.ok_or("error")? { @@ -347,23 +357,15 @@ async fn handshake_accept( .fuse(), ); - let data_channel_fut = wait_for_data_channel( - &connection, - signal_peer.id.clone(), - new_peer_tx, - from_peer_message_tx, - ) - .fuse(); - pin_mut!(data_channel_fut); - - let data_channel = loop { + let mut channel_ready_fut = channel_ready_rx.next(); + loop { select! { - data_channel = data_channel_fut => break data_channel, + _ = channel_ready_fut => break, // TODO: this means that the signalling is down, should return an // error _ = trickle_fut => continue, }; - }; + } Ok((signal_peer.id, data_channel, trickle_fut)) } @@ -424,6 +426,7 @@ async fn create_data_channel( let config = RTCDataChannelInit { ordered: Some(false), max_retransmits: Some(0), + negotiated: Some(DATA_CHANNEL_ID), ..Default::default() }; @@ -446,40 +449,6 @@ async fn create_data_channel( channel } -async fn wait_for_data_channel( - connection: &RTCPeerConnection, - peer_id: PeerId, - new_peer_tx: UnboundedSender, - from_peer_message_tx: UnboundedSender<(PeerId, Packet)>, -) -> Arc { - let (channel_tx, mut channel_rx) = futures_channel::mpsc::channel(1); - - connection.on_data_channel(Box::new(move |channel| { - debug!("new data channel"); - let peer_id = peer_id.clone(); - let mut new_peer_tx = new_peer_tx.clone(); - let from_peer_message_tx = from_peer_message_tx.clone(); - let mut channel_tx = channel_tx.clone(); - Box::pin(async move { - let peer_id2 = peer_id.clone(); - let channel2 = Arc::clone(&channel); - - // TODO: register close & error callbacks - channel.on_open(Box::new(move || { - debug!("Data channel ready"); - Box::pin(async move { - new_peer_tx.send(peer_id2).await.unwrap(); - channel_tx.try_send(channel2).unwrap(); - }) - })); - - setup_data_channel(&channel, peer_id, from_peer_message_tx).await; - }) - })); - - channel_rx.next().await.unwrap() -} - async fn setup_data_channel( data_channel: &RTCDataChannel, peer_id: PeerId, diff --git a/matchbox_socket/src/webrtc_socket/wasm/message_loop.rs b/matchbox_socket/src/webrtc_socket/wasm/message_loop.rs index 8ab34482..f7f7ac00 100644 --- a/matchbox_socket/src/webrtc_socket/wasm/message_loop.rs +++ b/matchbox_socket/src/webrtc_socket/wasm/message_loop.rs @@ -12,14 +12,13 @@ use wasm_bindgen::{prelude::*, JsCast, JsValue}; use wasm_bindgen_futures::JsFuture; use web_sys::{ MessageEvent, RtcConfiguration, RtcDataChannel, RtcDataChannelInit, RtcDataChannelType, - RtcIceGatheringState, RtcPeerConnection, RtcSdpType, RtcSessionDescriptionInit, + RtcIceCandidate, RtcIceCandidateInit, RtcPeerConnection, RtcSdpType, RtcSessionDescriptionInit, }; -use crate::webrtc_socket::KEEP_ALIVE_INTERVAL; use crate::webrtc_socket::{ messages::{PeerEvent, PeerId, PeerRequest, PeerSignal}, signal_peer::SignalPeer, - Packet, WebRtcSocketConfig, + Packet, WebRtcSocketConfig, DATA_CHANNEL_ID, KEEP_ALIVE_INTERVAL, }; pub async fn message_loop( @@ -131,6 +130,7 @@ async fn handshake_offer( config: &WebRtcSocketConfig, ) -> Result<(PeerId, RtcDataChannel), Box> { debug!("making offer"); + let conn = create_rtc_peer_connection(config); let (channel_ready_tx, mut channel_ready_rx) = futures_channel::mpsc::channel(1); let data_channel = create_data_channel( @@ -140,30 +140,25 @@ async fn handshake_offer( channel_ready_tx, ); + // Create offer let offer = JsFuture::from(conn.create_offer()).await.efix()?; - let offer_sdp = Reflect::get(&offer, &JsValue::from_str("sdp")) .efix()? .as_string() .ok_or("")?; - let mut rtc_session_desc_init_dict: RtcSessionDescriptionInit = RtcSessionDescriptionInit::new(RtcSdpType::Offer); - let offer_description = rtc_session_desc_init_dict.sdp(&offer_sdp); - JsFuture::from(conn.set_local_description(offer_description)) .await .efix()?; - - wait_for_ice_complete(conn.clone()).await; - debug!("created offer for new peer"); - signal_peer.send(PeerSignal::Offer(conn.local_description().unwrap().sdp())); - let sdp: String; + let mut received_candidates = vec![]; + // Wait for answer + let sdp: String; loop { let signal = signal_receiver .next() @@ -175,29 +170,80 @@ async fn handshake_offer( sdp = answer; break; } - PeerSignal::Offer(_) => { - warn!("Got an unexpected Offer, while waiting for Answer. Ignoring.") + PeerSignal::IceCandidate(candidate) => { + debug!("got an IceCandidate signal! {}", candidate); + received_candidates.push(candidate); } - PeerSignal::IceCandidate(_) => { - warn!( - "Got an ice candidate message, but ice trickle is not yet supported. Ignoring." - ) + _ => { + warn!("ignoring unexpected signal: {signal:?}"); } }; } + // Set remote description let mut remote_description: RtcSessionDescriptionInit = RtcSessionDescriptionInit::new(RtcSdpType::Answer); - remote_description.sdp(&sdp); - debug!("setting remote description"); JsFuture::from(conn.set_remote_description(&remote_description)) .await .efix()?; + // send ICE candidates to remote peer + let signal_peer_ice = signal_peer.clone(); + let onicecandidate: Box = Box::new(move |event| { + let event = Reflect::get(&event, &JsValue::from_str("candidate")).efix(); + if let Ok(event) = event { + if let Ok(candidate) = event.dyn_into::() { + debug!("sending IceCandidate signal {}", candidate.candidate()); + signal_peer_ice.send(PeerSignal::IceCandidate(candidate.candidate())); + } + } + }); + let onicecandidate = Closure::wrap(onicecandidate); + conn.set_onicecandidate(Some(onicecandidate.as_ref().unchecked_ref())); + + // handle pending ICE candidates + for canditate in received_candidates { + let mut ice_candidate: RtcIceCandidateInit = RtcIceCandidateInit::new(&canditate); + ice_candidate.sdp_m_line_index(Some(0)); + JsFuture::from( + conn.add_ice_candidate_with_opt_rtc_ice_candidate_init(Some(&ice_candidate)), + ) + .await + .efix()?; + } + + // select for channel ready or ice candidates debug!("waiting for data channel to open"); - channel_ready_rx.next().await; + loop { + select! { + _ = channel_ready_rx.next() => { + debug!("channel ready"); + break; + } + msg = signal_receiver.next() => { + if let Some(PeerSignal::IceCandidate(candidate)) = msg { + debug!("got an IceCandidate signal! {}", candidate); + let mut ice_candidate: RtcIceCandidateInit = RtcIceCandidateInit::new(&candidate); + ice_candidate.sdp_m_line_index(Some(0)); + JsFuture::from( + conn.add_ice_candidate_with_opt_rtc_ice_candidate_init(Some(&ice_candidate)), + ) + .await + .efix()?; + } + } + }; + } + + // stop listening for ICE candidates + // TODO: we should support getting new ICE candidates even after connecting, + // since it's possible to return to the ice gathering state + // See: + conn.set_onicecandidate(None); + + debug!("Ice completed: {:?}", conn.ice_gathering_state()); Ok((signal_peer.id, data_channel)) } @@ -219,15 +265,26 @@ async fn handshake_accept( channel_ready_tx, ); + let mut received_candidates = vec![]; + let offer: Option; loop { - match signal_receiver.next().await.ok_or("error")? { + let signal = signal_receiver + .next() + .await + .ok_or("Signal server connection lost in the middle of a handshake")?; + + match signal { PeerSignal::Offer(o) => { offer = Some(o); break; } + PeerSignal::IceCandidate(candidate) => { + debug!("got an IceCandidate signal! {}", candidate); + received_candidates.push(candidate); + } _ => { - warn!("ignoring other signal!!!"); + warn!("ignoring unexpected signal: {signal:?}"); } } } @@ -266,13 +323,64 @@ async fn handshake_accept( .await .efix()?; - wait_for_ice_complete(conn.clone()).await; - let answer = PeerSignal::Answer(conn.local_description().unwrap().sdp()); signal_peer.send(answer); + // send ICE candidates to remote peer + let signal_peer_ice = signal_peer.clone(); + let onicecandidate: Box = Box::new(move |event| { + let event = Reflect::get(&event, &JsValue::from_str("candidate")).efix(); + if let Ok(event) = event { + if let Ok(candidate) = event.dyn_into::() { + debug!("sending IceCandidate signal {}", candidate.candidate()); + signal_peer_ice.send(PeerSignal::IceCandidate(candidate.candidate())); + } + } + }); + let onicecandidate = Closure::wrap(onicecandidate); + conn.set_onicecandidate(Some(onicecandidate.as_ref().unchecked_ref())); + + // handle pending ICE candidates + for canditate in received_candidates { + let mut ice_candidate: RtcIceCandidateInit = RtcIceCandidateInit::new(&canditate); + ice_candidate.sdp_m_line_index(Some(0)); + JsFuture::from( + conn.add_ice_candidate_with_opt_rtc_ice_candidate_init(Some(&ice_candidate)), + ) + .await + .efix()?; + } + + // select for channel ready or ice candidates debug!("waiting for data channel to open"); - channel_ready_rx.next().await; + loop { + select! { + _ = channel_ready_rx.next() => { + debug!("channel ready"); + break; + } + msg = signal_receiver.next() => { + if let Some(PeerSignal::IceCandidate(candidate)) = msg { + debug!("got an IceCandidate signal! {}", candidate); + let mut ice_candidate: RtcIceCandidateInit = RtcIceCandidateInit::new(&candidate); + ice_candidate.sdp_m_line_index(Some(0)); + JsFuture::from( + conn.add_ice_candidate_with_opt_rtc_ice_candidate_init(Some(&ice_candidate)), + ) + .await + .efix()?; + } + } + }; + } + + // stop listening for ICE candidates + // TODO: we should support getting new ICE candidates even after connecting, + // since it's possible to return to the ice gathering state + // See: + conn.set_onicecandidate(None); + + debug!("Ice completed: {:?}", conn.ice_gathering_state()); Ok((signal_peer.id, data_channel)) } @@ -297,31 +405,6 @@ fn create_rtc_peer_connection(config: &WebRtcSocketConfig) -> RtcPeerConnection RtcPeerConnection::new_with_configuration(&peer_config).unwrap() } -async fn wait_for_ice_complete(conn: RtcPeerConnection) { - if conn.ice_gathering_state() == RtcIceGatheringState::Complete { - debug!("Ice already completed"); - return; - } - - let (mut tx, mut rx) = futures_channel::mpsc::channel(1); - - let conn_clone = conn.clone(); - let onstatechange: Box = Box::new(move |_| { - if conn_clone.ice_gathering_state() == RtcIceGatheringState::Complete { - tx.try_send(()).unwrap(); - } - }); - - let onstatechange = Closure::wrap(onstatechange); - - conn.set_onicegatheringstatechange(Some(onstatechange.as_ref().unchecked_ref())); - - rx.next().await; - - conn.set_onicegatheringstatechange(None); - debug!("Ice completed"); -} - fn create_data_channel( connection: RtcPeerConnection, incoming_tx: futures_channel::mpsc::UnboundedSender<(PeerId, Packet)>, @@ -332,7 +415,7 @@ fn create_data_channel( data_channel_config.ordered(false); data_channel_config.max_retransmits(0); data_channel_config.negotiated(true); - data_channel_config.id(0); + data_channel_config.id(DATA_CHANNEL_ID); let channel: RtcDataChannel = connection.create_data_channel_with_data_channel_dict("webudp", &data_channel_config);