Skip to content

Commit

Permalink
Add multi thread capabilities to cpu miner
Browse files Browse the repository at this point in the history
  • Loading branch information
fi3 committed Jun 19, 2024
1 parent 2efa74b commit 3f2a191
Showing 1 changed file with 90 additions and 20 deletions.
110 changes: 90 additions & 20 deletions roles/test-utils/mining-device/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
#![allow(clippy::option_map_unit_fn)]
use key_utils::Secp256k1PublicKey;
use network_helpers_sv2::noise_connection_tokio::Connection;
use roles_logic_sv2::utils::Id;
use std::{
net::{SocketAddr, ToSocketAddrs},
sync::Arc,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
thread::sleep,
time::Duration,
};
Expand Down Expand Up @@ -232,13 +236,15 @@ pub struct Device {
jobs: Vec<NewMiningJob<'static>>,
prev_hash: Option<SetNewPrevHash<'static>>,
sequence_numbers: Id,
notify_changes_to_mining_thread: (bool, Sender<()>),
}

fn open_channel(device_id: Option<String>) -> OpenStandardMiningChannel<'static> {
let user_identity = device_id.unwrap_or_default().try_into().unwrap();
let id: u32 = 10;
info!("Measuring CPU hashrate");
let nominal_hash_rate = measure_hashrate(5) as f32;
let p = std::thread::available_parallelism().unwrap().get() as u32 - 3;
let nominal_hash_rate = measure_hashrate(5) as f32 * p as f32;
info!("Pc hashrate is {}", nominal_hash_rate);
info!("MINING DEVICE: send open channel with request id {}", id);
OpenStandardMiningChannel {
Expand Down Expand Up @@ -269,6 +275,7 @@ impl Device {
.await;
info!("Pool sv2 connection established at {}", addr);
let miner = Arc::new(Mutex::new(Miner::new(handicap)));
let (notify_changes_to_mining_thread, update_miners) = async_channel::unbounded();
let self_ = Self {
channel_opened: false,
receiver: receiver.clone(),
Expand All @@ -278,6 +285,7 @@ impl Device {
prev_hash: None,
channel_id: None,
sequence_numbers: Id::new(),
notify_changes_to_mining_thread: (true, notify_changes_to_mining_thread),
};
let open_channel =
MiningDeviceMessages::Mining(Mining::OpenStandardMiningChannel(open_channel(user_id)));
Expand All @@ -288,23 +296,7 @@ impl Device {

let (share_send, share_recv) = async_channel::unbounded();

let handicap = miner.safe_lock(|m| m.handicap).unwrap();
std::thread::spawn(move || loop {
std::thread::sleep(std::time::Duration::from_micros(handicap.into()));
if miner.safe_lock(|m| m.next_share()).unwrap().is_valid() {
let nonce = miner.safe_lock(|m| m.header.unwrap().nonce).unwrap();
let time = miner.safe_lock(|m| m.header.unwrap().time).unwrap();
let job_id = miner.safe_lock(|m| m.job_id).unwrap();
let version = miner.safe_lock(|m| m.version).unwrap();
share_send
.try_send((nonce, job_id.unwrap(), version.unwrap(), time))
.unwrap();
}
miner
.safe_lock(|m| m.header.as_mut().map(|h| h.nonce += 1))
.unwrap();
});

start_mining_threads(update_miners, miner, share_send);
tokio::task::spawn(async move {
let recv = share_recv.clone();
loop {
Expand All @@ -324,6 +316,13 @@ impl Device {
MiningRoutingLogic::None,
)
.unwrap();
let mut notify_changes_to_mining_thread = self_mutex
.safe_lock(|s| s.notify_changes_to_mining_thread.clone())
.unwrap();
if notify_changes_to_mining_thread.0 {
notify_changes_to_mining_thread.1.send(()).await.unwrap();
notify_changes_to_mining_thread.0 = false;
};
match next {
SendTo::RelayNewMessageToRemote(_, m) => {
let sv2_frame: StdFrame = MiningDeviceMessages::Mining(m).try_into().unwrap();
Expand Down Expand Up @@ -427,6 +426,7 @@ impl ParseUpstreamMiningMessages<(), NullDownstreamMiningSelector, NoRouting> fo
self.miner
.safe_lock(|miner| miner.new_target(m.target.to_vec()))
.unwrap();
self.notify_changes_to_mining_thread.0 = true;
Ok(SendTo::None(None))
}

Expand Down Expand Up @@ -479,6 +479,7 @@ impl ParseUpstreamMiningMessages<(), NullDownstreamMiningSelector, NoRouting> fo
.safe_lock(|miner| miner.new_header(p_h, &m))
.unwrap();
self.jobs = vec![m.as_static()];
self.notify_changes_to_mining_thread.0 = true;
}
(true, _) => self.jobs.push(m.as_static()),
(false, None) => {
Expand Down Expand Up @@ -511,6 +512,7 @@ impl ParseUpstreamMiningMessages<(), NullDownstreamMiningSelector, NoRouting> fo
.unwrap();
self.jobs = vec![jobs[0].clone()];
self.prev_hash = Some(m.as_static());
self.notify_changes_to_mining_thread.0 = true;
}
_ => panic!(),
}
Expand All @@ -535,6 +537,7 @@ impl ParseUpstreamMiningMessages<(), NullDownstreamMiningSelector, NoRouting> fo
self.miner
.safe_lock(|miner| miner.new_target(m.maximum_target.to_vec()))
.unwrap();
self.notify_changes_to_mining_thread.0 = true;
Ok(SendTo::None(None))
}

Expand All @@ -543,7 +546,7 @@ impl ParseUpstreamMiningMessages<(), NullDownstreamMiningSelector, NoRouting> fo
}
}

#[derive(Debug)]
#[derive(Debug, Clone)]
struct Miner {
header: Option<BlockHeader>,
target: Option<Uint256>,
Expand Down Expand Up @@ -671,3 +674,70 @@ fn generate_random_32_byte_array() -> [u8; 32] {
rng.fill(&mut arr[..]);
arr
}

fn start_mining_threads(
have_new_job: Receiver<()>,
miner: Arc<Mutex<Miner>>,
share_send: Sender<(u32, u32, u32, u32)>,
) {
tokio::task::spawn(async move {
let mut killers: Vec<Arc<AtomicBool>> = vec![];
loop {
let p = std::thread::available_parallelism().unwrap().get() as u32 - 1;
let unit = u32::MAX / p;
while have_new_job.recv().await.is_ok() {
while let Some(killer) = killers.pop() {
killer.store(true, Ordering::Relaxed);
}
let miner = miner.safe_lock(|m| m.clone()).unwrap();
for i in 0..p {
let mut miner = miner.clone();
let share_send = share_send.clone();
let killer = Arc::new(AtomicBool::new(false));
miner.header.as_mut().map(|h| h.nonce = i * unit);
killers.push(killer.clone());
std::thread::spawn(move || {
mine(miner, share_send, killer);
});
}
}
}
});
}

fn mine(mut miner: Miner, share_send: Sender<(u32, u32, u32, u32)>, kill: Arc<AtomicBool>) {
if miner.handicap == 0 {
loop {
if kill.load(Ordering::Relaxed) {
break;
}
std::thread::sleep(std::time::Duration::from_micros(miner.handicap.into()));
if miner.next_share().is_valid() {
let nonce = miner.header.unwrap().nonce;
let time = miner.header.unwrap().time;
let job_id = miner.job_id.unwrap();
let version = miner.version;
share_send
.try_send((nonce, job_id, version.unwrap(), time))
.unwrap();
}
miner.header.as_mut().map(|h| h.nonce += 1);
}
} else {
loop {
if miner.next_share().is_valid() {
if kill.load(Ordering::Relaxed) {
break;
}
let nonce = miner.header.unwrap().nonce;
let time = miner.header.unwrap().time;
let job_id = miner.job_id.unwrap();
let version = miner.version;
share_send
.try_send((nonce, job_id, version.unwrap(), time))
.unwrap();
}
miner.header.as_mut().map(|h| h.nonce += 1);
}
}
}

0 comments on commit 3f2a191

Please sign in to comment.