From 0412b5c72c704d005d536c6c5a238ff5132fa64c Mon Sep 17 00:00:00 2001 From: dancoombs Date: Thu, 24 Aug 2023 15:24:33 -0500 Subject: [PATCH] refactor: rework the pool server classes --- src/builder/bundle_proposer.rs | 20 +- src/builder/bundle_sender.rs | 20 +- src/builder/task.rs | 105 ++--- src/cli/builder.rs | 29 +- src/cli/node/mod.rs | 38 +- src/cli/pool.rs | 4 +- src/cli/rpc.rs | 25 +- src/common/handle.rs | 4 +- src/common/server.rs | 16 +- src/op_pool/mempool/mod.rs | 135 +------ src/op_pool/mod.rs | 7 +- src/op_pool/server/local.rs | 602 ++++++++++++++++++++++++++++ src/op_pool/server/local/client.rs | 154 ------- src/op_pool/server/local/mod.rs | 5 - src/op_pool/server/local/server.rs | 323 --------------- src/op_pool/server/mod.rs | 36 +- src/op_pool/server/remote/client.rs | 68 +++- src/op_pool/server/remote/protos.rs | 12 +- src/op_pool/server/remote/server.rs | 102 ++--- src/op_pool/task.rs | 79 ++-- src/rpc/debug.rs | 18 +- src/rpc/eth/estimation.rs | 34 +- src/rpc/eth/mod.rs | 30 +- src/rpc/health.rs | 76 ++-- src/rpc/task.rs | 89 ++-- 25 files changed, 966 insertions(+), 1065 deletions(-) create mode 100644 src/op_pool/server/local.rs delete mode 100644 src/op_pool/server/local/client.rs delete mode 100644 src/op_pool/server/local/mod.rs delete mode 100644 src/op_pool/server/local/server.rs diff --git a/src/builder/bundle_proposer.rs b/src/builder/bundle_proposer.rs index 2172653a6..bc91b5eb5 100644 --- a/src/builder/bundle_proposer.rs +++ b/src/builder/bundle_proposer.rs @@ -28,7 +28,7 @@ use crate::{ Timestamp, UserOperation, }, }, - op_pool::{PoolClient, PoolOperation}, + op_pool::{PoolOperation, PoolServer}, }; /// A user op must be valid for at least this long into the future to be included. @@ -76,9 +76,9 @@ where S: Simulator, E: EntryPointLike, P: ProviderLike, - C: PoolClient, + C: PoolServer, { - op_pool: C, + pool: C, simulator: S, entry_point: E, provider: Arc
<P>
, @@ -104,7 +104,7 @@ where S: Simulator, E: EntryPointLike, P: ProviderLike, - C: PoolClient, + C: PoolServer, { async fn make_bundle(&self, required_fees: Option) -> anyhow::Result { let (ops, block_hash, bundle_fees) = try_join!( @@ -180,10 +180,10 @@ where S: Simulator, E: EntryPointLike, P: ProviderLike, - C: PoolClient, + C: PoolServer, { pub fn new( - op_pool: C, + pool: C, simulator: S, entry_point: E, provider: Arc
<P>
, @@ -192,7 +192,7 @@ where event_sender: broadcast::Sender>, ) -> Self { Self { - op_pool, + pool, simulator, entry_point, provider: provider.clone(), @@ -398,7 +398,7 @@ where } async fn get_ops_from_pool(&self) -> anyhow::Result> { - self.op_pool + self.pool .get_ops(self.entry_point.address(), self.settings.max_bundle_size) .await .context("should get ops from pool") @@ -698,7 +698,7 @@ mod tests { }, types::{MockEntryPointLike, MockProviderLike, ValidTimeRange}, }, - op_pool::MockPoolClient, + op_pool::MockPoolServer, }; #[tokio::test] @@ -1174,7 +1174,7 @@ mod tests { }) .collect(); - let mut pool_client = MockPoolClient::new(); + let mut pool_client = MockPoolServer::new(); pool_client .expect_get_ops() .returning(move |_, _| Ok(ops.clone())); diff --git a/src/builder/bundle_sender.rs b/src/builder/bundle_sender.rs index 67353d0e8..e5fedab12 100644 --- a/src/builder/bundle_sender.rs +++ b/src/builder/bundle_sender.rs @@ -28,7 +28,7 @@ use crate::{ math, types::{Entity, EntryPointLike, ExpectedStorage, UserOperation}, }, - op_pool::PoolClient, + op_pool::PoolServer, }; // Overhead on gas estimates to account for inaccuracies. @@ -51,7 +51,7 @@ where P: BundleProposer, E: EntryPointLike, T: TransactionTracker, - C: PoolClient, + C: PoolServer, { manual_bundling_mode: Arc, send_bundle_receiver: mpsc::Receiver, @@ -61,7 +61,7 @@ where proposer: P, entry_point: E, transaction_tracker: T, - pool_client: C, + pool: C, settings: Settings, event_sender: broadcast::Sender>, } @@ -99,13 +99,13 @@ where P: BundleProposer, E: EntryPointLike, T: TransactionTracker, - C: PoolClient, + C: PoolServer, { /// Loops forever, attempting to form and send a bundle on each new block, /// then waiting for one bundle to be mined or dropped before forming the /// next one. 
async fn send_bundles_in_loop(&mut self) { - let mut new_blocks = if let Ok(new_blocks) = self.pool_client.subscribe_new_blocks().await { + let mut new_blocks = if let Ok(new_blocks) = self.pool.subscribe_new_blocks().await { new_blocks } else { error!("Failed to subscribe to new blocks"); @@ -177,7 +177,7 @@ where P: BundleProposer, E: EntryPointLike, T: TransactionTracker, - C: PoolClient, + C: PoolServer, { #[allow(clippy::too_many_arguments)] pub fn new( @@ -189,7 +189,7 @@ where proposer: P, entry_point: E, transaction_tracker: T, - pool_client: C, + pool: C, settings: Settings, event_sender: broadcast::Sender>, ) -> Self { @@ -202,7 +202,7 @@ where proposer, entry_point, transaction_tracker, - pool_client, + pool, settings, event_sender, } @@ -441,7 +441,7 @@ where } async fn remove_ops_from_pool(&self, ops: &[UserOperation]) -> anyhow::Result<()> { - self.pool_client + self.pool .remove_ops( self.entry_point.address(), ops.iter() @@ -453,7 +453,7 @@ where } async fn remove_entities_from_pool(&self, entities: &[Entity]) -> anyhow::Result<()> { - self.pool_client + self.pool .remove_entities(self.entry_point.address(), entities.to_vec()) .await .context("builder should remove rejected entities from pool") diff --git a/src/builder/task.rs b/src/builder/task.rs index 4dfb07ef8..4ee798b0e 100644 --- a/src/builder/task.rs +++ b/src/builder/task.rs @@ -37,7 +37,7 @@ use crate::{ server::format_socket_addr, simulation::{self, SimulatorImpl}, }, - op_pool::{connect_remote_pool_client, LocalPoolClient, PoolClientMode}, + op_pool::PoolServer, }; #[derive(Debug)] @@ -64,18 +64,21 @@ pub struct Args { pub max_blocks_to_wait_for_mine: u64, pub replacement_fee_percent_increase: u64, pub max_fee_increases: u64, - pub pool_client_mode: PoolClientMode, } #[derive(Debug)] -pub struct BuilderTask { +pub struct BuilderTask
<P>
{ args: Args, event_sender: broadcast::Sender>, + pool: P, } #[async_trait] -impl Task for BuilderTask { - async fn run(&mut self, shutdown_token: CancellationToken) -> anyhow::Result<()> { +impl
<P> Task for BuilderTask<P>
+where + P: PoolServer + Clone, +{ + async fn run(mut self: Box, shutdown_token: CancellationToken) -> anyhow::Result<()> { let addr = format_socket_addr(&self.args.host, self.args.port).parse()?; info!("Starting builder server on {}", addr); tracing::info!("Mempool config: {:?}", self.args.mempool_configs); @@ -153,62 +156,28 @@ impl Task for BuilderTask { let manual_bundling_mode = Arc::new(AtomicBool::new(false)); let (send_bundle_tx, send_bundle_rx) = mpsc::channel(1); - let mut builder: Box = match &self.args.pool_client_mode { - PoolClientMode::Local { - req_sender, - block_receiver, - } => { - let pool_client = - LocalPoolClient::new(req_sender.clone(), block_receiver.resubscribe()); - let proposer = BundleProposerImpl::new( - pool_client.clone(), - simulator, - entry_point.clone(), - Arc::clone(&provider), - self.args.chain_id, - proposer_settings, - self.event_sender.clone(), - ); - Box::new(BundleSenderImpl::new( - manual_bundling_mode.clone(), - send_bundle_rx, - self.args.chain_id, - beneficiary, - self.args.eth_poll_interval, - proposer, - entry_point, - transaction_tracker, - pool_client, - builder_settings, - self.event_sender.clone(), - )) - } - PoolClientMode::Remote { url } => { - let pool_client = connect_remote_pool_client(url, shutdown_token.clone()).await?; - let proposer = BundleProposerImpl::new( - pool_client.clone(), - simulator, - entry_point.clone(), - Arc::clone(&provider), - self.args.chain_id, - proposer_settings, - self.event_sender.clone(), - ); - Box::new(BundleSenderImpl::new( - manual_bundling_mode.clone(), - send_bundle_rx, - self.args.chain_id, - beneficiary, - self.args.eth_poll_interval, - proposer, - entry_point, - transaction_tracker, - pool_client, - builder_settings, - self.event_sender.clone(), - )) - } - }; + let proposer = BundleProposerImpl::new( + self.pool.clone(), + simulator, + entry_point.clone(), + Arc::clone(&provider), + self.args.chain_id, + proposer_settings, + self.event_sender.clone(), + ); + let mut builder = BundleSenderImpl::new( + manual_bundling_mode.clone(), + send_bundle_rx, + self.args.chain_id, + beneficiary, + self.args.eth_poll_interval, + proposer, + entry_point, + transaction_tracker, + self.pool, + builder_settings, + self.event_sender.clone(), + ); let _builder_loop_guard = { SpawnGuard::spawn_with_guard(async move { builder.send_bundles_in_loop().await }) }; @@ -248,12 +217,20 @@ impl Task for BuilderTask { } } -impl BuilderTask { +impl
<P> BuilderTask<P>
+where + P: PoolServer + Clone, +{ pub fn new( args: Args, event_sender: broadcast::Sender>, - ) -> BuilderTask { - Self { args, event_sender } + pool: P, + ) -> Self { + Self { + args, + event_sender, + pool, + } } pub fn boxed(self) -> Box { diff --git a/src/cli/builder.rs b/src/cli/builder.rs index cfb3f13d9..6ba2eaa09 100644 --- a/src/cli/builder.rs +++ b/src/cli/builder.rs @@ -1,6 +1,6 @@ use std::{collections::HashMap, time::Duration}; -use anyhow::Context; +use anyhow::{bail, Context}; use clap::Args; use ethers::types::H256; use tokio::sync::broadcast; @@ -13,9 +13,9 @@ use crate::{ gas::PriorityFeeMode, handle::spawn_tasks_with_shutdown, mempool::MempoolConfig, - server::format_server_addr, + server::{connect_with_retries, format_server_addr}, }, - op_pool::PoolClientMode, + op_pool::RemotePoolClient, }; /// CLI options for the builder @@ -150,11 +150,7 @@ pub struct BuilderArgs { impl BuilderArgs { /// Convert the CLI arguments into the arguments for the builder combining /// common and builder specific arguments. - pub async fn to_args( - &self, - common: &CommonArgs, - pool_client_mode: PoolClientMode, - ) -> anyhow::Result { + pub async fn to_args(&self, common: &CommonArgs) -> anyhow::Result { let priority_fee_mode = PriorityFeeMode::try_from( common.priority_fee_mode_kind.as_str(), common.priority_fee_mode_value, @@ -204,7 +200,6 @@ impl BuilderArgs { max_blocks_to_wait_for_mine: self.max_blocks_to_wait_for_mine, replacement_fee_percent_increase: self.replacement_fee_percent_increase, max_fee_increases: self.max_fee_increases, - pool_client_mode, }) } @@ -238,12 +233,20 @@ pub async fn run(builder_args: BuilderCliArgs, common_args: CommonArgs) -> anyho let (event_sender, event_rx) = broadcast::channel(EVENT_CHANNEL_CAPACITY); emit::receive_and_log_events_with_filter(event_rx, is_nonspammy_event); - let task_args = builder_args - .to_args(&common_args, PoolClientMode::Remote { url: pool_url }) - .await?; + let task_args = builder_args.to_args(&common_args).await?; + + let pool = tokio::select! 
{ + _ = tokio::signal::ctrl_c() => { + tracing::error!("bailing from connecting client, server shutting down"); + bail!("Server shutting down") + } + res = connect_with_retries("op pool from builder", &pool_url, RemotePoolClient::connect_func()) => { + res + } + }?; spawn_tasks_with_shutdown( - [BuilderTask::new(task_args, event_sender).boxed()], + [BuilderTask::new(task_args, event_sender, pool).boxed()], tokio::signal::ctrl_c(), ) .await; diff --git a/src/cli/node/mod.rs b/src/cli/node/mod.rs index fa1709f07..73e30f027 100644 --- a/src/cli/node/mod.rs +++ b/src/cli/node/mod.rs @@ -1,5 +1,5 @@ use clap::Args; -use tokio::sync::{broadcast, mpsc}; +use tokio::sync::broadcast; use self::events::Event; use crate::{ @@ -14,7 +14,7 @@ use crate::{ emit::{self, WithEntryPoint, EVENT_CHANNEL_CAPACITY}, handle, }, - op_pool::{emit::OpPoolEvent, PoolClientMode, PoolServerMode, PoolTask}, + op_pool::{emit::OpPoolEvent, LocalPoolBuilder, PoolServerMode, PoolTask}, rpc::RpcTask, }; mod events; @@ -40,37 +40,16 @@ pub async fn run(bundler_args: NodeCliArgs, common_args: CommonArgs) -> anyhow:: let builder_url = builder_args.url(false); - let (tx, rx) = mpsc::channel(1024); - let (block_sender, _) = broadcast::channel(1024); - let pool_task_args = pool_args - .to_args( - &common_args, - PoolServerMode::Local { - req_receiver: Some(rx), - block_sender: Some(block_sender.clone()), - }, - ) - .await?; - let builder_task_args = builder_args - .to_args( - &common_args, - PoolClientMode::Local { - req_sender: tx.clone(), - block_receiver: block_sender.subscribe(), - }, - ) + .to_args(&common_args, PoolServerMode::Local) .await?; + let builder_task_args = builder_args.to_args(&common_args).await?; let rpc_task_args = rpc_args .to_args( &common_args, builder_url, (&common_args).try_into()?, (&common_args).try_into()?, - PoolClientMode::Local { - req_sender: tx, - block_receiver: block_sender.subscribe(), - }, ) .await?; @@ -97,11 +76,14 @@ pub async fn run(bundler_args: NodeCliArgs, common_args: CommonArgs) -> anyhow:: } }); + let pool_builder = LocalPoolBuilder::new(1024, 1024); + let pool_handle = pool_builder.get_handle(); + handle::spawn_tasks_with_shutdown( [ - PoolTask::new(pool_task_args, op_pool_event_sender).boxed(), - BuilderTask::new(builder_task_args, builder_event_sender).boxed(), - RpcTask::new(rpc_task_args).boxed(), + PoolTask::new(pool_task_args, op_pool_event_sender, pool_builder).boxed(), + BuilderTask::new(builder_task_args, builder_event_sender, pool_handle.clone()).boxed(), + RpcTask::new(rpc_task_args, pool_handle).boxed(), ], tokio::signal::ctrl_c(), ) diff --git a/src/cli/pool.rs b/src/cli/pool.rs index 3b14cb546..7a85e510a 100644 --- a/src/cli/pool.rs +++ b/src/cli/pool.rs @@ -13,7 +13,7 @@ use crate::{ handle::spawn_tasks_with_shutdown, mempool::MempoolConfig, }, - op_pool::{self, PoolConfig, PoolServerMode, PoolTask}, + op_pool::{self, LocalPoolBuilder, PoolConfig, PoolServerMode, PoolTask}, }; /// CLI options for the OP Pool #[derive(Args, Debug)] @@ -198,7 +198,7 @@ pub async fn run(pool_args: PoolCliArgs, common_args: CommonArgs) -> anyhow::Res emit::receive_and_log_events_with_filter(event_rx, |_| true); spawn_tasks_with_shutdown( - [PoolTask::new(task_args, event_sender).boxed()], + [PoolTask::new(task_args, event_sender, LocalPoolBuilder::new(1024, 1024)).boxed()], tokio::signal::ctrl_c(), ) .await; diff --git a/src/cli/rpc.rs b/src/cli/rpc.rs index 5609aa644..022c9888f 100644 --- a/src/cli/rpc.rs +++ b/src/cli/rpc.rs @@ -1,12 +1,12 @@ use std::time::Duration; -use 
anyhow::Context; +use anyhow::{bail, Context}; use clap::Args; use super::CommonArgs; use crate::{ - common::{handle::spawn_tasks_with_shutdown, precheck}, - op_pool::PoolClientMode, + common::{handle::spawn_tasks_with_shutdown, precheck, server::connect_with_retries}, + op_pool::RemotePoolClient, rpc::{self, estimation, RpcTask}, }; @@ -71,7 +71,6 @@ impl RpcArgs { builder_url: String, precheck_settings: precheck::Settings, estimation_settings: estimation::Settings, - pool_client_mode: PoolClientMode, ) -> anyhow::Result { let apis = self .api @@ -99,7 +98,6 @@ impl RpcArgs { estimation_settings, rpc_timeout: Duration::from_secs(self.timeout_seconds.parse()?), max_connections: self.max_connections, - pool_client_mode, }) } } @@ -142,10 +140,23 @@ pub async fn run(rpc_args: RpcCliArgs, common_args: CommonArgs) -> anyhow::Resul builder_url, (&common_args).try_into()?, (&common_args).try_into()?, - PoolClientMode::Remote { url: pool_url }, ) .await?; - spawn_tasks_with_shutdown([RpcTask::new(task_args).boxed()], tokio::signal::ctrl_c()).await; + let pool = tokio::select! { + _ = tokio::signal::ctrl_c() => { + tracing::error!("bailing from connecting client, server shutting down"); + bail!("Server shutting down") + } + res = connect_with_retries("op pool from builder", &pool_url, RemotePoolClient::connect_func()) => { + res + } + }?; + + spawn_tasks_with_shutdown( + [RpcTask::new(task_args, pool).boxed()], + tokio::signal::ctrl_c(), + ) + .await; Ok(()) } diff --git a/src/common/handle.rs b/src/common/handle.rs index ec4cb1367..ee083e989 100644 --- a/src/common/handle.rs +++ b/src/common/handle.rs @@ -49,7 +49,7 @@ impl Drop for SpawnGuard { #[async_trait] pub trait Task: Sync + Send + 'static { - async fn run(&mut self, shutdown_token: CancellationToken) -> anyhow::Result<()>; + async fn run(mut self: Box, shutdown_token: CancellationToken) -> anyhow::Result<()>; } pub async fn spawn_tasks_with_shutdown( @@ -63,7 +63,7 @@ pub async fn spawn_tasks_with_shutdown( let shutdown_token = CancellationToken::new(); let mut shutdown_scope = Some(shutdown_scope); - let handles = tasks.into_iter().map(|mut task| { + let handles = tasks.into_iter().map(|task| { let st = shutdown_token.clone(); let ss = shutdown_scope.clone(); async move { diff --git a/src/common/server.rs b/src/common/server.rs index f5bc64eeb..db7f1872a 100644 --- a/src/common/server.rs +++ b/src/common/server.rs @@ -1,6 +1,7 @@ use std::{future::Future, time::Duration}; use anyhow::Context; +use tonic::async_trait; use crate::common::{retry, retry::RetryOpts}; @@ -23,7 +24,7 @@ pub async fn connect_with_retries( ) -> anyhow::Result where F: Fn(String) -> FutF, - FutF: Future> + Send + 'static, + FutF: Future> + Send + 'static, { let description = format!("connect to {server_name} at {url}"); retry::with_retries( @@ -39,3 +40,16 @@ where .await .context("should connect to server when retrying") } + +#[derive(Debug)] +pub enum ServerStatus { + Serving, + NotServing, +} + +#[async_trait] +pub trait HealthCheck: Send + Sync + 'static { + fn name(&self) -> &'static str; + + async fn status(&self) -> ServerStatus; +} diff --git a/src/op_pool/mempool/mod.rs b/src/op_pool/mempool/mod.rs index cce0c35d6..57425167d 100644 --- a/src/op_pool/mempool/mod.rs +++ b/src/op_pool/mempool/mod.rs @@ -16,12 +16,10 @@ pub use reputation::{ HourlyMovingAverageReputation, Reputation, ReputationParams, ReputationStatus, }; use strum::IntoEnumIterator; -use tokio::sync::broadcast; -use tokio_util::sync::CancellationToken; use tonic::async_trait; use 
self::error::MempoolResult; -use super::{chain::ChainUpdate, MempoolError}; +use super::chain::ChainUpdate; use crate::common::{ mempool::MempoolConfig, precheck, simulation, @@ -181,137 +179,6 @@ impl PoolOperation { } } -#[derive(Debug)] -pub struct MempoolGroup { - mempools: HashMap, - chain_updates: broadcast::Sender>, -} - -impl MempoolGroup -where - M: Mempool, -{ - pub fn new(mempools: Vec) -> Self { - Self { - mempools: mempools.into_iter().map(|m| (m.entry_point(), m)).collect(), - chain_updates: broadcast::channel(1024).0, - } - } - - pub fn subscribe_chain_update(self: Arc) -> broadcast::Receiver> { - self.chain_updates.subscribe() - } - - pub async fn run( - self: Arc, - mut chain_updates: broadcast::Receiver>, - shutdown_token: CancellationToken, - ) { - loop { - tokio::select! { - _ = shutdown_token.cancelled() => { - tracing::info!("Shutting down UoPool"); - break; - } - chain_update = chain_updates.recv() => { - if let Ok(chain_update) = chain_update { - // Update each mempool before notifying listeners of the chain update - // This allows the mempools to update their state before the listeners - // pull information from the mempool. - // For example, a bundle builder listening for a new block to kick off - // its bundle building process will want to be able to query the mempool - // and only receive operations that have not yet been mined. - for mempool in self.mempools.values() { - mempool.on_chain_update(&chain_update); - } - let _ = self.chain_updates.send(chain_update); - } - } - } - } - } - - pub fn get_supported_entry_points(&self) -> Vec
<Address>
{ - self.mempools.keys().copied().collect() - } - - pub async fn add_op( - &self, - entry_point: Address, - op: UserOperation, - origin: OperationOrigin, - ) -> MempoolResult { - let mempool = self.get_pool(entry_point)?; - mempool.add_operation(origin, op).await - } - - pub fn get_ops(&self, entry_point: Address, max_ops: u64) -> MempoolResult> { - let mempool = self.get_pool(entry_point)?; - Ok(mempool - .best_operations(max_ops as usize) - .iter() - .map(|op| (**op).clone()) - .collect()) - } - - pub fn remove_ops(&self, entry_point: Address, ops: &[H256]) -> MempoolResult<()> { - let mempool = self.get_pool(entry_point)?; - mempool.remove_operations(ops); - Ok(()) - } - - pub fn remove_entities<'a>( - &self, - entry_point: Address, - entities: impl IntoIterator, - ) -> MempoolResult<()> { - let mempool = self.get_pool(entry_point)?; - for entity in entities { - mempool.remove_entity(*entity); - } - Ok(()) - } - - pub fn debug_clear_state(&self) -> MempoolResult<()> { - for mempool in self.mempools.values() { - mempool.clear(); - } - Ok(()) - } - - pub fn debug_dump_mempool(&self, entry_point: Address) -> MempoolResult> { - let mempool = self.get_pool(entry_point)?; - Ok(mempool - .all_operations(usize::MAX) - .iter() - .map(|op| (**op).clone()) - .collect()) - } - - pub fn debug_set_reputations<'a>( - &self, - entry_point: Address, - reputations: impl IntoIterator, - ) -> MempoolResult<()> { - let mempool = self.get_pool(entry_point)?; - for rep in reputations { - mempool.set_reputation(rep.address, rep.ops_seen, rep.ops_included); - } - Ok(()) - } - - pub fn debug_dump_reputation(&self, entry_point: Address) -> MempoolResult> { - let mempool = self.get_pool(entry_point)?; - Ok(mempool.dump_reputation()) - } - - fn get_pool(&self, entry_point: Address) -> MempoolResult<&M> { - self.mempools - .get(&entry_point) - .ok_or_else(|| MempoolError::UnknownEntryPoint(entry_point)) - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/src/op_pool/mod.rs b/src/op_pool/mod.rs index 9d4a09ca0..ef7bcefcd 100644 --- a/src/op_pool/mod.rs +++ b/src/op_pool/mod.rs @@ -6,9 +6,6 @@ mod task; pub use mempool::{error::MempoolError, PoolConfig, PoolOperation, Reputation, ReputationStatus}; #[cfg(test)] -pub use server::MockPoolClient; -pub use server::{ - connect_remote_pool_client, LocalPoolClient, NewBlock, PoolClient, PoolClientMode, PoolResult, - PoolServerError, RemotePoolClient, ServerRequest as LocalPoolServerRequest, -}; +pub use server::MockPoolServer; +pub use server::{LocalPoolBuilder, PoolResult, PoolServer, PoolServerError, RemotePoolClient}; pub use task::*; diff --git a/src/op_pool/server/local.rs b/src/op_pool/server/local.rs new file mode 100644 index 000000000..e2856c012 --- /dev/null +++ b/src/op_pool/server/local.rs @@ -0,0 +1,602 @@ +use std::{collections::HashMap, sync::Arc}; + +use ethers::types::{Address, H256}; +use tokio::{ + sync::{broadcast, mpsc, oneshot}, + task::JoinHandle, +}; +use tokio_util::sync::CancellationToken; +use tonic::async_trait; + +use crate::{ + common::{ + server::{HealthCheck, ServerStatus}, + types::{Entity, UserOperation}, + }, + op_pool::{ + chain::ChainUpdate, + mempool::{Mempool, OperationOrigin, PoolOperation}, + server::{NewBlock, PoolServer, Reputation}, + MempoolError, PoolResult, PoolServerError, + }, +}; + +#[derive(Debug)] +pub struct LocalPoolBuilder { + req_sender: mpsc::Sender, + req_receiver: mpsc::Receiver, + block_sender: broadcast::Sender, +} + +impl LocalPoolBuilder { + pub fn new(request_capacity: usize, block_capacity: usize) -> 
Self { + let (req_sender, req_receiver) = mpsc::channel(request_capacity); + let (block_sender, _) = broadcast::channel(block_capacity); + Self { + req_sender, + req_receiver, + block_sender, + } + } + + pub fn get_handle(&self) -> LocalPoolHandle { + LocalPoolHandle { + req_sender: self.req_sender.clone(), + } + } + + pub fn run( + self, + mempools: HashMap>, + chain_updates: broadcast::Receiver>, + shutdown_token: CancellationToken, + ) -> JoinHandle> { + let mut runner = LocalPoolServerRunner::new( + self.req_receiver, + self.block_sender, + mempools, + chain_updates, + ); + tokio::spawn(async move { runner.run(shutdown_token).await }) + } +} + +#[derive(Debug)] +pub struct LocalPoolHandle { + req_sender: mpsc::Sender, +} + +pub struct LocalPoolServerRunner { + req_receiver: mpsc::Receiver, + block_sender: broadcast::Sender, + mempools: HashMap>, + chain_updates: broadcast::Receiver>, +} + +impl LocalPoolHandle { + async fn send(&self, request: ServerRequestKind) -> PoolResult { + let (send, recv) = oneshot::channel(); + self.req_sender + .send(ServerRequest { + request, + response: send, + }) + .await + .map_err(|_| anyhow::anyhow!("LocalPoolServer closed"))?; + recv.await + .map_err(|_| anyhow::anyhow!("LocalPoolServer closed"))? + } +} + +#[async_trait] +impl PoolServer for LocalPoolHandle { + async fn get_supported_entry_points(&self) -> PoolResult> { + let req = ServerRequestKind::GetSupportedEntryPoints; + let resp = self.send(req).await?; + match resp { + ServerResponse::GetSupportedEntryPoints { entry_points } => Ok(entry_points), + _ => Err(PoolServerError::UnexpectedResponse), + } + } + + async fn add_op(&self, entry_point: Address, op: UserOperation) -> PoolResult { + let req = ServerRequestKind::AddOp { + entry_point, + op, + origin: OperationOrigin::Local, + }; + let resp = self.send(req).await?; + match resp { + ServerResponse::AddOp { hash } => Ok(hash), + _ => Err(PoolServerError::UnexpectedResponse), + } + } + + async fn get_ops(&self, entry_point: Address, max_ops: u64) -> PoolResult> { + let req = ServerRequestKind::GetOps { + entry_point, + max_ops, + }; + let resp = self.send(req).await?; + match resp { + ServerResponse::GetOps { ops } => Ok(ops), + _ => Err(PoolServerError::UnexpectedResponse), + } + } + + async fn remove_ops(&self, entry_point: Address, ops: Vec) -> PoolResult<()> { + let req = ServerRequestKind::RemoveOps { entry_point, ops }; + let resp = self.send(req).await?; + match resp { + ServerResponse::RemoveOps => Ok(()), + _ => Err(PoolServerError::UnexpectedResponse), + } + } + + async fn remove_entities(&self, entry_point: Address, entities: Vec) -> PoolResult<()> { + let req = ServerRequestKind::RemoveEntities { + entry_point, + entities, + }; + let resp = self.send(req).await?; + match resp { + ServerResponse::RemoveEntities => Ok(()), + _ => Err(PoolServerError::UnexpectedResponse), + } + } + + async fn debug_clear_state(&self) -> Result<(), PoolServerError> { + let req = ServerRequestKind::DebugClearState; + let resp = self.send(req).await?; + match resp { + ServerResponse::DebugClearState => Ok(()), + _ => Err(PoolServerError::UnexpectedResponse), + } + } + + async fn debug_dump_mempool(&self, entry_point: Address) -> PoolResult> { + let req = ServerRequestKind::DebugDumpMempool { entry_point }; + let resp = self.send(req).await?; + match resp { + ServerResponse::DebugDumpMempool { ops } => Ok(ops), + _ => Err(PoolServerError::UnexpectedResponse), + } + } + + async fn debug_set_reputations( + &self, + entry_point: Address, + reputations: Vec, + 
) -> PoolResult<()> { + let req = ServerRequestKind::DebugSetReputations { + entry_point, + reputations, + }; + let resp = self.send(req).await?; + match resp { + ServerResponse::DebugSetReputations => Ok(()), + _ => Err(PoolServerError::UnexpectedResponse), + } + } + + async fn debug_dump_reputation(&self, entry_point: Address) -> PoolResult> { + let req = ServerRequestKind::DebugDumpReputation { entry_point }; + let resp = self.send(req).await?; + match resp { + ServerResponse::DebugDumpReputation { reputations } => Ok(reputations), + _ => Err(PoolServerError::UnexpectedResponse), + } + } + + async fn subscribe_new_blocks(&self) -> PoolResult> { + let req = ServerRequestKind::SubscribeNewBlocks; + let resp = self.send(req).await?; + match resp { + ServerResponse::SubscribeNewBlocks { new_blocks } => Ok(new_blocks), + _ => Err(PoolServerError::UnexpectedResponse), + } + } +} + +#[async_trait] +impl HealthCheck for LocalPoolHandle { + fn name(&self) -> &'static str { + "LocalPoolServer" + } + + async fn status(&self) -> ServerStatus { + if self.get_supported_entry_points().await.is_ok() { + ServerStatus::Serving + } else { + ServerStatus::NotServing + } + } +} + +impl Clone for LocalPoolHandle { + fn clone(&self) -> Self { + Self { + req_sender: self.req_sender.clone(), + } + } +} + +impl LocalPoolServerRunner +where + M: Mempool, +{ + fn new( + req_receiver: mpsc::Receiver, + block_sender: broadcast::Sender, + mempools: HashMap>, + chain_updates: broadcast::Receiver>, + ) -> Self { + Self { + req_receiver, + block_sender, + mempools, + chain_updates, + } + } + + fn get_pool(&self, entry_point: Address) -> PoolResult<&Arc> { + self.mempools.get(&entry_point).ok_or_else(|| { + PoolServerError::MempoolError(MempoolError::UnknownEntryPoint(entry_point)) + }) + } + + fn get_ops(&self, entry_point: Address, max_ops: u64) -> PoolResult> { + let mempool = self.get_pool(entry_point)?; + Ok(mempool + .best_operations(max_ops as usize) + .iter() + .map(|op| (**op).clone()) + .collect()) + } + + fn remove_ops(&self, entry_point: Address, ops: &[H256]) -> PoolResult<()> { + let mempool = self.get_pool(entry_point)?; + mempool.remove_operations(ops); + Ok(()) + } + + fn remove_entities<'a>( + &self, + entry_point: Address, + entities: impl IntoIterator, + ) -> PoolResult<()> { + let mempool = self.get_pool(entry_point)?; + for entity in entities { + mempool.remove_entity(*entity); + } + Ok(()) + } + + fn debug_clear_state(&self) -> PoolResult<()> { + for mempool in self.mempools.values() { + mempool.clear(); + } + Ok(()) + } + + fn debug_dump_mempool(&self, entry_point: Address) -> PoolResult> { + let mempool = self.get_pool(entry_point)?; + Ok(mempool + .all_operations(usize::MAX) + .iter() + .map(|op| (**op).clone()) + .collect()) + } + + fn debug_set_reputations<'a>( + &self, + entry_point: Address, + reputations: impl IntoIterator, + ) -> PoolResult<()> { + let mempool = self.get_pool(entry_point)?; + for rep in reputations { + mempool.set_reputation(rep.address, rep.ops_seen, rep.ops_included); + } + Ok(()) + } + + fn debug_dump_reputation(&self, entry_point: Address) -> PoolResult> { + let mempool = self.get_pool(entry_point)?; + Ok(mempool.dump_reputation()) + } + + async fn run(&mut self, shutdown_token: CancellationToken) -> anyhow::Result<()> { + loop { + tokio::select! 
{ + _ = shutdown_token.cancelled() => { + break; + } + chain_update = self.chain_updates.recv() => { + if let Ok(chain_update) = chain_update { + // Update each mempool before notifying listeners of the chain update + // This allows the mempools to update their state before the listeners + // pull information from the mempool. + // For example, a bundle builder listening for a new block to kick off + // its bundle building process will want to be able to query the mempool + // and only receive operations that have not yet been mined. + for mempool in self.mempools.values() { + mempool.on_chain_update(&chain_update); + } + + let _ = self.block_sender.send(NewBlock { + hash: chain_update.latest_block_hash, + number: chain_update.latest_block_number, + }); + } + } + Some(req) = self.req_receiver.recv() => { + let resp = match req.request { + ServerRequestKind::GetSupportedEntryPoints => { + Ok(ServerResponse::GetSupportedEntryPoints { + entry_points: self.mempools.keys().copied().collect() + }) + }, + ServerRequestKind::AddOp { entry_point, op, origin } => { + match self.get_pool(entry_point) { + Ok(mempool) => { + let mempool = Arc::clone(mempool); + tokio::spawn(async move { + let resp = match mempool.add_operation(origin, op).await { + Ok(hash) => Ok(ServerResponse::AddOp { hash }), + Err(e) => Err(e.into()), + }; + if let Err(e) = req.response.send(resp) { + tracing::error!("Failed to send response: {:?}", e); + } + }); + continue; + }, + Err(e) => Err(e), + } + }, + ServerRequestKind::GetOps { entry_point, max_ops } => { + match self.get_ops(entry_point, max_ops) { + Ok(ops) => Ok(ServerResponse::GetOps { ops }), + Err(e) => Err(e), + } + }, + ServerRequestKind::RemoveOps { entry_point, ops } => { + match self.remove_ops(entry_point, &ops) { + Ok(_) => Ok(ServerResponse::RemoveOps), + Err(e) => Err(e), + } + }, + ServerRequestKind::RemoveEntities { entry_point, entities } => { + match self.remove_entities(entry_point, &entities) { + Ok(_) => Ok(ServerResponse::RemoveEntities), + Err(e) => Err(e), + } + }, + ServerRequestKind::DebugClearState => { + match self.debug_clear_state() { + Ok(_) => Ok(ServerResponse::DebugClearState), + Err(e) => Err(e), + } + }, + ServerRequestKind::DebugDumpMempool { entry_point } => { + match self.debug_dump_mempool(entry_point) { + Ok(ops) => Ok(ServerResponse::DebugDumpMempool { ops }), + Err(e) => Err(e), + } + }, + ServerRequestKind::DebugSetReputations { entry_point, reputations } => { + match self.debug_set_reputations(entry_point, &reputations) { + Ok(_) => Ok(ServerResponse::DebugSetReputations), + Err(e) => Err(e), + } + }, + ServerRequestKind::DebugDumpReputation { entry_point } => { + match self.debug_dump_reputation(entry_point) { + Ok(reputations) => Ok(ServerResponse::DebugDumpReputation { reputations }), + Err(e) => Err(e), + } + }, + ServerRequestKind::SubscribeNewBlocks => { + Ok(ServerResponse::SubscribeNewBlocks { new_blocks: self.block_sender.subscribe() } ) + } + }; + if let Err(e) = req.response.send(resp) { + tracing::error!("Failed to send response: {:?}", e); + } + } + } + } + + Ok(()) + } +} + +#[derive(Debug)] +struct ServerRequest { + pub request: ServerRequestKind, + pub response: oneshot::Sender>, +} + +#[derive(Debug)] +enum ServerRequestKind { + GetSupportedEntryPoints, + AddOp { + entry_point: Address, + op: UserOperation, + origin: OperationOrigin, + }, + GetOps { + entry_point: Address, + max_ops: u64, + }, + RemoveOps { + entry_point: Address, + ops: Vec, + }, + RemoveEntities { + entry_point: Address, + entities: Vec, + 
}, + DebugClearState, + DebugDumpMempool { + entry_point: Address, + }, + DebugSetReputations { + entry_point: Address, + reputations: Vec, + }, + DebugDumpReputation { + entry_point: Address, + }, + SubscribeNewBlocks, +} + +#[derive(Debug)] +enum ServerResponse { + GetSupportedEntryPoints { + entry_points: Vec
, + }, + AddOp { + hash: H256, + }, + GetOps { + ops: Vec, + }, + RemoveOps, + RemoveEntities, + DebugClearState, + DebugDumpMempool { + ops: Vec, + }, + DebugSetReputations, + DebugDumpReputation { + reputations: Vec, + }, + SubscribeNewBlocks { + new_blocks: broadcast::Receiver, + }, +} + +#[cfg(test)] +mod tests { + use std::{iter::zip, sync::Arc}; + + use super::*; + use crate::op_pool::{chain::ChainUpdate, mempool::MockMempool}; + + #[tokio::test] + async fn test_add_op() { + let mut mock_pool = MockMempool::new(); + let hash0 = H256::random(); + mock_pool + .expect_add_operation() + .returning(move |_, _| Ok(hash0)); + + let ep = Address::random(); + let state = setup(HashMap::from([(ep, Arc::new(mock_pool))])); + + let hash1 = state + .handle + .add_op(ep, UserOperation::default()) + .await + .unwrap(); + assert_eq!(hash0, hash1); + } + + #[tokio::test] + async fn test_chain_update() { + let mut mock_pool = MockMempool::new(); + mock_pool.expect_on_chain_update().returning(|_| ()); + + let ep = Address::random(); + let state = setup(HashMap::from([(ep, Arc::new(mock_pool))])); + + let mut sub = state.handle.subscribe_new_blocks().await.unwrap(); + + let hash = H256::random(); + let number = 1234; + state + .chain_update_tx + .send(Arc::new(ChainUpdate { + latest_block_hash: hash, + latest_block_number: number, + ..Default::default() + })) + .unwrap(); + + let new_block = sub.recv().await.unwrap(); + assert_eq!(hash, new_block.hash); + assert_eq!(number, new_block.number); + } + + #[tokio::test] + async fn test_get_supported_entry_points() { + let mut eps0 = vec![Address::random(), Address::random(), Address::random()]; + + let state = setup( + eps0.iter() + .map(|ep| (*ep, Arc::new(MockMempool::new()))) + .collect(), + ); + + let mut eps1 = state.handle.get_supported_entry_points().await.unwrap(); + + eps0.sort(); + eps1.sort(); + assert_eq!(eps0, eps1); + } + + #[tokio::test] + async fn test_multiple_entry_points() { + let eps = [Address::random(), Address::random(), Address::random()]; + let mut pools = [MockMempool::new(), MockMempool::new(), MockMempool::new()]; + let h0 = H256::random(); + let h1 = H256::random(); + let h2 = H256::random(); + let hashes = [h0, h1, h2]; + pools[0] + .expect_add_operation() + .returning(move |_, _| Ok(h0)); + pools[1] + .expect_add_operation() + .returning(move |_, _| Ok(h1)); + pools[2] + .expect_add_operation() + .returning(move |_, _| Ok(h2)); + + let state = setup( + zip(eps.iter(), pools.into_iter()) + .map(|(ep, pool)| (*ep, Arc::new(pool))) + .collect(), + ); + + for (ep, hash) in zip(eps.iter(), hashes.iter()) { + assert_eq!( + *hash, + state + .handle + .add_op(*ep, UserOperation::default()) + .await + .unwrap() + ); + } + } + + struct State { + handle: LocalPoolHandle, + chain_update_tx: broadcast::Sender>, + _run_handle: JoinHandle>, + } + + fn setup(pools: HashMap>) -> State { + let builder = LocalPoolBuilder::new(10, 10); + let handle = builder.get_handle(); + let (tx, rx) = broadcast::channel(10); + let run_handle = builder.run(pools, rx, CancellationToken::new()); + State { + handle, + chain_update_tx: tx, + _run_handle: run_handle, + } + } +} diff --git a/src/op_pool/server/local/client.rs b/src/op_pool/server/local/client.rs deleted file mode 100644 index 18fe6413c..000000000 --- a/src/op_pool/server/local/client.rs +++ /dev/null @@ -1,154 +0,0 @@ -use ethers::types::{Address, H256}; -use tokio::sync::{broadcast, mpsc, oneshot}; -use tonic::async_trait; - -use super::server::{ServerRequest, ServerRequestKind, ServerResponse}; 
-use crate::{ - common::types::{Entity, UserOperation}, - op_pool::{ - mempool::PoolOperation, - server::{error::PoolServerError, NewBlock, PoolClient, Reputation}, - PoolResult, - }, -}; - -#[derive(Debug)] -pub struct LocalPoolClient { - sender: mpsc::Sender, - block_receiver: broadcast::Receiver, -} - -impl LocalPoolClient { - pub fn new( - sender: mpsc::Sender, - block_receiver: broadcast::Receiver, - ) -> Self { - Self { - sender, - block_receiver, - } - } - - async fn send(&self, request: ServerRequestKind) -> PoolResult { - let (send, recv) = oneshot::channel(); - self.sender - .send(ServerRequest { - request, - response: send, - }) - .await - .map_err(|_| anyhow::anyhow!("LocalPoolServer closed"))?; - recv.await - .map_err(|_| anyhow::anyhow!("LocalPoolServer closed"))? - } -} - -#[async_trait] -impl PoolClient for LocalPoolClient { - async fn get_supported_entry_points(&self) -> PoolResult> { - let req = ServerRequestKind::GetSupportedEntryPoints; - let resp = self.send(req).await?; - match resp { - ServerResponse::GetSupportedEntryPoints { entry_points } => Ok(entry_points), - _ => Err(PoolServerError::UnexpectedResponse), - } - } - - async fn add_op(&self, entry_point: Address, op: UserOperation) -> PoolResult { - let req = ServerRequestKind::AddOp { entry_point, op }; - let resp = self.send(req).await?; - match resp { - ServerResponse::AddOp { hash } => Ok(hash), - _ => Err(PoolServerError::UnexpectedResponse), - } - } - - async fn get_ops(&self, entry_point: Address, max_ops: u64) -> PoolResult> { - let req = ServerRequestKind::GetOps { - entry_point, - max_ops, - }; - let resp = self.send(req).await?; - match resp { - ServerResponse::GetOps { ops } => Ok(ops), - _ => Err(PoolServerError::UnexpectedResponse), - } - } - - async fn remove_ops(&self, entry_point: Address, ops: Vec) -> PoolResult<()> { - let req = ServerRequestKind::RemoveOps { entry_point, ops }; - let resp = self.send(req).await?; - match resp { - ServerResponse::RemoveOps => Ok(()), - _ => Err(PoolServerError::UnexpectedResponse), - } - } - - async fn remove_entities(&self, entry_point: Address, entities: Vec) -> PoolResult<()> { - let req = ServerRequestKind::RemoveEntities { - entry_point, - entities, - }; - let resp = self.send(req).await?; - match resp { - ServerResponse::RemoveEntities => Ok(()), - _ => Err(PoolServerError::UnexpectedResponse), - } - } - - async fn debug_clear_state(&self) -> Result<(), PoolServerError> { - let req = ServerRequestKind::DebugClearState; - let resp = self.send(req).await?; - match resp { - ServerResponse::DebugClearState => Ok(()), - _ => Err(PoolServerError::UnexpectedResponse), - } - } - - async fn debug_dump_mempool(&self, entry_point: Address) -> PoolResult> { - let req = ServerRequestKind::DebugDumpMempool { entry_point }; - let resp = self.send(req).await?; - match resp { - ServerResponse::DebugDumpMempool { ops } => Ok(ops), - _ => Err(PoolServerError::UnexpectedResponse), - } - } - - async fn debug_set_reputations( - &self, - entry_point: Address, - reputations: Vec, - ) -> PoolResult<()> { - let req = ServerRequestKind::DebugSetReputations { - entry_point, - reputations, - }; - let resp = self.send(req).await?; - match resp { - ServerResponse::DebugSetReputations => Ok(()), - _ => Err(PoolServerError::UnexpectedResponse), - } - } - - async fn debug_dump_reputation(&self, entry_point: Address) -> PoolResult> { - let req = ServerRequestKind::DebugDumpReputation { entry_point }; - let resp = self.send(req).await?; - match resp { - ServerResponse::DebugDumpReputation { 
reputations } => Ok(reputations), - _ => Err(PoolServerError::UnexpectedResponse), - } - } - - async fn subscribe_new_blocks(&self) -> PoolResult> { - Ok(self.block_receiver.resubscribe()) - } -} - -impl Clone for LocalPoolClient { - fn clone(&self) -> Self { - Self { - sender: self.sender.clone(), - block_receiver: self.block_receiver.resubscribe(), - } - } -} diff --git a/src/op_pool/server/local/mod.rs b/src/op_pool/server/local/mod.rs deleted file mode 100644 index 24bf375cd..000000000 --- a/src/op_pool/server/local/mod.rs +++ /dev/null @@ -1,5 +0,0 @@ -mod client; -mod server; - -pub use client::*; -pub use server::*; diff --git a/src/op_pool/server/local/server.rs b/src/op_pool/server/local/server.rs deleted file mode 100644 index 3f6bad1a0..000000000 --- a/src/op_pool/server/local/server.rs +++ /dev/null @@ -1,323 +0,0 @@ -use std::sync::Arc; - -use ethers::types::{Address, H256}; -use tokio::{ - sync::{broadcast, mpsc, oneshot}, - task::JoinHandle, -}; -use tokio_util::sync::CancellationToken; - -use crate::{ - common::types::{Entity, UserOperation}, - op_pool::{ - mempool::{Mempool, MempoolGroup, OperationOrigin, PoolOperation}, - server::{NewBlock, Reputation}, - PoolResult, - }, -}; - -pub fn spawn_local_mempool_server( - mempool_runner: Arc>, - req_receiver: mpsc::Receiver, - block_sender: broadcast::Sender, - shutdown_token: CancellationToken, -) -> anyhow::Result>> { - let mut server = LocalPoolServer::new(req_receiver, block_sender, mempool_runner); - let handle = tokio::spawn(async move { server.run(shutdown_token).await }); - Ok(handle) -} - -pub struct LocalPoolServer { - req_receiver: mpsc::Receiver, - block_sender: broadcast::Sender, - mempools: Arc>, -} - -impl LocalPoolServer -where - M: Mempool, -{ - pub fn new( - req_receiver: mpsc::Receiver, - block_sender: broadcast::Sender, - mempools: Arc>, - ) -> Self { - Self { - req_receiver, - block_sender, - mempools, - } - } - - pub async fn run(&mut self, shutdown_token: CancellationToken) -> anyhow::Result<()> { - let mut chain_updates = self.mempools.clone().subscribe_chain_update(); - - loop { - tokio::select! 
{ - _ = shutdown_token.cancelled() => { - break; - } - chain_update = chain_updates.recv() => { - if let Ok(chain_update) = chain_update { - let _ = self.block_sender.send(NewBlock { - hash: chain_update.latest_block_hash, - number: chain_update.latest_block_number, - }); - } - } - Some(req) = self.req_receiver.recv() => { - let resp = match req.request { - ServerRequestKind::GetSupportedEntryPoints => { - Ok(ServerResponse::GetSupportedEntryPoints { - entry_points: self.mempools.get_supported_entry_points() - }) - }, - ServerRequestKind::AddOp { entry_point, op } => { - let mempools = self.mempools.clone(); - tokio::spawn(async move { - let resp = match mempools.add_op(entry_point, op, OperationOrigin::Local).await { - Ok(hash) => Ok(ServerResponse::AddOp { hash }), - Err(e) => Err(e.into()), - }; - req.response.send(resp).unwrap(); - }); - continue; - }, - ServerRequestKind::GetOps { entry_point, max_ops } => { - match self.mempools.get_ops(entry_point, max_ops) { - Ok(ops) => Ok(ServerResponse::GetOps { ops }), - Err(e) => Err(e.into()), - } - }, - ServerRequestKind::RemoveOps { entry_point, ops } => { - match self.mempools.remove_ops(entry_point, &ops) { - Ok(_) => Ok(ServerResponse::RemoveOps), - Err(e) => Err(e.into()), - } - }, - ServerRequestKind::RemoveEntities { entry_point, entities } => { - match self.mempools.remove_entities(entry_point, &entities) { - Ok(_) => Ok(ServerResponse::RemoveOps), - Err(e) => Err(e.into()), - } - }, - ServerRequestKind::DebugClearState => { - match self.mempools.debug_clear_state() { - Ok(_) => Ok(ServerResponse::RemoveOps), - Err(e) => Err(e.into()), - } - }, - ServerRequestKind::DebugDumpMempool { entry_point } => { - match self.mempools.debug_dump_mempool(entry_point) { - Ok(ops) => Ok(ServerResponse::DebugDumpMempool { ops }), - Err(e) => Err(e.into()), - } - }, - ServerRequestKind::DebugSetReputations { entry_point, reputations } => { - match self.mempools.debug_set_reputations(entry_point, &reputations) { - Ok(_) => Ok(ServerResponse::DebugSetReputations), - Err(e) => Err(e.into()), - } - }, - ServerRequestKind::DebugDumpReputation { entry_point } => { - match self.mempools.debug_dump_reputation(entry_point) { - Ok(reputations) => Ok(ServerResponse::DebugDumpReputation { reputations }), - Err(e) => Err(e.into()), - } - } - }; - if let Err(e) = req.response.send(resp) { - tracing::error!("Failed to send response: {:?}", e); - } - } - } - } - - Ok(()) - } -} - -#[derive(Debug)] -pub struct ServerRequest { - pub request: ServerRequestKind, - pub response: oneshot::Sender>, -} - -#[derive(Clone, Debug)] -pub enum ServerRequestKind { - GetSupportedEntryPoints, - AddOp { - entry_point: Address, - op: UserOperation, - }, - GetOps { - entry_point: Address, - max_ops: u64, - }, - RemoveOps { - entry_point: Address, - ops: Vec, - }, - RemoveEntities { - entry_point: Address, - entities: Vec, - }, - DebugClearState, - DebugDumpMempool { - entry_point: Address, - }, - DebugSetReputations { - entry_point: Address, - reputations: Vec, - }, - DebugDumpReputation { - entry_point: Address, - }, -} - -#[derive(Clone, Debug)] -pub enum ServerResponse { - GetSupportedEntryPoints { entry_points: Vec
}, - AddOp { hash: H256 }, - GetOps { ops: Vec }, - RemoveOps, - RemoveEntities, - DebugClearState, - DebugDumpMempool { ops: Vec }, - DebugSetReputations, - DebugDumpReputation { reputations: Vec }, -} - -#[cfg(test)] -mod tests { - use std::sync::Arc; - - use super::*; - use crate::op_pool::{chain::ChainUpdate, mempool::MockMempool, LocalPoolClient, PoolClient}; - - #[tokio::test] - async fn send_receive() { - let ep = Address::random(); - let mut mock_pool = MockMempool::new(); - mock_pool.expect_entry_point().returning(move || ep); - - let mempool_group = Arc::new(MempoolGroup::new(vec![mock_pool])); - let (tx, rx) = mpsc::channel(1); - let shutdown_token = CancellationToken::new(); - let (block_tx, block_rx) = broadcast::channel(1); - let handle = - spawn_local_mempool_server(mempool_group, rx, block_tx, shutdown_token.clone()) - .unwrap(); - let client = LocalPoolClient::new(tx, block_rx); - - let ret = client.get_supported_entry_points().await.unwrap(); - assert_eq!(ret, vec![ep]); - - shutdown_token.cancel(); - handle.await.unwrap().unwrap(); - } - - #[tokio::test] - async fn early_shutdown() { - let ep = Address::random(); - let mut mock_pool = MockMempool::new(); - mock_pool.expect_entry_point().returning(move || ep); - - let mempool_group = Arc::new(MempoolGroup::new(vec![mock_pool])); - let (tx, rx) = mpsc::channel(1); - let shutdown_token = CancellationToken::new(); - let (block_tx, block_rx) = broadcast::channel(1); - let handle = - spawn_local_mempool_server(mempool_group, rx, block_tx, shutdown_token.clone()) - .unwrap(); - - shutdown_token.cancel(); - handle.await.unwrap().unwrap(); - - let client = LocalPoolClient::new(tx, block_rx); - let ret = client.get_supported_entry_points().await; - assert!(ret.is_err()); - } - - #[tokio::test] - async fn add_op() { - let ep = Address::random(); - let mut mock_pool = MockMempool::new(); - let uo = UserOperation::default(); - let uo_hash = uo.op_hash(ep, 0); - - mock_pool.expect_entry_point().returning(move || ep); - mock_pool - .expect_add_operation() - .returning(move |_, _| Ok(uo_hash)); - - let mempool_group = Arc::new(MempoolGroup::new(vec![mock_pool])); - let (tx, rx) = mpsc::channel(1); - let shutdown_token = CancellationToken::new(); - let (block_tx, block_rx) = broadcast::channel(1); - let handle = - spawn_local_mempool_server(mempool_group, rx, block_tx, shutdown_token.clone()) - .unwrap(); - let client = LocalPoolClient::new(tx, block_rx); - - let ret = client.add_op(ep, uo).await.unwrap(); - assert_eq!(ret, uo_hash); - - shutdown_token.cancel(); - handle.await.unwrap().unwrap(); - } - - #[tokio::test] - async fn send_blocks() { - let ep = Address::random(); - let mut mock_pool = MockMempool::new(); - let uo = UserOperation::default(); - let uo_hash = uo.op_hash(ep, 0); - - mock_pool.expect_entry_point().returning(move || ep); - mock_pool - .expect_add_operation() - .returning(move |_, _| Ok(uo_hash)); - mock_pool - .expect_on_chain_update() - .times(..11) - .returning(|_| {}); - - let mempool_group = Arc::new(MempoolGroup::new(vec![mock_pool])); - let (tx, rx) = mpsc::channel(1); - let shutdown_token = CancellationToken::new(); - let (block_tx, block_rx) = broadcast::channel(10); - let handle = - spawn_local_mempool_server(mempool_group.clone(), rx, block_tx, shutdown_token.clone()) - .unwrap(); - let client = LocalPoolClient::new(tx, block_rx); - - let (new_blocks_tx, new_blocks_rx) = broadcast::channel(10); - - let mempool_shutdown = shutdown_token.clone(); - let mempool_handle = tokio::spawn(async move { - 
mempool_group.run(new_blocks_rx, mempool_shutdown).await; - }); - - let mut recv = client.subscribe_new_blocks().await.unwrap(); - - for i in 0..10 { - new_blocks_tx - .send(Arc::new(ChainUpdate { - latest_block_hash: H256::random(), - latest_block_number: i, - ..Default::default() - })) - .unwrap(); - } - - for i in 0..10 { - let ret = recv.recv().await.unwrap(); - assert_eq!(ret.number, i); - } - - shutdown_token.cancel(); - handle.await.unwrap().unwrap(); - mempool_handle.await.unwrap(); - } -} diff --git a/src/op_pool/server/mod.rs b/src/op_pool/server/mod.rs index 48915ad76..66136dbf2 100644 --- a/src/op_pool/server/mod.rs +++ b/src/op_pool/server/mod.rs @@ -4,25 +4,28 @@ mod remote; pub use error::PoolServerError; use ethers::types::{Address, H256}; -pub use local::{spawn_local_mempool_server, LocalPoolClient, ServerRequest}; +pub use local::{LocalPoolBuilder, LocalPoolHandle}; #[cfg(test)] use mockall::automock; -pub use remote::{connect_remote_pool_client, spawn_remote_mempool_server, RemotePoolClient}; -use tokio::sync::{broadcast, mpsc}; +pub use remote::{spawn_remote_mempool_server, RemotePoolClient}; +use tokio::sync::broadcast; use tonic::async_trait; use super::{mempool::PoolOperation, Reputation}; -use crate::{ - common::types::{Entity, UserOperation}, - op_pool::LocalPoolServerRequest, -}; +use crate::common::types::{Entity, UserOperation}; pub type Error = error::PoolServerError; pub type PoolResult = std::result::Result; +#[derive(Clone, Debug)] +pub struct NewBlock { + pub hash: H256, + pub number: u64, +} + #[cfg_attr(test, automock)] #[async_trait] -pub trait PoolClient: Send + Sync + 'static { +pub trait PoolServer: Send + Sync + 'static { async fn get_supported_entry_points(&self) -> PoolResult>; async fn add_op(&self, entry_point: Address, op: UserOperation) -> PoolResult; @@ -47,20 +50,3 @@ pub trait PoolClient: Send + Sync + 'static { async fn subscribe_new_blocks(&self) -> PoolResult>; } - -#[derive(Clone, Debug)] -pub struct NewBlock { - pub hash: H256, - pub number: u64, -} - -#[derive(Debug)] -pub enum PoolClientMode { - Local { - req_sender: mpsc::Sender, - block_receiver: broadcast::Receiver, - }, - Remote { - url: String, - }, -} diff --git a/src/op_pool/server/remote/client.rs b/src/op_pool/server/remote/client.rs index b9af81929..597eaf205 100644 --- a/src/op_pool/server/remote/client.rs +++ b/src/op_pool/server/remote/client.rs @@ -1,8 +1,15 @@ -use anyhow::bail; +use std::{future::Future, pin::Pin, str::FromStr}; + use ethers::types::{Address, H256}; use tokio::sync::broadcast; -use tokio_util::sync::CancellationToken; -use tonic::{async_trait, transport::Channel}; +use tonic::{ + async_trait, + transport::{Channel, Uri}, +}; +use tonic_health::{ + pb::{health_client::HealthClient, HealthCheckRequest}, + ServingStatus, +}; use super::protos::{ self, add_op_response, debug_clear_state_response, debug_dump_mempool_response, @@ -15,12 +22,12 @@ use super::protos::{ use crate::{ common::{ protos::{from_bytes, ConversionError}, - server::connect_with_retries, + server::{HealthCheck, ServerStatus}, types::{Entity, UserOperation}, }, op_pool::{ mempool::{PoolOperation, Reputation}, - server::{error::PoolServerError, NewBlock, PoolClient}, + server::{error::PoolServerError, NewBlock, PoolServer}, PoolResult, }, }; @@ -28,18 +35,29 @@ use crate::{ #[derive(Debug, Clone)] pub struct RemotePoolClient { op_pool_client: OpPoolClient, + op_pool_health: HealthClient, } impl RemotePoolClient { - pub fn new(client: OpPoolClient) -> Self { - Self { - op_pool_client: 
client, + pub fn connect_func( + ) -> impl Fn(String) -> Pin> + Send + 'static>> + { + |url| { + Box::pin(async move { + let op_pool_client = OpPoolClient::connect(url.clone()).await?; + let op_pool_health = + HealthClient::new(Channel::builder(Uri::from_str(&url)?).connect().await?); + Ok(Self { + op_pool_client, + op_pool_health, + }) + }) } } } #[async_trait] -impl PoolClient for RemotePoolClient { +impl PoolServer for RemotePoolClient { async fn get_supported_entry_points(&self) -> PoolResult> { Ok(self .op_pool_client @@ -281,17 +299,27 @@ impl PoolClient for RemotePoolClient { } } -pub async fn connect_remote_pool_client( - op_pool_url: &str, - shutdown_token: CancellationToken, -) -> anyhow::Result { - tokio::select! { - _ = shutdown_token.cancelled() => { - tracing::error!("bailing from connecting client, server shutting down"); - bail!("Server shutting down") - } - res = connect_with_retries("op pool from builder", op_pool_url, OpPoolClient::connect) => { - Ok(RemotePoolClient::new(res?)) +#[async_trait] +impl HealthCheck for RemotePoolClient { + fn name(&self) -> &'static str { + "RemotePoolServer" + } + + async fn status(&self) -> ServerStatus { + match self + .op_pool_health + .clone() + .check(HealthCheckRequest::default()) + .await + { + Ok(status) => { + if status.into_inner().status == ServingStatus::Serving as i32 { + ServerStatus::Serving + } else { + ServerStatus::NotServing + } + } + Err(_) => ServerStatus::NotServing, } } } diff --git a/src/op_pool/server/remote/protos.rs b/src/op_pool/server/remote/protos.rs index b148d55b1..f243013c8 100644 --- a/src/op_pool/server/remote/protos.rs +++ b/src/op_pool/server/remote/protos.rs @@ -11,7 +11,8 @@ use crate::{ }, op_pool::{ mempool::{Reputation as PoolReputation, ReputationStatus as PoolReputationStatus}, - NewBlock as PoolNewBlock, PoolOperation, + server::NewBlock as PoolNewBlock, + PoolOperation, }, }; @@ -219,3 +220,12 @@ impl TryFrom for PoolNewBlock { }) } } + +impl From for NewBlock { + fn from(block: PoolNewBlock) -> Self { + Self { + hash: block.hash.as_bytes().to_vec(), + number: block.number, + } + } +} diff --git a/src/op_pool/server/remote/server.rs b/src/op_pool/server/remote/server.rs index e383bed46..749d56f96 100644 --- a/src/op_pool/server/remote/server.rs +++ b/src/op_pool/server/remote/server.rs @@ -22,27 +22,26 @@ use super::protos::{ DebugDumpReputationRequest, DebugDumpReputationResponse, DebugDumpReputationSuccess, DebugSetReputationRequest, DebugSetReputationResponse, DebugSetReputationSuccess, GetOpsRequest, GetOpsResponse, GetOpsSuccess, GetSupportedEntryPointsRequest, - GetSupportedEntryPointsResponse, MempoolOp, NewBlock, RemoveEntitiesRequest, - RemoveEntitiesResponse, RemoveEntitiesSuccess, RemoveOpsRequest, RemoveOpsResponse, - RemoveOpsSuccess, SubscribeNewBlocksRequest, SubscribeNewBlocksResponse, - OP_POOL_FILE_DESCRIPTOR_SET, + GetSupportedEntryPointsResponse, MempoolOp, RemoveEntitiesRequest, RemoveEntitiesResponse, + RemoveEntitiesSuccess, RemoveOpsRequest, RemoveOpsResponse, RemoveOpsSuccess, + SubscribeNewBlocksRequest, SubscribeNewBlocksResponse, OP_POOL_FILE_DESCRIPTOR_SET, }; use crate::{ common::{grpc::metrics::GrpcMetricsLayer, protos::from_bytes, types::Entity}, - op_pool::mempool::{Mempool, MempoolGroup, OperationOrigin, Reputation}, + op_pool::{mempool::Reputation, server::local::LocalPoolHandle, PoolServer}, }; const MAX_REMOTE_BLOCK_SUBSCRIPTIONS: usize = 32; -pub async fn spawn_remote_mempool_server( +pub async fn spawn_remote_mempool_server( chain_id: u64, - mempool_group: 
Arc>, + local_pool: LocalPoolHandle, addr: SocketAddr, shutdown_token: CancellationToken, ) -> anyhow::Result>> { // gRPC server - let pool_impl = Arc::new(OpPoolImpl::new(chain_id, mempool_group)); - let op_pool_server = OpPoolServer::new(Arc::clone(&pool_impl)); + let pool_impl = OpPoolImpl::new(chain_id, local_pool); + let op_pool_server = OpPoolServer::new(pool_impl); let reflection_service = tonic_reflection::server::Builder::configure() .register_encoded_file_descriptor_set(OP_POOL_FILE_DESCRIPTOR_SET) .build()?; @@ -50,7 +49,7 @@ pub async fn spawn_remote_mempool_server( // health service let (mut health_reporter, health_service) = tonic_health::server::health_reporter(); health_reporter - .set_serving::>>>() + .set_serving::>() .await; let metrics_layer = GrpcMetricsLayer::new("op_pool".to_string()); @@ -68,20 +67,17 @@ pub async fn spawn_remote_mempool_server( Ok(handle) } -struct OpPoolImpl { +struct OpPoolImpl { chain_id: u64, - mempools: Arc>, + local_pool: LocalPoolHandle, num_block_subscriptions: Arc, } -impl OpPoolImpl -where - M: Mempool, -{ - pub fn new(chain_id: u64, mempools: Arc>) -> Self { +impl OpPoolImpl { + pub fn new(chain_id: u64, local_pool: LocalPoolHandle) -> Self { Self { chain_id, - mempools, + local_pool, num_block_subscriptions: Arc::new(AtomicUsize::new(0)), } } @@ -93,23 +89,25 @@ where } #[async_trait] -impl OpPool for Arc> -where - M: Mempool + 'static, -{ +impl OpPool for OpPoolImpl { async fn get_supported_entry_points( &self, _request: Request, ) -> Result> { - Ok(Response::new(GetSupportedEntryPointsResponse { - chain_id: self.chain_id, - entry_points: self - .mempools - .get_supported_entry_points() - .into_iter() - .map(|ep| ep.as_bytes().to_vec()) - .collect(), - })) + let resp = match self.local_pool.get_supported_entry_points().await { + Ok(entry_points) => GetSupportedEntryPointsResponse { + chain_id: self.chain_id, + entry_points: entry_points + .into_iter() + .map(|ep| ep.as_bytes().to_vec()) + .collect(), + }, + Err(e) => { + return Err(Status::internal(format!("Failed to get entry points: {e}"))); + } + }; + + Ok(Response::new(resp)) } async fn add_op(&self, request: Request) -> Result> { @@ -123,7 +121,7 @@ where Status::invalid_argument(format!("Failed to convert to UserOperation: {e}")) })?; - let resp = match self.mempools.add_op(ep, uo, OperationOrigin::Local).await { + let resp = match self.local_pool.add_op(ep, uo).await { Ok(hash) => AddOpResponse { result: Some(add_op_response::Result::Success(AddOpSuccess { hash: hash.as_bytes().to_vec(), @@ -141,7 +139,7 @@ where let req = request.into_inner(); let ep = self.get_entry_point(&req.entry_point)?; - let resp = match self.mempools.get_ops(ep, req.max_ops) { + let resp = match self.local_pool.get_ops(ep, req.max_ops).await { Ok(ops) => GetOpsResponse { result: Some(get_ops_response::Result::Success(GetOpsSuccess { ops: ops.iter().map(MempoolOp::from).collect(), @@ -173,7 +171,7 @@ where }) .collect::, _>>()?; - let resp = match self.mempools.remove_ops(ep, &hashes) { + let resp = match self.local_pool.remove_ops(ep, hashes).await { Ok(_) => RemoveOpsResponse { result: Some(remove_ops_response::Result::Success(RemoveOpsSuccess {})), }, @@ -198,11 +196,7 @@ where .collect::, _>>() .map_err(|e| Status::internal(format!("Failed to convert to proto entity: {e}")))?; - self.mempools - .remove_entities(ep, &entities) - .map_err(|e| Status::internal(e.to_string()))?; - - let resp = match self.mempools.remove_entities(ep, &entities) { + let resp = match self.local_pool.remove_entities(ep, 
entities).await { Ok(_) => RemoveEntitiesResponse { result: Some(remove_entities_response::Result::Success( RemoveEntitiesSuccess {}, @@ -220,7 +214,7 @@ where &self, _request: Request, ) -> Result> { - let resp = match self.mempools.debug_clear_state() { + let resp = match self.local_pool.debug_clear_state().await { Ok(_) => DebugClearStateResponse { result: Some(debug_clear_state_response::Result::Success( DebugClearStateSuccess {}, @@ -241,7 +235,7 @@ where let req = request.into_inner(); let ep = self.get_entry_point(&req.entry_point)?; - let resp = match self.mempools.debug_dump_mempool(ep) { + let resp = match self.local_pool.debug_dump_mempool(ep).await { Ok(ops) => DebugDumpMempoolResponse { result: Some(debug_dump_mempool_response::Result::Success( DebugDumpMempoolSuccess { @@ -280,7 +274,7 @@ where Status::internal(format!("Failed to convert from proto reputation {e}")) })?; - let resp = match self.mempools.debug_set_reputations(ep, &reps) { + let resp = match self.local_pool.debug_set_reputations(ep, reps).await { Ok(_) => DebugSetReputationResponse { result: Some(debug_set_reputation_response::Result::Success( DebugSetReputationSuccess {}, @@ -301,7 +295,7 @@ where let req = request.into_inner(); let ep = self.get_entry_point(&req.entry_point)?; - let resp = match self.mempools.debug_dump_reputation(ep) { + let resp = match self.local_pool.debug_dump_reputation(ep).await { Ok(reps) => DebugDumpReputationResponse { result: Some(debug_dump_reputation_response::Result::Success( DebugDumpReputationSuccess { @@ -333,17 +327,23 @@ where self.num_block_subscriptions.fetch_add(1, Ordering::Relaxed); let num_block_subscriptions = Arc::clone(&self.num_block_subscriptions); - let mut chain_updates = self.mempools.clone().subscribe_chain_update(); + let mut new_blocks = match self.local_pool.subscribe_new_blocks().await { + Ok(new_blocks) => new_blocks, + Err(error) => { + tracing::error!("Failed to subscribe to new blocks: {error}"); + return Err(Status::internal(format!( + "Failed to subscribe to new blocks: {error}" + ))); + } + }; + tokio::spawn(async move { loop { - match chain_updates.recv().await { - Ok(chain_update) => { + match new_blocks.recv().await { + Ok(new_block) => { if tx .send(Ok(SubscribeNewBlocksResponse { - new_block: Some(NewBlock { - hash: chain_update.latest_block_hash.as_bytes().to_vec(), - number: chain_update.latest_block_number, - }), + new_block: Some(new_block.into()), })) .await .is_err() @@ -352,7 +352,7 @@ where } } Err(_) => { - tracing::warn!("chain update channel closed"); + tracing::warn!("new block subscription closed"); break; } } diff --git a/src/op_pool/task.rs b/src/op_pool/task.rs index 530ef6783..a563f2e0b 100644 --- a/src/op_pool/task.rs +++ b/src/op_pool/task.rs @@ -1,21 +1,15 @@ -use std::{net::SocketAddr, sync::Arc, time::Duration}; +use std::{collections::HashMap, net::SocketAddr, sync::Arc, time::Duration}; use anyhow::{bail, Context}; use ethers::providers::{ Http, HttpRateLimitRetryPolicy, JsonRpcClient, Provider, RetryClientBuilder, }; -use tokio::{ - sync::{broadcast, mpsc}, - try_join, -}; +use tokio::{sync::broadcast, try_join}; use tokio_util::sync::CancellationToken; use tonic::async_trait; use url::Url; -use super::{ - mempool::{HourlyMovingAverageReputation, PoolConfig, ReputationParams}, - server::{NewBlock, ServerRequest}, -}; +use super::mempool::{HourlyMovingAverageReputation, PoolConfig, ReputationParams}; use crate::{ common::{ contracts::i_entry_point::IEntryPoint, @@ -28,20 +22,15 @@ use crate::{ op_pool::{ chain::{self, 
Chain}, emit::OpPoolEvent, - mempool::{uo_pool::UoPool, MempoolGroup}, - server::{spawn_local_mempool_server, spawn_remote_mempool_server}, + mempool::uo_pool::UoPool, + server::{spawn_remote_mempool_server, LocalPoolBuilder}, }, }; #[derive(Debug)] pub enum PoolServerMode { - Local { - req_receiver: Option>, - block_sender: Option>, - }, - Remote { - addr: SocketAddr, - }, + Local, + Remote { addr: SocketAddr }, } #[derive(Debug)] @@ -58,11 +47,12 @@ pub struct Args { pub struct PoolTask { args: Args, event_sender: broadcast::Sender>, + pool_builder: LocalPoolBuilder, } #[async_trait] impl Task for PoolTask { - async fn run(&mut self, shutdown_token: CancellationToken) -> anyhow::Result<()> { + async fn run(mut self: Box, shutdown_token: CancellationToken) -> anyhow::Result<()> { let chain_id = self.args.chain_id; tracing::info!("Chain id: {chain_id}"); tracing::info!("Http url: {:?}", self.args.http_url); @@ -96,47 +86,27 @@ impl Task for PoolTask { let provider = Arc::new(Provider::new(client)); // create mempools - let mut mempools = vec![]; + let mut mempools = HashMap::new(); for pool_config in &self.args.pool_configs { let pool = PoolTask::create_mempool(pool_config, self.event_sender.clone(), provider.clone()) .await .context("should have created mempool")?; - mempools.push(pool); + mempools.insert(pool_config.entry_point, Arc::new(pool)); } - let mempool_group = Arc::new(MempoolGroup::new(mempools)); - let mempool_group_runner = Arc::clone(&mempool_group); - let mempool_shutdown = shutdown_token.clone(); - // handle to wait for mempool group to terminate - let mempool_handle = tokio::spawn(async move { - mempool_group_runner - .run(update_sender.subscribe(), mempool_shutdown) - .await; - Ok(()) - }); - - let server_handle = match &mut self.args.server_mode { - PoolServerMode::Local { - ref mut req_receiver, - ref mut block_sender, - } => { - let req_receiver = req_receiver - .take() - .context("should have local server message receiver")?; - let block_sender = block_sender.take().context("should have block sender")?; - spawn_local_mempool_server( - Arc::clone(&mempool_group), - req_receiver, - block_sender, - shutdown_token.clone(), - )? - } + let pool_handle = self.pool_builder.get_handle(); + let pool_runnder_handle = + self.pool_builder + .run(mempools, update_sender.subscribe(), shutdown_token.clone()); + + let remote_handle = match &mut self.args.server_mode { + PoolServerMode::Local => tokio::spawn(async { Ok(()) }), PoolServerMode::Remote { addr } => { spawn_remote_mempool_server( self.args.chain_id, - Arc::clone(&mempool_group), + pool_handle, *addr, shutdown_token.clone(), ) @@ -147,8 +117,8 @@ impl Task for PoolTask { tracing::info!("Started op_pool"); match try_join!( - handle::flatten_handle(mempool_handle), - handle::flatten_handle(server_handle), + handle::flatten_handle(pool_runnder_handle), + handle::flatten_handle(remote_handle), handle::as_anyhow_handle(chain_handle), ) { Ok(_) => { @@ -167,8 +137,13 @@ impl PoolTask { pub fn new( args: Args, event_sender: broadcast::Sender>, + pool_builder: LocalPoolBuilder, ) -> PoolTask { - Self { args, event_sender } + Self { + args, + event_sender, + pool_builder, + } } pub fn boxed(self) -> Box { diff --git a/src/rpc/debug.rs b/src/rpc/debug.rs index 615cf2f96..202a9ce17 100644 --- a/src/rpc/debug.rs +++ b/src/rpc/debug.rs @@ -14,7 +14,7 @@ use crate::{ }, types::BundlingMode, }, - op_pool::PoolClient, + op_pool::PoolServer, }; /// Debug API @@ -44,14 +44,14 @@ pub trait DebugApi { } pub struct DebugApi
<P> {
-    op_pool_client: P,
+    pool: P,
     builder_client: builder_client::BuilderClient,
 }

 impl<P> DebugApi<P> {
-    pub fn new(op_pool_client: P, builder_client: builder_client::BuilderClient) -> Self {
+    pub fn new(pool: P, builder_client: builder_client::BuilderClient) -> Self {
         Self {
-            op_pool_client,
+            pool,
             builder_client,
         }
     }
@@ -60,11 +60,11 @@ impl<P> DebugApi<P> {
 #[async_trait]
 impl<P> DebugApiServer for DebugApi<P>
where - P: PoolClient, + P: PoolServer, { async fn bundler_clear_state(&self) -> RpcResult { let _ = self - .op_pool_client + .pool .debug_clear_state() .await .map_err(|e| RpcError::Custom(e.to_string()))?; @@ -74,7 +74,7 @@ where async fn bundler_dump_mempool(&self, entry_point: Address) -> RpcResult> { Ok(self - .op_pool_client + .pool .debug_dump_mempool(entry_point) .await .map_err(|e| RpcError::Custom(e.to_string()))? @@ -112,7 +112,7 @@ where entry_point: Address, ) -> RpcResult { let _ = self - .op_pool_client + .pool .debug_set_reputations( entry_point, reputations.into_iter().map(Into::into).collect(), @@ -124,7 +124,7 @@ where async fn bundler_dump_reputation(&self, entry_point: Address) -> RpcResult> { let result = self - .op_pool_client + .pool .debug_dump_reputation(entry_point) .await .map_err(|e| RpcError::Custom(e.to_string()))?; diff --git a/src/rpc/eth/estimation.rs b/src/rpc/eth/estimation.rs index d86b24f03..f01c6275a 100644 --- a/src/rpc/eth/estimation.rs +++ b/src/rpc/eth/estimation.rs @@ -430,7 +430,7 @@ mod tests { }; let estimator: GasEstimatorImpl = - GasEstimatorImpl::new(0, Arc::new(provider), entry, settings.clone()); + GasEstimatorImpl::new(0, Arc::new(provider), entry, settings); (estimator, settings) } @@ -524,12 +524,7 @@ mod tests { // Chose arbitrum let estimator: GasEstimatorImpl = - GasEstimatorImpl::new( - Chain::Arbitrum as u64, - Arc::new(provider), - entry, - settings.clone(), - ); + GasEstimatorImpl::new(Chain::Arbitrum as u64, Arc::new(provider), entry, settings); let user_op = demo_user_op_optional_gas(); let estimation = estimator.calc_pre_verification_gas(&user_op).await.unwrap(); @@ -572,12 +567,7 @@ mod tests { // Chose OP let estimator: GasEstimatorImpl = - GasEstimatorImpl::new( - Chain::Optimism as u64, - Arc::new(provider), - entry, - settings.clone(), - ); + GasEstimatorImpl::new(Chain::Optimism as u64, Arc::new(provider), entry, settings); let user_op = demo_user_op_optional_gas(); let estimation = estimator.calc_pre_verification_gas(&user_op).await.unwrap(); @@ -706,7 +696,7 @@ mod tests { // check for this overlflow provider.expect_call().returning(|_a, _b| { let result_data: Bytes = GasUsedResult { - gas_used: U256::from(18446744073709551616 as u128), + gas_used: U256::from(18446744073709551616_u128), success: false, result: Bytes::new(), } @@ -797,7 +787,7 @@ mod tests { .binary_search_verification_gas(&user_op, H256::zero()) .await; - assert_eq!(estimation.is_err(), true); + assert!(estimation.is_err()); } #[tokio::test] @@ -850,7 +840,7 @@ mod tests { .binary_search_verification_gas(&user_op, H256::zero()) .await; - assert_eq!(estimation.is_err(), true); + assert!(estimation.is_err()); } #[tokio::test] @@ -902,7 +892,7 @@ mod tests { .binary_search_verification_gas(&user_op, H256::zero()) .await; - assert_eq!(estimation.is_err(), true); + assert!(estimation.is_err()); } #[tokio::test] @@ -947,7 +937,7 @@ mod tests { .binary_search_verification_gas(&user_op, H256::zero()) .await; - assert_eq!(estimation.is_err(), true); + assert!(estimation.is_err()); } #[tokio::test] @@ -1020,10 +1010,10 @@ mod tests { .err() .unwrap(); - assert_eq!( - matches!(estimation, GasEstimationError::RevertInCallWithBytes(_)), - true - ); + assert!(matches!( + estimation, + GasEstimationError::RevertInCallWithBytes(_) + )); } #[tokio::test] diff --git a/src/rpc/eth/mod.rs b/src/rpc/eth/mod.rs index cacaeeadc..adf36cbd7 100644 --- a/src/rpc/eth/mod.rs +++ b/src/rpc/eth/mod.rs @@ -32,7 +32,7 @@ use crate::{ eth::log_to_raw_log, types::{UserOperation, 
BASE_CHAIN_IDS}, }, - op_pool::PoolClient, + op_pool::PoolServer, rpc::{ estimation::{GasEstimationError, GasEstimator, GasEstimatorImpl}, GasEstimate, RichUserOperation, RpcUserOperation, UserOperationOptionalGas, @@ -97,24 +97,24 @@ where } #[derive(Debug)] -pub struct EthApi { +pub struct EthApi { contexts_by_entry_point: HashMap>, provider: Arc>, chain_id: u64, - op_pool_client: P, + pool: P, } impl EthApi where C: JsonRpcClient + 'static, - P: PoolClient, + P: PoolServer, { #[allow(clippy::too_many_arguments)] pub fn new( provider: Arc>, entry_points: Vec
<Address>
, chain_id: u64, - op_pool_client: P, + pool: P, estimation_settings: estimation::Settings, ) -> Self { let contexts_by_entry_point = entry_points @@ -131,7 +131,7 @@ where contexts_by_entry_point, provider, chain_id, - op_pool_client, + pool, } } @@ -311,7 +311,7 @@ where impl EthApiServer for EthApi where C: JsonRpcClient + 'static, - P: PoolClient, + P: PoolServer, { async fn send_user_operation( &self, @@ -324,7 +324,7 @@ where ))?; } Ok(self - .op_pool_client + .pool .add_op(entry_point, op.into()) .await .map_err(EthRpcError::from) @@ -506,7 +506,7 @@ mod tests { }; use super::*; - use crate::op_pool::MockPoolClient; + use crate::op_pool::MockPoolServer; const UO_OP_TOPIC: &str = "user-op-event-topic"; @@ -520,7 +520,7 @@ mod tests { given_log(UO_OP_TOPIC, "another-hash"), ]); - let result = EthApi::::filter_receipt_logs_matching_user_op( + let result = EthApi::::filter_receipt_logs_matching_user_op( &reference_log, &receipt, ); @@ -542,7 +542,7 @@ mod tests { given_log(UO_OP_TOPIC, "another-hash"), ]); - let result = EthApi::::filter_receipt_logs_matching_user_op( + let result = EthApi::::filter_receipt_logs_matching_user_op( &reference_log, &receipt, ); @@ -564,7 +564,7 @@ mod tests { reference_log.clone(), ]); - let result = EthApi::::filter_receipt_logs_matching_user_op( + let result = EthApi::::filter_receipt_logs_matching_user_op( &reference_log, &receipt, ); @@ -590,7 +590,7 @@ mod tests { reference_log.clone(), ]); - let result = EthApi::::filter_receipt_logs_matching_user_op( + let result = EthApi::::filter_receipt_logs_matching_user_op( &reference_log, &receipt, ); @@ -615,7 +615,7 @@ mod tests { given_log(UO_OP_TOPIC, "other-hash"), ]); - let result = EthApi::::filter_receipt_logs_matching_user_op( + let result = EthApi::::filter_receipt_logs_matching_user_op( &reference_log, &receipt, ); @@ -636,7 +636,7 @@ mod tests { given_log("another-topic-2", "some-hash"), ]); - let result = EthApi::::filter_receipt_logs_matching_user_op( + let result = EthApi::::filter_receipt_logs_matching_user_op( &reference_log, &receipt, ); diff --git a/src/rpc/health.rs b/src/rpc/health.rs index 86be3a946..100af6842 100644 --- a/src/rpc/health.rs +++ b/src/rpc/health.rs @@ -1,9 +1,7 @@ -use anyhow::Context; -use jsonrpsee::{core::RpcResult, proc_macros::rpc}; -use tonic::{async_trait, transport::Channel}; -use tonic_health::pb::{health_client::HealthClient, HealthCheckRequest}; +use jsonrpsee::{core::RpcResult, proc_macros::rpc, types::error::CallError}; +use tonic::async_trait; -use crate::op_pool::{LocalPoolClient, PoolClient}; +use crate::common::server::{HealthCheck, ServerStatus}; #[rpc(server, namespace = "system")] pub trait SystemApi { @@ -11,60 +9,32 @@ pub trait SystemApi { async fn get_health(&self) -> RpcResult; } -pub struct RemoteHealthCheck { - op_pool_health_client: HealthClient, - builder_health_client: HealthClient, +pub struct HealthChecker { + servers: Vec>, } -impl RemoteHealthCheck { - pub fn new( - op_pool_health_client: HealthClient, - builder_health_client: HealthClient, - ) -> Self { - Self { - op_pool_health_client, - builder_health_client, - } - } -} - -#[async_trait] -impl SystemApiServer for RemoteHealthCheck { - async fn get_health(&self) -> RpcResult { - self.op_pool_health_client - .clone() - .check(HealthCheckRequest::default()) - .await - .context("Op pool server should be live")?; - - self.builder_health_client - .clone() - .check(HealthCheckRequest::default()) - .await - .context("Builder server should be live")?; - - Ok("ok".to_string()) - } -} - -pub 
struct LocalHealthCheck { - pool_client: LocalPoolClient, -} - -impl LocalHealthCheck { - pub fn new(pool_client: LocalPoolClient) -> Self { - Self { pool_client } +impl HealthChecker { + pub fn new(servers: Vec>) -> Self { + Self { servers } } } #[async_trait] -impl SystemApiServer for LocalHealthCheck { +impl SystemApiServer for HealthChecker { async fn get_health(&self) -> RpcResult { - self.pool_client - .get_supported_entry_points() - .await - .context("Op pool server should be live")?; - - Ok("ok".to_string()) + let mut errors = Vec::new(); + for server in &self.servers { + match server.status().await { + ServerStatus::Serving => {} + ServerStatus::NotServing => errors.push(server.name()), + } + } + if errors.is_empty() { + Ok("ok".to_owned()) + } else { + Err(jsonrpsee::core::Error::Call(CallError::Failed( + anyhow::anyhow!("Health check failed: {:?}", errors), + ))) + } } } diff --git a/src/rpc/task.rs b/src/rpc/task.rs index 2f337831c..cec021d20 100644 --- a/src/rpc/task.rs +++ b/src/rpc/task.rs @@ -1,4 +1,4 @@ -use std::{net::SocketAddr, str::FromStr, sync::Arc, time::Duration}; +use std::{net::SocketAddr, sync::Arc, time::Duration}; use anyhow::{bail, Context}; use ethers::{ @@ -11,11 +11,7 @@ use jsonrpsee::{ }; use tokio::select; use tokio_util::sync::CancellationToken; -use tonic::{ - async_trait, - transport::{Channel, Uri}, -}; -use tonic_health::pb::health_client::HealthClient; +use tonic::{async_trait, transport::Channel}; use tracing::info; use url::Url; @@ -25,13 +21,13 @@ use crate::{ handle::Task, precheck, protos::builder::builder_client::BuilderClient, - server::{self, format_socket_addr}, + server::{self, format_socket_addr, HealthCheck}, }, - op_pool::{connect_remote_pool_client, LocalPoolClient, PoolClient, PoolClientMode}, + op_pool::PoolServer, rpc::{ debug::{DebugApi, DebugApiServer}, eth::{estimation, EthApi, EthApiServer}, - health::{LocalHealthCheck, RemoteHealthCheck, SystemApiServer}, + health::{HealthChecker, SystemApiServer}, metrics::RpcMetricsLogger, rundler::{RundlerApi, RundlerApiServer}, }, @@ -50,17 +46,20 @@ pub struct Args { pub estimation_settings: estimation::Settings, pub rpc_timeout: Duration, pub max_connections: u32, - pub pool_client_mode: PoolClientMode, } #[derive(Debug)] -pub struct RpcTask { +pub struct RpcTask
<P> {
     args: Args,
+    pool: P,
 }

 #[async_trait]
-impl Task for RpcTask {
-    async fn run(&mut self, shutdown_token: CancellationToken) -> anyhow::Result<()> {
+impl<P> Task for RpcTask<P>
+where + P: PoolServer + HealthCheck + Clone, +{ + async fn run(mut self: Box, shutdown_token: CancellationToken) -> anyhow::Result<()> { let addr: SocketAddr = format_socket_addr(&self.args.host, self.args.port).parse()?; tracing::info!("Starting rpc server on {}", addr); @@ -89,44 +88,12 @@ impl Task for RpcTask { info!("Connected to builder service at {}", self.args.builder_url); let mut module = RpcModule::new(()); - match &self.args.pool_client_mode { - PoolClientMode::Local { - req_sender, - block_receiver, - } => { - let pool_client = - LocalPoolClient::new(req_sender.clone(), block_receiver.resubscribe()); - self.attach_namespaces(provider, pool_client.clone(), builder_client, &mut module)?; - - module.merge(LocalHealthCheck::new(pool_client).into_rpc())?; - } - PoolClientMode::Remote { url } => { - let pool_client = connect_remote_pool_client(url, shutdown_token.clone()).await?; - info!("Connected to op_pool service at {}", url); - self.attach_namespaces(provider, pool_client, builder_client, &mut module)?; - - let builder_uri = Uri::from_str(&self.args.builder_url) - .context("should be a valid URI for op_pool")?; - let op_pool_uri = - Uri::from_str(url).context("should be a valid URI for op_pool")?; - - let op_pool_health_client = HealthClient::new( - Channel::builder(op_pool_uri) - .connect() - .await - .context("should have connected to op_pool health service channel")?, - ); - let builder_health_client = HealthClient::new( - Channel::builder(builder_uri) - .connect() - .await - .context("should have connected to builder health service channel")?, - ); - module.merge( - RemoteHealthCheck::new(op_pool_health_client, builder_health_client).into_rpc(), - )?; - } - } + self.attach_namespaces(provider, builder_client, &mut module)?; + + // TODO(danc): builder health check + let servers: Vec> = vec![Box::new(self.pool.clone())]; + let health_checker = HealthChecker::new(servers); + module.merge(health_checker.into_rpc())?; // Set up health check endpoint via GET /health registers the jsonrpc handler let service_builder = tower::ServiceBuilder::new() @@ -158,9 +125,12 @@ impl Task for RpcTask { } } -impl RpcTask { - pub fn new(args: Args) -> RpcTask { - Self { args } +impl
<P> RpcTask<P>
+where + P: PoolServer + HealthCheck + Clone, +{ + pub fn new(args: Args, pool: P) -> Self { + Self { args, pool } } pub fn boxed(self) -> Box { @@ -176,16 +146,17 @@ impl RpcTask { tracing::error!("bailing from conneting client, server shutting down"); bail!("Server shutting down") } - res = server::connect_with_retries("builder from common", url, BuilderClient::connect) => { + res = server::connect_with_retries("builder from common", url, |url| Box::pin(async move { + BuilderClient::connect(url).await.context("should connect to builder") + })) => { res.context("should connect to builder") } } } - fn attach_namespaces( + fn attach_namespaces( &self, provider: Arc>>, - pool_client: C, builder_client: BuilderClient, module: &mut RpcModule<()>, ) -> anyhow::Result<()> { @@ -196,13 +167,13 @@ impl RpcTask { provider.clone(), self.args.entry_points.clone(), self.args.chain_id, - pool_client.clone(), + self.pool.clone(), self.args.estimation_settings, ) .into_rpc(), )?, ApiNamespace::Debug => module - .merge(DebugApi::new(pool_client.clone(), builder_client.clone()).into_rpc())?, + .merge(DebugApi::new(self.pool.clone(), builder_client.clone()).into_rpc())?, ApiNamespace::Rundler => module.merge( RundlerApi::new( provider.clone(),