Skip to content

Commit

Permalink
for_blocks process in parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
RCasatta committed Aug 29, 2023
1 parent 5be0ada commit 73eae57
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 9 deletions.
3 changes: 2 additions & 1 deletion src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,8 @@ impl Daemon {
pub(crate) fn for_blocks<B, F, R>(&self, blockhashes: B, func: F) -> Result<Vec<R>>
where
B: IntoIterator<Item = BlockHash>,
F: Fn(BlockHash, SerBlock) -> R,
F: Fn(BlockHash, SerBlock) -> R + Send + Sync,
R: Send + Sync,
{
self.p2p.lock().for_blocks(blockhashes, func)
}
Expand Down
16 changes: 9 additions & 7 deletions src/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ use bitcoin::{
};
use bitcoin_slices::{bsl, Parse};
use crossbeam_channel::{bounded, select, Receiver, Sender};
use rayon::iter::ParallelIterator;
use rayon::prelude::IntoParallelIterator;

use std::io::{self, ErrorKind, Write};
use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream};
Expand Down Expand Up @@ -97,13 +99,14 @@ impl Connection {
pub(crate) fn for_blocks<B, F, R>(&mut self, blockhashes: B, func: F) -> Result<Vec<R>>
where
B: IntoIterator<Item = BlockHash>,
F: Fn(BlockHash, SerBlock) -> R,
F: Fn(BlockHash, SerBlock) -> R + Send + Sync,
R: Send + Sync,
{
self.blocks_duration.observe_duration("total", || {
let mut result = vec![];
let blockhashes: Vec<BlockHash> = blockhashes.into_iter().collect();
if blockhashes.is_empty() {
return Ok(result);
return Ok(vec![]);
}
self.blocks_duration.observe_duration("request", || {
debug!("loading {} blocks", blockhashes.len());
Expand All @@ -125,12 +128,11 @@ impl Connection {
);
Ok(block)
})?;
result.push(
self.blocks_duration
.observe_duration("process", || func(hash, block)),
);
result.push((hash, block));
}
Ok(result)
let x: Vec<_> = result.into_par_iter().map(|(a, b)| func(a, b)).collect(); // TODO restore observe

Ok(x)
})
}

Expand Down
3 changes: 2 additions & 1 deletion src/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,8 @@ impl ScriptHashStatus {
fn for_new_blocks<B, F, R>(&self, blockhashes: B, daemon: &Daemon, func: F) -> Result<Vec<R>>
where
B: IntoIterator<Item = BlockHash>,
F: Fn(BlockHash, SerBlock) -> R,
F: Fn(BlockHash, SerBlock) -> R + Send + Sync,
R: Send + Sync,
{
daemon.for_blocks(
blockhashes
Expand Down

0 comments on commit 73eae57

Please sign in to comment.