Skip to content

Commit

Permalink
feat: stream block updates from pool, use in builder
Browse files Browse the repository at this point in the history
  • Loading branch information
dancoombs committed Aug 23, 2023
1 parent ac87557 commit 8755bbf
Show file tree
Hide file tree
Showing 16 changed files with 450 additions and 172 deletions.
7 changes: 7 additions & 0 deletions proto/op_pool/op_pool.proto
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ service OpPool {
// reputation objects, each with the fields described above in
// debug_bundler_setReputation
rpc DebugDumpReputation (DebugDumpReputationRequest) returns (DebugDumpReputationResponse);
rpc SubscribeNewBlocks(SubscribeNewBlocksRequest) returns (stream NewBlock);

Check failure on line 119 in proto/op_pool/op_pool.proto

View workflow job for this annotation

GitHub Actions / lint

RPC response type "NewBlock" should be named "SubscribeNewBlocksResponse" or "OpPoolSubscribeNewBlocksResponse".
}

message GetSupportedEntryPointsRequest {}
Expand Down Expand Up @@ -239,6 +240,12 @@ message DebugDumpReputationSuccess {
repeated Reputation reputations = 1;
}

message SubscribeNewBlocksRequest {}
message NewBlock {
bytes hash = 1;
uint64 number = 2;
}

message Reputation {
// The (serialized) address to set the reputation for
bytes address = 1;
Expand Down
52 changes: 36 additions & 16 deletions src/builder/bundle_sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,7 @@ use std::{
};

use anyhow::{bail, Context};
use ethers::{
providers::{Http, Provider, RetryClient},
types::{transaction::eip2718::TypedTransaction, Address, H256, U256},
};
use ethers::types::{transaction::eip2718::TypedTransaction, Address, H256, U256};
use tokio::{
join,
sync::{broadcast, mpsc, oneshot},
Expand All @@ -26,13 +23,12 @@ use crate::{
transaction_tracker::{SendResult, TrackerUpdate, TransactionTracker},
},
common::{
block_watcher,
emit::WithEntryPoint,
gas::GasFees,
math,
types::{Entity, EntryPointLike, ExpectedStorage, UserOperation},
},
op_pool::PoolClient,
op_pool::{NewBlock, PoolClient},
};

// Overhead on gas estimates to account for inaccuracies.
Expand Down Expand Up @@ -66,8 +62,6 @@ where
entry_point: E,
transaction_tracker: T,
pool_client: C,
// TODO: Figure out what we really want to do for detecting new blocks.
provider: Arc<Provider<RetryClient<Http>>>,
settings: Settings,
event_sender: broadcast::Sender<WithEntryPoint<BuilderEvent>>,
}
Expand Down Expand Up @@ -112,6 +106,13 @@ where
/// next one.
async fn send_bundles_in_loop(&mut self) {
let mut last_block_number = 0;
let mut new_blocks = if let Ok(new_blocks) = self.pool_client.subscribe_new_blocks().await {
new_blocks
} else {
error!("Failed to subscribe to new blocks");
return;
};

loop {
let mut send_bundle_response: Option<oneshot::Sender<SendBundleResult>> = None;

Expand All @@ -126,12 +127,17 @@ where
}
}

last_block_number = block_watcher::wait_for_new_block_number(
&*self.provider,
last_block_number,
self.eth_poll_interval,
)
.await;
last_block_number = match self
.wait_for_new_block_number(last_block_number, &mut new_blocks)
.await
{
Ok(l) => l,
Err(e) => {
error!("Failed to retrieve new block number: {e}");
continue;
}
};

self.check_for_and_log_transaction_update().await;
let result = self.send_bundle_with_increasing_gas_fees().await;
match &result {
Expand Down Expand Up @@ -184,7 +190,6 @@ where
entry_point: E,
transaction_tracker: T,
pool_client: C,
provider: Arc<Provider<RetryClient<Http>>>,
settings: Settings,
event_sender: broadcast::Sender<WithEntryPoint<BuilderEvent>>,
) -> Self {
Expand All @@ -198,7 +203,6 @@ where
entry_point,
transaction_tracker,
pool_client,
provider,
settings,
event_sender,
}
Expand Down Expand Up @@ -377,6 +381,22 @@ where
Ok(SendBundleResult::StalledAtMaxFeeIncreases)
}

