From 99c64cf3cd245e56958048d6c3e05f19eb84d61f Mon Sep 17 00:00:00 2001 From: dswij Date: Fri, 19 Jan 2024 17:31:48 +0800 Subject: [PATCH 1/2] tests: add test for peer unexpectedly dropping connection --- tests/h2-support/src/mock.rs | 16 ++++++ tests/h2-tests/tests/client_request.rs | 67 +++++++++++--------------- tests/h2-tests/tests/server.rs | 37 ++++++++++++++ 3 files changed, 82 insertions(+), 38 deletions(-) diff --git a/tests/h2-support/src/mock.rs b/tests/h2-support/src/mock.rs index 18d08484..60539d0a 100644 --- a/tests/h2-support/src/mock.rs +++ b/tests/h2-support/src/mock.rs @@ -54,6 +54,9 @@ struct Inner { /// True when the pipe is closed. closed: bool, + + /// Trigger an `UnexpectedEof` error on read + unexpected_eof: bool, } const PREFACE: &[u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"; @@ -73,6 +76,7 @@ pub fn new_with_write_capacity(cap: usize) -> (Mock, Handle) { tx_rem: cap, tx_rem_task: None, closed: false, + unexpected_eof: false, })); let mock = Mock { @@ -96,6 +100,11 @@ impl Handle { &mut self.codec } + pub fn close_without_notify(&mut self) { + let mut me = self.codec.get_mut().inner.lock().unwrap(); + me.unexpected_eof = true; + } + /// Send a frame pub async fn send(&mut self, item: SendFrame) -> Result<(), SendError> { // Queue the frame @@ -348,6 +357,13 @@ impl AsyncRead for Mock { let mut me = self.pipe.inner.lock().unwrap(); + if me.unexpected_eof { + return Poll::Ready(Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + "Simulate an unexpected eof error", + ))); + } + if me.rx.is_empty() { if me.closed { return Poll::Ready(Ok(())); diff --git a/tests/h2-tests/tests/client_request.rs b/tests/h2-tests/tests/client_request.rs index 13bc5f22..3d285ce2 100644 --- a/tests/h2-tests/tests/client_request.rs +++ b/tests/h2-tests/tests/client_request.rs @@ -2,6 +2,7 @@ use futures::future::{join, ready, select, Either}; use futures::stream::FuturesUnordered; use futures::StreamExt; use h2_support::prelude::*; +use std::io; use std::pin::Pin; use std::task::Context; @@ -1773,52 +1774,42 @@ async fn receive_settings_frame_twice_with_second_one_empty() { } #[tokio::test] -async fn receive_settings_frame_twice_with_second_one_non_empty() { +async fn server_drop_connection_unexpectedly_return_unexpected_eof_err() { h2_support::trace_init!(); let (io, mut srv) = mock::new(); let srv = async move { - // Send the initial SETTINGS frame with MAX_CONCURRENT_STREAMS set to 42 - srv.send_frame(frames::settings().max_concurrent_streams(42)) - .await; - - // Handle the client's connection preface - srv.read_preface().await.unwrap(); - match srv.next().await { - Some(frame) => match frame.unwrap() { - h2::frame::Frame::Settings(_) => { - let ack = frame::Settings::ack(); - srv.send(ack.into()).await.unwrap(); - } - frame => { - panic!("unexpected frame: {:?}", frame); - } - }, - None => { - panic!("unexpected EOF"); - } - } - - // Should receive the ack for the server's initial SETTINGS frame - let frame = assert_settings!(srv.next().await.unwrap().unwrap()); - assert!(frame.is_ack()); - - // Send another SETTINGS frame with no MAX_CONCURRENT_STREAMS - // This should not update the max_concurrent_send_streams value that - // the client manages. - srv.send_frame(frames::settings().max_concurrent_streams(2024)) - .await; + let settings = srv.assert_client_handshake().await; + assert_default_settings!(settings); + srv.recv_frame( + frames::headers(1) + .request("GET", "https://http2.akamai.com/") + .eos(), + ) + .await; + srv.close_without_notify(); }; let h2 = async move { - let (_client, h2) = client::handshake(io).await.unwrap(); - let mut h2 = std::pin::pin!(h2); - assert_eq!(h2.max_concurrent_send_streams(), usize::MAX); - h2.as_mut().await.unwrap(); - // The most-recently advertised value should be used - assert_eq!(h2.max_concurrent_send_streams(), 2024); + let (mut client, h2) = client::handshake(io).await.unwrap(); + tokio::spawn(async move { + let request = Request::builder() + .uri("https://http2.akamai.com/") + .body(()) + .unwrap(); + let _res = client + .send_request(request, true) + .unwrap() + .0 + .await + .expect("request"); + }); + let err = h2.await.expect_err("should receive UnexpectedEof"); + assert_eq!( + err.get_io().expect("should be UnexpectedEof").kind(), + io::ErrorKind::UnexpectedEof, + ); }; - join(srv, h2).await; } diff --git a/tests/h2-tests/tests/server.rs b/tests/h2-tests/tests/server.rs index dd97e94d..831f1882 100644 --- a/tests/h2-tests/tests/server.rs +++ b/tests/h2-tests/tests/server.rs @@ -1416,3 +1416,40 @@ async fn reject_informational_status_header_in_request() { join(client, srv).await; } + +#[tokio::test] +async fn client_drop_connection_without_close_notify() { + h2_support::trace_init!(); + + let (io, mut client) = mock::new(); + let client = async move { + let _recv_settings = client.assert_server_handshake().await; + client + .send_frame(frames::headers(1).request("GET", "https://example.com/")) + .await; + client.send_frame(frames::data(1, &b"hello"[..])).await; + client.recv_frame(frames::headers(1).response(200)).await; + + client.close_without_notify(); // Client closed without notify causing UnexpectedEof + }; + + let mut builder = server::Builder::new(); + builder.max_concurrent_streams(1); + + let h2 = async move { + let mut srv = builder.handshake::<_, Bytes>(io).await.expect("handshake"); + let (req, mut stream) = srv.next().await.unwrap().unwrap(); + + assert_eq!(req.method(), &http::Method::GET); + + let rsp = http::Response::builder().status(200).body(()).unwrap(); + stream.send_response(rsp, false).unwrap(); + + // Step the conn state forward and hitting the EOF + // But we have no outstanding request from client to be satisfied, so we should not return + // an error + let _ = poll_fn(|cx| srv.poll_closed(cx)).await.unwrap(); + }; + + join(client, h2).await; +} From f4dd7bf65c85d221d3c6c59e80ed4c622549f64e Mon Sep 17 00:00:00 2001 From: dswij Date: Fri, 19 Jan 2024 20:15:16 +0800 Subject: [PATCH 2/2] feat: not returning UnexpectedEof when client drop without close_notify --- src/proto/connection.rs | 20 +++++++++-- src/proto/streams/buffer.rs | 4 +++ src/proto/streams/streams.rs | 13 +++++++ tests/h2-tests/tests/client_request.rs | 50 ++++++++++++++++++++++++++ 4 files changed, 84 insertions(+), 3 deletions(-) diff --git a/src/proto/connection.rs b/src/proto/connection.rs index 5d6b9d2b..8627375a 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -461,13 +461,27 @@ where // active streams must be reset. // // TODO: Are I/O errors recoverable? - Err(Error::Io(e, inner)) => { - tracing::debug!(error = ?e, "Connection::poll; IO error"); - let e = Error::Io(e, inner); + Err(Error::Io(kind, inner)) => { + tracing::debug!(error = ?kind, "Connection::poll; IO error"); + let e = Error::Io(kind, inner); // Reset all active streams self.streams.handle_error(e.clone()); + // Some client implementations drop the connections without notifying its peer + // Attempting to read after the client dropped the connection results in UnexpectedEof + // If as a server, we don't have anything more to send, just close the connection + // without error + // + // See https://github.com/hyperium/hyper/issues/3427 + if self.streams.is_server() + && self.streams.is_buffer_empty() + && matches!(kind, io::ErrorKind::UnexpectedEof) + { + *self.state = State::Closed(Reason::NO_ERROR, Initiator::Library); + return Ok(()); + } + // Return the error Err(e) } diff --git a/src/proto/streams/buffer.rs b/src/proto/streams/buffer.rs index 2648a410..02d26506 100644 --- a/src/proto/streams/buffer.rs +++ b/src/proto/streams/buffer.rs @@ -29,6 +29,10 @@ impl Buffer { pub fn new() -> Self { Buffer { slab: Slab::new() } } + + pub fn is_empty(&self) -> bool { + self.slab.is_empty() + } } impl Deque { diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index f4b12c7b..fa8e6843 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -323,6 +323,14 @@ where } impl DynStreams<'_, B> { + pub fn is_buffer_empty(&self) -> bool { + self.send_buffer.is_empty() + } + + pub fn is_server(&self) -> bool { + self.peer.is_server() + } + pub fn recv_headers(&mut self, frame: frame::Headers) -> Result<(), Error> { let mut me = self.inner.lock().unwrap(); @@ -1509,6 +1517,11 @@ impl SendBuffer { let inner = Mutex::new(Buffer::new()); SendBuffer { inner } } + + pub fn is_empty(&self) -> bool { + let buf = self.inner.lock().unwrap(); + buf.is_empty() + } } // ===== impl Actions ===== diff --git a/tests/h2-tests/tests/client_request.rs b/tests/h2-tests/tests/client_request.rs index 3d285ce2..7bd223e3 100644 --- a/tests/h2-tests/tests/client_request.rs +++ b/tests/h2-tests/tests/client_request.rs @@ -1773,6 +1773,56 @@ async fn receive_settings_frame_twice_with_second_one_empty() { join(srv, h2).await; } +#[tokio::test] +async fn receive_settings_frame_twice_with_second_one_non_empty() { + h2_support::trace_init!(); + let (io, mut srv) = mock::new(); + + let srv = async move { + // Send the initial SETTINGS frame with MAX_CONCURRENT_STREAMS set to 42 + srv.send_frame(frames::settings().max_concurrent_streams(42)) + .await; + + // Handle the client's connection preface + srv.read_preface().await.unwrap(); + match srv.next().await { + Some(frame) => match frame.unwrap() { + h2::frame::Frame::Settings(_) => { + let ack = frame::Settings::ack(); + srv.send(ack.into()).await.unwrap(); + } + frame => { + panic!("unexpected frame: {:?}", frame); + } + }, + None => { + panic!("unexpected EOF"); + } + } + + // Should receive the ack for the server's initial SETTINGS frame + let frame = assert_settings!(srv.next().await.unwrap().unwrap()); + assert!(frame.is_ack()); + + // Send another SETTINGS frame with no MAX_CONCURRENT_STREAMS + // This should not update the max_concurrent_send_streams value that + // the client manages. + srv.send_frame(frames::settings().max_concurrent_streams(2024)) + .await; + }; + + let h2 = async move { + let (_client, h2) = client::handshake(io).await.unwrap(); + let mut h2 = std::pin::pin!(h2); + assert_eq!(h2.max_concurrent_send_streams(), usize::MAX); + h2.as_mut().await.unwrap(); + // The most-recently advertised value should be used + assert_eq!(h2.max_concurrent_send_streams(), 2024); + }; + + join(srv, h2).await; +} + #[tokio::test] async fn server_drop_connection_unexpectedly_return_unexpected_eof_err() { h2_support::trace_init!();