From 70993d4805f257aa9ebaa3612615b511c10b6492 Mon Sep 17 00:00:00 2001 From: carneiro-cw <156914855+carneiro-cw@users.noreply.github.com> Date: Mon, 27 May 2024 11:24:01 -0300 Subject: [PATCH 01/11] chore: add more context to rocks_state.rs errors (#930) --- src/eth/storage/rocks/rocks_state.rs | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/src/eth/storage/rocks/rocks_state.rs b/src/eth/storage/rocks/rocks_state.rs index 4bfd9b102..794e24484 100644 --- a/src/eth/storage/rocks/rocks_state.rs +++ b/src/eth/storage/rocks/rocks_state.rs @@ -4,6 +4,7 @@ use std::sync::atomic::AtomicU64; use std::sync::Arc; use anyhow::anyhow; +use anyhow::Context; use futures::future::join_all; use itertools::Itertools; use num_traits::cast::ToPrimitive; @@ -442,17 +443,17 @@ impl RocksStorageState { pub fn read_transaction(&self, tx_hash: &Hash) -> anyhow::Result> { match self.transactions.get(&(*tx_hash).into()) { - Some(transaction) => match self.blocks_by_number.get(&transaction) { + Some(block_number) => match self.blocks_by_number.get(&block_number) { Some(block) => { tracing::trace!(%tx_hash, "transaction found"); match block.transactions.into_iter().find(|tx| &Hash::from(tx.input.hash) == tx_hash) { Some(tx) => Ok(Some(tx.into())), - None => log_and_err!("transaction was not found in block"), + None => log_and_err!("transaction was not found in block") + .with_context(|| format!("block_number = {:?} tx_hash = {}", block_number, tx_hash)), } } - None => { - log_and_err!("the block that the transaction was supposed to be in was not found") - } + None => log_and_err!("the block that the transaction was supposed to be in was not found") + .with_context(|| format!("block_number = {:?} tx_hash = {}", block_number, tx_hash)), }, None => Ok(None), } @@ -468,7 +469,7 @@ impl RocksStorageState { }) .map(|((tx_hash, _), _)| match self.read_transaction(&tx_hash.into()) { Ok(Some(tx)) => Ok(tx.logs), - Ok(None) => Err(anyhow!("the transaction the log was supposed to be in was not found")), + Ok(None) => Err(anyhow!("the transaction the log was supposed to be in was not found")).with_context(|| format!("tx_hash = {:?}", tx_hash)), Err(err) => Err(err), }) .flatten_ok() From 5ca45846a72bdce0ed3d684250790d12127e6b0f Mon Sep 17 00:00:00 2001 From: renancloudwalk <53792026+renancloudwalk@users.noreply.github.com> Date: Mon, 27 May 2024 13:48:03 -0300 Subject: [PATCH 02/11] chore: implementing tonic as comm between pods (#929) * chore: implementing tonic as comm between pods * lint * lint * chore: add protoc * chore: add missing proto file * chore: add missing proto file * chore: use tonic for communication * lint * chore: add protoc to missing github calls * chore: track time appending to followers * chore: change metrics measure * chore: add protoc to missing test --- .dockerignore | 1 + .github/workflows/_setup-e2e.yml | 5 +- .github/workflows/comment-tag-report.yml | 4 +- .github/workflows/doc-test.yml | 5 +- .github/workflows/e2e-contracts-postgres.yml | 3 + .github/workflows/e2e-contracts-rocks.yml | 3 + .github/workflows/e2e-contracts.yml | 3 + .github/workflows/int-test.yml | 3 + .github/workflows/lint-check.yml | 5 +- .github/workflows/outdated.yml | 8 +- .github/workflows/unit-test.yml | 5 +- Cargo.lock | 69 ++++++++++++ Cargo.toml | 3 + build.rs | 1 + docker/Dockerfile.run_with_importer | 3 +- proto/raft.proto | 20 ++++ src/eth/block_miner.rs | 9 +- src/eth/consensus.rs | 105 ++++++++++++++---- .../blockchain_client/blockchain_client.rs | 13 --- 19 files changed, 219 insertions(+), 49 deletions(-) create mode 100644 proto/raft.proto diff --git a/.dockerignore b/.dockerignore index 009d53ecd..3ff41081f 100644 --- a/.dockerignore +++ b/.dockerignore @@ -12,3 +12,4 @@ !static/ !.sqlx/ !build.rs +!proto diff --git a/.github/workflows/_setup-e2e.yml b/.github/workflows/_setup-e2e.yml index 8fc295967..b31c680fd 100644 --- a/.github/workflows/_setup-e2e.yml +++ b/.github/workflows/_setup-e2e.yml @@ -42,6 +42,9 @@ jobs: if: ${{ steps.cache-cargo.outputs.cache-hit != 'true' }} run: curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y + - name: Install protoc + run: sudo apt-get install -y protobuf-compiler + - name: Set up Just uses: extractions/setup-just@v1 @@ -68,4 +71,4 @@ jobs: env: CARGO_PROFILE_RELEASE_DEBUG: 0 RUST_LOG: off - RELEASE: 1 \ No newline at end of file + RELEASE: 1 diff --git a/.github/workflows/comment-tag-report.yml b/.github/workflows/comment-tag-report.yml index 828776375..2252b106b 100644 --- a/.github/workflows/comment-tag-report.yml +++ b/.github/workflows/comment-tag-report.yml @@ -20,6 +20,9 @@ jobs: - name: Set up Rust run: curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y + - name: Install protoc + run: sudo apt-get install -y protobuf-compiler + - name: Setup Ruby uses: ruby/setup-ruby@v1 with: @@ -39,4 +42,3 @@ jobs: run: ruby utils/slack-notifiers/comments.rb env: SLACK_WEBHOOK_URL: ${{ secrets.STRATUS_SLACK_WEBHOOK_URL }} - \ No newline at end of file diff --git a/.github/workflows/doc-test.yml b/.github/workflows/doc-test.yml index 115424210..4c6cbb0d1 100644 --- a/.github/workflows/doc-test.yml +++ b/.github/workflows/doc-test.yml @@ -47,8 +47,11 @@ jobs: if: ${{ steps.cache-cargo.outputs.cache-hit != 'true' }} run: curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y + - name: Install protoc + run: sudo apt-get install -y protobuf-compiler + - name: Set up Just uses: extractions/setup-just@v1 - name: Test docs - run: just test-doc \ No newline at end of file + run: just test-doc diff --git a/.github/workflows/e2e-contracts-postgres.yml b/.github/workflows/e2e-contracts-postgres.yml index 9496f8c90..e30188749 100644 --- a/.github/workflows/e2e-contracts-postgres.yml +++ b/.github/workflows/e2e-contracts-postgres.yml @@ -59,6 +59,9 @@ jobs: if: ${{ steps.cache-cargo.outputs.cache-hit != 'true' }} run: curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y + - name: Install protoc + run: sudo apt-get install -y protobuf-compiler + - name: Set up Just uses: extractions/setup-just@v1 diff --git a/.github/workflows/e2e-contracts-rocks.yml b/.github/workflows/e2e-contracts-rocks.yml index cac7b4e46..9654a46b9 100644 --- a/.github/workflows/e2e-contracts-rocks.yml +++ b/.github/workflows/e2e-contracts-rocks.yml @@ -59,6 +59,9 @@ jobs: if: ${{ steps.cache-cargo.outputs.cache-hit != 'true' }} run: curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y + - name: Install protoc + run: sudo apt-get install -y protobuf-compiler + - name: Set up Just uses: extractions/setup-just@v1 diff --git a/.github/workflows/e2e-contracts.yml b/.github/workflows/e2e-contracts.yml index 225116f23..f8faf3c9d 100644 --- a/.github/workflows/e2e-contracts.yml +++ b/.github/workflows/e2e-contracts.yml @@ -59,6 +59,9 @@ jobs: if: ${{ steps.cache-cargo.outputs.cache-hit != 'true' }} run: curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y + - name: Install protoc + run: sudo apt-get install -y protobuf-compiler + - name: Set up Just uses: extractions/setup-just@v1 diff --git a/.github/workflows/int-test.yml b/.github/workflows/int-test.yml index f254e2404..94f2cc122 100644 --- a/.github/workflows/int-test.yml +++ b/.github/workflows/int-test.yml @@ -55,6 +55,9 @@ jobs: if: ${{ steps.cache-cargo.outputs.cache-hit != 'true' }} run: curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y + - name: Install protoc + run: sudo apt-get install -y protobuf-compiler + - name: Set up Just uses: extractions/setup-just@v1 diff --git a/.github/workflows/lint-check.yml b/.github/workflows/lint-check.yml index aafc1497a..4eaf6c181 100644 --- a/.github/workflows/lint-check.yml +++ b/.github/workflows/lint-check.yml @@ -53,8 +53,11 @@ jobs: - name: Set up Rust Nightly run: curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y --default-toolchain nightly-2024-01-01 + - name: Install protoc + run: sudo apt-get install -y protobuf-compiler + - name: Set up Just uses: extractions/setup-just@v1 - name: Just lint-check - run: just lint-check -2024-01-01 \ No newline at end of file + run: just lint-check -2024-01-01 diff --git a/.github/workflows/outdated.yml b/.github/workflows/outdated.yml index 2b2cc6586..43b82d74a 100644 --- a/.github/workflows/outdated.yml +++ b/.github/workflows/outdated.yml @@ -18,8 +18,11 @@ jobs: uses: actions/checkout@v2 - name: Set up Rust - run: curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y - + run: curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y + + - name: Install protoc + run: sudo apt-get install -y protobuf-compiler + - name: Set up Just uses: extractions/setup-just@v1 @@ -36,4 +39,3 @@ jobs: env: SLACK_WEBHOOK_URL: ${{ secrets.STRATUS_SLACK_WEBHOOK_URL }} OUTDATED_TABLE_FILE_NAME: outdated.txt - \ No newline at end of file diff --git a/.github/workflows/unit-test.yml b/.github/workflows/unit-test.yml index dda207994..058350e94 100644 --- a/.github/workflows/unit-test.yml +++ b/.github/workflows/unit-test.yml @@ -54,8 +54,11 @@ jobs: if: ${{ steps.cache-cargo.outputs.cache-hit != 'true' }} run: curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y + - name: Install protoc + run: sudo apt-get install -y protobuf-compiler + - name: Set up Just uses: extractions/setup-just@v1 - name: Unit tests - run: just test-unit \ No newline at end of file + run: just test-unit diff --git a/Cargo.lock b/Cargo.lock index 493b77159..f4e3ca5e1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1543,6 +1543,12 @@ dependencies = [ "static_assertions", ] +[[package]] +name = "fixedbitset" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" + [[package]] name = "flate2" version = "1.0.30" @@ -2963,6 +2969,12 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "multimap" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03" + [[package]] name = "native-tls" version = "0.2.11" @@ -3464,6 +3476,16 @@ dependencies = [ "sha2", ] +[[package]] +name = "petgraph" +version = "0.6.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4c5cc86750666a3ed20bdaf5ca2a0344f9c67674cae0515bec2da16fbaa47db" +dependencies = [ + "fixedbitset", + "indexmap 2.2.6", +] + [[package]] name = "phf" version = "0.11.2" @@ -3579,6 +3601,16 @@ version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" +[[package]] +name = "prettyplease" +version = "0.2.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f12335488a2f3b0a83b14edad48dca9879ce89b2edd10e80237e4e852dd645e" +dependencies = [ + "proc-macro2", + "syn 2.0.60", +] + [[package]] name = "primitive-types" version = "0.12.2" @@ -3690,6 +3722,27 @@ dependencies = [ "prost-derive", ] +[[package]] +name = "prost-build" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22505a5c94da8e3b7c2996394d1c933236c4d743e81a410bcca4e6989fc066a4" +dependencies = [ + "bytes", + "heck 0.5.0", + "itertools 0.12.1", + "log", + "multimap", + "once_cell", + "petgraph", + "prettyplease", + "prost", + "prost-types", + "regex", + "syn 2.0.60", + "tempfile", +] + [[package]] name = "prost-derive" version = "0.12.6" @@ -5341,6 +5394,7 @@ dependencies = [ "phf", "phf_codegen", "pin-project", + "prost", "quote", "raft", "rand", @@ -5363,6 +5417,8 @@ dependencies = [ "thiserror", "tokio", "tokio-util", + "tonic 0.11.0", + "tonic-build", "tower", "tracing", "tracing-opentelemetry", @@ -5861,6 +5917,19 @@ dependencies = [ "tracing", ] +[[package]] +name = "tonic-build" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be4ef6dd70a610078cb4e338a0f79d06bc759ff1b22d2120c2ff02ae264ba9c2" +dependencies = [ + "prettyplease", + "proc-macro2", + "prost-build", + "quote", + "syn 2.0.60", +] + [[package]] name = "tower" version = "0.4.13" diff --git a/Cargo.toml b/Cargo.toml index ac4da9ea3..9956a45e1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -69,6 +69,8 @@ triehash = "=0.8.4" jsonrpsee = { version = "=0.22.4", features = ["server", "client"] } reqwest = { version = "=0.12.4", features = ["json"] } tower = "=0.4.13" +tonic = "=0.11.0" +prost = "=0.12.6" # observability console-subscriber = "=0.2.0" @@ -112,6 +114,7 @@ const-hex = "=1.10.0" glob = "=0.3.1" nom = "=7.1.3" phf_codegen = "=0.11.2" +tonic-build = "=0.11.0" # ------------------------------------------------------------------------------ # Binaries diff --git a/build.rs b/build.rs index fd20b4af0..533e7da3e 100644 --- a/build.rs +++ b/build.rs @@ -11,6 +11,7 @@ use nom::sequence::separated_pair; use nom::IResult; fn main() { + tonic_build::compile_protos("proto/raft.proto").unwrap(); // any code change println!("cargo:rerun-if-changed=src/"); // used in signatures codegen diff --git a/docker/Dockerfile.run_with_importer b/docker/Dockerfile.run_with_importer index b36e16a93..9a4c4c47c 100644 --- a/docker/Dockerfile.run_with_importer +++ b/docker/Dockerfile.run_with_importer @@ -10,9 +10,10 @@ COPY Cargo.toml /app/Cargo.toml COPY Cargo.lock /app/Cargo.lock COPY .cargo .cargo COPY config /app/config +COPY proto /app/proto RUN apt update -RUN apt-get install -y libclang-dev cmake +RUN apt-get install -y libclang-dev cmake protobuf-compiler ENV CARGO_PROFILE_RELEASE_DEBUG=1 ENV JSON_LOGS=1 diff --git a/proto/raft.proto b/proto/raft.proto new file mode 100644 index 000000000..2a0d70f2d --- /dev/null +++ b/proto/raft.proto @@ -0,0 +1,20 @@ +syntax = "proto3"; + +package raft; + +message Entry { + uint64 index = 1; + string data = 2; +} + +message AppendEntriesRequest { + repeated Entry entries = 1; +} + +message AppendEntriesResponse { + bool success = 1; +} + +service RaftService { + rpc AppendEntries(AppendEntriesRequest) returns (AppendEntriesResponse); +} diff --git a/src/eth/block_miner.rs b/src/eth/block_miner.rs index 5538a44da..1bd7686fc 100644 --- a/src/eth/block_miner.rs +++ b/src/eth/block_miner.rs @@ -89,6 +89,11 @@ impl BlockMiner { let tx_hash = tx_execution.hash(); self.storage.save_execution(tx_execution.clone()).await?; + if let Some(consensus) = &self.consensus { + let execution = format!("{:?}", tx_execution.clone()); + consensus.sender.send(execution).await.unwrap(); + } + // decide what to do based on mining mode match self.mode { // * do not consensus transactions @@ -101,10 +106,6 @@ impl BlockMiner { // * consensus transactions // * notify pending transactions BlockMinerMode::Interval(_) => { - if let Some(consensus) = &self.consensus { - let execution = format!("{:?}", tx_execution.clone()); - consensus.sender.send(execution).await.unwrap(); - } let _ = self.notifier_pending_txs.send(tx_hash); } // * do nothing, the caller will decide what to do diff --git a/src/eth/consensus.rs b/src/eth/consensus.rs index dc460e164..700fb527b 100644 --- a/src/eth/consensus.rs +++ b/src/eth/consensus.rs @@ -6,21 +6,34 @@ use k8s_openapi::api::core::v1::Pod; use kube::api::Api; use kube::api::ListParams; use kube::Client; +use raft::raft_service_server::RaftService; +use raft::raft_service_server::RaftServiceServer; use serde::Deserialize; use serde::Serialize; use tokio::sync::mpsc::Sender; use tokio::sync::mpsc::{self}; use tokio::time::sleep; +use tonic::transport::Server; +use tonic::Request; +use tonic::Response; +use tonic::Status; + +pub mod raft { + tonic::include_proto!("raft"); +} +use raft::raft_service_client::RaftServiceClient; +use raft::AppendEntriesRequest; +use raft::AppendEntriesResponse; +use raft::Entry; use crate::config::RunWithImporterConfig; use crate::infra::metrics; -use crate::infra::BlockchainClient; const RETRY_ATTEMPTS: u32 = 3; const RETRY_DELAY: Duration = Duration::from_millis(10); #[derive(Debug, Serialize, Deserialize, Clone)] -pub struct Entry { +pub struct LogEntry { index: u64, data: String, } @@ -66,19 +79,20 @@ impl Consensus { //TODO use gRPC instead of jsonrpc //FIXME for now, this has no colateral efects, but it will have in the future - //XXX match Self::append_entries_to_followers(vec![Entry { index: 0, data: data.clone() }], followers.clone()).await { - //XXX Ok(_) => { - //XXX tracing::info!("Data sent to followers: {}", data); - //XXX } - //XXX Err(e) => { - //XXX //TODO rediscover followers on comunication error - //XXX tracing::error!("Failed to send data to followers: {}", e); - //XXX } - //XXX } + match Self::append_entries_to_followers(vec![LogEntry { index: 0, data: data.clone() }], followers.clone()).await { + Ok(_) => { + tracing::info!("Data sent to followers: {}", data); + } + Err(e) => { + //TODO rediscover followers on comunication error + tracing::error!("Failed to send data to followers: {}", e); + } + } } } }); + Self::initialize_server(); Self { leader_name, sender } } @@ -97,6 +111,17 @@ impl Consensus { } } + fn initialize_server() { + tokio::spawn(async move { + tracing::info!("Starting consensus module server at port 3777"); + let addr = "0.0.0.0:3777".parse().unwrap(); + + let raft_service = RaftServiceImpl; + + Server::builder().add_service(RaftServiceServer::new(raft_service)).serve(addr).await.unwrap(); + }); + } + //FIXME TODO automate the way we gather the leader, instead of using a env var pub fn is_leader(leader_name: String) -> bool { Self::current_node().unwrap_or("".to_string()) == leader_name @@ -141,7 +166,7 @@ impl Consensus { if let Some(pod_name) = p.metadata.name { if pod_name != Self::current_node().unwrap() { if let Some(namespace) = Self::current_namespace() { - followers.push(format!("http://{}.stratus-api.{}.svc.cluster.local:3000", pod_name, namespace)); + followers.push(format!("http://{}.stratus-api.{}.svc.cluster.local:3777", pod_name, namespace)); } } } @@ -150,20 +175,38 @@ impl Consensus { Ok(followers) } - #[tracing::instrument(skip_all)] - async fn append_entries(follower: &str, entries: Vec) -> Result<(), anyhow::Error> { + async fn append_entries(follower: String, entries: Vec) -> Result<(), anyhow::Error> { #[cfg(feature = "metrics")] let start = metrics::now(); - let client = BlockchainClient::new_http_ws(follower, None, Duration::from_secs(2)).await?; + let mut client = RaftServiceClient::connect(follower.clone()).await?; for attempt in 1..=RETRY_ATTEMPTS { - let response = client.send_append_entries(entries.clone()).await; + let grpc_entries: Vec = entries + .iter() + .map(|e| Entry { + index: e.index, + data: e.data.clone(), + }) + .collect(); + + let request = Request::new(AppendEntriesRequest { entries: grpc_entries }); + let response = client.append_entries(request).await; + match response { - Ok(resp) => { - tracing::debug!("Entries appended to follower {}: attempt {}: {:?}", follower, attempt, resp); - return Ok(()); - } + Ok(resp) => + if resp.into_inner().success { + #[cfg(not(feature = "metrics"))] + tracing::debug!("Entries appended to follower {}: attempt {}: success", follower, attempt); + #[cfg(feature = "metrics")] + tracing::debug!( + "Entries appended to follower {}: attempt {}: success time_elapsed: {:?}", + follower, + attempt, + start.elapsed() + ); + return Ok(()); + }, Err(e) => tracing::error!("Error appending entries to follower {}: attempt {}: {:?}", follower, attempt, e), } sleep(RETRY_DELAY).await; @@ -176,13 +219,12 @@ impl Consensus { } #[tracing::instrument(skip_all)] - pub async fn append_entries_to_followers(entries: Vec, followers: Vec) -> Result<(), anyhow::Error> { + pub async fn append_entries_to_followers(entries: Vec, followers: Vec) -> Result<(), anyhow::Error> { #[cfg(feature = "metrics")] let start = metrics::now(); - for entry in entries { for follower in &followers { - if let Err(e) = Self::append_entries(follower, vec![entry.clone()]).await { + if let Err(e) = Self::append_entries(follower.clone(), vec![entry.clone()]).await { tracing::debug!("Error appending entry to follower {}: {:?}", follower, e); } } @@ -194,3 +236,20 @@ impl Consensus { Ok(()) } } + +pub struct RaftServiceImpl; + +#[tonic::async_trait] +impl RaftService for RaftServiceImpl { + async fn append_entries(&self, request: Request) -> Result, Status> { + let entries = request.into_inner().entries; + // Process the entries here + + // For example, let's just print the entries + for entry in entries { + println!("Received entry: {:?}", entry); + } + + Ok(Response::new(AppendEntriesResponse { success: true })) + } +} diff --git a/src/infra/blockchain_client/blockchain_client.rs b/src/infra/blockchain_client/blockchain_client.rs index a37d9c6a5..eb5494f47 100644 --- a/src/infra/blockchain_client/blockchain_client.rs +++ b/src/infra/blockchain_client/blockchain_client.rs @@ -209,19 +209,6 @@ impl BlockchainClient { // RPC mutations // ------------------------------------------------------------------------- - /// Appends entries to followers. - pub async fn send_append_entries(&self, entries: Vec) -> anyhow::Result<()> { - tracing::debug!(?entries, "appending entries"); - - let entries = serde_json::to_value(entries)?; - let result = self.http.request::<(), Vec>("stratus_appendEntries", vec![entries]).await; - - match result { - Ok(_) => Ok(()), - Err(e) => log_and_err!(reason = e, "failed to send append entries"), - } - } - /// Sends a signed transaction. pub async fn send_raw_transaction(&self, hash: Hash, tx: Bytes) -> anyhow::Result> { tracing::debug!(%hash, "sending raw transaction"); From defdae6b66ccd6b0aaae6b88ff21f33964dd9860 Mon Sep 17 00:00:00 2001 From: renancloudwalk <53792026+renancloudwalk@users.noreply.github.com> Date: Mon, 27 May 2024 18:33:30 -0300 Subject: [PATCH 03/11] Reutilize connection (#931) * chore: reutilize http2 client * lint --- src/eth/consensus.rs | 39 ++++++++++++++++++++++++--------------- 1 file changed, 24 insertions(+), 15 deletions(-) diff --git a/src/eth/consensus.rs b/src/eth/consensus.rs index 700fb527b..8c7b4cbe5 100644 --- a/src/eth/consensus.rs +++ b/src/eth/consensus.rs @@ -13,6 +13,7 @@ use serde::Serialize; use tokio::sync::mpsc::Sender; use tokio::sync::mpsc::{self}; use tokio::time::sleep; +use tonic::transport::Channel; use tonic::transport::Server; use tonic::Request; use tonic::Response; @@ -38,10 +39,16 @@ pub struct LogEntry { data: String, } +#[derive(Clone)] +struct Peer { + address: String, + client: RaftServiceClient, +} + pub struct Consensus { pub sender: Sender, - leader_name: String, - //XXX current_index: AtomicU64, + leader_name: String, //XXX check the peers instead of using it + //XXX current_index: AtomicU64, } impl Consensus { @@ -68,7 +75,7 @@ impl Consensus { tracing::info!( "Discovered followers: {}", - followers.iter().map(|f| f.to_string()).collect::>().join(", ") + followers.iter().map(|f| f.address.to_string()).collect::>().join(", ") ); while let Some(data) = receiver.recv().await { @@ -154,7 +161,7 @@ impl Consensus { } #[tracing::instrument(skip_all)] - pub async fn discover_followers() -> Result, anyhow::Error> { + pub async fn discover_followers() -> Result, anyhow::Error> { let client = Client::try_default().await?; let pods: Api = Api::namespaced(client, &Self::current_namespace().unwrap_or("default".to_string())); @@ -166,7 +173,11 @@ impl Consensus { if let Some(pod_name) = p.metadata.name { if pod_name != Self::current_node().unwrap() { if let Some(namespace) = Self::current_namespace() { - followers.push(format!("http://{}.stratus-api.{}.svc.cluster.local:3777", pod_name, namespace)); + let address = format!("http://{}.stratus-api.{}.svc.cluster.local:3777", pod_name, namespace); + let client = RaftServiceClient::connect(address.clone()).await?; + + let peer = Peer { address, client }; + followers.push(peer); } } } @@ -175,12 +186,10 @@ impl Consensus { Ok(followers) } - async fn append_entries(follower: String, entries: Vec) -> Result<(), anyhow::Error> { + async fn append_entries(mut follower: Peer, entries: Vec) -> Result<(), anyhow::Error> { #[cfg(feature = "metrics")] let start = metrics::now(); - let mut client = RaftServiceClient::connect(follower.clone()).await?; - for attempt in 1..=RETRY_ATTEMPTS { let grpc_entries: Vec = entries .iter() @@ -191,23 +200,23 @@ impl Consensus { .collect(); let request = Request::new(AppendEntriesRequest { entries: grpc_entries }); - let response = client.append_entries(request).await; + let response = follower.client.append_entries(request).await; match response { Ok(resp) => if resp.into_inner().success { #[cfg(not(feature = "metrics"))] - tracing::debug!("Entries appended to follower {}: attempt {}: success", follower, attempt); + tracing::debug!("Entries appended to follower {}: attempt {}: success", follower.address, attempt); #[cfg(feature = "metrics")] tracing::debug!( "Entries appended to follower {}: attempt {}: success time_elapsed: {:?}", - follower, + follower.address, attempt, start.elapsed() ); return Ok(()); }, - Err(e) => tracing::error!("Error appending entries to follower {}: attempt {}: {:?}", follower, attempt, e), + Err(e) => tracing::error!("Error appending entries to follower {}: attempt {}: {:?}", follower.address, attempt, e), } sleep(RETRY_DELAY).await; } @@ -215,17 +224,17 @@ impl Consensus { #[cfg(feature = "metrics")] metrics::inc_append_entries(start.elapsed()); - Err(anyhow!("Failed to append entries to {} after {} attempts", follower, RETRY_ATTEMPTS)) + Err(anyhow!("Failed to append entries to {} after {} attempts", follower.address, RETRY_ATTEMPTS)) } #[tracing::instrument(skip_all)] - pub async fn append_entries_to_followers(entries: Vec, followers: Vec) -> Result<(), anyhow::Error> { + pub async fn append_entries_to_followers(entries: Vec, followers: Vec) -> Result<(), anyhow::Error> { #[cfg(feature = "metrics")] let start = metrics::now(); for entry in entries { for follower in &followers { if let Err(e) = Self::append_entries(follower.clone(), vec![entry.clone()]).await { - tracing::debug!("Error appending entry to follower {}: {:?}", follower, e); + tracing::debug!("Error appending entry to follower {}: {:?}", follower.address, e); } } } From afde5a723e2a4033ab99ef8e4a4ea15f6ddb00bb Mon Sep 17 00:00:00 2001 From: carneiro-cw <156914855+carneiro-cw@users.noreply.github.com> Date: Mon, 27 May 2024 18:40:42 -0300 Subject: [PATCH 04/11] fix: preload_block_number (#933) --- e2e/test/automine/e2e-json-rpc.test.ts | 6 +++++- src/eth/storage/rocks/rocks_state.rs | 7 +++---- src/eth/storage/rocks/types.rs | 6 ++++++ 3 files changed, 14 insertions(+), 5 deletions(-) diff --git a/e2e/test/automine/e2e-json-rpc.test.ts b/e2e/test/automine/e2e-json-rpc.test.ts index d35558db0..2422648d8 100644 --- a/e2e/test/automine/e2e-json-rpc.test.ts +++ b/e2e/test/automine/e2e-json-rpc.test.ts @@ -16,6 +16,7 @@ import { sendReset, subscribeAndGetEvent, subscribeAndGetEventWithContract, + ONE, } from "../helpers/rpc"; describe("JSON-RPC", () => { @@ -66,7 +67,7 @@ describe("JSON-RPC", () => { let gas = await send("eth_estimateGas", [tx]); expect(gas).match(HEX_PATTERN, "format"); - const gasDec = parseInt(gas, 16); + const gasDec = parseInt(gas, 16); expect(gasDec).to.be.greaterThan(0).and.lessThan(1_000_000); }); }); @@ -85,6 +86,9 @@ describe("JSON-RPC", () => { describe("Block", () => { it("eth_blockNumber", async function () { (await sendExpect("eth_blockNumber")).eq(ZERO); + await sendEvmMine(); + (await sendExpect("eth_blockNumber")).eq(ONE); + await sendReset(); }); it("eth_getBlockByNumber", async function () { let block: Block = await send("eth_getBlockByNumber", [ZERO, true]); diff --git a/src/eth/storage/rocks/rocks_state.rs b/src/eth/storage/rocks/rocks_state.rs index 794e24484..d1c0a3e3b 100644 --- a/src/eth/storage/rocks/rocks_state.rs +++ b/src/eth/storage/rocks/rocks_state.rs @@ -7,7 +7,6 @@ use anyhow::anyhow; use anyhow::Context; use futures::future::join_all; use itertools::Itertools; -use num_traits::cast::ToPrimitive; use tokio::sync::mpsc; use tokio::task; use tokio::task::JoinHandle; @@ -110,9 +109,9 @@ impl RocksStorageState { } pub fn preload_block_number(&self) -> anyhow::Result { - let account_block_number = self.accounts.get_current_block_number(); - - Ok((account_block_number.to_u64().unwrap_or(0u64)).into()) + let block_number = self.blocks_by_number.last().map(|(num, _)| num).unwrap_or_default(); + tracing::error!(?block_number); + Ok((u64::from(block_number)).into()) } pub async fn sync_data(&self) -> anyhow::Result<()> { diff --git a/src/eth/storage/rocks/types.rs b/src/eth/storage/rocks/types.rs index ea54c75bb..2315ad46d 100644 --- a/src/eth/storage/rocks/types.rs +++ b/src/eth/storage/rocks/types.rs @@ -229,6 +229,12 @@ impl From for BlockNumber { } } +impl From for u64 { + fn from(value: BlockNumberRocksdb) -> Self { + value.0.as_u64() + } +} + #[derive(Clone, Default, Hash, Eq, PartialEq, PartialOrd, Ord, serde::Serialize, serde::Deserialize)] pub struct SlotIndexRocksdb(U256); From 9f0a8c1456faaf7bad6c9f5c3b469e28098625d2 Mon Sep 17 00:00:00 2001 From: carneiro-cw <156914855+carneiro-cw@users.noreply.github.com> Date: Mon, 27 May 2024 18:48:27 -0300 Subject: [PATCH 05/11] chore: add transaction type to TransactionInput (#934) --- src/eth/primitives/transaction_input.rs | 8 +++++++- src/eth/primitives/transaction_mined.rs | 1 + src/eth/storage/postgres_permanent/types.rs | 1 + src/eth/storage/rocks/types.rs | 1 + 4 files changed, 10 insertions(+), 1 deletion(-) diff --git a/src/eth/primitives/transaction_input.rs b/src/eth/primitives/transaction_input.rs index 8388bbf66..aff2990ed 100644 --- a/src/eth/primitives/transaction_input.rs +++ b/src/eth/primitives/transaction_input.rs @@ -34,6 +34,10 @@ use crate::log_and_err; #[derive(DebugAsJson, Clone, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)] pub struct TransactionInput { + /// This is needed for relaying transactions correctly, a transaction sent as Legacy should + /// be relayed using rlp to the legacy format, the same is true for the other possible formats. + /// Otherwise we'd need to re-sign the transactions to always encode in the same format. + pub tx_type: Option, /// TODO: Optional for external/older transactions, but it should be required for newer transactions. /// /// Maybe TransactionInput should be split into two structs for representing these two different requirements. @@ -72,6 +76,7 @@ impl TransactionInput { impl Dummy for TransactionInput { fn dummy_with_rng(faker: &Faker, rng: &mut R) -> Self { Self { + tx_type: Some(rng.next_u64().into()), chain_id: faker.fake_with_rng(rng), hash: faker.fake_with_rng(rng), nonce: faker.fake_with_rng(rng), @@ -141,6 +146,7 @@ fn try_from_ethers_transaction(value: EthersTransaction, compute_signer: bool) - }; Ok(TransactionInput { + tx_type: value.transaction_type, chain_id: match value.chain_id { Some(chain_id) => Some(chain_id.try_into()?), None => None, @@ -179,10 +185,10 @@ impl From for EthersTransaction { v: value.v, r: value.r, s: value.s, + transaction_type: value.tx_type, block_hash: None, block_number: None, transaction_index: None, - transaction_type: None, access_list: None, max_priority_fee_per_gas: None, max_fee_per_gas: None, diff --git a/src/eth/primitives/transaction_mined.rs b/src/eth/primitives/transaction_mined.rs index e70cecf3e..c29f02ead 100644 --- a/src/eth/primitives/transaction_mined.rs +++ b/src/eth/primitives/transaction_mined.rs @@ -99,6 +99,7 @@ impl From for EthersTransaction { v: input.v, r: input.r, s: input.s, + transaction_type: input.tx_type, ..Default::default() } } diff --git a/src/eth/storage/postgres_permanent/types.rs b/src/eth/storage/postgres_permanent/types.rs index 06993a137..171062515 100644 --- a/src/eth/storage/postgres_permanent/types.rs +++ b/src/eth/storage/postgres_permanent/types.rs @@ -78,6 +78,7 @@ impl PostgresTransaction { r: self.r.into(), s: self.s.into(), value: self.value, + tx_type: None, }; TransactionMined { transaction_index: self.idx_in_block, diff --git a/src/eth/storage/rocks/types.rs b/src/eth/storage/rocks/types.rs index 2315ad46d..e3712e7ce 100644 --- a/src/eth/storage/rocks/types.rs +++ b/src/eth/storage/rocks/types.rs @@ -488,6 +488,7 @@ impl From for TransactionInput { v: item.v, r: item.r, s: item.s, + tx_type: None, } } } From 010e11d258488d35890ee02d72d955e386f5911f Mon Sep 17 00:00:00 2001 From: Renato Dinhani <101204870+dinhani-cw@users.noreply.github.com> Date: Mon, 27 May 2024 19:19:30 -0300 Subject: [PATCH 06/11] feat: reconnect websocket client on failure (#932) --- .../blockchain_client/blockchain_client.rs | 88 ++++++++++++++----- 1 file changed, 66 insertions(+), 22 deletions(-) diff --git a/src/infra/blockchain_client/blockchain_client.rs b/src/infra/blockchain_client/blockchain_client.rs index eb5494f47..3da94ffb5 100644 --- a/src/infra/blockchain_client/blockchain_client.rs +++ b/src/infra/blockchain_client/blockchain_client.rs @@ -6,11 +6,14 @@ use ethers_core::types::Transaction; use jsonrpsee::core::client::ClientT; use jsonrpsee::core::client::Subscription; use jsonrpsee::core::client::SubscriptionClientT; +use jsonrpsee::core::ClientError; use jsonrpsee::http_client::HttpClient; use jsonrpsee::http_client::HttpClientBuilder; use jsonrpsee::ws_client::WsClient; use jsonrpsee::ws_client::WsClientBuilder; use serde_json::Value as JsonValue; +use tokio::sync::RwLock; +use tokio::sync::RwLockReadGuard; use super::pending_transaction::PendingTransaction; use crate::eth::primitives::Address; @@ -22,14 +25,15 @@ use crate::eth::primitives::SlotIndex; use crate::eth::primitives::SlotValue; use crate::eth::primitives::StoragePointInTime; use crate::eth::primitives::Wei; +use crate::ext::DisplayExt; use crate::log_and_err; #[derive(Debug)] pub struct BlockchainClient { http: HttpClient, - pub http_url: String, - ws: Option, - pub ws_url: Option, + ws: Option>, + ws_url: Option, + timeout: Duration, } impl BlockchainClient { @@ -43,7 +47,7 @@ impl BlockchainClient { tracing::info!(%http_url, "starting blockchain client"); // build http provider - let http = match HttpClientBuilder::default().request_timeout(timeout).build(http_url) { + let http = match Self::build_http_client(http_url, timeout) { Ok(http) => http, Err(e) => { tracing::error!(reason = ?e, url = %http_url, "failed to create blockchain http client"); @@ -53,8 +57,8 @@ impl BlockchainClient { // build ws provider let (ws, ws_url) = if let Some(ws_url) = ws_url { - match WsClientBuilder::new().connection_timeout(timeout).build(ws_url).await { - Ok(ws) => (Some(ws), Some(ws_url.to_string())), + match Self::build_ws_client(ws_url, timeout).await { + Ok(ws) => (Some(RwLock::new(ws)), Some(ws_url.to_string())), Err(e) => { tracing::error!(reason = ?e, url = %ws_url, "failed to create blockchain websocket client"); return Err(e).context("failed to create blockchain websocket client"); @@ -64,12 +68,7 @@ impl BlockchainClient { (None, None) }; - let client = Self { - http, - http_url: http_url.to_string(), - ws, - ws_url, - }; + let client = Self { http, ws, ws_url, timeout }; // check health before assuming it is ok client.fetch_listening().await?; @@ -77,6 +76,28 @@ impl BlockchainClient { Ok(client) } + fn build_http_client(url: &str, timeout: Duration) -> anyhow::Result { + tracing::info!(%url, timeout = %timeout.to_string_ext(), "creating blockchain http client"); + match HttpClientBuilder::default().request_timeout(timeout).build(url) { + Ok(http) => Ok(http), + Err(e) => { + tracing::error!(reason = ?e, %url, timeout = %timeout.to_string_ext(), "failed to create blockchain http client"); + Err(e).context("failed to create blockchain http client") + } + } + } + + async fn build_ws_client(url: &str, timeout: Duration) -> anyhow::Result { + tracing::info!(%url, timeout = %timeout.to_string_ext(), "creating blockchain websocket client"); + match WsClientBuilder::new().connection_timeout(timeout).build(url).await { + Ok(ws) => Ok(ws), + Err(e) => { + tracing::error!(reason = ?e, %url, timeout = %timeout.to_string_ext(), "failed to create blockchain websocket client"); + Err(e).context("failed to create blockchain websocket client") + } + } + } + // ------------------------------------------------------------------------- // Websocket // ------------------------------------------------------------------------- @@ -87,9 +108,9 @@ impl BlockchainClient { } /// Validates it is connected to websocket and returns a reference to the websocket client. - fn require_ws(&self) -> anyhow::Result<&WsClient> { + async fn require_ws(&self) -> anyhow::Result> { match &self.ws { - Some(ws) => Ok(ws), + Some(ws) => Ok(ws.read().await), None => log_and_err!("blockchain client not connected to websocket"), } } @@ -228,15 +249,38 @@ impl BlockchainClient { pub async fn subscribe_new_heads(&self) -> anyhow::Result> { tracing::debug!("subscribing to newHeads event"); + let ws = self.require_ws().await?; + + let mut first_attempt = true; + loop { + let result = ws + .subscribe::>("eth_subscribe", vec![JsonValue::String("newHeads".to_owned())], "eth_unsubscribe") + .await; + + match result { + // subscribed + Ok(sub) => return Ok(sub), + + // failed and need to reconnect + e @ Err(ClientError::RestartNeeded(_)) => { + // will try to reconnect websocket client only in first attempt + if first_attempt { + tracing::error!(%first_attempt, reason = ?e, "failed to subscribe to newHeads event. trying to reconnect websocket client now."); + } else { + tracing::error!(%first_attempt, reason = ?e, "failed to subscribe to newHeads event. will not try to reconnect websocket client."); + return e.context("failed to subscribe to newHeads event"); + } + first_attempt = false; + + // reconnect websocket client + let new_ws_client = Self::build_ws_client(self.ws_url.as_ref().unwrap(), self.timeout).await?; + let mut current_ws_client = self.ws.as_ref().unwrap().write().await; + let _ = std::mem::replace(&mut *current_ws_client, new_ws_client); + } - let ws = self.require_ws()?; - let result = ws - .subscribe::>("eth_subscribe", vec![JsonValue::String("newHeads".to_owned())], "eth_unsubscribe") - .await; - - match result { - Ok(sub) => Ok(sub), - Err(e) => log_and_err!(reason = e, "failed to subscribe to newHeads event"), + // failed and cannot do anything + Err(e) => return log_and_err!(reason = e, "failed to subscribe to newHeads event"), + } } } } From c34d3b133304647a046dcf138f0d8e5f314cef2a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Marcos?= <164224824+marcospb19-cw@users.noreply.github.com> Date: Mon, 27 May 2024 19:52:42 -0300 Subject: [PATCH 07/11] improve TPS summary + add it to importer-online (#935) * improve precision of TPS summary (importer-offline) * add TPS summary to importer-online --- src/bin/importer_offline.rs | 17 ++++++----------- src/bin/importer_online.rs | 17 ++++++++++++++++- src/utils.rs | 14 ++++++++++++++ 3 files changed, 36 insertions(+), 12 deletions(-) diff --git a/src/bin/importer_offline.rs b/src/bin/importer_offline.rs index 6ead9c00b..cf0cd7b3f 100644 --- a/src/bin/importer_offline.rs +++ b/src/bin/importer_offline.rs @@ -19,6 +19,7 @@ use stratus::eth::storage::InMemoryPermanentStorage; use stratus::eth::storage::StratusStorage; use stratus::eth::BlockMiner; use stratus::eth::Executor; +use stratus::utils::calculate_tps_and_bpm; use stratus::GlobalServices; use stratus::GlobalState; use tokio::runtime::Handle; @@ -175,21 +176,15 @@ async fn execute_block_importer( } } - let seconds_elapsed = match instant_before_execution.elapsed().as_secs() as usize { - // avoid division by zero - 0 => 1, - non_zero => non_zero, - }; - let tps = transaction_count.checked_div(seconds_elapsed).unwrap_or(transaction_count); - let minutes_elapsed = seconds_elapsed as f64 / 60.0; - let blocks_per_minute = blocks_len as f64 / minutes_elapsed; + let duration = instant_before_execution.elapsed(); + let (tps, bpm) = calculate_tps_and_bpm(duration, transaction_count, blocks_len); + tracing::info!( tps, - blocks_per_minute = format_args!("{blocks_per_minute:.2}"), - seconds_elapsed, + blocks_per_minute = format_args!("{bpm:.2}"), + ?duration, %block_start, %block_end, - transaction_count, receipts = receipts.len(), "reexecuted blocks batch", ); diff --git a/src/bin/importer_online.rs b/src/bin/importer_online.rs index 9b3c095db..2a816772b 100644 --- a/src/bin/importer_online.rs +++ b/src/bin/importer_online.rs @@ -22,12 +22,14 @@ use stratus::infra::metrics; use stratus::infra::tracing::warn_task_rx_closed; use stratus::infra::tracing::warn_task_tx_closed; use stratus::infra::BlockchainClient; +use stratus::utils::calculate_tps; use stratus::GlobalServices; use stratus::GlobalState; use tokio::sync::mpsc; use tokio::task::yield_now; use tokio::time::sleep; use tokio::time::timeout; +use tokio::time::Instant; // ----------------------------------------------------------------------------- // Globals @@ -134,11 +136,24 @@ async fn start_block_executor(executor: Arc, miner: Arc, m // execute and mine let receipts = ExternalReceipts::from(receipts); - tracing::info!(number = %block.number(), txs_len = block.transactions.len(), "reexecuting external block"); + let instant_before_execution = Instant::now(); + if executor.reexecute_external(&block, &receipts).await.is_err() { GlobalState::shutdown_from(TASK_NAME, "failed to re-execute external block"); return; }; + + let duration = instant_before_execution.elapsed(); + let tps = calculate_tps(duration, block.transactions.len()); + + tracing::info!( + tps, + ?duration, + block_number = ?block.number(), + receipts = receipts.len(), + "reexecuted external block", + ); + if miner.mine_external_mixed_and_commit().await.is_err() { GlobalState::shutdown_from(TASK_NAME, "failed to mine external block"); return; diff --git a/src/utils.rs b/src/utils.rs index db5c5d3dd..36a5c1bf9 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + use tokio::select; use tokio::signal::unix::signal; use tokio::signal::unix::SignalKind; @@ -36,3 +38,15 @@ pub async fn spawn_signal_handler() -> anyhow::Result<()> { Ok(()) } + +pub fn calculate_tps_and_bpm(duration: Duration, transaction_count: usize, block_count: usize) -> (f64, f64) { + let seconds_elapsed = duration.as_secs_f64() + f64::EPSILON; + let tps = transaction_count as f64 / seconds_elapsed; + let blocks_per_minute = block_count as f64 / (seconds_elapsed / 60.0); + (tps, blocks_per_minute) +} + +pub fn calculate_tps(duration: Duration, transaction_count: usize) -> f64 { + let seconds_elapsed = duration.as_secs_f64() + f64::EPSILON; + transaction_count as f64 / seconds_elapsed +} From e419bcd3c7999159baea2306c9146f34bcf90fc3 Mon Sep 17 00:00:00 2001 From: Renato Dinhani <101204870+dinhani-cw@users.noreply.github.com> Date: Mon, 27 May 2024 22:42:15 -0300 Subject: [PATCH 08/11] feat: more logs and expect_infallible for serde serialization (#937) --- src/bin/importer_offline.rs | 85 ++++++++++++------- src/bin/importer_online.rs | 76 ++++++++++------- src/config.rs | 16 ++-- src/eth/block_miner.rs | 2 +- src/eth/evm/revm.rs | 2 +- src/eth/executor.rs | 13 +-- src/eth/primitives/block.rs | 5 +- src/eth/primitives/log_mined.rs | 3 +- src/eth/primitives/transaction_mined.rs | 5 +- src/eth/rpc/rpc_server.rs | 11 +-- .../storage/inmemory/inmemory_permanent.rs | 4 +- .../postgres_external_rpc.rs | 7 +- .../postgres_permanent/postgres_permanent.rs | 4 +- src/eth/storage/rocks/rocks_state.rs | 2 +- src/eth/transaction_relayer.rs | 2 +- src/ext.rs | 23 ++++- .../blockchain_client/blockchain_client.rs | 50 +++++------ src/infra/docker.rs | 4 +- src/infra/metrics/metrics_init.rs | 4 +- src/infra/tracing.rs | 14 ++- 20 files changed, 193 insertions(+), 139 deletions(-) diff --git a/src/bin/importer_offline.rs b/src/bin/importer_offline.rs index cf0cd7b3f..200cd7dcd 100644 --- a/src/bin/importer_offline.rs +++ b/src/bin/importer_offline.rs @@ -19,6 +19,8 @@ use stratus::eth::storage::InMemoryPermanentStorage; use stratus::eth::storage::StratusStorage; use stratus::eth::BlockMiner; use stratus::eth::Executor; +use stratus::ext::ResultExt; +use stratus::log_and_err; use stratus::utils::calculate_tps_and_bpm; use stratus::GlobalServices; use stratus::GlobalState; @@ -80,34 +82,44 @@ async fn run(config: ImporterOfflineConfig) -> anyhow::Result<()> { // execute thread: external rpc storage loader let storage_thread = thread::Builder::new().name("storage-loader".into()); let storage_tokio = Handle::current(); - let _ = storage_thread.spawn(move || { - let _tokio_guard = storage_tokio.enter(); - - let result = storage_tokio.block_on(execute_external_rpc_storage_loader( - rpc_storage, - config.blocks_by_fetch, - config.paralellism, - block_start, - block_end, - backlog_tx, - )); - if let Err(e) = result { - tracing::error!(reason = ?e, "storage-loader failed"); - } - }); + let storage_loader_thread = storage_thread + .spawn(move || { + let _tokio_guard = storage_tokio.enter(); + + let result = storage_tokio.block_on(execute_external_rpc_storage_loader( + rpc_storage, + config.blocks_by_fetch, + config.paralellism, + block_start, + block_end, + backlog_tx, + )); + if let Err(e) = result { + tracing::error!(reason = ?e, "storage-loader failed"); + } + }) + .expect("spawning storage-loader thread should not fail"); // execute thread: block importer let importer_thread = thread::Builder::new().name("block-importer".into()); let importer_tokio = Handle::current(); - let importer_join = importer_thread.spawn(move || { - let _tokio_guard = importer_tokio.enter(); - let result = importer_tokio.block_on(execute_block_importer(executor, miner, storage, csv, backlog_rx, block_snapshots)); - if let Err(e) = result { - tracing::error!(reason = ?e, "block-importer failed"); - } - })?; + let block_importer_thread = importer_thread + .spawn(move || { + let _tokio_guard = importer_tokio.enter(); + let result = importer_tokio.block_on(execute_block_importer(executor, miner, storage, csv, backlog_rx, block_snapshots)); + if let Err(e) = result { + tracing::error!(reason = ?e, "block-importer failed"); + } + }) + .expect("spawning block-importer thread should not fail"); - let _ = importer_join.join(); + // await tasks + if let Err(e) = block_importer_thread.join() { + tracing::error!(reason = ?e, "block-importer thread failed"); + } + if let Err(e) = storage_loader_thread.join() { + tracing::error!(reason = ?e, "storage-loader thread failed"); + } Ok(()) } @@ -127,7 +139,7 @@ async fn execute_block_importer( blocks_to_export_snapshot: Vec, ) -> anyhow::Result<()> { const TASK_NAME: &str = "external-block-executor"; - tracing::info!("starting {}", TASK_NAME); + tracing::info!("creating task {}", TASK_NAME); // receives blocks and receipts from the backlog to reexecute and import loop { @@ -206,7 +218,7 @@ async fn execute_external_rpc_storage_loader( backlog: mpsc::Sender, ) -> anyhow::Result<()> { const TASK_NAME: &str = "external-block-loader"; - tracing::info!(%start, %end, "starting {}", TASK_NAME); + tracing::info!(%start, %end, "creating task {}", TASK_NAME); // prepare loads to be executed in parallel let mut tasks = Vec::new(); @@ -234,13 +246,18 @@ async fn execute_external_rpc_storage_loader( }; // check if executed correctly - let Ok((blocks, receipts)) = result else { - return Err(anyhow!(GlobalState::shutdown_from(TASK_NAME, "failed to fetch block or receipt"))); + let (blocks, receipts) = match result { + Ok((blocks, receipts)) => (blocks, receipts), + Err(e) => { + let message = GlobalState::shutdown_from(TASK_NAME, "failed to fetch block or receipt"); + return log_and_err!(reason = e, message); + } }; // check blocks were really loaded if blocks.is_empty() { - return Err(anyhow!(GlobalState::shutdown_from(TASK_NAME, "no blocks returned when they were expected"))); + let message = GlobalState::shutdown_from(TASK_NAME, "no blocks returned when they were expected"); + return log_and_err!(message); } // send to backlog @@ -279,9 +296,15 @@ fn export_snapshot(external_block: &ExternalBlock, external_receipts: &ExternalR fs::create_dir_all(&dir)?; // write json - fs::write(format!("{}/block.json", dir), serde_json::to_string_pretty(external_block)?)?; - fs::write(format!("{}/receipts.json", dir), serde_json::to_string_pretty(&receipts_snapshot)?)?; - fs::write(format!("{}/snapshot.json", dir), serde_json::to_string_pretty(&state_snapshot)?)?; + fs::write(format!("{}/block.json", dir), serde_json::to_string_pretty(external_block).expect_infallible())?; + fs::write( + format!("{}/receipts.json", dir), + serde_json::to_string_pretty(&receipts_snapshot).expect_infallible(), + )?; + fs::write( + format!("{}/snapshot.json", dir), + serde_json::to_string_pretty(&state_snapshot).expect_infallible(), + )?; Ok(()) } diff --git a/src/bin/importer_online.rs b/src/bin/importer_online.rs index 2a816772b..05eb93403 100644 --- a/src/bin/importer_online.rs +++ b/src/bin/importer_online.rs @@ -22,6 +22,7 @@ use stratus::infra::metrics; use stratus::infra::tracing::warn_task_rx_closed; use stratus::infra::tracing::warn_task_tx_closed; use stratus::infra::BlockchainClient; +use stratus::log_and_err; use stratus::utils::calculate_tps; use stratus::GlobalServices; use stratus::GlobalState; @@ -29,7 +30,6 @@ use tokio::sync::mpsc; use tokio::task::yield_now; use tokio::time::sleep; use tokio::time::timeout; -use tokio::time::Instant; // ----------------------------------------------------------------------------- // Globals @@ -113,7 +113,9 @@ pub async fn run_importer_online( let task_block_fetcher = tokio::spawn(start_block_fetcher(block_fetcher_chain, backlog_tx, number)); // await all tasks - try_join!(task_executor, task_block_fetcher, task_number_fetcher)?; + if let Err(e) = try_join!(task_executor, task_block_fetcher, task_number_fetcher) { + tracing::error!(reason = ?e, "importer-online failed"); + } Ok(()) } @@ -122,12 +124,16 @@ pub async fn run_importer_online( // ----------------------------------------------------------------------------- // Executes external blocks and persist them to storage. -async fn start_block_executor(executor: Arc, miner: Arc, mut backlog_rx: mpsc::UnboundedReceiver<(ExternalBlock, Vec)>) { +async fn start_block_executor( + executor: Arc, + miner: Arc, + mut backlog_rx: mpsc::UnboundedReceiver<(ExternalBlock, Vec)>, +) -> anyhow::Result<()> { const TASK_NAME: &str = "block-executor"; while let Some((block, receipts)) = backlog_rx.recv().await { if GlobalState::warn_if_shutdown(TASK_NAME) { - return; + return Ok(()); } #[cfg(feature = "metrics")] @@ -135,28 +141,29 @@ async fn start_block_executor(executor: Arc, miner: Arc, m // execute and mine let receipts = ExternalReceipts::from(receipts); - - let instant_before_execution = Instant::now(); - - if executor.reexecute_external(&block, &receipts).await.is_err() { - GlobalState::shutdown_from(TASK_NAME, "failed to re-execute external block"); - return; + if let Err(e) = executor.reexecute_external(&block, &receipts).await { + let message = GlobalState::shutdown_from(TASK_NAME, "failed to re-execute external block"); + return log_and_err!(reason = e, message); }; - let duration = instant_before_execution.elapsed(); - let tps = calculate_tps(duration, block.transactions.len()); - - tracing::info!( - tps, - ?duration, - block_number = ?block.number(), - receipts = receipts.len(), - "reexecuted external block", - ); + // statistics + #[cfg(feature = "metrics")] + { + let duration = start.elapsed(); + let tps = calculate_tps(duration, block.transactions.len()); + + tracing::info!( + tps, + duraton = %duration.to_string_ext(), + block_number = ?block.number(), + receipts = receipts.len(), + "reexecuted external block", + ); + } - if miner.mine_external_mixed_and_commit().await.is_err() { - GlobalState::shutdown_from(TASK_NAME, "failed to mine external block"); - return; + if let Err(e) = miner.mine_external_mixed_and_commit().await { + let message = GlobalState::shutdown_from(TASK_NAME, "failed to mine external block"); + return log_and_err!(reason = e, message); }; #[cfg(feature = "metrics")] @@ -167,6 +174,7 @@ async fn start_block_executor(executor: Arc, miner: Arc, m } warn_task_tx_closed(TASK_NAME); + Ok(()) } // ----------------------------------------------------------------------------- @@ -174,7 +182,7 @@ async fn start_block_executor(executor: Arc, miner: Arc, m // ----------------------------------------------------------------------------- /// Retrieves the blockchain current block number. -async fn start_number_fetcher(chain: Arc, sync_interval: Duration) { +async fn start_number_fetcher(chain: Arc, sync_interval: Duration) -> anyhow::Result<()> { const TASK_NAME: &str = "external-number-fetcher"; // subscribe to newHeads event if WS is enabled @@ -183,9 +191,9 @@ async fn start_number_fetcher(chain: Arc, sync_interval: Durat tracing::info!("subscribing {} to newHeads event", TASK_NAME); match chain.subscribe_new_heads().await { Ok(sub) => Some(sub), - Err(_) => { - GlobalState::shutdown_from(TASK_NAME, "cannot subscribe to newHeads event"); - return; + Err(e) => { + let message = GlobalState::shutdown_from(TASK_NAME, "cannot subscribe to newHeads event"); + return log_and_err!(reason = e, message); } } } @@ -197,7 +205,7 @@ async fn start_number_fetcher(chain: Arc, sync_interval: Durat loop { if GlobalState::warn_if_shutdown(TASK_NAME) { - return; + return Ok(()); } // if we have a subscription, try to read from subscription. @@ -238,7 +246,7 @@ async fn start_number_fetcher(chain: Arc, sync_interval: Durat } // fallback to polling - tracing::warn!("number-fetcher falling back to http polling because subscription failed or it not enabled"); + tracing::warn!("number-fetcher falling back to http polling because subscription failed or it is not enabled"); match chain.fetch_block_number().await { Ok(number) => { tracing::info!( @@ -261,12 +269,16 @@ async fn start_number_fetcher(chain: Arc, sync_interval: Durat // ----------------------------------------------------------------------------- /// Retrieves blocks and receipts. -async fn start_block_fetcher(chain: Arc, backlog_tx: mpsc::UnboundedSender<(ExternalBlock, Vec)>, mut number: BlockNumber) { +async fn start_block_fetcher( + chain: Arc, + backlog_tx: mpsc::UnboundedSender<(ExternalBlock, Vec)>, + mut number: BlockNumber, +) -> anyhow::Result<()> { const TASK_NAME: &str = "external-block-fetcher"; loop { if GlobalState::warn_if_shutdown(TASK_NAME) { - return; + return Ok(()); } // if we are ahead of current block number, await until we are behind again @@ -292,7 +304,7 @@ async fn start_block_fetcher(chain: Arc, backlog_tx: mpsc::Unb while let Some((block, receipts)) = tasks.next().await { if backlog_tx.send((block, receipts)).is_err() { warn_task_rx_closed(TASK_NAME); - return; + return Ok(()); } } } diff --git a/src/config.rs b/src/config.rs index 259cf0699..f6817dc90 100644 --- a/src/config.rs +++ b/src/config.rs @@ -103,7 +103,7 @@ impl CommonConfig { /// Initializes Tokio runtime. pub fn init_runtime(&self) -> Runtime { print!( - "starting tokio runtime; async_threads={}; blocking_threads={}", + "creating tokio runtime; async_threads={}; blocking_threads={}", self.num_async_threads, self.num_blocking_threads ); @@ -114,7 +114,7 @@ impl CommonConfig { .max_blocking_threads(self.num_blocking_threads) .thread_keep_alive(Duration::from_secs(u64::MAX)) .build() - .expect("failed to start tokio runtime"); + .expect("failed to create tokio runtime"); runtime } @@ -174,7 +174,7 @@ impl ExecutorConfig { const TASK_NAME: &str = "evm-thread"; let num_evms = max(self.num_evms, 1); - tracing::info!(config = ?self, "starting executor"); + tracing::info!(config = ?self, "creating executor"); // spawn evm in background using native threads let (evm_tx, evm_rx) = crossbeam_channel::unbounded::(); @@ -257,7 +257,7 @@ impl MinerConfig { } async fn init_with_mode(&self, mode: BlockMinerMode, storage: Arc, consensus: Option>) -> anyhow::Result> { - tracing::info!(config = ?self, "starting block miner"); + tracing::info!(config = ?self, "creating block miner"); // create miner @@ -309,7 +309,7 @@ pub struct RelayerConfig { impl RelayerConfig { pub async fn init(&self, storage: Arc) -> anyhow::Result>> { - tracing::info!(config = ?self, "starting transaction relayer"); + tracing::info!(config = ?self, "creating transaction relayer"); match self.forward_to { Some(ref forward_to) => { @@ -635,7 +635,7 @@ pub enum ExternalRpcStorageKind { impl ExternalRpcStorageConfig { /// Initializes external rpc storage implementation. pub async fn init(&self) -> anyhow::Result> { - tracing::info!(config = ?self, "starting external rpc storage"); + tracing::info!(config = ?self, "creating external rpc storage"); match self.external_rpc_storage_kind { ExternalRpcStorageKind::Postgres { ref url } => { @@ -681,7 +681,7 @@ pub enum TemporaryStorageKind { impl TemporaryStorageConfig { /// Initializes temporary storage implementation. pub async fn init(&self) -> anyhow::Result> { - tracing::info!(config = ?self, "starting temporary storage"); + tracing::info!(config = ?self, "creating temporary storage"); match self.temp_storage_kind { TemporaryStorageKind::InMemory => Ok(Arc::new(InMemoryTemporaryStorage::default())), @@ -733,7 +733,7 @@ pub enum PermanentStorageKind { impl PermanentStorageConfig { /// Initializes permanent storage implementation. pub async fn init(&self) -> anyhow::Result> { - tracing::info!(config = ?self, "starting permanent storage"); + tracing::info!(config = ?self, "creating permanent storage"); let perm: Arc = match self.perm_storage_kind { PermanentStorageKind::InMemory => Arc::new(InMemoryPermanentStorage::default()), diff --git a/src/eth/block_miner.rs b/src/eth/block_miner.rs index 1bd7686fc..de67fb275 100644 --- a/src/eth/block_miner.rs +++ b/src/eth/block_miner.rs @@ -51,7 +51,7 @@ pub struct BlockMiner { impl BlockMiner { /// Creates a new [`BlockMiner`]. pub fn new(storage: Arc, mode: BlockMinerMode, consensus: Option>) -> Self { - tracing::info!(?mode, "starting block miner"); + tracing::info!(?mode, "creating block miner"); Self { storage, mode, diff --git a/src/eth/evm/revm.rs b/src/eth/evm/revm.rs index 392fea4fc..308169435 100644 --- a/src/eth/evm/revm.rs +++ b/src/eth/evm/revm.rs @@ -63,7 +63,7 @@ impl Revm { /// Creates a new instance of the Revm ready to be used. #[allow(clippy::arc_with_non_send_sync)] pub fn new(storage: Arc, config: EvmConfig) -> Self { - tracing::info!(?config, "starting revm"); + tracing::info!(?config, "creating revm"); // configure handler let mut handler = Handler::mainnet_with_spec(SpecId::LONDON); diff --git a/src/eth/executor.rs b/src/eth/executor.rs index 4c92c20c0..6e93c0a02 100644 --- a/src/eth/executor.rs +++ b/src/eth/executor.rs @@ -37,6 +37,7 @@ use crate::eth::storage::StorageError; use crate::eth::storage::StratusStorage; use crate::eth::BlockMiner; use crate::eth::TransactionRelayer; +use crate::ext::ResultExt; #[cfg(feature = "metrics")] use crate::infra::metrics; use crate::infra::BlockchainClient; @@ -72,7 +73,7 @@ impl Executor { num_evms: usize, consensus: Option>, ) -> Self { - tracing::info!(%num_evms, "starting executor"); + tracing::info!(%num_evms, "creating executor"); Self { evm_tx, @@ -209,8 +210,8 @@ impl Executor { let mut evm_result = match evm_result { Ok(inner) => inner, Err(e) => { - let json_tx = serde_json::to_string(&tx).unwrap(); - let json_receipt = serde_json::to_string(&receipt).unwrap(); + let json_tx = serde_json::to_string(&tx).expect_infallible(); + let json_receipt = serde_json::to_string(&receipt).expect_infallible(); tracing::error!(reason = ?e, %json_tx, %json_receipt, "unexpected error reexecuting transaction"); return Err(e); } @@ -228,9 +229,9 @@ impl Executor { // ensure it matches receipt before saving if let Err(e) = evm_result.execution.compare_with_receipt(receipt) { - let json_tx = serde_json::to_string(&tx).unwrap(); - let json_receipt = serde_json::to_string(&receipt).unwrap(); - let json_execution_logs = serde_json::to_string(&evm_result.execution.logs).unwrap(); + let json_tx = serde_json::to_string(&tx).expect_infallible(); + let json_receipt = serde_json::to_string(&receipt).expect_infallible(); + let json_execution_logs = serde_json::to_string(&evm_result.execution.logs).expect_infallible(); tracing::error!(%json_tx, %json_receipt, %json_execution_logs, "mismatch reexecuting transaction"); return Err(e); }; diff --git a/src/eth/primitives/block.rs b/src/eth/primitives/block.rs index 3be290bf7..cd8b78ae1 100644 --- a/src/eth/primitives/block.rs +++ b/src/eth/primitives/block.rs @@ -26,6 +26,7 @@ use crate::eth::primitives::ExecutionAccountChanges; use crate::eth::primitives::Hash; use crate::eth::primitives::TransactionMined; use crate::eth::primitives::UnixTime; +use crate::ext::ResultExt; #[derive(Debug, Clone, PartialEq, Eq, fake::Dummy, serde::Serialize, serde::Deserialize)] pub struct Block { @@ -112,13 +113,13 @@ impl Block { /// Serializes itself to JSON-RPC block format with full transactions included. pub fn to_json_rpc_with_full_transactions(self) -> JsonValue { let json_rpc_format: EthersBlock = self.into(); - serde_json::to_value(json_rpc_format).unwrap() + serde_json::to_value(json_rpc_format).expect_infallible() } /// Serializes itself to JSON-RPC block format with only transactions hashes included. pub fn to_json_rpc_with_transactions_hashes(self) -> JsonValue { let json_rpc_format: EthersBlock = self.into(); - serde_json::to_value(json_rpc_format).unwrap() + serde_json::to_value(json_rpc_format).expect_infallible() } /// Returns the block number. diff --git a/src/eth/primitives/log_mined.rs b/src/eth/primitives/log_mined.rs index 198e42891..f03515824 100644 --- a/src/eth/primitives/log_mined.rs +++ b/src/eth/primitives/log_mined.rs @@ -17,6 +17,7 @@ use crate::eth::primitives::Hash; use crate::eth::primitives::Index; use crate::eth::primitives::Log; use crate::eth::primitives::LogTopic; +use crate::ext::ResultExt; /// Log that was emitted by the EVM and added to a block. #[derive(Debug, Clone, PartialEq, Eq, fake::Dummy, serde::Serialize, serde::Deserialize)] @@ -54,7 +55,7 @@ impl LogMined { /// Serializes itself to JSON-RPC log format. pub fn to_json_rpc_log(self) -> JsonValue { let json_rpc_format: EthersLog = self.into(); - serde_json::to_value(json_rpc_format).unwrap() + serde_json::to_value(json_rpc_format).expect_infallible() } } diff --git a/src/eth/primitives/transaction_mined.rs b/src/eth/primitives/transaction_mined.rs index c29f02ead..3832e8c03 100644 --- a/src/eth/primitives/transaction_mined.rs +++ b/src/eth/primitives/transaction_mined.rs @@ -20,6 +20,7 @@ use crate::eth::primitives::Index; use crate::eth::primitives::LogMined; use crate::eth::primitives::TransactionInput; use crate::ext::OptionExt; +use crate::ext::ResultExt; use crate::if_else; /// Transaction that was executed by the EVM and added to a block. @@ -67,13 +68,13 @@ impl TransactionMined { /// Serializes itself to JSON-RPC transaction format. pub fn to_json_rpc_transaction(self) -> JsonValue { let json_rpc_format: EthersTransaction = self.into(); - serde_json::to_value(json_rpc_format).unwrap() + serde_json::to_value(json_rpc_format).expect_infallible() } /// Serializes itself to JSON-RPC receipt format. pub fn to_json_rpc_receipt(self) -> JsonValue { let json_rpc_format: EthersReceipt = self.into(); - serde_json::to_value(json_rpc_format).unwrap() + serde_json::to_value(json_rpc_format).expect_infallible() } } diff --git a/src/eth/rpc/rpc_server.rs b/src/eth/rpc/rpc_server.rs index a1164b582..45bdceddd 100644 --- a/src/eth/rpc/rpc_server.rs +++ b/src/eth/rpc/rpc_server.rs @@ -42,6 +42,7 @@ use crate::eth::rpc::RpcSubscriptions; use crate::eth::storage::StratusStorage; use crate::eth::BlockMiner; use crate::eth::Executor; +use crate::ext::ResultExt; use crate::infra::tracing::warn_task_cancellation; use crate::GlobalState; @@ -61,7 +62,7 @@ pub async fn serve_rpc( ) -> anyhow::Result<()> { const TASK_NAME: &str = "rpc-server"; - tracing::info!("starting {}", TASK_NAME); + tracing::info!("creating task {}", TASK_NAME); // configure subscriptions let subs = RpcSubscriptions::spawn( @@ -189,13 +190,13 @@ fn register_methods(mut module: RpcModule) -> anyhow::Result, ctx: Arc) -> anyhow::Result { let (_, number) = next_rpc_param::(params.sequence())?; ctx.storage.reset(number).await?; - Ok(serde_json::to_value(number).unwrap()) + Ok(serde_json::to_value(number).expect_infallible()) } #[cfg(feature = "dev")] async fn evm_mine(_params: Params<'_>, ctx: Arc) -> anyhow::Result { ctx.miner.mine_local_and_commit().await?; - Ok(serde_json::to_value(true).unwrap()) + Ok(serde_json::to_value(true).expect_infallible()) } #[cfg(feature = "dev")] @@ -209,7 +210,7 @@ async fn evm_set_next_block_timestamp(params: Params<'_>, ctx: Arc) Some(block) => UnixTime::set_offset(timestamp, block.header.timestamp)?, None => return log_and_err!("reading latest block returned None")?, } - Ok(serde_json::to_value(timestamp).unwrap()) + Ok(serde_json::to_value(timestamp).expect_infallible()) } // Status @@ -256,7 +257,7 @@ async fn eth_gas_price(_: Params<'_>, _: Arc) -> String { #[tracing::instrument(skip_all)] async fn eth_block_number(_params: Params<'_>, ctx: Arc) -> anyhow::Result { let number = ctx.storage.read_mined_block_number().await?; - Ok(serde_json::to_value(number).unwrap()) + Ok(serde_json::to_value(number).expect_infallible()) } #[tracing::instrument(skip_all)] diff --git a/src/eth/storage/inmemory/inmemory_permanent.rs b/src/eth/storage/inmemory/inmemory_permanent.rs index 642abb259..b3dea0624 100644 --- a/src/eth/storage/inmemory/inmemory_permanent.rs +++ b/src/eth/storage/inmemory/inmemory_permanent.rs @@ -102,7 +102,7 @@ impl InMemoryPermanentStorage { /// Creates a new InMemoryPermanentStorage from a snapshot dump. pub fn from_snapshot(state: InMemoryPermanentStorageState) -> Self { - tracing::info!("starting inmemory permanent storage from snapshot"); + tracing::info!("creating inmemory permanent storage from snapshot"); Self { state: RwLock::new(state), block_number: AtomicU64::new(0), @@ -126,7 +126,7 @@ impl InMemoryPermanentStorage { impl Default for InMemoryPermanentStorage { fn default() -> Self { - tracing::info!("starting inmemory permanent storage"); + tracing::info!("creating inmemory permanent storage"); Self { state: RwLock::new(InMemoryPermanentStorageState::default()), block_number: Default::default(), diff --git a/src/eth/storage/postgres_external_rpc/postgres_external_rpc.rs b/src/eth/storage/postgres_external_rpc/postgres_external_rpc.rs index 152ea974b..3471375f7 100644 --- a/src/eth/storage/postgres_external_rpc/postgres_external_rpc.rs +++ b/src/eth/storage/postgres_external_rpc/postgres_external_rpc.rs @@ -16,6 +16,7 @@ use crate::eth::primitives::ExternalReceipt; use crate::eth::primitives::Hash; use crate::eth::primitives::Wei; use crate::eth::storage::ExternalRpcStorage; +use crate::ext::ResultExt; use crate::log_and_err; const MAX_RETRIES: u64 = 50; @@ -34,7 +35,7 @@ pub struct PostgresExternalRpcStorageConfig { impl PostgresExternalRpcStorage { /// Creates a new [`PostgresExternalRpcStorage`]. pub async fn new(config: PostgresExternalRpcStorageConfig) -> anyhow::Result { - tracing::info!(?config, "starting postgres external rpc storage"); + tracing::info!(?config, "creating postgres external rpc storage"); let result = PgPoolOptions::new() .min_connections(config.connections) @@ -45,7 +46,7 @@ impl PostgresExternalRpcStorage { let pool = match result { Ok(pool) => pool, - Err(e) => return log_and_err!(reason = e, "failed to start postgres external rpc storage"), + Err(e) => return log_and_err!(reason = e, "failed to create postgres external rpc storage"), }; Ok(Self { pool }) @@ -201,7 +202,7 @@ impl ExternalRpcStorage for PostgresExternalRpcStorage { // insert receipts for (hash, receipt) in receipts { - let receipt_json = serde_json::to_value(&receipt)?; + let receipt_json = serde_json::to_value(&receipt).expect_infallible(); let result = sqlx::query_file!( "src/eth/storage/postgres_external_rpc/sql/insert_external_receipt.sql", hash.as_ref(), diff --git a/src/eth/storage/postgres_permanent/postgres_permanent.rs b/src/eth/storage/postgres_permanent/postgres_permanent.rs index 930a19824..617e0021d 100644 --- a/src/eth/storage/postgres_permanent/postgres_permanent.rs +++ b/src/eth/storage/postgres_permanent/postgres_permanent.rs @@ -68,7 +68,7 @@ impl Drop for PostgresPermanentStorage { impl PostgresPermanentStorage { /// Creates a new [`PostgresPermanentStorage`]. pub async fn new(config: PostgresPermanentStorageConfig) -> anyhow::Result { - tracing::info!(?config, "starting postgres permanent storage"); + tracing::info!(?config, "creating postgres permanent storage"); let result = PgPoolOptions::new() .min_connections(config.connections) @@ -79,7 +79,7 @@ impl PostgresPermanentStorage { let pool = match result { Ok(pool) => pool.clone(), - Err(e) => return log_and_err!(reason = e, "failed to start postgres permanent storage"), + Err(e) => return log_and_err!(reason = e, "failed to create postgres permanent storage"), }; let storage = Self { diff --git a/src/eth/storage/rocks/rocks_state.rs b/src/eth/storage/rocks/rocks_state.rs index d1c0a3e3b..3bd019252 100644 --- a/src/eth/storage/rocks/rocks_state.rs +++ b/src/eth/storage/rocks/rocks_state.rs @@ -80,7 +80,7 @@ impl RocksStorageState { } pub fn listen_for_backup_trigger(&self, rx: mpsc::Receiver<()>) -> anyhow::Result<()> { - tracing::info!("starting backup trigger listener"); + tracing::info!("creating backup trigger listener"); let accounts = Arc::>::clone(&self.accounts); let accounts_history = Arc::>::clone(&self.accounts_history); let account_slots = Arc::>::clone(&self.account_slots); diff --git a/src/eth/transaction_relayer.rs b/src/eth/transaction_relayer.rs index d5e9a5fc7..a4f438be8 100644 --- a/src/eth/transaction_relayer.rs +++ b/src/eth/transaction_relayer.rs @@ -21,7 +21,7 @@ pub struct TransactionRelayer { impl TransactionRelayer { /// Creates a new [`TransactionRelayer`]. pub fn new(storage: Arc, chain: BlockchainClient) -> Self { - tracing::info!(?chain, "starting transaction relayer"); + tracing::info!(?chain, "creating transaction relayer"); Self { storage, chain } } diff --git a/src/ext.rs b/src/ext.rs index ef238c7d7..c1af69ef6 100644 --- a/src/ext.rs +++ b/src/ext.rs @@ -44,8 +44,10 @@ macro_rules! gen_test_serde { paste::paste! { #[test] pub fn []() { + use $crate::ext::ResultExt; + let value = ::fake::<$type>(&fake::Faker); - let json = serde_json::to_string(&value).unwrap(); + let json = serde_json::to_string(&value).expect_infallible(); assert_eq!(serde_json::from_str::<$type>(&json).unwrap(), value); } } @@ -122,6 +124,25 @@ pub fn parse_duration(s: &str) -> anyhow::Result { Err(anyhow!("invalid duration format: {}", s)) } +// ----------------------------------------------------------------------------- +// Result +// ----------------------------------------------------------------------------- + +/// Extensions for `Result`. +pub trait ResultExt { + /// Unwraps a result informing that this operation is expected to be infallible. + fn expect_infallible(self) -> T; +} + +impl ResultExt for Result +where + T: Sized, +{ + fn expect_infallible(self) -> T { + self.expect("serialization should be infallible") + } +} + // ----------------------------------------------------------------------------- // Option // ----------------------------------------------------------------------------- diff --git a/src/infra/blockchain_client/blockchain_client.rs b/src/infra/blockchain_client/blockchain_client.rs index 3da94ffb5..270e73b38 100644 --- a/src/infra/blockchain_client/blockchain_client.rs +++ b/src/infra/blockchain_client/blockchain_client.rs @@ -26,6 +26,7 @@ use crate::eth::primitives::SlotValue; use crate::eth::primitives::StoragePointInTime; use crate::eth::primitives::Wei; use crate::ext::DisplayExt; +use crate::ext::ResultExt; use crate::log_and_err; #[derive(Debug)] @@ -44,31 +45,24 @@ impl BlockchainClient { /// Creates a new RPC client connected to HTTP and optionally to WS. pub async fn new_http_ws(http_url: &str, ws_url: Option<&str>, timeout: Duration) -> anyhow::Result { - tracing::info!(%http_url, "starting blockchain client"); + tracing::info!(%http_url, "creating blockchain client"); // build http provider - let http = match Self::build_http_client(http_url, timeout) { - Ok(http) => http, - Err(e) => { - tracing::error!(reason = ?e, url = %http_url, "failed to create blockchain http client"); - return Err(e).context("failed to create blockchain http client"); - } - }; + let http = Self::build_http_client(http_url, timeout)?; // build ws provider - let (ws, ws_url) = if let Some(ws_url) = ws_url { - match Self::build_ws_client(ws_url, timeout).await { - Ok(ws) => (Some(RwLock::new(ws)), Some(ws_url.to_string())), - Err(e) => { - tracing::error!(reason = ?e, url = %ws_url, "failed to create blockchain websocket client"); - return Err(e).context("failed to create blockchain websocket client"); - } - } + let ws = if let Some(ws_url) = ws_url { + Some(RwLock::new(Self::build_ws_client(ws_url, timeout).await?)) } else { - (None, None) + None }; - let client = Self { http, ws, ws_url, timeout }; + let client = Self { + http, + ws, + ws_url: ws_url.map(|x| x.to_owned()), + timeout, + }; // check health before assuming it is ok client.fetch_listening().await?; @@ -146,7 +140,7 @@ impl BlockchainClient { pub async fn fetch_block(&self, number: BlockNumber) -> anyhow::Result { tracing::debug!(%number, "fetching block"); - let number = serde_json::to_value(number)?; + let number = serde_json::to_value(number).expect_infallible(); let result = self .http .request::>("eth_getBlockByNumber", vec![number, JsonValue::Bool(true)]) @@ -162,7 +156,7 @@ impl BlockchainClient { pub async fn fetch_transaction(&self, hash: Hash) -> anyhow::Result> { tracing::debug!(%hash, "fetching transaction"); - let hash = serde_json::to_value(hash)?; + let hash = serde_json::to_value(hash).expect_infallible(); let result = self .http @@ -179,7 +173,7 @@ impl BlockchainClient { pub async fn fetch_receipt(&self, hash: Hash) -> anyhow::Result> { tracing::debug!(%hash, "fetching transaction receipt"); - let hash = serde_json::to_value(hash)?; + let hash = serde_json::to_value(hash).expect_infallible(); let result = self .http .request::, Vec>("eth_getTransactionReceipt", vec![hash]) @@ -195,8 +189,8 @@ impl BlockchainClient { pub async fn fetch_balance(&self, address: &Address, number: Option) -> anyhow::Result { tracing::debug!(%address, ?number, "fetching account balance"); - let address = serde_json::to_value(address)?; - let number = serde_json::to_value(number)?; + let address = serde_json::to_value(address).expect_infallible(); + let number = serde_json::to_value(number).expect_infallible(); let result = self.http.request::>("eth_getBalance", vec![address, number]).await; match result { @@ -209,11 +203,11 @@ impl BlockchainClient { pub async fn fetch_storage_at(&self, address: &Address, index: &SlotIndex, point_in_time: StoragePointInTime) -> anyhow::Result { tracing::debug!(%address, ?point_in_time, "fetching account balance"); - let address = serde_json::to_value(address)?; - let index = serde_json::to_value(index)?; + let address = serde_json::to_value(address).expect_infallible(); + let index = serde_json::to_value(index).expect_infallible(); let number = match point_in_time { - StoragePointInTime::Present => serde_json::to_value("latest")?, - StoragePointInTime::Past(number) => serde_json::to_value(number)?, + StoragePointInTime::Present => serde_json::to_value("latest").expect_infallible(), + StoragePointInTime::Past(number) => serde_json::to_value(number).expect_infallible(), }; let result = self .http @@ -234,7 +228,7 @@ impl BlockchainClient { pub async fn send_raw_transaction(&self, hash: Hash, tx: Bytes) -> anyhow::Result> { tracing::debug!(%hash, "sending raw transaction"); - let tx = serde_json::to_value(tx)?; + let tx = serde_json::to_value(tx).expect_infallible(); let result = self.http.request::>("eth_sendRawTransaction", vec![tx]).await; match result { diff --git a/src/infra/docker.rs b/src/infra/docker.rs index 0bbe889d7..151e947ab 100644 --- a/src/infra/docker.rs +++ b/src/infra/docker.rs @@ -14,7 +14,7 @@ impl Docker { /// Starts PostgreSQL container for local development. #[must_use] pub fn start_postgres(&self) -> Container<'_, PostgresImage> { - tracing::info!("starting postgres container"); + tracing::info!("creating postgres container"); let image = RunnableImage::from(PostgresImage::default().with_user("postgres").with_password("123").with_db_name("stratus")) .with_mapped_port((5432, 5432)) @@ -32,7 +32,7 @@ impl Docker { /// Starts Prometheus container for local development. #[must_use] pub fn start_prometheus(&self) -> Container<'_, GenericImage> { - tracing::info!("starting prometheus container"); + tracing::info!("creating prometheus container"); let prometheus_image = GenericImage::new("prom/prometheus", "v2.50.1").with_wait_for(WaitFor::StdErrMessage { message: "Starting rule manager...".to_string(), diff --git a/src/infra/metrics/metrics_init.rs b/src/infra/metrics/metrics_init.rs index d4f69bd29..352f4e0e4 100644 --- a/src/infra/metrics/metrics_init.rs +++ b/src/infra/metrics/metrics_init.rs @@ -29,7 +29,7 @@ const BUCKET_FOR_DURATION: [f64; 37] = [ /// /// Default configuration runs metrics exporter on port 9000. pub fn init_metrics(histogram_kind: MetricsHistogramKind) { - tracing::info!("starting metrics"); + tracing::info!("creating metrics exporter"); // get metric definitions let mut metrics = Vec::new(); @@ -55,7 +55,7 @@ pub fn init_metrics(histogram_kind: MetricsHistogramKind) { } } - builder.install().expect("failed to start metrics"); + builder.install().expect("failed to create metrics exporter"); // init metric description (always after provider started) for metric in &metrics { diff --git a/src/infra/tracing.rs b/src/infra/tracing.rs index 7cfe5d320..38535a653 100644 --- a/src/infra/tracing.rs +++ b/src/infra/tracing.rs @@ -19,12 +19,12 @@ use crate::ext::spawn_named; /// Init application global tracing. pub async fn init_tracing(url: Option<&String>) { - println!("starting tracing"); + println!("creating tracing registry"); // configure stdout layer let format_as_json = env::var_os("JSON_LOGS").is_some_and(|var| not(var.is_empty())); let stdout_layer = if format_as_json { - println!("tracing enabling json logs"); + println!("tracing registry enabling json logs"); fmt::Layer::default() .json() .with_target(true) @@ -33,7 +33,7 @@ pub async fn init_tracing(url: Option<&String>) { .with_filter(EnvFilter::from_default_env()) .boxed() } else { - println!("tracing enabling text logs"); + println!("tracing registry enabling text logs"); fmt::Layer::default() .with_target(false) .with_thread_ids(true) @@ -43,13 +43,13 @@ pub async fn init_tracing(url: Option<&String>) { }; // configure tokio console layer - println!("tracing enabling tokio console"); + println!("tracing registry enabling tokio console"); let (console_layer, console_server) = ConsoleLayer::builder().with_default_env().build(); // configure opentelemetry layer let opentelemetry_layer = match url { Some(url) => { - println!("tracing enabling opentelemetry"); + println!("tracing registry enabling opentelemetry"); let tracer = opentelemetry_otlp::new_pipeline() .tracing() .with_exporter(opentelemetry_otlp::new_exporter().tonic().with_endpoint(url)) @@ -72,11 +72,9 @@ pub async fn init_tracing(url: Option<&String>) { // init tokio console server spawn_named("console::grpc-server", async move { if let Err(e) = console_server.serve().await { - tracing::error!(reason = ?e, "failed to start tokio-console server"); + tracing::error!(reason = ?e, "failed to create tokio-console server"); }; }); - - tracing::info!("started tracing"); } /// Emits an warning that a task is exiting because it received a cancenllation signal. From 548042502b23f251faa4c6f1be57664ef0370c43 Mon Sep 17 00:00:00 2001 From: Renato Dinhani <101204870+dinhani-cw@users.noreply.github.com> Date: Mon, 27 May 2024 23:06:04 -0300 Subject: [PATCH 09/11] feat: more logs (#938) --- src/bin/importer_offline.rs | 6 ++++- src/config.rs | 3 +++ src/eth/rpc/rpc_server.rs | 3 +-- src/eth/rpc/rpc_subscriptions.rs | 24 +++++++------------ .../storage/inmemory/inmemory_temporary.rs | 2 +- src/eth/storage/rocks/rocks_permanent.rs | 2 +- src/ext.rs | 3 +++ .../blockchain_client/blockchain_client.rs | 2 +- src/infra/tracing.rs | 8 ++++++- 9 files changed, 30 insertions(+), 23 deletions(-) diff --git a/src/bin/importer_offline.rs b/src/bin/importer_offline.rs index 200cd7dcd..313735901 100644 --- a/src/bin/importer_offline.rs +++ b/src/bin/importer_offline.rs @@ -20,6 +20,7 @@ use stratus::eth::storage::StratusStorage; use stratus::eth::BlockMiner; use stratus::eth::Executor; use stratus::ext::ResultExt; +use stratus::infra::tracing::info_task_spawn; use stratus::log_and_err; use stratus::utils::calculate_tps_and_bpm; use stratus::GlobalServices; @@ -82,6 +83,8 @@ async fn run(config: ImporterOfflineConfig) -> anyhow::Result<()> { // execute thread: external rpc storage loader let storage_thread = thread::Builder::new().name("storage-loader".into()); let storage_tokio = Handle::current(); + + info_task_spawn("storage-loader"); let storage_loader_thread = storage_thread .spawn(move || { let _tokio_guard = storage_tokio.enter(); @@ -103,6 +106,8 @@ async fn run(config: ImporterOfflineConfig) -> anyhow::Result<()> { // execute thread: block importer let importer_thread = thread::Builder::new().name("block-importer".into()); let importer_tokio = Handle::current(); + + info_task_spawn("block-importer"); let block_importer_thread = importer_thread .spawn(move || { let _tokio_guard = importer_tokio.enter(); @@ -139,7 +144,6 @@ async fn execute_block_importer( blocks_to_export_snapshot: Vec, ) -> anyhow::Result<()> { const TASK_NAME: &str = "external-block-executor"; - tracing::info!("creating task {}", TASK_NAME); // receives blocks and receipts from the backlog to reexecute and import loop { diff --git a/src/config.rs b/src/config.rs index f6817dc90..e74b8f030 100644 --- a/src/config.rs +++ b/src/config.rs @@ -42,6 +42,7 @@ use crate::eth::EvmTask; use crate::eth::Executor; use crate::eth::TransactionRelayer; use crate::ext::parse_duration; +use crate::infra::tracing::info_task_spawn; use crate::infra::tracing::warn_task_tx_closed; use crate::infra::BlockchainClient; use crate::GlobalState; @@ -191,6 +192,8 @@ impl ExecutorConfig { // spawn thread that will run evm // todo: needs a way to signal error like a cancellation token in case it fails to initialize let t = thread::Builder::new().name("evm".into()); + + info_task_spawn(TASK_NAME); t.spawn(move || { // init tokio let _tokio_guard = evm_tokio.enter(); diff --git a/src/eth/rpc/rpc_server.rs b/src/eth/rpc/rpc_server.rs index 45bdceddd..4f1b7b025 100644 --- a/src/eth/rpc/rpc_server.rs +++ b/src/eth/rpc/rpc_server.rs @@ -61,8 +61,7 @@ pub async fn serve_rpc( chain_id: ChainId, ) -> anyhow::Result<()> { const TASK_NAME: &str = "rpc-server"; - - tracing::info!("creating task {}", TASK_NAME); + tracing::info!("creating {}", TASK_NAME); // configure subscriptions let subs = RpcSubscriptions::spawn( diff --git a/src/eth/rpc/rpc_subscriptions.rs b/src/eth/rpc/rpc_subscriptions.rs index a50d4938e..2e32ff904 100644 --- a/src/eth/rpc/rpc_subscriptions.rs +++ b/src/eth/rpc/rpc_subscriptions.rs @@ -61,10 +61,8 @@ impl RpcSubscriptions { /// Spawns a new task to clean up closed subscriptions from time to time. fn spawn_subscriptions_cleaner(subs: Arc) -> JoinHandle> { - const TASK_NAME: &str = "rpc-subscription-cleaner"; - tracing::info!("spawning {}", TASK_NAME); - - spawn_named("rpc::sub::cleaner", async move { + const TASK_NAME: &str = "rpc::sub::cleaner"; + spawn_named(TASK_NAME, async move { loop { if GlobalState::warn_if_shutdown(TASK_NAME) { return Ok(()); @@ -91,10 +89,8 @@ impl RpcSubscriptions { /// Spawns a new task that notifies subscribers about new executed transactions. fn spawn_new_pending_txs_notifier(subs: Arc, mut rx: broadcast::Receiver) -> JoinHandle> { - const TASK_NAME: &str = "rpc-newPendingTransactions-notifier"; - tracing::info!("spawning {}", TASK_NAME); - - spawn_named("rpc::sub::newPendingTransactions", async move { + const TASK_NAME: &str = "rpc::sub::newPendingTransactions"; + spawn_named(TASK_NAME, async move { loop { if GlobalState::warn_if_shutdown(TASK_NAME) { return Ok(()); @@ -114,10 +110,8 @@ impl RpcSubscriptions { /// Spawns a new task that notifies subscribers about new created blocks. fn spawn_new_heads_notifier(subs: Arc, mut rx: broadcast::Receiver) -> JoinHandle> { - const TASK_NAME: &str = "rpc-newHeads-notifier"; - tracing::info!("spawning {}", TASK_NAME); - - spawn_named("rpc::sub::newHeads", async move { + const TASK_NAME: &str = "rpc::sub::newHeads"; + spawn_named(TASK_NAME, async move { loop { if GlobalState::warn_if_shutdown(TASK_NAME) { return Ok(()); @@ -137,10 +131,8 @@ impl RpcSubscriptions { /// Spawns a new task that notifies subscribers about new transactions logs. fn spawn_logs_notifier(subs: Arc, mut rx: broadcast::Receiver) -> JoinHandle> { - const TASK_NAME: &str = "rpc-logs-notifier"; - tracing::info!("spawning {}", TASK_NAME); - - spawn_named("rpc::sub::logs", async move { + const TASK_NAME: &str = "rpc::sub::logs"; + spawn_named(TASK_NAME, async move { loop { if GlobalState::warn_if_shutdown(TASK_NAME) { return Ok(()); diff --git a/src/eth/storage/inmemory/inmemory_temporary.rs b/src/eth/storage/inmemory/inmemory_temporary.rs index 76b89c819..9c4dd72c3 100644 --- a/src/eth/storage/inmemory/inmemory_temporary.rs +++ b/src/eth/storage/inmemory/inmemory_temporary.rs @@ -52,7 +52,7 @@ impl InMemoryTemporaryStorage { impl Default for InMemoryTemporaryStorage { fn default() -> Self { - tracing::info!("starting inmemory temporary storage"); + tracing::info!("creating inmemory temporary storage"); Self { states: RwLock::new(NonEmpty::new(InMemoryTemporaryStorageState::default())), } diff --git a/src/eth/storage/rocks/rocks_permanent.rs b/src/eth/storage/rocks/rocks_permanent.rs index c4722cdbb..64c82af7f 100644 --- a/src/eth/storage/rocks/rocks_permanent.rs +++ b/src/eth/storage/rocks/rocks_permanent.rs @@ -40,7 +40,7 @@ pub struct RocksPermanentStorage { impl RocksPermanentStorage { pub async fn new() -> anyhow::Result { - tracing::info!("starting rocksdb storage"); + tracing::info!("creating rocksdb storage"); let state = RocksStorageState::new(); state.sync_data().await?; diff --git a/src/ext.rs b/src/ext.rs index c1af69ef6..9c954d86a 100644 --- a/src/ext.rs +++ b/src/ext.rs @@ -4,6 +4,8 @@ use std::time::Duration; use anyhow::anyhow; +use crate::infra::tracing::info_task_spawn; + // ----------------------------------------------------------------------------- // Macros // ----------------------------------------------------------------------------- @@ -185,6 +187,7 @@ pub fn spawn_named(name: &str, task: impl std::future::Future + S where T: Send + 'static, { + info_task_spawn(name); tokio::task::Builder::new().name(name).spawn(task).expect("spawning named task should not fail") } diff --git a/src/infra/blockchain_client/blockchain_client.rs b/src/infra/blockchain_client/blockchain_client.rs index 270e73b38..edebd3223 100644 --- a/src/infra/blockchain_client/blockchain_client.rs +++ b/src/infra/blockchain_client/blockchain_client.rs @@ -124,7 +124,7 @@ impl BlockchainClient { } } - /// Retrieves the current block number. + /// Fetches the current block number. pub async fn fetch_block_number(&self) -> anyhow::Result { tracing::debug!("fetching block number"); diff --git a/src/infra/tracing.rs b/src/infra/tracing.rs index 38535a653..90219bbd0 100644 --- a/src/infra/tracing.rs +++ b/src/infra/tracing.rs @@ -33,7 +33,7 @@ pub async fn init_tracing(url: Option<&String>) { .with_filter(EnvFilter::from_default_env()) .boxed() } else { - println!("tracing registry enabling text logs"); + println!("tracing registry enabling text logs"); fmt::Layer::default() .with_target(false) .with_thread_ids(true) @@ -77,6 +77,12 @@ pub async fn init_tracing(url: Option<&String>) { }); } +/// Emits an info message that a task was spawned to backgroud. +#[track_caller] +pub fn info_task_spawn(name: &str) { + tracing::info!(%name, "spawning task"); +} + /// Emits an warning that a task is exiting because it received a cancenllation signal. /// /// Returns the formatted tracing message. From 1a861cf2c6ff2976a36bdf72e7d28bd18d89b3e5 Mon Sep 17 00:00:00 2001 From: carneiro-cw <156914855+carneiro-cw@users.noreply.github.com> Date: Tue, 28 May 2024 12:09:44 -0300 Subject: [PATCH 10/11] feat: debug_readAllSlotsFromAccount rpc call (#939) --- src/eth/rpc/rpc_server.rs | 7 +++++++ src/eth/storage/inmemory/inmemory_permanent.rs | 11 +++++++++++ src/eth/storage/permanent_storage.rs | 3 +++ .../postgres_permanent/postgres_permanent.rs | 4 ++++ src/eth/storage/rocks/rocks_permanent.rs | 16 ++++++++++++++++ src/eth/storage/rocks/types.rs | 2 ++ src/eth/storage/stratus_storage.rs | 5 +++++ 7 files changed, 48 insertions(+) diff --git a/src/eth/rpc/rpc_server.rs b/src/eth/rpc/rpc_server.rs index 4f1b7b025..c512ba664 100644 --- a/src/eth/rpc/rpc_server.rs +++ b/src/eth/rpc/rpc_server.rs @@ -131,6 +131,7 @@ fn register_methods(mut module: RpcModule) -> anyhow::Result, ctx: Arc) Ok(serde_json::to_value(timestamp).expect_infallible()) } +#[cfg(feature = "dev")] +async fn debug_read_all_slots(params: Params<'_>, ctx: Arc) -> anyhow::Result { + let (_, address) = next_rpc_param::
(params.sequence())?; + Ok(serde_json::to_value(ctx.storage.read_all_slots(&address).await?).expect_infallible()) +} + // Status async fn net_listening(params: Params<'_>, arc: Arc) -> anyhow::Result { stratus_readiness(params, arc).await diff --git a/src/eth/storage/inmemory/inmemory_permanent.rs b/src/eth/storage/inmemory/inmemory_permanent.rs index b3dea0624..270d1bea1 100644 --- a/src/eth/storage/inmemory/inmemory_permanent.rs +++ b/src/eth/storage/inmemory/inmemory_permanent.rs @@ -219,6 +219,17 @@ impl PermanentStorage for InMemoryPermanentStorage { Ok(slots) } + async fn read_all_slots(&self, address: &Address) -> anyhow::Result> { + let state = self.lock_read().await; + + let Some(account) = state.accounts.get(address) else { + tracing::trace!(%address, "account not found in permanent"); + return Ok(Default::default()); + }; + + Ok(account.slots.clone().into_values().map(|slot| slot.get_current()).collect()) + } + async fn read_block(&self, selection: &BlockSelection) -> anyhow::Result> { tracing::debug!(?selection, "reading block"); diff --git a/src/eth/storage/permanent_storage.rs b/src/eth/storage/permanent_storage.rs index 709ada995..ac97a1c47 100644 --- a/src/eth/storage/permanent_storage.rs +++ b/src/eth/storage/permanent_storage.rs @@ -81,6 +81,9 @@ pub trait PermanentStorage: Send + Sync { /// Retrieves a random sample of slots, from the provided start and end blocks. async fn read_slots_sample(&self, start: BlockNumber, end: BlockNumber, max_samples: u64, seed: u64) -> anyhow::Result>; + /// Retrieves all current slots associated to an address. + async fn read_all_slots(&self, address: &Address) -> anyhow::Result>; + // ------------------------------------------------------------------------- // Global state // ------------------------------------------------------------------------- diff --git a/src/eth/storage/postgres_permanent/postgres_permanent.rs b/src/eth/storage/postgres_permanent/postgres_permanent.rs index 617e0021d..f267243a2 100644 --- a/src/eth/storage/postgres_permanent/postgres_permanent.rs +++ b/src/eth/storage/postgres_permanent/postgres_permanent.rs @@ -97,6 +97,10 @@ impl PermanentStorage for PostgresPermanentStorage { PermanentStorageKind::Postgres { url: self.url.clone() } } + async fn read_all_slots(&self, _address: &Address) -> anyhow::Result> { + todo!(); + } + async fn allocate_evm_thread_resources(&self) -> anyhow::Result<()> { let conn = self.pool.acquire().await?; let conn = conn.leak(); diff --git a/src/eth/storage/rocks/rocks_permanent.rs b/src/eth/storage/rocks/rocks_permanent.rs index 64c82af7f..626eaebe2 100644 --- a/src/eth/storage/rocks/rocks_permanent.rs +++ b/src/eth/storage/rocks/rocks_permanent.rs @@ -9,6 +9,8 @@ use async_trait::async_trait; use futures::future::join_all; use super::rocks_state::RocksStorageState; +use super::types::AddressRocksdb; +use super::types::SlotIndexRocksdb; use crate::config::PermanentStorageKind; use crate::eth::primitives::Account; use crate::eth::primitives::Address; @@ -229,4 +231,18 @@ impl PermanentStorage for RocksPermanentStorage { async fn read_slots_sample(&self, _start: BlockNumber, _end: BlockNumber, _max_samples: u64, _seed: u64) -> anyhow::Result> { todo!() } + + async fn read_all_slots(&self, address: &Address) -> anyhow::Result> { + let address: AddressRocksdb = (*address).into(); + Ok(self + .state + .account_slots + .iter_from((address, SlotIndexRocksdb::from(0)), rocksdb::Direction::Forward) + .take_while(|((addr, _), _)| &address == addr) + .map(|((_, idx), value)| Slot { + index: idx.into(), + value: value.into(), + }) + .collect()) + } } diff --git a/src/eth/storage/rocks/types.rs b/src/eth/storage/rocks/types.rs index e3712e7ce..6c5bf66a6 100644 --- a/src/eth/storage/rocks/types.rs +++ b/src/eth/storage/rocks/types.rs @@ -238,6 +238,8 @@ impl From for u64 { #[derive(Clone, Default, Hash, Eq, PartialEq, PartialOrd, Ord, serde::Serialize, serde::Deserialize)] pub struct SlotIndexRocksdb(U256); +gen_newtype_from!(self = SlotIndexRocksdb, other = u64); + impl SlotIndexRocksdb { pub fn inner_value(&self) -> U256 { self.0 diff --git a/src/eth/storage/stratus_storage.rs b/src/eth/storage/stratus_storage.rs index 5bcfc26c0..8eeefd9d7 100644 --- a/src/eth/storage/stratus_storage.rs +++ b/src/eth/storage/stratus_storage.rs @@ -327,6 +327,11 @@ impl StratusStorage { Ok(slots) } + #[tracing::instrument(skip_all)] + pub async fn read_all_slots(&self, address: &Address) -> anyhow::Result> { + self.perm.read_all_slots(address).await + } + // ------------------------------------------------------------------------- // Blocks // ------------------------------------------------------------------------- From dd874b1784e4968a51340847f9268187e58a2125 Mon Sep 17 00:00:00 2001 From: Renato Dinhani <101204870+dinhani-cw@users.noreply.github.com> Date: Tue, 28 May 2024 16:08:12 -0300 Subject: [PATCH 11/11] fix: max gas allowed per transaction (#940) --- src/eth/evm/revm.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/eth/evm/revm.rs b/src/eth/evm/revm.rs index 308169435..8bb2f7cf3 100644 --- a/src/eth/evm/revm.rs +++ b/src/eth/evm/revm.rs @@ -5,6 +5,7 @@ //! interacting with the project's storage backend to manage state. `Revm` embodies the practical application //! of the `Evm` trait, serving as a bridge between Ethereum's abstract operations and Stratus's storage mechanisms. +use std::cmp::min; use std::collections::HashMap; use std::collections::HashSet; use std::sync::Arc; @@ -54,6 +55,9 @@ use crate::ext::OptionExt; #[cfg(feature = "metrics")] use crate::infra::metrics; +/// Maximum gas limit allowed for a transaction. Prevents a transaction from consuming too many resources. +const GAS_MAX_LIMIT: u64 = 1_000_000_000; + /// Implementation of EVM using [`revm`](https://crates.io/crates/revm). pub struct Revm { evm: RevmEvm<'static, (), RevmSession>, @@ -128,7 +132,7 @@ impl Evm for Revm { Some(contract) => TransactTo::Call(contract.into()), None => TransactTo::Create(CreateScheme::Create), }; - tx_env.gas_limit = input.gas_limit.into(); + tx_env.gas_limit = min(input.gas_limit.into(), GAS_MAX_LIMIT); tx_env.gas_price = input.gas_price.into(); tx_env.chain_id = input.chain_id.map_into(); tx_env.nonce = input.nonce.map_into();