-
Notifications
You must be signed in to change notification settings - Fork 305
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
refactor(staking): š¹ hoist uptime tracking into `ValidatorUptimeTrackā¦
ā¦er` extension trait (#4099) this is a small collection of noop refactors, cherry-picked out of #4070. most importantly, this hoists `track_uptime` into a standalone extension trait. additionally, some improvements to telemetry helped make debugging tests in #4070 easier.
- Loading branch information
Showing
5 changed files
with
223 additions
and
113 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
23 changes: 23 additions & 0 deletions
23
crates/core/component/stake/src/component/stake/address.rs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
use { | ||
sha2::{Digest, Sha256}, | ||
tendermint::PublicKey, | ||
}; | ||
|
||
/// A type alias for 20-byte truncated SHA256 validator addresses. | ||
/// | ||
/// This is the format in which [`tendermint::abci::types::CommitInfo`] presents vote information. | ||
pub(crate) type Address = [u8; ADDRESS_LEN]; | ||
|
||
const ADDRESS_LEN: usize = 20; | ||
|
||
/// Translates from consensus keys to the truncated sha256 hashes in `last_commit_info`. | ||
// | ||
// NOTE: This should really be a refined type upstream, but we can't currently upstream to | ||
// tendermint-rs, for process reasons, and shouldn't do our own tendermint data modeling, so | ||
// this is an interim hack. | ||
pub(crate) fn validator_address(ck: &PublicKey) -> Address { | ||
let ck_bytes = ck.to_bytes(); | ||
Sha256::digest(ck_bytes).as_slice()[0..ADDRESS_LEN] | ||
.try_into() | ||
.expect("Sha256 digest should be 20-bytes long") | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
189 changes: 189 additions & 0 deletions
189
crates/core/component/stake/src/component/validator_handler/uptime_tracker.rs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,189 @@ | ||
use { | ||
super::{ValidatorDataRead, ValidatorDataWrite, ValidatorManager}, | ||
crate::{ | ||
component::{ | ||
metrics, | ||
stake::{ | ||
address::{validator_address, Address}, | ||
ConsensusIndexRead, | ||
}, | ||
StateReadExt as _, | ||
}, | ||
params::StakeParameters, | ||
validator, IdentityKey, Uptime, | ||
}, | ||
anyhow::Result, | ||
async_trait::async_trait, | ||
cnidarium::StateWrite, | ||
futures::StreamExt as _, | ||
penumbra_sct::component::clock::EpochRead, | ||
std::collections::BTreeMap, | ||
tap::Tap, | ||
tendermint::abci::types::CommitInfo, | ||
tokio::task::{AbortHandle, JoinSet}, | ||
tracing::{debug, error_span, instrument, trace, Instrument}, | ||
}; | ||
|
||
/// A bundle of information about a validator used to track its uptime. | ||
type ValidatorInformation = (IdentityKey, tendermint::PublicKey, Uptime); | ||
|
||
/// A collection of tasks retrieving [`ValidatorInformation`]. | ||
type Lookups = JoinSet<anyhow::Result<Option<ValidatorInformation>>>; | ||
|
||
/// Tracks validator uptimes. | ||
/// | ||
/// Use [`track_uptime()`] to process a block's [`CommitInfo`] and update validator uptime | ||
/// bookkeeping. | ||
/// | ||
/// [`track_uptime()`]: Self::track_uptime | ||
#[async_trait] | ||
pub trait ValidatorUptimeTracker: StateWrite { | ||
#[instrument(skip(self, last_commit_info))] | ||
async fn track_uptime(&mut self, last_commit_info: &CommitInfo) -> Result<()> { | ||
// Note: this probably isn't the correct height for the LastCommitInfo, | ||
// which is about the *last* commit, but at least it'll be consistent, | ||
// which is all we need to count signatures. | ||
let height = self.get_block_height().await?; | ||
let params = self.get_stake_params().await?; | ||
|
||
// Build a mapping from addresses (20-byte truncated SHA256(pubkey)) to vote statuses. | ||
let did_address_vote = last_commit_info | ||
.votes | ||
.as_slice() | ||
.tap(|votes| { | ||
if votes.is_empty() { | ||
debug!("no validators voted") | ||
} else { | ||
debug!(len = %votes.len(), "collecting validator votes") | ||
} | ||
}) | ||
.into_iter() | ||
.map(|vote| (vote.validator.address, vote.sig_info.is_signed())) | ||
.inspect(|(address, voted)| { | ||
trace!( | ||
address = %hex::encode(address), | ||
%voted, | ||
"validator vote information" | ||
) | ||
}) | ||
.collect::<BTreeMap<Address, bool>>(); | ||
|
||
// Since we don't have a lookup from "addresses" to identity keys, | ||
// iterate over our app's validators, and match them up with the vote data. | ||
// We can fetch all the data required for processing each validator concurrently: | ||
let mut lookups = Lookups::new(); | ||
let mut validator_identity_stream = self.consensus_set_stream()?; | ||
while let Some(identity_key) = validator_identity_stream.next().await.transpose()? { | ||
self.spawn_validator_lookup_fut(identity_key, &mut lookups); | ||
} | ||
|
||
// Now process the data we fetched concurrently. | ||
// Note that this will process validator uptime changes in a random order, but because they are all | ||
// independent, this doesn't introduce any nondeterminism into the complete state change. | ||
while let Some(data) = lookups.join_next().await.transpose()? { | ||
if let Some(validator_info) = data? { | ||
self.process_vote(validator_info, &did_address_vote, ¶ms, height) | ||
.await?; | ||
} | ||
} | ||
|
||
Ok(()) | ||
} | ||
|
||
/// Spawns a future that will retrieve validator information. | ||
/// | ||
/// NB: This function is synchronous, but the lookup will run asynchronously as part of the | ||
/// provided [`JoinSet`]. This permits us to fetch information about all of the validators | ||
/// in the consensus set in parallel. | ||
/// | ||
/// # Panics | ||
/// | ||
/// This will panic if there is no recorded state for a validator with the given | ||
/// [`IdentityKey`]. | ||
fn spawn_validator_lookup_fut( | ||
&self, | ||
identity_key: crate::IdentityKey, | ||
lookups: &mut Lookups, | ||
) -> AbortHandle { | ||
// Define, but do not yet `.await` upon, a collection of futures fetching information | ||
// about a validator. | ||
let state = self.get_validator_state(&identity_key); | ||
let uptime = self.get_validator_uptime(&identity_key); | ||
let consensus_key = self.fetch_validator_consensus_key(&identity_key); | ||
|
||
// Define a span indicating that the spawned future follows from the current context. | ||
let span = { | ||
let span = error_span!("fetching validator information", %identity_key); | ||
let current = tracing::Span::current(); | ||
span.follows_from(current); | ||
span | ||
}; | ||
|
||
lookups.spawn( | ||
async move { | ||
let state = state | ||
.await? | ||
.expect("every known validator must have a recorded state"); | ||
|
||
match state { | ||
validator::State::Active => { | ||
// If the validator is active, we need its consensus key and current uptime data: | ||
Ok(Some(( | ||
identity_key, | ||
consensus_key | ||
.await? | ||
.expect("every known validator must have a recorded consensus key"), | ||
uptime | ||
.await? | ||
.expect("every known validator must have a recorded uptime"), | ||
))) | ||
} | ||
_ => { | ||
// Otherwise, we don't need to track its uptime, and there's no data to fetch. | ||
Ok(None) | ||
} | ||
} | ||
} | ||
.instrument(span), | ||
) | ||
} | ||
|
||
async fn process_vote( | ||
&mut self, | ||
(identity_key, consensus_key, mut uptime): ValidatorInformation, | ||
did_address_vote: &BTreeMap<Address, bool>, | ||
params: &StakeParameters, | ||
height: u64, | ||
) -> anyhow::Result<()> { | ||
let addr = validator_address(&consensus_key); | ||
let voted = did_address_vote | ||
.get(&addr) | ||
.cloned() | ||
// If the height is `1`, then the `LastCommitInfo` refers to the genesis block, | ||
// which has no signers -- so we'll mark all validators as having signed. | ||
// https://github.com/penumbra-zone/penumbra/issues/1050 | ||
.unwrap_or(height == 1); | ||
|
||
tracing::debug!( | ||
?voted, | ||
num_missed_blocks = ?uptime.num_missed_blocks(), | ||
?identity_key, | ||
?params.missed_blocks_maximum, | ||
"recorded vote info" | ||
); | ||
metrics::gauge!(metrics::MISSED_BLOCKS, "identity_key" => identity_key.to_string()) | ||
.increment(uptime.num_missed_blocks() as f64); | ||
|
||
uptime.mark_height_as_signed(height, voted)?; | ||
if uptime.num_missed_blocks() as u64 >= params.missed_blocks_maximum { | ||
self.set_validator_state(&identity_key, validator::State::Jailed) | ||
.await?; | ||
} else { | ||
self.set_validator_uptime(&identity_key, uptime); | ||
} | ||
|
||
Ok(()) | ||
} | ||
} | ||
|
||
impl<T: StateWrite + ?Sized> ValidatorUptimeTracker for T {} |
Oops, something went wrong.