From 831f62edb8a7d3e3bf9edbae886881bf4f332fe1 Mon Sep 17 00:00:00 2001 From: Victor Roest Date: Sat, 15 Jun 2019 20:16:07 +0200 Subject: [PATCH 1/3] updated benchmarks --- benches/bench_crypto.rs | 57 +------------------------- benches/networking_throughput.rs | 69 ++++++++++++++++++++++++++++++++ tests/ipv8-community.rs | 0 3 files changed, 70 insertions(+), 56 deletions(-) create mode 100644 benches/networking_throughput.rs create mode 100644 tests/ipv8-community.rs diff --git a/benches/bench_crypto.rs b/benches/bench_crypto.rs index e60db751..e0a8512a 100644 --- a/benches/bench_crypto.rs +++ b/benches/bench_crypto.rs @@ -43,60 +43,5 @@ fn e25519_benchmark(c: &mut Criterion) { }); } -fn openssl_verylow_benchmark(c: &mut Criterion) { - c.bench_function("bench: openssl very low", |b| b.iter(|| { - // private key generated with SECT163K1 and is always constant because it is directly pasted here - let skey = openssl::pkey::PKey::private_key_from_pem("-----BEGIN EC PRIVATE KEY-----\nMFMCAQEEFQKu4aaDxyTSj92iquQP5CIdbagLP6AHBgUrgQQAAaEuAywABABQ76xopUysBuWInGkX+S4elFdpOQZphgLlc6ksoim+5DgUZEBPp+B2Dg==\n-----END EC PRIVATE KEY-----".as_bytes()).unwrap(); - let pkey = openssl::pkey::PKey::public_key_from_pem("-----BEGIN PUBLIC KEY-----\nMEAwEAYHKoZIzj0CAQYFK4EEAAEDLAAEAFDvrGilTKwG5YicaRf5Lh6UV2k5BmmGAuVzqSyiKb7kOBRkQE+n4HYO\n-----END PUBLIC KEY-----".as_bytes()).unwrap(); - - let sig = Signature::from_bytes(&[42, 43, 44], PrivateKey::OpenSSLVeryLow(skey)).unwrap(); - - assert!(sig.verify(&[42, 43, 44], PublicKey::OpenSSLVeryLow(pkey))); - })); -} - -fn openssl_low_benchmark(c: &mut Criterion) { - c.bench_function("bench: openssl low", |b| b.iter(|| { - // private key generated with SECT233K1 and is always constant because it is directly pasted here - let skey = openssl::pkey::PKey::private_key_from_pem("-----BEGIN EC PRIVATE KEY-----\nMG0CAQEEHQ7vns0bhePCngPc4WeP3wnglzSrml0HdQ+jcpfAoAcGBSuBBAAaoUAD\nPgAEAe2ikH75P/vkdl1Bu8tP/WjOeB6LRxW11qGQNUmUAaFxQ7zff5eZyppMv7D0\n9sRcEuSNjk5nUQgTe6zV\n-----END EC PRIVATE KEY-----".as_bytes()).unwrap(); - let pkey = openssl::pkey::PKey::public_key_from_pem("-----BEGIN PUBLIC KEY-----\nMFIwEAYHKoZIzj0CAQYFK4EEABoDPgAEAe2ikH75P/vkdl1Bu8tP/WjOeB6LRxW11qGQNUmUAaFxQ7zff5eZyppMv7D09sRcEuSNjk5nUQgTe6zV\n-----END PUBLIC KEY-----".as_bytes()).unwrap(); - - let sig = Signature::from_bytes(&[42, 43, 44], PrivateKey::OpenSSLLow(skey)).unwrap(); - - assert!(sig.verify(&[42, 43, 44], PublicKey::OpenSSLLow(pkey))); - })); -} - -fn openssl_medium_benchmark(c: &mut Criterion) { - c.bench_function("bench: openssl medium", |b| b.iter(|| { - // private key generated with SECT409K1 and is always constant because it is directly pasted here - let skey = openssl::pkey::PKey::private_key_from_pem("-----BEGIN EC PRIVATE KEY-----\nMIGvAgEBBDNDkh1KSwaBgRj5GGcbYm2qWI5TyBVkOeMVkWWX5+8Dmd44OoSzmR5xCmc1DWuEsasIhhagBwYFK4EEACShbANqAAQAP5r6iYsyTkM7Hea2/tc95iGXV3oCXMLxSWiR/vF/zKjHkPClBN8BQBbBCMjpeS1xLZMUAUi2RoJN69jQevTG+vfhzBNqxIE0dazxbLMvx3wZ6Bol918H8oAa31axHKVaz3SbKLbDTw==\n-----END EC PRIVATE KEY-----".as_bytes()).unwrap(); - let pkey = openssl::pkey::PKey::public_key_from_pem("-----BEGIN PUBLIC KEY-----\nMH4wEAYHKoZIzj0CAQYFK4EEACQDagAEAD+a+omLMk5DOx3mtv7XPeYhl1d6AlzC8Ulokf7xf8yox5DwpQTfAUAWwQjI6XktcS2TFAFItkaCTevY0Hr0xvr34cwTasSBNHWs8WyzL8d8GegaJfdfB/KAGt9WsRylWs90myi2w08=\n-----END PUBLIC KEY-----".as_bytes()).unwrap(); - - let sig = Signature::from_bytes(&[42, 43, 44], PrivateKey::OpenSSLMedium(skey)).unwrap(); - - assert!(sig.verify(&[42, 43, 44], PublicKey::OpenSSLMedium(pkey))); - })); -} - -fn openssl_high_benchmark(c: &mut Criterion) { - c.bench_function("bench: openssl high", |b| b.iter(|| { - // private key generated with SECT571R1 and is always constant because it is directly pasted here - let skey = openssl::pkey::PKey::private_key_from_pem("-----BEGIN EC PRIVATE KEY-----\nMIHuAgEBBEgCQPcwiTfJz3T0/fDqAgvtTO3fvCobbxvJAnsDKQwjJbK9Ak2njemFanI8BOGp/1Mi6nrjfJs9+8h9LhUIYsrJ2j7piRxo2SygBwYFK4EEACehgZUDgZIABAJW+0vOn4V4P7Drsg4IxTtrM7OLA5sUwnBxDyhDcyXfmAdmmtZabrTiBb5jozZ0rXkoUIGOUnaaYH+k+NlbDVBbXtIQbmwpOQTzMTTC/oJi5TJUFc6G3529hTLStV3lILPks4SPk2DPRDC4oC/jRpMXn9VphjzT4gjruhTxVaoEAyi3YmdQpIBXzWVD/lOOhQ==\n-----END EC PRIVATE KEY-----".as_bytes()).unwrap(); - let pkey = openssl::pkey::PKey::public_key_from_pem("-----BEGIN PUBLIC KEY-----\nMIGnMBAGByqGSM49AgEGBSuBBAAnA4GSAAQCVvtLzp+FeD+w67IOCMU7azOziwObFMJwcQ8oQ3Ml35gHZprWWm604gW+Y6M2dK15KFCBjlJ2mmB/pPjZWw1QW17SEG5sKTkE8zE0wv6CYuUyVBXOht+dvYUy0rVd5SCz5LOEj5Ngz0QwuKAv40aTF5/VaYY80+II67oU8VWqBAMot2JnUKSAV81lQ/5TjoU=\n-----END PUBLIC KEY-----".as_bytes()).unwrap(); - - let sig = Signature::from_bytes(&[42, 43, 44], PrivateKey::OpenSSLHigh(skey)).unwrap(); - - assert!(sig.verify(&[42, 43, 44], PublicKey::OpenSSLHigh(pkey))); - })); -} - -criterion_group!( - benches, - e25519_benchmark, - openssl_verylow_benchmark, - openssl_low_benchmark, - openssl_medium_benchmark, - openssl_high_benchmark -); +criterion_group!(benches, e25519_benchmark); criterion_main!(benches); diff --git a/benches/networking_throughput.rs b/benches/networking_throughput.rs new file mode 100644 index 00000000..d3903449 --- /dev/null +++ b/benches/networking_throughput.rs @@ -0,0 +1,69 @@ +use criterion::*; +use ipv8::serialization::Packet; +use ipv8::serialization::header::Header; +use ipv8::payloads::timedistributionpayload::TimeDistributionPayload; +use ipv8::payloads::introductionresponsepayload::IntroductionResponsePayload; + +fn throughput(c: &mut Criterion) { + static BYTES: [u8; 208] = [ + 0x00, 0x02, 0xba, 0xf3, 0x0e, 0xd9, 0x19, 0x2b, 0xa3, 0x54, 0xcd, 0xd7, 0xb1, 0x73, 0xe0, + 0xef, 0x2c, 0x32, 0x80, 0x27, 0xf1, 0xd3, 0xf5, 0x00, 0x4a, 0x4c, 0x69, 0x62, 0x4e, 0x61, + 0x43, 0x4c, 0x50, 0x4b, 0x3a, 0x51, 0xe7, 0x12, 0xc4, 0xeb, 0x8a, 0xc2, 0x5a, 0xe3, 0xa5, + 0x68, 0x24, 0x08, 0xb2, 0xad, 0xbd, 0x6b, 0x78, 0xa4, 0x25, 0x54, 0x7f, 0x26, 0x85, 0xcf, + 0xdf, 0x1e, 0xe9, 0x27, 0x0c, 0xbe, 0x7e, 0xc3, 0x36, 0xc4, 0x16, 0x0f, 0xf5, 0x72, 0x05, + 0x4c, 0x87, 0x78, 0x42, 0xbe, 0x37, 0x73, 0x50, 0x45, 0xa9, 0x3b, 0xc4, 0xe2, 0x04, 0x15, + 0x31, 0x6f, 0xdb, 0x14, 0x71, 0x61, 0xa2, 0xd7, 0x46, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x01, 0x51, 0xab, 0x1b, 0xc2, 0x2b, 0x67, 0xc0, 0xa8, 0x01, 0x4b, 0x1f, 0x9a, 0xc0, + 0xa8, 0x01, 0x4b, 0x1f, 0x9a, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0xd2, 0x0e, 0x00, 0x00, 0x00, 0x00, 0xce, 0xe9, 0x32, 0x6b, 0x9d, 0xd4, + 0xbb, 0x8a, 0xaf, 0x8d, 0xc0, 0x39, 0x28, 0x8e, 0xbf, 0xc2, 0x4a, 0x10, 0xad, 0xc3, 0x7a, + 0xf1, 0xd9, 0xc8, 0x04, 0x17, 0x72, 0x5d, 0x2d, 0x3e, 0x5e, 0x07, 0x52, 0x4d, 0xab, 0x6e, + 0xa7, 0x1b, 0x17, 0x5a, 0x77, 0x5d, 0xb5, 0xd8, 0x91, 0x0c, 0x2b, 0x4b, 0xc8, 0xbb, 0x03, + 0xd3, 0x55, 0xed, 0x10, 0x26, 0xdd, 0xbb, 0xd8, 0xb2, 0x3b, 0xfd, 0xfc, 0x01, + ]; + + c.bench( + "throughput", + Benchmark::new("simple-deserialize", |b| { + b.iter(|| { + let data = Packet(BYTES.to_vec()); + let mut de = data.start_deserialize(); + let _: Header = de.pop_header().unwrap(); + // de.verify(); + let _: TimeDistributionPayload = de.next_payload().unwrap(); + let _: IntroductionResponsePayload = de.next_payload().unwrap(); + }) + }) + .throughput(Throughput::Bytes(BYTES.len() as u32)), + ); + + c.bench( + "throughput", + Benchmark::new("only-verify", |b| { + b.iter(|| { + let data = Packet(BYTES.to_vec()); + let de = data.start_deserialize(); + de.skip_header().unwrap().verify(); + }) + }) + .throughput(Throughput::Bytes(BYTES.len() as u32)), + ); + + c.bench( + "throughput", + Benchmark::new("deserialize+verify", |b| { + b.iter(|| { + let data = Packet(BYTES.to_vec()); + let mut de = data.start_deserialize(); + let _: Header = de.pop_header().unwrap(); + de.verify(); + let _: TimeDistributionPayload = de.next_payload().unwrap(); + let _: IntroductionResponsePayload = de.next_payload().unwrap(); + }) + }) + .throughput(Throughput::Bytes(BYTES.len() as u32)), + ); +} + +criterion_group!(benches, throughput); +criterion_main!(benches); diff --git a/tests/ipv8-community.rs b/tests/ipv8-community.rs new file mode 100644 index 00000000..e69de29b From 22134c1766d4743dc9460aef8f3016974df2fc62 Mon Sep 17 00:00:00 2001 From: Victor Roest Date: Sat, 15 Jun 2019 22:03:25 +0200 Subject: [PATCH 2/3] renamed benchmark --- ...roughput.rs => deserializer_throughput.rs} | 29 +++++++++++++++++++ 1 file changed, 29 insertions(+) rename benches/{networking_throughput.rs => deserializer_throughput.rs} (77%) diff --git a/benches/networking_throughput.rs b/benches/deserializer_throughput.rs similarity index 77% rename from benches/networking_throughput.rs rename to benches/deserializer_throughput.rs index d3903449..db2086d5 100644 --- a/benches/networking_throughput.rs +++ b/benches/deserializer_throughput.rs @@ -3,8 +3,22 @@ use ipv8::serialization::Packet; use ipv8::serialization::header::Header; use ipv8::payloads::timedistributionpayload::TimeDistributionPayload; use ipv8::payloads::introductionresponsepayload::IntroductionResponsePayload; +use ipv8::payloads::binmemberauthenticationpayload::BinMemberAuthenticationPayload; + +/// Critirion benchmark example +/// ``` +/// fn my_bench(c: &mut Criterion) { +/// // One-time setup code goes here +/// c.bench_function("my_bench", |b| { +/// // Per-sample (note that a sample can be many iterations) setup goes here +/// b.iter(|| { +/// // Measured code goes here +/// }); +/// }); +///} fn throughput(c: &mut Criterion) { + // These are the bytes of packet number 1 static BYTES: [u8; 208] = [ 0x00, 0x02, 0xba, 0xf3, 0x0e, 0xd9, 0x19, 0x2b, 0xa3, 0x54, 0xcd, 0xd7, 0xb1, 0x73, 0xe0, 0xef, 0x2c, 0x32, 0x80, 0x27, 0xf1, 0xd3, 0xf5, 0x00, 0x4a, 0x4c, 0x69, 0x62, 0x4e, 0x61, @@ -22,6 +36,8 @@ fn throughput(c: &mut Criterion) { 0xd3, 0x55, 0xed, 0x10, 0x26, 0xdd, 0xbb, 0xd8, 0xb2, 0x3b, 0xfd, 0xfc, 0x01, ]; + let packet = Packet(BYTES.to_vec()); + c.bench( "throughput", Benchmark::new("simple-deserialize", |b| { @@ -37,6 +53,19 @@ fn throughput(c: &mut Criterion) { .throughput(Throughput::Bytes(BYTES.len() as u32)), ); + c.bench( + "throughput", + Benchmark::new("only-bin-member-auth", |b| { + b.iter(|| { + let data = Packet(BYTES.to_vec()); + let de = data.start_deserialize(); + let bin: BinMemberAuthenticationPayload = + de.skip_header().unwrap().next_payload().unwrap(); + }) + }) + .throughput(Throughput::Bytes(BYTES.len() as u32)), + ); + c.bench( "throughput", Benchmark::new("only-verify", |b| { From 26a1a29999d07d9b0ee87b5965c8362e95b89c5d Mon Sep 17 00:00:00 2001 From: Victor Roest Date: Sun, 16 Jun 2019 00:48:25 +0200 Subject: [PATCH 3/3] Sending now works! * Improved Sending * Fixed send tests to be more reliable MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Jonathan Dönszelmann --- Cargo.toml | 27 ++-- examples/ipv8-join.rs | 2 +- src/community/mod.rs | 266 ++++++++++++++-------------------------- src/crypto/keytypes.rs | 16 --- src/lib.rs | 39 ++++-- src/networking/mod.rs | 184 ++++++++++++++++++--------- tests/ipv8-community.rs | 100 +++++++++++++++ 7 files changed, 364 insertions(+), 270 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index a9b2e09a..ce5dd6e0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,40 +7,43 @@ description = "IPv8 implementation for rust." homepage = "https://ip-v8.github.io/rust-ipv8/ipv8/" documentation = "https://ip-v8.github.io/rust-ipv8/ipv8/" repository = "https://github.com/ip-v8/rust-ipv8" -readme = "readme.md" +readme = "README.md" license-file = "license.md" - -[lib] -crate-type = ["lib", "cdylib"] - exclude = [ ".editorconfig", ".travis.yml", ".codeclimate.yml" ] +[lib] +crate-type = ["lib", "cdylib"] + [badges] travis-ci = { repository = "ip-v8/rust-ipv8", branch = "develop" } maintenance = { status = "experimental" } [dependencies] serde = { version = "1.0", features = ["derive"] } -bincode = "1.1.3" -rust_sodium = "0.10.2" +bincode = "1.1" +rust_sodium = "0.10" openssl = { version = "0.10", features = ["vendored"] } -lazy_static = "1.2.0" +lazy_static = "1.2" log = "0.4" -mio = "0.6.19" -rayon = "1.0.3" +mio = "0.6" +rayon = "1.0" [dev-dependencies] -criterion = "0.2.11" -simple_logger = "1.3.0" +criterion = "0.2" +simple_logger = "1.3" [[bench]] name = "bench_crypto" harness = false +[[bench]] +name = "deserializer_throughput" +harness = false + [profile.release] lto = true # Enables link time optimization (allows for inlining cross-crate) opt-level = 3 # Ensures optimization level is set to the maximum diff --git a/examples/ipv8-join.rs b/examples/ipv8-join.rs index 2202c3d8..c6f4b4c1 100644 --- a/examples/ipv8-join.rs +++ b/examples/ipv8-join.rs @@ -6,5 +6,5 @@ use ipv8; fn main() { - let new = ipv8::IPv8::new(ipv8::configuration::Config::default()); + ipv8::IPv8::new(ipv8::configuration::Config::default()); } diff --git a/src/community/mod.rs b/src/community/mod.rs index 55c52d0e..d6672c12 100644 --- a/src/community/mod.rs +++ b/src/community/mod.rs @@ -3,8 +3,8 @@ use crate::serialization::header::Header; use std::error::Error; use std::collections::HashMap; -use crate::networking::NetworkManager; use crate::networking::address::Address; +use crate::networking::NetworkSender; pub mod peer; @@ -26,107 +26,9 @@ create_error!( /// /// _**Note:** Try to avoid the use of .unwrap() in actual production code, this is just an example_ /// -/// -/// ``` -/// use ipv8::community::peer::Peer; -/// use ipv8::community::Community; -/// use ipv8::serialization::header::Header; -/// use ipv8::serialization::{PacketDeserializer, Packet}; -/// use std::net::{Ipv4Addr, SocketAddr, IpAddr}; -/// use ipv8::networking::address::Address; -/// use std::error::Error; -/// use ipv8::IPv8; -/// use ipv8::configuration::Config; -/// use ipv8::serialization::header::HeaderVersion::PyIPV8Header; -/// use ipv8::crypto::keytypes::PublicKey; -/// use ipv8::networking::NetworkManager; -/// -/// pub struct TestCommunity{ -/// peer: Peer -/// } -/// -/// impl TestCommunity{ -/// } -/// -/// impl Community for TestCommunity{ -/// fn new(endpoint: &NetworkManager) -> Result> { -/// -/// // Use the highest available key -/// let pk: PublicKey = PublicKey::from_vec(vec![ -/// 48, 129, 167, 48, 16, 6, 7, 42, 134, 72, 206, 61, 2, 1, 6, 5, 43, 129, 4, 0, 39, 3, -/// 129, 146, 0, 4, 2, 86, 251, 75, 206, 159, 133, 120, 63, 176, 235, 178, 14, 8, 197, 59, -/// 107, 51, 179, 139, 3, 155, 20, 194, 112, 113, 15, 40, 67, 115, 37, 223, 152, 7, 102, -/// 154, 214, 90, 110, 180, 226, 5, 190, 99, 163, 54, 116, 173, 121, 40, 80, 129, 142, 82, -/// 118, 154, 96, 127, 164, 248, 217, 91, 13, 80, 91, 94, 210, 16, 110, 108, 41, 57, 4, -/// 243, 49, 52, 194, 254, 130, 98, 229, 50, 84, 21, 206, 134, 223, 157, 189, 133, 50, 210, -/// 181, 93, 229, 32, 179, 228, 179, 132, 143, 147, 96, 207, 68, 48, 184, 160, 47, 227, 70, -/// 147, 23, 159, 213, 105, 134, 60, 211, 226, 8, 235, 186, 20, 241, 85, 170, 4, 3, 40, -/// 183, 98, 103, 80, 164, 128, 87, 205, 101, 67, 254, 83, 142, 133, -/// ])?; -/// -/// // Actually create the community -/// Ok(TestCommunity { -/// peer: Peer::new( -/// pk, -/// Address(SocketAddr::new( -/// IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), -/// 42, -/// )), -/// true, -/// ) -/// }) -/// } -/// -/// -/// // Returns the hash of our master peer -/// fn get_mid(&self) -> Option> { -/// Some(self.peer.get_sha1()?.to_vec()) -/// } -/// -/// // The function which will be called when the community receives a packet -/// fn on_receive(&self, header: Header, deserializer: PacketDeserializer, address: Address) -> Result<(),Box>{ -/// # assert_eq!(header.mid_hash, self.get_mid()); -/// # assert_eq!(header.version, PyIPV8Header); -/// # assert_eq!(header.message_type, Some(42)); -/// // Do some stuff here like to distribute the message based on it's message_type (in the header) -/// // and check it's signature -/// Ok(()) -/// } -/// } -/// -/// let mut config = Config::default(); -/// let mut ipv8 = IPv8::new(config).unwrap(); -/// -/// let community = TestCommunity::new(&ipv8.networkmanager).unwrap(); -/// let mid = community.get_mid(); -/// ipv8.communities.add_community(Box::new(community)); -/// -/// // now simulate a packet coming in -/// -/// // Create a packet to test the community with -/// let packet = Packet::new(Header{ -/// size: 23, -/// version: PyIPV8Header, -/// mid_hash: mid, -/// message_type: Some(42), -/// }).unwrap(); -/// -/// // Normally you would want to sign the packet here -/// -/// // Send the packet -/// ipv8.communities -/// .forward_message( -/// packet, -/// Address(SocketAddr::new( -/// IpAddr::V4(Ipv4Addr::new(42, 42, 42, 42)), -/// 42, -/// )), -/// ) -/// .unwrap(); -/// -/// ``` + pub trait Community { - fn new(endpoint: &NetworkManager) -> Result> + fn new(endpoint: &NetworkSender) -> Result> where Self: Sized; @@ -184,15 +86,6 @@ pub trait Community { deserializer: PacketDeserializer, address: Address, ) -> Result<(), Box>; - - fn send( - &self, - endpoint: NetworkManager, - address: Address, - packet: Packet, - ) -> Result<(), Box> { - endpoint.send(&address, packet) - } } /// Every different kind of community is registered here with it's MID. @@ -201,6 +94,9 @@ pub trait Community { /// O(1) lookup time. pub struct CommunityRegistry { // mid, community + #[cfg(test)] + pub communities: HashMap, Box>, + #[cfg(not(test))] communities: HashMap, Box>, } @@ -249,81 +145,103 @@ impl Default for CommunityRegistry { } #[cfg(test)] -mod tests { - use crate::networking::NetworkManager; +mod nightly_tests { + use crate::networking::{NetworkReceiver}; use crate::networking::address::Address; - use std::net::{SocketAddr, IpAddr}; + use std::net::{SocketAddr, IpAddr, SocketAddrV4}; use std::error::Error; + use crate::networking::NetworkSender; - #[test] - fn test_networking() { - use crate::community::peer::Peer; - use crate::community::Community; - use crate::serialization::header::Header; - use crate::serialization::{PacketDeserializer, Packet}; - use std::net::Ipv4Addr; - - use crate::IPv8; - use crate::configuration::Config; - use crate::serialization::header::HeaderVersion::PyIPV8Header; - use crate::crypto::keytypes::PublicKey; - - pub struct TestCommunity { - peer: Peer, + use crate::community::peer::Peer; + use crate::community::{Community, CommunityRegistry}; + use crate::serialization::header::Header; + use crate::serialization::{PacketDeserializer, Packet}; + use std::net::Ipv4Addr; + + use crate::IPv8; + use crate::configuration::Config; + use crate::serialization::header::HeaderVersion::PyIPV8Header; + use crate::crypto::keytypes::PublicKey; + use std::sync::atomic::Ordering; + use crate::networking::test_helper::localhost; + + pub struct TestCommunity { + peer: Peer, + } + + impl Community for TestCommunity { + fn new(endpoint: &NetworkSender) -> Result> { + // Use the highest available key + let pk: PublicKey = PublicKey::from_vec(vec![ + 48, 129, 167, 48, 16, 6, 7, 42, 134, 72, 206, 61, 2, 1, 6, 5, 43, 129, 4, 0, 39, 3, + 129, 146, 0, 4, 2, 86, 251, 75, 206, 159, 133, 120, 63, 176, 235, 178, 14, 8, 197, + 59, 107, 51, 179, 139, 3, 155, 20, 194, 112, 113, 15, 40, 67, 115, 37, 223, 152, 7, + 102, 154, 214, 90, 110, 180, 226, 5, 190, 99, 163, 54, 116, 173, 121, 40, 80, 129, + 142, 82, 118, 154, 96, 127, 164, 248, 217, 91, 13, 80, 91, 94, 210, 16, 110, 108, + 41, 57, 4, 243, 49, 52, 194, 254, 130, 98, 229, 50, 84, 21, 206, 134, 223, 157, + 189, 133, 50, 210, 181, 93, 229, 32, 179, 228, 179, 132, 143, 147, 96, 207, 68, 48, + 184, 160, 47, 227, 70, 147, 23, 159, 213, 105, 134, 60, 211, 226, 8, 235, 186, 20, + 241, 85, 170, 4, 3, 40, 183, 98, 103, 80, 164, 128, 87, 205, 101, 67, 254, 83, 142, + 133, + ])?; + // Actually create the community + Ok(TestCommunity { + peer: Peer::new( + pk, + Address(SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(42, 42, 42, 42)), + 8000, + )), + true, + ), + }) + } + + // Returns the hash of our master peer + fn get_mid(&self) -> Option> { + Some(self.peer.get_sha1()?.to_vec()) } - impl Community for TestCommunity { - fn new(endpoint: &NetworkManager) -> Result> { - // Use the highest available key - let pk: PublicKey = PublicKey::from_vec(vec![ - 48, 129, 167, 48, 16, 6, 7, 42, 134, 72, 206, 61, 2, 1, 6, 5, 43, 129, 4, 0, - 39, 3, 129, 146, 0, 4, 2, 86, 251, 75, 206, 159, 133, 120, 63, 176, 235, 178, - 14, 8, 197, 59, 107, 51, 179, 139, 3, 155, 20, 194, 112, 113, 15, 40, 67, 115, - 37, 223, 152, 7, 102, 154, 214, 90, 110, 180, 226, 5, 190, 99, 163, 54, 116, - 173, 121, 40, 80, 129, 142, 82, 118, 154, 96, 127, 164, 248, 217, 91, 13, 80, - 91, 94, 210, 16, 110, 108, 41, 57, 4, 243, 49, 52, 194, 254, 130, 98, 229, 50, - 84, 21, 206, 134, 223, 157, 189, 133, 50, 210, 181, 93, 229, 32, 179, 228, 179, - 132, 143, 147, 96, 207, 68, 48, 184, 160, 47, 227, 70, 147, 23, 159, 213, 105, - 134, 60, 211, 226, 8, 235, 186, 20, 241, 85, 170, 4, 3, 40, 183, 98, 103, 80, - 164, 128, 87, 205, 101, 67, 254, 83, 142, 133, - ])?; - // Actually create the community - Ok(TestCommunity { - peer: Peer::new( - pk, - Address(SocketAddr::new( - IpAddr::V4(Ipv4Addr::new(42, 42, 42, 42)), - 8000, - )), - true, - ), - }) - } - - // Returns the hash of our master peer - fn get_mid(&self) -> Option> { - Some(self.peer.get_sha1()?.to_vec()) - } - - // The function which will be called when the community receives a packet - fn on_receive( - &self, - header: Header, - deserializer: PacketDeserializer, - _address: Address, - ) -> Result<(), Box> { - assert_eq!(header.mid_hash, self.get_mid()); - assert_eq!(header.version, PyIPV8Header); - assert_eq!(header.message_type, Some(42)); - Ok(()) - } + // The function which will be called when the community receives a packet + fn on_receive( + &self, + header: Header, + deserializer: PacketDeserializer, + _address: Address, + ) -> Result<(), Box> { + assert_eq!(header.mid_hash, self.get_mid()); + assert_eq!(header.version, PyIPV8Header); + assert_eq!(header.message_type, Some(42)); + Ok(()) } + } + #[test] + fn test_add_receiver() { let config = Config::default(); + let ipv8 = IPv8::new(config).unwrap(); + let community = Box::new(TestCommunity::new(&ipv8.network_sender).unwrap()); + let the_same = Box::new(TestCommunity::new(&ipv8.network_sender).unwrap()); + let mid = &*community.get_mid().unwrap(); + let mut registry: CommunityRegistry = CommunityRegistry::default(); + + registry.add_community(community).unwrap(); + + let get = registry.communities.get(mid).unwrap(); + + assert_eq!(the_same.get_mid(), get.get_mid()); // TODO: More thorough comparison + } + + #[test] + fn test_networking() { + let mut config = Config::default(); + config.receiving_address = localhost(); + config.sending_address = localhost(); + config.buffersize = 2048; let mut ipv8 = IPv8::new(config).unwrap(); - let community = TestCommunity::new(&ipv8.networkmanager).unwrap(); + let community = TestCommunity::new(&ipv8.network_sender).unwrap(); let mid = community.get_mid(); ipv8.communities.add_community(Box::new(community)).unwrap(); diff --git a/src/crypto/keytypes.rs b/src/crypto/keytypes.rs index 0fda4562..b67c725a 100644 --- a/src/crypto/keytypes.rs +++ b/src/crypto/keytypes.rs @@ -1,20 +1,4 @@ //! Module for the various types of keys -//! -//! When adding new key types follow these steps: -//! 1. Add it the the PublicKey and PrivateKey enums -//! 2. Change all the match statements testing for either an openssl or libnacl key (or if your key is not either of this, add yours) -//! 3. Add your key to the list of consts below for signature length -//! 4. add your constant to the macro sig_size. -//! -//! The length of the used signatures -//! -//! | Signature Type | pyipv8 | .size() | diff | (bits/8)*2 | -//! |----------------------------------|--------|---------|------|------------| -//! | VERY_LOW_SIGNATURE_LENGTH | 42 | 50 | 8 | 42 | -//! | LOW_SIGNATURE_LENGTH | 60 | 66 | 6 | 58 | -//! | MEDIUM_SIGNATURE_LENGTH | 104 | 110 | 6 | 102 | -//! | HIGH_SIGNATURE_LENGTH | 144 | 153 | 9 | 144 | -//! | ED25519_SIGNATURE_LENGTH | 64 | 64 | 0 | 64 | use std::fmt; diff --git a/src/lib.rs b/src/lib.rs index c752b68d..f7a457aa 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,9 +11,11 @@ pub mod networking; pub mod payloads; use configuration::Config; -use crate::networking::NetworkManager; +use crate::networking::{NetworkSender, NetworkReceiver}; use std::error::Error; use crate::community::CommunityRegistry; +use rayon::{ThreadPoolBuilder}; +use std::sync::Once; /// The IPv8 instance. /// This struct is how you can interact with the network. @@ -27,27 +29,46 @@ use crate::community::CommunityRegistry; /// ``` pub struct IPv8 { pub config: Config, - pub networkmanager: NetworkManager, + pub network_receiver: NetworkReceiver, + pub network_sender: NetworkSender, /// The registry containing all the communities pub communities: CommunityRegistry, } +// To keep track if the threadpool is already started +static THREADPOOL_START: Once = Once::new(); + impl IPv8 { pub fn new(config: configuration::Config) -> Result> { - let networkmanager = NetworkManager::new( - &config.sending_address, - &config.receiving_address, - config.threadcount.to_owned(), - )?; + // Setup the global threadpool + { + let mut started = None; + + THREADPOOL_START.call_once(|| { + started = Some( + ThreadPoolBuilder::new() + .num_threads(config.threadcount) + .build_global(), + ) + }); + + if let Some(s) = started { + s? + } + } + + let network_receiver = NetworkReceiver::new(&config.receiving_address)?; + let network_sender = NetworkSender::new(&config.sending_address)?; Ok(IPv8 { config, - networkmanager, + network_receiver, + network_sender, communities: CommunityRegistry::default(), }) } pub fn start(self) { - self.networkmanager.start(&self.config); + self.network_receiver.start(&self.config); } } diff --git a/src/networking/mod.rs b/src/networking/mod.rs index 93160ece..7c69aa1b 100644 --- a/src/networking/mod.rs +++ b/src/networking/mod.rs @@ -1,7 +1,6 @@ use crate::serialization::Packet; use std::error::Error; use mio::net::UdpSocket; -use rayon::{ThreadPool, ThreadPoolBuilder}; use std::thread; use std::thread::JoinHandle; use mio::{Poll, Token, Events, Ready, PollOpt}; @@ -9,6 +8,7 @@ use crate::configuration::Config; use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; use std::time::Duration; use crate::networking::address::Address; +use rayon::scope_fifo; pub mod address; @@ -22,40 +22,40 @@ pub trait Receiver { fn on_receive(&self, packet: Packet, address: Address); } -/// This struct manages the sockets and receives incoming messages. -pub struct NetworkManager { +pub struct NetworkSender { + socket: UdpSocket, +} + +impl NetworkSender { + pub fn new(sending_address: &Address) -> Result> { + let socket = UdpSocket::bind(&sending_address.0)?; + debug!("Starting, sending_address: {:?}", sending_address); + + Ok(Self { socket }) + } + + /// Sends a Packet to the specified address. + pub fn send(&self, address: &Address, packet: Packet) -> Result> { + Ok(self.socket.send_to(packet.raw(), &address.0)?) + } +} + +pub struct NetworkReceiver { receivers: Vec>, - receiving_socket: UdpSocket, - sending_socket: UdpSocket, - threadpool: ThreadPool, + socket: UdpSocket, } -impl NetworkManager { +impl NetworkReceiver { /// Creates a new networkmanager. This creates a receiver socket and builds a new threadpool on which /// all messages are distributed. - pub fn new( - sending_address: &Address, - receiving_address: &Address, - threadcount: usize, - ) -> Result> { - let receiving_socket = - UdpSocket::bind(&receiving_address.0).or(Err(SocketCreationError))?; - - let sending_socket = UdpSocket::bind(&sending_address.0).or(Err(SocketCreationError))?; + pub fn new(receiving_address: &Address) -> Result> { + let socket = UdpSocket::bind(&receiving_address.0)?; - trace!( - "Starting, sending_address: {:?}, receiving_address: {:?}", - sending_address, - receiving_address - ); - - let pool = ThreadPoolBuilder::new().num_threads(threadcount).build()?; + debug!("Starting, receiving_address: {:?}", receiving_address); let nm = Self { - threadpool: pool, receivers: vec![], - receiving_socket, - sending_socket, + socket, }; Ok(nm) } @@ -71,13 +71,14 @@ impl NetworkManager { let buffersize = configuration.buffersize.to_owned(); let pollinterval = configuration.pollinterval.to_owned(); + // Start the I/O thread thread::spawn(move || { self.listen(queuesize, buffersize, pollinterval) .or_else(|i| { error!("the listening thread crashed"); Err(i) }) - .unwrap(); // :gasp: <-- here it's allowed as it will only crash this thread + .unwrap(); // This only panics the I/O thread not the whole application }) } @@ -90,32 +91,27 @@ impl NetworkManager { debug!("IPV8 is starting it's listener!"); let poll = Poll::new()?; - // this is basically a generated magic number we can later check for - const RECEIVER: Token = Token(0); + let mut events = Events::with_capacity(queuesize); let mut tmp_buf = vec![0; buffersize]; let buffer = tmp_buf.as_mut_slice(); - poll.register( - &self.receiving_socket, - RECEIVER, - Ready::readable(), - PollOpt::edge(), - )?; + const RECEIVER: Token = Token(0); + poll.register(&self.socket, RECEIVER, Ready::readable(), PollOpt::edge())?; loop { poll.poll(&mut events, pollinterval)?; trace!("checking poll"); for _ in events.iter() { - debug!("handling event"); + trace!("handling event"); - let (recv_size, address) = self.receiving_socket.recv_from(buffer)?; + let (recv_size, address) = self.socket.recv_from(buffer)?; let packet = Packet(buffer[..recv_size].to_vec()).clone(); - // use our own threadpool - self.threadpool.scope_fifo(|s| { + // We want a FIFO threadpool + scope_fifo(|s| { s.spawn_fifo(|_| { // iterate over the receivers asynchronously and non blocking self.receivers.par_iter().for_each(|r| { @@ -131,11 +127,34 @@ impl NetworkManager { pub fn add_receiver(&mut self, receiver: Box) { self.receivers.push(receiver) } +} - /// Sends a Packet to the specified address. - pub fn send(&self, address: &Address, packet: Packet) -> Result<(), Box> { - self.sending_socket.send_to(packet.raw(), &address.0)?; - Ok(()) +/// Taken and adapted from the [mio testing suite](https://github.com/tokio-rs/mio/blob/master/test/mod.rs#L113) +#[cfg(test)] +pub mod test_helper { + use std::net::{SocketAddr, Ipv4Addr, SocketAddrV4, IpAddr}; + use std::str::FromStr; + use std::sync::atomic::Ordering::SeqCst; + use std::sync::atomic::{AtomicU16}; + use crate::networking::address::Address; + + // Helper for getting a unique port for the task run + // TODO: Reuse ports to not spam the system + const FIRST_PORT: u16 = 18080; + static NEXT_PORT: AtomicU16 = AtomicU16::new(FIRST_PORT); + pub const LOCALHOST_IP: Ipv4Addr = Ipv4Addr::new(127, 0, 0, 1); + + fn next_port() -> u16 { + // Get and increment the port list + NEXT_PORT.fetch_add(1, SeqCst) + } + + pub fn localhost() -> Address { + Address(localhost_socket()) + } + + pub fn localhost_socket() -> SocketAddr { + SocketAddr::new(IpAddr::V4(LOCALHOST_IP), next_port()) } } @@ -151,10 +170,14 @@ mod tests { use crate::serialization::Packet; use std::sync::Once; use std::time::Duration; - use crate::networking::Receiver; + use crate::networking::{Receiver, NetworkSender, NetworkReceiver}; use std::thread; use std::sync::atomic::{AtomicUsize, Ordering, AtomicU16}; use crate::networking::address::Address; + use crate::serialization::header::HeaderVersion::PyIPV8Header; + use crate::serialization::header::Header; + use serde::private::ser::constrain; + use crate::networking::test_helper::{localhost, localhost_socket, LOCALHOST_IP}; static BEFORE: Once = Once::new(); @@ -172,24 +195,18 @@ mod tests { // start ipv8 let mut config = Config::default(); - let address = Ipv4Addr::new(0, 0, 0, 0); - config.receiving_address = Address(SocketAddr::new(IpAddr::V4(address), 8090)); - config.sending_address = Address(SocketAddr::new(IpAddr::V4(address), 0)); + config.receiving_address = localhost(); + config.sending_address = localhost(); config.buffersize = 2048; let mut ipv8 = IPv8::new(config).unwrap(); - let sender_socket = UdpSocket::bind(&SocketAddr::new(IpAddr::V4(address), 0)).unwrap(); + let sender_socket = UdpSocket::bind(&localhost_socket()).unwrap(); static SEND_PORT: AtomicU16 = AtomicU16::new(0); - let recv_port: u16 = ipv8 - .networkmanager - .receiving_socket - .local_addr() - .unwrap() - .port(); + let recv_port: u16 = ipv8.network_receiver.socket.local_addr().unwrap().port(); let send_port: u16 = sender_socket.local_addr().unwrap().port(); SEND_PORT.store(send_port, Ordering::SeqCst); @@ -212,15 +229,13 @@ mod tests { } } - ipv8.networkmanager.add_receiver(Box::new(AReceiver)); + ipv8.network_receiver.add_receiver(Box::new(AReceiver)); ipv8.start(); - // wait for it to start up - thread::sleep(Duration::from_millis(300)); // now try to send ipv8 a message sender_socket - .connect(SocketAddr::new(IpAddr::V4(address), recv_port)) + .connect(SocketAddr::new(IpAddr::V4(LOCALHOST_IP), recv_port)) .unwrap(); let a = sender_socket.send(OGPACKET.raw()).unwrap(); @@ -236,4 +251,57 @@ mod tests { // a poor man's `verify(AReceiver, times(2)).on_receiver();` assert_eq!(2, PACKET_COUNTER.load(std::sync::atomic::Ordering::SeqCst)); } + + #[test] + fn test_sending_networkmanager() { + before(); + + // start ipv8 + let mut config = Config::default(); + + config.receiving_address = localhost(); + config.sending_address = localhost(); + config.buffersize = 2048; + + // let mut ipv8 = IPv8::new(config).unwrap(); + let ns = NetworkSender::new(&config.sending_address).unwrap(); + let mut nr = NetworkReceiver::new(&config.receiving_address).unwrap(); + + static SEND_PORT: AtomicU16 = AtomicU16::new(0); + + let recv_port: u16 = nr.socket.local_addr().unwrap().port(); + let send_port: u16 = ns.socket.local_addr().unwrap().port(); + + SEND_PORT.store(send_port, Ordering::SeqCst); + + lazy_static! { + static ref OGPACKET: Packet = Packet::new(create_test_header!()).unwrap(); + } + + static PACKET_COUNTER: AtomicUsize = AtomicUsize::new(0); + + //create receiver + struct AReceiver; + impl Receiver for AReceiver { + fn on_receive(&self, packet: Packet, address: Address) { + assert_eq!(OGPACKET.raw(), packet.raw()); + assert_eq!(SEND_PORT.load(Ordering::SeqCst), (address.0).port()); + + // Count each packet + PACKET_COUNTER.fetch_add(1, Ordering::SeqCst); + } + } + + nr.add_receiver(Box::new(AReceiver)); + nr.start(&config); + + let addr = Address(SocketAddr::new(IpAddr::V4(LOCALHOST_IP), recv_port)); + + ns.send(&addr, Packet(OGPACKET.raw().to_vec())).unwrap(); + + thread::sleep(Duration::from_millis(20)); + + // a poor man's `verify(AReceiver, times(2)).on_receiver();` + assert_eq!(1, PACKET_COUNTER.load(std::sync::atomic::Ordering::SeqCst)); + } } diff --git a/tests/ipv8-community.rs b/tests/ipv8-community.rs index e69de29b..29cb4dd5 100644 --- a/tests/ipv8-community.rs +++ b/tests/ipv8-community.rs @@ -0,0 +1,100 @@ +use ipv8::networking::NetworkSender; + +#[test] +fn community_integration_test() { + use ipv8::community::peer::Peer; + use ipv8::community::Community; + use ipv8::serialization::header::Header; + use ipv8::serialization::{PacketDeserializer, Packet}; + use std::net::{Ipv4Addr, SocketAddr, IpAddr}; + use ipv8::networking::address::Address; + use std::error::Error; + use ipv8::IPv8; + use ipv8::configuration::Config; + use ipv8::serialization::header::HeaderVersion::PyIPV8Header; + use ipv8::crypto::keytypes::PublicKey; + + pub struct TestCommunity { + peer: Peer, + } + + impl TestCommunity {} + + impl Community for TestCommunity { + fn new(endpoint: &NetworkSender) -> Result> { + // Use the highest available key + let pk: PublicKey = PublicKey::from_vec(vec![ + 48, 129, 167, 48, 16, 6, 7, 42, 134, 72, 206, 61, 2, 1, 6, 5, 43, 129, 4, 0, 39, 3, + 129, 146, 0, 4, 2, 86, 251, 75, 206, 159, 133, 120, 63, 176, 235, 178, 14, 8, 197, + 59, 107, 51, 179, 139, 3, 155, 20, 194, 112, 113, 15, 40, 67, 115, 37, 223, 152, 7, + 102, 154, 214, 90, 110, 180, 226, 5, 190, 99, 163, 54, 116, 173, 121, 40, 80, 129, + 142, 82, 118, 154, 96, 127, 164, 248, 217, 91, 13, 80, 91, 94, 210, 16, 110, 108, + 41, 57, 4, 243, 49, 52, 194, 254, 130, 98, 229, 50, 84, 21, 206, 134, 223, 157, + 189, 133, 50, 210, 181, 93, 229, 32, 179, 228, 179, 132, 143, 147, 96, 207, 68, 48, + 184, 160, 47, 227, 70, 147, 23, 159, 213, 105, 134, 60, 211, 226, 8, 235, 186, 20, + 241, 85, 170, 4, 3, 40, 183, 98, 103, 80, 164, 128, 87, 205, 101, 67, 254, 83, 142, + 133, + ])?; + + // Actually create the community + Ok(TestCommunity { + peer: Peer::new( + pk, + Address(SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 42)), + true, + ), + }) + } + + // Returns the hash of our master peer + fn get_mid(&self) -> Option> { + Some(self.peer.get_sha1()?.to_vec()) + } + + // The function which will be called when the community receives a packet + fn on_receive( + &self, + header: Header, + deserializer: PacketDeserializer, + address: Address, + ) -> Result<(), Box> { + assert_eq!(header.mid_hash, self.get_mid()); + assert_eq!(header.version, PyIPV8Header); + assert_eq!(header.message_type, Some(42)); + // Do some stuff here like to distribute the message based on it's message_type (in the header) + // and check it's signature + Ok(()) + } + } + + let config = Config::default(); + let mut ipv8 = IPv8::new(config).unwrap(); + + let community = TestCommunity::new(&ipv8.network_sender).unwrap(); + let mid = community.get_mid(); + ipv8.communities.add_community(Box::new(community)).unwrap(); + + // now simulate a packet coming in + + // Create a packet to test the community with + let packet = Packet::new(Header { + size: 23, + version: PyIPV8Header, + mid_hash: mid, + message_type: Some(42), + }) + .unwrap(); + + // Normally you would want to sign the packet here + + // Send the packet + ipv8.communities + .forward_message( + packet, + Address(SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(42, 42, 42, 42)), + 42, + )), + ) + .unwrap(); +}