diff --git a/CHANGELOG.md b/CHANGELOG.md index ff27087221..847d74f866 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ - Allow `sample_rate` to be float type when deserializing `DynamicSamplingContext`. ([#4181](https://github.com/getsentry/relay/pull/4181)) - Support inbound filters for profiles. ([#4176](https://github.com/getsentry/relay/pull/4176)) - Scrub lower-case redis commands. ([#4235](https://github.com/getsentry/relay/pull/4235)) +- Make the maximum idle time of a HTTP connection configurable. ([#4248](https://github.com/getsentry/relay/pull/4248)) **Internal**: diff --git a/relay-config/src/config.rs b/relay-config/src/config.rs index 9370698e6f..5025831d6e 100644 --- a/relay-config/src/config.rs +++ b/relay-config/src/config.rs @@ -633,10 +633,17 @@ struct Limits { /// The maximum number of seconds to wait for pending envelopes after receiving a shutdown /// signal. shutdown_timeout: u64, - /// server keep-alive timeout in seconds. + /// Server keep-alive timeout in seconds. /// /// By default keep-alive is set to a 5 seconds. keepalive_timeout: u64, + /// Server idle timeout in seconds. + /// + /// The idle timeout limits the amount of time a connection is kept open without activity. + /// Setting this too short may abort connections before Relay is able to send a response. + /// + /// By default there is no idle timeout. + idle_timeout: Option, /// The TCP listen backlog. /// /// Configures the TCP listen backlog for the listening socket of Relay. @@ -673,6 +680,7 @@ impl Default for Limits { query_timeout: 30, shutdown_timeout: 10, keepalive_timeout: 5, + idle_timeout: None, tcp_listen_backlog: 1024, } } @@ -2319,6 +2327,11 @@ impl Config { Duration::from_secs(self.values.limits.keepalive_timeout) } + /// Returns the server idle timeout in seconds. + pub fn idle_timeout(&self) -> Option { + self.values.limits.idle_timeout.map(Duration::from_secs) + } + /// TCP listen backlog to configure on Relay's listening socket. pub fn tcp_listen_backlog(&self) -> u32 { self.values.limits.tcp_listen_backlog diff --git a/relay-server/src/services/server/acceptor.rs b/relay-server/src/services/server/acceptor.rs new file mode 100644 index 0000000000..d8607e91be --- /dev/null +++ b/relay-server/src/services/server/acceptor.rs @@ -0,0 +1,98 @@ +use std::io; +use std::time::Duration; + +use axum_server::accept::Accept; +use socket2::TcpKeepalive; +use tokio::net::TcpStream; + +use crate::services::server::io::IdleTimeout; +use crate::statsd::RelayCounters; + +#[derive(Clone, Debug, Default)] +pub struct RelayAcceptor { + tcp_keepalive: Option, + idle_timeout: Option, +} + +impl RelayAcceptor { + /// Creates a new [`RelayAcceptor`] which only configures `TCP_NODELAY`. + pub fn new() -> Self { + Default::default() + } + + /// Configures the acceptor to enable TCP keep-alive. + /// + /// The `timeout` is used to configure the keep-alive time as well as interval. + /// A zero duration timeout disables TCP keep-alive. + /// + /// `retries` configures the amount of keep-alive probes. + pub fn tcp_keepalive(mut self, timeout: Duration, retries: u32) -> Self { + if timeout.is_zero() { + self.tcp_keepalive = None; + return self; + } + + let mut keepalive = socket2::TcpKeepalive::new().with_time(timeout); + #[cfg(not(any(target_os = "openbsd", target_os = "redox", target_os = "solaris")))] + { + keepalive = keepalive.with_interval(timeout); + } + #[cfg(not(any( + target_os = "openbsd", + target_os = "redox", + target_os = "solaris", + target_os = "windows" + )))] + { + keepalive = keepalive.with_retries(retries); + } + self.tcp_keepalive = Some(keepalive); + + self + } + + /// Configures an idle timeout for the connection. + /// + /// Whenever there is no activity on a connection for the specified timeout, + /// the connection is closed. + /// + /// Note: This limits the total idle time of a duration and unlike read and write timeouts + /// also limits the time a connection is kept alive without requests. + pub fn idle_timeout(mut self, idle_timeout: Option) -> Self { + self.idle_timeout = idle_timeout; + self + } +} + +impl Accept for RelayAcceptor { + type Stream = IdleTimeout; + type Service = S; + type Future = std::future::Ready>; + + fn accept(&self, stream: TcpStream, service: S) -> Self::Future { + let mut keepalive = "ok"; + let mut nodelay = "ok"; + + if let Some(tcp_keepalive) = &self.tcp_keepalive { + let sock_ref = socket2::SockRef::from(&stream); + if let Err(e) = sock_ref.set_tcp_keepalive(tcp_keepalive) { + relay_log::trace!("error trying to set TCP keepalive: {e}"); + keepalive = "error"; + } + } + + if let Err(e) = stream.set_nodelay(true) { + relay_log::trace!("failed to set TCP_NODELAY: {e}"); + nodelay = "error"; + } + + relay_statsd::metric!( + counter(RelayCounters::ServerSocketAccept) += 1, + keepalive = keepalive, + nodelay = nodelay + ); + + let stream = IdleTimeout::new(stream, self.idle_timeout); + std::future::ready(Ok((stream, service))) + } +} diff --git a/relay-server/src/services/server/io.rs b/relay-server/src/services/server/io.rs new file mode 100644 index 0000000000..941b656c09 --- /dev/null +++ b/relay-server/src/services/server/io.rs @@ -0,0 +1,265 @@ +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::time::Duration; + +use futures::FutureExt; +use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; +use tokio::time::{Instant, Sleep}; + +use crate::statsd::RelayCounters; + +/// A wrapper for [`AsyncRead`] and [`AsyncWrite`] streams with a maximum idle time. +/// +/// If there is no activity in the underlying stream for the specified `timeout` +/// the [`IdleTimeout`] will abort the operation and return [`std::io::ErrorKind::TimedOut`]. +pub struct IdleTimeout { + inner: T, + timeout: Option, + // `Box::pin` the sleep timer, the entire connection/stream is required to be `Unpin` anyways. + timer: Option>>, + is_idle: bool, +} + +impl IdleTimeout { + /// Creates a new [`IdleTimeout`] with the specified timeout. + /// + /// A `None` timeout is equivalent to an infinite timeout. + pub fn new(inner: T, timeout: Option) -> Self { + Self { + inner, + timeout, + timer: None, + is_idle: false, + } + } + + fn wrap_poll( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + poll_fn: F, + ) -> Poll> + where + F: FnOnce(Pin<&mut T>, &mut Context<'_>) -> Poll>, + { + match poll_fn(Pin::new(&mut self.inner), cx) { + Poll::Ready(ret) => { + // Any activity on the stream resets the timeout. + self.is_idle = false; + Poll::Ready(ret) + } + Poll::Pending => { + // No timeout configured -> nothing to do. + let Some(timeout) = self.timeout else { + return Poll::Pending; + }; + + let was_idle = self.is_idle; + self.is_idle = true; + + let timer = match &mut self.timer { + // No timer created and we're idle now, create a timer with the appropriate deadline. + entry @ None => entry.insert(Box::pin(tokio::time::sleep(timeout))), + Some(sleep) => { + // Only if we were not idle, we have to reset the schedule. + if !was_idle { + let deadline = Instant::now() + timeout; + sleep.as_mut().reset(deadline); + } + sleep + } + }; + + match timer.poll_unpin(cx) { + Poll::Ready(_) => { + relay_log::trace!("closing idle server connection"); + relay_statsd::metric!( + counter(RelayCounters::ServerConnectionIdleTimeout) += 1 + ); + Poll::Ready(Err(std::io::ErrorKind::TimedOut.into())) + } + Poll::Pending => Poll::Pending, + } + } + } + } +} + +impl AsyncRead for IdleTimeout { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + self.wrap_poll(cx, |stream, cx| stream.poll_read(cx, buf)) + } +} + +impl AsyncWrite for IdleTimeout { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + self.wrap_poll(cx, |stream, cx| stream.poll_write(cx, buf)) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.wrap_poll(cx, |stream, cx| stream.poll_flush(cx)) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + self.wrap_poll(cx, |stream, cx| stream.poll_shutdown(cx)) + } + + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[std::io::IoSlice<'_>], + ) -> Poll> { + self.wrap_poll(cx, |stream, cx| stream.poll_write_vectored(cx, bufs)) + } + + fn is_write_vectored(&self) -> bool { + self.inner.is_write_vectored() + } +} + +#[cfg(test)] +mod tests { + use std::io::ErrorKind; + + use tokio::io::{AsyncReadExt, AsyncWriteExt, SimplexStream}; + + use super::*; + + macro_rules! assert_timeout { + ($duration:expr, $future:expr) => { + if let Ok(r) = tokio::time::timeout($duration, $future).await { + assert!( + false, + "expected {} to fail, but it returned {:?} in time", + stringify!($future), + r + ) + } + }; + } + + #[tokio::test(start_paused = true)] + async fn test_read() { + let (receiver, mut sender) = tokio::io::simplex(64); + let mut receiver = IdleTimeout::new(receiver, Some(Duration::from_secs(3))); + + assert_timeout!(Duration::from_millis(2900), receiver.read_u8()); + assert_timeout!(Duration::from_millis(70), receiver.read_u8()); + assert_timeout!(Duration::from_millis(29), receiver.read_u8()); + + sender.write_u8(1).await.unwrap(); + assert_eq!(receiver.read_u8().await.unwrap(), 1); + + // Timeout must be reset after reading. + assert_timeout!(Duration::from_millis(2900), receiver.read_u8()); + assert_timeout!(Duration::from_millis(70), receiver.read_u8()); + assert_timeout!(Duration::from_millis(29), receiver.read_u8()); + + // Only now it should fail. + assert_eq!( + receiver.read_u8().await.unwrap_err().kind(), + ErrorKind::TimedOut + ); + } + + #[tokio::test(start_paused = true)] + async fn test_read_no_idle_time() { + let (receiver, _sender) = tokio::io::simplex(64); + let mut receiver = IdleTimeout::new(receiver, None); + + // A year should be enough... + assert_timeout!(Duration::from_secs(365 * 24 * 3600), receiver.read_u8()); + } + + #[tokio::test(start_paused = true)] + async fn test_write() { + let (mut receiver, sender) = tokio::io::simplex(1); + let mut sender = IdleTimeout::new(sender, Some(Duration::from_secs(3))); + + // First byte can immediately write. + sender.write_u8(1).await.unwrap(); + // Second write, is blocked on the 1 byte sized buffer. + assert_timeout!(Duration::from_millis(2900), sender.write_u8(2)); + assert_timeout!(Duration::from_millis(70), sender.write_u8(2)); + assert_timeout!(Duration::from_millis(29), sender.write_u8(2)); + + // Consume the blocking byte and write successfully. + assert_eq!(receiver.read_u8().await.unwrap(), 1); + sender.write_u8(2).await.unwrap(); + + // Timeout must be reset. + assert_timeout!(Duration::from_millis(2900), sender.write_u8(3)); + assert_timeout!(Duration::from_millis(70), sender.write_u8(3)); + assert_timeout!(Duration::from_millis(29), sender.write_u8(3)); + + // Only now it should fail. + assert_eq!( + sender.write_u8(3).await.unwrap_err().kind(), + ErrorKind::TimedOut + ); + } + + #[tokio::test(start_paused = true)] + async fn test_write_no_timeout() { + let (_receiver, sender) = tokio::io::simplex(1); + let mut sender = IdleTimeout::new(sender, None); + + sender.write_u8(1).await.unwrap(); + // A year should be enough... + assert_timeout!(Duration::from_secs(365 * 24 * 3600), sender.write_u8(2)); + } + + #[tokio::test(start_paused = true)] + async fn test_read_write() { + let stream = SimplexStream::new_unsplit(1); + let mut stream = IdleTimeout::new(stream, Some(Duration::from_secs(3))); + + // First byte can immediately write. + stream.write_u8(1).await.unwrap(); + // And read. + assert_eq!(stream.read_u8().await.unwrap(), 1); + + // The buffer is empty, but we should not time out. + assert_timeout!(Duration::from_millis(2900), stream.read_u8()); + assert_timeout!(Duration::from_millis(70), stream.read_u8()); + assert_timeout!(Duration::from_millis(29), stream.read_u8()); + + // A write resets the read timer. + stream.write_u8(2).await.unwrap(); + tokio::time::advance(Duration::from_millis(2900)).await; + assert_eq!(stream.read_u8().await.unwrap(), 2); + + // Same for writes. + stream.write_u8(3).await.unwrap(); + assert_timeout!(Duration::from_millis(2900), stream.write_u8(3)); + assert_timeout!(Duration::from_millis(70), stream.write_u8(3)); + assert_timeout!(Duration::from_millis(29), stream.write_u8(3)); + + assert_eq!(stream.read_u8().await.unwrap(), 3); + tokio::time::advance(Duration::from_millis(2900)).await; + stream.write_u8(99).await.unwrap(); + + // Buffer is full and no one is clearing it, this should fail. + assert_eq!( + stream.write_u8(0).await.unwrap_err().kind(), + ErrorKind::TimedOut + ); + + // Make sure reads are also timing out. + assert_eq!(stream.read_u8().await.unwrap(), 99); + assert_eq!( + stream.read_u8().await.unwrap_err().kind(), + ErrorKind::TimedOut + ); + } +} diff --git a/relay-server/src/services/server.rs b/relay-server/src/services/server/mod.rs similarity index 76% rename from relay-server/src/services/server.rs rename to relay-server/src/services/server/mod.rs index 659485211e..d2661c1e98 100644 --- a/relay-server/src/services/server.rs +++ b/relay-server/src/services/server/mod.rs @@ -1,4 +1,3 @@ -use std::io; use std::net::{SocketAddr, TcpListener}; use std::sync::Arc; use std::time::Duration; @@ -6,13 +5,11 @@ use std::time::Duration; use axum::extract::Request; use axum::http::{header, HeaderName, HeaderValue}; use axum::ServiceExt; -use axum_server::accept::Accept; use axum_server::Handle; use hyper_util::rt::TokioTimer; use relay_config::Config; use relay_system::{Controller, Service, Shutdown}; -use socket2::TcpKeepalive; -use tokio::net::{TcpSocket, TcpStream}; +use tokio::net::TcpSocket; use tower::ServiceBuilder; use tower_http::compression::predicate::SizeAbove; use tower_http::compression::{CompressionLayer, DefaultPredicate, Predicate}; @@ -26,6 +23,9 @@ use crate::middlewares::{ use crate::service::ServiceState; use crate::statsd::{RelayCounters, RelayGauges}; +mod acceptor; +mod io; + /// Set the number of keep-alive retransmissions to be carried out before declaring that remote end /// is not available. const KEEPALIVE_RETRIES: u32 = 5; @@ -47,7 +47,7 @@ const COMPRESSION_MIN_SIZE: u16 = 128; pub enum ServerError { /// Binding failed. #[error("bind to interface failed")] - BindFailed(#[from] io::Error), + BindFailed(#[from] std::io::Error), /// TLS support was not compiled in. #[error("SSL is no longer supported by Relay, please use a proxy in front")] @@ -112,78 +112,15 @@ fn listen(config: &Config) -> Result { Ok(socket.listen(config.tcp_listen_backlog())?.into_std()?) } -fn build_keepalive(config: &Config) -> Option { - let ka_timeout = config.keepalive_timeout(); - if ka_timeout.is_zero() { - return None; - } - - let mut ka = TcpKeepalive::new().with_time(ka_timeout); - #[cfg(not(any(target_os = "openbsd", target_os = "redox", target_os = "solaris")))] - { - ka = ka.with_interval(ka_timeout); - } - - #[cfg(not(any( - target_os = "openbsd", - target_os = "redox", - target_os = "solaris", - target_os = "windows" - )))] - { - ka = ka.with_retries(KEEPALIVE_RETRIES); - } - - Some(ka) -} - -#[derive(Clone, Debug, Default)] -pub struct KeepAliveAcceptor(Option); - -impl KeepAliveAcceptor { - /// Create a new acceptor that sets `TCP_NODELAY` and keep-alive. - pub fn new(config: &Config) -> Self { - Self(build_keepalive(config)) - } -} - -impl Accept for KeepAliveAcceptor { - type Stream = TcpStream; - type Service = S; - type Future = std::future::Ready>; - - fn accept(&self, stream: TcpStream, service: S) -> Self::Future { - let mut keepalive = "ok"; - let mut nodelay = "ok"; - - if let Self(Some(ref tcp_keepalive)) = self { - let sock_ref = socket2::SockRef::from(&stream); - if let Err(e) = sock_ref.set_tcp_keepalive(tcp_keepalive) { - relay_log::trace!("error trying to set TCP keepalive: {e}"); - keepalive = "error"; - } - } - - if let Err(e) = stream.set_nodelay(true) { - relay_log::trace!("failed to set TCP_NODELAY: {e}"); - nodelay = "error"; - } - - relay_statsd::metric!( - counter(RelayCounters::ServerSocketAccept) += 1, - keepalive = keepalive, - nodelay = nodelay - ); - - std::future::ready(Ok((stream, service))) - } -} - fn serve(listener: TcpListener, app: App, config: Arc) { let handle = Handle::new(); + let acceptor = self::acceptor::RelayAcceptor::new() + .tcp_keepalive(config.keepalive_timeout(), KEEPALIVE_RETRIES) + .idle_timeout(config.idle_timeout()); + let mut server = axum_server::from_tcp(listener) - .acceptor(KeepAliveAcceptor::new(&config)) + .acceptor(acceptor) .handle(handle.clone()); server diff --git a/relay-server/src/statsd.rs b/relay-server/src/statsd.rs index 1cb11ac3cc..a5a7e19801 100644 --- a/relay-server/src/statsd.rs +++ b/relay-server/src/statsd.rs @@ -871,6 +871,8 @@ pub enum RelayCounters { ReplayExceededSegmentLimit, /// Incremented every time the server accepts a new connection. ServerSocketAccept, + /// Incremented every time the server aborts a connection because of an idle timeout. + ServerConnectionIdleTimeout, } impl CounterMetric for RelayCounters { @@ -919,6 +921,7 @@ impl CounterMetric for RelayCounters { RelayCounters::BucketsDropped => "metrics.buckets.dropped", RelayCounters::ReplayExceededSegmentLimit => "replay.segment_limit_exceeded", RelayCounters::ServerSocketAccept => "server.http.accepted", + RelayCounters::ServerConnectionIdleTimeout => "server.http.idle_timeout", } } }