diff --git a/crates/core/component/stake/src/component/stake.rs b/crates/core/component/stake/src/component/stake.rs
index d3c06d2052..6871000caf 100644
--- a/crates/core/component/stake/src/component/stake.rs
+++ b/crates/core/component/stake/src/component/stake.rs
@@ -1,3 +1,5 @@
+pub mod address;
+
 use crate::params::StakeParameters;
 use crate::rate::BaseRateData;
 use crate::validator::{self, Validator};
@@ -14,7 +16,6 @@ use futures::{StreamExt, TryStreamExt};
 use penumbra_num::Amount;
 use penumbra_proto::{StateReadProto, StateWriteProto};
 use penumbra_sct::component::clock::EpochRead;
-use sha2::{Digest, Sha256};
 use std::pin::Pin;
 use std::str::FromStr;
 use std::{collections::BTreeMap, sync::Arc};
@@ -25,7 +26,9 @@ use tendermint::{block, PublicKey};
 use tracing::{error, instrument, trace};
 
 use crate::component::epoch_handler::EpochHandler;
-use crate::component::validator_handler::{ValidatorDataRead, ValidatorManager};
+use crate::component::validator_handler::{
+    ValidatorDataRead, ValidatorManager, ValidatorUptimeTracker,
+};
 
 pub struct Staking {}
 
@@ -323,20 +326,7 @@ pub trait StateWriteExt: StateWrite {
         identity_key: &IdentityKey,
         consensus_key: &PublicKey,
     ) {
-        /// Translates from consensus keys to the truncated sha256 hashes in last_commit_info
-        /// This should really be a refined type upstream, but we can't currently upstream
-        /// to tendermint-rs, for process reasons, and shouldn't do our own tendermint data
-        /// modeling, so this is an interim hack.
-        fn validator_address(ck: &PublicKey) -> [u8; 20] {
-            let ck_bytes = ck.to_bytes();
-            let addr: [u8; 20] = Sha256::digest(ck_bytes).as_slice()[0..20]
-                .try_into()
-                .expect("Sha256 digest should be 20-bytes long");
-
-            addr
-        }
-
-        let address = validator_address(consensus_key);
+        let address = self::address::validator_address(consensus_key);
         tracing::debug!(?identity_key, ?consensus_key, hash = ?hex::encode(address), "registering consensus key");
         self.put(
             state_key::validators::lookup_by::cometbft_address(&address),
diff --git a/crates/core/component/stake/src/component/stake/address.rs b/crates/core/component/stake/src/component/stake/address.rs
new file mode 100644
index 0000000000..70c9458c53
--- /dev/null
+++ b/crates/core/component/stake/src/component/stake/address.rs
@@ -0,0 +1,23 @@
+use {
+    sha2::{Digest, Sha256},
+    tendermint::PublicKey,
+};
+
+/// A type alias for 20-byte truncated SHA256 validator addresses.
+///
+/// This is the format in which [`tendermint::abci::types::CommitInfo`] presents vote information.
+pub(crate) type Address = [u8; ADDRESS_LEN];
+
+const ADDRESS_LEN: usize = 20;
+
+/// Translates from consensus keys to the truncated sha256 hashes in `last_commit_info`.
+//
+// NOTE: This should really be a refined type upstream, but we can't currently upstream to
+// tendermint-rs, for process reasons, and shouldn't do our own tendermint data modeling, so
+// this is an interim hack.
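+//
+// (For reference: this matches CometBFT's own address derivation for Ed25519 consensus keys,
+// where a validator's address is the first 20 bytes of the SHA256 of the raw public key bytes.)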
+pub(crate) fn validator_address(ck: &PublicKey) -> Address {
+    let ck_bytes = ck.to_bytes();
+    Sha256::digest(ck_bytes).as_slice()[0..ADDRESS_LEN]
+        .try_into()
+        .expect("SHA256 digest is at least 20 bytes long")
+}
diff --git a/crates/core/component/stake/src/component/validator_handler.rs b/crates/core/component/stake/src/component/validator_handler.rs
index 05e268cf53..f5e5081a5f 100644
--- a/crates/core/component/stake/src/component/validator_handler.rs
+++ b/crates/core/component/stake/src/component/validator_handler.rs
@@ -5,3 +5,6 @@ pub mod validator_store;
 pub use validator_store::ValidatorDataRead;
 pub(crate) use validator_store::ValidatorDataWrite;
 pub(crate) use validator_store::ValidatorPoolTracker;
+
+pub mod uptime_tracker;
+pub use uptime_tracker::ValidatorUptimeTracker;
diff --git a/crates/core/component/stake/src/component/validator_handler/uptime_tracker.rs b/crates/core/component/stake/src/component/validator_handler/uptime_tracker.rs
new file mode 100644
index 0000000000..3c3190a98c
--- /dev/null
+++ b/crates/core/component/stake/src/component/validator_handler/uptime_tracker.rs
@@ -0,0 +1,189 @@
+use {
+    super::{ValidatorDataRead, ValidatorDataWrite, ValidatorManager},
+    crate::{
+        component::{
+            metrics,
+            stake::{
+                address::{validator_address, Address},
+                ConsensusIndexRead,
+            },
+            StateReadExt as _,
+        },
+        params::StakeParameters,
+        validator, IdentityKey, Uptime,
+    },
+    anyhow::Result,
+    async_trait::async_trait,
+    cnidarium::StateWrite,
+    futures::StreamExt as _,
+    penumbra_sct::component::clock::EpochRead,
+    std::collections::BTreeMap,
+    tap::Tap,
+    tendermint::abci::types::CommitInfo,
+    tokio::task::{AbortHandle, JoinSet},
+    tracing::{debug, error_span, instrument, trace, Instrument},
+};
+
+/// A bundle of information about a validator used to track its uptime.
+type ValidatorInformation = (IdentityKey, tendermint::PublicKey, Uptime);
+
+/// A collection of tasks retrieving [`ValidatorInformation`].
+type Lookups = JoinSet<Result<Option<ValidatorInformation>>>;
+
+/// Tracks validator uptimes.
+///
+/// Use [`track_uptime()`] to process a block's [`CommitInfo`] and update validator uptime
+/// bookkeeping.
+///
+/// [`track_uptime()`]: Self::track_uptime
+#[async_trait]
+pub trait ValidatorUptimeTracker: StateWrite {
+    #[instrument(skip(self, last_commit_info))]
+    async fn track_uptime(&mut self, last_commit_info: &CommitInfo) -> Result<()> {
+        // Note: this probably isn't the correct height for the LastCommitInfo,
+        // which is about the *last* commit, but at least it'll be consistent,
+        // which is all we need to count signatures.
+        let height = self.get_block_height().await?;
+        let params = self.get_stake_params().await?;
+
+        // Build a mapping from addresses (20-byte truncated SHA256(pubkey)) to vote statuses.
+        let did_address_vote = last_commit_info
+            .votes
+            .as_slice()
+            .tap(|votes| {
+                if votes.is_empty() {
+                    debug!("no validators voted")
+                } else {
+                    debug!(len = %votes.len(), "collecting validator votes")
+                }
+            })
+            .into_iter()
+            .map(|vote| (vote.validator.address, vote.sig_info.is_signed()))
+            .inspect(|(address, voted)| {
+                trace!(
+                    address = %hex::encode(address),
+                    %voted,
+                    "validator vote information"
+                )
+            })
+            .collect::<BTreeMap<Address, bool>>();
+
+        // Since we don't have a lookup from "addresses" to identity keys,
+        // iterate over our app's validators, and match them up with the vote data.
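+        // (N.B. `consensus_set_stream()`, provided by `ConsensusIndexRead`, yields the
+        // identity key of each validator in the current consensus set.)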
+        // We can fetch all the data required for processing each validator concurrently:
+        let mut lookups = Lookups::new();
+        let mut validator_identity_stream = self.consensus_set_stream()?;
+        while let Some(identity_key) = validator_identity_stream.next().await.transpose()? {
+            self.spawn_validator_lookup_fut(identity_key, &mut lookups);
+        }
+
+        // Now process the data we fetched concurrently.
+        // Note that this will process validator uptime changes in a random order, but because they are all
+        // independent, this doesn't introduce any nondeterminism into the complete state change.
+        while let Some(data) = lookups.join_next().await.transpose()? {
+            if let Some(validator_info) = data? {
+                self.process_vote(validator_info, &did_address_vote, &params, height)
+                    .await?;
+            }
+        }
+
+        Ok(())
+    }
+
+    /// Spawns a future that will retrieve validator information.
+    ///
+    /// NB: This function is synchronous, but the lookup will run asynchronously as part of the
+    /// provided [`JoinSet`]. This permits us to fetch information about all of the validators
+    /// in the consensus set in parallel.
+    ///
+    /// # Panics
+    ///
+    /// This will panic if there is no recorded state for a validator with the given
+    /// [`IdentityKey`].
+    fn spawn_validator_lookup_fut(
+        &self,
+        identity_key: crate::IdentityKey,
+        lookups: &mut Lookups,
+    ) -> AbortHandle {
+        // Define, but do not yet `.await` upon, a collection of futures fetching information
+        // about a validator.
+        let state = self.get_validator_state(&identity_key);
+        let uptime = self.get_validator_uptime(&identity_key);
+        let consensus_key = self.fetch_validator_consensus_key(&identity_key);
+
+        // Define a span indicating that the spawned future follows from the current context.
+        let span = {
+            let span = error_span!("fetching validator information", %identity_key);
+            let current = tracing::Span::current();
+            span.follows_from(current);
+            span
+        };
+
+        lookups.spawn(
+            async move {
+                let state = state
+                    .await?
+                    .expect("every known validator must have a recorded state");
+
+                match state {
+                    validator::State::Active => {
+                        // If the validator is active, we need its consensus key and current uptime data:
+                        Ok(Some((
+                            identity_key,
+                            consensus_key
+                                .await?
+                                .expect("every known validator must have a recorded consensus key"),
+                            uptime
+                                .await?
+                                .expect("every known validator must have a recorded uptime"),
+                        )))
+                    }
+                    _ => {
+                        // Otherwise, we don't need to track its uptime, and there's no data to fetch.
+                        Ok(None)
+                    }
+                }
+            }
+            .instrument(span),
+        )
+    }
+
+    async fn process_vote(
+        &mut self,
+        (identity_key, consensus_key, mut uptime): ValidatorInformation,
+        did_address_vote: &BTreeMap<Address, bool>,
+        params: &StakeParameters,
+        height: u64,
+    ) -> anyhow::Result<()> {
+        let addr = validator_address(&consensus_key);
+        let voted = did_address_vote
+            .get(&addr)
+            .cloned()
+            // If the height is `1`, then the `LastCommitInfo` refers to the genesis block,
+            // which has no signers -- so we'll mark all validators as having signed.
+            // https://github.com/penumbra-zone/penumbra/issues/1050
+            .unwrap_or(height == 1);
+
+        tracing::debug!(
+            ?voted,
+            num_missed_blocks = ?uptime.num_missed_blocks(),
+            ?identity_key,
+            ?params.missed_blocks_maximum,
+            "recorded vote info"
+        );
+        metrics::gauge!(metrics::MISSED_BLOCKS, "identity_key" => identity_key.to_string())
+            .increment(uptime.num_missed_blocks() as f64);
+
+        uptime.mark_height_as_signed(height, voted)?;
+        if uptime.num_missed_blocks() as u64 >= params.missed_blocks_maximum {
+            self.set_validator_state(&identity_key, validator::State::Jailed)
+                .await?;
+        } else {
+            self.set_validator_uptime(&identity_key, uptime);
+        }
+
+        Ok(())
+    }
+}
+
+impl<T: StateWrite + ?Sized> ValidatorUptimeTracker for T {}
diff --git a/crates/core/component/stake/src/component/validator_handler/validator_manager.rs b/crates/core/component/stake/src/component/validator_handler/validator_manager.rs
index 58780c9012..710f2f04f1 100644
--- a/crates/core/component/stake/src/component/validator_handler/validator_manager.rs
+++ b/crates/core/component/stake/src/component/validator_handler/validator_manager.rs
@@ -2,7 +2,7 @@ use {
     crate::{
         component::{
             metrics,
-            stake::{ConsensusIndexRead, ConsensusIndexWrite, RateDataWrite},
+            stake::{ConsensusIndexWrite, RateDataWrite},
             validator_handler::{
                 validator_store::ValidatorPoolTracker, ValidatorDataRead, ValidatorDataWrite,
             },
@@ -16,17 +16,14 @@ use {
     anyhow::{ensure, Result},
     async_trait::async_trait,
     cnidarium::StateWrite,
-    futures::StreamExt as _,
     penumbra_asset::asset,
     penumbra_num::Amount,
     penumbra_proto::StateWriteProto,
     penumbra_sct::component::clock::{EpochManager, EpochRead},
     penumbra_sct::component::StateReadExt as _,
     penumbra_shielded_pool::component::AssetRegistry,
-    sha2::{Digest as _, Sha256},
     std::collections::BTreeMap,
-    tendermint::abci::types::{CommitInfo, Misbehavior},
-    tokio::task::JoinSet,
+    tendermint::abci::types::Misbehavior,
     tracing::{instrument, Instrument},
 };
@@ -39,7 +36,6 @@ use {
 /// ## Validator management
 /// - Add validator definition via [`add_validator`].
 /// - Update validator definitions via [`update_validator_definition`].
-/// - Tracking a validator's uptime via [`track_uptime`].
 /// - Process byzantine behavior evidence via [`process_evidence`].
 ///
 /// ## State machine interface
@@ -82,7 +78,6 @@ use {
 /// [`update_validator_definition`]: Self::update_validator_definition
 /// [`set_validator_state`]: Self::set_validator_state
 /// [`try_precursor_transition`]: Self::try_precursor_transition
-/// [`track_uptime`]: Self::track_uptime
 /// [`process_evidence`]: Self::process_evidence
 pub trait ValidatorManager: StateWrite {
     /// Execute a legal state transition, updating the validator records and
@@ -637,96 +632,6 @@ pub trait ValidatorManager: StateWrite {
         Ok(())
     }
 
-    #[instrument(skip(self, last_commit_info))]
-    async fn track_uptime(&mut self, last_commit_info: &CommitInfo) -> Result<()> {
-        // Note: this probably isn't the correct height for the LastCommitInfo,
-        // which is about the *last* commit, but at least it'll be consistent,
-        // which is all we need to count signatures.
-        let height = self.get_block_height().await?;
-        let params = self.get_stake_params().await?;
-
-        // Build a mapping from addresses (20-byte truncated SHA256(pubkey)) to vote statuses.
-        let did_address_vote = last_commit_info
-            .votes
-            .iter()
-            .map(|vote| (vote.validator.address, vote.sig_info.is_signed()))
-            .collect::<BTreeMap<[u8; 20], bool>>();
-
-        // Since we don't have a lookup from "addresses" to identity keys,
-        // iterate over our app's validators, and match them up with the vote data.
-        // We can fetch all the data required for processing each validator concurrently:
-        let mut js = JoinSet::new();
-        let mut validator_identity_stream = self.consensus_set_stream()?;
-        while let Some(identity_key) = validator_identity_stream.next().await {
-            let identity_key = identity_key?;
-            let state = self.get_validator_state(&identity_key);
-            let uptime = self.get_validator_uptime(&identity_key);
-            let consensus_key = self.fetch_validator_consensus_key(&identity_key);
-            js.spawn(async move {
-                let state = state
-                    .await?
-                    .expect("every known validator must have a recorded state");
-
-                match state {
-                    validator::State::Active => {
-                        // If the validator is active, we need its consensus key and current uptime data:
-                        Ok(Some((
-                            identity_key,
-                            consensus_key
-                                .await?
-                                .expect("every known validator must have a recorded consensus key"),
-                            uptime
-                                .await?
-                                .expect("every known validator must have a recorded uptime"),
-                        )))
-                    }
-                    _ => {
-                        // Otherwise, we don't need to track its uptime, and there's no data to fetch.
-                        anyhow::Ok(None)
-                    }
-                }
-            });
-        }
-        // Now process the data we fetched concurrently.
-        // Note that this will process validator uptime changes in a random order, but because they are all
-        // independent, this doesn't introduce any nondeterminism into the complete state change.
-        while let Some(data) = js.join_next().await.transpose()? {
-            if let Some((identity_key, consensus_key, mut uptime)) = data? {
-                // for some reason last_commit_info has truncated sha256 hashes
-                let addr: [u8; 20] =
-                    Sha256::digest(&consensus_key.to_bytes()).as_slice()[0..20].try_into()?;
-
-                let voted = did_address_vote
-                    .get(&addr)
-                    .cloned()
-                    // If the height is `1`, then the `LastCommitInfo` refers to the genesis block,
-                    // which has no signers -- so we'll mark all validators as having signed.
-                    // https://github.com/penumbra-zone/penumbra/issues/1050
-                    .unwrap_or(height == 1);
-
-                tracing::debug!(
-                    ?voted,
-                    num_missed_blocks = ?uptime.num_missed_blocks(),
-                    ?identity_key,
-                    ?params.missed_blocks_maximum,
-                    "recorded vote info"
-                );
-                metrics::gauge!(metrics::MISSED_BLOCKS, "identity_key" => identity_key.to_string())
-                    .increment(uptime.num_missed_blocks() as f64);
-
-                uptime.mark_height_as_signed(height, voted)?;
-                if uptime.num_missed_blocks() as u64 >= params.missed_blocks_maximum {
-                    self.set_validator_state(&identity_key, validator::State::Jailed)
-                        .await?;
-                } else {
-                    self.set_validator_uptime(&identity_key, uptime);
-                }
-            }
-        }
-
-        Ok(())
-    }
-
     /// Process evidence of byzantine behavior from CometBFT.
     ///
     /// Evidence *MUST* be processed before `end_block` is called, because