From 614e382f1ecedf2c802f80423040491c498daa81 Mon Sep 17 00:00:00 2001 From: Maycon Amaro <131882788+mayconamaroCW@users.noreply.github.com> Date: Wed, 31 Jul 2024 17:45:26 -0300 Subject: [PATCH] Revert "feat: create importer online component (#1541)" (#1583) This reverts commit 64daad0ca0034513af11d6d9207dfe8fe779e1bf. --- .github/workflows/e2e-generic.yml | 95 +++ ...importer.yml => e2e-run-with-importer.yml} | 6 +- Cargo.toml | 8 + chaos/experiments/main.sh | 345 +++++++++ .../integration/test/helpers/rpc.ts | 2 +- .../integration/test/importer.test.ts | 54 +- .../integration/test/simple.test.ts | 114 +++ justfile | 68 +- src/bin/importer_online.rs | 420 +++++++++++ src/bin/run_with_importer.rs | 78 ++ src/config.rs | 132 +++- .../{raft => }/append_log_entries_storage.rs | 2 +- src/eth/consensus/{raft => }/discovery.rs | 6 +- src/eth/consensus/{raft => }/log_entry.rs | 0 src/eth/consensus/mod.rs | 669 ++++++++++++++++- src/eth/consensus/{raft => }/propagation.rs | 36 +- src/eth/consensus/raft/mod.rs | 683 ------------------ src/eth/consensus/{raft => }/server.rs | 34 +- src/eth/consensus/simple_consensus/mod.rs | 75 -- .../consensus/{raft => }/tests/factories.rs | 38 +- .../{raft => }/tests/test_simple_blocks.rs | 14 +- src/eth/consensus/{raft => }/utils.rs | 0 src/eth/importer/importer.rs | 437 ----------- src/eth/importer/importer_config.rs | 67 -- src/eth/importer/mod.rs | 6 - src/eth/miner/miner.rs | 7 +- src/eth/mod.rs | 1 - src/eth/primitives/block_header.rs | 2 +- src/eth/primitives/transaction_execution.rs | 6 +- src/eth/rpc/rpc_context.rs | 2 +- src/eth/rpc/rpc_server.rs | 11 +- src/globals.rs | 10 + .../blockchain_client/blockchain_client.rs | 6 - src/infra/build_info.rs | 21 +- src/main.rs | 64 +- 35 files changed, 2052 insertions(+), 1467 deletions(-) create mode 100644 .github/workflows/e2e-generic.yml rename .github/workflows/{e2e-importer.yml => e2e-run-with-importer.yml} (95%) create mode 100755 chaos/experiments/main.sh create mode 100644 e2e/cloudwalk-contracts/integration/test/simple.test.ts create mode 100644 src/bin/importer_online.rs create mode 100644 src/bin/run_with_importer.rs rename src/eth/consensus/{raft => }/append_log_entries_storage.rs (99%) rename src/eth/consensus/{raft => }/discovery.rs (97%) rename src/eth/consensus/{raft => }/log_entry.rs (100%) rename src/eth/consensus/{raft => }/propagation.rs (92%) delete mode 100644 src/eth/consensus/raft/mod.rs rename src/eth/consensus/{raft => }/server.rs (97%) delete mode 100644 src/eth/consensus/simple_consensus/mod.rs rename src/eth/consensus/{raft => }/tests/factories.rs (89%) rename src/eth/consensus/{raft => }/tests/test_simple_blocks.rs (95%) rename src/eth/consensus/{raft => }/utils.rs (100%) delete mode 100644 src/eth/importer/importer.rs delete mode 100644 src/eth/importer/importer_config.rs delete mode 100644 src/eth/importer/mod.rs diff --git a/.github/workflows/e2e-generic.yml b/.github/workflows/e2e-generic.yml new file mode 100644 index 000000000..60240e05d --- /dev/null +++ b/.github/workflows/e2e-generic.yml @@ -0,0 +1,95 @@ +name: E2E Generic + +on: + pull_request: + branches: + - '*' + paths-ignore: + - '.github/workflows/deploy.yml' + - '.github/workflows/docs-release.yml' + - '.github/workflows/outdated.yml' + - '.github/workflows/comment-tag-report.yml' + - '.github/workflows/pr-agent.yml' + - '.github/workflows/build-binary.yml' + - '.github/CODEOWNERS' + - 'config/**' + - 'README.md' + - 'LICENSE' + - 'CONTRIBUTING.md' + - 'utils/slack-notifiers/**' + workflow_dispatch: + +jobs: + 
e2e_generic: + strategy: + fail-fast: false + matrix: + include: + # - leader-restart: true + # instances: 2 + # iterations: 1 + # - leader-restart: true + # instances: 3 + # iterations: 1 + # - leader-restart: false + # instances: 2 + # iterations: 1 + # - leader-restart: false + # instances: 3 + # iterations: 1 + - leader-restart: true + instances: 1 + iterations: 2 + + name: E2E Generic with (${{ matrix.instances }}, ${{ matrix.leader-restart }}) + runs-on: ubuntu-latest + timeout-minutes: 45 + + concurrency: + group: ${{ github.workflow }}-${{ matrix.leader-restart }}-${{ matrix.instances }}-${{ matrix.iterations }}-${{ github.ref || github.run_id }} + cancel-in-progress: true + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Set up Rust + run: curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y --default-toolchain 1.79 + + - name: Rust Cache + uses: Swatinem/rust-cache@v2 + id: cache-cargo + with: + prefix-key: ${{ runner.os }}-v3-cargo + shared-key: stable-release + key: ${{ hashFiles('Cargo.lock', 'Cargo.toml') }} + cache-provider: "github" + cache-directories: "~/.cargo/bin/" + + - name: Install protoc + run: sudo apt-get install -y protobuf-compiler + + - name: Install jq + run: sudo apt-get install jq -y + + - name: Set up Just + uses: extractions/setup-just@v2 + + - name: Set up dependencies + if: ${{ steps.cache-cargo.outputs.cache-hit != 'true' }} + run: | + cargo install killport || true + cargo install wait-service || true + + - name: Clone all contracts + run: just contracts-clone --token + + - name: Flatten all contracts + run: just contracts-flatten --token + + - name: Run e2e tests + run: just run-chaos-experiment stratus ${{ matrix.instances }} ${{ matrix.iterations }} ${{ matrix.leader-restart }} main + env: + CARGO_PROFILE_RELEASE_DEBUG: 0 + RUST_LOG: error + RELEASE: 1 diff --git a/.github/workflows/e2e-importer.yml b/.github/workflows/e2e-run-with-importer.yml similarity index 95% rename from .github/workflows/e2e-importer.yml rename to .github/workflows/e2e-run-with-importer.yml index e61d77ba0..cec094538 100644 --- a/.github/workflows/e2e-importer.yml +++ b/.github/workflows/e2e-run-with-importer.yml @@ -1,4 +1,4 @@ -name: E2E Importer +name: E2E Run With Importer on: pull_request: @@ -26,10 +26,10 @@ on: - 'Cargo.toml' jobs: - importer_test: + run_with_importer_test: strategy: fail-fast: false - name: E2E Importer on BRLCToken + name: E2E Run With Importer on BRLCToken runs-on: ubuntu-latest timeout-minutes: 45 diff --git a/Cargo.toml b/Cargo.toml index 1bd97254a..fd222dda1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -156,6 +156,14 @@ path = "src/bin/rpc_downloader.rs" name = "importer-offline" path = "src/bin/importer_offline.rs" +[[bin]] +name = "importer-online" +path = "src/bin/importer_online.rs" + +[[bin]] +name = "run-with-importer" +path = "src/bin/run_with_importer.rs" + # ------------------------------------------------------------------------------ # Features # ------------------------------------------------------------------------------ diff --git a/chaos/experiments/main.sh b/chaos/experiments/main.sh new file mode 100755 index 000000000..cb527ce94 --- /dev/null +++ b/chaos/experiments/main.sh @@ -0,0 +1,345 @@ +#!/bin/bash + +set -e + +# Default binary +binary="stratus" + +# Default number of instances +num_instances=3 + +# Default number of iterations +iterations=4 + +# Flag for enabling leader restart feature +enable_leader_restart=false + +# Parse command-line options +while [[ "$#" -gt 0 ]]; do + 
case $1 in + --bin) + binary="$2" + shift 2 + ;; + --instances) + num_instances="$2" + shift 2 + ;; + --iterations) + iterations="$2" + shift 2 + ;; + --enable-leader-restart) + if [[ "$2" == "true" || "$2" == "false" ]]; then + enable_leader_restart="$2" + shift 2 + else + echo "Error: --enable-leader-restart must be followed by true or false" + exit 1 + fi + ;; + *) + echo "Unknown parameter passed: $1" + echo "Usage: $0 [--bin binary] [--instances number] [--iterations number] [--enable-leader-restart true|false]" + exit 1 + ;; + esac +done + +echo "Using binary: $binary" +echo "Number of instances: $num_instances" +echo "Number of iterations: $iterations" +echo "Enable leader restart: $enable_leader_restart" + +cleanup() { + exit_code=$? + echo "Cleaning up..." + for port in "${ports[@]}"; do + killport --quiet $port || true + done + + sleep 2 + if [ $exit_code -eq 0 ]; then + rm instance_*.log 2>/dev/null || true + find . -xdev -type d -name "instance_*" -print0 | xargs -0 rm -rf 2>/dev/null || true + fi + rm -rf tmp_rocks_* 2>/dev/null || true + find . -xdev -type d -name "tmp_rocks_*" -print0 | xargs -0 rm -rf 2>/dev/null || true + echo "Job is done. With exit code $exit_code" +} +trap cleanup EXIT INT TERM + +send_transactions() { + echo "running transactions test" + + export STRATUS_PORT=3001 #TODO implement roundrobin on all nodes + cd ./e2e/cloudwalk-contracts/integration + npx hardhat test test/simple.test.ts --network stratus --bail + test_exit_code=$? + cd ../../.. + + if [ $test_exit_code -eq 0 ]; then + echo "Test contracts executed successfully." + else + echo "Test contracts failed with exit code $test_exit_code." + exit 1 + fi +} + +# Function to start an instance +start_instance() { + local address=$1 + local grpc_address=$2 + local rocks_path_prefix=$3 + local log_file=$4 + local candidate_peers=$5 + local tokio_console_address=$6 + local metrics_exporter_address=$7 + + RUST_LOG=info RUST_BACKTRACE=1 cargo run --release --bin $binary --features dev -- \ + --block-mode=1s \ + --candidate-peers="$candidate_peers" \ + -a=$address \ + --grpc-server-address=$grpc_address \ + --rocks-path-prefix=$rocks_path_prefix \ + --tokio-console-address=$tokio_console_address \ + --perm-storage=rocks \ + --metrics-exporter-address=$metrics_exporter_address >> $log_file 2>&1 & +} + +# Function to check liveness of an instance +check_liveness() { + local port=$1 + curl -s http://0.0.0.0:$port \ + --header "content-type: application/json" \ + --data '{"jsonrpc":"2.0","method":"stratus_health","params":[],"id":1}' | jq '.result' +} + +# Function to check if an instance is the leader +check_leader() { + local port=$1 + local response=$(curl -s http://0.0.0.0:$port \ + --header "content-type: application/json" \ + --data '{"jsonrpc":"2.0","method":"stratus_version","params":[],"id":1}') + + local is_leader=$(echo $response | jq -r '.result.consensus.is_leader') + local term=$(echo $response | jq -r '.result.consensus.current_term') + + echo "$is_leader $term" +} + +# Function to find the leader node +# Returns nothing if no leader is found or if there is mismatch between instances terms +# Otherwise returns a list of leader addresses +find_leader() { + local ports=("$@") + local leaders=() + local term="" + + for port in "${ports[@]}"; do + read -r is_leader current_term <<< "$(check_leader "$port")" + + if [ -z "$term" ]; then + term="$current_term" + elif [ "$term" != "$current_term" ]; then + echo "" + return + fi + + if [ "$is_leader" == "true" ]; then + leaders+=("$port") + fi + done + + 
if [ ${#leaders[@]} -gt 0 ]; then + echo "${leaders[@]}" + fi +} + +# Function to run the election test +run_test() { + local instances=() + local all_addresses=() + + for ((i=1; i<=num_instances; i++)); do + local base_port=$((3000 + i)) + local grpc_port=$((3777 + i)) + all_addresses+=("http://0.0.0.0:$base_port;$grpc_port") + done + + for ((i=1; i<=num_instances; i++)); do + local base_port=$((3000 + i)) + local grpc_port=$((3777 + i)) + local tokio_console_port=$((6668 + i)) + local metrics_exporter_port=$((9000 + i)) + + # Exclude current instance's address to get candidate_peers + local candidate_peers=($(printf "%s\n" "${all_addresses[@]}")) + local candidate_peers_str="" + + if [ ${#candidate_peers[@]} -gt 0 ]; then + candidate_peers_str=$(printf ",%s" "${candidate_peers[@]}") + candidate_peers_str=${candidate_peers_str:1} + fi + instances+=("0.0.0.0:$base_port 0.0.0.0:$grpc_port tmp_rocks_$base_port instance_$base_port.log $base_port ${candidate_peers_str} 0.0.0.0:$tokio_console_port 0.0.0.0:$metrics_exporter_port") + done + + # Start instances + echo "Starting $num_instances instance(s)..." + ports=() + grpc_addresses=() + rocks_paths=() + liveness=() + for instance in "${instances[@]}"; do + IFS=' ' read -r -a params <<< "$instance" + start_instance "${params[0]}" "${params[1]}" "${params[2]}" "${params[3]}" "${params[5]}" "${params[6]}" "${params[7]}" + ports+=("${params[4]}") + grpc_addresses+=("${params[1]}") + rocks_paths+=("${params[2]}") + liveness+=(false) + sleep 15 + done + + all_ready=false + while [ "$all_ready" != true ]; do + all_ready=true + for i in "${!ports[@]}"; do + if [ "${liveness[$i]}" != true ]; then + response=$(check_liveness "${ports[$i]}") + if [ "$response" = "true" ]; then + liveness[$i]=true + echo "Instance on port ${ports[$i]} is ready." + else + all_ready=false + fi + fi + done + if [ "$all_ready" != true ]; then + echo "Waiting for all instances to be ready..." + sleep 5 + fi + done + + echo "All instances are ready. Waiting for leader election" + + # Maximum timeout duration in seconds for the initial leader election + initial_leader_timeout=120 + + # Capture the start time + initial_start_time=$(date +%s) + + # Wait for the initial leader election with a timeout + while true; do + current_time=$(date +%s) + elapsed_time=$((current_time - initial_start_time)) + + if [ $elapsed_time -ge $initial_leader_timeout ]; then + echo "Timeout reached without finding an initial leader." + exit 1 + fi + + leader_ports=($(find_leader "${ports[@]}")) + if [ ${#leader_ports[@]} -gt 1 ]; then + echo "Error: More than one leader found: ${leader_ports[*]}" + exit 1 + elif [ ${#leader_ports[@]} -eq 1 ]; then + leader_port=${leader_ports[0]} + echo "Leader found on address $leader_port" + break + else + sleep 1 + fi + done + + if [ -z "$leader_port" ]; then + echo "Exiting due to leader election failure." + exit 1 + fi + + send_transactions + + if [ "$enable_leader_restart" = true ]; then + # Kill the leader instance + echo "Killing the leader instance on address $leader_port..." + for i in "${!leader_ports[@]}"; do + if [ "${leader_ports[i]}" == "$leader_port" ]; then + killport --quiet $leader_port || true + break + fi + done + + if [ $num_instances -gt 1 ]; then + sleep 40 # wait for leader election before raising the other instance to avoid split vote + fi + + # Restart the killed instance + echo "Restarting the killed instance..." 
+ for i in "${!instances[@]}"; do + IFS=' ' read -r -a params <<< "${instances[i]}" + if [ "${params[4]}" == "$leader_port" ]; then + start_instance "${params[0]}" "${params[1]}" "${params[2]}" "${params[3]}" "${params[5]}" "${params[6]}" "${params[7]}" + liveness[i]=false + break + fi + done + + restart_all_ready=false + while [ "$restart_all_ready" != true ]; do + restart_all_ready=true + for i in "${!ports[@]}"; do + if [ "${liveness[$i]}" != true ]; then + response=$(check_liveness "${ports[$i]}") + if [ "$response" = "true" ]; then + liveness[$i]=true + echo "Instance on address ${ports[$i]} is ready." + else + restart_all_ready=false + fi + fi + done + if [ "$restart_all_ready" != true ]; then + echo "Waiting for all instances to be ready..." + sleep 5 + fi + done + + echo "All instances are ready after restart. Waiting for new leader election." + + # Maximum timeout duration in seconds for new leader election + max_timeout=120 + + # Capture the start time + start_time=$(date +%s) + + # Wait until a new leader is found or timeout + while true; do + current_time=$(date +%s) + elapsed_time=$((current_time - start_time)) + + if [ $elapsed_time -ge $max_timeout ]; then + echo "Timeout reached without finding a new leader." + exit 1 + fi + + leader_ports=($(find_leader "${ports[@]}")) + if [ ${#leader_ports[@]} -gt 1 ]; then + echo "Error: More than one leader found: ${leader_ports[*]}" + exit 1 + elif [ ${#leader_ports[@]} -eq 1 ]; then + leader_port=${leader_ports[0]} + echo "Leader found on address $leader_port" + break + else + sleep 1 + fi + done + fi +} + +# Run the test n times +for ((iteration_n=1; iteration_n<=$iterations; iteration_n++)); do + echo -e "\n##############################################\n" + echo "Running binary $binary test iteration $iteration_n of $iterations..." 
+ run_test + sleep 5 +done diff --git a/e2e/cloudwalk-contracts/integration/test/helpers/rpc.ts b/e2e/cloudwalk-contracts/integration/test/helpers/rpc.ts index 8ca9a1392..9211c63e5 100644 --- a/e2e/cloudwalk-contracts/integration/test/helpers/rpc.ts +++ b/e2e/cloudwalk-contracts/integration/test/helpers/rpc.ts @@ -44,7 +44,7 @@ export let ETHERJS = new JsonRpcProvider(providerUrl, undefined); export function updateProviderUrl(providerName: string) { switch (providerName) { - case "importer": + case "run-with-importer": providerUrl = "http://localhost:3001?app=e2e"; break; case "stratus": diff --git a/e2e/cloudwalk-contracts/integration/test/importer.test.ts b/e2e/cloudwalk-contracts/integration/test/importer.test.ts index 451125a55..40033e69e 100644 --- a/e2e/cloudwalk-contracts/integration/test/importer.test.ts +++ b/e2e/cloudwalk-contracts/integration/test/importer.test.ts @@ -13,7 +13,7 @@ import { updateProviderUrl, } from "./helpers/rpc"; -describe("Importer integration test", function () { +describe("Run With Importer integration test", function () { before(async function () { await setDeployer(); }); @@ -92,7 +92,7 @@ describe("Importer integration test", function () { } }); - it(`${params.name}: Validate transaction mined delay between Stratus and Importer`, async function () { + it(`${params.name}: Validate transaction mined delay between Stratus and Run With Importer`, async function () { // Get Stratus timestamps updateProviderUrl("stratus"); const stratusTimestamps = await Promise.all( @@ -103,9 +103,9 @@ describe("Importer integration test", function () { }), ); - // Get Importer timestamps - updateProviderUrl("importer"); - const importerTimestamps = await Promise.all( + // Get Run With Importer timestamps + updateProviderUrl("run-with-importer"); + const runWithImporterTimestamps = await Promise.all( txHashList.map(async (txHash) => { const receipt = await sendWithRetry("eth_getTransactionReceipt", [txHash], 20); const block = await sendWithRetry("eth_getBlockByNumber", [receipt.blockNumber, false]); @@ -116,17 +116,17 @@ describe("Importer integration test", function () { // Total time it took for Stratus to process all the blocks containing transactions const stratusProcessingTime = stratusTimestamps[stratusTimestamps.length - 1] - stratusTimestamps[0]; - // Total time it took for Importer to process all the blocks containing transactions - const importerProcessingTime = - importerTimestamps[importerTimestamps.length - 1] - importerTimestamps[0]; + // Total time it took for Run With Importer to process all the blocks containing transactions + const runWithImporterProcessingTime = + runWithImporterTimestamps[runWithImporterTimestamps.length - 1] - runWithImporterTimestamps[0]; console.log(` ✔ Number of transactions sent: ${txHashList.length}`); console.log( - ` ✔ Stratus processing time: ${stratusProcessingTime}s | Importer processing time: ${importerProcessingTime}s`, + ` ✔ Stratus processing time: ${stratusProcessingTime}s | Run With Importer processing time: ${runWithImporterProcessingTime}s`, ); }); - it(`${params.name}: Validate all transactions were imported from Stratus to Importer`, async function () { + it(`${params.name}: Validate all transactions were imported from Stratus to Run With Importer`, async function () { // Get Stratus transaction receipts updateProviderUrl("stratus"); const stratusReceipts = await Promise.all( @@ -136,9 +136,9 @@ describe("Importer integration test", function () { }), ); - // Get Importer transaction receipts - 
updateProviderUrl("importer"); - const importerReceipts = await Promise.all( + // Get Run With Importer transaction receipts + updateProviderUrl("run-with-importer"); + const runWithImporterReceipts = await Promise.all( txHashList.map(async (txHash) => { const receipt = await sendWithRetry("eth_getTransactionReceipt", [txHash]); return receipt; @@ -148,11 +148,11 @@ describe("Importer integration test", function () { // Assert that all transactions were imported for (let i = 0; i < txHashList.length; i++) { expect(stratusReceipts[i]).to.exist; - expect(importerReceipts[i]).to.exist; + expect(runWithImporterReceipts[i]).to.exist; } }); - it(`${params.name}: Validate each transaction was imported into the same block between Stratus and Importer`, async function () { + it(`${params.name}: Validate each transaction was imported into the same block between Stratus and Run With Importer`, async function () { // Get Stratus block numbers updateProviderUrl("stratus"); const stratusBlockNumbers = await Promise.all( @@ -162,38 +162,38 @@ describe("Importer integration test", function () { }), ); - // Get Importer block numbers - updateProviderUrl("importer"); - const importerBlockNumbers = await Promise.all( + // Get Run With Importer block numbers + updateProviderUrl("run-with-importer"); + const runWithImporterBlockNumbers = await Promise.all( txHashList.map(async (txHash) => { const receipt = await sendWithRetry("eth_getTransactionReceipt", [txHash], 20); return receipt.blockNumber; }), ); - // Assert that each transaction fell into the same block between Stratus and Importer + // Assert that each transaction fell into the same block between Stratus and Run With Importer for (let i = 0; i < txHashList.length; i++) { expect( stratusBlockNumbers[i], - `Transaction ${txHashList[i]} did not fall into the same block between Stratus and Importer`, - ).to.equal(importerBlockNumbers[i]); + `Transaction ${txHashList[i]} did not fall into the same block between Stratus and Run With Importer`, + ).to.equal(runWithImporterBlockNumbers[i]); } }); - it(`${params.name}: Validate balances between Stratus and Importer`, async function () { + it(`${params.name}: Validate balances between Stratus and Run With Importer`, async function () { for (let i = 0; i < wallets.length; i++) { // Get Stratus balance updateProviderUrl("stratus"); const stratusBalance = await brlcToken.balanceOf(wallets[i].address); - // Get Importer balance - updateProviderUrl("importer"); - const importerBalance = await brlcToken.balanceOf(wallets[i].address); + // Get Run With Importer balance + updateProviderUrl("run-with-importer"); + const runWithImporterBalance = await brlcToken.balanceOf(wallets[i].address); // Assert that the balances are equal expect(stratusBalance).to.equal( - importerBalance, - `Wallet ${wallets[i].address} balances are not equal between Stratus and Importer`, + runWithImporterBalance, + `Wallet ${wallets[i].address} balances are not equal between Stratus and Run With Importer`, ); } updateProviderUrl("stratus"); diff --git a/e2e/cloudwalk-contracts/integration/test/simple.test.ts b/e2e/cloudwalk-contracts/integration/test/simple.test.ts new file mode 100644 index 000000000..91b7fe482 --- /dev/null +++ b/e2e/cloudwalk-contracts/integration/test/simple.test.ts @@ -0,0 +1,114 @@ +import { expect } from "chai"; +import { ethers } from "hardhat"; + +import { + GAS_LIMIT_OVERRIDE, + brlcToken, + configureBRLC, + deployBRLC, + deployer, + send, + sendWithRetry, + setDeployer, +} from "./helpers/rpc"; + 
+describe("Integration test", function () { + before(async function () { + await setDeployer(); + }); + + describe("Deploy and configure BRLC contract", function () { + it("Validate deployer is main minter", async function () { + await deployBRLC(); + await configureBRLC(); + + expect(deployer.address).to.equal(await brlcToken.mainMinter()); + expect(await brlcToken.isMinter(deployer.address)).to.be.true; + }); + }); + + describe("Long duration transaction tests", function () { + const parameters = [ + { name: "Few wallets, sufficient balance", wallets: 3, duration: 5, tps: 15, baseBalance: 2000 }, + { name: "Few wallets, insufficient balance", wallets: 2, duration: 5, tps: 5, baseBalance: 5 }, + { name: "Many wallets, sufficient balance", wallets: 20, duration: 5, tps: 5, baseBalance: 2000 }, + ]; + parameters.forEach((params, index) => { + const wallets: any[] = []; + it(`${params.name}: Prepare and mint BRLC to wallets`, async function () { + this.timeout(params.wallets * 1000 + 10000); + + for (let i = 0; i < params.wallets; i++) { + const wallet = ethers.Wallet.createRandom().connect(ethers.provider); + wallets.push(wallet); + } + + for (let i = 0; i < wallets.length; i++) { + const wallet = wallets[i]; + expect( + await brlcToken.mint(wallet.address, params.baseBalance, { gasLimit: GAS_LIMIT_OVERRIDE }), + ).to.have.changeTokenBalance(brlcToken, wallet, params.baseBalance); + } + }); + + let txHashList: string[] = []; + it(`${params.name}: Transfer BRLC between wallets at a configurable TPS`, async function () { + this.timeout(params.duration * 1000 + 10000); + + const transactionInterval = 1000 / params.tps; + + let nonces = await Promise.all( + wallets.map((wallet) => send("eth_getTransactionCount", [wallet.address, "latest"])), + ); + + const startTime = Date.now(); + while (Date.now() - startTime < params.duration * 1000) { + let senderIndex; + let receiverIndex; + do { + senderIndex = Math.floor(Math.random() * wallets.length); + receiverIndex = Math.floor(Math.random() * wallets.length); + } while (senderIndex === receiverIndex); + + let sender = wallets[senderIndex]; + let receiver = wallets[receiverIndex]; + + const amount = Math.floor(Math.random() * 10) + 1; + + try { + const tx = await brlcToken.connect(sender).transfer(receiver.address, amount, { + gasPrice: 0, + gasLimit: GAS_LIMIT_OVERRIDE, + type: 0, + nonce: nonces[senderIndex], + }); + txHashList.push(tx.hash); + } catch (error) {} + + nonces[senderIndex]++; + + await new Promise((resolve) => setTimeout(resolve, transactionInterval)); + } + }); + + it(`${params.name}: Validate transactions are present on blocks`, async function () { + const allBlockTransactions = new Set(); + + await Promise.all( + txHashList.map(async (txHash) => { + const receipt = await sendWithRetry("eth_getTransactionReceipt", [txHash]); + const block = await sendWithRetry("eth_getBlockByNumber", [receipt.blockNumber, false]); + + block.transactions.forEach((tx) => allBlockTransactions.add(tx)); + }), + ); + + const missingTransactions = txHashList.filter((txHash) => !allBlockTransactions.has(txHash)); + + if (missingTransactions.length > 0) { + throw new Error(`Missing transactions: ${missingTransactions.join(", ")}`); + } + }); + }); + }); +}); diff --git a/justfile b/justfile index 7ddb345e4..49ac6b42c 100644 --- a/justfile +++ b/justfile @@ -97,6 +97,18 @@ rpc-downloader *args="": importer-offline *args="": cargo {{nightly_flag}} run --bin importer-offline {{release_flag}} -- {{args}} +# Bin: Import external RPC blocks from external RPC 
endpoint to Stratus storage +importer-online *args="": + cargo {{nightly_flag}} run --bin importer-online {{release_flag}} -- {{args}} + +# Bin: Validate Stratus storage slots matches reference slots +state-validator *args="": + cargo {{nightly_flag}} run --bin state-validator {{release_flag}} -- {{args}} + +# Bin: `stratus` and `importer-online` in a single binary +run-with-importer *args="": + cargo {{nightly_flag}} run --bin run-with-importer {{release_flag}} -- {{args}} + # ------------------------------------------------------------------------------ # Test tasks # ------------------------------------------------------------------------------ @@ -153,7 +165,7 @@ e2e-stratus block-mode="automine" test="": just _log "Starting Stratus" just build "dev" || exit 1 - just run -a 0.0.0.0:3000 --leader --block-mode {{block-mode}} > stratus.log & + just run -a 0.0.0.0:3000 --block-mode {{block-mode}} > stratus.log & just _wait_for_stratus @@ -174,7 +186,7 @@ e2e-stratus-rocks block-mode="automine" test="": just _log "Starting Stratus" just build "dev" || exit 1 - just run -a 0.0.0.0:3000 --leader --block-mode {{block-mode}} --perm-storage=rocks > stratus.log & + just run -a 0.0.0.0:3000 --block-mode {{block-mode}} --perm-storage=rocks > stratus.log & just _wait_for_stratus @@ -191,7 +203,7 @@ e2e-clock-stratus: #!/bin/bash just _log "Starting Stratus" just build "dev" || exit 1 - cargo run --release --bin stratus --features dev -- --leader --block-mode 1s -a 0.0.0.0:3000 > stratus.log & + cargo run --release --bin stratus --features dev -- --block-mode 1s -a 0.0.0.0:3000 > stratus.log & just _wait_for_stratus @@ -208,7 +220,7 @@ e2e-clock-stratus-rocks: #!/bin/bash just _log "Starting Stratus" just build "dev" || exit 1 - cargo run --release --bin stratus --features dev -- --leader --block-mode 1s --perm-storage=rocks -a 0.0.0.0:3000 > stratus.log & + cargo run --release --bin stratus --features dev -- --block-mode 1s --perm-storage=rocks -a 0.0.0.0:3000 > stratus.log & just _wait_for_stratus @@ -266,22 +278,22 @@ e2e-importer-online: e2e-importer-online-up: #!/bin/bash - # Build Stratus binary - just _log "Building Stratus binary" - cargo build --release --bin stratus --features dev + # Build Stratus and Run With Importer binaries + just _log "Building Stratus and Run With Importer binaries" + cargo build --release --bin stratus --bin run-with-importer --features dev mkdir e2e_logs - # Start Stratus with leader flag - RUST_LOG=info cargo run --release --bin stratus --features dev -- --leader --block-mode 1s --perm-storage=rocks --rocks-path-prefix=temp_3000 --tokio-console-address=0.0.0.0:6668 --metrics-exporter-address=0.0.0.0:9000 -a 0.0.0.0:3000 > e2e_logs/stratus.log & + # Start Stratus binary + RUST_LOG=info cargo run --release --bin stratus --features dev -- --block-mode 1s --perm-storage=rocks --rocks-path-prefix=temp_3000 --tokio-console-address=0.0.0.0:6668 --metrics-exporter-address=0.0.0.0:9000 -a 0.0.0.0:3000 > e2e_logs/stratus.log & - # Wait for Stratus with leader flag to start + # Wait for Stratus to start just _wait_for_stratus 3000 - # Start Stratus with follower flag - RUST_LOG=info cargo run --release --bin stratus --features dev -- --follower --perm-storage=rocks --rocks-path-prefix=temp_3001 --tokio-console-address=0.0.0.0:6669 --metrics-exporter-address=0.0.0.0:9001 -a 0.0.0.0:3001 -r http://0.0.0.0:3000/ -w ws://0.0.0.0:3000/ > e2e_logs/importer.log & + # Start Run With Importer binary + RUST_LOG=info cargo run --release --bin run-with-importer --features dev -- 
--block-mode 1s --perm-storage=rocks --rocks-path-prefix=temp_3001 --tokio-console-address=0.0.0.0:6669 --metrics-exporter-address=0.0.0.0:9001 -a 0.0.0.0:3001 -r http://0.0.0.0:3000/ -w ws://0.0.0.0:3000/ > e2e_logs/run_with_importer.log &
 
-    # Wait for Stratus with follower flag to start
+    # Wait for Run With Importer to start
     just _wait_for_stratus 3001
 
     if [ -d e2e/cloudwalk-contracts ]; then
@@ -303,8 +315,12 @@ e2e-importer-online-up:
 
 e2e-importer-online-down:
     #!/bin/bash
-    # Kill Stratus
+    # Kill run-with-importer
     killport 3001
+    run_with_importer_pid=$(pgrep -f 'run-with-importer')
+    kill $run_with_importer_pid
+
+    # Kill Stratus
     killport 3000
     stratus_pid=$(pgrep -f 'stratus')
     kill $stratus_pid
@@ -315,6 +331,26 @@ e2e-importer-online-down:
 
     # Delete zeppelin directory
     rm -rf ./e2e/cloudwalk-contracts/integration/.openzeppelin
 
+# ------------------------------------------------------------------------------
+# Chaos tests
+# ------------------------------------------------------------------------------
+
+# Chaos: Run chaos testing experiment
+run-chaos-experiment bin="" instances="" iterations="" enable-leader-restart="" experiment="":
+    #!/bin/bash
+
+    just _log "Building Stratus"
+    cargo build --release --bin {{ bin }} --features dev
+
+    cd e2e/cloudwalk-contracts/integration
+    if [ ! -d node_modules ]; then
+        npm install
+    fi
+    cd ../../..
+
+    just _log "Executing experiment {{ experiment }} {{ iterations }}x on {{ bin }} binary with {{ instances }} instance(s)"
+    ./chaos/experiments/{{ experiment }}.sh --bin {{ bin }} --instances {{ instances }} --iterations {{ iterations }} --enable-leader-restart {{ enable-leader-restart }}
+
 # ------------------------------------------------------------------------------
 # Hive tests
 # ------------------------------------------------------------------------------
@@ -371,7 +407,7 @@ contracts-test-stratus *args="":
     #!/bin/bash
     just _log "Starting Stratus"
     just build "dev" || exit 1
-    just run --leader -a 0.0.0.0:3000 &
+    just run -a 0.0.0.0:3000 &
 
     just _wait_for_stratus
 
@@ -388,7 +424,7 @@ contracts-test-stratus-rocks *args="":
     #!/bin/bash
     just _log "Starting Stratus"
     just build "dev" || exit 1
-    just run --leader -a 0.0.0.0:3000 --perm-storage=rocks > stratus.log &
+    just run -a 0.0.0.0:3000 --perm-storage=rocks > stratus.log &
 
     just _wait_for_stratus
 
diff --git a/src/bin/importer_online.rs b/src/bin/importer_online.rs
new file mode 100644
index 000000000..4a3e52a79
--- /dev/null
+++ b/src/bin/importer_online.rs
@@ -0,0 +1,420 @@
+use std::cmp::min;
+use std::sync::atomic::AtomicU64;
+use std::sync::atomic::Ordering;
+use std::sync::Arc;
+use std::time::Duration;
+
+use futures::try_join;
+use futures::StreamExt;
+use serde::Deserialize;
+use stratus::config::ImporterOnlineConfig;
+use stratus::eth::executor::Executor;
+use stratus::eth::miner::Miner;
+use stratus::eth::primitives::BlockNumber;
+use stratus::eth::primitives::ExternalBlock;
+use stratus::eth::primitives::ExternalReceipt;
+use stratus::eth::primitives::ExternalReceipts;
+use stratus::eth::primitives::Hash;
+use stratus::eth::storage::StratusStorage;
+use stratus::ext::spawn_named;
+use stratus::ext::traced_sleep;
+use stratus::ext::DisplayExt;
+use stratus::ext::SleepReason;
+use stratus::if_else;
+#[cfg(feature = "metrics")]
+use stratus::infra::metrics;
+use stratus::infra::tracing::warn_task_rx_closed;
+use stratus::infra::tracing::warn_task_tx_closed;
+use stratus::infra::tracing::SpanExt;
+use stratus::infra::BlockchainClient;
+use stratus::log_and_err;
+#[cfg(feature = "metrics")]
+use stratus::utils::calculate_tps;
+use stratus::utils::DropTimer;
+use stratus::GlobalServices;
+use stratus::GlobalState;
+use tokio::sync::mpsc;
+use tokio::task::yield_now;
+use tokio::time::timeout;
+use tracing::Span;
+
+// -----------------------------------------------------------------------------
+// Globals
+// -----------------------------------------------------------------------------
+
+/// Current block number of the external RPC blockchain.
+static EXTERNAL_RPC_CURRENT_BLOCK: AtomicU64 = AtomicU64::new(0);
+
+/// Only sets the external RPC current block number if it is equal to or greater than the current one.
+fn set_external_rpc_current_block(new_number: BlockNumber) {
+    let new_number_u64 = new_number.as_u64();
+    let _ = EXTERNAL_RPC_CURRENT_BLOCK.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current_number| {
+        if_else!(new_number_u64 >= current_number, Some(new_number_u64), None)
+    });
+}
+
+// -----------------------------------------------------------------------------
+// Constants
+// -----------------------------------------------------------------------------
+/// Number of blocks that are downloaded in parallel.
+const PARALLEL_BLOCKS: usize = 3;
+
+/// Number of receipts that are downloaded in parallel.
+const PARALLEL_RECEIPTS: usize = 100;
+
+/// Timeout awaiting a newHeads event before falling back to polling.
+const TIMEOUT_NEW_HEADS: Duration = Duration::from_millis(2000);
+
+/// Interval to wait before retrieving receipts, because they are not immediately available after the block is retrieved.
+const INTERVAL_FETCH_RECEIPTS: Duration = Duration::from_millis(50);
+
+// -----------------------------------------------------------------------------
+// Execution
+// -----------------------------------------------------------------------------
+#[allow(dead_code)]
+fn main() -> anyhow::Result<()> {
+    let global_services = GlobalServices::<ImporterOnlineConfig>::init();
+    global_services.runtime.block_on(run(global_services.config))
+}
+
+async fn run(config: ImporterOnlineConfig) -> anyhow::Result<()> {
+    let _timer = DropTimer::start("importer-online");
+
+    // init server
+    let storage = config.storage.init()?;
+    let miner = config.miner.init_external_mode(Arc::clone(&storage))?;
+    let executor = config.executor.init(Arc::clone(&storage), Arc::clone(&miner));
+    let chain = Arc::new(
+        BlockchainClient::new_http_ws(
+            &config.base.external_rpc,
+            config.base.external_rpc_ws.as_deref(),
+            config.base.external_rpc_timeout,
+        )
+        .await?,
+    );
+
+    let result = run_importer_online(executor, miner, Arc::clone(&storage), chain, config.base.sync_interval).await;
+    if let Err(ref e) = result {
+        tracing::error!(reason = ?e, "importer-online failed");
+    }
+
+    // Explicitly block the `main` thread to drop the storage.
+    drop(storage);
+
+    result
+}
+
+pub async fn run_importer_online(
+    executor: Arc<Executor>,
+    miner: Arc<Miner>,
+    storage: Arc<StratusStorage>,
+    chain: Arc<BlockchainClient>,
+    sync_interval: Duration,
+) -> anyhow::Result<()> {
+    let _timer = DropTimer::start("importer-online::run_importer_online");
+
+    let number = storage.read_block_number_to_resume_import()?;
+
+    let (backlog_tx, backlog_rx) = mpsc::unbounded_channel();
+
+    // spawn block executor:
+    // it executes and mines blocks and expects to receive them via the channel in the correct order.
+    let task_executor = spawn_named("importer::executor", start_block_executor(executor, miner, backlog_rx));
+
+    // spawn block number:
+    // it keeps track of the blockchain current block number.
+    let number_fetcher_chain = Arc::clone(&chain);
+    let task_number_fetcher = spawn_named("importer::number-fetcher", start_number_fetcher(number_fetcher_chain, sync_interval));
+
+    // spawn block fetcher:
+    // it fetches blocks and receipts in parallel and sends them to the executor in the correct order.
+    // it uses the number fetcher's current block to determine whether it should keep downloading more blocks or not.
+    let block_fetcher_chain = Arc::clone(&chain);
+    let task_block_fetcher = spawn_named("importer::block-fetcher", start_block_fetcher(block_fetcher_chain, backlog_tx, number));
+
+    // await all tasks
+    if let Err(e) = try_join!(task_executor, task_block_fetcher, task_number_fetcher) {
+        tracing::error!(reason = ?e, "importer-online failed");
+    }
+    Ok(())
+}
+
+// -----------------------------------------------------------------------------
+// Executor
+// -----------------------------------------------------------------------------
+
+// Executes external blocks and persists them to storage.
+async fn start_block_executor(
+    executor: Arc<Executor>,
+    miner: Arc<Miner>,
+    mut backlog_rx: mpsc::UnboundedReceiver<(ExternalBlock, Vec<ExternalReceipt>)>,
+) -> anyhow::Result<()> {
+    const TASK_NAME: &str = "block-executor";
+
+    loop {
+        if GlobalState::is_shutdown_warn(TASK_NAME) {
+            return Ok(());
+        }
+
+        let (block, receipts) = match timeout(Duration::from_secs(2), backlog_rx.recv()).await {
+            Ok(Some(inner)) => inner,
+            Ok(None) => break, // channel closed
+            Err(_timed_out) => {
+                tracing::warn!(timeout = "2s", "timeout reading block executor channel, expected around 1 block per second");
+                continue;
+            }
+        };
+
+        #[cfg(feature = "metrics")]
+        let start = metrics::now();
+
+        // execute and mine
+        let receipts = ExternalReceipts::from(receipts);
+        if let Err(e) = executor.execute_external_block(&block, &receipts) {
+            let message = GlobalState::shutdown_from(TASK_NAME, "failed to reexecute external block");
+            return log_and_err!(reason = e, message);
+        };
+
+        // statistics
+        #[cfg(feature = "metrics")]
+        {
+            let duration = start.elapsed();
+            let tps = calculate_tps(duration, block.transactions.len());
+
+            tracing::info!(
+                tps,
+                duration = %duration.to_string_ext(),
+                block_number = %block.number(),
+                receipts = receipts.len(),
+                "reexecuted external block",
+            );
+        }
+
+        if let Err(e) = miner.mine_external_mixed_and_commit() {
+            let message = GlobalState::shutdown_from(TASK_NAME, "failed to mine external block");
+            return log_and_err!(reason = e, message);
+        };
+
+        #[cfg(feature = "metrics")]
+        {
+            metrics::inc_n_importer_online_transactions_total(receipts.len() as u64);
+            metrics::inc_import_online_mined_block(start.elapsed());
+        }
+    }
+
+    warn_task_tx_closed(TASK_NAME);
+    Ok(())
+}
+
+// -----------------------------------------------------------------------------
+// Number fetcher
+// -----------------------------------------------------------------------------
+
+/// Retrieves the blockchain current block number.
+async fn start_number_fetcher(chain: Arc<BlockchainClient>, sync_interval: Duration) -> anyhow::Result<()> {
+    const TASK_NAME: &str = "external-number-fetcher";
+
+    // initial newHeads subscription.
+    // abort the application if we cannot subscribe.
+    let mut sub_new_heads = if chain.supports_ws() {
+        tracing::info!("{} subscribing to newHeads event", TASK_NAME);
+
+        match chain.subscribe_new_heads().await {
+            Ok(sub) => {
+                tracing::info!("{} subscribed to newHeads events", TASK_NAME);
+                Some(sub)
+            }
+            Err(e) => {
+                let message = GlobalState::shutdown_from(TASK_NAME, "cannot subscribe to newHeads event");
+                return log_and_err!(reason = e, message);
+            }
+        }
+    } else {
+        tracing::warn!("{} blockchain client does not have websocket enabled", TASK_NAME);
+        None
+    };
+
+    // keep reading websocket subscription or polling via http.
+    loop {
+        if GlobalState::is_shutdown_warn(TASK_NAME) {
+            return Ok(());
+        }
+
+        // if we have a subscription, try to read from subscription.
+        // in case of failure, re-subscribe because current subscription may have been closed in the server.
+        if let Some(sub) = &mut sub_new_heads {
+            tracing::info!("{} awaiting block number from newHeads subscription", TASK_NAME);
+            let resubscribe_ws = match timeout(TIMEOUT_NEW_HEADS, sub.next()).await {
+                Ok(Some(Ok(block))) => {
+                    tracing::info!(block_number = %block.number(), "{} received newHeads event", TASK_NAME);
+                    set_external_rpc_current_block(block.number());
+                    continue;
+                }
+                Ok(None) => {
+                    tracing::error!("{} newHeads subscription closed by the other side", TASK_NAME);
+                    true
+                }
+                Ok(Some(Err(e))) => {
+                    tracing::error!(reason = ?e, "{} failed to read newHeads subscription event", TASK_NAME);
+                    true
+                }
+                Err(_) => {
+                    tracing::error!("{} timed out waiting for newHeads subscription event", TASK_NAME);
+                    true
+                }
+            };
+
+            // resubscribe if necessary.
+            // only update the existing subscription if it succeeded, otherwise we will try again in the next iteration.
+            if chain.supports_ws() && resubscribe_ws {
+                tracing::info!("{} resubscribing to newHeads event", TASK_NAME);
+                match chain.subscribe_new_heads().await {
+                    Ok(sub) => {
+                        tracing::info!("{} resubscribed to newHeads event", TASK_NAME);
+                        sub_new_heads = Some(sub);
+                    }
+                    Err(e) => {
+                        tracing::error!(reason = ?e, "{} failed to resubscribe to newHeads event", TASK_NAME);
+                    }
+                }
+            }
+        }
+
+        // fallback to polling
+        tracing::warn!("{} falling back to http polling because subscription failed or it is not enabled", TASK_NAME);
+        match chain.fetch_block_number().await {
+            Ok(block_number) => {
+                tracing::info!(
+                    %block_number,
+                    sync_interval = %sync_interval.to_string_ext(),
+                    "fetched current block number via http. awaiting sync interval to retrieve again."
+                );
+                set_external_rpc_current_block(block_number);
+                traced_sleep(sync_interval, SleepReason::SyncData).await;
+            }
+            Err(e) => {
+                tracing::error!(reason = ?e, "failed to retrieve block number. retrying now.");
+            }
+        }
+    }
+}
+
+// -----------------------------------------------------------------------------
+// Block fetcher
+// -----------------------------------------------------------------------------
+
+/// Retrieves blocks and receipts.
+async fn start_block_fetcher(
+    chain: Arc<BlockchainClient>,
+    backlog_tx: mpsc::UnboundedSender<(ExternalBlock, Vec<ExternalReceipt>)>,
+    mut importer_block_number: BlockNumber,
+) -> anyhow::Result<()> {
+    const TASK_NAME: &str = "external-block-fetcher";
+
+    loop {
+        if GlobalState::is_shutdown_warn(TASK_NAME) {
+            return Ok(());
+        }
+
+        // if we are ahead of current block number, await until we are behind again
+        let external_rpc_current_block = EXTERNAL_RPC_CURRENT_BLOCK.load(Ordering::Relaxed);
+        if importer_block_number.as_u64() > external_rpc_current_block {
+            yield_now().await;
+            continue;
+        }
+
+        // we are behind current, so we will fetch multiple blocks in parallel to catch up
+        let blocks_behind = external_rpc_current_block.saturating_sub(importer_block_number.as_u64()) + 1; // TODO: use count_to from BlockNumber
+        let mut blocks_to_fetch = min(blocks_behind, 1_000); // avoid spawning millions of tasks (not parallelism), at least until we know it is safe
+        tracing::info!(%blocks_behind, blocks_to_fetch, "catching up with blocks");
+
+        let mut tasks = Vec::with_capacity(blocks_to_fetch as usize);
+        while blocks_to_fetch > 0 {
+            blocks_to_fetch -= 1;
+            tasks.push(fetch_block_and_receipts(Arc::clone(&chain), importer_block_number));
+            importer_block_number = importer_block_number.next();
+        }
+
+        // keep fetching in order
+        let mut tasks = futures::stream::iter(tasks).buffered(PARALLEL_BLOCKS);
+        while let Some((block, receipts)) = tasks.next().await {
+            if backlog_tx.send((block, receipts)).is_err() {
+                warn_task_rx_closed(TASK_NAME);
+                return Ok(());
+            }
+        }
+    }
+}
+
+#[tracing::instrument(name = "importer::fetch_block_and_receipts", skip_all, fields(block_number))]
+async fn fetch_block_and_receipts(chain: Arc<BlockchainClient>, block_number: BlockNumber) -> (ExternalBlock, Vec<ExternalReceipt>) {
+    Span::with(|s| {
+        s.rec_str("block_number", &block_number);
+    });
+
+    // fetch block
+    let block = fetch_block(Arc::clone(&chain), block_number).await;
+
+    // wait some time until receipts are available
+    let _ = traced_sleep(INTERVAL_FETCH_RECEIPTS, SleepReason::SyncData).await;
+
+    // fetch receipts in parallel
+    let mut receipts_tasks = Vec::with_capacity(block.transactions.len());
+    for hash in block.transactions.iter().map(|tx| tx.hash()) {
+        receipts_tasks.push(fetch_receipt(Arc::clone(&chain), block_number, hash));
+    }
+    let receipts = futures::stream::iter(receipts_tasks).buffer_unordered(PARALLEL_RECEIPTS).collect().await;
+
+    (block, receipts)
+}
+
+#[tracing::instrument(name = "importer::fetch_block", skip_all, fields(block_number))]
+async fn fetch_block(chain: Arc<BlockchainClient>, block_number: BlockNumber) -> ExternalBlock {
+    const RETRY_DELAY: Duration = Duration::from_millis(10);
+    Span::with(|s| {
+        s.rec_str("block_number", &block_number);
+    });
+
+    loop {
+        tracing::info!(%block_number, "fetching block");
+        let block = match chain.fetch_block(block_number).await {
+            Ok(json) => json,
+            Err(e) => {
+                tracing::warn!(reason = ?e, %block_number, delay_ms=%RETRY_DELAY.as_millis(), "failed to retrieve block. retrying with delay.");
+                traced_sleep(RETRY_DELAY, SleepReason::RetryBackoff).await;
+                continue;
+            }
+        };
+
+        if block.is_null() {
+            tracing::warn!(%block_number, delay_ms=%RETRY_DELAY.as_millis(), "block not mined yet. retrying with delay.");
+            traced_sleep(RETRY_DELAY, SleepReason::SyncData).await;
+            continue;
+        }
+
+        return ExternalBlock::deserialize(&block).expect("cannot fail to deserialize external block");
+    }
+}
+
+#[tracing::instrument(name = "importer::fetch_receipt", skip_all, fields(block_number, tx_hash))]
+async fn fetch_receipt(chain: Arc<BlockchainClient>, block_number: BlockNumber, tx_hash: Hash) -> ExternalReceipt {
+    Span::with(|s| {
+        s.rec_str("block_number", &block_number);
+        s.rec_str("tx_hash", &tx_hash);
+    });
+
+    loop {
+        tracing::info!(%block_number, %tx_hash, "fetching receipt");
+
+        match chain.fetch_receipt(tx_hash).await {
+            Ok(Some(receipt)) => return receipt,
+            Ok(None) => {
+                tracing::warn!(%block_number, %tx_hash, "receipt not available yet because block is not mined. retrying now.");
+                continue;
+            }
+            Err(e) => {
+                tracing::error!(reason = ?e, %block_number, %tx_hash, "failed to fetch receipt. retrying now.");
+            }
+        }
+    }
+}
diff --git a/src/bin/run_with_importer.rs b/src/bin/run_with_importer.rs
new file mode 100644
index 000000000..03960bc6c
--- /dev/null
+++ b/src/bin/run_with_importer.rs
@@ -0,0 +1,78 @@
+mod importer_online;
+
+use std::sync::Arc;
+
+use importer_online::run_importer_online;
+use stratus::config::RunWithImporterConfig;
+use stratus::eth::rpc::serve_rpc;
+use stratus::eth::Consensus;
+use stratus::infra::BlockchainClient;
+use stratus::GlobalServices;
+use stratus::GlobalState;
+use tokio::join;
+
+fn main() -> anyhow::Result<()> {
+    let global_services = GlobalServices::<RunWithImporterConfig>::init();
+    global_services.runtime.block_on(run(global_services.config))
+}
+
+async fn run(config: RunWithImporterConfig) -> anyhow::Result<()> {
+    const TASK_NAME: &str = "run-with-importer";
+
+    // init services
+    let storage = config.storage.init()?;
+    let miner = config.miner.init_external_mode(Arc::clone(&storage))?;
+    let consensus = Consensus::new(
+        Arc::clone(&storage),
+        Arc::clone(&miner),
+        config.storage.perm_storage.rocks_path_prefix.clone(),
+        config.clone().candidate_peers.clone(),
+        Some(config.clone()),
+        config.rpc_server.address,
+        config.grpc_server_address,
+    ); // in development, with no leader configured, the current node ends up being the leader
+    let (http_url, ws_url) = consensus.get_chain_url().await.expect("chain url not found");
+    let chain = Arc::new(BlockchainClient::new_http_ws(&http_url, ws_url.as_deref(), config.online.external_rpc_timeout).await?);
+
+    let executor = config.executor.init(Arc::clone(&storage), Arc::clone(&miner));
+
+    let rpc_storage = Arc::clone(&storage);
+    let rpc_executor = Arc::clone(&executor);
+    let rpc_miner = Arc::clone(&miner);
+
+    // run rpc and importer-online in parallel
+    let rpc_config = config.clone();
+    let rpc_task = async move {
+        let res = serve_rpc(
+            // services
+            rpc_storage,
+            rpc_executor,
+            rpc_miner,
+            Arc::clone(&consensus),
+            // config
+            rpc_config.clone(),
+            rpc_config.rpc_server,
+            rpc_config.executor.executor_chain_id.into(),
+        )
+        .await;
+        GlobalState::shutdown_from(TASK_NAME, "rpc server finished unexpectedly");
+        res
+    };
+
+    let importer_task = async {
+        let res = run_importer_online(executor, miner, Arc::clone(&storage), chain, config.online.sync_interval).await;
+        GlobalState::shutdown_from(TASK_NAME, "importer online finished unexpectedly");
+        res
+    };
+
+    // await both services to finish
+    let (rpc_result, importer_result) = join!(rpc_task, importer_task);
+    tracing::debug!(?rpc_result, ?importer_result, "rpc and importer tasks finished");
+    rpc_result?;
+    importer_result?;
+
+    // Explicitly block the `main` thread to drop the storage.
+    drop(storage);
+
+    Ok(())
+}
diff --git a/src/config.rs b/src/config.rs
index 86fea61e6..c09eb6b13 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -7,7 +7,6 @@ use std::sync::atomic::Ordering;
 use std::time::Duration;
 
 use anyhow::anyhow;
-use clap::ArgGroup;
 use clap::Parser;
 use display_json::DebugAsJson;
 use strum::VariantNames;
@@ -15,7 +14,6 @@ use tokio::runtime::Builder;
 use tokio::runtime::Runtime;
 
 use crate::eth::executor::ExecutorConfig;
-use crate::eth::importer::ImporterConfig;
 use crate::eth::miner::MinerConfig;
 use crate::eth::primitives::Address;
 use crate::eth::rpc::RpcServerConfig;
@@ -158,14 +156,7 @@ impl CommonConfig {
 
 /// Configuration for main Stratus service.
 #[derive(DebugAsJson, Clone, Parser, derive_more::Deref, serde::Serialize)]
-#[clap(group = ArgGroup::new("mode").required(true).args(&["leader", "follower"]))]
 pub struct StratusConfig {
-    #[arg(long = "leader", env = "LEADER", conflicts_with("follower"))]
-    pub leader: bool,
-
-    #[arg(long = "follower", env = "FOLLOWER", conflicts_with("leader"))]
-    pub follower: bool,
-
     #[clap(flatten)]
     pub rpc_server: RpcServerConfig,
 
@@ -178,9 +169,6 @@ pub struct StratusConfig {
     #[clap(flatten)]
     pub miner: MinerConfig,
 
-    #[clap(flatten)]
-    pub importer: Option<ImporterConfig>,
-
     #[deref]
     #[clap(flatten)]
     pub common: CommonConfig,
@@ -283,6 +271,126 @@ impl WithCommonConfig for ImporterOfflineConfig {
     }
 }
 
+// -----------------------------------------------------------------------------
+// Config: ImporterOnline
+// -----------------------------------------------------------------------------
+
+/// Configuration for `importer-online` binary.
+#[derive(DebugAsJson, Clone, Parser, derive_more::Deref, serde::Serialize)]
+pub struct ImporterOnlineConfig {
+    #[clap(flatten)]
+    pub base: ImporterOnlineBaseConfig,
+
+    #[clap(flatten)]
+    pub executor: ExecutorConfig,
+
+    #[clap(flatten)]
+    pub miner: MinerConfig,
+
+    #[clap(flatten)]
+    pub storage: StratusStorageConfig,
+
+    #[deref]
+    #[clap(flatten)]
+    pub common: CommonConfig,
+}
+
+#[derive(DebugAsJson, Clone, Parser, serde::Serialize)]
+pub struct ImporterOnlineBaseConfig {
+    /// External RPC HTTP endpoint to sync blocks with Stratus.
+    #[arg(short = 'r', long = "external-rpc", env = "EXTERNAL_RPC")]
+    pub external_rpc: String,
+
+    /// External RPC WS endpoint to sync blocks with Stratus.
+    #[arg(short = 'w', long = "external-rpc-ws", env = "EXTERNAL_RPC_WS")]
+    pub external_rpc_ws: Option<String>,
+
+    /// Timeout for blockchain requests (importer online)
+    #[arg(long = "external-rpc-timeout", value_parser=parse_duration, env = "EXTERNAL_RPC_TIMEOUT", default_value = "2s")]
+    pub external_rpc_timeout: Duration,
+
+    #[arg(long = "sync-interval", value_parser=parse_duration, env = "SYNC_INTERVAL", default_value = "100ms")]
+    pub sync_interval: Duration,
+}
+
+impl WithCommonConfig for ImporterOnlineConfig {
+    fn common(&self) -> &CommonConfig {
+        &self.common
+    }
+}
+
+#[derive(DebugAsJson, Clone, Parser, derive_more::Deref, serde::Serialize)]
+pub struct RunWithImporterConfig {
+    #[clap(flatten)]
+    pub rpc_server: RpcServerConfig,
+
+    #[arg(long = "leader-node", env = "LEADER_NODE")]
+    pub leader_node: Option<String>, // to simulate this in use locally with other nodes, you need to add the node name into /etc/hostname
+
+    #[clap(flatten)]
+    pub online: ImporterOnlineBaseConfig,
+
+    #[clap(flatten)]
+    pub storage: StratusStorageConfig,
+
+    #[clap(flatten)]
+    pub executor: ExecutorConfig,
+
+    #[clap(flatten)]
+    pub miner: MinerConfig,
+
+    #[deref]
+    #[clap(flatten)]
+    pub common: CommonConfig,
+}
+
+impl WithCommonConfig for RunWithImporterConfig {
+    fn common(&self) -> &CommonConfig {
+        &self.common
+    }
+}
+
+// -----------------------------------------------------------------------------
+// Config: StateValidator
+// -----------------------------------------------------------------------------
+
+/// Configuration for `state-validator` binary.
+#[derive(DebugAsJson, Clone, Parser, derive_more::Deref, serde::Serialize)]
+pub struct StateValidatorConfig {
+    /// How many slots to validate per batch. 0 means every slot.
+    #[arg(long = "max-samples", env = "MAX_SAMPLES", default_value_t = 0)]
+    pub sample_size: u64,
+
+    /// Seed to use when sampling. 0 for random seed.
+    #[arg(long = "seed", env = "SEED", default_value_t = 0, requires = "sample_size")]
+    pub seed: u64,
+
+    /// Validate in batches of n blocks.
+    #[arg(short = 'i', long = "interval", env = "INTERVAL", default_value = "1000")]
+    pub interval: u64,
+
+    /// What method to use when validating.
+ #[arg(short = 'm', long = "method", env = "METHOD")] + pub method: ValidatorMethodConfig, + + /// How many concurrent validation tasks to run + #[arg(short = 'c', long = "concurrent-tasks", env = "CONCURRENT_TASKS", default_value_t = 10)] + pub concurrent_tasks: u16, + + #[deref] + #[clap(flatten)] + pub common: CommonConfig, + + #[clap(flatten)] + pub storage: StratusStorageConfig, +} + +impl WithCommonConfig for StateValidatorConfig { + fn common(&self) -> &CommonConfig { + &self.common + } +} + // ----------------------------------------------------------------------------- // Config: Test // ----------------------------------------------------------------------------- diff --git a/src/eth/consensus/raft/append_log_entries_storage.rs b/src/eth/consensus/append_log_entries_storage.rs similarity index 99% rename from src/eth/consensus/raft/append_log_entries_storage.rs rename to src/eth/consensus/append_log_entries_storage.rs index b17840f5a..7331b4fb7 100644 --- a/src/eth/consensus/raft/append_log_entries_storage.rs +++ b/src/eth/consensus/append_log_entries_storage.rs @@ -141,7 +141,7 @@ mod tests { use tempfile::TempDir; use super::*; - use crate::eth::consensus::raft::tests::factories::*; + use crate::eth::consensus::tests::factories::*; fn setup_storage() -> AppendLogEntriesStorage { let temp_dir = TempDir::new().unwrap(); diff --git a/src/eth/consensus/raft/discovery.rs b/src/eth/consensus/discovery.rs similarity index 97% rename from src/eth/consensus/raft/discovery.rs rename to src/eth/consensus/discovery.rs index 6ae17c8e0..300c34b71 100644 --- a/src/eth/consensus/raft/discovery.rs +++ b/src/eth/consensus/discovery.rs @@ -9,9 +9,9 @@ use tokio::sync::Mutex; #[cfg(not(test))] use super::append_entry::append_entry_service_client::AppendEntryServiceClient; +use super::Consensus; use super::Peer; use super::PeerAddress; -use super::Raft; #[cfg(not(test))] use super::Role; use crate::ext::spawn_named; @@ -19,7 +19,7 @@ use crate::ext::spawn_named; use crate::infra::metrics; #[tracing::instrument(skip_all)] -pub async fn discover_peers(consensus: Arc) { +pub async fn discover_peers(consensus: Arc) { #[allow(unused_mut)] let mut new_peers: Vec<(PeerAddress, Peer)> = Vec::new(); @@ -79,7 +79,7 @@ pub async fn discover_peers(consensus: Arc) { } #[cfg(not(test))] // FIXME: This is a workaround to avoid running this code in tests we need a proper Tonic mock -async fn discover_peers_env(addresses: &[String], consensus: Arc) -> Result, anyhow::Error> { +async fn discover_peers_env(addresses: &[String], consensus: Arc) -> Result, anyhow::Error> { #[allow(unused_mut)] let mut peers: Vec<(PeerAddress, Peer)> = Vec::new(); diff --git a/src/eth/consensus/raft/log_entry.rs b/src/eth/consensus/log_entry.rs similarity index 100% rename from src/eth/consensus/raft/log_entry.rs rename to src/eth/consensus/log_entry.rs diff --git a/src/eth/consensus/mod.rs b/src/eth/consensus/mod.rs index 61979de0a..be9abfe44 100644 --- a/src/eth/consensus/mod.rs +++ b/src/eth/consensus/mod.rs @@ -1,13 +1,664 @@ -pub mod raft; -pub mod simple_consensus; -use async_trait::async_trait; +//FIXME temporarily there are a lot of allow dead_code and allow unused_imports, we need to deal with them properly later +#[allow(dead_code)] //TODO remove this +mod append_log_entries_storage; +mod discovery; +mod log_entry; +mod propagation; +pub mod utils; + +mod server; + +use std::collections::HashMap; +use std::net::SocketAddr; +use std::net::UdpSocket; +use std::sync::atomic::AtomicU64; +use std::sync::atomic::AtomicU8; +use 
std::sync::atomic::Ordering; +use std::sync::Arc; +use std::time::Duration; + +use anyhow::anyhow; +use rand::Rng; +use tokio::sync::broadcast; +use tokio::sync::Mutex; +use tokio::sync::RwLock; +use tokio::task::JoinHandle; +use tonic::Request; -use crate::eth::primitives::Bytes; use crate::eth::primitives::Hash; +#[allow(unused_imports)] +use crate::eth::primitives::TransactionExecution; +use crate::eth::storage::StratusStorage; +use crate::ext::spawn_named; +use crate::ext::traced_sleep; +use crate::ext::SleepReason; +use crate::infra::BlockchainClient; +use crate::GlobalState; + +pub mod append_entry { + #![allow(clippy::default_trait_access, clippy::wildcard_imports)] + tonic::include_proto!("append_entry"); +} +#[allow(unused_imports)] +use append_entry::append_entry_service_client::AppendEntryServiceClient; +use append_entry::append_entry_service_server::AppendEntryService; +use append_entry::RequestVoteRequest; +use append_entry::TransactionExecutionEntry; + +use self::append_log_entries_storage::AppendLogEntriesStorage; +use self::log_entry::LogEntryData; +use super::primitives::Bytes; +use crate::config::RunWithImporterConfig; +use crate::eth::miner::Miner; +use crate::eth::primitives::Block; +#[cfg(feature = "metrics")] +use crate::infra::metrics; + +const PEER_DISCOVERY_DELAY: Duration = Duration::from_secs(30); + +#[derive(Clone, Copy, Debug, PartialEq)] +enum Role { + Leader = 1, + Follower = 2, + _Candidate = 3, +} + +static ROLE: AtomicU8 = AtomicU8::new(Role::Follower as u8); + +#[derive(Clone, Debug, Default, Hash, Eq, PartialEq)] +struct PeerAddress { + address: String, + jsonrpc_port: u16, + grpc_port: u16, +} + +impl PeerAddress { + fn new(address: String, jsonrpc_port: u16, grpc_port: u16) -> Self { + PeerAddress { + address, + jsonrpc_port, + grpc_port, + } + } + + fn full_grpc_address(&self) -> String { + format!("{}:{}", self.address, self.grpc_port) + } + + fn full_jsonrpc_address(&self) -> String { + format!("http://{}:{}", self.address, self.jsonrpc_port) + } + + fn from_string(s: String) -> Result { + let (scheme, address_part) = if let Some(address) = s.strip_prefix("http://") { + ("http://", address) + } else if let Some(address) = s.strip_prefix("https://") { + ("https://", address) + } else { + return Err(anyhow::anyhow!("invalid scheme")); + }; + + let parts: Vec<&str> = address_part.split(':').collect(); + if parts.len() != 2 { + return Err(anyhow::anyhow!("invalid format")); + } + let address = format!("{}{}", scheme, parts[0]); + let ports: Vec<&str> = parts[1].split(';').collect(); + if ports.len() != 2 { + return Err(anyhow::anyhow!("invalid format for jsonrpc and grpc ports")); + } + let jsonrpc_port = ports[0].parse::()?; + let grpc_port = ports[1].parse::()?; + Ok(PeerAddress { + address, + jsonrpc_port, + grpc_port, + }) + } +} + +use std::fmt; + +impl fmt::Display for PeerAddress { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}:{};{}", self.address, self.jsonrpc_port, self.grpc_port) + } +} + +#[cfg(test)] +use crate::eth::consensus::tests::factories::MockAppendEntryServiceClient; + +#[cfg(not(test))] +type ClientType = AppendEntryServiceClient; +#[cfg(test)] +type ClientType = MockAppendEntryServiceClient; + +#[derive(Clone)] +struct Peer { + client: ClientType, + match_index: u64, + next_index: u64, + role: Role, + receiver: Arc>>, +} + +type PeerTuple = (Peer, JoinHandle<()>); + +pub struct Consensus { + broadcast_sender: broadcast::Sender, //propagates the blocks + importer_config: Option, //HACK this is used 
with sync online only + storage: Arc, + miner: Arc, + log_entries_storage: Arc, + peers: Arc>>, + #[allow(dead_code)] + direct_peers: Vec, + voted_for: Mutex>, //essential to ensure that a server only votes once per term + current_term: AtomicU64, + last_arrived_block_number: AtomicU64, // kept for should_serve method check + prev_log_index: AtomicU64, + transaction_execution_queue: Arc>>, + heartbeat_timeout: Duration, + my_address: PeerAddress, + #[allow(dead_code)] + grpc_address: SocketAddr, + reset_heartbeat_signal: tokio::sync::Notify, + blockchain_client: Mutex>>, +} + +impl Consensus { + #[allow(clippy::too_many_arguments)] //TODO: refactor into consensus config + pub fn new( + storage: Arc, + miner: Arc, + log_storage_path: Option, + direct_peers: Vec, + importer_config: Option, + jsonrpc_address: SocketAddr, + grpc_address: SocketAddr, + ) -> Arc { + let (broadcast_sender, _) = broadcast::channel(32); //TODO rename to internal_peer_broadcast_sender + let last_arrived_block_number = AtomicU64::new(0); + let peers = Arc::new(RwLock::new(HashMap::new())); + let my_address = Self::discover_my_address(jsonrpc_address.port(), grpc_address.port()); + + let log_entries_storage: Arc = Arc::new(AppendLogEntriesStorage::new(log_storage_path).unwrap()); + let current_term: u64 = log_entries_storage.get_last_term().unwrap_or(1); + let prev_log_index: u64 = log_entries_storage.get_last_index().unwrap_or(0); + + let consensus = Self { + broadcast_sender, + miner: Arc::clone(&miner), + storage, + log_entries_storage, + peers, + direct_peers, + current_term: AtomicU64::new(current_term), + voted_for: Mutex::new(None), + prev_log_index: AtomicU64::new(prev_log_index), + last_arrived_block_number, + transaction_execution_queue: Arc::new(Mutex::new(Vec::new())), + importer_config, + heartbeat_timeout: Duration::from_millis(rand::thread_rng().gen_range(300..400)), // Adjust as needed + my_address: my_address.clone(), + grpc_address, + reset_heartbeat_signal: tokio::sync::Notify::new(), + blockchain_client: Mutex::new(None), + }; + let consensus = Arc::new(consensus); + + Self::initialize_periodic_peer_discovery(Arc::clone(&consensus)); + #[cfg(feature = "raft")] + { + //TODO replace this for a synchronous call + let rx_pending_txs: broadcast::Receiver = miner.notifier_pending_txs.subscribe(); + let rx_blocks: broadcast::Receiver = miner.notifier_blocks.subscribe(); + propagation::initialize_transaction_execution_queue(Arc::clone(&consensus)); + propagation::initialize_append_entries_channel(Arc::clone(&consensus), rx_pending_txs, rx_blocks); + server::initialize_server(Arc::clone(&consensus)); + } + Self::initialize_heartbeat_timer(Arc::clone(&consensus)); + + tracing::info!(my_address = %my_address, "consensus module initialized"); + consensus + } + + fn discover_my_address(jsonrpc_port: u16, grpc_port: u16) -> PeerAddress { + let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); + socket.connect("8.8.8.8:80").ok().unwrap(); + let my_ip = socket.local_addr().ok().map(|addr| addr.ip().to_string()).unwrap(); + + PeerAddress::new(format!("http://{}", my_ip), jsonrpc_port, grpc_port) + } + + /// Initializes the heartbeat and election timers. + /// This function periodically checks if the node should start a new election based on the election timeout. + /// The timer is reset when an `AppendEntries` request is received, ensuring the node remains a follower if a leader is active. 
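+ ///
+ /// (Editor's note, not part of the original patch: each node's `heartbeat_timeout` is
+ /// randomized in `Consensus::new` above via
+ /// `Duration::from_millis(rand::thread_rng().gen_range(300..400))`, so followers time out
+ /// at slightly different moments and split votes are unlikely.)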
+ ///
+ /// When there are healthy peers, we wait for the discovery grace period before arming the
+ /// timer, to avoid starting an election too soon (the leader may simply not have been discovered yet).
+ fn initialize_heartbeat_timer(consensus: Arc<Consensus>) {
+ const TASK_NAME: &str = "consensus::heartbeat_timer";
+ spawn_named(TASK_NAME, async move {
+ discovery::discover_peers(Arc::clone(&consensus)).await;
+ if consensus.peers.read().await.is_empty() {
+ tracing::info!("no peers found, starting election immediately");
+ Self::start_election(Arc::clone(&consensus)).await;
+ } else {
+ tracing::info!("waiting for peer discovery grace period");
+ traced_sleep(PEER_DISCOVERY_DELAY, SleepReason::Interval).await;
+ }
+
+ let timeout = consensus.heartbeat_timeout;
+ loop {
+ tokio::select! {
+ _ = GlobalState::wait_shutdown_warn(TASK_NAME) => {
+ return;
+ },
+ _ = traced_sleep(timeout, SleepReason::Interval) => {
+ if !Self::is_leader() {
+ tracing::info!("starting election due to heartbeat timeout");
+ Self::start_election(Arc::clone(&consensus)).await;
+ } else {
+ let current_term = consensus.current_term.load(Ordering::SeqCst);
+ tracing::info!(current_term = current_term, "heartbeat timeout reached, but I am the leader, so we ignore the election");
+ }
+ },
+ _ = consensus.reset_heartbeat_signal.notified() => {
+ // Timer reset upon receiving AppendEntries
+ match consensus.leader_address().await {
+ Ok(leader_address) => tracing::info!(leader_address = %leader_address, "resetting election timer due to AppendEntries"),
+ Err(e) => tracing::warn!(error = %e, "resetting election timer due to AppendEntries, but leader not found"), // this should not happen, but if it does it's because the leader changed in the middle of an append entry
+ }
+ },
+ }
+ }
+ });
+ }
+
+ /// Starts the election process for the consensus module.
+ ///
+ /// This method is called when a node suspects that there is no active leader in the cluster.
+ /// The node increments its term and votes for itself, then sends RequestVote RPCs to all other nodes in the cluster.
+ /// If the node receives a majority of votes, it becomes the leader. Otherwise, it remains a follower and waits for the next election timeout.
+ ///
+ /// # Details
+ ///
+ /// - The method first increments the current term and votes for itself.
+ /// - It then sends out `RequestVote` RPCs to all known peers.
+ /// - If a majority of the peers grant their votes, the node transitions to the leader role.
+ /// - If not, it remains a follower and waits for the next election cycle.
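+ ///
+ /// (Editor's note, not part of the original patch: a worked example of the quorum rule
+ /// used below, `majority = total_nodes / 2 + 1`, where the candidate's own vote counts.)
+ ///
+ /// ```ignore
+ /// assert_eq!(3 / 2 + 1, 2); // 3-node cluster: 2 votes needed
+ /// assert_eq!(4 / 2 + 1, 3); // 4-node cluster: 3 votes needed (no even-split wins)
+ /// assert_eq!(5 / 2 + 1, 3); // 5-node cluster: 3 votes needed
+ /// ```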
+ async fn start_election(consensus: Arc) { + #[cfg(feature = "metrics")] + let start = metrics::now(); + + discovery::discover_peers(Arc::clone(&consensus)).await; + + let term = consensus.current_term.fetch_add(1, Ordering::SeqCst) + 1; + consensus.current_term.store(term, Ordering::SeqCst); + + *consensus.voted_for.lock().await = Some(consensus.my_address.clone()); + + let mut votes = 1; // Vote for self + + let peer_addresses = { + let peers = consensus.peers.read().await; + peers.keys().cloned().collect::>() + }; + + tracing::info!( + requested_term = term, + candidate_id = %consensus.my_address, + "requesting vote on election for {} peers", + peer_addresses.len() + ); + + for peer_address in peer_addresses { + let peer_clone = { + let peers = consensus.peers.read().await; + peers.get(&peer_address).map(|(p, _)| p.clone()) + }; + + if let Some(mut peer) = peer_clone { + let request = Request::new(RequestVoteRequest { + term, + candidate_id: consensus.my_address.to_string(), + last_log_index: consensus.prev_log_index.load(Ordering::SeqCst), + last_log_term: term, + }); + + match peer.client.request_vote(request).await { + Ok(response) => { + let response_inner = response.into_inner(); + if response_inner.vote_granted { + let current_term = consensus.current_term.load(Ordering::SeqCst); + if response_inner.term == current_term { + tracing::info!(peer_address = %peer_address, "received vote on election"); + votes += 1; + } else { + // this usually happens when we have either a split brain or a network issue, maybe both + tracing::error!( + peer_address = %peer_address, + expected_term = response_inner.term, + "received vote on election with different term" + ); + } + } else { + tracing::info!(peer_address = %peer_address, "did not receive vote on election"); + } + } + Err(_) => { + tracing::warn!("failed to request vote on election from {:?}", peer_address); + } + } + } + } + + let total_nodes = { + let peers = consensus.peers.read().await; + peers.len() + 1 // Including self + }; + let majority = total_nodes / 2 + 1; + + if votes >= majority { + tracing::info!(votes = votes, peers = total_nodes - 1, term = term, "became the leader on election"); + consensus.become_leader().await; + } else { + tracing::info!(votes = votes, peers = total_nodes - 1, term = term, "failed to become the leader on election"); + Self::set_role(Role::Follower); + } + + #[cfg(feature = "metrics")] + metrics::inc_consensus_start_election(start.elapsed()); + } + + /// When importer config is set, it will refresh the blockchain client (substrate or any other blockchain client) + /// however, if stratus is on miner mode, it will clear the blockchain client for safety reasons, so it has no chance to forward anything to a follower node + async fn become_leader(&self) { + // checks if it's running on importer-online mode + if self.importer_config.is_some() { + self.refresh_blockchain_client().await; + } else { + let mut blockchain_client_lock = self.blockchain_client.lock().await; + *blockchain_client_lock = None; // clear the blockchain client for safety reasons when not running on importer-online mode + } + + let last_index: u64 = self.log_entries_storage.get_last_index().unwrap_or(0); + + let next_index = last_index + 1; + // When a node becomes a leader, it should reset the match_index for all peers. + // Also, the next_index should be set to the last index + 1. 
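+ // (Editor's note, not part of the original patch: this mirrors Raft's leader-state
+ // reinitialization. For example, if the local log ends at index 41, every peer starts
+ // with next_index = 42 and match_index = 0; both converge as AppendEntries calls
+ // succeed or are rejected by the follower.)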
+ {
+ let mut peers = self.peers.write().await;
+ for (peer, _) in peers.values_mut() {
+ peer.match_index = 0;
+ peer.next_index = next_index;
+ }
+ }
+
+ Self::set_role(Role::Leader);
+ }
+
+ async fn refresh_blockchain_client(&self) {
+ let (http_url, _) = self.get_chain_url().await.expect("failed to get chain url");
+ let mut blockchain_client_lock = self.blockchain_client.lock().await;
+
+ tracing::info!(http_url = http_url, "changing blockchain client");
+
+ *blockchain_client_lock = Some(
+ BlockchainClient::new_http(&http_url, Duration::from_secs(2))
+ .await
+ .expect("failed to create blockchain client")
+ .into(),
+ );
+ }
+
+ fn initialize_periodic_peer_discovery(consensus: Arc<Consensus>) {
+ const TASK_NAME: &str = "consensus::peer_discovery";
+ spawn_named(TASK_NAME, async move {
+ let mut interval = tokio::time::interval(PEER_DISCOVERY_DELAY);
+
+ let periodic_discover = || async move {
+ loop {
+ discovery::discover_peers(Arc::clone(&consensus)).await;
+ interval.tick().await;
+ }
+ };
+
+ tokio::select! {
+ _ = GlobalState::wait_shutdown_warn(TASK_NAME) => {},
+ _ = periodic_discover() => {
+ unreachable!("this infinite future doesn't end");
+ },
+ };
+ });
+ }
+
+ fn set_role(role: Role) {
+ if ROLE.load(Ordering::SeqCst) == role as u8 {
+ tracing::info!(role = ?role, "role remains the same");
+ return;
+ }
+
+ tracing::info!(role = ?role, "setting role");
+ ROLE.store(role as u8, Ordering::SeqCst);
+
+ #[cfg(feature = "metrics")]
+ {
+ if role == Role::Leader {
+ metrics::set_consensus_is_leader(1_u64);
+ metrics::inc_consensus_leadership_change();
+ } else {
+ metrics::set_consensus_is_leader(0_u64);
+ }
+ }
+ }
+
+ //FIXME TODO automate the way we gather the leader, instead of using an env var
+ pub fn is_leader() -> bool {
+ ROLE.load(Ordering::SeqCst) == Role::Leader as u8
+ }
+
+ pub fn is_follower() -> bool {
+ ROLE.load(Ordering::SeqCst) == Role::Follower as u8
+ }
+
+ pub fn current_term(&self) -> u64 {
+ self.current_term.load(Ordering::SeqCst)
+ }
+
+ pub fn last_index(&self) -> u64 {
+ self.prev_log_index.load(Ordering::SeqCst)
+ }
+
+ pub fn should_forward(&self) -> bool {
+ let is_leader = Self::is_leader();
+ tracing::info!(
+ is_leader = is_leader,
+ sync_online_enabled = self.importer_config.is_some(),
+ "handling request forward"
+ );
+ if is_leader && self.importer_config.is_none() {
+ return false; // the leader is on miner mode and should deal with the requests
+ }
+ true
+ }
+
+ pub async fn forward(&self, transaction: Bytes) -> anyhow::Result<(Hash, String)> {
+ #[cfg(feature = "metrics")]
+ let start = metrics::now();
+
+ let blockchain_client_lock = self.blockchain_client.lock().await;
+
+ let Some(ref blockchain_client) = *blockchain_client_lock else {
+ return Err(anyhow::anyhow!("blockchain client is not set, cannot forward transaction"));
+ };
+
+ let result = blockchain_client.send_raw_transaction(transaction.into()).await?;
+
+ #[cfg(feature = "metrics")]
+ metrics::inc_consensus_forward(start.elapsed());
+
+ Ok((result.tx_hash, blockchain_client.http_url.clone()))
+ }
+
+ pub async fn should_serve(&self) -> bool {
+ if self.importer_config.is_some() {
+ // gather the latest block number and check how far it is ahead of the current storage block;
+ // if the distance is greater than 3 blocks, this node should not serve requests
+ let blockchain_client_lock = self.blockchain_client.lock().await;
+
+ let Some(ref blockchain_client) = *blockchain_client_lock else {
+ tracing::error!("blockchain client is not set at importer, cannot serve requests because they can't be 
forwarded"); + return false; + }; + + let Ok(validator_block_number) = blockchain_client.fetch_block_number().await else { + tracing::error!("unable to fetch latest block number"); + return false; + }; + + let Ok(current_block_number) = self.storage.read_mined_block_number() else { + tracing::error!("unable to fetch current block number"); + return false; + }; + + return (validator_block_number.as_u64() - 3) <= current_block_number.as_u64(); + } + + // consensus + if Self::is_leader() { + return true; + } + + if self.blockchain_client.lock().await.is_none() { + tracing::warn!("blockchain client is not set, cannot serve requests because they cant be forwarded"); + return false; + } + + let last_arrived_block_number = self.last_arrived_block_number.load(Ordering::SeqCst); + + if last_arrived_block_number == 0 { + tracing::warn!("no appendEntry has been received yet"); + false + } else { + { + let storage_block_number: u64 = self.storage.read_mined_block_number().unwrap_or_default().into(); + + tracing::info!( + "last arrived block number: {}, storage block number: {}", + last_arrived_block_number, + storage_block_number + ); + + if (last_arrived_block_number - 3) <= storage_block_number { + tracing::info!("should serve request"); + true + } else { + let diff = (last_arrived_block_number as i128) - (storage_block_number as i128); + tracing::warn!(diff = diff, "should not serve request"); + false + } + } + } + } + + async fn leader_address(&self) -> anyhow::Result { + let peers = self.peers.read().await; + for (address, (peer, _)) in peers.iter() { + if peer.role == Role::Leader { + return Ok(address.clone()); + } + } + Err(anyhow!("Leader not found")) + } + + pub async fn get_chain_url(&self) -> anyhow::Result<(String, Option)> { + if let Some(importer_config) = self.importer_config.clone() { + return Ok((importer_config.online.external_rpc, importer_config.online.external_rpc_ws)); + } + + if Self::is_follower() { + if let Ok(leader_address) = self.leader_address().await { + return Ok((leader_address.full_jsonrpc_address(), None)); + } + } + + Err(anyhow!("tried to get chain url as a leader and while running on miner mode")) + } + + async fn update_leader(&self, leader_address: PeerAddress) { + if leader_address == self.leader_address().await.unwrap_or_default() { + tracing::info!("leader is the same as before"); + return; + } + + let mut peers = self.peers.write().await; + for (address, (peer, _)) in peers.iter_mut() { + if *address == leader_address { + peer.role = Role::Leader; + + self.refresh_blockchain_client().await; + } else { + peer.role = Role::Follower; + } + } + + tracing::info!(leader = %leader_address, "updated leader information"); + } +} + +#[cfg(test)] +mod tests { + use super::*; + pub mod factories; + mod test_simple_blocks; + + #[test] + fn test_peer_address_from_string_valid_http() { + let input = "http://127.0.0.1:3000;3777".to_string(); + let result = PeerAddress::from_string(input); + + assert!(result.is_ok()); + let peer_address = result.unwrap(); + assert_eq!(peer_address.address, "http://127.0.0.1"); + assert_eq!(peer_address.jsonrpc_port, 3000); + assert_eq!(peer_address.grpc_port, 3777); + } + + #[test] + fn test_peer_address_from_string_valid_https() { + let input = "https://127.0.0.1:3000;3777".to_string(); + let result = PeerAddress::from_string(input); + + assert!(result.is_ok()); + let peer_address = result.unwrap(); + assert_eq!(peer_address.address, "https://127.0.0.1"); + assert_eq!(peer_address.jsonrpc_port, 3000); + assert_eq!(peer_address.grpc_port, 
3777); + } + + #[test] + fn test_peer_address_from_string_invalid_format() { + let input = "http://127.0.0.1-3000;3777".to_string(); + let result = PeerAddress::from_string(input); + + assert!(result.is_err()); + assert_eq!(result.err().unwrap().to_string(), "invalid format"); + } + + #[test] + fn test_peer_address_from_string_missing_scheme() { + let input = "127.0.0.1:3000;3777".to_string(); + let result = PeerAddress::from_string(input); + + assert!(result.is_err()); + assert_eq!(result.err().unwrap().to_string(), "invalid scheme"); + } -#[async_trait] -pub trait Consensus: Send + Sync { - async fn should_serve(&self) -> bool; - fn should_forward(&self) -> bool; - async fn forward(&self, transaction: Bytes) -> anyhow::Result; + #[test] + fn test_peer_address_full_grpc_address() { + let peer_address = PeerAddress::new("127.0.0.1".to_string(), 3000, 3777); + assert_eq!(peer_address.full_grpc_address(), "127.0.0.1:3777"); + } } diff --git a/src/eth/consensus/raft/propagation.rs b/src/eth/consensus/propagation.rs similarity index 92% rename from src/eth/consensus/raft/propagation.rs rename to src/eth/consensus/propagation.rs index dc929a4df..6114bbc01 100644 --- a/src/eth/consensus/raft/propagation.rs +++ b/src/eth/consensus/propagation.rs @@ -8,14 +8,14 @@ use tokio::sync::broadcast; use tonic::Request; use super::Block; +use super::Consensus; use super::LogEntryData; use super::Peer; -use super::Raft; -use crate::eth::consensus::raft::append_entry::AppendBlockCommitRequest; -use crate::eth::consensus::raft::append_entry::AppendBlockCommitResponse; -use crate::eth::consensus::raft::append_entry::AppendTransactionExecutionsRequest; -use crate::eth::consensus::raft::append_entry::AppendTransactionExecutionsResponse; -use crate::eth::consensus::raft::append_entry::StatusCode; +use crate::eth::consensus::append_entry::AppendBlockCommitRequest; +use crate::eth::consensus::append_entry::AppendBlockCommitResponse; +use crate::eth::consensus::append_entry::AppendTransactionExecutionsRequest; +use crate::eth::consensus::append_entry::AppendTransactionExecutionsResponse; +use crate::eth::consensus::append_entry::StatusCode; use crate::eth::primitives::TransactionExecution; use crate::ext::spawn_named; use crate::ext::traced_sleep; @@ -37,7 +37,7 @@ enum AppendResponse { } #[allow(dead_code)] -pub async fn save_and_handle_log_entry(consensus: Arc, log_entry_data: LogEntryData) -> Result<()> { +pub async fn save_and_handle_log_entry(consensus: Arc, log_entry_data: LogEntryData) -> Result<()> { let last_index = consensus.log_entries_storage.get_last_index().unwrap_or(0); tracing::debug!(last_index, "Last index fetched"); @@ -69,8 +69,8 @@ pub async fn save_and_handle_log_entry(consensus: Arc, log_entry_data: Log } #[allow(dead_code)] -pub async fn handle_block_entry(consensus: Arc, block: Block) { - if Raft::is_leader() { +pub async fn handle_block_entry(consensus: Arc, block: Block) { + if Consensus::is_leader() { tracing::info!(number = block.header.number.as_u64(), "Leader received block to send to followers"); let transaction_hashes: Vec> = block.transactions.iter().map(|tx| tx.input.hash.as_fixed_bytes().to_vec()).collect(); @@ -83,8 +83,8 @@ pub async fn handle_block_entry(consensus: Arc, block: Block) { } #[allow(dead_code)] -pub async fn handle_transaction_executions(consensus: Arc) { - if Raft::is_leader() { +pub async fn handle_transaction_executions(consensus: Arc) { + if Consensus::is_leader() { let mut queue = consensus.transaction_execution_queue.lock().await; let executions = 
queue.drain(..).collect::>(); drop(queue); @@ -97,7 +97,7 @@ pub async fn handle_transaction_executions(consensus: Arc) { } } -pub async fn handle_peer_propagation(mut peer: Peer, consensus: Arc) { +pub async fn handle_peer_propagation(mut peer: Peer, consensus: Arc) { const TASK_NAME: &str = "consensus::propagate"; let mut log_entry_queue: Vec = Vec::new(); @@ -153,8 +153,8 @@ pub async fn handle_peer_propagation(mut peer: Peer, consensus: Arc) { } } -async fn append_entry_to_peer(consensus: Arc, peer: &mut Peer, entry_data: &LogEntryData) -> Result<(), anyhow::Error> { - if !Raft::is_leader() { +async fn append_entry_to_peer(consensus: Arc, peer: &mut Peer, entry_data: &LogEntryData) -> Result<(), anyhow::Error> { + if !Consensus::is_leader() { tracing::error!("append_entry_to_peer called on non-leader node"); return Err(anyhow!("append_entry_to_peer called on non-leader node")); } @@ -253,7 +253,7 @@ async fn append_entry_to_peer(consensus: Arc, peer: &mut Peer, entry_data: } async fn send_append_entry_request( - consensus: Arc, + consensus: Arc, peer: &mut Peer, current_term: u64, prev_log_index: u64, @@ -304,7 +304,7 @@ async fn send_append_entry_request( } #[allow(dead_code)] -pub fn initialize_transaction_execution_queue(consensus: Arc) { +pub fn initialize_transaction_execution_queue(consensus: Arc) { // XXX FIXME: deal with the scenario where a transactionHash arrives after the block; // in this case, before saving the block LogEntry, it should ALWAYS wait for all transaction hashes @@ -329,7 +329,7 @@ pub fn initialize_transaction_execution_queue(consensus: Arc) { //TODO this broadcast needs to wait for majority of followers to confirm the log before sending the next one #[allow(dead_code)] pub fn initialize_append_entries_channel( - consensus: Arc, + consensus: Arc, mut rx_pending_txs: broadcast::Receiver, mut rx_blocks: broadcast::Receiver, ) { @@ -341,7 +341,7 @@ pub fn initialize_append_entries_channel( return; }, Ok(tx) = rx_pending_txs.recv() => { - if Raft::is_leader() { + if Consensus::is_leader() { tracing::info!(tx_hash = %tx.hash(), "received transaction execution to send to followers"); if tx.is_local() { tracing::debug!(tx_hash = %tx.hash(), "skipping local transaction because only external transactions are supported for now"); diff --git a/src/eth/consensus/raft/mod.rs b/src/eth/consensus/raft/mod.rs deleted file mode 100644 index c876b4b65..000000000 --- a/src/eth/consensus/raft/mod.rs +++ /dev/null @@ -1,683 +0,0 @@ -//FIXME temporarily there are a lot of allow dead_code and allow unused_imports, we need to deal with them properly later -#[allow(dead_code)] //TODO remove this -mod append_log_entries_storage; -mod discovery; -mod log_entry; -mod propagation; -pub mod utils; - -mod server; - -use std::collections::HashMap; -use std::net::SocketAddr; -use std::net::UdpSocket; -use std::sync::atomic::AtomicU64; -use std::sync::atomic::AtomicU8; -use std::sync::atomic::Ordering; -use std::sync::Arc; -use std::time::Duration; - -use anyhow::anyhow; -use async_trait::async_trait; -use rand::Rng; -use tokio::sync::broadcast; -use tokio::sync::Mutex; -use tokio::sync::RwLock; -use tokio::task::JoinHandle; -use tonic::Request; - -use crate::eth::importer::ImporterConfig; -use crate::eth::primitives::Hash; -#[allow(unused_imports)] -use crate::eth::primitives::TransactionExecution; -use crate::eth::storage::StratusStorage; -use crate::ext::spawn_named; -use crate::ext::traced_sleep; -use crate::ext::SleepReason; -use crate::infra::BlockchainClient; -use crate::GlobalState; - 
-pub mod append_entry { - #![allow(clippy::default_trait_access, clippy::wildcard_imports)] - tonic::include_proto!("append_entry"); -} -#[allow(unused_imports)] -use append_entry::append_entry_service_client::AppendEntryServiceClient; -use append_entry::append_entry_service_server::AppendEntryService; -use append_entry::RequestVoteRequest; -use append_entry::TransactionExecutionEntry; - -use self::append_log_entries_storage::AppendLogEntriesStorage; -use self::log_entry::LogEntryData; -use super::Consensus; -use crate::eth::miner::Miner; -use crate::eth::primitives::Block; -use crate::eth::primitives::Bytes; -#[cfg(feature = "metrics")] -use crate::infra::metrics; - -const PEER_DISCOVERY_DELAY: Duration = Duration::from_secs(30); - -#[derive(Clone, Copy, Debug, PartialEq)] -enum Role { - Leader = 1, - Follower = 2, - _Candidate = 3, -} - -static ROLE: AtomicU8 = AtomicU8::new(Role::Follower as u8); - -#[derive(Clone, Debug, Default, Hash, Eq, PartialEq)] -struct PeerAddress { - address: String, - jsonrpc_port: u16, - grpc_port: u16, -} - -impl PeerAddress { - fn new(address: String, jsonrpc_port: u16, grpc_port: u16) -> Self { - PeerAddress { - address, - jsonrpc_port, - grpc_port, - } - } - - fn full_grpc_address(&self) -> String { - format!("{}:{}", self.address, self.grpc_port) - } - - fn full_jsonrpc_address(&self) -> String { - format!("http://{}:{}", self.address, self.jsonrpc_port) - } - - fn from_string(s: String) -> Result { - let (scheme, address_part) = if let Some(address) = s.strip_prefix("http://") { - ("http://", address) - } else if let Some(address) = s.strip_prefix("https://") { - ("https://", address) - } else { - return Err(anyhow::anyhow!("invalid scheme")); - }; - - let parts: Vec<&str> = address_part.split(':').collect(); - if parts.len() != 2 { - return Err(anyhow::anyhow!("invalid format")); - } - let address = format!("{}{}", scheme, parts[0]); - let ports: Vec<&str> = parts[1].split(';').collect(); - if ports.len() != 2 { - return Err(anyhow::anyhow!("invalid format for jsonrpc and grpc ports")); - } - let jsonrpc_port = ports[0].parse::()?; - let grpc_port = ports[1].parse::()?; - Ok(PeerAddress { - address, - jsonrpc_port, - grpc_port, - }) - } -} - -use std::fmt; - -impl fmt::Display for PeerAddress { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{}:{};{}", self.address, self.jsonrpc_port, self.grpc_port) - } -} - -#[cfg(test)] -use crate::eth::consensus::raft::tests::factories::MockAppendEntryServiceClient; - -#[cfg(not(test))] -type ClientType = AppendEntryServiceClient; -#[cfg(test)] -type ClientType = MockAppendEntryServiceClient; - -#[derive(Clone)] -struct Peer { - client: ClientType, - match_index: u64, - next_index: u64, - role: Role, - receiver: Arc>>, -} - -type PeerTuple = (Peer, JoinHandle<()>); - -pub struct Raft { - broadcast_sender: broadcast::Sender, //propagates the blocks - importer_config: Option, //HACK this is used with sync online only - storage: Arc, - miner: Arc, - log_entries_storage: Arc, - peers: Arc>>, - #[allow(dead_code)] - direct_peers: Vec, - voted_for: Mutex>, //essential to ensure that a server only votes once per term - current_term: AtomicU64, - last_arrived_block_number: AtomicU64, // kept for should_serve method check - prev_log_index: AtomicU64, - transaction_execution_queue: Arc>>, - heartbeat_timeout: Duration, - my_address: PeerAddress, - #[allow(dead_code)] - grpc_address: SocketAddr, - reset_heartbeat_signal: tokio::sync::Notify, - blockchain_client: Mutex>>, -} - -impl Raft { - 
#[allow(clippy::too_many_arguments)] //TODO: refactor into consensus config - pub fn new( - storage: Arc, - miner: Arc, - log_storage_path: Option, - direct_peers: Vec, - importer_config: Option, - jsonrpc_address: SocketAddr, - grpc_address: SocketAddr, - ) -> Arc { - let (broadcast_sender, _) = broadcast::channel(32); //TODO rename to internal_peer_broadcast_sender - let last_arrived_block_number = AtomicU64::new(0); - let peers = Arc::new(RwLock::new(HashMap::new())); - let my_address = Self::discover_my_address(jsonrpc_address.port(), grpc_address.port()); - - let log_entries_storage: Arc = Arc::new(AppendLogEntriesStorage::new(log_storage_path).unwrap()); - let current_term: u64 = log_entries_storage.get_last_term().unwrap_or(1); - let prev_log_index: u64 = log_entries_storage.get_last_index().unwrap_or(0); - - let consensus = Self { - broadcast_sender, - miner: Arc::clone(&miner), - storage, - log_entries_storage, - peers, - direct_peers, - current_term: AtomicU64::new(current_term), - voted_for: Mutex::new(None), - prev_log_index: AtomicU64::new(prev_log_index), - last_arrived_block_number, - transaction_execution_queue: Arc::new(Mutex::new(Vec::new())), - importer_config, - heartbeat_timeout: Duration::from_millis(rand::thread_rng().gen_range(300..400)), // Adjust as needed - my_address: my_address.clone(), - grpc_address, - reset_heartbeat_signal: tokio::sync::Notify::new(), - blockchain_client: Mutex::new(None), - }; - let consensus = Arc::new(consensus); - - Self::initialize_periodic_peer_discovery(Arc::clone(&consensus)); - #[cfg(feature = "raft")] - { - //TODO replace this for a synchronous call - let rx_pending_txs: broadcast::Receiver = miner.notifier_pending_txs.subscribe(); - let rx_blocks: broadcast::Receiver = miner.notifier_blocks.subscribe(); - propagation::initialize_transaction_execution_queue(Arc::clone(&consensus)); - propagation::initialize_append_entries_channel(Arc::clone(&consensus), rx_pending_txs, rx_blocks); - server::initialize_server(Arc::clone(&consensus)); - } - Self::initialize_heartbeat_timer(Arc::clone(&consensus)); - - tracing::info!(my_address = %my_address, "consensus module initialized"); - consensus - } - - fn discover_my_address(jsonrpc_port: u16, grpc_port: u16) -> PeerAddress { - let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - socket.connect("8.8.8.8:80").ok().unwrap(); - let my_ip = socket.local_addr().ok().map(|addr| addr.ip().to_string()).unwrap(); - - PeerAddress::new(format!("http://{}", my_ip), jsonrpc_port, grpc_port) - } - - /// Initializes the heartbeat and election timers. - /// This function periodically checks if the node should start a new election based on the election timeout. - /// The timer is reset when an `AppendEntries` request is received, ensuring the node remains a follower if a leader is active. 
- /// - /// When there are healthy peers we need to wait for the grace period of discovery - /// to avoid starting an election too soon (due to the leader not being discovered yet) - fn initialize_heartbeat_timer(consensus: Arc) { - const TASK_NAME: &str = "consensus::heartbeat_timer"; - spawn_named(TASK_NAME, async move { - discovery::discover_peers(Arc::clone(&consensus)).await; - if consensus.peers.read().await.is_empty() { - tracing::info!("no peers, starting hearbeat timer immediately"); - Self::start_election(Arc::clone(&consensus)).await; - } else { - traced_sleep(PEER_DISCOVERY_DELAY, SleepReason::Interval).await; - tracing::info!("waiting for peer discovery grace period"); - } - - let timeout = consensus.heartbeat_timeout; - loop { - tokio::select! { - _ = GlobalState::wait_shutdown_warn(TASK_NAME) => { - return; - }, - _ = traced_sleep(timeout, SleepReason::Interval) => { - if !Self::is_leader() { - tracing::info!("starting election due to heartbeat timeout"); - Self::start_election(Arc::clone(&consensus)).await; - } else { - let current_term = consensus.current_term.load(Ordering::SeqCst); - tracing::info!(current_term = current_term, "heartbeat timeout reached, but I am the leader, so we ignore the election"); - } - }, - _ = consensus.reset_heartbeat_signal.notified() => { - // Timer reset upon receiving AppendEntries - match consensus.leader_address().await { - Ok(leader_address) => tracing::info!(leader_address = %leader_address, "resetting election timer due to AppendEntries"), - Err(e) => tracing::warn!(error = %e, "resetting election timer due to AppendEntries, but leader not found"), // this should not happen, but if it does it's because the leader changed in the middle of an append entry - } - }, - } - } - }); - } - - /// Starts the election process for the consensus module. - /// - /// This method is called when a node suspects that there is no active leader in the cluster. - /// The node increments its term and votes for itself, then sends RequestVote RPCs to all other nodes in the cluster. - /// If the node receives a majority of votes, it becomes the leader. Otherwise, it remains a follower and waits for the next election timeout. - /// - /// # Details - /// - /// - The method first increments the current term and votes for itself. - /// - It then sends out `RequestVote` RPCs to all known peers. - /// - If a majority of the peers grant their votes, the node transitions to the leader role. - /// - If not, it remains a follower and waits for the next election cycle. 
- async fn start_election(consensus: Arc) { - #[cfg(feature = "metrics")] - let start = metrics::now(); - - discovery::discover_peers(Arc::clone(&consensus)).await; - - let term = consensus.current_term.fetch_add(1, Ordering::SeqCst) + 1; - consensus.current_term.store(term, Ordering::SeqCst); - - *consensus.voted_for.lock().await = Some(consensus.my_address.clone()); - - let mut votes = 1; // Vote for self - - let peer_addresses = { - let peers = consensus.peers.read().await; - peers.keys().cloned().collect::>() - }; - - tracing::info!( - requested_term = term, - candidate_id = %consensus.my_address, - "requesting vote on election for {} peers", - peer_addresses.len() - ); - - for peer_address in peer_addresses { - let peer_clone = { - let peers = consensus.peers.read().await; - peers.get(&peer_address).map(|(p, _)| p.clone()) - }; - - if let Some(mut peer) = peer_clone { - let request = Request::new(RequestVoteRequest { - term, - candidate_id: consensus.my_address.to_string(), - last_log_index: consensus.prev_log_index.load(Ordering::SeqCst), - last_log_term: term, - }); - - match peer.client.request_vote(request).await { - Ok(response) => { - let response_inner = response.into_inner(); - if response_inner.vote_granted { - let current_term = consensus.current_term.load(Ordering::SeqCst); - if response_inner.term == current_term { - tracing::info!(peer_address = %peer_address, "received vote on election"); - votes += 1; - } else { - // this usually happens when we have either a split brain or a network issue, maybe both - tracing::error!( - peer_address = %peer_address, - expected_term = response_inner.term, - "received vote on election with different term" - ); - } - } else { - tracing::info!(peer_address = %peer_address, "did not receive vote on election"); - } - } - Err(_) => { - tracing::warn!("failed to request vote on election from {:?}", peer_address); - } - } - } - } - - let total_nodes = { - let peers = consensus.peers.read().await; - peers.len() + 1 // Including self - }; - let majority = total_nodes / 2 + 1; - - if votes >= majority { - tracing::info!(votes = votes, peers = total_nodes - 1, term = term, "became the leader on election"); - consensus.become_leader().await; - } else { - tracing::info!(votes = votes, peers = total_nodes - 1, term = term, "failed to become the leader on election"); - Self::set_role(Role::Follower); - } - - #[cfg(feature = "metrics")] - metrics::inc_consensus_start_election(start.elapsed()); - } - - /// When importer config is set, it will refresh the blockchain client (substrate or any other blockchain client) - /// however, if stratus is on miner mode, it will clear the blockchain client for safety reasons, so it has no chance to forward anything to a follower node - async fn become_leader(&self) { - // checks if it's running on importer-online mode - if self.importer_config.is_some() { - self.refresh_blockchain_client().await; - } else { - let mut blockchain_client_lock = self.blockchain_client.lock().await; - *blockchain_client_lock = None; // clear the blockchain client for safety reasons when not running on importer-online mode - } - - let last_index: u64 = self.log_entries_storage.get_last_index().unwrap_or(0); - - let next_index = last_index + 1; - // When a node becomes a leader, it should reset the match_index for all peers. - // Also, the next_index should be set to the last index + 1. 
- { - let mut peers = self.peers.write().await; - for (peer, _) in peers.values_mut() { - peer.match_index = 0; - peer.next_index = next_index; - } - } - - Self::set_role(Role::Leader); - } - - async fn refresh_blockchain_client(&self) { - let (http_url, _) = self.get_chain_url().await.expect("failed to get chain url"); - let mut blockchain_client_lock = self.blockchain_client.lock().await; - - tracing::info!(http_url = http_url, "changing blockchain client"); - - *blockchain_client_lock = Some( - BlockchainClient::new_http(&http_url, Duration::from_secs(2)) - .await - .expect("failed to create blockchain client") - .into(), - ); - } - - fn initialize_periodic_peer_discovery(consensus: Arc) { - const TASK_NAME: &str = "consensus::peer_discovery"; - spawn_named(TASK_NAME, async move { - let mut interval = tokio::time::interval(PEER_DISCOVERY_DELAY); - - let periodic_discover = || async move { - loop { - discovery::discover_peers(Arc::clone(&consensus)).await; - interval.tick().await; - } - }; - - tokio::select! { - _ = GlobalState::wait_shutdown_warn(TASK_NAME) => {}, - _ = periodic_discover() => { - unreachable!("this infinite future doesn't end"); - }, - }; - }); - } - - fn set_role(role: Role) { - if ROLE.load(Ordering::SeqCst) == role as u8 { - tracing::info!(role = ?role, "role remains the same"); - return; - } - - tracing::info!(role = ?role, "setting role"); - ROLE.store(role as u8, Ordering::SeqCst); - - #[cfg(feature = "metrics")] - { - if role == Role::Leader { - metrics::set_consensus_is_leader(1_u64); - metrics::inc_consensus_leadership_change(); - } else { - metrics::set_consensus_is_leader(0_u64); - } - } - } - - //FIXME TODO automate the way we gather the leader, instead of using a env var - pub fn is_leader() -> bool { - ROLE.load(Ordering::SeqCst) == Role::Leader as u8 - } - - pub fn is_follower() -> bool { - ROLE.load(Ordering::SeqCst) == Role::Follower as u8 - } - - pub fn current_term(&self) -> u64 { - self.current_term.load(Ordering::SeqCst) - } - - pub fn last_index(&self) -> u64 { - self.prev_log_index.load(Ordering::SeqCst) - } - - pub fn should_forward(&self) -> bool { - let is_leader = Self::is_leader(); - tracing::info!( - is_leader = is_leader, - sync_online_enabled = self.importer_config.is_some(), - "handling request forward" - ); - if is_leader && self.importer_config.is_none() { - return false; // the leader is on miner mode and should deal with the requests - } - true - } - - pub async fn forward(&self, transaction: Bytes) -> anyhow::Result<(Hash, String)> { - #[cfg(feature = "metrics")] - let start = metrics::now(); - - let blockchain_client_lock = self.blockchain_client.lock().await; - - let Some(ref blockchain_client) = *blockchain_client_lock else { - return Err(anyhow::anyhow!("blockchain client is not set, cannot forward transaction")); - }; - - let result = blockchain_client.send_raw_transaction(transaction.into()).await?; - - #[cfg(feature = "metrics")] - metrics::inc_consensus_forward(start.elapsed()); - - Ok((result.tx_hash, blockchain_client.http_url.clone())) //XXX HEX - } - - pub async fn should_serve(&self) -> bool { - if self.importer_config.is_some() { - //gather the latest block number, check how far behind it is from current storage block - //if its greater than 3 blocks of distance, it should not be served - let blockchain_client_lock = self.blockchain_client.lock().await; - - let Some(ref blockchain_client) = *blockchain_client_lock else { - tracing::error!("blockchain client is not set at importer, cannot serve requests because they cant 
be forwarded"); - return false; - }; - - let Ok(validator_block_number) = blockchain_client.fetch_block_number().await else { - tracing::error!("unable to fetch latest block number"); - return false; - }; - - let Ok(current_block_number) = self.storage.read_mined_block_number() else { - tracing::error!("unable to fetch current block number"); - return false; - }; - - return (validator_block_number.as_u64() - 3) <= current_block_number.as_u64(); - } - - // consensus - if Self::is_leader() { - return true; - } - - if self.blockchain_client.lock().await.is_none() { - tracing::warn!("blockchain client is not set, cannot serve requests because they cant be forwarded"); - return false; - } - - let last_arrived_block_number = self.last_arrived_block_number.load(Ordering::SeqCst); - - if last_arrived_block_number == 0 { - tracing::warn!("no appendEntry has been received yet"); - false - } else { - { - let storage_block_number: u64 = self.storage.read_mined_block_number().unwrap_or_default().into(); - - tracing::info!( - "last arrived block number: {}, storage block number: {}", - last_arrived_block_number, - storage_block_number - ); - - if (last_arrived_block_number - 3) <= storage_block_number { - tracing::info!("should serve request"); - true - } else { - let diff = (last_arrived_block_number as i128) - (storage_block_number as i128); - tracing::warn!(diff = diff, "should not serve request"); - false - } - } - } - } - - async fn leader_address(&self) -> anyhow::Result { - let peers = self.peers.read().await; - for (address, (peer, _)) in peers.iter() { - if peer.role == Role::Leader { - return Ok(address.clone()); - } - } - Err(anyhow!("Leader not found")) - } - - pub async fn get_chain_url(&self) -> anyhow::Result<(String, Option)> { - if let Some(importer_config) = self.importer_config.clone() { - return Ok((importer_config.external_rpc, importer_config.external_rpc_ws)); - } - - if Self::is_follower() { - if let Ok(leader_address) = self.leader_address().await { - return Ok((leader_address.full_jsonrpc_address(), None)); - } - } - - Err(anyhow!("tried to get chain url as a leader and while running on miner mode")) - } - - async fn update_leader(&self, leader_address: PeerAddress) { - if leader_address == self.leader_address().await.unwrap_or_default() { - tracing::info!("leader is the same as before"); - return; - } - - let mut peers = self.peers.write().await; - for (address, (peer, _)) in peers.iter_mut() { - if *address == leader_address { - peer.role = Role::Leader; - - self.refresh_blockchain_client().await; - } else { - peer.role = Role::Follower; - } - } - - tracing::info!(leader = %leader_address, "updated leader information"); - } -} - -#[async_trait] -impl Consensus for Raft { - async fn should_serve(&self) -> bool { - self.should_serve().await - } - - fn should_forward(&self) -> bool { - self.should_forward() - } - - async fn forward(&self, transaction: Bytes) -> anyhow::Result { - let (tx_hash, url) = self.forward(transaction).await?; - tracing::info!(%tx_hash, %url, "forwarded eth_sendRawTransaction to leader"); - Ok(tx_hash) - } -} - -#[cfg(test)] -mod tests { - use super::*; - pub mod factories; - mod test_simple_blocks; - - #[test] - fn test_peer_address_from_string_valid_http() { - let input = "http://127.0.0.1:3000;3777".to_string(); - let result = PeerAddress::from_string(input); - - assert!(result.is_ok()); - let peer_address = result.unwrap(); - assert_eq!(peer_address.address, "http://127.0.0.1"); - assert_eq!(peer_address.jsonrpc_port, 3000); - 
assert_eq!(peer_address.grpc_port, 3777); - } - - #[test] - fn test_peer_address_from_string_valid_https() { - let input = "https://127.0.0.1:3000;3777".to_string(); - let result = PeerAddress::from_string(input); - - assert!(result.is_ok()); - let peer_address = result.unwrap(); - assert_eq!(peer_address.address, "https://127.0.0.1"); - assert_eq!(peer_address.jsonrpc_port, 3000); - assert_eq!(peer_address.grpc_port, 3777); - } - - #[test] - fn test_peer_address_from_string_invalid_format() { - let input = "http://127.0.0.1-3000;3777".to_string(); - let result = PeerAddress::from_string(input); - - assert!(result.is_err()); - assert_eq!(result.err().unwrap().to_string(), "invalid format"); - } - - #[test] - fn test_peer_address_from_string_missing_scheme() { - let input = "127.0.0.1:3000;3777".to_string(); - let result = PeerAddress::from_string(input); - - assert!(result.is_err()); - assert_eq!(result.err().unwrap().to_string(), "invalid scheme"); - } - - #[test] - fn test_peer_address_full_grpc_address() { - let peer_address = PeerAddress::new("127.0.0.1".to_string(), 3000, 3777); - assert_eq!(peer_address.full_grpc_address(), "127.0.0.1:3777"); - } -} diff --git a/src/eth/consensus/raft/server.rs b/src/eth/consensus/server.rs similarity index 97% rename from src/eth/consensus/raft/server.rs rename to src/eth/consensus/server.rs index 22e1be1e9..5bdbe811b 100644 --- a/src/eth/consensus/raft/server.rs +++ b/src/eth/consensus/server.rs @@ -17,15 +17,15 @@ use super::append_entry::AppendTransactionExecutionsResponse; use super::append_entry::RequestVoteRequest; use super::append_entry::RequestVoteResponse; use super::append_entry::StatusCode; -use super::Raft; -use crate::eth::consensus::raft::append_entry::append_entry_service_server::AppendEntryServiceServer; -use crate::eth::consensus::raft::AppendEntryService; -use crate::eth::consensus::raft::LogEntryData; -use crate::eth::consensus::raft::PeerAddress; -use crate::eth::consensus::raft::Role; +use crate::eth::consensus::append_entry::append_entry_service_server::AppendEntryServiceServer; +use crate::eth::consensus::AppendEntryService; +use crate::eth::consensus::LogEntryData; +use crate::eth::consensus::PeerAddress; +use crate::eth::consensus::Role; use crate::eth::miner::block_from_propagation; use crate::eth::primitives::LocalTransactionExecution; use crate::eth::primitives::TransactionExecution; +use crate::eth::Consensus; use crate::ext::spawn_named; #[cfg(feature = "metrics")] use crate::infra::metrics; @@ -39,7 +39,7 @@ mod label { } #[allow(dead_code)] -pub fn initialize_server(consensus: Arc) { +pub fn initialize_server(consensus: Arc) { const TASK_NAME: &str = "consensus::server"; spawn_named(TASK_NAME, async move { tracing::info!("Starting append entry service at address: {}", consensus.grpc_address); @@ -64,7 +64,7 @@ pub fn initialize_server(consensus: Arc) { } pub struct AppendEntryServiceImpl { - pub consensus: Mutex>, + pub consensus: Mutex>, } #[tonic::async_trait] @@ -87,7 +87,7 @@ impl AppendEntryService for AppendEntryServiceImpl { let current_term = consensus.current_term.load(Ordering::SeqCst); let request_inner = request.into_inner(); - if Raft::is_leader() { + if Consensus::is_leader() { tracing::error!(sender = request_inner.leader_id, "append_transaction_executions called on leader node"); return Err(Status::new( (StatusCode::NotLeader as i32).into(), @@ -252,7 +252,7 @@ impl AppendEntryService for AppendEntryServiceImpl { let current_term = consensus.current_term.load(Ordering::SeqCst); let request_inner = 
request.into_inner(); - if Raft::is_leader() { + if Consensus::is_leader() { tracing::error!(sender = request_inner.leader_id, "append_block_commit called on leader node"); return Err(Status::new( (StatusCode::NotLeader as i32).into(), @@ -455,7 +455,7 @@ impl AppendEntryService for AppendEntryServiceImpl { if request.last_log_index >= candidate_last_log_index { consensus.current_term.store(request.term, Ordering::SeqCst); - Raft::set_role(Role::Follower); + Consensus::set_role(Role::Follower); consensus.reset_heartbeat_signal.notify_waiters(); // reset the heartbeat signal to avoid election timeout just after voting *consensus.voted_for.lock().await = Some(candidate_address.clone()); @@ -485,8 +485,8 @@ impl AppendEntryService for AppendEntryServiceImpl { #[cfg(test)] mod tests { use super::*; - use crate::eth::consensus::raft::append_entry::BlockEntry; - use crate::eth::consensus::raft::tests::factories::*; + use crate::eth::consensus::append_entry::BlockEntry; + use crate::eth::consensus::tests::factories::*; #[tokio::test] async fn test_append_transaction_executions_insert() { @@ -495,7 +495,7 @@ mod tests { consensus: Mutex::new(Arc::clone(&consensus)), }; - Raft::set_role(Role::Follower); + Consensus::set_role(Role::Follower); let executions = vec![create_mock_transaction_execution_entry(None)]; @@ -530,7 +530,7 @@ mod tests { }; // Simulate the node as not a leader - Raft::set_role(Role::Follower); + Consensus::set_role(Role::Follower); let request = Request::new(AppendTransactionExecutionsRequest { term: 1, @@ -557,7 +557,7 @@ mod tests { }; // Simulate the node as a leader - Raft::set_role(Role::Leader); + Consensus::set_role(Role::Leader); let request = Request::new(AppendTransactionExecutionsRequest { term: 1, @@ -614,7 +614,7 @@ mod tests { }; // Simulate the node as a leader - Raft::set_role(Role::Leader); + Consensus::set_role(Role::Leader); let request = Request::new(AppendBlockCommitRequest { term: 1, diff --git a/src/eth/consensus/simple_consensus/mod.rs b/src/eth/consensus/simple_consensus/mod.rs deleted file mode 100644 index 5ec4d1eb6..000000000 --- a/src/eth/consensus/simple_consensus/mod.rs +++ /dev/null @@ -1,75 +0,0 @@ -use std::sync::Arc; - -use anyhow::anyhow; -use async_trait::async_trait; - -use super::Consensus; -use crate::eth::primitives::Bytes; -use crate::eth::primitives::Hash; -use crate::eth::storage::StratusStorage; -use crate::infra::metrics; -use crate::infra::BlockchainClient; -use crate::log_and_err; - -pub struct SimpleConsensus { - storage: Arc, - // if blockchain_client.is_some() then this is a replica, else this is the validator - blockchain_client: Option>, -} - -impl SimpleConsensus { - pub fn new(storage: Arc, blockchain_client: Option>) -> Self { - SimpleConsensus { storage, blockchain_client } - } -} - -#[async_trait] -impl Consensus for SimpleConsensus { - fn should_forward(&self) -> bool { - self.blockchain_client.is_some() - } - async fn should_serve(&self) -> bool { - let Some(blockchain_client) = &self.blockchain_client else { - return true; - }; - - //gather the latest block number, check how far behind it is from current storage block - //if its greater than 3 blocks of distance, it should not be served - let Ok(validator_block_number) = blockchain_client.fetch_block_number().await else { - tracing::error!("unable to fetch latest block number"); - return false; - }; - - let Ok(current_block_number) = self.storage.read_mined_block_number() else { - tracing::error!("unable to fetch current block number"); - return false; - }; - let 
should_serve = (validator_block_number.as_u64() - 3) <= current_block_number.as_u64(); - - if !should_serve { - tracing::info!(?validator_block_number, ?current_block_number, "validator and replica are too far appart"); - } - - return should_serve; - } - - async fn forward(&self, transaction: Bytes) -> anyhow::Result { - #[cfg(feature = "metrics")] - let start = metrics::now(); - - let Some(blockchain_client) = &self.blockchain_client else { - return log_and_err!("SimpleConsensus.forward was called but no blockchain client is set"); - }; - - let result = blockchain_client.send_raw_transaction(transaction.into()).await?; - - #[cfg(feature = "metrics")] - metrics::inc_consensus_forward(start.elapsed()); - - let tx_hash = result.tx_hash; - let validator_url = &blockchain_client.http_url; - tracing::info!(%tx_hash, ?validator_url, "forwarded eth_sendRawTransaction to leader"); - - Ok(result.tx_hash) - } -} diff --git a/src/eth/consensus/raft/tests/factories.rs b/src/eth/consensus/tests/factories.rs similarity index 89% rename from src/eth/consensus/raft/tests/factories.rs rename to src/eth/consensus/tests/factories.rs index ae3b16ce5..384f55346 100644 --- a/src/eth/consensus/raft/tests/factories.rs +++ b/src/eth/consensus/tests/factories.rs @@ -10,17 +10,18 @@ use ethereum_types::H256; use rand::Rng; use tokio::sync::Mutex; -use crate::eth::consensus::raft::append_entry::AppendBlockCommitResponse; -use crate::eth::consensus::raft::append_entry::AppendTransactionExecutionsResponse; -use crate::eth::consensus::raft::append_entry::BlockEntry; -use crate::eth::consensus::raft::append_entry::Log; -use crate::eth::consensus::raft::append_entry::RequestVoteResponse; -use crate::eth::consensus::raft::append_entry::TransactionExecutionEntry; -use crate::eth::consensus::raft::log_entry::LogEntry; -use crate::eth::consensus::raft::LogEntryData; -use crate::eth::consensus::raft::Peer; -use crate::eth::consensus::raft::PeerAddress; -use crate::eth::consensus::raft::Role; +use crate::eth::consensus::append_entry::AppendBlockCommitResponse; +use crate::eth::consensus::append_entry::AppendTransactionExecutionsResponse; +use crate::eth::consensus::append_entry::BlockEntry; +use crate::eth::consensus::append_entry::Log; +use crate::eth::consensus::append_entry::RequestVoteResponse; +use crate::eth::consensus::append_entry::TransactionExecutionEntry; +use crate::eth::consensus::log_entry::LogEntry; +use crate::eth::consensus::Consensus; +use crate::eth::consensus::LogEntryData; +use crate::eth::consensus::Peer; +use crate::eth::consensus::PeerAddress; +use crate::eth::consensus::Role; use crate::eth::storage::StratusStorage; static GLOBAL_COUNTER: AtomicUsize = AtomicUsize::new(0); @@ -106,7 +107,7 @@ pub fn create_mock_log_entry(index: u64, term: u64, data: LogEntryData) -> LogEn LogEntry { index, term, data } } -pub fn create_mock_consensus() -> Arc { +pub fn create_mock_consensus() -> Arc { let (storage, _tmpdir) = StratusStorage::mock_new_rocksdb(); storage.set_pending_block_number_as_next_if_not_set().unwrap(); let (_log_entries_storage, tmpdir_log_entries) = StratusStorage::mock_new_rocksdb(); @@ -120,7 +121,7 @@ pub fn create_mock_consensus() -> Arc { let miner = Miner::new(Arc::clone(&storage), crate::eth::miner::MinerMode::External); - Raft::new( + Consensus::new( Arc::clone(&storage), miner.into(), tmpdir_log_entries_path, @@ -138,7 +139,6 @@ use super::append_entry::AppendTransactionExecutionsRequest; use super::append_entry::StatusCode; use super::Hash; use super::Miner; -use super::Raft; // Define a 
 // Define a simple interceptor that does nothing
 #[allow(dead_code)] // HACK to avoid unused code warning
@@ -149,7 +149,7 @@ impl Interceptor for MockInterceptor {
     }
 }
 
-fn create_mock_leader_peer(consensus: Arc<Raft>) -> (PeerAddress, Peer) {
+fn create_mock_leader_peer(consensus: Arc<Consensus>) -> (PeerAddress, Peer) {
     let leader_address = PeerAddress::from_string("http://127.0.0.1:3000;3777".to_string()).unwrap();
     let client = MockAppendEntryServiceClient::new();
     let leader_peer = Peer {
@@ -162,9 +162,9 @@ fn create_mock_leader_peer(consensus: Arc<Raft>) -> (PeerAddress, Peer) {
     (leader_address, leader_peer)
 }
 
-pub async fn create_follower_consensus_with_leader(term: Option<u64>) -> Arc<Raft> {
+pub async fn create_follower_consensus_with_leader(term: Option<u64>) -> Arc<Consensus> {
     let consensus = create_mock_consensus();
-    Raft::set_role(Role::Follower);
+    Consensus::set_role(Role::Follower);
 
     if let Some(term) = term {
         consensus.current_term.store(term, Ordering::SeqCst);
@@ -179,9 +179,9 @@ pub async fn create_follower_consensus_with_leader(term: Option<u64>) -> Arc<Ra
 
-pub fn create_leader_consensus() -> Arc<Raft> {
+pub fn create_leader_consensus() -> Arc<Consensus> {
     let consensus = create_mock_consensus();
-    Raft::set_role(Role::Leader);
+    Consensus::set_role(Role::Leader);
     zero_global_counter();
     consensus
 }
diff --git a/src/eth/consensus/raft/tests/test_simple_blocks.rs b/src/eth/consensus/tests/test_simple_blocks.rs
similarity index 95%
rename from src/eth/consensus/raft/tests/test_simple_blocks.rs
rename to src/eth/consensus/tests/test_simple_blocks.rs
index 07d19840f..e401a3cae 100644
--- a/src/eth/consensus/raft/tests/test_simple_blocks.rs
+++ b/src/eth/consensus/tests/test_simple_blocks.rs
@@ -9,13 +9,13 @@ use tonic::Request;
 use super::factories::create_follower_consensus_with_leader;
 use super::factories::create_mock_block_entry;
 use super::factories::create_mock_transaction_execution_entry;
-use crate::eth::consensus::raft::append_entry::append_entry_service_server::AppendEntryService;
-use crate::eth::consensus::raft::append_entry::AppendBlockCommitRequest;
-use crate::eth::consensus::raft::append_entry::AppendTransactionExecutionsRequest;
-use crate::eth::consensus::raft::append_entry::StatusCode;
-use crate::eth::consensus::raft::server::AppendEntryServiceImpl;
-use crate::eth::consensus::raft::Role;
-use crate::eth::consensus::raft::TransactionExecutionEntry;
+use crate::eth::consensus::append_entry::append_entry_service_server::AppendEntryService;
+use crate::eth::consensus::append_entry::AppendBlockCommitRequest;
+use crate::eth::consensus::append_entry::AppendTransactionExecutionsRequest;
+use crate::eth::consensus::append_entry::StatusCode;
+use crate::eth::consensus::server::AppendEntryServiceImpl;
+use crate::eth::consensus::Role;
+use crate::eth::consensus::TransactionExecutionEntry;
 use crate::eth::primitives::Address;
 use crate::eth::primitives::BlockFilter;
 use crate::eth::primitives::BlockNumber;
diff --git a/src/eth/consensus/raft/utils.rs b/src/eth/consensus/utils.rs
similarity index 100%
rename from src/eth/consensus/raft/utils.rs
rename to src/eth/consensus/utils.rs
diff --git a/src/eth/importer/importer.rs b/src/eth/importer/importer.rs
deleted file mode 100644
index 30176b431..000000000
--- a/src/eth/importer/importer.rs
+++ /dev/null
@@ -1,437 +0,0 @@
-use std::cmp::min;
-use std::sync::atomic::AtomicU64;
-use std::sync::atomic::Ordering;
-use std::sync::Arc;
-use std::time::Duration;
-
-use futures::try_join;
-use futures::StreamExt;
-use serde::Deserialize;
-use tokio::sync::mpsc;
-use tokio::task::yield_now;
-use tokio::time::timeout;
-use tracing::Span;
-
-use crate::eth::executor::Executor;
-use crate::eth::miner::Miner;
-use crate::eth::primitives::BlockNumber;
-use crate::eth::primitives::ExternalBlock;
-use crate::eth::primitives::ExternalReceipt;
-use crate::eth::primitives::ExternalReceipts;
-use crate::eth::primitives::Hash;
-use crate::eth::storage::StratusStorage;
-use crate::ext::spawn_named;
-use crate::ext::traced_sleep;
-use crate::ext::DisplayExt;
-use crate::ext::SleepReason;
-use crate::if_else;
-#[cfg(feature = "metrics")]
-use crate::infra::metrics;
-use crate::infra::tracing::warn_task_rx_closed;
-use crate::infra::tracing::warn_task_tx_closed;
-use crate::infra::tracing::SpanExt;
-use crate::infra::BlockchainClient;
-use crate::log_and_err;
-#[cfg(feature = "metrics")]
-use crate::utils::calculate_tps;
-use crate::utils::DropTimer;
-use crate::GlobalState;
-
-// -----------------------------------------------------------------------------
-// Globals
-// -----------------------------------------------------------------------------
-
-/// Current block number of the external RPC blockchain.
-static EXTERNAL_RPC_CURRENT_BLOCK: AtomicU64 = AtomicU64::new(0);
-
-/// Only sets the external RPC current block number if it is equal to or greater than the current one.
-fn set_external_rpc_current_block(new_number: BlockNumber) {
-    let new_number_u64 = new_number.as_u64();
-    let _ = EXTERNAL_RPC_CURRENT_BLOCK.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current_number| {
-        if_else!(new_number_u64 >= current_number, Some(new_number_u64), None)
-    });
-}
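The deleted set_external_rpc_current_block above is a monotonic watermark: AtomicU64::fetch_update retries its compare-and-swap until the closure's decision is applied, and returning None rejects stale values. A minimal standalone sketch of the same pattern (plain std, no project types):

use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;

static WATERMARK: AtomicU64 = AtomicU64::new(0);

/// Stores `new` only if it does not move the watermark backwards.
fn set_if_newer(new: u64) {
    // fetch_update retries the compare-and-swap until the closure's verdict
    // is applied; returning None leaves the stored value untouched.
    let _ = WATERMARK.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
        if new >= current { Some(new) } else { None }
    });
}

fn main() {
    set_if_newer(10);
    set_if_newer(7); // ignored: older than the stored block number
    assert_eq!(WATERMARK.load(Ordering::Relaxed), 10);
}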
-
-// -----------------------------------------------------------------------------
-// Constants
-// -----------------------------------------------------------------------------
-/// Number of blocks that are downloaded in parallel.
-const PARALLEL_BLOCKS: usize = 3;
-
-/// Number of receipts that are downloaded in parallel.
-const PARALLEL_RECEIPTS: usize = 100;
-
-/// Timeout awaiting the newHeads event before falling back to polling.
-const TIMEOUT_NEW_HEADS: Duration = Duration::from_millis(2000);
-
-/// Interval before we start retrieving receipts, because they are not immediately available after the block is retrieved.
-const INTERVAL_FETCH_RECEIPTS: Duration = Duration::from_millis(50);
-
-pub struct Importer {
-    executor: Arc<Executor>,
-
-    miner: Arc<Miner>,
-
-    storage: Arc<StratusStorage>,
-
-    chain: Arc<BlockchainClient>,
-
-    sync_interval: Duration,
-}
-
-impl Importer {
-    /// Creates a new [`Importer`].
-    pub fn new(executor: Arc<Executor>, miner: Arc<Miner>, storage: Arc<StratusStorage>, chain: Arc<BlockchainClient>, sync_interval: Duration) -> Self {
-        tracing::info!("creating importer");
-        Self {
-            executor,
-            miner,
-            storage,
-            chain,
-            sync_interval,
-        }
-    }
-
-    // -----------------------------------------------------------------------------
-    // Execution
-    // -----------------------------------------------------------------------------
-
-    pub async fn run_importer_online(self: Arc<Self>) -> anyhow::Result<()> {
-        let _timer = DropTimer::start("importer-online::run_importer_online");
-
-        let storage = &self.storage;
-        let number = storage.read_block_number_to_resume_import()?;
-
-        let (backlog_tx, backlog_rx) = mpsc::unbounded_channel();
-
-        // spawn block executor:
-        // it executes and mines blocks and expects to receive them via channel in the correct order.
-        let task_executor = spawn_named(
-            "importer::executor",
-            Importer::start_block_executor(Arc::clone(&self.executor), Arc::clone(&self.miner), backlog_rx),
-        );
-
-        // spawn block number:
-        // it keeps track of the blockchain's current block number.
-        let number_fetcher_chain = Arc::clone(&self.chain);
-        let task_number_fetcher = spawn_named(
-            "importer::number-fetcher",
-            Importer::start_number_fetcher(number_fetcher_chain, self.sync_interval),
-        );
-
-        // spawn block fetcher:
-        // it fetches blocks and receipts in parallel and sends them to the executor in the correct order.
-        // it uses the number fetcher's current block to determine whether it should keep downloading more blocks.
-        let block_fetcher_chain = Arc::clone(&self.chain);
-        let task_block_fetcher = spawn_named(
-            "importer::block-fetcher",
-            Importer::start_block_fetcher(block_fetcher_chain, backlog_tx, number),
-        );
-
-        // await all tasks
-        if let Err(e) = try_join!(task_executor, task_block_fetcher, task_number_fetcher) {
-            tracing::error!(reason = ?e, "importer-online failed");
-        }
-        Ok(())
-    }
-
-    // -----------------------------------------------------------------------------
-    // Executor
-    // -----------------------------------------------------------------------------
-
-    // Executes external blocks and persists them to storage.
-    async fn start_block_executor(
-        executor: Arc<Executor>,
-        miner: Arc<Miner>,
-        mut backlog_rx: mpsc::UnboundedReceiver<(ExternalBlock, Vec<ExternalReceipt>)>,
-    ) -> anyhow::Result<()> {
-        const TASK_NAME: &str = "block-executor";
-
-        loop {
-            if GlobalState::is_shutdown_warn(TASK_NAME) {
-                return Ok(());
-            }
-
-            let (block, receipts) = match timeout(Duration::from_secs(2), backlog_rx.recv()).await {
-                Ok(Some(inner)) => inner,
-                Ok(None) => break, // channel closed
-                Err(_timed_out) => {
-                    tracing::warn!(timeout = "2s", "timeout reading block executor channel, expected around 1 block per second");
-                    continue;
-                }
-            };
-
-            #[cfg(feature = "metrics")]
-            let start = metrics::now();
-
-            // execute and mine
-            let receipts = ExternalReceipts::from(receipts);
-            if let Err(e) = executor.execute_external_block(&block, &receipts) {
-                let message = GlobalState::shutdown_from(TASK_NAME, "failed to reexecute external block");
-                return log_and_err!(reason = e, message);
-            };
-
-            // statistics
-            #[cfg(feature = "metrics")]
-            {
-                let duration = start.elapsed();
-                let tps = calculate_tps(duration, block.transactions.len());
-
-                tracing::info!(
-                    tps,
-                    duration = %duration.to_string_ext(),
-                    block_number = %block.number(),
-                    receipts = receipts.len(),
-                    "reexecuted external block",
-                );
-            }
-
-            if let Err(e) = miner.mine_external_mixed_and_commit() {
-                let message = GlobalState::shutdown_from(TASK_NAME, "failed to mine external block");
-                return log_and_err!(reason = e, message);
-            };
-
-            #[cfg(feature = "metrics")]
-            {
-                metrics::inc_n_importer_online_transactions_total(receipts.len() as u64);
-                metrics::inc_import_online_mined_block(start.elapsed());
-            }
-        }
-
-        warn_task_tx_closed(TASK_NAME);
-        Ok(())
-    }
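start_block_executor above bounds every channel read with a two-second timeout so the loop can re-check the shutdown flag even when no blocks arrive, and it tells a closed channel (Ok(None)) apart from a mere timeout (Err). A runnable sketch of that shape, with a u64 standing in for the (block, receipts) payload:

use std::time::Duration;
use tokio::sync::mpsc;
use tokio::time::timeout;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::unbounded_channel::<u64>();
    tx.send(1).unwrap();
    drop(tx); // drop the only sender so the channel reports closed

    loop {
        match timeout(Duration::from_secs(2), rx.recv()).await {
            Ok(Some(block)) => println!("received block {block}"),
            Ok(None) => break,         // channel closed: all senders are gone
            Err(_elapsed) => continue, // timed out: check shutdown, try again
        }
    }
}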
-
-    // -----------------------------------------------------------------------------
-    // Number fetcher
-    // -----------------------------------------------------------------------------
-
-    /// Retrieves the blockchain's current block number.
-    async fn start_number_fetcher(chain: Arc<BlockchainClient>, sync_interval: Duration) -> anyhow::Result<()> {
-        const TASK_NAME: &str = "external-number-fetcher";
-
-        // initial newHeads subscription.
-        // abort the application if we cannot subscribe.
-        let mut sub_new_heads = if chain.supports_ws() {
-            tracing::info!("{} subscribing to newHeads event", TASK_NAME);
-
-            match chain.subscribe_new_heads().await {
-                Ok(sub) => {
-                    tracing::info!("{} subscribed to newHeads events", TASK_NAME);
-                    Some(sub)
-                }
-                Err(e) => {
-                    let message = GlobalState::shutdown_from(TASK_NAME, "cannot subscribe to newHeads event");
-                    return log_and_err!(reason = e, message);
-                }
-            }
-        } else {
-            tracing::warn!("{} blockchain client does not have websocket enabled", TASK_NAME);
-            None
-        };
-
-        // keep reading the websocket subscription or polling via http.
-        loop {
-            if GlobalState::is_shutdown_warn(TASK_NAME) {
-                return Ok(());
-            }
-
-            // if we have a subscription, try to read from it.
-            // in case of failure, re-subscribe because the current subscription may have been closed on the server.
-            if let Some(sub) = &mut sub_new_heads {
-                tracing::info!("{} awaiting block number from newHeads subscription", TASK_NAME);
-                let resubscribe_ws = match timeout(TIMEOUT_NEW_HEADS, sub.next()).await {
-                    Ok(Some(Ok(block))) => {
-                        tracing::info!(block_number = %block.number(), "{} received newHeads event", TASK_NAME);
-                        set_external_rpc_current_block(block.number());
-                        continue;
-                    }
-                    Ok(None) => {
-                        if !GlobalState::is_shutdown_warn(TASK_NAME) {
-                            tracing::error!("{} newHeads subscription closed by the other side", TASK_NAME);
-                        }
-                        true
-                    }
-                    Ok(Some(Err(e))) => {
-                        if !GlobalState::is_shutdown_warn(TASK_NAME) {
-                            tracing::error!(reason = ?e, "{} failed to read newHeads subscription event", TASK_NAME);
-                        }
-                        true
-                    }
-                    Err(_) => {
-                        if !GlobalState::is_shutdown_warn(TASK_NAME) {
-                            tracing::error!("{} timed-out waiting for newHeads subscription event", TASK_NAME);
-                        }
-                        true
-                    }
-                };
-
-                if GlobalState::is_shutdown_warn(TASK_NAME) {
-                    return Ok(());
-                }
-
-                // resubscribe if necessary.
-                // only update the existing subscription if it succeeded, otherwise we will try again in the next iteration.
-                if chain.supports_ws() && resubscribe_ws {
-                    tracing::info!("{} resubscribing to newHeads event", TASK_NAME);
-                    match chain.subscribe_new_heads().await {
-                        Ok(sub) => {
-                            tracing::info!("{} resubscribed to newHeads event", TASK_NAME);
-                            sub_new_heads = Some(sub);
-                        }
-                        Err(e) =>
-                            if !GlobalState::is_shutdown_warn(TASK_NAME) {
-                                tracing::error!(reason = ?e, "{} failed to resubscribe to newHeads event", TASK_NAME);
                            },
-                    }
-                }
-            }
-
-            if GlobalState::is_shutdown_warn(TASK_NAME) {
-                return Ok(());
-            }
-
-            // fallback to polling
-            tracing::warn!("{} falling back to http polling because the subscription failed or is not enabled", TASK_NAME);
-            match chain.fetch_block_number().await {
-                Ok(block_number) => {
-                    tracing::info!(
-                        %block_number,
-                        sync_interval = %sync_interval.to_string_ext(),
-                        "fetched current block number via http. awaiting sync interval to retrieve again."
-                    );
-                    set_external_rpc_current_block(block_number);
-                    traced_sleep(sync_interval, SleepReason::SyncData).await;
-                }
-                Err(e) =>
-                    if !GlobalState::is_shutdown_warn(TASK_NAME) {
-                        tracing::error!(reason = ?e, "failed to retrieve block number. retrying now.");
-                    },
-            }
-        }
-    }
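start_number_fetcher distinguishes four outcomes when reading the subscription: a new head (handled, loop continues), a server-side close, a read error, and a timeout; the last three mark the subscription for renewal and fall through to HTTP polling. A condensed sketch, with a finite stream standing in for the real newHeads subscription:

use std::time::Duration;
use futures::stream;
use futures::StreamExt;
use tokio::time::timeout;

#[tokio::main]
async fn main() {
    // Stand-in for a newHeads subscription: two good events, then the end.
    let mut sub = stream::iter(vec![Ok::<u64, String>(1), Ok(2)]);

    loop {
        let renew = match timeout(Duration::from_millis(100), sub.next()).await {
            Ok(Some(Ok(block))) => {
                println!("newHeads: block {block}");
                continue; // happy path: no renewal, no polling
            }
            Ok(Some(Err(e))) => { eprintln!("read error: {e}"); true }
            Ok(None) => { eprintln!("closed by the server"); true }
            Err(_elapsed) => { eprintln!("timed out"); true }
        };
        if renew {
            // the real loop re-subscribes here, then falls back to http polling
            break;
        }
    }
}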
-
-    // -----------------------------------------------------------------------------
-    // Block fetcher
-    // -----------------------------------------------------------------------------
-
-    /// Retrieves blocks and receipts.
-    async fn start_block_fetcher(
-        chain: Arc<BlockchainClient>,
-        backlog_tx: mpsc::UnboundedSender<(ExternalBlock, Vec<ExternalReceipt>)>,
-        mut importer_block_number: BlockNumber,
-    ) -> anyhow::Result<()> {
-        const TASK_NAME: &str = "external-block-fetcher";
-
-        loop {
-            if GlobalState::is_shutdown_warn(TASK_NAME) {
-                return Ok(());
-            }
-
-            // if we are ahead of the current block number, wait until we are behind it again
-            let external_rpc_current_block = EXTERNAL_RPC_CURRENT_BLOCK.load(Ordering::Relaxed);
-            if importer_block_number.as_u64() > external_rpc_current_block {
-                yield_now().await;
-                continue;
-            }
-
-            // we are behind the current block, so we fetch multiple blocks in parallel to catch up
-            let blocks_behind = external_rpc_current_block.saturating_sub(importer_block_number.as_u64()) + 1; // TODO: use count_to from BlockNumber
-            let mut blocks_to_fetch = min(blocks_behind, 1_000); // avoid spawning millions of tasks (not parallelism), at least until we know it is safe
-            tracing::info!(%blocks_behind, blocks_to_fetch, "catching up with blocks");
-
-            let mut tasks = Vec::with_capacity(blocks_to_fetch as usize);
-            while blocks_to_fetch > 0 {
-                blocks_to_fetch -= 1;
-                tasks.push(fetch_block_and_receipts(Arc::clone(&chain), importer_block_number));
-                importer_block_number = importer_block_number.next();
-            }
-
-            // keep fetching in order
-            let mut tasks = futures::stream::iter(tasks).buffered(PARALLEL_BLOCKS);
-            while let Some((block, receipts)) = tasks.next().await {
-                if backlog_tx.send((block, receipts)).is_err() {
-                    warn_task_rx_closed(TASK_NAME);
-                    return Ok(());
-                }
-            }
-        }
-    }
-}
-
-// -----------------------------------------------------------------------------
-// Helpers
-// -----------------------------------------------------------------------------
-
-#[tracing::instrument(name = "importer::fetch_block_and_receipts", skip_all, fields(block_number))]
-async fn fetch_block_and_receipts(chain: Arc<BlockchainClient>, block_number: BlockNumber) -> (ExternalBlock, Vec<ExternalReceipt>) {
-    Span::with(|s| {
-        s.rec_str("block_number", &block_number);
-    });
-
-    // fetch block
-    let block = fetch_block(Arc::clone(&chain), block_number).await;
-
-    // wait some time until receipts are available
-    let _ = traced_sleep(INTERVAL_FETCH_RECEIPTS, SleepReason::SyncData).await;
-
-    // fetch receipts in parallel
-    let mut receipts_tasks = Vec::with_capacity(block.transactions.len());
-    for hash in block.transactions.iter().map(|tx| tx.hash()) {
-        receipts_tasks.push(fetch_receipt(Arc::clone(&chain), block_number, hash));
-    }
-    let receipts = futures::stream::iter(receipts_tasks).buffer_unordered(PARALLEL_RECEIPTS).collect().await;
-
-    (block, receipts)
-}
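Note the two different stream combinators in the deleted code: blocks go through buffered, receipts through buffer_unordered. buffered(n) drives up to n futures concurrently but still yields results in submission order, which is how the backlog reaches the executor already sequenced; buffer_unordered(n) yields in completion order, which is fine for receipts since they are keyed by hash. A self-contained illustration:

use futures::stream;
use futures::StreamExt;

#[tokio::main]
async fn main() {
    // buffered(3): up to three fetches in flight, results in input order.
    let ordered: Vec<u64> = stream::iter((1..=5u64).map(|n| async move { n }))
        .buffered(3)
        .collect()
        .await;
    assert_eq!(ordered, vec![1, 2, 3, 4, 5]);

    // buffer_unordered(3): same concurrency, results in completion order.
    let mut unordered: Vec<u64> = stream::iter((1..=5u64).map(|n| async move { n }))
        .buffer_unordered(3)
        .collect()
        .await;
    unordered.sort_unstable();
    assert_eq!(unordered, vec![1, 2, 3, 4, 5]);
}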
retrying with delay."); - traced_sleep(RETRY_DELAY, SleepReason::SyncData).await; - continue; - } - - return ExternalBlock::deserialize(&block).expect("cannot fail to deserialize external block"); - } -} - -#[tracing::instrument(name = "importer::fetch_receipt", skip_all, fields(block_number, tx_hash))] -async fn fetch_receipt(chain: Arc, block_number: BlockNumber, tx_hash: Hash) -> ExternalReceipt { - Span::with(|s| { - s.rec_str("block_number", &block_number); - s.rec_str("tx_hash", &tx_hash); - }); - - loop { - tracing::info!(%block_number, %tx_hash, "fetching receipt"); - - match chain.fetch_receipt(tx_hash).await { - Ok(Some(receipt)) => return receipt, - Ok(None) => { - tracing::warn!(%block_number, %tx_hash, "receipt not available yet because block is not mined. retrying now."); - continue; - } - Err(e) => { - tracing::error!(reason = ?e, %block_number, %tx_hash, "failed to fetch receipt. retrying now."); - } - } - } -} diff --git a/src/eth/importer/importer_config.rs b/src/eth/importer/importer_config.rs deleted file mode 100644 index b2e604e76..000000000 --- a/src/eth/importer/importer_config.rs +++ /dev/null @@ -1,67 +0,0 @@ -use std::sync::Arc; -use std::time::Duration; - -use clap::Parser; -use display_json::DebugAsJson; - -use crate::eth::executor::Executor; -use crate::eth::importer::Importer; -use crate::eth::miner::Miner; -use crate::eth::storage::StratusStorage; -use crate::ext::parse_duration; -use crate::ext::spawn_named; -use crate::infra::BlockchainClient; - -#[derive(Default, Parser, DebugAsJson, Clone, serde::Serialize)] -#[group(requires_all = ["external_rpc"])] -pub struct ImporterConfig { - /// External RPC HTTP endpoint to sync blocks with Stratus. - #[arg( - short = 'r', - long = "external-rpc", - env = "EXTERNAL_RPC", - conflicts_with("leader"), - requires_if("follower", "true") - )] - pub external_rpc: String, - - /// External RPC WS endpoint to sync blocks with Stratus. 
diff --git a/src/eth/importer/importer_config.rs b/src/eth/importer/importer_config.rs
deleted file mode 100644
index b2e604e76..000000000
--- a/src/eth/importer/importer_config.rs
+++ /dev/null
@@ -1,67 +0,0 @@
-use std::sync::Arc;
-use std::time::Duration;
-
-use clap::Parser;
-use display_json::DebugAsJson;
-
-use crate::eth::executor::Executor;
-use crate::eth::importer::Importer;
-use crate::eth::miner::Miner;
-use crate::eth::storage::StratusStorage;
-use crate::ext::parse_duration;
-use crate::ext::spawn_named;
-use crate::infra::BlockchainClient;
-
-#[derive(Default, Parser, DebugAsJson, Clone, serde::Serialize)]
-#[group(requires_all = ["external_rpc"])]
-pub struct ImporterConfig {
-    /// External RPC HTTP endpoint to sync blocks with Stratus.
-    #[arg(
-        short = 'r',
-        long = "external-rpc",
-        env = "EXTERNAL_RPC",
-        conflicts_with("leader"),
-        requires_if("follower", "true")
-    )]
-    pub external_rpc: String,
-
-    /// External RPC WS endpoint to sync blocks with Stratus.
-    #[arg(short = 'w', long = "external-rpc-ws", env = "EXTERNAL_RPC_WS", conflicts_with("leader"))]
-    pub external_rpc_ws: Option<String>,
-
-    /// Timeout for blockchain requests (importer online).
-    #[arg(long = "external-rpc-timeout", value_parser=parse_duration, env = "EXTERNAL_RPC_TIMEOUT", default_value = "2s")]
-    pub external_rpc_timeout: Duration,
-
-    #[arg(long = "sync-interval", value_parser=parse_duration, env = "SYNC_INTERVAL", default_value = "100ms")]
-    pub sync_interval: Duration,
-}
-
-impl ImporterConfig {
-    pub fn init(
-        &self,
-        executor: Arc<Executor>,
-        miner: Arc<Miner>,
-        storage: Arc<StratusStorage>,
-        chain: Arc<BlockchainClient>,
-    ) -> anyhow::Result<Arc<Importer>> {
-        const TASK_NAME: &str = "importer::init";
-        tracing::info!(config = ?self, "creating importer");
-
-        let config = self.clone();
-
-        let importer = Importer::new(executor, miner, Arc::clone(&storage), chain, config.sync_interval);
-        let importer = Arc::new(importer);
-
-        spawn_named(TASK_NAME, {
-            let importer = Arc::clone(&importer);
-            async move {
-                if let Err(e) = importer.run_importer_online().await {
-                    tracing::error!(reason = ?e, "importer-online failed");
-                }
-            }
-        });
-
-        Ok(importer)
-    }
-}
diff --git a/src/eth/importer/mod.rs b/src/eth/importer/mod.rs
deleted file mode 100644
index 943c920b2..000000000
--- a/src/eth/importer/mod.rs
+++ /dev/null
@@ -1,6 +0,0 @@
-#[allow(clippy::module_inception)]
-mod importer;
-mod importer_config;
-
-pub use importer::Importer;
-pub use importer_config::ImporterConfig;
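For reference, the deleted ImporterConfig resolves each field from a CLI flag, then an environment variable, then the default, with parse_duration turning strings such as "100ms" into Durations. A cut-down analogue, assuming clap with the derive and env features and substituting humantime::parse_duration for the project's parse_duration helper:

use std::time::Duration;

use clap::Parser;

// Cut-down analogue of the deleted config; humantime::parse_duration is an
// assumption standing in for the project's parse_duration helper.
#[derive(Parser, Debug)]
struct ImporterConfig {
    #[arg(long = "external-rpc", env = "EXTERNAL_RPC")]
    external_rpc: String,

    #[arg(long = "sync-interval", env = "SYNC_INTERVAL", value_parser = humantime::parse_duration, default_value = "100ms")]
    sync_interval: Duration,
}

fn main() {
    // Flag beats env beats default; here only the required flag is given.
    let config = ImporterConfig::parse_from(["importer", "--external-rpc", "http://localhost:8545"]);
    assert_eq!(config.sync_interval, Duration::from_millis(100));
    println!("{config:?}");
}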
diff --git a/src/eth/miner/miner.rs b/src/eth/miner/miner.rs
index 0f8f956ce..318721f4c 100644
--- a/src/eth/miner/miner.rs
+++ b/src/eth/miner/miner.rs
@@ -8,7 +8,7 @@ use nonempty::NonEmpty;
 use tokio::sync::broadcast;
 use tracing::Span;
 
-use crate::eth::consensus::raft::append_entry;
+use crate::eth::consensus::append_entry;
 use crate::eth::miner::MinerMode;
 use crate::eth::primitives::Block;
 use crate::eth::primitives::BlockHeader;
@@ -477,6 +477,11 @@ mod interval_miner {
                 continue;
             }
 
+            if not(GlobalState::is_leader()) {
+                tracing::warn!("skipping mining block because the node is not the leader");
+                continue;
+            }
+
             // mine
             tracing::info!(lag_us = %tick.elapsed().as_micros(), "interval mining block");
             mine_and_commit(&miner);
diff --git a/src/eth/mod.rs b/src/eth/mod.rs
index 05d00acc3..58972b3e6 100644
--- a/src/eth/mod.rs
+++ b/src/eth/mod.rs
@@ -1,7 +1,6 @@
 pub mod codegen;
 pub mod consensus;
 pub mod executor;
-pub mod importer;
 pub mod miner;
 pub mod primitives;
 pub mod rpc;
diff --git a/src/eth/primitives/block_header.rs b/src/eth/primitives/block_header.rs
index f58557d58..ded3cb577 100644
--- a/src/eth/primitives/block_header.rs
+++ b/src/eth/primitives/block_header.rs
@@ -13,7 +13,7 @@ use jsonrpsee::SubscriptionMessage;
 
 use crate::alias::EthersBlockVoid;
 use crate::alias::EthersBytes;
-use crate::eth::consensus::raft::append_entry;
+use crate::eth::consensus::append_entry;
 use crate::eth::primitives::logs_bloom::LogsBloom;
 use crate::eth::primitives::Address;
 use crate::eth::primitives::BlockNumber;
diff --git a/src/eth/primitives/transaction_execution.rs b/src/eth/primitives/transaction_execution.rs
index c326748e6..74a9f4d7e 100644
--- a/src/eth/primitives/transaction_execution.rs
+++ b/src/eth/primitives/transaction_execution.rs
@@ -7,9 +7,9 @@ use ethereum_types::H256;
 use ethereum_types::U64;
 
 use super::Gas;
-use crate::eth::consensus::raft::append_entry;
-use crate::eth::consensus::raft::utils::bytes_to_u256;
-use crate::eth::consensus::raft::utils::u256_to_bytes;
+use crate::eth::consensus::append_entry;
+use crate::eth::consensus::utils::bytes_to_u256;
+use crate::eth::consensus::utils::u256_to_bytes;
 use crate::eth::executor::EvmExecutionResult;
 use crate::eth::primitives::Address;
 use crate::eth::primitives::Bytes;
diff --git a/src/eth/rpc/rpc_context.rs b/src/eth/rpc/rpc_context.rs
index 673bbccc6..90bfec7bf 100644
--- a/src/eth/rpc/rpc_context.rs
+++ b/src/eth/rpc/rpc_context.rs
@@ -26,7 +26,7 @@ pub struct RpcContext {
     #[allow(dead_code)] // HACK this was triggered in Rust 1.79
     pub miner: Arc<Miner>,
     pub storage: Arc<StratusStorage>,
-    pub consensus: Arc<dyn Consensus>,
+    pub consensus: Arc<Consensus>,
     pub rpc_server: RpcServerConfig,
     pub subs: Arc,
 }
diff --git a/src/eth/rpc/rpc_server.rs b/src/eth/rpc/rpc_server.rs
index e6af9a5d7..ea7cc2656 100644
--- a/src/eth/rpc/rpc_server.rs
+++ b/src/eth/rpc/rpc_server.rs
@@ -69,7 +69,7 @@ pub async fn serve_rpc(
     storage: Arc<StratusStorage>,
     executor: Arc<Executor>,
     miner: Arc<Miner>,
-    consensus: Arc<dyn Consensus>,
+    consensus: Arc<Consensus>,
 
     // config
     app_config: impl serde::Serialize,
@@ -300,8 +300,8 @@ fn stratus_disable_miner(_: Params<'_>, _: &RpcContext, _: &Extensions) -> bool
 // Stratus - State
 // -----------------------------------------------------------------------------
 
-fn stratus_version(_: Params<'_>, _: &RpcContext, _: &Extensions) -> Result<JsonValue, StratusError> {
-    Ok(build_info::as_json())
+fn stratus_version(_: Params<'_>, ctx: &RpcContext, _: &Extensions) -> Result<JsonValue, StratusError> {
+    Ok(build_info::as_json(ctx))
 }
 
 fn stratus_config(_: Params<'_>, ctx: &RpcContext, _: &Extensions) -> Result<JsonValue, StratusError> {
@@ -621,7 +621,10 @@ fn eth_send_raw_transaction(params: Params<'_>, ctx: Arc<RpcContext>, ext: Exten
     if ctx.consensus.should_forward() {
         tracing::info!(%tx_hash, "forwarding eth_sendRawTransaction to leader");
         return match Handle::current().block_on(ctx.consensus.forward(data)) {
-            Ok(hash) => Ok(hex_data(hash)),
+            Ok((hash, url)) => {
+                tracing::info!(%tx_hash, %url, "forwarded eth_sendRawTransaction to leader");
+                Ok(hex_data(hash))
+            }
             Err(e) => {
                 tracing::error!(reason = ?e, %tx_hash, "failed to forward eth_sendRawTransaction to leader");
                 Err(StratusError::TransactionForwardToLeaderFailed)
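The eth_sendRawTransaction hunk above reflects the changed forward contract: the consensus now reports the transaction hash together with the leader URL that served it, so the handler can log the destination. A sketch of the handler's match using hypothetical stand-in types (the real forward is async and lives on Consensus):

type Hash = [u8; 32];

// Hypothetical stand-in: returns the hash of the forwarded transaction plus
// the leader URL that accepted it.
fn forward(_raw_tx: &[u8]) -> Result<(Hash, String), String> {
    Ok(([0u8; 32], "http://leader:3000".to_owned()))
}

fn main() {
    match forward(&[]) {
        Ok((hash, url)) => println!("forwarded tx {:02x?}.. to {url}", &hash[..4]),
        Err(e) => eprintln!("failed to forward to leader: {e}"),
    }
}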
diff --git a/src/globals.rs b/src/globals.rs
index bd09e317b..6c0a85f67 100644
--- a/src/globals.rs
+++ b/src/globals.rs
@@ -10,6 +10,7 @@ use tokio_util::sync::CancellationToken;
 
 use crate::config::load_dotenv;
 use crate::config::WithCommonConfig;
+use crate::eth::Consensus;
 use crate::ext::spawn_signal_handler;
 use crate::infra;
 use crate::infra::tracing::warn_task_cancellation;
@@ -144,6 +145,15 @@ impl GlobalState {
         warn_task_cancellation(task_name);
     }
 
+    // -------------------------------------------------------------------------
+    // Leadership
+    // -------------------------------------------------------------------------
+
+    /// Checks if the node is the leader.
+    pub fn is_leader() -> bool {
+        Consensus::is_leader()
+    }
+
     // -------------------------------------------------------------------------
     // Transaction
     // -------------------------------------------------------------------------
diff --git a/src/infra/blockchain_client/blockchain_client.rs b/src/infra/blockchain_client/blockchain_client.rs
index b2ea0786c..bf4fe93ee 100644
--- a/src/infra/blockchain_client/blockchain_client.rs
+++ b/src/infra/blockchain_client/blockchain_client.rs
@@ -26,7 +26,6 @@ use crate::ext::to_json_value;
 use crate::ext::DisplayExt;
 use crate::infra::tracing::TracingExt;
 use crate::log_and_err;
-use crate::GlobalState;
 
 #[derive(Debug)]
 pub struct BlockchainClient {
@@ -228,15 +227,10 @@ impl BlockchainClient {
     // -------------------------------------------------------------------------
 
     pub async fn subscribe_new_heads(&self) -> anyhow::Result<Subscription<ExternalBlock>> {
-        const TASK_NAME: &str = "blockchain::subscribe_new_heads";
         tracing::debug!("subscribing to newHeads event");
 
         let mut first_attempt = true;
         loop {
-            if GlobalState::is_shutdown_warn(TASK_NAME) {
-                return Err(anyhow::anyhow!("shutdown warning"));
-            };
-
             let ws_read = self.require_ws().await?;
             let result = ws_read
                 .subscribe::<ExternalBlock, Vec<JsonValue>>("eth_subscribe", vec![JsonValue::String("newHeads".to_owned())], "eth_unsubscribe")
diff --git a/src/infra/build_info.rs b/src/infra/build_info.rs
index 7b37b4cbf..41272dffa 100644
--- a/src/infra/build_info.rs
+++ b/src/infra/build_info.rs
@@ -1,6 +1,8 @@
 use serde_json::json;
 
 use crate::alias::JsonValue;
+use crate::eth::rpc::RpcContext;
+use crate::eth::Consensus;
 
 // -----------------------------------------------------------------------------
 // Build constants
@@ -57,8 +59,20 @@ pub fn version() -> &'static str {
     }
 }
 
+pub fn is_leader() -> bool {
+    Consensus::is_leader()
+}
+
+pub fn current_term(ctx: &RpcContext) -> u64 {
+    ctx.consensus.current_term()
+}
+
+pub fn last_index(ctx: &RpcContext) -> u64 {
+    ctx.consensus.last_index()
+}
+
 /// Returns build info as JSON.
-pub fn as_json() -> JsonValue {
+pub fn as_json(ctx: &RpcContext) -> JsonValue {
     json!(
         {
             "build": {
@@ -84,6 +98,11 @@ pub fn as_json() -> JsonValue {
                 "version": RUST_VERSION,
                 "channel": RUST_CHANNEL,
                 "target": RUST_TARGET,
+            },
+            "consensus": {
+                "is_leader": is_leader(),
+                "current_term": current_term(ctx),
+                "last_index": last_index(ctx),
             }
         }
     )
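With those helpers, the stratus_version payload gains a consensus section alongside build. Its shape, sketched with placeholder values where the real ones come from the RpcContext's consensus handle:

use serde_json::json;

fn main() {
    // Placeholder values; the real ones come from Consensus via RpcContext.
    let info = json!({
        "build": { "version": "0.0.0" },
        "consensus": {
            "is_leader": true,
            "current_term": 7,
            "last_index": 42
        }
    });
    println!("{}", serde_json::to_string_pretty(&info).unwrap());
}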
diff --git a/src/main.rs b/src/main.rs
index 2e93f4d85..0085a719a 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,10 +1,8 @@
 use std::sync::Arc;
 
 use stratus::config::StratusConfig;
-use stratus::eth::consensus::simple_consensus::SimpleConsensus;
-use stratus::eth::consensus::Consensus;
 use stratus::eth::rpc::serve_rpc;
-use stratus::infra::BlockchainClient;
+use stratus::eth::Consensus;
 use stratus::GlobalServices;
 
 fn main() -> anyhow::Result<()> {
@@ -13,58 +11,28 @@ fn main() -> anyhow::Result<()> {
 }
 
 async fn run(config: StratusConfig) -> anyhow::Result<()> {
-    // Init services
+    // init services
     let storage = config.storage.init()?;
-
-    // Init miner
-    let miner = if config.follower {
-        config.miner.init_external_mode(Arc::clone(&storage))?
-    } else {
-        config.miner.init(Arc::clone(&storage))?
-    };
-
-    // Init executor
+    let miner = config.miner.init(Arc::clone(&storage))?;
     let executor = config.executor.init(Arc::clone(&storage), Arc::clone(&miner));
-
-    // Init chain
-    let chain = if config.follower {
-        let importer_config = config.importer.as_ref().ok_or(anyhow::anyhow!("importer config is not set"))?;
-        Some(Arc::new(
-            BlockchainClient::new_http_ws(
-                importer_config.external_rpc.as_ref(),
-                importer_config.external_rpc_ws.as_deref(),
-                importer_config.external_rpc_timeout,
-            )
-            .await?,
-        ))
-    } else {
-        None
-    };
-
-    // Init consensus
-    let consensus: Arc<dyn Consensus> = Arc::new(SimpleConsensus::new(Arc::clone(&storage), chain.clone()));
-
-    // Init importer
-    if config.follower {
-        if let Some(importer_config) = &config.importer {
-            if let Some(chain) = chain {
-                importer_config.init(Arc::clone(&executor), Arc::clone(&miner), Arc::clone(&storage), chain)?;
-            } else {
-                return Err(anyhow::anyhow!("chain is not initialized"));
-            }
-        } else {
-            return Err(anyhow::anyhow!("importer config is not set"));
-        }
-    }
-
-    // Init RPC server
+    let consensus = Consensus::new(
+        Arc::clone(&storage),
+        Arc::clone(&miner),
+        config.storage.perm_storage.rocks_path_prefix.clone(),
+        config.clone().candidate_peers.clone(),
+        None,
+        config.rpc_server.address,
+        config.grpc_server_address,
+    ); // for now, we force None so the current node starts as the leader
+
+    // start rpc server
     serve_rpc(
-        // Services
+        // services
        Arc::clone(&storage),
         executor,
         miner,
         consensus,
-        // Config
+        // config
         config.clone(),
         config.rpc_server,
         config.executor.executor_chain_id.into(),