fixup: fix regressions, add tests
CHr15F0x committed Aug 13, 2024
1 parent 6f77849 commit 0ce7f10
Showing 5 changed files with 294 additions and 33 deletions.
29 changes: 25 additions & 4 deletions crates/pathfinder/src/sync/checkpoint.rs
@@ -1,5 +1,6 @@
#![allow(dead_code, unused_variables)]
use std::collections::HashSet;
use std::num::NonZeroUsize;
use std::sync::{Arc, RwLock};

use anyhow::Context;
@@ -181,7 +182,12 @@ impl Sync {
let transaction_stream = self.p2p.clone().transaction_stream(
start,
stop,
transactions::counts_and_commitments_stream(self.storage.clone(), start, stop),
transactions::counts_and_commitments_stream(
self.storage.clone(),
start,
stop,
NonZeroUsize::new(100).expect("100>0"),
),
);

handle_transaction_stream(transaction_stream, self.storage.clone(), chain_id, start)
@@ -206,7 +212,12 @@ impl Sync {
let stream = self.p2p.clone().state_diff_stream(
start,
stop,
state_updates::length_and_commitment_stream(self.storage.clone(), start, stop),
state_updates::length_and_commitment_stream(
self.storage.clone(),
start,
stop,
NonZeroUsize::new(100).expect("100>0"),
),
);

handle_state_diff_stream(stream, self.storage.clone(), start, verify_tree_hashes).await?;
@@ -226,7 +237,12 @@ impl Sync {
let class_stream = self.p2p.clone().class_stream(
start,
stop,
class_definitions::declared_class_counts_stream(self.storage.clone(), start, stop),
class_definitions::declared_class_counts_stream(
self.storage.clone(),
start,
stop,
NonZeroUsize::new(100).expect("100>0"),
),
);

handle_class_stream(
@@ -253,7 +269,12 @@ impl Sync {
let event_stream = self.p2p.clone().event_stream(
start,
stop,
events::counts_stream(self.storage.clone(), start, stop),
events::counts_stream(
self.storage.clone(),
start,
stop,
NonZeroUsize::new(100).expect("100>0"),
),
);

handle_event_stream(event_stream, self.storage.clone()).await?;
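Each of the four call sites above now passes an explicit read batch size of 100 blocks to its counts/commitments stream helper; the helpers (see the files below) clamp that value to the number of blocks remaining in the requested range. A minimal, self-contained sketch of that clamping, assuming only the NonZeroUsize representation used in the diff (the function name effective_batch_size and the main harness are illustrative, not part of the codebase):

use std::num::NonZeroUsize;

// Clamp a configured batch size to the blocks left in an inclusive range.
// Mirrors the `batch_size.min(NonZeroUsize::new(stop - start + 1)...)` logic
// in the stream helpers; callers must ensure `start <= stop`.
fn effective_batch_size(batch_size: NonZeroUsize, start: u64, stop: u64) -> NonZeroUsize {
    let remaining: usize = (stop - start + 1).try_into().expect("ptr size is 64bits");
    batch_size.min(NonZeroUsize::new(remaining).expect(">0"))
}

fn main() {
    let configured = NonZeroUsize::new(100).expect("100>0");
    // Only three blocks left in the range: the read is clamped to 3.
    assert_eq!(effective_batch_size(configured, 7, 9).get(), 3);
    // Plenty of blocks left: the full batch of 100 is used.
    assert_eq!(effective_batch_size(configured, 0, 999).get(), 100);
}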
75 changes: 68 additions & 7 deletions crates/pathfinder/src/sync/class_definitions.rs
@@ -89,30 +89,34 @@ pub(super) async fn next_missing(
.context("Joining blocking task")?
}

/// ### Important
///
/// Caller must ensure that `start <= stop`.
pub(super) fn declared_class_counts_stream(
storage: Storage,
mut start: BlockNumber,
stop: BlockNumber,
batch_size: NonZeroUsize,
) -> impl futures::Stream<Item = anyhow::Result<usize>> {
const BATCH_SIZE: usize = 1000;

let (tx, rx) = mpsc::channel(1);
thread::spawn(move || {
let mut batch = Vec::<usize>::new();

while start <= stop {
if let Some(counts) = batch.pop() {
_ = tx.blocking_send(Ok(counts));
continue;
}

let batch_size = NonZeroUsize::new(
BATCH_SIZE.min(
let batch_size = batch_size.min(
NonZeroUsize::new(
(stop.get() - start.get() + 1)
.try_into()
.expect("ptr size is 64bits"),
),
)
.expect(">0");
)
.expect(">0"),
);

let storage = storage.clone();

let get = move || {
@@ -489,3 +493,60 @@ impl ProcessStage for VerifyClassHashes {
}
}
}

#[cfg(test)]
mod tests {
use pathfinder_common::StateUpdate;
use pathfinder_storage::fake::Block;

use super::*;

#[rstest::rstest]
#[case::request_shorter_than_batch_size(1)]
#[case::request_equal_to_batch_size(2)]
#[case::request_longer_than_batch_size(3)]
#[case::request_equal_to_db_size(5)]
#[case::request_longer_than_db_size(6)]
#[tokio::test]
async fn declared_class_counts_stream(#[case] len: usize) {
const DB_LEN: usize = 5;
let ok_len = len.min(DB_LEN);
let storage = pathfinder_storage::StorageBuilder::in_memory().unwrap();
let expected = pathfinder_storage::fake::with_n_blocks(&storage, DB_LEN)
.into_iter()
.map(|b| {
let Block {
state_update:
StateUpdate {
declared_cairo_classes,
declared_sierra_classes,
..
},
..
} = b;
declared_cairo_classes.len() + declared_sierra_classes.len()
})
.collect::<Vec<_>>();
let stream = super::declared_class_counts_stream(
storage.clone(),
BlockNumber::GENESIS,
BlockNumber::GENESIS + len as u64 - 1,
NonZeroUsize::new(2).unwrap(),
);

let mut remainder = stream.collect::<Vec<_>>().await;

let actual = remainder
.drain(..ok_len)
.map(|x| x.unwrap())
.collect::<Vec<_>>();

assert_eq!(expected[..ok_len], actual);

if len > DB_LEN {
assert!(remainder.pop().unwrap().is_err());
} else {
assert!(remainder.is_empty());
}
}
}
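For context, the counts streams in this file and in events.rs and state_updates.rs below all follow the same shape: a worker thread reads per-block counts from storage in batches of batch_size and forwards them one at a time over a bounded channel that the async side consumes as a stream. A simplified, self-contained sketch of that pattern, not the pathfinder implementation: the storage access is replaced by a hypothetical read_batch closure, and the receiver is assumed to be exposed via tokio_stream::wrappers::ReceiverStream, which is not visible in the excerpt.

use std::num::NonZeroUsize;
use std::thread;

use futures::Stream;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;

fn batched_counts_stream(
    mut start: u64,
    stop: u64,
    batch_size: NonZeroUsize,
    // Stand-in for the storage read: returns counts for up to `len` blocks
    // starting at `start`; an empty result means the block is not in the database.
    read_batch: impl Fn(u64, usize) -> anyhow::Result<Vec<usize>> + Send + 'static,
) -> impl Stream<Item = anyhow::Result<usize>> {
    let (tx, rx) = mpsc::channel(1);
    thread::spawn(move || {
        while start <= stop {
            // Clamp the configured batch size to the blocks still missing.
            let remaining: usize = (stop - start + 1).try_into().expect("ptr size is 64bits");
            let len = batch_size.get().min(remaining);
            match read_batch(start, len) {
                Ok(counts) if counts.is_empty() => {
                    // The request ran past the database: surface an error and stop,
                    // matching the trailing `Err` asserted in the tests above.
                    _ = tx.blocking_send(Err(anyhow::anyhow!("block {start} not found")));
                    return;
                }
                Ok(counts) => {
                    start += counts.len() as u64;
                    for count in counts {
                        if tx.blocking_send(Ok(count)).is_err() {
                            return; // receiver dropped, stop reading
                        }
                    }
                }
                Err(e) => {
                    _ = tx.blocking_send(Err(e));
                    return;
                }
            }
        }
    });
    ReceiverStream::new(rx)
}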
68 changes: 61 additions & 7 deletions crates/pathfinder/src/sync/events.rs
@@ -53,26 +53,26 @@ pub(super) fn counts_stream(
storage: Storage,
mut start: BlockNumber,
stop: BlockNumber,
batch_size: NonZeroUsize,
) -> impl futures::Stream<Item = anyhow::Result<usize>> {
const BATCH_SIZE: usize = 1000;

let (tx, rx) = mpsc::channel(1);
std::thread::spawn(move || {
let mut batch = VecDeque::new();

while start <= stop {
if let Some(counts) = batch.pop_front() {
_ = tx.blocking_send(Ok(counts));
continue;
}

let batch_size = NonZeroUsize::new(
BATCH_SIZE.min(
let batch_size = batch_size.min(
NonZeroUsize::new(
(stop.get() - start.get() + 1)
.try_into()
.expect("ptr size is 64bits"),
),
)
.expect(">0");
)
.expect(">0"),
);
let storage = storage.clone();

let get = move || {
@@ -218,3 +218,57 @@ impl ProcessStage for VerifyCommitment {
Ok(events)
}
}

#[cfg(test)]
mod tests {
use futures::StreamExt;
use pathfinder_common::StateUpdate;
use pathfinder_storage::fake::Block;

use super::*;

#[rstest::rstest]
#[case::request_shorter_than_batch_size(1)]
#[case::request_equal_to_batch_size(2)]
#[case::request_longer_than_batch_size(3)]
#[case::request_equal_to_db_size(5)]
#[case::request_longer_than_db_size(6)]
#[tokio::test]
async fn counts_stream(#[case] len: usize) {
const DB_LEN: usize = 5;
let ok_len = len.min(DB_LEN);
let storage = pathfinder_storage::StorageBuilder::in_memory().unwrap();
let expected = pathfinder_storage::fake::with_n_blocks(&storage, DB_LEN)
.into_iter()
.map(|b| {
let Block {
transaction_data, ..
} = b;
transaction_data
.iter()
.fold(0, |acc, (_, _, evs)| acc + evs.len())
})
.collect::<Vec<_>>();
let stream = super::counts_stream(
storage.clone(),
BlockNumber::GENESIS,
BlockNumber::GENESIS + len as u64 - 1,
NonZeroUsize::new(2).unwrap(),
);

let mut remainder = stream.collect::<Vec<_>>().await;

let actual = remainder
.drain(..ok_len)
.map(|x| x.unwrap())
.collect::<Vec<_>>();

assert_eq!(expected[..ok_len], actual);

if len > DB_LEN {
assert!(remainder.pop().unwrap().is_err());
} else {
assert!(remainder.is_empty());
}
}
}
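As the test above asserts, a request that extends past the last block in the database yields Ok counts for the stored blocks followed by a single Err. A small illustrative sketch of how a consumer could stop at that first error; the input stream here is a hand-built stand-in, not the real counts_stream:

use futures::StreamExt;

#[tokio::main]
async fn main() {
    // Stand-in for `counts_stream`: counts for the blocks that exist, then an
    // error once the request runs past the end of the database.
    let mut counts = futures::stream::iter(vec![
        Ok::<usize, anyhow::Error>(3),
        Ok(1),
        Err(anyhow::anyhow!("block 5 not found")),
    ]);

    let mut total = 0usize;
    while let Some(item) = counts.next().await {
        match item {
            Ok(n) => total += n,
            Err(e) => {
                // First gap reached: stop consuming.
                eprintln!("stopping: {e}");
                break;
            }
        }
    }
    assert_eq!(total, 4);
}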
76 changes: 69 additions & 7 deletions crates/pathfinder/src/sync/state_updates.rs
@@ -58,26 +58,26 @@ pub(super) fn length_and_commitment_stream(
storage: Storage,
mut start: BlockNumber,
stop: BlockNumber,
batch_size: NonZeroUsize,
) -> impl futures::Stream<Item = anyhow::Result<(usize, StateDiffCommitment)>> {
const BATCH_SIZE: usize = 1000;

let (tx, rx) = mpsc::channel(1);
std::thread::spawn(move || {
let mut batch = VecDeque::new();

while start <= stop {
if let Some(counts) = batch.pop_front() {
_ = tx.blocking_send(Ok(counts));
continue;
}

let batch_size = NonZeroUsize::new(
BATCH_SIZE.min(
let batch_size = batch_size.min(
NonZeroUsize::new(
(stop.get() - start.get() + 1)
.try_into()
.expect("ptr size is 64bits"),
),
)
.expect(">0");
)
.expect(">0"),
);
let storage = storage.clone();

let get = move || {
@@ -196,3 +196,65 @@ impl ProcessStage for UpdateStarknetState {
Ok(tail)
}
}

#[cfg(test)]
mod tests {
use futures::StreamExt;
use pathfinder_common::{SignedBlockHeader, StateUpdate};
use pathfinder_storage::fake::Block;

use super::*;

#[rstest::rstest]
#[case::request_shorter_than_batch_size(1)]
#[case::request_equal_to_batch_size(2)]
#[case::request_longer_than_batch_size(3)]
#[case::request_equal_to_db_size(5)]
#[case::request_longer_than_db_size(6)]
#[tokio::test]
async fn length_and_commitment_stream(#[case] len: usize) {
const DB_LEN: usize = 5;
let ok_len = len.min(DB_LEN);
let storage = pathfinder_storage::StorageBuilder::in_memory().unwrap();
let expected = pathfinder_storage::fake::with_n_blocks(&storage, DB_LEN)
.into_iter()
.map(|b| {
let Block {
header:
SignedBlockHeader {
header:
BlockHeader {
state_diff_commitment,
state_diff_length,
..
},
..
},
..
} = b;
(state_diff_length as usize, state_diff_commitment)
})
.collect::<Vec<_>>();
let stream = super::length_and_commitment_stream(
storage.clone(),
BlockNumber::GENESIS,
BlockNumber::GENESIS + len as u64 - 1,
NonZeroUsize::new(2).unwrap(),
);

let mut remainder = stream.collect::<Vec<_>>().await;

let actual = remainder
.drain(..ok_len)
.map(|x| x.unwrap())
.collect::<Vec<_>>();

assert_eq!(expected[..ok_len], actual);

if len > DB_LEN {
assert!(remainder.pop().unwrap().is_err());
} else {
assert!(remainder.is_empty());
}
}
}