diff --git a/crates/pathfinder/src/sync/checkpoint.rs b/crates/pathfinder/src/sync/checkpoint.rs index acff0a45a0..23890bfa90 100644 --- a/crates/pathfinder/src/sync/checkpoint.rs +++ b/crates/pathfinder/src/sync/checkpoint.rs @@ -1,5 +1,6 @@ #![allow(dead_code, unused_variables)] use std::collections::HashSet; +use std::num::NonZeroUsize; use std::sync::{Arc, RwLock}; use anyhow::Context; @@ -181,7 +182,12 @@ impl Sync { let transaction_stream = self.p2p.clone().transaction_stream( start, stop, - transactions::counts_and_commitments_stream(self.storage.clone(), start, stop), + transactions::counts_and_commitments_stream( + self.storage.clone(), + start, + stop, + NonZeroUsize::new(100).expect("100>0"), + ), ); handle_transaction_stream(transaction_stream, self.storage.clone(), chain_id, start) @@ -206,7 +212,12 @@ impl Sync { let stream = self.p2p.clone().state_diff_stream( start, stop, - state_updates::length_and_commitment_stream(self.storage.clone(), start, stop), + state_updates::length_and_commitment_stream( + self.storage.clone(), + start, + stop, + NonZeroUsize::new(100).expect("100>0"), + ), ); handle_state_diff_stream(stream, self.storage.clone(), start, verify_tree_hashes).await?; @@ -226,7 +237,12 @@ impl Sync { let class_stream = self.p2p.clone().class_stream( start, stop, - class_definitions::declared_class_counts_stream(self.storage.clone(), start, stop), + class_definitions::declared_class_counts_stream( + self.storage.clone(), + start, + stop, + NonZeroUsize::new(100).expect("100>0"), + ), ); handle_class_stream( @@ -253,7 +269,12 @@ impl Sync { let event_stream = self.p2p.clone().event_stream( start, stop, - events::counts_stream(self.storage.clone(), start, stop), + events::counts_stream( + self.storage.clone(), + start, + stop, + NonZeroUsize::new(100).expect("100>0"), + ), ); handle_event_stream(event_stream, self.storage.clone()).await?; diff --git a/crates/pathfinder/src/sync/class_definitions.rs 
b/crates/pathfinder/src/sync/class_definitions.rs index 6931371fe5..0e87ab43df 100644 --- a/crates/pathfinder/src/sync/class_definitions.rs +++ b/crates/pathfinder/src/sync/class_definitions.rs @@ -89,13 +89,15 @@ pub(super) async fn next_missing( .context("Joining blocking task")? } +/// ### Important +/// +/// Caller must ensure that `start <= stop`. pub(super) fn declared_class_counts_stream( storage: Storage, mut start: BlockNumber, stop: BlockNumber, + batch_size: NonZeroUsize, ) -> impl futures::Stream> { - const BATCH_SIZE: usize = 1000; - let (tx, rx) = mpsc::channel(1); thread::spawn(move || { let mut batch = Vec::::new(); @@ -103,16 +105,18 @@ pub(super) fn declared_class_counts_stream( while start <= stop { if let Some(counts) = batch.pop() { _ = tx.blocking_send(Ok(counts)); + continue; } - let batch_size = NonZeroUsize::new( - BATCH_SIZE.min( + let batch_size = batch_size.min( + NonZeroUsize::new( (stop.get() - start.get() + 1) .try_into() .expect("ptr size is 64bits"), - ), - ) - .expect(">0"); + ) + .expect(">0"), + ); + let storage = storage.clone(); let get = move || { @@ -489,3 +493,60 @@ impl ProcessStage for VerifyClassHashes { } } } + +#[cfg(test)] +mod tests { + use pathfinder_common::StateUpdate; + use pathfinder_storage::fake::Block; + + use super::*; + + #[rstest::rstest] + #[case::request_shorter_than_batch_size(1)] + #[case::request_equal_to_batch_size(2)] + #[case::request_longer_than_batch_size(3)] + #[case::request_equal_to_db_size(5)] + #[case::request_longer_than_db_size(6)] + #[tokio::test] + async fn declared_class_counts_stream(#[case] len: usize) { + const DB_LEN: usize = 5; + let ok_len = len.min(DB_LEN); + let storage = pathfinder_storage::StorageBuilder::in_memory().unwrap(); + let expected = pathfinder_storage::fake::with_n_blocks(&storage, DB_LEN) + .into_iter() + .map(|b| { + let Block { + state_update: + StateUpdate { + declared_cairo_classes, + declared_sierra_classes, + .. + }, + .. 
+ } = b; + declared_cairo_classes.len() + declared_sierra_classes.len() + }) + .collect::>(); + let stream = super::declared_class_counts_stream( + storage.clone(), + BlockNumber::GENESIS, + BlockNumber::GENESIS + len as u64 - 1, + NonZeroUsize::new(2).unwrap(), + ); + + let mut remainder = stream.collect::>().await; + + let actual = remainder + .drain(..ok_len) + .map(|x| x.unwrap()) + .collect::>(); + + assert_eq!(expected[..ok_len], actual); + + if len > DB_LEN { + assert!(remainder.pop().unwrap().is_err()); + } else { + assert!(remainder.is_empty()); + } + } +} diff --git a/crates/pathfinder/src/sync/events.rs b/crates/pathfinder/src/sync/events.rs index 23b4a4e5e5..1590b2a746 100644 --- a/crates/pathfinder/src/sync/events.rs +++ b/crates/pathfinder/src/sync/events.rs @@ -53,9 +53,8 @@ pub(super) fn counts_stream( storage: Storage, mut start: BlockNumber, stop: BlockNumber, + batch_size: NonZeroUsize, ) -> impl futures::Stream> { - const BATCH_SIZE: usize = 1000; - let (tx, rx) = mpsc::channel(1); std::thread::spawn(move || { let mut batch = VecDeque::new(); @@ -63,16 +62,17 @@ pub(super) fn counts_stream( while start <= stop { if let Some(counts) = batch.pop_front() { _ = tx.blocking_send(Ok(counts)); + continue; } - let batch_size = NonZeroUsize::new( - BATCH_SIZE.min( + let batch_size = batch_size.min( + NonZeroUsize::new( (stop.get() - start.get() + 1) .try_into() .expect("ptr size is 64bits"), - ), - ) - .expect(">0"); + ) + .expect(">0"), + ); let storage = storage.clone(); let get = move || { @@ -218,3 +218,57 @@ impl ProcessStage for VerifyCommitment { Ok(events) } } + +#[cfg(test)] +mod tests { + use futures::StreamExt; + use pathfinder_common::StateUpdate; + use pathfinder_storage::fake::Block; + + use super::*; + + #[rstest::rstest] + #[case::request_shorter_than_batch_size(1)] + #[case::request_equal_to_batch_size(2)] + #[case::request_longer_than_batch_size(3)] + #[case::request_equal_to_db_size(5)] + #[case::request_longer_than_db_size(6)] + 
#[tokio::test] + async fn counts_stream(#[case] len: usize) { + const DB_LEN: usize = 5; + let ok_len = len.min(DB_LEN); + let storage = pathfinder_storage::StorageBuilder::in_memory().unwrap(); + let expected = pathfinder_storage::fake::with_n_blocks(&storage, DB_LEN) + .into_iter() + .map(|b| { + let Block { + transaction_data, .. + } = b; + transaction_data + .iter() + .fold(0, |acc, (_, _, evs)| acc + evs.len()) + }) + .collect::>(); + let stream = super::counts_stream( + storage.clone(), + BlockNumber::GENESIS, + BlockNumber::GENESIS + len as u64 - 1, + NonZeroUsize::new(2).unwrap(), + ); + + let mut remainder = stream.collect::>().await; + + let actual = remainder + .drain(..ok_len) + .map(|x| x.unwrap()) + .collect::>(); + + assert_eq!(expected[..ok_len], actual); + + if len > DB_LEN { + assert!(remainder.pop().unwrap().is_err()); + } else { + assert!(remainder.is_empty()); + } + } +} diff --git a/crates/pathfinder/src/sync/state_updates.rs b/crates/pathfinder/src/sync/state_updates.rs index 67a9d2384e..28c43c7dca 100644 --- a/crates/pathfinder/src/sync/state_updates.rs +++ b/crates/pathfinder/src/sync/state_updates.rs @@ -58,9 +58,8 @@ pub(super) fn length_and_commitment_stream( storage: Storage, mut start: BlockNumber, stop: BlockNumber, + batch_size: NonZeroUsize, ) -> impl futures::Stream> { - const BATCH_SIZE: usize = 1000; - let (tx, rx) = mpsc::channel(1); std::thread::spawn(move || { let mut batch = VecDeque::new(); @@ -68,16 +67,17 @@ pub(super) fn length_and_commitment_stream( while start <= stop { if let Some(counts) = batch.pop_front() { _ = tx.blocking_send(Ok(counts)); + continue; } - let batch_size = NonZeroUsize::new( - BATCH_SIZE.min( + let batch_size = batch_size.min( + NonZeroUsize::new( (stop.get() - start.get() + 1) .try_into() .expect("ptr size is 64bits"), - ), - ) - .expect(">0"); + ) + .expect(">0"), + ); let storage = storage.clone(); let get = move || { @@ -196,3 +196,65 @@ impl ProcessStage for UpdateStarknetState { Ok(tail) } } + 
+#[cfg(test)] +mod tests { + use futures::StreamExt; + use pathfinder_common::{SignedBlockHeader, StateUpdate}; + use pathfinder_storage::fake::Block; + + use super::*; + + #[rstest::rstest] + #[case::request_shorter_than_batch_size(1)] + #[case::request_equal_to_batch_size(2)] + #[case::request_longer_than_batch_size(3)] + #[case::request_equal_to_db_size(5)] + #[case::request_longer_than_db_size(6)] + #[tokio::test] + async fn length_and_commitment_stream(#[case] len: usize) { + const DB_LEN: usize = 5; + let ok_len = len.min(DB_LEN); + let storage = pathfinder_storage::StorageBuilder::in_memory().unwrap(); + let expected = pathfinder_storage::fake::with_n_blocks(&storage, DB_LEN) + .into_iter() + .map(|b| { + let Block { + header: + SignedBlockHeader { + header: + BlockHeader { + state_diff_commitment, + state_diff_length, + .. + }, + .. + }, + .. + } = b; + (state_diff_length as usize, state_diff_commitment) + }) + .collect::>(); + let stream = super::length_and_commitment_stream( + storage.clone(), + BlockNumber::GENESIS, + BlockNumber::GENESIS + len as u64 - 1, + NonZeroUsize::new(2).unwrap(), + ); + + let mut remainder = stream.collect::>().await; + + let actual = remainder + .drain(..ok_len) + .map(|x| x.unwrap()) + .collect::>(); + + assert_eq!(expected[..ok_len], actual); + + if len > DB_LEN { + assert!(remainder.pop().unwrap().is_err()); + } else { + assert!(remainder.is_empty()); + } + } +} diff --git a/crates/pathfinder/src/sync/transactions.rs b/crates/pathfinder/src/sync/transactions.rs index e6628d29b3..f4a51ee22b 100644 --- a/crates/pathfinder/src/sync/transactions.rs +++ b/crates/pathfinder/src/sync/transactions.rs @@ -56,9 +56,8 @@ pub(super) fn counts_and_commitments_stream( storage: Storage, mut start: BlockNumber, stop: BlockNumber, + batch_size: NonZeroUsize, ) -> impl futures::Stream> { - const BATCH_SIZE: usize = 1000; - let (tx, rx) = mpsc::channel(1); std::thread::spawn(move || { let mut batch = VecDeque::new(); @@ -66,16 +65,17 @@ 
pub(super) fn counts_and_commitments_stream( while start <= stop { if let Some(counts) = batch.pop_front() { _ = tx.blocking_send(Ok(counts)); + continue; } - let batch_size = NonZeroUsize::new( - BATCH_SIZE.min( + let batch_size = batch_size.min( + NonZeroUsize::new( (stop.get() - start.get() + 1) .try_into() .expect("ptr size is 64bits"), - ), - ) - .expect(">0"); + ) + .expect(">0"), + ); let storage = storage.clone(); let get = move || { @@ -83,7 +83,8 @@ pub(super) fn counts_and_commitments_stream( .connection() .context("Creating database connection")?; let db = db.transaction().context("Creating database transaction")?; - db.transaction_counts_and_commitments(start.into(), batch_size) + batch = db + .transaction_counts_and_commitments(start.into(), batch_size) .context("Querying transaction counts")?; anyhow::ensure!( @@ -215,3 +216,65 @@ impl ProcessStage for Store { Ok(tail) } } + +#[cfg(test)] +mod tests { + use futures::StreamExt; + use pathfinder_common::{SignedBlockHeader, StateUpdate}; + use pathfinder_storage::fake::Block; + + use super::*; + + #[rstest::rstest] + #[case::request_shorter_than_batch_size(1)] + #[case::request_equal_to_batch_size(2)] + #[case::request_longer_than_batch_size(3)] + #[case::request_equal_to_db_size(5)] + #[case::request_longer_than_db_size(6)] + #[tokio::test] + async fn counts_and_commitments_stream(#[case] len: usize) { + const DB_LEN: usize = 5; + let ok_len = len.min(DB_LEN); + let storage = pathfinder_storage::StorageBuilder::in_memory().unwrap(); + let expected = pathfinder_storage::fake::with_n_blocks(&storage, DB_LEN) + .into_iter() + .map(|b| { + let Block { + header: + SignedBlockHeader { + header: + BlockHeader { + transaction_commitment, + transaction_count, + .. + }, + .. + }, + .. 
+ } = b; + (transaction_count, transaction_commitment) + }) + .collect::>(); + let stream = super::counts_and_commitments_stream( + storage.clone(), + BlockNumber::GENESIS, + BlockNumber::GENESIS + len as u64 - 1, + NonZeroUsize::new(2).unwrap(), + ); + + let mut remainder = stream.collect::>().await; + + let actual = remainder + .drain(..ok_len) + .map(|x| x.unwrap()) + .collect::>(); + + assert_eq!(expected[..ok_len], actual); + + if len > DB_LEN { + assert!(remainder.pop().unwrap().is_err()); + } else { + assert!(remainder.is_empty()); + } + } +}