Skip to content

Commit

Permalink
enha: use parking_lot mutex instead of std::sync (#1902)
Browse files Browse the repository at this point in the history
  • Loading branch information
carneiro-cw authored Dec 10, 2024
1 parent d1c86fe commit 426da2f
Show file tree
Hide file tree
Showing 8 changed files with 40 additions and 101 deletions.
5 changes: 2 additions & 3 deletions src/eth/executor/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ use std::cmp::max;
use std::mem;
use std::str::FromStr;
use std::sync::Arc;
use std::sync::Mutex;

use anyhow::anyhow;
use cfg_if::cfg_if;
use parking_lot::Mutex;
use tracing::info_span;
use tracing::Span;

Expand Down Expand Up @@ -35,7 +35,6 @@ use crate::eth::storage::Storage;
use crate::eth::storage::StratusStorage;
use crate::ext::spawn_thread;
use crate::ext::to_json_string;
use crate::ext::MutexExt;
#[cfg(feature = "metrics")]
use crate::ext::OptionExt;
#[cfg(feature = "metrics")]
Expand Down Expand Up @@ -399,7 +398,7 @@ impl Executor {
// * Conflict detection runs, but it should never trigger because of the Mutex.
ExecutorStrategy::Serial => {
// acquire serial execution lock
let _serial_lock = self.locks.serial.lock_or_clear("executor serial lock was poisoned");
let _serial_lock = self.locks.serial.lock();

// execute transaction
self.execute_local_transaction_attempts(tx, EvmRoute::Serial, INFINITE_ATTEMPTS)
Expand Down
47 changes: 12 additions & 35 deletions src/eth/miner/miner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::RwLock;
use std::time::Duration;

use anyhow::anyhow;
use itertools::Itertools;
use keccak_hasher::KeccakHasher;
use parking_lot::Mutex;
use parking_lot::RwLock;
use tokio::sync::broadcast;
use tokio::sync::Mutex as AsyncMutex;
use tokio::task::JoinSet;
Expand All @@ -34,8 +34,6 @@ use crate::eth::storage::Storage;
use crate::eth::storage::StratusStorage;
use crate::ext::not;
use crate::ext::DisplayExt;
use crate::ext::MutexExt;
use crate::ext::MutexResultExt;
use crate::globals::STRATUS_SHUTDOWN_SIGNAL;
use crate::infra::tracing::SpanExt;
use crate::log_and_err;
Expand Down Expand Up @@ -128,7 +126,7 @@ impl Miner {

joinset.spawn(interval_miner_ticker::run(block_time, ticks_tx, new_shutdown_signal.clone()));

*self.shutdown_signal.lock_or_clear("setting up shutdown signal for interval miner") = new_shutdown_signal;
*self.shutdown_signal.lock() = new_shutdown_signal;
*self.interval_joinset.lock().await = Some(joinset);
}

Expand Down Expand Up @@ -159,19 +157,11 @@ impl Miner {
}

pub fn mode(&self) -> MinerMode {
*self.mode.read().unwrap_or_else(|poison_error| {
tracing::error!("miner mode read lock was poisoned");
self.mode.clear_poison();
poison_error.into_inner()
})
*self.mode.read()
}

fn set_mode(&self, new_mode: MinerMode) {
*self.mode.write().unwrap_or_else(|poison_error| {
tracing::error!("miner mode write lock was poisoned");
self.mode.clear_poison();
poison_error.into_inner()
}) = new_mode;
*self.mode.write() = new_mode;
}

pub fn is_interval_miner_running(&self) -> bool {
Expand All @@ -195,7 +185,7 @@ impl Miner {

tracing::warn!("Shutting down interval miner to switch to external mode");

self.shutdown_signal.lock_or_clear("sending shutdown signal to interval miner").cancel();
self.shutdown_signal.lock().cancel();

// wait for all tasks to end
while let Some(result) = joinset.join_next().await {
Expand All @@ -217,11 +207,7 @@ impl Miner {
let is_automine = self.mode().is_automine();

// if automine is enabled, only one transaction can enter the block at a time.
let _save_execution_lock = if is_automine {
Some(self.locks.save_execution.lock().map_lock_error("save_execution")?)
} else {
None
};
let _save_execution_lock = if is_automine { Some(self.locks.save_execution.lock()) } else { None };

// save execution to temporary storage
self.storage.save_execution(tx_execution, check_conflicts)?;
Expand All @@ -237,14 +223,6 @@ impl Miner {
Ok(())
}

/// Same as [`Self::mine_external`], but automatically commits the block instead of returning it.
pub fn mine_external_and_commit(&self, external_block: ExternalBlock) -> anyhow::Result<()> {
let _mine_and_commit_lock = self.locks.mine_and_commit.lock().map_lock_error("mine_external_and_commit")?;

let block = self.mine_external(external_block)?;
self.commit(block)
}

/// Mines external block and external transactions.
///
/// Local transactions are not allowed to be part of the block.
Expand All @@ -254,7 +232,7 @@ impl Miner {
let _span = info_span!("miner::mine_external", block_number = field::Empty).entered();

// lock
let _mine_lock = self.locks.mine.lock().map_lock_error("mine_external")?;
let _mine_lock = self.locks.mine.lock();

// mine block
let block = self.storage.finish_pending_block()?;
Expand All @@ -277,7 +255,7 @@ impl Miner {
/// Same as [`Self::mine_local`], but automatically commits the block instead of returning it.
/// mainly used when is_automine is enabled.
pub fn mine_local_and_commit(&self) -> anyhow::Result<()> {
let _mine_and_commit_lock = self.locks.mine_and_commit.lock().map_lock_error("mine_local_and_commit")?;
let _mine_and_commit_lock = self.locks.mine_and_commit.lock();

let block = self.mine_local()?;
self.commit(block)
Expand All @@ -291,7 +269,7 @@ impl Miner {
let _span = info_span!("miner::mine_local", block_number = field::Empty).entered();

// lock
let _mine_lock = self.locks.mine.lock().map_lock_error("mine_local")?;
let _mine_lock = self.locks.mine.lock();

// mine block
let block = self.storage.finish_pending_block()?;
Expand Down Expand Up @@ -320,7 +298,7 @@ impl Miner {
tracing::info!(%block_number, transactions_len = %block.transactions.len(), "commiting block");

// lock
let _commit_lock = self.locks.commit.lock().map_lock_error("commit")?;
let _commit_lock = self.locks.commit.lock();

tracing::info!(%block_number, "miner acquired commit lock");

Expand Down Expand Up @@ -453,7 +431,6 @@ mod interval_miner {
use tokio_util::sync::CancellationToken;

use crate::eth::miner::Miner;
use crate::ext::MutexExt;
use crate::infra::tracing::warn_task_cancellation;
use crate::infra::tracing::warn_task_rx_closed;

Expand Down Expand Up @@ -489,7 +466,7 @@ mod interval_miner {

#[inline(always)]
fn mine_and_commit(miner: &Miner) {
let _mine_and_commit_lock = miner.locks.mine_and_commit.lock_or_clear("mutex in mine_and_commit is poisoned");
let _mine_and_commit_lock = miner.locks.mine_and_commit.lock();

// mine
let block = loop {
Expand Down
18 changes: 4 additions & 14 deletions src/eth/rpc/rpc_context.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::fmt::Debug;
use std::sync::Arc;
use std::sync::RwLock;

use parking_lot::RwLock;

use crate::alias::JsonValue;
use crate::eth::executor::Executor;
Expand Down Expand Up @@ -33,22 +34,11 @@ pub struct RpcContext {

impl RpcContext {
pub fn consensus(&self) -> Option<Arc<dyn Consensus>> {
self.consensus
.read()
.unwrap_or_else(|poison_error| {
tracing::error!("consensus read lock was poisoned");
self.consensus.clear_poison();
poison_error.into_inner()
})
.clone()
self.consensus.read().clone()
}

pub fn set_consensus(&self, new_consensus: Option<Arc<dyn Consensus>>) {
*self.consensus.write().unwrap_or_else(|poison_error| {
tracing::error!("consensus write lock was poisoned");
self.consensus.clear_poison();
poison_error.into_inner()
}) = new_consensus;
*self.consensus.write() = new_consensus;
}
}

Expand Down
10 changes: 5 additions & 5 deletions src/eth/storage/permanent/inmemory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ use std::fmt::Debug;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::RwLock;
use std::sync::RwLockReadGuard;
use std::sync::RwLockWriteGuard;

use indexmap::IndexMap;
use itertools::Itertools;
use nonempty::NonEmpty;
use parking_lot::RwLock;
use parking_lot::RwLockReadGuard;
use parking_lot::RwLockWriteGuard;

use crate::eth::primitives::Account;
use crate::eth::primitives::Address;
Expand Down Expand Up @@ -52,12 +52,12 @@ impl InMemoryPermanentStorage {

/// Locks inner state for reading.
fn lock_read(&self) -> RwLockReadGuard<'_, InMemoryPermanentStorageState> {
self.state.read().unwrap()
self.state.read()
}

/// Locks inner state for writing.
fn lock_write(&self) -> RwLockWriteGuard<'_, InMemoryPermanentStorageState> {
self.state.write().unwrap()
self.state.write()
}

// -------------------------------------------------------------------------
Expand Down
6 changes: 2 additions & 4 deletions src/eth/storage/permanent/rocks/rocks_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,13 @@ use crate::eth::primitives::PointInTime;
use crate::eth::primitives::Slot;
use crate::eth::primitives::SlotIndex;
use crate::eth::primitives::TransactionMined;
#[cfg(feature = "metrics")]
use crate::ext::MutexExt;
use crate::ext::OptionExt;
use crate::log_and_err;
use crate::utils::GIGABYTE;

cfg_if::cfg_if! {
if #[cfg(feature = "metrics")] {
use std::sync::Mutex;
use parking_lot::Mutex;

use rocksdb::statistics::Histogram;
use rocksdb::statistics::Ticker;
Expand Down Expand Up @@ -597,7 +595,7 @@ impl RocksStorageState {
// The stats are cumulative since opening the db
// we can get the average in the time interval with: avg = (new_sum - sum)/(new_count - count)

let mut prev_values = self.prev_stats.lock_or_clear("mutex in get_histogram_average_in_interval is poisoned");
let mut prev_values = self.prev_stats.lock();

let (prev_sum, prev_count): (Sum, Count) = *prev_values.get(&(hist as u32)).unwrap_or(&(0, 0));
let data = self.db_options.get_histogram_data(hist);
Expand Down
32 changes: 12 additions & 20 deletions src/eth/storage/temporary/inmemory.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
//! In-memory storage implementations.
use std::collections::HashMap;
use std::sync::RwLock;

use parking_lot::RwLock;

use crate::eth::executor::EvmInput;
use crate::eth::primitives::Account;
Expand Down Expand Up @@ -111,7 +112,7 @@ impl TemporaryStorage for InMemoryTemporaryStorage {

// Uneeded clone here, return Cow
fn read_pending_block_header(&self) -> PendingBlockHeader {
self.pending_block.read().unwrap().block.header.clone()
self.pending_block.read().block.header.clone()
}

// -------------------------------------------------------------------------
Expand All @@ -120,7 +121,7 @@ impl TemporaryStorage for InMemoryTemporaryStorage {

fn save_pending_execution(&self, tx: TransactionExecution, check_conflicts: bool) -> Result<(), StratusError> {
// check conflicts
let mut pending_block = self.pending_block.write().unwrap();
let mut pending_block = self.pending_block.write();
if let TransactionExecution::Local(tx) = &tx {
let expected_input = EvmInput::from_eth_transaction(&tx.input, &pending_block.block.header);

Expand Down Expand Up @@ -174,11 +175,11 @@ impl TemporaryStorage for InMemoryTemporaryStorage {
}

fn read_pending_executions(&self) -> Vec<TransactionExecution> {
self.pending_block.read().unwrap().block.transactions.iter().map(|(_, tx)| tx.clone()).collect()
self.pending_block.read().block.transactions.iter().map(|(_, tx)| tx.clone()).collect()
}

fn finish_pending_block(&self) -> anyhow::Result<PendingBlock> {
let mut pending_block = self.pending_block.write().unwrap();
let mut pending_block = self.pending_block.write();

#[cfg(feature = "dev")]
let mut finished_block = pending_block.block.clone();
Expand All @@ -194,7 +195,7 @@ impl TemporaryStorage for InMemoryTemporaryStorage {
}
}

let mut latest = self.latest_block.write().unwrap();
let mut latest = self.latest_block.write();
*latest = Some(std::mem::replace(
&mut *pending_block,
InMemoryTemporaryStorageState::new(finished_block.header.number.next_block_number()),
Expand All @@ -204,7 +205,7 @@ impl TemporaryStorage for InMemoryTemporaryStorage {
}

fn read_pending_execution(&self, hash: Hash) -> anyhow::Result<Option<TransactionExecution>> {
let pending_block = self.pending_block.read().unwrap();
let pending_block = self.pending_block.read();
match pending_block.block.transactions.get(&hash) {
Some(tx) => Ok(Some(tx.clone())),
None => Ok(None),
Expand All @@ -216,12 +217,11 @@ impl TemporaryStorage for InMemoryTemporaryStorage {
// -------------------------------------------------------------------------

fn read_account(&self, address: Address) -> anyhow::Result<Option<Account>> {
Ok(match self.pending_block.read().unwrap().accounts.get(&address) {
Ok(match self.pending_block.read().accounts.get(&address) {
Some(pending_account) => Some(pending_account.info.clone()),
None => self
.latest_block
.read()
.unwrap()
.as_ref()
.and_then(|latest| latest.accounts.get(&address))
.map(|account| account.info.clone()),
Expand All @@ -230,19 +230,11 @@ impl TemporaryStorage for InMemoryTemporaryStorage {

fn read_slot(&self, address: Address, index: SlotIndex) -> anyhow::Result<Option<Slot>> {
Ok(
match self
.pending_block
.read()
.unwrap()
.accounts
.get(&address)
.and_then(|account| account.slots.get(&index))
{
match self.pending_block.read().accounts.get(&address).and_then(|account| account.slots.get(&index)) {
Some(pending_slot) => Some(*pending_slot),
None => self
.latest_block
.read()
.unwrap()
.as_ref()
.and_then(|latest| latest.accounts.get(&address).and_then(|account| account.slots.get(&index)))
.copied(),
Expand All @@ -254,8 +246,8 @@ impl TemporaryStorage for InMemoryTemporaryStorage {
// Global state
// -------------------------------------------------------------------------
fn reset(&self) -> anyhow::Result<()> {
self.pending_block.write().unwrap().reset();
*self.latest_block.write().unwrap() = None;
self.pending_block.write().reset();
*self.latest_block.write() = None;
Ok(())
}
}
Loading

0 comments on commit 426da2f

Please sign in to comment.