From ff3541444894b97d0cbc9c6a27e30d637cc57d39 Mon Sep 17 00:00:00 2001 From: Josh Lind Date: Wed, 17 Jul 2024 19:21:52 -0400 Subject: [PATCH] [Consensus Observer] Add block payload verification. --- consensus/consensus-types/src/common.rs | 4 + .../src/consensus_observer/network_message.rs | 158 ++- consensus/src/consensus_observer/observer.rs | 134 ++- .../src/consensus_observer/ordered_blocks.rs | 4 +- .../src/consensus_observer/payload_store.rs | 976 +++++++++++++++--- .../src/consensus_observer/pending_blocks.rs | 21 +- consensus/src/consensus_observer/publisher.rs | 13 +- consensus/src/payload_manager.rs | 245 ++++- 8 files changed, 1270 insertions(+), 285 deletions(-) diff --git a/consensus/consensus-types/src/common.rs b/consensus/consensus-types/src/common.rs index a1dbe0033089d..6c62a2cceff4c 100644 --- a/consensus/consensus-types/src/common.rs +++ b/consensus/consensus-types/src/common.rs @@ -165,6 +165,10 @@ impl ProofWithData { } } + pub fn empty() -> Self { + Self::new(vec![]) + } + pub fn extend(&mut self, other: ProofWithData) { let other_data_status = other.status.lock().as_mut().unwrap().take(); self.proofs.extend(other.proofs); diff --git a/consensus/src/consensus_observer/network_message.rs b/consensus/src/consensus_observer/network_message.rs index 7490c56cb06ac..a8ec2f2796098 100644 --- a/consensus/src/consensus_observer/network_message.rs +++ b/consensus/src/consensus_observer/network_message.rs @@ -2,7 +2,12 @@ // SPDX-License-Identifier: Apache-2.0 use crate::consensus_observer::error::Error; -use aptos_consensus_types::pipelined_block::PipelinedBlock; +use aptos_consensus_types::{ + common::{BatchPayload, ProofWithData}, + pipelined_block::PipelinedBlock, + proof_of_store::{BatchInfo, ProofCache}, +}; +use aptos_crypto::hash::CryptoHash; use aptos_types::{ block_info::{BlockInfo, Round}, epoch_change::Verifier, @@ -13,6 +18,7 @@ use aptos_types::{ use serde::{Deserialize, Serialize}; use std::{ fmt::{Display, Formatter}, + slice::Iter, sync::Arc, }; @@ -46,13 +52,11 @@ impl ConsensusObserverMessage { /// Creates and returns a new block payload message using the given block, transactions and limit pub fn new_block_payload_message( block: BlockInfo, - transactions: Vec, - limit: Option, + transaction_payload: BlockTransactionPayload, ) -> ConsensusObserverDirectSend { ConsensusObserverDirectSend::BlockPayload(BlockPayload { block, - transactions, - limit, + transaction_payload, }) } } @@ -150,10 +154,11 @@ impl ConsensusObserverDirectSend { }, ConsensusObserverDirectSend::BlockPayload(block_payload) => { format!( - "BlockPayload: {} {} {:?}", - block_payload.block.id(), - block_payload.transactions.len(), - block_payload.limit + "BlockPayload: {}. Number of transactions: {}, limit: {:?}, proofs: {:?}", + block_payload.block, + block_payload.transaction_payload.transactions.len(), + block_payload.transaction_payload.limit, + block_payload.transaction_payload.proof_with_data.proofs, ) }, } @@ -304,10 +309,139 @@ impl CommitDecision { } } -/// Payload message contains the block, transactions and the limit of the block +/// The transaction payload of each block #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] -pub struct BlockPayload { - pub block: BlockInfo, +pub struct BlockTransactionPayload { pub transactions: Vec, pub limit: Option, + pub proof_with_data: ProofWithData, + pub inline_batches: Vec, +} + +impl BlockTransactionPayload { + pub fn new( + transactions: Vec, + limit: Option, + proof_with_data: ProofWithData, + inline_batches: Vec, + ) -> Self { + Self { + transactions, + limit, + proof_with_data, + inline_batches, + } + } + + #[cfg(test)] + /// Returns an empty transaction payload (for testing) + pub fn empty() -> Self { + Self { + transactions: vec![], + limit: None, + proof_with_data: ProofWithData::empty(), + inline_batches: vec![], + } + } +} + +/// Payload message contains the block and transaction payload +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +pub struct BlockPayload { + pub block: BlockInfo, + pub transaction_payload: BlockTransactionPayload, +} + +impl BlockPayload { + pub fn new(block: BlockInfo, transaction_payload: BlockTransactionPayload) -> Self { + Self { + block, + transaction_payload, + } + } + + /// Verifies the block payload digests and returns an error if the data is invalid + pub fn verify_payload_digests(&self) -> Result<(), Error> { + // Verify the proof of store digests against the transaction + let mut transactions_iter = self.transaction_payload.transactions.iter(); + for proof_of_store in &self.transaction_payload.proof_with_data.proofs { + reconstruct_and_verify_batch(&mut transactions_iter, proof_of_store.info())?; + } + + // Verify the inline batch digests against the inline batches + for batch_info in &self.transaction_payload.inline_batches { + reconstruct_and_verify_batch(&mut transactions_iter, batch_info)?; + } + + // Verify that there are no transactions remaining + if transactions_iter.next().is_some() { + return Err(Error::InvalidMessageError(format!( + "Failed to verify payload transactions! Transactions remaining: {:?}. Expected: 0", + transactions_iter.as_slice().len() + ))); + } + + Ok(()) // All digests match + } + + /// Verifies that the block payload proofs are correctly signed according + /// to the current epoch state. Returns an error if the data is invalid. + pub fn verify_payload_signatures(&self, epoch_state: &EpochState) -> Result<(), Error> { + // Create a dummy proof cache to verify the proofs + let proof_cache = ProofCache::new(1); + + // TODO: parallelize the verification of the proof signatures! + + // Verify each of the proof signatures + let validator_verifier = &epoch_state.verifier; + for proof_of_store in &self.transaction_payload.proof_with_data.proofs { + if let Err(error) = proof_of_store.verify(validator_verifier, &proof_cache) { + return Err(Error::InvalidMessageError(format!( + "Failed to verify the proof of store for batch: {:?}, Error: {:?}", + proof_of_store.info(), + error + ))); + } + } + + Ok(()) // All proofs are correctly signed + } +} + +/// Reconstructs and verifies the batch using the +/// given transactions and the expected batch info. +fn reconstruct_and_verify_batch( + transactions_iter: &mut Iter, + expected_batch_info: &BatchInfo, +) -> Result<(), Error> { + // Gather the transactions for the batch + let mut batch_transactions = vec![]; + for i in 0..expected_batch_info.num_txns() { + let batch_transaction = match transactions_iter.next() { + Some(transaction) => transaction, + None => { + return Err(Error::InvalidMessageError(format!( + "Failed to extract transaction during batch reconstruction! Batch: {:?}, transaction index: {:?}", + expected_batch_info, i + ))); + }, + }; + batch_transactions.push(batch_transaction.clone()); + } + + // Calculate the batch digest + let batch_payload = BatchPayload::new(expected_batch_info.author(), batch_transactions); + let batch_digest = batch_payload.hash(); + + // Verify the reconstructed digest against the expected digest + let expected_digest = expected_batch_info.digest(); + if batch_digest != *expected_digest { + return Err(Error::InvalidMessageError(format!( + "The reconstructed inline batch digest does not match the expected digest!\ + Batch: {:?}, Expected digest: {:?}, Reconstructed digest: {:?}", + expected_batch_info, expected_digest, batch_digest + ))); + } + + Ok(()) } diff --git a/consensus/src/consensus_observer/observer.rs b/consensus/src/consensus_observer/observer.rs index a729bbfca2c56..d1eb1091834d1 100644 --- a/consensus/src/consensus_observer/observer.rs +++ b/consensus/src/consensus_observer/observer.rs @@ -87,8 +87,9 @@ pub struct ConsensusObserver { // The execution client to the buffer manager execution_client: Arc, - // If the sync handle is set it indicates that we're in state sync mode - sync_handle: Option, + // If the sync handle is set it indicates that we're in state sync mode. + // The flag indicates if we're waiting to transition to a new epoch. + sync_handle: Option<(DropGuard, bool)>, // The sender to notify the consensus observer that state sync to the (epoch, round) is done sync_notification_sender: UnboundedSender<(u64, Round)>, // The reconfiguration event listener to refresh on-chain configs @@ -128,7 +129,7 @@ impl ConsensusObserver { epoch_state: None, quorum_store_enabled: false, // Updated on epoch changes root: Arc::new(Mutex::new(root)), - block_payload_store: BlockPayloadStore::new(), + block_payload_store: BlockPayloadStore::new(consensus_observer_config), pending_block_store: PendingBlockStore::new(consensus_observer_config), pending_ordered_blocks: PendingOrderedBlocks::new(consensus_observer_config), execution_client, @@ -286,10 +287,8 @@ impl ConsensusObserver { // Create the commit callback Box::new(move |blocks, ledger_info: LedgerInfoWithSignatures| { - // Remove the committed blocks from the payload store - block_payload_store.remove_blocks(blocks); - - // Remove the committed blocks from the pending blocks + // Remove the committed blocks from the payload and pending stores + block_payload_store.remove_committed_blocks(blocks); pending_ordered_blocks.remove_blocks_for_commit(&ledger_info); // Verify the ledger info is for the same epoch @@ -503,6 +502,11 @@ impl ConsensusObserver { } } + /// Returns true iff we are waiting for state sync to complete an epoch change + fn in_state_sync_epoch_change(&self) -> bool { + matches!(self.sync_handle, Some((_, true))) + } + /// Returns true iff we are waiting for state sync to complete fn in_state_sync_mode(&self) -> bool { self.sync_handle.is_some() @@ -510,14 +514,9 @@ impl ConsensusObserver { /// Processes the block payload message async fn process_block_payload_message(&mut self, block_payload: BlockPayload) { - // Get the block round and epoch - let block = block_payload.block; - let block_round = block.round(); - let block_epoch = block.epoch(); - - // Get the block transactions and limit - let transactions = block_payload.transactions; - let limit = block_payload.limit; + // Get the epoch and round for the block + let block_epoch = block_payload.block.epoch(); + let block_round = block_payload.block.round(); // Update the metrics for the received block payload metrics::set_gauge_with_label( @@ -526,21 +525,52 @@ impl ConsensusObserver { block_round, ); - // TODO: verify the block payload! + // Verify the block payload digests + if let Err(error) = block_payload.verify_payload_digests() { + error!( + LogSchema::new(LogEntry::ConsensusObserver).message(&format!( + "Failed to verify block payload digests! Ignoring block: {:?}. Error: {:?}", + block_payload.block, error + )) + ); + return; + } + + // If the payload is for the current epoch, verify the proof signatures + let epoch_state = self.get_epoch_state(); + let verified_payload = if block_epoch == epoch_state.epoch { + // Verify the block proof signatures + if let Err(error) = block_payload.verify_payload_signatures(&epoch_state) { + error!( + LogSchema::new(LogEntry::ConsensusObserver).message(&format!( + "Failed to verify block payload signatures! Ignoring block: {:?}. Error: {:?}", + block_payload.block, error + )) + ); + return; + } + + true // We have successfully verified the signatures + } else { + false // We can't verify the signatures yet + }; // Update the payload store with the payload self.block_payload_store - .insert_block_payload(block, transactions, limit); - - // Check if there are blocks that were pending payloads - // but are now ready because of the new payload. - if let Some(ordered_block) = self.pending_block_store.remove_ready_block( - block_epoch, - block_round, - &self.block_payload_store, - ) { - // Process the ordered block - self.process_ordered_block(ordered_block).await; + .insert_block_payload(block_payload, verified_payload); + + // Check if there are blocks that were missing payloads but are + // now ready because of the new payload. Note: this should only + // be done if the payload has been verified correctly. + if verified_payload { + if let Some(ordered_block) = self.pending_block_store.remove_ready_block( + block_epoch, + block_round, + &self.block_payload_store, + ) { + // Process the ordered block + self.process_ordered_block(ordered_block).await; + } } } @@ -553,7 +583,7 @@ impl ConsensusObserver { commit_decision.round(), ); - // If the commit decision is for the current epoch, verify it + // If the commit decision is for the current epoch, verify and process it let epoch_state = self.get_epoch_state(); let commit_decision_epoch = commit_decision.epoch(); if commit_decision_epoch == epoch_state.epoch { @@ -580,10 +610,24 @@ impl ConsensusObserver { // Otherwise, we failed to process the commit decision. If the commit // is for a future epoch or round, we need to state sync. - let commit_decision_round = commit_decision.round(); let last_block = self.get_last_block(); - if commit_decision_epoch > last_block.epoch() || commit_decision_round > last_block.round() - { + let commit_decision_round = commit_decision.round(); + let epoch_changed = commit_decision_epoch > last_block.epoch(); + if epoch_changed || commit_decision_round > last_block.round() { + // If we're waiting for state sync to transition into a new epoch, + // we should just wait and not issue a new state sync request. + if self.in_state_sync_epoch_change() { + info!( + LogSchema::new(LogEntry::ConsensusObserver).message(&format!( + "Already waiting for state sync to reach new epoch: {:?}. Dropping commit decision: {:?}!", + self.root.lock().commit_info(), + commit_decision.proof_block_info() + )) + ); + return; + } + + // Otherwise, we should start the state sync process info!( LogSchema::new(LogEntry::ConsensusObserver).message(&format!( "Started syncing to {}!", @@ -593,6 +637,8 @@ impl ConsensusObserver { // Update the root and clear the pending blocks (up to the commit) *self.root.lock() = commit_decision.commit_proof().clone(); + self.block_payload_store + .remove_blocks_for_epoch_round(commit_decision_epoch, commit_decision_round); self.pending_ordered_blocks .remove_blocks_for_commit(commit_decision.commit_proof()); @@ -604,7 +650,7 @@ impl ConsensusObserver { self.execution_client.clone(), self.sync_notification_sender.clone(), ); - self.sync_handle = Some(DropGuard::new(abort_handle)); + self.sync_handle = Some((DropGuard::new(abort_handle), epoch_changed)); } } @@ -857,16 +903,28 @@ impl ConsensusObserver { self.execution_client.end_epoch().await; self.wait_for_epoch_start().await; - // Verify the pending blocks for the new epoch - let new_epoch_state = self.get_epoch_state(); - self.pending_ordered_blocks - .verify_pending_blocks(&new_epoch_state); - } + // Verify the block payloads for the new epoch + let verified_payload_rounds = self + .block_payload_store + .verify_payload_signatures(&self.get_epoch_state()); + + // Order all the pending blocks that are now ready (these were buffered during state sync) + for payload_round in verified_payload_rounds { + if let Some(ordered_block) = self.pending_block_store.remove_ready_block( + self.get_epoch_state().epoch, + payload_round, + &self.block_payload_store, + ) { + // Process the ordered block + self.process_ordered_block(ordered_block).await; + } + } + }; // Reset and drop the sync handle self.sync_handle = None; - // Process all the pending blocks. These were all buffered during the state sync process. + // Process all the newly ordered blocks for (_, (ordered_block, commit_decision)) in self .pending_ordered_blocks .get_all_verified_pending_blocks() diff --git a/consensus/src/consensus_observer/ordered_blocks.rs b/consensus/src/consensus_observer/ordered_blocks.rs index 17f163350d930..518698cba6a24 100644 --- a/consensus/src/consensus_observer/ordered_blocks.rs +++ b/consensus/src/consensus_observer/ordered_blocks.rs @@ -767,8 +767,8 @@ mod test { validator_signer.public_key(), 100, ); - let validator_verified = ValidatorVerifier::new(vec![validator_consensus_info]); - let epoch_state = EpochState::new(next_epoch, validator_verified); + let validator_verifier = ValidatorVerifier::new(vec![validator_consensus_info]); + let epoch_state = EpochState::new(next_epoch, validator_verifier); // Verify the pending blocks for the next epoch pending_ordered_blocks.verify_pending_blocks(&epoch_state); diff --git a/consensus/src/consensus_observer/payload_store.rs b/consensus/src/consensus_observer/payload_store.rs index fcc490b4df6ef..4c4758400d534 100644 --- a/consensus/src/consensus_observer/payload_store.rs +++ b/consensus/src/consensus_observer/payload_store.rs @@ -4,308 +4,875 @@ use crate::consensus_observer::{ logging::{LogEntry, LogSchema}, metrics, + network_message::BlockPayload, }; -use aptos_consensus_types::pipelined_block::PipelinedBlock; -use aptos_crypto::HashValue; +use aptos_config::config::ConsensusObserverConfig; +use aptos_consensus_types::{common::Round, pipelined_block::PipelinedBlock}; use aptos_infallible::Mutex; -use aptos_logger::error; -use aptos_types::{block_info::BlockInfo, transaction::SignedTransaction}; +use aptos_logger::{error, warn}; +use aptos_types::epoch_state::EpochState; use std::{ - collections::{hash_map::Entry, HashMap}, - mem, + collections::{btree_map::Entry, BTreeMap}, sync::Arc, }; -use tokio::sync::oneshot; -/// The transaction payload of each block -#[derive(Debug, Clone)] -pub struct BlockTransactionPayload { - pub transactions: Vec, - pub limit: Option, -} - -impl BlockTransactionPayload { - pub fn new(transactions: Vec, limit: Option) -> Self { - Self { - transactions, - limit, - } - } -} - -/// The status of the block payload (requested or available) +/// The status of the block payload pub enum BlockPayloadStatus { - Requested(oneshot::Sender), - Available(BlockTransactionPayload), + AvailableAndVerified(BlockPayload), + AvailableAndUnverified(BlockPayload), } /// A simple struct to store the block payloads of ordered and committed blocks #[derive(Clone)] pub struct BlockPayloadStore { - // Block transaction payloads map the block ID to the transaction payloads - // (the same payloads that the payload manager returns). - block_transaction_payloads: Arc>>, + // The configuration of the consensus observer + consensus_observer_config: ConsensusObserverConfig, + + // Block transaction payloads (indexed by epoch and round) + block_payloads: Arc>>, } impl BlockPayloadStore { - pub fn new() -> Self { + pub fn new(consensus_observer_config: ConsensusObserverConfig) -> Self { Self { - block_transaction_payloads: Arc::new(Mutex::new(HashMap::new())), + consensus_observer_config, + block_payloads: Arc::new(Mutex::new(BTreeMap::new())), } } - /// Returns true iff all the payloads for the given blocks are available + /// Returns true iff all the payloads for the given blocks + /// are available and have been verified. pub fn all_payloads_exist(&self, blocks: &[Arc]) -> bool { - let block_transaction_payloads = self.block_transaction_payloads.lock(); + let block_payloads = self.block_payloads.lock(); blocks.iter().all(|block| { + let epoch_and_round = (block.epoch(), block.round()); matches!( - block_transaction_payloads.get(&block.id()), - Some(BlockPayloadStatus::Available(_)) + block_payloads.get(&epoch_and_round), + Some(BlockPayloadStatus::AvailableAndVerified(_)) ) }) } /// Clears all the payloads from the block payload store pub fn clear_all_payloads(&self) { - self.block_transaction_payloads.lock().clear(); + self.block_payloads.lock().clear(); } - /// Returns a reference to the block transaction payloads - pub fn get_block_payloads(&self) -> Arc>> { - self.block_transaction_payloads.clone() + /// Returns a reference to the block payloads + pub fn get_block_payloads(&self) -> Arc>> { + self.block_payloads.clone() } /// Inserts the given block payload data into the payload store pub fn insert_block_payload( &mut self, - block: BlockInfo, - transactions: Vec, - limit: Option, + block_payload: BlockPayload, + verified_payload_signatures: bool, ) { - let mut block_transaction_payloads = self.block_transaction_payloads.lock(); - let block_transaction_payload = BlockTransactionPayload::new(transactions, limit); - - match block_transaction_payloads.entry(block.id()) { - Entry::Occupied(mut entry) => { - // Replace the data status with the new block payload - let mut status = BlockPayloadStatus::Available(block_transaction_payload.clone()); - mem::swap(entry.get_mut(), &mut status); - - // If the status was originally requested, send the payload to the listener - if let BlockPayloadStatus::Requested(payload_sender) = status { - if payload_sender.send(block_transaction_payload).is_err() { - error!(LogSchema::new(LogEntry::ConsensusObserver) - .message("Failed to send block payload to listener!",)); - } - } - }, - Entry::Vacant(entry) => { - // Insert the block payload directly into the payload store - entry.insert(BlockPayloadStatus::Available(block_transaction_payload)); - }, + // Verify that the number of payloads doesn't exceed the maximum + let max_num_pending_blocks = self.consensus_observer_config.max_num_pending_blocks as usize; + if self.block_payloads.lock().len() >= max_num_pending_blocks { + warn!( + LogSchema::new(LogEntry::ConsensusObserver).message(&format!( + "Exceeded the maximum number of payloads: {:?}. Dropping block: {:?}!", + max_num_pending_blocks, block_payload.block, + )) + ); + return; // Drop the block if we've exceeded the maximum } + + // Create the new payload status + let epoch_and_round = (block_payload.block.epoch(), block_payload.block.round()); + let payload_status = if verified_payload_signatures { + BlockPayloadStatus::AvailableAndVerified(block_payload) + } else { + BlockPayloadStatus::AvailableAndUnverified(block_payload) + }; + + // Insert the new payload status + self.block_payloads + .lock() + .insert(epoch_and_round, payload_status); } - /// Removes the given pipelined blocks from the payload store - pub fn remove_blocks(&self, blocks: &[Arc]) { - let mut block_transaction_payloads = self.block_transaction_payloads.lock(); - for block in blocks.iter() { - block_transaction_payloads.remove(&block.id()); - } + /// Removes all blocks up to the specified epoch and round (inclusive) + pub fn remove_blocks_for_epoch_round(&self, epoch: u64, round: Round) { + // Determine the round to split off + let split_off_round = round.saturating_add(1); + + // Remove the blocks from the payload store + let mut block_payloads = self.block_payloads.lock(); + *block_payloads = block_payloads.split_off(&(epoch, split_off_round)); + } + + /// Removes the committed blocks from the payload store + pub fn remove_committed_blocks(&self, committed_blocks: &[Arc]) { + // Get the highest epoch and round for the committed blocks + let (highest_epoch, highest_round) = committed_blocks + .last() + .map_or((0, 0), |block| (block.epoch(), block.round())); + + // Remove the blocks + self.remove_blocks_for_epoch_round(highest_epoch, highest_round); } /// Updates the metrics for the payload store pub fn update_payload_store_metrics(&self) { - // Update the number of block transaction payloads - let block_transaction_payloads = self.block_transaction_payloads.lock(); - let num_payloads = block_transaction_payloads.len() as u64; + // Update the number of block payloads + let num_payloads = self.block_payloads.lock().len() as u64; metrics::set_gauge_with_label( &metrics::OBSERVER_NUM_PROCESSED_BLOCKS, metrics::STORED_PAYLOADS_LABEL, num_payloads, ); + + // Update the highest round for the block payloads + let highest_round = self + .block_payloads + .lock() + .last_key_value() + .map(|((_, round), _)| *round) + .unwrap_or(0); + metrics::set_gauge_with_label( + &metrics::OBSERVER_PROCESSED_BLOCK_ROUNDS, + metrics::STORED_PAYLOADS_LABEL, + highest_round, + ); } -} -impl Default for BlockPayloadStore { - fn default() -> Self { - Self::new() + /// Verifies the block payload signatures against the given epoch state. + /// If verification is successful, blocks are marked as verified. Each + /// new verified block is + pub fn verify_payload_signatures(&mut self, epoch_state: &EpochState) -> Vec { + // Get the current epoch + let current_epoch = epoch_state.epoch; + + // Gather the keys for the block payloads + let payload_epochs_and_rounds: Vec<(u64, Round)> = + self.block_payloads.lock().keys().cloned().collect(); + + // Go through all unverified blocks and attempt to verify the signatures + let mut verified_payloads_to_update = vec![]; + for (epoch, round) in payload_epochs_and_rounds { + // Check if we can break early (BtreeMaps are sorted by key) + if epoch > current_epoch { + break; + } + + // Otherwise, attempt to verify the payload signatures + if epoch == current_epoch { + if let Entry::Occupied(mut entry) = self.block_payloads.lock().entry((epoch, round)) + { + if let BlockPayloadStatus::AvailableAndUnverified(block_payload) = + entry.get_mut() + { + if let Err(error) = block_payload.verify_payload_signatures(epoch_state) { + // Log the verification failure + error!( + LogSchema::new(LogEntry::ConsensusObserver).message(&format!( + "Failed to verify the block payload signatures for epoch: {:?} and round: {:?}. Error: {:?}", + epoch, round, error + )) + ); + + // Remove the block payload from the store + entry.remove(); + } else { + // Save the block payload for reinsertion + verified_payloads_to_update.push(block_payload.clone()); + } + } + } + } + } + + // Collect the rounds of all newly verified blocks + let verified_payload_rounds: Vec = verified_payloads_to_update + .iter() + .map(|block_payload| block_payload.block.round()) + .collect(); + + // Update the verified block payloads. Note: this will cause + // notifications to be sent to any listeners that are waiting. + for verified_payload in verified_payloads_to_update { + self.insert_block_payload(verified_payload, true); + } + + // Return the newly verified payload rounds + verified_payload_rounds } } #[cfg(test)] mod test { use super::*; + use crate::consensus_observer::network_message::BlockTransactionPayload; use aptos_consensus_types::{ block::Block, block_data::{BlockData, BlockType}, + common::ProofWithData, + proof_of_store::{BatchId, BatchInfo, ProofOfStore}, quorum_cert::QuorumCert, }; - use aptos_types::{block_info::Round, transaction::Version}; + use aptos_crypto::HashValue; + use aptos_types::{ + aggregate_signature::AggregateSignature, + block_info::{BlockInfo, Round}, + transaction::Version, + validator_signer::ValidatorSigner, + validator_verifier::{ValidatorConsensusInfo, ValidatorVerifier}, + PeerId, + }; + use rand::{rngs::OsRng, Rng}; #[test] fn test_all_payloads_exist() { + // Create the consensus observer config + let max_num_pending_blocks = 1000; + let consensus_observer_config = ConsensusObserverConfig { + max_num_pending_blocks, + ..ConsensusObserverConfig::default() + }; + // Create a new block payload store - let block_payload_store = BlockPayloadStore::new(); + let block_payload_store = BlockPayloadStore::new(consensus_observer_config); + + // Add some unverified blocks to the payload store + let num_blocks_in_store = 100; + let unverified_blocks = create_and_add_blocks_to_store( + block_payload_store.clone(), + num_blocks_in_store, + 1, + false, + ); + + // Verify the payloads don't exist in the block payload store + assert!(!block_payload_store.all_payloads_exist(&unverified_blocks)); + assert_eq!(get_num_verified_payloads(&block_payload_store), 0); + assert_eq!( + get_num_unverified_payloads(&block_payload_store), + num_blocks_in_store + ); - // Add some blocks to the payload store + // Add some verified blocks to the payload store let num_blocks_in_store = 100; - let pipelined_blocks = - create_and_add_blocks_to_store(block_payload_store.clone(), num_blocks_in_store); + let verified_blocks = create_and_add_blocks_to_store( + block_payload_store.clone(), + num_blocks_in_store, + 0, + true, + ); // Check that all the payloads exist in the block payload store - assert!(block_payload_store.all_payloads_exist(&pipelined_blocks)); + assert!(block_payload_store.all_payloads_exist(&verified_blocks)); // Check that a subset of the payloads exist in the block payload store - let subset_pipelined_blocks = &pipelined_blocks[0..50]; - assert!(block_payload_store.all_payloads_exist(subset_pipelined_blocks)); + let subset_verified_blocks = &verified_blocks[0..50]; + assert!(block_payload_store.all_payloads_exist(subset_verified_blocks)); // Remove some of the payloads from the block payload store - block_payload_store.remove_blocks(subset_pipelined_blocks); + block_payload_store.remove_committed_blocks(subset_verified_blocks); // Check that the payloads no longer exist in the block payload store - assert!(!block_payload_store.all_payloads_exist(subset_pipelined_blocks)); + assert!(!block_payload_store.all_payloads_exist(subset_verified_blocks)); // Check that the remaining payloads still exist in the block payload store - let subset_pipelined_blocks = &pipelined_blocks[50..100]; - assert!(block_payload_store.all_payloads_exist(subset_pipelined_blocks)); + let subset_verified_blocks = &verified_blocks[50..100]; + assert!(block_payload_store.all_payloads_exist(subset_verified_blocks)); // Remove the remaining payloads from the block payload store - block_payload_store.remove_blocks(subset_pipelined_blocks); + block_payload_store.remove_committed_blocks(subset_verified_blocks); // Check that the payloads no longer exist in the block payload store - assert!(!block_payload_store.all_payloads_exist(subset_pipelined_blocks)); + assert!(!block_payload_store.all_payloads_exist(subset_verified_blocks)); + } + + #[test] + fn test_all_payloads_exist_unverified() { + // Create a new block payload store + let consensus_observer_config = ConsensusObserverConfig::default(); + let block_payload_store = BlockPayloadStore::new(consensus_observer_config); + + // Add several verified blocks to the payload store + let num_blocks_in_store = 10; + let verified_blocks = create_and_add_blocks_to_store( + block_payload_store.clone(), + num_blocks_in_store, + 0, + true, + ); + + // Check that the payloads exists in the block payload store + assert!(block_payload_store.all_payloads_exist(&verified_blocks)); + + // Mark the payload of the first block as unverified + mark_payload_as_unverified(block_payload_store.clone(), &verified_blocks[0]); + + // Check that the payload no longer exists in the block payload store + assert!(!block_payload_store.all_payloads_exist(&verified_blocks)); + + // Check that the remaining payloads still exist in the block payload store + assert!(block_payload_store.all_payloads_exist(&verified_blocks[1..10])); } #[test] fn test_clear_all_payloads() { // Create a new block payload store - let block_payload_store = BlockPayloadStore::new(); + let consensus_observer_config = ConsensusObserverConfig::default(); + let block_payload_store = BlockPayloadStore::new(consensus_observer_config); + + // Add some unverified blocks to the payload store + let num_blocks_in_store = 30; + create_and_add_blocks_to_store(block_payload_store.clone(), num_blocks_in_store, 1, false); + + // Add some verified blocks to the payload store + let verified_blocks = create_and_add_blocks_to_store( + block_payload_store.clone(), + num_blocks_in_store, + 0, + true, + ); - // Add some blocks to the payload store - let num_blocks_in_store = 100; - let pipelined_blocks = - create_and_add_blocks_to_store(block_payload_store.clone(), num_blocks_in_store); + // Check that the payloads exist in the block payload store + assert!(block_payload_store.all_payloads_exist(&verified_blocks)); - // Check that all the payloads exist in the block payload store - assert!(block_payload_store.all_payloads_exist(&pipelined_blocks)); + // Verify the number of blocks in the block payload store + check_num_unverified_payloads(&block_payload_store, num_blocks_in_store); + check_num_verified_payloads(&block_payload_store, num_blocks_in_store); // Clear all the payloads from the block payload store block_payload_store.clear_all_payloads(); - // Check that all the payloads exist in the block payload store - assert!(!block_payload_store.all_payloads_exist(&pipelined_blocks)); + // Check that the payloads no longer exist in the block payload store + assert!(!block_payload_store.all_payloads_exist(&verified_blocks)); // Check that the block payload store is empty - assert!(block_payload_store - .block_transaction_payloads - .lock() - .is_empty()); + check_num_unverified_payloads(&block_payload_store, 0); + check_num_verified_payloads(&block_payload_store, 0); } #[test] - fn test_all_payloads_exist_requested() { + fn test_insert_block_payload() { // Create a new block payload store - let block_payload_store = BlockPayloadStore::new(); + let consensus_observer_config = ConsensusObserverConfig::default(); + let mut block_payload_store = BlockPayloadStore::new(consensus_observer_config); + + // Add some verified blocks to the payload store + let num_blocks_in_store = 20; + let verified_blocks = create_and_add_blocks_to_store( + block_payload_store.clone(), + num_blocks_in_store, + 0, + true, + ); - // Add several blocks to the payload store - let num_blocks_in_store = 10; - let pipelined_blocks = - create_and_add_blocks_to_store(block_payload_store.clone(), num_blocks_in_store); + // Check that the block payload store contains the new block payloads + assert!(block_payload_store.all_payloads_exist(&verified_blocks)); - // Check that the payloads exists in the block payload store - assert!(block_payload_store.all_payloads_exist(&pipelined_blocks)); + // Verify the number of blocks in the block payload store + check_num_unverified_payloads(&block_payload_store, 0); + check_num_verified_payloads(&block_payload_store, num_blocks_in_store); - // Mark the payload of the first block as requested - mark_payload_as_requested(block_payload_store.clone(), pipelined_blocks[0].id()); + // Mark the payload of the first block as unverified + mark_payload_as_unverified(block_payload_store.clone(), &verified_blocks[0]); // Check that the payload no longer exists in the block payload store - assert!(!block_payload_store.all_payloads_exist(&pipelined_blocks)); + assert!(!block_payload_store.all_payloads_exist(&verified_blocks)); - // Check that the remaining payloads still exist in the block payload store - assert!(block_payload_store.all_payloads_exist(&pipelined_blocks[1..10])); + // Verify the number of verified blocks in the block payload store + check_num_verified_payloads(&block_payload_store, num_blocks_in_store - 1); + + // Insert the same block payload into the block payload store (as verified) + let transaction_payload = + BlockTransactionPayload::new(vec![], Some(0), ProofWithData::empty(), vec![]); + let block_payload = BlockPayload::new(verified_blocks[0].block_info(), transaction_payload); + block_payload_store.insert_block_payload(block_payload, true); + + // Check that the block payload store now contains the requested block payload + assert!(block_payload_store.all_payloads_exist(&verified_blocks)); } #[test] - fn test_insert_block_payload() { + fn test_insert_block_payload_limit_verified() { + // Create a new config observer config + let max_num_pending_blocks = 10; + let consensus_observer_config = ConsensusObserverConfig { + max_num_pending_blocks, + ..ConsensusObserverConfig::default() + }; + // Create a new block payload store - let mut block_payload_store = BlockPayloadStore::new(); + let block_payload_store = BlockPayloadStore::new(consensus_observer_config); - // Add some blocks to the payload store - let num_blocks_in_store = 10; - let pipelined_blocks = - create_and_add_blocks_to_store(block_payload_store.clone(), num_blocks_in_store); + // Add the maximum number of verified blocks to the payload store + let num_blocks_in_store = max_num_pending_blocks as usize; + create_and_add_blocks_to_store(block_payload_store.clone(), num_blocks_in_store, 0, true); - // Check that the block payload store contains the new block payloads - assert!(block_payload_store.all_payloads_exist(&pipelined_blocks)); + // Verify the number of blocks in the block payload store + check_num_verified_payloads(&block_payload_store, num_blocks_in_store); + check_num_unverified_payloads(&block_payload_store, 0); - // Mark the payload of the first block as requested - let payload_receiver = - mark_payload_as_requested(block_payload_store.clone(), pipelined_blocks[0].id()); + // Add more blocks to the payload store + let num_blocks_to_add = 5; + create_and_add_blocks_to_store(block_payload_store.clone(), num_blocks_to_add, 0, true); - // Check that the payload no longer exists in the block payload store - assert!(!block_payload_store.all_payloads_exist(&pipelined_blocks)); + // Verify the number of blocks in the block payload store + check_num_verified_payloads(&block_payload_store, max_num_pending_blocks as usize); + check_num_unverified_payloads(&block_payload_store, 0); - // Insert the same block payload into the block payload store - block_payload_store.insert_block_payload(pipelined_blocks[0].block_info(), vec![], Some(0)); + // Add a large number of blocks to the payload store + let num_blocks_to_add = 100; + create_and_add_blocks_to_store(block_payload_store.clone(), num_blocks_to_add, 0, true); - // Check that the block payload store now contains the requested block payload - assert!(block_payload_store.all_payloads_exist(&pipelined_blocks)); + // Verify the number of blocks in the block payload store + check_num_verified_payloads(&block_payload_store, max_num_pending_blocks as usize); + check_num_unverified_payloads(&block_payload_store, 0); + } + + #[test] + fn test_insert_block_payload_limit_unverified() { + // Create a new config observer config + let max_num_pending_blocks = 10; + let consensus_observer_config = ConsensusObserverConfig { + max_num_pending_blocks, + ..ConsensusObserverConfig::default() + }; + + // Create a new block payload store + let block_payload_store = BlockPayloadStore::new(consensus_observer_config); + + // Add the maximum number of unverified blocks to the payload store + let num_blocks_in_store = max_num_pending_blocks as usize; + create_and_add_blocks_to_store(block_payload_store.clone(), num_blocks_in_store, 0, false); + + // Verify the number of blocks in the block payload store + check_num_unverified_payloads(&block_payload_store, num_blocks_in_store); + check_num_verified_payloads(&block_payload_store, 0); + + // Add more blocks to the payload store + let num_blocks_to_add = 5; + create_and_add_blocks_to_store(block_payload_store.clone(), num_blocks_to_add, 0, false); + + // Verify the number of blocks in the block payload store + check_num_unverified_payloads(&block_payload_store, max_num_pending_blocks as usize); + check_num_verified_payloads(&block_payload_store, 0); - // Check that the payload receiver receives the requested block payload message - let block_transaction_payload = payload_receiver.blocking_recv().unwrap(); - assert!(block_transaction_payload.transactions.is_empty()); - assert_eq!(block_transaction_payload.limit, Some(0)); + // Add a large number of blocks to the payload store + let num_blocks_to_add = 100; + create_and_add_blocks_to_store(block_payload_store.clone(), num_blocks_to_add, 0, false); + + // Verify the number of blocks in the block payload store + check_num_unverified_payloads(&block_payload_store, max_num_pending_blocks as usize); + check_num_verified_payloads(&block_payload_store, 0); } #[test] - fn test_remove_blocks() { + fn test_remove_blocks_for_epoch_round_verified() { // Create a new block payload store - let block_payload_store = BlockPayloadStore::new(); + let consensus_observer_config = ConsensusObserverConfig::default(); + let block_payload_store = BlockPayloadStore::new(consensus_observer_config); - // Add some blocks to the payload store - let num_blocks_in_store = 10; - let pipelined_blocks = - create_and_add_blocks_to_store(block_payload_store.clone(), num_blocks_in_store); + // Add some verified blocks to the payload store for the current epoch + let current_epoch = 0; + let num_blocks_in_store = 100; + let verified_blocks = create_and_add_blocks_to_store( + block_payload_store.clone(), + num_blocks_in_store, + current_epoch, + true, + ); + + // Remove all the blocks for the given epoch and round + block_payload_store.remove_blocks_for_epoch_round(current_epoch, 49); + + // Check that the block payload store no longer contains the removed blocks + let block_payloads = block_payload_store.get_block_payloads(); + for verified_block in verified_blocks.iter().take(50) { + assert!(!block_payloads + .lock() + .contains_key(&(verified_block.epoch(), verified_block.round()))); + } + + // Verify the number of blocks in the block payload store + check_num_verified_payloads(&block_payload_store, num_blocks_in_store - 50); + + // Remove all the blocks for the given epoch and round + block_payload_store + .remove_blocks_for_epoch_round(current_epoch, num_blocks_in_store as Round); + + // Check that the block payload store no longer contains any blocks + let block_payloads = block_payload_store.get_block_payloads(); + assert!(block_payloads.lock().is_empty()); + + // Verify the number of blocks in the block payload store + check_num_verified_payloads(&block_payload_store, 0); + + // Add some verified blocks to the payload store for the next epoch + let next_epoch = current_epoch + 1; + create_and_add_blocks_to_store( + block_payload_store.clone(), + num_blocks_in_store, + next_epoch, + true, + ); + + // Remove all the blocks for the future epoch and round + let future_epoch = next_epoch + 1; + block_payload_store.remove_blocks_for_epoch_round(future_epoch, 0); + + // Verify the store is now empty + check_num_verified_payloads(&block_payload_store, 0); + } + + #[test] + fn test_remove_blocks_for_epoch_round_unverified() { + // Create a new block payload store + let consensus_observer_config = ConsensusObserverConfig::default(); + let block_payload_store = BlockPayloadStore::new(consensus_observer_config); + + // Add some unverified blocks to the payload store for the current epoch + let current_epoch = 10; + let num_blocks_in_store = 100; + let unverified_blocks = create_and_add_blocks_to_store( + block_payload_store.clone(), + num_blocks_in_store, + current_epoch, + false, + ); + + // Remove all the blocks for the given epoch and round + block_payload_store.remove_blocks_for_epoch_round(current_epoch, 49); + + // Check that the block payload store no longer contains the removed blocks + for unverified_block in unverified_blocks.iter().take(50) { + assert!(!block_payload_store + .block_payloads + .lock() + .contains_key(&(unverified_block.epoch(), unverified_block.round()))); + } + + // Verify the number of blocks in the block payload store + check_num_unverified_payloads(&block_payload_store, num_blocks_in_store - 50); + + // Remove all the blocks for the given epoch and round + block_payload_store + .remove_blocks_for_epoch_round(current_epoch, num_blocks_in_store as Round); + + // Check that the block payload store no longer contains any blocks + assert!(block_payload_store.block_payloads.lock().is_empty()); + + // Verify the number of blocks in the block payload store + check_num_unverified_payloads(&block_payload_store, 0); + + // Add some unverified blocks to the payload store for the next epoch + let next_epoch = current_epoch + 1; + create_and_add_blocks_to_store( + block_payload_store.clone(), + num_blocks_in_store, + next_epoch, + false, + ); + + // Remove all the blocks for the future epoch and round + let future_epoch = next_epoch + 10; + block_payload_store.remove_blocks_for_epoch_round(future_epoch, 0); + + // Verify the store is now empty + check_num_unverified_payloads(&block_payload_store, 0); + } + + #[test] + fn test_remove_committed_blocks_verified() { + // Create a new block payload store + let consensus_observer_config = ConsensusObserverConfig::default(); + let block_payload_store = BlockPayloadStore::new(consensus_observer_config); + + // Add some blocks to the payload store for the current epoch + let current_epoch = 0; + let num_blocks_in_store = 100; + let verified_blocks = create_and_add_blocks_to_store( + block_payload_store.clone(), + num_blocks_in_store, + current_epoch, + true, + ); + + // Remove the first block from the block payload store + block_payload_store.remove_committed_blocks(&verified_blocks[0..1]); + + // Check that the block payload store no longer contains the removed block + let block_payloads = block_payload_store.get_block_payloads(); + let removed_block = &verified_blocks[0]; + assert!(!block_payloads + .lock() + .contains_key(&(removed_block.epoch(), removed_block.round()))); + + // Verify the number of blocks in the block payload store + check_num_verified_payloads(&block_payload_store, num_blocks_in_store - 1); + + // Remove the last 5 blocks from the block payload store + block_payload_store.remove_committed_blocks(&verified_blocks[5..10]); + + // Check that the block payload store no longer contains the removed blocks + let block_payloads = block_payload_store.get_block_payloads(); + for verified_block in verified_blocks.iter().take(10).skip(5) { + assert!(!block_payloads + .lock() + .contains_key(&(verified_block.epoch(), verified_block.round()))); + } + + // Verify the number of blocks in the block payload store + check_num_verified_payloads(&block_payload_store, num_blocks_in_store - 10); + + // Remove all the blocks from the block payload store (including some that don't exist) + block_payload_store.remove_committed_blocks(&verified_blocks[0..num_blocks_in_store]); + + // Check that the block payload store no longer contains any blocks + let block_payloads = block_payload_store.get_block_payloads(); + assert!(block_payloads.lock().is_empty()); + + // Verify the number of blocks in the block payload store + check_num_verified_payloads(&block_payload_store, 0); + + // Add some blocks to the payload store for the next epoch + let next_epoch = 1; + let verified_blocks = create_and_add_blocks_to_store( + block_payload_store.clone(), + num_blocks_in_store, + next_epoch, + true, + ); + + // Remove the last committed block from the future epoch + block_payload_store.remove_committed_blocks(&verified_blocks[99..100]); + + // Check that the block payload store is now empty + check_num_verified_payloads(&block_payload_store, 0); + } + + #[test] + fn test_remove_committed_blocks_unverified() { + // Create a new block payload store + let consensus_observer_config = ConsensusObserverConfig::default(); + let block_payload_store = BlockPayloadStore::new(consensus_observer_config); + + // Add some blocks to the payload store for the current epoch + let current_epoch = 10; + let num_blocks_in_store = 100; + let unverified_blocks = create_and_add_blocks_to_store( + block_payload_store.clone(), + num_blocks_in_store, + current_epoch, + false, + ); // Remove the first block from the block payload store - block_payload_store.remove_blocks(&pipelined_blocks[0..1]); + block_payload_store.remove_committed_blocks(&unverified_blocks[0..1]); // Check that the block payload store no longer contains the removed block - let block_transaction_payloads = block_payload_store.get_block_payloads(); - assert!(!block_transaction_payloads + let removed_block = &unverified_blocks[0]; + assert!(!block_payload_store + .block_payloads .lock() - .contains_key(&pipelined_blocks[0].id())); + .contains_key(&(removed_block.epoch(), removed_block.round()))); + + // Verify the number of blocks in the block payload store + check_num_unverified_payloads(&block_payload_store, num_blocks_in_store - 1); // Remove the last 5 blocks from the block payload store - block_payload_store.remove_blocks(&pipelined_blocks[5..10]); + block_payload_store.remove_committed_blocks(&unverified_blocks[5..10]); // Check that the block payload store no longer contains the removed blocks - let block_transaction_payloads = block_payload_store.get_block_payloads(); - for pipelined_block in pipelined_blocks.iter().take(10).skip(5) { - assert!(!block_transaction_payloads + for verified_block in unverified_blocks.iter().take(10).skip(5) { + assert!(!block_payload_store + .block_payloads .lock() - .contains_key(&pipelined_block.id())); + .contains_key(&(verified_block.epoch(), verified_block.round()))); } + // Verify the number of blocks in the block payload store + check_num_unverified_payloads(&block_payload_store, num_blocks_in_store - 10); + // Remove all the blocks from the block payload store (including some that don't exist) - block_payload_store.remove_blocks(&pipelined_blocks[0..10]); + block_payload_store.remove_committed_blocks(&unverified_blocks[0..num_blocks_in_store]); // Check that the block payload store no longer contains any blocks - let block_transaction_payloads = block_payload_store.get_block_payloads(); - assert!(block_transaction_payloads.lock().is_empty()); + assert!(block_payload_store.block_payloads.lock().is_empty()); + + // Verify the number of blocks in the block payload store + check_num_unverified_payloads(&block_payload_store, 0); + + // Add some blocks to the payload store for the next epoch + let next_epoch = 11; + let unverified_blocks = create_and_add_blocks_to_store( + block_payload_store.clone(), + num_blocks_in_store, + next_epoch, + false, + ); + + // Remove the last committed block from the future epoch + block_payload_store.remove_committed_blocks(&unverified_blocks[99..100]); + + // Check that the block payload store is now empty + check_num_unverified_payloads(&block_payload_store, 0); + } + + #[test] + fn test_verify_payload_signatures() { + // Create a new block payload store + let consensus_observer_config = ConsensusObserverConfig::default(); + let mut block_payload_store = BlockPayloadStore::new(consensus_observer_config); + + // Add some verified blocks for the current epoch + let current_epoch = 0; + let num_verified_blocks = 10; + create_and_add_blocks_to_store( + block_payload_store.clone(), + num_verified_blocks, + current_epoch, + true, + ); + + // Add some unverified blocks for the next epoch + let next_epoch = current_epoch + 1; + let num_unverified_blocks = 20; + let unverified_blocks = create_and_add_blocks_to_store( + block_payload_store.clone(), + num_unverified_blocks, + next_epoch, + false, + ); + + // Add some unverified blocks for a future epoch + let future_epoch = current_epoch + 30; + let num_future_blocks = 30; + let future_unverified_blocks = create_and_add_blocks_to_store( + block_payload_store.clone(), + num_future_blocks, + future_epoch, + false, + ); + + // Create an epoch state for the next epoch (with an empty verifier) + let epoch_state = EpochState::new(next_epoch, ValidatorVerifier::new(vec![])); + + // Verify the block payload signatures + block_payload_store.verify_payload_signatures(&epoch_state); + + // Verify the unverified payloads were moved to the verified store + assert!(block_payload_store.all_payloads_exist(&unverified_blocks)); + assert_eq!( + get_num_verified_payloads(&block_payload_store), + num_verified_blocks + num_unverified_blocks + ); + assert_eq!( + get_num_unverified_payloads(&block_payload_store), + num_future_blocks + ); + + // Clear the verified blocks and check the verified blocks are empty + block_payload_store.remove_committed_blocks(&unverified_blocks); + assert_eq!(get_num_verified_payloads(&block_payload_store), 0); + + // Create an epoch state for the future epoch (with an empty verifier) + let epoch_state = EpochState::new(future_epoch, ValidatorVerifier::new(vec![])); + + // Verify the block payload signatures for a future epoch + block_payload_store.verify_payload_signatures(&epoch_state); + + // Verify the future unverified payloads were moved to the verified store + assert!(block_payload_store.all_payloads_exist(&future_unverified_blocks)); + assert_eq!( + get_num_verified_payloads(&block_payload_store), + num_future_blocks + ); + assert_eq!(get_num_unverified_payloads(&block_payload_store), 0); + } + + #[test] + fn test_verify_payload_signatures_failure() { + // Create a new block payload store + let consensus_observer_config = ConsensusObserverConfig::default(); + let mut block_payload_store = BlockPayloadStore::new(consensus_observer_config); + + // Add some verified blocks for the current epoch + let current_epoch = 10; + let num_verified_blocks = 6; + create_and_add_blocks_to_store( + block_payload_store.clone(), + num_verified_blocks, + current_epoch, + true, + ); + + // Add some unverified blocks for the next epoch + let next_epoch = current_epoch + 1; + let num_unverified_blocks = 15; + let unverified_blocks = create_and_add_blocks_to_store( + block_payload_store.clone(), + num_unverified_blocks, + next_epoch, + false, + ); + + // Add some unverified blocks for a future epoch + let future_epoch = next_epoch + 1; + let num_future_blocks = 10; + let unverified_future_blocks = create_and_add_blocks_to_store( + block_payload_store.clone(), + num_future_blocks, + future_epoch, + false, + ); + + // Create an epoch state for the next epoch (with a non-empty verifier) + let validator_signer = ValidatorSigner::random(None); + let validator_consensus_info = ValidatorConsensusInfo::new( + validator_signer.author(), + validator_signer.public_key(), + 100, + ); + let validator_verifier = ValidatorVerifier::new(vec![validator_consensus_info]); + let epoch_state = EpochState::new(next_epoch, validator_verifier.clone()); + + // Verify the block payload signatures (for this epoch) + block_payload_store.verify_payload_signatures(&epoch_state); + + // Ensure the unverified payloads were not verified + assert!(!block_payload_store.all_payloads_exist(&unverified_blocks)); + + // Ensure the unverified payloads were all removed (for this epoch) + assert_eq!( + get_num_unverified_payloads(&block_payload_store), + num_future_blocks + ); + + // Create an epoch state for the future epoch (with a non-empty verifier) + let epoch_state = EpochState::new(future_epoch, validator_verifier); + + // Verify the block payload signatures (for the future epoch) + block_payload_store.verify_payload_signatures(&epoch_state); + + // Ensure the future unverified payloads were not verified + assert!(!block_payload_store.all_payloads_exist(&unverified_future_blocks)); + + // Ensure the future unverified payloads were all removed (for the future epoch) + assert_eq!(get_num_unverified_payloads(&block_payload_store), 0); } /// Creates and adds the given number of blocks to the block payload store fn create_and_add_blocks_to_store( mut block_payload_store: BlockPayloadStore, num_blocks: usize, + epoch: u64, + verified_payload_signatures: bool, ) -> Vec> { let mut pipelined_blocks = vec![]; for i in 0..num_blocks { // Create the block info let block_info = BlockInfo::new( - i as u64, + epoch, i as Round, HashValue::random(), HashValue::random(), @@ -314,8 +881,31 @@ mod test { None, ); + // Create the block transaction payload with proofs of store + let mut proofs_of_store = vec![]; + for _ in 0..10 { + let batch_info = BatchInfo::new( + PeerId::random(), + BatchId::new(0), + epoch, + 0, + HashValue::random(), + 0, + 0, + 0, + ); + proofs_of_store.push(ProofOfStore::new(batch_info, AggregateSignature::empty())); + } + let block_transaction_payload = BlockTransactionPayload::new( + vec![], + None, + ProofWithData::new(proofs_of_store), + vec![], + ); + // Insert the block payload into the store - block_payload_store.insert_block_payload(block_info.clone(), vec![], Some(i as u64)); + let block_payload = BlockPayload::new(block_info.clone(), block_transaction_payload); + block_payload_store.insert_block_payload(block_payload, verified_payload_signatures); // Create the equivalent pipelined block let block_data = BlockData::new_for_testing( @@ -335,21 +925,67 @@ mod test { pipelined_blocks } - /// Marks the payload of the given block ID as requested and returns the receiver - fn mark_payload_as_requested( + /// Checks the number of unverified payloads in the block payload store + fn check_num_unverified_payloads( + block_payload_store: &BlockPayloadStore, + expected_num_payloads: usize, + ) { + let num_payloads = get_num_unverified_payloads(block_payload_store); + assert_eq!(num_payloads, expected_num_payloads); + } + + /// Checks the number of verified payloads in the block payload store + fn check_num_verified_payloads( + block_payload_store: &BlockPayloadStore, + expected_num_payloads: usize, + ) { + let num_payloads = get_num_verified_payloads(block_payload_store); + assert_eq!(num_payloads, expected_num_payloads); + } + + /// Generates and returns a random number (u64) + pub fn get_random_u64() -> u64 { + OsRng.gen() + } + + /// Returns the number of unverified payloads in the block payload store + fn get_num_unverified_payloads(block_payload_store: &BlockPayloadStore) -> usize { + let mut num_unverified_payloads = 0; + for (_, block_payload_status) in block_payload_store.block_payloads.lock().iter() { + if let BlockPayloadStatus::AvailableAndUnverified(_) = block_payload_status { + num_unverified_payloads += 1; + } + } + num_unverified_payloads + } + + /// Returns the number of verified payloads in the block payload store + fn get_num_verified_payloads(block_payload_store: &BlockPayloadStore) -> usize { + let mut num_verified_payloads = 0; + for (_, block_payload_status) in block_payload_store.block_payloads.lock().iter() { + if let BlockPayloadStatus::AvailableAndVerified(_) = block_payload_status { + num_verified_payloads += 1; + } + } + num_verified_payloads + } + + /// Marks the payload of the given block as unverified + fn mark_payload_as_unverified( block_payload_store: BlockPayloadStore, - block_id: HashValue, - ) -> oneshot::Receiver { - // Get the block payload entry for the given block ID + block: &Arc, + ) { + // Get the payload entry for the given block let block_payloads = block_payload_store.get_block_payloads(); let mut block_payloads = block_payloads.lock(); - let block_payload = block_payloads.get_mut(&block_id).unwrap(); - - // Mark the block payload as requested - let (payload_sender, payload_receiver) = oneshot::channel(); - *block_payload = BlockPayloadStatus::Requested(payload_sender); - - // Return the payload receiver - payload_receiver + let block_payload = block_payloads + .get_mut(&(block.epoch(), block.round())) + .unwrap(); + + // Mark the block payload as unverified + *block_payload = BlockPayloadStatus::AvailableAndUnverified(BlockPayload::new( + block.block_info(), + BlockTransactionPayload::empty(), + )); } } diff --git a/consensus/src/consensus_observer/pending_blocks.rs b/consensus/src/consensus_observer/pending_blocks.rs index 32832c1897aec..1f6090e220989 100644 --- a/consensus/src/consensus_observer/pending_blocks.rs +++ b/consensus/src/consensus_observer/pending_blocks.rs @@ -180,6 +180,7 @@ impl PendingBlockStore { #[cfg(test)] mod test { use super::*; + use crate::consensus_observer::network_message::{BlockPayload, BlockTransactionPayload}; use aptos_consensus_types::{ block::Block, block_data::{BlockData, BlockType}, @@ -399,7 +400,7 @@ mod test { ); // Create a new block payload store and insert payloads for the second block - let mut block_payload_store = BlockPayloadStore::new(); + let mut block_payload_store = BlockPayloadStore::new(consensus_observer_config); let second_block = pending_blocks[1].clone(); insert_payloads_for_ordered_block(&mut block_payload_store, &second_block); @@ -460,13 +461,15 @@ mod test { ); // Create an empty block payload store - let mut block_payload_store = BlockPayloadStore::new(); + let mut block_payload_store = BlockPayloadStore::new(consensus_observer_config); // Incrementally insert and process each payload for the first block let first_block = pending_blocks.first().unwrap().clone(); for block in first_block.blocks().clone() { // Insert the block - block_payload_store.insert_block_payload(block.block_info(), vec![], None); + let block_payload = + BlockPayload::new(block.block_info(), BlockTransactionPayload::empty()); + block_payload_store.insert_block_payload(block_payload, true); // Attempt to remove the block (which might not be ready) let payload_round = block.round(); @@ -507,7 +510,9 @@ mod test { // Insert the block only if this is not the first block let payload_round = block.round(); if payload_round != last_block.first_block().round() { - block_payload_store.insert_block_payload(block.block_info(), vec![], None); + let block_payload = + BlockPayload::new(block.block_info(), BlockTransactionPayload::empty()); + block_payload_store.insert_block_payload(block_payload, true); } // Attempt to remove the block (which might not be ready) @@ -554,7 +559,7 @@ mod test { ); // Create a new block payload store and insert payloads for the first block - let mut block_payload_store = BlockPayloadStore::new(); + let mut block_payload_store = BlockPayloadStore::new(consensus_observer_config); let first_block = pending_blocks.first().unwrap().clone(); insert_payloads_for_ordered_block(&mut block_payload_store, &first_block); @@ -635,7 +640,7 @@ mod test { ); // Create an empty block payload store - let block_payload_store = BlockPayloadStore::new(); + let block_payload_store = BlockPayloadStore::new(consensus_observer_config); // Remove the third block (which is not ready) let third_block = pending_blocks[2].clone(); @@ -737,7 +742,9 @@ mod test { ordered_block: &OrderedBlock, ) { for block in ordered_block.blocks() { - block_payload_store.insert_block_payload(block.block_info(), vec![], None); + let block_payload = + BlockPayload::new(block.block_info(), BlockTransactionPayload::empty()); + block_payload_store.insert_block_payload(block_payload, true); } } diff --git a/consensus/src/consensus_observer/publisher.rs b/consensus/src/consensus_observer/publisher.rs index 46e31fa6f65ae..c95d51c706283 100644 --- a/consensus/src/consensus_observer/publisher.rs +++ b/consensus/src/consensus_observer/publisher.rs @@ -314,7 +314,9 @@ fn spawn_message_serializer_and_sender( #[cfg(test)] mod test { use super::*; + use crate::consensus_observer::network_message::BlockTransactionPayload; use aptos_config::network_id::NetworkId; + use aptos_consensus_types::common::ProofWithData; use aptos_crypto::HashValue; use aptos_network::{ application::{metadata::ConnectionState, storage::PeersAndMetadata}, @@ -491,10 +493,11 @@ mod test { } // Publish a message to the active subscribers + let transaction_payload = + BlockTransactionPayload::new(vec![], Some(10), ProofWithData::empty(), vec![]); let block_payload_message = ConsensusObserverMessage::new_block_payload_message( BlockInfo::empty(), - vec![], - Some(10), + transaction_payload, ); consensus_publisher .publish_message(block_payload_message.clone()) @@ -537,8 +540,10 @@ mod test { } // Publish another message to the active subscribers - let block_payload_message = - ConsensusObserverMessage::new_block_payload_message(BlockInfo::empty(), vec![], None); + let block_payload_message = ConsensusObserverMessage::new_block_payload_message( + BlockInfo::empty(), + BlockTransactionPayload::empty(), + ); consensus_publisher .publish_message(block_payload_message.clone()) .await; diff --git a/consensus/src/payload_manager.rs b/consensus/src/payload_manager.rs index f502c372aa38a..363c33000e404 100644 --- a/consensus/src/payload_manager.rs +++ b/consensus/src/payload_manager.rs @@ -3,7 +3,8 @@ use crate::{ consensus_observer::{ - network_message::ConsensusObserverMessage, payload_store::BlockPayloadStatus, + network_message::{BlockTransactionPayload, ConsensusObserverMessage}, + payload_store::BlockPayloadStatus, publisher::ConsensusPublisher, }, counters, @@ -11,22 +12,23 @@ use crate::{ }; use aptos_consensus_types::{ block::Block, - common::{DataStatus, Payload, ProofWithData}, - proof_of_store::ProofOfStore, + common::{DataStatus, Payload, ProofWithData, Round}, + proof_of_store::{BatchInfo, ProofOfStore}, }; use aptos_crypto::HashValue; -use aptos_executor_types::{ExecutorError::DataNotFound, *}; +use aptos_executor_types::{ + ExecutorError::{DataNotFound, InternalError}, + *, +}; use aptos_infallible::Mutex; use aptos_logger::prelude::*; use aptos_types::transaction::SignedTransaction; use futures::channel::mpsc::Sender; -use itertools::Either; use std::{ - collections::{hash_map::Entry, HashMap}, + collections::{btree_map::Entry, BTreeMap}, sync::Arc, - time::Duration, }; -use tokio::{sync::oneshot, time::timeout}; +use tokio::sync::oneshot; pub trait TPayloadManager: Send + Sync { fn prefetch_payload_data(&self, payload: &Payload, timestamp: u64); @@ -42,7 +44,7 @@ pub enum PayloadManager { Option>, ), ConsensusObserver( - Arc>>, + Arc>>, Option>, ), } @@ -184,43 +186,14 @@ impl PayloadManager { None => return Ok((Vec::new(), None)), }; - if let PayloadManager::ConsensusObserver(txns_pool, consensus_publisher) = self { - // If the data is already available, return it, otherwise put the tx in the pool and wait for it. - // It's important to make sure this doesn't race with the payload insertion part. - let result = match txns_pool.lock().entry(block.id()) { - Entry::Occupied(mut value) => match value.get_mut() { - BlockPayloadStatus::Available(data) => Either::Left(data.clone()), - BlockPayloadStatus::Requested(tx) => { - let (new_tx, rx) = oneshot::channel(); - *tx = new_tx; - Either::Right(rx) - }, - }, - Entry::Vacant(entry) => { - let (tx, rx) = oneshot::channel(); - entry.insert(BlockPayloadStatus::Requested(tx)); - Either::Right(rx) - }, - }; - let block_transaction_payload = match result { - Either::Left(data) => data, - Either::Right(rx) => timeout(Duration::from_millis(300), rx) - .await - .map_err(|_| ExecutorError::CouldNotGetData)? - .map_err(|_| ExecutorError::CouldNotGetData)?, - }; - if let Some(consensus_publisher) = consensus_publisher { - let message = ConsensusObserverMessage::new_block_payload_message( - block.gen_block_info(HashValue::zero(), 0, None), - block_transaction_payload.transactions.clone(), - block_transaction_payload.limit, - ); - consensus_publisher.publish_message(message).await; - } - return Ok(( - block_transaction_payload.transactions, - block_transaction_payload.limit, - )); + if let PayloadManager::ConsensusObserver(block_payloads, consensus_publisher) = self { + return get_transactions_for_observer( + block, + payload, + block_payloads, + consensus_publisher, + ) + .await; } async fn process_payload( @@ -296,15 +269,18 @@ impl PayloadManager { }, } } - - let result = match (self, payload) { - (PayloadManager::DirectMempool, Payload::DirectMempool(txns)) => (txns.clone(), None), + let (transactions, limit, proof_with_data, inline_batches) = match (self, payload) { + (PayloadManager::DirectMempool, Payload::DirectMempool(txns)) => { + return Ok((txns.clone(), None)) + }, ( PayloadManager::InQuorumStore(batch_reader, _, _), Payload::InQuorumStore(proof_with_data), ) => ( process_payload(proof_with_data, batch_reader.clone(), block).await?, None, + proof_with_data.clone(), + vec![], // No inline batches ), ( PayloadManager::InQuorumStore(batch_reader, _, _), @@ -317,6 +293,8 @@ impl PayloadManager { ) .await?, proof_with_data.max_txns_to_execute, + proof_with_data.proof_with_data.clone(), + vec![], // No inline batches ), ( PayloadManager::InQuorumStore(batch_reader, _, _), @@ -339,6 +317,11 @@ impl PayloadManager { all_txns }, *max_txns_to_execute, + proof_with_data.clone(), + inline_batches + .iter() + .map(|(batch_info, _)| batch_info.clone()) + .collect(), ), (_, _) => unreachable!( "Wrong payload {} epoch {}, round {}, id {}", @@ -348,14 +331,172 @@ impl PayloadManager { block.id() ), }; + if let PayloadManager::InQuorumStore(_, _, Some(consensus_publisher)) = self { + let transaction_payload = BlockTransactionPayload::new( + transactions.clone(), + limit, + proof_with_data, + inline_batches, + ); let message = ConsensusObserverMessage::new_block_payload_message( block.gen_block_info(HashValue::zero(), 0, None), - result.0.clone(), - result.1, + transaction_payload, ); consensus_publisher.publish_message(message).await; } - Ok(result) + + Ok((transactions, limit)) + } +} + +/// Returns the transactions for the consensus observer payload manager +async fn get_transactions_for_observer( + block: &Block, + payload: &Payload, + block_payloads: &Arc>>, + consensus_publisher: &Option>, +) -> ExecutorResult<(Vec, Option)> { + // The data should already be available (as consensus observer will only ever + // forward a block to the executor once the data has been received and verified). + let block_payload = match block_payloads.lock().entry((block.epoch(), block.round())) { + Entry::Occupied(mut value) => match value.get_mut() { + BlockPayloadStatus::AvailableAndVerified(block_payload) => block_payload.clone(), + BlockPayloadStatus::AvailableAndUnverified(_) => { + // This shouldn't happen (the payload should already be verified) + let error = format!( + "Payload data for block epoch {}, round {} is unverified!", + block.epoch(), + block.round() + ); + return Err(InternalError { error }); + }, + }, + Entry::Vacant(_) => { + // This shouldn't happen (the payload should already be present) + let error = format!( + "Missing payload data for block epoch {}, round {}!", + block.epoch(), + block.round() + ); + return Err(InternalError { error }); + }, + }; + + // Verify the payload and inline batches before returning the data. The + // batch digests and transactions will have already been verified by the + // consensus observer on message receipt. + let transaction_payload = block_payload.transaction_payload; + match payload { + Payload::DirectMempool(_) => { + let error = + "DirectMempool payloads should not be sent to the consensus observer!".into(); + return Err(InternalError { error }); + }, + Payload::InQuorumStore(proof_with_data) => { + // Verify the batches in the requested block + verify_batches_in_block(&proof_with_data.proofs, &transaction_payload)?; + }, + Payload::InQuorumStoreWithLimit(proof_with_data) => { + // Verify the batches in the requested block + verify_batches_in_block( + &proof_with_data.proof_with_data.proofs, + &transaction_payload, + )?; + + // Verify the transaction limit + verify_transaction_limit(proof_with_data.max_txns_to_execute, &transaction_payload)?; + }, + Payload::QuorumStoreInlineHybrid(inline_batches, proof_with_data, max_txns_to_execute) => { + // Verify the batches in the requested block + verify_batches_in_block(&proof_with_data.proofs, &transaction_payload)?; + + // Verify the inline batches + verify_inline_batches_in_block(inline_batches, &transaction_payload)?; + + // Verify the transaction limit + verify_transaction_limit(*max_txns_to_execute, &transaction_payload)?; + }, + } + + // If the payload is valid, publish it to any downstream observers + if let Some(consensus_publisher) = consensus_publisher { + let message = ConsensusObserverMessage::new_block_payload_message( + block.gen_block_info(HashValue::zero(), 0, None), + transaction_payload.clone(), + ); + consensus_publisher.publish_message(message).await; + } + + // Return the transactions and the transaction limit + Ok((transaction_payload.transactions, transaction_payload.limit)) +} + +/// Verifies that the batches in the block transaction payload +/// match the batches that were already verified by the observer. +fn verify_batches_in_block( + verified_proofs: &[ProofOfStore], + block_transaction_payload: &BlockTransactionPayload, +) -> ExecutorResult<()> { + let verified_batches: Vec<&BatchInfo> = + verified_proofs.iter().map(|proof| proof.info()).collect(); + let found_batches: Vec<&BatchInfo> = block_transaction_payload + .proof_with_data + .proofs + .iter() + .map(|proof| proof.info()) + .collect(); + + if verified_batches != found_batches { + Err(ExecutorError::InternalError { + error: format!( + "Expected batches {:?} but found {:?}!", + verified_batches, found_batches + ), + }) + } else { + Ok(()) + } +} + +/// Verifies that the inline batches in the block transaction payload +/// match the inline batches that were already verified by the observer. +fn verify_inline_batches_in_block( + verified_inline_batches: &[(BatchInfo, Vec)], + block_transaction_payload: &BlockTransactionPayload, +) -> ExecutorResult<()> { + let verified_batches: Vec = verified_inline_batches + .iter() + .map(|(batch_info, _)| batch_info.clone()) + .collect(); + let found_inline_batches = &block_transaction_payload.inline_batches; + + if verified_batches != *found_inline_batches { + Err(ExecutorError::InternalError { + error: format!( + "Expected inline batches {:?} but found {:?}", + verified_batches, found_inline_batches + ), + }) + } else { + Ok(()) + } +} + +/// Verifies that the transaction limit in the block transaction payload +/// matches the transaction limit that was already verified by the observer. +fn verify_transaction_limit( + max_txns_to_execute: Option, + block_transaction_payload: &BlockTransactionPayload, +) -> ExecutorResult<()> { + if max_txns_to_execute != block_transaction_payload.limit { + Err(ExecutorError::InternalError { + error: format!( + "Expected transaction limit {:?} but found {:?}", + max_txns_to_execute, block_transaction_payload.limit + ), + }) + } else { + Ok(()) } }