From 1672f9d884fa27339c3601062e7efeae15df7f21 Mon Sep 17 00:00:00 2001 From: Andrew Date: Sat, 30 Nov 2024 20:47:33 +0100 Subject: [PATCH] fuse gzip decoder stream in order to safely poll it after it has terminated and to not rely on implementation details of async-compression --- src/async_impl/decoder.rs | 36 ++++++++++++++++++++++++++++-------- 1 file changed, 28 insertions(+), 8 deletions(-) diff --git a/src/async_impl/decoder.rs b/src/async_impl/decoder.rs index 1fbb94b83..1967a2e7e 100644 --- a/src/async_impl/decoder.rs +++ b/src/async_impl/decoder.rs @@ -9,6 +9,14 @@ use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; +#[cfg(any( + feature = "gzip", + // feature = "zstd", + // feature = "brotli", + // feature = "deflate" +))] +use futures_util::stream::Fuse; + #[cfg(feature = "gzip")] use async_compression::tokio::bufread::GzipDecoder; @@ -108,7 +116,7 @@ enum Inner { /// A `Gzip` decoder will uncompress the gzipped response content before returning it. #[cfg(feature = "gzip")] - Gzip(Pin, BytesCodec>>>), + Gzip(Pin, BytesCodec>>>>), /// A `Brotli` decoder will uncompress the brotlied response content before returning it. #[cfg(feature = "brotli")] @@ -122,6 +130,12 @@ enum Inner { #[cfg(feature = "deflate")] Deflate(Pin, BytesCodec>>>), + // #[cfg(any( + // feature = "brotli", + // feature = "zstd", + // feature = "gzip", + // feature = "deflate" + // ))] /// A decoder that doesn't have a value yet. #[cfg(any( feature = "brotli", @@ -368,12 +382,15 @@ impl HttpBody for Decoder { match futures_core::ready!(Pin::new(&mut *decoder).poll_next(cx)) { Some(Ok(bytes)) => Poll::Ready(Some(Ok(Frame::data(bytes.freeze())))), Some(Err(err)) => Poll::Ready(Some(Err(crate::error::decode_io(err)))), - None => { // poll inner connection until EOF after gzip stream is finished - let gzip_decoder = decoder.get_mut(); + None => { + // poll inner connection until EOF after gzip stream is finished + let gzip_decoder = decoder.get_mut().get_mut(); let stream_reader = gzip_decoder.get_mut(); let peekable_io_stream = stream_reader.get_mut(); match futures_core::ready!(Pin::new(peekable_io_stream).poll_next(cx)) { - Some(Ok(bytes)) => Poll::Ready(Some(Err(crate::error::decode("there are extra bytes after body has been decompressed")))), + Some(Ok(_)) => Poll::Ready(Some(Err(crate::error::decode( + "there are extra bytes after body has been decompressed", + )))), Some(Err(err)) => Poll::Ready(Some(Err(crate::error::decode_io(err)))), None => Poll::Ready(None), } @@ -475,10 +492,13 @@ impl Future for Pending { BytesCodec::new(), ))))), #[cfg(feature = "gzip")] - DecoderType::Gzip => Poll::Ready(Ok(Inner::Gzip(Box::pin(FramedRead::new( - GzipDecoder::new(StreamReader::new(_body)), - BytesCodec::new(), - ))))), + DecoderType::Gzip => Poll::Ready(Ok(Inner::Gzip(Box::pin( + FramedRead::new( + GzipDecoder::new(StreamReader::new(_body)), + BytesCodec::new(), + ) + .fuse(), + )))), #[cfg(feature = "deflate")] DecoderType::Deflate => Poll::Ready(Ok(Inner::Deflate(Box::pin(FramedRead::new( ZlibDecoder::new(StreamReader::new(_body)),