From 49dd555936fe255b1cf15c320ff4d210aff44920 Mon Sep 17 00:00:00 2001 From: dancoombs Date: Thu, 24 Aug 2023 15:24:33 -0500 Subject: [PATCH] refactor: rework the pool server classes --- src/builder/bundle_proposer.rs | 20 +- src/builder/bundle_sender.rs | 28 +- src/builder/task.rs | 105 ++--- src/cli/builder.rs | 25 +- src/cli/node/mod.rs | 38 +- src/cli/pool.rs | 4 +- src/cli/rpc.rs | 21 +- src/common/handle.rs | 4 +- src/common/server.rs | 39 +- src/op_pool/mempool/mod.rs | 135 +----- src/op_pool/mod.rs | 5 +- src/op_pool/server/local.rs | 612 ++++++++++++++++++++++++++++ src/op_pool/server/local/mod.rs | 5 - src/op_pool/server/mod.rs | 40 +- src/op_pool/server/remote/client.rs | 71 ++-- src/op_pool/server/remote/protos.rs | 9 + src/op_pool/server/remote/server.rs | 114 +++--- src/op_pool/task.rs | 81 ++-- src/rpc/debug.rs | 18 +- src/rpc/eth/mod.rs | 30 +- src/rpc/health.rs | 76 ++-- src/rpc/task.rs | 89 ++-- 22 files changed, 982 insertions(+), 587 deletions(-) create mode 100644 src/op_pool/server/local.rs delete mode 100644 src/op_pool/server/local/mod.rs diff --git a/src/builder/bundle_proposer.rs b/src/builder/bundle_proposer.rs index 2172653a6..bc91b5eb5 100644 --- a/src/builder/bundle_proposer.rs +++ b/src/builder/bundle_proposer.rs @@ -28,7 +28,7 @@ use crate::{ Timestamp, UserOperation, }, }, - op_pool::{PoolClient, PoolOperation}, + op_pool::{PoolOperation, PoolServer}, }; /// A user op must be valid for at least this long into the future to be included. @@ -76,9 +76,9 @@ where S: Simulator, E: EntryPointLike, P: ProviderLike, - C: PoolClient, + C: PoolServer, { - op_pool: C, + pool: C, simulator: S, entry_point: E, provider: Arc

