Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: not returning UnexpectedEof when client drop without close_notify #743

Merged
merged 2 commits into from
Jan 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions src/proto/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -461,13 +461,27 @@ where
// active streams must be reset.
//
// TODO: Are I/O errors recoverable?
Err(Error::Io(e, inner)) => {
tracing::debug!(error = ?e, "Connection::poll; IO error");
let e = Error::Io(e, inner);
Err(Error::Io(kind, inner)) => {
tracing::debug!(error = ?kind, "Connection::poll; IO error");
let e = Error::Io(kind, inner);

// Reset all active streams
self.streams.handle_error(e.clone());

// Some client implementations drop the connections without notifying its peer
// Attempting to read after the client dropped the connection results in UnexpectedEof
// If as a server, we don't have anything more to send, just close the connection
// without error
//
// See https://github.com/hyperium/hyper/issues/3427
if self.streams.is_server()
&& self.streams.is_buffer_empty()
&& matches!(kind, io::ErrorKind::UnexpectedEof)
{
*self.state = State::Closed(Reason::NO_ERROR, Initiator::Library);
return Ok(());
}

// Return the error
Err(e)
}
Expand Down
4 changes: 4 additions & 0 deletions src/proto/streams/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ impl<T> Buffer<T> {
pub fn new() -> Self {
Buffer { slab: Slab::new() }
}

pub fn is_empty(&self) -> bool {
self.slab.is_empty()
}
}

impl Deque {
Expand Down
13 changes: 13 additions & 0 deletions src/proto/streams/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,14 @@ where
}

impl<B> DynStreams<'_, B> {
pub fn is_buffer_empty(&self) -> bool {
self.send_buffer.is_empty()
}

pub fn is_server(&self) -> bool {
self.peer.is_server()
}

pub fn recv_headers(&mut self, frame: frame::Headers) -> Result<(), Error> {
let mut me = self.inner.lock().unwrap();

Expand Down Expand Up @@ -1509,6 +1517,11 @@ impl<B> SendBuffer<B> {
let inner = Mutex::new(Buffer::new());
SendBuffer { inner }
}

pub fn is_empty(&self) -> bool {
let buf = self.inner.lock().unwrap();
buf.is_empty()
}
}

// ===== impl Actions =====
Expand Down
16 changes: 16 additions & 0 deletions tests/h2-support/src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ struct Inner {

/// True when the pipe is closed.
closed: bool,

/// Trigger an `UnexpectedEof` error on read
unexpected_eof: bool,
}

const PREFACE: &[u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
Expand All @@ -73,6 +76,7 @@ pub fn new_with_write_capacity(cap: usize) -> (Mock, Handle) {
tx_rem: cap,
tx_rem_task: None,
closed: false,
unexpected_eof: false,
}));

let mock = Mock {
Expand All @@ -96,6 +100,11 @@ impl Handle {
&mut self.codec
}

pub fn close_without_notify(&mut self) {
let mut me = self.codec.get_mut().inner.lock().unwrap();
me.unexpected_eof = true;
}

/// Send a frame
pub async fn send(&mut self, item: SendFrame) -> Result<(), SendError> {
// Queue the frame
Expand Down Expand Up @@ -348,6 +357,13 @@ impl AsyncRead for Mock {

let mut me = self.pipe.inner.lock().unwrap();

if me.unexpected_eof {
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"Simulate an unexpected eof error",
)));
}

if me.rx.is_empty() {
if me.closed {
return Poll::Ready(Ok(()));
Expand Down
41 changes: 41 additions & 0 deletions tests/h2-tests/tests/client_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use futures::future::{join, ready, select, Either};
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use h2_support::prelude::*;
use std::io;
use std::pin::Pin;
use std::task::Context;

Expand Down Expand Up @@ -1822,6 +1823,46 @@ async fn receive_settings_frame_twice_with_second_one_non_empty() {
join(srv, h2).await;
}

#[tokio::test]
async fn server_drop_connection_unexpectedly_return_unexpected_eof_err() {
h2_support::trace_init!();
let (io, mut srv) = mock::new();

let srv = async move {
let settings = srv.assert_client_handshake().await;
assert_default_settings!(settings);
srv.recv_frame(
frames::headers(1)
.request("GET", "https://http2.akamai.com/")
.eos(),
)
.await;
srv.close_without_notify();
};

let h2 = async move {
let (mut client, h2) = client::handshake(io).await.unwrap();
tokio::spawn(async move {
let request = Request::builder()
.uri("https://http2.akamai.com/")
.body(())
.unwrap();
let _res = client
.send_request(request, true)
.unwrap()
.0
.await
.expect("request");
});
let err = h2.await.expect_err("should receive UnexpectedEof");
assert_eq!(
err.get_io().expect("should be UnexpectedEof").kind(),
io::ErrorKind::UnexpectedEof,
);
};
join(srv, h2).await;
}

const SETTINGS: &[u8] = &[0, 0, 0, 4, 0, 0, 0, 0, 0];
const SETTINGS_ACK: &[u8] = &[0, 0, 0, 4, 1, 0, 0, 0, 0];

Expand Down
37 changes: 37 additions & 0 deletions tests/h2-tests/tests/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1416,3 +1416,40 @@ async fn reject_informational_status_header_in_request() {

join(client, srv).await;
}

#[tokio::test]
async fn client_drop_connection_without_close_notify() {
h2_support::trace_init!();

let (io, mut client) = mock::new();
let client = async move {
let _recv_settings = client.assert_server_handshake().await;
client
.send_frame(frames::headers(1).request("GET", "https://example.com/"))
.await;
client.send_frame(frames::data(1, &b"hello"[..])).await;
client.recv_frame(frames::headers(1).response(200)).await;

client.close_without_notify(); // Client closed without notify causing UnexpectedEof
};

let mut builder = server::Builder::new();
builder.max_concurrent_streams(1);

let h2 = async move {
let mut srv = builder.handshake::<_, Bytes>(io).await.expect("handshake");
let (req, mut stream) = srv.next().await.unwrap().unwrap();

assert_eq!(req.method(), &http::Method::GET);

let rsp = http::Response::builder().status(200).body(()).unwrap();
stream.send_response(rsp, false).unwrap();

// Step the conn state forward and hitting the EOF
// But we have no outstanding request from client to be satisfied, so we should not return
// an error
let _ = poll_fn(|cx| srv.poll_closed(cx)).await.unwrap();
};

join(client, h2).await;
}