Skip to content

Commit

Permalink
feat: stream block updates from pool, use in builder
Browse files Browse the repository at this point in the history
  • Loading branch information
dancoombs committed Aug 1, 2023
1 parent 08ab644 commit 26b5a2f
Show file tree
Hide file tree
Showing 16 changed files with 460 additions and 256 deletions.
7 changes: 7 additions & 0 deletions proto/op_pool.proto
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ service OpPool {
rpc DebugDumpMempool (DebugDumpMempoolRequest) returns (DebugDumpMempoolResponse);
rpc DebugSetReputation (DebugSetReputationRequest) returns (DebugSetReputationResponse);
rpc DebugDumpReputation (DebugDumpReputationRequest) returns (DebugDumpReputationResponse);
rpc SubscribeNewBlocks(SubscribeNewBlocksRequest) returns (stream NewBlock);
}

message GetSupportedEntryPointsRequest {}
Expand Down Expand Up @@ -157,6 +158,12 @@ message DebugDumpReputationSuccess {
repeated Reputation reputations = 1;
}

message SubscribeNewBlocksRequest {}
message NewBlock {
bytes hash = 1;
uint64 number = 2;
}

message Reputation {
bytes address = 1;
ReputationStatus status = 2;
Expand Down
58 changes: 32 additions & 26 deletions src/builder/bundle_sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,10 @@ use std::{
};

use anyhow::{bail, Context};
use ethers::{
providers::{Http, Middleware, Provider, RetryClient},
types::{transaction::eip2718::TypedTransaction, Address, H256, U256},
};
use ethers::types::{transaction::eip2718::TypedTransaction, Address, H256, U256};
use tokio::{
join,
sync::{mpsc, oneshot},
sync::{broadcast, mpsc, oneshot},
time,
};
use tonic::async_trait;
Expand All @@ -29,7 +26,7 @@ use crate::{
math,
types::{Entity, EntryPointLike, ExpectedStorage, UserOperation},
},
op_pool::PoolClient,
op_pool::{NewBlock, PoolClient},
};

// Overhead on gas estimates to account for inaccuracies.
Expand Down Expand Up @@ -63,8 +60,6 @@ where
entry_point: E,
transaction_tracker: T,
pool_client: C,
// TODO: Figure out what we really want to do for detecting new blocks.
provider: Arc<Provider<RetryClient<Http>>>,
settings: Settings,
}

Expand Down Expand Up @@ -108,6 +103,13 @@ where
/// next one.
async fn send_bundles_in_loop(&mut self) {
let mut last_block_number = 0;
let mut new_blocks = if let Ok(new_blocks) = self.pool_client.subscribe_new_blocks().await {
new_blocks
} else {
error!("Failed to subscribe to new blocks");
return;
};

loop {
let mut send_bundle_response: Option<oneshot::Sender<SendBundleResult>> = None;

Expand All @@ -122,7 +124,17 @@ where
}
}

last_block_number = self.wait_for_new_block_number(last_block_number).await;
last_block_number = match self
.wait_for_new_block_number(last_block_number, &mut new_blocks)
.await
{
Ok(l) => l,
Err(e) => {
error!("Failed to retrieve new block number: {e}");
continue;
}
};

self.check_for_and_log_transaction_update().await;
let result = self.send_bundle_with_increasing_gas_fees().await;
match &result {
Expand Down Expand Up @@ -175,7 +187,6 @@ where
entry_point: E,
transaction_tracker: T,
pool_client: C,
provider: Arc<Provider<RetryClient<Http>>>,
settings: Settings,
) -> Self {
Self {
Expand All @@ -188,7 +199,6 @@ where
entry_point,
transaction_tracker,
pool_client,
provider,
settings,
}
}
Expand Down Expand Up @@ -326,23 +336,19 @@ where
Ok(SendBundleResult::StalledAtMaxFeeIncreases)
}

