From f50a71f556fe4abe7313a608c2883ecf5fefdf9d Mon Sep 17 00:00:00 2001 From: Roland Sherwin Date: Sat, 6 Apr 2024 00:50:37 +0530 Subject: [PATCH] feat: report protocol mismatch error --- sn_cli/src/bin/main.rs | 23 ++++++++++------ sn_client/src/api.rs | 30 ++++++++++++++++++--- sn_client/src/error.rs | 3 +++ sn_client/src/event.rs | 5 ++++ sn_faucet/src/main.rs | 32 ++++++++++++---------- sn_networking/src/driver.rs | 1 + sn_networking/src/event.rs | 53 ++++++++++++++++++++++++++----------- sn_node/src/node.rs | 3 +++ 8 files changed, 109 insertions(+), 41 deletions(-) diff --git a/sn_cli/src/bin/main.rs b/sn_cli/src/bin/main.rs index 13ce887081..67294df045 100644 --- a/sn_cli/src/bin/main.rs +++ b/sn_cli/src/bin/main.rs @@ -112,7 +112,8 @@ async fn main() -> Result<()> { // get the broadcaster as we want to have our own progress bar. let broadcaster = ClientEventsBroadcaster::default(); - let progress_bar_handler = spawn_connection_progress_bar(broadcaster.subscribe()); + let (progress_bar, progress_bar_handler) = + spawn_connection_progress_bar(broadcaster.subscribe()); let result = Client::new( secret_key, @@ -121,11 +122,15 @@ async fn main() -> Result<()> { Some(broadcaster), ) .await; - - // await on the progress bar to complete before handling the client result. If client errors out, we would - // want to make the progress bar clean up gracefully. + let client = match result { + Ok(client) => client, + Err(err) => { + // clean up progress bar + progress_bar.finish_with_message("Could not connect to the network"); + return Err(err.into()); + } + }; progress_bar_handler.await?; - let client = result?; // default to verifying storage let should_verify_store = !opt.no_verify; @@ -153,9 +158,10 @@ async fn main() -> Result<()> { /// Helper to subscribe to the client events broadcaster and spin up a progress bar that terminates when the /// client successfully connects to the network or if it errors out. -fn spawn_connection_progress_bar(mut rx: ClientEventsReceiver) -> JoinHandle<()> { +fn spawn_connection_progress_bar(mut rx: ClientEventsReceiver) -> (ProgressBar, JoinHandle<()>) { // Network connection progress bar let progress_bar = ProgressBar::new_spinner(); + let progress_bar_clone = progress_bar.clone(); progress_bar.enable_steady_tick(Duration::from_millis(120)); progress_bar.set_message("Connecting to The SAFE Network..."); let new_style = progress_bar.style().tick_chars("⠁⠂⠄⡀⢀⠠⠐⠈🔗"); @@ -163,7 +169,7 @@ fn spawn_connection_progress_bar(mut rx: ClientEventsReceiver) -> JoinHandle<()> progress_bar.set_message("Connecting to The SAFE Network..."); - tokio::spawn(async move { + let handle = tokio::spawn(async move { let mut peers_connected = 0; loop { match rx.recv().await { @@ -190,7 +196,8 @@ fn spawn_connection_progress_bar(mut rx: ClientEventsReceiver) -> JoinHandle<()> _ => {} } } - }) + }); + (progress_bar_clone, handle) } fn get_client_secret_key(root_dir: &PathBuf) -> Result { diff --git a/sn_client/src/api.rs b/sn_client/src/api.rs index 7fad35d576..626c3c389f 100644 --- a/sn_client/src/api.rs +++ b/sn_client/src/api.rs @@ -191,7 +191,9 @@ impl Client { // loop to connect to the network let mut is_connected = false; let connection_timeout = connection_timeout.unwrap_or(CONNECTION_TIMEOUT); + let mut unsupported_protocol_tracker: Option<(String, String)> = None; + debug!("Client connection timeout: {connection_timeout:?}"); let mut connection_timeout_interval = interval(connection_timeout); // first tick completes immediately connection_timeout_interval.tick().await; @@ -200,16 +202,26 @@ impl Client { tokio::select! { _ = connection_timeout_interval.tick() => { if !is_connected { + if let Some((our_protocol, their_protocols)) = unsupported_protocol_tracker { + error!("Timeout: Client could not connect to the network as it does not support the protocol"); + break Err(Error::UnsupportedProtocol(our_protocol, their_protocols)); + } error!("Timeout: Client failed to connect to the network within {connection_timeout:?}"); - return Err(Error::ConnectionTimeout(connection_timeout)); + break Err(Error::ConnectionTimeout(connection_timeout)); } } event = client_events_rx.recv() => { match event { + // we do not error out directly as we might still connect if the other initial peers are from + // the correct network. + Ok(ClientEvent::PeerWithUnsupportedProtocol { our_protocol, their_protocol }) => { + warn!(%our_protocol, %their_protocol, "Client tried to connect to a peer with an unsupported protocol. Tracking the latest one"); + unsupported_protocol_tracker = Some((our_protocol, their_protocol)); + } Ok(ClientEvent::ConnectedToNetwork) => { is_connected = true; info!("Client connected to the Network {is_connected:?}."); - break; + break Ok(()); } Ok(ClientEvent::InactiveClient(timeout)) => { if is_connected { @@ -221,12 +233,12 @@ impl Client { Err(err) => { error!("Unexpected error during client startup {err:?}"); println!("Unexpected error during client startup {err:?}"); - return Err(err.into()); + break Err(err.into()); } _ => {} } }} - } + }?; Ok(client) } @@ -252,6 +264,16 @@ impl Client { debug!("{peers_added}/{CLOSE_GROUP_SIZE} initial peers found.",); } } + NetworkEvent::PeerWithUnsupportedProtocol { + our_protocol, + their_protocol, + } => { + self.events_broadcaster + .broadcast(ClientEvent::PeerWithUnsupportedProtocol { + our_protocol, + their_protocol, + }); + } _other => {} } diff --git a/sn_client/src/error.rs b/sn_client/src/error.rs index a5880bd420..13bf844ed1 100644 --- a/sn_client/src/error.rs +++ b/sn_client/src/error.rs @@ -112,6 +112,9 @@ pub enum Error { #[error("Could not find register after batch sync: {0:?}")] RegisterNotFoundAfterUpload(XorName), + #[error("Could not connect due to incompatible network protocols. Our protocol: {0} Network protocol: {1}")] + UnsupportedProtocol(String, String), + // ------ Upload Errors -------- #[error("Overflow occurred while adding values")] NumericOverflow, diff --git a/sn_client/src/event.rs b/sn_client/src/event.rs index 0f53c6a79f..25b18eef1c 100644 --- a/sn_client/src/event.rs +++ b/sn_client/src/event.rs @@ -42,6 +42,11 @@ pub enum ClientEvent { /// A peer has been added to the Routing table. /// Also contains the max number of peers to connect to before we receive ClientEvent::ConnectedToNetwork PeerAdded { max_peers_to_connect: usize }, + /// We've encountered a Peer with an unsupported protocol. + PeerWithUnsupportedProtocol { + our_protocol: String, + their_protocol: String, + }, /// The client has been connected to the network ConnectedToNetwork, /// No network activity has been received for a given duration diff --git a/sn_faucet/src/main.rs b/sn_faucet/src/main.rs index f35bb810b0..935d0066f2 100644 --- a/sn_faucet/src/main.rs +++ b/sn_faucet/src/main.rs @@ -60,19 +60,21 @@ async fn main() -> Result<()> { let secret_key = bls::SecretKey::random(); let broadcaster = ClientEventsBroadcaster::default(); - let handle = spawn_connection_progress_bar(broadcaster.subscribe()); + let (progress_bar, handle) = spawn_connection_progress_bar(broadcaster.subscribe()); let result = Client::new(secret_key, bootstrap_peers, None, Some(broadcaster)).await; - - // await on the progress bar to complete before handling the client result. If client errors out, we would - // want to make the progress bar clean up gracefully. - handle.await?; - match result { - Ok(client) => { - if let Err(err) = faucet_cmds(opt.cmd.clone(), &client).await { - error!("Failed to run faucet cmd {:?} with err {err:?}", opt.cmd) - } + let client = match result { + Ok(client) => client, + Err(err) => { + // clean up progress bar + progress_bar.finish_with_message("Could not connect to the network"); + error!("Failed to get Client with err {err:?}"); + return Err(err.into()); } - Err(err) => error!("Failed to get Client with err {err:?}"), + }; + handle.await?; + + if let Err(err) = faucet_cmds(opt.cmd.clone(), &client).await { + error!("Failed to run faucet cmd {:?} with err {err:?}", opt.cmd) } Ok(()) @@ -80,9 +82,10 @@ async fn main() -> Result<()> { /// Helper to subscribe to the client events broadcaster and spin up a progress bar that terminates when the /// client successfully connects to the network or if it errors out. -fn spawn_connection_progress_bar(mut rx: ClientEventsReceiver) -> JoinHandle<()> { +fn spawn_connection_progress_bar(mut rx: ClientEventsReceiver) -> (ProgressBar, JoinHandle<()>) { // Network connection progress bar let progress_bar = ProgressBar::new_spinner(); + let progress_bar_clone = progress_bar.clone(); progress_bar.enable_steady_tick(Duration::from_millis(120)); progress_bar.set_message("Connecting to The SAFE Network..."); let new_style = progress_bar.style().tick_chars("⠁⠂⠄⡀⢀⠠⠐⠈🔗"); @@ -90,7 +93,7 @@ fn spawn_connection_progress_bar(mut rx: ClientEventsReceiver) -> JoinHandle<()> progress_bar.set_message("Connecting to The SAFE Network..."); - tokio::spawn(async move { + let handle = tokio::spawn(async move { let mut peers_connected = 0; loop { match rx.recv().await { @@ -117,7 +120,8 @@ fn spawn_connection_progress_bar(mut rx: ClientEventsReceiver) -> JoinHandle<()> _ => {} } } - }) + }); + (progress_bar_clone, handle) } #[derive(Parser)] diff --git a/sn_networking/src/driver.rs b/sn_networking/src/driver.rs index 42d5a2c5cd..b91ddfa112 100644 --- a/sn_networking/src/driver.rs +++ b/sn_networking/src/driver.rs @@ -328,6 +328,7 @@ impl NetworkBuilder { // 1mb packet size let _ = kad_cfg + .set_kbucket_inserts(libp2p::kad::BucketInserts::Manual) .set_max_packet_size(MAX_PACKET_SIZE) // Require iterative queries to use disjoint paths for increased resiliency in the presence of potentially adversarial nodes. .disjoint_query_paths(true) diff --git a/sn_networking/src/event.rs b/sn_networking/src/event.rs index c61e2b0e13..45c4d95b7f 100644 --- a/sn_networking/src/event.rs +++ b/sn_networking/src/event.rs @@ -30,7 +30,7 @@ use libp2p::{ Multiaddr, PeerId, TransportError, }; use rand::{rngs::OsRng, Rng}; -use sn_protocol::version::IDENTIFY_NODE_VERSION_STR; +use sn_protocol::version::{IDENTIFY_NODE_VERSION_STR, IDENTIFY_PROTOCOL_STR}; use sn_protocol::{ get_port_from_multiaddr, messages::{CmdResponse, Query, Request, Response}, @@ -107,8 +107,13 @@ pub enum NetworkEvent { }, /// Peer has been added to the Routing Table. And the number of connected peers. PeerAdded(PeerId, usize), - // Peer has been removed from the Routing Table. And the number of connected peers. + /// Peer has been removed from the Routing Table. And the number of connected peers. PeerRemoved(PeerId, usize), + /// The peer does not support our protocol + PeerWithUnsupportedProtocol { + our_protocol: String, + their_protocol: String, + }, /// The records bearing these keys are to be fetched from the holder or the network KeysToFetchForReplication(Vec<(PeerId, RecordKey)>), /// Started listening on a new address @@ -120,13 +125,9 @@ pub enum NetworkEvent { /// List of peer nodes that failed to fetch replication copy from. FailedToFetchHolders(BTreeSet), /// A peer in RT that supposed to be verified. - BadNodeVerification { - peer_id: PeerId, - }, + BadNodeVerification { peer_id: PeerId }, /// Quotes to be verified - QuoteVerification { - quotes: Vec<(PeerId, PaymentQuote)>, - }, + QuoteVerification { quotes: Vec<(PeerId, PaymentQuote)> }, /// Carry out chunk proof check against the specified record and peer ChunkProofVerification { peer_id: PeerId, @@ -153,6 +154,12 @@ impl Debug for NetworkEvent { "NetworkEvent::PeerRemoved({peer_id:?}, {connected_peers})" ) } + NetworkEvent::PeerWithUnsupportedProtocol { + our_protocol, + their_protocol, + } => { + write!(f, "NetworkEvent::PeerWithUnsupportedProtocol({our_protocol:?}, {their_protocol:?})") + } NetworkEvent::KeysToFetchForReplication(list) => { let keys_len = list.len(); write!(f, "NetworkEvent::KeysForReplication({keys_len:?})") @@ -219,10 +226,20 @@ impl SwarmDriver { libp2p::identify::Event::Received { peer_id, info } => { trace!(%peer_id, ?info, "identify: received info"); + if info.protocol_version != IDENTIFY_PROTOCOL_STR.to_string() { + warn!(?info.protocol_version, "identify: {peer_id:?} does not have the same protocol. Our IDENTIFY_PROTOCOL_STR: {:?}", IDENTIFY_PROTOCOL_STR.as_str()); + + self.send_event(NetworkEvent::PeerWithUnsupportedProtocol { + our_protocol: IDENTIFY_PROTOCOL_STR.to_string(), + their_protocol: info.protocol_version, + }); + + return Ok(()); + } + let has_dialed = self.dialed_peers.contains(&peer_id); - let peer_is_agent = info - .agent_version - .starts_with(&IDENTIFY_NODE_VERSION_STR.to_string()); + let peer_is_node = + info.agent_version == IDENTIFY_NODE_VERSION_STR.to_string(); // If we're not in local mode, only add globally reachable addresses. // Strip the `/p2p/...` part of the multiaddresses. @@ -243,8 +260,8 @@ impl SwarmDriver { // When received an identify from un-dialed peer, try to dial it // The dial shall trigger the same identify to be sent again and confirm - // peer is external accessable, hence safe to be added into RT. - if !self.local && peer_is_agent && !has_dialed { + // peer is external accessible, hence safe to be added into RT. + if !self.local && peer_is_node && !has_dialed { // Only need to dial back for not fulfilled kbucket let (kbucket_full, ilog2) = if let Some(kbucket) = self.swarm.behaviour_mut().kademlia.kbucket(peer_id) @@ -275,7 +292,7 @@ impl SwarmDriver { }; if !kbucket_full { - info!(%peer_id, ?addrs, "received identify info from undialed peer for not full kbucket {:?}, dail back to confirm external accesable", ilog2); + info!(%peer_id, ?addrs, "received identify info from undialed peer for not full kbucket {ilog2:?}, dial back to confirm external accessible"); self.dialed_peers .push(peer_id) .map_err(|_| NetworkError::CircularVecPopFrontError)?; @@ -297,7 +314,7 @@ impl SwarmDriver { } // If we are not local, we care only for peers that we dialed and thus are reachable. - if self.local || has_dialed && peer_is_agent { + if self.local || has_dialed && peer_is_node { // To reduce the bad_node check resource usage, // during the connection establish process, only check cached black_list // The periodical check, which involves network queries shall filter @@ -1029,6 +1046,12 @@ impl SwarmDriver { event_string = "kad_event::UnroutablePeer"; trace!(peer_id = %peer, "kad::Event: UnroutablePeer"); } + kad::Event::RoutablePeer { peer, .. } => { + // We get this when we don't add a peer via the identify step. + // And we don't want to add these as they were rejected by identify for some reason. + event_string = "kad_event::RoutablePeer"; + trace!(peer_id = %peer, "kad::Event: RoutablePeer"); + } other => { event_string = "kad_event::Other"; trace!("kad::Event ignored: {other:?}"); diff --git a/sn_node/src/node.rs b/sn_node/src/node.rs index 8dd1314cfe..0ee4b8ffe2 100644 --- a/sn_node/src/node.rs +++ b/sn_node/src/node.rs @@ -326,6 +326,9 @@ impl Node { Self::try_interval_replication(net); }); } + NetworkEvent::PeerWithUnsupportedProtocol { .. } => { + event_header = "PeerWithUnsupportedProtocol"; + } NetworkEvent::NewListenAddr(_) => { event_header = "NewListenAddr"; if !cfg!(feature = "local-discovery") {