Skip to content

Commit

Permalink
feat(node): bad quoting detection
Browse files Browse the repository at this point in the history
  • Loading branch information
maqi committed Mar 29, 2024
1 parent f377a15 commit bacb878
Show file tree
Hide file tree
Showing 11 changed files with 347 additions and 95 deletions.
154 changes: 96 additions & 58 deletions sn_networking/src/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use sn_protocol::{
storage::{RecordHeader, RecordKind, RecordType},
NetworkAddress, PrettyPrintRecordKey,
};
use sn_transfers::{NanoTokens, QuotingMetrics};
use sn_transfers::{NanoTokens, PaymentQuote, QuotingMetrics};
use std::{
collections::{BTreeMap, HashMap},
fmt::Debug,
Expand All @@ -42,6 +42,8 @@ pub enum NodeIssue {
ReplicationFailure,
/// Close nodes have reported this peer as bad
CloseNodesShunning,
/// Provided a bad quote
BadQuoting,
}

/// Commands to send to the Swarm
Expand Down Expand Up @@ -169,6 +171,10 @@ pub enum SwarmCmd {
target: NetworkAddress,
sender: oneshot::Sender<bool>,
},
// Quote verification against historically collected quotes
QuoteVerification {
quotes: Vec<(PeerId, PaymentQuote)>,
},
}

/// Debug impl for SwarmCmd to avoid printing full Record, instead only RecordKey
Expand Down Expand Up @@ -286,6 +292,9 @@ impl Debug for SwarmCmd {
SwarmCmd::IsPeerShunned { target, .. } => {
write!(f, "SwarmCmd::IsPeerInTrouble target: {target:?}")
}
SwarmCmd::QuoteVerification { quotes } => {
write!(f, "SwarmCmd::QuoteVerification of {} quotes", quotes.len())
}
}
}
}
Expand Down Expand Up @@ -685,63 +694,7 @@ impl SwarmDriver {
SwarmCmd::RecordNodeIssue { peer_id, issue } => {
cmd_string = "RecordNodeIssues";
let _ = self.bad_nodes_ongoing_verifications.remove(&peer_id);

info!("Peer {peer_id:?} is reported as having issue {issue:?}");
let (issue_vec, is_bad) = self.bad_nodes.entry(peer_id).or_default();

// If being considered as bad already, skip certain operations
if !(*is_bad) {
// Remove outdated entries
issue_vec.retain(|(_, timestamp)| timestamp.elapsed().as_secs() < 300);

// check if vec is already 10 long, if so, remove the oldest issue
// we only track 10 issues to avoid mem leaks
if issue_vec.len() == 10 {
issue_vec.remove(0);
}

// To avoid being too sensitive, only consider as a new issue
// when after certain while since the last one
let is_new_issue = if let Some((_issue, timestamp)) = issue_vec.last() {
timestamp.elapsed().as_secs() > 10
} else {
true
};

if is_new_issue {
issue_vec.push((issue, Instant::now()));
}

// Only consider candidate as a bad node when:
// accumulated THREE same kind issues within certain period
for (issue, _timestamp) in issue_vec.iter() {
let issue_counts = issue_vec
.iter()
.filter(|(i, _timestamp)| *issue == *i)
.count();
if issue_counts >= 3 {
*is_bad = true;
info!("Peer {peer_id:?} accumulated {issue_counts} times of issue {issue:?}. Consider it as a bad node now.");
// Once a bad behaviour detected, no point to continue
break;
}
}
}

if *is_bad {
warn!("Cleaning out bad_peer {peer_id:?}");
if let Some(dead_peer) =
self.swarm.behaviour_mut().kademlia.remove_peer(&peer_id)
{
self.connected_peers = self.connected_peers.saturating_sub(1);
self.send_event(NetworkEvent::PeerRemoved(
*dead_peer.node.key.preimage(),
self.connected_peers,
));
self.log_kbuckets(&peer_id);
let _ = self.check_for_change_in_our_close_group();
}
}
self.record_node_issue(peer_id, issue);
}
SwarmCmd::IsPeerShunned { target, sender } => {
cmd_string = "IsPeerInTrouble";
Expand All @@ -756,13 +709,98 @@ impl SwarmDriver {
};
let _ = sender.send(is_bad);
}
SwarmCmd::QuoteVerification { quotes } => {
cmd_string = "QuoteVerification";
for (peer_id, quote) in quotes {
// Do nothing if already being bad
if let Some((_issues, is_bad)) = self.bad_nodes.get(&peer_id) {
if *is_bad {
continue;
}
}
self.verify_peer_quote(peer_id, quote);
}
}
}

