Skip to content

Commit

Permalink
fix(node)!: Do not skip header-sub reports when store writes are slow (
Browse files Browse the repository at this point in the history
…#333)

Co-authored-by: Mikołaj Florkiewicz <[email protected]>
  • Loading branch information
oblique and fl0rek authored Jul 25, 2024
1 parent a63c045 commit 4023855
Show file tree
Hide file tree
Showing 8 changed files with 116 additions and 91 deletions.
2 changes: 1 addition & 1 deletion node-wasm/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ impl NodeDriver {
let response = self.client.exec(command).await?;
let header = response.into_last_seen_network_head().check_variant()?;

Ok(header)
header.into()
}

/// Get the latest locally synced header.
Expand Down
10 changes: 6 additions & 4 deletions node-wasm/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,11 @@ impl NodeWorker {
.context("could not serialise fetched headers")
}

async fn get_last_seen_network_head(&mut self) -> JsValue {
// JS interface returns `undefined`, if node haven't received any headers from HeaderSub yet
to_value(&self.node.get_network_head_header()).unwrap_or(JsValue::UNDEFINED)
async fn get_last_seen_network_head(&mut self) -> Result<JsValue> {
match self.node.get_network_head_header().await? {
Some(header) => to_value(&header).context("could not serialise head header"),
None => Ok(JsValue::UNDEFINED),
}
}

async fn get_sampling_metadata(&mut self, height: u64) -> Result<Option<SamplingMetadata>> {
Expand Down Expand Up @@ -222,7 +224,7 @@ impl NodeWorker {
.into(),
),
NodeCommand::LastSeenNetworkHead => {
WorkerResponse::LastSeenNetworkHead(self.get_last_seen_network_head().await)
WorkerResponse::LastSeenNetworkHead(self.get_last_seen_network_head().await.into())
}
NodeCommand::GetSamplingMetadata { height } => {
WorkerResponse::SamplingMetadata(self.get_sampling_metadata(height).await)
Expand Down
3 changes: 1 addition & 2 deletions node-wasm/src/worker/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,7 @@ pub(crate) enum WorkerResponse {
Listeners(Result<Vec<Multiaddr>>),
Header(JsResult<JsValue, Error>),
Headers(JsResult<Array, Error>),
#[serde(with = "serde_wasm_bindgen::preserve")]
LastSeenNetworkHead(JsValue),
LastSeenNetworkHead(JsResult<JsValue, Error>),
SamplingMetadata(Result<Option<SamplingMetadata>>),
WorkerClosed(()),
}
Expand Down
4 changes: 2 additions & 2 deletions node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,8 +300,8 @@ where
}

/// Get the latest header announced in the network.
pub fn get_network_head_header(&self) -> Option<ExtendedHeader> {
self.p2p.header_sub_watcher().borrow().clone()
pub async fn get_network_head_header(&self) -> Result<Option<ExtendedHeader>> {
Ok(self.p2p.get_network_head().await?)
}

/// Get the latest locally synced header.
Expand Down
130 changes: 73 additions & 57 deletions node/src/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use celestia_types::nmt::Namespace;
use celestia_types::row::Row;
use celestia_types::sample::Sample;
use celestia_types::{fraud_proof::BadEncodingFraudProof, hash::Hash};
use celestia_types::{ExtendedHeader, FraudProof, Height};
use celestia_types::{ExtendedHeader, FraudProof};
use cid::Cid;
use futures::StreamExt;
use libp2p::{
Expand Down Expand Up @@ -165,7 +165,6 @@ impl From<oneshot::error::RecvError> for P2pError {
#[derive(Debug)]
pub struct P2p {
cmd_tx: mpsc::Sender<P2pCmd>,
header_sub_watcher: watch::Receiver<Option<ExtendedHeader>>,
peer_tracker_info_watcher: watch::Receiver<PeerTrackerInfo>,
local_peer_id: PeerId,
}
Expand Down Expand Up @@ -209,6 +208,8 @@ pub(crate) enum P2pCmd {
},
InitHeaderSub {
head: Box<ExtendedHeader>,
/// Any valid headers received by header-sub will be send to this channel.
channel: mpsc::Sender<ExtendedHeader>,
},
SetPeerTrust {
peer_id: PeerId,
Expand All @@ -221,6 +222,9 @@ pub(crate) enum P2pCmd {
GetNetworkCompromisedToken {
respond_to: oneshot::Sender<CancellationToken>,
},
GetNetworkHead {
respond_to: oneshot::Sender<Option<ExtendedHeader>>,
},
}

impl P2p {
Expand All @@ -234,21 +238,18 @@ impl P2p {

let local_peer_id = PeerId::from(args.local_keypair.public());

let (cmd_tx, cmd_rx) = mpsc::channel(16);
let (header_sub_tx, header_sub_rx) = watch::channel(None);

let peer_tracker = Arc::new(PeerTracker::new(args.event_pub.clone()));
let peer_tracker_info_watcher = peer_tracker.info_watcher();

let mut worker = Worker::new(args, cmd_rx, header_sub_tx, peer_tracker)?;
let (cmd_tx, cmd_rx) = mpsc::channel(16);
let mut worker = Worker::new(args, cmd_rx, peer_tracker)?;

spawn(async move {
worker.run().await;
});

Ok(P2p {
cmd_tx,
header_sub_watcher: header_sub_rx,
peer_tracker_info_watcher,
local_peer_id,
})
Expand All @@ -258,20 +259,18 @@ impl P2p {
#[cfg(any(test, feature = "test-utils"))]
pub fn mocked() -> (Self, crate::test_utils::MockP2pHandle) {
let (cmd_tx, cmd_rx) = mpsc::channel(16);
let (header_sub_tx, header_sub_rx) = watch::channel(None);
let (peer_tracker_tx, peer_tracker_rx) = watch::channel(PeerTrackerInfo::default());

let p2p = P2p {
cmd_tx: cmd_tx.clone(),
header_sub_watcher: header_sub_rx,
peer_tracker_info_watcher: peer_tracker_rx,
local_peer_id: PeerId::random(),
};

let handle = crate::test_utils::MockP2pHandle {
cmd_tx,
cmd_rx,
header_sub_tx,
header_sub_tx: None,
peer_tracker_tx,
};

Expand All @@ -296,11 +295,6 @@ impl P2p {
.map_err(|_| P2pError::WorkerDied)
}

/// Watcher for the latest verified network head headers announced on `header-sub`.
pub fn header_sub_watcher(&self) -> watch::Receiver<Option<ExtendedHeader>> {
self.header_sub_watcher.clone()
}

/// Watcher for the current [`PeerTrackerInfo`].
pub fn peer_tracker_info_watcher(&self) -> watch::Receiver<PeerTrackerInfo> {
self.peer_tracker_info_watcher.clone()
Expand All @@ -312,9 +306,14 @@ impl P2p {
}

/// Initializes `header-sub` protocol with a given `subjective_head`.
pub async fn init_header_sub(&self, head: ExtendedHeader) -> Result<()> {
pub async fn init_header_sub(
&self,
head: ExtendedHeader,
channel: mpsc::Sender<ExtendedHeader>,
) -> Result<()> {
self.send_command(P2pCmd::InitHeaderSub {
head: Box::new(head),
channel,
})
.await
}
Expand Down Expand Up @@ -545,6 +544,16 @@ impl P2p {

Ok(rx.await?)
}

/// Get the latest header announced on the network.
pub async fn get_network_head(&self) -> Result<Option<ExtendedHeader>> {
let (tx, rx) = oneshot::channel();

self.send_command(P2pCmd::GetNetworkHead { respond_to: tx })
.await?;

Ok(rx.await?)
}
}

/// Our network behaviour.
Expand Down Expand Up @@ -573,12 +582,17 @@ where
bad_encoding_fraud_sub_topic: TopicHash,
cmd_rx: mpsc::Receiver<P2pCmd>,
peer_tracker: Arc<PeerTracker>,
header_sub_watcher: watch::Sender<Option<ExtendedHeader>>,
header_sub_state: Option<HeaderSubState>,
bitswap_queries: HashMap<beetswap::QueryId, OneshotResultSender<Vec<u8>, P2pError>>,
network_compromised_token: CancellationToken,
store: Arc<S>,
}

struct HeaderSubState {
known_head: ExtendedHeader,
channel: mpsc::Sender<ExtendedHeader>,
}

impl<B, S> Worker<B, S>
where
B: Blockstore,
Expand All @@ -587,7 +601,6 @@ where
fn new(
args: P2pArgs<B, S>,
cmd_rx: mpsc::Receiver<P2pCmd>,
header_sub_watcher: watch::Sender<Option<ExtendedHeader>>,
peer_tracker: Arc<PeerTracker>,
) -> Result<Self, P2pError> {
let local_peer_id = PeerId::from(args.local_keypair.public());
Expand Down Expand Up @@ -649,7 +662,7 @@ where
bad_encoding_fraud_sub_topic: bad_encoding_fraud_sub_topic.hash(),
header_sub_topic_hash: header_sub_topic.hash(),
peer_tracker,
header_sub_watcher,
header_sub_state: None,
bitswap_queries: HashMap::new(),
network_compromised_token: CancellationToken::new(),
store: args.store,
Expand Down Expand Up @@ -771,8 +784,8 @@ where
P2pCmd::ConnectedPeers { respond_to } => {
respond_to.maybe_send(self.peer_tracker.connected_peers());
}
P2pCmd::InitHeaderSub { head } => {
self.on_init_header_sub(*head);
P2pCmd::InitHeaderSub { head, channel } => {
self.on_init_header_sub(*head, channel);
}
P2pCmd::SetPeerTrust {
peer_id,
Expand All @@ -786,7 +799,14 @@ where
self.on_get_shwap_cid(cid, respond_to);
}
P2pCmd::GetNetworkCompromisedToken { respond_to } => {
respond_to.maybe_send(self.network_compromised_token.child_token())
respond_to.maybe_send(self.network_compromised_token.child_token());
}
P2pCmd::GetNetworkHead { respond_to } => {
let head = self
.header_sub_state
.as_ref()
.map(|state| state.known_head.clone());
respond_to.maybe_send(head);
}
}

Expand Down Expand Up @@ -836,7 +856,7 @@ where
};

let acceptance = if message.topic == self.header_sub_topic_hash {
self.on_header_sub_message(&message.data[..]).await
self.on_header_sub_message(&message.data[..])
} else if message.topic == self.bad_encoding_fraud_sub_topic {
self.on_bad_encoding_fraud_sub_message(&message.data[..], &peer)
.await
Expand Down Expand Up @@ -961,41 +981,41 @@ where
}

#[instrument(skip_all, fields(header = %head))]
fn on_init_header_sub(&mut self, head: ExtendedHeader) {
self.header_sub_watcher.send_replace(Some(head));
fn on_init_header_sub(&mut self, head: ExtendedHeader, channel: mpsc::Sender<ExtendedHeader>) {
self.header_sub_state = Some(HeaderSubState {
known_head: head,
channel,
});
trace!("HeaderSub initialized");
}

#[instrument(skip_all)]
async fn on_header_sub_message(&mut self, data: &[u8]) -> gossipsub::MessageAcceptance {
fn on_header_sub_message(&mut self, data: &[u8]) -> gossipsub::MessageAcceptance {
let Ok(header) = ExtendedHeader::decode_and_validate(data) else {
trace!("Malformed or invalid header from header-sub");
return gossipsub::MessageAcceptance::Reject;
};

trace!("Received header from header-sub ({header})");

let updated = self.header_sub_watcher.send_if_modified(move |state| {
let Some(known_header) = state else {
debug!("HeaderSub not initialized yet");
return false;
};
let Some(ref mut state) = self.header_sub_state else {
debug!("header-sub not initialized yet");
return gossipsub::MessageAcceptance::Ignore;
};

if known_header.verify(&header).is_err() {
trace!("Failed to verify HeaderSub header. Ignoring {header}");
return false;
}
if state.known_head.verify(&header).is_err() {
trace!("Failed to verify HeaderSub header. Ignoring {header}");
return gossipsub::MessageAcceptance::Ignore;
}

debug!("New header from header-sub ({header})");
*state = Some(header);
true
});
trace!("New header from header-sub ({header})");

if updated {
gossipsub::MessageAcceptance::Accept
} else {
gossipsub::MessageAcceptance::Ignore
}
state.known_head = header.clone();
// We intentionally do not `send().await` to avoid blocking `P2p`
// in case `Syncer` enters some weird state.
let _ = state.channel.try_send(header);

gossipsub::MessageAcceptance::Accept
}

#[instrument(skip_all)]
Expand All @@ -1011,15 +1031,15 @@ where
};

let height = befp.height().value();
let current_height =
if let Some(network_height) = network_head_height(&self.header_sub_watcher) {
network_height.value()
} else if let Ok(local_head) = self.store.get_head().await {
local_head.height().value()
} else {
// we aren't tracking the network and have uninitialized store
return gossipsub::MessageAcceptance::Ignore;
};

let current_height = if let Some(ref header_sub_state) = self.header_sub_state {
header_sub_state.known_head.height().value()
} else if let Ok(local_head) = self.store.get_head().await {
local_head.height().value()
} else {
// we aren't tracking the network and have uninitialized store
return gossipsub::MessageAcceptance::Ignore;
};

if height > current_height + FRAUD_PROOF_HEAD_HEIGHT_THRESHOLD {
// does this threshold make any sense if we're gonna ignore it anyway
Expand Down Expand Up @@ -1156,7 +1176,3 @@ where
.client_set_send_dont_have(false)
.build())
}

fn network_head_height(watcher: &watch::Sender<Option<ExtendedHeader>>) -> Option<Height> {
watcher.borrow().as_ref().map(|header| header.height())
}
Loading

0 comments on commit 4023855

Please sign in to comment.