Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: Type-safe ExtendedHeader validation and verification types #331

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,17 +224,17 @@ where

/// Request the head header from the network.
pub async fn request_head_header(&self) -> Result<ExtendedHeader> {
Ok(self.p2p.get_head_header().await?)
Ok(self.p2p.get_head_header().await?.into())
}

/// Request a header for the block with a given hash from the network.
pub async fn request_header_by_hash(&self, hash: &Hash) -> Result<ExtendedHeader> {
Ok(self.p2p.get_header(*hash).await?)
Ok(self.p2p.get_header(*hash).await?.into())
}

/// Request a header for the block with a given height from the network.
pub async fn request_header_by_height(&self, hash: u64) -> Result<ExtendedHeader> {
Ok(self.p2p.get_header_by_height(hash).await?)
Ok(self.p2p.get_header_by_height(hash).await?.into())
}

/// Request headers in range (from, from + amount] from the network.
Expand All @@ -245,7 +245,11 @@ where
from: &ExtendedHeader,
amount: u64,
) -> Result<Vec<ExtendedHeader>> {
Ok(self.p2p.get_verified_headers_range(from, amount).await?)
Ok(self
.p2p
.get_verified_headers_range(from, amount)
.await?
.into())
}

/// Request a verified [`Row`] from the network.
Expand Down Expand Up @@ -301,7 +305,11 @@ where

/// Get the latest header announced in the network.
pub fn get_network_head_header(&self) -> Option<ExtendedHeader> {
self.p2p.header_sub_watcher().borrow().clone()
self.p2p
.header_sub_watcher()
.borrow()
.clone()
.map(|h| h.into())
}

