Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch to Hyper 1 #100

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,14 @@ darling = "0.20.10"
erased-serde = "0.3.28"
futures-util = "0.3.28"
governor = "0.6"
hyper = { version = "0.14", default-features = false }
http-body-util = "0.1"
hyper = { version = "1", default-features = false }
hyper-util = { version = "0.1", default-features = false }
indexmap = "2.0.0"
ipnetwork = "0.20"
once_cell = "1.5"
tonic = { version = "0.11.0", default-features = false }
opentelemetry-proto = "0.5.0"
tonic = { version = "0.12", default-features = false }
opentelemetry-proto = "0.7"
parking_lot = "0.12.1"
proc-macro2 = { version = "1", default-features = false }
prometheus = { version = "0.13.3", default-features = false }
Expand All @@ -68,6 +70,7 @@ tokio = "1.41.0"
thread_local = "1.1"
tikv-jemallocator = "0.5"
tikv-jemalloc-ctl = "0.5"
tower-service = "0.3"
yaml-merge-keys = "0.5"

# needed for minver
Expand Down
2 changes: 2 additions & 0 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ publish = false
anyhow = { workspace = true }
foundations = { workspace = true }
futures-util = { workspace = true }
http-body-util = { workspace = true }
hyper = { workspace = true }
hyper-util = { workspace = true, features = ["server", "tokio"] }
tokio = { workspace = true, features = ["full"]}

[[example]]
Expand Down
15 changes: 10 additions & 5 deletions examples/http_server/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ use foundations::settings::collections::Map;
use foundations::telemetry::{self, log, tracing, TelemetryConfig, TelemetryContext};
use foundations::BootstrapResult;
use futures_util::stream::{FuturesUnordered, StreamExt};
use hyper::server::conn::Http;
use http_body_util::Full;
use hyper::body::{Bytes, Incoming};
use hyper::service::service_fn;
use hyper::{Body, Request, Response};
use hyper::{Request, Response};
use hyper_util::rt::{TokioExecutor, TokioIo};
use std::convert::Infallible;
use std::net::{SocketAddr, TcpListener as StdTcpListener};
use std::sync::Arc;
Expand Down Expand Up @@ -193,7 +195,10 @@ async fn serve_connection(
}
});

