Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add parallelism to cpu miner #983

Merged
merged 4 commits into from
Jul 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions roles/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions roles/test-utils/mining-device/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@ codec_sv2 = { version = "^1.0.1", path = "../../../protocols/v2/codec-sv2", feat
roles_logic_sv2 = { version = "1.0.0", path = "../../../protocols/v2/roles-logic-sv2" }
const_sv2 = { version = "1.0.0", path = "../../../protocols/v2/const-sv2" }
async-channel = "1.5.1"
async-std={version = "1.8.0", features = ["attributes"]}
binary_sv2 = { version = "1.0.0", path = "../../../protocols/v2/binary-sv2/binary-sv2" }
network_helpers_sv2 = { version = "2.0.0", path = "../../roles-utils/network-helpers", features=["async_std"] }
network_helpers_sv2 = { version = "2.0.0", path = "../../roles-utils/network-helpers", features=["tokio"] }
buffer_sv2 = { version = "1.0.0", path = "../../../utils/buffer"}
async-recursion = "0.3.2"
rand = "0.8.4"
Expand All @@ -24,3 +23,4 @@ clap = { version = "^4.5.4", features = ["derive"] }
tracing = { version = "0.1" }
tracing-subscriber = "0.3"
sha2 = "0.10.6"
tokio = "^1.38.0"
21 changes: 21 additions & 0 deletions roles/test-utils/mining-device/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# CPU Sv2 mining device

Header only sv2 cpu miner.

```
Usage: mining-device [OPTIONS] --address-pool <ADDRESS_POOL>

Options:
-p, --pubkey-pool <PUBKEY_POOL> Pool pub key, when left empty the pool certificate is not checked
-i, --id-device <ID_DEVICE> Sometimes used by the pool to identify the device
-a, --address-pool <ADDRESS_POOL> Address of the pool in this format ip:port or domain:port
--handicap <HANDICAP> This value is used to slow down the cpu miner, it represents the number of micro-seconds that are awaited between hashes [default: 0]
--id-user <ID_USER> User id, used when a new channel is opened, it can be used by the pool to identify the miner
-h, --help Print help
-V, --version Print version
```

Usage example:
```
cargo run --release -- --address-pool 127.0.0.1:20000 --id-device device_id::SOLO::bc1qxy2kgdygjrsqtzq2n0yrf2493p83kkfjhx0wlh
Fi3 marked this conversation as resolved.
Show resolved Hide resolved
```
156 changes: 124 additions & 32 deletions roles/test-utils/mining-device/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
use async_std::net::TcpStream;
#![allow(clippy::option_map_unit_fn)]
use key_utils::Secp256k1PublicKey;
use network_helpers_sv2::Connection;
use network_helpers_sv2::noise_connection_tokio::Connection;
use roles_logic_sv2::utils::Id;
use std::{net::SocketAddr, sync::Arc, thread::sleep, time::Duration};
use std::{
net::{SocketAddr, ToSocketAddrs},
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::Duration,
};
use tokio::net::TcpStream;

use async_std::net::ToSocketAddrs;
use clap::Parser;
use rand::{thread_rng, Rng};
use std::time::Instant;
Expand Down Expand Up @@ -36,7 +43,7 @@ struct Args {
address_pool: String,
#[arg(
long,
help = "This value is used to slow down the cpu miner, it rapresents the number of micro-seconds that are awaited between hashes",
help = "This value is used to slow down the cpu miner, it represents the number of micro-seconds that are awaited between hashes",
default_value = "0"
)]
handicap: u32,
Expand All @@ -57,14 +64,12 @@ async fn connect(
let address = address
.clone()
.to_socket_addrs()
.await
.expect("Invalid pool address, use one of this formats: ip:port, domain:port")
.next()
.expect("Invalid pool address, use one of this formats: ip:port, domain:port");
info!("Connecting to pool at {}", address);
let socket = loop {
let pool =
async_std::future::timeout(Duration::from_secs(5), TcpStream::connect(address)).await;
let pool = tokio::time::timeout(Duration::from_secs(5), TcpStream::connect(address)).await;
match pool {
Ok(result) => match result {
Ok(socket) => break socket,
Expand All @@ -73,7 +78,7 @@ async fn connect(
"Failed to connect to Upstream role at {}, retrying in 5s: {}",
address, e
);
sleep(Duration::from_secs(5));
tokio::time::sleep(Duration::from_secs(5)).await;
}
},
Err(_) => {
Expand All @@ -85,15 +90,15 @@ async fn connect(
info!("Pool tcp connection established at {}", address);
let address = socket.peer_addr().unwrap();
let initiator = Initiator::new(pub_key.map(|e| e.0));
let (receiver, sender): (Receiver<EitherFrame>, Sender<EitherFrame>) =
Connection::new(socket, codec_sv2::HandshakeRole::Initiator(initiator), 10)
let (receiver, sender, _, _): (Receiver<EitherFrame>, Sender<EitherFrame>, _, _) =
Connection::new(socket, codec_sv2::HandshakeRole::Initiator(initiator))
.await
.unwrap();
info!("Pool noise connection established at {}", address);
Device::start(receiver, sender, address, device_id, user_id, handicap).await
}

#[async_std::main]
#[tokio::main(flavor = "current_thread")]
async fn main() {
let args = Args::parse();
tracing_subscriber::fmt::init();
Expand Down Expand Up @@ -218,6 +223,12 @@ impl ParseUpstreamCommonMessages<NoRouting> for SetupConnectionHandler {
}
}

#[derive(Debug, Clone)]
struct NewWorkNotifier {
should_send: bool,
sender: Sender<()>,
}

#[derive(Debug)]
pub struct Device {
#[allow(dead_code)]
Expand All @@ -230,13 +241,15 @@ pub struct Device {
jobs: Vec<NewMiningJob<'static>>,
prev_hash: Option<SetNewPrevHash<'static>>,
sequence_numbers: Id,
notify_changes_to_mining_thread: NewWorkNotifier,
}

fn open_channel(device_id: Option<String>) -> OpenStandardMiningChannel<'static> {
let user_identity = device_id.unwrap_or_default().try_into().unwrap();
let id: u32 = 10;
info!("Measuring CPU hashrate");
let nominal_hash_rate = measure_hashrate(5) as f32;
let p = std::thread::available_parallelism().unwrap().get() as u32 - 3;
let nominal_hash_rate = measure_hashrate(5) as f32 * p as f32;
info!("Pc hashrate is {}", nominal_hash_rate);
info!("MINING DEVICE: send open channel with request id {}", id);
OpenStandardMiningChannel {
Expand Down Expand Up @@ -267,6 +280,7 @@ impl Device {
.await;
info!("Pool sv2 connection established at {}", addr);
let miner = Arc::new(Mutex::new(Miner::new(handicap)));
let (notify_changes_to_mining_thread, update_miners) = async_channel::unbounded();
let self_ = Self {
channel_opened: false,
receiver: receiver.clone(),
Expand All @@ -276,6 +290,10 @@ impl Device {
prev_hash: None,
channel_id: None,
sequence_numbers: Id::new(),
notify_changes_to_mining_thread: NewWorkNotifier {
should_send: true,
sender: notify_changes_to_mining_thread,
},
};
let open_channel =
MiningDeviceMessages::Mining(Mining::OpenStandardMiningChannel(open_channel(user_id)));
Expand All @@ -286,24 +304,8 @@ impl Device {

let (share_send, share_recv) = async_channel::unbounded();

let handicap = miner.safe_lock(|m| m.handicap).unwrap();
std::thread::spawn(move || loop {
std::thread::sleep(std::time::Duration::from_micros(handicap.into()));
if miner.safe_lock(|m| m.next_share()).unwrap().is_valid() {
let nonce = miner.safe_lock(|m| m.header.unwrap().nonce).unwrap();
let time = miner.safe_lock(|m| m.header.unwrap().time).unwrap();
let job_id = miner.safe_lock(|m| m.job_id).unwrap();
let version = miner.safe_lock(|m| m.version).unwrap();
share_send
.try_send((nonce, job_id.unwrap(), version.unwrap(), time))
.unwrap();
}
miner
.safe_lock(|m| m.header.as_mut().map(|h| h.nonce += 1))
.unwrap();
});

async_std::task::spawn(async move {
start_mining_threads(update_miners, miner, share_send);
tokio::task::spawn(async move {
Fi3 marked this conversation as resolved.
Show resolved Hide resolved
let recv = share_recv.clone();
loop {
let (nonce, job_id, version, ntime) = recv.recv().await.unwrap();
Expand All @@ -322,6 +324,21 @@ impl Device {
MiningRoutingLogic::None,
)
.unwrap();
let mut notify_changes_to_mining_thread = self_mutex
.safe_lock(|s| s.notify_changes_to_mining_thread.clone())
.unwrap();
if notify_changes_to_mining_thread.should_send
&& (message_type == const_sv2::MESSAGE_TYPE_NEW_MINING_JOB
|| message_type == const_sv2::MESSAGE_TYPE_SET_NEW_PREV_HASH
|| message_type == const_sv2::MESSAGE_TYPE_SET_TARGET)
{
notify_changes_to_mining_thread
.sender
.send(())
.await
.unwrap();
notify_changes_to_mining_thread.should_send = false;
};
match next {
SendTo::RelayNewMessageToRemote(_, m) => {
let sv2_frame: StdFrame = MiningDeviceMessages::Mining(m).try_into().unwrap();
Expand Down Expand Up @@ -425,6 +442,7 @@ impl ParseUpstreamMiningMessages<(), NullDownstreamMiningSelector, NoRouting> fo
self.miner
.safe_lock(|miner| miner.new_target(m.target.to_vec()))
.unwrap();
self.notify_changes_to_mining_thread.should_send = true;
Ok(SendTo::None(None))
}

Expand Down Expand Up @@ -477,6 +495,7 @@ impl ParseUpstreamMiningMessages<(), NullDownstreamMiningSelector, NoRouting> fo
.safe_lock(|miner| miner.new_header(p_h, &m))
.unwrap();
self.jobs = vec![m.as_static()];
self.notify_changes_to_mining_thread.should_send = true;
}
(true, _) => self.jobs.push(m.as_static()),
(false, None) => {
Expand Down Expand Up @@ -509,6 +528,7 @@ impl ParseUpstreamMiningMessages<(), NullDownstreamMiningSelector, NoRouting> fo
.unwrap();
self.jobs = vec![jobs[0].clone()];
self.prev_hash = Some(m.as_static());
self.notify_changes_to_mining_thread.should_send = true;
}
_ => panic!(),
}
Expand All @@ -533,6 +553,7 @@ impl ParseUpstreamMiningMessages<(), NullDownstreamMiningSelector, NoRouting> fo
self.miner
.safe_lock(|miner| miner.new_target(m.maximum_target.to_vec()))
.unwrap();
self.notify_changes_to_mining_thread.should_send = true;
Ok(SendTo::None(None))
}

Expand All @@ -541,7 +562,7 @@ impl ParseUpstreamMiningMessages<(), NullDownstreamMiningSelector, NoRouting> fo
}
}

#[derive(Debug)]
#[derive(Debug, Clone)]
struct Miner {
header: Option<BlockHeader>,
target: Option<Uint256>,
Expand Down Expand Up @@ -669,3 +690,74 @@ fn generate_random_32_byte_array() -> [u8; 32] {
rng.fill(&mut arr[..]);
arr
}

fn start_mining_threads(
have_new_job: Receiver<()>,
miner: Arc<Mutex<Miner>>,
share_send: Sender<(u32, u32, u32, u32)>,
) {
tokio::task::spawn(async move {
let mut killers: Vec<Arc<AtomicBool>> = vec![];
loop {
let available_parallelism = u32::max(
Fi3 marked this conversation as resolved.
Show resolved Hide resolved
2,
std::thread::available_parallelism().unwrap().get() as u32,
);
let p = available_parallelism - 1;
let unit = u32::MAX / p;
while have_new_job.recv().await.is_ok() {
while let Some(killer) = killers.pop() {
killer.store(true, Ordering::Relaxed);
}
let miner = miner.safe_lock(|m| m.clone()).unwrap();
for i in 0..p {
let mut miner = miner.clone();
let share_send = share_send.clone();
let killer = Arc::new(AtomicBool::new(false));
miner.header.as_mut().map(|h| h.nonce = i * unit);
killers.push(killer.clone());
std::thread::spawn(move || {
mine(miner, share_send, killer);
});
}
}
}
});
}

fn mine(mut miner: Miner, share_send: Sender<(u32, u32, u32, u32)>, kill: Arc<AtomicBool>) {
if miner.handicap != 0 {
loop {
if kill.load(Ordering::Relaxed) {
break;
}
std::thread::sleep(std::time::Duration::from_micros(miner.handicap.into()));
if miner.next_share().is_valid() {
let nonce = miner.header.unwrap().nonce;
let time = miner.header.unwrap().time;
let job_id = miner.job_id.unwrap();
let version = miner.version;
share_send
.try_send((nonce, job_id, version.unwrap(), time))
.unwrap();
}
miner.header.as_mut().map(|h| h.nonce += 1);
}
plebhash marked this conversation as resolved.
Show resolved Hide resolved
} else {
loop {
if miner.next_share().is_valid() {
if kill.load(Ordering::Relaxed) {
break;
}
let nonce = miner.header.unwrap().nonce;
let time = miner.header.unwrap().time;
let job_id = miner.job_id.unwrap();
let version = miner.version;
share_send
.try_send((nonce, job_id, version.unwrap(), time))
.unwrap();
}
miner.header.as_mut().map(|h| h.nonce += 1);
}
}
}
Loading