diff --git a/proto/op_pool.proto b/proto/op_pool.proto index b64d76886..da6a8921c 100644 --- a/proto/op_pool.proto +++ b/proto/op_pool.proto @@ -50,6 +50,7 @@ service OpPool { rpc DebugDumpMempool (DebugDumpMempoolRequest) returns (DebugDumpMempoolResponse); rpc DebugSetReputation (DebugSetReputationRequest) returns (DebugSetReputationResponse); rpc DebugDumpReputation (DebugDumpReputationRequest) returns (DebugDumpReputationResponse); + rpc SubscribeNewBlocks(SubscribeNewBlocksRequest) returns (stream NewBlock); } message GetSupportedEntryPointsRequest {} @@ -157,6 +158,12 @@ message DebugDumpReputationSuccess { repeated Reputation reputations = 1; } +message SubscribeNewBlocksRequest {} +message NewBlock { + bytes hash = 1; + uint64 number = 2; +} + message Reputation { bytes address = 1; ReputationStatus status = 2; diff --git a/src/builder/bundle_sender.rs b/src/builder/bundle_sender.rs index f28653da2..18afe1a32 100644 --- a/src/builder/bundle_sender.rs +++ b/src/builder/bundle_sender.rs @@ -7,13 +7,10 @@ use std::{ }; use anyhow::{bail, Context}; -use ethers::{ - providers::{Http, Middleware, Provider, RetryClient}, - types::{transaction::eip2718::TypedTransaction, Address, H256, U256}, -}; +use ethers::types::{transaction::eip2718::TypedTransaction, Address, H256, U256}; use tokio::{ join, - sync::{mpsc, oneshot}, + sync::{broadcast, mpsc, oneshot}, time, }; use tonic::async_trait; @@ -29,7 +26,7 @@ use crate::{ math, types::{Entity, EntryPointLike, ExpectedStorage, UserOperation}, }, - op_pool::PoolClient, + op_pool::{NewBlock, PoolClient}, }; // Overhead on gas estimates to account for inaccuracies. @@ -63,8 +60,6 @@ where entry_point: E, transaction_tracker: T, pool_client: C, - // TODO: Figure out what we really want to do for detecting new blocks. - provider: Arc>>, settings: Settings, } @@ -108,6 +103,13 @@ where /// next one. async fn send_bundles_in_loop(&mut self) { let mut last_block_number = 0; + let mut new_blocks = if let Ok(new_blocks) = self.pool_client.subscribe_new_blocks().await { + new_blocks + } else { + error!("Failed to subscribe to new blocks"); + return; + }; + loop { let mut send_bundle_response: Option> = None; @@ -122,7 +124,17 @@ where } } - last_block_number = self.wait_for_new_block_number(last_block_number).await; + last_block_number = match self + .wait_for_new_block_number(last_block_number, &mut new_blocks) + .await + { + Ok(l) => l, + Err(e) => { + error!("Failed to retrieve new block number: {e}"); + continue; + } + }; + self.check_for_and_log_transaction_update().await; let result = self.send_bundle_with_increasing_gas_fees().await; match &result { @@ -175,7 +187,6 @@ where entry_point: E, transaction_tracker: T, pool_client: C, - provider: Arc>>, settings: Settings, ) -> Self { Self { @@ -188,7 +199,6 @@ where entry_point, transaction_tracker, pool_client, - provider, settings, } } @@ -326,23 +336,19 @@ where Ok(SendBundleResult::StalledAtMaxFeeIncreases) } - async fn wait_for_new_block_number(&self, prev_block_number: u64) -> u64 { + async fn wait_for_new_block_number( + &self, + prev_block_number: u64, + new_blocks: &mut broadcast::Receiver, + ) -> anyhow::Result { loop { - let block_number = self.provider.get_block_number().await; - match block_number { - Ok(n) => { - let n = n.as_u64(); - if n > prev_block_number { - return n; - } - } - Err(error) => { - error!( - "Failed to load latest block number in builder. Will keep trying: {error}" - ); - } + let block = new_blocks + .recv() + .await + .context("builder should receive new blocks")?; + if block.number > prev_block_number { + return Ok(block.number); } - time::sleep(self.eth_poll_interval).await; } } diff --git a/src/builder/task.rs b/src/builder/task.rs index 8111bcad9..5d28b33a3 100644 --- a/src/builder/task.rs +++ b/src/builder/task.rs @@ -150,8 +150,12 @@ impl Task for BuilderTask { let (send_bundle_tx, send_bundle_rx) = mpsc::channel(1); let mut builder: Box = match &self.args.pool_client_mode { - PoolClientMode::Local { sender } => { - let pool_client = LocalPoolClient::new(sender.clone()); + PoolClientMode::Local { + sender, + block_receiver, + } => { + let pool_client = + LocalPoolClient::new(sender.clone(), block_receiver.resubscribe()); let proposer = BundleProposerImpl::new( pool_client.clone(), simulator, @@ -170,7 +174,6 @@ impl Task for BuilderTask { entry_point, transaction_tracker, pool_client, - provider, builder_settings, )) } @@ -194,7 +197,6 @@ impl Task for BuilderTask { entry_point, transaction_tracker, pool_client, - provider, builder_settings, )) } diff --git a/src/cli/node.rs b/src/cli/node.rs index 2ba6ffd32..d896351a3 100644 --- a/src/cli/node.rs +++ b/src/cli/node.rs @@ -1,5 +1,5 @@ use clap::Args; -use tokio::sync::mpsc; +use tokio::sync::{broadcast, mpsc}; use super::{builder::BuilderArgs, pool::PoolArgs, rpc::RpcArgs, CommonArgs}; use crate::{ @@ -31,19 +31,35 @@ pub async fn run(bundler_args: NodeCliArgs, common_args: CommonArgs) -> anyhow:: let builder_url = builder_args.url(false); let (tx, rx) = mpsc::channel(1024); + let (block_sender, block_receiver) = broadcast::channel(1024); let pool_task_args = pool_args - .to_args(&common_args, PoolServerMode::Local { receiver: Some(rx) }) + .to_args( + &common_args, + PoolServerMode::Local { + req_receiver: Some(rx), + block_sender: Some(block_sender), + }, + ) .await?; let builder_task_args = builder_args - .to_args(&common_args, PoolClientMode::Local { sender: tx.clone() }) + .to_args( + &common_args, + PoolClientMode::Local { + sender: tx.clone(), + block_receiver: block_receiver.resubscribe(), + }, + ) .await?; let rpc_task_args = rpc_args .to_args( &common_args, builder_url, (&common_args).try_into()?, - PoolClientMode::Local { sender: tx.clone() }, + PoolClientMode::Local { + sender: tx.clone(), + block_receiver, + }, ) .await?; diff --git a/src/op_pool/event/listener.rs b/src/op_pool/event/listener.rs index e7c15b03b..9e5b4a52e 100644 --- a/src/op_pool/event/listener.rs +++ b/src/op_pool/event/listener.rs @@ -34,19 +34,14 @@ pub struct EventListener { provider_factory: F, provider: Option, log_filter_base: Filter, - entrypoint_event_broadcasts: HashMap>>, + event_broadcast: broadcast::Sender>, last_reconnect: Option, backoff_idx: u32, } impl EventProvider for EventListener { - fn subscribe_by_entrypoint( - &self, - entry_point: Address, - ) -> Option>> { - self.entrypoint_event_broadcasts - .get(&entry_point) - .map(|b| b.subscribe()) + fn subscribe(&self) -> broadcast::Receiver> { + self.event_broadcast.subscribe() } fn spawn( @@ -60,24 +55,12 @@ impl EventProvider for EventListener { impl EventListener { /// Create a new event listener from a block provider factory and list entry points /// Must call listen_with_shutdown to start listening - pub fn new<'a>( - provider_factory: F, - entry_points: impl IntoIterator, - ) -> Self { - let mut entry_point_addresses = vec![]; - let mut entrypoint_event_broadcasts = HashMap::new(); - for ep in entry_points { - entry_point_addresses.push(*ep); - entrypoint_event_broadcasts.insert(*ep, broadcast::channel(1000).0); - } - - let log_filter_base = Filter::new().address(entry_point_addresses); - + pub fn new(provider_factory: F, entry_points: Vec
) -> Self { Self { provider_factory, provider: None, - log_filter_base, - entrypoint_event_broadcasts, + log_filter_base: Filter::new().address(entry_points), + event_broadcast: broadcast::channel(1024).0, last_reconnect: None, backoff_idx: 0, } @@ -209,30 +192,18 @@ impl EventListener { .block .number .context("block should have number")?; - let mut block_events = HashMap::new(); EventListenerMetrics::increment_blocks_seen(); EventListenerMetrics::set_block_height(block_number.as_u64()); - for ep in self.entrypoint_event_broadcasts.keys() { - block_events.insert( - *ep, - NewBlockEvent { - address: *ep, - hash: block_hash, - number: block_number, - events: vec![], - }, - ); - } + let mut block_event = NewBlockEvent { + hash: block_hash, + number: block_number, + events: HashMap::new(), + }; for log in block_with_logs.logs { let ep_address = log.address; - if !block_events.contains_key(&ep_address) { - error!("Received log for unknown entrypoint {ep_address:?}"); - continue; - } - let txn_hash = log .transaction_hash .context("log should have transaction hash")?; @@ -246,24 +217,17 @@ impl EventListener { txn_index, }; - block_events.entry(ep_address).and_modify(|e| { - e.events.push(event); - }); + block_event + .events + .entry(ep_address) + .or_insert_with(Vec::new) + .push(event); EventListenerMetrics::increment_events_seen(); } - for (ep, block_event) in block_events { - match self.entrypoint_event_broadcasts.get(&ep) { - Some(broadcast) => { - // ignore sender errors, which can only happen if there are no receivers - let _ = broadcast.send(Arc::new(block_event)); - } - None => { - error!("No broadcast channel for entry point: {:?}", ep); - } - } + if let Err(e) = self.event_broadcast.send(Arc::new(block_event)) { + error!("Error broadcasting block event: {:?}", e); } - Ok(()) } @@ -391,7 +355,7 @@ mod tests { assert_eq!(block_event.events.len(), 1); if let IEntryPointEvents::UserOperationEventFilter(block_event) = - &block_event.events[0].contract_event + &block_event.events.get(&state.ep).unwrap()[0].contract_event { assert_eq!(block_event, &event); } else { @@ -427,7 +391,7 @@ mod tests { assert_eq!(block_event.events.len(), 1); if let IEntryPointEvents::UserOperationEventFilter(block_event) = - &block_event.events[0].contract_event + &block_event.events.get(&state.ep).unwrap()[0].contract_event { assert_eq!(block_event, &event); } else { @@ -543,8 +507,8 @@ mod tests { let factory = MockBlockProviderFactory::new(rx, connection_event_tx, should_fail_connection.clone()); let ep = Address::random(); - let listener = EventListener::new(factory, vec![&ep]); - let events = listener.subscribe_by_entrypoint(ep).unwrap(); + let listener = EventListener::new(factory, vec![ep]); + let events = listener.subscribe(); let listener_shutdown = shutdown_token.clone(); let handle = tokio::spawn(async move { listener.listen_with_shutdown(listener_shutdown).await }); diff --git a/src/op_pool/event/mod.rs b/src/op_pool/event/mod.rs index 974d107c2..0618fb8d3 100644 --- a/src/op_pool/event/mod.rs +++ b/src/op_pool/event/mod.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::{collections::HashMap, sync::Arc}; use ethers::types::{Address, Block, Filter, Log, H256, U64}; use tokio::{sync::broadcast, task::JoinHandle}; @@ -20,14 +20,12 @@ pub use http::HttpBlockProviderFactory; /// Events correspond to a single entry point #[derive(Debug)] pub struct NewBlockEvent { - /// The entry point address - pub address: Address, /// The block hash pub hash: H256, /// The block number pub number: U64, - /// Ordered EntryPoint events - pub events: Vec, + /// Ordered EntryPoint events by address + pub events: HashMap>, } /// An event emitted by an entry point with metadata @@ -43,11 +41,8 @@ pub struct EntryPointEvent { /// A trait that provides a stream of new blocks with their events by entrypoint pub trait EventProvider: Send + Sync { - /// Subscribe to new blocks by entrypoint - fn subscribe_by_entrypoint( - &self, - entry_point: Address, - ) -> Option>>; + /// Subscribe to a new block stream + fn subscribe(&self) -> broadcast::Receiver>; /// Spawn the event provider fn spawn( diff --git a/src/op_pool/mempool/mod.rs b/src/op_pool/mempool/mod.rs index b7272537a..f3ca9e7db 100644 --- a/src/op_pool/mempool/mod.rs +++ b/src/op_pool/mempool/mod.rs @@ -16,6 +16,8 @@ pub use reputation::{ HourlyMovingAverageReputation, Reputation, ReputationParams, ReputationStatus, }; use strum::IntoEnumIterator; +use tokio::sync::broadcast; +use tokio_util::sync::CancellationToken; use tonic::async_trait; use self::error::MempoolResult; @@ -180,16 +182,45 @@ impl PoolOperation { #[derive(Debug)] pub struct MempoolGroup { - mempools: HashMap>, + mempools: HashMap, + block_broadcast: broadcast::Sender>, } impl MempoolGroup where M: Mempool, { - pub fn new(mempools: Vec>) -> Self { + pub fn new(mempools: Vec) -> Self { Self { mempools: mempools.into_iter().map(|m| (m.entry_point(), m)).collect(), + block_broadcast: broadcast::channel(1024).0, + } + } + + pub fn subscribe_new_blocks(self: Arc) -> broadcast::Receiver> { + self.block_broadcast.subscribe() + } + + pub async fn run( + self: Arc, + mut new_block_events: broadcast::Receiver>, + shutdown_token: CancellationToken, + ) { + loop { + tokio::select! { + _ = shutdown_token.cancelled() => { + tracing::info!("Shutting down UoPool"); + break; + } + new_block = new_block_events.recv() => { + if let Ok(new_block) = new_block { + for mempool in self.mempools.values() { + mempool.on_new_block(&new_block); + } + let _ = self.block_broadcast.send(new_block); + } + } + } } } @@ -267,10 +298,9 @@ where Ok(mempool.dump_reputation()) } - pub fn get_pool(&self, entry_point: Address) -> MempoolResult> { + fn get_pool(&self, entry_point: Address) -> MempoolResult<&M> { self.mempools .get(&entry_point) - .cloned() .ok_or_else(|| MempoolError::UnknownEntryPoint(entry_point)) } } diff --git a/src/op_pool/mempool/uo_pool.rs b/src/op_pool/mempool/uo_pool.rs index 2dd7e6e3b..6cdbbbf87 100644 --- a/src/op_pool/mempool/uo_pool.rs +++ b/src/op_pool/mempool/uo_pool.rs @@ -5,8 +5,6 @@ use std::{ use ethers::types::{Address, H256}; use parking_lot::RwLock; -use tokio::sync::broadcast; -use tokio_util::sync::CancellationToken; use tonic::async_trait; use super::{ @@ -66,26 +64,6 @@ where simulator, } } - - pub async fn run( - self: Arc, - mut new_block_events: broadcast::Receiver>, - shutdown_token: CancellationToken, - ) { - loop { - tokio::select! { - _ = shutdown_token.cancelled() => { - tracing::info!("Shutting down UoPool"); - break; - } - new_block = new_block_events.recv() => { - if let Ok(new_block) = new_block { - self.on_new_block(&new_block); - } - } - } - } - } } #[async_trait] @@ -106,17 +84,21 @@ where new_block.number, new_block.events.len() ); - for event in &new_block.events { - if let IEntryPointEvents::UserOperationEventFilter(uo_event) = &event.contract_event { - let op_hash = uo_event.user_op_hash.into(); - if let Some(op) = state.pool.remove_operation_by_hash(op_hash) { - for e in op.staked_entities() { - self.reputation.add_included(e.address); + + if let Some(events) = new_block.events.get(&self.entry_point) { + for event in events { + if let IEntryPointEvents::UserOperationEventFilter(uo_event) = &event.contract_event + { + let op_hash = uo_event.user_op_hash.into(); + if let Some(op) = state.pool.remove_operation_by_hash(op_hash) { + for e in op.staked_entities() { + self.reputation.add_included(e.address); + } } - } - // Remove throttled ops that were included in the block - state.throttled_ops.remove(&op_hash); + // Remove throttled ops that were included in the block + state.throttled_ops.remove(&op_hash); + } } } diff --git a/src/op_pool/mod.rs b/src/op_pool/mod.rs index 7b4e6ce3e..c7477c21b 100644 --- a/src/op_pool/mod.rs +++ b/src/op_pool/mod.rs @@ -7,7 +7,7 @@ pub use mempool::{error::MempoolError, PoolConfig, PoolOperation, Reputation, Re #[cfg(test)] pub use server::MockPoolClient; pub use server::{ - connect_remote_pool_client, LocalPoolClient, PoolClient, PoolClientMode, PoolResult, + connect_remote_pool_client, LocalPoolClient, NewBlock, PoolClient, PoolClientMode, PoolResult, PoolServerError, RemotePoolClient, ServerRequest as LocalPoolServerRequest, }; pub use task::*; diff --git a/src/op_pool/server/local/client.rs b/src/op_pool/server/local/client.rs index b0fc26fc1..18fe6413c 100644 --- a/src/op_pool/server/local/client.rs +++ b/src/op_pool/server/local/client.rs @@ -1,5 +1,5 @@ use ethers::types::{Address, H256}; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::{broadcast, mpsc, oneshot}; use tonic::async_trait; use super::server::{ServerRequest, ServerRequestKind, ServerResponse}; @@ -7,19 +7,26 @@ use crate::{ common::types::{Entity, UserOperation}, op_pool::{ mempool::PoolOperation, - server::{error::PoolServerError, PoolClient, Reputation}, + server::{error::PoolServerError, NewBlock, PoolClient, Reputation}, PoolResult, }, }; -#[derive(Clone, Debug)] +#[derive(Debug)] pub struct LocalPoolClient { sender: mpsc::Sender, + block_receiver: broadcast::Receiver, } impl LocalPoolClient { - pub fn new(sender: mpsc::Sender) -> Self { - Self { sender } + pub fn new( + sender: mpsc::Sender, + block_receiver: broadcast::Receiver, + ) -> Self { + Self { + sender, + block_receiver, + } } async fn send(&self, request: ServerRequestKind) -> PoolResult { @@ -131,4 +138,17 @@ impl PoolClient for LocalPoolClient { _ => Err(PoolServerError::UnexpectedResponse), } } + + async fn subscribe_new_blocks(&self) -> PoolResult> { + Ok(self.block_receiver.resubscribe()) + } +} + +impl Clone for LocalPoolClient { + fn clone(&self) -> Self { + Self { + sender: self.sender.clone(), + block_receiver: self.block_receiver.resubscribe(), + } + } } diff --git a/src/op_pool/server/local/server.rs b/src/op_pool/server/local/server.rs index a6aa0adac..6772a1b06 100644 --- a/src/op_pool/server/local/server.rs +++ b/src/op_pool/server/local/server.rs @@ -1,6 +1,8 @@ +use std::sync::Arc; + use ethers::types::{Address, H256}; use tokio::{ - sync::{mpsc, oneshot}, + sync::{broadcast, mpsc, oneshot}, task::JoinHandle, }; use tokio_util::sync::CancellationToken; @@ -9,41 +11,61 @@ use crate::{ common::types::{Entity, UserOperation}, op_pool::{ mempool::{Mempool, MempoolGroup, OperationOrigin, PoolOperation}, - server::Reputation, + server::{NewBlock, Reputation}, PoolResult, }, }; pub fn spawn_local_mempool_server( - mempool_runner: MempoolGroup, - receiver: mpsc::Receiver, + mempool_runner: Arc>, + req_receiver: mpsc::Receiver, + block_sender: broadcast::Sender, shutdown_token: CancellationToken, ) -> anyhow::Result>> { - let mut server = LocalPoolServer::new(receiver, mempool_runner); + let mut server = LocalPoolServer::new(req_receiver, block_sender, mempool_runner); let handle = tokio::spawn(async move { server.run(shutdown_token).await }); Ok(handle) } pub struct LocalPoolServer { - receiver: mpsc::Receiver, - mempools: MempoolGroup, + req_receiver: mpsc::Receiver, + block_sender: broadcast::Sender, + mempools: Arc>, } impl LocalPoolServer where M: Mempool, { - pub fn new(receiver: mpsc::Receiver, mempools: MempoolGroup) -> Self { - Self { receiver, mempools } + pub fn new( + req_receiver: mpsc::Receiver, + block_sender: broadcast::Sender, + mempools: Arc>, + ) -> Self { + Self { + req_receiver, + block_sender, + mempools, + } } pub async fn run(&mut self, shutdown_token: CancellationToken) -> anyhow::Result<()> { + let mut new_blocks = self.mempools.clone().subscribe_new_blocks(); + loop { tokio::select! { _ = shutdown_token.cancelled() => { break; } - Some(req) = self.receiver.recv() => { + new_block = new_blocks.recv() => { + if let Ok(new_block) = new_block { + let _ = self.block_sender.send(NewBlock { + hash: new_block.hash, + number: new_block.number.as_u64(), + }); + } + } + Some(req) = self.req_receiver.recv() => { let resp = match req.request { ServerRequestKind::GetSupportedEntryPoints => { Ok(ServerResponse::GetSupportedEntryPoints { @@ -51,15 +73,13 @@ where }) }, ServerRequestKind::AddOp { entry_point, op } => { - let pool = self.mempools.get_pool(entry_point)?; + let mempools = self.mempools.clone(); tokio::spawn(async move { - let resp = match pool.add_operation(OperationOrigin::Local, op).await { + let resp = match mempools.add_op(entry_point, op, OperationOrigin::Local).await { Ok(hash) => Ok(ServerResponse::AddOp { hash }), Err(e) => Err(e.into()), }; - if let Err(e) = req.response.send(resp) { - tracing::error!("Failed to send response: {:?}", e); - } + req.response.send(resp).unwrap(); }); continue; }, @@ -170,10 +190,12 @@ pub enum ServerResponse { #[cfg(test)] mod tests { - use std::sync::Arc; + use std::{collections::HashMap, sync::Arc}; + + use ethers::types::U64; use super::*; - use crate::op_pool::{mempool::MockMempool, LocalPoolClient, PoolClient}; + use crate::op_pool::{event::NewBlockEvent, mempool::MockMempool, LocalPoolClient, PoolClient}; #[tokio::test] async fn send_receive() { @@ -181,12 +203,15 @@ mod tests { let mut mock_pool = MockMempool::new(); mock_pool.expect_entry_point().returning(move || ep); - let mempool_group = MempoolGroup::new(vec![Arc::new(mock_pool)]); + let mempool_group = Arc::new(MempoolGroup::new(vec![mock_pool])); let (tx, rx) = mpsc::channel(1); let shutdown_token = CancellationToken::new(); - let handle = spawn_local_mempool_server(mempool_group, rx, shutdown_token.clone()).unwrap(); + let (block_tx, block_rx) = broadcast::channel(1); + let handle = + spawn_local_mempool_server(mempool_group, rx, block_tx, shutdown_token.clone()) + .unwrap(); + let client = LocalPoolClient::new(tx, block_rx); - let client = LocalPoolClient::new(tx); let ret = client.get_supported_entry_points().await.unwrap(); assert_eq!(ret, vec![ep]); @@ -200,15 +225,18 @@ mod tests { let mut mock_pool = MockMempool::new(); mock_pool.expect_entry_point().returning(move || ep); - let mempool_group = MempoolGroup::new(vec![Arc::new(mock_pool)]); + let mempool_group = Arc::new(MempoolGroup::new(vec![mock_pool])); let (tx, rx) = mpsc::channel(1); let shutdown_token = CancellationToken::new(); - let handle = spawn_local_mempool_server(mempool_group, rx, shutdown_token.clone()).unwrap(); + let (block_tx, block_rx) = broadcast::channel(1); + let handle = + spawn_local_mempool_server(mempool_group, rx, block_tx, shutdown_token.clone()) + .unwrap(); shutdown_token.cancel(); handle.await.unwrap().unwrap(); - let client = LocalPoolClient::new(tx); + let client = LocalPoolClient::new(tx, block_rx); let ret = client.get_supported_entry_points().await; assert!(ret.is_err()); } @@ -225,16 +253,73 @@ mod tests { .expect_add_operation() .returning(move |_, _| Ok(uo_hash)); - let mempool_group = MempoolGroup::new(vec![Arc::new(mock_pool)]); + let mempool_group = Arc::new(MempoolGroup::new(vec![mock_pool])); let (tx, rx) = mpsc::channel(1); let shutdown_token = CancellationToken::new(); - let handle = spawn_local_mempool_server(mempool_group, rx, shutdown_token.clone()).unwrap(); + let (block_tx, block_rx) = broadcast::channel(1); + let handle = + spawn_local_mempool_server(mempool_group, rx, block_tx, shutdown_token.clone()) + .unwrap(); + let client = LocalPoolClient::new(tx, block_rx); - let client = LocalPoolClient::new(tx); let ret = client.add_op(ep, uo).await.unwrap(); assert_eq!(ret, uo_hash); shutdown_token.cancel(); handle.await.unwrap().unwrap(); } + + #[tokio::test] + async fn send_blocks() { + let ep = Address::random(); + let mut mock_pool = MockMempool::new(); + let uo = UserOperation::default(); + let uo_hash = uo.op_hash(ep, 0); + + mock_pool.expect_entry_point().returning(move || ep); + mock_pool + .expect_add_operation() + .returning(move |_, _| Ok(uo_hash)); + mock_pool + .expect_on_new_block() + .times(..11) + .returning(|_| {}); + + let mempool_group = Arc::new(MempoolGroup::new(vec![mock_pool])); + let (tx, rx) = mpsc::channel(1); + let shutdown_token = CancellationToken::new(); + let (block_tx, block_rx) = broadcast::channel(10); + let handle = + spawn_local_mempool_server(mempool_group.clone(), rx, block_tx, shutdown_token.clone()) + .unwrap(); + let client = LocalPoolClient::new(tx, block_rx); + + let (new_blocks_tx, new_blocks_rx) = broadcast::channel(10); + + let mempool_shutdown = shutdown_token.clone(); + let mempool_handle = tokio::spawn(async move { + mempool_group.run(new_blocks_rx, mempool_shutdown).await; + }); + + let mut recv = client.subscribe_new_blocks().await.unwrap(); + + for i in 0..10 { + new_blocks_tx + .send(Arc::new(NewBlockEvent { + hash: H256::random(), + number: U64::from(i), + events: HashMap::new(), + })) + .unwrap(); + } + + for i in 0..10 { + let ret = recv.recv().await.unwrap(); + assert_eq!(ret.number, i); + } + + shutdown_token.cancel(); + handle.await.unwrap().unwrap(); + mempool_handle.await.unwrap(); + } } diff --git a/src/op_pool/server/mod.rs b/src/op_pool/server/mod.rs index d3d219570..1057ebb86 100644 --- a/src/op_pool/server/mod.rs +++ b/src/op_pool/server/mod.rs @@ -8,7 +8,7 @@ pub use local::{spawn_local_mempool_server, LocalPoolClient, ServerRequest}; #[cfg(test)] use mockall::automock; pub use remote::{connect_remote_pool_client, spawn_remote_mempool_server, RemotePoolClient}; -use tokio::sync::mpsc; +use tokio::sync::{broadcast, mpsc}; use tonic::async_trait; use super::{mempool::PoolOperation, Reputation}; @@ -44,12 +44,21 @@ pub trait PoolClient: Send + Sync + 'static { ) -> PoolResult<()>; async fn debug_dump_reputation(&self, entry_point: Address) -> PoolResult>; + + async fn subscribe_new_blocks(&self) -> PoolResult>; +} + +#[derive(Clone, Debug)] +pub struct NewBlock { + pub hash: H256, + pub number: u64, } #[derive(Debug)] pub enum PoolClientMode { Local { sender: mpsc::Sender, + block_receiver: broadcast::Receiver, }, Remote { url: String, diff --git a/src/op_pool/server/remote/client.rs b/src/op_pool/server/remote/client.rs index 2afe7d53e..ec3ddc58c 100644 --- a/src/op_pool/server/remote/client.rs +++ b/src/op_pool/server/remote/client.rs @@ -1,5 +1,8 @@ +use std::sync::Arc; + use anyhow::bail; use ethers::types::{Address, H256}; +use tokio::sync::broadcast; use tokio_util::sync::CancellationToken; use tonic::{async_trait, transport::Channel}; @@ -9,35 +12,61 @@ use super::protos::{ op_pool_client::OpPoolClient, remove_entities_response, remove_ops_response, AddOpRequest, DebugClearStateRequest, DebugDumpMempoolRequest, DebugDumpReputationRequest, DebugSetReputationRequest, GetOpsRequest, RemoveEntitiesRequest, RemoveOpsRequest, + SubscribeNewBlocksRequest, }; use crate::{ common::{ + handle::SpawnGuard, protos::{from_bytes, ConversionError}, server::connect_with_retries, types::{Entity, UserOperation}, }, op_pool::{ mempool::{PoolOperation, Reputation}, - server::{error::PoolServerError, PoolClient}, + server::{error::PoolServerError, NewBlock, PoolClient}, PoolResult, }, }; -#[derive(Clone, Debug)] +#[derive(Debug)] pub struct RemotePoolClient { op_pool_client: OpPoolClient, + rx: broadcast::Receiver, + _guard: SpawnGuard, } impl RemotePoolClient { pub fn new(client: OpPoolClient) -> Self { + let (tx, rx) = broadcast::channel(1024); + let _guard = SpawnGuard::spawn_with_guard(Self::run(client.clone(), tx)); Self { op_pool_client: client, + rx, + _guard, + } + } + + async fn run(mut client: OpPoolClient, tx: broadcast::Sender) { + // Panics if it fails to subscribe as this functionality + // is fundamental to the working of the client. + let mut stream = client + .subscribe_new_blocks(SubscribeNewBlocksRequest {}) + .await + .unwrap() + .into_inner(); + + while let Some(new_block) = stream.message().await.unwrap() { + let new_block = NewBlock { + hash: from_bytes(&new_block.hash).unwrap(), + number: new_block.number, + }; + tx.send(new_block).unwrap(); } } } #[async_trait] -impl PoolClient for RemotePoolClient { +impl PoolClient for Arc { async fn get_supported_entry_points(&self) -> PoolResult> { Ok(self .op_pool_client @@ -235,19 +264,23 @@ impl PoolClient for RemotePoolClient { )))?, } } + + async fn subscribe_new_blocks(&self) -> PoolResult> { + Ok(self.rx.resubscribe()) + } } pub async fn connect_remote_pool_client( op_pool_url: &str, shutdown_token: CancellationToken, -) -> anyhow::Result { +) -> anyhow::Result> { tokio::select! { _ = shutdown_token.cancelled() => { tracing::error!("bailing from connecting client, server shutting down"); bail!("Server shutting down") } res = connect_with_retries("op pool from builder", op_pool_url, OpPoolClient::connect) => { - Ok(RemotePoolClient::new(res?)) + Ok(Arc::new(RemotePoolClient::new(res?))) } } } diff --git a/src/op_pool/server/remote/server.rs b/src/op_pool/server/remote/server.rs index 6390ffd82..1df9fd475 100644 --- a/src/op_pool/server/remote/server.rs +++ b/src/op_pool/server/remote/server.rs @@ -1,7 +1,16 @@ -use std::net::SocketAddr; +use std::{ + net::SocketAddr, + sync::{Arc, Mutex}, +}; +use anyhow::Context; use ethers::types::{Address, H256}; -use tokio::task::JoinHandle; +use futures_util::TryFutureExt; +use tokio::{ + sync::mpsc::{self, Sender}, + task::JoinHandle, +}; +use tokio_stream::wrappers::ReceiverStream; use tokio_util::sync::CancellationToken; use tonic::{async_trait, transport::Server, Request, Response, Result, Status}; @@ -15,9 +24,9 @@ use super::protos::{ DebugDumpReputationRequest, DebugDumpReputationResponse, DebugDumpReputationSuccess, DebugSetReputationRequest, DebugSetReputationResponse, DebugSetReputationSuccess, GetOpsRequest, GetOpsResponse, GetOpsSuccess, GetSupportedEntryPointsRequest, - GetSupportedEntryPointsResponse, MempoolOp, RemoveEntitiesRequest, RemoveEntitiesResponse, - RemoveEntitiesSuccess, RemoveOpsRequest, RemoveOpsResponse, RemoveOpsSuccess, - OP_POOL_FILE_DESCRIPTOR_SET, + GetSupportedEntryPointsResponse, MempoolOp, NewBlock, RemoveEntitiesRequest, + RemoveEntitiesResponse, RemoveEntitiesSuccess, RemoveOpsRequest, RemoveOpsResponse, + RemoveOpsSuccess, SubscribeNewBlocksRequest, OP_POOL_FILE_DESCRIPTOR_SET, }; use crate::{ common::{grpc::metrics::GrpcMetricsLayer, protos::from_bytes, types::Entity}, @@ -26,12 +35,13 @@ use crate::{ pub async fn spawn_remote_mempool_server( chain_id: u64, - mempool_runner: MempoolGroup, + mempool_group: Arc>, addr: SocketAddr, shutdown_token: CancellationToken, ) -> anyhow::Result>> { // gRPC server - let op_pool_server = OpPoolServer::new(OpPoolImpl::new(chain_id, mempool_runner)); + let pool_impl = Arc::new(OpPoolImpl::new(chain_id, mempool_group)); + let op_pool_server = OpPoolServer::new(Arc::clone(&pool_impl)); let reflection_service = tonic_reflection::server::Builder::configure() .register_encoded_file_descriptor_set(OP_POOL_FILE_DESCRIPTOR_SET) .build()?; @@ -39,36 +49,80 @@ pub async fn spawn_remote_mempool_server( // health service let (mut health_reporter, health_service) = tonic_health::server::health_reporter(); health_reporter - .set_serving::>>() + .set_serving::>>>() .await; let metrics_layer = GrpcMetricsLayer::new("op_pool".to_string()); let handle = tokio::spawn(async move { - Server::builder() + let pool_handle = pool_impl + .run(shutdown_token.clone()) + .map_err(|e| anyhow::anyhow!(format!("pool server failed: {e:?}"))); + + let server_handle = Server::builder() .layer(metrics_layer) .add_service(op_pool_server) .add_service(reflection_service) .add_service(health_service) .serve_with_shutdown(addr, async move { shutdown_token.cancelled().await }) - .await - .map_err(|err| anyhow::anyhow!("Server error: {err:?}")) + .map_err(|e| anyhow::anyhow!(format!("pool server failed: {e:?}"))); + + tokio::try_join!(pool_handle, server_handle)?; + Ok(()) }); + Ok(handle) } struct OpPoolImpl { chain_id: u64, - mempool_runner: MempoolGroup, + mempools: Arc>, + block_subscriptions: Mutex>>>, } impl OpPoolImpl where M: Mempool, { - pub fn new(chain_id: u64, mempool_runner: MempoolGroup) -> Self { + pub fn new(chain_id: u64, mempools: Arc>) -> Self { Self { chain_id, - mempool_runner, + mempools, + block_subscriptions: Mutex::new(vec![]), + } + } + + pub async fn run(self: Arc, shutdown_token: CancellationToken) -> anyhow::Result<()> { + let mut rx_blocks = self.mempools.clone().subscribe_new_blocks(); + loop { + let block = tokio::select! { + _ = shutdown_token.cancelled() => { + return Ok(()); + } + r = rx_blocks.recv() => { + r.context("Failed to receive block")? + } + }; + + let subs = self.block_subscriptions.lock().unwrap().clone(); + + let mut to_remove = vec![]; + for (i, tx) in subs.iter().enumerate() { + if tx + .send(Ok(NewBlock { + hash: block.hash.as_bytes().to_vec(), + number: block.number.as_u64(), + })) + .await + .is_err() + { + to_remove.push(i); + } + } + + let mut subscriptions = self.block_subscriptions.lock().unwrap(); + for i in to_remove { + subscriptions.remove(i); + } } } @@ -79,7 +133,7 @@ where } #[async_trait] -impl OpPool for OpPoolImpl +impl OpPool for Arc> where M: Mempool + 'static, { @@ -90,7 +144,7 @@ where Ok(Response::new(GetSupportedEntryPointsResponse { chain_id: self.chain_id, entry_points: self - .mempool_runner + .mempools .get_supported_entry_points() .into_iter() .map(|ep| ep.as_bytes().to_vec()) @@ -109,11 +163,7 @@ where Status::invalid_argument(format!("Failed to convert to UserOperation: {e}")) })?; - let resp = match self - .mempool_runner - .add_op(ep, uo, OperationOrigin::Local) - .await - { + let resp = match self.mempools.add_op(ep, uo, OperationOrigin::Local).await { Ok(hash) => AddOpResponse { result: Some(add_op_response::Result::Success(AddOpSuccess { hash: hash.as_bytes().to_vec(), @@ -131,7 +181,7 @@ where let req = request.into_inner(); let ep = self.get_entry_point(&req.entry_point)?; - let resp = match self.mempool_runner.get_ops(ep, req.max_ops) { + let resp = match self.mempools.get_ops(ep, req.max_ops) { Ok(ops) => GetOpsResponse { result: Some(get_ops_response::Result::Success(GetOpsSuccess { ops: ops.iter().map(MempoolOp::from).collect(), @@ -163,7 +213,7 @@ where }) .collect::, _>>()?; - let resp = match self.mempool_runner.remove_ops(ep, &hashes) { + let resp = match self.mempools.remove_ops(ep, &hashes) { Ok(_) => RemoveOpsResponse { result: Some(remove_ops_response::Result::Success(RemoveOpsSuccess {})), }, @@ -188,11 +238,11 @@ where .collect::, _>>() .map_err(|e| Status::internal(format!("Failed to convert to proto entity: {e}")))?; - self.mempool_runner + self.mempools .remove_entities(ep, &entities) .map_err(|e| Status::internal(e.to_string()))?; - let resp = match self.mempool_runner.remove_entities(ep, &entities) { + let resp = match self.mempools.remove_entities(ep, &entities) { Ok(_) => RemoveEntitiesResponse { result: Some(remove_entities_response::Result::Success( RemoveEntitiesSuccess {}, @@ -210,7 +260,7 @@ where &self, _request: Request, ) -> Result> { - let resp = match self.mempool_runner.debug_clear_state() { + let resp = match self.mempools.debug_clear_state() { Ok(_) => DebugClearStateResponse { result: Some(debug_clear_state_response::Result::Success( DebugClearStateSuccess {}, @@ -231,7 +281,7 @@ where let req = request.into_inner(); let ep = self.get_entry_point(&req.entry_point)?; - let resp = match self.mempool_runner.debug_dump_mempool(ep) { + let resp = match self.mempools.debug_dump_mempool(ep) { Ok(ops) => DebugDumpMempoolResponse { result: Some(debug_dump_mempool_response::Result::Success( DebugDumpMempoolSuccess { @@ -270,7 +320,7 @@ where Status::internal(format!("Failed to convert from proto reputation {e}")) })?; - let resp = match self.mempool_runner.debug_set_reputations(ep, &reps) { + let resp = match self.mempools.debug_set_reputations(ep, &reps) { Ok(_) => DebugSetReputationResponse { result: Some(debug_set_reputation_response::Result::Success( DebugSetReputationSuccess {}, @@ -291,7 +341,7 @@ where let req = request.into_inner(); let ep = self.get_entry_point(&req.entry_point)?; - let resp = match self.mempool_runner.debug_dump_reputation(ep) { + let resp = match self.mempools.debug_dump_reputation(ep) { Ok(reps) => DebugDumpReputationResponse { result: Some(debug_dump_reputation_response::Result::Success( DebugDumpReputationSuccess { @@ -308,4 +358,15 @@ where Ok(Response::new(resp)) } + + type SubscribeNewBlocksStream = ReceiverStream>; + + async fn subscribe_new_blocks( + &self, + _request: Request, + ) -> Result> { + let (tx, rx) = mpsc::channel(1024); + self.block_subscriptions.lock().unwrap().push(tx); + Ok(Response::new(ReceiverStream::new(rx))) + } } diff --git a/src/op_pool/task.rs b/src/op_pool/task.rs index 02c5af80e..c50da3e38 100644 --- a/src/op_pool/task.rs +++ b/src/op_pool/task.rs @@ -4,8 +4,10 @@ use anyhow::{bail, Context}; use ethers::providers::{ Http, HttpRateLimitRetryPolicy, JsonRpcClient, Provider, RetryClientBuilder, }; -use futures::future; -use tokio::{sync::mpsc, task::JoinHandle, try_join}; +use tokio::{ + sync::{broadcast, mpsc}, + try_join, +}; use tokio_util::sync::CancellationToken; use tonic::async_trait; use url::Url; @@ -13,7 +15,7 @@ use url::Url; use super::{ event::EventProvider, mempool::{HourlyMovingAverageReputation, PoolConfig, ReputationParams}, - server::ServerRequest, + server::{NewBlock, ServerRequest}, }; use crate::{ common::{ @@ -32,7 +34,8 @@ use crate::{ #[derive(Debug)] pub enum PoolServerMode { Local { - receiver: Option>, + req_receiver: Option>, + block_sender: Option>, }, Remote { addr: SocketAddr, @@ -58,7 +61,12 @@ pub struct PoolTask { impl Task for PoolTask { async fn run(&mut self, shutdown_token: CancellationToken) -> anyhow::Result<()> { let chain_id = self.args.chain_id; - let entry_points = self.args.pool_configs.iter().map(|pc| &pc.entry_point); + let entry_points = self + .args + .pool_configs + .iter() + .map(|pc| pc.entry_point) + .collect(); tracing::info!("Chain id: {chain_id}"); tracing::info!("Websocket url: {:?}", self.args.ws_url); tracing::info!("Http url: {:?}", self.args.http_url); @@ -90,47 +98,46 @@ impl Task for PoolTask { // create mempools let mut mempools = vec![]; - let mut mempool_handles = Vec::new(); for pool_config in &self.args.pool_configs { - let (pool, handle) = PoolTask::create_mempool( - pool_config, - event_provider.as_ref(), - shutdown_token.clone(), - provider.clone(), - ) - .await - .context("should have created mempool")?; - + let pool = PoolTask::create_mempool(pool_config, provider.clone()) + .await + .context("should have created mempool")?; mempools.push(pool); - mempool_handles.push(handle); } - // handle to wait for mempools to terminate + let mempool_group = Arc::new(MempoolGroup::new(mempools)); + let mempool_group_runner = Arc::clone(&mempool_group); + let mempool_shutdown = shutdown_token.clone(); + let events = event_provider.subscribe(); + // handle to wait for mempool group to terminate let mempool_handle = tokio::spawn(async move { - future::join_all(mempool_handles) - .await - .into_iter() - .collect::, _>>() - .map(|_| ()) - .context("should have joined mempool handles") + mempool_group_runner.run(events, mempool_shutdown).await; + Ok(()) }); // Start events listener let events_provider_handle = event_provider.spawn(shutdown_token.clone()); - let mempool_group = MempoolGroup::new(mempools); - let server_handle = match &mut self.args.server_mode { - PoolServerMode::Local { ref mut receiver } => { - let recv = receiver + PoolServerMode::Local { + ref mut req_receiver, + ref mut block_sender, + } => { + let req_receiver = req_receiver .take() .context("should have local server message receiver")?; - spawn_local_mempool_server(mempool_group, recv, shutdown_token.clone())? + let block_sender = block_sender.take().context("should have block sender")?; + spawn_local_mempool_server( + Arc::clone(&mempool_group), + req_receiver, + block_sender, + shutdown_token.clone(), + )? } PoolServerMode::Remote { addr } => { spawn_remote_mempool_server( self.args.chain_id, - mempool_group, + Arc::clone(&mempool_group), *addr, shutdown_token.clone(), ) @@ -168,14 +175,9 @@ impl PoolTask { async fn create_mempool( pool_config: &PoolConfig, - event_provider: &dyn EventProvider, - shutdown_token: CancellationToken, provider: Arc>, - ) -> anyhow::Result<( - Arc>, - JoinHandle<()>, - )> { - let entry_point = pool_config.entry_point; + ) -> anyhow::Result> + { // Reputation manager let reputation = Arc::new(HourlyMovingAverageReputation::new( ReputationParams::bundler_default(), @@ -200,23 +202,11 @@ impl PoolTask { pool_config.mempool_channel_configs.clone(), ); - // Mempool - let mp = Arc::new(UoPool::new( + Ok(UoPool::new( pool_config.clone(), Arc::clone(&reputation), prechecker, simulator, - )); - // Start mempool - let mempool_events = event_provider - .subscribe_by_entrypoint(entry_point) - .context("event listener should have entrypoint subscriber")?; - let mp_runner = Arc::clone(&mp); - let handle = - tokio::spawn( - async move { mp_runner.run(mempool_events, shutdown_token.clone()).await }, - ); - - Ok((mp, handle)) + )) } } diff --git a/src/rpc/task.rs b/src/rpc/task.rs index 2fd88fa9b..c28d50983 100644 --- a/src/rpc/task.rs +++ b/src/rpc/task.rs @@ -86,8 +86,12 @@ impl Task for RpcTask { let mut module = RpcModule::new(()); match &self.args.pool_client_mode { - PoolClientMode::Local { sender } => { - let pool_client = LocalPoolClient::new(sender.clone()); + PoolClientMode::Local { + sender, + block_receiver, + } => { + let pool_client = + LocalPoolClient::new(sender.clone(), block_receiver.resubscribe()); self.attach_namespaces(provider, pool_client.clone(), builder_client, &mut module)?; module.merge(LocalHealthCheck::new(pool_client).into_rpc())?;