Skip to content

Commit

Permalink
chore(node,node-wasm)!: Rename syncing_window to sampling_window (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
oblique authored Dec 12, 2024
1 parent 7705ece commit 27339cb
Show file tree
Hide file tree
Showing 8 changed files with 59 additions and 57 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ Rust implementation of Celestia's [data availability node](https://github.com/ce
Run Lumina now at [lumina.rs](https://lumina.rs/) and directly verify Celestia.

Supported features:
- Backward and forward synchronization of block headers within syncing window
- Backward and forward synchronization of block headers within sampling window
- Header exchange (`header-ex`) client and server
- Listening for, verifying and redistributing extended headers on gossip protocol (`header-sub`)
- Listening for, verifying and redistributing fraud proofs on gossip protocol (`fraud-sub`)
Expand Down
12 changes: 6 additions & 6 deletions cli/src/native.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ pub(crate) struct Params {
#[arg(short, long = "store")]
pub(crate) store: Option<PathBuf>,

/// Syncing window size, defines maximum age of headers considered for syncing and sampling.
/// Headers older than syncing window by more than an hour are eligible for pruning.
#[arg(long = "syncing-window", verbatim_doc_comment)]
/// Sampling window size, defines maximum age of headers considered for syncing and sampling.
/// Headers older than sampling window by more than an hour are eligible for pruning.
#[arg(long = "sampling-window", verbatim_doc_comment)]
#[clap(value_parser = parse_duration::parse)]
pub(crate) syncing_window: Option<Duration>,
pub(crate) sampling_window: Option<Duration>,
}

pub(crate) async fn run(args: Params) -> Result<()> {
Expand Down Expand Up @@ -77,8 +77,8 @@ pub(crate) async fn run(args: Params) -> Result<()> {
node_builder = node_builder.listen(args.listen_addrs);
}

if let Some(syncing_window) = args.syncing_window {
node_builder = node_builder.syncing_window(syncing_window);
if let Some(sampling_window) = args.sampling_window {
node_builder = node_builder.sampling_window(sampling_window);
}

let (_node, mut events) = node_builder
Expand Down
14 changes: 7 additions & 7 deletions node-wasm/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ pub struct WasmNodeConfig {
/// A list of bootstrap peers to connect to.
#[wasm_bindgen(getter_with_clone)]
pub bootnodes: Vec<String>,
/// Syncing window size, defines maximum age of headers considered for syncing and sampling.
/// Headers older than syncing window by more than an hour are eligible for pruning.
pub custom_syncing_window_secs: Option<u32>,
/// Sampling window size, defines maximum age of headers considered for syncing and sampling.
/// Headers older than sampling window by more than an hour are eligible for pruning.
pub custom_sampling_window_secs: Option<u32>,
}

/// `NodeClient` is responsible for steering [`NodeWorker`] by sending it commands and receiving
Expand Down Expand Up @@ -382,7 +382,7 @@ impl WasmNodeConfig {
WasmNodeConfig {
network,
bootnodes,
custom_syncing_window_secs: None,
custom_sampling_window_secs: None,
}
}

Expand Down Expand Up @@ -417,9 +417,9 @@ impl WasmNodeConfig {

builder = builder.bootnodes(bootnodes);

if let Some(secs) = self.custom_syncing_window_secs {
if let Some(secs) = self.custom_sampling_window_secs {
let dur = Duration::from_secs(secs.into());
builder = builder.syncing_window(dur);
builder = builder.sampling_window(dur);
}

Ok(builder)
Expand Down Expand Up @@ -513,7 +513,7 @@ mod tests {
.start(&WasmNodeConfig {
network: Network::Private,
bootnodes,
custom_syncing_window_secs: None,
custom_sampling_window_secs: None,
})
.await
.unwrap();
Expand Down
8 changes: 4 additions & 4 deletions node/src/daser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ mod tests {
use super::*;
use crate::events::{EventChannel, EventSubscriber};
use crate::executor::sleep;
use crate::node::DEFAULT_SYNCING_WINDOW;
use crate::node::DEFAULT_SAMPLING_WINDOW;
use crate::p2p::shwap::convert_cid;
use crate::p2p::P2pCmd;
use crate::store::InMemoryStore;
Expand Down Expand Up @@ -503,7 +503,7 @@ mod tests {
event_pub: events.publisher(),
p2p: Arc::new(mock),
store: store.clone(),
sampling_window: DEFAULT_SYNCING_WINDOW,
sampling_window: DEFAULT_SAMPLING_WINDOW,
})
.unwrap();

Expand All @@ -530,7 +530,7 @@ mod tests {
event_pub: events.publisher(),
p2p: Arc::new(mock),
store: store.clone(),
sampling_window: DEFAULT_SYNCING_WINDOW,
sampling_window: DEFAULT_SAMPLING_WINDOW,
})
.unwrap();

Expand All @@ -555,7 +555,7 @@ mod tests {
event_pub: events.publisher(),
p2p: Arc::new(mock),
store: store.clone(),
sampling_window: DEFAULT_SYNCING_WINDOW,
sampling_window: DEFAULT_SAMPLING_WINDOW,
})
.unwrap();

Expand Down
12 changes: 7 additions & 5 deletions node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ use crate::syncer::{Syncer, SyncerArgs};
mod builder;

pub use self::builder::{
NodeBuilder, NodeBuilderError, DEFAULT_PRUNING_DELAY, DEFAULT_SYNCING_WINDOW,
MIN_PRUNING_DELAY, MIN_SYNCING_WINDOW,
NodeBuilder, NodeBuilderError, DEFAULT_PRUNING_DELAY, DEFAULT_SAMPLING_WINDOW,
MIN_PRUNING_DELAY, MIN_SAMPLING_WINDOW,
};
pub use crate::daser::DaserError;
pub use crate::p2p::{HeaderExError, P2pError};
Expand Down Expand Up @@ -82,7 +82,7 @@ where
pub(crate) p2p_bootnodes: Vec<Multiaddr>,
pub(crate) p2p_listen_on: Vec<Multiaddr>,
pub(crate) sync_batch_size: u64,
pub(crate) syncing_window: Duration,
pub(crate) sampling_window: Duration,
pub(crate) pruning_window: Duration,
}

Expand Down Expand Up @@ -158,14 +158,16 @@ where
p2p: p2p.clone(),
event_pub: event_channel.publisher(),
batch_size: config.sync_batch_size,
syncing_window: config.syncing_window,
// We sync only what we need to sample. So syncing_window is
// the same as sampling_window.
syncing_window: config.sampling_window,
})?);

let daser = Arc::new(Daser::start(DaserArgs {
p2p: p2p.clone(),
store: store.clone(),
event_pub: event_channel.publisher(),
sampling_window: config.syncing_window,
sampling_window: config.sampling_window,
})?);

let pruner = Arc::new(Pruner::start(PrunerArgs {
Expand Down
54 changes: 27 additions & 27 deletions node/src/node/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ const HOUR: u64 = 60 * 60;
const DAY: u64 = 24 * HOUR;

/// Default maximum age of blocks [`Node`] will synchronise, sample, and store.
pub const DEFAULT_SYNCING_WINDOW: Duration = Duration::from_secs(30 * DAY);
/// Minimum configurable syncing window that can be used in [`NodeBuilder`].
pub const MIN_SYNCING_WINDOW: Duration = Duration::from_secs(60);
pub const DEFAULT_SAMPLING_WINDOW: Duration = Duration::from_secs(30 * DAY);
/// Minimum configurable sampling window that can be used in [`NodeBuilder`].
pub const MIN_SAMPLING_WINDOW: Duration = Duration::from_secs(60);

/// Default delay after the syncing window before [`Node`] prunes the block.
/// Default delay after the sampling window before [`Node`] prunes the block.
pub const DEFAULT_PRUNING_DELAY: Duration = Duration::from_secs(HOUR);
/// Minimum pruning delay that can be used in [`NodeBuilder`].
pub const MIN_PRUNING_DELAY: Duration = Duration::from_secs(60);
Expand All @@ -38,7 +38,7 @@ where
bootnodes: Vec<Multiaddr>,
listen: Vec<Multiaddr>,
sync_batch_size: Option<u64>,
syncing_window: Option<Duration>,
sampling_window: Option<Duration>,
pruning_delay: Option<Duration>,
}

Expand All @@ -49,9 +49,9 @@ pub enum NodeBuilderError {
#[error("Network is not specified")]
NetworkNotSpecified,

/// Syncing window is smaller than [`MIN_SYNCING_WINDOW`].
#[error("Syncing window is {0:?} but cannot be smaller than {MIN_SYNCING_WINDOW:?}")]
SyncingWindowTooSmall(Duration),
/// Sampling window is smaller than [`MIN_SAMPLING_WINDOW`].
#[error("Sampling window is {0:?} but cannot be smaller than {MIN_SAMPLING_WINDOW:?}")]
SamplingWindowTooSmall(Duration),

/// Pruning delay is smaller than [`MIN_PRUNING_DELAY`].
#[error("Pruning delay is {0:?} but cannot be smaller than {MIN_PRUNING_DELAY:?}")]
Expand Down Expand Up @@ -87,7 +87,7 @@ impl NodeBuilder<InMemoryBlockstore, InMemoryStore> {
bootnodes: Vec::new(),
listen: Vec::new(),
sync_batch_size: None,
syncing_window: None,
sampling_window: None,
pruning_delay: None,
}
}
Expand Down Expand Up @@ -134,7 +134,7 @@ where
bootnodes: self.bootnodes,
listen: self.listen,
sync_batch_size: self.sync_batch_size,
syncing_window: self.syncing_window,
sampling_window: self.sampling_window,
pruning_delay: self.pruning_delay,
}
}
Expand All @@ -154,7 +154,7 @@ where
bootnodes: self.bootnodes,
listen: self.listen,
sync_batch_size: self.sync_batch_size,
syncing_window: self.syncing_window,
sampling_window: self.sampling_window,
pruning_delay: self.pruning_delay,
}
}
Expand Down Expand Up @@ -211,23 +211,23 @@ where
}
}

