Skip to content

Commit

Permalink
feat: convert auth client to access client
Browse files Browse the repository at this point in the history
  • Loading branch information
Ma233 committed Aug 19, 2024
1 parent c710a80 commit c54be33
Show file tree
Hide file tree
Showing 7 changed files with 167 additions and 68 deletions.
22 changes: 11 additions & 11 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions proto/command_v1.proto
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,15 @@ message ConnectRelayResponse {
string relaied_multiaddr = 1;
}

message ExpirePeerAccessRequest {
string peer_id = 1;
}

message ExpirePeerAccessResponse {}

service CommandService {
rpc AddPeer(AddPeerRequest) returns (AddPeerResponse);
rpc CreateTunnelServer(CreateTunnelServerRequest) returns (CreateTunnelServerResponse);
rpc ConnectRelay(ConnectRelayRequest) returns (ConnectRelayResponse);
rpc ExpirePeerAccess(ExpirePeerAccessRequest) returns (ExpirePeerAccessResponse);
}
80 changes: 80 additions & 0 deletions src/access.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
use std::collections::HashMap;
use std::time::Duration;
use std::time::Instant;

use libp2p::PeerId;
use serde::Deserialize;

const ACCESS_TTL: Duration = Duration::from_secs(10 * 60);

pub struct AccessClient {
local_id: libp2p::PeerId,
endpoint: reqwest::Url,
client: reqwest::Client,
cache: HashMap<PeerId, (bool, Instant)>,
}

#[derive(Deserialize)]
struct AccessClientResponse {
data: bool,
}

impl AccessClient {
pub fn new(local_id: libp2p::PeerId, endpoint: reqwest::Url) -> AccessClient {
AccessClient {
local_id,
endpoint,
client: reqwest::Client::new(),
cache: HashMap::default(),
}
}

async fn request_endpoint(&mut self, peer: &PeerId) -> Result<bool, reqwest::Error> {
let url = self.endpoint.join("access-control").unwrap();
let params = [
("device", self.local_id.to_string()),
("token", peer.to_string()),
];

let response = self
.client
.get(url)
.query(&params)
.send()
.await?
.json::<AccessClientResponse>()
.await?;

Ok(response.data)
}

pub async fn is_valid(&mut self, peer: &PeerId) -> bool {
self.cache
.retain(|_, (_, created)| created.elapsed() < ACCESS_TTL);

if let Some((valid, _)) = self.cache.get(peer) {
return *valid;
}

match self.request_endpoint(peer).await {
Err(e) => {
tracing::error!(
"error while requesting access endpoint (will return false) for {}: {}",
peer,
e
);
false
}

Ok(valid) => {
self.cache.insert(*peer, (valid, Instant::now()));
valid
}
}
}

pub fn expire(&mut self, token: &PeerId) {
tracing::debug!("expire token: {} from cache", token);
self.cache.remove(token);
}
}
41 changes: 0 additions & 41 deletions src/auth.rs

This file was deleted.

13 changes: 13 additions & 0 deletions src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,17 @@ impl proto::command_service_server::CommandService for PProxyCommander {
.map(Response::new)
.map_err(|e| tonic::Status::internal(format!("{:?}", e)))
}

async fn expire_peer_access(
&self,
request: tonic::Request<proto::ExpirePeerAccessRequest>,
) -> std::result::Result<tonic::Response<proto::ExpirePeerAccessResponse>, tonic::Status> {
tracing::debug!("handle request: {:?}", request);

self.handle
.expire_peer_access(request.into_inner())
.await
.map(Response::new)
.map_err(|e| tonic::Status::internal(format!("{:?}", e)))
}
}
58 changes: 49 additions & 9 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@ use libp2p::PeerId;
use libp2p::Swarm;
use tokio::sync::mpsc;

use crate::auth::AuthClient;
use crate::access::AccessClient;
use crate::command::proto::AddPeerRequest;
use crate::command::proto::AddPeerResponse;
use crate::command::proto::ConnectRelayRequest;
use crate::command::proto::ConnectRelayResponse;
use crate::command::proto::CreateTunnelServerRequest;
use crate::command::proto::CreateTunnelServerResponse;
use crate::command::proto::ExpirePeerAccessRequest;
use crate::command::proto::ExpirePeerAccessResponse;
use crate::p2p::PProxyNetworkBehaviour;
use crate::p2p::PProxyNetworkBehaviourEvent;
use crate::tunnel::proto;
Expand All @@ -30,7 +32,7 @@ use crate::tunnel::Tunnel;
use crate::tunnel::TunnelServer;
use crate::types::*;

