Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix regression in wasm connections behind NATs #65

Merged
merged 5 commits into from
Dec 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions matchbox_socket/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ web-sys = { version = "0.3.22", default-features = false, features = [
"RtcPeerConnection",
"RtcSdpType", "RtcSessionDescription", "RtcSessionDescriptionInit",
"RtcIceGatheringState", "RtcIceCandidate", "RtcIceCandidateInit", "RtcPeerConnectionIceEvent",
"RtcIceConnectionState",
"RtcConfiguration", "RtcDataChannel", "RtcDataChannelInit", "RtcDataChannelType",
] }
serde-wasm-bindgen = { version = "0.4" }
Expand Down
144 changes: 113 additions & 31 deletions matchbox_socket/src/webrtc_socket/wasm/message_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ use wasm_bindgen::{prelude::*, JsCast, JsValue};
use wasm_bindgen_futures::JsFuture;
use web_sys::{
MessageEvent, RtcConfiguration, RtcDataChannel, RtcDataChannelInit, RtcDataChannelType,
RtcIceCandidateInit, RtcPeerConnection, RtcPeerConnectionIceEvent, RtcSdpType,
RtcSessionDescriptionInit,
RtcIceCandidateInit, RtcIceGatheringState, RtcPeerConnection, RtcPeerConnectionIceEvent,
RtcSdpType, RtcSessionDescriptionInit,
};

