feat(http1): graceful shutdown first byte timeout
This commit introduces a new `Connection::graceful_shutdown_with_config`
method that gives users control over the HTTP/1 graceful shutdown process.

Before this commit, if a graceful shutdown was initiated on an inactive
connection, hyper would immediately close it.

As of this commit, the `GracefulShutdownConfig::first_byte_read_timeout`
method can be used to give inactive connections a grace period: if the
server begins receiving bytes from them before the deadline, the request
will be processed.
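
The grace period described above can be sketched as a small decision function. This is a simplified, standard-library-only model and not hyper's implementation; `Outcome` and `shutdown_outcome` are hypothetical names introduced for illustration, and treating a byte that arrives exactly at the deadline as too late is an assumption.

```rust
use std::time::Duration;

/// What happens to an inactive HTTP/1 connection once graceful shutdown begins.
/// (Hypothetical type, used only for this illustration.)
#[derive(Debug, PartialEq, Eq)]
enum Outcome {
    /// The first byte arrived before the deadline, so one final request is processed.
    ProcessFinalRequest,
    /// No byte arrived in time, so the connection is closed.
    Close,
}

/// `first_byte_after` is how long after shutdown began the client's first byte
/// arrives, or `None` if the client never sends anything.
fn shutdown_outcome(
    first_byte_read_timeout: Duration,
    first_byte_after: Option<Duration>,
) -> Outcome {
    match first_byte_after {
        // A zero timeout can never satisfy this guard, which models the
        // pre-commit behavior of closing inactive connections immediately.
        Some(arrival) if arrival < first_byte_read_timeout => Outcome::ProcessFinalRequest,
        _ => Outcome::Close,
    }
}
```

With a one second timeout, a first byte arriving after 200 ms yields `Outcome::ProcessFinalRequest`; with the default zero timeout the same connection yields `Outcome::Close`.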

## HTTP/2 Graceful Shutdowns

This commit does not modify hyper's HTTP/2 graceful shutdown process.

`hyper` already uses the HTTP/2 `GOAWAY` frame, meaning that `hyper`
already gives inactive connections a brief period during which they can
transmit their final requests.

Note that while this commit enables slightly more graceful shutdowns for
HTTP/1 servers, HTTP/2 graceful shutdowns are still superior.

HTTP/2's `GOAWAY` frame allows the server to finish processing a last
batch of multiple incoming requests from the client, whereas the new
graceful shutdown configuration in this commit only allows the server to
wait for one final incoming request to be received.

This limitation stems from HTTP/1 itself, which has nothing like the
`GOAWAY` frame that could be used to coordinate the graceful shutdown
process with the client in the face of multiple concurrent incoming
requests.

Instead, for HTTP/1 connections, `hyper` gracefully shuts down by
disabling Keep-Alive, at which point the server will receive at most one
new request, even if the client has multiple requests that are moments
from reaching the server.

## Motivating Use Case

I'm working on a server that is being designed to handle a large amount
of traffic from a large number of clients.

It is expected that many clients will open many TCP connections with the
server every second.

As a server receives more traffic, it becomes increasingly likely that,
at the moment it begins gracefully shutting down, some connections have
just been opened but the server has not yet seen any of the client's
bytes.

Before this commit, calling `Connection::graceful_shutdown` on such a
freshly opened HTTP/1 connection would immediately close it. The client
would get an error, despite the server having been perfectly capable of
handling a final request before closing the connection.

This commit solves this problem for HTTP/1 clients that tend to send one
request at a time.
By setting a `GracefulShutdownConfig::first_byte_read_timeout` of, say,
1 second, the server will wait a moment to see if any of the client's
bytes have been received.
During this period, the client will have been informed that Keep-Alive
is now disabled, meaning that at most one more request will be
processed.

Clients that have multiple in-flight requests that have not yet reached
the server will have at most one of those requests handled, even if all
of them reach the server before the `first_byte_read_timeout`.
This is a limitation of HTTP/1.
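
The "at most one" limit can be illustrated with a tiny model. This is a sketch under simplifying assumptions, not hyper's code: `requests_handled` is a hypothetical helper, and arrival times are given as offsets from the moment graceful shutdown began.

```rust
use std::time::Duration;

/// Counts how many not-yet-received requests a single HTTP/1 connection
/// handles after graceful shutdown begins.
/// `arrivals` holds each request's arrival time, measured from shutdown start.
fn requests_handled(arrivals: &[Duration], first_byte_read_timeout: Duration) -> usize {
    // The connection stays open only if some byte arrives before the deadline.
    let any_in_time = arrivals.iter().any(|a| *a < first_byte_read_timeout);
    // Keep-Alive is disabled, so the connection closes after the first
    // response, no matter how many further requests arrived in time.
    usize::from(any_in_time)
}
```

Three requests arriving 10 ms, 20 ms and 30 ms into a one second grace period still result in a single handled request.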

## Work to do in other Crates

#### hyper-util

To expose this to users that use `hyper-util`, a method should be added
to `hyper-util`'s `Connection` type.
This new `Connection::graceful_shutdown_with_config` method in `hyper-util`
would expose an `http1_first_byte_read_timeout` setting that would lead
`hyper-util` to set hyper's
`GracefulShutdownConfig::first_byte_read_timeout`.
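
One possible shape for that forwarding is sketched below. Nothing here exists in `hyper-util` today: `UtilGracefulShutdownConfig` and `to_hyper_http1` are hypothetical names, and `HyperGracefulShutdownConfig` is a local stand-in for hyper's `GracefulShutdownConfig` so that the example compiles with only the standard library.

```rust
use std::time::Duration;

/// Local stand-in for hyper's `GracefulShutdownConfig`, reduced to the one
/// field added in this commit.
#[derive(Debug, Default)]
struct HyperGracefulShutdownConfig {
    first_byte_read_timeout: Duration,
}

impl HyperGracefulShutdownConfig {
    fn first_byte_read_timeout(&mut self, timeout: Duration) -> &mut Self {
        self.first_byte_read_timeout = timeout;
        self
    }
}

/// Hypothetical `hyper-util` side configuration (an assumption, not a real API).
#[derive(Debug, Default)]
struct UtilGracefulShutdownConfig {
    http1_first_byte_read_timeout: Duration,
}

impl UtilGracefulShutdownConfig {
    fn http1_first_byte_read_timeout(&mut self, timeout: Duration) -> &mut Self {
        self.http1_first_byte_read_timeout = timeout;
        self
    }

    /// Translates the `hyper-util` setting into the hyper-level HTTP/1 config.
    fn to_hyper_http1(&self) -> HyperGracefulShutdownConfig {
        let mut config = HyperGracefulShutdownConfig::default();
        config.first_byte_read_timeout(self.http1_first_byte_read_timeout);
        config
    }
}
```

Prefixing the setting with `http1_` leaves room for protocol-specific options later, since the commit leaves HTTP/2 shutdown behavior unchanged.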

---

Closes hyperium#3792
chinedufn committed Dec 10, 2024
1 parent 7f4a682 commit 2a33cb5
Showing 4 changed files with 295 additions and 39 deletions.
124 changes: 98 additions & 26 deletions src/proto/h1/conn.rs
@@ -59,11 +59,9 @@ where
h1_parser_config: ParserConfig::default(),
h1_max_headers: None,
#[cfg(feature = "server")]
h1_header_read_timeout: None,
h1_header_read_timeout: TimeoutState::default(),
#[cfg(feature = "server")]
h1_header_read_timeout_fut: None,
#[cfg(feature = "server")]
h1_header_read_timeout_running: false,
h1_graceful_shutdown_first_byte_read_timeout: TimeoutState::default(),
#[cfg(feature = "server")]
date_header: true,
#[cfg(feature = "server")]
@@ -144,7 +142,14 @@ where

#[cfg(feature = "server")]
pub(crate) fn set_http1_header_read_timeout(&mut self, val: Duration) {
self.state.h1_header_read_timeout = Some(val);
self.state.h1_header_read_timeout.timeout = Some(val);
}

#[cfg(feature = "server")]
pub(crate) fn set_http1_graceful_shutdown_first_byte_read_timeout(&mut self, val: Duration) {
self.state
.h1_graceful_shutdown_first_byte_read_timeout
.timeout = Some(val);
}

#[cfg(feature = "server")]
@@ -209,6 +214,19 @@ where
read_buf.len() >= 24 && read_buf[..24] == *H2_PREFACE
}

fn close_if_inactive(&mut self) {
// When a graceful shutdown is triggered we wait for up to some
// `Duration` to allow for the client to begin transmitting bytes to the
// server.
// If that duration has elapsed and the connection is still idle, or
// no bytes have been received on the connection, then we close it.
// This prevents inactive connections from keeping the server alive
// despite having no intention of sending a request.
if self.is_idle() || self.has_initial_read_write_state() {
self.state.close();
}
}

pub(super) fn poll_read_head(
&mut self,
cx: &mut Context<'_>,
@@ -217,24 +235,50 @@ where
trace!("Conn::read_head");

#[cfg(feature = "server")]
if !self.state.h1_header_read_timeout_running {
if let Some(h1_header_read_timeout) = self.state.h1_header_read_timeout {
if !self.state.h1_header_read_timeout.is_running {
if let Some(h1_header_read_timeout) = self.state.h1_header_read_timeout.timeout {
self.state.h1_header_read_timeout.is_running = true;
let deadline = Instant::now() + h1_header_read_timeout;
self.state.h1_header_read_timeout_running = true;
match self.state.h1_header_read_timeout_fut {
match self.state.h1_header_read_timeout.deadline_fut {
Some(ref mut h1_header_read_timeout_fut) => {
trace!("resetting h1 header read timeout timer");
self.state.timer.reset(h1_header_read_timeout_fut, deadline);
}
None => {
trace!("setting h1 header read timeout timer");
self.state.h1_header_read_timeout_fut =
self.state.h1_header_read_timeout.deadline_fut =
Some(self.state.timer.sleep_until(deadline));
}
}
}
}

#[cfg(feature = "server")]
if !self
.state
.h1_graceful_shutdown_first_byte_read_timeout
.is_running
{
if let Some(h1_graceful_shutdown_timeout) = self
.state
.h1_graceful_shutdown_first_byte_read_timeout
.timeout
{
if h1_graceful_shutdown_timeout == Duration::from_secs(0) {
self.close_if_inactive();
} else {
self.state
.h1_graceful_shutdown_first_byte_read_timeout
.is_running = true;

let deadline = Instant::now() + h1_graceful_shutdown_timeout;
self.state
.h1_graceful_shutdown_first_byte_read_timeout
.deadline_fut = Some(self.state.timer.sleep_until(deadline));
}
}
}

let msg = match self.io.parse::<T>(
cx,
ParseContext {
@@ -254,27 +298,47 @@ where
Poll::Ready(Err(e)) => return self.on_read_head_error(e),
Poll::Pending => {
#[cfg(feature = "server")]
if self.state.h1_header_read_timeout_running {
if self.state.h1_header_read_timeout.is_running {
if let Some(ref mut h1_header_read_timeout_fut) =
self.state.h1_header_read_timeout_fut
self.state.h1_header_read_timeout.deadline_fut
{
if Pin::new(h1_header_read_timeout_fut).poll(cx).is_ready() {
self.state.h1_header_read_timeout_running = false;
self.state.h1_header_read_timeout.is_running = false;

warn!("read header from client timeout");
return Poll::Ready(Some(Err(crate::Error::new_header_timeout())));
}
}
}

#[cfg(feature = "server")]
if self
.state
.h1_graceful_shutdown_first_byte_read_timeout
.is_running
{
if let Some(ref mut h1_graceful_shutdown_timeout_fut) = self
.state
.h1_graceful_shutdown_first_byte_read_timeout
.deadline_fut
{
if Pin::new(h1_graceful_shutdown_timeout_fut)
.poll(cx)
.is_ready()
{
self.close_if_inactive();
}
}
}

return Poll::Pending;
}
};

#[cfg(feature = "server")]
{
self.state.h1_header_read_timeout_running = false;
self.state.h1_header_read_timeout_fut = None;
self.state.h1_header_read_timeout.is_running = false;
self.state.h1_header_read_timeout.deadline_fut = None;
}

// Note: don't deconstruct `msg` into local variables, it appears
@@ -872,15 +936,15 @@ where
self.state.close_write();
}

#[cfg(feature = "server")]
pub(crate) fn is_idle(&mut self) -> bool {
self.state.is_idle()
}

#[cfg(feature = "server")]
pub(crate) fn disable_keep_alive(&mut self) {
if self.state.is_idle() {
trace!("disable_keep_alive; closing idle connection");
self.state.close();
} else {
trace!("disable_keep_alive; in-progress connection");
self.state.disable_keep_alive();
}
trace!("disable_keep_alive");
self.state.disable_keep_alive();
}

pub(crate) fn take_error(&mut self) -> crate::Result<()> {
@@ -926,11 +990,11 @@ struct State {
h1_parser_config: ParserConfig,
h1_max_headers: Option<usize>,
#[cfg(feature = "server")]
h1_header_read_timeout: Option<Duration>,
h1_header_read_timeout: TimeoutState,
/// If a graceful shutdown is initiated, and the `TimeoutState` duration has elapsed without
/// receiving any bytes from the client, the connection will be closed.
#[cfg(feature = "server")]
h1_header_read_timeout_fut: Option<Pin<Box<dyn Sleep>>>,
#[cfg(feature = "server")]
h1_header_read_timeout_running: bool,
h1_graceful_shutdown_first_byte_read_timeout: TimeoutState,
#[cfg(feature = "server")]
date_header: bool,
#[cfg(feature = "server")]
@@ -1144,6 +1208,14 @@ impl State {
}
}

#[derive(Default)]
#[cfg(feature = "server")]
struct TimeoutState {
timeout: Option<Duration>,
deadline_fut: Option<Pin<Box<dyn Sleep>>>,
is_running: bool,
}

#[cfg(test)]
mod tests {
#[cfg(all(feature = "nightly", not(miri)))]
21 changes: 10 additions & 11 deletions src/proto/h1/dispatch.rs
@@ -1,3 +1,8 @@
use crate::rt::{Read, Write};
use bytes::{Buf, Bytes};
use futures_util::ready;
use http::Request;
use std::time::Duration;
use std::{
error::Error as StdError,
future::Future,
@@ -6,11 +11,6 @@ use std::{
task::{Context, Poll},
};

use crate::rt::{Read, Write};
use bytes::{Buf, Bytes};
use futures_util::ready;
use http::Request;

use super::{Http1Transaction, Wants};
use crate::body::{Body, DecodedLength, Incoming as IncomingBody};
#[cfg(feature = "client")]
@@ -90,13 +90,12 @@ where
#[cfg(feature = "server")]
pub(crate) fn disable_keep_alive(&mut self) {
self.conn.disable_keep_alive();
}

// If keep alive has been disabled and no read or write has been seen on
// the connection yet, we must be in a state where the server is being asked to
// shut down before any data has been seen on the connection
if self.conn.is_write_closed() || self.conn.has_initial_read_write_state() {
self.close();
}
#[cfg(feature = "server")]
pub(crate) fn set_graceful_shutdown_first_byte_read_timeout(&mut self, read_timeout: Duration) {
self.conn
.set_http1_graceful_shutdown_first_byte_read_timeout(read_timeout);
}

pub(crate) fn into_inner(self) -> (I, Bytes, D) {
96 changes: 94 additions & 2 deletions src/server/conn/http1.rs
@@ -123,7 +123,8 @@ where
B: Body + 'static,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
{
/// Start a graceful shutdown process for this connection.
/// Start a graceful shutdown process for this connection, using the default
/// [`GracefulShutdownConfig`].
///
/// This `Connection` should continue to be polled until shutdown
/// can finish.
@@ -133,8 +134,29 @@ where
/// This should only be called while the `Connection` future is still
/// pending. If called after `Connection::poll` has resolved, this does
/// nothing.
pub fn graceful_shutdown(mut self: Pin<&mut Self>) {
pub fn graceful_shutdown(self: Pin<&mut Self>) {
self.graceful_shutdown_with_config(GracefulShutdownConfig::default());
}

/// Start a graceful shutdown process for this connection.
///
/// This `Connection` should continue to be polled until shutdown can finish.
///
/// Requires a [`Timer`] set by [`Builder::timer`].
///
/// # Note
///
/// This should only be called while the `Connection` future is still
/// pending. If called after `Connection::poll` has resolved, this does
/// nothing.
///
/// # Panics
/// If [`GracefulShutdownConfig::first_byte_read_timeout`] was configured to greater than zero
/// nanoseconds, but no timer was set, then the `Connection` will panic when it is next polled.
pub fn graceful_shutdown_with_config(mut self: Pin<&mut Self>, config: GracefulShutdownConfig) {
self.conn.disable_keep_alive();
self.conn
.set_graceful_shutdown_first_byte_read_timeout(config.first_byte_read_timeout);
}

/// Return the inner IO object, and additional information.
@@ -526,3 +548,73 @@ where
}
}
}

/// Configuration for graceful shutdowns.
///
/// # Example
///
/// ```
/// # use hyper::{body::Incoming, Request, Response};
/// # use hyper::service::Service;
/// # use hyper::server::conn::http1::Builder;
/// # use hyper::rt::{Read, Write};
/// # use std::time::Duration;
/// # use hyper::server::conn::http1::GracefulShutdownConfig;
/// # async fn run<I, S>(some_io: I, some_service: S)
/// # where
/// # I: Read + Write + Unpin + Send + 'static,
/// # S: Service<hyper::Request<Incoming>, Response=hyper::Response<Incoming>> + Send + 'static,
/// # S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
/// # S::Future: Send,
/// # {
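/// let http = Builder::new();
/// let conn = http.serve_connection(some_io, some_service);
/// // `graceful_shutdown_with_config` takes `Pin<&mut Self>`, so pin the connection first.
/// let mut conn = std::pin::pin!(conn);
///
/// // A non-zero timeout requires a timer to have been set via `Builder::timer`.
/// let mut config = GracefulShutdownConfig::default();
/// config.first_byte_read_timeout(Duration::from_secs(2));
///
/// conn.as_mut().graceful_shutdown_with_config(config);
/// conn.await.unwrap();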
/// # }
/// # fn main() {}
/// ```
#[derive(Debug)]
pub struct GracefulShutdownConfig {
first_byte_read_timeout: Duration,
}
impl Default for GracefulShutdownConfig {
fn default() -> Self {
GracefulShutdownConfig {
first_byte_read_timeout: Duration::from_secs(0),
}
}
}
impl GracefulShutdownConfig {
/// It is possible for a client to open a connection and begin transmitting bytes, but have the
/// server initiate a graceful shutdown just before it sees any of the client's bytes.
///
/// The more traffic that a server receives, the more likely this race condition is to occur for
/// some of the open connections.
///
/// The `first_byte_read_timeout` controls how long the server waits for the first bytes of a
/// final request to be received from the client.
///
/// If no bytes were received from the client between the time that keep alive was disabled and
/// the `first_byte_read_timeout` duration, the connection is considered inactive and the server
/// will close it.
///
/// # Recommendations
/// Servers are recommended to use a `first_byte_read_timeout` that reduces the likelihood of
/// the client receiving an error due to the connection closing just after they began
/// transmitting their final request.
/// For most internet connections, a roughly one second timeout should be enough time for the
/// server to begin receiving the client's request's bytes.
///
/// # Default
/// A default of 0 seconds was chosen to remain backwards compatible with versions of hyper that
/// did not have this `first_byte_read_timeout` configuration.
pub fn first_byte_read_timeout(&mut self, timeout: Duration) -> &mut Self {
self.first_byte_read_timeout = timeout;
self
}
}