diff --git a/beacon_node/client/src/lib.rs b/beacon_node/client/src/lib.rs index 584a0d736de..399aa06511e 100644 --- a/beacon_node/client/src/lib.rs +++ b/beacon_node/client/src/lib.rs @@ -46,20 +46,6 @@ impl Client { self.http_metrics_listen_addr } - /// Returns the ipv4 port of the client's libp2p stack, if it was started. - pub fn libp2p_listen_ipv4_port(&self) -> Option { - self.network_globals - .as_ref() - .and_then(|n| n.listen_port_tcp4()) - } - - /// Returns the ipv6 port of the client's libp2p stack, if it was started. - pub fn libp2p_listen_ipv6_port(&self) -> Option { - self.network_globals - .as_ref() - .and_then(|n| n.listen_port_tcp6()) - } - /// Returns the list of libp2p addresses the client is listening to. pub fn libp2p_listen_addresses(&self) -> Option> { self.network_globals.as_ref().map(|n| n.listen_multiaddrs()) diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 8f4c3d31408..de1e5113465 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -2237,12 +2237,8 @@ pub fn serve( })?; if let Some(peer_info) = network_globals.peers.read().peer_info(&peer_id) { - let address = if let Some(socket_addr) = peer_info.seen_addresses().next() { - let mut addr = lighthouse_network::Multiaddr::from(socket_addr.ip()); - addr.push(lighthouse_network::multiaddr::Protocol::Tcp( - socket_addr.port(), - )); - addr.to_string() + let address = if let Some(multiaddr) = peer_info.seen_multiaddrs().next() { + multiaddr.to_string() } else if let Some(addr) = peer_info.listening_addresses().first() { addr.to_string() } else { @@ -2288,13 +2284,8 @@ pub fn serve( .peers() .for_each(|(peer_id, peer_info)| { let address = - if let Some(socket_addr) = peer_info.seen_addresses().next() { - let mut addr = - lighthouse_network::Multiaddr::from(socket_addr.ip()); - addr.push(lighthouse_network::multiaddr::Protocol::Tcp( - socket_addr.port(), - )); - addr.to_string() + if let Some(multiaddr) = peer_info.seen_multiaddrs().next() { + multiaddr.to_string() } else if let Some(addr) = peer_info.listening_addresses().first() { addr.to_string() } else { diff --git a/beacon_node/http_api/src/test_utils.rs b/beacon_node/http_api/src/test_utils.rs index 15b7ecd331c..2dec31210c4 100644 --- a/beacon_node/http_api/src/test_utils.rs +++ b/beacon_node/http_api/src/test_utils.rs @@ -149,8 +149,6 @@ pub async fn create_api_server_on_port( let enr = EnrBuilder::new("v4").build(&enr_key).unwrap(); let network_globals = Arc::new(NetworkGlobals::new( enr.clone(), - Some(TCP_PORT), - None, meta_data, vec![], false, diff --git a/beacon_node/lighthouse_network/Cargo.toml b/beacon_node/lighthouse_network/Cargo.toml index 925d278ad62..3e399f74800 100644 --- a/beacon_node/lighthouse_network/Cargo.toml +++ b/beacon_node/lighthouse_network/Cargo.toml @@ -44,12 +44,13 @@ prometheus-client = "0.21.0" unused_port = { path = "../../common/unused_port" } delay_map = "0.3.0" void = "1" +libp2p-quic= { version = "0.9.2", features=["tokio"]} libp2p-mplex = "0.40.0" [dependencies.libp2p] version = "0.52" default-features = false -features = ["websocket", "identify", "yamux", "noise", "gossipsub", "dns", "tcp", "tokio", "plaintext", "secp256k1", "macros", "ecdsa"] +features = ["identify", "yamux", "noise", "gossipsub", "dns", "tcp", "tokio", "plaintext", "secp256k1", "macros", "ecdsa"] [dev-dependencies] slog-term = "2.6.0" @@ -59,6 +60,3 @@ exit-future = "0.2.0" void = "1" quickcheck = "0.9.2" quickcheck_macros = "0.9.1" - -[features] -libp2p-websocket = [] \ No newline at end of file diff --git a/beacon_node/lighthouse_network/src/config.rs b/beacon_node/lighthouse_network/src/config.rs index 268ec8f7bfe..7b850e4f0d4 100644 --- a/beacon_node/lighthouse_network/src/config.rs +++ b/beacon_node/lighthouse_network/src/config.rs @@ -58,18 +58,24 @@ pub struct Config { /// that no discovery address has been set in the CLI args. pub enr_address: (Option, Option), - /// The udp4 port to broadcast to peers in order to reach back for discovery. + /// The udp ipv4 port to broadcast to peers in order to reach back for discovery. pub enr_udp4_port: Option, - /// The tcp4 port to broadcast to peers in order to reach back for libp2p services. + /// The quic ipv4 port to broadcast to peers in order to reach back for libp2p services. + pub enr_quic4_port: Option, + + /// The tcp ipv4 port to broadcast to peers in order to reach back for libp2p services. pub enr_tcp4_port: Option, - /// The udp6 port to broadcast to peers in order to reach back for discovery. + /// The udp ipv6 port to broadcast to peers in order to reach back for discovery. pub enr_udp6_port: Option, - /// The tcp6 port to broadcast to peers in order to reach back for libp2p services. + /// The tcp ipv6 port to broadcast to peers in order to reach back for libp2p services. pub enr_tcp6_port: Option, + /// The quic ipv6 port to broadcast to peers in order to reach back for libp2p services. + pub enr_quic6_port: Option, + /// Target number of connected peers. pub target_peers: usize, @@ -102,6 +108,9 @@ pub struct Config { /// Disables the discovery protocol from starting. pub disable_discovery: bool, + /// Disables quic support. + pub disable_quic_support: bool, + /// Attempt to construct external port mappings with UPnP. pub upnp_enabled: bool, @@ -149,57 +158,76 @@ impl Config { /// Sets the listening address to use an ipv4 address. The discv5 ip_mode and table filter are /// adjusted accordingly to ensure addresses that are present in the enr are globally /// reachable. - pub fn set_ipv4_listening_address(&mut self, addr: Ipv4Addr, tcp_port: u16, udp_port: u16) { + pub fn set_ipv4_listening_address( + &mut self, + addr: Ipv4Addr, + tcp_port: u16, + disc_port: u16, + quic_port: u16, + ) { self.listen_addresses = ListenAddress::V4(ListenAddr { addr, - udp_port, + disc_port, + quic_port, tcp_port, }); - self.discv5_config.listen_config = discv5::ListenConfig::from_ip(addr.into(), udp_port); + self.discv5_config.listen_config = discv5::ListenConfig::from_ip(addr.into(), disc_port); self.discv5_config.table_filter = |enr| enr.ip4().as_ref().map_or(false, is_global_ipv4) } /// Sets the listening address to use an ipv6 address. The discv5 ip_mode and table filter is /// adjusted accordingly to ensure addresses that are present in the enr are globally /// reachable. - pub fn set_ipv6_listening_address(&mut self, addr: Ipv6Addr, tcp_port: u16, udp_port: u16) { + pub fn set_ipv6_listening_address( + &mut self, + addr: Ipv6Addr, + tcp_port: u16, + disc_port: u16, + quic_port: u16, + ) { self.listen_addresses = ListenAddress::V6(ListenAddr { addr, - udp_port, + disc_port, + quic_port, tcp_port, }); - self.discv5_config.listen_config = discv5::ListenConfig::from_ip(addr.into(), udp_port); + self.discv5_config.listen_config = discv5::ListenConfig::from_ip(addr.into(), disc_port); self.discv5_config.table_filter = |enr| enr.ip6().as_ref().map_or(false, is_global_ipv6) } /// Sets the listening address to use both an ipv4 and ipv6 address. The discv5 ip_mode and /// table filter is adjusted accordingly to ensure addresses that are present in the enr are /// globally reachable. + #[allow(clippy::too_many_arguments)] pub fn set_ipv4_ipv6_listening_addresses( &mut self, v4_addr: Ipv4Addr, tcp4_port: u16, - udp4_port: u16, + disc4_port: u16, + quic4_port: u16, v6_addr: Ipv6Addr, tcp6_port: u16, - udp6_port: u16, + disc6_port: u16, + quic6_port: u16, ) { self.listen_addresses = ListenAddress::DualStack( ListenAddr { addr: v4_addr, - udp_port: udp4_port, + disc_port: disc4_port, + quic_port: quic4_port, tcp_port: tcp4_port, }, ListenAddr { addr: v6_addr, - udp_port: udp6_port, + disc_port: disc6_port, + quic_port: quic6_port, tcp_port: tcp6_port, }, ); self.discv5_config.listen_config = discv5::ListenConfig::default() - .with_ipv4(v4_addr, udp4_port) - .with_ipv6(v6_addr, udp6_port); + .with_ipv4(v4_addr, disc4_port) + .with_ipv6(v6_addr, disc6_port); self.discv5_config.table_filter = |enr| match (&enr.ip4(), &enr.ip6()) { (None, None) => false, @@ -213,27 +241,32 @@ impl Config { match listen_addr { ListenAddress::V4(ListenAddr { addr, - udp_port, + disc_port, + quic_port, tcp_port, - }) => self.set_ipv4_listening_address(addr, tcp_port, udp_port), + }) => self.set_ipv4_listening_address(addr, tcp_port, disc_port, quic_port), ListenAddress::V6(ListenAddr { addr, - udp_port, + disc_port, + quic_port, tcp_port, - }) => self.set_ipv6_listening_address(addr, tcp_port, udp_port), + }) => self.set_ipv6_listening_address(addr, tcp_port, disc_port, quic_port), ListenAddress::DualStack( ListenAddr { addr: ip4addr, - udp_port: udp4_port, + disc_port: disc4_port, + quic_port: quic4_port, tcp_port: tcp4_port, }, ListenAddr { addr: ip6addr, - udp_port: udp6_port, + disc_port: disc6_port, + quic_port: quic6_port, tcp_port: tcp6_port, }, ) => self.set_ipv4_ipv6_listening_addresses( - ip4addr, tcp4_port, udp4_port, ip6addr, tcp6_port, udp6_port, + ip4addr, tcp4_port, disc4_port, quic4_port, ip6addr, tcp6_port, disc6_port, + quic6_port, ), } } @@ -272,7 +305,8 @@ impl Default for Config { ); let listen_addresses = ListenAddress::V4(ListenAddr { addr: Ipv4Addr::UNSPECIFIED, - udp_port: 9000, + disc_port: 9000, + quic_port: 9001, tcp_port: 9000, }); @@ -305,10 +339,11 @@ impl Default for Config { network_dir, listen_addresses, enr_address: (None, None), - enr_udp4_port: None, + enr_quic4_port: None, enr_tcp4_port: None, enr_udp6_port: None, + enr_quic6_port: None, enr_tcp6_port: None, target_peers: 50, gs_config, @@ -320,6 +355,7 @@ impl Default for Config { disable_peer_scoring: false, client_version: lighthouse_version::version_with_platform(), disable_discovery: false, + disable_quic_support: false, upnp_enabled: true, network_load: 3, private: false, diff --git a/beacon_node/lighthouse_network/src/discovery/enr.rs b/beacon_node/lighthouse_network/src/discovery/enr.rs index ef22f816a77..3f46285a807 100644 --- a/beacon_node/lighthouse_network/src/discovery/enr.rs +++ b/beacon_node/lighthouse_network/src/discovery/enr.rs @@ -17,6 +17,8 @@ use std::path::Path; use std::str::FromStr; use types::{EnrForkId, EthSpec}; +use super::enr_ext::{EnrExt, QUIC6_ENR_KEY, QUIC_ENR_KEY}; + /// The ENR field specifying the fork id. pub const ETH2_ENR_KEY: &str = "eth2"; /// The ENR field specifying the attestation subnet bitfield. @@ -142,7 +144,7 @@ pub fn build_or_load_enr( pub fn create_enr_builder_from_config( config: &NetworkConfig, - enable_tcp: bool, + enable_libp2p: bool, ) -> EnrBuilder { let mut builder = EnrBuilder::new("v4"); let (maybe_ipv4_address, maybe_ipv6_address) = &config.enr_address; @@ -163,7 +165,28 @@ pub fn create_enr_builder_from_config( builder.udp6(udp6_port); } - if enable_tcp { + if enable_libp2p { + // Add QUIC fields to the ENR. + // Since QUIC is used as an alternative transport for the libp2p protocols, + // the related fields should only be added when both QUIC and libp2p are enabled + if !config.disable_quic_support { + // If we are listening on ipv4, add the quic ipv4 port. + if let Some(quic4_port) = config + .enr_quic4_port + .or_else(|| config.listen_addrs().v4().map(|v4_addr| v4_addr.quic_port)) + { + builder.add_value(QUIC_ENR_KEY, &quic4_port); + } + + // If we are listening on ipv6, add the quic ipv6 port. + if let Some(quic6_port) = config + .enr_quic6_port + .or_else(|| config.listen_addrs().v6().map(|v6_addr| v6_addr.quic_port)) + { + builder.add_value(QUIC6_ENR_KEY, &quic6_port); + } + } + // If the ENR port is not set, and we are listening over that ip version, use the listening port instead. let tcp4_port = config .enr_tcp4_port @@ -218,6 +241,9 @@ fn compare_enr(local_enr: &Enr, disk_enr: &Enr) -> bool { // tcp ports must match && local_enr.tcp4() == disk_enr.tcp4() && local_enr.tcp6() == disk_enr.tcp6() + // quic ports must match + && local_enr.quic4() == disk_enr.quic4() + && local_enr.quic6() == disk_enr.quic6() // must match on the same fork && local_enr.get(ETH2_ENR_KEY) == disk_enr.get(ETH2_ENR_KEY) // take preference over disk udp port if one is not specified diff --git a/beacon_node/lighthouse_network/src/discovery/enr_ext.rs b/beacon_node/lighthouse_network/src/discovery/enr_ext.rs index 753da6292ca..2efaa76ac31 100644 --- a/beacon_node/lighthouse_network/src/discovery/enr_ext.rs +++ b/beacon_node/lighthouse_network/src/discovery/enr_ext.rs @@ -6,12 +6,15 @@ use libp2p::core::multiaddr::Protocol; use libp2p::identity::{ed25519, secp256k1, KeyType, Keypair, PublicKey}; use tiny_keccak::{Hasher, Keccak}; +pub const QUIC_ENR_KEY: &str = "quic"; +pub const QUIC6_ENR_KEY: &str = "quic6"; + /// Extend ENR for libp2p types. pub trait EnrExt { /// The libp2p `PeerId` for the record. fn peer_id(&self) -> PeerId; - /// Returns a list of multiaddrs if the ENR has an `ip` and either a `tcp` or `udp` key **or** an `ip6` and either a `tcp6` or `udp6`. + /// Returns a list of multiaddrs if the ENR has an `ip` and one of [`tcp`,`udp`,`quic`] key **or** an `ip6` and one of [`tcp6`,`udp6`,`quic6`]. /// The vector remains empty if these fields are not defined. fn multiaddr(&self) -> Vec; @@ -26,6 +29,15 @@ pub trait EnrExt { /// Returns any multiaddrs that contain the TCP protocol. fn multiaddr_tcp(&self) -> Vec; + + /// Returns any QUIC multiaddrs that are registered in this ENR. + fn multiaddr_quic(&self) -> Vec; + + /// Returns the quic port if one is set. + fn quic4(&self) -> Option; + + /// Returns the quic6 port if one is set. + fn quic6(&self) -> Option; } /// Extend ENR CombinedPublicKey for libp2p types. @@ -49,7 +61,17 @@ impl EnrExt for Enr { self.public_key().as_peer_id() } - /// Returns a list of multiaddrs if the ENR has an `ip` and either a `tcp` or `udp` key **or** an `ip6` and either a `tcp6` or `udp6`. + /// Returns the quic port if one is set. + fn quic4(&self) -> Option { + self.get_decodable(QUIC_ENR_KEY).and_then(Result::ok) + } + + /// Returns the quic6 port if one is set. + fn quic6(&self) -> Option { + self.get_decodable(QUIC6_ENR_KEY).and_then(Result::ok) + } + + /// Returns a list of multiaddrs if the ENR has an `ip` and either a `tcp`, `quic` or `udp` key **or** an `ip6` and either a `tcp6` `quic6` or `udp6`. /// The vector remains empty if these fields are not defined. fn multiaddr(&self) -> Vec { let mut multiaddrs: Vec = Vec::new(); @@ -59,6 +81,12 @@ impl EnrExt for Enr { multiaddr.push(Protocol::Udp(udp)); multiaddrs.push(multiaddr); } + if let Some(quic) = self.quic4() { + let mut multiaddr: Multiaddr = ip.into(); + multiaddr.push(Protocol::Udp(quic)); + multiaddr.push(Protocol::QuicV1); + multiaddrs.push(multiaddr); + } if let Some(tcp) = self.tcp4() { let mut multiaddr: Multiaddr = ip.into(); @@ -73,6 +101,13 @@ impl EnrExt for Enr { multiaddrs.push(multiaddr); } + if let Some(quic6) = self.quic6() { + let mut multiaddr: Multiaddr = ip6.into(); + multiaddr.push(Protocol::Udp(quic6)); + multiaddr.push(Protocol::QuicV1); + multiaddrs.push(multiaddr); + } + if let Some(tcp6) = self.tcp6() { let mut multiaddr: Multiaddr = ip6.into(); multiaddr.push(Protocol::Tcp(tcp6)); @@ -174,8 +209,30 @@ impl EnrExt for Enr { multiaddrs } + /// Returns a list of multiaddrs if the ENR has an `ip` and a `quic` key **or** an `ip6` and a `quic6`. + fn multiaddr_quic(&self) -> Vec { + let mut multiaddrs: Vec = Vec::new(); + if let Some(quic_port) = self.quic4() { + if let Some(ip) = self.ip4() { + let mut multiaddr: Multiaddr = ip.into(); + multiaddr.push(Protocol::Udp(quic_port)); + multiaddr.push(Protocol::QuicV1); + multiaddrs.push(multiaddr); + } + } + + if let Some(quic6_port) = self.quic6() { + if let Some(ip6) = self.ip6() { + let mut multiaddr: Multiaddr = ip6.into(); + multiaddr.push(Protocol::Udp(quic6_port)); + multiaddr.push(Protocol::QuicV1); + multiaddrs.push(multiaddr); + } + } + multiaddrs + } + /// Returns a list of multiaddrs if the ENR has an `ip` and either a `tcp` or `udp` key **or** an `ip6` and either a `tcp6` or `udp6`. - /// The vector remains empty if these fields are not defined. fn multiaddr_tcp(&self) -> Vec { let mut multiaddrs: Vec = Vec::new(); if let Some(ip) = self.ip4() { diff --git a/beacon_node/lighthouse_network/src/discovery/mod.rs b/beacon_node/lighthouse_network/src/discovery/mod.rs index c3819f1eb94..4d8807336bf 100644 --- a/beacon_node/lighthouse_network/src/discovery/mod.rs +++ b/beacon_node/lighthouse_network/src/discovery/mod.rs @@ -21,7 +21,6 @@ pub use libp2p::identity::{Keypair, PublicKey}; use enr::{ATTESTATION_BITFIELD_ENR_KEY, ETH2_ENR_KEY, SYNC_COMMITTEE_BITFIELD_ENR_KEY}; use futures::prelude::*; use futures::stream::FuturesUnordered; -use libp2p::multiaddr::Protocol; use libp2p::swarm::behaviour::{DialFailure, FromSwarm}; use libp2p::swarm::THandlerInEvent; pub use libp2p::{ @@ -75,7 +74,7 @@ const DURATION_DIFFERENCE: Duration = Duration::from_millis(1); /// of the peer if it is specified. #[derive(Debug)] pub struct DiscoveredPeers { - pub peers: HashMap>, + pub peers: HashMap>, } #[derive(Clone, PartialEq)] @@ -208,7 +207,8 @@ impl Discovery { let local_node_id = local_enr.node_id(); info!(log, "ENR Initialised"; "enr" => local_enr.to_base64(), "seq" => local_enr.seq(), "id"=> %local_enr.node_id(), - "ip4" => ?local_enr.ip4(), "udp4"=> ?local_enr.udp4(), "tcp4" => ?local_enr.tcp4(), "tcp6" => ?local_enr.tcp6(), "udp6" => ?local_enr.udp6() + "ip4" => ?local_enr.ip4(), "udp4"=> ?local_enr.udp4(), "tcp4" => ?local_enr.tcp4(), "tcp6" => ?local_enr.tcp6(), "udp6" => ?local_enr.udp6(), + "quic4" => ?local_enr.quic4(), "quic6" => ?local_enr.quic6() ); // convert the keypair into an ENR key @@ -230,7 +230,8 @@ impl Discovery { "peer_id" => %bootnode_enr.peer_id(), "ip" => ?bootnode_enr.ip4(), "udp" => ?bootnode_enr.udp4(), - "tcp" => ?bootnode_enr.tcp4() + "tcp" => ?bootnode_enr.tcp4(), + "quic" => ?bootnode_enr.quic4() ); let repr = bootnode_enr.to_string(); let _ = discv5.add_enr(bootnode_enr).map_err(|e| { @@ -281,7 +282,8 @@ impl Discovery { "peer_id" => %enr.peer_id(), "ip" => ?enr.ip4(), "udp" => ?enr.udp4(), - "tcp" => ?enr.tcp4() + "tcp" => ?enr.tcp4(), + "quic" => ?enr.quic4() ); let _ = discv5.add_enr(enr).map_err(|e| { error!( @@ -383,20 +385,6 @@ impl Discovery { self.discv5.table_entries_enr() } - /// Returns the ENR of a known peer if it exists. - pub fn enr_of_peer(&mut self, peer_id: &PeerId) -> Option { - // first search the local cache - if let Some(enr) = self.cached_enrs.get(peer_id) { - return Some(enr.clone()); - } - // not in the local cache, look in the routing table - if let Ok(node_id) = enr_ext::peer_id_to_node_id(peer_id) { - self.discv5.find_enr(&node_id) - } else { - None - } - } - /// Updates the local ENR TCP port. /// There currently isn't a case to update the address here. We opt for discovery to /// automatically update the external address. @@ -414,6 +402,23 @@ impl Discovery { Ok(()) } + // TODO: Group these functions here once the ENR is shared across discv5 and lighthouse and + // Lighthouse can modify the ENR directly. + // This currently doesn't support ipv6. All of these functions should be removed and + // addressed properly in the following issue. + // https://github.com/sigp/lighthouse/issues/4706 + pub fn update_enr_quic_port(&mut self, port: u16) -> Result<(), String> { + self.discv5 + .enr_insert("quic", &port) + .map_err(|e| format!("{:?}", e))?; + + // replace the global version + *self.network_globals.local_enr.write() = self.discv5.local_enr(); + // persist modified enr to disk + enr::save_enr_to_disk(Path::new(&self.enr_dir), &self.local_enr(), &self.log); + Ok(()) + } + /// Updates the local ENR UDP socket. /// /// This is with caution. Discovery should automatically maintain this. This should only be @@ -733,23 +738,6 @@ impl Discovery { target_peers: usize, additional_predicate: impl Fn(&Enr) -> bool + Send + 'static, ) { - // Make sure there are subnet queries included - let contains_queries = match &query { - QueryType::Subnet(queries) => !queries.is_empty(), - QueryType::FindPeers => true, - }; - - if !contains_queries { - debug!( - self.log, - "No subnets included in this request. Skipping discovery request." - ); - return; - } - - // Generate a random target node id. - let random_node = NodeId::random(); - let enr_fork_id = match self.local_enr().eth2() { Ok(v) => v, Err(e) => { @@ -773,7 +761,8 @@ impl Discovery { // Build the future let query_future = self .discv5 - .find_node_predicate(random_node, predicate, target_peers) + // Generate a random target node id. + .find_node_predicate(NodeId::random(), predicate, target_peers) .map(|v| QueryResult { query_type: query, result: v, @@ -787,7 +776,7 @@ impl Discovery { fn process_completed_queries( &mut self, query: QueryResult, - ) -> Option>> { + ) -> Option>> { match query.query_type { QueryType::FindPeers => { self.find_peer_active = false; @@ -797,12 +786,14 @@ impl Discovery { } Ok(r) => { debug!(self.log, "Discovery query completed"; "peers_found" => r.len()); - let mut results: HashMap<_, Option> = HashMap::new(); - r.iter().for_each(|enr| { - // cache the found ENR's - self.cached_enrs.put(enr.peer_id(), enr.clone()); - results.insert(enr.peer_id(), None); - }); + let results = r + .into_iter() + .map(|enr| { + // cache the found ENR's + self.cached_enrs.put(enr.peer_id(), enr.clone()); + (enr, None) + }) + .collect(); return Some(results); } Err(e) => { @@ -850,17 +841,17 @@ impl Discovery { let subnet_predicate = subnet_predicate::(vec![query.subnet], &self.log); - r.iter() + r.clone() + .into_iter() .filter(|enr| subnet_predicate(enr)) - .map(|enr| enr.peer_id()) - .for_each(|peer_id| { + .for_each(|enr| { if let Some(v) = metrics::get_int_counter( &metrics::SUBNET_PEERS_FOUND, &[query_str], ) { v.inc(); } - let other_min_ttl = mapped_results.get_mut(&peer_id); + let other_min_ttl = mapped_results.get_mut(&enr); // map peer IDs to the min_ttl furthest in the future match (query.min_ttl, other_min_ttl) { @@ -878,15 +869,11 @@ impl Discovery { } // update the mapping if we have a specified min_ttl (Some(min_ttl), Some(None)) => { - mapped_results.insert(peer_id, Some(min_ttl)); - } - // first seen min_ttl for this enr - (Some(min_ttl), None) => { - mapped_results.insert(peer_id, Some(min_ttl)); + mapped_results.insert(enr, Some(min_ttl)); } // first seen min_ttl for this enr - (None, None) => { - mapped_results.insert(peer_id, None); + (min_ttl, None) => { + mapped_results.insert(enr, min_ttl); } (None, Some(Some(_))) => {} // Don't replace the existing specific min_ttl (None, Some(None)) => {} // No-op because this is a duplicate @@ -910,7 +897,7 @@ impl Discovery { } /// Drives the queries returning any results from completed queries. - fn poll_queries(&mut self, cx: &mut Context) -> Option>> { + fn poll_queries(&mut self, cx: &mut Context) -> Option>> { while let Poll::Ready(Some(query_result)) = self.active_queries.poll_next_unpin(cx) { let result = self.process_completed_queries(query_result); if result.is_some() { @@ -957,23 +944,6 @@ impl NetworkBehaviour for Discovery { ) { } - fn handle_pending_outbound_connection( - &mut self, - _connection_id: ConnectionId, - maybe_peer: Option, - _addresses: &[Multiaddr], - _effective_role: libp2p::core::Endpoint, - ) -> Result, libp2p::swarm::ConnectionDenied> { - if let Some(enr) = maybe_peer.and_then(|peer_id| self.enr_of_peer(&peer_id)) { - // ENR's may have multiple Multiaddrs. The multi-addr associated with the UDP - // port is removed, which is assumed to be associated with the discv5 protocol (and - // therefore irrelevant for other libp2p components). - Ok(enr.multiaddr_tcp()) - } else { - Ok(vec![]) - } - } - // Main execution loop to drive the behaviour fn poll( &mut self, @@ -1047,25 +1017,8 @@ impl NetworkBehaviour for Discovery { // update network globals *self.network_globals.local_enr.write() = enr; // A new UDP socket has been detected. - // Build a multiaddr to report to libp2p - let addr = match socket_addr.ip() { - IpAddr::V4(v4_addr) => { - self.network_globals.listen_port_tcp4().map(|tcp4_port| { - Multiaddr::from(v4_addr).with(Protocol::Tcp(tcp4_port)) - }) - } - IpAddr::V6(v6_addr) => { - self.network_globals.listen_port_tcp6().map(|tcp6_port| { - Multiaddr::from(v6_addr).with(Protocol::Tcp(tcp6_port)) - }) - } - }; - - if let Some(address) = addr { - // NOTE: This doesn't actually track the external TCP port. More sophisticated NAT handling - // should handle this. - return Poll::Ready(ToSwarm::NewExternalAddrCandidate(address)); - } + // NOTE: We assume libp2p itself can keep track of IP changes and we do + // not inform it about IP changes found via discovery. } Discv5Event::EnrAdded { .. } | Discv5Event::TalkRequest(_) @@ -1152,8 +1105,6 @@ mod tests { let log = build_log(slog::Level::Debug, false); let globals = NetworkGlobals::new( enr, - Some(9000), - None, MetaData::V2(MetaDataV2 { seq_number: 0, attnets: Default::default(), @@ -1261,6 +1212,6 @@ mod tests { assert_eq!(results.len(), 2); // when a peer belongs to multiple subnet ids, we use the highest ttl. - assert_eq!(results.get(&enr1.peer_id()).unwrap(), &instant1); + assert_eq!(results.get(&enr1).unwrap(), &instant1); } } diff --git a/beacon_node/lighthouse_network/src/listen_addr.rs b/beacon_node/lighthouse_network/src/listen_addr.rs index 20d87d403cd..53f7d9dacae 100644 --- a/beacon_node/lighthouse_network/src/listen_addr.rs +++ b/beacon_node/lighthouse_network/src/listen_addr.rs @@ -6,14 +6,23 @@ use serde::{Deserialize, Serialize}; /// A listening address composed by an Ip, an UDP port and a TCP port. #[derive(Clone, Debug, Serialize, Deserialize)] pub struct ListenAddr { + /// The IP address we will listen on. pub addr: Ip, - pub udp_port: u16, + /// The UDP port that discovery will listen on. + pub disc_port: u16, + /// The UDP port that QUIC will listen on. + pub quic_port: u16, + /// The TCP port that libp2p will listen on. pub tcp_port: u16, } impl + Clone> ListenAddr { - pub fn udp_socket_addr(&self) -> SocketAddr { - (self.addr.clone().into(), self.udp_port).into() + pub fn discovery_socket_addr(&self) -> SocketAddr { + (self.addr.clone().into(), self.disc_port).into() + } + + pub fn quic_socket_addr(&self) -> SocketAddr { + (self.addr.clone().into(), self.quic_port).into() } pub fn tcp_socket_addr(&self) -> SocketAddr { @@ -46,22 +55,41 @@ impl ListenAddress { } } - /// Returns the TCP addresses. - pub fn tcp_addresses(&self) -> impl Iterator + '_ { - let v4_multiaddr = self + /// Returns the addresses the Swarm will listen on, given the setup. + pub fn libp2p_addresses(&self) -> impl Iterator { + let v4_tcp_multiaddr = self .v4() .map(|v4_addr| Multiaddr::from(v4_addr.addr).with(Protocol::Tcp(v4_addr.tcp_port))); - let v6_multiaddr = self + + let v4_quic_multiaddr = self.v4().map(|v4_addr| { + Multiaddr::from(v4_addr.addr) + .with(Protocol::Udp(v4_addr.quic_port)) + .with(Protocol::QuicV1) + }); + + let v6_quic_multiaddr = self.v6().map(|v6_addr| { + Multiaddr::from(v6_addr.addr) + .with(Protocol::Udp(v6_addr.quic_port)) + .with(Protocol::QuicV1) + }); + + let v6_tcp_multiaddr = self .v6() .map(|v6_addr| Multiaddr::from(v6_addr.addr).with(Protocol::Tcp(v6_addr.tcp_port))); - v4_multiaddr.into_iter().chain(v6_multiaddr) + + v4_tcp_multiaddr + .into_iter() + .chain(v4_quic_multiaddr) + .chain(v6_quic_multiaddr) + .chain(v6_tcp_multiaddr) } #[cfg(test)] pub fn unused_v4_ports() -> Self { ListenAddress::V4(ListenAddr { addr: Ipv4Addr::UNSPECIFIED, - udp_port: unused_port::unused_udp4_port().unwrap(), + disc_port: unused_port::unused_udp4_port().unwrap(), + quic_port: unused_port::unused_udp4_port().unwrap(), tcp_port: unused_port::unused_tcp4_port().unwrap(), }) } @@ -70,7 +98,8 @@ impl ListenAddress { pub fn unused_v6_ports() -> Self { ListenAddress::V6(ListenAddr { addr: Ipv6Addr::UNSPECIFIED, - udp_port: unused_port::unused_udp6_port().unwrap(), + disc_port: unused_port::unused_udp6_port().unwrap(), + quic_port: unused_port::unused_udp6_port().unwrap(), tcp_port: unused_port::unused_tcp6_port().unwrap(), }) } @@ -84,12 +113,14 @@ impl slog::KV for ListenAddress { ) -> slog::Result { if let Some(v4_addr) = self.v4() { serializer.emit_arguments("ip4_address", &format_args!("{}", v4_addr.addr))?; - serializer.emit_u16("udp4_port", v4_addr.udp_port)?; + serializer.emit_u16("disc4_port", v4_addr.disc_port)?; + serializer.emit_u16("quic4_port", v4_addr.quic_port)?; serializer.emit_u16("tcp4_port", v4_addr.tcp_port)?; } if let Some(v6_addr) = self.v6() { serializer.emit_arguments("ip6_address", &format_args!("{}", v6_addr.addr))?; - serializer.emit_u16("udp6_port", v6_addr.udp_port)?; + serializer.emit_u16("disc6_port", v6_addr.disc_port)?; + serializer.emit_u16("quic6_port", v6_addr.quic_port)?; serializer.emit_u16("tcp6_port", v6_addr.tcp_port)?; } slog::Result::Ok(()) diff --git a/beacon_node/lighthouse_network/src/metrics.rs b/beacon_node/lighthouse_network/src/metrics.rs index 58cc9920126..ae02b689d81 100644 --- a/beacon_node/lighthouse_network/src/metrics.rs +++ b/beacon_node/lighthouse_network/src/metrics.rs @@ -14,6 +14,16 @@ lazy_static! { "Count of libp2p peers currently connected" ); + pub static ref TCP_PEERS_CONNECTED: Result = try_create_int_gauge( + "libp2p_tcp_peers", + "Count of libp2p peers currently connected via TCP" + ); + + pub static ref QUIC_PEERS_CONNECTED: Result = try_create_int_gauge( + "libp2p_quic_peers", + "Count of libp2p peers currently connected via QUIC" + ); + pub static ref PEER_CONNECT_EVENT_COUNT: Result = try_create_int_counter( "libp2p_peer_connect_event_total", "Count of libp2p peer connect events (not the current number of connected peers)" diff --git a/beacon_node/lighthouse_network/src/peer_manager/mod.rs b/beacon_node/lighthouse_network/src/peer_manager/mod.rs index 578fb50a32c..7a975e5e8e5 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/mod.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/mod.rs @@ -1,5 +1,6 @@ //! Implementation of Lighthouse's peer management system. +use crate::discovery::enr_ext::EnrExt; use crate::rpc::{GoodbyeReason, MetaData, Protocol, RPCError, RPCResponseErrorCode}; use crate::service::TARGET_SUBNET_PEERS; use crate::{error, metrics, Gossipsub}; @@ -13,7 +14,6 @@ use peerdb::{client::ClientKind, BanOperation, BanResult, ScoreUpdateResult}; use rand::seq::SliceRandom; use slog::{debug, error, trace, warn}; use smallvec::SmallVec; -use std::collections::BTreeMap; use std::{ sync::Arc, time::{Duration, Instant}, @@ -78,7 +78,7 @@ pub struct PeerManager { /// The target number of peers we would like to connect to. target_peers: usize, /// Peers queued to be dialed. - peers_to_dial: BTreeMap>, + peers_to_dial: Vec, /// The number of temporarily banned peers. This is used to prevent instantaneous /// reconnection. // NOTE: This just prevents re-connections. The state of the peer is otherwise unaffected. A @@ -303,16 +303,12 @@ impl PeerManager { /// Peers that have been returned by discovery requests that are suitable for dialing are /// returned here. /// - /// NOTE: By dialing `PeerId`s and not multiaddrs, libp2p requests the multiaddr associated - /// with a new `PeerId` which involves a discovery routing table lookup. We could dial the - /// multiaddr here, however this could relate to duplicate PeerId's etc. If the lookup - /// proves resource constraining, we should switch to multiaddr dialling here. + /// This function decides whether or not to dial these peers. #[allow(clippy::mutable_key_type)] - pub fn peers_discovered(&mut self, results: HashMap>) -> Vec { - let mut to_dial_peers = Vec::with_capacity(4); - + pub fn peers_discovered(&mut self, results: HashMap>) { + let mut to_dial_peers = 0; let connected_or_dialing = self.network_globals.connected_or_dialing_peers(); - for (peer_id, min_ttl) in results { + for (enr, min_ttl) in results { // There are two conditions in deciding whether to dial this peer. // 1. If we are less than our max connections. Discovery queries are executed to reach // our target peers, so its fine to dial up to our max peers (which will get pruned @@ -321,10 +317,8 @@ impl PeerManager { // considered a priority. We have pre-allocated some extra priority slots for these // peers as specified by PRIORITY_PEER_EXCESS. Therefore we dial these peers, even // if we are already at our max_peer limit. - if (min_ttl.is_some() - && connected_or_dialing + to_dial_peers.len() < self.max_priority_peers() - || connected_or_dialing + to_dial_peers.len() < self.max_peers()) - && self.network_globals.peers.read().should_dial(&peer_id) + if min_ttl.is_some() && connected_or_dialing + to_dial_peers < self.max_priority_peers() + || connected_or_dialing + to_dial_peers < self.max_peers() { // This should be updated with the peer dialing. In fact created once the peer is // dialed @@ -332,16 +326,16 @@ impl PeerManager { self.network_globals .peers .write() - .update_min_ttl(&peer_id, min_ttl); + .update_min_ttl(&enr.peer_id(), min_ttl); } - to_dial_peers.push(peer_id); + debug!(self.log, "Dialing discovered peer"; "peer_id" => %enr.peer_id()); + self.dial_peer(enr); + to_dial_peers += 1; } } // Queue another discovery if we need to - self.maintain_peer_count(to_dial_peers.len()); - - to_dial_peers + self.maintain_peer_count(to_dial_peers); } /// A STATUS message has been received from a peer. This resets the status timer. @@ -397,9 +391,16 @@ impl PeerManager { /* Notifications from the Swarm */ - // A peer is being dialed. - pub fn dial_peer(&mut self, peer_id: &PeerId, enr: Option) { - self.peers_to_dial.insert(*peer_id, enr); + /// A peer is being dialed. + pub fn dial_peer(&mut self, peer: Enr) { + if self + .network_globals + .peers + .read() + .should_dial(&peer.peer_id()) + { + self.peers_to_dial.push(peer); + } } /// Reports if a peer is banned or not. diff --git a/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs b/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs index 70f421681a3..fedb876bb23 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs @@ -3,7 +3,7 @@ use std::task::{Context, Poll}; use futures::StreamExt; -use libp2p::core::ConnectedPoint; +use libp2p::core::{multiaddr, ConnectedPoint}; use libp2p::identity::PeerId; use libp2p::swarm::behaviour::{ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm}; use libp2p::swarm::dial_opts::{DialOpts, PeerCondition}; @@ -12,6 +12,7 @@ use libp2p::swarm::{ConnectionId, NetworkBehaviour, PollParameters, ToSwarm}; use slog::{debug, error}; use types::EthSpec; +use crate::discovery::enr_ext::EnrExt; use crate::rpc::GoodbyeReason; use crate::types::SyncState; use crate::{metrics, ClearDialError}; @@ -95,11 +96,23 @@ impl NetworkBehaviour for PeerManager { self.events.shrink_to_fit(); } - if let Some((peer_id, maybe_enr)) = self.peers_to_dial.pop_first() { - self.inject_peer_connection(&peer_id, ConnectingType::Dialing, maybe_enr); + if let Some(enr) = self.peers_to_dial.pop() { + let peer_id = enr.peer_id(); + self.inject_peer_connection(&peer_id, ConnectingType::Dialing, Some(enr.clone())); + let quic_multiaddrs = enr.multiaddr_quic(); + if !quic_multiaddrs.is_empty() { + debug!(self.log, "Dialing QUIC supported peer"; "peer_id"=> %peer_id, "quic_multiaddrs" => ?quic_multiaddrs); + } + + // Prioritize Quic connections over Tcp ones. + let multiaddrs = quic_multiaddrs + .into_iter() + .chain(enr.multiaddr_tcp()) + .collect(); return Poll::Ready(ToSwarm::Dial { opts: DialOpts::peer_id(peer_id) .condition(PeerCondition::Disconnected) + .addresses(multiaddrs) .build(), }); } @@ -124,9 +137,11 @@ impl NetworkBehaviour for PeerManager { } FromSwarm::ConnectionClosed(ConnectionClosed { peer_id, + endpoint, + remaining_established, .. - }) => self.on_connection_closed(peer_id, remaining_established), + }) => self.on_connection_closed(peer_id, endpoint, remaining_established), FromSwarm::DialFailure(DialFailure { peer_id, error, @@ -184,7 +199,11 @@ impl PeerManager { endpoint: &ConnectedPoint, other_established: usize, ) { - debug!(self.log, "Connection established"; "peer_id" => %peer_id, "connection" => ?endpoint.to_endpoint()); + debug!(self.log, "Connection established"; "peer_id" => %peer_id, + "multiaddr" => %endpoint.get_remote_address(), + "connection" => ?endpoint.to_endpoint() + ); + if other_established == 0 { self.events.push(PeerManagerEvent::MetaData(peer_id)); } @@ -194,6 +213,34 @@ impl PeerManager { metrics::check_nat(); } + // increment prometheus metrics + if self.metrics_enabled { + let remote_addr = match endpoint { + ConnectedPoint::Dialer { address, .. } => address, + ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr, + }; + match remote_addr.iter().find(|proto| { + matches!( + proto, + multiaddr::Protocol::QuicV1 | multiaddr::Protocol::Tcp(_) + ) + }) { + Some(multiaddr::Protocol::QuicV1) => { + metrics::inc_gauge(&metrics::QUIC_PEERS_CONNECTED); + } + Some(multiaddr::Protocol::Tcp(_)) => { + metrics::inc_gauge(&metrics::TCP_PEERS_CONNECTED); + } + Some(_) => unreachable!(), + None => { + error!(self.log, "Connection established via unknown transport"; "addr" => %remote_addr) + } + }; + + self.update_connected_peer_metrics(); + metrics::inc_counter(&metrics::PEER_CONNECT_EVENT_COUNT); + } + // Check to make sure the peer is not supposed to be banned match self.ban_status(&peer_id) { // TODO: directly emit the ban event? @@ -245,14 +292,15 @@ impl PeerManager { self.events .push(PeerManagerEvent::PeerConnectedOutgoing(peer_id)); } - } - - // increment prometheus metrics - self.update_connected_peer_metrics(); - metrics::inc_counter(&metrics::PEER_CONNECT_EVENT_COUNT); + }; } - fn on_connection_closed(&mut self, peer_id: PeerId, remaining_established: usize) { + fn on_connection_closed( + &mut self, + peer_id: PeerId, + endpoint: &ConnectedPoint, + remaining_established: usize, + ) { if remaining_established > 0 { return; } @@ -278,9 +326,31 @@ impl PeerManager { // reference so that peer manager can track this peer. self.inject_disconnect(&peer_id); + let remote_addr = match endpoint { + ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr, + ConnectedPoint::Dialer { address, .. } => address, + }; + // Update the prometheus metrics - self.update_connected_peer_metrics(); - metrics::inc_counter(&metrics::PEER_DISCONNECT_EVENT_COUNT); + if self.metrics_enabled { + match remote_addr.iter().find(|proto| { + matches!( + proto, + multiaddr::Protocol::QuicV1 | multiaddr::Protocol::Tcp(_) + ) + }) { + Some(multiaddr::Protocol::QuicV1) => { + metrics::dec_gauge(&metrics::QUIC_PEERS_CONNECTED); + } + Some(multiaddr::Protocol::Tcp(_)) => { + metrics::dec_gauge(&metrics::TCP_PEERS_CONNECTED); + } + // If it's an unknown protocol we already logged when connection was established. + _ => {} + }; + self.update_connected_peer_metrics(); + metrics::inc_counter(&metrics::PEER_DISCONNECT_EVENT_COUNT); + } } /// A dial attempt has failed. diff --git a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs index 52f0bbd9dfc..4a1efe8f2e9 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs @@ -1,16 +1,11 @@ -use crate::{ - metrics, - multiaddr::{Multiaddr, Protocol}, - types::Subnet, - Enr, Gossipsub, PeerId, -}; +use crate::{metrics, multiaddr::Multiaddr, types::Subnet, Enr, Gossipsub, PeerId}; use peer_info::{ConnectionDirection, PeerConnectionStatus, PeerInfo}; use rand::seq::SliceRandom; use score::{PeerAction, ReportSource, Score, ScoreState}; use slog::{crit, debug, error, trace, warn}; use std::cmp::Ordering; use std::collections::{HashMap, HashSet}; -use std::net::{IpAddr, SocketAddr}; +use std::net::IpAddr; use std::time::Instant; use sync_status::SyncStatus; use types::EthSpec; @@ -764,28 +759,10 @@ impl PeerDB { | PeerConnectionStatus::Dialing { .. } => {} } - // Add the seen ip address and port to the peer's info - let socket_addr = match seen_address.iter().fold( - (None, None), - |(found_ip, found_port), protocol| match protocol { - Protocol::Ip4(ip) => (Some(ip.into()), found_port), - Protocol::Ip6(ip) => (Some(ip.into()), found_port), - Protocol::Tcp(port) => (found_ip, Some(port)), - _ => (found_ip, found_port), - }, - ) { - (Some(ip), Some(port)) => Some(SocketAddr::new(ip, port)), - (Some(_ip), None) => { - crit!(self.log, "Connected peer has an IP but no TCP port"; "peer_id" => %peer_id); - None - } - _ => None, - }; - // Update the connection state match direction { - ConnectionDirection::Incoming => info.connect_ingoing(socket_addr), - ConnectionDirection::Outgoing => info.connect_outgoing(socket_addr), + ConnectionDirection::Incoming => info.connect_ingoing(Some(seen_address)), + ConnectionDirection::Outgoing => info.connect_outgoing(Some(seen_address)), } } @@ -1274,6 +1251,7 @@ impl BannedPeersCount { #[cfg(test)] mod tests { use super::*; + use libp2p::core::multiaddr::Protocol; use libp2p::core::Multiaddr; use slog::{o, Drain}; use std::net::{Ipv4Addr, Ipv6Addr}; diff --git a/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs b/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs index 555266d0e2e..44c54511ddc 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs @@ -2,15 +2,15 @@ use super::client::Client; use super::score::{PeerAction, Score, ScoreState}; use super::sync_status::SyncStatus; use crate::discovery::Eth2Enr; -use crate::Multiaddr; use crate::{rpc::MetaData, types::Subnet}; use discv5::Enr; +use libp2p::core::multiaddr::{Multiaddr, Protocol}; use serde::{ ser::{SerializeStruct, Serializer}, Serialize, }; use std::collections::HashSet; -use std::net::{IpAddr, SocketAddr}; +use std::net::IpAddr; use std::time::Instant; use strum::AsRefStr; use types::EthSpec; @@ -29,9 +29,9 @@ pub struct PeerInfo { /// The known listening addresses of this peer. This is given by identify and can be arbitrary /// (including local IPs). listening_addresses: Vec, - /// This is addresses we have physically seen and this is what we use for banning/un-banning + /// These are the multiaddrs we have physically seen and is what we use for banning/un-banning /// peers. - seen_addresses: HashSet, + seen_multiaddrs: HashSet, /// The current syncing state of the peer. The state may be determined after it's initial /// connection. sync_status: SyncStatus, @@ -60,7 +60,7 @@ impl Default for PeerInfo { client: Client::default(), connection_status: Default::default(), listening_addresses: Vec::new(), - seen_addresses: HashSet::new(), + seen_multiaddrs: HashSet::new(), subnets: HashSet::new(), sync_status: SyncStatus::Unknown, meta_data: None, @@ -227,15 +227,21 @@ impl PeerInfo { } /// Returns the seen addresses of the peer. - pub fn seen_addresses(&self) -> impl Iterator + '_ { - self.seen_addresses.iter() + pub fn seen_multiaddrs(&self) -> impl Iterator + '_ { + self.seen_multiaddrs.iter() } /// Returns a list of seen IP addresses for the peer. pub fn seen_ip_addresses(&self) -> impl Iterator + '_ { - self.seen_addresses - .iter() - .map(|socket_addr| socket_addr.ip()) + self.seen_multiaddrs.iter().filter_map(|multiaddr| { + multiaddr.iter().find_map(|protocol| { + match protocol { + Protocol::Ip4(ip) => Some(ip.into()), + Protocol::Ip6(ip) => Some(ip.into()), + _ => None, // Only care for IP addresses + } + }) + }) } /// Returns the connection status of the peer. @@ -415,7 +421,7 @@ impl PeerInfo { /// Modifies the status to Connected and increases the number of ingoing /// connections by one - pub(super) fn connect_ingoing(&mut self, seen_address: Option) { + pub(super) fn connect_ingoing(&mut self, seen_multiaddr: Option) { match &mut self.connection_status { Connected { n_in, .. } => *n_in += 1, Disconnected { .. } @@ -428,14 +434,14 @@ impl PeerInfo { } } - if let Some(socket_addr) = seen_address { - self.seen_addresses.insert(socket_addr); + if let Some(multiaddr) = seen_multiaddr { + self.seen_multiaddrs.insert(multiaddr); } } /// Modifies the status to Connected and increases the number of outgoing /// connections by one - pub(super) fn connect_outgoing(&mut self, seen_address: Option) { + pub(super) fn connect_outgoing(&mut self, seen_multiaddr: Option) { match &mut self.connection_status { Connected { n_out, .. } => *n_out += 1, Disconnected { .. } @@ -447,8 +453,8 @@ impl PeerInfo { self.connection_direction = Some(ConnectionDirection::Outgoing); } } - if let Some(ip_addr) = seen_address { - self.seen_addresses.insert(ip_addr); + if let Some(multiaddr) = seen_multiaddr { + self.seen_multiaddrs.insert(multiaddr); } } diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index 133aa646db6..7e4bdff513b 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -160,8 +160,6 @@ impl Network { let meta_data = utils::load_or_build_metadata(&config.network_dir, &log); let globals = NetworkGlobals::new( enr, - config.listen_addrs().v4().map(|v4_addr| v4_addr.tcp_port), - config.listen_addrs().v6().map(|v6_addr| v6_addr.tcp_port), meta_data, config .trusted_peers @@ -345,8 +343,9 @@ impl Network { let (swarm, bandwidth) = { // Set up the transport - tcp/ws with noise and mplex - let (transport, bandwidth) = build_transport(local_keypair.clone()) - .map_err(|e| format!("Failed to build transport: {:?}", e))?; + let (transport, bandwidth) = + build_transport(local_keypair.clone(), !config.disable_quic_support) + .map_err(|e| format!("Failed to build transport: {:?}", e))?; // use the executor for libp2p struct Executor(task_executor::TaskExecutor); @@ -401,9 +400,16 @@ impl Network { async fn start(&mut self, config: &crate::NetworkConfig) -> error::Result<()> { let enr = self.network_globals.local_enr(); info!(self.log, "Libp2p Starting"; "peer_id" => %enr.peer_id(), "bandwidth_config" => format!("{}-{}", config.network_load, NetworkLoad::from(config.network_load).name)); - debug!(self.log, "Attempting to open listening ports"; config.listen_addrs(), "discovery_enabled" => !config.disable_discovery); + debug!(self.log, "Attempting to open listening ports"; config.listen_addrs(), "discovery_enabled" => !config.disable_discovery, "quic_enabled" => !config.disable_quic_support); + + for listen_multiaddr in config.listen_addrs().libp2p_addresses() { + // If QUIC is disabled, ignore listening on QUIC ports + if config.disable_quic_support + && listen_multiaddr.iter().any(|v| v == MProtocol::QuicV1) + { + continue; + } - for listen_multiaddr in config.listen_addrs().tcp_addresses() { match self.swarm.listen_on(listen_multiaddr.clone()) { Ok(_) => { let mut log_address = listen_multiaddr; @@ -444,6 +450,20 @@ impl Network { boot_nodes.dedup(); for bootnode_enr in boot_nodes { + // If QUIC is enabled, attempt QUIC connections first + if !config.disable_quic_support { + for quic_multiaddr in &bootnode_enr.multiaddr_quic() { + if !self + .network_globals + .peers + .read() + .is_connected_or_dialing(&bootnode_enr.peer_id()) + { + dial(quic_multiaddr.clone()); + } + } + } + for multiaddr in &bootnode_enr.multiaddr() { // ignore udp multiaddr if it exists let components = multiaddr.iter().collect::>(); @@ -1013,30 +1033,27 @@ impl Network { } } - /// Dial cached enrs in discovery service that are in the given `subnet_id` and aren't + /// Dial cached Enrs in discovery service that are in the given `subnet_id` and aren't /// in Connected, Dialing or Banned state. fn dial_cached_enrs_in_subnet(&mut self, subnet: Subnet) { let predicate = subnet_predicate::(vec![subnet], &self.log); - let peers_to_dial: Vec = self + let peers_to_dial: Vec = self .discovery() .cached_enrs() - .filter_map(|(peer_id, enr)| { - let peers = self.network_globals.peers.read(); - if predicate(enr) && peers.should_dial(peer_id) { - Some(*peer_id) + .filter_map(|(_peer_id, enr)| { + if predicate(enr) { + Some(enr.clone()) } else { None } }) .collect(); - for peer_id in peers_to_dial { - debug!(self.log, "Dialing cached ENR peer"; "peer_id" => %peer_id); - // Remove the ENR from the cache to prevent continual re-dialing on disconnects - - self.discovery_mut().remove_cached_enr(&peer_id); - // For any dial event, inform the peer manager - let enr = self.discovery_mut().enr_of_peer(&peer_id); - self.peer_manager_mut().dial_peer(&peer_id, enr); + + // Remove the ENR from the cache to prevent continual re-dialing on disconnects + for enr in peers_to_dial { + debug!(self.log, "Dialing cached ENR peer"; "peer_id" => %enr.peer_id()); + self.discovery_mut().remove_cached_enr(&enr.peer_id()); + self.peer_manager_mut().dial_peer(enr); } } @@ -1326,22 +1343,6 @@ impl Network { } } - /// Handle a discovery event. - fn inject_discovery_event( - &mut self, - event: DiscoveredPeers, - ) -> Option> { - let DiscoveredPeers { peers } = event; - let to_dial_peers = self.peer_manager_mut().peers_discovered(peers); - for peer_id in to_dial_peers { - debug!(self.log, "Dialing discovered peer"; "peer_id" => %peer_id); - // For any dial event, inform the peer manager - let enr = self.discovery_mut().enr_of_peer(&peer_id); - self.peer_manager_mut().dial_peer(&peer_id, enr); - } - None - } - /// Handle an identify event. fn inject_identify_event( &mut self, @@ -1442,7 +1443,14 @@ impl Network { BehaviourEvent::BannedPeers(void) => void::unreachable(void), BehaviourEvent::Gossipsub(ge) => self.inject_gs_event(ge), BehaviourEvent::Eth2Rpc(re) => self.inject_rpc_event(re), - BehaviourEvent::Discovery(de) => self.inject_discovery_event(de), + // Inform the peer manager about discovered peers. + // + // The peer manager will subsequently decide which peers need to be dialed and then dial + // them. + BehaviourEvent::Discovery(DiscoveredPeers { peers }) => { + self.peer_manager_mut().peers_discovered(peers); + None + } BehaviourEvent::Identify(ie) => self.inject_identify_event(ie), BehaviourEvent::PeerManager(pe) => self.inject_pm_event(pe), BehaviourEvent::ConnectionLimits(le) => void::unreachable(le), @@ -1474,7 +1482,7 @@ impl Network { format!("Dialing local peer id {endpoint:?}") } libp2p::swarm::ListenError::Denied { cause } => { - format!("Connection was denied with cause {cause}") + format!("Connection was denied with cause: {cause:?}") } libp2p::swarm::ListenError::Transport(t) => match t { libp2p::TransportError::MultiaddrNotSupported(m) => { @@ -1524,13 +1532,7 @@ impl Network { None } } - SwarmEvent::Dialing { - peer_id, - connection_id: _, - } => { - debug!(self.log, "Swarm Dialing"; "peer_id" => ?peer_id); - None - } + SwarmEvent::Dialing { .. } => None, }; if let Some(ev) = maybe_event { diff --git a/beacon_node/lighthouse_network/src/service/utils.rs b/beacon_node/lighthouse_network/src/service/utils.rs index 65982ce941c..0ba58e68b7d 100644 --- a/beacon_node/lighthouse_network/src/service/utils.rs +++ b/beacon_node/lighthouse_network/src/service/utils.rs @@ -4,11 +4,13 @@ use crate::types::{ error, EnrAttestationBitfield, EnrSyncCommitteeBitfield, GossipEncoding, GossipKind, }; use crate::{GossipTopic, NetworkConfig}; +use futures::future::Either; use libp2p::bandwidth::BandwidthSinks; use libp2p::core::{multiaddr::Multiaddr, muxing::StreamMuxerBox, transport::Boxed}; use libp2p::gossipsub; use libp2p::identity::{secp256k1, Keypair}; use libp2p::{core, noise, yamux, PeerId, Transport, TransportExt}; +use libp2p_quic; use prometheus_client::registry::Registry; use slog::{debug, warn}; use ssz::Decode; @@ -37,19 +39,12 @@ pub struct Context<'a> { type BoxedTransport = Boxed<(PeerId, StreamMuxerBox)>; -/// The implementation supports TCP/IP, WebSockets over TCP/IP, noise as the encryption layer, and -/// mplex as the multiplexing layer. +/// The implementation supports TCP/IP, QUIC (experimental) over UDP, noise as the encryption layer, and +/// mplex/yamux as the multiplexing layer (when using TCP). pub fn build_transport( local_private_key: Keypair, + quic_support: bool, ) -> std::io::Result<(BoxedTransport, Arc)> { - let tcp = libp2p::tcp::tokio::Transport::new(libp2p::tcp::Config::default().nodelay(true)); - let transport = libp2p::dns::TokioDnsConfig::system(tcp)?; - #[cfg(feature = "libp2p-websocket")] - let transport = { - let trans_clone = transport.clone(); - transport.or_transport(libp2p::websocket::WsConfig::new(trans_clone)) - }; - // mplex config let mut mplex_config = libp2p_mplex::MplexConfig::new(); mplex_config.set_max_buffer_size(256); @@ -58,18 +53,34 @@ pub fn build_transport( // yamux config let mut yamux_config = yamux::Config::default(); yamux_config.set_window_update_mode(yamux::WindowUpdateMode::on_read()); - let (transport, bandwidth) = transport + + // Creates the TCP transport layer + let tcp = libp2p::tcp::tokio::Transport::new(libp2p::tcp::Config::default().nodelay(true)) .upgrade(core::upgrade::Version::V1) .authenticate(generate_noise_config(&local_private_key)) .multiplex(core::upgrade::SelectUpgrade::new( yamux_config, mplex_config, )) - .timeout(Duration::from_secs(10)) - .boxed() - .with_bandwidth_logging(); + .timeout(Duration::from_secs(10)); + + let (transport, bandwidth) = if quic_support { + // Enables Quic + // The default quic configuration suits us for now. + let quic_config = libp2p_quic::Config::new(&local_private_key); + tcp.or_transport(libp2p_quic::tokio::Transport::new(quic_config)) + .map(|either_output, _| match either_output { + Either::Left((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)), + Either::Right((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)), + }) + .with_bandwidth_logging() + } else { + tcp.with_bandwidth_logging() + }; + + // // Enables DNS over the transport. + let transport = libp2p::dns::TokioDnsConfig::system(transport)?.boxed(); - // Authentication Ok((transport, bandwidth)) } diff --git a/beacon_node/lighthouse_network/src/types/globals.rs b/beacon_node/lighthouse_network/src/types/globals.rs index 97eaaa0051b..b2b605e8aec 100644 --- a/beacon_node/lighthouse_network/src/types/globals.rs +++ b/beacon_node/lighthouse_network/src/types/globals.rs @@ -16,10 +16,6 @@ pub struct NetworkGlobals { pub peer_id: RwLock, /// Listening multiaddrs. pub listen_multiaddrs: RwLock>, - /// The TCP port that the libp2p service is listening on over Ipv4. - listen_port_tcp4: Option, - /// The TCP port that the libp2p service is listening on over Ipv6. - listen_port_tcp6: Option, /// The collection of known peers. pub peers: RwLock>, // The local meta data of our node. @@ -35,8 +31,6 @@ pub struct NetworkGlobals { impl NetworkGlobals { pub fn new( enr: Enr, - listen_port_tcp4: Option, - listen_port_tcp6: Option, local_metadata: MetaData, trusted_peers: Vec, disable_peer_scoring: bool, @@ -46,8 +40,6 @@ impl NetworkGlobals { local_enr: RwLock::new(enr.clone()), peer_id: RwLock::new(enr.peer_id()), listen_multiaddrs: RwLock::new(Vec::new()), - listen_port_tcp4, - listen_port_tcp6, local_metadata: RwLock::new(local_metadata), peers: RwLock::new(PeerDB::new(trusted_peers, disable_peer_scoring, log)), gossipsub_subscriptions: RwLock::new(HashSet::new()), @@ -72,16 +64,6 @@ impl NetworkGlobals { self.listen_multiaddrs.read().clone() } - /// Returns the libp2p TCP port that this node has been configured to listen on. - pub fn listen_port_tcp4(&self) -> Option { - self.listen_port_tcp4 - } - - /// Returns the UDP discovery port that this node has been configured to listen on. - pub fn listen_port_tcp6(&self) -> Option { - self.listen_port_tcp6 - } - /// Returns the number of libp2p connected peers. pub fn connected_peers(&self) -> usize { self.peers.read().connected_peer_ids().count() @@ -139,8 +121,6 @@ impl NetworkGlobals { let enr = discv5::enr::EnrBuilder::new("v4").build(&enr_key).unwrap(); NetworkGlobals::new( enr, - Some(9000), - None, MetaData::V2(MetaDataV2 { seq_number: 0, attnets: Default::default(), diff --git a/beacon_node/lighthouse_network/tests/common.rs b/beacon_node/lighthouse_network/tests/common.rs index 7f3a36acc56..449280915e9 100644 --- a/beacon_node/lighthouse_network/tests/common.rs +++ b/beacon_node/lighthouse_network/tests/common.rs @@ -70,15 +70,19 @@ pub fn build_log(level: slog::Level, enabled: bool) -> slog::Logger { } } -pub fn build_config(port: u16, mut boot_nodes: Vec) -> NetworkConfig { +pub fn build_config(mut boot_nodes: Vec) -> NetworkConfig { let mut config = NetworkConfig::default(); + + // Find unused ports by using the 0 port. + let port = 0; + + let random_path: u16 = rand::random(); let path = TempBuilder::new() - .prefix(&format!("libp2p_test{}", port)) + .prefix(&format!("libp2p_test_{}", random_path)) .tempdir() .unwrap(); - config.set_ipv4_listening_address(std::net::Ipv4Addr::UNSPECIFIED, port, port); - config.enr_udp4_port = if port == 0 { None } else { Some(port) }; + config.set_ipv4_listening_address(std::net::Ipv4Addr::UNSPECIFIED, port, port, port); config.enr_address = (Some(std::net::Ipv4Addr::LOCALHOST), None); config.boot_nodes_enr.append(&mut boot_nodes); config.network_dir = path.into_path(); @@ -98,8 +102,7 @@ pub async fn build_libp2p_instance( fork_name: ForkName, spec: &ChainSpec, ) -> Libp2pInstance { - let port = 0; - let config = build_config(port, boot_nodes); + let config = build_config(boot_nodes); // launch libp2p service let (signal, exit) = exit_future::signal(); @@ -126,6 +129,12 @@ pub fn get_enr(node: &LibP2PService) -> Enr { node.local_enr() } +// Protocol for the node pair connection. +pub enum Protocol { + Tcp, + Quic, +} + // Constructs a pair of nodes with separate loggers. The sender dials the receiver. // This returns a (sender, receiver) pair. #[allow(dead_code)] @@ -134,6 +143,7 @@ pub async fn build_node_pair( log: &slog::Logger, fork_name: ForkName, spec: &ChainSpec, + protocol: Protocol, ) -> (Libp2pInstance, Libp2pInstance) { let sender_log = log.new(o!("who" => "sender")); let receiver_log = log.new(o!("who" => "receiver")); @@ -145,14 +155,45 @@ pub async fn build_node_pair( let sender_fut = async { loop { if let NetworkEvent::NewListenAddr(addr) = sender.next_event().await { - return addr; + // Only end once we've listened on the protocol we care about + match protocol { + Protocol::Tcp => { + if addr.iter().any(|multiaddr_proto| { + matches!(multiaddr_proto, libp2p::multiaddr::Protocol::Tcp(_)) + }) { + return addr; + } + } + Protocol::Quic => { + if addr.iter().any(|multiaddr_proto| { + matches!(multiaddr_proto, libp2p::multiaddr::Protocol::QuicV1) + }) { + return addr; + } + } + } } } }; let receiver_fut = async { loop { if let NetworkEvent::NewListenAddr(addr) = receiver.next_event().await { - return addr; + match protocol { + Protocol::Tcp => { + if addr.iter().any(|multiaddr_proto| { + matches!(multiaddr_proto, libp2p::multiaddr::Protocol::Tcp(_)) + }) { + return addr; + } + } + Protocol::Quic => { + if addr.iter().any(|multiaddr_proto| { + matches!(multiaddr_proto, libp2p::multiaddr::Protocol::QuicV1) + }) { + return addr; + } + } + } } } }; diff --git a/beacon_node/lighthouse_network/tests/rpc_tests.rs b/beacon_node/lighthouse_network/tests/rpc_tests.rs index 05fa5ab8542..795afd06b9e 100644 --- a/beacon_node/lighthouse_network/tests/rpc_tests.rs +++ b/beacon_node/lighthouse_network/tests/rpc_tests.rs @@ -1,4 +1,8 @@ #![cfg(test)] + +mod common; + +use common::Protocol; use lighthouse_network::rpc::methods::*; use lighthouse_network::{rpc::max_rpc_size, NetworkEvent, ReportSource, Request, Response}; use slog::{debug, warn, Level}; @@ -14,8 +18,6 @@ use types::{ Slot, }; -mod common; - type E = MinimalEthSpec; /// Merge block with length < max_rpc_size. @@ -49,7 +51,7 @@ fn merge_block_large(fork_context: &ForkContext, spec: &ChainSpec) -> BeaconBloc // Tests the STATUS RPC message #[test] #[allow(clippy::single_match)] -fn test_status_rpc() { +fn test_tcp_status_rpc() { // set up the logging. The level and enabled logging or not let log_level = Level::Debug; let enable_logging = false; @@ -62,8 +64,14 @@ fn test_status_rpc() { rt.block_on(async { // get sender/receiver - let (mut sender, mut receiver) = - common::build_node_pair(Arc::downgrade(&rt), &log, ForkName::Base, &spec).await; + let (mut sender, mut receiver) = common::build_node_pair( + Arc::downgrade(&rt), + &log, + ForkName::Base, + &spec, + Protocol::Tcp, + ) + .await; // Dummy STATUS RPC message let rpc_request = Request::Status(StatusMessage { @@ -141,7 +149,7 @@ fn test_status_rpc() { // Tests a streamed BlocksByRange RPC Message #[test] #[allow(clippy::single_match)] -fn test_blocks_by_range_chunked_rpc() { +fn test_tcp_blocks_by_range_chunked_rpc() { // set up the logging. The level and enabled logging or not let log_level = Level::Debug; let enable_logging = false; @@ -156,8 +164,14 @@ fn test_blocks_by_range_chunked_rpc() { rt.block_on(async { // get sender/receiver - let (mut sender, mut receiver) = - common::build_node_pair(Arc::downgrade(&rt), &log, ForkName::Merge, &spec).await; + let (mut sender, mut receiver) = common::build_node_pair( + Arc::downgrade(&rt), + &log, + ForkName::Merge, + &spec, + Protocol::Tcp, + ) + .await; // BlocksByRange Request let rpc_request = Request::BlocksByRange(BlocksByRangeRequest::new(0, messages_to_send)); @@ -267,7 +281,7 @@ fn test_blocks_by_range_chunked_rpc() { // Tests rejection of blocks over `MAX_RPC_SIZE`. #[test] #[allow(clippy::single_match)] -fn test_blocks_by_range_over_limit() { +fn test_tcp_blocks_by_range_over_limit() { // set up the logging. The level and enabled logging or not let log_level = Level::Debug; let enable_logging = false; @@ -282,8 +296,14 @@ fn test_blocks_by_range_over_limit() { rt.block_on(async { // get sender/receiver - let (mut sender, mut receiver) = - common::build_node_pair(Arc::downgrade(&rt), &log, ForkName::Merge, &spec).await; + let (mut sender, mut receiver) = common::build_node_pair( + Arc::downgrade(&rt), + &log, + ForkName::Merge, + &spec, + Protocol::Tcp, + ) + .await; // BlocksByRange Request let rpc_request = Request::BlocksByRange(BlocksByRangeRequest::new(0, messages_to_send)); @@ -350,7 +370,7 @@ fn test_blocks_by_range_over_limit() { // Tests that a streamed BlocksByRange RPC Message terminates when all expected chunks were received #[test] -fn test_blocks_by_range_chunked_rpc_terminates_correctly() { +fn test_tcp_blocks_by_range_chunked_rpc_terminates_correctly() { // set up the logging. The level and enabled logging or not let log_level = Level::Debug; let enable_logging = false; @@ -366,8 +386,14 @@ fn test_blocks_by_range_chunked_rpc_terminates_correctly() { rt.block_on(async { // get sender/receiver - let (mut sender, mut receiver) = - common::build_node_pair(Arc::downgrade(&rt), &log, ForkName::Base, &spec).await; + let (mut sender, mut receiver) = common::build_node_pair( + Arc::downgrade(&rt), + &log, + ForkName::Base, + &spec, + Protocol::Tcp, + ) + .await; // BlocksByRange Request let rpc_request = Request::BlocksByRange(BlocksByRangeRequest::new(0, messages_to_send)); @@ -476,7 +502,7 @@ fn test_blocks_by_range_chunked_rpc_terminates_correctly() { // Tests an empty response to a BlocksByRange RPC Message #[test] #[allow(clippy::single_match)] -fn test_blocks_by_range_single_empty_rpc() { +fn test_tcp_blocks_by_range_single_empty_rpc() { // set up the logging. The level and enabled logging or not let log_level = Level::Trace; let enable_logging = false; @@ -488,8 +514,14 @@ fn test_blocks_by_range_single_empty_rpc() { rt.block_on(async { // get sender/receiver - let (mut sender, mut receiver) = - common::build_node_pair(Arc::downgrade(&rt), &log, ForkName::Base, &spec).await; + let (mut sender, mut receiver) = common::build_node_pair( + Arc::downgrade(&rt), + &log, + ForkName::Base, + &spec, + Protocol::Tcp, + ) + .await; // BlocksByRange Request let rpc_request = Request::BlocksByRange(BlocksByRangeRequest::new(0, 10)); @@ -576,7 +608,7 @@ fn test_blocks_by_range_single_empty_rpc() { // serves to test the snappy framing format as well. #[test] #[allow(clippy::single_match)] -fn test_blocks_by_root_chunked_rpc() { +fn test_tcp_blocks_by_root_chunked_rpc() { // set up the logging. The level and enabled logging or not let log_level = Level::Debug; let enable_logging = false; @@ -589,8 +621,14 @@ fn test_blocks_by_root_chunked_rpc() { let rt = Arc::new(Runtime::new().unwrap()); // get sender/receiver rt.block_on(async { - let (mut sender, mut receiver) = - common::build_node_pair(Arc::downgrade(&rt), &log, ForkName::Merge, &spec).await; + let (mut sender, mut receiver) = common::build_node_pair( + Arc::downgrade(&rt), + &log, + ForkName::Merge, + &spec, + Protocol::Tcp, + ) + .await; // BlocksByRoot Request let rpc_request = @@ -702,7 +740,7 @@ fn test_blocks_by_root_chunked_rpc() { // Tests a streamed, chunked BlocksByRoot RPC Message terminates when all expected reponses have been received #[test] -fn test_blocks_by_root_chunked_rpc_terminates_correctly() { +fn test_tcp_blocks_by_root_chunked_rpc_terminates_correctly() { // set up the logging. The level and enabled logging or not let log_level = Level::Debug; let enable_logging = false; @@ -716,8 +754,14 @@ fn test_blocks_by_root_chunked_rpc_terminates_correctly() { let rt = Arc::new(Runtime::new().unwrap()); // get sender/receiver rt.block_on(async { - let (mut sender, mut receiver) = - common::build_node_pair(Arc::downgrade(&rt), &log, ForkName::Base, &spec).await; + let (mut sender, mut receiver) = common::build_node_pair( + Arc::downgrade(&rt), + &log, + ForkName::Base, + &spec, + Protocol::Tcp, + ) + .await; // BlocksByRoot Request let rpc_request = @@ -833,14 +877,9 @@ fn test_blocks_by_root_chunked_rpc_terminates_correctly() { }) } -// Tests a Goodbye RPC message -#[test] -#[allow(clippy::single_match)] -fn test_goodbye_rpc() { - // set up the logging. The level and enabled logging or not - let log_level = Level::Trace; - let enable_logging = false; - +/// Establishes a pair of nodes and disconnects the pair based on the selected protocol via an RPC +/// Goodbye message. +fn goodbye_test(log_level: Level, enable_logging: bool, protocol: Protocol) { let log = common::build_log(log_level, enable_logging); let rt = Arc::new(Runtime::new().unwrap()); @@ -850,7 +889,8 @@ fn test_goodbye_rpc() { // get sender/receiver rt.block_on(async { let (mut sender, mut receiver) = - common::build_node_pair(Arc::downgrade(&rt), &log, ForkName::Base, &spec).await; + common::build_node_pair(Arc::downgrade(&rt), &log, ForkName::Base, &spec, protocol) + .await; // build the sender future let sender_future = async { @@ -876,12 +916,9 @@ fn test_goodbye_rpc() { // build the receiver future let receiver_future = async { loop { - match receiver.next_event().await { - NetworkEvent::PeerDisconnected(_) => { - // Should receive sent RPC request - return; - } - _ => {} // Ignore other events + if let NetworkEvent::PeerDisconnected(_) = receiver.next_event().await { + // Should receive sent RPC request + return; } } }; @@ -896,3 +933,23 @@ fn test_goodbye_rpc() { } }) } + +// Tests a Goodbye RPC message +#[test] +#[allow(clippy::single_match)] +fn tcp_test_goodbye_rpc() { + // set up the logging. The level and enabled logging or not + let log_level = Level::Debug; + let enable_logging = true; + goodbye_test(log_level, enable_logging, Protocol::Tcp); +} + +// Tests a Goodbye RPC message +#[test] +#[allow(clippy::single_match)] +fn quic_test_goodbye_rpc() { + // set up the logging. The level and enabled logging or not + let log_level = Level::Debug; + let enable_logging = true; + goodbye_test(log_level, enable_logging, Protocol::Quic); +} diff --git a/beacon_node/network/src/nat.rs b/beacon_node/network/src/nat.rs index 9bf123e8dec..d011ac42e84 100644 --- a/beacon_node/network/src/nat.rs +++ b/beacon_node/network/src/nat.rs @@ -12,20 +12,49 @@ use types::EthSpec; /// Configuration required to construct the UPnP port mappings. pub struct UPnPConfig { - /// The local tcp port. + /// The local TCP port. tcp_port: u16, - /// The local udp port. - udp_port: u16, + /// The local UDP discovery port. + disc_port: u16, + /// The local UDP quic port. + quic_port: u16, /// Whether discovery is enabled or not. disable_discovery: bool, + /// Whether quic is enabled or not. + disable_quic_support: bool, +} + +/// Contains mappings that managed to be established. +#[derive(Default, Debug)] +pub struct EstablishedUPnPMappings { + /// A TCP port mapping for libp2p. + pub tcp_port: Option, + /// A UDP port for the QUIC libp2p transport. + pub udp_quic_port: Option, + /// A UDP port for discv5. + pub udp_disc_port: Option, +} + +impl EstablishedUPnPMappings { + /// Returns true if at least one value is set. + pub fn is_some(&self) -> bool { + self.tcp_port.is_some() || self.udp_quic_port.is_some() || self.udp_disc_port.is_some() + } + + // Iterator over the UDP ports + pub fn udp_ports(&self) -> impl Iterator { + self.udp_quic_port.iter().chain(self.udp_disc_port.iter()) + } } impl UPnPConfig { pub fn from_config(config: &NetworkConfig) -> Option { config.listen_addrs().v4().map(|v4_addr| UPnPConfig { tcp_port: v4_addr.tcp_port, - udp_port: v4_addr.udp_port, + disc_port: v4_addr.disc_port, + quic_port: v4_addr.quic_port, disable_discovery: config.disable_discovery, + disable_quic_support: config.disable_quic_support, }) } } @@ -68,6 +97,8 @@ pub fn construct_upnp_mappings( debug!(log, "UPnP Local IP Discovered"; "ip" => ?local_ip); + let mut mappings = EstablishedUPnPMappings::default(); + match local_ip { IpAddr::V4(address) => { let libp2p_socket = SocketAddrV4::new(address, config.tcp_port); @@ -76,39 +107,46 @@ pub fn construct_upnp_mappings( // one. // I've found this to be more reliable. If multiple users are behind a single // router, they should ideally try to set different port numbers. - let tcp_socket = add_port_mapping( + mappings.tcp_port = add_port_mapping( &gateway, igd::PortMappingProtocol::TCP, libp2p_socket, "tcp", &log, - ).and_then(|_| { + ).map(|_| { let external_socket = external_ip.as_ref().map(|ip| SocketAddr::new((*ip).into(), config.tcp_port)).map_err(|_| ()); info!(log, "UPnP TCP route established"; "external_socket" => format!("{}:{}", external_socket.as_ref().map(|ip| ip.to_string()).unwrap_or_else(|_| "".into()), config.tcp_port)); - external_socket + config.tcp_port }).ok(); - let udp_socket = if !config.disable_discovery { - let discovery_socket = SocketAddrV4::new(address, config.udp_port); + let set_udp_mapping = |udp_port| { + let udp_socket = SocketAddrV4::new(address, udp_port); add_port_mapping( &gateway, igd::PortMappingProtocol::UDP, - discovery_socket, + udp_socket, "udp", &log, - ).and_then(|_| { - let external_socket = external_ip - .map(|ip| SocketAddr::new(ip.into(), config.udp_port)).map_err(|_| ()); - info!(log, "UPnP UDP route established"; "external_socket" => format!("{}:{}", external_socket.as_ref().map(|ip| ip.to_string()).unwrap_or_else(|_| "".into()), config.udp_port)); - external_socket - }).ok() - } else { - None + ).map(|_| { + info!(log, "UPnP UDP route established"; "external_socket" => format!("{}:{}", external_ip.as_ref().map(|ip| ip.to_string()).unwrap_or_else(|_| "".into()), udp_port)); + }) }; + // Set the discovery UDP port mapping + if !config.disable_discovery && set_udp_mapping(config.disc_port).is_ok() { + mappings.udp_disc_port = Some(config.disc_port); + } + + // Set the quic UDP port mapping + if !config.disable_quic_support && set_udp_mapping(config.quic_port).is_ok() { + mappings.udp_quic_port = Some(config.quic_port); + } + // report any updates to the network service. - network_send.send(NetworkMessage::UPnPMappingEstablished{ tcp_socket, udp_socket }) - .unwrap_or_else(|e| debug!(log, "Could not send message to the network service"; "error" => %e)); + if mappings.is_some() { + network_send.send(NetworkMessage::UPnPMappingEstablished{ mappings }) + .unwrap_or_else(|e| debug!(log, "Could not send message to the network service"; "error" => %e)); + } } _ => debug!(log, "UPnP no routes constructed. IPv6 not supported"), } @@ -161,12 +199,12 @@ fn add_port_mapping( } /// Removes the specified TCP and UDP port mappings. -pub fn remove_mappings(tcp_port: Option, udp_port: Option, log: &slog::Logger) { - if tcp_port.is_some() || udp_port.is_some() { +pub fn remove_mappings(mappings: &EstablishedUPnPMappings, log: &slog::Logger) { + if mappings.is_some() { debug!(log, "Removing UPnP port mappings"); match igd::search_gateway(Default::default()) { Ok(gateway) => { - if let Some(tcp_port) = tcp_port { + if let Some(tcp_port) = mappings.tcp_port { match gateway.remove_port(igd::PortMappingProtocol::TCP, tcp_port) { Ok(()) => debug!(log, "UPnP Removed TCP port mapping"; "port" => tcp_port), Err(e) => { @@ -174,8 +212,8 @@ pub fn remove_mappings(tcp_port: Option, udp_port: Option, log: &slog: } } } - if let Some(udp_port) = udp_port { - match gateway.remove_port(igd::PortMappingProtocol::UDP, udp_port) { + for udp_port in mappings.udp_ports() { + match gateway.remove_port(igd::PortMappingProtocol::UDP, *udp_port) { Ok(()) => debug!(log, "UPnP Removed UDP port mapping"; "port" => udp_port), Err(e) => { debug!(log, "UPnP Failed to remove UDP port mapping"; "port" => udp_port, "error" => %e) diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index 04dd3693382..fd1c8c6a62a 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -34,7 +34,6 @@ const VALIDATOR_COUNT: usize = SLOTS_PER_EPOCH as usize; const SMALL_CHAIN: u64 = 2; const LONG_CHAIN: u64 = SLOTS_PER_EPOCH * 2; -const TCP_PORT: u16 = 42; const SEQ_NUMBER: u64 = 0; /// The default time to wait for `BeaconProcessor` events. @@ -178,15 +177,7 @@ impl TestRig { }); let enr_key = CombinedKey::generate_secp256k1(); let enr = EnrBuilder::new("v4").build(&enr_key).unwrap(); - let network_globals = Arc::new(NetworkGlobals::new( - enr, - Some(TCP_PORT), - None, - meta_data, - vec![], - false, - &log, - )); + let network_globals = Arc::new(NetworkGlobals::new(enr, meta_data, vec![], false, &log)); let executor = harness.runtime.task_executor.clone(); diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 193955ee53f..5bff23925c1 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -1,4 +1,5 @@ use super::sync::manager::RequestId as SyncId; +use crate::nat::EstablishedUPnPMappings; use crate::network_beacon_processor::InvalidBlockStorage; use crate::persisted_dht::{clear_dht, load_dht, persist_dht}; use crate::router::{Router, RouterMessage}; @@ -26,7 +27,7 @@ use lighthouse_network::{ MessageId, NetworkEvent, NetworkGlobals, PeerId, }; use slog::{crit, debug, error, info, o, trace, warn}; -use std::{collections::HashSet, net::SocketAddr, pin::Pin, sync::Arc, time::Duration}; +use std::{collections::HashSet, pin::Pin, sync::Arc, time::Duration}; use store::HotColdDB; use strum::IntoStaticStr; use task_executor::ShutdownReason; @@ -93,12 +94,10 @@ pub enum NetworkMessage { /// The result of the validation validation_result: MessageAcceptance, }, - /// Called if a known external TCP socket address has been updated. + /// Called if UPnP managed to establish an external port mapping. UPnPMappingEstablished { - /// The external TCP address has been updated. - tcp_socket: Option, - /// The external UDP address has been updated. - udp_socket: Option, + /// The mappings that were established. + mappings: EstablishedUPnPMappings, }, /// Reports a peer to the peer manager for performing an action. ReportPeer { @@ -190,11 +189,8 @@ pub struct NetworkService { /// A collection of global variables, accessible outside of the network service. network_globals: Arc>, /// Stores potentially created UPnP mappings to be removed on shutdown. (TCP port and UDP - /// port). - upnp_mappings: (Option, Option), - /// Keeps track of if discovery is auto-updating or not. This is used to inform us if we should - /// update the UDP socket of discovery if the UPnP mappings get established. - discovery_auto_update: bool, + /// ports). + upnp_mappings: EstablishedUPnPMappings, /// A delay that expires when a new fork takes place. next_fork_update: Pin>>, /// A delay that expires when we need to subscribe to a new fork's topics. @@ -359,8 +355,7 @@ impl NetworkService { router_send, store, network_globals: network_globals.clone(), - upnp_mappings: (None, None), - discovery_auto_update: config.discv5_config.enr_update, + upnp_mappings: EstablishedUPnPMappings::default(), next_fork_update, next_fork_subscriptions, next_unsubscribe, @@ -617,32 +612,18 @@ impl NetworkService { } => { self.libp2p.send_error_reponse(peer_id, id, error, reason); } - NetworkMessage::UPnPMappingEstablished { - tcp_socket, - udp_socket, - } => { - self.upnp_mappings = (tcp_socket.map(|s| s.port()), udp_socket.map(|s| s.port())); + NetworkMessage::UPnPMappingEstablished { mappings } => { + self.upnp_mappings = mappings; // If there is an external TCP port update, modify our local ENR. - if let Some(tcp_socket) = tcp_socket { - if let Err(e) = self - .libp2p - .discovery_mut() - .update_enr_tcp_port(tcp_socket.port()) - { + if let Some(tcp_port) = self.upnp_mappings.tcp_port { + if let Err(e) = self.libp2p.discovery_mut().update_enr_tcp_port(tcp_port) { warn!(self.log, "Failed to update ENR"; "error" => e); } } - // if the discovery service is not auto-updating, update it with the - // UPnP mappings - if !self.discovery_auto_update { - if let Some(udp_socket) = udp_socket { - if let Err(e) = self - .libp2p - .discovery_mut() - .update_enr_udp_socket(udp_socket) - { - warn!(self.log, "Failed to update ENR"; "error" => e); - } + // If there is an external QUIC port update, modify our local ENR. + if let Some(quic_port) = self.upnp_mappings.udp_quic_port { + if let Err(e) = self.libp2p.discovery_mut().update_enr_quic_port(quic_port) { + warn!(self.log, "Failed to update ENR"; "error" => e); } } } @@ -995,7 +976,7 @@ impl Drop for NetworkService { } // attempt to remove port mappings - crate::nat::remove_mappings(self.upnp_mappings.0, self.upnp_mappings.1, &self.log); + crate::nat::remove_mappings(&self.upnp_mappings, &self.log); info!(self.log, "Network service shutdown"); } diff --git a/beacon_node/network/src/service/tests.rs b/beacon_node/network/src/service/tests.rs index 633b83ddd9c..ebf720be635 100644 --- a/beacon_node/network/src/service/tests.rs +++ b/beacon_node/network/src/service/tests.rs @@ -61,7 +61,7 @@ mod tests { ); let mut config = NetworkConfig::default(); - config.set_ipv4_listening_address(std::net::Ipv4Addr::UNSPECIFIED, 21212, 21212); + config.set_ipv4_listening_address(std::net::Ipv4Addr::UNSPECIFIED, 21212, 21212, 21213); config.discv5_config.table_filter = |_| true; // Do not ignore local IPs config.upnp_enabled = false; config.boot_nodes_enr = enrs.clone(); diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index 88d4fa92092..85cdd4e23ba 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -82,11 +82,11 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .help("The address lighthouse will listen for UDP and TCP connections. To listen \ over IpV4 and IpV6 set this flag twice with the different values.\n\ Examples:\n\ - - --listen-address '0.0.0.0' will listen over Ipv4.\n\ - - --listen-address '::' will listen over Ipv6.\n\ + - --listen-address '0.0.0.0' will listen over IPv4.\n\ + - --listen-address '::' will listen over IPv6.\n\ - --listen-address '0.0.0.0' --listen-address '::' will listen over both \ - Ipv4 and Ipv6. The order of the given addresses is not relevant. However, \ - multiple Ipv4, or multiple Ipv6 addresses will not be accepted.") + IPv4 and IPv6. The order of the given addresses is not relevant. However, \ + multiple IPv4, or multiple IPv6 addresses will not be accepted.") .multiple(true) .max_values(2) .default_value("0.0.0.0") @@ -96,9 +96,10 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { Arg::with_name("port") .long("port") .value_name("PORT") - .help("The TCP/UDP port to listen on. The UDP port can be modified by the \ - --discovery-port flag. If listening over both Ipv4 and Ipv6 the --port flag \ - will apply to the Ipv4 address and --port6 to the Ipv6 address.") + .help("The TCP/UDP ports to listen on. There are two UDP ports. \ + The discovery UDP port will be set to this value and the Quic UDP port will be set to this value + 1. The discovery port can be modified by the \ + --discovery-port flag and the quic port can be modified by the --quic-port flag. If listening over both IPv4 and IPv6 the --port flag \ + will apply to the IPv4 address and --port6 to the IPv6 address.") .default_value("9000") .takes_value(true), ) @@ -106,8 +107,8 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { Arg::with_name("port6") .long("port6") .value_name("PORT") - .help("The TCP/UDP port to listen on over IpV6 when listening over both Ipv4 and \ - Ipv6. Defaults to 9090 when required.") + .help("The TCP/UDP ports to listen on over IPv6 when listening over both IPv4 and \ + IPv6. Defaults to 9090 when required. The Quic UDP port will be set to this value + 1.") .default_value("9090") .takes_value(true), ) @@ -118,12 +119,27 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .help("The UDP port that discovery will listen on. Defaults to `port`") .takes_value(true), ) + .arg( + Arg::with_name("quic-port") + .long("quic-port") + .value_name("PORT") + .help("The UDP port that quic will listen on. Defaults to `port` + 1") + .takes_value(true), + ) .arg( Arg::with_name("discovery-port6") .long("discovery-port6") .value_name("PORT") - .help("The UDP port that discovery will listen on over IpV6 if listening over \ - both Ipv4 and IpV6. Defaults to `port6`") + .help("The UDP port that discovery will listen on over IPv6 if listening over \ + both IPv4 and IPv6. Defaults to `port6`") + .takes_value(true), + ) + .arg( + Arg::with_name("quic-port6") + .long("quic-port6") + .value_name("PORT") + .help("The UDP port that quic will listen on over IPv6 if listening over \ + both IPv4 and IPv6. Defaults to `port6` + 1") .takes_value(true), ) .arg( @@ -166,7 +182,15 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .long("enr-udp-port") .value_name("PORT") .help("The UDP4 port of the local ENR. Set this only if you are sure other nodes \ - can connect to your local node on this port over IpV4.") + can connect to your local node on this port over IPv4.") + .takes_value(true), + ) + .arg( + Arg::with_name("enr-quic-port") + .long("enr-quic-port") + .value_name("PORT") + .help("The quic UDP4 port that will be set on the local ENR. Set this only if you are sure other nodes \ + can connect to your local node on this port over IPv4.") .takes_value(true), ) .arg( @@ -174,7 +198,15 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .long("enr-udp6-port") .value_name("PORT") .help("The UDP6 port of the local ENR. Set this only if you are sure other nodes \ - can connect to your local node on this port over IpV6.") + can connect to your local node on this port over IPv6.") + .takes_value(true), + ) + .arg( + Arg::with_name("enr-quic6-port") + .long("enr-quic6-port") + .value_name("PORT") + .help("The quic UDP6 port that will be set on the local ENR. Set this only if you are sure other nodes \ + can connect to your local node on this port over IPv6.") .takes_value(true), ) .arg( @@ -182,7 +214,7 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .long("enr-tcp-port") .value_name("PORT") .help("The TCP4 port of the local ENR. Set this only if you are sure other nodes \ - can connect to your local node on this port over IpV4. The --port flag is \ + can connect to your local node on this port over IPv4. The --port flag is \ used if this is not set.") .takes_value(true), ) @@ -191,7 +223,7 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .long("enr-tcp6-port") .value_name("PORT") .help("The TCP6 port of the local ENR. Set this only if you are sure other nodes \ - can connect to your local node on this port over IpV6. The --port6 flag is \ + can connect to your local node on this port over IPv6. The --port6 flag is \ used if this is not set.") .takes_value(true), ) @@ -232,11 +264,18 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { without an ENR.") .takes_value(true), ) + // NOTE: This is hidden because it is primarily a developer feature for testnets and + // debugging. We remove it from the list to avoid clutter. .arg( Arg::with_name("disable-discovery") .long("disable-discovery") .help("Disables the discv5 discovery protocol. The node will not search for new peers or participate in the discovery protocol.") - .takes_value(false), + .hidden(true) + ) + .arg( + Arg::with_name("disable-quic") + .long("disable-quic") + .help("Disables the quic transport. The node will rely solely on the TCP transport for libp2p connections.") ) .arg( Arg::with_name("disable-peer-scoring") diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index 4bd4d6a97bb..3d7e5e01f4e 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -945,15 +945,15 @@ pub fn parse_listening_addresses( .map_err(|parse_error| format!("Failed to parse --port6 as an integer: {parse_error}"))? .unwrap_or(9090); - // parse the possible udp ports - let maybe_udp_port = cli_args + // parse the possible discovery ports. + let maybe_disc_port = cli_args .value_of("discovery-port") .map(str::parse::) .transpose() .map_err(|parse_error| { format!("Failed to parse --discovery-port as an integer: {parse_error}") })?; - let maybe_udp6_port = cli_args + let maybe_disc6_port = cli_args .value_of("discovery-port6") .map(str::parse::) .transpose() @@ -961,6 +961,24 @@ pub fn parse_listening_addresses( format!("Failed to parse --discovery-port6 as an integer: {parse_error}") })?; + // parse the possible quic port. + let maybe_quic_port = cli_args + .value_of("quic-port") + .map(str::parse::) + .transpose() + .map_err(|parse_error| { + format!("Failed to parse --quic-port as an integer: {parse_error}") + })?; + + // parse the possible quic port. + let maybe_quic6_port = cli_args + .value_of("quic-port6") + .map(str::parse::) + .transpose() + .map_err(|parse_error| { + format!("Failed to parse --quic6-port as an integer: {parse_error}") + })?; + // Now put everything together let listening_addresses = match (maybe_ipv4, maybe_ipv6) { (None, None) => { @@ -971,7 +989,7 @@ pub fn parse_listening_addresses( // A single ipv6 address was provided. Set the ports if cli_args.is_present("port6") { - warn!(log, "When listening only over IpV6, use the --port flag. The value of --port6 will be ignored.") + warn!(log, "When listening only over IPv6, use the --port flag. The value of --port6 will be ignored.") } // use zero ports if required. If not, use the given port. let tcp_port = use_zero_ports @@ -979,20 +997,32 @@ pub fn parse_listening_addresses( .transpose()? .unwrap_or(port); - if maybe_udp6_port.is_some() { - warn!(log, "When listening only over IpV6, use the --discovery-port flag. The value of --discovery-port6 will be ignored.") + if maybe_disc6_port.is_some() { + warn!(log, "When listening only over IPv6, use the --discovery-port flag. The value of --discovery-port6 will be ignored.") + } + + if maybe_quic6_port.is_some() { + warn!(log, "When listening only over IPv6, use the --quic-port flag. The value of --quic-port6 will be ignored.") } + // use zero ports if required. If not, use the specific udp port. If none given, use // the tcp port. - let udp_port = use_zero_ports + let disc_port = use_zero_ports .then(unused_port::unused_udp6_port) .transpose()? - .or(maybe_udp_port) + .or(maybe_disc_port) .unwrap_or(port); + let quic_port = use_zero_ports + .then(unused_port::unused_udp6_port) + .transpose()? + .or(maybe_quic_port) + .unwrap_or(port + 1); + ListenAddress::V6(lighthouse_network::ListenAddr { addr: ipv6, - udp_port, + quic_port, + disc_port, tcp_port, }) } @@ -1004,16 +1034,25 @@ pub fn parse_listening_addresses( .then(unused_port::unused_tcp4_port) .transpose()? .unwrap_or(port); - // use zero ports if required. If not, use the specific udp port. If none given, use + // use zero ports if required. If not, use the specific discovery port. If none given, use // the tcp port. - let udp_port = use_zero_ports + let disc_port = use_zero_ports .then(unused_port::unused_udp4_port) .transpose()? - .or(maybe_udp_port) + .or(maybe_disc_port) .unwrap_or(port); + // use zero ports if required. If not, use the specific quic port. If none given, use + // the tcp port + 1. + let quic_port = use_zero_ports + .then(unused_port::unused_udp4_port) + .transpose()? + .or(maybe_quic_port) + .unwrap_or(port + 1); + ListenAddress::V4(lighthouse_network::ListenAddr { addr: ipv4, - udp_port, + disc_port, + quic_port, tcp_port, }) } @@ -1022,31 +1061,44 @@ pub fn parse_listening_addresses( .then(unused_port::unused_tcp4_port) .transpose()? .unwrap_or(port); - let ipv4_udp_port = use_zero_ports + let ipv4_disc_port = use_zero_ports .then(unused_port::unused_udp4_port) .transpose()? - .or(maybe_udp_port) + .or(maybe_disc_port) .unwrap_or(ipv4_tcp_port); + let ipv4_quic_port = use_zero_ports + .then(unused_port::unused_udp4_port) + .transpose()? + .or(maybe_quic_port) + .unwrap_or(port + 1); // Defaults to 9090 when required let ipv6_tcp_port = use_zero_ports .then(unused_port::unused_tcp6_port) .transpose()? .unwrap_or(port6); - let ipv6_udp_port = use_zero_ports + let ipv6_disc_port = use_zero_ports .then(unused_port::unused_udp6_port) .transpose()? - .or(maybe_udp6_port) + .or(maybe_disc6_port) .unwrap_or(ipv6_tcp_port); + let ipv6_quic_port = use_zero_ports + .then(unused_port::unused_udp6_port) + .transpose()? + .or(maybe_quic6_port) + .unwrap_or(ipv6_tcp_port + 1); + ListenAddress::DualStack( lighthouse_network::ListenAddr { addr: ipv4, - udp_port: ipv4_udp_port, + disc_port: ipv4_disc_port, + quic_port: ipv4_quic_port, tcp_port: ipv4_tcp_port, }, lighthouse_network::ListenAddr { addr: ipv6, - udp_port: ipv6_udp_port, + disc_port: ipv6_disc_port, + quic_port: ipv6_quic_port, tcp_port: ipv6_tcp_port, }, ) @@ -1162,6 +1214,14 @@ pub fn set_network_config( ); } + if let Some(enr_quic_port_str) = cli_args.value_of("enr-quic-port") { + config.enr_quic4_port = Some( + enr_quic_port_str + .parse::() + .map_err(|_| format!("Invalid quic port: {}", enr_quic_port_str))?, + ); + } + if let Some(enr_tcp_port_str) = cli_args.value_of("enr-tcp-port") { config.enr_tcp4_port = Some( enr_tcp_port_str @@ -1178,6 +1238,14 @@ pub fn set_network_config( ); } + if let Some(enr_quic_port_str) = cli_args.value_of("enr-quic6-port") { + config.enr_quic6_port = Some( + enr_quic_port_str + .parse::() + .map_err(|_| format!("Invalid quic port: {}", enr_quic_port_str))?, + ); + } + if let Some(enr_tcp_port_str) = cli_args.value_of("enr-tcp6-port") { config.enr_tcp6_port = Some( enr_tcp_port_str @@ -1187,9 +1255,9 @@ pub fn set_network_config( } if cli_args.is_present("enr-match") { - // Match the Ip and UDP port in the enr. + // Match the IP and UDP port in the ENR. - // set the enr address to localhost if the address is unspecified + // Set the ENR address to localhost if the address is unspecified. if let Some(ipv4_addr) = config.listen_addrs().v4().cloned() { let ipv4_enr_addr = if ipv4_addr.addr == Ipv4Addr::UNSPECIFIED { Ipv4Addr::LOCALHOST @@ -1197,7 +1265,7 @@ pub fn set_network_config( ipv4_addr.addr }; config.enr_address.0 = Some(ipv4_enr_addr); - config.enr_udp4_port = Some(ipv4_addr.udp_port); + config.enr_udp4_port = Some(ipv4_addr.disc_port); } if let Some(ipv6_addr) = config.listen_addrs().v6().cloned() { @@ -1207,7 +1275,7 @@ pub fn set_network_config( ipv6_addr.addr }; config.enr_address.1 = Some(ipv6_enr_addr); - config.enr_udp6_port = Some(ipv6_addr.udp_port); + config.enr_udp6_port = Some(ipv6_addr.disc_port); } } @@ -1240,11 +1308,11 @@ pub fn set_network_config( // actually matters. Just use the udp port. let port = match config.listen_addrs() { - ListenAddress::V4(v4_addr) => v4_addr.udp_port, - ListenAddress::V6(v6_addr) => v6_addr.udp_port, + ListenAddress::V4(v4_addr) => v4_addr.disc_port, + ListenAddress::V6(v6_addr) => v6_addr.disc_port, ListenAddress::DualStack(v4_addr, _v6_addr) => { // NOTE: slight preference for ipv4 that I don't think is of importance. - v4_addr.udp_port + v4_addr.disc_port } }; @@ -1303,6 +1371,10 @@ pub fn set_network_config( warn!(log, "Discovery is disabled. New peers will not be found"); } + if cli_args.is_present("disable-quic") { + config.disable_quic_support = true; + } + if cli_args.is_present("disable-upnp") { config.upnp_enabled = false; } diff --git a/book/src/SUMMARY.md b/book/src/SUMMARY.md index 507896f4311..3f58d8aa457 100644 --- a/book/src/SUMMARY.md +++ b/book/src/SUMMARY.md @@ -56,3 +56,4 @@ * [Contributing](./contributing.md) * [Development Environment](./setup.md) * [FAQs](./faq.md) +* [Protocol Developers](./developers.md) \ No newline at end of file diff --git a/book/src/developers.md b/book/src/developers.md new file mode 100644 index 00000000000..2ba09bd3412 --- /dev/null +++ b/book/src/developers.md @@ -0,0 +1,51 @@ +# For Protocol Developers + +_Documentation for protocol developers._ + +This section lists Lighthouse-specific decisions that are not strictly spec'd and may be useful for +other protocol developers wishing to interact with lighthouse. + + +## Custom ENR Fields + +Lighthouse currently uses the following ENR fields: + +### Ethereum Consensus Specified + +| Field | Description | +| ---- | ---- | +| `eth2` | The `ENRForkId` in SSZ bytes specifying which fork the node is on | +| `attnets` | An SSZ bitfield which indicates which of the 64 subnets the node is subscribed to for an extended period of time | +| `syncnets` | An SSZ bitfield which indicates which of the sync committee subnets the node is subscribed to | + + +### Lighthouse Custom Fields + +Lighthouse is currently using the following custom ENR fields. +| Field | Description | +| ---- | ---- | +| `quic` | The UDP port on which the QUIC transport is listening on IPv4 | +| `quic6` | The UDP port on which the QUIC transport is listening on IPv6 | + + +## Custom RPC Messages + +The specification leaves room for implementation-specific errors. Lighthouse uses the following +custom RPC error messages. + +### Goodbye Reason Codes + +| Code | Message | Description | +| ---- | ---- | ---- | +| 128 | Unable to Verify Network | Teku uses this, so we adopted it. It relates to having a fork mismatch | +| 129 | Too Many Peers | Lighthouse can close a connection because it has reached its peer-limit and pruned excess peers | +| 250 | Bad Score | The node has been dropped due to having a bad peer score | +| 251 | Banned | The peer has been banned and disconnected | +| 252 | Banned IP | The IP the node is connected to us with has been banned | + + +### Error Codes + +| Code | Message | Description | +| ---- | ---- | ---- | +| 139 | Rate Limited | The peer has been rate limited so we return this error as a response | \ No newline at end of file diff --git a/boot_node/src/config.rs b/boot_node/src/config.rs index 779269921a5..817bd2ab52f 100644 --- a/boot_node/src/config.rs +++ b/boot_node/src/config.rs @@ -58,12 +58,12 @@ impl BootNodeConfig { set_network_config(&mut network_config, matches, &data_dir, &logger)?; - // Set the Enr UDP ports to the listening ports if not present. + // Set the Enr Discovery ports to the listening ports if not present. if let Some(listening_addr_v4) = network_config.listen_addrs().v4() { network_config.enr_udp4_port = Some( network_config .enr_udp4_port - .unwrap_or(listening_addr_v4.udp_port), + .unwrap_or(listening_addr_v4.disc_port), ) }; @@ -71,7 +71,7 @@ impl BootNodeConfig { network_config.enr_udp6_port = Some( network_config .enr_udp6_port - .unwrap_or(listening_addr_v6.udp_port), + .unwrap_or(listening_addr_v6.disc_port), ) }; diff --git a/lighthouse/tests/beacon_node.rs b/lighthouse/tests/beacon_node.rs index c89b8316294..2d9a11a6acb 100644 --- a/lighthouse/tests/beacon_node.rs +++ b/lighthouse/tests/beacon_node.rs @@ -1010,12 +1010,12 @@ fn network_port_flag_over_ipv4() { .run() .with_config(|config| { assert_eq!( - config - .network - .listen_addrs() - .v4() - .map(|listen_addr| (listen_addr.udp_port, listen_addr.tcp_port)), - Some((port, port)) + config.network.listen_addrs().v4().map(|listen_addr| ( + listen_addr.disc_port, + listen_addr.quic_port, + listen_addr.tcp_port + )), + Some((port, port + 1, port)) ); }); } @@ -1028,22 +1028,22 @@ fn network_port_flag_over_ipv6() { .run() .with_config(|config| { assert_eq!( - config - .network - .listen_addrs() - .v6() - .map(|listen_addr| (listen_addr.udp_port, listen_addr.tcp_port)), - Some((port, port)) + config.network.listen_addrs().v6().map(|listen_addr| ( + listen_addr.disc_port, + listen_addr.quic_port, + listen_addr.tcp_port + )), + Some((port, port + 1, port)) ); }); } #[test] fn network_port_and_discovery_port_flags_over_ipv4() { let tcp4_port = unused_tcp4_port().expect("Unable to find unused port."); - let udp4_port = unused_udp4_port().expect("Unable to find unused port."); + let disc4_port = unused_udp4_port().expect("Unable to find unused port."); CommandLineTest::new() .flag("port", Some(tcp4_port.to_string().as_str())) - .flag("discovery-port", Some(udp4_port.to_string().as_str())) + .flag("discovery-port", Some(disc4_port.to_string().as_str())) .run() .with_config(|config| { assert_eq!( @@ -1051,19 +1051,19 @@ fn network_port_and_discovery_port_flags_over_ipv4() { .network .listen_addrs() .v4() - .map(|listen_addr| (listen_addr.tcp_port, listen_addr.udp_port)), - Some((tcp4_port, udp4_port)) + .map(|listen_addr| (listen_addr.tcp_port, listen_addr.disc_port)), + Some((tcp4_port, disc4_port)) ); }); } #[test] fn network_port_and_discovery_port_flags_over_ipv6() { let tcp6_port = unused_tcp6_port().expect("Unable to find unused port."); - let udp6_port = unused_udp6_port().expect("Unable to find unused port."); + let disc6_port = unused_udp6_port().expect("Unable to find unused port."); CommandLineTest::new() .flag("listen-address", Some("::1")) .flag("port", Some(tcp6_port.to_string().as_str())) - .flag("discovery-port", Some(udp6_port.to_string().as_str())) + .flag("discovery-port", Some(disc6_port.to_string().as_str())) .run() .with_config(|config| { assert_eq!( @@ -1071,24 +1071,24 @@ fn network_port_and_discovery_port_flags_over_ipv6() { .network .listen_addrs() .v6() - .map(|listen_addr| (listen_addr.tcp_port, listen_addr.udp_port)), - Some((tcp6_port, udp6_port)) + .map(|listen_addr| (listen_addr.tcp_port, listen_addr.disc_port)), + Some((tcp6_port, disc6_port)) ); }); } #[test] fn network_port_and_discovery_port_flags_over_ipv4_and_ipv6() { let tcp4_port = unused_tcp4_port().expect("Unable to find unused port."); - let udp4_port = unused_udp4_port().expect("Unable to find unused port."); + let disc4_port = unused_udp4_port().expect("Unable to find unused port."); let tcp6_port = unused_tcp6_port().expect("Unable to find unused port."); - let udp6_port = unused_udp6_port().expect("Unable to find unused port."); + let disc6_port = unused_udp6_port().expect("Unable to find unused port."); CommandLineTest::new() .flag("listen-address", Some("::1")) .flag("listen-address", Some("127.0.0.1")) .flag("port", Some(tcp4_port.to_string().as_str())) - .flag("discovery-port", Some(udp4_port.to_string().as_str())) + .flag("discovery-port", Some(disc4_port.to_string().as_str())) .flag("port6", Some(tcp6_port.to_string().as_str())) - .flag("discovery-port6", Some(udp6_port.to_string().as_str())) + .flag("discovery-port6", Some(disc6_port.to_string().as_str())) .run() .with_config(|config| { assert_eq!( @@ -1096,8 +1096,8 @@ fn network_port_and_discovery_port_flags_over_ipv4_and_ipv6() { .network .listen_addrs() .v4() - .map(|listen_addr| (listen_addr.tcp_port, listen_addr.udp_port)), - Some((tcp4_port, udp4_port)) + .map(|listen_addr| (listen_addr.tcp_port, listen_addr.disc_port)), + Some((tcp4_port, disc4_port)) ); assert_eq!( @@ -1105,8 +1105,47 @@ fn network_port_and_discovery_port_flags_over_ipv4_and_ipv6() { .network .listen_addrs() .v6() - .map(|listen_addr| (listen_addr.tcp_port, listen_addr.udp_port)), - Some((tcp6_port, udp6_port)) + .map(|listen_addr| (listen_addr.tcp_port, listen_addr.disc_port)), + Some((tcp6_port, disc6_port)) + ); + }); +} + +#[test] +fn network_port_discovery_quic_port_flags_over_ipv4_and_ipv6() { + let tcp4_port = unused_tcp4_port().expect("Unable to find unused port."); + let disc4_port = unused_udp4_port().expect("Unable to find unused port."); + let quic4_port = unused_udp4_port().expect("Unable to find unused port."); + let tcp6_port = unused_tcp6_port().expect("Unable to find unused port."); + let disc6_port = unused_udp6_port().expect("Unable to find unused port."); + let quic6_port = unused_udp6_port().expect("Unable to find unused port."); + CommandLineTest::new() + .flag("listen-address", Some("::1")) + .flag("listen-address", Some("127.0.0.1")) + .flag("port", Some(tcp4_port.to_string().as_str())) + .flag("discovery-port", Some(disc4_port.to_string().as_str())) + .flag("quic-port", Some(quic4_port.to_string().as_str())) + .flag("port6", Some(tcp6_port.to_string().as_str())) + .flag("discovery-port6", Some(disc6_port.to_string().as_str())) + .flag("quic-port6", Some(quic6_port.to_string().as_str())) + .run() + .with_config(|config| { + assert_eq!( + config.network.listen_addrs().v4().map(|listen_addr| ( + listen_addr.tcp_port, + listen_addr.disc_port, + listen_addr.quic_port + )), + Some((tcp4_port, disc4_port, quic4_port)) + ); + + assert_eq!( + config.network.listen_addrs().v6().map(|listen_addr| ( + listen_addr.tcp_port, + listen_addr.disc_port, + listen_addr.quic_port + )), + Some((tcp6_port, disc6_port, quic6_port)) ); }); } @@ -1118,6 +1157,14 @@ fn disable_discovery_flag() { .run_with_zero_port() .with_config(|config| assert!(config.network.disable_discovery)); } + +#[test] +fn disable_quic_flag() { + CommandLineTest::new() + .flag("disable-quic", None) + .run_with_zero_port() + .with_config(|config| assert!(config.network.disable_quic_support)); +} #[test] fn disable_peer_scoring_flag() { CommandLineTest::new() @@ -1224,6 +1271,14 @@ fn enr_udp_port_flag() { .with_config(|config| assert_eq!(config.network.enr_udp4_port, Some(port))); } #[test] +fn enr_quic_port_flag() { + let port = unused_udp4_port().expect("Unable to find unused port."); + CommandLineTest::new() + .flag("enr-quic-port", Some(port.to_string().as_str())) + .run_with_zero_port() + .with_config(|config| assert_eq!(config.network.enr_quic4_port, Some(port))); +} +#[test] fn enr_tcp_port_flag() { let port = unused_tcp4_port().expect("Unable to find unused port."); CommandLineTest::new() @@ -1240,6 +1295,14 @@ fn enr_udp6_port_flag() { .with_config(|config| assert_eq!(config.network.enr_udp6_port, Some(port))); } #[test] +fn enr_quic6_port_flag() { + let port = unused_udp6_port().expect("Unable to find unused port."); + CommandLineTest::new() + .flag("enr-quic6-port", Some(port.to_string().as_str())) + .run_with_zero_port() + .with_config(|config| assert_eq!(config.network.enr_quic6_port, Some(port))); +} +#[test] fn enr_tcp6_port_flag() { let port = unused_tcp6_port().expect("Unable to find unused port."); CommandLineTest::new() @@ -1262,7 +1325,7 @@ fn enr_match_flag_over_ipv4() { assert_eq!( config.network.listen_addrs().v4().map(|listen_addr| ( listen_addr.addr, - listen_addr.udp_port, + listen_addr.disc_port, listen_addr.tcp_port )), Some((addr, udp4_port, tcp4_port)) @@ -1287,7 +1350,7 @@ fn enr_match_flag_over_ipv6() { assert_eq!( config.network.listen_addrs().v6().map(|listen_addr| ( listen_addr.addr, - listen_addr.udp_port, + listen_addr.disc_port, listen_addr.tcp_port )), Some((addr, udp6_port, tcp6_port)) @@ -1319,7 +1382,7 @@ fn enr_match_flag_over_ipv4_and_ipv6() { assert_eq!( config.network.listen_addrs().v6().map(|listen_addr| ( listen_addr.addr, - listen_addr.udp_port, + listen_addr.disc_port, listen_addr.tcp_port )), Some((ipv6_addr, udp6_port, tcp6_port)) @@ -1327,7 +1390,7 @@ fn enr_match_flag_over_ipv4_and_ipv6() { assert_eq!( config.network.listen_addrs().v4().map(|listen_addr| ( listen_addr.addr, - listen_addr.udp_port, + listen_addr.disc_port, listen_addr.tcp_port )), Some((ipv4_addr, udp4_port, tcp4_port)) diff --git a/scripts/local_testnet/beacon_node.sh b/scripts/local_testnet/beacon_node.sh index 1a04d12d4a0..70a36614c7a 100755 --- a/scripts/local_testnet/beacon_node.sh +++ b/scripts/local_testnet/beacon_node.sh @@ -39,10 +39,11 @@ done # Get positional arguments data_dir=${@:$OPTIND+0:1} -network_port=${@:$OPTIND+1:1} -http_port=${@:$OPTIND+2:1} -execution_endpoint=${@:$OPTIND+3:1} -execution_jwt=${@:$OPTIND+4:1} +tcp_port=${@:$OPTIND+1:1} +quic_port=${@:$OPTIND+2:1} +http_port=${@:$OPTIND+3:1} +execution_endpoint=${@:$OPTIND+4:1} +execution_jwt=${@:$OPTIND+5:1} lighthouse_binary=lighthouse @@ -56,9 +57,11 @@ exec $lighthouse_binary \ --disable-peer-scoring \ --staking \ --enr-address 127.0.0.1 \ - --enr-udp-port $network_port \ - --enr-tcp-port $network_port \ - --port $network_port \ + --enr-udp-port $tcp_port \ + --enr-tcp-port $tcp_port \ + --enr-quic-port $quic_port \ + --port $tcp_port \ + --quic-port $quic_port \ --http-port $http_port \ --disable-packet-filter \ --target-peers $((BN_COUNT - 1)) \ diff --git a/scripts/local_testnet/start_local_testnet.sh b/scripts/local_testnet/start_local_testnet.sh index ce36966e279..c7f888a5e45 100755 --- a/scripts/local_testnet/start_local_testnet.sh +++ b/scripts/local_testnet/start_local_testnet.sh @@ -132,7 +132,7 @@ sed -i 's/"shardingForkTime".*$/"shardingForkTime": 0,/g' genesis.json for (( bn=1; bn<=$BN_COUNT; bn++ )); do secret=$DATADIR/geth_datadir$bn/geth/jwtsecret echo $secret - execute_command_add_PID beacon_node_$bn.log ./beacon_node.sh $SAS -d $DEBUG_LEVEL $DATADIR/node_$bn $((BN_udp_tcp_base + $bn)) $((BN_http_port_base + $bn)) http://localhost:$((EL_base_auth_http + $bn)) $secret + execute_command_add_PID beacon_node_$bn.log ./beacon_node.sh $SAS -d $DEBUG_LEVEL $DATADIR/node_$bn $((BN_udp_tcp_base + $bn)) $((BN_udp_tcp_base + $bn + 100)) $((BN_http_port_base + $bn)) http://localhost:$((EL_base_auth_http + $bn)) $secret done # Start requested number of validator clients diff --git a/scripts/tests/doppelganger_protection.sh b/scripts/tests/doppelganger_protection.sh index e9d3e39ce50..1eefa7cf522 100755 --- a/scripts/tests/doppelganger_protection.sh +++ b/scripts/tests/doppelganger_protection.sh @@ -43,23 +43,23 @@ sleep 10 echo "Starting local execution nodes" -exit_if_fails ../local_testnet/geth.sh $HOME/.lighthouse/local-testnet/geth_datadir1 7000 6000 5000 $genesis_file &> geth.log & -exit_if_fails ../local_testnet/geth.sh $HOME/.lighthouse/local-testnet/geth_datadir2 7100 6100 5100 $genesis_file &> /dev/null & -exit_if_fails ../local_testnet/geth.sh $HOME/.lighthouse/local-testnet/geth_datadir3 7200 6200 5200 $genesis_file &> /dev/null & +exit_if_fails ../local_testnet/geth.sh $HOME/.lighthouse/local-testnet/geth_datadir1 6000 5000 4000 $genesis_file &> geth.log & +exit_if_fails ../local_testnet/geth.sh $HOME/.lighthouse/local-testnet/geth_datadir2 6100 5100 4100 $genesis_file &> /dev/null & +exit_if_fails ../local_testnet/geth.sh $HOME/.lighthouse/local-testnet/geth_datadir3 6200 5200 4200 $genesis_file &> /dev/null & sleep 20 echo "Starting local beacon nodes" -exit_if_fails ../local_testnet/beacon_node.sh -d debug $HOME/.lighthouse/local-testnet/node_1 9000 8000 http://localhost:5000 $HOME/.lighthouse/local-testnet/geth_datadir1/geth/jwtsecret &> beacon1.log & -exit_if_fails ../local_testnet/beacon_node.sh $HOME/.lighthouse/local-testnet/node_2 9100 8100 http://localhost:5100 $HOME/.lighthouse/local-testnet/geth_datadir2/geth/jwtsecret &> /dev/null & -exit_if_fails ../local_testnet/beacon_node.sh $HOME/.lighthouse/local-testnet/node_3 9200 8200 http://localhost:5200 $HOME/.lighthouse/local-testnet/geth_datadir3/geth/jwtsecret &> /dev/null & +exit_if_fails ../local_testnet/beacon_node.sh -d debug $HOME/.lighthouse/local-testnet/node_1 8000 7000 9000 http://localhost:4000 $HOME/.lighthouse/local-testnet/geth_datadir1/geth/jwtsecret &> /dev/null & +exit_if_fails ../local_testnet/beacon_node.sh $HOME/.lighthouse/local-testnet/node_2 8100 7100 9100 http://localhost:4100 $HOME/.lighthouse/local-testnet/geth_datadir2/geth/jwtsecret &> /dev/null & +exit_if_fails ../local_testnet/beacon_node.sh $HOME/.lighthouse/local-testnet/node_3 8200 7200 9200 http://localhost:4200 $HOME/.lighthouse/local-testnet/geth_datadir3/geth/jwtsecret &> /dev/null & echo "Starting local validator clients" -exit_if_fails ../local_testnet/validator_client.sh $HOME/.lighthouse/local-testnet/node_1 http://localhost:8000 &> /dev/null & -exit_if_fails ../local_testnet/validator_client.sh $HOME/.lighthouse/local-testnet/node_2 http://localhost:8100 &> /dev/null & -exit_if_fails ../local_testnet/validator_client.sh $HOME/.lighthouse/local-testnet/node_3 http://localhost:8200 &> /dev/null & +exit_if_fails ../local_testnet/validator_client.sh $HOME/.lighthouse/local-testnet/node_1 http://localhost:9000 &> /dev/null & +exit_if_fails ../local_testnet/validator_client.sh $HOME/.lighthouse/local-testnet/node_2 http://localhost:9100 &> /dev/null & +exit_if_fails ../local_testnet/validator_client.sh $HOME/.lighthouse/local-testnet/node_3 http://localhost:9200 &> /dev/null & echo "Waiting an epoch before starting the next validator client" sleep $(( $SECONDS_PER_SLOT * 32 )) @@ -70,7 +70,7 @@ if [[ "$BEHAVIOR" == "failure" ]]; then # Use same keys as keys from VC1 and connect to BN2 # This process should not last longer than 2 epochs - timeout $(( $SECONDS_PER_SLOT * 32 * 2 )) ../local_testnet/validator_client.sh $HOME/.lighthouse/local-testnet/node_1_doppelganger http://localhost:8100 + timeout $(( $SECONDS_PER_SLOT * 32 * 2 )) ../local_testnet/validator_client.sh $HOME/.lighthouse/local-testnet/node_1_doppelganger http://localhost:9100 DOPPELGANGER_EXIT=$? echo "Shutting down" @@ -96,7 +96,7 @@ if [[ "$BEHAVIOR" == "success" ]]; then echo "Starting the last validator client" - ../local_testnet/validator_client.sh $HOME/.lighthouse/local-testnet/node_4 http://localhost:8100 & + ../local_testnet/validator_client.sh $HOME/.lighthouse/local-testnet/node_4 http://localhost:9100 & DOPPELGANGER_FAILURE=0 # Sleep three epochs, then make sure all validators were active in epoch 2. Use @@ -110,7 +110,7 @@ if [[ "$BEHAVIOR" == "success" ]]; then cd $HOME/.lighthouse/local-testnet/node_4/validators for val in 0x*; do [[ -e $val ]] || continue - curl -s localhost:8100/lighthouse/validator_inclusion/3/$val | jq | grep -q '"is_previous_epoch_target_attester": false' + curl -s localhost:9100/lighthouse/validator_inclusion/3/$val | jq | grep -q '"is_previous_epoch_target_attester": false' IS_ATTESTER=$? if [[ $IS_ATTESTER -eq 0 ]]; then echo "$val did not attest in epoch 2." @@ -128,7 +128,7 @@ if [[ "$BEHAVIOR" == "success" ]]; then sleep $(( $SECONDS_PER_SLOT * 32 * 2 )) for val in 0x*; do [[ -e $val ]] || continue - curl -s localhost:8100/lighthouse/validator_inclusion/5/$val | jq | grep -q '"is_previous_epoch_target_attester": true' + curl -s localhost:9100/lighthouse/validator_inclusion/5/$val | jq | grep -q '"is_previous_epoch_target_attester": true' IS_ATTESTER=$? if [[ $IS_ATTESTER -eq 0 ]]; then echo "$val attested in epoch 4." @@ -154,4 +154,4 @@ if [[ "$BEHAVIOR" == "success" ]]; then fi fi -exit 0 \ No newline at end of file +exit 0 diff --git a/testing/node_test_rig/src/lib.rs b/testing/node_test_rig/src/lib.rs index 3cd8205eb65..d842a72747b 100644 --- a/testing/node_test_rig/src/lib.rs +++ b/testing/node_test_rig/src/lib.rs @@ -98,7 +98,7 @@ pub fn testing_client_config() -> ClientConfig { // Setting ports to `0` means that the OS will choose some available port. client_config .network - .set_ipv4_listening_address(std::net::Ipv4Addr::UNSPECIFIED, 0, 0); + .set_ipv4_listening_address(std::net::Ipv4Addr::UNSPECIFIED, 0, 0, 0); client_config.network.upnp_enabled = false; client_config.http_api.enabled = true; client_config.http_api.listen_port = 0; diff --git a/testing/simulator/src/local_network.rs b/testing/simulator/src/local_network.rs index e35870d126c..69fa8ded023 100644 --- a/testing/simulator/src/local_network.rs +++ b/testing/simulator/src/local_network.rs @@ -14,6 +14,7 @@ use std::{sync::Arc, time::Duration}; use types::{Epoch, EthSpec}; const BOOTNODE_PORT: u16 = 42424; +const QUIC_PORT: u16 = 43424; pub const INVALID_ADDRESS: &str = "http://127.0.0.1:42423"; pub const EXECUTION_PORT: u16 = 4000; @@ -63,6 +64,7 @@ impl LocalNetwork { std::net::Ipv4Addr::UNSPECIFIED, BOOTNODE_PORT, BOOTNODE_PORT, + QUIC_PORT, ); beacon_config.network.enr_udp4_port = Some(BOOTNODE_PORT); beacon_config.network.enr_tcp4_port = Some(BOOTNODE_PORT); @@ -154,6 +156,7 @@ impl LocalNetwork { std::net::Ipv4Addr::UNSPECIFIED, BOOTNODE_PORT + count, BOOTNODE_PORT + count, + QUIC_PORT + count, ); beacon_config.network.enr_udp4_port = Some(BOOTNODE_PORT + count); beacon_config.network.enr_tcp4_port = Some(BOOTNODE_PORT + count);