/// Set syncing window.
/// Set sampling window.
///
/// Syncing window defines maximum age of a block considered for syncing and sampling.
/// Sampling window defines maximum age of a block considered for syncing and sampling.
///
/// **Default if [`InMemoryStore`]/[`InMemoryBlockstore`] are used:** 60 seconds.\
/// **Default:** 30 days.\
/// **Minimum:** 60 seconds.
pub fn syncing_window(self, dur: Duration) -> Self {
pub fn sampling_window(self, dur: Duration) -> Self {
NodeBuilder {
syncing_window: Some(dur),
sampling_window: Some(dur),
..self
}
}

/// Set pruning delay.
///
/// Pruning delay defines how much time the pruner should wait after syncing window in
/// Pruning delay defines how much time the pruner should wait after sampling window in
/// order to prune the block.
///
/// **Default if [`InMemoryStore`]/[`InMemoryBlockstore`] are used:** 60 seconds.\
Expand Down Expand Up @@ -257,20 +257,20 @@ where
}

// `Node` is memory hungry when in-memory stores are used and the user may not
// expect they should set a smaller syncing window to reduce that. For user-friendliness
// sake, use smaller default syncing window, if we're running in memory.
// expect they should set a smaller sampling window to reduce that. For user-friendliness
// sake, use smaller default sampling window, if we're running in memory.
//
// If user implements their own in-memory stores then they are responsible
// to set the syncing window to something smaller than `DEFAULT_SYNCING_WINDOW`.
// to set the sampling window to something smaller than `DEFAULT_SAMPLING_WINDOW`.
let in_memory_stores_used = TypeId::of::<S>() == TypeId::of::<InMemoryStore>()
|| TypeId::of::<B>() == TypeId::of::<InMemoryBlockstore>();

