From e8a595deeee40c7d666fe63ed1d339015b78ba43 Mon Sep 17 00:00:00 2001 From: Renato Dinhani <101204870+dinhani-cw@users.noreply.github.com> Date: Thu, 6 Jun 2024 14:44:04 -0300 Subject: [PATCH 1/4] feat: track found in some rpc methods (#1027) --- src/bin/importer_online.rs | 6 ++-- src/eth/block_miner.rs | 10 +++--- src/eth/executor.rs | 8 ++--- src/eth/relayer/external.rs | 8 ++--- src/eth/rpc/rpc_server.rs | 51 ++++++++++++++++++++---------- src/eth/storage/stratus_storage.rs | 22 ++++++------- src/ext.rs | 4 +-- 7 files changed, 63 insertions(+), 46 deletions(-) diff --git a/src/bin/importer_online.rs b/src/bin/importer_online.rs index 4e087ce01..fa003bb3a 100644 --- a/src/bin/importer_online.rs +++ b/src/bin/importer_online.rs @@ -353,7 +353,7 @@ async fn fetch_block_and_receipts(chain: Arc, number: BlockNum #[tracing::instrument(name = "importer::fetch_block", skip_all, fields(number))] async fn fetch_block(chain: Arc, number: BlockNumber) -> ExternalBlock { Span::with(|s| { - s.rec("number", &number); + s.rec_str("number", &number); }); let mut backoff = 10; @@ -391,8 +391,8 @@ async fn fetch_block(chain: Arc, number: BlockNumber) -> Exter #[tracing::instrument(name = "importer::fetch_receipt", skip_all, fields(number, hash))] async fn fetch_receipt(chain: Arc, number: BlockNumber, hash: Hash) -> ExternalReceipt { Span::with(|s| { - s.rec("number", &number); - s.rec("hash", &hash); + s.rec_str("number", &number); + s.rec_str("hash", &hash); }); loop { diff --git a/src/eth/block_miner.rs b/src/eth/block_miner.rs index 1e0ec1e7e..3c003caf7 100644 --- a/src/eth/block_miner.rs +++ b/src/eth/block_miner.rs @@ -93,7 +93,7 @@ impl BlockMiner { #[tracing::instrument(name = "miner::save_execution", skip_all, fields(hash))] pub async fn save_execution(&self, tx_execution: TransactionExecution) -> anyhow::Result<()> { Span::with(|s| { - s.rec("hash", &tx_execution.hash()); + s.rec_str("hash", &tx_execution.hash()); }); // save execution to temporary storage @@ -149,7 +149,7 @@ impl BlockMiner { let block = block_from_external(external_block, mined_txs); block.map(|block| { - Span::with(|s| s.rec("number", &block.number())); + Span::with(|s| s.rec_str("number", &block.number())); block }) } @@ -187,7 +187,7 @@ impl BlockMiner { block.push_execution(tx.input, tx.result); } - Span::with(|s| s.rec("number", &block.number())); + Span::with(|s| s.rec_str("number", &block.number())); Ok(block) } @@ -220,7 +220,7 @@ impl BlockMiner { }; block.map(|block| { - Span::with(|s| s.rec("number", &block.number())); + Span::with(|s| s.rec_str("number", &block.number())); block }) } @@ -234,7 +234,7 @@ impl BlockMiner { /// Persists a mined block to permanent storage and prepares new block. #[tracing::instrument(name = "miner::commit", skip_all, fields(number))] pub async fn commit(&self, block: Block) -> anyhow::Result<()> { - Span::with(|s| s.rec("number", &block.number())); + Span::with(|s| s.rec_str("number", &block.number())); tracing::info!(number = %block.number(), transactions_len = %block.transactions.len(), "commiting block"); diff --git a/src/eth/executor.rs b/src/eth/executor.rs index 4bcac0b9e..bc1efb06c 100644 --- a/src/eth/executor.rs +++ b/src/eth/executor.rs @@ -100,7 +100,7 @@ impl Executor { let (start, mut block_metrics) = (metrics::now(), ExecutionMetrics::default()); Span::with(|s| { - s.rec("number", &block.number()); + s.rec_str("number", &block.number()); }); tracing::info!(number = %block.number(), "reexecuting external block"); @@ -184,7 +184,7 @@ impl Executor { block: &ExternalBlock, ) -> Result { Span::with(|s| { - s.rec("hash", &tx.hash); + s.rec_str("hash", &tx.hash); }); self.external_transaction_inner(tx, receipt, block).await.map_err(|e| (tx, receipt, e)) @@ -264,8 +264,8 @@ impl Executor { let (start, function) = (metrics::now(), tx_input.extract_function()); Span::with(|s| { - s.rec("hash", &tx_input.hash); - s.rec("from", &tx_input.signer); + s.rec_str("hash", &tx_input.hash); + s.rec_str("from", &tx_input.signer); s.rec_opt("to", &tx_input.to); }); tracing::info!( diff --git a/src/eth/relayer/external.rs b/src/eth/relayer/external.rs index 52d8bfd6e..92f62398e 100644 --- a/src/eth/relayer/external.rs +++ b/src/eth/relayer/external.rs @@ -86,7 +86,7 @@ impl ExternalRelayer { // fill span let span = Span::current(); - span.rec("block_number", &block_number); + span.rec_str("block_number", &block_number); // TODO: Replace failed transactions with transactions that will for sure fail in substrate (need access to primary keys) let dag = TransactionDag::new(block.transactions); @@ -137,7 +137,7 @@ impl ExternalRelayer { // fill span let span = Span::current(); - span.rec("hash", &tx_hash); + span.rec_str("hash", &tx_hash); let start = Instant::now(); let mut substrate_receipt = substrate_pending_transaction; @@ -233,7 +233,7 @@ impl ExternalRelayer { // fill span let span = Span::current(); - span.rec("hash", &tx_hash); + span.rec_str("hash", &tx_hash); let ethers_tx = Transaction::from(tx_mined.input.clone()); let tx = loop { @@ -324,7 +324,7 @@ impl ExternalRelayerClient { // fill span let span = Span::current(); - span.rec("block_number", &block_number); + span.rec_str("block_number", &block_number); sqlx::query!( "INSERT INTO relayer_blocks (number, payload) VALUES ($1, $2)", diff --git a/src/eth/rpc/rpc_server.rs b/src/eth/rpc/rpc_server.rs index 0dd38d319..ee8a87437 100644 --- a/src/eth/rpc/rpc_server.rs +++ b/src/eth/rpc/rpc_server.rs @@ -282,12 +282,12 @@ async fn eth_block_number(_params: Params<'_>, ctx: Arc) -> anyhow:: Ok(serde_json::to_value(number).expect_infallible()) } -#[tracing::instrument(name = "rpc::eth_getBlockByHash", parent = None, skip_all, fields(hash))] +#[tracing::instrument(name = "rpc::eth_getBlockByHash", parent = None, skip_all, fields(filter, found, number))] async fn eth_get_block_by_hash(params: Params<'_>, ctx: Arc) -> anyhow::Result { eth_get_block_by_selector(params, ctx).await } -#[tracing::instrument(name = "rpc::eth_getBlockByNumber", parent = None, skip_all, fields(number))] +#[tracing::instrument(name = "rpc::eth_getBlockByNumber", parent = None, skip_all, fields(filter, found, number))] async fn eth_get_block_by_number(params: Params<'_>, ctx: Arc) -> anyhow::Result { eth_get_block_by_selector(params, ctx).await } @@ -298,14 +298,23 @@ async fn eth_get_block_by_selector(params: Params<'_>, ctx: Arc) -> let (_, full_transactions) = next_rpc_param::(params)?; Span::with(|s| match block_selection { - BlockSelection::Hash(hash) => s.rec("hash", &hash), - BlockSelection::Latest => s.rec("number", &"latest"), - BlockSelection::Earliest => s.rec("number", &"earliest"), - BlockSelection::Number(number) => s.rec("number", &number), + BlockSelection::Hash(hash) => s.rec_str("filter", &hash), + BlockSelection::Latest => s.rec_str("filter", &"latest"), + BlockSelection::Earliest => s.rec_str("filter", &"earliest"), + BlockSelection::Number(number) => s.rec_str("filter", &number), }); // execute let block = ctx.storage.read_block(&block_selection).await?; + + Span::with(|s| { + s.record("found", block.is_some()); + if let Some(ref block) = block { + s.rec_str("number", &block.number()); + } + }); + + // handle response match (block, full_transactions) { (Some(block), true) => Ok(block.to_json_rpc_with_full_transactions()), (Some(block), false) => Ok(block.to_json_rpc_with_transactions_hashes()), @@ -326,22 +335,30 @@ async fn eth_get_uncle_by_block_hash_and_index(_params: Params<'_>, _ctx: Arc, ctx: Arc) -> anyhow::Result { let (_, hash) = next_rpc_param::(params.sequence())?; - Span::with(|s| s.rec("hash", &hash)); + Span::with(|s| s.rec_str("hash", &hash)); let mined = ctx.storage.read_mined_transaction(&hash).await?; + Span::with(|s| { + s.record("found", mined.is_some()); + }); + match mined { Some(mined) => Ok(mined.to_json_rpc_transaction()), None => Ok(JsonValue::Null), } } -#[tracing::instrument(name = "rpc::eth_getTransactionReceipt", parent = None, skip_all, fields(hash))] +#[tracing::instrument(name = "rpc::eth_getTransactionReceipt", parent = None, skip_all, fields(hash, found))] async fn eth_get_transaction_receipt(params: Params<'_>, ctx: Arc) -> anyhow::Result { let (_, hash) = next_rpc_param::(params.sequence())?; + Span::with(|s| s.rec_str("hash", &hash)); - Span::with(|s| s.rec("hash", &hash)); + let mined = ctx.storage.read_mined_transaction(&hash).await?; + Span::with(|s| { + s.record("found", mined.is_some()); + }); - match ctx.storage.read_mined_transaction(&hash).await? { + match mined { Some(mined_transaction) => Ok(mined_transaction.to_json_rpc_receipt()), None => Ok(JsonValue::Null), } @@ -395,8 +412,8 @@ async fn eth_send_raw_transaction(params: Params<'_>, ctx: Arc) -> a let tx = parse_rpc_rlp::(&data)?; Span::with(|s| { - s.rec("hash", &tx.hash); - s.rec("from", &tx.signer); + s.rec_str("hash", &tx.hash); + s.rec_str("from", &tx.signer); s.rec_opt("to", &tx.to); }); @@ -458,7 +475,7 @@ async fn eth_get_transaction_count(params: Params<'_>, ctx: Arc) -> let (_, block_selection) = next_rpc_param_or_default::(params)?; Span::with(|s| { - s.rec("address", &address); + s.rec_str("address", &address); }); let point_in_time = ctx.storage.translate_to_point_in_time(&block_selection).await?; @@ -472,7 +489,7 @@ async fn eth_get_balance(params: Params<'_>, ctx: Arc) -> anyhow::Re let (_, block_selection) = next_rpc_param_or_default::(params)?; Span::with(|s| { - s.rec("address", &address); + s.rec_str("address", &address); }); let point_in_time = ctx.storage.translate_to_point_in_time(&block_selection).await?; @@ -487,7 +504,7 @@ async fn eth_get_code(params: Params<'_>, ctx: Arc) -> anyhow::Resul let (_, block_selection) = next_rpc_param_or_default::(params)?; Span::with(|s| { - s.rec("address", &address); + s.rec_str("address", &address); }); let point_in_time = ctx.storage.translate_to_point_in_time(&block_selection).await?; @@ -540,8 +557,8 @@ async fn eth_get_storage_at(params: Params<'_>, ctx: Arc) -> anyhow: let (_, block_selection) = next_rpc_param_or_default::(params)?; Span::with(|s| { - s.rec("address", &address); - s.rec("index", &index); + s.rec_str("address", &address); + s.rec_str("index", &index); }); let point_in_time = ctx.storage.translate_to_point_in_time(&block_selection).await?; diff --git a/src/eth/storage/stratus_storage.rs b/src/eth/storage/stratus_storage.rs index 1eaa7e319..7fdf4680d 100644 --- a/src/eth/storage/stratus_storage.rs +++ b/src/eth/storage/stratus_storage.rs @@ -133,7 +133,7 @@ impl StratusStorage { #[tracing::instrument(name = "storage::set_active_block_number", skip_all, fields(number))] pub async fn set_active_block_number(&self, number: BlockNumber) -> anyhow::Result<()> { Span::with(|s| { - s.rec("number", &number); + s.rec_str("number", &number); }); #[cfg(feature = "metrics")] @@ -165,7 +165,7 @@ impl StratusStorage { #[tracing::instrument(name = "storage::set_mined_block_number", skip_all, fields(number))] pub async fn set_mined_block_number(&self, number: BlockNumber) -> anyhow::Result<()> { - Span::with(|s| s.rec("number", &number)); + Span::with(|s| s.rec_str("number", &number)); #[cfg(feature = "metrics")] { @@ -236,8 +236,8 @@ impl StratusStorage { #[tracing::instrument(name = "storage::read_account", skip_all, fields(address, point_in_time))] pub async fn read_account(&self, address: &Address, point_in_time: &StoragePointInTime) -> anyhow::Result { Span::with(|s| { - s.rec("address", address); - s.rec("point_in_time", point_in_time); + s.rec_str("address", address); + s.rec_str("point_in_time", point_in_time); }); #[cfg(feature = "metrics")] @@ -273,9 +273,9 @@ impl StratusStorage { #[tracing::instrument(name = "storage::read_slot", skip_all, fields(address, index, point_in_time))] pub async fn read_slot(&self, address: &Address, index: &SlotIndex, point_in_time: &StoragePointInTime) -> anyhow::Result { Span::with(|s| { - s.rec("address", address); - s.rec("index", index); - s.rec("point_in_time", point_in_time); + s.rec_str("address", address); + s.rec_str("index", index); + s.rec_str("point_in_time", point_in_time); }); #[cfg(feature = "metrics")] @@ -359,7 +359,7 @@ impl StratusStorage { #[tracing::instrument(name = "storage::save_execution", skip_all, fields(hash))] pub async fn save_execution(&self, tx: TransactionExecution) -> anyhow::Result<()> { Span::with(|s| { - s.rec("hash", &tx.hash()); + s.rec_str("hash", &tx.hash()); }); #[cfg(feature = "metrics")] @@ -388,7 +388,7 @@ impl StratusStorage { let result = self.temp.finish_block().await; if let Ok(ref block) = result { - Span::with(|s| s.rec("number", &block.number)); + Span::with(|s| s.rec_str("number", &block.number)); } result @@ -396,7 +396,7 @@ impl StratusStorage { #[tracing::instrument(name = "storage::save_block", skip_all, fields(number))] pub async fn save_block(&self, block: Block) -> anyhow::Result<()> { - Span::with(|s| s.rec("number", &block.number())); + Span::with(|s| s.rec_str("number", &block.number())); #[cfg(feature = "metrics")] { @@ -426,7 +426,7 @@ impl StratusStorage { #[tracing::instrument(name = "storage::read_transaction", skip_all, fields(hash))] pub async fn read_mined_transaction(&self, hash: &Hash) -> anyhow::Result> { - Span::with(|s| s.rec("hash", hash)); + Span::with(|s| s.rec_str("hash", hash)); #[cfg(feature = "metrics")] { diff --git a/src/ext.rs b/src/ext.rs index fbbe59136..14d48a585 100644 --- a/src/ext.rs +++ b/src/ext.rs @@ -188,7 +188,7 @@ pub trait SpanExt { } /// Records a value using `ToString` implementation. - fn rec(&self, field: &'static str, value: &T) + fn rec_str(&self, field: &'static str, value: &T) where T: ToString; @@ -199,7 +199,7 @@ pub trait SpanExt { } impl SpanExt for Span { - fn rec(&self, field: &'static str, value: &T) + fn rec_str(&self, field: &'static str, value: &T) where T: ToString, { From 68ec30bee7e04a9488b0d238b27d07562fd39f65 Mon Sep 17 00:00:00 2001 From: Renato Dinhani <101204870+dinhani-cw@users.noreply.github.com> Date: Thu, 6 Jun 2024 15:09:10 -0300 Subject: [PATCH 2/4] feat: tracing feature-flag and custom tracing service name (#1028) --- Cargo.toml | 10 +++++----- src/bin/importer_online.rs | 16 +++++----------- src/config.rs | 17 +---------------- src/ext.rs | 18 ++++++++++++++++-- src/infra/tracing.rs | 3 ++- 5 files changed, 29 insertions(+), 35 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index a92fd6ada..b296c4ffe 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -173,17 +173,17 @@ path = "src/bin/relayer.rs" # ------------------------------------------------------------------------------ [features] -default = ["rocks", "metrics", "kubernetes"] +default = ["metrics", "tracing", "rocks", "kubernetes"] # Application is running in develoment mode. dev = [] -# Application is running in performance test mode. -perf = [] - -# Enable metrics dependencies and code for metrics collection. +# Enable runtime metrics collection. metrics = ["dep:metrics", "dep:metrics-exporter-prometheus"] +# Enable runtime tracing/spans collection. +tracing = [] + # Enable RocksDB dependencies. rocks = ["rocksdb", "bincode"] diff --git a/src/bin/importer_online.rs b/src/bin/importer_online.rs index fa003bb3a..f0c4efc59 100644 --- a/src/bin/importer_online.rs +++ b/src/bin/importer_online.rs @@ -371,17 +371,11 @@ async fn fetch_block(chain: Arc, number: BlockNumber) -> Exter }; if block.is_null() { - #[cfg(not(feature = "perf"))] - { - backoff *= 2; - backoff = min(backoff, 1000); // no more than 1000ms of backoff - tracing::warn!(%number, "block not available yet because block is not mined. retrying with backoff."); - sleep(Duration::from_millis(backoff)).await; - continue; - } - - #[cfg(feature = "perf")] - std::process::exit(0); + backoff *= 2; + backoff = min(backoff, 1000); // no more than 1000ms of backoff + tracing::warn!(%number, "block not available yet because block is not mined. retrying with backoff."); + sleep(Duration::from_millis(backoff)).await; + continue; } return ExternalBlock::deserialize(&block).expect("cannot fail to deserialize external block"); diff --git a/src/config.rs b/src/config.rs index 2a83d27a5..3eb283340 100644 --- a/src/config.rs +++ b/src/config.rs @@ -44,6 +44,7 @@ use crate::eth::Consensus; use crate::eth::EvmTask; use crate::eth::Executor; use crate::eth::TransactionRelayer; +use crate::ext::binary_name; use crate::ext::parse_duration; use crate::infra::tracing::info_task_spawn; use crate::infra::tracing::warn_task_tx_closed; @@ -929,19 +930,3 @@ impl FromStr for ValidatorMethodConfig { } } } - -// ----------------------------------------------------------------------------- -// Helpers -// ----------------------------------------------------------------------------- - -/// Gets the current binary basename. -fn binary_name() -> String { - let binary = std::env::current_exe().unwrap(); - let binary_basename = binary.file_name().unwrap().to_str().unwrap().to_lowercase(); - - if binary_basename.starts_with("test_") { - "tests".to_string() - } else { - binary_basename - } -} diff --git a/src/ext.rs b/src/ext.rs index 14d48a585..d9c36e5d0 100644 --- a/src/ext.rs +++ b/src/ext.rs @@ -33,6 +33,18 @@ pub fn not(value: bool) -> bool { !value } +/// Gets the current binary basename. +pub fn binary_name() -> String { + let binary = std::env::current_exe().unwrap(); + let binary_basename = binary.file_name().unwrap().to_str().unwrap().to_lowercase(); + + if binary_basename.starts_with("test_") { + "tests".to_string() + } else { + binary_basename + } +} + // ----------------------------------------------------------------------------- // From / TryFrom // ----------------------------------------------------------------------------- @@ -183,8 +195,10 @@ pub trait SpanExt { where F: Fn(Span), { - let span = Span::current(); - fill(span); + if cfg!(tracing) { + let span = Span::current(); + fill(span); + } } /// Records a value using `ToString` implementation. diff --git a/src/infra/tracing.rs b/src/infra/tracing.rs index 462494c9f..88d40e34d 100644 --- a/src/infra/tracing.rs +++ b/src/infra/tracing.rs @@ -20,6 +20,7 @@ use tracing_subscriber::util::SubscriberInitExt; use tracing_subscriber::EnvFilter; use tracing_subscriber::Layer; +use crate::ext::binary_name; use crate::ext::named_spawn; /// Init application global tracing. @@ -74,7 +75,7 @@ pub async fn init_tracing(url: Option<&String>, console_address: SocketAddr) -> let opentelemetry_layer = match url { Some(url) => { println!("tracing registry: enabling opentelemetry exporter | url={}", url); - let tracer_config = trace::config().with_resource(Resource::new(vec![KeyValue::new("service.name", "stratus")])); + let tracer_config = trace::config().with_resource(Resource::new(vec![KeyValue::new("service.name", format!("stratus-{}", binary_name()))])); let tracer_exporter = opentelemetry_otlp::new_exporter().tonic().with_endpoint(url); let tracer = opentelemetry_otlp::new_pipeline() From 621a64dee431afae8313a93951fd00269515fc35 Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw <166405807+gabriel-aranha-cw@users.noreply.github.com> Date: Thu, 6 Jun 2024 15:50:34 -0300 Subject: [PATCH 3/4] fix e2e relayer name and locks (#1015) * fix workflow naming * chore(ci): e2e relayer setup * chore: comment --------- Co-authored-by: Maycon Amaro --- .github/workflows/e2e-relayer.yml | 22 ++++++---------------- justfile | 2 +- 2 files changed, 7 insertions(+), 17 deletions(-) diff --git a/.github/workflows/e2e-relayer.yml b/.github/workflows/e2e-relayer.yml index 2f7662bfc..212a30ac4 100644 --- a/.github/workflows/e2e-relayer.yml +++ b/.github/workflows/e2e-relayer.yml @@ -1,4 +1,4 @@ -name: Contracts E2E Rocks tests +name: E2E Relayer on: pull_request: @@ -38,7 +38,7 @@ jobs: steps: - name: Checkout code - uses: actions/checkout@v2 + uses: actions/checkout@v4 - name: Set up Rust run: curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y --default-toolchain 1.75 @@ -57,22 +57,12 @@ jobs: run: sudo apt-get install -y protobuf-compiler - name: Set up Just - uses: extractions/setup-just@v1 - - - name: Set up ASDF Version Manager - uses: asdf-vm/actions/setup@v3 + uses: extractions/setup-just@v2 - name: Install Node.js - run: | - asdf plugin add nodejs https://github.com/asdf-vm/asdf-nodejs.git - asdf install nodejs 20.10.0 - asdf global nodejs 20.10.0 - - - name: Install Solidity - run: | - asdf plugin add solidity https://github.com/diegodorado/asdf-solidity.git - asdf install solidity 0.8.16 - asdf global solidity 0.8.16 + uses: actions/setup-node@v4 + with: + node-version-file: .tool-versions - name: Set up dependencies if: ${{ steps.cache-cargo.outputs.cache-hit != 'true' }} diff --git a/justfile b/justfile index 598e32d41..1add2500e 100644 --- a/justfile +++ b/justfile @@ -371,7 +371,7 @@ e2e-relayer-external-up: # Build Stratus and Relayer binaries echo "Building Stratus binary" cargo build --release --bin stratus --features dev & - echo "Building Stratus binary" + echo "Building Relayer binary" cargo build --release --bin relayer --features dev & mkdir e2e_logs From 9ef07d08b66366bb7b7c7cbfd3a6b2c7896c347c Mon Sep 17 00:00:00 2001 From: Renato Dinhani <101204870+dinhani-cw@users.noreply.github.com> Date: Thu, 6 Jun 2024 16:28:30 -0300 Subject: [PATCH 4/4] feat: some importer-online tracing (#1029) --- src/bin/importer_online.rs | 27 +++++++++++-------- src/bin/relayer.rs | 3 ++- src/eth/consensus/mod.rs | 6 ++--- src/eth/relayer/external.rs | 5 ++-- src/eth/rpc/rpc_subscriptions.rs | 3 ++- .../postgres_external_rpc.rs | 8 +++--- src/ext.rs | 14 ++++++++++ src/infra/tracing.rs | 5 ++-- tests/test_import_external_snapshot_common.rs | 3 ++- 9 files changed, 50 insertions(+), 24 deletions(-) diff --git a/src/bin/importer_online.rs b/src/bin/importer_online.rs index f0c4efc59..47df3939f 100644 --- a/src/bin/importer_online.rs +++ b/src/bin/importer_online.rs @@ -18,6 +18,7 @@ use stratus::eth::storage::StratusStorage; use stratus::eth::BlockMiner; use stratus::eth::Executor; use stratus::ext::named_spawn; +use stratus::ext::traced_sleep; use stratus::ext::DisplayExt; use stratus::ext::SpanExt; use stratus::if_else; @@ -34,7 +35,6 @@ use stratus::GlobalServices; use stratus::GlobalState; use tokio::sync::mpsc; use tokio::task::yield_now; -use tokio::time::sleep; use tokio::time::timeout; use tracing::Span; @@ -277,7 +277,7 @@ async fn start_number_fetcher(chain: Arc, sync_interval: Durat "fetched current block number via http. awaiting sync interval to retrieve again." ); set_external_rpc_current_block(number); - sleep(sync_interval).await; + traced_sleep(sync_interval).await; } Err(e) => { tracing::error!(reason = ?e, "failed to retrieve block number. retrying now."); @@ -333,12 +333,17 @@ async fn start_block_fetcher( } } +#[tracing::instrument(name = "importer::fetch_block_and_receipts", skip_all, fields(number))] async fn fetch_block_and_receipts(chain: Arc, number: BlockNumber) -> (ExternalBlock, Vec) { + Span::with(|s| { + s.rec_str("number", &number); + }); + // fetch block let block = fetch_block(Arc::clone(&chain), number).await; // wait some time until receipts are available - let _ = sleep(INTERVAL_FETCH_RECEIPTS).await; + let _ = traced_sleep(INTERVAL_FETCH_RECEIPTS).await; // fetch receipts in parallel let mut receipts_tasks = Vec::with_capacity(block.transactions.len()); @@ -356,25 +361,25 @@ async fn fetch_block(chain: Arc, number: BlockNumber) -> Exter s.rec_str("number", &number); }); - let mut backoff = 10; + let mut backoff_ms = 10; loop { tracing::info!(%number, "fetching block"); let block = match chain.fetch_block(number).await { Ok(json) => json, Err(e) => { - backoff *= 2; - backoff = min(backoff, 1000); // no more than 1000ms of backoff - tracing::warn!(reason = ?e, %number, %backoff, "failed to retrieve block. retrying with backoff."); - sleep(Duration::from_millis(backoff)).await; + backoff_ms *= 2; + backoff_ms = min(backoff_ms, 1000); // no more than 1000ms of backoff + tracing::warn!(reason = ?e, %number, %backoff_ms, "failed to retrieve block. retrying with backoff."); + traced_sleep(Duration::from_millis(backoff_ms)).await; continue; } }; if block.is_null() { - backoff *= 2; - backoff = min(backoff, 1000); // no more than 1000ms of backoff + backoff_ms *= 2; + backoff_ms = min(backoff_ms, 1000); // no more than 1000ms of backoff tracing::warn!(%number, "block not available yet because block is not mined. retrying with backoff."); - sleep(Duration::from_millis(backoff)).await; + traced_sleep(Duration::from_millis(backoff_ms)).await; continue; } diff --git a/src/bin/relayer.rs b/src/bin/relayer.rs index 6a03ebfc5..9fc9de359 100644 --- a/src/bin/relayer.rs +++ b/src/bin/relayer.rs @@ -1,6 +1,7 @@ mod importer_online; use stratus::config::ExternalRelayerConfig; +use stratus::ext::traced_sleep; #[cfg(feature = "metrics")] use stratus::infra::metrics; use stratus::utils::DropTimer; @@ -46,7 +47,7 @@ async fn run(config: ExternalRelayerConfig) -> anyhow::Result<()> { Some(block_number) => tracing::info!(number = %block_number, "relayed"), None => { tracing::info!("no pending block found"); - tokio::time::sleep(backoff).await; + traced_sleep(backoff).await; } }; } diff --git a/src/eth/consensus/mod.rs b/src/eth/consensus/mod.rs index b21b87225..bfd5046f6 100644 --- a/src/eth/consensus/mod.rs +++ b/src/eth/consensus/mod.rs @@ -24,7 +24,6 @@ use tokio::sync::mpsc::{self}; use tokio::sync::Mutex; use tokio::sync::RwLock; use tokio::task::JoinHandle; -use tokio::time::sleep; use tonic::transport::Channel; use tonic::transport::Server; use tonic::Request; @@ -35,6 +34,7 @@ use crate::eth::primitives::BlockNumber; use crate::eth::primitives::Hash; use crate::eth::storage::StratusStorage; use crate::ext::named_spawn; +use crate::ext::traced_sleep; use crate::infra::BlockchainClient; use crate::GlobalState; @@ -200,7 +200,7 @@ impl Consensus { loop { let timeout = consensus.heartbeat_timeout; tokio::select! { - _ = sleep(timeout) => { + _ = traced_sleep(timeout) => { if !consensus.is_leader().await { tracing::info!("starting election due to heartbeat timeout"); Self::start_election(Arc::clone(&consensus)).await; @@ -612,7 +612,7 @@ impl Consensus { } Err(e) => { tracing::warn!("failed to append block to peer {:?}: {:?}", peer.client, e); - sleep(RETRY_DELAY).await; + traced_sleep(RETRY_DELAY).await; } } } diff --git a/src/eth/relayer/external.rs b/src/eth/relayer/external.rs index 92f62398e..cca300d6f 100644 --- a/src/eth/relayer/external.rs +++ b/src/eth/relayer/external.rs @@ -19,6 +19,7 @@ use crate::eth::primitives::BlockNumber; use crate::eth::primitives::ExternalReceipt; use crate::eth::primitives::Hash; use crate::eth::primitives::TransactionMined; +use crate::ext::traced_sleep; use crate::ext::ResultExt; use crate::ext::SpanExt; use crate::infra::blockchain_client::pending_transaction::PendingTransaction; @@ -164,7 +165,7 @@ impl ExternalRelayer { } } substrate_receipt = PendingTransaction::new(tx_hash, &self.substrate_chain); - tokio::time::sleep(Duration::from_millis(50)).await; + traced_sleep(Duration::from_millis(50)).await; } } @@ -259,7 +260,7 @@ impl ExternalRelayer { let mut tries = 0; while self.substrate_chain.fetch_transaction(tx_mined.input.hash).await.unwrap_or(None).is_none() { tracing::warn!(?tx_mined.input.hash, ?tries, "transaction not found, retrying..."); - tokio::time::sleep(Duration::from_millis(100)).await; + traced_sleep(Duration::from_millis(100)).await; tries += 1; } diff --git a/src/eth/rpc/rpc_subscriptions.rs b/src/eth/rpc/rpc_subscriptions.rs index d6092ff4e..a0d235b44 100644 --- a/src/eth/rpc/rpc_subscriptions.rs +++ b/src/eth/rpc/rpc_subscriptions.rs @@ -17,6 +17,7 @@ use crate::eth::primitives::LogFilter; use crate::eth::primitives::LogMined; use crate::ext::named_spawn; use crate::ext::not; +use crate::ext::traced_sleep; use crate::if_else; #[cfg(feature = "metrics")] use crate::infra::metrics; @@ -83,7 +84,7 @@ impl RpcSubscriptions { } // await next iteration - tokio::time::sleep(CLEANING_FREQUENCY).await; + traced_sleep(CLEANING_FREQUENCY).await; } }) } diff --git a/src/eth/storage/postgres_external_rpc/postgres_external_rpc.rs b/src/eth/storage/postgres_external_rpc/postgres_external_rpc.rs index 3471375f7..c82cac09f 100644 --- a/src/eth/storage/postgres_external_rpc/postgres_external_rpc.rs +++ b/src/eth/storage/postgres_external_rpc/postgres_external_rpc.rs @@ -6,7 +6,6 @@ use serde_json::Value as JsonValue; use sqlx::postgres::PgPoolOptions; use sqlx::types::BigDecimal; use sqlx::PgPool; -use tokio::time::sleep; use crate::eth::primitives::Account; use crate::eth::primitives::Address; @@ -16,6 +15,7 @@ use crate::eth::primitives::ExternalReceipt; use crate::eth::primitives::Hash; use crate::eth::primitives::Wei; use crate::eth::storage::ExternalRpcStorage; +use crate::ext::traced_sleep; use crate::ext::ResultExt; use crate::log_and_err; @@ -99,7 +99,8 @@ impl ExternalRpcStorage for PostgresExternalRpcStorage { if attempts < MAX_RETRIES { attempts += 1; tracing::warn!("Attempt {} failed, retrying...: {}", attempts, e); - sleep(Duration::from_millis(attempts.pow(2))).await; // Exponential backoff + traced_sleep(Duration::from_millis(attempts.pow(2))).await; + // Exponential backoff } else { return log_and_err!(reason = e, "failed to retrieve external blocks"); } @@ -133,7 +134,8 @@ impl ExternalRpcStorage for PostgresExternalRpcStorage { if attempts < MAX_RETRIES { attempts += 1; tracing::warn!("Attempt {} failed, retrying...: {}", attempts, e); - sleep(Duration::from_millis(attempts.pow(2))).await; // Exponential backoff + traced_sleep(Duration::from_millis(attempts.pow(2))).await; + // Exponential backoff } else { return log_and_err!(reason = e, "failed to retrieve receipts"); } diff --git a/src/ext.rs b/src/ext.rs index d9c36e5d0..bd7d5f72e 100644 --- a/src/ext.rs +++ b/src/ext.rs @@ -6,6 +6,8 @@ use anyhow::anyhow; use tokio::select; use tokio::signal::unix::signal; use tokio::signal::unix::SignalKind; +use tracing::info_span; +use tracing::Instrument; use tracing::Span; use crate::infra::tracing::info_task_spawn; @@ -271,6 +273,18 @@ macro_rules! log_and_err { // Tokio // ----------------------------------------------------------------------------- +#[inline(always)] +pub async fn traced_sleep(duration: Duration) { + #[cfg(feature = "tracing")] + { + let span = info_span!("tokio::sleep", duration_ms = %duration.as_millis()); + tokio::time::sleep(duration).instrument(span).await; + } + + #[cfg(not(feature = "tracing"))] + tokio::time::sleep(duration).await; +} + /// Spawns an async Tokio task with a name to be displayed in tokio-console. #[track_caller] pub fn named_spawn(name: &str, task: impl std::future::Future + Send + 'static) -> tokio::task::JoinHandle diff --git a/src/infra/tracing.rs b/src/infra/tracing.rs index 88d40e34d..6f557417d 100644 --- a/src/infra/tracing.rs +++ b/src/infra/tracing.rs @@ -74,8 +74,9 @@ pub async fn init_tracing(url: Option<&String>, console_address: SocketAddr) -> // configure opentelemetry layer let opentelemetry_layer = match url { Some(url) => { - println!("tracing registry: enabling opentelemetry exporter | url={}", url); - let tracer_config = trace::config().with_resource(Resource::new(vec![KeyValue::new("service.name", format!("stratus-{}", binary_name()))])); + let service_name = format!("stratus-{}", binary_name()); + println!("tracing registry: enabling opentelemetry exporter | url={} service={}", url, service_name); + let tracer_config = trace::config().with_resource(Resource::new(vec![KeyValue::new("service.name", service_name)])); let tracer_exporter = opentelemetry_otlp::new_exporter().tonic().with_endpoint(url); let tracer = opentelemetry_otlp::new_pipeline() diff --git a/tests/test_import_external_snapshot_common.rs b/tests/test_import_external_snapshot_common.rs index ea96b71af..18f83c5f5 100644 --- a/tests/test_import_external_snapshot_common.rs +++ b/tests/test_import_external_snapshot_common.rs @@ -18,6 +18,7 @@ use stratus::eth::storage::InMemoryPermanentStorageState; use stratus::eth::storage::InMemoryTemporaryStorage; use stratus::eth::storage::PermanentStorage; use stratus::eth::storage::StratusStorage; +use stratus::ext::traced_sleep; use stratus::infra::docker::Docker; use stratus::GlobalServices; #[cfg(feature = "metrics")] @@ -162,7 +163,7 @@ pub async fn execute_test( miner.mine_external_and_commit().await.unwrap(); // get metrics from prometheus (sleep to ensure prometheus collected) - tokio::time::sleep(Duration::from_secs(5)).await; + traced_sleep(Duration::from_secs(5)).await; println!("{}\n{}\n{}", "=".repeat(80), test_name, "=".repeat(80)); for query in METRIC_QUERIES {