Skip to content
This repository has been archived by the owner on Sep 25, 2019. It is now read-only.

Commit

Permalink
rename functions, change message format to be clearer, allow sending …
Browse files Browse the repository at this point in the history
…Uint8Array directly as a body
  • Loading branch information
jeromegn committed Oct 16, 2018
1 parent 0fd1817 commit 7d19376
Show file tree
Hide file tree
Showing 8 changed files with 65 additions and 59 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ The Fly runtime was originally derived from [deno](denoland/deno) and shares som
- [ ] Actually use the config hostnames and correct app
- [x] Spawn multiple runtime instances for the same app (n cpus? configurable?)
- [ ] Add `Server` header for Fly and current version (maybe?)
- [ ] Fetch request bodies
- [ ] Observability
- [ ] Exception reporting (via Sentry probably)
- [ ] Metrics (prometheus)
Expand Down
8 changes: 7 additions & 1 deletion hello-world.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
const helloWorldStr = "Hello World";
const helloWorld = new TextEncoder().encode(helloWorldStr);

addEventListener("fetch", function (event) {
const req = event.request;
// console.log("req url:", event.request.url);
let url = new URL(req.url)
if (url.pathname.endsWith("echo"))
event.respondWith(new Response(req.body, { headers: { foo: "bar" } }))

else if (url.pathname.endsWith("null"))
event.respondWith(new Response(null, { headers: {} }))

else if (url.pathname.endsWith("hello-world"))
event.respondWith(new Response("Hello World"))
event.respondWith(new Response(helloWorld))

else if (url.pathname == "/kitchensink") {
const coll = flyData.collection("testing")
coll.put("id", { foo: "bar" }).then(b => {
Expand Down
6 changes: 3 additions & 3 deletions msg.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -122,14 +122,14 @@ table HttpRequest {
method: HttpMethod;
url: string;
headers: [HttpHeader];
body: bool;
has_body: bool;
}

table HttpResponse {
id: uint;
headers: [HttpHeader];
status: ushort;
body: bool;
has_body: bool;
}

table HttpRequestStart {
Expand All @@ -140,7 +140,7 @@ table FetchHttpResponse {
id: uint;
headers: [HttpHeader];
status: ushort;
body: bool;
has_body: bool;
}

table CacheGet {
Expand Down
2 changes: 1 addition & 1 deletion src/bin/server/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ impl Service for FlyServer {
method: req_method,
url: Some(req_url),
headers: Some(req_headers),
body: !body.is_end_stream(),
has_body: !body.is_end_stream(),
..Default::default()
},
);
Expand Down
84 changes: 42 additions & 42 deletions src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,9 +338,9 @@ pub type Buf = Option<Box<[u8]>>;

// JS promises in Deno map onto a specific Future
// which yields either a DenoError or a byte array.
type Op = Future<Item = Buf, Error = FlyError> + Send;
pub type Op = Future<Item = Buf, Error = FlyError> + Send;

type Handler = fn(rt: &Runtime, base: &msg::Base, raw_buf: fly_buf) -> Box<Op>;
pub type Handler = fn(rt: &Runtime, base: &msg::Base, raw_buf: fly_buf) -> Box<Op>;

use std::slice;

Expand All @@ -352,22 +352,22 @@ pub extern "C" fn msg_from_js(raw: *const js_runtime, buf: fly_buf, raw_buf: fly
let cmd_id = base.cmd_id();
// println!("msg id {}", cmd_id);
let handler: Handler = match msg_type {
msg::Any::TimerStart => handle_timer_start,
msg::Any::TimerClear => handle_timer_clear,
msg::Any::HttpRequest => handle_http_request,
msg::Any::HttpResponse => handle_http_response,
msg::Any::StreamChunk => handle_stream_chunk,
msg::Any::CacheGet => handle_cache_get,
msg::Any::CacheSet => handle_cache_set,
msg::Any::CryptoDigest => handle_crypto_digest,
msg::Any::CryptoRandomValues => handle_crypto_random_values,
msg::Any::SourceMap => handle_source_map,
msg::Any::DataPut => handle_data_put,
msg::Any::DataGet => handle_data_get,
msg::Any::DataDel => handle_data_del,
msg::Any::DataDropCollection => handle_data_drop_coll,
msg::Any::DnsQuery => handle_dns_query,
msg::Any::DnsResponse => handle_dns_response,
msg::Any::TimerStart => op_timer_start,
msg::Any::TimerClear => op_timer_clear,
msg::Any::HttpRequest => op_http_request,
msg::Any::HttpResponse => op_http_response,
msg::Any::StreamChunk => op_stream_chunk,
msg::Any::CacheGet => op_cache_get,
msg::Any::CacheSet => op_cache_set,
msg::Any::CryptoDigest => op_crypto_digest,
msg::Any::CryptoRandomValues => op_crypto_random_values,
msg::Any::SourceMap => op_source_map,
msg::Any::DataPut => op_data_put,
msg::Any::DataGet => op_data_get,
msg::Any::DataDel => op_data_del,
msg::Any::DataDropCollection => op_data_drop_coll,
msg::Any::DnsQuery => op_dns_query,
msg::Any::DnsResponse => op_dns_response,
_ => unimplemented!(),
};

Expand Down Expand Up @@ -478,8 +478,8 @@ pub extern "C" fn print_from_js(raw: *const js_runtime, lvl: i8, msg: *const lib
log!(lvl, "console/{}: {}", &rt.name, &msg);
}

fn handle_timer_start(rt: &Runtime, base: &msg::Base, _raw: fly_buf) -> Box<Op> {
println!("handle_timer_start");
fn op_timer_start(rt: &Runtime, base: &msg::Base, _raw: fly_buf) -> Box<Op> {
println!("op_timer_start");
let msg = base.msg_as_timer_start().unwrap();
let cmd_id = base.cmd_id();
let timer_id = msg.id();
Expand Down Expand Up @@ -542,14 +542,14 @@ fn remove_timer(ptr: JsRuntime, timer_id: u32) {
rt.timers.lock().unwrap().remove(&timer_id);
}

fn handle_timer_clear(rt: &Runtime, base: &msg::Base, _raw: fly_buf) -> Box<Op> {
fn op_timer_clear(rt: &Runtime, base: &msg::Base, _raw: fly_buf) -> Box<Op> {
let msg = base.msg_as_timer_clear().unwrap();
println!("handle_timer_clear");
println!("op_timer_clear");
remove_timer(rt.ptr, msg.id());
ok_future(None)
}

fn handle_source_map(_rt: &Runtime, base: &msg::Base, _raw: fly_buf) -> Box<Op> {
fn op_source_map(_rt: &Runtime, base: &msg::Base, _raw: fly_buf) -> Box<Op> {
let cmd_id = base.cmd_id();
let msg = base.msg_as_source_map().unwrap();

Expand Down Expand Up @@ -629,7 +629,7 @@ fn handle_source_map(_rt: &Runtime, base: &msg::Base, _raw: fly_buf) -> Box<Op>
)
}

fn handle_crypto_random_values(_rt: &Runtime, base: &msg::Base, _raw: fly_buf) -> Box<Op> {
fn op_crypto_random_values(_rt: &Runtime, base: &msg::Base, _raw: fly_buf) -> Box<Op> {
let cmd_id = base.cmd_id();
let msg = base.msg_as_crypto_random_values().unwrap();

Expand Down Expand Up @@ -661,7 +661,7 @@ fn handle_crypto_random_values(_rt: &Runtime, base: &msg::Base, _raw: fly_buf) -
))
}

fn handle_crypto_digest(_rt: &Runtime, base: &msg::Base, raw: fly_buf) -> Box<Op> {
fn op_crypto_digest(_rt: &Runtime, base: &msg::Base, raw: fly_buf) -> Box<Op> {
let cmd_id = base.cmd_id();
let msg = base.msg_as_crypto_digest().unwrap();

Expand Down Expand Up @@ -709,7 +709,7 @@ use super::NEXT_EVENT_ID;
use std::str;

use std::ops::Deref;
fn handle_cache_set(rt: &Runtime, base: &msg::Base, _raw: fly_buf) -> Box<Op> {
fn op_cache_set(rt: &Runtime, base: &msg::Base, _raw: fly_buf) -> Box<Op> {
println!("CACHE SET");
let cmd_id = base.cmd_id();
let msg = base.msg_as_cache_set().unwrap();
Expand Down Expand Up @@ -767,7 +767,7 @@ fn handle_cache_set(rt: &Runtime, base: &msg::Base, _raw: fly_buf) -> Box<Op> {
))
}

fn handle_cache_get(rt: &Runtime, base: &msg::Base, _raw: fly_buf) -> Box<Op> {
fn op_cache_get(rt: &Runtime, base: &msg::Base, _raw: fly_buf) -> Box<Op> {
let cmd_id = base.cmd_id();
let msg = base.msg_as_cache_get().unwrap();

Expand Down Expand Up @@ -914,7 +914,7 @@ fn handle_cache_get(rt: &Runtime, base: &msg::Base, _raw: fly_buf) -> Box<Op> {
// ))
}

fn handle_file_request(rt: &Runtime, cmd_id: u32, url: &str) -> Box<Op> {
fn op_file_request(rt: &Runtime, cmd_id: u32, url: &str) -> Box<Op> {
let req_id = NEXT_EVENT_ID.fetch_add(1, Ordering::SeqCst) as u32;
let rtptr = rt.ptr;

Expand Down Expand Up @@ -1180,7 +1180,7 @@ fn handle_file_request(rt: &Runtime, cmd_id: u32, url: &str) -> Box<Op> {
id: req_id,
headers: Some(res_headers),
status: res.status.as_u16(),
body: res.bytes.is_some(),
has_body: res.bytes.is_some(),
..Default::default()
},
);
Expand Down Expand Up @@ -1209,7 +1209,7 @@ fn handle_file_request(rt: &Runtime, cmd_id: u32, url: &str) -> Box<Op> {
Box::new(fut2)
}

fn handle_dns_query(_rt: &Runtime, base: &msg::Base, _raw: fly_buf) -> Box<Op> {
fn op_dns_query(_rt: &Runtime, base: &msg::Base, _raw: fly_buf) -> Box<Op> {
println!("handle dns");
let cmd_id = base.cmd_id();
let msg = base.msg_as_dns_query().unwrap();
Expand Down Expand Up @@ -1453,20 +1453,20 @@ fn handle_dns_query(_rt: &Runtime, base: &msg::Base, _raw: fly_buf) -> Box<Op> {
)
}

fn handle_http_request(rt: &Runtime, base: &msg::Base, _raw: fly_buf) -> Box<Op> {
fn op_http_request(rt: &Runtime, base: &msg::Base, _raw: fly_buf) -> Box<Op> {
let cmd_id = base.cmd_id();
let msg = base.msg_as_http_request().unwrap();

let url = msg.url().unwrap();
if url.starts_with("file://") {
return handle_file_request(rt, cmd_id, url);
return op_file_request(rt, cmd_id, url);
}

let req_id = NEXT_EVENT_ID.fetch_add(1, Ordering::SeqCst) as u32;
let rtptr = rt.ptr;

let req_body: Body;
if msg.body() {
if msg.has_body() {
unimplemented!();
} else {
req_body = Body::empty();
Expand Down Expand Up @@ -1616,7 +1616,7 @@ fn handle_http_request(rt: &Runtime, base: &msg::Base, _raw: fly_buf) -> Box<Op>
id: req_id,
headers: Some(res_headers),
status: res.status.as_u16(),
body: res.bytes.is_some(),
has_body: res.bytes.is_some(),
..Default::default()
},
);
Expand Down Expand Up @@ -1650,7 +1650,7 @@ fn handle_http_request(rt: &Runtime, base: &msg::Base, _raw: fly_buf) -> Box<Op>
// ))
}

fn handle_http_response(rt: &Runtime, base: &msg::Base, _raw: fly_buf) -> Box<Op> {
fn op_http_response(rt: &Runtime, base: &msg::Base, _raw: fly_buf) -> Box<Op> {
debug!("handling http response");
let msg = base.msg_as_http_response().unwrap();
let req_id = msg.id();
Expand All @@ -1673,7 +1673,7 @@ fn handle_http_response(rt: &Runtime, base: &msg::Base, _raw: fly_buf) -> Box<Op
}

let mut chunk_recver: Option<mpsc::UnboundedReceiver<Vec<u8>>> = None;
if msg.body() {
if msg.has_body() {
debug!("http response will have a body");
let (sender, recver) = mpsc::unbounded::<Vec<u8>>();
{
Expand All @@ -1700,7 +1700,7 @@ fn handle_http_response(rt: &Runtime, base: &msg::Base, _raw: fly_buf) -> Box<Op
ok_future(None)
}

fn handle_dns_response(rt: &Runtime, base: &msg::Base, _raw: fly_buf) -> Box<Op> {
fn op_dns_response(rt: &Runtime, base: &msg::Base, _raw: fly_buf) -> Box<Op> {
let msg = base.msg_as_dns_response().unwrap();
let req_id = msg.id();

Expand Down Expand Up @@ -1874,7 +1874,7 @@ fn handle_dns_response(rt: &Runtime, base: &msg::Base, _raw: fly_buf) -> Box<Op>
ok_future(None)
}

fn handle_stream_chunk(rt: &Runtime, base: &msg::Base, raw: fly_buf) -> Box<Op> {
fn op_stream_chunk(rt: &Runtime, base: &msg::Base, raw: fly_buf) -> Box<Op> {
debug!("handle stream chunk {:?}", raw);
let msg = base.msg_as_stream_chunk().unwrap();
let stream_id = msg.id();
Expand Down Expand Up @@ -1917,7 +1917,7 @@ where
(delay_task, cancel_tx)
}

fn handle_data_put(_rt: &Runtime, base: &msg::Base, _raw: fly_buf) -> Box<Op> {
fn op_data_put(_rt: &Runtime, base: &msg::Base, _raw: fly_buf) -> Box<Op> {
let msg = base.msg_as_data_put().unwrap();
let coll = msg.collection().unwrap().to_string();
let key = msg.key().unwrap().to_string();
Expand All @@ -1942,7 +1942,7 @@ fn handle_data_put(_rt: &Runtime, base: &msg::Base, _raw: fly_buf) -> Box<Op> {
}))
}

fn handle_data_get(_rt: &Runtime, base: &msg::Base, _raw: fly_buf) -> Box<Op> {
fn op_data_get(_rt: &Runtime, base: &msg::Base, _raw: fly_buf) -> Box<Op> {
let cmd_id = base.cmd_id();
let msg = base.msg_as_data_get().unwrap();
let coll = msg.collection().unwrap().to_string();
Expand Down Expand Up @@ -1987,7 +1987,7 @@ fn handle_data_get(_rt: &Runtime, base: &msg::Base, _raw: fly_buf) -> Box<Op> {
}))
}

fn handle_data_del(_rt: &Runtime, base: &msg::Base, _raw: fly_buf) -> Box<Op> {
fn op_data_del(_rt: &Runtime, base: &msg::Base, _raw: fly_buf) -> Box<Op> {
let msg = base.msg_as_data_del().unwrap();
let coll = msg.collection().unwrap().to_string();
let key = msg.key().unwrap().to_string();
Expand All @@ -2008,7 +2008,7 @@ fn handle_data_del(_rt: &Runtime, base: &msg::Base, _raw: fly_buf) -> Box<Op> {
}))
}

fn handle_data_drop_coll(_rt: &Runtime, base: &msg::Base, _raw: fly_buf) -> Box<Op> {
fn op_data_drop_coll(_rt: &Runtime, base: &msg::Base, _raw: fly_buf) -> Box<Op> {
let msg = base.msg_as_data_del().unwrap();
let coll = msg.collection().unwrap().to_string();

Expand Down
15 changes: 7 additions & 8 deletions v8env/src/body_mixin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ import { parse as queryParse } from 'query-string'
import { Blob, FormData, Body, ReadableStream, ReadableStreamReader, BodyInit } from './dom_types';
import { FlyBlob } from './blob';
import FlyFormData from './form_data';
import { ReadableStream as WhatWGReadableStream } from "@stardazed/streams";
import { ReadableStream as WhatWGReadableStream, ReadableStreamDefaultController } from "@stardazed/streams";

interface ReadableStreamController {
enqueue(chunk: string | ArrayBuffer): void
close(): void
}
// interface ReadableStreamController {
// enqueue(chunk: string | ArrayBuffer): void
// close(): void
// }

export type BodySource = Blob | BufferSource |
FormData | URLSearchParams |
Expand All @@ -32,12 +32,11 @@ export default class FlyBody implements Body {
if (this.bodySource instanceof WhatWGReadableStream) {
this.stream = this.bodySource
}
if (typeof this.bodySource === "string") {
if (typeof this.bodySource === "string" || this.bodySource instanceof Uint8Array) {
const bodySource = this.bodySource
this.stream = new WhatWGReadableStream({
start(controller: ReadableStreamController) {
start(controller: ReadableStreamDefaultController) {
controller.enqueue(bodySource)
console.debug("enqueued bodySource:", bodySource)
controller.close()
}
})
Expand Down
6 changes: 3 additions & 3 deletions v8env/src/bridge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ export function addEventListener(name: string, fn: Function) {
let req = new FlyRequest(msg.url(), {
method: fbs.HttpMethod[msg.method()].toUpperCase(),
headers: headersInit,
body: msg.body() ?
body: msg.hasBody() ?
new ReadableStream({
start(controller) {
streams.set(id, (chunkMsg: fbs.StreamChunk, raw: Uint8Array) => {
Expand Down Expand Up @@ -225,7 +225,7 @@ function handleError(id: number, err: Error) {

fbs.HttpResponse.startHttpResponse(fbb);
fbs.HttpResponse.addId(fbb, id);
fbs.HttpResponse.addBody(fbb, true)
fbs.HttpResponse.addHasBody(fbb, true)
fbs.HttpResponse.addStatus(fbb, 500)

const resMsg = fbs.HttpResponse.endHttpResponse(fbb);
Expand Down Expand Up @@ -295,7 +295,7 @@ async function handleRes(id: number, res: FlyResponse) {
fbs.HttpResponse.addHeaders(fbb, resHeaders);
fbs.HttpResponse.addStatus(fbb, res.status);
let resBody = res.body;
fbs.HttpResponse.addBody(fbb, resBody != null)
fbs.HttpResponse.addHasBody(fbb, resBody != null)

const resMsg = fbs.HttpResponse.endHttpResponse(fbb);
sendSync(fbb, fbs.Any.HttpResponse, resMsg); // sync so we can send body chunks when it's ready!
Expand Down
2 changes: 1 addition & 1 deletion v8env/src/fetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ export function fetch(info: RequestInfo, init?: FlyRequestInit): Promise<FlyResp
sendAsync(fbb, fbs.Any.HttpRequest, fbs.HttpRequest.endHttpRequest(fbb)).then((base) => {
let msg = new fbs.FetchHttpResponse();
base.msg(msg);
const body = msg.body() ?
const body = msg.hasBody() ?
new ReadableStream({
start(controller) {
streams.set(msg.id(), (chunkMsg: fbs.StreamChunk, raw: Uint8Array) => {
Expand Down

0 comments on commit 7d19376

Please sign in to comment.