Skip to content

Commit

Permalink
refactor: rework the pool server classes
Browse files Browse the repository at this point in the history
  • Loading branch information
dancoombs committed Sep 2, 2023
1 parent 781543c commit f489dca
Show file tree
Hide file tree
Showing 22 changed files with 990 additions and 587 deletions.
20 changes: 10 additions & 10 deletions src/builder/bundle_proposer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use crate::{
Timestamp, UserOperation,
},
},
op_pool::{PoolClient, PoolOperation},
op_pool::{PoolOperation, PoolServer},
};

/// A user op must be valid for at least this long into the future to be included.
Expand Down Expand Up @@ -76,9 +76,9 @@ where
S: Simulator,
E: EntryPointLike,
P: ProviderLike,
C: PoolClient,
C: PoolServer,
{
op_pool: C,
pool: C,
simulator: S,
entry_point: E,
provider: Arc<P>,
Expand All @@ -104,7 +104,7 @@ where
S: Simulator,
E: EntryPointLike,
P: ProviderLike,
C: PoolClient,
C: PoolServer,
{
async fn make_bundle(&self, required_fees: Option<GasFees>) -> anyhow::Result<Bundle> {
let (ops, block_hash, bundle_fees) = try_join!(
Expand Down Expand Up @@ -180,10 +180,10 @@ where
S: Simulator,
E: EntryPointLike,
P: ProviderLike,
C: PoolClient,
C: PoolServer,
{
pub fn new(
op_pool: C,
pool: C,
simulator: S,
entry_point: E,
provider: Arc<P>,
Expand All @@ -192,7 +192,7 @@ where
event_sender: broadcast::Sender<WithEntryPoint<BuilderEvent>>,
) -> Self {
Self {
op_pool,
pool,
simulator,
entry_point,
provider: provider.clone(),
Expand Down Expand Up @@ -398,7 +398,7 @@ where
}

async fn get_ops_from_pool(&self) -> anyhow::Result<Vec<PoolOperation>> {
self.op_pool
self.pool
.get_ops(self.entry_point.address(), self.settings.max_bundle_size)
.await
.context("should get ops from pool")
Expand Down Expand Up @@ -698,7 +698,7 @@ mod tests {
},
types::{MockEntryPointLike, MockProviderLike, ValidTimeRange},
},
op_pool::MockPoolClient,
op_pool::MockPoolServer,
};

#[tokio::test]
Expand Down Expand Up @@ -1174,7 +1174,7 @@ mod tests {
})
.collect();

let mut pool_client = MockPoolClient::new();
let mut pool_client = MockPoolServer::new();
pool_client
.expect_get_ops()
.returning(move |_, _| Ok(ops.clone()));
Expand Down
28 changes: 13 additions & 15 deletions src/builder/bundle_sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ use std::{

use anyhow::{bail, Context};
use ethers::types::{transaction::eip2718::TypedTransaction, Address, H256, U256};
use futures_util::StreamExt;
use tokio::{
join,
sync::{broadcast, mpsc, oneshot},
time,
};
use tokio_stream::StreamExt;
use tonic::async_trait;
use tracing::{error, info, trace, warn};

Expand All @@ -29,7 +29,7 @@ use crate::{
math,
types::{Entity, EntryPointLike, ExpectedStorage, UserOperation},
},
op_pool::PoolClient,
op_pool::PoolServer,
};

// Overhead on gas estimates to account for inaccuracies.
Expand All @@ -52,7 +52,7 @@ where
P: BundleProposer,
E: EntryPointLike,
T: TransactionTracker,
C: PoolClient,
C: PoolServer,
{
manual_bundling_mode: Arc<AtomicBool>,
send_bundle_receiver: mpsc::Receiver<SendBundleRequest>,
Expand All @@ -62,7 +62,7 @@ where
proposer: P,
entry_point: E,
transaction_tracker: T,
pool_client: C,
pool: C,
settings: Settings,
event_sender: broadcast::Sender<WithEntryPoint<BuilderEvent>>,
}
Expand Down Expand Up @@ -100,28 +100,26 @@ where
P: BundleProposer,
E: EntryPointLike,
T: TransactionTracker,
C: PoolClient,
C: PoolServer,
{
/// Loops forever, attempting to form and send a bundle on each new block,
/// then waiting for one bundle to be mined or dropped before forming the
/// next one.
async fn send_bundles_in_loop(&mut self) {
let mut new_heads = if let Ok(new_blocks) = self.pool_client.subscribe_new_heads() {
new_blocks
} else {
let Ok(mut new_heads) = self.pool.subscribe_new_heads().await else {
error!("Failed to subscribe to new blocks");
return;
};

// The new_heads stream can buffer up multiple blocks, but we only want to consume the latest one.
// This task is used to consume the new heads and place them onto a channel that can be syncronously
// consumed until the latest block is reached.
let (tx, mut rx) = mpsc::channel(1024);
let (tx, mut rx) = mpsc::unbounded_channel();
tokio::spawn(async move {
loop {
match new_heads.next().await {
Some(b) => {
if tx.send(b).await.is_err() {
if tx.send(b).is_err() {
error!("Failed to buffer new block for bundle sender");
return;
}
Expand Down Expand Up @@ -213,7 +211,7 @@ where
P: BundleProposer,
E: EntryPointLike,
T: TransactionTracker,
C: PoolClient,
C: PoolServer,
{
#[allow(clippy::too_many_arguments)]
pub fn new(
Expand All @@ -225,7 +223,7 @@ where
proposer: P,
entry_point: E,
transaction_tracker: T,
pool_client: C,
pool: C,
settings: Settings,
event_sender: broadcast::Sender<WithEntryPoint<BuilderEvent>>,
) -> Self {
Expand All @@ -238,7 +236,7 @@ where
proposer,
entry_point,
transaction_tracker,
pool_client,
pool,
settings,
event_sender,
}
Expand Down Expand Up @@ -477,7 +475,7 @@ where
}

async fn remove_ops_from_pool(&self, ops: &[UserOperation]) -> anyhow::Result<()> {
self.pool_client
self.pool
.remove_ops(
self.entry_point.address(),
ops.iter()
Expand All @@ -489,7 +487,7 @@ where
}

async fn remove_entities_from_pool(&self, entities: &[Entity]) -> anyhow::Result<()> {
self.pool_client
self.pool
.remove_entities(self.entry_point.address(), entities.to_vec())
.await
.context("builder should remove rejected entities from pool")
Expand Down
105 changes: 41 additions & 64 deletions src/builder/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use crate::{
server::format_socket_addr,
simulation::{self, SimulatorImpl},
},
op_pool::{connect_remote_pool_client, LocalPoolClient, PoolClientMode},
op_pool::PoolServer,
};

#[derive(Debug)]
Expand All @@ -64,18 +64,21 @@ pub struct Args {
pub max_blocks_to_wait_for_mine: u64,
pub replacement_fee_percent_increase: u64,
pub max_fee_increases: u64,
pub pool_client_mode: PoolClientMode,
}

#[derive(Debug)]
pub struct BuilderTask {
pub struct BuilderTask<P> {
args: Args,
event_sender: broadcast::Sender<WithEntryPoint<BuilderEvent>>,
pool: P,
}

#[async_trait]
impl Task for BuilderTask {
async fn run(&mut self, shutdown_token: CancellationToken) -> anyhow::Result<()> {
impl<P> Task for BuilderTask<P>
where
P: PoolServer + Clone,
{
async fn run(mut self: Box<Self>, shutdown_token: CancellationToken) -> anyhow::Result<()> {
let addr = format_socket_addr(&self.args.host, self.args.port).parse()?;
info!("Starting builder server on {}", addr);
tracing::info!("Mempool config: {:?}", self.args.mempool_configs);
Expand Down Expand Up @@ -153,62 +156,28 @@ impl Task for BuilderTask {
let manual_bundling_mode = Arc::new(AtomicBool::new(false));
let (send_bundle_tx, send_bundle_rx) = mpsc::channel(1);

let mut builder: Box<dyn BundleSender> = match &self.args.pool_client_mode {
PoolClientMode::Local {
req_sender,
new_heads_receiver,
} => {
let pool_client =
LocalPoolClient::new(req_sender.clone(), new_heads_receiver.resubscribe());
let proposer = BundleProposerImpl::new(
pool_client.clone(),
simulator,
entry_point.clone(),
Arc::clone(&provider),
self.args.chain_id,
proposer_settings,
self.event_sender.clone(),
);
Box::new(BundleSenderImpl::new(
manual_bundling_mode.clone(),
send_bundle_rx,
self.args.chain_id,
beneficiary,
self.args.eth_poll_interval,
proposer,
entry_point,
transaction_tracker,
pool_client,
builder_settings,
self.event_sender.clone(),
))
}
PoolClientMode::Remote { url } => {
let pool_client = connect_remote_pool_client(url, shutdown_token.clone()).await?;
let proposer = BundleProposerImpl::new(
pool_client.clone(),
simulator,
entry_point.clone(),
Arc::clone(&provider),
self.args.chain_id,
proposer_settings,
self.event_sender.clone(),
);
Box::new(BundleSenderImpl::new(
manual_bundling_mode.clone(),
send_bundle_rx,
self.args.chain_id,
beneficiary,
self.args.eth_poll_interval,
proposer,
entry_point,
transaction_tracker,
pool_client,
builder_settings,
self.event_sender.clone(),
))
}
};
let proposer = BundleProposerImpl::new(
self.pool.clone(),
simulator,
entry_point.clone(),
Arc::clone(&provider),
self.args.chain_id,
proposer_settings,
self.event_sender.clone(),
);
let mut builder = BundleSenderImpl::new(
manual_bundling_mode.clone(),
send_bundle_rx,
self.args.chain_id,
beneficiary,
self.args.eth_poll_interval,
proposer,
entry_point,
transaction_tracker,
self.pool,
builder_settings,
self.event_sender.clone(),
);

let _builder_loop_guard =
{ SpawnGuard::spawn_with_guard(async move { builder.send_bundles_in_loop().await }) };
Expand Down Expand Up @@ -248,12 +217,20 @@ impl Task for BuilderTask {
}
}

impl BuilderTask {
impl<P> BuilderTask<P>
where
P: PoolServer + Clone,
{
pub fn new(
args: Args,
event_sender: broadcast::Sender<WithEntryPoint<BuilderEvent>>,
) -> BuilderTask {
Self { args, event_sender }
pool: P,
) -> Self {
Self {
args,
event_sender,
pool,
}
}

pub fn boxed(self) -> Box<dyn Task> {
Expand Down
25 changes: 13 additions & 12 deletions src/cli/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ use crate::{
gas::PriorityFeeMode,
handle::spawn_tasks_with_shutdown,
mempool::MempoolConfig,
server::format_server_addr,
server::{connect_with_retries_shutdown, format_server_addr},
},
op_pool::PoolClientMode,
op_pool::RemotePoolClient,
};

/// CLI options for the builder
Expand Down Expand Up @@ -150,11 +150,7 @@ pub struct BuilderArgs {
impl BuilderArgs {
/// Convert the CLI arguments into the arguments for the builder combining
/// common and builder specific arguments.
pub async fn to_args(
&self,
common: &CommonArgs,
pool_client_mode: PoolClientMode,
) -> anyhow::Result<builder::Args> {
pub async fn to_args(&self, common: &CommonArgs) -> anyhow::Result<builder::Args> {
let priority_fee_mode = PriorityFeeMode::try_from(
common.priority_fee_mode_kind.as_str(),
common.priority_fee_mode_value,
Expand Down Expand Up @@ -204,7 +200,6 @@ impl BuilderArgs {
max_blocks_to_wait_for_mine: self.max_blocks_to_wait_for_mine,
replacement_fee_percent_increase: self.replacement_fee_percent_increase,
max_fee_increases: self.max_fee_increases,
pool_client_mode,
})
}

Expand Down Expand Up @@ -238,12 +233,18 @@ pub async fn run(builder_args: BuilderCliArgs, common_args: CommonArgs) -> anyho
let (event_sender, event_rx) = broadcast::channel(EVENT_CHANNEL_CAPACITY);
emit::receive_and_log_events_with_filter(event_rx, is_nonspammy_event);

let task_args = builder_args
.to_args(&common_args, PoolClientMode::Remote { url: pool_url })
.await?;
let task_args = builder_args.to_args(&common_args).await?;

let pool = connect_with_retries_shutdown(
"op pool from builder",
&pool_url,
RemotePoolClient::connect,
tokio::signal::ctrl_c(),
)
.await?;

spawn_tasks_with_shutdown(
[BuilderTask::new(task_args, event_sender).boxed()],
[BuilderTask::new(task_args, event_sender, pool).boxed()],
tokio::signal::ctrl_c(),
)
.await;
Expand Down
Loading

0 comments on commit f489dca

Please sign in to comment.