Skip to content

Commit

Permalink
Merge pull request #42 from ip-v8/networking
Browse files Browse the repository at this point in the history
Networking
  • Loading branch information
Dany Sluijk authored Jun 13, 2019
2 parents ccfa197 + ad45905 commit f3ca505
Show file tree
Hide file tree
Showing 14 changed files with 303 additions and 67 deletions.
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,12 @@ rust_sodium = "0.10.2"
openssl = { version = "0.10", features = ["vendored"] }
lazy_static = "1.2.0"
log = "0.4"
mio = "0.6.19"
rayon = "1.0.3"

[dev-dependencies]
criterion = "0.2.11"
simple_logger = "1.3.0"

[[bench]]
name = "bench_crypto"
Expand Down
3 changes: 1 addition & 2 deletions src/community/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use crate::serialization::header::Header;
use std::error::Error;
use std::collections::HashMap;
use crate::networking::address::Address;
use std::fmt;

pub mod peer;

Expand Down Expand Up @@ -89,7 +88,7 @@ create_error!(InsertionError, "Error inserting community into hashmap");
/// let community = TestCommunity::new().unwrap();
/// let mid = community.get_mid();
/// config.communities.add_community(Box::new(community));
/// let ipv8 = IPv8::new(config);
/// let ipv8 = IPv8::new(config).unwrap();
///
/// // now simulate a packet coming in
///
Expand Down
18 changes: 18 additions & 0 deletions src/configuration.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,19 @@
use crate::networking::address::Address;
use std::net::Ipv4Addr;
use crate::community::CommunityRegistry;
use std::time::Duration;

