diff --git a/src/builder/bundle_proposer.rs b/src/builder/bundle_proposer.rs
index 2172653a6..bc91b5eb5 100644
--- a/src/builder/bundle_proposer.rs
+++ b/src/builder/bundle_proposer.rs
@@ -28,7 +28,7 @@ use crate::{
Timestamp, UserOperation,
},
},
- op_pool::{PoolClient, PoolOperation},
+ op_pool::{PoolOperation, PoolServer},
};
/// A user op must be valid for at least this long into the future to be included.
@@ -76,9 +76,9 @@ where
S: Simulator,
E: EntryPointLike,
P: ProviderLike,
- C: PoolClient,
+ C: PoolServer,
{
- op_pool: C,
+ pool: C,
simulator: S,
entry_point: E,
provider: Arc
,
@@ -104,7 +104,7 @@ where
S: Simulator,
E: EntryPointLike,
P: ProviderLike,
- C: PoolClient,
+ C: PoolServer,
{
async fn make_bundle(&self, required_fees: Option) -> anyhow::Result {
let (ops, block_hash, bundle_fees) = try_join!(
@@ -180,10 +180,10 @@ where
S: Simulator,
E: EntryPointLike,
P: ProviderLike,
- C: PoolClient,
+ C: PoolServer,
{
pub fn new(
- op_pool: C,
+ pool: C,
simulator: S,
entry_point: E,
provider: Arc,
@@ -192,7 +192,7 @@ where
event_sender: broadcast::Sender>,
) -> Self {
Self {
- op_pool,
+ pool,
simulator,
entry_point,
provider: provider.clone(),
@@ -398,7 +398,7 @@ where
}
async fn get_ops_from_pool(&self) -> anyhow::Result> {
- self.op_pool
+ self.pool
.get_ops(self.entry_point.address(), self.settings.max_bundle_size)
.await
.context("should get ops from pool")
@@ -698,7 +698,7 @@ mod tests {
},
types::{MockEntryPointLike, MockProviderLike, ValidTimeRange},
},
- op_pool::MockPoolClient,
+ op_pool::MockPoolServer,
};
#[tokio::test]
@@ -1174,7 +1174,7 @@ mod tests {
})
.collect();
- let mut pool_client = MockPoolClient::new();
+ let mut pool_client = MockPoolServer::new();
pool_client
.expect_get_ops()
.returning(move |_, _| Ok(ops.clone()));
diff --git a/src/builder/bundle_sender.rs b/src/builder/bundle_sender.rs
index 3ed81dc7f..a015c66bb 100644
--- a/src/builder/bundle_sender.rs
+++ b/src/builder/bundle_sender.rs
@@ -8,12 +8,12 @@ use std::{
use anyhow::{bail, Context};
use ethers::types::{transaction::eip2718::TypedTransaction, Address, H256, U256};
+use futures_util::StreamExt;
use tokio::{
join,
sync::{broadcast, mpsc, oneshot},
time,
};
-use tokio_stream::StreamExt;
use tonic::async_trait;
use tracing::{error, info, trace, warn};
@@ -29,7 +29,7 @@ use crate::{
math,
types::{Entity, EntryPointLike, ExpectedStorage, UserOperation},
},
- op_pool::PoolClient,
+ op_pool::PoolServer,
};
// Overhead on gas estimates to account for inaccuracies.
@@ -52,7 +52,7 @@ where
P: BundleProposer,
E: EntryPointLike,
T: TransactionTracker,
- C: PoolClient,
+ C: PoolServer,
{
manual_bundling_mode: Arc,
send_bundle_receiver: mpsc::Receiver,
@@ -62,7 +62,7 @@ where
proposer: P,
entry_point: E,
transaction_tracker: T,
- pool_client: C,
+ pool: C,
settings: Settings,
event_sender: broadcast::Sender>,
}
@@ -100,15 +100,13 @@ where
P: BundleProposer,
E: EntryPointLike,
T: TransactionTracker,
- C: PoolClient,
+ C: PoolServer,
{
/// Loops forever, attempting to form and send a bundle on each new block,
/// then waiting for one bundle to be mined or dropped before forming the
/// next one.
async fn send_bundles_in_loop(&mut self) {
- let mut new_heads = if let Ok(new_blocks) = self.pool_client.subscribe_new_heads() {
- new_blocks
- } else {
+ let Ok(mut new_heads) = self.pool.subscribe_new_heads().await else {
error!("Failed to subscribe to new blocks");
return;
};
@@ -116,12 +114,12 @@ where
// The new_heads stream can buffer up multiple blocks, but we only want to consume the latest one.
// This task is used to consume the new heads and place them onto a channel that can be syncronously
// consumed until the latest block is reached.
- let (tx, mut rx) = mpsc::channel(1024);
+ let (tx, mut rx) = mpsc::unbounded_channel();
tokio::spawn(async move {
loop {
match new_heads.next().await {
Some(b) => {
- if tx.send(b).await.is_err() {
+ if tx.send(b).is_err() {
error!("Failed to buffer new block for bundle sender");
return;
}
@@ -213,7 +211,7 @@ where
P: BundleProposer,
E: EntryPointLike,
T: TransactionTracker,
- C: PoolClient,
+ C: PoolServer,
{
#[allow(clippy::too_many_arguments)]
pub fn new(
@@ -225,7 +223,7 @@ where
proposer: P,
entry_point: E,
transaction_tracker: T,
- pool_client: C,
+ pool: C,
settings: Settings,
event_sender: broadcast::Sender>,
) -> Self {
@@ -238,7 +236,7 @@ where
proposer,
entry_point,
transaction_tracker,
- pool_client,
+ pool,
settings,
event_sender,
}
@@ -477,7 +475,7 @@ where
}
async fn remove_ops_from_pool(&self, ops: &[UserOperation]) -> anyhow::Result<()> {
- self.pool_client
+ self.pool
.remove_ops(
self.entry_point.address(),
ops.iter()
@@ -489,7 +487,7 @@ where
}
async fn remove_entities_from_pool(&self, entities: &[Entity]) -> anyhow::Result<()> {
- self.pool_client
+ self.pool
.remove_entities(self.entry_point.address(), entities.to_vec())
.await
.context("builder should remove rejected entities from pool")
diff --git a/src/builder/task.rs b/src/builder/task.rs
index 9719d6bca..4ee798b0e 100644
--- a/src/builder/task.rs
+++ b/src/builder/task.rs
@@ -37,7 +37,7 @@ use crate::{
server::format_socket_addr,
simulation::{self, SimulatorImpl},
},
- op_pool::{connect_remote_pool_client, LocalPoolClient, PoolClientMode},
+ op_pool::PoolServer,
};
#[derive(Debug)]
@@ -64,18 +64,21 @@ pub struct Args {
pub max_blocks_to_wait_for_mine: u64,
pub replacement_fee_percent_increase: u64,
pub max_fee_increases: u64,
- pub pool_client_mode: PoolClientMode,
}
#[derive(Debug)]
-pub struct BuilderTask {
+pub struct BuilderTask {
args: Args,
event_sender: broadcast::Sender>,
+ pool: P,
}
#[async_trait]
-impl Task for BuilderTask {
- async fn run(&mut self, shutdown_token: CancellationToken) -> anyhow::Result<()> {
+impl Task for BuilderTask
+where
+ P: PoolServer + Clone,
+{
+ async fn run(mut self: Box, shutdown_token: CancellationToken) -> anyhow::Result<()> {
let addr = format_socket_addr(&self.args.host, self.args.port).parse()?;
info!("Starting builder server on {}", addr);
tracing::info!("Mempool config: {:?}", self.args.mempool_configs);
@@ -153,62 +156,28 @@ impl Task for BuilderTask {
let manual_bundling_mode = Arc::new(AtomicBool::new(false));
let (send_bundle_tx, send_bundle_rx) = mpsc::channel(1);
- let mut builder: Box = match &self.args.pool_client_mode {
- PoolClientMode::Local {
- req_sender,
- new_heads_receiver,
- } => {
- let pool_client =
- LocalPoolClient::new(req_sender.clone(), new_heads_receiver.resubscribe());
- let proposer = BundleProposerImpl::new(
- pool_client.clone(),
- simulator,
- entry_point.clone(),
- Arc::clone(&provider),
- self.args.chain_id,
- proposer_settings,
- self.event_sender.clone(),
- );
- Box::new(BundleSenderImpl::new(
- manual_bundling_mode.clone(),
- send_bundle_rx,
- self.args.chain_id,
- beneficiary,
- self.args.eth_poll_interval,
- proposer,
- entry_point,
- transaction_tracker,
- pool_client,
- builder_settings,
- self.event_sender.clone(),
- ))
- }
- PoolClientMode::Remote { url } => {
- let pool_client = connect_remote_pool_client(url, shutdown_token.clone()).await?;
- let proposer = BundleProposerImpl::new(
- pool_client.clone(),
- simulator,
- entry_point.clone(),
- Arc::clone(&provider),
- self.args.chain_id,
- proposer_settings,
- self.event_sender.clone(),
- );
- Box::new(BundleSenderImpl::new(
- manual_bundling_mode.clone(),
- send_bundle_rx,
- self.args.chain_id,
- beneficiary,
- self.args.eth_poll_interval,
- proposer,
- entry_point,
- transaction_tracker,
- pool_client,
- builder_settings,
- self.event_sender.clone(),
- ))
- }
- };
+ let proposer = BundleProposerImpl::new(
+ self.pool.clone(),
+ simulator,
+ entry_point.clone(),
+ Arc::clone(&provider),
+ self.args.chain_id,
+ proposer_settings,
+ self.event_sender.clone(),
+ );
+ let mut builder = BundleSenderImpl::new(
+ manual_bundling_mode.clone(),
+ send_bundle_rx,
+ self.args.chain_id,
+ beneficiary,
+ self.args.eth_poll_interval,
+ proposer,
+ entry_point,
+ transaction_tracker,
+ self.pool,
+ builder_settings,
+ self.event_sender.clone(),
+ );
let _builder_loop_guard =
{ SpawnGuard::spawn_with_guard(async move { builder.send_bundles_in_loop().await }) };
@@ -248,12 +217,20 @@ impl Task for BuilderTask {
}
}
-impl BuilderTask {
+impl BuilderTask
+where
+ P: PoolServer + Clone,
+{
pub fn new(
args: Args,
event_sender: broadcast::Sender>,
- ) -> BuilderTask {
- Self { args, event_sender }
+ pool: P,
+ ) -> Self {
+ Self {
+ args,
+ event_sender,
+ pool,
+ }
}
pub fn boxed(self) -> Box {
diff --git a/src/cli/builder.rs b/src/cli/builder.rs
index cfb3f13d9..cb1e6d03a 100644
--- a/src/cli/builder.rs
+++ b/src/cli/builder.rs
@@ -13,9 +13,9 @@ use crate::{
gas::PriorityFeeMode,
handle::spawn_tasks_with_shutdown,
mempool::MempoolConfig,
- server::format_server_addr,
+ server::{connect_with_retries_shutdown, format_server_addr},
},
- op_pool::PoolClientMode,
+ op_pool::RemotePoolClient,
};
/// CLI options for the builder
@@ -150,11 +150,7 @@ pub struct BuilderArgs {
impl BuilderArgs {
/// Convert the CLI arguments into the arguments for the builder combining
/// common and builder specific arguments.
- pub async fn to_args(
- &self,
- common: &CommonArgs,
- pool_client_mode: PoolClientMode,
- ) -> anyhow::Result {
+ pub async fn to_args(&self, common: &CommonArgs) -> anyhow::Result {
let priority_fee_mode = PriorityFeeMode::try_from(
common.priority_fee_mode_kind.as_str(),
common.priority_fee_mode_value,
@@ -204,7 +200,6 @@ impl BuilderArgs {
max_blocks_to_wait_for_mine: self.max_blocks_to_wait_for_mine,
replacement_fee_percent_increase: self.replacement_fee_percent_increase,
max_fee_increases: self.max_fee_increases,
- pool_client_mode,
})
}
@@ -238,12 +233,18 @@ pub async fn run(builder_args: BuilderCliArgs, common_args: CommonArgs) -> anyho
let (event_sender, event_rx) = broadcast::channel(EVENT_CHANNEL_CAPACITY);
emit::receive_and_log_events_with_filter(event_rx, is_nonspammy_event);
- let task_args = builder_args
- .to_args(&common_args, PoolClientMode::Remote { url: pool_url })
- .await?;
+ let task_args = builder_args.to_args(&common_args).await?;
+
+ let pool = connect_with_retries_shutdown(
+ "op pool from builder",
+ &pool_url,
+ RemotePoolClient::connect,
+ tokio::signal::ctrl_c(),
+ )
+ .await?;
spawn_tasks_with_shutdown(
- [BuilderTask::new(task_args, event_sender).boxed()],
+ [BuilderTask::new(task_args, event_sender, pool).boxed()],
tokio::signal::ctrl_c(),
)
.await;
diff --git a/src/cli/node/mod.rs b/src/cli/node/mod.rs
index 452d59878..73e30f027 100644
--- a/src/cli/node/mod.rs
+++ b/src/cli/node/mod.rs
@@ -1,5 +1,5 @@
use clap::Args;
-use tokio::sync::{broadcast, mpsc};
+use tokio::sync::broadcast;
use self::events::Event;
use crate::{
@@ -14,7 +14,7 @@ use crate::{
emit::{self, WithEntryPoint, EVENT_CHANNEL_CAPACITY},
handle,
},
- op_pool::{emit::OpPoolEvent, PoolClientMode, PoolServerMode, PoolTask},
+ op_pool::{emit::OpPoolEvent, LocalPoolBuilder, PoolServerMode, PoolTask},
rpc::RpcTask,
};
mod events;
@@ -40,37 +40,16 @@ pub async fn run(bundler_args: NodeCliArgs, common_args: CommonArgs) -> anyhow::
let builder_url = builder_args.url(false);
- let (tx, rx) = mpsc::channel(1024);
- let (new_heads_sender, _) = broadcast::channel(1024);
-
let pool_task_args = pool_args
- .to_args(
- &common_args,
- PoolServerMode::Local {
- req_receiver: Some(rx),
- new_heads_sender: Some(new_heads_sender.clone()),
- },
- )
- .await?;
- let builder_task_args = builder_args
- .to_args(
- &common_args,
- PoolClientMode::Local {
- req_sender: tx.clone(),
- new_heads_receiver: new_heads_sender.subscribe(),
- },
- )
+ .to_args(&common_args, PoolServerMode::Local)
.await?;
+ let builder_task_args = builder_args.to_args(&common_args).await?;
let rpc_task_args = rpc_args
.to_args(
&common_args,
builder_url,
(&common_args).try_into()?,
(&common_args).try_into()?,
- PoolClientMode::Local {
- req_sender: tx,
- new_heads_receiver: new_heads_sender.subscribe(),
- },
)
.await?;
@@ -97,11 +76,14 @@ pub async fn run(bundler_args: NodeCliArgs, common_args: CommonArgs) -> anyhow::
}
});
+ let pool_builder = LocalPoolBuilder::new(1024, 1024);
+ let pool_handle = pool_builder.get_handle();
+
handle::spawn_tasks_with_shutdown(
[
- PoolTask::new(pool_task_args, op_pool_event_sender).boxed(),
- BuilderTask::new(builder_task_args, builder_event_sender).boxed(),
- RpcTask::new(rpc_task_args).boxed(),
+ PoolTask::new(pool_task_args, op_pool_event_sender, pool_builder).boxed(),
+ BuilderTask::new(builder_task_args, builder_event_sender, pool_handle.clone()).boxed(),
+ RpcTask::new(rpc_task_args, pool_handle).boxed(),
],
tokio::signal::ctrl_c(),
)
diff --git a/src/cli/pool.rs b/src/cli/pool.rs
index 3b14cb546..7a85e510a 100644
--- a/src/cli/pool.rs
+++ b/src/cli/pool.rs
@@ -13,7 +13,7 @@ use crate::{
handle::spawn_tasks_with_shutdown,
mempool::MempoolConfig,
},
- op_pool::{self, PoolConfig, PoolServerMode, PoolTask},
+ op_pool::{self, LocalPoolBuilder, PoolConfig, PoolServerMode, PoolTask},
};
/// CLI options for the OP Pool
#[derive(Args, Debug)]
@@ -198,7 +198,7 @@ pub async fn run(pool_args: PoolCliArgs, common_args: CommonArgs) -> anyhow::Res
emit::receive_and_log_events_with_filter(event_rx, |_| true);
spawn_tasks_with_shutdown(
- [PoolTask::new(task_args, event_sender).boxed()],
+ [PoolTask::new(task_args, event_sender, LocalPoolBuilder::new(1024, 1024)).boxed()],
tokio::signal::ctrl_c(),
)
.await;
diff --git a/src/cli/rpc.rs b/src/cli/rpc.rs
index 5609aa644..e59ea335b 100644
--- a/src/cli/rpc.rs
+++ b/src/cli/rpc.rs
@@ -5,8 +5,8 @@ use clap::Args;
use super::CommonArgs;
use crate::{
- common::{handle::spawn_tasks_with_shutdown, precheck},
- op_pool::PoolClientMode,
+ common::{handle::spawn_tasks_with_shutdown, precheck, server::connect_with_retries_shutdown},
+ op_pool::RemotePoolClient,
rpc::{self, estimation, RpcTask},
};
@@ -71,7 +71,6 @@ impl RpcArgs {
builder_url: String,
precheck_settings: precheck::Settings,
estimation_settings: estimation::Settings,
- pool_client_mode: PoolClientMode,
) -> anyhow::Result {
let apis = self
.api
@@ -99,7 +98,6 @@ impl RpcArgs {
estimation_settings,
rpc_timeout: Duration::from_secs(self.timeout_seconds.parse()?),
max_connections: self.max_connections,
- pool_client_mode,
})
}
}
@@ -142,10 +140,21 @@ pub async fn run(rpc_args: RpcCliArgs, common_args: CommonArgs) -> anyhow::Resul
builder_url,
(&common_args).try_into()?,
(&common_args).try_into()?,
- PoolClientMode::Remote { url: pool_url },
)
.await?;
- spawn_tasks_with_shutdown([RpcTask::new(task_args).boxed()], tokio::signal::ctrl_c()).await;
+ let pool = connect_with_retries_shutdown(
+ "op pool from rpc",
+ &pool_url,
+ RemotePoolClient::connect,
+ tokio::signal::ctrl_c(),
+ )
+ .await?;
+
+ spawn_tasks_with_shutdown(
+ [RpcTask::new(task_args, pool).boxed()],
+ tokio::signal::ctrl_c(),
+ )
+ .await;
Ok(())
}
diff --git a/src/common/handle.rs b/src/common/handle.rs
index ec4cb1367..eb6196335 100644
--- a/src/common/handle.rs
+++ b/src/common/handle.rs
@@ -49,7 +49,7 @@ impl Drop for SpawnGuard {
#[async_trait]
pub trait Task: Sync + Send + 'static {
- async fn run(&mut self, shutdown_token: CancellationToken) -> anyhow::Result<()>;
+ async fn run(self: Box, shutdown_token: CancellationToken) -> anyhow::Result<()>;
}
pub async fn spawn_tasks_with_shutdown(
@@ -63,7 +63,7 @@ pub async fn spawn_tasks_with_shutdown(
let shutdown_token = CancellationToken::new();
let mut shutdown_scope = Some(shutdown_scope);
- let handles = tasks.into_iter().map(|mut task| {
+ let handles = tasks.into_iter().map(|task| {
let st = shutdown_token.clone();
let ss = shutdown_scope.clone();
async move {
diff --git a/src/common/server.rs b/src/common/server.rs
index f5bc64eeb..f4aa482de 100644
--- a/src/common/server.rs
+++ b/src/common/server.rs
@@ -1,6 +1,7 @@
use std::{future::Future, time::Duration};
-use anyhow::Context;
+use anyhow::{bail, Context};
+use tonic::async_trait;
use crate::common::{retry, retry::RetryOpts};
@@ -16,6 +17,27 @@ pub fn format_socket_addr(host: &String, port: u16) -> String {
format!("{}:{}", host, port)
}
+pub async fn connect_with_retries_shutdown(
+ server_name: &str,
+ url: &str,
+ func: F,
+ shutdown_signal: S,
+) -> anyhow::Result
+where
+ F: Fn(String) -> FutF,
+ FutF: Future