Skip to content

Commit

Permalink
fuse gzip decoder stream in order to safely poll it after it has term…
Browse files Browse the repository at this point in the history
…inated and to not rely on implementation details of async-compression
  • Loading branch information
Andrey36652 committed Nov 30, 2024
1 parent 925f147 commit 1672f9d
Showing 1 changed file with 28 additions and 8 deletions.
36 changes: 28 additions & 8 deletions src/async_impl/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,14 @@ use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

#[cfg(any(
feature = "gzip",
// feature = "zstd",
// feature = "brotli",
// feature = "deflate"
))]
use futures_util::stream::Fuse;

#[cfg(feature = "gzip")]
use async_compression::tokio::bufread::GzipDecoder;

Expand Down Expand Up @@ -108,7 +116,7 @@ enum Inner {

/// A `Gzip` decoder will uncompress the gzipped response content before returning it.
#[cfg(feature = "gzip")]
Gzip(Pin<Box<FramedRead<GzipDecoder<PeekableIoStreamReader>, BytesCodec>>>),
Gzip(Pin<Box<Fuse<FramedRead<GzipDecoder<PeekableIoStreamReader>, BytesCodec>>>>),

/// A `Brotli` decoder will uncompress the brotlied response content before returning it.
#[cfg(feature = "brotli")]
Expand All @@ -122,6 +130,12 @@ enum Inner {
#[cfg(feature = "deflate")]
Deflate(Pin<Box<FramedRead<ZlibDecoder<PeekableIoStreamReader>, BytesCodec>>>),

// #[cfg(any(
// feature = "brotli",
// feature = "zstd",
// feature = "gzip",
// feature = "deflate"
// ))]
/// A decoder that doesn't have a value yet.
#[cfg(any(
feature = "brotli",
Expand Down Expand Up @@ -368,12 +382,15 @@ impl HttpBody for Decoder {
match futures_core::ready!(Pin::new(&mut *decoder).poll_next(cx)) {
Some(Ok(bytes)) => Poll::Ready(Some(Ok(Frame::data(bytes.freeze())))),
Some(Err(err)) => Poll::Ready(Some(Err(crate::error::decode_io(err)))),
None => { // poll inner connection until EOF after gzip stream is finished
let gzip_decoder = decoder.get_mut();
None => {
// poll inner connection until EOF after gzip stream is finished
let gzip_decoder = decoder.get_mut().get_mut();
let stream_reader = gzip_decoder.get_mut();
let peekable_io_stream = stream_reader.get_mut();
match futures_core::ready!(Pin::new(peekable_io_stream).poll_next(cx)) {
Some(Ok(bytes)) => Poll::Ready(Some(Err(crate::error::decode("there are extra bytes after body has been decompressed")))),
Some(Ok(_)) => Poll::Ready(Some(Err(crate::error::decode(
"there are extra bytes after body has been decompressed",
)))),
Some(Err(err)) => Poll::Ready(Some(Err(crate::error::decode_io(err)))),
None => Poll::Ready(None),
}
Expand Down Expand Up @@ -475,10 +492,13 @@ impl Future for Pending {
BytesCodec::new(),
))))),
#[cfg(feature = "gzip")]
DecoderType::Gzip => Poll::Ready(Ok(Inner::Gzip(Box::pin(FramedRead::new(
GzipDecoder::new(StreamReader::new(_body)),
BytesCodec::new(),
))))),
DecoderType::Gzip => Poll::Ready(Ok(Inner::Gzip(Box::pin(
FramedRead::new(
GzipDecoder::new(StreamReader::new(_body)),
BytesCodec::new(),
)
.fuse(),
)))),
#[cfg(feature = "deflate")]
DecoderType::Deflate => Poll::Ready(Ok(Inner::Deflate(Box::pin(FramedRead::new(
ZlibDecoder::new(StreamReader::new(_body)),
Expand Down

0 comments on commit 1672f9d

Please sign in to comment.