From f56ddd1513cd3b53c16d9d32fa3b2a62d627c78a Mon Sep 17 00:00:00 2001 From: Kestrel Date: Sun, 10 Nov 2024 19:55:48 -0800 Subject: [PATCH 1/2] Rearrange ReadNotifier to remove dependency on async-global-executor. --- Cargo.toml | 1 - src/read_notifier.rs | 22 ++++++++++++++-------- src/server/decode.rs | 31 +++++++++++-------------------- 3 files changed, 25 insertions(+), 29 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 900323d..1007ea9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,7 +20,6 @@ edition = "2018" [dependencies] async-channel = "1.5.1" async-dup = "1.2.2" -async-global-executor = "2.3.1" async-io = "1.13.0" futures-lite = "1.13.0" http-types = { version = "2.9.0", default-features = false } diff --git a/src/read_notifier.rs b/src/read_notifier.rs index 1db1942..f781517 100644 --- a/src/read_notifier.rs +++ b/src/read_notifier.rs @@ -2,8 +2,10 @@ use std::fmt; use std::pin::Pin; use std::task::{Context, Poll}; -use async_channel::Sender; -use futures_lite::io::{self, AsyncBufRead as BufRead, AsyncRead as Read}; +use futures_lite::{ + io::{self, AsyncBufRead as BufRead, AsyncRead as Read}, + Future, FutureExt, +}; /// ReadNotifier forwards [`async_std::io::Read`] and /// [`async_std::io::BufRead`] to an inner reader. When the @@ -14,7 +16,7 @@ use futures_lite::io::{self, AsyncBufRead as BufRead, AsyncRead as Read}; pub(crate) struct ReadNotifier { #[pin] reader: B, - sender: Sender<()>, + before_read: Pin + Send + Sync>>, has_been_read: bool, } @@ -27,10 +29,10 @@ impl fmt::Debug for ReadNotifier { } impl ReadNotifier { - pub(crate) fn new(reader: B, sender: Sender<()>) -> Self { + pub(crate) fn new(reader: B, before_read: Box + Send + Sync>) -> Self { Self { reader, - sender, + before_read: Box::into_pin(before_read), has_been_read: false, } } @@ -55,9 +57,13 @@ impl Read for ReadNotifier { let this = self.project(); if !*this.has_been_read { - if let Ok(()) = this.sender.try_send(()) { - *this.has_been_read = true; - }; + // execute the before_read future before we start polling the reader + match this.before_read.poll(cx) { + Poll::Ready(_) => { + *this.has_been_read = true; + } + Poll::Pending => return Poll::Pending, + } } this.reader.poll_read(cx, buf) diff --git a/src/server/decode.rs b/src/server/decode.rs index a51829c..af45ca6 100644 --- a/src/server/decode.rs +++ b/src/server/decode.rs @@ -96,25 +96,16 @@ where "Unexpected Content-Length header" ); - // Establish a channel to wait for the body to be read. This - // allows us to avoid sending 100-continue in situations that - // respond without reading the body, saving clients from uploading - // their body. - let (body_read_sender, body_read_receiver) = async_channel::bounded(1); - - if Some(CONTINUE_HEADER_VALUE) == req.header(EXPECT).map(|h| h.as_str()) { - async_global_executor::spawn(async move { - // If the client expects a 100-continue header, spawn a - // task to wait for the first read attempt on the body. - if let Ok(()) = body_read_receiver.recv().await { + // Create a future to be polled when the body is to be read. By avoiding + // sending a 100-continue until the handler explicitly reads the body, + // we can save the client from uploading unnecessary data. + let before_read: Box + Send + Sync> = + match req.header(EXPECT).map(|h| h.as_str()) { + Some(CONTINUE_HEADER_VALUE) => Box::new(async move { io.write_all(CONTINUE_RESPONSE).await.ok(); - }; - // Since the sender is moved into the Body, this task will - // finish when the client disconnects, whether or not - // 100-continue was sent. - }) - .detach(); - } + }), + _ => Box::new(async {}), + }; // Check for Transfer-Encoding if transfer_encoding @@ -125,7 +116,7 @@ where let reader = ChunkedDecoder::new(reader, trailer_sender); let reader = Arc::new(Mutex::new(reader)); let reader_clone = reader.clone(); - let reader = ReadNotifier::new(reader, body_read_sender); + let reader = ReadNotifier::new(reader, before_read); let reader = BufReader::new(reader); req.set_body(Body::from_reader(reader, None)); Ok(Some((req, BodyReader::Chunked(reader_clone)))) @@ -133,7 +124,7 @@ where let len = len.len(); let reader = Arc::new(Mutex::new(reader.take(len))); req.set_body(Body::from_reader( - BufReader::new(ReadNotifier::new(reader.clone(), body_read_sender)), + BufReader::new(ReadNotifier::new(reader.clone(), before_read)), Some(len as usize), )); Ok(Some((req, BodyReader::Fixed(reader)))) From c499feb596cdc30ddf640fa7100c26d4b1454163 Mon Sep 17 00:00:00 2001 From: Kestrel Date: Sun, 10 Nov 2024 20:15:29 -0800 Subject: [PATCH 2/2] Move async-channel to [dev-dependencies]. --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 1007ea9..565c3c3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,7 +18,6 @@ readme = "README.md" edition = "2018" [dependencies] -async-channel = "1.5.1" async-dup = "1.2.2" async-io = "1.13.0" futures-lite = "1.13.0" @@ -28,5 +27,6 @@ log = "0.4.11" pin-project = "1.0.2" [dev-dependencies] +async-channel = "1.5.1" async-std = { version = "1.7.0", features = ["attributes"] } pretty_assertions = "0.6.1"