From 198391032889579a7ac8812f4a602a2d941748bc Mon Sep 17 00:00:00 2001 From: jonay2000 Date: Tue, 11 Jun 2019 22:00:14 +0200 Subject: [PATCH 01/10] Added the basis of communities. Co-authored-by: Victor Roest --- Cargo.toml | 2 + src/community/basecommunity/mod.rs | 0 src/community/mod.rs | 210 +++++++++++++++++++++++++++++ src/community/peer.rs | 18 +++ src/configuration.rs | 8 +- src/crypto/keytypes.rs | 29 +++- src/lib.rs | 8 +- src/serialization/mod.rs | 27 +++- tests/testpyipv8packets.rs | 30 ++--- 9 files changed, 308 insertions(+), 24 deletions(-) create mode 100644 src/community/basecommunity/mod.rs create mode 100644 src/community/mod.rs create mode 100644 src/community/peer.rs diff --git a/Cargo.toml b/Cargo.toml index 672a4ddf..55c4f478 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,8 @@ serde = { version = "1.0", features = ["derive"] } bincode = "1.1.3" rust_sodium = "0.10.2" openssl = { version = "0.10", features = ["vendored"] } +lazy_static = "1.2.0" +log = "0.4" [dev-dependencies] criterion = "0.2.11" diff --git a/src/community/basecommunity/mod.rs b/src/community/basecommunity/mod.rs new file mode 100644 index 00000000..e69de29b diff --git a/src/community/mod.rs b/src/community/mod.rs new file mode 100644 index 00000000..05189646 --- /dev/null +++ b/src/community/mod.rs @@ -0,0 +1,210 @@ +use crate::serialization::{Packet, PacketDeserializer}; +use crate::serialization::header::Header; +use std::error::Error; +use std::collections::HashMap; +use crate::networking::address::Address; +use std::fmt; + +pub mod basecommunity; +pub mod peer; + +create_error!( + HeaderUnwrapError, + "The community experienced an error trying to deserialize the header of a packet" +); +create_error!(MidError, "Failed to get the mid"); +create_error!( + UnknownCommunityError, + "No community with matching mid found" +); +create_error!(InsertionError, "Error inserting community into hashmap"); + +/// Example Community +/// +/// _**Note:** Try to avoid the use of .unwrap() in actual production code, this is just an example_ +/// ``` +/// use ipv8::community::peer::Peer; +/// use ipv8::community::Community; +/// use ipv8::serialization::header::Header; +/// use ipv8::serialization::{PacketDeserializer, Packet}; +/// use std::net::Ipv4Addr; +/// use ipv8::networking::address::Address; +/// use std::error::Error; +/// use ipv8::IPv8; +/// use ipv8::configuration::Config; +/// use ipv8::serialization::header::HeaderVersion::PyIPV8Header; +/// use ipv8::crypto::keytypes::PublicKey; +/// +/// pub struct TestCommunity{ +/// peer: Peer +/// } +/// +/// impl TestCommunity{ +/// fn new() -> Option { +/// +/// // Use the highest available key +/// let pk: PublicKey = PublicKey::from_vec(vec![ +/// 48, 129, 167, 48, 16, 6, 7, 42, 134, 72, 206, 61, 2, 1, 6, 5, 43, 129, 4, 0, 39, 3, +/// 129, 146, 0, 4, 2, 86, 251, 75, 206, 159, 133, 120, 63, 176, 235, 178, 14, 8, 197, 59, +/// 107, 51, 179, 139, 3, 155, 20, 194, 112, 113, 15, 40, 67, 115, 37, 223, 152, 7, 102, +/// 154, 214, 90, 110, 180, 226, 5, 190, 99, 163, 54, 116, 173, 121, 40, 80, 129, 142, 82, +/// 118, 154, 96, 127, 164, 248, 217, 91, 13, 80, 91, 94, 210, 16, 110, 108, 41, 57, 4, +/// 243, 49, 52, 194, 254, 130, 98, 229, 50, 84, 21, 206, 134, 223, 157, 189, 133, 50, 210, +/// 181, 93, 229, 32, 179, 228, 179, 132, 143, 147, 96, 207, 68, 48, 184, 160, 47, 227, 70, +/// 147, 23, 159, 213, 105, 134, 60, 211, 226, 8, 235, 186, 20, 241, 85, 170, 4, 3, 40, +/// 183, 98, 103, 80, 164, 128, 87, 205, 101, 67, 254, 83, 142, 133, +/// ])?; +/// +/// // Actually create the community +/// Some(TestCommunity { +/// peer: Peer::new( +/// pk, +/// Address{ +/// address: Ipv4Addr::new(0,0,0,0), +/// port: 0 +/// }, +/// true, +/// ) +/// }) +/// } +/// } +/// +/// impl Community for TestCommunity{ +/// +/// // Returns the hash of our master peer +/// fn get_mid(&self) -> Option> { +/// Some(self.peer.get_sha1()?.to_vec()) +/// } +/// +/// // The function which will be called when the community receives a packet +/// fn on_receive(&self, header: Header, deserializer: PacketDeserializer, address: Address) -> Result<(),Box>{ +/// # assert_eq!(header.mid_hash, self.get_mid()); +/// # assert_eq!(header.version, PyIPV8Header); +/// # assert_eq!(header.message_type, Some(42)); +/// // Do some stuff here like to distribute the message based on it's message_type (in the header) +/// Ok(()) +/// } +/// } +/// +/// let mut config = Config::default(); +/// let community = TestCommunity::new().unwrap(); +/// let mid = community.get_mid(); +/// config.communities.add_community(Box::new(community)); +/// let ipv8 = IPv8::new(config); +/// +/// // now simulate a packet coming in +/// +/// // Create a packet to test the community with +/// let packet = Packet::new(Header{ +/// size: 23, +/// version: PyIPV8Header, +/// mid_hash: mid, +/// message_type: Some(42), +/// }).unwrap(); +/// +/// // Normally you would want to sign the packet here +/// +/// // Yeet the packet +/// ipv8.config.communities.forward_message(packet,Address{ +/// address: Ipv4Addr::new(42,42,42,42), +/// port: 42, +/// }); +/// +/// ``` +pub trait Community { + // fn new() -> Option + // where + // Self: Sized; + + /// Returns the hash of our master p + fn get_mid(&self) -> Option>; + + /// Gets called whenever a packet is received directed at this community + /// DO NOT OVERRIDE + fn receive( + &self, + header: Header, + deserializer: PacketDeserializer, + address: Address, + ) -> Result<(), Box> { + // DO NOT OVERRIDE + //! used to pre-decode the header and filter out messages + + fn warn_deprecated(message: &str, address: Address) -> Result<(), Box> { + warn!( + "Received deprecated message {} from ({:?})", + message, address + ); + Ok(()) + } + return match header.message_type.ok_or(HeaderUnwrapError)? { + 255 => warn_deprecated("reserved-255", address), + 254 => warn_deprecated("on-missing-sequence", address), + 253 => warn_deprecated("missing-proof", address), + 252 => warn_deprecated("signature-request", address), + 251 => warn_deprecated("signature-response", address), + 248 => warn_deprecated("on-identity", address), + 247 => warn_deprecated("on-missing-identity", address), + 244 => warn_deprecated("destroy-community", address), + 243 => warn_deprecated("authorize", address), + 242 => warn_deprecated("revoke", address), + 241 => warn_deprecated("subjective-set", address), + 240 => warn_deprecated("missing-subjective-set", address), + 239 => warn_deprecated("on-missing-message", address), + 238 => warn_deprecated("undo-own", address), + 237 => warn_deprecated("undo-other", address), + 236 => warn_deprecated("dynamic-settings", address), + 235 => warn_deprecated("missing-last-message", address), + _ => self.on_receive(header, deserializer, address), + }; + } + + fn on_receive( + &self, + header: Header, + deserializer: PacketDeserializer, + address: Address, + ) -> Result<(), Box>; +} + +pub struct CommunityRegistry { + // mid, community + communities: HashMap, Box>, +} + +impl CommunityRegistry { + pub fn add_community(&mut self, item: Box) -> Result<(), Box> { + self.communities + .insert(item.get_mid().ok_or(MidError)?, item) + .ok_or(InsertionError)?; + Ok(()) + } + + /// Forwards the message to the corresponding community + pub fn forward_message(&self, packet: Packet, address: Address) -> Result<(), Box> { + // deserialize the header + let mut deserializer = packet.start_deserialize(); + + // We use peek here instead of get, even though we give the header along with the receive call + // this is because at this point, the header is not verified yet so we still assume the message is valid. + // We can't verify the header here yet as not all messages have a signature. Communities will have to decide + // on their own if they want to verify the header. We do give it along as only having to deserialize the header once + // makes it slightly more efficient. + let header = deserializer.peek_header()?; + + // get the mid from the header and use it for a hashtable lookup + let mid = header.mid_hash.as_ref().ok_or(MidError)?; + let community = self.communities.get(mid).ok_or(UnknownCommunityError)?; + + // Actually forward it + community.receive(header, deserializer, address) + } +} + +impl Default for CommunityRegistry { + fn default() -> Self { + Self { + communities: HashMap::new(), + } + } +} diff --git a/src/community/peer.rs b/src/community/peer.rs new file mode 100644 index 00000000..ab47ddd7 --- /dev/null +++ b/src/community/peer.rs @@ -0,0 +1,18 @@ +use crate::crypto::keytypes::PublicKey; +use crate::networking::address::Address; + +pub struct Peer{ + key: PublicKey, + address: Address, + intro: bool +} + +impl Peer{ + pub fn new(key: PublicKey, address: Address, intro: bool) -> Self{ + Self{key,address,intro} + } + + pub fn get_sha1(&self) -> Option<[u8; 20]>{ + self.key.sha1() + } +} diff --git a/src/configuration.rs b/src/configuration.rs index c9f56a48..9b377fac 100644 --- a/src/configuration.rs +++ b/src/configuration.rs @@ -1,5 +1,6 @@ use crate::networking::address::Address; use std::net::Ipv4Addr; +use crate::community::CommunityRegistry; pub struct Config { /// Default list of host used for peer discovery @@ -7,9 +8,12 @@ pub struct Config { /// from py-ipv8 configuration. UDP socket address. /// There split up in "address" and "port" pub socketaddress: Address, + + /// The registry containing all the communities + pub communities: CommunityRegistry } -impl Default for Config { +impl Default for Config{ fn default() -> Self { Config { socketaddress: Address { @@ -17,6 +21,8 @@ impl Default for Config { port: 8090, }, + communities: CommunityRegistry::default(), + default_hosts: vec![ // Dispersy Address { diff --git a/src/crypto/keytypes.rs b/src/crypto/keytypes.rs index 0fb8b772..a3e95301 100644 --- a/src/crypto/keytypes.rs +++ b/src/crypto/keytypes.rs @@ -16,9 +16,11 @@ //! | HIGH_SIGNATURE_LENGTH | 144 | 153 | 9 | 144 | //! | ED25519_SIGNATURE_LENGTH | 64 | 64 | 0 | 64 | +use std::fmt; + use openssl; +use openssl::sha::sha1; use rust_sodium::crypto::sign::ed25519; -use std::fmt; // TODO: when ed25519 becomes available for rust OpenSSL, rust_sodium will be removed. @@ -129,6 +131,10 @@ impl PublicKey { Some(m) } } + + pub fn sha1(&self) -> Option<[u8; 20]> { + Some(sha1(&self.to_vec()?)) + } } impl PartialEq for PublicKey { @@ -350,4 +356,25 @@ mod tests { keyvec ); } + + #[test] + fn test_sha1() { + let keyvec = vec![ + 48, 64, 48, 16, 6, 7, 42, 134, 72, 206, 61, 2, 1, 6, 5, 43, 129, 4, 0, 1, 3, 44, 0, 4, + 0, 80, 239, 172, 104, 165, 76, 172, 6, 229, 136, 156, 105, 23, 249, 46, 30, 148, 87, + 105, 57, 6, 105, 134, 2, 229, 115, 169, 44, 162, 41, 190, 228, 56, 20, 100, 64, 79, + 167, 224, 118, 14, + ]; + + let key = PublicKey::from_vec(keyvec.clone()).unwrap(); + let sha1sum = key.sha1().unwrap(); + + assert_eq!( + [ + 254, 127, 138, 49, 67, 126, 206, 58, 169, 85, 132, 14, 211, 247, 13, 170, 244, 166, + 152, 180, + ], + sha1sum + ) + } } diff --git a/src/lib.rs b/src/lib.rs index 479d1463..52882b82 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,10 @@ +#[macro_use] +extern crate log; + pub mod error; pub mod serialization; +pub mod community; pub mod configuration; pub mod crypto; pub mod event; @@ -17,10 +21,10 @@ use configuration::Config; /// use ipv8::IPv8; /// use ipv8::configuration::Config; /// -/// let ipv8_instance = IPv8::new(Config::default()); +/// let ipv8 = IPv8::new(Config::default()); /// ``` pub struct IPv8 { - config: Config, + pub config: Config, } impl IPv8 { diff --git a/src/serialization/mod.rs b/src/serialization/mod.rs index 8e20ed62..93c3d290 100644 --- a/src/serialization/mod.rs +++ b/src/serialization/mod.rs @@ -22,13 +22,13 @@ create_error!(HeaderError, "The supplied header was invalid"); pub struct Packet(pub Vec); #[derive(Debug, PartialEq)] -pub struct PacketIterator { +pub struct PacketDeserializer { pub pntr: Packet, pub index: usize, } /// iterates over a packet to extract it's possibly multiple payloads -impl PacketIterator { +impl PacketDeserializer { /// Deserializes a stream of bytes into an ipv8 payload. Which payload is inferred by the type of T which is generic. /// T has to be deserializable and implement the Ipv8Payload trait. pub fn next_payload(&mut self) -> Result> @@ -57,7 +57,7 @@ impl PacketIterator { Ok(res) } - pub fn get_header(&mut self) -> Result> { + pub fn pop_header(&mut self) -> Result> { let res: Header = bincode::config() .big_endian() .deserialize(&self.pntr.0[self.index..])?; @@ -65,6 +65,13 @@ impl PacketIterator { Ok(res) } + pub fn peek_header(&self) -> Result> { + let res: Header = bincode::config() + .big_endian() + .deserialize(&self.pntr.0[self.index..])?; + Ok(res) + } + pub fn skip_header(mut self) -> Result> { let res: Header = bincode::config() .big_endian() @@ -135,8 +142,8 @@ impl Packet { } /// Deserializes a stream of bytes into ipv8 payloads. - pub fn start_deserialize(self) -> PacketIterator { - PacketIterator { + pub fn start_deserialize(self) -> PacketDeserializer { + PacketDeserializer { pntr: self, index: 0, } @@ -221,6 +228,16 @@ mod tests { // }); // } + #[test] + fn test_peek_header() { + let packet = Packet::new(create_test_header!()).unwrap(); + let mut deserializer = packet.start_deserialize(); + let header1 = deserializer.peek_header().unwrap(); + let header2 = deserializer.peek_header().unwrap(); + + assert_eq!(header1, header2); + } + #[test] fn test_sign_verify_verylow() { let a = TestPayload1 { test: 42 }; diff --git a/tests/testpyipv8packets.rs b/tests/testpyipv8packets.rs index b306619c..caa42c9b 100644 --- a/tests/testpyipv8packets.rs +++ b/tests/testpyipv8packets.rs @@ -32,7 +32,7 @@ fn test_packet_1() { ]); let mut deserializer = data.start_deserialize(); - let header: Header = deserializer.get_header().unwrap(); + let header: Header = deserializer.pop_header().unwrap(); assert_eq!( header, Header::py_ipv8_header( @@ -102,7 +102,7 @@ fn test_packet_2() { ]); let mut deserializer = data.start_deserialize(); - let header: Header = deserializer.get_header().unwrap(); + let header: Header = deserializer.pop_header().unwrap(); assert_eq!( header, Header::py_ipv8_header( @@ -156,7 +156,7 @@ fn test_packet_3() { ]); let mut deserializer = data.start_deserialize(); - let header: Header = deserializer.get_header().unwrap(); + let header: Header = deserializer.pop_header().unwrap(); assert_eq!( header, Header::py_ipv8_header( @@ -211,7 +211,7 @@ fn test_packet_4() { ]); let mut deserializer = data.start_deserialize(); - let header: Header = deserializer.get_header().unwrap(); + let header: Header = deserializer.pop_header().unwrap(); assert_eq!( header, Header::py_ipv8_header( @@ -266,7 +266,7 @@ fn test_packet_5() { ]); let mut deserializer = data.start_deserialize(); - let header: Header = deserializer.get_header().unwrap(); + let header: Header = deserializer.pop_header().unwrap(); assert_eq!( header, Header::py_ipv8_header( @@ -321,7 +321,7 @@ fn test_packet_6() { ]); let mut deserializer = data.start_deserialize(); - let header: Header = deserializer.get_header().unwrap(); + let header: Header = deserializer.pop_header().unwrap(); assert_eq!( header, Header::py_ipv8_header( @@ -376,7 +376,7 @@ fn test_packet_7() { ]); let mut deserializer = data.start_deserialize(); - let header: Header = deserializer.get_header().unwrap(); + let header: Header = deserializer.pop_header().unwrap(); assert_eq!( header, Header::py_ipv8_header( @@ -431,7 +431,7 @@ fn test_packet_8() { 0xc1, 0xb3, 0x53, 0x92, 0x1f, 0x58, 0xb6, 0x81, 0x0d, ]); let mut deserializer = data.start_deserialize(); - let header: Header = deserializer.get_header().unwrap(); + let header: Header = deserializer.pop_header().unwrap(); assert_eq!( header, @@ -503,7 +503,7 @@ fn test_packet_9() { 0xe7, 0x6f, 0xea, 0x90, 0x43, 0xfd, 0x5f, 0x93, 0x04, ]); let mut deserializer = data.start_deserialize(); - let header: Header = deserializer.get_header().unwrap(); + let header: Header = deserializer.pop_header().unwrap(); assert_eq!( header, @@ -581,7 +581,7 @@ fn test_packet_10() { 0xa2, ]); let mut deserializer = data.start_deserialize(); - let header: Header = deserializer.get_header().unwrap(); + let header: Header = deserializer.pop_header().unwrap(); assert!(deserializer.verify()); @@ -652,7 +652,7 @@ fn test_packet_11() { 0xa3, ]); let mut deserializer = data.start_deserialize(); - let header: Header = deserializer.get_header().unwrap(); + let header: Header = deserializer.pop_header().unwrap(); assert!(deserializer.verify()); @@ -723,7 +723,7 @@ fn test_packet_12() { 0xb8, ]); let mut deserializer = data.start_deserialize(); - let header: Header = deserializer.get_header().unwrap(); + let header: Header = deserializer.pop_header().unwrap(); assert!(deserializer.verify()); @@ -793,7 +793,7 @@ fn test_packet_13() { 0x56, ]); let mut deserializer = data.start_deserialize(); - let header: Header = deserializer.get_header().unwrap(); + let header: Header = deserializer.pop_header().unwrap(); assert!(deserializer.verify()); @@ -863,7 +863,7 @@ fn test_packet_14() { 0xd8, ]); let mut deserializer = data.start_deserialize(); - let header: Header = deserializer.get_header().unwrap(); + let header: Header = deserializer.pop_header().unwrap(); assert!(deserializer.verify()); @@ -928,7 +928,7 @@ fn test_packet_15() { 0x0b, ]); let mut deserializer = data.start_deserialize(); - let header: Header = deserializer.get_header().unwrap(); + let header: Header = deserializer.pop_header().unwrap(); assert!(deserializer.verify()); From f9cdb0c3b744299dde5e6fad6aec6adeb3bdb578 Mon Sep 17 00:00:00 2001 From: Victor Roest Date: Tue, 11 Jun 2019 22:31:23 +0200 Subject: [PATCH 02/10] Minor refactor --- src/community/basecommunity/mod.rs | 0 src/community/mod.rs | 9 ++++----- src/community/peer.rs | 16 ++++++++++------ src/configuration.rs | 4 ++-- src/serialization/mod.rs | 17 ++++++----------- 5 files changed, 22 insertions(+), 24 deletions(-) delete mode 100644 src/community/basecommunity/mod.rs diff --git a/src/community/basecommunity/mod.rs b/src/community/basecommunity/mod.rs deleted file mode 100644 index e69de29b..00000000 diff --git a/src/community/mod.rs b/src/community/mod.rs index 05189646..4ab95106 100644 --- a/src/community/mod.rs +++ b/src/community/mod.rs @@ -5,7 +5,6 @@ use std::collections::HashMap; use crate::networking::address::Address; use std::fmt; -pub mod basecommunity; pub mod peer; create_error!( @@ -116,7 +115,7 @@ pub trait Community { // where // Self: Sized; - /// Returns the hash of our master p + /// Returns the hash of our master peer public key fn get_mid(&self) -> Option>; /// Gets called whenever a packet is received directed at this community @@ -137,7 +136,7 @@ pub trait Community { ); Ok(()) } - return match header.message_type.ok_or(HeaderUnwrapError)? { + match header.message_type.ok_or(HeaderUnwrapError)? { 255 => warn_deprecated("reserved-255", address), 254 => warn_deprecated("on-missing-sequence", address), 253 => warn_deprecated("missing-proof", address), @@ -156,7 +155,7 @@ pub trait Community { 236 => warn_deprecated("dynamic-settings", address), 235 => warn_deprecated("missing-last-message", address), _ => self.on_receive(header, deserializer, address), - }; + } } fn on_receive( @@ -183,7 +182,7 @@ impl CommunityRegistry { /// Forwards the message to the corresponding community pub fn forward_message(&self, packet: Packet, address: Address) -> Result<(), Box> { // deserialize the header - let mut deserializer = packet.start_deserialize(); + let deserializer = packet.start_deserialize(); // We use peek here instead of get, even though we give the header along with the receive call // this is because at this point, the header is not verified yet so we still assume the message is valid. diff --git a/src/community/peer.rs b/src/community/peer.rs index ab47ddd7..583e69d0 100644 --- a/src/community/peer.rs +++ b/src/community/peer.rs @@ -1,18 +1,22 @@ use crate::crypto::keytypes::PublicKey; use crate::networking::address::Address; -pub struct Peer{ +pub struct Peer { key: PublicKey, address: Address, - intro: bool + intro: bool, } -impl Peer{ - pub fn new(key: PublicKey, address: Address, intro: bool) -> Self{ - Self{key,address,intro} +impl Peer { + pub fn new(key: PublicKey, address: Address, intro: bool) -> Self { + Self { + key, + address, + intro, + } } - pub fn get_sha1(&self) -> Option<[u8; 20]>{ + pub fn get_sha1(&self) -> Option<[u8; 20]> { self.key.sha1() } } diff --git a/src/configuration.rs b/src/configuration.rs index 9b377fac..dc7a0e77 100644 --- a/src/configuration.rs +++ b/src/configuration.rs @@ -10,10 +10,10 @@ pub struct Config { pub socketaddress: Address, /// The registry containing all the communities - pub communities: CommunityRegistry + pub communities: CommunityRegistry, } -impl Default for Config{ +impl Default for Config { fn default() -> Self { Config { socketaddress: Address { diff --git a/src/serialization/mod.rs b/src/serialization/mod.rs index 93c3d290..634ea851 100644 --- a/src/serialization/mod.rs +++ b/src/serialization/mod.rs @@ -57,26 +57,21 @@ impl PacketDeserializer { Ok(res) } - pub fn pop_header(&mut self) -> Result> { + pub fn peek_header(&self) -> Result> { let res: Header = bincode::config() .big_endian() .deserialize(&self.pntr.0[self.index..])?; - self.index += res.size; Ok(res) } - pub fn peek_header(&self) -> Result> { - let res: Header = bincode::config() - .big_endian() - .deserialize(&self.pntr.0[self.index..])?; + pub fn pop_header(&mut self) -> Result> { + let res = self.peek_header()?; + self.index += res.size; Ok(res) } pub fn skip_header(mut self) -> Result> { - let res: Header = bincode::config() - .big_endian() - .deserialize(&self.pntr.0[self.index..])?; - self.index += res.size; + self.pop_header()?; Ok(self) } @@ -231,7 +226,7 @@ mod tests { #[test] fn test_peek_header() { let packet = Packet::new(create_test_header!()).unwrap(); - let mut deserializer = packet.start_deserialize(); + let deserializer = packet.start_deserialize(); let header1 = deserializer.peek_header().unwrap(); let header2 = deserializer.peek_header().unwrap(); From e61e09c0f8acbfd996d282581ad882ab141372f4 Mon Sep 17 00:00:00 2001 From: jonay2000 Date: Tue, 11 Jun 2019 22:35:39 +0200 Subject: [PATCH 03/10] formatted --- src/crypto/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/crypto/mod.rs b/src/crypto/mod.rs index 788dd1b7..65f3f58d 100644 --- a/src/crypto/mod.rs +++ b/src/crypto/mod.rs @@ -91,7 +91,7 @@ pub fn verify_signature_openssl( mod tests { use crate::crypto::{ create_signature_openssl, verify_signature_ed25519, verify_signature_openssl, SizeError, -}; + }; use openssl::bn::BigNum; use rust_sodium::crypto::sign::ed25519; use std::error::Error; From 4c3aae47195d126bcb320eb37c83d6d169db01d9 Mon Sep 17 00:00:00 2001 From: Victor Roest Date: Tue, 11 Jun 2019 22:41:42 +0200 Subject: [PATCH 04/10] Make sure pre-commit runs on the stable branch of rust --- hooks/pre-commit | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hooks/pre-commit b/hooks/pre-commit index 1768075b..dbf281a9 100755 --- a/hooks/pre-commit +++ b/hooks/pre-commit @@ -23,7 +23,7 @@ fi # Check rustfmt against the git tree printf "${PREFIX} Checking formatting ... " -command cargo fmt -- --check > /dev/null +command cargo +stable fmt -- --check > /dev/null if [[ $? == 0 ]]; then printf "${SUCCESS}\n" exit 0 From c091b5e43b50ed983ecee443cfda8594670a4686 Mon Sep 17 00:00:00 2001 From: Victor Roest Date: Tue, 11 Jun 2019 23:21:02 +0200 Subject: [PATCH 05/10] Improved pre-commit hook some more for easier use --- hooks/pre-commit | 5 +++-- src/crypto/mod.rs | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/hooks/pre-commit b/hooks/pre-commit index dbf281a9..ec5b30dd 100755 --- a/hooks/pre-commit +++ b/hooks/pre-commit @@ -13,11 +13,12 @@ SUCCESS="${GREEN}ok${RC}" # Check if rustfmt is installed printf "${PREFIX} Checking for rustfmt ... " -command -v cargo fmt &>/dev/null -if [[ $? == 0 ]]; then +OUT=$(cargo +stable fmt --version 2>&1 > /dev/null ) # Save the output of the command to OUT for later printing +if [[ $? -eq 0 ]]; then printf "${SUCCESS}\n" else printf "${FAILURE}\n" + printf "${OUT}\n" exit 1 fi diff --git a/src/crypto/mod.rs b/src/crypto/mod.rs index 65f3f58d..788dd1b7 100644 --- a/src/crypto/mod.rs +++ b/src/crypto/mod.rs @@ -91,7 +91,7 @@ pub fn verify_signature_openssl( mod tests { use crate::crypto::{ create_signature_openssl, verify_signature_ed25519, verify_signature_openssl, SizeError, - }; +}; use openssl::bn::BigNum; use rust_sodium::crypto::sign::ed25519; use std::error::Error; From d8d98787ab461fd74578dc0e3691f731ebb3427c Mon Sep 17 00:00:00 2001 From: jonay2000 Date: Thu, 13 Jun 2019 01:53:58 +0200 Subject: [PATCH 06/10] added the network io interface. This system creates sockets and distributes all incoming messages over a threadpool with a task stealing algorithm Co-authored-by: Victor Roest --- Cargo.toml | 3 + src/community/mod.rs | 3 +- src/configuration.rs | 18 ++ src/crypto/mod.rs | 3 +- src/crypto/signature.rs | 1 - src/error.rs | 7 +- src/lib.rs | 16 +- src/networking/connection.rs | 27 --- src/networking/connector.rs | 1 - src/networking/mod.rs | 211 +++++++++++++++++- .../binmemberauthenticationpayload.rs | 12 +- src/serialization/header.rs | 35 +-- src/serialization/mod.rs | 22 +- src/serialization/varlen.rs | 4 +- 14 files changed, 296 insertions(+), 67 deletions(-) delete mode 100644 src/networking/connection.rs delete mode 100644 src/networking/connector.rs diff --git a/Cargo.toml b/Cargo.toml index 55c4f478..a9b2e09a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,9 +30,12 @@ rust_sodium = "0.10.2" openssl = { version = "0.10", features = ["vendored"] } lazy_static = "1.2.0" log = "0.4" +mio = "0.6.19" +rayon = "1.0.3" [dev-dependencies] criterion = "0.2.11" +simple_logger = "1.3.0" [[bench]] name = "bench_crypto" diff --git a/src/community/mod.rs b/src/community/mod.rs index 4ab95106..2059cf9c 100644 --- a/src/community/mod.rs +++ b/src/community/mod.rs @@ -3,7 +3,6 @@ use crate::serialization::header::Header; use std::error::Error; use std::collections::HashMap; use crate::networking::address::Address; -use std::fmt; pub mod peer; @@ -89,7 +88,7 @@ create_error!(InsertionError, "Error inserting community into hashmap"); /// let community = TestCommunity::new().unwrap(); /// let mid = community.get_mid(); /// config.communities.add_community(Box::new(community)); -/// let ipv8 = IPv8::new(config); +/// let ipv8 = IPv8::new(config).unwrap(); /// /// // now simulate a packet coming in /// diff --git a/src/configuration.rs b/src/configuration.rs index dc7a0e77..6ad2a3fb 100644 --- a/src/configuration.rs +++ b/src/configuration.rs @@ -1,8 +1,19 @@ use crate::networking::address::Address; use std::net::Ipv4Addr; use crate::community::CommunityRegistry; +use std::time::Duration; pub struct Config { + /// the amount of space reserved for queueing up incoming messages (messages) + pub queuesize: usize, + /// the size of the buffer reserved for incoming messages (bytes) + pub buffersize: usize, + /// frequency at which polling times out and events are checked (ms) + /// None is as fast as possible + pub pollinterval: Option, + /// the max number of threads to use in the network manager. 0 is #cores + pub threadcount: usize, + /// Default list of host used for peer discovery pub default_hosts: Vec
, /// from py-ipv8 configuration. UDP socket address. @@ -16,6 +27,13 @@ pub struct Config { impl Default for Config { fn default() -> Self { Config { + queuesize: 100, + buffersize: 2048, + pollinterval: None, + + // zero means equal to number of cores + threadcount: 0, + socketaddress: Address { address: Ipv4Addr::new(0, 0, 0, 0), port: 8090, diff --git a/src/crypto/mod.rs b/src/crypto/mod.rs index 65f3f58d..ebcdbeae 100644 --- a/src/crypto/mod.rs +++ b/src/crypto/mod.rs @@ -9,7 +9,6 @@ use openssl::sign::Signer; use rust_sodium; use rust_sodium::crypto::sign::ed25519; use std::error::Error; -use std::fmt; use std::os::raw::c_int; create_error!(SignatureError, "Invalid signature"); @@ -34,7 +33,7 @@ pub fn verify_signature_ed25519( pkey: ed25519::PublicKey, ) -> Result> { let verify = ed25519::verify_detached( - &ed25519::Signature::from_slice(&*signature).ok_or(Box::new(SignatureError))?, + &ed25519::Signature::from_slice(&*signature).ok_or_else(|| Box::new(SignatureError))?, data, &pkey, ); diff --git a/src/crypto/signature.rs b/src/crypto/signature.rs index 2b196b79..68cefa12 100644 --- a/src/crypto/signature.rs +++ b/src/crypto/signature.rs @@ -1,5 +1,4 @@ use std::error::Error; -use std::fmt; use openssl::bn::BigNum; use serde::ser::SerializeTuple; diff --git a/src/error.rs b/src/error.rs index 92146636..ba9d64e4 100644 --- a/src/error.rs +++ b/src/error.rs @@ -5,13 +5,13 @@ macro_rules! create_error { #[derive(Debug)] pub struct $name; - impl fmt::Display for $name { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + impl std::fmt::Display for $name { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { write!(f, $message) } } - impl Error for $name { + impl std::error::Error for $name { fn description(&self) -> &str { $message } @@ -22,7 +22,6 @@ macro_rules! create_error { #[cfg(test)] mod tests { use std::error::Error; - use std::fmt; #[test] fn test_errors() { diff --git a/src/lib.rs b/src/lib.rs index 52882b82..288a81b7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,6 +12,8 @@ pub mod networking; pub mod payloads; use configuration::Config; +use crate::networking::NetworkManager; +use std::error::Error; /// The IPv8 instance. /// This struct is how you can interact with the network. @@ -25,10 +27,20 @@ use configuration::Config; /// ``` pub struct IPv8 { pub config: Config, + pub networkmanager: NetworkManager, } impl IPv8 { - pub fn new(config: configuration::Config) -> Self { - IPv8 { config } + pub fn new(config: configuration::Config) -> Result> { + let networkmanager = + NetworkManager::new(&config.socketaddress, config.threadcount.to_owned())?; + Ok(IPv8 { + config, + networkmanager, + }) + } + + pub fn start(self) { + self.networkmanager.start(&self.config); } } diff --git a/src/networking/connection.rs b/src/networking/connection.rs deleted file mode 100644 index 62bc2a3f..00000000 --- a/src/networking/connection.rs +++ /dev/null @@ -1,27 +0,0 @@ -use super::{super::event::EventGenerator, address::Address}; -use std::net::{SocketAddr, UdpSocket}; - -// NOTE: i am really unhappy with how this connection class works as of now. -// please improve - -pub struct Connection { - socket: UdpSocket, - on_message: EventGenerator, -} - -impl Connection { - fn new(address: Address) -> Result { - let socketaddress = SocketAddr::from((address.address, address.port)); - let socket = match UdpSocket::bind(socketaddress) { - Ok(i) => i, - Err(i) => return Err(format!("{:?}", i)), - }; - - Ok(Connection { - socket, - on_message: EventGenerator::new(), - }) - } - - fn send(address: Address, data: Vec) {} -} diff --git a/src/networking/connector.rs b/src/networking/connector.rs deleted file mode 100644 index 50e15566..00000000 --- a/src/networking/connector.rs +++ /dev/null @@ -1 +0,0 @@ -struct Connector {} diff --git a/src/networking/mod.rs b/src/networking/mod.rs index d057bedc..988949a7 100644 --- a/src/networking/mod.rs +++ b/src/networking/mod.rs @@ -1,3 +1,210 @@ +use std::net::{SocketAddr, IpAddr}; +use crate::networking::address::Address; +use crate::serialization::Packet; +use std::error::Error; +use mio::net::UdpSocket; +use rayon::{ThreadPool, ThreadPoolBuilder}; +use std::thread; +use std::thread::JoinHandle; +use mio::{Poll, Token, Events, Ready, PollOpt}; +use crate::configuration::Config; +use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; +use std::time::Duration; + pub mod address; -pub mod connection; -pub mod connector; + +create_error!(SocketCreationError, "The socket creation failed"); +create_error!(ListenError, "An error occured during the listening"); + +pub trait Receiver { + fn on_receive(&self, packet: Packet, address: Address); +} + +pub struct NetworkManager { + receivers: Vec>, + socket: UdpSocket, + threadpool: ThreadPool, +} + +impl NetworkManager { + pub fn new(address: &Address, threadcount: usize) -> Result> { + let socket = UdpSocket::bind(&SocketAddr::new(IpAddr::V4(address.address), address.port)) + .or(Err(SocketCreationError))?; + + let pool = ThreadPoolBuilder::new() + .num_threads(threadcount) + .breadth_first() + .build()?; + + let nm = Self { + threadpool: pool, + receivers: vec![], + socket, + }; + Ok(nm) + } + + pub fn start(self, configuration: &Config) -> JoinHandle<()> { + let queuesize = configuration.queuesize.to_owned(); + let buffersize = configuration.buffersize.to_owned(); + let pollinterval = configuration.pollinterval.to_owned(); + + thread::spawn(move || { + self.listen(queuesize, buffersize, pollinterval) + .or_else(|i| { + error!("the listening thread crashed"); + Err(i) + }) + .unwrap(); // :gasp: <-- here it's allowed as it will only crash this thread + }) + } + + pub fn listen( + self, + queuesize: usize, + buffersize: usize, + pollinterval: Option, + ) -> Result<(), Box> { + debug!("IPV8 is starting it's listener!"); + + let poll = Poll::new()?; + // this is basically a generated magic number we can later check for + const RECEIVER: Token = Token(0); + let mut events = Events::with_capacity(queuesize); + + let mut tmp_buf = vec![0; buffersize]; + let buffer = tmp_buf.as_mut_slice(); + + poll.register(&self.socket, RECEIVER, Ready::readable(), PollOpt::edge())?; + + loop { + poll.poll(&mut events, pollinterval)?; + trace!("checking poll"); + for _ in events.iter() { + debug!("handling event"); + + let (recv_size, address) = self.socket.recv_from(buffer)?; + + let packet = Packet(buffer[..recv_size].to_vec()).clone(); + + let ip = match address.ip() { + IpAddr::V4(a) => a, + IpAddr::V6(_) => { + warn!("Unexpectedly received ipv6 packet"); + continue; + } + }; + + // use our own threadpool + self.threadpool.install(|| { + // iterate over the receivers asynchronously and non blocking + self.receivers.par_iter().for_each(|r| { + r.on_receive( + packet.clone(), + Address { + address: ip.to_owned(), + port: address.port(), + }, + ); + }); + }); + } + } + } + + pub fn add_receiver(&mut self, receiver: Box) { + self.receivers.push(receiver) + } + + pub fn send(address: Address, packet: Packet) -> Result<(), Box> { + unimplemented!( + "Trying to send {:?} to {:?} but sending is not implemented", + packet, + address + ) + } +} + +#[cfg(test)] +mod tests { + use lazy_static::lazy_static; + + use crate::IPv8; + use crate::configuration::Config; + use mio::net::UdpSocket; + use crate::networking::address::Address; + use std::net::{Ipv4Addr, SocketAddr, IpAddr}; + use crate::serialization::Packet; + use std::sync::Once; + use std::time::Duration; + use crate::networking::Receiver; + use std::thread; + use std::cmp::Ordering; + use std::sync::atomic::AtomicUsize; + + static BEFORE: Once = Once::new(); + + // A poor man's @Before + fn before() { + BEFORE.call_once(|| { + simple_logger::init().unwrap(); + }) + } + + // `pacman -Syu networkmanager` + #[test] + fn test_networkmanager() { + before(); + + // start ipv8 + let mut config = Config::default(); + let address = Ipv4Addr::new(0, 0, 0, 0); + let port = 8090; + + config.socketaddress = Address { address, port }; + config.buffersize = 2048; + + let mut ipv8 = IPv8::new(config).unwrap(); + + lazy_static! { + static ref OGPacket: Packet = Packet::new(create_test_header!()).unwrap(); + } + + static C: AtomicUsize = AtomicUsize::new(0); + + //create receiver + struct AReceiver; + impl Receiver for AReceiver { + fn on_receive(&self, packet: Packet, address: Address) { + assert_eq!(packet.raw(), OGPacket.raw()); + C.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + } + } + + ipv8.networkmanager.add_receiver(Box::new(AReceiver)); + + ipv8.start(); + // wait for it to start up + thread::sleep(Duration::from_millis(300)); + + // now try to send ipv8 a message + let sender_socket = UdpSocket::bind(&SocketAddr::new(IpAddr::V4(address), 0)).unwrap(); + + sender_socket + .connect(SocketAddr::new(IpAddr::V4(address), port)) + .unwrap(); + + let a = sender_socket.send(OGPacket.raw()).unwrap(); + assert_eq!(a, OGPacket.raw().len()); + + thread::sleep(Duration::from_millis(20)); + + let b = sender_socket.send(OGPacket.raw()).unwrap(); + assert_eq!(b, OGPacket.raw().len()); + + thread::sleep(Duration::from_millis(20)); + + // a poor man's `verify(AReceiver, times(2)).on_receiver();` + assert_eq!(2, C.load(std::sync::atomic::Ordering::SeqCst)); + } +} diff --git a/src/payloads/binmemberauthenticationpayload.rs b/src/payloads/binmemberauthenticationpayload.rs index bb9a5e85..b0cecaf9 100644 --- a/src/payloads/binmemberauthenticationpayload.rs +++ b/src/payloads/binmemberauthenticationpayload.rs @@ -23,9 +23,9 @@ impl Serialize for BinMemberAuthenticationPayload { where S: Serializer, { - let v = self.public_key_bin.to_vec().ok_or(ser::Error::custom( - "The key was malformed in a way which made it unserializable.", - ))?; + let v = self.public_key_bin.to_vec().ok_or_else(|| { + ser::Error::custom("The key was malformed in a way which made it unserializable.") + })?; let mut state = serializer.serialize_tuple(v.len() + 2)?; state.serialize_element(&(v.len() as u16))?; @@ -50,9 +50,9 @@ impl<'de> Deserialize<'de> for BinMemberAuthenticationPayload { { // first deserialize it to a temporary struct which literally represents the packer let payload_temporary = BinMemberAuthenticationPayloadPattern::deserialize(deserializer)?; - let public_key_bin = PublicKey::from_vec((payload_temporary.0).0).ok_or( - de::Error::custom("The key was malformed in a way which made it undeserializable."), - )?; + let public_key_bin = PublicKey::from_vec((payload_temporary.0).0).ok_or_else(|| { + de::Error::custom("The key was malformed in a way which made it undeserializable.") + })?; // now build the struct for real Ok(BinMemberAuthenticationPayload { diff --git a/src/serialization/header.rs b/src/serialization/header.rs index 2ddc4c5e..2b659ad6 100644 --- a/src/serialization/header.rs +++ b/src/serialization/header.rs @@ -105,9 +105,9 @@ impl Serialize for Header { } // unwrap the message type - let message_type: u8 = self.message_type.ok_or(serde::ser::Error::custom( - "Message type was empty and this wasn't expected", - ))? as u8; + let message_type: u8 = self.message_type.ok_or_else(|| { + serde::ser::Error::custom("Message type was empty and this wasn't expected") + })? as u8; // Serialize the message type state.serialize_element(&message_type)?; @@ -146,16 +146,16 @@ impl<'de> Deserialize<'de> for Header { // this block is here to be breaked out of or else return an error // Deserialize the first two bytes into `version_bytes` - version_bytes.push(seq.next_element()?.ok_or(serde::de::Error::custom( - "No valid header type could be determined", - ))?); - version_bytes.push(seq.next_element()?.ok_or(serde::de::Error::custom( - "No valid header type could be determined", - ))?); + version_bytes.push(seq.next_element()?.ok_or_else(|| { + serde::de::Error::custom("No valid header type could be determined") + })?); + version_bytes.push(seq.next_element()?.ok_or_else(|| { + serde::de::Error::custom("No valid header type could be determined") + })?); // Check if they match the header version of PyIPv8: `0002`, if so set it and break out of // the pseudo-loop - if version_bytes.as_slice() == [00, 02] { + if version_bytes.as_slice() == [0, 2] { version = Some(HeaderVersion::PyIPV8Header); break; } @@ -179,14 +179,15 @@ impl<'de> Deserialize<'de> for Header { let mut mid_hash: [u8; 20] = [0; 20]; // Init with zeroes for i in mid_hash.iter_mut() { - *i = seq.next_element()?.ok_or(serde::de::Error::custom( - "No valid header type could be determined", - ))?; + *i = seq.next_element()?.ok_or_else(|| { + serde::de::Error::custom( + "No valid header type could be determined", + ) + })?; } - let message_type: u8 = - seq.next_element()?.ok_or(serde::de::Error::custom( - "No valid header type could be determined", - ))?; + let message_type: u8 = seq.next_element()?.ok_or_else(|| { + serde::de::Error::custom("No valid header type could be determined") + })?; Ok(Header::py_ipv8_header(mid_hash, message_type)) } diff --git a/src/serialization/mod.rs b/src/serialization/mod.rs index 634ea851..4c826bb6 100644 --- a/src/serialization/mod.rs +++ b/src/serialization/mod.rs @@ -14,13 +14,18 @@ use bincode; use bincode::ErrorKind; use serde::{Deserialize, Serialize}; use std::error::Error; -use std::fmt; create_error!(HeaderError, "The supplied header was invalid"); #[derive(Debug, Serialize, Deserialize, PartialEq)] pub struct Packet(pub Vec); +impl Clone for Packet { + fn clone(&self) -> Packet { + Packet(self.0.to_vec()) + } +} + #[derive(Debug, PartialEq)] pub struct PacketDeserializer { pub pntr: Packet, @@ -119,6 +124,10 @@ impl Packet { Ok(res) } + pub fn raw(&self) -> &[u8] { + &*self.0 + } + /// Signs a packet. After this, new payloads must under no circumstances be added as this will /// break the verification process on the receiving end. There is no check for this by design for a speed boost /// (though this may or may not be revisited later). Sign deliberately consumes self and returns it again so it can @@ -223,6 +232,17 @@ mod tests { // }); // } + #[test] + fn test_raw() { + let header = create_test_header!(); + let packet = Packet::new(header).unwrap(); + let raw = packet.raw(); + assert_eq!( + raw, + &[0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 42,] + ) + } + #[test] fn test_peek_header() { let packet = Packet::new(create_test_header!()).unwrap(); diff --git a/src/serialization/varlen.rs b/src/serialization/varlen.rs index 14af3316..a37b1437 100644 --- a/src/serialization/varlen.rs +++ b/src/serialization/varlen.rs @@ -44,13 +44,13 @@ impl<'de> Deserialize<'de> for VarLen16 { // first read the length from the sequence let length: u16 = seq .next_element()? - .ok_or(serde::de::Error::invalid_length(1, &self))?; + .ok_or_else(|| serde::de::Error::invalid_length(1, &self))?; // now read that many bytes from the sequence for _i in 0..length { res.push( seq.next_element()? - .ok_or(serde::de::Error::invalid_length(1, &self))?, + .ok_or_else(|| serde::de::Error::invalid_length(1, &self))?, ); } From 2909194d526a61bc9de85f040f590b2324743191 Mon Sep 17 00:00:00 2001 From: jonay2000 Date: Thu, 13 Jun 2019 02:00:39 +0200 Subject: [PATCH 07/10] resolved some warnings and improved testing a bit --- src/networking/mod.rs | 35 +++++++++++++++++++++-------------- 1 file changed, 21 insertions(+), 14 deletions(-) diff --git a/src/networking/mod.rs b/src/networking/mod.rs index 988949a7..7525a515 100644 --- a/src/networking/mod.rs +++ b/src/networking/mod.rs @@ -139,7 +139,6 @@ mod tests { use std::time::Duration; use crate::networking::Receiver; use std::thread; - use std::cmp::Ordering; use std::sync::atomic::AtomicUsize; static BEFORE: Once = Once::new(); @@ -159,25 +158,32 @@ mod tests { // start ipv8 let mut config = Config::default(); let address = Ipv4Addr::new(0, 0, 0, 0); - let port = 8090; + static RECV_PORT: u16 = 8090; + static SEND_PORT: u16 = 30240; - config.socketaddress = Address { address, port }; + config.socketaddress = Address { + address, + port: RECV_PORT, + }; config.buffersize = 2048; let mut ipv8 = IPv8::new(config).unwrap(); lazy_static! { - static ref OGPacket: Packet = Packet::new(create_test_header!()).unwrap(); + static ref OGPACKET: Packet = Packet::new(create_test_header!()).unwrap(); } - static C: AtomicUsize = AtomicUsize::new(0); + static PACKET_COUNTER: AtomicUsize = AtomicUsize::new(0); //create receiver struct AReceiver; impl Receiver for AReceiver { fn on_receive(&self, packet: Packet, address: Address) { - assert_eq!(packet.raw(), OGPacket.raw()); - C.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + assert_eq!(OGPACKET.raw(), packet.raw()); + assert_eq!(SEND_PORT, address.port); + + // Count each packet + PACKET_COUNTER.fetch_add(1, std::sync::atomic::Ordering::SeqCst); } } @@ -188,23 +194,24 @@ mod tests { thread::sleep(Duration::from_millis(300)); // now try to send ipv8 a message - let sender_socket = UdpSocket::bind(&SocketAddr::new(IpAddr::V4(address), 0)).unwrap(); + let sender_socket = + UdpSocket::bind(&SocketAddr::new(IpAddr::V4(address), SEND_PORT)).unwrap(); sender_socket - .connect(SocketAddr::new(IpAddr::V4(address), port)) + .connect(SocketAddr::new(IpAddr::V4(address), RECV_PORT)) .unwrap(); - let a = sender_socket.send(OGPacket.raw()).unwrap(); - assert_eq!(a, OGPacket.raw().len()); + let a = sender_socket.send(OGPACKET.raw()).unwrap(); + assert_eq!(a, OGPACKET.raw().len()); thread::sleep(Duration::from_millis(20)); - let b = sender_socket.send(OGPacket.raw()).unwrap(); - assert_eq!(b, OGPacket.raw().len()); + let b = sender_socket.send(OGPACKET.raw()).unwrap(); + assert_eq!(b, OGPACKET.raw().len()); thread::sleep(Duration::from_millis(20)); // a poor man's `verify(AReceiver, times(2)).on_receiver();` - assert_eq!(2, C.load(std::sync::atomic::Ordering::SeqCst)); + assert_eq!(2, PACKET_COUNTER.load(std::sync::atomic::Ordering::SeqCst)); } } From e986bf46242c01227fdaa25d690ae73ae0bad7e5 Mon Sep 17 00:00:00 2001 From: Victor Roest Date: Thu, 13 Jun 2019 02:10:29 +0200 Subject: [PATCH 08/10] fixed rustfmt --- src/crypto/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/crypto/mod.rs b/src/crypto/mod.rs index ebcdbeae..fbbaab26 100644 --- a/src/crypto/mod.rs +++ b/src/crypto/mod.rs @@ -90,7 +90,7 @@ pub fn verify_signature_openssl( mod tests { use crate::crypto::{ create_signature_openssl, verify_signature_ed25519, verify_signature_openssl, SizeError, - }; +}; use openssl::bn::BigNum; use rust_sodium::crypto::sign::ed25519; use std::error::Error; From 75bbdfa8ec2d824c6b9dbed9f0902d48155bb1ce Mon Sep 17 00:00:00 2001 From: jonay2000 Date: Thu, 13 Jun 2019 01:53:58 +0200 Subject: [PATCH 09/10] added the network io interface. This system creates sockets and distributes all incoming messages over a threadpool with a task stealing algorithm Co-authored-by: Victor Roest --- Cargo.toml | 3 + src/community/mod.rs | 3 +- src/configuration.rs | 18 ++ src/crypto/mod.rs | 3 +- src/crypto/signature.rs | 1 - src/error.rs | 7 +- src/lib.rs | 16 +- src/networking/connection.rs | 27 --- src/networking/connector.rs | 1 - src/networking/mod.rs | 211 +++++++++++++++++- .../binmemberauthenticationpayload.rs | 12 +- src/serialization/header.rs | 35 +-- src/serialization/mod.rs | 22 +- src/serialization/varlen.rs | 4 +- 14 files changed, 296 insertions(+), 67 deletions(-) delete mode 100644 src/networking/connection.rs delete mode 100644 src/networking/connector.rs diff --git a/Cargo.toml b/Cargo.toml index 55c4f478..a9b2e09a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,9 +30,12 @@ rust_sodium = "0.10.2" openssl = { version = "0.10", features = ["vendored"] } lazy_static = "1.2.0" log = "0.4" +mio = "0.6.19" +rayon = "1.0.3" [dev-dependencies] criterion = "0.2.11" +simple_logger = "1.3.0" [[bench]] name = "bench_crypto" diff --git a/src/community/mod.rs b/src/community/mod.rs index 4ab95106..2059cf9c 100644 --- a/src/community/mod.rs +++ b/src/community/mod.rs @@ -3,7 +3,6 @@ use crate::serialization::header::Header; use std::error::Error; use std::collections::HashMap; use crate::networking::address::Address; -use std::fmt; pub mod peer; @@ -89,7 +88,7 @@ create_error!(InsertionError, "Error inserting community into hashmap"); /// let community = TestCommunity::new().unwrap(); /// let mid = community.get_mid(); /// config.communities.add_community(Box::new(community)); -/// let ipv8 = IPv8::new(config); +/// let ipv8 = IPv8::new(config).unwrap(); /// /// // now simulate a packet coming in /// diff --git a/src/configuration.rs b/src/configuration.rs index dc7a0e77..6ad2a3fb 100644 --- a/src/configuration.rs +++ b/src/configuration.rs @@ -1,8 +1,19 @@ use crate::networking::address::Address; use std::net::Ipv4Addr; use crate::community::CommunityRegistry; +use std::time::Duration; pub struct Config { + /// the amount of space reserved for queueing up incoming messages (messages) + pub queuesize: usize, + /// the size of the buffer reserved for incoming messages (bytes) + pub buffersize: usize, + /// frequency at which polling times out and events are checked (ms) + /// None is as fast as possible + pub pollinterval: Option, + /// the max number of threads to use in the network manager. 0 is #cores + pub threadcount: usize, + /// Default list of host used for peer discovery pub default_hosts: Vec
, /// from py-ipv8 configuration. UDP socket address. @@ -16,6 +27,13 @@ pub struct Config { impl Default for Config { fn default() -> Self { Config { + queuesize: 100, + buffersize: 2048, + pollinterval: None, + + // zero means equal to number of cores + threadcount: 0, + socketaddress: Address { address: Ipv4Addr::new(0, 0, 0, 0), port: 8090, diff --git a/src/crypto/mod.rs b/src/crypto/mod.rs index 788dd1b7..fbbaab26 100644 --- a/src/crypto/mod.rs +++ b/src/crypto/mod.rs @@ -9,7 +9,6 @@ use openssl::sign::Signer; use rust_sodium; use rust_sodium::crypto::sign::ed25519; use std::error::Error; -use std::fmt; use std::os::raw::c_int; create_error!(SignatureError, "Invalid signature"); @@ -34,7 +33,7 @@ pub fn verify_signature_ed25519( pkey: ed25519::PublicKey, ) -> Result> { let verify = ed25519::verify_detached( - &ed25519::Signature::from_slice(&*signature).ok_or(Box::new(SignatureError))?, + &ed25519::Signature::from_slice(&*signature).ok_or_else(|| Box::new(SignatureError))?, data, &pkey, ); diff --git a/src/crypto/signature.rs b/src/crypto/signature.rs index 2b196b79..68cefa12 100644 --- a/src/crypto/signature.rs +++ b/src/crypto/signature.rs @@ -1,5 +1,4 @@ use std::error::Error; -use std::fmt; use openssl::bn::BigNum; use serde::ser::SerializeTuple; diff --git a/src/error.rs b/src/error.rs index 92146636..ba9d64e4 100644 --- a/src/error.rs +++ b/src/error.rs @@ -5,13 +5,13 @@ macro_rules! create_error { #[derive(Debug)] pub struct $name; - impl fmt::Display for $name { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + impl std::fmt::Display for $name { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { write!(f, $message) } } - impl Error for $name { + impl std::error::Error for $name { fn description(&self) -> &str { $message } @@ -22,7 +22,6 @@ macro_rules! create_error { #[cfg(test)] mod tests { use std::error::Error; - use std::fmt; #[test] fn test_errors() { diff --git a/src/lib.rs b/src/lib.rs index 52882b82..288a81b7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,6 +12,8 @@ pub mod networking; pub mod payloads; use configuration::Config; +use crate::networking::NetworkManager; +use std::error::Error; /// The IPv8 instance. /// This struct is how you can interact with the network. @@ -25,10 +27,20 @@ use configuration::Config; /// ``` pub struct IPv8 { pub config: Config, + pub networkmanager: NetworkManager, } impl IPv8 { - pub fn new(config: configuration::Config) -> Self { - IPv8 { config } + pub fn new(config: configuration::Config) -> Result> { + let networkmanager = + NetworkManager::new(&config.socketaddress, config.threadcount.to_owned())?; + Ok(IPv8 { + config, + networkmanager, + }) + } + + pub fn start(self) { + self.networkmanager.start(&self.config); } } diff --git a/src/networking/connection.rs b/src/networking/connection.rs deleted file mode 100644 index 62bc2a3f..00000000 --- a/src/networking/connection.rs +++ /dev/null @@ -1,27 +0,0 @@ -use super::{super::event::EventGenerator, address::Address}; -use std::net::{SocketAddr, UdpSocket}; - -// NOTE: i am really unhappy with how this connection class works as of now. -// please improve - -pub struct Connection { - socket: UdpSocket, - on_message: EventGenerator, -} - -impl Connection { - fn new(address: Address) -> Result { - let socketaddress = SocketAddr::from((address.address, address.port)); - let socket = match UdpSocket::bind(socketaddress) { - Ok(i) => i, - Err(i) => return Err(format!("{:?}", i)), - }; - - Ok(Connection { - socket, - on_message: EventGenerator::new(), - }) - } - - fn send(address: Address, data: Vec) {} -} diff --git a/src/networking/connector.rs b/src/networking/connector.rs deleted file mode 100644 index 50e15566..00000000 --- a/src/networking/connector.rs +++ /dev/null @@ -1 +0,0 @@ -struct Connector {} diff --git a/src/networking/mod.rs b/src/networking/mod.rs index d057bedc..988949a7 100644 --- a/src/networking/mod.rs +++ b/src/networking/mod.rs @@ -1,3 +1,210 @@ +use std::net::{SocketAddr, IpAddr}; +use crate::networking::address::Address; +use crate::serialization::Packet; +use std::error::Error; +use mio::net::UdpSocket; +use rayon::{ThreadPool, ThreadPoolBuilder}; +use std::thread; +use std::thread::JoinHandle; +use mio::{Poll, Token, Events, Ready, PollOpt}; +use crate::configuration::Config; +use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; +use std::time::Duration; + pub mod address; -pub mod connection; -pub mod connector; + +create_error!(SocketCreationError, "The socket creation failed"); +create_error!(ListenError, "An error occured during the listening"); + +pub trait Receiver { + fn on_receive(&self, packet: Packet, address: Address); +} + +pub struct NetworkManager { + receivers: Vec>, + socket: UdpSocket, + threadpool: ThreadPool, +} + +impl NetworkManager { + pub fn new(address: &Address, threadcount: usize) -> Result> { + let socket = UdpSocket::bind(&SocketAddr::new(IpAddr::V4(address.address), address.port)) + .or(Err(SocketCreationError))?; + + let pool = ThreadPoolBuilder::new() + .num_threads(threadcount) + .breadth_first() + .build()?; + + let nm = Self { + threadpool: pool, + receivers: vec![], + socket, + }; + Ok(nm) + } + + pub fn start(self, configuration: &Config) -> JoinHandle<()> { + let queuesize = configuration.queuesize.to_owned(); + let buffersize = configuration.buffersize.to_owned(); + let pollinterval = configuration.pollinterval.to_owned(); + + thread::spawn(move || { + self.listen(queuesize, buffersize, pollinterval) + .or_else(|i| { + error!("the listening thread crashed"); + Err(i) + }) + .unwrap(); // :gasp: <-- here it's allowed as it will only crash this thread + }) + } + + pub fn listen( + self, + queuesize: usize, + buffersize: usize, + pollinterval: Option, + ) -> Result<(), Box> { + debug!("IPV8 is starting it's listener!"); + + let poll = Poll::new()?; + // this is basically a generated magic number we can later check for + const RECEIVER: Token = Token(0); + let mut events = Events::with_capacity(queuesize); + + let mut tmp_buf = vec![0; buffersize]; + let buffer = tmp_buf.as_mut_slice(); + + poll.register(&self.socket, RECEIVER, Ready::readable(), PollOpt::edge())?; + + loop { + poll.poll(&mut events, pollinterval)?; + trace!("checking poll"); + for _ in events.iter() { + debug!("handling event"); + + let (recv_size, address) = self.socket.recv_from(buffer)?; + + let packet = Packet(buffer[..recv_size].to_vec()).clone(); + + let ip = match address.ip() { + IpAddr::V4(a) => a, + IpAddr::V6(_) => { + warn!("Unexpectedly received ipv6 packet"); + continue; + } + }; + + // use our own threadpool + self.threadpool.install(|| { + // iterate over the receivers asynchronously and non blocking + self.receivers.par_iter().for_each(|r| { + r.on_receive( + packet.clone(), + Address { + address: ip.to_owned(), + port: address.port(), + }, + ); + }); + }); + } + } + } + + pub fn add_receiver(&mut self, receiver: Box) { + self.receivers.push(receiver) + } + + pub fn send(address: Address, packet: Packet) -> Result<(), Box> { + unimplemented!( + "Trying to send {:?} to {:?} but sending is not implemented", + packet, + address + ) + } +} + +#[cfg(test)] +mod tests { + use lazy_static::lazy_static; + + use crate::IPv8; + use crate::configuration::Config; + use mio::net::UdpSocket; + use crate::networking::address::Address; + use std::net::{Ipv4Addr, SocketAddr, IpAddr}; + use crate::serialization::Packet; + use std::sync::Once; + use std::time::Duration; + use crate::networking::Receiver; + use std::thread; + use std::cmp::Ordering; + use std::sync::atomic::AtomicUsize; + + static BEFORE: Once = Once::new(); + + // A poor man's @Before + fn before() { + BEFORE.call_once(|| { + simple_logger::init().unwrap(); + }) + } + + // `pacman -Syu networkmanager` + #[test] + fn test_networkmanager() { + before(); + + // start ipv8 + let mut config = Config::default(); + let address = Ipv4Addr::new(0, 0, 0, 0); + let port = 8090; + + config.socketaddress = Address { address, port }; + config.buffersize = 2048; + + let mut ipv8 = IPv8::new(config).unwrap(); + + lazy_static! { + static ref OGPacket: Packet = Packet::new(create_test_header!()).unwrap(); + } + + static C: AtomicUsize = AtomicUsize::new(0); + + //create receiver + struct AReceiver; + impl Receiver for AReceiver { + fn on_receive(&self, packet: Packet, address: Address) { + assert_eq!(packet.raw(), OGPacket.raw()); + C.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + } + } + + ipv8.networkmanager.add_receiver(Box::new(AReceiver)); + + ipv8.start(); + // wait for it to start up + thread::sleep(Duration::from_millis(300)); + + // now try to send ipv8 a message + let sender_socket = UdpSocket::bind(&SocketAddr::new(IpAddr::V4(address), 0)).unwrap(); + + sender_socket + .connect(SocketAddr::new(IpAddr::V4(address), port)) + .unwrap(); + + let a = sender_socket.send(OGPacket.raw()).unwrap(); + assert_eq!(a, OGPacket.raw().len()); + + thread::sleep(Duration::from_millis(20)); + + let b = sender_socket.send(OGPacket.raw()).unwrap(); + assert_eq!(b, OGPacket.raw().len()); + + thread::sleep(Duration::from_millis(20)); + + // a poor man's `verify(AReceiver, times(2)).on_receiver();` + assert_eq!(2, C.load(std::sync::atomic::Ordering::SeqCst)); + } +} diff --git a/src/payloads/binmemberauthenticationpayload.rs b/src/payloads/binmemberauthenticationpayload.rs index bb9a5e85..b0cecaf9 100644 --- a/src/payloads/binmemberauthenticationpayload.rs +++ b/src/payloads/binmemberauthenticationpayload.rs @@ -23,9 +23,9 @@ impl Serialize for BinMemberAuthenticationPayload { where S: Serializer, { - let v = self.public_key_bin.to_vec().ok_or(ser::Error::custom( - "The key was malformed in a way which made it unserializable.", - ))?; + let v = self.public_key_bin.to_vec().ok_or_else(|| { + ser::Error::custom("The key was malformed in a way which made it unserializable.") + })?; let mut state = serializer.serialize_tuple(v.len() + 2)?; state.serialize_element(&(v.len() as u16))?; @@ -50,9 +50,9 @@ impl<'de> Deserialize<'de> for BinMemberAuthenticationPayload { { // first deserialize it to a temporary struct which literally represents the packer let payload_temporary = BinMemberAuthenticationPayloadPattern::deserialize(deserializer)?; - let public_key_bin = PublicKey::from_vec((payload_temporary.0).0).ok_or( - de::Error::custom("The key was malformed in a way which made it undeserializable."), - )?; + let public_key_bin = PublicKey::from_vec((payload_temporary.0).0).ok_or_else(|| { + de::Error::custom("The key was malformed in a way which made it undeserializable.") + })?; // now build the struct for real Ok(BinMemberAuthenticationPayload { diff --git a/src/serialization/header.rs b/src/serialization/header.rs index 2ddc4c5e..2b659ad6 100644 --- a/src/serialization/header.rs +++ b/src/serialization/header.rs @@ -105,9 +105,9 @@ impl Serialize for Header { } // unwrap the message type - let message_type: u8 = self.message_type.ok_or(serde::ser::Error::custom( - "Message type was empty and this wasn't expected", - ))? as u8; + let message_type: u8 = self.message_type.ok_or_else(|| { + serde::ser::Error::custom("Message type was empty and this wasn't expected") + })? as u8; // Serialize the message type state.serialize_element(&message_type)?; @@ -146,16 +146,16 @@ impl<'de> Deserialize<'de> for Header { // this block is here to be breaked out of or else return an error // Deserialize the first two bytes into `version_bytes` - version_bytes.push(seq.next_element()?.ok_or(serde::de::Error::custom( - "No valid header type could be determined", - ))?); - version_bytes.push(seq.next_element()?.ok_or(serde::de::Error::custom( - "No valid header type could be determined", - ))?); + version_bytes.push(seq.next_element()?.ok_or_else(|| { + serde::de::Error::custom("No valid header type could be determined") + })?); + version_bytes.push(seq.next_element()?.ok_or_else(|| { + serde::de::Error::custom("No valid header type could be determined") + })?); // Check if they match the header version of PyIPv8: `0002`, if so set it and break out of // the pseudo-loop - if version_bytes.as_slice() == [00, 02] { + if version_bytes.as_slice() == [0, 2] { version = Some(HeaderVersion::PyIPV8Header); break; } @@ -179,14 +179,15 @@ impl<'de> Deserialize<'de> for Header { let mut mid_hash: [u8; 20] = [0; 20]; // Init with zeroes for i in mid_hash.iter_mut() { - *i = seq.next_element()?.ok_or(serde::de::Error::custom( - "No valid header type could be determined", - ))?; + *i = seq.next_element()?.ok_or_else(|| { + serde::de::Error::custom( + "No valid header type could be determined", + ) + })?; } - let message_type: u8 = - seq.next_element()?.ok_or(serde::de::Error::custom( - "No valid header type could be determined", - ))?; + let message_type: u8 = seq.next_element()?.ok_or_else(|| { + serde::de::Error::custom("No valid header type could be determined") + })?; Ok(Header::py_ipv8_header(mid_hash, message_type)) } diff --git a/src/serialization/mod.rs b/src/serialization/mod.rs index 634ea851..4c826bb6 100644 --- a/src/serialization/mod.rs +++ b/src/serialization/mod.rs @@ -14,13 +14,18 @@ use bincode; use bincode::ErrorKind; use serde::{Deserialize, Serialize}; use std::error::Error; -use std::fmt; create_error!(HeaderError, "The supplied header was invalid"); #[derive(Debug, Serialize, Deserialize, PartialEq)] pub struct Packet(pub Vec); +impl Clone for Packet { + fn clone(&self) -> Packet { + Packet(self.0.to_vec()) + } +} + #[derive(Debug, PartialEq)] pub struct PacketDeserializer { pub pntr: Packet, @@ -119,6 +124,10 @@ impl Packet { Ok(res) } + pub fn raw(&self) -> &[u8] { + &*self.0 + } + /// Signs a packet. After this, new payloads must under no circumstances be added as this will /// break the verification process on the receiving end. There is no check for this by design for a speed boost /// (though this may or may not be revisited later). Sign deliberately consumes self and returns it again so it can @@ -223,6 +232,17 @@ mod tests { // }); // } + #[test] + fn test_raw() { + let header = create_test_header!(); + let packet = Packet::new(header).unwrap(); + let raw = packet.raw(); + assert_eq!( + raw, + &[0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 42,] + ) + } + #[test] fn test_peek_header() { let packet = Packet::new(create_test_header!()).unwrap(); diff --git a/src/serialization/varlen.rs b/src/serialization/varlen.rs index 14af3316..a37b1437 100644 --- a/src/serialization/varlen.rs +++ b/src/serialization/varlen.rs @@ -44,13 +44,13 @@ impl<'de> Deserialize<'de> for VarLen16 { // first read the length from the sequence let length: u16 = seq .next_element()? - .ok_or(serde::de::Error::invalid_length(1, &self))?; + .ok_or_else(|| serde::de::Error::invalid_length(1, &self))?; // now read that many bytes from the sequence for _i in 0..length { res.push( seq.next_element()? - .ok_or(serde::de::Error::invalid_length(1, &self))?, + .ok_or_else(|| serde::de::Error::invalid_length(1, &self))?, ); } From 5619f74bfc6dacc7bdde42aed89e4ba1eb277dc9 Mon Sep 17 00:00:00 2001 From: jonay2000 Date: Thu, 13 Jun 2019 02:00:39 +0200 Subject: [PATCH 10/10] resolved some warnings and improved testing a bit --- src/networking/mod.rs | 35 +++++++++++++++++++++-------------- 1 file changed, 21 insertions(+), 14 deletions(-) diff --git a/src/networking/mod.rs b/src/networking/mod.rs index 988949a7..7525a515 100644 --- a/src/networking/mod.rs +++ b/src/networking/mod.rs @@ -139,7 +139,6 @@ mod tests { use std::time::Duration; use crate::networking::Receiver; use std::thread; - use std::cmp::Ordering; use std::sync::atomic::AtomicUsize; static BEFORE: Once = Once::new(); @@ -159,25 +158,32 @@ mod tests { // start ipv8 let mut config = Config::default(); let address = Ipv4Addr::new(0, 0, 0, 0); - let port = 8090; + static RECV_PORT: u16 = 8090; + static SEND_PORT: u16 = 30240; - config.socketaddress = Address { address, port }; + config.socketaddress = Address { + address, + port: RECV_PORT, + }; config.buffersize = 2048; let mut ipv8 = IPv8::new(config).unwrap(); lazy_static! { - static ref OGPacket: Packet = Packet::new(create_test_header!()).unwrap(); + static ref OGPACKET: Packet = Packet::new(create_test_header!()).unwrap(); } - static C: AtomicUsize = AtomicUsize::new(0); + static PACKET_COUNTER: AtomicUsize = AtomicUsize::new(0); //create receiver struct AReceiver; impl Receiver for AReceiver { fn on_receive(&self, packet: Packet, address: Address) { - assert_eq!(packet.raw(), OGPacket.raw()); - C.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + assert_eq!(OGPACKET.raw(), packet.raw()); + assert_eq!(SEND_PORT, address.port); + + // Count each packet + PACKET_COUNTER.fetch_add(1, std::sync::atomic::Ordering::SeqCst); } } @@ -188,23 +194,24 @@ mod tests { thread::sleep(Duration::from_millis(300)); // now try to send ipv8 a message - let sender_socket = UdpSocket::bind(&SocketAddr::new(IpAddr::V4(address), 0)).unwrap(); + let sender_socket = + UdpSocket::bind(&SocketAddr::new(IpAddr::V4(address), SEND_PORT)).unwrap(); sender_socket - .connect(SocketAddr::new(IpAddr::V4(address), port)) + .connect(SocketAddr::new(IpAddr::V4(address), RECV_PORT)) .unwrap(); - let a = sender_socket.send(OGPacket.raw()).unwrap(); - assert_eq!(a, OGPacket.raw().len()); + let a = sender_socket.send(OGPACKET.raw()).unwrap(); + assert_eq!(a, OGPACKET.raw().len()); thread::sleep(Duration::from_millis(20)); - let b = sender_socket.send(OGPacket.raw()).unwrap(); - assert_eq!(b, OGPacket.raw().len()); + let b = sender_socket.send(OGPACKET.raw()).unwrap(); + assert_eq!(b, OGPACKET.raw().len()); thread::sleep(Duration::from_millis(20)); // a poor man's `verify(AReceiver, times(2)).on_receiver();` - assert_eq!(2, C.load(std::sync::atomic::Ordering::SeqCst)); + assert_eq!(2, PACKET_COUNTER.load(std::sync::atomic::Ordering::SeqCst)); } }