From 5829eddfe4e3926eb6cdd858854ecc303c9c0d31 Mon Sep 17 00:00:00 2001
From: Riccardo Casatta
Date: Tue, 29 Aug 2023 16:59:49 +0200
Subject: [PATCH] `for_blocks` apply closure in parallel

This requires changing the closure bound from `FnMut` to `Fn` and
returning data from the closure instead of relying on mutating the
captured context.
---
 src/daemon.rs  |  5 ++--
 src/db.rs      | 11 ++++++++
 src/index.rs   | 26 +++++++++++-------
 src/p2p.rs     | 24 ++++++++++++-----
 src/status.rs  | 72 +++++++++++++++++++++++++++++++-------------------
 src/tracker.rs | 23 ++++++++--------
 6 files changed, 104 insertions(+), 57 deletions(-)

diff --git a/src/daemon.rs b/src/daemon.rs
index 19e440308..03193a624 100644
--- a/src/daemon.rs
+++ b/src/daemon.rs
@@ -229,10 +229,11 @@ impl Daemon {
         self.p2p.lock().get_new_headers(chain)
     }
 
-    pub(crate) fn for_blocks<B, F>(&self, blockhashes: B, func: F) -> Result<()>
+    pub(crate) fn for_blocks<B, F, R>(&self, blockhashes: B, func: F) -> Result<Vec<R>>
     where
         B: IntoIterator<Item = BlockHash>,
-        F: FnMut(BlockHash, SerBlock),
+        F: Fn(BlockHash, SerBlock) -> R + Send + Sync,
+        R: Send + Sync,
     {
         self.p2p.lock().for_blocks(blockhashes, func)
     }
diff --git a/src/db.rs b/src/db.rs
index dbb0381dd..1aec45d7b 100644
--- a/src/db.rs
+++ b/src/db.rs
@@ -8,6 +8,7 @@ pub(crate) type Row = Box<[u8]>;
 
 #[derive(Default)]
 pub(crate) struct WriteBatch {
+    pub(crate) height: usize,
     pub(crate) tip_row: Row,
     pub(crate) header_rows: Vec<Row>,
     pub(crate) funding_rows: Vec<Row>,
@@ -22,6 +23,16 @@ impl WriteBatch {
         self.spending_rows.sort_unstable();
         self.txid_rows.sort_unstable();
     }
+
+    pub(crate) fn merge(mut self, other: WriteBatch) -> Self {
+        self.header_rows.extend(other.header_rows.into_iter());
+        self.funding_rows.extend(other.funding_rows.into_iter());
+        self.spending_rows.extend(other.spending_rows.into_iter());
+        self.txid_rows.extend(other.txid_rows.into_iter());
+        if self.height < other.height {
+            self.tip_row = other.tip_row
+        }
+        self
+    }
 }
 
 /// RocksDB wrapper for index storage
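Note: `merge` is the fold step used by `sync_blocks` in index.rs below: per-block
row vectors concatenate, and `tip_row` should end up at the batch with the
greatest `height`. A minimal standalone sketch of that intent (simplified
stand-in types, not the crate's definitions; the sketch also carries `height`
across the merge so the fold stays order-independent, an assumption added here):

    // Simplified stand-ins for the patch's Row/WriteBatch, not the crate's types.
    type Row = Box<[u8]>;

    #[derive(Default)]
    struct WriteBatch {
        height: usize,
        tip_row: Row,
        header_rows: Vec<Row>,
    }

    impl WriteBatch {
        fn merge(mut self, other: WriteBatch) -> Self {
            self.header_rows.extend(other.header_rows);
            if self.height < other.height {
                self.height = other.height; // carried over so later merges compare against the kept tip
                self.tip_row = other.tip_row;
            }
            self
        }
    }

    fn main() {
        let batch_at = |height: usize| WriteBatch {
            height,
            tip_row: vec![height as u8].into_boxed_slice(),
            header_rows: vec![vec![height as u8].into_boxed_slice()],
        };
        // Per-block batches may come back in any order from a parallel map.
        let merged = [batch_at(3), batch_at(1), batch_at(2)]
            .into_iter()
            .fold(WriteBatch::default(), |a, b| a.merge(b));
        assert_eq!(merged.tip_row.as_ref(), &[3]); // tip_row tracks the highest block
        assert_eq!(merged.header_rows.len(), 3);
    }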
.observe_duration("write", || self.store.write(&batch)); self.stats.observe_db(&self.store); + Ok(()) } @@ -287,4 +292,5 @@ fn index_single_block( let mut index_block = IndexBlockVisitor { batch, height }; bsl::Block::visit(&block, &mut index_block).expect("core returned invalid block"); batch.tip_row = serialize(&block_hash).into_boxed_slice(); + batch.height = height; } diff --git a/src/p2p.rs b/src/p2p.rs index 0726e3b93..9312243a6 100644 --- a/src/p2p.rs +++ b/src/p2p.rs @@ -18,6 +18,8 @@ use bitcoin::{ }; use bitcoin_slices::{bsl, Parse}; use crossbeam_channel::{bounded, select, Receiver, Sender}; +use rayon::iter::ParallelIterator; +use rayon::prelude::IntoParallelIterator; use std::io::{self, ErrorKind, Write}; use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream}; @@ -90,18 +92,20 @@ impl Connection { .collect()) } - /// Request and process the specified blocks (in the specified order). + /// Request and process the specified blocks. /// See https://en.bitcoin.it/wiki/Protocol_documentation#getblocks for details. /// Defined as `&mut self` to prevent concurrent invocations (https://github.com/romanz/electrs/pull/526#issuecomment-934685515). - pub(crate) fn for_blocks(&mut self, blockhashes: B, mut func: F) -> Result<()> + pub(crate) fn for_blocks(&mut self, blockhashes: B, func: F) -> Result> where B: IntoIterator, - F: FnMut(BlockHash, SerBlock), + F: Fn(BlockHash, SerBlock) -> R + Send + Sync, + R: Send + Sync, { self.blocks_duration.observe_duration("total", || { + let mut result = vec![]; let blockhashes: Vec = blockhashes.into_iter().collect(); if blockhashes.is_empty() { - return Ok(()); + return Ok(vec![]); } self.blocks_duration.observe_duration("request", || { debug!("loading {} blocks", blockhashes.len()); @@ -123,10 +127,16 @@ impl Connection { ); Ok(block) })?; - self.blocks_duration - .observe_duration("process", || func(hash, block)); + result.push((hash, block)); } - Ok(()) + + Ok(result + .into_par_iter() + .map(|(hash, block)| { + self.blocks_duration + .observe_duration("process", || func(hash, block)) + }) + .collect()) }) } diff --git a/src/status.rs b/src/status.rs index 41115b394..5c9a1e4b5 100644 --- a/src/status.rs +++ b/src/status.rs @@ -308,10 +308,11 @@ impl ScriptHashStatus { } /// Apply func only on the new blocks (fetched from daemon). 
diff --git a/src/status.rs b/src/status.rs
index 41115b394..5c9a1e4b5 100644
--- a/src/status.rs
+++ b/src/status.rs
@@ -308,10 +308,11 @@ impl ScriptHashStatus {
     }
 
     /// Apply func only on the new blocks (fetched from daemon).
-    fn for_new_blocks<B, F>(&self, blockhashes: B, daemon: &Daemon, func: F) -> Result<()>
+    fn for_new_blocks<B, F, R>(&self, blockhashes: B, daemon: &Daemon, func: F) -> Result<Vec<R>>
     where
         B: IntoIterator<Item = BlockHash>,
-        F: FnMut(BlockHash, SerBlock),
+        F: Fn(BlockHash, SerBlock) -> R + Send + Sync,
+        R: Send + Sync,
     {
         daemon.for_blocks(
             blockhashes
@@ -331,37 +332,54 @@ impl ScriptHashStatus {
         outpoints: &mut HashSet<OutPoint>,
     ) -> Result<HashMap<BlockHash, HashMap<usize, TxEntry>>> {
         let scripthash = self.scripthash;
-        let mut result = HashMap::<BlockHash, HashMap<usize, TxEntry>>::new();
         let funding_blockhashes = index.limit_result(index.filter_by_funding(scripthash))?;
-        self.for_new_blocks(funding_blockhashes, daemon, |blockhash, block| {
-            let block_entries = result.entry(blockhash).or_default();
-            for filtered_outputs in filter_block_txs_outputs(block, scripthash) {
-                cache.add_tx(filtered_outputs.txid, move || filtered_outputs.tx);
-                outpoints.extend(make_outpoints(
-                    filtered_outputs.txid,
-                    &filtered_outputs.result,
-                ));
-                block_entries
-                    .entry(filtered_outputs.pos)
-                    .or_insert_with(|| TxEntry::new(filtered_outputs.txid))
-                    .outputs = filtered_outputs.result;
-            }
-        })?;
+        let outputs_filtering =
+            self.for_new_blocks(funding_blockhashes, daemon, |blockhash, block| {
+                let mut block_entries: HashMap<usize, TxEntry> = HashMap::new();
+                let mut outpoints = vec![];
+                for filtered_outputs in filter_block_txs_outputs(block, scripthash) {
+                    cache.add_tx(filtered_outputs.txid, move || filtered_outputs.tx);
+                    outpoints.extend(make_outpoints(
+                        filtered_outputs.txid,
+                        &filtered_outputs.result,
+                    ));
+                    block_entries
+                        .entry(filtered_outputs.pos)
+                        .or_insert_with(|| TxEntry::new(filtered_outputs.txid))
+                        .outputs = filtered_outputs.result;
+                }
+                (blockhash, outpoints, block_entries)
+            })?;
+
+        outpoints.extend(outputs_filtering.iter().flat_map(|(_, o, _)| o).cloned());
+
+        let mut result: HashMap<_, _> = outputs_filtering
+            .into_iter()
+            .map(|(a, _, b)| (a, b))
+            .collect();
+
         let spending_blockhashes: HashSet<BlockHash> = outpoints
             .par_iter()
             .flat_map_iter(|outpoint| index.filter_by_spending(*outpoint))
             .collect();
-        self.for_new_blocks(spending_blockhashes, daemon, |blockhash, block| {
-            let block_entries = result.entry(blockhash).or_default();
-            for filtered_inputs in filter_block_txs_inputs(&block, outpoints) {
-                cache.add_tx(filtered_inputs.txid, move || filtered_inputs.tx);
-                block_entries
-                    .entry(filtered_inputs.pos)
-                    .or_insert_with(|| TxEntry::new(filtered_inputs.txid))
-                    .spent = filtered_inputs.result;
-            }
-        })?;
+        let inputs_filtering =
+            self.for_new_blocks(spending_blockhashes, daemon, |blockhash, block| {
+                let mut block_entries: HashMap<usize, TxEntry> = HashMap::new();
+
+                for filtered_inputs in filter_block_txs_inputs(&block, outpoints) {
+                    cache.add_tx(filtered_inputs.txid, move || filtered_inputs.tx);
+                    block_entries
+                        .entry(filtered_inputs.pos)
+                        .or_insert_with(|| TxEntry::new(filtered_inputs.txid))
+                        .spent = filtered_inputs.result;
+                }
+                (blockhash, block_entries)
+            })?;
+
+        for (b, h) in inputs_filtering {
+            let e = result.entry(b).or_default();
+            e.extend(h);
+        }
 
         Ok(result
             .into_iter()
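Note: because the closures now run concurrently, they can no longer mutate
`result` and `outpoints` in place; each pass returns per-block tuples and the
caller merges them afterwards. A condensed sketch of the merge semantics, with
`u32` keys and `&str` entries as hypothetical stand-ins for `BlockHash` and
`TxEntry`:

    use std::collections::HashMap;

    fn main() {
        // Stand-ins for the outputs of the two parallel passes.
        let outputs_filtering = vec![(1u32, HashMap::from([(0usize, "funded")]))];
        let inputs_filtering = vec![
            (1u32, HashMap::from([(5usize, "spent")])),
            (2u32, HashMap::from([(0usize, "spent")])),
        ];

        // The first pass seeds the map; the second extends the per-block entry
        // maps, so a block seen by both passes keeps entries from each.
        let mut result: HashMap<u32, HashMap<usize, &str>> =
            outputs_filtering.into_iter().collect();
        for (blockhash, entries) in inputs_filtering {
            result.entry(blockhash).or_default().extend(entries);
        }

        assert_eq!(result[&1].len(), 2);
        assert_eq!(result[&2].len(), 1);
    }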
diff --git a/src/tracker.rs b/src/tracker.rs
index 39ba779ea..a6c50524d 100644
--- a/src/tracker.rs
+++ b/src/tracker.rs
@@ -105,17 +105,18 @@ impl Tracker {
     ) -> Result<Option<(BlockHash, Transaction)>> {
         // Note: there are two blocks with coinbase transactions having same txid (see BIP-30)
         let blockhashes = self.index.filter_by_txid(txid);
-        let mut result = None;
-        daemon.for_blocks(blockhashes, |blockhash, block| {
-            if result.is_some() {
-                return; // keep first matching transaction
-            }
-            let mut visitor = FindTransaction::new(txid);
-            result = match bsl::Block::visit(&block, &mut visitor) {
-                Ok(_) | Err(VisitBreak) => visitor.tx_found().map(|tx| (blockhash, tx)),
-                Err(e) => panic!("core returned invalid block: {:?}", e),
-            };
-        })?;
+        let result = daemon
+            .for_blocks(blockhashes, |blockhash, block| {
+                let mut visitor = FindTransaction::new(txid);
+                match bsl::Block::visit(&block, &mut visitor) {
+                    Ok(_) | Err(VisitBreak) => (),
+                    Err(e) => panic!("core returned invalid block: {:?}", e),
+                }
+                visitor.tx_found().map(|tx| (blockhash, tx))
+            })?
+            .first()
+            .cloned()
+            .flatten();
        Ok(result)
     }
 }
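Note: the selection semantics change subtly here. `.first().cloned().flatten()`
keeps the first block's `Option`, not the first `Some` across blocks, so it
matches the old early-return behavior only when the first listed block actually
contains the transaction. A tiny sketch of the difference (stand-in types):

    fn main() {
        // Per-block results as for_blocks now returns them.
        let per_block: Vec<Option<(u32, &str)>> = vec![None, Some((2, "tx"))];

        // The patch's selection: the first element's Option, flattened.
        let first_element = per_block.first().cloned().flatten();
        assert_eq!(first_element, None);

        // The old "keep first matching transaction" behavior would be:
        let first_match = per_block.into_iter().flatten().next();
        assert_eq!(first_match, Some((2, "tx")));
    }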