use crate::webrtc_socket::{
Expand Down Expand Up @@ -164,6 +164,13 @@ async fn handshake_offer(
.await
.efix()?;
debug!("created offer for new peer");

// todo: the point of implementing ice trickle is to avoid this wait...
// however, for some reason removing this wait causes problems with NAT
// punching in practice.
// We should figure out why this is happening.
wait_for_ice_gathering_complete(conn.clone()).await;

signal_peer.send(PeerSignal::Offer(conn.local_description().unwrap().sdp()));

let mut received_candidates = vec![];
Expand Down Expand Up @@ -200,23 +207,26 @@ async fn handshake_offer(
// send ICE candidates to remote peer
let signal_peer_ice = signal_peer.clone();
let onicecandidate: Box<dyn FnMut(RtcPeerConnectionIceEvent)> = Box::new(
move |event: RtcPeerConnectionIceEvent| match event.candidate() {
Some(candidate) => {
let candidate = js_sys::JSON::stringify(&candidate.to_json())
move |event: RtcPeerConnectionIceEvent| {
let candidate_json = match event.candidate() {
Some(candidate) => js_sys::JSON::stringify(&candidate.to_json())
.expect("failed to serialize candidate")
.as_string()
.unwrap();
.unwrap(),
None => {
debug!("Received RtcPeerConnectionIceEvent with no candidate. This means there are no further ice candidates for this session");
"null".to_string()
}
};

debug!("sending IceCandidate signal: {candidate:?}");
signal_peer_ice.send(PeerSignal::IceCandidate(candidate));
}
None => {
debug!("Received RtcPeerConnectionIceEvent with no candidate. This means there are no further ice candidates for this session");
}
debug!("sending IceCandidate signal: {candidate_json:?}");
signal_peer_ice.send(PeerSignal::IceCandidate(candidate_json));
},
);
let onicecandidate = Closure::wrap(onicecandidate);
conn.set_onicecandidate(Some(onicecandidate.as_ref().unchecked_ref()));
// note: we can let rust keep ownership of this closure, since we replace
// the event handler later in this method when ice is finished

// handle pending ICE candidates
for candidate in received_candidates {
Expand Down Expand Up @@ -245,25 +255,40 @@ async fn handshake_offer(
// TODO: we should support getting new ICE candidates even after connecting,
// since it's possible to return to the ice gathering state
// See: <https://developer.mozilla.org/en-US/docs/Web/API/RTCPeerConnection/iceGatheringState>
conn.set_onicecandidate(None);
let onicecandidate: Box<dyn FnMut(RtcPeerConnectionIceEvent)> =
Box::new(move |_event: RtcPeerConnectionIceEvent| {
warn!("received ice candidate event after handshake completed");
});
let onicecandidate = Closure::wrap(onicecandidate);
conn.set_onicecandidate(Some(onicecandidate.as_ref().unchecked_ref()));
onicecandidate.forget();

debug!("Ice completed: {:?}", conn.ice_gathering_state());
debug!(
"handshake_offer completed, ice gathering state: {:?}",
conn.ice_gathering_state()
);

Ok((signal_peer.id, data_channel))
}

async fn try_add_rtc_ice_candidate(connection: &RtcPeerConnection, candidate_string: &str) {
let candidate_init = match js_sys::JSON::parse(candidate_string).map(RtcIceCandidateInit::from)
{
Ok(js_value) => js_value,
let parsed_candidate = match js_sys::JSON::parse(candidate_string) {
Ok(c) => c,
Err(err) => {
error!("failed to parse candidate json: {err:?}");
return;
}
};

let candidate_init = if parsed_candidate.is_null() {
debug!("Received null ice candidate, this means there are no further ice candidates");
None
} else {
Some(RtcIceCandidateInit::from(parsed_candidate))
};

JsFuture::from(
connection.add_ice_candidate_with_opt_rtc_ice_candidate_init(Some(&candidate_init)),
connection.add_ice_candidate_with_opt_rtc_ice_candidate_init(candidate_init.as_ref()),
)
.await
.expect("failed to add ice candidate");
Expand Down Expand Up @@ -339,30 +364,39 @@ async fn handshake_accept(
.await
.efix()?;

// todo: the point of implementing ice trickle is to avoid this wait...
// however, for some reason removing this wait causes problems with NAT
// punching in practice.
// We should figure out why this is happening.
wait_for_ice_gathering_complete(conn.clone()).await;

let answer = PeerSignal::Answer(conn.local_description().unwrap().sdp());
signal_peer.send(answer);

// send ICE candidates to remote peer
let signal_peer_ice = signal_peer.clone();
// todo: exactly the same as offer, dedup?
let onicecandidate: Box<dyn FnMut(RtcPeerConnectionIceEvent)> = Box::new(
move |event: RtcPeerConnectionIceEvent| match event.candidate() {
Some(candidate) => {
let candidate = js_sys::JSON::stringify(&candidate.to_json())
move |event: RtcPeerConnectionIceEvent| {
let candidate_json = match event.candidate() {
Some(candidate) => js_sys::JSON::stringify(&candidate.to_json())
.expect("failed to serialize candidate")
.as_string()
.unwrap();
.unwrap(),
None => {
debug!("Received RtcPeerConnectionIceEvent with no candidate. This means there are no further ice candidates for this session");
"null".to_string()
}
};

debug!("sending IceCandidate signal: {candidate:?}");
signal_peer_ice.send(PeerSignal::IceCandidate(candidate));
}
None => {
debug!("Received RtcPeerConnectionIceEvent with no candidate. This means there are no further ice candidates for this session");
}
debug!("sending IceCandidate signal: {candidate_json:?}");
signal_peer_ice.send(PeerSignal::IceCandidate(candidate_json));
},
);
let onicecandidate = Closure::wrap(onicecandidate);
conn.set_onicecandidate(Some(onicecandidate.as_ref().unchecked_ref()));
// note: we can let rust keep ownership of this closure, since we replace
// the event handler later in this method when ice is finished

// handle pending ICE candidates
for candidate in received_candidates {
Expand Down Expand Up @@ -391,9 +425,18 @@ async fn handshake_accept(
// TODO: we should support getting new ICE candidates even after connecting,
// since it's possible to return to the ice gathering state
// See: <https://developer.mozilla.org/en-US/docs/Web/API/RTCPeerConnection/iceGatheringState>
conn.set_onicecandidate(None);
let onicecandidate: Box<dyn FnMut(RtcPeerConnectionIceEvent)> =
Box::new(move |_event: RtcPeerConnectionIceEvent| {
warn!("received ice candidate event after handshake completed");
});
let onicecandidate = Closure::wrap(onicecandidate);
conn.set_onicecandidate(Some(onicecandidate.as_ref().unchecked_ref()));
onicecandidate.forget();

debug!("Ice completed: {:?}", conn.ice_gathering_state());
debug!(
"handshake_accept completed, ice gathering state: {:?}",
conn.ice_gathering_state()
);

Ok((signal_peer.id, data_channel))
}
Expand All @@ -415,7 +458,46 @@ fn create_rtc_peer_connection(config: &WebRtcSocketConfig) -> RtcPeerConnection
};
let ice_server_config_list = [ice_server_config];
peer_config.ice_servers(&serde_wasm_bindgen::to_value(&ice_server_config_list).unwrap());
RtcPeerConnection::new_with_configuration(&peer_config).unwrap()
let connection = RtcPeerConnection::new_with_configuration(&peer_config).unwrap();

let connection_1 = connection.clone();
let oniceconnectionstatechange: Box<dyn FnMut(_)> = Box::new(move |_event: JsValue| {
debug!(
"ice connection state changed: {:?}",
connection_1.ice_connection_state()
);
});
let oniceconnectionstatechange = Closure::wrap(oniceconnectionstatechange);
connection
.set_oniceconnectionstatechange(Some(oniceconnectionstatechange.as_ref().unchecked_ref()));
oniceconnectionstatechange.forget();

connection
}

async fn wait_for_ice_gathering_complete(conn: RtcPeerConnection) {
if conn.ice_gathering_state() == RtcIceGatheringState::Complete {
debug!("Ice gathering already completed");
return;
}

let (mut tx, mut rx) = futures_channel::mpsc::channel(1);

let conn_clone = conn.clone();
let onstatechange: Box<dyn FnMut(JsValue)> = Box::new(move |_| {
if conn_clone.ice_gathering_state() == RtcIceGatheringState::Complete {
tx.try_send(()).unwrap();
}
});

let onstatechange = Closure::wrap(onstatechange);

conn.set_onicegatheringstatechange(Some(onstatechange.as_ref().unchecked_ref()));

rx.next().await;

conn.set_onicegatheringstatechange(None);
debug!("Ice gathering completed");
}

fn create_data_channel(
Expand Down