Skip to content

Commit

Permalink
Add simulated Read/Write fragmentation test
Browse files Browse the repository at this point in the history
  • Loading branch information
tirr-c committed Apr 10, 2020
1 parent 5b9e6ca commit 459ce85
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 3 deletions.
49 changes: 46 additions & 3 deletions tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use async_std::path::PathBuf;
use async_std::sync::Arc;
use async_std::task::{Context, Poll};
use std::pin::Pin;
use std::sync::Mutex;
use std::sync::{atomic::AtomicBool, Mutex};

#[derive(Debug, Copy, Clone)]
#[allow(dead_code)]
Expand All @@ -19,6 +19,12 @@ pub struct TestCase {
source_fixture: Arc<File>,
expected_fixture: Arc<Mutex<File>>,
result: Arc<Mutex<File>>,
throttle: Arc<Throttle>,
}

enum Throttle {
NoThrottle,
YieldPending(AtomicBool, AtomicBool),
}

impl TestCase {
Expand Down Expand Up @@ -68,9 +74,15 @@ impl TestCase {
source_fixture: Arc::new(source_fixture),
expected_fixture: Arc::new(Mutex::new(expected_fixture)),
result,
throttle: Arc::new(Throttle::NoThrottle),
}
}

#[allow(dead_code)]
pub fn throttle(&mut self) {
self.throttle = Arc::new(Throttle::YieldPending(AtomicBool::new(false), AtomicBool::new(false)));
}

pub async fn read_result(&self) -> String {
use async_std::prelude::*;
let mut result = String::new();
Expand Down Expand Up @@ -128,13 +140,44 @@ impl Read for TestCase {
cx: &mut Context,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
Pin::new(&mut &*self.source_fixture).poll_read(cx, buf)
match &*self.throttle {
Throttle::NoThrottle => {
Pin::new(&mut &*self.source_fixture).poll_read(cx, buf)
},
Throttle::YieldPending(read_flag, _) => {
if read_flag.fetch_xor(true, std::sync::atomic::Ordering::SeqCst) {
cx.waker().wake_by_ref();
Poll::Pending
} else {
// read partial
let throttle_len = std::cmp::min(buf.len(), 10);
let buf = &mut buf[..throttle_len];
let ret = Pin::new(&mut &*self.source_fixture).poll_read(cx, buf);
ret
}
},
}
}
}

impl Write for TestCase {
fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
Pin::new(&mut &*self.result.lock().unwrap()).poll_write(cx, buf)
match &*self.throttle {
Throttle::NoThrottle => {
Pin::new(&mut &*self.result.lock().unwrap()).poll_write(cx, buf)
},
Throttle::YieldPending(_, write_flag) => {
if write_flag.fetch_xor(true, std::sync::atomic::Ordering::SeqCst) {
cx.waker().wake_by_ref();
Poll::Pending
} else {
// write partial
let throttle_len = std::cmp::min(buf.len(), 10);
let buf = &buf[..throttle_len];
Pin::new(&mut &*self.result.lock().unwrap()).poll_write(cx, buf)
}
},
}
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
Expand Down
17 changes: 17 additions & 0 deletions tests/fixtures/response-chunked-echo-throttled.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
HTTP/1.1 200 OK
transfer-encoding: chunked
date: {DATE}
content-type: text/plain

1
M
6
ozilla
9
Developer
5
Netwo
2
rk
0

27 changes: 27 additions & 0 deletions tests/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,33 @@ async fn test_chunked_echo() {
case.assert().await;
}

#[async_std::test]
async fn test_chunked_echo_throttled() {
let mut case = TestCase::new_server(
"fixtures/request-chunked-echo.txt",
"fixtures/response-chunked-echo-throttled.txt",
)
.await;
case.throttle();
let addr = "http://example.com";

async_h1::accept(addr, case.clone(), |req| async {
let mut resp = Response::new(StatusCode::Ok);
let ct = req.content_type();
let body: Body = req.into();
resp.set_body(body);
if let Some(ct) = ct {
resp.set_content_type(ct);
}

Ok(resp)
})
.await
.unwrap();

case.assert().await;
}

#[async_std::test]
async fn test_unexpected_eof() {
// We can't predict unexpected EOF, so the response content-length is still 11
Expand Down

0 comments on commit 459ce85

Please sign in to comment.