Skip to content

Commit

Permalink
feat: stream block updates from pool, use in builder
Browse files Browse the repository at this point in the history
  • Loading branch information
dancoombs committed Sep 1, 2023
1 parent 4ae7662 commit 8b58032
Show file tree
Hide file tree
Showing 20 changed files with 550 additions and 177 deletions.
11 changes: 6 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ default-run = "rundler"
[dependencies]
anyhow = "1.0.70"
arrayvec = "0.7.2"
async-stream = "0.3.5"
chrono = "0.4.24"
clap = { version = "4.2.4", features = ["derive", "env"] }
dotenv = "0.15.0"
Expand Down
16 changes: 16 additions & 0 deletions proto/op_pool/op_pool.proto
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@ service OpPool {
// reputation objects, each with the fields described above in
// debug_bundler_setReputation
rpc DebugDumpReputation (DebugDumpReputationRequest) returns (DebugDumpReputationResponse);

// Streaming API to subscribe to be updated upon a new block being added to (or reorged onto)
// the chain.
rpc SubscribeNewHeads(SubscribeNewHeadsRequest) returns (stream SubscribeNewHeadsResponse);
}

message GetSupportedEntryPointsRequest {}
Expand Down Expand Up @@ -239,6 +243,18 @@ message DebugDumpReputationSuccess {
repeated Reputation reputations = 1;
}

message SubscribeNewHeadsRequest {}
message SubscribeNewHeadsResponse {
// The new chain head
NewHead new_head = 1;
}
message NewHead {
// The block hash
bytes block_hash = 1;
// The block number
uint64 block_number = 2;
}

