From 68c63413d383a2feae9bbf46ed43a03ae7a20603 Mon Sep 17 00:00:00 2001 From: Jerome Gravel-Niquet Date: Tue, 30 Oct 2018 12:33:48 -0400 Subject: [PATCH] use sqlite for cache too --- Cargo.lock | 48 ++++----- Cargo.toml | 10 +- README.md | 2 +- hello-world.js | 2 +- src/bin/dns/main.rs | 3 +- src/lib.rs | 1 + src/runtime.rs | 117 +++++++++------------ src/sqlite_cache.rs | 229 ++++++++++++++++++++++++++++++++++++++++++ v8env/src/fly/http.ts | 3 +- v8env/src/globals.ts | 14 ++- v8env/src/resolv.ts | 1 - 11 files changed, 323 insertions(+), 107 deletions(-) create mode 100644 src/sqlite_cache.rs diff --git a/Cargo.lock b/Cargo.lock index 88ac702..41ec34e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -380,9 +380,9 @@ dependencies = [ "log 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", "r2d2 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)", "r2d2_redis 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", - "r2d2_sqlite 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", + "r2d2_sqlite 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.5.5 (registry+https://github.com/rust-lang/crates.io-index)", - "rusqlite 0.14.0 (registry+https://github.com/rust-lang/crates.io-index)", + "rusqlite 0.15.0 (registry+https://github.com/rust-lang/crates.io-index)", "sha-1 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", "sha2 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)", "sourcemap 2.2.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -392,9 +392,9 @@ dependencies = [ "tokio-io-pool 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-signal 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-udp 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", - "trust-dns 0.15.0-alpha.2 (registry+https://github.com/rust-lang/crates.io-index)", - "trust-dns-proto 0.5.0-alpha.3 (registry+https://github.com/rust-lang/crates.io-index)", - "trust-dns-server 0.15.0-alpha.2 (registry+https://github.com/rust-lang/crates.io-index)", + "trust-dns 0.15.0-alpha.2 (git+https://github.com/bluejekyll/trust-dns?rev=dc133e91c4d1461aece3ddcb9525dce824d2baa1)", + "trust-dns-proto 0.5.0-alpha.5 (git+https://github.com/bluejekyll/trust-dns?rev=dc133e91c4d1461aece3ddcb9525dce824d2baa1)", + "trust-dns-server 0.15.0-alpha.2 (git+https://github.com/bluejekyll/trust-dns?rev=dc133e91c4d1461aece3ddcb9525dce824d2baa1)", "url 1.7.1 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -634,7 +634,7 @@ dependencies = [ [[package]] name = "libsqlite3-sys" -version = "0.9.3" +version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "cc 1.0.25 (registry+https://github.com/rust-lang/crates.io-index)", @@ -962,11 +962,11 @@ dependencies = [ [[package]] name = "r2d2_sqlite" -version = "0.6.0" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "r2d2 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)", - "rusqlite 0.14.0 (registry+https://github.com/rust-lang/crates.io-index)", + "rusqlite 0.15.0 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -1085,11 +1085,11 @@ dependencies = [ [[package]] name = "rusqlite" -version = "0.14.0" +version = "0.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "bitflags 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)", - "libsqlite3-sys 0.9.3 (registry+https://github.com/rust-lang/crates.io-index)", + "libsqlite3-sys 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)", "lru-cache 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "time 0.1.40 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -1642,7 +1642,7 @@ dependencies = [ [[package]] name = "trust-dns" version = "0.15.0-alpha.2" -source = "registry+https://github.com/rust-lang/crates.io-index" +source = "git+https://github.com/bluejekyll/trust-dns?rev=dc133e91c4d1461aece3ddcb9525dce824d2baa1#dc133e91c4d1461aece3ddcb9525dce824d2baa1" dependencies = [ "chrono 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "data-encoding 2.1.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1655,13 +1655,13 @@ dependencies = [ "rand 0.5.5 (registry+https://github.com/rust-lang/crates.io-index)", "tokio 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-tcp 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", - "trust-dns-proto 0.5.0-alpha.3 (registry+https://github.com/rust-lang/crates.io-index)", + "trust-dns-proto 0.5.0-alpha.5 (git+https://github.com/bluejekyll/trust-dns?rev=dc133e91c4d1461aece3ddcb9525dce824d2baa1)", ] [[package]] name = "trust-dns-proto" -version = "0.5.0-alpha.3" -source = "registry+https://github.com/rust-lang/crates.io-index" +version = "0.5.0-alpha.5" +source = "git+https://github.com/bluejekyll/trust-dns?rev=dc133e91c4d1461aece3ddcb9525dce824d2baa1#dc133e91c4d1461aece3ddcb9525dce824d2baa1" dependencies = [ "byteorder 1.2.6 (registry+https://github.com/rust-lang/crates.io-index)", "data-encoding 2.1.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1685,7 +1685,7 @@ dependencies = [ [[package]] name = "trust-dns-server" version = "0.15.0-alpha.2" -source = "registry+https://github.com/rust-lang/crates.io-index" +source = "git+https://github.com/bluejekyll/trust-dns?rev=dc133e91c4d1461aece3ddcb9525dce824d2baa1#dc133e91c4d1461aece3ddcb9525dce824d2baa1" dependencies = [ "backtrace 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", "bytes 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1697,7 +1697,7 @@ dependencies = [ "lazy_static 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.5.5 (registry+https://github.com/rust-lang/crates.io-index)", - "rusqlite 0.14.0 (registry+https://github.com/rust-lang/crates.io-index)", + "rusqlite 0.15.0 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.79 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.21 (registry+https://github.com/rust-lang/crates.io-index)", "time 0.1.40 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1710,8 +1710,8 @@ dependencies = [ "tokio-timer 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-udp 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", "toml 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)", - "trust-dns 0.15.0-alpha.2 (registry+https://github.com/rust-lang/crates.io-index)", - "trust-dns-proto 0.5.0-alpha.3 (registry+https://github.com/rust-lang/crates.io-index)", + "trust-dns 0.15.0-alpha.2 (git+https://github.com/bluejekyll/trust-dns?rev=dc133e91c4d1461aece3ddcb9525dce824d2baa1)", + "trust-dns-proto 0.5.0-alpha.5 (git+https://github.com/bluejekyll/trust-dns?rev=dc133e91c4d1461aece3ddcb9525dce824d2baa1)", ] [[package]] @@ -1967,7 +1967,7 @@ dependencies = [ "checksum lazycell 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ddba4c30a78328befecec92fc94970e53b3ae385827d28620f0f5bb2493081e0" "checksum libc 0.2.43 (registry+https://github.com/rust-lang/crates.io-index)" = "76e3a3ef172f1a0b9a9ff0dd1491ae5e6c948b94479a3021819ba7d860c8645d" "checksum libloading 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "9c3ad660d7cb8c5822cd83d10897b0f1f1526792737a179e73896152f85b88c2" -"checksum libsqlite3-sys 0.9.3 (registry+https://github.com/rust-lang/crates.io-index)" = "d3711dfd91a1081d2458ad2d06ea30a8755256e74038be2ad927d94e1c955ca8" +"checksum libsqlite3-sys 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)" = "742b695cbfb89e549dca6960a55e6802f67d352e33e97859ee46dee835211b0f" "checksum linked-hash-map 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7860ec297f7008ff7a1e3382d7f7e1dcd69efc94751a2284bafc3d013c2aa939" "checksum lock_api 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "775751a3e69bde4df9b38dd00a1b5d6ac13791e4223d4a0506577f0dd27cfb7a" "checksum log 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)" = "d4fcce5fa49cc693c312001daf1d13411c4a5283796bac1084299ea3e567113f" @@ -2007,7 +2007,7 @@ dependencies = [ "checksum quote 0.6.8 (registry+https://github.com/rust-lang/crates.io-index)" = "dd636425967c33af890042c483632d33fa7a18f19ad1d7ea72e8998c6ef8dea5" "checksum r2d2 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)" = "f9078ca6a8a5568ed142083bb2f7dc9295b69d16f867ddcc9849e51b17d8db46" "checksum r2d2_redis 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)" = "98318a90bb5f4990e084cd54e875aedb794f73dcef8668287cc74e875955b9db" -"checksum r2d2_sqlite 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "7353e4cee1aef402161e78c9a17367c018d47157625dd9e01f9651aafd098426" +"checksum r2d2_sqlite 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ce466d46abbb1b52280818d3fdb300f456f7cc90a899e1ab4f6890aa3aeae909" "checksum radix_trie 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "03d0d770481e8af620ca61d3d304bf014f965d7f78e923dc58545e6a545070a9" "checksum rand 0.5.5 (registry+https://github.com/rust-lang/crates.io-index)" = "e464cd887e869cddcae8792a4ee31d23c7edd516700695608f5b98c67ee0131c" "checksum rand_core 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "edecf0f94da5551fc9b492093e30b041a891657db7940ee221f9d2f66e82eef2" @@ -2020,7 +2020,7 @@ dependencies = [ "checksum regex-syntax 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)" = "747ba3b235651f6e2f67dfa8bcdcd073ddb7c243cb21c442fc12395dfcac212d" "checksum remove_dir_all 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "3488ba1b9a2084d38645c4c08276a1752dcbf2c7130d74f1569681ad5d2799c5" "checksum ring 0.13.2 (registry+https://github.com/rust-lang/crates.io-index)" = "dbe642b9dd1ba0038d78c4a3999d1ee56178b4d415c1e1fbaba83b06dce012f0" -"checksum rusqlite 0.14.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c9d9118f1ce84d8d0b67f9779936432fb42bb620cef2122409d786892cce9a3c" +"checksum rusqlite 0.15.0 (registry+https://github.com/rust-lang/crates.io-index)" = "39bae767eb27866f5c0be918635ae54af705bc09db11be2c43a3c6b361cf3462" "checksum rustc-demangle 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)" = "bcfe5b13211b4d78e5c2cadfebd7769197d95c639c35a50057eb4c05de811395" "checksum rustc_version 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "138e3e0acb6c9fb258b19b67cb8abd63c00679d2851805ea151465464fe9030a" "checksum rustls 0.14.0 (registry+https://github.com/rust-lang/crates.io-index)" = "8b7891791343c75b73ed9a18cadcafd8c8563d11a88ebe2d87f5b8a3182654d9" @@ -2078,9 +2078,9 @@ dependencies = [ "checksum tokio-udp 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "da941144b816d0dcda4db3a1ba87596e4df5e860a72b70783fe435891f80601c" "checksum tokio-uds 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "424c1ed15a0132251813ccea50640b224c809d6ceafb88154c1a8775873a0e89" "checksum toml 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)" = "b7e7d59d55f36979a9dd86d71ae54657a5e9c7fdb4fa2212f4064e2d32f9dcda" -"checksum trust-dns 0.15.0-alpha.2 (registry+https://github.com/rust-lang/crates.io-index)" = "033df9f62cc40509a24ee6b6c8aa0905c0d644d2d2baa758297267f46138c151" -"checksum trust-dns-proto 0.5.0-alpha.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3fabc184ed90d027afee46386e6418b9c953b7be527f62cc37724a1720e07d68" -"checksum trust-dns-server 0.15.0-alpha.2 (registry+https://github.com/rust-lang/crates.io-index)" = "338c2177ded0a4ef05518ba0674a3c518a29bdfbf39ad49b6561a463a920c04f" +"checksum trust-dns 0.15.0-alpha.2 (git+https://github.com/bluejekyll/trust-dns?rev=dc133e91c4d1461aece3ddcb9525dce824d2baa1)" = "" +"checksum trust-dns-proto 0.5.0-alpha.5 (git+https://github.com/bluejekyll/trust-dns?rev=dc133e91c4d1461aece3ddcb9525dce824d2baa1)" = "" +"checksum trust-dns-server 0.15.0-alpha.2 (git+https://github.com/bluejekyll/trust-dns?rev=dc133e91c4d1461aece3ddcb9525dce824d2baa1)" = "" "checksum try-lock 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e604eb7b43c06650e854be16a2a03155743d3752dd1c943f6829e26b7a36e382" "checksum typenum 1.10.0 (registry+https://github.com/rust-lang/crates.io-index)" = "612d636f949607bdf9b123b4a6f6d966dedf3ff669f7f045890d3a4a73948169" "checksum ucd-util 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "fd2be2d6639d0f8fe6cdda291ad456e23629558d466e2789d2c3e9892bda285d" diff --git a/Cargo.toml b/Cargo.toml index ec52493..02d6a25 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,9 +17,9 @@ flatbuffers = { path = "./third_party/flatbuffers/rust/flatbuffers" } hyper = "0.12" hyper-tls = "0.3" http = "0.1" -rusqlite = {version="0.14", features=["bundled"]} +rusqlite = {version="0.15", features=["bundled","blob"]} r2d2 = "0.8" -r2d2_sqlite = "0.6" +r2d2_sqlite = "0.7" r2d2_redis = "0.8" sha2 = "0.7" sha-1 = "0.7" @@ -28,9 +28,9 @@ sourcemap = "2.2" rand = "0.5" libfly = { path = "libfly" } tokio-udp = "0.1" -trust-dns-server = "0.15.0-alpha.2" -trust-dns = "0.15.0-alpha.2" -trust-dns-proto = "0.5.0-alpha.3" +trust-dns-server = { git = "https://github.com/bluejekyll/trust-dns", rev = "dc133e91c4d1461aece3ddcb9525dce824d2baa1" } +trust-dns = { git = "https://github.com/bluejekyll/trust-dns", rev = "dc133e91c4d1461aece3ddcb9525dce824d2baa1" } +trust-dns-proto = { git = "https://github.com/bluejekyll/trust-dns", rev = "dc133e91c4d1461aece3ddcb9525dce824d2baa1" } tokio-fs = "0.1" tokio-codec = "0.1" glob = "0.2" diff --git a/README.md b/README.md index 5672cb8..1242d3c 100644 --- a/README.md +++ b/README.md @@ -47,7 +47,7 @@ addEventListener("resolv", event => { name: event.request.queries[0].name, // name of the DNS entry rrType: DNSRecordType.A, // record type ttl: 300, // time-to-live for the client - data: "127.0.0.1" // data for the record + data: {ip: "127.0.0.1"} // data for the record } ] } diff --git a/hello-world.js b/hello-world.js index f9ffeea..f3fd7ff 100644 --- a/hello-world.js +++ b/hello-world.js @@ -15,7 +15,7 @@ addEventListener("fetch", function (event) { event.respondWith(new Response(helloWorld)) else if (url.pathname == "/kitchensink") { - const coll = flyData.collection("testing") + const coll = fly.data.collection("testing") coll.put("id", { foo: "bar" }).then(b => { console.log("put returned:", b); coll.get("id").then(d => { diff --git a/src/bin/dns/main.rs b/src/bin/dns/main.rs index 9e6c88f..f5b6355 100644 --- a/src/bin/dns/main.rs +++ b/src/bin/dns/main.rs @@ -13,6 +13,7 @@ extern crate trust_dns_server; use trust_dns_server::authority::{AuthLookup, MessageResponseBuilder}; use trust_dns_proto::op::header::Header; +use trust_dns_proto::rr::record_type::RecordType; use trust_dns_proto::rr::{Record, RrsetRecords}; use trust_dns_server::authority::authority::LookupRecords; @@ -123,7 +124,7 @@ impl RequestHandler for DnsHandler { .iter() .map(|q| { debug!("query: {:?}", q); - use self::dns::rr::{DNSClass, Name, RecordType}; + use self::dns::rr::{DNSClass, Name}; let name = builder.create_string(&Name::from(q.name().clone()).to_utf8()); let rr_type = match q.query_type() { RecordType::A => msg::DnsRecordType::A, diff --git a/src/lib.rs b/src/lib.rs index 03f4f7d..bf3ee1e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -24,4 +24,5 @@ pub mod msg; pub mod ops; pub mod redis_stream; pub mod runtime; +mod sqlite_cache; pub mod utils; diff --git a/src/runtime.rs b/src/runtime.rs index 59b7d10..2ba3c21 100644 --- a/src/runtime.rs +++ b/src/runtime.rs @@ -45,7 +45,6 @@ extern crate r2d2; extern crate r2d2_redis; extern crate r2d2_sqlite; extern crate rusqlite; -use self::r2d2_redis::redis; use self::r2d2_sqlite::SqliteConnectionManager; use self::hyper::body::Payload; @@ -63,8 +62,6 @@ use msg; use errors::{FlyError, FlyResult}; -use redis_stream; - extern crate log; extern crate rand; @@ -78,6 +75,8 @@ use self::tokio_codec::{BytesCodec, FramedRead}; use ops; // src/ops/ use utils::*; +use sqlite_cache; + #[derive(Debug)] pub enum JsHttpResponseBody { Stream(mpsc::UnboundedReceiver>), @@ -624,9 +623,7 @@ fn op_crypto_digest(_ptr: JsRuntime, base: &msg::Base, raw: fly_buf) -> Box use super::NEXT_EVENT_ID; use std::str; -use std::ops::Deref; fn op_cache_set(ptr: JsRuntime, base: &msg::Base, _raw: fly_buf) -> Box { - println!("CACHE SET"); let cmd_id = base.cmd_id(); let msg = base.msg_as_cache_set().unwrap(); let key = msg.key().unwrap().to_string(); @@ -640,30 +637,15 @@ fn op_cache_set(ptr: JsRuntime, base: &msg::Base, _raw: fly_buf) -> Box { rt.streams.lock().unwrap().insert(stream_id, sender); } - { - let pool = Arc::clone(&redis_stream::REDIS_CACHE_POOL); - let con = pool.get().unwrap(); // TODO: no unwrap - let offset: AtomicUsize = ATOMIC_USIZE_INIT; - let spawnres = rt.event_loop.lock().unwrap().spawn( - recver - .map_err(|_| println!("error cache set stream!")) - .for_each(move |b| { - let start = offset.fetch_add(b.len(), Ordering::SeqCst); - match redis::cmd("SETRANGE") - .arg(key.clone()) - .arg(start) - .arg(b) - .query::(con.deref()) - { - Ok(_r) => {} - Err(e) => println!("error in redis.. {}", e), - } - Ok(()) - }), - ); - if let Err(err) = spawnres { - return odd_future(format!("{}", err).into()); - } + let fut = sqlite_cache::set(key, None, Box::new(recver)); + + let spawnres = rt.event_loop.lock().unwrap().spawn( + fut + .map_err(|e| println!("error cache set stream! {:?}", e)) + .and_then(move |_b| Ok(())), + ); + if let Err(err) = spawnres { + return odd_future(format!("{}", err).into()); } let builder = &mut FlatBufferBuilder::new(); @@ -693,23 +675,19 @@ fn op_cache_get(ptr: JsRuntime, base: &msg::Base, _raw: fly_buf) -> Box { let key = msg.key().unwrap().to_string(); - let got = { - let pool = Arc::clone(&redis_stream::REDIS_CACHE_POOL); - let con = pool.get().unwrap(); // TODO: no unwrap - match redis::cmd("EXISTS") - .arg(key.clone()) - .query::(con.deref()) - { - Ok(b) => b, - Err(e) => { - println!("redis exists err: {}", e); - false - } - } + let maybe_stream = match sqlite_cache::get(key) { + Ok(s) => s, + Err(e) => match e { + sqlite_cache::CacheError::IoErr(ioe) => return odd_future(ioe.into()), + sqlite_cache::CacheError::Unknown => return odd_future("unknown error".to_string().into()), + sqlite_cache::CacheError::RusqliteErr(e) => return odd_future(format!("{}", e).into()), + }, }; let rt = ptr.to_runtime(); + let got = maybe_stream.is_some(); + { // need to hijack the order here. let fut = future::lazy(move || { @@ -744,11 +722,9 @@ fn op_cache_get(ptr: JsRuntime, base: &msg::Base, _raw: fly_buf) -> Box { } } - if got { - let stream = redis_stream::redis_stream(key.clone()); - + if let Some(stream) = maybe_stream { let fut = stream - .map_err(|e| println!("error redis stream: {}", e)) + .map_err(|e| println!("error cache stream: {:?}", e)) .for_each(move |bytes| { let builder = &mut FlatBufferBuilder::new(); let chunk_msg = msg::StreamChunk::create( @@ -1179,7 +1155,7 @@ fn op_http_request(ptr: JsRuntime, base: &msg::Base, _raw: fly_buf) -> Box { if let Err(_) = p.send(Err(err.into())) { error!("error sending error for http response :/"); } - return Ok(()) + return Ok(()); } let res = reserr.unwrap(); // should be safe. @@ -1218,27 +1194,27 @@ fn op_http_request(ptr: JsRuntime, base: &msg::Base, _raw: fly_buf) -> Box { }, ); ptr.send( - fly_buf_from( - serialize_response( - 0, - builder, - msg::BaseArgs { - msg: Some(chunk_msg.as_union_value()), - msg_type: msg::Any::StreamChunk, - ..Default::default() - }, - ).unwrap(), - ), - Some(fly_buf { - alloc_ptr: 0 as *mut u8, - alloc_len: 0, - data_ptr: (*bytes).as_ptr() as *mut u8, - data_len: bytes.len(), - }), - ); + fly_buf_from( + serialize_response( + 0, + builder, + msg::BaseArgs { + msg: Some(chunk_msg.as_union_value()), + msg_type: msg::Any::StreamChunk, + ..Default::default() + }, + ).unwrap(), + ), + Some(fly_buf { + alloc_ptr: 0 as *mut u8, + alloc_len: 0, + data_ptr: (*bytes).as_ptr() as *mut u8, + data_len: bytes.len(), + }), + ); } Ok(Async::Ready(())) - }).map_err(|e: hyper::Error| println!("hyper error: {}",e)), + }).map_err(|e: hyper::Error| println!("hyper error: {}", e)), ); if let Err(err) = spawnres { error!("error spawning http res stream: {}", err); @@ -1446,7 +1422,7 @@ fn op_data_get(_ptr: JsRuntime, base: &msg::Base, _raw: fly_buf) -> Box { create_collection(&*con, &coll).unwrap(); - match con.query_row::( + match con.query_row::( format!("SELECT obj FROM {} WHERE key == ?", coll).as_str(), &[&key], |row| row.get(0), @@ -1508,7 +1484,10 @@ fn op_data_drop_coll(_ptr: JsRuntime, base: &msg::Base, _raw: fly_buf) -> Box Ok(None), Err(e) => Err(format!("{}", e).into()), } @@ -1521,6 +1500,6 @@ fn create_collection(con: &rusqlite::Connection, name: &String) -> rusqlite::Res "CREATE TABLE IF NOT EXISTS {} (key TEXT PRIMARY KEY NOT NULL, obj JSON NOT NULL)", name ).as_str(), - &[], + rusqlite::NO_PARAMS, ) } diff --git a/src/sqlite_cache.rs b/src/sqlite_cache.rs new file mode 100644 index 0000000..254d94e --- /dev/null +++ b/src/sqlite_cache.rs @@ -0,0 +1,229 @@ +use std::sync::Arc; + +extern crate r2d2; +extern crate r2d2_sqlite; +extern crate rusqlite; +use self::r2d2_sqlite::SqliteConnectionManager; + +use self::rusqlite::types::ToSql; +use self::rusqlite::NO_PARAMS; + +use futures::{future, stream, Future, Stream}; +use std::ops::Deref; + +use std::io::{Read, Seek, SeekFrom}; + +use std::io; + +#[derive(Debug)] +pub enum CacheError { + Unknown, + RusqliteErr(rusqlite::Error), + IoErr(io::Error), +} + +impl From for CacheError { + #[inline] + fn from(err: io::Error) -> CacheError { + CacheError::IoErr(err) + } +} + +impl From for CacheError { + #[inline] + fn from(err: rusqlite::Error) -> CacheError { + CacheError::RusqliteErr(err) + } +} + +lazy_static! { + static ref SQLITE_CACHE_POOL: Arc> = { + let manager = SqliteConnectionManager::file("cache.db"); + let pool = r2d2::Pool::builder().build(manager).unwrap(); + let con = pool.get().unwrap(); // TODO: no unwrap + con.execute("CREATE TABLE IF NOT EXISTS cache ( + key TEXT PRIMARY KEY NOT NULL, + value BLOB NOT NULL, + expires_at DATETIME + ); + CREATE UNIQUE INDEX IF NOT EXISTS ON cache (key); + CREATE INDEX IF NOT EXISTS ON cache (key, expires_at);", + NO_PARAMS, + ).unwrap(); + Arc::new(pool) + }; +} + +pub fn set( + key: String, + maybe_ttl: Option, + data_stream: Box, Error = ()> + Send>, +) -> Box + Send> { + debug!( + "sqlite cache set with key: {} and ttl: {:?}", + key, maybe_ttl + ); + + Box::new( + data_stream + .concat2() + .map_err(|_e| { + error!("sqlite cache set error concatenating stream"); + CacheError::Unknown + }).and_then(move |b| { + let pool = Arc::clone(&SQLITE_CACHE_POOL); + let conn = pool.get().unwrap(); // TODO: no unwrap + + if let Some(ttl) = maybe_ttl { + let mut stmt = conn + .prepare( + "INSERT INTO cache(key, value, expires_at) + VALUES (?, ?, datetime('now', '+? seconds')) + ON CONFLICT (key) DO + UPDATE SET value=excluded.value,expires_at=excluded.expires_at + ", + ).unwrap(); + + stmt + .insert(&[&key as &ToSql, &b as &ToSql, &format!("{}", ttl) as &ToSql]) + .unwrap() + } else { + let mut stmt = conn + .prepare( + "INSERT INTO cache(key, value) + VALUES (?, ?) + ON CONFLICT (key) DO + UPDATE SET value=excluded.value + ", + ).unwrap(); + + stmt.insert(&[&key as &ToSql, &b as &ToSql]).unwrap() + }; + + // let mut stmt = conn + // .prepare("SELECT rowid FROM cache WHERE key = ? LIMIT 1") + // .unwrap(); + + // let mut rows = match stmt.query(&[&key]) { + // Ok(r) => r, + // Err(e) => return Err(e.into()), + // }; + // let rowid: i64 = match rows.next() { + // Some(res) => match res { + // Ok(r) => r.get(0), + // Err(e) => return Err(e.into()), + // }, + // None => return Err(CacheError::Unknown), + // }; + Ok(()) + }), + ) + + // Ok(Box::new( + // data_stream + // .map_err(|e| { + // error!("error cache set stream!"); + // CacheError::Unknown + // }).map(move |b| -> Result<(), CacheError> { + // let start = offset.fetch_add(b.len(), Ordering::SeqCst) as u64; + // debug!("sqlite cache set len: {} at offset: {}", b.len(), start); + + // let conn = pool.get().unwrap(); + + // let mut blob = + // match conn + // .deref() + // .blob_open(rusqlite::DatabaseName::Main, "cache", "value", rowid, false) + // { + // Ok(b) => b, + // Err(e) => { + // return Err(e.into()); + // } + // }; + + // // if let Err(e) = blob.seek(SeekFrom::Start(start)) { + // // return Err(e.into()); + // // } + + // match blob.write(b.as_slice()) { + // Ok(n) => debug!("sqlite cache set set, wrote {}", n), + // Err(e) => return Err(e.into()), + // }; + + // Ok(()) + // }).and_then(|res| res), // uh, ok. that's required! + // )) +} + +pub fn get( + key: String, +) -> Result, Error = CacheError> + Send>>, CacheError> { + debug!("sqlite cache get with key: {}", key); + let pool = Arc::clone(&SQLITE_CACHE_POOL); + let conn = pool.get().unwrap(); // TODO: no unwrap + let size = 256 * 1024; + + let mut stmt = conn + .prepare( + "SELECT rowid FROM cache + WHERE key = ? AND + ( + expires_at IS NULL OR + expires_at <= datetime('now') + ) LIMIT 1", + ).unwrap(); + + let mut rows = stmt.query(&[&key])?; + + debug!("got rows"); + + let rowid: i64 = match rows.next() { + Some(res) => res?.get(0), + None => { + debug!("row not found"); + return Ok(None); + } + }; + + debug!("got a rowid: {}", rowid); + + Ok(Some(Box::new(stream::unfold(0, move |pos| { + debug!("sqlite cache get in stream future, pos: {}", pos); + // println!("unfolding... pos: {}, modulo: {}", pos, pos % size); + + // End early given some rules! + // not a multiple of size, means we're done. + if pos > 0 && pos % size > 0 { + debug!("sqlite cache get returning early"); + return None; + } + + let conn = pool.get().unwrap(); + + let mut blob = + match conn + .deref() + .blob_open(rusqlite::DatabaseName::Main, "cache", "value", rowid, true) + { + Ok(b) => b, + Err(e) => return Some(future::err(e.into())), + }; + + if let Err(e) = blob.seek(SeekFrom::Start(pos)) { + return Some(future::err(e.into())); + } + let mut buf = [0u8; 256 * 1024]; + match blob.read(&mut buf[..]) { + Ok(bytes_read) => { + if bytes_read == 0 { + return None; + } + Some(future::ok::<(Vec, u64), _>(( + buf[..bytes_read].to_vec(), + pos + bytes_read as u64, + ))) + } + Err(e) => Some(future::err(e.into())), + } + })))) +} diff --git a/v8env/src/fly/http.ts b/v8env/src/fly/http.ts index 415affa..ad64244 100644 --- a/v8env/src/fly/http.ts +++ b/v8env/src/fly/http.ts @@ -4,7 +4,6 @@ * @private */ -declare var fly; let fetchEventBound = false let flyFetchHandler = null @@ -43,7 +42,7 @@ function handleFetch(event) { * @param {Object} [params] The parameters (if any) extracted from the route pattern * @returns {Response} An HTTP response generated for the request */ -module.exports = { +export default { /** * Registers an HTTP handler functions. This handler is matched when no routes are set, or no routes match a given request. * @public diff --git a/v8env/src/globals.ts b/v8env/src/globals.ts index ddfc897..7339b8e 100644 --- a/v8env/src/globals.ts +++ b/v8env/src/globals.ts @@ -17,7 +17,8 @@ import cache_ from "./cache"; import * as url from './url'; import { FlyRequest } from "./request"; -import data from './fly/data'; +import flyData from './fly/data'; +import flyCache from './fly/cache'; declare global { interface Window { @@ -51,8 +52,12 @@ declare global { let crypto: typeof crypto_.crypto; let cache: typeof cache_; + interface Fly { + cache: typeof flyCache + data: typeof flyData + } // TODO: remove - let flyData: typeof data; + const fly: Fly const resolv: typeof resolv_.resolv; const DNSClass: typeof dns.DNSClass; @@ -89,7 +94,10 @@ window.resolv = resolv_.resolv; window.crypto = crypto_.crypto; window.cache = cache_; -window.flyData = data; +window.fly = { + cache: flyCache, + data: flyData +} window.DNSClass = dns.DNSClass; window.DNSRecordType = dns.DNSRecordType; diff --git a/v8env/src/resolv.ts b/v8env/src/resolv.ts index 8ab42ea..02c20a8 100644 --- a/v8env/src/resolv.ts +++ b/v8env/src/resolv.ts @@ -20,7 +20,6 @@ export function resolv(req: DNSQuery | string): Promise { fbs.DnsQuery.addDnsClass(fbb, query.dnsClass); fbs.DnsQuery.addRrType(fbb, query.rrType); sendAsync(fbb, fbs.Any.DnsQuery, fbs.DnsQuery.endDnsQuery(fbb)).then(baseRes => { - console.log("hello from resolv response") let msg = new fbs.DnsResponse() baseRes.msg(msg); const answers: DNSRecord[] = [];