diff --git a/src/eth/executor/executor.rs b/src/eth/executor/executor.rs index b1217ef85..88bb2c954 100644 --- a/src/eth/executor/executor.rs +++ b/src/eth/executor/executor.rs @@ -222,14 +222,12 @@ impl Executor { // ------------------------------------------------------------------------- /// Reexecutes an external block locally and imports it to the temporary storage. - #[tracing::instrument(name = "executor::external_block", skip_all, fields(block_number))] pub fn execute_external_block(&self, block: &ExternalBlock, receipts: &ExternalReceipts) -> anyhow::Result<()> { + // track #[cfg(feature = "metrics")] let (start, mut block_metrics) = (metrics::now(), EvmExecutionMetrics::default()); - - Span::with(|s| { - s.rec_str("block_number", &block.number()); - }); + #[cfg(feature = "tracing")] + let _span = info_span!("executor::external_block", block_number = %block.number()).entered(); tracing::info!(block_number = %block.number(), "reexecuting external block"); // track pending block number @@ -264,7 +262,6 @@ impl Executor { /// /// This function wraps `reexecute_external_tx_inner` and returns back the payload /// to facilitate re-execution of parallel transactions that failed - #[tracing::instrument(name = "executor::external_transaction", skip_all, fields(tx_hash))] fn execute_external_transaction( &self, tx: &ExternalTransaction, @@ -272,13 +269,11 @@ impl Executor { block: &ExternalBlock, #[cfg(feature = "metrics")] block_metrics: &mut EvmExecutionMetrics, ) -> anyhow::Result<()> { + // track #[cfg(feature = "metrics")] let start = metrics::now(); - - // track - Span::with(|s| { - s.rec_str("tx_hash", &tx.hash); - }); + #[cfg(feature = "tracing")] + let _span = info_span!("executor::external_transaction", tx_hash = %tx.hash).entered(); tracing::info!(block_number = %block.number(), tx_hash = %tx.hash(), "reexecuting external transaction"); // when transaction externally failed, create fake transaction instead of reexecuting @@ -333,7 +328,6 @@ impl Executor { self.miner.save_execution(tx_execution.clone())?; // track metrics - #[cfg(feature = "metrics")] { let evm_execution = tx_execution.execution(); @@ -394,7 +388,6 @@ impl Executor { self.locks.serial.clear_poison(); poison.into_inner() })); - tracing::info!("executor acquired mine_and_commit lock to prevent executor mining block"); miner_lock } else { None diff --git a/src/eth/miner/miner.rs b/src/eth/miner/miner.rs index fb834bd8d..0f4ac5e07 100644 --- a/src/eth/miner/miner.rs +++ b/src/eth/miner/miner.rs @@ -5,6 +5,8 @@ use std::sync::Mutex; use keccak_hasher::KeccakHasher; use nonempty::NonEmpty; use tokio::sync::broadcast; +use tracing::field; +use tracing::info_span; use tracing::Span; use crate::eth::miner::MinerMode; @@ -37,7 +39,7 @@ pub struct Miner { pub mode: MinerMode, /// Broadcasts pending transactions events. - pub notifier_pending_txs: broadcast::Sender, + pub notifier_pending_txs: broadcast::Sender, /// Broadcasts new mined blocks events. pub notifier_blocks: broadcast::Sender, @@ -90,11 +92,10 @@ impl Miner { } /// Persists a transaction execution. - #[tracing::instrument(name = "miner::save_execution", skip_all, fields(tx_hash))] pub fn save_execution(&self, tx_execution: TransactionExecution) -> Result<(), StratusError> { - Span::with(|s| { - s.rec_str("tx_hash", &tx_execution.hash()); - }); + // track + #[cfg(feature = "tracing")] + let _span = info_span!("miner::save_execution", tx_hash = %tx_execution.hash()).entered(); // if automine is enabled, only one transaction can enter the block at time. let _save_execution_lock = if self.mode.is_automine() { @@ -104,10 +105,11 @@ impl Miner { }; // save execution to temporary storage - self.storage.save_execution(tx_execution.clone())?; + let tx_hash = tx_execution.hash(); + self.storage.save_execution(tx_execution)?; // if automine is enabled, automatically mines a block - let _ = self.notifier_pending_txs.send(tx_execution); + let _ = self.notifier_pending_txs.send(tx_hash); if self.mode.is_automine() { self.mine_local_and_commit()?; } @@ -118,7 +120,6 @@ impl Miner { /// Same as [`Self::mine_external`], but automatically commits the block instead of returning it. pub fn mine_external_and_commit(&self) -> anyhow::Result<()> { let _mine_and_commit_lock = self.locks.mine_and_commit.lock().unwrap(); - tracing::info!("miner acquired mine and commit lock for external block"); let block = self.mine_external()?; self.commit(block) @@ -127,13 +128,14 @@ impl Miner { /// Mines external block and external transactions. /// /// Local transactions are not allowed to be part of the block. - #[tracing::instrument(name = "miner::mine_external", skip_all, fields(block_number))] pub fn mine_external(&self) -> anyhow::Result { + // track + #[cfg(feature = "tracing")] + let _span = info_span!("miner::mine_external", block_number = field::Empty).entered(); tracing::debug!("mining external block"); // lock let _mine_lock = self.locks.mine.lock().unwrap(); - tracing::info!("miner acquired mine lock for external block"); // mine let block = self.storage.finish_pending_block()?; @@ -151,13 +153,17 @@ impl Miner { let mined_txs = mine_external_transactions(block.number, external_txs)?; let block = block_from_external(external_block, mined_txs); - block.inspect(|block| Span::with(|s| s.rec_str("block_number", &block.number()))) + #[cfg(feature = "tracing")] + if let Ok(ref block) = block { + Span::with(|s| s.rec_str("block_number", &block.number())); + } + + block } /// Same as [`Self::mine_external_mixed`], but automatically commits the block instead of returning it. pub fn mine_external_mixed_and_commit(&self) -> anyhow::Result<()> { let _mine_and_commit_lock = self.locks.mine_and_commit.lock().unwrap(); - tracing::info!("miner acquired mine and commit lock for external mixed local block"); let block = self.mine_external_mixed()?; self.commit(block) @@ -172,7 +178,6 @@ impl Miner { // lock let _mine_lock = self.locks.mine.lock().unwrap(); - tracing::info!("miner acquired mining lock for external mixed block"); // mine let block = self.storage.finish_pending_block()?; @@ -204,7 +209,6 @@ impl Miner { /// mainly used when is_automine is enabled. pub fn mine_local_and_commit(&self) -> anyhow::Result<()> { let _mine_and_commit_lock = self.locks.mine_and_commit.lock().unwrap(); - tracing::info!("miner acquired mine and commit lock for local block"); let block = self.mine_local()?; self.commit(block) @@ -219,7 +223,6 @@ impl Miner { // lock let _mine_lock = self.locks.mine.lock().unwrap(); - tracing::info!("miner acquired mining lock for local block"); // mine let block = self.storage.finish_pending_block()?; @@ -240,9 +243,10 @@ impl Miner { } /// Persists a mined block to permanent storage and prepares new block. - #[tracing::instrument(name = "miner::commit", skip_all, fields(block_number))] pub fn commit(&self, block: Block) -> anyhow::Result<()> { - Span::with(|s| s.rec_str("block_number", &block.number())); + // track + #[cfg(feature = "tracing")] + let _span = info_span!("miner::commit", block_number = %block.number()).entered(); tracing::info!(block_number = %block.number(), transactions_len = %block.transactions.len(), "commiting block"); // lock @@ -407,7 +411,6 @@ mod interval_miner { #[inline(always)] fn mine_and_commit(miner: &Miner) { let _mine_and_commit_lock = miner.locks.mine_and_commit.lock().unwrap(); - tracing::info!("miner acquired mine and commit lock for interval local block"); // mine let block = loop { diff --git a/src/eth/rpc/rpc_subscriptions.rs b/src/eth/rpc/rpc_subscriptions.rs index d129fdafd..2759a0ea4 100644 --- a/src/eth/rpc/rpc_subscriptions.rs +++ b/src/eth/rpc/rpc_subscriptions.rs @@ -17,11 +17,11 @@ use tokio::time::Duration; use crate::eth::primitives::Block; use crate::eth::primitives::DateTimeNow; +use crate::eth::primitives::Hash; use crate::eth::primitives::LogFilter; use crate::eth::primitives::LogFilterInput; use crate::eth::primitives::LogMined; use crate::eth::primitives::StratusError; -use crate::eth::primitives::TransactionExecution; use crate::eth::rpc::RpcClientApp; use crate::ext::not; use crate::ext::spawn_named; @@ -59,11 +59,7 @@ pub struct RpcSubscriptions { impl RpcSubscriptions { /// Creates a new subscription manager that automatically spawns all necessary tasks in background. - pub fn spawn( - rx_pending_txs: broadcast::Receiver, - rx_blocks: broadcast::Receiver, - rx_logs: broadcast::Receiver, - ) -> Self { + pub fn spawn(rx_pending_txs: broadcast::Receiver, rx_blocks: broadcast::Receiver, rx_logs: broadcast::Receiver) -> Self { let connected = Arc::new(RpcSubscriptionsConnected::default()); Self::spawn_subscriptions_cleaner(Arc::clone(&connected)); @@ -146,10 +142,7 @@ impl RpcSubscriptions { } /// Spawns a new task that notifies subscribers about new executed transactions. - fn spawn_new_pending_txs_notifier( - subs: Arc, - mut rx_tx_hash: broadcast::Receiver, - ) -> JoinHandle> { + fn spawn_new_pending_txs_notifier(subs: Arc, mut rx_tx_hash: broadcast::Receiver) -> JoinHandle> { const TASK_NAME: &str = "rpc::sub::newPendingTransactions"; spawn_named(TASK_NAME, async move { loop { @@ -157,7 +150,7 @@ impl RpcSubscriptions { return Ok(()); } - let tx = match timeout(NOTIFIER_SHUTDOWN_CHECK_INTERVAL, rx_tx_hash.recv()).await { + let tx_hash = match timeout(NOTIFIER_SHUTDOWN_CHECK_INTERVAL, rx_tx_hash.recv()).await { Ok(Ok(tx)) => tx, Ok(Err(_channel_closed)) => break, Err(_timed_out) => continue, @@ -165,7 +158,7 @@ impl RpcSubscriptions { let interested_subs = subs.pending_txs.read().await; let interested_subs = interested_subs.values().collect_vec(); - Self::notify(interested_subs, tx.hash().to_string()); + Self::notify(interested_subs, tx_hash.to_string()); } warn_task_rx_closed(TASK_NAME); Ok(())