diff --git a/Cargo.lock b/Cargo.lock index 255bf754..4542089f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -271,6 +271,7 @@ name = "bfd" version = "0.1.0" dependencies = [ "anyhow", + "mg-common", "num_enum 0.7.3", "pretty_assertions", "rand", @@ -304,8 +305,6 @@ dependencies = [ "serde", "sled", "slog", - "slog-async", - "slog-bunyan", "socket2", "thiserror", ] @@ -2509,6 +2508,8 @@ dependencies = [ "schemars", "serde", "slog", + "slog-async", + "slog-bunyan", "smf 0.10.0", "thiserror", "tokio", @@ -4001,6 +4002,8 @@ dependencies = [ "serde_json", "sled", "slog", + "slog-async", + "slog-bunyan", "thiserror", ] diff --git a/bfd/Cargo.toml b/bfd/Cargo.toml index c80e5b16..f97d0bac 100644 --- a/bfd/Cargo.toml +++ b/bfd/Cargo.toml @@ -5,6 +5,7 @@ edition = "2021" [dependencies] rdb = { path = "../rdb" } +mg-common.workspace = true slog.workspace = true slog-bunyan.workspace = true slog-async.workspace = true diff --git a/bfd/src/sm.rs b/bfd/src/sm.rs index ed23a049..f7c4bf9d 100644 --- a/bfd/src/sm.rs +++ b/bfd/src/sm.rs @@ -8,6 +8,7 @@ use crate::{ }; use crate::{err, SessionCounters}; use anyhow::{anyhow, Result}; +use mg_common::lock; use slog::{warn, Logger}; use std::net::IpAddr; use std::sync::atomic::{AtomicBool, Ordering}; @@ -197,7 +198,7 @@ impl StateMachine { // Get what we need from peer info, holding the lock a briefly as // possible. let (_delay, demand_mode, your_discriminator) = { - let r = remote.lock().unwrap(); + let r = lock!(remote); ( DeferredDelay(r.required_min_rx), r.demand_mode, @@ -305,7 +306,7 @@ pub(crate) trait State: Sync + Send { log: Logger, ) { let state = self.state(); - let your_discriminator = remote.lock().unwrap().discriminator; + let your_discriminator = lock!(remote).discriminator; let mut pkt = packet::Control { desired_min_tx: local.desired_min_tx.as_micros() as u32, @@ -438,7 +439,7 @@ impl State for Down { db: rdb::Db, counters: Arc, ) -> Result<(Box, BfdEndpoint)> { - db.disable_nexthop(self.peer); + db.set_nexthop_shutdown(self.peer, true); loop { // Get an incoming message let (_addr, msg) = match self.recv( @@ -597,7 +598,7 @@ impl State for Up { db: rdb::Db, counters: Arc, ) -> Result<(Box, BfdEndpoint)> { - db.enable_nexthop(self.peer); + db.set_nexthop_shutdown(self.peer, false); loop { // Get an incoming message let (_addr, msg) = match self.recv( diff --git a/bfd/src/util.rs b/bfd/src/util.rs index 79382a54..7b2fd0ab 100644 --- a/bfd/src/util.rs +++ b/bfd/src/util.rs @@ -3,6 +3,7 @@ // file, You can obtain one at https://mozilla.org/MPL/2.0/. use crate::{packet, PeerInfo}; +use mg_common::lock; use std::sync::{Arc, Mutex}; use std::time::Duration; @@ -117,7 +118,7 @@ macro_rules! err { } pub fn update_peer_info(remote: &Arc>, msg: &packet::Control) { - let mut r = remote.lock().unwrap(); + let mut r = lock!(remote); r.desired_min_tx = Duration::from_micros(msg.desired_min_tx.into()); r.required_min_rx = Duration::from_micros(msg.required_min_rx.into()); r.discriminator = msg.my_discriminator; diff --git a/bgp/Cargo.toml b/bgp/Cargo.toml index 9df43e87..4cf9ec49 100644 --- a/bgp/Cargo.toml +++ b/bgp/Cargo.toml @@ -8,8 +8,6 @@ rdb = { path = "../rdb" } nom.workspace = true num_enum.workspace = true slog.workspace = true -slog-bunyan.workspace = true -slog-async.workspace = true thiserror.workspace = true serde.workspace = true schemars.workspace = true diff --git a/bgp/src/clock.rs b/bgp/src/clock.rs index 6003b8ec..fa9c0d93 100644 --- a/bgp/src/clock.rs +++ b/bgp/src/clock.rs @@ -108,35 +108,35 @@ impl Clock { ) { Self::step( resolution, - &timers.connect_retry_timer.lock().unwrap(), + &lock!(timers.connect_retry_timer), FsmEvent::ConnectRetryTimerExpires, s.clone(), &log, ); Self::step( resolution, - &timers.keepalive_timer.lock().unwrap(), + &lock!(timers.keepalive_timer), FsmEvent::KeepaliveTimerExpires, s.clone(), &log, ); Self::step( resolution, - &timers.hold_timer.lock().unwrap(), + &lock!(timers.hold_timer), FsmEvent::HoldTimerExpires, s.clone(), &log, ); Self::step( resolution, - &timers.idle_hold_timer.lock().unwrap(), + &lock!(timers.idle_hold_timer), FsmEvent::IdleHoldTimerExpires, s.clone(), &log, ); Self::step( resolution, - &timers.delay_open_timer.lock().unwrap(), + &lock!(timers.delay_open_timer), FsmEvent::DelayOpenTimerExpires, s.clone(), &log, diff --git a/bgp/src/connection_tcp.rs b/bgp/src/connection_tcp.rs index d9f290d8..a0537926 100644 --- a/bgp/src/connection_tcp.rs +++ b/bgp/src/connection_tcp.rs @@ -12,7 +12,7 @@ use crate::session::FsmEvent; use crate::to_canonical; use libc::{c_int, sockaddr_storage}; use mg_common::lock; -use slog::{debug, error, info, trace, warn, Logger}; +use slog::{error, info, trace, warn, Logger}; use std::collections::BTreeMap; use std::io::Read; use std::io::Write; @@ -31,6 +31,8 @@ use libc::{c_void, IPPROTO_IP, IPPROTO_IPV6, IPPROTO_TCP}; #[cfg(target_os = "linux")] use libc::{IP_MINTTL, TCP_MD5SIG}; #[cfg(target_os = "illumos")] +use slog::debug; +#[cfg(target_os = "illumos")] use std::collections::HashSet; #[cfg(target_os = "illumos")] use std::time::Instant; @@ -269,7 +271,7 @@ impl BgpConnection for BgpConnectionTcp { #[allow(unused_variables)] fn set_min_ttl(&self, ttl: u8) -> Result<(), Error> { - let conn = self.conn.lock().unwrap(); + let conn = lock!(self.conn); match conn.as_ref() { None => Err(Error::NotConnected), Some(conn) => { @@ -313,7 +315,7 @@ impl BgpConnection for BgpConnectionTcp { key: [u8; MAX_MD5SIG_KEYLEN], ) -> Result<(), Error> { info!(self.log, "setting md5 auth for {}", self.peer); - let conn = self.conn.lock().unwrap(); + let conn = lock!(self.conn); let fd = match conn.as_ref() { None => return Err(Error::NotConnected), Some(c) => c.as_raw_fd(), @@ -329,7 +331,7 @@ impl BgpConnection for BgpConnectionTcp { key: [u8; MAX_MD5SIG_KEYLEN], ) -> Result<(), Error> { info!(self.log, "setting md5 auth for {}", self.peer); - let conn = self.conn.lock().unwrap(); + let conn = lock!(self.conn); match conn.as_ref() { None => return Err(Error::NotConnected), Some(c) => { @@ -594,7 +596,7 @@ impl BgpConnectionTcp { #[cfg(target_os = "illumos")] fn md5_sig_drop(&self) { - let guard = self.sas.lock().unwrap(); + let guard = lock!(self.sas); if let Some(ref sas) = *guard { for (local, peer) in sas.associations.iter() { for (a, b) in sa_set(*local, *peer) { @@ -642,7 +644,7 @@ impl BgpConnectionTcp { locals: Vec, peer: SocketAddr, ) -> Result<(), Error> { - let mut guard = self.sas.lock().unwrap(); + let mut guard = lock!(self.sas); match &mut *guard { Some(sas) => { for local in locals.into_iter() { @@ -705,7 +707,7 @@ impl BgpConnectionTcp { // we may accept a connection from a client (as opposed to the client) // accepting a connection from us, and that will result in the // association set increasing according to the source port of the client. - let guard = sas.lock().unwrap(); + let guard = lock!(sas); if let Some(ref sas) = *guard { for (local, peer) in sas.associations.iter() { for (a, b) in sa_set(*local, *peer) { diff --git a/bgp/src/lib.rs b/bgp/src/lib.rs index e80f96d9..13708583 100644 --- a/bgp/src/lib.rs +++ b/bgp/src/lib.rs @@ -11,7 +11,6 @@ pub mod connection_tcp; pub mod dispatcher; pub mod error; pub mod fanout; -pub mod log; pub mod messages; pub mod policy; pub mod router; diff --git a/bgp/src/policy.rs b/bgp/src/policy.rs index 19d79b7b..aef885a9 100644 --- a/bgp/src/policy.rs +++ b/bgp/src/policy.rs @@ -403,23 +403,13 @@ pub fn load_checker(program_source: &str) -> Result { #[cfg(test)] mod test { - use slog::Drain; - use crate::messages::{ Community, PathAttribute, PathAttributeType, PathAttributeTypeCode, PathAttributeValue, }; use super::*; - - fn log() -> Logger { - let drain = slog_bunyan::new(std::io::stdout()).build().fuse(); - let drain = slog_async::Async::new(drain) - .chan_size(0x8000) - .build() - .fuse(); - slog::Logger::root(drain, slog::o!()) - } + use mg_common::log::init_logger; #[test] fn open_require_4byte_as() { @@ -432,13 +422,15 @@ mod test { .unwrap(); let ast = load_checker(&source).unwrap(); let result = - check_incoming_open(m, &ast, asn.into(), addr, log()).unwrap(); + check_incoming_open(m, &ast, asn.into(), addr, init_logger()) + .unwrap(); assert_eq!(result, CheckerResult::Drop); // check that open messages with the 4-octet AS capability code get accepted let m = OpenMessage::new4(asn.into(), 30, 1701); let result = - check_incoming_open(m, &ast, asn.into(), addr, log()).unwrap(); + check_incoming_open(m, &ast, asn.into(), addr, init_logger()) + .unwrap(); assert_eq!(result, CheckerResult::Accept); } @@ -459,12 +451,14 @@ mod test { std::fs::read_to_string("../bgp/policy/policy-check0.rhai") .unwrap(); let ast = load_checker(&source).unwrap(); - let result = check_incoming_update(m, &ast, asn, addr, log()).unwrap(); + let result = + check_incoming_update(m, &ast, asn, addr, init_logger()).unwrap(); assert_eq!(result, CheckerResult::Drop); // check that messages without the no-export community are accepted let m = UpdateMessage::default(); - let result = check_incoming_update(m, &ast, asn, addr, log()).unwrap(); + let result = + check_incoming_update(m, &ast, asn, addr, init_logger()).unwrap(); assert_eq!(result, CheckerResult::Accept); } @@ -478,9 +472,14 @@ mod test { std::fs::read_to_string("../bgp/policy/policy-shape0.rhai") .unwrap(); let ast = load_shaper(&source).unwrap(); - let result = - shape_outgoing_open(m.clone(), &ast, asn.into(), addr, log()) - .unwrap(); + let result = shape_outgoing_open( + m.clone(), + &ast, + asn.into(), + addr, + init_logger(), + ) + .unwrap(); m.add_four_octet_as(74); assert_eq!(result, ShaperResult::Emit(m.into())); } @@ -503,7 +502,8 @@ mod test { .unwrap(); let ast = load_shaper(&source).unwrap(); let result = - shape_outgoing_update(m.clone(), &ast, asn, addr, log()).unwrap(); + shape_outgoing_update(m.clone(), &ast, asn, addr, init_logger()) + .unwrap(); m.add_community(Community::UserDefined(1701)); assert_eq!(result, ShaperResult::Emit(m.into())); } @@ -528,18 +528,23 @@ mod test { let ast = load_shaper(&source).unwrap(); // ASN 100 should not have any changes - let result: UpdateMessage = - shape_outgoing_update(originated.clone(), &ast, 100, addr, log()) - .unwrap() - .unwrap() - .try_into() - .unwrap(); + let result: UpdateMessage = shape_outgoing_update( + originated.clone(), + &ast, + 100, + addr, + init_logger(), + ) + .unwrap() + .unwrap() + .try_into() + .unwrap(); assert_eq!(result, originated.clone()); // ASN 65402 should have only the 10.128./16 prefix let result: UpdateMessage = - shape_outgoing_update(originated, &ast, 65402, addr, log()) + shape_outgoing_update(originated, &ast, 65402, addr, init_logger()) .unwrap() .unwrap() .try_into() diff --git a/bgp/src/session.rs b/bgp/src/session.rs index 3397d2bd..98fdba8f 100644 --- a/bgp/src/session.rs +++ b/bgp/src/session.rs @@ -14,7 +14,8 @@ use crate::messages::{ }; use crate::policy::{CheckerResult, ShaperResult}; use crate::router::Router; -use crate::{dbg, err, inf, to_canonical, trc, wrn}; +use crate::to_canonical; +use mg_common::{dbg, err, inf, trc, wrn}; use mg_common::{lock, read_lock, write_lock}; use rdb::{Asn, BgpPathProperties, Db, ImportExportPolicy, Prefix, Prefix4}; pub use rdb::{DEFAULT_RIB_PRIORITY_BGP, DEFAULT_ROUTE_PRIORITY}; @@ -850,7 +851,7 @@ impl SessionRunner { return FsmState::Idle; } { - let ht = self.clock.timers.hold_timer.lock().unwrap(); + let ht = lock!(self.clock.timers.hold_timer); ht.reset(); ht.enable(); } @@ -875,7 +876,7 @@ impl SessionRunner { return FsmState::Idle; } { - let ht = self.clock.timers.hold_timer.lock().unwrap(); + let ht = lock!(self.clock.timers.hold_timer); ht.reset(); ht.enable(); } @@ -952,7 +953,7 @@ impl SessionRunner { } lock!(self.clock.timers.connect_retry_timer).disable(); { - let ht = self.clock.timers.hold_timer.lock().unwrap(); + let ht = lock!(self.clock.timers.hold_timer); ht.reset(); ht.enable(); } @@ -1069,7 +1070,7 @@ impl SessionRunner { return FsmState::Idle; } { - let ht = self.clock.timers.hold_timer.lock().unwrap(); + let ht = lock!(self.clock.timers.hold_timer); ht.reset(); ht.enable(); } @@ -1131,12 +1132,12 @@ impl SessionRunner { // session timers and enter session setup. FsmEvent::Message(Message::KeepAlive) => { { - let ht = self.clock.timers.hold_timer.lock().unwrap(); + let ht = lock!(self.clock.timers.hold_timer); ht.reset(); ht.enable(); } { - let kt = self.clock.timers.keepalive_timer.lock().unwrap(); + let kt = lock!(self.clock.timers.keepalive_timer); kt.reset(); kt.enable(); } @@ -1155,8 +1156,8 @@ impl SessionRunner { .receive(m.clone().into()); wrn!(self; "notification received: {:#?}", m); lock!(self.session).connect_retry_counter += 1; - self.clock.timers.hold_timer.lock().unwrap().disable(); - self.clock.timers.keepalive_timer.lock().unwrap().disable(); + lock!(self.clock.timers.hold_timer).disable(); + lock!(self.clock.timers.keepalive_timer).disable(); self.counters .notifications_received .fetch_add(1, Ordering::Relaxed); @@ -1164,7 +1165,7 @@ impl SessionRunner { } FsmEvent::HoldTimerExpires => { wrn!(self; "open sent: hold timer expired"); - self.clock.timers.hold_timer.lock().unwrap().disable(); + lock!(self.clock.timers.hold_timer).disable(); self.send_hold_timer_expired_notification(&pc.conn); self.counters .hold_timer_expirations @@ -1200,7 +1201,7 @@ impl SessionRunner { return FsmState::Idle; } { - let ht = self.clock.timers.hold_timer.lock().unwrap(); + let ht = lock!(self.clock.timers.hold_timer); ht.reset(); ht.enable(); } @@ -1334,11 +1335,11 @@ impl SessionRunner { // We've received an update message from the peer. Reset the hold // timer and apply the update to the RIB. FsmEvent::Message(Message::Update(m)) => { - self.clock.timers.hold_timer.lock().unwrap().reset(); + lock!(self.clock.timers.hold_timer).reset(); inf!(self; "update received: {m:#?}"); let peer_as = lock!(self.session).remote_asn.unwrap_or(0); self.apply_update(m.clone(), pc.id, peer_as); - self.message_history.lock().unwrap().receive(m.into()); + lock!(self.message_history).receive(m.into()); self.counters .updates_received .fetch_add(1, Ordering::Relaxed); @@ -1346,7 +1347,7 @@ impl SessionRunner { } FsmEvent::Message(Message::RouteRefresh(m)) => { - self.clock.timers.hold_timer.lock().unwrap().reset(); + lock!(self.clock.timers.hold_timer).reset(); inf!(self; "route refresh received: {m:#?}"); self.message_history .lock() @@ -1367,7 +1368,7 @@ impl SessionRunner { // with us. Exit established and restart from the connect state. FsmEvent::Message(Message::Notification(m)) => { wrn!(self; "notification received: {m:#?}"); - self.message_history.lock().unwrap().receive(m.into()); + lock!(self.message_history).receive(m.into()); self.counters .notifications_received .fetch_add(1, Ordering::Relaxed); @@ -1381,7 +1382,7 @@ impl SessionRunner { self.counters .keepalives_received .fetch_add(1, Ordering::Relaxed); - self.clock.timers.hold_timer.lock().unwrap().reset(); + lock!(self.clock.timers.hold_timer).reset(); FsmState::Established(pc) } @@ -1504,7 +1505,7 @@ impl SessionRunner { FsmEvent::RouteRefreshNeeded => { if let Some(remote_id) = lock!(self.session).remote_id { - self.db.mark_bgp_id_stale(remote_id); + self.db.mark_bgp_peer_stale(remote_id); self.send_route_refresh(&pc.conn); } FsmState::Established(pc) @@ -1601,8 +1602,8 @@ impl SessionRunner { } { - let mut ht = self.clock.timers.hold_timer.lock().unwrap(); - let mut kt = self.clock.timers.keepalive_timer.lock().unwrap(); + let mut ht = lock!(self.clock.timers.hold_timer); + let mut kt = lock!(self.clock.timers.keepalive_timer); let mut theirs = false; let requested = u64::from(om.hold_time); if requested > 0 { @@ -1685,7 +1686,7 @@ impl SessionRunner { error_subcode, data: Vec::new(), }); - self.message_history.lock().unwrap().send(msg.clone()); + lock!(self.message_history).send(msg.clone()); if let Err(e) = conn.send(msg) { err!(self; "failed to send notification {e}"); @@ -1750,7 +1751,7 @@ impl SessionRunner { } } drop(msg); - self.message_history.lock().unwrap().send(out.clone()); + lock!(self.message_history).send(out.clone()); self.counters.opens_sent.fetch_add(1, Ordering::Relaxed); if let Err(e) = conn.send(out) { @@ -1765,7 +1766,7 @@ impl SessionRunner { } fn is_ebgp(&self) -> bool { - if let Some(remote) = self.session.lock().unwrap().remote_asn { + if let Some(remote) = lock!(self.session).remote_asn { if remote != self.asn.as_u32() { return true; } @@ -1851,8 +1852,7 @@ impl SessionRunner { .path_attributes .push(PathAttributeValue::NextHop(nexthop).into()); - if let Some(med) = self.session.lock().unwrap().multi_exit_discriminator - { + if let Some(med) = lock!(self.session).multi_exit_discriminator { update .path_attributes .push(PathAttributeValue::MultiExitDisc(med).into()); @@ -1861,7 +1861,7 @@ impl SessionRunner { if self.is_ibgp() { update.path_attributes.push( PathAttributeValue::LocalPref( - self.session.lock().unwrap().local_pref.unwrap_or(0), + lock!(self.session).local_pref.unwrap_or(0), ) .into(), ); @@ -1896,7 +1896,6 @@ impl SessionRunner { .collect::>(); update.nlri.retain(|x| message_policy.contains(x)); - update.withdrawn.retain(|x| message_policy.contains(x)); }; let out = match self.shape_update(update, shaper_application)? { @@ -1904,7 +1903,7 @@ impl SessionRunner { ShaperResult::Drop => return Ok(()), }; - self.message_history.lock().unwrap().send(out.clone()); + lock!(self.message_history).send(out.clone()); self.counters.updates_sent.fetch_add(1, Ordering::Relaxed); @@ -1924,13 +1923,13 @@ impl SessionRunner { /// to the connect state. fn exit_established(&self, pc: PeerConnection) -> FsmState { lock!(self.session).connect_retry_counter += 1; - self.clock.timers.hold_timer.lock().unwrap().disable(); - self.clock.timers.keepalive_timer.lock().unwrap().disable(); + lock!(self.clock.timers.hold_timer).disable(); + lock!(self.clock.timers.keepalive_timer).disable(); write_lock!(self.fanout).remove_egress(self.neighbor.host.ip()); // remove peer prefixes from db - self.db.remove_peer_prefixes(pc.id); + self.db.remove_bgp_peer_prefixes(pc.id); FsmState::Idle } @@ -1982,7 +1981,6 @@ impl SessionRunner { .collect::>(); update.nlri.retain(|x| message_policy.contains(x)); - update.withdrawn.retain(|x| message_policy.contains(x)); }; self.update_rib(&update, id, peer_as); @@ -2026,9 +2024,14 @@ impl SessionRunner { /// Update this router's RIB based on an update message from a peer. fn update_rib(&self, update: &UpdateMessage, id: u32, peer_as: u32) { - for w in &update.withdrawn { - self.db.remove_peer_prefix(id, w.as_prefix4().into()); - } + self.db.remove_bgp_prefixes( + update + .withdrawn + .iter() + .map(|w| rdb::Prefix::from(w.as_prefix4())) + .collect(), + id, + ); let originated = match self.db.get_origin4() { Ok(value) => value, @@ -2039,6 +2042,37 @@ impl SessionRunner { }; if !update.nlri.is_empty() { + // TODO: parse and prefer nexthop in MP_REACH_NLRI + // + // Per RFC 4760: + // """ + // The next hop information carried in the MP_REACH_NLRI path attribute + // defines the Network Layer address of the router that SHOULD be used + // as the next hop to the destinations listed in the MP_NLRI attribute + // in the UPDATE message. + // + // [..] + // + // An UPDATE message that carries no NLRI, other than the one encoded in + // the MP_REACH_NLRI attribute, SHOULD NOT carry the NEXT_HOP attribute. + // If such a message contains the NEXT_HOP attribute, the BGP speaker + // that receives the message SHOULD ignore this attribute. + // """ + // + // i.e. + // 1) NEXT_HOP SHOULD NOT be sent unless there are no MP_REACH_NLRI + // 2) NEXT_HOP SHOULD be ignored unless there are no MP_REACH_NLRI + // + // The standards do not state whether an implementation can/should send + // IPv4 Unicast prefixes embedded in an MP_REACH_NLRI attribute or in the + // classic NLRI field of an Update message. If we participate in MP-BGP + // and negotiate IPv4 Unicast, it's entirely likely that we'll peer with + // other BGP speakers falling into any of the combinations: + // a) MP not negotiated, IPv4 Unicast in NLRI, NEXT_HOP included + // b) MP negotiated, IPv4 Unicast in NLRI, NEXT_HOP included + // c) MP negotiated, IPv4 Unicast in NLRI, NEXT_HOP not included + // d) MP negotiated, IPv4 Unicast in MP_REACH_NLRI, NEXT_HOP included + // e) MP negotiated, IPv4 Unicast in MP_REACH_NLRI, NEXT_HOP not included let nexthop = match update.nexthop4() { Some(nh) => nh, None => { @@ -2053,41 +2087,36 @@ impl SessionRunner { } }; - for n in &update.nlri { - let prefix = n.as_prefix4(); - // ignore prefixes we originate - if originated.contains(&prefix) { - continue; - } - - let mut as_path = Vec::new(); - if let Some(segments_list) = update.as_path() { - for segments in &segments_list { - as_path.extend(segments.value.iter()); - } - } - - let path = rdb::Path { - nexthop: nexthop.into(), - shutdown: update.graceful_shutdown(), - rib_priority: DEFAULT_RIB_PRIORITY_BGP, - bgp: Some(BgpPathProperties { - origin_as: peer_as, - id, - med: update.multi_exit_discriminator(), - local_pref: update.local_pref(), - as_path, - stale: None, - }), - vlan_id: lock!(self.session).vlan_id, - }; - - if let Err(e) = - self.db.add_prefix_path(prefix.into(), path.clone(), false) - { - err!(self; "failed to add path {:?} -> {:?}: {e}", prefix, path); + let mut as_path = Vec::new(); + if let Some(segments_list) = update.as_path() { + for segments in &segments_list { + as_path.extend(segments.value.iter()); } } + let path = rdb::Path { + nexthop: nexthop.into(), + shutdown: update.graceful_shutdown(), + rib_priority: DEFAULT_RIB_PRIORITY_BGP, + bgp: Some(BgpPathProperties { + origin_as: peer_as, + id, + med: update.multi_exit_discriminator(), + local_pref: update.local_pref(), + as_path, + stale: None, + }), + vlan_id: lock!(self.session).vlan_id, + }; + + self.db.add_bgp_prefixes( + update + .nlri + .iter() + .filter(|p| !originated.contains(&p.as_prefix4())) + .map(|n| rdb::Prefix::from(n.as_prefix4())) + .collect(), + path.clone(), + ); } //TODO(IPv6) iterate through MpReachNlri attributes for IPv6 diff --git a/bgp/src/test.rs b/bgp/src/test.rs index 3426b38b..5f07cb9a 100644 --- a/bgp/src/test.rs +++ b/bgp/src/test.rs @@ -129,7 +129,7 @@ fn two_router_test_setup( r1_info: Option, r2_info: Option, ) -> (Arc, Arc, Arc, Arc) { - let log = crate::log::init_file_logger(&format!("r1.{name}.log")); + let log = mg_common::log::init_file_logger(&format!("r1.{name}.log")); std::fs::create_dir_all("/tmp").expect("create tmp dir"); @@ -188,7 +188,7 @@ fn two_router_test_setup( // Router 2 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - let log = crate::log::init_file_logger(&format!("r2.{name}.log")); + let log = mg_common::log::init_file_logger(&format!("r2.{name}.log")); let db_path = format!("/tmp/r2.{name}.db"); let _ = std::fs::remove_dir_all(&db_path); diff --git a/ddm/src/admin.rs b/ddm/src/admin.rs index cc3da051..a1a519ae 100644 --- a/ddm/src/admin.rs +++ b/ddm/src/admin.rs @@ -17,6 +17,7 @@ use dropshot::Path; use dropshot::RequestContext; use dropshot::TypedBody; use dropshot::{endpoint, ApiDescriptionRegisterError}; +use mg_common::lock; use mg_common::net::TunnelOrigin; use oxnet::Ipv6Net; use schemars::JsonSchema; @@ -94,7 +95,7 @@ pub fn handler( async fn get_peers( ctx: RequestContext>>, ) -> Result>, HttpError> { - let ctx = ctx.context().lock().unwrap(); + let ctx = lock!(ctx.context()); Ok(HttpResponseOk(ctx.db.peers())) } @@ -109,7 +110,7 @@ async fn expire_peer( params: Path, ) -> Result { let addr = params.into_inner().addr; - let ctx = ctx.context().lock().unwrap(); + let ctx = lock!(ctx.context()); for e in &ctx.event_channels { e.send(Event::Admin(AdminEvent::Expire(addr))) @@ -127,7 +128,7 @@ type PrefixMap = BTreeMap>; async fn get_originated( ctx: RequestContext>>, ) -> Result>, HttpError> { - let ctx = ctx.context().lock().unwrap(); + let ctx = lock!(ctx.context()); let originated = ctx .db .originated() @@ -139,7 +140,7 @@ async fn get_originated( async fn get_originated_tunnel_endpoints( ctx: RequestContext>>, ) -> Result>, HttpError> { - let ctx = ctx.context().lock().unwrap(); + let ctx = lock!(ctx.context()); let originated = ctx .db .originated_tunnel() @@ -151,7 +152,7 @@ async fn get_originated_tunnel_endpoints( async fn get_prefixes( ctx: RequestContext>>, ) -> Result, HttpError> { - let ctx = ctx.context().lock().unwrap(); + let ctx = lock!(ctx.context()); let imported = ctx.db.imported(); let mut result = PrefixMap::default(); @@ -179,7 +180,7 @@ async fn get_prefixes( async fn get_tunnel_endpoints( ctx: RequestContext>>, ) -> Result>, HttpError> { - let ctx = ctx.context().lock().unwrap(); + let ctx = lock!(ctx.context()); let imported = ctx.db.imported_tunnel(); Ok(HttpResponseOk(imported)) } @@ -189,7 +190,7 @@ async fn advertise_prefixes( ctx: RequestContext>>, request: TypedBody>, ) -> Result { - let ctx = ctx.context().lock().unwrap(); + let ctx = lock!(ctx.context()); let prefixes = request.into_inner(); ctx.db .originate(&prefixes) @@ -225,7 +226,7 @@ async fn advertise_tunnel_endpoints( ctx: RequestContext>>, request: TypedBody>, ) -> Result { - let ctx = ctx.context().lock().unwrap(); + let ctx = lock!(ctx.context()); let endpoints = request.into_inner(); slog::info!(ctx.log, "advertise tunnel: {:#?}", endpoints); ctx.db @@ -261,7 +262,7 @@ async fn withdraw_prefixes( ctx: RequestContext>>, request: TypedBody>, ) -> Result { - let ctx = ctx.context().lock().unwrap(); + let ctx = lock!(ctx.context()); let prefixes = request.into_inner(); ctx.db .withdraw(&prefixes) @@ -297,7 +298,7 @@ async fn withdraw_tunnel_endpoints( ctx: RequestContext>>, request: TypedBody>, ) -> Result { - let ctx = ctx.context().lock().unwrap(); + let ctx = lock!(ctx.context()); let endpoints = request.into_inner(); slog::info!(ctx.log, "withdraw tunnel: {:#?}", endpoints); ctx.db @@ -333,7 +334,7 @@ async fn withdraw_tunnel_endpoints( async fn sync( ctx: RequestContext>>, ) -> Result { - let ctx = ctx.context().lock().unwrap(); + let ctx = lock!(ctx.context()); for e in &ctx.event_channels { e.send(Event::Admin(AdminEvent::Sync)).map_err(|e| { @@ -360,9 +361,9 @@ async fn enable_stats( request: TypedBody, ) -> Result { let rq = request.into_inner(); - let ctx = ctx.context().lock().unwrap(); + let ctx = lock!(ctx.context()); - let mut jh = ctx.stats_handler.lock().unwrap(); + let mut jh = lock!(ctx.stats_handler); if jh.is_none() { let hostname = hostname::get() .expect("failed to get hostname") @@ -393,8 +394,8 @@ async fn enable_stats( async fn disable_stats( ctx: RequestContext>>, ) -> Result { - let ctx = ctx.context().lock().unwrap(); - let mut jh = ctx.stats_handler.lock().unwrap(); + let ctx = lock!(ctx.context()); + let mut jh = lock!(ctx.stats_handler); if let Some(ref h) = *jh { h.abort(); } diff --git a/ddm/src/db.rs b/ddm/src/db.rs index b788ddad..eafa5c80 100644 --- a/ddm/src/db.rs +++ b/ddm/src/db.rs @@ -2,6 +2,7 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. +use mg_common::lock; use mg_common::net::TunnelOrigin; use oxnet::{IpNet, Ipv6Net}; use schemars::{JsonSchema, JsonSchema_repr}; @@ -61,46 +62,46 @@ impl Db { }) } pub fn dump(&self) -> DbData { - self.data.lock().unwrap().clone() + lock!(self.data).clone() } pub fn peers(&self) -> HashMap { - self.data.lock().unwrap().peers.clone() + lock!(self.data).peers.clone() } pub fn imported(&self) -> HashSet { - self.data.lock().unwrap().imported.clone() + lock!(self.data).imported.clone() } pub fn imported_count(&self) -> usize { - self.data.lock().unwrap().imported.len() + lock!(self.data).imported.len() } pub fn imported_tunnel(&self) -> HashSet { - self.data.lock().unwrap().imported_tunnel.clone() + lock!(self.data).imported_tunnel.clone() } pub fn imported_tunnel_count(&self) -> usize { - self.data.lock().unwrap().imported_tunnel.len() + lock!(self.data).imported_tunnel.len() } pub fn import(&self, r: &HashSet) { - self.data.lock().unwrap().imported.extend(r.clone()); + lock!(self.data).imported.extend(r.clone()); } pub fn import_tunnel(&self, r: &HashSet) { - self.data.lock().unwrap().imported_tunnel.extend(r.clone()); + lock!(self.data).imported_tunnel.extend(r.clone()); } pub fn delete_import(&self, r: &HashSet) { - let imported = &mut self.data.lock().unwrap().imported; + let imported = &mut lock!(self.data).imported; for x in r { imported.remove(x); } } pub fn delete_import_tunnel(&self, r: &HashSet) { - let imported = &mut self.data.lock().unwrap().imported_tunnel; + let imported = &mut lock!(self.data).imported_tunnel; for x in r { imported.remove(x); } @@ -223,7 +224,7 @@ impl Db { /// Set peer info at the given index. Returns true if peer information was /// changed. pub fn set_peer(&self, index: u32, info: PeerInfo) -> bool { - match self.data.lock().unwrap().peers.insert(index, info.clone()) { + match lock!(self.data).peers.insert(index, info.clone()) { Some(previous) => previous == info, None => true, } @@ -233,7 +234,7 @@ impl Db { &self, nexthop: Ipv6Addr, ) -> (HashSet, HashSet) { - let mut data = self.data.lock().unwrap(); + let mut data = lock!(self.data); // Routes are generally held in sets to prevent duplication and provide // handy set-algebra operations. let mut removed = HashSet::new(); @@ -259,7 +260,7 @@ impl Db { } pub fn remove_peer(&self, index: u32) { - self.data.lock().unwrap().peers.remove(&index); + lock!(self.data).peers.remove(&index); } pub fn routes_by_vector( @@ -267,7 +268,7 @@ impl Db { dst: Ipv6Net, nexthop: Ipv6Addr, ) -> Vec { - let data = self.data.lock().unwrap(); + let data = lock!(self.data); let mut result = Vec::new(); for x in &data.imported { if x.destination == dst && x.nexthop == nexthop { diff --git a/ddm/src/discovery.rs b/ddm/src/discovery.rs index 67cd41a5..acc40f29 100644 --- a/ddm/src/discovery.rs +++ b/ddm/src/discovery.rs @@ -89,6 +89,7 @@ use crate::db::{Db, PeerInfo, PeerStatus, RouterKind}; use crate::sm::{Config, Event, NeighborEvent, SessionStats}; use crate::util::u8_slice_assume_init_ref; use crate::{dbg, err, inf, trc, wrn}; +use mg_common::lock; use serde::{Deserialize, Serialize}; use slog::Logger; use socket2::{Domain, Protocol, SockAddr, Socket, Type}; @@ -514,7 +515,7 @@ fn handle_advertisement( }, ); if updated { - stats.peer_address.lock().unwrap().replace(*sender); + lock!(stats.peer_address).replace(*sender); emit_nbr_update(ctx, sender, version); } } diff --git a/ddmd/src/smf.rs b/ddmd/src/smf.rs index 4ea58872..61bcf1f7 100644 --- a/ddmd/src/smf.rs +++ b/ddmd/src/smf.rs @@ -3,6 +3,7 @@ // file, You can obtain one at https://mozilla.org/MPL/2.0/. use ddm::admin::{HandlerContext, DDM_STATS_PORT}; +use mg_common::lock; use mg_common::smf::get_stats_server_props; use slog::{error, info, warn, Logger}; use smf::PropertyGroup; @@ -59,8 +60,8 @@ fn refresh_stats_server( } }; - let context = ctx.lock().unwrap(); - let mut handler = context.stats_handler.lock().unwrap(); + let context = lock!(ctx); + let mut handler = lock!(context.stats_handler); if handler.is_none() { info!(log, "starting stats server on smf refresh"); match ddm::oxstats::start_server( diff --git a/mg-common/Cargo.toml b/mg-common/Cargo.toml index b6d46bab..b456ecd6 100644 --- a/mg-common/Cargo.toml +++ b/mg-common/Cargo.toml @@ -11,6 +11,8 @@ serde.workspace = true schemars.workspace = true thiserror.workspace = true slog.workspace = true +slog-bunyan.workspace = true +slog-async.workspace = true omicron-common.workspace = true internal-dns.workspace = true tokio.workspace = true diff --git a/mg-common/src/lib.rs b/mg-common/src/lib.rs index c7024822..b9466df4 100644 --- a/mg-common/src/lib.rs +++ b/mg-common/src/lib.rs @@ -3,6 +3,7 @@ // file, You can obtain one at https://mozilla.org/MPL/2.0/. pub mod cli; +pub mod log; pub mod net; pub mod nexus; pub mod smf; diff --git a/bgp/src/log.rs b/mg-common/src/log.rs similarity index 100% rename from bgp/src/log.rs rename to mg-common/src/log.rs diff --git a/mg-lower/src/lib.rs b/mg-lower/src/lib.rs index 6cb408ef..eaf43200 100644 --- a/mg-lower/src/lib.rs +++ b/mg-lower/src/lib.rs @@ -24,7 +24,7 @@ use slog::{error, info, Logger}; use std::collections::HashSet; use std::net::Ipv6Addr; use std::sync::mpsc::{channel, RecvTimeoutError}; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use std::thread::sleep; use std::time::Duration; @@ -160,7 +160,7 @@ fn handle_change( fn sync_prefix( tep: Ipv6Addr, - rib_loc: Arc>, + rib_loc: Rib, prefix: &Prefix, dpd: &DpdClient, ddm: &DdmClient, @@ -172,7 +172,7 @@ fn sync_prefix( // The best routes in the RIB let mut best: HashSet = HashSet::new(); - if let Some(paths) = rib_loc.lock().unwrap().get(prefix) { + if let Some(paths) = rib_loc.get(prefix) { for path in paths { best.insert(RouteHash::for_prefix_path(*prefix, path.clone())?); } diff --git a/mgd/src/bfd_admin.rs b/mgd/src/bfd_admin.rs index 8fa0bba6..4c836ee3 100644 --- a/mgd/src/bfd_admin.rs +++ b/mgd/src/bfd_admin.rs @@ -13,6 +13,7 @@ use dropshot::HttpResponseUpdatedNoContent; use dropshot::Path; use dropshot::RequestContext; use dropshot::TypedBody; +use mg_common::lock; use rdb::BfdPeerConfig; use rdb::SessionMode; use schemars::JsonSchema; @@ -67,9 +68,7 @@ pub(crate) async fn get_bfd_peers( ctx: RequestContext>, ) -> Result>, HttpError> { let mut result = Vec::new(); - for (addr, session) in - ctx.context().bfd.daemon.lock().unwrap().sessions.iter() - { + for (addr, session) in lock!(ctx.context().bfd.daemon).sessions.iter() { result.push(BfdPeerInfo { config: BfdPeerConfig { peer: *addr, @@ -118,7 +117,7 @@ pub(crate) fn add_peer( ctx: Arc, rq: BfdPeerConfig, ) -> Result<(), HttpError> { - let mut daemon = ctx.bfd.daemon.lock().unwrap(); + let mut daemon = lock!(ctx.bfd.daemon); let dispatcher = ctx.bfd.dispatcher.clone(); let db = ctx.db.clone(); @@ -211,13 +210,7 @@ pub(crate) fn channel( // Ensure there is a dispatcher thread for this listening address and a // corresponding entry in the dispatcher table to send messages from `peer` // to the appropriate session via `remote.tx`. - dispatcher.lock().unwrap().ensure( - listen, - peer, - remote.tx, - dst_port, - log.clone(), - )?; + lock!(dispatcher).ensure(listen, peer, remote.tx, dst_port, log.clone())?; // Spawn an egress thread to take packets from the session and send them // out a UDP socket. diff --git a/mgd/src/bgp_admin.rs b/mgd/src/bgp_admin.rs index edfef88c..73c9fdb9 100644 --- a/mgd/src/bgp_admin.rs +++ b/mgd/src/bgp_admin.rs @@ -20,6 +20,7 @@ use dropshot::{ HttpResponseUpdatedNoContent, Query, RequestContext, TypedBody, }; use http::status::StatusCode; +use mg_common::lock; use rdb::{Asn, BgpRouterInfo, ImportExportPolicy, Prefix}; use slog::info; use std::collections::{BTreeMap, HashMap, HashSet}; @@ -53,12 +54,6 @@ impl BgpContext { } } -macro_rules! lock { - ($mtx:expr) => { - $mtx.lock().expect("lock mutex") - }; -} - macro_rules! get_router { ($ctx:expr, $asn:expr) => { lock!($ctx.bgp.router) @@ -418,8 +413,7 @@ pub async fn get_selected( ) -> Result, HttpError> { let rq = request.into_inner(); let ctx = ctx.context(); - let rib = get_router!(ctx, rq.asn)?.db.loc_rib(); - let selected = rib.lock().unwrap().clone(); + let selected = get_router!(ctx, rq.asn)?.db.loc_rib(); Ok(HttpResponseOk(selected.into())) } @@ -640,10 +634,8 @@ pub async fn message_history( let mut result = HashMap::new(); - for (addr, session) in - get_router!(ctx, rq.asn)?.sessions.lock().unwrap().iter() - { - result.insert(*addr, session.message_history.lock().unwrap().clone()); + for (addr, session) in lock!(get_router!(ctx, rq.asn)?.sessions).iter() { + result.insert(*addr, lock!(session.message_history).clone()); } Ok(HttpResponseOk(MessageHistoryResponse { by_peer: result })) @@ -667,7 +659,7 @@ pub async fn read_checker( ) -> Result, HttpError> { let ctx = ctx.context(); let rq = request.into_inner(); - match ctx.bgp.router.lock().unwrap().get(&rq.asn) { + match lock!(ctx.bgp.router).get(&rq.asn) { None => Err(HttpError::for_not_found( None, String::from("ASN not found"), @@ -724,7 +716,7 @@ pub async fn read_shaper( ) -> Result, HttpError> { let ctx = ctx.context(); let rq = request.into_inner(); - match ctx.bgp.router.lock().unwrap().get(&rq.asn) { + match lock!(ctx.bgp.router).get(&rq.asn) { None => Err(HttpError::for_not_found( None, String::from("ASN not found"), @@ -797,7 +789,7 @@ pub(crate) mod helpers { ) .remote_id .ok_or(Error::NotFound("bgp peer not found".into()))?; - ctx.db.remove_peer_prefixes(id); + ctx.db.remove_bgp_peer_prefixes(id); ctx.db.remove_bgp_neighbor(addr)?; get_router!(&ctx, asn)?.delete_session(addr); @@ -932,7 +924,7 @@ pub(crate) mod helpers { policy: PolicySource, overwrite: bool, ) -> Result { - match ctx.bgp.router.lock().unwrap().get(&asn) { + match lock!(ctx.bgp.router).get(&asn) { None => { return Err(HttpError::for_not_found( None, @@ -993,7 +985,7 @@ pub(crate) mod helpers { asn: u32, policy: PolicyKind, ) -> Result { - match ctx.bgp.router.lock().unwrap().get(&asn) { + match lock!(ctx.bgp.router).get(&asn) { None => { return Err(HttpError::for_not_found( None, diff --git a/mgd/src/main.rs b/mgd/src/main.rs index 5e54723a..549c328b 100644 --- a/mgd/src/main.rs +++ b/mgd/src/main.rs @@ -6,12 +6,13 @@ use crate::admin::HandlerContext; use crate::bfd_admin::BfdContext; use crate::bgp_admin::BgpContext; use bgp::connection_tcp::{BgpConnectionTcp, BgpListenerTcp}; -use bgp::log::init_logger; use clap::{Parser, Subcommand}; use mg_common::cli::oxide_cli_style; +use mg_common::lock; +use mg_common::log::init_logger; use mg_common::stats::MgLowerStats; use rand::Fill; -use rdb::{BfdPeerConfig, BgpNeighborInfo, BgpRouterInfo, Path}; +use rdb::{BfdPeerConfig, BgpNeighborInfo, BgpRouterInfo}; use signal::handle_signals; use slog::{error, Logger}; use std::collections::BTreeMap; @@ -161,7 +162,7 @@ async fn run(args: RunArgs) { if let (Some(rack_uuid), Some(sled_uuid)) = (args.rack_uuid, args.sled_uuid) { - let mut is_running = context.stats_server_running.lock().unwrap(); + let mut is_running = lock!(context.stats_server_running); if !*is_running { match oxstats::start_server( context.clone(), @@ -271,14 +272,9 @@ fn initialize_static_routes(db: &rdb::Db) { let routes = db .get_static4() .expect("failed to get static routes from db"); - for route in &routes { - let path = - Path::for_static(route.nexthop, route.vlan_id, route.rib_priority); - db.add_prefix_path(route.prefix, path, true) - .unwrap_or_else(|e| { - panic!("failed to initialize static route {route:#?}: {e}") - }) - } + db.add_static_routes(&routes).unwrap_or_else(|e| { + panic!("failed to initialize static routes {routes:#?}: {e}") + }) } fn get_tunnel_endpoint_ula(db: &rdb::Db) -> Ipv6Addr { diff --git a/mgd/src/oxstats.rs b/mgd/src/oxstats.rs index 73a00741..91cabe04 100644 --- a/mgd/src/oxstats.rs +++ b/mgd/src/oxstats.rs @@ -6,6 +6,7 @@ use crate::admin::HandlerContext; use crate::bfd_admin::BfdContext; use crate::bgp_admin::BgpContext; use chrono::{DateTime, Utc}; +use mg_common::lock; use mg_common::nexus::{local_underlay_address, run_oximeter}; use mg_common::stats::MgLowerStats; use omicron_common::api::internal::nexus::{ProducerEndpoint, ProducerKind}; @@ -265,12 +266,12 @@ impl Producer for Stats { impl Stats { fn bgp_stats(&mut self) -> Result, MetricsError> { - let routers = self.bgp.router.lock().unwrap(); + let routers = lock!(self.bgp.router); let mut router_counters = BTreeMap::new(); let mut session_count: usize = 0; for (asn, r) in &*routers { let mut session_counters = BTreeMap::new(); - let sessions = r.sessions.lock().unwrap(); + let sessions = lock!(r.sessions); for (addr, session) in &*sessions { session_counters.insert(*addr, session.counters.clone()); session_count += 1; @@ -580,7 +581,7 @@ impl Stats { } fn bfd_stats(&mut self) -> Result, MetricsError> { - let daemon = self.bfd.daemon.lock().unwrap(); + let daemon = lock!(self.bfd.daemon); let mut counters = BTreeMap::new(); for (addr, session) in &daemon.sessions { counters.insert(*addr, session.counters.clone()); diff --git a/mgd/src/smf.rs b/mgd/src/smf.rs index b2d72eb2..fe8ef649 100644 --- a/mgd/src/smf.rs +++ b/mgd/src/smf.rs @@ -3,6 +3,7 @@ // file, You can obtain one at https://mozilla.org/MPL/2.0/. use crate::admin::HandlerContext; +use mg_common::lock; use mg_common::smf::get_stats_server_props; use slog::{error, info, warn, Logger}; use smf::PropertyGroup; @@ -59,7 +60,7 @@ fn refresh_stats_server( } }; - let mut is_running = ctx.stats_server_running.lock().unwrap(); + let mut is_running = lock!(ctx.stats_server_running); if !*is_running { info!(log, "starting stats server on smf refresh"); match crate::oxstats::start_server( diff --git a/mgd/src/static_admin.rs b/mgd/src/static_admin.rs index 0d170227..4ec1a9cd 100644 --- a/mgd/src/static_admin.rs +++ b/mgd/src/static_admin.rs @@ -68,13 +68,10 @@ pub async fn static_add_v4_route( .into_iter() .map(Into::into) .collect(); - for r in routes { - let path = Path::for_static(r.nexthop, r.vlan_id, r.rib_priority); - ctx.context() - .db - .add_prefix_path(r.prefix, path, true) - .map_err(|e| HttpError::for_internal_error(e.to_string()))?; - } + ctx.context() + .db + .add_static_routes(&routes) + .map_err(|e| HttpError::for_internal_error(e.to_string()))?; Ok(HttpResponseUpdatedNoContent()) } @@ -90,13 +87,10 @@ pub async fn static_remove_v4_route( .into_iter() .map(Into::into) .collect(); - for r in routes { - let path = Path::for_static(r.nexthop, r.vlan_id, r.rib_priority); - ctx.context() - .db - .remove_prefix_path(r.prefix, path, true) - .map_err(|e| HttpError::for_internal_error(e.to_string()))?; - } + ctx.context() + .db + .remove_static_routes(&routes) + .map_err(|e| HttpError::for_internal_error(e.to_string()))?; Ok(HttpResponseDeleted()) } diff --git a/rdb/Cargo.toml b/rdb/Cargo.toml index a41c12ae..488db222 100644 --- a/rdb/Cargo.toml +++ b/rdb/Cargo.toml @@ -12,6 +12,8 @@ schemars.workspace = true serde_json.workspace = true thiserror.workspace = true slog.workspace = true +slog-bunyan.workspace = true +slog-async.workspace = true mg-common.workspace = true itertools.workspace = true chrono.workspace = true diff --git a/rdb/src/db.rs b/rdb/src/db.rs index d041b333..de7f5730 100644 --- a/rdb/src/db.rs +++ b/rdb/src/db.rs @@ -15,6 +15,7 @@ use crate::types::*; use chrono::Utc; use mg_common::{lock, read_lock, write_lock}; use slog::{error, Logger}; +use std::cmp::Ordering as CmpOrdering; use std::collections::{BTreeMap, BTreeSet}; use std::net::{IpAddr, Ipv6Addr}; use std::sync::atomic::{AtomicU64, Ordering}; @@ -105,11 +106,11 @@ impl Db { } pub fn set_reaper_interval(&self, interval: std::time::Duration) { - *self.reaper.interval.lock().unwrap() = interval; + *lock!(self.reaper.interval) = interval; } pub fn set_reaper_stale_max(&self, stale_max: chrono::Duration) { - *self.reaper.stale_max.lock().unwrap() = stale_max; + *lock!(self.reaper.stale_max) = stale_max; } /// Register a routing databse watcher. @@ -128,8 +129,8 @@ impl Db { } } - pub fn loc_rib(&self) -> Arc> { - self.rib_loc.clone() + pub fn loc_rib(&self) -> Rib { + lock!(self.rib_loc).clone() } pub fn full_rib(&self) -> Rib { @@ -378,8 +379,13 @@ impl Db { } } - pub fn get_imported(&self) -> Rib { - lock!(self.rib_in).clone() + pub fn get_selected_prefix_paths(&self, prefix: &Prefix) -> Vec { + let rib = lock!(self.rib_loc); + let paths = rib.get(prefix); + match paths { + None => Vec::new(), + Some(p) => p.iter().cloned().collect(), + } } pub fn update_loc_rib(rib_in: &Rib, rib_loc: &mut Rib, prefix: Prefix) { @@ -394,12 +400,7 @@ impl Db { } } - pub fn add_prefix_path( - &self, - prefix: Prefix, - path: Path, - is_static: bool, - ) -> Result<(), Error> { + pub fn add_prefix_path(&self, prefix: Prefix, path: &Path) { let mut rib = lock!(self.rib_in); match rib.get_mut(&prefix) { Some(paths) => { @@ -410,24 +411,47 @@ impl Db { } } Self::update_loc_rib(&rib, &mut lock!(self.rib_loc), prefix); + } + + pub fn add_static_routes( + &self, + routes: &Vec, + ) -> Result<(), Error> { + let tree = self.persistent.open_tree(STATIC4_ROUTES)?; - if is_static { - let tree = self.persistent.open_tree(STATIC4_ROUTES)?; - let srk = StaticRouteKey { - prefix, - nexthop: path.nexthop, - vlan_id: path.vlan_id, - rib_priority: path.rib_priority, - }; - let key = serde_json::to_string(&srk)?; - tree.insert(key.as_str(), "")?; - tree.flush()?; + let mut route_keys = Vec::new(); + for route in routes { + let key = serde_json::to_string(route)?; + route_keys.push(key); } - self.notify(prefix.into()); + tree.transaction(|tx_db| { + for key in &route_keys { + tx_db.insert(key.as_str(), "")?; + } + Ok(()) + })?; + tree.flush()?; + + let mut pcn = PrefixChangeNotification::default(); + for route in routes { + self.add_prefix_path(route.prefix, &Path::from(*route)); + pcn.changed.insert(route.prefix); + } + + self.notify(pcn); Ok(()) } + pub fn add_bgp_prefixes(&self, prefixes: Vec, path: Path) { + let mut pcn = PrefixChangeNotification::default(); + for prefix in prefixes { + self.add_prefix_path(prefix, &path); + pcn.changed.insert(prefix); + } + self.notify(pcn); + } + pub fn get_static4(&self) -> Result, Error> { let tree = self.persistent.open_tree(STATIC4_ROUTES)?; Ok(tree @@ -474,14 +498,14 @@ impl Db { Ok(nexthops.len()) } - pub fn disable_nexthop(&self, nexthop: IpAddr) { + pub fn set_nexthop_shutdown(&self, nexthop: IpAddr, shutdown: bool) { let mut rib = lock!(self.rib_in); let mut pcn = PrefixChangeNotification::default(); for (prefix, paths) in rib.iter_mut() { for p in paths.clone().into_iter() { - if p.nexthop == nexthop && !p.shutdown { + if p.nexthop == nexthop && p.shutdown != shutdown { let mut replacement = p.clone(); - replacement.shutdown = true; + replacement.shutdown = shutdown; paths.insert(replacement); pcn.changed.insert(*prefix); } @@ -491,101 +515,86 @@ impl Db { for prefix in pcn.changed.iter() { Self::update_loc_rib(&rib, &mut lock!(self.rib_loc), *prefix); } - self.notify(pcn); } - pub fn enable_nexthop(&self, nexthop: IpAddr) { + pub fn remove_prefix_path(&self, prefix: Prefix, prefix_cmp: F) + where + F: Fn(&Path) -> bool, + { let mut rib = lock!(self.rib_in); - let mut pcn = PrefixChangeNotification::default(); - for (prefix, paths) in rib.iter_mut() { - for p in paths.clone().into_iter() { - if p.nexthop == nexthop && p.shutdown { - let mut replacement = p.clone(); - replacement.shutdown = false; - paths.insert(replacement); - pcn.changed.insert(*prefix); - } + if let Some(paths) = rib.get_mut(&prefix) { + paths.retain(|p| !prefix_cmp(p)); + if paths.is_empty() { + rib.remove(&prefix); } } - //TODO loc_rib updater as a pcn listener? - for prefix in pcn.changed.iter() { - Self::update_loc_rib(&rib, &mut lock!(self.rib_loc), *prefix); - } - self.notify(pcn); + Self::update_loc_rib(&rib, &mut lock!(self.rib_loc), prefix); } - pub fn remove_prefix_path( + pub fn remove_static_routes( &self, - prefix: Prefix, - path: Path, - is_static: bool, //TODO + routes: &Vec, ) -> Result<(), Error> { - let mut rib = lock!(self.rib_in); - if let Some(paths) = rib.get_mut(&prefix) { - paths.retain(|x| x.nexthop != path.nexthop) - } + let tree = self.persistent.open_tree(STATIC4_ROUTES)?; - if is_static { - let tree = self.persistent.open_tree(STATIC4_ROUTES)?; - let srk = StaticRouteKey { - prefix, - nexthop: path.nexthop, - vlan_id: path.vlan_id, - rib_priority: path.rib_priority, - }; - let key = serde_json::to_string(&srk)?; - tree.remove(key.as_str())?; - tree.flush()?; + let mut route_keys = Vec::new(); + for route in routes { + let key = serde_json::to_string(route)?; + route_keys.push(key); } - Self::update_loc_rib(&rib, &mut lock!(self.rib_loc), prefix); - self.notify(prefix.into()); - Ok(()) - } - - pub fn remove_peer_prefix(&self, id: u32, prefix: Prefix) { - let mut rib = lock!(self.rib_in); - let paths = match rib.get_mut(&prefix) { - None => return, - Some(ps) => ps, - }; - paths.retain(|x| match x.bgp { - Some(ref bgp) => bgp.id != id, - None => true, - }); - - rib.retain(|&_, paths| !paths.is_empty()); - - Self::update_loc_rib(&rib, &mut lock!(self.rib_loc), prefix); - self.notify(prefix.into()); - } - - pub fn remove_peer_prefixes( - &self, - id: u32, - ) -> BTreeMap> { - let mut rib = lock!(self.rib_in); + tree.transaction(|tx_db| { + for key in &route_keys { + tx_db.remove(key.as_str())?; + } + Ok(()) + })?; + tree.flush()?; let mut pcn = PrefixChangeNotification::default(); - let mut result = BTreeMap::new(); - for (prefix, paths) in rib.iter_mut() { - paths.retain(|x| match x.bgp { - Some(ref bgp) => bgp.id != id, - None => true, + for route in routes { + self.remove_prefix_path(route.prefix, |rib_path: &Path| { + rib_path.cmp(&Path::from(*route)) == CmpOrdering::Equal }); - result.insert(*prefix, paths.clone()); - pcn.changed.insert(*prefix); + pcn.changed.insert(route.prefix); } - rib.retain(|&_, paths| !paths.is_empty()); + self.notify(pcn); + Ok(()) + } - for prefix in pcn.changed.iter() { - Self::update_loc_rib(&rib, &mut lock!(self.rib_loc), *prefix); + pub fn remove_bgp_prefixes(&self, prefixes: Vec, id: u32) { + // TODO: don't rely on BGP ID for path definition. + // See: maghemite #241 + // We currently only use Router-IDs to determine which + // peer/session a route was learned from. This will not work + // long term, as it will not work with add-path or multiple + // parallel sessions to the same router. + let mut pcn = PrefixChangeNotification::default(); + for prefix in prefixes { + self.remove_prefix_path(prefix, |rib_path: &Path| { + match rib_path.bgp { + Some(ref bgp) => bgp.id == id, + None => true, + } + }); + pcn.changed.insert(prefix); } self.notify(pcn); - result + } + + // helper function to remove all routes learned from a given peer + // e.g. when peer is deleted or exits Established state + pub fn remove_bgp_peer_prefixes(&self, id: u32) { + // TODO: don't rely on BGP ID for path definition. + // See: maghemite #241 + // We currently only use Router-IDs to determine which + // peer/session a route was learned from. This will not work + // long term, as it will not work with add-path or multiple + // parallel sessions to the same router. + self.remove_bgp_prefixes(self.full_rib().keys().copied().collect(), id); } pub fn generation(&self) -> u64 { @@ -617,8 +626,14 @@ impl Db { Ok(()) } - pub fn mark_bgp_id_stale(&self, id: u32) { - let mut rib = self.rib_loc.lock().unwrap(); + pub fn mark_bgp_peer_stale(&self, id: u32) { + // TODO: don't rely on BGP ID for path definition. + // See: maghemite #241 + // We currently only use Router-IDs to determine which + // peer/session a route was learned from. This will not work + // long term, as it will not work with add-path or multiple + // parallel sessions to the same router. + let mut rib = lock!(self.rib_loc); rib.iter_mut().for_each(|(_prefix, path)| { let targets: Vec = path .iter() @@ -661,7 +676,7 @@ impl Reaper { let s = self.clone(); spawn(move || loop { s.reap(); - sleep(*s.interval.lock().unwrap()); + sleep(*lock!(s.interval)); }); } @@ -678,7 +693,7 @@ impl Reaper { b.stale .map(|s| { Utc::now().signed_duration_since(s) - < *self.stale_max.lock().unwrap() + < *lock!(self.stale_max) }) .unwrap_or(true) }) @@ -687,3 +702,190 @@ impl Reaper { }); } } + +#[cfg(test)] +mod test { + use slog::{Drain, Logger}; + use std::fs::File; + use std::io::Write; + + pub fn init_file_logger(filename: &str) -> Logger { + build_logger(File::create(filename).expect("build logger")) + } + + pub fn build_logger(w: W) -> Logger { + let drain = slog_bunyan::new(w).build().fuse(); + let drain = slog_async::Async::new(drain) + .chan_size(0x8000) + .build() + .fuse(); + slog::Logger::root(drain, slog::o!()) + } + + #[test] + fn test_rib() { + use crate::StaticRouteKey; + use crate::{ + db::Db, BgpPathProperties, Path, Prefix, Prefix4, + DEFAULT_RIB_PRIORITY_BGP, DEFAULT_RIB_PRIORITY_STATIC, + }; + // init test vars + let p0 = Prefix::from("192.168.0.0/24".parse::().unwrap()); + let p1 = Prefix::from("192.168.1.0/24".parse::().unwrap()); + let p2 = Prefix::from("192.168.2.0/24".parse::().unwrap()); + let nh0 = "203.0.113.0"; + let nh1 = "203.0.113.1"; + let bgp_path0 = Path { + nexthop: nh0.parse().unwrap(), + rib_priority: DEFAULT_RIB_PRIORITY_BGP, + shutdown: false, + bgp: Some(BgpPathProperties { + origin_as: 1111, + id: 1111, + med: Some(1111), + local_pref: Some(1111), + as_path: vec![1111, 1111, 1111], + stale: None, + }), + vlan_id: None, + }; + let bgp_path1 = Path { + nexthop: nh1.parse().unwrap(), + rib_priority: DEFAULT_RIB_PRIORITY_BGP, + shutdown: false, + bgp: Some(BgpPathProperties { + origin_as: 2222, + id: 2222, + med: Some(2222), + local_pref: Some(2222), + as_path: vec![2222, 2222, 2222], + stale: None, + }), + vlan_id: None, + }; + let static_key0 = StaticRouteKey { + prefix: p0, + nexthop: nh0.parse().unwrap(), + vlan_id: None, + rib_priority: DEFAULT_RIB_PRIORITY_STATIC, + }; + let static_path0 = Path::from(static_key0); + let static_key1 = StaticRouteKey { + prefix: p0, + nexthop: nh0.parse().unwrap(), + vlan_id: None, + rib_priority: DEFAULT_RIB_PRIORITY_STATIC + 10, + }; + let static_path1 = Path::from(static_key1); + + // setup + let log = init_file_logger("/tmp/rib.log"); + let db_path = "/tmp/rb.db".to_string(); + let _ = std::fs::remove_dir_all(&db_path); + let db = Db::new(&db_path, log.clone()).expect("create db"); + + // Start test cases + + // start from empty rib + assert!(db.full_rib().is_empty()); + assert!(db.loc_rib().is_empty()); + + // both paths have the same next-hop, but not all fields + // from StaticRouteKey match (rib_priority is different). + db.add_static_routes(&vec![static_key0, static_key1]) + .expect( + "add_static_routes failed for {static_key0} and {static_key1}", + ); + + // both paths should get installed in rib_in, but only + // static_path0 should get into rib_loc. + assert_eq!( + db.get_prefix_paths(&p0), + vec![static_path0.clone(), static_path1.clone()] + ); + assert_eq!( + db.get_selected_prefix_paths(&p0), + vec![static_path0.clone()] + ); + + // rib_priority differs, so removal of static_key0 + // should not affect path from static_key1 + db.remove_static_routes(&vec![static_key0]) + .expect("remove_static_routes_failed for {static_key0}"); + assert_eq!(db.get_prefix_paths(&p0), vec![static_path1.clone()]); + assert_eq!( + db.get_selected_prefix_paths(&p0), + vec![static_path1.clone()] + ); + + // install bgp routes + db.add_bgp_prefixes(vec![p0, p1], bgp_path0.clone()); + db.add_bgp_prefixes(vec![p1, p2], bgp_path1.clone()); + + // p0 via paths static_path0 and bgp_path0 in rib_in + assert_eq!( + db.get_prefix_paths(&p0), + vec![static_path1.clone(), bgp_path0.clone()] + ); + // p0 via path static_path0 in rib_loc + assert_eq!( + db.get_selected_prefix_paths(&p0), + vec![static_path1.clone()] + ); + + // p1 via both bgp_path0 and bgp_path1 in rib_in + assert_eq!( + db.get_prefix_paths(&p1), + vec![bgp_path0.clone(), bgp_path1.clone()] + ); + // p1 via bgp_path1 in rib_loc (highest local_pref) + assert_eq!(db.get_selected_prefix_paths(&p1), vec![bgp_path1.clone()]); + + // p2 via bgp_path1 in rib_in + assert_eq!(db.get_prefix_paths(&p2), vec![bgp_path1.clone()]); + // p2 via bgp_path1 in rib_loc + assert_eq!(db.get_selected_prefix_paths(&p2), vec![bgp_path1.clone()]); + + // withdrawal of p2 via bgp_path1 + db.remove_bgp_prefixes(vec![p2], bgp_path1.bgp.clone().unwrap().id); + assert!(db.get_prefix_paths(&p2).is_empty()); + assert!(db.get_selected_prefix_paths(&p2).is_empty()); + // p1 is unaffected + assert_eq!( + db.get_prefix_paths(&p1), + vec![bgp_path0.clone(), bgp_path1.clone()] + ); + assert_eq!(db.get_selected_prefix_paths(&p1), vec![bgp_path1.clone()]); + + // yank all routes from bgp_path0, simulating peer shutdown + db.remove_bgp_peer_prefixes(bgp_path0.bgp.clone().unwrap().id); + // p0 via static_path1 in rib_in and rib_loc + assert_eq!(db.get_prefix_paths(&p0), vec![static_path1.clone()]); + assert_eq!( + db.get_selected_prefix_paths(&p0), + vec![static_path1.clone()] + ); + // p1 via bgp_path1 in rib_in and rib_loc + assert_eq!(db.get_prefix_paths(&p1), vec![bgp_path1.clone()]); + assert_eq!(db.get_selected_prefix_paths(&p1), vec![bgp_path1.clone()]); + // p2 not present, test that key is missing + assert!(db.get_prefix_paths(&p2).is_empty()); + assert!(db.get_selected_prefix_paths(&p2).is_empty()); + + // withdrawal of p1 via bgp_path1 + db.remove_bgp_prefixes(vec![p1], bgp_path1.bgp.clone().unwrap().id); + assert!(db.get_prefix_paths(&p1).is_empty()); + assert!(db.get_selected_prefix_paths(&p1).is_empty()); + + // removal of final static route (from static_key1) should result + // in the prefix being completely deleted + db.remove_static_routes(&vec![static_key1]) + .expect("remove_static_routes_failed for {static_key1}"); + assert!(db.get_prefix_paths(&p0).is_empty()); + assert!(db.get_selected_prefix_paths(&p0).is_empty()); + + // rib should be empty again + assert!(db.full_rib().is_empty()); + assert!(db.loc_rib().is_empty()); + } +} diff --git a/rdb/src/error.rs b/rdb/src/error.rs index 666811a5..6df54728 100644 --- a/rdb/src/error.rs +++ b/rdb/src/error.rs @@ -7,6 +7,9 @@ pub enum Error { #[error("datastore error {0}")] DataStore(#[from] sled::Error), + #[error("data store transaction {0}")] + DataStoreTransaction(#[from] sled::transaction::TransactionError), + #[error("serialization error {0}")] Serialization(#[from] serde_json::Error), diff --git a/rdb/src/types.rs b/rdb/src/types.rs index 4726e77a..7bc27265 100644 --- a/rdb/src/types.rs +++ b/rdb/src/types.rs @@ -45,16 +45,12 @@ impl Ord for Path { } } -impl Path { - pub fn for_static( - nexthop: IpAddr, - vlan_id: Option, - rib_priority: u8, - ) -> Self { +impl From for Path { + fn from(value: StaticRouteKey) -> Self { Self { - nexthop, - vlan_id, - rib_priority, + nexthop: value.nexthop, + vlan_id: value.vlan_id, + rib_priority: value.rib_priority, shutdown: false, bgp: None, }