Skip to content

Commit

Permalink
feat: report protocol mismatch error
Browse files Browse the repository at this point in the history
  • Loading branch information
RolandSherwin committed Apr 8, 2024
1 parent 71ea622 commit f50a71f
Show file tree
Hide file tree
Showing 8 changed files with 109 additions and 41 deletions.
23 changes: 15 additions & 8 deletions sn_cli/src/bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ async fn main() -> Result<()> {

// get the broadcaster as we want to have our own progress bar.
let broadcaster = ClientEventsBroadcaster::default();
let progress_bar_handler = spawn_connection_progress_bar(broadcaster.subscribe());
let (progress_bar, progress_bar_handler) =
spawn_connection_progress_bar(broadcaster.subscribe());

let result = Client::new(
secret_key,
Expand All @@ -121,11 +122,15 @@ async fn main() -> Result<()> {
Some(broadcaster),
)
.await;

// await on the progress bar to complete before handling the client result. If client errors out, we would
// want to make the progress bar clean up gracefully.
let client = match result {
Ok(client) => client,
Err(err) => {
// clean up progress bar
progress_bar.finish_with_message("Could not connect to the network");
return Err(err.into());
}
};
progress_bar_handler.await?;
let client = result?;

// default to verifying storage
let should_verify_store = !opt.no_verify;
Expand Down Expand Up @@ -153,17 +158,18 @@ async fn main() -> Result<()> {

/// Helper to subscribe to the client events broadcaster and spin up a progress bar that terminates when the
/// client successfully connects to the network or if it errors out.
fn spawn_connection_progress_bar(mut rx: ClientEventsReceiver) -> JoinHandle<()> {
fn spawn_connection_progress_bar(mut rx: ClientEventsReceiver) -> (ProgressBar, JoinHandle<()>) {
// Network connection progress bar
let progress_bar = ProgressBar::new_spinner();
let progress_bar_clone = progress_bar.clone();
progress_bar.enable_steady_tick(Duration::from_millis(120));
progress_bar.set_message("Connecting to The SAFE Network...");
let new_style = progress_bar.style().tick_chars("⠁⠂⠄⡀⢀⠠⠐⠈🔗");
progress_bar.set_style(new_style);

progress_bar.set_message("Connecting to The SAFE Network...");

tokio::spawn(async move {
let handle = tokio::spawn(async move {
let mut peers_connected = 0;
loop {
match rx.recv().await {
Expand All @@ -190,7 +196,8 @@ fn spawn_connection_progress_bar(mut rx: ClientEventsReceiver) -> JoinHandle<()>
_ => {}
}
}
})
});
(progress_bar_clone, handle)
}

fn get_client_secret_key(root_dir: &PathBuf) -> Result<SecretKey> {
Expand Down
30 changes: 26 additions & 4 deletions sn_client/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,9 @@ impl Client {
// loop to connect to the network
let mut is_connected = false;
let connection_timeout = connection_timeout.unwrap_or(CONNECTION_TIMEOUT);
let mut unsupported_protocol_tracker: Option<(String, String)> = None;

debug!("Client connection timeout: {connection_timeout:?}");
let mut connection_timeout_interval = interval(connection_timeout);
// first tick completes immediately
connection_timeout_interval.tick().await;
Expand All @@ -200,16 +202,26 @@ impl Client {
tokio::select! {
_ = connection_timeout_interval.tick() => {
if !is_connected {
if let Some((our_protocol, their_protocols)) = unsupported_protocol_tracker {
error!("Timeout: Client could not connect to the network as it does not support the protocol");
break Err(Error::UnsupportedProtocol(our_protocol, their_protocols));
}
error!("Timeout: Client failed to connect to the network within {connection_timeout:?}");
return Err(Error::ConnectionTimeout(connection_timeout));
break Err(Error::ConnectionTimeout(connection_timeout));
}
}
event = client_events_rx.recv() => {
match event {
// we do not error out directly as we might still connect if the other initial peers are from
// the correct network.
Ok(ClientEvent::PeerWithUnsupportedProtocol { our_protocol, their_protocol }) => {
warn!(%our_protocol, %their_protocol, "Client tried to connect to a peer with an unsupported protocol. Tracking the latest one");
unsupported_protocol_tracker = Some((our_protocol, their_protocol));
}
Ok(ClientEvent::ConnectedToNetwork) => {
is_connected = true;
info!("Client connected to the Network {is_connected:?}.");
break;
break Ok(());
}
Ok(ClientEvent::InactiveClient(timeout)) => {
if is_connected {
Expand All @@ -221,12 +233,12 @@ impl Client {
Err(err) => {
error!("Unexpected error during client startup {err:?}");
println!("Unexpected error during client startup {err:?}");
return Err(err.into());
break Err(err.into());
}
_ => {}
}
}}
}
}?;

Ok(client)
}
Expand All @@ -252,6 +264,16 @@ impl Client {
debug!("{peers_added}/{CLOSE_GROUP_SIZE} initial peers found.",);
}
}
NetworkEvent::PeerWithUnsupportedProtocol {
our_protocol,
their_protocol,
} => {
self.events_broadcaster
.broadcast(ClientEvent::PeerWithUnsupportedProtocol {
our_protocol,
their_protocol,
});
}
_other => {}
}

Expand Down
3 changes: 3 additions & 0 deletions sn_client/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ pub enum Error {
#[error("Could not find register after batch sync: {0:?}")]
RegisterNotFoundAfterUpload(XorName),

#[error("Could not connect due to incompatible network protocols. Our protocol: {0} Network protocol: {1}")]
UnsupportedProtocol(String, String),

// ------ Upload Errors --------
#[error("Overflow occurred while adding values")]
NumericOverflow,
Expand Down
5 changes: 5 additions & 0 deletions sn_client/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ pub enum ClientEvent {
/// A peer has been added to the Routing table.
/// Also contains the max number of peers to connect to before we receive ClientEvent::ConnectedToNetwork
PeerAdded { max_peers_to_connect: usize },
/// We've encountered a Peer with an unsupported protocol.
PeerWithUnsupportedProtocol {
our_protocol: String,
their_protocol: String,
},
/// The client has been connected to the network
ConnectedToNetwork,
/// No network activity has been received for a given duration
Expand Down
32 changes: 18 additions & 14 deletions sn_faucet/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,37 +60,40 @@ async fn main() -> Result<()> {

let secret_key = bls::SecretKey::random();
let broadcaster = ClientEventsBroadcaster::default();
let handle = spawn_connection_progress_bar(broadcaster.subscribe());
let (progress_bar, handle) = spawn_connection_progress_bar(broadcaster.subscribe());
let result = Client::new(secret_key, bootstrap_peers, None, Some(broadcaster)).await;

// await on the progress bar to complete before handling the client result. If client errors out, we would
// want to make the progress bar clean up gracefully.
handle.await?;
match result {
Ok(client) => {
if let Err(err) = faucet_cmds(opt.cmd.clone(), &client).await {
error!("Failed to run faucet cmd {:?} with err {err:?}", opt.cmd)
}
let client = match result {
Ok(client) => client,
Err(err) => {
// clean up progress bar
progress_bar.finish_with_message("Could not connect to the network");
error!("Failed to get Client with err {err:?}");
return Err(err.into());
}
Err(err) => error!("Failed to get Client with err {err:?}"),
};
handle.await?;

if let Err(err) = faucet_cmds(opt.cmd.clone(), &client).await {
error!("Failed to run faucet cmd {:?} with err {err:?}", opt.cmd)
}

Ok(())
}

/// Helper to subscribe to the client events broadcaster and spin up a progress bar that terminates when the
/// client successfully connects to the network or if it errors out.
fn spawn_connection_progress_bar(mut rx: ClientEventsReceiver) -> JoinHandle<()> {
fn spawn_connection_progress_bar(mut rx: ClientEventsReceiver) -> (ProgressBar, JoinHandle<()>) {
// Network connection progress bar
let progress_bar = ProgressBar::new_spinner();
let progress_bar_clone = progress_bar.clone();
progress_bar.enable_steady_tick(Duration::from_millis(120));
progress_bar.set_message("Connecting to The SAFE Network...");
let new_style = progress_bar.style().tick_chars("⠁⠂⠄⡀⢀⠠⠐⠈🔗");
progress_bar.set_style(new_style);

progress_bar.set_message("Connecting to The SAFE Network...");

tokio::spawn(async move {
let handle = tokio::spawn(async move {
let mut peers_connected = 0;
loop {
match rx.recv().await {
Expand All @@ -117,7 +120,8 @@ fn spawn_connection_progress_bar(mut rx: ClientEventsReceiver) -> JoinHandle<()>
_ => {}
}
}
})
});
(progress_bar_clone, handle)
}

#[derive(Parser)]
Expand Down
1 change: 1 addition & 0 deletions sn_networking/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ impl NetworkBuilder {

// 1mb packet size
let _ = kad_cfg
.set_kbucket_inserts(libp2p::kad::BucketInserts::Manual)
.set_max_packet_size(MAX_PACKET_SIZE)
// Require iterative queries to use disjoint paths for increased resiliency in the presence of potentially adversarial nodes.
.disjoint_query_paths(true)
Expand Down
53 changes: 38 additions & 15 deletions sn_networking/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use libp2p::{
Multiaddr, PeerId, TransportError,
};
use rand::{rngs::OsRng, Rng};
use sn_protocol::version::IDENTIFY_NODE_VERSION_STR;
use sn_protocol::version::{IDENTIFY_NODE_VERSION_STR, IDENTIFY_PROTOCOL_STR};
use sn_protocol::{
get_port_from_multiaddr,
messages::{CmdResponse, Query, Request, Response},
Expand Down Expand Up @@ -107,8 +107,13 @@ pub enum NetworkEvent {
},
/// Peer has been added to the Routing Table. And the number of connected peers.
PeerAdded(PeerId, usize),
// Peer has been removed from the Routing Table. And the number of connected peers.
/// Peer has been removed from the Routing Table. And the number of connected peers.
PeerRemoved(PeerId, usize),
/// The peer does not support our protocol
PeerWithUnsupportedProtocol {
our_protocol: String,
their_protocol: String,
},
/// The records bearing these keys are to be fetched from the holder or the network
KeysToFetchForReplication(Vec<(PeerId, RecordKey)>),
/// Started listening on a new address
Expand All @@ -120,13 +125,9 @@ pub enum NetworkEvent {
/// List of peer nodes that failed to fetch replication copy from.
FailedToFetchHolders(BTreeSet<PeerId>),
/// A peer in RT that is supposed to be verified.
BadNodeVerification {
peer_id: PeerId,
},
BadNodeVerification { peer_id: PeerId },
/// Quotes to be verified
QuoteVerification {
quotes: Vec<(PeerId, PaymentQuote)>,
},
QuoteVerification { quotes: Vec<(PeerId, PaymentQuote)> },
/// Carry out chunk proof check against the specified record and peer
ChunkProofVerification {
peer_id: PeerId,
Expand All @@ -153,6 +154,12 @@ impl Debug for NetworkEvent {
"NetworkEvent::PeerRemoved({peer_id:?}, {connected_peers})"
)
}
NetworkEvent::PeerWithUnsupportedProtocol {
our_protocol,
their_protocol,
} => {
write!(f, "NetworkEvent::PeerWithUnsupportedProtocol({our_protocol:?}, {their_protocol:?})")
}
NetworkEvent::KeysToFetchForReplication(list) => {
let keys_len = list.len();
write!(f, "NetworkEvent::KeysForReplication({keys_len:?})")
Expand Down Expand Up @@ -219,10 +226,20 @@ impl SwarmDriver {
libp2p::identify::Event::Received { peer_id, info } => {
trace!(%peer_id, ?info, "identify: received info");

if info.protocol_version != IDENTIFY_PROTOCOL_STR.to_string() {
warn!(?info.protocol_version, "identify: {peer_id:?} does not have the same protocol. Our IDENTIFY_PROTOCOL_STR: {:?}", IDENTIFY_PROTOCOL_STR.as_str());

self.send_event(NetworkEvent::PeerWithUnsupportedProtocol {
our_protocol: IDENTIFY_PROTOCOL_STR.to_string(),
their_protocol: info.protocol_version,
});

return Ok(());
}

let has_dialed = self.dialed_peers.contains(&peer_id);
let peer_is_agent = info
.agent_version
.starts_with(&IDENTIFY_NODE_VERSION_STR.to_string());
let peer_is_node =
info.agent_version == IDENTIFY_NODE_VERSION_STR.to_string();

// If we're not in local mode, only add globally reachable addresses.
// Strip the `/p2p/...` part of the multiaddresses.
Expand All @@ -243,8 +260,8 @@ impl SwarmDriver {

// When we receive an identify from an un-dialed peer, try to dial it
// The dial shall trigger the same identify to be sent again and confirm
// peer is external accessable, hence safe to be added into RT.
if !self.local && peer_is_agent && !has_dialed {
// peer is external accessible, hence safe to be added into RT.
if !self.local && peer_is_node && !has_dialed {
// Only need to dial back for not fulfilled kbucket
let (kbucket_full, ilog2) = if let Some(kbucket) =
self.swarm.behaviour_mut().kademlia.kbucket(peer_id)
Expand Down Expand Up @@ -275,7 +292,7 @@ impl SwarmDriver {
};

if !kbucket_full {
info!(%peer_id, ?addrs, "received identify info from undialed peer for not full kbucket {:?}, dail back to confirm external accesable", ilog2);
info!(%peer_id, ?addrs, "received identify info from undialed peer for not full kbucket {ilog2:?}, dial back to confirm external accessible");
self.dialed_peers
.push(peer_id)
.map_err(|_| NetworkError::CircularVecPopFrontError)?;
Expand All @@ -297,7 +314,7 @@ impl SwarmDriver {
}

// If we are not local, we care only for peers that we dialed and thus are reachable.
if self.local || has_dialed && peer_is_agent {
if self.local || has_dialed && peer_is_node {
// To reduce the bad_node check resource usage,
// during the connection establishment process, only check cached black_list
// The periodical check, which involves network queries shall filter
Expand Down Expand Up @@ -1029,6 +1046,12 @@ impl SwarmDriver {
event_string = "kad_event::UnroutablePeer";
trace!(peer_id = %peer, "kad::Event: UnroutablePeer");
}
kad::Event::RoutablePeer { peer, .. } => {
// We get this when we don't add a peer via the identify step.
// And we don't want to add these as they were rejected by identify for some reason.
event_string = "kad_event::RoutablePeer";
trace!(peer_id = %peer, "kad::Event: RoutablePeer");
}
other => {
event_string = "kad_event::Other";
trace!("kad::Event ignored: {other:?}");
Expand Down
3 changes: 3 additions & 0 deletions sn_node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,9 @@ impl Node {
Self::try_interval_replication(net);
});
}
NetworkEvent::PeerWithUnsupportedProtocol { .. } => {
event_header = "PeerWithUnsupportedProtocol";
}
NetworkEvent::NewListenAddr(_) => {
event_header = "NewListenAddr";
if !cfg!(feature = "local-discovery") {
Expand Down

0 comments on commit f50a71f

Please sign in to comment.