Skip to content

Commit

Permalink
Merge pull request #983 from Fi3/AddParallelismToCpuMiner
Browse files Browse the repository at this point in the history
Add parallelism to cpu miner
  • Loading branch information
GitGab19 authored Jul 22, 2024
2 parents 5d569bb + 8c33ba4 commit 04f6df8
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 40 deletions.
12 changes: 6 additions & 6 deletions roles/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions roles/test-utils/mining-device/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@ codec_sv2 = { version = "^1.0.1", path = "../../../protocols/v2/codec-sv2", feat
roles_logic_sv2 = { version = "1.0.0", path = "../../../protocols/v2/roles-logic-sv2" }
const_sv2 = { version = "1.0.0", path = "../../../protocols/v2/const-sv2" }
async-channel = "1.5.1"
async-std={version = "1.8.0", features = ["attributes"]}
binary_sv2 = { version = "1.0.0", path = "../../../protocols/v2/binary-sv2/binary-sv2" }
network_helpers_sv2 = { version = "2.0.0", path = "../../roles-utils/network-helpers", features=["async_std"] }
network_helpers_sv2 = { version = "2.0.0", path = "../../roles-utils/network-helpers", features=["tokio"] }
buffer_sv2 = { version = "1.0.0", path = "../../../utils/buffer"}
async-recursion = "0.3.2"
rand = "0.8.4"
Expand All @@ -24,3 +23,4 @@ clap = { version = "^4.5.4", features = ["derive"] }
tracing = { version = "0.1" }
tracing-subscriber = "0.3"
sha2 = "0.10.6"
tokio = "^1.38.0"
21 changes: 21 additions & 0 deletions roles/test-utils/mining-device/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# CPU Sv2 mining device

Header only sv2 cpu miner.

```
Usage: mining-device [OPTIONS] --address-pool <ADDRESS_POOL>
Options:
-p, --pubkey-pool <PUBKEY_POOL> Pool pub key, when left empty the pool certificate is not checked
-i, --id-device <ID_DEVICE> Sometimes used by the pool to identify the device
-a, --address-pool <ADDRESS_POOL> Address of the pool in this format ip:port or domain:port
--handicap <HANDICAP> This value is used to slow down the cpu miner, it represents the number of micro-seconds that are awaited between hashes [default: 0]
--id-user <ID_USER> User id, used when a new channel is opened, it can be used by the pool to identify the miner
-h, --help Print help
-V, --version Print version
```

Usage example:
```
cargo run --release -- --address-pool 127.0.0.1:20000 --id-device device_id::SOLO::bc1qxy2kgdygjrsqtzq2n0yrf2493p83kkfjhx0wlh
```
156 changes: 124 additions & 32 deletions roles/test-utils/mining-device/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
use async_std::net::TcpStream;
#![allow(clippy::option_map_unit_fn)]
use key_utils::Secp256k1PublicKey;
use network_helpers_sv2::Connection;
use network_helpers_sv2::noise_connection_tokio::Connection;
use roles_logic_sv2::utils::Id;
use std::{net::SocketAddr, sync::Arc, thread::sleep, time::Duration};
use std::{
net::{SocketAddr, ToSocketAddrs},
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::Duration,
};
use tokio::net::TcpStream;

use async_std::net::ToSocketAddrs;
use clap::Parser;
use rand::{thread_rng, Rng};
use std::time::Instant;
Expand Down Expand Up @@ -36,7 +43,7 @@ struct Args {
address_pool: String,
#[arg(
long,
help = "This value is used to slow down the cpu miner, it rapresents the number of micro-seconds that are awaited between hashes",
help = "This value is used to slow down the cpu miner, it represents the number of micro-seconds that are awaited between hashes",
default_value = "0"
)]
handicap: u32,
Expand All @@ -57,14 +64,12 @@ async fn connect(
let address = address
.clone()
.to_socket_addrs()
.await
.expect("Invalid pool address, use one of this formats: ip:port, domain:port")
.next()
.expect("Invalid pool address, use one of this formats: ip:port, domain:port");
info!("Connecting to pool at {}", address);
let socket = loop {
let pool =
async_std::future::timeout(Duration::from_secs(5), TcpStream::connect(address)).await;
let pool = tokio::time::timeout(Duration::from_secs(5), TcpStream::connect(address)).await;
match pool {
Ok(result) => match result {
Ok(socket) => break socket,
Expand All @@ -73,7 +78,7 @@ async fn connect(
"Failed to connect to Upstream role at {}, retrying in 5s: {}",
address, e
);
sleep(Duration::from_secs(5));
tokio::time::sleep(Duration::from_secs(5)).await;
}
},
Err(_) => {
Expand All @@ -85,15 +90,15 @@ async fn connect(
info!("Pool tcp connection established at {}", address);
let address = socket.peer_addr().unwrap();
let initiator = Initiator::new(pub_key.map(|e| e.0));
let (receiver, sender): (Receiver<EitherFrame>, Sender<EitherFrame>) =
Connection::new(socket, codec_sv2::HandshakeRole::Initiator(initiator), 10)
let (receiver, sender, _, _): (Receiver<EitherFrame>, Sender<EitherFrame>, _, _) =
Connection::new(socket, codec_sv2::HandshakeRole::Initiator(initiator))
.await
.unwrap();
info!("Pool noise connection established at {}", address);
Device::start(receiver, sender, address, device_id, user_id, handicap).await
}

#[async_std::main]
#[tokio::main(flavor = "current_thread")]
async fn main() {
let args = Args::parse();
tracing_subscriber::fmt::init();
Expand Down Expand Up @@ -218,6 +223,12 @@ impl ParseUpstreamCommonMessages<NoRouting> for SetupConnectionHandler {
}
}

#[derive(Debug, Clone)]
struct NewWorkNotifier {
should_send: bool,
sender: Sender<()>,
}

#[derive(Debug)]
pub struct Device {
#[allow(dead_code)]
Expand All @@ -230,13 +241,15 @@ pub struct Device {
jobs: Vec<NewMiningJob<'static>>,
prev_hash: Option<SetNewPrevHash<'static>>,
sequence_numbers: Id,
notify_changes_to_mining_thread: NewWorkNotifier,
}

fn open_channel(device_id: Option<String>) -> OpenStandardMiningChannel<'static> {
let user_identity = device_id.unwrap_or_default().try_into().unwrap();
let id: u32 = 10;
info!("Measuring CPU hashrate");
let nominal_hash_rate = measure_hashrate(5) as f32;
let p = std::thread::available_parallelism().unwrap().get() as u32 - 3;
let nominal_hash_rate = measure_hashrate(5) as f32 * p as f32;
info!("Pc hashrate is {}", nominal_hash_rate);
info!("MINING DEVICE: send open channel with request id {}", id);
OpenStandardMiningChannel {
Expand Down Expand Up @@ -267,6 +280,7 @@ impl Device {
.await;
info!("Pool sv2 connection established at {}", addr);
let miner = Arc::new(Mutex::new(Miner::new(handicap)));
let (notify_changes_to_mining_thread, update_miners) = async_channel::unbounded();
let self_ = Self {
channel_opened: false,
receiver: receiver.clone(),
Expand All @@ -276,6 +290,10 @@ impl Device {
prev_hash: None,
channel_id: None,
sequence_numbers: Id::new(),
notify_changes_to_mining_thread: NewWorkNotifier {
should_send: true,
sender: notify_changes_to_mining_thread,
},
};
let open_channel =
MiningDeviceMessages::Mining(Mining::OpenStandardMiningChannel(open_channel(user_id)));
Expand All @@ -286,24 +304,8 @@ impl Device {

let (share_send, share_recv) = async_channel::unbounded();

let handicap = miner.safe_lock(|m| m.handicap).unwrap();
std::thread::spawn(move || loop {
std::thread::sleep(std::time::Duration::from_micros(handicap.into()));
if miner.safe_lock(|m| m.next_share()).unwrap().is_valid() {
let nonce = miner.safe_lock(|m| m.header.unwrap().nonce).unwrap();
let time = miner.safe_lock(|m| m.header.unwrap().time).unwrap();
let job_id = miner.safe_lock(|m| m.job_id).unwrap();
let version = miner.safe_lock(|m| m.version).unwrap();
share_send
.try_send((nonce, job_id.unwrap(), version.unwrap(), time))
.unwrap();
}
miner
.safe_lock(|m| m.header.as_mut().map(|h| h.nonce += 1))
.unwrap();
});

async_std::task::spawn(async move {
start_mining_threads(update_miners, miner, share_send);
tokio::task::spawn(async move {
let recv = share_recv.clone();
loop {
let (nonce, job_id, version, ntime) = recv.recv().await.unwrap();
Expand All @@ -322,6 +324,21 @@ impl Device {
MiningRoutingLogic::None,
)
.unwrap();
let mut notify_changes_to_mining_thread = self_mutex
.safe_lock(|s| s.notify_changes_to_mining_thread.clone())
.unwrap();
if notify_changes_to_mining_thread.should_send
&& (message_type == const_sv2::MESSAGE_TYPE_NEW_MINING_JOB
|| message_type == const_sv2::MESSAGE_TYPE_SET_NEW_PREV_HASH
|| message_type == const_sv2::MESSAGE_TYPE_SET_TARGET)
{
notify_changes_to_mining_thread
.sender
.send(())
.await
.unwrap();
notify_changes_to_mining_thread.should_send = false;
};
match next {
SendTo::RelayNewMessageToRemote(_, m) => {
let sv2_frame: StdFrame = MiningDeviceMessages::Mining(m).try_into().unwrap();
Expand Down Expand Up @@ -425,6 +442,7 @@ impl ParseUpstreamMiningMessages<(), NullDownstreamMiningSelector, NoRouting> fo
self.miner
.safe_lock(|miner| miner.new_target(m.target.to_vec()))
.unwrap();
self.notify_changes_to_mining_thread.should_send = true;
Ok(SendTo::None(None))
}

Expand Down Expand Up @@ -477,6 +495,7 @@ impl ParseUpstreamMiningMessages<(), NullDownstreamMiningSelector, NoRouting> fo
.safe_lock(|miner| miner.new_header(p_h, &m))
.unwrap();
self.jobs = vec![m.as_static()];
self.notify_changes_to_mining_thread.should_send = true;
}
(true, _) => self.jobs.push(m.as_static()),
(false, None) => {
Expand Down Expand Up @@ -509,6 +528,7 @@ impl ParseUpstreamMiningMessages<(), NullDownstreamMiningSelector, NoRouting> fo
.unwrap();
self.jobs = vec![jobs[0].clone()];
self.prev_hash = Some(m.as_static());
self.notify_changes_to_mining_thread.should_send = true;
}
_ => panic!(),
}
Expand All @@ -533,6 +553,7 @@ impl ParseUpstreamMiningMessages<(), NullDownstreamMiningSelector, NoRouting> fo
self.miner
.safe_lock(|miner| miner.new_target(m.maximum_target.to_vec()))
.unwrap();
self.notify_changes_to_mining_thread.should_send = true;
Ok(SendTo::None(None))
}

Expand All @@ -541,7 +562,7 @@ impl ParseUpstreamMiningMessages<(), NullDownstreamMiningSelector, NoRouting> fo
}
}

#[derive(Debug)]
#[derive(Debug, Clone)]
struct Miner {
header: Option<BlockHeader>,
target: Option<Uint256>,
Expand Down Expand Up @@ -669,3 +690,74 @@ fn generate_random_32_byte_array() -> [u8; 32] {
rng.fill(&mut arr[..]);
arr
}

fn start_mining_threads(
have_new_job: Receiver<()>,
miner: Arc<Mutex<Miner>>,
share_send: Sender<(u32, u32, u32, u32)>,
) {
tokio::task::spawn(async move {
let mut killers: Vec<Arc<AtomicBool>> = vec![];
loop {
let available_parallelism = u32::max(
2,
std::thread::available_parallelism().unwrap().get() as u32,
);
let p = available_parallelism - 1;
let unit = u32::MAX / p;
while have_new_job.recv().await.is_ok() {
while let Some(killer) = killers.pop() {
killer.store(true, Ordering::Relaxed);
}
let miner = miner.safe_lock(|m| m.clone()).unwrap();
for i in 0..p {
let mut miner = miner.clone();
let share_send = share_send.clone();
let killer = Arc::new(AtomicBool::new(false));
miner.header.as_mut().map(|h| h.nonce = i * unit);
killers.push(killer.clone());
std::thread::spawn(move || {
mine(miner, share_send, killer);
});
}
}
}
});
}

fn mine(mut miner: Miner, share_send: Sender<(u32, u32, u32, u32)>, kill: Arc<AtomicBool>) {
if miner.handicap != 0 {
loop {
if kill.load(Ordering::Relaxed) {
break;
}
std::thread::sleep(std::time::Duration::from_micros(miner.handicap.into()));
if miner.next_share().is_valid() {
let nonce = miner.header.unwrap().nonce;
let time = miner.header.unwrap().time;
let job_id = miner.job_id.unwrap();
let version = miner.version;
share_send
.try_send((nonce, job_id, version.unwrap(), time))
.unwrap();
}
miner.header.as_mut().map(|h| h.nonce += 1);
}
} else {
loop {
if miner.next_share().is_valid() {
if kill.load(Ordering::Relaxed) {
break;
}
let nonce = miner.header.unwrap().nonce;
let time = miner.header.unwrap().time;
let job_id = miner.job_id.unwrap();
let version = miner.version;
share_send
.try_send((nonce, job_id, version.unwrap(), time))
.unwrap();
}
miner.header.as_mut().map(|h| h.nonce += 1);
}
}
}

0 comments on commit 04f6df8

Please sign in to comment.