mod auth;
mod access;
pub mod command;
pub mod error;
mod p2p;
Expand Down Expand Up @@ -75,13 +77,17 @@ pub enum PProxyCommand {
tunnel_id: TunnelId,
data: Vec<u8>,
},
ExpirePeerAccess {
peer_id: PeerId,
},
}

pub enum PProxyCommandResponse {
AddPeer { peer_id: PeerId },
ConnectRelay { relaied_multiaddr: Multiaddr },
SendConnectCommand {},
SendOutboundPackageCommand {},
ExpirePeerAccess {},
}

pub struct PProxy {
Expand All @@ -92,7 +98,7 @@ pub struct PProxy {
outbound_ready_notifiers: HashMap<request_response::OutboundRequestId, CommandNotifier>,
inbound_tunnels: HashMap<(PeerId, TunnelId), Tunnel>,
tunnel_txs: HashMap<(PeerId, TunnelId), mpsc::Sender<Vec<u8>>>,
auth_client: Option<AuthClient>,
access_client: Option<AccessClient>,
}

pub struct PProxyHandle {
Expand All @@ -106,13 +112,13 @@ impl PProxy {
keypair: Keypair,
listen_addr: SocketAddr,
proxy_addr: Option<SocketAddr>,
auth_server_endpoint: Option<reqwest::Url>,
access_server_endpoint: Option<reqwest::Url>,
) -> Result<(Self, PProxyHandle)> {
let (command_tx, command_rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
let swarm = crate::p2p::new_swarm(keypair, listen_addr)
.map_err(|e| Error::Libp2pSwarmCreateError(e.to_string()))?;
let auth_client =
auth_server_endpoint.map(|endpoint| AuthClient::new(*swarm.local_peer_id(), endpoint));
let access_client = access_server_endpoint
.map(|endpoint| AccessClient::new(*swarm.local_peer_id(), endpoint));

Ok((
Self {
Expand All @@ -123,7 +129,7 @@ impl PProxy {
outbound_ready_notifiers: HashMap::new(),
inbound_tunnels: HashMap::new(),
tunnel_txs: HashMap::new(),
auth_client,
access_client,
},
PProxyHandle {
command_tx,
Expand Down Expand Up @@ -187,8 +193,8 @@ impl PProxy {
request_response::Message::Request {
request, channel, ..
} => {
if let Some(auth_client) = &mut self.auth_client {
if !auth_client.is_valid(&peer.to_string()).await? {
if let Some(ac) = &mut self.access_client {
if !ac.is_valid(&peer).await {
// TODO: Manage tunnel lifecycle
return Err(Error::Tunnel(error::TunnelError::ConnectionClosed));
}
Expand Down Expand Up @@ -329,6 +335,9 @@ impl PProxy {
self.on_send_outbound_package_command(peer_id, tunnel_id, data, tx)
.await
}
PProxyCommand::ExpirePeerAccess { peer_id } => {
self.on_expire_peer_access(peer_id, tx).await
}
}
}

Expand Down Expand Up @@ -405,6 +414,17 @@ impl PProxy {
tx.send(Ok(PProxyCommandResponse::SendOutboundPackageCommand {}))
.map_err(|_| Error::EssentialTaskClosed)
}

async fn on_expire_peer_access(&mut self, peer_id: PeerId, tx: CommandNotifier) -> Result<()> {
if let Some(ref mut ac) = self.access_client {
ac.expire(&peer_id);
}

tx.send(Ok(PProxyCommandResponse::ExpirePeerAccess {}))
.map_err(|_| Error::EssentialTaskClosed)?;

Ok(())
}
}

impl PProxyHandle {
Expand Down Expand Up @@ -495,6 +515,26 @@ impl PProxyHandle {
_ => Err(Error::UnexpectedResponseType),
}
}

pub async fn expire_peer_access(
&self,
request: ExpirePeerAccessRequest,
) -> Result<ExpirePeerAccessResponse> {
let (tx, rx) = oneshot::channel();

let peer_id = request
.peer_id
.parse()
.map_err(|_| Error::PeerIdParseError(request.peer_id))?;

self.command_tx
.send((PProxyCommand::ExpirePeerAccess { peer_id }, tx))
.await?;

rx.await??;

Ok(ExpirePeerAccessResponse {})
}
}

fn extract_peer_id_from_multiaddr(multiaddr: &Multiaddr) -> Result<PeerId> {
Expand Down
Loading

0 comments on commit c54be33

Please sign in to comment.