From 71541a26ac4078468a2f0fe854839c905037c610 Mon Sep 17 00:00:00 2001 From: Arnau Orriols Date: Tue, 18 Apr 2023 08:26:15 +0200 Subject: [PATCH 1/4] fix(client): run `connect_to` to completion even if `ResponseFuture` is canceled In the previous implementation the `connect_to` future was spawned to let it finish when the checkout future won the race. However, in the event of the user dropping the `ResponseFuture`, the `connect_to` future was canceled as well, and all the http2 requests waiting for the connection would error as a result. This commit prevents that by spawning the `connect_to` future invariably (once it starts) thus covering both cases. Fixes #3199 --- src/client/client.rs | 211 ++++++++++++++++++++----------------------- src/common/lazy.rs | 76 ---------------- src/common/mod.rs | 4 - tests/client.rs | 63 +++++++++++++ 4 files changed, 160 insertions(+), 194 deletions(-) delete mode 100644 src/common/lazy.rs diff --git a/src/client/client.rs b/src/client/client.rs index 9baead7349..2b3a539480 100644 --- a/src/client/client.rs +++ b/src/client/client.rs @@ -12,10 +12,7 @@ use tracing::{debug, trace, warn}; use crate::body::{Body, HttpBody}; use crate::client::connect::CaptureConnectionExtension; -use crate::common::{ - exec::BoxSendFuture, lazy as hyper_lazy, sync_wrapper::SyncWrapper, task, Future, Lazy, Pin, - Poll, -}; +use crate::common::{exec::BoxSendFuture, sync_wrapper::SyncWrapper, task, Future, Pin, Poll}; use crate::rt::Executor; use super::conn; @@ -366,34 +363,10 @@ where // The order of the `select` is depended on below... match future::select(checkout, connect).await { - // Checkout won, connect future may have been started or not. - // - // If it has, let it finish and insert back into the pool, - // so as to not waste the socket... - Either::Left((Ok(checked_out), connecting)) => { - // This depends on the `select` above having the correct - // order, such that if the checkout future were ready - // immediately, the connect future will never have been - // started. - // - // If it *wasn't* ready yet, then the connect future will - // have been started... - if connecting.started() { - let bg = connecting - .map_err(|err| { - trace!("background connect error: {}", err); - }) - .map(|_pooled| { - // dropping here should just place it in - // the Pool for us... - }); - // An execute error here isn't important, we're just trying - // to prevent a waste of a socket... - #[cfg_attr(feature = "deprecated", allow(deprecated))] - self.conn_builder.exec.execute(bg); - } - Ok(checked_out) - } + // Checkout won. If connect has started it will finish in the + // background and insert back into the pool, so as to not waste the + // socket... + Either::Left((Ok(checked_out), _connecting)) => Ok(checked_out), // Connect won, checkout can just be dropped. Either::Right((Ok(connected), _checkout)) => Ok(connected), // Either checkout or connect could get canceled: @@ -433,7 +406,7 @@ where fn connect_to( &self, pool_key: PoolKey, - ) -> impl Lazy>>> + Unpin { + ) -> impl Future>>> + Unpin + '_ { #[cfg_attr(feature = "deprecated", allow(deprecated))] let executor = self.conn_builder.exec.clone(); let pool = self.pool.clone(); @@ -445,91 +418,101 @@ where let is_ver_h2 = ver == Ver::Http2; let connector = self.connector.clone(); let dst = domain_as_uri(pool_key.clone()); - hyper_lazy(move || { - // Try to take a "connecting lock". - // - // If the pool_key is for HTTP/2, and there is already a - // connection being established, then this can't take a - // second lock. The "connect_to" future is Canceled. - let connecting = match pool.connecting(&pool_key, ver) { - Some(lock) => lock, - None => { - let canceled = - crate::Error::new_canceled().with("HTTP/2 connection in progress"); - return Either::Right(future::err(canceled)); - } - }; - Either::Left( - connector - .connect(connect::sealed::Internal, dst) - .map_err(crate::Error::new_connect) - .and_then(move |io| { - let connected = io.connected(); - // If ALPN is h2 and we aren't http2_only already, - // then we need to convert our pool checkout into - // a single HTTP2 one. - let connecting = if connected.alpn == Alpn::H2 && !is_ver_h2 { - match connecting.alpn_h2(&pool) { - Some(lock) => { - trace!("ALPN negotiated h2, updating pool"); - lock - } - None => { - // Another connection has already upgraded, - // the pool checkout should finish up for us. - let canceled = crate::Error::new_canceled() - .with("ALPN upgraded to HTTP/2"); - return Either::Right(future::err(canceled)); - } + // Try to take a "connecting lock". + // + // If the pool_key is for HTTP/2, and there is already a + // connection being established, then this can't take a + // second lock. The "connect_to" future is Canceled. + let connecting = match pool.connecting(&pool_key, ver) { + Some(lock) => lock, + None => { + let canceled = crate::Error::new_canceled().with("HTTP/2 connection in progress"); + return Either::Right(future::err(canceled)); + } + }; + Either::Left(Box::pin(async move { + // This future is executed in the background and its result is + // awaited in a oneshot channel + let connect_fut = connector + .connect(connect::sealed::Internal, dst) + .map_err(crate::Error::new_connect) + .and_then(move |io| { + let connected = io.connected(); + // If ALPN is h2 and we aren't http2_only already, + // then we need to convert our pool checkout into + // a single HTTP2 one. + let connecting = if connected.alpn == Alpn::H2 && !is_ver_h2 { + match connecting.alpn_h2(&pool) { + Some(lock) => { + trace!("ALPN negotiated h2, updating pool"); + lock + } + None => { + // Another connection has already upgraded, + // the pool checkout should finish up for us. + let canceled = + crate::Error::new_canceled().with("ALPN upgraded to HTTP/2"); + return Either::Right(future::err(canceled)); } - } else { - connecting - }; - - #[cfg_attr(not(feature = "http2"), allow(unused))] - let is_h2 = is_ver_h2 || connected.alpn == Alpn::H2; - #[cfg(feature = "http2")] - { - conn_builder.http2_only(is_h2); } + } else { + connecting + }; + + #[cfg_attr(not(feature = "http2"), allow(unused))] + let is_h2 = is_ver_h2 || connected.alpn == Alpn::H2; + #[cfg(feature = "http2")] + { + conn_builder.http2_only(is_h2); + } - Either::Left(Box::pin(async move { - let (tx, conn) = conn_builder.handshake(io).await?; - - trace!("handshake complete, spawning background dispatcher task"); - executor.execute( - conn.map_err(|e| debug!("client connection error: {}", e)) - .map(|_| ()), - ); - - // Wait for 'conn' to ready up before we - // declare this tx as usable - let tx = tx.when_ready().await?; - - let tx = { - #[cfg(feature = "http2")] - { - if is_h2 { - PoolTx::Http2(tx.into_http2()) - } else { - PoolTx::Http1(tx) - } + Either::Left(async move { + let (tx, conn) = conn_builder.handshake(io).await?; + + trace!("handshake complete, spawning background dispatcher task"); + executor.execute( + conn.map_err(|e| debug!("client connection error: {}", e)) + .map(|_| ()), + ); + + // Wait for 'conn' to ready up before we + // declare this tx as usable + let tx = tx.when_ready().await?; + + let tx = { + #[cfg(feature = "http2")] + { + if is_h2 { + PoolTx::Http2(tx.into_http2()) + } else { + PoolTx::Http1(tx) } - #[cfg(not(feature = "http2"))] - PoolTx::Http1(tx) - }; - - Ok(pool.pooled( - connecting, - PoolClient { - conn_info: connected, - tx, - }, - )) - })) - }), - ) - }) + } + #[cfg(not(feature = "http2"))] + PoolTx::Http1(tx) + }; + + Ok(pool.pooled( + connecting, + PoolClient { + conn_info: connected, + tx, + }, + )) + }) + }); + let (tx, rx) = oneshot::channel(); + self.conn_builder.exec.execute(async move { + let result = connect_fut.await; + if let Err(Err(err)) = tx.send(result) { + // The receiver has been dropped (checkout won or the user + // dropped the `ResponseFuture`). We have nowhere to send + // the error + trace!("background connect error: {}", err); + } + }); + rx.await.expect("connect_tx should never drop") + })) } } diff --git a/src/common/lazy.rs b/src/common/lazy.rs deleted file mode 100644 index 2722077303..0000000000 --- a/src/common/lazy.rs +++ /dev/null @@ -1,76 +0,0 @@ -use pin_project_lite::pin_project; - -use super::{task, Future, Pin, Poll}; - -pub(crate) trait Started: Future { - fn started(&self) -> bool; -} - -pub(crate) fn lazy(func: F) -> Lazy -where - F: FnOnce() -> R, - R: Future + Unpin, -{ - Lazy { - inner: Inner::Init { func }, - } -} - -// FIXME: allow() required due to `impl Trait` leaking types to this lint -pin_project! { - #[allow(missing_debug_implementations)] - pub(crate) struct Lazy { - #[pin] - inner: Inner, - } -} - -pin_project! { - #[project = InnerProj] - #[project_replace = InnerProjReplace] - enum Inner { - Init { func: F }, - Fut { #[pin] fut: R }, - Empty, - } -} - -impl Started for Lazy -where - F: FnOnce() -> R, - R: Future, -{ - fn started(&self) -> bool { - match self.inner { - Inner::Init { .. } => false, - Inner::Fut { .. } | Inner::Empty => true, - } - } -} - -impl Future for Lazy -where - F: FnOnce() -> R, - R: Future, -{ - type Output = R::Output; - - fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { - let mut this = self.project(); - - if let InnerProj::Fut { fut } = this.inner.as_mut().project() { - return fut.poll(cx); - } - - match this.inner.as_mut().project_replace(Inner::Empty) { - InnerProjReplace::Init { func } => { - this.inner.set(Inner::Fut { fut: func() }); - if let InnerProj::Fut { fut } = this.inner.project() { - return fut.poll(cx); - } - unreachable!() - } - _ => unreachable!("lazy state wrong"), - } - } -} diff --git a/src/common/mod.rs b/src/common/mod.rs index e38c6f5c7a..ae92582bc0 100644 --- a/src/common/mod.rs +++ b/src/common/mod.rs @@ -15,8 +15,6 @@ pub(crate) mod drain; #[cfg(any(feature = "http1", feature = "http2", feature = "server"))] pub(crate) mod exec; pub(crate) mod io; -#[cfg(all(feature = "client", any(feature = "http1", feature = "http2")))] -mod lazy; mod never; #[cfg(any( feature = "stream", @@ -26,8 +24,6 @@ pub(crate) mod sync_wrapper; pub(crate) mod task; pub(crate) mod watch; -#[cfg(all(feature = "client", any(feature = "http1", feature = "http2")))] -pub(crate) use self::lazy::{lazy, Started as Lazy}; #[cfg(any(feature = "http1", feature = "http2", feature = "runtime"))] pub(crate) use self::never::Never; pub(crate) use self::task::Poll; diff --git a/tests/client.rs b/tests/client.rs index 2953313798..610dfc610e 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -1358,6 +1358,69 @@ mod dispatch_impl { future::select(t, close).await; } + #[tokio::test] + async fn http2_connection_waiters_are_not_canceled_when_outstanding_connecting_task_is_canceled() { + let _ = pretty_env_logger::try_init(); + + let server = TcpListener::bind("127.0.0.1:0").unwrap(); + let addr = server.local_addr().unwrap(); + + #[derive(Clone)] + struct SlowConnector; + + impl hyper::service::Service for SlowConnector { + type Response = TcpStream; + type Error = hyper::Error; + type Future = future::Pending>; + + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, _dst: Uri) -> Self::Future { + // A connector that takes a long time to resolve + future::pending() + } + } + + let client = Client::builder().http2_only(true).build(SlowConnector); + + // This first request starts the connecting task + let req = Request::builder() + .uri(&*format!("http://{}/a", addr)) + .body(Body::empty()) + .unwrap(); + let mut res1 = client.request(req).map(|_| unreachable!()); + + // This second request waits for the connecting task to finish + // instead of starting a new connecting task + let req = Request::builder() + .uri(&*format!("http://{}/a", addr)) + .body(Body::empty()) + .unwrap(); + let mut res2 = client + .request(req) + .map(|r| unreachable!("res2 should had never resolved, but resulted in {:?}", r)); + + // Prime the requests + assert!( + future::poll_fn(|ctx| Poll::Ready(Pin::new(&mut res1).poll(ctx).is_pending())).await + ); + assert!( + future::poll_fn(|ctx| Poll::Ready(Pin::new(&mut res2).poll(ctx).is_pending())).await + ); + + // The `ResponseFuture` that drives the connecting task is dropped, but + // the connecting task should finish in the background + drop(res1); + + assert!( + // The second response has not been canceled and it is still waiting for the connection + // that will never arrive + future::poll_fn(|ctx| Poll::Ready(Pin::new(&mut res2).poll(ctx).is_pending())).await + ); + } + #[tokio::test] async fn drop_response_body_closes_in_progress_connection() { let _ = pretty_env_logger::try_init(); From ea3fdb7ae3805d9cf9bb688cae7b516c5f459ab1 Mon Sep 17 00:00:00 2001 From: Arnau Orriols Date: Tue, 18 Apr 2023 16:37:53 +0200 Subject: [PATCH 2/4] Fix fmt --- tests/client.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/client.rs b/tests/client.rs index 610dfc610e..099693280d 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -1359,7 +1359,8 @@ mod dispatch_impl { } #[tokio::test] - async fn http2_connection_waiters_are_not_canceled_when_outstanding_connecting_task_is_canceled() { + async fn http2_connection_waiters_are_not_canceled_when_outstanding_connecting_task_is_canceled( + ) { let _ = pretty_env_logger::try_init(); let server = TcpListener::bind("127.0.0.1:0").unwrap(); @@ -1385,7 +1386,7 @@ mod dispatch_impl { let client = Client::builder().http2_only(true).build(SlowConnector); - // This first request starts the connecting task + // This first request starts the connecting task let req = Request::builder() .uri(&*format!("http://{}/a", addr)) .body(Body::empty()) From 241a0ff8242a60229ed9e6b6ffa390f0f883abc9 Mon Sep 17 00:00:00 2001 From: Arnau Orriols Date: Wed, 19 Apr 2023 00:57:27 +0200 Subject: [PATCH 3/4] Fix deprecated use of `conn_builder.exec.execute` --- src/client/client.rs | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/src/client/client.rs b/src/client/client.rs index fb83ed0ebf..60588a94ad 100644 --- a/src/client/client.rs +++ b/src/client/client.rs @@ -248,7 +248,8 @@ where if req.version() == Version::HTTP_2 { warn!("Connection is HTTP/1, but request requires HTTP/2"); return Err(ClientError::Normal( - crate::Error::new_user_unsupported_version().with_client_connect_info(pooled.conn_info.clone()), + crate::Error::new_user_unsupported_version() + .with_client_connect_info(pooled.conn_info.clone()), )); } @@ -502,6 +503,7 @@ where }) }); let (tx, rx) = oneshot::channel(); + #[cfg_attr(feature = "deprecated", allow(deprecated))] self.conn_builder.exec.execute(async move { let result = connect_fut.await; if let Err(Err(err)) = tx.send(result) { @@ -581,7 +583,7 @@ impl ResponseFuture { F: Future>> + Send + 'static, { Self { - inner: SyncWrapper::new(Box::pin(value)) + inner: SyncWrapper::new(Box::pin(value)), } } @@ -686,7 +688,10 @@ where { fn is_open(&self) -> bool { if self.conn_info.poisoned.poisoned() { - trace!("marking {:?} as closed because it was poisoned", self.conn_info); + trace!( + "marking {:?} as closed because it was poisoned", + self.conn_info + ); return false; } match self.tx { @@ -1089,10 +1094,7 @@ impl Builder { /// line in the input to resume parsing the rest of the headers. An error /// will be emitted nonetheless if it finds `\0` or a lone `\r` while /// looking for the next line. - pub fn http1_ignore_invalid_headers_in_responses( - &mut self, - val: bool, - ) -> &mut Builder { + pub fn http1_ignore_invalid_headers_in_responses(&mut self, val: bool) -> &mut Builder { self.conn_builder .http1_ignore_invalid_headers_in_responses(val); self From fa3e220f0ee59b22464d99e3fd5d4470f58f22d0 Mon Sep 17 00:00:00 2001 From: Arnau Orriols Date: Tue, 2 May 2023 02:03:30 +0200 Subject: [PATCH 4/4] Make second request succeed to ensure the completion of the connection task --- tests/client.rs | 64 ++++++++++++++++++++++++++++++++++--------------- 1 file changed, 45 insertions(+), 19 deletions(-) diff --git a/tests/client.rs b/tests/client.rs index 099693280d..dd3c418400 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -1122,12 +1122,14 @@ mod dispatch_impl { use http::Uri; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use tokio::net::TcpStream; + use tokio::sync::Notify; use tokio_test::block_on; use super::support; use hyper::body::HttpBody; use hyper::client::connect::{capture_connection, Connected, Connection, HttpConnector}; - use hyper::Client; + use hyper::service::{make_service_fn, service_fn}; + use hyper::{Client, Response, Server}; #[test] fn drop_body_before_eof_closes_connection() { @@ -1363,35 +1365,59 @@ mod dispatch_impl { ) { let _ = pretty_env_logger::try_init(); - let server = TcpListener::bind("127.0.0.1:0").unwrap(); - let addr = server.local_addr().unwrap(); + let server = Server::bind(&([127, 0, 0, 1], 0).into()) + .http2_only(true) + .serve(make_service_fn(|_| async move { + Ok::<_, hyper::Error>(service_fn(|_req| { + future::ok::<_, hyper::Error>(Response::new(Body::empty())) + })) + })); + let addr = server.local_addr(); + let (shdn_tx, shdn_rx) = oneshot::channel(); + tokio::task::spawn(async move { + server + .with_graceful_shutdown(async move { + let _ = shdn_rx.await; + }) + .await + .expect("server") + }); + let first_req_cancelled = Arc::new(Notify::new()); #[derive(Clone)] - struct SlowConnector; + struct SlowConnector(Arc); impl hyper::service::Service for SlowConnector { type Response = TcpStream; - type Error = hyper::Error; - type Future = future::Pending>; + type Error = io::Error; + type Future = Pin> + Send>>; fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } - fn call(&mut self, _dst: Uri) -> Self::Future { - // A connector that takes a long time to resolve - future::pending() + fn call(&mut self, dst: Uri) -> Self::Future { + // A connector that connects after the first request has been cancelled + let first_req_cancelled_rx = self.0.clone(); + Box::pin(async move { + first_req_cancelled_rx.notified().await; + tcp_connect(&dst.authority().unwrap().to_string().parse().unwrap()).await + }) } } - let client = Client::builder().http2_only(true).build(SlowConnector); + let client = Client::builder() + .http2_only(true) + .build(SlowConnector(first_req_cancelled.clone())); // This first request starts the connecting task let req = Request::builder() .uri(&*format!("http://{}/a", addr)) .body(Body::empty()) .unwrap(); - let mut res1 = client.request(req).map(|_| unreachable!()); + let mut res1 = client.request(req).inspect(|_| { + unreachable!("first request should be cancelled before connection is stablished") + }); // This second request waits for the connecting task to finish // instead of starting a new connecting task @@ -1399,9 +1425,7 @@ mod dispatch_impl { .uri(&*format!("http://{}/a", addr)) .body(Body::empty()) .unwrap(); - let mut res2 = client - .request(req) - .map(|r| unreachable!("res2 should had never resolved, but resulted in {:?}", r)); + let mut res2 = client.request(req); // Prime the requests assert!( @@ -1414,12 +1438,14 @@ mod dispatch_impl { // The `ResponseFuture` that drives the connecting task is dropped, but // the connecting task should finish in the background drop(res1); + first_req_cancelled.notify_one(); - assert!( - // The second response has not been canceled and it is still waiting for the connection - // that will never arrive - future::poll_fn(|ctx| Poll::Ready(Pin::new(&mut res2).poll(ctx).is_pending())).await - ); + // The second response has not been canceled and finishes successfully + tokio::time::timeout(Duration::from_millis(100), res2) + .await + .expect("response to second request should resolve within 100ms") + .expect("response to second request should resolve successfully"); + let _ = shdn_tx.send(()); } #[tokio::test]