Skip to content

Commit

Permalink
Merge pull request #1021 from johnnyasantoss/johnny/issue/484
Browse files Browse the repository at this point in the history
Graceful shutdown for mining-proxy
  • Loading branch information
pavlenex authored Jul 25, 2024
2 parents 2cbc88a + 5b5d28e commit f3c1e3d
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 133 deletions.
161 changes: 58 additions & 103 deletions roles/mining-proxy/src/lib/downstream_mining.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
#![allow(dead_code)]
use std::{convert::TryInto, sync::Arc};

use super::upstream_mining::{StdFrame as UpstreamFrame, UpstreamMiningNode};
use async_channel::{Receiver, SendError, Sender};
use tokio::{net::TcpListener, sync::oneshot::Receiver as TokioReceiver};
use tracing::{info, warn};

use codec_sv2::{StandardEitherFrame, StandardSv2Frame};
use network_helpers_sv2::plain_connection_tokio::PlainConnection;
use roles_logic_sv2::{
common_messages_sv2::{SetupConnection, SetupConnectionSuccess},
common_properties::{CommonDownstreamData, IsDownstream, IsMiningDownstream},
Expand All @@ -15,25 +19,23 @@ use roles_logic_sv2::{
routing_logic::MiningProxyRoutingLogic,
utils::Mutex,
};
use tracing::info;

use codec_sv2::{StandardEitherFrame, StandardSv2Frame};
use super::upstream_mining::{ProxyRemoteSelector, StdFrame as UpstreamFrame, UpstreamMiningNode};

pub type Message = MiningDeviceMessages<'static>;
pub type StdFrame = StandardSv2Frame<Message>;
pub type EitherFrame = StandardEitherFrame<Message>;

/// 1 to 1 connection with a downstream node that implement the mining (sub)protocol can be either
/// a mining device or a downstream proxy.
/// A downstream can only be linked with an upstream at a time. Support multi upstrems for
/// downstream do no make much sense.
/// A downstream can only be linked with an upstream at a time. Support multi upstreams for
/// downstream do not make much sense.
#[derive(Debug)]
pub struct DownstreamMiningNode {
id: u32,
receiver: Receiver<EitherFrame>,
sender: Sender<EitherFrame>,
pub status: DownstreamMiningNodeStatus,
pub prev_job_id: Option<u32>,
upstream: Option<Arc<Mutex<UpstreamMiningNode>>>,
}

Expand All @@ -47,22 +49,14 @@ pub enum DownstreamMiningNodeStatus {
#[derive(Debug, Clone)]
#[allow(clippy::enum_variant_names)]
pub enum Channel {
DowntreamHomUpstreamGroup {
DownstreamHomUpstreamGroup {
data: CommonDownstreamData,
channel_id: u32,
group_id: u32,
},
DowntreamHomUpstreamExtended {
DownstreamHomUpstreamExtended {
data: CommonDownstreamData,
channel_id: u32,
group_id: u32,
},
// Below variant is not supported cause do not have much sense
// DowntreamNonHomUpstreamGroup { data: CommonDownstreamData, group_ids: Vec<u32>, extended_ids: Vec<u32>},
DowntreamNonHomUpstreamExtended {
data: CommonDownstreamData,
group_ids: Vec<u32>,
extended_ids: Vec<u32>,
},
}

Expand Down Expand Up @@ -101,7 +95,7 @@ impl DownstreamMiningNodeStatus {
match self {
DownstreamMiningNodeStatus::Initializing => panic!(),
DownstreamMiningNodeStatus::Paired(data) => {
let channel = Channel::DowntreamHomUpstreamGroup {
let channel = Channel::DownstreamHomUpstreamGroup {
data: *data,
channel_id,
group_id,
Expand All @@ -113,50 +107,22 @@ impl DownstreamMiningNodeStatus {
}
}

fn open_channel_for_down_hom_up_extended(&mut self, channel_id: u32, group_id: u32) {
fn open_channel_for_down_hom_up_extended(&mut self, channel_id: u32, _group_id: u32) {
match self {
DownstreamMiningNodeStatus::Initializing => panic!(),
DownstreamMiningNodeStatus::Paired(data) => {
let channel = Channel::DowntreamHomUpstreamExtended {
let channel = Channel::DownstreamHomUpstreamExtended {
data: *data,
channel_id,
group_id,
};
let self_ = Self::ChannelOpened(channel);
let _ = std::mem::replace(self, self_);
}
DownstreamMiningNodeStatus::ChannelOpened(..) => panic!("Channel already opened"),
}
}

fn add_extended_from_non_hom_for_up_extended(&mut self, id: u32) {
match self {
DownstreamMiningNodeStatus::Initializing => panic!(),
DownstreamMiningNodeStatus::Paired(data) => {
let channel = Channel::DowntreamNonHomUpstreamExtended {
data: *data,
group_ids: vec![],
extended_ids: vec![id],
};
let self_ = Self::ChannelOpened(channel);
let _ = std::mem::replace(self, self_);
}
DownstreamMiningNodeStatus::ChannelOpened(
Channel::DowntreamNonHomUpstreamExtended { extended_ids, .. },
) => {
if !extended_ids.contains(&id) {
extended_ids.push(id)
}
}
_ => panic!(),
}
}
}

use core::convert::TryInto;
use std::sync::Arc;
use tokio::task;

impl PartialEq for DownstreamMiningNode {
fn eq(&self, other: &Self) -> bool {
self.id == other.id
Expand All @@ -177,16 +143,12 @@ impl DownstreamMiningNode {
self.status
.open_channel_for_down_hom_up_extended(channel_id, group_id);
}
pub fn add_extended_from_non_hom_for_up_extended(&mut self, id: u32) {
self.status.add_extended_from_non_hom_for_up_extended(id);
}

pub fn new(receiver: Receiver<EitherFrame>, sender: Sender<EitherFrame>, id: u32) -> Self {
Self {
receiver,
sender,
status: DownstreamMiningNodeStatus::Initializing,
prev_job_id: None,
upstream: None,
id,
}
Expand Down Expand Up @@ -316,7 +278,7 @@ impl DownstreamMiningNode {

pub fn exit(self_: Arc<Mutex<Self>>) {
if let Some(up) = self_.safe_lock(|s| s.upstream.clone()).unwrap() {
super::upstream_mining::UpstreamMiningNode::remove_dowstream(up, &self_);
UpstreamMiningNode::remove_dowstream(up, &self_);
};
self_
.safe_lock(|s| {
Expand All @@ -326,8 +288,6 @@ impl DownstreamMiningNode {
}
}

use super::upstream_mining::ProxyRemoteSelector;

/// It impl UpstreamMining cause the proxy act as an upstream node for the DownstreamMiningNode
impl
ParseDownstreamMiningMessages<
Expand Down Expand Up @@ -414,14 +374,14 @@ impl
match &self.status {
DownstreamMiningNodeStatus::Initializing => todo!(),
DownstreamMiningNodeStatus::Paired(_) => todo!(),
DownstreamMiningNodeStatus::ChannelOpened(Channel::DowntreamHomUpstreamGroup {
DownstreamMiningNodeStatus::ChannelOpened(Channel::DownstreamHomUpstreamGroup {
..
}) => {
let remote = self.upstream.as_ref().unwrap();
let message = Mining::SubmitSharesStandard(m);
Ok(SendTo::RelayNewMessageToRemote(remote.clone(), message))
}
DownstreamMiningNodeStatus::ChannelOpened(Channel::DowntreamHomUpstreamExtended {
DownstreamMiningNodeStatus::ChannelOpened(Channel::DownstreamHomUpstreamExtended {
..
}) => {
// Safe unwrap is channel have been opened it means that the dowsntream is paired
Expand All @@ -430,12 +390,6 @@ impl
let res = UpstreamMiningNode::handle_std_shr(remote.clone(), m).unwrap();
Ok(SendTo::Respond(res))
}
DownstreamMiningNodeStatus::ChannelOpened(
Channel::DowntreamNonHomUpstreamExtended { .. },
) => {
// unreachable cause the proxy do not support this kind of channel
unreachable!();
}
}
}

Expand Down Expand Up @@ -483,44 +437,48 @@ impl
}
}

use network_helpers_sv2::plain_connection_tokio::PlainConnection;
use std::net::SocketAddr;
use tokio::net::TcpListener;

pub async fn listen_for_downstream_mining(address: SocketAddr) {
info!("Listening for downstream mining connections on {}", address);
let listner = TcpListener::bind(address).await.unwrap();
pub async fn listen_for_downstream_mining(
listener: TcpListener,
mut shutdown_rx: TokioReceiver<()>,
) {
let mut ids = roles_logic_sv2::utils::Id::new();

while let Ok((stream, _)) = listner.accept().await {
let (receiver, sender): (Receiver<EitherFrame>, Sender<EitherFrame>) =
PlainConnection::new(stream).await;
let node = DownstreamMiningNode::new(receiver, sender, ids.next());

task::spawn(async move {
let mut incoming: StdFrame = node.receiver.recv().await.unwrap().try_into().unwrap();
let message_type = incoming.get_header().unwrap().msg_type();
let payload = incoming.payload();
let routing_logic = super::get_common_routing_logic();
let node = Arc::new(Mutex::new(node));

// Call handle_setup_connection or fail
match DownstreamMiningNode::handle_message_common(
node.clone(),
message_type,
payload,
routing_logic,
) {
Ok(SendToCommon::RelayNewMessageToRemote(_, message)) => {
let message = match message {
roles_logic_sv2::parsers::CommonMessages::SetupConnectionSuccess(m) => m,
_ => panic!(),
};
DownstreamMiningNode::start(node, message).await
loop {
tokio::select! {
accept_result = listener.accept() => {
let (stream, _) = accept_result.expect("failed to accept downstream connection");
let (receiver, sender): (Receiver<EitherFrame>, Sender<EitherFrame>) =
PlainConnection::new(stream).await;
let node = DownstreamMiningNode::new(receiver, sender, ids.next());

let mut incoming: StdFrame =
node.receiver.recv().await.unwrap().try_into().unwrap();
let message_type = incoming.get_header().unwrap().msg_type();
let payload = incoming.payload();
let routing_logic = super::get_common_routing_logic();
let node = Arc::new(Mutex::new(node));

// Call handle_setup_connection or fail
let common_msg = DownstreamMiningNode::handle_message_common(
node.clone(),
message_type,
payload,
routing_logic
).expect("failed to process downstream message");


if let SendToCommon::RelayNewMessageToRemote(_, relay_msg) = common_msg {
if let roles_logic_sv2::parsers::CommonMessages::SetupConnectionSuccess(setup_msg) = relay_msg {
DownstreamMiningNode::start(node, setup_msg).await;
}
} else {
warn!("Received unexpected message from downstream");
}
_ => panic!(),
}
});
_ = &mut shutdown_rx => {
info!("Closing listener");
return;
}
}
}
}

Expand All @@ -529,14 +487,11 @@ impl IsDownstream for DownstreamMiningNode {
match self.status {
DownstreamMiningNodeStatus::Initializing => panic!(),
DownstreamMiningNodeStatus::Paired(data) => data,
DownstreamMiningNodeStatus::ChannelOpened(Channel::DowntreamHomUpstreamGroup {
DownstreamMiningNodeStatus::ChannelOpened(Channel::DownstreamHomUpstreamGroup {
data,
..
}) => data,
DownstreamMiningNodeStatus::ChannelOpened(
Channel::DowntreamNonHomUpstreamExtended { data, .. },
) => data,
DownstreamMiningNodeStatus::ChannelOpened(Channel::DowntreamHomUpstreamExtended {
DownstreamMiningNodeStatus::ChannelOpened(Channel::DownstreamHomUpstreamExtended {
data,
..
}) => data,
Expand Down
35 changes: 17 additions & 18 deletions roles/mining-proxy/src/lib/upstream_mining.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
#![allow(dead_code)]

use super::EXTRANONCE_RANGE_1_LENGTH;
use roles_logic_sv2::utils::Id;
use core::convert::TryInto;
use std::{collections::HashMap, net::SocketAddr, sync::Arc, time::Duration};

use super::downstream_mining::{Channel, DownstreamMiningNode, StdFrame as DownstreamFrame};
use async_channel::{Receiver, SendError, Sender};
use async_recursion::async_recursion;
use nohash_hasher::BuildNoHashHasher;
use tokio::{net::TcpStream, task};
use tracing::{debug, error, info};

use codec_sv2::{HandshakeRole, Initiator, StandardEitherFrame, StandardSv2Frame};
use network_helpers_sv2::noise_connection_tokio::Connection;
use nohash_hasher::BuildNoHashHasher;
use roles_logic_sv2::{
channel_logic::{
channel_factory::{ExtendedChannelKind, OnNewShare, ProxyExtendedChannelFactory, Share},
Expand All @@ -26,14 +28,15 @@ use roles_logic_sv2::{
routing_logic::MiningProxyRoutingLogic,
selectors::{DownstreamMiningSelector, ProxyDownstreamMiningSelector as Prs},
template_distribution_sv2::SubmitSolution,
utils::{GroupId, Mutex},
utils::{GroupId, Id, Mutex},
};
use std::{collections::HashMap, sync::Arc};
use tokio::{net::TcpStream, task};
use tracing::error;

use stratum_common::bitcoin::TxOut;

use super::{
downstream_mining::{Channel, DownstreamMiningNode, StdFrame as DownstreamFrame},
EXTRANONCE_RANGE_1_LENGTH,
};

pub type Message = PoolMessages<'static>;
pub type StdFrame = StandardSv2Frame<Message>;
pub type EitherFrame = StandardEitherFrame<Message>;
Expand Down Expand Up @@ -188,10 +191,6 @@ pub struct UpstreamMiningNode {
reconnect: bool,
}

use core::convert::TryInto;
use std::{net::SocketAddr, time::Duration};
use tracing::{debug, info};

/// It assume that endpoint NEVER change flags and version!
/// I can open both extended and group channel with upstream.
impl UpstreamMiningNode {
Expand Down Expand Up @@ -471,11 +470,10 @@ impl UpstreamMiningNode {
super::downstream_mining::DownstreamMiningNodeStatus::ChannelOpened(
channel,
) => match channel {
Channel::DowntreamHomUpstreamGroup { channel_id, .. } => Some(*channel_id),
Channel::DowntreamHomUpstreamExtended { channel_id, .. } => {
Channel::DownstreamHomUpstreamGroup { channel_id, .. } => Some(*channel_id),
Channel::DownstreamHomUpstreamExtended { channel_id, .. } => {
Some(*channel_id)
}
Channel::DowntreamNonHomUpstreamExtended { .. } => todo!(),
},
})
.unwrap()
Expand Down Expand Up @@ -1048,7 +1046,7 @@ impl
.ok_or(Error::NoDownstreamsConnected)?;
for downstream in downstreams {
match downstream.safe_lock(|r| r.get_channel().clone()).unwrap() {
Channel::DowntreamHomUpstreamGroup {
Channel::DownstreamHomUpstreamGroup {
channel_id,
group_id,
..
Expand Down Expand Up @@ -1257,9 +1255,10 @@ impl IsMiningUpstream<DownstreamMiningNode, ProxyRemoteSelector> for UpstreamMin

#[cfg(test)]
mod tests {
use super::*;
use std::net::{IpAddr, Ipv4Addr};

use super::*;

#[test]
fn new_upstream_minining_node() {
let id = 0;
Expand Down
Loading

0 comments on commit f3c1e3d

Please sign in to comment.