From 6bb7eb03eb2558687839395f2ed2ebd6921072b2 Mon Sep 17 00:00:00 2001 From: bit-aloo Date: Mon, 23 Dec 2024 22:16:57 +0530 Subject: [PATCH] fix jdc sigterm signalling issue --- roles/jd-client/src/lib/mod.rs | 420 +++++++++--------- .../jds-config-local-example.toml | 2 +- 2 files changed, 218 insertions(+), 204 deletions(-) diff --git a/roles/jd-client/src/lib/mod.rs b/roles/jd-client/src/lib/mod.rs index 467ac52b54..64ce5b0d6c 100644 --- a/roles/jd-client/src/lib/mod.rs +++ b/roles/jd-client/src/lib/mod.rs @@ -20,7 +20,7 @@ use std::{ str::FromStr, sync::Arc, }; -use tokio::task::AbortHandle; +use tokio::{sync::Notify, task::AbortHandle}; use tracing::{error, info}; @@ -57,48 +57,65 @@ pub static IS_NEW_TEMPLATE_HANDLED: AtomicBool = AtomicBool::new(true); pub struct JobDeclaratorClient { /// Configuration of the proxy server [`JobDeclaratorClient`] is connected to. config: ProxyConfig, + shutdown: Arc, } impl JobDeclaratorClient { pub fn new(config: ProxyConfig) -> Self { - Self { config } + Self { + config, + shutdown: Arc::new(Notify::new()), + } } pub async fn start(self) { let mut upstream_index = 0; - let mut interrupt_signal_future = Box::pin(tokio::signal::ctrl_c().fuse()); // Channel used to manage failed tasks let (tx_status, rx_status) = unbounded(); let task_collector = Arc::new(Mutex::new(vec![])); - let proxy_config = &self.config; + tokio::spawn({ + let shutdown_signal = self.shutdown.clone(); + async move { + if tokio::signal::ctrl_c().await.is_ok() { + info!("Interrupt received"); + shutdown_signal.notify_one(); + } + } + }); + let proxy_config = self.config; loop { let task_collector = task_collector.clone(); let tx_status = tx_status.clone(); + let proxy_config = proxy_config.clone(); if let Some(upstream) = proxy_config.upstreams.get(upstream_index) { - self.initialize_jd(tx_status.clone(), task_collector.clone(), upstream.clone()) - .await; + let tx_status = tx_status.clone(); + let task_collector = task_collector.clone(); + let upstream = upstream.clone(); + tokio::spawn(async move { + initialize_jd(proxy_config, tx_status, task_collector, upstream).await; + }); } else { - self.initialize_jd_as_solo_miner(tx_status.clone(), task_collector.clone()) + let tx_status = tx_status.clone(); + let task_collector = task_collector.clone(); + tokio::spawn(async move { + initialize_jd_as_solo_miner( + proxy_config, + tx_status.clone(), + task_collector.clone(), + ) .await; + }); } // Check all tasks if is_finished() is true, if so exit loop { let task_status = select! { task_status = rx_status.recv().fuse() => task_status, - interrupt_signal = interrupt_signal_future => { - match interrupt_signal { - Ok(()) => { - info!("Interrupt received"); - }, - Err(err) => { - error!("Unable to listen for interrupt signal: {}", err); - // we also shut down in case of error - }, - } + _ = self.shutdown.notified().fuse() => { + info!("Shutting down gracefully..."); std::process::exit(0); } }; @@ -153,199 +170,196 @@ impl JobDeclaratorClient { } } } +} - async fn initialize_jd_as_solo_miner( - &self, - tx_status: async_channel::Sender>, - task_collector: Arc>>, - ) { - let proxy_config = &self.config; - let timeout = proxy_config.timeout; - let miner_tx_out = proxy_config::get_coinbase_output(proxy_config).unwrap(); - - // When Downstream receive a share that meets bitcoin target it transformit in a - // SubmitSolution and send it to the TemplateReceiver - let (send_solution, recv_solution) = bounded(10); - - // Format `Downstream` connection address - let downstream_addr = SocketAddr::new( - IpAddr::from_str(&proxy_config.downstream_address).unwrap(), - proxy_config.downstream_port, - ); - - // Wait for downstream to connect - let downstream = downstream::listen_for_downstream_mining( - downstream_addr, - None, - send_solution, - proxy_config.withhold, - proxy_config.authority_public_key, - proxy_config.authority_secret_key, - proxy_config.cert_validity_sec, - task_collector.clone(), - status::Sender::Downstream(tx_status.clone()), - miner_tx_out.clone(), - None, - ) - .await - .unwrap(); - - // Initialize JD part - let mut parts = proxy_config.tp_address.split(':'); - let ip_tp = parts.next().unwrap().to_string(); - let port_tp = parts.next().unwrap().parse::().unwrap(); - - TemplateRx::connect( - SocketAddr::new(IpAddr::from_str(ip_tp.as_str()).unwrap(), port_tp), - recv_solution, - status::Sender::TemplateReceiver(tx_status.clone()), - None, - downstream, - task_collector, - Arc::new(Mutex::new(PoolChangerTrigger::new(timeout))), - miner_tx_out.clone(), - proxy_config.tp_authority_public_key, - false, - ) - .await; - } +async fn initialize_jd_as_solo_miner( + proxy_config: ProxyConfig, + tx_status: async_channel::Sender>, + task_collector: Arc>>, +) { + let timeout = proxy_config.timeout; + let miner_tx_out = proxy_config::get_coinbase_output(&proxy_config).unwrap(); + + // When Downstream receive a share that meets bitcoin target it transformit in a + // SubmitSolution and send it to the TemplateReceiver + let (send_solution, recv_solution) = bounded(10); + + // Format `Downstream` connection address + let downstream_addr = SocketAddr::new( + IpAddr::from_str(&proxy_config.downstream_address).unwrap(), + proxy_config.downstream_port, + ); + + // Wait for downstream to connect + let downstream = downstream::listen_for_downstream_mining( + downstream_addr, + None, + send_solution, + proxy_config.withhold, + proxy_config.authority_public_key, + proxy_config.authority_secret_key, + proxy_config.cert_validity_sec, + task_collector.clone(), + status::Sender::Downstream(tx_status.clone()), + miner_tx_out.clone(), + None, + ) + .await + .unwrap(); + + // Initialize JD part + let mut parts = proxy_config.tp_address.split(':'); + let ip_tp = parts.next().unwrap().to_string(); + let port_tp = parts.next().unwrap().parse::().unwrap(); + + TemplateRx::connect( + SocketAddr::new(IpAddr::from_str(ip_tp.as_str()).unwrap(), port_tp), + recv_solution, + status::Sender::TemplateReceiver(tx_status.clone()), + None, + downstream, + task_collector, + Arc::new(Mutex::new(PoolChangerTrigger::new(timeout))), + miner_tx_out.clone(), + proxy_config.tp_authority_public_key, + false, + ) + .await; +} - async fn initialize_jd( - &self, - tx_status: async_channel::Sender>, - task_collector: Arc>>, - upstream_config: proxy_config::Upstream, - ) { - let proxy_config = &self.config; - let timeout = proxy_config.timeout; - let test_only_do_not_send_solution_to_tp = proxy_config - .test_only_do_not_send_solution_to_tp - .unwrap_or(false); - - // Format `Upstream` connection address - let mut parts = upstream_config.pool_address.split(':'); - let address = parts - .next() - .unwrap_or_else(|| panic!("Invalid pool address {}", upstream_config.pool_address)); - let port = parts - .next() - .and_then(|p| p.parse::().ok()) - .unwrap_or_else(|| panic!("Invalid pool address {}", upstream_config.pool_address)); - let upstream_addr = SocketAddr::new( - IpAddr::from_str(address).unwrap_or_else(|_| { - panic!("Invalid pool address {}", upstream_config.pool_address) - }), - port, - ); - - // When Downstream receive a share that meets bitcoin target it transformit in a - // SubmitSolution and send it to the TemplateReceiver - let (send_solution, recv_solution) = bounded(10); - - // Instantiate a new `Upstream` (SV2 Pool) - let upstream = match upstream_sv2::Upstream::new( - upstream_addr, - upstream_config.authority_pubkey, - 0, // TODO - upstream_config.pool_signature.clone(), - status::Sender::Upstream(tx_status.clone()), - task_collector.clone(), - Arc::new(Mutex::new(PoolChangerTrigger::new(timeout))), - ) - .await - { - Ok(upstream) => upstream, - Err(e) => { - error!("Failed to create upstream: {}", e); - panic!() - } - }; - - match upstream_sv2::Upstream::setup_connection( - upstream.clone(), - proxy_config.min_supported_version, - proxy_config.max_supported_version, - ) - .await - { - Ok(_) => info!("Connected to Upstream!"), - Err(e) => { - error!("Failed to connect to Upstream EXITING! : {}", e); - panic!() - } +async fn initialize_jd( + proxy_config: ProxyConfig, + tx_status: async_channel::Sender>, + task_collector: Arc>>, + upstream_config: proxy_config::Upstream, +) { + let timeout = proxy_config.timeout; + let test_only_do_not_send_solution_to_tp = proxy_config + .test_only_do_not_send_solution_to_tp + .unwrap_or(false); + + // Format `Upstream` connection address + let mut parts = upstream_config.pool_address.split(':'); + let address = parts + .next() + .unwrap_or_else(|| panic!("Invalid pool address {}", upstream_config.pool_address)); + let port = parts + .next() + .and_then(|p| p.parse::().ok()) + .unwrap_or_else(|| panic!("Invalid pool address {}", upstream_config.pool_address)); + let upstream_addr = SocketAddr::new( + IpAddr::from_str(address) + .unwrap_or_else(|_| panic!("Invalid pool address {}", upstream_config.pool_address)), + port, + ); + + // When Downstream receive a share that meets bitcoin target it transformit in a + // SubmitSolution and send it to the TemplateReceiver + let (send_solution, recv_solution) = bounded(10); + + // Instantiate a new `Upstream` (SV2 Pool) + let upstream = match upstream_sv2::Upstream::new( + upstream_addr, + upstream_config.authority_pubkey, + 0, // TODO + upstream_config.pool_signature.clone(), + status::Sender::Upstream(tx_status.clone()), + task_collector.clone(), + Arc::new(Mutex::new(PoolChangerTrigger::new(timeout))), + ) + .await + { + Ok(upstream) => upstream, + Err(e) => { + error!("Failed to create upstream: {}", e); + panic!() } - - // Start receiving messages from the SV2 Upstream role - if let Err(e) = upstream_sv2::Upstream::parse_incoming(upstream.clone()) { - error!("failed to create sv2 parser: {}", e); + }; + + match upstream_sv2::Upstream::setup_connection( + upstream.clone(), + proxy_config.min_supported_version, + proxy_config.max_supported_version, + ) + .await + { + Ok(_) => info!("Connected to Upstream!"), + Err(e) => { + error!("Failed to connect to Upstream EXITING! : {}", e); panic!() } + } - // Format `Downstream` connection address - let downstream_addr = SocketAddr::new( - IpAddr::from_str(&proxy_config.downstream_address).unwrap(), - proxy_config.downstream_port, - ); - - // Initialize JD part - let mut parts = proxy_config.tp_address.split(':'); - let ip_tp = parts.next().unwrap().to_string(); - let port_tp = parts.next().unwrap().parse::().unwrap(); - - let mut parts = upstream_config.jd_address.split(':'); - let ip_jd = parts.next().unwrap().to_string(); - let port_jd = parts.next().unwrap().parse::().unwrap(); - let jd = match JobDeclarator::new( - SocketAddr::new(IpAddr::from_str(ip_jd.as_str()).unwrap(), port_jd), - upstream_config.authority_pubkey.into_bytes(), - proxy_config.clone(), - upstream.clone(), - task_collector.clone(), - ) - .await - { - Ok(c) => c, - Err(e) => { - let _ = tx_status - .send(status::Status { - state: status::State::UpstreamShutdown(e), - }) - .await; - return; - } - }; - - // Wait for downstream to connect - let downstream = downstream::listen_for_downstream_mining( - downstream_addr, - Some(upstream), - send_solution, - proxy_config.withhold, - proxy_config.authority_public_key, - proxy_config.authority_secret_key, - proxy_config.cert_validity_sec, - task_collector.clone(), - status::Sender::Downstream(tx_status.clone()), - vec![], - Some(jd.clone()), - ) - .await - .unwrap(); - - TemplateRx::connect( - SocketAddr::new(IpAddr::from_str(ip_tp.as_str()).unwrap(), port_tp), - recv_solution, - status::Sender::TemplateReceiver(tx_status.clone()), - Some(jd.clone()), - downstream, - task_collector, - Arc::new(Mutex::new(PoolChangerTrigger::new(timeout))), - vec![], - proxy_config.tp_authority_public_key, - test_only_do_not_send_solution_to_tp, - ) - .await; + // Start receiving messages from the SV2 Upstream role + if let Err(e) = upstream_sv2::Upstream::parse_incoming(upstream.clone()) { + error!("failed to create sv2 parser: {}", e); + panic!() } + + // Format `Downstream` connection address + let downstream_addr = SocketAddr::new( + IpAddr::from_str(&proxy_config.downstream_address).unwrap(), + proxy_config.downstream_port, + ); + + // Initialize JD part + let mut parts = proxy_config.tp_address.split(':'); + let ip_tp = parts.next().unwrap().to_string(); + let port_tp = parts.next().unwrap().parse::().unwrap(); + + let mut parts = upstream_config.jd_address.split(':'); + let ip_jd = parts.next().unwrap().to_string(); + let port_jd = parts.next().unwrap().parse::().unwrap(); + let jd = match JobDeclarator::new( + SocketAddr::new(IpAddr::from_str(ip_jd.as_str()).unwrap(), port_jd), + upstream_config.authority_pubkey.into_bytes(), + proxy_config.clone(), + upstream.clone(), + task_collector.clone(), + ) + .await + { + Ok(c) => c, + Err(e) => { + let _ = tx_status + .send(status::Status { + state: status::State::UpstreamShutdown(e), + }) + .await; + return; + } + }; + + // Wait for downstream to connect + let downstream = downstream::listen_for_downstream_mining( + downstream_addr, + Some(upstream), + send_solution, + proxy_config.withhold, + proxy_config.authority_public_key, + proxy_config.authority_secret_key, + proxy_config.cert_validity_sec, + task_collector.clone(), + status::Sender::Downstream(tx_status.clone()), + vec![], + Some(jd.clone()), + ) + .await + .unwrap(); + + TemplateRx::connect( + SocketAddr::new(IpAddr::from_str(ip_tp.as_str()).unwrap(), port_tp), + recv_solution, + status::Sender::TemplateReceiver(tx_status.clone()), + Some(jd.clone()), + downstream, + task_collector, + Arc::new(Mutex::new(PoolChangerTrigger::new(timeout))), + vec![], + proxy_config.tp_authority_public_key, + test_only_do_not_send_solution_to_tp, + ) + .await; } #[derive(Debug)] diff --git a/roles/jd-server/config-examples/jds-config-local-example.toml b/roles/jd-server/config-examples/jds-config-local-example.toml index dc8ce0555d..f10f5f6324 100644 --- a/roles/jd-server/config-examples/jds-config-local-example.toml +++ b/roles/jd-server/config-examples/jds-config-local-example.toml @@ -22,7 +22,7 @@ coinbase_outputs = [ listen_jd_address = "127.0.0.1:34264" # RPC config for mempool (it can be also the same TP if correctly configured) core_rpc_url = "http://127.0.0.1" -core_rpc_port = 48332 +core_rpc_port = 18332 core_rpc_user = "username" core_rpc_pass = "password" # Time interval used for JDS mempool update