From 69a064a8067110f3998545cb566815ab7bf93427 Mon Sep 17 00:00:00 2001 From: dancoombs Date: Fri, 21 Jul 2023 08:20:58 -0400 Subject: [PATCH] feat: pool server abstraction and local pool server --- proto/op_pool/op_pool.proto | 63 ++++- src/builder/bundle_proposer.rs | 106 +++---- src/builder/bundle_sender.rs | 128 ++++----- src/builder/task.rs | 129 ++++----- src/cli/builder.rs | 12 +- src/cli/node/mod.rs | 18 +- src/cli/pool.rs | 20 +- src/cli/rpc.rs | 7 +- src/common/grpc/mocks.rs | 166 +---------- src/common/handle.rs | 4 +- src/common/protos.rs | 107 +------- src/common/simulation.rs | 2 +- src/op_pool/mempool/error.rs | 2 + src/op_pool/mempool/mod.rs | 108 +++++++- src/op_pool/{ => mempool}/reputation.rs | 0 src/op_pool/mempool/uo_pool.rs | 9 +- src/op_pool/mod.rs | 11 +- src/op_pool/server.rs | 334 ----------------------- src/op_pool/server/error.rs | 20 ++ src/op_pool/server/local/client.rs | 134 +++++++++ src/op_pool/server/local/mod.rs | 5 + src/op_pool/server/local/server.rs | 240 ++++++++++++++++ src/op_pool/server/mod.rs | 57 ++++ src/op_pool/server/remote/client.rs | 253 +++++++++++++++++ src/op_pool/{ => server/remote}/error.rs | 92 +++++-- src/op_pool/server/remote/mod.rs | 7 + src/op_pool/server/remote/protos.rs | 210 ++++++++++++++ src/op_pool/server/remote/server.rs | 311 +++++++++++++++++++++ src/op_pool/task.rs | 92 +++---- src/op_pool/types.rs | 176 ------------ src/rpc/debug.rs | 69 ++--- src/rpc/eth/error.rs | 17 +- src/rpc/eth/mod.rs | 80 +++--- src/rpc/health.rs | 30 +- src/rpc/mod.rs | 33 +-- src/rpc/task.rs | 161 ++++++----- 36 files changed, 1948 insertions(+), 1265 deletions(-) rename src/op_pool/{ => mempool}/reputation.rs (100%) delete mode 100644 src/op_pool/server.rs create mode 100644 src/op_pool/server/error.rs create mode 100644 src/op_pool/server/local/client.rs create mode 100644 src/op_pool/server/local/mod.rs create mode 100644 src/op_pool/server/local/server.rs create mode 100644 src/op_pool/server/mod.rs create mode 100644 src/op_pool/server/remote/client.rs rename src/op_pool/{ => server/remote}/error.rs (89%) create mode 100644 src/op_pool/server/remote/mod.rs create mode 100644 src/op_pool/server/remote/protos.rs create mode 100644 src/op_pool/server/remote/server.rs delete mode 100644 src/op_pool/types.rs diff --git a/proto/op_pool/op_pool.proto b/proto/op_pool/op_pool.proto index e398e1011..94afdaf64 100644 --- a/proto/op_pool/op_pool.proto +++ b/proto/op_pool/op_pool.proto @@ -136,16 +136,13 @@ message AddOpRequest { message AddOpResponse { oneof result { AddOpSuccess success = 1; - AddOpFailure failure = 2; + MempoolError failure = 2; } } message AddOpSuccess { // The serialized UserOperation hash bytes hash = 1; } -message AddOpFailure { - MempoolError error = 1; -} message GetOpsRequest { // The serialized entry point address @@ -154,6 +151,12 @@ message GetOpsRequest { uint64 max_ops = 2; } message GetOpsResponse { + oneof result { + GetOpsSuccess success = 1; + MempoolError failure = 2; + } +} +message GetOpsSuccess { repeated MempoolOp ops = 1; } @@ -163,7 +166,13 @@ message RemoveOpsRequest { // The serialized UserOperation hashes to remove repeated bytes hashes = 2; } -message RemoveOpsResponse {} +message RemoveOpsResponse { + oneof result { + RemoveOpsSuccess success = 1; + MempoolError failure = 2; + } +} +message RemoveOpsSuccess {} message RemoveEntitiesRequest { // The serilaized entry point address @@ -172,15 +181,33 @@ message RemoveEntitiesRequest { // should be removed from the mempool repeated Entity entities = 2; } -message RemoveEntitiesResponse {} +message RemoveEntitiesResponse { + oneof result { + RemoveEntitiesSuccess success = 1; + MempoolError failure = 2; + } +} +message RemoveEntitiesSuccess {} message DebugClearStateRequest {} -message DebugClearStateResponse {} +message DebugClearStateResponse { + oneof result { + DebugClearStateSuccess success = 1; + MempoolError failure = 2; + } +} +message DebugClearStateSuccess {} message DebugDumpMempoolRequest { bytes entry_point = 1; } message DebugDumpMempoolResponse { + oneof result { + DebugDumpMempoolSuccess success = 1; + MempoolError failure = 2; + } +} +message DebugDumpMempoolSuccess { repeated MempoolOp ops = 1; } @@ -190,13 +217,25 @@ message DebugSetReputationRequest { // An array of reputation entries to add/replace repeated Reputation reputations = 2; } -message DebugSetReputationResponse {} +message DebugSetReputationResponse { + oneof result { + DebugSetReputationSuccess success = 1; + MempoolError failure = 2; + } +} +message DebugSetReputationSuccess {} message DebugDumpReputationRequest { // The serialized entry point address. bytes entry_point = 1; } message DebugDumpReputationResponse { + oneof result { + DebugDumpReputationSuccess success = 1; + MempoolError failure = 2; + } +} +message DebugDumpReputationSuccess { repeated Reputation reputations = 1; } @@ -229,9 +268,15 @@ message MempoolError { PrecheckViolationError precheck_violation = 6; SimulationViolationError simulation_violation = 7; UnsupportedAggregatorError unsupported_aggregator = 8; + InvalidSignatureError invalid_signature = 9; + UnknownEntryPointError unknown_entry_point = 10; } } +message UnknownEntryPointError { + bytes entry_point = 1; +} + message ReplacementUnderpricedError { bytes current_fee = 1; bytes current_priority_fee = 2; @@ -252,6 +297,8 @@ message UnsupportedAggregatorError { bytes aggregator_address = 1; } +message InvalidSignatureError {} + // PRECHECK VIOLATIONS message PrecheckViolationError { oneof violation { diff --git a/src/builder/bundle_proposer.rs b/src/builder/bundle_proposer.rs index 5f8120e7a..2172653a6 100644 --- a/src/builder/bundle_proposer.rs +++ b/src/builder/bundle_proposer.rs @@ -12,7 +12,7 @@ use linked_hash_map::LinkedHashMap; #[cfg(test)] use mockall::automock; use tokio::{sync::broadcast, try_join}; -use tonic::{async_trait, transport::Channel}; +use tonic::async_trait; use tracing::{error, info}; use crate::{ @@ -22,16 +22,13 @@ use crate::{ emit::WithEntryPoint, gas::{FeeEstimator, GasFees, PriorityFeeMode}, math, - protos::{ - self, - op_pool::{op_pool_client::OpPoolClient, GetOpsRequest, MempoolOp}, - }, simulation::{SimulationError, SimulationSuccess, Simulator}, types::{ Entity, EntityType, EntryPointLike, ExpectedStorage, HandleOpsOut, ProviderLike, Timestamp, UserOperation, }, }, + op_pool::{PoolClient, PoolOperation}, }; /// A user op must be valid for at least this long into the future to be included. @@ -74,13 +71,14 @@ pub trait BundleProposer: Send + Sync + 'static { } #[derive(Debug)] -pub struct BundleProposerImpl +pub struct BundleProposerImpl where S: Simulator, E: EntryPointLike, P: ProviderLike, + C: PoolClient, { - op_pool: OpPoolClient, + op_pool: C, simulator: S, entry_point: E, provider: Arc

, @@ -101,11 +99,12 @@ pub struct Settings { } #[async_trait] -impl BundleProposer for BundleProposerImpl +impl BundleProposer for BundleProposerImpl where S: Simulator, E: EntryPointLike, P: ProviderLike, + C: PoolClient, { async fn make_bundle(&self, required_fees: Option) -> anyhow::Result { let (ops, block_hash, bundle_fees) = try_join!( @@ -123,14 +122,14 @@ where let simulation_futures = ops .iter() .filter(|op| { - op.op.max_fee_per_gas >= required_op_fees.max_fee_per_gas - && op.op.max_priority_fee_per_gas >= required_op_fees.max_priority_fee_per_gas + op.uo.max_fee_per_gas >= required_op_fees.max_fee_per_gas + && op.uo.max_priority_fee_per_gas >= required_op_fees.max_priority_fee_per_gas }) .cloned() .map(|op| self.simulate_validation(op, block_hash)); let ops_with_simulations_future = future::join_all(simulation_futures); - let all_paymaster_addresses = ops.iter().filter_map(|op| op.op.paymaster()); + let all_paymaster_addresses = ops.iter().filter_map(|op| op.uo.paymaster()); let balances_by_paymaster_future = self.get_balances_by_paymaster(all_paymaster_addresses, block_hash); let (ops_with_simulations, balances_by_paymaster) = @@ -176,14 +175,15 @@ where } } -impl BundleProposerImpl +impl BundleProposerImpl where S: Simulator, E: EntryPointLike, P: ProviderLike, + C: PoolClient, { pub fn new( - op_pool: OpPoolClient, + op_pool: C, simulator: S, entry_point: E, provider: Arc

, @@ -211,17 +211,17 @@ where async fn simulate_validation( &self, - op: OpFromPool, + op: PoolOperation, block_hash: H256, ) -> anyhow::Result<(UserOperation, Result)> { let result = self .simulator - .simulate_validation(op.op.clone(), Some(block_hash), Some(op.expected_code_hash)) + .simulate_validation(op.uo.clone(), Some(block_hash), Some(op.expected_code_hash)) .await; match result { - Ok(success) => Ok((op.op, Ok(success))), + Ok(success) => Ok((op.uo, Ok(success))), Err(error) => match error { - SimulationError::Violations(_) => Ok((op.op, Err(error))), + SimulationError::Violations(_) => Ok((op.uo, Err(error))), SimulationError::Other(error) => Err(error), }, } @@ -397,20 +397,11 @@ where } } - async fn get_ops_from_pool(&self) -> anyhow::Result> { + async fn get_ops_from_pool(&self) -> anyhow::Result> { self.op_pool - .clone() - .get_ops(GetOpsRequest { - entry_point: self.entry_point.address().as_bytes().to_vec(), - max_ops: self.settings.max_bundle_size, - }) + .get_ops(self.entry_point.address(), self.settings.max_bundle_size) .await - .context("should get ops from op pool to bundle")? - .into_inner() - .ops - .into_iter() - .map(OpFromPool::try_from) - .collect() + .context("should get ops from pool") } async fn get_balances_by_paymaster( @@ -482,11 +473,11 @@ where Ok(()) } - fn limit_gas_in_bundle(&self, ops: Vec) -> Vec { + fn limit_gas_in_bundle(&self, ops: Vec) -> Vec { let mut gas_left = U256::from(MAX_BUNDLE_GAS_LIMIT); let mut ops_in_bundle = Vec::new(); for op in ops { - let gas = op.op.total_execution_gas_limit(self.chain_id); + let gas = op.uo.total_execution_gas_limit(self.chain_id); if gas_left < gas { break; } @@ -508,26 +499,6 @@ where } } -#[derive(Clone, Debug)] -struct OpFromPool { - op: UserOperation, - expected_code_hash: H256, -} - -impl TryFrom for OpFromPool { - type Error = anyhow::Error; - - fn try_from(value: MempoolOp) -> Result { - Ok(Self { - op: value - .uo - .context("mempool op should contain user operation")? - .try_into()?, - expected_code_hash: protos::from_bytes(&value.expected_code_hash)?, - }) - } -} - #[derive(Debug)] struct OpWithSimulation { op: UserOperation, @@ -717,17 +688,17 @@ impl ProposalContext { mod tests { use anyhow::anyhow; use ethers::{types::H160, utils::parse_units}; - use tonic::Response; use super::*; - use crate::common::{ - grpc::mocks::{self, MockOpPool}, - protos::op_pool::GetOpsResponse, - simulation::{ - AggregatorSimOut, MockSimulator, SimulationError, SimulationSuccess, - SimulationViolation, + use crate::{ + common::{ + simulation::{ + AggregatorSimOut, MockSimulator, SimulationError, SimulationSuccess, + SimulationViolation, + }, + types::{MockEntryPointLike, MockProviderLike, ValidTimeRange}, }, - types::{MockEntryPointLike, MockProviderLike, ValidTimeRange}, + op_pool::MockPoolClient, }; #[tokio::test] @@ -1194,19 +1165,20 @@ mod tests { let current_block_hash = hash(125); let expected_code_hash = hash(126); let max_bundle_size = mock_ops.len() as u64; - let mut op_pool = MockOpPool::new(); let ops: Vec<_> = mock_ops .iter() - .map(|MockOp { op, .. }| MempoolOp { - uo: Some(op.into()), - expected_code_hash: expected_code_hash.as_bytes().to_vec(), + .map(|MockOp { op, .. }| PoolOperation { + uo: op.clone(), + expected_code_hash, ..Default::default() }) .collect(); - op_pool + + let mut pool_client = MockPoolClient::new(); + pool_client .expect_get_ops() - .return_once(|_| Ok(Response::new(GetOpsResponse { ops }))); - let op_pool_handle = mocks::mock_op_pool_client(op_pool).await; + .returning(move |_, _| Ok(ops.clone())); + let simulations_by_op: HashMap<_, _> = mock_ops .into_iter() .map(|op| (op.op.op_hash(entry_point_address, 0), op.simulation_result)) @@ -1255,7 +1227,7 @@ mod tests { .returning(move |address, _| signatures_by_aggregator[&address]()); let (event_sender, _) = broadcast::channel(16); let proposer = BundleProposerImpl::new( - op_pool_handle.client.clone(), + pool_client, simulator, entry_point, Arc::new(provider), diff --git a/src/builder/bundle_sender.rs b/src/builder/bundle_sender.rs index 03ada7ea4..9c225ff90 100644 --- a/src/builder/bundle_sender.rs +++ b/src/builder/bundle_sender.rs @@ -16,7 +16,7 @@ use tokio::{ sync::{broadcast, mpsc, oneshot}, time, }; -use tonic::transport::Channel; +use tonic::async_trait; use tracing::{error, info, trace, warn}; use crate::{ @@ -30,16 +30,19 @@ use crate::{ emit::WithEntryPoint, gas::GasFees, math, - protos::op_pool::{ - self, op_pool_client::OpPoolClient, RemoveEntitiesRequest, RemoveOpsRequest, - }, types::{Entity, EntryPointLike, ExpectedStorage, UserOperation}, }, + op_pool::PoolClient, }; // Overhead on gas estimates to account for inaccuracies. const GAS_ESTIMATE_OVERHEAD_PERCENT: u64 = 10; +#[async_trait] +pub trait BundleSender: Send + Sync + 'static { + async fn send_bundles_in_loop(&mut self); +} + #[derive(Debug)] pub struct Settings { pub replacement_fee_percent_increase: u64, @@ -47,21 +50,22 @@ pub struct Settings { } #[derive(Debug)] -pub struct BundleSender +pub struct BundleSenderImpl where P: BundleProposer, E: EntryPointLike, T: TransactionTracker, + C: PoolClient, { manual_bundling_mode: Arc, send_bundle_receiver: mpsc::Receiver, chain_id: u64, beneficiary: Address, eth_poll_interval: Duration, - op_pool: OpPoolClient, proposer: P, entry_point: E, transaction_tracker: T, + pool_client: C, // TODO: Figure out what we really want to do for detecting new blocks. provider: Arc>>, settings: Settings, @@ -95,47 +99,18 @@ pub enum SendBundleResult { Error(anyhow::Error), } -impl BundleSender +#[async_trait] +impl BundleSender for BundleSenderImpl where P: BundleProposer, E: EntryPointLike, T: TransactionTracker, + C: PoolClient, { - #[allow(clippy::too_many_arguments)] - pub fn new( - manual_bundling_mode: Arc, - send_bundle_receiver: mpsc::Receiver, - chain_id: u64, - beneficiary: Address, - eth_poll_interval: Duration, - op_pool: OpPoolClient, - proposer: P, - entry_point: E, - transaction_tracker: T, - provider: Arc>>, - settings: Settings, - event_sender: broadcast::Sender>, - ) -> Self { - Self { - manual_bundling_mode, - send_bundle_receiver, - chain_id, - beneficiary, - eth_poll_interval, - op_pool, - proposer, - entry_point, - transaction_tracker, - provider, - settings, - event_sender, - } - } - /// Loops forever, attempting to form and send a bundle on each new block, /// then waiting for one bundle to be mined or dropped before forming the /// next one. - pub async fn send_bundles_in_loop(&mut self) -> ! { + async fn send_bundles_in_loop(&mut self) { let mut last_block_number = 0; loop { let mut send_bundle_response: Option> = None; @@ -189,6 +164,45 @@ where } } } +} + +impl BundleSenderImpl +where + P: BundleProposer, + E: EntryPointLike, + T: TransactionTracker, + C: PoolClient, +{ + #[allow(clippy::too_many_arguments)] + pub fn new( + manual_bundling_mode: Arc, + send_bundle_receiver: mpsc::Receiver, + chain_id: u64, + beneficiary: Address, + eth_poll_interval: Duration, + proposer: P, + entry_point: E, + transaction_tracker: T, + pool_client: C, + provider: Arc>>, + settings: Settings, + event_sender: broadcast::Sender>, + ) -> Self { + Self { + manual_bundling_mode, + send_bundle_receiver, + chain_id, + beneficiary, + eth_poll_interval, + proposer, + entry_point, + transaction_tracker, + pool_client, + provider, + settings, + event_sender, + } + } async fn check_for_and_log_transaction_update(&self) { let update = self.transaction_tracker.check_for_update_now().await; @@ -423,34 +437,22 @@ where } async fn remove_ops_from_pool(&self, ops: &[UserOperation]) -> anyhow::Result<()> { - self.op_pool - .clone() - .remove_ops(RemoveOpsRequest { - entry_point: self.entry_point.address().as_bytes().to_vec(), - hashes: ops - .iter() - .map(|op| self.op_hash(op).as_bytes().to_vec()) + self.pool_client + .remove_ops( + self.entry_point.address(), + ops.iter() + .map(|op| op.op_hash(self.entry_point.address(), self.chain_id)) .collect(), - }) + ) .await - .context("builder should remove rejected ops from pool")?; - Ok(()) + .context("builder should remove rejected ops from pool") } async fn remove_entities_from_pool(&self, entities: &[Entity]) -> anyhow::Result<()> { - self.op_pool - .clone() - .remove_entities(RemoveEntitiesRequest { - entry_point: self.entry_point.address().as_bytes().to_vec(), - entities: entities.iter().map(op_pool::Entity::from).collect(), - }) + self.pool_client + .remove_entities(self.entry_point.address(), entities.to_vec()) .await - .context("builder should remove rejected entities from pool")?; - Ok(()) - } - - fn op_hash(&self, op: &UserOperation) -> H256 { - op.op_hash(self.entry_point.address(), self.chain_id) + .context("builder should remove rejected entities from pool") } fn emit(&self, event: BuilderEvent) { @@ -459,6 +461,10 @@ where event, }); } + + fn op_hash(&self, op: &UserOperation) -> H256 { + op.op_hash(self.entry_point.address(), self.chain_id) + } } struct BuilderMetrics {} diff --git a/src/builder/task.rs b/src/builder/task.rs index 7eafda22f..65487a845 100644 --- a/src/builder/task.rs +++ b/src/builder/task.rs @@ -4,27 +4,23 @@ use std::{ time::Duration, }; -use anyhow::{bail, Context}; +use anyhow::Context; use ethers::types::{Address, H256}; use ethers_signers::Signer; use rusoto_core::Region; use tokio::{ - select, sync::{broadcast, mpsc}, time, }; use tokio_util::sync::CancellationToken; -use tonic::{ - async_trait, - transport::{Channel, Server}, -}; +use tonic::{async_trait, transport::Server}; use tracing::info; +use super::emit::BuilderEvent; use crate::{ builder::{ bundle_proposer::{self, BundleProposerImpl}, - bundle_sender::{self, BundleSender}, - emit::BuilderEvent, + bundle_sender::{self, BundleSender, BundleSenderImpl}, sender::get_sender, server::BuilderImpl, signer::{BundlerSigner, KmsSigner, LocalSigner}, @@ -33,17 +29,15 @@ use crate::{ common::{ contracts::i_entry_point::IEntryPoint, emit::WithEntryPoint, - eth, + eth::{self, new_provider}, gas::PriorityFeeMode, handle::{SpawnGuard, Task}, mempool::MempoolConfig, - protos::{ - builder::{builder_server::BuilderServer, BUILDER_FILE_DESCRIPTOR_SET}, - op_pool::op_pool_client::OpPoolClient, - }, - server::{self, format_socket_addr}, + protos::builder::{builder_server::BuilderServer, BUILDER_FILE_DESCRIPTOR_SET}, + server::format_socket_addr, simulation::{self, SimulatorImpl}, }, + op_pool::{connect_remote_pool_client, LocalPoolClient, PoolClientMode}, }; #[derive(Debug)] @@ -51,7 +45,6 @@ pub struct Args { pub port: u16, pub host: String, pub rpc_url: String, - pub pool_url: String, pub entry_point_address: Address, pub private_key: Option, pub aws_kms_key_ids: Vec, @@ -71,6 +64,7 @@ pub struct Args { pub max_blocks_to_wait_for_mine: u64, pub replacement_fee_percent_increase: u64, pub max_fee_increases: u64, + pub pool_client_mode: PoolClientMode, } #[derive(Debug)] @@ -81,7 +75,7 @@ pub struct BuilderTask { #[async_trait] impl Task for BuilderTask { - async fn run(&self, shutdown_token: CancellationToken) -> anyhow::Result<()> { + async fn run(&mut self, shutdown_token: CancellationToken) -> anyhow::Result<()> { let addr = format_socket_addr(&self.args.host, self.args.port).parse()?; info!("Starting builder server on {}", addr); tracing::info!("Mempool config: {:?}", self.args.mempool_configs); @@ -126,8 +120,6 @@ impl Task for BuilderTask { priority_fee_mode: self.args.priority_fee_mode, bundle_priority_fee_overhead_percent: self.args.bundle_priority_fee_overhead_percent, }; - let op_pool = - Self::connect_client_with_shutdown(&self.args.pool_url, shutdown_token.clone()).await?; let simulator = SimulatorImpl::new( Arc::clone(&provider), self.args.entry_point_address, @@ -135,17 +127,7 @@ impl Task for BuilderTask { self.args.mempool_configs.clone(), ); let entry_point = IEntryPoint::new(self.args.entry_point_address, Arc::clone(&provider)); - let proposer = BundleProposerImpl::new( - op_pool.clone(), - simulator, - entry_point.clone(), - Arc::clone(&provider), - self.args.chain_id, - proposer_settings, - self.event_sender.clone(), - ); - let submit_provider = - eth::new_provider(&self.args.submit_url, self.args.eth_poll_interval)?; + let submit_provider = new_provider(&self.args.submit_url, self.args.eth_poll_interval)?; let transaction_sender = get_sender( submit_provider, signer, @@ -171,28 +153,66 @@ impl Task for BuilderTask { let manual_bundling_mode = Arc::new(AtomicBool::new(false)); let (send_bundle_tx, send_bundle_rx) = mpsc::channel(1); - let mut bundle_sender = BundleSender::new( - manual_bundling_mode.clone(), - send_bundle_rx, - self.args.chain_id, - beneficiary, - self.args.eth_poll_interval, - op_pool, - proposer, - entry_point, - transaction_tracker, - provider, - builder_settings, - self.event_sender.clone(), - ); - - let _builder_loop_guard = { - SpawnGuard::spawn_with_guard(async move { bundle_sender.send_bundles_in_loop().await }) + let mut builder: Box = match &self.args.pool_client_mode { + PoolClientMode::Local { sender } => { + let pool_client = LocalPoolClient::new(sender.clone()); + let proposer = BundleProposerImpl::new( + pool_client.clone(), + simulator, + entry_point.clone(), + Arc::clone(&provider), + self.args.chain_id, + proposer_settings, + self.event_sender.clone(), + ); + Box::new(BundleSenderImpl::new( + manual_bundling_mode.clone(), + send_bundle_rx, + self.args.chain_id, + beneficiary, + self.args.eth_poll_interval, + proposer, + entry_point, + transaction_tracker, + pool_client, + provider, + builder_settings, + self.event_sender.clone(), + )) + } + PoolClientMode::Remote { url } => { + let pool_client = connect_remote_pool_client(url, shutdown_token.clone()).await?; + let proposer = BundleProposerImpl::new( + pool_client.clone(), + simulator, + entry_point.clone(), + Arc::clone(&provider), + self.args.chain_id, + proposer_settings, + self.event_sender.clone(), + ); + Box::new(BundleSenderImpl::new( + manual_bundling_mode.clone(), + send_bundle_rx, + self.args.chain_id, + beneficiary, + self.args.eth_poll_interval, + proposer, + entry_point, + transaction_tracker, + pool_client, + provider, + builder_settings, + self.event_sender.clone(), + )) + } }; - let builder_server = BuilderImpl::new(manual_bundling_mode, send_bundle_tx); + let _builder_loop_guard = + { SpawnGuard::spawn_with_guard(async move { builder.send_bundles_in_loop().await }) }; // gRPC server + let builder_server = BuilderImpl::new(manual_bundling_mode, send_bundle_tx); let builder_server = BuilderServer::new(builder_server); let reflection_service = tonic_reflection::server::Builder::configure() @@ -237,19 +257,4 @@ impl BuilderTask { pub fn boxed(self) -> Box { Box::new(self) } - - async fn connect_client_with_shutdown( - op_pool_url: &str, - shutdown_token: CancellationToken, - ) -> anyhow::Result> { - select! { - _ = shutdown_token.cancelled() => { - tracing::error!("bailing from connecting client, server shutting down"); - bail!("Server shutting down") - } - res = server::connect_with_retries("op pool from builder", op_pool_url, OpPoolClient::connect) => { - res - } - } - } } diff --git a/src/cli/builder.rs b/src/cli/builder.rs index fb74f2f44..cfb3f13d9 100644 --- a/src/cli/builder.rs +++ b/src/cli/builder.rs @@ -15,6 +15,7 @@ use crate::{ mempool::MempoolConfig, server::format_server_addr, }, + op_pool::PoolClientMode, }; /// CLI options for the builder @@ -152,7 +153,7 @@ impl BuilderArgs { pub async fn to_args( &self, common: &CommonArgs, - pool_url: String, + pool_client_mode: PoolClientMode, ) -> anyhow::Result { let priority_fee_mode = PriorityFeeMode::try_from( common.priority_fee_mode_kind.as_str(), @@ -176,7 +177,6 @@ impl BuilderArgs { port: self.port, host: self.host.clone(), rpc_url, - pool_url, entry_point_address: common .entry_points .get(0) @@ -204,6 +204,7 @@ impl BuilderArgs { max_blocks_to_wait_for_mine: self.max_blocks_to_wait_for_mine, replacement_fee_percent_increase: self.replacement_fee_percent_increase, max_fee_increases: self.max_fee_increases, + pool_client_mode, }) } @@ -233,11 +234,14 @@ pub async fn run(builder_args: BuilderCliArgs, common_args: CommonArgs) -> anyho builder: builder_args, pool_url, } = builder_args; - let task_args = builder_args.to_args(&common_args, pool_url).await?; - let (event_sender, event_rx) = broadcast::channel(EVENT_CHANNEL_CAPACITY); + let (event_sender, event_rx) = broadcast::channel(EVENT_CHANNEL_CAPACITY); emit::receive_and_log_events_with_filter(event_rx, is_nonspammy_event); + let task_args = builder_args + .to_args(&common_args, PoolClientMode::Remote { url: pool_url }) + .await?; + spawn_tasks_with_shutdown( [BuilderTask::new(task_args, event_sender).boxed()], tokio::signal::ctrl_c(), diff --git a/src/cli/node/mod.rs b/src/cli/node/mod.rs index 2bbf4f8a8..6acb0f29a 100644 --- a/src/cli/node/mod.rs +++ b/src/cli/node/mod.rs @@ -1,5 +1,5 @@ use clap::Args; -use tokio::sync::broadcast; +use tokio::sync::{broadcast, mpsc}; use self::events::Event; use crate::{ @@ -13,9 +13,8 @@ use crate::{ common::{ emit::{self, WithEntryPoint, EVENT_CHANNEL_CAPACITY}, handle, - server::format_server_addr, }, - op_pool::{emit::OpPoolEvent, PoolTask}, + op_pool::{emit::OpPoolEvent, PoolClientMode, PoolServerMode, PoolTask}, rpc::RpcTask, }; mod events; @@ -39,18 +38,23 @@ pub async fn run(bundler_args: NodeCliArgs, common_args: CommonArgs) -> anyhow:: rpc: rpc_args, } = bundler_args; - let pool_url = format_server_addr(&pool_args.host, pool_args.port, false); let builder_url = builder_args.url(false); - let pool_task_args = pool_args.to_args(&common_args).await?; - let builder_task_args = builder_args.to_args(&common_args, pool_url.clone()).await?; + let (tx, rx) = mpsc::channel(1024); + + let pool_task_args = pool_args + .to_args(&common_args, PoolServerMode::Local { receiver: Some(rx) }) + .await?; + let builder_task_args = builder_args + .to_args(&common_args, PoolClientMode::Local { sender: tx.clone() }) + .await?; let rpc_task_args = rpc_args .to_args( &common_args, - pool_url, builder_url, (&common_args).try_into()?, (&common_args).try_into()?, + PoolClientMode::Local { sender: tx }, ) .await?; diff --git a/src/cli/pool.rs b/src/cli/pool.rs index a83725537..3b14cb546 100644 --- a/src/cli/pool.rs +++ b/src/cli/pool.rs @@ -13,7 +13,7 @@ use crate::{ handle::spawn_tasks_with_shutdown, mempool::MempoolConfig, }, - op_pool::{self, PoolConfig, PoolTask}, + op_pool::{self, PoolConfig, PoolServerMode, PoolTask}, }; /// CLI options for the OP Pool #[derive(Args, Debug)] @@ -96,7 +96,11 @@ pub struct PoolArgs { impl PoolArgs { /// Convert the CLI arguments into the arguments for the OP Pool combining /// common and op pool specific arguments. - pub async fn to_args(&self, common: &CommonArgs) -> anyhow::Result { + pub async fn to_args( + &self, + common: &CommonArgs, + server_mode: PoolServerMode, + ) -> anyhow::Result { let blocklist = match &self.blocklist_path { Some(blocklist) => Some(get_json_config(blocklist, &common.aws_region).await?), None => None, @@ -138,8 +142,6 @@ impl PoolArgs { .collect::>>()?; Ok(op_pool::Args { - port: self.port, - host: self.host.clone(), chain_id: common.chain_id, chain_history_size: self .chain_history_size @@ -150,6 +152,7 @@ impl PoolArgs { .context("pool requires node_http arg")?, http_poll_interval: Duration::from_millis(self.http_poll_interval_millis), pool_configs, + server_mode, }) } } @@ -182,8 +185,15 @@ pub struct PoolCliArgs { pub async fn run(pool_args: PoolCliArgs, common_args: CommonArgs) -> anyhow::Result<()> { let PoolCliArgs { pool: pool_args } = pool_args; - let task_args = pool_args.to_args(&common_args).await?; let (event_sender, event_rx) = broadcast::channel(EVENT_CHANNEL_CAPACITY); + let task_args = pool_args + .to_args( + &common_args, + PoolServerMode::Remote { + addr: format!("{}:{}", pool_args.host, pool_args.port).parse()?, + }, + ) + .await?; emit::receive_and_log_events_with_filter(event_rx, |_| true); diff --git a/src/cli/rpc.rs b/src/cli/rpc.rs index e02445fa9..5609aa644 100644 --- a/src/cli/rpc.rs +++ b/src/cli/rpc.rs @@ -6,6 +6,7 @@ use clap::Args; use super::CommonArgs; use crate::{ common::{handle::spawn_tasks_with_shutdown, precheck}, + op_pool::PoolClientMode, rpc::{self, estimation, RpcTask}, }; @@ -67,10 +68,10 @@ impl RpcArgs { pub async fn to_args( &self, common: &CommonArgs, - pool_url: String, builder_url: String, precheck_settings: precheck::Settings, estimation_settings: estimation::Settings, + pool_client_mode: PoolClientMode, ) -> anyhow::Result { let apis = self .api @@ -81,7 +82,6 @@ impl RpcArgs { Ok(rpc::Args { port: self.port, host: self.host.clone(), - pool_url, builder_url, entry_points: common .entry_points @@ -99,6 +99,7 @@ impl RpcArgs { estimation_settings, rpc_timeout: Duration::from_secs(self.timeout_seconds.parse()?), max_connections: self.max_connections, + pool_client_mode, }) } } @@ -138,10 +139,10 @@ pub async fn run(rpc_args: RpcCliArgs, common_args: CommonArgs) -> anyhow::Resul let task_args = rpc_args .to_args( &common_args, - pool_url, builder_url, (&common_args).try_into()?, (&common_args).try_into()?, + PoolClientMode::Remote { url: pool_url }, ) .await?; diff --git a/src/common/grpc/mocks.rs b/src/common/grpc/mocks.rs index 2643b01fd..82f8dcf68 100644 --- a/src/common/grpc/mocks.rs +++ b/src/common/grpc/mocks.rs @@ -9,76 +9,17 @@ use tonic::{ Request, Response, }; -use crate::common::protos::{ - builder::{ - builder_client::BuilderClient, - builder_server::{Builder, BuilderServer}, - DebugSendBundleNowRequest, DebugSendBundleNowResponse, DebugSetBundlingModeRequest, - DebugSetBundlingModeResponse, - }, - op_pool::{ - op_pool_client::OpPoolClient, - op_pool_server::{OpPool, OpPoolServer}, - AddOpRequest, AddOpResponse, DebugClearStateRequest, DebugClearStateResponse, - DebugDumpMempoolRequest, DebugDumpMempoolResponse, DebugDumpReputationRequest, - DebugDumpReputationResponse, DebugSetReputationRequest, DebugSetReputationResponse, - GetOpsRequest, GetOpsResponse, GetSupportedEntryPointsRequest, - GetSupportedEntryPointsResponse, RemoveEntitiesRequest, RemoveEntitiesResponse, - RemoveOpsRequest, RemoveOpsResponse, - }, +use crate::common::protos::builder::{ + builder_client::BuilderClient, + builder_server::{Builder, BuilderServer}, + DebugSendBundleNowRequest, DebugSendBundleNowResponse, DebugSetBundlingModeRequest, + DebugSetBundlingModeResponse, }; /// Maximum number of incrementing ports to try when looking for an open port /// for a mock server. const MAX_PORT_ATTEMPTS: u16 = 32; -mock! { - #[derive(Debug)] - pub OpPool {} - - #[async_trait] - impl OpPool for OpPool { - async fn get_supported_entry_points( - &self, - _request: Request, - ) -> tonic::Result>; - - async fn add_op(&self, request: Request) -> tonic::Result>; - - async fn get_ops(&self, request: Request) -> tonic::Result>; - - async fn remove_ops( - &self, - request: Request, - ) -> tonic::Result>; - - async fn remove_entities( - &self, - _request: Request, - ) -> tonic::Result>; - - async fn debug_clear_state( - &self, - _request: Request, - ) -> tonic::Result>; - - async fn debug_dump_mempool( - &self, - request: Request, - ) -> tonic::Result>; - - async fn debug_set_reputation( - &self, - request: Request, - ) -> tonic::Result>; - - async fn debug_dump_reputation( - &self, - request: Request, - ) -> tonic::Result>; - } -} - mock! { #[derive(Debug)] pub Builder {} @@ -111,30 +52,8 @@ impl Drop for ClientHandle { } } -pub type OpPoolHandle = ClientHandle>; pub type BuilderHandle = ClientHandle>; -/// Creates an `OpPoolClient` connected to a local gRPC server which uses the -/// provided `mock` to respond to requests. Returns a handle which exposes the -/// client and shuts down the server when dropped. -pub async fn mock_op_pool_client(mock: impl OpPool) -> OpPoolHandle { - mock_op_pool_client_with_port(mock, 56776).await -} - -/// Like `mock_op_pool_client`, but accepts a custom port to avoid conflicts. -pub async fn mock_op_pool_client_with_port(mock: impl OpPool, port: u16) -> OpPoolHandle { - mock_client_with_port( - |listener_stream| { - Server::builder() - .add_service(OpPoolServer::new(mock)) - .serve_with_incoming(listener_stream) - }, - OpPoolClient::connect, - port, - ) - .await -} - /// Creates a `BuilderClient` connected to a local gRPC server which uses the /// provided `mock` to respond to requests. Returns a handle which exposes the /// client and shuts down the server when dropped. @@ -195,30 +114,8 @@ async fn find_open_port(starting_port: u16) -> (u16, TcpListenerStream) { #[cfg(test)] mod test { - use tokio::time; - use super::*; - #[tokio::test] - async fn test_op_pool_mock() { - let mut op_pool = MockOpPool::new(); - op_pool.expect_get_supported_entry_points().returning(|_| { - Ok(Response::new(GetSupportedEntryPointsResponse { - chain_id: 1337, - entry_points: vec![vec![1, 2, 3]], - })) - }); - let mut handle = mock_op_pool_client(op_pool).await; - let response = handle - .client - .get_supported_entry_points(GetSupportedEntryPointsRequest {}) - .await - .expect("should get response from mock") - .into_inner(); - assert_eq!(response.chain_id, 1337); - assert_eq!(response.entry_points, vec![vec![1, 2, 3]]); - } - #[tokio::test] async fn test_builder_mock() { let mut builder = MockBuilder::new(); @@ -237,57 +134,4 @@ mod test { .transaction_hash; assert_eq!(transaction_hash, vec![1, 2, 3]); } - - #[tokio::test] - async fn test_multiple_mocks_avoid_port_collision() { - let mut op_pool = MockOpPool::new(); - op_pool.expect_get_supported_entry_points().returning(|_| { - Ok(Response::new(GetSupportedEntryPointsResponse { - chain_id: 10, - entry_points: vec![], - })) - }); - let mut handle1 = mock_op_pool_client(op_pool).await; - let mut op_pool = MockOpPool::new(); - op_pool.expect_get_supported_entry_points().returning(|_| { - Ok(Response::new(GetSupportedEntryPointsResponse { - chain_id: 20, - entry_points: vec![], - })) - }); - let mut handle2 = mock_op_pool_client(op_pool).await; - let chain_id1 = handle1 - .client - .get_supported_entry_points(GetSupportedEntryPointsRequest {}) - .await - .expect("should get response from mock") - .into_inner() - .chain_id; - assert_eq!(chain_id1, 10); - let chain_id2 = handle2 - .client - .get_supported_entry_points(GetSupportedEntryPointsRequest {}) - .await - .expect("should get response from mock") - .into_inner() - .chain_id; - assert_eq!(chain_id2, 20) - } - - #[tokio::test] - async fn test_server_shutdown_when_handle_drops() { - let port = 10000; - assert!(port_is_available(port).await); - { - let _handle = mock_op_pool_client_with_port(MockOpPool::new(), port).await; - assert!(!port_is_available(port).await); - } - // Sleeping any duration is enough for the server to shut down. - time::sleep(Duration::from_millis(1)).await; - assert!(port_is_available(port).await); - } - - async fn port_is_available(port: u16) -> bool { - TcpListener::bind(format!("[::1]:{port}")).await.is_ok() - } } diff --git a/src/common/handle.rs b/src/common/handle.rs index f3f4b7654..ec4cb1367 100644 --- a/src/common/handle.rs +++ b/src/common/handle.rs @@ -49,7 +49,7 @@ impl Drop for SpawnGuard { #[async_trait] pub trait Task: Sync + Send + 'static { - async fn run(&self, shutdown_token: CancellationToken) -> anyhow::Result<()>; + async fn run(&mut self, shutdown_token: CancellationToken) -> anyhow::Result<()>; } pub async fn spawn_tasks_with_shutdown( @@ -63,7 +63,7 @@ pub async fn spawn_tasks_with_shutdown( let shutdown_token = CancellationToken::new(); let mut shutdown_scope = Some(shutdown_scope); - let handles = tasks.into_iter().map(|task| { + let handles = tasks.into_iter().map(|mut task| { let st = shutdown_token.clone(); let ss = shutdown_scope.clone(); async move { diff --git a/src/common/protos.rs b/src/common/protos.rs index c4fd28da4..3c4a406ed 100644 --- a/src/common/protos.rs +++ b/src/common/protos.rs @@ -1,103 +1,6 @@ use ethers::types::{Address, H256, U256}; -use crate::common::types::{ - BundlingMode as RpcBundlingMode, Entity as CommonEntity, EntityType as CommonEntityType, - UserOperation as RpcUserOperation, -}; - -pub mod op_pool { - use super::*; - - tonic::include_proto!("op_pool"); - - pub const OP_POOL_FILE_DESCRIPTOR_SET: &[u8] = - tonic::include_file_descriptor_set!("op_pool_descriptor"); - - impl From<&RpcUserOperation> for UserOperation { - fn from(op: &RpcUserOperation) -> Self { - UserOperation { - sender: op.sender.0.to_vec(), - nonce: to_le_bytes(op.nonce), - init_code: op.init_code.to_vec(), - call_data: op.call_data.to_vec(), - call_gas_limit: to_le_bytes(op.call_gas_limit), - verification_gas_limit: to_le_bytes(op.verification_gas_limit), - pre_verification_gas: to_le_bytes(op.pre_verification_gas), - max_fee_per_gas: to_le_bytes(op.max_fee_per_gas), - max_priority_fee_per_gas: to_le_bytes(op.max_priority_fee_per_gas), - paymaster_and_data: op.paymaster_and_data.to_vec(), - signature: op.signature.to_vec(), - } - } - } - - impl TryFrom for RpcUserOperation { - type Error = ConversionError; - - fn try_from(op: UserOperation) -> Result { - Ok(RpcUserOperation { - sender: from_bytes(&op.sender)?, - nonce: from_bytes(&op.nonce)?, - init_code: op.init_code.into(), - call_data: op.call_data.into(), - call_gas_limit: from_bytes(&op.call_gas_limit)?, - verification_gas_limit: from_bytes(&op.verification_gas_limit)?, - pre_verification_gas: from_bytes(&op.pre_verification_gas)?, - max_fee_per_gas: from_bytes(&op.max_fee_per_gas)?, - max_priority_fee_per_gas: from_bytes(&op.max_priority_fee_per_gas)?, - paymaster_and_data: op.paymaster_and_data.into(), - signature: op.signature.into(), - }) - } - } - - impl TryFrom for CommonEntityType { - type Error = ConversionError; - - fn try_from(entity: EntityType) -> Result { - match entity { - EntityType::Unspecified => Err(ConversionError::InvalidEntity(entity as i32)), - EntityType::Account => Ok(CommonEntityType::Account), - EntityType::Paymaster => Ok(CommonEntityType::Paymaster), - EntityType::Aggregator => Ok(CommonEntityType::Aggregator), - EntityType::Factory => Ok(CommonEntityType::Factory), - } - } - } - - impl From for EntityType { - fn from(entity: CommonEntityType) -> Self { - match entity { - CommonEntityType::Account => EntityType::Account, - CommonEntityType::Paymaster => EntityType::Paymaster, - CommonEntityType::Aggregator => EntityType::Aggregator, - CommonEntityType::Factory => EntityType::Factory, - } - } - } - - impl TryFrom<&Entity> for CommonEntity { - type Error = ConversionError; - - fn try_from(entity: &Entity) -> Result { - Ok(CommonEntity { - kind: EntityType::from_i32(entity.kind) - .ok_or(ConversionError::InvalidEntity(entity.kind))? - .try_into()?, - address: from_bytes(&entity.address)?, - }) - } - } - - impl From<&CommonEntity> for Entity { - fn from(entity: &CommonEntity) -> Self { - Entity { - kind: EntityType::from(entity.kind).into(), - address: entity.address.as_bytes().to_vec(), - } - } - } -} +use crate::common::types::BundlingMode as RpcBundlingMode; pub mod builder { use super::*; @@ -123,7 +26,7 @@ pub mod builder { match value { BundlingMode::Auto => Ok(RpcBundlingMode::Auto), BundlingMode::Manual => Ok(RpcBundlingMode::Manual), - _ => Err(ConversionError::InvalidBundlingMode(value as i32)), + _ => Err(ConversionError::InvalidEnumValue(value as i32)), } } } @@ -142,10 +45,8 @@ pub enum ConversionError { InvalidLength(usize, usize), #[error("Invalid timestamp: {0}")] InvalidTimestamp(u64), - #[error("Entity was invalid or unspecified: {0}")] - InvalidEntity(i32), - #[error("Bundling Mode was invalid or unspecified: {0}")] - InvalidBundlingMode(i32), + #[error("Invalid enum value {0}")] + InvalidEnumValue(i32), } pub fn from_bytes(bytes: &[u8]) -> Result { diff --git a/src/common/simulation.rs b/src/common/simulation.rs index 75859e642..a5bb83765 100644 --- a/src/common/simulation.rs +++ b/src/common/simulation.rs @@ -506,7 +506,7 @@ pub enum SimulationViolation { #[display("ran out of gas during {0.kind} validation")] OutOfGas(Entity), #[display( - "{0.kind} tried to access code at {1} during validation, but that address is not a contract" + "{0.kind} tried to access code at {1:?} during validation, but that address is not a contract" )] AccessedUndeployedContract(Entity, Address), #[display("{0.kind} called entry point method other than depositTo")] diff --git a/src/op_pool/mempool/error.rs b/src/op_pool/mempool/error.rs index 79a3ebf2d..41b512de4 100644 --- a/src/op_pool/mempool/error.rs +++ b/src/op_pool/mempool/error.rs @@ -35,6 +35,8 @@ pub enum MempoolError { SimulationViolation(SimulationViolation), #[error("Unsupported aggregator {0}")] UnsupportedAggregator(Address), + #[error("Unknown entry point {0}")] + UnknownEntryPoint(Address), } impl From for MempoolError { diff --git a/src/op_pool/mempool/mod.rs b/src/op_pool/mempool/mod.rs index 284ffb165..d7d767a3d 100644 --- a/src/op_pool/mempool/mod.rs +++ b/src/op_pool/mempool/mod.rs @@ -1,5 +1,6 @@ pub mod error; mod pool; +mod reputation; mod size; pub mod uo_pool; @@ -9,17 +10,23 @@ use std::{ }; use ethers::types::{Address, H256}; +#[cfg(test)] +use mockall::automock; +pub use reputation::{ + HourlyMovingAverageReputation, Reputation, ReputationParams, ReputationStatus, +}; use strum::IntoEnumIterator; use tonic::async_trait; use self::error::MempoolResult; -use super::reputation::Reputation; +use super::MempoolError; use crate::common::{ mempool::MempoolConfig, precheck, simulation, types::{Entity, EntityType, UserOperation, ValidTimeRange}, }; +#[cfg_attr(test, automock)] #[async_trait] /// In-memory operation pool pub trait Mempool: Send + Sync + 'static { @@ -34,7 +41,7 @@ pub trait Mempool: Send + Sync + 'static { ) -> MempoolResult; /// Removes a set of operations from the pool. - fn remove_operations<'a>(&self, hashes: impl IntoIterator); + fn remove_operations(&self, hashes: &[H256]); /// Removes all operations assocaited with a given entity from the pool. fn remove_entity(&self, entity: Entity); @@ -169,6 +176,103 @@ impl PoolOperation { } } +#[derive(Debug)] +pub struct MempoolGroup { + mempools: HashMap>, +} + +impl MempoolGroup +where + M: Mempool, +{ + pub fn new(mempools: Vec>) -> Self { + Self { + mempools: mempools.into_iter().map(|m| (m.entry_point(), m)).collect(), + } + } + + pub fn get_supported_entry_points(&self) -> Vec

{ + self.mempools.keys().copied().collect() + } + + pub async fn add_op( + &self, + entry_point: Address, + op: UserOperation, + origin: OperationOrigin, + ) -> MempoolResult { + let mempool = self.get_pool(entry_point)?; + mempool.add_operation(origin, op).await + } + + pub fn get_ops(&self, entry_point: Address, max_ops: u64) -> MempoolResult> { + let mempool = self.get_pool(entry_point)?; + Ok(mempool + .best_operations(max_ops as usize) + .iter() + .map(|op| (**op).clone()) + .collect()) + } + + pub fn remove_ops(&self, entry_point: Address, ops: &[H256]) -> MempoolResult<()> { + let mempool = self.get_pool(entry_point)?; + mempool.remove_operations(ops); + Ok(()) + } + + pub fn remove_entities<'a>( + &self, + entry_point: Address, + entities: impl IntoIterator, + ) -> MempoolResult<()> { + let mempool = self.get_pool(entry_point)?; + for entity in entities { + mempool.remove_entity(*entity); + } + Ok(()) + } + + pub fn debug_clear_state(&self) -> MempoolResult<()> { + for mempool in self.mempools.values() { + mempool.clear(); + } + Ok(()) + } + + pub fn debug_dump_mempool(&self, entry_point: Address) -> MempoolResult> { + let mempool = self.get_pool(entry_point)?; + Ok(mempool + .all_operations(usize::MAX) + .iter() + .map(|op| (**op).clone()) + .collect()) + } + + pub fn debug_set_reputations<'a>( + &self, + entry_point: Address, + reputations: impl IntoIterator, + ) -> MempoolResult<()> { + let mempool = self.get_pool(entry_point)?; + for rep in reputations { + mempool.set_reputation(rep.address, rep.ops_seen, rep.ops_included); + } + Ok(()) + } + + pub fn debug_dump_reputation(&self, entry_point: Address) -> MempoolResult> { + let mempool = self.get_pool(entry_point)?; + Ok(mempool.dump_reputation()) + } + + pub fn get_pool(&self, entry_point: Address) -> MempoolResult> { + self.mempools + .get(&entry_point) + .cloned() + .ok_or_else(|| MempoolError::UnknownEntryPoint(entry_point)) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/op_pool/reputation.rs b/src/op_pool/mempool/reputation.rs similarity index 100% rename from src/op_pool/reputation.rs rename to src/op_pool/mempool/reputation.rs diff --git a/src/op_pool/mempool/uo_pool.rs b/src/op_pool/mempool/uo_pool.rs index 1b29806f8..bbe7ba34b 100644 --- a/src/op_pool/mempool/uo_pool.rs +++ b/src/op_pool/mempool/uo_pool.rs @@ -13,6 +13,7 @@ use tracing::info; use super::{ error::{MempoolError, MempoolResult}, pool::PoolInner, + reputation::{Reputation, ReputationManager, ReputationStatus}, Mempool, OperationOrigin, PoolConfig, PoolOperation, }; use crate::{ @@ -25,7 +26,6 @@ use crate::{ op_pool::{ chain::ChainUpdate, emit::{EntityReputation, EntityStatus, EntitySummary, OpPoolEvent, OpRemovalReason}, - reputation::{Reputation, ReputationManager, ReputationStatus}, }, }; @@ -312,7 +312,7 @@ where Ok(hash) } - fn remove_operations<'a>(&self, hashes: impl IntoIterator) { + fn remove_operations(&self, hashes: &[H256]) { let mut count = 0; let mut removed_hashes = vec![]; { @@ -324,6 +324,7 @@ where } } } + for hash in removed_hashes { self.emit(OpPoolEvent::RemovedOp { op_hash: hash, @@ -412,7 +413,7 @@ mod tests { }, types::UserOperation, }, - op_pool::{chain::MinedOp, reputation::MockReputationManager}, + op_pool::{chain::MinedOp, mempool::reputation::MockReputationManager}, }; #[tokio::test] @@ -427,7 +428,7 @@ mod tests { .await .unwrap(); check_ops(pool.best_operations(1), uos); - pool.remove_operations(&vec![hash]); + pool.remove_operations(&[hash]); assert_eq!(pool.best_operations(1), vec![]); } diff --git a/src/op_pool/mod.rs b/src/op_pool/mod.rs index 0d3d23da0..aa80cdac9 100644 --- a/src/op_pool/mod.rs +++ b/src/op_pool/mod.rs @@ -1,11 +1,14 @@ mod chain; pub mod emit; -mod error; mod mempool; -mod reputation; mod server; mod task; -mod types; -pub use mempool::{error::MempoolError, PoolConfig}; +pub use mempool::{error::MempoolError, PoolConfig, PoolOperation, Reputation, ReputationStatus}; +#[cfg(test)] +pub use server::MockPoolClient; +pub use server::{ + connect_remote_pool_client, LocalPoolClient, PoolClient, PoolClientMode, PoolResult, + PoolServerError, RemotePoolClient, ServerRequest as LocalPoolServerRequest, +}; pub use task::*; diff --git a/src/op_pool/server.rs b/src/op_pool/server.rs deleted file mode 100644 index 97783f156..000000000 --- a/src/op_pool/server.rs +++ /dev/null @@ -1,334 +0,0 @@ -use std::{collections::HashMap, sync::Arc}; - -use ethers::types::{Address, H256}; -use tonic::{async_trait, Request, Response, Result, Status}; - -use super::mempool::{Mempool, OperationOrigin}; -use crate::common::protos::{ - self, - op_pool::{ - add_op_response, op_pool_server::OpPool, AddOpFailure, AddOpRequest, AddOpResponse, - AddOpSuccess, DebugClearStateRequest, DebugClearStateResponse, DebugDumpMempoolRequest, - DebugDumpMempoolResponse, DebugDumpReputationRequest, DebugDumpReputationResponse, - DebugSetReputationRequest, DebugSetReputationResponse, GetOpsRequest, GetOpsResponse, - GetSupportedEntryPointsRequest, GetSupportedEntryPointsResponse, MempoolOp, - RemoveEntitiesRequest, RemoveEntitiesResponse, RemoveOpsRequest, RemoveOpsResponse, - }, -}; - -pub struct OpPoolImpl { - chain_id: u64, - mempools: HashMap>, -} - -impl OpPoolImpl -where - M: Mempool, -{ - pub fn new(chain_id: u64, mempools: HashMap>) -> Self { - Self { chain_id, mempools } - } - - fn get_mempool_for_entry_point(&self, req_entry_point: &[u8]) -> Result<&Arc> { - let req_ep: Address = protos::from_bytes(req_entry_point) - .map_err(|e| Status::invalid_argument(format!("Invalid entry point: {e}")))?; - let Some(mempool) = self.mempools.get(&req_ep) else { - return Err(Status::invalid_argument(format!( - "Entry point not supported: {req_ep:?}" - ))); - }; - - Ok(mempool) - } -} - -#[async_trait] -impl OpPool for OpPoolImpl -where - M: Mempool + 'static, -{ - async fn get_supported_entry_points( - &self, - _request: Request, - ) -> Result> { - let entry_points = self - .mempools - .keys() - .map(|k| k.as_bytes().to_vec()) - .collect(); - Ok(Response::new(GetSupportedEntryPointsResponse { - chain_id: self.chain_id, - entry_points, - })) - } - - async fn add_op(&self, request: Request) -> Result> { - let req = request.into_inner(); - let mempool = self.get_mempool_for_entry_point(&req.entry_point)?; - - let op = req - .op - .ok_or_else(|| Status::invalid_argument("op is required in AddOpRequest"))? - .try_into() - .map_err(|e| Status::invalid_argument(format!("Invalid operation: {e}")))?; - - let resp = match mempool.add_operation(OperationOrigin::Local, op).await { - Ok(hash) => AddOpResponse { - result: Some(add_op_response::Result::Success(AddOpSuccess { - hash: hash.as_bytes().to_vec(), - })), - }, - Err(error) => AddOpResponse { - result: Some(add_op_response::Result::Failure(AddOpFailure { - error: Some(error.into()), - })), - }, - }; - - Ok(Response::new(resp)) - } - - async fn get_ops(&self, request: Request) -> Result> { - let req = request.into_inner(); - let mempool = self.get_mempool_for_entry_point(&req.entry_point)?; - - let ops = mempool - .best_operations(req.max_ops as usize) - .iter() - .map(|op| MempoolOp::try_from(&(**op))) - .collect::, _>>() - .map_err(|e| Status::internal(format!("Failed to convert to proto mempool op: {e}")))?; - - Ok(Response::new(GetOpsResponse { ops })) - } - - async fn remove_ops( - &self, - request: Request, - ) -> Result> { - let req = request.into_inner(); - let mempool = self.get_mempool_for_entry_point(&req.entry_point)?; - - let hashes: Vec = req - .hashes - .into_iter() - .map(|h| { - if h.len() != 32 { - return Err(Status::invalid_argument("Hash must be 32 bytes long")); - } - Ok(H256::from_slice(&h)) - }) - .collect::, _>>()?; - - mempool.remove_operations(&hashes); - - Ok(Response::new(RemoveOpsResponse {})) - } - - async fn remove_entities( - &self, - request: Request, - ) -> Result> { - let req = request.into_inner(); - let mempool = self.get_mempool_for_entry_point(&req.entry_point)?; - - for entity in &req.entities { - mempool.remove_entity( - entity - .try_into() - .map_err(|e| Status::internal(format!("Failed to convert to entity: {e}")))?, - ); - } - - Ok(Response::new(RemoveEntitiesResponse {})) - } - - async fn debug_clear_state( - &self, - _request: Request, - ) -> Result> { - self.mempools.values().for_each(|mempool| mempool.clear()); - Ok(Response::new(DebugClearStateResponse {})) - } - - async fn debug_dump_mempool( - &self, - request: Request, - ) -> Result> { - let req = request.into_inner(); - let mempool = self.get_mempool_for_entry_point(&req.entry_point)?; - - let ops = mempool - .all_operations(usize::MAX) - .iter() - .map(|op| MempoolOp::try_from(&(**op))) - .collect::, _>>() - .map_err(|e| Status::internal(format!("Failed to convert to proto mempool op: {e}")))?; - - Ok(Response::new(DebugDumpMempoolResponse { ops })) - } - - async fn debug_set_reputation( - &self, - request: Request, - ) -> Result> { - let req = request.into_inner(); - let mempool = self.get_mempool_for_entry_point(&req.entry_point)?; - - let reps = if req.reputations.is_empty() { - return Err(Status::invalid_argument( - "Reputation is required in DebugSetReputationRequest", - )); - } else { - req.reputations - }; - - for rep in reps { - let addr = protos::from_bytes(&rep.address) - .map_err(|e| Status::invalid_argument(format!("Invalid address: {e}")))?; - - mempool.set_reputation(addr, rep.ops_seen, rep.ops_included); - } - - Ok(Response::new(DebugSetReputationResponse {})) - } - - async fn debug_dump_reputation( - &self, - request: Request, - ) -> Result> { - let req = request.into_inner(); - let mempool = self.get_mempool_for_entry_point(&req.entry_point)?; - - let reps = mempool.dump_reputation(); - Ok(Response::new(DebugDumpReputationResponse { - reputations: reps.into_iter().map(Into::into).collect(), - })) - } -} - -#[cfg(test)] -mod tests { - use tonic::Code; - - use super::*; - - const TEST_ADDRESS_ARR: [u8; 20] = [ - 0x11, 0xAB, 0xB0, 0x5d, 0x9A, 0xd3, 0x18, 0xbf, 0x65, 0x65, 0x26, 0x72, 0xB1, 0x3b, 0x1d, - 0xcb, 0x0E, 0x6D, 0x4a, 0x32, - ]; - - use crate::{ - common::{protos::op_pool::UserOperation, types, types::Entity}, - op_pool::{ - mempool::{error::MempoolResult, PoolOperation}, - reputation::Reputation, - }, - }; - - #[test] - fn test_check_entry_point() { - let pool = given_oppool(); - let result = pool.get_mempool_for_entry_point(&TEST_ADDRESS_ARR); - assert!(result.is_ok()); - } - - #[tokio::test] - async fn test_add_op_fails_with_mismatch_entry_point() { - let oppool = given_oppool(); - let request = Request::new(AddOpRequest { - entry_point: [0; 20].to_vec(), - op: None, - }); - - let result = oppool.add_op(request).await; - - assert!(result.is_err()); - let result = result.unwrap_err(); - assert_eq!(result.code(), Code::InvalidArgument); - } - - #[tokio::test] - async fn test_add_op_fails_with_null_uop() { - let oppool = given_oppool(); - let request = Request::new(AddOpRequest { - entry_point: TEST_ADDRESS_ARR.to_vec(), - op: None, - }); - - let result = oppool.add_op(request).await; - - assert!(result.is_err()); - let result = result.unwrap_err(); - assert_eq!(result.code(), Code::InvalidArgument); - } - - #[tokio::test] - async fn test_add_op_fails_with_bad_proto_op() { - let oppool = given_oppool(); - let request = Request::new(AddOpRequest { - entry_point: TEST_ADDRESS_ARR.to_vec(), - op: Some(UserOperation::default()), - }); - - let result = oppool.add_op(request).await; - - assert!(result.is_err()); - let result = result.unwrap_err(); - assert_eq!(result.code(), Code::InvalidArgument); - } - - fn given_oppool() -> OpPoolImpl { - OpPoolImpl::::new( - 1, - HashMap::from([(TEST_ADDRESS_ARR.into(), MockMempool::default().into())]), - ) - } - - pub struct MockMempool { - entry_point: Address, - } - - impl Default for MockMempool { - fn default() -> Self { - Self { - entry_point: TEST_ADDRESS_ARR.into(), - } - } - } - - #[async_trait] - impl Mempool for MockMempool { - fn entry_point(&self) -> Address { - self.entry_point - } - - async fn add_operation( - &self, - _origin: OperationOrigin, - _opp: types::UserOperation, - ) -> MempoolResult { - Ok(H256::zero()) - } - - fn remove_operations<'a>(&self, _hashes: impl IntoIterator) {} - - fn remove_entity(&self, _entity: Entity) {} - - fn best_operations(&self, _max: usize) -> Vec> { - vec![] - } - - fn all_operations(&self, _max: usize) -> Vec> { - vec![] - } - - fn clear(&self) {} - - fn dump_reputation(&self) -> Vec { - vec![] - } - - fn set_reputation(&self, _address: Address, _ops_seenn: u64, _ops_included: u64) {} - } -} diff --git a/src/op_pool/server/error.rs b/src/op_pool/server/error.rs new file mode 100644 index 000000000..607346b25 --- /dev/null +++ b/src/op_pool/server/error.rs @@ -0,0 +1,20 @@ +use crate::op_pool::mempool::error::MempoolError; + +#[derive(Debug, thiserror::Error)] +pub enum PoolServerError { + #[error(transparent)] + MempoolError(MempoolError), + #[error("Unexpected response from PoolServer")] + UnexpectedResponse, + #[error(transparent)] + Other(#[from] anyhow::Error), +} + +impl From for PoolServerError { + fn from(error: MempoolError) -> Self { + match error { + MempoolError::Other(e) => Self::Other(e), + _ => Self::MempoolError(error), + } + } +} diff --git a/src/op_pool/server/local/client.rs b/src/op_pool/server/local/client.rs new file mode 100644 index 000000000..b0fc26fc1 --- /dev/null +++ b/src/op_pool/server/local/client.rs @@ -0,0 +1,134 @@ +use ethers::types::{Address, H256}; +use tokio::sync::{mpsc, oneshot}; +use tonic::async_trait; + +use super::server::{ServerRequest, ServerRequestKind, ServerResponse}; +use crate::{ + common::types::{Entity, UserOperation}, + op_pool::{ + mempool::PoolOperation, + server::{error::PoolServerError, PoolClient, Reputation}, + PoolResult, + }, +}; + +#[derive(Clone, Debug)] +pub struct LocalPoolClient { + sender: mpsc::Sender, +} + +impl LocalPoolClient { + pub fn new(sender: mpsc::Sender) -> Self { + Self { sender } + } + + async fn send(&self, request: ServerRequestKind) -> PoolResult { + let (send, recv) = oneshot::channel(); + self.sender + .send(ServerRequest { + request, + response: send, + }) + .await + .map_err(|_| anyhow::anyhow!("LocalPoolServer closed"))?; + recv.await + .map_err(|_| anyhow::anyhow!("LocalPoolServer closed"))? + } +} + +#[async_trait] +impl PoolClient for LocalPoolClient { + async fn get_supported_entry_points(&self) -> PoolResult> { + let req = ServerRequestKind::GetSupportedEntryPoints; + let resp = self.send(req).await?; + match resp { + ServerResponse::GetSupportedEntryPoints { entry_points } => Ok(entry_points), + _ => Err(PoolServerError::UnexpectedResponse), + } + } + + async fn add_op(&self, entry_point: Address, op: UserOperation) -> PoolResult { + let req = ServerRequestKind::AddOp { entry_point, op }; + let resp = self.send(req).await?; + match resp { + ServerResponse::AddOp { hash } => Ok(hash), + _ => Err(PoolServerError::UnexpectedResponse), + } + } + + async fn get_ops(&self, entry_point: Address, max_ops: u64) -> PoolResult> { + let req = ServerRequestKind::GetOps { + entry_point, + max_ops, + }; + let resp = self.send(req).await?; + match resp { + ServerResponse::GetOps { ops } => Ok(ops), + _ => Err(PoolServerError::UnexpectedResponse), + } + } + + async fn remove_ops(&self, entry_point: Address, ops: Vec) -> PoolResult<()> { + let req = ServerRequestKind::RemoveOps { entry_point, ops }; + let resp = self.send(req).await?; + match resp { + ServerResponse::RemoveOps => Ok(()), + _ => Err(PoolServerError::UnexpectedResponse), + } + } + + async fn remove_entities(&self, entry_point: Address, entities: Vec) -> PoolResult<()> { + let req = ServerRequestKind::RemoveEntities { + entry_point, + entities, + }; + let resp = self.send(req).await?; + match resp { + ServerResponse::RemoveEntities => Ok(()), + _ => Err(PoolServerError::UnexpectedResponse), + } + } + + async fn debug_clear_state(&self) -> Result<(), PoolServerError> { + let req = ServerRequestKind::DebugClearState; + let resp = self.send(req).await?; + match resp { + ServerResponse::DebugClearState => Ok(()), + _ => Err(PoolServerError::UnexpectedResponse), + } + } + + async fn debug_dump_mempool(&self, entry_point: Address) -> PoolResult> { + let req = ServerRequestKind::DebugDumpMempool { entry_point }; + let resp = self.send(req).await?; + match resp { + ServerResponse::DebugDumpMempool { ops } => Ok(ops), + _ => Err(PoolServerError::UnexpectedResponse), + } + } + + async fn debug_set_reputations( + &self, + entry_point: Address, + reputations: Vec, + ) -> PoolResult<()> { + let req = ServerRequestKind::DebugSetReputations { + entry_point, + reputations, + }; + let resp = self.send(req).await?; + match resp { + ServerResponse::DebugSetReputations => Ok(()), + _ => Err(PoolServerError::UnexpectedResponse), + } + } + + async fn debug_dump_reputation(&self, entry_point: Address) -> PoolResult> { + let req = ServerRequestKind::DebugDumpReputation { entry_point }; + let resp = self.send(req).await?; + match resp { + ServerResponse::DebugDumpReputation { reputations } => Ok(reputations), + _ => Err(PoolServerError::UnexpectedResponse), + } + } +} diff --git a/src/op_pool/server/local/mod.rs b/src/op_pool/server/local/mod.rs new file mode 100644 index 000000000..24bf375cd --- /dev/null +++ b/src/op_pool/server/local/mod.rs @@ -0,0 +1,5 @@ +mod client; +mod server; + +pub use client::*; +pub use server::*; diff --git a/src/op_pool/server/local/server.rs b/src/op_pool/server/local/server.rs new file mode 100644 index 000000000..a6aa0adac --- /dev/null +++ b/src/op_pool/server/local/server.rs @@ -0,0 +1,240 @@ +use ethers::types::{Address, H256}; +use tokio::{ + sync::{mpsc, oneshot}, + task::JoinHandle, +}; +use tokio_util::sync::CancellationToken; + +use crate::{ + common::types::{Entity, UserOperation}, + op_pool::{ + mempool::{Mempool, MempoolGroup, OperationOrigin, PoolOperation}, + server::Reputation, + PoolResult, + }, +}; + +pub fn spawn_local_mempool_server( + mempool_runner: MempoolGroup, + receiver: mpsc::Receiver, + shutdown_token: CancellationToken, +) -> anyhow::Result>> { + let mut server = LocalPoolServer::new(receiver, mempool_runner); + let handle = tokio::spawn(async move { server.run(shutdown_token).await }); + Ok(handle) +} + +pub struct LocalPoolServer { + receiver: mpsc::Receiver, + mempools: MempoolGroup, +} + +impl LocalPoolServer +where + M: Mempool, +{ + pub fn new(receiver: mpsc::Receiver, mempools: MempoolGroup) -> Self { + Self { receiver, mempools } + } + + pub async fn run(&mut self, shutdown_token: CancellationToken) -> anyhow::Result<()> { + loop { + tokio::select! { + _ = shutdown_token.cancelled() => { + break; + } + Some(req) = self.receiver.recv() => { + let resp = match req.request { + ServerRequestKind::GetSupportedEntryPoints => { + Ok(ServerResponse::GetSupportedEntryPoints { + entry_points: self.mempools.get_supported_entry_points() + }) + }, + ServerRequestKind::AddOp { entry_point, op } => { + let pool = self.mempools.get_pool(entry_point)?; + tokio::spawn(async move { + let resp = match pool.add_operation(OperationOrigin::Local, op).await { + Ok(hash) => Ok(ServerResponse::AddOp { hash }), + Err(e) => Err(e.into()), + }; + if let Err(e) = req.response.send(resp) { + tracing::error!("Failed to send response: {:?}", e); + } + }); + continue; + }, + ServerRequestKind::GetOps { entry_point, max_ops } => { + match self.mempools.get_ops(entry_point, max_ops) { + Ok(ops) => Ok(ServerResponse::GetOps { ops }), + Err(e) => Err(e.into()), + } + }, + ServerRequestKind::RemoveOps { entry_point, ops } => { + match self.mempools.remove_ops(entry_point, &ops) { + Ok(_) => Ok(ServerResponse::RemoveOps), + Err(e) => Err(e.into()), + } + }, + ServerRequestKind::RemoveEntities { entry_point, entities } => { + match self.mempools.remove_entities(entry_point, &entities) { + Ok(_) => Ok(ServerResponse::RemoveOps), + Err(e) => Err(e.into()), + } + }, + ServerRequestKind::DebugClearState => { + match self.mempools.debug_clear_state() { + Ok(_) => Ok(ServerResponse::RemoveOps), + Err(e) => Err(e.into()), + } + }, + ServerRequestKind::DebugDumpMempool { entry_point } => { + match self.mempools.debug_dump_mempool(entry_point) { + Ok(ops) => Ok(ServerResponse::DebugDumpMempool { ops }), + Err(e) => Err(e.into()), + } + }, + ServerRequestKind::DebugSetReputations { entry_point, reputations } => { + match self.mempools.debug_set_reputations(entry_point, &reputations) { + Ok(_) => Ok(ServerResponse::DebugSetReputations), + Err(e) => Err(e.into()), + } + }, + ServerRequestKind::DebugDumpReputation { entry_point } => { + match self.mempools.debug_dump_reputation(entry_point) { + Ok(reputations) => Ok(ServerResponse::DebugDumpReputation { reputations }), + Err(e) => Err(e.into()), + } + } + }; + if let Err(e) = req.response.send(resp) { + tracing::error!("Failed to send response: {:?}", e); + } + } + } + } + + Ok(()) + } +} + +#[derive(Debug)] +pub struct ServerRequest { + pub request: ServerRequestKind, + pub response: oneshot::Sender>, +} + +#[derive(Clone, Debug)] +pub enum ServerRequestKind { + GetSupportedEntryPoints, + AddOp { + entry_point: Address, + op: UserOperation, + }, + GetOps { + entry_point: Address, + max_ops: u64, + }, + RemoveOps { + entry_point: Address, + ops: Vec, + }, + RemoveEntities { + entry_point: Address, + entities: Vec, + }, + DebugClearState, + DebugDumpMempool { + entry_point: Address, + }, + DebugSetReputations { + entry_point: Address, + reputations: Vec, + }, + DebugDumpReputation { + entry_point: Address, + }, +} + +#[derive(Clone, Debug)] +pub enum ServerResponse { + GetSupportedEntryPoints { entry_points: Vec
}, + AddOp { hash: H256 }, + GetOps { ops: Vec }, + RemoveOps, + RemoveEntities, + DebugClearState, + DebugDumpMempool { ops: Vec }, + DebugSetReputations, + DebugDumpReputation { reputations: Vec }, +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use super::*; + use crate::op_pool::{mempool::MockMempool, LocalPoolClient, PoolClient}; + + #[tokio::test] + async fn send_receive() { + let ep = Address::random(); + let mut mock_pool = MockMempool::new(); + mock_pool.expect_entry_point().returning(move || ep); + + let mempool_group = MempoolGroup::new(vec![Arc::new(mock_pool)]); + let (tx, rx) = mpsc::channel(1); + let shutdown_token = CancellationToken::new(); + let handle = spawn_local_mempool_server(mempool_group, rx, shutdown_token.clone()).unwrap(); + + let client = LocalPoolClient::new(tx); + let ret = client.get_supported_entry_points().await.unwrap(); + assert_eq!(ret, vec![ep]); + + shutdown_token.cancel(); + handle.await.unwrap().unwrap(); + } + + #[tokio::test] + async fn early_shutdown() { + let ep = Address::random(); + let mut mock_pool = MockMempool::new(); + mock_pool.expect_entry_point().returning(move || ep); + + let mempool_group = MempoolGroup::new(vec![Arc::new(mock_pool)]); + let (tx, rx) = mpsc::channel(1); + let shutdown_token = CancellationToken::new(); + let handle = spawn_local_mempool_server(mempool_group, rx, shutdown_token.clone()).unwrap(); + + shutdown_token.cancel(); + handle.await.unwrap().unwrap(); + + let client = LocalPoolClient::new(tx); + let ret = client.get_supported_entry_points().await; + assert!(ret.is_err()); + } + + #[tokio::test] + async fn add_op() { + let ep = Address::random(); + let mut mock_pool = MockMempool::new(); + let uo = UserOperation::default(); + let uo_hash = uo.op_hash(ep, 0); + + mock_pool.expect_entry_point().returning(move || ep); + mock_pool + .expect_add_operation() + .returning(move |_, _| Ok(uo_hash)); + + let mempool_group = MempoolGroup::new(vec![Arc::new(mock_pool)]); + let (tx, rx) = mpsc::channel(1); + let shutdown_token = CancellationToken::new(); + let handle = spawn_local_mempool_server(mempool_group, rx, shutdown_token.clone()).unwrap(); + + let client = LocalPoolClient::new(tx); + let ret = client.add_op(ep, uo).await.unwrap(); + assert_eq!(ret, uo_hash); + + shutdown_token.cancel(); + handle.await.unwrap().unwrap(); + } +} diff --git a/src/op_pool/server/mod.rs b/src/op_pool/server/mod.rs new file mode 100644 index 000000000..d3d219570 --- /dev/null +++ b/src/op_pool/server/mod.rs @@ -0,0 +1,57 @@ +mod error; +mod local; +mod remote; + +pub use error::PoolServerError; +use ethers::types::{Address, H256}; +pub use local::{spawn_local_mempool_server, LocalPoolClient, ServerRequest}; +#[cfg(test)] +use mockall::automock; +pub use remote::{connect_remote_pool_client, spawn_remote_mempool_server, RemotePoolClient}; +use tokio::sync::mpsc; +use tonic::async_trait; + +use super::{mempool::PoolOperation, Reputation}; +use crate::{ + common::types::{Entity, UserOperation}, + op_pool::LocalPoolServerRequest, +}; + +pub type Error = error::PoolServerError; +pub type PoolResult = std::result::Result; + +#[cfg_attr(test, automock)] +#[async_trait] +pub trait PoolClient: Send + Sync + 'static { + async fn get_supported_entry_points(&self) -> PoolResult>; + + async fn add_op(&self, entry_point: Address, op: UserOperation) -> PoolResult; + + async fn get_ops(&self, entry_point: Address, max_ops: u64) -> PoolResult>; + + async fn remove_ops(&self, entry_point: Address, ops: Vec) -> PoolResult<()>; + + async fn remove_entities(&self, entry_point: Address, entities: Vec) -> PoolResult<()>; + + async fn debug_clear_state(&self) -> PoolResult<()>; + + async fn debug_dump_mempool(&self, entry_point: Address) -> PoolResult>; + + async fn debug_set_reputations( + &self, + entry_point: Address, + reputations: Vec, + ) -> PoolResult<()>; + + async fn debug_dump_reputation(&self, entry_point: Address) -> PoolResult>; +} + +#[derive(Debug)] +pub enum PoolClientMode { + Local { + sender: mpsc::Sender, + }, + Remote { + url: String, + }, +} diff --git a/src/op_pool/server/remote/client.rs b/src/op_pool/server/remote/client.rs new file mode 100644 index 000000000..2afe7d53e --- /dev/null +++ b/src/op_pool/server/remote/client.rs @@ -0,0 +1,253 @@ +use anyhow::bail; +use ethers::types::{Address, H256}; +use tokio_util::sync::CancellationToken; +use tonic::{async_trait, transport::Channel}; + +use super::protos::{ + self, add_op_response, debug_clear_state_response, debug_dump_mempool_response, + debug_dump_reputation_response, debug_set_reputation_response, get_ops_response, + op_pool_client::OpPoolClient, remove_entities_response, remove_ops_response, AddOpRequest, + DebugClearStateRequest, DebugDumpMempoolRequest, DebugDumpReputationRequest, + DebugSetReputationRequest, GetOpsRequest, RemoveEntitiesRequest, RemoveOpsRequest, +}; +use crate::{ + common::{ + protos::{from_bytes, ConversionError}, + server::connect_with_retries, + types::{Entity, UserOperation}, + }, + op_pool::{ + mempool::{PoolOperation, Reputation}, + server::{error::PoolServerError, PoolClient}, + PoolResult, + }, +}; + +#[derive(Clone, Debug)] +pub struct RemotePoolClient { + op_pool_client: OpPoolClient, +} + +impl RemotePoolClient { + pub fn new(client: OpPoolClient) -> Self { + Self { + op_pool_client: client, + } + } +} + +#[async_trait] +impl PoolClient for RemotePoolClient { + async fn get_supported_entry_points(&self) -> PoolResult> { + Ok(self + .op_pool_client + .clone() + .get_supported_entry_points(protos::GetSupportedEntryPointsRequest {}) + .await? + .into_inner() + .entry_points + .into_iter() + .map(|ep| from_bytes(ep.as_slice())) + .collect::>()?) + } + + async fn add_op(&self, entry_point: Address, op: UserOperation) -> PoolResult { + let res = self + .op_pool_client + .clone() + .add_op(AddOpRequest { + entry_point: entry_point.as_bytes().to_vec(), + op: Some(protos::UserOperation::from(&op)), + }) + .await? + .into_inner() + .result; + + match res { + Some(add_op_response::Result::Success(s)) => Ok(H256::from_slice(&s.hash)), + Some(add_op_response::Result::Failure(f)) => Err(f.try_into()?), + None => Err(PoolServerError::Other(anyhow::anyhow!( + "should have received result from op pool" + )))?, + } + } + + async fn get_ops(&self, entry_point: Address, max_ops: u64) -> PoolResult> { + let res = self + .op_pool_client + .clone() + .get_ops(GetOpsRequest { + entry_point: entry_point.as_bytes().to_vec(), + max_ops, + }) + .await? + .into_inner() + .result; + + match res { + Some(get_ops_response::Result::Success(s)) => s + .ops + .into_iter() + .map(PoolOperation::try_from) + .map(|res| res.map_err(PoolServerError::from)) + .collect(), + Some(get_ops_response::Result::Failure(f)) => Err(f.try_into()?), + None => Err(PoolServerError::Other(anyhow::anyhow!( + "should have received result from op pool" + )))?, + } + } + + async fn remove_ops(&self, entry_point: Address, ops: Vec) -> PoolResult<()> { + let res = self + .op_pool_client + .clone() + .remove_ops(RemoveOpsRequest { + entry_point: entry_point.as_bytes().to_vec(), + hashes: ops.into_iter().map(|h| h.as_bytes().to_vec()).collect(), + }) + .await? + .into_inner() + .result; + + match res { + Some(remove_ops_response::Result::Success(_)) => Ok(()), + Some(remove_ops_response::Result::Failure(f)) => Err(f.try_into()?), + None => Err(PoolServerError::Other(anyhow::anyhow!( + "should have received result from op pool" + )))?, + } + } + + async fn remove_entities(&self, entry_point: Address, entities: Vec) -> PoolResult<()> { + let res = self + .op_pool_client + .clone() + .remove_entities(RemoveEntitiesRequest { + entry_point: entry_point.as_bytes().to_vec(), + entities: entities.iter().map(protos::Entity::from).collect(), + }) + .await? + .into_inner() + .result; + + match res { + Some(remove_entities_response::Result::Success(_)) => Ok(()), + Some(remove_entities_response::Result::Failure(f)) => Err(f.try_into()?), + None => Err(PoolServerError::Other(anyhow::anyhow!( + "should have received result from op pool" + )))?, + } + } + + async fn debug_clear_state(&self) -> PoolResult<()> { + let res = self + .op_pool_client + .clone() + .debug_clear_state(DebugClearStateRequest {}) + .await? + .into_inner() + .result; + + match res { + Some(debug_clear_state_response::Result::Success(_)) => Ok(()), + Some(debug_clear_state_response::Result::Failure(f)) => Err(f.try_into()?), + None => Err(PoolServerError::Other(anyhow::anyhow!( + "should have received result from op pool" + )))?, + } + } + + async fn debug_dump_mempool(&self, entry_point: Address) -> PoolResult> { + let res = self + .op_pool_client + .clone() + .debug_dump_mempool(DebugDumpMempoolRequest { + entry_point: entry_point.as_bytes().to_vec(), + }) + .await? + .into_inner() + .result; + + match res { + Some(debug_dump_mempool_response::Result::Success(s)) => s + .ops + .into_iter() + .map(PoolOperation::try_from) + .map(|res| res.map_err(PoolServerError::from)) + .collect(), + Some(debug_dump_mempool_response::Result::Failure(f)) => Err(f.try_into()?), + None => Err(PoolServerError::Other(anyhow::anyhow!( + "should have received result from op pool" + )))?, + } + } + + async fn debug_set_reputations( + &self, + entry_point: Address, + reputations: Vec, + ) -> PoolResult<()> { + let res = self + .op_pool_client + .clone() + .debug_set_reputation(DebugSetReputationRequest { + entry_point: entry_point.as_bytes().to_vec(), + reputations: reputations + .into_iter() + .map(protos::Reputation::from) + .collect(), + }) + .await? + .into_inner() + .result; + + match res { + Some(debug_set_reputation_response::Result::Success(_)) => Ok(()), + Some(debug_set_reputation_response::Result::Failure(f)) => Err(f.try_into()?), + None => Err(PoolServerError::Other(anyhow::anyhow!( + "should have received result from op pool" + )))?, + } + } + + async fn debug_dump_reputation(&self, entry_point: Address) -> PoolResult> { + let res = self + .op_pool_client + .clone() + .debug_dump_reputation(DebugDumpReputationRequest { + entry_point: entry_point.as_bytes().to_vec(), + }) + .await? + .into_inner() + .result; + + match res { + Some(debug_dump_reputation_response::Result::Success(s)) => s + .reputations + .into_iter() + .map(Reputation::try_from) + .map(|res| res.map_err(PoolServerError::from)) + .collect(), + Some(debug_dump_reputation_response::Result::Failure(f)) => Err(f.try_into()?), + None => Err(PoolServerError::Other(anyhow::anyhow!( + "should have received result from op pool" + )))?, + } + } +} + +pub async fn connect_remote_pool_client( + op_pool_url: &str, + shutdown_token: CancellationToken, +) -> anyhow::Result { + tokio::select! { + _ = shutdown_token.cancelled() => { + tracing::error!("bailing from connecting client, server shutting down"); + bail!("Server shutting down") + } + res = connect_with_retries("op pool from builder", op_pool_url, OpPoolClient::connect) => { + Ok(RemotePoolClient::new(res?)) + } + } +} diff --git a/src/op_pool/error.rs b/src/op_pool/server/remote/error.rs similarity index 89% rename from src/op_pool/error.rs rename to src/op_pool/server/remote/error.rs index 0082d7e75..506119161 100644 --- a/src/op_pool/error.rs +++ b/src/op_pool/server/remote/error.rs @@ -1,32 +1,66 @@ use anyhow::{bail, Context}; use ethers::types::Opcode; -use super::mempool::error::MempoolError; -use crate::common::{ - precheck::PrecheckViolation, - protos::{ - from_bytes, - op_pool::{ - mempool_error, precheck_violation_error, simulation_violation_error, - AccessedUndeployedContract, AggregatorValidationFailed, CallGasLimitTooLow, - CallHadValue, CalledBannedEntryPointMethod, CodeHashChanged, DidNotRevert, - DiscardedOnInsertError, Entity, EntityThrottledError, EntityType, - ExistingSenderWithInitCode, FactoryCalledCreate2Twice, FactoryIsNotContract, - InitCodeTooShort, InvalidSignature, InvalidStorageAccess, MaxFeePerGasTooLow, - MaxOperationsReachedError, MaxPriorityFeePerGasTooLow, - MempoolError as ProtoMempoolError, NotStaked, OutOfGas, PaymasterDepositTooLow, - PaymasterIsNotContract, PaymasterTooShort, PreVerificationGasTooLow, - PrecheckViolationError as ProtoPrecheckViolationError, ReplacementUnderpricedError, - SenderFundsTooLow, SenderIsNotContractAndNoInitCode, - SimulationViolationError as ProtoSimulationViolationError, UnintendedRevert, - UnintendedRevertWithMessage, UnsupportedAggregatorError, UsedForbiddenOpcode, - UsedForbiddenPrecompile, VerificationGasLimitTooHigh, WrongNumberOfPhases, - }, - to_le_bytes, +use super::protos::{ + mempool_error, precheck_violation_error, simulation_violation_error, + AccessedUndeployedContract, AggregatorValidationFailed, CallGasLimitTooLow, CallHadValue, + CalledBannedEntryPointMethod, CodeHashChanged, DidNotRevert, DiscardedOnInsertError, Entity, + EntityThrottledError, EntityType, ExistingSenderWithInitCode, FactoryCalledCreate2Twice, + FactoryIsNotContract, InitCodeTooShort, InvalidSignature, InvalidStorageAccess, + MaxFeePerGasTooLow, MaxOperationsReachedError, MaxPriorityFeePerGasTooLow, + MempoolError as ProtoMempoolError, NotStaked, OutOfGas, PaymasterDepositTooLow, + PaymasterIsNotContract, PaymasterTooShort, PreVerificationGasTooLow, + PrecheckViolationError as ProtoPrecheckViolationError, ReplacementUnderpricedError, + SenderFundsTooLow, SenderIsNotContractAndNoInitCode, + SimulationViolationError as ProtoSimulationViolationError, UnintendedRevert, + UnintendedRevertWithMessage, UnknownEntryPointError, UnsupportedAggregatorError, + UsedForbiddenOpcode, UsedForbiddenPrecompile, VerificationGasLimitTooHigh, WrongNumberOfPhases, +}; +use crate::{ + common::{ + precheck::PrecheckViolation, + protos::{from_bytes, to_le_bytes, ConversionError}, + simulation::{SimulationViolation, StorageSlot}, }, - simulation::{SimulationViolation, StorageSlot}, + op_pool::{mempool::error::MempoolError, server::error::PoolServerError}, }; +impl From for PoolServerError { + fn from(value: tonic::Status) -> Self { + PoolServerError::Other(anyhow::anyhow!(value.to_string())) + } +} + +impl From for PoolServerError { + fn from(value: ConversionError) -> Self { + PoolServerError::Other(anyhow::anyhow!(value.to_string())) + } +} + +impl TryFrom for PoolServerError { + type Error = anyhow::Error; + + fn try_from(value: ProtoMempoolError) -> Result { + Ok(PoolServerError::MempoolError(value.try_into()?)) + } +} + +impl From for ProtoMempoolError { + fn from(value: PoolServerError) -> Self { + match value { + PoolServerError::MempoolError(e) => e.into(), + PoolServerError::UnexpectedResponse => ProtoMempoolError { + error: Some(mempool_error::Error::Internal( + "unexpected response from pool server".to_string(), + )), + }, + PoolServerError::Other(e) => ProtoMempoolError { + error: Some(mempool_error::Error::Internal(e.to_string())), + }, + } + } +} + impl TryFrom for MempoolError { type Error = anyhow::Error; @@ -58,7 +92,10 @@ impl TryFrom for MempoolError { Some(mempool_error::Error::UnsupportedAggregator(e)) => { MempoolError::UnsupportedAggregator(from_bytes(&e.aggregator_address)?) } - None => bail!("unknown proto mempool error"), + Some(mempool_error::Error::UnknownEntryPoint(e)) => { + MempoolError::UnknownEntryPoint(from_bytes(&e.entry_point)?) + } + _ => bail!("unknown proto mempool error"), }) } } @@ -110,6 +147,13 @@ impl From for ProtoMempoolError { }, )), }, + MempoolError::UnknownEntryPoint(entry_point) => ProtoMempoolError { + error: Some(mempool_error::Error::UnknownEntryPoint( + UnknownEntryPointError { + entry_point: entry_point.as_bytes().to_vec(), + }, + )), + }, } } } diff --git a/src/op_pool/server/remote/mod.rs b/src/op_pool/server/remote/mod.rs new file mode 100644 index 000000000..7459d6fae --- /dev/null +++ b/src/op_pool/server/remote/mod.rs @@ -0,0 +1,7 @@ +mod client; +mod error; +mod protos; +mod server; + +pub use client::*; +pub use server::*; diff --git a/src/op_pool/server/remote/protos.rs b/src/op_pool/server/remote/protos.rs new file mode 100644 index 000000000..df850f813 --- /dev/null +++ b/src/op_pool/server/remote/protos.rs @@ -0,0 +1,210 @@ +use anyhow::Context; +use ethers::types::{Address, H256}; + +use crate::{ + common::{ + protos::{from_bytes, to_le_bytes, ConversionError}, + types::{ + Entity as CommonEntity, EntityType as CommonEntityType, + UserOperation as RpcUserOperation, ValidTimeRange, + }, + }, + op_pool::{ + mempool::{Reputation as PoolReputation, ReputationStatus as PoolReputationStatus}, + PoolOperation, + }, +}; + +tonic::include_proto!("op_pool"); + +pub const OP_POOL_FILE_DESCRIPTOR_SET: &[u8] = + tonic::include_file_descriptor_set!("op_pool_descriptor"); + +impl From<&RpcUserOperation> for UserOperation { + fn from(op: &RpcUserOperation) -> Self { + UserOperation { + sender: op.sender.0.to_vec(), + nonce: to_le_bytes(op.nonce), + init_code: op.init_code.to_vec(), + call_data: op.call_data.to_vec(), + call_gas_limit: to_le_bytes(op.call_gas_limit), + verification_gas_limit: to_le_bytes(op.verification_gas_limit), + pre_verification_gas: to_le_bytes(op.pre_verification_gas), + max_fee_per_gas: to_le_bytes(op.max_fee_per_gas), + max_priority_fee_per_gas: to_le_bytes(op.max_priority_fee_per_gas), + paymaster_and_data: op.paymaster_and_data.to_vec(), + signature: op.signature.to_vec(), + } + } +} + +impl TryFrom for RpcUserOperation { + type Error = ConversionError; + + fn try_from(op: UserOperation) -> Result { + Ok(RpcUserOperation { + sender: from_bytes(&op.sender)?, + nonce: from_bytes(&op.nonce)?, + init_code: op.init_code.into(), + call_data: op.call_data.into(), + call_gas_limit: from_bytes(&op.call_gas_limit)?, + verification_gas_limit: from_bytes(&op.verification_gas_limit)?, + pre_verification_gas: from_bytes(&op.pre_verification_gas)?, + max_fee_per_gas: from_bytes(&op.max_fee_per_gas)?, + max_priority_fee_per_gas: from_bytes(&op.max_priority_fee_per_gas)?, + paymaster_and_data: op.paymaster_and_data.into(), + signature: op.signature.into(), + }) + } +} + +impl TryFrom for CommonEntityType { + type Error = ConversionError; + + fn try_from(entity: EntityType) -> Result { + match entity { + EntityType::Unspecified => Err(ConversionError::InvalidEnumValue(entity as i32)), + EntityType::Account => Ok(CommonEntityType::Account), + EntityType::Paymaster => Ok(CommonEntityType::Paymaster), + EntityType::Aggregator => Ok(CommonEntityType::Aggregator), + EntityType::Factory => Ok(CommonEntityType::Factory), + } + } +} + +impl From for EntityType { + fn from(entity: CommonEntityType) -> Self { + match entity { + CommonEntityType::Account => EntityType::Account, + CommonEntityType::Paymaster => EntityType::Paymaster, + CommonEntityType::Aggregator => EntityType::Aggregator, + CommonEntityType::Factory => EntityType::Factory, + } + } +} + +impl TryFrom<&Entity> for CommonEntity { + type Error = ConversionError; + + fn try_from(entity: &Entity) -> Result { + Ok(CommonEntity { + kind: EntityType::from_i32(entity.kind) + .ok_or(ConversionError::InvalidEnumValue(entity.kind))? + .try_into()?, + address: from_bytes(&entity.address)?, + }) + } +} + +impl From<&CommonEntity> for Entity { + fn from(entity: &CommonEntity) -> Self { + Entity { + kind: EntityType::from(entity.kind).into(), + address: entity.address.as_bytes().to_vec(), + } + } +} + +impl From for ReputationStatus { + fn from(status: PoolReputationStatus) -> Self { + match status { + PoolReputationStatus::Ok => ReputationStatus::Ok, + PoolReputationStatus::Throttled => ReputationStatus::Throttled, + PoolReputationStatus::Banned => ReputationStatus::Banned, + } + } +} + +impl From for Reputation { + fn from(rep: PoolReputation) -> Self { + Reputation { + address: rep.address.as_bytes().to_vec(), + status: ReputationStatus::from(rep.status).into(), + ops_seen: rep.ops_seen, + ops_included: rep.ops_included, + } + } +} + +impl TryFrom for PoolReputationStatus { + type Error = ConversionError; + + fn try_from(status: i32) -> Result { + match status { + x if x == ReputationStatus::Ok as i32 => Ok(Self::Ok), + x if x == ReputationStatus::Throttled as i32 => Ok(Self::Throttled), + x if x == ReputationStatus::Banned as i32 => Ok(Self::Banned), + _ => Err(ConversionError::InvalidEnumValue(status)), + } + } +} + +impl TryFrom for PoolReputation { + type Error = ConversionError; + + fn try_from(op: Reputation) -> Result { + Ok(Self { + address: from_bytes(&op.address)?, + status: PoolReputationStatus::try_from(op.status)?, + ops_seen: op.ops_seen, + ops_included: op.ops_included, + }) + } +} + +impl From<&PoolOperation> for MempoolOp { + fn from(op: &PoolOperation) -> Self { + MempoolOp { + uo: Some(UserOperation::from(&op.uo)), + aggregator: op.aggregator.map_or(vec![], |a| a.as_bytes().to_vec()), + valid_after: op.valid_time_range.valid_after.seconds_since_epoch(), + valid_until: op.valid_time_range.valid_until.seconds_since_epoch(), + expected_code_hash: op.expected_code_hash.as_bytes().to_vec(), + sim_block_hash: op.sim_block_hash.as_bytes().to_vec(), + entities_needing_stake: op + .entities_needing_stake + .iter() + .map(|e| EntityType::from(*e).into()) + .collect(), + account_is_staked: op.account_is_staked, + } + } +} + +pub const MISSING_USER_OP_ERR_STR: &str = "Mempool op should contain user operation"; +impl TryFrom for PoolOperation { + type Error = anyhow::Error; + + fn try_from(op: MempoolOp) -> Result { + let uo = op.uo.context(MISSING_USER_OP_ERR_STR)?.try_into()?; + + let aggregator: Option
= if op.aggregator.is_empty() { + None + } else { + Some(from_bytes(&op.aggregator)?) + }; + + let valid_time_range = ValidTimeRange::new(op.valid_after.into(), op.valid_until.into()); + + let expected_code_hash = H256::from_slice(&op.expected_code_hash); + let sim_block_hash = H256::from_slice(&op.sim_block_hash); + let entities_needing_stake = op + .entities_needing_stake + .into_iter() + .map(|e| { + let pe = EntityType::from_i32(e).ok_or(ConversionError::InvalidEnumValue(e))?; + pe.try_into() + }) + .collect::, ConversionError>>()?; + + Ok(PoolOperation { + uo, + aggregator, + valid_time_range, + expected_code_hash, + entities_needing_stake, + sim_block_hash, + account_is_staked: op.account_is_staked, + }) + } +} diff --git a/src/op_pool/server/remote/server.rs b/src/op_pool/server/remote/server.rs new file mode 100644 index 000000000..6390ffd82 --- /dev/null +++ b/src/op_pool/server/remote/server.rs @@ -0,0 +1,311 @@ +use std::net::SocketAddr; + +use ethers::types::{Address, H256}; +use tokio::task::JoinHandle; +use tokio_util::sync::CancellationToken; +use tonic::{async_trait, transport::Server, Request, Response, Result, Status}; + +use super::protos::{ + add_op_response, debug_clear_state_response, debug_dump_mempool_response, + debug_dump_reputation_response, debug_set_reputation_response, get_ops_response, + op_pool_server::{OpPool, OpPoolServer}, + remove_entities_response, remove_ops_response, AddOpRequest, AddOpResponse, AddOpSuccess, + DebugClearStateRequest, DebugClearStateResponse, DebugClearStateSuccess, + DebugDumpMempoolRequest, DebugDumpMempoolResponse, DebugDumpMempoolSuccess, + DebugDumpReputationRequest, DebugDumpReputationResponse, DebugDumpReputationSuccess, + DebugSetReputationRequest, DebugSetReputationResponse, DebugSetReputationSuccess, + GetOpsRequest, GetOpsResponse, GetOpsSuccess, GetSupportedEntryPointsRequest, + GetSupportedEntryPointsResponse, MempoolOp, RemoveEntitiesRequest, RemoveEntitiesResponse, + RemoveEntitiesSuccess, RemoveOpsRequest, RemoveOpsResponse, RemoveOpsSuccess, + OP_POOL_FILE_DESCRIPTOR_SET, +}; +use crate::{ + common::{grpc::metrics::GrpcMetricsLayer, protos::from_bytes, types::Entity}, + op_pool::mempool::{Mempool, MempoolGroup, OperationOrigin, Reputation}, +}; + +pub async fn spawn_remote_mempool_server( + chain_id: u64, + mempool_runner: MempoolGroup, + addr: SocketAddr, + shutdown_token: CancellationToken, +) -> anyhow::Result>> { + // gRPC server + let op_pool_server = OpPoolServer::new(OpPoolImpl::new(chain_id, mempool_runner)); + let reflection_service = tonic_reflection::server::Builder::configure() + .register_encoded_file_descriptor_set(OP_POOL_FILE_DESCRIPTOR_SET) + .build()?; + + // health service + let (mut health_reporter, health_service) = tonic_health::server::health_reporter(); + health_reporter + .set_serving::>>() + .await; + + let metrics_layer = GrpcMetricsLayer::new("op_pool".to_string()); + let handle = tokio::spawn(async move { + Server::builder() + .layer(metrics_layer) + .add_service(op_pool_server) + .add_service(reflection_service) + .add_service(health_service) + .serve_with_shutdown(addr, async move { shutdown_token.cancelled().await }) + .await + .map_err(|err| anyhow::anyhow!("Server error: {err:?}")) + }); + Ok(handle) +} + +struct OpPoolImpl { + chain_id: u64, + mempool_runner: MempoolGroup, +} + +impl OpPoolImpl +where + M: Mempool, +{ + pub fn new(chain_id: u64, mempool_runner: MempoolGroup) -> Self { + Self { + chain_id, + mempool_runner, + } + } + + fn get_entry_point(&self, req_entry_point: &[u8]) -> Result
{ + from_bytes(req_entry_point) + .map_err(|e| Status::invalid_argument(format!("Invalid entry point: {e}"))) + } +} + +#[async_trait] +impl OpPool for OpPoolImpl +where + M: Mempool + 'static, +{ + async fn get_supported_entry_points( + &self, + _request: Request, + ) -> Result> { + Ok(Response::new(GetSupportedEntryPointsResponse { + chain_id: self.chain_id, + entry_points: self + .mempool_runner + .get_supported_entry_points() + .into_iter() + .map(|ep| ep.as_bytes().to_vec()) + .collect(), + })) + } + + async fn add_op(&self, request: Request) -> Result> { + let req = request.into_inner(); + let ep = self.get_entry_point(&req.entry_point)?; + + let proto_op = req + .op + .ok_or_else(|| Status::invalid_argument("Operation is required in AddOpRequest"))?; + let uo = proto_op.try_into().map_err(|e| { + Status::invalid_argument(format!("Failed to convert to UserOperation: {e}")) + })?; + + let resp = match self + .mempool_runner + .add_op(ep, uo, OperationOrigin::Local) + .await + { + Ok(hash) => AddOpResponse { + result: Some(add_op_response::Result::Success(AddOpSuccess { + hash: hash.as_bytes().to_vec(), + })), + }, + Err(error) => AddOpResponse { + result: Some(add_op_response::Result::Failure(error.into())), + }, + }; + + Ok(Response::new(resp)) + } + + async fn get_ops(&self, request: Request) -> Result> { + let req = request.into_inner(); + let ep = self.get_entry_point(&req.entry_point)?; + + let resp = match self.mempool_runner.get_ops(ep, req.max_ops) { + Ok(ops) => GetOpsResponse { + result: Some(get_ops_response::Result::Success(GetOpsSuccess { + ops: ops.iter().map(MempoolOp::from).collect(), + })), + }, + Err(error) => GetOpsResponse { + result: Some(get_ops_response::Result::Failure(error.into())), + }, + }; + + Ok(Response::new(resp)) + } + + async fn remove_ops( + &self, + request: Request, + ) -> Result> { + let req = request.into_inner(); + let ep = self.get_entry_point(&req.entry_point)?; + + let hashes: Vec = req + .hashes + .into_iter() + .map(|h| { + if h.len() != 32 { + return Err(Status::invalid_argument("Hash must be 32 bytes long")); + } + Ok(H256::from_slice(&h)) + }) + .collect::, _>>()?; + + let resp = match self.mempool_runner.remove_ops(ep, &hashes) { + Ok(_) => RemoveOpsResponse { + result: Some(remove_ops_response::Result::Success(RemoveOpsSuccess {})), + }, + Err(error) => RemoveOpsResponse { + result: Some(remove_ops_response::Result::Failure(error.into())), + }, + }; + + Ok(Response::new(resp)) + } + + async fn remove_entities( + &self, + request: Request, + ) -> Result> { + let req = request.into_inner(); + let ep = self.get_entry_point(&req.entry_point)?; + let entities = req + .entities + .iter() + .map(|et| et.try_into()) + .collect::, _>>() + .map_err(|e| Status::internal(format!("Failed to convert to proto entity: {e}")))?; + + self.mempool_runner + .remove_entities(ep, &entities) + .map_err(|e| Status::internal(e.to_string()))?; + + let resp = match self.mempool_runner.remove_entities(ep, &entities) { + Ok(_) => RemoveEntitiesResponse { + result: Some(remove_entities_response::Result::Success( + RemoveEntitiesSuccess {}, + )), + }, + Err(error) => RemoveEntitiesResponse { + result: Some(remove_entities_response::Result::Failure(error.into())), + }, + }; + + Ok(Response::new(resp)) + } + + async fn debug_clear_state( + &self, + _request: Request, + ) -> Result> { + let resp = match self.mempool_runner.debug_clear_state() { + Ok(_) => DebugClearStateResponse { + result: Some(debug_clear_state_response::Result::Success( + DebugClearStateSuccess {}, + )), + }, + Err(error) => DebugClearStateResponse { + result: Some(debug_clear_state_response::Result::Failure(error.into())), + }, + }; + + Ok(Response::new(resp)) + } + + async fn debug_dump_mempool( + &self, + request: Request, + ) -> Result> { + let req = request.into_inner(); + let ep = self.get_entry_point(&req.entry_point)?; + + let resp = match self.mempool_runner.debug_dump_mempool(ep) { + Ok(ops) => DebugDumpMempoolResponse { + result: Some(debug_dump_mempool_response::Result::Success( + DebugDumpMempoolSuccess { + ops: ops.iter().map(MempoolOp::from).collect(), + }, + )), + }, + Err(error) => DebugDumpMempoolResponse { + result: Some(debug_dump_mempool_response::Result::Failure(error.into())), + }, + }; + + Ok(Response::new(resp)) + } + + async fn debug_set_reputation( + &self, + request: Request, + ) -> Result> { + let req = request.into_inner(); + let ep = self.get_entry_point(&req.entry_point)?; + + let reps = if req.reputations.is_empty() { + return Err(Status::invalid_argument( + "Reputation is required in DebugSetReputationRequest", + )); + } else { + req.reputations + }; + + let reps = reps + .into_iter() + .map(|r| r.try_into()) + .collect::, _>>() + .map_err(|e| { + Status::internal(format!("Failed to convert from proto reputation {e}")) + })?; + + let resp = match self.mempool_runner.debug_set_reputations(ep, &reps) { + Ok(_) => DebugSetReputationResponse { + result: Some(debug_set_reputation_response::Result::Success( + DebugSetReputationSuccess {}, + )), + }, + Err(error) => DebugSetReputationResponse { + result: Some(debug_set_reputation_response::Result::Failure(error.into())), + }, + }; + + Ok(Response::new(resp)) + } + + async fn debug_dump_reputation( + &self, + request: Request, + ) -> Result> { + let req = request.into_inner(); + let ep = self.get_entry_point(&req.entry_point)?; + + let resp = match self.mempool_runner.debug_dump_reputation(ep) { + Ok(reps) => DebugDumpReputationResponse { + result: Some(debug_dump_reputation_response::Result::Success( + DebugDumpReputationSuccess { + reputations: reps.into_iter().map(Into::into).collect(), + }, + )), + }, + Err(error) => DebugDumpReputationResponse { + result: Some(debug_dump_reputation_response::Result::Failure( + error.into(), + )), + }, + }; + + Ok(Response::new(resp)) + } +} diff --git a/src/op_pool/task.rs b/src/op_pool/task.rs index 83e4e8ae1..8e7c81912 100644 --- a/src/op_pool/task.rs +++ b/src/op_pool/task.rs @@ -1,49 +1,58 @@ -use std::{sync::Arc, time::Duration}; +use std::{net::SocketAddr, sync::Arc, time::Duration}; use anyhow::{bail, Context}; use ethers::providers::{ Http, HttpRateLimitRetryPolicy, JsonRpcClient, Provider, RetryClientBuilder, }; use futures::future; -use tokio::{sync::broadcast, task::JoinHandle, try_join}; -use tokio_util::sync::CancellationToken; -use tonic::{ - async_trait, - transport::{NamedService, Server}, +use tokio::{ + sync::{broadcast, mpsc}, + task::JoinHandle, + try_join, }; -use tonic_health::server::HealthReporter; +use tokio_util::sync::CancellationToken; +use tonic::async_trait; use url::Url; +use super::{ + mempool::{HourlyMovingAverageReputation, PoolConfig, ReputationParams}, + server::ServerRequest, +}; use crate::{ common::{ contracts::i_entry_point::IEntryPoint, emit::WithEntryPoint, - eth, - grpc::metrics::GrpcMetricsLayer, - handle, + eth, handle, handle::Task, precheck::{Prechecker, PrecheckerImpl}, - protos::op_pool::{op_pool_server::OpPoolServer, OP_POOL_FILE_DESCRIPTOR_SET}, simulation::{Simulator, SimulatorImpl}, }, op_pool::{ chain::{self, Chain, ChainUpdate}, emit::OpPoolEvent, - mempool::{uo_pool::UoPool, Mempool, PoolConfig}, - reputation::{HourlyMovingAverageReputation, ReputationParams}, - server::OpPoolImpl, + mempool::{uo_pool::UoPool, MempoolGroup}, + server::{spawn_local_mempool_server, spawn_remote_mempool_server}, }, }; +#[derive(Debug)] +pub enum PoolServerMode { + Local { + receiver: Option>, + }, + Remote { + addr: SocketAddr, + }, +} + #[derive(Debug)] pub struct Args { - pub port: u16, - pub host: String, pub http_url: String, pub http_poll_interval: Duration, pub chain_id: u64, pub chain_history_size: u64, pub pool_configs: Vec, + pub server_mode: PoolServerMode, } #[derive(Debug)] @@ -54,10 +63,8 @@ pub struct PoolTask { #[async_trait] impl Task for PoolTask { - async fn run(&self, shutdown_token: CancellationToken) -> anyhow::Result<()> { - let addr = format!("{}:{}", self.args.host, self.args.port).parse()?; + async fn run(&mut self, shutdown_token: CancellationToken) -> anyhow::Result<()> { let chain_id = self.args.chain_id; - tracing::info!("Starting server on {addr}"); tracing::info!("Chain id: {chain_id}"); tracing::info!("Http url: {:?}", self.args.http_url); @@ -106,10 +113,6 @@ impl Task for PoolTask { mempools.push(pool); mempool_handles.push(handle); } - let mempool_map = mempools - .into_iter() - .map(|mp| (mp.entry_point(), mp)) - .collect(); // handle to wait for mempools to terminate let mempool_handle = tokio::spawn(async move { @@ -121,27 +124,24 @@ impl Task for PoolTask { .context("should have joined mempool handles") }); - // gRPC server - let op_pool_server = OpPoolServer::new(OpPoolImpl::new(chain_id, mempool_map)); - let reflection_service = tonic_reflection::server::Builder::configure() - .register_encoded_file_descriptor_set(OP_POOL_FILE_DESCRIPTOR_SET) - .build()?; - - // health service - let (mut health_reporter, health_service) = tonic_health::server::health_reporter(); - Self::set_serving(&mut health_reporter, &op_pool_server).await; - - let metrics_layer = GrpcMetricsLayer::new("op_pool".to_string()); - let server_handle = tokio::spawn(async move { - Server::builder() - .layer(metrics_layer) - .add_service(op_pool_server) - .add_service(reflection_service) - .add_service(health_service) - .serve_with_shutdown(addr, async move { shutdown_token.cancelled().await }) - .await - .map_err(|err| anyhow::anyhow!("Server error: {err:?}")) - }); + let mempool_group = MempoolGroup::new(mempools); + let server_handle = match &mut self.args.server_mode { + PoolServerMode::Local { ref mut receiver } => { + let recv = receiver + .take() + .context("should have local server message receiver")?; + spawn_local_mempool_server(mempool_group, recv, shutdown_token.clone())? + } + PoolServerMode::Remote { addr } => { + spawn_remote_mempool_server( + self.args.chain_id, + mempool_group, + *addr, + shutdown_token.clone(), + ) + .await? + } + }; tracing::info!("Started op_pool"); @@ -222,8 +222,4 @@ impl PoolTask { Ok((mp, handle)) } - - async fn set_serving(reporter: &mut HealthReporter, _service: &S) { - reporter.set_serving::().await; - } } diff --git a/src/op_pool/types.rs b/src/op_pool/types.rs deleted file mode 100644 index dd777207e..000000000 --- a/src/op_pool/types.rs +++ /dev/null @@ -1,176 +0,0 @@ -use anyhow::Context; -use ethers::types::{Address, H256}; - -use super::{ - mempool::PoolOperation, - reputation::{Reputation, ReputationStatus}, -}; -use crate::common::{ - protos::{ - self, - op_pool::{ - EntityType as ProtoEntityType, MempoolOp, Reputation as ProtoReputation, - ReputationStatus as ProtoReputationStatus, UserOperation, - }, - ConversionError, - }, - types::ValidTimeRange, -}; - -impl TryFrom<&PoolOperation> for MempoolOp { - type Error = anyhow::Error; - - fn try_from(op: &PoolOperation) -> Result { - Ok(MempoolOp { - uo: Some(UserOperation::from(&op.uo)), - aggregator: op.aggregator.map_or(vec![], |a| a.as_bytes().to_vec()), - valid_after: op.valid_time_range.valid_after.seconds_since_epoch(), - valid_until: op.valid_time_range.valid_until.seconds_since_epoch(), - expected_code_hash: op.expected_code_hash.as_bytes().to_vec(), - sim_block_hash: op.sim_block_hash.as_bytes().to_vec(), - entities_needing_stake: op - .entities_needing_stake - .iter() - .map(|e| ProtoEntityType::from(*e).into()) - .collect(), - account_is_staked: op.account_is_staked, - }) - } -} - -pub const MISSING_USER_OP_ERR_STR: &str = "Mempool op should contain user operation"; -impl TryFrom for PoolOperation { - type Error = anyhow::Error; - - fn try_from(op: MempoolOp) -> Result { - let uo = op.uo.context(MISSING_USER_OP_ERR_STR)?.try_into()?; - - let aggregator: Option
= if op.aggregator.is_empty() { - None - } else { - Some(protos::from_bytes(&op.aggregator)?) - }; - - let valid_time_range = ValidTimeRange::new(op.valid_after.into(), op.valid_until.into()); - - let expected_code_hash = H256::from_slice(&op.expected_code_hash); - let sim_block_hash = H256::from_slice(&op.sim_block_hash); - let entities_needing_stake = op - .entities_needing_stake - .into_iter() - .map(|e| { - let pe = ProtoEntityType::from_i32(e).ok_or(ConversionError::InvalidEntity(e))?; - pe.try_into() - }) - .collect::, ConversionError>>()?; - - Ok(PoolOperation { - uo, - aggregator, - valid_time_range, - expected_code_hash, - entities_needing_stake, - sim_block_hash, - account_is_staked: op.account_is_staked, - }) - } -} - -impl From for ProtoReputationStatus { - fn from(status: ReputationStatus) -> Self { - match status { - ReputationStatus::Ok => ProtoReputationStatus::Ok, - ReputationStatus::Throttled => ProtoReputationStatus::Throttled, - ReputationStatus::Banned => ProtoReputationStatus::Banned, - } - } -} - -impl From for ProtoReputation { - fn from(rep: Reputation) -> Self { - ProtoReputation { - address: rep.address.as_bytes().to_vec(), - status: ProtoReputationStatus::from(rep.status).into(), - ops_seen: rep.ops_seen, - ops_included: rep.ops_included, - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::common::{contracts::shared_types, protos::op_pool, types::Timestamp}; - - const TEST_ADDRESS_ARR: [u8; 20] = [ - 0x11, 0xAB, 0xB0, 0x5d, 0x9A, 0xd3, 0x18, 0xbf, 0x65, 0x65, 0x26, 0x72, 0xB1, 0x3b, 0x1d, - 0xcb, 0x0E, 0x6D, 0x4a, 0x32, - ]; - const TEST_ADDRESS_STR: &str = "0x11aBB05d9Ad318bf65652672B13b1dcB0E6D4a32"; - - #[test] - fn test_mempool_op_to_pool_op() { - let now_secs = Timestamp::now().seconds_since_epoch(); - - let mempool_op = MempoolOp { - uo: Some(op_pool::UserOperation { - sender: TEST_ADDRESS_ARR.to_vec(), - nonce: vec![0; 32], - call_gas_limit: vec![0; 32], - verification_gas_limit: vec![0; 32], - pre_verification_gas: vec![0; 32], - max_fee_per_gas: vec![0; 32], - max_priority_fee_per_gas: vec![0; 32], - ..Default::default() - }), - aggregator: TEST_ADDRESS_ARR.to_vec(), - valid_after: now_secs, - valid_until: now_secs, - expected_code_hash: vec![0; 32], - sim_block_hash: vec![0; 32], - entities_needing_stake: vec![], - account_is_staked: false, - }; - - let pool_op: PoolOperation = mempool_op.try_into().unwrap(); - - assert_eq!(pool_op.uo.sender, TEST_ADDRESS_STR.parse().unwrap()); - assert_eq!(pool_op.aggregator, Some(TEST_ADDRESS_STR.parse().unwrap())); - assert_eq!(pool_op.expected_code_hash, H256::zero()); - } - - #[test] - fn test_pool_op_to_mempool_op() { - let now = Timestamp::now(); - let pool_op = PoolOperation { - uo: shared_types::UserOperation { - ..Default::default() - }, - aggregator: Some(TEST_ADDRESS_STR.parse().unwrap()), - valid_time_range: ValidTimeRange::new(now, now), - expected_code_hash: H256::random(), - entities_needing_stake: vec![], - sim_block_hash: H256::random(), - account_is_staked: false, - }; - - let mempool_op: MempoolOp = (&pool_op).try_into().unwrap(); - - assert_eq!( - mempool_op.uo, - Some(op_pool::UserOperation { - sender: vec![0; 20], - nonce: vec![0; 32], - call_gas_limit: vec![0; 32], - verification_gas_limit: vec![0; 32], - pre_verification_gas: vec![0; 32], - max_fee_per_gas: vec![0; 32], - max_priority_fee_per_gas: vec![0; 32], - ..Default::default() - }) - ); - assert_eq!(mempool_op.aggregator, TEST_ADDRESS_ARR.to_vec()); - assert_eq!(mempool_op.valid_after, now.seconds_since_epoch()); - assert_eq!(mempool_op.valid_until, now.seconds_since_epoch()); - } -} diff --git a/src/rpc/debug.rs b/src/rpc/debug.rs index b9f54ab64..615cf2f96 100644 --- a/src/rpc/debug.rs +++ b/src/rpc/debug.rs @@ -6,18 +6,15 @@ use jsonrpsee::{ use tonic::{async_trait, transport::Channel}; use super::{RpcReputation, RpcUserOperation}; -use crate::common::{ - protos::{ - builder::{ +use crate::{ + common::{ + protos::builder::{ builder_client, BundlingMode as ProtoBundlingMode, DebugSendBundleNowRequest, DebugSetBundlingModeRequest, }, - op_pool::{ - op_pool_client, DebugClearStateRequest, DebugDumpMempoolRequest, - DebugDumpReputationRequest, DebugSetReputationRequest, - }, + types::BundlingMode, }, - types::{BundlingMode, UserOperation}, + op_pool::PoolClient, }; /// Debug API @@ -46,16 +43,13 @@ pub trait DebugApi { async fn bundler_dump_reputation(&self, entry_point: Address) -> RpcResult>; } -pub struct DebugApi { - op_pool_client: op_pool_client::OpPoolClient, +pub struct DebugApi

{ + op_pool_client: P, builder_client: builder_client::BuilderClient, } -impl DebugApi { - pub fn new( - op_pool_client: op_pool_client::OpPoolClient, - builder_client: builder_client::BuilderClient, - ) -> Self { +impl

DebugApi

{ + pub fn new(op_pool_client: P, builder_client: builder_client::BuilderClient) -> Self { Self { op_pool_client, builder_client, @@ -64,12 +58,14 @@ impl DebugApi { } #[async_trait] -impl DebugApiServer for DebugApi { +impl

DebugApiServer for DebugApi

+where + P: PoolClient, +{ async fn bundler_clear_state(&self) -> RpcResult { let _ = self .op_pool_client - .clone() - .debug_clear_state(DebugClearStateRequest {}) + .debug_clear_state() .await .map_err(|e| RpcError::Custom(e.to_string()))?; @@ -77,25 +73,14 @@ impl DebugApiServer for DebugApi { } async fn bundler_dump_mempool(&self, entry_point: Address) -> RpcResult> { - let response = self + Ok(self .op_pool_client - .clone() - .debug_dump_mempool(DebugDumpMempoolRequest { - entry_point: entry_point.to_fixed_bytes().into(), - }) + .debug_dump_mempool(entry_point) .await - .map_err(|e| RpcError::Custom(e.to_string()))?; - - let ops = response - .into_inner() - .ops + .map_err(|e| RpcError::Custom(e.to_string()))? .into_iter() - .filter_map(|mop| mop.uo) - .map(|uo| uo.try_into()) - .collect::, _>>() - .map_err(|e| RpcError::Custom(e.to_string()))?; - - Ok(ops.into_iter().map(|uo| uo.into()).collect()) + .map(|pop| pop.uo.into()) + .collect::>()) } async fn bundler_send_bundle_now(&self) -> RpcResult { @@ -128,11 +113,10 @@ impl DebugApiServer for DebugApi { ) -> RpcResult { let _ = self .op_pool_client - .clone() - .debug_set_reputation(DebugSetReputationRequest { - entry_point: entry_point.to_fixed_bytes().into(), - reputations: reputations.into_iter().map(Into::into).collect(), - }) + .debug_set_reputations( + entry_point, + reputations.into_iter().map(Into::into).collect(), + ) .await; Ok("ok".to_string()) @@ -141,16 +125,11 @@ impl DebugApiServer for DebugApi { async fn bundler_dump_reputation(&self, entry_point: Address) -> RpcResult> { let result = self .op_pool_client - .clone() - .debug_dump_reputation(DebugDumpReputationRequest { - entry_point: entry_point.to_fixed_bytes().into(), - }) + .debug_dump_reputation(entry_point) .await .map_err(|e| RpcError::Custom(e.to_string()))?; result - .into_inner() - .reputations .into_iter() .map(|r| r.try_into()) .collect::, anyhow::Error>>() diff --git a/src/rpc/eth/error.rs b/src/rpc/eth/error.rs index 667e6e2f2..7dba4b7fe 100644 --- a/src/rpc/eth/error.rs +++ b/src/rpc/eth/error.rs @@ -14,7 +14,7 @@ use crate::{ simulation::SimulationViolation, types::{Entity, EntityType, Timestamp}, }, - op_pool::MempoolError, + op_pool::{MempoolError, PoolServerError}, }; // Error codes borrowed from jsonrpsee @@ -139,6 +139,18 @@ pub struct UnsupportedAggregatorData { pub aggregator: Address, } +impl From for EthRpcError { + fn from(value: PoolServerError) -> Self { + match value { + PoolServerError::MempoolError(e) => e.into(), + PoolServerError::UnexpectedResponse => { + EthRpcError::Internal(anyhow::anyhow!("unexpected response from pool server")) + } + PoolServerError::Other(e) => EthRpcError::Internal(e), + } + } +} + impl From for EthRpcError { fn from(value: MempoolError) -> Self { match value { @@ -161,6 +173,9 @@ impl From for EthRpcError { MempoolError::UnsupportedAggregator(a) => { EthRpcError::UnsupportedAggregator(UnsupportedAggregatorData { aggregator: a }) } + MempoolError::UnknownEntryPoint(a) => { + EthRpcError::EntryPointValidationRejected(format!("unknown entry point: {}", a)) + } } } } diff --git a/src/rpc/eth/mod.rs b/src/rpc/eth/mod.rs index 9fef17380..cacaeeadc 100644 --- a/src/rpc/eth/mod.rs +++ b/src/rpc/eth/mod.rs @@ -5,7 +5,7 @@ use std::{ sync::Arc, }; -use anyhow::{anyhow, bail, Context}; +use anyhow::{bail, Context}; use ethers::{ abi::{AbiDecode, RawLog}, prelude::EthEvent, @@ -18,7 +18,7 @@ use ethers::{ utils::to_checksum, }; use jsonrpsee::{core::RpcResult, proc_macros::rpc}; -use tonic::{async_trait, transport::Channel}; +use tonic::async_trait; use tracing::Level; use self::error::EthRpcError; @@ -30,10 +30,9 @@ use crate::{ UserOperationRevertReasonFilter, }, eth::log_to_raw_log, - protos::op_pool::{add_op_response, op_pool_client::OpPoolClient, AddOpRequest}, types::{UserOperation, BASE_CHAIN_IDS}, }, - op_pool::MempoolError, + op_pool::PoolClient, rpc::{ estimation::{GasEstimationError, GasEstimator, GasEstimatorImpl}, GasEstimate, RichUserOperation, RpcUserOperation, UserOperationOptionalGas, @@ -98,23 +97,24 @@ where } #[derive(Debug)] -pub struct EthApi { +pub struct EthApi { contexts_by_entry_point: HashMap>, provider: Arc>, chain_id: u64, - op_pool_client: OpPoolClient, + op_pool_client: P, } -impl EthApi +impl EthApi where C: JsonRpcClient + 'static, + P: PoolClient, { #[allow(clippy::too_many_arguments)] pub fn new( provider: Arc>, entry_points: Vec

, chain_id: u64, - op_pool_client: OpPoolClient, + op_pool_client: P, estimation_settings: estimation::Settings, ) -> Self { let contexts_by_entry_point = entry_points @@ -308,9 +308,10 @@ where } #[async_trait] -impl EthApiServer for EthApi +impl EthApiServer for EthApi where C: JsonRpcClient + 'static, + P: PoolClient, { async fn send_user_operation( &self, @@ -322,32 +323,12 @@ where "supplied entry point addr is not a known entry point".to_string(), ))?; } - - let op: UserOperation = op.into(); - - let add_op_result = self + Ok(self .op_pool_client - .clone() - .add_op(AddOpRequest { - entry_point: entry_point.as_bytes().to_vec(), - op: Some((&op).into()), - }) + .add_op(entry_point, op.into()) .await .map_err(EthRpcError::from) - .log_on_error_level(Level::DEBUG, "failed to add op to the mempool")?; - - match add_op_result.into_inner().result { - Some(add_op_response::Result::Success(s)) => Ok(H256::from_slice(&s.hash)), - Some(add_op_response::Result::Failure(f)) => { - Err(EthRpcError::from(MempoolError::try_from( - f.error.context("should have received error from op pool")?, - )?)) - .log_on_error_level(Level::DEBUG, "failed to add op to the mempool")? - } - None => Err(EthRpcError::Internal(anyhow!( - "should have received result from op pool" - )))?, - } + .log_on_error_level(Level::DEBUG, "failed to add op to the mempool")?) } async fn estimate_user_operation_gas( @@ -473,7 +454,7 @@ where } // Filter receipt logs to match just those belonging to the user op - let filtered_logs = EthApi::::filter_receipt_logs_matching_user_op(&log, &tx_receipt) + let filtered_logs = EthApi::::filter_receipt_logs_matching_user_op(&log, &tx_receipt) .context("should have found receipt logs matching user op")?; // Decode log and find failure reason if not success @@ -483,7 +464,7 @@ where let reason: String = if uo_event.success { "".to_owned() } else { - EthApi::::get_user_operation_failure_reason(&tx_receipt.logs, hash) + EthApi::::get_user_operation_failure_reason(&tx_receipt.logs, hash) .context("should have found revert reason if tx wasn't successful")? .unwrap_or_default() }; @@ -525,6 +506,7 @@ mod tests { }; use super::*; + use crate::op_pool::MockPoolClient; const UO_OP_TOPIC: &str = "user-op-event-topic"; @@ -538,7 +520,10 @@ mod tests { given_log(UO_OP_TOPIC, "another-hash"), ]); - let result = EthApi::::filter_receipt_logs_matching_user_op(&reference_log, &receipt); + let result = EthApi::::filter_receipt_logs_matching_user_op( + &reference_log, + &receipt, + ); assert!(result.is_ok(), "{}", result.unwrap_err()); let result = result.unwrap(); @@ -557,7 +542,10 @@ mod tests { given_log(UO_OP_TOPIC, "another-hash"), ]); - let result = EthApi::::filter_receipt_logs_matching_user_op(&reference_log, &receipt); + let result = EthApi::::filter_receipt_logs_matching_user_op( + &reference_log, + &receipt, + ); assert!(result.is_ok(), "{}", result.unwrap_err()); let result = result.unwrap(); @@ -576,7 +564,10 @@ mod tests { reference_log.clone(), ]); - let result = EthApi::::filter_receipt_logs_matching_user_op(&reference_log, &receipt); + let result = EthApi::::filter_receipt_logs_matching_user_op( + &reference_log, + &receipt, + ); assert!(result.is_ok(), "{}", result.unwrap_err()); let result = result.unwrap(); @@ -599,7 +590,10 @@ mod tests { reference_log.clone(), ]); - let result = EthApi::::filter_receipt_logs_matching_user_op(&reference_log, &receipt); + let result = EthApi::::filter_receipt_logs_matching_user_op( + &reference_log, + &receipt, + ); assert!(result.is_ok(), "{}", result.unwrap_err()); let result = result.unwrap(); @@ -621,7 +615,10 @@ mod tests { given_log(UO_OP_TOPIC, "other-hash"), ]); - let result = EthApi::::filter_receipt_logs_matching_user_op(&reference_log, &receipt); + let result = EthApi::::filter_receipt_logs_matching_user_op( + &reference_log, + &receipt, + ); assert!(result.is_ok(), "{}", result.unwrap_err()); let result = result.unwrap(); @@ -639,7 +636,10 @@ mod tests { given_log("another-topic-2", "some-hash"), ]); - let result = EthApi::::filter_receipt_logs_matching_user_op(&reference_log, &receipt); + let result = EthApi::::filter_receipt_logs_matching_user_op( + &reference_log, + &receipt, + ); assert!(result.is_err(), "{:?}", result.unwrap()); } diff --git a/src/rpc/health.rs b/src/rpc/health.rs index 801b8768f..86be3a946 100644 --- a/src/rpc/health.rs +++ b/src/rpc/health.rs @@ -3,18 +3,20 @@ use jsonrpsee::{core::RpcResult, proc_macros::rpc}; use tonic::{async_trait, transport::Channel}; use tonic_health::pb::{health_client::HealthClient, HealthCheckRequest}; +use crate::op_pool::{LocalPoolClient, PoolClient}; + #[rpc(server, namespace = "system")] pub trait SystemApi { #[method(name = "health")] async fn get_health(&self) -> RpcResult; } -pub struct SystemApi { +pub struct RemoteHealthCheck { op_pool_health_client: HealthClient, builder_health_client: HealthClient, } -impl SystemApi { +impl RemoteHealthCheck { pub fn new( op_pool_health_client: HealthClient, builder_health_client: HealthClient, @@ -27,7 +29,7 @@ impl SystemApi { } #[async_trait] -impl SystemApiServer for SystemApi { +impl SystemApiServer for RemoteHealthCheck { async fn get_health(&self) -> RpcResult { self.op_pool_health_client .clone() @@ -44,3 +46,25 @@ impl SystemApiServer for SystemApi { Ok("ok".to_string()) } } + +pub struct LocalHealthCheck { + pool_client: LocalPoolClient, +} + +impl LocalHealthCheck { + pub fn new(pool_client: LocalPoolClient) -> Self { + Self { pool_client } + } +} + +#[async_trait] +impl SystemApiServer for LocalHealthCheck { + async fn get_health(&self) -> RpcResult { + self.pool_client + .get_supported_entry_points() + .await + .context("Op pool server should be live")?; + + Ok("ok".to_string()) + } +} diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index e7b34610f..ed6ec16ae 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -5,7 +5,6 @@ mod metrics; mod rundler; mod task; -use anyhow::bail; pub use debug::DebugApiClient; pub use eth::{estimation, EthApiClient}; use ethers::{ @@ -17,12 +16,9 @@ use serde::{de, Deserialize, Deserializer, Serialize, Serializer}; use strum; pub use task::*; -use crate::common::{ - protos::{ - self, - op_pool::{Reputation, ReputationStatus}, - }, - types::UserOperation, +use crate::{ + common::types::UserOperation, + op_pool::{Reputation, ReputationStatus}, }; /// API namespace @@ -300,7 +296,6 @@ impl Serialize for ReputationStatus { S: Serializer, { match self { - ReputationStatus::Unspecified => serializer.serialize_str("unspecified"), ReputationStatus::Ok => serializer.serialize_str("ok"), ReputationStatus::Throttled => serializer.serialize_str("throttled"), ReputationStatus::Banned => serializer.serialize_str("banned"), @@ -315,7 +310,6 @@ impl<'de> Deserialize<'de> for ReputationStatus { { let s = String::deserialize(deserializer)?; match s.as_str() { - "unspecified" => Ok(ReputationStatus::Unspecified), "ok" => Ok(ReputationStatus::Ok), "throttled" => Ok(ReputationStatus::Throttled), "banned" => Ok(ReputationStatus::Banned), @@ -324,19 +318,6 @@ impl<'de> Deserialize<'de> for ReputationStatus { } } -impl TryFrom for ReputationStatus { - type Error = anyhow::Error; - - fn try_from(status: i32) -> Result { - match status { - 1 => Ok(ReputationStatus::Ok), - 2 => Ok(ReputationStatus::Throttled), - 3 => Ok(ReputationStatus::Banned), - _ => bail!("Invalid reputation status {status}"), - } - } -} - #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct RpcReputation { pub address: Address, @@ -348,10 +329,10 @@ pub struct RpcReputation { impl From for Reputation { fn from(rpc_reputation: RpcReputation) -> Self { Reputation { - address: rpc_reputation.address.as_bytes().to_vec(), + address: rpc_reputation.address, ops_seen: rpc_reputation.ops_seen.as_u64(), ops_included: rpc_reputation.ops_included.as_u64(), - status: rpc_reputation.status.into(), + status: rpc_reputation.status, } } } @@ -361,10 +342,10 @@ impl TryFrom for RpcReputation { fn try_from(reputation: Reputation) -> Result { Ok(RpcReputation { - address: protos::from_bytes(&reputation.address)?, + address: reputation.address, ops_seen: reputation.ops_seen.into(), ops_included: reputation.ops_included.into(), - status: reputation.status.try_into()?, + status: reputation.status, }) } } diff --git a/src/rpc/task.rs b/src/rpc/task.rs index 21229cf27..973e8f8e9 100644 --- a/src/rpc/task.rs +++ b/src/rpc/task.rs @@ -2,14 +2,14 @@ use std::{net::SocketAddr, str::FromStr, sync::Arc, time::Duration}; use anyhow::{bail, Context}; use ethers::{ - providers::{Http, HttpRateLimitRetryPolicy, Provider, RetryClientBuilder}, + providers::{Http, HttpRateLimitRetryPolicy, Provider, RetryClient, RetryClientBuilder}, types::Address, }; use jsonrpsee::{ server::{middleware::proxy_get_request::ProxyGetRequestLayer, ServerBuilder}, RpcModule, }; -use tokio::{select, try_join}; +use tokio::select; use tokio_util::sync::CancellationToken; use tonic::{ async_trait, @@ -24,13 +24,14 @@ use crate::{ common::{ handle::Task, precheck, - protos::{builder::builder_client::BuilderClient, op_pool::op_pool_client::OpPoolClient}, + protos::builder::builder_client::BuilderClient, server::{self, format_socket_addr}, }, + op_pool::{connect_remote_pool_client, LocalPoolClient, PoolClient, PoolClientMode}, rpc::{ debug::{DebugApi, DebugApiServer}, eth::{estimation, EthApi, EthApiServer}, - health::{SystemApi, SystemApiServer}, + health::{LocalHealthCheck, RemoteHealthCheck, SystemApiServer}, metrics::RpcMetricsLogger, rundler::{RundlerApi, RundlerApiServer}, }, @@ -40,7 +41,6 @@ use crate::{ pub struct Args { pub port: u16, pub host: String, - pub pool_url: String, pub builder_url: String, pub entry_points: Vec
, pub chain_id: u64, @@ -50,6 +50,7 @@ pub struct Args { pub estimation_settings: estimation::Settings, pub rpc_timeout: Duration, pub max_connections: u32, + pub pool_client_mode: PoolClientMode, } #[derive(Debug)] @@ -59,11 +60,13 @@ pub struct RpcTask { #[async_trait] impl Task for RpcTask { - async fn run(&self, shutdown_token: CancellationToken) -> anyhow::Result<()> { + async fn run(&mut self, shutdown_token: CancellationToken) -> anyhow::Result<()> { let addr: SocketAddr = format_socket_addr(&self.args.host, self.args.port).parse()?; tracing::info!("Starting rpc server on {}", addr); - let mut module = RpcModule::new(()); + if self.args.entry_points.is_empty() { + bail!("No entry points provided"); + } let parsed_url = Url::parse(&self.args.rpc_url).context("Invalid RPC URL")?; let http = Http::new(parsed_url); @@ -79,67 +82,49 @@ impl Task for RpcTask { let provider = Arc::new(Provider::new(client)); - let op_pool_uri = - Uri::from_str(&self.args.pool_url).context("should be a valid URI for op_pool")?; - let builder_uri = - Uri::from_str(&self.args.builder_url).context("should be a valid URI for op_pool")?; - let (op_pool_client, builder_client) = Self::connect_clients_with_shutdown( - &self.args.pool_url, - &self.args.builder_url, - shutdown_token.clone(), - ) - .await?; - - let op_pool_health_client = HealthClient::new( - Channel::builder(op_pool_uri) - .connect() - .await - .context("should have connected to op_pool health service channel")?, - ); - let builder_health_client = HealthClient::new( - Channel::builder(builder_uri) - .connect() - .await - .context("should have connected to builder health service channel")?, - ); - info!("Connected to op_pool service at {}", self.args.pool_url); + // TODO(danc) local builder client + let builder_client = + Self::connect_remote_builder_client(&self.args.builder_url, shutdown_token.clone()) + .await?; info!("Connected to builder service at {}", self.args.builder_url); - if self.args.entry_points.is_empty() { - bail!("No entry points provided"); - } + let mut module = RpcModule::new(()); + match &self.args.pool_client_mode { + PoolClientMode::Local { sender } => { + let pool_client = LocalPoolClient::new(sender.clone()); + self.attach_namespaces(provider, pool_client.clone(), builder_client, &mut module)?; - for api in &self.args.api_namespaces { - match api { - ApiNamespace::Eth => module.merge( - EthApi::new( - provider.clone(), - self.args.entry_points.clone(), - self.args.chain_id, - op_pool_client.clone(), - self.args.estimation_settings, - ) - .into_rpc(), - )?, - ApiNamespace::Debug => module.merge( - DebugApi::new(op_pool_client.clone(), builder_client.clone()).into_rpc(), - )?, - ApiNamespace::Rundler => module.merge( - RundlerApi::new( - provider.clone(), - self.args.chain_id, - self.args.precheck_settings, - ) - .into_rpc(), - )?, + module.merge(LocalHealthCheck::new(pool_client).into_rpc())?; + } + PoolClientMode::Remote { url } => { + let pool_client = connect_remote_pool_client(url, shutdown_token.clone()).await?; + info!("Connected to op_pool service at {}", url); + self.attach_namespaces(provider, pool_client, builder_client, &mut module)?; + + let builder_uri = Uri::from_str(&self.args.builder_url) + .context("should be a valid URI for op_pool")?; + let op_pool_uri = + Uri::from_str(url).context("should be a valid URI for op_pool")?; + + let op_pool_health_client = HealthClient::new( + Channel::builder(op_pool_uri) + .connect() + .await + .context("should have connected to op_pool health service channel")?, + ); + let builder_health_client = HealthClient::new( + Channel::builder(builder_uri) + .connect() + .await + .context("should have connected to builder health service channel")?, + ); + module.merge( + RemoteHealthCheck::new(op_pool_health_client, builder_health_client).into_rpc(), + )?; } } - // Set up health check endpoint via GET /health - // registers the jsonrpc handler - // NOTE: I couldn't use module.register_async_method because it requires async move - // and neither the clients or the args.*_url are copyable - module.merge(SystemApi::new(op_pool_health_client, builder_health_client).into_rpc())?; + // Set up health check endpoint via GET /health registers the jsonrpc handler let service_builder = tower::ServiceBuilder::new() // Proxy `GET /health` requests to internal `system_health` method. .layer(ProxyGetRequestLayer::new("/health", "system_health")?) @@ -178,25 +163,53 @@ impl RpcTask { Box::new(self) } - async fn connect_clients_with_shutdown( - op_pool_url: &str, - builder_url: &str, + async fn connect_remote_builder_client( + url: &str, shutdown_token: CancellationToken, - ) -> anyhow::Result<(OpPoolClient, BuilderClient)> { + ) -> anyhow::Result> { select! { _ = shutdown_token.cancelled() => { tracing::error!("bailing from conneting client, server shutting down"); bail!("Server shutting down") } - res = async { - try_join!( - server::connect_with_retries("op pool from common", op_pool_url, OpPoolClient::connect), - server::connect_with_retries("builder from common", builder_url, BuilderClient::connect) - ) - .context("should connect to op pool and builder") - } => { - res + res = server::connect_with_retries("builder from common", url, BuilderClient::connect) => { + res.context("should connect to builder") + } + } + } + + fn attach_namespaces( + &self, + provider: Arc>>, + pool_client: C, + builder_client: BuilderClient, + module: &mut RpcModule<()>, + ) -> anyhow::Result<()> { + for api in &self.args.api_namespaces { + match api { + ApiNamespace::Eth => module.merge( + EthApi::new( + provider.clone(), + self.args.entry_points.clone(), + self.args.chain_id, + pool_client.clone(), + self.args.estimation_settings, + ) + .into_rpc(), + )?, + ApiNamespace::Debug => module + .merge(DebugApi::new(pool_client.clone(), builder_client.clone()).into_rpc())?, + ApiNamespace::Rundler => module.merge( + RundlerApi::new( + provider.clone(), + self.args.chain_id, + self.args.precheck_settings, + ) + .into_rpc(), + )?, } } + + Ok(()) } }