Skip to content

Commit

Permalink
Align all integration test start_* ..
Browse files Browse the repository at this point in the history
functions return signature
  • Loading branch information
jbesraa committed Dec 16, 2024
1 parent f217bcb commit 006d19e
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 65 deletions.
50 changes: 28 additions & 22 deletions roles/tests-integration/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ pub(crate) mod sniffer;

use bitcoind::{bitcoincore_rpc::RpcApi, BitcoinD, Conf};
use flate2::read::GzDecoder;
use jd_client::JobDeclaratorClient;
use jd_server::JobDeclaratorServer;
use key_utils::{Secp256k1PublicKey, Secp256k1SecretKey};
use once_cell::sync::Lazy;
use pool_sv2::PoolSv2;
Expand All @@ -19,6 +21,7 @@ use std::{
sync::Mutex,
};
use tar::Archive;
use translator_sv2::TranslatorSv2;

// prevents get_available_port from ever returning the same port twice
static UNIQUE_PORTS: Lazy<Mutex<HashSet<u16>>> = Lazy::new(|| Mutex::new(HashSet::new()));
Expand Down Expand Up @@ -186,11 +189,11 @@ pub fn get_available_address() -> SocketAddr {

pub async fn start_sniffer(
identifier: String,
listening_address: SocketAddr,
upstream: SocketAddr,
check_on_drop: bool,
intercept_message: Option<Vec<sniffer::InterceptMessage>>,
) -> Sniffer {
) -> (Sniffer, SocketAddr) {
let listening_address = get_available_address();
let sniffer = Sniffer::new(
identifier,
listening_address,
Expand All @@ -203,7 +206,7 @@ pub async fn start_sniffer(
tokio::spawn(async move {
sniffer_clone.start().await;
});
sniffer
(sniffer, listening_address)
}

#[derive(Debug)]
Expand Down Expand Up @@ -266,32 +269,31 @@ impl TestPoolSv2 {
}
}

pub async fn start_pool(
listening_address: Option<SocketAddr>,
template_provider_address: Option<SocketAddr>,
) -> PoolSv2 {
let test_pool = TestPoolSv2::new(listening_address, template_provider_address);
pub async fn start_pool(template_provider_address: Option<SocketAddr>) -> (PoolSv2, SocketAddr) {
let listening_address = get_available_address();
let test_pool = TestPoolSv2::new(Some(listening_address), template_provider_address);
let pool = test_pool.pool.clone();
let pool_clone = pool.clone();
tokio::task::spawn(async move {
assert!(pool_clone.start().await.is_ok());
});
// Wait a bit to let the pool exchange initial messages with the TP
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
pool
(pool, listening_address)
}

pub async fn start_template_provider(tp_port: u16) -> TemplateProvider {
let template_provider = TemplateProvider::start(tp_port);
pub async fn start_template_provider() -> (TemplateProvider, SocketAddr) {
let address = get_available_address();
let template_provider = TemplateProvider::start(address.port());
template_provider.generate_blocks(16);
template_provider
(template_provider, address)
}

pub async fn start_jdc(
pool_address: SocketAddr,
tp_address: SocketAddr,
jds_address: SocketAddr,
) -> SocketAddr {
) -> (JobDeclaratorClient, SocketAddr) {
use jd_client::proxy_config::{
CoinbaseOutput, PoolConfig, ProtocolConfig, ProxyConfig, TPConfig, Upstream,
};
Expand Down Expand Up @@ -342,13 +344,14 @@ pub async fn start_jdc(
std::time::Duration::from_secs(cert_validity_sec),
);
let ret = jd_client::JobDeclaratorClient::new(jd_client_proxy);
tokio::spawn(async move { ret.start().await });
let ret_clone = ret.clone();
tokio::spawn(async move { ret_clone.start().await });
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
jdc_address
(ret, jdc_address)
}

pub async fn start_jds(tp_address: SocketAddr) -> SocketAddr {
use jd_server::{CoinbaseOutput, Configuration, CoreRpc, JobDeclaratorServer};
pub async fn start_jds(tp_address: SocketAddr) -> (JobDeclaratorServer, SocketAddr) {
use jd_server::{CoinbaseOutput, Configuration, CoreRpc};
let authority_public_key = Secp256k1PublicKey::try_from(
"9auqWEzQDVyd2oe1JVGFLMLHZtCo2FFqZwtKA5gd9xbuEu7PH72".to_string(),
)
Expand Down Expand Up @@ -378,14 +381,16 @@ pub async fn start_jds(tp_address: SocketAddr) -> SocketAddr {
core_rpc,
std::time::Duration::from_secs(1),
);
let job_declarator_server = JobDeclaratorServer::new(config);
let job_declarator_server_clone = job_declarator_server.clone();
tokio::spawn(async move {
JobDeclaratorServer::new(config).start().await;
job_declarator_server_clone.start().await;
});
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
listen_jd_address
(job_declarator_server, listen_jd_address)
}

pub async fn start_sv2_translator(upstream: SocketAddr) -> SocketAddr {
pub async fn start_sv2_translator(upstream: SocketAddr) -> (TranslatorSv2, SocketAddr) {
let upstream_address = upstream.ip().to_string();
let upstream_port = upstream.port();
let upstream_authority_pubkey = Secp256k1PublicKey::try_from(
Expand Down Expand Up @@ -427,11 +432,12 @@ pub async fn start_sv2_translator(upstream: SocketAddr) -> SocketAddr {
let config =
translator_sv2::proxy_config::ProxyConfig::new(upstream_conf, downstream_conf, 2, 2, 8);
let translator_v2 = translator_sv2::TranslatorSv2::new(config);
let clone_translator_v2 = translator_v2.clone();
tokio::spawn(async move {
translator_v2.start().await;
clone_translator_v2.start().await;
});
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
listening_address
(translator_v2, listening_address)
}

fn measure_hashrate(duration_secs: u64) -> f64 {
Expand Down
19 changes: 3 additions & 16 deletions roles/tests-integration/tests/pool_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,9 @@ use roles_logic_sv2::{
// Pool will connect to the Sniffer, and the Sniffer will connect to the Template Provider.
#[tokio::test]
async fn success_pool_template_provider_connection() {
let sniffer_addr = common::get_available_address();
let tp_addr = common::get_available_address();
let pool_addr = common::get_available_address();
let _tp = common::start_template_provider(tp_addr.port()).await;
let sniffer_identifier =
"success_pool_template_provider_connection tp_pool sniffer".to_string();
let sniffer_check_on_drop = true;
let sniffer = common::start_sniffer(
sniffer_identifier,
sniffer_addr,
tp_addr,
sniffer_check_on_drop,
None,
)
.await;
let _ = common::start_pool(Some(pool_addr), Some(sniffer_addr)).await;
let (_tp, tp_addr) = common::start_template_provider().await;
let (sniffer, sniffer_addr) = common::start_sniffer("".to_string(), tp_addr, true, None).await;
let _ = common::start_pool(Some(sniffer_addr)).await;
// here we assert that the downstream(pool in this case) have sent `SetupConnection` message
// with the correct parameters, protocol, flags, min_version and max_version. Note that the
// macro can take any number of arguments after the message argument, but the order is
Expand Down
17 changes: 4 additions & 13 deletions roles/tests-integration/tests/sniffer_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,7 @@ use roles_logic_sv2::{

#[tokio::test]
async fn test_sniffer_interrupter() {
let sniffer_addr = common::get_available_address();
let tp_addr = common::get_available_address();
let pool_addr = common::get_available_address();
let _tp = common::start_template_provider(tp_addr.port()).await;
let (_tp, tp_addr) = common::start_template_provider().await;
use const_sv2::MESSAGE_TYPE_SETUP_CONNECTION_SUCCESS;
let message =
PoolMessages::Common(CommonMessages::SetupConnectionError(SetupConnectionError {
Expand All @@ -32,15 +29,9 @@ async fn test_sniffer_interrupter() {
MESSAGE_TYPE_SETUP_CONNECTION_ERROR,
true,
);
let sniffer = common::start_sniffer(
"1".to_string(),
sniffer_addr,
tp_addr,
false,
Some(vec![interrupt_msgs]),
)
.await;
let _ = common::start_pool(Some(pool_addr), Some(sniffer_addr)).await;
let (sniffer, sniffer_addr) =
common::start_sniffer("".to_string(), tp_addr, false, Some(vec![interrupt_msgs])).await;
let _ = common::start_pool(Some(sniffer_addr)).await;
assert_common_message!(&sniffer.next_message_from_downstream(), SetupConnection);
assert_common_message!(&sniffer.next_message_from_upstream(), SetupConnectionError);
}
19 changes: 5 additions & 14 deletions roles/tests-integration/tests/translator_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,11 @@ use roles_logic_sv2::parsers::{CommonMessages, Mining, PoolMessages};
// shares.
#[tokio::test]
async fn translation_proxy() {
let pool_translator_sniffer_addr = common::get_available_address();
let tp_addr = common::get_available_address();
let pool_addr = common::get_available_address();
let pool_translator_sniffer = common::start_sniffer(
"0".to_string(),
pool_translator_sniffer_addr,
pool_addr,
false,
None,
)
.await;
let _tp = common::start_template_provider(tp_addr.port()).await;
let _pool = common::start_pool(Some(pool_addr), Some(tp_addr)).await;
let tproxy_addr = common::start_sv2_translator(pool_translator_sniffer_addr).await;
let (_tp, tp_addr) = common::start_template_provider().await;
let (_pool, pool_addr) = common::start_pool(Some(tp_addr)).await;
let (pool_translator_sniffer, pool_translator_sniffer_addr) =
common::start_sniffer("0".to_string(), pool_addr, false, None).await;
let (_, tproxy_addr) = common::start_sv2_translator(pool_translator_sniffer_addr).await;
let _mining_device = common::start_mining_device_sv1(tproxy_addr).await;
pool_translator_sniffer
.wait_for_message_type(MessageDirection::ToUpstream, MESSAGE_TYPE_SETUP_CONNECTION)
Expand Down

0 comments on commit 006d19e

Please sign in to comment.