Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: create importer online component #1541

Merged
merged 59 commits into from
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
3dbe1da
initial commit
gabriel-aranha-cw Jul 25, 2024
e22407a
temp fix
gabriel-aranha-cw Jul 25, 2024
e443aa7
add test
gabriel-aranha-cw Jul 25, 2024
3c35bc8
Merge branch 'main' into importer-component
gabriel-aranha-cw Jul 25, 2024
eeea4cc
todo: fix consensus and chain
gabriel-aranha-cw Jul 25, 2024
cf5eec2
fix chain
gabriel-aranha-cw Jul 25, 2024
356ced3
fix build errors
gabriel-aranha-cw Jul 25, 2024
5b6dd49
Merge branch 'main' into importer-component
gabriel-aranha-cw Jul 26, 2024
84eeccd
fix modes
gabriel-aranha-cw Jul 26, 2024
c97c691
lint
gabriel-aranha-cw Jul 26, 2024
1ddc779
fix
gabriel-aranha-cw Jul 26, 2024
da3dd54
rename
gabriel-aranha-cw Jul 26, 2024
3f6a381
fix
gabriel-aranha-cw Jul 26, 2024
014686a
fix chain
gabriel-aranha-cw Jul 26, 2024
09a2775
rename
gabriel-aranha-cw Jul 26, 2024
c484bfb
refac and implement simpleconsensus
carneiro-cw Jul 26, 2024
fe03ff4
add simple consensus into main
gabriel-aranha-cw Jul 26, 2024
02aa0cc
lint
gabriel-aranha-cw Jul 26, 2024
79f66a8
Merge branch 'main' into importer-component
gabriel-aranha-cw Jul 26, 2024
b682c62
fix: remove optional http url
gabriel-aranha-cw Jul 26, 2024
49aedae
Merge branch 'importer-component' of https://github.com/cloudwalk/str…
gabriel-aranha-cw Jul 26, 2024
0d09c25
fix: review
gabriel-aranha-cw Jul 26, 2024
d85b0cf
temp fix
gabriel-aranha-cw Jul 26, 2024
a4c2b86
comment
gabriel-aranha-cw Jul 26, 2024
6821f5a
Merge branch 'main' into importer-component
gabriel-aranha-cw Jul 26, 2024
a0a038b
Merge branch 'main' into importer-component
gabriel-aranha-cw Jul 29, 2024
5719ed4
fix init
gabriel-aranha-cw Jul 29, 2024
facad2f
change to spawn named
gabriel-aranha-cw Jul 29, 2024
2a57d9c
add blockchain client graceful shutdown
gabriel-aranha-cw Jul 29, 2024
7bc4092
improve shutdown warnings
gabriel-aranha-cw Jul 29, 2024
f0e9995
improve shutdown
gabriel-aranha-cw Jul 29, 2024
8628c54
small refactoring
gabriel-aranha-cw Jul 29, 2024
e30cc9f
revert optional args and add validations
gabriel-aranha-cw Jul 29, 2024
f70dd4a
revert
gabriel-aranha-cw Jul 29, 2024
6f8e0d4
fix
gabriel-aranha-cw Jul 29, 2024
b01fb3d
Merge branch 'main' into importer-component
gabriel-aranha-cw Jul 30, 2024
4b8ffc9
fix just file
gabriel-aranha-cw Jul 30, 2024
9342f01
remove unused test
gabriel-aranha-cw Jul 30, 2024
9d121b8
Merge branch 'main' into importer-component
gabriel-aranha-cw Jul 30, 2024
e442359
remove default flag
gabriel-aranha-cw Jul 30, 2024
43fab58
Merge branch 'importer-component' of https://github.com/cloudwalk/str…
gabriel-aranha-cw Jul 30, 2024
5755508
fix missing flag
gabriel-aranha-cw Jul 30, 2024
8c3fe8a
fix missing flags
gabriel-aranha-cw Jul 30, 2024
4e9cd7a
Merge branch 'main' into importer-component
gabriel-aranha-cw Jul 30, 2024
4b04de0
moving mode config var to stratus config
gabriel-aranha-cw Jul 30, 2024
f8ec01c
Merge branch 'main' into importer-component
gabriel-aranha-cw Jul 30, 2024
86f383e
Merge branch 'main' into importer-component
gabriel-aranha-cw Jul 30, 2024
f8df837
Merge branch 'main' into importer-component
gabriel-aranha-cw Jul 31, 2024
8d03a9d
fix raft
gabriel-aranha-cw Jul 31, 2024
dd24964
improve validation
gabriel-aranha-cw Jul 31, 2024
7d29875
naming
gabriel-aranha-cw Jul 31, 2024
a69ab65
remove unused
gabriel-aranha-cw Jul 31, 2024
b8bf98f
remove unused
gabriel-aranha-cw Jul 31, 2024
d708b42
comments
gabriel-aranha-cw Jul 31, 2024
2892741
add ws check
gabriel-aranha-cw Jul 31, 2024
337f8e5
lint
gabriel-aranha-cw Jul 31, 2024
b70c6f9
fmt
gabriel-aranha-cw Jul 31, 2024
8adb599
Merge branch 'main' into importer-component
gabriel-aranha-cw Jul 31, 2024
1047336
Merge branch 'main' into importer-component
gabriel-aranha-cw Jul 31, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
name: E2E Run With Importer
name: E2E Importer