/// Get the latest locally synced header.
Expand Down
72 changes: 40 additions & 32 deletions node/src/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ use crate::p2p::shwap::{namespaced_data_cid, row_cid, sample_cid, ShwapMultihash
use crate::p2p::swarm::new_swarm;
use crate::peer_tracker::PeerTracker;
use crate::peer_tracker::PeerTrackerInfo;
use crate::store::Store;
use crate::store::utils::ValidatedExtendedHeader;
use crate::store::{Store, ValidatedExtendedHeaders, VerifiedExtendedHeaders};
use crate::utils::{
celestia_protocol_id, fraudsub_ident_topic, gossipsub_ident_topic, MultiaddrExt,
OneshotResultSender, OneshotResultSenderExt, OneshotSenderExt,
Expand Down Expand Up @@ -157,7 +158,7 @@ impl From<oneshot::error::RecvError> for P2pError {
#[derive(Debug)]
pub struct P2p {
cmd_tx: mpsc::Sender<P2pCmd>,
header_sub_watcher: watch::Receiver<Option<ExtendedHeader>>,
header_sub_watcher: watch::Receiver<Option<ValidatedExtendedHeader>>,
peer_tracker_info_watcher: watch::Receiver<PeerTrackerInfo>,
local_peer_id: PeerId,
}
Expand Down Expand Up @@ -191,7 +192,7 @@ pub(crate) enum P2pCmd {
},
HeaderExRequest {
request: HeaderRequest,
respond_to: OneshotResultSender<Vec<ExtendedHeader>, P2pError>,
respond_to: OneshotResultSender<ValidatedExtendedHeaders, P2pError>,
},
Listeners {
respond_to: oneshot::Sender<Vec<Multiaddr>>,
Expand All @@ -200,7 +201,7 @@ pub(crate) enum P2pCmd {
respond_to: oneshot::Sender<Vec<PeerId>>,
},
InitHeaderSub {
head: Box<ExtendedHeader>,
head: Box<ValidatedExtendedHeader>,
},
SetPeerTrust {
peer_id: PeerId,
Expand Down Expand Up @@ -289,7 +290,7 @@ impl P2p {
}

/// Watcher for the latest verified network head headers announced on `header-sub`.
pub fn header_sub_watcher(&self) -> watch::Receiver<Option<ExtendedHeader>> {
pub fn header_sub_watcher(&self) -> watch::Receiver<Option<ValidatedExtendedHeader>> {
self.header_sub_watcher.clone()
}

Expand All @@ -304,7 +305,7 @@ impl P2p {
}

/// Initializes `header-sub` protocol with a given `subjective_head`.
pub async fn init_header_sub(&self, head: ExtendedHeader) -> Result<()> {
pub async fn init_header_sub(&self, head: ValidatedExtendedHeader) -> Result<()> {
self.send_command(P2pCmd::InitHeaderSub {
head: Box::new(head),
})
Expand Down Expand Up @@ -340,7 +341,10 @@ impl P2p {
}

/// Send a request on the `header-ex` protocol.
pub async fn header_ex_request(&self, request: HeaderRequest) -> Result<Vec<ExtendedHeader>> {
pub async fn header_ex_request(
&self,
request: HeaderRequest,
) -> Result<ValidatedExtendedHeaders> {
let (tx, rx) = oneshot::channel();

self.send_command(P2pCmd::HeaderExRequest {
Expand All @@ -353,58 +357,64 @@ impl P2p {
}

/// Request the head header on the `header-ex` protocol.
pub async fn get_head_header(&self) -> Result<ExtendedHeader> {
pub async fn get_head_header(&self) -> Result<ValidatedExtendedHeader> {
self.get_header_by_height(0).await
}

/// Request the header by hash on the `header-ex` protocol.
pub async fn get_header(&self, hash: Hash) -> Result<ExtendedHeader> {
pub async fn get_header(&self, hash: Hash) -> Result<ValidatedExtendedHeader> {
self.header_ex_request(HeaderRequest {
data: Some(header_request::Data::Hash(hash.as_bytes().to_vec())),
amount: 1,
})
.await?
.into_validated_vec()
.into_iter()
.next()
.ok_or(HeaderExError::HeaderNotFound.into())
}

/// Request the header by height on the `header-ex` protocol.
pub async fn get_header_by_height(&self, height: u64) -> Result<ExtendedHeader> {
pub async fn get_header_by_height(&self, height: u64) -> Result<ValidatedExtendedHeader> {
self.header_ex_request(HeaderRequest {
data: Some(header_request::Data::Origin(height)),
amount: 1,
})
.await?
.into_validated_vec()
.into_iter()
.next()
.ok_or(HeaderExError::HeaderNotFound.into())
}

/// Request the headers following the one given with the `header-ex` protocol.
/// Request the headers followingValidatedExtendedHeader the one given with the `header-ex` protocol.
///
/// First header from the requested range will be verified against the provided one,
/// then each subsequent is verified against the previous one.
pub async fn get_verified_headers_range(
&self,
from: &ExtendedHeader,
amount: u64,
) -> Result<Vec<ExtendedHeader>> {
) -> Result<VerifiedExtendedHeaders> {
if amount == 0 {
return Err(HeaderExError::InvalidRequest.into());
}

// User can give us a bad header, so validate it.
from.validate().map_err(|_| HeaderExError::InvalidRequest)?;

let height = from.height().value() + 1;

let range = height..=height + amount - 1;

let mut session = HeaderSession::new(range, self.cmd_tx.clone());
let headers = session.run().await?;

// `.validate()` is called on each header separately by `HeaderExClientHandler`.
//
// The last step is to verify that all headers are from the same chain
// and indeed connected with the next one.
from.verify_adjacent_range(&headers)
// Verify the first header, the rest will be verified
// in `VerifiedExtendedHeaders::from_validated`.
from.verify_adjacent(&headers[0])
.map_err(|_| HeaderExError::InvalidResponse)?;

let headers = VerifiedExtendedHeaders::from_validated(headers)
.map_err(|_| HeaderExError::InvalidResponse)?;

Ok(headers)
Expand All @@ -417,21 +427,14 @@ impl P2p {
pub(crate) async fn get_unverified_header_range(
&self,
range: BlockRange,
) -> Result<Vec<ExtendedHeader>> {
) -> Result<ValidatedExtendedHeaders> {
if range.is_empty() {
return Err(HeaderExError::InvalidRequest.into());
}

let mut session = HeaderSession::new(range, self.cmd_tx.clone());
let headers = session.run().await?;

let Some(head) = headers.first() else {
return Err(HeaderExError::InvalidResponse.into());
};

head.verify_adjacent_range(&headers[1..])
.map_err(|_| HeaderExError::InvalidResponse)?;

Ok(headers)
}

Expand Down Expand Up @@ -565,7 +568,7 @@ where
bad_encoding_fraud_sub_topic: TopicHash,
cmd_rx: mpsc::Receiver<P2pCmd>,
peer_tracker: Arc<PeerTracker>,
header_sub_watcher: watch::Sender<Option<ExtendedHeader>>,
header_sub_watcher: watch::Sender<Option<ValidatedExtendedHeader>>,
bitswap_queries: HashMap<beetswap::QueryId, OneshotResultSender<Vec<u8>, P2pError>>,
network_compromised_token: CancellationToken,
store: Arc<S>,
Expand All @@ -579,7 +582,7 @@ where
fn new(
args: P2pArgs<B, S>,
cmd_rx: mpsc::Receiver<P2pCmd>,
header_sub_watcher: watch::Sender<Option<ExtendedHeader>>,
header_sub_watcher: watch::Sender<Option<ValidatedExtendedHeader>>,
peer_tracker: Arc<PeerTracker>,
) -> Result<Self, P2pError> {
let local_peer_id = PeerId::from(args.local_keypair.public());
Expand Down Expand Up @@ -948,15 +951,20 @@ where
}

#[instrument(skip_all, fields(header = %head))]
fn on_init_header_sub(&mut self, head: ExtendedHeader) {
fn on_init_header_sub(&mut self, head: ValidatedExtendedHeader) {
self.header_sub_watcher.send_replace(Some(head));
trace!("HeaderSub initialized");
}

#[instrument(skip_all)]
async fn on_header_sub_message(&mut self, data: &[u8]) -> gossipsub::MessageAcceptance {
let Ok(header) = ExtendedHeader::decode_and_validate(data) else {
trace!("Malformed or invalid header from header-sub");
let Ok(header) = ExtendedHeader::decode(data) else {
trace!("Malformed header from header-sub");
return gossipsub::MessageAcceptance::Reject;
};

let Ok(header) = ValidatedExtendedHeader::new(header) else {
trace!("Invalid header from header-sub");
return gossipsub::MessageAcceptance::Reject;
};

Expand Down Expand Up @@ -1142,6 +1150,6 @@ where
.build())
}

fn network_head_height(watcher: &watch::Sender<Option<ExtendedHeader>>) -> Option<Height> {
fn network_head_height(watcher: &watch::Sender<Option<ValidatedExtendedHeader>>) -> Option<Height> {
watcher.borrow().as_ref().map(|header| header.height())
}
5 changes: 2 additions & 3 deletions node/src/p2p/header_ex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use std::task::{Context, Poll};

use async_trait::async_trait;
use celestia_proto::p2p::pb::{HeaderRequest, HeaderResponse};
use celestia_types::ExtendedHeader;
use futures::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use libp2p::{
core::Endpoint,
Expand All @@ -29,7 +28,7 @@ use crate::p2p::header_ex::client::HeaderExClientHandler;
use crate::p2p::header_ex::server::HeaderExServerHandler;
use crate::p2p::P2pError;
use crate::peer_tracker::PeerTracker;
use crate::store::Store;
use crate::store::{Store, ValidatedExtendedHeaders};
use crate::utils::{protocol_id, OneshotResultSender};

/// Size limit of a request in bytes
Expand Down Expand Up @@ -111,7 +110,7 @@ where
pub(crate) fn send_request(
&mut self,
request: HeaderRequest,
respond_to: OneshotResultSender<Vec<ExtendedHeader>, P2pError>,
respond_to: OneshotResultSender<ValidatedExtendedHeaders, P2pError>,
) {
self.client_handler
.on_send_request(&mut self.req_resp, request, respond_to);
Expand Down
21 changes: 10 additions & 11 deletions node/src/p2p/header_ex/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use std::task::{Context, Poll};

use celestia_proto::p2p::pb::header_request::Data;
use celestia_proto::p2p::pb::{HeaderRequest, HeaderResponse};
use celestia_types::ExtendedHeader;
use futures::future::join_all;
use libp2p::request_response::{OutboundFailure, OutboundRequestId};
use libp2p::PeerId;
Expand All @@ -19,7 +18,7 @@ use crate::p2p::header_ex::utils::{HeaderRequestExt, HeaderResponseExt};
use crate::p2p::header_ex::{HeaderExError, ReqRespBehaviour};
use crate::p2p::P2pError;
use crate::peer_tracker::PeerTracker;
use crate::store::utils::VALIDATIONS_PER_YIELD;
use crate::store::utils::{ValidatedExtendedHeaders, VALIDATIONS_PER_YIELD};
use crate::utils::{OneshotResultSender, OneshotResultSenderExt};

const MAX_PEERS: usize = 10;
Expand All @@ -34,7 +33,7 @@ where

struct State {
request: HeaderRequest,
respond_to: OneshotResultSender<Vec<ExtendedHeader>, P2pError>,
respond_to: OneshotResultSender<ValidatedExtendedHeaders, P2pError>,
}

pub(super) trait RequestSender {
Expand Down Expand Up @@ -67,7 +66,7 @@ where
&mut self,
sender: &mut S,
request: HeaderRequest,
respond_to: OneshotResultSender<Vec<ExtendedHeader>, P2pError>,
respond_to: OneshotResultSender<ValidatedExtendedHeaders, P2pError>,
) {
if !request.is_valid() {
respond_to.maybe_send_err(HeaderExError::InvalidRequest);
Expand All @@ -87,7 +86,7 @@ where
&mut self,
sender: &mut S,
request: HeaderRequest,
respond_to: OneshotResultSender<Vec<ExtendedHeader>, P2pError>,
respond_to: OneshotResultSender<ValidatedExtendedHeaders, P2pError>,
) {
// Validate amount
if usize::try_from(request.amount).is_err() {
Expand All @@ -113,7 +112,7 @@ where
&mut self,
sender: &mut S,
request: HeaderRequest,
respond_to: OneshotResultSender<Vec<ExtendedHeader>, P2pError>,
respond_to: OneshotResultSender<ValidatedExtendedHeaders, P2pError>,
) {
const MIN_HEAD_RESPONSES: usize = 2;

Expand Down Expand Up @@ -149,7 +148,7 @@ where
.into_iter()
// In case of HEAD all responses have only 1 header.
// This was already enforced by `decode_and_verify_responses`.
.filter_map(|v| v.ok()?.ok()?.into_iter().next())
.filter_map(|v| v.ok()?.ok()?.into_validated_vec().into_iter().next())
.collect();
let mut counter: HashMap<_, usize> = HashMap::new();

Expand All @@ -173,14 +172,14 @@ where
// Return the header with the highest height that was received by at least 2 peers
for resp in &resps {
if counter[&resp.hash()] >= MIN_HEAD_RESPONSES {
respond_to.maybe_send_ok(vec![resp.to_owned()]);
respond_to.maybe_send_ok(resp.into());
return;
}
}

// Otherwise return the header with the maximum height
let resp = resps.into_iter().next().expect("no reposnes");
respond_to.maybe_send_ok(vec![resp]);
respond_to.maybe_send_ok(resp.into());
});
}

Expand Down Expand Up @@ -238,7 +237,7 @@ where
async fn decode_and_verify_responses(
request: &HeaderRequest,
responses: &[HeaderResponse],
) -> Result<Vec<ExtendedHeader>, HeaderExError> {
) -> Result<ValidatedExtendedHeaders, HeaderExError> {
if responses.is_empty() {
return Err(HeaderExError::InvalidResponse);
}
Expand Down Expand Up @@ -298,7 +297,7 @@ async fn decode_and_verify_responses(
_ => return Err(HeaderExError::InvalidResponse),
}

Ok(headers)
Ok(headers.into())
}

#[cfg(test)]
Expand Down
Loading
Loading