Skip to content

Commit

Permalink
Add Worker/SharedWorker switch for browser that need it
Browse files Browse the repository at this point in the history
  • Loading branch information
fl0rek committed May 16, 2024
1 parent 0495d05 commit 7fe0397
Show file tree
Hide file tree
Showing 6 changed files with 358 additions and 147 deletions.
6 changes: 5 additions & 1 deletion node-wasm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,16 @@ web-sys = { version = "0.3.69", features = [
"Blob",
"BlobPropertyBag",
"BroadcastChannel",
"DedicatedWorkerGlobalScope",
"MessageEvent",
"MessagePort",
"Navigator",
"SharedWorker",
"SharedWorkerGlobalScope",
"Url",
"Worker",
"WorkerGlobalScope",
"WorkerNavigator",
"WorkerOptions",
"WorkerType",
"WorkerType"
] }
19 changes: 12 additions & 7 deletions node-wasm/js/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,23 @@ export function worker_script_url() {
}

// if we are in a worker
if (
typeof WorkerGlobalScope !== 'undefined'
&& self instanceof WorkerGlobalScope
) {
if (typeof WorkerGlobalScope !== 'undefined' && self instanceof WorkerGlobalScope) {
Error.stackTraceLimit = 99;

// for SharedWorker we queue incoming connections
// for dedicated Workerwe queue incoming messages (coming from the single client)
let queued = [];
onconnect = (event) => {
console.log("Queued connection", event);
queued.push(event.ports[0]);
if (typeof SharedWorkerGlobalScope !== 'undefined' && self instanceof SharedWorkerGlobalScope) {
onconnect = (event) => {
queued.push(event)
}
} else {
onmessage = (event) => {
queued.push(event);
}
}

await init();
console.log("starting worker, queued messages: ", queued.length);
await run_worker(queued);
}
113 changes: 70 additions & 43 deletions node-wasm/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,22 @@ use libp2p::identity::Keypair;
use libp2p::multiaddr::Protocol;
use serde::{Deserialize, Serialize};
use serde_wasm_bindgen::to_value;
use tracing::error;
use tracing::info;
use wasm_bindgen::prelude::*;
use web_sys::{MessageEvent, SharedWorker};
use web_sys::{SharedWorker, Worker, WorkerOptions, WorkerType};

use lumina_node::blockstore::IndexedDbBlockstore;
use lumina_node::network::{canonical_network_bootnodes, network_genesis, network_id};
use lumina_node::node::NodeConfig;
use lumina_node::store::IndexedDbStore;

use crate::utils::{js_value_from_display, Network};
use crate::utils::{is_chrome, js_value_from_display, JsValueToJsError, Network};
use crate::worker::commands::{CheckableResponseExt, NodeCommand, SingleHeaderQuery};
use crate::worker::{spawn_worker, WorkerClient, WorkerError};
use crate::worker::{worker_script_url, WorkerClient, WorkerError};
use crate::wrapper::libp2p::NetworkInfoSnapshot;
use crate::Result;

const LUMINA_SHARED_WORKER_NAME: &str = "lumina";
const LUMINA_WORKER_NAME: &str = "lumina";

/// Config for the lumina wasm node.
#[wasm_bindgen(js_name = NodeConfig)]
Expand All @@ -37,11 +37,25 @@ pub struct WasmNodeConfig {
pub bootnodes: Vec<String>,
}

/// `NodeDriver` represents lumina node running in a dedicated Worker/SharedWorker.
/// It's responsible for sending commands and receiving responses from the node.
#[wasm_bindgen(js_name = NodeClient)]
struct NodeDriver {
_worker: SharedWorker,
_onerror_callback: Closure<dyn Fn(MessageEvent)>,
channel: WorkerClient,
client: WorkerClient,
}

/// Type of worker to run lumina in. Allows overriding automatically detected worker kind
/// (which should usually be appropriate).
#[wasm_bindgen]
pub enum NodeWorkerKind {
/// Run in [`SharedWorker`]
///
/// [`SharedWorker`]: https://developer.mozilla.org/en-US/docs/Web/API/SharedWorker
Shared,
/// Run in [`Worker`]
///
/// [`Worker`]: https://developer.mozilla.org/en-US/docs/Web/API/Worker
Dedicated,
}

#[wasm_bindgen(js_class = NodeClient)]
Expand All @@ -50,27 +64,40 @@ impl NodeDriver {
/// Note that single Shared Worker can be accessed from multiple tabs, so Lumina may
/// already have been started. Otherwise it needs to be started with [`NodeDriver::start`].
#[wasm_bindgen(constructor)]
pub async fn new() -> Result<NodeDriver> {
let worker = spawn_worker(LUMINA_SHARED_WORKER_NAME)?;

let onerror_callback: Closure<dyn Fn(MessageEvent)> = Closure::new(|ev: MessageEvent| {
error!("received error from SharedWorker: {:?}", ev.to_string());
});
worker.set_onerror(Some(onerror_callback.as_ref().unchecked_ref()));
pub async fn new(worker_type: Option<NodeWorkerKind>) -> Result<NodeDriver> {
let url = worker_script_url();
let mut opts = WorkerOptions::new();
opts.type_(WorkerType::Module);
opts.name(LUMINA_WORKER_NAME);

let default_worker_type = if is_chrome() {
NodeWorkerKind::Dedicated
} else {
NodeWorkerKind::Shared
};

let channel = WorkerClient::new(worker.port());
let client = match worker_type.unwrap_or(default_worker_type) {
NodeWorkerKind::Shared => {
info!("Starting SharedWorker");
let worker = SharedWorker::new_with_worker_options(&url, &opts)
.to_error("could not create SharedWorker")?;
WorkerClient::from(worker)
}
NodeWorkerKind::Dedicated => {
info!("Starting Worker");
let worker =
Worker::new_with_options(&url, &opts).to_error("could not create Worker")?;
WorkerClient::from(worker)
}
};

Ok(Self {
_worker: worker,
_onerror_callback: onerror_callback,
channel,
})
Ok(Self { client })
}

/// Check whether Lumina is currently running
pub async fn is_running(&self) -> Result<bool> {
let command = NodeCommand::IsRunning;
let response = self.channel.exec(command).await?;
let response = self.client.exec(command).await?;
let running = response.into_is_running().check_variant()?;

Ok(running)
Expand All @@ -79,7 +106,7 @@ impl NodeDriver {
/// Start a node with the provided config, if it's not running
pub async fn start(&self, config: WasmNodeConfig) -> Result<()> {
let command = NodeCommand::StartNode(config);
let response = self.channel.exec(command).await?;
let response = self.client.exec(command).await?;
let started = response.into_node_started().check_variant()?;

Ok(started?)
Expand All @@ -88,7 +115,7 @@ impl NodeDriver {
/// Get node's local peer ID.
pub async fn local_peer_id(&self) -> Result<String> {
let command = NodeCommand::GetLocalPeerId;
let response = self.channel.exec(command).await?;
let response = self.client.exec(command).await?;
let peer_id = response.into_local_peer_id().check_variant()?;

Ok(peer_id)
Expand All @@ -97,7 +124,7 @@ impl NodeDriver {
/// Get current [`PeerTracker`] info.
pub async fn peer_tracker_info(&self) -> Result<JsValue> {
let command = NodeCommand::GetPeerTrackerInfo;
let response = self.channel.exec(command).await?;
let response = self.client.exec(command).await?;
let peer_info = response.into_peer_tracker_info().check_variant()?;

Ok(to_value(&peer_info)?)
Expand All @@ -106,7 +133,7 @@ impl NodeDriver {
/// Wait until the node is connected to at least 1 peer.
pub async fn wait_connected(&self) -> Result<()> {
let command = NodeCommand::WaitConnected { trusted: false };
let response = self.channel.exec(command).await?;
let response = self.client.exec(command).await?;
let result = response.into_connected().check_variant()?;

Ok(result?)
Expand All @@ -115,7 +142,7 @@ impl NodeDriver {
/// Wait until the node is connected to at least 1 trusted peer.
pub async fn wait_connected_trusted(&self) -> Result<()> {
let command = NodeCommand::WaitConnected { trusted: true };
let response = self.channel.exec(command).await?;
let response = self.client.exec(command).await?;
let result = response.into_connected().check_variant()?;

Ok(result?)
Expand All @@ -124,7 +151,7 @@ impl NodeDriver {
/// Get current network info.
pub async fn network_info(&self) -> Result<NetworkInfoSnapshot> {
let command = NodeCommand::GetNetworkInfo;
let response = self.channel.exec(command).await?;
let response = self.client.exec(command).await?;
let network_info = response.into_network_info().check_variant()?;

Ok(network_info?)
Expand All @@ -133,7 +160,7 @@ impl NodeDriver {
/// Get all the multiaddresses on which the node listens.
pub async fn listeners(&self) -> Result<Array> {
let command = NodeCommand::GetListeners;
let response = self.channel.exec(command).await?;
let response = self.client.exec(command).await?;
let listeners = response.into_listeners().check_variant()?;
let result = listeners?.iter().map(js_value_from_display).collect();

Expand All @@ -143,7 +170,7 @@ impl NodeDriver {
/// Get all the peers that node is connected to.
pub async fn connected_peers(&self) -> Result<Array> {
let command = NodeCommand::GetConnectedPeers;
let response = self.channel.exec(command).await?;
let response = self.client.exec(command).await?;
let peers = response.into_connected_peers().check_variant()?;
let result = peers?.iter().map(js_value_from_display).collect();

Expand All @@ -156,7 +183,7 @@ impl NodeDriver {
peer_id: peer_id.parse()?,
is_trusted,
};
let response = self.channel.exec(command).await?;
let response = self.client.exec(command).await?;
let set_result = response.into_set_peer_trust().check_variant()?;

Ok(set_result?)
Expand All @@ -165,7 +192,7 @@ impl NodeDriver {
/// Request the head header from the network.
pub async fn request_head_header(&self) -> Result<JsValue> {
let command = NodeCommand::RequestHeader(SingleHeaderQuery::Head);
let response = self.channel.exec(command).await?;
let response = self.client.exec(command).await?;
let header = response.into_header().check_variant()?;

Ok(header.into_result()?)
Expand All @@ -174,7 +201,7 @@ impl NodeDriver {
/// Request a header for the block with a given hash from the network.
pub async fn request_header_by_hash(&self, hash: &str) -> Result<JsValue> {
let command = NodeCommand::RequestHeader(SingleHeaderQuery::ByHash(hash.parse()?));
let response = self.channel.exec(command).await?;
let response = self.client.exec(command).await?;
let header = response.into_header().check_variant()?;

Ok(header.into_result()?)
Expand All @@ -183,7 +210,7 @@ impl NodeDriver {
/// Request a header for the block with a given height from the network.
pub async fn request_header_by_height(&self, height: u64) -> Result<JsValue> {
let command = NodeCommand::RequestHeader(SingleHeaderQuery::ByHeight(height));
let response = self.channel.exec(command).await?;
let response = self.client.exec(command).await?;
let header = response.into_header().check_variant()?;

Ok(header.into_result()?)
Expand All @@ -201,7 +228,7 @@ impl NodeDriver {
from: from_header,
amount,
};
let response = self.channel.exec(command).await?;
let response = self.client.exec(command).await?;
let headers = response.into_headers().check_variant()?;

Ok(headers.into_result()?)
Expand All @@ -210,7 +237,7 @@ impl NodeDriver {
/// Get current header syncing info.
pub async fn syncer_info(&self) -> Result<JsValue> {
let command = NodeCommand::GetSyncerInfo;
let response = self.channel.exec(command).await?;
let response = self.client.exec(command).await?;
let syncer_info = response.into_syncer_info().check_variant()?;

Ok(to_value(&syncer_info?)?)
Expand All @@ -219,7 +246,7 @@ impl NodeDriver {
/// Get the latest header announced in the network.
pub async fn get_network_head_header(&self) -> Result<JsValue> {
let command = NodeCommand::LastSeenNetworkHead;
let response = self.channel.exec(command).await?;
let response = self.client.exec(command).await?;
let header = response.into_last_seen_network_head().check_variant()?;

Ok(header)
Expand All @@ -228,7 +255,7 @@ impl NodeDriver {
/// Get the latest locally synced header.
pub async fn get_local_head_header(&self) -> Result<JsValue> {
let command = NodeCommand::GetHeader(SingleHeaderQuery::Head);
let response = self.channel.exec(command).await?;
let response = self.client.exec(command).await?;
let header = response.into_header().check_variant()?;

Ok(header.into_result()?)
Expand All @@ -237,7 +264,7 @@ impl NodeDriver {
/// Get a synced header for the block with a given hash.
pub async fn get_header_by_hash(&self, hash: &str) -> Result<JsValue> {
let command = NodeCommand::GetHeader(SingleHeaderQuery::ByHash(hash.parse()?));
let response = self.channel.exec(command).await?;
let response = self.client.exec(command).await?;
let header = response.into_header().check_variant()?;

Ok(header.into_result()?)
Expand All @@ -246,7 +273,7 @@ impl NodeDriver {
/// Get a synced header for the block with a given height.
pub async fn get_header_by_height(&self, height: u64) -> Result<JsValue> {
let command = NodeCommand::GetHeader(SingleHeaderQuery::ByHeight(height));
let response = self.channel.exec(command).await?;
let response = self.client.exec(command).await?;
let header = response.into_header().check_variant()?;

Ok(header.into_result()?)
Expand All @@ -270,7 +297,7 @@ impl NodeDriver {
start_height,
end_height,
};
let response = self.channel.exec(command).await?;
let response = self.client.exec(command).await?;
let headers = response.into_headers().check_variant()?;

Ok(headers.into_result()?)
Expand All @@ -279,7 +306,7 @@ impl NodeDriver {
/// Get data sampling metadata of an already sampled height.
pub async fn get_sampling_metadata(&self, height: u64) -> Result<JsValue> {
let command = NodeCommand::GetSamplingMetadata { height };
let response = self.channel.exec(command).await?;
let response = self.client.exec(command).await?;
let metadata = response.into_sampling_metadata().check_variant()?;

Ok(to_value(&metadata?)?)
Expand All @@ -289,7 +316,7 @@ impl NodeDriver {
/// be processed and new NodeClient needs to be created to restart a node.
pub async fn close(&self) -> Result<()> {
let command = NodeCommand::CloseWorker;
let response = self.channel.exec(command).await?;
let response = self.client.exec(command).await?;
if response.is_worker_closed() {
Ok(())
} else {
Expand Down
Loading

0 comments on commit 7fe0397

Please sign in to comment.