diff --git a/.dockerignore b/.dockerignore index 009d53ecd..3ff41081f 100644 --- a/.dockerignore +++ b/.dockerignore @@ -12,3 +12,4 @@ !static/ !.sqlx/ !build.rs +!proto diff --git a/.github/workflows/_setup-e2e.yml b/.github/workflows/_setup-e2e.yml index 8fc295967..b31c680fd 100644 --- a/.github/workflows/_setup-e2e.yml +++ b/.github/workflows/_setup-e2e.yml @@ -42,6 +42,9 @@ jobs: if: ${{ steps.cache-cargo.outputs.cache-hit != 'true' }} run: curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y + - name: Install protoc + run: sudo apt-get install -y protobuf-compiler + - name: Set up Just uses: extractions/setup-just@v1 @@ -68,4 +71,4 @@ jobs: env: CARGO_PROFILE_RELEASE_DEBUG: 0 RUST_LOG: off - RELEASE: 1 \ No newline at end of file + RELEASE: 1 diff --git a/.github/workflows/comment-tag-report.yml b/.github/workflows/comment-tag-report.yml index 828776375..2252b106b 100644 --- a/.github/workflows/comment-tag-report.yml +++ b/.github/workflows/comment-tag-report.yml @@ -20,6 +20,9 @@ jobs: - name: Set up Rust run: curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y + - name: Install protoc + run: sudo apt-get install -y protobuf-compiler + - name: Setup Ruby uses: ruby/setup-ruby@v1 with: @@ -39,4 +42,3 @@ jobs: run: ruby utils/slack-notifiers/comments.rb env: SLACK_WEBHOOK_URL: ${{ secrets.STRATUS_SLACK_WEBHOOK_URL }} - \ No newline at end of file diff --git a/.github/workflows/doc-test.yml b/.github/workflows/doc-test.yml index 115424210..4c6cbb0d1 100644 --- a/.github/workflows/doc-test.yml +++ b/.github/workflows/doc-test.yml @@ -47,8 +47,11 @@ jobs: if: ${{ steps.cache-cargo.outputs.cache-hit != 'true' }} run: curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y + - name: Install protoc + run: sudo apt-get install -y protobuf-compiler + - name: Set up Just uses: extractions/setup-just@v1 - name: Test docs - run: just test-doc \ No newline at end of file + run: just test-doc diff --git a/.github/workflows/e2e-contracts-postgres.yml b/.github/workflows/e2e-contracts-postgres.yml index 9496f8c90..e30188749 100644 --- a/.github/workflows/e2e-contracts-postgres.yml +++ b/.github/workflows/e2e-contracts-postgres.yml @@ -59,6 +59,9 @@ jobs: if: ${{ steps.cache-cargo.outputs.cache-hit != 'true' }} run: curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y + - name: Install protoc + run: sudo apt-get install -y protobuf-compiler + - name: Set up Just uses: extractions/setup-just@v1 diff --git a/.github/workflows/e2e-contracts-rocks.yml b/.github/workflows/e2e-contracts-rocks.yml index cac7b4e46..9654a46b9 100644 --- a/.github/workflows/e2e-contracts-rocks.yml +++ b/.github/workflows/e2e-contracts-rocks.yml @@ -59,6 +59,9 @@ jobs: if: ${{ steps.cache-cargo.outputs.cache-hit != 'true' }} run: curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y + - name: Install protoc + run: sudo apt-get install -y protobuf-compiler + - name: Set up Just uses: extractions/setup-just@v1 diff --git a/.github/workflows/e2e-contracts.yml b/.github/workflows/e2e-contracts.yml index 225116f23..f8faf3c9d 100644 --- a/.github/workflows/e2e-contracts.yml +++ b/.github/workflows/e2e-contracts.yml @@ -59,6 +59,9 @@ jobs: if: ${{ steps.cache-cargo.outputs.cache-hit != 'true' }} run: curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y + - name: Install protoc + run: sudo apt-get install -y protobuf-compiler + - name: Set up Just uses: extractions/setup-just@v1 diff --git a/.github/workflows/int-test.yml 
b/.github/workflows/int-test.yml index f254e2404..94f2cc122 100644 --- a/.github/workflows/int-test.yml +++ b/.github/workflows/int-test.yml @@ -55,6 +55,9 @@ jobs: if: ${{ steps.cache-cargo.outputs.cache-hit != 'true' }} run: curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y + - name: Install protoc + run: sudo apt-get install -y protobuf-compiler + - name: Set up Just uses: extractions/setup-just@v1 diff --git a/.github/workflows/lint-check.yml b/.github/workflows/lint-check.yml index aafc1497a..4eaf6c181 100644 --- a/.github/workflows/lint-check.yml +++ b/.github/workflows/lint-check.yml @@ -53,8 +53,11 @@ jobs: - name: Set up Rust Nightly run: curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y --default-toolchain nightly-2024-01-01 + - name: Install protoc + run: sudo apt-get install -y protobuf-compiler + - name: Set up Just uses: extractions/setup-just@v1 - name: Just lint-check - run: just lint-check -2024-01-01 \ No newline at end of file + run: just lint-check -2024-01-01 diff --git a/.github/workflows/outdated.yml b/.github/workflows/outdated.yml index 2b2cc6586..43b82d74a 100644 --- a/.github/workflows/outdated.yml +++ b/.github/workflows/outdated.yml @@ -18,8 +18,11 @@ jobs: uses: actions/checkout@v2 - name: Set up Rust - run: curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y - + run: curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y + + - name: Install protoc + run: sudo apt-get install -y protobuf-compiler + - name: Set up Just uses: extractions/setup-just@v1 @@ -36,4 +39,3 @@ jobs: env: SLACK_WEBHOOK_URL: ${{ secrets.STRATUS_SLACK_WEBHOOK_URL }} OUTDATED_TABLE_FILE_NAME: outdated.txt - \ No newline at end of file diff --git a/.github/workflows/unit-test.yml b/.github/workflows/unit-test.yml index dda207994..058350e94 100644 --- a/.github/workflows/unit-test.yml +++ b/.github/workflows/unit-test.yml @@ -54,8 +54,11 @@ jobs: if: ${{ steps.cache-cargo.outputs.cache-hit != 'true' }} run: curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y + - name: Install protoc + run: sudo apt-get install -y protobuf-compiler + - name: Set up Just uses: extractions/setup-just@v1 - name: Unit tests - run: just test-unit \ No newline at end of file + run: just test-unit diff --git a/Cargo.lock b/Cargo.lock index 493b77159..f4e3ca5e1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1543,6 +1543,12 @@ dependencies = [ "static_assertions", ] +[[package]] +name = "fixedbitset" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" + [[package]] name = "flate2" version = "1.0.30" @@ -2963,6 +2969,12 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "multimap" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03" + [[package]] name = "native-tls" version = "0.2.11" @@ -3464,6 +3476,16 @@ dependencies = [ "sha2", ] +[[package]] +name = "petgraph" +version = "0.6.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4c5cc86750666a3ed20bdaf5ca2a0344f9c67674cae0515bec2da16fbaa47db" +dependencies = [ + "fixedbitset", + "indexmap 2.2.6", +] + [[package]] name = "phf" version = "0.11.2" @@ -3579,6 +3601,16 @@ version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" +[[package]] +name = "prettyplease" +version = "0.2.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f12335488a2f3b0a83b14edad48dca9879ce89b2edd10e80237e4e852dd645e" +dependencies = [ + "proc-macro2", + "syn 2.0.60", +] + [[package]] name = "primitive-types" version = "0.12.2" @@ -3690,6 +3722,27 @@ dependencies = [ "prost-derive", ] +[[package]] +name = "prost-build" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22505a5c94da8e3b7c2996394d1c933236c4d743e81a410bcca4e6989fc066a4" +dependencies = [ + "bytes", + "heck 0.5.0", + "itertools 0.12.1", + "log", + "multimap", + "once_cell", + "petgraph", + "prettyplease", + "prost", + "prost-types", + "regex", + "syn 2.0.60", + "tempfile", +] + [[package]] name = "prost-derive" version = "0.12.6" @@ -5341,6 +5394,7 @@ dependencies = [ "phf", "phf_codegen", "pin-project", + "prost", "quote", "raft", "rand", @@ -5363,6 +5417,8 @@ dependencies = [ "thiserror", "tokio", "tokio-util", + "tonic 0.11.0", + "tonic-build", "tower", "tracing", "tracing-opentelemetry", @@ -5861,6 +5917,19 @@ dependencies = [ "tracing", ] +[[package]] +name = "tonic-build" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be4ef6dd70a610078cb4e338a0f79d06bc759ff1b22d2120c2ff02ae264ba9c2" +dependencies = [ + "prettyplease", + "proc-macro2", + "prost-build", + "quote", + "syn 2.0.60", +] + [[package]] name = "tower" version = "0.4.13" diff --git a/Cargo.toml b/Cargo.toml index ac4da9ea3..9956a45e1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -69,6 +69,8 @@ triehash = "=0.8.4" jsonrpsee = { version = "=0.22.4", features = ["server", "client"] } reqwest = { version = "=0.12.4", features = ["json"] } tower = "=0.4.13" +tonic = "=0.11.0" +prost = "=0.12.6" # observability console-subscriber = "=0.2.0" @@ -112,6 +114,7 @@ const-hex = "=1.10.0" glob = "=0.3.1" nom = "=7.1.3" phf_codegen = "=0.11.2" +tonic-build = "=0.11.0" # ------------------------------------------------------------------------------ # Binaries diff --git a/build.rs b/build.rs index fd20b4af0..533e7da3e 100644 --- a/build.rs +++ b/build.rs @@ -11,6 +11,7 @@ use nom::sequence::separated_pair; use nom::IResult; fn main() { + tonic_build::compile_protos("proto/raft.proto").unwrap(); // any code change println!("cargo:rerun-if-changed=src/"); // used in signatures codegen diff --git a/docker/Dockerfile.run_with_importer b/docker/Dockerfile.run_with_importer index b36e16a93..9a4c4c47c 100644 --- a/docker/Dockerfile.run_with_importer +++ b/docker/Dockerfile.run_with_importer @@ -10,9 +10,10 @@ COPY Cargo.toml /app/Cargo.toml COPY Cargo.lock /app/Cargo.lock COPY .cargo .cargo COPY config /app/config +COPY proto /app/proto RUN apt update -RUN apt-get install -y libclang-dev cmake +RUN apt-get install -y libclang-dev cmake protobuf-compiler ENV CARGO_PROFILE_RELEASE_DEBUG=1 ENV JSON_LOGS=1 diff --git a/e2e/test/automine/e2e-json-rpc.test.ts b/e2e/test/automine/e2e-json-rpc.test.ts index d35558db0..2422648d8 100644 --- a/e2e/test/automine/e2e-json-rpc.test.ts +++ b/e2e/test/automine/e2e-json-rpc.test.ts @@ -16,6 +16,7 @@ import { sendReset, subscribeAndGetEvent, subscribeAndGetEventWithContract, + ONE, } from "../helpers/rpc"; describe("JSON-RPC", () => { @@ -66,7 +67,7 @@ describe("JSON-RPC", () => { let gas = await send("eth_estimateGas", [tx]); expect(gas).match(HEX_PATTERN, "format"); - const gasDec = parseInt(gas, 16); + 
const gasDec = parseInt(gas, 16); expect(gasDec).to.be.greaterThan(0).and.lessThan(1_000_000); }); }); @@ -85,6 +86,9 @@ describe("JSON-RPC", () => { describe("Block", () => { it("eth_blockNumber", async function () { (await sendExpect("eth_blockNumber")).eq(ZERO); + await sendEvmMine(); + (await sendExpect("eth_blockNumber")).eq(ONE); + await sendReset(); }); it("eth_getBlockByNumber", async function () { let block: Block = await send("eth_getBlockByNumber", [ZERO, true]); diff --git a/proto/raft.proto b/proto/raft.proto new file mode 100644 index 000000000..2a0d70f2d --- /dev/null +++ b/proto/raft.proto @@ -0,0 +1,20 @@ +syntax = "proto3"; + +package raft; + +message Entry { + uint64 index = 1; + string data = 2; +} + +message AppendEntriesRequest { + repeated Entry entries = 1; +} + +message AppendEntriesResponse { + bool success = 1; +} + +service RaftService { + rpc AppendEntries(AppendEntriesRequest) returns (AppendEntriesResponse); +} diff --git a/src/bin/importer_offline.rs b/src/bin/importer_offline.rs index 6ead9c00b..cf0cd7b3f 100644 --- a/src/bin/importer_offline.rs +++ b/src/bin/importer_offline.rs @@ -19,6 +19,7 @@ use stratus::eth::storage::InMemoryPermanentStorage; use stratus::eth::storage::StratusStorage; use stratus::eth::BlockMiner; use stratus::eth::Executor; +use stratus::utils::calculate_tps_and_bpm; use stratus::GlobalServices; use stratus::GlobalState; use tokio::runtime::Handle; @@ -175,21 +176,15 @@ async fn execute_block_importer( } } - let seconds_elapsed = match instant_before_execution.elapsed().as_secs() as usize { - // avoid division by zero - 0 => 1, - non_zero => non_zero, - }; - let tps = transaction_count.checked_div(seconds_elapsed).unwrap_or(transaction_count); - let minutes_elapsed = seconds_elapsed as f64 / 60.0; - let blocks_per_minute = blocks_len as f64 / minutes_elapsed; + let duration = instant_before_execution.elapsed(); + let (tps, bpm) = calculate_tps_and_bpm(duration, transaction_count, blocks_len); + tracing::info!( tps, - blocks_per_minute = format_args!("{blocks_per_minute:.2}"), - seconds_elapsed, + blocks_per_minute = format_args!("{bpm:.2}"), + ?duration, %block_start, %block_end, - transaction_count, receipts = receipts.len(), "reexecuted blocks batch", ); diff --git a/src/bin/importer_online.rs b/src/bin/importer_online.rs index 9b3c095db..2a816772b 100644 --- a/src/bin/importer_online.rs +++ b/src/bin/importer_online.rs @@ -22,12 +22,14 @@ use stratus::infra::metrics; use stratus::infra::tracing::warn_task_rx_closed; use stratus::infra::tracing::warn_task_tx_closed; use stratus::infra::BlockchainClient; +use stratus::utils::calculate_tps; use stratus::GlobalServices; use stratus::GlobalState; use tokio::sync::mpsc; use tokio::task::yield_now; use tokio::time::sleep; use tokio::time::timeout; +use tokio::time::Instant; // ----------------------------------------------------------------------------- // Globals @@ -134,11 +136,24 @@ async fn start_block_executor(executor: Arc, miner: Arc, m // execute and mine let receipts = ExternalReceipts::from(receipts); - tracing::info!(number = %block.number(), txs_len = block.transactions.len(), "reexecuting external block"); + let instant_before_execution = Instant::now(); + if executor.reexecute_external(&block, &receipts).await.is_err() { GlobalState::shutdown_from(TASK_NAME, "failed to re-execute external block"); return; }; + + let duration = instant_before_execution.elapsed(); + let tps = calculate_tps(duration, block.transactions.len()); + + tracing::info!( + tps, + ?duration, + 
block_number = ?block.number(), + receipts = receipts.len(), + "reexecuted external block", + ); + if miner.mine_external_mixed_and_commit().await.is_err() { GlobalState::shutdown_from(TASK_NAME, "failed to mine external block"); return; diff --git a/src/eth/block_miner.rs b/src/eth/block_miner.rs index 5538a44da..1bd7686fc 100644 --- a/src/eth/block_miner.rs +++ b/src/eth/block_miner.rs @@ -89,6 +89,11 @@ impl BlockMiner { let tx_hash = tx_execution.hash(); self.storage.save_execution(tx_execution.clone()).await?; + if let Some(consensus) = &self.consensus { + let execution = format!("{:?}", tx_execution.clone()); + consensus.sender.send(execution).await.unwrap(); + } + // decide what to do based on mining mode match self.mode { // * do not consensus transactions @@ -101,10 +106,6 @@ impl BlockMiner { // * consensus transactions // * notify pending transactions BlockMinerMode::Interval(_) => { - if let Some(consensus) = &self.consensus { - let execution = format!("{:?}", tx_execution.clone()); - consensus.sender.send(execution).await.unwrap(); - } let _ = self.notifier_pending_txs.send(tx_hash); } // * do nothing, the caller will decide what to do diff --git a/src/eth/consensus.rs b/src/eth/consensus.rs index e28acc580..f7652e805 100644 --- a/src/eth/consensus.rs +++ b/src/eth/consensus.rs @@ -6,29 +6,49 @@ use k8s_openapi::api::core::v1::Pod; use kube::api::Api; use kube::api::ListParams; use kube::Client; +use raft::raft_service_server::RaftService; +use raft::raft_service_server::RaftServiceServer; use serde::Deserialize; use serde::Serialize; use tokio::sync::mpsc::Sender; use tokio::sync::mpsc::{self}; use tokio::time::sleep; +use tonic::transport::Channel; +use tonic::transport::Server; +use tonic::Request; +use tonic::Response; +use tonic::Status; + +pub mod raft { + tonic::include_proto!("raft"); +} +use raft::raft_service_client::RaftServiceClient; +use raft::AppendEntriesRequest; +use raft::AppendEntriesResponse; +use raft::Entry; use crate::config::RunWithImporterConfig; use crate::infra::metrics; -use crate::infra::BlockchainClient; const RETRY_ATTEMPTS: u32 = 3; const RETRY_DELAY: Duration = Duration::from_millis(10); #[derive(Debug, Serialize, Deserialize, Clone)] -pub struct Entry { +pub struct LogEntry { index: u64, data: String, } +#[derive(Clone)] +struct Peer { + address: String, + client: RaftServiceClient, +} + pub struct Consensus { pub sender: Sender, - leader_name: String, - //XXX current_index: AtomicU64, + leader_name: String, //XXX check the peers instead of using it + //XXX current_index: AtomicU64, } impl Consensus { @@ -55,7 +75,7 @@ impl Consensus { tracing::info!( "Discovered followers: {}", - followers.iter().map(|f| f.to_string()).collect::>().join(", ") + followers.iter().map(|f| f.address.to_string()).collect::>().join(", ") ); while let Some(data) = receiver.recv().await { @@ -66,19 +86,20 @@ impl Consensus { //TODO use gRPC instead of jsonrpc //FIXME for now, this has no colateral efects, but it will have in the future - //XXX match Self::append_entries_to_followers(vec![Entry { index: 0, data: data.clone() }], followers.clone()).await { - //XXX Ok(_) => { - //XXX tracing::info!("Data sent to followers: {}", data); - //XXX } - //XXX Err(e) => { - //XXX //TODO rediscover followers on comunication error - //XXX tracing::error!("Failed to send data to followers: {}", e); - //XXX } - //XXX } + match Self::append_entries_to_followers(vec![LogEntry { index: 0, data: data.clone() }], followers.clone()).await { + Ok(_) => { + tracing::info!("Data sent to 
followers: {}", data); + } + Err(e) => { + //TODO rediscover followers on comunication error + tracing::error!("Failed to send data to followers: {}", e); + } + } } } }); + Self::initialize_server(); Self { leader_name, sender } } @@ -97,6 +118,17 @@ impl Consensus { } } + fn initialize_server() { + tokio::spawn(async move { + tracing::info!("Starting consensus module server at port 3777"); + let addr = "0.0.0.0:3777".parse().unwrap(); + + let raft_service = RaftServiceImpl; + + Server::builder().add_service(RaftServiceServer::new(raft_service)).serve(addr).await.unwrap(); + }); + } + //FIXME TODO automate the way we gather the leader, instead of using a env var pub fn is_leader(leader_name: String) -> bool { Self::current_node().unwrap_or("".to_string()) == leader_name @@ -131,7 +163,7 @@ impl Consensus { } #[tracing::instrument(skip_all)] - pub async fn discover_followers() -> Result, anyhow::Error> { + pub async fn discover_followers() -> Result, anyhow::Error> { let client = Client::try_default().await?; let pods: Api = Api::namespaced(client, &Self::current_namespace().unwrap_or("default".to_string())); @@ -143,7 +175,11 @@ impl Consensus { if let Some(pod_name) = p.metadata.name { if pod_name != Self::current_node().unwrap() { if let Some(namespace) = Self::current_namespace() { - followers.push(format!("http://{}.stratus-api.{}.svc.cluster.local:3000", pod_name, namespace)); + let address = format!("http://{}.stratus-api.{}.svc.cluster.local:3777", pod_name, namespace); + let client = RaftServiceClient::connect(address.clone()).await?; + + let peer = Peer { address, client }; + followers.push(peer); } } } @@ -152,21 +188,37 @@ impl Consensus { Ok(followers) } - #[tracing::instrument(skip_all)] - async fn append_entries(follower: &str, entries: Vec) -> Result<(), anyhow::Error> { + async fn append_entries(mut follower: Peer, entries: Vec) -> Result<(), anyhow::Error> { #[cfg(feature = "metrics")] let start = metrics::now(); - let client = BlockchainClient::new_http_ws(follower, None, Duration::from_secs(2)).await?; - for attempt in 1..=RETRY_ATTEMPTS { - let response = client.send_append_entries(entries.clone()).await; + let grpc_entries: Vec = entries + .iter() + .map(|e| Entry { + index: e.index, + data: e.data.clone(), + }) + .collect(); + + let request = Request::new(AppendEntriesRequest { entries: grpc_entries }); + let response = follower.client.append_entries(request).await; + match response { - Ok(resp) => { - tracing::debug!("Entries appended to follower {}: attempt {}: {:?}", follower, attempt, resp); - return Ok(()); - } - Err(e) => tracing::error!("Error appending entries to follower {}: attempt {}: {:?}", follower, attempt, e), + Ok(resp) => + if resp.into_inner().success { + #[cfg(not(feature = "metrics"))] + tracing::debug!("Entries appended to follower {}: attempt {}: success", follower.address, attempt); + #[cfg(feature = "metrics")] + tracing::debug!( + "Entries appended to follower {}: attempt {}: success time_elapsed: {:?}", + follower.address, + attempt, + start.elapsed() + ); + return Ok(()); + }, + Err(e) => tracing::error!("Error appending entries to follower {}: attempt {}: {:?}", follower.address, attempt, e), } sleep(RETRY_DELAY).await; } @@ -174,18 +226,17 @@ impl Consensus { #[cfg(feature = "metrics")] metrics::inc_append_entries(start.elapsed()); - Err(anyhow!("Failed to append entries to {} after {} attempts", follower, RETRY_ATTEMPTS)) + Err(anyhow!("Failed to append entries to {} after {} attempts", follower.address, RETRY_ATTEMPTS)) } 
#[tracing::instrument(skip_all)] - pub async fn append_entries_to_followers(entries: Vec, followers: Vec) -> Result<(), anyhow::Error> { + pub async fn append_entries_to_followers(entries: Vec, followers: Vec) -> Result<(), anyhow::Error> { #[cfg(feature = "metrics")] let start = metrics::now(); - for entry in entries { for follower in &followers { - if let Err(e) = Self::append_entries(follower, vec![entry.clone()]).await { - tracing::debug!("Error appending entry to follower {}: {:?}", follower, e); + if let Err(e) = Self::append_entries(follower.clone(), vec![entry.clone()]).await { + tracing::debug!("Error appending entry to follower {}: {:?}", follower.address, e); } } } @@ -196,3 +247,20 @@ impl Consensus { Ok(()) } } + +pub struct RaftServiceImpl; + +#[tonic::async_trait] +impl RaftService for RaftServiceImpl { + async fn append_entries(&self, request: Request) -> Result, Status> { + let entries = request.into_inner().entries; + // Process the entries here + + // For example, let's just print the entries + for entry in entries { + println!("Received entry: {:?}", entry); + } + + Ok(Response::new(AppendEntriesResponse { success: true })) + } +} diff --git a/src/eth/primitives/transaction_input.rs b/src/eth/primitives/transaction_input.rs index 8388bbf66..aff2990ed 100644 --- a/src/eth/primitives/transaction_input.rs +++ b/src/eth/primitives/transaction_input.rs @@ -34,6 +34,10 @@ use crate::log_and_err; #[derive(DebugAsJson, Clone, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)] pub struct TransactionInput { + /// This is needed for relaying transactions correctly, a transaction sent as Legacy should + /// be relayed using rlp to the legacy format, the same is true for the other possible formats. + /// Otherwise we'd need to re-sign the transactions to always encode in the same format. + pub tx_type: Option, /// TODO: Optional for external/older transactions, but it should be required for newer transactions. /// /// Maybe TransactionInput should be split into two structs for representing these two different requirements. 
@@ -72,6 +76,7 @@ impl TransactionInput { impl Dummy for TransactionInput { fn dummy_with_rng(faker: &Faker, rng: &mut R) -> Self { Self { + tx_type: Some(rng.next_u64().into()), chain_id: faker.fake_with_rng(rng), hash: faker.fake_with_rng(rng), nonce: faker.fake_with_rng(rng), @@ -141,6 +146,7 @@ fn try_from_ethers_transaction(value: EthersTransaction, compute_signer: bool) - }; Ok(TransactionInput { + tx_type: value.transaction_type, chain_id: match value.chain_id { Some(chain_id) => Some(chain_id.try_into()?), None => None, @@ -179,10 +185,10 @@ impl From for EthersTransaction { v: value.v, r: value.r, s: value.s, + transaction_type: value.tx_type, block_hash: None, block_number: None, transaction_index: None, - transaction_type: None, access_list: None, max_priority_fee_per_gas: None, max_fee_per_gas: None, diff --git a/src/eth/primitives/transaction_mined.rs b/src/eth/primitives/transaction_mined.rs index e70cecf3e..c29f02ead 100644 --- a/src/eth/primitives/transaction_mined.rs +++ b/src/eth/primitives/transaction_mined.rs @@ -99,6 +99,7 @@ impl From for EthersTransaction { v: input.v, r: input.r, s: input.s, + transaction_type: input.tx_type, ..Default::default() } } diff --git a/src/eth/storage/postgres_permanent/types.rs b/src/eth/storage/postgres_permanent/types.rs index 06993a137..171062515 100644 --- a/src/eth/storage/postgres_permanent/types.rs +++ b/src/eth/storage/postgres_permanent/types.rs @@ -78,6 +78,7 @@ impl PostgresTransaction { r: self.r.into(), s: self.s.into(), value: self.value, + tx_type: None, }; TransactionMined { transaction_index: self.idx_in_block, diff --git a/src/eth/storage/rocks/rocks_state.rs b/src/eth/storage/rocks/rocks_state.rs index 4bfd9b102..d1c0a3e3b 100644 --- a/src/eth/storage/rocks/rocks_state.rs +++ b/src/eth/storage/rocks/rocks_state.rs @@ -4,9 +4,9 @@ use std::sync::atomic::AtomicU64; use std::sync::Arc; use anyhow::anyhow; +use anyhow::Context; use futures::future::join_all; use itertools::Itertools; -use num_traits::cast::ToPrimitive; use tokio::sync::mpsc; use tokio::task; use tokio::task::JoinHandle; @@ -109,9 +109,9 @@ impl RocksStorageState { } pub fn preload_block_number(&self) -> anyhow::Result { - let account_block_number = self.accounts.get_current_block_number(); - - Ok((account_block_number.to_u64().unwrap_or(0u64)).into()) + let block_number = self.blocks_by_number.last().map(|(num, _)| num).unwrap_or_default(); + tracing::error!(?block_number); + Ok((u64::from(block_number)).into()) } pub async fn sync_data(&self) -> anyhow::Result<()> { @@ -442,17 +442,17 @@ impl RocksStorageState { pub fn read_transaction(&self, tx_hash: &Hash) -> anyhow::Result> { match self.transactions.get(&(*tx_hash).into()) { - Some(transaction) => match self.blocks_by_number.get(&transaction) { + Some(block_number) => match self.blocks_by_number.get(&block_number) { Some(block) => { tracing::trace!(%tx_hash, "transaction found"); match block.transactions.into_iter().find(|tx| &Hash::from(tx.input.hash) == tx_hash) { Some(tx) => Ok(Some(tx.into())), - None => log_and_err!("transaction was not found in block"), + None => log_and_err!("transaction was not found in block") + .with_context(|| format!("block_number = {:?} tx_hash = {}", block_number, tx_hash)), } } - None => { - log_and_err!("the block that the transaction was supposed to be in was not found") - } + None => log_and_err!("the block that the transaction was supposed to be in was not found") + .with_context(|| format!("block_number = {:?} tx_hash = {}", block_number, tx_hash)), }, None => 
Ok(None), } @@ -468,7 +468,7 @@ impl RocksStorageState { }) .map(|((tx_hash, _), _)| match self.read_transaction(&tx_hash.into()) { Ok(Some(tx)) => Ok(tx.logs), - Ok(None) => Err(anyhow!("the transaction the log was supposed to be in was not found")), + Ok(None) => Err(anyhow!("the transaction the log was supposed to be in was not found")).with_context(|| format!("tx_hash = {:?}", tx_hash)), Err(err) => Err(err), }) .flatten_ok() diff --git a/src/eth/storage/rocks/types.rs b/src/eth/storage/rocks/types.rs index ea54c75bb..e3712e7ce 100644 --- a/src/eth/storage/rocks/types.rs +++ b/src/eth/storage/rocks/types.rs @@ -229,6 +229,12 @@ impl From for BlockNumber { } } +impl From for u64 { + fn from(value: BlockNumberRocksdb) -> Self { + value.0.as_u64() + } +} + #[derive(Clone, Default, Hash, Eq, PartialEq, PartialOrd, Ord, serde::Serialize, serde::Deserialize)] pub struct SlotIndexRocksdb(U256); @@ -482,6 +488,7 @@ impl From for TransactionInput { v: item.v, r: item.r, s: item.s, + tx_type: None, } } } diff --git a/src/infra/blockchain_client/blockchain_client.rs b/src/infra/blockchain_client/blockchain_client.rs index a37d9c6a5..3da94ffb5 100644 --- a/src/infra/blockchain_client/blockchain_client.rs +++ b/src/infra/blockchain_client/blockchain_client.rs @@ -6,11 +6,14 @@ use ethers_core::types::Transaction; use jsonrpsee::core::client::ClientT; use jsonrpsee::core::client::Subscription; use jsonrpsee::core::client::SubscriptionClientT; +use jsonrpsee::core::ClientError; use jsonrpsee::http_client::HttpClient; use jsonrpsee::http_client::HttpClientBuilder; use jsonrpsee::ws_client::WsClient; use jsonrpsee::ws_client::WsClientBuilder; use serde_json::Value as JsonValue; +use tokio::sync::RwLock; +use tokio::sync::RwLockReadGuard; use super::pending_transaction::PendingTransaction; use crate::eth::primitives::Address; @@ -22,14 +25,15 @@ use crate::eth::primitives::SlotIndex; use crate::eth::primitives::SlotValue; use crate::eth::primitives::StoragePointInTime; use crate::eth::primitives::Wei; +use crate::ext::DisplayExt; use crate::log_and_err; #[derive(Debug)] pub struct BlockchainClient { http: HttpClient, - pub http_url: String, - ws: Option, - pub ws_url: Option, + ws: Option>, + ws_url: Option, + timeout: Duration, } impl BlockchainClient { @@ -43,7 +47,7 @@ impl BlockchainClient { tracing::info!(%http_url, "starting blockchain client"); // build http provider - let http = match HttpClientBuilder::default().request_timeout(timeout).build(http_url) { + let http = match Self::build_http_client(http_url, timeout) { Ok(http) => http, Err(e) => { tracing::error!(reason = ?e, url = %http_url, "failed to create blockchain http client"); @@ -53,8 +57,8 @@ impl BlockchainClient { // build ws provider let (ws, ws_url) = if let Some(ws_url) = ws_url { - match WsClientBuilder::new().connection_timeout(timeout).build(ws_url).await { - Ok(ws) => (Some(ws), Some(ws_url.to_string())), + match Self::build_ws_client(ws_url, timeout).await { + Ok(ws) => (Some(RwLock::new(ws)), Some(ws_url.to_string())), Err(e) => { tracing::error!(reason = ?e, url = %ws_url, "failed to create blockchain websocket client"); return Err(e).context("failed to create blockchain websocket client"); @@ -64,12 +68,7 @@ impl BlockchainClient { (None, None) }; - let client = Self { - http, - http_url: http_url.to_string(), - ws, - ws_url, - }; + let client = Self { http, ws, ws_url, timeout }; // check health before assuming it is ok client.fetch_listening().await?; @@ -77,6 +76,28 @@ impl BlockchainClient { Ok(client) } + fn 
build_http_client(url: &str, timeout: Duration) -> anyhow::Result { + tracing::info!(%url, timeout = %timeout.to_string_ext(), "creating blockchain http client"); + match HttpClientBuilder::default().request_timeout(timeout).build(url) { + Ok(http) => Ok(http), + Err(e) => { + tracing::error!(reason = ?e, %url, timeout = %timeout.to_string_ext(), "failed to create blockchain http client"); + Err(e).context("failed to create blockchain http client") + } + } + } + + async fn build_ws_client(url: &str, timeout: Duration) -> anyhow::Result { + tracing::info!(%url, timeout = %timeout.to_string_ext(), "creating blockchain websocket client"); + match WsClientBuilder::new().connection_timeout(timeout).build(url).await { + Ok(ws) => Ok(ws), + Err(e) => { + tracing::error!(reason = ?e, %url, timeout = %timeout.to_string_ext(), "failed to create blockchain websocket client"); + Err(e).context("failed to create blockchain websocket client") + } + } + } + // ------------------------------------------------------------------------- // Websocket // ------------------------------------------------------------------------- @@ -87,9 +108,9 @@ impl BlockchainClient { } /// Validates it is connected to websocket and returns a reference to the websocket client. - fn require_ws(&self) -> anyhow::Result<&WsClient> { + async fn require_ws(&self) -> anyhow::Result> { match &self.ws { - Some(ws) => Ok(ws), + Some(ws) => Ok(ws.read().await), None => log_and_err!("blockchain client not connected to websocket"), } } @@ -209,19 +230,6 @@ impl BlockchainClient { // RPC mutations // ------------------------------------------------------------------------- - /// Appends entries to followers. - pub async fn send_append_entries(&self, entries: Vec) -> anyhow::Result<()> { - tracing::debug!(?entries, "appending entries"); - - let entries = serde_json::to_value(entries)?; - let result = self.http.request::<(), Vec>("stratus_appendEntries", vec![entries]).await; - - match result { - Ok(_) => Ok(()), - Err(e) => log_and_err!(reason = e, "failed to send append entries"), - } - } - /// Sends a signed transaction. pub async fn send_raw_transaction(&self, hash: Hash, tx: Bytes) -> anyhow::Result> { tracing::debug!(%hash, "sending raw transaction"); @@ -241,15 +249,38 @@ impl BlockchainClient { pub async fn subscribe_new_heads(&self) -> anyhow::Result> { tracing::debug!("subscribing to newHeads event"); + let ws = self.require_ws().await?; + + let mut first_attempt = true; + loop { + let result = ws + .subscribe::>("eth_subscribe", vec![JsonValue::String("newHeads".to_owned())], "eth_unsubscribe") + .await; + + match result { + // subscribed + Ok(sub) => return Ok(sub), + + // failed and need to reconnect + e @ Err(ClientError::RestartNeeded(_)) => { + // will try to reconnect websocket client only in first attempt + if first_attempt { + tracing::error!(%first_attempt, reason = ?e, "failed to subscribe to newHeads event. trying to reconnect websocket client now."); + } else { + tracing::error!(%first_attempt, reason = ?e, "failed to subscribe to newHeads event. 
will not try to reconnect websocket client."); + return e.context("failed to subscribe to newHeads event"); + } + first_attempt = false; + + // reconnect websocket client + let new_ws_client = Self::build_ws_client(self.ws_url.as_ref().unwrap(), self.timeout).await?; + let mut current_ws_client = self.ws.as_ref().unwrap().write().await; + let _ = std::mem::replace(&mut *current_ws_client, new_ws_client); + } - let ws = self.require_ws()?; - let result = ws - .subscribe::>("eth_subscribe", vec![JsonValue::String("newHeads".to_owned())], "eth_unsubscribe") - .await; - - match result { - Ok(sub) => Ok(sub), - Err(e) => log_and_err!(reason = e, "failed to subscribe to newHeads event"), + // failed and cannot do anything + Err(e) => return log_and_err!(reason = e, "failed to subscribe to newHeads event"), + } } } } diff --git a/src/utils.rs b/src/utils.rs index db5c5d3dd..36a5c1bf9 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + use tokio::select; use tokio::signal::unix::signal; use tokio::signal::unix::SignalKind; @@ -36,3 +38,15 @@ pub async fn spawn_signal_handler() -> anyhow::Result<()> { Ok(()) } + +pub fn calculate_tps_and_bpm(duration: Duration, transaction_count: usize, block_count: usize) -> (f64, f64) { + let seconds_elapsed = duration.as_secs_f64() + f64::EPSILON; + let tps = transaction_count as f64 / seconds_elapsed; + let blocks_per_minute = block_count as f64 / (seconds_elapsed / 60.0); + (tps, blocks_per_minute) +} + +pub fn calculate_tps(duration: Duration, transaction_count: usize) -> f64 { + let seconds_elapsed = duration.as_secs_f64() + f64::EPSILON; + transaction_count as f64 / seconds_elapsed +}
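
A minimal usage sketch of the throughput helpers added to src/utils.rs above (the standalone main and the concrete numbers are illustrative assumptions, not part of the change):

use std::time::Duration;
use stratus::utils::{calculate_tps, calculate_tps_and_bpm};

fn main() {
    // 1_200 transactions across 40 blocks, executed in 30 seconds
    let duration = Duration::from_secs(30);
    let (tps, bpm) = calculate_tps_and_bpm(duration, 1_200, 40);
    assert!((tps - 40.0).abs() < 0.01); // ~40 transactions per second
    assert!((bpm - 80.0).abs() < 0.01); // ~80 blocks per minute

    // the f64::EPSILON term added to the elapsed seconds keeps a zero-length
    // duration from dividing by zero
    assert!(calculate_tps(Duration::ZERO, 100).is_finite());
}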