Skip to content

Commit

Permalink
feat: replace sleep with notifier for tunnel open
Browse files Browse the repository at this point in the history
  • Loading branch information
Ma233 committed Jul 14, 2024
1 parent 9a7e185 commit 04c2d7a
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 19 deletions.
5 changes: 2 additions & 3 deletions src/command.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use tonic::Request;
use tonic::Response;
use tonic::Status;
use tracing::trace;

use crate::PProxyHandle;

Expand Down Expand Up @@ -29,7 +28,7 @@ impl proto::command_service_server::CommandService for PProxyCommander {
&self,
request: Request<proto::AddPeerRequest>,
) -> Result<Response<proto::AddPeerResponse>, Status> {
trace!("handle request: {:?}", request);
tracing::trace!("handle request: {:?}", request);

self.handle
.add_peer(request.into_inner())
Expand All @@ -43,7 +42,7 @@ impl proto::command_service_server::CommandService for PProxyCommander {
request: tonic::Request<proto::CreateTunnelServerRequest>,
) -> std::result::Result<tonic::Response<proto::CreateTunnelServerResponse>, tonic::Status>
{
trace!("handle request: {:?}", request);
tracing::trace!("handle request: {:?}", request);

self.handle
.create_tunnel_server(request.into_inner())
Expand Down
8 changes: 2 additions & 6 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,12 @@ pub enum Error {
Litep2p(#[from] litep2p::Error),
#[error("Litep2p request response error: {0:?}")]
Litep2pRequestResponseError(litep2p::protocol::request_response::RequestResponseError),
#[error("Httparse error: {0}")]
Httparse(#[from] httparse::Error),
#[error("Incomplete http request")]
IncompleteHttpRequest,
#[error("Protocol not support: {0}")]
ProtocolNotSupport(String),
#[error("Io error: {0}")]
Io(#[from] std::io::Error),
#[error("Unexpected response type")]
UnexpectedResponseType,
#[error("Tunnel not waiting")]
TunnelNotWaiting(String),
#[error("Tunnel error: {0:?}")]
Tunnel(TunnelError),
#[error("Protobuf decode error: {0}")]
Expand Down
59 changes: 50 additions & 9 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ use futures::channel::oneshot;
use litep2p::crypto::ed25519::SecretKey;
use litep2p::protocol::request_response::DialOptions;
use litep2p::protocol::request_response::RequestResponseEvent;
use litep2p::types::RequestId;
use litep2p::PeerId;
use multiaddr::Multiaddr;
use multiaddr::Protocol;
use prost::Message;
use tokio::sync::mpsc;
use tracing::warn;

use crate::command::proto::AddPeerRequest;
use crate::command::proto::AddPeerResponse;
Expand Down Expand Up @@ -77,6 +77,7 @@ pub struct PProxy {
command_rx: mpsc::Receiver<(PProxyCommand, CommandNotifier)>,
p2p_server: P2pServer,
proxy_addr: Option<SocketAddr>,
outbound_ready_notifiers: HashMap<RequestId, CommandNotifier>,
inbound_tunnels: HashMap<(PeerId, TunnelId), Tunnel>,
tunnel_txs: HashMap<(PeerId, TunnelId), mpsc::Sender<Vec<u8>>>,
}
Expand Down Expand Up @@ -118,6 +119,7 @@ impl PProxy {
command_rx,
p2p_server: P2pServer::new(secret_key, server_addr),
proxy_addr,
outbound_ready_notifiers: HashMap::new(),
inbound_tunnels: HashMap::new(),
tunnel_txs: HashMap::new(),
},
Expand All @@ -138,14 +140,14 @@ impl PProxy {
event = self.p2p_server.next_event() => match event {
None => return,
Some(event) => if let Err(error) = self.handle_p2p_server_event(event).await {
warn!("failed to handle event: {:?}", error);
tracing::warn!("failed to handle event: {:?}", error);
}
},

command = self.command_rx.recv() => match command {
None => return,
Some((command, tx)) => if let Err(error) = self.handle_command(command, tx).await {
warn!("failed to handle command: {:?}", error);
tracing::warn!("failed to handle command: {:?}", error);
}
}
}
Expand All @@ -166,7 +168,7 @@ impl PProxy {
..
}) => {
let msg = proto::Tunnel::decode(request.as_slice())?;
tracing::debug!("received Tunnel msg: {:?}", msg);
tracing::debug!("received Tunnel request msg: {:?}", msg);

match msg.command() {
proto::TunnelCommand::Connect => {
Expand Down Expand Up @@ -221,11 +223,49 @@ impl PProxy {

_ => {
return Err(Error::ProtocolNotSupport(
"Wrong tunnel command".to_string(),
"Wrong tunnel request command".to_string(),
));
}
}
}
P2pServerEvent::TunnelEvent(RequestResponseEvent::ResponseReceived {
peer,
request_id,
response,
..
}) => {
// This is response of TunnelCommand::Package
if response.is_empty() {
return Ok(());
}

let msg = proto::Tunnel::decode(response.as_slice())?;
tracing::debug!("received Tunnel response msg: {:?}", msg);

match msg.command() {
proto::TunnelCommand::ConnectResp => {
let tx = self
.outbound_ready_notifiers
.remove(&request_id)
.ok_or_else(|| {
Error::TunnelNotWaiting(format!(
"peer {}, tunnel {}",
peer, msg.tunnel_id
))
})?;

tx.send(Ok(PProxyCommandResponse::SendConnectCommand {}))
.map_err(|_| Error::EssentialTaskClosed)?;
}

_ => {
return Err(Error::ProtocolNotSupport(
"Wrong tunnel response command".to_string(),
));
}
}
}

_ => {}
}

Expand Down Expand Up @@ -286,15 +326,16 @@ impl PProxy {
}
.encode_to_vec();

self.p2p_server
tracing::info!("send connect command to peer_id: {:?}", peer_id);
let request_id = self
.p2p_server
.tunnel_handle
.send_request(peer_id, request, DialOptions::Dial)
.await?;

tracing::info!("send connect command to peer_id: {:?}", peer_id);
self.outbound_ready_notifiers.insert(request_id, tx);

tx.send(Ok(PProxyCommandResponse::SendConnectCommand {}))
.map_err(|_| Error::EssentialTaskClosed)
Ok(())
}

async fn on_send_outbound_package_command(
Expand Down
1 change: 0 additions & 1 deletion src/tunnel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,6 @@ impl TunnelServerListener {
Ok(Ok(_resp)) => {}
}

tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
if let Err(e) = tunnel.listen(stream, tunnel_rx).await {
tracing::error!("Tunnel listen failed: {e:?}");
continue;
Expand Down

0 comments on commit 04c2d7a

Please sign in to comment.