diff --git a/roles/jd-client/src/lib/mod.rs b/roles/jd-client/src/lib/mod.rs index 455e901cfd..7e2fe7edc8 100644 --- a/roles/jd-client/src/lib/mod.rs +++ b/roles/jd-client/src/lib/mod.rs @@ -8,6 +8,22 @@ pub mod upstream_sv2; use std::{sync::atomic::AtomicBool, time::Duration}; +use job_declarator::JobDeclarator; +use proxy_config::ProxyConfig; +use template_receiver::TemplateRx; + +use async_channel::{bounded, unbounded}; +use futures::{select, FutureExt}; +use roles_logic_sv2::utils::Mutex; +use std::{ + net::{IpAddr, SocketAddr}, + str::FromStr, + sync::Arc, +}; +use tokio::task::AbortHandle; + +use tracing::{error, info}; + /// Is used by the template receiver and the downstream. When a NewTemplate is received the context /// that is running the template receiver set this value to false and then the message is sent to /// the context that is running the Downstream that do something and then set it back to true. @@ -31,6 +47,307 @@ use std::{sync::atomic::AtomicBool, time::Duration}; /// between all the contexts is not necessary. pub static IS_NEW_TEMPLATE_HANDLED: AtomicBool = AtomicBool::new(true); +/// Job Declarator Client (or JDC) is the role which is Miner-side, in charge of creating new +/// mining jobs from the templates received by the Template Provider to which it is connected. It +/// declares custom jobs to the JDS, in order to start working on them. +/// JDC is also responsible for putting in action the Pool-fallback mechanism, automatically +/// switching to backup Pools in case of declared custom jobs refused by JDS (which is Pool side). +/// As a solution of last-resort, it is able to switch to Solo Mining until new safe Pools appear +/// in the market. +pub struct JobDeclaratorClient { + /// Configuration of the proxy server [`JobDeclaratorClient`] is connected to. + config: ProxyConfig, +} + +impl JobDeclaratorClient { + pub fn new(config: ProxyConfig) -> Self { + Self { config } + } + + pub async fn start(self) { + let mut upstream_index = 0; + let mut interrupt_signal_future = Box::pin(tokio::signal::ctrl_c().fuse()); + + // Channel used to manage failed tasks + let (tx_status, rx_status) = unbounded(); + + let task_collector = Arc::new(Mutex::new(vec![])); + + let proxy_config = &self.config; + + loop { + let task_collector = task_collector.clone(); + let tx_status = tx_status.clone(); + if let Some(upstream) = proxy_config.upstreams.get(upstream_index) { + self.initialize_jd(tx_status.clone(), task_collector.clone(), upstream.clone()) + .await; + } else { + self.initialize_jd_as_solo_miner(tx_status.clone(), task_collector.clone()) + .await; + } + // Check all tasks if is_finished() is true, if so exit + loop { + let task_status = select! { + task_status = rx_status.recv().fuse() => task_status, + interrupt_signal = interrupt_signal_future => { + match interrupt_signal { + Ok(()) => { + info!("Interrupt received"); + }, + Err(err) => { + error!("Unable to listen for interrupt signal: {}", err); + // we also shut down in case of error + }, + } + std::process::exit(0); + } + }; + let task_status: status::Status = task_status.unwrap(); + + match task_status.state { + // Should only be sent by the downstream listener + status::State::DownstreamShutdown(err) => { + error!("SHUTDOWN from: {}", err); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + task_collector + .safe_lock(|s| { + for handle in s { + handle.abort(); + } + }) + .unwrap(); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + break; + } + status::State::UpstreamShutdown(err) => { + error!("SHUTDOWN from: {}", err); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + task_collector + .safe_lock(|s| { + for handle in s { + handle.abort(); + } + }) + .unwrap(); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + break; + } + status::State::UpstreamRogue => { + error!("Changin Pool"); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + task_collector + .safe_lock(|s| { + for handle in s { + handle.abort(); + } + }) + .unwrap(); + upstream_index += 1; + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + break; + } + status::State::Healthy(msg) => { + info!("HEALTHY message: {}", msg); + } + } + } + } + } + + async fn initialize_jd_as_solo_miner( + &self, + tx_status: async_channel::Sender>, + task_collector: Arc>>, + ) { + let proxy_config = &self.config; + let timeout = proxy_config.timeout; + let miner_tx_out = proxy_config::get_coinbase_output(proxy_config).unwrap(); + + // When Downstream receive a share that meets bitcoin target it transformit in a + // SubmitSolution and send it to the TemplateReceiver + let (send_solution, recv_solution) = bounded(10); + + // Format `Downstream` connection address + let downstream_addr = SocketAddr::new( + IpAddr::from_str(&proxy_config.downstream_address).unwrap(), + proxy_config.downstream_port, + ); + + // Wait for downstream to connect + let downstream = downstream::listen_for_downstream_mining( + downstream_addr, + None, + send_solution, + proxy_config.withhold, + proxy_config.authority_public_key, + proxy_config.authority_secret_key, + proxy_config.cert_validity_sec, + task_collector.clone(), + status::Sender::Downstream(tx_status.clone()), + miner_tx_out.clone(), + None, + ) + .await + .unwrap(); + + // Initialize JD part + let mut parts = proxy_config.tp_address.split(':'); + let ip_tp = parts.next().unwrap().to_string(); + let port_tp = parts.next().unwrap().parse::().unwrap(); + + TemplateRx::connect( + SocketAddr::new(IpAddr::from_str(ip_tp.as_str()).unwrap(), port_tp), + recv_solution, + status::Sender::TemplateReceiver(tx_status.clone()), + None, + downstream, + task_collector, + Arc::new(Mutex::new(PoolChangerTrigger::new(timeout))), + miner_tx_out.clone(), + proxy_config.tp_authority_public_key, + false, + ) + .await; + } + + async fn initialize_jd( + &self, + tx_status: async_channel::Sender>, + task_collector: Arc>>, + upstream_config: proxy_config::Upstream, + ) { + let proxy_config = &self.config; + let timeout = proxy_config.timeout; + let test_only_do_not_send_solution_to_tp = proxy_config + .test_only_do_not_send_solution_to_tp + .unwrap_or(false); + + // Format `Upstream` connection address + let mut parts = upstream_config.pool_address.split(':'); + let address = parts + .next() + .unwrap_or_else(|| panic!("Invalid pool address {}", upstream_config.pool_address)); + let port = parts + .next() + .and_then(|p| p.parse::().ok()) + .unwrap_or_else(|| panic!("Invalid pool address {}", upstream_config.pool_address)); + let upstream_addr = SocketAddr::new( + IpAddr::from_str(address).unwrap_or_else(|_| { + panic!("Invalid pool address {}", upstream_config.pool_address) + }), + port, + ); + + // When Downstream receive a share that meets bitcoin target it transformit in a + // SubmitSolution and send it to the TemplateReceiver + let (send_solution, recv_solution) = bounded(10); + + // Instantiate a new `Upstream` (SV2 Pool) + let upstream = match upstream_sv2::Upstream::new( + upstream_addr, + upstream_config.authority_pubkey, + 0, // TODO + upstream_config.pool_signature.clone(), + status::Sender::Upstream(tx_status.clone()), + task_collector.clone(), + Arc::new(Mutex::new(PoolChangerTrigger::new(timeout))), + ) + .await + { + Ok(upstream) => upstream, + Err(e) => { + error!("Failed to create upstream: {}", e); + panic!() + } + }; + + // Start receiving messages from the SV2 Upstream role + if let Err(e) = upstream_sv2::Upstream::parse_incoming(upstream.clone()) { + error!("failed to create sv2 parser: {}", e); + panic!() + } + + match upstream_sv2::Upstream::setup_connection( + upstream.clone(), + proxy_config.min_supported_version, + proxy_config.max_supported_version, + ) + .await + { + Ok(_) => info!("Connected to Upstream!"), + Err(e) => { + error!("Failed to connect to Upstream EXITING! : {}", e); + panic!() + } + } + + // Format `Downstream` connection address + let downstream_addr = SocketAddr::new( + IpAddr::from_str(&proxy_config.downstream_address).unwrap(), + proxy_config.downstream_port, + ); + + // Initialize JD part + let mut parts = proxy_config.tp_address.split(':'); + let ip_tp = parts.next().unwrap().to_string(); + let port_tp = parts.next().unwrap().parse::().unwrap(); + + let mut parts = upstream_config.jd_address.split(':'); + let ip_jd = parts.next().unwrap().to_string(); + let port_jd = parts.next().unwrap().parse::().unwrap(); + let jd = match JobDeclarator::new( + SocketAddr::new(IpAddr::from_str(ip_jd.as_str()).unwrap(), port_jd), + upstream_config.authority_pubkey.into_bytes(), + proxy_config.clone(), + upstream.clone(), + task_collector.clone(), + ) + .await + { + Ok(c) => c, + Err(e) => { + let _ = tx_status + .send(status::Status { + state: status::State::UpstreamShutdown(e), + }) + .await; + return; + } + }; + + // Wait for downstream to connect + let downstream = downstream::listen_for_downstream_mining( + downstream_addr, + Some(upstream), + send_solution, + proxy_config.withhold, + proxy_config.authority_public_key, + proxy_config.authority_secret_key, + proxy_config.cert_validity_sec, + task_collector.clone(), + status::Sender::Downstream(tx_status.clone()), + vec![], + Some(jd.clone()), + ) + .await + .unwrap(); + + TemplateRx::connect( + SocketAddr::new(IpAddr::from_str(ip_tp.as_str()).unwrap(), port_tp), + recv_solution, + status::Sender::TemplateReceiver(tx_status.clone()), + Some(jd.clone()), + downstream, + task_collector, + Arc::new(Mutex::new(PoolChangerTrigger::new(timeout))), + vec![], + proxy_config.tp_authority_public_key, + test_only_do_not_send_solution_to_tp, + ) + .await; + } +} + #[derive(Debug)] pub struct PoolChangerTrigger { timeout: Duration, diff --git a/roles/jd-client/src/main.rs b/roles/jd-client/src/main.rs index 27f05f361d..cbc3fbb61b 100644 --- a/roles/jd-client/src/main.rs +++ b/roles/jd-client/src/main.rs @@ -1,30 +1,15 @@ #![allow(special_module_name)] - mod args; mod lib; use lib::{ error::{Error, ProxyResult}, - job_declarator::JobDeclarator, proxy_config::ProxyConfig, - status, - template_receiver::TemplateRx, - PoolChangerTrigger, + status, JobDeclaratorClient, }; use args::Args; -use async_channel::{bounded, unbounded}; -use futures::{select, FutureExt}; -use roles_logic_sv2::utils::Mutex; -use std::{ - net::{IpAddr, SocketAddr}, - str::FromStr, - sync::Arc, - time::Duration, -}; -use tokio::task::AbortHandle; - -use tracing::{error, info}; +use tracing::error; /// Process CLI args, if any. #[allow(clippy::result_large_err)] @@ -96,305 +81,14 @@ fn process_cli_args<'a>() -> ProxyResult<'a, ProxyConfig> { #[tokio::main] async fn main() { tracing_subscriber::fmt::init(); - - let mut upstream_index = 0; - let mut interrupt_signal_future = Box::pin(tokio::signal::ctrl_c().fuse()); - - // Channel used to manage failed tasks - let (tx_status, rx_status) = unbounded(); - - let task_collector = Arc::new(Mutex::new(vec![])); - - let proxy_config = match process_cli_args() { - Ok(p) => p, - Err(e) => { - error!("Failed to read config file: {}", e); - return; - } - }; - - loop { - { - let task_collector = task_collector.clone(); - let tx_status = tx_status.clone(); - - if let Some(upstream) = proxy_config.upstreams.get(upstream_index) { - let initialize = initialize_jd( - tx_status.clone(), - task_collector, - upstream.clone(), - proxy_config.timeout, - ); - tokio::task::spawn(initialize); - } else { - let initialize = initialize_jd_as_solo_miner( - tx_status.clone(), - task_collector, - proxy_config.timeout, - ); - tokio::task::spawn(initialize); - } - } - // Check all tasks if is_finished() is true, if so exit - loop { - let task_status = select! { - task_status = rx_status.recv().fuse() => task_status, - interrupt_signal = interrupt_signal_future => { - match interrupt_signal { - Ok(()) => { - info!("Interrupt received"); - }, - Err(err) => { - error!("Unable to listen for interrupt signal: {}", err); - // we also shut down in case of error - }, - } - std::process::exit(0); - } - }; - let task_status: status::Status = task_status.unwrap(); - - match task_status.state { - // Should only be sent by the downstream listener - status::State::DownstreamShutdown(err) => { - error!("SHUTDOWN from: {}", err); - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - task_collector - .safe_lock(|s| { - for handle in s { - handle.abort(); - } - }) - .unwrap(); - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - break; - } - status::State::UpstreamShutdown(err) => { - error!("SHUTDOWN from: {}", err); - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - task_collector - .safe_lock(|s| { - for handle in s { - handle.abort(); - } - }) - .unwrap(); - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - break; - } - status::State::UpstreamRogue => { - error!("Changin Pool"); - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - task_collector - .safe_lock(|s| { - for handle in s { - handle.abort(); - } - }) - .unwrap(); - upstream_index += 1; - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - break; - } - status::State::Healthy(msg) => { - info!("HEALTHY message: {}", msg); - } - } - } - } -} -async fn initialize_jd_as_solo_miner( - tx_status: async_channel::Sender>, - task_collector: Arc>>, - timeout: Duration, -) { let proxy_config = match process_cli_args() { Ok(p) => p, Err(e) => { - error!("Failed to read config file: {}", e); - return; - } - }; - let miner_tx_out = lib::proxy_config::get_coinbase_output(&proxy_config).unwrap(); - - // When Downstream receive a share that meets bitcoin target it transform it in a - // SubmitSolution and send it to the TemplateReceiver - let (send_solution, recv_solution) = bounded(10); - - // Format `Downstream` connection address - let downstream_addr = SocketAddr::new( - IpAddr::from_str(&proxy_config.downstream_address).unwrap(), - proxy_config.downstream_port, - ); - - // Wait for downstream to connect - let downstream = lib::downstream::listen_for_downstream_mining( - downstream_addr, - None, - send_solution, - proxy_config.withhold, - proxy_config.authority_public_key, - proxy_config.authority_secret_key, - proxy_config.cert_validity_sec, - task_collector.clone(), - status::Sender::Downstream(tx_status.clone()), - miner_tx_out.clone(), - None, - ) - .await - .unwrap(); - - // Initialize JD part - let mut parts = proxy_config.tp_address.split(':'); - let ip_tp = parts.next().unwrap().to_string(); - let port_tp = parts.next().unwrap().parse::().unwrap(); - - TemplateRx::connect( - SocketAddr::new(IpAddr::from_str(ip_tp.as_str()).unwrap(), port_tp), - recv_solution, - status::Sender::TemplateReceiver(tx_status.clone()), - None, - downstream, - task_collector, - Arc::new(Mutex::new(PoolChangerTrigger::new(timeout))), - miner_tx_out.clone(), - proxy_config.tp_authority_public_key, - false, - ) - .await; -} - -async fn initialize_jd( - tx_status: async_channel::Sender>, - task_collector: Arc>>, - upstream_config: lib::proxy_config::Upstream, - timeout: Duration, -) { - let proxy_config = process_cli_args().unwrap(); - let test_only_do_not_send_solution_to_tp = proxy_config - .test_only_do_not_send_solution_to_tp - .unwrap_or(false); - - // Format `Upstream` connection address - let mut parts = upstream_config.pool_address.split(':'); - let address = parts - .next() - .unwrap_or_else(|| panic!("Invalid pool address {}", upstream_config.pool_address)); - let port = parts - .next() - .and_then(|p| p.parse::().ok()) - .unwrap_or_else(|| panic!("Invalid pool address {}", upstream_config.pool_address)); - let upstream_addr = SocketAddr::new( - IpAddr::from_str(address) - .unwrap_or_else(|_| panic!("Invalid pool address {}", upstream_config.pool_address)), - port, - ); - - // When Downstream receive a share that meets bitcoin target it transform it in a - // SubmitSolution and send it to the TemplateReceiver - let (send_solution, recv_solution) = bounded(10); - - // Instantiate a new `Upstream` (SV2 Pool) - let upstream = match lib::upstream_sv2::Upstream::new( - upstream_addr, - upstream_config.authority_pubkey, - 0, // TODO - upstream_config.pool_signature.clone(), - status::Sender::Upstream(tx_status.clone()), - task_collector.clone(), - Arc::new(Mutex::new(PoolChangerTrigger::new(timeout))), - ) - .await - { - Ok(upstream) => upstream, - Err(e) => { - error!("Failed to create upstream: {}", e); - panic!() - } - }; - - // Start receiving messages from the SV2 Upstream role - if let Err(e) = lib::upstream_sv2::Upstream::parse_incoming(upstream.clone()) { - error!("failed to create sv2 parser: {}", e); - panic!() - } - - match lib::upstream_sv2::Upstream::setup_connection( - upstream.clone(), - proxy_config.min_supported_version, - proxy_config.max_supported_version, - ) - .await - { - Ok(_) => info!("Connected to Upstream!"), - Err(e) => { - error!("Failed to connect to Upstream EXITING! : {}", e); - panic!() - } - } - - // Format `Downstream` connection address - let downstream_addr = SocketAddr::new( - IpAddr::from_str(&proxy_config.downstream_address).unwrap(), - proxy_config.downstream_port, - ); - - // Initialize JD part - let mut parts = proxy_config.tp_address.split(':'); - let ip_tp = parts.next().unwrap().to_string(); - let port_tp = parts.next().unwrap().parse::().unwrap(); - - let mut parts = upstream_config.jd_address.split(':'); - let ip_jd = parts.next().unwrap().to_string(); - let port_jd = parts.next().unwrap().parse::().unwrap(); - let jd = match JobDeclarator::new( - SocketAddr::new(IpAddr::from_str(ip_jd.as_str()).unwrap(), port_jd), - upstream_config.authority_pubkey.into_bytes(), - proxy_config.clone(), - upstream.clone(), - task_collector.clone(), - ) - .await - { - Ok(c) => c, - Err(e) => { - let _ = tx_status - .send(status::Status { - state: status::State::UpstreamShutdown(e), - }) - .await; + error!("Job Declarator Client Config error: {}", e); return; } }; - // Wait for downstream to connect - let downstream = lib::downstream::listen_for_downstream_mining( - downstream_addr, - Some(upstream), - send_solution, - proxy_config.withhold, - proxy_config.authority_public_key, - proxy_config.authority_secret_key, - proxy_config.cert_validity_sec, - task_collector.clone(), - status::Sender::Downstream(tx_status.clone()), - vec![], - Some(jd.clone()), - ) - .await - .unwrap(); - - TemplateRx::connect( - SocketAddr::new(IpAddr::from_str(ip_tp.as_str()).unwrap(), port_tp), - recv_solution, - status::Sender::TemplateReceiver(tx_status.clone()), - Some(jd.clone()), - downstream, - task_collector, - Arc::new(Mutex::new(PoolChangerTrigger::new(timeout))), - vec![], - proxy_config.tp_authority_public_key, - test_only_do_not_send_solution_to_tp, - ) - .await; + let jdc = JobDeclaratorClient::new(proxy_config); + jdc.start().await; }