Skip to content

Commit

Permalink
Remove Sync bound from Body::wrap_stream (#2088)
Browse files Browse the repository at this point in the history
Co-authored-by: Sean McArthur <[email protected]>
  • Loading branch information
davidpdrsn and seanmonstar authored Jan 12, 2024
1 parent eeaece9 commit 837a58e
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 5 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ serde_urlencoded = "0.7.1"
tower-service = "0.3"
futures-core = { version = "0.3.0", default-features = false }
futures-util = { version = "0.3.0", default-features = false }
sync_wrapper = "0.1.2"

# Optional deps...

Expand Down
11 changes: 6 additions & 5 deletions src/async_impl/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use bytes::Bytes;
use futures_core::Stream;
use http_body::Body as HttpBody;
use pin_project_lite::pin_project;
use sync_wrapper::SyncWrapper;
#[cfg(feature = "stream")]
use tokio::fs::File;
use tokio::time::Sleep;
Expand Down Expand Up @@ -38,7 +39,7 @@ enum Inner {
pin_project! {
struct WrapStream<S> {
#[pin]
inner: S,
inner: SyncWrapper<S>,
}
}

Expand Down Expand Up @@ -82,7 +83,7 @@ impl Body {
#[cfg_attr(docsrs, doc(cfg(feature = "stream")))]
pub fn wrap_stream<S>(stream: S) -> Body
where
S: futures_core::stream::TryStream + Send + Sync + 'static,
S: futures_core::stream::TryStream + Send + 'static,
S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
Bytes: From<S::Ok>,
{
Expand All @@ -91,14 +92,14 @@ impl Body {

pub(crate) fn stream<S>(stream: S) -> Body
where
S: futures_core::stream::TryStream + Send + Sync + 'static,
S: futures_core::stream::TryStream + Send + 'static,
S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
Bytes: From<S::Ok>,
{
use futures_util::TryStreamExt;

let body = Box::pin(WrapStream {
inner: stream.map_ok(Bytes::from).map_err(Into::into),
inner: SyncWrapper::new(stream.map_ok(Bytes::from).map_err(Into::into)),
});
Body {
inner: Inner::Streaming {
Expand Down Expand Up @@ -312,7 +313,7 @@ where
self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
let item = futures_core::ready!(self.project().inner.poll_next(cx)?);
let item = futures_core::ready!(self.project().inner.get_pin_mut().poll_next(cx)?);

Poll::Ready(item.map(|val| Ok(val.into())))
}
Expand Down
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,9 @@ fn _assert_impls() {

assert_send::<Error>();
assert_sync::<Error>();

assert_send::<Body>();
assert_sync::<Body>();
}

if_hyper! {
Expand Down

0 comments on commit 837a58e

Please sign in to comment.