Skip to content

Commit

Permalink
feat(wasm): allow streaming incoming body
Browse files Browse the repository at this point in the history
Signed-off-by: Brooks Townsend <[email protected]>

io::copy instead of streams

Signed-off-by: Brooks Townsend <[email protected]>
  • Loading branch information
brooksmtownsend committed Aug 22, 2024
1 parent b0c6e1b commit ff8a364
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 66 deletions.
2 changes: 1 addition & 1 deletion examples/wasm_component/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ crate-type = ["cdylib"]
[dependencies]
futures = "0.3.30"
reqwest = { version = "0.12.4", path = "../../", features = ["stream"] }
wasi = "0.13.2"
wasi = "=0.13.1" # For compatibility, pin to [email protected] bindings

[profile.release]
# Optimize for small code size
Expand Down
32 changes: 6 additions & 26 deletions examples/wasm_component/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use wasi::{
http::types::{Fields, IncomingRequest, OutgoingBody, OutgoingResponse, ResponseOutparam},
io::streams::{InputStream, OutputStream, StreamError},
use wasi::http::types::{
Fields, IncomingRequest, OutgoingBody, OutgoingResponse, ResponseOutparam,
};

#[allow(unused)]
Expand All @@ -15,14 +14,12 @@ impl wasi::exports::http::incoming_handler::Guest for ReqwestComponent {
.expect("should be able to get response body");
ResponseOutparam::set(response_out, Ok(response));

let response =
let mut response =
futures::executor::block_on(reqwest::Client::new().get("https://hyper.rs").send())
.expect("should get response bytes");
let incoming_body = response.bytes_stream().expect("should get incoming body");
let stream = incoming_body.stream().expect("should get bytes stream");
stream_input_to_output(
stream,
response_body
std::io::copy(
&mut response.bytes_stream().expect("should get incoming body"),
&mut response_body
.write()
.expect("should be able to write to response body"),
)
Expand All @@ -32,21 +29,4 @@ impl wasi::exports::http::incoming_handler::Guest for ReqwestComponent {
}
}

pub fn stream_input_to_output(data: InputStream, out: OutputStream) -> Result<(), StreamError> {
loop {
match out.blocking_splice(&data, u64::MAX) {
Ok(bytes_spliced) if bytes_spliced == 0 => return Ok(()),
Ok(_) => {}
Err(e) => match e {
StreamError::Closed => {
return Ok(());
}
StreamError::LastOperationFailed(e) => {
return Err(StreamError::LastOperationFailed(e));
}
},
}
}
}

wasi::http::proxy::export!(ReqwestComponent);
14 changes: 9 additions & 5 deletions src/wasm/component/client/future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ use std::{

use futures_core::Future;
use wasi::{
self,
http::{
outgoing_handler::{FutureIncomingResponse, OutgoingRequest},
types::{OutgoingBody, OutputStream},
},
self,
http::{
outgoing_handler::{FutureIncomingResponse, OutgoingRequest},
types::{OutgoingBody, OutputStream},
},
};

use crate::{Body, Request, Response};
Expand Down Expand Up @@ -69,6 +69,10 @@ impl Future for ResponseFuture {
},
RequestState::Response(future) => {
if !future.subscribe().ready() {
// NOTE(brooksmtownsend): We shouldn't be waking here since we don't know that
// the future is ready to be polled again. Sleeping for a nanosecond appears to
// allow the future to be polled again without causing a busy loop.
std::thread::sleep(std::time::Duration::from_nanos(1));
cx.waker().wake_by_ref();
return Poll::Pending;
}
Expand Down
52 changes: 18 additions & 34 deletions src/wasm/component/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ pub struct Response {
// Boxed to save space (11 words to 1 word), and it's not accessed
// frequently internally.
url: Box<Url>,
// The incoming body must be persisted if streaming to keep the stream open
incoming_body: Option<wasi::http::types::IncomingBody>,
}

impl Response {
Expand All @@ -23,6 +25,7 @@ impl Response {
Response {
http: res,
url: Box::new(url),
incoming_body: None,
}
}

Expand Down Expand Up @@ -83,21 +86,9 @@ impl Response {

/// Get the response text.
pub async fn text(self) -> crate::Result<String> {
// let p = self
// .http
// .body()
// .text()
// .map_err(crate::error::wasm)
// .map_err(crate::error::decode)?;
// let js_val = super::promise::<wasm_bindgen::JsValue>(p)
// .await
// .map_err(crate::error::decode)?;
// if let Some(s) = js_val.as_string() {
// Ok(s)
// } else {
// Err(crate::error::decode("response.text isn't string"))
// }
Ok("str_resp".to_string())
self.bytes()
.await
.map(|s| String::from_utf8(s.to_vec()).map_err(crate::error::decode))?
}

/// Get the response as bytes
Expand All @@ -121,27 +112,20 @@ impl Response {
Ok(body.into())
}

/// Convert the response into a `Stream` of `Bytes` from the body.
/// Convert the response into a [`wasi::http::types::IncomingBody`] resource which can
/// then be used to stream the body.
#[cfg(feature = "stream")]
pub fn bytes_stream(self) -> impl futures_core::Stream<Item = crate::Result<Bytes>> {
let web_response = self.http.into_body();
let abort = self._abort;
let body = web_response
pub fn bytes_stream(&mut self) -> crate::Result<wasi::io::streams::InputStream> {
let body = self
.http
.body()
.expect("could not create wasm byte stream");
let body = wasm_streams::ReadableStream::from_raw(body.unchecked_into());
Box::pin(body.into_stream().map(move |buf_js| {
// Keep the abort guard alive as long as this stream is.
let _abort = &abort;
let buffer = Uint8Array::new(
&buf_js
.map_err(crate::error::wasm)
.map_err(crate::error::decode)?,
);
let mut bytes = vec![0; buffer.length() as usize];
buffer.copy_to(&mut bytes);
Ok(bytes.into())
}))
.consume()
.map_err(|_| crate::error::decode("failed to consume response body"))?;
let stream = body
.stream()
.map_err(|_| crate::error::decode("failed to stream response body"));
self.incoming_body = Some(body);
stream
}

// util methods
Expand Down

0 comments on commit ff8a364

Please sign in to comment.