From 6523217b42cdb44739ecc80d9ec23d0708183b76 Mon Sep 17 00:00:00 2001 From: Trey Aspelund Date: Fri, 18 Oct 2024 03:35:42 -0600 Subject: [PATCH] Various refactoring Removes an unused function. Guards an illumos-specific import. Consolidates (bfd) nexthop enabled functions into one that takes a bool. Moves RIB locking and looping of prefixes and PrefixChangeNotifications into RIB removal helper functions to improve pcn batching and consolidate locations for future RIB batch work. Removes re-processing of BGP path attributes. Removes re-looping over routes/paths in a few places. Eliminates some return types when no callers handled Result. Adds some TODOs. Signed-off-by: Trey Aspelund --- bfd/src/sm.rs | 4 +- bgp/src/connection_tcp.rs | 4 +- bgp/src/session.rs | 107 +++++++++++------- mg-lower/src/lib.rs | 6 +- mgd/src/bgp_admin.rs | 5 +- mgd/src/main.rs | 13 +-- mgd/src/static_admin.rs | 22 ++-- rdb/src/db.rs | 220 ++++++++++++++++++++------------------ rdb/src/types.rs | 14 +-- 9 files changed, 211 insertions(+), 184 deletions(-) diff --git a/bfd/src/sm.rs b/bfd/src/sm.rs index ed23a049..99327c21 100644 --- a/bfd/src/sm.rs +++ b/bfd/src/sm.rs @@ -438,7 +438,7 @@ impl State for Down { db: rdb::Db, counters: Arc, ) -> Result<(Box, BfdEndpoint)> { - db.disable_nexthop(self.peer); + db.set_nexthop_shutdown(self.peer, true); loop { // Get an incoming message let (_addr, msg) = match self.recv( @@ -597,7 +597,7 @@ impl State for Up { db: rdb::Db, counters: Arc, ) -> Result<(Box, BfdEndpoint)> { - db.enable_nexthop(self.peer); + db.set_nexthop_shutdown(self.peer, false); loop { // Get an incoming message let (_addr, msg) = match self.recv( diff --git a/bgp/src/connection_tcp.rs b/bgp/src/connection_tcp.rs index d9f290d8..3caab633 100644 --- a/bgp/src/connection_tcp.rs +++ b/bgp/src/connection_tcp.rs @@ -12,7 +12,7 @@ use crate::session::FsmEvent; use crate::to_canonical; use libc::{c_int, sockaddr_storage}; use mg_common::lock; -use slog::{debug, error, info, trace, warn, Logger}; +use slog::{error, info, trace, warn, Logger}; use std::collections::BTreeMap; use std::io::Read; use std::io::Write; @@ -31,6 +31,8 @@ use libc::{c_void, IPPROTO_IP, IPPROTO_IPV6, IPPROTO_TCP}; #[cfg(target_os = "linux")] use libc::{IP_MINTTL, TCP_MD5SIG}; #[cfg(target_os = "illumos")] +use slog::debug; +#[cfg(target_os = "illumos")] use std::collections::HashSet; #[cfg(target_os = "illumos")] use std::time::Instant; diff --git a/bgp/src/session.rs b/bgp/src/session.rs index 3d37d069..2adeae71 100644 --- a/bgp/src/session.rs +++ b/bgp/src/session.rs @@ -1504,7 +1504,7 @@ impl SessionRunner { FsmEvent::RouteRefreshNeeded => { if let Some(remote_id) = lock!(self.session).remote_id { - self.db.mark_bgp_id_stale(remote_id); + self.db.mark_bgp_peer_stale(remote_id); self.send_route_refresh(&pc.conn); } FsmState::Established(pc) @@ -1929,7 +1929,7 @@ impl SessionRunner { write_lock!(self.fanout).remove_egress(self.neighbor.host.ip()); // remove peer prefixes from db - self.db.remove_peer_prefixes(pc.id); + self.db.remove_bgp_peer_prefixes(pc.id); FsmState::Idle } @@ -2024,9 +2024,14 @@ impl SessionRunner { /// Update this router's RIB based on an update message from a peer. fn update_rib(&self, update: &UpdateMessage, id: u32, peer_as: u32) { - for w in &update.withdrawn { - self.db.remove_peer_prefix(id, w.as_prefix4().into()); - } + self.db.remove_bgp_prefixes( + update + .withdrawn + .iter() + .map(|w| rdb::Prefix::from(w.as_prefix4())) + .collect(), + id, + ); let originated = match self.db.get_origin4() { Ok(value) => value, @@ -2037,6 +2042,37 @@ impl SessionRunner { }; if !update.nlri.is_empty() { + // TODO: parse and prefer nexthop in MP_REACH_NLRI + // + // Per RFC 4760: + // """ + // The next hop information carried in the MP_REACH_NLRI path attribute + // defines the Network Layer address of the router that SHOULD be used + // as the next hop to the destinations listed in the MP_NLRI attribute + // in the UPDATE message. + // + // [..] + // + // An UPDATE message that carries no NLRI, other than the one encoded in + // the MP_REACH_NLRI attribute, SHOULD NOT carry the NEXT_HOP attribute. + // If such a message contains the NEXT_HOP attribute, the BGP speaker + // that receives the message SHOULD ignore this attribute. + // """ + // + // i.e. + // 1) NEXT_HOP SHOULD NOT be sent unless there are no MP_REACH_NLRI + // 2) NEXT_HOP SHOULD be ignored unless there are no MP_REACH_NLRI + // + // The standards do not state whether an implementation can/should send + // IPv4 Unicast prefixes embedded in an MP_REACH_NLRI attribute or in the + // classic NLRI field of an Update message. If we participate in MP-BGP + // and negotiate IPv4 Unicast, it's entirely likely that we'll peer with + // other BGP speakers falling into any of the combinations: + // a) MP not negotiated, IPv4 Unicast in NLRI, NEXT_HOP included + // b) MP negotiated, IPv4 Unicast in NLRI, NEXT_HOP included + // c) MP negotiated, IPv4 Unicast in NLRI, NEXT_HOP not included + // d) MP negotiated, IPv4 Unicast in MP_REACH_NLRI, NEXT_HOP included + // e) MP negotiated, IPv4 Unicast in MP_REACH_NLRI, NEXT_HOP not included let nexthop = match update.nexthop4() { Some(nh) => nh, None => { @@ -2051,41 +2087,36 @@ impl SessionRunner { } }; - for n in &update.nlri { - let prefix = n.as_prefix4(); - // ignore prefixes we originate - if originated.contains(&prefix) { - continue; - } - - let mut as_path = Vec::new(); - if let Some(segments_list) = update.as_path() { - for segments in &segments_list { - as_path.extend(segments.value.iter()); - } - } - - let path = rdb::Path { - nexthop: nexthop.into(), - shutdown: update.graceful_shutdown(), - rib_priority: DEFAULT_RIB_PRIORITY_BGP, - bgp: Some(BgpPathProperties { - origin_as: peer_as, - id, - med: update.multi_exit_discriminator(), - local_pref: update.local_pref(), - as_path, - stale: None, - }), - vlan_id: lock!(self.session).vlan_id, - }; - - if let Err(e) = - self.db.add_prefix_path(prefix.into(), path.clone(), false) - { - err!(self; "failed to add path {:?} -> {:?}: {e}", prefix, path); + let mut as_path = Vec::new(); + if let Some(segments_list) = update.as_path() { + for segments in &segments_list { + as_path.extend(segments.value.iter()); } } + let path = rdb::Path { + nexthop: nexthop.into(), + shutdown: update.graceful_shutdown(), + rib_priority: DEFAULT_RIB_PRIORITY_BGP, + bgp: Some(BgpPathProperties { + origin_as: peer_as, + id, + med: update.multi_exit_discriminator(), + local_pref: update.local_pref(), + as_path, + stale: None, + }), + vlan_id: lock!(self.session).vlan_id, + }; + + self.db.add_bgp_prefixes( + update + .nlri + .iter() + .filter(|p| !originated.contains(&p.as_prefix4())) + .map(|n| rdb::Prefix::from(n.as_prefix4())) + .collect(), + path.clone(), + ); } //TODO(IPv6) iterate through MpReachNlri attributes for IPv6 diff --git a/mg-lower/src/lib.rs b/mg-lower/src/lib.rs index 6cb408ef..eaf43200 100644 --- a/mg-lower/src/lib.rs +++ b/mg-lower/src/lib.rs @@ -24,7 +24,7 @@ use slog::{error, info, Logger}; use std::collections::HashSet; use std::net::Ipv6Addr; use std::sync::mpsc::{channel, RecvTimeoutError}; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use std::thread::sleep; use std::time::Duration; @@ -160,7 +160,7 @@ fn handle_change( fn sync_prefix( tep: Ipv6Addr, - rib_loc: Arc>, + rib_loc: Rib, prefix: &Prefix, dpd: &DpdClient, ddm: &DdmClient, @@ -172,7 +172,7 @@ fn sync_prefix( // The best routes in the RIB let mut best: HashSet = HashSet::new(); - if let Some(paths) = rib_loc.lock().unwrap().get(prefix) { + if let Some(paths) = rib_loc.get(prefix) { for path in paths { best.insert(RouteHash::for_prefix_path(*prefix, path.clone())?); } diff --git a/mgd/src/bgp_admin.rs b/mgd/src/bgp_admin.rs index edfef88c..aee9640f 100644 --- a/mgd/src/bgp_admin.rs +++ b/mgd/src/bgp_admin.rs @@ -418,8 +418,7 @@ pub async fn get_selected( ) -> Result, HttpError> { let rq = request.into_inner(); let ctx = ctx.context(); - let rib = get_router!(ctx, rq.asn)?.db.loc_rib(); - let selected = rib.lock().unwrap().clone(); + let selected = get_router!(ctx, rq.asn)?.db.loc_rib(); Ok(HttpResponseOk(selected.into())) } @@ -797,7 +796,7 @@ pub(crate) mod helpers { ) .remote_id .ok_or(Error::NotFound("bgp peer not found".into()))?; - ctx.db.remove_peer_prefixes(id); + ctx.db.remove_bgp_peer_prefixes(id); ctx.db.remove_bgp_neighbor(addr)?; get_router!(&ctx, asn)?.delete_session(addr); diff --git a/mgd/src/main.rs b/mgd/src/main.rs index 5e54723a..14e0beae 100644 --- a/mgd/src/main.rs +++ b/mgd/src/main.rs @@ -11,7 +11,7 @@ use clap::{Parser, Subcommand}; use mg_common::cli::oxide_cli_style; use mg_common::stats::MgLowerStats; use rand::Fill; -use rdb::{BfdPeerConfig, BgpNeighborInfo, BgpRouterInfo, Path}; +use rdb::{BfdPeerConfig, BgpNeighborInfo, BgpRouterInfo}; use signal::handle_signals; use slog::{error, Logger}; use std::collections::BTreeMap; @@ -271,14 +271,9 @@ fn initialize_static_routes(db: &rdb::Db) { let routes = db .get_static4() .expect("failed to get static routes from db"); - for route in &routes { - let path = - Path::for_static(route.nexthop, route.vlan_id, route.rib_priority); - db.add_prefix_path(route.prefix, path, true) - .unwrap_or_else(|e| { - panic!("failed to initialize static route {route:#?}: {e}") - }) - } + db.add_static_routes(&routes).unwrap_or_else(|e| { + panic!("failed to initialize static routes {routes:#?}: {e}") + }) } fn get_tunnel_endpoint_ula(db: &rdb::Db) -> Ipv6Addr { diff --git a/mgd/src/static_admin.rs b/mgd/src/static_admin.rs index 0d170227..e4bafb93 100644 --- a/mgd/src/static_admin.rs +++ b/mgd/src/static_admin.rs @@ -68,13 +68,10 @@ pub async fn static_add_v4_route( .into_iter() .map(Into::into) .collect(); - for r in routes { - let path = Path::for_static(r.nexthop, r.vlan_id, r.rib_priority); - ctx.context() - .db - .add_prefix_path(r.prefix, path, true) - .map_err(|e| HttpError::for_internal_error(e.to_string()))?; - } + ctx.context() + .db + .add_static_routes(&routes) + .map_err(|e| HttpError::for_internal_error(e.to_string()))?; Ok(HttpResponseUpdatedNoContent()) } @@ -90,13 +87,10 @@ pub async fn static_remove_v4_route( .into_iter() .map(Into::into) .collect(); - for r in routes { - let path = Path::for_static(r.nexthop, r.vlan_id, r.rib_priority); - ctx.context() - .db - .remove_prefix_path(r.prefix, path, true) - .map_err(|e| HttpError::for_internal_error(e.to_string()))?; - } + ctx.context() + .db + .remove_static_routes(routes) + .map_err(|e| HttpError::for_internal_error(e.to_string()))?; Ok(HttpResponseDeleted()) } diff --git a/rdb/src/db.rs b/rdb/src/db.rs index ee468b87..3e36082b 100644 --- a/rdb/src/db.rs +++ b/rdb/src/db.rs @@ -129,8 +129,8 @@ impl Db { } } - pub fn loc_rib(&self) -> Arc> { - self.rib_loc.clone() + pub fn loc_rib(&self) -> Rib { + lock!(self.rib_loc).clone() } pub fn full_rib(&self) -> Rib { @@ -379,10 +379,6 @@ impl Db { } } - pub fn get_imported(&self) -> Rib { - lock!(self.rib_in).clone() - } - pub fn update_loc_rib(rib_in: &Rib, rib_loc: &mut Rib, prefix: Prefix) { let bp = bestpaths(prefix, rib_in, BESTPATH_FANOUT); match bp { @@ -395,12 +391,7 @@ impl Db { } } - pub fn add_prefix_path( - &self, - prefix: Prefix, - path: Path, - is_static: bool, - ) -> Result<(), Error> { + pub fn add_prefix_path(&self, prefix: Prefix, path: &Path) { let mut rib = lock!(self.rib_in); match rib.get_mut(&prefix) { Some(paths) => { @@ -411,24 +402,51 @@ impl Db { } } Self::update_loc_rib(&rib, &mut lock!(self.rib_loc), prefix); + } - if is_static { - let tree = self.persistent.open_tree(STATIC4_ROUTES)?; - let srk = StaticRouteKey { - prefix, - nexthop: path.nexthop, - vlan_id: path.vlan_id, - rib_priority: path.rib_priority, + pub fn add_static_routes( + &self, + routes: &Vec, + ) -> Result<(), Error> { + let mut pcn = PrefixChangeNotification::default(); + let tree = self.persistent.open_tree(STATIC4_ROUTES)?; + for route in routes { + self.add_prefix_path(route.prefix, &Path::from(*route)); + let key = match serde_json::to_string(route) { + Err(e) => { + error!( + self.log, + "{} serde_json::to_string conversion failed: {e}", + route.prefix + ); + continue; + } + Ok(s) => s, + }; + if let Err(e) = tree.insert(key.as_str(), "") { + error!( + self.log, + "{}: insert into tree STATIC4_ROUTES failed: {e}", + route.prefix + ); + continue; }; - let key = serde_json::to_string(&srk)?; - tree.insert(key.as_str(), "")?; - tree.flush()?; + pcn.changed.insert(route.prefix); } - - self.notify(prefix.into()); + self.notify(pcn); + tree.flush()?; Ok(()) } + pub fn add_bgp_prefixes(&self, prefixes: Vec, path: Path) { + let mut pcn = PrefixChangeNotification::default(); + for prefix in prefixes { + self.add_prefix_path(prefix, &path); + pcn.changed.insert(prefix); + } + self.notify(pcn); + } + pub fn get_static4(&self) -> Result, Error> { let tree = self.persistent.open_tree(STATIC4_ROUTES)?; Ok(tree @@ -475,35 +493,14 @@ impl Db { Ok(nexthops.len()) } - pub fn disable_nexthop(&self, nexthop: IpAddr) { + pub fn set_nexthop_shutdown(&self, nexthop: IpAddr, shutdown: bool) { let mut rib = lock!(self.rib_in); let mut pcn = PrefixChangeNotification::default(); for (prefix, paths) in rib.iter_mut() { for p in paths.clone().into_iter() { - if p.nexthop == nexthop && !p.shutdown { + if p.nexthop == nexthop && p.shutdown != shutdown { let mut replacement = p.clone(); - replacement.shutdown = true; - paths.insert(replacement); - pcn.changed.insert(*prefix); - } - } - } - - for prefix in pcn.changed.iter() { - Self::update_loc_rib(&rib, &mut lock!(self.rib_loc), *prefix); - } - - self.notify(pcn); - } - - pub fn enable_nexthop(&self, nexthop: IpAddr) { - let mut rib = lock!(self.rib_in); - let mut pcn = PrefixChangeNotification::default(); - for (prefix, paths) in rib.iter_mut() { - for p in paths.clone().into_iter() { - if p.nexthop == nexthop && p.shutdown { - let mut replacement = p.clone(); - replacement.shutdown = false; + replacement.shutdown = shutdown; paths.insert(replacement); pcn.changed.insert(*prefix); } @@ -517,79 +514,87 @@ impl Db { self.notify(pcn); } - pub fn remove_prefix_path( - &self, - prefix: Prefix, - path: Path, - is_static: bool, //TODO - ) -> Result<(), Error> { + pub fn remove_prefix_path(&self, prefix: Prefix, prefix_cmp: F) + where + F: Fn(&Path) -> bool, + { let mut rib = lock!(self.rib_in); if let Some(paths) = rib.get_mut(&prefix) { - paths.retain(|x| x.cmp(&path) != CmpOrdering::Equal); + paths.retain(|p| prefix_cmp(p)); if paths.is_empty() { rib.remove(&prefix); } } - if is_static { - let tree = self.persistent.open_tree(STATIC4_ROUTES)?; - let srk = StaticRouteKey { - prefix, - nexthop: path.nexthop, - vlan_id: path.vlan_id, - rib_priority: path.rib_priority, - }; - let key = serde_json::to_string(&srk)?; - tree.remove(key.as_str())?; - tree.flush()?; - } - Self::update_loc_rib(&rib, &mut lock!(self.rib_loc), prefix); - self.notify(prefix.into()); - Ok(()) } - pub fn remove_peer_prefix(&self, id: u32, prefix: Prefix) { - let mut rib = lock!(self.rib_in); - let paths = match rib.get_mut(&prefix) { - None => return, - Some(ps) => ps, - }; - paths.retain(|x| match x.bgp { - Some(ref bgp) => bgp.id != id, - None => true, - }); - - rib.retain(|&_, paths| !paths.is_empty()); - - Self::update_loc_rib(&rib, &mut lock!(self.rib_loc), prefix); - self.notify(prefix.into()); - } - - pub fn remove_peer_prefixes( + pub fn remove_static_routes( &self, - id: u32, - ) -> BTreeMap> { - let mut rib = lock!(self.rib_in); - + routes: Vec, + ) -> Result<(), Error> { let mut pcn = PrefixChangeNotification::default(); - let mut result = BTreeMap::new(); - for (prefix, paths) in rib.iter_mut() { - paths.retain(|x| match x.bgp { - Some(ref bgp) => bgp.id != id, - None => true, + let tree = self.persistent.open_tree(STATIC4_ROUTES)?; + for route in routes { + self.remove_prefix_path(route.prefix, |rib_path: &Path| { + rib_path.cmp(&Path::from(route)) != CmpOrdering::Equal }); - result.insert(*prefix, paths.clone()); - pcn.changed.insert(*prefix); + pcn.changed.insert(route.prefix); + + let key = match serde_json::to_string(&route) { + Err(e) => { + error!( + self.log, + "{} serde_json::to_string conversion failed: {e}", + route.prefix + ); + continue; + } + Ok(s) => s, + }; + if let Err(e) = tree.remove(key.as_str()) { + error!( + self.log, + "{}: remove from tree STATIC4_ROUTES failed: {e}", + route.prefix + ); + continue; + } } - rib.retain(|&_, paths| !paths.is_empty()); + self.notify(pcn); + tree.flush()?; + Ok(()) + } - for prefix in pcn.changed.iter() { - Self::update_loc_rib(&rib, &mut lock!(self.rib_loc), *prefix); + pub fn remove_bgp_prefixes(&self, prefixes: Vec, id: u32) { + // TODO: don't rely on BGP ID for path definition. + // We currently only use Router-IDs to determine which + // peer/session a route was learned from. This will not work + // long term, as it will not work with add-path or multiple + // parallel sessions to the same router. + let mut pcn = PrefixChangeNotification::default(); + for prefix in prefixes { + self.remove_prefix_path(prefix, |rib_path: &Path| { + match rib_path.bgp { + Some(ref bgp) => bgp.id != id, + None => true, + } + }); + pcn.changed.insert(prefix); } self.notify(pcn); - result + } + + // helper function to remove all routes learned from a given peer + // e.g. when peer is deleted or exits Established state + pub fn remove_bgp_peer_prefixes(&self, id: u32) { + // TODO: don't rely on BGP ID for path definition. + // We currently only use Router-IDs to determine which + // peer/session a route was learned from. This will not work + // long term, as it will not work with add-path or multiple + // parallel sessions to the same router. + self.remove_bgp_prefixes(self.full_rib().keys().copied().collect(), id); } pub fn generation(&self) -> u64 { @@ -621,8 +626,13 @@ impl Db { Ok(()) } - pub fn mark_bgp_id_stale(&self, id: u32) { - let mut rib = self.rib_loc.lock().unwrap(); + pub fn mark_bgp_peer_stale(&self, id: u32) { + // TODO: don't rely on BGP ID for path definition. + // We currently only use Router-IDs to determine which + // peer/session a route was learned from. This will not work + // long term, as it will not work with add-path or multiple + // parallel sessions to the same router. + let mut rib = lock!(self.rib_loc); rib.iter_mut().for_each(|(_prefix, path)| { let targets: Vec = path .iter() diff --git a/rdb/src/types.rs b/rdb/src/types.rs index 4726e77a..7bc27265 100644 --- a/rdb/src/types.rs +++ b/rdb/src/types.rs @@ -45,16 +45,12 @@ impl Ord for Path { } } -impl Path { - pub fn for_static( - nexthop: IpAddr, - vlan_id: Option, - rib_priority: u8, - ) -> Self { +impl From for Path { + fn from(value: StaticRouteKey) -> Self { Self { - nexthop, - vlan_id, - rib_priority, + nexthop: value.nexthop, + vlan_id: value.vlan_id, + rib_priority: value.rib_priority, shutdown: false, bgp: None, }