, @@ -104,7 +104,7 @@ where S: Simulator, E: EntryPointLike, P: ProviderLike, - C: PoolClient, + C: PoolServer, { async fn make_bundle(&self, required_fees: Option) -> anyhow::Result { let (ops, block_hash, bundle_fees) = try_join!( @@ -180,10 +180,10 @@ where S: Simulator, E: EntryPointLike, P: ProviderLike, - C: PoolClient, + C: PoolServer, { pub fn new( - op_pool: C, + pool: C, simulator: S, entry_point: E, provider: Arc

, @@ -192,7 +192,7 @@ where event_sender: broadcast::Sender>, ) -> Self { Self { - op_pool, + pool, simulator, entry_point, provider: provider.clone(), @@ -398,7 +398,7 @@ where } async fn get_ops_from_pool(&self) -> anyhow::Result> { - self.op_pool + self.pool .get_ops(self.entry_point.address(), self.settings.max_bundle_size) .await .context("should get ops from pool") @@ -698,7 +698,7 @@ mod tests { }, types::{MockEntryPointLike, MockProviderLike, ValidTimeRange}, }, - op_pool::MockPoolClient, + op_pool::MockPoolServer, }; #[tokio::test] @@ -1174,7 +1174,7 @@ mod tests { }) .collect(); - let mut pool_client = MockPoolClient::new(); + let mut pool_client = MockPoolServer::new(); pool_client .expect_get_ops() .returning(move |_, _| Ok(ops.clone())); diff --git a/src/builder/bundle_sender.rs b/src/builder/bundle_sender.rs index 3ed81dc7f..a015c66bb 100644 --- a/src/builder/bundle_sender.rs +++ b/src/builder/bundle_sender.rs @@ -8,12 +8,12 @@ use std::{ use anyhow::{bail, Context}; use ethers::types::{transaction::eip2718::TypedTransaction, Address, H256, U256}; +use futures_util::StreamExt; use tokio::{ join, sync::{broadcast, mpsc, oneshot}, time, }; -use tokio_stream::StreamExt; use tonic::async_trait; use tracing::{error, info, trace, warn}; @@ -29,7 +29,7 @@ use crate::{ math, types::{Entity, EntryPointLike, ExpectedStorage, UserOperation}, }, - op_pool::PoolClient, + op_pool::PoolServer, }; // Overhead on gas estimates to account for inaccuracies. @@ -52,7 +52,7 @@ where P: BundleProposer, E: EntryPointLike, T: TransactionTracker, - C: PoolClient, + C: PoolServer, { manual_bundling_mode: Arc, send_bundle_receiver: mpsc::Receiver, @@ -62,7 +62,7 @@ where proposer: P, entry_point: E, transaction_tracker: T, - pool_client: C, + pool: C, settings: Settings, event_sender: broadcast::Sender>, } @@ -100,15 +100,13 @@ where P: BundleProposer, E: EntryPointLike, T: TransactionTracker, - C: PoolClient, + C: PoolServer, { /// Loops forever, attempting to form and send a bundle on each new block, /// then waiting for one bundle to be mined or dropped before forming the /// next one. async fn send_bundles_in_loop(&mut self) { - let mut new_heads = if let Ok(new_blocks) = self.pool_client.subscribe_new_heads() { - new_blocks - } else { + let Ok(mut new_heads) = self.pool.subscribe_new_heads().await else { error!("Failed to subscribe to new blocks"); return; }; @@ -116,12 +114,12 @@ where // The new_heads stream can buffer up multiple blocks, but we only want to consume the latest one. // This task is used to consume the new heads and place them onto a channel that can be syncronously // consumed until the latest block is reached. - let (tx, mut rx) = mpsc::channel(1024); + let (tx, mut rx) = mpsc::unbounded_channel(); tokio::spawn(async move { loop { match new_heads.next().await { Some(b) => { - if tx.send(b).await.is_err() { + if tx.send(b).is_err() { error!("Failed to buffer new block for bundle sender"); return; } @@ -213,7 +211,7 @@ where P: BundleProposer, E: EntryPointLike, T: TransactionTracker, - C: PoolClient, + C: PoolServer, { #[allow(clippy::too_many_arguments)] pub fn new( @@ -225,7 +223,7 @@ where proposer: P, entry_point: E, transaction_tracker: T, - pool_client: C, + pool: C, settings: Settings, event_sender: broadcast::Sender>, ) -> Self { @@ -238,7 +236,7 @@ where proposer, entry_point, transaction_tracker, - pool_client, + pool, settings, event_sender, } @@ -477,7 +475,7 @@ where } async fn remove_ops_from_pool(&self, ops: &[UserOperation]) -> anyhow::Result<()> { - self.pool_client + self.pool .remove_ops( self.entry_point.address(), ops.iter() @@ -489,7 +487,7 @@ where } async fn remove_entities_from_pool(&self, entities: &[Entity]) -> anyhow::Result<()> { - self.pool_client + self.pool .remove_entities(self.entry_point.address(), entities.to_vec()) .await .context("builder should remove rejected entities from pool") diff --git a/src/builder/task.rs b/src/builder/task.rs index 9719d6bca..4ee798b0e 100644 --- a/src/builder/task.rs +++ b/src/builder/task.rs @@ -37,7 +37,7 @@ use crate::{ server::format_socket_addr, simulation::{self, SimulatorImpl}, }, - op_pool::{connect_remote_pool_client, LocalPoolClient, PoolClientMode}, + op_pool::PoolServer, }; #[derive(Debug)] @@ -64,18 +64,21 @@ pub struct Args { pub max_blocks_to_wait_for_mine: u64, pub replacement_fee_percent_increase: u64, pub max_fee_increases: u64, - pub pool_client_mode: PoolClientMode, } #[derive(Debug)] -pub struct BuilderTask { +pub struct BuilderTask

{ args: Args, event_sender: broadcast::Sender>, + pool: P, } #[async_trait] -impl Task for BuilderTask { - async fn run(&mut self, shutdown_token: CancellationToken) -> anyhow::Result<()> { +impl

Task for BuilderTask

+where + P: PoolServer + Clone, +{ + async fn run(mut self: Box, shutdown_token: CancellationToken) -> anyhow::Result<()> { let addr = format_socket_addr(&self.args.host, self.args.port).parse()?; info!("Starting builder server on {}", addr); tracing::info!("Mempool config: {:?}", self.args.mempool_configs); @@ -153,62 +156,28 @@ impl Task for BuilderTask { let manual_bundling_mode = Arc::new(AtomicBool::new(false)); let (send_bundle_tx, send_bundle_rx) = mpsc::channel(1); - let mut builder: Box = match &self.args.pool_client_mode { - PoolClientMode::Local { - req_sender, - new_heads_receiver, - } => { - let pool_client = - LocalPoolClient::new(req_sender.clone(), new_heads_receiver.resubscribe()); - let proposer = BundleProposerImpl::new( - pool_client.clone(), - simulator, - entry_point.clone(), - Arc::clone(&provider), - self.args.chain_id, - proposer_settings, - self.event_sender.clone(), - ); - Box::new(BundleSenderImpl::new( - manual_bundling_mode.clone(), - send_bundle_rx, - self.args.chain_id, - beneficiary, - self.args.eth_poll_interval, - proposer, - entry_point, - transaction_tracker, - pool_client, - builder_settings, - self.event_sender.clone(), - )) - } - PoolClientMode::Remote { url } => { - let pool_client = connect_remote_pool_client(url, shutdown_token.clone()).await?; - let proposer = BundleProposerImpl::new( - pool_client.clone(), - simulator, - entry_point.clone(), - Arc::clone(&provider), - self.args.chain_id, - proposer_settings, - self.event_sender.clone(), - ); - Box::new(BundleSenderImpl::new( - manual_bundling_mode.clone(), - send_bundle_rx, - self.args.chain_id, - beneficiary, - self.args.eth_poll_interval, - proposer, - entry_point, - transaction_tracker, - pool_client, - builder_settings, - self.event_sender.clone(), - )) - } - }; + let proposer = BundleProposerImpl::new( + self.pool.clone(), + simulator, + entry_point.clone(), + Arc::clone(&provider), + self.args.chain_id, + proposer_settings, + self.event_sender.clone(), + ); + let mut builder = BundleSenderImpl::new( + manual_bundling_mode.clone(), + send_bundle_rx, + self.args.chain_id, + beneficiary, + self.args.eth_poll_interval, + proposer, + entry_point, + transaction_tracker, + self.pool, + builder_settings, + self.event_sender.clone(), + ); let _builder_loop_guard = { SpawnGuard::spawn_with_guard(async move { builder.send_bundles_in_loop().await }) }; @@ -248,12 +217,20 @@ impl Task for BuilderTask { } } -impl BuilderTask { +impl

BuilderTask

+where + P: PoolServer + Clone, +{ pub fn new( args: Args, event_sender: broadcast::Sender>, - ) -> BuilderTask { - Self { args, event_sender } + pool: P, + ) -> Self { + Self { + args, + event_sender, + pool, + } } pub fn boxed(self) -> Box { diff --git a/src/cli/builder.rs b/src/cli/builder.rs index cfb3f13d9..cb1e6d03a 100644 --- a/src/cli/builder.rs +++ b/src/cli/builder.rs @@ -13,9 +13,9 @@ use crate::{ gas::PriorityFeeMode, handle::spawn_tasks_with_shutdown, mempool::MempoolConfig, - server::format_server_addr, + server::{connect_with_retries_shutdown, format_server_addr}, }, - op_pool::PoolClientMode, + op_pool::RemotePoolClient, }; /// CLI options for the builder @@ -150,11 +150,7 @@ pub struct BuilderArgs { impl BuilderArgs { /// Convert the CLI arguments into the arguments for the builder combining /// common and builder specific arguments. - pub async fn to_args( - &self, - common: &CommonArgs, - pool_client_mode: PoolClientMode, - ) -> anyhow::Result { + pub async fn to_args(&self, common: &CommonArgs) -> anyhow::Result { let priority_fee_mode = PriorityFeeMode::try_from( common.priority_fee_mode_kind.as_str(), common.priority_fee_mode_value, @@ -204,7 +200,6 @@ impl BuilderArgs { max_blocks_to_wait_for_mine: self.max_blocks_to_wait_for_mine, replacement_fee_percent_increase: self.replacement_fee_percent_increase, max_fee_increases: self.max_fee_increases, - pool_client_mode, }) } @@ -238,12 +233,18 @@ pub async fn run(builder_args: BuilderCliArgs, common_args: CommonArgs) -> anyho let (event_sender, event_rx) = broadcast::channel(EVENT_CHANNEL_CAPACITY); emit::receive_and_log_events_with_filter(event_rx, is_nonspammy_event); - let task_args = builder_args - .to_args(&common_args, PoolClientMode::Remote { url: pool_url }) - .await?; + let task_args = builder_args.to_args(&common_args).await?; + + let pool = connect_with_retries_shutdown( + "op pool from builder", + &pool_url, + RemotePoolClient::connect, + tokio::signal::ctrl_c(), + ) + .await?; spawn_tasks_with_shutdown( - [BuilderTask::new(task_args, event_sender).boxed()], + [BuilderTask::new(task_args, event_sender, pool).boxed()], tokio::signal::ctrl_c(), ) .await; diff --git a/src/cli/node/mod.rs b/src/cli/node/mod.rs index 452d59878..73e30f027 100644 --- a/src/cli/node/mod.rs +++ b/src/cli/node/mod.rs @@ -1,5 +1,5 @@ use clap::Args; -use tokio::sync::{broadcast, mpsc}; +use tokio::sync::broadcast; use self::events::Event; use crate::{ @@ -14,7 +14,7 @@ use crate::{ emit::{self, WithEntryPoint, EVENT_CHANNEL_CAPACITY}, handle, }, - op_pool::{emit::OpPoolEvent, PoolClientMode, PoolServerMode, PoolTask}, + op_pool::{emit::OpPoolEvent, LocalPoolBuilder, PoolServerMode, PoolTask}, rpc::RpcTask, }; mod events; @@ -40,37 +40,16 @@ pub async fn run(bundler_args: NodeCliArgs, common_args: CommonArgs) -> anyhow:: let builder_url = builder_args.url(false); - let (tx, rx) = mpsc::channel(1024); - let (new_heads_sender, _) = broadcast::channel(1024); - let pool_task_args = pool_args - .to_args( - &common_args, - PoolServerMode::Local { - req_receiver: Some(rx), - new_heads_sender: Some(new_heads_sender.clone()), - }, - ) - .await?; - let builder_task_args = builder_args - .to_args( - &common_args, - PoolClientMode::Local { - req_sender: tx.clone(), - new_heads_receiver: new_heads_sender.subscribe(), - }, - ) + .to_args(&common_args, PoolServerMode::Local) .await?; + let builder_task_args = builder_args.to_args(&common_args).await?; let rpc_task_args = rpc_args .to_args( &common_args, builder_url, (&common_args).try_into()?, (&common_args).try_into()?, - PoolClientMode::Local { - req_sender: tx, - new_heads_receiver: new_heads_sender.subscribe(), - }, ) .await?; @@ -97,11 +76,14 @@ pub async fn run(bundler_args: NodeCliArgs, common_args: CommonArgs) -> anyhow:: } }); + let pool_builder = LocalPoolBuilder::new(1024, 1024); + let pool_handle = pool_builder.get_handle(); + handle::spawn_tasks_with_shutdown( [ - PoolTask::new(pool_task_args, op_pool_event_sender).boxed(), - BuilderTask::new(builder_task_args, builder_event_sender).boxed(), - RpcTask::new(rpc_task_args).boxed(), + PoolTask::new(pool_task_args, op_pool_event_sender, pool_builder).boxed(), + BuilderTask::new(builder_task_args, builder_event_sender, pool_handle.clone()).boxed(), + RpcTask::new(rpc_task_args, pool_handle).boxed(), ], tokio::signal::ctrl_c(), ) diff --git a/src/cli/pool.rs b/src/cli/pool.rs index 3b14cb546..7a85e510a 100644 --- a/src/cli/pool.rs +++ b/src/cli/pool.rs @@ -13,7 +13,7 @@ use crate::{ handle::spawn_tasks_with_shutdown, mempool::MempoolConfig, }, - op_pool::{self, PoolConfig, PoolServerMode, PoolTask}, + op_pool::{self, LocalPoolBuilder, PoolConfig, PoolServerMode, PoolTask}, }; /// CLI options for the OP Pool #[derive(Args, Debug)] @@ -198,7 +198,7 @@ pub async fn run(pool_args: PoolCliArgs, common_args: CommonArgs) -> anyhow::Res emit::receive_and_log_events_with_filter(event_rx, |_| true); spawn_tasks_with_shutdown( - [PoolTask::new(task_args, event_sender).boxed()], + [PoolTask::new(task_args, event_sender, LocalPoolBuilder::new(1024, 1024)).boxed()], tokio::signal::ctrl_c(), ) .await; diff --git a/src/cli/rpc.rs b/src/cli/rpc.rs index 5609aa644..e59ea335b 100644 --- a/src/cli/rpc.rs +++ b/src/cli/rpc.rs @@ -5,8 +5,8 @@ use clap::Args; use super::CommonArgs; use crate::{ - common::{handle::spawn_tasks_with_shutdown, precheck}, - op_pool::PoolClientMode, + common::{handle::spawn_tasks_with_shutdown, precheck, server::connect_with_retries_shutdown}, + op_pool::RemotePoolClient, rpc::{self, estimation, RpcTask}, }; @@ -71,7 +71,6 @@ impl RpcArgs { builder_url: String, precheck_settings: precheck::Settings, estimation_settings: estimation::Settings, - pool_client_mode: PoolClientMode, ) -> anyhow::Result { let apis = self .api @@ -99,7 +98,6 @@ impl RpcArgs { estimation_settings, rpc_timeout: Duration::from_secs(self.timeout_seconds.parse()?), max_connections: self.max_connections, - pool_client_mode, }) } } @@ -142,10 +140,21 @@ pub async fn run(rpc_args: RpcCliArgs, common_args: CommonArgs) -> anyhow::Resul builder_url, (&common_args).try_into()?, (&common_args).try_into()?, - PoolClientMode::Remote { url: pool_url }, ) .await?; - spawn_tasks_with_shutdown([RpcTask::new(task_args).boxed()], tokio::signal::ctrl_c()).await; + let pool = connect_with_retries_shutdown( + "op pool from rpc", + &pool_url, + RemotePoolClient::connect, + tokio::signal::ctrl_c(), + ) + .await?; + + spawn_tasks_with_shutdown( + [RpcTask::new(task_args, pool).boxed()], + tokio::signal::ctrl_c(), + ) + .await; Ok(()) } diff --git a/src/common/handle.rs b/src/common/handle.rs index ec4cb1367..eb6196335 100644 --- a/src/common/handle.rs +++ b/src/common/handle.rs @@ -49,7 +49,7 @@ impl Drop for SpawnGuard { #[async_trait] pub trait Task: Sync + Send + 'static { - async fn run(&mut self, shutdown_token: CancellationToken) -> anyhow::Result<()>; + async fn run(self: Box, shutdown_token: CancellationToken) -> anyhow::Result<()>; } pub async fn spawn_tasks_with_shutdown( @@ -63,7 +63,7 @@ pub async fn spawn_tasks_with_shutdown( let shutdown_token = CancellationToken::new(); let mut shutdown_scope = Some(shutdown_scope); - let handles = tasks.into_iter().map(|mut task| { + let handles = tasks.into_iter().map(|task| { let st = shutdown_token.clone(); let ss = shutdown_scope.clone(); async move { diff --git a/src/common/server.rs b/src/common/server.rs index f5bc64eeb..f4aa482de 100644 --- a/src/common/server.rs +++ b/src/common/server.rs @@ -1,6 +1,7 @@ use std::{future::Future, time::Duration}; -use anyhow::Context; +use anyhow::{bail, Context}; +use tonic::async_trait; use crate::common::{retry, retry::RetryOpts}; @@ -16,6 +17,27 @@ pub fn format_socket_addr(host: &String, port: u16) -> String { format!("{}:{}", host, port) } +pub async fn connect_with_retries_shutdown( + server_name: &str, + url: &str, + func: F, + shutdown_signal: S, +) -> anyhow::Result +where + F: Fn(String) -> FutF, + FutF: Future> + Send + 'static, + S: Future> + Send + 'static, +{ + tokio::select! { + _ = shutdown_signal => { + bail!("shutdown signal received") + } + res = connect_with_retries(server_name, url, func) => { + res + } + } +} + pub async fn connect_with_retries( server_name: &str, url: &str, @@ -23,7 +45,7 @@ pub async fn connect_with_retries( ) -> anyhow::Result where F: Fn(String) -> FutF, - FutF: Future> + Send + 'static, + FutF: Future> + Send + 'static, { let description = format!("connect to {server_name} at {url}"); retry::with_retries( @@ -39,3 +61,16 @@ where .await .context("should connect to server when retrying") } + +#[derive(Debug)] +pub enum ServerStatus { + Serving, + NotServing, +} + +#[async_trait] +pub trait HealthCheck: Send + Sync + 'static { + fn name(&self) -> &'static str; + + async fn status(&self) -> ServerStatus; +} diff --git a/src/op_pool/mempool/mod.rs b/src/op_pool/mempool/mod.rs index 5de63d4cb..57425167d 100644 --- a/src/op_pool/mempool/mod.rs +++ b/src/op_pool/mempool/mod.rs @@ -16,12 +16,10 @@ pub use reputation::{ HourlyMovingAverageReputation, Reputation, ReputationParams, ReputationStatus, }; use strum::IntoEnumIterator; -use tokio::sync::broadcast; -use tokio_util::sync::CancellationToken; use tonic::async_trait; use self::error::MempoolResult; -use super::{chain::ChainUpdate, MempoolError}; +use super::chain::ChainUpdate; use crate::common::{ mempool::MempoolConfig, precheck, simulation, @@ -181,137 +179,6 @@ impl PoolOperation { } } -#[derive(Debug)] -pub struct MempoolGroup { - mempools: HashMap, - chain_update_sender: broadcast::Sender>, -} - -impl MempoolGroup -where - M: Mempool, -{ - pub fn new(mempools: Vec) -> Self { - Self { - mempools: mempools.into_iter().map(|m| (m.entry_point(), m)).collect(), - chain_update_sender: broadcast::channel(1024).0, - } - } - - pub fn subscribe_chain_update(self: Arc) -> broadcast::Receiver> { - self.chain_update_sender.subscribe() - } - - pub async fn run( - self: Arc, - mut chain_update_receiver: broadcast::Receiver>, - shutdown_token: CancellationToken, - ) { - loop { - tokio::select! { - _ = shutdown_token.cancelled() => { - tracing::info!("Shutting down UoPool"); - break; - } - chain_update = chain_update_receiver.recv() => { - if let Ok(chain_update) = chain_update { - // Update each mempool before notifying listeners of the chain update - // This allows the mempools to update their state before the listeners - // pull information from the mempool. - // For example, a bundle builder listening for a new block to kick off - // its bundle building process will want to be able to query the mempool - // and only receive operations that have not yet been mined. - for mempool in self.mempools.values() { - mempool.on_chain_update(&chain_update); - } - let _ = self.chain_update_sender.send(chain_update); - } - } - } - } - } - - pub fn get_supported_entry_points(&self) -> Vec

{ - self.mempools.keys().copied().collect() - } - - pub async fn add_op( - &self, - entry_point: Address, - op: UserOperation, - origin: OperationOrigin, - ) -> MempoolResult { - let mempool = self.get_pool(entry_point)?; - mempool.add_operation(origin, op).await - } - - pub fn get_ops(&self, entry_point: Address, max_ops: u64) -> MempoolResult> { - let mempool = self.get_pool(entry_point)?; - Ok(mempool - .best_operations(max_ops as usize) - .iter() - .map(|op| (**op).clone()) - .collect()) - } - - pub fn remove_ops(&self, entry_point: Address, ops: &[H256]) -> MempoolResult<()> { - let mempool = self.get_pool(entry_point)?; - mempool.remove_operations(ops); - Ok(()) - } - - pub fn remove_entities<'a>( - &self, - entry_point: Address, - entities: impl IntoIterator, - ) -> MempoolResult<()> { - let mempool = self.get_pool(entry_point)?; - for entity in entities { - mempool.remove_entity(*entity); - } - Ok(()) - } - - pub fn debug_clear_state(&self) -> MempoolResult<()> { - for mempool in self.mempools.values() { - mempool.clear(); - } - Ok(()) - } - - pub fn debug_dump_mempool(&self, entry_point: Address) -> MempoolResult> { - let mempool = self.get_pool(entry_point)?; - Ok(mempool - .all_operations(usize::MAX) - .iter() - .map(|op| (**op).clone()) - .collect()) - } - - pub fn debug_set_reputations<'a>( - &self, - entry_point: Address, - reputations: impl IntoIterator, - ) -> MempoolResult<()> { - let mempool = self.get_pool(entry_point)?; - for rep in reputations { - mempool.set_reputation(rep.address, rep.ops_seen, rep.ops_included); - } - Ok(()) - } - - pub fn debug_dump_reputation(&self, entry_point: Address) -> MempoolResult> { - let mempool = self.get_pool(entry_point)?; - Ok(mempool.dump_reputation()) - } - - fn get_pool(&self, entry_point: Address) -> MempoolResult<&M> { - self.mempools - .get(&entry_point) - .ok_or_else(|| MempoolError::UnknownEntryPoint(entry_point)) - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/src/op_pool/mod.rs b/src/op_pool/mod.rs index a049503b2..8b59c22f0 100644 --- a/src/op_pool/mod.rs +++ b/src/op_pool/mod.rs @@ -6,9 +6,8 @@ mod task; pub use mempool::{error::MempoolError, PoolConfig, PoolOperation, Reputation, ReputationStatus}; #[cfg(test)] -pub use server::MockPoolClient; +pub use server::MockPoolServer; pub use server::{ - connect_remote_pool_client, LocalPoolClient, NewHead, PoolClient, PoolClientMode, PoolResult, - PoolServerError, RemotePoolClient, ServerRequest as LocalPoolServerRequest, + LocalPoolBuilder, NewHead, PoolResult, PoolServer, PoolServerError, RemotePoolClient, }; pub use task::*; diff --git a/src/op_pool/server/local.rs b/src/op_pool/server/local.rs new file mode 100644 index 000000000..8cc2be2ba --- /dev/null +++ b/src/op_pool/server/local.rs @@ -0,0 +1,612 @@ +use std::{collections::HashMap, pin::Pin, sync::Arc}; + +use async_stream::stream; +use ethers::types::{Address, H256}; +use futures_util::Stream; +use tokio::{ + sync::{broadcast, mpsc, oneshot}, + task::JoinHandle, +}; +use tokio_util::sync::CancellationToken; +use tonic::async_trait; +use tracing::error; + +use crate::{ + common::{ + server::{HealthCheck, ServerStatus}, + types::{Entity, UserOperation}, + }, + op_pool::{ + chain::ChainUpdate, + mempool::{Mempool, OperationOrigin, PoolOperation}, + server::{NewHead, PoolServer, Reputation}, + MempoolError, PoolResult, PoolServerError, + }, +}; + +#[derive(Debug)] +pub struct LocalPoolBuilder { + req_sender: mpsc::Sender, + req_receiver: mpsc::Receiver, + block_sender: broadcast::Sender, +} + +impl LocalPoolBuilder { + pub fn new(request_capacity: usize, block_capacity: usize) -> Self { + let (req_sender, req_receiver) = mpsc::channel(request_capacity); + let (block_sender, _) = broadcast::channel(block_capacity); + Self { + req_sender, + req_receiver, + block_sender, + } + } + + pub fn get_handle(&self) -> LocalPoolHandle { + LocalPoolHandle { + req_sender: self.req_sender.clone(), + } + } + + pub fn run( + self, + mempools: HashMap>, + chain_updates: broadcast::Receiver>, + shutdown_token: CancellationToken, + ) -> JoinHandle> { + let mut runner = LocalPoolServerRunner::new( + self.req_receiver, + self.block_sender, + mempools, + chain_updates, + ); + tokio::spawn(async move { runner.run(shutdown_token).await }) + } +} + +#[derive(Debug, Clone)] +pub struct LocalPoolHandle { + req_sender: mpsc::Sender, +} + +pub struct LocalPoolServerRunner { + req_receiver: mpsc::Receiver, + block_sender: broadcast::Sender, + mempools: HashMap>, + chain_updates: broadcast::Receiver>, +} + +impl LocalPoolHandle { + async fn send(&self, request: ServerRequestKind) -> PoolResult { + let (send, recv) = oneshot::channel(); + self.req_sender + .send(ServerRequest { + request, + response: send, + }) + .await + .map_err(|_| anyhow::anyhow!("LocalPoolServer closed"))?; + recv.await + .map_err(|_| anyhow::anyhow!("LocalPoolServer closed"))? + } +} + +#[async_trait] +impl PoolServer for LocalPoolHandle { + async fn get_supported_entry_points(&self) -> PoolResult> { + let req = ServerRequestKind::GetSupportedEntryPoints; + let resp = self.send(req).await?; + match resp { + ServerResponse::GetSupportedEntryPoints { entry_points } => Ok(entry_points), + _ => Err(PoolServerError::UnexpectedResponse), + } + } + + async fn add_op(&self, entry_point: Address, op: UserOperation) -> PoolResult { + let req = ServerRequestKind::AddOp { + entry_point, + op, + origin: OperationOrigin::Local, + }; + let resp = self.send(req).await?; + match resp { + ServerResponse::AddOp { hash } => Ok(hash), + _ => Err(PoolServerError::UnexpectedResponse), + } + } + + async fn get_ops(&self, entry_point: Address, max_ops: u64) -> PoolResult> { + let req = ServerRequestKind::GetOps { + entry_point, + max_ops, + }; + let resp = self.send(req).await?; + match resp { + ServerResponse::GetOps { ops } => Ok(ops), + _ => Err(PoolServerError::UnexpectedResponse), + } + } + + async fn remove_ops(&self, entry_point: Address, ops: Vec) -> PoolResult<()> { + let req = ServerRequestKind::RemoveOps { entry_point, ops }; + let resp = self.send(req).await?; + match resp { + ServerResponse::RemoveOps => Ok(()), + _ => Err(PoolServerError::UnexpectedResponse), + } + } + + async fn remove_entities(&self, entry_point: Address, entities: Vec) -> PoolResult<()> { + let req = ServerRequestKind::RemoveEntities { + entry_point, + entities, + }; + let resp = self.send(req).await?; + match resp { + ServerResponse::RemoveEntities => Ok(()), + _ => Err(PoolServerError::UnexpectedResponse), + } + } + + async fn debug_clear_state(&self) -> Result<(), PoolServerError> { + let req = ServerRequestKind::DebugClearState; + let resp = self.send(req).await?; + match resp { + ServerResponse::DebugClearState => Ok(()), + _ => Err(PoolServerError::UnexpectedResponse), + } + } + + async fn debug_dump_mempool(&self, entry_point: Address) -> PoolResult> { + let req = ServerRequestKind::DebugDumpMempool { entry_point }; + let resp = self.send(req).await?; + match resp { + ServerResponse::DebugDumpMempool { ops } => Ok(ops), + _ => Err(PoolServerError::UnexpectedResponse), + } + } + + async fn debug_set_reputations( + &self, + entry_point: Address, + reputations: Vec, + ) -> PoolResult<()> { + let req = ServerRequestKind::DebugSetReputations { + entry_point, + reputations, + }; + let resp = self.send(req).await?; + match resp { + ServerResponse::DebugSetReputations => Ok(()), + _ => Err(PoolServerError::UnexpectedResponse), + } + } + + async fn debug_dump_reputation(&self, entry_point: Address) -> PoolResult> { + let req = ServerRequestKind::DebugDumpReputation { entry_point }; + let resp = self.send(req).await?; + match resp { + ServerResponse::DebugDumpReputation { reputations } => Ok(reputations), + _ => Err(PoolServerError::UnexpectedResponse), + } + } + + async fn subscribe_new_heads(&self) -> PoolResult + Send>>> { + let req = ServerRequestKind::SubscribeNewHeads; + let resp = self.send(req).await?; + match resp { + ServerResponse::SubscribeNewHeads { mut new_heads } => Ok(Box::pin(stream! { + loop { + match new_heads.recv().await { + Ok(block) => yield block, + Err(broadcast::error::RecvError::Lagged(c)) => { + error!("new_heads_receiver lagged {c} blocks"); + } + Err(broadcast::error::RecvError::Closed) => { + error!("new_heads_receiver closed"); + break; + } + } + } + })), + _ => Err(PoolServerError::UnexpectedResponse), + } + } +} + +#[async_trait] +impl HealthCheck for LocalPoolHandle { + fn name(&self) -> &'static str { + "LocalPoolServer" + } + + async fn status(&self) -> ServerStatus { + if self.get_supported_entry_points().await.is_ok() { + ServerStatus::Serving + } else { + ServerStatus::NotServing + } + } +} + +impl LocalPoolServerRunner +where + M: Mempool, +{ + fn new( + req_receiver: mpsc::Receiver, + block_sender: broadcast::Sender, + mempools: HashMap>, + chain_updates: broadcast::Receiver>, + ) -> Self { + Self { + req_receiver, + block_sender, + mempools, + chain_updates, + } + } + + fn get_pool(&self, entry_point: Address) -> PoolResult<&Arc> { + self.mempools.get(&entry_point).ok_or_else(|| { + PoolServerError::MempoolError(MempoolError::UnknownEntryPoint(entry_point)) + }) + } + + fn get_ops(&self, entry_point: Address, max_ops: u64) -> PoolResult> { + let mempool = self.get_pool(entry_point)?; + Ok(mempool + .best_operations(max_ops as usize) + .iter() + .map(|op| (**op).clone()) + .collect()) + } + + fn remove_ops(&self, entry_point: Address, ops: &[H256]) -> PoolResult<()> { + let mempool = self.get_pool(entry_point)?; + mempool.remove_operations(ops); + Ok(()) + } + + fn remove_entities<'a>( + &self, + entry_point: Address, + entities: impl IntoIterator, + ) -> PoolResult<()> { + let mempool = self.get_pool(entry_point)?; + for entity in entities { + mempool.remove_entity(*entity); + } + Ok(()) + } + + fn debug_clear_state(&self) -> PoolResult<()> { + for mempool in self.mempools.values() { + mempool.clear(); + } + Ok(()) + } + + fn debug_dump_mempool(&self, entry_point: Address) -> PoolResult> { + let mempool = self.get_pool(entry_point)?; + Ok(mempool + .all_operations(usize::MAX) + .iter() + .map(|op| (**op).clone()) + .collect()) + } + + fn debug_set_reputations<'a>( + &self, + entry_point: Address, + reputations: impl IntoIterator, + ) -> PoolResult<()> { + let mempool = self.get_pool(entry_point)?; + for rep in reputations { + mempool.set_reputation(rep.address, rep.ops_seen, rep.ops_included); + } + Ok(()) + } + + fn debug_dump_reputation(&self, entry_point: Address) -> PoolResult> { + let mempool = self.get_pool(entry_point)?; + Ok(mempool.dump_reputation()) + } + + async fn run(&mut self, shutdown_token: CancellationToken) -> anyhow::Result<()> { + loop { + tokio::select! { + _ = shutdown_token.cancelled() => { + break; + } + chain_update = self.chain_updates.recv() => { + if let Ok(chain_update) = chain_update { + // Update each mempool before notifying listeners of the chain update + // This allows the mempools to update their state before the listeners + // pull information from the mempool. + // For example, a bundle builder listening for a new block to kick off + // its bundle building process will want to be able to query the mempool + // and only receive operations that have not yet been mined. + for mempool in self.mempools.values() { + mempool.on_chain_update(&chain_update); + } + + let _ = self.block_sender.send(NewHead { + block_hash: chain_update.latest_block_hash, + block_number: chain_update.latest_block_number, + }); + } + } + Some(req) = self.req_receiver.recv() => { + let resp = match req.request { + ServerRequestKind::GetSupportedEntryPoints => { + Ok(ServerResponse::GetSupportedEntryPoints { + entry_points: self.mempools.keys().copied().collect() + }) + }, + ServerRequestKind::AddOp { entry_point, op, origin } => { + match self.get_pool(entry_point) { + Ok(mempool) => { + let mempool = Arc::clone(mempool); + tokio::spawn(async move { + let resp = match mempool.add_operation(origin, op).await { + Ok(hash) => Ok(ServerResponse::AddOp { hash }), + Err(e) => Err(e.into()), + }; + if let Err(e) = req.response.send(resp) { + tracing::error!("Failed to send response: {:?}", e); + } + }); + continue; + }, + Err(e) => Err(e), + } + }, + ServerRequestKind::GetOps { entry_point, max_ops } => { + match self.get_ops(entry_point, max_ops) { + Ok(ops) => Ok(ServerResponse::GetOps { ops }), + Err(e) => Err(e), + } + }, + ServerRequestKind::RemoveOps { entry_point, ops } => { + match self.remove_ops(entry_point, &ops) { + Ok(_) => Ok(ServerResponse::RemoveOps), + Err(e) => Err(e), + } + }, + ServerRequestKind::RemoveEntities { entry_point, entities } => { + match self.remove_entities(entry_point, &entities) { + Ok(_) => Ok(ServerResponse::RemoveEntities), + Err(e) => Err(e), + } + }, + ServerRequestKind::DebugClearState => { + match self.debug_clear_state() { + Ok(_) => Ok(ServerResponse::DebugClearState), + Err(e) => Err(e), + } + }, + ServerRequestKind::DebugDumpMempool { entry_point } => { + match self.debug_dump_mempool(entry_point) { + Ok(ops) => Ok(ServerResponse::DebugDumpMempool { ops }), + Err(e) => Err(e), + } + }, + ServerRequestKind::DebugSetReputations { entry_point, reputations } => { + match self.debug_set_reputations(entry_point, &reputations) { + Ok(_) => Ok(ServerResponse::DebugSetReputations), + Err(e) => Err(e), + } + }, + ServerRequestKind::DebugDumpReputation { entry_point } => { + match self.debug_dump_reputation(entry_point) { + Ok(reputations) => Ok(ServerResponse::DebugDumpReputation { reputations }), + Err(e) => Err(e), + } + }, + ServerRequestKind::SubscribeNewHeads => { + Ok(ServerResponse::SubscribeNewHeads { new_heads: self.block_sender.subscribe() } ) + } + }; + if let Err(e) = req.response.send(resp) { + tracing::error!("Failed to send response: {:?}", e); + } + } + } + } + + Ok(()) + } +} + +#[derive(Debug)] +struct ServerRequest { + pub request: ServerRequestKind, + pub response: oneshot::Sender>, +} + +#[derive(Debug)] +enum ServerRequestKind { + GetSupportedEntryPoints, + AddOp { + entry_point: Address, + op: UserOperation, + origin: OperationOrigin, + }, + GetOps { + entry_point: Address, + max_ops: u64, + }, + RemoveOps { + entry_point: Address, + ops: Vec, + }, + RemoveEntities { + entry_point: Address, + entities: Vec, + }, + DebugClearState, + DebugDumpMempool { + entry_point: Address, + }, + DebugSetReputations { + entry_point: Address, + reputations: Vec, + }, + DebugDumpReputation { + entry_point: Address, + }, + SubscribeNewHeads, +} + +#[derive(Debug)] +enum ServerResponse { + GetSupportedEntryPoints { + entry_points: Vec
, + }, + AddOp { + hash: H256, + }, + GetOps { + ops: Vec, + }, + RemoveOps, + RemoveEntities, + DebugClearState, + DebugDumpMempool { + ops: Vec, + }, + DebugSetReputations, + DebugDumpReputation { + reputations: Vec, + }, + SubscribeNewHeads { + new_heads: broadcast::Receiver, + }, +} + +#[cfg(test)] +mod tests { + use std::{iter::zip, sync::Arc}; + + use futures_util::StreamExt; + + use super::*; + use crate::op_pool::{chain::ChainUpdate, mempool::MockMempool}; + + #[tokio::test] + async fn test_add_op() { + let mut mock_pool = MockMempool::new(); + let hash0 = H256::random(); + mock_pool + .expect_add_operation() + .returning(move |_, _| Ok(hash0)); + + let ep = Address::random(); + let state = setup(HashMap::from([(ep, Arc::new(mock_pool))])); + + let hash1 = state + .handle + .add_op(ep, UserOperation::default()) + .await + .unwrap(); + assert_eq!(hash0, hash1); + } + + #[tokio::test] + async fn test_chain_update() { + let mut mock_pool = MockMempool::new(); + mock_pool.expect_on_chain_update().returning(|_| ()); + + let ep = Address::random(); + let state = setup(HashMap::from([(ep, Arc::new(mock_pool))])); + + let mut sub = state.handle.subscribe_new_heads().await.unwrap(); + + let hash = H256::random(); + let number = 1234; + state + .chain_update_tx + .send(Arc::new(ChainUpdate { + latest_block_hash: hash, + latest_block_number: number, + ..Default::default() + })) + .unwrap(); + + let new_block = sub.next().await.unwrap(); + assert_eq!(hash, new_block.block_hash); + assert_eq!(number, new_block.block_number); + } + + #[tokio::test] + async fn test_get_supported_entry_points() { + let mut eps0 = vec![Address::random(), Address::random(), Address::random()]; + + let state = setup( + eps0.iter() + .map(|ep| (*ep, Arc::new(MockMempool::new()))) + .collect(), + ); + + let mut eps1 = state.handle.get_supported_entry_points().await.unwrap(); + + eps0.sort(); + eps1.sort(); + assert_eq!(eps0, eps1); + } + + #[tokio::test] + async fn test_multiple_entry_points() { + let eps = [Address::random(), Address::random(), Address::random()]; + let mut pools = [MockMempool::new(), MockMempool::new(), MockMempool::new()]; + let h0 = H256::random(); + let h1 = H256::random(); + let h2 = H256::random(); + let hashes = [h0, h1, h2]; + pools[0] + .expect_add_operation() + .returning(move |_, _| Ok(h0)); + pools[1] + .expect_add_operation() + .returning(move |_, _| Ok(h1)); + pools[2] + .expect_add_operation() + .returning(move |_, _| Ok(h2)); + + let state = setup( + zip(eps.iter(), pools.into_iter()) + .map(|(ep, pool)| (*ep, Arc::new(pool))) + .collect(), + ); + + for (ep, hash) in zip(eps.iter(), hashes.iter()) { + assert_eq!( + *hash, + state + .handle + .add_op(*ep, UserOperation::default()) + .await + .unwrap() + ); + } + } + + struct State { + handle: LocalPoolHandle, + chain_update_tx: broadcast::Sender>, + _run_handle: JoinHandle>, + } + + fn setup(pools: HashMap>) -> State { + let builder = LocalPoolBuilder::new(10, 10); + let handle = builder.get_handle(); + let (tx, rx) = broadcast::channel(10); + let run_handle = builder.run(pools, rx, CancellationToken::new()); + State { + handle, + chain_update_tx: tx, + _run_handle: run_handle, + } + } +} diff --git a/src/op_pool/server/local/mod.rs b/src/op_pool/server/local/mod.rs deleted file mode 100644 index 24bf375cd..000000000 --- a/src/op_pool/server/local/mod.rs +++ /dev/null @@ -1,5 +0,0 @@ -mod client; -mod server; - -pub use client::*; -pub use server::*; diff --git a/src/op_pool/server/mod.rs b/src/op_pool/server/mod.rs index ac143733e..6fce77450 100644 --- a/src/op_pool/server/mod.rs +++ b/src/op_pool/server/mod.rs @@ -7,25 +7,26 @@ use std::pin::Pin; pub use error::PoolServerError; use ethers::types::{Address, H256}; use futures_util::Stream; -pub use local::{spawn_local_mempool_server, LocalPoolClient, ServerRequest}; +pub use local::{LocalPoolBuilder, LocalPoolHandle}; #[cfg(test)] use mockall::automock; -pub use remote::{connect_remote_pool_client, spawn_remote_mempool_server, RemotePoolClient}; -use tokio::sync::{broadcast, mpsc}; +pub use remote::{spawn_remote_mempool_server, RemotePoolClient}; use tonic::async_trait; use super::{mempool::PoolOperation, Reputation}; -use crate::{ - common::types::{Entity, UserOperation}, - op_pool::LocalPoolServerRequest, -}; +use crate::common::types::{Entity, UserOperation}; -pub type Error = error::PoolServerError; -pub type PoolResult = std::result::Result; +pub type PoolResult = std::result::Result; + +#[derive(Clone, Debug)] +pub struct NewHead { + pub block_hash: H256, + pub block_number: u64, +} #[cfg_attr(test, automock)] #[async_trait] -pub trait PoolClient: Send + Sync + 'static { +pub trait PoolServer: Send + Sync + 'static { async fn get_supported_entry_points(&self) -> PoolResult>; async fn add_op(&self, entry_point: Address, op: UserOperation) -> PoolResult; @@ -48,22 +49,5 @@ pub trait PoolClient: Send + Sync + 'static { async fn debug_dump_reputation(&self, entry_point: Address) -> PoolResult>; - fn subscribe_new_heads(&self) -> PoolResult + Send>>>; -} - -#[derive(Clone, Debug)] -pub struct NewHead { - pub block_hash: H256, - pub block_number: u64, -} - -#[derive(Debug)] -pub enum PoolClientMode { - Local { - req_sender: mpsc::Sender, - new_heads_receiver: broadcast::Receiver, - }, - Remote { - url: String, - }, + async fn subscribe_new_heads(&self) -> PoolResult + Send>>>; } diff --git a/src/op_pool/server/remote/client.rs b/src/op_pool/server/remote/client.rs index 873d95c17..de24fa7de 100644 --- a/src/op_pool/server/remote/client.rs +++ b/src/op_pool/server/remote/client.rs @@ -1,12 +1,17 @@ -use std::pin::Pin; +use std::{pin::Pin, str::FromStr}; -use anyhow::bail; use ethers::types::{Address, H256}; use futures_util::Stream; use tokio::sync::mpsc; -use tokio_stream::wrappers::ReceiverStream; -use tokio_util::sync::CancellationToken; -use tonic::{async_trait, transport::Channel}; +use tokio_stream::wrappers::UnboundedReceiverStream; +use tonic::{ + async_trait, + transport::{Channel, Uri}, +}; +use tonic_health::{ + pb::{health_client::HealthClient, HealthCheckRequest}, + ServingStatus, +}; use super::protos::{ self, add_op_response, debug_clear_state_response, debug_dump_mempool_response, @@ -20,12 +25,12 @@ use crate::{ common::{ protos::{from_bytes, ConversionError}, retry::{self, UnlimitedRetryOpts}, - server::connect_with_retries, + server::{HealthCheck, ServerStatus}, types::{Entity, UserOperation}, }, op_pool::{ mempool::{PoolOperation, Reputation}, - server::{error::PoolServerError, NewHead, PoolClient}, + server::{error::PoolServerError, NewHead, PoolServer}, PoolResult, }, }; @@ -33,20 +38,25 @@ use crate::{ #[derive(Debug, Clone)] pub struct RemotePoolClient { op_pool_client: OpPoolClient, + op_pool_health: HealthClient, } impl RemotePoolClient { - pub fn new(client: OpPoolClient) -> Self { - Self { - op_pool_client: client, - } + pub async fn connect(url: String) -> anyhow::Result { + let op_pool_client = OpPoolClient::connect(url.clone()).await?; + let op_pool_health = + HealthClient::new(Channel::builder(Uri::from_str(&url)?).connect().await?); + Ok(Self { + op_pool_client, + op_pool_health, + }) } // Handler for the new block subscription. This will attempt to resubscribe if the gRPC // connection disconnects using expenential backoff. async fn new_heads_subscription_handler( client: OpPoolClient, - tx: mpsc::Sender, + tx: mpsc::UnboundedSender, ) { let mut stream = None; @@ -69,7 +79,7 @@ impl RemotePoolClient { match stream.as_mut().unwrap().message().await { Ok(Some(SubscribeNewHeadsResponse { new_head: Some(b) })) => match b.try_into() { Ok(new_head) => { - if tx.send(new_head).await.is_err() { + if tx.send(new_head).is_err() { // recv handle dropped return; } @@ -95,7 +105,7 @@ impl RemotePoolClient { } #[async_trait] -impl PoolClient for RemotePoolClient { +impl PoolServer for RemotePoolClient { async fn get_supported_entry_points(&self) -> PoolResult> { Ok(self .op_pool_client @@ -294,26 +304,29 @@ impl PoolClient for RemotePoolClient { } } - fn subscribe_new_heads(&self) -> PoolResult + Send>>> { - let (tx, rx) = mpsc::channel(1024); + async fn subscribe_new_heads(&self) -> PoolResult + Send>>> { + let (tx, rx) = mpsc::unbounded_channel(); let client = self.op_pool_client.clone(); tokio::spawn(Self::new_heads_subscription_handler(client, tx)); - Ok(Box::pin(ReceiverStream::new(rx))) + Ok(Box::pin(UnboundedReceiverStream::new(rx))) } } -pub async fn connect_remote_pool_client( - op_pool_url: &str, - shutdown_token: CancellationToken, -) -> anyhow::Result { - tokio::select! { - _ = shutdown_token.cancelled() => { - tracing::error!("bailing from connecting client, server shutting down"); - bail!("Server shutting down") - } - res = connect_with_retries("op pool from builder", op_pool_url, OpPoolClient::connect) => { - Ok(RemotePoolClient::new(res?)) - } +#[async_trait] +impl HealthCheck for RemotePoolClient { + fn name(&self) -> &'static str { + "RemotePoolServer" + } + + async fn status(&self) -> ServerStatus { + self.op_pool_health + .clone() + .check(HealthCheckRequest::default()) + .await + .ok() + .filter(|status| status.get_ref().status == ServingStatus::Serving as i32) + .map(|_| ServerStatus::Serving) + .unwrap_or(ServerStatus::NotServing) } } diff --git a/src/op_pool/server/remote/protos.rs b/src/op_pool/server/remote/protos.rs index 7a3335a4c..0c62aab29 100644 --- a/src/op_pool/server/remote/protos.rs +++ b/src/op_pool/server/remote/protos.rs @@ -219,3 +219,12 @@ impl TryFrom for PoolNewHead { }) } } + +impl From for NewHead { + fn from(head: PoolNewHead) -> Self { + Self { + block_hash: head.block_hash.as_bytes().to_vec(), + block_number: head.block_number, + } + } +} diff --git a/src/op_pool/server/remote/server.rs b/src/op_pool/server/remote/server.rs index 5e3ecacb6..0ca16ad2c 100644 --- a/src/op_pool/server/remote/server.rs +++ b/src/op_pool/server/remote/server.rs @@ -7,8 +7,9 @@ use std::{ }; use ethers::types::{Address, H256}; +use futures_util::StreamExt; use tokio::{sync::mpsc, task::JoinHandle}; -use tokio_stream::wrappers::ReceiverStream; +use tokio_stream::wrappers::UnboundedReceiverStream; use tokio_util::sync::CancellationToken; use tonic::{async_trait, transport::Server, Request, Response, Result, Status}; @@ -22,27 +23,26 @@ use super::protos::{ DebugDumpReputationRequest, DebugDumpReputationResponse, DebugDumpReputationSuccess, DebugSetReputationRequest, DebugSetReputationResponse, DebugSetReputationSuccess, GetOpsRequest, GetOpsResponse, GetOpsSuccess, GetSupportedEntryPointsRequest, - GetSupportedEntryPointsResponse, MempoolOp, NewHead, RemoveEntitiesRequest, - RemoveEntitiesResponse, RemoveEntitiesSuccess, RemoveOpsRequest, RemoveOpsResponse, - RemoveOpsSuccess, SubscribeNewHeadsRequest, SubscribeNewHeadsResponse, - OP_POOL_FILE_DESCRIPTOR_SET, + GetSupportedEntryPointsResponse, MempoolOp, RemoveEntitiesRequest, RemoveEntitiesResponse, + RemoveEntitiesSuccess, RemoveOpsRequest, RemoveOpsResponse, RemoveOpsSuccess, + SubscribeNewHeadsRequest, SubscribeNewHeadsResponse, OP_POOL_FILE_DESCRIPTOR_SET, }; use crate::{ common::{grpc::metrics::GrpcMetricsLayer, protos::from_bytes, types::Entity}, - op_pool::mempool::{Mempool, MempoolGroup, OperationOrigin, Reputation}, + op_pool::{mempool::Reputation, server::local::LocalPoolHandle, PoolServer}, }; const MAX_REMOTE_BLOCK_SUBSCRIPTIONS: usize = 32; -pub async fn spawn_remote_mempool_server( +pub async fn spawn_remote_mempool_server( chain_id: u64, - mempool_group: Arc>, + local_pool: LocalPoolHandle, addr: SocketAddr, shutdown_token: CancellationToken, ) -> anyhow::Result>> { // gRPC server - let pool_impl = Arc::new(OpPoolImpl::new(chain_id, mempool_group)); - let op_pool_server = OpPoolServer::new(Arc::clone(&pool_impl)); + let pool_impl = OpPoolImpl::new(chain_id, local_pool); + let op_pool_server = OpPoolServer::new(pool_impl); let reflection_service = tonic_reflection::server::Builder::configure() .register_encoded_file_descriptor_set(OP_POOL_FILE_DESCRIPTOR_SET) .build()?; @@ -50,7 +50,7 @@ pub async fn spawn_remote_mempool_server( // health service let (mut health_reporter, health_service) = tonic_health::server::health_reporter(); health_reporter - .set_serving::>>>() + .set_serving::>() .await; let metrics_layer = GrpcMetricsLayer::new("op_pool".to_string()); @@ -68,20 +68,17 @@ pub async fn spawn_remote_mempool_server( Ok(handle) } -struct OpPoolImpl { +struct OpPoolImpl { chain_id: u64, - mempools: Arc>, + local_pool: LocalPoolHandle, num_block_subscriptions: Arc, } -impl OpPoolImpl -where - M: Mempool, -{ - pub fn new(chain_id: u64, mempools: Arc>) -> Self { +impl OpPoolImpl { + pub fn new(chain_id: u64, local_pool: LocalPoolHandle) -> Self { Self { chain_id, - mempools, + local_pool, num_block_subscriptions: Arc::new(AtomicUsize::new(0)), } } @@ -93,23 +90,25 @@ where } #[async_trait] -impl OpPool for Arc> -where - M: Mempool + 'static, -{ +impl OpPool for OpPoolImpl { async fn get_supported_entry_points( &self, _request: Request, ) -> Result> { - Ok(Response::new(GetSupportedEntryPointsResponse { - chain_id: self.chain_id, - entry_points: self - .mempools - .get_supported_entry_points() - .into_iter() - .map(|ep| ep.as_bytes().to_vec()) - .collect(), - })) + let resp = match self.local_pool.get_supported_entry_points().await { + Ok(entry_points) => GetSupportedEntryPointsResponse { + chain_id: self.chain_id, + entry_points: entry_points + .into_iter() + .map(|ep| ep.as_bytes().to_vec()) + .collect(), + }, + Err(e) => { + return Err(Status::internal(format!("Failed to get entry points: {e}"))); + } + }; + + Ok(Response::new(resp)) } async fn add_op(&self, request: Request) -> Result> { @@ -123,7 +122,7 @@ where Status::invalid_argument(format!("Failed to convert to UserOperation: {e}")) })?; - let resp = match self.mempools.add_op(ep, uo, OperationOrigin::Local).await { + let resp = match self.local_pool.add_op(ep, uo).await { Ok(hash) => AddOpResponse { result: Some(add_op_response::Result::Success(AddOpSuccess { hash: hash.as_bytes().to_vec(), @@ -141,7 +140,7 @@ where let req = request.into_inner(); let ep = self.get_entry_point(&req.entry_point)?; - let resp = match self.mempools.get_ops(ep, req.max_ops) { + let resp = match self.local_pool.get_ops(ep, req.max_ops).await { Ok(ops) => GetOpsResponse { result: Some(get_ops_response::Result::Success(GetOpsSuccess { ops: ops.iter().map(MempoolOp::from).collect(), @@ -173,7 +172,7 @@ where }) .collect::, _>>()?; - let resp = match self.mempools.remove_ops(ep, &hashes) { + let resp = match self.local_pool.remove_ops(ep, hashes).await { Ok(_) => RemoveOpsResponse { result: Some(remove_ops_response::Result::Success(RemoveOpsSuccess {})), }, @@ -198,11 +197,7 @@ where .collect::, _>>() .map_err(|e| Status::internal(format!("Failed to convert to proto entity: {e}")))?; - self.mempools - .remove_entities(ep, &entities) - .map_err(|e| Status::internal(e.to_string()))?; - - let resp = match self.mempools.remove_entities(ep, &entities) { + let resp = match self.local_pool.remove_entities(ep, entities).await { Ok(_) => RemoveEntitiesResponse { result: Some(remove_entities_response::Result::Success( RemoveEntitiesSuccess {}, @@ -220,7 +215,7 @@ where &self, _request: Request, ) -> Result> { - let resp = match self.mempools.debug_clear_state() { + let resp = match self.local_pool.debug_clear_state().await { Ok(_) => DebugClearStateResponse { result: Some(debug_clear_state_response::Result::Success( DebugClearStateSuccess {}, @@ -241,7 +236,7 @@ where let req = request.into_inner(); let ep = self.get_entry_point(&req.entry_point)?; - let resp = match self.mempools.debug_dump_mempool(ep) { + let resp = match self.local_pool.debug_dump_mempool(ep).await { Ok(ops) => DebugDumpMempoolResponse { result: Some(debug_dump_mempool_response::Result::Success( DebugDumpMempoolSuccess { @@ -280,7 +275,7 @@ where Status::internal(format!("Failed to convert from proto reputation {e}")) })?; - let resp = match self.mempools.debug_set_reputations(ep, &reps) { + let resp = match self.local_pool.debug_set_reputations(ep, reps).await { Ok(_) => DebugSetReputationResponse { result: Some(debug_set_reputation_response::Result::Success( DebugSetReputationSuccess {}, @@ -301,7 +296,7 @@ where let req = request.into_inner(); let ep = self.get_entry_point(&req.entry_point)?; - let resp = match self.mempools.debug_dump_reputation(ep) { + let resp = match self.local_pool.debug_dump_reputation(ep).await { Ok(reps) => DebugDumpReputationResponse { result: Some(debug_dump_reputation_response::Result::Success( DebugDumpReputationSuccess { @@ -319,13 +314,13 @@ where Ok(Response::new(resp)) } - type SubscribeNewHeadsStream = ReceiverStream>; + type SubscribeNewHeadsStream = UnboundedReceiverStream>; async fn subscribe_new_heads( &self, _request: Request, ) -> Result> { - let (tx, rx) = mpsc::channel(1024); + let (tx, rx) = mpsc::unbounded_channel(); if self.num_block_subscriptions.fetch_add(1, Ordering::Relaxed) >= MAX_REMOTE_BLOCK_SUBSCRIPTIONS @@ -335,26 +330,31 @@ where } let num_block_subscriptions = Arc::clone(&self.num_block_subscriptions); - let mut chain_updates = self.mempools.clone().subscribe_chain_update(); + let mut new_heads = match self.local_pool.subscribe_new_heads().await { + Ok(new_heads) => new_heads, + Err(error) => { + tracing::error!("Failed to subscribe to new blocks: {error}"); + return Err(Status::internal(format!( + "Failed to subscribe to new blocks: {error}" + ))); + } + }; + tokio::spawn(async move { loop { - match chain_updates.recv().await { - Ok(chain_update) => { + match new_heads.next().await { + Some(new_head) => { if tx .send(Ok(SubscribeNewHeadsResponse { - new_head: Some(NewHead { - block_hash: chain_update.latest_block_hash.as_bytes().to_vec(), - block_number: chain_update.latest_block_number, - }), + new_head: Some(new_head.into()), })) - .await .is_err() { break; } } - Err(_) => { - tracing::warn!("chain update channel closed"); + None => { + tracing::warn!("new block subscription closed"); break; } } @@ -362,6 +362,6 @@ where num_block_subscriptions.fetch_sub(1, Ordering::Relaxed); }); - Ok(Response::new(ReceiverStream::new(rx))) + Ok(Response::new(UnboundedReceiverStream::new(rx))) } } diff --git a/src/op_pool/task.rs b/src/op_pool/task.rs index 76a888b0e..a563f2e0b 100644 --- a/src/op_pool/task.rs +++ b/src/op_pool/task.rs @@ -1,21 +1,15 @@ -use std::{net::SocketAddr, sync::Arc, time::Duration}; +use std::{collections::HashMap, net::SocketAddr, sync::Arc, time::Duration}; use anyhow::{bail, Context}; use ethers::providers::{ Http, HttpRateLimitRetryPolicy, JsonRpcClient, Provider, RetryClientBuilder, }; -use tokio::{ - sync::{broadcast, mpsc}, - try_join, -}; +use tokio::{sync::broadcast, try_join}; use tokio_util::sync::CancellationToken; use tonic::async_trait; use url::Url; -use super::{ - mempool::{HourlyMovingAverageReputation, PoolConfig, ReputationParams}, - server::{NewHead, ServerRequest}, -}; +use super::mempool::{HourlyMovingAverageReputation, PoolConfig, ReputationParams}; use crate::{ common::{ contracts::i_entry_point::IEntryPoint, @@ -28,20 +22,15 @@ use crate::{ op_pool::{ chain::{self, Chain}, emit::OpPoolEvent, - mempool::{uo_pool::UoPool, MempoolGroup}, - server::{spawn_local_mempool_server, spawn_remote_mempool_server}, + mempool::uo_pool::UoPool, + server::{spawn_remote_mempool_server, LocalPoolBuilder}, }, }; #[derive(Debug)] pub enum PoolServerMode { - Local { - req_receiver: Option>, - new_heads_sender: Option>, - }, - Remote { - addr: SocketAddr, - }, + Local, + Remote { addr: SocketAddr }, } #[derive(Debug)] @@ -58,11 +47,12 @@ pub struct Args { pub struct PoolTask { args: Args, event_sender: broadcast::Sender>, + pool_builder: LocalPoolBuilder, } #[async_trait] impl Task for PoolTask { - async fn run(&mut self, shutdown_token: CancellationToken) -> anyhow::Result<()> { + async fn run(mut self: Box, shutdown_token: CancellationToken) -> anyhow::Result<()> { let chain_id = self.args.chain_id; tracing::info!("Chain id: {chain_id}"); tracing::info!("Http url: {:?}", self.args.http_url); @@ -96,49 +86,27 @@ impl Task for PoolTask { let provider = Arc::new(Provider::new(client)); // create mempools - let mut mempools = vec![]; + let mut mempools = HashMap::new(); for pool_config in &self.args.pool_configs { let pool = PoolTask::create_mempool(pool_config, self.event_sender.clone(), provider.clone()) .await .context("should have created mempool")?; - mempools.push(pool); + mempools.insert(pool_config.entry_point, Arc::new(pool)); } - let mempool_group = Arc::new(MempoolGroup::new(mempools)); - let mempool_group_runner = Arc::clone(&mempool_group); - let mempool_shutdown = shutdown_token.clone(); - // handle to wait for mempool group to terminate - let mempool_handle = tokio::spawn(async move { - mempool_group_runner - .run(update_sender.subscribe(), mempool_shutdown) - .await; - Ok(()) - }); - - let server_handle = match &mut self.args.server_mode { - PoolServerMode::Local { - ref mut req_receiver, - ref mut new_heads_sender, - } => { - let req_receiver = req_receiver - .take() - .context("should have local server message receiver")?; - let new_heads_sender = new_heads_sender - .take() - .context("should have block sender")?; - spawn_local_mempool_server( - Arc::clone(&mempool_group), - req_receiver, - new_heads_sender, - shutdown_token.clone(), - )? - } + let pool_handle = self.pool_builder.get_handle(); + let pool_runnder_handle = + self.pool_builder + .run(mempools, update_sender.subscribe(), shutdown_token.clone()); + + let remote_handle = match &mut self.args.server_mode { + PoolServerMode::Local => tokio::spawn(async { Ok(()) }), PoolServerMode::Remote { addr } => { spawn_remote_mempool_server( self.args.chain_id, - Arc::clone(&mempool_group), + pool_handle, *addr, shutdown_token.clone(), ) @@ -149,8 +117,8 @@ impl Task for PoolTask { tracing::info!("Started op_pool"); match try_join!( - handle::flatten_handle(mempool_handle), - handle::flatten_handle(server_handle), + handle::flatten_handle(pool_runnder_handle), + handle::flatten_handle(remote_handle), handle::as_anyhow_handle(chain_handle), ) { Ok(_) => { @@ -169,8 +137,13 @@ impl PoolTask { pub fn new( args: Args, event_sender: broadcast::Sender>, + pool_builder: LocalPoolBuilder, ) -> PoolTask { - Self { args, event_sender } + Self { + args, + event_sender, + pool_builder, + } } pub fn boxed(self) -> Box { diff --git a/src/rpc/debug.rs b/src/rpc/debug.rs index 615cf2f96..202a9ce17 100644 --- a/src/rpc/debug.rs +++ b/src/rpc/debug.rs @@ -14,7 +14,7 @@ use crate::{ }, types::BundlingMode, }, - op_pool::PoolClient, + op_pool::PoolServer, }; /// Debug API @@ -44,14 +44,14 @@ pub trait DebugApi { } pub struct DebugApi

{ - op_pool_client: P, + pool: P, builder_client: builder_client::BuilderClient, } impl

DebugApi

{ - pub fn new(op_pool_client: P, builder_client: builder_client::BuilderClient) -> Self { + pub fn new(pool: P, builder_client: builder_client::BuilderClient) -> Self { Self { - op_pool_client, + pool, builder_client, } } @@ -60,11 +60,11 @@ impl

DebugApi

{ #[async_trait] impl

DebugApiServer for DebugApi

where - P: PoolClient, + P: PoolServer, { async fn bundler_clear_state(&self) -> RpcResult { let _ = self - .op_pool_client + .pool .debug_clear_state() .await .map_err(|e| RpcError::Custom(e.to_string()))?; @@ -74,7 +74,7 @@ where async fn bundler_dump_mempool(&self, entry_point: Address) -> RpcResult> { Ok(self - .op_pool_client + .pool .debug_dump_mempool(entry_point) .await .map_err(|e| RpcError::Custom(e.to_string()))? @@ -112,7 +112,7 @@ where entry_point: Address, ) -> RpcResult { let _ = self - .op_pool_client + .pool .debug_set_reputations( entry_point, reputations.into_iter().map(Into::into).collect(), @@ -124,7 +124,7 @@ where async fn bundler_dump_reputation(&self, entry_point: Address) -> RpcResult> { let result = self - .op_pool_client + .pool .debug_dump_reputation(entry_point) .await .map_err(|e| RpcError::Custom(e.to_string()))?; diff --git a/src/rpc/eth/mod.rs b/src/rpc/eth/mod.rs index cacaeeadc..adf36cbd7 100644 --- a/src/rpc/eth/mod.rs +++ b/src/rpc/eth/mod.rs @@ -32,7 +32,7 @@ use crate::{ eth::log_to_raw_log, types::{UserOperation, BASE_CHAIN_IDS}, }, - op_pool::PoolClient, + op_pool::PoolServer, rpc::{ estimation::{GasEstimationError, GasEstimator, GasEstimatorImpl}, GasEstimate, RichUserOperation, RpcUserOperation, UserOperationOptionalGas, @@ -97,24 +97,24 @@ where } #[derive(Debug)] -pub struct EthApi { +pub struct EthApi { contexts_by_entry_point: HashMap>, provider: Arc>, chain_id: u64, - op_pool_client: P, + pool: P, } impl EthApi where C: JsonRpcClient + 'static, - P: PoolClient, + P: PoolServer, { #[allow(clippy::too_many_arguments)] pub fn new( provider: Arc>, entry_points: Vec

, chain_id: u64, - op_pool_client: P, + pool: P, estimation_settings: estimation::Settings, ) -> Self { let contexts_by_entry_point = entry_points @@ -131,7 +131,7 @@ where contexts_by_entry_point, provider, chain_id, - op_pool_client, + pool, } } @@ -311,7 +311,7 @@ where impl EthApiServer for EthApi where C: JsonRpcClient + 'static, - P: PoolClient, + P: PoolServer, { async fn send_user_operation( &self, @@ -324,7 +324,7 @@ where ))?; } Ok(self - .op_pool_client + .pool .add_op(entry_point, op.into()) .await .map_err(EthRpcError::from) @@ -506,7 +506,7 @@ mod tests { }; use super::*; - use crate::op_pool::MockPoolClient; + use crate::op_pool::MockPoolServer; const UO_OP_TOPIC: &str = "user-op-event-topic"; @@ -520,7 +520,7 @@ mod tests { given_log(UO_OP_TOPIC, "another-hash"), ]); - let result = EthApi::::filter_receipt_logs_matching_user_op( + let result = EthApi::::filter_receipt_logs_matching_user_op( &reference_log, &receipt, ); @@ -542,7 +542,7 @@ mod tests { given_log(UO_OP_TOPIC, "another-hash"), ]); - let result = EthApi::::filter_receipt_logs_matching_user_op( + let result = EthApi::::filter_receipt_logs_matching_user_op( &reference_log, &receipt, ); @@ -564,7 +564,7 @@ mod tests { reference_log.clone(), ]); - let result = EthApi::::filter_receipt_logs_matching_user_op( + let result = EthApi::::filter_receipt_logs_matching_user_op( &reference_log, &receipt, ); @@ -590,7 +590,7 @@ mod tests { reference_log.clone(), ]); - let result = EthApi::::filter_receipt_logs_matching_user_op( + let result = EthApi::::filter_receipt_logs_matching_user_op( &reference_log, &receipt, ); @@ -615,7 +615,7 @@ mod tests { given_log(UO_OP_TOPIC, "other-hash"), ]); - let result = EthApi::::filter_receipt_logs_matching_user_op( + let result = EthApi::::filter_receipt_logs_matching_user_op( &reference_log, &receipt, ); @@ -636,7 +636,7 @@ mod tests { given_log("another-topic-2", "some-hash"), ]); - let result = EthApi::::filter_receipt_logs_matching_user_op( + let result = EthApi::::filter_receipt_logs_matching_user_op( &reference_log, &receipt, ); diff --git a/src/rpc/health.rs b/src/rpc/health.rs index 86be3a946..100af6842 100644 --- a/src/rpc/health.rs +++ b/src/rpc/health.rs @@ -1,9 +1,7 @@ -use anyhow::Context; -use jsonrpsee::{core::RpcResult, proc_macros::rpc}; -use tonic::{async_trait, transport::Channel}; -use tonic_health::pb::{health_client::HealthClient, HealthCheckRequest}; +use jsonrpsee::{core::RpcResult, proc_macros::rpc, types::error::CallError}; +use tonic::async_trait; -use crate::op_pool::{LocalPoolClient, PoolClient}; +use crate::common::server::{HealthCheck, ServerStatus}; #[rpc(server, namespace = "system")] pub trait SystemApi { @@ -11,60 +9,32 @@ pub trait SystemApi { async fn get_health(&self) -> RpcResult; } -pub struct RemoteHealthCheck { - op_pool_health_client: HealthClient, - builder_health_client: HealthClient, +pub struct HealthChecker { + servers: Vec>, } -impl RemoteHealthCheck { - pub fn new( - op_pool_health_client: HealthClient, - builder_health_client: HealthClient, - ) -> Self { - Self { - op_pool_health_client, - builder_health_client, - } - } -} - -#[async_trait] -impl SystemApiServer for RemoteHealthCheck { - async fn get_health(&self) -> RpcResult { - self.op_pool_health_client - .clone() - .check(HealthCheckRequest::default()) - .await - .context("Op pool server should be live")?; - - self.builder_health_client - .clone() - .check(HealthCheckRequest::default()) - .await - .context("Builder server should be live")?; - - Ok("ok".to_string()) - } -} - -pub struct LocalHealthCheck { - pool_client: LocalPoolClient, -} - -impl LocalHealthCheck { - pub fn new(pool_client: LocalPoolClient) -> Self { - Self { pool_client } +impl HealthChecker { + pub fn new(servers: Vec>) -> Self { + Self { servers } } } #[async_trait] -impl SystemApiServer for LocalHealthCheck { +impl SystemApiServer for HealthChecker { async fn get_health(&self) -> RpcResult { - self.pool_client - .get_supported_entry_points() - .await - .context("Op pool server should be live")?; - - Ok("ok".to_string()) + let mut errors = Vec::new(); + for server in &self.servers { + match server.status().await { + ServerStatus::Serving => {} + ServerStatus::NotServing => errors.push(server.name()), + } + } + if errors.is_empty() { + Ok("ok".to_owned()) + } else { + Err(jsonrpsee::core::Error::Call(CallError::Failed( + anyhow::anyhow!("Health check failed: {:?}", errors), + ))) + } } } diff --git a/src/rpc/task.rs b/src/rpc/task.rs index 917ede1f8..cec021d20 100644 --- a/src/rpc/task.rs +++ b/src/rpc/task.rs @@ -1,4 +1,4 @@ -use std::{net::SocketAddr, str::FromStr, sync::Arc, time::Duration}; +use std::{net::SocketAddr, sync::Arc, time::Duration}; use anyhow::{bail, Context}; use ethers::{ @@ -11,11 +11,7 @@ use jsonrpsee::{ }; use tokio::select; use tokio_util::sync::CancellationToken; -use tonic::{ - async_trait, - transport::{Channel, Uri}, -}; -use tonic_health::pb::health_client::HealthClient; +use tonic::{async_trait, transport::Channel}; use tracing::info; use url::Url; @@ -25,13 +21,13 @@ use crate::{ handle::Task, precheck, protos::builder::builder_client::BuilderClient, - server::{self, format_socket_addr}, + server::{self, format_socket_addr, HealthCheck}, }, - op_pool::{connect_remote_pool_client, LocalPoolClient, PoolClient, PoolClientMode}, + op_pool::PoolServer, rpc::{ debug::{DebugApi, DebugApiServer}, eth::{estimation, EthApi, EthApiServer}, - health::{LocalHealthCheck, RemoteHealthCheck, SystemApiServer}, + health::{HealthChecker, SystemApiServer}, metrics::RpcMetricsLogger, rundler::{RundlerApi, RundlerApiServer}, }, @@ -50,17 +46,20 @@ pub struct Args { pub estimation_settings: estimation::Settings, pub rpc_timeout: Duration, pub max_connections: u32, - pub pool_client_mode: PoolClientMode, } #[derive(Debug)] -pub struct RpcTask { +pub struct RpcTask

{ args: Args, + pool: P, } #[async_trait] -impl Task for RpcTask { - async fn run(&mut self, shutdown_token: CancellationToken) -> anyhow::Result<()> { +impl

Task for RpcTask

+where + P: PoolServer + HealthCheck + Clone, +{ + async fn run(mut self: Box, shutdown_token: CancellationToken) -> anyhow::Result<()> { let addr: SocketAddr = format_socket_addr(&self.args.host, self.args.port).parse()?; tracing::info!("Starting rpc server on {}", addr); @@ -89,44 +88,12 @@ impl Task for RpcTask { info!("Connected to builder service at {}", self.args.builder_url); let mut module = RpcModule::new(()); - match &self.args.pool_client_mode { - PoolClientMode::Local { - req_sender, - new_heads_receiver, - } => { - let pool_client = - LocalPoolClient::new(req_sender.clone(), new_heads_receiver.resubscribe()); - self.attach_namespaces(provider, pool_client.clone(), builder_client, &mut module)?; - - module.merge(LocalHealthCheck::new(pool_client).into_rpc())?; - } - PoolClientMode::Remote { url } => { - let pool_client = connect_remote_pool_client(url, shutdown_token.clone()).await?; - info!("Connected to op_pool service at {}", url); - self.attach_namespaces(provider, pool_client, builder_client, &mut module)?; - - let builder_uri = Uri::from_str(&self.args.builder_url) - .context("should be a valid URI for op_pool")?; - let op_pool_uri = - Uri::from_str(url).context("should be a valid URI for op_pool")?; - - let op_pool_health_client = HealthClient::new( - Channel::builder(op_pool_uri) - .connect() - .await - .context("should have connected to op_pool health service channel")?, - ); - let builder_health_client = HealthClient::new( - Channel::builder(builder_uri) - .connect() - .await - .context("should have connected to builder health service channel")?, - ); - module.merge( - RemoteHealthCheck::new(op_pool_health_client, builder_health_client).into_rpc(), - )?; - } - } + self.attach_namespaces(provider, builder_client, &mut module)?; + + // TODO(danc): builder health check + let servers: Vec> = vec![Box::new(self.pool.clone())]; + let health_checker = HealthChecker::new(servers); + module.merge(health_checker.into_rpc())?; // Set up health check endpoint via GET /health registers the jsonrpc handler let service_builder = tower::ServiceBuilder::new() @@ -158,9 +125,12 @@ impl Task for RpcTask { } } -impl RpcTask { - pub fn new(args: Args) -> RpcTask { - Self { args } +impl

RpcTask

+where + P: PoolServer + HealthCheck + Clone, +{ + pub fn new(args: Args, pool: P) -> Self { + Self { args, pool } } pub fn boxed(self) -> Box { @@ -176,16 +146,17 @@ impl RpcTask { tracing::error!("bailing from conneting client, server shutting down"); bail!("Server shutting down") } - res = server::connect_with_retries("builder from common", url, BuilderClient::connect) => { + res = server::connect_with_retries("builder from common", url, |url| Box::pin(async move { + BuilderClient::connect(url).await.context("should connect to builder") + })) => { res.context("should connect to builder") } } } - fn attach_namespaces( + fn attach_namespaces( &self, provider: Arc>>, - pool_client: C, builder_client: BuilderClient, module: &mut RpcModule<()>, ) -> anyhow::Result<()> { @@ -196,13 +167,13 @@ impl RpcTask { provider.clone(), self.args.entry_points.clone(), self.args.chain_id, - pool_client.clone(), + self.pool.clone(), self.args.estimation_settings, ) .into_rpc(), )?, ApiNamespace::Debug => module - .merge(DebugApi::new(pool_client.clone(), builder_client.clone()).into_rpc())?, + .merge(DebugApi::new(self.pool.clone(), builder_client.clone()).into_rpc())?, ApiNamespace::Rundler => module.merge( RundlerApi::new( provider.clone(),