From 35fac5a45c2a5170885e125ed04d3f4c3683a443 Mon Sep 17 00:00:00 2001 From: Yiannis Marangos Date: Tue, 17 Dec 2024 14:28:59 +0200 Subject: [PATCH] feat(cli): Add `in-memory-store` and `pruning-delay` parameters (#490) --- Cargo.lock | 1 + cli/Cargo.toml | 1 + cli/src/native.rs | 85 +++++++++++++++++++++++++++++++++++------------ 3 files changed, 66 insertions(+), 21 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 74b19d66..47caaaa7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3354,6 +3354,7 @@ version = "0.5.2" dependencies = [ "anyhow", "axum", + "blockstore", "celestia-rpc", "celestia-types", "clap", diff --git a/cli/Cargo.toml b/cli/Cargo.toml index ca20db78..bebb050e 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -24,6 +24,7 @@ name = "lumina" path = "src/main.rs" [target.'cfg(not(target_arch = "wasm32"))'.dependencies] +blockstore.workspace = true celestia-rpc = { workspace = true, features = ["p2p"] } celestia-types.workspace = true libp2p.workspace = true diff --git a/cli/src/native.rs b/cli/src/native.rs index e06cf5d2..49f1875c 100644 --- a/cli/src/native.rs +++ b/cli/src/native.rs @@ -4,22 +4,26 @@ use std::sync::Arc; use std::time::Duration; use anyhow::{bail, Context, Result}; +use blockstore::EitherBlockstore; use celestia_rpc::prelude::*; use celestia_rpc::Client; use clap::{value_parser, Parser}; use directories::ProjectDirs; use libp2p::multiaddr::{Multiaddr, Protocol}; -use lumina_node::blockstore::RedbBlockstore; +use lumina_node::blockstore::{InMemoryBlockstore, RedbBlockstore}; use lumina_node::events::NodeEvent; use lumina_node::network::Network; -use lumina_node::node::Node; -use lumina_node::store::{RedbStore, Store}; +use lumina_node::node::{Node, MIN_PRUNING_DELAY, MIN_SAMPLING_WINDOW}; +use lumina_node::store::{EitherStore, InMemoryStore, RedbStore, Store as _}; use tokio::task::spawn_blocking; use tracing::info; use tracing::warn; const CELESTIA_LOCAL_BRIDGE_RPC_ADDR: &str = "ws://localhost:36658"; +type Blockstore = EitherBlockstore; +type Store = EitherStore; + #[derive(Debug, Parser)] pub(crate) struct Params { /// Network to connect. @@ -36,34 +40,51 @@ pub(crate) struct Params { pub(crate) bootnodes: Vec, /// Persistent header store path. - #[arg(short, long = "store")] + #[arg(short, long)] pub(crate) store: Option, - /// Sampling window size, defines maximum age of headers considered for syncing and sampling. - /// Headers older than sampling window by more than an hour are eligible for pruning. - #[arg(long = "sampling-window", verbatim_doc_comment)] + /// Use in-memory store. + #[arg(long)] + pub(crate) in_memory_store: bool, + + /// Sampling window defines maximum age of a block considered for syncing and sampling. + #[arg(long)] #[clap(value_parser = parse_duration::parse)] pub(crate) sampling_window: Option, + + /// Pruning delay defines how much time the pruner should wait after sampling window in + /// order to prune the block. + #[arg(long)] + #[clap(value_parser = parse_duration::parse)] + pub(crate) pruning_delay: Option, } pub(crate) async fn run(args: Params) -> Result<()> { - info!("Initializing store"); - let db = open_db(args.store, args.network.id()).await?; - let store = RedbStore::new(db.clone()).await?; - let blockstore = RedbBlockstore::new(db); - - let stored_ranges = store.get_stored_header_ranges().await?; - if stored_ranges.is_empty() { - info!("Initialised new store"); + let (blockstore, store) = if args.in_memory_store { + open_in_memory_stores() } else { - info!("Initialised store, present headers: {stored_ranges}"); - } + open_db_stores(args.store, args.network.id()).await? + }; let mut node_builder = Node::builder() .store(store) .blockstore(blockstore) .network(args.network.clone()); + if let Some(sampling_window) = args.sampling_window { + node_builder = node_builder.sampling_window(sampling_window); + } else if args.in_memory_store { + // In-memory stores are memory hungry, so we lower sampling window. + node_builder = node_builder.sampling_window(MIN_SAMPLING_WINDOW); + } + + if let Some(pruning_delay) = args.pruning_delay { + node_builder = node_builder.pruning_delay(pruning_delay); + } else if args.in_memory_store { + // In-memory stores are memory hungry, so we lower pruning window. + node_builder = node_builder.pruning_delay(MIN_PRUNING_DELAY); + } + if args.bootnodes.is_empty() { if args.network.is_custom() { let bootnodes = fetch_bridge_multiaddrs(CELESTIA_LOCAL_BRIDGE_RPC_ADDR).await?; @@ -77,10 +98,6 @@ pub(crate) async fn run(args: Params) -> Result<()> { node_builder = node_builder.listen(args.listen_addrs); } - if let Some(sampling_window) = args.sampling_window { - node_builder = node_builder.sampling_window(sampling_window); - } - let (_node, mut events) = node_builder .start_subscribed() .await @@ -98,6 +115,32 @@ pub(crate) async fn run(args: Params) -> Result<()> { Ok(()) } +fn open_in_memory_stores() -> (Blockstore, Store) { + info!("Initializing in-memory store"); + let store = InMemoryStore::new(); + let blockstore = InMemoryBlockstore::new(); + (EitherBlockstore::Left(blockstore), EitherStore::Left(store)) +} + +async fn open_db_stores(path: Option, network_id: &str) -> Result<(Blockstore, Store)> { + info!("Initializing store"); + let db = open_db(path, network_id).await?; + let store = RedbStore::new(db.clone()).await?; + let blockstore = RedbBlockstore::new(db); + + let stored_ranges = store.get_stored_header_ranges().await?; + if stored_ranges.is_empty() { + info!("Initialised new store",); + } else { + info!("Initialised store, present headers: {stored_ranges}"); + } + + Ok(( + EitherBlockstore::Right(blockstore), + EitherStore::Right(store), + )) +} + async fn open_db(path: Option, network_id: &str) -> Result> { let network_id = network_id.to_owned();