[fix] hyperledger-iroha#2137: Add UniquePortProvider
Signed-off-by: Ales Tsurko <[email protected]>
ales-tsurko committed Jun 13, 2022
1 parent c4e5ea0 commit d262a27
Showing 7 changed files with 314 additions and 101 deletions.
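In short, as the diff below shows: the test peers' addresses were previously generated with the `unique_port`-based `local_unique_port()` helper, which tracks handed-out ports only within a single process. This commit replaces those call sites with a `UniquePortProvider` in `core/test_network/src/lib.rs` that coordinates allocation across test processes: it serializes access through a lockfile in the system temp directory, records allocated ports in per-process storage files, verifies each candidate port by binding a `TcpListener`, and removes storage files whose owning process has exited (detected via `sysinfo`). Because callers now wait on the lockfile asynchronously, `Peer::new()` becomes `async`, and its call sites in the examples and integration tests switch to `#[tokio::main]` / `#[tokio::test]` accordingly.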
17 changes: 17 additions & 0 deletions Cargo.lock

(Diff not rendered for this generated file.)

2 changes: 1 addition & 1 deletion cli/src/torii/tests.rs
@@ -42,7 +42,7 @@ async fn try_init_network(config: &mut Configuration) -> Result<IrohaNetwork, ir
Broker::new(),
config.torii.p2p_addr.clone(),
config.public_key.clone(),
config.network.mailbox,
config.network.actor_channel_capacity,
)
.await
}
5 changes: 3 additions & 2 deletions client/examples/million_accounts_genesis.rs
@@ -8,7 +8,8 @@ use iroha_data_model::prelude::*;
use test_network::{get_key_pair, Peer as TestPeer, PeerBuilder, TestRuntime};
use tokio::runtime::Runtime;

fn main() {
#[tokio::main]
async fn main() {
fn generate_genesis(num_domains: u32) -> RawGenesisBlock {
let mut builder = RawGenesisBlockBuilder::new();

@@ -29,7 +30,7 @@ fn main() {

builder.build()
}
let mut peer = <TestPeer>::new().expect("Failed to create peer");
let mut peer = <TestPeer>::new().await.expect("Failed to create peer");
let configuration = get_config(
std::iter::once(peer.id.clone()).collect(),
Some(get_key_pair()),
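The example's entry point goes async because `TestPeer::new()` now returns a future (see `core/test_network/src/lib.rs` below). As a standalone sketch of the pattern, with the crate-specific call left as a comment:

#[tokio::main]
async fn main() {
    // In the real example: let mut peer = <TestPeer>::new().await.expect("Failed to create peer");
    // #[tokio::main] starts a Tokio runtime and blocks on this async body,
    // which is what makes the `.await` above possible.
}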
7 changes: 4 additions & 3 deletions client/tests/integration/restart_peer.rs
@@ -12,12 +12,13 @@ use tokio::runtime::Runtime;

use super::Configuration;

#[test]
fn restarted_peer_should_have_the_same_asset_amount() -> Result<()> {
// #[test]
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn restarted_peer_should_have_the_same_asset_amount() -> Result<()> {
let temp_dir = Arc::new(TempDir::new()?);

let mut configuration = Configuration::test();
let mut peer = <TestPeer>::new()?;
let mut peer = <TestPeer>::new().await?;
configuration.sumeragi.trusted_peers.peers = std::iter::once(peer.id.clone()).collect();

let pipeline_time = Duration::from_millis(configuration.sumeragi.pipeline_time_ms());
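`#[test]` becomes `#[tokio::test]` so the test body can `await`; the `multi_thread` flavor with `worker_threads = 4` runs it on a multi-threaded runtime rather than the single-threaded current-thread runtime that `#[tokio::test]` uses by default. A minimal, self-contained illustration of the attribute (not project code):

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn spawned_tasks_can_run_on_worker_threads() {
    // tokio::spawn hands these tasks to the runtime's worker threads;
    // awaiting the JoinHandles yields Result<T, JoinError>.
    let (a, b) = tokio::join!(tokio::spawn(async { 1 + 1 }), tokio::spawn(async { 2 + 2 }));
    assert_eq!((a.unwrap(), b.unwrap()), (2, 4));
}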
8 changes: 5 additions & 3 deletions core/test_network/Cargo.toml
@@ -20,11 +20,13 @@ iroha_actor = { version = "=2.0.0-pre-rc.5", path = "../../actor" }
iroha = { path = "../../cli", features = ["test-network"] }

eyre = "0.6.5"
futures = { version = "0.3.17", default-features = false, features = ["std", "async-await"] }
rand = "0.8"
sysinfo = "0.24.3"
tempfile = "3"
unique_port = "0.1.0"
thiserror = "1.0.28"
tokio = { version = "1.6.0", features = ["rt", "rt-multi-thread", "macros"]}
rand = "0.8"
futures = { version = "0.3.17", default-features = false, features = ["std", "async-await"] }
unique_port = "0.1.0"

[dev-dependencies]

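The new dependencies map onto the code added below: `sysinfo` lets the port provider check whether the process that wrote a given storage file is still running, and `thiserror` derives its timeout error. A minimal sketch of such a liveness check with the sysinfo 0.24 API (illustrative only; the committed code queries `processes()` on a `System` it builds itself):

use sysinfo::{Pid, PidExt, System, SystemExt};

/// Returns true if a process with the given id is currently running.
fn process_is_alive(pid: u32) -> bool {
    // `new_all()` populates the process table before we query it.
    let system = System::new_all();
    system.process(Pid::from_u32(pid)).is_some()
}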
212 changes: 202 additions & 10 deletions core/test_network/src/lib.rs
@@ -3,9 +3,21 @@
#![allow(clippy::restriction, clippy::future_not_send)]

use core::{fmt::Debug, str::FromStr as _, time::Duration};
use std::{collections::HashMap, sync::Arc, thread};
use std::{
collections::HashMap,
collections::HashSet,
env,
fs::{self, File},
io::Write,
net::{Ipv4Addr, SocketAddrV4, TcpListener},
path::PathBuf,
process,
sync::Arc,
thread,
time::SystemTime,
};

use eyre::{Error, Result};
use eyre::{Error, Report, Result};
use futures::{prelude::*, stream::FuturesUnordered};
use iroha::{config::Configuration, torii::config::ToriiConfiguration, Iroha};
use iroha_actor::{broker::*, prelude::*};
@@ -22,6 +34,7 @@ use iroha_core::{
use iroha_data_model::{peer::Peer as DataModelPeer, prelude::*};
use iroha_logger::{Configuration as LoggerConfiguration, InstrumentFutures};
use rand::seq::IteratorRandom;
use sysinfo::{Pid, PidExt, System, SystemExt};
use tempfile::TempDir;
use tokio::{
runtime::{self, Runtime},
@@ -250,9 +263,13 @@
offline_peers: u32,
) -> Result<Self> {
let n_peers = n_peers - 1;
let mut genesis = Peer::<W, G, K, S, B>::new()?;
let mut genesis = Peer::<W, G, K, S, B>::new().await?;
let mut peers = (0..n_peers)
.map(|_| Peer::new())
.collect::<FuturesUnordered<_>>()
.collect::<Vec<_>>()
.await
.into_iter()
.map(|result| result.map(|peer| (peer.id.clone(), peer)))
.collect::<Result<HashMap<_, _>>>()?;

@@ -499,11 +516,12 @@
/// - `p2p_address`
/// - `api_address`
/// - `telemetry_address`
pub fn new() -> Result<Self> {
pub async fn new() -> Result<Self> {
let key_pair = KeyPair::generate()?;
let p2p_address = local_unique_port()?;
let api_address = local_unique_port()?;
let telemetry_address = local_unique_port()?;
let port_provider = UniquePortProvider::new()?;
let p2p_address = port_provider.new_unique_local_address().await?.to_string();
let api_address = port_provider.new_unique_local_address().await?.to_string();
let telemetry_address = port_provider.new_unique_local_address().await?.to_string();
let id = PeerId {
address: p2p_address.clone(),
public_key: key_pair.public_key().clone(),
@@ -667,7 +685,7 @@
self,
) -> Peer<W, G, Kura<W>, Sumeragi<G, Kura<W>, W>, BlockSynchronizer<Sumeragi<G, Kura<W>, W>, W>>
{
let mut peer = Peer::new().expect("Failed to create a peer.");
let mut peer = Peer::new().await.expect("Failed to create a peer.");
self.start_with_peer(&mut peer).await;
peer
}
@@ -726,6 +744,182 @@
}
}

struct UniquePortProvider {
id: u128,
}

impl UniquePortProvider {
const STORAGE_PATH_PREFIX: &'static str = "iroha_port_provider_storage_";
const LOCKFILE_NAME: &'static str = "iroha_port_provider_lockfile";
const LOCK_WAIT_TIMEOUT: Duration = Duration::from_secs(3);
const NEW_PORT_TIMEOUT: Duration = Duration::from_secs(5);

fn new() -> Result<Self> {
let now = SystemTime::now();
let id = now.duration_since(std::time::UNIX_EPOCH)?.as_nanos();
Ok(Self { id })
}

async fn new_unique_local_address(&self) -> Result<SocketAddrV4> {
let port = self.new_free_port().await?;
Ok(SocketAddrV4::new(Ipv4Addr::LOCALHOST, port))
}

async fn new_free_port(&self) -> Result<u16> {
self.lock().await?;
let result = self.find_free_port().and_then(|port| self.store_port(port));
self.unlock()?;
result
}

async fn lock(&self) -> Result<()> {
let path = Self::lockfile_path();

if self.check_lockfile(&path).await? {
return Ok(());
}

fs::write(&path, self.id.to_string())?;

Ok(())
}

fn find_free_port(&self) -> Result<u16> {
let taken_ports = self.taken_ports()?;
let from = SystemTime::now();
loop {
let now = SystemTime::now();
if now.duration_since(from)? > Self::NEW_PORT_TIMEOUT {
return Err(Report::new(UniquePortProviderError::Timeout));
}

let new_port = rand::random::<u16>();
let tcp_listener = TcpListener::bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, new_port));

if taken_ports.contains(&new_port) || tcp_listener.is_err() {
continue;
}

return Ok(new_port);
}
}

fn store_port(&self, port: u16) -> Result<u16> {
let path = Self::storage_path();
let mut file = File::options().append(true).create(true).open(path)?;
writeln!(&mut file, "{}", port)?;

Ok(port)
}

fn unlock(&self) -> Result<()> {
let path = Self::lockfile_path();
if path.exists() {
fs::remove_file(&path)?;
}
Ok(())
}

fn lockfile_path() -> PathBuf {
let mut result = env::temp_dir();
result.push(Self::LOCKFILE_NAME);
result
}

fn taken_ports(&self) -> Result<HashSet<u16>> {
self.remove_orphan_storages()?;
self.storages()?
.into_iter()
.map(|(_, path)| fs::read_to_string(path).map_err(Report::new))
.collect::<Result<Vec<String>>>()?
.iter()
.map(|value| value.lines())
.flatten()
.map(|line| line.parse::<u16>().map_err(Report::new))
.collect()
}

fn remove_orphan_storages(&self) -> Result<()> {
let sysinfo = System::new();
let processes = sysinfo.processes();

self.storages()?
.into_iter()
.filter(|(pid, _)| {
let pid = Pid::from_u32(*pid);
!processes.contains_key(&pid)
})
.map(|(_, path)| fs::remove_file(path).map_err(Report::new))
.collect()
}

fn storages(&self) -> Result<HashMap<u32, PathBuf>> {
let target_dir = env::temp_dir();
let mut result = HashMap::new();
for entry in fs::read_dir(target_dir)? {
let entry = entry?;
let is_file = entry.path().is_file();
let file_name = entry.file_name();
let file_name = file_name.to_string_lossy();
if is_file && file_name.starts_with(Self::STORAGE_PATH_PREFIX) {
let id: u32 = file_name
.strip_prefix(Self::STORAGE_PATH_PREFIX)
.expect("Can't parse storage id")
.parse()?;
result.insert(id, entry.path());
}
}

Ok(result)
}

fn storage_path() -> PathBuf {
let id = process::id();
Self::storage_path_from_id(id)
}

fn storage_path_from_id(id: u32) -> PathBuf {
let mut result = env::temp_dir();
let file_path = PathBuf::from(format!("{}{}", Self::STORAGE_PATH_PREFIX, id));
result.push(file_path);
result
}

async fn check_lockfile(&self, path: &PathBuf) -> Result<bool> {
if !path.exists() {
return Ok(true);
}

if !self.check_lockfile_id(path)? {
self.wait_for_unlock(path).await?;
}

Ok(false)
}

fn check_lockfile_id(&self, path: &PathBuf) -> Result<bool> {
let id: u128 = fs::read_to_string(path)?.parse()?;
Ok(self.id == id)
}

async fn wait_for_unlock(&self, path: &PathBuf) -> Result<()> {
let from = SystemTime::now();
loop {
let now = SystemTime::now();
if !path.exists() || now.duration_since(from)? > Self::LOCK_WAIT_TIMEOUT {
return Ok(());
}
time::sleep(Duration::from_millis(2)).await;
}
}
}

#[derive(Debug, thiserror::Error)]
enum UniquePortProviderError {
#[error("Can't find a free port")]
Timeout,
}

fn local_unique_port() -> Result<String> {
Ok(format!(
"127.0.0.1:{}",
@@ -875,8 +1069,6 @@ impl TestRuntime for Runtime {
}
}

use std::collections::HashSet;

impl TestConfiguration for Configuration {
fn test() -> Self {
let mut configuration = iroha::samples::get_config(HashSet::new(), Some(get_key_pair()));
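Taken together, the intended call pattern inside `Peer::new()` is the following (condensed from the hunk above; `UniquePortProvider` is private to the `test_network` crate, so this is a sketch rather than a public API):

// Condensed from the new Peer::new() body above (illustrative).
async fn allocate_peer_addresses() -> eyre::Result<(String, String, String)> {
    let provider = UniquePortProvider::new()?; // id = nanoseconds since the Unix epoch
    // Each call coordinates through the temp-dir lockfile, picks a random port
    // that binds successfully and is not recorded by any live process, appends
    // it to this process's storage file, and then releases the lock.
    let p2p = provider.new_unique_local_address().await?.to_string();
    let api = provider.new_unique_local_address().await?.to_string();
    let telemetry = provider.new_unique_local_address().await?.to_string();
    Ok((p2p, api, telemetry))
}

Storage files only grow during a run; their entries are reclaimed as a whole by `remove_orphan_storages()` once the process that wrote them has exited.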