async fn wait_for_new_block_number(
&self,
prev_block_number: u64,
new_blocks: &mut broadcast::Receiver<NewBlock>,
) -> anyhow::Result<u64> {
loop {
let block = new_blocks
.recv()
.await
.context("builder should receive new blocks")?;
if block.number > prev_block_number {
return Ok(block.number);
}
}
}

/// Builds a bundle and returns some metadata and the transaction to send
/// it, or `None` if there are no valid operations available.
async fn get_bundle_tx(
Expand Down
10 changes: 6 additions & 4 deletions src/builder/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,12 @@ impl Task for BuilderTask {
let (send_bundle_tx, send_bundle_rx) = mpsc::channel(1);

let mut builder: Box<dyn BundleSender> = match &self.args.pool_client_mode {
PoolClientMode::Local { sender } => {
let pool_client = LocalPoolClient::new(sender.clone());
PoolClientMode::Local {
req_sender,
block_receiver,
} => {
let pool_client =
LocalPoolClient::new(req_sender.clone(), block_receiver.resubscribe());
let proposer = BundleProposerImpl::new(
pool_client.clone(),
simulator,
Expand All @@ -175,7 +179,6 @@ impl Task for BuilderTask {
entry_point,
transaction_tracker,
pool_client,
provider,
builder_settings,
self.event_sender.clone(),
))
Expand All @@ -201,7 +204,6 @@ impl Task for BuilderTask {
entry_point,
transaction_tracker,
pool_client,
provider,
builder_settings,
self.event_sender.clone(),
))
Expand Down
22 changes: 19 additions & 3 deletions src/cli/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,20 +41,36 @@ pub async fn run(bundler_args: NodeCliArgs, common_args: CommonArgs) -> anyhow::
let builder_url = builder_args.url(false);

let (tx, rx) = mpsc::channel(1024);
let (block_sender, _) = broadcast::channel(1024);

let pool_task_args = pool_args
.to_args(&common_args, PoolServerMode::Local { receiver: Some(rx) })
.to_args(
&common_args,
PoolServerMode::Local {
req_receiver: Some(rx),
block_sender: Some(block_sender.clone()),
},
)
.await?;
let builder_task_args = builder_args
.to_args(&common_args, PoolClientMode::Local { sender: tx.clone() })
.to_args(
&common_args,
PoolClientMode::Local {
req_sender: tx.clone(),
block_receiver: block_sender.subscribe(),
},
)
.await?;
let rpc_task_args = rpc_args
.to_args(
&common_args,
builder_url,
(&common_args).try_into()?,
(&common_args).try_into()?,
PoolClientMode::Local { sender: tx },
PoolClientMode::Local {
req_sender: tx,
block_receiver: block_sender.subscribe(),
},
)
.await?;

Expand Down
2 changes: 1 addition & 1 deletion src/op_pool/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ pub struct Chain<P: ProviderLike> {
load_ops_semaphore: Semaphore,
}

#[derive(Debug, Eq, PartialEq)]
#[derive(Default, Debug, Eq, PartialEq)]
pub struct ChainUpdate {
pub latest_block_number: u64,
pub latest_block_hash: H256,
Expand Down
43 changes: 38 additions & 5 deletions src/op_pool/mempool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ pub use reputation::{
HourlyMovingAverageReputation, Reputation, ReputationParams, ReputationStatus,
};
use strum::IntoEnumIterator;
use tokio::sync::broadcast;
use tokio_util::sync::CancellationToken;
use tonic::async_trait;

use self::error::MempoolResult;
use super::MempoolError;
use super::{chain::ChainUpdate, MempoolError};
use crate::common::{
mempool::MempoolConfig,
precheck, simulation,
Expand All @@ -30,6 +32,9 @@ use crate::common::{
#[async_trait]
/// In-memory operation pool
pub trait Mempool: Send + Sync + 'static {
/// Call to update the mempool with a new chain update
fn on_chain_update(&self, update: &ChainUpdate);

/// Returns the entry point address this pool targets.
fn entry_point(&self) -> Address;

Expand Down Expand Up @@ -178,16 +183,45 @@ impl PoolOperation {

#[derive(Debug)]
pub struct MempoolGroup<M> {
mempools: HashMap<Address, Arc<M>>,
mempools: HashMap<Address, M>,
chain_updates: broadcast::Sender<Arc<ChainUpdate>>,
}

impl<M> MempoolGroup<M>
where
M: Mempool,
{
pub fn new(mempools: Vec<Arc<M>>) -> Self {
pub fn new(mempools: Vec<M>) -> Self {
Self {
mempools: mempools.into_iter().map(|m| (m.entry_point(), m)).collect(),
chain_updates: broadcast::channel(1024).0,
}
}

pub fn subscribe_chain_update(self: Arc<Self>) -> broadcast::Receiver<Arc<ChainUpdate>> {
self.chain_updates.subscribe()
}

pub async fn run(
self: Arc<Self>,
mut chain_updates: broadcast::Receiver<Arc<ChainUpdate>>,
shutdown_token: CancellationToken,
) {
loop {
tokio::select! {
_ = shutdown_token.cancelled() => {
tracing::info!("Shutting down UoPool");
break;
}
chain_update = chain_updates.recv() => {
if let Ok(chain_update) = chain_update {
for mempool in self.mempools.values() {
mempool.on_chain_update(&chain_update);
}
let _ = self.chain_updates.send(chain_update);
}
}
}
}
}

Expand Down Expand Up @@ -265,10 +299,9 @@ where
Ok(mempool.dump_reputation())
}

pub fn get_pool(&self, entry_point: Address) -> MempoolResult<Arc<M>> {
fn get_pool(&self, entry_point: Address) -> MempoolResult<&M> {
self.mempools
.get(&entry_point)
.cloned()
.ok_or_else(|| MempoolError::UnknownEntryPoint(entry_point))
}
}
Expand Down
37 changes: 8 additions & 29 deletions src/op_pool/mempool/uo_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use std::{
use ethers::types::{Address, H256};
use parking_lot::RwLock;
use tokio::sync::broadcast;
use tokio_util::sync::CancellationToken;
use tonic::async_trait;
use tracing::info;

Expand Down Expand Up @@ -81,33 +80,21 @@ where
}
}

pub async fn run(
self: Arc<Self>,
mut chain_events: broadcast::Receiver<Arc<ChainUpdate>>,
shutdown_token: CancellationToken,
) {
loop {
tokio::select! {
_ = shutdown_token.cancelled() => {
tracing::info!("Shutting down UoPool");
break;
}
update = chain_events.recv() => {
if let Ok(update) = update {
self.on_chain_update(&update);
}
}
}
}
}

fn emit(&self, event: OpPoolEvent) {
let _ = self.event_sender.send(WithEntryPoint {
entry_point: self.entry_point,
event,
});
}
}

#[async_trait]
impl<R, P, S> Mempool for UoPool<R, P, S>
where
R: ReputationManager,
P: Prechecker,
S: Simulator,
{
fn on_chain_update(&self, update: &ChainUpdate) {
let mut state = self.state.write();
let deduped_ops = update.deduped_ops();
Expand Down Expand Up @@ -188,15 +175,7 @@ where
}
state.block_number = update.latest_block_number;
}
}

#[async_trait]
impl<R, P, S> Mempool for UoPool<R, P, S>
where
R: ReputationManager,
P: Prechecker,
S: Simulator,
{
fn entry_point(&self) -> Address {
self.entry_point
}
Expand Down
2 changes: 1 addition & 1 deletion src/op_pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ pub use mempool::{error::MempoolError, PoolConfig, PoolOperation, Reputation, Re
#[cfg(test)]
pub use server::MockPoolClient;
pub use server::{
connect_remote_pool_client, LocalPoolClient, PoolClient, PoolClientMode, PoolResult,
connect_remote_pool_client, LocalPoolClient, NewBlock, PoolClient, PoolClientMode, PoolResult,
PoolServerError, RemotePoolClient, ServerRequest as LocalPoolServerRequest,
};
pub use task::*;
Loading

0 comments on commit 8755bbf

Please sign in to comment.