From 33aa9d76484826a5069ff83adbc33f96fa5ea239 Mon Sep 17 00:00:00 2001 From: Brooks Townsend Date: Thu, 15 Aug 2024 13:03:25 -0400 Subject: [PATCH] wip: feat(wasm): allow streaming incoming body Signed-off-by: Brooks Townsend --- examples/wasm_component/src/lib.rs | 7 +++--- src/wasm/component/client/future.rs | 14 +++++++----- src/wasm/component/response.rs | 35 +++++++++++++---------------- 3 files changed, 28 insertions(+), 28 deletions(-) diff --git a/examples/wasm_component/src/lib.rs b/examples/wasm_component/src/lib.rs index 8c013d88e..bd0e92647 100644 --- a/examples/wasm_component/src/lib.rs +++ b/examples/wasm_component/src/lib.rs @@ -15,13 +15,12 @@ impl wasi::exports::http::incoming_handler::Guest for ReqwestComponent { .expect("should be able to get response body"); ResponseOutparam::set(response_out, Ok(response)); - let response = + let mut response = futures::executor::block_on(reqwest::Client::new().get("https://hyper.rs").send()) .expect("should get response bytes"); - let incoming_body = response.bytes_stream().expect("should get incoming body"); - let stream = incoming_body.stream().expect("should get bytes stream"); + let body_stream = response.bytes_stream().expect("should get incoming body"); stream_input_to_output( - stream, + body_stream, response_body .write() .expect("should be able to write to response body"), diff --git a/src/wasm/component/client/future.rs b/src/wasm/component/client/future.rs index 5dfb0d3e6..8fb67ee02 100644 --- a/src/wasm/component/client/future.rs +++ b/src/wasm/component/client/future.rs @@ -5,11 +5,11 @@ use std::{ use futures_core::Future; use wasi::{ - self, - http::{ - outgoing_handler::{FutureIncomingResponse, OutgoingRequest}, - types::{OutgoingBody, OutputStream}, - }, + self, + http::{ + outgoing_handler::{FutureIncomingResponse, OutgoingRequest}, + types::{OutgoingBody, OutputStream}, + }, }; use crate::{Body, Request, Response}; @@ -69,6 +69,10 @@ impl Future for ResponseFuture { }, RequestState::Response(future) => { if !future.subscribe().ready() { + // NOTE(brooksmtownsend): We shouldn't be waking here since we don't know that + // the future is ready to be polled again. Sleeping for a nanosecond appears to + // allow the future to be polled again without causing a busy loop. + std::thread::sleep(std::time::Duration::from_nanos(1)); cx.waker().wake_by_ref(); return Poll::Pending; } diff --git a/src/wasm/component/response.rs b/src/wasm/component/response.rs index 07534a470..0516d237c 100644 --- a/src/wasm/component/response.rs +++ b/src/wasm/component/response.rs @@ -13,6 +13,8 @@ pub struct Response { // Boxed to save space (11 words to 1 word), and it's not accessed // frequently internally. url: Box, + // The incoming body must be persisted if streaming to keep the stream open + incoming_body: Option, } impl Response { @@ -23,6 +25,7 @@ impl Response { Response { http: res, url: Box::new(url), + incoming_body: None, } } @@ -121,27 +124,21 @@ impl Response { Ok(body.into()) } - /// Convert the response into a `Stream` of `Bytes` from the body. + /// Convert the response into a [`wasi::http::types::IncomingBody`] resource which can + /// then be used to stream the body. #[cfg(feature = "stream")] - pub fn bytes_stream(self) -> impl futures_core::Stream> { - let web_response = self.http.into_body(); - let abort = self._abort; - let body = web_response + pub fn bytes_stream(&mut self) -> crate::Result { + let body = self + .http .body() - .expect("could not create wasm byte stream"); - let body = wasm_streams::ReadableStream::from_raw(body.unchecked_into()); - Box::pin(body.into_stream().map(move |buf_js| { - // Keep the abort guard alive as long as this stream is. - let _abort = &abort; - let buffer = Uint8Array::new( - &buf_js - .map_err(crate::error::wasm) - .map_err(crate::error::decode)?, - ); - let mut bytes = vec![0; buffer.length() as usize]; - buffer.copy_to(&mut bytes); - Ok(bytes.into()) - })) + .consume() + .map_err(|_| crate::error::decode("failed to consume response body"))?; + + let stream = body + .stream() + .map_err(|_| crate::error::decode("failed to stream response body")); + self.incoming_body = Some(body); + stream } // util methods