Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Throttle #65

Draft
wants to merge 6 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion http-body-util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,15 @@ categories = ["web-programming"]

[dependencies]
bytes = "1"
futures-util = { version = "0.3", default-features = false, features = ["alloc"] }
http = "0.2"
http-body = { path = "../http-body" }
pin-project-lite = "0.2"
tokio = { version = "1", features = ["time"], optional = true }

[dev-dependencies]
tokio = { version = "1", features = ["macros", "rt"] }
tokio = { version = "1", features = ["macros", "rt", "test-util"] }

[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs"]
9 changes: 9 additions & 0 deletions http-body-util/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
unreachable_pub,
rustdoc::broken_intra_doc_links
)]
#![cfg_attr(docsrs, feature(doc_auto_cfg, doc_cfg))]
#![cfg_attr(test, deny(warnings))]

//! Utilities for [`http_body::Body`].
Expand All @@ -16,11 +17,19 @@ pub mod combinators;
mod empty;
mod full;
mod limited;
mod stream;

#[cfg(feature = "tokio")]
mod throttle;

use self::combinators::{BoxBody, MapData, MapErr, UnsyncBoxBody};
pub use self::empty::Empty;
pub use self::full::Full;
pub use self::limited::{LengthLimitError, Limited};
pub use self::stream::StreamBody;

#[cfg(feature = "tokio")]
pub use self::throttle::Throttle;

/// An extension trait for [`http_body::Body`] adding various combinators and adapters
pub trait BodyExt: http_body::Body {
Expand Down
78 changes: 41 additions & 37 deletions http-body-util/src/limited.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ impl Error for LengthLimitError {}
#[cfg(test)]
mod tests {
use super::*;
use crate::Full;
use crate::{Full, StreamBody};
use bytes::Bytes;
use std::convert::Infallible;

Expand Down Expand Up @@ -150,40 +150,25 @@ mod tests {
assert!(matches!(error.downcast_ref(), Some(LengthLimitError)));
}

struct Chunky(&'static [&'static [u8]]);

impl Body for Chunky {
type Data = &'static [u8];
type Error = Infallible;

fn poll_data(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
let mut this = self;
match this.0.split_first().map(|(&head, tail)| (Ok(head), tail)) {
Some((data, new_tail)) => {
this.0 = new_tail;

Poll::Ready(Some(data))
}
None => Poll::Ready(None),
}
}

fn poll_trailers(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
Poll::Ready(Ok(Some(HeaderMap::new())))
}
fn body_from_iter<I>(into_iter: I) -> impl Body<Data = Bytes, Error = Infallible>
where
I: IntoIterator,
I::Item: Into<Bytes> + 'static,
I::IntoIter: Send + 'static,
{
let iter = into_iter
.into_iter()
.map(Into::into)
.map(Ok::<_, Infallible>);

StreamBody::new(futures_util::stream::iter(iter))
}

#[tokio::test]
async fn read_for_chunked_body_around_limit_returns_first_chunk_but_returns_error_on_over_limit_chunk(
) {
const DATA: &[&[u8]] = &[b"testing ", b"a string that is too long"];
let inner = Chunky(DATA);
const DATA: [&[u8]; 2] = [b"testing ", b"a string that is too long"];
let inner = body_from_iter(DATA);
let body = &mut Limited::new(inner, 8);

let mut hint = SizeHint::new();
Expand All @@ -201,8 +186,8 @@ mod tests {

#[tokio::test]
async fn read_for_chunked_body_over_limit_on_first_chunk_returns_error() {
const DATA: &[&[u8]] = &[b"testing a string", b" that is too long"];
let inner = Chunky(DATA);
const DATA: [&[u8]; 2] = [b"testing a string", b" that is too long"];
let inner = body_from_iter(DATA);
let body = &mut Limited::new(inner, 8);

let mut hint = SizeHint::new();
Expand All @@ -215,8 +200,8 @@ mod tests {

#[tokio::test]
async fn read_for_chunked_body_under_limit_is_okay() {
const DATA: &[&[u8]] = &[b"test", b"ing!"];
let inner = Chunky(DATA);
const DATA: [&[u8]; 2] = [b"test", b"ing!"];
let inner = body_from_iter(DATA);
let body = &mut Limited::new(inner, 8);

let mut hint = SizeHint::new();
Expand All @@ -236,11 +221,30 @@ mod tests {
assert!(matches!(body.data().await, None));
}

struct SomeTrailers;

impl Body for SomeTrailers {
type Data = Bytes;
type Error = Infallible;

fn poll_data(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
Poll::Ready(None)
}

fn poll_trailers(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
Poll::Ready(Ok(Some(HeaderMap::new())))
}
}

#[tokio::test]
async fn read_for_trailers_propagates_inner_trailers() {
const DATA: &[&[u8]] = &[b"test", b"ing!"];
let inner = Chunky(DATA);
let body = &mut Limited::new(inner, 8);
let body = &mut Limited::new(SomeTrailers, 8);
let trailers = body.trailers().await.unwrap();
assert_eq!(trailers, Some(HeaderMap::new()))
}
Expand Down
85 changes: 85 additions & 0 deletions http-body-util/src/stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
use bytes::Buf;
use futures_util::stream::Stream;
use http::HeaderMap;
use http_body::Body;
use pin_project_lite::pin_project;
use std::{
pin::Pin,
task::{Context, Poll},
};

pin_project! {
/// A body created from a `Stream`.
#[derive(Clone, Copy, Debug)]
pub struct StreamBody<S> {
#[pin]
stream: S,
}
}

impl<S> StreamBody<S> {
/// Create a new `StreamBody`.
pub fn new(stream: S) -> Self {
Self { stream }
}
}

impl<S, D, E> Body for StreamBody<S>
where
S: Stream<Item = Result<D, E>>,
D: Buf,
{
type Data = D;
type Error = E;

fn poll_data(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
self.project().stream.poll_next(cx)
}

fn poll_trailers(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
Poll::Ready(Ok(None))
}
}

impl<S: Stream> Stream for StreamBody<S> {
type Item = S::Item;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.project().stream.poll_next(cx)
}

fn size_hint(&self) -> (usize, Option<usize>) {
self.stream.size_hint()
}
}

#[cfg(test)]
mod tests {
use crate::StreamBody;
use bytes::Bytes;
use http_body::Body;
use std::convert::Infallible;

#[tokio::test]
async fn body_from_stream() {
let chunks: Vec<Result<Bytes, Infallible>> = vec![
Ok(Bytes::from(vec![1])),
Ok(Bytes::from(vec![2])),
Ok(Bytes::from(vec![3])),
];
let stream = futures_util::stream::iter(chunks);
let mut body = StreamBody::new(stream);

assert_eq!(body.data().await.unwrap().unwrap().as_ref(), [1]);
assert_eq!(body.data().await.unwrap().unwrap().as_ref(), [2]);
assert_eq!(body.data().await.unwrap().unwrap().as_ref(), [3]);

assert!(body.data().await.is_none());
}
}
Loading