diff --git a/consensus/src/quorum_store/utils.rs b/consensus/src/quorum_store/utils.rs index e4ee5357dde9d..f584c836d7c54 100644 --- a/consensus/src/quorum_store/utils.rs +++ b/consensus/src/quorum_store/utils.rs @@ -198,11 +198,10 @@ pub struct ProofQueue { author_to_batches: HashMap>, // ProofOfStore and insertion_time. None if committed batch_to_proof: HashMap>, - // Map of txn_summary = (sender, sequence number, hash) to all the batches that contain - // the transaction. This helps in counting the number of unique transactions in the pipeline. - txn_summary_to_batches: HashMap>, - // List of batches for which we received txn summaries from the batch coordinator - batches_with_txn_summary: HashSet, + // Number of batches in which the txn_summary = (sender, sequence number, hash) has been included + txn_summary_num_occurrences: HashMap, + // List of transaction summaries for each batch + batch_to_txn_summaries: HashMap>, // Expiration index expirations: TimeExpirations, latest_block_timestamp: u64, @@ -218,8 +217,8 @@ impl ProofQueue { my_peer_id, author_to_batches: HashMap::new(), batch_to_proof: HashMap::new(), - txn_summary_to_batches: HashMap::new(), - batches_with_txn_summary: HashSet::new(), + txn_summary_num_occurrences: HashMap::new(), + batch_to_txn_summaries: HashMap::new(), expirations: TimeExpirations::new(), latest_block_timestamp: 0, remaining_txns_with_duplicates: 0, @@ -250,22 +249,7 @@ impl ProofQueue { } fn remaining_txns_without_duplicates(&self) -> u64 { - // All the batch keys for which batch_to_proof is not None. This is the set of unexpired and uncommitted proofs. - let unexpired_batch_keys = self - .batch_to_proof - .iter() - .filter(|(_, proof)| proof.is_some()) - .map(|(batch_key, _)| batch_key) - .collect::>(); - let mut remaining_txns = self - .txn_summary_to_batches - .iter() - .filter(|(_, batches)| { - batches - .iter() - .any(|batch_key| unexpired_batch_keys.contains(batch_key)) - }) - .count() as u64; + let mut remaining_txns = self.txn_summary_num_occurrences.len() as u64; // If a batch_key is not in batches_with_txn_summary, it means we've received the proof but haven't receive the // transaction summary of the batch from batch coordinator. Add the number of txns in the batch to remaining_txns. @@ -273,7 +257,7 @@ impl ProofQueue { .batch_to_proof .iter() .filter_map(|(batch_key, proof)| { - if proof.is_some() && !self.batches_with_txn_summary.contains(batch_key) { + if proof.is_some() && !self.batch_to_txn_summaries.contains_key(batch_key) { Some(proof.as_ref().unwrap().0.num_txns()) } else { None @@ -322,13 +306,19 @@ impl ProofQueue { let start = Instant::now(); for (batch_info, txn_summaries) in batch_summaries { let batch_key = BatchKey::from_info(&batch_info); - for txn_summary in txn_summaries { - self.txn_summary_to_batches - .entry(txn_summary) - .or_default() - .insert(batch_key.clone()); + if self + .batch_to_txn_summaries + .insert(batch_key, txn_summaries.clone()) + .is_none() + { + for txn_summary in txn_summaries { + if let Some(count) = self.txn_summary_num_occurrences.get_mut(&txn_summary) { + *count += 1; + } else { + self.txn_summary_num_occurrences.insert(txn_summary, 1); + } + } } - self.batches_with_txn_summary.insert(batch_key); } counters::PROOF_QUEUE_ADD_BATCH_SUMMARIES_DURATION.observe_duration(start.elapsed()); } @@ -472,11 +462,17 @@ impl ProofQueue { num_expired_but_not_committed += 1; counters::GAP_BETWEEN_BATCH_EXPIRATION_AND_CURRENT_TIME_WHEN_COMMIT .observe((block_timestamp - batch.expiration()) as f64); - self.txn_summary_to_batches.retain(|_, batches| { - batches.remove(&key.batch_key); - !batches.is_empty() - }); - self.batches_with_txn_summary.remove(&key.batch_key); + if let Some(txn_summaries) = self.batch_to_txn_summaries.get(&key.batch_key) + { + for txn_summary in txn_summaries { + if let Some(count) = + self.txn_summary_num_occurrences.get_mut(txn_summary) + { + *count -= 1; + }; + } + } + self.batch_to_txn_summaries.remove(&key.batch_key); self.dec_remaining(&batch.author(), batch.num_txns()); } claims::assert_some!(self.batch_to_proof.remove(&key.batch_key)); @@ -486,6 +482,8 @@ impl ProofQueue { } } } + self.txn_summary_num_occurrences + .retain(|_, count| *count > 0); counters::PROOF_QUEUE_UPDATE_TIMESTAMP_DURATION.observe_duration(start.elapsed()); counters::NUM_PROOFS_EXPIRED_WHEN_COMMIT.inc_by(num_expired_but_not_committed); } @@ -501,20 +499,20 @@ impl ProofQueue { .observe(remaining_txns_without_duplicates as f64); //count the number of transactions with more than one batches counters::TXNS_WITH_DUPLICATE_BATCHES.set( - self.txn_summary_to_batches + self.txn_summary_num_occurrences .iter() - .filter(|(_, batches)| batches.len() > 1) + .filter(|(_, count)| **count > 1) .count() as i64, ); - counters::TXNS_IN_PROOF_QUEUE.set(self.txn_summary_to_batches.len() as i64); + counters::TXNS_IN_PROOF_QUEUE.set(self.txn_summary_num_occurrences.len() as i64); // count the number of batches with proofs but without txn summaries counters::PROOFS_WITHOUT_BATCH_DATA.set( self.batch_to_proof .iter() .map(|(batch_key, proof)| { - if proof.is_some() && !self.batches_with_txn_summary.contains(batch_key) { + if proof.is_some() && !self.batch_to_txn_summaries.contains_key(batch_key) { 1 } else { 0 @@ -546,18 +544,17 @@ impl ProofQueue { self.dec_remaining(&batch.author(), batch.num_txns()); } self.batch_to_proof.insert(batch_key.clone(), None); - self.batches_with_txn_summary.remove(&batch_key); - } - let batch_keys = batches - .iter() - .map(BatchKey::from_info) - .collect::>(); - self.txn_summary_to_batches.retain(|_, batches| { - for batch_key in &batch_keys { - batches.remove(batch_key); + if let Some(txn_summaries) = self.batch_to_txn_summaries.get(&batch_key) { + for txn_summary in txn_summaries { + if let Some(count) = self.txn_summary_num_occurrences.get_mut(txn_summary) { + *count -= 1; + }; + } } - !batches.is_empty() - }); + self.batch_to_txn_summaries.remove(&batch_key); + } + self.txn_summary_num_occurrences + .retain(|_, count| *count > 0); counters::PROOF_QUEUE_COMMIT_DURATION.observe_duration(start.elapsed()); } }