diff --git a/Cargo.toml b/Cargo.toml index 0aac40b4fc..c437501f2f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,16 +50,7 @@ spmc = "0.3" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" tokio = { version = "1", features = [ - "fs", - "macros", - "net", - "io-std", - "io-util", - "rt", - "rt-multi-thread", # so examples can use #[tokio::main] - "sync", - "time", - "test-util", + "full" ] } tokio-test = "0.4" tokio-util = "0.7.10" @@ -70,10 +61,10 @@ default = [] # Easily turn it all on full = [ - "client", - "http1", - "http2", - "server", + "client", + "http1", + "http2", + "server", ] # HTTP versions @@ -108,6 +99,11 @@ incremental = false codegen-units = 1 incremental = false +[[example]] +name = "multi_layer_proxy" +path = "examples/multi_layer_proxy.rs" +required-features = ["full"] + [[example]] name = "client" path = "examples/client.rs" diff --git a/examples/README.md b/examples/README.md index de38911e9c..e798cb2ade 100644 --- a/examples/README.md +++ b/examples/README.md @@ -1,10 +1,11 @@ # Examples of using hyper -These examples show how to do common tasks using `hyper`. You may also find the [Guides](https://hyper.rs/guides/1/) helpful. +These examples show how to do common tasks using `hyper`. You may also find the [Guides](https://hyper.rs/guides/1/) +helpful. If you checkout this repository, you can run any of the examples with the command: - `cargo run --example {example_name} --features="full"` +`cargo run --example {example_name} --features="full"` ### Dependencies @@ -28,9 +29,11 @@ futures-util = { version = "0.3", default-features = false } ### Clients -* [`client`](client.rs) - A simple CLI http client that requests the url passed in parameters and outputs the response content and details to the stdout, reading content chunk-by-chunk. +* [`client`](client.rs) - A simple CLI http client that requests the url passed in parameters and outputs the response + content and details to the stdout, reading content chunk-by-chunk. -* [`client_json`](client_json.rs) - A simple program that GETs some json, reads the body asynchronously, parses it with serde and outputs the result. +* [`client_json`](client_json.rs) - A simple program that GETs some json, reads the body asynchronously, parses it with + serde and outputs the result. ### Servers @@ -42,22 +45,36 @@ futures-util = { version = "0.3", default-features = false } * [`gateway`](gateway.rs) - A server gateway (reverse proxy) that proxies to the `hello` service above. -* [`graceful_shutdown`](graceful_shutdown.rs) - A server that has a timeout for incoming connections and does graceful connection shutdown. +* [`graceful_shutdown`](graceful_shutdown.rs) - A server that has a timeout for incoming connections and does graceful + connection shutdown. -* [`http_proxy`](http_proxy.rs) - A simple HTTP(S) proxy that handle and upgrade `CONNECT` requests and then proxy data between client and remote server. +* [`http_proxy`](http_proxy.rs) - A simple HTTP(S) proxy that handle and upgrade `CONNECT` requests and then proxy data + between client and remote server. * [`multi_server`](multi_server.rs) - A server that listens to two different ports, a different `Service` per port. -* [`params`](params.rs) - A webserver that accept a form, with a name and a number, checks the parameters are presents and validates the input. +* [`params`](params.rs) - A webserver that accept a form, with a name and a number, checks the parameters are presents + and validates the input. -* [`send_file`](send_file.rs) - A server that sends back content of files using tokio-util to read the files asynchronously. +* [`send_file`](send_file.rs) - A server that sends back content of files using tokio-util to read the files + asynchronously. -* [`service_struct_impl`](service_struct_impl.rs) - A struct that manually implements the `Service` trait and uses a shared counter across requests. +* [`service_struct_impl`](service_struct_impl.rs) - A struct that manually implements the `Service` trait and uses a + shared counter across requests. -* [`single_threaded`](single_threaded.rs) - A server only running on 1 thread, so it can make use of `!Send` app state (like an `Rc` counter). +* [`single_threaded`](single_threaded.rs) - A server only running on 1 thread, so it can make use of `!Send` app state ( + like an `Rc` counter). -* [`state`](state.rs) - A webserver showing basic state sharing among requests. A counter is shared, incremented for every request, and every response is sent the last count. +* [`state`](state.rs) - A webserver showing basic state sharing among requests. A counter is shared, incremented for + every request, and every response is sent the last count. * [`upgrades`](upgrades.rs) - A server and client demonstrating how to do HTTP upgrades (such as WebSockets). -* [`web_api`](web_api.rs) - A server consisting in a service that returns incoming POST request's content in the response in uppercase and a service that calls the first service and includes the first service response in its own response. +* [`web_api`](web_api.rs) - A server consisting in a service that returns incoming POST request's content in the + response in uppercase and a service that calls the first service and includes the first service response in its own + response. + +* [`multi_layer_proxy`](multi_layer_proxy.rs) - In this configuration we have a `public` `master` server, which accepts + **outgoing** connections from `endpoint` servers. + The reason for using outgoing connections, is to avoid the need to open firewall ports. + The `master` will receive requests and forward them one of the servers connected to it. \ No newline at end of file diff --git a/examples/multi_layer_proxy.rs b/examples/multi_layer_proxy.rs new file mode 100644 index 0000000000..64c1458ad8 --- /dev/null +++ b/examples/multi_layer_proxy.rs @@ -0,0 +1,340 @@ +use futures_util::future::join_all; +use std::net::SocketAddr; +use tokio::net::TcpListener; + +#[path = "../benches/support/mod.rs"] +mod support; +use support::TokioIo; + +pub mod helpers { + use bytes::Bytes; + use http_body_util::combinators::BoxBody; + use http_body_util::{BodyExt, Empty, Full}; + + pub fn host_addr(uri: &http::Uri) -> Option { + uri.authority().and_then(|auth| Some(auth.to_string())) + } + + pub fn empty() -> BoxBody { + Empty::::new() + .map_err(|never| match never {}) + .boxed() + } + + pub fn full>(chunk: T) -> BoxBody { + Full::new(chunk.into()) + .map_err(|never| match never {}) + .boxed() + } +} + +pub mod proxy_endpoint { + use super::helpers::{empty, full, host_addr}; + use super::TokioIo; + use bytes::Bytes; + use http::header; + use http_body_util::{combinators::BoxBody, BodyExt}; + use hyper::client::conn::http1::Builder; + use hyper::server::conn::http1; + use hyper::service::service_fn; + use hyper::upgrade::Upgraded; + use hyper::{http, Method, Request, Response}; + use std::net::SocketAddr; + use std::str::FromStr; + use tokio::net::TcpStream; + + pub async fn proxy_endpoint_main() -> Result<(), Box> { + let addr = SocketAddr::from_str(format!("{}:{}", "127.0.0.1", "5000").as_str()) + .expect("Failed to parse address"); + while let Ok(stream) = TcpStream::connect(addr).await { + println!("Connected to {}", addr); + let (mut send_request, conn) = Builder::new().handshake(TokioIo::new(stream)).await?; + tokio::spawn(conn.with_upgrades()); + let req = Request::builder() + .method(Method::CONNECT) + .uri(addr.to_string()) + .header(header::UPGRADE, "") + .header("custom-header", "") + .body(empty()) + .unwrap(); + let res = send_request.send_request(req).await?; + let stream = hyper::upgrade::on(res).await?; + + if let Err(err) = http1::Builder::new() + .preserve_header_case(true) + .title_case_headers(true) + .serve_connection(stream, service_fn(proxy)) + .with_upgrades() + .await + { + println!("Failed to serve connection: {:?}", err); + } + } + Ok(()) + } + + async fn proxy( + req: Request, + ) -> Result>, hyper::Error> { + println!("req: {:?}", req); + + if Method::CONNECT == req.method() { + if let Some(addr) = host_addr(req.uri()) { + tokio::task::spawn(async move { + match hyper::upgrade::on(req).await { + Ok(upgraded) => { + if let Err(e) = tunnel(upgraded, addr).await { + println!("server io error: {}", e); + }; + } + Err(e) => println!("upgrade error: {}", e), + } + }); + + Ok(Response::new(empty())) + } else { + println!("CONNECT host is not socket addr: {:?}", req.uri()); + let mut resp = Response::new(full("CONNECT must be to a socket address")); + *resp.status_mut() = http::StatusCode::BAD_REQUEST; + + Ok(resp) + } + } else { + let host = req.uri().host().expect("uri has no host"); + let port = req.uri().port_u16().unwrap_or(80); + + let stream = TcpStream::connect((host, port)).await.unwrap(); + let io = TokioIo::new(stream); + + let (mut sender, conn) = Builder::new() + .preserve_header_case(true) + .title_case_headers(true) + .handshake(io) + .await?; + tokio::task::spawn(async move { + if let Err(err) = conn.await { + println!("Connection failed: {:?}", err); + } + }); + + let resp = sender.send_request(req).await?; + Ok(resp.map(|b| b.boxed())) + } + } + + async fn tunnel(upgraded: Upgraded, addr: String) -> std::io::Result<()> { + let mut server = TcpStream::connect(addr.clone()).await?; + let mut upgraded = TokioIo::new(upgraded); + let (from_client, from_server) = + tokio::io::copy_bidirectional(&mut upgraded, &mut server).await?; + println!( + "proxy_endpoint => from_client = {} | from_server = {}", + from_client, from_server + ); + Ok(()) + } +} + +pub mod proxy_master { + pub mod proxy_pool { + use hyper::upgrade::Upgraded; + use std::sync::Arc; + use tokio::sync::Mutex; + + #[derive(Debug, Clone, Default)] + pub struct ProxyPool { + pool: Arc>>, + } + + impl ProxyPool { + pub async fn put(&self, stream: Upgraded) { + self.pool.lock().await.push(stream); + } + + pub async fn get(&self) -> Option { + let mut lock = self.pool.lock().await; + + // We have all proxy connection now, so we can pick any of them by arbitrary condition + + // Just pop the last one for example + lock.pop() + } + } + } + + pub mod proxy_endpoint { + use super::super::helpers::empty; + use super::super::TokioIo; + use super::proxy_pool::ProxyPool; + use bytes::Bytes; + use http_body_util::combinators::BoxBody; + use hyper::server; + use hyper::service::service_fn; + use hyper::{Method, Request, Response}; + use tokio::net::TcpListener; + + pub async fn listen_for_proxies_connecting( + pool: ProxyPool, + proxy_listener: TcpListener, + ) -> () { + while let Ok((stream, addr)) = proxy_listener.accept().await { + let pool = pool.clone(); + tokio::spawn(async move { + if let Err(err) = server::conn::http1::Builder::new() + .preserve_header_case(true) + .title_case_headers(true) + .serve_connection( + TokioIo::new(stream), + service_fn(move |req| handle_proxy_request(pool.clone(), req)), + ) + .with_upgrades() + .await + { + println!("Failed to serve connection from addr {:?}: {:?}", addr, err); + } + }); + } + } + + async fn handle_proxy_request( + pool: ProxyPool, + req: Request, + ) -> Result>, hyper::Error> { + if Method::CONNECT == req.method() { + // Received an HTTP request like: + // ``` + // CONNECT www.domain.com:443 HTTP/1.1 + // Host: www.domain.com:443 + // Proxy-Connection: Keep-Alive + // ``` + // + // When HTTP method is CONNECT we should return an empty body + // then we can eventually upgrade the connection and talk a new protocol. + // + // Note: only after client received an empty body with STATUS_OK can the + // connection be upgraded, so we can't return a response inside + // `on_upgrade` future. + tokio::spawn(async move { + match hyper::upgrade::on(req).await { + Ok(upgraded) => { + // We can put proxy along with req here + pool.put(upgraded).await; + } + Err(e) => println!("upgrade error: {}", e), + } + }); + Ok(Response::new(empty())) + } else { + // TODO : Process request - can register proxy here + println!("NOT CONNECT request"); + Ok(Response::new(empty())) + } + } + } + + pub mod clients_endpoint { + use super::super::helpers::empty; + use super::super::TokioIo; + use super::proxy_pool::ProxyPool; + use bytes::Bytes; + use http_body_util::combinators::BoxBody; + use hyper::service::service_fn; + use hyper::{client, server, Method, Request, Response}; + use tokio::io::copy_bidirectional; + use tokio::net::TcpListener; + + pub async fn listen_for_clients_connecting(pool: ProxyPool, client_listener: TcpListener) { + while let Ok((stream, addr)) = client_listener.accept().await { + let pool = pool.clone(); + tokio::spawn(async move { + if let Err(err) = server::conn::http1::Builder::new() + .preserve_header_case(true) + .title_case_headers(true) + .serve_connection( + TokioIo::new(stream), + service_fn(move |req| handle_client_request(pool.clone(), req)), + ) + .with_upgrades() + .await + { + println!("Failed to serve connection from addr {:?}: {:?}", addr, err); + } + }); + } + } + + async fn handle_client_request( + pool: ProxyPool, + mut req: Request, + ) -> Result>, hyper::Error> { + if Method::CONNECT == req.method() { + tokio::spawn(async move { + match hyper::upgrade::on(&mut req).await { + Ok(upgraded) => { + let proxy = pool.get().await.unwrap(); + let (mut send_request, conn) = + client::conn::http1::Builder::new().handshake(proxy).await?; + tokio::spawn(conn.with_upgrades()); + let res = send_request.send_request(req).await?; + let stream = hyper::upgrade::on(res).await?; + let (from_client, from_server) = copy_bidirectional( + &mut TokioIo::new(upgraded), + &mut TokioIo::new(stream), + ) + .await + .unwrap(); + println!( + "proxy_master => from_client = {} | from_server = {}", + from_client, from_server + ); + } + Err(e) => println!("upgrade error = {}", e), + } + Ok::<(), hyper::Error>(()) + }); + Ok(Response::new(empty())) + } else { + Ok(Response::new(empty())) + } + } + } +} + +#[tokio::main] +async fn main() { + let pool = proxy_master::proxy_pool::ProxyPool::default(); + let addr_proxies = SocketAddr::from(([127, 0, 0, 1], 5000)); + let proxy_listener = TcpListener::bind(addr_proxies).await.unwrap(); + println!("Listening on for proxies on: {}", addr_proxies); + let addr_clients = SocketAddr::from(([127, 0, 0, 1], 4000)); + let client_listener = TcpListener::bind(addr_clients).await.unwrap(); + println!("Listening on for clients on: {}", addr_clients); + + let proxy_listener_pool = pool.clone(); + + let proxy_endpoint_main_task = tokio::task::spawn(async move { + proxy_endpoint::proxy_endpoint_main().await.unwrap(); + }); + + let proxy_listener_task = tokio::task::spawn(async move { + proxy_master::proxy_endpoint::listen_for_proxies_connecting( + proxy_listener_pool, + proxy_listener, + ) + .await + }); + let proxy_listener_pool = pool.clone(); + let clients_listener_task = tokio::task::spawn(async move { + proxy_master::clients_endpoint::listen_for_clients_connecting( + proxy_listener_pool, + client_listener, + ) + .await; + }); + let _ = join_all(vec![ + proxy_listener_task, + clients_listener_task, + proxy_endpoint_main_task, + ]) + .await; +}