diff --git a/src/lib.rs b/src/lib.rs index a1f4967..68cc223 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -73,7 +73,11 @@ pub enum PProxyCommand { ConnectRelay { multiaddr: Multiaddr, }, - SendConnectCommand { + ConnectTunnel { + peer_id: PeerId, + tunnel_id: TunnelId, + }, + CleanTunnel { peer_id: PeerId, tunnel_id: TunnelId, }, @@ -86,14 +90,17 @@ pub enum PProxyCommand { pub enum PProxyCommandResponse { AddPeer { peer_id: PeerId }, ConnectRelay { relaied_multiaddr: Multiaddr }, - SendConnectCommand { remote_stream: Stream }, + ConnectTunnel { remote_stream: Stream }, + CleanTunnel {}, ExpirePeerAccess {}, } pub struct PProxy { + command_tx: mpsc::Sender<(PProxyCommand, CommandNotifier)>, command_rx: mpsc::Receiver<(PProxyCommand, CommandNotifier)>, swarm: Swarm, stream_control: libp2p_stream::Control, + inbound_tunnels: HashMap<(PeerId, TunnelId), Tunnel>, proxy_addr: Option, access_client: Option, } @@ -119,9 +126,11 @@ impl PProxy { Ok(( Self { + command_tx: command_tx.clone(), command_rx, swarm, stream_control, + inbound_tunnels: HashMap::new(), proxy_addr, access_client, }, @@ -179,6 +188,9 @@ impl PProxy { address.push(Protocol::P2p(*self.swarm.local_peer_id())); println!("Local node is listening on {address}"); } + SwarmEvent::ConnectionClosed { peer_id, .. } => { + self.inbound_tunnels.retain(|(p, _), _| p != &peer_id); + } _ => {} } @@ -226,7 +238,10 @@ impl PProxy { .write_to(&mut remote_stream) .await?; - tunnel.listen(local_stream, remote_stream).await?; + tunnel + .listen(self.command_tx.clone(), local_stream, remote_stream) + .await?; + self.inbound_tunnels.insert((peer_id, header.id), tunnel); } } @@ -239,8 +254,11 @@ impl PProxy { self.on_add_peer(multiaddr, peer_id, tx).await } PProxyCommand::ConnectRelay { multiaddr } => self.on_connect_relay(multiaddr, tx).await, - PProxyCommand::SendConnectCommand { peer_id, tunnel_id } => { - self.on_send_connect_command(peer_id, tunnel_id, tx).await + PProxyCommand::ConnectTunnel { peer_id, tunnel_id } => { + self.on_connect_tunnel(peer_id, tunnel_id, tx).await + } + PProxyCommand::CleanTunnel { peer_id, tunnel_id } => { + self.on_clean_tunnel(peer_id, tunnel_id, tx).await } PProxyCommand::ExpirePeerAccess { peer_id } => { self.on_expire_peer_access(peer_id, tx).await @@ -272,7 +290,7 @@ impl PProxy { .map_err(|_| Error::EssentialTaskClosed) } - async fn on_send_connect_command( + async fn on_connect_tunnel( &mut self, peer_id: PeerId, tunnel_id: TunnelId, @@ -291,10 +309,8 @@ impl PProxy { match resp.reply { TunnelReply::Succeeded => { - tx.send(Ok(PProxyCommandResponse::SendConnectCommand { - remote_stream, - })) - .map_err(|_| Error::EssentialTaskClosed)?; + tx.send(Ok(PProxyCommandResponse::ConnectTunnel { remote_stream })) + .map_err(|_| Error::EssentialTaskClosed)?; } e => { remote_stream.close().await?; @@ -308,6 +324,17 @@ impl PProxy { Ok(()) } + async fn on_clean_tunnel( + &mut self, + peer_id: PeerId, + tunnel_id: TunnelId, + tx: CommandNotifier, + ) -> Result<()> { + self.inbound_tunnels.remove(&(peer_id, tunnel_id)); + tx.send(Ok(PProxyCommandResponse::CleanTunnel {})) + .map_err(|_| Error::EssentialTaskClosed) + } + async fn on_expire_peer_access(&mut self, peer_id: PeerId, tx: CommandNotifier) -> Result<()> { if let Some(ref mut ac) = self.access_client { ac.expire(&peer_id); diff --git a/src/tunnel/mod.rs b/src/tunnel/mod.rs index b491ae6..3e130d9 100644 --- a/src/tunnel/mod.rs +++ b/src/tunnel/mod.rs @@ -43,14 +43,15 @@ pub struct TunnelServerListener { pub struct Tunnel { peer_id: PeerId, tunnel_id: TunnelId, - listener_cancel_token: Option, listener: Option>, } pub struct TunnelListener { + peer_id: PeerId, + tunnel_id: TunnelId, + pproxy_command_tx: mpsc::Sender<(PProxyCommand, CommandNotifier)>, local_stream: TcpStream, remote_stream: Compat, - cancel_token: CancellationToken, } impl Drop for TunnelServer { @@ -72,15 +73,8 @@ impl Drop for TunnelServer { impl Drop for Tunnel { fn drop(&mut self) { - if let Some(cancel_token) = self.listener_cancel_token.take() { - cancel_token.cancel(); - } - if let Some(listener) = self.listener.take() { - tokio::spawn(async move { - tokio::time::sleep(tokio::time::Duration::from_secs(3)).await; - listener.abort(); - }); + listener.abort(); } tracing::info!("Tunnel {}-{} dropped", self.peer_id, self.tunnel_id); @@ -167,7 +161,7 @@ impl TunnelServerListener { if let Err(e) = self .pproxy_command_tx .send(( - PProxyCommand::SendConnectCommand { + PProxyCommand::ConnectTunnel { peer_id: self.peer_id, tunnel_id, }, @@ -175,28 +169,33 @@ impl TunnelServerListener { )) .await { - tracing::error!("Send connect command channel tx failed: {e:?}"); + tracing::error!("ConnectTunnel tx failed: {e:?}"); continue; } match rx.await { Err(e) => { - tracing::error!("Send connect command channel rx failed: {e:?}"); + tracing::error!("ConnectTunnel rx failed: {e:?}"); continue; } Ok(Err(e)) => { - tracing::error!("Send connect command channel failed: {e:?}"); + tracing::error!("ConnectTunnel response failed: {e:?}"); continue; } Ok(Ok(resp)) => match resp { - PProxyCommandResponse::SendConnectCommand { remote_stream } => { - if let Err(e) = tunnel.listen(stream, remote_stream).await { + PProxyCommandResponse::ConnectTunnel { remote_stream } => { + if let Err(e) = tunnel + .listen(self.pproxy_command_tx.clone(), stream, remote_stream) + .await + { tracing::error!("Tunnel listen failed: {e:?}"); continue; }; } other_resp => { - tracing::error!("Send connect command channel got invalid pproxy command response {other_resp:?}"); + tracing::error!( + "ConnectTunnel got invalid pproxy command response {other_resp:?}" + ); continue; } }, @@ -213,12 +212,12 @@ impl Tunnel { peer_id, tunnel_id, listener: None, - listener_cancel_token: None, } } pub async fn listen( &mut self, + pproxy_command_tx: mpsc::Sender<(PProxyCommand, CommandNotifier)>, local_stream: TcpStream, remote_stream: Stream, ) -> Result<(), TunnelError> { @@ -226,35 +225,92 @@ impl Tunnel { return Err(TunnelError::AlreadyListened); } - let mut listener = TunnelListener::new(local_stream, remote_stream).await; - let listener_cancel_token = listener.cancel_token(); + let mut listener = TunnelListener::new( + self.peer_id, + self.tunnel_id, + pproxy_command_tx, + local_stream, + remote_stream, + ) + .await; let listener_handler = tokio::spawn(Box::pin(async move { listener.listen().await })); self.listener = Some(listener_handler); - self.listener_cancel_token = Some(listener_cancel_token); Ok(()) } } impl TunnelListener { - async fn new(local_stream: TcpStream, remote_stream: Stream) -> Self { + async fn new( + peer_id: PeerId, + tunnel_id: TunnelId, + pproxy_command_tx: mpsc::Sender<(PProxyCommand, CommandNotifier)>, + local_stream: TcpStream, + remote_stream: Stream, + ) -> Self { let remote_stream = remote_stream.compat(); Self { + peer_id, + tunnel_id, + pproxy_command_tx, local_stream, remote_stream, - cancel_token: CancellationToken::new(), } } - fn cancel_token(&self) -> CancellationToken { - self.cancel_token.clone() - } - async fn listen(&mut self) { - tokio::io::copy_bidirectional(&mut self.local_stream, &mut self.remote_stream) + match tokio::io::copy_bidirectional(&mut self.local_stream, &mut self.remote_stream).await { + Ok((rn, wn)) => { + tracing::trace!( + "tcp tunnel {} <-> {} (peer_id) closed, L2R {} bytes, R2L {} bytes", + self.tunnel_id, + self.peer_id, + rn, + wn + ); + } + Err(e) => { + tracing::warn!( + "tcp tunnel {} <-> {} (peer_id) closed with error: {}", + self.tunnel_id, + self.peer_id, + e, + ); + } + } + + let (tx, rx) = oneshot::channel(); + if let Err(e) = self + .pproxy_command_tx + .send(( + PProxyCommand::CleanTunnel { + peer_id: self.peer_id, + tunnel_id: self.tunnel_id, + }, + tx, + )) .await - .unwrap(); + { + tracing::error!("CleanTunnel tx failed: {e:?}"); + } + + match rx.await { + Err(e) => { + tracing::error!("CleanTunnel rx failed: {e:?}"); + } + Ok(Err(e)) => { + tracing::error!("CleanTunnel response failed: {e:?}"); + } + Ok(Ok(resp)) => match resp { + PProxyCommandResponse::CleanTunnel {} => {} + other_resp => { + tracing::error!( + "CleanTunnel got invalid pproxy command response {other_resp:?}" + ); + } + }, + } } }