diff --git a/.github/workflows/sv2-header-check.yaml b/.github/workflows/sv2-header-check.yaml index 732bd316b7..62f5231fdf 100644 --- a/.github/workflows/sv2-header-check.yaml +++ b/.github/workflows/sv2-header-check.yaml @@ -24,7 +24,7 @@ jobs: with: profile: minimal - toolchain: 1.65.0 + toolchain: 1.75.0 override: true - name: Check sv2 header file is up to date with commit run: | diff --git a/roles/Cargo.lock b/roles/Cargo.lock index af5f8304fb..721c272a35 100644 --- a/roles/Cargo.lock +++ b/roles/Cargo.lock @@ -1641,7 +1641,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" [[package]] -name = "mining-device" +name = "mining_device" version = "0.1.1" dependencies = [ "async-channel 1.9.0", diff --git a/roles/test-utils/mining-device/Cargo.toml b/roles/test-utils/mining-device/Cargo.toml index a597195040..775d38f3b2 100644 --- a/roles/test-utils/mining-device/Cargo.toml +++ b/roles/test-utils/mining-device/Cargo.toml @@ -1,11 +1,16 @@ [package] -name = "mining-device" +name = "mining_device" version = "0.1.1" edition = "2018" publish = false # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[lib] +name = "mining_device" +path = "src/lib/mod.rs" + + [dependencies] stratum-common = { version = "1.0.0", path = "../../../common" } codec_sv2 = { version = "^1.0.1", path = "../../../protocols/v2/codec-sv2", features=["noise_sv2"] } @@ -24,3 +29,4 @@ tracing = { version = "0.1" } tracing-subscriber = "0.3" sha2 = "0.10.6" tokio = "^1.38.0" + diff --git a/roles/test-utils/mining-device/src/lib/mod.rs b/roles/test-utils/mining-device/src/lib/mod.rs new file mode 100644 index 0000000000..abfa1d9691 --- /dev/null +++ b/roles/test-utils/mining-device/src/lib/mod.rs @@ -0,0 +1,712 @@ +#![allow(clippy::option_map_unit_fn)] +use key_utils::Secp256k1PublicKey; +use network_helpers_sv2::noise_connection_tokio::Connection; +use roles_logic_sv2::utils::Id; +use std::{ + net::{SocketAddr, ToSocketAddrs}, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + time::Duration, +}; +use tokio::net::TcpStream; + +use async_channel::{Receiver, Sender}; +use binary_sv2::u256_from_int; +use codec_sv2::{Initiator, StandardEitherFrame, StandardSv2Frame}; +use rand::{thread_rng, Rng}; +use roles_logic_sv2::{ + common_messages_sv2::{Protocol, SetupConnection, SetupConnectionSuccess}, + common_properties::{IsMiningUpstream, IsUpstream}, + errors::Error, + handlers::{ + common::ParseUpstreamCommonMessages, + mining::{ParseUpstreamMiningMessages, SendTo, SupportedChannelTypes}, + }, + mining_sv2::*, + parsers::{Mining, MiningDeviceMessages}, + routing_logic::{CommonRoutingLogic, MiningRoutingLogic, NoRouting}, + selectors::NullDownstreamMiningSelector, + utils::Mutex, +}; +use std::time::Instant; +use stratum_common::bitcoin::{ + blockdata::block::BlockHeader, hash_types::BlockHash, hashes::Hash, util::uint::Uint256, +}; +use tracing::{error, info}; + +pub async fn connect( + address: String, + pub_key: Option, + device_id: Option, + user_id: Option, + handicap: u32, +) { + let address = address + .clone() + .to_socket_addrs() + .expect("Invalid pool address, use one of this formats: ip:port, domain:port") + .next() + .expect("Invalid pool address, use one of this formats: ip:port, domain:port"); + info!("Connecting to pool at {}", address); + let socket = loop { + let pool = tokio::time::timeout(Duration::from_secs(5), TcpStream::connect(address)).await; + match pool { + Ok(result) => match result { + Ok(socket) => break socket, + Err(e) => { + error!( + "Failed to connect to Upstream role at {}, retrying in 5s: {}", + address, e + ); + tokio::time::sleep(Duration::from_secs(5)).await; + } + }, + Err(_) => { + error!("Pool is unresponsive, terminating"); + std::process::exit(1); + } + } + }; + info!("Pool tcp connection established at {}", address); + let address = socket.peer_addr().unwrap(); + let initiator = Initiator::new(pub_key.map(|e| e.0)); + let (receiver, sender, _, _): (Receiver, Sender, _, _) = + Connection::new(socket, codec_sv2::HandshakeRole::Initiator(initiator)) + .await + .unwrap(); + info!("Pool noise connection established at {}", address); + Device::start(receiver, sender, address, device_id, user_id, handicap).await +} + +pub type Message = MiningDeviceMessages<'static>; +pub type StdFrame = StandardSv2Frame; +pub type EitherFrame = StandardEitherFrame; + +struct SetupConnectionHandler {} +use std::convert::TryInto; + +impl SetupConnectionHandler { + pub fn new() -> Self { + SetupConnectionHandler {} + } + fn get_setup_connection_message( + address: SocketAddr, + device_id: Option, + ) -> SetupConnection<'static> { + let endpoint_host = address.ip().to_string().into_bytes().try_into().unwrap(); + let vendor = String::new().try_into().unwrap(); + let hardware_version = String::new().try_into().unwrap(); + let firmware = String::new().try_into().unwrap(); + let device_id = device_id.unwrap_or_default(); + info!( + "Creating SetupConnection message with device id: {:?}", + device_id + ); + SetupConnection { + protocol: Protocol::MiningProtocol, + min_version: 2, + max_version: 2, + flags: 0b0000_0000_0000_0000_0000_0000_0000_0001, + endpoint_host, + endpoint_port: address.port(), + vendor, + hardware_version, + firmware, + device_id: device_id.try_into().unwrap(), + } + } + pub async fn setup( + self_: Arc>, + receiver: &mut Receiver, + sender: &mut Sender, + device_id: Option, + address: SocketAddr, + ) { + let setup_connection = Self::get_setup_connection_message(address, device_id); + + let sv2_frame: StdFrame = MiningDeviceMessages::Common(setup_connection.into()) + .try_into() + .unwrap(); + let sv2_frame = sv2_frame.into(); + sender.send(sv2_frame).await.unwrap(); + info!("Setup connection sent to {}", address); + + let mut incoming: StdFrame = receiver.recv().await.unwrap().try_into().unwrap(); + let message_type = incoming.get_header().unwrap().msg_type(); + let payload = incoming.payload(); + ParseUpstreamCommonMessages::handle_message_common( + self_, + message_type, + payload, + CommonRoutingLogic::None, + ) + .unwrap(); + } +} + +impl ParseUpstreamCommonMessages for SetupConnectionHandler { + fn handle_setup_connection_success( + &mut self, + _: SetupConnectionSuccess, + ) -> Result { + use roles_logic_sv2::handlers::common::SendTo; + info!("Setup connection success"); + Ok(SendTo::None(None)) + } + + fn handle_setup_connection_error( + &mut self, + _: roles_logic_sv2::common_messages_sv2::SetupConnectionError, + ) -> Result { + error!("Setup connection error"); + todo!() + } + + fn handle_channel_endpoint_changed( + &mut self, + _: roles_logic_sv2::common_messages_sv2::ChannelEndpointChanged, + ) -> Result { + todo!() + } +} + +#[derive(Debug, Clone)] +struct NewWorkNotifier { + should_send: bool, + sender: Sender<()>, +} + +#[derive(Debug)] +pub struct Device { + #[allow(dead_code)] + receiver: Receiver, + sender: Sender, + #[allow(dead_code)] + channel_opened: bool, + channel_id: Option, + miner: Arc>, + jobs: Vec>, + prev_hash: Option>, + sequence_numbers: Id, + notify_changes_to_mining_thread: NewWorkNotifier, +} + +fn open_channel(device_id: Option) -> OpenStandardMiningChannel<'static> { + let user_identity = device_id.unwrap_or_default().try_into().unwrap(); + let id: u32 = 10; + info!("Measuring CPU hashrate"); + let p = std::thread::available_parallelism().unwrap().get() as u32 - 3; + let nominal_hash_rate = measure_hashrate(5) as f32 * p as f32; + info!("Pc hashrate is {}", nominal_hash_rate); + info!("MINING DEVICE: send open channel with request id {}", id); + OpenStandardMiningChannel { + request_id: id.into(), + user_identity, + nominal_hash_rate, + max_target: u256_from_int(567_u64), + } +} + +impl Device { + async fn start( + mut receiver: Receiver, + mut sender: Sender, + addr: SocketAddr, + device_id: Option, + user_id: Option, + handicap: u32, + ) { + let setup_connection_handler = Arc::new(Mutex::new(SetupConnectionHandler::new())); + SetupConnectionHandler::setup( + setup_connection_handler, + &mut receiver, + &mut sender, + device_id, + addr, + ) + .await; + info!("Pool sv2 connection established at {}", addr); + let miner = Arc::new(Mutex::new(Miner::new(handicap))); + let (notify_changes_to_mining_thread, update_miners) = async_channel::unbounded(); + let self_ = Self { + channel_opened: false, + receiver: receiver.clone(), + sender: sender.clone(), + miner: miner.clone(), + jobs: Vec::new(), + prev_hash: None, + channel_id: None, + sequence_numbers: Id::new(), + notify_changes_to_mining_thread: NewWorkNotifier { + should_send: true, + sender: notify_changes_to_mining_thread, + }, + }; + let open_channel = + MiningDeviceMessages::Mining(Mining::OpenStandardMiningChannel(open_channel(user_id))); + let frame: StdFrame = open_channel.try_into().unwrap(); + self_.sender.send(frame.into()).await.unwrap(); + let self_mutex = std::sync::Arc::new(Mutex::new(self_)); + let cloned = self_mutex.clone(); + + let (share_send, share_recv) = async_channel::unbounded(); + + start_mining_threads(update_miners, miner, share_send); + tokio::task::spawn(async move { + let recv = share_recv.clone(); + loop { + let (nonce, job_id, version, ntime) = recv.recv().await.unwrap(); + Self::send_share(cloned.clone(), nonce, job_id, version, ntime).await; + } + }); + + loop { + let mut incoming: StdFrame = receiver.recv().await.unwrap().try_into().unwrap(); + let message_type = incoming.get_header().unwrap().msg_type(); + let payload = incoming.payload(); + let next = Device::handle_message_mining( + self_mutex.clone(), + message_type, + payload, + MiningRoutingLogic::None, + ) + .unwrap(); + let mut notify_changes_to_mining_thread = self_mutex + .safe_lock(|s| s.notify_changes_to_mining_thread.clone()) + .unwrap(); + if notify_changes_to_mining_thread.should_send + && (message_type == const_sv2::MESSAGE_TYPE_NEW_MINING_JOB + || message_type == const_sv2::MESSAGE_TYPE_SET_NEW_PREV_HASH + || message_type == const_sv2::MESSAGE_TYPE_SET_TARGET) + { + notify_changes_to_mining_thread + .sender + .send(()) + .await + .unwrap(); + notify_changes_to_mining_thread.should_send = false; + }; + match next { + SendTo::RelayNewMessageToRemote(_, m) => { + let sv2_frame: StdFrame = MiningDeviceMessages::Mining(m).try_into().unwrap(); + let either_frame: EitherFrame = sv2_frame.into(); + sender.send(either_frame).await.unwrap(); + } + SendTo::None(_) => (), + _ => panic!(), + } + } + } + + async fn send_share( + self_mutex: Arc>, + nonce: u32, + job_id: u32, + version: u32, + ntime: u32, + ) { + let share = + MiningDeviceMessages::Mining(Mining::SubmitSharesStandard(SubmitSharesStandard { + channel_id: self_mutex.safe_lock(|s| s.channel_id.unwrap()).unwrap(), + sequence_number: self_mutex.safe_lock(|s| s.sequence_numbers.next()).unwrap(), + job_id, + nonce, + ntime, + version, + })); + let frame: StdFrame = share.try_into().unwrap(); + let sender = self_mutex.safe_lock(|s| s.sender.clone()).unwrap(); + sender.send(frame.into()).await.unwrap(); + } +} + +impl IsUpstream<(), NullDownstreamMiningSelector> for Device { + fn get_version(&self) -> u16 { + todo!() + } + + fn get_flags(&self) -> u32 { + todo!() + } + + fn get_supported_protocols(&self) -> Vec { + todo!() + } + + fn get_id(&self) -> u32 { + todo!() + } + + fn get_mapper(&mut self) -> Option<&mut roles_logic_sv2::common_properties::RequestIdMapper> { + todo!() + } + + fn get_remote_selector(&mut self) -> &mut NullDownstreamMiningSelector { + todo!() + } +} + +impl IsMiningUpstream<(), NullDownstreamMiningSelector> for Device { + fn total_hash_rate(&self) -> u64 { + todo!() + } + + fn add_hash_rate(&mut self, _to_add: u64) { + todo!() + } + fn get_opened_channels( + &mut self, + ) -> &mut Vec { + todo!() + } + + fn update_channels(&mut self, _: roles_logic_sv2::common_properties::UpstreamChannel) { + todo!() + } +} + +impl ParseUpstreamMiningMessages<(), NullDownstreamMiningSelector, NoRouting> for Device { + fn get_channel_type(&self) -> SupportedChannelTypes { + SupportedChannelTypes::Standard + } + + fn is_work_selection_enabled(&self) -> bool { + false + } + + fn handle_open_standard_mining_channel_success( + &mut self, + m: OpenStandardMiningChannelSuccess, + _: Option>>, + ) -> Result, Error> { + self.channel_opened = true; + self.channel_id = Some(m.channel_id); + let req_id = m.get_request_id_as_u32(); + info!( + "MINING DEVICE: channel opened with: group id {}, channel id {}, request id {}", + m.group_channel_id, m.channel_id, req_id + ); + self.miner + .safe_lock(|miner| miner.new_target(m.target.to_vec())) + .unwrap(); + self.notify_changes_to_mining_thread.should_send = true; + Ok(SendTo::None(None)) + } + + fn handle_open_extended_mining_channel_success( + &mut self, + _: OpenExtendedMiningChannelSuccess, + ) -> Result, Error> { + unreachable!() + } + + fn handle_open_mining_channel_error( + &mut self, + _: OpenMiningChannelError, + ) -> Result, Error> { + todo!() + } + + fn handle_update_channel_error(&mut self, _: UpdateChannelError) -> Result, Error> { + todo!() + } + + fn handle_close_channel(&mut self, _: CloseChannel) -> Result, Error> { + todo!() + } + + fn handle_set_extranonce_prefix( + &mut self, + _: SetExtranoncePrefix, + ) -> Result, Error> { + todo!() + } + + fn handle_submit_shares_success( + &mut self, + m: SubmitSharesSuccess, + ) -> Result, Error> { + info!("SUCCESS {:?}", m); + Ok(SendTo::None(None)) + } + + fn handle_submit_shares_error(&mut self, _: SubmitSharesError) -> Result, Error> { + info!("Submit shares error"); + Ok(SendTo::None(None)) + } + + fn handle_new_mining_job(&mut self, m: NewMiningJob) -> Result, Error> { + match (m.is_future(), self.prev_hash.as_ref()) { + (false, Some(p_h)) => { + self.miner + .safe_lock(|miner| miner.new_header(p_h, &m)) + .unwrap(); + self.jobs = vec![m.as_static()]; + self.notify_changes_to_mining_thread.should_send = true; + } + (true, _) => self.jobs.push(m.as_static()), + (false, None) => { + panic!() + } + } + Ok(SendTo::None(None)) + } + + fn handle_new_extended_mining_job( + &mut self, + _: NewExtendedMiningJob, + ) -> Result, Error> { + todo!() + } + + fn handle_set_new_prev_hash(&mut self, m: SetNewPrevHash) -> Result, Error> { + let jobs: Vec<&NewMiningJob<'static>> = self + .jobs + .iter() + .filter(|j| j.job_id == m.job_id && j.is_future()) + .collect(); + match jobs.len() { + 0 => { + self.prev_hash = Some(m.as_static()); + } + 1 => { + self.miner + .safe_lock(|miner| miner.new_header(&m, jobs[0])) + .unwrap(); + self.jobs = vec![jobs[0].clone()]; + self.prev_hash = Some(m.as_static()); + self.notify_changes_to_mining_thread.should_send = true; + } + _ => panic!(), + } + Ok(SendTo::None(None)) + } + + fn handle_set_custom_mining_job_success( + &mut self, + _: SetCustomMiningJobSuccess, + ) -> Result, Error> { + todo!() + } + + fn handle_set_custom_mining_job_error( + &mut self, + _: SetCustomMiningJobError, + ) -> Result, Error> { + todo!() + } + + fn handle_set_target(&mut self, m: SetTarget) -> Result, Error> { + self.miner + .safe_lock(|miner| miner.new_target(m.maximum_target.to_vec())) + .unwrap(); + self.notify_changes_to_mining_thread.should_send = true; + Ok(SendTo::None(None)) + } + + fn handle_reconnect(&mut self, _: Reconnect) -> Result, Error> { + todo!() + } +} + +#[derive(Debug, Clone)] +struct Miner { + header: Option, + target: Option, + job_id: Option, + version: Option, + handicap: u32, +} + +impl Miner { + fn new(handicap: u32) -> Self { + Self { + target: None, + header: None, + job_id: None, + version: None, + handicap, + } + } + + fn new_target(&mut self, mut target: Vec) { + // target is sent in LE and comparisons in this file are done in BE + target.reverse(); + let hex_string = target + .iter() + .fold("".to_string(), |acc, b| acc + format!("{:02x}", b).as_str()); + info!("Set target to {}", hex_string); + self.target = Some(Uint256::from_be_bytes(target.try_into().unwrap())); + } + + fn new_header(&mut self, set_new_prev_hash: &SetNewPrevHash, new_job: &NewMiningJob) { + self.job_id = Some(new_job.job_id); + self.version = Some(new_job.version); + let prev_hash: [u8; 32] = set_new_prev_hash.prev_hash.to_vec().try_into().unwrap(); + let prev_hash = Hash::from_inner(prev_hash); + let merkle_root: [u8; 32] = new_job.merkle_root.to_vec().try_into().unwrap(); + let merkle_root = Hash::from_inner(merkle_root); + // fields need to be added as BE and the are converted to LE in the background before hashing + let header = BlockHeader { + version: new_job.version as i32, + prev_blockhash: BlockHash::from_hash(prev_hash), + merkle_root, + time: std::time::SystemTime::now() + .duration_since( + std::time::SystemTime::UNIX_EPOCH - std::time::Duration::from_secs(60), + ) + .unwrap() + .as_secs() as u32, + bits: set_new_prev_hash.nbits, + nonce: 0, + }; + self.header = Some(header); + } + pub fn next_share(&mut self) -> NextShareOutcome { + if let Some(header) = self.header.as_ref() { + let mut hash = header.block_hash().as_hash().into_inner(); + hash.reverse(); + let hash = Uint256::from_be_bytes(hash); + if hash < *self.target.as_ref().unwrap() { + info!( + "Found share with nonce: {}, for target: {:?}, with hash: {:?}", + header.nonce, self.target, hash, + ); + NextShareOutcome::ValidShare + } else { + NextShareOutcome::InvalidShare + } + } else { + std::thread::yield_now(); + NextShareOutcome::InvalidShare + } + } +} + +enum NextShareOutcome { + ValidShare, + InvalidShare, +} + +impl NextShareOutcome { + pub fn is_valid(&self) -> bool { + matches!(self, NextShareOutcome::ValidShare) + } +} + +// returns hashrate based on how fast the device hashes over the given duration +fn measure_hashrate(duration_secs: u64) -> f64 { + let mut rng = thread_rng(); + let prev_hash: [u8; 32] = generate_random_32_byte_array().to_vec().try_into().unwrap(); + let prev_hash = Hash::from_inner(prev_hash); + // We create a random block that we can hash, we are only interested in knowing how many hashes + // per unit of time we can do + let merkle_root: [u8; 32] = generate_random_32_byte_array().to_vec().try_into().unwrap(); + let merkle_root = Hash::from_inner(merkle_root); + let header = BlockHeader { + version: rng.gen(), + prev_blockhash: BlockHash::from_hash(prev_hash), + merkle_root, + time: std::time::SystemTime::now() + .duration_since(std::time::SystemTime::UNIX_EPOCH - std::time::Duration::from_secs(60)) + .unwrap() + .as_secs() as u32, + bits: rng.gen(), + nonce: 0, + }; + let start_time = Instant::now(); + let mut hashes: u64 = 0; + let duration = Duration::from_secs(duration_secs); + let mut miner = Miner::new(0); + // We put the target to 0 we are only interested in how many hashes per unit of time we can do + // and do not want to be botherd by messages about valid shares found. + miner.new_target(vec![0_u8; 32]); + miner.header = Some(header); + + while start_time.elapsed() < duration { + miner.next_share(); + hashes += 1; + } + + let elapsed_secs = start_time.elapsed().as_secs_f64(); + hashes as f64 / elapsed_secs +} +fn generate_random_32_byte_array() -> [u8; 32] { + let mut rng = thread_rng(); + let mut arr = [0u8; 32]; + rng.fill(&mut arr[..]); + arr +} + +fn start_mining_threads( + have_new_job: Receiver<()>, + miner: Arc>, + share_send: Sender<(u32, u32, u32, u32)>, +) { + tokio::task::spawn(async move { + let mut killers: Vec> = vec![]; + loop { + let available_parallelism = u32::max( + 2, + std::thread::available_parallelism().unwrap().get() as u32, + ); + let p = available_parallelism - 1; + let unit = u32::MAX / p; + while have_new_job.recv().await.is_ok() { + while let Some(killer) = killers.pop() { + killer.store(true, Ordering::Relaxed); + } + let miner = miner.safe_lock(|m| m.clone()).unwrap(); + for i in 0..p { + let mut miner = miner.clone(); + let share_send = share_send.clone(); + let killer = Arc::new(AtomicBool::new(false)); + miner.header.as_mut().map(|h| h.nonce = i * unit); + killers.push(killer.clone()); + std::thread::spawn(move || { + mine(miner, share_send, killer); + }); + } + } + } + }); +} + +fn mine(mut miner: Miner, share_send: Sender<(u32, u32, u32, u32)>, kill: Arc) { + if miner.handicap != 0 { + loop { + if kill.load(Ordering::Relaxed) { + break; + } + std::thread::sleep(std::time::Duration::from_micros(miner.handicap.into())); + if miner.next_share().is_valid() { + let nonce = miner.header.unwrap().nonce; + let time = miner.header.unwrap().time; + let job_id = miner.job_id.unwrap(); + let version = miner.version; + share_send + .try_send((nonce, job_id, version.unwrap(), time)) + .unwrap(); + } + miner.header.as_mut().map(|h| h.nonce += 1); + } + } else { + loop { + if miner.next_share().is_valid() { + if kill.load(Ordering::Relaxed) { + break; + } + let nonce = miner.header.unwrap().nonce; + let time = miner.header.unwrap().time; + let job_id = miner.job_id.unwrap(); + let version = miner.version; + share_send + .try_send((nonce, job_id, version.unwrap(), time)) + .unwrap(); + } + miner.header.as_mut().map(|h| h.nonce += 1); + } + } +} diff --git a/roles/test-utils/mining-device/src/main.rs b/roles/test-utils/mining-device/src/main.rs index 35a31d56ac..02a19c12f0 100644 --- a/roles/test-utils/mining-device/src/main.rs +++ b/roles/test-utils/mining-device/src/main.rs @@ -1,24 +1,11 @@ +#![allow(special_module_name)] #![allow(clippy::option_map_unit_fn)] use key_utils::Secp256k1PublicKey; -use network_helpers_sv2::noise_connection_tokio::Connection; -use roles_logic_sv2::utils::Id; -use std::{ - net::{SocketAddr, ToSocketAddrs}, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, - }, - time::Duration, -}; -use tokio::net::TcpStream; use clap::Parser; -use rand::{thread_rng, Rng}; -use std::time::Instant; -use stratum_common::bitcoin::{ - blockdata::block::BlockHeader, hash_types::BlockHash, hashes::Hash, util::uint::Uint256, -}; -use tracing::{error, info}; +use tracing::info; + +pub mod lib; #[derive(Parser, Debug)] #[command(version, about, long_about = None)] @@ -54,710 +41,17 @@ struct Args { id_user: Option, } -async fn connect( - address: String, - pub_key: Option, - device_id: Option, - user_id: Option, - handicap: u32, -) { - let address = address - .clone() - .to_socket_addrs() - .expect("Invalid pool address, use one of this formats: ip:port, domain:port") - .next() - .expect("Invalid pool address, use one of this formats: ip:port, domain:port"); - info!("Connecting to pool at {}", address); - let socket = loop { - let pool = tokio::time::timeout(Duration::from_secs(5), TcpStream::connect(address)).await; - match pool { - Ok(result) => match result { - Ok(socket) => break socket, - Err(e) => { - error!( - "Failed to connect to Upstream role at {}, retrying in 5s: {}", - address, e - ); - tokio::time::sleep(Duration::from_secs(5)).await; - } - }, - Err(_) => { - error!("Pool is unresponsive, terminating"); - std::process::exit(1); - } - } - }; - info!("Pool tcp connection established at {}", address); - let address = socket.peer_addr().unwrap(); - let initiator = Initiator::new(pub_key.map(|e| e.0)); - let (receiver, sender, _, _): (Receiver, Sender, _, _) = - Connection::new(socket, codec_sv2::HandshakeRole::Initiator(initiator)) - .await - .unwrap(); - info!("Pool noise connection established at {}", address); - Device::start(receiver, sender, address, device_id, user_id, handicap).await -} - #[tokio::main(flavor = "current_thread")] async fn main() { let args = Args::parse(); tracing_subscriber::fmt::init(); info!("start"); - connect( + let _ = lib::connect( args.address_pool, args.pubkey_pool, args.id_device, args.id_user, args.handicap, ) - .await -} - -use async_channel::{Receiver, Sender}; -use binary_sv2::u256_from_int; -use codec_sv2::{Initiator, StandardEitherFrame, StandardSv2Frame}; -use roles_logic_sv2::{ - common_messages_sv2::{Protocol, SetupConnection, SetupConnectionSuccess}, - common_properties::{IsMiningUpstream, IsUpstream}, - errors::Error, - handlers::{ - common::ParseUpstreamCommonMessages, - mining::{ParseUpstreamMiningMessages, SendTo, SupportedChannelTypes}, - }, - mining_sv2::*, - parsers::{Mining, MiningDeviceMessages}, - routing_logic::{CommonRoutingLogic, MiningRoutingLogic, NoRouting}, - selectors::NullDownstreamMiningSelector, - utils::Mutex, -}; - -pub type Message = MiningDeviceMessages<'static>; -pub type StdFrame = StandardSv2Frame; -pub type EitherFrame = StandardEitherFrame; - -struct SetupConnectionHandler {} -use std::convert::TryInto; - -impl SetupConnectionHandler { - pub fn new() -> Self { - SetupConnectionHandler {} - } - fn get_setup_connection_message( - address: SocketAddr, - device_id: Option, - ) -> SetupConnection<'static> { - let endpoint_host = address.ip().to_string().into_bytes().try_into().unwrap(); - let vendor = String::new().try_into().unwrap(); - let hardware_version = String::new().try_into().unwrap(); - let firmware = String::new().try_into().unwrap(); - let device_id = device_id.unwrap_or_default(); - info!( - "Creating SetupConnection message with device id: {:?}", - device_id - ); - SetupConnection { - protocol: Protocol::MiningProtocol, - min_version: 2, - max_version: 2, - flags: 0b0000_0000_0000_0000_0000_0000_0000_0001, - endpoint_host, - endpoint_port: address.port(), - vendor, - hardware_version, - firmware, - device_id: device_id.try_into().unwrap(), - } - } - pub async fn setup( - self_: Arc>, - receiver: &mut Receiver, - sender: &mut Sender, - device_id: Option, - address: SocketAddr, - ) { - let setup_connection = Self::get_setup_connection_message(address, device_id); - - let sv2_frame: StdFrame = MiningDeviceMessages::Common(setup_connection.into()) - .try_into() - .unwrap(); - let sv2_frame = sv2_frame.into(); - sender.send(sv2_frame).await.unwrap(); - info!("Setup connection sent to {}", address); - - let mut incoming: StdFrame = receiver.recv().await.unwrap().try_into().unwrap(); - let message_type = incoming.get_header().unwrap().msg_type(); - let payload = incoming.payload(); - ParseUpstreamCommonMessages::handle_message_common( - self_, - message_type, - payload, - CommonRoutingLogic::None, - ) - .unwrap(); - } -} - -impl ParseUpstreamCommonMessages for SetupConnectionHandler { - fn handle_setup_connection_success( - &mut self, - _: SetupConnectionSuccess, - ) -> Result { - use roles_logic_sv2::handlers::common::SendTo; - info!("Setup connection success"); - Ok(SendTo::None(None)) - } - - fn handle_setup_connection_error( - &mut self, - _: roles_logic_sv2::common_messages_sv2::SetupConnectionError, - ) -> Result { - error!("Setup connection error"); - todo!() - } - - fn handle_channel_endpoint_changed( - &mut self, - _: roles_logic_sv2::common_messages_sv2::ChannelEndpointChanged, - ) -> Result { - todo!() - } -} - -#[derive(Debug, Clone)] -struct NewWorkNotifier { - should_send: bool, - sender: Sender<()>, -} - -#[derive(Debug)] -pub struct Device { - #[allow(dead_code)] - receiver: Receiver, - sender: Sender, - #[allow(dead_code)] - channel_opened: bool, - channel_id: Option, - miner: Arc>, - jobs: Vec>, - prev_hash: Option>, - sequence_numbers: Id, - notify_changes_to_mining_thread: NewWorkNotifier, -} - -fn open_channel(device_id: Option) -> OpenStandardMiningChannel<'static> { - let user_identity = device_id.unwrap_or_default().try_into().unwrap(); - let id: u32 = 10; - info!("Measuring CPU hashrate"); - let p = std::thread::available_parallelism().unwrap().get() as u32 - 3; - let nominal_hash_rate = measure_hashrate(5) as f32 * p as f32; - info!("Pc hashrate is {}", nominal_hash_rate); - info!("MINING DEVICE: send open channel with request id {}", id); - OpenStandardMiningChannel { - request_id: id.into(), - user_identity, - nominal_hash_rate, - max_target: u256_from_int(567_u64), - } -} - -impl Device { - async fn start( - mut receiver: Receiver, - mut sender: Sender, - addr: SocketAddr, - device_id: Option, - user_id: Option, - handicap: u32, - ) { - let setup_connection_handler = Arc::new(Mutex::new(SetupConnectionHandler::new())); - SetupConnectionHandler::setup( - setup_connection_handler, - &mut receiver, - &mut sender, - device_id, - addr, - ) - .await; - info!("Pool sv2 connection established at {}", addr); - let miner = Arc::new(Mutex::new(Miner::new(handicap))); - let (notify_changes_to_mining_thread, update_miners) = async_channel::unbounded(); - let self_ = Self { - channel_opened: false, - receiver: receiver.clone(), - sender: sender.clone(), - miner: miner.clone(), - jobs: Vec::new(), - prev_hash: None, - channel_id: None, - sequence_numbers: Id::new(), - notify_changes_to_mining_thread: NewWorkNotifier { - should_send: true, - sender: notify_changes_to_mining_thread, - }, - }; - let open_channel = - MiningDeviceMessages::Mining(Mining::OpenStandardMiningChannel(open_channel(user_id))); - let frame: StdFrame = open_channel.try_into().unwrap(); - self_.sender.send(frame.into()).await.unwrap(); - let self_mutex = std::sync::Arc::new(Mutex::new(self_)); - let cloned = self_mutex.clone(); - - let (share_send, share_recv) = async_channel::unbounded(); - - start_mining_threads(update_miners, miner, share_send); - tokio::task::spawn(async move { - let recv = share_recv.clone(); - loop { - let (nonce, job_id, version, ntime) = recv.recv().await.unwrap(); - Self::send_share(cloned.clone(), nonce, job_id, version, ntime).await; - } - }); - - loop { - let mut incoming: StdFrame = receiver.recv().await.unwrap().try_into().unwrap(); - let message_type = incoming.get_header().unwrap().msg_type(); - let payload = incoming.payload(); - let next = Device::handle_message_mining( - self_mutex.clone(), - message_type, - payload, - MiningRoutingLogic::None, - ) - .unwrap(); - let mut notify_changes_to_mining_thread = self_mutex - .safe_lock(|s| s.notify_changes_to_mining_thread.clone()) - .unwrap(); - if notify_changes_to_mining_thread.should_send - && (message_type == const_sv2::MESSAGE_TYPE_NEW_MINING_JOB - || message_type == const_sv2::MESSAGE_TYPE_SET_NEW_PREV_HASH - || message_type == const_sv2::MESSAGE_TYPE_SET_TARGET) - { - notify_changes_to_mining_thread - .sender - .send(()) - .await - .unwrap(); - notify_changes_to_mining_thread.should_send = false; - }; - match next { - SendTo::RelayNewMessageToRemote(_, m) => { - let sv2_frame: StdFrame = MiningDeviceMessages::Mining(m).try_into().unwrap(); - let either_frame: EitherFrame = sv2_frame.into(); - sender.send(either_frame).await.unwrap(); - } - SendTo::None(_) => (), - _ => panic!(), - } - } - } - - async fn send_share( - self_mutex: Arc>, - nonce: u32, - job_id: u32, - version: u32, - ntime: u32, - ) { - let share = - MiningDeviceMessages::Mining(Mining::SubmitSharesStandard(SubmitSharesStandard { - channel_id: self_mutex.safe_lock(|s| s.channel_id.unwrap()).unwrap(), - sequence_number: self_mutex.safe_lock(|s| s.sequence_numbers.next()).unwrap(), - job_id, - nonce, - ntime, - version, - })); - let frame: StdFrame = share.try_into().unwrap(); - let sender = self_mutex.safe_lock(|s| s.sender.clone()).unwrap(); - sender.send(frame.into()).await.unwrap(); - } -} - -impl IsUpstream<(), NullDownstreamMiningSelector> for Device { - fn get_version(&self) -> u16 { - todo!() - } - - fn get_flags(&self) -> u32 { - todo!() - } - - fn get_supported_protocols(&self) -> Vec { - todo!() - } - - fn get_id(&self) -> u32 { - todo!() - } - - fn get_mapper(&mut self) -> Option<&mut roles_logic_sv2::common_properties::RequestIdMapper> { - todo!() - } - - fn get_remote_selector(&mut self) -> &mut NullDownstreamMiningSelector { - todo!() - } -} - -impl IsMiningUpstream<(), NullDownstreamMiningSelector> for Device { - fn total_hash_rate(&self) -> u64 { - todo!() - } - - fn add_hash_rate(&mut self, _to_add: u64) { - todo!() - } - fn get_opened_channels( - &mut self, - ) -> &mut Vec { - todo!() - } - - fn update_channels(&mut self, _: roles_logic_sv2::common_properties::UpstreamChannel) { - todo!() - } -} - -impl ParseUpstreamMiningMessages<(), NullDownstreamMiningSelector, NoRouting> for Device { - fn get_channel_type(&self) -> SupportedChannelTypes { - SupportedChannelTypes::Standard - } - - fn is_work_selection_enabled(&self) -> bool { - false - } - - fn handle_open_standard_mining_channel_success( - &mut self, - m: OpenStandardMiningChannelSuccess, - _: Option>>, - ) -> Result, Error> { - self.channel_opened = true; - self.channel_id = Some(m.channel_id); - let req_id = m.get_request_id_as_u32(); - info!( - "MINING DEVICE: channel opened with: group id {}, channel id {}, request id {}", - m.group_channel_id, m.channel_id, req_id - ); - self.miner - .safe_lock(|miner| miner.new_target(m.target.to_vec())) - .unwrap(); - self.notify_changes_to_mining_thread.should_send = true; - Ok(SendTo::None(None)) - } - - fn handle_open_extended_mining_channel_success( - &mut self, - _: OpenExtendedMiningChannelSuccess, - ) -> Result, Error> { - unreachable!() - } - - fn handle_open_mining_channel_error( - &mut self, - _: OpenMiningChannelError, - ) -> Result, Error> { - todo!() - } - - fn handle_update_channel_error(&mut self, _: UpdateChannelError) -> Result, Error> { - todo!() - } - - fn handle_close_channel(&mut self, _: CloseChannel) -> Result, Error> { - todo!() - } - - fn handle_set_extranonce_prefix( - &mut self, - _: SetExtranoncePrefix, - ) -> Result, Error> { - todo!() - } - - fn handle_submit_shares_success( - &mut self, - m: SubmitSharesSuccess, - ) -> Result, Error> { - info!("SUCCESS {:?}", m); - Ok(SendTo::None(None)) - } - - fn handle_submit_shares_error(&mut self, _: SubmitSharesError) -> Result, Error> { - info!("Submit shares error"); - Ok(SendTo::None(None)) - } - - fn handle_new_mining_job(&mut self, m: NewMiningJob) -> Result, Error> { - match (m.is_future(), self.prev_hash.as_ref()) { - (false, Some(p_h)) => { - self.miner - .safe_lock(|miner| miner.new_header(p_h, &m)) - .unwrap(); - self.jobs = vec![m.as_static()]; - self.notify_changes_to_mining_thread.should_send = true; - } - (true, _) => self.jobs.push(m.as_static()), - (false, None) => { - panic!() - } - } - Ok(SendTo::None(None)) - } - - fn handle_new_extended_mining_job( - &mut self, - _: NewExtendedMiningJob, - ) -> Result, Error> { - todo!() - } - - fn handle_set_new_prev_hash(&mut self, m: SetNewPrevHash) -> Result, Error> { - let jobs: Vec<&NewMiningJob<'static>> = self - .jobs - .iter() - .filter(|j| j.job_id == m.job_id && j.is_future()) - .collect(); - match jobs.len() { - 0 => { - self.prev_hash = Some(m.as_static()); - } - 1 => { - self.miner - .safe_lock(|miner| miner.new_header(&m, jobs[0])) - .unwrap(); - self.jobs = vec![jobs[0].clone()]; - self.prev_hash = Some(m.as_static()); - self.notify_changes_to_mining_thread.should_send = true; - } - _ => panic!(), - } - Ok(SendTo::None(None)) - } - - fn handle_set_custom_mining_job_success( - &mut self, - _: SetCustomMiningJobSuccess, - ) -> Result, Error> { - todo!() - } - - fn handle_set_custom_mining_job_error( - &mut self, - _: SetCustomMiningJobError, - ) -> Result, Error> { - todo!() - } - - fn handle_set_target(&mut self, m: SetTarget) -> Result, Error> { - self.miner - .safe_lock(|miner| miner.new_target(m.maximum_target.to_vec())) - .unwrap(); - self.notify_changes_to_mining_thread.should_send = true; - Ok(SendTo::None(None)) - } - - fn handle_reconnect(&mut self, _: Reconnect) -> Result, Error> { - todo!() - } -} - -#[derive(Debug, Clone)] -struct Miner { - header: Option, - target: Option, - job_id: Option, - version: Option, - handicap: u32, -} - -impl Miner { - fn new(handicap: u32) -> Self { - Self { - target: None, - header: None, - job_id: None, - version: None, - handicap, - } - } - - fn new_target(&mut self, mut target: Vec) { - // target is sent in LE and comparisons in this file are done in BE - target.reverse(); - let hex_string = target - .iter() - .fold("".to_string(), |acc, b| acc + format!("{:02x}", b).as_str()); - info!("Set target to {}", hex_string); - self.target = Some(Uint256::from_be_bytes(target.try_into().unwrap())); - } - - fn new_header(&mut self, set_new_prev_hash: &SetNewPrevHash, new_job: &NewMiningJob) { - self.job_id = Some(new_job.job_id); - self.version = Some(new_job.version); - let prev_hash: [u8; 32] = set_new_prev_hash.prev_hash.to_vec().try_into().unwrap(); - let prev_hash = Hash::from_inner(prev_hash); - let merkle_root: [u8; 32] = new_job.merkle_root.to_vec().try_into().unwrap(); - let merkle_root = Hash::from_inner(merkle_root); - // fields need to be added as BE and the are converted to LE in the background before hashing - let header = BlockHeader { - version: new_job.version as i32, - prev_blockhash: BlockHash::from_hash(prev_hash), - merkle_root, - time: std::time::SystemTime::now() - .duration_since( - std::time::SystemTime::UNIX_EPOCH - std::time::Duration::from_secs(60), - ) - .unwrap() - .as_secs() as u32, - bits: set_new_prev_hash.nbits, - nonce: 0, - }; - self.header = Some(header); - } - pub fn next_share(&mut self) -> NextShareOutcome { - if let Some(header) = self.header.as_ref() { - let mut hash = header.block_hash().as_hash().into_inner(); - hash.reverse(); - let hash = Uint256::from_be_bytes(hash); - if hash < *self.target.as_ref().unwrap() { - info!( - "Found share with nonce: {}, for target: {:?}, with hash: {:?}", - header.nonce, self.target, hash, - ); - NextShareOutcome::ValidShare - } else { - NextShareOutcome::InvalidShare - } - } else { - std::thread::yield_now(); - NextShareOutcome::InvalidShare - } - } -} - -enum NextShareOutcome { - ValidShare, - InvalidShare, -} - -impl NextShareOutcome { - pub fn is_valid(&self) -> bool { - matches!(self, NextShareOutcome::ValidShare) - } -} - -// returns hashrate based on how fast the device hashes over the given duration -fn measure_hashrate(duration_secs: u64) -> f64 { - let mut rng = thread_rng(); - let prev_hash: [u8; 32] = generate_random_32_byte_array().to_vec().try_into().unwrap(); - let prev_hash = Hash::from_inner(prev_hash); - // We create a random block that we can hash, we are only interested in knowing how many hashes - // per unit of time we can do - let merkle_root: [u8; 32] = generate_random_32_byte_array().to_vec().try_into().unwrap(); - let merkle_root = Hash::from_inner(merkle_root); - let header = BlockHeader { - version: rng.gen(), - prev_blockhash: BlockHash::from_hash(prev_hash), - merkle_root, - time: std::time::SystemTime::now() - .duration_since(std::time::SystemTime::UNIX_EPOCH - std::time::Duration::from_secs(60)) - .unwrap() - .as_secs() as u32, - bits: rng.gen(), - nonce: 0, - }; - let start_time = Instant::now(); - let mut hashes: u64 = 0; - let duration = Duration::from_secs(duration_secs); - let mut miner = Miner::new(0); - // We put the target to 0 we are only interested in how many hashes per unit of time we can do - // and do not want to be botherd by messages about valid shares found. - miner.new_target(vec![0_u8; 32]); - miner.header = Some(header); - - while start_time.elapsed() < duration { - miner.next_share(); - hashes += 1; - } - - let elapsed_secs = start_time.elapsed().as_secs_f64(); - hashes as f64 / elapsed_secs -} -fn generate_random_32_byte_array() -> [u8; 32] { - let mut rng = thread_rng(); - let mut arr = [0u8; 32]; - rng.fill(&mut arr[..]); - arr -} - -fn start_mining_threads( - have_new_job: Receiver<()>, - miner: Arc>, - share_send: Sender<(u32, u32, u32, u32)>, -) { - tokio::task::spawn(async move { - let mut killers: Vec> = vec![]; - loop { - let available_parallelism = u32::max( - 2, - std::thread::available_parallelism().unwrap().get() as u32, - ); - let p = available_parallelism - 1; - let unit = u32::MAX / p; - while have_new_job.recv().await.is_ok() { - while let Some(killer) = killers.pop() { - killer.store(true, Ordering::Relaxed); - } - let miner = miner.safe_lock(|m| m.clone()).unwrap(); - for i in 0..p { - let mut miner = miner.clone(); - let share_send = share_send.clone(); - let killer = Arc::new(AtomicBool::new(false)); - miner.header.as_mut().map(|h| h.nonce = i * unit); - killers.push(killer.clone()); - std::thread::spawn(move || { - mine(miner, share_send, killer); - }); - } - } - } - }); -} - -fn mine(mut miner: Miner, share_send: Sender<(u32, u32, u32, u32)>, kill: Arc) { - if miner.handicap != 0 { - loop { - if kill.load(Ordering::Relaxed) { - break; - } - std::thread::sleep(std::time::Duration::from_micros(miner.handicap.into())); - if miner.next_share().is_valid() { - let nonce = miner.header.unwrap().nonce; - let time = miner.header.unwrap().time; - let job_id = miner.job_id.unwrap(); - let version = miner.version; - share_send - .try_send((nonce, job_id, version.unwrap(), time)) - .unwrap(); - } - miner.header.as_mut().map(|h| h.nonce += 1); - } - } else { - loop { - if miner.next_share().is_valid() { - if kill.load(Ordering::Relaxed) { - break; - } - let nonce = miner.header.unwrap().nonce; - let time = miner.header.unwrap().time; - let job_id = miner.job_id.unwrap(); - let version = miner.version; - share_send - .try_send((nonce, job_id, version.unwrap(), time)) - .unwrap(); - } - miner.header.as_mut().map(|h| h.nonce += 1); - } - } + .await; } diff --git a/scripts/build-on-all-workspaces.sh b/scripts/build-on-all-workspaces.sh index 9d7943ed87..c74fa25f36 100755 --- a/scripts/build-on-all-workspaces.sh +++ b/scripts/build-on-all-workspaces.sh @@ -5,7 +5,7 @@ WORKSPACES="benches common protocols roles utils" for workspace in $WORKSPACES; do echo "Executing build on: $workspace" - cargo build --manifest-path="$workspace/Cargo.toml" -- + cargo +1.75.0 build --manifest-path="$workspace/Cargo.toml" -- if [ $? -ne 0 ]; then echo "Build found some errors in: $workspace" exit 1 diff --git a/scripts/build_header.sh b/scripts/build_header.sh index 046723cca6..6f0b4957de 100755 --- a/scripts/build_header.sh +++ b/scripts/build_header.sh @@ -1,5 +1,5 @@ #! /bin/sh -cargo install --version 0.20.0 cbindgen +cargo install --version 0.21.0 cbindgen --force rm -f ./sv2.h touch ./sv2.h diff --git a/scripts/clippy-fmt-and-test.sh b/scripts/clippy-fmt-and-test.sh index 185ad63eaa..db1fcbadf6 100755 --- a/scripts/clippy-fmt-and-test.sh +++ b/scripts/clippy-fmt-and-test.sh @@ -4,14 +4,14 @@ WORKSPACES="benches common protocols roles utils" for workspace in $WORKSPACES; do echo "Executing clippy on: $workspace" - cargo clippy --manifest-path="$workspace/Cargo.toml" -- -D warnings -A dead-code + cargo +1.75.0 clippy --manifest-path="$workspace/Cargo.toml" -- -D warnings -A dead-code if [ $? -ne 0 ]; then echo "Clippy found some errors in: $workspace" exit 1 fi echo "Running tests on: $workspace" - cargo test --manifest-path="$workspace/Cargo.toml" + cargo +1.75 test --manifest-path="$workspace/Cargo.toml" if [ $? -ne 0 ]; then echo "Tests failed in: $workspace" exit 1 diff --git a/scripts/code-coverage-report.sh b/scripts/code-coverage-report.sh index f54828ce00..e996221952 100755 --- a/scripts/code-coverage-report.sh +++ b/scripts/code-coverage-report.sh @@ -1,4 +1,4 @@ #! /bin/sh cd roles -RUST_LOG=debug cargo llvm-cov --ignore-filename-regex "utils/message-generator/|experimental/|protocols/" --cobertura --output-path "target/mg_coverage.xml" report +RUST_LOG=debug cargo +1.75.0 llvm-cov --ignore-filename-regex "utils/message-generator/|experimental/|protocols/" --cobertura --output-path "target/mg_coverage.xml" report diff --git a/scripts/message-generator-tests.sh b/scripts/message-generator-tests.sh index 01b3e919e3..a7897a07d5 100755 --- a/scripts/message-generator-tests.sh +++ b/scripts/message-generator-tests.sh @@ -13,5 +13,5 @@ for entry in `ls $search_dir`; do done cd roles -RUST_LOG=debug cargo llvm-cov --ignore-filename-regex "utils/message-generator/|experimental/|protocols/" --cobertura --output-path "target/mg_coverage.xml" report +RUST_LOG=debug cargo +1.75.0 llvm-cov --ignore-filename-regex "utils/message-generator/|experimental/|protocols/" --cobertura --output-path "target/mg_coverage.xml" report diff --git a/scripts/sv2-header-check.sh b/scripts/sv2-header-check.sh index 7464009ebf..8f3a7f50a4 100755 --- a/scripts/sv2-header-check.sh +++ b/scripts/sv2-header-check.sh @@ -21,7 +21,7 @@ if ! command -v sha1sum >/dev/null 2>&1; then exit 1 fi -cargo install --version 0.20.0 cbindgen +cargo install --version 0.21.0 cbindgen --force set -ex # cargo install cbindgen --force bts