pub struct Config {
/// the amount of space reserved for queueing up incoming messages (messages)
pub queuesize: usize,
/// the size of the buffer reserved for incoming messages (bytes)
pub buffersize: usize,
/// frequency at which polling times out and events are checked (ms)
/// None is as fast as possible
pub pollinterval: Option<Duration>,
/// the max number of threads to use in the network manager. 0 is #cores
pub threadcount: usize,

/// Default list of host used for peer discovery
pub default_hosts: Vec<Address>,
/// from py-ipv8 configuration. UDP socket address.
Expand All @@ -16,6 +27,13 @@ pub struct Config {
impl Default for Config {
fn default() -> Self {
Config {
queuesize: 100,
buffersize: 2048,
pollinterval: None,

// zero means equal to number of cores
threadcount: 0,

socketaddress: Address {
address: Ipv4Addr::new(0, 0, 0, 0),
port: 8090,
Expand Down
3 changes: 1 addition & 2 deletions src/crypto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use openssl::sign::Signer;
use rust_sodium;
use rust_sodium::crypto::sign::ed25519;
use std::error::Error;
use std::fmt;
use std::os::raw::c_int;

create_error!(SignatureError, "Invalid signature");
Expand All @@ -34,7 +33,7 @@ pub fn verify_signature_ed25519(
pkey: ed25519::PublicKey,
) -> Result<bool, Box<dyn Error>> {
let verify = ed25519::verify_detached(
&ed25519::Signature::from_slice(&*signature).ok_or(Box::new(SignatureError))?,
&ed25519::Signature::from_slice(&*signature).ok_or_else(|| Box::new(SignatureError))?,
data,
&pkey,
);
Expand Down
1 change: 0 additions & 1 deletion src/crypto/signature.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::error::Error;
use std::fmt;

use openssl::bn::BigNum;
use serde::ser::SerializeTuple;
Expand Down
7 changes: 3 additions & 4 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ macro_rules! create_error {
#[derive(Debug)]
pub struct $name;

impl fmt::Display for $name {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
impl std::fmt::Display for $name {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, $message)
}
}

impl Error for $name {
impl std::error::Error for $name {
fn description(&self) -> &str {
$message
}
Expand All @@ -22,7 +22,6 @@ macro_rules! create_error {
#[cfg(test)]
mod tests {
use std::error::Error;
use std::fmt;

#[test]
fn test_errors() {
Expand Down
16 changes: 14 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ pub mod networking;
pub mod payloads;

use configuration::Config;
use crate::networking::NetworkManager;
use std::error::Error;

/// The IPv8 instance.
/// This struct is how you can interact with the network.
Expand All @@ -25,10 +27,20 @@ use configuration::Config;
/// ```
pub struct IPv8 {
pub config: Config,
pub networkmanager: NetworkManager,
}

impl IPv8 {
pub fn new(config: configuration::Config) -> Self {
IPv8 { config }
pub fn new(config: configuration::Config) -> Result<Self, Box<dyn Error>> {
let networkmanager =
NetworkManager::new(&config.socketaddress, config.threadcount.to_owned())?;
Ok(IPv8 {
config,
networkmanager,
})
}

pub fn start(self) {
self.networkmanager.start(&self.config);
}
}
27 changes: 0 additions & 27 deletions src/networking/connection.rs

This file was deleted.

1 change: 0 additions & 1 deletion src/networking/connector.rs

This file was deleted.

218 changes: 216 additions & 2 deletions src/networking/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,217 @@
use std::net::{SocketAddr, IpAddr};
use crate::networking::address::Address;
use crate::serialization::Packet;
use std::error::Error;
use mio::net::UdpSocket;
use rayon::{ThreadPool, ThreadPoolBuilder};
use std::thread;
use std::thread::JoinHandle;
use mio::{Poll, Token, Events, Ready, PollOpt};
use crate::configuration::Config;
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
use std::time::Duration;

pub mod address;
pub mod connection;
pub mod connector;

create_error!(SocketCreationError, "The socket creation failed");
create_error!(ListenError, "An error occured during the listening");

pub trait Receiver {
fn on_receive(&self, packet: Packet, address: Address);
}

pub struct NetworkManager {
receivers: Vec<Box<dyn Receiver + Send + Sync>>,
socket: UdpSocket,
threadpool: ThreadPool,
}

impl NetworkManager {
pub fn new(address: &Address, threadcount: usize) -> Result<Self, Box<dyn Error>> {
let socket = UdpSocket::bind(&SocketAddr::new(IpAddr::V4(address.address), address.port))
.or(Err(SocketCreationError))?;

let pool = ThreadPoolBuilder::new()
.num_threads(threadcount)
.breadth_first()
.build()?;

let nm = Self {
threadpool: pool,
receivers: vec![],
socket,
};
Ok(nm)
}

pub fn start(self, configuration: &Config) -> JoinHandle<()> {
let queuesize = configuration.queuesize.to_owned();
let buffersize = configuration.buffersize.to_owned();
let pollinterval = configuration.pollinterval.to_owned();

thread::spawn(move || {
self.listen(queuesize, buffersize, pollinterval)
.or_else(|i| {
error!("the listening thread crashed");
Err(i)
})
.unwrap(); // :gasp: <-- here it's allowed as it will only crash this thread
})
}

pub fn listen(
self,
queuesize: usize,
buffersize: usize,
pollinterval: Option<Duration>,
) -> Result<(), Box<dyn Error>> {
debug!("IPV8 is starting it's listener!");

let poll = Poll::new()?;
// this is basically a generated magic number we can later check for
const RECEIVER: Token = Token(0);
let mut events = Events::with_capacity(queuesize);

let mut tmp_buf = vec![0; buffersize];
let buffer = tmp_buf.as_mut_slice();

poll.register(&self.socket, RECEIVER, Ready::readable(), PollOpt::edge())?;

loop {
poll.poll(&mut events, pollinterval)?;
trace!("checking poll");
for _ in events.iter() {
debug!("handling event");

let (recv_size, address) = self.socket.recv_from(buffer)?;

let packet = Packet(buffer[..recv_size].to_vec()).clone();

let ip = match address.ip() {
IpAddr::V4(a) => a,
IpAddr::V6(_) => {
warn!("Unexpectedly received ipv6 packet");
continue;
}
};

// use our own threadpool
self.threadpool.install(|| {
// iterate over the receivers asynchronously and non blocking
self.receivers.par_iter().for_each(|r| {
r.on_receive(
packet.clone(),
Address {
address: ip.to_owned(),
port: address.port(),
},
);
});
});
}
}
}

pub fn add_receiver(&mut self, receiver: Box<dyn Receiver + Send + Sync>) {
self.receivers.push(receiver)
}

pub fn send(address: Address, packet: Packet) -> Result<(), Box<dyn Error>> {
unimplemented!(
"Trying to send {:?} to {:?} but sending is not implemented",
packet,
address
)
}
}

#[cfg(test)]
mod tests {
use lazy_static::lazy_static;

use crate::IPv8;
use crate::configuration::Config;
use mio::net::UdpSocket;
use crate::networking::address::Address;
use std::net::{Ipv4Addr, SocketAddr, IpAddr};
use crate::serialization::Packet;
use std::sync::Once;
use std::time::Duration;
use crate::networking::Receiver;
use std::thread;
use std::sync::atomic::AtomicUsize;

static BEFORE: Once = Once::new();

// A poor man's @Before
fn before() {
BEFORE.call_once(|| {
simple_logger::init().unwrap();
})
}

// `pacman -Syu networkmanager`
#[test]
fn test_networkmanager() {
before();

// start ipv8
let mut config = Config::default();
let address = Ipv4Addr::new(0, 0, 0, 0);
static RECV_PORT: u16 = 8090;
static SEND_PORT: u16 = 30240;

config.socketaddress = Address {
address,
port: RECV_PORT,
};
config.buffersize = 2048;

let mut ipv8 = IPv8::new(config).unwrap();

lazy_static! {
static ref OGPACKET: Packet = Packet::new(create_test_header!()).unwrap();
}

static PACKET_COUNTER: AtomicUsize = AtomicUsize::new(0);

//create receiver
struct AReceiver;
impl Receiver for AReceiver {
fn on_receive(&self, packet: Packet, address: Address) {
assert_eq!(OGPACKET.raw(), packet.raw());
assert_eq!(SEND_PORT, address.port);

// Count each packet
PACKET_COUNTER.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
}
}

ipv8.networkmanager.add_receiver(Box::new(AReceiver));

ipv8.start();
// wait for it to start up
thread::sleep(Duration::from_millis(300));

// now try to send ipv8 a message
let sender_socket =
UdpSocket::bind(&SocketAddr::new(IpAddr::V4(address), SEND_PORT)).unwrap();

sender_socket
.connect(SocketAddr::new(IpAddr::V4(address), RECV_PORT))
.unwrap();

let a = sender_socket.send(OGPACKET.raw()).unwrap();
assert_eq!(a, OGPACKET.raw().len());

thread::sleep(Duration::from_millis(20));

let b = sender_socket.send(OGPACKET.raw()).unwrap();
assert_eq!(b, OGPACKET.raw().len());

thread::sleep(Duration::from_millis(20));

// a poor man's `verify(AReceiver, times(2)).on_receiver();`
assert_eq!(2, PACKET_COUNTER.load(std::sync::atomic::Ordering::SeqCst));
}
}
Loading

0 comments on commit f3ca505

Please sign in to comment.