Skip to content

Commit

Permalink
Add zstd support
Browse files Browse the repository at this point in the history
  • Loading branch information
paolobarbolini committed Jun 3, 2023
1 parent cec216b commit 7ba30e0
Show file tree
Hide file tree
Showing 8 changed files with 364 additions and 24 deletions.
15 changes: 7 additions & 8 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ jobs:
- "feat.: blocking"
- "feat.: gzip"
- "feat.: brotli"
- "feat.: zstd"
- "feat.: deflate"
- "feat.: json"
- "feat.: multipart"
Expand All @@ -101,25 +102,21 @@ jobs:
- name: windows / stable-x86_64-msvc
os: windows-latest
target: x86_64-pc-windows-msvc
features: "--features blocking,gzip,brotli,deflate,json,multipart"
features: "--features blocking,gzip,brotli,zstd,deflate,json,multipart"
- name: windows / stable-i686-msvc
os: windows-latest
target: i686-pc-windows-msvc
features: "--features blocking,gzip,brotli,deflate,json,multipart"
features: "--features blocking,gzip,brotli,zstd,deflate,json,multipart"
- name: windows / stable-x86_64-gnu
os: windows-latest
rust: stable-x86_64-pc-windows-gnu
target: x86_64-pc-windows-gnu
features: "--features blocking,gzip,brotli,deflate,json,multipart"
package_name: mingw-w64-x86_64-gcc
mingw64_path: "C:\\msys64\\mingw64\\bin"
features: "--features blocking,gzip,brotli,zstd,deflate,json,multipart"
- name: windows / stable-i686-gnu
os: windows-latest
rust: stable-i686-pc-windows-gnu
target: i686-pc-windows-gnu
features: "--features blocking,gzip,brotli,deflate,json,multipart"
package_name: mingw-w64-i686-gcc
mingw64_path: "C:\\msys64\\mingw32\\bin"
features: "--features blocking,gzip,brotli,zstd,deflate,json,multipart"

- name: "feat.: default-tls disabled"
features: "--no-default-features"
Expand All @@ -139,6 +136,8 @@ jobs:
features: "--features gzip"
- name: "feat.: brotli"
features: "--features brotli"
- name: "feat.: zstd"
features: "--features zstd"
- name: "feat.: deflate"
features: "--features deflate"
- name: "feat.: json"
Expand Down
8 changes: 8 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ gzip = ["async-compression", "async-compression/gzip", "tokio-util"]

brotli = ["async-compression", "async-compression/brotli", "tokio-util"]

zstd = ["async-compression", "async-compression/zstd", "tokio-util"]

deflate = ["async-compression", "async-compression/zlib", "tokio-util"]

json = ["serde_json"]
Expand Down Expand Up @@ -152,6 +154,7 @@ hyper = { version = "0.14", default-features = false, features = ["tcp", "stream
serde = { version = "1.0", features = ["derive"] }
libflate = "1.0"
brotli_crate = { package = "brotli", version = "3.3.0" }
zstd_crate = { package = "zstd", version = "0.12" }
doc-comment = "0.3"
tokio = { version = "1.0", default-features = false, features = ["macros", "rt-multi-thread"] }

Expand Down Expand Up @@ -239,6 +242,11 @@ name = "brotli"
path = "tests/brotli.rs"
required-features = ["brotli"]

[[test]]
name = "zstd"
path = "tests/zstd.rs"
required-features = ["zstd"]

[[test]]
name = "deflate"
path = "tests/deflate.rs"
Expand Down
40 changes: 40 additions & 0 deletions src/async_impl/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -787,6 +787,29 @@ impl ClientBuilder {
self
}

/// Enable auto zstd decompression by checking the `Content-Encoding` response header.
///
/// If auto zstd decompression is turned on:
///
/// - When sending a request and if the request's headers do not already contain
/// an `Accept-Encoding` **and** `Range` values, the `Accept-Encoding` header is set to `zstd`.
/// The request body is **not** automatically compressed.
/// - When receiving a response, if its headers contain a `Content-Encoding` value of
/// `zstd`, both `Content-Encoding` and `Content-Length` are removed from the
/// headers' set. The response body is automatically decompressed.
///
/// If the `zstd` feature is turned on, the default option is enabled.
///
/// # Optional
///
/// This requires the optional `zstd` feature to be enabled
#[cfg(feature = "zstd")]
#[cfg_attr(docsrs, doc(cfg(feature = "zstd")))]
pub fn zstd(mut self, enable: bool) -> ClientBuilder {
self.config.accepts.zstd = enable;
self
}

/// Enable auto deflate decompression by checking the `Content-Encoding` response header.
///
/// If auto deflate decompression is turned on:
Expand Down Expand Up @@ -844,6 +867,23 @@ impl ClientBuilder {
}
}

/// Disable auto response body zstd decompression.
///
/// This method exists even if the optional `zstd` feature is not enabled.
/// This can be used to ensure a `Client` doesn't use zstd decompression
/// even if another dependency were to enable the optional `zstd` feature.
pub fn no_zstd(self) -> ClientBuilder {
#[cfg(feature = "zstd")]
{
self.zstd(false)
}

#[cfg(not(feature = "zstd"))]
{
self
}
}

/// Disable auto response body deflate decompression.
///
/// This method exists even if the optional `deflate` feature is not enabled.
Expand Down
142 changes: 126 additions & 16 deletions src/async_impl/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ use async_compression::tokio::bufread::GzipDecoder;
#[cfg(feature = "brotli")]
use async_compression::tokio::bufread::BrotliDecoder;

#[cfg(feature = "zstd")]
use async_compression::tokio::bufread::ZstdDecoder;

#[cfg(feature = "deflate")]
use async_compression::tokio::bufread::ZlibDecoder;

Expand All @@ -18,9 +21,19 @@ use futures_util::stream::Peekable;
use http::HeaderMap;
use hyper::body::HttpBody;

#[cfg(any(feature = "gzip", feature = "brotli", feature = "deflate"))]
#[cfg(any(
feature = "gzip",
feature = "brotli",
feature = "zstd",
feature = "deflate"
))]
use tokio_util::codec::{BytesCodec, FramedRead};
#[cfg(any(feature = "gzip", feature = "brotli", feature = "deflate"))]
#[cfg(any(
feature = "gzip",
feature = "brotli",
feature = "zstd",
feature = "deflate"
))]
use tokio_util::io::StreamReader;

