Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(staking): 🌹 hoist uptime tracking into ValidatorUptimeTracker extension trait #4099

22 changes: 6 additions & 16 deletions crates/core/component/stake/src/component/stake.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
pub mod address;

use crate::params::StakeParameters;
use crate::rate::BaseRateData;
use crate::validator::{self, Validator};
Expand All @@ -14,7 +16,6 @@ use futures::{StreamExt, TryStreamExt};
use penumbra_num::Amount;
use penumbra_proto::{StateReadProto, StateWriteProto};
use penumbra_sct::component::clock::EpochRead;
use sha2::{Digest, Sha256};
use std::pin::Pin;
use std::str::FromStr;
use std::{collections::BTreeMap, sync::Arc};
Expand All @@ -25,7 +26,9 @@ use tendermint::{block, PublicKey};
use tracing::{error, instrument, trace};

use crate::component::epoch_handler::EpochHandler;
use crate::component::validator_handler::{ValidatorDataRead, ValidatorManager};
use crate::component::validator_handler::{
ValidatorDataRead, ValidatorManager, ValidatorUptimeTracker,
};

pub struct Staking {}

Expand Down Expand Up @@ -323,20 +326,7 @@ pub trait StateWriteExt: StateWrite {
identity_key: &IdentityKey,
consensus_key: &PublicKey,
) {
/// Translates from consensus keys to the truncated sha256 hashes in last_commit_info
/// This should really be a refined type upstream, but we can't currently upstream
/// to tendermint-rs, for process reasons, and shouldn't do our own tendermint data
/// modeling, so this is an interim hack.
fn validator_address(ck: &PublicKey) -> [u8; 20] {
let ck_bytes = ck.to_bytes();
let addr: [u8; 20] = Sha256::digest(ck_bytes).as_slice()[0..20]
.try_into()
.expect("Sha256 digest should be 20-bytes long");

addr
}

let address = validator_address(consensus_key);
let address = self::address::validator_address(consensus_key);
tracing::debug!(?identity_key, ?consensus_key, hash = ?hex::encode(address), "registering consensus key");
self.put(
state_key::validators::lookup_by::cometbft_address(&address),
Expand Down
23 changes: 23 additions & 0 deletions crates/core/component/stake/src/component/stake/address.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
use {
sha2::{Digest, Sha256},
tendermint::PublicKey,
};

/// A type alias for 20-byte truncated SHA256 validator addresses.
///
/// This is the format in which [`tendermint::abci::types::CommitInfo`] presents vote information.
pub(crate) type Address = [u8; ADDRESS_LEN];

const ADDRESS_LEN: usize = 20;

/// Translates from consensus keys to the truncated sha256 hashes in `last_commit_info`.
//
// NOTE: This should really be a refined type upstream, but we can't currently upstream to
// tendermint-rs, for process reasons, and shouldn't do our own tendermint data modeling, so
// this is an interim hack.
pub(crate) fn validator_address(ck: &PublicKey) -> Address {
let ck_bytes = ck.to_bytes();
Sha256::digest(ck_bytes).as_slice()[0..ADDRESS_LEN]
.try_into()
.expect("Sha256 digest should be 20-bytes long")
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,6 @@ pub mod validator_store;
pub use validator_store::ValidatorDataRead;
pub(crate) use validator_store::ValidatorDataWrite;
pub(crate) use validator_store::ValidatorPoolTracker;

pub mod uptime_tracker;
pub use uptime_tracker::ValidatorUptimeTracker;
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
use {
super::{ValidatorDataRead, ValidatorDataWrite, ValidatorManager},
crate::{
component::{
metrics,
stake::{
address::{validator_address, Address},
ConsensusIndexRead,
},
StateReadExt as _,
},
params::StakeParameters,
validator, IdentityKey, Uptime,
},
anyhow::Result,
async_trait::async_trait,
cnidarium::StateWrite,
futures::StreamExt as _,
penumbra_sct::component::clock::EpochRead,
std::collections::BTreeMap,
tap::Tap,
tendermint::abci::types::CommitInfo,
tokio::task::{AbortHandle, JoinSet},
tracing::{debug, error_span, instrument, trace, Instrument},
};

/// A bundle of information about a validator used to track its uptime.
type ValidatorInformation = (IdentityKey, tendermint::PublicKey, Uptime);

/// A collection of tasks retrieving [`ValidatorInformation`].
type Lookups = JoinSet<anyhow::Result<Option<ValidatorInformation>>>;

/// Tracks validator uptimes.
///
/// Use [`track_uptime()`] to process a block's [`CommitInfo`] and update validator uptime
/// bookkeeping.
///
/// [`track_uptime()`]: Self::track_uptime
#[async_trait]
pub trait ValidatorUptimeTracker: StateWrite {
#[instrument(skip(self, last_commit_info))]
async fn track_uptime(&mut self, last_commit_info: &CommitInfo) -> Result<()> {
// Note: this probably isn't the correct height for the LastCommitInfo,
// which is about the *last* commit, but at least it'll be consistent,
// which is all we need to count signatures.
let height = self.get_block_height().await?;
let params = self.get_stake_params().await?;

// Build a mapping from addresses (20-byte truncated SHA256(pubkey)) to vote statuses.
let did_address_vote = last_commit_info
.votes
.as_slice()
.tap(|votes| {
if votes.is_empty() {
debug!("no validators voted")
} else {
debug!(len = %votes.len(), "collecting validator votes")
}
})
.into_iter()
.map(|vote| (vote.validator.address, vote.sig_info.is_signed()))
.inspect(|(address, voted)| {
trace!(
address = %hex::encode(address),
%voted,
"validator vote information"
)
})
.collect::<BTreeMap<Address, bool>>();

// Since we don't have a lookup from "addresses" to identity keys,
// iterate over our app's validators, and match them up with the vote data.
// We can fetch all the data required for processing each validator concurrently:
let mut lookups = Lookups::new();
cratelyn marked this conversation as resolved.
Show resolved Hide resolved
let mut validator_identity_stream = self.consensus_set_stream()?;
while let Some(identity_key) = validator_identity_stream.next().await.transpose()? {
self.spawn_validator_lookup_fut(identity_key, &mut lookups);
}

// Now process the data we fetched concurrently.
// Note that this will process validator uptime changes in a random order, but because they are all
// independent, this doesn't introduce any nondeterminism into the complete state change.
while let Some(data) = lookups.join_next().await.transpose()? {
if let Some(validator_info) = data? {
self.process_vote(validator_info, &did_address_vote, &params, height)
cratelyn marked this conversation as resolved.
Show resolved Hide resolved
.await?;
}
}

Ok(())
}

/// Spawns a future that will retrieve validator information.
///
/// NB: This function is synchronous, but the lookup will run asynchronously as part of the
/// provided [`JoinSet`]. This permits us to fetch information about all of the validators
/// in the consensus set in parallel.
///
/// # Panics
///
/// This will panic if there is no recorded state for a validator with the given
/// [`IdentityKey`].
fn spawn_validator_lookup_fut(
&self,
identity_key: crate::IdentityKey,
lookups: &mut Lookups,
) -> AbortHandle {
// Define, but do not yet `.await` upon, a collection of futures fetching information
// about a validator.
let state = self.get_validator_state(&identity_key);
let uptime = self.get_validator_uptime(&identity_key);
let consensus_key = self.fetch_validator_consensus_key(&identity_key);

// Define a span indicating that the spawned future follows from the current context.
let span = {
let span = error_span!("fetching validator information", %identity_key);
let current = tracing::Span::current();
span.follows_from(current);
span
};

lookups.spawn(
async move {
let state = state
.await?
.expect("every known validator must have a recorded state");

match state {
validator::State::Active => {
// If the validator is active, we need its consensus key and current uptime data:
Ok(Some((
identity_key,
consensus_key
.await?
.expect("every known validator must have a recorded consensus key"),
uptime
.await?
.expect("every known validator must have a recorded uptime"),
)))
}
_ => {
// Otherwise, we don't need to track its uptime, and there's no data to fetch.
Ok(None)
}
}
}
.instrument(span),
)
}

async fn process_vote(
&mut self,
(identity_key, consensus_key, mut uptime): ValidatorInformation,
did_address_vote: &BTreeMap<Address, bool>,
params: &StakeParameters,
height: u64,
) -> anyhow::Result<()> {
let addr = validator_address(&consensus_key);
let voted = did_address_vote
.get(&addr)
.cloned()
// If the height is `1`, then the `LastCommitInfo` refers to the genesis block,
// which has no signers -- so we'll mark all validators as having signed.
// https://github.com/penumbra-zone/penumbra/issues/1050
.unwrap_or(height == 1);

tracing::debug!(
?voted,
num_missed_blocks = ?uptime.num_missed_blocks(),
?identity_key,
?params.missed_blocks_maximum,
"recorded vote info"
);
metrics::gauge!(metrics::MISSED_BLOCKS, "identity_key" => identity_key.to_string())
.increment(uptime.num_missed_blocks() as f64);

uptime.mark_height_as_signed(height, voted)?;
if uptime.num_missed_blocks() as u64 >= params.missed_blocks_maximum {
self.set_validator_state(&identity_key, validator::State::Jailed)
.await?;
} else {
self.set_validator_uptime(&identity_key, uptime);
}

Ok(())
}
}

impl<T: StateWrite + ?Sized> ValidatorUptimeTracker for T {}
Loading
Loading