From 4362e381aff272a3ea4eeae777f2d361cb416062 Mon Sep 17 00:00:00 2001 From: Finn Bear Date: Sun, 25 Aug 2024 23:45:28 -0700 Subject: [PATCH] feat(server): add h1 `idle_timeout` --- src/error.rs | 10 ++++ src/proto/h1/conn.rs | 117 ++++++++++++++++++++++++++++----------- src/proto/h1/io.rs | 4 +- src/server/conn/http1.rs | 20 +++++++ tests/server.rs | 110 ++++++++++++++++++++++++++++++++++-- 5 files changed, 224 insertions(+), 37 deletions(-) diff --git a/src/error.rs b/src/error.rs index 9ad4c0e5b3..18e66c77e3 100644 --- a/src/error.rs +++ b/src/error.rs @@ -64,6 +64,9 @@ pub(super) enum Kind { /// User took too long to send headers #[cfg(all(feature = "http1", feature = "server"))] HeaderTimeout, + /// User took too long to send another request + #[cfg(all(feature = "http1", feature = "server"))] + IdleTimeout, /// Error while reading a body from connection. #[cfg(all( any(feature = "client", feature = "server"), @@ -360,6 +363,11 @@ impl Error { Error::new(Kind::HeaderTimeout) } + #[cfg(all(feature = "http1", feature = "server"))] + pub(super) fn new_idle_timeout() -> Error { + Error::new(Kind::IdleTimeout) + } + #[cfg(feature = "http1")] #[cfg(feature = "server")] pub(super) fn new_user_unsupported_status_code() -> Error { @@ -458,6 +466,8 @@ impl Error { Kind::Canceled => "operation was canceled", #[cfg(all(feature = "http1", feature = "server"))] Kind::HeaderTimeout => "read header from client timeout", + #[cfg(all(feature = "http1", feature = "server"))] + Kind::IdleTimeout => "idle client timeout", #[cfg(all( any(feature = "client", feature = "server"), any(feature = "http1", feature = "http2") diff --git a/src/proto/h1/conn.rs b/src/proto/h1/conn.rs index 10f4f87b40..4d15d92a47 100644 --- a/src/proto/h1/conn.rs +++ b/src/proto/h1/conn.rs @@ -61,10 +61,14 @@ where #[cfg(feature = "server")] h1_header_read_timeout: None, #[cfg(feature = "server")] - h1_header_read_timeout_fut: None, - #[cfg(feature = "server")] h1_header_read_timeout_running: false, #[cfg(feature = "server")] + h1_idle_timeout: None, + #[cfg(feature = "server")] + h1_idle_timeout_running: false, + #[cfg(feature = "server")] + h1_timeout_fut: None, + #[cfg(feature = "server")] date_header: true, #[cfg(feature = "server")] timer: Time::Empty, @@ -147,6 +151,11 @@ where self.state.h1_header_read_timeout = Some(val); } + #[cfg(feature = "server")] + pub(crate) fn set_http1_idle_timeout(&mut self, val: Duration) { + self.state.h1_idle_timeout = Some(val); + } + #[cfg(feature = "server")] pub(crate) fn set_allow_half_close(&mut self) { self.state.allow_half_close = true; @@ -217,24 +226,10 @@ where trace!("Conn::read_head"); #[cfg(feature = "server")] - if !self.state.h1_header_read_timeout_running { - if let Some(h1_header_read_timeout) = self.state.h1_header_read_timeout { - let deadline = Instant::now() + h1_header_read_timeout; - self.state.h1_header_read_timeout_running = true; - match self.state.h1_header_read_timeout_fut { - Some(ref mut h1_header_read_timeout_fut) => { - trace!("resetting h1 header read timeout timer"); - self.state.timer.reset(h1_header_read_timeout_fut, deadline); - } - None => { - trace!("setting h1 header read timeout timer"); - self.state.h1_header_read_timeout_fut = - Some(self.state.timer.sleep_until(deadline)); - } - } - } - } + let first_head = !self.io.is_read_blocked(); + #[cfg_attr(not(feature = "server"), allow(unused))] + let mut progress = false; let msg = match self.io.parse::( cx, ParseContext { @@ -249,20 +244,69 @@ where #[cfg(feature = "ffi")] on_informational: &mut self.state.on_informational, }, + &mut progress, ) { Poll::Ready(Ok(msg)) => msg, Poll::Ready(Err(e)) => return self.on_read_head_error(e), Poll::Pending => { + // - Use the read timeout on the first head to avoid common DoS. + // - If made progress in reading header, must no longer be idle. #[cfg(feature = "server")] - if self.state.h1_header_read_timeout_running { - if let Some(ref mut h1_header_read_timeout_fut) = - self.state.h1_header_read_timeout_fut - { - if Pin::new(h1_header_read_timeout_fut).poll(cx).is_ready() { - self.state.h1_header_read_timeout_running = false; - - warn!("read header from client timeout"); - return Poll::Ready(Some(Err(crate::Error::new_header_timeout()))); + if first_head || progress { + if !self.state.h1_header_read_timeout_running { + if let Some(h1_header_read_timeout) = self.state.h1_header_read_timeout { + let deadline = Instant::now() + h1_header_read_timeout; + self.state.h1_idle_timeout_running = false; + self.state.h1_header_read_timeout_running = true; + match self.state.h1_timeout_fut { + Some(ref mut ht_timeout_fut) => { + trace!("resetting h1 timeout timer for header read"); + self.state.timer.reset(ht_timeout_fut, deadline); + } + None => { + trace!("setting h1 timeout timer for header read"); + self.state.h1_timeout_fut = + Some(self.state.timer.sleep_until(deadline)); + } + } + } else if std::mem::take(&mut self.state.h1_idle_timeout_running) { + trace!("unsetting h1 timeout timer for idle"); + self.state.h1_timeout_fut = None; + } + } + } else if !self.state.h1_header_read_timeout_running + && !self.state.h1_idle_timeout_running + { + if let Some(h1_idle_timeout) = self.state.h1_idle_timeout { + let deadline = Instant::now() + h1_idle_timeout; + self.state.h1_idle_timeout_running = true; + match self.state.h1_timeout_fut { + Some(ref mut h1_timeout_fut) => { + trace!("resetting h1 timeout timer for idle"); + self.state.timer.reset(h1_timeout_fut, deadline); + } + None => { + trace!("setting h1 timeout timer for idle"); + self.state.h1_timeout_fut = + Some(self.state.timer.sleep_until(deadline)); + } + } + } + } + + #[cfg(feature = "server")] + if self.state.h1_header_read_timeout_running || self.state.h1_idle_timeout_running { + if let Some(ref mut h1_timeout_fut) = self.state.h1_timeout_fut { + if Pin::new(h1_timeout_fut).poll(cx).is_ready() { + return Poll::Ready(Some(Err( + if self.state.h1_header_read_timeout_running { + warn!("read header from client timeout"); + crate::Error::new_header_timeout() + } else { + warn!("idle client timeout"); + crate::Error::new_idle_timeout() + }, + ))); } } } @@ -274,7 +318,8 @@ where #[cfg(feature = "server")] { self.state.h1_header_read_timeout_running = false; - self.state.h1_header_read_timeout_fut = None; + self.state.h1_idle_timeout_running = false; + self.state.h1_timeout_fut = None; } // Note: don't deconstruct `msg` into local variables, it appears @@ -919,10 +964,14 @@ struct State { #[cfg(feature = "server")] h1_header_read_timeout: Option, #[cfg(feature = "server")] - h1_header_read_timeout_fut: Option>>, - #[cfg(feature = "server")] h1_header_read_timeout_running: bool, #[cfg(feature = "server")] + h1_idle_timeout: Option, + #[cfg(feature = "server")] + h1_idle_timeout_running: bool, + #[cfg(feature = "server")] + h1_timeout_fut: Option>>, + #[cfg(feature = "server")] date_header: bool, #[cfg(feature = "server")] timer: Time, @@ -1106,6 +1155,12 @@ impl State { self.reading = Reading::Init; self.writing = Writing::Init; + #[cfg(feature = "server")] + if self.h1_idle_timeout.is_some() { + // Next read will start and poll the idle timeout. + self.notify_read = true; + } + // !T::should_read_first() means Client. // // If Client connection has just gone idle, the Dispatcher diff --git a/src/proto/h1/io.rs b/src/proto/h1/io.rs index 4ad2fca1f4..ca55ae8129 100644 --- a/src/proto/h1/io.rs +++ b/src/proto/h1/io.rs @@ -169,6 +169,7 @@ where &mut self, cx: &mut Context<'_>, parse_ctx: ParseContext<'_>, + progress: &mut bool, ) -> Poll>> where S: Http1Transaction, @@ -205,6 +206,7 @@ where trace!("parse eof"); return Poll::Ready(Err(crate::Error::new_incomplete())); } + *progress = true; } } @@ -702,7 +704,7 @@ mod tests { on_informational: &mut None, }; assert!(buffered - .parse::(cx, parse_ctx) + .parse::(cx, parse_ctx, &mut false) .is_pending()); Poll::Ready(()) }) diff --git a/src/server/conn/http1.rs b/src/server/conn/http1.rs index 097497bf41..bf0d949ee6 100644 --- a/src/server/conn/http1.rs +++ b/src/server/conn/http1.rs @@ -76,6 +76,7 @@ pub struct Builder { h1_preserve_header_case: bool, h1_max_headers: Option, h1_header_read_timeout: Dur, + h1_idle_timeout: Dur, h1_writev: Option, max_buf_size: Option, pipeline_flush: bool, @@ -238,6 +239,7 @@ impl Builder { h1_preserve_header_case: false, h1_max_headers: None, h1_header_read_timeout: Dur::Default(Some(Duration::from_secs(30))), + h1_idle_timeout: Dur::Default(Some(Duration::from_secs(120))), h1_writev: None, max_buf_size: None, pipeline_flush: false, @@ -322,6 +324,21 @@ impl Builder { self } + /// Set a timeout for idle time between requests. If a client does not + /// transmit another request within this time after receiving the last + /// response, the connection is closed. + /// + /// Requires a [`Timer`] set by [`Builder::timer`] to take effect. Panics if `idle_timeout` is configured + /// without a [`Timer`]. + /// + /// Pass `None` to disable. + /// + /// Default is 120 seconds. + pub fn idle_timeout(&mut self, idle_timeout: impl Into>) -> &mut Self { + self.h1_idle_timeout = Dur::Configured(idle_timeout.into()); + self + } + /// Set whether HTTP/1 connections should try to use vectored writes, /// or always flatten into a single buffer. /// @@ -448,6 +465,9 @@ impl Builder { { conn.set_http1_header_read_timeout(dur); }; + if let Some(dur) = self.timer.check(self.h1_idle_timeout, "idle_timeout") { + conn.set_http1_idle_timeout(dur); + }; if let Some(writev) = self.h1_writev { if writev { conn.set_write_strategy_queue(); diff --git a/tests/server.rs b/tests/server.rs index 5120ad776f..54d4b05253 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -1500,7 +1500,6 @@ async fn header_read_timeout_slow_writes() { tcp.write_all( b"\ Something: 1\r\n\ - \r\n\ ", ) .expect("write 2"); @@ -1593,21 +1592,31 @@ async fn header_read_timeout_slow_writes_multiple_requests() { thread::sleep(Duration::from_secs(6)); + // idle would timeout in 2s, switched to header read with timeout in 5s tcp.write_all( b"\ GET / HTTP/1.1\r\n\ Something: 1\r\n\ - \r\n\ ", ) .expect("write 5"); - thread::sleep(Duration::from_secs(6)); + thread::sleep(Duration::from_secs(3)); + // idle would have timed out, header read times out in 2s tcp.write_all( b"\ - Works: 0\r\n\ + Is: 0\r\n\ + ", + ) + .expect("write 6"); + thread::sleep(Duration::from_secs(3)); + // header read timed out 1s ago + tcp.write_all( + b"\ + Wrong: 0\r\n\ ", ) - .expect_err("write 6"); + .and(tcp.flush()) + .expect_err("write 7"); }); let (socket, _) = listener.accept().await.unwrap(); @@ -1615,6 +1624,7 @@ async fn header_read_timeout_slow_writes_multiple_requests() { let conn = http1::Builder::new() .timer(TokioTimer) .header_read_timeout(Duration::from_secs(5)) + .idle_timeout(Duration::from_secs(8)) .serve_connection( socket, service_fn(|_| { @@ -1628,6 +1638,96 @@ async fn header_read_timeout_slow_writes_multiple_requests() { conn.without_shutdown().await.expect_err("header timeout"); } +#[tokio::test] +async fn no_idle_timeout_multiple_requests() { + let (listener, addr) = setup_tcp_listener(); + + let done = Arc::new(AtomicBool::new(false)); + let done_clone = Arc::clone(&done); + thread::spawn(move || { + let mut tcp = connect(&addr); + + for i in 1..=3 { + tcp.write_all( + b"\ + GET / HTTP/1.1\r\n\ + \r\n\ + ", + ) + .and(tcp.flush()) + .expect(&format!("write {i}")); + thread::sleep(Duration::from_secs(3)); + } + + done_clone.store(true, Ordering::SeqCst); + drop(tcp); + }); + + let (socket, _) = listener.accept().await.unwrap(); + let socket = TokioIo::new(socket); + let conn = http1::Builder::new() + .timer(TokioTimer) + .idle_timeout(None) + .serve_connection( + socket, + service_fn(|_| { + let res = Response::builder() + .status(200) + .body(Empty::::new()) + .unwrap(); + future::ready(Ok::<_, hyper::Error>(res)) + }), + ); + conn.without_shutdown().await.expect("hung up"); + assert!(done.load(Ordering::SeqCst)); +} + +#[tokio::test] +async fn idle_timeout_multiple_intermittent_requests() { + let (listener, addr) = setup_tcp_listener(); + + let done = Arc::new(AtomicBool::new(false)); + let done_clone = Arc::clone(&done); + thread::spawn(move || { + let mut tcp = connect(&addr); + + for i in 1..=3 { + tcp.write_all( + b"\ + GET / HTTP/1.1\r\n\ + \r\n\ + ", + ) + .and(tcp.flush()) + .expect(&format!("write {i}")); + thread::sleep(Duration::from_secs(3)); + } + + done_clone.store(true, Ordering::SeqCst); + + thread::sleep(Duration::from_secs(3)); + drop(tcp); + }); + + let (socket, _) = listener.accept().await.unwrap(); + let socket = TokioIo::new(socket); + let conn = http1::Builder::new() + .timer(TokioTimer) + .idle_timeout(Duration::from_secs(5)) + .serve_connection( + socket, + service_fn(|_| { + let res = Response::builder() + .status(200) + .body(Empty::::new()) + .unwrap(); + future::ready(Ok::<_, hyper::Error>(res)) + }), + ); + conn.without_shutdown().await.expect_err("idle timeout"); + assert!(done.load(Ordering::SeqCst)); +} + #[tokio::test] async fn upgrades() { let (listener, addr) = setup_tcp_listener();