Skip to content

Commit

Permalink
feat(server): add h1 idle_timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
finnbear committed Aug 26, 2024
1 parent 492ab24 commit 4362e38
Show file tree
Hide file tree
Showing 5 changed files with 224 additions and 37 deletions.
10 changes: 10 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ pub(super) enum Kind {
/// User took too long to send headers
#[cfg(all(feature = "http1", feature = "server"))]
HeaderTimeout,
/// User took too long to send another request
#[cfg(all(feature = "http1", feature = "server"))]
IdleTimeout,
/// Error while reading a body from connection.
#[cfg(all(
any(feature = "client", feature = "server"),
Expand Down Expand Up @@ -360,6 +363,11 @@ impl Error {
Error::new(Kind::HeaderTimeout)
}

#[cfg(all(feature = "http1", feature = "server"))]
pub(super) fn new_idle_timeout() -> Error {
Error::new(Kind::IdleTimeout)
}

#[cfg(feature = "http1")]
#[cfg(feature = "server")]
pub(super) fn new_user_unsupported_status_code() -> Error {
Expand Down Expand Up @@ -458,6 +466,8 @@ impl Error {
Kind::Canceled => "operation was canceled",
#[cfg(all(feature = "http1", feature = "server"))]
Kind::HeaderTimeout => "read header from client timeout",
#[cfg(all(feature = "http1", feature = "server"))]
Kind::IdleTimeout => "idle client timeout",
#[cfg(all(
any(feature = "client", feature = "server"),
any(feature = "http1", feature = "http2")
Expand Down
117 changes: 86 additions & 31 deletions src/proto/h1/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,14 @@ where
#[cfg(feature = "server")]
h1_header_read_timeout: None,
#[cfg(feature = "server")]
h1_header_read_timeout_fut: None,
#[cfg(feature = "server")]
h1_header_read_timeout_running: false,
#[cfg(feature = "server")]
h1_idle_timeout: None,
#[cfg(feature = "server")]
h1_idle_timeout_running: false,
#[cfg(feature = "server")]
h1_timeout_fut: None,
#[cfg(feature = "server")]
date_header: true,
#[cfg(feature = "server")]
timer: Time::Empty,
Expand Down Expand Up @@ -147,6 +151,11 @@ where
self.state.h1_header_read_timeout = Some(val);
}

#[cfg(feature = "server")]
pub(crate) fn set_http1_idle_timeout(&mut self, val: Duration) {
self.state.h1_idle_timeout = Some(val);
}

#[cfg(feature = "server")]
pub(crate) fn set_allow_half_close(&mut self) {
self.state.allow_half_close = true;
Expand Down Expand Up @@ -217,24 +226,10 @@ where
trace!("Conn::read_head");

#[cfg(feature = "server")]
if !self.state.h1_header_read_timeout_running {
if let Some(h1_header_read_timeout) = self.state.h1_header_read_timeout {
let deadline = Instant::now() + h1_header_read_timeout;
self.state.h1_header_read_timeout_running = true;
match self.state.h1_header_read_timeout_fut {
Some(ref mut h1_header_read_timeout_fut) => {
trace!("resetting h1 header read timeout timer");
self.state.timer.reset(h1_header_read_timeout_fut, deadline);
}
None => {
trace!("setting h1 header read timeout timer");
self.state.h1_header_read_timeout_fut =
Some(self.state.timer.sleep_until(deadline));
}
}
}
}
let first_head = !self.io.is_read_blocked();

#[cfg_attr(not(feature = "server"), allow(unused))]
let mut progress = false;
let msg = match self.io.parse::<T>(
cx,
ParseContext {
Expand All @@ -249,20 +244,69 @@ where
#[cfg(feature = "ffi")]
on_informational: &mut self.state.on_informational,
},
&mut progress,
) {
Poll::Ready(Ok(msg)) => msg,
Poll::Ready(Err(e)) => return self.on_read_head_error(e),
Poll::Pending => {
// - Use the read timeout on the first head to avoid common DoS.
// - If made progress in reading header, must no longer be idle.
#[cfg(feature = "server")]
if self.state.h1_header_read_timeout_running {
if let Some(ref mut h1_header_read_timeout_fut) =
self.state.h1_header_read_timeout_fut
{
if Pin::new(h1_header_read_timeout_fut).poll(cx).is_ready() {
self.state.h1_header_read_timeout_running = false;

warn!("read header from client timeout");
return Poll::Ready(Some(Err(crate::Error::new_header_timeout())));
if first_head || progress {
if !self.state.h1_header_read_timeout_running {
if let Some(h1_header_read_timeout) = self.state.h1_header_read_timeout {
let deadline = Instant::now() + h1_header_read_timeout;
self.state.h1_idle_timeout_running = false;
self.state.h1_header_read_timeout_running = true;
match self.state.h1_timeout_fut {
Some(ref mut ht_timeout_fut) => {
trace!("resetting h1 timeout timer for header read");
self.state.timer.reset(ht_timeout_fut, deadline);
}
None => {
trace!("setting h1 timeout timer for header read");
self.state.h1_timeout_fut =
Some(self.state.timer.sleep_until(deadline));
}
}
} else if std::mem::take(&mut self.state.h1_idle_timeout_running) {
trace!("unsetting h1 timeout timer for idle");
self.state.h1_timeout_fut = None;
}
}
} else if !self.state.h1_header_read_timeout_running
&& !self.state.h1_idle_timeout_running
{
if let Some(h1_idle_timeout) = self.state.h1_idle_timeout {
let deadline = Instant::now() + h1_idle_timeout;
self.state.h1_idle_timeout_running = true;
match self.state.h1_timeout_fut {
Some(ref mut h1_timeout_fut) => {
trace!("resetting h1 timeout timer for idle");
self.state.timer.reset(h1_timeout_fut, deadline);
}
None => {
trace!("setting h1 timeout timer for idle");
self.state.h1_timeout_fut =
Some(self.state.timer.sleep_until(deadline));
}
}
}
}

#[cfg(feature = "server")]
if self.state.h1_header_read_timeout_running || self.state.h1_idle_timeout_running {
if let Some(ref mut h1_timeout_fut) = self.state.h1_timeout_fut {
if Pin::new(h1_timeout_fut).poll(cx).is_ready() {
return Poll::Ready(Some(Err(
if self.state.h1_header_read_timeout_running {
warn!("read header from client timeout");
crate::Error::new_header_timeout()
} else {
warn!("idle client timeout");
crate::Error::new_idle_timeout()
},
)));
}
}
}
Expand All @@ -274,7 +318,8 @@ where
#[cfg(feature = "server")]
{
self.state.h1_header_read_timeout_running = false;
self.state.h1_header_read_timeout_fut = None;
self.state.h1_idle_timeout_running = false;
self.state.h1_timeout_fut = None;
}

// Note: don't deconstruct `msg` into local variables, it appears
Expand Down Expand Up @@ -919,10 +964,14 @@ struct State {
#[cfg(feature = "server")]
h1_header_read_timeout: Option<Duration>,
#[cfg(feature = "server")]
h1_header_read_timeout_fut: Option<Pin<Box<dyn Sleep>>>,
#[cfg(feature = "server")]
h1_header_read_timeout_running: bool,
#[cfg(feature = "server")]
h1_idle_timeout: Option<Duration>,
#[cfg(feature = "server")]
h1_idle_timeout_running: bool,
#[cfg(feature = "server")]
h1_timeout_fut: Option<Pin<Box<dyn Sleep>>>,
#[cfg(feature = "server")]
date_header: bool,
#[cfg(feature = "server")]
timer: Time,
Expand Down Expand Up @@ -1106,6 +1155,12 @@ impl State {
self.reading = Reading::Init;
self.writing = Writing::Init;

#[cfg(feature = "server")]
if self.h1_idle_timeout.is_some() {
// Next read will start and poll the idle timeout.
self.notify_read = true;
}

// !T::should_read_first() means Client.
//
// If Client connection has just gone idle, the Dispatcher
Expand Down
4 changes: 3 additions & 1 deletion src/proto/h1/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ where
&mut self,
cx: &mut Context<'_>,
parse_ctx: ParseContext<'_>,
progress: &mut bool,
) -> Poll<crate::Result<ParsedMessage<S::Incoming>>>
where
S: Http1Transaction,
Expand Down Expand Up @@ -205,6 +206,7 @@ where
trace!("parse eof");
return Poll::Ready(Err(crate::Error::new_incomplete()));
}
*progress = true;
}
}

Expand Down Expand Up @@ -702,7 +704,7 @@ mod tests {
on_informational: &mut None,
};
assert!(buffered
.parse::<ClientTransaction>(cx, parse_ctx)
.parse::<ClientTransaction>(cx, parse_ctx, &mut false)
.is_pending());
Poll::Ready(())
})
Expand Down
20 changes: 20 additions & 0 deletions src/server/conn/http1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ pub struct Builder {
h1_preserve_header_case: bool,
h1_max_headers: Option<usize>,
h1_header_read_timeout: Dur,
h1_idle_timeout: Dur,
h1_writev: Option<bool>,
max_buf_size: Option<usize>,
pipeline_flush: bool,
Expand Down Expand Up @@ -238,6 +239,7 @@ impl Builder {
h1_preserve_header_case: false,
h1_max_headers: None,
h1_header_read_timeout: Dur::Default(Some(Duration::from_secs(30))),
h1_idle_timeout: Dur::Default(Some(Duration::from_secs(120))),
h1_writev: None,
max_buf_size: None,
pipeline_flush: false,
Expand Down Expand Up @@ -322,6 +324,21 @@ impl Builder {
self
}

/// Set a timeout for idle time between requests. If a client does not
/// transmit another request within this time after receiving the last
/// response, the connection is closed.
///
/// Requires a [`Timer`] set by [`Builder::timer`] to take effect. Panics if `idle_timeout` is configured
/// without a [`Timer`].
///
/// Pass `None` to disable.
///
/// Default is 120 seconds.
pub fn idle_timeout(&mut self, idle_timeout: impl Into<Option<Duration>>) -> &mut Self {
self.h1_idle_timeout = Dur::Configured(idle_timeout.into());
self
}

/// Set whether HTTP/1 connections should try to use vectored writes,
/// or always flatten into a single buffer.
///
Expand Down Expand Up @@ -448,6 +465,9 @@ impl Builder {
{
conn.set_http1_header_read_timeout(dur);
};
if let Some(dur) = self.timer.check(self.h1_idle_timeout, "idle_timeout") {
conn.set_http1_idle_timeout(dur);
};
if let Some(writev) = self.h1_writev {
if writev {
conn.set_write_strategy_queue();
Expand Down
Loading

0 comments on commit 4362e38

Please sign in to comment.