Skip to content

Commit

Permalink
feat(node)!: Implement NodeBuilder and remove NodeConfig (#472)
Browse files Browse the repository at this point in the history
Co-authored-by: Mikołaj Florkiewicz <[email protected]>
Signed-off-by: Yiannis Marangos <[email protected]>
  • Loading branch information
oblique and fl0rek authored Dec 11, 2024
1 parent 82c51f6 commit 7705ece
Show file tree
Hide file tree
Showing 19 changed files with 777 additions and 446 deletions.
34 changes: 1 addition & 33 deletions cli/src/common.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,12 @@
use std::env::current_exe;

use anyhow::Result;
use clap::{Parser, ValueEnum};
use lumina_node::network::Network;
use clap::Parser;

use crate::native;
#[cfg(feature = "browser-node")]
use crate::server;

#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, ValueEnum)]
pub(crate) enum ArgNetwork {
#[default]
Mainnet,
Arabica,
Mocha,
Private,
}

#[derive(Debug, Parser)]
pub(crate) enum CliArgs {
/// Run native node locally
Expand Down Expand Up @@ -64,25 +54,3 @@ fn init_tracing() -> tracing_appender::non_blocking::WorkerGuard {

guard
}

impl From<ArgNetwork> for Network {
fn from(network: ArgNetwork) -> Network {
match network {
ArgNetwork::Mainnet => Network::Mainnet,
ArgNetwork::Arabica => Network::Arabica,
ArgNetwork::Mocha => Network::Mocha,
ArgNetwork::Private => Network::Private,
}
}
}

impl From<Network> for ArgNetwork {
fn from(network: Network) -> ArgNetwork {
match network {
Network::Mainnet => ArgNetwork::Mainnet,
Network::Arabica => ArgNetwork::Arabica,
Network::Mocha => ArgNetwork::Mocha,
Network::Private => ArgNetwork::Private,
}
}
}
71 changes: 35 additions & 36 deletions cli/src/native.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,26 @@ use std::time::Duration;
use anyhow::{bail, Context, Result};
use celestia_rpc::prelude::*;
use celestia_rpc::Client;
use clap::Parser;
use clap::{value_parser, Parser};
use directories::ProjectDirs;
use libp2p::{identity, multiaddr::Protocol, Multiaddr};
use libp2p::multiaddr::{Multiaddr, Protocol};
use lumina_node::blockstore::RedbBlockstore;
use lumina_node::events::NodeEvent;
use lumina_node::network::{canonical_network_bootnodes, network_id, Network};
use lumina_node::node::{Node, NodeConfig};
use lumina_node::network::Network;
use lumina_node::node::Node;
use lumina_node::store::{RedbStore, Store};
use tokio::task::spawn_blocking;
use tracing::info;
use tracing::warn;

use crate::common::ArgNetwork;

const CELESTIA_LOCAL_BRIDGE_RPC_ADDR: &str = "ws://localhost:36658";

#[derive(Debug, Parser)]
pub(crate) struct Params {
/// Network to connect.
#[arg(short, long, value_enum, default_value_t)]
pub(crate) network: ArgNetwork,
#[arg(short, long)]
#[clap(value_parser = value_parser!(Network))]
pub(crate) network: Network,

/// Listening addresses. Can be used multiple times.
#[arg(short, long = "listen")]
Expand All @@ -44,26 +43,12 @@ pub(crate) struct Params {
/// Headers older than syncing window by more than an hour are eligible for pruning.
#[arg(long = "syncing-window", verbatim_doc_comment)]
#[clap(value_parser = parse_duration::parse)]
pub(crate) custom_syncing_window: Option<Duration>,
pub(crate) syncing_window: Option<Duration>,
}

pub(crate) async fn run(args: Params) -> Result<()> {
let network = args.network.into();
let p2p_local_keypair = identity::Keypair::generate_ed25519();

let p2p_bootnodes = if args.bootnodes.is_empty() {
match network {
Network::Private => fetch_bridge_multiaddrs(CELESTIA_LOCAL_BRIDGE_RPC_ADDR).await?,
network => canonical_network_bootnodes(network).collect(),
}
} else {
args.bootnodes
};

let network_id = network_id(network).to_owned();

info!("Initializing store");
let db = open_db(args.store, &network_id).await?;
let db = open_db(args.store, args.network.id()).await?;
let store = RedbStore::new(db.clone()).await?;
let blockstore = RedbBlockstore::new(db);

Expand All @@ -74,18 +59,32 @@ pub(crate) async fn run(args: Params) -> Result<()> {
info!("Initialised store, present headers: {stored_ranges}");
}

let (_node, mut events) = Node::new_subscribed(NodeConfig {
network_id,
p2p_local_keypair,
p2p_bootnodes,
p2p_listen_on: args.listen_addrs,
sync_batch_size: 512,
custom_syncing_window: args.custom_syncing_window,
blockstore,
store,
})
.await
.context("Failed to start node")?;
let mut node_builder = Node::builder()
.store(store)
.blockstore(blockstore)
.network(args.network.clone());

if args.bootnodes.is_empty() {
if args.network.is_custom() {
let bootnodes = fetch_bridge_multiaddrs(CELESTIA_LOCAL_BRIDGE_RPC_ADDR).await?;
node_builder = node_builder.bootnodes(bootnodes);
}
} else {
node_builder = node_builder.bootnodes(args.bootnodes);
}

if !args.listen_addrs.is_empty() {
node_builder = node_builder.listen(args.listen_addrs);
}

if let Some(syncing_window) = args.syncing_window {
node_builder = node_builder.syncing_window(syncing_window);
}

let (_node, mut events) = node_builder
.start_subscribed()
.await
.context("Failed to start node")?;

while let Ok(ev) = events.recv().await {
match ev.event {
Expand Down
55 changes: 29 additions & 26 deletions node-wasm/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,15 @@
use std::time::Duration;

use js_sys::Array;
use libp2p::identity::Keypair;
use serde::{Deserialize, Serialize};
use serde_wasm_bindgen::to_value;
use tracing::{debug, error};
use wasm_bindgen::prelude::*;
use web_sys::BroadcastChannel;

use lumina_node::blockstore::IndexedDbBlockstore;
use lumina_node::network::{canonical_network_bootnodes, network_id};
use lumina_node::node::NodeConfig;
use lumina_node::network;
use lumina_node::node::NodeBuilder;
use lumina_node::store::IndexedDbStore;

use crate::commands::{CheckableResponseExt, NodeCommand, SingleHeaderQuery};
Expand Down Expand Up @@ -375,51 +374,55 @@ impl NodeClient {
impl WasmNodeConfig {
/// Get the configuration with default bootnodes for provided network
pub fn default(network: Network) -> WasmNodeConfig {
let bootnodes = network::Network::from(network)
.canonical_bootnodes()
.map(|addr| addr.to_string())
.collect::<Vec<_>>();

WasmNodeConfig {
network,
bootnodes: canonical_network_bootnodes(network.into())
.map(|addr| addr.to_string())
.collect::<Vec<_>>(),
bootnodes,
custom_syncing_window_secs: None,
}
}

pub(crate) async fn into_node_config(
pub(crate) async fn into_node_builder(
self,
) -> Result<NodeConfig<IndexedDbBlockstore, IndexedDbStore>> {
let network_id = network_id(self.network.into());
) -> Result<NodeBuilder<IndexedDbBlockstore, IndexedDbStore>> {
let network = network::Network::from(self.network);
let network_id = network.id();

let store = IndexedDbStore::new(network_id)
.await
.context("Failed to open the store")?;
let blockstore = IndexedDbBlockstore::new(&format!("{network_id}-blockstore"))
.await
.context("Failed to open the blockstore")?;

let p2p_local_keypair = Keypair::generate_ed25519();
let mut builder = NodeBuilder::new()
.store(store)
.blockstore(blockstore)
.network(network)
.sync_batch_size(128);

let mut bootnodes = Vec::with_capacity(self.bootnodes.len());

let mut p2p_bootnodes = Vec::with_capacity(self.bootnodes.len());
for addr in self.bootnodes {
let addr = addr
.parse()
.with_context(|| format!("invalid multiaddr: '{addr}"))?;
let resolved_addrs = resolve_dnsaddr_multiaddress(addr).await?;
p2p_bootnodes.extend(resolved_addrs.into_iter());
bootnodes.extend(resolved_addrs.into_iter());
}

builder = builder.bootnodes(bootnodes);

if let Some(secs) = self.custom_syncing_window_secs {
let dur = Duration::from_secs(secs.into());
builder = builder.syncing_window(dur);
}

let syncing_window = self
.custom_syncing_window_secs
.map(|d| Duration::from_secs(d.into()));

Ok(NodeConfig {
network_id: network_id.to_string(),
p2p_bootnodes,
p2p_local_keypair,
p2p_listen_on: vec![],
sync_batch_size: 128,
custom_syncing_window: syncing_window,
blockstore,
store,
})
Ok(builder)
}
}

Expand Down
19 changes: 12 additions & 7 deletions node-wasm/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,18 +61,23 @@ impl From<Network> for network::Network {
Network::Mainnet => network::Network::Mainnet,
Network::Arabica => network::Network::Arabica,
Network::Mocha => network::Network::Mocha,
Network::Private => network::Network::Private,
Network::Private => network::Network::custom("private").expect("invalid network id"),
}
}
}

impl From<network::Network> for Network {
fn from(network: network::Network) -> Network {
impl TryFrom<network::Network> for Network {
type Error = Error;

fn try_from(network: network::Network) -> Result<Network, Error> {
match network {
network::Network::Mainnet => Network::Mainnet,
network::Network::Arabica => Network::Arabica,
network::Network::Mocha => Network::Mocha,
network::Network::Private => Network::Private,
network::Network::Mainnet => Ok(Network::Mainnet),
network::Network::Arabica => Ok(Network::Arabica),
network::Network::Mocha => Ok(Network::Mocha),
network::Network::Custom(id) => match id.as_ref() {
"private" => Ok(Network::Private),
_ => Err(Error::new("Unsupported network id: {id}")),
},
}
}
}
Expand Down
12 changes: 2 additions & 10 deletions node-wasm/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use web_sys::BroadcastChannel;
use lumina_node::blockstore::IndexedDbBlockstore;
use lumina_node::events::{EventSubscriber, NodeEventInfo};
use lumina_node::node::{Node, SyncingInfo};
use lumina_node::store::{IndexedDbStore, SamplingMetadata, Store};
use lumina_node::store::{IndexedDbStore, SamplingMetadata};

use crate::client::WasmNodeConfig;
use crate::commands::{NodeCommand, SingleHeaderQuery, WorkerResponse};
Expand Down Expand Up @@ -123,15 +123,7 @@ impl NodeWorker {

impl NodeWorkerInstance {
async fn new(events_channel_name: &str, config: WasmNodeConfig) -> Result<Self> {
let config = config.into_node_config().await?;

if let Ok(store_height) = config.store.head_height().await {
info!("Initialised store with head height: {store_height}");
} else {
info!("Initialised new empty store");
}

let (node, events_sub) = Node::new_subscribed(config).await?;
let (node, events_sub) = config.into_node_builder().await?.start_subscribed().await?;

let events_channel = BroadcastChannel::new(events_channel_name)
.context("Failed to allocate BroadcastChannel")?;
Expand Down
38 changes: 15 additions & 23 deletions node/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,17 @@ A crate to configure, run and interact with Celestia's data availability nodes.

```rust,no_run
use std::sync::Arc;
use libp2p::{identity, multiaddr::Protocol, Multiaddr};
use lumina_node::blockstore::RedbBlockstore;
use lumina_node::network::{
canonical_network_bootnodes, network_id, Network,
};
use lumina_node::node::{Node, NodeConfig};
use lumina_node::network::Network;
use lumina_node::node::Node;
use lumina_node::store::RedbStore;
use redb::Database;
use tokio::task::spawn_blocking;
#[tokio::main]
async fn main() {
let p2p_local_keypair = identity::Keypair::generate_ed25519();
let network = Network::Mainnet;
let network_id = network_id(network).to_owned();
let p2p_bootnodes = canonical_network_bootnodes(network).collect();
let db = spawn_blocking(|| redb::Database::create("path/to/db"))
let db = spawn_blocking(|| Database::create("lumina.redb"))
.await
.expect("Failed to join")
.expect("Failed to open the database");
Expand All @@ -31,24 +25,22 @@ async fn main() {
.expect("Failed to create a store");
let blockstore = RedbBlockstore::new(db);
let node = Node::new(NodeConfig {
network_id,
p2p_local_keypair,
p2p_bootnodes,
p2p_listen_on: vec!["/ip4/0.0.0.0/tcp/0".parse().unwrap()],
sync_batch_size: 512,
custom_syncing_window: None,
blockstore,
store,
})
.await
.expect("Failed to start node");
let node = Node::builder()
.store(store)
.blockstore(blockstore)
.network(Network::Mainnet)
.listen(["/ip4/0.0.0.0/tcp/0".parse().unwrap()])
.start()
.await
.expect("Failed to build and start node");
node.wait_connected().await.expect("Failed to connect");
let header = node
.request_header_by_height(15)
.await
.expect("Height not found");
println!("{}", serde_json::to_string_pretty(&header).unwrap());
}
```
Loading

0 comments on commit 7705ece

Please sign in to comment.