self.log_handling(cmd_string.to_string(), start.elapsed());

Ok(())
}

/// Records a reported `issue` against `peer_id` and, once the same kind of issue has
/// accumulated three times within the tracking window, marks the peer as bad and
/// evicts it from the local Kademlia routing table.
///
/// Issue tracking per peer (`self.bad_nodes`):
/// - entries older than 300 seconds are dropped;
/// - at most 10 issues are kept (oldest evicted first) to bound memory;
/// - reports arriving within 10 seconds of the previous one are ignored, to avoid
///   over-penalising a burst of duplicate reports for one underlying event.
fn record_node_issue(&mut self, peer_id: PeerId, issue: NodeIssue) {
    info!("Peer {peer_id:?} is reported as having issue {issue:?}");
    let (issue_vec, is_bad) = self.bad_nodes.entry(peer_id).or_default();

    // If being considered as bad already, skip certain operations
    if !(*is_bad) {
        // Remove outdated entries (older than the 300-second tracking window)
        issue_vec.retain(|(_, timestamp)| timestamp.elapsed().as_secs() < 300);

        // check if vec is already 10 long, if so, remove the oldest issue
        // we only track 10 issues to avoid mem leaks
        if issue_vec.len() == 10 {
            issue_vec.remove(0);
        }

        // To avoid being too sensitive, only consider as a new issue
        // when after certain while since the last one
        let is_new_issue = if let Some((_issue, timestamp)) = issue_vec.last() {
            timestamp.elapsed().as_secs() > 10
        } else {
            true
        };

        if is_new_issue {
            issue_vec.push((issue, Instant::now()));
        }

        // Only consider candidate as a bad node when:
        // accumulated THREE same kind issues within certain period
        // (re-counts per entry; the vec holds at most 10 items so this stays cheap)
        for (issue, _timestamp) in issue_vec.iter() {
            let issue_counts = issue_vec
                .iter()
                .filter(|(i, _timestamp)| *issue == *i)
                .count();
            if issue_counts >= 3 {
                *is_bad = true;
                info!("Peer {peer_id:?} accumulated {issue_counts} times of issue {issue:?}. Consider it as a bad node now.");
                // Once a bad behaviour is detected, no point to continue
                break;
            }
        }
    }

    if *is_bad {
        warn!("Cleaning out bad_peer {peer_id:?}");
        // Drop the peer from the routing table and notify upper layers, so the
        // close-group view and connected-peer count stay consistent.
        if let Some(dead_peer) = self.swarm.behaviour_mut().kademlia.remove_peer(&peer_id) {
            self.connected_peers = self.connected_peers.saturating_sub(1);
            self.send_event(NetworkEvent::PeerRemoved(
                *dead_peer.node.key.preimage(),
                self.connected_peers,
            ));
            self.log_kbuckets(&peer_id);
            let _ = self.check_for_change_in_our_close_group();
        }
    }
}

/// Checks `quote` against the quote previously recorded for `peer_id`.
///
/// - An inconsistency with the stored quote is reported as a `BadQuoting` issue.
/// - A consistent quote replaces the stored one unless the stored quote is newer.
fn verify_peer_quote(&mut self, peer_id: PeerId, quote: PaymentQuote) {
    match self.quotes_history.get(&peer_id) {
        // Contradicts what this peer quoted before: flag the peer.
        Some(history_quote) if !history_quote.historical_verify(&quote) => {
            info!("From {peer_id:?}, detected a bad quote {quote:?} against history_quote {history_quote:?}");
            self.record_node_issue(peer_id, NodeIssue::BadQuoting);
        }
        // Stored quote is more recent; keep it and drop the incoming one.
        Some(history_quote) if history_quote.is_newer_than(&quote) => {}
        // No history yet, or the incoming quote is the freshest consistent one.
        _ => {
            let _ = self.quotes_history.insert(peer_id, quote);
        }
    }
}

