Skip to content

Commit

Permalink
feat: not returning UnexpectedEof when client drop without close_notify
Browse files Browse the repository at this point in the history
  • Loading branch information
dswij committed Jan 19, 2024
1 parent 99c64cf commit 22c61b3
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 3 deletions.
20 changes: 17 additions & 3 deletions src/proto/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -461,13 +461,27 @@ where
// active streams must be reset.
//
// TODO: Are I/O errors recoverable?
Err(Error::Io(e, inner)) => {
tracing::debug!(error = ?e, "Connection::poll; IO error");
let e = Error::Io(e, inner);
Err(Error::Io(kind, inner)) => {
tracing::debug!(error = ?kind, "Connection::poll; IO error");
let e = Error::Io(kind, inner);

// Reset all active streams
self.streams.handle_error(e.clone());

// Some client implementations drop the connections without notifying its peer
// Attempting to read after the client dropped the connection results in UnexpectedEof
// If as a server, we don't have anything more to send, just close the connection
// without error
//
// See https://github.com/hyperium/hyper/issues/3427
if self.streams.is_server()
&& self.streams.is_buffer_empty()
&& matches!(kind, io::ErrorKind::UnexpectedEof)
{
*self.state = State::Closed(Reason::NO_ERROR, Initiator::Library);
return Ok(());
}

// Return the error
Err(e)
}
Expand Down
4 changes: 4 additions & 0 deletions src/proto/streams/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ impl<T> Buffer<T> {
pub fn new() -> Self {
Buffer { slab: Slab::new() }
}

pub fn is_empty(&self) -> bool {
return self.slab.is_empty();
}
}

impl Deque {
Expand Down
13 changes: 13 additions & 0 deletions src/proto/streams/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,14 @@ where
}

impl<B> DynStreams<'_, B> {
pub fn is_buffer_empty(&self) -> bool {
self.send_buffer.is_empty()
}

pub fn is_server(&self) -> bool {
self.peer.is_server()
}

pub fn recv_headers(&mut self, frame: frame::Headers) -> Result<(), Error> {
let mut me = self.inner.lock().unwrap();

Expand Down Expand Up @@ -1509,6 +1517,11 @@ impl<B> SendBuffer<B> {
let inner = Mutex::new(Buffer::new());
SendBuffer { inner }
}

pub fn is_empty(&self) -> bool {
let buf = self.inner.lock().unwrap();
buf.is_empty()
}
}

// ===== impl Actions =====
Expand Down
50 changes: 50 additions & 0 deletions tests/h2-tests/tests/client_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1773,6 +1773,56 @@ async fn receive_settings_frame_twice_with_second_one_empty() {
join(srv, h2).await;
}

#[tokio::test]
async fn receive_settings_frame_twice_with_second_one_non_empty() {
h2_support::trace_init!();
let (io, mut srv) = mock::new();

let srv = async move {
// Send the initial SETTINGS frame with MAX_CONCURRENT_STREAMS set to 42
srv.send_frame(frames::settings().max_concurrent_streams(42))
.await;

// Handle the client's connection preface
srv.read_preface().await.unwrap();
match srv.next().await {
Some(frame) => match frame.unwrap() {
h2::frame::Frame::Settings(_) => {
let ack = frame::Settings::ack();
srv.send(ack.into()).await.unwrap();
}
frame => {
panic!("unexpected frame: {:?}", frame);
}
},
None => {
panic!("unexpected EOF");
}
}

// Should receive the ack for the server's initial SETTINGS frame
let frame = assert_settings!(srv.next().await.unwrap().unwrap());
assert!(frame.is_ack());

// Send another SETTINGS frame with no MAX_CONCURRENT_STREAMS
// This should not update the max_concurrent_send_streams value that
// the client manages.
srv.send_frame(frames::settings().max_concurrent_streams(2024))
.await;
};

let h2 = async move {
let (_client, h2) = client::handshake(io).await.unwrap();
let mut h2 = std::pin::pin!(h2);
assert_eq!(h2.max_concurrent_send_streams(), usize::MAX);
h2.as_mut().await.unwrap();
// The most-recently advertised value should be used
assert_eq!(h2.max_concurrent_send_streams(), 2024);
};

join(srv, h2).await;
}

#[tokio::test]
async fn server_drop_connection_unexpectedly_return_unexpected_eof_err() {
h2_support::trace_init!();
Expand Down

0 comments on commit 22c61b3

Please sign in to comment.