Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(node)!: Implement NodeBuilder and remove NodeConfig #472

Merged
merged 23 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 1 addition & 33 deletions cli/src/common.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,12 @@
use std::env::current_exe;

use anyhow::Result;
use clap::{Parser, ValueEnum};
use lumina_node::network::Network;
use clap::Parser;

use crate::native;
#[cfg(feature = "browser-node")]
use crate::server;

#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, ValueEnum)]
pub(crate) enum ArgNetwork {
#[default]
Mainnet,
Arabica,
Mocha,
Private,
}

#[derive(Debug, Parser)]
pub(crate) enum CliArgs {
/// Run native node locally
Expand Down Expand Up @@ -64,25 +54,3 @@ fn init_tracing() -> tracing_appender::non_blocking::WorkerGuard {

guard
}

impl From<ArgNetwork> for Network {
fn from(network: ArgNetwork) -> Network {
match network {
ArgNetwork::Mainnet => Network::Mainnet,
ArgNetwork::Arabica => Network::Arabica,
ArgNetwork::Mocha => Network::Mocha,
ArgNetwork::Private => Network::Private,
}
}
}

impl From<Network> for ArgNetwork {
fn from(network: Network) -> ArgNetwork {
match network {
Network::Mainnet => ArgNetwork::Mainnet,
Network::Arabica => ArgNetwork::Arabica,
Network::Mocha => ArgNetwork::Mocha,
Network::Private => ArgNetwork::Private,
}
}
}
71 changes: 35 additions & 36 deletions cli/src/native.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,26 @@ use std::time::Duration;
use anyhow::{bail, Context, Result};
use celestia_rpc::prelude::*;
use celestia_rpc::Client;
use clap::Parser;
use clap::{value_parser, Parser};
use directories::ProjectDirs;
use libp2p::{identity, multiaddr::Protocol, Multiaddr};
use libp2p::multiaddr::{Multiaddr, Protocol};
use lumina_node::blockstore::RedbBlockstore;
use lumina_node::events::NodeEvent;
use lumina_node::network::{canonical_network_bootnodes, network_id, Network};
use lumina_node::node::{Node, NodeConfig};
use lumina_node::network::Network;
use lumina_node::node::Node;
use lumina_node::store::{RedbStore, Store};
use tokio::task::spawn_blocking;
use tracing::info;
use tracing::warn;

use crate::common::ArgNetwork;

const CELESTIA_LOCAL_BRIDGE_RPC_ADDR: &str = "ws://localhost:36658";

#[derive(Debug, Parser)]
pub(crate) struct Params {
/// Network to connect.
#[arg(short, long, value_enum, default_value_t)]
pub(crate) network: ArgNetwork,
#[arg(short, long)]
#[clap(value_parser = value_parser!(Network))]
pub(crate) network: Network,

/// Listening addresses. Can be used multiple times.
#[arg(short, long = "listen")]
Expand All @@ -44,26 +43,12 @@ pub(crate) struct Params {
/// Headers older than syncing window by more than an hour are eligible for pruning.
#[arg(long = "syncing-window", verbatim_doc_comment)]
#[clap(value_parser = parse_duration::parse)]
pub(crate) custom_syncing_window: Option<Duration>,
pub(crate) syncing_window: Option<Duration>,
}

pub(crate) async fn run(args: Params) -> Result<()> {
let network = args.network.into();
let p2p_local_keypair = identity::Keypair::generate_ed25519();

let p2p_bootnodes = if args.bootnodes.is_empty() {
match network {
Network::Private => fetch_bridge_multiaddrs(CELESTIA_LOCAL_BRIDGE_RPC_ADDR).await?,
network => canonical_network_bootnodes(network).collect(),
}
} else {
args.bootnodes
};

let network_id = network_id(network).to_owned();

info!("Initializing store");
let db = open_db(args.store, &network_id).await?;
let db = open_db(args.store, args.network.id()).await?;
let store = RedbStore::new(db.clone()).await?;
let blockstore = RedbBlockstore::new(db);

Expand All @@ -74,18 +59,32 @@ pub(crate) async fn run(args: Params) -> Result<()> {
info!("Initialised store, present headers: {stored_ranges}");
}

let (_node, mut events) = Node::new_subscribed(NodeConfig {
network_id,
p2p_local_keypair,
p2p_bootnodes,
p2p_listen_on: args.listen_addrs,
sync_batch_size: 512,
custom_syncing_window: args.custom_syncing_window,
blockstore,
store,
})
.await
.context("Failed to start node")?;
let mut node_builder = Node::builder()
.store(store)
.blockstore(blockstore)
.network(args.network.clone());

if args.bootnodes.is_empty() {
if args.network.is_custom() {
let bootnodes = fetch_bridge_multiaddrs(CELESTIA_LOCAL_BRIDGE_RPC_ADDR).await?;
node_builder = node_builder.bootnodes(bootnodes);
}
} else {
node_builder = node_builder.bootnodes(args.bootnodes);
}

if !args.listen_addrs.is_empty() {
node_builder = node_builder.listen(args.listen_addrs);
}

if let Some(syncing_window) = args.syncing_window {
node_builder = node_builder.syncing_window(syncing_window);
}

let (_node, mut events) = node_builder
.start_subscribed()
.await
.context("Failed to start node")?;

while let Ok(ev) = events.recv().await {
match ev.event {
Expand Down
55 changes: 29 additions & 26 deletions node-wasm/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,15 @@
use std::time::Duration;

use js_sys::Array;
use libp2p::identity::Keypair;
use serde::{Deserialize, Serialize};
use serde_wasm_bindgen::to_value;
use tracing::{debug, error};
use wasm_bindgen::prelude::*;
use web_sys::BroadcastChannel;

use lumina_node::blockstore::IndexedDbBlockstore;
use lumina_node::network::{canonical_network_bootnodes, network_id};
use lumina_node::node::NodeConfig;
use lumina_node::network;
use lumina_node::node::NodeBuilder;
use lumina_node::store::IndexedDbStore;

use crate::commands::{CheckableResponseExt, NodeCommand, SingleHeaderQuery};
Expand Down Expand Up @@ -375,51 +374,55 @@ impl NodeClient {
impl WasmNodeConfig {
/// Get the configuration with default bootnodes for provided network
pub fn default(network: Network) -> WasmNodeConfig {
let bootnodes = network::Network::from(network)
.canonical_bootnodes()
.map(|addr| addr.to_string())
.collect::<Vec<_>>();

WasmNodeConfig {
network,
bootnodes: canonical_network_bootnodes(network.into())
.map(|addr| addr.to_string())
.collect::<Vec<_>>(),
bootnodes,
custom_syncing_window_secs: None,
}
}

pub(crate) async fn into_node_config(
pub(crate) async fn into_node_builder(
self,
) -> Result<NodeConfig<IndexedDbBlockstore, IndexedDbStore>> {
let network_id = network_id(self.network.into());
) -> Result<NodeBuilder<IndexedDbBlockstore, IndexedDbStore>> {
let network = network::Network::from(self.network);
let network_id = network.id();

let store = IndexedDbStore::new(network_id)
.await
.context("Failed to open the store")?;
let blockstore = IndexedDbBlockstore::new(&format!("{network_id}-blockstore"))
.await
.context("Failed to open the blockstore")?;

let p2p_local_keypair = Keypair::generate_ed25519();
let mut builder = NodeBuilder::new()
.store(store)
.blockstore(blockstore)
.network(network)
.sync_batch_size(128);

let mut bootnodes = Vec::with_capacity(self.bootnodes.len());

let mut p2p_bootnodes = Vec::with_capacity(self.bootnodes.len());
for addr in self.bootnodes {
let addr = addr
.parse()
.with_context(|| format!("invalid multiaddr: '{addr}"))?;
let resolved_addrs = resolve_dnsaddr_multiaddress(addr).await?;
p2p_bootnodes.extend(resolved_addrs.into_iter());
bootnodes.extend(resolved_addrs.into_iter());
}

builder = builder.bootnodes(bootnodes);

if let Some(secs) = self.custom_syncing_window_secs {
let dur = Duration::from_secs(secs.into());
builder = builder.syncing_window(dur);
}

let syncing_window = self
.custom_syncing_window_secs
.map(|d| Duration::from_secs(d.into()));

Ok(NodeConfig {
network_id: network_id.to_string(),
p2p_bootnodes,
p2p_local_keypair,
p2p_listen_on: vec![],
sync_batch_size: 128,
custom_syncing_window: syncing_window,
blockstore,
store,
})
Ok(builder)
}
}

Expand Down
19 changes: 12 additions & 7 deletions node-wasm/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,18 +61,23 @@ impl From<Network> for network::Network {
Network::Mainnet => network::Network::Mainnet,
Network::Arabica => network::Network::Arabica,
Network::Mocha => network::Network::Mocha,
Network::Private => network::Network::Private,
Network::Private => network::Network::custom("private").expect("invalid network id"),
}
}
}

impl From<network::Network> for Network {
fn from(network: network::Network) -> Network {
impl TryFrom<network::Network> for Network {
type Error = Error;

fn try_from(network: network::Network) -> Result<Network, Error> {
match network {
network::Network::Mainnet => Network::Mainnet,
network::Network::Arabica => Network::Arabica,
network::Network::Mocha => Network::Mocha,
network::Network::Private => Network::Private,
network::Network::Mainnet => Ok(Network::Mainnet),
network::Network::Arabica => Ok(Network::Arabica),
network::Network::Mocha => Ok(Network::Mocha),
network::Network::Custom(id) => match id.as_ref() {
"private" => Ok(Network::Private),
_ => Err(Error::new("Unsupported network id: {id}")),
},
}
}
}
Expand Down
12 changes: 2 additions & 10 deletions node-wasm/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use web_sys::BroadcastChannel;
use lumina_node::blockstore::IndexedDbBlockstore;
use lumina_node::events::{EventSubscriber, NodeEventInfo};
use lumina_node::node::{Node, SyncingInfo};
use lumina_node::store::{IndexedDbStore, SamplingMetadata, Store};
use lumina_node::store::{IndexedDbStore, SamplingMetadata};

use crate::client::WasmNodeConfig;
use crate::commands::{NodeCommand, SingleHeaderQuery, WorkerResponse};
Expand Down Expand Up @@ -123,15 +123,7 @@ impl NodeWorker {

impl NodeWorkerInstance {
async fn new(events_channel_name: &str, config: WasmNodeConfig) -> Result<Self> {
let config = config.into_node_config().await?;

if let Ok(store_height) = config.store.head_height().await {
info!("Initialised store with head height: {store_height}");
} else {
info!("Initialised new empty store");
}

let (node, events_sub) = Node::new_subscribed(config).await?;
let (node, events_sub) = config.into_node_builder().await?.start_subscribed().await?;

let events_channel = BroadcastChannel::new(events_channel_name)
.context("Failed to allocate BroadcastChannel")?;
Expand Down
38 changes: 15 additions & 23 deletions node/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,17 @@ A crate to configure, run and interact with Celestia's data availability nodes.

```rust,no_run
use std::sync::Arc;
use libp2p::{identity, multiaddr::Protocol, Multiaddr};

use lumina_node::blockstore::RedbBlockstore;
use lumina_node::network::{
canonical_network_bootnodes, network_id, Network,
};
use lumina_node::node::{Node, NodeConfig};
use lumina_node::network::Network;
use lumina_node::node::Node;
use lumina_node::store::RedbStore;
use redb::Database;
use tokio::task::spawn_blocking;

#[tokio::main]
async fn main() {
let p2p_local_keypair = identity::Keypair::generate_ed25519();
let network = Network::Mainnet;
let network_id = network_id(network).to_owned();
let p2p_bootnodes = canonical_network_bootnodes(network).collect();

let db = spawn_blocking(|| redb::Database::create("path/to/db"))
let db = spawn_blocking(|| Database::create("lumina.redb"))
.await
.expect("Failed to join")
.expect("Failed to open the database");
Expand All @@ -31,24 +25,22 @@ async fn main() {
.expect("Failed to create a store");
let blockstore = RedbBlockstore::new(db);

let node = Node::new(NodeConfig {
network_id,
p2p_local_keypair,
p2p_bootnodes,
p2p_listen_on: vec!["/ip4/0.0.0.0/tcp/0".parse().unwrap()],
sync_batch_size: 512,
custom_syncing_window: None,
blockstore,
store,
})
.await
.expect("Failed to start node");
let node = Node::builder()
.store(store)
.blockstore(blockstore)
.network(Network::Mainnet)
.listen(["/ip4/0.0.0.0/tcp/0".parse().unwrap()])
.start()
.await
.expect("Failed to build and start node");

node.wait_connected().await.expect("Failed to connect");

let header = node
.request_header_by_height(15)
.await
.expect("Height not found");

println!("{}", serde_json::to_string_pretty(&header).unwrap());
}
```
Loading
Loading