on:
pull_request:
Expand Down Expand Up @@ -26,10 +26,10 @@ on:
- 'Cargo.toml'

jobs:
run_with_importer_test:
importer_test:
strategy:
fail-fast: false
name: E2E Run With Importer on BRLCToken
name: E2E Importer on BRLCToken
runs-on: ubuntu-latest
timeout-minutes: 45

Expand Down
8 changes: 0 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -155,14 +155,6 @@ path = "src/bin/rpc_downloader.rs"
name = "importer-offline"
path = "src/bin/importer_offline.rs"

[[bin]]
name = "importer-online"
path = "src/bin/importer_online.rs"

[[bin]]
name = "run-with-importer"
path = "src/bin/run_with_importer.rs"

# ------------------------------------------------------------------------------
# Features
# ------------------------------------------------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion e2e/cloudwalk-contracts/integration/test/helpers/rpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ export let ETHERJS = new JsonRpcProvider(providerUrl, undefined);

export function updateProviderUrl(providerName: string) {
switch (providerName) {
case "run-with-importer":
case "importer":
providerUrl = "http://localhost:3001?app=e2e";
break;
case "stratus":
Expand Down
54 changes: 27 additions & 27 deletions e2e/cloudwalk-contracts/integration/test/importer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import {
updateProviderUrl,
} from "./helpers/rpc";

describe("Run With Importer integration test", function () {
describe("Importer integration test", function () {
before(async function () {
await setDeployer();
});
Expand Down Expand Up @@ -92,7 +92,7 @@ describe("Run With Importer integration test", function () {
}
});

it(`${params.name}: Validate transaction mined delay between Stratus and Run With Importer`, async function () {
it(`${params.name}: Validate transaction mined delay between Stratus and Importer`, async function () {
// Get Stratus timestamps
updateProviderUrl("stratus");
const stratusTimestamps = await Promise.all(
Expand All @@ -103,9 +103,9 @@ describe("Run With Importer integration test", function () {
}),
);

// Get Run With Importer timestamps
updateProviderUrl("run-with-importer");
const runWithImporterTimestamps = await Promise.all(
// Get Importer timestamps
updateProviderUrl("importer");
const importerTimestamps = await Promise.all(
txHashList.map(async (txHash) => {
const receipt = await sendWithRetry("eth_getTransactionReceipt", [txHash], 20);
const block = await sendWithRetry("eth_getBlockByNumber", [receipt.blockNumber, false]);
Expand All @@ -116,17 +116,17 @@ describe("Run With Importer integration test", function () {
// Total time it took for Stratus to process all the blocks containing transactions
const stratusProcessingTime = stratusTimestamps[stratusTimestamps.length - 1] - stratusTimestamps[0];

// Total time it took for Run With Importer to process all the blocks containing transactions
const runWithImporterProcessingTime =
runWithImporterTimestamps[runWithImporterTimestamps.length - 1] - runWithImporterTimestamps[0];
// Total time it took for Importer to process all the blocks containing transactions
const importerProcessingTime =
importerTimestamps[importerTimestamps.length - 1] - importerTimestamps[0];

console.log(` ✔ Number of transactions sent: ${txHashList.length}`);
console.log(
` ✔ Stratus processing time: ${stratusProcessingTime}s | Run With Importer processing time: ${runWithImporterProcessingTime}s`,
` ✔ Stratus processing time: ${stratusProcessingTime}s | Importer processing time: ${importerProcessingTime}s`,
);
});

it(`${params.name}: Validate all transactions were imported from Stratus to Run With Importer`, async function () {
it(`${params.name}: Validate all transactions were imported from Stratus to Importer`, async function () {
// Get Stratus transaction receipts
updateProviderUrl("stratus");
const stratusReceipts = await Promise.all(
Expand All @@ -136,9 +136,9 @@ describe("Run With Importer integration test", function () {
}),
);

// Get Run With Importer transaction receipts
updateProviderUrl("run-with-importer");
const runWithImporterReceipts = await Promise.all(
// Get Importer transaction receipts
updateProviderUrl("importer");
const importerReceipts = await Promise.all(
txHashList.map(async (txHash) => {
const receipt = await sendWithRetry("eth_getTransactionReceipt", [txHash]);
return receipt;
Expand All @@ -148,11 +148,11 @@ describe("Run With Importer integration test", function () {
// Assert that all transactions were imported
for (let i = 0; i < txHashList.length; i++) {
expect(stratusReceipts[i]).to.exist;
expect(runWithImporterReceipts[i]).to.exist;
expect(importerReceipts[i]).to.exist;
}
});

it(`${params.name}: Validate each transaction was imported into the same block between Stratus and Run With Importer`, async function () {
it(`${params.name}: Validate each transaction was imported into the same block between Stratus and Importer`, async function () {
// Get Stratus block numbers
updateProviderUrl("stratus");
const stratusBlockNumbers = await Promise.all(
Expand All @@ -162,38 +162,38 @@ describe("Run With Importer integration test", function () {
}),
);

// Get Run With Importer block numbers
updateProviderUrl("run-with-importer");
const runWithImporterBlockNumbers = await Promise.all(
// Get Importer block numbers
updateProviderUrl("importer");
const importerBlockNumbers = await Promise.all(
txHashList.map(async (txHash) => {
const receipt = await sendWithRetry("eth_getTransactionReceipt", [txHash], 20);
return receipt.blockNumber;
}),
);

// Assert that each transaction fell into the same block between Stratus and Run With Importer
// Assert that each transaction fell into the same block between Stratus and Importer
for (let i = 0; i < txHashList.length; i++) {
expect(
stratusBlockNumbers[i],
`Transaction ${txHashList[i]} did not fall into the same block between Stratus and Run With Importer`,
).to.equal(runWithImporterBlockNumbers[i]);
`Transaction ${txHashList[i]} did not fall into the same block between Stratus and Importer`,
).to.equal(importerBlockNumbers[i]);
}
});

it(`${params.name}: Validate balances between Stratus and Run With Importer`, async function () {
it(`${params.name}: Validate balances between Stratus and Importer`, async function () {
for (let i = 0; i < wallets.length; i++) {
// Get Stratus balance
updateProviderUrl("stratus");
const stratusBalance = await brlcToken.balanceOf(wallets[i].address);

// Get Run With Importer balance
updateProviderUrl("run-with-importer");
const runWithImporterBalance = await brlcToken.balanceOf(wallets[i].address);
// Get Importer balance
updateProviderUrl("importer");
const importerBalance = await brlcToken.balanceOf(wallets[i].address);

// Assert that the balances are equal
expect(stratusBalance).to.equal(
runWithImporterBalance,
`Wallet ${wallets[i].address} balances are not equal between Stratus and Run With Importer`,
importerBalance,
`Wallet ${wallets[i].address} balances are not equal between Stratus and Importer`,
);
}
updateProviderUrl("stratus");
Expand Down
24 changes: 10 additions & 14 deletions justfile
Original file line number Diff line number Diff line change
Expand Up @@ -312,22 +312,22 @@ e2e-importer-online:
e2e-importer-online-up:
#!/bin/bash

# Build Stratus and Run With Importer binaries
echo "Building Stratus and Run With Importer binaries"
cargo build --release --bin stratus --bin run-with-importer --features dev
# Build Stratus binary
echo "Building Stratus binary"
cargo build --release --bin stratus --features dev

mkdir e2e_logs

# Start Stratus binary
RUST_LOG=info cargo run --release --bin stratus --features dev -- --block-mode 1s --enable-genesis --enable-test-accounts --perm-storage=rocks --rocks-path-prefix=temp_3000 --tokio-console-address=0.0.0.0:6668 --metrics-exporter-address=0.0.0.0:9000 -a 0.0.0.0:3000 > e2e_logs/stratus.log &
# Start Stratus with leader flag
RUST_LOG=info cargo run --release --bin stratus --features dev -- --mode leader --block-mode 1s --enable-genesis --enable-test-accounts --perm-storage=rocks --rocks-path-prefix=temp_3000 --tokio-console-address=0.0.0.0:6668 --metrics-exporter-address=0.0.0.0:9000 -a 0.0.0.0:3000 > e2e_logs/stratus.log &

# Wait for Stratus to start
# Wait for Stratus with leader flag to start
wait-service --tcp 0.0.0.0:3000 -t {{ wait_service_timeout }} -- echo

# Start Run With Importer binary
RUST_LOG=info cargo run --release --bin run-with-importer --features dev -- --block-mode 1s --enable-test-accounts --perm-storage=rocks --rocks-path-prefix=temp_3001 --tokio-console-address=0.0.0.0:6669 --metrics-exporter-address=0.0.0.0:9001 -a 0.0.0.0:3001 -r http://0.0.0.0:3000/ -w ws://0.0.0.0:3000/ > e2e_logs/run_with_importer.log &
# Start Stratus with follower flag
RUST_LOG=info cargo run --release --bin stratus --features dev -- --mode follower --enable-test-accounts --perm-storage=rocks --rocks-path-prefix=temp_3001 --tokio-console-address=0.0.0.0:6669 --metrics-exporter-address=0.0.0.0:9001 -a 0.0.0.0:3001 -r http://0.0.0.0:3000/ -w ws://0.0.0.0:3000/ > e2e_logs/importer.log &

# Wait for Run With Importer to start
# Wait for Stratus with follower flag to start
wait-service --tcp 0.0.0.0:3001 -t {{ wait_service_timeout }} -- echo

if [ -d e2e/cloudwalk-contracts ]; then
Expand All @@ -349,12 +349,8 @@ e2e-importer-online-up:
e2e-importer-online-down:
#!/bin/bash

# Kill run-with-importer
killport 3001
run_with_importer_pid=$(pgrep -f 'run-with-importer')
kill $run_with_importer_pid

# Kill Stratus
killport 3001
killport 3000
stratus_pid=$(pgrep -f 'stratus')
kill $stratus_pid
Expand Down
2 changes: 1 addition & 1 deletion src/bin/rpc_downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ async fn run(config: RpcDownloaderConfig) -> anyhow::Result<()> {
let _timer = DropTimer::start("rpc-downloader");

let rpc_storage = config.rpc_storage.init().await?;
let chain = Arc::new(BlockchainClient::new_http(&config.external_rpc, config.external_rpc_timeout).await?);
let chain = Arc::new(BlockchainClient::new_http(Some(&config.external_rpc), config.external_rpc_timeout).await?);

let block_end = match config.block_end {
Some(end) => BlockNumber::from(end),
Expand Down
78 changes: 0 additions & 78 deletions src/bin/run_with_importer.rs

This file was deleted.

35 changes: 35 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use tokio::runtime::Builder;
use tokio::runtime::Runtime;

use crate::eth::executor::ExecutorConfig;
use crate::eth::importer::ImporterConfig;
use crate::eth::miner::MinerConfig;
use crate::eth::primitives::Address;
use crate::eth::rpc::RpcServerConfig;
Expand Down Expand Up @@ -63,6 +64,10 @@ pub struct CommonConfig {
#[arg(long = "env", env = "ENV", default_value = "local")]
pub env: Environment,

/// Stratus mode.
#[arg(long = "mode", env = "MODE", default_value = "leader")] // should we leave a default value?
pub mode: StratusMode,

/// Number of threads to execute global async tasks.
#[arg(long = "async-threads", env = "ASYNC_THREADS", default_value = "10")]
pub num_async_threads: usize,
Expand Down Expand Up @@ -169,6 +174,9 @@ pub struct StratusConfig {
#[clap(flatten)]
pub miner: MinerConfig,

#[clap(flatten)]
pub importer: ImporterConfig,
gabriel-aranha-cw marked this conversation as resolved.
Show resolved Hide resolved

#[deref]
#[clap(flatten)]
pub common: CommonConfig,
Expand Down Expand Up @@ -453,6 +461,33 @@ impl FromStr for Environment {
}
}

// -----------------------------------------------------------------------------
// Enum: Stratus Mode
// -----------------------------------------------------------------------------
#[derive(DebugAsJson, strum::Display, strum::VariantNames, Clone, Copy, Parser, serde::Serialize)]
pub enum StratusMode {
gabriel-aranha-cw marked this conversation as resolved.
Show resolved Hide resolved
#[serde(rename = "leader")]
#[strum(to_string = "leader")]
Leader,

#[serde(rename = "follower")]
#[strum(to_string = "follower")]
Follower,
}

impl FromStr for StratusMode {
type Err = anyhow::Error;

fn from_str(s: &str) -> anyhow::Result<Self, Self::Err> {
let s = s.trim().to_lowercase();
match s.as_ref() {
"leader" => Ok(Self::Leader),
"follower" => Ok(Self::Follower),
s => Err(anyhow!("unknown stratus mode: \"{}\" - valid values are {:?}", s, StratusMode::VARIANTS)),
}
}
}

// -----------------------------------------------------------------------------
// Enum: ValidatorMethodConfig
// -----------------------------------------------------------------------------
Expand Down
6 changes: 4 additions & 2 deletions src/eth/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ impl Consensus {
tracing::info!(http_url = http_url, "changing blockchain client");

*blockchain_client_lock = Some(
BlockchainClient::new_http(&http_url, Duration::from_secs(2))
BlockchainClient::new_http(Some(&http_url), Duration::from_secs(2))
.await
.expect("failed to create blockchain client")
.into(),
Expand Down Expand Up @@ -499,7 +499,9 @@ impl Consensus {
#[cfg(feature = "metrics")]
metrics::inc_consensus_forward(start.elapsed());

Ok((result.tx_hash, blockchain_client.http_url.clone())) //XXX HEX
let http_url = blockchain_client.http_url.clone().ok_or(anyhow::anyhow!("HTTP URL is not set"))?;

Ok((result.tx_hash, http_url)) //XXX HEX
}

pub async fn should_serve(&self) -> bool {
Expand Down
Loading