Skip to content

Commit

Permalink
fix: clean tunnel when disconnected
Browse files Browse the repository at this point in the history
  • Loading branch information
Ma233 committed Oct 18, 2024
1 parent a76aa02 commit 2ec4998
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 39 deletions.
47 changes: 37 additions & 10 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,11 @@ pub enum PProxyCommand {
ConnectRelay {
multiaddr: Multiaddr,
},
SendConnectCommand {
ConnectTunnel {
peer_id: PeerId,
tunnel_id: TunnelId,
},
CleanTunnel {
peer_id: PeerId,
tunnel_id: TunnelId,
},
Expand All @@ -86,14 +90,17 @@ pub enum PProxyCommand {
pub enum PProxyCommandResponse {
AddPeer { peer_id: PeerId },
ConnectRelay { relaied_multiaddr: Multiaddr },
SendConnectCommand { remote_stream: Stream },
ConnectTunnel { remote_stream: Stream },
CleanTunnel {},
ExpirePeerAccess {},
}

pub struct PProxy {
command_tx: mpsc::Sender<(PProxyCommand, CommandNotifier)>,
command_rx: mpsc::Receiver<(PProxyCommand, CommandNotifier)>,
swarm: Swarm<PProxyNetworkBehaviour>,
stream_control: libp2p_stream::Control,
inbound_tunnels: HashMap<(PeerId, TunnelId), Tunnel>,
proxy_addr: Option<SocketAddr>,
access_client: Option<AccessClient>,
}
Expand All @@ -119,9 +126,11 @@ impl PProxy {

Ok((
Self {
command_tx: command_tx.clone(),
command_rx,
swarm,
stream_control,
inbound_tunnels: HashMap::new(),
proxy_addr,
access_client,
},
Expand Down Expand Up @@ -179,6 +188,9 @@ impl PProxy {
address.push(Protocol::P2p(*self.swarm.local_peer_id()));
println!("Local node is listening on {address}");
}
SwarmEvent::ConnectionClosed { peer_id, .. } => {
self.inbound_tunnels.retain(|(p, _), _| p != &peer_id);
}
_ => {}
}

Expand Down Expand Up @@ -226,7 +238,10 @@ impl PProxy {
.write_to(&mut remote_stream)
.await?;

tunnel.listen(local_stream, remote_stream).await?;
tunnel
.listen(self.command_tx.clone(), local_stream, remote_stream)
.await?;
self.inbound_tunnels.insert((peer_id, header.id), tunnel);
}
}

Expand All @@ -239,8 +254,11 @@ impl PProxy {
self.on_add_peer(multiaddr, peer_id, tx).await
}
PProxyCommand::ConnectRelay { multiaddr } => self.on_connect_relay(multiaddr, tx).await,
PProxyCommand::SendConnectCommand { peer_id, tunnel_id } => {
self.on_send_connect_command(peer_id, tunnel_id, tx).await
PProxyCommand::ConnectTunnel { peer_id, tunnel_id } => {
self.on_connect_tunnel(peer_id, tunnel_id, tx).await
}
PProxyCommand::CleanTunnel { peer_id, tunnel_id } => {
self.on_clean_tunnel(peer_id, tunnel_id, tx).await
}
PProxyCommand::ExpirePeerAccess { peer_id } => {
self.on_expire_peer_access(peer_id, tx).await
Expand Down Expand Up @@ -272,7 +290,7 @@ impl PProxy {
.map_err(|_| Error::EssentialTaskClosed)
}

async fn on_send_connect_command(
async fn on_connect_tunnel(
&mut self,
peer_id: PeerId,
tunnel_id: TunnelId,
Expand All @@ -291,10 +309,8 @@ impl PProxy {

match resp.reply {
TunnelReply::Succeeded => {
tx.send(Ok(PProxyCommandResponse::SendConnectCommand {
remote_stream,
}))
.map_err(|_| Error::EssentialTaskClosed)?;
tx.send(Ok(PProxyCommandResponse::ConnectTunnel { remote_stream }))
.map_err(|_| Error::EssentialTaskClosed)?;
}
e => {
remote_stream.close().await?;
Expand All @@ -308,6 +324,17 @@ impl PProxy {
Ok(())
}

async fn on_clean_tunnel(
&mut self,
peer_id: PeerId,
tunnel_id: TunnelId,
tx: CommandNotifier,
) -> Result<()> {
self.inbound_tunnels.remove(&(peer_id, tunnel_id));
tx.send(Ok(PProxyCommandResponse::CleanTunnel {}))
.map_err(|_| Error::EssentialTaskClosed)
}

async fn on_expire_peer_access(&mut self, peer_id: PeerId, tx: CommandNotifier) -> Result<()> {
if let Some(ref mut ac) = self.access_client {
ac.expire(&peer_id);
Expand Down
114 changes: 85 additions & 29 deletions src/tunnel/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,15 @@ pub struct TunnelServerListener {
pub struct Tunnel {
peer_id: PeerId,
tunnel_id: TunnelId,
listener_cancel_token: Option<CancellationToken>,
listener: Option<tokio::task::JoinHandle<()>>,
}

pub struct TunnelListener {
peer_id: PeerId,
tunnel_id: TunnelId,
pproxy_command_tx: mpsc::Sender<(PProxyCommand, CommandNotifier)>,
local_stream: TcpStream,
remote_stream: Compat<Stream>,
cancel_token: CancellationToken,
}

impl Drop for TunnelServer {
Expand All @@ -72,15 +73,8 @@ impl Drop for TunnelServer {

impl Drop for Tunnel {
fn drop(&mut self) {
if let Some(cancel_token) = self.listener_cancel_token.take() {
cancel_token.cancel();
}

if let Some(listener) = self.listener.take() {
tokio::spawn(async move {
tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
listener.abort();
});
listener.abort();
}

tracing::info!("Tunnel {}-{} dropped", self.peer_id, self.tunnel_id);
Expand Down Expand Up @@ -167,36 +161,41 @@ impl TunnelServerListener {
if let Err(e) = self
.pproxy_command_tx
.send((
PProxyCommand::SendConnectCommand {
PProxyCommand::ConnectTunnel {
peer_id: self.peer_id,
tunnel_id,
},
tx,
))
.await
{
tracing::error!("Send connect command channel tx failed: {e:?}");
tracing::error!("ConnectTunnel tx failed: {e:?}");
continue;
}

match rx.await {
Err(e) => {
tracing::error!("Send connect command channel rx failed: {e:?}");
tracing::error!("ConnectTunnel rx failed: {e:?}");
continue;
}
Ok(Err(e)) => {
tracing::error!("Send connect command channel failed: {e:?}");
tracing::error!("ConnectTunnel response failed: {e:?}");
continue;
}
Ok(Ok(resp)) => match resp {
PProxyCommandResponse::SendConnectCommand { remote_stream } => {
if let Err(e) = tunnel.listen(stream, remote_stream).await {
PProxyCommandResponse::ConnectTunnel { remote_stream } => {
if let Err(e) = tunnel
.listen(self.pproxy_command_tx.clone(), stream, remote_stream)
.await
{
tracing::error!("Tunnel listen failed: {e:?}");
continue;
};
}
other_resp => {
tracing::error!("Send connect command channel got invalid pproxy command response {other_resp:?}");
tracing::error!(
"ConnectTunnel got invalid pproxy command response {other_resp:?}"
);
continue;
}
},
Expand All @@ -213,48 +212,105 @@ impl Tunnel {
peer_id,
tunnel_id,
listener: None,
listener_cancel_token: None,
}
}

pub async fn listen(
&mut self,
pproxy_command_tx: mpsc::Sender<(PProxyCommand, CommandNotifier)>,
local_stream: TcpStream,
remote_stream: Stream,
) -> Result<(), TunnelError> {
if self.listener.is_some() {
return Err(TunnelError::AlreadyListened);
}

let mut listener = TunnelListener::new(local_stream, remote_stream).await;
let listener_cancel_token = listener.cancel_token();
let mut listener = TunnelListener::new(
self.peer_id,
self.tunnel_id,
pproxy_command_tx,
local_stream,
remote_stream,
)
.await;
let listener_handler = tokio::spawn(Box::pin(async move { listener.listen().await }));

self.listener = Some(listener_handler);
self.listener_cancel_token = Some(listener_cancel_token);

Ok(())
}
}

impl TunnelListener {
async fn new(local_stream: TcpStream, remote_stream: Stream) -> Self {
async fn new(
peer_id: PeerId,
tunnel_id: TunnelId,
pproxy_command_tx: mpsc::Sender<(PProxyCommand, CommandNotifier)>,
local_stream: TcpStream,
remote_stream: Stream,
) -> Self {
let remote_stream = remote_stream.compat();
Self {
peer_id,
tunnel_id,
pproxy_command_tx,
local_stream,
remote_stream,
cancel_token: CancellationToken::new(),
}
}

fn cancel_token(&self) -> CancellationToken {
self.cancel_token.clone()
}

async fn listen(&mut self) {
tokio::io::copy_bidirectional(&mut self.local_stream, &mut self.remote_stream)
match tokio::io::copy_bidirectional(&mut self.local_stream, &mut self.remote_stream).await {
Ok((rn, wn)) => {
tracing::trace!(
"tcp tunnel {} <-> {} (peer_id) closed, L2R {} bytes, R2L {} bytes",
self.tunnel_id,
self.peer_id,
rn,
wn
);
}
Err(e) => {
tracing::warn!(
"tcp tunnel {} <-> {} (peer_id) closed with error: {}",
self.tunnel_id,
self.peer_id,
e,
);
}
}

let (tx, rx) = oneshot::channel();
if let Err(e) = self
.pproxy_command_tx
.send((
PProxyCommand::CleanTunnel {
peer_id: self.peer_id,
tunnel_id: self.tunnel_id,
},
tx,
))
.await
.unwrap();
{
tracing::error!("CleanTunnel tx failed: {e:?}");
}

match rx.await {
Err(e) => {
tracing::error!("CleanTunnel rx failed: {e:?}");
}
Ok(Err(e)) => {
tracing::error!("CleanTunnel response failed: {e:?}");
}
Ok(Ok(resp)) => match resp {
PProxyCommandResponse::CleanTunnel {} => {}
other_resp => {
tracing::error!(
"CleanTunnel got invalid pproxy command response {other_resp:?}"
);
}
},
}
}
}

Expand Down

0 comments on commit 2ec4998

Please sign in to comment.