Skip to content

Commit

Permalink
feat(cli): Add in-memory-store and pruning-delay parameters (#490)
Browse files Browse the repository at this point in the history
  • Loading branch information
oblique authored Dec 17, 2024
1 parent 5cfe280 commit 35fac5a
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 21 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ name = "lumina"
path = "src/main.rs"

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
blockstore.workspace = true
celestia-rpc = { workspace = true, features = ["p2p"] }
celestia-types.workspace = true
libp2p.workspace = true
Expand Down
85 changes: 64 additions & 21 deletions cli/src/native.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,26 @@ use std::sync::Arc;
use std::time::Duration;

use anyhow::{bail, Context, Result};
use blockstore::EitherBlockstore;
use celestia_rpc::prelude::*;
use celestia_rpc::Client;
use clap::{value_parser, Parser};
use directories::ProjectDirs;
use libp2p::multiaddr::{Multiaddr, Protocol};
use lumina_node::blockstore::RedbBlockstore;
use lumina_node::blockstore::{InMemoryBlockstore, RedbBlockstore};
use lumina_node::events::NodeEvent;
use lumina_node::network::Network;
use lumina_node::node::Node;
use lumina_node::store::{RedbStore, Store};
use lumina_node::node::{Node, MIN_PRUNING_DELAY, MIN_SAMPLING_WINDOW};
use lumina_node::store::{EitherStore, InMemoryStore, RedbStore, Store as _};
use tokio::task::spawn_blocking;
use tracing::info;
use tracing::warn;

const CELESTIA_LOCAL_BRIDGE_RPC_ADDR: &str = "ws://localhost:36658";

type Blockstore = EitherBlockstore<InMemoryBlockstore, RedbBlockstore>;
type Store = EitherStore<InMemoryStore, RedbStore>;

#[derive(Debug, Parser)]
pub(crate) struct Params {
/// Network to connect.
Expand All @@ -36,34 +40,51 @@ pub(crate) struct Params {
pub(crate) bootnodes: Vec<Multiaddr>,

/// Persistent header store path.
#[arg(short, long = "store")]
#[arg(short, long)]
pub(crate) store: Option<PathBuf>,

/// Sampling window size, defines maximum age of headers considered for syncing and sampling.
/// Headers older than sampling window by more than an hour are eligible for pruning.
#[arg(long = "sampling-window", verbatim_doc_comment)]
/// Use in-memory store.
#[arg(long)]
pub(crate) in_memory_store: bool,

/// Sampling window defines maximum age of a block considered for syncing and sampling.
#[arg(long)]
#[clap(value_parser = parse_duration::parse)]
pub(crate) sampling_window: Option<Duration>,

/// Pruning delay defines how much time the pruner should wait after sampling window in
/// order to prune the block.
#[arg(long)]
#[clap(value_parser = parse_duration::parse)]
pub(crate) pruning_delay: Option<Duration>,
}

pub(crate) async fn run(args: Params) -> Result<()> {
info!("Initializing store");
let db = open_db(args.store, args.network.id()).await?;
let store = RedbStore::new(db.clone()).await?;
let blockstore = RedbBlockstore::new(db);

let stored_ranges = store.get_stored_header_ranges().await?;
if stored_ranges.is_empty() {
info!("Initialised new store");
let (blockstore, store) = if args.in_memory_store {
open_in_memory_stores()
} else {
info!("Initialised store, present headers: {stored_ranges}");
}
open_db_stores(args.store, args.network.id()).await?
};

let mut node_builder = Node::builder()
.store(store)
.blockstore(blockstore)
.network(args.network.clone());

if let Some(sampling_window) = args.sampling_window {
node_builder = node_builder.sampling_window(sampling_window);
} else if args.in_memory_store {
// In-memory stores are memory hungry, so we lower sampling window.
node_builder = node_builder.sampling_window(MIN_SAMPLING_WINDOW);
}

if let Some(pruning_delay) = args.pruning_delay {
node_builder = node_builder.pruning_delay(pruning_delay);
} else if args.in_memory_store {
// In-memory stores are memory hungry, so we lower pruning window.
node_builder = node_builder.pruning_delay(MIN_PRUNING_DELAY);
}

if args.bootnodes.is_empty() {
if args.network.is_custom() {
let bootnodes = fetch_bridge_multiaddrs(CELESTIA_LOCAL_BRIDGE_RPC_ADDR).await?;
Expand All @@ -77,10 +98,6 @@ pub(crate) async fn run(args: Params) -> Result<()> {
node_builder = node_builder.listen(args.listen_addrs);
}

if let Some(sampling_window) = args.sampling_window {
node_builder = node_builder.sampling_window(sampling_window);
}

let (_node, mut events) = node_builder
.start_subscribed()
.await
Expand All @@ -98,6 +115,32 @@ pub(crate) async fn run(args: Params) -> Result<()> {
Ok(())
}

fn open_in_memory_stores() -> (Blockstore, Store) {
info!("Initializing in-memory store");
let store = InMemoryStore::new();
let blockstore = InMemoryBlockstore::new();
(EitherBlockstore::Left(blockstore), EitherStore::Left(store))
}

async fn open_db_stores(path: Option<PathBuf>, network_id: &str) -> Result<(Blockstore, Store)> {
info!("Initializing store");
let db = open_db(path, network_id).await?;
let store = RedbStore::new(db.clone()).await?;
let blockstore = RedbBlockstore::new(db);

let stored_ranges = store.get_stored_header_ranges().await?;
if stored_ranges.is_empty() {
info!("Initialised new store",);
} else {
info!("Initialised store, present headers: {stored_ranges}");
}

Ok((
EitherBlockstore::Right(blockstore),
EitherStore::Right(store),
))
}

async fn open_db(path: Option<PathBuf>, network_id: &str) -> Result<Arc<redb::Database>> {
let network_id = network_id.to_owned();

Expand Down

0 comments on commit 35fac5a

Please sign in to comment.