Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integration tests cleanup #1297

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions .github/workflows/integration-tests.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
on:
push:
branches:
- main
pull_request:
branches:
- main

name: Integration Tests

jobs:
ci:
runs-on: ${{ matrix.os }}
strategy:
matrix:
os:
- ubuntu-latest
include:
- os: ubuntu-latest
target: x86_64-unknown-linux-musl
plebhash marked this conversation as resolved.
Show resolved Hide resolved

steps:
- name: Use stable toolchain
uses: actions/checkout@v4
with:
profile: minimal
toolchain: stable
override: true

- name: Roles Integration Tests
run: |
cargo test --manifest-path=roles/Cargo.toml --verbose --test '*' -- --nocapture
4 changes: 0 additions & 4 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,6 @@ jobs:
cargo build --manifest-path=roles/Cargo.toml
cargo build --manifest-path=utils/Cargo.toml

- name: Roles Integration Tests
run: |
cargo test --manifest-path=roles/Cargo.toml --verbose --test '*' -- --nocapture

- name: Run sv1-client-and-server example
run: |
cargo run --manifest-path=examples/sv1-client-and-server/Cargo.toml --bin client_and_server -- 60
Expand Down
1 change: 1 addition & 0 deletions roles/jd-client/src/lib/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ pub static IS_NEW_TEMPLATE_HANDLED: AtomicBool = AtomicBool::new(true);
/// switching to backup Pools in case of declared custom jobs refused by JDS (which is Pool side).
/// As a solution of last-resort, it is able to switch to Solo Mining until new safe Pools appear
/// in the market.
#[derive(Debug, Clone)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don’t understand the need to make the configuration structure clonable. Why are we returning configuration objects downstream? While this approach does make all our starter APIs consistent, I don’t see any other benefit. Am I missing something?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right that none of them is currently used beside the Sniffer. I think in the future some of those roles will have some functionality that we might need while testing, thats the main motivation for including them

