Skip to content

Commit

Permalink
perf: only send block header to notifications
Browse files Browse the repository at this point in the history
  • Loading branch information
dinhani-cw committed Aug 12, 2024
1 parent 5cf6607 commit ea4fa77
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 8 deletions.
8 changes: 5 additions & 3 deletions src/eth/miner/miner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub struct Miner {
pub notifier_pending_txs: broadcast::Sender<Hash>,

/// Broadcasts new mined blocks events.
pub notifier_blocks: broadcast::Sender<Block>,
pub notifier_blocks: broadcast::Sender<BlockHeader>,

/// Broadcasts transaction logs events.
pub notifier_logs: broadcast::Sender<LogMined>,
Expand Down Expand Up @@ -255,16 +255,18 @@ impl Miner {

// extract fields to use in notifications
let block_number = block.number();
let block_header = block.header.clone();
let block_logs: Vec<LogMined> = block.transactions.iter().flat_map(|tx| &tx.logs).cloned().collect();

self.storage.save_block(block.clone())?;
// save storage
self.storage.save_block(block)?;
self.storage.set_mined_block_number(block_number)?;

// notify
for log in block_logs {
let _ = self.notifier_logs.send(log);
}
let _ = self.notifier_blocks.send(block);
let _ = self.notifier_blocks.send(block_header);

Ok(())
}
Expand Down
10 changes: 5 additions & 5 deletions src/eth/rpc/rpc_subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use tokio::task::JoinHandle;
use tokio::time::timeout;
use tokio::time::Duration;

use crate::eth::primitives::Block;
use crate::eth::primitives::BlockHeader;
use crate::eth::primitives::DateTimeNow;
use crate::eth::primitives::Hash;
use crate::eth::primitives::LogFilter;
Expand Down Expand Up @@ -59,7 +59,7 @@ pub struct RpcSubscriptions {

impl RpcSubscriptions {
/// Creates a new subscription manager that automatically spawns all necessary tasks in background.
pub fn spawn(rx_pending_txs: broadcast::Receiver<Hash>, rx_blocks: broadcast::Receiver<Block>, rx_logs: broadcast::Receiver<LogMined>) -> Self {
pub fn spawn(rx_pending_txs: broadcast::Receiver<Hash>, rx_blocks: broadcast::Receiver<BlockHeader>, rx_logs: broadcast::Receiver<LogMined>) -> Self {
let connected = Arc::new(RpcSubscriptionsConnected::default());

Self::spawn_subscriptions_cleaner(Arc::clone(&connected));
Expand Down Expand Up @@ -166,23 +166,23 @@ impl RpcSubscriptions {
}

/// Spawns a new task that notifies subscribers about new created blocks.
fn spawn_new_heads_notifier(subs: Arc<RpcSubscriptionsConnected>, mut rx_block: broadcast::Receiver<Block>) -> JoinHandle<anyhow::Result<()>> {
fn spawn_new_heads_notifier(subs: Arc<RpcSubscriptionsConnected>, mut rx_block: broadcast::Receiver<BlockHeader>) -> JoinHandle<anyhow::Result<()>> {
const TASK_NAME: &str = "rpc::sub::newHeads";
spawn_named(TASK_NAME, async move {
loop {
if GlobalState::is_shutdown_warn(TASK_NAME) {
return Ok(());
}

let block = match timeout(NOTIFIER_SHUTDOWN_CHECK_INTERVAL, rx_block.recv()).await {
let block_header = match timeout(NOTIFIER_SHUTDOWN_CHECK_INTERVAL, rx_block.recv()).await {
Ok(Ok(block)) => block,
Ok(Err(_channel_closed)) => break,
Err(_timed_out) => continue,
};

let interested_subs = subs.new_heads.read().await;
let interested_subs = interested_subs.values().collect_vec();
Self::notify(interested_subs, block.header);
Self::notify(interested_subs, block_header);
}
warn_task_rx_closed(TASK_NAME);
Ok(())
Expand Down

0 comments on commit ea4fa77

Please sign in to comment.