Skip to content

Commit

Permalink
Merge pull request eqlabs#2164 from eqlabs/chris/rm-async-stream
Browse files Browse the repository at this point in the history
refactor: finally remove async_stream
  • Loading branch information
CHr15F0x authored Aug 13, 2024
2 parents 40ed242 + 0ce7f10 commit 2dd391b
Show file tree
Hide file tree
Showing 9 changed files with 406 additions and 130 deletions.
24 changes: 0 additions & 24 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ authors = ["Equilibrium Labs <[email protected]>"]
anyhow = "1.0.75"
ark-ff = "0.4.2"
assert_matches = "1.5.0"
async-stream = "0.3.5"
async-trait = "0.1.73"
axum = "0.7.5"
base64 = "0.13.1"
Expand Down
1 change: 0 additions & 1 deletion crates/p2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ rust-version = { workspace = true }

[dependencies]
anyhow = { workspace = true }
async-stream = { workspace = true }
async-trait = { workspace = true }
base64 = { workspace = true }
clap = { workspace = true, features = ["derive", "env", "wrap_help"] }
Expand Down
1 change: 0 additions & 1 deletion crates/pathfinder/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ p2p = []

[dependencies]
anyhow = { workspace = true }
async-stream = { workspace = true }
async-trait = { workspace = true }
axum = { workspace = true }
base64 = { workspace = true }
Expand Down
29 changes: 25 additions & 4 deletions crates/pathfinder/src/sync/checkpoint.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#![allow(dead_code, unused_variables)]
use std::collections::HashSet;
use std::num::NonZeroUsize;
use std::sync::{Arc, RwLock};

