From 64daad0ca0034513af11d6d9207dfe8fe779e1bf Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw <166405807+gabriel-aranha-cw@users.noreply.github.com> Date: Wed, 31 Jul 2024 16:55:21 -0300 Subject: [PATCH] feat: create importer online component (#1541) * initial commit * temp fix * add test * todo: fix consensus and chain * fix chain * fix build errors * fix modes * lint * fix * rename * fix * fix chain * rename * refac and implement simpleconsensus * add simple consensus into main * lint * fix: remove optional http url * fix: review * temp fix * comment * fix init * change to spawn named * add blockchain client graceful shutdown * improve shutdown warnings * improve shutdown * small refactoring * revert optional args and add validations * revert * fix * fix just file * remove unused test * remove default flag * fix missing flag * fix missing flags * moving mode config var to stratus config * fix raft * improve validation * naming * remove unused * remove unused * comments * add ws check * lint * fmt --------- Co-authored-by: Daniel Freire --- .github/workflows/e2e-generic.yml | 95 --- ...run-with-importer.yml => e2e-importer.yml} | 6 +- Cargo.toml | 8 - chaos/experiments/main.sh | 345 --------- .../integration/test/helpers/rpc.ts | 2 +- .../integration/test/importer.test.ts | 54 +- .../integration/test/simple.test.ts | 114 --- justfile | 68 +- src/bin/importer_online.rs | 420 ----------- src/bin/run_with_importer.rs | 78 -- src/config.rs | 132 +--- src/eth/consensus/mod.rs | 669 +---------------- .../{ => raft}/append_log_entries_storage.rs | 2 +- src/eth/consensus/{ => raft}/discovery.rs | 6 +- src/eth/consensus/{ => raft}/log_entry.rs | 0 src/eth/consensus/raft/mod.rs | 683 ++++++++++++++++++ src/eth/consensus/{ => raft}/propagation.rs | 36 +- src/eth/consensus/{ => raft}/server.rs | 34 +- .../consensus/{ => raft}/tests/factories.rs | 38 +- .../{ => raft}/tests/test_simple_blocks.rs | 14 +- src/eth/consensus/{ => raft}/utils.rs | 0 src/eth/consensus/simple_consensus/mod.rs | 75 ++ src/eth/importer/importer.rs | 437 +++++++++++ src/eth/importer/importer_config.rs | 67 ++ src/eth/importer/mod.rs | 6 + src/eth/miner/miner.rs | 7 +- src/eth/mod.rs | 1 + src/eth/primitives/block_header.rs | 2 +- src/eth/primitives/transaction_execution.rs | 6 +- src/eth/rpc/rpc_context.rs | 2 +- src/eth/rpc/rpc_server.rs | 11 +- src/globals.rs | 10 - .../blockchain_client/blockchain_client.rs | 6 + src/infra/build_info.rs | 21 +- src/main.rs | 64 +- 35 files changed, 1467 insertions(+), 2052 deletions(-) delete mode 100644 .github/workflows/e2e-generic.yml rename .github/workflows/{e2e-run-with-importer.yml => e2e-importer.yml} (95%) delete mode 100755 chaos/experiments/main.sh delete mode 100644 e2e/cloudwalk-contracts/integration/test/simple.test.ts delete mode 100644 src/bin/importer_online.rs delete mode 100644 src/bin/run_with_importer.rs rename src/eth/consensus/{ => raft}/append_log_entries_storage.rs (99%) rename src/eth/consensus/{ => raft}/discovery.rs (97%) rename src/eth/consensus/{ => raft}/log_entry.rs (100%) create mode 100644 src/eth/consensus/raft/mod.rs rename src/eth/consensus/{ => raft}/propagation.rs (92%) rename src/eth/consensus/{ => raft}/server.rs (97%) rename src/eth/consensus/{ => raft}/tests/factories.rs (89%) rename src/eth/consensus/{ => raft}/tests/test_simple_blocks.rs (95%) rename src/eth/consensus/{ => raft}/utils.rs (100%) create mode 100644 src/eth/consensus/simple_consensus/mod.rs create mode 100644 src/eth/importer/importer.rs create mode 100644 
src/eth/importer/importer_config.rs create mode 100644 src/eth/importer/mod.rs diff --git a/.github/workflows/e2e-generic.yml b/.github/workflows/e2e-generic.yml deleted file mode 100644 index 60240e05d..000000000 --- a/.github/workflows/e2e-generic.yml +++ /dev/null @@ -1,95 +0,0 @@ -name: E2E Generic - -on: - pull_request: - branches: - - '*' - paths-ignore: - - '.github/workflows/deploy.yml' - - '.github/workflows/docs-release.yml' - - '.github/workflows/outdated.yml' - - '.github/workflows/comment-tag-report.yml' - - '.github/workflows/pr-agent.yml' - - '.github/workflows/build-binary.yml' - - '.github/CODEOWNERS' - - 'config/**' - - 'README.md' - - 'LICENSE' - - 'CONTRIBUTING.md' - - 'utils/slack-notifiers/**' - workflow_dispatch: - -jobs: - e2e_generic: - strategy: - fail-fast: false - matrix: - include: - # - leader-restart: true - # instances: 2 - # iterations: 1 - # - leader-restart: true - # instances: 3 - # iterations: 1 - # - leader-restart: false - # instances: 2 - # iterations: 1 - # - leader-restart: false - # instances: 3 - # iterations: 1 - - leader-restart: true - instances: 1 - iterations: 2 - - name: E2E Generic with (${{ matrix.instances }}, ${{ matrix.leader-restart }}) - runs-on: ubuntu-latest - timeout-minutes: 45 - - concurrency: - group: ${{ github.workflow }}-${{ matrix.leader-restart }}-${{ matrix.instances }}-${{ matrix.iterations }}-${{ github.ref || github.run_id }} - cancel-in-progress: true - - steps: - - name: Checkout code - uses: actions/checkout@v4 - - - name: Set up Rust - run: curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y --default-toolchain 1.79 - - - name: Rust Cache - uses: Swatinem/rust-cache@v2 - id: cache-cargo - with: - prefix-key: ${{ runner.os }}-v3-cargo - shared-key: stable-release - key: ${{ hashFiles('Cargo.lock', 'Cargo.toml') }} - cache-provider: "github" - cache-directories: "~/.cargo/bin/" - - - name: Install protoc - run: sudo apt-get install -y protobuf-compiler - - - name: Install jq - run: sudo apt-get install jq -y - - - name: Set up Just - uses: extractions/setup-just@v2 - - - name: Set up dependencies - if: ${{ steps.cache-cargo.outputs.cache-hit != 'true' }} - run: | - cargo install killport || true - cargo install wait-service || true - - - name: Clone all contracts - run: just contracts-clone --token - - - name: Flatten all contracts - run: just contracts-flatten --token - - - name: Run e2e tests - run: just run-chaos-experiment stratus ${{ matrix.instances }} ${{ matrix.iterations }} ${{ matrix.leader-restart }} main - env: - CARGO_PROFILE_RELEASE_DEBUG: 0 - RUST_LOG: error - RELEASE: 1 diff --git a/.github/workflows/e2e-run-with-importer.yml b/.github/workflows/e2e-importer.yml similarity index 95% rename from .github/workflows/e2e-run-with-importer.yml rename to .github/workflows/e2e-importer.yml index cec094538..e61d77ba0 100644 --- a/.github/workflows/e2e-run-with-importer.yml +++ b/.github/workflows/e2e-importer.yml @@ -1,4 +1,4 @@ -name: E2E Run With Importer +name: E2E Importer on: pull_request: @@ -26,10 +26,10 @@ on: - 'Cargo.toml' jobs: - run_with_importer_test: + importer_test: strategy: fail-fast: false - name: E2E Run With Importer on BRLCToken + name: E2E Importer on BRLCToken runs-on: ubuntu-latest timeout-minutes: 45 diff --git a/Cargo.toml b/Cargo.toml index fd222dda1..1bd97254a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -156,14 +156,6 @@ path = "src/bin/rpc_downloader.rs" name = "importer-offline" path = "src/bin/importer_offline.rs" -[[bin]] -name = "importer-online" -path = 
"src/bin/importer_online.rs" - -[[bin]] -name = "run-with-importer" -path = "src/bin/run_with_importer.rs" - # ------------------------------------------------------------------------------ # Features # ------------------------------------------------------------------------------ diff --git a/chaos/experiments/main.sh b/chaos/experiments/main.sh deleted file mode 100755 index cb527ce94..000000000 --- a/chaos/experiments/main.sh +++ /dev/null @@ -1,345 +0,0 @@ -#!/bin/bash - -set -e - -# Default binary -binary="stratus" - -# Default number of instances -num_instances=3 - -# Default number of iterations -iterations=4 - -# Flag for enabling leader restart feature -enable_leader_restart=false - -# Parse command-line options -while [[ "$#" -gt 0 ]]; do - case $1 in - --bin) - binary="$2" - shift 2 - ;; - --instances) - num_instances="$2" - shift 2 - ;; - --iterations) - iterations="$2" - shift 2 - ;; - --enable-leader-restart) - if [[ "$2" == "true" || "$2" == "false" ]]; then - enable_leader_restart="$2" - shift 2 - else - echo "Error: --enable-leader-restart must be followed by true or false" - exit 1 - fi - ;; - *) - echo "Unknown parameter passed: $1" - echo "Usage: $0 [--bin binary] [--instances number] [--iterations number] [--enable-leader-restart true|false]" - exit 1 - ;; - esac -done - -echo "Using binary: $binary" -echo "Number of instances: $num_instances" -echo "Number of iterations: $iterations" -echo "Enable leader restart: $enable_leader_restart" - -cleanup() { - exit_code=$? - echo "Cleaning up..." - for port in "${ports[@]}"; do - killport --quiet $port || true - done - - sleep 2 - if [ $exit_code -eq 0 ]; then - rm instance_*.log 2>/dev/null || true - find . -xdev -type d -name "instance_*" -print0 | xargs -0 rm -rf 2>/dev/null || true - fi - rm -rf tmp_rocks_* 2>/dev/null || true - find . -xdev -type d -name "tmp_rocks_*" -print0 | xargs -0 rm -rf 2>/dev/null || true - echo "Job is done. With exit code $exit_code" -} -trap cleanup EXIT INT TERM - -send_transactions() { - echo "running transactions test" - - export STRATUS_PORT=3001 #TODO implement roundrobin on all nodes - cd ./e2e/cloudwalk-contracts/integration - npx hardhat test test/simple.test.ts --network stratus --bail - test_exit_code=$? - cd ../../.. - - if [ $test_exit_code -eq 0 ]; then - echo "Test contracts executed successfully." - else - echo "Test contracts failed with exit code $test_exit_code." 
-        exit 1
-    fi
-}
-
-# Function to start an instance
-start_instance() {
-    local address=$1
-    local grpc_address=$2
-    local rocks_path_prefix=$3
-    local log_file=$4
-    local candidate_peers=$5
-    local tokio_console_address=$6
-    local metrics_exporter_address=$7
-
-    RUST_LOG=info RUST_BACKTRACE=1 cargo run --release --bin $binary --features dev -- \
-        --block-mode=1s \
-        --candidate-peers="$candidate_peers" \
-        -a=$address \
-        --grpc-server-address=$grpc_address \
-        --rocks-path-prefix=$rocks_path_prefix \
-        --tokio-console-address=$tokio_console_address \
-        --perm-storage=rocks \
-        --metrics-exporter-address=$metrics_exporter_address >> $log_file 2>&1 &
-}
-
-# Function to check liveness of an instance
-check_liveness() {
-    local port=$1
-    curl -s http://0.0.0.0:$port \
-        --header "content-type: application/json" \
-        --data '{"jsonrpc":"2.0","method":"stratus_health","params":[],"id":1}' | jq '.result'
-}
-
-# Function to check if an instance is the leader
-check_leader() {
-    local port=$1
-    local response=$(curl -s http://0.0.0.0:$port \
-        --header "content-type: application/json" \
-        --data '{"jsonrpc":"2.0","method":"stratus_version","params":[],"id":1}')
-
-    local is_leader=$(echo $response | jq -r '.result.consensus.is_leader')
-    local term=$(echo $response | jq -r '.result.consensus.current_term')
-
-    echo "$is_leader $term"
-}
-
-# Function to find the leader node
-# Returns nothing if no leader is found or if there is a mismatch between instance terms
-# Otherwise returns a list of leader addresses
-find_leader() {
-    local ports=("$@")
-    local leaders=()
-    local term=""
-
-    for port in "${ports[@]}"; do
-        read -r is_leader current_term <<< "$(check_leader "$port")"
-
-        if [ -z "$term" ]; then
-            term="$current_term"
-        elif [ "$term" != "$current_term" ]; then
-            echo ""
-            return
-        fi
-
-        if [ "$is_leader" == "true" ]; then
-            leaders+=("$port")
-        fi
-    done
-
-    if [ ${#leaders[@]} -gt 0 ]; then
-        echo "${leaders[@]}"
-    fi
-}
-
-# Function to run the election test
-run_test() {
-    local instances=()
-    local all_addresses=()
-
-    for ((i=1; i<=num_instances; i++)); do
-        local base_port=$((3000 + i))
-        local grpc_port=$((3777 + i))
-        all_addresses+=("http://0.0.0.0:$base_port;$grpc_port")
-    done
-
-    for ((i=1; i<=num_instances; i++)); do
-        local base_port=$((3000 + i))
-        local grpc_port=$((3777 + i))
-        local tokio_console_port=$((6668 + i))
-        local metrics_exporter_port=$((9000 + i))
-
-        # Exclude current instance's address to get candidate_peers
-        local candidate_peers=($(printf "%s\n" "${all_addresses[@]}"))
-        local candidate_peers_str=""
-
-        if [ ${#candidate_peers[@]} -gt 0 ]; then
-            candidate_peers_str=$(printf ",%s" "${candidate_peers[@]}")
-            candidate_peers_str=${candidate_peers_str:1}
-        fi
-        instances+=("0.0.0.0:$base_port 0.0.0.0:$grpc_port tmp_rocks_$base_port instance_$base_port.log $base_port ${candidate_peers_str} 0.0.0.0:$tokio_console_port 0.0.0.0:$metrics_exporter_port")
-    done
-
-    # Start instances
-    echo "Starting $num_instances instance(s)..."
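For reference, the check_leader helper above reads exactly two jq paths out of the stratus_version response, so the payload it expects is shaped roughly like this (a sketch; surrounding fields omitted and values illustrative):

    {"jsonrpc": "2.0", "id": 1, "result": {"consensus": {"is_leader": true, "current_term": 3}}}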
- ports=() - grpc_addresses=() - rocks_paths=() - liveness=() - for instance in "${instances[@]}"; do - IFS=' ' read -r -a params <<< "$instance" - start_instance "${params[0]}" "${params[1]}" "${params[2]}" "${params[3]}" "${params[5]}" "${params[6]}" "${params[7]}" - ports+=("${params[4]}") - grpc_addresses+=("${params[1]}") - rocks_paths+=("${params[2]}") - liveness+=(false) - sleep 15 - done - - all_ready=false - while [ "$all_ready" != true ]; do - all_ready=true - for i in "${!ports[@]}"; do - if [ "${liveness[$i]}" != true ]; then - response=$(check_liveness "${ports[$i]}") - if [ "$response" = "true" ]; then - liveness[$i]=true - echo "Instance on port ${ports[$i]} is ready." - else - all_ready=false - fi - fi - done - if [ "$all_ready" != true ]; then - echo "Waiting for all instances to be ready..." - sleep 5 - fi - done - - echo "All instances are ready. Waiting for leader election" - - # Maximum timeout duration in seconds for the initial leader election - initial_leader_timeout=120 - - # Capture the start time - initial_start_time=$(date +%s) - - # Wait for the initial leader election with a timeout - while true; do - current_time=$(date +%s) - elapsed_time=$((current_time - initial_start_time)) - - if [ $elapsed_time -ge $initial_leader_timeout ]; then - echo "Timeout reached without finding an initial leader." - exit 1 - fi - - leader_ports=($(find_leader "${ports[@]}")) - if [ ${#leader_ports[@]} -gt 1 ]; then - echo "Error: More than one leader found: ${leader_ports[*]}" - exit 1 - elif [ ${#leader_ports[@]} -eq 1 ]; then - leader_port=${leader_ports[0]} - echo "Leader found on address $leader_port" - break - else - sleep 1 - fi - done - - if [ -z "$leader_port" ]; then - echo "Exiting due to leader election failure." - exit 1 - fi - - send_transactions - - if [ "$enable_leader_restart" = true ]; then - # Kill the leader instance - echo "Killing the leader instance on address $leader_port..." - for i in "${!leader_ports[@]}"; do - if [ "${leader_ports[i]}" == "$leader_port" ]; then - killport --quiet $leader_port || true - break - fi - done - - if [ $num_instances -gt 1 ]; then - sleep 40 # wait for leader election before raising the other instance to avoid split vote - fi - - # Restart the killed instance - echo "Restarting the killed instance..." - for i in "${!instances[@]}"; do - IFS=' ' read -r -a params <<< "${instances[i]}" - if [ "${params[4]}" == "$leader_port" ]; then - start_instance "${params[0]}" "${params[1]}" "${params[2]}" "${params[3]}" "${params[5]}" "${params[6]}" "${params[7]}" - liveness[i]=false - break - fi - done - - restart_all_ready=false - while [ "$restart_all_ready" != true ]; do - restart_all_ready=true - for i in "${!ports[@]}"; do - if [ "${liveness[$i]}" != true ]; then - response=$(check_liveness "${ports[$i]}") - if [ "$response" = "true" ]; then - liveness[$i]=true - echo "Instance on address ${ports[$i]} is ready." - else - restart_all_ready=false - fi - fi - done - if [ "$restart_all_ready" != true ]; then - echo "Waiting for all instances to be ready..." - sleep 5 - fi - done - - echo "All instances are ready after restart. Waiting for new leader election." - - # Maximum timeout duration in seconds for new leader election - max_timeout=120 - - # Capture the start time - start_time=$(date +%s) - - # Wait until a new leader is found or timeout - while true; do - current_time=$(date +%s) - elapsed_time=$((current_time - start_time)) - - if [ $elapsed_time -ge $max_timeout ]; then - echo "Timeout reached without finding a new leader." 
- exit 1 - fi - - leader_ports=($(find_leader "${ports[@]}")) - if [ ${#leader_ports[@]} -gt 1 ]; then - echo "Error: More than one leader found: ${leader_ports[*]}" - exit 1 - elif [ ${#leader_ports[@]} -eq 1 ]; then - leader_port=${leader_ports[0]} - echo "Leader found on address $leader_port" - break - else - sleep 1 - fi - done - fi -} - -# Run the test n times -for ((iteration_n=1; iteration_n<=$iterations; iteration_n++)); do - echo -e "\n##############################################\n" - echo "Running binary $binary test iteration $iteration_n of $iterations..." - run_test - sleep 5 -done diff --git a/e2e/cloudwalk-contracts/integration/test/helpers/rpc.ts b/e2e/cloudwalk-contracts/integration/test/helpers/rpc.ts index 9211c63e5..8ca9a1392 100644 --- a/e2e/cloudwalk-contracts/integration/test/helpers/rpc.ts +++ b/e2e/cloudwalk-contracts/integration/test/helpers/rpc.ts @@ -44,7 +44,7 @@ export let ETHERJS = new JsonRpcProvider(providerUrl, undefined); export function updateProviderUrl(providerName: string) { switch (providerName) { - case "run-with-importer": + case "importer": providerUrl = "http://localhost:3001?app=e2e"; break; case "stratus": diff --git a/e2e/cloudwalk-contracts/integration/test/importer.test.ts b/e2e/cloudwalk-contracts/integration/test/importer.test.ts index 40033e69e..451125a55 100644 --- a/e2e/cloudwalk-contracts/integration/test/importer.test.ts +++ b/e2e/cloudwalk-contracts/integration/test/importer.test.ts @@ -13,7 +13,7 @@ import { updateProviderUrl, } from "./helpers/rpc"; -describe("Run With Importer integration test", function () { +describe("Importer integration test", function () { before(async function () { await setDeployer(); }); @@ -92,7 +92,7 @@ describe("Run With Importer integration test", function () { } }); - it(`${params.name}: Validate transaction mined delay between Stratus and Run With Importer`, async function () { + it(`${params.name}: Validate transaction mined delay between Stratus and Importer`, async function () { // Get Stratus timestamps updateProviderUrl("stratus"); const stratusTimestamps = await Promise.all( @@ -103,9 +103,9 @@ describe("Run With Importer integration test", function () { }), ); - // Get Run With Importer timestamps - updateProviderUrl("run-with-importer"); - const runWithImporterTimestamps = await Promise.all( + // Get Importer timestamps + updateProviderUrl("importer"); + const importerTimestamps = await Promise.all( txHashList.map(async (txHash) => { const receipt = await sendWithRetry("eth_getTransactionReceipt", [txHash], 20); const block = await sendWithRetry("eth_getBlockByNumber", [receipt.blockNumber, false]); @@ -116,17 +116,17 @@ describe("Run With Importer integration test", function () { // Total time it took for Stratus to process all the blocks containing transactions const stratusProcessingTime = stratusTimestamps[stratusTimestamps.length - 1] - stratusTimestamps[0]; - // Total time it took for Run With Importer to process all the blocks containing transactions - const runWithImporterProcessingTime = - runWithImporterTimestamps[runWithImporterTimestamps.length - 1] - runWithImporterTimestamps[0]; + // Total time it took for Importer to process all the blocks containing transactions + const importerProcessingTime = + importerTimestamps[importerTimestamps.length - 1] - importerTimestamps[0]; console.log(` ✔ Number of transactions sent: ${txHashList.length}`); console.log( - ` ✔ Stratus processing time: ${stratusProcessingTime}s | Run With Importer processing time: 
${runWithImporterProcessingTime}s`, + ` ✔ Stratus processing time: ${stratusProcessingTime}s | Importer processing time: ${importerProcessingTime}s`, ); }); - it(`${params.name}: Validate all transactions were imported from Stratus to Run With Importer`, async function () { + it(`${params.name}: Validate all transactions were imported from Stratus to Importer`, async function () { // Get Stratus transaction receipts updateProviderUrl("stratus"); const stratusReceipts = await Promise.all( @@ -136,9 +136,9 @@ describe("Run With Importer integration test", function () { }), ); - // Get Run With Importer transaction receipts - updateProviderUrl("run-with-importer"); - const runWithImporterReceipts = await Promise.all( + // Get Importer transaction receipts + updateProviderUrl("importer"); + const importerReceipts = await Promise.all( txHashList.map(async (txHash) => { const receipt = await sendWithRetry("eth_getTransactionReceipt", [txHash]); return receipt; @@ -148,11 +148,11 @@ describe("Run With Importer integration test", function () { // Assert that all transactions were imported for (let i = 0; i < txHashList.length; i++) { expect(stratusReceipts[i]).to.exist; - expect(runWithImporterReceipts[i]).to.exist; + expect(importerReceipts[i]).to.exist; } }); - it(`${params.name}: Validate each transaction was imported into the same block between Stratus and Run With Importer`, async function () { + it(`${params.name}: Validate each transaction was imported into the same block between Stratus and Importer`, async function () { // Get Stratus block numbers updateProviderUrl("stratus"); const stratusBlockNumbers = await Promise.all( @@ -162,38 +162,38 @@ describe("Run With Importer integration test", function () { }), ); - // Get Run With Importer block numbers - updateProviderUrl("run-with-importer"); - const runWithImporterBlockNumbers = await Promise.all( + // Get Importer block numbers + updateProviderUrl("importer"); + const importerBlockNumbers = await Promise.all( txHashList.map(async (txHash) => { const receipt = await sendWithRetry("eth_getTransactionReceipt", [txHash], 20); return receipt.blockNumber; }), ); - // Assert that each transaction fell into the same block between Stratus and Run With Importer + // Assert that each transaction fell into the same block between Stratus and Importer for (let i = 0; i < txHashList.length; i++) { expect( stratusBlockNumbers[i], - `Transaction ${txHashList[i]} did not fall into the same block between Stratus and Run With Importer`, - ).to.equal(runWithImporterBlockNumbers[i]); + `Transaction ${txHashList[i]} did not fall into the same block between Stratus and Importer`, + ).to.equal(importerBlockNumbers[i]); } }); - it(`${params.name}: Validate balances between Stratus and Run With Importer`, async function () { + it(`${params.name}: Validate balances between Stratus and Importer`, async function () { for (let i = 0; i < wallets.length; i++) { // Get Stratus balance updateProviderUrl("stratus"); const stratusBalance = await brlcToken.balanceOf(wallets[i].address); - // Get Run With Importer balance - updateProviderUrl("run-with-importer"); - const runWithImporterBalance = await brlcToken.balanceOf(wallets[i].address); + // Get Importer balance + updateProviderUrl("importer"); + const importerBalance = await brlcToken.balanceOf(wallets[i].address); // Assert that the balances are equal expect(stratusBalance).to.equal( - runWithImporterBalance, - `Wallet ${wallets[i].address} balances are not equal between Stratus and Run With Importer`, + 
importerBalance, + `Wallet ${wallets[i].address} balances are not equal between Stratus and Importer`, ); } updateProviderUrl("stratus"); diff --git a/e2e/cloudwalk-contracts/integration/test/simple.test.ts b/e2e/cloudwalk-contracts/integration/test/simple.test.ts deleted file mode 100644 index 91b7fe482..000000000 --- a/e2e/cloudwalk-contracts/integration/test/simple.test.ts +++ /dev/null @@ -1,114 +0,0 @@ -import { expect } from "chai"; -import { ethers } from "hardhat"; - -import { - GAS_LIMIT_OVERRIDE, - brlcToken, - configureBRLC, - deployBRLC, - deployer, - send, - sendWithRetry, - setDeployer, -} from "./helpers/rpc"; - -describe("Integration test", function () { - before(async function () { - await setDeployer(); - }); - - describe("Deploy and configure BRLC contract", function () { - it("Validate deployer is main minter", async function () { - await deployBRLC(); - await configureBRLC(); - - expect(deployer.address).to.equal(await brlcToken.mainMinter()); - expect(await brlcToken.isMinter(deployer.address)).to.be.true; - }); - }); - - describe("Long duration transaction tests", function () { - const parameters = [ - { name: "Few wallets, sufficient balance", wallets: 3, duration: 5, tps: 15, baseBalance: 2000 }, - { name: "Few wallets, insufficient balance", wallets: 2, duration: 5, tps: 5, baseBalance: 5 }, - { name: "Many wallets, sufficient balance", wallets: 20, duration: 5, tps: 5, baseBalance: 2000 }, - ]; - parameters.forEach((params, index) => { - const wallets: any[] = []; - it(`${params.name}: Prepare and mint BRLC to wallets`, async function () { - this.timeout(params.wallets * 1000 + 10000); - - for (let i = 0; i < params.wallets; i++) { - const wallet = ethers.Wallet.createRandom().connect(ethers.provider); - wallets.push(wallet); - } - - for (let i = 0; i < wallets.length; i++) { - const wallet = wallets[i]; - expect( - await brlcToken.mint(wallet.address, params.baseBalance, { gasLimit: GAS_LIMIT_OVERRIDE }), - ).to.have.changeTokenBalance(brlcToken, wallet, params.baseBalance); - } - }); - - let txHashList: string[] = []; - it(`${params.name}: Transfer BRLC between wallets at a configurable TPS`, async function () { - this.timeout(params.duration * 1000 + 10000); - - const transactionInterval = 1000 / params.tps; - - let nonces = await Promise.all( - wallets.map((wallet) => send("eth_getTransactionCount", [wallet.address, "latest"])), - ); - - const startTime = Date.now(); - while (Date.now() - startTime < params.duration * 1000) { - let senderIndex; - let receiverIndex; - do { - senderIndex = Math.floor(Math.random() * wallets.length); - receiverIndex = Math.floor(Math.random() * wallets.length); - } while (senderIndex === receiverIndex); - - let sender = wallets[senderIndex]; - let receiver = wallets[receiverIndex]; - - const amount = Math.floor(Math.random() * 10) + 1; - - try { - const tx = await brlcToken.connect(sender).transfer(receiver.address, amount, { - gasPrice: 0, - gasLimit: GAS_LIMIT_OVERRIDE, - type: 0, - nonce: nonces[senderIndex], - }); - txHashList.push(tx.hash); - } catch (error) {} - - nonces[senderIndex]++; - - await new Promise((resolve) => setTimeout(resolve, transactionInterval)); - } - }); - - it(`${params.name}: Validate transactions are present on blocks`, async function () { - const allBlockTransactions = new Set(); - - await Promise.all( - txHashList.map(async (txHash) => { - const receipt = await sendWithRetry("eth_getTransactionReceipt", [txHash]); - const block = await sendWithRetry("eth_getBlockByNumber", [receipt.blockNumber, 
false]); - - block.transactions.forEach((tx) => allBlockTransactions.add(tx)); - }), - ); - - const missingTransactions = txHashList.filter((txHash) => !allBlockTransactions.has(txHash)); - - if (missingTransactions.length > 0) { - throw new Error(`Missing transactions: ${missingTransactions.join(", ")}`); - } - }); - }); - }); -}); diff --git a/justfile b/justfile index 49ac6b42c..7ddb345e4 100644 --- a/justfile +++ b/justfile @@ -97,18 +97,6 @@ rpc-downloader *args="": importer-offline *args="": cargo {{nightly_flag}} run --bin importer-offline {{release_flag}} -- {{args}} -# Bin: Import external RPC blocks from external RPC endpoint to Stratus storage -importer-online *args="": - cargo {{nightly_flag}} run --bin importer-online {{release_flag}} -- {{args}} - -# Bin: Validate Stratus storage slots matches reference slots -state-validator *args="": - cargo {{nightly_flag}} run --bin state-validator {{release_flag}} -- {{args}} - -# Bin: `stratus` and `importer-online` in a single binary -run-with-importer *args="": - cargo {{nightly_flag}} run --bin run-with-importer {{release_flag}} -- {{args}} - # ------------------------------------------------------------------------------ # Test tasks # ------------------------------------------------------------------------------ @@ -165,7 +153,7 @@ e2e-stratus block-mode="automine" test="": just _log "Starting Stratus" just build "dev" || exit 1 - just run -a 0.0.0.0:3000 --block-mode {{block-mode}} > stratus.log & + just run -a 0.0.0.0:3000 --leader --block-mode {{block-mode}} > stratus.log & just _wait_for_stratus @@ -186,7 +174,7 @@ e2e-stratus-rocks block-mode="automine" test="": just _log "Starting Stratus" just build "dev" || exit 1 - just run -a 0.0.0.0:3000 --block-mode {{block-mode}} --perm-storage=rocks > stratus.log & + just run -a 0.0.0.0:3000 --leader --block-mode {{block-mode}} --perm-storage=rocks > stratus.log & just _wait_for_stratus @@ -203,7 +191,7 @@ e2e-clock-stratus: #!/bin/bash just _log "Starting Stratus" just build "dev" || exit 1 - cargo run --release --bin stratus --features dev -- --block-mode 1s -a 0.0.0.0:3000 > stratus.log & + cargo run --release --bin stratus --features dev -- --leader --block-mode 1s -a 0.0.0.0:3000 > stratus.log & just _wait_for_stratus @@ -220,7 +208,7 @@ e2e-clock-stratus-rocks: #!/bin/bash just _log "Starting Stratus" just build "dev" || exit 1 - cargo run --release --bin stratus --features dev -- --block-mode 1s --perm-storage=rocks -a 0.0.0.0:3000 > stratus.log & + cargo run --release --bin stratus --features dev -- --leader --block-mode 1s --perm-storage=rocks -a 0.0.0.0:3000 > stratus.log & just _wait_for_stratus @@ -278,22 +266,22 @@ e2e-importer-online: e2e-importer-online-up: #!/bin/bash - # Build Stratus and Run With Importer binaries - just _log "Building Stratus and Run With Importer binaries" - cargo build --release --bin stratus --bin run-with-importer --features dev + # Build Stratus binary + just _log "Building Stratus binary" + cargo build --release --bin stratus --features dev mkdir e2e_logs - # Start Stratus binary - RUST_LOG=info cargo run --release --bin stratus --features dev -- --block-mode 1s --perm-storage=rocks --rocks-path-prefix=temp_3000 --tokio-console-address=0.0.0.0:6668 --metrics-exporter-address=0.0.0.0:9000 -a 0.0.0.0:3000 > e2e_logs/stratus.log & + # Start Stratus with leader flag + RUST_LOG=info cargo run --release --bin stratus --features dev -- --leader --block-mode 1s --perm-storage=rocks --rocks-path-prefix=temp_3000 --tokio-console-address=0.0.0.0:6668 
--metrics-exporter-address=0.0.0.0:9000 -a 0.0.0.0:3000 > e2e_logs/stratus.log & - # Wait for Stratus to start + # Wait for Stratus with leader flag to start just _wait_for_stratus 3000 - # Start Run With Importer binary - RUST_LOG=info cargo run --release --bin run-with-importer --features dev -- --block-mode 1s --perm-storage=rocks --rocks-path-prefix=temp_3001 --tokio-console-address=0.0.0.0:6669 --metrics-exporter-address=0.0.0.0:9001 -a 0.0.0.0:3001 -r http://0.0.0.0:3000/ -w ws://0.0.0.0:3000/ > e2e_logs/run_with_importer.log & + # Start Stratus with follower flag + RUST_LOG=info cargo run --release --bin stratus --features dev -- --follower --perm-storage=rocks --rocks-path-prefix=temp_3001 --tokio-console-address=0.0.0.0:6669 --metrics-exporter-address=0.0.0.0:9001 -a 0.0.0.0:3001 -r http://0.0.0.0:3000/ -w ws://0.0.0.0:3000/ > e2e_logs/importer.log & - # Wait for Run With Importer to start + # Wait for Stratus with follower flag to start just _wait_for_stratus 3001 if [ -d e2e/cloudwalk-contracts ]; then @@ -315,12 +303,8 @@ e2e-importer-online-up: e2e-importer-online-down: #!/bin/bash - # Kill run-with-importer - killport 3001 - run_with_importer_pid=$(pgrep -f 'run-with-importer') - kill $run_with_importer_pid - # Kill Stratus + killport 3001 killport 3000 stratus_pid=$(pgrep -f 'stratus') kill $stratus_pid @@ -331,26 +315,6 @@ e2e-importer-online-down: # Delete zeppelin directory rm -rf ./e2e/cloudwalk-contracts/integration/.openzeppelin -# ------------------------------------------------------------------------------ -# Chaos tests -# ------------------------------------------------------------------------------ - -# Chaos: Run chaos testing experiment -run-chaos-experiment bin="" instances="" iterations="" enable-leader-restart="" experiment="": - #!/bin/bash - - just _log "Building Stratus" - cargo build --release --bin {{ bin }} --features dev - - cd e2e/cloudwalk-contracts/integration - if [ ! -d node_modules ]; then - npm install - fi - cd ../../.. 
- - just _log "Executing experiment {{ experiment }} {{ iterations }}x on {{ bin }} binary with {{ instances }} instance(s)" - ./chaos/experiments/{{ experiment }}.sh --bin {{ bin }} --instances {{ instances }} --iterations {{ iterations }} --enable-leader-restart {{ enable-leader-restart }} - # ------------------------------------------------------------------------------ # Hive tests # ------------------------------------------------------------------------------ @@ -407,7 +371,7 @@ contracts-test-stratus *args="": #!/bin/bash just _log "Starting Stratus" just build "dev" || exit 1 - just run -a 0.0.0.0:3000 & + just run --leader -a 0.0.0.0:3000 & just _wait_for_stratus @@ -424,7 +388,7 @@ contracts-test-stratus-rocks *args="": #!/bin/bash just _log "Starting Stratus" just build "dev" || exit 1 - just run -a 0.0.0.0:3000 --perm-storage=rocks > stratus.log & + just run --leader -a 0.0.0.0:3000 --perm-storage=rocks > stratus.log & just _wait_for_stratus diff --git a/src/bin/importer_online.rs b/src/bin/importer_online.rs deleted file mode 100644 index 4a3e52a79..000000000 --- a/src/bin/importer_online.rs +++ /dev/null @@ -1,420 +0,0 @@ -use std::cmp::min; -use std::sync::atomic::AtomicU64; -use std::sync::atomic::Ordering; -use std::sync::Arc; -use std::time::Duration; - -use futures::try_join; -use futures::StreamExt; -use serde::Deserialize; -use stratus::config::ImporterOnlineConfig; -use stratus::eth::executor::Executor; -use stratus::eth::miner::Miner; -use stratus::eth::primitives::BlockNumber; -use stratus::eth::primitives::ExternalBlock; -use stratus::eth::primitives::ExternalReceipt; -use stratus::eth::primitives::ExternalReceipts; -use stratus::eth::primitives::Hash; -use stratus::eth::storage::StratusStorage; -use stratus::ext::spawn_named; -use stratus::ext::traced_sleep; -use stratus::ext::DisplayExt; -use stratus::ext::SleepReason; -use stratus::if_else; -#[cfg(feature = "metrics")] -use stratus::infra::metrics; -use stratus::infra::tracing::warn_task_rx_closed; -use stratus::infra::tracing::warn_task_tx_closed; -use stratus::infra::tracing::SpanExt; -use stratus::infra::BlockchainClient; -use stratus::log_and_err; -#[cfg(feature = "metrics")] -use stratus::utils::calculate_tps; -use stratus::utils::DropTimer; -use stratus::GlobalServices; -use stratus::GlobalState; -use tokio::sync::mpsc; -use tokio::task::yield_now; -use tokio::time::timeout; -use tracing::Span; - -// ----------------------------------------------------------------------------- -// Globals -// ----------------------------------------------------------------------------- - -/// Current block number of the external RPC blockchain. -static EXTERNAL_RPC_CURRENT_BLOCK: AtomicU64 = AtomicU64::new(0); - -/// Only sets the external RPC current block number if it is equals or greater than the current one. -fn set_external_rpc_current_block(new_number: BlockNumber) { - let new_number_u64 = new_number.as_u64(); - let _ = EXTERNAL_RPC_CURRENT_BLOCK.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current_number| { - if_else!(new_number_u64 >= current_number, Some(new_number_u64), None) - }); -} - -// ----------------------------------------------------------------------------- -// Constants -// ----------------------------------------------------------------------------- -/// Number of blocks that are downloaded in parallel. -const PARALLEL_BLOCKS: usize = 3; - -/// Number of receipts that are downloaded in parallel. 
-const PARALLEL_RECEIPTS: usize = 100;
-
-/// Timeout awaiting a newHeads event before falling back to polling.
-const TIMEOUT_NEW_HEADS: Duration = Duration::from_millis(2000);
-
-/// Interval to wait before retrieving receipts, because they are not immediately available after the block is retrieved.
-const INTERVAL_FETCH_RECEIPTS: Duration = Duration::from_millis(50);
-
-// -----------------------------------------------------------------------------
-// Execution
-// -----------------------------------------------------------------------------
-#[allow(dead_code)]
-fn main() -> anyhow::Result<()> {
-    let global_services = GlobalServices::<ImporterOnlineConfig>::init();
-    global_services.runtime.block_on(run(global_services.config))
-}
-
-async fn run(config: ImporterOnlineConfig) -> anyhow::Result<()> {
-    let _timer = DropTimer::start("importer-online");
-
-    // init server
-    let storage = config.storage.init()?;
-    let miner = config.miner.init_external_mode(Arc::clone(&storage))?;
-    let executor = config.executor.init(Arc::clone(&storage), Arc::clone(&miner));
-    let chain = Arc::new(
-        BlockchainClient::new_http_ws(
-            &config.base.external_rpc,
-            config.base.external_rpc_ws.as_deref(),
-            config.base.external_rpc_timeout,
-        )
-        .await?,
-    );
-
-    let result = run_importer_online(executor, miner, Arc::clone(&storage), chain, config.base.sync_interval).await;
-    if let Err(ref e) = result {
-        tracing::error!(reason = ?e, "importer-online failed");
-    }
-
-    // Explicitly block the `main` thread to drop the storage.
-    drop(storage);
-
-    result
-}
-
-pub async fn run_importer_online(
-    executor: Arc<Executor>,
-    miner: Arc<Miner>,
-    storage: Arc<StratusStorage>,
-    chain: Arc<BlockchainClient>,
-    sync_interval: Duration,
-) -> anyhow::Result<()> {
-    let _timer = DropTimer::start("importer-online::run_importer_online");
-
-    let number = storage.read_block_number_to_resume_import()?;
-
-    let (backlog_tx, backlog_rx) = mpsc::unbounded_channel();
-
-    // spawn block executor:
-    // it executes and mines blocks and expects to receive them via channel in the correct order.
-    let task_executor = spawn_named("importer::executor", start_block_executor(executor, miner, backlog_rx));
-
-    // spawn block number:
-    // it keeps track of the blockchain current block number.
-    let number_fetcher_chain = Arc::clone(&chain);
-    let task_number_fetcher = spawn_named("importer::number-fetcher", start_number_fetcher(number_fetcher_chain, sync_interval));
-
-    // spawn block fetcher:
-    // it fetches blocks and receipts in parallel and sends them to the executor in the correct order.
-    // it uses the number fetcher current block to determine whether it should keep downloading more blocks or not.
-    let block_fetcher_chain = Arc::clone(&chain);
-    let task_block_fetcher = spawn_named("importer::block-fetcher", start_block_fetcher(block_fetcher_chain, backlog_tx, number));
-
-    // await all tasks
-    if let Err(e) = try_join!(task_executor, task_block_fetcher, task_number_fetcher) {
-        tracing::error!(reason = ?e, "importer-online failed");
-    }
-    Ok(())
-}
-
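The three tasks spawned above form a fetch, order, execute pipeline joined by an unbounded channel. A toy sketch of the ordering guarantee this relies on (plain tokio, no Stratus types; illustrative only):

    use tokio::sync::mpsc;

    #[tokio::main]
    async fn main() {
        // fetcher -> executor: an unbounded tokio channel is FIFO, which is
        // what lets the executor apply blocks strictly in send order.
        let (tx, mut rx) = mpsc::unbounded_channel::<u64>();
        let fetcher = tokio::spawn(async move {
            for block_number in 1..=3 {
                tx.send(block_number).unwrap();
            }
        });
        while let Some(block_number) = rx.recv().await {
            println!("executing block {block_number}");
        }
        fetcher.await.unwrap();
    }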
-// -----------------------------------------------------------------------------
-// Executor
-// -----------------------------------------------------------------------------
-
-// Executes external blocks and persists them to storage.
-async fn start_block_executor(
-    executor: Arc<Executor>,
-    miner: Arc<Miner>,
-    mut backlog_rx: mpsc::UnboundedReceiver<(ExternalBlock, Vec<ExternalReceipt>)>,
-) -> anyhow::Result<()> {
-    const TASK_NAME: &str = "block-executor";
-
-    loop {
-        if GlobalState::is_shutdown_warn(TASK_NAME) {
-            return Ok(());
-        }
-
-        let (block, receipts) = match timeout(Duration::from_secs(2), backlog_rx.recv()).await {
-            Ok(Some(inner)) => inner,
-            Ok(None) => break, // channel closed
-            Err(_timed_out) => {
-                tracing::warn!(timeout = "2s", "timeout reading block executor channel, expected around 1 block per second");
-                continue;
-            }
-        };
-
-        #[cfg(feature = "metrics")]
-        let start = metrics::now();
-
-        // execute and mine
-        let receipts = ExternalReceipts::from(receipts);
-        if let Err(e) = executor.execute_external_block(&block, &receipts) {
-            let message = GlobalState::shutdown_from(TASK_NAME, "failed to reexecute external block");
-            return log_and_err!(reason = e, message);
-        };
-
-        // statistics
-        #[cfg(feature = "metrics")]
-        {
-            let duration = start.elapsed();
-            let tps = calculate_tps(duration, block.transactions.len());
-
-            tracing::info!(
-                tps,
-                duration = %duration.to_string_ext(),
-                block_number = %block.number(),
-                receipts = receipts.len(),
-                "reexecuted external block",
-            );
-        }
-
-        if let Err(e) = miner.mine_external_mixed_and_commit() {
-            let message = GlobalState::shutdown_from(TASK_NAME, "failed to mine external block");
-            return log_and_err!(reason = e, message);
-        };
-
-        #[cfg(feature = "metrics")]
-        {
-            metrics::inc_n_importer_online_transactions_total(receipts.len() as u64);
-            metrics::inc_import_online_mined_block(start.elapsed());
-        }
-    }
-
-    warn_task_tx_closed(TASK_NAME);
-    Ok(())
-}
-
-// -----------------------------------------------------------------------------
-// Number fetcher
-// -----------------------------------------------------------------------------
-
-/// Retrieves the blockchain current block number.
-async fn start_number_fetcher(chain: Arc<BlockchainClient>, sync_interval: Duration) -> anyhow::Result<()> {
-    const TASK_NAME: &str = "external-number-fetcher";
-
-    // initial newHeads subscription.
-    // abort application if cannot subscribe.
-    let mut sub_new_heads = if chain.supports_ws() {
-        tracing::info!("{} subscribing to newHeads event", TASK_NAME);
-
-        match chain.subscribe_new_heads().await {
-            Ok(sub) => {
-                tracing::info!("{} subscribed to newHeads events", TASK_NAME);
-                Some(sub)
-            }
-            Err(e) => {
-                let message = GlobalState::shutdown_from(TASK_NAME, "cannot subscribe to newHeads event");
-                return log_and_err!(reason = e, message);
-            }
-        }
-    } else {
-        tracing::warn!("{} blockchain client does not have websocket enabled", TASK_NAME);
-        None
-    };
-
-    // keep reading websocket subscription or polling via http.
-    loop {
-        if GlobalState::is_shutdown_warn(TASK_NAME) {
-            return Ok(());
-        }
-
-        // if we have a subscription, try to read from subscription.
-        // in case of failure, re-subscribe because the current subscription may have been closed on the server.
-        if let Some(sub) = &mut sub_new_heads {
-            tracing::info!("{} awaiting block number from newHeads subscription", TASK_NAME);
-            let resubscribe_ws = match timeout(TIMEOUT_NEW_HEADS, sub.next()).await {
-                Ok(Some(Ok(block))) => {
-                    tracing::info!(block_number = %block.number(), "{} received newHeads event", TASK_NAME);
-                    set_external_rpc_current_block(block.number());
-                    continue;
-                }
-                Ok(None) => {
-                    tracing::error!("{} newHeads subscription closed by the other side", TASK_NAME);
-                    true
-                }
-                Ok(Some(Err(e))) => {
-                    tracing::error!(reason = ?e, "{} failed to read newHeads subscription event", TASK_NAME);
-                    true
-                }
-                Err(_) => {
-                    tracing::error!("{} timed-out waiting for newHeads subscription event", TASK_NAME);
-                    true
-                }
-            };
-
-            // resubscribe if necessary.
-            // only update the existing subscription if it succeeded, otherwise we will try again in the next iteration.
-            if chain.supports_ws() && resubscribe_ws {
-                tracing::info!("{} resubscribing to newHeads event", TASK_NAME);
-                match chain.subscribe_new_heads().await {
-                    Ok(sub) => {
-                        tracing::info!("{} resubscribed to newHeads event", TASK_NAME);
-                        sub_new_heads = Some(sub);
-                    }
-                    Err(e) => {
-                        tracing::error!(reason = ?e, "{} failed to resubscribe to newHeads event", TASK_NAME);
-                    }
-                }
-            }
-        }
-
-        // fallback to polling
-        tracing::warn!("{} falling back to http polling because subscription failed or it is not enabled", TASK_NAME);
-        match chain.fetch_block_number().await {
-            Ok(block_number) => {
-                tracing::info!(
-                    %block_number,
-                    sync_interval = %sync_interval.to_string_ext(),
-                    "fetched current block number via http. awaiting sync interval to retrieve again."
-                );
-                set_external_rpc_current_block(block_number);
-                traced_sleep(sync_interval, SleepReason::SyncData).await;
-            }
-            Err(e) => {
-                tracing::error!(reason = ?e, "failed to retrieve block number. retrying now.");
-            }
-        }
-    }
-}
-
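Both the subscription path and the polling fallback above report progress through set_external_rpc_current_block, whose fetch_update closure only ever moves the stored number forward. A standalone illustration of that monotonic-update pattern (toy values, not Stratus code):

    use std::sync::atomic::{AtomicU64, Ordering};

    fn main() {
        static CURRENT: AtomicU64 = AtomicU64::new(5);
        // An older block number (3) is rejected: the closure returns None...
        let _ = CURRENT.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |cur| (3 >= cur).then_some(3));
        assert_eq!(CURRENT.load(Ordering::Relaxed), 5);
        // ...while an equal or newer one (7) is accepted.
        let _ = CURRENT.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |cur| (7 >= cur).then_some(7));
        assert_eq!(CURRENT.load(Ordering::Relaxed), 7);
    }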
-// -----------------------------------------------------------------------------
-// Block fetcher
-// -----------------------------------------------------------------------------
-
-/// Retrieves blocks and receipts.
-async fn start_block_fetcher(
-    chain: Arc<BlockchainClient>,
-    backlog_tx: mpsc::UnboundedSender<(ExternalBlock, Vec<ExternalReceipt>)>,
-    mut importer_block_number: BlockNumber,
-) -> anyhow::Result<()> {
-    const TASK_NAME: &str = "external-block-fetcher";
-
-    loop {
-        if GlobalState::is_shutdown_warn(TASK_NAME) {
-            return Ok(());
-        }
-
-        // if we are ahead of current block number, await until we are behind again
-        let external_rpc_current_block = EXTERNAL_RPC_CURRENT_BLOCK.load(Ordering::Relaxed);
-        if importer_block_number.as_u64() > external_rpc_current_block {
-            yield_now().await;
-            continue;
-        }
-
-        // we are behind current, so we will fetch multiple blocks in parallel to catch up
-        let blocks_behind = external_rpc_current_block.saturating_sub(importer_block_number.as_u64()) + 1; // TODO: use count_to from BlockNumber
-        let mut blocks_to_fetch = min(blocks_behind, 1_000); // avoid spawning millions of tasks (not parallelism), at least until we know it is safe
-        tracing::info!(%blocks_behind, blocks_to_fetch, "catching up with blocks");
-
-        let mut tasks = Vec::with_capacity(blocks_to_fetch as usize);
-        while blocks_to_fetch > 0 {
-            blocks_to_fetch -= 1;
-            tasks.push(fetch_block_and_receipts(Arc::clone(&chain), importer_block_number));
-            importer_block_number = importer_block_number.next();
-        }
-
-        // keep fetching in order
-        let mut tasks = futures::stream::iter(tasks).buffered(PARALLEL_BLOCKS);
-        while let Some((block, receipts)) = tasks.next().await {
-            if backlog_tx.send((block, receipts)).is_err() {
-                warn_task_rx_closed(TASK_NAME);
-                return Ok(());
-            }
-        }
-    }
-}
-
-#[tracing::instrument(name = "importer::fetch_block_and_receipts", skip_all, fields(block_number))]
-async fn fetch_block_and_receipts(chain: Arc<BlockchainClient>, block_number: BlockNumber) -> (ExternalBlock, Vec<ExternalReceipt>) {
-    Span::with(|s| {
-        s.rec_str("block_number", &block_number);
-    });
-
-    // fetch block
-    let block = fetch_block(Arc::clone(&chain), block_number).await;
-
-    // wait some time until receipts are available
-    let _ = traced_sleep(INTERVAL_FETCH_RECEIPTS, SleepReason::SyncData).await;
-
-    // fetch receipts in parallel
-    let mut receipts_tasks = Vec::with_capacity(block.transactions.len());
-    for hash in block.transactions.iter().map(|tx| tx.hash()) {
-        receipts_tasks.push(fetch_receipt(Arc::clone(&chain), block_number, hash));
-    }
-    let receipts = futures::stream::iter(receipts_tasks).buffer_unordered(PARALLEL_RECEIPTS).collect().await;
-
-    (block, receipts)
-}
-
-#[tracing::instrument(name = "importer::fetch_block", skip_all, fields(block_number))]
-async fn fetch_block(chain: Arc<BlockchainClient>, block_number: BlockNumber) -> ExternalBlock {
-    const RETRY_DELAY: Duration = Duration::from_millis(10);
-    Span::with(|s| {
-        s.rec_str("block_number", &block_number);
-    });
-
-    loop {
-        tracing::info!(%block_number, "fetching block");
-        let block = match chain.fetch_block(block_number).await {
-            Ok(json) => json,
-            Err(e) => {
-                tracing::warn!(reason = ?e, %block_number, delay_ms=%RETRY_DELAY.as_millis(), "failed to retrieve block. retrying with delay.");
-                traced_sleep(RETRY_DELAY, SleepReason::RetryBackoff).await;
-                continue;
-            }
-        };
-
-        if block.is_null() {
-            tracing::warn!(%block_number, delay_ms=%RETRY_DELAY.as_millis(), "block not mined yet. retrying with delay.");
retrying with delay."); - traced_sleep(RETRY_DELAY, SleepReason::SyncData).await; - continue; - } - - return ExternalBlock::deserialize(&block).expect("cannot fail to deserialize external block"); - } -} - -#[tracing::instrument(name = "importer::fetch_receipt", skip_all, fields(block_number, tx_hash))] -async fn fetch_receipt(chain: Arc, block_number: BlockNumber, tx_hash: Hash) -> ExternalReceipt { - Span::with(|s| { - s.rec_str("block_number", &block_number); - s.rec_str("tx_hash", &tx_hash); - }); - - loop { - tracing::info!(%block_number, %tx_hash, "fetching receipt"); - - match chain.fetch_receipt(tx_hash).await { - Ok(Some(receipt)) => return receipt, - Ok(None) => { - tracing::warn!(%block_number, %tx_hash, "receipt not available yet because block is not mined. retrying now."); - continue; - } - Err(e) => { - tracing::error!(reason = ?e, %block_number, %tx_hash, "failed to fetch receipt. retrying now."); - } - } - } -} diff --git a/src/bin/run_with_importer.rs b/src/bin/run_with_importer.rs deleted file mode 100644 index 03960bc6c..000000000 --- a/src/bin/run_with_importer.rs +++ /dev/null @@ -1,78 +0,0 @@ -mod importer_online; - -use std::sync::Arc; - -use importer_online::run_importer_online; -use stratus::config::RunWithImporterConfig; -use stratus::eth::rpc::serve_rpc; -use stratus::eth::Consensus; -use stratus::infra::BlockchainClient; -use stratus::GlobalServices; -use stratus::GlobalState; -use tokio::join; - -fn main() -> anyhow::Result<()> { - let global_services = GlobalServices::::init(); - global_services.runtime.block_on(run(global_services.config)) -} - -async fn run(config: RunWithImporterConfig) -> anyhow::Result<()> { - const TASK_NAME: &str = "run-with-importer"; - - // init services - let storage = config.storage.init()?; - let miner = config.miner.init_external_mode(Arc::clone(&storage))?; - let consensus = Consensus::new( - Arc::clone(&storage), - Arc::clone(&miner), - config.storage.perm_storage.rocks_path_prefix.clone(), - config.clone().candidate_peers.clone(), - Some(config.clone()), - config.rpc_server.address, - config.grpc_server_address, - ); // in development, with no leader configured, the current node ends up being the leader - let (http_url, ws_url) = consensus.get_chain_url().await.expect("chain url not found"); - let chain = Arc::new(BlockchainClient::new_http_ws(&http_url, ws_url.as_deref(), config.online.external_rpc_timeout).await?); - - let executor = config.executor.init(Arc::clone(&storage), Arc::clone(&miner)); - - let rpc_storage = Arc::clone(&storage); - let rpc_executor = Arc::clone(&executor); - let rpc_miner = Arc::clone(&miner); - - // run rpc and importer-online in parallel - let rpc_config = config.clone(); - let rpc_task = async move { - let res = serve_rpc( - // services - rpc_storage, - rpc_executor, - rpc_miner, - Arc::clone(&consensus), - // config - rpc_config.clone(), - rpc_config.rpc_server, - rpc_config.executor.executor_chain_id.into(), - ) - .await; - GlobalState::shutdown_from(TASK_NAME, "rpc server finished unexpectedly"); - res - }; - - let importer_task = async { - let res = run_importer_online(executor, miner, Arc::clone(&storage), chain, config.online.sync_interval).await; - GlobalState::shutdown_from(TASK_NAME, "importer online finished unexpectedly"); - res - }; - - // await both services to finish - let (rpc_result, importer_result) = join!(rpc_task, importer_task); - tracing::debug!(?rpc_result, ?importer_result, "rpc and importer tasks finished"); - rpc_result?; - importer_result?; - - // Explicitly block the 
-    drop(storage);
-
-    Ok(())
-}
diff --git a/src/config.rs b/src/config.rs
index c09eb6b13..86fea61e6 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -7,6 +7,7 @@ use std::sync::atomic::Ordering;
 use std::time::Duration;

 use anyhow::anyhow;
+use clap::ArgGroup;
 use clap::Parser;
 use display_json::DebugAsJson;
 use strum::VariantNames;
@@ -14,6 +15,7 @@ use tokio::runtime::Builder;
 use tokio::runtime::Runtime;

 use crate::eth::executor::ExecutorConfig;
+use crate::eth::importer::ImporterConfig;
 use crate::eth::miner::MinerConfig;
 use crate::eth::primitives::Address;
 use crate::eth::rpc::RpcServerConfig;
@@ -156,7 +158,14 @@ impl CommonConfig {

 /// Configuration for main Stratus service.
 #[derive(DebugAsJson, Clone, Parser, derive_more::Deref, serde::Serialize)]
+#[clap(group = ArgGroup::new("mode").required(true).args(&["leader", "follower"]))]
 pub struct StratusConfig {
+    #[arg(long = "leader", env = "LEADER", conflicts_with("follower"))]
+    pub leader: bool,
+
+    #[arg(long = "follower", env = "FOLLOWER", conflicts_with("leader"))]
+    pub follower: bool,
+
     #[clap(flatten)]
     pub rpc_server: RpcServerConfig,
@@ -169,6 +178,9 @@ pub struct StratusConfig {
     #[clap(flatten)]
     pub miner: MinerConfig,

+    #[clap(flatten)]
+    pub importer: Option<ImporterConfig>,
+
     #[deref]
     #[clap(flatten)]
     pub common: CommonConfig,
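The leader/follower flags added to StratusConfig above are mutually exclusive, and exactly one of them must be given. A self-contained sketch of that clap pattern (hypothetical struct name; assumes clap v4 with the derive and env features enabled):

    use clap::{ArgGroup, Parser};

    #[derive(Parser, Debug)]
    #[clap(group = ArgGroup::new("mode").required(true).args(&["leader", "follower"]))]
    struct ModeConfig {
        /// Run as the block-producing leader.
        #[arg(long = "leader", env = "LEADER", conflicts_with("follower"))]
        leader: bool,

        /// Run as a follower that imports blocks from an external endpoint.
        #[arg(long = "follower", env = "FOLLOWER", conflicts_with("leader"))]
        follower: bool,
    }

    fn main() {
        // Passing both flags is rejected, and so is passing neither,
        // because the flags conflict and the "mode" group is required.
        let config = ModeConfig::parse();
        println!("leader={} follower={}", config.leader, config.follower);
    }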
@@ -271,126 +283,6 @@ impl WithCommonConfig for ImporterOfflineConfig {
     }
 }

-// -----------------------------------------------------------------------------
-// Config: ImporterOnline
-// -----------------------------------------------------------------------------
-
-/// Configuration for `importer-online` binary.
-#[derive(DebugAsJson, Clone, Parser, derive_more::Deref, serde::Serialize)]
-pub struct ImporterOnlineConfig {
-    #[clap(flatten)]
-    pub base: ImporterOnlineBaseConfig,
-
-    #[clap(flatten)]
-    pub executor: ExecutorConfig,
-
-    #[clap(flatten)]
-    pub miner: MinerConfig,
-
-    #[clap(flatten)]
-    pub storage: StratusStorageConfig,
-
-    #[deref]
-    #[clap(flatten)]
-    pub common: CommonConfig,
-}
-
-#[derive(DebugAsJson, Clone, Parser, serde::Serialize)]
-pub struct ImporterOnlineBaseConfig {
-    /// External RPC HTTP endpoint to sync blocks with Stratus.
-    #[arg(short = 'r', long = "external-rpc", env = "EXTERNAL_RPC")]
-    pub external_rpc: String,
-
-    /// External RPC WS endpoint to sync blocks with Stratus.
-    #[arg(short = 'w', long = "external-rpc-ws", env = "EXTERNAL_RPC_WS")]
-    pub external_rpc_ws: Option<String>,
-
-    /// Timeout for blockchain requests (importer online)
-    #[arg(long = "external-rpc-timeout", value_parser=parse_duration, env = "EXTERNAL_RPC_TIMEOUT", default_value = "2s")]
-    pub external_rpc_timeout: Duration,
-
-    #[arg(long = "sync-interval", value_parser=parse_duration, env = "SYNC_INTERVAL", default_value = "100ms")]
-    pub sync_interval: Duration,
-}
-
-impl WithCommonConfig for ImporterOnlineConfig {
-    fn common(&self) -> &CommonConfig {
-        &self.common
-    }
-}
-
-#[derive(DebugAsJson, Clone, Parser, derive_more::Deref, serde::Serialize)]
-pub struct RunWithImporterConfig {
-    #[clap(flatten)]
-    pub rpc_server: RpcServerConfig,
-
-    #[arg(long = "leader-node", env = "LEADER_NODE")]
-    pub leader_node: Option<String>, // to simulate this in use locally with other nodes, you need to add the node name into /etc/hostname
-
-    #[clap(flatten)]
-    pub online: ImporterOnlineBaseConfig,
-
-    #[clap(flatten)]
-    pub storage: StratusStorageConfig,
-
-    #[clap(flatten)]
-    pub executor: ExecutorConfig,
-
-    #[clap(flatten)]
-    pub miner: MinerConfig,
-
-    #[deref]
-    #[clap(flatten)]
-    pub common: CommonConfig,
-}
-
-impl WithCommonConfig for RunWithImporterConfig {
-    fn common(&self) -> &CommonConfig {
-        &self.common
-    }
-}
-
-// -----------------------------------------------------------------------------
-// Config: StateValidator
-// -----------------------------------------------------------------------------
-
-/// Configuration for `state-validator` binary.
-#[derive(DebugAsJson, Clone, Parser, derive_more::Deref, serde::Serialize)]
-pub struct StateValidatorConfig {
-    /// How many slots to validate per batch. 0 means every slot.
-    #[arg(long = "max-samples", env = "MAX_SAMPLES", default_value_t = 0)]
-    pub sample_size: u64,
-
-    /// Seed to use when sampling. 0 for random seed.
-    #[arg(long = "seed", env = "SEED", default_value_t = 0, requires = "sample_size")]
-    pub seed: u64,
-
-    /// Validate in batches of n blocks.
-    #[arg(short = 'i', long = "interval", env = "INTERVAL", default_value = "1000")]
-    pub interval: u64,
-
-    /// What method to use when validating.
-    #[arg(short = 'm', long = "method", env = "METHOD")]
-    pub method: ValidatorMethodConfig,
-
-    /// How many concurrent validation tasks to run
-    #[arg(short = 'c', long = "concurrent-tasks", env = "CONCURRENT_TASKS", default_value_t = 10)]
-    pub concurrent_tasks: u16,
-
-    #[deref]
-    #[clap(flatten)]
-    pub common: CommonConfig,
-
-    #[clap(flatten)]
-    pub storage: StratusStorageConfig,
-}
-
-impl WithCommonConfig for StateValidatorConfig {
-    fn common(&self) -> &CommonConfig {
-        &self.common
-    }
-}
-
 // -----------------------------------------------------------------------------
 // Config: Test
 // -----------------------------------------------------------------------------
diff --git a/src/eth/consensus/mod.rs b/src/eth/consensus/mod.rs
index be9abfe44..61979de0a 100644
--- a/src/eth/consensus/mod.rs
+++ b/src/eth/consensus/mod.rs
@@ -1,664 +1,13 @@
-//FIXME temporarily there are a lot of allow dead_code and allow unused_imports, we need to deal with them properly later
-#[allow(dead_code)] //TODO remove this
-mod append_log_entries_storage;
-mod discovery;
-mod log_entry;
-mod propagation;
-pub mod utils;
-
-mod server;
-
-use std::collections::HashMap;
-use std::net::SocketAddr;
-use std::net::UdpSocket;
-use std::sync::atomic::AtomicU64;
-use std::sync::atomic::AtomicU8;
-use std::sync::atomic::Ordering;
-use std::sync::Arc;
-use std::time::Duration;
-
-use anyhow::anyhow;
-use rand::Rng;
-use tokio::sync::broadcast;
-use tokio::sync::Mutex;
-use tokio::sync::RwLock;
-use tokio::task::JoinHandle;
-use tonic::Request;
+pub mod raft;
+pub mod simple_consensus;
+use async_trait::async_trait;
+use crate::eth::primitives::Bytes;
 use crate::eth::primitives::Hash;
-#[allow(unused_imports)]
-use crate::eth::primitives::TransactionExecution;
-use crate::eth::storage::StratusStorage;
-use crate::ext::spawn_named;
-use crate::ext::traced_sleep;
-use crate::ext::SleepReason;
-use crate::infra::BlockchainClient;
-use crate::GlobalState;
-
-pub mod append_entry {
-    #![allow(clippy::default_trait_access, clippy::wildcard_imports)]
-    tonic::include_proto!("append_entry");
-}
-#[allow(unused_imports)]
-use append_entry::append_entry_service_client::AppendEntryServiceClient;
-use append_entry::append_entry_service_server::AppendEntryService;
-use append_entry::RequestVoteRequest;
-use append_entry::TransactionExecutionEntry;
-
-use self::append_log_entries_storage::AppendLogEntriesStorage;
-use self::log_entry::LogEntryData;
-use super::primitives::Bytes;
-use crate::config::RunWithImporterConfig;
-use crate::eth::miner::Miner;
-use crate::eth::primitives::Block;
-#[cfg(feature = "metrics")]
-use crate::infra::metrics;
-
-const PEER_DISCOVERY_DELAY: Duration = Duration::from_secs(30);
-
-#[derive(Clone, Copy, Debug, PartialEq)]
-enum Role {
-    Leader = 1,
-    Follower = 2,
-    _Candidate = 3,
-}
-
-static ROLE: AtomicU8 = AtomicU8::new(Role::Follower as u8);
-
-#[derive(Clone, Debug, Default, Hash, Eq, PartialEq)]
-struct PeerAddress {
-    address: String,
-    jsonrpc_port: u16,
-    grpc_port: u16,
-}
-
-impl PeerAddress {
-    fn new(address: String, jsonrpc_port: u16, grpc_port: u16) -> Self {
-        PeerAddress {
-            address,
-            jsonrpc_port,
-            grpc_port,
-        }
-    }
-
-    fn full_grpc_address(&self) -> String {
-        format!("{}:{}", self.address, self.grpc_port)
-    }
-
-    fn full_jsonrpc_address(&self) -> String {
-        format!("http://{}:{}", self.address, self.jsonrpc_port)
-    }
-
-    fn from_string(s: String) -> Result<Self, anyhow::Error> {
-        let (scheme, address_part) = if let Some(address) = s.strip_prefix("http://") {
-            ("http://", address)
-        } else if let Some(address) = s.strip_prefix("https://") {
let Some(address) = s.strip_prefix("https://") { - ("https://", address) - } else { - return Err(anyhow::anyhow!("invalid scheme")); - }; - - let parts: Vec<&str> = address_part.split(':').collect(); - if parts.len() != 2 { - return Err(anyhow::anyhow!("invalid format")); - } - let address = format!("{}{}", scheme, parts[0]); - let ports: Vec<&str> = parts[1].split(';').collect(); - if ports.len() != 2 { - return Err(anyhow::anyhow!("invalid format for jsonrpc and grpc ports")); - } - let jsonrpc_port = ports[0].parse::()?; - let grpc_port = ports[1].parse::()?; - Ok(PeerAddress { - address, - jsonrpc_port, - grpc_port, - }) - } -} - -use std::fmt; - -impl fmt::Display for PeerAddress { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{}:{};{}", self.address, self.jsonrpc_port, self.grpc_port) - } -} - -#[cfg(test)] -use crate::eth::consensus::tests::factories::MockAppendEntryServiceClient; - -#[cfg(not(test))] -type ClientType = AppendEntryServiceClient; -#[cfg(test)] -type ClientType = MockAppendEntryServiceClient; - -#[derive(Clone)] -struct Peer { - client: ClientType, - match_index: u64, - next_index: u64, - role: Role, - receiver: Arc>>, -} - -type PeerTuple = (Peer, JoinHandle<()>); - -pub struct Consensus { - broadcast_sender: broadcast::Sender, //propagates the blocks - importer_config: Option, //HACK this is used with sync online only - storage: Arc, - miner: Arc, - log_entries_storage: Arc, - peers: Arc>>, - #[allow(dead_code)] - direct_peers: Vec, - voted_for: Mutex>, //essential to ensure that a server only votes once per term - current_term: AtomicU64, - last_arrived_block_number: AtomicU64, // kept for should_serve method check - prev_log_index: AtomicU64, - transaction_execution_queue: Arc>>, - heartbeat_timeout: Duration, - my_address: PeerAddress, - #[allow(dead_code)] - grpc_address: SocketAddr, - reset_heartbeat_signal: tokio::sync::Notify, - blockchain_client: Mutex>>, -} - -impl Consensus { - #[allow(clippy::too_many_arguments)] //TODO: refactor into consensus config - pub fn new( - storage: Arc, - miner: Arc, - log_storage_path: Option, - direct_peers: Vec, - importer_config: Option, - jsonrpc_address: SocketAddr, - grpc_address: SocketAddr, - ) -> Arc { - let (broadcast_sender, _) = broadcast::channel(32); //TODO rename to internal_peer_broadcast_sender - let last_arrived_block_number = AtomicU64::new(0); - let peers = Arc::new(RwLock::new(HashMap::new())); - let my_address = Self::discover_my_address(jsonrpc_address.port(), grpc_address.port()); - - let log_entries_storage: Arc = Arc::new(AppendLogEntriesStorage::new(log_storage_path).unwrap()); - let current_term: u64 = log_entries_storage.get_last_term().unwrap_or(1); - let prev_log_index: u64 = log_entries_storage.get_last_index().unwrap_or(0); - - let consensus = Self { - broadcast_sender, - miner: Arc::clone(&miner), - storage, - log_entries_storage, - peers, - direct_peers, - current_term: AtomicU64::new(current_term), - voted_for: Mutex::new(None), - prev_log_index: AtomicU64::new(prev_log_index), - last_arrived_block_number, - transaction_execution_queue: Arc::new(Mutex::new(Vec::new())), - importer_config, - heartbeat_timeout: Duration::from_millis(rand::thread_rng().gen_range(300..400)), // Adjust as needed - my_address: my_address.clone(), - grpc_address, - reset_heartbeat_signal: tokio::sync::Notify::new(), - blockchain_client: Mutex::new(None), - }; - let consensus = Arc::new(consensus); - - Self::initialize_periodic_peer_discovery(Arc::clone(&consensus)); - #[cfg(feature = 
"raft")] - { - //TODO replace this for a synchronous call - let rx_pending_txs: broadcast::Receiver = miner.notifier_pending_txs.subscribe(); - let rx_blocks: broadcast::Receiver = miner.notifier_blocks.subscribe(); - propagation::initialize_transaction_execution_queue(Arc::clone(&consensus)); - propagation::initialize_append_entries_channel(Arc::clone(&consensus), rx_pending_txs, rx_blocks); - server::initialize_server(Arc::clone(&consensus)); - } - Self::initialize_heartbeat_timer(Arc::clone(&consensus)); - - tracing::info!(my_address = %my_address, "consensus module initialized"); - consensus - } - - fn discover_my_address(jsonrpc_port: u16, grpc_port: u16) -> PeerAddress { - let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - socket.connect("8.8.8.8:80").ok().unwrap(); - let my_ip = socket.local_addr().ok().map(|addr| addr.ip().to_string()).unwrap(); - - PeerAddress::new(format!("http://{}", my_ip), jsonrpc_port, grpc_port) - } - - /// Initializes the heartbeat and election timers. - /// This function periodically checks if the node should start a new election based on the election timeout. - /// The timer is reset when an `AppendEntries` request is received, ensuring the node remains a follower if a leader is active. - /// - /// When there are healthy peers we need to wait for the grace period of discovery - /// to avoid starting an election too soon (due to the leader not being discovered yet) - fn initialize_heartbeat_timer(consensus: Arc) { - const TASK_NAME: &str = "consensus::heartbeat_timer"; - spawn_named(TASK_NAME, async move { - discovery::discover_peers(Arc::clone(&consensus)).await; - if consensus.peers.read().await.is_empty() { - tracing::info!("no peers, starting hearbeat timer immediately"); - Self::start_election(Arc::clone(&consensus)).await; - } else { - traced_sleep(PEER_DISCOVERY_DELAY, SleepReason::Interval).await; - tracing::info!("waiting for peer discovery grace period"); - } - - let timeout = consensus.heartbeat_timeout; - loop { - tokio::select! { - _ = GlobalState::wait_shutdown_warn(TASK_NAME) => { - return; - }, - _ = traced_sleep(timeout, SleepReason::Interval) => { - if !Self::is_leader() { - tracing::info!("starting election due to heartbeat timeout"); - Self::start_election(Arc::clone(&consensus)).await; - } else { - let current_term = consensus.current_term.load(Ordering::SeqCst); - tracing::info!(current_term = current_term, "heartbeat timeout reached, but I am the leader, so we ignore the election"); - } - }, - _ = consensus.reset_heartbeat_signal.notified() => { - // Timer reset upon receiving AppendEntries - match consensus.leader_address().await { - Ok(leader_address) => tracing::info!(leader_address = %leader_address, "resetting election timer due to AppendEntries"), - Err(e) => tracing::warn!(error = %e, "resetting election timer due to AppendEntries, but leader not found"), // this should not happen, but if it does it's because the leader changed in the middle of an append entry - } - }, - } - } - }); - } - - /// Starts the election process for the consensus module. - /// - /// This method is called when a node suspects that there is no active leader in the cluster. - /// The node increments its term and votes for itself, then sends RequestVote RPCs to all other nodes in the cluster. - /// If the node receives a majority of votes, it becomes the leader. Otherwise, it remains a follower and waits for the next election timeout. - /// - /// # Details - /// - /// - The method first increments the current term and votes for itself. 
- /// - It then sends out `RequestVote` RPCs to all known peers. - /// - If a majority of the peers grant their votes, the node transitions to the leader role. - /// - If not, it remains a follower and waits for the next election cycle. - async fn start_election(consensus: Arc) { - #[cfg(feature = "metrics")] - let start = metrics::now(); - - discovery::discover_peers(Arc::clone(&consensus)).await; - - let term = consensus.current_term.fetch_add(1, Ordering::SeqCst) + 1; - consensus.current_term.store(term, Ordering::SeqCst); - - *consensus.voted_for.lock().await = Some(consensus.my_address.clone()); - - let mut votes = 1; // Vote for self - - let peer_addresses = { - let peers = consensus.peers.read().await; - peers.keys().cloned().collect::>() - }; - - tracing::info!( - requested_term = term, - candidate_id = %consensus.my_address, - "requesting vote on election for {} peers", - peer_addresses.len() - ); - - for peer_address in peer_addresses { - let peer_clone = { - let peers = consensus.peers.read().await; - peers.get(&peer_address).map(|(p, _)| p.clone()) - }; - - if let Some(mut peer) = peer_clone { - let request = Request::new(RequestVoteRequest { - term, - candidate_id: consensus.my_address.to_string(), - last_log_index: consensus.prev_log_index.load(Ordering::SeqCst), - last_log_term: term, - }); - - match peer.client.request_vote(request).await { - Ok(response) => { - let response_inner = response.into_inner(); - if response_inner.vote_granted { - let current_term = consensus.current_term.load(Ordering::SeqCst); - if response_inner.term == current_term { - tracing::info!(peer_address = %peer_address, "received vote on election"); - votes += 1; - } else { - // this usually happens when we have either a split brain or a network issue, maybe both - tracing::error!( - peer_address = %peer_address, - expected_term = response_inner.term, - "received vote on election with different term" - ); - } - } else { - tracing::info!(peer_address = %peer_address, "did not receive vote on election"); - } - } - Err(_) => { - tracing::warn!("failed to request vote on election from {:?}", peer_address); - } - } - } - } - - let total_nodes = { - let peers = consensus.peers.read().await; - peers.len() + 1 // Including self - }; - let majority = total_nodes / 2 + 1; - - if votes >= majority { - tracing::info!(votes = votes, peers = total_nodes - 1, term = term, "became the leader on election"); - consensus.become_leader().await; - } else { - tracing::info!(votes = votes, peers = total_nodes - 1, term = term, "failed to become the leader on election"); - Self::set_role(Role::Follower); - } - - #[cfg(feature = "metrics")] - metrics::inc_consensus_start_election(start.elapsed()); - } - - /// When importer config is set, it will refresh the blockchain client (substrate or any other blockchain client) - /// however, if stratus is on miner mode, it will clear the blockchain client for safety reasons, so it has no chance to forward anything to a follower node - async fn become_leader(&self) { - // checks if it's running on importer-online mode - if self.importer_config.is_some() { - self.refresh_blockchain_client().await; - } else { - let mut blockchain_client_lock = self.blockchain_client.lock().await; - *blockchain_client_lock = None; // clear the blockchain client for safety reasons when not running on importer-online mode - } - - let last_index: u64 = self.log_entries_storage.get_last_index().unwrap_or(0); - - let next_index = last_index + 1; - // When a node becomes a leader, it should reset the 
match_index for all peers. - // Also, the next_index should be set to the last index + 1. - { - let mut peers = self.peers.write().await; - for (peer, _) in peers.values_mut() { - peer.match_index = 0; - peer.next_index = next_index; - } - } - - Self::set_role(Role::Leader); - } - - async fn refresh_blockchain_client(&self) { - let (http_url, _) = self.get_chain_url().await.expect("failed to get chain url"); - let mut blockchain_client_lock = self.blockchain_client.lock().await; - - tracing::info!(http_url = http_url, "changing blockchain client"); - - *blockchain_client_lock = Some( - BlockchainClient::new_http(&http_url, Duration::from_secs(2)) - .await - .expect("failed to create blockchain client") - .into(), - ); - } - - fn initialize_periodic_peer_discovery(consensus: Arc) { - const TASK_NAME: &str = "consensus::peer_discovery"; - spawn_named(TASK_NAME, async move { - let mut interval = tokio::time::interval(PEER_DISCOVERY_DELAY); - - let periodic_discover = || async move { - loop { - discovery::discover_peers(Arc::clone(&consensus)).await; - interval.tick().await; - } - }; - - tokio::select! { - _ = GlobalState::wait_shutdown_warn(TASK_NAME) => {}, - _ = periodic_discover() => { - unreachable!("this infinite future doesn't end"); - }, - }; - }); - } - - fn set_role(role: Role) { - if ROLE.load(Ordering::SeqCst) == role as u8 { - tracing::info!(role = ?role, "role remains the same"); - return; - } - - tracing::info!(role = ?role, "setting role"); - ROLE.store(role as u8, Ordering::SeqCst); - - #[cfg(feature = "metrics")] - { - if role == Role::Leader { - metrics::set_consensus_is_leader(1_u64); - metrics::inc_consensus_leadership_change(); - } else { - metrics::set_consensus_is_leader(0_u64); - } - } - } - - //FIXME TODO automate the way we gather the leader, instead of using a env var - pub fn is_leader() -> bool { - ROLE.load(Ordering::SeqCst) == Role::Leader as u8 - } - - pub fn is_follower() -> bool { - ROLE.load(Ordering::SeqCst) == Role::Follower as u8 - } - - pub fn current_term(&self) -> u64 { - self.current_term.load(Ordering::SeqCst) - } - - pub fn last_index(&self) -> u64 { - self.prev_log_index.load(Ordering::SeqCst) - } - - pub fn should_forward(&self) -> bool { - let is_leader = Self::is_leader(); - tracing::info!( - is_leader = is_leader, - sync_online_enabled = self.importer_config.is_some(), - "handling request forward" - ); - if is_leader && self.importer_config.is_none() { - return false; // the leader is on miner mode and should deal with the requests - } - true - } - - pub async fn forward(&self, transaction: Bytes) -> anyhow::Result<(Hash, String)> { - #[cfg(feature = "metrics")] - let start = metrics::now(); - - let blockchain_client_lock = self.blockchain_client.lock().await; - - let Some(ref blockchain_client) = *blockchain_client_lock else { - return Err(anyhow::anyhow!("blockchain client is not set, cannot forward transaction")); - }; - - let result = blockchain_client.send_raw_transaction(transaction.into()).await?; - - #[cfg(feature = "metrics")] - metrics::inc_consensus_forward(start.elapsed()); - - Ok((result.tx_hash, blockchain_client.http_url.clone())) - } - - pub async fn should_serve(&self) -> bool { - if self.importer_config.is_some() { - //gather the latest block number, check how far behind it is from current storage block - //if its greater than 3 blocks of distance, it should not be served - let blockchain_client_lock = self.blockchain_client.lock().await; - - let Some(ref blockchain_client) = *blockchain_client_lock else { - 
tracing::error!("blockchain client is not set at importer, cannot serve requests because they cant be forwarded"); - return false; - }; - - let Ok(validator_block_number) = blockchain_client.fetch_block_number().await else { - tracing::error!("unable to fetch latest block number"); - return false; - }; - - let Ok(current_block_number) = self.storage.read_mined_block_number() else { - tracing::error!("unable to fetch current block number"); - return false; - }; - - return (validator_block_number.as_u64() - 3) <= current_block_number.as_u64(); - } - - // consensus - if Self::is_leader() { - return true; - } - - if self.blockchain_client.lock().await.is_none() { - tracing::warn!("blockchain client is not set, cannot serve requests because they cant be forwarded"); - return false; - } - - let last_arrived_block_number = self.last_arrived_block_number.load(Ordering::SeqCst); - - if last_arrived_block_number == 0 { - tracing::warn!("no appendEntry has been received yet"); - false - } else { - { - let storage_block_number: u64 = self.storage.read_mined_block_number().unwrap_or_default().into(); - - tracing::info!( - "last arrived block number: {}, storage block number: {}", - last_arrived_block_number, - storage_block_number - ); - - if (last_arrived_block_number - 3) <= storage_block_number { - tracing::info!("should serve request"); - true - } else { - let diff = (last_arrived_block_number as i128) - (storage_block_number as i128); - tracing::warn!(diff = diff, "should not serve request"); - false - } - } - } - } - - async fn leader_address(&self) -> anyhow::Result { - let peers = self.peers.read().await; - for (address, (peer, _)) in peers.iter() { - if peer.role == Role::Leader { - return Ok(address.clone()); - } - } - Err(anyhow!("Leader not found")) - } - - pub async fn get_chain_url(&self) -> anyhow::Result<(String, Option)> { - if let Some(importer_config) = self.importer_config.clone() { - return Ok((importer_config.online.external_rpc, importer_config.online.external_rpc_ws)); - } - - if Self::is_follower() { - if let Ok(leader_address) = self.leader_address().await { - return Ok((leader_address.full_jsonrpc_address(), None)); - } - } - - Err(anyhow!("tried to get chain url as a leader and while running on miner mode")) - } - - async fn update_leader(&self, leader_address: PeerAddress) { - if leader_address == self.leader_address().await.unwrap_or_default() { - tracing::info!("leader is the same as before"); - return; - } - - let mut peers = self.peers.write().await; - for (address, (peer, _)) in peers.iter_mut() { - if *address == leader_address { - peer.role = Role::Leader; - - self.refresh_blockchain_client().await; - } else { - peer.role = Role::Follower; - } - } - - tracing::info!(leader = %leader_address, "updated leader information"); - } -} - -#[cfg(test)] -mod tests { - use super::*; - pub mod factories; - mod test_simple_blocks; - - #[test] - fn test_peer_address_from_string_valid_http() { - let input = "http://127.0.0.1:3000;3777".to_string(); - let result = PeerAddress::from_string(input); - - assert!(result.is_ok()); - let peer_address = result.unwrap(); - assert_eq!(peer_address.address, "http://127.0.0.1"); - assert_eq!(peer_address.jsonrpc_port, 3000); - assert_eq!(peer_address.grpc_port, 3777); - } - - #[test] - fn test_peer_address_from_string_valid_https() { - let input = "https://127.0.0.1:3000;3777".to_string(); - let result = PeerAddress::from_string(input); - - assert!(result.is_ok()); - let peer_address = result.unwrap(); - assert_eq!(peer_address.address, 
"https://127.0.0.1"); - assert_eq!(peer_address.jsonrpc_port, 3000); - assert_eq!(peer_address.grpc_port, 3777); - } - - #[test] - fn test_peer_address_from_string_invalid_format() { - let input = "http://127.0.0.1-3000;3777".to_string(); - let result = PeerAddress::from_string(input); - - assert!(result.is_err()); - assert_eq!(result.err().unwrap().to_string(), "invalid format"); - } - - #[test] - fn test_peer_address_from_string_missing_scheme() { - let input = "127.0.0.1:3000;3777".to_string(); - let result = PeerAddress::from_string(input); - - assert!(result.is_err()); - assert_eq!(result.err().unwrap().to_string(), "invalid scheme"); - } - #[test] - fn test_peer_address_full_grpc_address() { - let peer_address = PeerAddress::new("127.0.0.1".to_string(), 3000, 3777); - assert_eq!(peer_address.full_grpc_address(), "127.0.0.1:3777"); - } +#[async_trait] +pub trait Consensus: Send + Sync { + async fn should_serve(&self) -> bool; + fn should_forward(&self) -> bool; + async fn forward(&self, transaction: Bytes) -> anyhow::Result; } diff --git a/src/eth/consensus/append_log_entries_storage.rs b/src/eth/consensus/raft/append_log_entries_storage.rs similarity index 99% rename from src/eth/consensus/append_log_entries_storage.rs rename to src/eth/consensus/raft/append_log_entries_storage.rs index 7331b4fb7..b17840f5a 100644 --- a/src/eth/consensus/append_log_entries_storage.rs +++ b/src/eth/consensus/raft/append_log_entries_storage.rs @@ -141,7 +141,7 @@ mod tests { use tempfile::TempDir; use super::*; - use crate::eth::consensus::tests::factories::*; + use crate::eth::consensus::raft::tests::factories::*; fn setup_storage() -> AppendLogEntriesStorage { let temp_dir = TempDir::new().unwrap(); diff --git a/src/eth/consensus/discovery.rs b/src/eth/consensus/raft/discovery.rs similarity index 97% rename from src/eth/consensus/discovery.rs rename to src/eth/consensus/raft/discovery.rs index 300c34b71..6ae17c8e0 100644 --- a/src/eth/consensus/discovery.rs +++ b/src/eth/consensus/raft/discovery.rs @@ -9,9 +9,9 @@ use tokio::sync::Mutex; #[cfg(not(test))] use super::append_entry::append_entry_service_client::AppendEntryServiceClient; -use super::Consensus; use super::Peer; use super::PeerAddress; +use super::Raft; #[cfg(not(test))] use super::Role; use crate::ext::spawn_named; @@ -19,7 +19,7 @@ use crate::ext::spawn_named; use crate::infra::metrics; #[tracing::instrument(skip_all)] -pub async fn discover_peers(consensus: Arc) { +pub async fn discover_peers(consensus: Arc) { #[allow(unused_mut)] let mut new_peers: Vec<(PeerAddress, Peer)> = Vec::new(); @@ -79,7 +79,7 @@ pub async fn discover_peers(consensus: Arc) { } #[cfg(not(test))] // FIXME: This is a workaround to avoid running this code in tests we need a proper Tonic mock -async fn discover_peers_env(addresses: &[String], consensus: Arc) -> Result, anyhow::Error> { +async fn discover_peers_env(addresses: &[String], consensus: Arc) -> Result, anyhow::Error> { #[allow(unused_mut)] let mut peers: Vec<(PeerAddress, Peer)> = Vec::new(); diff --git a/src/eth/consensus/log_entry.rs b/src/eth/consensus/raft/log_entry.rs similarity index 100% rename from src/eth/consensus/log_entry.rs rename to src/eth/consensus/raft/log_entry.rs diff --git a/src/eth/consensus/raft/mod.rs b/src/eth/consensus/raft/mod.rs new file mode 100644 index 000000000..c876b4b65 --- /dev/null +++ b/src/eth/consensus/raft/mod.rs @@ -0,0 +1,683 @@ +//FIXME temporarily there are a lot of allow dead_code and allow unused_imports, we need to deal with them properly later 
+#[allow(dead_code)] //TODO remove this +mod append_log_entries_storage; +mod discovery; +mod log_entry; +mod propagation; +pub mod utils; + +mod server; + +use std::collections::HashMap; +use std::net::SocketAddr; +use std::net::UdpSocket; +use std::sync::atomic::AtomicU64; +use std::sync::atomic::AtomicU8; +use std::sync::atomic::Ordering; +use std::sync::Arc; +use std::time::Duration; + +use anyhow::anyhow; +use async_trait::async_trait; +use rand::Rng; +use tokio::sync::broadcast; +use tokio::sync::Mutex; +use tokio::sync::RwLock; +use tokio::task::JoinHandle; +use tonic::Request; + +use crate::eth::importer::ImporterConfig; +use crate::eth::primitives::Hash; +#[allow(unused_imports)] +use crate::eth::primitives::TransactionExecution; +use crate::eth::storage::StratusStorage; +use crate::ext::spawn_named; +use crate::ext::traced_sleep; +use crate::ext::SleepReason; +use crate::infra::BlockchainClient; +use crate::GlobalState; + +pub mod append_entry { + #![allow(clippy::default_trait_access, clippy::wildcard_imports)] + tonic::include_proto!("append_entry"); +} +#[allow(unused_imports)] +use append_entry::append_entry_service_client::AppendEntryServiceClient; +use append_entry::append_entry_service_server::AppendEntryService; +use append_entry::RequestVoteRequest; +use append_entry::TransactionExecutionEntry; + +use self::append_log_entries_storage::AppendLogEntriesStorage; +use self::log_entry::LogEntryData; +use super::Consensus; +use crate::eth::miner::Miner; +use crate::eth::primitives::Block; +use crate::eth::primitives::Bytes; +#[cfg(feature = "metrics")] +use crate::infra::metrics; + +const PEER_DISCOVERY_DELAY: Duration = Duration::from_secs(30); + +#[derive(Clone, Copy, Debug, PartialEq)] +enum Role { + Leader = 1, + Follower = 2, + _Candidate = 3, +} + +static ROLE: AtomicU8 = AtomicU8::new(Role::Follower as u8); + +#[derive(Clone, Debug, Default, Hash, Eq, PartialEq)] +struct PeerAddress { + address: String, + jsonrpc_port: u16, + grpc_port: u16, +} + +impl PeerAddress { + fn new(address: String, jsonrpc_port: u16, grpc_port: u16) -> Self { + PeerAddress { + address, + jsonrpc_port, + grpc_port, + } + } + + fn full_grpc_address(&self) -> String { + format!("{}:{}", self.address, self.grpc_port) + } + + fn full_jsonrpc_address(&self) -> String { + format!("http://{}:{}", self.address, self.jsonrpc_port) + } + + fn from_string(s: String) -> Result { + let (scheme, address_part) = if let Some(address) = s.strip_prefix("http://") { + ("http://", address) + } else if let Some(address) = s.strip_prefix("https://") { + ("https://", address) + } else { + return Err(anyhow::anyhow!("invalid scheme")); + }; + + let parts: Vec<&str> = address_part.split(':').collect(); + if parts.len() != 2 { + return Err(anyhow::anyhow!("invalid format")); + } + let address = format!("{}{}", scheme, parts[0]); + let ports: Vec<&str> = parts[1].split(';').collect(); + if ports.len() != 2 { + return Err(anyhow::anyhow!("invalid format for jsonrpc and grpc ports")); + } + let jsonrpc_port = ports[0].parse::()?; + let grpc_port = ports[1].parse::()?; + Ok(PeerAddress { + address, + jsonrpc_port, + grpc_port, + }) + } +} + +use std::fmt; + +impl fmt::Display for PeerAddress { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}:{};{}", self.address, self.jsonrpc_port, self.grpc_port) + } +} + +#[cfg(test)] +use crate::eth::consensus::raft::tests::factories::MockAppendEntryServiceClient; + +#[cfg(not(test))] +type ClientType = AppendEntryServiceClient; +#[cfg(test)] +type ClientType 
= MockAppendEntryServiceClient; + +#[derive(Clone)] +struct Peer { + client: ClientType, + match_index: u64, + next_index: u64, + role: Role, + receiver: Arc>>, +} + +type PeerTuple = (Peer, JoinHandle<()>); + +pub struct Raft { + broadcast_sender: broadcast::Sender, //propagates the blocks + importer_config: Option, //HACK this is used with sync online only + storage: Arc, + miner: Arc, + log_entries_storage: Arc, + peers: Arc>>, + #[allow(dead_code)] + direct_peers: Vec, + voted_for: Mutex>, //essential to ensure that a server only votes once per term + current_term: AtomicU64, + last_arrived_block_number: AtomicU64, // kept for should_serve method check + prev_log_index: AtomicU64, + transaction_execution_queue: Arc>>, + heartbeat_timeout: Duration, + my_address: PeerAddress, + #[allow(dead_code)] + grpc_address: SocketAddr, + reset_heartbeat_signal: tokio::sync::Notify, + blockchain_client: Mutex>>, +} + +impl Raft { + #[allow(clippy::too_many_arguments)] //TODO: refactor into consensus config + pub fn new( + storage: Arc, + miner: Arc, + log_storage_path: Option, + direct_peers: Vec, + importer_config: Option, + jsonrpc_address: SocketAddr, + grpc_address: SocketAddr, + ) -> Arc { + let (broadcast_sender, _) = broadcast::channel(32); //TODO rename to internal_peer_broadcast_sender + let last_arrived_block_number = AtomicU64::new(0); + let peers = Arc::new(RwLock::new(HashMap::new())); + let my_address = Self::discover_my_address(jsonrpc_address.port(), grpc_address.port()); + + let log_entries_storage: Arc = Arc::new(AppendLogEntriesStorage::new(log_storage_path).unwrap()); + let current_term: u64 = log_entries_storage.get_last_term().unwrap_or(1); + let prev_log_index: u64 = log_entries_storage.get_last_index().unwrap_or(0); + + let consensus = Self { + broadcast_sender, + miner: Arc::clone(&miner), + storage, + log_entries_storage, + peers, + direct_peers, + current_term: AtomicU64::new(current_term), + voted_for: Mutex::new(None), + prev_log_index: AtomicU64::new(prev_log_index), + last_arrived_block_number, + transaction_execution_queue: Arc::new(Mutex::new(Vec::new())), + importer_config, + heartbeat_timeout: Duration::from_millis(rand::thread_rng().gen_range(300..400)), // Adjust as needed + my_address: my_address.clone(), + grpc_address, + reset_heartbeat_signal: tokio::sync::Notify::new(), + blockchain_client: Mutex::new(None), + }; + let consensus = Arc::new(consensus); + + Self::initialize_periodic_peer_discovery(Arc::clone(&consensus)); + #[cfg(feature = "raft")] + { + //TODO replace this for a synchronous call + let rx_pending_txs: broadcast::Receiver = miner.notifier_pending_txs.subscribe(); + let rx_blocks: broadcast::Receiver = miner.notifier_blocks.subscribe(); + propagation::initialize_transaction_execution_queue(Arc::clone(&consensus)); + propagation::initialize_append_entries_channel(Arc::clone(&consensus), rx_pending_txs, rx_blocks); + server::initialize_server(Arc::clone(&consensus)); + } + Self::initialize_heartbeat_timer(Arc::clone(&consensus)); + + tracing::info!(my_address = %my_address, "consensus module initialized"); + consensus + } + + fn discover_my_address(jsonrpc_port: u16, grpc_port: u16) -> PeerAddress { + let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); + socket.connect("8.8.8.8:80").ok().unwrap(); + let my_ip = socket.local_addr().ok().map(|addr| addr.ip().to_string()).unwrap(); + + PeerAddress::new(format!("http://{}", my_ip), jsonrpc_port, grpc_port) + } + + /// Initializes the heartbeat and election timers. 
+    /// This function periodically checks if the node should start a new election based on the election timeout.
+    /// The timer is reset when an `AppendEntries` request is received, ensuring the node remains a follower if a leader is active.
+    ///
+    /// When there are healthy peers, we need to wait for the grace period of discovery
+    /// to avoid starting an election too soon (due to the leader not being discovered yet).
+    fn initialize_heartbeat_timer(consensus: Arc<Raft>) {
+        const TASK_NAME: &str = "consensus::heartbeat_timer";
+        spawn_named(TASK_NAME, async move {
+            discovery::discover_peers(Arc::clone(&consensus)).await;
+            if consensus.peers.read().await.is_empty() {
+                tracing::info!("no peers, starting heartbeat timer immediately");
+                Self::start_election(Arc::clone(&consensus)).await;
+            } else {
+                traced_sleep(PEER_DISCOVERY_DELAY, SleepReason::Interval).await;
+                tracing::info!("waiting for peer discovery grace period");
+            }
+
+            let timeout = consensus.heartbeat_timeout;
+            loop {
+                tokio::select! {
+                    _ = GlobalState::wait_shutdown_warn(TASK_NAME) => {
+                        return;
+                    },
+                    _ = traced_sleep(timeout, SleepReason::Interval) => {
+                        if !Self::is_leader() {
+                            tracing::info!("starting election due to heartbeat timeout");
+                            Self::start_election(Arc::clone(&consensus)).await;
+                        } else {
+                            let current_term = consensus.current_term.load(Ordering::SeqCst);
+                            tracing::info!(current_term = current_term, "heartbeat timeout reached, but I am the leader, so we ignore the election");
+                        }
+                    },
+                    _ = consensus.reset_heartbeat_signal.notified() => {
+                        // Timer reset upon receiving AppendEntries
+                        match consensus.leader_address().await {
+                            Ok(leader_address) => tracing::info!(leader_address = %leader_address, "resetting election timer due to AppendEntries"),
+                            Err(e) => tracing::warn!(error = %e, "resetting election timer due to AppendEntries, but leader not found"), // this should not happen, but if it does it's because the leader changed in the middle of an append entry
+                        }
+                    },
+                }
+            }
+        });
+    }
+
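The 300-400 ms window chosen in `Raft::new` is the standard Raft guard against split votes: each node's timer expires at a slightly different moment, so followers rarely start competing elections at once. A minimal standalone sketch of the same select-based wait, assuming only tokio and rand (`wait_for_election_timeout` is a hypothetical name, not part of this patch):

```rust
use std::time::Duration;

use rand::Rng;
use tokio::sync::Notify;
use tokio::time::sleep;

// Hypothetical helper, not from the patch: wait for a randomized election
// timeout, aborting early if a heartbeat arrives (mirrors `reset_heartbeat_signal`).
async fn wait_for_election_timeout(heartbeat: &Notify) -> bool {
    // Randomizing within 300-400 ms keeps followers from expiring in lockstep
    // and splitting the vote (the patch fixes this once per node in `Raft::new`).
    let timeout = Duration::from_millis(rand::thread_rng().gen_range(300..400));
    tokio::select! {
        _ = sleep(timeout) => true,        // timed out: start an election
        _ = heartbeat.notified() => false, // leader is alive: remain a follower
    }
}

#[tokio::main]
async fn main() {
    let heartbeat = Notify::new();
    // No heartbeat arrives here, so the timeout fires and an election would start.
    assert!(wait_for_election_timeout(&heartbeat).await);
}
```

The patch draws the random timeout once at construction rather than per wait; the sketch randomizes per call only to keep it self-contained.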
+    /// Starts the election process for the consensus module.
+    ///
+    /// This method is called when a node suspects that there is no active leader in the cluster.
+    /// The node increments its term and votes for itself, then sends RequestVote RPCs to all other nodes in the cluster.
+    /// If the node receives a majority of votes, it becomes the leader. Otherwise, it remains a follower and waits for the next election timeout.
+    ///
+    /// # Details
+    ///
+    /// - The method first increments the current term and votes for itself.
+    /// - It then sends out `RequestVote` RPCs to all known peers.
+    /// - If a majority of the peers grant their votes, the node transitions to the leader role.
+    /// - If not, it remains a follower and waits for the next election cycle.
+    async fn start_election(consensus: Arc<Raft>) {
+        #[cfg(feature = "metrics")]
+        let start = metrics::now();
+
+        discovery::discover_peers(Arc::clone(&consensus)).await;
+
+        let term = consensus.current_term.fetch_add(1, Ordering::SeqCst) + 1;
+        consensus.current_term.store(term, Ordering::SeqCst);
+
+        *consensus.voted_for.lock().await = Some(consensus.my_address.clone());
+
+        let mut votes = 1; // Vote for self
+
+        let peer_addresses = {
+            let peers = consensus.peers.read().await;
+            peers.keys().cloned().collect::<Vec<_>>()
+        };
+
+        tracing::info!(
+            requested_term = term,
+            candidate_id = %consensus.my_address,
+            "requesting vote on election for {} peers",
+            peer_addresses.len()
+        );
+
+        for peer_address in peer_addresses {
+            let peer_clone = {
+                let peers = consensus.peers.read().await;
+                peers.get(&peer_address).map(|(p, _)| p.clone())
+            };
+
+            if let Some(mut peer) = peer_clone {
+                let request = Request::new(RequestVoteRequest {
+                    term,
+                    candidate_id: consensus.my_address.to_string(),
+                    last_log_index: consensus.prev_log_index.load(Ordering::SeqCst),
+                    last_log_term: term,
+                });
+
+                match peer.client.request_vote(request).await {
+                    Ok(response) => {
+                        let response_inner = response.into_inner();
+                        if response_inner.vote_granted {
+                            let current_term = consensus.current_term.load(Ordering::SeqCst);
+                            if response_inner.term == current_term {
+                                tracing::info!(peer_address = %peer_address, "received vote on election");
+                                votes += 1;
+                            } else {
+                                // this usually happens when we have either a split brain or a network issue, maybe both
+                                tracing::error!(
+                                    peer_address = %peer_address,
+                                    expected_term = response_inner.term,
+                                    "received vote on election with different term"
+                                );
+                            }
+                        } else {
+                            tracing::info!(peer_address = %peer_address, "did not receive vote on election");
+                        }
+                    }
+                    Err(_) => {
+                        tracing::warn!("failed to request vote on election from {:?}", peer_address);
+                    }
+                }
+            }
+        }
+
+        let total_nodes = {
+            let peers = consensus.peers.read().await;
+            peers.len() + 1 // Including self
+        };
+        let majority = total_nodes / 2 + 1;
+
+        if votes >= majority {
+            tracing::info!(votes = votes, peers = total_nodes - 1, term = term, "became the leader on election");
+            consensus.become_leader().await;
+        } else {
+            tracing::info!(votes = votes, peers = total_nodes - 1, term = term, "failed to become the leader on election");
+            Self::set_role(Role::Follower);
+        }
+
+        #[cfg(feature = "metrics")]
+        metrics::inc_consensus_start_election(start.elapsed());
+    }
+
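For reference, the quorum arithmetic above counts the candidate's own vote and requires a strict majority of all nodes (peers plus self). A tiny self-contained check of that formula:

```rust
/// Majority rule used by `start_election`: more than half of all nodes (peers + self).
fn majority(peer_count: usize) -> usize {
    let total_nodes = peer_count + 1; // including self
    total_nodes / 2 + 1
}

fn main() {
    assert_eq!(majority(0), 1); // a lone node elects itself
    assert_eq!(majority(2), 2); // 3-node cluster: 2 votes needed
    assert_eq!(majority(4), 3); // 5-node cluster: 3 votes needed
}
```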
+    /// When importer config is set, it will refresh the blockchain client (substrate or any other blockchain client);
+    /// however, if stratus is on miner mode, it will clear the blockchain client for safety reasons, so it has no chance to forward anything to a follower node.
+    async fn become_leader(&self) {
+        // checks if it's running on importer-online mode
+        if self.importer_config.is_some() {
+            self.refresh_blockchain_client().await;
+        } else {
+            let mut blockchain_client_lock = self.blockchain_client.lock().await;
+            *blockchain_client_lock = None; // clear the blockchain client for safety reasons when not running on importer-online mode
+        }
+
+        let last_index: u64 = self.log_entries_storage.get_last_index().unwrap_or(0);
+
+        let next_index = last_index + 1;
+        // When a node becomes a leader, it should reset the match_index for all peers.
+        // Also, the next_index should be set to the last index + 1.
+        {
+            let mut peers = self.peers.write().await;
+            for (peer, _) in peers.values_mut() {
+                peer.match_index = 0;
+                peer.next_index = next_index;
+            }
+        }
+
+        Self::set_role(Role::Leader);
+    }
+
+    async fn refresh_blockchain_client(&self) {
+        let (http_url, _) = self.get_chain_url().await.expect("failed to get chain url");
+        let mut blockchain_client_lock = self.blockchain_client.lock().await;
+
+        tracing::info!(http_url = http_url, "changing blockchain client");
+
+        *blockchain_client_lock = Some(
+            BlockchainClient::new_http(&http_url, Duration::from_secs(2))
+                .await
+                .expect("failed to create blockchain client")
+                .into(),
+        );
+    }
+
+    fn initialize_periodic_peer_discovery(consensus: Arc<Raft>) {
+        const TASK_NAME: &str = "consensus::peer_discovery";
+        spawn_named(TASK_NAME, async move {
+            let mut interval = tokio::time::interval(PEER_DISCOVERY_DELAY);
+
+            let periodic_discover = || async move {
+                loop {
+                    discovery::discover_peers(Arc::clone(&consensus)).await;
+                    interval.tick().await;
+                }
+            };
+
+            tokio::select! {
+                _ = GlobalState::wait_shutdown_warn(TASK_NAME) => {},
+                _ = periodic_discover() => {
+                    unreachable!("this infinite future doesn't end");
+                },
+            };
+        });
+    }
+
+    fn set_role(role: Role) {
+        if ROLE.load(Ordering::SeqCst) == role as u8 {
+            tracing::info!(role = ?role, "role remains the same");
+            return;
+        }
+
+        tracing::info!(role = ?role, "setting role");
+        ROLE.store(role as u8, Ordering::SeqCst);
+
+        #[cfg(feature = "metrics")]
+        {
+            if role == Role::Leader {
+                metrics::set_consensus_is_leader(1_u64);
+                metrics::inc_consensus_leadership_change();
+            } else {
+                metrics::set_consensus_is_leader(0_u64);
+            }
+        }
+    }
+
+    //FIXME TODO automate the way we gather the leader, instead of using an env var
+    pub fn is_leader() -> bool {
+        ROLE.load(Ordering::SeqCst) == Role::Leader as u8
+    }
+
+    pub fn is_follower() -> bool {
+        ROLE.load(Ordering::SeqCst) == Role::Follower as u8
+    }
+
+    pub fn current_term(&self) -> u64 {
+        self.current_term.load(Ordering::SeqCst)
+    }
+
+    pub fn last_index(&self) -> u64 {
+        self.prev_log_index.load(Ordering::SeqCst)
+    }
+
+    pub fn should_forward(&self) -> bool {
+        let is_leader = Self::is_leader();
+        tracing::info!(
+            is_leader = is_leader,
+            sync_online_enabled = self.importer_config.is_some(),
+            "handling request forward"
+        );
+        if is_leader && self.importer_config.is_none() {
+            return false; // the leader is on miner mode and should deal with the requests
+        }
+        true
+    }
+
+    pub async fn forward(&self, transaction: Bytes) -> anyhow::Result<(Hash, String)> {
+        #[cfg(feature = "metrics")]
+        let start = metrics::now();
+
+        let blockchain_client_lock = self.blockchain_client.lock().await;
+
+        let Some(ref blockchain_client) = *blockchain_client_lock else {
+            return Err(anyhow::anyhow!("blockchain client is not set, cannot forward transaction"));
+        };
+
+        let result = blockchain_client.send_raw_transaction(transaction.into()).await?;
+
+        #[cfg(feature = "metrics")]
+        metrics::inc_consensus_forward(start.elapsed());
+
+        Ok((result.tx_hash, blockchain_client.http_url.clone())) //XXX HEX
+    }
+
+    pub async fn should_serve(&self) -> bool {
+        if self.importer_config.is_some() {
+            // gather the latest block number, check how far behind it is from the current storage block;
+            // if it's greater than 3 blocks of distance, requests should not be served
+            let blockchain_client_lock = self.blockchain_client.lock().await;
+
+            let Some(ref blockchain_client) = *blockchain_client_lock else {
+                tracing::error!("blockchain client is not set at importer, cannot serve requests because they can't be forwarded");
+                return false;
+            };
+
+            let Ok(validator_block_number) = blockchain_client.fetch_block_number().await else {
+                tracing::error!("unable to fetch latest block number");
+                return false;
+            };
+
+            let Ok(current_block_number) = self.storage.read_mined_block_number() else {
+                tracing::error!("unable to fetch current block number");
+                return false;
+            };
+
+            return (validator_block_number.as_u64() - 3) <= current_block_number.as_u64();
+        }
+
+        // consensus
+        if Self::is_leader() {
+            return true;
+        }
+
+        if self.blockchain_client.lock().await.is_none() {
+            tracing::warn!("blockchain client is not set, cannot serve requests because they can't be forwarded");
+            return false;
+        }
+
+        let last_arrived_block_number = self.last_arrived_block_number.load(Ordering::SeqCst);
+
+        if last_arrived_block_number == 0 {
+            tracing::warn!("no appendEntry has been received yet");
+            false
+        } else {
+            {
+                let storage_block_number: u64 = self.storage.read_mined_block_number().unwrap_or_default().into();
+
+                tracing::info!(
+                    "last arrived block number: {}, storage block number: {}",
+                    last_arrived_block_number,
+                    storage_block_number
+                );
+
+                if (last_arrived_block_number - 3) <= storage_block_number {
+                    tracing::info!("should serve request");
+                    true
+                } else {
+                    let diff = (last_arrived_block_number as i128) - (storage_block_number as i128);
+                    tracing::warn!(diff = diff, "should not serve request");
+                    false
+                }
+            }
+        }
+    }
+
+    async fn leader_address(&self) -> anyhow::Result<PeerAddress> {
+        let peers = self.peers.read().await;
+        for (address, (peer, _)) in peers.iter() {
+            if peer.role == Role::Leader {
+                return Ok(address.clone());
+            }
+        }
+        Err(anyhow!("Leader not found"))
+    }
+
+    pub async fn get_chain_url(&self) -> anyhow::Result<(String, Option<String>)> {
+        if let Some(importer_config) = self.importer_config.clone() {
+            return Ok((importer_config.external_rpc, importer_config.external_rpc_ws));
+        }
+
+        if Self::is_follower() {
+            if let Ok(leader_address) = self.leader_address().await {
+                return Ok((leader_address.full_jsonrpc_address(), None));
+            }
+        }
+
+        Err(anyhow!("tried to get chain url as a leader while running on miner mode"))
+    }
+
+    async fn update_leader(&self, leader_address: PeerAddress) {
+        if leader_address == self.leader_address().await.unwrap_or_default() {
+            tracing::info!("leader is the same as before");
+            return;
+        }
+
+        let mut peers = self.peers.write().await;
+        for (address, (peer, _)) in peers.iter_mut() {
+            if *address == leader_address {
+                peer.role = Role::Leader;
+
+                self.refresh_blockchain_client().await;
+            } else {
+                peer.role = Role::Follower;
+            }
+        }
+
+        tracing::info!(leader = %leader_address, "updated leader information");
+    }
+}
+
+#[async_trait]
+impl Consensus for Raft {
+    async fn should_serve(&self) -> bool {
+        self.should_serve().await
+    }
+
+    fn should_forward(&self) -> bool {
+        self.should_forward()
+    }
+
+    async fn forward(&self, transaction: Bytes) -> anyhow::Result<Hash> {
+        let (tx_hash, url) = self.forward(transaction).await?;
+        tracing::info!(%tx_hash, %url, "forwarded eth_sendRawTransaction to leader");
+        Ok(tx_hash)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    pub mod factories;
+    mod test_simple_blocks;
+
+    #[test]
+    fn test_peer_address_from_string_valid_http() {
+        let input = "http://127.0.0.1:3000;3777".to_string();
+        let result = PeerAddress::from_string(input);
+
+        assert!(result.is_ok());
+        let peer_address = result.unwrap();
+        assert_eq!(peer_address.address, "http://127.0.0.1");
+        assert_eq!(peer_address.jsonrpc_port, 3000);
+
assert_eq!(peer_address.grpc_port, 3777); + } + + #[test] + fn test_peer_address_from_string_valid_https() { + let input = "https://127.0.0.1:3000;3777".to_string(); + let result = PeerAddress::from_string(input); + + assert!(result.is_ok()); + let peer_address = result.unwrap(); + assert_eq!(peer_address.address, "https://127.0.0.1"); + assert_eq!(peer_address.jsonrpc_port, 3000); + assert_eq!(peer_address.grpc_port, 3777); + } + + #[test] + fn test_peer_address_from_string_invalid_format() { + let input = "http://127.0.0.1-3000;3777".to_string(); + let result = PeerAddress::from_string(input); + + assert!(result.is_err()); + assert_eq!(result.err().unwrap().to_string(), "invalid format"); + } + + #[test] + fn test_peer_address_from_string_missing_scheme() { + let input = "127.0.0.1:3000;3777".to_string(); + let result = PeerAddress::from_string(input); + + assert!(result.is_err()); + assert_eq!(result.err().unwrap().to_string(), "invalid scheme"); + } + + #[test] + fn test_peer_address_full_grpc_address() { + let peer_address = PeerAddress::new("127.0.0.1".to_string(), 3000, 3777); + assert_eq!(peer_address.full_grpc_address(), "127.0.0.1:3777"); + } +} diff --git a/src/eth/consensus/propagation.rs b/src/eth/consensus/raft/propagation.rs similarity index 92% rename from src/eth/consensus/propagation.rs rename to src/eth/consensus/raft/propagation.rs index 6114bbc01..dc929a4df 100644 --- a/src/eth/consensus/propagation.rs +++ b/src/eth/consensus/raft/propagation.rs @@ -8,14 +8,14 @@ use tokio::sync::broadcast; use tonic::Request; use super::Block; -use super::Consensus; use super::LogEntryData; use super::Peer; -use crate::eth::consensus::append_entry::AppendBlockCommitRequest; -use crate::eth::consensus::append_entry::AppendBlockCommitResponse; -use crate::eth::consensus::append_entry::AppendTransactionExecutionsRequest; -use crate::eth::consensus::append_entry::AppendTransactionExecutionsResponse; -use crate::eth::consensus::append_entry::StatusCode; +use super::Raft; +use crate::eth::consensus::raft::append_entry::AppendBlockCommitRequest; +use crate::eth::consensus::raft::append_entry::AppendBlockCommitResponse; +use crate::eth::consensus::raft::append_entry::AppendTransactionExecutionsRequest; +use crate::eth::consensus::raft::append_entry::AppendTransactionExecutionsResponse; +use crate::eth::consensus::raft::append_entry::StatusCode; use crate::eth::primitives::TransactionExecution; use crate::ext::spawn_named; use crate::ext::traced_sleep; @@ -37,7 +37,7 @@ enum AppendResponse { } #[allow(dead_code)] -pub async fn save_and_handle_log_entry(consensus: Arc, log_entry_data: LogEntryData) -> Result<()> { +pub async fn save_and_handle_log_entry(consensus: Arc, log_entry_data: LogEntryData) -> Result<()> { let last_index = consensus.log_entries_storage.get_last_index().unwrap_or(0); tracing::debug!(last_index, "Last index fetched"); @@ -69,8 +69,8 @@ pub async fn save_and_handle_log_entry(consensus: Arc, log_entry_data } #[allow(dead_code)] -pub async fn handle_block_entry(consensus: Arc, block: Block) { - if Consensus::is_leader() { +pub async fn handle_block_entry(consensus: Arc, block: Block) { + if Raft::is_leader() { tracing::info!(number = block.header.number.as_u64(), "Leader received block to send to followers"); let transaction_hashes: Vec> = block.transactions.iter().map(|tx| tx.input.hash.as_fixed_bytes().to_vec()).collect(); @@ -83,8 +83,8 @@ pub async fn handle_block_entry(consensus: Arc, block: Block) { } #[allow(dead_code)] -pub async fn 
handle_transaction_executions(consensus: Arc) { - if Consensus::is_leader() { +pub async fn handle_transaction_executions(consensus: Arc) { + if Raft::is_leader() { let mut queue = consensus.transaction_execution_queue.lock().await; let executions = queue.drain(..).collect::>(); drop(queue); @@ -97,7 +97,7 @@ pub async fn handle_transaction_executions(consensus: Arc) { } } -pub async fn handle_peer_propagation(mut peer: Peer, consensus: Arc) { +pub async fn handle_peer_propagation(mut peer: Peer, consensus: Arc) { const TASK_NAME: &str = "consensus::propagate"; let mut log_entry_queue: Vec = Vec::new(); @@ -153,8 +153,8 @@ pub async fn handle_peer_propagation(mut peer: Peer, consensus: Arc) } } -async fn append_entry_to_peer(consensus: Arc, peer: &mut Peer, entry_data: &LogEntryData) -> Result<(), anyhow::Error> { - if !Consensus::is_leader() { +async fn append_entry_to_peer(consensus: Arc, peer: &mut Peer, entry_data: &LogEntryData) -> Result<(), anyhow::Error> { + if !Raft::is_leader() { tracing::error!("append_entry_to_peer called on non-leader node"); return Err(anyhow!("append_entry_to_peer called on non-leader node")); } @@ -253,7 +253,7 @@ async fn append_entry_to_peer(consensus: Arc, peer: &mut Peer, entry_ } async fn send_append_entry_request( - consensus: Arc, + consensus: Arc, peer: &mut Peer, current_term: u64, prev_log_index: u64, @@ -304,7 +304,7 @@ async fn send_append_entry_request( } #[allow(dead_code)] -pub fn initialize_transaction_execution_queue(consensus: Arc) { +pub fn initialize_transaction_execution_queue(consensus: Arc) { // XXX FIXME: deal with the scenario where a transactionHash arrives after the block; // in this case, before saving the block LogEntry, it should ALWAYS wait for all transaction hashes @@ -329,7 +329,7 @@ pub fn initialize_transaction_execution_queue(consensus: Arc) { //TODO this broadcast needs to wait for majority of followers to confirm the log before sending the next one #[allow(dead_code)] pub fn initialize_append_entries_channel( - consensus: Arc, + consensus: Arc, mut rx_pending_txs: broadcast::Receiver, mut rx_blocks: broadcast::Receiver, ) { @@ -341,7 +341,7 @@ pub fn initialize_append_entries_channel( return; }, Ok(tx) = rx_pending_txs.recv() => { - if Consensus::is_leader() { + if Raft::is_leader() { tracing::info!(tx_hash = %tx.hash(), "received transaction execution to send to followers"); if tx.is_local() { tracing::debug!(tx_hash = %tx.hash(), "skipping local transaction because only external transactions are supported for now"); diff --git a/src/eth/consensus/server.rs b/src/eth/consensus/raft/server.rs similarity index 97% rename from src/eth/consensus/server.rs rename to src/eth/consensus/raft/server.rs index 5bdbe811b..22e1be1e9 100644 --- a/src/eth/consensus/server.rs +++ b/src/eth/consensus/raft/server.rs @@ -17,15 +17,15 @@ use super::append_entry::AppendTransactionExecutionsResponse; use super::append_entry::RequestVoteRequest; use super::append_entry::RequestVoteResponse; use super::append_entry::StatusCode; -use crate::eth::consensus::append_entry::append_entry_service_server::AppendEntryServiceServer; -use crate::eth::consensus::AppendEntryService; -use crate::eth::consensus::LogEntryData; -use crate::eth::consensus::PeerAddress; -use crate::eth::consensus::Role; +use super::Raft; +use crate::eth::consensus::raft::append_entry::append_entry_service_server::AppendEntryServiceServer; +use crate::eth::consensus::raft::AppendEntryService; +use crate::eth::consensus::raft::LogEntryData; +use 
crate::eth::consensus::raft::PeerAddress; +use crate::eth::consensus::raft::Role; use crate::eth::miner::block_from_propagation; use crate::eth::primitives::LocalTransactionExecution; use crate::eth::primitives::TransactionExecution; -use crate::eth::Consensus; use crate::ext::spawn_named; #[cfg(feature = "metrics")] use crate::infra::metrics; @@ -39,7 +39,7 @@ mod label { } #[allow(dead_code)] -pub fn initialize_server(consensus: Arc) { +pub fn initialize_server(consensus: Arc) { const TASK_NAME: &str = "consensus::server"; spawn_named(TASK_NAME, async move { tracing::info!("Starting append entry service at address: {}", consensus.grpc_address); @@ -64,7 +64,7 @@ pub fn initialize_server(consensus: Arc) { } pub struct AppendEntryServiceImpl { - pub consensus: Mutex>, + pub consensus: Mutex>, } #[tonic::async_trait] @@ -87,7 +87,7 @@ impl AppendEntryService for AppendEntryServiceImpl { let current_term = consensus.current_term.load(Ordering::SeqCst); let request_inner = request.into_inner(); - if Consensus::is_leader() { + if Raft::is_leader() { tracing::error!(sender = request_inner.leader_id, "append_transaction_executions called on leader node"); return Err(Status::new( (StatusCode::NotLeader as i32).into(), @@ -252,7 +252,7 @@ impl AppendEntryService for AppendEntryServiceImpl { let current_term = consensus.current_term.load(Ordering::SeqCst); let request_inner = request.into_inner(); - if Consensus::is_leader() { + if Raft::is_leader() { tracing::error!(sender = request_inner.leader_id, "append_block_commit called on leader node"); return Err(Status::new( (StatusCode::NotLeader as i32).into(), @@ -455,7 +455,7 @@ impl AppendEntryService for AppendEntryServiceImpl { if request.last_log_index >= candidate_last_log_index { consensus.current_term.store(request.term, Ordering::SeqCst); - Consensus::set_role(Role::Follower); + Raft::set_role(Role::Follower); consensus.reset_heartbeat_signal.notify_waiters(); // reset the heartbeat signal to avoid election timeout just after voting *consensus.voted_for.lock().await = Some(candidate_address.clone()); @@ -485,8 +485,8 @@ impl AppendEntryService for AppendEntryServiceImpl { #[cfg(test)] mod tests { use super::*; - use crate::eth::consensus::append_entry::BlockEntry; - use crate::eth::consensus::tests::factories::*; + use crate::eth::consensus::raft::append_entry::BlockEntry; + use crate::eth::consensus::raft::tests::factories::*; #[tokio::test] async fn test_append_transaction_executions_insert() { @@ -495,7 +495,7 @@ mod tests { consensus: Mutex::new(Arc::clone(&consensus)), }; - Consensus::set_role(Role::Follower); + Raft::set_role(Role::Follower); let executions = vec![create_mock_transaction_execution_entry(None)]; @@ -530,7 +530,7 @@ mod tests { }; // Simulate the node as not a leader - Consensus::set_role(Role::Follower); + Raft::set_role(Role::Follower); let request = Request::new(AppendTransactionExecutionsRequest { term: 1, @@ -557,7 +557,7 @@ mod tests { }; // Simulate the node as a leader - Consensus::set_role(Role::Leader); + Raft::set_role(Role::Leader); let request = Request::new(AppendTransactionExecutionsRequest { term: 1, @@ -614,7 +614,7 @@ mod tests { }; // Simulate the node as a leader - Consensus::set_role(Role::Leader); + Raft::set_role(Role::Leader); let request = Request::new(AppendBlockCommitRequest { term: 1, diff --git a/src/eth/consensus/tests/factories.rs b/src/eth/consensus/raft/tests/factories.rs similarity index 89% rename from src/eth/consensus/tests/factories.rs rename to 
src/eth/consensus/raft/tests/factories.rs index 384f55346..ae3b16ce5 100644 --- a/src/eth/consensus/tests/factories.rs +++ b/src/eth/consensus/raft/tests/factories.rs @@ -10,18 +10,17 @@ use ethereum_types::H256; use rand::Rng; use tokio::sync::Mutex; -use crate::eth::consensus::append_entry::AppendBlockCommitResponse; -use crate::eth::consensus::append_entry::AppendTransactionExecutionsResponse; -use crate::eth::consensus::append_entry::BlockEntry; -use crate::eth::consensus::append_entry::Log; -use crate::eth::consensus::append_entry::RequestVoteResponse; -use crate::eth::consensus::append_entry::TransactionExecutionEntry; -use crate::eth::consensus::log_entry::LogEntry; -use crate::eth::consensus::Consensus; -use crate::eth::consensus::LogEntryData; -use crate::eth::consensus::Peer; -use crate::eth::consensus::PeerAddress; -use crate::eth::consensus::Role; +use crate::eth::consensus::raft::append_entry::AppendBlockCommitResponse; +use crate::eth::consensus::raft::append_entry::AppendTransactionExecutionsResponse; +use crate::eth::consensus::raft::append_entry::BlockEntry; +use crate::eth::consensus::raft::append_entry::Log; +use crate::eth::consensus::raft::append_entry::RequestVoteResponse; +use crate::eth::consensus::raft::append_entry::TransactionExecutionEntry; +use crate::eth::consensus::raft::log_entry::LogEntry; +use crate::eth::consensus::raft::LogEntryData; +use crate::eth::consensus::raft::Peer; +use crate::eth::consensus::raft::PeerAddress; +use crate::eth::consensus::raft::Role; use crate::eth::storage::StratusStorage; static GLOBAL_COUNTER: AtomicUsize = AtomicUsize::new(0); @@ -107,7 +106,7 @@ pub fn create_mock_log_entry(index: u64, term: u64, data: LogEntryData) -> LogEn LogEntry { index, term, data } } -pub fn create_mock_consensus() -> Arc { +pub fn create_mock_consensus() -> Arc { let (storage, _tmpdir) = StratusStorage::mock_new_rocksdb(); storage.set_pending_block_number_as_next_if_not_set().unwrap(); let (_log_entries_storage, tmpdir_log_entries) = StratusStorage::mock_new_rocksdb(); @@ -121,7 +120,7 @@ pub fn create_mock_consensus() -> Arc { let miner = Miner::new(Arc::clone(&storage), crate::eth::miner::MinerMode::External); - Consensus::new( + Raft::new( Arc::clone(&storage), miner.into(), tmpdir_log_entries_path, @@ -139,6 +138,7 @@ use super::append_entry::AppendTransactionExecutionsRequest; use super::append_entry::StatusCode; use super::Hash; use super::Miner; +use super::Raft; // Define a simple interceptor that does nothing #[allow(dead_code)] // HACK to avoid unused code warning @@ -149,7 +149,7 @@ impl Interceptor for MockInterceptor { } } -fn create_mock_leader_peer(consensus: Arc) -> (PeerAddress, Peer) { +fn create_mock_leader_peer(consensus: Arc) -> (PeerAddress, Peer) { let leader_address = PeerAddress::from_string("http://127.0.0.1:3000;3777".to_string()).unwrap(); let client = MockAppendEntryServiceClient::new(); let leader_peer = Peer { @@ -162,9 +162,9 @@ fn create_mock_leader_peer(consensus: Arc) -> (PeerAddress, Peer) { (leader_address, leader_peer) } -pub async fn create_follower_consensus_with_leader(term: Option) -> Arc { +pub async fn create_follower_consensus_with_leader(term: Option) -> Arc { let consensus = create_mock_consensus(); - Consensus::set_role(Role::Follower); + Raft::set_role(Role::Follower); if let Some(term) = term { consensus.current_term.store(term, Ordering::SeqCst); @@ -179,9 +179,9 @@ pub async fn create_follower_consensus_with_leader(term: Option) -> Arc Arc { +pub fn create_leader_consensus() -> Arc { let consensus = 
create_mock_consensus();
-    Consensus::set_role(Role::Leader);
+    Raft::set_role(Role::Leader);
     zero_global_counter();
     consensus
 }
diff --git a/src/eth/consensus/tests/test_simple_blocks.rs b/src/eth/consensus/raft/tests/test_simple_blocks.rs
similarity index 95%
rename from src/eth/consensus/tests/test_simple_blocks.rs
rename to src/eth/consensus/raft/tests/test_simple_blocks.rs
index e401a3cae..07d19840f 100644
--- a/src/eth/consensus/tests/test_simple_blocks.rs
+++ b/src/eth/consensus/raft/tests/test_simple_blocks.rs
@@ -9,13 +9,13 @@ use tonic::Request;
 use super::factories::create_follower_consensus_with_leader;
 use super::factories::create_mock_block_entry;
 use super::factories::create_mock_transaction_execution_entry;
-use crate::eth::consensus::append_entry::append_entry_service_server::AppendEntryService;
-use crate::eth::consensus::append_entry::AppendBlockCommitRequest;
-use crate::eth::consensus::append_entry::AppendTransactionExecutionsRequest;
-use crate::eth::consensus::append_entry::StatusCode;
-use crate::eth::consensus::server::AppendEntryServiceImpl;
-use crate::eth::consensus::Role;
-use crate::eth::consensus::TransactionExecutionEntry;
+use crate::eth::consensus::raft::append_entry::append_entry_service_server::AppendEntryService;
+use crate::eth::consensus::raft::append_entry::AppendBlockCommitRequest;
+use crate::eth::consensus::raft::append_entry::AppendTransactionExecutionsRequest;
+use crate::eth::consensus::raft::append_entry::StatusCode;
+use crate::eth::consensus::raft::server::AppendEntryServiceImpl;
+use crate::eth::consensus::raft::Role;
+use crate::eth::consensus::raft::TransactionExecutionEntry;
 use crate::eth::primitives::Address;
 use crate::eth::primitives::BlockFilter;
 use crate::eth::primitives::BlockNumber;
diff --git a/src/eth/consensus/utils.rs b/src/eth/consensus/raft/utils.rs
similarity index 100%
rename from src/eth/consensus/utils.rs
rename to src/eth/consensus/raft/utils.rs
diff --git a/src/eth/consensus/simple_consensus/mod.rs b/src/eth/consensus/simple_consensus/mod.rs
new file mode 100644
index 000000000..5ec4d1eb6
--- /dev/null
+++ b/src/eth/consensus/simple_consensus/mod.rs
@@ -0,0 +1,75 @@
+use std::sync::Arc;
+
+use anyhow::anyhow;
+use async_trait::async_trait;
+
+use super::Consensus;
+use crate::eth::primitives::Bytes;
+use crate::eth::primitives::Hash;
+use crate::eth::storage::StratusStorage;
+use crate::infra::metrics;
+use crate::infra::BlockchainClient;
+use crate::log_and_err;
+
+pub struct SimpleConsensus {
+    storage: Arc<StratusStorage>,
+    // if blockchain_client.is_some() then this is a replica, else this is the validator
+    blockchain_client: Option<Arc<BlockchainClient>>,
+}
+
+impl SimpleConsensus {
+    pub fn new(storage: Arc<StratusStorage>, blockchain_client: Option<Arc<BlockchainClient>>) -> Self {
+        SimpleConsensus { storage, blockchain_client }
+    }
+}
+
+#[async_trait]
+impl Consensus for SimpleConsensus {
+    fn should_forward(&self) -> bool {
+        self.blockchain_client.is_some()
+    }
+
+    async fn should_serve(&self) -> bool {
+        let Some(blockchain_client) = &self.blockchain_client else {
+            return true;
+        };
+
+        // gather the latest block number, check how far behind it is from the current storage block;
+        // if it's greater than 3 blocks of distance, requests should not be served
+        let Ok(validator_block_number) = blockchain_client.fetch_block_number().await else {
+            tracing::error!("unable to fetch latest block number");
+            return false;
+        };
+
+        let Ok(current_block_number) = self.storage.read_mined_block_number() else {
+            tracing::error!("unable to fetch current block number");
+            return false;
+        };
+        let should_serve = (validator_block_number.as_u64() - 3) <= current_block_number.as_u64();
+
+        if !should_serve {
+            tracing::info!(?validator_block_number, ?current_block_number, "validator and replica are too far apart");
+        }
+
+        return should_serve;
+    }
+
+    async fn forward(&self, transaction: Bytes) -> anyhow::Result<Hash> {
+        #[cfg(feature = "metrics")]
+        let start = metrics::now();
+
+        let Some(blockchain_client) = &self.blockchain_client else {
+            return log_and_err!("SimpleConsensus.forward was called but no blockchain client is set");
+        };
+
+        let result = blockchain_client.send_raw_transaction(transaction.into()).await?;
+
+        #[cfg(feature = "metrics")]
+        metrics::inc_consensus_forward(start.elapsed());
+
+        let tx_hash = result.tx_hash;
+        let validator_url = &blockchain_client.http_url;
+        tracing::info!(%tx_hash, ?validator_url, "forwarded eth_sendRawTransaction to leader");
+
+        Ok(result.tx_hash)
+    }
+}
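`SimpleConsensus` encodes the validator/replica split purely through the presence of a `BlockchainClient`. A hypothetical sketch of how a caller might drive the new `Consensus` trait, assuming only `async-trait` and `anyhow` (the `Bytes`/`Hash` aliases and the handler name are stand-ins, not the crate's real types):

```rust
use async_trait::async_trait;

// Stand-ins for the crate's `Bytes` and `Hash` primitives (hypothetical).
type Bytes = Vec<u8>;
type Hash = [u8; 32];

// Mirror of the trait introduced by this PR: same three methods.
#[async_trait]
trait Consensus: Send + Sync {
    async fn should_serve(&self) -> bool;
    fn should_forward(&self) -> bool;
    async fn forward(&self, transaction: Bytes) -> anyhow::Result<Hash>;
}

struct Validator;

#[async_trait]
impl Consensus for Validator {
    async fn should_serve(&self) -> bool {
        true // a validator always serves
    }
    fn should_forward(&self) -> bool {
        false // a validator executes transactions itself
    }
    async fn forward(&self, _transaction: Bytes) -> anyhow::Result<Hash> {
        anyhow::bail!("validator does not forward")
    }
}

// How an RPC handler might branch on the trait object: replicas forward raw
// transactions to the validator; the validator executes them locally.
async fn handle_send_raw_transaction(consensus: &dyn Consensus, tx: Bytes) -> anyhow::Result<Hash> {
    if consensus.should_forward() {
        return consensus.forward(tx).await;
    }
    Ok([0u8; 32]) // local execution elided in this sketch
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let hash = handle_send_raw_transaction(&Validator, vec![0xde, 0xad]).await?;
    println!("tx hash: {hash:x?}");
    Ok(())
}
```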
+        let should_serve = validator_block_number.as_u64().saturating_sub(3) <= current_block_number.as_u64(); // saturating_sub avoids an underflow panic while the validator is still below block 3
+
+        if !should_serve {
+            tracing::info!(?validator_block_number, ?current_block_number, "validator and replica are too far apart");
+        }
+
+        return should_serve;
+    }
+
+    async fn forward(&self, transaction: Bytes) -> anyhow::Result<Hash> {
+        #[cfg(feature = "metrics")]
+        let start = metrics::now();
+
+        let Some(blockchain_client) = &self.blockchain_client else {
+            return log_and_err!("SimpleConsensus.forward was called but no blockchain client is set");
+        };
+
+        let result = blockchain_client.send_raw_transaction(transaction.into()).await?;
+
+        #[cfg(feature = "metrics")]
+        metrics::inc_consensus_forward(start.elapsed());
+
+        let tx_hash = result.tx_hash;
+        let validator_url = &blockchain_client.http_url;
+        tracing::info!(%tx_hash, ?validator_url, "forwarded eth_sendRawTransaction to leader");
+
+        Ok(result.tx_hash)
+    }
+}
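The replica/validator split above hinges entirely on whether a BlockchainClient is present. A minimal wiring sketch (the helper below is hypothetical and not part of this patch; it assumes the same storage and optional upstream handles that main.rs builds further down):

use std::sync::Arc;
use stratus::eth::consensus::simple_consensus::SimpleConsensus;
use stratus::eth::consensus::Consensus;
use stratus::eth::storage::StratusStorage;
use stratus::infra::BlockchainClient;

// Hypothetical helper: Some(upstream) yields a forwarding replica,
// None yields the serving validator.
fn build_consensus(storage: Arc<StratusStorage>, upstream: Option<Arc<BlockchainClient>>) -> Arc<dyn Consensus> {
    Arc::new(SimpleConsensus::new(storage, upstream))
}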
diff --git a/src/eth/importer/importer.rs b/src/eth/importer/importer.rs
new file mode 100644
index 000000000..30176b431
--- /dev/null
+++ b/src/eth/importer/importer.rs
@@ -0,0 +1,437 @@
+use std::cmp::min;
+use std::sync::atomic::AtomicU64;
+use std::sync::atomic::Ordering;
+use std::sync::Arc;
+use std::time::Duration;
+
+use futures::try_join;
+use futures::StreamExt;
+use serde::Deserialize;
+use tokio::sync::mpsc;
+use tokio::task::yield_now;
+use tokio::time::timeout;
+use tracing::Span;
+
+use crate::eth::executor::Executor;
+use crate::eth::miner::Miner;
+use crate::eth::primitives::BlockNumber;
+use crate::eth::primitives::ExternalBlock;
+use crate::eth::primitives::ExternalReceipt;
+use crate::eth::primitives::ExternalReceipts;
+use crate::eth::primitives::Hash;
+use crate::eth::storage::StratusStorage;
+use crate::ext::spawn_named;
+use crate::ext::traced_sleep;
+use crate::ext::DisplayExt;
+use crate::ext::SleepReason;
+use crate::if_else;
+#[cfg(feature = "metrics")]
+use crate::infra::metrics;
+use crate::infra::tracing::warn_task_rx_closed;
+use crate::infra::tracing::warn_task_tx_closed;
+use crate::infra::tracing::SpanExt;
+use crate::infra::BlockchainClient;
+use crate::log_and_err;
+#[cfg(feature = "metrics")]
+use crate::utils::calculate_tps;
+use crate::utils::DropTimer;
+use crate::GlobalState;
+
+// -----------------------------------------------------------------------------
+// Globals
+// -----------------------------------------------------------------------------
+
+/// Current block number of the external RPC blockchain.
+static EXTERNAL_RPC_CURRENT_BLOCK: AtomicU64 = AtomicU64::new(0);
+
+/// Only sets the external RPC current block number if it is equal to or greater than the current one.
+fn set_external_rpc_current_block(new_number: BlockNumber) {
+    let new_number_u64 = new_number.as_u64();
+    let _ = EXTERNAL_RPC_CURRENT_BLOCK.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current_number| {
+        if_else!(new_number_u64 >= current_number, Some(new_number_u64), None)
+    });
+}
+
+// -----------------------------------------------------------------------------
+// Constants
+// -----------------------------------------------------------------------------
+/// Number of blocks that are downloaded in parallel.
+const PARALLEL_BLOCKS: usize = 3;
+
+/// Number of receipts that are downloaded in parallel.
+const PARALLEL_RECEIPTS: usize = 100;
+
+/// Timeout awaiting a newHeads event before falling back to polling.
+const TIMEOUT_NEW_HEADS: Duration = Duration::from_millis(2000);
+
+/// Interval before we start retrieving receipts, because they are not immediately available after the block is retrieved.
+const INTERVAL_FETCH_RECEIPTS: Duration = Duration::from_millis(50);
+
+pub struct Importer {
+    executor: Arc<Executor>,
+
+    miner: Arc<Miner>,
+
+    storage: Arc<StratusStorage>,
+
+    chain: Arc<BlockchainClient>,
+
+    sync_interval: Duration,
+}
+
+impl Importer {
+    /// Creates a new [`Importer`].
+    pub fn new(executor: Arc<Executor>, miner: Arc<Miner>, storage: Arc<StratusStorage>, chain: Arc<BlockchainClient>, sync_interval: Duration) -> Self {
+        tracing::info!("creating importer");
+        Self {
+            executor,
+            miner,
+            storage,
+            chain,
+            sync_interval,
+        }
+    }
+
+    // -----------------------------------------------------------------------------
+    // Execution
+    // -----------------------------------------------------------------------------
+
+    pub async fn run_importer_online(self: Arc<Self>) -> anyhow::Result<()> {
+        let _timer = DropTimer::start("importer-online::run_importer_online");
+
+        let storage = &self.storage;
+        let number = storage.read_block_number_to_resume_import()?;
+
+        let (backlog_tx, backlog_rx) = mpsc::unbounded_channel();
+
+        // spawn block executor:
+        // it executes and mines blocks and expects to receive them via channel in the correct order.
+        let task_executor = spawn_named(
+            "importer::executor",
+            Importer::start_block_executor(Arc::clone(&self.executor), Arc::clone(&self.miner), backlog_rx),
+        );
+
+        // spawn block number:
+        // it keeps track of the blockchain's current block number.
+        let number_fetcher_chain = Arc::clone(&self.chain);
+        let task_number_fetcher = spawn_named(
+            "importer::number-fetcher",
+            Importer::start_number_fetcher(number_fetcher_chain, self.sync_interval),
+        );
+
+        // spawn block fetcher:
+        // it fetches blocks and receipts in parallel and sends them to the executor in the correct order.
+        // it uses the number fetcher's current block to determine whether it should keep downloading more blocks or not.
+        let block_fetcher_chain = Arc::clone(&self.chain);
+        let task_block_fetcher = spawn_named(
+            "importer::block-fetcher",
+            Importer::start_block_fetcher(block_fetcher_chain, backlog_tx, number),
+        );
+
+        // await all tasks
+        if let Err(e) = try_join!(task_executor, task_block_fetcher, task_number_fetcher) {
+            tracing::error!(reason = ?e, "importer-online failed");
+        }
+        Ok(())
+    }
+
+    // -----------------------------------------------------------------------------
+    // Executor
+    // -----------------------------------------------------------------------------
+
+    // Executes external blocks and persists them to storage.
+    async fn start_block_executor(
+        executor: Arc<Executor>,
+        miner: Arc<Miner>,
+        mut backlog_rx: mpsc::UnboundedReceiver<(ExternalBlock, Vec<ExternalReceipt>)>,
+    ) -> anyhow::Result<()> {
+        const TASK_NAME: &str = "block-executor";
+
+        loop {
+            if GlobalState::is_shutdown_warn(TASK_NAME) {
+                return Ok(());
+            }
+
+            let (block, receipts) = match timeout(Duration::from_secs(2), backlog_rx.recv()).await {
+                Ok(Some(inner)) => inner,
+                Ok(None) => break, // channel closed
+                Err(_timed_out) => {
+                    tracing::warn!(timeout = "2s", "timeout reading block executor channel, expected around 1 block per second");
+                    continue;
+                }
+            };
+
+            #[cfg(feature = "metrics")]
+            let start = metrics::now();
+
+            // execute and mine
+            let receipts = ExternalReceipts::from(receipts);
+            if let Err(e) = executor.execute_external_block(&block, &receipts) {
+                let message = GlobalState::shutdown_from(TASK_NAME, "failed to reexecute external block");
+                return log_and_err!(reason = e, message);
+            };
+
+            // statistics
+            #[cfg(feature = "metrics")]
+            {
+                let duration = start.elapsed();
+                let tps = calculate_tps(duration, block.transactions.len());
+
+                tracing::info!(
+                    tps,
+                    duration = %duration.to_string_ext(),
+                    block_number = %block.number(),
+                    receipts = receipts.len(),
+                    "reexecuted external block",
+                );
+            }
+
+            if let Err(e) = miner.mine_external_mixed_and_commit() {
+                let message = GlobalState::shutdown_from(TASK_NAME, "failed to mine external block");
+                return log_and_err!(reason = e, message);
+            };
+
+            #[cfg(feature = "metrics")]
+            {
+                metrics::inc_n_importer_online_transactions_total(receipts.len() as u64);
+                metrics::inc_import_online_mined_block(start.elapsed());
+            }
+        }
+
+        warn_task_tx_closed(TASK_NAME);
+        Ok(())
+    }
+
+    // -----------------------------------------------------------------------------
+    // Number fetcher
+    // -----------------------------------------------------------------------------
+
+    /// Retrieves the blockchain's current block number.
+    async fn start_number_fetcher(chain: Arc<BlockchainClient>, sync_interval: Duration) -> anyhow::Result<()> {
+        const TASK_NAME: &str = "external-number-fetcher";
+
+        // initial newHeads subscription.
+        // abort the application if we cannot subscribe.
+        let mut sub_new_heads = if chain.supports_ws() {
+            tracing::info!("{} subscribing to newHeads event", TASK_NAME);
+
+            match chain.subscribe_new_heads().await {
+                Ok(sub) => {
+                    tracing::info!("{} subscribed to newHeads events", TASK_NAME);
+                    Some(sub)
+                }
+                Err(e) => {
+                    let message = GlobalState::shutdown_from(TASK_NAME, "cannot subscribe to newHeads event");
+                    return log_and_err!(reason = e, message);
+                }
+            }
+        } else {
+            tracing::warn!("{} blockchain client does not have websocket enabled", TASK_NAME);
+            None
+        };
+
+        // keep reading websocket subscription or polling via http.
+        loop {
+            if GlobalState::is_shutdown_warn(TASK_NAME) {
+                return Ok(());
+            }
+
+            // if we have a subscription, try to read from it.
+            // in case of failure, re-subscribe because the current subscription may have been closed on the server.
+            if let Some(sub) = &mut sub_new_heads {
+                tracing::info!("{} awaiting block number from newHeads subscription", TASK_NAME);
+                let resubscribe_ws = match timeout(TIMEOUT_NEW_HEADS, sub.next()).await {
+                    Ok(Some(Ok(block))) => {
+                        tracing::info!(block_number = %block.number(), "{} received newHeads event", TASK_NAME);
+                        set_external_rpc_current_block(block.number());
+                        continue;
+                    }
+                    Ok(None) => {
+                        if !GlobalState::is_shutdown_warn(TASK_NAME) {
+                            tracing::error!("{} newHeads subscription closed by the other side", TASK_NAME);
+                        }
+                        true
+                    }
+                    Ok(Some(Err(e))) => {
+                        if !GlobalState::is_shutdown_warn(TASK_NAME) {
+                            tracing::error!(reason = ?e, "{} failed to read newHeads subscription event", TASK_NAME);
+                        }
+                        true
+                    }
+                    Err(_) => {
+                        if !GlobalState::is_shutdown_warn(TASK_NAME) {
+                            tracing::error!("{} timed-out waiting for newHeads subscription event", TASK_NAME);
+                        }
+                        true
+                    }
+                };
+
+                if GlobalState::is_shutdown_warn(TASK_NAME) {
+                    return Ok(());
+                }
+
+                // resubscribe if necessary.
+                // only update the existing subscription if it succeeded, otherwise we will try again in the next iteration.
+                if chain.supports_ws() && resubscribe_ws {
+                    tracing::info!("{} resubscribing to newHeads event", TASK_NAME);
+                    match chain.subscribe_new_heads().await {
+                        Ok(sub) => {
+                            tracing::info!("{} resubscribed to newHeads event", TASK_NAME);
+                            sub_new_heads = Some(sub);
+                        }
+                        Err(e) =>
+                            if !GlobalState::is_shutdown_warn(TASK_NAME) {
+                                tracing::error!(reason = ?e, "{} failed to resubscribe to newHeads event", TASK_NAME);
+                            },
+                    }
+                }
+            }
+
+            if GlobalState::is_shutdown_warn(TASK_NAME) {
+                return Ok(());
+            }
+
+            // fallback to polling
+            tracing::warn!("{} falling back to http polling because subscription failed or it is not enabled", TASK_NAME);
+            match chain.fetch_block_number().await {
+                Ok(block_number) => {
+                    tracing::info!(
+                        %block_number,
+                        sync_interval = %sync_interval.to_string_ext(),
+                        "fetched current block number via http. awaiting sync interval to retrieve again."
+                    );
+                    set_external_rpc_current_block(block_number);
+                    traced_sleep(sync_interval, SleepReason::SyncData).await;
+                }
+                Err(e) =>
+                    if !GlobalState::is_shutdown_warn(TASK_NAME) {
+                        tracing::error!(reason = ?e, "failed to retrieve block number. retrying now.");
+                    },
+            }
+        }
+    }
+
+    // -----------------------------------------------------------------------------
+    // Block fetcher
+    // -----------------------------------------------------------------------------
+
+    /// Retrieves blocks and receipts.
+    async fn start_block_fetcher(
+        chain: Arc<BlockchainClient>,
+        backlog_tx: mpsc::UnboundedSender<(ExternalBlock, Vec<ExternalReceipt>)>,
+        mut importer_block_number: BlockNumber,
+    ) -> anyhow::Result<()> {
+        const TASK_NAME: &str = "external-block-fetcher";
+
+        loop {
+            if GlobalState::is_shutdown_warn(TASK_NAME) {
+                return Ok(());
+            }
+
+            // if we are ahead of the current block number, await until we are behind it again
+            let external_rpc_current_block = EXTERNAL_RPC_CURRENT_BLOCK.load(Ordering::Relaxed);
+            if importer_block_number.as_u64() > external_rpc_current_block {
+                yield_now().await;
+                continue;
+            }
+
+            // we are behind the current block, so we will fetch multiple blocks in parallel to catch up
+            let blocks_behind = external_rpc_current_block.saturating_sub(importer_block_number.as_u64()) + 1; // TODO: use count_to from BlockNumber
+            let mut blocks_to_fetch = min(blocks_behind, 1_000); // cap the number of queued fetches (concurrency is bounded separately below), at least until we know it is safe
+            tracing::info!(%blocks_behind, blocks_to_fetch, "catching up with blocks");
+
+            let mut tasks = Vec::with_capacity(blocks_to_fetch as usize);
+            while blocks_to_fetch > 0 {
+                blocks_to_fetch -= 1;
+                tasks.push(fetch_block_and_receipts(Arc::clone(&chain), importer_block_number));
+                importer_block_number = importer_block_number.next();
+            }
+
+            // keep fetching in order
+            let mut tasks = futures::stream::iter(tasks).buffered(PARALLEL_BLOCKS);
+            while let Some((block, receipts)) = tasks.next().await {
+                if backlog_tx.send((block, receipts)).is_err() {
+                    warn_task_rx_closed(TASK_NAME);
+                    return Ok(());
+                }
+            }
+        }
+    }
+}
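The "keep fetching in order" step above leans on a property of futures::StreamExt: buffered polls up to N futures concurrently but yields their results in input order, so blocks reach the executor sequentially, while buffer_unordered (used for receipts just below) yields in completion order. A standalone sketch of the difference, using toy numbered futures rather than anything from this patch:

use futures::StreamExt;

#[tokio::main]
async fn main() {
    // Five toy "fetches": `buffered(3)` drives up to three of them at a time
    // but still yields the results as 1, 2, 3, 4, 5.
    let tasks = (1u64..=5).map(|n| async move { n });
    let in_order: Vec<u64> = futures::stream::iter(tasks).buffered(3).collect().await;
    assert_eq!(in_order, vec![1, 2, 3, 4, 5]);
}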
retrying with delay."); + traced_sleep(RETRY_DELAY, SleepReason::SyncData).await; + continue; + } + + return ExternalBlock::deserialize(&block).expect("cannot fail to deserialize external block"); + } +} + +#[tracing::instrument(name = "importer::fetch_receipt", skip_all, fields(block_number, tx_hash))] +async fn fetch_receipt(chain: Arc, block_number: BlockNumber, tx_hash: Hash) -> ExternalReceipt { + Span::with(|s| { + s.rec_str("block_number", &block_number); + s.rec_str("tx_hash", &tx_hash); + }); + + loop { + tracing::info!(%block_number, %tx_hash, "fetching receipt"); + + match chain.fetch_receipt(tx_hash).await { + Ok(Some(receipt)) => return receipt, + Ok(None) => { + tracing::warn!(%block_number, %tx_hash, "receipt not available yet because block is not mined. retrying now."); + continue; + } + Err(e) => { + tracing::error!(reason = ?e, %block_number, %tx_hash, "failed to fetch receipt. retrying now."); + } + } + } +} diff --git a/src/eth/importer/importer_config.rs b/src/eth/importer/importer_config.rs new file mode 100644 index 000000000..b2e604e76 --- /dev/null +++ b/src/eth/importer/importer_config.rs @@ -0,0 +1,67 @@ +use std::sync::Arc; +use std::time::Duration; + +use clap::Parser; +use display_json::DebugAsJson; + +use crate::eth::executor::Executor; +use crate::eth::importer::Importer; +use crate::eth::miner::Miner; +use crate::eth::storage::StratusStorage; +use crate::ext::parse_duration; +use crate::ext::spawn_named; +use crate::infra::BlockchainClient; + +#[derive(Default, Parser, DebugAsJson, Clone, serde::Serialize)] +#[group(requires_all = ["external_rpc"])] +pub struct ImporterConfig { + /// External RPC HTTP endpoint to sync blocks with Stratus. + #[arg( + short = 'r', + long = "external-rpc", + env = "EXTERNAL_RPC", + conflicts_with("leader"), + requires_if("follower", "true") + )] + pub external_rpc: String, + + /// External RPC WS endpoint to sync blocks with Stratus. 
+    #[arg(short = 'w', long = "external-rpc-ws", env = "EXTERNAL_RPC_WS", conflicts_with("leader"))]
+    pub external_rpc_ws: Option<String>,
+
+    /// Timeout for blockchain requests (importer online).
+    #[arg(long = "external-rpc-timeout", value_parser=parse_duration, env = "EXTERNAL_RPC_TIMEOUT", default_value = "2s")]
+    pub external_rpc_timeout: Duration,
+
+    /// Interval for polling the external RPC current block number when the newHeads subscription is unavailable.
+    #[arg(long = "sync-interval", value_parser=parse_duration, env = "SYNC_INTERVAL", default_value = "100ms")]
+    pub sync_interval: Duration,
+}
+
+impl ImporterConfig {
+    pub fn init(
+        &self,
+        executor: Arc<Executor>,
+        miner: Arc<Miner>,
+        storage: Arc<StratusStorage>,
+        chain: Arc<BlockchainClient>,
+    ) -> anyhow::Result<Arc<Importer>> {
+        const TASK_NAME: &str = "importer::init";
+        tracing::info!(config = ?self, "creating importer");
+
+        let config = self.clone();
+
+        let importer = Importer::new(executor, miner, Arc::clone(&storage), chain, config.sync_interval);
+        let importer = Arc::new(importer);
+
+        spawn_named(TASK_NAME, {
+            let importer = Arc::clone(&importer);
+            async move {
+                if let Err(e) = importer.run_importer_online().await {
+                    tracing::error!(reason = ?e, "importer-online failed");
+                }
+            }
+        });
+
+        Ok(importer)
+    }
+}
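For illustration, here is a sketch of how this config is consumed once parsed (the field values are hypothetical; in practice clap fills them from the --external-rpc/EXTERNAL_RPC flags and environment variables declared above):

use std::sync::Arc;
use std::time::Duration;

// Hypothetical wiring: `executor`, `miner`, `storage` and `chain` are the same
// handles that main.rs builds before calling init.
fn spawn_importer(
    executor: Arc<Executor>,
    miner: Arc<Miner>,
    storage: Arc<StratusStorage>,
    chain: Arc<BlockchainClient>,
) -> anyhow::Result<Arc<Importer>> {
    let config = ImporterConfig {
        external_rpc: "http://validator:3000".to_string(),        // hypothetical URL
        external_rpc_ws: Some("ws://validator:3001".to_string()), // hypothetical URL
        external_rpc_timeout: Duration::from_secs(2),
        sync_interval: Duration::from_millis(100),
    };
    config.init(executor, miner, storage, chain) // spawns the importer task and returns the handle
}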
diff --git a/src/eth/importer/mod.rs b/src/eth/importer/mod.rs
new file mode 100644
index 000000000..943c920b2
--- /dev/null
+++ b/src/eth/importer/mod.rs
@@ -0,0 +1,6 @@
+#[allow(clippy::module_inception)]
+mod importer;
+mod importer_config;
+
+pub use importer::Importer;
+pub use importer_config::ImporterConfig;
diff --git a/src/eth/miner/miner.rs b/src/eth/miner/miner.rs
index 318721f4c..0f8f956ce 100644
--- a/src/eth/miner/miner.rs
+++ b/src/eth/miner/miner.rs
@@ -8,7 +8,7 @@ use nonempty::NonEmpty;
 use tokio::sync::broadcast;
 use tracing::Span;
 
-use crate::eth::consensus::append_entry;
+use crate::eth::consensus::raft::append_entry;
 use crate::eth::miner::MinerMode;
 use crate::eth::primitives::Block;
 use crate::eth::primitives::BlockHeader;
@@ -477,11 +477,6 @@ mod interval_miner {
                 continue;
             }
 
-            if not(GlobalState::is_leader()) {
-                tracing::warn!("skipping mining block because node is not a leader");
-                continue;
-            }
-
             // mine
             tracing::info!(lag_us = %tick.elapsed().as_micros(), "interval mining block");
             mine_and_commit(&miner);
diff --git a/src/eth/mod.rs b/src/eth/mod.rs
index 58972b3e6..05d00acc3 100644
--- a/src/eth/mod.rs
+++ b/src/eth/mod.rs
@@ -1,6 +1,7 @@
 pub mod codegen;
 pub mod consensus;
 pub mod executor;
+pub mod importer;
 pub mod miner;
 pub mod primitives;
 pub mod rpc;
diff --git a/src/eth/primitives/block_header.rs b/src/eth/primitives/block_header.rs
index ded3cb577..f58557d58 100644
--- a/src/eth/primitives/block_header.rs
+++ b/src/eth/primitives/block_header.rs
@@ -13,7 +13,7 @@ use jsonrpsee::SubscriptionMessage;
 
 use crate::alias::EthersBlockVoid;
 use crate::alias::EthersBytes;
-use crate::eth::consensus::append_entry;
+use crate::eth::consensus::raft::append_entry;
 use crate::eth::primitives::logs_bloom::LogsBloom;
 use crate::eth::primitives::Address;
 use crate::eth::primitives::BlockNumber;
diff --git a/src/eth/primitives/transaction_execution.rs b/src/eth/primitives/transaction_execution.rs
index 74a9f4d7e..c326748e6 100644
--- a/src/eth/primitives/transaction_execution.rs
+++ b/src/eth/primitives/transaction_execution.rs
@@ -7,9 +7,9 @@ use ethereum_types::H256;
 use ethereum_types::U64;
 
 use super::Gas;
-use crate::eth::consensus::append_entry;
-use crate::eth::consensus::utils::bytes_to_u256;
-use crate::eth::consensus::utils::u256_to_bytes;
+use crate::eth::consensus::raft::append_entry;
+use crate::eth::consensus::raft::utils::bytes_to_u256;
+use crate::eth::consensus::raft::utils::u256_to_bytes;
 use crate::eth::executor::EvmExecutionResult;
 use crate::eth::primitives::Address;
 use crate::eth::primitives::Bytes;
diff --git a/src/eth/rpc/rpc_context.rs b/src/eth/rpc/rpc_context.rs
index 90bfec7bf..673bbccc6 100644
--- a/src/eth/rpc/rpc_context.rs
+++ b/src/eth/rpc/rpc_context.rs
@@ -26,7 +26,7 @@ pub struct RpcContext {
     #[allow(dead_code)] // HACK this was triggered in Rust 1.79
     pub miner: Arc<Miner>,
     pub storage: Arc<StratusStorage>,
-    pub consensus: Arc<Consensus>,
+    pub consensus: Arc<dyn Consensus>,
     pub rpc_server: RpcServerConfig,
     pub subs: Arc,
 }
diff --git a/src/eth/rpc/rpc_server.rs b/src/eth/rpc/rpc_server.rs
index ea7cc2656..e6af9a5d7 100644
--- a/src/eth/rpc/rpc_server.rs
+++ b/src/eth/rpc/rpc_server.rs
@@ -69,7 +69,7 @@ pub async fn serve_rpc(
     storage: Arc<StratusStorage>,
     executor: Arc<Executor>,
     miner: Arc<Miner>,
-    consensus: Arc<Consensus>,
+    consensus: Arc<dyn Consensus>,
 
     // config
     app_config: impl serde::Serialize,
@@ -300,8 +300,8 @@ fn stratus_disable_miner(_: Params<'_>, _: &RpcContext, _: &Extensions) -> bool
 // Stratus - State
 // -----------------------------------------------------------------------------
 
-fn stratus_version(_: Params<'_>, ctx: &RpcContext, _: &Extensions) -> Result<JsonValue, StratusError> {
-    Ok(build_info::as_json(ctx))
+fn stratus_version(_: Params<'_>, _: &RpcContext, _: &Extensions) -> Result<JsonValue, StratusError> {
+    Ok(build_info::as_json())
 }
 
 fn stratus_config(_: Params<'_>, ctx: &RpcContext, _: &Extensions) -> Result<JsonValue, StratusError> {
@@ -621,10 +621,7 @@ fn eth_send_raw_transaction(params: Params<'_>, ctx: Arc<RpcContext>, ext: Exten
     if ctx.consensus.should_forward() {
         tracing::info!(%tx_hash, "forwarding eth_sendRawTransaction to leader");
         return match Handle::current().block_on(ctx.consensus.forward(data)) {
-            Ok((hash, url)) => {
-                tracing::info!(%tx_hash, %url, "forwarded eth_sendRawTransaction to leader");
-                Ok(hex_data(hash))
-            }
+            Ok(hash) => Ok(hex_data(hash)),
             Err(e) => {
                 tracing::error!(reason = ?e, %tx_hash, "failed to forward eth_sendRawTransaction to leader");
                 Err(StratusError::TransactionForwardToLeaderFailed)
diff --git a/src/globals.rs b/src/globals.rs
index 6c0a85f67..bd09e317b 100644
--- a/src/globals.rs
+++ b/src/globals.rs
@@ -10,7 +10,6 @@ use tokio_util::sync::CancellationToken;
 
 use crate::config::load_dotenv;
 use crate::config::WithCommonConfig;
-use crate::eth::Consensus;
 use crate::ext::spawn_signal_handler;
 use crate::infra;
 use crate::infra::tracing::warn_task_cancellation;
@@ -145,15 +144,6 @@ impl GlobalState {
         warn_task_cancellation(task_name);
     }
 
-    // -------------------------------------------------------------------------
-    // Leadership
-    // -------------------------------------------------------------------------
-
-    /// Checks if node is leader.
-    pub fn is_leader() -> bool {
-        Consensus::is_leader()
-    }
-
     // -------------------------------------------------------------------------
     // Transaction
     // -------------------------------------------------------------------------
diff --git a/src/infra/blockchain_client/blockchain_client.rs b/src/infra/blockchain_client/blockchain_client.rs
index bf4fe93ee..b2ea0786c 100644
--- a/src/infra/blockchain_client/blockchain_client.rs
+++ b/src/infra/blockchain_client/blockchain_client.rs
@@ -26,6 +26,7 @@ use crate::ext::to_json_value;
 use crate::ext::DisplayExt;
 use crate::infra::tracing::TracingExt;
 use crate::log_and_err;
+use crate::GlobalState;
 
 #[derive(Debug)]
 pub struct BlockchainClient {
@@ -227,10 +228,15 @@ impl BlockchainClient {
     // -------------------------------------------------------------------------
 
     pub async fn subscribe_new_heads(&self) -> anyhow::Result<Subscription<ExternalBlock>> {
+        const TASK_NAME: &str = "blockchain::subscribe_new_heads";
         tracing::debug!("subscribing to newHeads event");
 
         let mut first_attempt = true;
         loop {
+            if GlobalState::is_shutdown_warn(TASK_NAME) {
+                return Err(anyhow::anyhow!("shutdown warning"));
+            };
+
             let ws_read = self.require_ws().await?;
             let result = ws_read
                 .subscribe::<ExternalBlock, Vec<JsonValue>>("eth_subscribe", vec![JsonValue::String("newHeads".to_owned())], "eth_unsubscribe")
diff --git a/src/infra/build_info.rs b/src/infra/build_info.rs
index 41272dffa..7b37b4cbf 100644
--- a/src/infra/build_info.rs
+++ b/src/infra/build_info.rs
@@ -1,8 +1,6 @@
 use serde_json::json;
 
 use crate::alias::JsonValue;
-use crate::eth::rpc::RpcContext;
-use crate::eth::Consensus;
 
 // -----------------------------------------------------------------------------
 // Build constants
@@ -59,20 +57,8 @@ pub fn version() -> &'static str {
     }
 }
 
-pub fn is_leader() -> bool {
-    Consensus::is_leader()
-}
-
-pub fn current_term(ctx: &RpcContext) -> u64 {
-    ctx.consensus.current_term()
-}
-
-pub fn last_index(ctx: &RpcContext) -> u64 {
-    ctx.consensus.last_index()
-}
-
 /// Returns build info as JSON.
-pub fn as_json(ctx: &RpcContext) -> JsonValue {
+pub fn as_json() -> JsonValue {
     json!(
         {
             "build": {
@@ -98,11 +84,6 @@ pub fn as_json(ctx: &RpcContext) -> JsonValue {
                 "version": RUST_VERSION,
                 "channel": RUST_CHANNEL,
                 "target": RUST_TARGET,
-            },
-            "consensus": {
-                "is_leader": is_leader(),
-                "current_term": current_term(ctx),
-                "last_index": last_index(ctx),
             }
         }
     )
diff --git a/src/main.rs b/src/main.rs
index 0085a719a..2e93f4d85 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,8 +1,10 @@
 use std::sync::Arc;
 
 use stratus::config::StratusConfig;
+use stratus::eth::consensus::simple_consensus::SimpleConsensus;
+use stratus::eth::consensus::Consensus;
 use stratus::eth::rpc::serve_rpc;
-use stratus::eth::Consensus;
+use stratus::infra::BlockchainClient;
 use stratus::GlobalServices;
 
 fn main() -> anyhow::Result<()> {
@@ -11,28 +13,58 @@ fn main() -> anyhow::Result<()> {
 }
 
 async fn run(config: StratusConfig) -> anyhow::Result<()> {
-    // init services
+    // Init services
     let storage = config.storage.init()?;
-    let miner = config.miner.init(Arc::clone(&storage))?;
+
+    // Init miner
+    let miner = if config.follower {
+        config.miner.init_external_mode(Arc::clone(&storage))?
+    } else {
+        config.miner.init(Arc::clone(&storage))?
+    };
+
+    // Init executor
     let executor = config.executor.init(Arc::clone(&storage), Arc::clone(&miner));
-    let consensus = Consensus::new(
-        Arc::clone(&storage),
-        Arc::clone(&miner),
-        config.storage.perm_storage.rocks_path_prefix.clone(),
-        config.clone().candidate_peers.clone(),
-        None,
-        config.rpc_server.address,
-        config.grpc_server_address,
-    ); // for now, we force None to initiate with the current node being the leader
-
-    // start rpc server
+
+    // Init chain
+    let chain = if config.follower {
+        let importer_config = config.importer.as_ref().ok_or(anyhow::anyhow!("importer config is not set"))?;
+        Some(Arc::new(
+            BlockchainClient::new_http_ws(
+                importer_config.external_rpc.as_ref(),
+                importer_config.external_rpc_ws.as_deref(),
+                importer_config.external_rpc_timeout,
+            )
+            .await?,
+        ))
+    } else {
+        None
+    };
+
+    // Init consensus
+    let consensus: Arc<dyn Consensus> = Arc::new(SimpleConsensus::new(Arc::clone(&storage), chain.clone()));
+
+    // Init importer
+    if config.follower {
+        if let Some(importer_config) = &config.importer {
+            if let Some(chain) = chain {
+                importer_config.init(Arc::clone(&executor), Arc::clone(&miner), Arc::clone(&storage), chain)?;
+            } else {
+                return Err(anyhow::anyhow!("chain is not initialized"));
+            }
+        } else {
+            return Err(anyhow::anyhow!("importer config is not set"));
+        }
+    }
+
+    // Init RPC server
     serve_rpc(
-        // services
+        // Services
         Arc::clone(&storage),
         executor,
         miner,
         consensus,
-        // config
+        // Config
         config.clone(),
         config.rpc_server,
         config.executor.executor_chain_id.into(),