From 04c2d7a2fb9ca57ba6de1adf8ca0df8082be71e1 Mon Sep 17 00:00:00 2001 From: magine Date: Sun, 14 Jul 2024 17:33:58 +0800 Subject: [PATCH] feat: replace sleep with notifier for tunnel open --- src/command.rs | 5 ++--- src/error.rs | 8 ++----- src/lib.rs | 59 ++++++++++++++++++++++++++++++++++++++++++-------- src/tunnel.rs | 1 - 4 files changed, 54 insertions(+), 19 deletions(-) diff --git a/src/command.rs b/src/command.rs index 6ec4a73..1a84b9b 100644 --- a/src/command.rs +++ b/src/command.rs @@ -1,7 +1,6 @@ use tonic::Request; use tonic::Response; use tonic::Status; -use tracing::trace; use crate::PProxyHandle; @@ -29,7 +28,7 @@ impl proto::command_service_server::CommandService for PProxyCommander { &self, request: Request, ) -> Result, Status> { - trace!("handle request: {:?}", request); + tracing::trace!("handle request: {:?}", request); self.handle .add_peer(request.into_inner()) @@ -43,7 +42,7 @@ impl proto::command_service_server::CommandService for PProxyCommander { request: tonic::Request, ) -> std::result::Result, tonic::Status> { - trace!("handle request: {:?}", request); + tracing::trace!("handle request: {:?}", request); self.handle .create_tunnel_server(request.into_inner()) diff --git a/src/error.rs b/src/error.rs index 69ef0dd..90e936b 100644 --- a/src/error.rs +++ b/src/error.rs @@ -19,16 +19,12 @@ pub enum Error { Litep2p(#[from] litep2p::Error), #[error("Litep2p request response error: {0:?}")] Litep2pRequestResponseError(litep2p::protocol::request_response::RequestResponseError), - #[error("Httparse error: {0}")] - Httparse(#[from] httparse::Error), - #[error("Incomplete http request")] - IncompleteHttpRequest, #[error("Protocol not support: {0}")] ProtocolNotSupport(String), - #[error("Io error: {0}")] - Io(#[from] std::io::Error), #[error("Unexpected response type")] UnexpectedResponseType, + #[error("Tunnel not waiting")] + TunnelNotWaiting(String), #[error("Tunnel error: {0:?}")] Tunnel(TunnelError), #[error("Protobuf decode error: {0}")] diff --git a/src/lib.rs b/src/lib.rs index d649e1d..996610e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,12 +8,12 @@ use futures::channel::oneshot; use litep2p::crypto::ed25519::SecretKey; use litep2p::protocol::request_response::DialOptions; use litep2p::protocol::request_response::RequestResponseEvent; +use litep2p::types::RequestId; use litep2p::PeerId; use multiaddr::Multiaddr; use multiaddr::Protocol; use prost::Message; use tokio::sync::mpsc; -use tracing::warn; use crate::command::proto::AddPeerRequest; use crate::command::proto::AddPeerResponse; @@ -77,6 +77,7 @@ pub struct PProxy { command_rx: mpsc::Receiver<(PProxyCommand, CommandNotifier)>, p2p_server: P2pServer, proxy_addr: Option, + outbound_ready_notifiers: HashMap, inbound_tunnels: HashMap<(PeerId, TunnelId), Tunnel>, tunnel_txs: HashMap<(PeerId, TunnelId), mpsc::Sender>>, } @@ -118,6 +119,7 @@ impl PProxy { command_rx, p2p_server: P2pServer::new(secret_key, server_addr), proxy_addr, + outbound_ready_notifiers: HashMap::new(), inbound_tunnels: HashMap::new(), tunnel_txs: HashMap::new(), }, @@ -138,14 +140,14 @@ impl PProxy { event = self.p2p_server.next_event() => match event { None => return, Some(event) => if let Err(error) = self.handle_p2p_server_event(event).await { - warn!("failed to handle event: {:?}", error); + tracing::warn!("failed to handle event: {:?}", error); } }, command = self.command_rx.recv() => match command { None => return, Some((command, tx)) => if let Err(error) = self.handle_command(command, tx).await { - warn!("failed to handle command: {:?}", error); + tracing::warn!("failed to handle command: {:?}", error); } } } @@ -166,7 +168,7 @@ impl PProxy { .. }) => { let msg = proto::Tunnel::decode(request.as_slice())?; - tracing::debug!("received Tunnel msg: {:?}", msg); + tracing::debug!("received Tunnel request msg: {:?}", msg); match msg.command() { proto::TunnelCommand::Connect => { @@ -221,11 +223,49 @@ impl PProxy { _ => { return Err(Error::ProtocolNotSupport( - "Wrong tunnel command".to_string(), + "Wrong tunnel request command".to_string(), )); } } } + P2pServerEvent::TunnelEvent(RequestResponseEvent::ResponseReceived { + peer, + request_id, + response, + .. + }) => { + // This is response of TunnelCommand::Package + if response.is_empty() { + return Ok(()); + } + + let msg = proto::Tunnel::decode(response.as_slice())?; + tracing::debug!("received Tunnel response msg: {:?}", msg); + + match msg.command() { + proto::TunnelCommand::ConnectResp => { + let tx = self + .outbound_ready_notifiers + .remove(&request_id) + .ok_or_else(|| { + Error::TunnelNotWaiting(format!( + "peer {}, tunnel {}", + peer, msg.tunnel_id + )) + })?; + + tx.send(Ok(PProxyCommandResponse::SendConnectCommand {})) + .map_err(|_| Error::EssentialTaskClosed)?; + } + + _ => { + return Err(Error::ProtocolNotSupport( + "Wrong tunnel response command".to_string(), + )); + } + } + } + _ => {} } @@ -286,15 +326,16 @@ impl PProxy { } .encode_to_vec(); - self.p2p_server + tracing::info!("send connect command to peer_id: {:?}", peer_id); + let request_id = self + .p2p_server .tunnel_handle .send_request(peer_id, request, DialOptions::Dial) .await?; - tracing::info!("send connect command to peer_id: {:?}", peer_id); + self.outbound_ready_notifiers.insert(request_id, tx); - tx.send(Ok(PProxyCommandResponse::SendConnectCommand {})) - .map_err(|_| Error::EssentialTaskClosed) + Ok(()) } async fn on_send_outbound_package_command( diff --git a/src/tunnel.rs b/src/tunnel.rs index 27b92b1..85b1309 100644 --- a/src/tunnel.rs +++ b/src/tunnel.rs @@ -156,7 +156,6 @@ impl TunnelServerListener { Ok(Ok(_resp)) => {} } - tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; if let Err(e) = tunnel.listen(stream, tunnel_rx).await { tracing::error!("Tunnel listen failed: {e:?}"); continue;