use anyhow::Context;
Expand Down Expand Up @@ -181,7 +182,12 @@ impl Sync {
let transaction_stream = self.p2p.clone().transaction_stream(
start,
stop,
transactions::counts_and_commitments_stream(self.storage.clone(), start, stop),
transactions::counts_and_commitments_stream(
self.storage.clone(),
start,
stop,
NonZeroUsize::new(100).expect("100>0"),
),
);

handle_transaction_stream(transaction_stream, self.storage.clone(), chain_id, start)
Expand All @@ -206,7 +212,12 @@ impl Sync {
let stream = self.p2p.clone().state_diff_stream(
start,
stop,
state_updates::length_and_commitment_stream(self.storage.clone(), start, stop),
state_updates::length_and_commitment_stream(
self.storage.clone(),
start,
stop,
NonZeroUsize::new(100).expect("100>0"),
),
);

handle_state_diff_stream(stream, self.storage.clone(), start, verify_tree_hashes).await?;
Expand All @@ -226,7 +237,12 @@ impl Sync {
let class_stream = self.p2p.clone().class_stream(
start,
stop,
class_definitions::declared_class_counts_stream(self.storage.clone(), start, stop),
class_definitions::declared_class_counts_stream(
self.storage.clone(),
start,
stop,
NonZeroUsize::new(100).expect("100>0"),
),
);

handle_class_stream(
Expand All @@ -253,7 +269,12 @@ impl Sync {
let event_stream = self.p2p.clone().event_stream(
start,
stop,
events::counts_stream(self.storage.clone(), start, stop),
events::counts_stream(
self.storage.clone(),
start,
stop,
NonZeroUsize::new(100).expect("100>0"),
),
);

handle_event_stream(event_stream, self.storage.clone()).await?;
Expand Down
125 changes: 98 additions & 27 deletions crates/pathfinder/src/sync/class_definitions.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::collections::HashSet;
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::thread;

use anyhow::Context;
use futures::pin_mut;
Expand All @@ -19,9 +19,10 @@ use starknet_gateway_types::class_hash::from_parts::{
compute_sierra_class_hash,
};
use starknet_gateway_types::reply::call;
use tokio::sync::mpsc::Receiver;
use tokio::sync::mpsc::{self, Receiver};
use tokio::sync::Mutex;
use tokio::task::spawn_blocking;
use tokio_stream::wrappers::ReceiverStream;

use crate::sync::error::{SyncError, SyncError2};
use crate::sync::stream::ProcessStage;
Expand Down Expand Up @@ -88,57 +89,70 @@ pub(super) async fn next_missing(
.context("Joining blocking task")?
}

/// ### Important
///
/// Caller must ensure that `start <= stop`.
pub(super) fn declared_class_counts_stream(
storage: Storage,
mut start: BlockNumber,
stop: BlockNumber,
batch_size: NonZeroUsize,
) -> impl futures::Stream<Item = anyhow::Result<usize>> {
const BATCH_SIZE: usize = 1000;

async_stream::try_stream! {
let (tx, rx) = mpsc::channel(1);
thread::spawn(move || {
let mut batch = Vec::<usize>::new();

while start <= stop {
if let Some(counts) = batch.pop() {
yield counts;
_ = tx.blocking_send(Ok(counts));
continue;
}

let batch_size = NonZeroUsize::new(
BATCH_SIZE.min(
let batch_size = batch_size.min(
NonZeroUsize::new(
(stop.get() - start.get() + 1)
.try_into()
.expect("ptr size is 64bits"),
),
)
.expect(">0");
)
.expect(">0"),
);

let storage = storage.clone();

batch = tokio::task::spawn_blocking(move || {
let get = move || {
let mut db = storage
.connection()
.context("Creating database connection")?;
let db = db.transaction().context("Creating database transaction")?;
db.declared_classes_counts(start, batch_size)
.context("Querying declared classes counts")
})
.await
.context("Joining blocking task")??;

if batch.is_empty() {
Err(anyhow::anyhow!(
"No declared classes counts found for range: start {start}, batch_size {batch_size}"
))?;
break;
}
let batch = db
.declared_classes_counts(start, batch_size)
.context("Querying declared classes counts")?;

anyhow::ensure!(
!batch.is_empty(),
"No class counts found: start {start}, batch_size {batch_size}"
);

Ok(batch)
};

batch = match get() {
Ok(x) => x,
Err(e) => {
_ = tx.blocking_send(Err(e));
return;
}
};

start += batch.len().try_into().expect("ptr size is 64bits");
}

while let Some(counts) = batch.pop() {
yield counts;
_ = tx.blocking_send(Ok(counts));
}
}
});

ReceiverStream::new(rx)
}

pub struct VerifyLayout;
Expand Down Expand Up @@ -297,7 +311,7 @@ impl ExpectedDeclarationsSource {
}

pub fn spawn(self) -> anyhow::Result<Receiver<ExpectedDeclarations>> {
let (tx, rx) = tokio::sync::mpsc::channel(1);
let (tx, rx) = mpsc::channel(1);
let Self {
mut db_connection,
mut start,
Expand Down Expand Up @@ -479,3 +493,60 @@ impl ProcessStage for VerifyClassHashes {
}
}
}

#[cfg(test)]
mod tests {
use pathfinder_common::StateUpdate;
use pathfinder_storage::fake::Block;

use super::*;

#[rstest::rstest]
#[case::request_shorter_than_batch_size(1)]
#[case::request_equal_to_batch_size(2)]
#[case::request_longer_than_batch_size(3)]
#[case::request_equal_to_db_size(5)]
#[case::request_longer_than_db_size(6)]
#[tokio::test]
async fn declared_class_counts_stream(#[case] len: usize) {
const DB_LEN: usize = 5;
let ok_len = len.min(DB_LEN);
let storage = pathfinder_storage::StorageBuilder::in_memory().unwrap();
let expected = pathfinder_storage::fake::with_n_blocks(&storage, DB_LEN)
.into_iter()
.map(|b| {
let Block {
state_update:
StateUpdate {
declared_cairo_classes,
declared_sierra_classes,
..
},
..
} = b;
declared_cairo_classes.len() + declared_sierra_classes.len()
})
.collect::<Vec<_>>();
let stream = super::declared_class_counts_stream(
storage.clone(),
BlockNumber::GENESIS,
BlockNumber::GENESIS + len as u64 - 1,
NonZeroUsize::new(2).unwrap(),
);

let mut remainder = stream.collect::<Vec<_>>().await;

let actual = remainder
.drain(..ok_len)
.map(|x| x.unwrap())
.collect::<Vec<_>>();

assert_eq!(expected[..ok_len], actual);

if len > DB_LEN {
assert!(remainder.pop().unwrap().is_err());
} else {
assert!(remainder.is_empty());
}
}
}
Loading

0 comments on commit 2dd391b

Please sign in to comment.