diff --git a/node/src/node.rs b/node/src/node.rs index 0ce5a1be..deb6be27 100644 --- a/node/src/node.rs +++ b/node/src/node.rs @@ -224,17 +224,17 @@ where /// Request the head header from the network. pub async fn request_head_header(&self) -> Result { - Ok(self.p2p.get_head_header().await?) + Ok(self.p2p.get_head_header().await?.into()) } /// Request a header for the block with a given hash from the network. pub async fn request_header_by_hash(&self, hash: &Hash) -> Result { - Ok(self.p2p.get_header(*hash).await?) + Ok(self.p2p.get_header(*hash).await?.into()) } /// Request a header for the block with a given height from the network. pub async fn request_header_by_height(&self, hash: u64) -> Result { - Ok(self.p2p.get_header_by_height(hash).await?) + Ok(self.p2p.get_header_by_height(hash).await?.into()) } /// Request headers in range (from, from + amount] from the network. @@ -245,7 +245,11 @@ where from: &ExtendedHeader, amount: u64, ) -> Result> { - Ok(self.p2p.get_verified_headers_range(from, amount).await?) + Ok(self + .p2p + .get_verified_headers_range(from, amount) + .await? + .into()) } /// Request a verified [`Row`] from the network. @@ -301,7 +305,11 @@ where /// Get the latest header announced in the network. pub fn get_network_head_header(&self) -> Option { - self.p2p.header_sub_watcher().borrow().clone() + self.p2p + .header_sub_watcher() + .borrow() + .clone() + .map(|h| h.into()) } /// Get the latest locally synced header. diff --git a/node/src/p2p.rs b/node/src/p2p.rs index 38407088..20e38f5d 100644 --- a/node/src/p2p.rs +++ b/node/src/p2p.rs @@ -63,7 +63,8 @@ use crate::p2p::shwap::{namespaced_data_cid, row_cid, sample_cid, ShwapMultihash use crate::p2p::swarm::new_swarm; use crate::peer_tracker::PeerTracker; use crate::peer_tracker::PeerTrackerInfo; -use crate::store::Store; +use crate::store::utils::ValidatedExtendedHeader; +use crate::store::{Store, ValidatedExtendedHeaders, VerifiedExtendedHeaders}; use crate::utils::{ celestia_protocol_id, fraudsub_ident_topic, gossipsub_ident_topic, MultiaddrExt, OneshotResultSender, OneshotResultSenderExt, OneshotSenderExt, @@ -157,7 +158,7 @@ impl From for P2pError { #[derive(Debug)] pub struct P2p { cmd_tx: mpsc::Sender, - header_sub_watcher: watch::Receiver>, + header_sub_watcher: watch::Receiver>, peer_tracker_info_watcher: watch::Receiver, local_peer_id: PeerId, } @@ -191,7 +192,7 @@ pub(crate) enum P2pCmd { }, HeaderExRequest { request: HeaderRequest, - respond_to: OneshotResultSender, P2pError>, + respond_to: OneshotResultSender, }, Listeners { respond_to: oneshot::Sender>, @@ -200,7 +201,7 @@ pub(crate) enum P2pCmd { respond_to: oneshot::Sender>, }, InitHeaderSub { - head: Box, + head: Box, }, SetPeerTrust { peer_id: PeerId, @@ -289,7 +290,7 @@ impl P2p { } /// Watcher for the latest verified network head headers announced on `header-sub`. - pub fn header_sub_watcher(&self) -> watch::Receiver> { + pub fn header_sub_watcher(&self) -> watch::Receiver> { self.header_sub_watcher.clone() } @@ -304,7 +305,7 @@ impl P2p { } /// Initializes `header-sub` protocol with a given `subjective_head`. - pub async fn init_header_sub(&self, head: ExtendedHeader) -> Result<()> { + pub async fn init_header_sub(&self, head: ValidatedExtendedHeader) -> Result<()> { self.send_command(P2pCmd::InitHeaderSub { head: Box::new(head), }) @@ -340,7 +341,10 @@ impl P2p { } /// Send a request on the `header-ex` protocol. - pub async fn header_ex_request(&self, request: HeaderRequest) -> Result> { + pub async fn header_ex_request( + &self, + request: HeaderRequest, + ) -> Result { let (tx, rx) = oneshot::channel(); self.send_command(P2pCmd::HeaderExRequest { @@ -353,35 +357,37 @@ impl P2p { } /// Request the head header on the `header-ex` protocol. - pub async fn get_head_header(&self) -> Result { + pub async fn get_head_header(&self) -> Result { self.get_header_by_height(0).await } /// Request the header by hash on the `header-ex` protocol. - pub async fn get_header(&self, hash: Hash) -> Result { + pub async fn get_header(&self, hash: Hash) -> Result { self.header_ex_request(HeaderRequest { data: Some(header_request::Data::Hash(hash.as_bytes().to_vec())), amount: 1, }) .await? + .into_validated_vec() .into_iter() .next() .ok_or(HeaderExError::HeaderNotFound.into()) } /// Request the header by height on the `header-ex` protocol. - pub async fn get_header_by_height(&self, height: u64) -> Result { + pub async fn get_header_by_height(&self, height: u64) -> Result { self.header_ex_request(HeaderRequest { data: Some(header_request::Data::Origin(height)), amount: 1, }) .await? + .into_validated_vec() .into_iter() .next() .ok_or(HeaderExError::HeaderNotFound.into()) } - /// Request the headers following the one given with the `header-ex` protocol. + /// Request the headers followingValidatedExtendedHeader the one given with the `header-ex` protocol. /// /// First header from the requested range will be verified against the provided one, /// then each subsequent is verified against the previous one. @@ -389,22 +395,26 @@ impl P2p { &self, from: &ExtendedHeader, amount: u64, - ) -> Result> { + ) -> Result { + if amount == 0 { + return Err(HeaderExError::InvalidRequest.into()); + } + // User can give us a bad header, so validate it. from.validate().map_err(|_| HeaderExError::InvalidRequest)?; let height = from.height().value() + 1; - let range = height..=height + amount - 1; let mut session = HeaderSession::new(range, self.cmd_tx.clone()); let headers = session.run().await?; - // `.validate()` is called on each header separately by `HeaderExClientHandler`. - // - // The last step is to verify that all headers are from the same chain - // and indeed connected with the next one. - from.verify_adjacent_range(&headers) + // Verify the first header, the rest will be verified + // in `VerifiedExtendedHeaders::from_validated`. + from.verify_adjacent(&headers[0]) + .map_err(|_| HeaderExError::InvalidResponse)?; + + let headers = VerifiedExtendedHeaders::from_validated(headers) .map_err(|_| HeaderExError::InvalidResponse)?; Ok(headers) @@ -417,7 +427,7 @@ impl P2p { pub(crate) async fn get_unverified_header_range( &self, range: BlockRange, - ) -> Result> { + ) -> Result { if range.is_empty() { return Err(HeaderExError::InvalidRequest.into()); } @@ -425,13 +435,6 @@ impl P2p { let mut session = HeaderSession::new(range, self.cmd_tx.clone()); let headers = session.run().await?; - let Some(head) = headers.first() else { - return Err(HeaderExError::InvalidResponse.into()); - }; - - head.verify_adjacent_range(&headers[1..]) - .map_err(|_| HeaderExError::InvalidResponse)?; - Ok(headers) } @@ -565,7 +568,7 @@ where bad_encoding_fraud_sub_topic: TopicHash, cmd_rx: mpsc::Receiver, peer_tracker: Arc, - header_sub_watcher: watch::Sender>, + header_sub_watcher: watch::Sender>, bitswap_queries: HashMap, P2pError>>, network_compromised_token: CancellationToken, store: Arc, @@ -579,7 +582,7 @@ where fn new( args: P2pArgs, cmd_rx: mpsc::Receiver, - header_sub_watcher: watch::Sender>, + header_sub_watcher: watch::Sender>, peer_tracker: Arc, ) -> Result { let local_peer_id = PeerId::from(args.local_keypair.public()); @@ -948,15 +951,20 @@ where } #[instrument(skip_all, fields(header = %head))] - fn on_init_header_sub(&mut self, head: ExtendedHeader) { + fn on_init_header_sub(&mut self, head: ValidatedExtendedHeader) { self.header_sub_watcher.send_replace(Some(head)); trace!("HeaderSub initialized"); } #[instrument(skip_all)] async fn on_header_sub_message(&mut self, data: &[u8]) -> gossipsub::MessageAcceptance { - let Ok(header) = ExtendedHeader::decode_and_validate(data) else { - trace!("Malformed or invalid header from header-sub"); + let Ok(header) = ExtendedHeader::decode(data) else { + trace!("Malformed header from header-sub"); + return gossipsub::MessageAcceptance::Reject; + }; + + let Ok(header) = ValidatedExtendedHeader::new(header) else { + trace!("Invalid header from header-sub"); return gossipsub::MessageAcceptance::Reject; }; @@ -1142,6 +1150,6 @@ where .build()) } -fn network_head_height(watcher: &watch::Sender>) -> Option { +fn network_head_height(watcher: &watch::Sender>) -> Option { watcher.borrow().as_ref().map(|header| header.height()) } diff --git a/node/src/p2p/header_ex.rs b/node/src/p2p/header_ex.rs index 79b18d1f..34cd33a6 100644 --- a/node/src/p2p/header_ex.rs +++ b/node/src/p2p/header_ex.rs @@ -4,7 +4,6 @@ use std::task::{Context, Poll}; use async_trait::async_trait; use celestia_proto::p2p::pb::{HeaderRequest, HeaderResponse}; -use celestia_types::ExtendedHeader; use futures::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; use libp2p::{ core::Endpoint, @@ -29,7 +28,7 @@ use crate::p2p::header_ex::client::HeaderExClientHandler; use crate::p2p::header_ex::server::HeaderExServerHandler; use crate::p2p::P2pError; use crate::peer_tracker::PeerTracker; -use crate::store::Store; +use crate::store::{Store, ValidatedExtendedHeaders}; use crate::utils::{protocol_id, OneshotResultSender}; /// Size limit of a request in bytes @@ -111,7 +110,7 @@ where pub(crate) fn send_request( &mut self, request: HeaderRequest, - respond_to: OneshotResultSender, P2pError>, + respond_to: OneshotResultSender, ) { self.client_handler .on_send_request(&mut self.req_resp, request, respond_to); diff --git a/node/src/p2p/header_ex/client.rs b/node/src/p2p/header_ex/client.rs index bdf215ad..fc3d3ba8 100644 --- a/node/src/p2p/header_ex/client.rs +++ b/node/src/p2p/header_ex/client.rs @@ -7,7 +7,6 @@ use std::task::{Context, Poll}; use celestia_proto::p2p::pb::header_request::Data; use celestia_proto::p2p::pb::{HeaderRequest, HeaderResponse}; -use celestia_types::ExtendedHeader; use futures::future::join_all; use libp2p::request_response::{OutboundFailure, OutboundRequestId}; use libp2p::PeerId; @@ -19,7 +18,7 @@ use crate::p2p::header_ex::utils::{HeaderRequestExt, HeaderResponseExt}; use crate::p2p::header_ex::{HeaderExError, ReqRespBehaviour}; use crate::p2p::P2pError; use crate::peer_tracker::PeerTracker; -use crate::store::utils::VALIDATIONS_PER_YIELD; +use crate::store::utils::{ValidatedExtendedHeaders, VALIDATIONS_PER_YIELD}; use crate::utils::{OneshotResultSender, OneshotResultSenderExt}; const MAX_PEERS: usize = 10; @@ -34,7 +33,7 @@ where struct State { request: HeaderRequest, - respond_to: OneshotResultSender, P2pError>, + respond_to: OneshotResultSender, } pub(super) trait RequestSender { @@ -67,7 +66,7 @@ where &mut self, sender: &mut S, request: HeaderRequest, - respond_to: OneshotResultSender, P2pError>, + respond_to: OneshotResultSender, ) { if !request.is_valid() { respond_to.maybe_send_err(HeaderExError::InvalidRequest); @@ -87,7 +86,7 @@ where &mut self, sender: &mut S, request: HeaderRequest, - respond_to: OneshotResultSender, P2pError>, + respond_to: OneshotResultSender, ) { // Validate amount if usize::try_from(request.amount).is_err() { @@ -113,7 +112,7 @@ where &mut self, sender: &mut S, request: HeaderRequest, - respond_to: OneshotResultSender, P2pError>, + respond_to: OneshotResultSender, ) { const MIN_HEAD_RESPONSES: usize = 2; @@ -149,7 +148,7 @@ where .into_iter() // In case of HEAD all responses have only 1 header. // This was already enforced by `decode_and_verify_responses`. - .filter_map(|v| v.ok()?.ok()?.into_iter().next()) + .filter_map(|v| v.ok()?.ok()?.into_validated_vec().into_iter().next()) .collect(); let mut counter: HashMap<_, usize> = HashMap::new(); @@ -173,14 +172,14 @@ where // Return the header with the highest height that was received by at least 2 peers for resp in &resps { if counter[&resp.hash()] >= MIN_HEAD_RESPONSES { - respond_to.maybe_send_ok(vec![resp.to_owned()]); + respond_to.maybe_send_ok(resp.into()); return; } } // Otherwise return the header with the maximum height let resp = resps.into_iter().next().expect("no reposnes"); - respond_to.maybe_send_ok(vec![resp]); + respond_to.maybe_send_ok(resp.into()); }); } @@ -238,7 +237,7 @@ where async fn decode_and_verify_responses( request: &HeaderRequest, responses: &[HeaderResponse], -) -> Result, HeaderExError> { +) -> Result { if responses.is_empty() { return Err(HeaderExError::InvalidResponse); } @@ -298,7 +297,7 @@ async fn decode_and_verify_responses( _ => return Err(HeaderExError::InvalidResponse), } - Ok(headers) + Ok(headers.into()) } #[cfg(test)] diff --git a/node/src/p2p/header_ex/utils.rs b/node/src/p2p/header_ex/utils.rs index eedf8e9a..2ab51799 100644 --- a/node/src/p2p/header_ex/utils.rs +++ b/node/src/p2p/header_ex/utils.rs @@ -6,6 +6,7 @@ use celestia_types::hash::Hash; use celestia_types::ExtendedHeader; use crate::p2p::header_ex::HeaderExError; +use crate::store::utils::ValidatedExtendedHeader; pub(crate) trait HeaderRequestExt { fn with_origin(origin: u64, amount: u64) -> HeaderRequest; @@ -51,18 +52,21 @@ impl HeaderRequestExt for HeaderRequest { } pub(super) trait HeaderResponseExt { - fn to_validated_extented_header(&self) -> Result; + fn to_validated_extented_header(&self) -> Result; fn not_found() -> HeaderResponse; fn invalid() -> HeaderResponse; } impl HeaderResponseExt for HeaderResponse { - fn to_validated_extented_header(&self) -> Result { + fn to_validated_extented_header(&self) -> Result { match self.status_code() { StatusCode::Invalid => Err(HeaderExError::InvalidResponse), StatusCode::NotFound => Err(HeaderExError::HeaderNotFound), - StatusCode::Ok => ExtendedHeader::decode_and_validate(&self.body[..]) - .map_err(|_| HeaderExError::InvalidResponse), + StatusCode::Ok => { + let header = ExtendedHeader::decode(&self.body[..]) + .map_err(|_| HeaderExError::InvalidResponse)?; + ValidatedExtendedHeader::new(header).map_err(|_| HeaderExError::InvalidResponse) + } } } diff --git a/node/src/p2p/header_session.rs b/node/src/p2p/header_session.rs index 5b3c4d15..53e938d6 100644 --- a/node/src/p2p/header_session.rs +++ b/node/src/p2p/header_session.rs @@ -1,5 +1,4 @@ use celestia_proto::p2p::pb::HeaderRequest; -use celestia_types::ExtendedHeader; use tokio::sync::{mpsc, oneshot}; use tracing::debug; @@ -7,6 +6,7 @@ use crate::block_ranges::{BlockRange, BlockRangeExt}; use crate::executor::spawn; use crate::p2p::header_ex::utils::HeaderRequestExt; use crate::p2p::{P2pCmd, P2pError}; +use crate::store::utils::ValidatedExtendedHeaders; pub(crate) const MIN_AMOUNT_PER_REQ: u64 = 8; pub(crate) const MAX_AMOUNT_PER_REQ: u64 = 64; @@ -17,8 +17,8 @@ type Result = std::result::Result; pub(crate) struct HeaderSession { to_fetch: Option, cmd_tx: mpsc::Sender, - response_tx: mpsc::Sender<(u64, u64, Result>)>, - response_rx: mpsc::Receiver<(u64, u64, Result>)>, + response_tx: mpsc::Sender<(u64, u64, Result)>, + response_rx: mpsc::Receiver<(u64, u64, Result)>, ongoing: usize, batch_size: u64, } @@ -49,7 +49,7 @@ impl HeaderSession { } } - pub(crate) async fn run(&mut self) -> Result> { + pub(crate) async fn run(&mut self) -> Result { let mut responses = Vec::new(); for _ in 0..MAX_CONCURRENT_REQS { @@ -61,6 +61,7 @@ impl HeaderSession { match res { Ok(headers) => { + let headers = headers.into_validated_vec(); let headers_len = headers.len() as u64; if headers_len > 0 { @@ -97,7 +98,7 @@ impl HeaderSession { Ok(responses.into_iter().flatten().collect()) } - async fn recv_response(&mut self) -> (u64, u64, Result>) { + async fn recv_response(&mut self) -> (u64, u64, Result) { let (height, requested_amount, res) = self.response_rx.recv().await.expect("channel never closes"); diff --git a/node/src/store.rs b/node/src/store.rs index 4921961c..6dbff7a6 100644 --- a/node/src/store.rs +++ b/node/src/store.rs @@ -15,7 +15,10 @@ use serde::{Deserialize, Serialize}; use thiserror::Error; pub use crate::block_ranges::{BlockRange, BlockRanges, BlockRangesError}; -pub use crate::store::utils::{ExtendedHeaderGeneratorExt, VerifiedExtendedHeaders}; +pub use crate::store::utils::{ + IntoVerifiedExtendedHeaders, ValidatedExtendedHeader, ValidatedExtendedHeaders, + VerifiedExtendedHeaders, +}; pub use in_memory_store::InMemoryStore; #[cfg(target_arch = "wasm32")] @@ -144,14 +147,11 @@ pub trait Store: Send + Sync + Debug { /// `Ok(None)` indicates that header is in the store but sampling metadata is not set yet. async fn get_sampling_metadata(&self, height: u64) -> Result>; - /// Insert a range of headers into the store. + /// Insert a chain of headers into the store. /// /// New insertion should pass all the constraints in [`BlockRanges::check_insertion_constraints`], /// additionaly it should be [`ExtendedHeader::verify`]ed against neighbor headers. - async fn insert(&self, headers: R) -> Result<()> - where - R: TryInto + Send, - StoreError: From<>::Error>; + async fn insert(&self, headers: impl IntoVerifiedExtendedHeaders) -> Result<()>; /// Returns a list of header ranges currenty held in store. async fn get_stored_header_ranges(&self) -> Result; @@ -315,6 +315,7 @@ fn to_headers_range(bounds: impl RangeBounds, last_index: u64) -> Result(&self, headers: R) -> Result<()> - where - R: TryInto + Send, - StoreError: From<>::Error>, - { - let headers = headers.try_into()?; + pub(crate) async fn insert(&self, headers: impl IntoVerifiedExtendedHeaders) -> Result<()> { + let headers = headers.into_verified().await?; self.inner.write().await.insert(headers).await?; self.header_added_notifier.notify_waiters(); Ok(()) @@ -363,12 +362,8 @@ impl Store for InMemoryStore { self.contains_height(height).await } - async fn insert(&self, header: R) -> Result<()> - where - R: TryInto + Send, - StoreError: From<>::Error>, - { - self.insert(header).await + async fn insert(&self, headers: impl IntoVerifiedExtendedHeaders) -> Result<()> { + self.insert(headers).await } async fn update_sampling_metadata( diff --git a/node/src/store/indexed_db_store.rs b/node/src/store/indexed_db_store.rs index debff02b..0ed0f8cc 100644 --- a/node/src/store/indexed_db_store.rs +++ b/node/src/store/indexed_db_store.rs @@ -709,7 +709,7 @@ mod v3 { #[cfg(test)] pub mod tests { use super::*; - use crate::store::utils::ExtendedHeaderGeneratorExt; + use crate::test_utils::ExtendedHeaderGeneratorExt; use celestia_types::test_utils::ExtendedHeaderGenerator; use function_name::named; use wasm_bindgen_test::wasm_bindgen_test; diff --git a/node/src/store/redb_store.rs b/node/src/store/redb_store.rs index 0bf53397..4846c4f6 100644 --- a/node/src/store/redb_store.rs +++ b/node/src/store/redb_store.rs @@ -18,8 +18,9 @@ use tracing::warn; use tracing::{debug, trace}; use crate::block_ranges::BlockRanges; -use crate::store::utils::VerifiedExtendedHeaders; -use crate::store::{Result, SamplingMetadata, SamplingStatus, Store, StoreError}; +use crate::store::{ + IntoVerifiedExtendedHeaders, Result, SamplingMetadata, SamplingStatus, Store, StoreError, +}; const SCHEMA_VERSION: u64 = 2; @@ -243,12 +244,8 @@ impl RedbStore { .unwrap_or(false) } - async fn insert(&self, headers: R) -> Result<()> - where - R: TryInto + Send, - StoreError: From<>::Error>, - { - let headers = headers.try_into()?; + async fn insert(&self, headers: impl IntoVerifiedExtendedHeaders) -> Result<()> { + let headers = headers.into_verified().await?; self.write_tx(move |tx| { let headers = headers.as_ref(); @@ -465,11 +462,7 @@ impl Store for RedbStore { self.contains_height(height).await } - async fn insert(&self, headers: R) -> Result<()> - where - R: TryInto + Send, - StoreError: From<>::Error>, - { + async fn insert(&self, headers: impl IntoVerifiedExtendedHeaders) -> Result<()> { self.insert(headers).await } @@ -678,13 +671,10 @@ fn migrate_v1_to_v2( #[cfg(test)] pub mod tests { - use crate::store::ExtendedHeaderGeneratorExt; - use super::*; - - use std::path::Path; - + use crate::test_utils::ExtendedHeaderGeneratorExt; use celestia_types::test_utils::ExtendedHeaderGenerator; + use std::path::Path; use tempfile::TempDir; #[tokio::test] diff --git a/node/src/store/utils.rs b/node/src/store/utils.rs index d50a670e..7c8ec296 100644 --- a/node/src/store/utils.rs +++ b/node/src/store/utils.rs @@ -1,7 +1,8 @@ -use std::ops::RangeInclusive; +use std::fmt::{self, Display}; +use std::future::Future; +use std::mem; +use std::ops::{Deref, RangeInclusive}; -#[cfg(any(test, feature = "test-utils"))] -use celestia_types::test_utils::ExtendedHeaderGenerator; use celestia_types::ExtendedHeader; use crate::block_ranges::{BlockRange, BlockRangeExt}; @@ -59,10 +60,125 @@ fn get_most_recent_missing_range( penultimate_range_end + 1..=store_head_range.start().saturating_sub(1) } -/// Span of header that's been verified internally -#[derive(Clone)] +#[repr(transparent)] +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct ValidatedExtendedHeader(ExtendedHeader); + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct ValidatedExtendedHeaders(Vec); + +#[derive(Clone, Debug, PartialEq, Eq)] pub struct VerifiedExtendedHeaders(Vec); +pub trait IntoVerifiedExtendedHeaders: Send { + fn into_verified( + self, + ) -> impl Future> + Send; +} + +impl ValidatedExtendedHeader { + pub fn new(header: ExtendedHeader) -> celestia_types::Result { + header.validate()?; + Ok(ValidatedExtendedHeader(header)) + } + + /// Create a new instance out of pre-checked vec of headers + /// + /// # Safety + /// + /// This function may produce invalid `ValidatedExtendedHeader`, if passed range + /// is not validated manually. + pub unsafe fn new_unchecked(header: ExtendedHeader) -> Self { + ValidatedExtendedHeader(header) + } + + pub fn into_inner(self) -> ExtendedHeader { + self.0 + } +} + +impl ValidatedExtendedHeaders { + pub async fn new(headers: Vec) -> celestia_types::Result { + validate_headers(&headers).await?; + Ok(ValidatedExtendedHeaders(headers)) + } + + /// Create a new instance out of pre-checked vec of headers + /// + /// # Safety + /// + /// This function may produce invalid `ValidatedExtendedHeaders`, if passed range + /// is not validated manually. + pub unsafe fn new_unchecked(headers: Vec) -> Self { + ValidatedExtendedHeaders(headers) + } + + pub fn into_inner(self) -> Vec { + self.0 + } + + pub fn into_validated_vec(self) -> Vec { + // SAFETY: It is safe to transmute because of `repr(transparent)`. + unsafe { mem::transmute(self.0) } + } +} + +impl VerifiedExtendedHeaders { + pub async fn new(headers: Vec) -> celestia_types::Result { + let headers = ValidatedExtendedHeaders::new(headers).await?; + VerifiedExtendedHeaders::from_validated(headers) + } + + pub fn from_validated(headers: ValidatedExtendedHeaders) -> celestia_types::Result { + verify_headers(&headers.0)?; + Ok(VerifiedExtendedHeaders(headers.0)) + } + + /// Create a new instance out of pre-checked vec of headers + /// + /// # Safety + /// + /// This function may produce invalid `VerifiedExtendedHeaders`, if passed range + /// is not validated and verified manually. + pub unsafe fn new_unchecked(headers: Vec) -> Self { + VerifiedExtendedHeaders(headers) + } + + pub fn into_inner(self) -> Vec { + self.0 + } + + pub fn into_validated_vec(self) -> Vec { + // SAFETY: It is safe to transmute because of `repr(transparent)`. + unsafe { mem::transmute(self.0) } + } +} + +impl Display for ValidatedExtendedHeader { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + Display::fmt(&self.0, f) + } +} + +impl FromIterator for ValidatedExtendedHeaders { + fn from_iter(iter: T) -> Self + where + T: IntoIterator, + { + let headers = iter.into_iter().map(|h| h.0).collect(); + ValidatedExtendedHeaders(headers) + } +} + +impl IntoIterator for ValidatedExtendedHeaders { + type Item = ExtendedHeader; + type IntoIter = std::vec::IntoIter; + + fn into_iter(self) -> Self::IntoIter { + self.0.into_iter() + } +} + impl IntoIterator for VerifiedExtendedHeaders { type Item = ExtendedHeader; type IntoIter = std::vec::IntoIter; @@ -72,85 +188,244 @@ impl IntoIterator for VerifiedExtendedHeaders { } } -impl<'a> TryFrom<&'a [ExtendedHeader]> for VerifiedExtendedHeaders { - type Error = celestia_types::Error; +impl Deref for ValidatedExtendedHeader { + type Target = ExtendedHeader; - fn try_from(value: &'a [ExtendedHeader]) -> Result { - value.to_vec().try_into() + fn deref(&self) -> &Self::Target { + &self.0 } } -impl From for Vec { - fn from(value: VerifiedExtendedHeaders) -> Self { - value.0 +impl Deref for ValidatedExtendedHeaders { + type Target = [ExtendedHeader]; + + fn deref(&self) -> &Self::Target { + &self.0[..] + } +} + +impl Deref for VerifiedExtendedHeaders { + type Target = [ExtendedHeader]; + + fn deref(&self) -> &Self::Target { + &self.0[..] + } +} + +impl AsRef for ValidatedExtendedHeader { + fn as_ref(&self) -> &ExtendedHeader { + &self.0 + } +} + +impl AsRef<[ExtendedHeader]> for ValidatedExtendedHeaders { + fn as_ref(&self) -> &[ExtendedHeader] { + &self.0[..] } } impl AsRef<[ExtendedHeader]> for VerifiedExtendedHeaders { fn as_ref(&self) -> &[ExtendedHeader] { - &self.0 + &self.0[..] } } -/// 1-length hedaer span is internally verified, this is valid -impl From<[ExtendedHeader; 1]> for VerifiedExtendedHeaders { - fn from(value: [ExtendedHeader; 1]) -> Self { - Self(value.into()) +impl From for ExtendedHeader { + fn from(header: ValidatedExtendedHeader) -> Self { + header.0 } } -impl From for VerifiedExtendedHeaders { - fn from(value: ExtendedHeader) -> Self { - Self(vec![value]) +impl From for Vec { + fn from(headers: ValidatedExtendedHeaders) -> Self { + headers.0 } } -impl<'a> From<&'a ExtendedHeader> for VerifiedExtendedHeaders { - fn from(value: &ExtendedHeader) -> Self { - Self(vec![value.to_owned()]) +impl From for Vec { + fn from(headers: VerifiedExtendedHeaders) -> Self { + headers.0 } } -impl TryFrom> for VerifiedExtendedHeaders { - type Error = celestia_types::Error; +impl From for ValidatedExtendedHeaders { + fn from(header: ValidatedExtendedHeader) -> Self { + ValidatedExtendedHeaders(vec![header.0]) + } +} - fn try_from(headers: Vec) -> Result { - let Some(head) = headers.first() else { - return Ok(VerifiedExtendedHeaders(Vec::default())); - }; +impl<'a> From<&'a ValidatedExtendedHeader> for ValidatedExtendedHeaders { + fn from(header: &'a ValidatedExtendedHeader) -> Self { + header.to_owned().into() + } +} - head.verify_adjacent_range(&headers[1..])?; +impl<'a> From<&'a mut ValidatedExtendedHeader> for ValidatedExtendedHeaders { + fn from(header: &'a mut ValidatedExtendedHeader) -> Self { + header.to_owned().into() + } +} - Ok(Self(headers)) +impl From> for ValidatedExtendedHeaders { + fn from(headers: Vec) -> Self { + // SAFETY: It is safe to transmute because of `repr(transparent)`. + let headers = unsafe { mem::transmute(headers) }; + ValidatedExtendedHeaders(headers) } } -impl VerifiedExtendedHeaders { - /// Create a new instance out of pre-checked vec of headers - /// - /// # Safety - /// - /// This function may produce invalid `VerifiedExtendedHeaders`, if passed range is not - /// validated manually - pub unsafe fn new_unchecked(headers: Vec) -> Self { - Self(headers) +impl<'a> From<&'a [ValidatedExtendedHeader]> for ValidatedExtendedHeaders { + fn from(headers: &'a [ValidatedExtendedHeader]) -> Self { + Vec::from(headers).into() + } +} + +impl<'a> From<&'a mut [ValidatedExtendedHeader]> for ValidatedExtendedHeaders { + fn from(headers: &'a mut [ValidatedExtendedHeader]) -> Self { + Vec::from(headers).into() + } +} + +impl<'a, const N: usize> From<[ValidatedExtendedHeader; N]> for ValidatedExtendedHeaders { + fn from(headers: [ValidatedExtendedHeader; N]) -> Self { + Vec::from(headers).into() + } +} + +impl<'a, const N: usize> From<&'a [ValidatedExtendedHeader; N]> for ValidatedExtendedHeaders { + fn from(headers: &'a [ValidatedExtendedHeader; N]) -> Self { + Vec::from(headers).into() + } +} + +impl<'a, const N: usize> From<&'a mut [ValidatedExtendedHeader; N]> for ValidatedExtendedHeaders { + fn from(headers: &'a mut [ValidatedExtendedHeader; N]) -> Self { + Vec::from(headers).into() + } +} + +impl IntoVerifiedExtendedHeaders for VerifiedExtendedHeaders { + async fn into_verified(self) -> celestia_types::Result { + Ok(self) + } +} + +impl IntoVerifiedExtendedHeaders for ValidatedExtendedHeader { + async fn into_verified(self) -> celestia_types::Result { + // A single validated header has no neighbors so there nothing to verify. + Ok(VerifiedExtendedHeaders(vec![self.0])) + } +} + +impl<'a> IntoVerifiedExtendedHeaders for &'a ValidatedExtendedHeader { + async fn into_verified(self) -> celestia_types::Result { + // A single validated header has no neighbors so there nothing to verify. + Ok(VerifiedExtendedHeaders(vec![self.0.to_owned()])) + } +} + +impl<'a> IntoVerifiedExtendedHeaders for &'a mut ValidatedExtendedHeader { + async fn into_verified(self) -> celestia_types::Result { + // A single validated header has no neighbors so there nothing to verify. + Ok(VerifiedExtendedHeaders(vec![self.0.to_owned()])) + } +} + +impl IntoVerifiedExtendedHeaders for ValidatedExtendedHeaders { + async fn into_verified(self) -> celestia_types::Result { + VerifiedExtendedHeaders::from_validated(self) } } -/// Extends test header generator for easier insertion into the store -pub trait ExtendedHeaderGeneratorExt { - /// Generate next amount verified headers - fn next_many_verified(&mut self, amount: u64) -> VerifiedExtendedHeaders; +impl<'a> IntoVerifiedExtendedHeaders for &'a ValidatedExtendedHeaders { + async fn into_verified(self) -> celestia_types::Result { + verify_headers(&self.0[..])?; + Ok(VerifiedExtendedHeaders(self.0.to_owned())) + } } -#[cfg(any(test, feature = "test-utils"))] -impl ExtendedHeaderGeneratorExt for ExtendedHeaderGenerator { - fn next_many_verified(&mut self, amount: u64) -> VerifiedExtendedHeaders { - unsafe { VerifiedExtendedHeaders::new_unchecked(self.next_many(amount)) } +impl<'a> IntoVerifiedExtendedHeaders for &'a mut ValidatedExtendedHeaders { + async fn into_verified(self) -> celestia_types::Result { + verify_headers(&self.0[..])?; + Ok(VerifiedExtendedHeaders(self.0.to_owned())) } } -#[allow(unused)] +impl IntoVerifiedExtendedHeaders for ExtendedHeader { + async fn into_verified(self) -> celestia_types::Result { + self.validate()?; + Ok(VerifiedExtendedHeaders(vec![self])) + } +} + +impl<'a> IntoVerifiedExtendedHeaders for &'a ExtendedHeader { + async fn into_verified(self) -> celestia_types::Result { + self.validate()?; + Ok(VerifiedExtendedHeaders(vec![self.to_owned()])) + } +} + +impl<'a> IntoVerifiedExtendedHeaders for &'a mut ExtendedHeader { + async fn into_verified(self) -> celestia_types::Result { + self.validate()?; + Ok(VerifiedExtendedHeaders(vec![self.to_owned()])) + } +} + +impl IntoVerifiedExtendedHeaders for Vec { + async fn into_verified(self) -> celestia_types::Result { + VerifiedExtendedHeaders::new(self).await + } +} + +impl<'a> IntoVerifiedExtendedHeaders for &'a [ExtendedHeader] { + async fn into_verified(self) -> celestia_types::Result { + validate_headers(self).await?; + verify_headers(self)?; + Ok(VerifiedExtendedHeaders(Vec::from(self))) + } +} + +impl<'a> IntoVerifiedExtendedHeaders for &'a mut [ExtendedHeader] { + async fn into_verified(self) -> celestia_types::Result { + validate_headers(self).await?; + verify_headers(self)?; + Ok(VerifiedExtendedHeaders(Vec::from(self))) + } +} + +impl IntoVerifiedExtendedHeaders for [ExtendedHeader; N] { + async fn into_verified(self) -> celestia_types::Result { + validate_headers(&self[..]).await?; + verify_headers(&self[..])?; + Ok(VerifiedExtendedHeaders(Vec::from(self))) + } +} + +impl<'a, const N: usize> IntoVerifiedExtendedHeaders for &'a [ExtendedHeader; N] { + async fn into_verified(self) -> celestia_types::Result { + validate_headers(&self[..]).await?; + verify_headers(&self[..])?; + Ok(VerifiedExtendedHeaders(Vec::from(self))) + } +} + +impl<'a, const N: usize> IntoVerifiedExtendedHeaders for &'a mut [ExtendedHeader; N] { + async fn into_verified(self) -> celestia_types::Result { + validate_headers(&self[..]).await?; + verify_headers(&self[..])?; + Ok(VerifiedExtendedHeaders(Vec::from(self))) + } +} + +pub(crate) fn verify_headers(headers: &[ExtendedHeader]) -> celestia_types::Result<()> { + let Some(head) = headers.first() else { + return Ok(()); + }; + + head.verify_adjacent_range(&headers[1..]) +} + pub(crate) async fn validate_headers(headers: &[ExtendedHeader]) -> celestia_types::Result<()> { for headers in headers.chunks(VALIDATIONS_PER_YIELD) { for header in headers { diff --git a/node/src/syncer.rs b/node/src/syncer.rs index 094a052a..69a0e408 100644 --- a/node/src/syncer.rs +++ b/node/src/syncer.rs @@ -30,7 +30,7 @@ use crate::events::{EventPublisher, NodeEvent}; use crate::executor::{sleep, spawn, spawn_cancellable, Interval}; use crate::p2p::{P2p, P2pError}; use crate::store::utils::calculate_range_to_fetch; -use crate::store::{Store, StoreError}; +use crate::store::{Store, StoreError, ValidatedExtendedHeader, ValidatedExtendedHeaders}; use crate::utils::OneshotSenderExt; type Result = std::result::Result; @@ -178,7 +178,7 @@ where event_pub: EventPublisher, p2p: Arc, store: Arc, - header_sub_watcher: watch::Receiver>, + header_sub_watcher: watch::Receiver>, subjective_head_height: Option, batch_size: u64, ongoing_batch: Option, @@ -415,8 +415,6 @@ where if let Ok(store_head_height) = self.store.head_height().await { // If our new header is adjacent to the HEAD of the store if store_head_height + 1 == new_head_height { - // Header is already verified by HeaderSub and will be validated against previous - // head on insert if self.store.insert(new_head).await.is_ok() { self.event_pub.send(NodeEvent::AddedHeaderFromHeaderSub { height: new_head_height, @@ -439,7 +437,7 @@ where #[instrument(skip_all)] async fn fetch_next_batch( &mut self, - headers_tx: &mpsc::Sender<(Result, P2pError>, Duration)>, + headers_tx: &mpsc::Sender<(Result, Duration)>, ) { if self.ongoing_batch.is_some() { // Another batch is ongoing. We do not parallelize `Syncer` @@ -514,7 +512,7 @@ where #[instrument(skip_all)] async fn on_fetch_next_batch_result( &mut self, - res: Result, P2pError>, + res: Result, took: Duration, ) { let Some(ongoing) = self.ongoing_batch.take() else { @@ -588,6 +586,7 @@ mod tests { use crate::block_ranges::{BlockRange, BlockRangeExt}; use crate::events::EventChannel; use crate::p2p::header_session; + use crate::store::utils::validate_headers; use crate::store::InMemoryStore; use crate::test_utils::{async_test, gen_filled_store, MockP2pHandle}; use celestia_types::test_utils::ExtendedHeaderGenerator; @@ -884,6 +883,8 @@ mod tests { let headers = ExtendedHeaderGenerator::new().next_many(20); let headers_prime = ExtendedHeaderGenerator::new().next_many(20); + //validate_headers(&headers).await.unwrap(); + // Start Syncer and report last header as network head let (syncer, store, mut p2p_mock) = initialized_syncer(headers[19].clone()).await; @@ -891,13 +892,17 @@ mod tests { handle_session_batch(&mut p2p_mock, &headers_prime, 1..=19, true).await; // Syncer should not apply headers from invalid response + dbg!(1); assert_syncing(&syncer, &store, &[20..=20], 20).await; + dbg!(2); // Syncer requests missing headers again handle_session_batch(&mut p2p_mock, &headers, 1..=19, true).await; + dbg!(3); // With a correct resposne, syncer should update the store assert_syncing(&syncer, &store, &[1..=20], 20).await; + dbg!(4); } async fn assert_syncing( @@ -908,7 +913,7 @@ mod tests { ) { // Syncer receives responds on the same loop that receive other events. // Wait a bit to be processed. - sleep(Duration::from_millis(1)).await; + sleep(Duration::from_millis(10)).await; let store_ranges = store.get_stored_header_ranges().await.unwrap(); let syncing_info = syncer.info().await.unwrap(); diff --git a/node/src/test_utils.rs b/node/src/test_utils.rs index 8c1122a1..98865c61 100644 --- a/node/src/test_utils.rs +++ b/node/src/test_utils.rs @@ -16,7 +16,9 @@ use crate::{ node::NodeConfig, p2p::{P2pCmd, P2pError}, peer_tracker::PeerTrackerInfo, - store::{ExtendedHeaderGeneratorExt, InMemoryStore}, + store::{ + InMemoryStore, ValidatedExtendedHeader, ValidatedExtendedHeaders, VerifiedExtendedHeaders, + }, utils::OneshotResultSender, }; @@ -27,6 +29,25 @@ pub(crate) use tokio::test as async_test; #[cfg(target_arch = "wasm32")] pub(crate) use wasm_bindgen_test::wasm_bindgen_test as async_test; +/// Extends test header generator for easier insertion into the store +pub trait ExtendedHeaderGeneratorExt { + /// Generate next amount verified headers + fn next_many_verified(&mut self, amount: u64) -> VerifiedExtendedHeaders; + + /// Generate next amount validated headers + fn next_many_validated(&mut self, amount: u64) -> ValidatedExtendedHeaders; +} + +impl ExtendedHeaderGeneratorExt for ExtendedHeaderGenerator { + fn next_many_verified(&mut self, amount: u64) -> VerifiedExtendedHeaders { + unsafe { VerifiedExtendedHeaders::new_unchecked(self.next_many(amount)) } + } + + fn next_many_validated(&mut self, amount: u64) -> ValidatedExtendedHeaders { + unsafe { ValidatedExtendedHeaders::new_unchecked(self.next_many(amount)) } + } +} + /// Generate a store pre-filled with headers. pub async fn gen_filled_store(amount: u64) -> (InMemoryStore, ExtendedHeaderGenerator) { let s = InMemoryStore::new(); @@ -80,7 +101,7 @@ pub struct MockP2pHandle { #[allow(dead_code)] pub(crate) cmd_tx: mpsc::Sender, pub(crate) cmd_rx: mpsc::Receiver, - pub(crate) header_sub_tx: watch::Sender>, + pub(crate) header_sub_tx: watch::Sender>, pub(crate) peer_tracker_tx: watch::Sender, } @@ -109,7 +130,7 @@ impl MockP2pHandle { } /// Simulate a new header announced in the network. - pub fn announce_new_head(&self, header: ExtendedHeader) { + pub fn announce_new_head(&self, header: ValidatedExtendedHeader) { self.header_sub_tx.send_replace(Some(header)); } @@ -146,7 +167,7 @@ impl MockP2pHandle { &mut self, ) -> ( HeaderRequest, - OneshotResultSender, P2pError>, + OneshotResultSender, ) { match self.expect_cmd().await { P2pCmd::HeaderExRequest { @@ -162,7 +183,11 @@ impl MockP2pHandle { /// [`P2p`]: crate::p2p::P2p pub async fn expect_header_request_for_height_cmd( &mut self, - ) -> (u64, u64, OneshotResultSender, P2pError>) { + ) -> ( + u64, + u64, + OneshotResultSender, + ) { let (req, respond_to) = self.expect_header_request_cmd().await; match req.data { @@ -176,7 +201,10 @@ impl MockP2pHandle { /// [`P2p`]: crate::p2p::P2p pub async fn expect_header_request_for_hash_cmd( &mut self, - ) -> (Hash, OneshotResultSender, P2pError>) { + ) -> ( + Hash, + OneshotResultSender, + ) { let (req, respond_to) = self.expect_header_request_cmd().await; match req.data { @@ -192,7 +220,7 @@ impl MockP2pHandle { /// Assert that a header-sub initialization command was sent to the [`P2p`] worker. /// /// [`P2p`]: crate::p2p::P2p - pub async fn expect_init_header_sub(&mut self) -> ExtendedHeader { + pub async fn expect_init_header_sub(&mut self) -> ValidatedExtendedHeader { match self.expect_cmd().await { P2pCmd::InitHeaderSub { head } => *head, cmd => panic!("Expecting InitHeaderSub, but received: {cmd:?}"), diff --git a/node/tests/header_ex.rs b/node/tests/header_ex.rs index 80f319f9..c1abd9a4 100644 --- a/node/tests/header_ex.rs +++ b/node/tests/header_ex.rs @@ -6,7 +6,7 @@ use celestia_types::test_utils::{invalidate, unverify}; use lumina_node::{ node::{Node, NodeConfig, NodeError}, p2p::{HeaderExError, P2pError}, - store::{Store, VerifiedExtendedHeaders}, + store::{Store, ValidExtendedHeadersChain}, test_utils::{gen_filled_store, listening_test_node_config, test_node_config}, }; use tokio::time::{sleep, timeout}; @@ -254,7 +254,7 @@ async fn replaced_header_server_store() { server_headers[10] = replaced_header.clone(); server_store - .insert(unsafe { VerifiedExtendedHeaders::new_unchecked(server_headers.clone()) }) + .insert(unsafe { ValidExtendedHeadersChain::new_unchecked(server_headers.clone()) }) .await .unwrap(); @@ -373,7 +373,7 @@ async fn unverified_header_server_store() { unverify(&mut server_headers[10]); server_store - .insert(unsafe { VerifiedExtendedHeaders::new_unchecked(server_headers.clone()) }) + .insert(unsafe { ValidExtendedHeadersChain::new_unchecked(server_headers.clone()) }) .await .unwrap(); diff --git a/node/tests/node.rs b/node/tests/node.rs index 7c2fc6b3..56cc5026 100644 --- a/node/tests/node.rs +++ b/node/tests/node.rs @@ -11,9 +11,10 @@ use futures::StreamExt; use libp2p::swarm::NetworkBehaviour; use libp2p::{gossipsub, identity, noise, ping, tcp, yamux, Multiaddr, SwarmBuilder}; use lumina_node::node::{Node, NodeConfig}; -use lumina_node::store::{ExtendedHeaderGeneratorExt, InMemoryStore, Store}; +use lumina_node::store::{InMemoryStore, Store}; use lumina_node::test_utils::{ gen_filled_store, listening_test_node_config, test_node_config, test_node_config_with_keypair, + ExtendedHeaderGeneratorExt, }; use rand::Rng; use tokio::{select, spawn, sync::mpsc, time::sleep}; diff --git a/types/src/extended_header.rs b/types/src/extended_header.rs index 7ddfa168..aa18351d 100644 --- a/types/src/extended_header.rs +++ b/types/src/extended_header.rs @@ -264,6 +264,18 @@ impl ExtendedHeader { Ok(()) } + pub fn verify_adjacent(&self, untrusted: &ExtendedHeader) -> Result<()> { + if self.height().increment() != untrusted.height() { + bail_verification!( + "untrusted header height ({}) not adjacent to the current trusted ({})", + untrusted.height(), + self.height(), + ); + } + + self.verify(untrusted) + } + /// Verify a chain of adjacent untrusted headers. /// /// # Note