message Reputation {
// The (serialized) address to set the reputation for
bytes address = 1;
Expand Down
74 changes: 57 additions & 17 deletions src/builder/bundle_sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,13 @@ use std::{
};

use anyhow::{bail, Context};
use ethers::{
providers::{Http, Provider, RetryClient},
types::{transaction::eip2718::TypedTransaction, Address, H256, U256},
};
use ethers::types::{transaction::eip2718::TypedTransaction, Address, H256, U256};
use tokio::{
join,
sync::{broadcast, mpsc, oneshot},
time,
};
use tokio_stream::StreamExt;
use tonic::async_trait;
use tracing::{error, info, trace, warn};

Expand All @@ -26,7 +24,6 @@ use crate::{
transaction_tracker::{SendResult, TrackerUpdate, TransactionTracker},
},
common::{
block_watcher,
emit::WithEntryPoint,
gas::GasFees,
math,
Expand Down Expand Up @@ -66,8 +63,6 @@ where
entry_point: E,
transaction_tracker: T,
pool_client: C,
// TODO: Figure out what we really want to do for detecting new blocks.
provider: Arc<Provider<RetryClient<Http>>>,
settings: Settings,
event_sender: broadcast::Sender<WithEntryPoint<BuilderEvent>>,
}
Expand Down Expand Up @@ -111,7 +106,34 @@ where
/// then waiting for one bundle to be mined or dropped before forming the
/// next one.
async fn send_bundles_in_loop(&mut self) {
let mut last_block_number = 0;
let mut new_heads = if let Ok(new_blocks) = self.pool_client.subscribe_new_heads() {
new_blocks
} else {
error!("Failed to subscribe to new blocks");
return;
};

// The new_heads stream can buffer up multiple blocks, but we only want to consume the latest one.
// This function is used to consume the new heads and place them onto a channel that can be syncronously
// consumed until the latest block is reached.
let (tx, mut rx) = mpsc::channel(1);
tokio::spawn(async move {
loop {
match new_heads.next().await {
Some(b) => {
if tx.send(b).await.is_err() {
error!("Failed to send new block to bundle sender");
return;
}
}
None => {
error!("Block stream ended");
return;
}
}
}
});

loop {
let mut send_bundle_response: Option<oneshot::Sender<SendBundleResult>> = None;

Expand All @@ -126,12 +148,32 @@ where
}
}

last_block_number = block_watcher::wait_for_new_block_number(
&*self.provider,
last_block_number,
self.eth_poll_interval,
)
.await;
// Wait for new block. Block number doesn't matter as the pool will only notify of new blocks
// after the pool has updated its state. The bundle will be formed using the latest pool state
// and can land in the next block
let mut last_block = match rx.recv().await {
Some(b) => b,
None => {
error!("Block stream closed");
return;
}
};
// Consume any other blocks that may have been buffered up
loop {
match rx.try_recv() {
Ok(b) => {
last_block = b;
}
Err(mpsc::error::TryRecvError::Empty) => {
break;
}
Err(mpsc::error::TryRecvError::Disconnected) => {
error!("Block stream closed");
return;
}
}
}

self.check_for_and_log_transaction_update().await;
let result = self.send_bundle_with_increasing_gas_fees().await;
match &result {
Expand All @@ -145,7 +187,7 @@ where
} else {
info!("Bundle with hash {tx_hash:?} landed in block {block_number} after increasing gas fees {attempt_number} time(s)");
}
SendBundleResult::NoOperationsInitially => trace!("No ops to send at block {last_block_number}"),
SendBundleResult::NoOperationsInitially => trace!("No ops to send at block {}", last_block.block_number),
SendBundleResult::NoOperationsAfterFeeIncreases {
initial_op_count,
attempt_number,
Expand Down Expand Up @@ -184,7 +226,6 @@ where
entry_point: E,
transaction_tracker: T,
pool_client: C,
provider: Arc<Provider<RetryClient<Http>>>,
settings: Settings,
event_sender: broadcast::Sender<WithEntryPoint<BuilderEvent>>,
) -> Self {
Expand All @@ -198,7 +239,6 @@ where
entry_point,
transaction_tracker,
pool_client,
provider,
settings,
event_sender,
}
Expand Down
10 changes: 6 additions & 4 deletions src/builder/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,12 @@ impl Task for BuilderTask {
let (send_bundle_tx, send_bundle_rx) = mpsc::channel(1);

let mut builder: Box<dyn BundleSender> = match &self.args.pool_client_mode {
PoolClientMode::Local { sender } => {
let pool_client = LocalPoolClient::new(sender.clone());
PoolClientMode::Local {
req_sender,
new_heads_receiver,
} => {
let pool_client =
LocalPoolClient::new(req_sender.clone(), new_heads_receiver.resubscribe());
let proposer = BundleProposerImpl::new(
pool_client.clone(),
simulator,
Expand All @@ -175,7 +179,6 @@ impl Task for BuilderTask {
entry_point,
transaction_tracker,
pool_client,
provider,
builder_settings,
self.event_sender.clone(),
))
Expand All @@ -201,7 +204,6 @@ impl Task for BuilderTask {
entry_point,
transaction_tracker,
pool_client,
provider,
builder_settings,
self.event_sender.clone(),
))
Expand Down
22 changes: 19 additions & 3 deletions src/cli/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,20 +41,36 @@ pub async fn run(bundler_args: NodeCliArgs, common_args: CommonArgs) -> anyhow::
let builder_url = builder_args.url(false);

let (tx, rx) = mpsc::channel(1024);
let (new_heads_sender, _) = broadcast::channel(1024);

let pool_task_args = pool_args
.to_args(&common_args, PoolServerMode::Local { receiver: Some(rx) })
.to_args(
&common_args,
PoolServerMode::Local {
req_receiver: Some(rx),
new_heads_sender: Some(new_heads_sender.clone()),
},
)
.await?;
let builder_task_args = builder_args
.to_args(&common_args, PoolClientMode::Local { sender: tx.clone() })
.to_args(
&common_args,
PoolClientMode::Local {
req_sender: tx.clone(),
new_heads_receiver: new_heads_sender.subscribe(),
},
)
.await?;
let rpc_task_args = rpc_args
.to_args(
&common_args,
builder_url,
(&common_args).try_into()?,
(&common_args).try_into()?,
PoolClientMode::Local { sender: tx },
PoolClientMode::Local {
req_sender: tx,
new_heads_receiver: new_heads_sender.subscribe(),
},
)
.await?;

Expand Down
2 changes: 1 addition & 1 deletion src/op_pool/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ pub struct Chain<P: ProviderLike> {
load_ops_semaphore: Semaphore,
}

#[derive(Debug, Eq, PartialEq)]
#[derive(Default, Debug, Eq, PartialEq)]
pub struct ChainUpdate {
pub latest_block_number: u64,
pub latest_block_hash: H256,
Expand Down
49 changes: 44 additions & 5 deletions src/op_pool/mempool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ pub use reputation::{
HourlyMovingAverageReputation, Reputation, ReputationParams, ReputationStatus,
};
use strum::IntoEnumIterator;
use tokio::sync::broadcast;
use tokio_util::sync::CancellationToken;
use tonic::async_trait;

use self::error::MempoolResult;
use super::MempoolError;
use super::{chain::ChainUpdate, MempoolError};
use crate::common::{
mempool::MempoolConfig,
precheck, simulation,
Expand All @@ -30,6 +32,9 @@ use crate::common::{
#[async_trait]
/// In-memory operation pool
pub trait Mempool: Send + Sync + 'static {
/// Call to update the mempool with a new chain update
fn on_chain_update(&self, update: &ChainUpdate);

/// Returns the entry point address this pool targets.
fn entry_point(&self) -> Address;

Expand Down Expand Up @@ -178,16 +183,51 @@ impl PoolOperation {

#[derive(Debug)]
pub struct MempoolGroup<M> {
mempools: HashMap<Address, Arc<M>>,
mempools: HashMap<Address, M>,
chain_update_sender: broadcast::Sender<Arc<ChainUpdate>>,
}

impl<M> MempoolGroup<M>
where
M: Mempool,
{
pub fn new(mempools: Vec<Arc<M>>) -> Self {
pub fn new(mempools: Vec<M>) -> Self {
Self {
mempools: mempools.into_iter().map(|m| (m.entry_point(), m)).collect(),
chain_update_sender: broadcast::channel(1024).0,
}
}

pub fn subscribe_chain_update(self: Arc<Self>) -> broadcast::Receiver<Arc<ChainUpdate>> {
self.chain_update_sender.subscribe()
}

pub async fn run(
self: Arc<Self>,
mut chain_update_receiver: broadcast::Receiver<Arc<ChainUpdate>>,
shutdown_token: CancellationToken,
) {
loop {
tokio::select! {
_ = shutdown_token.cancelled() => {
tracing::info!("Shutting down UoPool");
break;
}
chain_update = chain_update_receiver.recv() => {
if let Ok(chain_update) = chain_update {
// Update each mempool before notifying listeners of the chain update
// This allows the mempools to update their state before the listeners
// pull information from the mempool.
// For example, a bundle builder listening for a new block to kick off
// its bundle building process will want to be able to query the mempool
// and only receive operations that have not yet been mined.
for mempool in self.mempools.values() {
mempool.on_chain_update(&chain_update);
}
let _ = self.chain_update_sender.send(chain_update);
}
}
}
}
}

Expand Down Expand Up @@ -265,10 +305,9 @@ where
Ok(mempool.dump_reputation())
}

pub fn get_pool(&self, entry_point: Address) -> MempoolResult<Arc<M>> {
fn get_pool(&self, entry_point: Address) -> MempoolResult<&M> {
self.mempools
.get(&entry_point)
.cloned()
.ok_or_else(|| MempoolError::UnknownEntryPoint(entry_point))
}
}
Expand Down
Loading

0 comments on commit 8b58032

Please sign in to comment.