diff --git a/Cargo.lock b/Cargo.lock
index 077c3609ecd..2db7018d9d2 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3509,6 +3509,21 @@ dependencies = [
  "unicode-xid",
 ]
 
+[[package]]
+name = "sysinfo"
+version = "0.24.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6e6241cec618592e5d52f7ed0c8abba0cd1969a5aa4be7b5351281d922113e1d"
+dependencies = [
+ "cfg-if",
+ "core-foundation-sys",
+ "libc",
+ "ntapi",
+ "once_cell",
+ "rayon",
+ "winapi",
+]
+
 [[package]]
 name = "tap"
 version = "1.0.1"
@@ -3569,7 +3584,9 @@ dependencies = [
  "iroha_logger",
  "once_cell",
  "rand 0.8.5",
+ "sysinfo",
  "tempfile",
+ "thiserror",
  "tokio",
  "unique_port",
 ]
diff --git a/cli/src/torii/tests.rs b/cli/src/torii/tests.rs
index 5c333776069..9c719d4f39d 100644
--- a/cli/src/torii/tests.rs
+++ b/cli/src/torii/tests.rs
@@ -42,7 +42,7 @@ async fn try_init_network(config: &mut Configuration) -> Result RawGenesisBlock {
     let mut builder = RawGenesisBlockBuilder::new();
@@ -29,7 +30,7 @@ fn main() {
     builder.build()
 }
 
-    let mut peer = ::new().expect("Failed to create peer");
+    let mut peer = ::new().await.expect("Failed to create peer");
     let configuration = get_config(
         std::iter::once(peer.id.clone()).collect(),
         Some(get_key_pair()),
diff --git a/client/tests/integration/restart_peer.rs b/client/tests/integration/restart_peer.rs
index b474b564b87..6a2239f9d37 100644
--- a/client/tests/integration/restart_peer.rs
+++ b/client/tests/integration/restart_peer.rs
@@ -12,12 +12,13 @@
 use tokio::runtime::Runtime;
 
 use super::Configuration;
 
-#[test]
-fn restarted_peer_should_have_the_same_asset_amount() -> Result<()> {
+// #[test]
+#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
+async fn restarted_peer_should_have_the_same_asset_amount() -> Result<()> {
     let temp_dir = Arc::new(TempDir::new()?);
     let mut configuration = Configuration::test();
-    let mut peer = ::new()?;
+    let mut peer = ::new().await?;
     configuration.sumeragi.trusted_peers.peers = std::iter::once(peer.id.clone()).collect();
 
     let pipeline_time = Duration::from_millis(configuration.sumeragi.pipeline_time_ms());
diff --git a/core/test_network/Cargo.toml b/core/test_network/Cargo.toml
index 6e8ccaed36e..ecaa3dbf9f5 100644
--- a/core/test_network/Cargo.toml
+++ b/core/test_network/Cargo.toml
@@ -20,11 +20,13 @@
 iroha_actor = { version = "=2.0.0-pre-rc.5", path = "../../actor" }
 iroha = { path = "../../cli", features = ["test-network"] }
 
 eyre = "0.6.5"
+futures = { version = "0.3.17", default-features = false, features = ["std", "async-await"] }
+rand = "0.8"
+sysinfo = "0.24.3"
 tempfile = "3"
-unique_port = "0.1.0"
+thiserror = "1.0.28"
 tokio = { version = "1.6.0", features = ["rt", "rt-multi-thread", "macros"]}
-rand = "0.8"
-futures = { version = "0.3.17", default-features = false, features = ["std", "async-await"] }
+unique_port = "0.1.0"
 
 [dev-dependencies]
diff --git a/core/test_network/src/lib.rs b/core/test_network/src/lib.rs
index 4ed9fac8d28..afda227e84d 100644
--- a/core/test_network/src/lib.rs
+++ b/core/test_network/src/lib.rs
@@ -3,9 +3,21 @@
 #![allow(clippy::restriction, clippy::future_not_send)]
 
 use core::{fmt::Debug, str::FromStr as _, time::Duration};
-use std::{collections::HashMap, sync::Arc, thread};
+use std::{
+    collections::HashMap,
+    collections::HashSet,
+    env,
+    fs::{self, File},
+    io::Write,
+    net::{Ipv4Addr, SocketAddrV4, TcpListener},
+    path::PathBuf,
+    process,
+    sync::Arc,
+    thread,
+    time::SystemTime,
+};
 
-use eyre::{Error, Result};
+use eyre::{Error, Report, Result};
 use futures::{prelude::*, stream::FuturesUnordered};
 use iroha::{config::Configuration, torii::config::ToriiConfiguration, Iroha};
 use iroha_actor::{broker::*, prelude::*};
@@ -22,6 +34,7 @@ use iroha_core::{
 use iroha_data_model::{peer::Peer as DataModelPeer, prelude::*};
 use iroha_logger::{Configuration as LoggerConfiguration, InstrumentFutures};
 use rand::seq::IteratorRandom;
+use sysinfo::{Pid, PidExt, System, SystemExt};
 use tempfile::TempDir;
 use tokio::{
     runtime::{self, Runtime},
@@ -250,9 +263,13 @@ where
         offline_peers: u32,
     ) -> Result<Self> {
         let n_peers = n_peers - 1;
-        let mut genesis = Peer::::new()?;
+        let mut genesis = Peer::::new().await?;
         let mut peers = (0..n_peers)
             .map(|_| Peer::new())
+            .collect::<FuturesUnordered<_>>()
+            .collect::<Vec<_>>()
+            .await
+            .into_iter()
             .map(|result| result.map(|peer| (peer.id.clone(), peer)))
             .collect::<Result<HashMap<_, _>>>()?;
 
@@ -499,11 +516,12 @@ where
     /// - `p2p_address`
     /// - `api_address`
    /// - `telemetry_address`
-    pub fn new() -> Result<Self> {
+    pub async fn new() -> Result<Self> {
         let key_pair = KeyPair::generate()?;
-        let p2p_address = local_unique_port()?;
-        let api_address = local_unique_port()?;
-        let telemetry_address = local_unique_port()?;
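+        // Take ports from the cross-process provider below (a lock file plus
+        // per-process storage files in the temp dir) instead of `local_unique_port()`,
+        // so concurrently running test binaries do not grab the same port.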
+        let port_provider = UniquePortProvider::new()?;
+        let p2p_address = port_provider.new_unique_local_address().await?.to_string();
+        let api_address = port_provider.new_unique_local_address().await?.to_string();
+        let telemetry_address = port_provider.new_unique_local_address().await?.to_string();
         let id = PeerId {
             address: p2p_address.clone(),
             public_key: key_pair.public_key().clone(),
@@ -667,7 +685,7 @@ where
         self,
     ) -> Peer, Sumeragi, W>, BlockSynchronizer, W>, W>> {
-        let mut peer = Peer::new().expect("Failed to create a peer.");
+        let mut peer = Peer::new().await.expect("Failed to create a peer.");
         self.start_with_peer(&mut peer).await;
         peer
     }
 
@@ -726,6 +744,182 @@ where
     }
 }
 
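+/// Provider of unique localhost ports for test peers, coordinated across
+/// concurrently running test processes.
+///
+/// Allocation is serialized through a lock file in the OS temporary directory;
+/// each process appends the ports it has taken to a storage file named after
+/// its PID, and storage files of dead processes are cleaned up on demand.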
+struct UniquePortProvider {
+    id: u128,
+}
+
+impl UniquePortProvider {
+    const STORAGE_PATH_PREFIX: &'static str = "iroha_port_provider_storage_";
+    const LOCKFILE_NAME: &'static str = "iroha_port_provider_lockfile";
+    const LOCK_WAIT_TIMEOUT: Duration = Duration::from_secs(3);
+    const NEW_PORT_TIMEOUT: Duration = Duration::from_secs(5);
+
+    fn new() -> Result<Self> {
+        let now = SystemTime::now();
+        let id = now.duration_since(std::time::UNIX_EPOCH)?.as_nanos();
+        Ok(Self { id })
+    }
+
+    async fn new_unique_local_address(&self) -> Result<SocketAddrV4> {
+        let port = self.new_free_port().await?;
+        Ok(SocketAddrV4::new(Ipv4Addr::LOCALHOST, port))
+    }
+
+    async fn new_free_port(&self) -> Result<u16> {
+        self.lock().await?;
+        let result = self.find_free_port().and_then(|port| self.store_port(port));
+        self.unlock()?;
+        result
+    }
+
+    async fn lock(&self) -> Result<()> {
+        let path = Self::lockfile_path();
+
+        if self.check_lockfile(&path).await? {
+            return Ok(());
+        }
+
+        fs::write(&path, self.id.to_string())?;
+
+        Ok(())
+    }
+
+    fn find_free_port(&self) -> Result<u16> {
+        let taken_ports = self.taken_ports()?;
+        let from = SystemTime::now();
+        loop {
+            let now = SystemTime::now();
+            if now.duration_since(from)? > Self::NEW_PORT_TIMEOUT {
+                return Err(Report::new(UniquePortProviderError::Timeout));
+            }
+
+            let new_port = rand::random::<u16>();
+            let tcp_listener = TcpListener::bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, new_port));
+
+            if taken_ports.contains(&new_port) || tcp_listener.is_err() {
+                continue;
+            }
+
+            return Ok(new_port);
+        }
+    }
+
+    fn store_port(&self, port: u16) -> Result<u16> {
+        let path = Self::storage_path();
+        let mut file = File::options().append(true).create(true).open(path)?;
+        writeln!(&mut file, "{}", port)?;
+
+        Ok(port)
+    }
+
+    fn unlock(&self) -> Result<()> {
+        let path = Self::lockfile_path();
+        if path.exists() {
+            fs::remove_file(&path)?;
+        }
+        Ok(())
+    }
+
+    fn lockfile_path() -> PathBuf {
+        let mut result = env::temp_dir();
+        result.push(Self::LOCKFILE_NAME);
+        result
+    }
+
+    fn taken_ports(&self) -> Result<HashSet<u16>> {
+        self.remove_orphan_storages()?;
+        self.storages()?
+            .into_iter()
+            .map(|(_, path)| fs::read_to_string(path).map_err(Report::new))
+            .collect::<Result<Vec<_>>>()?
+            .iter()
+            .map(|value| value.lines())
+            .flatten()
+            .map(|line| line.parse::<u16>().map_err(Report::new))
+            .collect()
+    }
+
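+    /// Remove storage files whose owning process is no longer alive
+    /// (checked via `sysinfo`), so the ports they reserved can be reused.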
+    fn remove_orphan_storages(&self) -> Result<()> {
+        let sysinfo = System::new();
+        let processes = sysinfo.processes();
+
+        self.storages()?
+            .into_iter()
+            .filter(|(pid, _)| {
+                let pid = Pid::from_u32(*pid);
+                !processes.contains_key(&pid)
+            })
+            .map(|(_, path)| fs::remove_file(path).map_err(Report::new))
+            .collect()
+    }
+
+    fn storages(&self) -> Result<HashMap<u32, PathBuf>> {
+        let target_dir = env::temp_dir();
+        let mut result = HashMap::new();
+        for entry in fs::read_dir(target_dir)? {
+            let entry = entry?;
+            let is_file = entry.path().is_file();
+            let file_name = entry.file_name();
+            let file_name = file_name.to_string_lossy();
+            if is_file && file_name.starts_with(Self::STORAGE_PATH_PREFIX) {
+                let id: u32 = file_name
+                    .strip_prefix(Self::STORAGE_PATH_PREFIX)
+                    .expect("Can't parse storage id")
+                    .parse()?;
+                result.insert(id, entry.path());
+            }
+        }
+
+        Ok(result)
+    }
+
+    fn storage_path() -> PathBuf {
+        let id = process::id();
+        Self::storage_path_from_id(id)
+    }
+
+    fn storage_path_from_id(id: u32) -> PathBuf {
+        let mut result = env::temp_dir();
+        let file_path = PathBuf::from(format!("{}{}", Self::STORAGE_PATH_PREFIX, id));
+        result.push(file_path);
+        result
+    }
+
+    async fn check_lockfile(&self, path: &PathBuf) -> Result<bool> {
+        if !path.exists() {
+            return Ok(true);
+        }
+
+        if !self.check_lockfile_id(path)? {
+            self.wait_for_unlock(path).await?;
+        }
+
+        Ok(false)
+    }
+
+    fn check_lockfile_id(&self, path: &PathBuf) -> Result<bool> {
+        let id: u128 = fs::read_to_string(path)?.parse()?;
+        Ok(self.id == id)
+    }
+
+    async fn wait_for_unlock(&self, path: &PathBuf) -> Result<()> {
+        let from = SystemTime::now();
+        loop {
+            let now = SystemTime::now();
+            if !path.exists() || now.duration_since(from)? > Self::LOCK_WAIT_TIMEOUT {
+                return Ok(());
+            }
+            time::sleep(Duration::from_millis(2)).await;
+        }
+    }
+}
+
+#[derive(Debug, thiserror::Error)]
+enum UniquePortProviderError {
+    #[error("Can't find a free port")]
+    Timeout,
+}
+
 fn local_unique_port() -> Result<String> {
     Ok(format!(
         "127.0.0.1:{}",
@@ -875,8 +1069,6 @@ impl TestRuntime for Runtime {
     }
 }
 
-use std::collections::HashSet;
-
 impl TestConfiguration for Configuration {
     fn test() -> Self {
         let mut configuration = iroha::samples::get_config(HashSet::new(), Some(get_key_pair()));
diff --git a/p2p/tests/integration/p2p.rs b/p2p/tests/integration/p2p.rs
index d7f88b6e025..0b0a24f0614 100644
--- a/p2p/tests/integration/p2p.rs
+++ b/p2p/tests/integration/p2p.rs
@@ -209,6 +209,88 @@ async fn two_networks() {
     assert_eq!(connected_peers.peers.len(), 1);
 }
 
+async fn init_peers() -> Vec<(Addr>, String, Broker, KeyPair)> {
+    (0..10)
+        .map(|_| {
+            let broker = Broker::new();
+            let keypair = KeyPair::generate().unwrap();
+            let fut = {
+                let broker = broker.clone();
+                let keypair = keypair.clone();
+                async { (broker, keypair) }
+            };
+            futures::future::join(init_network(broker, keypair.public_key().to_owned()), fut)
+        })
+        .collect::<FuturesUnordered<_>>()
+        .collect::<Vec<_>>()
+        .await
+        .into_iter()
+        .map(|((network, addr), (broker, keypair))| (network, addr, broker, keypair))
+        .collect()
+}
+
+async fn init_test_actor(broker: &Broker, messages: Arc) {
+    // This actor will get the messages from other peers and increment the counter
+    let actor = TestActor {
+        broker: broker.to_owned(),
+        messages,
+    };
+    actor.start().await;
+}
+
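+/// Issue a `ConnectPeer` for every peer in `receivers` other than `sender`
+/// and return the number of connections that were initiated.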
+async fn send_message_to_peers(
+    sender: &str,
+    receivers: &[String],
+    broker: &Broker,
+    public_key: &iroha_crypto::PublicKey,
+) -> usize {
+    let delay: u64 = rand::random();
+    time::sleep(Duration::from_millis(250 + (delay % 500))).await;
+
+    let mut conn_count = 0;
+    for p in receivers {
+        if p != sender {
+            let peer = PeerId {
+                address: p.to_owned(),
+                public_key: public_key.to_owned(),
+            };
+
+            broker
+                .issue_send(ConnectPeer {
+                    address: peer.address,
+                })
+                .await;
+            conn_count += 1;
+            time::sleep(Duration::from_millis(100)).await;
+        }
+    }
+    conn_count
+}
+
+async fn start_network(
+    sender: String,
+    network: Addr>,
+    broker: Broker,
+    public_key: iroha_crypto::PublicKey,
+    receivers: &[String],
+) {
+    info!(peer_addr = %sender, "Starting network");
+
+    let conn_count = send_message_to_peers(&sender, receivers, &broker, &public_key).await;
+
+    let mut test_count = 0_usize;
+    while let Ok(result) = network.send(iroha_p2p::network::GetConnectedPeers).await {
+        let connections = result.peers.len();
+        info!(peer_addr = %sender, %connections);
+        if connections == conn_count || test_count >= 10 {
+            break;
+        }
+        test_count += 1;
+        time::sleep(Duration::from_millis(1000)).await;
+    }
+    info!(peer_addr = %sender, %conn_count, "Got all connections!");
+}
+
 #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
 async fn multiple_networks() {
     let log_config = Configuration {
@@ -274,88 +356,6 @@ async fn multiple_networks() {
     assert_eq!(msgs.load(Ordering::SeqCst), 90);
 }
 
-async fn init_peers() -> Vec<(Addr>, String, Broker, KeyPair)> {
-    (0..10)
-        .map(|_| {
-            let broker = Broker::new();
-            let keypair = KeyPair::generate().unwrap();
-            let fut = {
-                let broker = broker.clone();
-                let keypair = keypair.clone();
-                async { (broker, keypair) }
-            };
-            futures::future::join(init_network(broker, keypair.public_key().to_owned()), fut)
-        })
-        .collect::<FuturesUnordered<_>>()
-        .collect::<Vec<_>>()
-        .await
-        .into_iter()
-        .map(|((network, addr), (broker, keypair))| (network, addr, broker, keypair))
-        .collect()
-}
-
-async fn init_test_actor(broker: &Broker, messages: Arc) {
-    // This actor will get the messages from other peers and increment the counter
-    let actor = TestActor {
-        broker: broker.to_owned(),
-        messages,
-    };
-    actor.start().await;
-}
-
-async fn start_network(
-    sender: String,
-    network: Addr>,
-    broker: Broker,
-    public_key: iroha_crypto::PublicKey,
-    receivers: &[String],
-) {
-    info!(peer_addr = %sender, "Starting network");
-
-    let conn_count = send_message_to_peers(&sender, receivers, &broker, &public_key).await;
-
-    let mut test_count = 0_usize;
-    while let Ok(result) = network.send(iroha_p2p::network::GetConnectedPeers).await {
-        let connections = result.peers.len();
-        info!(peer_addr = %sender, %connections);
-        if connections == conn_count || test_count >= 10 {
-            break;
-        }
-        test_count += 1;
-        time::sleep(Duration::from_millis(1000)).await;
-    }
-    info!(peer_addr = %sender, %conn_count, "Got all connections!");
-}
-
-async fn send_message_to_peers(
-    sender: &str,
-    receivers: &[String],
-    broker: &Broker,
-    public_key: &iroha_crypto::PublicKey,
-) -> usize {
-    let delay: u64 = rand::random();
-    time::sleep(Duration::from_millis(250 + (delay % 500))).await;
-
-    let mut conn_count = 0;
-    for p in receivers {
-        if p != sender {
-            let peer = PeerId {
-                address: p.to_owned(),
-                public_key: public_key.to_owned(),
-            };
-
-            broker
-                .issue_send(ConnectPeer {
-                    address: peer.address,
-                })
-                .await;
-            conn_count += 1;
-            time::sleep(Duration::from_millis(100)).await;
-        }
-    }
-    conn_count
-}
-
 #[test]
 fn test_encryption() {
     use iroha_crypto::ursa::encryption::symm::prelude::*;