Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: rework the pool server classes #328

Merged
merged 1 commit into from
Sep 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions src/builder/bundle_proposer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use crate::{
Timestamp, UserOperation,
},
},
op_pool::{PoolClient, PoolOperation},
op_pool::{PoolOperation, PoolServer},
};

/// A user op must be valid for at least this long into the future to be included.
Expand Down Expand Up @@ -76,9 +76,9 @@ where
S: Simulator,
E: EntryPointLike,
P: ProviderLike,
C: PoolClient,
C: PoolServer,
{
op_pool: C,
pool: C,
simulator: S,
entry_point: E,
provider: Arc<P>,
Expand All @@ -104,7 +104,7 @@ where
S: Simulator,
E: EntryPointLike,
P: ProviderLike,
C: PoolClient,
C: PoolServer,
{
async fn make_bundle(&self, required_fees: Option<GasFees>) -> anyhow::Result<Bundle> {
let (ops, block_hash, bundle_fees) = try_join!(
Expand Down Expand Up @@ -180,10 +180,10 @@ where
S: Simulator,
E: EntryPointLike,
P: ProviderLike,
C: PoolClient,
C: PoolServer,
{
pub fn new(
op_pool: C,
pool: C,
simulator: S,
entry_point: E,
provider: Arc<P>,
Expand All @@ -192,7 +192,7 @@ where
event_sender: broadcast::Sender<WithEntryPoint<BuilderEvent>>,
) -> Self {
Self {
op_pool,
pool,
simulator,
entry_point,
provider: provider.clone(),
Expand Down Expand Up @@ -398,7 +398,7 @@ where
}

async fn get_ops_from_pool(&self) -> anyhow::Result<Vec<PoolOperation>> {
self.op_pool
self.pool
.get_ops(self.entry_point.address(), self.settings.max_bundle_size)
.await
.context("should get ops from pool")
Expand Down Expand Up @@ -698,7 +698,7 @@ mod tests {
},
types::{MockEntryPointLike, MockProviderLike, ValidTimeRange},
},
op_pool::MockPoolClient,
op_pool::MockPoolServer,
};

#[tokio::test]
Expand Down Expand Up @@ -1174,7 +1174,7 @@ mod tests {
})
.collect();

let mut pool_client = MockPoolClient::new();
let mut pool_client = MockPoolServer::new();
pool_client
.expect_get_ops()
.returning(move |_, _| Ok(ops.clone()));
Expand Down
28 changes: 13 additions & 15 deletions src/builder/bundle_sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ use std::{

use anyhow::{bail, Context};
use ethers::types::{transaction::eip2718::TypedTransaction, Address, H256, U256};
use futures_util::StreamExt;
use tokio::{
join,
sync::{broadcast, mpsc, oneshot},
time,
};
use tokio_stream::StreamExt;
use tonic::async_trait;
use tracing::{error, info, trace, warn};

Expand All @@ -29,7 +29,7 @@ use crate::{
math,
types::{Entity, EntryPointLike, ExpectedStorage, UserOperation},
},
op_pool::PoolClient,
op_pool::PoolServer,
};

// Overhead on gas estimates to account for inaccuracies.
Expand All @@ -52,7 +52,7 @@ where
P: BundleProposer,
E: EntryPointLike,
T: TransactionTracker,
C: PoolClient,
C: PoolServer,
{
manual_bundling_mode: Arc<AtomicBool>,
send_bundle_receiver: mpsc::Receiver<SendBundleRequest>,
Expand All @@ -62,7 +62,7 @@ where
proposer: P,
entry_point: E,
transaction_tracker: T,
pool_client: C,
pool: C,
settings: Settings,
event_sender: broadcast::Sender<WithEntryPoint<BuilderEvent>>,
}
Expand Down Expand Up @@ -100,28 +100,26 @@ where
P: BundleProposer,
E: EntryPointLike,
T: TransactionTracker,
C: PoolClient,
C: PoolServer,
{
/// Loops forever, attempting to form and send a bundle on each new block,
/// then waiting for one bundle to be mined or dropped before forming the
/// next one.
async fn send_bundles_in_loop(&mut self) {
let mut new_heads = if let Ok(new_blocks) = self.pool_client.subscribe_new_heads() {
new_blocks
} else {
let Ok(mut new_heads) = self.pool.subscribe_new_heads().await else {
error!("Failed to subscribe to new blocks");
return;
};

// The new_heads stream can buffer up multiple blocks, but we only want to consume the latest one.
// This task is used to consume the new heads and place them onto a channel that can be syncronously
// consumed until the latest block is reached.
let (tx, mut rx) = mpsc::channel(1024);
let (tx, mut rx) = mpsc::unbounded_channel();
dancoombs marked this conversation as resolved.
Show resolved Hide resolved
tokio::spawn(async move {
loop {
match new_heads.next().await {
Some(b) => {
if tx.send(b).await.is_err() {
if tx.send(b).is_err() {
error!("Failed to buffer new block for bundle sender");
return;
}
Expand Down Expand Up @@ -213,7 +211,7 @@ where
P: BundleProposer,
E: EntryPointLike,
T: TransactionTracker,
C: PoolClient,
C: PoolServer,
{
#[allow(clippy::too_many_arguments)]
pub fn new(
Expand All @@ -225,7 +223,7 @@ where
proposer: P,
entry_point: E,
transaction_tracker: T,
pool_client: C,
pool: C,
settings: Settings,
event_sender: broadcast::Sender<WithEntryPoint<BuilderEvent>>,
) -> Self {
Expand All @@ -238,7 +236,7 @@ where
proposer,
entry_point,
transaction_tracker,
pool_client,
pool,
settings,
event_sender,
}
Expand Down Expand Up @@ -477,7 +475,7 @@ where
}

async fn remove_ops_from_pool(&self, ops: &[UserOperation]) -> anyhow::Result<()> {
self.pool_client
self.pool
.remove_ops(
self.entry_point.address(),
ops.iter()
Expand All @@ -489,7 +487,7 @@ where
}

async fn remove_entities_from_pool(&self, entities: &[Entity]) -> anyhow::Result<()> {
self.pool_client
self.pool
.remove_entities(self.entry_point.address(), entities.to_vec())
.await
.context("builder should remove rejected entities from pool")
Expand Down
105 changes: 41 additions & 64 deletions src/builder/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use crate::{
server::format_socket_addr,
simulation::{self, SimulatorImpl},
},
op_pool::{connect_remote_pool_client, LocalPoolClient, PoolClientMode},
op_pool::PoolServer,
};

#[derive(Debug)]
Expand All @@ -64,18 +64,21 @@ pub struct Args {
pub max_blocks_to_wait_for_mine: u64,
pub replacement_fee_percent_increase: u64,
pub max_fee_increases: u64,
pub pool_client_mode: PoolClientMode,
}

#[derive(Debug)]
pub struct BuilderTask {
pub struct BuilderTask<P> {
args: Args,
event_sender: broadcast::Sender<WithEntryPoint<BuilderEvent>>,
pool: P,
}

#[async_trait]
impl Task for BuilderTask {
async fn run(&mut self, shutdown_token: CancellationToken) -> anyhow::Result<()> {
impl<P> Task for BuilderTask<P>
where
P: PoolServer + Clone,
{
async fn run(mut self: Box<Self>, shutdown_token: CancellationToken) -> anyhow::Result<()> {
let addr = format_socket_addr(&self.args.host, self.args.port).parse()?;
info!("Starting builder server on {}", addr);
tracing::info!("Mempool config: {:?}", self.args.mempool_configs);
Expand Down Expand Up @@ -153,62 +156,28 @@ impl Task for BuilderTask {
let manual_bundling_mode = Arc::new(AtomicBool::new(false));
let (send_bundle_tx, send_bundle_rx) = mpsc::channel(1);

let mut builder: Box<dyn BundleSender> = match &self.args.pool_client_mode {
PoolClientMode::Local {
req_sender,
new_heads_receiver,
} => {
let pool_client =
LocalPoolClient::new(req_sender.clone(), new_heads_receiver.resubscribe());
let proposer = BundleProposerImpl::new(
pool_client.clone(),
simulator,
entry_point.clone(),
Arc::clone(&provider),
self.args.chain_id,
proposer_settings,
self.event_sender.clone(),
);
Box::new(BundleSenderImpl::new(
manual_bundling_mode.clone(),
send_bundle_rx,
self.args.chain_id,
beneficiary,
self.args.eth_poll_interval,
proposer,
entry_point,
transaction_tracker,
pool_client,
builder_settings,
self.event_sender.clone(),
))
}
PoolClientMode::Remote { url } => {
let pool_client = connect_remote_pool_client(url, shutdown_token.clone()).await?;
let proposer = BundleProposerImpl::new(
pool_client.clone(),
simulator,
entry_point.clone(),
Arc::clone(&provider),
self.args.chain_id,
proposer_settings,
self.event_sender.clone(),
);
Box::new(BundleSenderImpl::new(
manual_bundling_mode.clone(),
send_bundle_rx,
self.args.chain_id,
beneficiary,
self.args.eth_poll_interval,
proposer,
entry_point,
transaction_tracker,
pool_client,
builder_settings,
self.event_sender.clone(),
))
}
};
let proposer = BundleProposerImpl::new(
self.pool.clone(),
simulator,
entry_point.clone(),
Arc::clone(&provider),
self.args.chain_id,
proposer_settings,
self.event_sender.clone(),
);
let mut builder = BundleSenderImpl::new(
manual_bundling_mode.clone(),
send_bundle_rx,
self.args.chain_id,
beneficiary,
self.args.eth_poll_interval,
proposer,
entry_point,
transaction_tracker,
self.pool,
builder_settings,
self.event_sender.clone(),
);

let _builder_loop_guard =
{ SpawnGuard::spawn_with_guard(async move { builder.send_bundles_in_loop().await }) };
Expand Down Expand Up @@ -248,12 +217,20 @@ impl Task for BuilderTask {
}
}

impl BuilderTask {
impl<P> BuilderTask<P>
where
P: PoolServer + Clone,
{
pub fn new(
args: Args,
event_sender: broadcast::Sender<WithEntryPoint<BuilderEvent>>,
) -> BuilderTask {
Self { args, event_sender }
pool: P,
) -> Self {
Self {
args,
event_sender,
pool,
}
}

pub fn boxed(self) -> Box<dyn Task> {
Expand Down
25 changes: 13 additions & 12 deletions src/cli/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ use crate::{
gas::PriorityFeeMode,
handle::spawn_tasks_with_shutdown,
mempool::MempoolConfig,
server::format_server_addr,
server::{connect_with_retries_shutdown, format_server_addr},
},
op_pool::PoolClientMode,
op_pool::RemotePoolClient,
};

/// CLI options for the builder
Expand Down Expand Up @@ -150,11 +150,7 @@ pub struct BuilderArgs {
impl BuilderArgs {
/// Convert the CLI arguments into the arguments for the builder combining
/// common and builder specific arguments.
pub async fn to_args(
&self,
common: &CommonArgs,
pool_client_mode: PoolClientMode,
) -> anyhow::Result<builder::Args> {
pub async fn to_args(&self, common: &CommonArgs) -> anyhow::Result<builder::Args> {
let priority_fee_mode = PriorityFeeMode::try_from(
common.priority_fee_mode_kind.as_str(),
common.priority_fee_mode_value,
Expand Down Expand Up @@ -204,7 +200,6 @@ impl BuilderArgs {
max_blocks_to_wait_for_mine: self.max_blocks_to_wait_for_mine,
replacement_fee_percent_increase: self.replacement_fee_percent_increase,
max_fee_increases: self.max_fee_increases,
pool_client_mode,
})
}

Expand Down Expand Up @@ -238,12 +233,18 @@ pub async fn run(builder_args: BuilderCliArgs, common_args: CommonArgs) -> anyho
let (event_sender, event_rx) = broadcast::channel(EVENT_CHANNEL_CAPACITY);
emit::receive_and_log_events_with_filter(event_rx, is_nonspammy_event);

let task_args = builder_args
.to_args(&common_args, PoolClientMode::Remote { url: pool_url })
.await?;
let task_args = builder_args.to_args(&common_args).await?;

let pool = connect_with_retries_shutdown(
"op pool from builder",
&pool_url,
RemotePoolClient::connect,
tokio::signal::ctrl_c(),
)
.await?;

spawn_tasks_with_shutdown(
[BuilderTask::new(task_args, event_sender).boxed()],
[BuilderTask::new(task_args, event_sender, pool).boxed()],
tokio::signal::ctrl_c(),
)
.await;
Expand Down
Loading
Loading