Skip to content
This repository has been archived by the owner on Sep 25, 2019. It is now read-only.

Commit

Permalink
allow more than one body type on the response, static or streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
jeromegn committed Oct 17, 2018
1 parent c0de42c commit b76880f
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 57 deletions.
7 changes: 5 additions & 2 deletions src/bin/server/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,11 @@ impl Service for FlyServer {
parts.headers = res.headers;
parts.status = res.status;

if let Some(bytes) = res.bytes {
body = Body::wrap_stream(bytes.map_err(|_| RecvError {}));
if let Some(js_body) = res.body {
body = match js_body {
JsHttpResponseBody::Stream(s) => Body::wrap_stream(s.map_err(|_| RecvError {})),
JsHttpResponseBody::Static(b) => Body::from(b),
};
}

future::ok(Response::from_parts(parts, body))
Expand Down
121 changes: 66 additions & 55 deletions src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,17 @@ extern crate trust_dns as dns;
extern crate trust_dns_proto as dns_proto;
use self::dns::client::ClientHandle; // necessary for trait to be in scope

#[derive(Debug)]
pub enum JsHttpResponseBody {
Stream(mpsc::UnboundedReceiver<Vec<u8>>),
Static(Vec<u8>),
}

#[derive(Debug)]
pub struct JsHttpResponse {
pub headers: HeaderMap,
pub status: StatusCode,
pub bytes: Option<mpsc::UnboundedReceiver<Vec<u8>>>,
pub body: Option<JsHttpResponseBody>,
}

#[derive(Debug)]
Expand Down Expand Up @@ -150,7 +156,7 @@ pub struct Runtime {
timers: Mutex<HashMap<u32, oneshot::Sender<()>>>,
pub responses: Mutex<HashMap<u32, oneshot::Sender<JsHttpResponse>>>,
pub dns_responses: Mutex<HashMap<u32, oneshot::Sender<JsDnsResponse>>>,
pub bytes: Mutex<HashMap<u32, mpsc::UnboundedSender<Vec<u8>>>>,
pub streams: Mutex<HashMap<u32, mpsc::UnboundedSender<Vec<u8>>>>,
pub http_client: Client<HttpsConnector<HttpConnector>, Body>,
pub name: String,
}
Expand Down Expand Up @@ -187,7 +193,7 @@ impl Runtime {
timers: Mutex::new(HashMap::new()),
responses: Mutex::new(HashMap::new()),
dns_responses: Mutex::new(HashMap::new()),
bytes: Mutex::new(HashMap::new()),
streams: Mutex::new(HashMap::new()),
http_client: Client::builder().build(HttpsConnector::new(4).unwrap()),
name: name.unwrap_or("v8".to_string()),
});
Expand Down Expand Up @@ -721,7 +727,7 @@ fn op_cache_set(rt: &Runtime, base: &msg::Base, _raw: fly_buf) -> Box<Op> {

let (sender, recver) = mpsc::unbounded::<Vec<u8>>();
{
rt.bytes.lock().unwrap().insert(stream_id, sender);
rt.streams.lock().unwrap().insert(stream_id, sender);
}

{
Expand Down Expand Up @@ -947,14 +953,13 @@ fn op_file_request(rt: &Runtime, cmd_id: u32, url: &str) -> Box<Op> {
let file = fileerr.unwrap(); // should be safe.

let (tx, rx) = mpsc::unbounded::<Vec<u8>>();
let bytes_rx = Some(rx);
let mut bytes = from_c(rtptr2.0).bytes.lock().unwrap();
bytes.insert(req_id, tx);
let mut stream = from_c(rtptr2.0).streams.lock().unwrap();
stream.insert(req_id, tx);

if let Err(_) = p.send(Ok(JsHttpResponse {
headers: HeaderMap::new(),
status: StatusCode::OK,
bytes: bytes_rx,
body: Some(JsHttpResponseBody::Stream(rx)),
})) {
error!("error sending http response");
return Ok(());
Expand Down Expand Up @@ -1052,13 +1057,13 @@ fn op_file_request(rt: &Runtime, cmd_id: u32, url: &str) -> Box<Op> {
}
let read_dir = read_dir_err.unwrap();
let (tx, rx) = mpsc::unbounded::<Vec<u8>>();
let mut bytes = from_c(rtptr2.0).bytes.lock().unwrap();
bytes.insert(req_id, tx);
let mut streams = from_c(rtptr2.0).streams.lock().unwrap();
streams.insert(req_id, tx);

if let Err(_) = p.send(Ok(JsHttpResponse {
headers: HeaderMap::new(),
status: StatusCode::OK,
bytes: Some(rx),
body: Some(JsHttpResponseBody::Stream(rx)),
})) {
error!("error sending http response");
return Ok(());
Expand Down Expand Up @@ -1182,7 +1187,7 @@ fn op_file_request(rt: &Runtime, cmd_id: u32, url: &str) -> Box<Op> {
id: req_id,
headers: Some(res_headers),
status: res.status.as_u16(),
has_body: res.bytes.is_some(),
has_body: res.body.is_some(),
..Default::default()
},
);
Expand Down Expand Up @@ -1519,18 +1524,18 @@ fn op_http_request(rt: &Runtime, base: &msg::Base, _raw: fly_buf) -> Box<Op> {

let (parts, mut body) = res.into_parts();

let mut bytes_rx: Option<mpsc::UnboundedReceiver<Vec<u8>>> = None;
let mut stream_rx: Option<JsHttpResponseBody> = None;
if !body.is_end_stream() {
let (tx, rx) = mpsc::unbounded::<Vec<u8>>();
bytes_rx = Some(rx);
let mut bytes = from_c(rtptr2.0).bytes.lock().unwrap();
bytes.insert(req_id, tx);
stream_rx = Some(JsHttpResponseBody::Stream(rx));
let mut streams = from_c(rtptr2.0).streams.lock().unwrap();
streams.insert(req_id, tx);
}

if let Err(_) = p.send(Ok(JsHttpResponse {
headers: parts.headers,
status: parts.status,
bytes: bytes_rx,
body: stream_rx,
})) {
error!("error sending http response");
return Ok(());
Expand Down Expand Up @@ -1618,7 +1623,7 @@ fn op_http_request(rt: &Runtime, base: &msg::Base, _raw: fly_buf) -> Box<Op> {
id: req_id,
headers: Some(res_headers),
status: res.status.as_u16(),
has_body: res.bytes.is_some(),
has_body: res.body.is_some(),
..Default::default()
},
);
Expand Down Expand Up @@ -1674,16 +1679,22 @@ fn op_http_response(rt: &Runtime, base: &msg::Base, raw: fly_buf) -> Box<Op> {
}
}

let mut chunk_recver: Option<mpsc::UnboundedReceiver<Vec<u8>>> = None;
let mut body: Option<JsHttpResponseBody> = None;
let has_body = msg.has_body();
if has_body {
debug!("http response will have a body");
let (sender, recver) = mpsc::unbounded::<Vec<u8>>();
{
let mut bytes = rt.bytes.lock().unwrap();
bytes.insert(req_id, sender);
if raw.data_len == 0 {
debug!("http response will have a streaming body");
let (sender, recver) = mpsc::unbounded::<Vec<u8>>();
{
let mut streams = rt.streams.lock().unwrap();
streams.insert(req_id, sender);
}
body = Some(JsHttpResponseBody::Stream(recver));
} else {
body = Some(JsHttpResponseBody::Static(
unsafe { slice::from_raw_parts(raw.data_ptr, raw.data_len) }.to_vec(),
));
}
chunk_recver = Some(recver);
}

let mut responses = rt.responses.lock().unwrap();
Expand All @@ -1692,40 +1703,40 @@ fn op_http_response(rt: &Runtime, base: &msg::Base, raw: fly_buf) -> Box<Op> {
if let Err(_) = sender.send(JsHttpResponse {
headers: headers,
status: status,
bytes: chunk_recver,
body: body,
}) {
return odd_future("error sending http response".to_string().into());
}
}
None => return odd_future("no response receiver!".to_string().into()),
};

if raw.data_len > 0 {
let rtptr = rt.ptr;
let spawnres = rt
.rt
.lock()
.unwrap()
.spawn(future::lazy(move || -> Result<(), ()> {
let mut bytes = rtptr.to_runtime().bytes.lock().unwrap();
match bytes.remove(&req_id) {
Some(sender) => {
debug!("sending raw bytes! {}", raw.data_len);
let v = unsafe { slice::from_raw_parts(raw.data_ptr, raw.data_len) }.to_vec();

if let Err(err) = sender.unbounded_send(v) {
error!("could not send body: {}", err);
}
}
None => error!("no bytes sender for req id: {}", req_id),
};

Ok(())
}));
if let Err(_err) = spawnres {
return odd_future("error using static body".to_string().into());
}
}
// if raw.data_len > 0 {
// let rtptr = rt.ptr;
// let spawnres = rt
// .rt
// .lock()
// .unwrap()
// .spawn(future::lazy(move || -> Result<(), ()> {
// let mut streams = rtptr.to_runtime().streams.lock().unwrap();
// match streams.remove(&req_id) {
// Some(sender) => {
// debug!("sending raw bytes! {}", raw.data_len);
// let v = ;

// if let Err(err) = sender.unbounded_send(v) {
// error!("could not send body: {}", err);
// }
// }
// None => error!("no bytes sender for req id: {}", req_id),
// };

// Ok(())
// }));
// if let Err(_err) = spawnres {
// return odd_future("error using static body".to_string().into());
// }
// }

ok_future(None)
}
Expand Down Expand Up @@ -1909,9 +1920,9 @@ fn op_stream_chunk(rt: &Runtime, base: &msg::Base, raw: fly_buf) -> Box<Op> {
let msg = base.msg_as_stream_chunk().unwrap();
let stream_id = msg.id();

let mut bytes = rt.bytes.lock().unwrap();
let mut streams = rt.streams.lock().unwrap();
if raw.data_len > 0 {
match bytes.get_mut(&stream_id) {
match streams.get_mut(&stream_id) {
Some(sender) => {
let bytes = unsafe { slice::from_raw_parts(raw.data_ptr, raw.data_len) }.to_vec();
match sender.unbounded_send(bytes.to_vec()) {
Expand All @@ -1923,7 +1934,7 @@ fn op_stream_chunk(rt: &Runtime, base: &msg::Base, raw: fly_buf) -> Box<Op> {
};
}
if msg.done() {
bytes.remove(&stream_id);
streams.remove(&stream_id);
}

ok_future(None)
Expand Down

0 comments on commit b76880f

Please sign in to comment.