diff --git a/src/eth/executor/executor.rs b/src/eth/executor/executor.rs index d9eeb6031..701c48765 100644 --- a/src/eth/executor/executor.rs +++ b/src/eth/executor/executor.rs @@ -2,10 +2,10 @@ use std::cmp::max; use std::mem; use std::str::FromStr; use std::sync::Arc; -use std::sync::Mutex; use anyhow::anyhow; use cfg_if::cfg_if; +use parking_lot::Mutex; use tracing::info_span; use tracing::Span; @@ -35,7 +35,6 @@ use crate::eth::storage::Storage; use crate::eth::storage::StratusStorage; use crate::ext::spawn_thread; use crate::ext::to_json_string; -use crate::ext::MutexExt; #[cfg(feature = "metrics")] use crate::ext::OptionExt; #[cfg(feature = "metrics")] @@ -399,7 +398,7 @@ impl Executor { // * Conflict detection runs, but it should never trigger because of the Mutex. ExecutorStrategy::Serial => { // acquire serial execution lock - let _serial_lock = self.locks.serial.lock_or_clear("executor serial lock was poisoned"); + let _serial_lock = self.locks.serial.lock(); // execute transaction self.execute_local_transaction_attempts(tx, EvmRoute::Serial, INFINITE_ATTEMPTS) diff --git a/src/eth/miner/miner.rs b/src/eth/miner/miner.rs index 643a3ab20..fbe6bc680 100644 --- a/src/eth/miner/miner.rs +++ b/src/eth/miner/miner.rs @@ -2,13 +2,13 @@ use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; use std::sync::mpsc; use std::sync::Arc; -use std::sync::Mutex; -use std::sync::RwLock; use std::time::Duration; use anyhow::anyhow; use itertools::Itertools; use keccak_hasher::KeccakHasher; +use parking_lot::Mutex; +use parking_lot::RwLock; use tokio::sync::broadcast; use tokio::sync::Mutex as AsyncMutex; use tokio::task::JoinSet; @@ -34,8 +34,6 @@ use crate::eth::storage::Storage; use crate::eth::storage::StratusStorage; use crate::ext::not; use crate::ext::DisplayExt; -use crate::ext::MutexExt; -use crate::ext::MutexResultExt; use crate::globals::STRATUS_SHUTDOWN_SIGNAL; use crate::infra::tracing::SpanExt; use crate::log_and_err; @@ -128,7 +126,7 @@ impl Miner { joinset.spawn(interval_miner_ticker::run(block_time, ticks_tx, new_shutdown_signal.clone())); - *self.shutdown_signal.lock_or_clear("setting up shutdown signal for interval miner") = new_shutdown_signal; + *self.shutdown_signal.lock() = new_shutdown_signal; *self.interval_joinset.lock().await = Some(joinset); } @@ -159,19 +157,11 @@ impl Miner { } pub fn mode(&self) -> MinerMode { - *self.mode.read().unwrap_or_else(|poison_error| { - tracing::error!("miner mode read lock was poisoned"); - self.mode.clear_poison(); - poison_error.into_inner() - }) + *self.mode.read() } fn set_mode(&self, new_mode: MinerMode) { - *self.mode.write().unwrap_or_else(|poison_error| { - tracing::error!("miner mode write lock was poisoned"); - self.mode.clear_poison(); - poison_error.into_inner() - }) = new_mode; + *self.mode.write() = new_mode; } pub fn is_interval_miner_running(&self) -> bool { @@ -195,7 +185,7 @@ impl Miner { tracing::warn!("Shutting down interval miner to switch to external mode"); - self.shutdown_signal.lock_or_clear("sending shutdown signal to interval miner").cancel(); + self.shutdown_signal.lock().cancel(); // wait for all tasks to end while let Some(result) = joinset.join_next().await { @@ -217,11 +207,7 @@ impl Miner { let is_automine = self.mode().is_automine(); // if automine is enabled, only one transaction can enter the block at a time. - let _save_execution_lock = if is_automine { - Some(self.locks.save_execution.lock().map_lock_error("save_execution")?) - } else { - None - }; + let _save_execution_lock = if is_automine { Some(self.locks.save_execution.lock()) } else { None }; // save execution to temporary storage self.storage.save_execution(tx_execution, check_conflicts)?; @@ -237,14 +223,6 @@ impl Miner { Ok(()) } - /// Same as [`Self::mine_external`], but automatically commits the block instead of returning it. - pub fn mine_external_and_commit(&self, external_block: ExternalBlock) -> anyhow::Result<()> { - let _mine_and_commit_lock = self.locks.mine_and_commit.lock().map_lock_error("mine_external_and_commit")?; - - let block = self.mine_external(external_block)?; - self.commit(block) - } - /// Mines external block and external transactions. /// /// Local transactions are not allowed to be part of the block. @@ -254,7 +232,7 @@ impl Miner { let _span = info_span!("miner::mine_external", block_number = field::Empty).entered(); // lock - let _mine_lock = self.locks.mine.lock().map_lock_error("mine_external")?; + let _mine_lock = self.locks.mine.lock(); // mine block let block = self.storage.finish_pending_block()?; @@ -277,7 +255,7 @@ impl Miner { /// Same as [`Self::mine_local`], but automatically commits the block instead of returning it. /// mainly used when is_automine is enabled. pub fn mine_local_and_commit(&self) -> anyhow::Result<()> { - let _mine_and_commit_lock = self.locks.mine_and_commit.lock().map_lock_error("mine_local_and_commit")?; + let _mine_and_commit_lock = self.locks.mine_and_commit.lock(); let block = self.mine_local()?; self.commit(block) @@ -291,7 +269,7 @@ impl Miner { let _span = info_span!("miner::mine_local", block_number = field::Empty).entered(); // lock - let _mine_lock = self.locks.mine.lock().map_lock_error("mine_local")?; + let _mine_lock = self.locks.mine.lock(); // mine block let block = self.storage.finish_pending_block()?; @@ -320,7 +298,7 @@ impl Miner { tracing::info!(%block_number, transactions_len = %block.transactions.len(), "commiting block"); // lock - let _commit_lock = self.locks.commit.lock().map_lock_error("commit")?; + let _commit_lock = self.locks.commit.lock(); tracing::info!(%block_number, "miner acquired commit lock"); @@ -453,7 +431,6 @@ mod interval_miner { use tokio_util::sync::CancellationToken; use crate::eth::miner::Miner; - use crate::ext::MutexExt; use crate::infra::tracing::warn_task_cancellation; use crate::infra::tracing::warn_task_rx_closed; @@ -489,7 +466,7 @@ mod interval_miner { #[inline(always)] fn mine_and_commit(miner: &Miner) { - let _mine_and_commit_lock = miner.locks.mine_and_commit.lock_or_clear("mutex in mine_and_commit is poisoned"); + let _mine_and_commit_lock = miner.locks.mine_and_commit.lock(); // mine let block = loop { diff --git a/src/eth/rpc/rpc_context.rs b/src/eth/rpc/rpc_context.rs index e137d1ad9..9f7cf62e1 100644 --- a/src/eth/rpc/rpc_context.rs +++ b/src/eth/rpc/rpc_context.rs @@ -1,6 +1,7 @@ use std::fmt::Debug; use std::sync::Arc; -use std::sync::RwLock; + +use parking_lot::RwLock; use crate::alias::JsonValue; use crate::eth::executor::Executor; @@ -33,22 +34,11 @@ pub struct RpcContext { impl RpcContext { pub fn consensus(&self) -> Option> { - self.consensus - .read() - .unwrap_or_else(|poison_error| { - tracing::error!("consensus read lock was poisoned"); - self.consensus.clear_poison(); - poison_error.into_inner() - }) - .clone() + self.consensus.read().clone() } pub fn set_consensus(&self, new_consensus: Option>) { - *self.consensus.write().unwrap_or_else(|poison_error| { - tracing::error!("consensus write lock was poisoned"); - self.consensus.clear_poison(); - poison_error.into_inner() - }) = new_consensus; + *self.consensus.write() = new_consensus; } } diff --git a/src/eth/storage/permanent/inmemory.rs b/src/eth/storage/permanent/inmemory.rs index 935de949f..6115c6d70 100644 --- a/src/eth/storage/permanent/inmemory.rs +++ b/src/eth/storage/permanent/inmemory.rs @@ -5,13 +5,13 @@ use std::fmt::Debug; use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering; use std::sync::Arc; -use std::sync::RwLock; -use std::sync::RwLockReadGuard; -use std::sync::RwLockWriteGuard; use indexmap::IndexMap; use itertools::Itertools; use nonempty::NonEmpty; +use parking_lot::RwLock; +use parking_lot::RwLockReadGuard; +use parking_lot::RwLockWriteGuard; use crate::eth::primitives::Account; use crate::eth::primitives::Address; @@ -52,12 +52,12 @@ impl InMemoryPermanentStorage { /// Locks inner state for reading. fn lock_read(&self) -> RwLockReadGuard<'_, InMemoryPermanentStorageState> { - self.state.read().unwrap() + self.state.read() } /// Locks inner state for writing. fn lock_write(&self) -> RwLockWriteGuard<'_, InMemoryPermanentStorageState> { - self.state.write().unwrap() + self.state.write() } // ------------------------------------------------------------------------- diff --git a/src/eth/storage/permanent/rocks/rocks_state.rs b/src/eth/storage/permanent/rocks/rocks_state.rs index 6348e944c..e2776aa4b 100644 --- a/src/eth/storage/permanent/rocks/rocks_state.rs +++ b/src/eth/storage/permanent/rocks/rocks_state.rs @@ -51,15 +51,13 @@ use crate::eth::primitives::PointInTime; use crate::eth::primitives::Slot; use crate::eth::primitives::SlotIndex; use crate::eth::primitives::TransactionMined; -#[cfg(feature = "metrics")] -use crate::ext::MutexExt; use crate::ext::OptionExt; use crate::log_and_err; use crate::utils::GIGABYTE; cfg_if::cfg_if! { if #[cfg(feature = "metrics")] { - use std::sync::Mutex; + use parking_lot::Mutex; use rocksdb::statistics::Histogram; use rocksdb::statistics::Ticker; @@ -597,7 +595,7 @@ impl RocksStorageState { // The stats are cumulative since opening the db // we can get the average in the time interval with: avg = (new_sum - sum)/(new_count - count) - let mut prev_values = self.prev_stats.lock_or_clear("mutex in get_histogram_average_in_interval is poisoned"); + let mut prev_values = self.prev_stats.lock(); let (prev_sum, prev_count): (Sum, Count) = *prev_values.get(&(hist as u32)).unwrap_or(&(0, 0)); let data = self.db_options.get_histogram_data(hist); diff --git a/src/eth/storage/temporary/inmemory.rs b/src/eth/storage/temporary/inmemory.rs index da0263de7..015628eef 100644 --- a/src/eth/storage/temporary/inmemory.rs +++ b/src/eth/storage/temporary/inmemory.rs @@ -1,7 +1,8 @@ //! In-memory storage implementations. use std::collections::HashMap; -use std::sync::RwLock; + +use parking_lot::RwLock; use crate::eth::executor::EvmInput; use crate::eth::primitives::Account; @@ -111,7 +112,7 @@ impl TemporaryStorage for InMemoryTemporaryStorage { // Uneeded clone here, return Cow fn read_pending_block_header(&self) -> PendingBlockHeader { - self.pending_block.read().unwrap().block.header.clone() + self.pending_block.read().block.header.clone() } // ------------------------------------------------------------------------- @@ -120,7 +121,7 @@ impl TemporaryStorage for InMemoryTemporaryStorage { fn save_pending_execution(&self, tx: TransactionExecution, check_conflicts: bool) -> Result<(), StratusError> { // check conflicts - let mut pending_block = self.pending_block.write().unwrap(); + let mut pending_block = self.pending_block.write(); if let TransactionExecution::Local(tx) = &tx { let expected_input = EvmInput::from_eth_transaction(&tx.input, &pending_block.block.header); @@ -174,11 +175,11 @@ impl TemporaryStorage for InMemoryTemporaryStorage { } fn read_pending_executions(&self) -> Vec { - self.pending_block.read().unwrap().block.transactions.iter().map(|(_, tx)| tx.clone()).collect() + self.pending_block.read().block.transactions.iter().map(|(_, tx)| tx.clone()).collect() } fn finish_pending_block(&self) -> anyhow::Result { - let mut pending_block = self.pending_block.write().unwrap(); + let mut pending_block = self.pending_block.write(); #[cfg(feature = "dev")] let mut finished_block = pending_block.block.clone(); @@ -194,7 +195,7 @@ impl TemporaryStorage for InMemoryTemporaryStorage { } } - let mut latest = self.latest_block.write().unwrap(); + let mut latest = self.latest_block.write(); *latest = Some(std::mem::replace( &mut *pending_block, InMemoryTemporaryStorageState::new(finished_block.header.number.next_block_number()), @@ -204,7 +205,7 @@ impl TemporaryStorage for InMemoryTemporaryStorage { } fn read_pending_execution(&self, hash: Hash) -> anyhow::Result> { - let pending_block = self.pending_block.read().unwrap(); + let pending_block = self.pending_block.read(); match pending_block.block.transactions.get(&hash) { Some(tx) => Ok(Some(tx.clone())), None => Ok(None), @@ -216,12 +217,11 @@ impl TemporaryStorage for InMemoryTemporaryStorage { // ------------------------------------------------------------------------- fn read_account(&self, address: Address) -> anyhow::Result> { - Ok(match self.pending_block.read().unwrap().accounts.get(&address) { + Ok(match self.pending_block.read().accounts.get(&address) { Some(pending_account) => Some(pending_account.info.clone()), None => self .latest_block .read() - .unwrap() .as_ref() .and_then(|latest| latest.accounts.get(&address)) .map(|account| account.info.clone()), @@ -230,19 +230,11 @@ impl TemporaryStorage for InMemoryTemporaryStorage { fn read_slot(&self, address: Address, index: SlotIndex) -> anyhow::Result> { Ok( - match self - .pending_block - .read() - .unwrap() - .accounts - .get(&address) - .and_then(|account| account.slots.get(&index)) - { + match self.pending_block.read().accounts.get(&address).and_then(|account| account.slots.get(&index)) { Some(pending_slot) => Some(*pending_slot), None => self .latest_block .read() - .unwrap() .as_ref() .and_then(|latest| latest.accounts.get(&address).and_then(|account| account.slots.get(&index))) .copied(), @@ -254,8 +246,8 @@ impl TemporaryStorage for InMemoryTemporaryStorage { // Global state // ------------------------------------------------------------------------- fn reset(&self) -> anyhow::Result<()> { - self.pending_block.write().unwrap().reset(); - *self.latest_block.write().unwrap() = None; + self.pending_block.write().reset(); + *self.latest_block.write() = None; Ok(()) } } diff --git a/src/ext.rs b/src/ext.rs index 2918ed16e..74547c5c5 100644 --- a/src/ext.rs +++ b/src/ext.rs @@ -2,8 +2,6 @@ use std::collections::BTreeMap; use std::collections::HashMap; -use std::sync::Mutex; -use std::sync::MutexGuard; use std::time::Duration; use anyhow::anyhow; @@ -176,20 +174,6 @@ impl MutexResultExt for Result> { } } -pub trait MutexExt { - fn lock_or_clear<'a>(&'a self, error_context: &str) -> MutexGuard<'a, T>; -} - -impl MutexExt for Mutex { - fn lock_or_clear<'a>(&'a self, error_context: &str) -> MutexGuard<'a, T> { - self.lock().unwrap_or_else(|poison_err| { - tracing::error!(error_context, "fatal: failed to lock mutex"); - self.clear_poison(); - poison_err.into_inner() - }) - } -} - // ----------------------------------------------------------------------------- // Duration // ----------------------------------------------------------------------------- diff --git a/src/globals.rs b/src/globals.rs index ce4dc4334..ce00603d9 100644 --- a/src/globals.rs +++ b/src/globals.rs @@ -1,11 +1,11 @@ use std::fmt::Debug; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; -use std::sync::Mutex; use chrono::DateTime; use chrono::Utc; use once_cell::sync::Lazy; +use parking_lot::Mutex; use sentry::ClientInitGuard; use serde::Deserialize; use serde::Serialize; @@ -22,7 +22,6 @@ use crate::eth::follower::importer::Importer; use crate::eth::rpc::RpcContext; use crate::ext::not; use crate::ext::spawn_signal_handler; -use crate::ext::MutexExt; use crate::infra::tracing::warn_task_cancellation; // ----------------------------------------------------------------------------- @@ -262,11 +261,11 @@ impl GlobalState { } pub fn set_node_mode(mode: NodeMode) { - *NODE_MODE.lock_or_clear("set_node_mode") = mode; + *NODE_MODE.lock() = mode; } pub fn get_node_mode() -> NodeMode { - *NODE_MODE.lock_or_clear("get_node_mode") + *NODE_MODE.lock() } // -------------------------------------------------------------------------