Skip to content

Commit

Permalink
enha: use parking_lot mutex instead of std::sync
Browse files Browse the repository at this point in the history
  • Loading branch information
carneiro-cw committed Dec 5, 2024
1 parent 2f05cb1 commit 30fae30
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 44 deletions.
5 changes: 2 additions & 3 deletions src/eth/executor/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ use std::cmp::max;
use std::mem;
use std::str::FromStr;
use std::sync::Arc;
use std::sync::Mutex;

use anyhow::anyhow;
use cfg_if::cfg_if;
use parking_lot::Mutex;
use tracing::info_span;
use tracing::Span;

Expand Down Expand Up @@ -35,7 +35,6 @@ use crate::eth::storage::Storage;
use crate::eth::storage::StratusStorage;
use crate::ext::spawn_thread;
use crate::ext::to_json_string;
use crate::ext::MutexExt;
#[cfg(feature = "metrics")]
use crate::ext::OptionExt;
#[cfg(feature = "metrics")]
Expand Down Expand Up @@ -399,7 +398,7 @@ impl Executor {
// * Conflict detection runs, but it should never trigger because of the Mutex.
ExecutorStrategy::Serial => {
// acquire serial execution lock
let _serial_lock = self.locks.serial.lock_or_clear("executor serial lock was poisoned");
let _serial_lock = self.locks.serial.lock();

// execute transaction
self.execute_local_transaction_attempts(tx, EvmRoute::Serial, INFINITE_ATTEMPTS)
Expand Down
27 changes: 10 additions & 17 deletions src/eth/miner/miner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::RwLock;
use std::time::Duration;

use anyhow::anyhow;
use itertools::Itertools;
use keccak_hasher::KeccakHasher;
use parking_lot::Mutex;
use tokio::sync::broadcast;
use tokio::sync::Mutex as AsyncMutex;
use tokio::task::JoinSet;
Expand All @@ -34,8 +34,6 @@ use crate::eth::storage::Storage;
use crate::eth::storage::StratusStorage;
use crate::ext::not;
use crate::ext::DisplayExt;
use crate::ext::MutexExt;
use crate::ext::MutexResultExt;
use crate::globals::STRATUS_SHUTDOWN_SIGNAL;
use crate::infra::tracing::SpanExt;
use crate::log_and_err;
Expand Down Expand Up @@ -128,7 +126,7 @@ impl Miner {

joinset.spawn(interval_miner_ticker::run(block_time, ticks_tx, new_shutdown_signal.clone()));

*self.shutdown_signal.lock_or_clear("setting up shutdown signal for interval miner") = new_shutdown_signal;
*self.shutdown_signal.lock() = new_shutdown_signal;
*self.interval_joinset.lock().await = Some(joinset);
}

Expand Down Expand Up @@ -195,7 +193,7 @@ impl Miner {

tracing::warn!("Shutting down interval miner to switch to external mode");

self.shutdown_signal.lock_or_clear("sending shutdown signal to interval miner").cancel();
self.shutdown_signal.lock().cancel();

// wait for all tasks to end
while let Some(result) = joinset.join_next().await {
Expand All @@ -217,11 +215,7 @@ impl Miner {
let is_automine = self.mode().is_automine();

// if automine is enabled, only one transaction can enter the block at a time.
let _save_execution_lock = if is_automine {
Some(self.locks.save_execution.lock().map_lock_error("save_execution")?)
} else {
None
};
let _save_execution_lock = if is_automine { Some(self.locks.save_execution.lock()) } else { None };

// save execution to temporary storage
self.storage.save_execution(tx_execution, check_conflicts)?;
Expand All @@ -239,7 +233,7 @@ impl Miner {

/// Same as [`Self::mine_external`], but automatically commits the block instead of returning it.
pub fn mine_external_and_commit(&self, external_block: ExternalBlock) -> anyhow::Result<()> {
let _mine_and_commit_lock = self.locks.mine_and_commit.lock().map_lock_error("mine_external_and_commit")?;
let _mine_and_commit_lock = self.locks.mine_and_commit.lock();

let block = self.mine_external(external_block)?;
self.commit(block)
Expand All @@ -254,7 +248,7 @@ impl Miner {
let _span = info_span!("miner::mine_external", block_number = field::Empty).entered();

// lock
let _mine_lock = self.locks.mine.lock().map_lock_error("mine_external")?;
let _mine_lock = self.locks.mine.lock();

// mine block
let block = self.storage.finish_pending_block()?;
Expand All @@ -277,7 +271,7 @@ impl Miner {
/// Same as [`Self::mine_local`], but automatically commits the block instead of returning it.
/// mainly used when is_automine is enabled.
pub fn mine_local_and_commit(&self) -> anyhow::Result<()> {
let _mine_and_commit_lock = self.locks.mine_and_commit.lock().map_lock_error("mine_local_and_commit")?;
let _mine_and_commit_lock = self.locks.mine_and_commit.lock();

let block = self.mine_local()?;
self.commit(block)
Expand All @@ -291,7 +285,7 @@ impl Miner {
let _span = info_span!("miner::mine_local", block_number = field::Empty).entered();

// lock
let _mine_lock = self.locks.mine.lock().map_lock_error("mine_local")?;
let _mine_lock = self.locks.mine.lock();

// mine block
let block = self.storage.finish_pending_block()?;
Expand Down Expand Up @@ -320,7 +314,7 @@ impl Miner {
tracing::info!(%block_number, transactions_len = %block.transactions.len(), "commiting block");

// lock
let _commit_lock = self.locks.commit.lock().map_lock_error("commit")?;
let _commit_lock = self.locks.commit.lock();

tracing::info!(%block_number, "miner acquired commit lock");

Expand Down Expand Up @@ -453,7 +447,6 @@ mod interval_miner {
use tokio_util::sync::CancellationToken;

use crate::eth::miner::Miner;
use crate::ext::MutexExt;
use crate::infra::tracing::warn_task_cancellation;
use crate::infra::tracing::warn_task_rx_closed;

Expand Down Expand Up @@ -489,7 +482,7 @@ mod interval_miner {

#[inline(always)]
fn mine_and_commit(miner: &Miner) {
let _mine_and_commit_lock = miner.locks.mine_and_commit.lock_or_clear("mutex in mine_and_commit is poisoned");
let _mine_and_commit_lock = miner.locks.mine_and_commit.lock();

// mine
let block = loop {
Expand Down
6 changes: 2 additions & 4 deletions src/eth/storage/permanent/rocks/rocks_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,13 @@ use crate::eth::primitives::PointInTime;
use crate::eth::primitives::Slot;
use crate::eth::primitives::SlotIndex;
use crate::eth::primitives::TransactionMined;
#[cfg(feature = "metrics")]
use crate::ext::MutexExt;
use crate::ext::OptionExt;
use crate::log_and_err;
use crate::utils::GIGABYTE;

cfg_if::cfg_if! {
if #[cfg(feature = "metrics")] {
use std::sync::Mutex;
use parking_lot::Mutex;

use rocksdb::statistics::Histogram;
use rocksdb::statistics::Ticker;
Expand Down Expand Up @@ -597,7 +595,7 @@ impl RocksStorageState {
// The stats are cumulative since opening the db
// we can get the average in the time interval with: avg = (new_sum - sum)/(new_count - count)

let mut prev_values = self.prev_stats.lock_or_clear("mutex in get_histogram_average_in_interval is poisoned");
let mut prev_values = self.prev_stats.lock();

let (prev_sum, prev_count): (Sum, Count) = *prev_values.get(&(hist as u32)).unwrap_or(&(0, 0));
let data = self.db_options.get_histogram_data(hist);
Expand Down
16 changes: 0 additions & 16 deletions src/ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::sync::Mutex;
use std::sync::MutexGuard;
use std::time::Duration;

use anyhow::anyhow;
Expand Down Expand Up @@ -176,20 +174,6 @@ impl<T> MutexResultExt<T> for Result<T, std::sync::PoisonError<T>> {
}
}

pub trait MutexExt<T> {
fn lock_or_clear<'a>(&'a self, error_context: &str) -> MutexGuard<'a, T>;
}

impl<T> MutexExt<T> for Mutex<T> {
fn lock_or_clear<'a>(&'a self, error_context: &str) -> MutexGuard<'a, T> {
self.lock().unwrap_or_else(|poison_err| {
tracing::error!(error_context, "fatal: failed to lock mutex");
self.clear_poison();
poison_err.into_inner()
})
}
}

// -----------------------------------------------------------------------------
// Duration
// -----------------------------------------------------------------------------
Expand Down
7 changes: 3 additions & 4 deletions src/globals.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::fmt::Debug;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Mutex;

use chrono::DateTime;
use chrono::Utc;
use once_cell::sync::Lazy;
use parking_lot::Mutex;
use sentry::ClientInitGuard;
use serde::Deserialize;
use serde::Serialize;
Expand All @@ -22,7 +22,6 @@ use crate::eth::follower::importer::Importer;
use crate::eth::rpc::RpcContext;
use crate::ext::not;
use crate::ext::spawn_signal_handler;
use crate::ext::MutexExt;
use crate::infra::tracing::warn_task_cancellation;

// -----------------------------------------------------------------------------
Expand Down Expand Up @@ -262,11 +261,11 @@ impl GlobalState {
}

pub fn set_node_mode(mode: NodeMode) {
*NODE_MODE.lock_or_clear("set_node_mode") = mode;
*NODE_MODE.lock() = mode;
}

pub fn get_node_mode() -> NodeMode {
*NODE_MODE.lock_or_clear("get_node_mode")
*NODE_MODE.lock()
}

// -------------------------------------------------------------------------
Expand Down

0 comments on commit 30fae30

Please sign in to comment.