let syncing_window = if let Some(dur) = self.syncing_window {
let sampling_window = if let Some(dur) = self.sampling_window {
dur
} else if in_memory_stores_used {
MIN_SYNCING_WINDOW
MIN_SAMPLING_WINDOW
} else {
DEFAULT_SYNCING_WINDOW
DEFAULT_SAMPLING_WINDOW
};

let pruning_delay = if let Some(dur) = self.pruning_delay {
Expand All @@ -281,17 +281,17 @@ where
DEFAULT_PRUNING_DELAY
};

if syncing_window < MIN_SYNCING_WINDOW {
return Err(NodeBuilderError::SyncingWindowTooSmall(syncing_window));
if sampling_window < MIN_SAMPLING_WINDOW {
return Err(NodeBuilderError::SamplingWindowTooSmall(sampling_window));
}

if pruning_delay < MIN_PRUNING_DELAY {
return Err(NodeBuilderError::PruningDelayTooSmall(pruning_delay));
}

let pruning_window = syncing_window.saturating_add(pruning_delay);
let pruning_window = sampling_window.saturating_add(pruning_delay);

info!("Syncing window: {syncing_window:?}, Pruning window: {pruning_window:?}",);
info!("Sampling window: {sampling_window:?}, Pruning window: {pruning_window:?}",);

Ok(NodeConfig {
blockstore: self.blockstore,
Expand All @@ -301,7 +301,7 @@ where
p2p_bootnodes: bootnodes,
p2p_listen_on: self.listen,
sync_batch_size: self.sync_batch_size.unwrap_or(512),
syncing_window,
sampling_window,
pruning_window,
})
}
Expand Down
6 changes: 3 additions & 3 deletions node/src/pruner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub(crate) enum PrunerError {

/// Pruner removed CIDs for the last header in the store, but the last header changed
/// before it could be removed (probably becasue of an insert of an older header).
/// Since pruning window is 1h behind syncing window, this should not happen.
/// Since pruning window is at least 1 minute behind sampling window, this should not happen.
#[error("Pruner detected invalid removal and will stop")]
WrongHeightRemoved,
}
Expand Down Expand Up @@ -218,7 +218,7 @@ mod test {
use super::*;
use crate::blockstore::InMemoryBlockstore;
use crate::events::{EventChannel, TryRecvError};
use crate::node::{DEFAULT_PRUNING_DELAY, DEFAULT_SYNCING_WINDOW};
use crate::node::{DEFAULT_PRUNING_DELAY, DEFAULT_SAMPLING_WINDOW};
use crate::store::{InMemoryStore, SamplingStatus};
use crate::test_utils::{
async_test, gen_filled_store, new_block_ranges, ExtendedHeaderGeneratorExt,
Expand All @@ -227,7 +227,7 @@ mod test {
const TEST_CODEC: u64 = 0x0D;
const TEST_MH_CODE: u64 = 0x0D;
const TEST_PRUNING_WINDOW: Duration =
DEFAULT_SYNCING_WINDOW.saturating_add(DEFAULT_PRUNING_DELAY);
DEFAULT_SAMPLING_WINDOW.saturating_add(DEFAULT_PRUNING_DELAY);

#[async_test]
async fn empty_store() {
Expand Down
8 changes: 4 additions & 4 deletions node/src/syncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -696,7 +696,7 @@ mod tests {
use crate::block_ranges::{BlockRange, BlockRangeExt};
use crate::events::EventChannel;
use crate::node::HeaderExError;
use crate::node::DEFAULT_SYNCING_WINDOW;
use crate::node::DEFAULT_SAMPLING_WINDOW;
use crate::p2p::header_session;
use crate::store::InMemoryStore;
use crate::test_utils::{async_test, gen_filled_store, MockP2pHandle};
Expand Down Expand Up @@ -787,7 +787,7 @@ mod tests {
store: Arc::new(InMemoryStore::new()),
event_pub: events.publisher(),
batch_size: 512,
syncing_window: DEFAULT_SYNCING_WINDOW,
syncing_window: DEFAULT_SAMPLING_WINDOW,
})
.unwrap();

Expand Down Expand Up @@ -935,7 +935,7 @@ mod tests {
store: store.clone(),
event_pub: events.publisher(),
batch_size: 512,
syncing_window: DEFAULT_SYNCING_WINDOW,
syncing_window: DEFAULT_SAMPLING_WINDOW,
})
.unwrap();

Expand Down Expand Up @@ -1175,7 +1175,7 @@ mod tests {
store: store.clone(),
event_pub: events.publisher(),
batch_size: 512,
syncing_window: DEFAULT_SYNCING_WINDOW,
syncing_window: DEFAULT_SAMPLING_WINDOW,
})
.unwrap();

Expand Down

0 comments on commit 27339cb

Please sign in to comment.