Skip to content

Commit

Permalink
chore: Use a fork of hyper with Body::poll_progress
Browse files Browse the repository at this point in the history
hyperium/http-body#90 proposes adding a `Body::poll_progress` method to the
`Body` trait. This PR uses a fork of hyper that uses this proposed API when
awaiting stream send capacity. This supports implementing timeouts on streams
and connections in an unhealthy state to defend servers against resource
exhaustion.
  • Loading branch information
olix0r committed Apr 12, 2024
1 parent 31d7464 commit 1df8262
Show file tree
Hide file tree
Showing 16 changed files with 99 additions and 5 deletions.
6 changes: 6 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
[submodule "http-body"]
path = http-body
url = [email protected]:olix0r/http-body.git
[submodule "hyper"]
path = hyper
url = [email protected]:olix0r/hyper.git
4 changes: 0 additions & 4 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -830,8 +830,6 @@ dependencies = [
[[package]]
name = "http-body"
version = "0.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2"
dependencies = [
"bytes",
"http",
Expand Down Expand Up @@ -859,8 +857,6 @@ checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
[[package]]
name = "hyper"
version = "0.14.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bf96e135eb83a2a8ddf766e426a841d8ddd7449d5f00d34ea02b41d2f19eef80"
dependencies = [
"bytes",
"futures-channel",
Expand Down
9 changes: 9 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,15 @@ members = [
"tools",
]

exclude = [
"http-body",
"hyper",
]

[profile.release]
debug = 1
lto = true

[patch.crates-io]
http-body = { path = "http-body" }
hyper = { path = "hyper" }
1 change: 1 addition & 0 deletions http-body
Submodule http-body added at 30db49
1 change: 1 addition & 0 deletions hyper
Submodule hyper added at 542b48
5 changes: 5 additions & 0 deletions hyper-balance/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@ where
Poll::Ready(ret)
}

#[inline]
fn poll_progress(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().body.poll_progress(cx)
}

fn poll_trailers(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
Expand Down
7 changes: 7 additions & 0 deletions linkerd/app/core/src/errors/respond.rs
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,13 @@ where
}
}

fn poll_progress(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match self.project() {
ResponseBodyProj::Passthru(inner) => inner.poll_progress(cx),
ResponseBodyProj::GrpcRescue { inner, .. } => inner.poll_progress(cx),
}
}

#[inline]
fn poll_trailers(
self: Pin<&mut Self>,
Expand Down
15 changes: 14 additions & 1 deletion linkerd/http/box/src/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,14 @@ impl Body for BoxBody {
self.as_mut().inner.as_mut().poll_data(cx)
}

#[inline]
fn poll_progress(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
self.as_mut().inner.as_mut().poll_progress(cx)
}

#[inline]
fn poll_trailers(
mut self: Pin<&mut Self>,
Expand Down Expand Up @@ -116,12 +124,17 @@ where
}))
}

#[inline]
fn poll_progress(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().0.poll_progress(cx).map_err(Into::into)
}

#[inline]
fn poll_trailers(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Option<HeaderMap<HeaderValue>>, Self::Error>> {
Poll::Ready(futures::ready!(self.project().0.poll_trailers(cx)).map_err(Into::into))
self.project().0.poll_trailers(cx).map_err(Into::into)
}

#[inline]
Expand Down
11 changes: 11 additions & 0 deletions linkerd/http/metrics/src/requests/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,12 @@ where
Poll::Ready(frame)
}

#[inline]
fn poll_progress(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().inner.poll_progress(cx)
}

#[inline]
fn poll_trailers(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
Expand Down Expand Up @@ -422,6 +428,11 @@ where
Poll::Ready(frame)
}

#[inline]
fn poll_progress(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().inner.poll_progress(cx).map_err(Into::into)
}

fn poll_trailers(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
Expand Down
15 changes: 15 additions & 0 deletions linkerd/http/retry/src/replay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,21 @@ where
Poll::Ready(Some(Ok(Data::Initial(chunk))))
}

fn poll_progress(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let this = self.get_mut();
let state = Self::acquire_state(&mut this.state, &this.shared.body);
tracing::trace!("ReplayBody::poll_progress");

if let Some(rest) = state.rest.as_mut() {
// If the inner body has previously ended, don't poll it again.
if !rest.is_end_stream() {
return Pin::new(rest).poll_progress(cx).map_err(Into::into);
}
}

Poll::Ready(Ok(()))
}

fn poll_trailers(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
Expand Down
5 changes: 5 additions & 0 deletions linkerd/http/retry/src/with_trailers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,11 @@ where
Pin::new(&mut this.inner).poll_data(cx)
}

fn poll_progress(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let this = self.get_mut();
Pin::new(&mut this.inner).poll_progress(cx)
}

fn poll_trailers(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
Expand Down
5 changes: 5 additions & 0 deletions linkerd/proxy/http/src/classify/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,11 @@ where
}
}

#[inline]
fn poll_progress(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().inner.poll_progress(cx)
}

fn poll_trailers(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
Expand Down
1 change: 1 addition & 0 deletions linkerd/proxy/http/src/glue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ pub struct HyperConnectFuture<F> {
}
// === impl UpgradeBody ===

// Note: There's no poll_progress implementation in hyper::Body.
impl HttpBody for UpgradeBody {
type Data = Bytes;
type Error = hyper::Error;
Expand Down
9 changes: 9 additions & 0 deletions linkerd/proxy/http/src/orig_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ impl HttpBody for UpgradeResponseBody {
self.inner.is_end_stream()
}

#[inline]
fn poll_data(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
Expand All @@ -212,6 +213,14 @@ impl HttpBody for UpgradeResponseBody {
.map_err(downgrade_h2_error)
}

#[inline]
fn poll_progress(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Pin::new(self.project().inner)
.poll_progress(cx)
.map_err(downgrade_h2_error)
}

#[inline]
fn poll_trailers(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
Expand Down
5 changes: 5 additions & 0 deletions linkerd/proxy/http/src/retain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@ impl<T, B: http_body::Body> http_body::Body for RetainBody<T, B> {
self.project().inner.poll_data(cx)
}

#[inline]
fn poll_progress(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().inner.poll_progress(cx)
}

#[inline]
fn poll_trailers(
self: Pin<&mut Self>,
Expand Down
5 changes: 5 additions & 0 deletions linkerd/proxy/tap/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,11 @@ where
}
}

#[inline]
fn poll_progress(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().inner.poll_progress(cx)
}

fn poll_trailers(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
Expand Down

0 comments on commit 1df8262

Please sign in to comment.