fn try_interval_replication(&mut self) -> Result<()> {
// get closest peers from buckets, sorted by increasing distance to us
let our_peer_id = self.self_peer_id.into();
Expand Down
3 changes: 3 additions & 0 deletions sn_networking/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ use sn_protocol::{
storage::RetryStrategy,
NetworkAddress, PrettyPrintKBucketKey, PrettyPrintRecordKey,
};
use sn_transfers::PaymentQuote;
use std::{
collections::{btree_map::Entry, BTreeMap, BTreeSet, HashMap, HashSet},
fmt::Debug,
Expand Down Expand Up @@ -513,6 +514,7 @@ impl NetworkBuilder {
hard_disk_write_error: 0,
bad_nodes: Default::default(),
bad_nodes_ongoing_verifications: Default::default(),
quotes_history: Default::default(),
};

Ok((
Expand Down Expand Up @@ -567,6 +569,7 @@ pub struct SwarmDriver {
// the boolean flag to indicate whether the node is considered as bad or not
pub(crate) bad_nodes: BTreeMap<PeerId, (Vec<(NodeIssue, Instant)>, bool)>,
pub(crate) bad_nodes_ongoing_verifications: BTreeSet<PeerId>,
pub(crate) quotes_history: BTreeMap<PeerId, PaymentQuote>,
}

impl SwarmDriver {
Expand Down
42 changes: 40 additions & 2 deletions sn_networking/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use sn_protocol::{
storage::RecordType,
NetworkAddress, PrettyPrintRecordKey,
};
use sn_transfers::PaymentQuote;
use std::{
collections::{hash_map::Entry, BTreeSet, HashSet},
fmt::{Debug, Formatter},
Expand Down Expand Up @@ -123,6 +124,10 @@ pub enum NetworkEvent {
BadNodeVerification {
peer_id: PeerId,
},
/// Quotes to be verified
QuoteVerification {
quotes: Vec<(PeerId, PaymentQuote)>,
},
}

// Manually implement Debug as `#[debug(with = "unverified_record_fmt")]` not working as expected.
Expand Down Expand Up @@ -164,6 +169,13 @@ impl Debug for NetworkEvent {
NetworkEvent::BadNodeVerification { peer_id } => {
write!(f, "NetworkEvent::BadNodeVerification({peer_id:?})")
}
NetworkEvent::QuoteVerification { quotes } => {
write!(
f,
"NetworkEvent::QuoteVerification({} quotes)",
quotes.len()
)
}
}
}
}
Expand Down Expand Up @@ -566,8 +578,9 @@ impl SwarmDriver {
..
} => {
trace!("Received request {request_id:?} from peer {peer:?}, req: {request:?}");
// if the request is replication, we can handle it and send the OK response here,
// as we send that regardless of how we handle the request as its unimportant to the sender.
// If the request is replication or quote verification,
// we can handle it and send the OK response here,
// as the handling result is unimportant to the sender.
match request {
Request::Cmd(sn_protocol::messages::Cmd::Replicate { holder, keys }) => {
let response = Response::Cmd(
Expand All @@ -581,6 +594,31 @@ impl SwarmDriver {

self.add_keys_to_replication_fetcher(holder, keys);
}
Request::Cmd(sn_protocol::messages::Cmd::QuoteVerification {
quotes,
..
}) => {
let response = Response::Cmd(
sn_protocol::messages::CmdResponse::QuoteVerification(Ok(())),
);
self.swarm
.behaviour_mut()
.request_response
.send_response(channel, response)
.map_err(|_| NetworkError::InternalMsgChannelDropped)?;

// The keypair is required to verify the quotes,
// hence throw it up to Network layer for further actions.
let quotes = quotes
.iter()
.filter_map(|(peer_address, quote)| {
peer_address
.as_peer_id()
.map(|peer_id| (peer_id, quote.clone()))
})
.collect();
self.send_event(NetworkEvent::QuoteVerification { quotes })
}
Request::Query(query) => {
self.send_event(NetworkEvent::QueryRequestReceived {
query,
Expand Down
21 changes: 18 additions & 3 deletions sn_networking/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub use self::{
driver::{GetRecordCfg, NetworkBuilder, PutRecordCfg, SwarmDriver, VerificationKind},
error::{GetRecordError, NetworkError},
event::{MsgResponder, NetworkEvent},
record_store::NodeRecordStore,
record_store::{calculate_cost_for_records, NodeRecordStore},
transfers::{get_raw_signed_spends_from_record, get_signed_spend_from_record},
};

Expand All @@ -53,7 +53,7 @@ use libp2p::{
use rand::Rng;
use sn_protocol::{
error::Error as ProtocolError,
messages::{ChunkProof, Nonce, Query, QueryResponse, Request, Response},
messages::{ChunkProof, Cmd, Nonce, Query, QueryResponse, Request, Response},
storage::{RecordType, RetryStrategy},
NetworkAddress, PrettyPrintKBucketKey, PrettyPrintRecordKey,
};
Expand Down Expand Up @@ -362,6 +362,7 @@ impl Network {

// loop over responses, generating an average fee and storing all responses along side
let mut all_costs = vec![];
let mut all_quotes = vec![];
for response in responses.into_values().flatten() {
debug!(
"StoreCostReq for {record_address:?} received response: {:?}",
Expand All @@ -373,7 +374,8 @@ impl Network {
payment_address,
peer_address,
}) => {
all_costs.push((peer_address, payment_address, quote));
all_costs.push((peer_address.clone(), payment_address, quote.clone()));
all_quotes.push((peer_address, quote));
}
Response::Query(QueryResponse::GetStoreCost {
quote: Err(ProtocolError::RecordExists(_)),
Expand All @@ -388,6 +390,15 @@ impl Network {
}
}

for peer_id in close_nodes.iter() {
let request = Request::Cmd(Cmd::QuoteVerification {
target: NetworkAddress::from_peer(*peer_id),
quotes: all_quotes.clone(),
});

self.send_req_ignore_reply(request, *peer_id);
}

// Sort all_costs by the NetworkAddress proximity to record_address
all_costs.sort_by(|(peer_address_a, _, _), (peer_address_b, _, _)| {
record_address
Expand Down Expand Up @@ -746,6 +757,10 @@ impl Network {
self.send_swarm_cmd(SwarmCmd::RecordNodeIssue { peer_id, issue });
}

pub fn historical_verify_quotes(&self, quotes: Vec<(PeerId, PaymentQuote)>) {
self.send_swarm_cmd(SwarmCmd::QuoteVerification { quotes });
}

// Helper to send SwarmCmd
fn send_swarm_cmd(&self, cmd: SwarmCmd) {
send_swarm_cmd(self.swarm_cmd_sender.clone(), cmd);
Expand Down
2 changes: 1 addition & 1 deletion sn_networking/src/record_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -680,7 +680,7 @@ impl RecordStore for ClientRecordStore {
// to allow nodes receiving too many replication copies can still got paid,
// and gives an exponential pricing curve when storage reaches high.
// and give extra reward (lower the quoting price to gain a better chance) to long lived nodes.
fn calculate_cost_for_records(
pub fn calculate_cost_for_records(
records_stored: usize,
received_payment_count: usize,
max_records: usize,
Expand Down
9 changes: 9 additions & 0 deletions sn_node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use super::{
error::{Error, Result},
event::NodeEventsChannel,
quote::quotes_verification,
Marker, NodeEvent,
};
#[cfg(feature = "open-metrics")]
Expand Down Expand Up @@ -401,6 +402,14 @@ impl Node {
}
});
}
NetworkEvent::QuoteVerification { quotes } => {
event_header = "QuoteVerification";
let network = self.network.clone();

let _handle = spawn(async move {
quotes_verification(&network, quotes).await;
});
}
}

trace!(
Expand Down
Loading

0 comments on commit bacb878

Please sign in to comment.