Skip to content

Commit

Permalink
feat: stream block updates from pool, use in builder
Browse files Browse the repository at this point in the history
  • Loading branch information
dancoombs committed Aug 23, 2023
1 parent 4ae7662 commit 7210e9c
Show file tree
Hide file tree
Showing 17 changed files with 443 additions and 170 deletions.
16 changes: 16 additions & 0 deletions proto/op_pool/op_pool.proto
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@ service OpPool {
// reputation objects, each with the fields described above in
// debug_bundler_setReputation
rpc DebugDumpReputation (DebugDumpReputationRequest) returns (DebugDumpReputationResponse);

// Streaming API to subscribe to be updated upon a new block being added to (or reorged onto)
// the chain.
rpc SubscribeNewBlocks(SubscribeNewBlocksRequest) returns (stream SubscribeNewBlocksResponse);
}

message GetSupportedEntryPointsRequest {}
Expand Down Expand Up @@ -239,6 +243,18 @@ message DebugDumpReputationSuccess {
repeated Reputation reputations = 1;
}

message SubscribeNewBlocksRequest {}
message SubscribeNewBlocksResponse {
// The newly added block
NewBlock new_block = 1;
}
message NewBlock {
// The block hash
bytes hash = 1;
// The block number
uint64 number = 2;
}