use super::super::Body;
Expand All @@ -32,6 +45,8 @@ pub(super) struct Accepts {
pub(super) gzip: bool,
#[cfg(feature = "brotli")]
pub(super) brotli: bool,
#[cfg(feature = "zstd")]
pub(super) zstd: bool,
#[cfg(feature = "deflate")]
pub(super) deflate: bool,
}
Expand All @@ -45,7 +60,12 @@ pub(crate) struct Decoder {

type PeekableIoStream = Peekable<IoStream>;

#[cfg(any(feature = "gzip", feature = "brotli", feature = "deflate"))]
#[cfg(any(
feature = "gzip",
feature = "zstd",
feature = "brotli",
feature = "deflate"
))]
type PeekableIoStreamReader = StreamReader<PeekableIoStream, Bytes>;

enum Inner {
Expand All @@ -60,12 +80,21 @@ enum Inner {
#[cfg(feature = "brotli")]
Brotli(Pin<Box<FramedRead<BrotliDecoder<PeekableIoStreamReader>, BytesCodec>>>),

/// A `Zstd` decoder will uncompress the zstd compressed response content before returning it.
#[cfg(feature = "zstd")]
Zstd(Pin<Box<FramedRead<ZstdDecoder<PeekableIoStreamReader>, BytesCodec>>>),

/// A `Deflate` decoder will uncompress the deflated response content before returning it.
#[cfg(feature = "deflate")]
Deflate(Pin<Box<FramedRead<ZlibDecoder<PeekableIoStreamReader>, BytesCodec>>>),

/// A decoder that doesn't have a value yet.
#[cfg(any(feature = "brotli", feature = "gzip", feature = "deflate"))]
#[cfg(any(
feature = "brotli",
feature = "zstd",
feature = "gzip",
feature = "deflate"
))]
Pending(Pin<Box<Pending>>),
}

Expand All @@ -79,6 +108,8 @@ enum DecoderType {
Gzip,
#[cfg(feature = "brotli")]
Brotli,
#[cfg(feature = "zstd")]
Zstd,
#[cfg(feature = "deflate")]
Deflate,
}
Expand Down Expand Up @@ -136,6 +167,21 @@ impl Decoder {
}
}

/// A zstd decoder.
///
/// This decoder will buffer and decompress chunks that are zstd compressed.
#[cfg(feature = "zstd")]
fn zstd(body: Body) -> Decoder {
use futures_util::StreamExt;

Decoder {
inner: Inner::Pending(Box::pin(Pending(
IoStream(body.into_stream()).peekable(),
DecoderType::Zstd,
))),
}
}

/// A deflate decoder.
///
/// This decoder will buffer and decompress chunks that are deflated.
Expand All @@ -151,7 +197,12 @@ impl Decoder {
}
}

#[cfg(any(feature = "brotli", feature = "gzip", feature = "deflate"))]
#[cfg(any(
feature = "brotli",
feature = "zstd",
feature = "gzip",
feature = "deflate"
))]
fn detect_encoding(headers: &mut HeaderMap, encoding_str: &str) -> bool {
use http::header::{CONTENT_ENCODING, CONTENT_LENGTH, TRANSFER_ENCODING};
use log::warn;
Expand Down Expand Up @@ -202,6 +253,13 @@ impl Decoder {
}
}

#[cfg(feature = "zstd")]
{
if _accepts.zstd && Decoder::detect_encoding(_headers, "zstd") {
return Decoder::zstd(body);
}
}

