diff --git a/foundations/src/http/server/builder.rs b/foundations/src/http/server/builder.rs index d3618a3b8..37a7b0c37 100644 --- a/foundations/src/http/server/builder.rs +++ b/foundations/src/http/server/builder.rs @@ -1,5 +1,6 @@ use std::net::SocketAddr; use std::sync::Arc; +use std::time::Duration; use hyper_util::rt::TokioExecutor; @@ -25,6 +26,7 @@ pub struct ServerBuilder { http1_2: hyper_util::server::conn::auto::Builder, #[cfg(feature = "http3")] quic: Option, + keep_alive_timeout: Option, worker_count: usize, } @@ -39,6 +41,7 @@ impl Default for ServerBuilder { http1_2: hyper_util::server::conn::auto::Builder::new(TokioExecutor::new()), #[cfg(feature = "http3")] quic: None, + keep_alive_timeout: Some(Duration::from_secs(30)), worker_count: 1, } } @@ -91,6 +94,13 @@ impl ServerBuilder { self } + /// Set the keep alive timeout for the server. + /// Defaults to 5 seconds. + pub fn with_keep_alive_timeout(mut self, timeout: impl Into>) -> Self { + self.keep_alive_timeout = timeout.into(); + self + } + /// Build the server. pub fn build(self, make_service: M) -> Result, Error> where @@ -124,6 +134,7 @@ impl ServerBuilder { backends: Vec::new(), handler: None, worker_count: self.worker_count, + keep_alive_timeout: self.keep_alive_timeout, }) } } diff --git a/foundations/src/http/server/mod.rs b/foundations/src/http/server/mod.rs index d3930ee8d..74393ddba 100644 --- a/foundations/src/http/server/mod.rs +++ b/foundations/src/http/server/mod.rs @@ -48,6 +48,7 @@ pub struct Server { backends: Vec, handler: Option, worker_count: usize, + keep_alive_timeout: Option, } #[derive(Debug, thiserror::Error)] @@ -152,7 +153,8 @@ impl Server { for i in 0..self.worker_count { let tcp_listener = make_tcp_listener(self.bind)?; let make_service = self.make_service.clone(); - let backend = TlsBackend::new(tcp_listener, acceptor.clone(), self.http1_2.clone(), &ctx); + let backend = TlsBackend::new(tcp_listener, acceptor.clone(), self.http1_2.clone(), &ctx) + .with_keep_alive_timeout(self.keep_alive_timeout); let span = tracing::info_span!("tls", addr = %self.bind, worker = i); self.backends .push(AbortOnDrop::new(spawn(backend.serve(make_service).instrument(span)))); @@ -170,7 +172,8 @@ impl Server { for i in 0..self.worker_count { let tcp_listener = make_tcp_listener(addr)?; let make_service = self.make_service.clone(); - let backend = TcpBackend::new(tcp_listener, self.http1_2.clone(), &ctx); + let backend = TcpBackend::new(tcp_listener, self.http1_2.clone(), &ctx) + .with_keep_alive_timeout(self.keep_alive_timeout); let span = tracing::info_span!("tcp", addr = %addr, worker = i); self.backends .push(AbortOnDrop::new(spawn(backend.serve(make_service).instrument(span)))); @@ -188,7 +191,8 @@ impl Server { quinn::default_runtime().unwrap(), )?; let make_service = self.make_service.clone(); - let backend = QuicBackend::new(endpoint, quic.h3.clone(), &ctx); + let backend = + QuicBackend::new(endpoint, quic.h3.clone(), &ctx).with_keep_alive_timeout(self.keep_alive_timeout); let span = tracing::info_span!("quic", addr = %self.bind, worker = i); self.backends .push(AbortOnDrop::new(spawn(backend.serve(make_service).instrument(span)))); diff --git a/foundations/src/http/server/stream/mod.rs b/foundations/src/http/server/stream/mod.rs index 4e3aa743a..9a3dfc456 100644 --- a/foundations/src/http/server/stream/mod.rs +++ b/foundations/src/http/server/stream/mod.rs @@ -5,10 +5,12 @@ pub mod tcp; pub mod tls; use std::convert::Infallible; +use std::sync::Arc; pub use axum::body::Body; pub use axum::extract::Request; pub use axum::response::{IntoResponse, Response}; +use rand::Rng; use super::Error; @@ -152,3 +154,24 @@ pub enum SocketKind { TlsTcp, Quic, } + +fn jitter(duration: std::time::Duration) -> std::time::Duration { + let mut rng = rand::thread_rng(); + let jitter = rng.gen_range(0..duration.as_millis() / 10); + duration + std::time::Duration::from_millis(jitter as u64) +} + +struct ActiveRequestsGuard(Arc); + +impl Drop for ActiveRequestsGuard { + fn drop(&mut self) { + self.0.fetch_sub(1, std::sync::atomic::Ordering::Relaxed); + } +} + +impl ActiveRequestsGuard { + fn new(active_requests: Arc) -> Self { + active_requests.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + Self(active_requests) + } +} diff --git a/foundations/src/http/server/stream/quic.rs b/foundations/src/http/server/stream/quic.rs index ac3155180..fd9b21e03 100644 --- a/foundations/src/http/server/stream/quic.rs +++ b/foundations/src/http/server/stream/quic.rs @@ -20,6 +20,7 @@ use tracing::Instrument; use super::{Backend, IncomingConnection, MakeService, ServiceHandler, SocketKind}; use crate::context::ContextFutExt; +use crate::http::server::stream::{jitter, ActiveRequestsGuard}; use crate::http::server::Error; #[cfg(feature = "runtime")] use crate::runtime::spawn; @@ -30,6 +31,7 @@ pub struct QuicBackend { endpoint: quinn::Endpoint, builder: Arc, handler: crate::context::Handler, + keep_alive_timeout: Option, } impl QuicBackend { @@ -38,8 +40,14 @@ impl QuicBackend { endpoint, builder, handler: ctx.new_child().1, + keep_alive_timeout: None, } } + + pub fn with_keep_alive_timeout(mut self, timeout: impl Into>) -> Self { + self.keep_alive_timeout = timeout.into(); + self + } } struct IncomingQuicConnection<'a> { @@ -82,7 +90,13 @@ impl Backend for QuicBackend { break; }; - let connection = connection.accept()?; + let connection = match connection.accept() { + Ok(connection) => connection, + Err(e) => { + tracing::debug!(error = %e, "failed to accept quic connection"); + continue; + } + }; let span = tracing::trace_span!("connection", remote_addr = %connection.remote_address()); let _guard = span.enter(); @@ -106,6 +120,7 @@ impl Backend for QuicBackend { connection, builder: self.builder.clone(), service, + keep_alive_timeout: self.keep_alive_timeout, parent_ctx: ctx, } .serve() @@ -125,6 +140,7 @@ struct Connection { connection: Connecting, builder: Arc, service: S, + keep_alive_timeout: Option, parent_ctx: crate::context::Context, } @@ -138,7 +154,10 @@ impl Connection { self.service.on_close().await; return; } - None => return, + None => { + self.service.on_close().await; + return; + } }; let mut connection = match self @@ -153,7 +172,10 @@ impl Connection { self.service.on_close().await; return; } - None => return, + None => { + self.service.on_close().await; + return; + } }; let (hijack_conn_tx, mut hijack_conn_rx) = tokio::sync::mpsc::channel::(1); @@ -170,6 +192,8 @@ impl Connection { // When the above is cancelled, the connection is allowed to finish. let connection_handle = crate::context::Handler::new(); + let active_requests = Arc::new(std::sync::atomic::AtomicUsize::new(0)); + loop { let (request, stream) = tokio::select! { request = connection.accept() => { @@ -189,6 +213,23 @@ impl Connection { break; } } + }, + Some(_) = async { + if let Some(keep_alive_timeout) = self.keep_alive_timeout { + loop { + tokio::time::sleep(jitter(keep_alive_timeout)).await; + if active_requests.load(std::sync::atomic::Ordering::Relaxed) != 0 { + continue; + } + + break Some(()); + } + } else { + None + } + } => { + tracing::debug!("keep alive timeout"); + break; } // This happens when the connection has been upgraded to a WebTransport connection. Some(send_hijack_conn) = hijack_conn_rx.recv() => { @@ -201,6 +242,7 @@ impl Connection { }; tracing::trace!("new request"); + let active_requests = ActiveRequestsGuard::new(active_requests.clone()); let service = self.service.clone(); let stream = QuinnStream::new(stream); @@ -226,6 +268,7 @@ impl Connection { service.on_error(err).await; } + drop(active_requests); drop(ctx); } .with_context(connection_context) diff --git a/foundations/src/http/server/stream/stream.rs b/foundations/src/http/server/stream/stream.rs deleted file mode 100644 index 3fd75c243..000000000 --- a/foundations/src/http/server/stream/stream.rs +++ /dev/null @@ -1,402 +0,0 @@ -use std::convert::Infallible; -use std::pin::{pin, Pin}; -use std::sync::Arc; -use std::task::{Context, Poll}; - -use axum::body::Body; -use axum::extract::Request; -use bytes::{Buf, Bytes}; -use futures::{Future, StreamExt}; -use h3::server::RequestStream; -use h3_quinn::{BidiStream, RecvStream, SendStream}; -use hyper::body::Incoming; -use hyper::rt::Executor; -use hyper_util::rt::TokioIo; -use tower::ServiceExt; - -use super::builder::RuntimeExecutor; -use super::Error; - -pub enum ServerBackend { - Quic(QuicBackend), - Tcp(TcpBackend), - Tls(TlsBackend), -} - -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] -pub enum StreamKind { - Quic, - Tcp, -} - -impl ServerBackend { - pub async fn accept(&self) -> Result { - match self { - ServerBackend::Quic(backend) => { - let incoming = backend.endpoint.accept().await.ok_or(Error::QuinnClosed)?; - let addr = incoming.remote_address(); - Ok(IncomingConnection::Quic { - incoming, - addr, - builder: backend.builder.clone(), - }) - } - ServerBackend::Tcp(backend) => { - let (stream, addr) = backend.listener.accept().await?; - Ok(IncomingConnection::Tcp { - stream, - addr, - builder: backend.builder.clone(), - }) - } - ServerBackend::Tls(backend) => { - let (stream, addr) = backend.tcp_listener.accept().await?; - Ok(IncomingConnection::Tls { - stream, - addr, - builder: backend.builder.clone(), - acceptor: backend.acceptor.clone(), - }) - } - } - } -} - -pub enum IncomingConnection { - Quic { - incoming: quinn::Connecting, - addr: std::net::SocketAddr, - builder: Arc, - }, - Tcp { - stream: tokio::net::TcpStream, - addr: std::net::SocketAddr, - builder: Arc>, - }, - Tls { - stream: tokio::net::TcpStream, - addr: std::net::SocketAddr, - builder: Arc>, - acceptor: Arc, - }, -} - -impl IncomingConnection { - pub fn remote_addr(&self) -> std::net::SocketAddr { - match self { - IncomingConnection::Quic { addr, .. } => *addr, - IncomingConnection::Tcp { addr, .. } => *addr, - IncomingConnection::Tls { addr, .. } => *addr, - } - } - - pub fn kind(&self) -> StreamKind { - match self { - IncomingConnection::Quic { .. } => StreamKind::Quic, - IncomingConnection::Tcp { .. } => StreamKind::Tcp, - IncomingConnection::Tls { .. } => StreamKind::Tcp, - } - } - - pub async fn accept(self) -> Result { - match self { - IncomingConnection::Quic { incoming, builder, addr } => { - let connection = incoming.await?; - let connection = builder.build(h3_quinn::Connection::new(connection)).await?; - Ok(Connection::Quic { connection, addr }) - } - IncomingConnection::Tcp { stream, addr, builder } => Ok(Connection::Tcp { stream, addr, builder }), - IncomingConnection::Tls { - stream, - addr, - builder, - acceptor, - } => { - let stream = acceptor.accept(stream).await?; - Ok(Connection::Tls { stream, addr, builder }) - } - } - } -} - -pub enum Connection { - Quic { - connection: h3::server::Connection, - addr: std::net::SocketAddr, - }, - Tcp { - stream: tokio::net::TcpStream, - addr: std::net::SocketAddr, - builder: Arc>, - }, - Tls { - stream: tokio_rustls::server::TlsStream, - addr: std::net::SocketAddr, - builder: Arc>, - }, -} - -pub struct IncomingStream<'a> { - stream: &'a Connection, -} - -type SendQuicConnection = tokio::sync::oneshot::Sender>; - -impl IncomingStream<'_> { - pub fn remote_addr(&self) -> std::net::SocketAddr { - self.stream.remote_addr() - } - - pub fn kind(&self) -> StreamKind { - self.stream.kind() - } -} - -impl std::fmt::Debug for IncomingStream<'_> { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("IncomingStream") - .field("remote_addr", &self.remote_addr()) - .field("kind", &self.kind()) - .finish() - } -} - -impl Connection { - pub fn remote_addr(&self) -> std::net::SocketAddr { - match self { - Connection::Quic { addr, .. } => *addr, - Connection::Tcp { addr, .. } => *addr, - Connection::Tls { addr, .. } => *addr, - } - } - - pub fn kind(&self) -> StreamKind { - match self { - Connection::Quic { .. } => StreamKind::Quic, - Connection::Tcp { .. } => StreamKind::Tcp, - Connection::Tls { .. } => StreamKind::Tcp, - } - } - - pub(super) async fn serve_connection(self, mut layer: M) -> Result<(), Error> - where - M: for<'a> tower::Service, Response = S, Error = Infallible> + Send + 'static, - S: tower::Service - + Clone - + 'static - + Send, - S::Future: Send + 'static, - { - let service: S = layer - .call(IncomingStream { stream: &self }) - .await - .unwrap_or_else(|err| match err {}); - - match self { - Connection::Quic { mut connection, .. } => { - let (free_conn_tx, mut free_conn_rx) = tokio::sync::mpsc::channel::(1); - loop { - let (request, stream) = tokio::select! { - request = connection.accept() => { - request?.ok_or(Error::ConnectionClosed)? - } - // This happens when the connection has been upgraded to a WebTransport connection. - Some(free_conn) = free_conn_rx.recv() => { - free_conn.send(connection).ok(); - return Ok(()); - } - }; - - let mut service = service.clone(); - let mut stream = QuinnStream::new(stream); - let webtransport_ext = WebTransportExt { - free_conn_tx: free_conn_tx.clone(), - stream: stream.clone(), - }; - let mut request = - request.map(|()| axum::body::Body::from_stream(QuinnHttpBodyAdapter { stream: stream.clone() })); - request.extensions_mut().insert(webtransport_ext); - - RuntimeExecutor.execute(async move { - let response = service.call(request).await.unwrap_or_else(|err| match err {}); - - let mut send_stream = stream.clone(); - let Some(send) = send_stream.get_send() else { - return; - }; - - stream.get_recv(); - drop(stream); - - let (parts, body) = response.into_parts(); - let response = axum::response::Response::from_parts(parts, ()); - - if let Err(err) = send.send_response(response).await { - tracing::error!(%err, "failed to send response"); - return; - } - - let mut stream = body.into_data_stream(); - while let Some(data) = stream.next().await { - match data { - Ok(data) => { - if let Err(err) = send.send_data(data.clone()).await { - tracing::error!(%err, "failed to send response body"); - return; - } - - tracing::info!("response body sent"); - } - Err(err) => { - tracing::error!(%err, "failed to read response body"); - return; - } - } - } - - if let Err(err) = send.finish().await { - tracing::error!(%err, "failed to send response body"); - return; - } - - tracing::info!("response sent"); - }); - } - } - Connection::Tcp { stream, builder, .. } => { - builder - .serve_connection_with_upgrades(TokioIo::new(stream), TowerToHyperService { service }) - .await?; - } - Connection::Tls { stream, builder, .. } => { - builder - .serve_connection_with_upgrades(TokioIo::new(stream), TowerToHyperService { service }) - .await?; - } - } - - Ok(()) - } -} - -#[derive(Clone)] -struct WebTransportExt { - free_conn_tx: tokio::sync::mpsc::Sender, - stream: QuinnStream, -} - -struct QuinnHttpBodyAdapter { - stream: QuinnStream, -} - -impl futures::Stream for QuinnHttpBodyAdapter { - type Item = Result; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let stream = match self.stream.get_recv() { - Some(stream) => stream, - None => return Poll::Ready(None), - }; - - match pin!(stream.recv_data()).poll(cx) { - Poll::Ready(Ok(Some(mut buf))) => { - // The buf here isnt a Bytes but an impl Buf. - // We need to convert it to a Bytes. - let buf = buf.copy_to_bytes(buf.remaining()); - Poll::Ready(Some(Ok(buf))) - } - Poll::Ready(Ok(None)) => Poll::Ready(None), - Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))), - Poll::Pending => Poll::Pending, - } - } -} - -#[derive(Debug, Copy, Clone)] -struct TowerToHyperService { - service: S, -} - -impl hyper::service::Service> for TowerToHyperService -where - S: tower::Service + Clone, -{ - type Error = S::Error; - type Future = TowerToHyperServiceFuture; - type Response = S::Response; - - fn call(&self, req: Request) -> Self::Future { - let req = req.map(Body::new); - TowerToHyperServiceFuture { - future: self.service.clone().oneshot(req), - } - } -} - -#[pin_project::pin_project] -struct TowerToHyperServiceFuture -where - S: tower::Service, -{ - #[pin] - future: tower::util::Oneshot, -} - -impl std::future::Future for TowerToHyperServiceFuture -where - S: tower::Service, -{ - type Output = Result; - - #[inline] - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.project().future.poll(cx) - } -} - -pub struct QuicBackend { - endpoint: quinn::Endpoint, - builder: Arc, -} - -impl QuicBackend { - pub fn new(endpoint: quinn::Endpoint, builder: Arc) -> Self { - Self { endpoint, builder } - } -} - -pub struct TcpBackend { - listener: tokio::net::TcpListener, - builder: Arc>, - rewrite_tls: bool, -} - -impl TcpBackend { - pub fn new(listener: tokio::net::TcpListener, builder: Arc>, rewrite_tls: bool) -> Self { - Self { - listener, - builder, - rewrite_tls, - } - } -} - -pub struct TlsBackend { - acceptor: Arc, - builder: Arc>, - tcp_listener: tokio::net::TcpListener, -} - -impl TlsBackend { - pub fn new( - tcp_listener: tokio::net::TcpListener, - acceptor: Arc, - builder: Arc>, - ) -> Self { - Self { - tcp_listener, - acceptor, - builder, - } - } -} diff --git a/foundations/src/http/server/stream/tcp.rs b/foundations/src/http/server/stream/tcp.rs index 940fcaf5d..005e07fb7 100644 --- a/foundations/src/http/server/stream/tcp.rs +++ b/foundations/src/http/server/stream/tcp.rs @@ -15,6 +15,7 @@ use tracing::Instrument; use super::{Backend, IncomingConnection, MakeService, ServiceHandler, SocketKind}; use crate::context::ContextFutExt; +use crate::http::server::stream::{jitter, ActiveRequestsGuard}; #[cfg(feature = "runtime")] use crate::runtime::spawn; #[cfg(feature = "opentelemetry")] @@ -24,6 +25,7 @@ pub struct TcpBackend { listener: TcpListener, builder: Arc>, handler: crate::context::Handler, + keep_alive_timeout: Option, } impl TcpBackend { @@ -32,8 +34,14 @@ impl TcpBackend { listener, builder, handler: ctx.new_child().1, + keep_alive_timeout: None, } } + + pub fn with_keep_alive_timeout(mut self, timeout: impl Into>) -> Self { + self.keep_alive_timeout = timeout.into(); + self + } } struct IncomingTcpConnection<'a> { @@ -103,6 +111,7 @@ impl Backend for TcpBackend { builder: self.builder.clone(), service, parent_ctx: ctx, + keep_alive_timeout: self.keep_alive_timeout, } .serve() .in_current_span(), @@ -122,6 +131,7 @@ struct Connection { builder: Arc>, service: S, parent_ctx: crate::context::Context, + keep_alive_timeout: Option, } impl Connection { @@ -138,19 +148,24 @@ impl Connection { Arc::new(move || handle.context()) }; + let active_requests = Arc::new(std::sync::atomic::AtomicUsize::new(0)); + let service_fn = { let service = self.service.clone(); let span = tracing::Span::current(); + let active_requests = active_requests.clone(); service_fn(move |mut req: Request| { let service = service.clone(); let make_ctx = make_ctx.clone(); + let guard = ActiveRequestsGuard::new(active_requests.clone()); async move { let ctx = make_ctx(); req.extensions_mut().insert(ctx.clone()); req.extensions_mut().insert(SocketKind::Tcp); let resp = service.on_request(req.map(Body::new)).await.into_response(); drop(ctx); + drop(guard); Ok::<_, Infallible>(resp) } .instrument(span.clone()) @@ -159,6 +174,23 @@ impl Connection { let r = tokio::select! { r = self.builder.serve_connection_with_upgrades(TokioIo::new(self.connection), service_fn) => r, + Some(_) = async { + if let Some(keep_alive_timeout) = self.keep_alive_timeout { + loop { + tokio::time::sleep(jitter(keep_alive_timeout)).await; + if active_requests.load(std::sync::atomic::Ordering::Relaxed) != 0 { + continue; + } + + break Some(()); + } + } else { + None + } + } => { + tracing::debug!("keep alive timeout"); + Ok(()) + } _ = async { self.parent_ctx.done().await; handle.shutdown().await; diff --git a/foundations/src/http/server/stream/tls.rs b/foundations/src/http/server/stream/tls.rs index 5321de137..75cf19ce5 100644 --- a/foundations/src/http/server/stream/tls.rs +++ b/foundations/src/http/server/stream/tls.rs @@ -16,6 +16,7 @@ use tracing::Instrument; use super::{Backend, IncomingConnection, MakeService, ServiceHandler, SocketKind}; use crate::context::ContextFutExt; +use crate::http::server::stream::{jitter, ActiveRequestsGuard}; #[cfg(feature = "runtime")] use crate::runtime::spawn; #[cfg(feature = "opentelemetry")] @@ -26,6 +27,7 @@ pub struct TlsBackend { acceptor: Arc, builder: Arc>, handler: crate::context::Handler, + keep_alive_timeout: Option, } impl TlsBackend { @@ -40,8 +42,14 @@ impl TlsBackend { acceptor, builder, handler: ctx.new_child().1, + keep_alive_timeout: None, } } + + pub fn with_keep_alive_timeout(mut self, timeout: impl Into>) -> Self { + self.keep_alive_timeout = timeout.into(); + self + } } struct IncomingTlsConnection<'a> { @@ -113,6 +121,7 @@ impl Backend for TlsBackend { acceptor: self.acceptor.clone(), service, parent_ctx: ctx, + keep_alive_timeout: self.keep_alive_timeout, } .serve() .in_current_span(), @@ -132,20 +141,25 @@ struct Connection { builder: Arc>, acceptor: Arc, service: S, + keep_alive_timeout: Option, parent_ctx: crate::context::Context, } impl Connection { async fn serve(self) { tracing::trace!("connection handler started"); - let connection = match self.acceptor.accept(self.connection).await { - Ok(connection) => connection, - Err(err) => { + let connection = match self.acceptor.accept(self.connection).with_context(&self.parent_ctx).await { + Some(Ok(connection)) => connection, + Some(Err(err)) => { tracing::debug!(err = %err, "error accepting connection"); self.service.on_error(err.into()).await; self.service.on_close().await; return; } + None => { + self.service.on_close().await; + return; + } }; self.service.on_ready().await; @@ -160,20 +174,25 @@ impl Connection { Arc::new(move || handle.context()) }; + let active_requests = Arc::new(std::sync::atomic::AtomicUsize::new(0)); + let service_fn = { let service = self.service.clone(); let make_ctx = make_ctx.clone(); let span = tracing::Span::current(); + let active_requests = active_requests.clone(); service_fn(move |mut req: Request| { let service = service.clone(); let make_ctx = make_ctx.clone(); + let guard = ActiveRequestsGuard::new(active_requests.clone()); async move { let ctx = make_ctx(); req.extensions_mut().insert(ctx.clone()); req.extensions_mut().insert(SocketKind::TlsTcp); let resp = service.on_request(req.map(Body::new)).await.into_response(); drop(ctx); + drop(guard); Ok::<_, Infallible>(resp) } .instrument(span.clone()) @@ -182,6 +201,23 @@ impl Connection { let r = tokio::select! { r = self.builder.serve_connection_with_upgrades(TokioIo::new(connection), service_fn) => r, + Some(_) = async { + if let Some(keep_alive_timeout) = self.keep_alive_timeout { + loop { + tokio::time::sleep(jitter(keep_alive_timeout)).await; + if active_requests.load(std::sync::atomic::Ordering::Relaxed) != 0 { + continue; + } + + break Some(()); + } + } else { + None + } + } => { + tracing::debug!("keep alive timeout"); + Ok(()) + } _ = async { self.parent_ctx.done().await; handle.shutdown().await; diff --git a/image-processor/src/worker/process/encoder/libwebp.rs b/image-processor/src/worker/process/encoder/libwebp.rs index 11a583199..0e7f386cd 100644 --- a/image-processor/src/worker/process/encoder/libwebp.rs +++ b/image-processor/src/worker/process/encoder/libwebp.rs @@ -31,7 +31,10 @@ impl WebpEncoder { pub fn new(settings: EncoderSettings) -> Result { let mut config = zero_memory_default::(); - wrap_error(unsafe { libwebp_sys::WebPConfigInit(&mut config) }, "failed to initialize webp config")?; + wrap_error( + unsafe { libwebp_sys::WebPConfigInit(&mut config) }, + "failed to initialize webp config", + )?; config.lossless = if settings.quality == OutputQuality::Lossless { 1 } else { 0 }; config.quality = match settings.quality { @@ -127,25 +130,26 @@ impl Encoder for WebpEncoder { let encoder = SmartPtr::new( NonNull::new(unsafe { - libwebp_sys::WebPAnimEncoderNew( - self.picture.width, - self.picture.height, - &{ - let mut config = zero_memory_default::(); - wrap_error(libwebp_sys::WebPAnimEncoderOptionsInit(&mut config), "failed to initialize webp anim encoder options")?; - - config.allow_mixed = 1; - // TOOD(troy): open a libwebp issue to report that images are being encoded incorrectly unless this is set to 1. However this forces every frame to be a keyframe and thus the size of the file is much larger. - config.kmax = 1; - - config.anim_params.loop_count = match self.settings.loop_count { - LoopCount::Finite(count) => count as _, - LoopCount::Infinite => 0, - }; - - config - }, - ) + libwebp_sys::WebPAnimEncoderNew(self.picture.width, self.picture.height, &{ + let mut config = zero_memory_default::(); + wrap_error( + libwebp_sys::WebPAnimEncoderOptionsInit(&mut config), + "failed to initialize webp anim encoder options", + )?; + + config.allow_mixed = 1; + // TOOD(troy): open a libwebp issue to report that images are being encoded + // incorrectly unless this is set to 1. However this forces every frame to be a + // keyframe and thus the size of the file is much larger. + config.kmax = 1; + + config.anim_params.loop_count = match self.settings.loop_count { + LoopCount::Finite(count) => count as _, + LoopCount::Infinite => 0, + }; + + config + }) }) .ok_or(WebPError::OutOfMemory)?, |encoder| {