async fn wait_for_new_block_number(&self, prev_block_number: u64) -> u64 {
async fn wait_for_new_block_number(
&self,
prev_block_number: u64,
new_blocks: &mut broadcast::Receiver<NewBlock>,
) -> anyhow::Result<u64> {
loop {
let block_number = self.provider.get_block_number().await;
match block_number {
Ok(n) => {
let n = n.as_u64();
if n > prev_block_number {
return n;
}
}
Err(error) => {
error!(
"Failed to load latest block number in builder. Will keep trying: {error}"
);
}
let block = new_blocks
.recv()
.await
.context("builder should receive new blocks")?;
if block.number > prev_block_number {
return Ok(block.number);
}
time::sleep(self.eth_poll_interval).await;
}
}

Expand Down
10 changes: 6 additions & 4 deletions src/builder/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,12 @@ impl Task for BuilderTask {
let (send_bundle_tx, send_bundle_rx) = mpsc::channel(1);

let mut builder: Box<dyn BundleSender> = match &self.args.pool_client_mode {
PoolClientMode::Local { sender } => {
let pool_client = LocalPoolClient::new(sender.clone());
PoolClientMode::Local {
sender,
block_receiver,
} => {
let pool_client =
LocalPoolClient::new(sender.clone(), block_receiver.resubscribe());
let proposer = BundleProposerImpl::new(
pool_client.clone(),
simulator,
Expand All @@ -170,7 +174,6 @@ impl Task for BuilderTask {
entry_point,
transaction_tracker,
pool_client,
provider,
builder_settings,
))
}
Expand All @@ -194,7 +197,6 @@ impl Task for BuilderTask {
entry_point,
transaction_tracker,
pool_client,
provider,
builder_settings,
))
}
Expand Down
24 changes: 20 additions & 4 deletions src/cli/node.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use clap::Args;
use tokio::sync::mpsc;
use tokio::sync::{broadcast, mpsc};

use super::{builder::BuilderArgs, pool::PoolArgs, rpc::RpcArgs, CommonArgs};
use crate::{
Expand Down Expand Up @@ -31,19 +31,35 @@ pub async fn run(bundler_args: NodeCliArgs, common_args: CommonArgs) -> anyhow::
let builder_url = builder_args.url(false);

let (tx, rx) = mpsc::channel(1024);
let (block_sender, block_receiver) = broadcast::channel(1024);

let pool_task_args = pool_args
.to_args(&common_args, PoolServerMode::Local { receiver: Some(rx) })
.to_args(
&common_args,
PoolServerMode::Local {
req_receiver: Some(rx),
block_sender: Some(block_sender),
},
)
.await?;
let builder_task_args = builder_args
.to_args(&common_args, PoolClientMode::Local { sender: tx.clone() })
.to_args(
&common_args,
PoolClientMode::Local {
sender: tx.clone(),
block_receiver: block_receiver.resubscribe(),
},
)
.await?;
let rpc_task_args = rpc_args
.to_args(
&common_args,
builder_url,
(&common_args).try_into()?,
PoolClientMode::Local { sender: tx.clone() },
PoolClientMode::Local {
sender: tx.clone(),
block_receiver,
},
)
.await?;

Expand Down
80 changes: 22 additions & 58 deletions src/op_pool/event/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,14 @@ pub struct EventListener<F: BlockProviderFactory> {
provider_factory: F,
provider: Option<F::Provider>,
log_filter_base: Filter,
entrypoint_event_broadcasts: HashMap<Address, broadcast::Sender<Arc<NewBlockEvent>>>,
event_broadcast: broadcast::Sender<Arc<NewBlockEvent>>,
last_reconnect: Option<Instant>,
backoff_idx: u32,
}

impl<F: BlockProviderFactory> EventProvider for EventListener<F> {
fn subscribe_by_entrypoint(
&self,
entry_point: Address,
) -> Option<broadcast::Receiver<Arc<NewBlockEvent>>> {
self.entrypoint_event_broadcasts
.get(&entry_point)
.map(|b| b.subscribe())
fn subscribe(&self) -> broadcast::Receiver<Arc<NewBlockEvent>> {
self.event_broadcast.subscribe()
}

fn spawn(
Expand All @@ -60,24 +55,12 @@ impl<F: BlockProviderFactory> EventProvider for EventListener<F> {
impl<F: BlockProviderFactory> EventListener<F> {
/// Create a new event listener from a block provider factory and list entry points
/// Must call listen_with_shutdown to start listening
pub fn new<'a>(
provider_factory: F,
entry_points: impl IntoIterator<Item = &'a Address>,
) -> Self {
let mut entry_point_addresses = vec![];
let mut entrypoint_event_broadcasts = HashMap::new();
for ep in entry_points {
entry_point_addresses.push(*ep);
entrypoint_event_broadcasts.insert(*ep, broadcast::channel(1000).0);
}

let log_filter_base = Filter::new().address(entry_point_addresses);

pub fn new(provider_factory: F, entry_points: Vec<Address>) -> Self {
Self {
provider_factory,
provider: None,
log_filter_base,
entrypoint_event_broadcasts,
log_filter_base: Filter::new().address(entry_points),
event_broadcast: broadcast::channel(1024).0,
last_reconnect: None,
backoff_idx: 0,
}
Expand Down Expand Up @@ -209,30 +192,18 @@ impl<F: BlockProviderFactory> EventListener<F> {
.block
.number
.context("block should have number")?;
let mut block_events = HashMap::new();

EventListenerMetrics::increment_blocks_seen();
EventListenerMetrics::set_block_height(block_number.as_u64());

for ep in self.entrypoint_event_broadcasts.keys() {
block_events.insert(
*ep,
NewBlockEvent {
address: *ep,
hash: block_hash,
number: block_number,
events: vec![],
},
);
}
let mut block_event = NewBlockEvent {
hash: block_hash,
number: block_number,
events: HashMap::new(),
};

for log in block_with_logs.logs {
let ep_address = log.address;
if !block_events.contains_key(&ep_address) {
error!("Received log for unknown entrypoint {ep_address:?}");
continue;
}

let txn_hash = log
.transaction_hash
.context("log should have transaction hash")?;
Expand All @@ -246,24 +217,17 @@ impl<F: BlockProviderFactory> EventListener<F> {
txn_index,
};

block_events.entry(ep_address).and_modify(|e| {
e.events.push(event);
});
block_event
.events
.entry(ep_address)
.or_insert_with(Vec::new)
.push(event);
EventListenerMetrics::increment_events_seen();
}

for (ep, block_event) in block_events {
match self.entrypoint_event_broadcasts.get(&ep) {
Some(broadcast) => {
// ignore sender errors, which can only happen if there are no receivers
let _ = broadcast.send(Arc::new(block_event));
}
None => {
error!("No broadcast channel for entry point: {:?}", ep);
}
}
if let Err(e) = self.event_broadcast.send(Arc::new(block_event)) {
error!("Error broadcasting block event: {:?}", e);
}

Ok(())
}

Expand Down Expand Up @@ -391,7 +355,7 @@ mod tests {
assert_eq!(block_event.events.len(), 1);

if let IEntryPointEvents::UserOperationEventFilter(block_event) =
&block_event.events[0].contract_event
&block_event.events.get(&state.ep).unwrap()[0].contract_event
{
assert_eq!(block_event, &event);
} else {
Expand Down Expand Up @@ -427,7 +391,7 @@ mod tests {
assert_eq!(block_event.events.len(), 1);

if let IEntryPointEvents::UserOperationEventFilter(block_event) =
&block_event.events[0].contract_event
&block_event.events.get(&state.ep).unwrap()[0].contract_event
{
assert_eq!(block_event, &event);
} else {
Expand Down Expand Up @@ -543,8 +507,8 @@ mod tests {
let factory =
MockBlockProviderFactory::new(rx, connection_event_tx, should_fail_connection.clone());
let ep = Address::random();
let listener = EventListener::new(factory, vec![&ep]);
let events = listener.subscribe_by_entrypoint(ep).unwrap();
let listener = EventListener::new(factory, vec![ep]);
let events = listener.subscribe();
let listener_shutdown = shutdown_token.clone();
let handle =
tokio::spawn(async move { listener.listen_with_shutdown(listener_shutdown).await });
Expand Down
Loading

0 comments on commit 26b5a2f

Please sign in to comment.