diff --git a/node/Cargo.toml b/node/Cargo.toml index cdf5d2eb..8949e6f1 100644 --- a/node/Cargo.toml +++ b/node/Cargo.toml @@ -7,6 +7,7 @@ license = "Apache-2.0" [dependencies] celestia-proto = { workspace = true } celestia-types = { workspace = true } +tendermint = { workspace = true } tendermint-proto = { workspace = true } async-trait = "0.1.73" @@ -20,12 +21,14 @@ libp2p = { version = "0.52.3", features = [ "request-response", ] } prost = "0.12.0" +rand = "0.8.5" thiserror = "1.0.48" tokio = { version = "1.32.0", features = ["macros", "sync"] } tracing = "0.1.37" [target.'cfg(not(target_arch = "wasm32"))'.dependencies] tokio = { version = "1.32.0", features = ["rt-multi-thread"] } +dashmap = "5.5.3" [target.'cfg(target_arch = "wasm32")'.dependencies] getrandom = { version = "0.2.10", features = ["js"] } diff --git a/node/src/exchange.rs b/node/src/exchange.rs index 78b9b1ed..da253b7d 100644 --- a/node/src/exchange.rs +++ b/node/src/exchange.rs @@ -21,11 +21,13 @@ use tracing::instrument; mod client; mod server; mod utils; +pub use utils::ExtendedHeaderExt; use crate::exchange::client::ExchangeClientHandler; use crate::exchange::server::ExchangeServerHandler; use crate::p2p::P2pError; use crate::peer_tracker::PeerTracker; +use crate::store::Store; use crate::utils::{stream_protocol_id, OneshotResultSender}; /// Max request size in bytes @@ -48,6 +50,7 @@ pub(crate) struct ExchangeBehaviour { pub(crate) struct ExchangeConfig<'a> { pub network_id: &'a str, pub peer_tracker: Arc, + pub header_store: Arc, } #[derive(Debug, thiserror::Error)] @@ -79,7 +82,7 @@ impl ExchangeBehaviour { request_response::Config::default(), ), client_handler: ExchangeClientHandler::new(config.peer_tracker), - server_handler: ExchangeServerHandler::new(), + server_handler: ExchangeServerHandler::new(config.header_store), } } diff --git a/node/src/exchange/client.rs b/node/src/exchange/client.rs index f0e1270a..51531f6c 100644 --- a/node/src/exchange/client.rs +++ b/node/src/exchange/client.rs @@ -286,7 +286,7 @@ where #[cfg(test)] mod tests { use super::*; - use crate::exchange::utils::ExtendedHeaderExt; + use crate::exchange::utils::ToHeaderResponse; use celestia_proto::p2p::pb::header_request::Data; use celestia_proto::p2p::pb::StatusCode; use celestia_types::consts::HASH_SIZE; diff --git a/node/src/exchange/server.rs b/node/src/exchange/server.rs index 64227487..010efc98 100644 --- a/node/src/exchange/server.rs +++ b/node/src/exchange/server.rs @@ -3,15 +3,18 @@ use libp2p::{ request_response::{InboundFailure, RequestId, ResponseChannel}, PeerId, }; +use std::sync::Arc; use tracing::instrument; +use crate::store::Store; + pub(super) struct ExchangeServerHandler { - // TODO + _store: Arc, } impl ExchangeServerHandler { - pub(super) fn new() -> Self { - ExchangeServerHandler {} + pub(super) fn new(store: Arc) -> Self { + ExchangeServerHandler { _store: store } } #[instrument(level = "trace", skip(self, _respond_to))] diff --git a/node/src/exchange/utils.rs b/node/src/exchange/utils.rs index a4f9dfd9..c85d1edd 100644 --- a/node/src/exchange/utils.rs +++ b/node/src/exchange/utils.rs @@ -1,7 +1,13 @@ +use celestia_proto::header::pb::ExtendedHeader as RawExtendedHeader; use celestia_proto::p2p::pb::header_request::Data; use celestia_proto::p2p::pb::{HeaderRequest, HeaderResponse, StatusCode}; use celestia_types::consts::HASH_SIZE; +use celestia_types::{DataAvailabilityHeader, ValidatorSet}; use celestia_types::{ExtendedHeader, Hash}; +use tendermint::block::header::Header; +use tendermint::block::Commit; +use tendermint::Time; +use tendermint::{block::header::Version, AppHash}; use tendermint_proto::Protobuf; use crate::exchange::ExchangeError; @@ -58,11 +64,63 @@ impl HeaderResponseExt for HeaderResponse { } } -pub(super) trait ExtendedHeaderExt { - fn to_header_response(&self) -> HeaderResponse; +pub trait ExtendedHeaderExt { + fn with_height(height: u64) -> ExtendedHeader; } impl ExtendedHeaderExt for ExtendedHeader { + fn with_height(height: u64) -> ExtendedHeader { + RawExtendedHeader { + header: Some( + Header { + version: Version { block: 11, app: 1 }, + chain_id: "private".to_string().try_into().unwrap(), + height: height.try_into().unwrap(), + time: Time::now(), + last_block_id: None, + last_commit_hash: Hash::default(), + data_hash: Hash::default(), + validators_hash: Hash::default(), + next_validators_hash: Hash::default(), + consensus_hash: Hash::default(), + app_hash: AppHash::default(), + last_results_hash: Hash::default(), + evidence_hash: Hash::default(), + proposer_address: tendermint::account::Id::new([0; 20]), + } + .into(), + ), + commit: Some( + Commit { + height: height.try_into().unwrap(), + block_id: tendermint::block::Id { + hash: Hash::Sha256(rand::random()), + ..Default::default() + }, + ..Default::default() + } + .into(), + ), + validator_set: Some(ValidatorSet::new(Vec::new(), None).into()), + dah: Some( + DataAvailabilityHeader { + row_roots: Vec::new(), + column_roots: Vec::new(), + hash: [0; 32], + } + .into(), + ), + } + .try_into() + .unwrap() + } +} + +pub(super) trait ToHeaderResponse { + fn to_header_response(&self) -> HeaderResponse; +} + +impl ToHeaderResponse for ExtendedHeader { fn to_header_response(&self) -> HeaderResponse { HeaderResponse { body: self.encode_vec().unwrap(), diff --git a/node/src/node.rs b/node/src/node.rs index 68ca68a4..6ef3e164 100644 --- a/node/src/node.rs +++ b/node/src/node.rs @@ -10,7 +10,6 @@ use libp2p::core::muxing::StreamMuxerBox; use libp2p::core::transport::Boxed; use libp2p::identity::Keypair; use libp2p::{Multiaddr, PeerId}; -use tokio::sync::RwLock; use crate::p2p::{P2p, P2pArgs, P2pService}; use crate::store::Store; @@ -54,7 +53,7 @@ where SyncerSrv: SyncerService, { pub async fn new(config: NodeConfig) -> Result> { - let store = Arc::new(RwLock::new(Store::new())); + let store = Arc::new(Store::new()); let p2p = Arc::new( P2pSrv::start(P2pArgs { @@ -63,6 +62,7 @@ where local_keypair: config.p2p_local_keypair, bootstrap_peers: config.p2p_bootstrap_peers, listen_on: config.p2p_listen_on, + store: store.clone(), }) .await .map_err(NodeError::P2pService)?, diff --git a/node/src/p2p.rs b/node/src/p2p.rs index 55921074..0236a4e9 100644 --- a/node/src/p2p.rs +++ b/node/src/p2p.rs @@ -17,11 +17,12 @@ use libp2p::{identify, Multiaddr, PeerId, TransportError}; use tendermint_proto::Protobuf; use tokio::select; use tokio::sync::oneshot; -use tracing::{debug, instrument, warn}; +use tracing::{debug, info, instrument, warn}; use crate::exchange::{ExchangeBehaviour, ExchangeConfig}; use crate::executor::{spawn, Executor}; use crate::peer_tracker::PeerTracker; +use crate::store::Store; use crate::utils::{gossipsub_ident_topic, OneshotResultSender, OneshotSenderExt}; use crate::Service; @@ -73,6 +74,7 @@ pub struct P2pArgs { pub local_keypair: Keypair, pub bootstrap_peers: Vec, pub listen_on: Vec, + pub store: Arc, } #[doc(hidden)] @@ -269,6 +271,7 @@ impl Worker { let header_ex = ExchangeBehaviour::new(ExchangeConfig { network_id: &args.network_id, peer_tracker: peer_tracker.clone(), + header_store: args.store, }); let behaviour = Behaviour { @@ -338,6 +341,10 @@ impl Worker { SwarmEvent::ConnectionClosed { peer_id, .. } => { self.peer_tracker.remove(peer_id); } + #[cfg(debug_assertions)] + SwarmEvent::NewListenAddr { address, .. } => { + info!("listening on: {address}"); + } _ => {} } diff --git a/node/src/store.rs b/node/src/store.rs index 09cea445..d78e75a8 100644 --- a/node/src/store.rs +++ b/node/src/store.rs @@ -1,9 +1,136 @@ +use celestia_types::ExtendedHeader; +use dashmap::mapref::entry::Entry; +use dashmap::DashMap; +use tendermint::Hash; +use thiserror::Error; +use tracing::{error, info}; + +use crate::exchange::ExtendedHeaderExt; + #[derive(Debug)] -pub struct Store {} +pub struct Store { + headers: DashMap, + height_to_hash: DashMap, +} + +#[derive(Error, Debug, PartialEq)] +pub enum InsertError { + // TODO: do we care about the distinction + #[error("Hash {0} already exists in store")] + HashExists(Hash), + #[error("Height {0} already exists in store")] + HeightExists(u64), +} + +#[derive(Error, Debug, PartialEq)] +pub enum ReadError { + // TODO: should we roll internal errors into one + #[error("Store in inconsistent state, lost head")] + LostStoreHead, + #[error("Store in inconsistent state, height->hash mapping exists, but hash doesn't")] + LostHash, + #[error("Failed to convert height from usize to u64, should not happen")] + HeadHeightConversionError, + + #[error("Header not found in store")] + NotFound, +} impl Store { pub fn new() -> Self { - Store {} + Store { + headers: DashMap::new(), + height_to_hash: DashMap::new(), + } + } + + pub fn get_head_height(&self) -> usize { + self.height_to_hash.len() + } + + pub fn append_header(&self, header: ExtendedHeader) -> Result<(), InsertError> { + let hash = header.hash(); + let height = header.height(); // TODO: should Store be responsible for making sure we have + // all 1..=head headers? + + // lock both maps to ensure consistency + // this shouldn't deadlock as long as we don't hold references across awaits if any + // https://github.com/xacrimon/dashmap/issues/233 + let hash_entry = self.headers.entry(hash); + let height_entry = self.height_to_hash.entry(height.into()); + + if matches!(hash_entry, Entry::Occupied(_)) { + return Err(InsertError::HashExists(hash)); + } + if matches!(height_entry, Entry::Occupied(_)) { + return Err(InsertError::HeightExists(height.into())); + } + + info!("Will insert {hash} at {height}"); + hash_entry.insert(header); + height_entry.insert(hash); + + Ok(()) + } + + pub fn get_head(&self) -> Result { + let head_height = match self.height_to_hash.len() { + 0 => return Err(ReadError::NotFound), + height => match u64::try_from(height) { + Ok(h) => h, + Err(e) => { + error!("Failed to convert {height} from usize to u64, should not happen: {e}"); + return Err(ReadError::HeadHeightConversionError); + } + }, + }; + + let Some(head_hash) = self.height_to_hash.get(&head_height).map(|v| *v) else { + error!("height_to_hash[height_to_hash.len()] not found, store is inconsistent"); + return Err(ReadError::LostStoreHead); + }; + + match self.headers.get(&head_hash) { + Some(v) => Ok(v.clone()), + None => { + error!("Header with hash {head_hash} for height {head_height} missing"); + Err(ReadError::LostHash) + } + } + } + + pub fn get_by_hash(&self, hash: &Hash) -> Result { + match self.headers.get(hash) { + Some(v) => Ok(v.clone()), + None => Err(ReadError::NotFound), + } + } + + pub fn get_by_height(&self, height: u64) -> Result { + let Some(hash) = self.height_to_hash.get(&height).map(|h| *h) else { + return Err(ReadError::NotFound); + }; + + match self.headers.get(&hash) { + Some(h) => Ok(h.clone()), + None => { + error!("Lost hash {hash} at height {height}"); + Err(ReadError::LostHash) + } + } + } + + #[doc(hidden)] + pub fn test_filled_store(height: u64) -> Self { + let s = Store::new(); + + // block height is 1-indexed + for height in 1..=height { + s.append_header(ExtendedHeader::with_height(height)) + .expect("inserting test data failed"); + } + + s } } @@ -12,3 +139,76 @@ impl Default for Store { Store::new() } } + +#[cfg(test)] +mod tests { + use super::*; + use celestia_types::ExtendedHeader; + use tendermint::block::Height; + use tendermint::Hash; + + #[test] + fn test_empty_store() { + let s = Store::new(); + assert_eq!(s.get_head_height(), 0); + assert_eq!(s.get_head(), Err(ReadError::NotFound)); + assert_eq!(s.get_by_height(1), Err(ReadError::NotFound)); + assert_eq!( + s.get_by_hash(&Hash::Sha256([0; 32])), + Err(ReadError::NotFound) + ); + } + + #[test] + fn test_read_write() { + let s = Store::new(); + let header = ExtendedHeader::with_height(1); + s.append_header(header.clone()).unwrap(); + assert_eq!(s.get_head_height(), 1); + assert_eq!(s.get_head().unwrap(), header); + assert_eq!(s.get_by_height(1).unwrap(), header); + assert_eq!(s.get_by_hash(&header.hash()).unwrap(), header); + } + + #[test] + fn test_pregenerated_data() { + let s = Store::test_filled_store(100); + assert_eq!(s.get_head_height(), 100); + let head = s.get_head().unwrap(); + assert_eq!(s.get_by_height(100), Ok(head)); + assert_eq!(s.get_by_height(101), Err(ReadError::NotFound)); + + let header = s.get_by_height(54).unwrap(); + assert_eq!(s.get_by_hash(&header.hash()), Ok(header)); + } + + #[test] + fn test_duplicate_insert() { + let s = Store::test_filled_store(100); + let header = ExtendedHeader::with_height(101); + assert_eq!(s.append_header(header.clone()), Ok(())); + assert_eq!( + s.append_header(header.clone()), + Err(InsertError::HashExists(header.hash())) + ); + } + + #[test] + fn test_overwrite_height() { + let s = Store::test_filled_store(100); + let insert_existing_result = s.append_header(ExtendedHeader::with_height(30)); + assert_eq!(insert_existing_result, Err(InsertError::HeightExists(30))); + } + + #[test] + fn test_overwrite_hash() { + let s = Store::test_filled_store(100); + let mut dup_header = s.get_by_height(33).unwrap(); + dup_header.header.height = Height::from(101u32); + let insert_existing_result = s.append_header(dup_header.clone()); + assert_eq!( + insert_existing_result, + Err(InsertError::HashExists(dup_header.hash())) + ); + } +} diff --git a/node/src/syncer.rs b/node/src/syncer.rs index 2a04535d..a2b8f5d2 100644 --- a/node/src/syncer.rs +++ b/node/src/syncer.rs @@ -1,7 +1,6 @@ use std::sync::Arc; use async_trait::async_trait; -use tokio::sync::RwLock; use crate::{p2p::P2pService, store::Store, Service}; @@ -14,12 +13,12 @@ pub enum SyncerError {} #[derive(Debug)] pub struct Syncer { p2p: Arc, - store: Arc>, + store: Arc, } pub struct SyncerArgs { pub p2p: Arc, - pub store: Arc>, + pub store: Arc, } #[doc(hidden)] diff --git a/types/src/extended_header.rs b/types/src/extended_header.rs index 3b303aaa..cd4aad26 100644 --- a/types/src/extended_header.rs +++ b/types/src/extended_header.rs @@ -28,6 +28,13 @@ pub struct ExtendedHeader { pub dah: DataAvailabilityHeader, } +use core::fmt::{Display, Formatter}; +impl Display for ExtendedHeader { + fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result { + write!(f, "hash: {}; height: {}", self.hash(), self.height()) + } +} + impl ExtendedHeader { pub fn chain_id(&self) -> &Id { &self.header.chain_id