diff --git a/Cargo.lock b/Cargo.lock index 397172bbbd..2c0d881f53 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -596,28 +596,6 @@ dependencies = [ "wasm-bindgen-futures", ] -[[package]] -name = "async-stream" -version = "0.3.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cd56dd203fef61ac097dd65721a419ddccb106b2d2b70ba60a6b529f03961a51" -dependencies = [ - "async-stream-impl", - "futures-core", - "pin-project-lite", -] - -[[package]] -name = "async-stream-impl" -version = "0.3.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.72", -] - [[package]] name = "async-task" version = "4.7.1" @@ -6302,7 +6280,6 @@ name = "p2p" version = "0.14.1" dependencies = [ "anyhow", - "async-stream", "async-trait", "base64 0.13.1", "clap", @@ -6503,7 +6480,6 @@ version = "0.14.1" dependencies = [ "anyhow", "assert_matches", - "async-stream", "async-trait", "axum 0.7.5", "base64 0.13.1", diff --git a/Cargo.toml b/Cargo.toml index a2b43e44bd..213b6266b9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -48,7 +48,6 @@ authors = ["Equilibrium Labs "] anyhow = "1.0.75" ark-ff = "0.4.2" assert_matches = "1.5.0" -async-stream = "0.3.5" async-trait = "0.1.73" axum = "0.7.5" base64 = "0.13.1" diff --git a/crates/p2p/Cargo.toml b/crates/p2p/Cargo.toml index bf1b47da9e..7d40bd4f61 100644 --- a/crates/p2p/Cargo.toml +++ b/crates/p2p/Cargo.toml @@ -10,7 +10,6 @@ rust-version = { workspace = true } [dependencies] anyhow = { workspace = true } -async-stream = { workspace = true } async-trait = { workspace = true } base64 = { workspace = true } clap = { workspace = true, features = ["derive", "env", "wrap_help"] } diff --git a/crates/pathfinder/Cargo.toml b/crates/pathfinder/Cargo.toml index 25f964f3c8..90ea9cb0d4 100644 --- a/crates/pathfinder/Cargo.toml +++ b/crates/pathfinder/Cargo.toml @@ -17,7 +17,6 @@ p2p = [] 
[dependencies] anyhow = { workspace = true } -async-stream = { workspace = true } async-trait = { workspace = true } axum = { workspace = true } base64 = { workspace = true } diff --git a/crates/pathfinder/src/sync/checkpoint.rs b/crates/pathfinder/src/sync/checkpoint.rs index acff0a45a0..23890bfa90 100644 --- a/crates/pathfinder/src/sync/checkpoint.rs +++ b/crates/pathfinder/src/sync/checkpoint.rs @@ -1,5 +1,6 @@ #![allow(dead_code, unused_variables)] use std::collections::HashSet; +use std::num::NonZeroUsize; use std::sync::{Arc, RwLock}; use anyhow::Context; @@ -181,7 +182,12 @@ impl Sync { let transaction_stream = self.p2p.clone().transaction_stream( start, stop, - transactions::counts_and_commitments_stream(self.storage.clone(), start, stop), + transactions::counts_and_commitments_stream( + self.storage.clone(), + start, + stop, + NonZeroUsize::new(100).expect("100>0"), + ), ); handle_transaction_stream(transaction_stream, self.storage.clone(), chain_id, start) @@ -206,7 +212,12 @@ impl Sync { let stream = self.p2p.clone().state_diff_stream( start, stop, - state_updates::length_and_commitment_stream(self.storage.clone(), start, stop), + state_updates::length_and_commitment_stream( + self.storage.clone(), + start, + stop, + NonZeroUsize::new(100).expect("100>0"), + ), ); handle_state_diff_stream(stream, self.storage.clone(), start, verify_tree_hashes).await?; @@ -226,7 +237,12 @@ impl Sync { let class_stream = self.p2p.clone().class_stream( start, stop, - class_definitions::declared_class_counts_stream(self.storage.clone(), start, stop), + class_definitions::declared_class_counts_stream( + self.storage.clone(), + start, + stop, + NonZeroUsize::new(100).expect("100>0"), + ), ); handle_class_stream( @@ -253,7 +269,12 @@ impl Sync { let event_stream = self.p2p.clone().event_stream( start, stop, - events::counts_stream(self.storage.clone(), start, stop), + events::counts_stream( + self.storage.clone(), + start, + stop, + NonZeroUsize::new(100).expect("100>0"), + ), 
); handle_event_stream(event_stream, self.storage.clone()).await?; diff --git a/crates/pathfinder/src/sync/class_definitions.rs b/crates/pathfinder/src/sync/class_definitions.rs index e9785aa988..0e87ab43df 100644 --- a/crates/pathfinder/src/sync/class_definitions.rs +++ b/crates/pathfinder/src/sync/class_definitions.rs @@ -1,6 +1,6 @@ use std::collections::HashSet; use std::num::NonZeroUsize; -use std::sync::Arc; +use std::thread; use anyhow::Context; use futures::pin_mut; @@ -19,9 +19,10 @@ use starknet_gateway_types::class_hash::from_parts::{ compute_sierra_class_hash, }; use starknet_gateway_types::reply::call; -use tokio::sync::mpsc::Receiver; +use tokio::sync::mpsc::{self, Receiver}; use tokio::sync::Mutex; use tokio::task::spawn_blocking; +use tokio_stream::wrappers::ReceiverStream; use crate::sync::error::{SyncError, SyncError2}; use crate::sync::stream::ProcessStage; @@ -88,57 +89,70 @@ pub(super) async fn next_missing( .context("Joining blocking task")? } +/// ### Important +/// +/// Caller must ensure that `start <= stop`. pub(super) fn declared_class_counts_stream( storage: Storage, mut start: BlockNumber, stop: BlockNumber, + batch_size: NonZeroUsize, ) -> impl futures::Stream> { - const BATCH_SIZE: usize = 1000; - - async_stream::try_stream! 
{ + let (tx, rx) = mpsc::channel(1); + thread::spawn(move || { let mut batch = Vec::::new(); while start <= stop { if let Some(counts) = batch.pop() { - yield counts; + _ = tx.blocking_send(Ok(counts)); continue; } - let batch_size = NonZeroUsize::new( - BATCH_SIZE.min( + let batch_size = batch_size.min( + NonZeroUsize::new( (stop.get() - start.get() + 1) .try_into() .expect("ptr size is 64bits"), - ), - ) - .expect(">0"); + ) + .expect(">0"), + ); + let storage = storage.clone(); - batch = tokio::task::spawn_blocking(move || { + let get = move || { let mut db = storage .connection() .context("Creating database connection")?; let db = db.transaction().context("Creating database transaction")?; - db.declared_classes_counts(start, batch_size) - .context("Querying declared classes counts") - }) - .await - .context("Joining blocking task")??; - - if batch.is_empty() { - Err(anyhow::anyhow!( - "No declared classes counts found for range: start {start}, batch_size {batch_size}" - ))?; - break; - } + let batch = db + .declared_classes_counts(start, batch_size) + .context("Querying declared classes counts")?; + + anyhow::ensure!( + !batch.is_empty(), + "No class counts found: start {start}, batch_size {batch_size}" + ); + + Ok(batch) + }; + + batch = match get() { + Ok(x) => x, + Err(e) => { + _ = tx.blocking_send(Err(e)); + return; + } + }; start += batch.len().try_into().expect("ptr size is 64bits"); } while let Some(counts) = batch.pop() { - yield counts; + _ = tx.blocking_send(Ok(counts)); } - } + }); + + ReceiverStream::new(rx) } pub struct VerifyLayout; @@ -297,7 +311,7 @@ impl ExpectedDeclarationsSource { } pub fn spawn(self) -> anyhow::Result> { - let (tx, rx) = tokio::sync::mpsc::channel(1); + let (tx, rx) = mpsc::channel(1); let Self { mut db_connection, mut start, @@ -479,3 +493,61 @@ impl ProcessStage for VerifyClassHashes { } } } + +#[cfg(test)] +mod tests { + use futures::StreamExt; + use pathfinder_common::StateUpdate; + use pathfinder_storage::fake::Block; + + use super::*; + 
#[rstest::rstest] + #[case::request_shorter_than_batch_size(1)] + #[case::request_equal_to_batch_size(2)] + #[case::request_longer_than_batch_size(3)] + #[case::request_equal_to_db_size(5)] + #[case::request_longer_than_db_size(6)] + #[tokio::test] + async fn declared_class_counts_stream(#[case] len: usize) { + const DB_LEN: usize = 5; + let ok_len = len.min(DB_LEN); + let storage = pathfinder_storage::StorageBuilder::in_memory().unwrap(); + let expected = pathfinder_storage::fake::with_n_blocks(&storage, DB_LEN) + .into_iter() + .map(|b| { + let Block { + state_update: + StateUpdate { + declared_cairo_classes, + declared_sierra_classes, + .. + }, + .. + } = b; + declared_cairo_classes.len() + declared_sierra_classes.len() + }) + .collect::>(); + let stream = super::declared_class_counts_stream( + storage.clone(), + BlockNumber::GENESIS, + BlockNumber::GENESIS + len as u64 - 1, + NonZeroUsize::new(2).unwrap(), + ); + + let mut remainder = stream.collect::>().await; + + let actual = remainder + .drain(..ok_len) + .map(|x| x.unwrap()) + .collect::>(); + + assert_eq!(expected[..ok_len], actual); + + if len > DB_LEN { + assert!(remainder.pop().unwrap().is_err()); + } else { + assert!(remainder.is_empty()); + } + } +} diff --git a/crates/pathfinder/src/sync/events.rs b/crates/pathfinder/src/sync/events.rs index 19ac941da2..1590b2a746 100644 --- a/crates/pathfinder/src/sync/events.rs +++ b/crates/pathfinder/src/sync/events.rs @@ -15,7 +15,9 @@ use pathfinder_common::{ TransactionHash, }; use pathfinder_storage::Storage; +use tokio::sync::mpsc; use tokio::task::spawn_blocking; +use tokio_stream::wrappers::ReceiverStream; use super::error::SyncError; use crate::state::block_hash::calculate_event_commitment; @@ -51,53 +53,62 @@ pub(super) fn counts_stream( storage: Storage, mut start: BlockNumber, stop: BlockNumber, + batch_size: NonZeroUsize, ) -> impl futures::Stream> { - const BATCH_SIZE: usize = 1000; - - async_stream::try_stream! 
{ + let (tx, rx) = mpsc::channel(1); + std::thread::spawn(move || { let mut batch = VecDeque::new(); while start <= stop { if let Some(counts) = batch.pop_front() { - yield counts; + _ = tx.blocking_send(Ok(counts)); continue; } - let batch_size = NonZeroUsize::new( - BATCH_SIZE.min( + let batch_size = batch_size.min( + NonZeroUsize::new( (stop.get() - start.get() + 1) .try_into() .expect("ptr size is 64bits"), - ), - ) - .expect(">0"); + ) + .expect(">0"), + ); let storage = storage.clone(); - batch = tokio::task::spawn_blocking(move || { + let get = move || { let mut db = storage .connection() .context("Creating database connection")?; let db = db.transaction().context("Creating database transaction")?; - db.event_counts(start.into(), batch_size) - .context("Querying event counts") - }) - .await - .context("Joining blocking task")??; - - if batch.is_empty() { - Err(anyhow::anyhow!( - "No event counts found for range: start {start}, batch_size {batch_size}" - ))?; - break; - } + let batch = db + .event_counts(start.into(), batch_size) + .context("Querying event counts")?; + + anyhow::ensure!( + !batch.is_empty(), + "No event counts found: start {start}, batch_size {batch_size}" + ); + + Ok(batch) + }; + + batch = match get() { + Ok(x) => x, + Err(e) => { + _ = tx.blocking_send(Err(e)); + return; + } + }; start += batch.len().try_into().expect("ptr size is 64bits"); } while let Some(counts) = batch.pop_front() { - yield counts; + _ = tx.blocking_send(Ok(counts)); } - } + }); + + ReceiverStream::new(rx) } pub(super) async fn verify_commitment( @@ -207,3 +218,57 @@ impl ProcessStage for VerifyCommitment { Ok(events) } } + +#[cfg(test)] +mod tests { + use futures::StreamExt; + use pathfinder_common::StateUpdate; + use pathfinder_storage::fake::Block; + + use super::*; + + #[rstest::rstest] + #[case::request_shorter_than_batch_size(1)] + #[case::request_equal_to_batch_size(2)] + #[case::request_longer_than_batch_size(3)] + #[case::request_equal_to_db_size(5)] + 
#[case::request_longer_than_db_size(6)] + #[tokio::test] + async fn counts_stream(#[case] len: usize) { + const DB_LEN: usize = 5; + let ok_len = len.min(DB_LEN); + let storage = pathfinder_storage::StorageBuilder::in_memory().unwrap(); + let expected = pathfinder_storage::fake::with_n_blocks(&storage, DB_LEN) + .into_iter() + .map(|b| { + let Block { + transaction_data, .. + } = b; + transaction_data + .iter() + .fold(0, |acc, (_, _, evs)| acc + evs.len()) + }) + .collect::>(); + let stream = super::counts_stream( + storage.clone(), + BlockNumber::GENESIS, + BlockNumber::GENESIS + len as u64 - 1, + NonZeroUsize::new(2).unwrap(), + ); + + let mut remainder = stream.collect::>().await; + + let actual = remainder + .drain(..ok_len) + .map(|x| x.unwrap()) + .collect::>(); + + assert_eq!(expected[..ok_len], actual); + + if len > DB_LEN { + assert!(remainder.pop().unwrap().is_err()); + } else { + assert!(remainder.is_empty()); + } + } +} diff --git a/crates/pathfinder/src/sync/state_updates.rs b/crates/pathfinder/src/sync/state_updates.rs index 5b53554214..28c43c7dca 100644 --- a/crates/pathfinder/src/sync/state_updates.rs +++ b/crates/pathfinder/src/sync/state_updates.rs @@ -19,7 +19,9 @@ use pathfinder_common::{ use pathfinder_merkle_tree::contract_state::{update_contract_state, ContractStateUpdateResult}; use pathfinder_merkle_tree::StorageCommitmentTree; use pathfinder_storage::{Storage, TrieUpdate}; +use tokio::sync::mpsc; use tokio::task::spawn_blocking; +use tokio_stream::wrappers::ReceiverStream; use crate::state::{update_starknet_state, StarknetStateUpdate}; use crate::sync::error::{SyncError, SyncError2}; @@ -56,53 +58,62 @@ pub(super) fn length_and_commitment_stream( storage: Storage, mut start: BlockNumber, stop: BlockNumber, + batch_size: NonZeroUsize, ) -> impl futures::Stream> { - const BATCH_SIZE: usize = 1000; - - async_stream::try_stream! 
{ + let (tx, rx) = mpsc::channel(1); + std::thread::spawn(move || { let mut batch = VecDeque::new(); while start <= stop { if let Some(counts) = batch.pop_front() { - yield counts; + _ = tx.blocking_send(Ok(counts)); continue; } - let batch_size = NonZeroUsize::new( - BATCH_SIZE.min( + let batch_size = batch_size.min( + NonZeroUsize::new( (stop.get() - start.get() + 1) .try_into() .expect("ptr size is 64bits"), - ), - ) - .expect(">0"); + ) + .expect(">0"), + ); let storage = storage.clone(); - batch = tokio::task::spawn_blocking(move || { + let get = move || { let mut db = storage .connection() .context("Creating database connection")?; let db = db.transaction().context("Creating database transaction")?; - db.state_diff_lengths_and_commitments(start, batch_size) - .context("Querying state update counts") - }) - .await - .context("Joining blocking task")??; - - if batch.is_empty() { - Err(anyhow::anyhow!( - "No state update counts found for range: start {start}, batch_size {batch_size}" - ))?; - break; - } + let batch = db + .state_diff_lengths_and_commitments(start, batch_size) + .context("Querying state update counts")?; + + anyhow::ensure!( + !batch.is_empty(), + "No state update counts found: start {start}, batch_size {batch_size}" + ); + + Ok(batch) + }; + + batch = match get() { + Ok(x) => x, + Err(e) => { + _ = tx.blocking_send(Err(e)); + return; + } + }; start += batch.len().try_into().expect("ptr size is 64bits"); } while let Some(counts) = batch.pop_front() { - yield counts; + _ = tx.blocking_send(Ok(counts)); } - } + }); + + ReceiverStream::new(rx) } pub struct VerifyCommitment; @@ -185,3 +196,65 @@ impl ProcessStage for UpdateStarknetState { Ok(tail) } } + +#[cfg(test)] +mod tests { + use futures::StreamExt; + use pathfinder_common::{SignedBlockHeader, StateUpdate}; + use pathfinder_storage::fake::Block; + + use super::*; + + #[rstest::rstest] + #[case::request_shorter_than_batch_size(1)] + #[case::request_equal_to_batch_size(2)] + 
#[case::request_longer_than_batch_size(3)] + #[case::request_equal_to_db_size(5)] + #[case::request_longer_than_db_size(6)] + #[tokio::test] + async fn length_and_commitment_stream(#[case] len: usize) { + const DB_LEN: usize = 5; + let ok_len = len.min(DB_LEN); + let storage = pathfinder_storage::StorageBuilder::in_memory().unwrap(); + let expected = pathfinder_storage::fake::with_n_blocks(&storage, DB_LEN) + .into_iter() + .map(|b| { + let Block { + header: + SignedBlockHeader { + header: + BlockHeader { + state_diff_commitment, + state_diff_length, + .. + }, + .. + }, + .. + } = b; + (state_diff_length as usize, state_diff_commitment) + }) + .collect::>(); + let stream = super::length_and_commitment_stream( + storage.clone(), + BlockNumber::GENESIS, + BlockNumber::GENESIS + len as u64 - 1, + NonZeroUsize::new(2).unwrap(), + ); + + let mut remainder = stream.collect::>().await; + + let actual = remainder + .drain(..ok_len) + .map(|x| x.unwrap()) + .collect::>(); + + assert_eq!(expected[..ok_len], actual); + + if len > DB_LEN { + assert!(remainder.pop().unwrap().is_err()); + } else { + assert!(remainder.is_empty()); + } + } +} diff --git a/crates/pathfinder/src/sync/transactions.rs b/crates/pathfinder/src/sync/transactions.rs index 6c2f1b6adb..f4a51ee22b 100644 --- a/crates/pathfinder/src/sync/transactions.rs +++ b/crates/pathfinder/src/sync/transactions.rs @@ -15,6 +15,8 @@ use pathfinder_common::{ TransactionHash, }; use pathfinder_storage::Storage; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; use super::error::{SyncError, SyncError2}; use super::stream::ProcessStage; @@ -54,53 +56,62 @@ pub(super) fn counts_and_commitments_stream( storage: Storage, mut start: BlockNumber, stop: BlockNumber, + batch_size: NonZeroUsize, ) -> impl futures::Stream> { - const BATCH_SIZE: usize = 1000; - - async_stream::try_stream! 
{ + let (tx, rx) = mpsc::channel(1); + std::thread::spawn(move || { let mut batch = VecDeque::new(); while start <= stop { if let Some(counts) = batch.pop_front() { - yield counts; + _ = tx.blocking_send(Ok(counts)); continue; } - let batch_size = NonZeroUsize::new( - BATCH_SIZE.min( + let batch_size = batch_size.min( + NonZeroUsize::new( (stop.get() - start.get() + 1) .try_into() .expect("ptr size is 64bits"), - ), - ) - .expect(">0"); + ) + .expect(">0"), + ); let storage = storage.clone(); - batch = tokio::task::spawn_blocking(move || { + let get = move || { let mut db = storage .connection() .context("Creating database connection")?; let db = db.transaction().context("Creating database transaction")?; - db.transaction_counts_and_commitments(start.into(), batch_size) - .context("Querying transaction counts") - }) - .await - .context("Joining blocking task")??; - - if batch.is_empty() { - Err(anyhow::anyhow!( - "No transaction counts found for range: start {start}, batch_size: {batch_size}" - ))?; - break; - } + let batch = db + .transaction_counts_and_commitments(start.into(), batch_size) + .context("Querying transaction counts")?; + + anyhow::ensure!( + !batch.is_empty(), + "No transaction counts found: start {start}, batch_size {batch_size}" + ); + + Ok(batch) + }; + + batch = match get() { + Ok(x) => x, + Err(e) => { + _ = tx.blocking_send(Err(e)); + return; + } + }; start += batch.len().try_into().expect("ptr size is 64bits"); } while let Some(counts) = batch.pop_front() { - yield counts; + _ = tx.blocking_send(Ok(counts)); } - } + }); + + ReceiverStream::new(rx) } pub struct CalculateHashes(pub ChainId); @@ -205,3 +216,65 @@ impl ProcessStage for Store { Ok(tail) } } + +#[cfg(test)] +mod tests { + use futures::StreamExt; + use pathfinder_common::{SignedBlockHeader, StateUpdate}; + use pathfinder_storage::fake::Block; + + use super::*; + + #[rstest::rstest] + #[case::request_shorter_than_batch_size(1)] + #[case::request_equal_to_batch_size(2)] + 
#[case::request_longer_than_batch_size(3)] + #[case::request_equal_to_db_size(5)] + #[case::request_longer_than_db_size(6)] + #[tokio::test] + async fn counts_and_commitments_stream(#[case] len: usize) { + const DB_LEN: usize = 5; + let ok_len = len.min(DB_LEN); + let storage = pathfinder_storage::StorageBuilder::in_memory().unwrap(); + let expected = pathfinder_storage::fake::with_n_blocks(&storage, DB_LEN) + .into_iter() + .map(|b| { + let Block { + header: + SignedBlockHeader { + header: + BlockHeader { + transaction_commitment, + transaction_count, + .. + }, + .. + }, + .. + } = b; + (transaction_count, transaction_commitment) + }) + .collect::>(); + let stream = super::counts_and_commitments_stream( + storage.clone(), + BlockNumber::GENESIS, + BlockNumber::GENESIS + len as u64 - 1, + NonZeroUsize::new(2).unwrap(), + ); + + let mut remainder = stream.collect::>().await; + + let actual = remainder + .drain(..ok_len) + .map(|x| x.unwrap()) + .collect::>(); + + assert_eq!(expected[..ok_len], actual); + + if len > DB_LEN { + assert!(remainder.pop().unwrap().is_err()); + } else { + assert!(remainder.is_empty()); + } + } +}