Skip to content

Commit

Permalink
Unbuffered block stream (#2707)
Browse files Browse the repository at this point in the history
# Description
On networks with a very high block frequency we can get into troubles
when using the auto batching http transport for the RPC requests.

# Changes
With this PR we construct an fresh unbuffered `Web3` instance
specifically for the `CurrentBlockStream`.

## How to test
e2e tests should still work
  • Loading branch information
MartinquaXD authored May 10, 2024
1 parent 3e9a4d4 commit 16629c9
Show file tree
Hide file tree
Showing 10 changed files with 60 additions and 46 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 12 additions & 9 deletions crates/autopilot/src/infra/blockchain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ use {
ethcontract::dyns::DynWeb3,
ethrpc::current_block::CurrentBlockStream,
primitive_types::{H256, U256},
std::{sync::Arc, time::Duration},
std::time::Duration,
thiserror::Error,
url::Url,
};

pub mod contracts;
Expand All @@ -32,6 +33,7 @@ impl From<U256> for ChainId {
pub struct Rpc {
web3: DynWeb3,
chain: ChainId,
url: Url,
}

impl Rpc {
Expand All @@ -41,7 +43,11 @@ impl Rpc {
let web3 = boundary::buffered_web3_client(url);
let chain = web3.eth().chain_id().await?.into();

Ok(Self { web3, chain })
Ok(Self {
web3,
chain,
url: url.clone(),
})
}

/// Returns the chain id for the RPC connection.
Expand Down Expand Up @@ -72,16 +78,13 @@ impl Ethereum {
/// Since this type is essential for the program this method will panic on
/// any initialization error.
pub async fn new(rpc: Rpc, addresses: contracts::Addresses, poll_interval: Duration) -> Self {
let Rpc { web3, chain } = rpc;
let Rpc { web3, chain, url } = rpc;
let contracts = Contracts::new(&web3, &chain, addresses).await;

Self {
current_block: ethrpc::current_block::current_block_stream(
Arc::new(web3.clone()),
poll_interval,
)
.await
.expect("couldn't initialize current block stream"),
current_block: ethrpc::current_block::current_block_stream(url, poll_interval)
.await
.expect("couldn't initialize current block stream"),
web3,
chain,
contracts,
Expand Down
8 changes: 4 additions & 4 deletions crates/driver/src/boundary/liquidity/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,14 @@ impl Fetcher {
block_stream_poll_interval: BLOCK_POLL_INTERVAL,
};

let block_stream = blocks.stream(boundary::web3(eth)).await?;
let block_stream = eth.current_block();
let block_retriever = blocks.retriever(boundary::web3(eth));

let uni_v2: Vec<_> = future::try_join_all(
config
.uniswap_v2
.iter()
.map(|config| uniswap::v2::collector(eth, &block_stream, config)),
.map(|config| uniswap::v2::collector(eth, block_stream, config)),
)
.await?;

Expand All @@ -80,7 +80,7 @@ impl Fetcher {
config
.swapr
.iter()
.map(|config| swapr::collector(eth, &block_stream, config)),
.map(|config| swapr::collector(eth, block_stream, config)),
)
.await?;

Expand Down Expand Up @@ -119,7 +119,7 @@ impl Fetcher {
);

Ok(Self {
blocks: block_stream,
blocks: block_stream.clone(),
inner: LiquidityCollector {
liquidity_sources: [uni_v2, swapr, bal_v2, uni_v3, zeroex]
.into_iter()
Expand Down
12 changes: 9 additions & 3 deletions crates/driver/src/infra/blockchain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use {
ethrpc::current_block::CurrentBlockStream,
std::{fmt, sync::Arc},
thiserror::Error,
url::Url,
web3::Transport,
};

Expand All @@ -20,6 +21,7 @@ pub use self::{contracts::Contracts, gas::GasPriceEstimator};
pub struct Rpc {
web3: DynWeb3,
chain: eth::ChainId,
url: Url,
}

impl Rpc {
Expand All @@ -29,7 +31,11 @@ impl Rpc {
let web3 = boundary::buffered_web3_client(url);
let chain = web3.eth().chain_id().await?.into();

Ok(Self { web3, chain })
Ok(Self {
web3,
chain,
url: url.clone(),
})
}

/// Returns the chain id for the RPC connection.
Expand Down Expand Up @@ -69,15 +75,15 @@ impl Ethereum {
addresses: contracts::Addresses,
gas: Arc<GasPriceEstimator>,
) -> Self {
let Rpc { web3, chain } = rpc;
let Rpc { web3, chain, url } = rpc;
let contracts = Contracts::new(&web3, chain, addresses)
.await
.expect("could not initialize important smart contracts");

Self {
inner: Arc::new(Inner {
current_block: ethrpc::current_block::current_block_stream(
Arc::new(web3.clone()),
url,
std::time::Duration::from_millis(500),
)
.await
Expand Down
14 changes: 8 additions & 6 deletions crates/e2e/tests/e2e/quote_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,14 @@ const FORK_BLOCK_MAINNET: u64 = 19796077;
/// integrations. Based on an RFQ quote we saw on prod:
/// https://www.tdly.co/shared/simulation/7402de5e-e524-4e24-9af8-50d0a38c105b
async fn test_bypass_verification_for_rfq_quotes(web3: Web3) {
let block_stream = ethrpc::current_block::current_block_stream(
Arc::new(web3.clone()),
std::time::Duration::from_millis(1_000),
)
.await
.unwrap();
let url = std::env::var("FORK_URL_MAINNET")
.expect("FORK_URL_MAINNET must be set to run forked tests")
.parse()
.unwrap();
let block_stream =
ethrpc::current_block::current_block_stream(url, std::time::Duration::from_millis(1_000))
.await
.unwrap();
let onchain = OnchainComponents::deployed(web3.clone()).await;

let verifier = TradeVerifier::new(
Expand Down
1 change: 1 addition & 0 deletions crates/ethrpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ web3 = { workspace = true }
contracts = { path = "../contracts" }
ethcontract = { workspace = true }
tracing = { workspace = true }
url = { workspace = true }

[dev-dependencies]
maplit = { workspace = true }
Expand Down
29 changes: 16 additions & 13 deletions crates/ethrpc/src/current_block/mod.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use {
crate::Web3,
crate::{http::HttpTransport, Web3, Web3Transport},
anyhow::{anyhow, ensure, Context as _, Result},
futures::StreamExt,
primitive_types::{H256, U256},
std::{fmt::Debug, num::NonZeroU64, sync::Arc, time::Duration},
std::{fmt::Debug, num::NonZeroU64, time::Duration},
tokio::sync::watch,
tokio_stream::wrappers::WatchStream,
tracing::Instrument,
url::Url,
web3::{
helpers,
types::{Block, BlockId, BlockNumber, U64},
Expand Down Expand Up @@ -81,19 +82,23 @@ impl TryFrom<Block<H256>> for BlockInfo {
/// being able to share the result with several consumers. Calling this function
/// again would create a new poller so it is preferable to clone an existing
/// stream instead.
pub async fn current_block_stream(
retriever: Arc<dyn BlockRetrieving>,
poll_interval: Duration,
) -> Result<CurrentBlockStream> {
let first_block = retriever.current_block().await?;
pub async fn current_block_stream(url: Url, poll_interval: Duration) -> Result<CurrentBlockStream> {
// Build new Web3 specifically for the current block stream to avoid batching
// requests together on chains with a very high block frequency.
let web3 = Web3::new(Web3Transport::new(HttpTransport::new(
Default::default(),
url,
"block_stream".into(),
)));
let first_block = web3.current_block().await?;
tracing::debug!(number=%first_block.number, hash=?first_block.hash, "polled block");

let (sender, receiver) = watch::channel(first_block);
let update_future = async move {
let mut previous_block = first_block;
loop {
tokio::time::sleep(poll_interval).await;
let block = match retriever.current_block().await {
let block = match web3.current_block().await {
Ok(block) => block,
Err(err) => {
tracing::warn!("failed to get current block: {:?}", err);
Expand Down Expand Up @@ -309,7 +314,7 @@ fn update_block_metrics(current_block: u64, new_block: u64) {
mod tests {
use {
super::*,
crate::{create_env_test_transport, create_test_transport},
crate::create_env_test_transport,
futures::StreamExt,
tokio::time::{timeout, Duration},
};
Expand All @@ -318,10 +323,8 @@ mod tests {
#[ignore]
async fn mainnet() {
observe::tracing::initialize_reentrant("shared=debug");
let node = std::env::var("NODE_URL").unwrap();
let transport = create_test_transport(&node);
let web3 = Web3::new(transport);
let receiver = current_block_stream(Arc::new(web3), Duration::from_secs(1))
let node = std::env::var("NODE_URL").unwrap().parse().unwrap();
let receiver = current_block_stream(node, Duration::from_secs(1))
.await
.unwrap();
let mut stream = into_stream(receiver);
Expand Down
2 changes: 1 addition & 1 deletion crates/orderbook/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ pub async fn run(args: Arguments) {
let current_block_stream = args
.shared
.current_block
.stream(web3.clone())
.stream(args.shared.node_url.clone())
.await
.unwrap();

Expand Down
5 changes: 3 additions & 2 deletions crates/shared/src/current_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use {
sync::Arc,
time::Duration,
},
url::Url,
};

/// Command line arguments for creating global block stream.
Expand All @@ -34,8 +35,8 @@ impl Arguments {
Arc::new(web3)
}

pub async fn stream(&self, web3: Web3) -> Result<CurrentBlockStream> {
current_block_stream(self.retriever(web3), self.block_stream_poll_interval).await
pub async fn stream(&self, rpc: Url) -> Result<CurrentBlockStream> {
current_block_stream(rpc, self.block_stream_poll_interval).await
}
}

Expand Down
13 changes: 5 additions & 8 deletions crates/shared/src/price_estimation/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -788,14 +788,11 @@ mod tests {

let client = Client::new();

let transport = HttpTransport::new(
client.clone(),
crate::url::join(
&Url::parse("https://mainnet.infura.io/v3/").unwrap(),
&infura_project_id,
),
"main".into(),
let url = crate::url::join(
&Url::parse("https://mainnet.infura.io/v3/").unwrap(),
&infura_project_id,
);
let transport = HttpTransport::new(client.clone(), url.clone(), "main".into());
let web3 = Web3::new(DynTransport::new(transport));
let chain_id = web3.eth().chain_id().await.unwrap().as_u64();
let version = chain_id.to_string();
Expand All @@ -812,7 +809,7 @@ mod tests {
.await
.unwrap()
.pool_fetching,
current_block_stream(Arc::new(web3.clone()), Duration::from_secs(1))
current_block_stream(url, Duration::from_secs(1))
.await
.unwrap(),
)
Expand Down

0 comments on commit 16629c9

Please sign in to comment.