Skip to content

Commit

Permalink
Merge pull request #65 from johanhelsing/fix-wasm-connections-2
Browse files Browse the repository at this point in the history
Fix regression in wasm connections behind NATs
  • Loading branch information
johanhelsing authored Dec 27, 2022
2 parents 8f59213 + 357144f commit 97c2ea1
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 31 deletions.
1 change: 1 addition & 0 deletions matchbox_socket/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ web-sys = { version = "0.3.22", default-features = false, features = [
"RtcPeerConnection",
"RtcSdpType", "RtcSessionDescription", "RtcSessionDescriptionInit",
"RtcIceGatheringState", "RtcIceCandidate", "RtcIceCandidateInit", "RtcPeerConnectionIceEvent",
"RtcIceConnectionState",
"RtcConfiguration", "RtcDataChannel", "RtcDataChannelInit", "RtcDataChannelType",
] }
serde-wasm-bindgen = { version = "0.4" }
Expand Down
144 changes: 113 additions & 31 deletions matchbox_socket/src/webrtc_socket/wasm/message_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ use wasm_bindgen::{prelude::*, JsCast, JsValue};
use wasm_bindgen_futures::JsFuture;
use web_sys::{
MessageEvent, RtcConfiguration, RtcDataChannel, RtcDataChannelInit, RtcDataChannelType,
RtcIceCandidateInit, RtcPeerConnection, RtcPeerConnectionIceEvent, RtcSdpType,
RtcSessionDescriptionInit,
RtcIceCandidateInit, RtcIceGatheringState, RtcPeerConnection, RtcPeerConnectionIceEvent,
RtcSdpType, RtcSessionDescriptionInit,
};

