Skip to content

Commit

Permalink
feat: add connect_relay in command line
Browse files Browse the repository at this point in the history
  • Loading branch information
Ma233 committed Jul 30, 2024
1 parent 57f8019 commit 6802e50
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 36 deletions.
6 changes: 3 additions & 3 deletions proto/command_v1.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ syntax = "proto3";
package command.v1;

message AddPeerRequest {
string address = 1;
string multiaddr = 1;
optional string peer_id = 2;
}

Expand All @@ -21,11 +21,11 @@ message CreateTunnelServerResponse {
}

message ConnectRelayRequest {
string address = 1;
string multiaddr = 1;
}

message ConnectRelayResponse {
string relaied_address = 1;
string relaied_multiaddr = 1;
}

service CommandService {
Expand Down
60 changes: 31 additions & 29 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::sync::Mutex;
use futures::channel::oneshot;
use futures::StreamExt;
use libp2p::identity::Keypair;
use libp2p::multiaddr;
use libp2p::multiaddr::Protocol;
use libp2p::request_response;
use libp2p::swarm::SwarmEvent;
use libp2p::Multiaddr;
Expand Down Expand Up @@ -58,11 +58,11 @@ type CommandNotifier = oneshot::Sender<CommandNotification>;
#[derive(Debug)]
pub enum PProxyCommand {
AddPeer {
address: Multiaddr,
multiaddr: Multiaddr,
peer_id: PeerId,
},
ConnectRelay {
address: Multiaddr,
multiaddr: Multiaddr,
},
SendConnectCommand {
peer_id: PeerId,
Expand All @@ -78,7 +78,7 @@ pub enum PProxyCommand {

pub enum PProxyCommandResponse {
AddPeer { peer_id: PeerId },
ConnectRelay { relaied_address: Multiaddr },
ConnectRelay { relaied_multiaddr: Multiaddr },
SendConnectCommand {},
SendOutboundPackageCommand {},
}
Expand Down Expand Up @@ -284,7 +284,7 @@ impl PProxy {
},

SwarmEvent::NewListenAddr { mut address, .. } => {
address.push(multiaddr::Protocol::P2p(*self.swarm.local_peer_id()));
address.push(Protocol::P2p(*self.swarm.local_peer_id()));
println!("Local node is listening on {address}");
}

Expand All @@ -296,10 +296,10 @@ impl PProxy {

async fn handle_command(&mut self, command: PProxyCommand, tx: CommandNotifier) -> Result<()> {
match command {
PProxyCommand::AddPeer { address, peer_id } => {
self.on_add_peer(address, peer_id, tx).await
PProxyCommand::AddPeer { multiaddr, peer_id } => {
self.on_add_peer(multiaddr, peer_id, tx).await
}
PProxyCommand::ConnectRelay { address } => self.on_connect_relay(address, tx).await,
PProxyCommand::ConnectRelay { multiaddr } => self.on_connect_relay(multiaddr, tx).await,
PProxyCommand::SendConnectCommand {
peer_id,
tunnel_id,
Expand All @@ -321,25 +321,27 @@ impl PProxy {

async fn on_add_peer(
&mut self,
addr: Multiaddr,
multiaddr: Multiaddr,
peer_id: PeerId,
tx: CommandNotifier,
) -> Result<()> {
self.swarm.add_peer_address(peer_id, addr);
self.swarm.add_peer_address(peer_id, multiaddr);

tx.send(Ok(PProxyCommandResponse::AddPeer { peer_id }))
.map_err(|_| Error::EssentialTaskClosed)
}

async fn on_connect_relay(&mut self, address: Multiaddr, tx: CommandNotifier) -> Result<()> {
let relaied_address = address
.with(multiaddr::Protocol::P2pCircuit)
.with(multiaddr::Protocol::P2p(*self.swarm.local_peer_id()));
async fn on_connect_relay(&mut self, multiaddr: Multiaddr, tx: CommandNotifier) -> Result<()> {
let relaied_multiaddr = multiaddr
.with(Protocol::P2pCircuit)
.with(Protocol::P2p(*self.swarm.local_peer_id()));

self.swarm.listen_on(relaied_address.clone())?;
self.swarm.listen_on(relaied_multiaddr.clone())?;

tx.send(Ok(PProxyCommandResponse::ConnectRelay { relaied_address }))
.map_err(|_| Error::EssentialTaskClosed)
tx.send(Ok(PProxyCommandResponse::ConnectRelay {
relaied_multiaddr,
}))
.map_err(|_| Error::EssentialTaskClosed)
}

async fn on_send_connect_command(
Expand Down Expand Up @@ -396,13 +398,13 @@ impl PProxyHandle {
pub async fn add_peer(&self, request: AddPeerRequest) -> Result<AddPeerResponse> {
let (tx, rx) = oneshot::channel();

let address: Multiaddr = request
.address
let multiaddr: Multiaddr = request
.multiaddr
.parse()
.map_err(|_| Error::MultiaddrParseError(request.address.clone()))?;
.map_err(|_| Error::MultiaddrParseError(request.multiaddr.clone()))?;

let peer_id = request.peer_id.map_or_else(
|| extract_peer_id_from_multiaddr(&address),
|| extract_peer_id_from_multiaddr(&multiaddr),
|peer_id| {
peer_id
.parse()
Expand All @@ -411,7 +413,7 @@ impl PProxyHandle {
)?;

self.command_tx
.send((PProxyCommand::AddPeer { address, peer_id }, tx))
.send((PProxyCommand::AddPeer { multiaddr, peer_id }, tx))
.await?;

let response = rx.await??;
Expand Down Expand Up @@ -462,20 +464,20 @@ impl PProxyHandle {
) -> Result<ConnectRelayResponse> {
let (tx, rx) = oneshot::channel();

let address: Multiaddr = request
.address
let multiaddr: Multiaddr = request
.multiaddr
.parse()
.map_err(|_| Error::MultiaddrParseError(request.address.clone()))?;
.map_err(|_| Error::MultiaddrParseError(request.multiaddr.clone()))?;

self.command_tx
.send((PProxyCommand::ConnectRelay { address }, tx))
.send((PProxyCommand::ConnectRelay { multiaddr }, tx))
.await?;

let response = rx.await??;

match response {
PProxyCommandResponse::ConnectRelay { relaied_address } => Ok(ConnectRelayResponse {
relaied_address: relaied_address.to_string(),
PProxyCommandResponse::ConnectRelay { relaied_multiaddr } => Ok(ConnectRelayResponse {
relaied_multiaddr: relaied_multiaddr.to_string(),
}),
_ => Err(Error::UnexpectedResponseType),
}
Expand All @@ -485,7 +487,7 @@ impl PProxyHandle {
fn extract_peer_id_from_multiaddr(multiaddr: &Multiaddr) -> Result<PeerId> {
let protocol = multiaddr.iter().last();

let Some(multiaddr::Protocol::P2p(peer_id)) = protocol else {
let Some(Protocol::P2p(peer_id)) = protocol else {
return Err(Error::FailedToExtractPeerIdFromMultiaddr(
multiaddr.to_string(),
));
Expand Down
59 changes: 55 additions & 4 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use clap::ArgMatches;
use clap::Command;
use dephy_pproxy::command::proto::command_service_client::CommandServiceClient;
use dephy_pproxy::command::proto::AddPeerRequest;
use dephy_pproxy::command::proto::ConnectRelayRequest;
use dephy_pproxy::command::proto::CreateTunnelServerRequest;
use dephy_pproxy::command::PProxyCommander;
use dephy_pproxy::PProxy;
Expand Down Expand Up @@ -66,7 +67,7 @@ fn parse_args() -> Command {
.long("tunnel-server-addr")
.num_args(1)
.action(ArgAction::Set)
.help("Tunnel server address, if not set a random port will be used"),
.help("Tunnel server address. If not set, a random port will be used"),
)
.arg(
Arg::new("PEER_MULTIADDR")
Expand All @@ -77,10 +78,30 @@ fn parse_args() -> Command {
.help("The multiaddr of the remote peer"),
);

let connect_relay = Command::new("connect_relay")
.about("Connect to a relay server")
.arg(
Arg::new("COMMANDER_SERVER_ADDR")
.long("commander-server-addr")
.num_args(1)
.default_value("127.0.0.1:10086")
.action(ArgAction::Set)
.help("Commander server address"),
)
.arg(
Arg::new("RELAY_MULTIADDR")
.long("relay-multiaddr")
.num_args(1)
.action(ArgAction::Set)
.required(true)
.help("Relay server multiaddr"),
);

app = app
.arg_required_else_help(true)
.subcommand(serve)
.subcommand(create_tunnel_server);
.subcommand(create_tunnel_server)
.subcommand(connect_relay);

app
}
Expand Down Expand Up @@ -151,15 +172,15 @@ async fn create_tunnel_server(args: &ArgMatches) {
.get_one::<String>("PEER_MULTIADDR")
.unwrap()
.parse::<Multiaddr>()
.expect("Missing peer multiaddr");
.expect("Invalid peer multiaddr");

let mut client = CommandServiceClient::connect(format!("http://{}", commander_server_addr))
.await
.expect("Connect to commander server failed");

let pp_response = client
.add_peer(AddPeerRequest {
address: peer_multiaddr.to_string(),
multiaddr: peer_multiaddr.to_string(),
peer_id: None,
})
.await
Expand All @@ -180,6 +201,33 @@ async fn create_tunnel_server(args: &ArgMatches) {
println!("tunnel_server_addr: {}", pp_response.address);
}

async fn connect_relay(args: &ArgMatches) {
let commander_server_addr = args
.get_one::<String>("COMMANDER_SERVER_ADDR")
.unwrap()
.parse::<SocketAddr>()
.expect("Invalid command server address");
let relay_multiaddr = args
.get_one::<String>("RELAY_MULTIADDR")
.unwrap()
.parse::<Multiaddr>()
.expect("Invalid relay multiaddr");

let mut client = CommandServiceClient::connect(format!("http://{}", commander_server_addr))
.await
.expect("Connect to commander server failed");

let pp_response = client
.connect_relay(ConnectRelayRequest {
multiaddr: relay_multiaddr.to_string(),
})
.await
.expect("Connect relay failed")
.into_inner();

println!("relaied_multiaddr: {}", pp_response.relaied_multiaddr);
}

#[tokio::main]
async fn main() {
let _ = tracing_subscriber::fmt()
Expand All @@ -195,6 +243,9 @@ async fn main() {
Some(("create_tunnel_server", args)) => {
create_tunnel_server(args).await;
}
Some(("connect_relay", args)) => {
connect_relay(args).await;
}
_ => unreachable!(),
}
}

0 comments on commit 6802e50

Please sign in to comment.