#[cfg(feature = "deflate")]
{
if _accepts.deflate && Decoder::detect_encoding(_headers, "deflate") {
Expand All @@ -219,7 +277,12 @@ impl Stream for Decoder {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
// Do a read or poll for a pending decoder value.
match self.inner {
#[cfg(any(feature = "brotli", feature = "gzip", feature = "deflate"))]
#[cfg(any(
feature = "brotli",
feature = "zstd",
feature = "gzip",
feature = "deflate"
))]
Inner::Pending(ref mut future) => match Pin::new(future).poll(cx) {
Poll::Ready(Ok(inner)) => {
self.inner = inner;
Expand All @@ -245,6 +308,14 @@ impl Stream for Decoder {
None => Poll::Ready(None),
}
}
#[cfg(feature = "zstd")]
Inner::Zstd(ref mut decoder) => {
return match futures_core::ready!(Pin::new(decoder).poll_next(cx)) {
Some(Ok(bytes)) => Poll::Ready(Some(Ok(bytes.freeze()))),
Some(Err(err)) => Poll::Ready(Some(Err(crate::error::decode_io(err)))),
None => Poll::Ready(None),
};
}
#[cfg(feature = "deflate")]
Inner::Deflate(ref mut decoder) => {
match futures_core::ready!(Pin::new(decoder).poll_next(cx)) {
Expand Down Expand Up @@ -279,7 +350,12 @@ impl HttpBody for Decoder {
match self.inner {
Inner::PlainText(ref body) => HttpBody::size_hint(body),
// the rest are "unknown", so default
#[cfg(any(feature = "brotli", feature = "gzip", feature = "deflate"))]
#[cfg(any(
feature = "brotli",
feature = "zstd",
feature = "gzip",
feature = "deflate"
))]
_ => http_body::SizeHint::default(),
}
}
Expand Down Expand Up @@ -317,6 +393,11 @@ impl Future for Pending {
BrotliDecoder::new(StreamReader::new(_body)),
BytesCodec::new(),
))))),
#[cfg(feature = "zstd")]
DecoderType::Zstd => Poll::Ready(Ok(Inner::Zstd(Box::pin(FramedRead::new(
ZstdDecoder::new(StreamReader::new(_body)),
BytesCodec::new(),
))))),
#[cfg(feature = "gzip")]
DecoderType::Gzip => Poll::Ready(Ok(Inner::Gzip(Box::pin(FramedRead::new(
GzipDecoder::new(StreamReader::new(_body)),
Expand Down Expand Up @@ -352,21 +433,36 @@ impl Accepts {
gzip: false,
#[cfg(feature = "brotli")]
brotli: false,
#[cfg(feature = "zstd")]
zstd: false,
#[cfg(feature = "deflate")]
deflate: false,
}
}

pub(super) fn as_str(&self) -> Option<&'static str> {
match (self.is_gzip(), self.is_brotli(), self.is_deflate()) {
(true, true, true) => Some("gzip, br, deflate"),
(true, true, false) => Some("gzip, br"),
(true, false, true) => Some("gzip, deflate"),
(false, true, true) => Some("br, deflate"),
(true, false, false) => Some("gzip"),
(false, true, false) => Some("br"),
(false, false, true) => Some("deflate"),
(false, false, false) => None,
match (
self.is_gzip(),
self.is_brotli(),
self.is_zstd(),
self.is_deflate(),
) {
(true, true, true, true) => Some("gzip, br, zstd, deflate"),
(true, true, false, true) => Some("gzip, br, deflate"),
(true, true, true, false) => Some("gzip, br, zstd"),
(true, true, false, false) => Some("gzip, br"),
(true, false, true, true) => Some("gzip, zstd, deflate"),
(true, false, false, true) => Some("gzip, zstd, deflate"),
(false, true, true, true) => Some("br, zstd, deflate"),
(false, true, false, true) => Some("br, zstd, deflate"),
(true, false, true, false) => Some("gzip, zstd"),
(true, false, false, false) => Some("gzip"),
(false, true, true, false) => Some("br, zstd"),
(false, true, false, false) => Some("br"),
(false, false, true, true) => Some("zstd, deflate"),
(false, false, true, false) => Some("zstd"),
(false, false, false, true) => Some("deflate"),
(false, false, false, false) => None,
}
}

Expand Down Expand Up @@ -394,6 +490,18 @@ impl Accepts {
}
}

fn is_zstd(&self) -> bool {
#[cfg(feature = "zstd")]
{
self.zstd
}

#[cfg(not(feature = "zstd"))]
{
false
}
}

fn is_deflate(&self) -> bool {
#[cfg(feature = "deflate")]
{
Expand All @@ -414,6 +522,8 @@ impl Default for Accepts {
gzip: true,
#[cfg(feature = "brotli")]
brotli: true,
#[cfg(feature = "zstd")]
zstd: true,
#[cfg(feature = "deflate")]
deflate: true,
}
Expand Down
Loading

0 comments on commit 7ba30e0

Please sign in to comment.