Skip to content

Commit

Permalink
perf: improve importer-offline hot path
Browse files Browse the repository at this point in the history
  • Loading branch information
dinhani-cw committed Aug 10, 2024
1 parent f9899da commit b7515f3
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 43 deletions.
19 changes: 6 additions & 13 deletions src/eth/executor/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,14 +222,12 @@ impl Executor {
// -------------------------------------------------------------------------

/// Reexecutes an external block locally and imports it to the temporary storage.
#[tracing::instrument(name = "executor::external_block", skip_all, fields(block_number))]
pub fn execute_external_block(&self, block: &ExternalBlock, receipts: &ExternalReceipts) -> anyhow::Result<()> {
// track
#[cfg(feature = "metrics")]
let (start, mut block_metrics) = (metrics::now(), EvmExecutionMetrics::default());

Span::with(|s| {
s.rec_str("block_number", &block.number());
});
#[cfg(feature = "tracing")]
let _span = info_span!("executor::external_block", block_number = %block.number()).entered();
tracing::info!(block_number = %block.number(), "reexecuting external block");

// track pending block number
Expand Down Expand Up @@ -264,21 +262,18 @@ impl Executor {
///
/// This function wraps `reexecute_external_tx_inner` and returns back the payload
/// to facilitate re-execution of parallel transactions that failed
#[tracing::instrument(name = "executor::external_transaction", skip_all, fields(tx_hash))]
fn execute_external_transaction(
&self,
tx: &ExternalTransaction,
receipt: &ExternalReceipt,
block: &ExternalBlock,
#[cfg(feature = "metrics")] block_metrics: &mut EvmExecutionMetrics,
) -> anyhow::Result<()> {
// track
#[cfg(feature = "metrics")]
let start = metrics::now();

// track
Span::with(|s| {
s.rec_str("tx_hash", &tx.hash);
});
#[cfg(feature = "tracing")]
let _span = info_span!("executor::external_transaction", tx_hash = %tx.hash).entered();
tracing::info!(block_number = %block.number(), tx_hash = %tx.hash(), "reexecuting external transaction");

// when transaction externally failed, create fake transaction instead of reexecuting
Expand Down Expand Up @@ -333,7 +328,6 @@ impl Executor {
self.miner.save_execution(tx_execution.clone())?;

// track metrics

#[cfg(feature = "metrics")]
{
let evm_execution = tx_execution.execution();
Expand Down Expand Up @@ -394,7 +388,6 @@ impl Executor {
self.locks.serial.clear_poison();
poison.into_inner()
}));
tracing::info!("executor acquired mine_and_commit lock to prevent executor mining block");
miner_lock
} else {
None
Expand Down
39 changes: 21 additions & 18 deletions src/eth/miner/miner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use std::sync::Mutex;
use keccak_hasher::KeccakHasher;
use nonempty::NonEmpty;
use tokio::sync::broadcast;
use tracing::field;
use tracing::info_span;
use tracing::Span;

use crate::eth::miner::MinerMode;
Expand Down Expand Up @@ -37,7 +39,7 @@ pub struct Miner {
pub mode: MinerMode,

/// Broadcasts pending transactions events.
pub notifier_pending_txs: broadcast::Sender<TransactionExecution>,
pub notifier_pending_txs: broadcast::Sender<Hash>,

/// Broadcasts new mined blocks events.
pub notifier_blocks: broadcast::Sender<Block>,
Expand Down Expand Up @@ -90,11 +92,10 @@ impl Miner {
}

/// Persists a transaction execution.
#[tracing::instrument(name = "miner::save_execution", skip_all, fields(tx_hash))]
pub fn save_execution(&self, tx_execution: TransactionExecution) -> Result<(), StratusError> {
Span::with(|s| {
s.rec_str("tx_hash", &tx_execution.hash());
});
// track
#[cfg(feature = "tracing")]
let _span = info_span!("miner::save_execution", tx_hash = %tx_execution.hash()).entered();

// if automine is enabled, only one transaction can enter the block at time.
let _save_execution_lock = if self.mode.is_automine() {
Expand All @@ -104,10 +105,11 @@ impl Miner {
};

// save execution to temporary storage
self.storage.save_execution(tx_execution.clone())?;
let tx_hash = tx_execution.hash();
self.storage.save_execution(tx_execution)?;

// if automine is enabled, automatically mines a block
let _ = self.notifier_pending_txs.send(tx_execution);
let _ = self.notifier_pending_txs.send(tx_hash);
if self.mode.is_automine() {
self.mine_local_and_commit()?;
}
Expand All @@ -118,7 +120,6 @@ impl Miner {
/// Same as [`Self::mine_external`], but automatically commits the block instead of returning it.
pub fn mine_external_and_commit(&self) -> anyhow::Result<()> {
let _mine_and_commit_lock = self.locks.mine_and_commit.lock().unwrap();
tracing::info!("miner acquired mine and commit lock for external block");

let block = self.mine_external()?;
self.commit(block)
Expand All @@ -127,13 +128,14 @@ impl Miner {
/// Mines external block and external transactions.
///
/// Local transactions are not allowed to be part of the block.
#[tracing::instrument(name = "miner::mine_external", skip_all, fields(block_number))]
pub fn mine_external(&self) -> anyhow::Result<Block> {
// track
#[cfg(feature = "tracing")]
let _span = info_span!("miner::mine_external", block_number = field::Empty).entered();
tracing::debug!("mining external block");

// lock
let _mine_lock = self.locks.mine.lock().unwrap();
tracing::info!("miner acquired mine lock for external block");

// mine
let block = self.storage.finish_pending_block()?;
Expand All @@ -151,13 +153,17 @@ impl Miner {
let mined_txs = mine_external_transactions(block.number, external_txs)?;
let block = block_from_external(external_block, mined_txs);

block.inspect(|block| Span::with(|s| s.rec_str("block_number", &block.number())))
#[cfg(feature = "tracing")]
if let Ok(ref block) = block {
Span::with(|s| s.rec_str("block_number", &block.number()));
}

block
}

/// Same as [`Self::mine_external_mixed`], but automatically commits the block instead of returning it.
pub fn mine_external_mixed_and_commit(&self) -> anyhow::Result<()> {
let _mine_and_commit_lock = self.locks.mine_and_commit.lock().unwrap();
tracing::info!("miner acquired mine and commit lock for external mixed local block");

let block = self.mine_external_mixed()?;
self.commit(block)
Expand All @@ -172,7 +178,6 @@ impl Miner {

// lock
let _mine_lock = self.locks.mine.lock().unwrap();
tracing::info!("miner acquired mining lock for external mixed block");

// mine
let block = self.storage.finish_pending_block()?;
Expand Down Expand Up @@ -204,7 +209,6 @@ impl Miner {
/// mainly used when is_automine is enabled.
pub fn mine_local_and_commit(&self) -> anyhow::Result<()> {
let _mine_and_commit_lock = self.locks.mine_and_commit.lock().unwrap();
tracing::info!("miner acquired mine and commit lock for local block");

let block = self.mine_local()?;
self.commit(block)
Expand All @@ -219,7 +223,6 @@ impl Miner {

// lock
let _mine_lock = self.locks.mine.lock().unwrap();
tracing::info!("miner acquired mining lock for local block");

// mine
let block = self.storage.finish_pending_block()?;
Expand All @@ -240,9 +243,10 @@ impl Miner {
}

/// Persists a mined block to permanent storage and prepares new block.
#[tracing::instrument(name = "miner::commit", skip_all, fields(block_number))]
pub fn commit(&self, block: Block) -> anyhow::Result<()> {
Span::with(|s| s.rec_str("block_number", &block.number()));
// track
#[cfg(feature = "tracing")]
let _span = info_span!("miner::commit", block_number = %block.number()).entered();
tracing::info!(block_number = %block.number(), transactions_len = %block.transactions.len(), "commiting block");

// lock
Expand Down Expand Up @@ -407,7 +411,6 @@ mod interval_miner {
#[inline(always)]
fn mine_and_commit(miner: &Miner) {
let _mine_and_commit_lock = miner.locks.mine_and_commit.lock().unwrap();
tracing::info!("miner acquired mine and commit lock for interval local block");

// mine
let block = loop {
Expand Down
17 changes: 5 additions & 12 deletions src/eth/rpc/rpc_subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ use tokio::time::Duration;

use crate::eth::primitives::Block;
use crate::eth::primitives::DateTimeNow;
use crate::eth::primitives::Hash;
use crate::eth::primitives::LogFilter;
use crate::eth::primitives::LogFilterInput;
use crate::eth::primitives::LogMined;
use crate::eth::primitives::StratusError;
use crate::eth::primitives::TransactionExecution;
use crate::eth::rpc::RpcClientApp;
use crate::ext::not;
use crate::ext::spawn_named;
Expand Down Expand Up @@ -59,11 +59,7 @@ pub struct RpcSubscriptions {

impl RpcSubscriptions {
/// Creates a new subscription manager that automatically spawns all necessary tasks in background.
pub fn spawn(
rx_pending_txs: broadcast::Receiver<TransactionExecution>,
rx_blocks: broadcast::Receiver<Block>,
rx_logs: broadcast::Receiver<LogMined>,
) -> Self {
pub fn spawn(rx_pending_txs: broadcast::Receiver<Hash>, rx_blocks: broadcast::Receiver<Block>, rx_logs: broadcast::Receiver<LogMined>) -> Self {
let connected = Arc::new(RpcSubscriptionsConnected::default());

Self::spawn_subscriptions_cleaner(Arc::clone(&connected));
Expand Down Expand Up @@ -146,26 +142,23 @@ impl RpcSubscriptions {
}

/// Spawns a new task that notifies subscribers about new executed transactions.
fn spawn_new_pending_txs_notifier(
subs: Arc<RpcSubscriptionsConnected>,
mut rx_tx_hash: broadcast::Receiver<TransactionExecution>,
) -> JoinHandle<anyhow::Result<()>> {
fn spawn_new_pending_txs_notifier(subs: Arc<RpcSubscriptionsConnected>, mut rx_tx_hash: broadcast::Receiver<Hash>) -> JoinHandle<anyhow::Result<()>> {
const TASK_NAME: &str = "rpc::sub::newPendingTransactions";
spawn_named(TASK_NAME, async move {
loop {
if GlobalState::is_shutdown_warn(TASK_NAME) {
return Ok(());
}

let tx = match timeout(NOTIFIER_SHUTDOWN_CHECK_INTERVAL, rx_tx_hash.recv()).await {
let tx_hash = match timeout(NOTIFIER_SHUTDOWN_CHECK_INTERVAL, rx_tx_hash.recv()).await {
Ok(Ok(tx)) => tx,
Ok(Err(_channel_closed)) => break,
Err(_timed_out) => continue,
};

let interested_subs = subs.pending_txs.read().await;
let interested_subs = interested_subs.values().collect_vec();
Self::notify(interested_subs, tx.hash().to_string());
Self::notify(interested_subs, tx_hash.to_string());
}
warn_task_rx_closed(TASK_NAME);
Ok(())
Expand Down

0 comments on commit b7515f3

Please sign in to comment.