From 7b69644f4ab62b7c29ce0da84c33665a0391b4a3 Mon Sep 17 00:00:00 2001 From: Chinedu Francis Nwafili Date: Tue, 10 Dec 2024 07:07:49 -0500 Subject: [PATCH] feat(http1): graceful shutdown wait for first byte This commit introduces a new `Connection::graceful_shutdown_with_config` method that gives users control over the HTTP/1 graceful shutdown process. Before this commit, if a graceful shutdown was initiated on an inactive connection, hyper would immediately close it. As of this commit the `GracefulShutdownConfig::first_byte_read_timeout` method can be used to give inactive connections a grace period where, should the server begin receiving bytes from them before the deadline, the request will be processed. ## HTTP/2 Graceful Shutdowns This commit does not modify hyper's HTTP/2 graceful shutdown process. `hyper` already uses the HTTP/2 `GOAWAY` frame, meaning that `hyper` already gives inactive connections a brief period during which they can transmit their final requests. Note that while this commit enables slightly more graceful shutdowns for HTTP/1 servers, HTTP/2 graceful shutdowns are still superior. HTTP/2's `GOAWAY` frame allows the server to finish processing a last batch of multiple incoming requests from the client, whereas the new graceful shutdown configuration in this commit only allows the server to wait for one final incoming request to be received. This restriction stems from a limitation in HTTP/1, where there is nothing like the `GOAWAY` frame that can be used to coordinate the graceful shutdown process with the client in the face of multiple concurrent incoming requests. Instead, for HTTP/1 connections `hyper` gracefully shuts down by disabling Keep-Alive, at which point the server will only receive at most one new request, even if the client has multiple requests that are moments from reaching the server. ## Motivating Use Case I'm working on a server that is being designed to handle a large amount of traffic from a large number of clients. 
It is expected that many clients will open many TCP connections with the server every second. As a server receives more traffic it becomes increasingly likely that at the point that it begins gracefully shutting down there are connections that were just opened, but the client's bytes have not yet been seen by the server. Before this commit, calling `Connection::graceful_shutdown` on such a freshly opened HTTP/1 connection will immediately close it. This means that the client will get an error, despite the server having been perfectly capable of handling a final request before closing the connection. This commit solves this problem for HTTP/1 clients that tend to send one request at a time. By setting a `GracefulShutdownConfig::first_byte_read_timeout` of, say, 1 second, the server will wait a moment to see if any of the client's bytes have been received. During this period, the client will have been informed that Keep-Alive is now disabled, meaning that at most one more request will be processed. Clients that have multiple in-flight requests that have not yet reached the server will have at most one of those requests handled, even if all of them reach the server before the `first_byte_read_timeout`. This is a limitation of HTTP/1. ## Work to do in other Crates #### hyper-util To expose this to users that use `hyper-util`, a method should be added to `hyper-util`'s `Connection` type. This new `hyper-util Connection::graceful_shutdown_with_config` method would expose a `http1_first_byte_read_timeout` method that would lead `hyper-util` to set `hyper GracefulShutdownConfig::first_byte_read_timeout`. 
--- Closes https://github.com/hyperium/hyper/issues/3792 --- src/proto/h1/conn.rs | 124 +++++++++++++++++++++++++++++++-------- src/proto/h1/dispatch.rs | 21 ++++--- src/server/conn/http1.rs | 96 +++++++++++++++++++++++++++++- tests/server.rs | 93 +++++++++++++++++++++++++++++ 4 files changed, 295 insertions(+), 39 deletions(-) diff --git a/src/proto/h1/conn.rs b/src/proto/h1/conn.rs index 8ddf7558e1..0d2b4b950c 100644 --- a/src/proto/h1/conn.rs +++ b/src/proto/h1/conn.rs @@ -59,11 +59,9 @@ where h1_parser_config: ParserConfig::default(), h1_max_headers: None, #[cfg(feature = "server")] - h1_header_read_timeout: None, + h1_header_read_timeout: TimeoutState::default(), #[cfg(feature = "server")] - h1_header_read_timeout_fut: None, - #[cfg(feature = "server")] - h1_header_read_timeout_running: false, + h1_graceful_shutdown_first_byte_read_timeout: TimeoutState::default(), #[cfg(feature = "server")] date_header: true, #[cfg(feature = "server")] @@ -144,7 +142,14 @@ where #[cfg(feature = "server")] pub(crate) fn set_http1_header_read_timeout(&mut self, val: Duration) { - self.state.h1_header_read_timeout = Some(val); + self.state.h1_header_read_timeout.timeout = Some(val); + } + + #[cfg(feature = "server")] + pub(crate) fn set_http1_graceful_shutdown_first_byte_read_timeout(&mut self, val: Duration) { + self.state + .h1_graceful_shutdown_first_byte_read_timeout + .timeout = Some(val); } #[cfg(feature = "server")] @@ -209,6 +214,19 @@ where read_buf.len() >= 24 && read_buf[..24] == *H2_PREFACE } + fn close_if_inactive(&mut self) { + // When a graceful shutdown is triggered we wait for up to some + // `Duration` to allow for the client to begin transmitting bytes to the + // server. + // If that duration has elapsed and the connection is still idle, or + // no bytes have been received on the connection, then we close it. + // This prevents inactive connections from keeping the server alive + // despite having no intention of sending a request. 
+ if self.is_idle() || self.has_initial_read_write_state() { + self.state.close(); + } + } + pub(super) fn poll_read_head( &mut self, cx: &mut Context<'_>, @@ -217,24 +235,50 @@ where trace!("Conn::read_head"); #[cfg(feature = "server")] - if !self.state.h1_header_read_timeout_running { - if let Some(h1_header_read_timeout) = self.state.h1_header_read_timeout { + if !self.state.h1_header_read_timeout.is_running { + if let Some(h1_header_read_timeout) = self.state.h1_header_read_timeout.timeout { + self.state.h1_header_read_timeout.is_running = true; let deadline = Instant::now() + h1_header_read_timeout; - self.state.h1_header_read_timeout_running = true; - match self.state.h1_header_read_timeout_fut { + match self.state.h1_header_read_timeout.deadline_fut { Some(ref mut h1_header_read_timeout_fut) => { trace!("resetting h1 header read timeout timer"); self.state.timer.reset(h1_header_read_timeout_fut, deadline); } None => { trace!("setting h1 header read timeout timer"); - self.state.h1_header_read_timeout_fut = + self.state.h1_header_read_timeout.deadline_fut = Some(self.state.timer.sleep_until(deadline)); } } } } + #[cfg(feature = "server")] + if !self + .state + .h1_graceful_shutdown_first_byte_read_timeout + .is_running + { + if let Some(h1_graceful_shutdown_timeout) = self + .state + .h1_graceful_shutdown_first_byte_read_timeout + .timeout + { + if h1_graceful_shutdown_timeout == Duration::from_secs(0) { + self.close_if_inactive(); + } else { + self.state + .h1_graceful_shutdown_first_byte_read_timeout + .is_running = true; + + let deadline = Instant::now() + h1_graceful_shutdown_timeout; + self.state + .h1_graceful_shutdown_first_byte_read_timeout + .deadline_fut = Some(self.state.timer.sleep_until(deadline)); + } + } + } + let msg = match self.io.parse::( cx, ParseContext { @@ -254,12 +298,12 @@ where Poll::Ready(Err(e)) => return self.on_read_head_error(e), Poll::Pending => { #[cfg(feature = "server")] - if self.state.h1_header_read_timeout_running { + if 
self.state.h1_header_read_timeout.is_running { if let Some(ref mut h1_header_read_timeout_fut) = - self.state.h1_header_read_timeout_fut + self.state.h1_header_read_timeout.deadline_fut { if Pin::new(h1_header_read_timeout_fut).poll(cx).is_ready() { - self.state.h1_header_read_timeout_running = false; + self.state.h1_header_read_timeout.is_running = false; warn!("read header from client timeout"); return Poll::Ready(Some(Err(crate::Error::new_header_timeout()))); @@ -267,14 +311,34 @@ where } } + #[cfg(feature = "server")] + if self + .state + .h1_graceful_shutdown_first_byte_read_timeout + .is_running + { + if let Some(ref mut h1_graceful_shutdown_timeout_fut) = self + .state + .h1_graceful_shutdown_first_byte_read_timeout + .deadline_fut + { + if Pin::new(h1_graceful_shutdown_timeout_fut) + .poll(cx) + .is_ready() + { + self.close_if_inactive(); + } + } + } + return Poll::Pending; } }; #[cfg(feature = "server")] { - self.state.h1_header_read_timeout_running = false; - self.state.h1_header_read_timeout_fut = None; + self.state.h1_header_read_timeout.is_running = false; + self.state.h1_header_read_timeout.deadline_fut = None; } // Note: don't deconstruct `msg` into local variables, it appears @@ -872,15 +936,15 @@ where self.state.close_write(); } + #[cfg(feature = "server")] + pub(crate) fn is_idle(&mut self) -> bool { + self.state.is_idle() + } + #[cfg(feature = "server")] pub(crate) fn disable_keep_alive(&mut self) { - if self.state.is_idle() { - trace!("disable_keep_alive; closing idle connection"); - self.state.close(); - } else { - trace!("disable_keep_alive; in-progress connection"); - self.state.disable_keep_alive(); - } + trace!("disable_keep_alive"); + self.state.disable_keep_alive(); } pub(crate) fn take_error(&mut self) -> crate::Result<()> { @@ -926,11 +990,11 @@ struct State { h1_parser_config: ParserConfig, h1_max_headers: Option, #[cfg(feature = "server")] - h1_header_read_timeout: Option, + h1_header_read_timeout: TimeoutState, + /// If a graceful 
shutdown is initiated, and the `TimeoutState` duration has elapsed without + /// receiving any bytes from the client, the connection will be closed. #[cfg(feature = "server")] - h1_header_read_timeout_fut: Option>>, - #[cfg(feature = "server")] - h1_header_read_timeout_running: bool, + h1_graceful_shutdown_first_byte_read_timeout: TimeoutState, #[cfg(feature = "server")] date_header: bool, #[cfg(feature = "server")] @@ -1144,6 +1208,14 @@ impl State { } } +#[derive(Default)] +#[cfg(feature = "server")] +struct TimeoutState { + timeout: Option, + deadline_fut: Option>>, + is_running: bool, +} + #[cfg(test)] mod tests { #[cfg(all(feature = "nightly", not(miri)))] diff --git a/src/proto/h1/dispatch.rs b/src/proto/h1/dispatch.rs index 79ea48be9f..ccb1c43b8f 100644 --- a/src/proto/h1/dispatch.rs +++ b/src/proto/h1/dispatch.rs @@ -1,3 +1,8 @@ +use crate::rt::{Read, Write}; +use bytes::{Buf, Bytes}; +use futures_util::ready; +use http::Request; +use std::time::Duration; use std::{ error::Error as StdError, future::Future, @@ -6,11 +11,6 @@ use std::{ task::{Context, Poll}, }; -use crate::rt::{Read, Write}; -use bytes::{Buf, Bytes}; -use futures_util::ready; -use http::Request; - use super::{Http1Transaction, Wants}; use crate::body::{Body, DecodedLength, Incoming as IncomingBody}; #[cfg(feature = "client")] @@ -90,13 +90,12 @@ where #[cfg(feature = "server")] pub(crate) fn disable_keep_alive(&mut self) { self.conn.disable_keep_alive(); + } - // If keep alive has been disabled and no read or write has been seen on - // the connection yet, we must be in a state where the server is being asked to - // shut down before any data has been seen on the connection - if self.conn.is_write_closed() || self.conn.has_initial_read_write_state() { - self.close(); - } + #[cfg(feature = "server")] + pub(crate) fn set_graceful_shutdown_first_byte_read_timeout(&mut self, read_timeout: Duration) { + self.conn + .set_http1_graceful_shutdown_first_byte_read_timeout(read_timeout); } pub(crate) 
fn into_inner(self) -> (I, Bytes, D) { diff --git a/src/server/conn/http1.rs b/src/server/conn/http1.rs index 097497bf41..84713dca47 100644 --- a/src/server/conn/http1.rs +++ b/src/server/conn/http1.rs @@ -123,7 +123,8 @@ where B: Body + 'static, B::Error: Into>, { - /// Start a graceful shutdown process for this connection. + /// Start a graceful shutdown process for this connection, using the default + /// [`GracefulShutdownConfig`]. /// /// This `Connection` should continue to be polled until shutdown /// can finish. @@ -133,8 +134,29 @@ where /// This should only be called while the `Connection` future is still /// pending. If called after `Connection::poll` has resolved, this does /// nothing. - pub fn graceful_shutdown(mut self: Pin<&mut Self>) { + pub fn graceful_shutdown(self: Pin<&mut Self>) { + self.graceful_shutdown_with_config(GracefulShutdownConfig::default()); + } + + /// Start a graceful shutdown process for this connection. + /// + /// This `Connection` should continue to be polled until shutdown can finish. + /// + /// Requires a [`Timer`] set by [`Builder::timer`]. + /// + /// # Note + /// + /// This should only be called while the `Connection` future is still + /// pending. If called after `Connection::poll` has resolved, this does + /// nothing. + /// + /// # Panics + /// If [`GracefulShutdownConfig::first_byte_read_timeout`] was configured to greater than zero + /// nanoseconds, but no timer was set, then the `Connection` will panic when it is next polled. + pub fn graceful_shutdown_with_config(mut self: Pin<&mut Self>, config: GracefulShutdownConfig) { self.conn.disable_keep_alive(); + self.conn + .set_graceful_shutdown_first_byte_read_timeout(config.first_byte_read_timeout); } /// Return the inner IO object, and additional information. @@ -526,3 +548,73 @@ where } } } + +/// Configuration for graceful shutdowns. 
+/// +/// # Example +/// +/// ``` +/// # use hyper::{body::Incoming, Request, Response}; +/// # use hyper::service::Service; +/// # use hyper::server::conn::http1::Builder; +/// # use hyper::rt::{Read, Write}; +/// # use std::time::Duration; +/// # use hyper::server::conn::http1::GracefulShutdownConfig; +/// # async fn run<I, S>(some_io: I, some_service: S) +/// # where +/// # I: Read + Write + Unpin + Send + 'static, +/// # S: Service<hyper::Request<Incoming>, Response=hyper::Response<Incoming>> + Send + 'static, +/// # S::Error: Into<Box<dyn std::error::Error + Send + Sync>>, +/// # S::Future: Send, +/// # { +/// let http = Builder::new(); +/// let conn = http.serve_connection(some_io, some_service); +/// +/// let mut config = GracefulShutdownConfig::default(); +/// config.first_byte_read_timeout(Duration::from_secs(2)); +/// +/// conn.graceful_shutdown_with_config(config); +/// conn.await.unwrap(); +/// # } +/// # fn main() {} +/// ``` +#[derive(Debug)] +pub struct GracefulShutdownConfig { + first_byte_read_timeout: Duration, +} +impl Default for GracefulShutdownConfig { + fn default() -> Self { + GracefulShutdownConfig { + first_byte_read_timeout: Duration::from_secs(0), + } + } +} +impl GracefulShutdownConfig { + /// It is possible for a client to open a connection and begin transmitting bytes, but have the + /// server initiate a graceful shutdown just before it sees any of the client's bytes. + /// + /// The more traffic that a server receives, the more likely this race condition is to occur for + /// some of the open connections. + /// + /// The `first_byte_read_timeout` controls how long the server waits for the first bytes of a + /// final request to be received from the client. + /// + /// If no bytes were received from the client between the time that keep alive was disabled and + /// the `first_byte_read_timeout` duration, the connection is considered inactive and the server will + /// close it. 
+ /// + /// # Recommendations + /// Servers are recommended to use a `first_byte_read_timeout` that reduces the likelihood of + /// the client receiving an error due to the connection closing just after they began + /// transmitting their final request. + /// For most internet connections, a roughly one-second timeout should be enough time for the + /// server to begin receiving the client's request's bytes. + /// + /// # Default + /// A default of 0 seconds was chosen to remain backwards compatible with versions of hyper that + /// did not have this `first_byte_read_timeout` configuration. + pub fn first_byte_read_timeout(&mut self, timeout: Duration) -> &mut Self { + self.first_byte_read_timeout = timeout; + self + } +} diff --git a/tests/server.rs b/tests/server.rs index f72cf62702..ecf5025d9f 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -27,6 +27,7 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::{TcpListener as TkTcpListener, TcpListener, TcpStream as TkTcpStream}; use hyper::body::{Body, Incoming as IncomingBody}; +use hyper::server::conn::http1::GracefulShutdownConfig; use hyper::server::conn::{http1, http2}; use hyper::service::{service_fn, Service}; use hyper::{Method, Request, Response, StatusCode, Uri, Version}; @@ -1348,6 +1349,38 @@ async fn http1_graceful_shutdown_after_upgrade() { conn.as_mut().graceful_shutdown(); } +/// When hyper reached 1.0, `Connection::graceful_shutdown` did not require a timer. +/// It would immediately close connections that were idle or had not received any bytes. +/// A later release made it possible to wait some Duration before closing the inactive connection. +/// Here we confirm that `Connection::graceful_shutdown` does not require a timer. 
+#[tokio::test] +async fn http1_graceful_shutdown_no_timer_required_for_zero_second_next_byte_timeout() { + let (listener, addr) = setup_tcp_listener(); + + tokio::spawn(async move { + let mut stream = TkTcpStream::connect(addr).await.unwrap(); + + stream.write_all(b"GET / HTTP/1.1\r\n\r\n").await.unwrap(); + + let mut buf = vec![]; + stream.read_to_end(&mut buf).await.unwrap(); + }); + + let socket = listener.accept().await.unwrap().0; + let socket = TokioIo::new(socket); + + // Construct a builder that does not call the `.timer` method. + let future = http1::Builder::new().serve_connection(socket, HelloWorld); + pin!(future); + + future.as_mut().graceful_shutdown(); + + tokio::time::timeout(Duration::from_secs(5), future) + .await + .unwrap() + .unwrap(); +} + #[tokio::test] async fn empty_parse_eof_does_not_return_error() { let (listener, addr) = setup_tcp_listener(); @@ -2352,6 +2385,66 @@ async fn graceful_shutdown_before_first_request_no_block() { .expect("error receiving response"); } +#[cfg(feature = "http1")] +#[tokio::test] +async fn graceful_shutdown_grace_period_for_first_byte() { + // (Client wait before sending, Server first byte timeout, Expected completed response) + let test_cases = [ + // When the client sends bytes before the grace period we expect a response. + (500, 1000, true), + // When the client sends bytes after the grace period we do not expect a response. 
+ (1000, 500, false), + ]; + for (client_wait_before_sending, server_first_byte_timeout, expected_to_respond) in test_cases { + let (listener, addr) = setup_tcp_listener(); + + let graceful_shutdown_first_byte_timeout = Duration::from_millis(server_first_byte_timeout); + let client_wait_before_sending = Duration::from_millis(client_wait_before_sending); + + tokio::spawn(async move { + let socket = listener.accept().await.unwrap().0; + let socket = TokioIo::new(socket); + + let future = http1::Builder::new() + .timer(TokioTimer) + .serve_connection(socket, HelloWorld); + pin!(future); + + let mut graceful_config = GracefulShutdownConfig::default(); + graceful_config.first_byte_read_timeout(graceful_shutdown_first_byte_timeout); + future + .as_mut() + .graceful_shutdown_with_config(graceful_config); + + future.await.unwrap(); + }); + + let mut stream = TkTcpStream::connect(addr).await.unwrap(); + + tokio::time::sleep(client_wait_before_sending).await; + stream.write_all(b"GET / HTTP/1.1\r\n\r\n").await.unwrap(); + + let mut buf = vec![]; + stream.read_to_end(&mut buf).await.unwrap(); + + if expected_to_respond { + assert!(buf.starts_with(b"HTTP/1.1 200 OK\r\nconnection: close")); + } else { + assert!(buf.is_empty()); + } + + // Since the server was gracefully shut down it should not respond to any further requests. + { + stream.write_all(b"GET / HTTP/1.1\r\n\r\n").await.unwrap(); + + let mut buf = vec![]; + stream.read_to_end(&mut buf).await.unwrap(); + + assert_eq!(buf.len(), 0); + } + } +} + #[test] fn streaming_body() { use futures_util::StreamExt;