Skip to content

Commit

Permalink
Change proof queue data structure (#13878)
Browse files Browse the repository at this point in the history
  • Loading branch information
vusirikala authored Jul 1, 2024
1 parent 1c2ee70 commit daea1c7
Showing 1 changed file with 47 additions and 50 deletions.
97 changes: 47 additions & 50 deletions consensus/src/quorum_store/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,11 +198,10 @@ pub struct ProofQueue {
author_to_batches: HashMap<PeerId, BTreeMap<BatchSortKey, BatchInfo>>,
// ProofOfStore and insertion_time. None if committed
batch_to_proof: HashMap<BatchKey, Option<(ProofOfStore, Instant)>>,
// Map of txn_summary = (sender, sequence number, hash) to all the batches that contain
// the transaction. This helps in counting the number of unique transactions in the pipeline.
txn_summary_to_batches: HashMap<TransactionSummary, HashSet<BatchKey>>,
// List of batches for which we received txn summaries from the batch coordinator
batches_with_txn_summary: HashSet<BatchKey>,
// Number of batches in which the txn_summary = (sender, sequence number, hash) has been included
txn_summary_num_occurrences: HashMap<TransactionSummary, u64>,
// List of transaction summaries for each batch
batch_to_txn_summaries: HashMap<BatchKey, Vec<TransactionSummary>>,
// Expiration index
expirations: TimeExpirations<BatchSortKey>,
latest_block_timestamp: u64,
Expand All @@ -218,8 +217,8 @@ impl ProofQueue {
my_peer_id,
author_to_batches: HashMap::new(),
batch_to_proof: HashMap::new(),
txn_summary_to_batches: HashMap::new(),
batches_with_txn_summary: HashSet::new(),
txn_summary_num_occurrences: HashMap::new(),
batch_to_txn_summaries: HashMap::new(),
expirations: TimeExpirations::new(),
latest_block_timestamp: 0,
remaining_txns_with_duplicates: 0,
Expand Down Expand Up @@ -250,30 +249,15 @@ impl ProofQueue {
}

fn remaining_txns_without_duplicates(&self) -> u64 {
// All the batch keys for which batch_to_proof is not None. This is the set of unexpired and uncommitted proofs.
let unexpired_batch_keys = self
.batch_to_proof
.iter()
.filter(|(_, proof)| proof.is_some())
.map(|(batch_key, _)| batch_key)
.collect::<HashSet<_>>();
let mut remaining_txns = self
.txn_summary_to_batches
.iter()
.filter(|(_, batches)| {
batches
.iter()
.any(|batch_key| unexpired_batch_keys.contains(batch_key))
})
.count() as u64;
let mut remaining_txns = self.txn_summary_num_occurrences.len() as u64;

// If a batch_key is not in batches_with_txn_summary, it means we've received the proof but haven't receive the
// transaction summary of the batch from batch coordinator. Add the number of txns in the batch to remaining_txns.
remaining_txns += self
.batch_to_proof
.iter()
.filter_map(|(batch_key, proof)| {
if proof.is_some() && !self.batches_with_txn_summary.contains(batch_key) {
if proof.is_some() && !self.batch_to_txn_summaries.contains_key(batch_key) {
Some(proof.as_ref().unwrap().0.num_txns())
} else {
None
Expand Down Expand Up @@ -322,13 +306,19 @@ impl ProofQueue {
let start = Instant::now();
for (batch_info, txn_summaries) in batch_summaries {
let batch_key = BatchKey::from_info(&batch_info);
for txn_summary in txn_summaries {
self.txn_summary_to_batches
.entry(txn_summary)
.or_default()
.insert(batch_key.clone());
if self
.batch_to_txn_summaries
.insert(batch_key, txn_summaries.clone())
.is_none()
{
for txn_summary in txn_summaries {
if let Some(count) = self.txn_summary_num_occurrences.get_mut(&txn_summary) {
*count += 1;
} else {
self.txn_summary_num_occurrences.insert(txn_summary, 1);
}
}
}
self.batches_with_txn_summary.insert(batch_key);
}
counters::PROOF_QUEUE_ADD_BATCH_SUMMARIES_DURATION.observe_duration(start.elapsed());
}
Expand Down Expand Up @@ -472,11 +462,17 @@ impl ProofQueue {
num_expired_but_not_committed += 1;
counters::GAP_BETWEEN_BATCH_EXPIRATION_AND_CURRENT_TIME_WHEN_COMMIT
.observe((block_timestamp - batch.expiration()) as f64);
self.txn_summary_to_batches.retain(|_, batches| {
batches.remove(&key.batch_key);
!batches.is_empty()
});
self.batches_with_txn_summary.remove(&key.batch_key);
if let Some(txn_summaries) = self.batch_to_txn_summaries.get(&key.batch_key)
{
for txn_summary in txn_summaries {
if let Some(count) =
self.txn_summary_num_occurrences.get_mut(txn_summary)
{
*count -= 1;
};
}
}
self.batch_to_txn_summaries.remove(&key.batch_key);
self.dec_remaining(&batch.author(), batch.num_txns());
}
claims::assert_some!(self.batch_to_proof.remove(&key.batch_key));
Expand All @@ -486,6 +482,8 @@ impl ProofQueue {
}
}
}
self.txn_summary_num_occurrences
.retain(|_, count| *count > 0);
counters::PROOF_QUEUE_UPDATE_TIMESTAMP_DURATION.observe_duration(start.elapsed());
counters::NUM_PROOFS_EXPIRED_WHEN_COMMIT.inc_by(num_expired_but_not_committed);
}
Expand All @@ -501,20 +499,20 @@ impl ProofQueue {
.observe(remaining_txns_without_duplicates as f64);
//count the number of transactions with more than one batches
counters::TXNS_WITH_DUPLICATE_BATCHES.set(
self.txn_summary_to_batches
self.txn_summary_num_occurrences
.iter()
.filter(|(_, batches)| batches.len() > 1)
.filter(|(_, count)| **count > 1)
.count() as i64,
);

counters::TXNS_IN_PROOF_QUEUE.set(self.txn_summary_to_batches.len() as i64);
counters::TXNS_IN_PROOF_QUEUE.set(self.txn_summary_num_occurrences.len() as i64);

// count the number of batches with proofs but without txn summaries
counters::PROOFS_WITHOUT_BATCH_DATA.set(
self.batch_to_proof
.iter()
.map(|(batch_key, proof)| {
if proof.is_some() && !self.batches_with_txn_summary.contains(batch_key) {
if proof.is_some() && !self.batch_to_txn_summaries.contains_key(batch_key) {
1
} else {
0
Expand Down Expand Up @@ -546,18 +544,17 @@ impl ProofQueue {
self.dec_remaining(&batch.author(), batch.num_txns());
}
self.batch_to_proof.insert(batch_key.clone(), None);
self.batches_with_txn_summary.remove(&batch_key);
}
let batch_keys = batches
.iter()
.map(BatchKey::from_info)
.collect::<HashSet<_>>();
self.txn_summary_to_batches.retain(|_, batches| {
for batch_key in &batch_keys {
batches.remove(batch_key);
if let Some(txn_summaries) = self.batch_to_txn_summaries.get(&batch_key) {
for txn_summary in txn_summaries {
if let Some(count) = self.txn_summary_num_occurrences.get_mut(txn_summary) {
*count -= 1;
};
}
}
!batches.is_empty()
});
self.batch_to_txn_summaries.remove(&batch_key);
}
self.txn_summary_num_occurrences
.retain(|_, count| *count > 0);
counters::PROOF_QUEUE_COMMIT_DURATION.observe_duration(start.elapsed());
}
}

0 comments on commit daea1c7

Please sign in to comment.