Skip to content

Commit

Permalink
Replace http server with tcp tunnel
Browse files Browse the repository at this point in the history
  • Loading branch information
Ma233 committed Jul 1, 2024
1 parent 2835c74 commit b3480a5
Show file tree
Hide file tree
Showing 13 changed files with 723 additions and 525 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ edition = "2021"

[dependencies]
base64 = "0.22.1"
bytes = "1.6.0"
clap = "4.5.4"
futures = "0.3.30"
futures-util = "0.3.30"
Expand All @@ -23,6 +22,7 @@ percent-encoding = "2.3.1"
prost = "0.12.4"
thiserror = "1.0.60"
tokio = { version = "1.37.0", features = ["rt-multi-thread"] }
tokio-util = "0.7.11"
tonic = "0.11.0"
tonic-web = "0.11.0"
tracing = "0.1.40"
Expand Down
1 change: 1 addition & 0 deletions buf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ version: v1
lint:
except:
- PACKAGE_DIRECTORY_MATCH
- DIRECTORY_SAME_PACKAGE
3 changes: 2 additions & 1 deletion build.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
let protos = ["proto/command_v1.proto", "proto/tunnel_v1.proto"];
tonic_build::configure()
.protoc_arg("--experimental_allow_proto3_optional")
.compile(&["proto/command_v1.proto"], &["proto"])?;
.compile(&protos, &["proto"])?;
Ok(())
}
11 changes: 6 additions & 5 deletions proto/command_v1.proto
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,17 @@ message AddPeerResponse {
string peer_id = 1;
}

message RequestHttpServerRequest {
message CreateTunnelServerRequest {
string peer_id = 1;
bytes data = 2;
optional string address = 2;
}

message RequestHttpServerResponse {
bytes data = 1;
message CreateTunnelServerResponse {
string peer_id = 1;
string address = 2;
}

service CommandService {
rpc AddPeer(AddPeerRequest) returns (AddPeerResponse);
rpc RequestHttpServer(RequestHttpServerRequest) returns (RequestHttpServerResponse);
rpc CreateTunnelServer(CreateTunnelServerRequest) returns (CreateTunnelServerResponse);
}
16 changes: 16 additions & 0 deletions proto/tunnel_v1.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
syntax = "proto3";
package tunnel.v1;

enum TunnelCommand {
TUNNEL_COMMAND_UNSPECIFIED = 0;
TUNNEL_COMMAND_CONNECT = 1;
TUNNEL_COMMAND_CONNECT_RESP = 2;
TUNNEL_COMMAND_INBOUND_PACKAGE = 3;
TUNNEL_COMMAND_OUTBOUND_PACKAGE = 4;
}

message Tunnel {
string tunnel_id = 1;
TunnelCommand command = 2;
optional bytes data = 3;
}
9 changes: 5 additions & 4 deletions src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,15 @@ impl proto::command_service_server::CommandService for PProxyCommander {
.map_err(|e| tonic::Status::internal(format!("{:?}", e)))
}

async fn request_http_server(
async fn create_tunnel_server(
&self,
request: tonic::Request<proto::RequestHttpServerRequest>,
) -> std::result::Result<tonic::Response<proto::RequestHttpServerResponse>, tonic::Status> {
request: tonic::Request<proto::CreateTunnelServerRequest>,
) -> std::result::Result<tonic::Response<proto::CreateTunnelServerResponse>, tonic::Status>
{
trace!("handle request: {:?}", request);

self.handle
.request_http_server(request.into_inner())
.create_tunnel_server(request.into_inner())
.await
.map(Response::new)
.map_err(|e| tonic::Status::internal(format!("{:?}", e)))
Expand Down
72 changes: 68 additions & 4 deletions src/error.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
use std::io::ErrorKind as IOErrorKind;

#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum Error {
#[error("Multiaddr {0} parse error")]
#[error("Multiaddr parse error: {0}")]
MultiaddrParseError(String),
#[error("SocketAddr parse error: {0}")]
SocketAddrParseError(String),
#[error("Failed to extract peer id from multiaddr: {0}")]
FailedToExtractPeerIdFromMultiaddr(String),
#[error("PeerId {0} parse error")]
#[error("PeerId parse error: {0}")]
PeerIdParseError(String),
#[error("TunnelId parse error: {0}")]
TunnelIdParseError(String),
#[error("Essential task closed")]
EssentialTaskClosed,
#[error("Litep2p error: {0}")]
Expand All @@ -16,12 +23,44 @@ pub enum Error {
Httparse(#[from] httparse::Error),
#[error("Incomplete http request")]
IncompleteHttpRequest,
#[error("Protocol not support")]
ProtocolNotSupport,
#[error("Protocol not support: {0}")]
ProtocolNotSupport(String),
#[error("Io error: {0}")]
Io(#[from] std::io::Error),
#[error("Unexpected response type")]
UnexpectedResponseType,
#[error("Tunnel error: {0:?}")]
Tunnel(TunnelError),
#[error("Protobuf decode error: {0}")]
ProtobufDecode(#[from] prost::DecodeError),
}

/// A list specifying general categories of Tunnel error like [std::io::ErrorKind].
#[derive(Debug, Clone, Copy)]
#[repr(u8)]
#[non_exhaustive]
pub enum TunnelError {
/// Failed to send data to peer.
DataSendFailed = 1,
/// The connection timed out when dialing.
ConnectionTimeout = 2,
/// Got [std::io::ErrorKind::ConnectionRefused] error from local stream.
ConnectionRefused = 3,
/// Got [std::io::ErrorKind::ConnectionAborted] error from local stream.
ConnectionAborted = 4,
/// Got [std::io::ErrorKind::ConnectionReset] error from local stream.
ConnectionReset = 5,
/// Got [std::io::ErrorKind::NotConnected] error from local stream.
NotConnected = 6,
/// The connection is closed by peer.
ConnectionClosed = 7,
/// A socket address could not be bound because the address is already in
/// use elsewhere.
AddrInUse = 8,
/// Tunnel already listened.
TunnelInUse = 9,
/// Unknown [std::io::ErrorKind] error.
Unknown = u8::MAX,
}

impl<T> From<tokio::sync::mpsc::error::SendError<T>> for Error {
Expand All @@ -41,3 +80,28 @@ impl From<litep2p::protocol::request_response::RequestResponseError> for Error {
Error::Litep2pRequestResponseError(err)
}
}

impl From<TunnelError> for Error {
fn from(error: TunnelError) -> Self {
Error::Tunnel(error)
}
}

impl From<IOErrorKind> for TunnelError {
fn from(kind: IOErrorKind) -> TunnelError {
match kind {
IOErrorKind::ConnectionRefused => TunnelError::ConnectionRefused,
IOErrorKind::ConnectionAborted => TunnelError::ConnectionAborted,
IOErrorKind::ConnectionReset => TunnelError::ConnectionReset,
IOErrorKind::NotConnected => TunnelError::NotConnected,
IOErrorKind::AddrInUse => TunnelError::AddrInUse,
_ => TunnelError::Unknown,
}
}
}

impl From<std::io::Error> for TunnelError {
fn from(error: std::io::Error) -> TunnelError {
error.kind().into()
}
}
Loading

0 comments on commit b3480a5

Please sign in to comment.