message Reputation {
// The (serialized) address to set the reputation for
bytes address = 1;
Expand Down
38 changes: 21 additions & 17 deletions src/builder/bundle_sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,7 @@ use std::{
};

use anyhow::{bail, Context};
use ethers::{
providers::{Http, Provider, RetryClient},
types::{transaction::eip2718::TypedTransaction, Address, H256, U256},
};
use ethers::types::{transaction::eip2718::TypedTransaction, Address, H256, U256};
use tokio::{
join,
sync::{broadcast, mpsc, oneshot},
Expand All @@ -26,7 +23,6 @@ use crate::{
transaction_tracker::{SendResult, TrackerUpdate, TransactionTracker},
},
common::{
block_watcher,
emit::WithEntryPoint,
gas::GasFees,
math,
Expand Down Expand Up @@ -66,8 +62,6 @@ where
entry_point: E,
transaction_tracker: T,
pool_client: C,
// TODO: Figure out what we really want to do for detecting new blocks.
provider: Arc<Provider<RetryClient<Http>>>,
settings: Settings,
event_sender: broadcast::Sender<WithEntryPoint<BuilderEvent>>,
}
Expand Down Expand Up @@ -111,7 +105,13 @@ where
/// then waiting for one bundle to be mined or dropped before forming the
/// next one.
async fn send_bundles_in_loop(&mut self) {
let mut last_block_number = 0;
let mut new_blocks = if let Ok(new_blocks) = self.pool_client.subscribe_new_blocks().await {
new_blocks
} else {
error!("Failed to subscribe to new blocks");
return;
};

loop {
let mut send_bundle_response: Option<oneshot::Sender<SendBundleResult>> = None;

Expand All @@ -126,12 +126,18 @@ where
}
}

last_block_number = block_watcher::wait_for_new_block_number(
&*self.provider,
last_block_number,
self.eth_poll_interval,
)
.await;
// Wait for new block. It doesn't matter which the block number as
// the pool will only notify of new blocks after the pool has updated its state.
// The bundle will be formed using the latest pool state and can land in the
// next block
let last_block = match new_blocks.recv().await {
Ok(b) => b,
Err(e) => {
error!("Failed to retrieve new block: {e}");
continue;
}
};

self.check_for_and_log_transaction_update().await;
let result = self.send_bundle_with_increasing_gas_fees().await;
match &result {
Expand All @@ -145,7 +151,7 @@ where
} else {
info!("Bundle with hash {tx_hash:?} landed in block {block_number} after increasing gas fees {attempt_number} time(s)");
}
SendBundleResult::NoOperationsInitially => trace!("No ops to send at block {last_block_number}"),
SendBundleResult::NoOperationsInitially => trace!("No ops to send at block {}", last_block.number),
SendBundleResult::NoOperationsAfterFeeIncreases {
initial_op_count,
attempt_number,
Expand Down Expand Up @@ -184,7 +190,6 @@ where
entry_point: E,
transaction_tracker: T,
pool_client: C,
provider: Arc<Provider<RetryClient<Http>>>,
settings: Settings,
event_sender: broadcast::Sender<WithEntryPoint<BuilderEvent>>,
) -> Self {
Expand All @@ -198,7 +203,6 @@ where
entry_point,
transaction_tracker,
pool_client,
provider,
settings,
event_sender,
}
Expand Down
10 changes: 6 additions & 4 deletions src/builder/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,12 @@ impl Task for BuilderTask {
let (send_bundle_tx, send_bundle_rx) = mpsc::channel(1);

let mut builder: Box<dyn BundleSender> = match &self.args.pool_client_mode {
PoolClientMode::Local { sender } => {
let pool_client = LocalPoolClient::new(sender.clone());
PoolClientMode::Local {
req_sender,
block_receiver,
} => {
let pool_client =
LocalPoolClient::new(req_sender.clone(), block_receiver.resubscribe());
let proposer = BundleProposerImpl::new(
pool_client.clone(),
simulator,
Expand All @@ -175,7 +179,6 @@ impl Task for BuilderTask {
entry_point,
transaction_tracker,
pool_client,
provider,
builder_settings,
self.event_sender.clone(),
))
Expand All @@ -201,7 +204,6 @@ impl Task for BuilderTask {
entry_point,
transaction_tracker,
pool_client,
provider,
builder_settings,
self.event_sender.clone(),
))
Expand Down
22 changes: 19 additions & 3 deletions src/cli/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,20 +41,36 @@ pub async fn run(bundler_args: NodeCliArgs, common_args: CommonArgs) -> anyhow::
let builder_url = builder_args.url(false);

let (tx, rx) = mpsc::channel(1024);
let (block_sender, _) = broadcast::channel(1024);

let pool_task_args = pool_args
.to_args(&common_args, PoolServerMode::Local { receiver: Some(rx) })
.to_args(
&common_args,
PoolServerMode::Local {
req_receiver: Some(rx),
block_sender: Some(block_sender.clone()),
},
)
.await?;
let builder_task_args = builder_args
.to_args(&common_args, PoolClientMode::Local { sender: tx.clone() })
.to_args(
&common_args,
PoolClientMode::Local {
req_sender: tx.clone(),
block_receiver: block_sender.subscribe(),
},
)
.await?;
let rpc_task_args = rpc_args
.to_args(
&common_args,
builder_url,
(&common_args).try_into()?,
(&common_args).try_into()?,
PoolClientMode::Local { sender: tx },
PoolClientMode::Local {
req_sender: tx,
block_receiver: block_sender.subscribe(),
},
)
.await?;

Expand Down
2 changes: 1 addition & 1 deletion src/op_pool/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ pub struct Chain<P: ProviderLike> {
load_ops_semaphore: Semaphore,
}

#[derive(Debug, Eq, PartialEq)]
#[derive(Default, Debug, Eq, PartialEq)]
pub struct ChainUpdate {
pub latest_block_number: u64,
pub latest_block_hash: H256,
Expand Down
49 changes: 44 additions & 5 deletions src/op_pool/mempool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ pub use reputation::{
HourlyMovingAverageReputation, Reputation, ReputationParams, ReputationStatus,
};
use strum::IntoEnumIterator;
use tokio::sync::broadcast;
use tokio_util::sync::CancellationToken;
use tonic::async_trait;

use self::error::MempoolResult;
use super::MempoolError;
use super::{chain::ChainUpdate, MempoolError};
use crate::common::{
mempool::MempoolConfig,
precheck, simulation,
Expand All @@ -30,6 +32,9 @@ use crate::common::{
#[async_trait]
/// In-memory operation pool
pub trait Mempool: Send + Sync + 'static {
/// Call to update the mempool with a new chain update
fn on_chain_update(&self, update: &ChainUpdate);

/// Returns the entry point address this pool targets.
fn entry_point(&self) -> Address;

Expand Down Expand Up @@ -178,16 +183,51 @@ impl PoolOperation {

#[derive(Debug)]
pub struct MempoolGroup<M> {
mempools: HashMap<Address, Arc<M>>,
mempools: HashMap<Address, M>,
chain_updates: broadcast::Sender<Arc<ChainUpdate>>,
}

impl<M> MempoolGroup<M>
where
M: Mempool,
{
pub fn new(mempools: Vec<Arc<M>>) -> Self {
pub fn new(mempools: Vec<M>) -> Self {
Self {
mempools: mempools.into_iter().map(|m| (m.entry_point(), m)).collect(),
chain_updates: broadcast::channel(1024).0,
}
}

pub fn subscribe_chain_update(self: Arc<Self>) -> broadcast::Receiver<Arc<ChainUpdate>> {
self.chain_updates.subscribe()
}

pub async fn run(
self: Arc<Self>,
mut chain_updates: broadcast::Receiver<Arc<ChainUpdate>>,
shutdown_token: CancellationToken,
) {
loop {
tokio::select! {
_ = shutdown_token.cancelled() => {
tracing::info!("Shutting down UoPool");
break;
}
chain_update = chain_updates.recv() => {
if let Ok(chain_update) = chain_update {
// Update each mempool before notifying listeners of the chain update
// This allows the mempools to update their state before the listeners
// pull information from the mempool.
// For example, a bundle builder listening for a new block to kick off
// its bundle building process will want to be able to query the mempool
// and only receive operations that have not yet been mined.
for mempool in self.mempools.values() {
mempool.on_chain_update(&chain_update);
}
let _ = self.chain_updates.send(chain_update);
}
}
}
}
}

Expand Down Expand Up @@ -265,10 +305,9 @@ where
Ok(mempool.dump_reputation())
}

pub fn get_pool(&self, entry_point: Address) -> MempoolResult<Arc<M>> {
fn get_pool(&self, entry_point: Address) -> MempoolResult<&M> {
self.mempools
.get(&entry_point)
.cloned()
.ok_or_else(|| MempoolError::UnknownEntryPoint(entry_point))
}
}
Expand Down
37 changes: 8 additions & 29 deletions src/op_pool/mempool/uo_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use std::{
use ethers::types::{Address, H256};
use parking_lot::RwLock;
use tokio::sync::broadcast;
use tokio_util::sync::CancellationToken;
use tonic::async_trait;
use tracing::info;

Expand Down Expand Up @@ -81,33 +80,21 @@ where
}
}

pub async fn run(
self: Arc<Self>,
mut chain_events: broadcast::Receiver<Arc<ChainUpdate>>,
shutdown_token: CancellationToken,
) {
loop {
tokio::select! {
_ = shutdown_token.cancelled() => {
tracing::info!("Shutting down UoPool");
break;
}
update = chain_events.recv() => {
if let Ok(update) = update {
self.on_chain_update(&update);
}
}
}
}
}

fn emit(&self, event: OpPoolEvent) {
let _ = self.event_sender.send(WithEntryPoint {
entry_point: self.entry_point,
event,
});
}
}

#[async_trait]
impl<R, P, S> Mempool for UoPool<R, P, S>
where
R: ReputationManager,
P: Prechecker,
S: Simulator,
{
fn on_chain_update(&self, update: &ChainUpdate) {
let mut state = self.state.write();
let deduped_ops = update.deduped_ops();
Expand Down Expand Up @@ -188,15 +175,7 @@ where
}
state.block_number = update.latest_block_number;
}
}

#[async_trait]
impl<R, P, S> Mempool for UoPool<R, P, S>
where
R: ReputationManager,
P: Prechecker,
S: Simulator,
{
fn entry_point(&self) -> Address {
self.entry_point
}
Expand Down
2 changes: 1 addition & 1 deletion src/op_pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ pub use mempool::{error::MempoolError, PoolConfig, PoolOperation, Reputation, Re
#[cfg(test)]
pub use server::MockPoolClient;
pub use server::{
connect_remote_pool_client, LocalPoolClient, PoolClient, PoolClientMode, PoolResult,
connect_remote_pool_client, LocalPoolClient, NewBlock, PoolClient, PoolClientMode, PoolResult,
PoolServerError, RemotePoolClient, ServerRequest as LocalPoolServerRequest,
};
pub use task::*;
Loading

0 comments on commit 7210e9c

Please sign in to comment.