Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(client): run connect_to to completion even if ResponseFuture is canceled #3205

Open
wants to merge 5 commits into
base: 0.14.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
227 changes: 106 additions & 121 deletions src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,7 @@ use tracing::{debug, trace, warn};

use crate::body::{Body, HttpBody};
use crate::client::connect::CaptureConnectionExtension;
use crate::common::{
exec::BoxSendFuture, lazy as hyper_lazy, sync_wrapper::SyncWrapper, task, Future, Lazy, Pin,
Poll,
};
use crate::common::{exec::BoxSendFuture, sync_wrapper::SyncWrapper, task, Future, Pin, Poll};
use crate::rt::Executor;

use super::conn;
Expand Down Expand Up @@ -251,7 +248,8 @@ where
if req.version() == Version::HTTP_2 {
warn!("Connection is HTTP/1, but request requires HTTP/2");
return Err(ClientError::Normal(
crate::Error::new_user_unsupported_version().with_client_connect_info(pooled.conn_info.clone()),
crate::Error::new_user_unsupported_version()
.with_client_connect_info(pooled.conn_info.clone()),
));
}

Expand Down Expand Up @@ -366,34 +364,10 @@ where
// The order of the `select` is depended on below...

match future::select(checkout, connect).await {
// Checkout won, connect future may have been started or not.
//
// If it has, let it finish and insert back into the pool,
// so as to not waste the socket...
Either::Left((Ok(checked_out), connecting)) => {
// This depends on the `select` above having the correct
// order, such that if the checkout future were ready
// immediately, the connect future will never have been
// started.
//
// If it *wasn't* ready yet, then the connect future will
// have been started...
if connecting.started() {
let bg = connecting
.map_err(|err| {
trace!("background connect error: {}", err);
})
.map(|_pooled| {
// dropping here should just place it in
// the Pool for us...
});
// An execute error here isn't important, we're just trying
// to prevent a waste of a socket...
#[cfg_attr(feature = "deprecated", allow(deprecated))]
self.conn_builder.exec.execute(bg);
}
Ok(checked_out)
}
// Checkout won. If connect has started it will finish in the
// background and insert back into the pool, so as to not waste the
// socket...
Either::Left((Ok(checked_out), _connecting)) => Ok(checked_out),
// Connect won, checkout can just be dropped.
Either::Right((Ok(connected), _checkout)) => Ok(connected),
// Either checkout or connect could get canceled:
Expand Down Expand Up @@ -433,7 +407,7 @@ where
fn connect_to(
&self,
pool_key: PoolKey,
) -> impl Lazy<Output = crate::Result<Pooled<PoolClient<B>>>> + Unpin {
) -> impl Future<Output = crate::Result<Pooled<PoolClient<B>>>> + Unpin + '_ {
#[cfg_attr(feature = "deprecated", allow(deprecated))]
let executor = self.conn_builder.exec.clone();
let pool = self.pool.clone();
Expand All @@ -445,91 +419,102 @@ where
let is_ver_h2 = ver == Ver::Http2;
let connector = self.connector.clone();
let dst = domain_as_uri(pool_key.clone());
hyper_lazy(move || {
// Try to take a "connecting lock".
//
// If the pool_key is for HTTP/2, and there is already a
// connection being established, then this can't take a
// second lock. The "connect_to" future is Canceled.
let connecting = match pool.connecting(&pool_key, ver) {
Some(lock) => lock,
None => {
let canceled =
crate::Error::new_canceled().with("HTTP/2 connection in progress");
return Either::Right(future::err(canceled));
}
};
Either::Left(
connector
.connect(connect::sealed::Internal, dst)
.map_err(crate::Error::new_connect)
.and_then(move |io| {
let connected = io.connected();
// If ALPN is h2 and we aren't http2_only already,
// then we need to convert our pool checkout into
// a single HTTP2 one.
let connecting = if connected.alpn == Alpn::H2 && !is_ver_h2 {
match connecting.alpn_h2(&pool) {
Some(lock) => {
trace!("ALPN negotiated h2, updating pool");
lock
}
None => {
// Another connection has already upgraded,
// the pool checkout should finish up for us.
let canceled = crate::Error::new_canceled()
.with("ALPN upgraded to HTTP/2");
return Either::Right(future::err(canceled));
}
// Try to take a "connecting lock".
//
// If the pool_key is for HTTP/2, and there is already a
// connection being established, then this can't take a
// second lock. The "connect_to" future is Canceled.
let connecting = match pool.connecting(&pool_key, ver) {
Some(lock) => lock,
None => {
let canceled = crate::Error::new_canceled().with("HTTP/2 connection in progress");
return Either::Right(future::err(canceled));
}
};
Either::Left(Box::pin(async move {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I see, by putting the call to connector.connect() inside this sub future, async move, it won't call connect() until the first poll. Got it.

// This future is executed in the background and its result is
// awaited in a oneshot channel
let connect_fut = connector
.connect(connect::sealed::Internal, dst)
.map_err(crate::Error::new_connect)
.and_then(move |io| {
let connected = io.connected();
// If ALPN is h2 and we aren't http2_only already,
// then we need to convert our pool checkout into
// a single HTTP2 one.
let connecting = if connected.alpn == Alpn::H2 && !is_ver_h2 {
match connecting.alpn_h2(&pool) {
Some(lock) => {
trace!("ALPN negotiated h2, updating pool");
lock
}
None => {
// Another connection has already upgraded,
// the pool checkout should finish up for us.
let canceled =
crate::Error::new_canceled().with("ALPN upgraded to HTTP/2");
return Either::Right(future::err(canceled));
}
} else {
connecting
};

#[cfg_attr(not(feature = "http2"), allow(unused))]
let is_h2 = is_ver_h2 || connected.alpn == Alpn::H2;
#[cfg(feature = "http2")]
{
conn_builder.http2_only(is_h2);
}
} else {
connecting
};

#[cfg_attr(not(feature = "http2"), allow(unused))]
let is_h2 = is_ver_h2 || connected.alpn == Alpn::H2;
#[cfg(feature = "http2")]
{
conn_builder.http2_only(is_h2);
}

Either::Left(Box::pin(async move {
let (tx, conn) = conn_builder.handshake(io).await?;

trace!("handshake complete, spawning background dispatcher task");
executor.execute(
conn.map_err(|e| debug!("client connection error: {}", e))
.map(|_| ()),
);

// Wait for 'conn' to ready up before we
// declare this tx as usable
let tx = tx.when_ready().await?;

let tx = {
#[cfg(feature = "http2")]
{
if is_h2 {
PoolTx::Http2(tx.into_http2())
} else {
PoolTx::Http1(tx)
}
Either::Left(async move {
let (tx, conn) = conn_builder.handshake(io).await?;

trace!("handshake complete, spawning background dispatcher task");
executor.execute(
conn.map_err(|e| debug!("client connection error: {}", e))
.map(|_| ()),
);

// Wait for 'conn' to ready up before we
// declare this tx as usable
let tx = tx.when_ready().await?;

let tx = {
#[cfg(feature = "http2")]
{
if is_h2 {
PoolTx::Http2(tx.into_http2())
} else {
PoolTx::Http1(tx)
}
#[cfg(not(feature = "http2"))]
PoolTx::Http1(tx)
};

Ok(pool.pooled(
connecting,
PoolClient {
conn_info: connected,
tx,
},
))
}))
}),
)
})
}
#[cfg(not(feature = "http2"))]
PoolTx::Http1(tx)
};

Ok(pool.pooled(
connecting,
PoolClient {
conn_info: connected,
tx,
},
))
})
});
let (tx, rx) = oneshot::channel();
#[cfg_attr(feature = "deprecated", allow(deprecated))]
self.conn_builder.exec.execute(async move {
let result = connect_fut.await;
if let Err(Err(err)) = tx.send(result) {
// The receiver has been dropped (checkout won or the user
// dropped the `ResponseFuture`). We have nowhere to send
// the error
trace!("background connect error: {}", err);
}
});
rx.await.expect("connect_tx should never drop")
}))
}
}

Expand Down Expand Up @@ -598,7 +583,7 @@ impl ResponseFuture {
F: Future<Output = crate::Result<Response<Body>>> + Send + 'static,
{
Self {
inner: SyncWrapper::new(Box::pin(value))
inner: SyncWrapper::new(Box::pin(value)),
}
}

Expand Down Expand Up @@ -703,7 +688,10 @@ where
{
fn is_open(&self) -> bool {
if self.conn_info.poisoned.poisoned() {
trace!("marking {:?} as closed because it was poisoned", self.conn_info);
trace!(
"marking {:?} as closed because it was poisoned",
self.conn_info
);
return false;
}
match self.tx {
Expand Down Expand Up @@ -1106,10 +1094,7 @@ impl Builder {
/// line in the input to resume parsing the rest of the headers. An error
/// will be emitted nonetheless if it finds `\0` or a lone `\r` while
/// looking for the next line.
pub fn http1_ignore_invalid_headers_in_responses(
&mut self,
val: bool,
) -> &mut Builder {
pub fn http1_ignore_invalid_headers_in_responses(&mut self, val: bool) -> &mut Builder {
self.conn_builder
.http1_ignore_invalid_headers_in_responses(val);
self
Expand Down
76 changes: 0 additions & 76 deletions src/common/lazy.rs

This file was deleted.

4 changes: 0 additions & 4 deletions src/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ pub(crate) mod drain;
#[cfg(any(feature = "http1", feature = "http2", feature = "server"))]
pub(crate) mod exec;
pub(crate) mod io;
#[cfg(all(feature = "client", any(feature = "http1", feature = "http2")))]
mod lazy;
mod never;
#[cfg(any(
feature = "stream",
Expand All @@ -26,8 +24,6 @@ pub(crate) mod sync_wrapper;
pub(crate) mod task;
pub(crate) mod watch;

#[cfg(all(feature = "client", any(feature = "http1", feature = "http2")))]
pub(crate) use self::lazy::{lazy, Started as Lazy};
#[cfg(any(feature = "http1", feature = "http2", feature = "runtime"))]
pub(crate) use self::never::Never;
pub(crate) use self::task::Poll;
Expand Down
Loading