Skip to content

Commit

Permalink
feat(wasm): Run Lumina in a Shared Worker (#265)
Browse files Browse the repository at this point in the history
This PR introduces several features:
    either SharedWorker or Worker is spawned for the lumina instance, so that we no longer run in main "GUI" thread.
    NodeDriver replaces WasmNode and is responsible for steering actual lumina node running inside a Worker. Multiple tabs from the same context can access the same lumina instance. After spawning the shared worker, NodeDriver establishes a request-response communication channel over JS primitives.
    Node inside the SharedWorker has server-like infrastructure. onconnect event means that a new connection happened and we're supposed to use event.port[0] to communicate with a newly connected NodeDriver by responding with results of the command we're sent.
    Node inside Worker has simplified architecture, since it doesn't need to handle new tabs connecting, but otherwise works the same

---------

Signed-off-by: Mikołaj Florkiewicz <[email protected]>
Co-authored-by: Yiannis Marangos <[email protected]>
Co-authored-by: Maciej Zwoliński <[email protected]>
  • Loading branch information
3 people authored Jun 25, 2024
1 parent d00e97a commit e932d3e
Show file tree
Hide file tree
Showing 15 changed files with 1,234 additions and 224 deletions.
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 15 additions & 11 deletions cli/static/run_node.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Error.stackTraceLimit = 99; // rust stack traces can get pretty big, increase the default

import init, { Node, NodeConfig } from "/wasm/lumina_node_wasm.js";
import init, { NodeConfig, NodeClient } from "/wasm/lumina_node_wasm.js";

async function fetch_config() {
const response = await fetch('/cfg.json');
Expand All @@ -20,7 +20,7 @@ async function fetch_config() {
}

async function show_stats(node) {
if (!node) {
if (!node || !await node.is_running()) {
return;
}
const info = await node.syncer_info();
Expand All @@ -38,7 +38,7 @@ async function show_stats(node) {

document.getElementById("peers").replaceChildren(peers_ul);

const network_head = node.get_network_head_header();
const network_head = await node.get_network_head_header();
if (network_head == null) {
return
}
Expand Down Expand Up @@ -95,21 +95,25 @@ function bind_config(data) {
});
}

async function start_node(config) {
window.node = await new Node(config);

document.getElementById("peer-id").innerText = await window.node.local_peer_id();
document.querySelectorAll(".status").forEach(elem => elem.style.visibility = "visible");
}

async function main(document, window) {
await init();

window.node = await new NodeClient();

bind_config(await fetch_config());

if (await window.node.is_running() === true) {
document.querySelectorAll('.config').forEach(elem => elem.disabled = true);
document.getElementById("peer-id").innerText = await window.node.local_peer_id();
document.querySelectorAll(".status").forEach(elem => elem.style.visibility = "visible");
}

document.getElementById("start").addEventListener("click", async () => {
document.querySelectorAll('.config').forEach(elem => elem.disabled = true);
start_node(window.config);

await window.node.start(window.config);
document.getElementById("peer-id").innerText = await window.node.local_peer_id();
document.querySelectorAll(".status").forEach(elem => elem.style.visibility = "visible");
});

setInterval(async () => await show_stats(window.node), 1000)
Expand Down
25 changes: 22 additions & 3 deletions node-wasm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,41 @@ crate-type = ["cdylib", "rlib"]
blockstore = { workspace = true }
celestia-tendermint = { workspace = true }
celestia-types = { workspace = true }
libp2p = { workspace = true }
libp2p = { workspace = true, features = ["serde"] }
lumina-node = { workspace = true }

anyhow = "1.0.86"
console_error_panic_hook = "0.1.7"
enum-as-inner = "0.6.0"
futures = "0.3.30"
gloo-timers = "0.3.0"
instant = "0.1.13"
js-sys = "0.3.69"
serde = { version = "1.0.203", features = ["derive"] }
serde_repr = "0.1.19"
serde-wasm-bindgen = "0.6.5"
serde_repr = "0.1.19"
thiserror = "1.0.61"
time = { version = "0.3.36", features = ["wasm-bindgen"] }
tokio = { version = "1.38.0", features = ["sync"] }
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["time"] }
tracing-web = "0.1.3"
wasm-bindgen = "0.2.92"
wasm-bindgen-futures = "0.4.42"
web-sys = { version = "0.3.69", features = ["BroadcastChannel", "Crypto"] }
web-sys = { version = "0.3.69", features = [
"BroadcastChannel",
"Crypto",
"DedicatedWorkerGlobalScope",
"MessageEvent",
"MessagePort",
"Navigator",
"SharedWorker",
"SharedWorkerGlobalScope",
"Worker",
"WorkerGlobalScope",
"WorkerOptions",
"WorkerType",
] }

[package.metadata.docs.rs]
targets = ["wasm32-unknown-unknown"]
29 changes: 29 additions & 0 deletions node-wasm/js/worker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// this file will be installed by wasm-pack in pkg/snippets/<pkg-name>-<hash>/js/
import init, { run_worker } from '../../../lumina_node_wasm.js';

// get the path to this file
export function worker_script_url() {
return import.meta.url;
}

// if we are in a worker
if (typeof WorkerGlobalScope !== 'undefined' && self instanceof WorkerGlobalScope) {
Error.stackTraceLimit = 99;

// for SharedWorker we queue incoming connections
// for dedicated Worker we queue incoming messages (coming from the single client)
let queued = [];
if (typeof SharedWorkerGlobalScope !== 'undefined' && self instanceof SharedWorkerGlobalScope) {
onconnect = (event) => {
queued.push(event)
}
} else {
onmessage = (event) => {
queued.push(event);
}
}

await init();
console.log("starting worker, queued messages: ", queued.length);
await run_worker(queued);
}
5 changes: 4 additions & 1 deletion node-wasm/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
use std::fmt::Display;

use serde::{Deserialize, Serialize};
use wasm_bindgen::convert::IntoWasmAbi;
use wasm_bindgen::describe::WasmDescribe;
use wasm_bindgen::JsValue;
Expand All @@ -10,7 +11,8 @@ use wasm_bindgen::JsValue;
pub type Result<T, E = Error> = std::result::Result<T, E>;

/// An error that can cross the WASM ABI border.
pub struct Error(JsValue);
#[derive(Debug, Serialize, Deserialize)]
pub struct Error(#[serde(with = "serde_wasm_bindgen::preserve")] JsValue);

impl Error {
/// Create a new `Error` with the specified message.
Expand Down Expand Up @@ -113,6 +115,7 @@ from_display! {
libp2p::multiaddr::Error,
lumina_node::node::NodeError,
lumina_node::store::StoreError,
crate::worker::WorkerError,
}

/// Utility to add more context to the [`Error`].
Expand Down
1 change: 1 addition & 0 deletions node-wasm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@
pub mod error;
pub mod node;
pub mod utils;
mod worker;
mod wrapper;
Loading

0 comments on commit e932d3e

Please sign in to comment.