diff --git a/roles/mining-proxy/src/lib/downstream_mining.rs b/roles/mining-proxy/src/lib/downstream_mining.rs index 1880551196..da8901c49c 100644 --- a/roles/mining-proxy/src/lib/downstream_mining.rs +++ b/roles/mining-proxy/src/lib/downstream_mining.rs @@ -1,7 +1,11 @@ -#![allow(dead_code)] +use std::{convert::TryInto, sync::Arc}; -use super::upstream_mining::{StdFrame as UpstreamFrame, UpstreamMiningNode}; use async_channel::{Receiver, SendError, Sender}; +use tokio::{net::TcpListener, sync::oneshot::Receiver as TokioReceiver}; +use tracing::{info, warn}; + +use codec_sv2::{StandardEitherFrame, StandardSv2Frame}; +use network_helpers_sv2::plain_connection_tokio::PlainConnection; use roles_logic_sv2::{ common_messages_sv2::{SetupConnection, SetupConnectionSuccess}, common_properties::{CommonDownstreamData, IsDownstream, IsMiningDownstream}, @@ -15,9 +19,8 @@ use roles_logic_sv2::{ routing_logic::MiningProxyRoutingLogic, utils::Mutex, }; -use tracing::info; -use codec_sv2::{StandardEitherFrame, StandardSv2Frame}; +use super::upstream_mining::{ProxyRemoteSelector, StdFrame as UpstreamFrame, UpstreamMiningNode}; pub type Message = MiningDeviceMessages<'static>; pub type StdFrame = StandardSv2Frame; @@ -25,15 +28,14 @@ pub type EitherFrame = StandardEitherFrame; /// 1 to 1 connection with a downstream node that implement the mining (sub)protocol can be either /// a mining device or a downstream proxy. -/// A downstream can only be linked with an upstream at a time. Support multi upstrems for -/// downstream do no make much sense. +/// A downstream can only be linked with an upstream at a time. Support multi upstreams for +/// downstream do not make much sense. #[derive(Debug)] pub struct DownstreamMiningNode { id: u32, receiver: Receiver, sender: Sender, pub status: DownstreamMiningNodeStatus, - pub prev_job_id: Option, upstream: Option>>, } @@ -47,22 +49,14 @@ pub enum DownstreamMiningNodeStatus { #[derive(Debug, Clone)] #[allow(clippy::enum_variant_names)] pub enum Channel { - DowntreamHomUpstreamGroup { + DownstreamHomUpstreamGroup { data: CommonDownstreamData, channel_id: u32, group_id: u32, }, - DowntreamHomUpstreamExtended { + DownstreamHomUpstreamExtended { data: CommonDownstreamData, channel_id: u32, - group_id: u32, - }, - // Below variant is not supported cause do not have much sense - // DowntreamNonHomUpstreamGroup { data: CommonDownstreamData, group_ids: Vec, extended_ids: Vec}, - DowntreamNonHomUpstreamExtended { - data: CommonDownstreamData, - group_ids: Vec, - extended_ids: Vec, }, } @@ -101,7 +95,7 @@ impl DownstreamMiningNodeStatus { match self { DownstreamMiningNodeStatus::Initializing => panic!(), DownstreamMiningNodeStatus::Paired(data) => { - let channel = Channel::DowntreamHomUpstreamGroup { + let channel = Channel::DownstreamHomUpstreamGroup { data: *data, channel_id, group_id, @@ -113,14 +107,13 @@ impl DownstreamMiningNodeStatus { } } - fn open_channel_for_down_hom_up_extended(&mut self, channel_id: u32, group_id: u32) { + fn open_channel_for_down_hom_up_extended(&mut self, channel_id: u32, _group_id: u32) { match self { DownstreamMiningNodeStatus::Initializing => panic!(), DownstreamMiningNodeStatus::Paired(data) => { - let channel = Channel::DowntreamHomUpstreamExtended { + let channel = Channel::DownstreamHomUpstreamExtended { data: *data, channel_id, - group_id, }; let self_ = Self::ChannelOpened(channel); let _ = std::mem::replace(self, self_); @@ -128,35 +121,8 @@ impl DownstreamMiningNodeStatus { DownstreamMiningNodeStatus::ChannelOpened(..) => panic!("Channel already opened"), } } - - fn add_extended_from_non_hom_for_up_extended(&mut self, id: u32) { - match self { - DownstreamMiningNodeStatus::Initializing => panic!(), - DownstreamMiningNodeStatus::Paired(data) => { - let channel = Channel::DowntreamNonHomUpstreamExtended { - data: *data, - group_ids: vec![], - extended_ids: vec![id], - }; - let self_ = Self::ChannelOpened(channel); - let _ = std::mem::replace(self, self_); - } - DownstreamMiningNodeStatus::ChannelOpened( - Channel::DowntreamNonHomUpstreamExtended { extended_ids, .. }, - ) => { - if !extended_ids.contains(&id) { - extended_ids.push(id) - } - } - _ => panic!(), - } - } } -use core::convert::TryInto; -use std::sync::Arc; -use tokio::task; - impl PartialEq for DownstreamMiningNode { fn eq(&self, other: &Self) -> bool { self.id == other.id @@ -177,16 +143,12 @@ impl DownstreamMiningNode { self.status .open_channel_for_down_hom_up_extended(channel_id, group_id); } - pub fn add_extended_from_non_hom_for_up_extended(&mut self, id: u32) { - self.status.add_extended_from_non_hom_for_up_extended(id); - } pub fn new(receiver: Receiver, sender: Sender, id: u32) -> Self { Self { receiver, sender, status: DownstreamMiningNodeStatus::Initializing, - prev_job_id: None, upstream: None, id, } @@ -316,7 +278,7 @@ impl DownstreamMiningNode { pub fn exit(self_: Arc>) { if let Some(up) = self_.safe_lock(|s| s.upstream.clone()).unwrap() { - super::upstream_mining::UpstreamMiningNode::remove_dowstream(up, &self_); + UpstreamMiningNode::remove_dowstream(up, &self_); }; self_ .safe_lock(|s| { @@ -326,8 +288,6 @@ impl DownstreamMiningNode { } } -use super::upstream_mining::ProxyRemoteSelector; - /// It impl UpstreamMining cause the proxy act as an upstream node for the DownstreamMiningNode impl ParseDownstreamMiningMessages< @@ -414,14 +374,14 @@ impl match &self.status { DownstreamMiningNodeStatus::Initializing => todo!(), DownstreamMiningNodeStatus::Paired(_) => todo!(), - DownstreamMiningNodeStatus::ChannelOpened(Channel::DowntreamHomUpstreamGroup { + DownstreamMiningNodeStatus::ChannelOpened(Channel::DownstreamHomUpstreamGroup { .. }) => { let remote = self.upstream.as_ref().unwrap(); let message = Mining::SubmitSharesStandard(m); Ok(SendTo::RelayNewMessageToRemote(remote.clone(), message)) } - DownstreamMiningNodeStatus::ChannelOpened(Channel::DowntreamHomUpstreamExtended { + DownstreamMiningNodeStatus::ChannelOpened(Channel::DownstreamHomUpstreamExtended { .. }) => { // Safe unwrap is channel have been opened it means that the dowsntream is paired @@ -430,12 +390,6 @@ impl let res = UpstreamMiningNode::handle_std_shr(remote.clone(), m).unwrap(); Ok(SendTo::Respond(res)) } - DownstreamMiningNodeStatus::ChannelOpened( - Channel::DowntreamNonHomUpstreamExtended { .. }, - ) => { - // unreachable cause the proxy do not support this kind of channel - unreachable!(); - } } } @@ -483,44 +437,48 @@ impl } } -use network_helpers_sv2::plain_connection_tokio::PlainConnection; -use std::net::SocketAddr; -use tokio::net::TcpListener; - -pub async fn listen_for_downstream_mining(address: SocketAddr) { - info!("Listening for downstream mining connections on {}", address); - let listner = TcpListener::bind(address).await.unwrap(); +pub async fn listen_for_downstream_mining( + listener: TcpListener, + mut shutdown_rx: TokioReceiver<()>, +) { let mut ids = roles_logic_sv2::utils::Id::new(); - - while let Ok((stream, _)) = listner.accept().await { - let (receiver, sender): (Receiver, Sender) = - PlainConnection::new(stream).await; - let node = DownstreamMiningNode::new(receiver, sender, ids.next()); - - task::spawn(async move { - let mut incoming: StdFrame = node.receiver.recv().await.unwrap().try_into().unwrap(); - let message_type = incoming.get_header().unwrap().msg_type(); - let payload = incoming.payload(); - let routing_logic = super::get_common_routing_logic(); - let node = Arc::new(Mutex::new(node)); - - // Call handle_setup_connection or fail - match DownstreamMiningNode::handle_message_common( - node.clone(), - message_type, - payload, - routing_logic, - ) { - Ok(SendToCommon::RelayNewMessageToRemote(_, message)) => { - let message = match message { - roles_logic_sv2::parsers::CommonMessages::SetupConnectionSuccess(m) => m, - _ => panic!(), - }; - DownstreamMiningNode::start(node, message).await + loop { + tokio::select! { + accept_result = listener.accept() => { + let (stream, _) = accept_result.expect("failed to accept downstream connection"); + let (receiver, sender): (Receiver, Sender) = + PlainConnection::new(stream).await; + let node = DownstreamMiningNode::new(receiver, sender, ids.next()); + + let mut incoming: StdFrame = + node.receiver.recv().await.unwrap().try_into().unwrap(); + let message_type = incoming.get_header().unwrap().msg_type(); + let payload = incoming.payload(); + let routing_logic = super::get_common_routing_logic(); + let node = Arc::new(Mutex::new(node)); + + // Call handle_setup_connection or fail + let common_msg = DownstreamMiningNode::handle_message_common( + node.clone(), + message_type, + payload, + routing_logic + ).expect("failed to process downstream message"); + + + if let SendToCommon::RelayNewMessageToRemote(_, relay_msg) = common_msg { + if let roles_logic_sv2::parsers::CommonMessages::SetupConnectionSuccess(setup_msg) = relay_msg { + DownstreamMiningNode::start(node, setup_msg).await; + } + } else { + warn!("Received unexpected message from downstream"); } - _ => panic!(), } - }); + _ = &mut shutdown_rx => { + info!("Closing listener"); + return; + } + } } } @@ -529,14 +487,11 @@ impl IsDownstream for DownstreamMiningNode { match self.status { DownstreamMiningNodeStatus::Initializing => panic!(), DownstreamMiningNodeStatus::Paired(data) => data, - DownstreamMiningNodeStatus::ChannelOpened(Channel::DowntreamHomUpstreamGroup { + DownstreamMiningNodeStatus::ChannelOpened(Channel::DownstreamHomUpstreamGroup { data, .. }) => data, - DownstreamMiningNodeStatus::ChannelOpened( - Channel::DowntreamNonHomUpstreamExtended { data, .. }, - ) => data, - DownstreamMiningNodeStatus::ChannelOpened(Channel::DowntreamHomUpstreamExtended { + DownstreamMiningNodeStatus::ChannelOpened(Channel::DownstreamHomUpstreamExtended { data, .. }) => data, diff --git a/roles/mining-proxy/src/lib/upstream_mining.rs b/roles/mining-proxy/src/lib/upstream_mining.rs index e3f6eef999..5ad012d10e 100644 --- a/roles/mining-proxy/src/lib/upstream_mining.rs +++ b/roles/mining-proxy/src/lib/upstream_mining.rs @@ -1,14 +1,16 @@ #![allow(dead_code)] -use super::EXTRANONCE_RANGE_1_LENGTH; -use roles_logic_sv2::utils::Id; +use core::convert::TryInto; +use std::{collections::HashMap, net::SocketAddr, sync::Arc, time::Duration}; -use super::downstream_mining::{Channel, DownstreamMiningNode, StdFrame as DownstreamFrame}; use async_channel::{Receiver, SendError, Sender}; use async_recursion::async_recursion; +use nohash_hasher::BuildNoHashHasher; +use tokio::{net::TcpStream, task}; +use tracing::{debug, error, info}; + use codec_sv2::{HandshakeRole, Initiator, StandardEitherFrame, StandardSv2Frame}; use network_helpers_sv2::noise_connection_tokio::Connection; -use nohash_hasher::BuildNoHashHasher; use roles_logic_sv2::{ channel_logic::{ channel_factory::{ExtendedChannelKind, OnNewShare, ProxyExtendedChannelFactory, Share}, @@ -26,14 +28,15 @@ use roles_logic_sv2::{ routing_logic::MiningProxyRoutingLogic, selectors::{DownstreamMiningSelector, ProxyDownstreamMiningSelector as Prs}, template_distribution_sv2::SubmitSolution, - utils::{GroupId, Mutex}, + utils::{GroupId, Id, Mutex}, }; -use std::{collections::HashMap, sync::Arc}; -use tokio::{net::TcpStream, task}; -use tracing::error; - use stratum_common::bitcoin::TxOut; +use super::{ + downstream_mining::{Channel, DownstreamMiningNode, StdFrame as DownstreamFrame}, + EXTRANONCE_RANGE_1_LENGTH, +}; + pub type Message = PoolMessages<'static>; pub type StdFrame = StandardSv2Frame; pub type EitherFrame = StandardEitherFrame; @@ -188,10 +191,6 @@ pub struct UpstreamMiningNode { reconnect: bool, } -use core::convert::TryInto; -use std::{net::SocketAddr, time::Duration}; -use tracing::{debug, info}; - /// It assume that endpoint NEVER change flags and version! /// I can open both extended and group channel with upstream. impl UpstreamMiningNode { @@ -471,11 +470,10 @@ impl UpstreamMiningNode { super::downstream_mining::DownstreamMiningNodeStatus::ChannelOpened( channel, ) => match channel { - Channel::DowntreamHomUpstreamGroup { channel_id, .. } => Some(*channel_id), - Channel::DowntreamHomUpstreamExtended { channel_id, .. } => { + Channel::DownstreamHomUpstreamGroup { channel_id, .. } => Some(*channel_id), + Channel::DownstreamHomUpstreamExtended { channel_id, .. } => { Some(*channel_id) } - Channel::DowntreamNonHomUpstreamExtended { .. } => todo!(), }, }) .unwrap() @@ -1048,7 +1046,7 @@ impl .ok_or(Error::NoDownstreamsConnected)?; for downstream in downstreams { match downstream.safe_lock(|r| r.get_channel().clone()).unwrap() { - Channel::DowntreamHomUpstreamGroup { + Channel::DownstreamHomUpstreamGroup { channel_id, group_id, .. @@ -1257,9 +1255,10 @@ impl IsMiningUpstream for UpstreamMin #[cfg(test)] mod tests { - use super::*; use std::net::{IpAddr, Ipv4Addr}; + use super::*; + #[test] fn new_upstream_minining_node() { let id = 0; diff --git a/roles/mining-proxy/src/main.rs b/roles/mining-proxy/src/main.rs index 0725c189b1..5931990ac5 100644 --- a/roles/mining-proxy/src/main.rs +++ b/roles/mining-proxy/src/main.rs @@ -3,7 +3,7 @@ //! Downstream means another proxy or a mining device //! //! UpstreamMining is the trait that a proxy must implement in order to -//! understant Downstream mining messages. +//! understand Downstream mining messages. //! //! DownstreamMining is the trait that a proxy must implement in order to //! understand Upstream mining messages @@ -18,12 +18,15 @@ //! A Downstream that signal the incapacity to handle group channels can open only one channel. //! #![allow(special_module_name)] -mod lib; +use std::{net::SocketAddr, sync::Arc}; + +use tokio::{net::TcpListener, sync::oneshot}; +use tracing::{error, info}; use lib::Config; use roles_logic_sv2::utils::{GroupId, Mutex}; -use std::{net::SocketAddr, sync::Arc}; -use tracing::{error, info}; + +mod lib; mod args { use std::path::PathBuf; @@ -89,12 +92,12 @@ mod args { } /// 1. the proxy scan all the upstreams and map them -/// 2. donwstream open a connetcion with proxy +/// 2. downstream open a connection with proxy /// 3. downstream send SetupConnection -/// 4. a mining_channle::Upstream is created +/// 4. a mining_channels::Upstream is created /// 5. upstream_mining::UpstreamMiningNodes is used to pair this downstream with the most suitable /// upstream -/// 6. mining_channle::Upstream create a new downstream_mining::DownstreamMiningNode embedding +/// 6. mining_channels::Upstream create a new downstream_mining::DownstreamMiningNode embedding /// itself in it /// 7. normal operation between the paired downstream_mining::DownstreamMiningNode and /// upstream_mining::UpstreamMiningNode begin @@ -126,16 +129,37 @@ async fn main() { lib::initialize_r_logic(&config.upstreams, group_id, config.clone()).await, )) .expect("BUG: Failed to set ROUTING_LOGIC"); - info!("PROXY INITIALIZING"); + + info!("Initializing upstream scanner"); lib::initialize_upstreams(config.min_supported_version, config.max_supported_version).await; - info!("PROXY INITIALIZED"); + info!("Initializing downstream listener"); - // Wait for downstream connection let socket = SocketAddr::new( config.listen_address.parse().unwrap(), config.listen_mining_port, ); + let listener = TcpListener::bind(socket).await.unwrap(); + + info!("Listening for downstream mining connections on {}", socket); + + let (shutdown_tx, shutdown_rx) = oneshot::channel(); + + let (_, res) = tokio::join!( + // Wait for downstream connection + lib::downstream_mining::listen_for_downstream_mining(listener, shutdown_rx), + // handle SIGTERM/QUIT / ctrl+c + tokio::spawn(async { + tokio::signal::ctrl_c() + .await + .expect("Failed to listen to signals"); + let _ = shutdown_tx.send(()); + info!("Interrupt received"); + }) + ); + + if let Err(e) = res { + panic!("Failed to wait for clean exit: {:?}", e); + } - info!("PROXY INITIALIZED"); - crate::lib::downstream_mining::listen_for_downstream_mining(socket).await + info!("Shutdown done"); }