diff --git a/roles/Cargo.lock b/roles/Cargo.lock index fbaeadc6da..ea0dcfa1cb 100644 --- a/roles/Cargo.lock +++ b/roles/Cargo.lock @@ -1341,7 +1341,6 @@ version = "0.1.1" dependencies = [ "async-channel 1.9.0", "async-recursion 0.3.2", - "async-std", "binary_sv2", "buffer_sv2", "clap", @@ -1354,6 +1353,7 @@ dependencies = [ "roles_logic_sv2", "sha2 0.10.8", "stratum-common", + "tokio", "tracing", "tracing-subscriber", ] @@ -2053,7 +2053,7 @@ dependencies = [ [[package]] name = "sv1_api" -version = "1.0.0" +version = "1.0.1" dependencies = [ "binary_sv2", "bitcoin_hashes 0.3.2", @@ -2117,9 +2117,9 @@ dependencies = [ [[package]] name = "tokio" -version = "1.37.0" +version = "1.38.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1adbebffeca75fcfd058afa480fb6c0b81e165a0323f9c9d39c9697e37c46787" +checksum = "ba4f4a02a7a80d6f274636f0aa95c7e383b912d41fe721a31f29e29698585a4a" dependencies = [ "backtrace", "bytes", @@ -2136,9 +2136,9 @@ dependencies = [ [[package]] name = "tokio-macros" -version = "2.2.0" +version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" +checksum = "5f5ae998a069d4b5aba8ee9dad856af7d520c3699e6159b185c2acd48155d39a" dependencies = [ "proc-macro2", "quote", diff --git a/roles/test-utils/mining-device/Cargo.toml b/roles/test-utils/mining-device/Cargo.toml index 4ce6c5037a..ca710fefb0 100644 --- a/roles/test-utils/mining-device/Cargo.toml +++ b/roles/test-utils/mining-device/Cargo.toml @@ -12,9 +12,8 @@ codec_sv2 = { version = "^1.0.1", path = "../../../protocols/v2/codec-sv2", feat roles_logic_sv2 = { version = "1.0.0", path = "../../../protocols/v2/roles-logic-sv2" } const_sv2 = { version = "1.0.0", path = "../../../protocols/v2/const-sv2" } async-channel = "1.5.1" -async-std={version = "1.8.0", features = ["attributes"]} binary_sv2 = { version = "1.0.0", path = "../../../protocols/v2/binary-sv2/binary-sv2" } -network_helpers_sv2 = { version = "2.0.0", path = "../../roles-utils/network-helpers", features=["async_std"] } +network_helpers_sv2 = { version = "2.0.0", path = "../../roles-utils/network-helpers", features=["tokio"] } buffer_sv2 = { version = "1.0.0", path = "../../../utils/buffer"} async-recursion = "0.3.2" rand = "0.8.4" @@ -24,3 +23,4 @@ clap = { version = "^4.5.4", features = ["derive"] } tracing = { version = "0.1" } tracing-subscriber = "0.3" sha2 = "0.10.6" +tokio = "^1.38.0" diff --git a/roles/test-utils/mining-device/README.md b/roles/test-utils/mining-device/README.md new file mode 100644 index 0000000000..4065c2c30b --- /dev/null +++ b/roles/test-utils/mining-device/README.md @@ -0,0 +1,21 @@ +# CPU Sv2 mining device + +Header only sv2 cpu miner. + +``` +Usage: mining-device [OPTIONS] --address-pool + +Options: + -p, --pubkey-pool Pool pub key, when left empty the pool certificate is not checked + -i, --id-device Sometimes used by the pool to identify the device + -a, --address-pool Address of the pool in this format ip:port or domain:port + --handicap This value is used to slow down the cpu miner, it represents the number of micro-seconds that are awaited between hashes [default: 0] + --id-user User id, used when a new channel is opened, it can be used by the pool to identify the miner + -h, --help Print help + -V, --version Print version +``` + +Usage example: +``` +cargo run --release -- --address-pool 127.0.0.1:20000 --id-device device_id::SOLO::bc1qxy2kgdygjrsqtzq2n0yrf2493p83kkfjhx0wlh +``` diff --git a/roles/test-utils/mining-device/src/main.rs b/roles/test-utils/mining-device/src/main.rs index 763f83af5a..35a31d56ac 100644 --- a/roles/test-utils/mining-device/src/main.rs +++ b/roles/test-utils/mining-device/src/main.rs @@ -1,10 +1,17 @@ -use async_std::net::TcpStream; +#![allow(clippy::option_map_unit_fn)] use key_utils::Secp256k1PublicKey; -use network_helpers_sv2::Connection; +use network_helpers_sv2::noise_connection_tokio::Connection; use roles_logic_sv2::utils::Id; -use std::{net::SocketAddr, sync::Arc, thread::sleep, time::Duration}; +use std::{ + net::{SocketAddr, ToSocketAddrs}, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + time::Duration, +}; +use tokio::net::TcpStream; -use async_std::net::ToSocketAddrs; use clap::Parser; use rand::{thread_rng, Rng}; use std::time::Instant; @@ -36,7 +43,7 @@ struct Args { address_pool: String, #[arg( long, - help = "This value is used to slow down the cpu miner, it rapresents the number of micro-seconds that are awaited between hashes", + help = "This value is used to slow down the cpu miner, it represents the number of micro-seconds that are awaited between hashes", default_value = "0" )] handicap: u32, @@ -57,14 +64,12 @@ async fn connect( let address = address .clone() .to_socket_addrs() - .await .expect("Invalid pool address, use one of this formats: ip:port, domain:port") .next() .expect("Invalid pool address, use one of this formats: ip:port, domain:port"); info!("Connecting to pool at {}", address); let socket = loop { - let pool = - async_std::future::timeout(Duration::from_secs(5), TcpStream::connect(address)).await; + let pool = tokio::time::timeout(Duration::from_secs(5), TcpStream::connect(address)).await; match pool { Ok(result) => match result { Ok(socket) => break socket, @@ -73,7 +78,7 @@ async fn connect( "Failed to connect to Upstream role at {}, retrying in 5s: {}", address, e ); - sleep(Duration::from_secs(5)); + tokio::time::sleep(Duration::from_secs(5)).await; } }, Err(_) => { @@ -85,15 +90,15 @@ async fn connect( info!("Pool tcp connection established at {}", address); let address = socket.peer_addr().unwrap(); let initiator = Initiator::new(pub_key.map(|e| e.0)); - let (receiver, sender): (Receiver, Sender) = - Connection::new(socket, codec_sv2::HandshakeRole::Initiator(initiator), 10) + let (receiver, sender, _, _): (Receiver, Sender, _, _) = + Connection::new(socket, codec_sv2::HandshakeRole::Initiator(initiator)) .await .unwrap(); info!("Pool noise connection established at {}", address); Device::start(receiver, sender, address, device_id, user_id, handicap).await } -#[async_std::main] +#[tokio::main(flavor = "current_thread")] async fn main() { let args = Args::parse(); tracing_subscriber::fmt::init(); @@ -218,6 +223,12 @@ impl ParseUpstreamCommonMessages for SetupConnectionHandler { } } +#[derive(Debug, Clone)] +struct NewWorkNotifier { + should_send: bool, + sender: Sender<()>, +} + #[derive(Debug)] pub struct Device { #[allow(dead_code)] @@ -230,13 +241,15 @@ pub struct Device { jobs: Vec>, prev_hash: Option>, sequence_numbers: Id, + notify_changes_to_mining_thread: NewWorkNotifier, } fn open_channel(device_id: Option) -> OpenStandardMiningChannel<'static> { let user_identity = device_id.unwrap_or_default().try_into().unwrap(); let id: u32 = 10; info!("Measuring CPU hashrate"); - let nominal_hash_rate = measure_hashrate(5) as f32; + let p = std::thread::available_parallelism().unwrap().get() as u32 - 3; + let nominal_hash_rate = measure_hashrate(5) as f32 * p as f32; info!("Pc hashrate is {}", nominal_hash_rate); info!("MINING DEVICE: send open channel with request id {}", id); OpenStandardMiningChannel { @@ -267,6 +280,7 @@ impl Device { .await; info!("Pool sv2 connection established at {}", addr); let miner = Arc::new(Mutex::new(Miner::new(handicap))); + let (notify_changes_to_mining_thread, update_miners) = async_channel::unbounded(); let self_ = Self { channel_opened: false, receiver: receiver.clone(), @@ -276,6 +290,10 @@ impl Device { prev_hash: None, channel_id: None, sequence_numbers: Id::new(), + notify_changes_to_mining_thread: NewWorkNotifier { + should_send: true, + sender: notify_changes_to_mining_thread, + }, }; let open_channel = MiningDeviceMessages::Mining(Mining::OpenStandardMiningChannel(open_channel(user_id))); @@ -286,24 +304,8 @@ impl Device { let (share_send, share_recv) = async_channel::unbounded(); - let handicap = miner.safe_lock(|m| m.handicap).unwrap(); - std::thread::spawn(move || loop { - std::thread::sleep(std::time::Duration::from_micros(handicap.into())); - if miner.safe_lock(|m| m.next_share()).unwrap().is_valid() { - let nonce = miner.safe_lock(|m| m.header.unwrap().nonce).unwrap(); - let time = miner.safe_lock(|m| m.header.unwrap().time).unwrap(); - let job_id = miner.safe_lock(|m| m.job_id).unwrap(); - let version = miner.safe_lock(|m| m.version).unwrap(); - share_send - .try_send((nonce, job_id.unwrap(), version.unwrap(), time)) - .unwrap(); - } - miner - .safe_lock(|m| m.header.as_mut().map(|h| h.nonce += 1)) - .unwrap(); - }); - - async_std::task::spawn(async move { + start_mining_threads(update_miners, miner, share_send); + tokio::task::spawn(async move { let recv = share_recv.clone(); loop { let (nonce, job_id, version, ntime) = recv.recv().await.unwrap(); @@ -322,6 +324,21 @@ impl Device { MiningRoutingLogic::None, ) .unwrap(); + let mut notify_changes_to_mining_thread = self_mutex + .safe_lock(|s| s.notify_changes_to_mining_thread.clone()) + .unwrap(); + if notify_changes_to_mining_thread.should_send + && (message_type == const_sv2::MESSAGE_TYPE_NEW_MINING_JOB + || message_type == const_sv2::MESSAGE_TYPE_SET_NEW_PREV_HASH + || message_type == const_sv2::MESSAGE_TYPE_SET_TARGET) + { + notify_changes_to_mining_thread + .sender + .send(()) + .await + .unwrap(); + notify_changes_to_mining_thread.should_send = false; + }; match next { SendTo::RelayNewMessageToRemote(_, m) => { let sv2_frame: StdFrame = MiningDeviceMessages::Mining(m).try_into().unwrap(); @@ -425,6 +442,7 @@ impl ParseUpstreamMiningMessages<(), NullDownstreamMiningSelector, NoRouting> fo self.miner .safe_lock(|miner| miner.new_target(m.target.to_vec())) .unwrap(); + self.notify_changes_to_mining_thread.should_send = true; Ok(SendTo::None(None)) } @@ -477,6 +495,7 @@ impl ParseUpstreamMiningMessages<(), NullDownstreamMiningSelector, NoRouting> fo .safe_lock(|miner| miner.new_header(p_h, &m)) .unwrap(); self.jobs = vec![m.as_static()]; + self.notify_changes_to_mining_thread.should_send = true; } (true, _) => self.jobs.push(m.as_static()), (false, None) => { @@ -509,6 +528,7 @@ impl ParseUpstreamMiningMessages<(), NullDownstreamMiningSelector, NoRouting> fo .unwrap(); self.jobs = vec![jobs[0].clone()]; self.prev_hash = Some(m.as_static()); + self.notify_changes_to_mining_thread.should_send = true; } _ => panic!(), } @@ -533,6 +553,7 @@ impl ParseUpstreamMiningMessages<(), NullDownstreamMiningSelector, NoRouting> fo self.miner .safe_lock(|miner| miner.new_target(m.maximum_target.to_vec())) .unwrap(); + self.notify_changes_to_mining_thread.should_send = true; Ok(SendTo::None(None)) } @@ -541,7 +562,7 @@ impl ParseUpstreamMiningMessages<(), NullDownstreamMiningSelector, NoRouting> fo } } -#[derive(Debug)] +#[derive(Debug, Clone)] struct Miner { header: Option, target: Option, @@ -669,3 +690,74 @@ fn generate_random_32_byte_array() -> [u8; 32] { rng.fill(&mut arr[..]); arr } + +fn start_mining_threads( + have_new_job: Receiver<()>, + miner: Arc>, + share_send: Sender<(u32, u32, u32, u32)>, +) { + tokio::task::spawn(async move { + let mut killers: Vec> = vec![]; + loop { + let available_parallelism = u32::max( + 2, + std::thread::available_parallelism().unwrap().get() as u32, + ); + let p = available_parallelism - 1; + let unit = u32::MAX / p; + while have_new_job.recv().await.is_ok() { + while let Some(killer) = killers.pop() { + killer.store(true, Ordering::Relaxed); + } + let miner = miner.safe_lock(|m| m.clone()).unwrap(); + for i in 0..p { + let mut miner = miner.clone(); + let share_send = share_send.clone(); + let killer = Arc::new(AtomicBool::new(false)); + miner.header.as_mut().map(|h| h.nonce = i * unit); + killers.push(killer.clone()); + std::thread::spawn(move || { + mine(miner, share_send, killer); + }); + } + } + } + }); +} + +fn mine(mut miner: Miner, share_send: Sender<(u32, u32, u32, u32)>, kill: Arc) { + if miner.handicap != 0 { + loop { + if kill.load(Ordering::Relaxed) { + break; + } + std::thread::sleep(std::time::Duration::from_micros(miner.handicap.into())); + if miner.next_share().is_valid() { + let nonce = miner.header.unwrap().nonce; + let time = miner.header.unwrap().time; + let job_id = miner.job_id.unwrap(); + let version = miner.version; + share_send + .try_send((nonce, job_id, version.unwrap(), time)) + .unwrap(); + } + miner.header.as_mut().map(|h| h.nonce += 1); + } + } else { + loop { + if miner.next_share().is_valid() { + if kill.load(Ordering::Relaxed) { + break; + } + let nonce = miner.header.unwrap().nonce; + let time = miner.header.unwrap().time; + let job_id = miner.job_id.unwrap(); + let version = miner.version; + share_send + .try_send((nonce, job_id, version.unwrap(), time)) + .unwrap(); + } + miner.header.as_mut().map(|h| h.nonce += 1); + } + } +}