From f7c8cca3199c43f6f8427170aa54db5af0ecead5 Mon Sep 17 00:00:00 2001 From: Riccardo Casatta Date: Sat, 16 Sep 2023 22:22:48 +0200 Subject: [PATCH] scope and spawn instead of par iter --- src/p2p.rs | 77 ++++++++++++++++++++++++++++++++++++------------------ 1 file changed, 51 insertions(+), 26 deletions(-) diff --git a/src/p2p.rs b/src/p2p.rs index 9312243a6..eefab57f1 100644 --- a/src/p2p.rs +++ b/src/p2p.rs @@ -18,12 +18,10 @@ use bitcoin::{ }; use bitcoin_slices::{bsl, Parse}; use crossbeam_channel::{bounded, select, Receiver, Sender}; -use rayon::iter::ParallelIterator; -use rayon::prelude::IntoParallelIterator; use std::io::{self, ErrorKind, Write}; use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream}; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use crate::types::SerBlock; @@ -102,8 +100,8 @@ impl Connection { R: Send + Sync, { self.blocks_duration.observe_duration("total", || { - let mut result = vec![]; let blockhashes: Vec = blockhashes.into_iter().collect(); + let blockhashes_len = blockhashes.len(); if blockhashes.is_empty() { return Ok(vec![]); } @@ -112,31 +110,58 @@ impl Connection { self.req_send.send(Request::get_blocks(&blockhashes)) })?; - for hash in blockhashes { - let block = self.blocks_duration.observe_duration("response", || { - let block = self - .blocks_recv - .recv() - .with_context(|| format!("failed to get block {}", hash))?; - let header = bsl::BlockHeader::parse(&block[..]) - .expect("core returned invalid blockheader") - .parsed_owned(); - ensure!( - &header.block_hash_sha2()[..] == hash.as_byte_array(), - "got unexpected block" - ); - Ok(block) - })?; - result.push((hash, block)); + let mut result = Vec::with_capacity(blockhashes_len); + for _ in 0..blockhashes_len { + // TODO use `OnceLock` instead of `Mutex>` once MSRV 1.70 + result.push(Mutex::new(None)); } - Ok(result - .into_par_iter() - .map(|(hash, block)| { - self.blocks_duration - .observe_duration("process", || func(hash, block)) + rayon::scope(|s| { + for (i, hash) in blockhashes.iter().enumerate() { + let block_result = self.blocks_duration.observe_duration("response", || { + let block = self + .blocks_recv + .recv() + .with_context(|| format!("failed to get block {}", hash))?; + let header = bsl::BlockHeader::parse(&block[..]) + .expect("core returned invalid blockheader") + .parsed_owned(); + ensure!( + &header.block_hash_sha2()[..] == hash.as_byte_array(), + "got unexpected block" + ); + Ok(block) + }); + if let Ok(block) = block_result { + let func = &func; + let blocks_duration = &self.blocks_duration; + let hash = *hash; + let result = &result; + + s.spawn(move |_| { + let r = + blocks_duration.observe_duration("process", || func(hash, block)); + *result[i] + .lock() + .expect("I am the only user of this mutex until the scope ends") = + Some(r); + }); + } + } + }); + + let result: Option> = result + .into_iter() + .map(|e| { + e.into_inner() + .expect("spawn cannot panic and the scope ensure all the threads ended") }) - .collect()) + .collect(); + + match result { + Some(r) => Ok(r), + None => bail!("One or more of the jobs in for_blocks failed"), + } }) }