use crate::webrtc_socket::{
Expand Down Expand Up @@ -164,6 +164,13 @@ async fn handshake_offer(
.await
.efix()?;
debug!("created offer for new peer");

// todo: the point of implementing ice trickle is to avoid this wait...
// however, for some reason removing this wait causes problems with NAT
// punching in practice.
// We should figure out why this is happening.
wait_for_ice_gathering_complete(conn.clone()).await;

signal_peer.send(PeerSignal::Offer(conn.local_description().unwrap().sdp()));

let mut received_candidates = vec![];
Expand Down Expand Up @@ -200,23 +207,26 @@ async fn handshake_offer(
// send ICE candidates to remote peer
let signal_peer_ice = signal_peer.clone();
let onicecandidate: Box<dyn FnMut(RtcPeerConnectionIceEvent)> = Box::new(
move |event: RtcPeerConnectionIceEvent| match event.candidate() {
Some(candidate) => {
let candidate = js_sys::JSON::stringify(&candidate.to_json())
move |event: RtcPeerConnectionIceEvent| {
let candidate_json = match event.candidate() {
Some(candidate) => js_sys::JSON::stringify(&candidate.to_json())
.expect("failed to serialize candidate")
.as_string()
.unwrap();
.unwrap(),
None => {
debug!("Received RtcPeerConnectionIceEvent with no candidate. This means there are no further ice candidates for this session");
"null".to_string()
}
};

debug!("sending IceCandidate signal: {candidate:?}");
signal_peer_ice.send(PeerSignal::IceCandidate(candidate));
}
None => {
debug!("Received RtcPeerConnectionIceEvent with no candidate. This means there are no further ice candidates for this session");
}
debug!("sending IceCandidate signal: {candidate_json:?}");
signal_peer_ice.send(PeerSignal::IceCandidate(candidate_json));
},
);
let onicecandidate = Closure::wrap(onicecandidate);
conn.set_onicecandidate(Some(onicecandidate.as_ref().unchecked_ref()));
// note: we can let rust keep ownership of this closure, since we replace
// the event handler later in this method when ice is finished

// handle pending ICE candidates
for candidate in received_candidates {
Expand Down Expand Up @@ -245,25 +255,40 @@ async fn handshake_offer(
// TODO: we should support getting new ICE candidates even after connecting,
// since it's possible to return to the ice gathering state
// See: <https://developer.mozilla.org/en-US/docs/Web/API/RTCPeerConnection/iceGatheringState>
conn.set_onicecandidate(None);
let onicecandidate: Box<dyn FnMut(RtcPeerConnectionIceEvent)> =
Box::new(move |_event: RtcPeerConnectionIceEvent| {
warn!("received ice candidate event after handshake completed");
});
let onicecandidate = Closure::wrap(onicecandidate);
conn.set_onicecandidate(Some(onicecandidate.as_ref().unchecked_ref()));
onicecandidate.forget();

debug!("Ice completed: {:?}", conn.ice_gathering_state());
debug!(
"handshake_offer completed, ice gathering state: {:?}",
conn.ice_gathering_state()
);

Ok((signal_peer.id, data_channel))
}

async fn try_add_rtc_ice_candidate(connection: &RtcPeerConnection, candidate_string: &str) {
let candidate_init = match js_sys::JSON::parse(candidate_string).map(RtcIceCandidateInit::from)
{
Ok(js_value) => js_value,
let parsed_candidate = match js_sys::JSON::parse(candidate_string) {
Ok(c) => c,
Err(err) => {
error!("failed to parse candidate json: {err:?}");
return;
}
};

let candidate_init = if parsed_candidate.is_null() {
debug!("Received null ice candidate, this means there are no further ice candidates");
None
} else {
Some(RtcIceCandidateInit::from(parsed_candidate))
};

JsFuture::from(
connection.add_ice_candidate_with_opt_rtc_ice_candidate_init(Some(&candidate_init)),
connection.add_ice_candidate_with_opt_rtc_ice_candidate_init(candidate_init.as_ref()),
)
.await
.expect("failed to add ice candidate");
Expand Down Expand Up @@ -339,30 +364,39 @@ async fn handshake_accept(
.await
.efix()?;

// todo: the point of implementing ice trickle is to avoid this wait...
// however, for some reason removing this wait causes problems with NAT
// punching in practice.
// We should figure out why this is happening.
wait_for_ice_gathering_complete(conn.clone()).await;

let answer = PeerSignal::Answer(conn.local_description().unwrap().sdp());
signal_peer.send(answer);

// send ICE candidates to remote peer
let signal_peer_ice = signal_peer.clone();
// todo: exactly the same as offer, dedup?
let onicecandidate: Box<dyn FnMut(RtcPeerConnectionIceEvent)> = Box::new(
move |event: RtcPeerConnectionIceEvent| match event.candidate() {
Some(candidate) => {
let candidate = js_sys::JSON::stringify(&candidate.to_json())
move |event: RtcPeerConnectionIceEvent| {
let candidate_json = match event.candidate() {
Some(candidate) => js_sys::JSON::stringify(&candidate.to_json())
.expect("failed to serialize candidate")
.as_string()
.unwrap();
.unwrap(),
None => {
debug!("Received RtcPeerConnectionIceEvent with no candidate. This means there are no further ice candidates for this session");
"null".to_string()
}
};

debug!("sending IceCandidate signal: {candidate:?}");
signal_peer_ice.send(PeerSignal::IceCandidate(candidate));
}
None => {
debug!("Received RtcPeerConnectionIceEvent with no candidate. This means there are no further ice candidates for this session");
}
debug!("sending IceCandidate signal: {candidate_json:?}");
signal_peer_ice.send(PeerSignal::IceCandidate(candidate_json));
},
);
let onicecandidate = Closure::wrap(onicecandidate);
conn.set_onicecandidate(Some(onicecandidate.as_ref().unchecked_ref()));
// note: we can let rust keep ownership of this closure, since we replace
// the event handler later in this method when ice is finished

// handle pending ICE candidates
for candidate in received_candidates {
Expand Down Expand Up @@ -391,9 +425,18 @@ async fn handshake_accept(
// TODO: we should support getting new ICE candidates even after connecting,
// since it's possible to return to the ice gathering state
// See: <https://developer.mozilla.org/en-US/docs/Web/API/RTCPeerConnection/iceGatheringState>
conn.set_onicecandidate(None);
let onicecandidate: Box<dyn FnMut(RtcPeerConnectionIceEvent)> =
Box::new(move |_event: RtcPeerConnectionIceEvent| {
warn!("received ice candidate event after handshake completed");
});
let onicecandidate = Closure::wrap(onicecandidate);
conn.set_onicecandidate(Some(onicecandidate.as_ref().unchecked_ref()));
onicecandidate.forget();

debug!("Ice completed: {:?}", conn.ice_gathering_state());
debug!(
"handshake_accept completed, ice gathering state: {:?}",
conn.ice_gathering_state()
);

Ok((signal_peer.id, data_channel))
}
Expand All @@ -415,7 +458,46 @@ fn create_rtc_peer_connection(config: &WebRtcSocketConfig) -> RtcPeerConnection
};
let ice_server_config_list = [ice_server_config];
peer_config.ice_servers(&serde_wasm_bindgen::to_value(&ice_server_config_list).unwrap());
RtcPeerConnection::new_with_configuration(&peer_config).unwrap()
let connection = RtcPeerConnection::new_with_configuration(&peer_config).unwrap();

let connection_1 = connection.clone();
let oniceconnectionstatechange: Box<dyn FnMut(_)> = Box::new(move |_event: JsValue| {
debug!(
"ice connection state changed: {:?}",
connection_1.ice_connection_state()
);
});
let oniceconnectionstatechange = Closure::wrap(oniceconnectionstatechange);
connection
.set_oniceconnectionstatechange(Some(oniceconnectionstatechange.as_ref().unchecked_ref()));
oniceconnectionstatechange.forget();

connection
}

async fn wait_for_ice_gathering_complete(conn: RtcPeerConnection) {
if conn.ice_gathering_state() == RtcIceGatheringState::Complete {
debug!("Ice gathering already completed");
return;
}

let (mut tx, mut rx) = futures_channel::mpsc::channel(1);

let conn_clone = conn.clone();
let onstatechange: Box<dyn FnMut(JsValue)> = Box::new(move |_| {
if conn_clone.ice_gathering_state() == RtcIceGatheringState::Complete {
tx.try_send(()).unwrap();
}
});

let onstatechange = Closure::wrap(onstatechange);

conn.set_onicegatheringstatechange(Some(onstatechange.as_ref().unchecked_ref()));

rx.next().await;

conn.set_onicegatheringstatechange(None);
debug!("Ice gathering completed");
}

fn create_data_channel(
Expand Down

0 comments on commit 97c2ea1

Please sign in to comment.