pub struct JobDeclaratorClient {
/// Configuration of the proxy server [`JobDeclaratorClient`] is connected to.
config: ProxyConfig,
Expand Down
1 change: 1 addition & 0 deletions roles/jd-server/src/lib/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ pub type Message = JdsMessages<'static>;
pub type StdFrame = StandardSv2Frame<Message>;
pub type EitherFrame = StandardEitherFrame<Message>;

#[derive(Debug, Clone)]
pub struct JobDeclaratorServer {
config: Configuration,
}
Expand Down
26 changes: 26 additions & 0 deletions roles/tests-integration/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# SV2 Integration Tests

This is a test crate and it can be used in order to test the behavior of different roles when
working together. Each role should have a `start_[role_name]` function under `common` folder that
can be called in order to run the role. In order to assert the behavior of the role or the messages
it exchanges with other roles, you can use the `Sniffer` helper in order to listen to the messages
exchanged between the roles, and assert those messages using the `assert_message_[message_type]`
function. For examples on how to use the `Sniffer` helper, you can check the
`sniffer_integration.rs` module or other tests in the `tests` folder.

plebhash marked this conversation as resolved.
Show resolved Hide resolved
All of our tests run in regtest network. We download the Template Provider node from
https://github.com/Sjors/bitcoin/releases/download. This is a pre-built binary that we use to run an
Stratum V2 compatible bitcoin node. Note that this is the only external dependency(and Role) that we
have in our tests.

## Running Instructions

In order to run the integration tests, you can use the following command:

```bash
$ git clone [email protected]:stratum-mining/stratum.git
$ cargo test --manifest-path=roles/Cargo.toml --verbose --test '*' -- --nocapture
```

## License
MIT OR Apache-2.0
55 changes: 30 additions & 25 deletions roles/tests-integration/tests/common/mod.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
mod sniffer;
pub(crate) mod sniffer;

use bitcoind::{bitcoincore_rpc::RpcApi, BitcoinD, Conf};
use flate2::read::GzDecoder;
use jd_client::JobDeclaratorClient;
use jd_server::JobDeclaratorServer;
use key_utils::{Secp256k1PublicKey, Secp256k1SecretKey};
use once_cell::sync::Lazy;
use pool_sv2::PoolSv2;
use rand::{thread_rng, Rng};
use sniffer::Sniffer;
pub use sniffer::{InterceptMessage, MessageDirection};
use std::{
collections::HashSet,
convert::{TryFrom, TryInto},
Expand All @@ -20,6 +21,7 @@ use std::{
sync::Mutex,
};
use tar::Archive;
use translator_sv2::TranslatorSv2;

// prevents get_available_port from ever returning the same port twice
static UNIQUE_PORTS: Lazy<Mutex<HashSet<u16>>> = Lazy::new(|| Mutex::new(HashSet::new()));
Expand Down Expand Up @@ -187,11 +189,11 @@ pub fn get_available_address() -> SocketAddr {

pub async fn start_sniffer(
identifier: String,
listening_address: SocketAddr,
upstream: SocketAddr,
check_on_drop: bool,
intercept_message: Option<Vec<InterceptMessage>>,
) -> Sniffer {
intercept_message: Option<Vec<sniffer::InterceptMessage>>,
) -> (Sniffer, SocketAddr) {
let listening_address = get_available_address();
let sniffer = Sniffer::new(
identifier,
listening_address,
Expand All @@ -204,7 +206,7 @@ pub async fn start_sniffer(
tokio::spawn(async move {
sniffer_clone.start().await;
});
sniffer
(sniffer, listening_address)
}

#[derive(Debug)]
Expand Down Expand Up @@ -267,32 +269,31 @@ impl TestPoolSv2 {
}
}

pub async fn start_pool(
listening_address: Option<SocketAddr>,
template_provider_address: Option<SocketAddr>,
) -> PoolSv2 {
let test_pool = TestPoolSv2::new(listening_address, template_provider_address);
pub async fn start_pool(template_provider_address: Option<SocketAddr>) -> (PoolSv2, SocketAddr) {
let listening_address = get_available_address();
let test_pool = TestPoolSv2::new(Some(listening_address), template_provider_address);
let pool = test_pool.pool.clone();
let pool_clone = pool.clone();
tokio::task::spawn(async move {
assert!(pool_clone.start().await.is_ok());
});
// Wait a bit to let the pool exchange initial messages with the TP
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
pool
(pool, listening_address)
}

pub async fn start_template_provider(tp_port: u16) -> TemplateProvider {
let template_provider = TemplateProvider::start(tp_port);
pub async fn start_template_provider() -> (TemplateProvider, SocketAddr) {
let address = get_available_address();
let template_provider = TemplateProvider::start(address.port());
template_provider.generate_blocks(16);
template_provider
(template_provider, address)
}

pub async fn start_jdc(
pool_address: SocketAddr,
tp_address: SocketAddr,
jds_address: SocketAddr,
) -> SocketAddr {
) -> (JobDeclaratorClient, SocketAddr) {
use jd_client::proxy_config::{
CoinbaseOutput, PoolConfig, ProtocolConfig, ProxyConfig, TPConfig, Upstream,
};
Expand Down Expand Up @@ -343,13 +344,14 @@ pub async fn start_jdc(
std::time::Duration::from_secs(cert_validity_sec),
);
let ret = jd_client::JobDeclaratorClient::new(jd_client_proxy);
tokio::spawn(async move { ret.start().await });
let ret_clone = ret.clone();
tokio::spawn(async move { ret_clone.start().await });
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This ret can be changed to job_declarator_client

tokio::time::sleep(std::time::Duration::from_secs(2)).await;
jdc_address
(ret, jdc_address)
}

pub async fn start_jds(tp_address: SocketAddr) -> SocketAddr {
use jd_server::{CoinbaseOutput, Configuration, CoreRpc, JobDeclaratorServer};
pub async fn start_jds(tp_address: SocketAddr) -> (JobDeclaratorServer, SocketAddr) {
use jd_server::{CoinbaseOutput, Configuration, CoreRpc};
let authority_public_key = Secp256k1PublicKey::try_from(
"9auqWEzQDVyd2oe1JVGFLMLHZtCo2FFqZwtKA5gd9xbuEu7PH72".to_string(),
)
Expand Down Expand Up @@ -379,14 +381,16 @@ pub async fn start_jds(tp_address: SocketAddr) -> SocketAddr {
core_rpc,
std::time::Duration::from_secs(1),
);
let job_declarator_server = JobDeclaratorServer::new(config);
let job_declarator_server_clone = job_declarator_server.clone();
tokio::spawn(async move {
JobDeclaratorServer::new(config).start().await;
job_declarator_server_clone.start().await;
});
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
listen_jd_address
(job_declarator_server, listen_jd_address)
}

pub async fn start_sv2_translator(upstream: SocketAddr) -> SocketAddr {
pub async fn start_sv2_translator(upstream: SocketAddr) -> (TranslatorSv2, SocketAddr) {
let upstream_address = upstream.ip().to_string();
let upstream_port = upstream.port();
let upstream_authority_pubkey = Secp256k1PublicKey::try_from(
Expand Down Expand Up @@ -428,11 +432,12 @@ pub async fn start_sv2_translator(upstream: SocketAddr) -> SocketAddr {
let config =
translator_sv2::proxy_config::ProxyConfig::new(upstream_conf, downstream_conf, 2, 2, 8);
let translator_v2 = translator_sv2::TranslatorSv2::new(config);
let clone_translator_v2 = translator_v2.clone();
tokio::spawn(async move {
translator_v2.start().await;
clone_translator_v2.start().await;
});
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
listening_address
(translator_v2, listening_address)
}

fn measure_hashrate(duration_secs: u64) -> f64 {
Expand Down
51 changes: 34 additions & 17 deletions roles/tests-integration/tests/common/sniffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,10 @@ impl Sniffer {
check_on_drop: bool,
intercept_messages: Option<Vec<InterceptMessage>>,
) -> Self {
// Don't print backtrace on panic
std::panic::set_hook(Box::new(|_| {
println!();
}));
Self {
identifier,
listening_address,
Expand Down Expand Up @@ -576,23 +580,36 @@ macro_rules! assert_jd_message {
impl Drop for Sniffer {
fn drop(&mut self) {
if self.check_on_drop {
// Don't print backtrace on panic
std::panic::set_hook(Box::new(|_| {
println!();
}));
if !self.messages_from_downstream.is_empty() {
println!(
"Sniffer {}: You didn't handle all downstream messages: {:?}",
self.identifier, self.messages_from_downstream
);
panic!();
}
if !self.messages_from_upstream.is_empty() {
println!(
"Sniffer{}: You didn't handle all upstream messages: {:?}",
self.identifier, self.messages_from_upstream
);
panic!();
match (
self.messages_from_downstream.is_empty(),
self.messages_from_upstream.is_empty(),
) {
(true, true) => {}
(true, false) => {
println!(
"Sniffer {}: You didn't handle all upstream messages: {:?}",
self.identifier, self.messages_from_upstream
);
panic!();
}
(false, true) => {
println!(
"Sniffer {}: You didn't handle all downstream messages: {:?}",
self.identifier, self.messages_from_downstream
);
panic!();
}
(false, false) => {
println!(
"Sniffer {}: You didn't handle all downstream messages: {:?}",
self.identifier, self.messages_from_downstream
);
println!(
"Sniffer {}: You didn't handle all upstream messages: {:?}",
self.identifier, self.messages_from_upstream
);
panic!();
}
}
}
}
Expand Down
61 changes: 4 additions & 57 deletions roles/tests-integration/tests/pool_integration.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
mod common;

use std::convert::TryInto;

use common::{InterceptMessage, MessageDirection};
use const_sv2::MESSAGE_TYPE_SETUP_CONNECTION_ERROR;
use roles_logic_sv2::{
common_messages_sv2::{Protocol, SetupConnection, SetupConnectionError},
common_messages_sv2::{Protocol, SetupConnection},
parsers::{CommonMessages, PoolMessages, TemplateDistribution},
};

Expand All @@ -15,22 +11,9 @@ use roles_logic_sv2::{
// Pool will connect to the Sniffer, and the Sniffer will connect to the Template Provider.
#[tokio::test]
async fn success_pool_template_provider_connection() {
let sniffer_addr = common::get_available_address();
let tp_addr = common::get_available_address();
let pool_addr = common::get_available_address();
let _tp = common::start_template_provider(tp_addr.port()).await;
let sniffer_identifier =
"success_pool_template_provider_connection tp_pool sniffer".to_string();
let sniffer_check_on_drop = true;
let sniffer = common::start_sniffer(
sniffer_identifier,
sniffer_addr,
tp_addr,
sniffer_check_on_drop,
None,
)
.await;
let _ = common::start_pool(Some(pool_addr), Some(sniffer_addr)).await;
let (_tp, tp_addr) = common::start_template_provider().await;
let (sniffer, sniffer_addr) = common::start_sniffer("".to_string(), tp_addr, true, None).await;
let _ = common::start_pool(Some(sniffer_addr)).await;
// here we assert that the downstream(pool in this case) have sent `SetupConnection` message
// with the correct parameters, protocol, flags, min_version and max_version. Note that the
// macro can take any number of arguments after the message argument, but the order is
Expand Down Expand Up @@ -58,39 +41,3 @@ async fn success_pool_template_provider_connection() {
assert_tp_message!(&sniffer.next_message_from_upstream(), NewTemplate);
assert_tp_message!(sniffer.next_message_from_upstream(), SetNewPrevHash);
}

#[tokio::test]
async fn test_sniffer_interrupter() {
let sniffer_addr = common::get_available_address();
let tp_addr = common::get_available_address();
let pool_addr = common::get_available_address();
let _tp = common::start_template_provider(tp_addr.port()).await;
use const_sv2::MESSAGE_TYPE_SETUP_CONNECTION_SUCCESS;
let message =
PoolMessages::Common(CommonMessages::SetupConnectionError(SetupConnectionError {
flags: 0,
error_code: "unsupported-feature-flags"
.to_string()
.into_bytes()
.try_into()
.unwrap(),
}));
let interrupt_msgs = InterceptMessage::new(
MessageDirection::ToDownstream,
MESSAGE_TYPE_SETUP_CONNECTION_SUCCESS,
message,
MESSAGE_TYPE_SETUP_CONNECTION_ERROR,
true,
);
let sniffer = common::start_sniffer(
"1".to_string(),
sniffer_addr,
tp_addr,
false,
Some(vec![interrupt_msgs]),
)
.await;
let _ = common::start_pool(Some(pool_addr), Some(sniffer_addr)).await;
assert_common_message!(&sniffer.next_message_from_downstream(), SetupConnection);
assert_common_message!(&sniffer.next_message_from_upstream(), SetupConnectionError);
}
Loading
Loading