if let Err(e) = Http::new().serve_connection(conn, on_request).await {
if let Err(e) = hyper_util::server::conn::auto::Builder::new(TokioExecutor::new())
.serve_connection(TokioIo::new(conn), on_request)
.await
{
log::error!("failed to serve HTTP"; "error" => ?e);
metrics::http_server::failed_connections_total(&endpoint_name).inc();
}
Expand All @@ -204,9 +209,9 @@ async fn serve_connection(
#[tracing::span_fn("respond to request")]
async fn respond(
endpoint_name: Arc<String>,
req: Request<Body>,
req: Request<Incoming>,
routes: Arc<Map<String, ResponseSettings>>,
) -> Result<Response<Body>, Infallible> {
) -> Result<Response<Full<Bytes>>, Infallible> {
log::add_fields! {
"request_uri" => req.uri().to_string(),
"method" => req.method().to_string()
Expand Down
14 changes: 7 additions & 7 deletions foundations/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,15 @@ client-telemetry = ["logging", "metrics", "tracing", "dep:futures-util"]

# Enables the telemetry server.
telemetry-server = [
"dep:http-body-util",
"dep:hyper",
"dep:hyper-util",
"dep:socket2",
"dep:percent-encoding"
"dep:percent-encoding",
]

# Enables telemetry reporting over gRPC
telemetry-otlp-grpc = ["dep:tonic", "dep:tokio", "dep:hyper"]
telemetry-otlp-grpc = ["dep:tonic", "tonic/prost", "dep:tokio", "dep:hyper"]

# Enables experimental tokio runtime metrics
tokio-runtime-metrics = [
Expand Down Expand Up @@ -177,11 +179,9 @@ clap = { workspace = true, optional = true }
erased-serde = { workspace = true, optional = true }
futures-util = { workspace = true, optional = true }
governor = { workspace = true, optional = true }
hyper = { workspace = true, optional = true, features = [
"http1",
"runtime",
"server",
] }
http-body-util = { workspace = true, optional = true }
hyper = { workspace = true, optional = true, features = ["http1", "server"] }
hyper-util = { workspace = true, optional = true, features = ["tokio"] }
indexmap = { workspace = true, optional = true, features = ["serde"] }
once_cell = { workspace = true, optional = true }
opentelemetry-proto = { workspace = true, optional = true, features = ["gen-tonic-messages", "trace"] }
Expand Down
18 changes: 9 additions & 9 deletions foundations/src/telemetry/driver.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
use crate::utils::feature_use;
use crate::BootstrapResult;
use futures_util::future::BoxFuture;
use futures_util::stream::{FuturesUnordered, Stream};
use futures_util::FutureExt;
use futures_util::stream::FuturesUnordered;
use futures_util::{FutureExt, Stream};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

feature_use!(cfg(feature = "telemetry-server"), {
use super::server::TelemetryServerFuture;
use anyhow::anyhow;
use hyper::Server;
use std::net::SocketAddr;
});

Expand Down Expand Up @@ -38,7 +36,7 @@ impl TelemetryDriver {
) -> Self {
Self {
#[cfg(feature = "telemetry-server")]
server_addr: server_fut.as_ref().map(Server::local_addr),
server_addr: server_fut.as_ref().map(|fut| fut.local_addr()),

#[cfg(feature = "telemetry-server")]
server_fut,
Expand Down Expand Up @@ -66,9 +64,11 @@ impl TelemetryDriver {
#[cfg(feature = "telemetry-server")]
{
if let Some(server_fut) = self.server_fut.take() {
self.tele_futures.push(
async move { Ok(server_fut.with_graceful_shutdown(signal).await?) }.boxed(),
);
self.tele_futures.push(Box::pin(async move {
server_fut.with_graceful_shutdown(signal).await;

Ok(())
}));

return;
}
Expand All @@ -93,7 +93,7 @@ impl Future for TelemetryDriver {
#[cfg(feature = "telemetry-server")]
if let Some(server_fut) = &mut self.server_fut {
if let Poll::Ready(res) = Pin::new(server_fut).poll(cx) {
ready_res.push(res.map_err(|err| anyhow!(err)));
match res {}
}
}

Expand Down
9 changes: 7 additions & 2 deletions foundations/src/telemetry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,9 @@ pub use self::testing::TestTelemetryContext;
pub use self::memory_profiler::MemoryProfiler;

#[cfg(feature = "telemetry-server")]
pub use self::server::{TelemetryRouteHandler, TelemetryRouteHandlerFuture, TelemetryServerRoute};
pub use self::server::{
BoxError, TelemetryRouteHandler, TelemetryRouteHandlerFuture, TelemetryServerRoute,
};

pub use self::driver::TelemetryDriver;
pub use self::telemetry_context::{
Expand Down Expand Up @@ -290,7 +292,10 @@ pub fn init(config: TelemetryConfig) -> BootstrapResult<TelemetryDriver> {

#[cfg(feature = "telemetry-server")]
{
let server_fut = self::server::init(config.settings.clone(), config.custom_server_routes)?;
let server_fut = server::TelemetryServerFuture::new(
config.settings.clone(),
config.custom_server_routes,
)?;

Ok(TelemetryDriver::new(server_fut, tele_futures))
}
Expand Down
203 changes: 203 additions & 0 deletions foundations/src/telemetry/server/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
#[cfg(feature = "metrics")]
use super::metrics;
use super::settings::TelemetrySettings;
use crate::telemetry::log;
use crate::BootstrapResult;
use anyhow::Context as _;
use futures_util::future::FutureExt;
use futures_util::{pin_mut, ready};
use hyper_util::rt::TokioIo;
use socket2::{Domain, SockAddr, Socket, Type};
use std::convert::Infallible;
use std::future::Future;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use tokio::net::TcpListener;
use tokio::sync::watch;

mod router;

use router::Router;
pub use router::{
BoxError, TelemetryRouteHandler, TelemetryRouteHandlerFuture, TelemetryServerRoute,
};

pub(super) struct TelemetryServerFuture {
listener: TcpListener,
router: Router,
}

impl TelemetryServerFuture {
pub(super) fn new(
settings: TelemetrySettings,
custom_routes: Vec<TelemetryServerRoute>,
) -> BootstrapResult<Option<TelemetryServerFuture>> {
if !settings.server.enabled {
return Ok(None);
}

let settings = Arc::new(settings);

// Eagerly init the memory profiler so it gets set up before syscalls are sandboxed with seccomp.
#[cfg(all(target_os = "linux", feature = "memory-profiling"))]
if settings.memory_profiler.enabled {
memory_profiling::profiler(Arc::clone(&settings))
.map_err(|err| anyhow::anyhow!(err))?;
}

let addr = settings.server.addr;

#[cfg(feature = "settings")]
let addr = SocketAddr::from(addr);

let router = Router::new(custom_routes, settings);

let listener = {
let std_listener = std::net::TcpListener::from(
bind_socket(addr).with_context(|| format!("binding to socket {addr:?}"))?,
);

std_listener.set_nonblocking(true)?;

tokio::net::TcpListener::from_std(std_listener)?
};

Ok(Some(TelemetryServerFuture { listener, router }))
}
pub(super) fn local_addr(&self) -> SocketAddr {
self.listener.local_addr().unwrap()
}

// Adapted from Hyper 0.14 Server stuff and axum::serve::serve.
pub(super) async fn with_graceful_shutdown(
self,
shutdown_signal: impl Future<Output = ()> + Send + Sync + 'static,
) {
let (signal_tx, signal_rx) = watch::channel(());
let signal_tx = Arc::new(signal_tx);

tokio::spawn(async move {
shutdown_signal.await;

drop(signal_rx);
});

let (close_tx, close_rx) = watch::channel(());
let listener = self.listener;

pin_mut!(listener);

loop {
let socket = tokio::select! {
conn = listener.accept() => match conn {
Ok((conn, _)) => TokioIo::new(conn),
Err(e) => {
log::warn!("failed to accept connection"; "error" => e);

continue;
}
},
_ = signal_tx.closed() => { break },
};

let router = self.router.clone();
let signal_tx = Arc::clone(&signal_tx);
let close_rx = close_rx.clone();

tokio::spawn(async move {
let conn = hyper::server::conn::http1::Builder::new()
.serve_connection(socket, router)
.with_upgrades();

let signal_closed = signal_tx.closed().fuse();

pin_mut!(conn);
pin_mut!(signal_closed);

loop {
tokio::select! {
_ = conn.as_mut() => break,
_ = &mut signal_closed => conn.as_mut().graceful_shutdown(),
}
}

drop(close_rx);
});
}

drop(close_rx);

close_tx.closed().await;
}
}

impl Future for TelemetryServerFuture {
type Output = Infallible;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let this = &mut *self;

loop {
let socket = match ready!(Pin::new(&mut this.listener).poll_accept(cx)) {
Ok((conn, _)) => TokioIo::new(conn),
Err(e) => {
log::warn!("failed to accept connection"; "error" => e);

continue;
}
};

let router = this.router.clone();

tokio::spawn(
hyper::server::conn::http1::Builder::new()
// upgrades needed for websockets
.serve_connection(socket, router)
.with_upgrades(),
);
}
}
}

fn bind_socket(addr: SocketAddr) -> BootstrapResult<Socket> {
let socket = Socket::new(
if addr.is_ipv4() {
Domain::IPV4
} else {
Domain::IPV6
},
Type::STREAM,
None,
)?;

socket.set_reuse_address(true)?;
#[cfg(unix)]
socket.set_reuse_port(true)?;
socket.bind(&SockAddr::from(addr))?;
socket.listen(1024)?;

Ok(socket)
}

#[cfg(all(target_os = "linux", feature = "memory-profiling"))]
mod memory_profiling {
use super::*;
use crate::telemetry::MemoryProfiler;
use crate::Result;

pub(super) fn profiler(settings: Arc<TelemetrySettings>) -> Result<MemoryProfiler> {
MemoryProfiler::get_or_init_with(&settings.memory_profiler)?.ok_or_else(|| {
"profiling should be enabled via `_RJEM_MALLOC_CONF=prof:true` env var".into()
})
}

pub(super) async fn heap_profile(settings: Arc<TelemetrySettings>) -> Result<String> {
profiler(settings)?.heap_profile().await
}

pub(super) async fn heap_stats(settings: Arc<TelemetrySettings>) -> Result<String> {
profiler(settings)?.heap_stats()
}
}
Loading