diff --git a/Cargo.lock b/Cargo.lock index 3ac8ac46a..802c40723 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -255,9 +255,9 @@ dependencies = [ [[package]] name = "async-stream" -version = "0.3.4" +version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ad445822218ce64be7a341abfb0b1ea43b5c23aa83902542a4542e78309d8e5e" +checksum = "cd56dd203fef61ac097dd65721a419ddccb106b2d2b70ba60a6b529f03961a51" dependencies = [ "async-stream-impl", "futures-core", @@ -266,13 +266,13 @@ dependencies = [ [[package]] name = "async-stream-impl" -version = "0.3.4" +version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e4655ae1a7b0cdf149156f780c5bf3f1352bc53cbd9e0a361a7ef7b22947e965" +checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" dependencies = [ "proc-macro2", "quote", - "syn 1.0.107", + "syn 2.0.17", ] [[package]] @@ -3940,6 +3940,7 @@ version = "0.1.0" dependencies = [ "anyhow", "arrayvec", + "async-stream", "cargo-husky", "chrono", "clap 4.2.4", diff --git a/Cargo.toml b/Cargo.toml index 4630d9487..a955fa5e7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,6 +9,7 @@ default-run = "rundler" [dependencies] anyhow = "1.0.70" arrayvec = "0.7.2" +async-stream = "0.3.5" chrono = "0.4.24" clap = { version = "4.2.4", features = ["derive", "env"] } dotenv = "0.15.0" diff --git a/proto/op_pool/op_pool.proto b/proto/op_pool/op_pool.proto index 94afdaf64..3b585edf9 100644 --- a/proto/op_pool/op_pool.proto +++ b/proto/op_pool/op_pool.proto @@ -116,6 +116,10 @@ service OpPool { // reputation objects, each with the fields described above in // debug_bundler_setReputation rpc DebugDumpReputation (DebugDumpReputationRequest) returns (DebugDumpReputationResponse); + + // Streaming API to subscribe to updates when a new block is added to (or reorged onto) + // the chain. + rpc SubscribeNewHeads(SubscribeNewHeadsRequest) returns (stream SubscribeNewHeadsResponse); } message GetSupportedEntryPointsRequest {} @@ -239,6 +243,18 @@ message DebugDumpReputationSuccess { repeated Reputation reputations = 1; } +message SubscribeNewHeadsRequest {} +message SubscribeNewHeadsResponse { + // The new chain head + NewHead new_head = 1; +} +message NewHead { + // The block hash + bytes block_hash = 1; + // The block number + uint64 block_number = 2; +} + message Reputation { // The (serialized) address to set the reputation for bytes address = 1; diff --git a/src/builder/bundle_sender.rs b/src/builder/bundle_sender.rs index 9c225ff90..3ed81dc7f 100644 --- a/src/builder/bundle_sender.rs +++ b/src/builder/bundle_sender.rs @@ -7,15 +7,13 @@ use std::{ }; use anyhow::{bail, Context}; -use ethers::{ - providers::{Http, Provider, RetryClient}, - types::{transaction::eip2718::TypedTransaction, Address, H256, U256}, -}; +use ethers::types::{transaction::eip2718::TypedTransaction, Address, H256, U256}; use tokio::{ join, sync::{broadcast, mpsc, oneshot}, time, }; +use tokio_stream::StreamExt; use tonic::async_trait; use tracing::{error, info, trace, warn}; @@ -26,7 +24,6 @@ use crate::{ transaction_tracker::{SendResult, TrackerUpdate, TransactionTracker}, }, common::{ - block_watcher, emit::WithEntryPoint, gas::GasFees, math, @@ -66,8 +63,6 @@ where entry_point: E, transaction_tracker: T, pool_client: C, - // TODO: Figure out what we really want to do for detecting new blocks.
- provider: Arc>>, settings: Settings, event_sender: broadcast::Sender>, } @@ -111,7 +106,34 @@ where /// then waiting for one bundle to be mined or dropped before forming the /// next one. async fn send_bundles_in_loop(&mut self) { - let mut last_block_number = 0; + let mut new_heads = if let Ok(new_blocks) = self.pool_client.subscribe_new_heads() { + new_blocks + } else { + error!("Failed to subscribe to new blocks"); + return; + }; + + // The new_heads stream can buffer up multiple blocks, but we only want to consume the latest one. + // This task is used to consume the new heads and place them onto a channel that can be synchronously + // consumed until the latest block is reached. + let (tx, mut rx) = mpsc::channel(1024); + tokio::spawn(async move { + loop { + match new_heads.next().await { + Some(b) => { + if tx.send(b).await.is_err() { + error!("Failed to buffer new block for bundle sender"); + return; + } + } + None => { + error!("Block stream ended"); + return; + } + } + } + }); + loop { let mut send_bundle_response: Option> = None; @@ -126,12 +148,32 @@ where } } - last_block_number = block_watcher::wait_for_new_block_number( - &*self.provider, - last_block_number, - self.eth_poll_interval, - ) - .await; + // Wait for new block. Block number doesn't matter as the pool will only notify of new blocks + // after the pool has updated its state. The bundle will be formed using the latest pool state + // and can land in the next block + let mut last_block = match rx.recv().await { + Some(b) => b, + None => { + error!("Block stream closed"); + return; + } + }; + // Consume any other blocks that may have been buffered up + loop { + match rx.try_recv() { + Ok(b) => { + last_block = b; + } + Err(mpsc::error::TryRecvError::Empty) => { + break; + } + Err(mpsc::error::TryRecvError::Disconnected) => { + error!("Block stream closed"); + return; + } + } + } + self.check_for_and_log_transaction_update().await; let result = self.send_bundle_with_increasing_gas_fees().await; match &result { @@ -145,7 +187,7 @@ where } else { info!("Bundle with hash {tx_hash:?} landed in block {block_number} after increasing gas fees {attempt_number} time(s)"); } - SendBundleResult::NoOperationsInitially => trace!("No ops to send at block {last_block_number}"), + SendBundleResult::NoOperationsInitially => trace!("No ops to send at block {}", last_block.block_number), SendBundleResult::NoOperationsAfterFeeIncreases { initial_op_count, attempt_number, @@ -184,7 +226,6 @@ where entry_point: E, transaction_tracker: T, pool_client: C, - provider: Arc>>, settings: Settings, event_sender: broadcast::Sender>, ) -> Self { @@ -198,7 +239,6 @@ where entry_point, transaction_tracker, pool_client, - provider, settings, event_sender, } diff --git a/src/builder/task.rs b/src/builder/task.rs index 65487a845..9719d6bca 100644 --- a/src/builder/task.rs +++ b/src/builder/task.rs @@ -154,8 +154,12 @@ impl Task for BuilderTask { let (send_bundle_tx, send_bundle_rx) = mpsc::channel(1); let mut builder: Box = match &self.args.pool_client_mode { - PoolClientMode::Local { sender } => { - let pool_client = LocalPoolClient::new(sender.clone()); + PoolClientMode::Local { + req_sender, + new_heads_receiver, + } => { + let pool_client = + LocalPoolClient::new(req_sender.clone(), new_heads_receiver.resubscribe()); let proposer = BundleProposerImpl::new( pool_client.clone(), simulator, @@ -175,7 +179,6 @@ impl Task for BuilderTask { entry_point, transaction_tracker, pool_client, - provider, builder_settings, self.event_sender.clone(), )) @@ -201,7 
+204,6 @@ impl Task for BuilderTask { entry_point, transaction_tracker, pool_client, - provider, builder_settings, self.event_sender.clone(), )) diff --git a/src/cli/node/mod.rs b/src/cli/node/mod.rs index 6acb0f29a..452d59878 100644 --- a/src/cli/node/mod.rs +++ b/src/cli/node/mod.rs @@ -41,12 +41,25 @@ pub async fn run(bundler_args: NodeCliArgs, common_args: CommonArgs) -> anyhow:: let builder_url = builder_args.url(false); let (tx, rx) = mpsc::channel(1024); + let (new_heads_sender, _) = broadcast::channel(1024); let pool_task_args = pool_args - .to_args(&common_args, PoolServerMode::Local { receiver: Some(rx) }) + .to_args( + &common_args, + PoolServerMode::Local { + req_receiver: Some(rx), + new_heads_sender: Some(new_heads_sender.clone()), + }, + ) .await?; let builder_task_args = builder_args - .to_args(&common_args, PoolClientMode::Local { sender: tx.clone() }) + .to_args( + &common_args, + PoolClientMode::Local { + req_sender: tx.clone(), + new_heads_receiver: new_heads_sender.subscribe(), + }, + ) .await?; let rpc_task_args = rpc_args .to_args( @@ -54,7 +67,10 @@ pub async fn run(bundler_args: NodeCliArgs, common_args: CommonArgs) -> anyhow:: builder_url, (&common_args).try_into()?, (&common_args).try_into()?, - PoolClientMode::Local { sender: tx }, + PoolClientMode::Local { + req_sender: tx, + new_heads_receiver: new_heads_sender.subscribe(), + }, ) .await?; diff --git a/src/op_pool/chain.rs b/src/op_pool/chain.rs index afccca859..42288b478 100644 --- a/src/op_pool/chain.rs +++ b/src/op_pool/chain.rs @@ -41,7 +41,7 @@ pub struct Chain { load_ops_semaphore: Semaphore, } -#[derive(Debug, Eq, PartialEq)] +#[derive(Default, Debug, Eq, PartialEq)] pub struct ChainUpdate { pub latest_block_number: u64, pub latest_block_hash: H256, diff --git a/src/op_pool/mempool/mod.rs b/src/op_pool/mempool/mod.rs index d7d767a3d..5de63d4cb 100644 --- a/src/op_pool/mempool/mod.rs +++ b/src/op_pool/mempool/mod.rs @@ -16,10 +16,12 @@ pub use reputation::{ HourlyMovingAverageReputation, Reputation, ReputationParams, ReputationStatus, }; use strum::IntoEnumIterator; +use tokio::sync::broadcast; +use tokio_util::sync::CancellationToken; use tonic::async_trait; use self::error::MempoolResult; -use super::MempoolError; +use super::{chain::ChainUpdate, MempoolError}; use crate::common::{ mempool::MempoolConfig, precheck, simulation, @@ -30,6 +32,9 @@ use crate::common::{ #[async_trait] /// In-memory operation pool pub trait Mempool: Send + Sync + 'static { + /// Call to update the mempool with a new chain update + fn on_chain_update(&self, update: &ChainUpdate); + /// Returns the entry point address this pool targets. fn entry_point(&self) -> Address; @@ -178,16 +183,51 @@ impl PoolOperation { #[derive(Debug)] pub struct MempoolGroup { - mempools: HashMap>, + mempools: HashMap, + chain_update_sender: broadcast::Sender>, } impl MempoolGroup where M: Mempool, { - pub fn new(mempools: Vec>) -> Self { + pub fn new(mempools: Vec) -> Self { Self { mempools: mempools.into_iter().map(|m| (m.entry_point(), m)).collect(), + chain_update_sender: broadcast::channel(1024).0, + } + } + + pub fn subscribe_chain_update(self: Arc) -> broadcast::Receiver> { + self.chain_update_sender.subscribe() + } + + pub async fn run( + self: Arc, + mut chain_update_receiver: broadcast::Receiver>, + shutdown_token: CancellationToken, + ) { + loop { + tokio::select! 
{ + _ = shutdown_token.cancelled() => { + tracing::info!("Shutting down UoPool"); + break; + } + chain_update = chain_update_receiver.recv() => { + if let Ok(chain_update) = chain_update { + // Update each mempool before notifying listeners of the chain update + // This allows the mempools to update their state before the listeners + // pull information from the mempool. + // For example, a bundle builder listening for a new block to kick off + // its bundle building process will want to be able to query the mempool + // and only receive operations that have not yet been mined. + for mempool in self.mempools.values() { + mempool.on_chain_update(&chain_update); + } + let _ = self.chain_update_sender.send(chain_update); + } + } + } } } @@ -265,10 +305,9 @@ where Ok(mempool.dump_reputation()) } - pub fn get_pool(&self, entry_point: Address) -> MempoolResult> { + fn get_pool(&self, entry_point: Address) -> MempoolResult<&M> { self.mempools .get(&entry_point) - .cloned() .ok_or_else(|| MempoolError::UnknownEntryPoint(entry_point)) } } diff --git a/src/op_pool/mempool/uo_pool.rs b/src/op_pool/mempool/uo_pool.rs index bbe7ba34b..ea6f74c6f 100644 --- a/src/op_pool/mempool/uo_pool.rs +++ b/src/op_pool/mempool/uo_pool.rs @@ -6,7 +6,6 @@ use std::{ use ethers::types::{Address, H256}; use parking_lot::RwLock; use tokio::sync::broadcast; -use tokio_util::sync::CancellationToken; use tonic::async_trait; use tracing::info; @@ -81,33 +80,21 @@ where } } - pub async fn run( - self: Arc, - mut chain_events: broadcast::Receiver>, - shutdown_token: CancellationToken, - ) { - loop { - tokio::select! { - _ = shutdown_token.cancelled() => { - tracing::info!("Shutting down UoPool"); - break; - } - update = chain_events.recv() => { - if let Ok(update) = update { - self.on_chain_update(&update); - } - } - } - } - } - fn emit(&self, event: OpPoolEvent) { let _ = self.event_sender.send(WithEntryPoint { entry_point: self.entry_point, event, }); } +} +#[async_trait] +impl Mempool for UoPool +where + R: ReputationManager, + P: Prechecker, + S: Simulator, +{ fn on_chain_update(&self, update: &ChainUpdate) { let mut state = self.state.write(); let deduped_ops = update.deduped_ops(); @@ -188,15 +175,7 @@ where } state.block_number = update.latest_block_number; } -} -#[async_trait] -impl Mempool for UoPool -where - R: ReputationManager, - P: Prechecker, - S: Simulator, -{ fn entry_point(&self) -> Address { self.entry_point } diff --git a/src/op_pool/mod.rs b/src/op_pool/mod.rs index aa80cdac9..a049503b2 100644 --- a/src/op_pool/mod.rs +++ b/src/op_pool/mod.rs @@ -8,7 +8,7 @@ pub use mempool::{error::MempoolError, PoolConfig, PoolOperation, Reputation, Re #[cfg(test)] pub use server::MockPoolClient; pub use server::{ - connect_remote_pool_client, LocalPoolClient, PoolClient, PoolClientMode, PoolResult, + connect_remote_pool_client, LocalPoolClient, NewHead, PoolClient, PoolClientMode, PoolResult, PoolServerError, RemotePoolClient, ServerRequest as LocalPoolServerRequest, }; pub use task::*; diff --git a/src/op_pool/server/local/client.rs b/src/op_pool/server/local/client.rs index b0fc26fc1..8de6ec5d6 100644 --- a/src/op_pool/server/local/client.rs +++ b/src/op_pool/server/local/client.rs @@ -1,25 +1,37 @@ +use std::pin::Pin; + +use async_stream::stream; use ethers::types::{Address, H256}; -use tokio::sync::{mpsc, oneshot}; +use futures_util::Stream; +use tokio::sync::{broadcast, mpsc, oneshot}; use tonic::async_trait; +use tracing::error; use super::server::{ServerRequest, ServerRequestKind, ServerResponse}; use crate::{ 
common::types::{Entity, UserOperation}, op_pool::{ mempool::PoolOperation, - server::{error::PoolServerError, PoolClient, Reputation}, + server::{error::PoolServerError, NewHead, PoolClient, Reputation}, PoolResult, }, }; -#[derive(Clone, Debug)] +#[derive(Debug)] pub struct LocalPoolClient { sender: mpsc::Sender, + new_heads_receiver: broadcast::Receiver, } impl LocalPoolClient { - pub fn new(sender: mpsc::Sender) -> Self { - Self { sender } + pub fn new( + sender: mpsc::Sender, + new_heads_receiver: broadcast::Receiver, + ) -> Self { + Self { + sender, + new_heads_receiver, + } } async fn send(&self, request: ServerRequestKind) -> PoolResult { @@ -131,4 +143,31 @@ impl PoolClient for LocalPoolClient { _ => Err(PoolServerError::UnexpectedResponse), } } + + fn subscribe_new_heads(&self) -> PoolResult + Send>>> { + let mut rx = self.new_heads_receiver.resubscribe(); + Ok(Box::pin(stream! { + loop { + match rx.recv().await { + Ok(block) => yield block, + Err(broadcast::error::RecvError::Lagged(c)) => { + error!("new_heads_receiver lagged {c} blocks"); + } + Err(broadcast::error::RecvError::Closed) => { + error!("new_heads_receiver closed"); + break; + } + } + } + })) + } +} + +impl Clone for LocalPoolClient { + fn clone(&self) -> Self { + Self { + sender: self.sender.clone(), + new_heads_receiver: self.new_heads_receiver.resubscribe(), + } + } } diff --git a/src/op_pool/server/local/server.rs b/src/op_pool/server/local/server.rs index a6aa0adac..f0e80ff3c 100644 --- a/src/op_pool/server/local/server.rs +++ b/src/op_pool/server/local/server.rs @@ -1,6 +1,8 @@ +use std::sync::Arc; + use ethers::types::{Address, H256}; use tokio::{ - sync::{mpsc, oneshot}, + sync::{broadcast, mpsc, oneshot}, task::JoinHandle, }; use tokio_util::sync::CancellationToken; @@ -9,41 +11,61 @@ use crate::{ common::types::{Entity, UserOperation}, op_pool::{ mempool::{Mempool, MempoolGroup, OperationOrigin, PoolOperation}, - server::Reputation, + server::{NewHead, Reputation}, PoolResult, }, }; pub fn spawn_local_mempool_server( - mempool_runner: MempoolGroup, - receiver: mpsc::Receiver, + mempool_runner: Arc>, + req_receiver: mpsc::Receiver, + new_heads_sender: broadcast::Sender, shutdown_token: CancellationToken, ) -> anyhow::Result>> { - let mut server = LocalPoolServer::new(receiver, mempool_runner); + let mut server = LocalPoolServer::new(req_receiver, new_heads_sender, mempool_runner); let handle = tokio::spawn(async move { server.run(shutdown_token).await }); Ok(handle) } pub struct LocalPoolServer { - receiver: mpsc::Receiver, - mempools: MempoolGroup, + req_receiver: mpsc::Receiver, + new_heads_sender: broadcast::Sender, + mempools: Arc>, } impl LocalPoolServer where M: Mempool, { - pub fn new(receiver: mpsc::Receiver, mempools: MempoolGroup) -> Self { - Self { receiver, mempools } + pub fn new( + req_receiver: mpsc::Receiver, + new_heads_sender: broadcast::Sender, + mempools: Arc>, + ) -> Self { + Self { + req_receiver, + new_heads_sender, + mempools, + } } pub async fn run(&mut self, shutdown_token: CancellationToken) -> anyhow::Result<()> { + let mut chain_updates = self.mempools.clone().subscribe_chain_update(); + loop { tokio::select! 
{ _ = shutdown_token.cancelled() => { break; } - Some(req) = self.receiver.recv() => { + chain_update = chain_updates.recv() => { + if let Ok(chain_update) = chain_update { + let _ = self.new_heads_sender.send(NewHead { + block_hash: chain_update.latest_block_hash, + block_number: chain_update.latest_block_number, + }); + } + } + Some(req) = self.req_receiver.recv() => { let resp = match req.request { ServerRequestKind::GetSupportedEntryPoints => { Ok(ServerResponse::GetSupportedEntryPoints { @@ -51,15 +73,13 @@ where }) }, ServerRequestKind::AddOp { entry_point, op } => { - let pool = self.mempools.get_pool(entry_point)?; + let mempools = self.mempools.clone(); tokio::spawn(async move { - let resp = match pool.add_operation(OperationOrigin::Local, op).await { + let resp = match mempools.add_op(entry_point, op, OperationOrigin::Local).await { Ok(hash) => Ok(ServerResponse::AddOp { hash }), Err(e) => Err(e.into()), }; - if let Err(e) = req.response.send(resp) { - tracing::error!("Failed to send response: {:?}", e); - } + req.response.send(resp).unwrap(); }); continue; }, @@ -172,8 +192,10 @@ pub enum ServerResponse { mod tests { use std::sync::Arc; + use tokio_stream::StreamExt; + use super::*; - use crate::op_pool::{mempool::MockMempool, LocalPoolClient, PoolClient}; + use crate::op_pool::{chain::ChainUpdate, mempool::MockMempool, LocalPoolClient, PoolClient}; #[tokio::test] async fn send_receive() { @@ -181,12 +203,15 @@ mod tests { let mut mock_pool = MockMempool::new(); mock_pool.expect_entry_point().returning(move || ep); - let mempool_group = MempoolGroup::new(vec![Arc::new(mock_pool)]); + let mempool_group = Arc::new(MempoolGroup::new(vec![mock_pool])); let (tx, rx) = mpsc::channel(1); let shutdown_token = CancellationToken::new(); - let handle = spawn_local_mempool_server(mempool_group, rx, shutdown_token.clone()).unwrap(); + let (block_tx, block_rx) = broadcast::channel(1); + let handle = + spawn_local_mempool_server(mempool_group, rx, block_tx, shutdown_token.clone()) + .unwrap(); + let client = LocalPoolClient::new(tx, block_rx); - let client = LocalPoolClient::new(tx); let ret = client.get_supported_entry_points().await.unwrap(); assert_eq!(ret, vec![ep]); @@ -200,15 +225,18 @@ mod tests { let mut mock_pool = MockMempool::new(); mock_pool.expect_entry_point().returning(move || ep); - let mempool_group = MempoolGroup::new(vec![Arc::new(mock_pool)]); + let mempool_group = Arc::new(MempoolGroup::new(vec![mock_pool])); let (tx, rx) = mpsc::channel(1); let shutdown_token = CancellationToken::new(); - let handle = spawn_local_mempool_server(mempool_group, rx, shutdown_token.clone()).unwrap(); + let (block_tx, block_rx) = broadcast::channel(1); + let handle = + spawn_local_mempool_server(mempool_group, rx, block_tx, shutdown_token.clone()) + .unwrap(); shutdown_token.cancel(); handle.await.unwrap().unwrap(); - let client = LocalPoolClient::new(tx); + let client = LocalPoolClient::new(tx, block_rx); let ret = client.get_supported_entry_points().await; assert!(ret.is_err()); } @@ -225,16 +253,73 @@ mod tests { .expect_add_operation() .returning(move |_, _| Ok(uo_hash)); - let mempool_group = MempoolGroup::new(vec![Arc::new(mock_pool)]); + let mempool_group = Arc::new(MempoolGroup::new(vec![mock_pool])); let (tx, rx) = mpsc::channel(1); let shutdown_token = CancellationToken::new(); - let handle = spawn_local_mempool_server(mempool_group, rx, shutdown_token.clone()).unwrap(); + let (block_tx, block_rx) = broadcast::channel(1); + let handle = + spawn_local_mempool_server(mempool_group, rx, 
block_tx, shutdown_token.clone()) + .unwrap(); + let client = LocalPoolClient::new(tx, block_rx); - let client = LocalPoolClient::new(tx); let ret = client.add_op(ep, uo).await.unwrap(); assert_eq!(ret, uo_hash); shutdown_token.cancel(); handle.await.unwrap().unwrap(); } + + #[tokio::test] + async fn send_blocks() { + let ep = Address::random(); + let mut mock_pool = MockMempool::new(); + let uo = UserOperation::default(); + let uo_hash = uo.op_hash(ep, 0); + + mock_pool.expect_entry_point().return_const(ep); + mock_pool + .expect_add_operation() + .returning(move |_, _| Ok(uo_hash)); + mock_pool + .expect_on_chain_update() + .times(..11) + .return_const(()); + + let mempool_group = Arc::new(MempoolGroup::new(vec![mock_pool])); + let (tx, rx) = mpsc::channel(1); + let shutdown_token = CancellationToken::new(); + let (block_tx, block_rx) = broadcast::channel(10); + let handle = + spawn_local_mempool_server(mempool_group.clone(), rx, block_tx, shutdown_token.clone()) + .unwrap(); + let client = LocalPoolClient::new(tx, block_rx); + + let (new_blocks_tx, new_blocks_rx) = broadcast::channel(10); + + let mempool_shutdown = shutdown_token.clone(); + let mempool_handle = tokio::spawn(async move { + mempool_group.run(new_blocks_rx, mempool_shutdown).await; + }); + + let mut recv = client.subscribe_new_heads().unwrap(); + + for i in 0..10 { + new_blocks_tx + .send(Arc::new(ChainUpdate { + latest_block_hash: H256::random(), + latest_block_number: i, + ..Default::default() + })) + .unwrap(); + } + + for i in 0..10 { + let ret = recv.next().await.unwrap(); + assert_eq!(ret.block_number, i); + } + + shutdown_token.cancel(); + handle.await.unwrap().unwrap(); + mempool_handle.await.unwrap(); + } } diff --git a/src/op_pool/server/mod.rs b/src/op_pool/server/mod.rs index d3d219570..ac143733e 100644 --- a/src/op_pool/server/mod.rs +++ b/src/op_pool/server/mod.rs @@ -2,13 +2,16 @@ mod error; mod local; mod remote; +use std::pin::Pin; + pub use error::PoolServerError; use ethers::types::{Address, H256}; +use futures_util::Stream; pub use local::{spawn_local_mempool_server, LocalPoolClient, ServerRequest}; #[cfg(test)] use mockall::automock; pub use remote::{connect_remote_pool_client, spawn_remote_mempool_server, RemotePoolClient}; -use tokio::sync::mpsc; +use tokio::sync::{broadcast, mpsc}; use tonic::async_trait; use super::{mempool::PoolOperation, Reputation}; @@ -44,12 +47,21 @@ pub trait PoolClient: Send + Sync + 'static { ) -> PoolResult<()>; async fn debug_dump_reputation(&self, entry_point: Address) -> PoolResult>; + + fn subscribe_new_heads(&self) -> PoolResult + Send>>>; +} + +#[derive(Clone, Debug)] +pub struct NewHead { + pub block_hash: H256, + pub block_number: u64, } #[derive(Debug)] pub enum PoolClientMode { Local { - sender: mpsc::Sender, + req_sender: mpsc::Sender, + new_heads_receiver: broadcast::Receiver, }, Remote { url: String, diff --git a/src/op_pool/server/remote/client.rs b/src/op_pool/server/remote/client.rs index 2afe7d53e..873d95c17 100644 --- a/src/op_pool/server/remote/client.rs +++ b/src/op_pool/server/remote/client.rs @@ -1,5 +1,10 @@ +use std::pin::Pin; + use anyhow::bail; use ethers::types::{Address, H256}; +use futures_util::Stream; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; use tokio_util::sync::CancellationToken; use tonic::{async_trait, transport::Channel}; @@ -9,21 +14,23 @@ use super::protos::{ op_pool_client::OpPoolClient, remove_entities_response, remove_ops_response, AddOpRequest, DebugClearStateRequest, DebugDumpMempoolRequest, 
DebugDumpReputationRequest, DebugSetReputationRequest, GetOpsRequest, RemoveEntitiesRequest, RemoveOpsRequest, + SubscribeNewHeadsRequest, SubscribeNewHeadsResponse, }; use crate::{ common::{ protos::{from_bytes, ConversionError}, + retry::{self, UnlimitedRetryOpts}, server::connect_with_retries, types::{Entity, UserOperation}, }, op_pool::{ mempool::{PoolOperation, Reputation}, - server::{error::PoolServerError, PoolClient}, + server::{error::PoolServerError, NewHead, PoolClient}, PoolResult, }, }; -#[derive(Clone, Debug)] +#[derive(Debug, Clone)] pub struct RemotePoolClient { op_pool_client: OpPoolClient, } @@ -34,6 +41,57 @@ impl RemotePoolClient { op_pool_client: client, } } + + // Handler for the new block subscription. This will attempt to resubscribe if the gRPC + // connection disconnects, using exponential backoff. + async fn new_heads_subscription_handler( + client: OpPoolClient, + tx: mpsc::Sender, + ) { + let mut stream = None; + + loop { + if stream.is_none() { + stream = Some( + retry::with_unlimited_retries( + "subscribe new heads", + || { + let mut c = client.clone(); + async move { c.subscribe_new_heads(SubscribeNewHeadsRequest {}).await } + }, + UnlimitedRetryOpts::default(), + ) + .await + .into_inner(), + ); + } + + match stream.as_mut().unwrap().message().await { + Ok(Some(SubscribeNewHeadsResponse { new_head: Some(b) })) => match b.try_into() { + Ok(new_head) => { + if tx.send(new_head).await.is_err() { + // recv handle dropped + return; + } + } + Err(e) => { + tracing::error!("error parsing new block: {:?}", e); + break; + } + }, + Ok(Some(SubscribeNewHeadsResponse { new_head: None })) | Ok(None) => { + tracing::debug!("block subscription closed"); + stream.take(); + break; + } + Err(e) => { + tracing::error!("error in new block subscription: {:?}", e); + stream.take(); + break; + } + } + } + } } #[async_trait] @@ -235,6 +293,14 @@ impl PoolClient for RemotePoolClient { )))?, } } + + fn subscribe_new_heads(&self) -> PoolResult + Send>>> { + let (tx, rx) = mpsc::channel(1024); + let client = self.op_pool_client.clone(); + + tokio::spawn(Self::new_heads_subscription_handler(client, tx)); + Ok(Box::pin(ReceiverStream::new(rx))) + } } pub async fn connect_remote_pool_client( diff --git a/src/op_pool/server/remote/mod.rs b/src/op_pool/server/remote/mod.rs index 7459d6fae..da78b5146 100644 --- a/src/op_pool/server/remote/mod.rs +++ b/src/op_pool/server/remote/mod.rs @@ -1,5 +1,6 @@ mod client; mod error; +#[allow(non_snake_case)] mod protos; mod server; diff --git a/src/op_pool/server/remote/protos.rs b/src/op_pool/server/remote/protos.rs index df850f813..7a3335a4c 100644 --- a/src/op_pool/server/remote/protos.rs +++ b/src/op_pool/server/remote/protos.rs @@ -6,12 +6,12 @@ use crate::{ protos::{from_bytes, to_le_bytes, ConversionError}, types::{ Entity as CommonEntity, EntityType as CommonEntityType, - UserOperation as RpcUserOperation, ValidTimeRange, + UserOperation as PoolUserOperation, ValidTimeRange, }, }, op_pool::{ mempool::{Reputation as PoolReputation, ReputationStatus as PoolReputationStatus}, - PoolOperation, + NewHead as PoolNewHead, PoolOperation, }, }; @@ -20,8 +20,8 @@ tonic::include_proto!("op_pool"); pub const OP_POOL_FILE_DESCRIPTOR_SET: &[u8] = tonic::include_file_descriptor_set!("op_pool_descriptor"); -impl From<&RpcUserOperation> for UserOperation { - fn from(op: &RpcUserOperation) -> Self { +impl From<&PoolUserOperation> for UserOperation { + fn from(op: &PoolUserOperation) -> Self { UserOperation { sender: op.sender.0.to_vec(), nonce: 
to_le_bytes(op.nonce), @@ -38,11 +38,11 @@ impl From<&RpcUserOperation> for UserOperation { } } -impl TryFrom for RpcUserOperation { +impl TryFrom for PoolUserOperation { type Error = ConversionError; fn try_from(op: UserOperation) -> Result { - Ok(RpcUserOperation { + Ok(PoolUserOperation { sender: from_bytes(&op.sender)?, nonce: from_bytes(&op.nonce)?, init_code: op.init_code.into(), @@ -208,3 +208,14 @@ impl TryFrom for PoolOperation { }) } } + +impl TryFrom for PoolNewHead { + type Error = ConversionError; + + fn try_from(new_head: NewHead) -> Result { + Ok(Self { + block_hash: from_bytes(&new_head.block_hash)?, + block_number: new_head.block_number, + }) + } +} diff --git a/src/op_pool/server/remote/server.rs b/src/op_pool/server/remote/server.rs index 9d5dbf0a6..5e3ecacb6 100644 --- a/src/op_pool/server/remote/server.rs +++ b/src/op_pool/server/remote/server.rs @@ -1,7 +1,14 @@ -use std::net::SocketAddr; +use std::{ + net::SocketAddr, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, +}; use ethers::types::{Address, H256}; -use tokio::task::JoinHandle; +use tokio::{sync::mpsc, task::JoinHandle}; +use tokio_stream::wrappers::ReceiverStream; use tokio_util::sync::CancellationToken; use tonic::{async_trait, transport::Server, Request, Response, Result, Status}; @@ -15,8 +22,9 @@ use super::protos::{ DebugDumpReputationRequest, DebugDumpReputationResponse, DebugDumpReputationSuccess, DebugSetReputationRequest, DebugSetReputationResponse, DebugSetReputationSuccess, GetOpsRequest, GetOpsResponse, GetOpsSuccess, GetSupportedEntryPointsRequest, - GetSupportedEntryPointsResponse, MempoolOp, RemoveEntitiesRequest, RemoveEntitiesResponse, - RemoveEntitiesSuccess, RemoveOpsRequest, RemoveOpsResponse, RemoveOpsSuccess, + GetSupportedEntryPointsResponse, MempoolOp, NewHead, RemoveEntitiesRequest, + RemoveEntitiesResponse, RemoveEntitiesSuccess, RemoveOpsRequest, RemoveOpsResponse, + RemoveOpsSuccess, SubscribeNewHeadsRequest, SubscribeNewHeadsResponse, OP_POOL_FILE_DESCRIPTOR_SET, }; use crate::{ @@ -24,14 +32,17 @@ use crate::{ op_pool::mempool::{Mempool, MempoolGroup, OperationOrigin, Reputation}, }; +const MAX_REMOTE_BLOCK_SUBSCRIPTIONS: usize = 32; + pub async fn spawn_remote_mempool_server( chain_id: u64, - mempool_group: MempoolGroup, + mempool_group: Arc>, addr: SocketAddr, shutdown_token: CancellationToken, ) -> anyhow::Result>> { // gRPC server - let op_pool_server = OpPoolServer::new(OpPoolImpl::new(chain_id, mempool_group)); + let pool_impl = Arc::new(OpPoolImpl::new(chain_id, mempool_group)); + let op_pool_server = OpPoolServer::new(Arc::clone(&pool_impl)); let reflection_service = tonic_reflection::server::Builder::configure() .register_encoded_file_descriptor_set(OP_POOL_FILE_DESCRIPTOR_SET) .build()?; @@ -39,7 +50,7 @@ pub async fn spawn_remote_mempool_server( // health service let (mut health_reporter, health_service) = tonic_health::server::health_reporter(); health_reporter - .set_serving::>>() + .set_serving::>>>() .await; let metrics_layer = GrpcMetricsLayer::new("op_pool".to_string()); @@ -51,24 +62,27 @@ pub async fn spawn_remote_mempool_server( .add_service(health_service) .serve_with_shutdown(addr, async move { shutdown_token.cancelled().await }) .await - .map_err(|err| anyhow::anyhow!("Server error: {err:?}")) + .map_err(|e| anyhow::anyhow!(format!("pool server failed: {e:?}"))) }); + Ok(handle) } struct OpPoolImpl { chain_id: u64, - mempool_group: MempoolGroup, + mempools: Arc>, + num_block_subscriptions: Arc, } impl OpPoolImpl where M: Mempool, { - pub fn 
new(chain_id: u64, mempool_group: MempoolGroup) -> Self { + pub fn new(chain_id: u64, mempools: Arc>) -> Self { Self { chain_id, - mempool_group, + mempools, + num_block_subscriptions: Arc::new(AtomicUsize::new(0)), } } @@ -79,7 +93,7 @@ where } #[async_trait] -impl OpPool for OpPoolImpl +impl OpPool for Arc> where M: Mempool + 'static, { @@ -90,7 +104,7 @@ where Ok(Response::new(GetSupportedEntryPointsResponse { chain_id: self.chain_id, entry_points: self - .mempool_group + .mempools .get_supported_entry_points() .into_iter() .map(|ep| ep.as_bytes().to_vec()) @@ -109,11 +123,7 @@ where Status::invalid_argument(format!("Failed to convert to UserOperation: {e}")) })?; - let resp = match self - .mempool_group - .add_op(ep, uo, OperationOrigin::Local) - .await - { + let resp = match self.mempools.add_op(ep, uo, OperationOrigin::Local).await { Ok(hash) => AddOpResponse { result: Some(add_op_response::Result::Success(AddOpSuccess { hash: hash.as_bytes().to_vec(), @@ -131,7 +141,7 @@ where let req = request.into_inner(); let ep = self.get_entry_point(&req.entry_point)?; - let resp = match self.mempool_group.get_ops(ep, req.max_ops) { + let resp = match self.mempools.get_ops(ep, req.max_ops) { Ok(ops) => GetOpsResponse { result: Some(get_ops_response::Result::Success(GetOpsSuccess { ops: ops.iter().map(MempoolOp::from).collect(), @@ -163,7 +173,7 @@ where }) .collect::, _>>()?; - let resp = match self.mempool_group.remove_ops(ep, &hashes) { + let resp = match self.mempools.remove_ops(ep, &hashes) { Ok(_) => RemoveOpsResponse { result: Some(remove_ops_response::Result::Success(RemoveOpsSuccess {})), }, @@ -188,11 +198,11 @@ where .collect::, _>>() .map_err(|e| Status::internal(format!("Failed to convert to proto entity: {e}")))?; - self.mempool_group + self.mempools .remove_entities(ep, &entities) .map_err(|e| Status::internal(e.to_string()))?; - let resp = match self.mempool_group.remove_entities(ep, &entities) { + let resp = match self.mempools.remove_entities(ep, &entities) { Ok(_) => RemoveEntitiesResponse { result: Some(remove_entities_response::Result::Success( RemoveEntitiesSuccess {}, @@ -210,7 +220,7 @@ where &self, _request: Request, ) -> Result> { - let resp = match self.mempool_group.debug_clear_state() { + let resp = match self.mempools.debug_clear_state() { Ok(_) => DebugClearStateResponse { result: Some(debug_clear_state_response::Result::Success( DebugClearStateSuccess {}, @@ -231,7 +241,7 @@ where let req = request.into_inner(); let ep = self.get_entry_point(&req.entry_point)?; - let resp = match self.mempool_group.debug_dump_mempool(ep) { + let resp = match self.mempools.debug_dump_mempool(ep) { Ok(ops) => DebugDumpMempoolResponse { result: Some(debug_dump_mempool_response::Result::Success( DebugDumpMempoolSuccess { @@ -270,7 +280,7 @@ where Status::internal(format!("Failed to convert from proto reputation {e}")) })?; - let resp = match self.mempool_group.debug_set_reputations(ep, &reps) { + let resp = match self.mempools.debug_set_reputations(ep, &reps) { Ok(_) => DebugSetReputationResponse { result: Some(debug_set_reputation_response::Result::Success( DebugSetReputationSuccess {}, @@ -291,7 +301,7 @@ where let req = request.into_inner(); let ep = self.get_entry_point(&req.entry_point)?; - let resp = match self.mempool_group.debug_dump_reputation(ep) { + let resp = match self.mempools.debug_dump_reputation(ep) { Ok(reps) => DebugDumpReputationResponse { result: Some(debug_dump_reputation_response::Result::Success( DebugDumpReputationSuccess { @@ -308,4 +318,50 @@ where 
Ok(Response::new(resp)) } + + type SubscribeNewHeadsStream = ReceiverStream>; + + async fn subscribe_new_heads( + &self, + _request: Request, + ) -> Result> { + let (tx, rx) = mpsc::channel(1024); + + if self.num_block_subscriptions.fetch_add(1, Ordering::Relaxed) + >= MAX_REMOTE_BLOCK_SUBSCRIPTIONS + { + self.num_block_subscriptions.fetch_sub(1, Ordering::Relaxed); + return Err(Status::resource_exhausted("Too many block subscriptions")); + } + + let num_block_subscriptions = Arc::clone(&self.num_block_subscriptions); + let mut chain_updates = self.mempools.clone().subscribe_chain_update(); + tokio::spawn(async move { + loop { + match chain_updates.recv().await { + Ok(chain_update) => { + if tx + .send(Ok(SubscribeNewHeadsResponse { + new_head: Some(NewHead { + block_hash: chain_update.latest_block_hash.as_bytes().to_vec(), + block_number: chain_update.latest_block_number, + }), + })) + .await + .is_err() + { + break; + } + } + Err(_) => { + tracing::warn!("chain update channel closed"); + break; + } + } + } + num_block_subscriptions.fetch_sub(1, Ordering::Relaxed); + }); + + Ok(Response::new(ReceiverStream::new(rx))) + } } diff --git a/src/op_pool/task.rs b/src/op_pool/task.rs index 8e7c81912..76a888b0e 100644 --- a/src/op_pool/task.rs +++ b/src/op_pool/task.rs @@ -4,10 +4,8 @@ use anyhow::{bail, Context}; use ethers::providers::{ Http, HttpRateLimitRetryPolicy, JsonRpcClient, Provider, RetryClientBuilder, }; -use futures::future; use tokio::{ sync::{broadcast, mpsc}, - task::JoinHandle, try_join, }; use tokio_util::sync::CancellationToken; @@ -16,7 +14,7 @@ use url::Url; use super::{ mempool::{HourlyMovingAverageReputation, PoolConfig, ReputationParams}, - server::ServerRequest, + server::{NewHead, ServerRequest}, }; use crate::{ common::{ @@ -28,7 +26,7 @@ use crate::{ simulation::{Simulator, SimulatorImpl}, }, op_pool::{ - chain::{self, Chain, ChainUpdate}, + chain::{self, Chain}, emit::OpPoolEvent, mempool::{uo_pool::UoPool, MempoolGroup}, server::{spawn_local_mempool_server, spawn_remote_mempool_server}, @@ -38,7 +36,8 @@ use crate::{ #[derive(Debug)] pub enum PoolServerMode { Local { - receiver: Option>, + req_receiver: Option>, + new_heads_sender: Option>, }, Remote { addr: SocketAddr, @@ -98,44 +97,48 @@ impl Task for PoolTask { // create mempools let mut mempools = vec![]; - let mut mempool_handles = Vec::new(); for pool_config in &self.args.pool_configs { - let (pool, handle) = PoolTask::create_mempool( - pool_config, - update_sender.subscribe(), - self.event_sender.clone(), - shutdown_token.clone(), - provider.clone(), - ) - .await - .context("should have created mempool")?; + let pool = + PoolTask::create_mempool(pool_config, self.event_sender.clone(), provider.clone()) + .await + .context("should have created mempool")?; mempools.push(pool); - mempool_handles.push(handle); } - // handle to wait for mempools to terminate + let mempool_group = Arc::new(MempoolGroup::new(mempools)); + let mempool_group_runner = Arc::clone(&mempool_group); + let mempool_shutdown = shutdown_token.clone(); + // handle to wait for mempool group to terminate let mempool_handle = tokio::spawn(async move { - future::join_all(mempool_handles) - .await - .into_iter() - .collect::, _>>() - .map(|_| ()) - .context("should have joined mempool handles") + mempool_group_runner + .run(update_sender.subscribe(), mempool_shutdown) + .await; + Ok(()) }); - let mempool_group = MempoolGroup::new(mempools); let server_handle = match &mut self.args.server_mode { - PoolServerMode::Local { ref mut receiver } => { - let 
recv = receiver + PoolServerMode::Local { + ref mut req_receiver, + ref mut new_heads_sender, + } => { + let req_receiver = req_receiver .take() .context("should have local server message receiver")?; - spawn_local_mempool_server(mempool_group, recv, shutdown_token.clone())? + let new_heads_sender = new_heads_sender + .take() + .context("should have block sender")?; + spawn_local_mempool_server( + Arc::clone(&mempool_group), + req_receiver, + new_heads_sender, + shutdown_token.clone(), + )? } PoolServerMode::Remote { addr } => { spawn_remote_mempool_server( self.args.chain_id, - mempool_group, + Arc::clone(&mempool_group), *addr, shutdown_token.clone(), ) @@ -176,14 +179,10 @@ impl PoolTask { async fn create_mempool( pool_config: &PoolConfig, - update_rx: broadcast::Receiver>, event_sender: broadcast::Sender>, - shutdown_token: CancellationToken, provider: Arc>, - ) -> anyhow::Result<( - Arc>, - JoinHandle<()>, - )> { + ) -> anyhow::Result> + { // Reputation manager let reputation = Arc::new(HourlyMovingAverageReputation::new( ReputationParams::bundler_default(), @@ -208,18 +207,12 @@ impl PoolTask { pool_config.mempool_channel_configs.clone(), ); - // Mempool - let mp = Arc::new(UoPool::new( + Ok(UoPool::new( pool_config.clone(), Arc::clone(&reputation), event_sender, prechecker, simulator, - )); - let mp_runner = Arc::clone(&mp); - let handle = - tokio::spawn(async move { mp_runner.run(update_rx, shutdown_token.clone()).await }); - - Ok((mp, handle)) + )) } } diff --git a/src/rpc/eth/estimation.rs b/src/rpc/eth/estimation.rs index d86b24f03..f01c6275a 100644 --- a/src/rpc/eth/estimation.rs +++ b/src/rpc/eth/estimation.rs @@ -430,7 +430,7 @@ mod tests { }; let estimator: GasEstimatorImpl = - GasEstimatorImpl::new(0, Arc::new(provider), entry, settings.clone()); + GasEstimatorImpl::new(0, Arc::new(provider), entry, settings); (estimator, settings) } @@ -524,12 +524,7 @@ mod tests { // Chose arbitrum let estimator: GasEstimatorImpl = - GasEstimatorImpl::new( - Chain::Arbitrum as u64, - Arc::new(provider), - entry, - settings.clone(), - ); + GasEstimatorImpl::new(Chain::Arbitrum as u64, Arc::new(provider), entry, settings); let user_op = demo_user_op_optional_gas(); let estimation = estimator.calc_pre_verification_gas(&user_op).await.unwrap(); @@ -572,12 +567,7 @@ mod tests { // Chose OP let estimator: GasEstimatorImpl = - GasEstimatorImpl::new( - Chain::Optimism as u64, - Arc::new(provider), - entry, - settings.clone(), - ); + GasEstimatorImpl::new(Chain::Optimism as u64, Arc::new(provider), entry, settings); let user_op = demo_user_op_optional_gas(); let estimation = estimator.calc_pre_verification_gas(&user_op).await.unwrap(); @@ -706,7 +696,7 @@ mod tests { // check for this overlflow provider.expect_call().returning(|_a, _b| { let result_data: Bytes = GasUsedResult { - gas_used: U256::from(18446744073709551616 as u128), + gas_used: U256::from(18446744073709551616_u128), success: false, result: Bytes::new(), } @@ -797,7 +787,7 @@ mod tests { .binary_search_verification_gas(&user_op, H256::zero()) .await; - assert_eq!(estimation.is_err(), true); + assert!(estimation.is_err()); } #[tokio::test] @@ -850,7 +840,7 @@ mod tests { .binary_search_verification_gas(&user_op, H256::zero()) .await; - assert_eq!(estimation.is_err(), true); + assert!(estimation.is_err()); } #[tokio::test] @@ -902,7 +892,7 @@ mod tests { .binary_search_verification_gas(&user_op, H256::zero()) .await; - assert_eq!(estimation.is_err(), true); + assert!(estimation.is_err()); } #[tokio::test] @@ -947,7 +937,7 @@ mod tests 
{ .binary_search_verification_gas(&user_op, H256::zero()) .await; - assert_eq!(estimation.is_err(), true); + assert!(estimation.is_err()); } #[tokio::test] @@ -1020,10 +1010,10 @@ mod tests { .err() .unwrap(); - assert_eq!( - matches!(estimation, GasEstimationError::RevertInCallWithBytes(_)), - true - ); + assert!(matches!( + estimation, + GasEstimationError::RevertInCallWithBytes(_) + )); } #[tokio::test] diff --git a/src/rpc/task.rs b/src/rpc/task.rs index 973e8f8e9..917ede1f8 100644 --- a/src/rpc/task.rs +++ b/src/rpc/task.rs @@ -90,8 +90,12 @@ impl Task for RpcTask { let mut module = RpcModule::new(()); match &self.args.pool_client_mode { - PoolClientMode::Local { sender } => { - let pool_client = LocalPoolClient::new(sender.clone()); + PoolClientMode::Local { + req_sender, + new_heads_receiver, + } => { + let pool_client = + LocalPoolClient::new(req_sender.clone(), new_heads_receiver.resubscribe()); self.attach_namespaces(provider, pool_client.clone(), builder_client, &mut module)?; module.merge(LocalHealthCheck::new(pool_client).into_rpc())?; diff --git a/test/spec-tests/local/launcher.sh b/test/spec-tests/local/launcher.sh index 39b6f6893..b8ceb08d0 100755 --- a/test/spec-tests/local/launcher.sh +++ b/test/spec-tests/local/launcher.sh @@ -7,7 +7,7 @@ case $1 in start) docker-compose up -d sleep 10 - cast send --from $(cast rpc eth_accounts | tail -n 1 | tr -d '[]"') --value 1ether 0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266 > /dev/null + cast send --unlocked --from $(cast rpc eth_accounts | tail -n 1 | tr -d '[]"') --value 1ether 0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266 > /dev/null (cd ../bundler-spec-tests/@account-abstraction && yarn deploy --network localhost) ../../../target/debug/rundler node --log.file out.log & while [[ "$(curl -s -o /dev/null -w ''%{http_code}'' localhost:3000/health)" != "200" ]]; do sleep 1 ; done