feat: stream block updates from pool, use in builder #272

Merged
merged 1 commit on Sep 1, 2023
11 changes: 6 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -9,6 +9,7 @@ default-run = "rundler"
[dependencies]
anyhow = "1.0.70"
arrayvec = "0.7.2"
async-stream = "0.3.5"
chrono = "0.4.24"
clap = { version = "4.2.4", features = ["derive", "env"] }
dotenv = "0.15.0"
16 changes: 16 additions & 0 deletions proto/op_pool/op_pool.proto
@@ -116,6 +116,10 @@ service OpPool {
// reputation objects, each with the fields described above in
// debug_bundler_setReputation
rpc DebugDumpReputation (DebugDumpReputationRequest) returns (DebugDumpReputationResponse);

// Streaming API for subscribing to updates whenever a new block is added to (or reorged
// onto) the chain.
rpc SubscribeNewHeads(SubscribeNewHeadsRequest) returns (stream SubscribeNewHeadsResponse);
}

message GetSupportedEntryPointsRequest {}
@@ -239,6 +243,18 @@ message DebugDumpReputationSuccess {
repeated Reputation reputations = 1;
}

message SubscribeNewHeadsRequest {}
message SubscribeNewHeadsResponse {
// The new chain head
NewHead new_head = 1;
}
message NewHead {
// The block hash
bytes block_hash = 1;
// The block number
uint64 block_number = 2;
}

message Reputation {
// The (serialized) address to set the reputation for
bytes address = 1;
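For reference, a client consuming this stream might look like the sketch below, assuming default prost/tonic codegen for the op_pool package; the module path, client type, and endpoint are assumptions, not code from this PR:

```rust
// Hypothetical tonic client for the SubscribeNewHeads streaming RPC.
use op_pool::op_pool_client::OpPoolClient; // assumed generated module path
use op_pool::SubscribeNewHeadsRequest;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Placeholder endpoint; the real address depends on the pool server config.
    let mut client = OpPoolClient::connect("http://localhost:50051").await?;

    let mut stream = client
        .subscribe_new_heads(SubscribeNewHeadsRequest {})
        .await?
        .into_inner();

    // Each response carries the hash and number of the new chain head.
    while let Some(resp) = stream.message().await? {
        if let Some(head) = resp.new_head {
            println!("new head at block {}", head.block_number);
        }
    }
    Ok(())
}
```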
74 changes: 57 additions & 17 deletions src/builder/bundle_sender.rs
@@ -7,15 +7,13 @@ use std::{
};

use anyhow::{bail, Context};
use ethers::{
providers::{Http, Provider, RetryClient},
types::{transaction::eip2718::TypedTransaction, Address, H256, U256},
};
use ethers::types::{transaction::eip2718::TypedTransaction, Address, H256, U256};
use tokio::{
join,
sync::{broadcast, mpsc, oneshot},
time,
};
use tokio_stream::StreamExt;
use tonic::async_trait;
use tracing::{error, info, trace, warn};

@@ -26,7 +24,6 @@ use crate::{
transaction_tracker::{SendResult, TrackerUpdate, TransactionTracker},
},
common::{
block_watcher,
emit::WithEntryPoint,
gas::GasFees,
math,
@@ -66,8 +63,6 @@
entry_point: E,
transaction_tracker: T,
pool_client: C,
// TODO: Figure out what we really want to do for detecting new blocks.
provider: Arc<Provider<RetryClient<Http>>>,
settings: Settings,
event_sender: broadcast::Sender<WithEntryPoint<BuilderEvent>>,
}
@@ -111,7 +106,34 @@
/// then waiting for one bundle to be mined or dropped before forming the
/// next one.
async fn send_bundles_in_loop(&mut self) {
let mut last_block_number = 0;
let mut new_heads = if let Ok(new_blocks) = self.pool_client.subscribe_new_heads() {
new_blocks
} else {
error!("Failed to subscribe to new blocks");
return;
};

// The new_heads stream can buffer up multiple blocks, but we only want to consume the latest one.
// This task is used to consume the new heads and place them onto a channel that can be synchronously
// consumed until the latest block is reached.
let (tx, mut rx) = mpsc::channel(1024);
tokio::spawn(async move {
loop {
match new_heads.next().await {
Some(b) => {
if tx.send(b).await.is_err() {
error!("Failed to buffer new block for bundle sender");
return;
}
}
None => {
error!("Block stream ended");
return;
}
}
}
});

loop {
let mut send_bundle_response: Option<oneshot::Sender<SendBundleResult>> = None;

@@ -126,12 +148,32 @@
}
}

last_block_number = block_watcher::wait_for_new_block_number(
&*self.provider,
last_block_number,
self.eth_poll_interval,
)
.await;
// Wait for a new block. The block number doesn't matter, as the pool will only notify of
// new blocks after it has updated its state. The bundle will be formed using the latest
// pool state and can land in the next block.
let mut last_block = match rx.recv().await {
Some(b) => b,
None => {
error!("Block stream closed");
return;
}
};
// Consume any other blocks that may have been buffered up
loop {
match rx.try_recv() {
Ok(b) => {
last_block = b;
}
Err(mpsc::error::TryRecvError::Empty) => {
break;
}
Err(mpsc::error::TryRecvError::Disconnected) => {
error!("Block stream closed");
return;
}
}
}

self.check_for_and_log_transaction_update().await;
let result = self.send_bundle_with_increasing_gas_fees().await;
match &result {
@@ -145,7 +187,7 @@
} else {
info!("Bundle with hash {tx_hash:?} landed in block {block_number} after increasing gas fees {attempt_number} time(s)");
}
SendBundleResult::NoOperationsInitially => trace!("No ops to send at block {last_block_number}"),
SendBundleResult::NoOperationsInitially => trace!("No ops to send at block {}", last_block.block_number),
SendBundleResult::NoOperationsAfterFeeIncreases {
initial_op_count,
attempt_number,
@@ -184,7 +226,6 @@
entry_point: E,
transaction_tracker: T,
pool_client: C,
provider: Arc<Provider<RetryClient<Http>>>,
settings: Settings,
event_sender: broadcast::Sender<WithEntryPoint<BuilderEvent>>,
) -> Self {
@@ -198,7 +239,6 @@
entry_point,
transaction_tracker,
pool_client,
provider,
settings,
event_sender,
}
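The buffering task plus the try_recv drain in send_bundles_in_loop implement a "skip to the latest item" consumer. A minimal standalone sketch of the same pattern, with the payload simplified to u64:

```rust
use tokio::sync::mpsc;

/// Wait for at least one item, then drain anything else already buffered,
/// returning only the newest. Returns None once the channel is closed and empty.
async fn latest<T>(rx: &mut mpsc::Receiver<T>) -> Option<T> {
    let mut last = rx.recv().await?;
    while let Ok(item) = rx.try_recv() {
        last = item;
    }
    Some(last)
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(8);
    for n in 0..3u64 {
        tx.send(n).await.unwrap();
    }
    drop(tx);
    // Prints Some(2): the stale blocks 0 and 1 are skipped.
    println!("{:?}", latest(&mut rx).await);
}
```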
10 changes: 6 additions & 4 deletions src/builder/task.rs
@@ -154,8 +154,12 @@ impl Task for BuilderTask {
let (send_bundle_tx, send_bundle_rx) = mpsc::channel(1);

let mut builder: Box<dyn BundleSender> = match &self.args.pool_client_mode {
PoolClientMode::Local { sender } => {
let pool_client = LocalPoolClient::new(sender.clone());
PoolClientMode::Local {
req_sender,
new_heads_receiver,
} => {
let pool_client =
LocalPoolClient::new(req_sender.clone(), new_heads_receiver.resubscribe());
let proposer = BundleProposerImpl::new(
pool_client.clone(),
simulator,
@@ -175,7 +179,6 @@
entry_point,
transaction_tracker,
pool_client,
provider,
builder_settings,
self.event_sender.clone(),
))
@@ -201,7 +204,6 @@
entry_point,
transaction_tracker,
pool_client,
provider,
builder_settings,
self.event_sender.clone(),
))
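The Local client mode above hands each consumer a copy of the broadcast subscription via resubscribe(), since the config only stores a receiver half. A standalone sketch of that behavior (channel size and payload type are arbitrary here):

```rust
use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    // A broadcast channel standing in for the new_heads channel in this PR.
    let (tx, rx) = broadcast::channel::<u64>(16);

    // `resubscribe` creates a fresh receiver from an existing one; it only
    // sees messages sent after this call, which suits a stream of chain
    // heads where only new blocks matter.
    let mut rx2 = rx.resubscribe();

    tx.send(7).unwrap();
    assert_eq!(rx2.recv().await.unwrap(), 7);
}
```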
22 changes: 19 additions & 3 deletions src/cli/node/mod.rs
@@ -41,20 +41,36 @@ pub async fn run(bundler_args: NodeCliArgs, common_args: CommonArgs) -> anyhow::
let builder_url = builder_args.url(false);

let (tx, rx) = mpsc::channel(1024);
Collaborator:
If all the channels are the same size, we should probably make this configurable and use a default channel size.

Collaborator (Author):
Yeah, we have this problem all over the codebase. Definitely something we should clean up. I think we should just have a concept of an unbounded_channel that we use in most places.

Collaborator:
Yeah, I think we could clean up the thread messaging a bit by using some type of event-passing structure that supports easy iteration on the codebase.

I am going to play around with it over the weekend and see what I can come up with.

pub struct Engine<E, A> {
    /// The set of collectors that the engine will use to collect events.
    collectors: Vec<Box<dyn Collector<E>>>,

    /// The set of executors that the engine will use to execute actions.
    executors: Vec<Box<dyn Executor<A>>>,

    /// The capacity of the event channel.
    event_channel_capacity: usize,

    /// The capacity of the action channel.
    action_channel_capacity: usize,
}

Something like this: then we can have a collector trait that listens for new-block events and executors that process the events and bundle the user ops.

It will most likely take a while to get right, but I'll let you know how I go.
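A hedged sketch of the collector/executor traits this proposal implies; none of this is in the PR, and the names and signatures are only one way the idea could be filled in:

```rust
use std::pin::Pin;

use tokio_stream::Stream;
use tonic::async_trait;

/// Hypothetical collector side: a source that yields a stream of events,
/// e.g. new chain heads from the pool.
#[async_trait]
pub trait Collector<E: Send + 'static>: Send + Sync {
    async fn event_stream(&self) -> anyhow::Result<Pin<Box<dyn Stream<Item = E> + Send>>>;
}

/// Hypothetical executor side: a sink that performs actions, e.g. submitting
/// a built bundle transaction.
#[async_trait]
pub trait Executor<A: Send + 'static>: Send + Sync {
    async fn execute(&self, action: A) -> anyhow::Result<()>;
}
```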

let (new_heads_sender, _) = broadcast::channel(1024);

let pool_task_args = pool_args
.to_args(&common_args, PoolServerMode::Local { receiver: Some(rx) })
.to_args(
&common_args,
PoolServerMode::Local {
req_receiver: Some(rx),
new_heads_sender: Some(new_heads_sender.clone()),
},
)
.await?;
let builder_task_args = builder_args
.to_args(&common_args, PoolClientMode::Local { sender: tx.clone() })
.to_args(
&common_args,
PoolClientMode::Local {
req_sender: tx.clone(),
new_heads_receiver: new_heads_sender.subscribe(),
},
)
.await?;
let rpc_task_args = rpc_args
.to_args(
&common_args,
builder_url,
(&common_args).try_into()?,
(&common_args).try_into()?,
PoolClientMode::Local { sender: tx },
PoolClientMode::Local {
req_sender: tx,
new_heads_receiver: new_heads_sender.subscribe(),
},
)
.await?;

2 changes: 1 addition & 1 deletion src/op_pool/chain.rs
@@ -41,7 +41,7 @@ pub struct Chain<P: ProviderLike> {
load_ops_semaphore: Semaphore,
}

#[derive(Debug, Eq, PartialEq)]
#[derive(Default, Debug, Eq, PartialEq)]
pub struct ChainUpdate {
pub latest_block_number: u64,
pub latest_block_hash: H256,
49 changes: 44 additions & 5 deletions src/op_pool/mempool/mod.rs
@@ -16,10 +16,12 @@ pub use reputation::{
HourlyMovingAverageReputation, Reputation, ReputationParams, ReputationStatus,
};
use strum::IntoEnumIterator;
use tokio::sync::broadcast;
use tokio_util::sync::CancellationToken;
use tonic::async_trait;

use self::error::MempoolResult;
use super::MempoolError;
use super::{chain::ChainUpdate, MempoolError};
use crate::common::{
mempool::MempoolConfig,
precheck, simulation,
@@ -30,6 +32,9 @@ use crate::common::{
#[async_trait]
/// In-memory operation pool
pub trait Mempool: Send + Sync + 'static {
/// Call to update the mempool with a new chain update
fn on_chain_update(&self, update: &ChainUpdate);

/// Returns the entry point address this pool targets.
fn entry_point(&self) -> Address;

@@ -178,16 +183,51 @@ impl PoolOperation {

#[derive(Debug)]
pub struct MempoolGroup<M> {
mempools: HashMap<Address, Arc<M>>,
mempools: HashMap<Address, M>,
chain_update_sender: broadcast::Sender<Arc<ChainUpdate>>,
}

impl<M> MempoolGroup<M>
where
M: Mempool,
{
pub fn new(mempools: Vec<Arc<M>>) -> Self {
pub fn new(mempools: Vec<M>) -> Self {
Self {
mempools: mempools.into_iter().map(|m| (m.entry_point(), m)).collect(),
chain_update_sender: broadcast::channel(1024).0,
}
}

pub fn subscribe_chain_update(self: Arc<Self>) -> broadcast::Receiver<Arc<ChainUpdate>> {
self.chain_update_sender.subscribe()
}

pub async fn run(
self: Arc<Self>,
mut chain_update_receiver: broadcast::Receiver<Arc<ChainUpdate>>,
shutdown_token: CancellationToken,
) {
loop {
tokio::select! {
_ = shutdown_token.cancelled() => {
tracing::info!("Shutting down UoPool");
break;
}
chain_update = chain_update_receiver.recv() => {
if let Ok(chain_update) = chain_update {
// Update each mempool before notifying listeners of the chain update
// This allows the mempools to update their state before the listeners
// pull information from the mempool.
// For example, a bundle builder listening for a new block to kick off
// its bundle building process will want to be able to query the mempool
// and only receive operations that have not yet been mined.
for mempool in self.mempools.values() {
mempool.on_chain_update(&chain_update);
}
let _ = self.chain_update_sender.send(chain_update);
}
}
}
}
}

@@ -265,10 +305,9 @@
Ok(mempool.dump_reputation())
}

pub fn get_pool(&self, entry_point: Address) -> MempoolResult<Arc<M>> {
fn get_pool(&self, entry_point: Address) -> MempoolResult<&M> {
self.mempools
.get(&entry_point)
.cloned()
.ok_or_else(|| MempoolError::UnknownEntryPoint(entry_point))
}
}
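The ordering enforced in run() (apply the update to every mempool, then broadcast) is the key invariant. A standalone sketch of that fan-out pattern, with a stand-in ChainUpdate type in place of the real one from src/op_pool/chain.rs:

```rust
use std::sync::Arc;

use tokio::sync::broadcast;

// Stand-in for the real ChainUpdate type.
#[derive(Debug)]
struct ChainUpdate {
    latest_block_number: u64,
}

#[tokio::main]
async fn main() {
    let (tx, _keep_open) = broadcast::channel::<Arc<ChainUpdate>>(1024);
    let mut listener = tx.subscribe();

    // Producer side: apply the update to internal state first
    // (e.g. mempool.on_chain_update(&update)), then notify, so a woken
    // listener always observes post-update pool state.
    let update = Arc::new(ChainUpdate { latest_block_number: 100 });
    tx.send(update).unwrap();

    let woken = listener.recv().await.unwrap();
    println!("woken at block {}", woken.latest_block_number);
}
```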